• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16122795439

pending completion
16122795439

Pull #10036

github

web-flow
Merge f13874ae2 into b815109b8
Pull Request #10036: [graph mig 1]: graph/db: migrate graph nodes from kvdb to SQL

0 of 188 new or added lines in 2 files covered. (0.0%)

34 existing lines in 6 files now uncovered.

98511 of 170567 relevant lines covered (57.76%)

1.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/graph/db/sql_migration.go
1
package graphdb
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sort"
8
        "time"
9

10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/lightningnetwork/lnd/graph/db/models"
12
        "github.com/lightningnetwork/lnd/kvdb"
13
        "github.com/lightningnetwork/lnd/sqldb"
14
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
15
)
16

17
// ErrMigrationMismatch is returned when a migrated graph record does not match
// the original record. It is a sentinel error and should be matched with
// errors.Is.
var ErrMigrationMismatch = errors.New("migrated graph record does not match " +
	"original record")
21

22
// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
23
// backend.
24
//
25
// NOTE: this is currently not called from any code path. It is called via tests
26
// only for now and will be called from the main lnd binary once the
27
// migration is fully implemented and tested.
28
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
NEW
29
        sqlDB SQLQueries, _ chainhash.Hash) error {
×
NEW
30

×
NEW
31
        log.Infof("Starting migration of the graph store from KV to SQL")
×
NEW
32
        t0 := time.Now()
×
NEW
33

×
NEW
34
        // Check if there is a graph to migrate.
×
NEW
35
        graphExists, err := checkGraphExists(kvBackend)
×
NEW
36
        if err != nil {
×
NEW
37
                return fmt.Errorf("failed to check graph existence: %w", err)
×
NEW
38
        }
×
NEW
39
        if !graphExists {
×
NEW
40
                log.Infof("No graph found in KV store, skipping the migration")
×
NEW
41
                return nil
×
NEW
42
        }
×
43

44
        // 1) Migrate all the nodes.
NEW
45
        if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
×
NEW
46
                return fmt.Errorf("could not migrate nodes: %w", err)
×
NEW
47
        }
×
48

49
        // 2) Migrate the source node.
NEW
50
        if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
×
NEW
51
                return fmt.Errorf("could not migrate source node: %w", err)
×
NEW
52
        }
×
53

NEW
54
        log.Infof("Finished migration of the graph store from KV to SQL in %v",
×
NEW
55
                time.Since(t0))
×
NEW
56

×
NEW
57
        return nil
×
58
}
59

60
// checkGraphExists checks if the graph exists in the KV backend.
NEW
61
func checkGraphExists(db kvdb.Backend) (bool, error) {
×
NEW
62
        // Check if there is even a graph to migrate.
×
NEW
63
        err := db.View(func(tx kvdb.RTx) error {
×
NEW
64
                // Check for the existence of the node bucket which is a top
×
NEW
65
                // level bucket that would have been created on the initial
×
NEW
66
                // creation of the graph store.
×
NEW
67
                nodes := tx.ReadBucket(nodeBucket)
×
NEW
68
                if nodes == nil {
×
NEW
69
                        return ErrGraphNotFound
×
NEW
70
                }
×
71

NEW
72
                return nil
×
NEW
73
        }, func() {})
×
NEW
74
        if errors.Is(err, ErrGraphNotFound) {
×
NEW
75
                return false, nil
×
NEW
76
        } else if err != nil {
×
NEW
77
                return false, err
×
NEW
78
        }
×
79

NEW
80
        return true, nil
×
81
}
82

83
// migrateNodes migrates all nodes from the KV backend to the SQL database.
84
// This includes doing a sanity check after each migration to ensure that the
85
// migrated node matches the original node.
86
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
NEW
87
        sqlDB SQLQueries) error {
×
NEW
88

×
NEW
89
        // Keep track of the number of nodes migrated and the number of
×
NEW
90
        // nodes skipped due to errors.
×
NEW
91
        var (
×
NEW
92
                count   uint64
×
NEW
93
                skipped uint64
×
NEW
94
        )
×
NEW
95

×
NEW
96
        // Loop through each node in the KV store and insert it into the SQL
×
NEW
97
        // database.
×
NEW
98
        err := forEachNode(kvBackend, func(_ kvdb.RTx,
×
NEW
99
                node *models.LightningNode) error {
×
NEW
100

×
NEW
101
                pub := node.PubKeyBytes
×
NEW
102

×
NEW
103
                // Sanity check to ensure that the node has valid extra opaque
×
NEW
104
                // data. If it does not, we'll skip it. We need to do this
×
NEW
105
                // because previously we would just persist any TLV bytes that
×
NEW
106
                // we received without validating them. Now, however, we
×
NEW
107
                // normalise the storage of extra opaque data, so we need to
×
NEW
108
                // ensure that the data is valid. We don't want to abort the
×
NEW
109
                // migration if we encounter a node with invalid extra opaque
×
NEW
110
                // data, so we'll just skip it and log a warning.
×
NEW
111
                _, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
×
NEW
112
                if errors.Is(err, ErrParsingExtraTLVBytes) {
×
NEW
113
                        skipped++
×
NEW
114
                        log.Warnf("Skipping migration of node %x with invalid "+
×
NEW
115
                                "extra opaque data: %v", pub,
×
NEW
116
                                node.ExtraOpaqueData)
×
NEW
117

×
NEW
118
                        return nil
×
NEW
119
                } else if err != nil {
×
NEW
120
                        return fmt.Errorf("unable to marshal extra "+
×
NEW
121
                                "opaque data for node %x: %w", pub, err)
×
NEW
122
                }
×
123

NEW
124
                count++
×
NEW
125

×
NEW
126
                // TODO(elle): At this point, we should check the loaded node
×
NEW
127
                // to see if we should extract any DNS addresses from its
×
NEW
128
                // opaque type addresses. This is expected to be done in:
×
NEW
129
                // https://github.com/lightningnetwork/lnd/pull/9455.
×
NEW
130
                // This TODO is being tracked in
×
NEW
131
                //  https://github.com/lightningnetwork/lnd/issues/9795 as this
×
NEW
132
                // must be addressed before making this code path active in
×
NEW
133
                // production.
×
NEW
134

×
NEW
135
                // Write the node to the SQL database.
×
NEW
136
                id, err := upsertNode(ctx, sqlDB, node)
×
NEW
137
                if err != nil {
×
NEW
138
                        return fmt.Errorf("could not persist node(%x): %w", pub,
×
NEW
139
                                err)
×
NEW
140
                }
×
141

142
                // Fetch it from the SQL store and compare it against the
143
                // original node object to ensure the migration was successful.
NEW
144
                dbNode, err := sqlDB.GetNodeByPubKey(
×
NEW
145
                        ctx, sqlc.GetNodeByPubKeyParams{
×
NEW
146
                                PubKey:  node.PubKeyBytes[:],
×
NEW
147
                                Version: int16(ProtocolV1),
×
NEW
148
                        },
×
NEW
149
                )
×
NEW
150
                if err != nil {
×
NEW
151
                        return fmt.Errorf("could not get node by pubkey (%x)"+
×
NEW
152
                                "after migration: %w", pub, err)
×
NEW
153
                }
×
154

155
                // Sanity check: ensure the migrated node ID matches the one we
156
                // just inserted.
NEW
157
                if dbNode.ID != id {
×
NEW
158
                        return fmt.Errorf("node ID mismatch for node (%x) "+
×
NEW
159
                                "after migration: expected %d, got %d",
×
NEW
160
                                pub, id, dbNode.ID)
×
NEW
161
                }
×
162

NEW
163
                migratedNode, err := buildNode(ctx, sqlDB, &dbNode)
×
NEW
164
                if err != nil {
×
NEW
165
                        return fmt.Errorf("could not build migrated node "+
×
NEW
166
                                "from dbNode(db id: %d, node pub: %x): %w",
×
NEW
167
                                dbNode.ID, pub, err)
×
NEW
168
                }
×
169

170
                // Make sure that the node addresses are sorted before
171
                // comparing them to ensure that the order of addresses does
172
                // not affect the comparison.
NEW
173
                sort.Slice(node.Addresses, func(i, j int) bool {
×
NEW
174
                        return node.Addresses[i].String() <
×
NEW
175
                                node.Addresses[j].String()
×
NEW
176
                })
×
NEW
177
                sort.Slice(migratedNode.Addresses, func(i, j int) bool {
×
NEW
178
                        return migratedNode.Addresses[i].String() <
×
NEW
179
                                migratedNode.Addresses[j].String()
×
NEW
180
                })
×
181

NEW
182
                return sqldb.CompareRecords(
×
NEW
183
                        node, migratedNode, fmt.Sprintf("node %x", pub),
×
NEW
184
                )
×
185
        })
NEW
186
        if err != nil {
×
NEW
187
                return fmt.Errorf("could not migrate nodes: %w", err)
×
NEW
188
        }
×
189

NEW
190
        log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
×
NEW
191
                "invalid TLV streams)", count, skipped)
×
NEW
192

×
NEW
193
        return nil
×
194
}
195

196
// migrateSourceNode migrates the source node from the KV backend to the
197
// SQL database.
198
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
NEW
199
        sqlDB SQLQueries) error {
×
NEW
200

×
NEW
201
        sourceNode, err := sourceNode(kvdb)
×
NEW
202
        if errors.Is(err, ErrSourceNodeNotSet) {
×
NEW
203
                // If the source node has not been set yet, we can skip this
×
NEW
204
                // migration step.
×
NEW
205
                return nil
×
NEW
206
        } else if err != nil {
×
NEW
207
                return fmt.Errorf("could not get source node from kv "+
×
NEW
208
                        "store: %w", err)
×
NEW
209
        }
×
210

NEW
211
        pub := sourceNode.PubKeyBytes
×
NEW
212

×
NEW
213
        // Get the DB ID of the source node by its public key. This node must
×
NEW
214
        // already exist in the SQL database, as it should have been migrated
×
NEW
215
        // in the previous node-migration step.
×
NEW
216
        id, err := sqlDB.GetNodeIDByPubKey(
×
NEW
217
                ctx, sqlc.GetNodeIDByPubKeyParams{
×
NEW
218
                        PubKey:  pub[:],
×
NEW
219
                        Version: int16(ProtocolV1),
×
NEW
220
                },
×
NEW
221
        )
×
NEW
222
        if err != nil {
×
NEW
223
                return fmt.Errorf("could not get source node ID: %w", err)
×
NEW
224
        }
×
225

226
        // Now we can add the source node to the SQL database.
NEW
227
        err = sqlDB.AddSourceNode(ctx, id)
×
NEW
228
        if err != nil {
×
NEW
229
                return fmt.Errorf("could not add source node to SQL store: %w",
×
NEW
230
                        err)
×
NEW
231
        }
×
232

233
        // Verify that the source node was added correctly by fetching it back
234
        // from the SQL database and checking that the expected DB ID and
235
        // pub key are returned. We don't need to do a whole node comparison
236
        // here, as this was already done in the previous migration step.
NEW
237
        srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
×
NEW
238
        if err != nil {
×
NEW
239
                return fmt.Errorf("could not get source nodes from SQL "+
×
NEW
240
                        "store: %w", err)
×
NEW
241
        }
×
242

243
        // The SQL store has support for multiple source nodes (for future
244
        // protocol versions) but this migration is purely aimed at the V1
245
        // store, and so we expect exactly one source node to be present.
NEW
246
        if len(srcNodes) != 1 {
×
NEW
247
                return fmt.Errorf("expected exactly one source node, "+
×
NEW
248
                        "got %d", len(srcNodes))
×
NEW
249
        }
×
250

251
        // Check that the source node ID and pub key match the original
252
        // source node.
NEW
253
        if srcNodes[0].NodeID != id {
×
NEW
254
                return fmt.Errorf("source node ID mismatch after migration: "+
×
NEW
255
                        "expected %d, got %d", id, srcNodes[0].NodeID)
×
NEW
256
        }
×
NEW
257
        err = sqldb.CompareRecords(pub[:], srcNodes[0].PubKey, "source node")
×
NEW
258
        if err != nil {
×
NEW
259
                return fmt.Errorf("source node pubkey mismatch after "+
×
NEW
260
                        "migration: %w", err)
×
NEW
261
        }
×
262

NEW
263
        log.Infof("Migrated source node with pubkey %x to SQL", pub[:])
×
NEW
264

×
NEW
265
        return nil
×
266
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc