• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16139624707

08 Jul 2025 09:37AM UTC coverage: 67.518% (+9.7%) from 57.787%
16139624707

Pull #10036

github

web-flow
Merge cb959bddb into b815109b8
Pull Request #10036: [graph mig 1]: graph/db: migrate graph nodes from kvdb to SQL

0 of 204 new or added lines in 3 files covered. (0.0%)

27 existing lines in 7 files now uncovered.

135164 of 200190 relevant lines covered (67.52%)

21804.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/graph/db/sql_migration.go
1
package graphdb
2

3
import (
4
        "cmp"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "net"
9
        "slices"
10
        "time"
11

12
        "github.com/btcsuite/btcd/chaincfg/chainhash"
13
        "github.com/lightningnetwork/lnd/graph/db/models"
14
        "github.com/lightningnetwork/lnd/kvdb"
15
        "github.com/lightningnetwork/lnd/sqldb"
16
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
17
)
18

19
// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
20
// backend.
21
//
22
// NOTE: this is currently not called from any code path. It is called via tests
23
// only for now and will be called from the main lnd binary once the
24
// migration is fully implemented and tested.
25
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
NEW
26
        sqlDB SQLQueries, _ chainhash.Hash) error {
×
NEW
27

×
NEW
28
        log.Infof("Starting migration of the graph store from KV to SQL")
×
NEW
29
        t0 := time.Now()
×
NEW
30

×
NEW
31
        // Check if there is a graph to migrate.
×
NEW
32
        graphExists, err := checkGraphExists(kvBackend)
×
NEW
33
        if err != nil {
×
NEW
34
                return fmt.Errorf("failed to check graph existence: %w", err)
×
NEW
35
        }
×
NEW
36
        if !graphExists {
×
NEW
37
                log.Infof("No graph found in KV store, skipping the migration")
×
NEW
38
                return nil
×
NEW
39
        }
×
40

41
        // 1) Migrate all the nodes.
NEW
42
        if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
×
NEW
43
                return fmt.Errorf("could not migrate nodes: %w", err)
×
NEW
44
        }
×
45

46
        // 2) Migrate the source node.
NEW
47
        if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
×
NEW
48
                return fmt.Errorf("could not migrate source node: %w", err)
×
NEW
49
        }
×
50

NEW
51
        log.Infof("Finished migration of the graph store from KV to SQL in %v",
×
NEW
52
                time.Since(t0))
×
NEW
53

×
NEW
54
        return nil
×
55
}
56

57
// checkGraphExists checks if the graph exists in the KV backend.
NEW
58
func checkGraphExists(db kvdb.Backend) (bool, error) {
×
NEW
59
        // Check if there is even a graph to migrate.
×
NEW
60
        err := db.View(func(tx kvdb.RTx) error {
×
NEW
61
                // Check for the existence of the node bucket which is a top
×
NEW
62
                // level bucket that would have been created on the initial
×
NEW
63
                // creation of the graph store.
×
NEW
64
                nodes := tx.ReadBucket(nodeBucket)
×
NEW
65
                if nodes == nil {
×
NEW
66
                        return ErrGraphNotFound
×
NEW
67
                }
×
68

NEW
69
                return nil
×
NEW
70
        }, func() {})
×
NEW
71
        if errors.Is(err, ErrGraphNotFound) {
×
NEW
72
                return false, nil
×
NEW
73
        } else if err != nil {
×
NEW
74
                return false, err
×
NEW
75
        }
×
76

NEW
77
        return true, nil
×
78
}
79

80
// migrateNodes migrates all nodes from the KV backend to the SQL database.
81
// This includes doing a sanity check after each migration to ensure that the
82
// migrated node matches the original node.
83
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
NEW
84
        sqlDB SQLQueries) error {
×
NEW
85

×
NEW
86
        // Keep track of the number of nodes migrated and the number of
×
NEW
87
        // nodes skipped due to errors.
×
NEW
88
        var (
×
NEW
89
                count   uint64
×
NEW
90
                skipped uint64
×
NEW
91
        )
×
NEW
92

×
NEW
93
        // Loop through each node in the KV store and insert it into the SQL
×
NEW
94
        // database.
×
NEW
95
        err := forEachNode(kvBackend, func(_ kvdb.RTx,
×
NEW
96
                node *models.LightningNode) error {
×
NEW
97

×
NEW
98
                pub := node.PubKeyBytes
×
NEW
99

×
NEW
100
                // Sanity check to ensure that the node has valid extra opaque
×
NEW
101
                // data. If it does not, we'll skip it. We need to do this
×
NEW
102
                // because previously we would just persist any TLV bytes that
×
NEW
103
                // we received without validating them. Now, however, we
×
NEW
104
                // normalise the storage of extra opaque data, so we need to
×
NEW
105
                // ensure that the data is valid. We don't want to abort the
×
NEW
106
                // migration if we encounter a node with invalid extra opaque
×
NEW
107
                // data, so we'll just skip it and log a warning.
×
NEW
108
                _, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
×
NEW
109
                if errors.Is(err, ErrParsingExtraTLVBytes) {
×
NEW
110
                        skipped++
×
NEW
111
                        log.Warnf("Skipping migration of node %x with invalid "+
×
NEW
112
                                "extra opaque data: %v", pub,
×
NEW
113
                                node.ExtraOpaqueData)
×
NEW
114

×
NEW
115
                        return nil
×
NEW
116
                } else if err != nil {
×
NEW
117
                        return fmt.Errorf("unable to marshal extra "+
×
NEW
118
                                "opaque data for node %x: %w", pub, err)
×
NEW
119
                }
×
120

NEW
121
                count++
×
NEW
122

×
NEW
123
                // TODO(elle): At this point, we should check the loaded node
×
NEW
124
                // to see if we should extract any DNS addresses from its
×
NEW
125
                // opaque type addresses. This is expected to be done in:
×
NEW
126
                // https://github.com/lightningnetwork/lnd/pull/9455.
×
NEW
127
                // This TODO is being tracked in
×
NEW
128
                //  https://github.com/lightningnetwork/lnd/issues/9795 as this
×
NEW
129
                // must be addressed before making this code path active in
×
NEW
130
                // production.
×
NEW
131

×
NEW
132
                // Write the node to the SQL database.
×
NEW
133
                id, err := upsertNode(ctx, sqlDB, node)
×
NEW
134
                if err != nil {
×
NEW
135
                        return fmt.Errorf("could not persist node(%x): %w", pub,
×
NEW
136
                                err)
×
NEW
137
                }
×
138

139
                // Fetch it from the SQL store and compare it against the
140
                // original node object to ensure the migration was successful.
NEW
141
                dbNode, err := sqlDB.GetNodeByPubKey(
×
NEW
142
                        ctx, sqlc.GetNodeByPubKeyParams{
×
NEW
143
                                PubKey:  node.PubKeyBytes[:],
×
NEW
144
                                Version: int16(ProtocolV1),
×
NEW
145
                        },
×
NEW
146
                )
×
NEW
147
                if err != nil {
×
NEW
148
                        return fmt.Errorf("could not get node by pubkey (%x)"+
×
NEW
149
                                "after migration: %w", pub, err)
×
NEW
150
                }
×
151

152
                // Sanity check: ensure the migrated node ID matches the one we
153
                // just inserted.
NEW
154
                if dbNode.ID != id {
×
NEW
155
                        return fmt.Errorf("node ID mismatch for node (%x) "+
×
NEW
156
                                "after migration: expected %d, got %d",
×
NEW
157
                                pub, id, dbNode.ID)
×
NEW
158
                }
×
159

NEW
160
                migratedNode, err := buildNode(ctx, sqlDB, &dbNode)
×
NEW
161
                if err != nil {
×
NEW
162
                        return fmt.Errorf("could not build migrated node "+
×
NEW
163
                                "from dbNode(db id: %d, node pub: %x): %w",
×
NEW
164
                                dbNode.ID, pub, err)
×
NEW
165
                }
×
166

167
                // Make sure that the node addresses are sorted before
168
                // comparing them to ensure that the order of addresses does
169
                // not affect the comparison.
NEW
170
                slices.SortFunc(node.Addresses, func(i, j net.Addr) int {
×
NEW
171
                        return cmp.Compare(i.String(), j.String())
×
NEW
172
                })
×
NEW
173
                slices.SortFunc(
×
NEW
174
                        migratedNode.Addresses, func(i, j net.Addr) int {
×
NEW
175
                                return cmp.Compare(i.String(), j.String())
×
NEW
176
                        },
×
177
                )
178

NEW
179
                return sqldb.CompareRecords(
×
NEW
180
                        node, migratedNode, fmt.Sprintf("node %x", pub),
×
NEW
181
                )
×
182
        })
NEW
183
        if err != nil {
×
NEW
184
                return fmt.Errorf("could not migrate nodes: %w", err)
×
NEW
185
        }
×
186

NEW
187
        log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
×
NEW
188
                "invalid TLV streams)", count, skipped)
×
NEW
189

×
NEW
190
        return nil
×
191
}
192

193
// migrateSourceNode migrates the source node from the KV backend to the
194
// SQL database.
195
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
NEW
196
        sqlDB SQLQueries) error {
×
NEW
197

×
NEW
198
        sourceNode, err := sourceNode(kvdb)
×
NEW
199
        if errors.Is(err, ErrSourceNodeNotSet) {
×
NEW
200
                // If the source node has not been set yet, we can skip this
×
NEW
201
                // migration step.
×
NEW
202
                return nil
×
NEW
203
        } else if err != nil {
×
NEW
204
                return fmt.Errorf("could not get source node from kv "+
×
NEW
205
                        "store: %w", err)
×
NEW
206
        }
×
207

NEW
208
        pub := sourceNode.PubKeyBytes
×
NEW
209

×
NEW
210
        // Get the DB ID of the source node by its public key. This node must
×
NEW
211
        // already exist in the SQL database, as it should have been migrated
×
NEW
212
        // in the previous node-migration step.
×
NEW
213
        id, err := sqlDB.GetNodeIDByPubKey(
×
NEW
214
                ctx, sqlc.GetNodeIDByPubKeyParams{
×
NEW
215
                        PubKey:  pub[:],
×
NEW
216
                        Version: int16(ProtocolV1),
×
NEW
217
                },
×
NEW
218
        )
×
NEW
219
        if err != nil {
×
NEW
220
                return fmt.Errorf("could not get source node ID: %w", err)
×
NEW
221
        }
×
222

223
        // Now we can add the source node to the SQL database.
NEW
224
        err = sqlDB.AddSourceNode(ctx, id)
×
NEW
225
        if err != nil {
×
NEW
226
                return fmt.Errorf("could not add source node to SQL store: %w",
×
NEW
227
                        err)
×
NEW
228
        }
×
229

230
        // Verify that the source node was added correctly by fetching it back
231
        // from the SQL database and checking that the expected DB ID and
232
        // pub key are returned. We don't need to do a whole node comparison
233
        // here, as this was already done in the previous migration step.
NEW
234
        srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
×
NEW
235
        if err != nil {
×
NEW
236
                return fmt.Errorf("could not get source nodes from SQL "+
×
NEW
237
                        "store: %w", err)
×
NEW
238
        }
×
239

240
        // The SQL store has support for multiple source nodes (for future
241
        // protocol versions) but this migration is purely aimed at the V1
242
        // store, and so we expect exactly one source node to be present.
NEW
243
        if len(srcNodes) != 1 {
×
NEW
244
                return fmt.Errorf("expected exactly one source node, "+
×
NEW
245
                        "got %d", len(srcNodes))
×
NEW
246
        }
×
247

248
        // Check that the source node ID and pub key match the original
249
        // source node.
NEW
250
        if srcNodes[0].NodeID != id {
×
NEW
251
                return fmt.Errorf("source node ID mismatch after migration: "+
×
NEW
252
                        "expected %d, got %d", id, srcNodes[0].NodeID)
×
NEW
253
        }
×
NEW
254
        err = sqldb.CompareRecords(pub[:], srcNodes[0].PubKey, "source node")
×
NEW
255
        if err != nil {
×
NEW
256
                return fmt.Errorf("source node pubkey mismatch after "+
×
NEW
257
                        "migration: %w", err)
×
NEW
258
        }
×
259

NEW
260
        log.Infof("Migrated source node with pubkey %x to SQL", pub[:])
×
NEW
261

×
NEW
262
        return nil
×
263
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc