• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16073727682

04 Jul 2025 12:24PM UTC coverage: 57.969% (+0.1%) from 57.822%
16073727682

Pull #10036

github

web-flow
Merge ab4a75e5b into b3eb9a3cb
Pull Request #10036: [graph mig 1]: graph/db: migrate graph nodes from kvdb to SQL

0 of 192 new or added lines in 1 file covered. (0.0%)

35 existing lines in 6 files now uncovered.

99021 of 170816 relevant lines covered (57.97%)

1.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/graph/db/sql_migration.go
1
package graphdb
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "reflect"
8
        "sort"
9
        "time"
10

11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/davecgh/go-spew/spew"
13
        "github.com/lightningnetwork/lnd/graph/db/models"
14
        "github.com/lightningnetwork/lnd/kvdb"
15
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
16
        "github.com/pmezard/go-difflib/difflib"
17
)
18

19
// ErrMigrationMismatch is returned when a migrated graph record does not match
20
// the original record.
21
var ErrMigrationMismatch = fmt.Errorf("migrated graph record does not match " +
22
        "original record")
23

24
// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
25
// backend.
26
//
27
// NOTE: this is currently not called from any code path. It is called via tests
28
// only for now and will be called from the main lnd binary once the
29
// migration is fully implemented and tested.
30
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
NEW
31
        sqlDB SQLQueries, _ chainhash.Hash) error {
×
NEW
32

×
NEW
33
        log.Infof("Starting migration of the graph store from KV to SQL")
×
NEW
34
        t0 := time.Now()
×
NEW
35

×
NEW
36
        // Check if there is a graph to migrate.
×
NEW
37
        graphExists, err := checkGraphExists(kvBackend)
×
NEW
38
        if err != nil {
×
NEW
39
                return fmt.Errorf("failed to check graph existence: %w", err)
×
NEW
40
        }
×
NEW
41
        if !graphExists {
×
NEW
42
                log.Infof("No graph found in KV store, skipping the migration")
×
NEW
43
                return nil
×
NEW
44
        }
×
45

46
        // 1) Migrate all the nodes.
NEW
47
        if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
×
NEW
48
                return fmt.Errorf("could not migrate nodes: %w", err)
×
NEW
49
        }
×
50

51
        // 2) Migrate the source node.
NEW
52
        if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
×
NEW
53
                return fmt.Errorf("could not migrate source node: %w", err)
×
NEW
54
        }
×
55

NEW
56
        log.Infof("Finished migration of the graph store from KV to SQL in %v",
×
NEW
57
                time.Since(t0))
×
NEW
58

×
NEW
59
        return nil
×
60
}
61

62
// checkGraphExists checks if the graph exists in the KV backend.
NEW
63
func checkGraphExists(db kvdb.Backend) (bool, error) {
×
NEW
64
        // Check if there is even a graph to migrate.
×
NEW
65
        err := db.View(func(tx kvdb.RTx) error {
×
NEW
66
                // Check for the existence of the node bucket which is a top
×
NEW
67
                // level bucket that would have been created on the initial
×
NEW
68
                // creation of the graph store.
×
NEW
69
                nodes := tx.ReadBucket(nodeBucket)
×
NEW
70
                if nodes == nil {
×
NEW
71
                        return ErrGraphNotFound
×
NEW
72
                }
×
73

NEW
74
                return nil
×
NEW
75
        }, func() {})
×
NEW
76
        if errors.Is(err, ErrGraphNotFound) {
×
NEW
77
                return false, nil
×
NEW
78
        } else if err != nil {
×
NEW
79
                return false, fmt.Errorf("failed to check graph existence: %w",
×
NEW
80
                        err)
×
NEW
81
        }
×
82

NEW
83
        return true, nil
×
84
}
85

86
// migrateNodes migrates all nodes from the KV backend to the SQL database.
87
// This includes doing a sanity check after each migration to ensure that the
88
// migrated node matches the original node.
89
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
NEW
90
        sqlDB SQLQueries) error {
×
NEW
91

×
NEW
92
        // Keep track of the number of nodes migrated and the number of
×
NEW
93
        // nodes skipped due to errors.
×
NEW
94
        var (
×
NEW
95
                count   uint64
×
NEW
96
                skipped uint64
×
NEW
97
        )
×
NEW
98

×
NEW
99
        // Loop through each node in the KV store and insert it into the SQL
×
NEW
100
        // database.
×
NEW
101
        err := forEachNode(kvBackend, func(_ kvdb.RTx,
×
NEW
102
                node *models.LightningNode) error {
×
NEW
103

×
NEW
104
                pub := node.PubKeyBytes
×
NEW
105

×
NEW
106
                // Sanity check to ensure that the node has valid extra opaque
×
NEW
107
                // data. If it does not, we'll skip it. We need to do this
×
NEW
108
                // because previously we would just persist any TLV bytes that
×
NEW
109
                // we received without validating them. Now, however, we
×
NEW
110
                // normalise the storage of extra opaque data, so we need to
×
NEW
111
                // ensure that the data is valid. We don't want to abort the
×
NEW
112
                // migration if we encounter a node with invalid extra opaque
×
NEW
113
                // data, so we'll just skip it and log a warning.
×
NEW
114
                _, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
×
NEW
115
                if errors.Is(err, ErrParsingExtraTLVBytes) {
×
NEW
116
                        skipped++
×
NEW
117
                        log.Warnf("Skipping migration of node %x with invalid "+
×
NEW
118
                                "extra opaque data: %v", pub,
×
NEW
119
                                node.ExtraOpaqueData)
×
NEW
120

×
NEW
121
                        return nil
×
NEW
122
                } else if err != nil {
×
NEW
123
                        return fmt.Errorf("unable to marshal extra "+
×
NEW
124
                                "opaque data for node %x: %w", pub, err)
×
NEW
125
                }
×
126

NEW
127
                count++
×
NEW
128

×
NEW
129
                // Write the node to the SQL database.
×
NEW
130
                id, err := upsertNode(ctx, sqlDB, node)
×
NEW
131
                if err != nil {
×
NEW
132
                        return fmt.Errorf("could not persist node(%x): %w", pub,
×
NEW
133
                                err)
×
NEW
134
                }
×
135

136
                // Fetch it from the SQL store and compare it against the
137
                // original node object to ensure the migration was successful.
NEW
138
                dbNode, err := sqlDB.GetNodeByPubKey(
×
NEW
139
                        ctx, sqlc.GetNodeByPubKeyParams{
×
NEW
140
                                PubKey:  node.PubKeyBytes[:],
×
NEW
141
                                Version: int16(ProtocolV1),
×
NEW
142
                        },
×
NEW
143
                )
×
NEW
144
                if err != nil {
×
NEW
145
                        return fmt.Errorf("could not get node by pubkey (%x)"+
×
NEW
146
                                "after migration: %w", pub, err)
×
NEW
147
                }
×
148

149
                // Sanity check: ensure the migrated node ID matches the one we
150
                // just inserted.
NEW
151
                if dbNode.ID != id {
×
NEW
152
                        return fmt.Errorf("node ID mismatch for node (%x) "+
×
NEW
153
                                "after migration: expected %d, got %d",
×
NEW
154
                                pub, id, dbNode.ID)
×
NEW
155
                }
×
156

NEW
157
                migratedNode, err := buildNode(ctx, sqlDB, &dbNode)
×
NEW
158
                if err != nil {
×
NEW
159
                        return fmt.Errorf("could not build migrated node "+
×
NEW
160
                                "from dbNode(db id: %d, node pub: %x): %w",
×
NEW
161
                                dbNode.ID, pub, err)
×
NEW
162
                }
×
163

164
                // Make sure that the node addresses are sorted before
165
                // comparing them to ensure that the order of addresses does
166
                // not affect the comparison.
NEW
167
                sort.Slice(node.Addresses, func(i, j int) bool {
×
NEW
168
                        return node.Addresses[i].String() <
×
NEW
169
                                node.Addresses[j].String()
×
NEW
170
                })
×
NEW
171
                sort.Slice(migratedNode.Addresses, func(i, j int) bool {
×
NEW
172
                        return migratedNode.Addresses[i].String() <
×
NEW
173
                                migratedNode.Addresses[j].String()
×
NEW
174
                })
×
175

NEW
176
                return compare(node, migratedNode, fmt.Sprintf("node %x", pub))
×
177
        })
NEW
178
        if err != nil {
×
NEW
179
                return fmt.Errorf("could not migrate nodes: %w", err)
×
NEW
180
        }
×
181

NEW
182
        log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
×
NEW
183
                "invalid TLV streams)", count, skipped)
×
NEW
184

×
NEW
185
        return nil
×
186
}
187

188
// migrateSourceNode migrates the source node from the KV backend to the
189
// SQL database.
190
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
NEW
191
        sqlDB SQLQueries) error {
×
NEW
192

×
NEW
193
        sourceNode, err := sourceNode(kvdb)
×
NEW
194
        if errors.Is(err, ErrSourceNodeNotSet) {
×
NEW
195
                // If the source node has not been set yet, we can skip this
×
NEW
196
                // migration step.
×
NEW
197
                return nil
×
NEW
198
        } else if err != nil {
×
NEW
199
                return fmt.Errorf("could not get source node from kv "+
×
NEW
200
                        "store: %w", err)
×
NEW
201
        }
×
202

NEW
203
        pub := sourceNode.PubKeyBytes
×
NEW
204

×
NEW
205
        // Get the DB ID of the source node by its public key. This node must
×
NEW
206
        // already exist in the SQL database, as it should have been migrated
×
NEW
207
        // in the previous node-migration step.
×
NEW
208
        id, err := sqlDB.GetNodeIDByPubKey(
×
NEW
209
                ctx, sqlc.GetNodeIDByPubKeyParams{
×
NEW
210
                        PubKey:  pub[:],
×
NEW
211
                        Version: int16(ProtocolV1),
×
NEW
212
                },
×
NEW
213
        )
×
NEW
214
        if err != nil {
×
NEW
215
                return fmt.Errorf("could not get source node ID: %w", err)
×
NEW
216
        }
×
217

218
        // Now we can add the source node to the SQL database.
NEW
219
        err = sqlDB.AddSourceNode(ctx, id)
×
NEW
220
        if err != nil {
×
NEW
221
                return fmt.Errorf("could not add source node to SQL store: %w",
×
NEW
222
                        err)
×
NEW
223
        }
×
224

225
        // Verify that the source node was added correctly by fetching it back
226
        // from the SQL database and checking that the expected DB ID and
227
        // pub key are returned. We don't need to do a whole node comparison
228
        // here, as this was already done in the previous migration step.
NEW
229
        srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
×
NEW
230
        if err != nil {
×
NEW
231
                return fmt.Errorf("could not get source nodes from SQL "+
×
NEW
232
                        "store: %w", err)
×
NEW
233
        }
×
234

235
        // The SQL store has support for multiple source nodes (for future
236
        // protocol versions) but this migration is purely aimed at the V1
237
        // store, and so we expect exactly one source node to be present.
NEW
238
        if len(srcNodes) != 1 {
×
NEW
239
                return fmt.Errorf("expected exactly one source node, "+
×
NEW
240
                        "got %d", len(srcNodes))
×
NEW
241
        }
×
242

243
        // Check that the source node ID and pub key match the original
244
        // source node.
NEW
245
        if srcNodes[0].NodeID != id {
×
NEW
246
                return fmt.Errorf("source node ID mismatch after migration: "+
×
NEW
247
                        "expected %d, got %d", id, srcNodes[0].NodeID)
×
NEW
248
        }
×
NEW
249
        err = compare(pub[:], srcNodes[0].PubKey, "source node")
×
NEW
250
        if err != nil {
×
NEW
251
                return fmt.Errorf("source node pubkey mismatch after "+
×
NEW
252
                        "migration: %w", err)
×
NEW
253
        }
×
254

NEW
255
        log.Infof("Migrated source node with pubkey %x to SQL", pub[:])
×
NEW
256

×
NEW
257
        return nil
×
258
}
259

260
// compare checks if the original and migrated objects are equal. If they
261
// are not, it returns an error with a unified diff of the two objects.
NEW
262
func compare(original, migrated any, identifier string) error {
×
NEW
263
        if reflect.DeepEqual(original, migrated) {
×
NEW
264
                return nil
×
NEW
265
        }
×
266

NEW
267
        diff := difflib.UnifiedDiff{
×
NEW
268
                A:        difflib.SplitLines(spew.Sdump(original)),
×
NEW
269
                B:        difflib.SplitLines(spew.Sdump(migrated)),
×
NEW
270
                FromFile: "Expected",
×
NEW
271
                FromDate: "",
×
NEW
272
                ToFile:   "Actual",
×
NEW
273
                ToDate:   "",
×
NEW
274
                Context:  3,
×
NEW
275
        }
×
NEW
276
        diffText, _ := difflib.GetUnifiedDiffString(diff)
×
NEW
277

×
NEW
278
        return fmt.Errorf("%w: %s.\n%v", ErrMigrationMismatch, identifier,
×
NEW
279
                diffText)
×
280
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc