• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13586005509

28 Feb 2025 10:14AM UTC coverage: 68.629% (+9.9%) from 58.77%
13586005509

Pull #9521

github

web-flow
Merge 37d3a70a5 into 8532955b3
Pull Request #9521: unit: remove GOACC, use Go 1.20 native coverage functionality

129950 of 189351 relevant lines covered (68.63%)

23726.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.52
/channeldb/migration21/migration.go
1
package migration21
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "fmt"
7

8
        lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21"
9
        "github.com/lightningnetwork/lnd/channeldb/migration21/common"
10
        "github.com/lightningnetwork/lnd/channeldb/migration21/current"
11
        "github.com/lightningnetwork/lnd/channeldb/migration21/legacy"
12
        "github.com/lightningnetwork/lnd/kvdb"
13
)
14

15
var (
	// byteOrder is the endianness used to encode/decode integer keys in
	// the database (e.g. the short channel IDs keying the forwarding
	// package bucket).
	byteOrder = binary.BigEndian

	// openChanBucket stores all the currently open channels. This bucket
	// has a second, nested bucket which is keyed by a node's ID. Within
	// that node ID bucket, all attributes required to track, update, and
	// close a channel are stored.
	//
	// openChan -> nodeID -> chanPoint
	//
	// TODO(roasbeef): flesh out comment
	openChannelBucket = []byte("open-chan-bucket")

	// commitDiffKey stores the current pending commitment state we've
	// extended to the remote party (if any). Each time we propose a new
	// state, we store the information necessary to reconstruct this state
	// from the prior commitment. This allows us to resync the remote party
	// to their expected state in the case of message loss.
	//
	// TODO(roasbeef): rename to commit chain?
	commitDiffKey = []byte("commit-diff-key")

	// unsignedAckedUpdatesKey is an entry in the channel bucket that
	// contains the remote updates that we have acked, but not yet signed
	// for in one of our remote commits.
	unsignedAckedUpdatesKey = []byte("unsigned-acked-updates-key")

	// remoteUnsignedLocalUpdatesKey is an entry in the channel bucket that
	// contains the local updates that the remote party has acked, but
	// has not yet signed for in one of their local commits.
	remoteUnsignedLocalUpdatesKey = []byte("remote-unsigned-local-updates-key")

	// networkResultStoreBucketKey is used for the root level bucket that
	// stores the network result for each payment ID.
	networkResultStoreBucketKey = []byte("network-result-store-bucket")

	// closedChannelBucket stores summarization information concerning
	// previously open, but now closed channels.
	closedChannelBucket = []byte("closed-chan-bucket")

	// fwdPackagesKey is the root-level bucket that all forwarding packages
	// are written. This bucket is further subdivided based on the short
	// channel ID of each channel.
	fwdPackagesKey = []byte("fwd-packages")
)
60

61
// MigrateDatabaseWireMessages performs a migration in all areas that we
62
// currently store wire messages without length prefixes. This includes the
63
// CommitDiff struct, ChannelCloseSummary, LogUpdates, and also the
64
// networkResult struct as well.
65
func MigrateDatabaseWireMessages(tx kvdb.RwTx) error {
1✔
66
        // The migration will proceed in three phases: we'll need to update any
1✔
67
        // pending commit diffs, then any unsigned acked updates for all open
1✔
68
        // channels, then finally we'll need to update all the current
1✔
69
        // stored network results for payments in the switch.
1✔
70
        //
1✔
71
        // In this phase, we'll migrate the open channel data.
1✔
72
        if err := migrateOpenChanBucket(tx); err != nil {
1✔
73
                return err
×
74
        }
×
75

76
        // Next, we'll update all the present close channel summaries as well.
77
        if err := migrateCloseChanSummaries(tx); err != nil {
1✔
78
                return err
×
79
        }
×
80

81
        // We'll migrate forwarding packages, which have log updates as part of
82
        // their serialized data.
83
        if err := migrateForwardingPackages(tx); err != nil {
1✔
84
                return err
×
85
        }
×
86

87
        // Finally, we'll update the pending network results as well.
88
        return migrateNetworkResults(tx)
1✔
89
}
90

91
// migrateOpenChanBucket walks every open channel bucket (nested as
// node pubkey -> chain hash -> channel point) and re-encodes any pending
// commit diff, unsigned acked updates, and remote unsigned local updates it
// finds using the new length-prefixed encoding. Collection and mutation are
// done in two distinct passes, since modifying buckets from inside a ForEach
// closure is unsafe.
func migrateOpenChanBucket(tx kvdb.RwTx) error {
	openChanBucket := tx.ReadWriteBucket(openChannelBucket)

	// If no bucket is found, we can exit early.
	if openChanBucket == nil {
		return nil
	}

	// channelPath records the full key path down to a single channel
	// bucket so it can be revisited for mutation once iteration is done.
	type channelPath struct {
		nodePub   []byte
		chainHash []byte
		chanPoint []byte
	}
	var channelPaths []channelPath
	err := openChanBucket.ForEach(func(nodePub, v []byte) error {
		// Ensure that this is a key the same size as a pubkey, and
		// also that it leads directly to a bucket (ForEach yields a
		// nil value for nested-bucket entries).
		if len(nodePub) != 33 || v != nil {
			return nil
		}

		nodeChanBucket := openChanBucket.NestedReadBucket(nodePub)
		if nodeChanBucket == nil {
			return fmt.Errorf("no bucket for node %x", nodePub)
		}

		// The next layer down is all the chains that this node
		// has channels on with us.
		return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
			// If there's a value, it's not a bucket so
			// ignore it.
			if v != nil {
				return nil
			}

			chainBucket := nodeChanBucket.NestedReadBucket(
				chainHash,
			)
			if chainBucket == nil {
				return fmt.Errorf("unable to read "+
					"bucket for chain=%x", chainHash)
			}

			// The innermost layer keys each channel bucket by its
			// channel point.
			return chainBucket.ForEach(func(chanPoint, v []byte) error {
				// If there's a value, it's not a bucket so
				// ignore it.
				if v != nil {
					return nil
				}

				channelPaths = append(channelPaths, channelPath{
					nodePub:   nodePub,
					chainHash: chainHash,
					chanPoint: chanPoint,
				})

				return nil
			})
		})
	})
	if err != nil {
		return err
	}

	// Now that we have all the paths of the channel we need to migrate,
	// we'll update all the state in a distinct step to avoid weird
	// behavior from modifying buckets in a ForEach statement.
	for _, channelPath := range channelPaths {
		// First, we'll extract it from the node's chain bucket. The
		// nested buckets are assumed to still exist since the paths
		// were collected within this same transaction.
		nodeChanBucket := openChanBucket.NestedReadWriteBucket(
			channelPath.nodePub,
		)
		chainBucket := nodeChanBucket.NestedReadWriteBucket(
			channelPath.chainHash,
		)
		chanBucket := chainBucket.NestedReadWriteBucket(
			channelPath.chanPoint,
		)

		// At this point, we have the channel bucket now, so we'll
		// check to see if this channel has a pending commitment or
		// not.
		commitDiffBytes := chanBucket.Get(commitDiffKey)
		if commitDiffBytes != nil {
			// Now that we have the commit diff in the _old_
			// encoding, we'll write it back to disk using the new
			// encoding which has a length prefix in front of the
			// CommitSig.
			commitDiff, err := legacy.DeserializeCommitDiff(
				bytes.NewReader(commitDiffBytes),
			)
			if err != nil {
				return err
			}

			var b bytes.Buffer
			err = current.SerializeCommitDiff(&b, commitDiff)
			if err != nil {
				return err
			}

			err = chanBucket.Put(commitDiffKey, b.Bytes())
			if err != nil {
				return err
			}
		}

		// With the commit diff migrated, we'll now check to see if
		// there're any un-acked updates we need to migrate as well.
		updateBytes := chanBucket.Get(unsignedAckedUpdatesKey)
		if updateBytes != nil {
			// We have un-acked updates we need to migrate so we'll
			// decode then re-encode them here using the new
			// format.
			legacyUnackedUpdates, err := legacy.DeserializeLogUpdates(
				bytes.NewReader(updateBytes),
			)
			if err != nil {
				return err
			}

			var b bytes.Buffer
			err = current.SerializeLogUpdates(&b, legacyUnackedUpdates)
			if err != nil {
				return err
			}

			err = chanBucket.Put(unsignedAckedUpdatesKey, b.Bytes())
			if err != nil {
				return err
			}
		}

		// Remote unsigned updates as well.
		updateBytes = chanBucket.Get(remoteUnsignedLocalUpdatesKey)
		if updateBytes != nil {
			legacyUnsignedUpdates, err := legacy.DeserializeLogUpdates(
				bytes.NewReader(updateBytes),
			)
			if err != nil {
				return err
			}

			var b bytes.Buffer
			err = current.SerializeLogUpdates(&b, legacyUnsignedUpdates)
			if err != nil {
				return err
			}

			err = chanBucket.Put(remoteUnsignedLocalUpdatesKey, b.Bytes())
			if err != nil {
				return err
			}
		}
	}

	return nil
}
249

250
func migrateCloseChanSummaries(tx kvdb.RwTx) error {
1✔
251
        closedChanBucket := tx.ReadWriteBucket(closedChannelBucket)
1✔
252

1✔
253
        // Exit early if bucket is not found.
1✔
254
        if closedChannelBucket == nil {
1✔
255
                return nil
×
256
        }
×
257

258
        type closedChan struct {
1✔
259
                chanKey      []byte
1✔
260
                summaryBytes []byte
1✔
261
        }
1✔
262
        var closedChans []closedChan
1✔
263
        err := closedChanBucket.ForEach(func(k, v []byte) error {
2✔
264
                closedChans = append(closedChans, closedChan{
1✔
265
                        chanKey:      k,
1✔
266
                        summaryBytes: v,
1✔
267
                })
1✔
268
                return nil
1✔
269
        })
1✔
270
        if err != nil {
1✔
271
                return err
×
272
        }
×
273

274
        for _, closedChan := range closedChans {
2✔
275
                oldSummary, err := legacy.DeserializeCloseChannelSummary(
1✔
276
                        bytes.NewReader(closedChan.summaryBytes),
1✔
277
                )
1✔
278
                if err != nil {
1✔
279
                        return err
×
280
                }
×
281

282
                var newSummaryBytes bytes.Buffer
1✔
283
                err = current.SerializeChannelCloseSummary(
1✔
284
                        &newSummaryBytes, oldSummary,
1✔
285
                )
1✔
286
                if err != nil {
1✔
287
                        return err
×
288
                }
×
289

290
                err = closedChanBucket.Put(
1✔
291
                        closedChan.chanKey, newSummaryBytes.Bytes(),
1✔
292
                )
1✔
293
                if err != nil {
1✔
294
                        return err
×
295
                }
×
296
        }
297
        return nil
1✔
298
}
299

300
// migrateForwardingPackages rewrites all forwarding packages on disk using
// the current encoding. Forwarding packages contain log updates as part of
// their serialized data, so each package is loaded via the legacy packager
// and written back via the current one.
func migrateForwardingPackages(tx kvdb.RwTx) error {
	fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)

	// Exit early if bucket is not found.
	if fwdPkgBkt == nil {
		return nil
	}

	// We go through the bucket and fetch all short channel IDs. Each
	// top-level key is a big-endian uint64 short channel ID.
	var sources []lnwire.ShortChannelID
	err := fwdPkgBkt.ForEach(func(k, v []byte) error {
		source := lnwire.NewShortChanIDFromInt(byteOrder.Uint64(k))
		sources = append(sources, source)
		return nil
	})
	if err != nil {
		return err
	}

	// Now load all forwarding packages using the legacy encoding.
	var pkgsToMigrate []*common.FwdPkg
	for _, source := range sources {
		packager := legacy.NewChannelPackager(source)
		fwdPkgs, err := packager.LoadFwdPkgs(tx)
		if err != nil {
			return err
		}

		pkgsToMigrate = append(pkgsToMigrate, fwdPkgs...)
	}

	// Add back the packages using the current encoding. This is done as
	// a separate pass so all legacy reads complete before any writes.
	for _, pkg := range pkgsToMigrate {
		packager := current.NewChannelPackager(pkg.Source)
		err := packager.AddFwdPkg(tx, pkg)
		if err != nil {
			return err
		}
	}

	return nil
}
342

343
func migrateNetworkResults(tx kvdb.RwTx) error {
1✔
344
        networkResults := tx.ReadWriteBucket(networkResultStoreBucketKey)
1✔
345

1✔
346
        // Exit early if bucket is not found.
1✔
347
        if networkResults == nil {
1✔
348
                return nil
×
349
        }
×
350

351
        // Similar to the prior migrations, we'll do this one in two phases:
352
        // we'll first grab all the keys we need to migrate in one loop, then
353
        // update them all in another loop.
354
        var netResultsToMigrate [][2][]byte
1✔
355
        err := networkResults.ForEach(func(k, v []byte) error {
2✔
356
                netResultsToMigrate = append(netResultsToMigrate, [2][]byte{
1✔
357
                        k, v,
1✔
358
                })
1✔
359
                return nil
1✔
360
        })
1✔
361
        if err != nil {
1✔
362
                return err
×
363
        }
×
364

365
        for _, netResult := range netResultsToMigrate {
2✔
366
                resKey := netResult[0]
1✔
367
                resBytes := netResult[1]
1✔
368
                oldResult, err := legacy.DeserializeNetworkResult(
1✔
369
                        bytes.NewReader(resBytes),
1✔
370
                )
1✔
371
                if err != nil {
1✔
372
                        return err
×
373
                }
×
374

375
                var newResultBuf bytes.Buffer
1✔
376
                err = current.SerializeNetworkResult(&newResultBuf, oldResult)
1✔
377
                if err != nil {
1✔
378
                        return err
×
379
                }
×
380

381
                err = networkResults.Put(resKey, newResultBuf.Bytes())
1✔
382
                if err != nil {
1✔
383
                        return err
×
384
                }
×
385
        }
386
        return nil
1✔
387
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc