• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15561477203

10 Jun 2025 01:54PM UTC coverage: 58.351% (-10.1%) from 68.487%
15561477203

Pull #9356

github

web-flow
Merge 6440b25db into c6d6d4c0b
Pull Request #9356: lnrpc: add incoming/outgoing channel ids filter to forwarding history request

33 of 36 new or added lines in 2 files covered. (91.67%)

28366 existing lines in 455 files now uncovered.

97715 of 167461 relevant lines covered (58.35%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/channeldb/migration21/migration.go
1
package migration21
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "fmt"
7

8
        lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21"
9
        "github.com/lightningnetwork/lnd/channeldb/migration21/common"
10
        "github.com/lightningnetwork/lnd/channeldb/migration21/current"
11
        "github.com/lightningnetwork/lnd/channeldb/migration21/legacy"
12
        "github.com/lightningnetwork/lnd/kvdb"
13
)
14

15
var (
16
        byteOrder = binary.BigEndian
17

18
        // openChanBucket stores all the currently open channels. This bucket
19
        // has a second, nested bucket which is keyed by a node's ID. Within
20
        // that node ID bucket, all attributes required to track, update, and
21
        // close a channel are stored.
22
        //
23
        // openChan -> nodeID -> chanPoint
24
        //
25
        // TODO(roasbeef): flesh out comment
26
        openChannelBucket = []byte("open-chan-bucket")
27

28
        // commitDiffKey stores the current pending commitment state we've
29
        // extended to the remote party (if any). Each time we propose a new
30
        // state, we store the information necessary to reconstruct this state
31
        // from the prior commitment. This allows us to resync the remote party
32
        // to their expected state in the case of message loss.
33
        //
34
        // TODO(roasbeef): rename to commit chain?
35
        commitDiffKey = []byte("commit-diff-key")
36

37
        // unsignedAckedUpdatesKey is an entry in the channel bucket that
38
        // contains the remote updates that we have acked, but not yet signed
39
        // for in one of our remote commits.
40
        unsignedAckedUpdatesKey = []byte("unsigned-acked-updates-key")
41

42
        // remoteUnsignedLocalUpdatesKey is an entry in the channel bucket that
43
        // contains the local updates that the remote party has acked, but
44
        // has not yet signed for in one of their local commits.
45
        remoteUnsignedLocalUpdatesKey = []byte("remote-unsigned-local-updates-key")
46

47
        // networkResultStoreBucketKey is used for the root level bucket that
48
        // stores the network result for each payment ID.
49
        networkResultStoreBucketKey = []byte("network-result-store-bucket")
50

51
        // closedChannelBucket stores summarization information concerning
52
        // previously open, but now closed channels.
53
        closedChannelBucket = []byte("closed-chan-bucket")
54

55
        // fwdPackagesKey is the root-level bucket that all forwarding packages
56
        // are written. This bucket is further subdivided based on the short
57
        // channel ID of each channel.
58
        fwdPackagesKey = []byte("fwd-packages")
59
)
60

61
// MigrateDatabaseWireMessages performs a migration in all areas that we
62
// currently store wire messages without length prefixes. This includes the
63
// CommitDiff struct, ChannelCloseSummary, LogUpdates, and also the
64
// networkResult struct as well.
UNCOV
65
func MigrateDatabaseWireMessages(tx kvdb.RwTx) error {
×
UNCOV
66
        // The migration will proceed in three phases: we'll need to update any
×
UNCOV
67
        // pending commit diffs, then any unsigned acked updates for all open
×
UNCOV
68
        // channels, then finally we'll need to update all the current
×
UNCOV
69
        // stored network results for payments in the switch.
×
UNCOV
70
        //
×
UNCOV
71
        // In this phase, we'll migrate the open channel data.
×
UNCOV
72
        if err := migrateOpenChanBucket(tx); err != nil {
×
73
                return err
×
74
        }
×
75

76
        // Next, we'll update all the present close channel summaries as well.
UNCOV
77
        if err := migrateCloseChanSummaries(tx); err != nil {
×
78
                return err
×
79
        }
×
80

81
        // We'll migrate forwarding packages, which have log updates as part of
82
        // their serialized data.
UNCOV
83
        if err := migrateForwardingPackages(tx); err != nil {
×
84
                return err
×
85
        }
×
86

87
        // Finally, we'll update the pending network results as well.
UNCOV
88
        return migrateNetworkResults(tx)
×
89
}
90

UNCOV
91
func migrateOpenChanBucket(tx kvdb.RwTx) error {
×
UNCOV
92
        openChanBucket := tx.ReadWriteBucket(openChannelBucket)
×
UNCOV
93

×
UNCOV
94
        // If no bucket is found, we can exit early.
×
UNCOV
95
        if openChanBucket == nil {
×
96
                return nil
×
97
        }
×
98

UNCOV
99
        type channelPath struct {
×
UNCOV
100
                nodePub   []byte
×
UNCOV
101
                chainHash []byte
×
UNCOV
102
                chanPoint []byte
×
UNCOV
103
        }
×
UNCOV
104
        var channelPaths []channelPath
×
UNCOV
105
        err := openChanBucket.ForEach(func(nodePub, v []byte) error {
×
UNCOV
106
                // Ensure that this is a key the same size as a pubkey, and
×
UNCOV
107
                // also that it leads directly to a bucket.
×
UNCOV
108
                if len(nodePub) != 33 || v != nil {
×
109
                        return nil
×
110
                }
×
111

UNCOV
112
                nodeChanBucket := openChanBucket.NestedReadBucket(nodePub)
×
UNCOV
113
                if nodeChanBucket == nil {
×
114
                        return fmt.Errorf("no bucket for node %x", nodePub)
×
115
                }
×
116

117
                // The next layer down is all the chains that this node
118
                // has channels on with us.
UNCOV
119
                return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
×
UNCOV
120
                        // If there's a value, it's not a bucket so
×
UNCOV
121
                        // ignore it.
×
UNCOV
122
                        if v != nil {
×
123
                                return nil
×
124
                        }
×
125

UNCOV
126
                        chainBucket := nodeChanBucket.NestedReadBucket(
×
UNCOV
127
                                chainHash,
×
UNCOV
128
                        )
×
UNCOV
129
                        if chainBucket == nil {
×
130
                                return fmt.Errorf("unable to read "+
×
131
                                        "bucket for chain=%x", chainHash)
×
132
                        }
×
133

UNCOV
134
                        return chainBucket.ForEach(func(chanPoint, v []byte) error {
×
UNCOV
135
                                // If there's a value, it's not a bucket so
×
UNCOV
136
                                // ignore it.
×
UNCOV
137
                                if v != nil {
×
138
                                        return nil
×
139
                                }
×
140

UNCOV
141
                                channelPaths = append(channelPaths, channelPath{
×
UNCOV
142
                                        nodePub:   nodePub,
×
UNCOV
143
                                        chainHash: chainHash,
×
UNCOV
144
                                        chanPoint: chanPoint,
×
UNCOV
145
                                })
×
UNCOV
146

×
UNCOV
147
                                return nil
×
148
                        })
149
                })
150
        })
UNCOV
151
        if err != nil {
×
152
                return err
×
153
        }
×
154

155
        // Now that we have all the paths of the channel we need to migrate,
156
        // we'll update all the state in a distinct step to avoid weird
157
        // behavior from  modifying buckets in a ForEach statement.
UNCOV
158
        for _, channelPath := range channelPaths {
×
UNCOV
159
                // First, we'll extract it from the node's chain bucket.
×
UNCOV
160
                nodeChanBucket := openChanBucket.NestedReadWriteBucket(
×
UNCOV
161
                        channelPath.nodePub,
×
UNCOV
162
                )
×
UNCOV
163
                chainBucket := nodeChanBucket.NestedReadWriteBucket(
×
UNCOV
164
                        channelPath.chainHash,
×
UNCOV
165
                )
×
UNCOV
166
                chanBucket := chainBucket.NestedReadWriteBucket(
×
UNCOV
167
                        channelPath.chanPoint,
×
UNCOV
168
                )
×
UNCOV
169

×
UNCOV
170
                // At this point, we have the channel bucket now, so we'll
×
UNCOV
171
                // check to see if this channel has a pending commitment or
×
UNCOV
172
                // not.
×
UNCOV
173
                commitDiffBytes := chanBucket.Get(commitDiffKey)
×
UNCOV
174
                if commitDiffBytes != nil {
×
UNCOV
175
                        // Now that we have the commit diff in the _old_
×
UNCOV
176
                        // encoding, we'll write it back to disk using the new
×
UNCOV
177
                        // encoding which has a length prefix in front of the
×
UNCOV
178
                        // CommitSig.
×
UNCOV
179
                        commitDiff, err := legacy.DeserializeCommitDiff(
×
UNCOV
180
                                bytes.NewReader(commitDiffBytes),
×
UNCOV
181
                        )
×
UNCOV
182
                        if err != nil {
×
183
                                return err
×
184
                        }
×
185

UNCOV
186
                        var b bytes.Buffer
×
UNCOV
187
                        err = current.SerializeCommitDiff(&b, commitDiff)
×
UNCOV
188
                        if err != nil {
×
189
                                return err
×
190
                        }
×
191

UNCOV
192
                        err = chanBucket.Put(commitDiffKey, b.Bytes())
×
UNCOV
193
                        if err != nil {
×
194
                                return err
×
195
                        }
×
196
                }
197

198
                // With the commit diff migrated, we'll now check to see if
199
                // there're any un-acked updates we need to migrate as well.
UNCOV
200
                updateBytes := chanBucket.Get(unsignedAckedUpdatesKey)
×
UNCOV
201
                if updateBytes != nil {
×
UNCOV
202
                        // We have un-acked updates we need to migrate so we'll
×
UNCOV
203
                        // decode then re-encode them here using the new
×
UNCOV
204
                        // format.
×
UNCOV
205
                        legacyUnackedUpdates, err := legacy.DeserializeLogUpdates(
×
UNCOV
206
                                bytes.NewReader(updateBytes),
×
UNCOV
207
                        )
×
UNCOV
208
                        if err != nil {
×
209
                                return err
×
210
                        }
×
211

UNCOV
212
                        var b bytes.Buffer
×
UNCOV
213
                        err = current.SerializeLogUpdates(&b, legacyUnackedUpdates)
×
UNCOV
214
                        if err != nil {
×
215
                                return err
×
216
                        }
×
217

UNCOV
218
                        err = chanBucket.Put(unsignedAckedUpdatesKey, b.Bytes())
×
UNCOV
219
                        if err != nil {
×
220
                                return err
×
221
                        }
×
222
                }
223

224
                // Remote unsigned updates as well.
UNCOV
225
                updateBytes = chanBucket.Get(remoteUnsignedLocalUpdatesKey)
×
UNCOV
226
                if updateBytes != nil {
×
UNCOV
227
                        legacyUnsignedUpdates, err := legacy.DeserializeLogUpdates(
×
UNCOV
228
                                bytes.NewReader(updateBytes),
×
UNCOV
229
                        )
×
UNCOV
230
                        if err != nil {
×
231
                                return err
×
232
                        }
×
233

UNCOV
234
                        var b bytes.Buffer
×
UNCOV
235
                        err = current.SerializeLogUpdates(&b, legacyUnsignedUpdates)
×
UNCOV
236
                        if err != nil {
×
237
                                return err
×
238
                        }
×
239

UNCOV
240
                        err = chanBucket.Put(remoteUnsignedLocalUpdatesKey, b.Bytes())
×
UNCOV
241
                        if err != nil {
×
242
                                return err
×
243
                        }
×
244
                }
245
        }
246

UNCOV
247
        return nil
×
248
}
249

UNCOV
250
func migrateCloseChanSummaries(tx kvdb.RwTx) error {
×
UNCOV
251
        closedChanBucket := tx.ReadWriteBucket(closedChannelBucket)
×
UNCOV
252

×
UNCOV
253
        // Exit early if bucket is not found.
×
UNCOV
254
        if closedChannelBucket == nil {
×
255
                return nil
×
256
        }
×
257

UNCOV
258
        type closedChan struct {
×
UNCOV
259
                chanKey      []byte
×
UNCOV
260
                summaryBytes []byte
×
UNCOV
261
        }
×
UNCOV
262
        var closedChans []closedChan
×
UNCOV
263
        err := closedChanBucket.ForEach(func(k, v []byte) error {
×
UNCOV
264
                closedChans = append(closedChans, closedChan{
×
UNCOV
265
                        chanKey:      k,
×
UNCOV
266
                        summaryBytes: v,
×
UNCOV
267
                })
×
UNCOV
268
                return nil
×
UNCOV
269
        })
×
UNCOV
270
        if err != nil {
×
271
                return err
×
272
        }
×
273

UNCOV
274
        for _, closedChan := range closedChans {
×
UNCOV
275
                oldSummary, err := legacy.DeserializeCloseChannelSummary(
×
UNCOV
276
                        bytes.NewReader(closedChan.summaryBytes),
×
UNCOV
277
                )
×
UNCOV
278
                if err != nil {
×
279
                        return err
×
280
                }
×
281

UNCOV
282
                var newSummaryBytes bytes.Buffer
×
UNCOV
283
                err = current.SerializeChannelCloseSummary(
×
UNCOV
284
                        &newSummaryBytes, oldSummary,
×
UNCOV
285
                )
×
UNCOV
286
                if err != nil {
×
287
                        return err
×
288
                }
×
289

UNCOV
290
                err = closedChanBucket.Put(
×
UNCOV
291
                        closedChan.chanKey, newSummaryBytes.Bytes(),
×
UNCOV
292
                )
×
UNCOV
293
                if err != nil {
×
294
                        return err
×
295
                }
×
296
        }
UNCOV
297
        return nil
×
298
}
299

UNCOV
300
func migrateForwardingPackages(tx kvdb.RwTx) error {
×
UNCOV
301
        fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
×
UNCOV
302

×
UNCOV
303
        // Exit early if bucket is not found.
×
UNCOV
304
        if fwdPkgBkt == nil {
×
305
                return nil
×
306
        }
×
307

308
        // We Go through the bucket and fetches all short channel IDs.
UNCOV
309
        var sources []lnwire.ShortChannelID
×
UNCOV
310
        err := fwdPkgBkt.ForEach(func(k, v []byte) error {
×
UNCOV
311
                source := lnwire.NewShortChanIDFromInt(byteOrder.Uint64(k))
×
UNCOV
312
                sources = append(sources, source)
×
UNCOV
313
                return nil
×
UNCOV
314
        })
×
UNCOV
315
        if err != nil {
×
316
                return err
×
317
        }
×
318

319
        // Now load all forwarding packages using the legacy encoding.
UNCOV
320
        var pkgsToMigrate []*common.FwdPkg
×
UNCOV
321
        for _, source := range sources {
×
UNCOV
322
                packager := legacy.NewChannelPackager(source)
×
UNCOV
323
                fwdPkgs, err := packager.LoadFwdPkgs(tx)
×
UNCOV
324
                if err != nil {
×
325
                        return err
×
326
                }
×
327

UNCOV
328
                pkgsToMigrate = append(pkgsToMigrate, fwdPkgs...)
×
329
        }
330

331
        // Add back the packages using the current encoding.
UNCOV
332
        for _, pkg := range pkgsToMigrate {
×
UNCOV
333
                packager := current.NewChannelPackager(pkg.Source)
×
UNCOV
334
                err := packager.AddFwdPkg(tx, pkg)
×
UNCOV
335
                if err != nil {
×
336
                        return err
×
337
                }
×
338
        }
339

UNCOV
340
        return nil
×
341
}
342

UNCOV
343
func migrateNetworkResults(tx kvdb.RwTx) error {
×
UNCOV
344
        networkResults := tx.ReadWriteBucket(networkResultStoreBucketKey)
×
UNCOV
345

×
UNCOV
346
        // Exit early if bucket is not found.
×
UNCOV
347
        if networkResults == nil {
×
348
                return nil
×
349
        }
×
350

351
        // Similar to the prior migrations, we'll do this one in two phases:
352
        // we'll first grab all the keys we need to migrate in one loop, then
353
        // update them all in another loop.
UNCOV
354
        var netResultsToMigrate [][2][]byte
×
UNCOV
355
        err := networkResults.ForEach(func(k, v []byte) error {
×
UNCOV
356
                netResultsToMigrate = append(netResultsToMigrate, [2][]byte{
×
UNCOV
357
                        k, v,
×
UNCOV
358
                })
×
UNCOV
359
                return nil
×
UNCOV
360
        })
×
UNCOV
361
        if err != nil {
×
362
                return err
×
363
        }
×
364

UNCOV
365
        for _, netResult := range netResultsToMigrate {
×
UNCOV
366
                resKey := netResult[0]
×
UNCOV
367
                resBytes := netResult[1]
×
UNCOV
368
                oldResult, err := legacy.DeserializeNetworkResult(
×
UNCOV
369
                        bytes.NewReader(resBytes),
×
UNCOV
370
                )
×
UNCOV
371
                if err != nil {
×
372
                        return err
×
373
                }
×
374

UNCOV
375
                var newResultBuf bytes.Buffer
×
UNCOV
376
                err = current.SerializeNetworkResult(&newResultBuf, oldResult)
×
UNCOV
377
                if err != nil {
×
378
                        return err
×
379
                }
×
380

UNCOV
381
                err = networkResults.Put(resKey, newResultBuf.Bytes())
×
UNCOV
382
                if err != nil {
×
383
                        return err
×
384
                }
×
385
        }
UNCOV
386
        return nil
×
387
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc