• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 17027244024

17 Aug 2025 11:32PM UTC coverage: 57.287% (-9.5%) from 66.765%
17027244024

Pull #10167

github

web-flow
Merge fcb4f4303 into fb1adfc21
Pull Request #10167: multi: bump Go to 1.24.6

3 of 18 new or added lines in 6 files covered. (16.67%)

28537 existing lines in 457 files now uncovered.

99094 of 172978 relevant lines covered (57.29%)

1.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.43
/payments/db/kv_store.go
1
package paymentsdb
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/binary"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "sort"
11
        "sync"
12
        "time"
13

14
        "github.com/btcsuite/btcd/btcec/v2"
15
        "github.com/btcsuite/btcd/wire"
16
        "github.com/lightningnetwork/lnd/channeldb"
17
        "github.com/lightningnetwork/lnd/kvdb"
18
        "github.com/lightningnetwork/lnd/lntypes"
19
        "github.com/lightningnetwork/lnd/lnwire"
20
        "github.com/lightningnetwork/lnd/record"
21
        "github.com/lightningnetwork/lnd/routing/route"
22
        "github.com/lightningnetwork/lnd/tlv"
23
)
24

25
const (
26
        // paymentSeqBlockSize is the block size used when we batch allocate
27
        // payment sequences for future payments.
28
        paymentSeqBlockSize = 1000
29

30
        // paymentProgressLogInterval is the interval we use limiting the
31
        // logging output of payment processing.
32
        paymentProgressLogInterval = 30 * time.Second
33
)
34

35
//nolint:ll
36
var (
37
        // paymentsRootBucket is the name of the top-level bucket within the
38
        // database that stores all data related to payments. Within this
39
        // bucket, each payment has its own sub-bucket keyed by its payment
40
        // hash.
41
        //
42
        // Bucket hierarchy:
43
        //
44
        // root-bucket
45
        //      |
46
        //      |-- <paymenthash>
47
        //      |        |--sequence-key: <sequence number>
48
        //      |        |--creation-info-key: <creation info>
49
        //      |        |--fail-info-key: <(optional) fail info>
50
        //      |        |
51
        //      |        |--payment-htlcs-bucket (shard-bucket)
52
        //      |        |        |
53
        //      |        |        |-- ai<htlc attempt ID>: <htlc attempt info>
54
        //      |        |        |-- si<htlc attempt ID>: <(optional) settle info>
55
        //      |        |        |-- fi<htlc attempt ID>: <(optional) fail info>
56
        //      |        |        |
57
        //      |        |       ...
58
        //      |        |
59
        //      |        |
60
        //      |        |--duplicate-bucket (only for old, completed payments)
61
        //      |                 |
62
        //      |                 |-- <seq-num>
63
        //      |                 |       |--sequence-key: <sequence number>
64
        //      |                 |       |--creation-info-key: <creation info>
65
        //      |                 |       |--ai: <attempt info>
66
        //      |                 |       |--si: <settle info>
67
        //      |                 |       |--fi: <fail info>
68
        //      |                 |
69
        //      |                 |-- <seq-num>
70
        //      |                 |       |
71
        //      |                ...     ...
72
        //      |
73
        //      |-- <paymenthash>
74
        //      |        |
75
        //      |       ...
76
        //     ...
77
        //
78
        paymentsRootBucket = []byte("payments-root-bucket")
79

80
        // paymentSequenceKey is a key used in the payment's sub-bucket to
81
        // store the sequence number of the payment.
82
        paymentSequenceKey = []byte("payment-sequence-key")
83

84
        // paymentCreationInfoKey is a key used in the payment's sub-bucket to
85
        // store the creation info of the payment.
86
        paymentCreationInfoKey = []byte("payment-creation-info")
87

88
        // paymentHtlcsBucket is a bucket where we'll store the information
89
        // about the HTLCs that were attempted for a payment.
90
        paymentHtlcsBucket = []byte("payment-htlcs-bucket")
91

92
        // htlcAttemptInfoKey is the key used as the prefix of an HTLC attempt
93
        // to store the info about the attempt that was done for the HTLC in
94
        // question. The HTLC attempt ID is concatenated at the end.
95
        htlcAttemptInfoKey = []byte("ai")
96

97
        // htlcSettleInfoKey is the key used as the prefix of an HTLC attempt
98
        // settle info, if any. The HTLC attempt ID is concatenated at the end.
99
        htlcSettleInfoKey = []byte("si")
100

101
        // htlcFailInfoKey is the key used as the prefix of an HTLC attempt
102
        // failure information, if any. The HTLC attempt ID is concatenated at
103
        // the end.
104
        htlcFailInfoKey = []byte("fi")
105

106
        // paymentFailInfoKey is a key used in the payment's sub-bucket to
107
        // store information about the reason a payment failed.
108
        paymentFailInfoKey = []byte("payment-fail-info")
109

110
        // paymentsIndexBucket is the name of the top-level bucket within the
111
        // database that stores an index of payment sequence numbers to its
112
        // payment hash.
113
        // payments-sequence-index-bucket
114
        //         |--<sequence-number>: <payment hash>
115
        //         |--...
116
        //         |--<sequence-number>: <payment hash>
117
        paymentsIndexBucket = []byte("payments-index-bucket")
118
)
119

120
// KVPaymentsDB implements persistence for payments and payment attempts.
121
type KVPaymentsDB struct {
122
        // Sequence management for the kv store.
123
        seqMu     sync.Mutex
124
        currSeq   uint64
125
        storedSeq uint64
126

127
        // db is the underlying database implementation.
128
        db kvdb.Backend
129

130
        // keepFailedPaymentAttempts is a flag that indicates whether we should
131
        // keep failed payment attempts in the database.
132
        keepFailedPaymentAttempts bool
133
}
134

135
// defaultKVStoreOptions returns the default options for the KV store.
136
func defaultKVStoreOptions() *StoreOptions {
3✔
137
        return &StoreOptions{
3✔
138
                KeepFailedPaymentAttempts: false,
3✔
139
        }
3✔
140
}
3✔
141

142
// NewKVPaymentsDB creates a new KVStore for payments.
143
func NewKVPaymentsDB(db kvdb.Backend,
144
        options ...OptionModifier) (*KVPaymentsDB, error) {
3✔
145

3✔
146
        opts := defaultKVStoreOptions()
3✔
147
        for _, applyOption := range options {
6✔
148
                applyOption(opts)
3✔
149
        }
3✔
150

151
        if !opts.NoMigration {
6✔
152
                if err := initKVStore(db); err != nil {
3✔
153
                        return nil, err
×
154
                }
×
155
        }
156

157
        return &KVPaymentsDB{
3✔
158
                db:                        db,
3✔
159
                keepFailedPaymentAttempts: opts.KeepFailedPaymentAttempts,
3✔
160
        }, nil
3✔
161
}
162

163
// paymentsTopLevelBuckets is a list of top-level buckets that are used for
164
// the payments database when using the kv store.
165
var paymentsTopLevelBuckets = [][]byte{
166
        paymentsRootBucket,
167
        paymentsIndexBucket,
168
}
169

170
// initKVStore creates and initializes the top-level buckets for the payment db.
171
func initKVStore(db kvdb.Backend) error {
3✔
172
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
6✔
173
                for _, tlb := range paymentsTopLevelBuckets {
6✔
174
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
3✔
175
                                return err
×
176
                        }
×
177
                }
178

179
                return nil
3✔
180
        }, func() {})
3✔
181
        if err != nil {
3✔
182
                return fmt.Errorf("unable to create new payments db: %w", err)
×
183
        }
×
184

185
        return nil
3✔
186
}
187

188
// InitPayment checks or records the given PaymentCreationInfo with the DB,
189
// making sure it does not already exist as an in-flight payment. When this
190
// method returns successfully, the payment is guaranteed to be in the InFlight
191
// state.
192
func (p *KVPaymentsDB) InitPayment(paymentHash lntypes.Hash,
193
        info *PaymentCreationInfo) error {
3✔
194

3✔
195
        // Obtain a new sequence number for this payment. This is used
3✔
196
        // to sort the payments in order of creation, and also acts as
3✔
197
        // a unique identifier for each payment.
3✔
198
        sequenceNum, err := p.nextPaymentSequence()
3✔
199
        if err != nil {
3✔
200
                return err
×
201
        }
×
202

203
        var b bytes.Buffer
3✔
204
        if err := serializePaymentCreationInfo(&b, info); err != nil {
3✔
205
                return err
×
206
        }
×
207
        infoBytes := b.Bytes()
3✔
208

3✔
209
        var updateErr error
3✔
210
        err = kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
6✔
211
                // Reset the update error, to avoid carrying over an error
3✔
212
                // from a previous execution of the batched db transaction.
3✔
213
                updateErr = nil
3✔
214

3✔
215
                prefetchPayment(tx, paymentHash)
3✔
216
                bucket, err := createPaymentBucket(tx, paymentHash)
3✔
217
                if err != nil {
3✔
218
                        return err
×
219
                }
×
220

221
                // Get the existing status of this payment, if any.
222
                paymentStatus, err := fetchPaymentStatus(bucket)
3✔
223

3✔
224
                switch {
3✔
225
                // If no error is returned, it means we already have this
226
                // payment. We'll check the status to decide whether we allow
227
                // retrying the payment or return a specific error.
228
                case err == nil:
3✔
229
                        if err := paymentStatus.initializable(); err != nil {
6✔
230
                                updateErr = err
3✔
231
                                return nil
3✔
232
                        }
3✔
233

234
                // Otherwise, if the error is not `ErrPaymentNotInitiated`,
235
                // we'll return the error.
236
                case !errors.Is(err, ErrPaymentNotInitiated):
×
237
                        return err
×
238
                }
239

240
                // Before we set our new sequence number, we check whether this
241
                // payment has a previously set sequence number and remove its
242
                // index entry if it exists. This happens in the case where we
243
                // have a previously attempted payment which was left in a state
244
                // where we can retry.
245
                seqBytes := bucket.Get(paymentSequenceKey)
3✔
246
                if seqBytes != nil {
6✔
247
                        indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
248
                        if err := indexBucket.Delete(seqBytes); err != nil {
3✔
249
                                return err
×
250
                        }
×
251
                }
252

253
                // Once we have obtained a sequence number, we add an entry
254
                // to our index bucket which will map the sequence number to
255
                // our payment identifier.
256
                err = createPaymentIndexEntry(
3✔
257
                        tx, sequenceNum, info.PaymentIdentifier,
3✔
258
                )
3✔
259
                if err != nil {
3✔
260
                        return err
×
261
                }
×
262

263
                err = bucket.Put(paymentSequenceKey, sequenceNum)
3✔
264
                if err != nil {
3✔
265
                        return err
×
266
                }
×
267

268
                // Add the payment info to the bucket, which contains the
269
                // static information for this payment
270
                err = bucket.Put(paymentCreationInfoKey, infoBytes)
3✔
271
                if err != nil {
3✔
272
                        return err
×
273
                }
×
274

275
                // We'll delete any lingering HTLCs to start with, in case we
276
                // are initializing a payment that was attempted earlier, but
277
                // left in a state where we could retry.
278
                err = bucket.DeleteNestedBucket(paymentHtlcsBucket)
3✔
279
                if err != nil && !errors.Is(err, kvdb.ErrBucketNotFound) {
3✔
280
                        return err
×
281
                }
×
282

283
                // Also delete any lingering failure info now that we are
284
                // re-attempting.
285
                return bucket.Delete(paymentFailInfoKey)
3✔
286
        })
287
        if err != nil {
3✔
288
                return fmt.Errorf("unable to init payment: %w", err)
×
289
        }
×
290

291
        return updateErr
3✔
292
}
293

294
// DeleteFailedAttempts deletes all failed htlcs for a payment if configured
295
// by the KVPaymentsDB db.
296
func (p *KVPaymentsDB) DeleteFailedAttempts(hash lntypes.Hash) error {
3✔
297
        if !p.keepFailedPaymentAttempts {
3✔
UNCOV
298
                const failedHtlcsOnly = true
×
UNCOV
299
                err := p.DeletePayment(hash, failedHtlcsOnly)
×
UNCOV
300
                if err != nil {
×
UNCOV
301
                        return err
×
UNCOV
302
                }
×
303
        }
304

305
        return nil
3✔
306
}
307

308
// paymentIndexType identifies the kind of entry stored in the payments index
309
// bucket, so that different payment index types can be told apart in future.
310
type paymentIndexType uint8
311

312
// paymentIndexTypeHash is a payment index type which indicates that we have
313
// created an index of payment sequence number to payment hash.
314
const paymentIndexTypeHash paymentIndexType = 0
315

316
// createPaymentIndexEntry creates a payment hash typed index for a payment. The
317
// index produced contains a payment index type (which can be used in future to
318
// signal different payment index types) and the payment identifier.
319
func createPaymentIndexEntry(tx kvdb.RwTx, sequenceNumber []byte,
320
        id lntypes.Hash) error {
3✔
321

3✔
322
        var b bytes.Buffer
3✔
323
        if err := WriteElements(&b, paymentIndexTypeHash, id[:]); err != nil {
3✔
324
                return err
×
325
        }
×
326

327
        indexes := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
328

3✔
329
        return indexes.Put(sequenceNumber, b.Bytes())
3✔
330
}
331

332
// deserializePaymentIndex deserializes a payment index entry. This function
333
// currently only supports deserialization of payment hash indexes, and will
334
// fail for other types.
335
func deserializePaymentIndex(r io.Reader) (lntypes.Hash, error) {
3✔
336
        var (
3✔
337
                indexType   paymentIndexType
3✔
338
                paymentHash []byte
3✔
339
        )
3✔
340

3✔
341
        if err := ReadElements(r, &indexType, &paymentHash); err != nil {
3✔
342
                return lntypes.Hash{}, err
×
343
        }
×
344

345
        // While we only have on payment index type, we do not need to use our
346
        // index type to deserialize the index. However, we sanity check that
347
        // this type is as expected, since we had to read it out anyway.
348
        if indexType != paymentIndexTypeHash {
3✔
349
                return lntypes.Hash{}, fmt.Errorf("unknown payment index "+
×
350
                        "type: %v", indexType)
×
351
        }
×
352

353
        hash, err := lntypes.MakeHash(paymentHash)
3✔
354
        if err != nil {
3✔
355
                return lntypes.Hash{}, err
×
356
        }
×
357

358
        return hash, nil
3✔
359
}
360

361
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
362
// DB.
363
func (p *KVPaymentsDB) RegisterAttempt(paymentHash lntypes.Hash,
364
        attempt *HTLCAttemptInfo) (*MPPayment, error) {
3✔
365

3✔
366
        // Serialize the information before opening the db transaction.
3✔
367
        var a bytes.Buffer
3✔
368
        err := serializeHTLCAttemptInfo(&a, attempt)
3✔
369
        if err != nil {
3✔
370
                return nil, err
×
371
        }
×
372
        htlcInfoBytes := a.Bytes()
3✔
373

3✔
374
        htlcIDBytes := make([]byte, 8)
3✔
375
        binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID)
3✔
376

3✔
377
        var payment *MPPayment
3✔
378
        err = kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
6✔
379
                prefetchPayment(tx, paymentHash)
3✔
380
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
381
                if err != nil {
3✔
382
                        return err
×
383
                }
×
384

385
                payment, err = fetchPayment(bucket)
3✔
386
                if err != nil {
3✔
387
                        return err
×
388
                }
×
389

390
                // Check if registering a new attempt is allowed.
391
                if err := payment.Registrable(); err != nil {
3✔
UNCOV
392
                        return err
×
UNCOV
393
                }
×
394

395
                // If the final hop has encrypted data, then we know this is a
396
                // blinded payment. In blinded payments, MPP records are not set
397
                // for split payments and the recipient is responsible for using
398
                // a consistent PathID across the various encrypted data
399
                // payloads that we received from them for this payment. All we
400
                // need to check is that the total amount field for each HTLC
401
                // in the split payment is correct.
402
                isBlinded := len(attempt.Route.FinalHop().EncryptedData) != 0
3✔
403

3✔
404
                // Make sure any existing shards match the new one with regards
3✔
405
                // to MPP options.
3✔
406
                mpp := attempt.Route.FinalHop().MPP
3✔
407

3✔
408
                // MPP records should not be set for attempts to blinded paths.
3✔
409
                if isBlinded && mpp != nil {
3✔
410
                        return ErrMPPRecordInBlindedPayment
×
411
                }
×
412

413
                for _, h := range payment.InFlightHTLCs() {
6✔
414
                        hMpp := h.Route.FinalHop().MPP
3✔
415

3✔
416
                        // If this is a blinded payment, then no existing HTLCs
3✔
417
                        // should have MPP records.
3✔
418
                        if isBlinded && hMpp != nil {
3✔
419
                                return ErrMPPRecordInBlindedPayment
×
420
                        }
×
421

422
                        // If this is a blinded payment, then we just need to
423
                        // check that the TotalAmtMsat field for this shard
424
                        // is equal to that of any other shard in the same
425
                        // payment.
426
                        if isBlinded {
6✔
427
                                if attempt.Route.FinalHop().TotalAmtMsat !=
3✔
428
                                        h.Route.FinalHop().TotalAmtMsat {
3✔
429

×
430
                                        //nolint:ll
×
431
                                        return ErrBlindedPaymentTotalAmountMismatch
×
432
                                }
×
433

434
                                continue
3✔
435
                        }
436

437
                        switch {
3✔
438
                        // We tried to register a non-MPP attempt for a MPP
439
                        // payment.
UNCOV
440
                        case mpp == nil && hMpp != nil:
×
UNCOV
441
                                return ErrMPPayment
×
442

443
                        // We tried to register a MPP shard for a non-MPP
444
                        // payment.
UNCOV
445
                        case mpp != nil && hMpp == nil:
×
UNCOV
446
                                return ErrNonMPPayment
×
447

448
                        // Non-MPP payment, nothing more to validate.
449
                        case mpp == nil:
×
450
                                continue
×
451
                        }
452

453
                        // Check that MPP options match.
454
                        if mpp.PaymentAddr() != hMpp.PaymentAddr() {
3✔
UNCOV
455
                                return ErrMPPPaymentAddrMismatch
×
UNCOV
456
                        }
×
457

458
                        if mpp.TotalMsat() != hMpp.TotalMsat() {
3✔
UNCOV
459
                                return ErrMPPTotalAmountMismatch
×
UNCOV
460
                        }
×
461
                }
462

463
                // If this is a non-MPP attempt, it must match the total amount
464
                // exactly. Note that a blinded payment is considered an MPP
465
                // attempt.
466
                amt := attempt.Route.ReceiverAmt()
3✔
467
                if !isBlinded && mpp == nil && amt != payment.Info.Value {
3✔
468
                        return ErrValueMismatch
×
469
                }
×
470

471
                // Ensure we aren't sending more than the total payment amount.
472
                sentAmt, _ := payment.SentAmt()
3✔
473
                if sentAmt+amt > payment.Info.Value {
3✔
UNCOV
474
                        return fmt.Errorf("%w: attempted=%v, payment amount="+
×
UNCOV
475
                                "%v", ErrValueExceedsAmt,
×
UNCOV
476
                                sentAmt+amt, payment.Info.Value)
×
UNCOV
477
                }
×
478

479
                htlcsBucket, err := bucket.CreateBucketIfNotExists(
3✔
480
                        paymentHtlcsBucket,
3✔
481
                )
3✔
482
                if err != nil {
3✔
483
                        return err
×
484
                }
×
485

486
                err = htlcsBucket.Put(
3✔
487
                        htlcBucketKey(htlcAttemptInfoKey, htlcIDBytes),
3✔
488
                        htlcInfoBytes,
3✔
489
                )
3✔
490
                if err != nil {
3✔
491
                        return err
×
492
                }
×
493

494
                // Retrieve attempt info for the notification.
495
                payment, err = fetchPayment(bucket)
3✔
496

3✔
497
                return err
3✔
498
        })
499
        if err != nil {
3✔
UNCOV
500
                return nil, err
×
UNCOV
501
        }
×
502

503
        return payment, err
3✔
504
}
505

506
// SettleAttempt marks the given attempt settled with the preimage. If this is
507
// a multi shard payment, this might implicitly mean that the full payment
508
// succeeded.
509
//
510
// After invoking this method, InitPayment should always return an error to
511
// prevent us from making duplicate payments to the same payment hash. The
512
// provided preimage is atomically saved to the DB for record keeping.
513
func (p *KVPaymentsDB) SettleAttempt(hash lntypes.Hash,
514
        attemptID uint64, settleInfo *HTLCSettleInfo) (*MPPayment, error) {
3✔
515

3✔
516
        var b bytes.Buffer
3✔
517
        if err := serializeHTLCSettleInfo(&b, settleInfo); err != nil {
3✔
518
                return nil, err
×
519
        }
×
520
        settleBytes := b.Bytes()
3✔
521

3✔
522
        return p.updateHtlcKey(hash, attemptID, htlcSettleInfoKey, settleBytes)
3✔
523
}
524

525
// FailAttempt marks the given payment attempt failed.
526
func (p *KVPaymentsDB) FailAttempt(hash lntypes.Hash,
527
        attemptID uint64, failInfo *HTLCFailInfo) (*MPPayment, error) {
3✔
528

3✔
529
        var b bytes.Buffer
3✔
530
        if err := serializeHTLCFailInfo(&b, failInfo); err != nil {
3✔
531
                return nil, err
×
532
        }
×
533
        failBytes := b.Bytes()
3✔
534

3✔
535
        return p.updateHtlcKey(hash, attemptID, htlcFailInfoKey, failBytes)
3✔
536
}
537

538
// updateHtlcKey updates a database key for the specified htlc.
539
func (p *KVPaymentsDB) updateHtlcKey(paymentHash lntypes.Hash,
540
        attemptID uint64, key, value []byte) (*MPPayment, error) {
3✔
541

3✔
542
        aid := make([]byte, 8)
3✔
543
        binary.BigEndian.PutUint64(aid, attemptID)
3✔
544

3✔
545
        var payment *MPPayment
3✔
546
        err := kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
6✔
547
                payment = nil
3✔
548

3✔
549
                prefetchPayment(tx, paymentHash)
3✔
550
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
551
                if err != nil {
3✔
UNCOV
552
                        return err
×
UNCOV
553
                }
×
554

555
                p, err := fetchPayment(bucket)
3✔
556
                if err != nil {
3✔
557
                        return err
×
558
                }
×
559

560
                // We can only update keys of in-flight payments. We allow
561
                // updating keys even if the payment has reached a terminal
562
                // condition, since the HTLC outcomes must still be updated.
563
                if err := p.Status.updatable(); err != nil {
3✔
564
                        return err
×
565
                }
×
566

567
                htlcsBucket := bucket.NestedReadWriteBucket(paymentHtlcsBucket)
3✔
568
                if htlcsBucket == nil {
3✔
569
                        return fmt.Errorf("htlcs bucket not found")
×
570
                }
×
571

572
                attemptKey := htlcBucketKey(htlcAttemptInfoKey, aid)
3✔
573
                if htlcsBucket.Get(attemptKey) == nil {
3✔
574
                        return fmt.Errorf("HTLC with ID %v not registered",
×
575
                                attemptID)
×
576
                }
×
577

578
                // Make sure the shard is not already failed or settled.
579
                failKey := htlcBucketKey(htlcFailInfoKey, aid)
3✔
580
                if htlcsBucket.Get(failKey) != nil {
3✔
581
                        return ErrAttemptAlreadyFailed
×
582
                }
×
583

584
                settleKey := htlcBucketKey(htlcSettleInfoKey, aid)
3✔
585
                if htlcsBucket.Get(settleKey) != nil {
3✔
586
                        return ErrAttemptAlreadySettled
×
587
                }
×
588

589
                // Add or update the key for this htlc.
590
                err = htlcsBucket.Put(htlcBucketKey(key, aid), value)
3✔
591
                if err != nil {
3✔
592
                        return err
×
593
                }
×
594

595
                // Retrieve attempt info for the notification.
596
                payment, err = fetchPayment(bucket)
3✔
597

3✔
598
                return err
3✔
599
        })
600
        if err != nil {
3✔
UNCOV
601
                return nil, err
×
UNCOV
602
        }
×
603

604
        return payment, err
3✔
605
}
606

607
// Fail transitions a payment into the Failed state, and records the reason the
608
// payment failed. After invoking this method, InitPayment should return nil on
609
// its next call for this payment hash, allowing the switch to make a
610
// subsequent payment.
611
func (p *KVPaymentsDB) Fail(paymentHash lntypes.Hash,
612
        reason FailureReason) (*MPPayment, error) {
3✔
613

3✔
614
        var (
3✔
615
                updateErr error
3✔
616
                payment   *MPPayment
3✔
617
        )
3✔
618
        err := kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
6✔
619
                // Reset the update error, to avoid carrying over an error
3✔
620
                // from a previous execution of the batched db transaction.
3✔
621
                updateErr = nil
3✔
622
                payment = nil
3✔
623

3✔
624
                prefetchPayment(tx, paymentHash)
3✔
625
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
626
                if errors.Is(err, ErrPaymentNotInitiated) {
3✔
UNCOV
627
                        updateErr = ErrPaymentNotInitiated
×
UNCOV
628
                        return nil
×
629
                } else if err != nil {
3✔
630
                        return err
×
631
                }
×
632

633
                // We mark the payment as failed as long as it is known. This
634
                // lets the last attempt to fail with a terminal write its
635
                // failure to the KVPaymentsDB without synchronizing with
636
                // other attempts.
637
                _, err = fetchPaymentStatus(bucket)
3✔
638
                if errors.Is(err, ErrPaymentNotInitiated) {
3✔
639
                        updateErr = ErrPaymentNotInitiated
×
640
                        return nil
×
641
                } else if err != nil {
3✔
642
                        return err
×
643
                }
×
644

645
                // Put the failure reason in the bucket for record keeping.
646
                v := []byte{byte(reason)}
3✔
647
                err = bucket.Put(paymentFailInfoKey, v)
3✔
648
                if err != nil {
3✔
649
                        return err
×
650
                }
×
651

652
                // Retrieve attempt info for the notification, if available.
653
                payment, err = fetchPayment(bucket)
3✔
654
                if err != nil {
3✔
655
                        return err
×
656
                }
×
657

658
                return nil
3✔
659
        })
660
        if err != nil {
3✔
661
                return nil, err
×
662
        }
×
663

664
        return payment, updateErr
3✔
665
}
666

667
// FetchPayment returns information about a payment from the database.
668
func (p *KVPaymentsDB) FetchPayment(paymentHash lntypes.Hash) (
669
        *MPPayment, error) {
3✔
670

3✔
671
        var payment *MPPayment
3✔
672
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
6✔
673
                prefetchPayment(tx, paymentHash)
3✔
674
                bucket, err := fetchPaymentBucket(tx, paymentHash)
3✔
675
                if err != nil {
3✔
UNCOV
676
                        return err
×
UNCOV
677
                }
×
678

679
                payment, err = fetchPayment(bucket)
3✔
680

3✔
681
                return err
3✔
682
        }, func() {
3✔
683
                payment = nil
3✔
684
        })
3✔
685
        if err != nil {
3✔
UNCOV
686
                return nil, err
×
UNCOV
687
        }
×
688

689
        return payment, nil
3✔
690
}
691

692
// prefetchPayment attempts to prefetch as much of the payment as possible to
693
// reduce DB roundtrips.
694
func prefetchPayment(tx kvdb.RTx, paymentHash lntypes.Hash) {
3✔
695
        rb := kvdb.RootBucket(tx)
3✔
696
        kvdb.Prefetch(
3✔
697
                rb,
3✔
698
                []string{
3✔
699
                        // Prefetch all keys in the payment's bucket.
3✔
700
                        string(paymentsRootBucket),
3✔
701
                        string(paymentHash[:]),
3✔
702
                },
3✔
703
                []string{
3✔
704
                        // Prefetch all keys in the payment's htlc bucket.
3✔
705
                        string(paymentsRootBucket),
3✔
706
                        string(paymentHash[:]),
3✔
707
                        string(paymentHtlcsBucket),
3✔
708
                },
3✔
709
        )
3✔
710
}
3✔
711

712
// createPaymentBucket creates or fetches the sub-bucket assigned to this
713
// payment hash.
714
func createPaymentBucket(tx kvdb.RwTx, paymentHash lntypes.Hash) (
715
        kvdb.RwBucket, error) {
3✔
716

3✔
717
        payments, err := tx.CreateTopLevelBucket(paymentsRootBucket)
3✔
718
        if err != nil {
3✔
719
                return nil, err
×
720
        }
×
721

722
        return payments.CreateBucketIfNotExists(paymentHash[:])
3✔
723
}
724

725
// fetchPaymentBucket fetches the sub-bucket assigned to this payment hash. If
726
// the bucket does not exist, it returns ErrPaymentNotInitiated.
727
func fetchPaymentBucket(tx kvdb.RTx, paymentHash lntypes.Hash) (
728
        kvdb.RBucket, error) {
3✔
729

3✔
730
        payments := tx.ReadBucket(paymentsRootBucket)
3✔
731
        if payments == nil {
3✔
732
                return nil, ErrPaymentNotInitiated
×
733
        }
×
734

735
        bucket := payments.NestedReadBucket(paymentHash[:])
3✔
736
        if bucket == nil {
3✔
UNCOV
737
                return nil, ErrPaymentNotInitiated
×
UNCOV
738
        }
×
739

740
        return bucket, nil
3✔
741
}
742

743
// fetchPaymentBucketUpdate is identical to fetchPaymentBucket, but it returns a
744
// bucket that can be written to.
745
func fetchPaymentBucketUpdate(tx kvdb.RwTx, paymentHash lntypes.Hash) (
746
        kvdb.RwBucket, error) {
3✔
747

3✔
748
        payments := tx.ReadWriteBucket(paymentsRootBucket)
3✔
749
        if payments == nil {
3✔
750
                return nil, ErrPaymentNotInitiated
×
751
        }
×
752

753
        bucket := payments.NestedReadWriteBucket(paymentHash[:])
3✔
754
        if bucket == nil {
3✔
UNCOV
755
                return nil, ErrPaymentNotInitiated
×
UNCOV
756
        }
×
757

758
        return bucket, nil
3✔
759
}
760

761
// nextPaymentSequence returns the next sequence number to store for a new
762
// payment.
763
func (p *KVPaymentsDB) nextPaymentSequence() ([]byte, error) {
3✔
764
        p.seqMu.Lock()
3✔
765
        defer p.seqMu.Unlock()
3✔
766

3✔
767
        // Set a new upper bound in the DB every 1000 payments to avoid
3✔
768
        // conflicts on the sequence when using etcd.
3✔
769
        if p.currSeq == p.storedSeq {
6✔
770
                var currPaymentSeq, newUpperBound uint64
3✔
771
                if err := kvdb.Update(p.db, func(tx kvdb.RwTx) error {
6✔
772
                        paymentsBucket, err := tx.CreateTopLevelBucket(
3✔
773
                                paymentsRootBucket,
3✔
774
                        )
3✔
775
                        if err != nil {
3✔
776
                                return err
×
777
                        }
×
778

779
                        currPaymentSeq = paymentsBucket.Sequence()
3✔
780
                        newUpperBound = currPaymentSeq + paymentSeqBlockSize
3✔
781

3✔
782
                        return paymentsBucket.SetSequence(newUpperBound)
3✔
783
                }, func() {}); err != nil {
3✔
784
                        return nil, err
×
785
                }
×
786

787
                // We lazy initialize the cached currPaymentSeq here using the
788
                // first nextPaymentSequence() call. This if statement will auto
789
                // initialize our stored currPaymentSeq, since by default both
790
                // this variable and storedPaymentSeq are zero which in turn
791
                // will have us fetch the current values from the DB.
792
                if p.currSeq == 0 {
6✔
793
                        p.currSeq = currPaymentSeq
3✔
794
                }
3✔
795

796
                p.storedSeq = newUpperBound
3✔
797
        }
798

799
        p.currSeq++
3✔
800
        b := make([]byte, 8)
3✔
801
        binary.BigEndian.PutUint64(b, p.currSeq)
3✔
802

3✔
803
        return b, nil
3✔
804
}
805

806
// fetchPaymentStatus fetches the payment status of the payment. If the payment
807
// isn't found, it will return error `ErrPaymentNotInitiated`.
808
func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) {
3✔
809
        // Creation info should be set for all payments, regardless of state.
3✔
810
        // If not, it is unknown.
3✔
811
        if bucket.Get(paymentCreationInfoKey) == nil {
6✔
812
                return 0, ErrPaymentNotInitiated
3✔
813
        }
3✔
814

815
        payment, err := fetchPayment(bucket)
3✔
816
        if err != nil {
3✔
817
                return 0, err
×
818
        }
×
819

820
        return payment.Status, nil
3✔
821
}
822

823
// FetchInFlightPayments returns all payments with status InFlight.
824
func (p *KVPaymentsDB) FetchInFlightPayments() ([]*MPPayment, error) {
3✔
825
        var (
3✔
826
                inFlights      []*MPPayment
3✔
827
                start          = time.Now()
3✔
828
                lastLogTime    = time.Now()
3✔
829
                processedCount int
3✔
830
        )
3✔
831

3✔
832
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
6✔
833
                payments := tx.ReadBucket(paymentsRootBucket)
3✔
834
                if payments == nil {
3✔
835
                        return nil
×
836
                }
×
837

838
                return payments.ForEach(func(k, _ []byte) error {
6✔
839
                        bucket := payments.NestedReadBucket(k)
3✔
840
                        if bucket == nil {
3✔
841
                                return fmt.Errorf("non bucket element")
×
842
                        }
×
843

844
                        p, err := fetchPayment(bucket)
3✔
845
                        if err != nil {
3✔
846
                                return err
×
847
                        }
×
848

849
                        processedCount++
3✔
850
                        if time.Since(lastLogTime) >=
3✔
851
                                paymentProgressLogInterval {
3✔
852

×
853
                                log.Debugf("Scanning inflight payments "+
×
854
                                        "(in progress), processed %d, last "+
×
855
                                        "processed payment: %v", processedCount,
×
856
                                        p.Info)
×
857

×
858
                                lastLogTime = time.Now()
×
859
                        }
×
860

861
                        // Skip the payment if it's terminated.
862
                        if p.Terminated() {
6✔
863
                                return nil
3✔
864
                        }
3✔
865

866
                        inFlights = append(inFlights, p)
3✔
867

3✔
868
                        return nil
3✔
869
                })
870
        }, func() {
3✔
871
                inFlights = nil
3✔
872
        })
3✔
873
        if err != nil {
3✔
874
                return nil, err
×
875
        }
×
876

877
        elapsed := time.Since(start)
3✔
878
        log.Debugf("Completed scanning for inflight payments: "+
3✔
879
                "total_processed=%d, found_inflight=%d, elapsed=%v",
3✔
880
                processedCount, len(inFlights),
3✔
881
                elapsed.Round(time.Millisecond))
3✔
882

3✔
883
        return inFlights, nil
3✔
884
}
885

886
// htlcBucketKey builds the composite key used inside the htlcs bucket by
// concatenating prefix and id into a freshly allocated slice.
func htlcBucketKey(prefix, id []byte) []byte {
	composite := make([]byte, 0, len(prefix)+len(id))
	composite = append(composite, prefix...)

	return append(composite, id...)
}
3✔
895

896
// FetchPayments returns all sent payments found in the DB.
UNCOV
897
func (p *KVPaymentsDB) FetchPayments() ([]*MPPayment, error) {
×
UNCOV
898
        var payments []*MPPayment
×
UNCOV
899

×
UNCOV
900
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
×
UNCOV
901
                paymentsBucket := tx.ReadBucket(paymentsRootBucket)
×
UNCOV
902
                if paymentsBucket == nil {
×
903
                        return nil
×
904
                }
×
905

UNCOV
906
                return paymentsBucket.ForEach(func(k, v []byte) error {
×
UNCOV
907
                        bucket := paymentsBucket.NestedReadBucket(k)
×
UNCOV
908
                        if bucket == nil {
×
909
                                // We only expect sub-buckets to be found in
×
910
                                // this top-level bucket.
×
911
                                return fmt.Errorf("non bucket element in " +
×
912
                                        "payments bucket")
×
913
                        }
×
914

UNCOV
915
                        p, err := fetchPayment(bucket)
×
UNCOV
916
                        if err != nil {
×
917
                                return err
×
918
                        }
×
919

UNCOV
920
                        payments = append(payments, p)
×
UNCOV
921

×
UNCOV
922
                        // For older versions of lnd, duplicate payments to a
×
UNCOV
923
                        // payment has was possible. These will be found in a
×
UNCOV
924
                        // sub-bucket indexed by their sequence number if
×
UNCOV
925
                        // available.
×
UNCOV
926
                        duplicatePayments, err := fetchDuplicatePayments(bucket)
×
UNCOV
927
                        if err != nil {
×
928
                                return err
×
929
                        }
×
930

UNCOV
931
                        payments = append(payments, duplicatePayments...)
×
UNCOV
932

×
UNCOV
933
                        return nil
×
934
                })
UNCOV
935
        }, func() {
×
UNCOV
936
                payments = nil
×
UNCOV
937
        })
×
UNCOV
938
        if err != nil {
×
939
                return nil, err
×
940
        }
×
941

942
        // Before returning, sort the payments by their sequence number.
UNCOV
943
        sort.Slice(payments, func(i, j int) bool {
×
UNCOV
944
                return payments[i].SequenceNum < payments[j].SequenceNum
×
UNCOV
945
        })
×
946

UNCOV
947
        return payments, nil
×
948
}
949

950
func fetchCreationInfo(bucket kvdb.RBucket) (*PaymentCreationInfo, error) {
3✔
951
        b := bucket.Get(paymentCreationInfoKey)
3✔
952
        if b == nil {
3✔
953
                return nil, fmt.Errorf("creation info not found")
×
954
        }
×
955

956
        r := bytes.NewReader(b)
3✔
957

3✔
958
        return deserializePaymentCreationInfo(r)
3✔
959
}
960

961
func fetchPayment(bucket kvdb.RBucket) (*MPPayment, error) {
3✔
962
        seqBytes := bucket.Get(paymentSequenceKey)
3✔
963
        if seqBytes == nil {
3✔
964
                return nil, fmt.Errorf("sequence number not found")
×
965
        }
×
966

967
        sequenceNum := binary.BigEndian.Uint64(seqBytes)
3✔
968

3✔
969
        // Get the PaymentCreationInfo.
3✔
970
        creationInfo, err := fetchCreationInfo(bucket)
3✔
971
        if err != nil {
3✔
972
                return nil, err
×
973
        }
×
974

975
        var htlcs []HTLCAttempt
3✔
976
        htlcsBucket := bucket.NestedReadBucket(paymentHtlcsBucket)
3✔
977
        if htlcsBucket != nil {
6✔
978
                // Get the payment attempts. This can be empty.
3✔
979
                htlcs, err = fetchHtlcAttempts(htlcsBucket)
3✔
980
                if err != nil {
3✔
981
                        return nil, err
×
982
                }
×
983
        }
984

985
        // Get failure reason if available.
986
        var failureReason *FailureReason
3✔
987
        b := bucket.Get(paymentFailInfoKey)
3✔
988
        if b != nil {
6✔
989
                reason := FailureReason(b[0])
3✔
990
                failureReason = &reason
3✔
991
        }
3✔
992

993
        // Create a new payment.
994
        payment := &MPPayment{
3✔
995
                SequenceNum:   sequenceNum,
3✔
996
                Info:          creationInfo,
3✔
997
                HTLCs:         htlcs,
3✔
998
                FailureReason: failureReason,
3✔
999
        }
3✔
1000

3✔
1001
        // Set its state and status.
3✔
1002
        if err := payment.setState(); err != nil {
3✔
1003
                return nil, err
×
1004
        }
×
1005

1006
        return payment, nil
3✔
1007
}
1008

1009
// fetchHtlcAttempts retrieves all htlc attempts made for the payment found in
1010
// the given bucket.
1011
func fetchHtlcAttempts(bucket kvdb.RBucket) ([]HTLCAttempt, error) {
3✔
1012
        htlcsMap := make(map[uint64]*HTLCAttempt)
3✔
1013

3✔
1014
        attemptInfoCount := 0
3✔
1015
        err := bucket.ForEach(func(k, v []byte) error {
6✔
1016
                aid := byteOrder.Uint64(k[len(k)-8:])
3✔
1017

3✔
1018
                if _, ok := htlcsMap[aid]; !ok {
6✔
1019
                        htlcsMap[aid] = &HTLCAttempt{}
3✔
1020
                }
3✔
1021

1022
                var err error
3✔
1023
                switch {
3✔
1024
                case bytes.HasPrefix(k, htlcAttemptInfoKey):
3✔
1025
                        attemptInfo, err := readHtlcAttemptInfo(v)
3✔
1026
                        if err != nil {
3✔
1027
                                return err
×
1028
                        }
×
1029

1030
                        attemptInfo.AttemptID = aid
3✔
1031
                        htlcsMap[aid].HTLCAttemptInfo = *attemptInfo
3✔
1032
                        attemptInfoCount++
3✔
1033

1034
                case bytes.HasPrefix(k, htlcSettleInfoKey):
3✔
1035
                        htlcsMap[aid].Settle, err = readHtlcSettleInfo(v)
3✔
1036
                        if err != nil {
3✔
1037
                                return err
×
1038
                        }
×
1039

1040
                case bytes.HasPrefix(k, htlcFailInfoKey):
3✔
1041
                        htlcsMap[aid].Failure, err = readHtlcFailInfo(v)
3✔
1042
                        if err != nil {
3✔
1043
                                return err
×
1044
                        }
×
1045

1046
                default:
×
1047
                        return fmt.Errorf("unknown htlc attempt key")
×
1048
                }
1049

1050
                return nil
3✔
1051
        })
1052
        if err != nil {
3✔
1053
                return nil, err
×
1054
        }
×
1055

1056
        // Sanity check that all htlcs have an attempt info.
1057
        if attemptInfoCount != len(htlcsMap) {
3✔
1058
                return nil, ErrNoAttemptInfo
×
1059
        }
×
1060

1061
        keys := make([]uint64, len(htlcsMap))
3✔
1062
        i := 0
3✔
1063
        for k := range htlcsMap {
6✔
1064
                keys[i] = k
3✔
1065
                i++
3✔
1066
        }
3✔
1067

1068
        // Sort HTLC attempts by their attempt ID. This is needed because in the
1069
        // DB we store the attempts with keys prefixed by their status which
1070
        // changes order (groups them together by status).
1071
        sort.Slice(keys, func(i, j int) bool {
6✔
1072
                return keys[i] < keys[j]
3✔
1073
        })
3✔
1074

1075
        htlcs := make([]HTLCAttempt, len(htlcsMap))
3✔
1076
        for i, key := range keys {
6✔
1077
                htlcs[i] = *htlcsMap[key]
3✔
1078
        }
3✔
1079

1080
        return htlcs, nil
3✔
1081
}
1082

1083
// readHtlcAttemptInfo reads the payment attempt info for this htlc.
1084
func readHtlcAttemptInfo(b []byte) (*HTLCAttemptInfo, error) {
3✔
1085
        r := bytes.NewReader(b)
3✔
1086
        return deserializeHTLCAttemptInfo(r)
3✔
1087
}
3✔
1088

1089
// readHtlcSettleInfo reads the settle info for the htlc. If the htlc isn't
1090
// settled, nil is returned.
1091
func readHtlcSettleInfo(b []byte) (*HTLCSettleInfo, error) {
3✔
1092
        r := bytes.NewReader(b)
3✔
1093
        return deserializeHTLCSettleInfo(r)
3✔
1094
}
3✔
1095

1096
// readHtlcFailInfo reads the failure info for the htlc. If the htlc hasn't
1097
// failed, nil is returned.
1098
func readHtlcFailInfo(b []byte) (*HTLCFailInfo, error) {
3✔
1099
        r := bytes.NewReader(b)
3✔
1100
        return deserializeHTLCFailInfo(r)
3✔
1101
}
3✔
1102

1103
// fetchFailedHtlcKeys retrieves the bucket keys of all failed HTLCs of a
1104
// payment bucket.
UNCOV
1105
func fetchFailedHtlcKeys(bucket kvdb.RBucket) ([][]byte, error) {
×
UNCOV
1106
        htlcsBucket := bucket.NestedReadBucket(paymentHtlcsBucket)
×
UNCOV
1107

×
UNCOV
1108
        var htlcs []HTLCAttempt
×
UNCOV
1109
        var err error
×
UNCOV
1110
        if htlcsBucket != nil {
×
UNCOV
1111
                htlcs, err = fetchHtlcAttempts(htlcsBucket)
×
UNCOV
1112
                if err != nil {
×
1113
                        return nil, err
×
1114
                }
×
1115
        }
1116

1117
        // Now iterate though them and save the bucket keys for the failed
1118
        // HTLCs.
UNCOV
1119
        var htlcKeys [][]byte
×
UNCOV
1120
        for _, h := range htlcs {
×
UNCOV
1121
                if h.Failure == nil {
×
UNCOV
1122
                        continue
×
1123
                }
1124

UNCOV
1125
                htlcKeyBytes := make([]byte, 8)
×
UNCOV
1126
                binary.BigEndian.PutUint64(htlcKeyBytes, h.AttemptID)
×
UNCOV
1127

×
UNCOV
1128
                htlcKeys = append(htlcKeys, htlcKeyBytes)
×
1129
        }
1130

UNCOV
1131
        return htlcKeys, nil
×
1132
}
1133

1134
// QueryPayments is a query to the payments database which is restricted
1135
// to a subset of payments by the payments query, containing an offset
1136
// index and a maximum number of returned payments.
1137
func (p *KVPaymentsDB) QueryPayments(_ context.Context,
1138
        query Query) (Response, error) {
3✔
1139

3✔
1140
        var resp Response
3✔
1141

3✔
1142
        if err := kvdb.View(p.db, func(tx kvdb.RTx) error {
6✔
1143
                // Get the root payments bucket.
3✔
1144
                paymentsBucket := tx.ReadBucket(paymentsRootBucket)
3✔
1145
                if paymentsBucket == nil {
3✔
1146
                        return nil
×
1147
                }
×
1148

1149
                // Get the index bucket which maps sequence number -> payment
1150
                // hash and duplicate bool. If we have a payments bucket, we
1151
                // should have an indexes bucket as well.
1152
                indexes := tx.ReadBucket(paymentsIndexBucket)
3✔
1153
                if indexes == nil {
3✔
1154
                        return fmt.Errorf("index bucket does not exist")
×
1155
                }
×
1156

1157
                // accumulatePayments gets payments with the sequence number
1158
                // and hash provided and adds them to our list of payments if
1159
                // they meet the criteria of our query. It returns the number
1160
                // of payments that were added.
1161
                accumulatePayments := func(sequenceKey, hash []byte) (bool,
3✔
1162
                        error) {
6✔
1163

3✔
1164
                        r := bytes.NewReader(hash)
3✔
1165
                        paymentHash, err := deserializePaymentIndex(r)
3✔
1166
                        if err != nil {
3✔
1167
                                return false, err
×
1168
                        }
×
1169

1170
                        payment, err := fetchPaymentWithSequenceNumber(
3✔
1171
                                tx, paymentHash, sequenceKey,
3✔
1172
                        )
3✔
1173
                        if err != nil {
3✔
1174
                                return false, err
×
1175
                        }
×
1176

1177
                        // To keep compatibility with the old API, we only
1178
                        // return non-succeeded payments if requested.
1179
                        if payment.Status != StatusSucceeded &&
3✔
1180
                                !query.IncludeIncomplete {
3✔
UNCOV
1181

×
UNCOV
1182
                                return false, err
×
UNCOV
1183
                        }
×
1184

1185
                        // Get the creation time in Unix seconds, this always
1186
                        // rounds down the nanoseconds to full seconds.
1187
                        createTime := payment.Info.CreationTime.Unix()
3✔
1188

3✔
1189
                        // Skip any payments that were created before the
3✔
1190
                        // specified time.
3✔
1191
                        if createTime < query.CreationDateStart {
6✔
1192
                                return false, nil
3✔
1193
                        }
3✔
1194

1195
                        // Skip any payments that were created after the
1196
                        // specified time.
1197
                        if query.CreationDateEnd != 0 &&
3✔
1198
                                createTime > query.CreationDateEnd {
6✔
1199

3✔
1200
                                return false, nil
3✔
1201
                        }
3✔
1202

1203
                        // At this point, we've exhausted the offset, so we'll
1204
                        // begin collecting invoices found within the range.
1205
                        resp.Payments = append(resp.Payments, payment)
3✔
1206

3✔
1207
                        return true, nil
3✔
1208
                }
1209

1210
                // Create a paginator which reads from our sequence index bucket
1211
                // with the parameters provided by the payments query.
1212
                paginator := channeldb.NewPaginator(
3✔
1213
                        indexes.ReadCursor(), query.Reversed, query.IndexOffset,
3✔
1214
                        query.MaxPayments,
3✔
1215
                )
3✔
1216

3✔
1217
                // Run a paginated query, adding payments to our response.
3✔
1218
                if err := paginator.Query(accumulatePayments); err != nil {
3✔
1219
                        return err
×
1220
                }
×
1221

1222
                // Counting the total number of payments is expensive, since we
1223
                // literally have to traverse the cursor linearly, which can
1224
                // take quite a while. So it's an optional query parameter.
1225
                if query.CountTotal {
3✔
1226
                        var (
×
1227
                                totalPayments uint64
×
1228
                                err           error
×
1229
                        )
×
1230
                        countFn := func(_, _ []byte) error {
×
1231
                                totalPayments++
×
1232

×
1233
                                return nil
×
1234
                        }
×
1235

1236
                        // In non-boltdb database backends, there's a faster
1237
                        // ForAll query that allows for batch fetching items.
1238
                        fastBucket, ok := indexes.(kvdb.ExtendedRBucket)
×
1239
                        if ok {
×
1240
                                err = fastBucket.ForAll(countFn)
×
1241
                        } else {
×
1242
                                err = indexes.ForEach(countFn)
×
1243
                        }
×
1244
                        if err != nil {
×
1245
                                return fmt.Errorf("error counting payments: %w",
×
1246
                                        err)
×
1247
                        }
×
1248

1249
                        resp.TotalCount = totalPayments
×
1250
                }
1251

1252
                return nil
3✔
1253
        }, func() {
3✔
1254
                resp = Response{}
3✔
1255
        }); err != nil {
3✔
1256
                return resp, err
×
1257
        }
×
1258

1259
        // Need to swap the payments slice order if reversed order.
1260
        if query.Reversed {
3✔
UNCOV
1261
                for l, r := 0, len(resp.Payments)-1; l < r; l, r = l+1, r-1 {
×
UNCOV
1262
                        resp.Payments[l], resp.Payments[r] =
×
UNCOV
1263
                                resp.Payments[r], resp.Payments[l]
×
UNCOV
1264
                }
×
1265
        }
1266

1267
        // Set the first and last index of the returned payments so that the
1268
        // caller can resume from this point later on.
1269
        if len(resp.Payments) > 0 {
6✔
1270
                resp.FirstIndexOffset = resp.Payments[0].SequenceNum
3✔
1271
                resp.LastIndexOffset =
3✔
1272
                        resp.Payments[len(resp.Payments)-1].SequenceNum
3✔
1273
        }
3✔
1274

1275
        return resp, nil
3✔
1276
}
1277

1278
// fetchPaymentWithSequenceNumber get the payment which matches the payment hash
1279
// *and* sequence number provided from the database. This is required because
1280
// we previously had more than one payment per hash, so we have multiple indexes
1281
// pointing to a single payment; we want to retrieve the correct one.
1282
func fetchPaymentWithSequenceNumber(tx kvdb.RTx, paymentHash lntypes.Hash,
1283
        sequenceNumber []byte) (*MPPayment, error) {
3✔
1284

3✔
1285
        // We can now lookup the payment keyed by its hash in
3✔
1286
        // the payments root bucket.
3✔
1287
        bucket, err := fetchPaymentBucket(tx, paymentHash)
3✔
1288
        if err != nil {
3✔
1289
                return nil, err
×
1290
        }
×
1291

1292
        // A single payment hash can have multiple payments associated with it.
1293
        // We lookup our sequence number first, to determine whether this is
1294
        // the payment we are actually looking for.
1295
        seqBytes := bucket.Get(paymentSequenceKey)
3✔
1296
        if seqBytes == nil {
3✔
1297
                return nil, ErrNoSequenceNumber
×
1298
        }
×
1299

1300
        // If this top level payment has the sequence number we are looking for,
1301
        // return it.
1302
        if bytes.Equal(seqBytes, sequenceNumber) {
6✔
1303
                return fetchPayment(bucket)
3✔
1304
        }
3✔
1305

1306
        // If we were not looking for the top level payment, we are looking for
1307
        // one of our duplicate payments. We need to iterate through the seq
1308
        // numbers in this bucket to find the correct payments. If we do not
1309
        // find a duplicate payments bucket here, something is wrong.
UNCOV
1310
        dup := bucket.NestedReadBucket(duplicatePaymentsBucket)
×
UNCOV
1311
        if dup == nil {
×
UNCOV
1312
                return nil, ErrNoDuplicateBucket
×
UNCOV
1313
        }
×
1314

UNCOV
1315
        var duplicatePayment *MPPayment
×
UNCOV
1316
        err = dup.ForEach(func(k, v []byte) error {
×
UNCOV
1317
                subBucket := dup.NestedReadBucket(k)
×
UNCOV
1318
                if subBucket == nil {
×
1319
                        // We one bucket for each duplicate to be found.
×
1320
                        return ErrNoDuplicateNestedBucket
×
1321
                }
×
1322

UNCOV
1323
                seqBytes := subBucket.Get(duplicatePaymentSequenceKey)
×
UNCOV
1324
                if seqBytes == nil {
×
1325
                        return err
×
1326
                }
×
1327

1328
                // If this duplicate payment is not the sequence number we are
1329
                // looking for, we can continue.
UNCOV
1330
                if !bytes.Equal(seqBytes, sequenceNumber) {
×
UNCOV
1331
                        return nil
×
UNCOV
1332
                }
×
1333

UNCOV
1334
                duplicatePayment, err = fetchDuplicatePayment(subBucket)
×
UNCOV
1335
                if err != nil {
×
1336
                        return err
×
1337
                }
×
1338

UNCOV
1339
                return nil
×
1340
        })
UNCOV
1341
        if err != nil {
×
1342
                return nil, err
×
1343
        }
×
1344

1345
        // If none of the duplicate payments matched our sequence number, we
1346
        // failed to find the payment with this sequence number; something is
1347
        // wrong.
UNCOV
1348
        if duplicatePayment == nil {
×
UNCOV
1349
                return nil, ErrDuplicateNotFound
×
UNCOV
1350
        }
×
1351

UNCOV
1352
        return duplicatePayment, nil
×
1353
}
1354

1355
// DeletePayment deletes a payment from the DB given its payment hash. If
1356
// failedHtlcsOnly is set, only failed HTLC attempts of the payment will be
1357
// deleted.
1358
func (p *KVPaymentsDB) DeletePayment(paymentHash lntypes.Hash,
UNCOV
1359
        failedHtlcsOnly bool) error {
×
UNCOV
1360

×
UNCOV
1361
        return kvdb.Update(p.db, func(tx kvdb.RwTx) error {
×
UNCOV
1362
                payments := tx.ReadWriteBucket(paymentsRootBucket)
×
UNCOV
1363
                if payments == nil {
×
1364
                        return nil
×
1365
                }
×
1366

UNCOV
1367
                bucket := payments.NestedReadWriteBucket(paymentHash[:])
×
UNCOV
1368
                if bucket == nil {
×
UNCOV
1369
                        return fmt.Errorf("non bucket element in payments " +
×
UNCOV
1370
                                "bucket")
×
UNCOV
1371
                }
×
1372

1373
                // If the status is InFlight, we cannot safely delete
1374
                // the payment information, so we return early.
UNCOV
1375
                paymentStatus, err := fetchPaymentStatus(bucket)
×
UNCOV
1376
                if err != nil {
×
1377
                        return err
×
1378
                }
×
1379

1380
                // If the payment has inflight HTLCs, we cannot safely delete
1381
                // the payment information, so we return an error.
UNCOV
1382
                if err := paymentStatus.removable(); err != nil {
×
UNCOV
1383
                        return fmt.Errorf("payment '%v' has inflight HTLCs"+
×
UNCOV
1384
                                "and therefore cannot be deleted: %w",
×
UNCOV
1385
                                paymentHash.String(), err)
×
UNCOV
1386
                }
×
1387

1388
                // Delete the failed HTLC attempts we found.
UNCOV
1389
                if failedHtlcsOnly {
×
UNCOV
1390
                        toDelete, err := fetchFailedHtlcKeys(bucket)
×
UNCOV
1391
                        if err != nil {
×
1392
                                return err
×
1393
                        }
×
1394

UNCOV
1395
                        htlcsBucket := bucket.NestedReadWriteBucket(
×
UNCOV
1396
                                paymentHtlcsBucket,
×
UNCOV
1397
                        )
×
UNCOV
1398

×
UNCOV
1399
                        for _, htlcID := range toDelete {
×
UNCOV
1400
                                err = htlcsBucket.Delete(
×
UNCOV
1401
                                        htlcBucketKey(
×
UNCOV
1402
                                                htlcAttemptInfoKey, htlcID,
×
UNCOV
1403
                                        ),
×
UNCOV
1404
                                )
×
UNCOV
1405
                                if err != nil {
×
1406
                                        return err
×
1407
                                }
×
1408

UNCOV
1409
                                err = htlcsBucket.Delete(
×
UNCOV
1410
                                        htlcBucketKey(htlcFailInfoKey, htlcID),
×
UNCOV
1411
                                )
×
UNCOV
1412
                                if err != nil {
×
1413
                                        return err
×
1414
                                }
×
1415

UNCOV
1416
                                err = htlcsBucket.Delete(
×
UNCOV
1417
                                        htlcBucketKey(
×
UNCOV
1418
                                                htlcSettleInfoKey, htlcID,
×
UNCOV
1419
                                        ),
×
UNCOV
1420
                                )
×
UNCOV
1421
                                if err != nil {
×
1422
                                        return err
×
1423
                                }
×
1424
                        }
1425

UNCOV
1426
                        return nil
×
1427
                }
1428

UNCOV
1429
                seqNrs, err := fetchSequenceNumbers(bucket)
×
UNCOV
1430
                if err != nil {
×
1431
                        return err
×
1432
                }
×
1433

UNCOV
1434
                err = payments.DeleteNestedBucket(paymentHash[:])
×
UNCOV
1435
                if err != nil {
×
1436
                        return err
×
1437
                }
×
1438

UNCOV
1439
                indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
×
UNCOV
1440
                for _, k := range seqNrs {
×
UNCOV
1441
                        if err := indexBucket.Delete(k); err != nil {
×
1442
                                return err
×
1443
                        }
×
1444
                }
1445

UNCOV
1446
                return nil
×
UNCOV
1447
        }, func() {})
×
1448
}
1449

1450
// DeletePayments deletes all completed and failed payments from the DB. If
1451
// failedOnly is set, only failed payments will be considered for deletion. If
1452
// failedHtlcsOnly is set, the payment itself won't be deleted, only failed HTLC
1453
// attempts. The method returns the number of deleted payments, which is always
1454
// 0 if failedHtlcsOnly is set.
1455
func (p *KVPaymentsDB) DeletePayments(failedOnly,
1456
        failedHtlcsOnly bool) (int, error) {
3✔
1457

3✔
1458
        var numPayments int
3✔
1459
        err := kvdb.Update(p.db, func(tx kvdb.RwTx) error {
6✔
1460
                payments := tx.ReadWriteBucket(paymentsRootBucket)
3✔
1461
                if payments == nil {
3✔
1462
                        return nil
×
1463
                }
×
1464

1465
                var (
3✔
1466
                        // deleteBuckets is the set of payment buckets we need
3✔
1467
                        // to delete.
3✔
1468
                        deleteBuckets [][]byte
3✔
1469

3✔
1470
                        // deleteIndexes is the set of indexes pointing to these
3✔
1471
                        // payments that need to be deleted.
3✔
1472
                        deleteIndexes [][]byte
3✔
1473

3✔
1474
                        // deleteHtlcs maps a payment hash to the HTLC IDs we
3✔
1475
                        // want to delete for that payment.
3✔
1476
                        deleteHtlcs = make(map[lntypes.Hash][][]byte)
3✔
1477
                )
3✔
1478
                err := payments.ForEach(func(k, _ []byte) error {
6✔
1479
                        bucket := payments.NestedReadBucket(k)
3✔
1480
                        if bucket == nil {
3✔
1481
                                // We only expect sub-buckets to be found in
×
1482
                                // this top-level bucket.
×
1483
                                return fmt.Errorf("non bucket element in " +
×
1484
                                        "payments bucket")
×
1485
                        }
×
1486

1487
                        // If the status is InFlight, we cannot safely delete
1488
                        // the payment information, so we return early.
1489
                        paymentStatus, err := fetchPaymentStatus(bucket)
3✔
1490
                        if err != nil {
3✔
1491
                                return err
×
1492
                        }
×
1493

1494
                        // If the payment has inflight HTLCs, we cannot safely
1495
                        // delete the payment information, so we return an nil
1496
                        // to skip it.
1497
                        if err := paymentStatus.removable(); err != nil {
3✔
UNCOV
1498
                                return nil
×
UNCOV
1499
                        }
×
1500

1501
                        // If we requested to only delete failed payments, we
1502
                        // can return if this one is not.
1503
                        if failedOnly && paymentStatus != StatusFailed {
3✔
UNCOV
1504
                                return nil
×
UNCOV
1505
                        }
×
1506

1507
                        // If we are only deleting failed HTLCs, fetch them.
1508
                        if failedHtlcsOnly {
3✔
UNCOV
1509
                                toDelete, err := fetchFailedHtlcKeys(bucket)
×
UNCOV
1510
                                if err != nil {
×
1511
                                        return err
×
1512
                                }
×
1513

UNCOV
1514
                                hash, err := lntypes.MakeHash(k)
×
UNCOV
1515
                                if err != nil {
×
1516
                                        return err
×
1517
                                }
×
1518

UNCOV
1519
                                deleteHtlcs[hash] = toDelete
×
UNCOV
1520

×
UNCOV
1521
                                // We return, we are only deleting attempts.
×
UNCOV
1522
                                return nil
×
1523
                        }
1524

1525
                        // Add the bucket to the set of buckets we can delete.
1526
                        deleteBuckets = append(deleteBuckets, k)
3✔
1527

3✔
1528
                        // Get all the sequence number associated with the
3✔
1529
                        // payment, including duplicates.
3✔
1530
                        seqNrs, err := fetchSequenceNumbers(bucket)
3✔
1531
                        if err != nil {
3✔
1532
                                return err
×
1533
                        }
×
1534

1535
                        deleteIndexes = append(deleteIndexes, seqNrs...)
3✔
1536
                        numPayments++
3✔
1537

3✔
1538
                        return nil
3✔
1539
                })
1540
                if err != nil {
3✔
1541
                        return err
×
1542
                }
×
1543

1544
                // Delete the failed HTLC attempts we found.
1545
                for hash, htlcIDs := range deleteHtlcs {
3✔
UNCOV
1546
                        bucket := payments.NestedReadWriteBucket(hash[:])
×
UNCOV
1547
                        htlcsBucket := bucket.NestedReadWriteBucket(
×
UNCOV
1548
                                paymentHtlcsBucket,
×
UNCOV
1549
                        )
×
UNCOV
1550

×
UNCOV
1551
                        for _, aid := range htlcIDs {
×
UNCOV
1552
                                if err := htlcsBucket.Delete(
×
UNCOV
1553
                                        htlcBucketKey(htlcAttemptInfoKey, aid),
×
UNCOV
1554
                                ); err != nil {
×
1555
                                        return err
×
1556
                                }
×
1557

UNCOV
1558
                                if err := htlcsBucket.Delete(
×
UNCOV
1559
                                        htlcBucketKey(htlcFailInfoKey, aid),
×
UNCOV
1560
                                ); err != nil {
×
1561
                                        return err
×
1562
                                }
×
1563

UNCOV
1564
                                if err := htlcsBucket.Delete(
×
UNCOV
1565
                                        htlcBucketKey(htlcSettleInfoKey, aid),
×
UNCOV
1566
                                ); err != nil {
×
1567
                                        return err
×
1568
                                }
×
1569
                        }
1570
                }
1571

1572
                for _, k := range deleteBuckets {
6✔
1573
                        if err := payments.DeleteNestedBucket(k); err != nil {
3✔
1574
                                return err
×
1575
                        }
×
1576
                }
1577

1578
                // Get our index bucket and delete all indexes pointing to the
1579
                // payments we are deleting.
1580
                indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
1581
                for _, k := range deleteIndexes {
6✔
1582
                        if err := indexBucket.Delete(k); err != nil {
3✔
1583
                                return err
×
1584
                        }
×
1585
                }
1586

1587
                return nil
3✔
1588
        }, func() {
3✔
1589
                numPayments = 0
3✔
1590
        })
3✔
1591
        if err != nil {
3✔
1592
                return 0, err
×
1593
        }
×
1594

1595
        return numPayments, nil
3✔
1596
}
1597

1598
// fetchSequenceNumbers fetches all the sequence numbers associated with a
1599
// payment, including those belonging to any duplicate payments.
1600
func fetchSequenceNumbers(paymentBucket kvdb.RBucket) ([][]byte, error) {
3✔
1601
        seqNum := paymentBucket.Get(paymentSequenceKey)
3✔
1602
        if seqNum == nil {
3✔
1603
                return nil, errors.New("expected sequence number")
×
1604
        }
×
1605

1606
        sequenceNumbers := [][]byte{seqNum}
3✔
1607

3✔
1608
        // Get the duplicate payments bucket, if it has no duplicates, just
3✔
1609
        // return early with the payment sequence number.
3✔
1610
        duplicates := paymentBucket.NestedReadBucket(duplicatePaymentsBucket)
3✔
1611
        if duplicates == nil {
6✔
1612
                return sequenceNumbers, nil
3✔
1613
        }
3✔
1614

1615
        // If we do have duplicated, they are keyed by sequence number, so we
1616
        // iterate through the duplicates bucket and add them to our set of
1617
        // sequence numbers.
UNCOV
1618
        if err := duplicates.ForEach(func(k, v []byte) error {
×
UNCOV
1619
                sequenceNumbers = append(sequenceNumbers, k)
×
UNCOV
1620
                return nil
×
UNCOV
1621
        }); err != nil {
×
1622
                return nil, err
×
1623
        }
×
1624

UNCOV
1625
        return sequenceNumbers, nil
×
1626
}
1627

1628
func serializePaymentCreationInfo(w io.Writer, c *PaymentCreationInfo) error {
3✔
1629
        var scratch [8]byte
3✔
1630

3✔
1631
        if _, err := w.Write(c.PaymentIdentifier[:]); err != nil {
3✔
1632
                return err
×
1633
        }
×
1634

1635
        byteOrder.PutUint64(scratch[:], uint64(c.Value))
3✔
1636
        if _, err := w.Write(scratch[:]); err != nil {
3✔
1637
                return err
×
1638
        }
×
1639

1640
        if err := serializeTime(w, c.CreationTime); err != nil {
3✔
1641
                return err
×
1642
        }
×
1643

1644
        byteOrder.PutUint32(scratch[:4], uint32(len(c.PaymentRequest)))
3✔
1645
        if _, err := w.Write(scratch[:4]); err != nil {
3✔
1646
                return err
×
1647
        }
×
1648

1649
        if _, err := w.Write(c.PaymentRequest); err != nil {
3✔
1650
                return err
×
1651
        }
×
1652

1653
        // Any remaining bytes are TLV encoded records. Currently, these are
1654
        // only the custom records provided by the user to be sent to the first
1655
        // hop. But this can easily be extended with further records by merging
1656
        // the records into a single TLV stream.
1657
        err := c.FirstHopCustomRecords.SerializeTo(w)
3✔
1658
        if err != nil {
3✔
1659
                return err
×
1660
        }
×
1661

1662
        return nil
3✔
1663
}
1664

1665
func deserializePaymentCreationInfo(r io.Reader) (*PaymentCreationInfo,
1666
        error) {
3✔
1667

3✔
1668
        var scratch [8]byte
3✔
1669

3✔
1670
        c := &PaymentCreationInfo{}
3✔
1671

3✔
1672
        if _, err := io.ReadFull(r, c.PaymentIdentifier[:]); err != nil {
3✔
1673
                return nil, err
×
1674
        }
×
1675

1676
        if _, err := io.ReadFull(r, scratch[:]); err != nil {
3✔
1677
                return nil, err
×
1678
        }
×
1679
        c.Value = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:]))
3✔
1680

3✔
1681
        creationTime, err := deserializeTime(r)
3✔
1682
        if err != nil {
3✔
1683
                return nil, err
×
1684
        }
×
1685
        c.CreationTime = creationTime
3✔
1686

3✔
1687
        if _, err := io.ReadFull(r, scratch[:4]); err != nil {
3✔
1688
                return nil, err
×
1689
        }
×
1690

1691
        reqLen := byteOrder.Uint32(scratch[:4])
3✔
1692
        payReq := make([]byte, reqLen)
3✔
1693
        if reqLen > 0 {
6✔
1694
                if _, err := io.ReadFull(r, payReq); err != nil {
3✔
1695
                        return nil, err
×
1696
                }
×
1697
        }
1698
        c.PaymentRequest = payReq
3✔
1699

3✔
1700
        // Any remaining bytes are TLV encoded records. Currently, these are
3✔
1701
        // only the custom records provided by the user to be sent to the first
3✔
1702
        // hop. But this can easily be extended with further records by merging
3✔
1703
        // the records into a single TLV stream.
3✔
1704
        c.FirstHopCustomRecords, err = lnwire.ParseCustomRecordsFrom(r)
3✔
1705
        if err != nil {
3✔
1706
                return nil, err
×
1707
        }
×
1708

1709
        return c, nil
3✔
1710
}
1711

1712
func serializeHTLCAttemptInfo(w io.Writer, a *HTLCAttemptInfo) error {
3✔
1713
        if err := WriteElements(w, a.sessionKey); err != nil {
3✔
1714
                return err
×
1715
        }
×
1716

1717
        if err := SerializeRoute(w, a.Route); err != nil {
3✔
1718
                return err
×
1719
        }
×
1720

1721
        if err := serializeTime(w, a.AttemptTime); err != nil {
3✔
1722
                return err
×
1723
        }
×
1724

1725
        // If the hash is nil we can just return.
1726
        if a.Hash == nil {
3✔
1727
                return nil
×
1728
        }
×
1729

1730
        if _, err := w.Write(a.Hash[:]); err != nil {
3✔
1731
                return err
×
1732
        }
×
1733

1734
        // Merge the fixed/known records together with the custom records to
1735
        // serialize them as a single blob. We can't do this in SerializeRoute
1736
        // because we're in the middle of the byte stream there. We can only do
1737
        // TLV serialization at the end of the stream, since EOF is allowed for
1738
        // a stream if no more data is expected.
1739
        producers := []tlv.RecordProducer{
3✔
1740
                &a.Route.FirstHopAmount,
3✔
1741
        }
3✔
1742
        tlvData, err := lnwire.MergeAndEncode(
3✔
1743
                producers, nil, a.Route.FirstHopWireCustomRecords,
3✔
1744
        )
3✔
1745
        if err != nil {
3✔
1746
                return err
×
1747
        }
×
1748

1749
        if _, err := w.Write(tlvData); err != nil {
3✔
1750
                return err
×
1751
        }
×
1752

1753
        return nil
3✔
1754
}
1755

1756
func deserializeHTLCAttemptInfo(r io.Reader) (*HTLCAttemptInfo, error) {
3✔
1757
        a := &HTLCAttemptInfo{}
3✔
1758
        err := ReadElements(r, &a.sessionKey)
3✔
1759
        if err != nil {
3✔
1760
                return nil, err
×
1761
        }
×
1762

1763
        a.Route, err = DeserializeRoute(r)
3✔
1764
        if err != nil {
3✔
1765
                return nil, err
×
1766
        }
×
1767

1768
        a.AttemptTime, err = deserializeTime(r)
3✔
1769
        if err != nil {
3✔
1770
                return nil, err
×
1771
        }
×
1772

1773
        hash := lntypes.Hash{}
3✔
1774
        _, err = io.ReadFull(r, hash[:])
3✔
1775

3✔
1776
        switch {
3✔
1777
        // Older payment attempts wouldn't have the hash set, in which case we
1778
        // can just return.
1779
        case errors.Is(err, io.EOF), errors.Is(err, io.ErrUnexpectedEOF):
×
1780
                return a, nil
×
1781

1782
        case err != nil:
×
1783
                return nil, err
×
1784

1785
        default:
3✔
1786
        }
1787

1788
        a.Hash = &hash
3✔
1789

3✔
1790
        // Read any remaining data (if any) and parse it into the known records
3✔
1791
        // and custom records.
3✔
1792
        extraData, err := io.ReadAll(r)
3✔
1793
        if err != nil {
3✔
1794
                return nil, err
×
1795
        }
×
1796

1797
        customRecords, _, _, err := lnwire.ParseAndExtractCustomRecords(
3✔
1798
                extraData, &a.Route.FirstHopAmount,
3✔
1799
        )
3✔
1800
        if err != nil {
3✔
1801
                return nil, err
×
1802
        }
×
1803

1804
        a.Route.FirstHopWireCustomRecords = customRecords
3✔
1805

3✔
1806
        return a, nil
3✔
1807
}
1808

1809
func serializeHop(w io.Writer, h *route.Hop) error {
3✔
1810
        if err := WriteElements(w,
3✔
1811
                h.PubKeyBytes[:],
3✔
1812
                h.ChannelID,
3✔
1813
                h.OutgoingTimeLock,
3✔
1814
                h.AmtToForward,
3✔
1815
        ); err != nil {
3✔
1816
                return err
×
1817
        }
×
1818

1819
        if err := binary.Write(w, byteOrder, h.LegacyPayload); err != nil {
3✔
1820
                return err
×
1821
        }
×
1822

1823
        // For legacy payloads, we don't need to write any TLV records, so
1824
        // we'll write a zero indicating the our serialized TLV map has no
1825
        // records.
1826
        if h.LegacyPayload {
3✔
UNCOV
1827
                return WriteElements(w, uint32(0))
×
UNCOV
1828
        }
×
1829

1830
        // Gather all non-primitive TLV records so that they can be serialized
1831
        // as a single blob.
1832
        //
1833
        // TODO(conner): add migration to unify all fields in a single TLV
1834
        // blobs. The split approach will cause headaches down the road as more
1835
        // fields are added, which we can avoid by having a single TLV stream
1836
        // for all payload fields.
1837
        var records []tlv.Record
3✔
1838
        if h.MPP != nil {
6✔
1839
                records = append(records, h.MPP.Record())
3✔
1840
        }
3✔
1841

1842
        // Add blinding point and encrypted data if present.
1843
        if h.EncryptedData != nil {
6✔
1844
                records = append(records, record.NewEncryptedDataRecord(
3✔
1845
                        &h.EncryptedData,
3✔
1846
                ))
3✔
1847
        }
3✔
1848

1849
        if h.BlindingPoint != nil {
6✔
1850
                records = append(records, record.NewBlindingPointRecord(
3✔
1851
                        &h.BlindingPoint,
3✔
1852
                ))
3✔
1853
        }
3✔
1854

1855
        if h.AMP != nil {
6✔
1856
                records = append(records, h.AMP.Record())
3✔
1857
        }
3✔
1858

1859
        if h.Metadata != nil {
3✔
UNCOV
1860
                records = append(records, record.NewMetadataRecord(&h.Metadata))
×
UNCOV
1861
        }
×
1862

1863
        if h.TotalAmtMsat != 0 {
6✔
1864
                totalMsatInt := uint64(h.TotalAmtMsat)
3✔
1865
                records = append(
3✔
1866
                        records, record.NewTotalAmtMsatBlinded(&totalMsatInt),
3✔
1867
                )
3✔
1868
        }
3✔
1869

1870
        // Final sanity check to absolutely rule out custom records that are not
1871
        // custom and write into the standard range.
1872
        if err := h.CustomRecords.Validate(); err != nil {
3✔
1873
                return err
×
1874
        }
×
1875

1876
        // Convert custom records to tlv and add to the record list.
1877
        // MapToRecords sorts the list, so adding it here will keep the list
1878
        // canonical.
1879
        tlvRecords := tlv.MapToRecords(h.CustomRecords)
3✔
1880
        records = append(records, tlvRecords...)
3✔
1881

3✔
1882
        // Otherwise, we'll transform our slice of records into a map of the
3✔
1883
        // raw bytes, then serialize them in-line with a length (number of
3✔
1884
        // elements) prefix.
3✔
1885
        mapRecords, err := tlv.RecordsToMap(records)
3✔
1886
        if err != nil {
3✔
1887
                return err
×
1888
        }
×
1889

1890
        numRecords := uint32(len(mapRecords))
3✔
1891
        if err := WriteElements(w, numRecords); err != nil {
3✔
1892
                return err
×
1893
        }
×
1894

1895
        for recordType, rawBytes := range mapRecords {
6✔
1896
                if err := WriteElements(w, recordType); err != nil {
3✔
1897
                        return err
×
1898
                }
×
1899

1900
                if err := wire.WriteVarBytes(w, 0, rawBytes); err != nil {
3✔
1901
                        return err
×
1902
                }
×
1903
        }
1904

1905
        return nil
3✔
1906
}
1907

1908
// maxOnionPayloadSize is the size of the largest possible Sphinx payload. It
// serves as the upper bound when reading a serialized TLV record, as no
// stream larger than this ever needs to be read or written.
const maxOnionPayloadSize = 1300
1911

1912
func deserializeHop(r io.Reader) (*route.Hop, error) {
3✔
1913
        h := &route.Hop{}
3✔
1914

3✔
1915
        var pub []byte
3✔
1916
        if err := ReadElements(r, &pub); err != nil {
3✔
1917
                return nil, err
×
1918
        }
×
1919
        copy(h.PubKeyBytes[:], pub)
3✔
1920

3✔
1921
        if err := ReadElements(r,
3✔
1922
                &h.ChannelID, &h.OutgoingTimeLock, &h.AmtToForward,
3✔
1923
        ); err != nil {
3✔
1924
                return nil, err
×
1925
        }
×
1926

1927
        // TODO(roasbeef): change field to allow LegacyPayload false to be the
1928
        // legacy default?
1929
        err := binary.Read(r, byteOrder, &h.LegacyPayload)
3✔
1930
        if err != nil {
3✔
1931
                return nil, err
×
1932
        }
×
1933

1934
        var numElements uint32
3✔
1935
        if err := ReadElements(r, &numElements); err != nil {
3✔
1936
                return nil, err
×
1937
        }
×
1938

1939
        // If there're no elements, then we can return early.
1940
        if numElements == 0 {
6✔
1941
                return h, nil
3✔
1942
        }
3✔
1943

1944
        tlvMap := make(map[uint64][]byte)
3✔
1945
        for i := uint32(0); i < numElements; i++ {
6✔
1946
                var tlvType uint64
3✔
1947
                if err := ReadElements(r, &tlvType); err != nil {
3✔
1948
                        return nil, err
×
1949
                }
×
1950

1951
                rawRecordBytes, err := wire.ReadVarBytes(
3✔
1952
                        r, 0, maxOnionPayloadSize, "tlv",
3✔
1953
                )
3✔
1954
                if err != nil {
3✔
1955
                        return nil, err
×
1956
                }
×
1957

1958
                tlvMap[tlvType] = rawRecordBytes
3✔
1959
        }
1960

1961
        // If the MPP type is present, remove it from the generic TLV map and
1962
        // parse it back into a proper MPP struct.
1963
        //
1964
        // TODO(conner): add migration to unify all fields in a single TLV
1965
        // blobs. The split approach will cause headaches down the road as more
1966
        // fields are added, which we can avoid by having a single TLV stream
1967
        // for all payload fields.
1968
        mppType := uint64(record.MPPOnionType)
3✔
1969
        if mppBytes, ok := tlvMap[mppType]; ok {
6✔
1970
                delete(tlvMap, mppType)
3✔
1971

3✔
1972
                var (
3✔
1973
                        mpp    = &record.MPP{}
3✔
1974
                        mppRec = mpp.Record()
3✔
1975
                        r      = bytes.NewReader(mppBytes)
3✔
1976
                )
3✔
1977
                err := mppRec.Decode(r, uint64(len(mppBytes)))
3✔
1978
                if err != nil {
3✔
1979
                        return nil, err
×
1980
                }
×
1981
                h.MPP = mpp
3✔
1982
        }
1983

1984
        // If encrypted data or blinding key are present, remove them from
1985
        // the TLV map and parse into proper types.
1986
        encryptedDataType := uint64(record.EncryptedDataOnionType)
3✔
1987
        if data, ok := tlvMap[encryptedDataType]; ok {
6✔
1988
                delete(tlvMap, encryptedDataType)
3✔
1989
                h.EncryptedData = data
3✔
1990
        }
3✔
1991

1992
        blindingType := uint64(record.BlindingPointOnionType)
3✔
1993
        if blindingPoint, ok := tlvMap[blindingType]; ok {
6✔
1994
                delete(tlvMap, blindingType)
3✔
1995

3✔
1996
                h.BlindingPoint, err = btcec.ParsePubKey(blindingPoint)
3✔
1997
                if err != nil {
3✔
1998
                        return nil, fmt.Errorf("invalid blinding point: %w",
×
1999
                                err)
×
2000
                }
×
2001
        }
2002

2003
        ampType := uint64(record.AMPOnionType)
3✔
2004
        if ampBytes, ok := tlvMap[ampType]; ok {
6✔
2005
                delete(tlvMap, ampType)
3✔
2006

3✔
2007
                var (
3✔
2008
                        amp    = &record.AMP{}
3✔
2009
                        ampRec = amp.Record()
3✔
2010
                        r      = bytes.NewReader(ampBytes)
3✔
2011
                )
3✔
2012
                err := ampRec.Decode(r, uint64(len(ampBytes)))
3✔
2013
                if err != nil {
3✔
2014
                        return nil, err
×
2015
                }
×
2016
                h.AMP = amp
3✔
2017
        }
2018

2019
        // If the metadata type is present, remove it from the tlv map and
2020
        // populate directly on the hop.
2021
        metadataType := uint64(record.MetadataOnionType)
3✔
2022
        if metadata, ok := tlvMap[metadataType]; ok {
3✔
UNCOV
2023
                delete(tlvMap, metadataType)
×
UNCOV
2024

×
UNCOV
2025
                h.Metadata = metadata
×
UNCOV
2026
        }
×
2027

2028
        totalAmtMsatType := uint64(record.TotalAmtMsatBlindedType)
3✔
2029
        if totalAmtMsat, ok := tlvMap[totalAmtMsatType]; ok {
6✔
2030
                delete(tlvMap, totalAmtMsatType)
3✔
2031

3✔
2032
                var (
3✔
2033
                        totalAmtMsatInt uint64
3✔
2034
                        buf             [8]byte
3✔
2035
                )
3✔
2036
                if err := tlv.DTUint64(
3✔
2037
                        bytes.NewReader(totalAmtMsat),
3✔
2038
                        &totalAmtMsatInt,
3✔
2039
                        &buf,
3✔
2040
                        uint64(len(totalAmtMsat)),
3✔
2041
                ); err != nil {
3✔
2042
                        return nil, err
×
2043
                }
×
2044

2045
                h.TotalAmtMsat = lnwire.MilliSatoshi(totalAmtMsatInt)
3✔
2046
        }
2047

2048
        h.CustomRecords = tlvMap
3✔
2049

3✔
2050
        return h, nil
3✔
2051
}
2052

2053
// SerializeRoute serializes a route.
2054
func SerializeRoute(w io.Writer, r route.Route) error {
3✔
2055
        if err := WriteElements(w,
3✔
2056
                r.TotalTimeLock, r.TotalAmount, r.SourcePubKey[:],
3✔
2057
        ); err != nil {
3✔
2058
                return err
×
2059
        }
×
2060

2061
        if err := WriteElements(w, uint32(len(r.Hops))); err != nil {
3✔
2062
                return err
×
2063
        }
×
2064

2065
        for _, h := range r.Hops {
6✔
2066
                if err := serializeHop(w, h); err != nil {
3✔
2067
                        return err
×
2068
                }
×
2069
        }
2070

2071
        // Any new/extra TLV data is encoded in serializeHTLCAttemptInfo!
2072

2073
        return nil
3✔
2074
}
2075

2076
// DeserializeRoute deserializes a route.
2077
func DeserializeRoute(r io.Reader) (route.Route, error) {
3✔
2078
        rt := route.Route{}
3✔
2079
        if err := ReadElements(r,
3✔
2080
                &rt.TotalTimeLock, &rt.TotalAmount,
3✔
2081
        ); err != nil {
3✔
2082
                return rt, err
×
2083
        }
×
2084

2085
        var pub []byte
3✔
2086
        if err := ReadElements(r, &pub); err != nil {
3✔
2087
                return rt, err
×
2088
        }
×
2089
        copy(rt.SourcePubKey[:], pub)
3✔
2090

3✔
2091
        var numHops uint32
3✔
2092
        if err := ReadElements(r, &numHops); err != nil {
3✔
2093
                return rt, err
×
2094
        }
×
2095

2096
        var hops []*route.Hop
3✔
2097
        for i := uint32(0); i < numHops; i++ {
6✔
2098
                hop, err := deserializeHop(r)
3✔
2099
                if err != nil {
3✔
2100
                        return rt, err
×
2101
                }
×
2102
                hops = append(hops, hop)
3✔
2103
        }
2104
        rt.Hops = hops
3✔
2105

3✔
2106
        // Any new/extra TLV data is decoded in deserializeHTLCAttemptInfo!
3✔
2107

3✔
2108
        return rt, nil
3✔
2109
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc