• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 17101605539

20 Aug 2025 02:35PM UTC coverage: 57.321% (-9.4%) from 66.68%
17101605539

push

github

web-flow
Merge pull request #10102 from yyforyongyu/fix-UpdatesInHorizon

Catch bad gossip peer and fix `UpdatesInHorizon`

28 of 89 new or added lines in 4 files covered. (31.46%)

29163 existing lines in 459 files now uncovered.

99187 of 173038 relevant lines covered (57.32%)

1.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.62
/payments/db/kv_store.go
1
package paymentsdb
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/binary"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "math"
11
        "sort"
12
        "sync"
13
        "time"
14

15
        "github.com/btcsuite/btcd/btcec/v2"
16
        "github.com/btcsuite/btcd/wire"
17
        "github.com/lightningnetwork/lnd/channeldb"
18
        "github.com/lightningnetwork/lnd/kvdb"
19
        "github.com/lightningnetwork/lnd/lntypes"
20
        "github.com/lightningnetwork/lnd/lnwire"
21
        "github.com/lightningnetwork/lnd/record"
22
        "github.com/lightningnetwork/lnd/routing/route"
23
        "github.com/lightningnetwork/lnd/tlv"
24
)
25

26
const (
27
        // paymentSeqBlockSize is the block size used when we batch allocate
28
        // payment sequences for future payments.
29
        paymentSeqBlockSize = 1000
30

31
        // paymentProgressLogInterval is the interval we use to limit the
32
        // logging output of payment processing.
33
        paymentProgressLogInterval = 30 * time.Second
34
)
35

36
//nolint:ll
37
var (
38
        // paymentsRootBucket is the name of the top-level bucket within the
39
        // database that stores all data related to payments. Within this
40
        // bucket, each payment has its own sub-bucket keyed by its payment
41
        // hash.
42
        //
43
        // Bucket hierarchy:
44
        //
45
        // root-bucket
46
        //      |
47
        //      |-- <paymenthash>
48
        //      |        |--sequence-key: <sequence number>
49
        //      |        |--creation-info-key: <creation info>
50
        //      |        |--fail-info-key: <(optional) fail info>
51
        //      |        |
52
        //      |        |--payment-htlcs-bucket (shard-bucket)
53
        //      |        |        |
54
        //      |        |        |-- ai<htlc attempt ID>: <htlc attempt info>
55
        //      |        |        |-- si<htlc attempt ID>: <(optional) settle info>
56
        //      |        |        |-- fi<htlc attempt ID>: <(optional) fail info>
57
        //      |        |        |
58
        //      |        |       ...
59
        //      |        |
60
        //      |        |
61
        //      |        |--duplicate-bucket (only for old, completed payments)
62
        //      |                 |
63
        //      |                 |-- <seq-num>
64
        //      |                 |       |--sequence-key: <sequence number>
65
        //      |                 |       |--creation-info-key: <creation info>
66
        //      |                 |       |--ai: <attempt info>
67
        //      |                 |       |--si: <settle info>
68
        //      |                 |       |--fi: <fail info>
69
        //      |                 |
70
        //      |                 |-- <seq-num>
71
        //      |                 |       |
72
        //      |                ...     ...
73
        //      |
74
        //      |-- <paymenthash>
75
        //      |        |
76
        //      |       ...
77
        //     ...
78
        //
79
        paymentsRootBucket = []byte("payments-root-bucket")
80

81
        // paymentSequenceKey is a key used in the payment's sub-bucket to
82
        // store the sequence number of the payment.
83
        paymentSequenceKey = []byte("payment-sequence-key")
84

85
        // paymentCreationInfoKey is a key used in the payment's sub-bucket to
86
        // store the creation info of the payment.
87
        paymentCreationInfoKey = []byte("payment-creation-info")
88

89
        // paymentHtlcsBucket is a bucket where we'll store the information
90
        // about the HTLCs that were attempted for a payment.
91
        paymentHtlcsBucket = []byte("payment-htlcs-bucket")
92

93
        // htlcAttemptInfoKey is the key used as the prefix of an HTLC attempt
94
        // to store the info about the attempt that was done for the HTLC in
95
        // question. The HTLC attempt ID is concatenated at the end.
96
        htlcAttemptInfoKey = []byte("ai")
97

98
        // htlcSettleInfoKey is the key used as the prefix of an HTLC attempt
99
        // settle info, if any. The HTLC attempt ID is concatenated at the end.
100
        htlcSettleInfoKey = []byte("si")
101

102
        // htlcFailInfoKey is the key used as the prefix of an HTLC attempt
103
        // failure information, if any. The HTLC attempt ID is concatenated at
104
        // the end.
105
        htlcFailInfoKey = []byte("fi")
106

107
        // paymentFailInfoKey is a key used in the payment's sub-bucket to
108
        // store information about the reason a payment failed.
109
        paymentFailInfoKey = []byte("payment-fail-info")
110

111
        // paymentsIndexBucket is the name of the top-level bucket within the
112
        // database that stores an index of payment sequence numbers to its
113
        // payment hash.
114
        // payments-index-bucket
115
        //         |--<sequence-number>: <payment hash>
116
        //         |--...
117
        //         |--<sequence-number>: <payment hash>
118
        paymentsIndexBucket = []byte("payments-index-bucket")
119
)
120

121
// KVStore implements persistence for payments and payment attempts.
type KVStore struct {
	// Sequence management for the kv store. seqMu is held by
	// nextPaymentSequence while a new payment sequence number is produced.
	// NOTE(review): currSeq and storedSeq presumably track the next
	// sequence to hand out and the upper bound already persisted to the
	// DB — confirm against nextPaymentSequence.
	seqMu     sync.Mutex
	currSeq   uint64
	storedSeq uint64

	// db is the underlying database implementation.
	db kvdb.Backend

	// keepFailedPaymentAttempts is a flag that indicates whether we should
	// keep failed payment attempts in the database. When it is false,
	// DeleteFailedAttempts removes the failed HTLCs of a payment.
	keepFailedPaymentAttempts bool
}
135

136
// defaultKVStoreOptions returns the default options for the KV store.
137
func defaultKVStoreOptions() *StoreOptions {
3✔
138
        return &StoreOptions{
3✔
139
                KeepFailedPaymentAttempts: false,
3✔
140
        }
3✔
141
}
3✔
142

143
// NewKVStore creates a new KVStore for payments.
144
func NewKVStore(db kvdb.Backend,
145
        options ...OptionModifier) (*KVStore, error) {
3✔
146

3✔
147
        opts := defaultKVStoreOptions()
3✔
148
        for _, applyOption := range options {
6✔
149
                applyOption(opts)
3✔
150
        }
3✔
151

152
        if !opts.NoMigration {
6✔
153
                if err := initKVStore(db); err != nil {
3✔
154
                        return nil, err
×
UNCOV
155
                }
×
156
        }
157

158
        return &KVStore{
3✔
159
                db:                        db,
3✔
160
                keepFailedPaymentAttempts: opts.KeepFailedPaymentAttempts,
3✔
161
        }, nil
3✔
162
}
163

164
// paymentsTopLevelBuckets is a list of top-level buckets that are used for
165
// the payments database when using the kv store.
166
var paymentsTopLevelBuckets = [][]byte{
167
        paymentsRootBucket,
168
        paymentsIndexBucket,
169
}
170

171
// initKVStore creates and initializes the top-level buckets for the payment db.
172
func initKVStore(db kvdb.Backend) error {
3✔
173
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
6✔
174
                for _, tlb := range paymentsTopLevelBuckets {
6✔
175
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
3✔
176
                                return err
×
UNCOV
177
                        }
×
178
                }
179

180
                return nil
3✔
181
        }, func() {})
3✔
182
        if err != nil {
3✔
183
                return fmt.Errorf("unable to create new payments db: %w", err)
×
UNCOV
184
        }
×
185

186
        return nil
3✔
187
}
188

189
// InitPayment checks or records the given PaymentCreationInfo with the DB,
190
// making sure it does not already exist as an in-flight payment. When this
191
// method returns successfully, the payment is guaranteed to be in the InFlight
192
// state.
193
func (p *KVStore) InitPayment(paymentHash lntypes.Hash,
194
        info *PaymentCreationInfo) error {
3✔
195

3✔
196
        // Obtain a new sequence number for this payment. This is used
3✔
197
        // to sort the payments in order of creation, and also acts as
3✔
198
        // a unique identifier for each payment.
3✔
199
        sequenceNum, err := p.nextPaymentSequence()
3✔
200
        if err != nil {
3✔
201
                return err
×
UNCOV
202
        }
×
203

204
        var b bytes.Buffer
3✔
205
        if err := serializePaymentCreationInfo(&b, info); err != nil {
3✔
206
                return err
×
UNCOV
207
        }
×
208
        infoBytes := b.Bytes()
3✔
209

3✔
210
        var updateErr error
3✔
211
        err = kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
6✔
212
                // Reset the update error, to avoid carrying over an error
3✔
213
                // from a previous execution of the batched db transaction.
3✔
214
                updateErr = nil
3✔
215

3✔
216
                prefetchPayment(tx, paymentHash)
3✔
217
                bucket, err := createPaymentBucket(tx, paymentHash)
3✔
218
                if err != nil {
3✔
219
                        return err
×
UNCOV
220
                }
×
221

222
                // Get the existing status of this payment, if any.
223
                paymentStatus, err := fetchPaymentStatus(bucket)
3✔
224

3✔
225
                switch {
3✔
226
                // If no error is returned, it means we already have this
227
                // payment. We'll check the status to decide whether we allow
228
                // retrying the payment or return a specific error.
229
                case err == nil:
3✔
230
                        if err := paymentStatus.initializable(); err != nil {
6✔
231
                                updateErr = err
3✔
232
                                return nil
3✔
233
                        }
3✔
234

235
                // Otherwise, if the error is not `ErrPaymentNotInitiated`,
236
                // we'll return the error.
237
                case !errors.Is(err, ErrPaymentNotInitiated):
×
UNCOV
238
                        return err
×
239
                }
240

241
                // Before we set our new sequence number, we check whether this
242
                // payment has a previously set sequence number and remove its
243
                // index entry if it exists. This happens in the case where we
244
                // have a previously attempted payment which was left in a state
245
                // where we can retry.
246
                seqBytes := bucket.Get(paymentSequenceKey)
3✔
247
                if seqBytes != nil {
6✔
248
                        indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
249
                        if err := indexBucket.Delete(seqBytes); err != nil {
3✔
250
                                return err
×
UNCOV
251
                        }
×
252
                }
253

254
                // Once we have obtained a sequence number, we add an entry
255
                // to our index bucket which will map the sequence number to
256
                // our payment identifier.
257
                err = createPaymentIndexEntry(
3✔
258
                        tx, sequenceNum, info.PaymentIdentifier,
3✔
259
                )
3✔
260
                if err != nil {
3✔
261
                        return err
×
UNCOV
262
                }
×
263

264
                err = bucket.Put(paymentSequenceKey, sequenceNum)
3✔
265
                if err != nil {
3✔
266
                        return err
×
UNCOV
267
                }
×
268

269
                // Add the payment info to the bucket, which contains the
270
                // static information for this payment
271
                err = bucket.Put(paymentCreationInfoKey, infoBytes)
3✔
272
                if err != nil {
3✔
273
                        return err
×
UNCOV
274
                }
×
275

276
                // We'll delete any lingering HTLCs to start with, in case we
277
                // are initializing a payment that was attempted earlier, but
278
                // left in a state where we could retry.
279
                err = bucket.DeleteNestedBucket(paymentHtlcsBucket)
3✔
280
                if err != nil && !errors.Is(err, kvdb.ErrBucketNotFound) {
3✔
281
                        return err
×
UNCOV
282
                }
×
283

284
                // Also delete any lingering failure info now that we are
285
                // re-attempting.
286
                return bucket.Delete(paymentFailInfoKey)
3✔
287
        })
288
        if err != nil {
3✔
289
                return fmt.Errorf("unable to init payment: %w", err)
×
UNCOV
290
        }
×
291

292
        return updateErr
3✔
293
}
294

295
// DeleteFailedAttempts deletes all failed htlcs for a payment if configured
296
// by the KVStore db.
297
func (p *KVStore) DeleteFailedAttempts(hash lntypes.Hash) error {
3✔
298
        if !p.keepFailedPaymentAttempts {
3✔
UNCOV
299
                const failedHtlcsOnly = true
×
UNCOV
300
                err := p.DeletePayment(hash, failedHtlcsOnly)
×
UNCOV
301
                if err != nil {
×
UNCOV
302
                        return err
×
UNCOV
303
                }
×
304
        }
305

306
        return nil
3✔
307
}
308

309
// paymentIndexType tags a payment index entry; the hash type means we have
310
// created an index of payment sequence number to payment hash.
311
type paymentIndexType uint8
312

313
// paymentIndexTypeHash is a payment index type which indicates that we have
314
// created an index of payment sequence number to payment hash.
315
const paymentIndexTypeHash paymentIndexType = 0
316

317
// createPaymentIndexEntry creates a payment hash typed index for a payment. The
318
// index produced contains a payment index type (which can be used in future to
319
// signal different payment index types) and the payment identifier.
320
func createPaymentIndexEntry(tx kvdb.RwTx, sequenceNumber []byte,
321
        id lntypes.Hash) error {
3✔
322

3✔
323
        var b bytes.Buffer
3✔
324
        if err := WriteElements(&b, paymentIndexTypeHash, id[:]); err != nil {
3✔
325
                return err
×
UNCOV
326
        }
×
327

328
        indexes := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
329

3✔
330
        return indexes.Put(sequenceNumber, b.Bytes())
3✔
331
}
332

333
// deserializePaymentIndex deserializes a payment index entry. This function
334
// currently only supports deserialization of payment hash indexes, and will
335
// fail for other types.
336
func deserializePaymentIndex(r io.Reader) (lntypes.Hash, error) {
3✔
337
        var (
3✔
338
                indexType   paymentIndexType
3✔
339
                paymentHash []byte
3✔
340
        )
3✔
341

3✔
342
        if err := ReadElements(r, &indexType, &paymentHash); err != nil {
3✔
343
                return lntypes.Hash{}, err
×
UNCOV
344
        }
×
345

346
        // While we only have on payment index type, we do not need to use our
347
        // index type to deserialize the index. However, we sanity check that
348
        // this type is as expected, since we had to read it out anyway.
349
        if indexType != paymentIndexTypeHash {
3✔
350
                return lntypes.Hash{}, fmt.Errorf("unknown payment index "+
×
351
                        "type: %v", indexType)
×
UNCOV
352
        }
×
353

354
        hash, err := lntypes.MakeHash(paymentHash)
3✔
355
        if err != nil {
3✔
356
                return lntypes.Hash{}, err
×
UNCOV
357
        }
×
358

359
        return hash, nil
3✔
360
}
361

362
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
363
// DB.
364
func (p *KVStore) RegisterAttempt(paymentHash lntypes.Hash,
365
        attempt *HTLCAttemptInfo) (*MPPayment, error) {
3✔
366

3✔
367
        // Serialize the information before opening the db transaction.
3✔
368
        var a bytes.Buffer
3✔
369
        err := serializeHTLCAttemptInfo(&a, attempt)
3✔
370
        if err != nil {
3✔
371
                return nil, err
×
UNCOV
372
        }
×
373
        htlcInfoBytes := a.Bytes()
3✔
374

3✔
375
        htlcIDBytes := make([]byte, 8)
3✔
376
        binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID)
3✔
377

3✔
378
        var payment *MPPayment
3✔
379
        err = kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
6✔
380
                prefetchPayment(tx, paymentHash)
3✔
381
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
382
                if err != nil {
3✔
383
                        return err
×
UNCOV
384
                }
×
385

386
                payment, err = fetchPayment(bucket)
3✔
387
                if err != nil {
3✔
388
                        return err
×
UNCOV
389
                }
×
390

391
                // Check if registering a new attempt is allowed.
392
                if err := payment.Registrable(); err != nil {
3✔
UNCOV
393
                        return err
×
UNCOV
394
                }
×
395

396
                // If the final hop has encrypted data, then we know this is a
397
                // blinded payment. In blinded payments, MPP records are not set
398
                // for split payments and the recipient is responsible for using
399
                // a consistent PathID across the various encrypted data
400
                // payloads that we received from them for this payment. All we
401
                // need to check is that the total amount field for each HTLC
402
                // in the split payment is correct.
403
                isBlinded := len(attempt.Route.FinalHop().EncryptedData) != 0
3✔
404

3✔
405
                // Make sure any existing shards match the new one with regards
3✔
406
                // to MPP options.
3✔
407
                mpp := attempt.Route.FinalHop().MPP
3✔
408

3✔
409
                // MPP records should not be set for attempts to blinded paths.
3✔
410
                if isBlinded && mpp != nil {
3✔
411
                        return ErrMPPRecordInBlindedPayment
×
UNCOV
412
                }
×
413

414
                for _, h := range payment.InFlightHTLCs() {
6✔
415
                        hMpp := h.Route.FinalHop().MPP
3✔
416

3✔
417
                        // If this is a blinded payment, then no existing HTLCs
3✔
418
                        // should have MPP records.
3✔
419
                        if isBlinded && hMpp != nil {
3✔
420
                                return ErrMPPRecordInBlindedPayment
×
UNCOV
421
                        }
×
422

423
                        // If this is a blinded payment, then we just need to
424
                        // check that the TotalAmtMsat field for this shard
425
                        // is equal to that of any other shard in the same
426
                        // payment.
427
                        if isBlinded {
6✔
428
                                if attempt.Route.FinalHop().TotalAmtMsat !=
3✔
429
                                        h.Route.FinalHop().TotalAmtMsat {
3✔
430

×
431
                                        //nolint:ll
×
432
                                        return ErrBlindedPaymentTotalAmountMismatch
×
UNCOV
433
                                }
×
434

435
                                continue
3✔
436
                        }
437

438
                        switch {
3✔
439
                        // We tried to register a non-MPP attempt for a MPP
440
                        // payment.
UNCOV
441
                        case mpp == nil && hMpp != nil:
×
UNCOV
442
                                return ErrMPPayment
×
443

444
                        // We tried to register a MPP shard for a non-MPP
445
                        // payment.
UNCOV
446
                        case mpp != nil && hMpp == nil:
×
UNCOV
447
                                return ErrNonMPPayment
×
448

449
                        // Non-MPP payment, nothing more to validate.
450
                        case mpp == nil:
×
UNCOV
451
                                continue
×
452
                        }
453

454
                        // Check that MPP options match.
455
                        if mpp.PaymentAddr() != hMpp.PaymentAddr() {
3✔
UNCOV
456
                                return ErrMPPPaymentAddrMismatch
×
UNCOV
457
                        }
×
458

459
                        if mpp.TotalMsat() != hMpp.TotalMsat() {
3✔
UNCOV
460
                                return ErrMPPTotalAmountMismatch
×
UNCOV
461
                        }
×
462
                }
463

464
                // If this is a non-MPP attempt, it must match the total amount
465
                // exactly. Note that a blinded payment is considered an MPP
466
                // attempt.
467
                amt := attempt.Route.ReceiverAmt()
3✔
468
                if !isBlinded && mpp == nil && amt != payment.Info.Value {
3✔
469
                        return ErrValueMismatch
×
UNCOV
470
                }
×
471

472
                // Ensure we aren't sending more than the total payment amount.
473
                sentAmt, _ := payment.SentAmt()
3✔
474
                if sentAmt+amt > payment.Info.Value {
3✔
UNCOV
475
                        return fmt.Errorf("%w: attempted=%v, payment amount="+
×
UNCOV
476
                                "%v", ErrValueExceedsAmt,
×
UNCOV
477
                                sentAmt+amt, payment.Info.Value)
×
UNCOV
478
                }
×
479

480
                htlcsBucket, err := bucket.CreateBucketIfNotExists(
3✔
481
                        paymentHtlcsBucket,
3✔
482
                )
3✔
483
                if err != nil {
3✔
484
                        return err
×
UNCOV
485
                }
×
486

487
                err = htlcsBucket.Put(
3✔
488
                        htlcBucketKey(htlcAttemptInfoKey, htlcIDBytes),
3✔
489
                        htlcInfoBytes,
3✔
490
                )
3✔
491
                if err != nil {
3✔
492
                        return err
×
UNCOV
493
                }
×
494

495
                // Retrieve attempt info for the notification.
496
                payment, err = fetchPayment(bucket)
3✔
497

3✔
498
                return err
3✔
499
        })
500
        if err != nil {
3✔
UNCOV
501
                return nil, err
×
UNCOV
502
        }
×
503

504
        return payment, err
3✔
505
}
506

507
// SettleAttempt marks the given attempt settled with the preimage. If this is
508
// a multi shard payment, this might implicitly mean that the full payment
509
// succeeded.
510
//
511
// After invoking this method, InitPayment should always return an error to
512
// prevent us from making duplicate payments to the same payment hash. The
513
// provided preimage is atomically saved to the DB for record keeping.
514
func (p *KVStore) SettleAttempt(hash lntypes.Hash,
515
        attemptID uint64, settleInfo *HTLCSettleInfo) (*MPPayment, error) {
3✔
516

3✔
517
        var b bytes.Buffer
3✔
518
        if err := serializeHTLCSettleInfo(&b, settleInfo); err != nil {
3✔
519
                return nil, err
×
UNCOV
520
        }
×
521
        settleBytes := b.Bytes()
3✔
522

3✔
523
        return p.updateHtlcKey(hash, attemptID, htlcSettleInfoKey, settleBytes)
3✔
524
}
525

526
// FailAttempt marks the given payment attempt failed.
527
func (p *KVStore) FailAttempt(hash lntypes.Hash,
528
        attemptID uint64, failInfo *HTLCFailInfo) (*MPPayment, error) {
3✔
529

3✔
530
        var b bytes.Buffer
3✔
531
        if err := serializeHTLCFailInfo(&b, failInfo); err != nil {
3✔
532
                return nil, err
×
UNCOV
533
        }
×
534
        failBytes := b.Bytes()
3✔
535

3✔
536
        return p.updateHtlcKey(hash, attemptID, htlcFailInfoKey, failBytes)
3✔
537
}
538

539
// updateHtlcKey updates a database key for the specified htlc.
540
func (p *KVStore) updateHtlcKey(paymentHash lntypes.Hash,
541
        attemptID uint64, key, value []byte) (*MPPayment, error) {
3✔
542

3✔
543
        aid := make([]byte, 8)
3✔
544
        binary.BigEndian.PutUint64(aid, attemptID)
3✔
545

3✔
546
        var payment *MPPayment
3✔
547
        err := kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
6✔
548
                payment = nil
3✔
549

3✔
550
                prefetchPayment(tx, paymentHash)
3✔
551
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
552
                if err != nil {
3✔
UNCOV
553
                        return err
×
UNCOV
554
                }
×
555

556
                p, err := fetchPayment(bucket)
3✔
557
                if err != nil {
3✔
558
                        return err
×
UNCOV
559
                }
×
560

561
                // We can only update keys of in-flight payments. We allow
562
                // updating keys even if the payment has reached a terminal
563
                // condition, since the HTLC outcomes must still be updated.
564
                if err := p.Status.updatable(); err != nil {
3✔
565
                        return err
×
UNCOV
566
                }
×
567

568
                htlcsBucket := bucket.NestedReadWriteBucket(paymentHtlcsBucket)
3✔
569
                if htlcsBucket == nil {
3✔
570
                        return fmt.Errorf("htlcs bucket not found")
×
UNCOV
571
                }
×
572

573
                attemptKey := htlcBucketKey(htlcAttemptInfoKey, aid)
3✔
574
                if htlcsBucket.Get(attemptKey) == nil {
3✔
575
                        return fmt.Errorf("HTLC with ID %v not registered",
×
576
                                attemptID)
×
UNCOV
577
                }
×
578

579
                // Make sure the shard is not already failed or settled.
580
                failKey := htlcBucketKey(htlcFailInfoKey, aid)
3✔
581
                if htlcsBucket.Get(failKey) != nil {
3✔
582
                        return ErrAttemptAlreadyFailed
×
UNCOV
583
                }
×
584

585
                settleKey := htlcBucketKey(htlcSettleInfoKey, aid)
3✔
586
                if htlcsBucket.Get(settleKey) != nil {
3✔
587
                        return ErrAttemptAlreadySettled
×
UNCOV
588
                }
×
589

590
                // Add or update the key for this htlc.
591
                err = htlcsBucket.Put(htlcBucketKey(key, aid), value)
3✔
592
                if err != nil {
3✔
593
                        return err
×
UNCOV
594
                }
×
595

596
                // Retrieve attempt info for the notification.
597
                payment, err = fetchPayment(bucket)
3✔
598

3✔
599
                return err
3✔
600
        })
601
        if err != nil {
3✔
UNCOV
602
                return nil, err
×
UNCOV
603
        }
×
604

605
        return payment, err
3✔
606
}
607

608
// Fail transitions a payment into the Failed state, and records the reason the
609
// payment failed. After invoking this method, InitPayment should return nil on
610
// its next call for this payment hash, allowing the switch to make a
611
// subsequent payment.
612
func (p *KVStore) Fail(paymentHash lntypes.Hash,
613
        reason FailureReason) (*MPPayment, error) {
3✔
614

3✔
615
        var (
3✔
616
                updateErr error
3✔
617
                payment   *MPPayment
3✔
618
        )
3✔
619
        err := kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
6✔
620
                // Reset the update error, to avoid carrying over an error
3✔
621
                // from a previous execution of the batched db transaction.
3✔
622
                updateErr = nil
3✔
623
                payment = nil
3✔
624

3✔
625
                prefetchPayment(tx, paymentHash)
3✔
626
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
627
                if errors.Is(err, ErrPaymentNotInitiated) {
3✔
UNCOV
628
                        updateErr = ErrPaymentNotInitiated
×
UNCOV
629
                        return nil
×
630
                } else if err != nil {
3✔
631
                        return err
×
UNCOV
632
                }
×
633

634
                // We mark the payment as failed as long as it is known. This
635
                // lets the last attempt to fail with a terminal write its
636
                // failure to the KVStore without synchronizing with
637
                // other attempts.
638
                _, err = fetchPaymentStatus(bucket)
3✔
639
                if errors.Is(err, ErrPaymentNotInitiated) {
3✔
640
                        updateErr = ErrPaymentNotInitiated
×
UNCOV
641
                        return nil
×
642
                } else if err != nil {
3✔
643
                        return err
×
UNCOV
644
                }
×
645

646
                // Put the failure reason in the bucket for record keeping.
647
                v := []byte{byte(reason)}
3✔
648
                err = bucket.Put(paymentFailInfoKey, v)
3✔
649
                if err != nil {
3✔
650
                        return err
×
UNCOV
651
                }
×
652

653
                // Retrieve attempt info for the notification, if available.
654
                payment, err = fetchPayment(bucket)
3✔
655
                if err != nil {
3✔
656
                        return err
×
UNCOV
657
                }
×
658

659
                return nil
3✔
660
        })
661
        if err != nil {
3✔
662
                return nil, err
×
UNCOV
663
        }
×
664

665
        return payment, updateErr
3✔
666
}
667

668
// FetchPayment returns information about a payment from the database.
669
func (p *KVStore) FetchPayment(paymentHash lntypes.Hash) (
670
        *MPPayment, error) {
3✔
671

3✔
672
        var payment *MPPayment
3✔
673
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
6✔
674
                prefetchPayment(tx, paymentHash)
3✔
675
                bucket, err := fetchPaymentBucket(tx, paymentHash)
3✔
676
                if err != nil {
3✔
UNCOV
677
                        return err
×
UNCOV
678
                }
×
679

680
                payment, err = fetchPayment(bucket)
3✔
681

3✔
682
                return err
3✔
683
        }, func() {
3✔
684
                payment = nil
3✔
685
        })
3✔
686
        if err != nil {
3✔
UNCOV
687
                return nil, err
×
UNCOV
688
        }
×
689

690
        return payment, nil
3✔
691
}
692

693
// prefetchPayment attempts to prefetch as much of the payment as possible to
694
// reduce DB roundtrips.
695
func prefetchPayment(tx kvdb.RTx, paymentHash lntypes.Hash) {
3✔
696
        rb := kvdb.RootBucket(tx)
3✔
697
        kvdb.Prefetch(
3✔
698
                rb,
3✔
699
                []string{
3✔
700
                        // Prefetch all keys in the payment's bucket.
3✔
701
                        string(paymentsRootBucket),
3✔
702
                        string(paymentHash[:]),
3✔
703
                },
3✔
704
                []string{
3✔
705
                        // Prefetch all keys in the payment's htlc bucket.
3✔
706
                        string(paymentsRootBucket),
3✔
707
                        string(paymentHash[:]),
3✔
708
                        string(paymentHtlcsBucket),
3✔
709
                },
3✔
710
        )
3✔
711
}
3✔
712

713
// createPaymentBucket creates or fetches the sub-bucket assigned to this
714
// payment hash.
715
func createPaymentBucket(tx kvdb.RwTx, paymentHash lntypes.Hash) (
716
        kvdb.RwBucket, error) {
3✔
717

3✔
718
        payments, err := tx.CreateTopLevelBucket(paymentsRootBucket)
3✔
719
        if err != nil {
3✔
720
                return nil, err
×
UNCOV
721
        }
×
722

723
        return payments.CreateBucketIfNotExists(paymentHash[:])
3✔
724
}
725

726
// fetchPaymentBucket fetches the sub-bucket assigned to this payment hash. If
727
// the bucket does not exist, it returns ErrPaymentNotInitiated.
728
func fetchPaymentBucket(tx kvdb.RTx, paymentHash lntypes.Hash) (
729
        kvdb.RBucket, error) {
3✔
730

3✔
731
        payments := tx.ReadBucket(paymentsRootBucket)
3✔
732
        if payments == nil {
3✔
733
                return nil, ErrPaymentNotInitiated
×
UNCOV
734
        }
×
735

736
        bucket := payments.NestedReadBucket(paymentHash[:])
3✔
737
        if bucket == nil {
3✔
UNCOV
738
                return nil, ErrPaymentNotInitiated
×
UNCOV
739
        }
×
740

741
        return bucket, nil
3✔
742
}
743

744
// fetchPaymentBucketUpdate is identical to fetchPaymentBucket, but it returns a
745
// bucket that can be written to.
746
func fetchPaymentBucketUpdate(tx kvdb.RwTx, paymentHash lntypes.Hash) (
747
        kvdb.RwBucket, error) {
3✔
748

3✔
749
        payments := tx.ReadWriteBucket(paymentsRootBucket)
3✔
750
        if payments == nil {
3✔
751
                return nil, ErrPaymentNotInitiated
×
UNCOV
752
        }
×
753

754
        bucket := payments.NestedReadWriteBucket(paymentHash[:])
3✔
755
        if bucket == nil {
3✔
UNCOV
756
                return nil, ErrPaymentNotInitiated
×
UNCOV
757
        }
×
758

759
        return bucket, nil
3✔
760
}
761

762
// nextPaymentSequence returns the next sequence number to store for a new
763
// payment.
764
func (p *KVStore) nextPaymentSequence() ([]byte, error) {
3✔
765
        p.seqMu.Lock()
3✔
766
        defer p.seqMu.Unlock()
3✔
767

3✔
768
        // Set a new upper bound in the DB every 1000 payments to avoid
3✔
769
        // conflicts on the sequence when using etcd.
3✔
770
        if p.currSeq == p.storedSeq {
6✔
771
                var currPaymentSeq, newUpperBound uint64
3✔
772
                if err := kvdb.Update(p.db, func(tx kvdb.RwTx) error {
6✔
773
                        paymentsBucket, err := tx.CreateTopLevelBucket(
3✔
774
                                paymentsRootBucket,
3✔
775
                        )
3✔
776
                        if err != nil {
3✔
777
                                return err
×
UNCOV
778
                        }
×
779

780
                        currPaymentSeq = paymentsBucket.Sequence()
3✔
781
                        newUpperBound = currPaymentSeq + paymentSeqBlockSize
3✔
782

3✔
783
                        return paymentsBucket.SetSequence(newUpperBound)
3✔
784
                }, func() {}); err != nil {
3✔
785
                        return nil, err
×
UNCOV
786
                }
×
787

788
                // We lazy initialize the cached currPaymentSeq here using the
789
                // first nextPaymentSequence() call. This if statement will auto
790
                // initialize our stored currPaymentSeq, since by default both
791
                // this variable and storedPaymentSeq are zero which in turn
792
                // will have us fetch the current values from the DB.
793
                if p.currSeq == 0 {
6✔
794
                        p.currSeq = currPaymentSeq
3✔
795
                }
3✔
796

797
                p.storedSeq = newUpperBound
3✔
798
        }
799

800
        p.currSeq++
3✔
801
        b := make([]byte, 8)
3✔
802
        binary.BigEndian.PutUint64(b, p.currSeq)
3✔
803

3✔
804
        return b, nil
3✔
805
}
806

807
// fetchPaymentStatus fetches the payment status of the payment. If the payment
808
// isn't found, it will return error `ErrPaymentNotInitiated`.
809
func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) {
3✔
810
        // Creation info should be set for all payments, regardless of state.
3✔
811
        // If not, it is unknown.
3✔
812
        if bucket.Get(paymentCreationInfoKey) == nil {
6✔
813
                return 0, ErrPaymentNotInitiated
3✔
814
        }
3✔
815

816
        payment, err := fetchPayment(bucket)
3✔
817
        if err != nil {
3✔
818
                return 0, err
×
UNCOV
819
        }
×
820

821
        return payment.Status, nil
3✔
822
}
823

824
// FetchInFlightPayments returns all payments with status InFlight.
825
func (p *KVStore) FetchInFlightPayments() ([]*MPPayment, error) {
3✔
826
        var (
3✔
827
                inFlights      []*MPPayment
3✔
828
                start          = time.Now()
3✔
829
                lastLogTime    = time.Now()
3✔
830
                processedCount int
3✔
831
        )
3✔
832

3✔
833
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
6✔
834
                payments := tx.ReadBucket(paymentsRootBucket)
3✔
835
                if payments == nil {
3✔
836
                        return nil
×
UNCOV
837
                }
×
838

839
                return payments.ForEach(func(k, _ []byte) error {
6✔
840
                        bucket := payments.NestedReadBucket(k)
3✔
841
                        if bucket == nil {
3✔
842
                                return fmt.Errorf("non bucket element")
×
UNCOV
843
                        }
×
844

845
                        p, err := fetchPayment(bucket)
3✔
846
                        if err != nil {
3✔
847
                                return err
×
UNCOV
848
                        }
×
849

850
                        processedCount++
3✔
851
                        if time.Since(lastLogTime) >=
3✔
852
                                paymentProgressLogInterval {
3✔
853

×
854
                                log.Debugf("Scanning inflight payments "+
×
855
                                        "(in progress), processed %d, last "+
×
856
                                        "processed payment: %v", processedCount,
×
857
                                        p.Info)
×
858

×
859
                                lastLogTime = time.Now()
×
UNCOV
860
                        }
×
861

862
                        // Skip the payment if it's terminated.
863
                        if p.Terminated() {
6✔
864
                                return nil
3✔
865
                        }
3✔
866

867
                        inFlights = append(inFlights, p)
3✔
868

3✔
869
                        return nil
3✔
870
                })
871
        }, func() {
3✔
872
                inFlights = nil
3✔
873
        })
3✔
874
        if err != nil {
3✔
875
                return nil, err
×
UNCOV
876
        }
×
877

878
        elapsed := time.Since(start)
3✔
879
        log.Debugf("Completed scanning for inflight payments: "+
3✔
880
                "total_processed=%d, found_inflight=%d, elapsed=%v",
3✔
881
                processedCount, len(inFlights),
3✔
882
                elapsed.Round(time.Millisecond))
3✔
883

3✔
884
        return inFlights, nil
3✔
885
}
886

887
// htlcBucketKey builds a composite key by concatenating prefix and id into a
// freshly allocated slice. Neither input is modified or aliased by the result.
func htlcBucketKey(prefix, id []byte) []byte {
	composite := make([]byte, 0, len(prefix)+len(id))
	composite = append(composite, prefix...)

	return append(composite, id...)
}
3✔
896

897
// FetchPayments returns all sent payments found in the DB.
UNCOV
898
func (p *KVStore) FetchPayments() ([]*MPPayment, error) {
×
UNCOV
899
        var payments []*MPPayment
×
UNCOV
900

×
UNCOV
901
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
×
UNCOV
902
                paymentsBucket := tx.ReadBucket(paymentsRootBucket)
×
903
                if paymentsBucket == nil {
×
904
                        return nil
×
UNCOV
905
                }
×
906

UNCOV
907
                return paymentsBucket.ForEach(func(k, v []byte) error {
×
UNCOV
908
                        bucket := paymentsBucket.NestedReadBucket(k)
×
909
                        if bucket == nil {
×
910
                                // We only expect sub-buckets to be found in
×
911
                                // this top-level bucket.
×
912
                                return fmt.Errorf("non bucket element in " +
×
913
                                        "payments bucket")
×
UNCOV
914
                        }
×
915

UNCOV
916
                        p, err := fetchPayment(bucket)
×
917
                        if err != nil {
×
918
                                return err
×
UNCOV
919
                        }
×
920

UNCOV
921
                        payments = append(payments, p)
×
UNCOV
922

×
UNCOV
923
                        // For older versions of lnd, duplicate payments to a
×
UNCOV
924
                        // payment has was possible. These will be found in a
×
UNCOV
925
                        // sub-bucket indexed by their sequence number if
×
UNCOV
926
                        // available.
×
UNCOV
927
                        duplicatePayments, err := fetchDuplicatePayments(bucket)
×
928
                        if err != nil {
×
929
                                return err
×
UNCOV
930
                        }
×
931

UNCOV
932
                        payments = append(payments, duplicatePayments...)
×
UNCOV
933

×
UNCOV
934
                        return nil
×
935
                })
UNCOV
936
        }, func() {
×
UNCOV
937
                payments = nil
×
UNCOV
938
        })
×
939
        if err != nil {
×
940
                return nil, err
×
UNCOV
941
        }
×
942

943
        // Before returning, sort the payments by their sequence number.
UNCOV
944
        sort.Slice(payments, func(i, j int) bool {
×
UNCOV
945
                return payments[i].SequenceNum < payments[j].SequenceNum
×
UNCOV
946
        })
×
947

UNCOV
948
        return payments, nil
×
949
}
950

951
func fetchCreationInfo(bucket kvdb.RBucket) (*PaymentCreationInfo, error) {
3✔
952
        b := bucket.Get(paymentCreationInfoKey)
3✔
953
        if b == nil {
3✔
954
                return nil, fmt.Errorf("creation info not found")
×
UNCOV
955
        }
×
956

957
        r := bytes.NewReader(b)
3✔
958

3✔
959
        return deserializePaymentCreationInfo(r)
3✔
960
}
961

962
func fetchPayment(bucket kvdb.RBucket) (*MPPayment, error) {
3✔
963
        seqBytes := bucket.Get(paymentSequenceKey)
3✔
964
        if seqBytes == nil {
3✔
965
                return nil, fmt.Errorf("sequence number not found")
×
UNCOV
966
        }
×
967

968
        sequenceNum := binary.BigEndian.Uint64(seqBytes)
3✔
969

3✔
970
        // Get the PaymentCreationInfo.
3✔
971
        creationInfo, err := fetchCreationInfo(bucket)
3✔
972
        if err != nil {
3✔
973
                return nil, err
×
UNCOV
974
        }
×
975

976
        var htlcs []HTLCAttempt
3✔
977
        htlcsBucket := bucket.NestedReadBucket(paymentHtlcsBucket)
3✔
978
        if htlcsBucket != nil {
6✔
979
                // Get the payment attempts. This can be empty.
3✔
980
                htlcs, err = fetchHtlcAttempts(htlcsBucket)
3✔
981
                if err != nil {
3✔
982
                        return nil, err
×
UNCOV
983
                }
×
984
        }
985

986
        // Get failure reason if available.
987
        var failureReason *FailureReason
3✔
988
        b := bucket.Get(paymentFailInfoKey)
3✔
989
        if b != nil {
6✔
990
                reason := FailureReason(b[0])
3✔
991
                failureReason = &reason
3✔
992
        }
3✔
993

994
        // Create a new payment.
995
        payment := &MPPayment{
3✔
996
                SequenceNum:   sequenceNum,
3✔
997
                Info:          creationInfo,
3✔
998
                HTLCs:         htlcs,
3✔
999
                FailureReason: failureReason,
3✔
1000
        }
3✔
1001

3✔
1002
        // Set its state and status.
3✔
1003
        if err := payment.setState(); err != nil {
3✔
1004
                return nil, err
×
UNCOV
1005
        }
×
1006

1007
        return payment, nil
3✔
1008
}
1009

1010
// fetchHtlcAttempts retrieves all htlc attempts made for the payment found in
1011
// the given bucket.
1012
func fetchHtlcAttempts(bucket kvdb.RBucket) ([]HTLCAttempt, error) {
3✔
1013
        htlcsMap := make(map[uint64]*HTLCAttempt)
3✔
1014

3✔
1015
        attemptInfoCount := 0
3✔
1016
        err := bucket.ForEach(func(k, v []byte) error {
6✔
1017
                aid := byteOrder.Uint64(k[len(k)-8:])
3✔
1018

3✔
1019
                if _, ok := htlcsMap[aid]; !ok {
6✔
1020
                        htlcsMap[aid] = &HTLCAttempt{}
3✔
1021
                }
3✔
1022

1023
                var err error
3✔
1024
                switch {
3✔
1025
                case bytes.HasPrefix(k, htlcAttemptInfoKey):
3✔
1026
                        attemptInfo, err := readHtlcAttemptInfo(v)
3✔
1027
                        if err != nil {
3✔
1028
                                return err
×
UNCOV
1029
                        }
×
1030

1031
                        attemptInfo.AttemptID = aid
3✔
1032
                        htlcsMap[aid].HTLCAttemptInfo = *attemptInfo
3✔
1033
                        attemptInfoCount++
3✔
1034

1035
                case bytes.HasPrefix(k, htlcSettleInfoKey):
3✔
1036
                        htlcsMap[aid].Settle, err = readHtlcSettleInfo(v)
3✔
1037
                        if err != nil {
3✔
1038
                                return err
×
UNCOV
1039
                        }
×
1040

1041
                case bytes.HasPrefix(k, htlcFailInfoKey):
3✔
1042
                        htlcsMap[aid].Failure, err = readHtlcFailInfo(v)
3✔
1043
                        if err != nil {
3✔
1044
                                return err
×
UNCOV
1045
                        }
×
1046

1047
                default:
×
UNCOV
1048
                        return fmt.Errorf("unknown htlc attempt key")
×
1049
                }
1050

1051
                return nil
3✔
1052
        })
1053
        if err != nil {
3✔
1054
                return nil, err
×
UNCOV
1055
        }
×
1056

1057
        // Sanity check that all htlcs have an attempt info.
1058
        if attemptInfoCount != len(htlcsMap) {
3✔
1059
                return nil, ErrNoAttemptInfo
×
UNCOV
1060
        }
×
1061

1062
        keys := make([]uint64, len(htlcsMap))
3✔
1063
        i := 0
3✔
1064
        for k := range htlcsMap {
6✔
1065
                keys[i] = k
3✔
1066
                i++
3✔
1067
        }
3✔
1068

1069
        // Sort HTLC attempts by their attempt ID. This is needed because in the
1070
        // DB we store the attempts with keys prefixed by their status which
1071
        // changes order (groups them together by status).
1072
        sort.Slice(keys, func(i, j int) bool {
6✔
1073
                return keys[i] < keys[j]
3✔
1074
        })
3✔
1075

1076
        htlcs := make([]HTLCAttempt, len(htlcsMap))
3✔
1077
        for i, key := range keys {
6✔
1078
                htlcs[i] = *htlcsMap[key]
3✔
1079
        }
3✔
1080

1081
        return htlcs, nil
3✔
1082
}
1083

1084
// readHtlcAttemptInfo reads the payment attempt info for this htlc.
1085
func readHtlcAttemptInfo(b []byte) (*HTLCAttemptInfo, error) {
3✔
1086
        r := bytes.NewReader(b)
3✔
1087
        return deserializeHTLCAttemptInfo(r)
3✔
1088
}
3✔
1089

1090
// readHtlcSettleInfo reads the settle info for the htlc. If the htlc isn't
1091
// settled, nil is returned.
1092
func readHtlcSettleInfo(b []byte) (*HTLCSettleInfo, error) {
3✔
1093
        r := bytes.NewReader(b)
3✔
1094
        return deserializeHTLCSettleInfo(r)
3✔
1095
}
3✔
1096

1097
// readHtlcFailInfo reads the failure info for the htlc. If the htlc hasn't
1098
// failed, nil is returned.
1099
func readHtlcFailInfo(b []byte) (*HTLCFailInfo, error) {
3✔
1100
        r := bytes.NewReader(b)
3✔
1101
        return deserializeHTLCFailInfo(r)
3✔
1102
}
3✔
1103

1104
// fetchFailedHtlcKeys retrieves the bucket keys of all failed HTLCs of a
1105
// payment bucket.
UNCOV
1106
func fetchFailedHtlcKeys(bucket kvdb.RBucket) ([][]byte, error) {
×
UNCOV
1107
        htlcsBucket := bucket.NestedReadBucket(paymentHtlcsBucket)
×
UNCOV
1108

×
UNCOV
1109
        var htlcs []HTLCAttempt
×
UNCOV
1110
        var err error
×
UNCOV
1111
        if htlcsBucket != nil {
×
UNCOV
1112
                htlcs, err = fetchHtlcAttempts(htlcsBucket)
×
1113
                if err != nil {
×
1114
                        return nil, err
×
UNCOV
1115
                }
×
1116
        }
1117

1118
        // Now iterate though them and save the bucket keys for the failed
1119
        // HTLCs.
UNCOV
1120
        var htlcKeys [][]byte
×
UNCOV
1121
        for _, h := range htlcs {
×
UNCOV
1122
                if h.Failure == nil {
×
UNCOV
1123
                        continue
×
1124
                }
1125

UNCOV
1126
                htlcKeyBytes := make([]byte, 8)
×
UNCOV
1127
                binary.BigEndian.PutUint64(htlcKeyBytes, h.AttemptID)
×
UNCOV
1128

×
UNCOV
1129
                htlcKeys = append(htlcKeys, htlcKeyBytes)
×
1130
        }
1131

UNCOV
1132
        return htlcKeys, nil
×
1133
}
1134

1135
// QueryPayments is a query to the payments database which is restricted
1136
// to a subset of payments by the payments query, containing an offset
1137
// index and a maximum number of returned payments.
1138
func (p *KVStore) QueryPayments(_ context.Context,
1139
        query Query) (Response, error) {
3✔
1140

3✔
1141
        var resp Response
3✔
1142

3✔
1143
        if err := kvdb.View(p.db, func(tx kvdb.RTx) error {
6✔
1144
                // Get the root payments bucket.
3✔
1145
                paymentsBucket := tx.ReadBucket(paymentsRootBucket)
3✔
1146
                if paymentsBucket == nil {
3✔
1147
                        return nil
×
UNCOV
1148
                }
×
1149

1150
                // Get the index bucket which maps sequence number -> payment
1151
                // hash and duplicate bool. If we have a payments bucket, we
1152
                // should have an indexes bucket as well.
1153
                indexes := tx.ReadBucket(paymentsIndexBucket)
3✔
1154
                if indexes == nil {
3✔
1155
                        return fmt.Errorf("index bucket does not exist")
×
UNCOV
1156
                }
×
1157

1158
                // accumulatePayments gets payments with the sequence number
1159
                // and hash provided and adds them to our list of payments if
1160
                // they meet the criteria of our query. It returns the number
1161
                // of payments that were added.
1162
                accumulatePayments := func(sequenceKey, hash []byte) (bool,
3✔
1163
                        error) {
6✔
1164

3✔
1165
                        r := bytes.NewReader(hash)
3✔
1166
                        paymentHash, err := deserializePaymentIndex(r)
3✔
1167
                        if err != nil {
3✔
1168
                                return false, err
×
UNCOV
1169
                        }
×
1170

1171
                        payment, err := fetchPaymentWithSequenceNumber(
3✔
1172
                                tx, paymentHash, sequenceKey,
3✔
1173
                        )
3✔
1174
                        if err != nil {
3✔
1175
                                return false, err
×
UNCOV
1176
                        }
×
1177

1178
                        // To keep compatibility with the old API, we only
1179
                        // return non-succeeded payments if requested.
1180
                        if payment.Status != StatusSucceeded &&
3✔
1181
                                !query.IncludeIncomplete {
3✔
UNCOV
1182

×
UNCOV
1183
                                return false, err
×
UNCOV
1184
                        }
×
1185

1186
                        // Get the creation time in Unix seconds, this always
1187
                        // rounds down the nanoseconds to full seconds.
1188
                        createTime := payment.Info.CreationTime.Unix()
3✔
1189

3✔
1190
                        // Skip any payments that were created before the
3✔
1191
                        // specified time.
3✔
1192
                        if createTime < query.CreationDateStart {
6✔
1193
                                return false, nil
3✔
1194
                        }
3✔
1195

1196
                        // Skip any payments that were created after the
1197
                        // specified time.
1198
                        if query.CreationDateEnd != 0 &&
3✔
1199
                                createTime > query.CreationDateEnd {
6✔
1200

3✔
1201
                                return false, nil
3✔
1202
                        }
3✔
1203

1204
                        // At this point, we've exhausted the offset, so we'll
1205
                        // begin collecting invoices found within the range.
1206
                        resp.Payments = append(resp.Payments, payment)
3✔
1207

3✔
1208
                        return true, nil
3✔
1209
                }
1210

1211
                // Create a paginator which reads from our sequence index bucket
1212
                // with the parameters provided by the payments query.
1213
                paginator := channeldb.NewPaginator(
3✔
1214
                        indexes.ReadCursor(), query.Reversed, query.IndexOffset,
3✔
1215
                        query.MaxPayments,
3✔
1216
                )
3✔
1217

3✔
1218
                // Run a paginated query, adding payments to our response.
3✔
1219
                if err := paginator.Query(accumulatePayments); err != nil {
3✔
1220
                        return err
×
UNCOV
1221
                }
×
1222

1223
                // Counting the total number of payments is expensive, since we
1224
                // literally have to traverse the cursor linearly, which can
1225
                // take quite a while. So it's an optional query parameter.
1226
                if query.CountTotal {
3✔
1227
                        var (
×
1228
                                totalPayments uint64
×
1229
                                err           error
×
1230
                        )
×
1231
                        countFn := func(_, _ []byte) error {
×
1232
                                totalPayments++
×
1233

×
1234
                                return nil
×
UNCOV
1235
                        }
×
1236

1237
                        // In non-boltdb database backends, there's a faster
1238
                        // ForAll query that allows for batch fetching items.
1239
                        fastBucket, ok := indexes.(kvdb.ExtendedRBucket)
×
1240
                        if ok {
×
1241
                                err = fastBucket.ForAll(countFn)
×
1242
                        } else {
×
1243
                                err = indexes.ForEach(countFn)
×
1244
                        }
×
1245
                        if err != nil {
×
1246
                                return fmt.Errorf("error counting payments: %w",
×
1247
                                        err)
×
UNCOV
1248
                        }
×
1249

UNCOV
1250
                        resp.TotalCount = totalPayments
×
1251
                }
1252

1253
                return nil
3✔
1254
        }, func() {
3✔
1255
                resp = Response{}
3✔
1256
        }); err != nil {
3✔
1257
                return resp, err
×
UNCOV
1258
        }
×
1259

1260
        // Need to swap the payments slice order if reversed order.
1261
        if query.Reversed {
3✔
UNCOV
1262
                for l, r := 0, len(resp.Payments)-1; l < r; l, r = l+1, r-1 {
×
UNCOV
1263
                        resp.Payments[l], resp.Payments[r] =
×
UNCOV
1264
                                resp.Payments[r], resp.Payments[l]
×
UNCOV
1265
                }
×
1266
        }
1267

1268
        // Set the first and last index of the returned payments so that the
1269
        // caller can resume from this point later on.
1270
        if len(resp.Payments) > 0 {
6✔
1271
                resp.FirstIndexOffset = resp.Payments[0].SequenceNum
3✔
1272
                resp.LastIndexOffset =
3✔
1273
                        resp.Payments[len(resp.Payments)-1].SequenceNum
3✔
1274
        }
3✔
1275

1276
        return resp, nil
3✔
1277
}
1278

1279
// fetchPaymentWithSequenceNumber get the payment which matches the payment hash
1280
// *and* sequence number provided from the database. This is required because
1281
// we previously had more than one payment per hash, so we have multiple indexes
1282
// pointing to a single payment; we want to retrieve the correct one.
1283
func fetchPaymentWithSequenceNumber(tx kvdb.RTx, paymentHash lntypes.Hash,
1284
        sequenceNumber []byte) (*MPPayment, error) {
3✔
1285

3✔
1286
        // We can now lookup the payment keyed by its hash in
3✔
1287
        // the payments root bucket.
3✔
1288
        bucket, err := fetchPaymentBucket(tx, paymentHash)
3✔
1289
        if err != nil {
3✔
1290
                return nil, err
×
UNCOV
1291
        }
×
1292

1293
        // A single payment hash can have multiple payments associated with it.
1294
        // We lookup our sequence number first, to determine whether this is
1295
        // the payment we are actually looking for.
1296
        seqBytes := bucket.Get(paymentSequenceKey)
3✔
1297
        if seqBytes == nil {
3✔
1298
                return nil, ErrNoSequenceNumber
×
UNCOV
1299
        }
×
1300

1301
        // If this top level payment has the sequence number we are looking for,
1302
        // return it.
1303
        if bytes.Equal(seqBytes, sequenceNumber) {
6✔
1304
                return fetchPayment(bucket)
3✔
1305
        }
3✔
1306

1307
        // If we were not looking for the top level payment, we are looking for
1308
        // one of our duplicate payments. We need to iterate through the seq
1309
        // numbers in this bucket to find the correct payments. If we do not
1310
        // find a duplicate payments bucket here, something is wrong.
UNCOV
1311
        dup := bucket.NestedReadBucket(duplicatePaymentsBucket)
×
UNCOV
1312
        if dup == nil {
×
UNCOV
1313
                return nil, ErrNoDuplicateBucket
×
UNCOV
1314
        }
×
1315

UNCOV
1316
        var duplicatePayment *MPPayment
×
UNCOV
1317
        err = dup.ForEach(func(k, v []byte) error {
×
UNCOV
1318
                subBucket := dup.NestedReadBucket(k)
×
1319
                if subBucket == nil {
×
1320
                        // We one bucket for each duplicate to be found.
×
1321
                        return ErrNoDuplicateNestedBucket
×
UNCOV
1322
                }
×
1323

UNCOV
1324
                seqBytes := subBucket.Get(duplicatePaymentSequenceKey)
×
1325
                if seqBytes == nil {
×
1326
                        return err
×
UNCOV
1327
                }
×
1328

1329
                // If this duplicate payment is not the sequence number we are
1330
                // looking for, we can continue.
UNCOV
1331
                if !bytes.Equal(seqBytes, sequenceNumber) {
×
UNCOV
1332
                        return nil
×
UNCOV
1333
                }
×
1334

UNCOV
1335
                duplicatePayment, err = fetchDuplicatePayment(subBucket)
×
1336
                if err != nil {
×
1337
                        return err
×
UNCOV
1338
                }
×
1339

UNCOV
1340
                return nil
×
1341
        })
1342
        if err != nil {
×
1343
                return nil, err
×
UNCOV
1344
        }
×
1345

1346
        // If none of the duplicate payments matched our sequence number, we
1347
        // failed to find the payment with this sequence number; something is
1348
        // wrong.
UNCOV
1349
        if duplicatePayment == nil {
×
UNCOV
1350
                return nil, ErrDuplicateNotFound
×
UNCOV
1351
        }
×
1352

UNCOV
1353
        return duplicatePayment, nil
×
1354
}
1355

1356
// DeletePayment deletes a payment from the DB given its payment hash. If
1357
// failedHtlcsOnly is set, only failed HTLC attempts of the payment will be
1358
// deleted.
1359
func (p *KVStore) DeletePayment(paymentHash lntypes.Hash,
UNCOV
1360
        failedHtlcsOnly bool) error {
×
UNCOV
1361

×
UNCOV
1362
        return kvdb.Update(p.db, func(tx kvdb.RwTx) error {
×
UNCOV
1363
                payments := tx.ReadWriteBucket(paymentsRootBucket)
×
1364
                if payments == nil {
×
1365
                        return nil
×
UNCOV
1366
                }
×
1367

UNCOV
1368
                bucket := payments.NestedReadWriteBucket(paymentHash[:])
×
UNCOV
1369
                if bucket == nil {
×
UNCOV
1370
                        return fmt.Errorf("non bucket element in payments " +
×
UNCOV
1371
                                "bucket")
×
UNCOV
1372
                }
×
1373

1374
                // If the status is InFlight, we cannot safely delete
1375
                // the payment information, so we return early.
UNCOV
1376
                paymentStatus, err := fetchPaymentStatus(bucket)
×
1377
                if err != nil {
×
1378
                        return err
×
UNCOV
1379
                }
×
1380

1381
                // If the payment has inflight HTLCs, we cannot safely delete
1382
                // the payment information, so we return an error.
UNCOV
1383
                if err := paymentStatus.removable(); err != nil {
×
UNCOV
1384
                        return fmt.Errorf("payment '%v' has inflight HTLCs"+
×
UNCOV
1385
                                "and therefore cannot be deleted: %w",
×
UNCOV
1386
                                paymentHash.String(), err)
×
UNCOV
1387
                }
×
1388

1389
                // Delete the failed HTLC attempts we found.
UNCOV
1390
                if failedHtlcsOnly {
×
UNCOV
1391
                        toDelete, err := fetchFailedHtlcKeys(bucket)
×
1392
                        if err != nil {
×
1393
                                return err
×
UNCOV
1394
                        }
×
1395

UNCOV
1396
                        htlcsBucket := bucket.NestedReadWriteBucket(
×
UNCOV
1397
                                paymentHtlcsBucket,
×
UNCOV
1398
                        )
×
UNCOV
1399

×
UNCOV
1400
                        for _, htlcID := range toDelete {
×
UNCOV
1401
                                err = htlcsBucket.Delete(
×
UNCOV
1402
                                        htlcBucketKey(
×
UNCOV
1403
                                                htlcAttemptInfoKey, htlcID,
×
UNCOV
1404
                                        ),
×
UNCOV
1405
                                )
×
1406
                                if err != nil {
×
1407
                                        return err
×
UNCOV
1408
                                }
×
1409

UNCOV
1410
                                err = htlcsBucket.Delete(
×
UNCOV
1411
                                        htlcBucketKey(htlcFailInfoKey, htlcID),
×
UNCOV
1412
                                )
×
1413
                                if err != nil {
×
1414
                                        return err
×
UNCOV
1415
                                }
×
1416

UNCOV
1417
                                err = htlcsBucket.Delete(
×
UNCOV
1418
                                        htlcBucketKey(
×
UNCOV
1419
                                                htlcSettleInfoKey, htlcID,
×
UNCOV
1420
                                        ),
×
UNCOV
1421
                                )
×
1422
                                if err != nil {
×
1423
                                        return err
×
UNCOV
1424
                                }
×
1425
                        }
1426

UNCOV
1427
                        return nil
×
1428
                }
1429

UNCOV
1430
                seqNrs, err := fetchSequenceNumbers(bucket)
×
1431
                if err != nil {
×
1432
                        return err
×
UNCOV
1433
                }
×
1434

UNCOV
1435
                err = payments.DeleteNestedBucket(paymentHash[:])
×
1436
                if err != nil {
×
1437
                        return err
×
UNCOV
1438
                }
×
1439

UNCOV
1440
                indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
×
UNCOV
1441
                for _, k := range seqNrs {
×
1442
                        if err := indexBucket.Delete(k); err != nil {
×
1443
                                return err
×
UNCOV
1444
                        }
×
1445
                }
1446

UNCOV
1447
                return nil
×
UNCOV
1448
        }, func() {})
×
1449
}
1450

1451
// DeletePayments deletes all completed and failed payments from the DB. If
1452
// failedOnly is set, only failed payments will be considered for deletion. If
1453
// failedHtlcsOnly is set, the payment itself won't be deleted, only failed HTLC
1454
// attempts. The method returns the number of deleted payments, which is always
1455
// 0 if failedHtlcsOnly is set.
1456
func (p *KVStore) DeletePayments(failedOnly,
1457
        failedHtlcsOnly bool) (int, error) {
3✔
1458

3✔
1459
        var numPayments int
3✔
1460
        err := kvdb.Update(p.db, func(tx kvdb.RwTx) error {
6✔
1461
                payments := tx.ReadWriteBucket(paymentsRootBucket)
3✔
1462
                if payments == nil {
3✔
1463
                        return nil
×
UNCOV
1464
                }
×
1465

1466
                var (
3✔
1467
                        // deleteBuckets is the set of payment buckets we need
3✔
1468
                        // to delete.
3✔
1469
                        deleteBuckets [][]byte
3✔
1470

3✔
1471
                        // deleteIndexes is the set of indexes pointing to these
3✔
1472
                        // payments that need to be deleted.
3✔
1473
                        deleteIndexes [][]byte
3✔
1474

3✔
1475
                        // deleteHtlcs maps a payment hash to the HTLC IDs we
3✔
1476
                        // want to delete for that payment.
3✔
1477
                        deleteHtlcs = make(map[lntypes.Hash][][]byte)
3✔
1478
                )
3✔
1479
                err := payments.ForEach(func(k, _ []byte) error {
6✔
1480
                        bucket := payments.NestedReadBucket(k)
3✔
1481
                        if bucket == nil {
3✔
1482
                                // We only expect sub-buckets to be found in
×
1483
                                // this top-level bucket.
×
1484
                                return fmt.Errorf("non bucket element in " +
×
1485
                                        "payments bucket")
×
UNCOV
1486
                        }
×
1487

1488
                        // If the status is InFlight, we cannot safely delete
1489
                        // the payment information, so we return early.
1490
                        paymentStatus, err := fetchPaymentStatus(bucket)
3✔
1491
                        if err != nil {
3✔
1492
                                return err
×
UNCOV
1493
                        }
×
1494

1495
                        // If the payment has inflight HTLCs, we cannot safely
1496
                        // delete the payment information, so we return an nil
1497
                        // to skip it.
1498
                        if err := paymentStatus.removable(); err != nil {
3✔
UNCOV
1499
                                return nil
×
UNCOV
1500
                        }
×
1501

1502
                        // If we requested to only delete failed payments, we
1503
                        // can return if this one is not.
1504
                        if failedOnly && paymentStatus != StatusFailed {
3✔
UNCOV
1505
                                return nil
×
UNCOV
1506
                        }
×
1507

1508
                        // If we are only deleting failed HTLCs, fetch them.
1509
                        if failedHtlcsOnly {
3✔
UNCOV
1510
                                toDelete, err := fetchFailedHtlcKeys(bucket)
×
1511
                                if err != nil {
×
1512
                                        return err
×
UNCOV
1513
                                }
×
1514

UNCOV
1515
                                hash, err := lntypes.MakeHash(k)
×
1516
                                if err != nil {
×
1517
                                        return err
×
UNCOV
1518
                                }
×
1519

UNCOV
1520
                                deleteHtlcs[hash] = toDelete
×
UNCOV
1521

×
UNCOV
1522
                                // We return, we are only deleting attempts.
×
UNCOV
1523
                                return nil
×
1524
                        }
1525

1526
                        // Add the bucket to the set of buckets we can delete.
1527
                        deleteBuckets = append(deleteBuckets, k)
3✔
1528

3✔
1529
                        // Get all the sequence number associated with the
3✔
1530
                        // payment, including duplicates.
3✔
1531
                        seqNrs, err := fetchSequenceNumbers(bucket)
3✔
1532
                        if err != nil {
3✔
1533
                                return err
×
UNCOV
1534
                        }
×
1535

1536
                        deleteIndexes = append(deleteIndexes, seqNrs...)
3✔
1537
                        numPayments++
3✔
1538

3✔
1539
                        return nil
3✔
1540
                })
1541
                if err != nil {
3✔
1542
                        return err
×
UNCOV
1543
                }
×
1544

1545
                // Delete the failed HTLC attempts we found.
1546
                for hash, htlcIDs := range deleteHtlcs {
3✔
UNCOV
1547
                        bucket := payments.NestedReadWriteBucket(hash[:])
×
UNCOV
1548
                        htlcsBucket := bucket.NestedReadWriteBucket(
×
UNCOV
1549
                                paymentHtlcsBucket,
×
UNCOV
1550
                        )
×
UNCOV
1551

×
UNCOV
1552
                        for _, aid := range htlcIDs {
×
UNCOV
1553
                                if err := htlcsBucket.Delete(
×
UNCOV
1554
                                        htlcBucketKey(htlcAttemptInfoKey, aid),
×
1555
                                ); err != nil {
×
1556
                                        return err
×
UNCOV
1557
                                }
×
1558

UNCOV
1559
                                if err := htlcsBucket.Delete(
×
UNCOV
1560
                                        htlcBucketKey(htlcFailInfoKey, aid),
×
1561
                                ); err != nil {
×
1562
                                        return err
×
UNCOV
1563
                                }
×
1564

UNCOV
1565
                                if err := htlcsBucket.Delete(
×
UNCOV
1566
                                        htlcBucketKey(htlcSettleInfoKey, aid),
×
1567
                                ); err != nil {
×
1568
                                        return err
×
UNCOV
1569
                                }
×
1570
                        }
1571
                }
1572

1573
                for _, k := range deleteBuckets {
6✔
1574
                        if err := payments.DeleteNestedBucket(k); err != nil {
3✔
1575
                                return err
×
UNCOV
1576
                        }
×
1577
                }
1578

1579
                // Get our index bucket and delete all indexes pointing to the
1580
                // payments we are deleting.
1581
                indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
1582
                for _, k := range deleteIndexes {
6✔
1583
                        if err := indexBucket.Delete(k); err != nil {
3✔
1584
                                return err
×
UNCOV
1585
                        }
×
1586
                }
1587

1588
                return nil
3✔
1589
        }, func() {
3✔
1590
                numPayments = 0
3✔
1591
        })
3✔
1592
        if err != nil {
3✔
1593
                return 0, err
×
UNCOV
1594
        }
×
1595

1596
        return numPayments, nil
3✔
1597
}
1598

1599
// fetchSequenceNumbers fetches all the sequence numbers associated with a
1600
// payment, including those belonging to any duplicate payments.
1601
func fetchSequenceNumbers(paymentBucket kvdb.RBucket) ([][]byte, error) {
3✔
1602
        seqNum := paymentBucket.Get(paymentSequenceKey)
3✔
1603
        if seqNum == nil {
3✔
1604
                return nil, errors.New("expected sequence number")
×
UNCOV
1605
        }
×
1606

1607
        sequenceNumbers := [][]byte{seqNum}
3✔
1608

3✔
1609
        // Get the duplicate payments bucket, if it has no duplicates, just
3✔
1610
        // return early with the payment sequence number.
3✔
1611
        duplicates := paymentBucket.NestedReadBucket(duplicatePaymentsBucket)
3✔
1612
        if duplicates == nil {
6✔
1613
                return sequenceNumbers, nil
3✔
1614
        }
3✔
1615

1616
        // If we do have duplicated, they are keyed by sequence number, so we
1617
        // iterate through the duplicates bucket and add them to our set of
1618
        // sequence numbers.
UNCOV
1619
        if err := duplicates.ForEach(func(k, v []byte) error {
×
UNCOV
1620
                sequenceNumbers = append(sequenceNumbers, k)
×
UNCOV
1621
                return nil
×
1622
        }); err != nil {
×
1623
                return nil, err
×
UNCOV
1624
        }
×
1625

UNCOV
1626
        return sequenceNumbers, nil
×
1627
}
1628

1629
func serializePaymentCreationInfo(w io.Writer, c *PaymentCreationInfo) error {
3✔
1630
        var scratch [8]byte
3✔
1631

3✔
1632
        if _, err := w.Write(c.PaymentIdentifier[:]); err != nil {
3✔
1633
                return err
×
UNCOV
1634
        }
×
1635

1636
        byteOrder.PutUint64(scratch[:], uint64(c.Value))
3✔
1637
        if _, err := w.Write(scratch[:]); err != nil {
3✔
1638
                return err
×
UNCOV
1639
        }
×
1640

1641
        if err := serializeTime(w, c.CreationTime); err != nil {
3✔
1642
                return err
×
UNCOV
1643
        }
×
1644

1645
        byteOrder.PutUint32(scratch[:4], uint32(len(c.PaymentRequest)))
3✔
1646
        if _, err := w.Write(scratch[:4]); err != nil {
3✔
1647
                return err
×
UNCOV
1648
        }
×
1649

1650
        if _, err := w.Write(c.PaymentRequest); err != nil {
3✔
1651
                return err
×
UNCOV
1652
        }
×
1653

1654
        // Any remaining bytes are TLV encoded records. Currently, these are
1655
        // only the custom records provided by the user to be sent to the first
1656
        // hop. But this can easily be extended with further records by merging
1657
        // the records into a single TLV stream.
1658
        err := c.FirstHopCustomRecords.SerializeTo(w)
3✔
1659
        if err != nil {
3✔
1660
                return err
×
UNCOV
1661
        }
×
1662

1663
        return nil
3✔
1664
}
1665

1666
func deserializePaymentCreationInfo(r io.Reader) (*PaymentCreationInfo,
1667
        error) {
3✔
1668

3✔
1669
        var scratch [8]byte
3✔
1670

3✔
1671
        c := &PaymentCreationInfo{}
3✔
1672

3✔
1673
        if _, err := io.ReadFull(r, c.PaymentIdentifier[:]); err != nil {
3✔
1674
                return nil, err
×
UNCOV
1675
        }
×
1676

1677
        if _, err := io.ReadFull(r, scratch[:]); err != nil {
3✔
1678
                return nil, err
×
UNCOV
1679
        }
×
1680
        c.Value = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:]))
3✔
1681

3✔
1682
        creationTime, err := deserializeTime(r)
3✔
1683
        if err != nil {
3✔
1684
                return nil, err
×
UNCOV
1685
        }
×
1686
        c.CreationTime = creationTime
3✔
1687

3✔
1688
        if _, err := io.ReadFull(r, scratch[:4]); err != nil {
3✔
1689
                return nil, err
×
UNCOV
1690
        }
×
1691

1692
        reqLen := byteOrder.Uint32(scratch[:4])
3✔
1693
        payReq := make([]byte, reqLen)
3✔
1694
        if reqLen > 0 {
6✔
1695
                if _, err := io.ReadFull(r, payReq); err != nil {
3✔
1696
                        return nil, err
×
UNCOV
1697
                }
×
1698
        }
1699
        c.PaymentRequest = payReq
3✔
1700

3✔
1701
        // Any remaining bytes are TLV encoded records. Currently, these are
3✔
1702
        // only the custom records provided by the user to be sent to the first
3✔
1703
        // hop. But this can easily be extended with further records by merging
3✔
1704
        // the records into a single TLV stream.
3✔
1705
        c.FirstHopCustomRecords, err = lnwire.ParseCustomRecordsFrom(r)
3✔
1706
        if err != nil {
3✔
1707
                return nil, err
×
UNCOV
1708
        }
×
1709

1710
        return c, nil
3✔
1711
}
1712

1713
func serializeHTLCAttemptInfo(w io.Writer, a *HTLCAttemptInfo) error {
3✔
1714
        if err := WriteElements(w, a.sessionKey); err != nil {
3✔
1715
                return err
×
UNCOV
1716
        }
×
1717

1718
        if err := SerializeRoute(w, a.Route); err != nil {
3✔
1719
                return err
×
UNCOV
1720
        }
×
1721

1722
        if err := serializeTime(w, a.AttemptTime); err != nil {
3✔
1723
                return err
×
UNCOV
1724
        }
×
1725

1726
        // If the hash is nil we can just return.
1727
        if a.Hash == nil {
3✔
1728
                return nil
×
UNCOV
1729
        }
×
1730

1731
        if _, err := w.Write(a.Hash[:]); err != nil {
3✔
1732
                return err
×
UNCOV
1733
        }
×
1734

1735
        // Merge the fixed/known records together with the custom records to
1736
        // serialize them as a single blob. We can't do this in SerializeRoute
1737
        // because we're in the middle of the byte stream there. We can only do
1738
        // TLV serialization at the end of the stream, since EOF is allowed for
1739
        // a stream if no more data is expected.
1740
        producers := []tlv.RecordProducer{
3✔
1741
                &a.Route.FirstHopAmount,
3✔
1742
        }
3✔
1743
        tlvData, err := lnwire.MergeAndEncode(
3✔
1744
                producers, nil, a.Route.FirstHopWireCustomRecords,
3✔
1745
        )
3✔
1746
        if err != nil {
3✔
1747
                return err
×
UNCOV
1748
        }
×
1749

1750
        if _, err := w.Write(tlvData); err != nil {
3✔
1751
                return err
×
UNCOV
1752
        }
×
1753

1754
        return nil
3✔
1755
}
1756

1757
func deserializeHTLCAttemptInfo(r io.Reader) (*HTLCAttemptInfo, error) {
3✔
1758
        a := &HTLCAttemptInfo{}
3✔
1759
        err := ReadElements(r, &a.sessionKey)
3✔
1760
        if err != nil {
3✔
1761
                return nil, err
×
UNCOV
1762
        }
×
1763

1764
        a.Route, err = DeserializeRoute(r)
3✔
1765
        if err != nil {
3✔
1766
                return nil, err
×
UNCOV
1767
        }
×
1768

1769
        a.AttemptTime, err = deserializeTime(r)
3✔
1770
        if err != nil {
3✔
1771
                return nil, err
×
UNCOV
1772
        }
×
1773

1774
        hash := lntypes.Hash{}
3✔
1775
        _, err = io.ReadFull(r, hash[:])
3✔
1776

3✔
1777
        switch {
3✔
1778
        // Older payment attempts wouldn't have the hash set, in which case we
1779
        // can just return.
1780
        case errors.Is(err, io.EOF), errors.Is(err, io.ErrUnexpectedEOF):
×
UNCOV
1781
                return a, nil
×
1782

1783
        case err != nil:
×
UNCOV
1784
                return nil, err
×
1785

1786
        default:
3✔
1787
        }
1788

1789
        a.Hash = &hash
3✔
1790

3✔
1791
        // Read any remaining data (if any) and parse it into the known records
3✔
1792
        // and custom records.
3✔
1793
        extraData, err := io.ReadAll(r)
3✔
1794
        if err != nil {
3✔
1795
                return nil, err
×
UNCOV
1796
        }
×
1797

1798
        customRecords, _, _, err := lnwire.ParseAndExtractCustomRecords(
3✔
1799
                extraData, &a.Route.FirstHopAmount,
3✔
1800
        )
3✔
1801
        if err != nil {
3✔
1802
                return nil, err
×
UNCOV
1803
        }
×
1804

1805
        a.Route.FirstHopWireCustomRecords = customRecords
3✔
1806

3✔
1807
        return a, nil
3✔
1808
}
1809

1810
func serializeHop(w io.Writer, h *route.Hop) error {
3✔
1811
        if err := WriteElements(w,
3✔
1812
                h.PubKeyBytes[:],
3✔
1813
                h.ChannelID,
3✔
1814
                h.OutgoingTimeLock,
3✔
1815
                h.AmtToForward,
3✔
1816
        ); err != nil {
3✔
1817
                return err
×
UNCOV
1818
        }
×
1819

1820
        if err := binary.Write(w, byteOrder, h.LegacyPayload); err != nil {
3✔
1821
                return err
×
UNCOV
1822
        }
×
1823

1824
        // For legacy payloads, we don't need to write any TLV records, so
1825
        // we'll write a zero indicating the our serialized TLV map has no
1826
        // records.
1827
        if h.LegacyPayload {
3✔
UNCOV
1828
                return WriteElements(w, uint32(0))
×
UNCOV
1829
        }
×
1830

1831
        // Gather all non-primitive TLV records so that they can be serialized
1832
        // as a single blob.
1833
        //
1834
        // TODO(conner): add migration to unify all fields in a single TLV
1835
        // blobs. The split approach will cause headaches down the road as more
1836
        // fields are added, which we can avoid by having a single TLV stream
1837
        // for all payload fields.
1838
        var records []tlv.Record
3✔
1839
        if h.MPP != nil {
6✔
1840
                records = append(records, h.MPP.Record())
3✔
1841
        }
3✔
1842

1843
        // Add blinding point and encrypted data if present.
1844
        if h.EncryptedData != nil {
6✔
1845
                records = append(records, record.NewEncryptedDataRecord(
3✔
1846
                        &h.EncryptedData,
3✔
1847
                ))
3✔
1848
        }
3✔
1849

1850
        if h.BlindingPoint != nil {
6✔
1851
                records = append(records, record.NewBlindingPointRecord(
3✔
1852
                        &h.BlindingPoint,
3✔
1853
                ))
3✔
1854
        }
3✔
1855

1856
        if h.AMP != nil {
6✔
1857
                records = append(records, h.AMP.Record())
3✔
1858
        }
3✔
1859

1860
        if h.Metadata != nil {
3✔
UNCOV
1861
                records = append(records, record.NewMetadataRecord(&h.Metadata))
×
UNCOV
1862
        }
×
1863

1864
        if h.TotalAmtMsat != 0 {
6✔
1865
                totalMsatInt := uint64(h.TotalAmtMsat)
3✔
1866
                records = append(
3✔
1867
                        records, record.NewTotalAmtMsatBlinded(&totalMsatInt),
3✔
1868
                )
3✔
1869
        }
3✔
1870

1871
        // Final sanity check to absolutely rule out custom records that are not
1872
        // custom and write into the standard range.
1873
        if err := h.CustomRecords.Validate(); err != nil {
3✔
1874
                return err
×
UNCOV
1875
        }
×
1876

1877
        // Convert custom records to tlv and add to the record list.
1878
        // MapToRecords sorts the list, so adding it here will keep the list
1879
        // canonical.
1880
        tlvRecords := tlv.MapToRecords(h.CustomRecords)
3✔
1881
        records = append(records, tlvRecords...)
3✔
1882

3✔
1883
        // Otherwise, we'll transform our slice of records into a map of the
3✔
1884
        // raw bytes, then serialize them in-line with a length (number of
3✔
1885
        // elements) prefix.
3✔
1886
        mapRecords, err := tlv.RecordsToMap(records)
3✔
1887
        if err != nil {
3✔
1888
                return err
×
UNCOV
1889
        }
×
1890

1891
        numRecords := uint32(len(mapRecords))
3✔
1892
        if err := WriteElements(w, numRecords); err != nil {
3✔
1893
                return err
×
UNCOV
1894
        }
×
1895

1896
        for recordType, rawBytes := range mapRecords {
6✔
1897
                if err := WriteElements(w, recordType); err != nil {
3✔
1898
                        return err
×
UNCOV
1899
                }
×
1900

1901
                if err := wire.WriteVarBytes(w, 0, rawBytes); err != nil {
3✔
1902
                        return err
×
UNCOV
1903
                }
×
1904
        }
1905

1906
        return nil
3✔
1907
}
1908

1909
// maxOnionPayloadSize is the largest Sphinx payload possible, so we don't need
1910
// to read/write a TLV stream larger than this.
1911
const maxOnionPayloadSize = 1300
1912

1913
func deserializeHop(r io.Reader) (*route.Hop, error) {
3✔
1914
        h := &route.Hop{}
3✔
1915

3✔
1916
        var pub []byte
3✔
1917
        if err := ReadElements(r, &pub); err != nil {
3✔
1918
                return nil, err
×
UNCOV
1919
        }
×
1920
        copy(h.PubKeyBytes[:], pub)
3✔
1921

3✔
1922
        if err := ReadElements(r,
3✔
1923
                &h.ChannelID, &h.OutgoingTimeLock, &h.AmtToForward,
3✔
1924
        ); err != nil {
3✔
1925
                return nil, err
×
UNCOV
1926
        }
×
1927

1928
        // TODO(roasbeef): change field to allow LegacyPayload false to be the
1929
        // legacy default?
1930
        err := binary.Read(r, byteOrder, &h.LegacyPayload)
3✔
1931
        if err != nil {
3✔
1932
                return nil, err
×
UNCOV
1933
        }
×
1934

1935
        var numElements uint32
3✔
1936
        if err := ReadElements(r, &numElements); err != nil {
3✔
1937
                return nil, err
×
UNCOV
1938
        }
×
1939

1940
        // If there're no elements, then we can return early.
1941
        if numElements == 0 {
6✔
1942
                return h, nil
3✔
1943
        }
3✔
1944

1945
        tlvMap := make(map[uint64][]byte)
3✔
1946
        for i := uint32(0); i < numElements; i++ {
6✔
1947
                var tlvType uint64
3✔
1948
                if err := ReadElements(r, &tlvType); err != nil {
3✔
1949
                        return nil, err
×
UNCOV
1950
                }
×
1951

1952
                rawRecordBytes, err := wire.ReadVarBytes(
3✔
1953
                        r, 0, maxOnionPayloadSize, "tlv",
3✔
1954
                )
3✔
1955
                if err != nil {
3✔
1956
                        return nil, err
×
UNCOV
1957
                }
×
1958

1959
                tlvMap[tlvType] = rawRecordBytes
3✔
1960
        }
1961

1962
        // If the MPP type is present, remove it from the generic TLV map and
1963
        // parse it back into a proper MPP struct.
1964
        //
1965
        // TODO(conner): add migration to unify all fields in a single TLV
1966
        // blobs. The split approach will cause headaches down the road as more
1967
        // fields are added, which we can avoid by having a single TLV stream
1968
        // for all payload fields.
1969
        mppType := uint64(record.MPPOnionType)
3✔
1970
        if mppBytes, ok := tlvMap[mppType]; ok {
6✔
1971
                delete(tlvMap, mppType)
3✔
1972

3✔
1973
                var (
3✔
1974
                        mpp    = &record.MPP{}
3✔
1975
                        mppRec = mpp.Record()
3✔
1976
                        r      = bytes.NewReader(mppBytes)
3✔
1977
                )
3✔
1978
                err := mppRec.Decode(r, uint64(len(mppBytes)))
3✔
1979
                if err != nil {
3✔
1980
                        return nil, err
×
UNCOV
1981
                }
×
1982
                h.MPP = mpp
3✔
1983
        }
1984

1985
        // If encrypted data or blinding key are present, remove them from
1986
        // the TLV map and parse into proper types.
1987
        encryptedDataType := uint64(record.EncryptedDataOnionType)
3✔
1988
        if data, ok := tlvMap[encryptedDataType]; ok {
6✔
1989
                delete(tlvMap, encryptedDataType)
3✔
1990
                h.EncryptedData = data
3✔
1991
        }
3✔
1992

1993
        blindingType := uint64(record.BlindingPointOnionType)
3✔
1994
        if blindingPoint, ok := tlvMap[blindingType]; ok {
6✔
1995
                delete(tlvMap, blindingType)
3✔
1996

3✔
1997
                h.BlindingPoint, err = btcec.ParsePubKey(blindingPoint)
3✔
1998
                if err != nil {
3✔
1999
                        return nil, fmt.Errorf("invalid blinding point: %w",
×
2000
                                err)
×
UNCOV
2001
                }
×
2002
        }
2003

2004
        ampType := uint64(record.AMPOnionType)
3✔
2005
        if ampBytes, ok := tlvMap[ampType]; ok {
6✔
2006
                delete(tlvMap, ampType)
3✔
2007

3✔
2008
                var (
3✔
2009
                        amp    = &record.AMP{}
3✔
2010
                        ampRec = amp.Record()
3✔
2011
                        r      = bytes.NewReader(ampBytes)
3✔
2012
                )
3✔
2013
                err := ampRec.Decode(r, uint64(len(ampBytes)))
3✔
2014
                if err != nil {
3✔
2015
                        return nil, err
×
UNCOV
2016
                }
×
2017
                h.AMP = amp
3✔
2018
        }
2019

2020
        // If the metadata type is present, remove it from the tlv map and
2021
        // populate directly on the hop.
2022
        metadataType := uint64(record.MetadataOnionType)
3✔
2023
        if metadata, ok := tlvMap[metadataType]; ok {
3✔
UNCOV
2024
                delete(tlvMap, metadataType)
×
UNCOV
2025

×
UNCOV
2026
                h.Metadata = metadata
×
UNCOV
2027
        }
×
2028

2029
        totalAmtMsatType := uint64(record.TotalAmtMsatBlindedType)
3✔
2030
        if totalAmtMsat, ok := tlvMap[totalAmtMsatType]; ok {
6✔
2031
                delete(tlvMap, totalAmtMsatType)
3✔
2032

3✔
2033
                var (
3✔
2034
                        totalAmtMsatInt uint64
3✔
2035
                        buf             [8]byte
3✔
2036
                )
3✔
2037
                if err := tlv.DTUint64(
3✔
2038
                        bytes.NewReader(totalAmtMsat),
3✔
2039
                        &totalAmtMsatInt,
3✔
2040
                        &buf,
3✔
2041
                        uint64(len(totalAmtMsat)),
3✔
2042
                ); err != nil {
3✔
2043
                        return nil, err
×
UNCOV
2044
                }
×
2045

2046
                h.TotalAmtMsat = lnwire.MilliSatoshi(totalAmtMsatInt)
3✔
2047
        }
2048

2049
        h.CustomRecords = tlvMap
3✔
2050

3✔
2051
        return h, nil
3✔
2052
}
2053

2054
// SerializeRoute serializes a route.
2055
func SerializeRoute(w io.Writer, r route.Route) error {
3✔
2056
        if err := WriteElements(w,
3✔
2057
                r.TotalTimeLock, r.TotalAmount, r.SourcePubKey[:],
3✔
2058
        ); err != nil {
3✔
2059
                return err
×
UNCOV
2060
        }
×
2061

2062
        if err := WriteElements(w, uint32(len(r.Hops))); err != nil {
3✔
2063
                return err
×
UNCOV
2064
        }
×
2065

2066
        for _, h := range r.Hops {
6✔
2067
                if err := serializeHop(w, h); err != nil {
3✔
2068
                        return err
×
UNCOV
2069
                }
×
2070
        }
2071

2072
        // Any new/extra TLV data is encoded in serializeHTLCAttemptInfo!
2073

2074
        return nil
3✔
2075
}
2076

2077
// DeserializeRoute deserializes a route.
2078
func DeserializeRoute(r io.Reader) (route.Route, error) {
3✔
2079
        rt := route.Route{}
3✔
2080
        if err := ReadElements(r,
3✔
2081
                &rt.TotalTimeLock, &rt.TotalAmount,
3✔
2082
        ); err != nil {
3✔
2083
                return rt, err
×
UNCOV
2084
        }
×
2085

2086
        var pub []byte
3✔
2087
        if err := ReadElements(r, &pub); err != nil {
3✔
2088
                return rt, err
×
UNCOV
2089
        }
×
2090
        copy(rt.SourcePubKey[:], pub)
3✔
2091

3✔
2092
        var numHops uint32
3✔
2093
        if err := ReadElements(r, &numHops); err != nil {
3✔
2094
                return rt, err
×
UNCOV
2095
        }
×
2096

2097
        var hops []*route.Hop
3✔
2098
        for i := uint32(0); i < numHops; i++ {
6✔
2099
                hop, err := deserializeHop(r)
3✔
2100
                if err != nil {
3✔
2101
                        return rt, err
×
UNCOV
2102
                }
×
2103
                hops = append(hops, hop)
3✔
2104
        }
2105
        rt.Hops = hops
3✔
2106

3✔
2107
        // Any new/extra TLV data is decoded in deserializeHTLCAttemptInfo!
3✔
2108

3✔
2109
        return rt, nil
3✔
2110
}
2111

2112
// serializeHTLCSettleInfo serializes the details of a settled htlc.
2113
func serializeHTLCSettleInfo(w io.Writer, s *HTLCSettleInfo) error {
3✔
2114
        if _, err := w.Write(s.Preimage[:]); err != nil {
3✔
UNCOV
2115
                return err
×
UNCOV
2116
        }
×
2117

2118
        if err := serializeTime(w, s.SettleTime); err != nil {
3✔
UNCOV
2119
                return err
×
UNCOV
2120
        }
×
2121

2122
        return nil
3✔
2123
}
2124

2125
// deserializeHTLCSettleInfo deserializes the details of a settled htlc.
2126
func deserializeHTLCSettleInfo(r io.Reader) (*HTLCSettleInfo, error) {
3✔
2127
        s := &HTLCSettleInfo{}
3✔
2128
        if _, err := io.ReadFull(r, s.Preimage[:]); err != nil {
3✔
UNCOV
2129
                return nil, err
×
UNCOV
2130
        }
×
2131

2132
        var err error
3✔
2133
        s.SettleTime, err = deserializeTime(r)
3✔
2134
        if err != nil {
3✔
UNCOV
2135
                return nil, err
×
UNCOV
2136
        }
×
2137

2138
        return s, nil
3✔
2139
}
2140

2141
// serializeHTLCFailInfo serializes the details of a failed htlc including the
2142
// wire failure.
2143
func serializeHTLCFailInfo(w io.Writer, f *HTLCFailInfo) error {
3✔
2144
        if err := serializeTime(w, f.FailTime); err != nil {
3✔
UNCOV
2145
                return err
×
UNCOV
2146
        }
×
2147

2148
        // Write failure. If there is no failure message, write an empty
2149
        // byte slice.
2150
        var messageBytes bytes.Buffer
3✔
2151
        if f.Message != nil {
6✔
2152
                err := lnwire.EncodeFailureMessage(&messageBytes, f.Message, 0)
3✔
2153
                if err != nil {
3✔
UNCOV
2154
                        return err
×
UNCOV
2155
                }
×
2156
        }
2157
        if err := wire.WriteVarBytes(w, 0, messageBytes.Bytes()); err != nil {
3✔
UNCOV
2158
                return err
×
UNCOV
2159
        }
×
2160

2161
        return WriteElements(w, byte(f.Reason), f.FailureSourceIndex)
3✔
2162
}
2163

2164
// deserializeHTLCFailInfo deserializes the details of a failed htlc including
2165
// the wire failure.
2166
func deserializeHTLCFailInfo(r io.Reader) (*HTLCFailInfo, error) {
3✔
2167
        f := &HTLCFailInfo{}
3✔
2168
        var err error
3✔
2169
        f.FailTime, err = deserializeTime(r)
3✔
2170
        if err != nil {
3✔
UNCOV
2171
                return nil, err
×
UNCOV
2172
        }
×
2173

2174
        // Read failure.
2175
        failureBytes, err := wire.ReadVarBytes(
3✔
2176
                r, 0, math.MaxUint16, "failure",
3✔
2177
        )
3✔
2178
        if err != nil {
3✔
UNCOV
2179
                return nil, err
×
UNCOV
2180
        }
×
2181
        if len(failureBytes) > 0 {
6✔
2182
                f.Message, err = lnwire.DecodeFailureMessage(
3✔
2183
                        bytes.NewReader(failureBytes), 0,
3✔
2184
                )
3✔
2185
                if err != nil {
3✔
UNCOV
2186
                        return nil, err
×
UNCOV
2187
                }
×
2188
        }
2189

2190
        var reason byte
3✔
2191
        err = ReadElements(r, &reason, &f.FailureSourceIndex)
3✔
2192
        if err != nil {
3✔
UNCOV
2193
                return nil, err
×
UNCOV
2194
        }
×
2195
        f.Reason = HTLCFailReason(reason)
3✔
2196

3✔
2197
        return f, nil
3✔
2198
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc