• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 17830307614

18 Sep 2025 01:29PM UTC coverage: 54.617% (-12.0%) from 66.637%
17830307614

Pull #10200

github

web-flow
Merge 181a0a7bc into b34fc964b
Pull Request #10200: github: change to form-based issue template

109249 of 200028 relevant lines covered (54.62%)

21896.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.22
/payments/db/kv_store.go
1
package paymentsdb
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/binary"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "math"
11
        "sort"
12
        "sync"
13
        "time"
14

15
        "github.com/btcsuite/btcd/btcec/v2"
16
        "github.com/btcsuite/btcd/wire"
17
        "github.com/lightningnetwork/lnd/channeldb"
18
        "github.com/lightningnetwork/lnd/kvdb"
19
        "github.com/lightningnetwork/lnd/lntypes"
20
        "github.com/lightningnetwork/lnd/lnwire"
21
        "github.com/lightningnetwork/lnd/record"
22
        "github.com/lightningnetwork/lnd/routing/route"
23
        "github.com/lightningnetwork/lnd/tlv"
24
)
25

26
const (
27
        // paymentSeqBlockSize is the block size used when we batch allocate
28
        // payment sequences for future payments.
29
        paymentSeqBlockSize = 1000
30

31
        // paymentProgressLogInterval is the interval we use limiting the
32
        // logging output of payment processing.
33
        paymentProgressLogInterval = 30 * time.Second
34
)
35

36
//nolint:ll
37
var (
38
        // paymentsRootBucket is the name of the top-level bucket within the
39
        // database that stores all data related to payments. Within this
40
        // bucket, each payment has its own sub-bucket keyed by its payment
41
        // hash.
42
        //
43
        // Bucket hierarchy:
44
        //
45
        // root-bucket
46
        //      |
47
        //      |-- <paymenthash>
48
        //      |        |--sequence-key: <sequence number>
49
        //      |        |--creation-info-key: <creation info>
50
        //      |        |--fail-info-key: <(optional) fail info>
51
        //      |        |
52
        //      |        |--payment-htlcs-bucket (shard-bucket)
53
        //      |        |        |
54
        //      |        |        |-- ai<htlc attempt ID>: <htlc attempt info>
55
        //      |        |        |-- si<htlc attempt ID>: <(optional) settle info>
56
        //      |        |        |-- fi<htlc attempt ID>: <(optional) fail info>
57
        //      |        |        |
58
        //      |        |       ...
59
        //      |        |
60
        //      |        |
61
        //      |        |--duplicate-bucket (only for old, completed payments)
62
        //      |                 |
63
        //      |                 |-- <seq-num>
64
        //      |                 |       |--sequence-key: <sequence number>
65
        //      |                 |       |--creation-info-key: <creation info>
66
        //      |                 |       |--ai: <attempt info>
67
        //      |                 |       |--si: <settle info>
68
        //      |                 |       |--fi: <fail info>
69
        //      |                 |
70
        //      |                 |-- <seq-num>
71
        //      |                 |       |
72
        //      |                ...     ...
73
        //      |
74
        //      |-- <paymenthash>
75
        //      |        |
76
        //      |       ...
77
        //     ...
78
        //
79
        paymentsRootBucket = []byte("payments-root-bucket")
80

81
        // paymentSequenceKey is a key used in the payment's sub-bucket to
82
        // store the sequence number of the payment.
83
        paymentSequenceKey = []byte("payment-sequence-key")
84

85
        // paymentCreationInfoKey is a key used in the payment's sub-bucket to
86
        // store the creation info of the payment.
87
        paymentCreationInfoKey = []byte("payment-creation-info")
88

89
        // paymentHtlcsBucket is a bucket where we'll store the information
90
        // about the HTLCs that were attempted for a payment.
91
        paymentHtlcsBucket = []byte("payment-htlcs-bucket")
92

93
        // htlcAttemptInfoKey is the key used as the prefix of an HTLC attempt
94
        // to store the info about the attempt that was done for the HTLC in
95
        // question. The HTLC attempt ID is concatenated at the end.
96
        htlcAttemptInfoKey = []byte("ai")
97

98
        // htlcSettleInfoKey is the key used as the prefix of an HTLC attempt
99
        // settle info, if any. The HTLC attempt ID is concatenated at the end.
100
        htlcSettleInfoKey = []byte("si")
101

102
        // htlcFailInfoKey is the key used as the prefix of an HTLC attempt
103
        // failure information, if any. The HTLC attempt ID is concatenated at
104
        // the end.
105
        htlcFailInfoKey = []byte("fi")
106

107
        // paymentFailInfoKey is a key used in the payment's sub-bucket to
108
        // store information about the reason a payment failed.
109
        paymentFailInfoKey = []byte("payment-fail-info")
110

111
        // paymentsIndexBucket is the name of the top-level bucket within the
112
        // database that stores an index of payment sequence numbers to its
113
        // payment hash.
114
        // payments-index-bucket
115
        //         |--<sequence-number>: <payment hash>
116
        //         |--...
117
        //         |--<sequence-number>: <payment hash>
118
        paymentsIndexBucket = []byte("payments-index-bucket")
119
)
120

121
// KVStore implements persistence for payments and payment attempts.
122
type KVStore struct {
123
        // Sequence management for the kv store.
124
        seqMu     sync.Mutex
125
        currSeq   uint64
126
        storedSeq uint64
127

128
        // db is the underlying database implementation.
129
        db kvdb.Backend
130

131
        // keepFailedPaymentAttempts is a flag that indicates whether we should
132
        // keep failed payment attempts in the database.
133
        keepFailedPaymentAttempts bool
134
}
135

136
// A compile-time constraint to ensure KVStore implements DB.
137
var _ DB = (*KVStore)(nil)
138

139
// NewKVStore creates a new KVStore for payments.
140
func NewKVStore(db kvdb.Backend,
141
        options ...OptionModifier) (*KVStore, error) {
60✔
142

60✔
143
        opts := DefaultOptions()
60✔
144
        for _, applyOption := range options {
70✔
145
                applyOption(opts)
10✔
146
        }
10✔
147

148
        if !opts.NoMigration {
120✔
149
                if err := initKVStore(db); err != nil {
60✔
150
                        return nil, err
×
151
                }
×
152
        }
153

154
        return &KVStore{
60✔
155
                db:                        db,
60✔
156
                keepFailedPaymentAttempts: opts.KeepFailedPaymentAttempts,
60✔
157
        }, nil
60✔
158
}
159

160
// paymentsTopLevelBuckets is a list of top-level buckets that are used for
161
// the payments database when using the kv store.
162
var paymentsTopLevelBuckets = [][]byte{
163
        paymentsRootBucket,
164
        paymentsIndexBucket,
165
}
166

167
// initKVStore creates and initializes the top-level buckets for the payment db.
168
func initKVStore(db kvdb.Backend) error {
60✔
169
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
120✔
170
                for _, tlb := range paymentsTopLevelBuckets {
180✔
171
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
120✔
172
                                return err
×
173
                        }
×
174
                }
175

176
                return nil
60✔
177
        }, func() {})
60✔
178
        if err != nil {
60✔
179
                return fmt.Errorf("unable to create new payments db: %w", err)
×
180
        }
×
181

182
        return nil
60✔
183
}
184

185
// InitPayment checks or records the given PaymentCreationInfo with the DB,
186
// making sure it does not already exist as an in-flight payment. When this
187
// method returns successfully, the payment is guaranteed to be in the InFlight
188
// state.
189
func (p *KVStore) InitPayment(paymentHash lntypes.Hash,
190
        info *PaymentCreationInfo) error {
149✔
191

149✔
192
        // Obtain a new sequence number for this payment. This is used
149✔
193
        // to sort the payments in order of creation, and also acts as
149✔
194
        // a unique identifier for each payment.
149✔
195
        sequenceNum, err := p.nextPaymentSequence()
149✔
196
        if err != nil {
149✔
197
                return err
×
198
        }
×
199

200
        var b bytes.Buffer
149✔
201
        if err := serializePaymentCreationInfo(&b, info); err != nil {
149✔
202
                return err
×
203
        }
×
204
        infoBytes := b.Bytes()
149✔
205

149✔
206
        var updateErr error
149✔
207
        err = kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
298✔
208
                // Reset the update error, to avoid carrying over an error
149✔
209
                // from a previous execution of the batched db transaction.
149✔
210
                updateErr = nil
149✔
211

149✔
212
                prefetchPayment(tx, paymentHash)
149✔
213
                bucket, err := createPaymentBucket(tx, paymentHash)
149✔
214
                if err != nil {
149✔
215
                        return err
×
216
                }
×
217

218
                // Get the existing status of this payment, if any.
219
                paymentStatus, err := fetchPaymentStatus(bucket)
149✔
220

149✔
221
                switch {
149✔
222
                // If no error is returned, it means we already have this
223
                // payment. We'll check the status to decide whether we allow
224
                // retrying the payment or return a specific error.
225
                case err == nil:
5✔
226
                        if err := paymentStatus.initializable(); err != nil {
9✔
227
                                updateErr = err
4✔
228
                                return nil
4✔
229
                        }
4✔
230

231
                // Otherwise, if the error is not `ErrPaymentNotInitiated`,
232
                // we'll return the error.
233
                case !errors.Is(err, ErrPaymentNotInitiated):
×
234
                        return err
×
235
                }
236

237
                // Before we set our new sequence number, we check whether this
238
                // payment has a previously set sequence number and remove its
239
                // index entry if it exists. This happens in the case where we
240
                // have a previously attempted payment which was left in a state
241
                // where we can retry.
242
                seqBytes := bucket.Get(paymentSequenceKey)
145✔
243
                if seqBytes != nil {
146✔
244
                        indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
1✔
245
                        if err := indexBucket.Delete(seqBytes); err != nil {
1✔
246
                                return err
×
247
                        }
×
248
                }
249

250
                // Once we have obtained a sequence number, we add an entry
251
                // to our index bucket which will map the sequence number to
252
                // our payment identifier.
253
                err = createPaymentIndexEntry(
145✔
254
                        tx, sequenceNum, info.PaymentIdentifier,
145✔
255
                )
145✔
256
                if err != nil {
145✔
257
                        return err
×
258
                }
×
259

260
                err = bucket.Put(paymentSequenceKey, sequenceNum)
145✔
261
                if err != nil {
145✔
262
                        return err
×
263
                }
×
264

265
                // Add the payment info to the bucket, which contains the
266
                // static information for this payment
267
                err = bucket.Put(paymentCreationInfoKey, infoBytes)
145✔
268
                if err != nil {
145✔
269
                        return err
×
270
                }
×
271

272
                // We'll delete any lingering HTLCs to start with, in case we
273
                // are initializing a payment that was attempted earlier, but
274
                // left in a state where we could retry.
275
                err = bucket.DeleteNestedBucket(paymentHtlcsBucket)
145✔
276
                if err != nil && !errors.Is(err, kvdb.ErrBucketNotFound) {
145✔
277
                        return err
×
278
                }
×
279

280
                // Also delete any lingering failure info now that we are
281
                // re-attempting.
282
                return bucket.Delete(paymentFailInfoKey)
145✔
283
        })
284
        if err != nil {
149✔
285
                return fmt.Errorf("unable to init payment: %w", err)
×
286
        }
×
287

288
        return updateErr
149✔
289
}
290

291
// DeleteFailedAttempts deletes all failed htlcs for a payment if configured
292
// by the KVStore db.
293
func (p *KVStore) DeleteFailedAttempts(hash lntypes.Hash) error {
8✔
294
        if !p.keepFailedPaymentAttempts {
12✔
295
                const failedHtlcsOnly = true
4✔
296
                err := p.DeletePayment(hash, failedHtlcsOnly)
4✔
297
                if err != nil {
6✔
298
                        return err
2✔
299
                }
2✔
300
        }
301

302
        return nil
6✔
303
}
304

305
// paymentIndexType is the type tag stored with each payment index entry, so
306
// that different kinds of payment index entries can be told apart.
307
type paymentIndexType uint8
308

309
// paymentIndexTypeHash is a payment index type which indicates that we have
310
// created an index of payment sequence number to payment hash.
311
const paymentIndexTypeHash paymentIndexType = 0
312

313
// createPaymentIndexEntry creates a payment hash typed index for a payment. The
314
// index produced contains a payment index type (which can be used in future to
315
// signal different payment index types) and the payment identifier.
316
func createPaymentIndexEntry(tx kvdb.RwTx, sequenceNumber []byte,
317
        id lntypes.Hash) error {
166✔
318

166✔
319
        var b bytes.Buffer
166✔
320
        if err := WriteElements(&b, paymentIndexTypeHash, id[:]); err != nil {
166✔
321
                return err
×
322
        }
×
323

324
        indexes := tx.ReadWriteBucket(paymentsIndexBucket)
166✔
325

166✔
326
        return indexes.Put(sequenceNumber, b.Bytes())
166✔
327
}
328

329
// deserializePaymentIndex deserializes a payment index entry. This function
330
// currently only supports deserialization of payment hash indexes, and will
331
// fail for other types.
332
func deserializePaymentIndex(r io.Reader) (lntypes.Hash, error) {
113✔
333
        var (
113✔
334
                indexType   paymentIndexType
113✔
335
                paymentHash []byte
113✔
336
        )
113✔
337

113✔
338
        if err := ReadElements(r, &indexType, &paymentHash); err != nil {
113✔
339
                return lntypes.Hash{}, err
×
340
        }
×
341

342
        // While we only have on payment index type, we do not need to use our
343
        // index type to deserialize the index. However, we sanity check that
344
        // this type is as expected, since we had to read it out anyway.
345
        if indexType != paymentIndexTypeHash {
113✔
346
                return lntypes.Hash{}, fmt.Errorf("unknown payment index "+
×
347
                        "type: %v", indexType)
×
348
        }
×
349

350
        hash, err := lntypes.MakeHash(paymentHash)
113✔
351
        if err != nil {
113✔
352
                return lntypes.Hash{}, err
×
353
        }
×
354

355
        return hash, nil
113✔
356
}
357

358
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
359
// DB.
360
func (p *KVStore) RegisterAttempt(paymentHash lntypes.Hash,
361
        attempt *HTLCAttemptInfo) (*MPPayment, error) {
70✔
362

70✔
363
        // Serialize the information before opening the db transaction.
70✔
364
        var a bytes.Buffer
70✔
365
        err := serializeHTLCAttemptInfo(&a, attempt)
70✔
366
        if err != nil {
70✔
367
                return nil, err
×
368
        }
×
369
        htlcInfoBytes := a.Bytes()
70✔
370

70✔
371
        htlcIDBytes := make([]byte, 8)
70✔
372
        binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID)
70✔
373

70✔
374
        var payment *MPPayment
70✔
375
        err = kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
156✔
376
                prefetchPayment(tx, paymentHash)
86✔
377
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
86✔
378
                if err != nil {
86✔
379
                        return err
×
380
                }
×
381

382
                payment, err = fetchPayment(bucket)
86✔
383
                if err != nil {
86✔
384
                        return err
×
385
                }
×
386

387
                // Check if registering a new attempt is allowed.
388
                if err := payment.Registrable(); err != nil {
102✔
389
                        return err
16✔
390
                }
16✔
391

392
                // Verify the attempt is compatible with the existing payment.
393
                if err := verifyAttempt(payment, attempt); err != nil {
86✔
394
                        return err
16✔
395
                }
16✔
396

397
                htlcsBucket, err := bucket.CreateBucketIfNotExists(
54✔
398
                        paymentHtlcsBucket,
54✔
399
                )
54✔
400
                if err != nil {
54✔
401
                        return err
×
402
                }
×
403

404
                err = htlcsBucket.Put(
54✔
405
                        htlcBucketKey(htlcAttemptInfoKey, htlcIDBytes),
54✔
406
                        htlcInfoBytes,
54✔
407
                )
54✔
408
                if err != nil {
54✔
409
                        return err
×
410
                }
×
411

412
                // Retrieve attempt info for the notification.
413
                payment, err = fetchPayment(bucket)
54✔
414

54✔
415
                return err
54✔
416
        })
417
        if err != nil {
86✔
418
                return nil, err
16✔
419
        }
16✔
420

421
        return payment, err
54✔
422
}
423

424
// SettleAttempt marks the given attempt settled with the preimage. If this is
425
// a multi shard payment, this might implicitly mean that the full payment
426
// succeeded.
427
//
428
// After invoking this method, InitPayment should always return an error to
429
// prevent us from making duplicate payments to the same payment hash. The
430
// provided preimage is atomically saved to the DB for record keeping.
431
func (p *KVStore) SettleAttempt(hash lntypes.Hash,
432
        attemptID uint64, settleInfo *HTLCSettleInfo) (*MPPayment, error) {
16✔
433

16✔
434
        var b bytes.Buffer
16✔
435
        if err := serializeHTLCSettleInfo(&b, settleInfo); err != nil {
16✔
436
                return nil, err
×
437
        }
×
438
        settleBytes := b.Bytes()
16✔
439

16✔
440
        return p.updateHtlcKey(hash, attemptID, htlcSettleInfoKey, settleBytes)
16✔
441
}
442

443
// FailAttempt marks the given payment attempt failed.
444
func (p *KVStore) FailAttempt(hash lntypes.Hash,
445
        attemptID uint64, failInfo *HTLCFailInfo) (*MPPayment, error) {
31✔
446

31✔
447
        var b bytes.Buffer
31✔
448
        if err := serializeHTLCFailInfo(&b, failInfo); err != nil {
31✔
449
                return nil, err
×
450
        }
×
451
        failBytes := b.Bytes()
31✔
452

31✔
453
        return p.updateHtlcKey(hash, attemptID, htlcFailInfoKey, failBytes)
31✔
454
}
455

456
// updateHtlcKey updates a database key for the specified htlc.
457
func (p *KVStore) updateHtlcKey(paymentHash lntypes.Hash,
458
        attemptID uint64, key, value []byte) (*MPPayment, error) {
47✔
459

47✔
460
        aid := make([]byte, 8)
47✔
461
        binary.BigEndian.PutUint64(aid, attemptID)
47✔
462

47✔
463
        var payment *MPPayment
47✔
464
        err := kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
95✔
465
                payment = nil
48✔
466

48✔
467
                prefetchPayment(tx, paymentHash)
48✔
468
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
48✔
469
                if err != nil {
50✔
470
                        return err
2✔
471
                }
2✔
472

473
                p, err := fetchPayment(bucket)
46✔
474
                if err != nil {
46✔
475
                        return err
×
476
                }
×
477

478
                // We can only update keys of in-flight payments. We allow
479
                // updating keys even if the payment has reached a terminal
480
                // condition, since the HTLC outcomes must still be updated.
481
                if err := p.Status.updatable(); err != nil {
46✔
482
                        return err
×
483
                }
×
484

485
                htlcsBucket := bucket.NestedReadWriteBucket(paymentHtlcsBucket)
46✔
486
                if htlcsBucket == nil {
46✔
487
                        return fmt.Errorf("htlcs bucket not found")
×
488
                }
×
489

490
                attemptKey := htlcBucketKey(htlcAttemptInfoKey, aid)
46✔
491
                if htlcsBucket.Get(attemptKey) == nil {
46✔
492
                        return fmt.Errorf("HTLC with ID %v not registered",
×
493
                                attemptID)
×
494
                }
×
495

496
                // Make sure the shard is not already failed or settled.
497
                failKey := htlcBucketKey(htlcFailInfoKey, aid)
46✔
498
                if htlcsBucket.Get(failKey) != nil {
46✔
499
                        return ErrAttemptAlreadyFailed
×
500
                }
×
501

502
                settleKey := htlcBucketKey(htlcSettleInfoKey, aid)
46✔
503
                if htlcsBucket.Get(settleKey) != nil {
46✔
504
                        return ErrAttemptAlreadySettled
×
505
                }
×
506

507
                // Add or update the key for this htlc.
508
                err = htlcsBucket.Put(htlcBucketKey(key, aid), value)
46✔
509
                if err != nil {
46✔
510
                        return err
×
511
                }
×
512

513
                // Retrieve attempt info for the notification.
514
                payment, err = fetchPayment(bucket)
46✔
515

46✔
516
                return err
46✔
517
        })
518
        if err != nil {
48✔
519
                return nil, err
1✔
520
        }
1✔
521

522
        return payment, err
46✔
523
}
524

525
// Fail transitions a payment into the Failed state, and records the reason the
526
// payment failed. After invoking this method, InitPayment should return nil on
527
// its next call for this payment hash, allowing the switch to make a
528
// subsequent payment.
529
func (p *KVStore) Fail(paymentHash lntypes.Hash,
530
        reason FailureReason) (*MPPayment, error) {
16✔
531

16✔
532
        var (
16✔
533
                updateErr error
16✔
534
                payment   *MPPayment
16✔
535
        )
16✔
536
        err := kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
32✔
537
                // Reset the update error, to avoid carrying over an error
16✔
538
                // from a previous execution of the batched db transaction.
16✔
539
                updateErr = nil
16✔
540
                payment = nil
16✔
541

16✔
542
                prefetchPayment(tx, paymentHash)
16✔
543
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
16✔
544
                if errors.Is(err, ErrPaymentNotInitiated) {
17✔
545
                        updateErr = ErrPaymentNotInitiated
1✔
546
                        return nil
1✔
547
                } else if err != nil {
16✔
548
                        return err
×
549
                }
×
550

551
                // We mark the payment as failed as long as it is known. This
552
                // lets the last attempt to fail with a terminal write its
553
                // failure to the KVStore without synchronizing with
554
                // other attempts.
555
                _, err = fetchPaymentStatus(bucket)
15✔
556
                if errors.Is(err, ErrPaymentNotInitiated) {
15✔
557
                        updateErr = ErrPaymentNotInitiated
×
558
                        return nil
×
559
                } else if err != nil {
15✔
560
                        return err
×
561
                }
×
562

563
                // Put the failure reason in the bucket for record keeping.
564
                v := []byte{byte(reason)}
15✔
565
                err = bucket.Put(paymentFailInfoKey, v)
15✔
566
                if err != nil {
15✔
567
                        return err
×
568
                }
×
569

570
                // Retrieve attempt info for the notification, if available.
571
                payment, err = fetchPayment(bucket)
15✔
572
                if err != nil {
15✔
573
                        return err
×
574
                }
×
575

576
                return nil
15✔
577
        })
578
        if err != nil {
16✔
579
                return nil, err
×
580
        }
×
581

582
        return payment, updateErr
16✔
583
}
584

585
// FetchPayment returns information about a payment from the database.
586
func (p *KVStore) FetchPayment(paymentHash lntypes.Hash) (
587
        *MPPayment, error) {
151✔
588

151✔
589
        var payment *MPPayment
151✔
590
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
302✔
591
                prefetchPayment(tx, paymentHash)
151✔
592
                bucket, err := fetchPaymentBucket(tx, paymentHash)
151✔
593
                if err != nil {
152✔
594
                        return err
1✔
595
                }
1✔
596

597
                payment, err = fetchPayment(bucket)
150✔
598

150✔
599
                return err
150✔
600
        }, func() {
151✔
601
                payment = nil
151✔
602
        })
151✔
603
        if err != nil {
152✔
604
                return nil, err
1✔
605
        }
1✔
606

607
        return payment, nil
150✔
608
}
609

610
// prefetchPayment attempts to prefetch as much of the payment as possible to
611
// reduce DB roundtrips.
612
func prefetchPayment(tx kvdb.RTx, paymentHash lntypes.Hash) {
450✔
613
        rb := kvdb.RootBucket(tx)
450✔
614
        kvdb.Prefetch(
450✔
615
                rb,
450✔
616
                []string{
450✔
617
                        // Prefetch all keys in the payment's bucket.
450✔
618
                        string(paymentsRootBucket),
450✔
619
                        string(paymentHash[:]),
450✔
620
                },
450✔
621
                []string{
450✔
622
                        // Prefetch all keys in the payment's htlc bucket.
450✔
623
                        string(paymentsRootBucket),
450✔
624
                        string(paymentHash[:]),
450✔
625
                        string(paymentHtlcsBucket),
450✔
626
                },
450✔
627
        )
450✔
628
}
450✔
629

630
// createPaymentBucket creates or fetches the sub-bucket assigned to this
631
// payment hash.
632
func createPaymentBucket(tx kvdb.RwTx, paymentHash lntypes.Hash) (
633
        kvdb.RwBucket, error) {
149✔
634

149✔
635
        payments, err := tx.CreateTopLevelBucket(paymentsRootBucket)
149✔
636
        if err != nil {
149✔
637
                return nil, err
×
638
        }
×
639

640
        return payments.CreateBucketIfNotExists(paymentHash[:])
149✔
641
}
642

643
// fetchPaymentBucket fetches the sub-bucket assigned to this payment hash. If
644
// the bucket does not exist, it returns ErrPaymentNotInitiated.
645
func fetchPaymentBucket(tx kvdb.RTx, paymentHash lntypes.Hash) (
646
        kvdb.RBucket, error) {
263✔
647

263✔
648
        payments := tx.ReadBucket(paymentsRootBucket)
263✔
649
        if payments == nil {
263✔
650
                return nil, ErrPaymentNotInitiated
×
651
        }
×
652

653
        bucket := payments.NestedReadBucket(paymentHash[:])
263✔
654
        if bucket == nil {
264✔
655
                return nil, ErrPaymentNotInitiated
1✔
656
        }
1✔
657

658
        return bucket, nil
262✔
659
}
660

661
// fetchPaymentBucketUpdate is identical to fetchPaymentBucket, but it returns a
662
// bucket that can be written to.
663
func fetchPaymentBucketUpdate(tx kvdb.RwTx, paymentHash lntypes.Hash) (
664
        kvdb.RwBucket, error) {
171✔
665

171✔
666
        payments := tx.ReadWriteBucket(paymentsRootBucket)
171✔
667
        if payments == nil {
171✔
668
                return nil, ErrPaymentNotInitiated
×
669
        }
×
670

671
        bucket := payments.NestedReadWriteBucket(paymentHash[:])
171✔
672
        if bucket == nil {
174✔
673
                return nil, ErrPaymentNotInitiated
3✔
674
        }
3✔
675

676
        return bucket, nil
168✔
677
}
678

679
// nextPaymentSequence returns the next sequence number to store for a new
680
// payment.
681
func (p *KVStore) nextPaymentSequence() ([]byte, error) {
149✔
682
        p.seqMu.Lock()
149✔
683
        defer p.seqMu.Unlock()
149✔
684

149✔
685
        // Set a new upper bound in the DB every 1000 payments to avoid
149✔
686
        // conflicts on the sequence when using etcd.
149✔
687
        if p.currSeq == p.storedSeq {
188✔
688
                var currPaymentSeq, newUpperBound uint64
39✔
689
                if err := kvdb.Update(p.db, func(tx kvdb.RwTx) error {
78✔
690
                        paymentsBucket, err := tx.CreateTopLevelBucket(
39✔
691
                                paymentsRootBucket,
39✔
692
                        )
39✔
693
                        if err != nil {
39✔
694
                                return err
×
695
                        }
×
696

697
                        currPaymentSeq = paymentsBucket.Sequence()
39✔
698
                        newUpperBound = currPaymentSeq + paymentSeqBlockSize
39✔
699

39✔
700
                        return paymentsBucket.SetSequence(newUpperBound)
39✔
701
                }, func() {}); err != nil {
39✔
702
                        return nil, err
×
703
                }
×
704

705
                // We lazy initialize the cached currPaymentSeq here using the
706
                // first nextPaymentSequence() call. This if statement will auto
707
                // initialize our stored currPaymentSeq, since by default both
708
                // this variable and storedPaymentSeq are zero which in turn
709
                // will have us fetch the current values from the DB.
710
                if p.currSeq == 0 {
78✔
711
                        p.currSeq = currPaymentSeq
39✔
712
                }
39✔
713

714
                p.storedSeq = newUpperBound
39✔
715
        }
716

717
        p.currSeq++
149✔
718
        b := make([]byte, 8)
149✔
719
        binary.BigEndian.PutUint64(b, p.currSeq)
149✔
720

149✔
721
        return b, nil
149✔
722
}
723

724
// fetchPaymentStatus fetches the payment status of the payment. If the payment
725
// isn't found, it will return error `ErrPaymentNotInitiated`.
726
func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) {
192✔
727
        // Creation info should be set for all payments, regardless of state.
192✔
728
        // If not, it is unknown.
192✔
729
        if bucket.Get(paymentCreationInfoKey) == nil {
336✔
730
                return 0, ErrPaymentNotInitiated
144✔
731
        }
144✔
732

733
        payment, err := fetchPayment(bucket)
48✔
734
        if err != nil {
48✔
735
                return 0, err
×
736
        }
×
737

738
        return payment.Status, nil
48✔
739
}
740

741
// FetchInFlightPayments returns all payments with status InFlight.
742
func (p *KVStore) FetchInFlightPayments() ([]*MPPayment, error) {
4✔
743
        var (
4✔
744
                inFlights      []*MPPayment
4✔
745
                start          = time.Now()
4✔
746
                lastLogTime    = time.Now()
4✔
747
                processedCount int
4✔
748
        )
4✔
749

4✔
750
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
8✔
751
                payments := tx.ReadBucket(paymentsRootBucket)
4✔
752
                if payments == nil {
4✔
753
                        return nil
×
754
                }
×
755

756
                return payments.ForEach(func(k, _ []byte) error {
6✔
757
                        bucket := payments.NestedReadBucket(k)
2✔
758
                        if bucket == nil {
2✔
759
                                return fmt.Errorf("non bucket element")
×
760
                        }
×
761

762
                        p, err := fetchPayment(bucket)
2✔
763
                        if err != nil {
2✔
764
                                return err
×
765
                        }
×
766

767
                        processedCount++
2✔
768
                        if time.Since(lastLogTime) >=
2✔
769
                                paymentProgressLogInterval {
2✔
770

×
771
                                log.Debugf("Scanning inflight payments "+
×
772
                                        "(in progress), processed %d, last "+
×
773
                                        "processed payment: %v", processedCount,
×
774
                                        p.Info)
×
775

×
776
                                lastLogTime = time.Now()
×
777
                        }
×
778

779
                        // Skip the payment if it's terminated.
780
                        if p.Terminated() {
2✔
781
                                return nil
×
782
                        }
×
783

784
                        inFlights = append(inFlights, p)
2✔
785

2✔
786
                        return nil
2✔
787
                })
788
        }, func() {
4✔
789
                inFlights = nil
4✔
790
        })
4✔
791
        if err != nil {
4✔
792
                return nil, err
×
793
        }
×
794

795
        elapsed := time.Since(start)
4✔
796
        log.Debugf("Completed scanning for inflight payments: "+
4✔
797
                "total_processed=%d, found_inflight=%d, elapsed=%v",
4✔
798
                processedCount, len(inFlights),
4✔
799
                elapsed.Round(time.Millisecond))
4✔
800

4✔
801
        return inFlights, nil
4✔
802
}
803

804
// htlcBucketKey builds a composite key by placing id directly after prefix.
// The result is a freshly allocated slice, so neither input is aliased or
// modified.
func htlcBucketKey(prefix, id []byte) []byte {
	composite := make([]byte, 0, len(prefix)+len(id))
	composite = append(composite, prefix...)
	composite = append(composite, id...)

	return composite
}
813

814
// FetchPayments returns all sent payments found in the DB.
815
func (p *KVStore) FetchPayments() ([]*MPPayment, error) {
20✔
816
        var payments []*MPPayment
20✔
817

20✔
818
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
40✔
819
                paymentsBucket := tx.ReadBucket(paymentsRootBucket)
20✔
820
                if paymentsBucket == nil {
20✔
821
                        return nil
×
822
                }
×
823

824
                return paymentsBucket.ForEach(func(k, v []byte) error {
114✔
825
                        bucket := paymentsBucket.NestedReadBucket(k)
94✔
826
                        if bucket == nil {
94✔
827
                                // We only expect sub-buckets to be found in
×
828
                                // this top-level bucket.
×
829
                                return fmt.Errorf("non bucket element in " +
×
830
                                        "payments bucket")
×
831
                        }
×
832

833
                        p, err := fetchPayment(bucket)
94✔
834
                        if err != nil {
94✔
835
                                return err
×
836
                        }
×
837

838
                        payments = append(payments, p)
94✔
839

94✔
840
                        // For older versions of lnd, duplicate payments to a
94✔
841
                        // payment has was possible. These will be found in a
94✔
842
                        // sub-bucket indexed by their sequence number if
94✔
843
                        // available.
94✔
844
                        duplicatePayments, err := fetchDuplicatePayments(bucket)
94✔
845
                        if err != nil {
94✔
846
                                return err
×
847
                        }
×
848

849
                        payments = append(payments, duplicatePayments...)
94✔
850

94✔
851
                        return nil
94✔
852
                })
853
        }, func() {
20✔
854
                payments = nil
20✔
855
        })
20✔
856
        if err != nil {
20✔
857
                return nil, err
×
858
        }
×
859

860
        // Before returning, sort the payments by their sequence number.
861
        sort.Slice(payments, func(i, j int) bool {
219✔
862
                return payments[i].SequenceNum < payments[j].SequenceNum
199✔
863
        })
199✔
864

865
        return payments, nil
20✔
866
}
867

868
func fetchCreationInfo(bucket kvdb.RBucket) (*PaymentCreationInfo, error) {
640✔
869
        b := bucket.Get(paymentCreationInfoKey)
640✔
870
        if b == nil {
640✔
871
                return nil, fmt.Errorf("creation info not found")
×
872
        }
×
873

874
        r := bytes.NewReader(b)
640✔
875

640✔
876
        return deserializePaymentCreationInfo(r)
640✔
877
}
878

879
func fetchPayment(bucket kvdb.RBucket) (*MPPayment, error) {
640✔
880
        seqBytes := bucket.Get(paymentSequenceKey)
640✔
881
        if seqBytes == nil {
640✔
882
                return nil, fmt.Errorf("sequence number not found")
×
883
        }
×
884

885
        sequenceNum := binary.BigEndian.Uint64(seqBytes)
640✔
886

640✔
887
        // Get the PaymentCreationInfo.
640✔
888
        creationInfo, err := fetchCreationInfo(bucket)
640✔
889
        if err != nil {
640✔
890
                return nil, err
×
891
        }
×
892

893
        var htlcs []HTLCAttempt
640✔
894
        htlcsBucket := bucket.NestedReadBucket(paymentHtlcsBucket)
640✔
895
        if htlcsBucket != nil {
1,026✔
896
                // Get the payment attempts. This can be empty.
386✔
897
                htlcs, err = fetchHtlcAttempts(htlcsBucket)
386✔
898
                if err != nil {
386✔
899
                        return nil, err
×
900
                }
×
901
        }
902

903
        // Get failure reason if available.
904
        var failureReason *FailureReason
640✔
905
        b := bucket.Get(paymentFailInfoKey)
640✔
906
        if b != nil {
713✔
907
                reason := FailureReason(b[0])
73✔
908
                failureReason = &reason
73✔
909
        }
73✔
910

911
        // Create a new payment.
912
        payment := &MPPayment{
640✔
913
                SequenceNum:   sequenceNum,
640✔
914
                Info:          creationInfo,
640✔
915
                HTLCs:         htlcs,
640✔
916
                FailureReason: failureReason,
640✔
917
        }
640✔
918

640✔
919
        // Set its state and status.
640✔
920
        if err := payment.setState(); err != nil {
640✔
921
                return nil, err
×
922
        }
×
923

924
        return payment, nil
640✔
925
}
926

927
// fetchHtlcAttempts retrieves all htlc attempts made for the payment found in
928
// the given bucket.
929
func fetchHtlcAttempts(bucket kvdb.RBucket) ([]HTLCAttempt, error) {
393✔
930
        htlcsMap := make(map[uint64]*HTLCAttempt)
393✔
931

393✔
932
        attemptInfoCount := 0
393✔
933
        err := bucket.ForEach(func(k, v []byte) error {
1,488✔
934
                aid := byteOrder.Uint64(k[len(k)-8:])
1,095✔
935

1,095✔
936
                if _, ok := htlcsMap[aid]; !ok {
1,796✔
937
                        htlcsMap[aid] = &HTLCAttempt{}
701✔
938
                }
701✔
939

940
                var err error
1,095✔
941
                switch {
1,095✔
942
                case bytes.HasPrefix(k, htlcAttemptInfoKey):
701✔
943
                        attemptInfo, err := readHtlcAttemptInfo(v)
701✔
944
                        if err != nil {
701✔
945
                                return err
×
946
                        }
×
947

948
                        attemptInfo.AttemptID = aid
701✔
949
                        htlcsMap[aid].HTLCAttemptInfo = *attemptInfo
701✔
950
                        attemptInfoCount++
701✔
951

952
                case bytes.HasPrefix(k, htlcSettleInfoKey):
89✔
953
                        htlcsMap[aid].Settle, err = readHtlcSettleInfo(v)
89✔
954
                        if err != nil {
89✔
955
                                return err
×
956
                        }
×
957

958
                case bytes.HasPrefix(k, htlcFailInfoKey):
305✔
959
                        htlcsMap[aid].Failure, err = readHtlcFailInfo(v)
305✔
960
                        if err != nil {
305✔
961
                                return err
×
962
                        }
×
963

964
                default:
×
965
                        return fmt.Errorf("unknown htlc attempt key")
×
966
                }
967

968
                return nil
1,095✔
969
        })
970
        if err != nil {
393✔
971
                return nil, err
×
972
        }
×
973

974
        // Sanity check that all htlcs have an attempt info.
975
        if attemptInfoCount != len(htlcsMap) {
393✔
976
                return nil, ErrNoAttemptInfo
×
977
        }
×
978

979
        keys := make([]uint64, len(htlcsMap))
393✔
980
        i := 0
393✔
981
        for k := range htlcsMap {
1,094✔
982
                keys[i] = k
701✔
983
                i++
701✔
984
        }
701✔
985

986
        // Sort HTLC attempts by their attempt ID. This is needed because in the
987
        // DB we store the attempts with keys prefixed by their status which
988
        // changes order (groups them together by status).
989
        sort.Slice(keys, func(i, j int) bool {
733✔
990
                return keys[i] < keys[j]
340✔
991
        })
340✔
992

993
        htlcs := make([]HTLCAttempt, len(htlcsMap))
393✔
994
        for i, key := range keys {
1,094✔
995
                htlcs[i] = *htlcsMap[key]
701✔
996
        }
701✔
997

998
        return htlcs, nil
393✔
999
}
1000

1001
// readHtlcAttemptInfo reads the payment attempt info for this htlc.
1002
func readHtlcAttemptInfo(b []byte) (*HTLCAttemptInfo, error) {
701✔
1003
        r := bytes.NewReader(b)
701✔
1004
        return deserializeHTLCAttemptInfo(r)
701✔
1005
}
701✔
1006

1007
// readHtlcSettleInfo reads the settle info for the htlc. If the htlc isn't
1008
// settled, nil is returned.
1009
func readHtlcSettleInfo(b []byte) (*HTLCSettleInfo, error) {
89✔
1010
        r := bytes.NewReader(b)
89✔
1011
        return deserializeHTLCSettleInfo(r)
89✔
1012
}
89✔
1013

1014
// readHtlcFailInfo reads the failure info for the htlc. If the htlc hasn't
1015
// failed, nil is returned.
1016
func readHtlcFailInfo(b []byte) (*HTLCFailInfo, error) {
305✔
1017
        r := bytes.NewReader(b)
305✔
1018
        return deserializeHTLCFailInfo(r)
305✔
1019
}
305✔
1020

1021
// fetchFailedHtlcKeys retrieves the bucket keys of all failed HTLCs of a
1022
// payment bucket.
1023
func fetchFailedHtlcKeys(bucket kvdb.RBucket) ([][]byte, error) {
7✔
1024
        htlcsBucket := bucket.NestedReadBucket(paymentHtlcsBucket)
7✔
1025

7✔
1026
        var htlcs []HTLCAttempt
7✔
1027
        var err error
7✔
1028
        if htlcsBucket != nil {
14✔
1029
                htlcs, err = fetchHtlcAttempts(htlcsBucket)
7✔
1030
                if err != nil {
7✔
1031
                        return nil, err
×
1032
                }
×
1033
        }
1034

1035
        // Now iterate though them and save the bucket keys for the failed
1036
        // HTLCs.
1037
        var htlcKeys [][]byte
7✔
1038
        for _, h := range htlcs {
19✔
1039
                if h.Failure == nil {
15✔
1040
                        continue
3✔
1041
                }
1042

1043
                htlcKeyBytes := make([]byte, 8)
9✔
1044
                binary.BigEndian.PutUint64(htlcKeyBytes, h.AttemptID)
9✔
1045

9✔
1046
                htlcKeys = append(htlcKeys, htlcKeyBytes)
9✔
1047
        }
1048

1049
        return htlcKeys, nil
7✔
1050
}
1051

1052
// QueryPayments is a query to the payments database which is restricted
1053
// to a subset of payments by the payments query, containing an offset
1054
// index and a maximum number of returned payments.
1055
func (p *KVStore) QueryPayments(_ context.Context,
1056
        query Query) (Response, error) {
57✔
1057

57✔
1058
        var resp Response
57✔
1059

57✔
1060
        if err := kvdb.View(p.db, func(tx kvdb.RTx) error {
114✔
1061
                // Get the root payments bucket.
57✔
1062
                paymentsBucket := tx.ReadBucket(paymentsRootBucket)
57✔
1063
                if paymentsBucket == nil {
57✔
1064
                        return nil
×
1065
                }
×
1066

1067
                // Get the index bucket which maps sequence number -> payment
1068
                // hash and duplicate bool. If we have a payments bucket, we
1069
                // should have an indexes bucket as well.
1070
                indexes := tx.ReadBucket(paymentsIndexBucket)
57✔
1071
                if indexes == nil {
57✔
1072
                        return fmt.Errorf("index bucket does not exist")
×
1073
                }
×
1074

1075
                // accumulatePayments gets payments with the sequence number
1076
                // and hash provided and adds them to our list of payments if
1077
                // they meet the criteria of our query. It returns the number
1078
                // of payments that were added.
1079
                accumulatePayments := func(sequenceKey, hash []byte) (bool,
57✔
1080
                        error) {
163✔
1081

106✔
1082
                        r := bytes.NewReader(hash)
106✔
1083
                        paymentHash, err := deserializePaymentIndex(r)
106✔
1084
                        if err != nil {
106✔
1085
                                return false, err
×
1086
                        }
×
1087

1088
                        payment, err := fetchPaymentWithSequenceNumber(
106✔
1089
                                tx, paymentHash, sequenceKey,
106✔
1090
                        )
106✔
1091
                        if err != nil {
106✔
1092
                                return false, err
×
1093
                        }
×
1094

1095
                        // To keep compatibility with the old API, we only
1096
                        // return non-succeeded payments if requested.
1097
                        if payment.Status != StatusSucceeded &&
106✔
1098
                                !query.IncludeIncomplete {
111✔
1099

5✔
1100
                                return false, err
5✔
1101
                        }
5✔
1102

1103
                        // Get the creation time in Unix seconds, this always
1104
                        // rounds down the nanoseconds to full seconds.
1105
                        createTime := payment.Info.CreationTime.Unix()
101✔
1106

101✔
1107
                        // Skip any payments that were created before the
101✔
1108
                        // specified time.
101✔
1109
                        if createTime < query.CreationDateStart {
110✔
1110
                                return false, nil
9✔
1111
                        }
9✔
1112

1113
                        // Skip any payments that were created after the
1114
                        // specified time.
1115
                        if query.CreationDateEnd != 0 &&
92✔
1116
                                createTime > query.CreationDateEnd {
94✔
1117

2✔
1118
                                return false, nil
2✔
1119
                        }
2✔
1120

1121
                        // At this point, we've exhausted the offset, so we'll
1122
                        // begin collecting invoices found within the range.
1123
                        resp.Payments = append(resp.Payments, payment)
90✔
1124

90✔
1125
                        return true, nil
90✔
1126
                }
1127

1128
                // Create a paginator which reads from our sequence index bucket
1129
                // with the parameters provided by the payments query.
1130
                paginator := channeldb.NewPaginator(
57✔
1131
                        indexes.ReadCursor(), query.Reversed, query.IndexOffset,
57✔
1132
                        query.MaxPayments,
57✔
1133
                )
57✔
1134

57✔
1135
                // Run a paginated query, adding payments to our response.
57✔
1136
                if err := paginator.Query(accumulatePayments); err != nil {
57✔
1137
                        return err
×
1138
                }
×
1139

1140
                // Counting the total number of payments is expensive, since we
1141
                // literally have to traverse the cursor linearly, which can
1142
                // take quite a while. So it's an optional query parameter.
1143
                if query.CountTotal {
57✔
1144
                        var (
×
1145
                                totalPayments uint64
×
1146
                                err           error
×
1147
                        )
×
1148
                        countFn := func(_, _ []byte) error {
×
1149
                                totalPayments++
×
1150

×
1151
                                return nil
×
1152
                        }
×
1153

1154
                        // In non-boltdb database backends, there's a faster
1155
                        // ForAll query that allows for batch fetching items.
1156
                        fastBucket, ok := indexes.(kvdb.ExtendedRBucket)
×
1157
                        if ok {
×
1158
                                err = fastBucket.ForAll(countFn)
×
1159
                        } else {
×
1160
                                err = indexes.ForEach(countFn)
×
1161
                        }
×
1162
                        if err != nil {
×
1163
                                return fmt.Errorf("error counting payments: %w",
×
1164
                                        err)
×
1165
                        }
×
1166

1167
                        resp.TotalCount = totalPayments
×
1168
                }
1169

1170
                return nil
57✔
1171
        }, func() {
57✔
1172
                resp = Response{}
57✔
1173
        }); err != nil {
57✔
1174
                return resp, err
×
1175
        }
×
1176

1177
        // Need to swap the payments slice order if reversed order.
1178
        if query.Reversed {
73✔
1179
                for l, r := 0, len(resp.Payments)-1; l < r; l, r = l+1, r-1 {
24✔
1180
                        resp.Payments[l], resp.Payments[r] =
8✔
1181
                                resp.Payments[r], resp.Payments[l]
8✔
1182
                }
8✔
1183
        }
1184

1185
        // Set the first and last index of the returned payments so that the
1186
        // caller can resume from this point later on.
1187
        if len(resp.Payments) > 0 {
94✔
1188
                resp.FirstIndexOffset = resp.Payments[0].SequenceNum
37✔
1189
                resp.LastIndexOffset =
37✔
1190
                        resp.Payments[len(resp.Payments)-1].SequenceNum
37✔
1191
        }
37✔
1192

1193
        return resp, nil
57✔
1194
}
1195

1196
// fetchPaymentWithSequenceNumber get the payment which matches the payment hash
1197
// *and* sequence number provided from the database. This is required because
1198
// we previously had more than one payment per hash, so we have multiple indexes
1199
// pointing to a single payment; we want to retrieve the correct one.
1200
func fetchPaymentWithSequenceNumber(tx kvdb.RTx, paymentHash lntypes.Hash,
1201
        sequenceNumber []byte) (*MPPayment, error) {
112✔
1202

112✔
1203
        // We can now lookup the payment keyed by its hash in
112✔
1204
        // the payments root bucket.
112✔
1205
        bucket, err := fetchPaymentBucket(tx, paymentHash)
112✔
1206
        if err != nil {
112✔
1207
                return nil, err
×
1208
        }
×
1209

1210
        // A single payment hash can have multiple payments associated with it.
1211
        // We lookup our sequence number first, to determine whether this is
1212
        // the payment we are actually looking for.
1213
        seqBytes := bucket.Get(paymentSequenceKey)
112✔
1214
        if seqBytes == nil {
112✔
1215
                return nil, ErrNoSequenceNumber
×
1216
        }
×
1217

1218
        // If this top level payment has the sequence number we are looking for,
1219
        // return it.
1220
        if bytes.Equal(seqBytes, sequenceNumber) {
211✔
1221
                return fetchPayment(bucket)
99✔
1222
        }
99✔
1223

1224
        // If we were not looking for the top level payment, we are looking for
1225
        // one of our duplicate payments. We need to iterate through the seq
1226
        // numbers in this bucket to find the correct payments. If we do not
1227
        // find a duplicate payments bucket here, something is wrong.
1228
        dup := bucket.NestedReadBucket(duplicatePaymentsBucket)
13✔
1229
        if dup == nil {
14✔
1230
                return nil, ErrNoDuplicateBucket
1✔
1231
        }
1✔
1232

1233
        var duplicatePayment *MPPayment
12✔
1234
        err = dup.ForEach(func(k, v []byte) error {
27✔
1235
                subBucket := dup.NestedReadBucket(k)
15✔
1236
                if subBucket == nil {
15✔
1237
                        // We one bucket for each duplicate to be found.
×
1238
                        return ErrNoDuplicateNestedBucket
×
1239
                }
×
1240

1241
                seqBytes := subBucket.Get(duplicatePaymentSequenceKey)
15✔
1242
                if seqBytes == nil {
15✔
1243
                        return err
×
1244
                }
×
1245

1246
                // If this duplicate payment is not the sequence number we are
1247
                // looking for, we can continue.
1248
                if !bytes.Equal(seqBytes, sequenceNumber) {
19✔
1249
                        return nil
4✔
1250
                }
4✔
1251

1252
                duplicatePayment, err = fetchDuplicatePayment(subBucket)
11✔
1253
                if err != nil {
11✔
1254
                        return err
×
1255
                }
×
1256

1257
                return nil
11✔
1258
        })
1259
        if err != nil {
12✔
1260
                return nil, err
×
1261
        }
×
1262

1263
        // If none of the duplicate payments matched our sequence number, we
1264
        // failed to find the payment with this sequence number; something is
1265
        // wrong.
1266
        if duplicatePayment == nil {
13✔
1267
                return nil, ErrDuplicateNotFound
1✔
1268
        }
1✔
1269

1270
        return duplicatePayment, nil
11✔
1271
}
1272

1273
// DeletePayment deletes a payment from the DB given its payment hash. If
1274
// failedHtlcsOnly is set, only failed HTLC attempts of the payment will be
1275
// deleted.
1276
func (p *KVStore) DeletePayment(paymentHash lntypes.Hash,
1277
        failedHtlcsOnly bool) error {
11✔
1278

11✔
1279
        return kvdb.Update(p.db, func(tx kvdb.RwTx) error {
22✔
1280
                payments := tx.ReadWriteBucket(paymentsRootBucket)
11✔
1281
                if payments == nil {
11✔
1282
                        return nil
×
1283
                }
×
1284

1285
                bucket := payments.NestedReadWriteBucket(paymentHash[:])
11✔
1286
                if bucket == nil {
12✔
1287
                        return fmt.Errorf("non bucket element in payments " +
1✔
1288
                                "bucket")
1✔
1289
                }
1✔
1290

1291
                // If the status is InFlight, we cannot safely delete
1292
                // the payment information, so we return early.
1293
                paymentStatus, err := fetchPaymentStatus(bucket)
10✔
1294
                if err != nil {
10✔
1295
                        return err
×
1296
                }
×
1297

1298
                // If the payment has inflight HTLCs, we cannot safely delete
1299
                // the payment information, so we return an error.
1300
                if err := paymentStatus.removable(); err != nil {
13✔
1301
                        return fmt.Errorf("payment '%v' has inflight HTLCs"+
3✔
1302
                                "and therefore cannot be deleted: %w",
3✔
1303
                                paymentHash.String(), err)
3✔
1304
                }
3✔
1305

1306
                // Delete the failed HTLC attempts we found.
1307
                if failedHtlcsOnly {
11✔
1308
                        toDelete, err := fetchFailedHtlcKeys(bucket)
4✔
1309
                        if err != nil {
4✔
1310
                                return err
×
1311
                        }
×
1312

1313
                        htlcsBucket := bucket.NestedReadWriteBucket(
4✔
1314
                                paymentHtlcsBucket,
4✔
1315
                        )
4✔
1316

4✔
1317
                        for _, htlcID := range toDelete {
10✔
1318
                                err = htlcsBucket.Delete(
6✔
1319
                                        htlcBucketKey(
6✔
1320
                                                htlcAttemptInfoKey, htlcID,
6✔
1321
                                        ),
6✔
1322
                                )
6✔
1323
                                if err != nil {
6✔
1324
                                        return err
×
1325
                                }
×
1326

1327
                                err = htlcsBucket.Delete(
6✔
1328
                                        htlcBucketKey(htlcFailInfoKey, htlcID),
6✔
1329
                                )
6✔
1330
                                if err != nil {
6✔
1331
                                        return err
×
1332
                                }
×
1333

1334
                                err = htlcsBucket.Delete(
6✔
1335
                                        htlcBucketKey(
6✔
1336
                                                htlcSettleInfoKey, htlcID,
6✔
1337
                                        ),
6✔
1338
                                )
6✔
1339
                                if err != nil {
6✔
1340
                                        return err
×
1341
                                }
×
1342
                        }
1343

1344
                        return nil
4✔
1345
                }
1346

1347
                seqNrs, err := fetchSequenceNumbers(bucket)
3✔
1348
                if err != nil {
3✔
1349
                        return err
×
1350
                }
×
1351

1352
                err = payments.DeleteNestedBucket(paymentHash[:])
3✔
1353
                if err != nil {
3✔
1354
                        return err
×
1355
                }
×
1356

1357
                indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
1358
                for _, k := range seqNrs {
6✔
1359
                        if err := indexBucket.Delete(k); err != nil {
3✔
1360
                                return err
×
1361
                        }
×
1362
                }
1363

1364
                return nil
3✔
1365
        }, func() {})
11✔
1366
}
1367

1368
// DeletePayments deletes all completed and failed payments from the DB. If
1369
// failedOnly is set, only failed payments will be considered for deletion. If
1370
// failedHtlcsOnly is set, the payment itself won't be deleted, only failed HTLC
1371
// attempts. The method returns the number of deleted payments, which is always
1372
// 0 if failedHtlcsOnly is set.
1373
func (p *KVStore) DeletePayments(failedOnly,
1374
        failedHtlcsOnly bool) (int, error) {
6✔
1375

6✔
1376
        var numPayments int
6✔
1377
        err := kvdb.Update(p.db, func(tx kvdb.RwTx) error {
12✔
1378
                payments := tx.ReadWriteBucket(paymentsRootBucket)
6✔
1379
                if payments == nil {
6✔
1380
                        return nil
×
1381
                }
×
1382

1383
                var (
6✔
1384
                        // deleteBuckets is the set of payment buckets we need
6✔
1385
                        // to delete.
6✔
1386
                        deleteBuckets [][]byte
6✔
1387

6✔
1388
                        // deleteIndexes is the set of indexes pointing to these
6✔
1389
                        // payments that need to be deleted.
6✔
1390
                        deleteIndexes [][]byte
6✔
1391

6✔
1392
                        // deleteHtlcs maps a payment hash to the HTLC IDs we
6✔
1393
                        // want to delete for that payment.
6✔
1394
                        deleteHtlcs = make(map[lntypes.Hash][][]byte)
6✔
1395
                )
6✔
1396
                err := payments.ForEach(func(k, _ []byte) error {
24✔
1397
                        bucket := payments.NestedReadBucket(k)
18✔
1398
                        if bucket == nil {
18✔
1399
                                // We only expect sub-buckets to be found in
×
1400
                                // this top-level bucket.
×
1401
                                return fmt.Errorf("non bucket element in " +
×
1402
                                        "payments bucket")
×
1403
                        }
×
1404

1405
                        // If the status is InFlight, we cannot safely delete
1406
                        // the payment information, so we return early.
1407
                        paymentStatus, err := fetchPaymentStatus(bucket)
18✔
1408
                        if err != nil {
18✔
1409
                                return err
×
1410
                        }
×
1411

1412
                        // If the payment has inflight HTLCs, we cannot safely
1413
                        // delete the payment information, so we return an nil
1414
                        // to skip it.
1415
                        if err := paymentStatus.removable(); err != nil {
24✔
1416
                                return nil
6✔
1417
                        }
6✔
1418

1419
                        // If we requested to only delete failed payments, we
1420
                        // can return if this one is not.
1421
                        if failedOnly && paymentStatus != StatusFailed {
16✔
1422
                                return nil
4✔
1423
                        }
4✔
1424

1425
                        // If we are only deleting failed HTLCs, fetch them.
1426
                        if failedHtlcsOnly {
11✔
1427
                                toDelete, err := fetchFailedHtlcKeys(bucket)
3✔
1428
                                if err != nil {
3✔
1429
                                        return err
×
1430
                                }
×
1431

1432
                                hash, err := lntypes.MakeHash(k)
3✔
1433
                                if err != nil {
3✔
1434
                                        return err
×
1435
                                }
×
1436

1437
                                deleteHtlcs[hash] = toDelete
3✔
1438

3✔
1439
                                // We return, we are only deleting attempts.
3✔
1440
                                return nil
3✔
1441
                        }
1442

1443
                        // Add the bucket to the set of buckets we can delete.
1444
                        deleteBuckets = append(deleteBuckets, k)
5✔
1445

5✔
1446
                        // Get all the sequence number associated with the
5✔
1447
                        // payment, including duplicates.
5✔
1448
                        seqNrs, err := fetchSequenceNumbers(bucket)
5✔
1449
                        if err != nil {
5✔
1450
                                return err
×
1451
                        }
×
1452

1453
                        deleteIndexes = append(deleteIndexes, seqNrs...)
5✔
1454
                        numPayments++
5✔
1455

5✔
1456
                        return nil
5✔
1457
                })
1458
                if err != nil {
6✔
1459
                        return err
×
1460
                }
×
1461

1462
                // Delete the failed HTLC attempts we found.
1463
                for hash, htlcIDs := range deleteHtlcs {
9✔
1464
                        bucket := payments.NestedReadWriteBucket(hash[:])
3✔
1465
                        htlcsBucket := bucket.NestedReadWriteBucket(
3✔
1466
                                paymentHtlcsBucket,
3✔
1467
                        )
3✔
1468

3✔
1469
                        for _, aid := range htlcIDs {
6✔
1470
                                if err := htlcsBucket.Delete(
3✔
1471
                                        htlcBucketKey(htlcAttemptInfoKey, aid),
3✔
1472
                                ); err != nil {
3✔
1473
                                        return err
×
1474
                                }
×
1475

1476
                                if err := htlcsBucket.Delete(
3✔
1477
                                        htlcBucketKey(htlcFailInfoKey, aid),
3✔
1478
                                ); err != nil {
3✔
1479
                                        return err
×
1480
                                }
×
1481

1482
                                if err := htlcsBucket.Delete(
3✔
1483
                                        htlcBucketKey(htlcSettleInfoKey, aid),
3✔
1484
                                ); err != nil {
3✔
1485
                                        return err
×
1486
                                }
×
1487
                        }
1488
                }
1489

1490
                for _, k := range deleteBuckets {
11✔
1491
                        if err := payments.DeleteNestedBucket(k); err != nil {
5✔
1492
                                return err
×
1493
                        }
×
1494
                }
1495

1496
                // Get our index bucket and delete all indexes pointing to the
1497
                // payments we are deleting.
1498
                indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
6✔
1499
                for _, k := range deleteIndexes {
12✔
1500
                        if err := indexBucket.Delete(k); err != nil {
6✔
1501
                                return err
×
1502
                        }
×
1503
                }
1504

1505
                return nil
6✔
1506
        }, func() {
6✔
1507
                numPayments = 0
6✔
1508
        })
6✔
1509
        if err != nil {
6✔
1510
                return 0, err
×
1511
        }
×
1512

1513
        return numPayments, nil
6✔
1514
}
1515

1516
// fetchSequenceNumbers fetches all the sequence numbers associated with a
1517
// payment, including those belonging to any duplicate payments.
1518
func fetchSequenceNumbers(paymentBucket kvdb.RBucket) ([][]byte, error) {
8✔
1519
        seqNum := paymentBucket.Get(paymentSequenceKey)
8✔
1520
        if seqNum == nil {
8✔
1521
                return nil, errors.New("expected sequence number")
×
1522
        }
×
1523

1524
        sequenceNumbers := [][]byte{seqNum}
8✔
1525

8✔
1526
        // Get the duplicate payments bucket, if it has no duplicates, just
8✔
1527
        // return early with the payment sequence number.
8✔
1528
        duplicates := paymentBucket.NestedReadBucket(duplicatePaymentsBucket)
8✔
1529
        if duplicates == nil {
15✔
1530
                return sequenceNumbers, nil
7✔
1531
        }
7✔
1532

1533
        // If we do have duplicated, they are keyed by sequence number, so we
1534
        // iterate through the duplicates bucket and add them to our set of
1535
        // sequence numbers.
1536
        if err := duplicates.ForEach(func(k, v []byte) error {
2✔
1537
                sequenceNumbers = append(sequenceNumbers, k)
1✔
1538
                return nil
1✔
1539
        }); err != nil {
1✔
1540
                return nil, err
×
1541
        }
×
1542

1543
        return sequenceNumbers, nil
1✔
1544
}
1545

1546
func serializePaymentCreationInfo(w io.Writer, c *PaymentCreationInfo) error {
151✔
1547
        var scratch [8]byte
151✔
1548

151✔
1549
        if _, err := w.Write(c.PaymentIdentifier[:]); err != nil {
151✔
1550
                return err
×
1551
        }
×
1552

1553
        byteOrder.PutUint64(scratch[:], uint64(c.Value))
151✔
1554
        if _, err := w.Write(scratch[:]); err != nil {
151✔
1555
                return err
×
1556
        }
×
1557

1558
        if err := serializeTime(w, c.CreationTime); err != nil {
151✔
1559
                return err
×
1560
        }
×
1561

1562
        byteOrder.PutUint32(scratch[:4], uint32(len(c.PaymentRequest)))
151✔
1563
        if _, err := w.Write(scratch[:4]); err != nil {
151✔
1564
                return err
×
1565
        }
×
1566

1567
        if _, err := w.Write(c.PaymentRequest); err != nil {
151✔
1568
                return err
×
1569
        }
×
1570

1571
        // Any remaining bytes are TLV encoded records. Currently, these are
1572
        // only the custom records provided by the user to be sent to the first
1573
        // hop. But this can easily be extended with further records by merging
1574
        // the records into a single TLV stream.
1575
        err := c.FirstHopCustomRecords.SerializeTo(w)
151✔
1576
        if err != nil {
151✔
1577
                return err
×
1578
        }
×
1579

1580
        return nil
151✔
1581
}
1582

1583
func deserializePaymentCreationInfo(r io.Reader) (*PaymentCreationInfo,
1584
        error) {
642✔
1585

642✔
1586
        var scratch [8]byte
642✔
1587

642✔
1588
        c := &PaymentCreationInfo{}
642✔
1589

642✔
1590
        if _, err := io.ReadFull(r, c.PaymentIdentifier[:]); err != nil {
642✔
1591
                return nil, err
×
1592
        }
×
1593

1594
        if _, err := io.ReadFull(r, scratch[:]); err != nil {
642✔
1595
                return nil, err
×
1596
        }
×
1597
        c.Value = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:]))
642✔
1598

642✔
1599
        creationTime, err := deserializeTime(r)
642✔
1600
        if err != nil {
642✔
1601
                return nil, err
×
1602
        }
×
1603
        c.CreationTime = creationTime
642✔
1604

642✔
1605
        if _, err := io.ReadFull(r, scratch[:4]); err != nil {
642✔
1606
                return nil, err
×
1607
        }
×
1608

1609
        reqLen := byteOrder.Uint32(scratch[:4])
642✔
1610
        payReq := make([]byte, reqLen)
642✔
1611
        if reqLen > 0 {
1,284✔
1612
                if _, err := io.ReadFull(r, payReq); err != nil {
642✔
1613
                        return nil, err
×
1614
                }
×
1615
        }
1616
        c.PaymentRequest = payReq
642✔
1617

642✔
1618
        // Any remaining bytes are TLV encoded records. Currently, these are
642✔
1619
        // only the custom records provided by the user to be sent to the first
642✔
1620
        // hop. But this can easily be extended with further records by merging
642✔
1621
        // the records into a single TLV stream.
642✔
1622
        c.FirstHopCustomRecords, err = lnwire.ParseCustomRecordsFrom(r)
642✔
1623
        if err != nil {
642✔
1624
                return nil, err
×
1625
        }
×
1626

1627
        return c, nil
642✔
1628
}
1629

1630
func serializeHTLCAttemptInfo(w io.Writer, a *HTLCAttemptInfo) error {
72✔
1631
        if err := WriteElements(w, a.sessionKey); err != nil {
72✔
1632
                return err
×
1633
        }
×
1634

1635
        if err := SerializeRoute(w, a.Route); err != nil {
72✔
1636
                return err
×
1637
        }
×
1638

1639
        if err := serializeTime(w, a.AttemptTime); err != nil {
72✔
1640
                return err
×
1641
        }
×
1642

1643
        // If the hash is nil we can just return.
1644
        if a.Hash == nil {
72✔
1645
                return nil
×
1646
        }
×
1647

1648
        if _, err := w.Write(a.Hash[:]); err != nil {
72✔
1649
                return err
×
1650
        }
×
1651

1652
        // Merge the fixed/known records together with the custom records to
1653
        // serialize them as a single blob. We can't do this in SerializeRoute
1654
        // because we're in the middle of the byte stream there. We can only do
1655
        // TLV serialization at the end of the stream, since EOF is allowed for
1656
        // a stream if no more data is expected.
1657
        producers := []tlv.RecordProducer{
72✔
1658
                &a.Route.FirstHopAmount,
72✔
1659
        }
72✔
1660
        tlvData, err := lnwire.MergeAndEncode(
72✔
1661
                producers, nil, a.Route.FirstHopWireCustomRecords,
72✔
1662
        )
72✔
1663
        if err != nil {
72✔
1664
                return err
×
1665
        }
×
1666

1667
        if _, err := w.Write(tlvData); err != nil {
72✔
1668
                return err
×
1669
        }
×
1670

1671
        return nil
72✔
1672
}
1673

1674
func deserializeHTLCAttemptInfo(r io.Reader) (*HTLCAttemptInfo, error) {
703✔
1675
        a := &HTLCAttemptInfo{}
703✔
1676
        err := ReadElements(r, &a.sessionKey)
703✔
1677
        if err != nil {
703✔
1678
                return nil, err
×
1679
        }
×
1680

1681
        a.Route, err = DeserializeRoute(r)
703✔
1682
        if err != nil {
703✔
1683
                return nil, err
×
1684
        }
×
1685

1686
        a.AttemptTime, err = deserializeTime(r)
703✔
1687
        if err != nil {
703✔
1688
                return nil, err
×
1689
        }
×
1690

1691
        hash := lntypes.Hash{}
703✔
1692
        _, err = io.ReadFull(r, hash[:])
703✔
1693

703✔
1694
        switch {
703✔
1695
        // Older payment attempts wouldn't have the hash set, in which case we
1696
        // can just return.
1697
        case errors.Is(err, io.EOF), errors.Is(err, io.ErrUnexpectedEOF):
×
1698
                return a, nil
×
1699

1700
        case err != nil:
×
1701
                return nil, err
×
1702

1703
        default:
703✔
1704
        }
1705

1706
        a.Hash = &hash
703✔
1707

703✔
1708
        // Read any remaining data (if any) and parse it into the known records
703✔
1709
        // and custom records.
703✔
1710
        extraData, err := io.ReadAll(r)
703✔
1711
        if err != nil {
703✔
1712
                return nil, err
×
1713
        }
×
1714

1715
        customRecords, _, _, err := lnwire.ParseAndExtractCustomRecords(
703✔
1716
                extraData, &a.Route.FirstHopAmount,
703✔
1717
        )
703✔
1718
        if err != nil {
703✔
1719
                return nil, err
×
1720
        }
×
1721

1722
        a.Route.FirstHopWireCustomRecords = customRecords
703✔
1723

703✔
1724
        return a, nil
703✔
1725
}
1726

1727
func serializeHop(w io.Writer, h *route.Hop) error {
149✔
1728
        if err := WriteElements(w,
149✔
1729
                h.PubKeyBytes[:],
149✔
1730
                h.ChannelID,
149✔
1731
                h.OutgoingTimeLock,
149✔
1732
                h.AmtToForward,
149✔
1733
        ); err != nil {
149✔
1734
                return err
×
1735
        }
×
1736

1737
        if err := binary.Write(w, byteOrder, h.LegacyPayload); err != nil {
149✔
1738
                return err
×
1739
        }
×
1740

1741
        // For legacy payloads, we don't need to write any TLV records, so
1742
        // we'll write a zero indicating the our serialized TLV map has no
1743
        // records.
1744
        if h.LegacyPayload {
229✔
1745
                return WriteElements(w, uint32(0))
80✔
1746
        }
80✔
1747

1748
        // Gather all non-primitive TLV records so that they can be serialized
1749
        // as a single blob.
1750
        //
1751
        // TODO(conner): add migration to unify all fields in a single TLV
1752
        // blobs. The split approach will cause headaches down the road as more
1753
        // fields are added, which we can avoid by having a single TLV stream
1754
        // for all payload fields.
1755
        var records []tlv.Record
69✔
1756
        if h.MPP != nil {
133✔
1757
                records = append(records, h.MPP.Record())
64✔
1758
        }
64✔
1759

1760
        // Add blinding point and encrypted data if present.
1761
        if h.EncryptedData != nil {
71✔
1762
                records = append(records, record.NewEncryptedDataRecord(
2✔
1763
                        &h.EncryptedData,
2✔
1764
                ))
2✔
1765
        }
2✔
1766

1767
        if h.BlindingPoint != nil {
70✔
1768
                records = append(records, record.NewBlindingPointRecord(
1✔
1769
                        &h.BlindingPoint,
1✔
1770
                ))
1✔
1771
        }
1✔
1772

1773
        if h.AMP != nil {
69✔
1774
                records = append(records, h.AMP.Record())
×
1775
        }
×
1776

1777
        if h.Metadata != nil {
136✔
1778
                records = append(records, record.NewMetadataRecord(&h.Metadata))
67✔
1779
        }
67✔
1780

1781
        if h.TotalAmtMsat != 0 {
70✔
1782
                totalMsatInt := uint64(h.TotalAmtMsat)
1✔
1783
                records = append(
1✔
1784
                        records, record.NewTotalAmtMsatBlinded(&totalMsatInt),
1✔
1785
                )
1✔
1786
        }
1✔
1787

1788
        // Final sanity check to absolutely rule out custom records that are not
1789
        // custom and write into the standard range.
1790
        if err := h.CustomRecords.Validate(); err != nil {
69✔
1791
                return err
×
1792
        }
×
1793

1794
        // Convert custom records to tlv and add to the record list.
1795
        // MapToRecords sorts the list, so adding it here will keep the list
1796
        // canonical.
1797
        tlvRecords := tlv.MapToRecords(h.CustomRecords)
69✔
1798
        records = append(records, tlvRecords...)
69✔
1799

69✔
1800
        // Otherwise, we'll transform our slice of records into a map of the
69✔
1801
        // raw bytes, then serialize them in-line with a length (number of
69✔
1802
        // elements) prefix.
69✔
1803
        mapRecords, err := tlv.RecordsToMap(records)
69✔
1804
        if err != nil {
69✔
1805
                return err
×
1806
        }
×
1807

1808
        numRecords := uint32(len(mapRecords))
69✔
1809
        if err := WriteElements(w, numRecords); err != nil {
69✔
1810
                return err
×
1811
        }
×
1812

1813
        for recordType, rawBytes := range mapRecords {
336✔
1814
                if err := WriteElements(w, recordType); err != nil {
267✔
1815
                        return err
×
1816
                }
×
1817

1818
                if err := wire.WriteVarBytes(w, 0, rawBytes); err != nil {
267✔
1819
                        return err
×
1820
                }
×
1821
        }
1822

1823
        return nil
69✔
1824
}
1825

1826
// maxOnionPayloadSize is the largest Sphinx payload possible, so we don't need
1827
// to read/write a TLV stream larger than this.
1828
const maxOnionPayloadSize = 1300 // passed to wire.ReadVarBytes in deserializeHop as the per-record size cap
1829

1830
func deserializeHop(r io.Reader) (*route.Hop, error) {
1,411✔
1831
        h := &route.Hop{}
1,411✔
1832

1,411✔
1833
        var pub []byte
1,411✔
1834
        if err := ReadElements(r, &pub); err != nil {
1,411✔
1835
                return nil, err
×
1836
        }
×
1837
        copy(h.PubKeyBytes[:], pub)
1,411✔
1838

1,411✔
1839
        if err := ReadElements(r,
1,411✔
1840
                &h.ChannelID, &h.OutgoingTimeLock, &h.AmtToForward,
1,411✔
1841
        ); err != nil {
1,411✔
1842
                return nil, err
×
1843
        }
×
1844

1845
        // TODO(roasbeef): change field to allow LegacyPayload false to be the
1846
        // legacy default?
1847
        err := binary.Read(r, byteOrder, &h.LegacyPayload)
1,411✔
1848
        if err != nil {
1,411✔
1849
                return nil, err
×
1850
        }
×
1851

1852
        var numElements uint32
1,411✔
1853
        if err := ReadElements(r, &numElements); err != nil {
1,411✔
1854
                return nil, err
×
1855
        }
×
1856

1857
        // If there're no elements, then we can return early.
1858
        if numElements == 0 {
2,143✔
1859
                return h, nil
732✔
1860
        }
732✔
1861

1862
        tlvMap := make(map[uint64][]byte)
679✔
1863
        for i := uint32(0); i < numElements; i++ {
3,385✔
1864
                var tlvType uint64
2,706✔
1865
                if err := ReadElements(r, &tlvType); err != nil {
2,706✔
1866
                        return nil, err
×
1867
                }
×
1868

1869
                rawRecordBytes, err := wire.ReadVarBytes(
2,706✔
1870
                        r, 0, maxOnionPayloadSize, "tlv",
2,706✔
1871
                )
2,706✔
1872
                if err != nil {
2,706✔
1873
                        return nil, err
×
1874
                }
×
1875

1876
                tlvMap[tlvType] = rawRecordBytes
2,706✔
1877
        }
1878

1879
        // If the MPP type is present, remove it from the generic TLV map and
1880
        // parse it back into a proper MPP struct.
1881
        //
1882
        // TODO(conner): add migration to unify all fields in a single TLV
1883
        // blobs. The split approach will cause headaches down the road as more
1884
        // fields are added, which we can avoid by having a single TLV stream
1885
        // for all payload fields.
1886
        mppType := uint64(record.MPPOnionType)
679✔
1887
        if mppBytes, ok := tlvMap[mppType]; ok {
1,352✔
1888
                delete(tlvMap, mppType)
673✔
1889

673✔
1890
                var (
673✔
1891
                        mpp    = &record.MPP{}
673✔
1892
                        mppRec = mpp.Record()
673✔
1893
                        r      = bytes.NewReader(mppBytes)
673✔
1894
                )
673✔
1895
                err := mppRec.Decode(r, uint64(len(mppBytes)))
673✔
1896
                if err != nil {
673✔
1897
                        return nil, err
×
1898
                }
×
1899
                h.MPP = mpp
673✔
1900
        }
1901

1902
        // If encrypted data or blinding key are present, remove them from
1903
        // the TLV map and parse into proper types.
1904
        encryptedDataType := uint64(record.EncryptedDataOnionType)
679✔
1905
        if data, ok := tlvMap[encryptedDataType]; ok {
681✔
1906
                delete(tlvMap, encryptedDataType)
2✔
1907
                h.EncryptedData = data
2✔
1908
        }
2✔
1909

1910
        blindingType := uint64(record.BlindingPointOnionType)
679✔
1911
        if blindingPoint, ok := tlvMap[blindingType]; ok {
680✔
1912
                delete(tlvMap, blindingType)
1✔
1913

1✔
1914
                h.BlindingPoint, err = btcec.ParsePubKey(blindingPoint)
1✔
1915
                if err != nil {
1✔
1916
                        return nil, fmt.Errorf("invalid blinding point: %w",
×
1917
                                err)
×
1918
                }
×
1919
        }
1920

1921
        ampType := uint64(record.AMPOnionType)
679✔
1922
        if ampBytes, ok := tlvMap[ampType]; ok {
679✔
1923
                delete(tlvMap, ampType)
×
1924

×
1925
                var (
×
1926
                        amp    = &record.AMP{}
×
1927
                        ampRec = amp.Record()
×
1928
                        r      = bytes.NewReader(ampBytes)
×
1929
                )
×
1930
                err := ampRec.Decode(r, uint64(len(ampBytes)))
×
1931
                if err != nil {
×
1932
                        return nil, err
×
1933
                }
×
1934
                h.AMP = amp
×
1935
        }
1936

1937
        // If the metadata type is present, remove it from the tlv map and
1938
        // populate directly on the hop.
1939
        metadataType := uint64(record.MetadataOnionType)
679✔
1940
        if metadata, ok := tlvMap[metadataType]; ok {
1,356✔
1941
                delete(tlvMap, metadataType)
677✔
1942

677✔
1943
                h.Metadata = metadata
677✔
1944
        }
677✔
1945

1946
        totalAmtMsatType := uint64(record.TotalAmtMsatBlindedType)
679✔
1947
        if totalAmtMsat, ok := tlvMap[totalAmtMsatType]; ok {
680✔
1948
                delete(tlvMap, totalAmtMsatType)
1✔
1949

1✔
1950
                var (
1✔
1951
                        totalAmtMsatInt uint64
1✔
1952
                        buf             [8]byte
1✔
1953
                )
1✔
1954
                if err := tlv.DTUint64(
1✔
1955
                        bytes.NewReader(totalAmtMsat),
1✔
1956
                        &totalAmtMsatInt,
1✔
1957
                        &buf,
1✔
1958
                        uint64(len(totalAmtMsat)),
1✔
1959
                ); err != nil {
1✔
1960
                        return nil, err
×
1961
                }
×
1962

1963
                h.TotalAmtMsat = lnwire.MilliSatoshi(totalAmtMsatInt)
1✔
1964
        }
1965

1966
        h.CustomRecords = tlvMap
679✔
1967

679✔
1968
        return h, nil
679✔
1969
}
1970

1971
// SerializeRoute serializes a route.
1972
func SerializeRoute(w io.Writer, r route.Route) error {
74✔
1973
        if err := WriteElements(w,
74✔
1974
                r.TotalTimeLock, r.TotalAmount, r.SourcePubKey[:],
74✔
1975
        ); err != nil {
74✔
1976
                return err
×
1977
        }
×
1978

1979
        if err := WriteElements(w, uint32(len(r.Hops))); err != nil {
74✔
1980
                return err
×
1981
        }
×
1982

1983
        for _, h := range r.Hops {
223✔
1984
                if err := serializeHop(w, h); err != nil {
149✔
1985
                        return err
×
1986
                }
×
1987
        }
1988

1989
        // Any new/extra TLV data is encoded in serializeHTLCAttemptInfo!
1990

1991
        return nil
74✔
1992
}
1993

1994
// DeserializeRoute deserializes a route.
1995
func DeserializeRoute(r io.Reader) (route.Route, error) {
705✔
1996
        rt := route.Route{}
705✔
1997
        if err := ReadElements(r,
705✔
1998
                &rt.TotalTimeLock, &rt.TotalAmount,
705✔
1999
        ); err != nil {
705✔
2000
                return rt, err
×
2001
        }
×
2002

2003
        var pub []byte
705✔
2004
        if err := ReadElements(r, &pub); err != nil {
705✔
2005
                return rt, err
×
2006
        }
×
2007
        copy(rt.SourcePubKey[:], pub)
705✔
2008

705✔
2009
        var numHops uint32
705✔
2010
        if err := ReadElements(r, &numHops); err != nil {
705✔
2011
                return rt, err
×
2012
        }
×
2013

2014
        var hops []*route.Hop
705✔
2015
        for i := uint32(0); i < numHops; i++ {
2,116✔
2016
                hop, err := deserializeHop(r)
1,411✔
2017
                if err != nil {
1,411✔
2018
                        return rt, err
×
2019
                }
×
2020
                hops = append(hops, hop)
1,411✔
2021
        }
2022
        rt.Hops = hops
705✔
2023

705✔
2024
        // Any new/extra TLV data is decoded in deserializeHTLCAttemptInfo!
705✔
2025

705✔
2026
        return rt, nil
705✔
2027
}
2028

2029
// serializeHTLCSettleInfo serializes the details of a settled htlc.
2030
func serializeHTLCSettleInfo(w io.Writer, s *HTLCSettleInfo) error {
16✔
2031
        if _, err := w.Write(s.Preimage[:]); err != nil {
16✔
2032
                return err
×
2033
        }
×
2034

2035
        if err := serializeTime(w, s.SettleTime); err != nil {
16✔
2036
                return err
×
2037
        }
×
2038

2039
        return nil
16✔
2040
}
2041

2042
// deserializeHTLCSettleInfo deserializes the details of a settled htlc.
2043
func deserializeHTLCSettleInfo(r io.Reader) (*HTLCSettleInfo, error) {
89✔
2044
        s := &HTLCSettleInfo{}
89✔
2045
        if _, err := io.ReadFull(r, s.Preimage[:]); err != nil {
89✔
2046
                return nil, err
×
2047
        }
×
2048

2049
        var err error
89✔
2050
        s.SettleTime, err = deserializeTime(r)
89✔
2051
        if err != nil {
89✔
2052
                return nil, err
×
2053
        }
×
2054

2055
        return s, nil
89✔
2056
}
2057

2058
// serializeHTLCFailInfo serializes the details of a failed htlc including the
2059
// wire failure.
2060
func serializeHTLCFailInfo(w io.Writer, f *HTLCFailInfo) error {
31✔
2061
        if err := serializeTime(w, f.FailTime); err != nil {
31✔
2062
                return err
×
2063
        }
×
2064

2065
        // Write failure. If there is no failure message, write an empty
2066
        // byte slice.
2067
        var messageBytes bytes.Buffer
31✔
2068
        if f.Message != nil {
31✔
2069
                err := lnwire.EncodeFailureMessage(&messageBytes, f.Message, 0)
×
2070
                if err != nil {
×
2071
                        return err
×
2072
                }
×
2073
        }
2074
        if err := wire.WriteVarBytes(w, 0, messageBytes.Bytes()); err != nil {
31✔
2075
                return err
×
2076
        }
×
2077

2078
        return WriteElements(w, byte(f.Reason), f.FailureSourceIndex)
31✔
2079
}
2080

2081
// deserializeHTLCFailInfo deserializes the details of a failed htlc including
2082
// the wire failure.
2083
func deserializeHTLCFailInfo(r io.Reader) (*HTLCFailInfo, error) {
305✔
2084
        f := &HTLCFailInfo{}
305✔
2085
        var err error
305✔
2086
        f.FailTime, err = deserializeTime(r)
305✔
2087
        if err != nil {
305✔
2088
                return nil, err
×
2089
        }
×
2090

2091
        // Read failure.
2092
        failureBytes, err := wire.ReadVarBytes(
305✔
2093
                r, 0, math.MaxUint16, "failure",
305✔
2094
        )
305✔
2095
        if err != nil {
305✔
2096
                return nil, err
×
2097
        }
×
2098
        if len(failureBytes) > 0 {
305✔
2099
                f.Message, err = lnwire.DecodeFailureMessage(
×
2100
                        bytes.NewReader(failureBytes), 0,
×
2101
                )
×
2102
                if err != nil {
×
2103
                        return nil, err
×
2104
                }
×
2105
        }
2106

2107
        var reason byte
305✔
2108
        err = ReadElements(r, &reason, &f.FailureSourceIndex)
305✔
2109
        if err != nil {
305✔
2110
                return nil, err
×
2111
        }
×
2112
        f.Reason = HTLCFailReason(reason)
305✔
2113

305✔
2114
        return f, nil
305✔
2115
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc