• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15606538456

12 Jun 2025 09:14AM UTC coverage: 67.414% (+9.1%) from 58.333%
15606538456

Pull #9932

github

web-flow
Merge 25e652669 into 35102e7c3
Pull Request #9932: [draft] graph/db+sqldb: graph store SQL implementation + migration

23 of 3319 new or added lines in 7 files covered. (0.69%)

39 existing lines in 8 files now uncovered.

134459 of 199453 relevant lines covered (67.41%)

21872.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/graph/db/sql_migration.go
1
package graphdb
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7

8
        "github.com/btcsuite/btcd/chaincfg/chainhash"
9
        "github.com/lightningnetwork/lnd/graph/db/models"
10
        "github.com/lightningnetwork/lnd/lnwire"
11
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
12
)
13

14
func MigrateGraphToSQL(ctx context.Context, kvStore *KVStore,
NEW
15
        tx *sqlc.Queries) error {
×
NEW
16

×
NEW
17
        sqlDB := SQLQueries(tx)
×
NEW
18

×
NEW
19
        // First, migrate all nodes.
×
NEW
20
        err := kvStore.ForEachNode(func(nodeTx NodeRTx) error {
×
NEW
21
                _, err := upsertNode(ctx, sqlDB, nodeTx.Node())
×
NEW
22
                if err != nil {
×
NEW
23
                        return fmt.Errorf("could not migrate node(%x): %w",
×
NEW
24
                                nodeTx.Node().PubKeyBytes, err)
×
NEW
25
                }
×
26

NEW
27
                return nil
×
28
        })
NEW
29
        if err != nil {
×
NEW
30
                return fmt.Errorf("could not migrate nodes: %w", err)
×
NEW
31
        }
×
32

NEW
33
        migrateChanPolicy := func(dbInfo *dbChanInfo,
×
NEW
34
                policy *models.ChannelEdgePolicy) error {
×
NEW
35

×
NEW
36
                if policy == nil {
×
NEW
37
                        return nil
×
NEW
38
                }
×
39

NEW
40
                _, _, _, err := updateChanEdgePolicy(ctx, sqlDB, policy)
×
NEW
41

×
NEW
42
                return err
×
43
        }
44

45
        // Next, migrate all channels and channel updates
NEW
46
        err = kvStore.ForEachChannel(
×
NEW
47
                func(info *models.ChannelEdgeInfo,
×
NEW
48
                        policy1 *models.ChannelEdgePolicy,
×
NEW
49
                        policy2 *models.ChannelEdgePolicy) error {
×
NEW
50

×
NEW
51
                        dbChanInfo, err := insertChannel(ctx, sqlDB, info)
×
NEW
52
                        if err != nil {
×
NEW
53
                                return fmt.Errorf("could not migrate "+
×
NEW
54
                                        "channel(%d): %w", info.ChannelID, err)
×
NEW
55
                        }
×
56

NEW
57
                        err = migrateChanPolicy(dbChanInfo, policy1)
×
NEW
58
                        if err != nil {
×
NEW
59
                                return fmt.Errorf("could not migrate "+
×
NEW
60
                                        "policy1(%d): %w", info.ChannelID, err)
×
NEW
61
                        }
×
62

NEW
63
                        err = migrateChanPolicy(dbChanInfo, policy2)
×
NEW
64
                        if err != nil {
×
NEW
65
                                return fmt.Errorf("could not migrate "+
×
NEW
66
                                        "policy2(%d): %w", info.ChannelID, err)
×
NEW
67
                        }
×
68

NEW
69
                        return nil
×
70
                },
71
        )
NEW
72
        if err != nil {
×
NEW
73
                return fmt.Errorf("could not migrate channels and channel "+
×
NEW
74
                        "updates: %w", err)
×
NEW
75
        }
×
76

77
        // Migrate source node.
NEW
78
        err = migrateSourceNode(ctx, kvStore, sqlDB)
×
NEW
79
        if err != nil {
×
NEW
80
                return fmt.Errorf("could not migrate source node: %w", err)
×
NEW
81
        }
×
82

83
        // Migrate prune log.
NEW
84
        err = kvStore.forEachPruneLogEntry(
×
NEW
85
                func(height uint32, hash *chainhash.Hash) error {
×
NEW
86
                        err := sqlDB.UpsertPruneLogEntry(
×
NEW
87
                                ctx, sqlc.UpsertPruneLogEntryParams{
×
NEW
88
                                        BlockHeight: int64(height),
×
NEW
89
                                        BlockHash:   hash[:],
×
NEW
90
                                },
×
NEW
91
                        )
×
NEW
92
                        if err != nil {
×
NEW
93
                                return fmt.Errorf("unable to insert prune log "+
×
NEW
94
                                        "entry for height %d: %w", height, err)
×
NEW
95
                        }
×
96

NEW
97
                        return nil
×
98
                },
99
        )
NEW
100
        if err != nil {
×
NEW
101
                return fmt.Errorf("could not migrate prune log: %w", err)
×
NEW
102
        }
×
103

104
        // Migrate closed SCID index.
NEW
105
        err = kvStore.forEachClosedSCID(func(scid lnwire.ShortChannelID) error {
×
NEW
106
                var chanIDB [8]byte
×
NEW
107
                byteOrder.PutUint64(chanIDB[:], scid.ToUint64())
×
NEW
108

×
NEW
109
                return sqlDB.InsertClosedChannel(ctx, chanIDB[:])
×
NEW
110
        })
×
NEW
111
        if err != nil {
×
NEW
112
                return fmt.Errorf("could not migrate closed SCID index: %w",
×
NEW
113
                        err)
×
NEW
114
        }
×
115

116
        // Migrate zombie index.
NEW
117
        err = kvStore.forEachZombieEntry(func(chanID uint64, pubKey1,
×
NEW
118
                pubKey2 [33]byte) error {
×
NEW
119

×
NEW
120
                var chanIDB [8]byte
×
NEW
121
                byteOrder.PutUint64(chanIDB[:], chanID)
×
NEW
122

×
NEW
123
                // If it is in the closed SCID index, we don't need to
×
NEW
124
                // add it to the zombie index.
×
NEW
125
                isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB[:])
×
NEW
126
                if err != nil {
×
NEW
127
                        return fmt.Errorf("could not check closed "+
×
NEW
128
                                "channel: %w", err)
×
NEW
129
                }
×
130

NEW
131
                if isClosed {
×
NEW
132
                        return nil
×
NEW
133
                }
×
134

NEW
135
                return sqlDB.UpsertZombieChannel(
×
NEW
136
                        ctx, sqlc.UpsertZombieChannelParams{
×
NEW
137
                                Version:  int16(ProtocolV1),
×
NEW
138
                                Scid:     int64(chanID),
×
NEW
139
                                NodeKey1: pubKey1[:],
×
NEW
140
                                NodeKey2: pubKey2[:],
×
NEW
141
                        },
×
NEW
142
                )
×
143
        })
NEW
144
        if err != nil {
×
NEW
145
                return fmt.Errorf("could not migrate zombie index: %w", err)
×
NEW
146
        }
×
147

NEW
148
        return nil
×
149
}
150

151
func migrateSourceNode(ctx context.Context, kvStore *KVStore,
NEW
152
        sqlDB SQLQueries) error {
×
NEW
153

×
NEW
154
        sourceNode, err := kvStore.SourceNode()
×
NEW
155
        if errors.Is(err, ErrSourceNodeNotSet) {
×
NEW
156
                return nil
×
NEW
157
        } else if err != nil {
×
NEW
158
                return fmt.Errorf("could not get source node: %w", err)
×
NEW
159
        }
×
160

NEW
161
        dbSourceNode, err := sqlDB.GetNodeByPubKey(
×
NEW
162
                ctx, sqlc.GetNodeByPubKeyParams{
×
NEW
163
                        PubKey:  sourceNode.PubKeyBytes[:],
×
NEW
164
                        Version: int16(ProtocolV1),
×
NEW
165
                },
×
NEW
166
        )
×
NEW
167
        if err != nil {
×
NEW
168
                return fmt.Errorf("could not get source node ID: %w", err)
×
NEW
169
        }
×
170

NEW
171
        err = sqlDB.AddSourceNode(ctx, dbSourceNode.ID)
×
NEW
172
        if err != nil {
×
NEW
173
                return fmt.Errorf("could not add source node: %w", err)
×
NEW
174
        }
×
175

NEW
176
        return nil
×
177
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc