• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16291181271

15 Jul 2025 10:47AM UTC coverage: 57.167% (-10.2%) from 67.349%
16291181271

Pull #9822

github

web-flow
Merge dabf3ae6a into 302551ade
Pull Request #9822: Refactor Payments Code (Head PR for refactor to make sure the itest pass)

650 of 2407 new or added lines in 25 files covered. (27.0%)

28129 existing lines in 454 files now uncovered.

98745 of 172731 relevant lines covered (57.17%)

1.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.59
/payments/db/kvstore.go
1
package paymentsdb
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/binary"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "math"
11
        "sort"
12
        "sync"
13
        "time"
14

15
        "github.com/btcsuite/btcd/btcec/v2"
16
        "github.com/btcsuite/btcd/wire"
17
        "github.com/lightningnetwork/lnd/channeldb"
18
        "github.com/lightningnetwork/lnd/kvdb"
19
        "github.com/lightningnetwork/lnd/lntypes"
20
        "github.com/lightningnetwork/lnd/lnwire"
21
        "github.com/lightningnetwork/lnd/record"
22
        "github.com/lightningnetwork/lnd/routing/route"
23
        "github.com/lightningnetwork/lnd/tlv"
24
)
25

26
var (
27
        // paymentsRootBucket is the name of the top-level bucket within the
28
        // database that stores all data related to payments. Within this
29
        // bucket, each payment hash its own sub-bucket keyed by its payment
30
        // hash.
31
        //
32
        // Bucket hierarchy:
33
        //
34
        // root-bucket
35
        //      |
36
        //      |-- <paymenthash>
37
        //      |        |--sequence-key: <sequence number>
38
        //      |        |--creation-info-key: <creation info>
39
        //      |        |--fail-info-key: <(optional) fail info>
40
        //      |        |
41
        //      |        |--payment-htlcs-bucket (shard-bucket)
42
        //      |        |        |
43
        //      |        |        |-- ai<htlc attempt ID>: <htlc attempt info>
44
        //      |        |        |-- si<htlc attempt ID>: <(optional) settle info>
45
        //      |        |        |-- fi<htlc attempt ID>: <(optional) fail info>
46
        //      |        |        |
47
        //      |        |       ...
48
        //      |        |
49
        //      |        |
50
        //      |        |--duplicate-bucket (only for old, completed payments)
51
        //      |                 |
52
        //      |                 |-- <seq-num>
53
        //      |                 |       |--sequence-key: <sequence number>
54
        //      |                 |       |--creation-info-key: <creation info>
55
        //      |                 |       |--ai: <attempt info>
56
        //      |                 |       |--si: <settle info>
57
        //      |                 |       |--fi: <fail info>
58
        //      |                 |
59
        //      |                 |-- <seq-num>
60
        //      |                 |       |
61
        //      |                ...     ...
62
        //      |
63
        //      |-- <paymenthash>
64
        //      |        |
65
        //      |       ...
66
        //     ...
67
        //
68
        paymentsRootBucket = []byte("payments-root-bucket")
69

70
        // paymentSequenceKey is a key used in the payment's sub-bucket to
71
        // store the sequence number of the payment.
72
        paymentSequenceKey = []byte("payment-sequence-key")
73

74
        // paymentCreationInfoKey is a key used in the payment's sub-bucket to
75
        // store the creation info of the payment.
76
        paymentCreationInfoKey = []byte("payment-creation-info")
77

78
        // paymentHtlcsBucket is a bucket where we'll store the information
79
        // about the HTLCs that were attempted for a payment.
80
        paymentHtlcsBucket = []byte("payment-htlcs-bucket")
81

82
        // htlcAttemptInfoKey is the key used as the prefix of an HTLC attempt
83
        // to store the info about the attempt that was done for the HTLC in
84
        // question. The HTLC attempt ID is concatenated at the end.
85
        htlcAttemptInfoKey = []byte("ai")
86

87
        // htlcSettleInfoKey is the key used as the prefix of an HTLC attempt
88
        // settle info, if any. The HTLC attempt ID is concatenated at the end.
89
        htlcSettleInfoKey = []byte("si")
90

91
        // htlcFailInfoKey is the key used as the prefix of an HTLC attempt
92
        // failure information, if any.The  HTLC attempt ID is concatenated at
93
        // the end.
94
        htlcFailInfoKey = []byte("fi")
95

96
        // paymentFailInfoKey is a key used in the payment's sub-bucket to
97
        // store information about the reason a payment failed.
98
        paymentFailInfoKey = []byte("payment-fail-info")
99

100
        // paymentsIndexBucket is the name of the top-level bucket within the
101
        // database that stores an index of payment sequence numbers to its
102
        // payment hash.
103
        // payments-sequence-index-bucket
104
        //         |--<sequence-number>: <payment hash>
105
        //         |--...
106
        //         |--<sequence-number>: <payment hash>
107
        paymentsIndexBucket = []byte("payments-index-bucket")
108
)
109

110
// bucket name for duplicate payments.
111
var (
112
        // duplicatePaymentsBucket is the name of a optional sub-bucket within
113
        // the payment hash bucket, that is used to hold duplicate payments to a
114
        // payment hash. This is needed to support information from earlier
115
        // versions of lnd, where it was possible to pay to a payment hash more
116
        // than once.
117
        duplicatePaymentsBucket = []byte("payment-duplicate-bucket")
118

119
        // duplicatePaymentSettleInfoKey is a key used in the payment's
120
        // sub-bucket to store the settle info of the payment.
121
        duplicatePaymentSettleInfoKey = []byte("payment-settle-info")
122

123
        // duplicatePaymentAttemptInfoKey is a key used in the payment's
124
        // sub-bucket to store the info about the latest attempt that was done
125
        // for the payment in question.
126
        duplicatePaymentAttemptInfoKey = []byte("payment-attempt-info")
127

128
        // duplicatePaymentCreationInfoKey is a key used in the payment's
129
        // sub-bucket to store the creation info of the payment.
130
        duplicatePaymentCreationInfoKey = []byte("payment-creation-info")
131

132
        // duplicatePaymentFailInfoKey is a key used in the payment's sub-bucket
133
        // to store information about the reason a payment failed.
134
        duplicatePaymentFailInfoKey = []byte("payment-fail-info")
135

136
        // duplicatePaymentSequenceKey is a key used in the payment's sub-bucket
137
        // to store the sequence number of the payment.
138
        duplicatePaymentSequenceKey = []byte("payment-sequence-key")
139
)
140

141
var (
142
        // ErrNoSequenceNumber is returned if we look up a payment which does
143
        // not have a sequence number.
144
        ErrNoSequenceNumber = errors.New("sequence number not found")
145

146
        // ErrDuplicateNotFound is returned when we lookup a payment by its
147
        // index and cannot find a payment with a matching sequence number.
148
        ErrDuplicateNotFound = errors.New("duplicate payment not found")
149

150
        // ErrNoDuplicateBucket is returned when we expect to find duplicates
151
        // when looking up a payment from its index, but the payment does not
152
        // have any.
153
        ErrNoDuplicateBucket = errors.New("expected duplicate bucket")
154

155
        // ErrNoDuplicateNestedBucket is returned if we do not find duplicate
156
        // payments in their own sub-bucket.
157
        ErrNoDuplicateNestedBucket = errors.New("nested duplicate bucket not " +
158
                "found")
159
)
160

161
// Payment operations related constants.
162
const (
163
        // paymentSeqBlockSize is the block size used when we batch allocate
164
        // payment sequences for future payments.
165
        paymentSeqBlockSize = 1000
166

167
        // paymentProgrespLogInterval is the interval we use limiting the
168
        // logging output of payment processing.
169
        paymentProgrespLogInterval = 30 * time.Second
170
)
171

172
// KVStore is kv implementation of the payment store.
173
type KVStore struct {
174
        // db is the underlying database implementation.
175
        db kvdb.Backend
176

177
        kvStoreConfig *StoreOptions
178

179
        // Sequence management for the kv store.
180
        seqMu     sync.Mutex
181
        currSeq   uint64
182
        storedSeq uint64
183

184
        // Sequencer for the htlc attempts.
185
        Sequencer
186
}
187

188
// defaultKVStoreOptions returns the default options for the KV store.
189
func defaultKVStoreOptions() *StoreOptions {
3✔
190
        return &StoreOptions{
3✔
191
                keepFailedPaymentAttempts: false,
3✔
192
        }
3✔
193
}
3✔
194

195
// NewKVStore creates a new KVStore for payments.
196
func NewKVStore(db kvdb.Backend, options ...OptionModifier) (*KVStore,
197
        error) {
3✔
198

3✔
199
        opts := defaultKVStoreOptions()
3✔
200
        for _, applyOption := range options {
6✔
201
                applyOption(opts)
3✔
202
        }
3✔
203

204
        if !opts.NoMigration {
6✔
205
                if err := initKVStore(db); err != nil {
3✔
NEW
206
                        return nil, err
×
NEW
207
                }
×
208
        }
209

210
        sequencer, err := newPersistentSequencer(db)
3✔
211
        if err != nil {
3✔
NEW
212
                return nil, err
×
NEW
213
        }
×
214

215
        return &KVStore{
3✔
216
                db:            db,
3✔
217
                Sequencer:     sequencer,
3✔
218
                kvStoreConfig: opts,
3✔
219
        }, nil
3✔
220
}
221

222
var paymentsTopLevelBuckets = [][]byte{
223
        paymentsRootBucket,
224
        paymentsIndexBucket,
225
}
226

227
// initKVStore creates and initializes the top-level buckets for the payment db.
228
func initKVStore(db kvdb.Backend) error {
3✔
229
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
6✔
230
                for _, tlb := range paymentsTopLevelBuckets {
6✔
231
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
3✔
NEW
232
                                return err
×
NEW
233
                        }
×
234
                }
235

236
                return nil
3✔
237
        }, func() {})
3✔
238
        if err != nil {
3✔
NEW
239
                return fmt.Errorf("unable to create new payments db: %w", err)
×
NEW
240
        }
×
241

242
        return nil
3✔
243
}
244

245
// Wipe completely deletes all saved state within all used buckets within the
246
// database. The deletion is done in a single transaction, therefore this
247
// operation is fully atomic.
248
//
249
// NOTE: USE WITH CAUTION.
NEW
250
func (s *KVStore) Wipe() error {
×
NEW
251
        err := kvdb.Update(s.db, func(tx kvdb.RwTx) error {
×
NEW
252
                for _, tlb := range paymentsTopLevelBuckets {
×
NEW
253
                        err := tx.DeleteTopLevelBucket(tlb)
×
NEW
254
                        if err != nil && err != kvdb.ErrBucketNotFound {
×
NEW
255
                                return err
×
NEW
256
                        }
×
257
                }
258

NEW
259
                return nil
×
NEW
260
        }, func() {})
×
NEW
261
        if err != nil {
×
NEW
262
                return err
×
NEW
263
        }
×
264

NEW
265
        return initKVStore(s.db)
×
266
}
267

268
// nextPaymentSequence returns the next sequence number to store for a new
269
// payment.
270
func (s *KVStore) nextPaymentSequence() ([]byte, error) {
3✔
271
        s.seqMu.Lock()
3✔
272
        defer s.seqMu.Unlock()
3✔
273

3✔
274
        // Set a new upper bound in the DB every 1000 payments to avoid
3✔
275
        // conflicts on the sequence when using etcd.
3✔
276
        if s.currSeq == s.storedSeq {
6✔
277
                var currSeq, newUpperBound uint64
3✔
278
                if err := kvdb.Update(s.db, func(tx kvdb.RwTx) error {
6✔
279
                        paymentsBucket, err := tx.CreateTopLevelBucket(
3✔
280
                                paymentsRootBucket,
3✔
281
                        )
3✔
282
                        if err != nil {
3✔
NEW
283
                                return err
×
NEW
284
                        }
×
285

286
                        currSeq = paymentsBucket.Sequence()
3✔
287
                        newUpperBound = currSeq + paymentSeqBlockSize
3✔
288

3✔
289
                        return paymentsBucket.SetSequence(newUpperBound)
3✔
290
                }, func() {}); err != nil {
3✔
NEW
291
                        return nil, err
×
NEW
292
                }
×
293

294
                // We lazy initialize the cached currPaymentSeq here using the
295
                // first nextPaymentSequence() call. This if statement will auto
296
                // initialize our stored currPaymentSeq, since by default both
297
                // this variable and storedPaymentSeq are zero which in turn
298
                // will have us fetch the current values from the DB.
299
                if s.currSeq == 0 {
6✔
300
                        s.currSeq = currSeq
3✔
301
                }
3✔
302

303
                s.storedSeq = newUpperBound
3✔
304
        }
305

306
        s.currSeq++
3✔
307
        b := make([]byte, 8)
3✔
308
        binary.BigEndian.PutUint64(b, s.currSeq)
3✔
309

3✔
310
        return b, nil
3✔
311
}
312

313
// InitPayment checks or records the given PaymentCreationInfo with the DB,
314
// making sure it does not already exist as an in-flight payment. When this
315
// method returns successfully, the payment is guaranteed to be in the InFlight
316
// state.
317
func (s *KVStore) InitPayment(paymentHash lntypes.Hash,
318
        info *PaymentCreationInfo) error {
3✔
319

3✔
320
        // Obtain a new sequence number for this payment. This is used
3✔
321
        // to sort the payments in order of creation, and also acts as
3✔
322
        // a unique identifier for each payment.
3✔
323
        sequenceNum, err := s.nextPaymentSequence()
3✔
324
        if err != nil {
3✔
NEW
325
                return err
×
NEW
326
        }
×
327

328
        var b bytes.Buffer
3✔
329
        if err := serializePaymentCreationInfo(&b, info); err != nil {
3✔
NEW
330
                return err
×
NEW
331
        }
×
332
        infoBytes := b.Bytes()
3✔
333

3✔
334
        var updateErr error
3✔
335
        err = kvdb.Batch(s.db, func(tx kvdb.RwTx) error {
6✔
336
                // Reset the update error, to avoid carrying over an error
3✔
337
                // from a previous execution of the batched db transaction.
3✔
338
                updateErr = nil
3✔
339

3✔
340
                prefetchPayment(tx, paymentHash)
3✔
341
                bucket, err := createPaymentBucket(tx, paymentHash)
3✔
342
                if err != nil {
3✔
NEW
343
                        return err
×
NEW
344
                }
×
345

346
                // Get the existing status of this payment, if any.
347
                paymentStatus, err := fetchPaymentStatus(bucket)
3✔
348

3✔
349
                switch {
3✔
350
                // If no error is returned, it means we already have this
351
                // payment. We'll check the status to decide whether we allow
352
                // retrying the payment or return a specific error.
353
                case err == nil:
3✔
354
                        if err := paymentStatus.Initializable(); err != nil {
6✔
355
                                updateErr = err
3✔
356
                                return nil
3✔
357
                        }
3✔
358

359
                // Otherwise, if the error is not `ErrPaymentNotInitiated`,
360
                // we'll return the error.
NEW
361
                case !errors.Is(err, ErrPaymentNotInitiated):
×
NEW
362
                        return err
×
363
                }
364

365
                // Before we set our new sequence number, we check whether this
366
                // payment has a previously set sequence number and remove its
367
                // index entry if it exists. This happens in the case where we
368
                // have a previously attempted payment which was left in a state
369
                // where we can retry.
370
                seqBytes := bucket.Get(paymentSequenceKey)
3✔
371
                if seqBytes != nil {
6✔
372
                        indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
373
                        if err := indexBucket.Delete(seqBytes); err != nil {
3✔
NEW
374
                                return err
×
NEW
375
                        }
×
376
                }
377

378
                // Once we have obtained a sequence number, we add an entry
379
                // to our index bucket which will map the sequence number to
380
                // our payment identifier.
381
                err = createPaymentIndexEntry(
3✔
382
                        tx, sequenceNum, info.PaymentIdentifier,
3✔
383
                )
3✔
384
                if err != nil {
3✔
NEW
385
                        return err
×
NEW
386
                }
×
387

388
                err = bucket.Put(paymentSequenceKey, sequenceNum)
3✔
389
                if err != nil {
3✔
NEW
390
                        return err
×
NEW
391
                }
×
392

393
                // Add the payment info to the bucket, which contains the
394
                // static information for this payment
395
                err = bucket.Put(paymentCreationInfoKey, infoBytes)
3✔
396
                if err != nil {
3✔
NEW
397
                        return err
×
NEW
398
                }
×
399

400
                // We'll delete any lingering HTLCs to start with, in case we
401
                // are initializing a payment that was attempted earlier, but
402
                // left in a state where we could retry.
403
                err = bucket.DeleteNestedBucket(paymentHtlcsBucket)
3✔
404
                if err != nil && !errors.Is(err, kvdb.ErrBucketNotFound) {
3✔
NEW
405
                        return err
×
NEW
406
                }
×
407

408
                // Also delete any lingering failure info now that we are
409
                // re-attempting.
410
                return bucket.Delete(paymentFailInfoKey)
3✔
411
        })
412
        if err != nil {
3✔
NEW
413
                return fmt.Errorf("unable to init payment: %w", err)
×
NEW
414
        }
×
415

416
        return updateErr
3✔
417
}
418

419
// DeleteFailedAttempts deletes all failed htlcs for a payment if configured
420
// by the KVPaymentsDB db.
421
func (s *KVStore) DeleteFailedAttempts(hash lntypes.Hash) error {
3✔
422
        if !s.kvStoreConfig.keepFailedPaymentAttempts {
3✔
NEW
423
                const failedAttemptsOnly = true
×
NEW
424
                err := s.DeletePayment(hash, failedAttemptsOnly)
×
NEW
425
                if err != nil {
×
NEW
426
                        return err
×
NEW
427
                }
×
428
        }
429

430
        return nil
3✔
431
}
432

433
// paymentIndexTypeHash is a payment index type which indicates that we have
434
// created an index of payment sequence number to payment hash.
435
type paymentIndexType uint8
436

437
// paymentIndexTypeHash is a payment index type which indicates that we have
438
// created an index of payment sequence number to payment hash.
439
const paymentIndexTypeHash paymentIndexType = 0
440

441
// createPaymentIndexEntry creates a payment hash typed index for a payment. The
442
// index produced contains a payment index type (which can be used in future to
443
// signal different payment index types) and the payment identifier.
444
func createPaymentIndexEntry(tx kvdb.RwTx, sequenceNumber []byte,
445
        id lntypes.Hash) error {
3✔
446

3✔
447
        var b bytes.Buffer
3✔
448
        WriteElements(&b, paymentIndexTypeHash, id[:])
3✔
449

3✔
450
        indexes := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
451

3✔
452
        return indexes.Put(sequenceNumber, b.Bytes())
3✔
453
}
3✔
454

455
// deserializePaymentIndex deserializes a payment index entry. This function
456
// currently only supports deserialization of payment hash indexes, and will
457
// fail for other types.
458
func deserializePaymentIndex(r io.Reader) (lntypes.Hash, error) {
3✔
459
        var (
3✔
460
                indexType   paymentIndexType
3✔
461
                paymentHash []byte
3✔
462
        )
3✔
463

3✔
464
        if err := ReadElements(r, &indexType, &paymentHash); err != nil {
3✔
NEW
465
                return lntypes.Hash{}, err
×
NEW
466
        }
×
467

468
        // While we only have on payment index type, we do not need to use our
469
        // index type to deserialize the index. However, we sanity check that
470
        // this type is as expected, since we had to read it out anyway.
471
        if indexType != paymentIndexTypeHash {
3✔
NEW
472
                return lntypes.Hash{}, fmt.Errorf("unknown payment index "+
×
NEW
473
                        "type: %v", indexType)
×
NEW
474
        }
×
475

476
        hash, err := lntypes.MakeHash(paymentHash)
3✔
477
        if err != nil {
3✔
NEW
478
                return lntypes.Hash{}, err
×
NEW
479
        }
×
480

481
        return hash, nil
3✔
482
}
483

484
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
485
// DB.
486
func (s *KVStore) RegisterAttempt(paymentHash lntypes.Hash,
487
        attempt *HTLCAttemptInfo) (*MPPayment, error) {
3✔
488

3✔
489
        // Serialize the information before opening the db transaction.
3✔
490
        var a bytes.Buffer
3✔
491
        err := serializeHTLCAttemptInfo(&a, attempt)
3✔
492
        if err != nil {
3✔
NEW
493
                return nil, err
×
NEW
494
        }
×
495
        htlcInfoBytes := a.Bytes()
3✔
496

3✔
497
        htlcIDBytes := make([]byte, 8)
3✔
498
        binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID)
3✔
499

3✔
500
        var payment *MPPayment
3✔
501
        err = kvdb.Batch(s.db, func(tx kvdb.RwTx) error {
6✔
502
                prefetchPayment(tx, paymentHash)
3✔
503
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
504
                if err != nil {
3✔
NEW
505
                        return err
×
NEW
506
                }
×
507

508
                payment, err = fetchPayment(bucket)
3✔
509
                if err != nil {
3✔
NEW
510
                        return err
×
NEW
511
                }
×
512

513
                // Check if registering a new attempt is allowed.
514
                if err := payment.Registrable(); err != nil {
3✔
NEW
515
                        return err
×
NEW
516
                }
×
517

518
                // Verify the attempt is compatible with the existing payment.
519
                if err := verifyAttempt(payment, attempt); err != nil {
3✔
NEW
520
                        return err
×
NEW
521
                }
×
522

523
                htlcsBucket, err := bucket.CreateBucketIfNotExists(
3✔
524
                        paymentHtlcsBucket,
3✔
525
                )
3✔
526
                if err != nil {
3✔
NEW
527
                        return err
×
NEW
528
                }
×
529

530
                err = htlcsBucket.Put(
3✔
531
                        htlcBucketKey(htlcAttemptInfoKey, htlcIDBytes),
3✔
532
                        htlcInfoBytes,
3✔
533
                )
3✔
534
                if err != nil {
3✔
NEW
535
                        return err
×
NEW
536
                }
×
537

538
                // Retrieve attempt info for the notification.
539
                payment, err = fetchPayment(bucket)
3✔
540

3✔
541
                return err
3✔
542
        })
543
        if err != nil {
3✔
NEW
544
                return nil, err
×
NEW
545
        }
×
546

547
        return payment, err
3✔
548
}
549

550
// SettleAttempt marks the given attempt settled with the preimage. If this is
551
// a multi shard payment, this might implicitly mean that the full payment
552
// succeeded.
553
//
554
// After invoking this method, InitPayment should always return an error to
555
// prevent us from making duplicate payments to the same payment hash. The
556
// provided preimage is atomically saved to the DB for record keeping.
557
func (s *KVStore) SettleAttempt(hash lntypes.Hash, attemptID uint64,
558
        settleInfo *HTLCSettleInfo) (*MPPayment, error) {
3✔
559

3✔
560
        var b bytes.Buffer
3✔
561
        if err := serializeHTLCSettleInfo(&b, settleInfo); err != nil {
3✔
NEW
562
                return nil, err
×
NEW
563
        }
×
564
        settleBytes := b.Bytes()
3✔
565

3✔
566
        return s.updateHtlcKey(hash, attemptID, htlcSettleInfoKey, settleBytes)
3✔
567
}
568

569
// FailAttempt marks the given payment attempt failed.
570
func (s *KVStore) FailAttempt(hash lntypes.Hash, attemptID uint64,
571
        failInfo *HTLCFailInfo) (*MPPayment, error) {
3✔
572

3✔
573
        var b bytes.Buffer
3✔
574
        if err := serializeHTLCFailInfo(&b, failInfo); err != nil {
3✔
NEW
575
                return nil, err
×
NEW
576
        }
×
577
        failBytes := b.Bytes()
3✔
578

3✔
579
        return s.updateHtlcKey(hash, attemptID, htlcFailInfoKey, failBytes)
3✔
580
}
581

582
// updateHtlcKey updates a database key for the specified htlc.
583
func (s *KVStore) updateHtlcKey(paymentHash lntypes.Hash,
584
        attemptID uint64, key, value []byte) (*MPPayment, error) {
3✔
585

3✔
586
        aid := make([]byte, 8)
3✔
587
        binary.BigEndian.PutUint64(aid, attemptID)
3✔
588

3✔
589
        var payment *MPPayment
3✔
590
        err := kvdb.Batch(s.db, func(tx kvdb.RwTx) error {
6✔
591
                payment = nil
3✔
592

3✔
593
                prefetchPayment(tx, paymentHash)
3✔
594
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
595
                if err != nil {
3✔
NEW
596
                        return err
×
NEW
597
                }
×
598

599
                payment, err = fetchPayment(bucket)
3✔
600
                if err != nil {
3✔
NEW
601
                        return err
×
NEW
602
                }
×
603

604
                // We can only update keys of in-flight payments. We allow
605
                // updating keys even if the payment has reached a terminal
606
                // condition, since the HTLC outcomes must still be updated.
607
                if err := payment.Status.Updatable(); err != nil {
3✔
NEW
608
                        return err
×
NEW
609
                }
×
610

611
                htlcsBucket := bucket.NestedReadWriteBucket(paymentHtlcsBucket)
3✔
612
                if htlcsBucket == nil {
3✔
NEW
613
                        return fmt.Errorf("htlcs bucket not found")
×
NEW
614
                }
×
615

616
                attemptInfo := htlcsBucket.Get(
3✔
617
                        htlcBucketKey(htlcAttemptInfoKey, aid),
3✔
618
                )
3✔
619
                if attemptInfo == nil {
3✔
NEW
620
                        return fmt.Errorf("HTLC with ID %v not registered",
×
NEW
621
                                attemptID)
×
NEW
622
                }
×
623

624
                failInfo := htlcsBucket.Get(
3✔
625
                        htlcBucketKey(htlcFailInfoKey, aid),
3✔
626
                )
3✔
627
                if failInfo != nil {
3✔
NEW
628
                        return ErrAttemptAlreadyFailed
×
NEW
629
                }
×
630

631
                settleInfo := htlcsBucket.Get(
3✔
632
                        htlcBucketKey(htlcSettleInfoKey, aid),
3✔
633
                )
3✔
634
                if settleInfo != nil {
3✔
NEW
635
                        return ErrAttemptAlreadySettled
×
NEW
636
                }
×
637

638
                // Add or update the key for this htlc.
639
                err = htlcsBucket.Put(htlcBucketKey(key, aid), value)
3✔
640
                if err != nil {
3✔
NEW
641
                        return err
×
NEW
642
                }
×
643

644
                // Retrieve attempt info for the notification.
645
                payment, err = fetchPayment(bucket)
3✔
646

3✔
647
                return err
3✔
648
        })
649
        if err != nil {
3✔
NEW
650
                return nil, err
×
NEW
651
        }
×
652

653
        return payment, err
3✔
654
}
655

656
// Fail transitions a payment into the Failed state, and records the reason the
657
// payment failed. After invoking this method, InitPayment should return nil on
658
// its next call for this payment hash, allowing the switch to make a
659
// subsequent payment.
660
func (s *KVStore) Fail(paymentHash lntypes.Hash,
661
        reason FailureReason) (*MPPayment, error) {
3✔
662

3✔
663
        var (
3✔
664
                updateErr error
3✔
665
                payment   *MPPayment
3✔
666
        )
3✔
667
        err := kvdb.Batch(s.db, func(tx kvdb.RwTx) error {
6✔
668
                // Reset the update error, to avoid carrying over an error
3✔
669
                // from a previous execution of the batched db transaction.
3✔
670
                updateErr = nil
3✔
671
                payment = nil
3✔
672

3✔
673
                prefetchPayment(tx, paymentHash)
3✔
674
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
675
                if errors.Is(err, ErrPaymentNotInitiated) {
3✔
NEW
676
                        updateErr = ErrPaymentNotInitiated
×
NEW
677
                        return nil
×
678
                } else if err != nil {
3✔
NEW
679
                        return err
×
NEW
680
                }
×
681

682
                // We mark the payment as failed as long as it is known. This
683
                // lets the last attempt to fail with a terminal write its
684
                // failure to the KVPaymentsDB without synchronizing with
685
                // other attempts.
686
                _, err = fetchPaymentStatus(bucket)
3✔
687
                if errors.Is(err, ErrPaymentNotInitiated) {
3✔
NEW
688
                        updateErr = ErrPaymentNotInitiated
×
NEW
689
                        return nil
×
690
                } else if err != nil {
3✔
NEW
691
                        return err
×
NEW
692
                }
×
693

694
                // Put the failure reason in the bucket for record keeping.
695
                v := []byte{byte(reason)}
3✔
696
                err = bucket.Put(paymentFailInfoKey, v)
3✔
697
                if err != nil {
3✔
NEW
698
                        return err
×
NEW
699
                }
×
700

701
                // Retrieve attempt info for the notification, if available.
702
                payment, err = fetchPayment(bucket)
3✔
703
                if err != nil {
3✔
NEW
704
                        return err
×
NEW
705
                }
×
706

707
                return nil
3✔
708
        })
709
        if err != nil {
3✔
NEW
710
                return nil, err
×
NEW
711
        }
×
712

713
        return payment, updateErr
3✔
714
}
715

716
// FetchPayment returns information about a payment from the database.
717
func (s *KVStore) FetchPayment(paymentHash lntypes.Hash) (
718
        *MPPayment, error) {
3✔
719

3✔
720
        var payment *MPPayment
3✔
721
        err := kvdb.View(s.db, func(tx kvdb.RTx) error {
6✔
722
                prefetchPayment(tx, paymentHash)
3✔
723
                bucket, err := fetchPaymentBucket(tx, paymentHash)
3✔
724
                if err != nil {
3✔
NEW
725
                        return err
×
NEW
726
                }
×
727

728
                payment, err = fetchPayment(bucket)
3✔
729

3✔
730
                return err
3✔
731
        }, func() {
3✔
732
                payment = nil
3✔
733
        })
3✔
734
        if err != nil {
3✔
NEW
735
                return nil, err
×
NEW
736
        }
×
737

738
        return payment, nil
3✔
739
}
740

741
// prefetchPayment attempts to prefetch as much of the payment as possible to
742
// reduce DB roundtrips.
743
func prefetchPayment(tx kvdb.RTx, paymentHash lntypes.Hash) {
3✔
744
        rb := kvdb.RootBucket(tx)
3✔
745
        kvdb.Prefetch(
3✔
746
                rb,
3✔
747
                []string{
3✔
748
                        // Prefetch all keys in the payment's bucket.
3✔
749
                        string(paymentsRootBucket),
3✔
750
                        string(paymentHash[:]),
3✔
751
                },
3✔
752
                []string{
3✔
753
                        // Prefetch all keys in the payment's htlc bucket.
3✔
754
                        string(paymentsRootBucket),
3✔
755
                        string(paymentHash[:]),
3✔
756
                        string(paymentHtlcsBucket),
3✔
757
                },
3✔
758
        )
3✔
759
}
3✔
760

761
// createPaymentBucket creates or fetches the sub-bucket assigned to this
762
// payment hash.
763
func createPaymentBucket(tx kvdb.RwTx, paymentHash lntypes.Hash) (
764
        kvdb.RwBucket, error) {
3✔
765

3✔
766
        payments, err := tx.CreateTopLevelBucket(paymentsRootBucket)
3✔
767
        if err != nil {
3✔
NEW
768
                return nil, err
×
NEW
769
        }
×
770

771
        return payments.CreateBucketIfNotExists(paymentHash[:])
3✔
772
}
773

774
// fetchPaymentBucket fetches the sub-bucket assigned to this payment hash. If
775
// the bucket does not exist, it returns ErrPaymentNotInitiated.
776
func fetchPaymentBucket(tx kvdb.RTx, paymentHash lntypes.Hash) (
777
        kvdb.RBucket, error) {
3✔
778

3✔
779
        payments := tx.ReadBucket(paymentsRootBucket)
3✔
780
        if payments == nil {
3✔
NEW
781
                return nil, ErrPaymentNotInitiated
×
NEW
782
        }
×
783

784
        bucket := payments.NestedReadBucket(paymentHash[:])
3✔
785
        if bucket == nil {
3✔
NEW
786
                return nil, ErrPaymentNotInitiated
×
NEW
787
        }
×
788

789
        return bucket, nil
3✔
790
}
791

792
// fetchPaymentBucketUpdate is identical to fetchPaymentBucket, but it returns a
793
// bucket that can be written to.
794
func fetchPaymentBucketUpdate(tx kvdb.RwTx, paymentHash lntypes.Hash) (
795
        kvdb.RwBucket, error) {
3✔
796

3✔
797
        payments := tx.ReadWriteBucket(paymentsRootBucket)
3✔
798
        if payments == nil {
3✔
NEW
799
                return nil, ErrPaymentNotInitiated
×
NEW
800
        }
×
801

802
        bucket := payments.NestedReadWriteBucket(paymentHash[:])
3✔
803
        if bucket == nil {
3✔
NEW
804
                return nil, ErrPaymentNotInitiated
×
NEW
805
        }
×
806

807
        return bucket, nil
3✔
808
}
809

810
// fetchPaymentStatus fetches the payment status of the payment. If the payment
811
// isn't found, it will return error `ErrPaymentNotInitiated`.
812
func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) {
3✔
813
        // Creation info should be set for all payments, regardless of state.
3✔
814
        // If not, it is unknown.
3✔
815
        if bucket.Get(paymentCreationInfoKey) == nil {
6✔
816
                return 0, ErrPaymentNotInitiated
3✔
817
        }
3✔
818

819
        payment, err := fetchPayment(bucket)
3✔
820
        if err != nil {
3✔
NEW
821
                return 0, err
×
NEW
822
        }
×
823

824
        return payment.Status, nil
3✔
825
}
826

827
// FetchInFlightPayments returns all payments with status InFlight.
828
func (s *KVStore) FetchInFlightPayments() ([]*MPPayment, error) {
3✔
829
        var (
3✔
830
                inFlights      []*MPPayment
3✔
831
                start          = time.Now()
3✔
832
                lastLogTime    = time.Now()
3✔
833
                processedCount int
3✔
834
        )
3✔
835

3✔
836
        err := kvdb.View(s.db, func(tx kvdb.RTx) error {
6✔
837
                payments := tx.ReadBucket(paymentsRootBucket)
3✔
838
                if payments == nil {
3✔
NEW
839
                        return nil
×
NEW
840
                }
×
841

842
                return payments.ForEach(func(k, _ []byte) error {
6✔
843
                        bucket := payments.NestedReadBucket(k)
3✔
844
                        if bucket == nil {
3✔
NEW
845
                                return fmt.Errorf("non bucket element")
×
NEW
846
                        }
×
847

848
                        payment, err := fetchPayment(bucket)
3✔
849
                        if err != nil {
3✔
NEW
850
                                return err
×
NEW
851
                        }
×
852

853
                        processedCount++
3✔
854
                        if time.Since(lastLogTime) >=
3✔
855
                                paymentProgrespLogInterval {
3✔
856

×
NEW
857
                                log.Debugf("Scanning inflight payments "+
×
NEW
858
                                        "(in progresp), processed %d, last "+
×
NEW
859
                                        "processed payment: %v", processedCount,
×
NEW
860
                                        payment.Info)
×
861

×
NEW
862
                                lastLogTime = time.Now()
×
NEW
863
                        }
×
864

865
                        // Skip the payment if it's terminated.
866
                        if payment.Terminated() {
6✔
867
                                return nil
3✔
868
                        }
3✔
869

870
                        inFlights = append(inFlights, payment)
3✔
871

3✔
872
                        return nil
3✔
873
                })
874
        }, func() {
3✔
875
                inFlights = nil
3✔
876
        })
3✔
877
        if err != nil {
3✔
NEW
878
                return nil, err
×
NEW
879
        }
×
880

881
        elapsed := time.Since(start)
3✔
882
        log.Debugf("Completed scanning for inflight payments: "+
3✔
883
                "total_processed=%d, found_inflight=%d, elapsed=%v",
3✔
884
                processedCount, len(inFlights),
3✔
885
                elapsed.Round(time.Millisecond))
3✔
886

3✔
887
        return inFlights, nil
3✔
888
}
889

890
// htlcBucketKey creates a composite key from prefix and id where the result is
891
// simply the two concatenated.
892
func htlcBucketKey(prefix, id []byte) []byte {
3✔
893
        key := make([]byte, len(prefix)+len(id))
3✔
894
        copy(key, prefix)
3✔
895
        copy(key[len(prefix):], id)
3✔
896
        return key
3✔
897
}
3✔
898

899
// FetchPayments returns all sent payments found in the DB.
900
//
901
// TODO(ziggie): This is a duplicate can can be achievied with QueryPayments.
902
// nolint: dupl
NEW
903
func (s *KVStore) FetchPayments() ([]*MPPayment, error) {
×
904
        var payments []*MPPayment
×
905

×
NEW
906
        err := kvdb.View(s.db, func(tx kvdb.RTx) error {
×
907
                paymentsBucket := tx.ReadBucket(paymentsRootBucket)
×
908
                if paymentsBucket == nil {
×
909
                        return nil
×
910
                }
×
911

912
                return paymentsBucket.ForEach(func(k, v []byte) error {
×
913
                        bucket := paymentsBucket.NestedReadBucket(k)
×
914
                        if bucket == nil {
×
915
                                // We only expect sub-buckets to be found in
×
916
                                // this top-level bucket.
×
917
                                return fmt.Errorf("non bucket element in " +
×
918
                                        "payments bucket")
×
919
                        }
×
920

921
                        p, err := fetchPayment(bucket)
×
922
                        if err != nil {
×
923
                                return err
×
924
                        }
×
925

926
                        payments = append(payments, p)
×
927

×
928
                        // For older versions of lnd, duplicate payments to a
×
929
                        // payment has was possible. These will be found in a
×
930
                        // sub-bucket indexed by their sequence number if
×
931
                        // available.
×
932
                        duplicatePayments, err := fetchDuplicatePayments(bucket)
×
933
                        if err != nil {
×
934
                                return err
×
935
                        }
×
936

937
                        payments = append(payments, duplicatePayments...)
×
938
                        return nil
×
939
                })
940
        }, func() {
×
941
                payments = nil
×
942
        })
×
943
        if err != nil {
×
944
                return nil, err
×
945
        }
×
946

947
        // Before returning, sort the payments by their sequence number.
948
        sort.Slice(payments, func(i, j int) bool {
×
949
                return payments[i].SequenceNum < payments[j].SequenceNum
×
950
        })
×
951

952
        return payments, nil
×
953
}
954

955
func fetchCreationInfo(bucket kvdb.RBucket) (*PaymentCreationInfo,
956
        error) {
3✔
957

3✔
958
        b := bucket.Get(paymentCreationInfoKey)
3✔
959
        if b == nil {
3✔
960
                return nil, fmt.Errorf("creation info not found")
×
961
        }
×
962

963
        r := bytes.NewReader(b)
3✔
964
        return deserializePaymentCreationInfo(r)
3✔
965
}
966

967
func fetchPayment(bucket kvdb.RBucket) (*MPPayment, error) {
3✔
968
        seqBytes := bucket.Get(paymentSequenceKey)
3✔
969
        if seqBytes == nil {
3✔
970
                return nil, fmt.Errorf("sequence number not found")
×
971
        }
×
972

973
        sequenceNum := binary.BigEndian.Uint64(seqBytes)
3✔
974

3✔
975
        // Get the PaymentCreationInfo.
3✔
976
        creationInfo, err := fetchCreationInfo(bucket)
3✔
977
        if err != nil {
3✔
978
                return nil, err
×
979
        }
×
980

981
        var htlcs []HTLCAttempt
3✔
982
        htlcsBucket := bucket.NestedReadBucket(paymentHtlcsBucket)
3✔
983
        if htlcsBucket != nil {
6✔
984
                // Get the payment attempts. This can be empty.
3✔
985
                htlcs, err = fetchHtlcAttempts(htlcsBucket)
3✔
986
                if err != nil {
3✔
987
                        return nil, err
×
988
                }
×
989
        }
990

991
        // Get failure reason if available.
992
        var failureReason *FailureReason
3✔
993
        b := bucket.Get(paymentFailInfoKey)
3✔
994
        if b != nil {
6✔
995
                reason := FailureReason(b[0])
3✔
996
                failureReason = &reason
3✔
997
        }
3✔
998

999
        // Create a new payment.
1000
        payment := &MPPayment{
3✔
1001
                SequenceNum:   sequenceNum,
3✔
1002
                Info:          creationInfo,
3✔
1003
                HTLCs:         htlcs,
3✔
1004
                FailureReason: failureReason,
3✔
1005
        }
3✔
1006

3✔
1007
        // Set its state and status.
3✔
1008
        if err := payment.SetState(); err != nil {
3✔
1009
                return nil, err
×
1010
        }
×
1011

1012
        return payment, nil
3✔
1013
}
1014

1015
// fetchHtlcAttempts retrieves all htlc attempts made for the payment found in
1016
// the given bucket.
1017
func fetchHtlcAttempts(bucket kvdb.RBucket) ([]HTLCAttempt, error) {
3✔
1018
        htlcsMap := make(map[uint64]*HTLCAttempt)
3✔
1019

3✔
1020
        attemptInfoCount := 0
3✔
1021
        err := bucket.ForEach(func(k, v []byte) error {
6✔
1022
                aid := byteOrder.Uint64(k[len(k)-8:])
3✔
1023

3✔
1024
                if _, ok := htlcsMap[aid]; !ok {
6✔
1025
                        htlcsMap[aid] = &HTLCAttempt{}
3✔
1026
                }
3✔
1027

1028
                var err error
3✔
1029
                switch {
3✔
1030
                case bytes.HasPrefix(k, htlcAttemptInfoKey):
3✔
1031
                        attemptInfo, err := readHtlcAttemptInfo(v)
3✔
1032
                        if err != nil {
3✔
1033
                                return err
×
1034
                        }
×
1035

1036
                        attemptInfo.AttemptID = aid
3✔
1037
                        htlcsMap[aid].HTLCAttemptInfo = *attemptInfo
3✔
1038
                        attemptInfoCount++
3✔
1039

1040
                case bytes.HasPrefix(k, htlcSettleInfoKey):
3✔
1041
                        htlcsMap[aid].Settle, err = readHtlcSettleInfo(v)
3✔
1042
                        if err != nil {
3✔
1043
                                return err
×
1044
                        }
×
1045

1046
                case bytes.HasPrefix(k, htlcFailInfoKey):
3✔
1047
                        htlcsMap[aid].Failure, err = readHtlcFailInfo(v)
3✔
1048
                        if err != nil {
3✔
1049
                                return err
×
1050
                        }
×
1051

1052
                default:
×
1053
                        return fmt.Errorf("unknown htlc attempt key")
×
1054
                }
1055

1056
                return nil
3✔
1057
        })
1058
        if err != nil {
3✔
1059
                return nil, err
×
1060
        }
×
1061

1062
        // Sanity check that all htlcs have an attempt info.
1063
        if attemptInfoCount != len(htlcsMap) {
3✔
NEW
1064
                return nil, ErrNoAttemptInfo
×
1065
        }
×
1066

1067
        keys := make([]uint64, len(htlcsMap))
3✔
1068
        i := 0
3✔
1069
        for k := range htlcsMap {
6✔
1070
                keys[i] = k
3✔
1071
                i++
3✔
1072
        }
3✔
1073

1074
        // Sort HTLC attempts by their attempt ID. This is needed because in the
1075
        // DB we store the attempts with keys prefixed by their status which
1076
        // changes order (groups them together by status).
1077
        sort.Slice(keys, func(i, j int) bool {
6✔
1078
                return keys[i] < keys[j]
3✔
1079
        })
3✔
1080

1081
        htlcs := make([]HTLCAttempt, len(htlcsMap))
3✔
1082
        for i, key := range keys {
6✔
1083
                htlcs[i] = *htlcsMap[key]
3✔
1084
        }
3✔
1085

1086
        return htlcs, nil
3✔
1087
}
1088

1089
// readHtlcAttemptInfo reads the payment attempt info for this htlc.
1090
func readHtlcAttemptInfo(b []byte) (*HTLCAttemptInfo, error) {
3✔
1091
        r := bytes.NewReader(b)
3✔
1092
        return deserializeHTLCAttemptInfo(r)
3✔
1093
}
3✔
1094

1095
// readHtlcSettleInfo reads the settle info for the htlc. If the htlc isn't
1096
// settled, nil is returned.
1097
func readHtlcSettleInfo(b []byte) (*HTLCSettleInfo, error) {
3✔
1098
        r := bytes.NewReader(b)
3✔
1099
        return deserializeHTLCSettleInfo(r)
3✔
1100
}
3✔
1101

1102
// readHtlcFailInfo reads the failure info for the htlc. If the htlc hasn't
1103
// failed, nil is returned.
1104
func readHtlcFailInfo(b []byte) (*HTLCFailInfo, error) {
3✔
1105
        r := bytes.NewReader(b)
3✔
1106
        return deserializeHTLCFailInfo(r)
3✔
1107
}
3✔
1108

1109
// fetchFailedHtlcKeys retrieves the bucket keys of all failed HTLCs of a
1110
// payment bucket.
1111
func fetchFailedHtlcKeys(bucket kvdb.RBucket) ([][]byte, error) {
×
1112
        htlcsBucket := bucket.NestedReadBucket(paymentHtlcsBucket)
×
1113

×
1114
        var htlcs []HTLCAttempt
×
1115
        var err error
×
1116
        if htlcsBucket != nil {
×
1117
                htlcs, err = fetchHtlcAttempts(htlcsBucket)
×
1118
                if err != nil {
×
1119
                        return nil, err
×
1120
                }
×
1121
        }
1122

1123
        // Now iterate though them and save the bucket keys for the failed
1124
        // HTLCs.
1125
        var htlcKeys [][]byte
×
1126
        for _, h := range htlcs {
×
1127
                if h.Failure == nil {
×
1128
                        continue
×
1129
                }
1130

1131
                htlcKeyBytes := make([]byte, 8)
×
1132
                binary.BigEndian.PutUint64(htlcKeyBytes, h.AttemptID)
×
1133

×
1134
                htlcKeys = append(htlcKeys, htlcKeyBytes)
×
1135
        }
1136

1137
        return htlcKeys, nil
×
1138
}
1139

1140
// QueryPayments is a query to the payments database which is restricted
1141
// to a subset of payments by the payments query, containing an offset
1142
// index and a maximum number of returned payments.
1143
func (s *KVStore) QueryPayments(_ context.Context,
1144
        query Query) (Response, error) {
3✔
1145

3✔
1146
        var resp Response
3✔
1147

3✔
1148
        if err := kvdb.View(s.db, func(tx kvdb.RTx) error {
6✔
1149
                // Get the root payments bucket.
3✔
1150
                paymentsBucket := tx.ReadBucket(paymentsRootBucket)
3✔
1151
                if paymentsBucket == nil {
3✔
1152
                        return nil
×
1153
                }
×
1154

1155
                // Get the index bucket which maps sequence number -> payment
1156
                // hash and duplicate bool. If we have a payments bucket, we
1157
                // should have an indexes bucket as well.
1158
                indexes := tx.ReadBucket(paymentsIndexBucket)
3✔
1159
                if indexes == nil {
3✔
1160
                        return fmt.Errorf("index bucket does not exist")
×
1161
                }
×
1162

1163
                // accumulatePayments gets payments with the sequence number
1164
                // and hash provided and adds them to our list of payments if
1165
                // they meet the criteria of our query. It returns the number
1166
                // of payments that were added.
1167
                accumulatePayments := func(sequenceKey, hash []byte) (bool,
3✔
1168
                        error) {
6✔
1169

3✔
1170
                        r := bytes.NewReader(hash)
3✔
1171
                        paymentHash, err := deserializePaymentIndex(r)
3✔
1172
                        if err != nil {
3✔
1173
                                return false, err
×
1174
                        }
×
1175

1176
                        payment, err := fetchPaymentWithSequenceNumber(
3✔
1177
                                tx, paymentHash, sequenceKey,
3✔
1178
                        )
3✔
1179
                        if err != nil {
3✔
1180
                                return false, err
×
1181
                        }
×
1182

1183
                        // To keep compatibility with the old API, we only
1184
                        // return non-succeeded payments if requested.
1185
                        if payment.Status != StatusSucceeded &&
3✔
1186
                                !query.IncludeIncomplete {
3✔
1187

×
1188
                                return false, err
×
1189
                        }
×
1190

1191
                        // Get the creation time in Unix seconds, this always
1192
                        // rounds down the nanoseconds to full seconds.
1193
                        createTime := payment.Info.CreationTime.Unix()
3✔
1194

3✔
1195
                        // Skip any payments that were created before the
3✔
1196
                        // specified time.
3✔
1197
                        if createTime < query.CreationDateStart {
6✔
1198
                                return false, nil
3✔
1199
                        }
3✔
1200

1201
                        // Skip any payments that were created after the
1202
                        // specified time.
1203
                        if query.CreationDateEnd != 0 &&
3✔
1204
                                createTime > query.CreationDateEnd {
6✔
1205

3✔
1206
                                return false, nil
3✔
1207
                        }
3✔
1208

1209
                        // At this point, we've exhausted the offset, so we'll
1210
                        // begin collecting invoices found within the range.
1211
                        resp.Payments = append(resp.Payments, payment)
3✔
1212
                        return true, nil
3✔
1213
                }
1214

1215
                // Create a paginator which reads from our sequence index bucket
1216
                // with the parameters provided by the payments query.
1217
                paginator := channeldb.NewPaginator(
3✔
1218
                        indexes.ReadCursor(), query.Reversed, query.IndexOffset,
3✔
1219
                        query.MaxPayments,
3✔
1220
                )
3✔
1221

3✔
1222
                // Run a paginated query, adding payments to our response.
3✔
1223
                if err := paginator.Query(accumulatePayments); err != nil {
3✔
1224
                        return err
×
1225
                }
×
1226

1227
                // Counting the total number of payments is expensive, since we
1228
                // literally have to traverse the cursor linearly, which can
1229
                // take quite a while. So it's an optional query parameter.
1230
                if query.CountTotal {
3✔
1231
                        var (
×
1232
                                totalPayments uint64
×
1233
                                err           error
×
1234
                        )
×
1235
                        countFn := func(_, _ []byte) error {
×
1236
                                totalPayments++
×
1237

×
1238
                                return nil
×
1239
                        }
×
1240

1241
                        // In non-boltdb database backends, there's a faster
1242
                        // ForAll query that allows for batch fetching items.
1243
                        if fastBucket, ok := indexes.(kvdb.ExtendedRBucket); ok {
×
1244
                                err = fastBucket.ForAll(countFn)
×
1245
                        } else {
×
1246
                                err = indexes.ForEach(countFn)
×
1247
                        }
×
1248
                        if err != nil {
×
1249
                                return fmt.Errorf("error counting payments: %w",
×
1250
                                        err)
×
1251
                        }
×
1252

1253
                        resp.TotalCount = totalPayments
×
1254
                }
1255

1256
                return nil
3✔
1257
        }, func() {
3✔
1258
                resp = Response{}
3✔
1259
        }); err != nil {
3✔
1260
                return resp, err
×
1261
        }
×
1262

1263
        // Need to swap the payments slice order if reversed order.
1264
        if query.Reversed {
3✔
1265
                for l, r := 0, len(resp.Payments)-1; l < r; l, r = l+1, r-1 {
×
1266
                        resp.Payments[l], resp.Payments[r] =
×
1267
                                resp.Payments[r], resp.Payments[l]
×
1268
                }
×
1269
        }
1270

1271
        // Set the first and last index of the returned payments so that the
1272
        // caller can resume from this point later on.
1273
        if len(resp.Payments) > 0 {
6✔
1274
                resp.FirstIndexOffset = resp.Payments[0].SequenceNum
3✔
1275
                resp.LastIndexOffset =
3✔
1276
                        resp.Payments[len(resp.Payments)-1].SequenceNum
3✔
1277
        }
3✔
1278

1279
        return resp, nil
3✔
1280
}
1281

1282
// fetchPaymentWithSequenceNumber get the payment which matches the payment hash
1283
// *and* sequence number provided from the database. This is required because
1284
// we previously had more than one payment per hash, so we have multiple indexes
1285
// pointing to a single payment; we want to retrieve the correct one.
1286
func fetchPaymentWithSequenceNumber(tx kvdb.RTx, paymentHash lntypes.Hash,
1287
        sequenceNumber []byte) (*MPPayment, error) {
3✔
1288

3✔
1289
        // We can now lookup the payment keyed by its hash in
3✔
1290
        // the payments root bucket.
3✔
1291
        bucket, err := fetchPaymentBucket(tx, paymentHash)
3✔
1292
        if err != nil {
3✔
1293
                return nil, err
×
1294
        }
×
1295

1296
        // A single payment hash can have multiple payments associated with it.
1297
        // We lookup our sequence number first, to determine whether this is
1298
        // the payment we are actually looking for.
1299
        seqBytes := bucket.Get(paymentSequenceKey)
3✔
1300
        if seqBytes == nil {
3✔
1301
                return nil, ErrNoSequenceNumber
×
1302
        }
×
1303

1304
        // If this top level payment has the sequence number we are looking for,
1305
        // return it.
1306
        if bytes.Equal(seqBytes, sequenceNumber) {
6✔
1307
                return fetchPayment(bucket)
3✔
1308
        }
3✔
1309

1310
        // If we were not looking for the top level payment, we are looking for
1311
        // one of our duplicate payments. We need to iterate through the seq
1312
        // numbers in this bucket to find the correct payments. If we do not
1313
        // find a duplicate payments bucket here, something is wrong.
1314
        dup := bucket.NestedReadBucket(duplicatePaymentsBucket)
×
1315
        if dup == nil {
×
1316
                return nil, ErrNoDuplicateBucket
×
1317
        }
×
1318

1319
        var duplicatePayment *MPPayment
×
1320
        err = dup.ForEach(func(k, v []byte) error {
×
1321
                subBucket := dup.NestedReadBucket(k)
×
1322
                if subBucket == nil {
×
1323
                        // We one bucket for each duplicate to be found.
×
1324
                        return ErrNoDuplicateNestedBucket
×
1325
                }
×
1326

1327
                seqBytes := subBucket.Get(duplicatePaymentSequenceKey)
×
1328
                if seqBytes == nil {
×
1329
                        return err
×
1330
                }
×
1331

1332
                // If this duplicate payment is not the sequence number we are
1333
                // looking for, we can continue.
1334
                if !bytes.Equal(seqBytes, sequenceNumber) {
×
1335
                        return nil
×
1336
                }
×
1337

1338
                duplicatePayment, err = fetchDuplicatePayment(subBucket)
×
1339
                if err != nil {
×
1340
                        return err
×
1341
                }
×
1342

1343
                return nil
×
1344
        })
1345
        if err != nil {
×
1346
                return nil, err
×
1347
        }
×
1348

1349
        // If none of the duplicate payments matched our sequence number, we
1350
        // failed to find the payment with this sequence number; something is
1351
        // wrong.
1352
        if duplicatePayment == nil {
×
1353
                return nil, ErrDuplicateNotFound
×
1354
        }
×
1355

1356
        return duplicatePayment, nil
×
1357
}
1358

1359
// DeletePayment deletes a payment from the DB given its payment hash. If
1360
// failedAttemptsOnly is set, only failed HTLC attempts of the payment will be
1361
// deleted.
1362
func (s *KVStore) DeletePayment(paymentHash lntypes.Hash,
NEW
1363
        failedAttemptsOnly bool) error {
×
1364

×
NEW
1365
        return kvdb.Update(s.db, func(tx kvdb.RwTx) error {
×
1366
                payments := tx.ReadWriteBucket(paymentsRootBucket)
×
1367
                if payments == nil {
×
1368
                        return nil
×
1369
                }
×
1370

1371
                bucket := payments.NestedReadWriteBucket(paymentHash[:])
×
1372
                if bucket == nil {
×
1373
                        return fmt.Errorf("non bucket element in payments " +
×
1374
                                "bucket")
×
1375
                }
×
1376

1377
                // If the status is InFlight, we cannot safely delete
1378
                // the payment information, so we return early.
1379
                paymentStatus, err := fetchPaymentStatus(bucket)
×
1380
                if err != nil {
×
1381
                        return err
×
1382
                }
×
1383

1384
                // If the payment has inflight HTLCs, we cannot safely delete
1385
                // the payment information, so we return an error.
NEW
1386
                if err := paymentStatus.Removable(); err != nil {
×
1387
                        return fmt.Errorf("payment '%v' has inflight HTLCs"+
×
1388
                                "and therefore cannot be deleted: %w",
×
1389
                                paymentHash.String(), err)
×
1390
                }
×
1391

1392
                // Delete the failed HTLC attempts we found.
NEW
1393
                if failedAttemptsOnly {
×
1394
                        toDelete, err := fetchFailedHtlcKeys(bucket)
×
1395
                        if err != nil {
×
1396
                                return err
×
1397
                        }
×
1398

1399
                        htlcsBucket := bucket.NestedReadWriteBucket(
×
1400
                                paymentHtlcsBucket,
×
1401
                        )
×
1402

×
1403
                        for _, htlcID := range toDelete {
×
1404
                                err = htlcsBucket.Delete(
×
1405
                                        htlcBucketKey(htlcAttemptInfoKey, htlcID),
×
1406
                                )
×
1407
                                if err != nil {
×
1408
                                        return err
×
1409
                                }
×
1410

1411
                                err = htlcsBucket.Delete(
×
1412
                                        htlcBucketKey(htlcFailInfoKey, htlcID),
×
1413
                                )
×
1414
                                if err != nil {
×
1415
                                        return err
×
1416
                                }
×
1417

1418
                                err = htlcsBucket.Delete(
×
1419
                                        htlcBucketKey(htlcSettleInfoKey, htlcID),
×
1420
                                )
×
1421
                                if err != nil {
×
1422
                                        return err
×
1423
                                }
×
1424
                        }
1425

1426
                        return nil
×
1427
                }
1428

1429
                seqNrs, err := fetchSequenceNumbers(bucket)
×
1430
                if err != nil {
×
1431
                        return err
×
1432
                }
×
1433

1434
                if err := payments.DeleteNestedBucket(paymentHash[:]); err != nil {
×
1435
                        return err
×
1436
                }
×
1437

1438
                indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
×
1439
                for _, k := range seqNrs {
×
1440
                        if err := indexBucket.Delete(k); err != nil {
×
1441
                                return err
×
1442
                        }
×
1443
                }
1444

1445
                return nil
×
1446
        }, func() {})
×
1447
}
1448

1449
// DeletePayments deletes all completed and failed payments from the DB. If
1450
// failedOnly is set, only failed payments will be considered for deletion. If
1451
// failedHtlcsOnly is set, the payment itself won't be deleted, only failed HTLC
1452
// attempts. The method returns the number of deleted payments, which is always
1453
// 0 if failedHtlcsOnly is set.
1454
func (s *KVStore) DeletePayments(failedOnly, failedAttemptsOnly bool) (int,
1455
        error) {
3✔
1456

3✔
1457
        var numPayments int
3✔
1458
        err := kvdb.Update(s.db, func(tx kvdb.RwTx) error {
6✔
1459
                payments := tx.ReadWriteBucket(paymentsRootBucket)
3✔
1460
                if payments == nil {
3✔
1461
                        return nil
×
1462
                }
×
1463

1464
                var (
3✔
1465
                        // deleteBuckets is the set of payment buckets we need
3✔
1466
                        // to delete.
3✔
1467
                        deleteBuckets [][]byte
3✔
1468

3✔
1469
                        // deleteIndexes is the set of indexes pointing to these
3✔
1470
                        // payments that need to be deleted.
3✔
1471
                        deleteIndexes [][]byte
3✔
1472

3✔
1473
                        // deleteHtlcs maps a payment hash to the HTLC IDs we
3✔
1474
                        // want to delete for that payment.
3✔
1475
                        deleteHtlcs = make(map[lntypes.Hash][][]byte)
3✔
1476
                )
3✔
1477
                err := payments.ForEach(func(k, _ []byte) error {
6✔
1478
                        bucket := payments.NestedReadBucket(k)
3✔
1479
                        if bucket == nil {
3✔
1480
                                // We only expect sub-buckets to be found in
×
1481
                                // this top-level bucket.
×
1482
                                return fmt.Errorf("non bucket element in " +
×
1483
                                        "payments bucket")
×
1484
                        }
×
1485

1486
                        // If the status is InFlight, we cannot safely delete
1487
                        // the payment information, so we return early.
1488
                        paymentStatus, err := fetchPaymentStatus(bucket)
3✔
1489
                        if err != nil {
3✔
1490
                                return err
×
1491
                        }
×
1492

1493
                        // If the payment has inflight HTLCs, we cannot safely
1494
                        // delete the payment information, so we return an nil
1495
                        // to skip it.
1496
                        if err := paymentStatus.Removable(); err != nil {
3✔
1497
                                return nil
×
1498
                        }
×
1499

1500
                        // If we requested to only delete failed payments, we
1501
                        // can return if this one is not.
1502
                        if failedOnly && paymentStatus != StatusFailed {
3✔
1503
                                return nil
×
1504
                        }
×
1505

1506
                        // If we are only deleting failed HTLCs, fetch them.
1507
                        if failedAttemptsOnly {
3✔
1508
                                toDelete, err := fetchFailedHtlcKeys(bucket)
×
1509
                                if err != nil {
×
1510
                                        return err
×
1511
                                }
×
1512

1513
                                hash, err := lntypes.MakeHash(k)
×
1514
                                if err != nil {
×
1515
                                        return err
×
1516
                                }
×
1517

1518
                                deleteHtlcs[hash] = toDelete
×
1519

×
1520
                                // We return, we are only deleting attempts.
×
1521
                                return nil
×
1522
                        }
1523

1524
                        // Add the bucket to the set of buckets we can delete.
1525
                        deleteBuckets = append(deleteBuckets, k)
3✔
1526

3✔
1527
                        // Get all the sequence number associated with the
3✔
1528
                        // payment, including duplicates.
3✔
1529
                        seqNrs, err := fetchSequenceNumbers(bucket)
3✔
1530
                        if err != nil {
3✔
1531
                                return err
×
1532
                        }
×
1533

1534
                        deleteIndexes = append(deleteIndexes, seqNrs...)
3✔
1535
                        numPayments++
3✔
1536
                        return nil
3✔
1537
                })
1538
                if err != nil {
3✔
1539
                        return err
×
1540
                }
×
1541

1542
                // Delete the failed HTLC attempts we found.
1543
                for hash, htlcIDs := range deleteHtlcs {
3✔
1544
                        bucket := payments.NestedReadWriteBucket(hash[:])
×
1545
                        htlcsBucket := bucket.NestedReadWriteBucket(
×
1546
                                paymentHtlcsBucket,
×
1547
                        )
×
1548

×
1549
                        for _, aid := range htlcIDs {
×
1550
                                if err := htlcsBucket.Delete(
×
1551
                                        htlcBucketKey(htlcAttemptInfoKey, aid),
×
1552
                                ); err != nil {
×
1553
                                        return err
×
1554
                                }
×
1555

1556
                                if err := htlcsBucket.Delete(
×
1557
                                        htlcBucketKey(htlcFailInfoKey, aid),
×
1558
                                ); err != nil {
×
1559
                                        return err
×
1560
                                }
×
1561

1562
                                if err := htlcsBucket.Delete(
×
1563
                                        htlcBucketKey(htlcSettleInfoKey, aid),
×
1564
                                ); err != nil {
×
1565
                                        return err
×
1566
                                }
×
1567
                        }
1568
                }
1569

1570
                for _, k := range deleteBuckets {
6✔
1571
                        if err := payments.DeleteNestedBucket(k); err != nil {
3✔
1572
                                return err
×
1573
                        }
×
1574
                }
1575

1576
                // Get our index bucket and delete all indexes pointing to the
1577
                // payments we are deleting.
1578
                indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
1579
                for _, k := range deleteIndexes {
6✔
1580
                        if err := indexBucket.Delete(k); err != nil {
3✔
1581
                                return err
×
1582
                        }
×
1583
                }
1584

1585
                return nil
3✔
1586
        }, func() {
3✔
1587
                numPayments = 0
3✔
1588
        })
3✔
1589
        if err != nil {
3✔
1590
                return 0, err
×
1591
        }
×
1592

1593
        return numPayments, nil
3✔
1594
}
1595

1596
// fetchSequenceNumbers fetches all the sequence numbers associated with a
1597
// payment, including those belonging to any duplicate payments.
1598
func fetchSequenceNumbers(paymentBucket kvdb.RBucket) ([][]byte, error) {
3✔
1599
        seqNum := paymentBucket.Get(paymentSequenceKey)
3✔
1600
        if seqNum == nil {
3✔
1601
                return nil, errors.New("expected sequence number")
×
1602
        }
×
1603

1604
        sequenceNumbers := [][]byte{seqNum}
3✔
1605

3✔
1606
        // Get the duplicate payments bucket, if it has no duplicates, just
3✔
1607
        // return early with the payment sequence number.
3✔
1608
        duplicates := paymentBucket.NestedReadBucket(duplicatePaymentsBucket)
3✔
1609
        if duplicates == nil {
6✔
1610
                return sequenceNumbers, nil
3✔
1611
        }
3✔
1612

1613
        // If we do have duplicated, they are keyed by sequence number, so we
1614
        // iterate through the duplicates bucket and add them to our set of
1615
        // sequence numbers.
1616
        if err := duplicates.ForEach(func(k, v []byte) error {
×
1617
                sequenceNumbers = append(sequenceNumbers, k)
×
1618
                return nil
×
1619
        }); err != nil {
×
1620
                return nil, err
×
1621
        }
×
1622

1623
        return sequenceNumbers, nil
×
1624
}
1625

1626
// nolint: dupl
1627
func serializePaymentCreationInfo(w io.Writer,
1628
        c *PaymentCreationInfo) error {
3✔
1629

3✔
1630
        var scratch [8]byte
3✔
1631

3✔
1632
        if _, err := w.Write(c.PaymentIdentifier[:]); err != nil {
3✔
1633
                return err
×
1634
        }
×
1635

1636
        byteOrder.PutUint64(scratch[:], uint64(c.Value))
3✔
1637
        if _, err := w.Write(scratch[:]); err != nil {
3✔
1638
                return err
×
1639
        }
×
1640

1641
        if err := serializeTime(w, c.CreationTime); err != nil {
3✔
1642
                return err
×
1643
        }
×
1644

1645
        byteOrder.PutUint32(scratch[:4], uint32(len(c.PaymentRequest)))
3✔
1646
        if _, err := w.Write(scratch[:4]); err != nil {
3✔
1647
                return err
×
1648
        }
×
1649

1650
        if _, err := w.Write(c.PaymentRequest[:]); err != nil {
3✔
1651
                return err
×
1652
        }
×
1653

1654
        // Any remaining bytes are TLV encoded records. Currently, these are
1655
        // only the custom records provided by the user to be sent to the first
1656
        // hos. But this can easily be extended with further records by merging
1657
        // the records into a single TLV stream.
1658
        err := c.FirstHopCustomRecords.SerializeTo(w)
3✔
1659
        if err != nil {
3✔
1660
                return err
×
1661
        }
×
1662

1663
        return nil
3✔
1664
}
1665

1666
func deserializePaymentCreationInfo(r io.Reader) (*PaymentCreationInfo,
1667
        error) {
3✔
1668

3✔
1669
        var scratch [8]byte
3✔
1670

3✔
1671
        c := &PaymentCreationInfo{}
3✔
1672

3✔
1673
        if _, err := io.ReadFull(r, c.PaymentIdentifier[:]); err != nil {
3✔
1674
                return nil, err
×
1675
        }
×
1676

1677
        if _, err := io.ReadFull(r, scratch[:]); err != nil {
3✔
1678
                return nil, err
×
1679
        }
×
1680
        c.Value = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:]))
3✔
1681

3✔
1682
        creationTime, err := deserializeTime(r)
3✔
1683
        if err != nil {
3✔
1684
                return nil, err
×
1685
        }
×
1686
        c.CreationTime = creationTime
3✔
1687

3✔
1688
        if _, err := io.ReadFull(r, scratch[:4]); err != nil {
3✔
1689
                return nil, err
×
1690
        }
×
1691

1692
        reqLen := uint32(byteOrder.Uint32(scratch[:4]))
3✔
1693
        payReq := make([]byte, reqLen)
3✔
1694
        if reqLen > 0 {
6✔
1695
                if _, err := io.ReadFull(r, payReq); err != nil {
3✔
1696
                        return nil, err
×
1697
                }
×
1698
        }
1699
        c.PaymentRequest = payReq
3✔
1700

3✔
1701
        // Any remaining bytes are TLV encoded records. Currently, these are
3✔
1702
        // only the custom records provided by the user to be sent to the first
3✔
1703
        // hos. But this can easily be extended with further records by merging
3✔
1704
        // the records into a single TLV stream.
3✔
1705
        c.FirstHopCustomRecords, err = lnwire.ParseCustomRecordsFrom(r)
3✔
1706
        if err != nil {
3✔
1707
                return nil, err
×
1708
        }
×
1709

1710
        return c, nil
3✔
1711
}
1712

1713
func serializeHTLCAttemptInfo(w io.Writer, a *HTLCAttemptInfo) error {
3✔
1714
        // We nned to make sure the session key is 32 bytes, so we copy the
3✔
1715
        // session key into a 32 byte array.
3✔
1716
        sessionKeySlice := a.SessionKey().Serialize()
3✔
1717
        var sessionKey [btcec.PrivKeyBytesLen]byte
3✔
1718
        copy(sessionKey[:], sessionKeySlice)
3✔
1719
        if err := WriteElements(w, sessionKey); err != nil {
3✔
1720
                return err
×
1721
        }
×
1722

1723
        if err := SerializeRoute(w, a.Route); err != nil {
3✔
1724
                return err
×
1725
        }
×
1726

1727
        if err := serializeTime(w, a.AttemptTime); err != nil {
3✔
1728
                return err
×
1729
        }
×
1730

1731
        // If the hash is nil we can just return.
1732
        if a.Hash == nil {
3✔
1733
                return nil
×
1734
        }
×
1735

1736
        if _, err := w.Write(a.Hash[:]); err != nil {
3✔
1737
                return err
×
1738
        }
×
1739

1740
        // Merge the fixed/known records together with the custom records to
1741
        // serialize them as a single blob. We can't do this in SerializeRoute
1742
        // because we're in the middle of the byte stream there. We can only do
1743
        // TLV serialization at the end of the stream, since EOF is allowed for
1744
        // a stream if no more data is expected.
1745
        producers := []tlv.RecordProducer{
3✔
1746
                &a.Route.FirstHopAmount,
3✔
1747
        }
3✔
1748
        tlvData, err := lnwire.MergeAndEncode(
3✔
1749
                producers, nil, a.Route.FirstHopWireCustomRecords,
3✔
1750
        )
3✔
1751
        if err != nil {
3✔
1752
                return err
×
1753
        }
×
1754

1755
        if _, err := w.Write(tlvData); err != nil {
3✔
1756
                return err
×
1757
        }
×
1758

1759
        return nil
3✔
1760
}
1761

1762
func deserializeHTLCAttemptInfo(r io.Reader) (*HTLCAttemptInfo, error) {
3✔
1763
        a := &HTLCAttemptInfo{}
3✔
1764
        var sessionKey [btcec.PrivKeyBytesLen]byte
3✔
1765
        err := ReadElements(r, &sessionKey)
3✔
1766
        if err != nil {
3✔
1767
                return nil, err
×
1768
        }
×
1769
        a.SetSessionKey(sessionKey)
3✔
1770

3✔
1771
        a.Route, err = DeserializeRoute(r)
3✔
1772
        if err != nil {
3✔
1773
                return nil, err
×
1774
        }
×
1775

1776
        a.AttemptTime, err = deserializeTime(r)
3✔
1777
        if err != nil {
3✔
1778
                return nil, err
×
1779
        }
×
1780

1781
        hash := lntypes.Hash{}
3✔
1782
        _, err = io.ReadFull(r, hash[:])
3✔
1783

3✔
1784
        switch {
3✔
1785
        // Older payment attempts wouldn't have the hash set, in which case we
1786
        // can just return.
1787
        case err == io.EOF, err == io.ErrUnexpectedEOF:
×
1788
                return a, nil
×
1789

1790
        case err != nil:
×
1791
                return nil, err
×
1792

1793
        default:
3✔
1794
        }
1795

1796
        a.Hash = &hash
3✔
1797

3✔
1798
        // Read any remaining data (if any) and parse it into the known records
3✔
1799
        // and custom records.
3✔
1800
        extraData, err := io.ReadAll(r)
3✔
1801
        if err != nil {
3✔
1802
                return nil, err
×
1803
        }
×
1804

1805
        customRecords, _, _, err := lnwire.ParseAndExtractCustomRecords(
3✔
1806
                extraData, &a.Route.FirstHopAmount,
3✔
1807
        )
3✔
1808
        if err != nil {
3✔
1809
                return nil, err
×
1810
        }
×
1811

1812
        a.Route.FirstHopWireCustomRecords = customRecords
3✔
1813

3✔
1814
        return a, nil
3✔
1815
}
1816

1817
func serializeHop(w io.Writer, h *route.Hop) error {
3✔
1818
        if err := WriteElements(w,
3✔
1819
                h.PubKeyBytes[:],
3✔
1820
                h.ChannelID,
3✔
1821
                h.OutgoingTimeLock,
3✔
1822
                h.AmtToForward,
3✔
1823
        ); err != nil {
3✔
1824
                return err
×
1825
        }
×
1826

1827
        if err := binary.Write(w, byteOrder, h.LegacyPayload); err != nil {
3✔
1828
                return err
×
1829
        }
×
1830

1831
        // For legacy payloads, we don't need to write any TLV records, so
1832
        // we'll write a zero indicating the our serialized TLV map has no
1833
        // records.
1834
        if h.LegacyPayload {
3✔
1835
                return WriteElements(w, uint32(0))
×
1836
        }
×
1837

1838
        // Gather all non-primitive TLV records so that they can be serialized
1839
        // as a single blob.
1840
        //
1841
        // TODO(conner): add migration to unify all fields in a single TLV
1842
        // blobs. The split approach will cause headaches down the road as more
1843
        // fields are added, which we can avoid by having a single TLV stream
1844
        // for all payload fields.
1845
        var records []tlv.Record
3✔
1846
        if h.MPP != nil {
6✔
1847
                records = append(records, h.MPP.Record())
3✔
1848
        }
3✔
1849

1850
        // Add blinding point and encrypted data if present.
1851
        if h.EncryptedData != nil {
6✔
1852
                records = append(records, record.NewEncryptedDataRecord(
3✔
1853
                        &h.EncryptedData,
3✔
1854
                ))
3✔
1855
        }
3✔
1856

1857
        if h.BlindingPoint != nil {
6✔
1858
                records = append(records, record.NewBlindingPointRecord(
3✔
1859
                        &h.BlindingPoint,
3✔
1860
                ))
3✔
1861
        }
3✔
1862

1863
        if h.AMP != nil {
6✔
1864
                records = append(records, h.AMP.Record())
3✔
1865
        }
3✔
1866

1867
        if h.Metadata != nil {
3✔
1868
                records = append(records, record.NewMetadataRecord(&h.Metadata))
×
1869
        }
×
1870

1871
        if h.TotalAmtMsat != 0 {
6✔
1872
                totalMsatInt := uint64(h.TotalAmtMsat)
3✔
1873
                records = append(
3✔
1874
                        records, record.NewTotalAmtMsatBlinded(&totalMsatInt),
3✔
1875
                )
3✔
1876
        }
3✔
1877

1878
        // Final sanity check to absolutely rule out custom records that are not
1879
        // custom and write into the standard range.
1880
        if err := h.CustomRecords.Validate(); err != nil {
3✔
1881
                return err
×
1882
        }
×
1883

1884
        // Convert custom records to tlv and add to the record list.
1885
        // MapToRecords sorts the list, so adding it here will keep the list
1886
        // canonical.
1887
        tlvRecords := tlv.MapToRecords(h.CustomRecords)
3✔
1888
        records = append(records, tlvRecords...)
3✔
1889

3✔
1890
        // Otherwise, we'll transform our slice of records into a map of the
3✔
1891
        // raw bytes, then serialize them in-line with a length (number of
3✔
1892
        // elements) prefix.
3✔
1893
        mapRecords, err := tlv.RecordsToMap(records)
3✔
1894
        if err != nil {
3✔
1895
                return err
×
1896
        }
×
1897

1898
        numRecords := uint32(len(mapRecords))
3✔
1899
        if err := WriteElements(w, numRecords); err != nil {
3✔
1900
                return err
×
1901
        }
×
1902

1903
        for recordType, rawBytes := range mapRecords {
6✔
1904
                if err := WriteElements(w, recordType); err != nil {
3✔
1905
                        return err
×
1906
                }
×
1907

1908
                if err := wire.WriteVarBytes(w, 0, rawBytes); err != nil {
3✔
1909
                        return err
×
1910
                }
×
1911
        }
1912

1913
        return nil
3✔
1914
}
1915

1916
// maxOnionPayloadSize is the largest Sphinx payload possible, so we don't need
1917
// to read/write a TLV stream larger than this.
1918
const maxOnionPayloadSize = 1300
1919

1920
func deserializeHop(r io.Reader) (*route.Hop, error) {
3✔
1921
        h := &route.Hop{}
3✔
1922

3✔
1923
        var pub []byte
3✔
1924
        if err := ReadElements(r, &pub); err != nil {
3✔
1925
                return nil, err
×
1926
        }
×
1927
        copy(h.PubKeyBytes[:], pub)
3✔
1928

3✔
1929
        if err := ReadElements(r,
3✔
1930
                &h.ChannelID, &h.OutgoingTimeLock, &h.AmtToForward,
3✔
1931
        ); err != nil {
3✔
1932
                return nil, err
×
1933
        }
×
1934

1935
        // TODO(roasbeef): change field to allow LegacyPayload false to be the
1936
        // legacy default?
1937
        err := binary.Read(r, byteOrder, &h.LegacyPayload)
3✔
1938
        if err != nil {
3✔
1939
                return nil, err
×
1940
        }
×
1941

1942
        var numElements uint32
3✔
1943
        if err := ReadElements(r, &numElements); err != nil {
3✔
1944
                return nil, err
×
1945
        }
×
1946

1947
        // If there're no elements, then we can return early.
1948
        if numElements == 0 {
6✔
1949
                return h, nil
3✔
1950
        }
3✔
1951

1952
        tlvMap := make(map[uint64][]byte)
3✔
1953
        for i := uint32(0); i < numElements; i++ {
6✔
1954
                var tlvType uint64
3✔
1955
                if err := ReadElements(r, &tlvType); err != nil {
3✔
1956
                        return nil, err
×
1957
                }
×
1958

1959
                rawRecordBytes, err := wire.ReadVarBytes(
3✔
1960
                        r, 0, maxOnionPayloadSize, "tlv",
3✔
1961
                )
3✔
1962
                if err != nil {
3✔
1963
                        return nil, err
×
1964
                }
×
1965

1966
                tlvMap[tlvType] = rawRecordBytes
3✔
1967
        }
1968

1969
        // If the MPP type is present, remove it from the generic TLV map and
1970
        // parse it back into a proper MPP struct.
1971
        //
1972
        // TODO(conner): add migration to unify all fields in a single TLV
1973
        // blobs. The split approach will cause headaches down the road as more
1974
        // fields are added, which we can avoid by having a single TLV stream
1975
        // for all payload fields.
1976
        mppType := uint64(record.MPPOnionType)
3✔
1977
        if mppBytes, ok := tlvMap[mppType]; ok {
6✔
1978
                delete(tlvMap, mppType)
3✔
1979

3✔
1980
                var (
3✔
1981
                        mpp    = &record.MPP{}
3✔
1982
                        mppRec = mpp.Record()
3✔
1983
                        r      = bytes.NewReader(mppBytes)
3✔
1984
                )
3✔
1985
                err := mppRec.Decode(r, uint64(len(mppBytes)))
3✔
1986
                if err != nil {
3✔
1987
                        return nil, err
×
1988
                }
×
1989
                h.MPP = mpp
3✔
1990
        }
1991

1992
        // If encrypted data or blinding key are present, remove them from
1993
        // the TLV map and parse into proper types.
1994
        encryptedDataType := uint64(record.EncryptedDataOnionType)
3✔
1995
        if data, ok := tlvMap[encryptedDataType]; ok {
6✔
1996
                delete(tlvMap, encryptedDataType)
3✔
1997
                h.EncryptedData = data
3✔
1998
        }
3✔
1999

2000
        blindingType := uint64(record.BlindingPointOnionType)
3✔
2001
        if blindingPoint, ok := tlvMap[blindingType]; ok {
6✔
2002
                delete(tlvMap, blindingType)
3✔
2003

3✔
2004
                h.BlindingPoint, err = btcec.ParsePubKey(blindingPoint)
3✔
2005
                if err != nil {
3✔
2006
                        return nil, fmt.Errorf("invalid blinding point: %w",
×
2007
                                err)
×
2008
                }
×
2009
        }
2010

2011
        ampType := uint64(record.AMPOnionType)
3✔
2012
        if ampBytes, ok := tlvMap[ampType]; ok {
6✔
2013
                delete(tlvMap, ampType)
3✔
2014

3✔
2015
                var (
3✔
2016
                        amp    = &record.AMP{}
3✔
2017
                        ampRec = amp.Record()
3✔
2018
                        r      = bytes.NewReader(ampBytes)
3✔
2019
                )
3✔
2020
                err := ampRec.Decode(r, uint64(len(ampBytes)))
3✔
2021
                if err != nil {
3✔
2022
                        return nil, err
×
2023
                }
×
2024
                h.AMP = amp
3✔
2025
        }
2026

2027
        // If the metadata type is present, remove it from the tlv map and
2028
        // populate directly on the hos.
2029
        metadataType := uint64(record.MetadataOnionType)
3✔
2030
        if metadata, ok := tlvMap[metadataType]; ok {
3✔
2031
                delete(tlvMap, metadataType)
×
2032

×
2033
                h.Metadata = metadata
×
2034
        }
×
2035

2036
        totalAmtMsatType := uint64(record.TotalAmtMsatBlindedType)
3✔
2037
        if totalAmtMsat, ok := tlvMap[totalAmtMsatType]; ok {
6✔
2038
                delete(tlvMap, totalAmtMsatType)
3✔
2039

3✔
2040
                var (
3✔
2041
                        totalAmtMsatInt uint64
3✔
2042
                        buf             [8]byte
3✔
2043
                )
3✔
2044
                if err := tlv.DTUint64(
3✔
2045
                        bytes.NewReader(totalAmtMsat),
3✔
2046
                        &totalAmtMsatInt,
3✔
2047
                        &buf,
3✔
2048
                        uint64(len(totalAmtMsat)),
3✔
2049
                ); err != nil {
3✔
2050
                        return nil, err
×
2051
                }
×
2052

2053
                h.TotalAmtMsat = lnwire.MilliSatoshi(totalAmtMsatInt)
3✔
2054
        }
2055

2056
        h.CustomRecords = tlvMap
3✔
2057

3✔
2058
        return h, nil
3✔
2059
}
2060

2061
// SerializeRoute serializes a route.
2062
func SerializeRoute(w io.Writer, r route.Route) error {
3✔
2063
        if err := WriteElements(w,
3✔
2064
                r.TotalTimeLock, r.TotalAmount, r.SourcePubKey[:],
3✔
2065
        ); err != nil {
3✔
2066
                return err
×
2067
        }
×
2068

2069
        if err := WriteElements(w, uint32(len(r.Hops))); err != nil {
3✔
2070
                return err
×
2071
        }
×
2072

2073
        for _, h := range r.Hops {
6✔
2074
                if err := serializeHop(w, h); err != nil {
3✔
2075
                        return err
×
2076
                }
×
2077
        }
2078

2079
        // Any new/extra TLV data is encoded in serializeHTLCAttemptInfo!
2080

2081
        return nil
3✔
2082
}
2083

2084
// DeserializeRoute deserializes a route.
2085
func DeserializeRoute(r io.Reader) (route.Route, error) {
3✔
2086
        rt := route.Route{}
3✔
2087
        if err := ReadElements(r,
3✔
2088
                &rt.TotalTimeLock, &rt.TotalAmount,
3✔
2089
        ); err != nil {
3✔
2090
                return rt, err
×
2091
        }
×
2092

2093
        var pub []byte
3✔
2094
        if err := ReadElements(r, &pub); err != nil {
3✔
2095
                return rt, err
×
2096
        }
×
2097
        copy(rt.SourcePubKey[:], pub)
3✔
2098

3✔
2099
        var numHops uint32
3✔
2100
        if err := ReadElements(r, &numHops); err != nil {
3✔
2101
                return rt, err
×
2102
        }
×
2103

2104
        var hops []*route.Hop
3✔
2105
        for i := uint32(0); i < numHops; i++ {
6✔
2106
                hop, err := deserializeHop(r)
3✔
2107
                if err != nil {
3✔
2108
                        return rt, err
×
2109
                }
×
2110
                hops = append(hops, hop)
3✔
2111
        }
2112
        rt.Hops = hops
3✔
2113

3✔
2114
        // Any new/extra TLV data is decoded in deserializeHTLCAttemptInfo!
3✔
2115

3✔
2116
        return rt, nil
3✔
2117
}
2118

2119
// serializeHTLCSettleInfo serializes the details of a settled htlc.
2120
func serializeHTLCSettleInfo(w io.Writer, s *HTLCSettleInfo) error {
3✔
2121
        if _, err := w.Write(s.Preimage[:]); err != nil {
3✔
NEW
2122
                return err
×
NEW
2123
        }
×
2124

2125
        if err := serializeTime(w, s.SettleTime); err != nil {
3✔
NEW
2126
                return err
×
NEW
2127
        }
×
2128

2129
        return nil
3✔
2130
}
2131

2132
// deserializeHTLCSettleInfo deserializes the details of a settled htlc.
2133
func deserializeHTLCSettleInfo(r io.Reader) (*HTLCSettleInfo, error) {
3✔
2134
        s := &HTLCSettleInfo{}
3✔
2135
        if _, err := io.ReadFull(r, s.Preimage[:]); err != nil {
3✔
NEW
2136
                return nil, err
×
NEW
2137
        }
×
2138

2139
        var err error
3✔
2140
        s.SettleTime, err = deserializeTime(r)
3✔
2141
        if err != nil {
3✔
NEW
2142
                return nil, err
×
NEW
2143
        }
×
2144

2145
        return s, nil
3✔
2146
}
2147

2148
// serializeHTLCFailInfo serializes the details of a failed htlc including the
2149
// wire failure.
2150
func serializeHTLCFailInfo(w io.Writer, f *HTLCFailInfo) error {
3✔
2151
        if err := serializeTime(w, f.FailTime); err != nil {
3✔
NEW
2152
                return err
×
NEW
2153
        }
×
2154

2155
        // Write failure. If there is no failure message, write an empty
2156
        // byte slice.
2157
        var messageBytes bytes.Buffer
3✔
2158
        if f.Message != nil {
6✔
2159
                err := lnwire.EncodeFailureMessage(&messageBytes, f.Message, 0)
3✔
2160
                if err != nil {
3✔
NEW
2161
                        return err
×
NEW
2162
                }
×
2163
        }
2164
        if err := wire.WriteVarBytes(w, 0, messageBytes.Bytes()); err != nil {
3✔
NEW
2165
                return err
×
NEW
2166
        }
×
2167

2168
        return WriteElements(w, byte(f.Reason), f.FailureSourceIndex)
3✔
2169
}
2170

2171
// deserializeHTLCFailInfo deserializes the details of a failed htlc including
2172
// the wire failure.
2173
func deserializeHTLCFailInfo(r io.Reader) (*HTLCFailInfo, error) {
3✔
2174
        f := &HTLCFailInfo{}
3✔
2175
        var err error
3✔
2176
        f.FailTime, err = deserializeTime(r)
3✔
2177
        if err != nil {
3✔
NEW
2178
                return nil, err
×
NEW
2179
        }
×
2180

2181
        // Read failure.
2182
        failureBytes, err := wire.ReadVarBytes(
3✔
2183
                r, 0, math.MaxUint16, "failure",
3✔
2184
        )
3✔
2185
        if err != nil {
3✔
NEW
2186
                return nil, err
×
NEW
2187
        }
×
2188
        if len(failureBytes) > 0 {
6✔
2189
                f.Message, err = lnwire.DecodeFailureMessage(
3✔
2190
                        bytes.NewReader(failureBytes), 0,
3✔
2191
                )
3✔
2192
                if err != nil {
3✔
NEW
2193
                        return nil, err
×
NEW
2194
                }
×
2195
        }
2196

2197
        var reason byte
3✔
2198
        err = ReadElements(r, &reason, &f.FailureSourceIndex)
3✔
2199
        if err != nil {
3✔
NEW
2200
                return nil, err
×
NEW
2201
        }
×
2202
        f.Reason = HTLCFailReason(reason)
3✔
2203

3✔
2204
        return f, nil
3✔
2205
}
2206

2207
// deserializeTime deserializes time as unix nanoseconds.
2208
func deserializeTime(r io.Reader) (time.Time, error) {
3✔
2209
        var scratch [8]byte
3✔
2210
        if _, err := io.ReadFull(r, scratch[:]); err != nil {
3✔
NEW
2211
                return time.Time{}, err
×
NEW
2212
        }
×
2213

2214
        // Convert to time.Time. Interpret unix nano time zero as a zero
2215
        // time.Time value.
2216
        unixNano := byteOrder.Uint64(scratch[:])
3✔
2217
        if unixNano == 0 {
3✔
NEW
2218
                return time.Time{}, nil
×
NEW
2219
        }
×
2220

2221
        return time.Unix(0, int64(unixNano)), nil
3✔
2222
}
2223

2224
// serializeTime serializes time as unix nanoseconds.
2225
func serializeTime(w io.Writer, t time.Time) error {
3✔
2226
        var scratch [8]byte
3✔
2227

3✔
2228
        // Convert to unix nano seconds, but only if time is non-zero. Calling
3✔
2229
        // UnixNano() on a zero time yields an undefined result.
3✔
2230
        var unixNano int64
3✔
2231
        if !t.IsZero() {
6✔
2232
                unixNano = t.UnixNano()
3✔
2233
        }
3✔
2234

2235
        byteOrder.PutUint64(scratch[:], uint64(unixNano))
3✔
2236
        _, err := w.Write(scratch[:])
3✔
2237
        return err
3✔
2238
}
2239

2240
// duplicateHTLCAttemptInfo contains static information about a specific HTLC
2241
// attempt for a payment. This information is used by the router to handle any
2242
// errors coming back after an attempt is made, and to query the switch about
2243
// the status of the attempt.
2244
type duplicateHTLCAttemptInfo struct {
2245
        // attemptID is the unique ID used for this attempt.
2246
        attemptID uint64
2247

2248
        // sessionKey is the ephemeral key used for this attempt.
2249
        sessionKey [btcec.PrivKeyBytesLen]byte
2250

2251
        // route is the route attempted to send the HTLC.
2252
        route route.Route
2253
}
2254

2255
// fetchDuplicatePaymentStatus fetches the payment status of the payment. If
2256
// the payment isn't found, it will return error `ErrPaymentNotInitiated`.
2257
func fetchDuplicatePaymentStatus(bucket kvdb.RBucket) (PaymentStatus,
NEW
2258
        error) {
×
NEW
2259

×
NEW
2260
        if bucket.Get(duplicatePaymentSettleInfoKey) != nil {
×
NEW
2261
                return StatusSucceeded, nil
×
NEW
2262
        }
×
2263

NEW
2264
        if bucket.Get(duplicatePaymentFailInfoKey) != nil {
×
NEW
2265
                return StatusFailed, nil
×
NEW
2266
        }
×
2267

NEW
2268
        if bucket.Get(duplicatePaymentCreationInfoKey) != nil {
×
NEW
2269
                return StatusInFlight, nil
×
NEW
2270
        }
×
2271

NEW
2272
        return 0, ErrPaymentNotInitiated
×
2273
}
2274

2275
func deserializeDuplicateHTLCAttemptInfo(r io.Reader) (
NEW
2276
        *duplicateHTLCAttemptInfo, error) {
×
NEW
2277

×
NEW
2278
        a := &duplicateHTLCAttemptInfo{}
×
NEW
2279
        err := ReadElements(r, &a.attemptID, &a.sessionKey)
×
NEW
2280
        if err != nil {
×
NEW
2281
                return nil, err
×
NEW
2282
        }
×
NEW
2283
        a.route, err = DeserializeRoute(r)
×
NEW
2284
        if err != nil {
×
NEW
2285
                return nil, err
×
NEW
2286
        }
×
NEW
2287
        return a, nil
×
2288
}
2289

2290
func deserializeDuplicatePaymentCreationInfo(r io.Reader) (
NEW
2291
        *PaymentCreationInfo, error) {
×
NEW
2292

×
NEW
2293
        var scratch [8]byte
×
NEW
2294

×
NEW
2295
        c := &PaymentCreationInfo{}
×
NEW
2296

×
NEW
2297
        if _, err := io.ReadFull(r, c.PaymentIdentifier[:]); err != nil {
×
NEW
2298
                return nil, err
×
NEW
2299
        }
×
2300

NEW
2301
        if _, err := io.ReadFull(r, scratch[:]); err != nil {
×
NEW
2302
                return nil, err
×
NEW
2303
        }
×
NEW
2304
        c.Value = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:]))
×
NEW
2305

×
NEW
2306
        if _, err := io.ReadFull(r, scratch[:]); err != nil {
×
NEW
2307
                return nil, err
×
NEW
2308
        }
×
NEW
2309
        c.CreationTime = time.Unix(int64(byteOrder.Uint64(scratch[:])), 0)
×
NEW
2310

×
NEW
2311
        if _, err := io.ReadFull(r, scratch[:4]); err != nil {
×
NEW
2312
                return nil, err
×
NEW
2313
        }
×
2314

NEW
2315
        reqLen := byteOrder.Uint32(scratch[:4])
×
NEW
2316
        payReq := make([]byte, reqLen)
×
NEW
2317
        if reqLen > 0 {
×
NEW
2318
                if _, err := io.ReadFull(r, payReq); err != nil {
×
NEW
2319
                        return nil, err
×
NEW
2320
                }
×
2321
        }
NEW
2322
        c.PaymentRequest = payReq
×
NEW
2323

×
NEW
2324
        return c, nil
×
2325
}
2326

NEW
2327
func fetchDuplicatePayment(bucket kvdb.RBucket) (*MPPayment, error) {
×
NEW
2328
        seqBytes := bucket.Get(duplicatePaymentSequenceKey)
×
NEW
2329
        if seqBytes == nil {
×
NEW
2330
                return nil, fmt.Errorf("sequence number not found")
×
NEW
2331
        }
×
2332

NEW
2333
        sequenceNum := binary.BigEndian.Uint64(seqBytes)
×
NEW
2334

×
NEW
2335
        // Get the payment status.
×
NEW
2336
        paymentStatus, err := fetchDuplicatePaymentStatus(bucket)
×
NEW
2337
        if err != nil {
×
NEW
2338
                return nil, err
×
NEW
2339
        }
×
2340

2341
        // Get the PaymentCreationInfo.
NEW
2342
        b := bucket.Get(duplicatePaymentCreationInfoKey)
×
NEW
2343
        if b == nil {
×
NEW
2344
                return nil, fmt.Errorf("creation info not found")
×
NEW
2345
        }
×
2346

NEW
2347
        r := bytes.NewReader(b)
×
NEW
2348
        creationInfo, err := deserializeDuplicatePaymentCreationInfo(r)
×
NEW
2349
        if err != nil {
×
NEW
2350
                return nil, err
×
NEW
2351
        }
×
2352

2353
        // Get failure reason if available.
NEW
2354
        var failureReason *FailureReason
×
NEW
2355
        b = bucket.Get(duplicatePaymentFailInfoKey)
×
NEW
2356
        if b != nil {
×
NEW
2357
                reason := FailureReason(b[0])
×
NEW
2358
                failureReason = &reason
×
NEW
2359
        }
×
2360

NEW
2361
        payment := &MPPayment{
×
NEW
2362
                SequenceNum:   sequenceNum,
×
NEW
2363
                Info:          creationInfo,
×
NEW
2364
                FailureReason: failureReason,
×
NEW
2365
                Status:        paymentStatus,
×
NEW
2366
        }
×
NEW
2367

×
NEW
2368
        // Get the HTLCAttemptInfo. It can be absent.
×
NEW
2369
        b = bucket.Get(duplicatePaymentAttemptInfoKey)
×
NEW
2370
        if b != nil {
×
NEW
2371
                r = bytes.NewReader(b)
×
NEW
2372
                attempt, err := deserializeDuplicateHTLCAttemptInfo(r)
×
NEW
2373
                if err != nil {
×
NEW
2374
                        return nil, err
×
NEW
2375
                }
×
2376

NEW
2377
                htlc := HTLCAttempt{
×
NEW
2378
                        HTLCAttemptInfo: HTLCAttemptInfo{
×
NEW
2379
                                AttemptID: attempt.attemptID,
×
NEW
2380
                                Route:     attempt.route,
×
NEW
2381
                        },
×
NEW
2382
                }
×
NEW
2383
                htlc.SetSessionKey(attempt.sessionKey)
×
NEW
2384

×
NEW
2385
                // Get the payment preimage. This is only found for
×
NEW
2386
                // successful payments.
×
NEW
2387
                b = bucket.Get(duplicatePaymentSettleInfoKey)
×
NEW
2388
                if b != nil {
×
NEW
2389
                        var preimg lntypes.Preimage
×
NEW
2390
                        copy(preimg[:], b)
×
NEW
2391

×
NEW
2392
                        htlc.Settle = &HTLCSettleInfo{
×
NEW
2393
                                Preimage:   preimg,
×
NEW
2394
                                SettleTime: time.Time{},
×
NEW
2395
                        }
×
NEW
2396
                } else {
×
NEW
2397
                        // Otherwise the payment must have failed.
×
NEW
2398
                        htlc.Failure = &HTLCFailInfo{
×
NEW
2399
                                FailTime: time.Time{},
×
NEW
2400
                        }
×
NEW
2401
                }
×
2402

NEW
2403
                payment.HTLCs = []HTLCAttempt{htlc}
×
2404
        }
2405

NEW
2406
        return payment, nil
×
2407
}
2408

2409
// fetchDuplicatePayments fetches all duplicate payments for a payment hash.
2410
func fetchDuplicatePayments(
NEW
2411
        paymentHashBucket kvdb.RBucket) ([]*MPPayment, error) {
×
NEW
2412

×
NEW
2413
        var payments []*MPPayment
×
NEW
2414

×
NEW
2415
        // For older versions of lnd, duplicate payments to a payment has was
×
NEW
2416
        // possible. These will be found in a sub-bucket indexed by their
×
NEW
2417
        // sequence number if available.
×
NEW
2418
        dup := paymentHashBucket.NestedReadBucket(duplicatePaymentsBucket)
×
NEW
2419
        if dup == nil {
×
NEW
2420
                return nil, nil
×
NEW
2421
        }
×
2422

NEW
2423
        err := dup.ForEach(func(k, v []byte) error {
×
NEW
2424
                subBucket := dup.NestedReadBucket(k)
×
NEW
2425
                if subBucket == nil {
×
NEW
2426
                        // We one bucket for each duplicate to be found.
×
NEW
2427
                        return fmt.Errorf("non bucket element" +
×
NEW
2428
                                "in duplicate bucket")
×
NEW
2429
                }
×
2430

NEW
2431
                p, err := fetchDuplicatePayment(subBucket)
×
NEW
2432
                if err != nil {
×
NEW
2433
                        return err
×
NEW
2434
                }
×
2435

NEW
2436
                payments = append(payments, p)
×
NEW
2437
                return nil
×
2438
        })
NEW
2439
        if err != nil {
×
NEW
2440
                return nil, err
×
NEW
2441
        }
×
2442

NEW
2443
        return payments, nil
×
2444
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc