• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13980275562

20 Mar 2025 10:06PM UTC coverage: 58.6% (-10.2%) from 68.789%
13980275562

Pull #9623

github

web-flow
Merge b9b960345 into 09b674508
Pull Request #9623: Size msg test msg

0 of 1518 new or added lines in 42 files covered. (0.0%)

26603 existing lines in 443 files now uncovered.

96807 of 165200 relevant lines covered (58.6%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/channeldb/migration_01_to_11/migrations.go
1
package migration_01_to_11
2

3
import (
4
        "bytes"
5
        "crypto/sha256"
6
        "encoding/binary"
7
        "fmt"
8

9
        "github.com/btcsuite/btcd/btcec/v2"
10
        lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21"
11
        "github.com/lightningnetwork/lnd/kvdb"
12
)
13

14
// MigrateNodeAndEdgeUpdateIndex is a migration function that will update the
15
// database from version 0 to version 1. In version 1, we add two new indexes
16
// (one for nodes and one for edges) to keep track of the last time a node or
17
// edge was updated on the network. These new indexes allow us to implement the
18
// new graph sync protocol added.
19
func MigrateNodeAndEdgeUpdateIndex(tx kvdb.RwTx) error {
×
20
        // First, we'll populating the node portion of the new index. Before we
×
21
        // can add new values to the index, we'll first create the new bucket
×
22
        // where these items will be housed.
×
23
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
×
24
        if err != nil {
×
25
                return fmt.Errorf("unable to create node bucket: %w", err)
×
26
        }
×
27
        nodeUpdateIndex, err := nodes.CreateBucketIfNotExists(
×
28
                nodeUpdateIndexBucket,
×
29
        )
×
30
        if err != nil {
×
31
                return fmt.Errorf("unable to create node update index: %w", err)
×
32
        }
×
33

34
        log.Infof("Populating new node update index bucket")
×
35

×
36
        // Now that we know the bucket has been created, we'll iterate over the
×
37
        // entire node bucket so we can add the (updateTime || nodePub) key
×
38
        // into the node update index.
×
39
        err = nodes.ForEach(func(nodePub, nodeInfo []byte) error {
×
40
                if len(nodePub) != 33 {
×
41
                        return nil
×
42
                }
×
43

44
                log.Tracef("Adding %x to node update index", nodePub)
×
45

×
46
                // The first 8 bytes of a node's serialize data is the update
×
47
                // time, so we can extract that without decoding the entire
×
48
                // structure.
×
49
                updateTime := nodeInfo[:8]
×
50

×
51
                // Now that we have the update time, we can construct the key
×
52
                // to insert into the index.
×
53
                var indexKey [8 + 33]byte
×
54
                copy(indexKey[:8], updateTime)
×
55
                copy(indexKey[8:], nodePub)
×
56

×
57
                return nodeUpdateIndex.Put(indexKey[:], nil)
×
58
        })
59
        if err != nil {
×
60
                return fmt.Errorf("unable to update node indexes: %w", err)
×
61
        }
×
62

63
        log.Infof("Populating new edge update index bucket")
×
64

×
65
        // With the set of nodes updated, we'll now update all edges to have a
×
66
        // corresponding entry in the edge update index.
×
67
        edges, err := tx.CreateTopLevelBucket(edgeBucket)
×
68
        if err != nil {
×
69
                return fmt.Errorf("unable to create edge bucket: %w", err)
×
70
        }
×
71
        edgeUpdateIndex, err := edges.CreateBucketIfNotExists(
×
72
                edgeUpdateIndexBucket,
×
73
        )
×
74
        if err != nil {
×
75
                return fmt.Errorf("unable to create edge update index: %w", err)
×
76
        }
×
77

78
        // We'll now run through each edge policy in the database, and update
79
        // the index to ensure each edge has the proper record.
80
        err = edges.ForEach(func(edgeKey, edgePolicyBytes []byte) error {
×
81
                if len(edgeKey) != 41 {
×
82
                        return nil
×
83
                }
×
84

85
                // Now that we know this is the proper record, we'll grab the
86
                // channel ID (last 8 bytes of the key), and then decode the
87
                // edge policy so we can access the update time.
88
                chanID := edgeKey[33:]
×
89
                edgePolicyReader := bytes.NewReader(edgePolicyBytes)
×
90

×
91
                edgePolicy, err := deserializeChanEdgePolicy(
×
92
                        edgePolicyReader, nodes,
×
93
                )
×
94
                if err != nil {
×
95
                        return err
×
96
                }
×
97

98
                log.Tracef("Adding chan_id=%v to edge update index",
×
99
                        edgePolicy.ChannelID)
×
100

×
101
                // We'll now construct the index key using the channel ID, and
×
102
                // the last time it was updated: (updateTime || chanID).
×
103
                var indexKey [8 + 8]byte
×
104
                byteOrder.PutUint64(
×
105
                        indexKey[:], uint64(edgePolicy.LastUpdate.Unix()),
×
106
                )
×
107
                copy(indexKey[8:], chanID)
×
108

×
109
                return edgeUpdateIndex.Put(indexKey[:], nil)
×
110
        })
111
        if err != nil {
×
112
                return fmt.Errorf("unable to update edge indexes: %w", err)
×
113
        }
×
114

115
        log.Infof("Migration to node and edge update indexes complete!")
×
116

×
117
        return nil
×
118
}
119

120
// MigrateInvoiceTimeSeries is a database migration that assigns all existing
121
// invoices an index in the add and/or the settle index. Additionally, all
122
// existing invoices will have their bytes padded out in order to encode the
123
// add+settle index as well as the amount paid.
124
func MigrateInvoiceTimeSeries(tx kvdb.RwTx) error {
×
125
        invoices, err := tx.CreateTopLevelBucket(invoiceBucket)
×
126
        if err != nil {
×
127
                return err
×
128
        }
×
129

130
        addIndex, err := invoices.CreateBucketIfNotExists(
×
131
                addIndexBucket,
×
132
        )
×
133
        if err != nil {
×
134
                return err
×
135
        }
×
136
        settleIndex, err := invoices.CreateBucketIfNotExists(
×
137
                settleIndexBucket,
×
138
        )
×
139
        if err != nil {
×
140
                return err
×
141
        }
×
142

143
        log.Infof("Migrating invoice database to new time series format")
×
144

×
145
        // Now that we have all the buckets we need, we'll run through each
×
146
        // invoice in the database, and update it to reflect the new format
×
147
        // expected post migration.
×
148
        // NOTE: we store the converted invoices and put them back into the
×
149
        // database after the loop, since modifying the bucket within the
×
150
        // ForEach loop is not safe.
×
151
        var invoicesKeys [][]byte
×
152
        var invoicesValues [][]byte
×
153
        err = invoices.ForEach(func(invoiceNum, invoiceBytes []byte) error {
×
154
                // If this is a sub bucket, then we'll skip it.
×
155
                if invoiceBytes == nil {
×
156
                        return nil
×
157
                }
×
158

159
                // First, we'll make a copy of the encoded invoice bytes.
160
                invoiceBytesCopy := make([]byte, len(invoiceBytes))
×
161
                copy(invoiceBytesCopy, invoiceBytes)
×
162

×
163
                // With the bytes copied over, we'll append 24 additional
×
164
                // bytes. We do this so we can decode the invoice under the new
×
165
                // serialization format.
×
166
                padding := bytes.Repeat([]byte{0}, 24)
×
167
                invoiceBytesCopy = append(invoiceBytesCopy, padding...)
×
168

×
169
                invoiceReader := bytes.NewReader(invoiceBytesCopy)
×
170
                invoice, err := deserializeInvoiceLegacy(invoiceReader)
×
171
                if err != nil {
×
172
                        return fmt.Errorf("unable to decode invoice: %w", err)
×
173
                }
×
174

175
                // Now that we have the fully decoded invoice, we can update
176
                // the various indexes that we're added, and finally the
177
                // invoice itself before re-inserting it.
178

179
                // First, we'll get the new sequence in the addIndex in order
180
                // to create the proper mapping.
181
                nextAddSeqNo, err := addIndex.NextSequence()
×
182
                if err != nil {
×
183
                        return err
×
184
                }
×
185
                var seqNoBytes [8]byte
×
186
                byteOrder.PutUint64(seqNoBytes[:], nextAddSeqNo)
×
187
                err = addIndex.Put(seqNoBytes[:], invoiceNum[:])
×
188
                if err != nil {
×
189
                        return err
×
190
                }
×
191

192
                log.Tracef("Adding invoice (preimage=%x, add_index=%v) to add "+
×
193
                        "time series", invoice.Terms.PaymentPreimage[:],
×
194
                        nextAddSeqNo)
×
195

×
196
                // Next, we'll check if the invoice has been settled or not. If
×
197
                // so, then we'll also add it to the settle index.
×
198
                var nextSettleSeqNo uint64
×
199
                if invoice.Terms.State == ContractSettled {
×
200
                        nextSettleSeqNo, err = settleIndex.NextSequence()
×
201
                        if err != nil {
×
202
                                return err
×
203
                        }
×
204

205
                        var seqNoBytes [8]byte
×
206
                        byteOrder.PutUint64(seqNoBytes[:], nextSettleSeqNo)
×
207
                        err := settleIndex.Put(seqNoBytes[:], invoiceNum)
×
208
                        if err != nil {
×
209
                                return err
×
210
                        }
×
211

212
                        invoice.AmtPaid = invoice.Terms.Value
×
213

×
214
                        log.Tracef("Adding invoice (preimage=%x, "+
×
215
                                "settle_index=%v) to add time series",
×
216
                                invoice.Terms.PaymentPreimage[:],
×
217
                                nextSettleSeqNo)
×
218
                }
219

220
                // Finally, we'll update the invoice itself with the new
221
                // indexing information as well as the amount paid if it has
222
                // been settled or not.
223
                invoice.AddIndex = nextAddSeqNo
×
224
                invoice.SettleIndex = nextSettleSeqNo
×
225

×
226
                // We've fully migrated an invoice, so we'll now update the
×
227
                // invoice in-place.
×
228
                var b bytes.Buffer
×
229
                if err := serializeInvoiceLegacy(&b, &invoice); err != nil {
×
230
                        return err
×
231
                }
×
232

233
                // Save the key and value pending update for after the ForEach
234
                // is done.
235
                invoicesKeys = append(invoicesKeys, invoiceNum)
×
236
                invoicesValues = append(invoicesValues, b.Bytes())
×
237
                return nil
×
238
        })
239
        if err != nil {
×
240
                return err
×
241
        }
×
242

243
        // Now put the converted invoices into the DB.
244
        for i := range invoicesKeys {
×
245
                key := invoicesKeys[i]
×
246
                value := invoicesValues[i]
×
247
                if err := invoices.Put(key, value); err != nil {
×
248
                        return err
×
249
                }
×
250
        }
251

252
        log.Infof("Migration to invoice time series index complete!")
×
253

×
254
        return nil
×
255
}
256

257
// MigrateInvoiceTimeSeriesOutgoingPayments is a follow up to the
258
// migrateInvoiceTimeSeries migration. As at the time of writing, the
259
// OutgoingPayment struct embeddeds an instance of the Invoice struct. As a
260
// result, we also need to migrate the internal invoice to the new format.
261
func MigrateInvoiceTimeSeriesOutgoingPayments(tx kvdb.RwTx) error {
×
262
        payBucket := tx.ReadWriteBucket(paymentBucket)
×
263
        if payBucket == nil {
×
264
                return nil
×
265
        }
×
266

267
        log.Infof("Migrating invoice database to new outgoing payment format")
×
268

×
269
        // We store the keys and values we want to modify since it is not safe
×
270
        // to modify them directly within the ForEach loop.
×
271
        var paymentKeys [][]byte
×
272
        var paymentValues [][]byte
×
273
        err := payBucket.ForEach(func(payID, paymentBytes []byte) error {
×
274
                log.Tracef("Migrating payment %x", payID[:])
×
275

×
276
                // The internal invoices for each payment only contain a
×
277
                // populated contract term, and creation date, as a result,
×
278
                // most of the bytes will be "empty".
×
279

×
280
                // We'll calculate the end of the invoice index assuming a
×
281
                // "minimal" index that's embedded within the greater
×
282
                // OutgoingPayment. The breakdown is:
×
283
                //  3 bytes empty var bytes, 16 bytes creation date, 16 bytes
×
284
                //  settled date, 32 bytes payment pre-image, 8 bytes value, 1
×
285
                //  byte settled.
×
286
                endOfInvoiceIndex := 1 + 1 + 1 + 16 + 16 + 32 + 8 + 1
×
287

×
288
                // We'll now extract the prefix of the pure invoice embedded
×
289
                // within.
×
290
                invoiceBytes := paymentBytes[:endOfInvoiceIndex]
×
291

×
292
                // With the prefix extracted, we'll copy over the invoice, and
×
293
                // also add padding for the new 24 bytes of fields, and finally
×
294
                // append the remainder of the outgoing payment.
×
295
                paymentCopy := make([]byte, len(invoiceBytes))
×
296
                copy(paymentCopy[:], invoiceBytes)
×
297

×
298
                padding := bytes.Repeat([]byte{0}, 24)
×
299
                paymentCopy = append(paymentCopy, padding...)
×
300
                paymentCopy = append(
×
301
                        paymentCopy, paymentBytes[endOfInvoiceIndex:]...,
×
302
                )
×
303

×
304
                // At this point, we now have the new format of the outgoing
×
305
                // payments, we'll attempt to deserialize it to ensure the
×
306
                // bytes are properly formatted.
×
307
                paymentReader := bytes.NewReader(paymentCopy)
×
308
                _, err := deserializeOutgoingPayment(paymentReader)
×
309
                if err != nil {
×
310
                        return fmt.Errorf("unable to deserialize payment: %w",
×
311
                                err)
×
312
                }
×
313

314
                // Now that we know the modifications was successful, we'll
315
                // store it to our slice of keys and values, and write it back
316
                // to disk in the new format after the ForEach loop is over.
317
                paymentKeys = append(paymentKeys, payID)
×
318
                paymentValues = append(paymentValues, paymentCopy)
×
319
                return nil
×
320
        })
321
        if err != nil {
×
322
                return err
×
323
        }
×
324

325
        // Finally store the updated payments to the bucket.
326
        for i := range paymentKeys {
×
327
                key := paymentKeys[i]
×
328
                value := paymentValues[i]
×
329
                if err := payBucket.Put(key, value); err != nil {
×
330
                        return err
×
331
                }
×
332
        }
333

334
        log.Infof("Migration to outgoing payment invoices complete!")
×
335

×
336
        return nil
×
337
}
338

339
// MigrateEdgePolicies is a migration function that will update the edges
340
// bucket. It ensure that edges with unknown policies will also have an entry
341
// in the bucket. After the migration, there will be two edge entries for
342
// every channel, regardless of whether the policies are known.
343
func MigrateEdgePolicies(tx kvdb.RwTx) error {
×
344
        nodes := tx.ReadWriteBucket(nodeBucket)
×
345
        if nodes == nil {
×
346
                return nil
×
347
        }
×
348

349
        edges := tx.ReadWriteBucket(edgeBucket)
×
350
        if edges == nil {
×
351
                return nil
×
352
        }
×
353

354
        edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
×
355
        if edgeIndex == nil {
×
356
                return nil
×
357
        }
×
358

359
        // checkKey gets the policy from the database with a low-level call
360
        // so that it is still possible to distinguish between unknown and
361
        // not present.
362
        checkKey := func(channelId uint64, keyBytes []byte) error {
×
363
                var channelID [8]byte
×
364
                byteOrder.PutUint64(channelID[:], channelId)
×
365

×
366
                _, err := fetchChanEdgePolicy(edges,
×
367
                        channelID[:], keyBytes, nodes)
×
368

×
369
                if err == ErrEdgeNotFound {
×
370
                        log.Tracef("Adding unknown edge policy present for node %x, channel %v",
×
371
                                keyBytes, channelId)
×
372

×
373
                        err := putChanEdgePolicyUnknown(edges, channelId, keyBytes)
×
374
                        if err != nil {
×
375
                                return err
×
376
                        }
×
377

378
                        return nil
×
379
                }
380

381
                return err
×
382
        }
383

384
        // Iterate over all channels and check both edge policies.
385
        err := edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error {
×
386
                infoReader := bytes.NewReader(edgeInfoBytes)
×
387
                edgeInfo, err := deserializeChanEdgeInfo(infoReader)
×
388
                if err != nil {
×
389
                        return err
×
390
                }
×
391

392
                for _, key := range [][]byte{edgeInfo.NodeKey1Bytes[:],
×
393
                        edgeInfo.NodeKey2Bytes[:]} {
×
394

×
395
                        if err := checkKey(edgeInfo.ChannelID, key); err != nil {
×
396
                                return err
×
397
                        }
×
398
                }
399

400
                return nil
×
401
        })
402

403
        if err != nil {
×
404
                return fmt.Errorf("unable to update edge policies: %w", err)
×
405
        }
×
406

407
        log.Infof("Migration of edge policies complete!")
×
408

×
409
        return nil
×
410
}
411

412
// PaymentStatusesMigration is a database migration intended for adding payment
413
// statuses for each existing payment entity in bucket to be able control
414
// transitions of statuses and prevent cases such as double payment
UNCOV
415
func PaymentStatusesMigration(tx kvdb.RwTx) error {
×
UNCOV
416
        // Get the bucket dedicated to storing statuses of payments,
×
UNCOV
417
        // where a key is payment hash, value is payment status.
×
UNCOV
418
        paymentStatuses, err := tx.CreateTopLevelBucket(paymentStatusBucket)
×
UNCOV
419
        if err != nil {
×
420
                return err
×
421
        }
×
422

UNCOV
423
        log.Infof("Migrating database to support payment statuses")
×
UNCOV
424

×
UNCOV
425
        circuitAddKey := []byte("circuit-adds")
×
UNCOV
426
        circuits := tx.ReadWriteBucket(circuitAddKey)
×
UNCOV
427
        if circuits != nil {
×
UNCOV
428
                log.Infof("Marking all known circuits with status InFlight")
×
UNCOV
429

×
UNCOV
430
                err = circuits.ForEach(func(k, v []byte) error {
×
UNCOV
431
                        // Parse the first 8 bytes as the short chan ID for the
×
UNCOV
432
                        // circuit. We'll skip all short chan IDs are not
×
UNCOV
433
                        // locally initiated, which includes all non-zero short
×
UNCOV
434
                        // chan ids.
×
UNCOV
435
                        chanID := binary.BigEndian.Uint64(k[:8])
×
UNCOV
436
                        if chanID != 0 {
×
UNCOV
437
                                return nil
×
UNCOV
438
                        }
×
439

440
                        // The payment hash is the third item in the serialized
441
                        // payment circuit. The first two items are an AddRef
442
                        // (10 bytes) and the incoming circuit key (16 bytes).
UNCOV
443
                        const payHashOffset = 10 + 16
×
UNCOV
444

×
UNCOV
445
                        paymentHash := v[payHashOffset : payHashOffset+32]
×
UNCOV
446

×
UNCOV
447
                        return paymentStatuses.Put(
×
UNCOV
448
                                paymentHash[:], StatusInFlight.Bytes(),
×
UNCOV
449
                        )
×
450
                })
UNCOV
451
                if err != nil {
×
452
                        return err
×
453
                }
×
454
        }
455

UNCOV
456
        log.Infof("Marking all existing payments with status Completed")
×
UNCOV
457

×
UNCOV
458
        // Get the bucket dedicated to storing payments
×
UNCOV
459
        bucket := tx.ReadWriteBucket(paymentBucket)
×
UNCOV
460
        if bucket == nil {
×
461
                return nil
×
462
        }
×
463

464
        // For each payment in the bucket, deserialize the payment and mark it
465
        // as completed.
UNCOV
466
        err = bucket.ForEach(func(k, v []byte) error {
×
UNCOV
467
                // Ignores if it is sub-bucket.
×
UNCOV
468
                if v == nil {
×
469
                        return nil
×
470
                }
×
471

UNCOV
472
                r := bytes.NewReader(v)
×
UNCOV
473
                payment, err := deserializeOutgoingPayment(r)
×
UNCOV
474
                if err != nil {
×
475
                        return err
×
476
                }
×
477

478
                // Calculate payment hash for current payment.
UNCOV
479
                paymentHash := sha256.Sum256(payment.PaymentPreimage[:])
×
UNCOV
480

×
UNCOV
481
                // Update status for current payment to completed. If it fails,
×
UNCOV
482
                // the migration is aborted and the payment bucket is returned
×
UNCOV
483
                // to its previous state.
×
UNCOV
484
                return paymentStatuses.Put(paymentHash[:], StatusSucceeded.Bytes())
×
485
        })
UNCOV
486
        if err != nil {
×
487
                return err
×
488
        }
×
489

UNCOV
490
        log.Infof("Migration of payment statuses complete!")
×
UNCOV
491

×
UNCOV
492
        return nil
×
493
}
494

495
// MigratePruneEdgeUpdateIndex is a database migration that attempts to resolve
496
// some lingering bugs with regards to edge policies and their update index.
497
// Stale entries within the edge update index were not being properly pruned due
498
// to a miscalculation on the offset of an edge's policy last update. This
499
// migration also fixes the case where the public keys within edge policies were
500
// being serialized with an extra byte, causing an even greater error when
501
// attempting to perform the offset calculation described earlier.
502
func MigratePruneEdgeUpdateIndex(tx kvdb.RwTx) error {
×
503
        // To begin the migration, we'll retrieve the update index bucket. If it
×
504
        // does not exist, we have nothing left to do so we can simply exit.
×
505
        edges := tx.ReadWriteBucket(edgeBucket)
×
506
        if edges == nil {
×
507
                return nil
×
508
        }
×
509
        edgeUpdateIndex := edges.NestedReadWriteBucket(edgeUpdateIndexBucket)
×
510
        if edgeUpdateIndex == nil {
×
511
                return nil
×
512
        }
×
513

514
        // Retrieve some buckets that will be needed later on. These should
515
        // already exist given the assumption that the buckets above do as
516
        // well.
517
        edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
×
518
        if err != nil {
×
519
                return fmt.Errorf("error creating edge index bucket: %w", err)
×
520
        }
×
521
        if edgeIndex == nil {
×
522
                return fmt.Errorf("unable to create/fetch edge index " +
×
523
                        "bucket")
×
524
        }
×
525
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
×
526
        if err != nil {
×
527
                return fmt.Errorf("unable to make node bucket")
×
528
        }
×
529

530
        log.Info("Migrating database to properly prune edge update index")
×
531

×
532
        // We'll need to properly prune all the outdated entries within the edge
×
533
        // update index. To do so, we'll gather all of the existing policies
×
534
        // within the graph to re-populate them later on.
×
535
        var edgeKeys [][]byte
×
536
        err = edges.ForEach(func(edgeKey, edgePolicyBytes []byte) error {
×
537
                // All valid entries are indexed by a public key (33 bytes)
×
538
                // followed by a channel ID (8 bytes), so we'll skip any entries
×
539
                // with keys that do not match this.
×
540
                if len(edgeKey) != 33+8 {
×
541
                        return nil
×
542
                }
×
543

544
                edgeKeys = append(edgeKeys, edgeKey)
×
545

×
546
                return nil
×
547
        })
548
        if err != nil {
×
549
                return fmt.Errorf("unable to gather existing edge policies: %w",
×
550
                        err)
×
551
        }
×
552

553
        log.Info("Constructing set of edge update entries to purge.")
×
554

×
555
        // Build the set of keys that we will remove from the edge update index.
×
556
        // This will include all keys contained within the bucket.
×
557
        var updateKeysToRemove [][]byte
×
558
        err = edgeUpdateIndex.ForEach(func(updKey, _ []byte) error {
×
559
                updateKeysToRemove = append(updateKeysToRemove, updKey)
×
560
                return nil
×
561
        })
×
562
        if err != nil {
×
563
                return fmt.Errorf("unable to gather existing edge updates: %w",
×
564
                        err)
×
565
        }
×
566

567
        log.Infof("Removing %d entries from edge update index.",
×
568
                len(updateKeysToRemove))
×
569

×
570
        // With the set of keys contained in the edge update index constructed,
×
571
        // we'll proceed in purging all of them from the index.
×
572
        for _, updKey := range updateKeysToRemove {
×
573
                if err := edgeUpdateIndex.Delete(updKey); err != nil {
×
574
                        return err
×
575
                }
×
576
        }
577

578
        log.Infof("Repopulating edge update index with %d valid entries.",
×
579
                len(edgeKeys))
×
580

×
581
        // For each edge key, we'll retrieve the policy, deserialize it, and
×
582
        // re-add it to the different buckets. By doing so, we'll ensure that
×
583
        // all existing edge policies are serialized correctly within their
×
584
        // respective buckets and that the correct entries are populated within
×
585
        // the edge update index.
×
586
        for _, edgeKey := range edgeKeys {
×
587
                edgePolicyBytes := edges.Get(edgeKey)
×
588

×
589
                // Skip any entries with unknown policies as there will not be
×
590
                // any entries for them in the edge update index.
×
591
                if bytes.Equal(edgePolicyBytes[:], unknownPolicy) {
×
592
                        continue
×
593
                }
594

595
                edgePolicy, err := deserializeChanEdgePolicy(
×
596
                        bytes.NewReader(edgePolicyBytes), nodes,
×
597
                )
×
598
                if err != nil {
×
599
                        return err
×
600
                }
×
601

602
                _, err = updateEdgePolicy(tx, edgePolicy)
×
603
                if err != nil {
×
604
                        return err
×
605
                }
×
606
        }
607

608
        log.Info("Migration to properly prune edge update index complete!")
×
609

×
610
        return nil
×
611
}
612

613
// MigrateOptionalChannelCloseSummaryFields migrates the serialized format of
614
// ChannelCloseSummary to a format where optional fields' presence is indicated
615
// with boolean markers.
UNCOV
616
func MigrateOptionalChannelCloseSummaryFields(tx kvdb.RwTx) error {
×
UNCOV
617
        closedChanBucket := tx.ReadWriteBucket(closedChannelBucket)
×
UNCOV
618
        if closedChanBucket == nil {
×
619
                return nil
×
620
        }
×
621

UNCOV
622
        log.Info("Migrating to new closed channel format...")
×
UNCOV
623

×
UNCOV
624
        // We store the converted keys and values and put them back into the
×
UNCOV
625
        // database after the loop, since modifying the bucket within the
×
UNCOV
626
        // ForEach loop is not safe.
×
UNCOV
627
        var closedChansKeys [][]byte
×
UNCOV
628
        var closedChansValues [][]byte
×
UNCOV
629
        err := closedChanBucket.ForEach(func(chanID, summary []byte) error {
×
UNCOV
630
                r := bytes.NewReader(summary)
×
UNCOV
631

×
UNCOV
632
                // Read the old (v6) format from the database.
×
UNCOV
633
                c, err := deserializeCloseChannelSummaryV6(r)
×
UNCOV
634
                if err != nil {
×
635
                        return err
×
636
                }
×
637

638
                // Serialize using the new format, and put back into the
639
                // bucket.
UNCOV
640
                var b bytes.Buffer
×
UNCOV
641
                if err := serializeChannelCloseSummary(&b, c); err != nil {
×
642
                        return err
×
643
                }
×
644

645
                // Now that we know the modifications was successful, we'll
646
                // Store the key and value to our slices, and write it back to
647
                // disk in the new format after the ForEach loop is over.
UNCOV
648
                closedChansKeys = append(closedChansKeys, chanID)
×
UNCOV
649
                closedChansValues = append(closedChansValues, b.Bytes())
×
UNCOV
650
                return nil
×
651
        })
UNCOV
652
        if err != nil {
×
653
                return fmt.Errorf("unable to update closed channels: %w", err)
×
654
        }
×
655

656
        // Now put the new format back into the DB.
UNCOV
657
        for i := range closedChansKeys {
×
UNCOV
658
                key := closedChansKeys[i]
×
UNCOV
659
                value := closedChansValues[i]
×
UNCOV
660
                if err := closedChanBucket.Put(key, value); err != nil {
×
661
                        return err
×
662
                }
×
663
        }
664

UNCOV
665
        log.Info("Migration to new closed channel format complete!")
×
UNCOV
666

×
UNCOV
667
        return nil
×
668
}
669

670
// messageStoreBucket is the key of the top-level bucket that
// MigrateGossipMessageStoreKeys reads the stored gossip messages from.
var messageStoreBucket = []byte("message-store")
671

672
// MigrateGossipMessageStoreKeys migrates the key format for gossip messages
673
// found in the message store to a new one that takes into consideration the type of
674
// the message being stored.
UNCOV
675
func MigrateGossipMessageStoreKeys(tx kvdb.RwTx) error {
×
UNCOV
676
        // We'll start by retrieving the bucket in which these messages are
×
UNCOV
677
        // stored within. If there isn't one, there's nothing left for us to do
×
UNCOV
678
        // so we can avoid the migration.
×
UNCOV
679
        messageStore := tx.ReadWriteBucket(messageStoreBucket)
×
UNCOV
680
        if messageStore == nil {
×
681
                return nil
×
682
        }
×
683

UNCOV
684
        log.Info("Migrating to the gossip message store new key format")
×
UNCOV
685

×
UNCOV
686
        // Otherwise we'll proceed with the migration. We'll start by coalescing
×
UNCOV
687
        // all the current messages within the store, which are indexed by the
×
UNCOV
688
        // public key of the peer which they should be sent to, followed by the
×
UNCOV
689
        // short channel ID of the channel for which the message belongs to. We
×
UNCOV
690
        // should only expect to find channel announcement signatures as that
×
UNCOV
691
        // was the only support message type previously.
×
UNCOV
692
        msgs := make(map[[33 + 8]byte]*lnwire.AnnounceSignatures)
×
UNCOV
693
        err := messageStore.ForEach(func(k, v []byte) error {
×
UNCOV
694
                var msgKey [33 + 8]byte
×
UNCOV
695
                copy(msgKey[:], k)
×
UNCOV
696

×
UNCOV
697
                msg := &lnwire.AnnounceSignatures{}
×
UNCOV
698
                if err := msg.Decode(bytes.NewReader(v), 0); err != nil {
×
699
                        return err
×
700
                }
×
701

UNCOV
702
                msgs[msgKey] = msg
×
UNCOV
703

×
UNCOV
704
                return nil
×
705

706
        })
UNCOV
707
        if err != nil {
×
708
                return err
×
709
        }
×
710

711
        // Then, we'll go over all of our messages, remove their previous entry,
712
        // and add another with the new key format. Once we've done this for
713
        // every message, we can consider the migration complete.
UNCOV
714
        for oldMsgKey, msg := range msgs {
×
UNCOV
715
                if err := messageStore.Delete(oldMsgKey[:]); err != nil {
×
716
                        return err
×
717
                }
×
718

719
                // Construct the new key for which we'll find this message with
720
                // in the store. It'll be the same as the old, but we'll also
721
                // include the message type.
UNCOV
722
                var msgType [2]byte
×
UNCOV
723
                binary.BigEndian.PutUint16(msgType[:], uint16(msg.MsgType()))
×
UNCOV
724
                newMsgKey := append(oldMsgKey[:], msgType[:]...)
×
UNCOV
725

×
UNCOV
726
                // Serialize the message with its wire encoding.
×
UNCOV
727
                var b bytes.Buffer
×
UNCOV
728
                if _, err := lnwire.WriteMessage(&b, msg, 0); err != nil {
×
729
                        return err
×
730
                }
×
731

UNCOV
732
                if err := messageStore.Put(newMsgKey, b.Bytes()); err != nil {
×
733
                        return err
×
734
                }
×
735
        }
736

UNCOV
737
        log.Info("Migration to the gossip message store new key format complete!")
×
UNCOV
738

×
UNCOV
739
        return nil
×
740
}
741

742
// MigrateOutgoingPayments moves the OutgoingPayments into a new bucket format
743
// where they all reside in a top-level bucket indexed by the payment hash. In
744
// this sub-bucket we store information relevant to this payment, such as the
745
// payment status.
746
//
747
// Since the router cannot handle resumed payments that have the status
748
// InFlight (we have no PaymentAttemptInfo available for pre-migration
749
// payments) we delete those statuses, so only Completed payments remain in the
750
// new bucket structure.
UNCOV
751
func MigrateOutgoingPayments(tx kvdb.RwTx) error {
×
UNCOV
752
        log.Infof("Migrating outgoing payments to new bucket structure")
×
UNCOV
753

×
UNCOV
754
        oldPayments := tx.ReadWriteBucket(paymentBucket)
×
UNCOV
755

×
UNCOV
756
        // Return early if there are no payments to migrate.
×
UNCOV
757
        if oldPayments == nil {
×
758
                log.Infof("No outgoing payments found, nothing to migrate.")
×
759
                return nil
×
760
        }
×
761

UNCOV
762
        newPayments, err := tx.CreateTopLevelBucket(paymentsRootBucket)
×
UNCOV
763
        if err != nil {
×
764
                return err
×
765
        }
×
766

767
        // Helper method to get the source pubkey. We define it such that we
768
        // only attempt to fetch it if needed.
UNCOV
769
        sourcePub := func() ([33]byte, error) {
×
UNCOV
770
                var pub [33]byte
×
UNCOV
771
                nodes := tx.ReadWriteBucket(nodeBucket)
×
UNCOV
772
                if nodes == nil {
×
773
                        return pub, ErrGraphNotFound
×
774
                }
×
775

UNCOV
776
                selfPub := nodes.Get(sourceKey)
×
UNCOV
777
                if selfPub == nil {
×
778
                        return pub, ErrSourceNodeNotSet
×
779
                }
×
UNCOV
780
                copy(pub[:], selfPub[:])
×
UNCOV
781
                return pub, nil
×
782
        }
783

UNCOV
784
        err = oldPayments.ForEach(func(k, v []byte) error {
×
UNCOV
785
                // Ignores if it is sub-bucket.
×
UNCOV
786
                if v == nil {
×
787
                        return nil
×
788
                }
×
789

790
                // Read the old payment format.
UNCOV
791
                r := bytes.NewReader(v)
×
UNCOV
792
                payment, err := deserializeOutgoingPayment(r)
×
UNCOV
793
                if err != nil {
×
794
                        return err
×
795
                }
×
796

797
                // Calculate payment hash from the payment preimage.
UNCOV
798
                paymentHash := sha256.Sum256(payment.PaymentPreimage[:])
×
UNCOV
799

×
UNCOV
800
                // Now create and add a PaymentCreationInfo to the bucket.
×
UNCOV
801
                c := &PaymentCreationInfo{
×
UNCOV
802
                        PaymentHash:    paymentHash,
×
UNCOV
803
                        Value:          payment.Terms.Value,
×
UNCOV
804
                        CreationDate:   payment.CreationDate,
×
UNCOV
805
                        PaymentRequest: payment.PaymentRequest,
×
UNCOV
806
                }
×
UNCOV
807

×
UNCOV
808
                var infoBuf bytes.Buffer
×
UNCOV
809
                if err := serializePaymentCreationInfo(&infoBuf, c); err != nil {
×
810
                        return err
×
811
                }
×
812

UNCOV
813
                sourcePubKey, err := sourcePub()
×
UNCOV
814
                if err != nil {
×
815
                        return err
×
816
                }
×
817

818
                // Do the same for the PaymentAttemptInfo.
UNCOV
819
                totalAmt := payment.Terms.Value + payment.Fee
×
UNCOV
820
                rt := Route{
×
UNCOV
821
                        TotalTimeLock: payment.TimeLockLength,
×
UNCOV
822
                        TotalAmount:   totalAmt,
×
UNCOV
823
                        SourcePubKey:  sourcePubKey,
×
UNCOV
824
                        Hops:          []*Hop{},
×
UNCOV
825
                }
×
UNCOV
826
                for _, hop := range payment.Path {
×
UNCOV
827
                        rt.Hops = append(rt.Hops, &Hop{
×
UNCOV
828
                                PubKeyBytes:  hop,
×
UNCOV
829
                                AmtToForward: totalAmt,
×
UNCOV
830
                        })
×
UNCOV
831
                }
×
832

833
                // Since the old format didn't store the fee for individual
834
                // hops, we let the last hop eat the whole fee for the total to
835
                // add up.
UNCOV
836
                if len(rt.Hops) > 0 {
×
UNCOV
837
                        rt.Hops[len(rt.Hops)-1].AmtToForward = payment.Terms.Value
×
UNCOV
838
                }
×
839

840
                // Since we don't have the session key for old payments, we
841
                // create a random one to be able to serialize the attempt
842
                // info.
UNCOV
843
                priv, _ := btcec.NewPrivateKey()
×
UNCOV
844
                s := &PaymentAttemptInfo{
×
UNCOV
845
                        PaymentID:  0,    // unknown.
×
UNCOV
846
                        SessionKey: priv, // unknown.
×
UNCOV
847
                        Route:      rt,
×
UNCOV
848
                }
×
UNCOV
849

×
UNCOV
850
                var attemptBuf bytes.Buffer
×
UNCOV
851
                if err := serializePaymentAttemptInfoMigration9(&attemptBuf, s); err != nil {
×
852
                        return err
×
853
                }
×
854

855
                // Reuse the existing payment sequence number.
UNCOV
856
                var seqNum [8]byte
×
UNCOV
857
                copy(seqNum[:], k)
×
UNCOV
858

×
UNCOV
859
                // Create a bucket indexed by the payment hash.
×
UNCOV
860
                bucket, err := newPayments.CreateBucket(paymentHash[:])
×
UNCOV
861

×
UNCOV
862
                // If the bucket already exists, it means that we are migrating
×
UNCOV
863
                // from a database containing duplicate payments to a payment
×
UNCOV
864
                // hash. To keep this information, we store such duplicate
×
UNCOV
865
                // payments in a sub-bucket.
×
UNCOV
866
                if err == kvdb.ErrBucketExists {
×
UNCOV
867
                        pHashBucket := newPayments.NestedReadWriteBucket(paymentHash[:])
×
UNCOV
868

×
UNCOV
869
                        // Create a bucket for duplicate payments within this
×
UNCOV
870
                        // payment hash's bucket.
×
UNCOV
871
                        dup, err := pHashBucket.CreateBucketIfNotExists(
×
UNCOV
872
                                paymentDuplicateBucket,
×
UNCOV
873
                        )
×
UNCOV
874
                        if err != nil {
×
875
                                return err
×
876
                        }
×
877

878
                        // Each duplicate will get its own sub-bucket within
879
                        // this bucket, so use their sequence number to index
880
                        // them by.
UNCOV
881
                        bucket, err = dup.CreateBucket(seqNum[:])
×
UNCOV
882
                        if err != nil {
×
883
                                return err
×
884
                        }
×
885

UNCOV
886
                } else if err != nil {
×
887
                        return err
×
888
                }
×
889

890
                // Store the payment's information to the bucket.
UNCOV
891
                err = bucket.Put(paymentSequenceKey, seqNum[:])
×
UNCOV
892
                if err != nil {
×
893
                        return err
×
894
                }
×
895

UNCOV
896
                err = bucket.Put(paymentCreationInfoKey, infoBuf.Bytes())
×
UNCOV
897
                if err != nil {
×
898
                        return err
×
899
                }
×
900

UNCOV
901
                err = bucket.Put(paymentAttemptInfoKey, attemptBuf.Bytes())
×
UNCOV
902
                if err != nil {
×
903
                        return err
×
904
                }
×
905

UNCOV
906
                err = bucket.Put(paymentSettleInfoKey, payment.PaymentPreimage[:])
×
UNCOV
907
                if err != nil {
×
908
                        return err
×
909
                }
×
910

UNCOV
911
                return nil
×
912
        })
UNCOV
913
        if err != nil {
×
914
                return err
×
915
        }
×
916

917
        // To continue producing unique sequence numbers, we set the sequence
918
        // of the new bucket to that of the old one.
UNCOV
919
        seq := oldPayments.Sequence()
×
UNCOV
920
        if err := newPayments.SetSequence(seq); err != nil {
×
921
                return err
×
922
        }
×
923

924
        // Now we delete the old buckets. Deleting the payment status buckets
925
        // deletes all payment statuses other than Complete.
UNCOV
926
        err = tx.DeleteTopLevelBucket(paymentStatusBucket)
×
UNCOV
927
        if err != nil && err != kvdb.ErrBucketNotFound {
×
928
                return err
×
929
        }
×
930

931
        // Finally delete the old payment bucket.
UNCOV
932
        err = tx.DeleteTopLevelBucket(paymentBucket)
×
UNCOV
933
        if err != nil && err != kvdb.ErrBucketNotFound {
×
934
                return err
×
935
        }
×
936

UNCOV
937
        log.Infof("Migration of outgoing payment bucket structure completed!")
×
UNCOV
938
        return nil
×
939
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc