lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%

Pull #9489 (github), author calvinrzachman

itest: verify switchrpc server enforces send then track
We prevent the RPC server from allowing onion dispatches for
attempt IDs which have already been tracked by RPC clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: this is not the only way to solve the issue! It
could also be addressed via careful client-side programming that
accounts for the uncertainty and asynchronous nature of
dispatching onions to a remote process via RPC, though that would
require changes to how the lnd ChannelRouter intends to use these
RPCs.

Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File (0.0% of lines covered):
/channeldb/migration_01_to_11/migrations.go
package migration_01_to_11

import (
        "bytes"
        "crypto/sha256"
        "encoding/binary"
        "fmt"

        "github.com/btcsuite/btcd/btcec/v2"
        lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21"
        "github.com/lightningnetwork/lnd/kvdb"
)

// MigrateNodeAndEdgeUpdateIndex is a migration function that will update the
// database from version 0 to version 1. In version 1, we add two new indexes
// (one for nodes and one for edges) to keep track of the last time a node or
// edge was updated on the network. These new indexes allow us to implement
// the newly added graph sync protocol.
func MigrateNodeAndEdgeUpdateIndex(tx kvdb.RwTx) error {
        // First, we'll populate the node portion of the new index. Before we
        // can add new values to the index, we'll first create the new bucket
        // where these items will be housed.
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
        if err != nil {
                return fmt.Errorf("unable to create node bucket: %w", err)
        }
        nodeUpdateIndex, err := nodes.CreateBucketIfNotExists(
                nodeUpdateIndexBucket,
        )
        if err != nil {
                return fmt.Errorf("unable to create node update index: %w", err)
        }

        log.Infof("Populating new node update index bucket")

        // Now that we know the bucket has been created, we'll iterate over the
        // entire node bucket so we can add the (updateTime || nodePub) key
        // into the node update index.
        err = nodes.ForEach(func(nodePub, nodeInfo []byte) error {
                if len(nodePub) != 33 {
                        return nil
                }

                log.Tracef("Adding %x to node update index", nodePub)

                // The first 8 bytes of a node's serialized data are the
                // update time, so we can extract that without decoding the
                // entire structure.
                updateTime := nodeInfo[:8]

                // Now that we have the update time, we can construct the key
                // to insert into the index.
                var indexKey [8 + 33]byte
                copy(indexKey[:8], updateTime)
                copy(indexKey[8:], nodePub)

                return nodeUpdateIndex.Put(indexKey[:], nil)
        })
        if err != nil {
                return fmt.Errorf("unable to update node indexes: %w", err)
        }

        log.Infof("Populating new edge update index bucket")

        // With the set of nodes updated, we'll now update all edges to have a
        // corresponding entry in the edge update index.
        edges, err := tx.CreateTopLevelBucket(edgeBucket)
        if err != nil {
                return fmt.Errorf("unable to create edge bucket: %w", err)
        }
        edgeUpdateIndex, err := edges.CreateBucketIfNotExists(
                edgeUpdateIndexBucket,
        )
        if err != nil {
                return fmt.Errorf("unable to create edge update index: %w", err)
        }

        // We'll now run through each edge policy in the database, and update
        // the index to ensure each edge has the proper record.
        err = edges.ForEach(func(edgeKey, edgePolicyBytes []byte) error {
                if len(edgeKey) != 41 {
                        return nil
                }

                // Now that we know this is the proper record, we'll grab the
                // channel ID (last 8 bytes of the key), and then decode the
                // edge policy so we can access the update time.
                chanID := edgeKey[33:]
                edgePolicyReader := bytes.NewReader(edgePolicyBytes)

                edgePolicy, err := deserializeChanEdgePolicy(
                        edgePolicyReader, nodes,
                )
                if err != nil {
                        return err
                }

                log.Tracef("Adding chan_id=%v to edge update index",
                        edgePolicy.ChannelID)

                // We'll now construct the index key using the channel ID, and
                // the last time it was updated: (updateTime || chanID).
                var indexKey [8 + 8]byte
                byteOrder.PutUint64(
                        indexKey[:], uint64(edgePolicy.LastUpdate.Unix()),
                )
                copy(indexKey[8:], chanID)

                return edgeUpdateIndex.Put(indexKey[:], nil)
        })
        if err != nil {
                return fmt.Errorf("unable to update edge indexes: %w", err)
        }

        log.Infof("Migration to node and edge update indexes complete!")

        return nil
}
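
For illustration, here is a minimal sketch (an editor's addition, not part of the original file) of the (updateTime || pubKey) key layout built above. It assumes byteOrder is big-endian, which is what makes the bucket's byte-wise key ordering match chronological ordering.

// makeUpdateIndexKey is a hypothetical helper that mirrors the index
// key layout used above: an 8-byte big-endian update time followed by
// a 33-byte compressed public key.
func makeUpdateIndexKey(updateTime uint64, nodePub [33]byte) [8 + 33]byte {
        var key [8 + 33]byte
        binary.BigEndian.PutUint64(key[:8], updateTime)
        copy(key[8:], nodePub[:])
        return key
}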

// MigrateInvoiceTimeSeries is a database migration that assigns all existing
// invoices an index in the add and/or the settle index. Additionally, all
// existing invoices will have their bytes padded out in order to encode the
// add+settle index as well as the amount paid.
func MigrateInvoiceTimeSeries(tx kvdb.RwTx) error {
        invoices, err := tx.CreateTopLevelBucket(invoiceBucket)
        if err != nil {
                return err
        }

        addIndex, err := invoices.CreateBucketIfNotExists(
                addIndexBucket,
        )
        if err != nil {
                return err
        }
        settleIndex, err := invoices.CreateBucketIfNotExists(
                settleIndexBucket,
        )
        if err != nil {
                return err
        }

        log.Infof("Migrating invoice database to new time series format")

        // Now that we have all the buckets we need, we'll run through each
        // invoice in the database, and update it to reflect the new format
        // expected post migration.
        // NOTE: we store the converted invoices and put them back into the
        // database after the loop, since modifying the bucket within the
        // ForEach loop is not safe.
        var invoicesKeys [][]byte
        var invoicesValues [][]byte
        err = invoices.ForEach(func(invoiceNum, invoiceBytes []byte) error {
                // If this is a sub-bucket, then we'll skip it.
                if invoiceBytes == nil {
                        return nil
                }

                // First, we'll make a copy of the encoded invoice bytes.
                invoiceBytesCopy := make([]byte, len(invoiceBytes))
                copy(invoiceBytesCopy, invoiceBytes)

                // With the bytes copied over, we'll append 24 additional
                // bytes. We do this so we can decode the invoice under the new
                // serialization format.
                padding := bytes.Repeat([]byte{0}, 24)
                invoiceBytesCopy = append(invoiceBytesCopy, padding...)

                invoiceReader := bytes.NewReader(invoiceBytesCopy)
                invoice, err := deserializeInvoiceLegacy(invoiceReader)
                if err != nil {
                        return fmt.Errorf("unable to decode invoice: %w", err)
                }

                // Now that we have the fully decoded invoice, we can update
                // the various indexes that we've added, and finally the
                // invoice itself before re-inserting it.

                // First, we'll get the new sequence in the addIndex in order
                // to create the proper mapping.
                nextAddSeqNo, err := addIndex.NextSequence()
                if err != nil {
                        return err
                }
                var seqNoBytes [8]byte
                byteOrder.PutUint64(seqNoBytes[:], nextAddSeqNo)
                err = addIndex.Put(seqNoBytes[:], invoiceNum[:])
                if err != nil {
                        return err
                }

                log.Tracef("Adding invoice (preimage=%x, add_index=%v) to add "+
                        "time series", invoice.Terms.PaymentPreimage[:],
                        nextAddSeqNo)

                // Next, we'll check if the invoice has been settled or not. If
                // so, then we'll also add it to the settle index.
                var nextSettleSeqNo uint64
                if invoice.Terms.State == ContractSettled {
                        nextSettleSeqNo, err = settleIndex.NextSequence()
                        if err != nil {
                                return err
                        }

                        var seqNoBytes [8]byte
                        byteOrder.PutUint64(seqNoBytes[:], nextSettleSeqNo)
                        err := settleIndex.Put(seqNoBytes[:], invoiceNum)
                        if err != nil {
                                return err
                        }

                        invoice.AmtPaid = invoice.Terms.Value

                        log.Tracef("Adding invoice (preimage=%x, "+
                                "settle_index=%v) to settle time series",
                                invoice.Terms.PaymentPreimage[:],
                                nextSettleSeqNo)
                }

                // Finally, we'll update the invoice itself with the new
                // indexing information as well as the amount paid if it has
                // been settled or not.
                invoice.AddIndex = nextAddSeqNo
                invoice.SettleIndex = nextSettleSeqNo

                // We've fully migrated an invoice, so we'll now update the
                // invoice in-place.
                var b bytes.Buffer
                if err := serializeInvoiceLegacy(&b, &invoice); err != nil {
                        return err
                }

                // Save the key and value pending update for after the ForEach
                // is done.
                invoicesKeys = append(invoicesKeys, invoiceNum)
                invoicesValues = append(invoicesValues, b.Bytes())
                return nil
        })
        if err != nil {
                return err
        }

        // Now put the converted invoices into the DB.
        for i := range invoicesKeys {
                key := invoicesKeys[i]
                value := invoicesValues[i]
                if err := invoices.Put(key, value); err != nil {
                        return err
                }
        }

        log.Infof("Migration to invoice time series index complete!")

        return nil
}
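
A short sketch (an editor's illustration, assuming the new format appends exactly three uint64 fields) of the zero-padding step used above: the 24 extra bytes stand in for the add index, settle index, and amount paid that the post-migration decoder expects at the end of each invoice.

// padLegacyInvoice is a hypothetical helper restating the padding
// trick from MigrateInvoiceTimeSeries: copy the legacy bytes, then
// append 24 zero bytes so the new decoder reads zeroed defaults for
// the three new fields.
func padLegacyInvoice(legacy []byte) []byte {
        padded := make([]byte, len(legacy), len(legacy)+24)
        copy(padded, legacy)
        return append(padded, bytes.Repeat([]byte{0}, 24)...)
}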

// MigrateInvoiceTimeSeriesOutgoingPayments is a follow-up to the
// MigrateInvoiceTimeSeries migration. At the time of writing, the
// OutgoingPayment struct embeds an instance of the Invoice struct. As a
// result, we also need to migrate the internal invoice to the new format.
func MigrateInvoiceTimeSeriesOutgoingPayments(tx kvdb.RwTx) error {
        payBucket := tx.ReadWriteBucket(paymentBucket)
        if payBucket == nil {
                return nil
        }

        log.Infof("Migrating invoice database to new outgoing payment format")

        // We store the keys and values we want to modify since it is not safe
        // to modify them directly within the ForEach loop.
        var paymentKeys [][]byte
        var paymentValues [][]byte
        err := payBucket.ForEach(func(payID, paymentBytes []byte) error {
                log.Tracef("Migrating payment %x", payID[:])

                // The internal invoices for each payment only contain a
                // populated contract term and creation date; as a result,
                // most of the bytes will be "empty".

                // We'll calculate the end of the invoice index assuming a
                // "minimal" invoice that's embedded within the greater
                // OutgoingPayment. The breakdown is:
                //  3 bytes empty var bytes, 16 bytes creation date, 16 bytes
                //  settled date, 32 bytes payment pre-image, 8 bytes value, 1
                //  byte settled.
                endOfInvoiceIndex := 1 + 1 + 1 + 16 + 16 + 32 + 8 + 1

                // We'll now extract the prefix of the pure invoice embedded
                // within.
                invoiceBytes := paymentBytes[:endOfInvoiceIndex]

                // With the prefix extracted, we'll copy over the invoice, and
                // also add padding for the new 24 bytes of fields, and finally
                // append the remainder of the outgoing payment.
                paymentCopy := make([]byte, len(invoiceBytes))
                copy(paymentCopy[:], invoiceBytes)

                padding := bytes.Repeat([]byte{0}, 24)
                paymentCopy = append(paymentCopy, padding...)
                paymentCopy = append(
                        paymentCopy, paymentBytes[endOfInvoiceIndex:]...,
                )

                // At this point, we now have the new format of the outgoing
                // payment, so we'll attempt to deserialize it to ensure the
                // bytes are properly formatted.
                paymentReader := bytes.NewReader(paymentCopy)
                _, err := deserializeOutgoingPayment(paymentReader)
                if err != nil {
                        return fmt.Errorf("unable to deserialize payment: %w",
                                err)
                }

                // Now that we know the modification was successful, we'll
                // store it in our slice of keys and values, and write it back
                // to disk in the new format after the ForEach loop is over.
                paymentKeys = append(paymentKeys, payID)
                paymentValues = append(paymentValues, paymentCopy)
                return nil
        })
        if err != nil {
                return err
        }

        // Finally store the updated payments to the bucket.
        for i := range paymentKeys {
                key := paymentKeys[i]
                value := paymentValues[i]
                if err := payBucket.Put(key, value); err != nil {
                        return err
                }
        }

        log.Infof("Migration to outgoing payment invoices complete!")

        return nil
}
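
The offset arithmetic above works out to 1 + 1 + 1 + 16 + 16 + 32 + 8 + 1 = 76 bytes of embedded invoice. A minimal sketch (hypothetical helper, an editor's addition) of the same prefix/padding/suffix splice:

// spliceWithPadding is a hypothetical restatement of the rewrite
// above: keep the embedded-invoice prefix, insert 24 zero bytes for
// the new invoice fields, then re-attach the rest of the payment.
func spliceWithPadding(payment []byte, endOfInvoice int) []byte {
        out := make([]byte, 0, len(payment)+24)
        out = append(out, payment[:endOfInvoice]...)
        out = append(out, make([]byte, 24)...)
        return append(out, payment[endOfInvoice:]...)
}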

// MigrateEdgePolicies is a migration function that will update the edges
// bucket. It ensures that edges with unknown policies will also have an entry
// in the bucket. After the migration, there will be two edge entries for
// every channel, regardless of whether the policies are known.
func MigrateEdgePolicies(tx kvdb.RwTx) error {
        nodes := tx.ReadWriteBucket(nodeBucket)
        if nodes == nil {
                return nil
        }

        edges := tx.ReadWriteBucket(edgeBucket)
        if edges == nil {
                return nil
        }

        edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
        if edgeIndex == nil {
                return nil
        }

        // checkKey gets the policy from the database with a low-level call
        // so that it is still possible to distinguish between unknown and
        // not present.
        checkKey := func(channelId uint64, keyBytes []byte) error {
                var channelID [8]byte
                byteOrder.PutUint64(channelID[:], channelId)

                _, err := fetchChanEdgePolicy(edges,
                        channelID[:], keyBytes, nodes)

                if err == ErrEdgeNotFound {
                        log.Tracef("Adding unknown edge policy for node %x, channel %v",
                                keyBytes, channelId)

                        err := putChanEdgePolicyUnknown(edges, channelId, keyBytes)
                        if err != nil {
                                return err
                        }

                        return nil
                }

                return err
        }

        // Iterate over all channels and check both edge policies.
        err := edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error {
                infoReader := bytes.NewReader(edgeInfoBytes)
                edgeInfo, err := deserializeChanEdgeInfo(infoReader)
                if err != nil {
                        return err
                }

                for _, key := range [][]byte{edgeInfo.NodeKey1Bytes[:],
                        edgeInfo.NodeKey2Bytes[:]} {

                        if err := checkKey(edgeInfo.ChannelID, key); err != nil {
                                return err
                        }
                }

                return nil
        })

        if err != nil {
                return fmt.Errorf("unable to update edge policies: %w", err)
        }

        log.Infof("Migration of edge policies complete!")

        return nil
}
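
As the comment on MigrateEdgePolicies notes, every channel ends up with one policy entry per direction. A minimal sketch (hypothetical helper, an editor's addition) of the 41-byte edge key layout that checkKey operates on, matching the len(edgeKey) != 33+8 guard used elsewhere in this file:

// edgePolicyKey is a hypothetical helper showing the edge policy key
// layout: a 33-byte node public key followed by the 8-byte big-endian
// channel ID.
func edgePolicyKey(nodeKey [33]byte, chanID uint64) [33 + 8]byte {
        var key [33 + 8]byte
        copy(key[:33], nodeKey[:])
        binary.BigEndian.PutUint64(key[33:], chanID)
        return key
}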

// PaymentStatusesMigration is a database migration intended to add a payment
// status for each existing payment entity in the bucket, so that we are able
// to control transitions of statuses and prevent cases such as double
// payment.
func PaymentStatusesMigration(tx kvdb.RwTx) error {
        // Get the bucket dedicated to storing statuses of payments,
        // where a key is payment hash, value is payment status.
        paymentStatuses, err := tx.CreateTopLevelBucket(paymentStatusBucket)
        if err != nil {
                return err
        }

        log.Infof("Migrating database to support payment statuses")

        circuitAddKey := []byte("circuit-adds")
        circuits := tx.ReadWriteBucket(circuitAddKey)
        if circuits != nil {
                log.Infof("Marking all known circuits with status InFlight")

                err = circuits.ForEach(func(k, v []byte) error {
                        // Parse the first 8 bytes as the short chan ID for the
                        // circuit. We'll skip all short chan IDs that are not
                        // locally initiated, which includes all non-zero short
                        // chan IDs.
                        chanID := binary.BigEndian.Uint64(k[:8])
                        if chanID != 0 {
                                return nil
                        }

                        // The payment hash is the third item in the serialized
                        // payment circuit. The first two items are an AddRef
                        // (10 bytes) and the incoming circuit key (16 bytes).
                        const payHashOffset = 10 + 16

                        paymentHash := v[payHashOffset : payHashOffset+32]

                        return paymentStatuses.Put(
                                paymentHash[:], StatusInFlight.Bytes(),
                        )
                })
                if err != nil {
                        return err
                }
        }

        log.Infof("Marking all existing payments with status Completed")

        // Get the bucket dedicated to storing payments.
        bucket := tx.ReadWriteBucket(paymentBucket)
        if bucket == nil {
                return nil
        }

        // For each payment in the bucket, deserialize the payment and mark it
        // as completed.
        err = bucket.ForEach(func(k, v []byte) error {
                // Skip if it is a sub-bucket.
                if v == nil {
                        return nil
                }

                r := bytes.NewReader(v)
                payment, err := deserializeOutgoingPayment(r)
                if err != nil {
                        return err
                }

                // Calculate the payment hash for the current payment.
                paymentHash := sha256.Sum256(payment.PaymentPreimage[:])

                // Update the status for the current payment to completed. If
                // it fails, the migration is aborted and the payment bucket
                // is returned to its previous state.
                return paymentStatuses.Put(paymentHash[:], StatusSucceeded.Bytes())
        })
        if err != nil {
                return err
        }

        log.Infof("Migration of payment statuses complete!")

        return nil
}
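
For clarity, the circuit layout assumed by payHashOffset above, restated as a hypothetical helper (an editor's addition; the slice bounds are the only logic, no decoding is performed):

// circuitPaymentHash is a hypothetical helper restating the offset
// logic above: skip the 10-byte AddRef and the 16-byte incoming
// circuit key, then read the 32-byte payment hash that follows.
func circuitPaymentHash(circuit []byte) ([]byte, error) {
        const payHashOffset = 10 + 16
        if len(circuit) < payHashOffset+32 {
                return nil, fmt.Errorf("circuit too short: %d bytes",
                        len(circuit))
        }
        return circuit[payHashOffset : payHashOffset+32], nil
}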

// MigratePruneEdgeUpdateIndex is a database migration that attempts to resolve
// some lingering bugs with regards to edge policies and their update index.
// Stale entries within the edge update index were not being properly pruned due
// to a miscalculation on the offset of an edge's policy last update. This
// migration also fixes the case where the public keys within edge policies were
// being serialized with an extra byte, causing an even greater error when
// attempting to perform the offset calculation described earlier.
func MigratePruneEdgeUpdateIndex(tx kvdb.RwTx) error {
        // To begin the migration, we'll retrieve the update index bucket. If it
        // does not exist, we have nothing left to do so we can simply exit.
        edges := tx.ReadWriteBucket(edgeBucket)
        if edges == nil {
                return nil
        }
        edgeUpdateIndex := edges.NestedReadWriteBucket(edgeUpdateIndexBucket)
        if edgeUpdateIndex == nil {
                return nil
        }

        // Retrieve some buckets that will be needed later on. These should
        // already exist given the assumption that the buckets above do as
        // well.
        edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
        if err != nil {
                return fmt.Errorf("error creating edge index bucket: %w", err)
        }
        if edgeIndex == nil {
                return fmt.Errorf("unable to create/fetch edge index " +
                        "bucket")
        }
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
        if err != nil {
                return fmt.Errorf("unable to make node bucket")
        }

        log.Info("Migrating database to properly prune edge update index")

        // We'll need to properly prune all the outdated entries within the edge
        // update index. To do so, we'll gather all of the existing policies
        // within the graph to re-populate them later on.
        var edgeKeys [][]byte
        err = edges.ForEach(func(edgeKey, edgePolicyBytes []byte) error {
                // All valid entries are indexed by a public key (33 bytes)
                // followed by a channel ID (8 bytes), so we'll skip any entries
                // with keys that do not match this.
                if len(edgeKey) != 33+8 {
                        return nil
                }

                edgeKeys = append(edgeKeys, edgeKey)

                return nil
        })
        if err != nil {
                return fmt.Errorf("unable to gather existing edge policies: %w",
                        err)
        }

        log.Info("Constructing set of edge update entries to purge.")

        // Build the set of keys that we will remove from the edge update index.
        // This will include all keys contained within the bucket.
        var updateKeysToRemove [][]byte
        err = edgeUpdateIndex.ForEach(func(updKey, _ []byte) error {
                updateKeysToRemove = append(updateKeysToRemove, updKey)
                return nil
        })
        if err != nil {
                return fmt.Errorf("unable to gather existing edge updates: %w",
                        err)
        }

        log.Infof("Removing %d entries from edge update index.",
                len(updateKeysToRemove))

        // With the set of keys contained in the edge update index constructed,
        // we'll proceed in purging all of them from the index.
        for _, updKey := range updateKeysToRemove {
                if err := edgeUpdateIndex.Delete(updKey); err != nil {
                        return err
                }
        }

        log.Infof("Repopulating edge update index with %d valid entries.",
                len(edgeKeys))

        // For each edge key, we'll retrieve the policy, deserialize it, and
        // re-add it to the different buckets. By doing so, we'll ensure that
        // all existing edge policies are serialized correctly within their
        // respective buckets and that the correct entries are populated within
        // the edge update index.
        for _, edgeKey := range edgeKeys {
                edgePolicyBytes := edges.Get(edgeKey)

                // Skip any entries with unknown policies as there will not be
                // any entries for them in the edge update index.
                if bytes.Equal(edgePolicyBytes[:], unknownPolicy) {
                        continue
                }

                edgePolicy, err := deserializeChanEdgePolicy(
                        bytes.NewReader(edgePolicyBytes), nodes,
                )
                if err != nil {
                        return err
                }

                _, err = updateEdgePolicy(tx, edgePolicy)
                if err != nil {
                        return err
                }
        }

        log.Info("Migration to properly prune edge update index complete!")

        return nil
}
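
A recurring pattern in this file, used again above, is to collect keys during ForEach and mutate the bucket only after iteration, since modifying a bucket while its cursor is active is not safe. A generic sketch of the pattern (an editor's addition, assuming the kvdb.RwBucket interface imported above):

// deleteAllKeys is a hypothetical illustration of the collect-then-
// mutate pattern: gather every key while iterating, then delete only
// once the ForEach cursor has finished.
func deleteAllKeys(bucket kvdb.RwBucket) error {
        var keys [][]byte
        err := bucket.ForEach(func(k, _ []byte) error {
                keys = append(keys, k)
                return nil
        })
        if err != nil {
                return err
        }

        for _, k := range keys {
                if err := bucket.Delete(k); err != nil {
                        return err
                }
        }

        return nil
}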

// MigrateOptionalChannelCloseSummaryFields migrates the serialized format of
// ChannelCloseSummary to a format where optional fields' presence is indicated
// with boolean markers.
func MigrateOptionalChannelCloseSummaryFields(tx kvdb.RwTx) error {
        closedChanBucket := tx.ReadWriteBucket(closedChannelBucket)
        if closedChanBucket == nil {
                return nil
        }

        log.Info("Migrating to new closed channel format...")

        // We store the converted keys and values and put them back into the
        // database after the loop, since modifying the bucket within the
        // ForEach loop is not safe.
        var closedChansKeys [][]byte
        var closedChansValues [][]byte
        err := closedChanBucket.ForEach(func(chanID, summary []byte) error {
                r := bytes.NewReader(summary)

                // Read the old (v6) format from the database.
                c, err := deserializeCloseChannelSummaryV6(r)
                if err != nil {
                        return err
                }

                // Serialize using the new format, and put back into the
                // bucket.
                var b bytes.Buffer
                if err := serializeChannelCloseSummary(&b, c); err != nil {
                        return err
                }

                // Now that we know the modification was successful, we'll
                // store the key and value in our slices, and write them back
                // to disk in the new format after the ForEach loop is over.
                closedChansKeys = append(closedChansKeys, chanID)
                closedChansValues = append(closedChansValues, b.Bytes())
                return nil
        })
        if err != nil {
                return fmt.Errorf("unable to update closed channels: %w", err)
        }

        // Now put the new format back into the DB.
        for i := range closedChansKeys {
                key := closedChansKeys[i]
                value := closedChansValues[i]
                if err := closedChanBucket.Put(key, value); err != nil {
                        return err
                }
        }

        log.Info("Migration to new closed channel format complete!")

        return nil
}
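
The "boolean markers" mentioned above mean that each optional field is prefixed by a presence byte, so a reader can tell an absent field apart from a zero-valued one. A hedged sketch of that encoding idea (an editor's addition, not the file's actual serializeChannelCloseSummary):

// writeOptionalBytes is a hypothetical sketch of presence-marker
// encoding: write a one-byte flag first, then the field only if it is
// present.
func writeOptionalBytes(w *bytes.Buffer, field []byte) error {
        present := byte(0)
        if field != nil {
                present = 1
        }
        if err := w.WriteByte(present); err != nil {
                return err
        }
        if present == 1 {
                if _, err := w.Write(field); err != nil {
                        return err
                }
        }
        return nil
}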

var messageStoreBucket = []byte("message-store")

// MigrateGossipMessageStoreKeys migrates the key format for gossip messages
// found in the message store to a new one that takes into consideration the
// type of the message being stored.
func MigrateGossipMessageStoreKeys(tx kvdb.RwTx) error {
        // We'll start by retrieving the bucket in which these messages are
        // stored within. If there isn't one, there's nothing left for us to do
        // so we can avoid the migration.
        messageStore := tx.ReadWriteBucket(messageStoreBucket)
        if messageStore == nil {
                return nil
        }

        log.Info("Migrating to the gossip message store new key format")

        // Otherwise we'll proceed with the migration. We'll start by coalescing
        // all the current messages within the store, which are indexed by the
        // public key of the peer which they should be sent to, followed by the
        // short channel ID of the channel to which the message belongs. We
        // should only expect to find channel announcement signatures, as that
        // was the only supported message type previously.
        msgs := make(map[[33 + 8]byte]*lnwire.AnnounceSignatures)
        err := messageStore.ForEach(func(k, v []byte) error {
                var msgKey [33 + 8]byte
                copy(msgKey[:], k)

                msg := &lnwire.AnnounceSignatures{}
                if err := msg.Decode(bytes.NewReader(v), 0); err != nil {
                        return err
                }

                msgs[msgKey] = msg

                return nil
        })
        if err != nil {
                return err
        }

        // Then, we'll go over all of our messages, remove their previous entry,
        // and add another with the new key format. Once we've done this for
        // every message, we can consider the migration complete.
        for oldMsgKey, msg := range msgs {
                if err := messageStore.Delete(oldMsgKey[:]); err != nil {
                        return err
                }

                // Construct the new key under which we'll find this message
                // in the store. It'll be the same as the old, but we'll also
                // include the message type.
                var msgType [2]byte
                binary.BigEndian.PutUint16(msgType[:], uint16(msg.MsgType()))
                newMsgKey := append(oldMsgKey[:], msgType[:]...)

                // Serialize the message with its wire encoding.
                var b bytes.Buffer
                if _, err := lnwire.WriteMessage(&b, msg, 0); err != nil {
                        return err
                }

                if err := messageStore.Put(newMsgKey, b.Bytes()); err != nil {
                        return err
                }
        }

        log.Info("Migration to the gossip message store new key format complete!")

        return nil
}
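
A small sketch (hypothetical helper, an editor's addition) of the new key layout produced above: the old 33-byte peer pubkey plus 8-byte short channel ID, with the 2-byte big-endian message type appended.

// newGossipMsgKey is a hypothetical helper restating the key layout
// built in the loop above.
func newGossipMsgKey(oldKey [33 + 8]byte, msgType uint16) []byte {
        var suffix [2]byte
        binary.BigEndian.PutUint16(suffix[:], msgType)
        return append(oldKey[:], suffix[:]...)
}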

// MigrateOutgoingPayments moves the OutgoingPayments into a new bucket format
// where they all reside in a top-level bucket indexed by the payment hash. In
// this sub-bucket we store information relevant to this payment, such as the
// payment status.
//
// Since the router cannot handle resumed payments that have the status
// InFlight (we have no PaymentAttemptInfo available for pre-migration
// payments) we delete those statuses, so only Completed payments remain in the
// new bucket structure.
func MigrateOutgoingPayments(tx kvdb.RwTx) error {
        log.Infof("Migrating outgoing payments to new bucket structure")

        oldPayments := tx.ReadWriteBucket(paymentBucket)

        // Return early if there are no payments to migrate.
        if oldPayments == nil {
                log.Infof("No outgoing payments found, nothing to migrate.")
                return nil
        }

        newPayments, err := tx.CreateTopLevelBucket(paymentsRootBucket)
        if err != nil {
                return err
        }

        // Helper method to get the source pubkey. We define it such that we
        // only attempt to fetch it if needed.
        sourcePub := func() ([33]byte, error) {
                var pub [33]byte
                nodes := tx.ReadWriteBucket(nodeBucket)
                if nodes == nil {
                        return pub, ErrGraphNotFound
                }

                selfPub := nodes.Get(sourceKey)
                if selfPub == nil {
                        return pub, ErrSourceNodeNotSet
                }
                copy(pub[:], selfPub[:])
                return pub, nil
        }

        err = oldPayments.ForEach(func(k, v []byte) error {
                // Skip if it is a sub-bucket.
                if v == nil {
                        return nil
                }

                // Read the old payment format.
                r := bytes.NewReader(v)
                payment, err := deserializeOutgoingPayment(r)
                if err != nil {
                        return err
                }

                // Calculate the payment hash from the payment preimage.
                paymentHash := sha256.Sum256(payment.PaymentPreimage[:])

                // Now create and add a PaymentCreationInfo to the bucket.
                c := &PaymentCreationInfo{
                        PaymentHash:    paymentHash,
                        Value:          payment.Terms.Value,
                        CreationDate:   payment.CreationDate,
                        PaymentRequest: payment.PaymentRequest,
                }

                var infoBuf bytes.Buffer
                if err := serializePaymentCreationInfo(&infoBuf, c); err != nil {
                        return err
                }

                sourcePubKey, err := sourcePub()
                if err != nil {
                        return err
                }

                // Do the same for the PaymentAttemptInfo.
                totalAmt := payment.Terms.Value + payment.Fee
                rt := Route{
                        TotalTimeLock: payment.TimeLockLength,
                        TotalAmount:   totalAmt,
                        SourcePubKey:  sourcePubKey,
                        Hops:          []*Hop{},
                }
                for _, hop := range payment.Path {
                        rt.Hops = append(rt.Hops, &Hop{
                                PubKeyBytes:  hop,
                                AmtToForward: totalAmt,
                        })
                }

                // Since the old format didn't store the fee for individual
                // hops, we let the last hop eat the whole fee for the total to
                // add up.
                if len(rt.Hops) > 0 {
                        rt.Hops[len(rt.Hops)-1].AmtToForward = payment.Terms.Value
                }

                // Since we don't have the session key for old payments, we
                // create a random one to be able to serialize the attempt
                // info.
                priv, _ := btcec.NewPrivateKey()
                s := &PaymentAttemptInfo{
                        PaymentID:  0,    // unknown.
                        SessionKey: priv, // unknown.
                        Route:      rt,
                }

                var attemptBuf bytes.Buffer
                if err := serializePaymentAttemptInfoMigration9(&attemptBuf, s); err != nil {
                        return err
                }

                // Reuse the existing payment sequence number.
                var seqNum [8]byte
                copy(seqNum[:], k)

                // Create a bucket indexed by the payment hash.
                bucket, err := newPayments.CreateBucket(paymentHash[:])

                // If the bucket already exists, it means that we are migrating
                // from a database containing duplicate payments to a payment
                // hash. To keep this information, we store such duplicate
                // payments in a sub-bucket.
                if err == kvdb.ErrBucketExists {
                        pHashBucket := newPayments.NestedReadWriteBucket(paymentHash[:])

                        // Create a bucket for duplicate payments within this
                        // payment hash's bucket.
                        dup, err := pHashBucket.CreateBucketIfNotExists(
                                paymentDuplicateBucket,
                        )
                        if err != nil {
                                return err
                        }

                        // Each duplicate will get its own sub-bucket within
                        // this bucket, so use their sequence numbers to index
                        // them.
                        bucket, err = dup.CreateBucket(seqNum[:])
                        if err != nil {
                                return err
                        }

                } else if err != nil {
                        return err
                }

                // Store the payment's information to the bucket.
                err = bucket.Put(paymentSequenceKey, seqNum[:])
                if err != nil {
                        return err
                }

                err = bucket.Put(paymentCreationInfoKey, infoBuf.Bytes())
                if err != nil {
                        return err
                }

                err = bucket.Put(paymentAttemptInfoKey, attemptBuf.Bytes())
                if err != nil {
                        return err
                }

                err = bucket.Put(paymentSettleInfoKey, payment.PaymentPreimage[:])
                if err != nil {
                        return err
                }

                return nil
        })
        if err != nil {
                return err
        }

        // To continue producing unique sequence numbers, we set the sequence
        // of the new bucket to that of the old one.
        seq := oldPayments.Sequence()
        if err := newPayments.SetSequence(seq); err != nil {
                return err
        }

        // Now we delete the old buckets. Deleting the payment status buckets
        // deletes all payment statuses other than Complete.
        err = tx.DeleteTopLevelBucket(paymentStatusBucket)
        if err != nil && err != kvdb.ErrBucketNotFound {
                return err
        }

        // Finally delete the old payment bucket.
        err = tx.DeleteTopLevelBucket(paymentBucket)
        if err != nil && err != kvdb.ErrBucketNotFound {
                return err
        }

        log.Infof("Migration of outgoing payment bucket structure completed!")
        return nil
}
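
To summarize the structure MigrateOutgoingPayments creates, here is the resulting bucket layout as implied by the code above (an editor's sketch; the key names match the constants used in the function):

// paymentsRootBucket
// └── <payment hash>
//     ├── paymentSequenceKey     -> 8-byte sequence number
//     ├── paymentCreationInfoKey -> serialized PaymentCreationInfo
//     ├── paymentAttemptInfoKey  -> serialized PaymentAttemptInfo
//     ├── paymentSettleInfoKey   -> 32-byte payment preimage
//     └── paymentDuplicateBucket (only if duplicates existed)
//         └── <sequence number>  -> the same four keys as above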