• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13980275562

20 Mar 2025 10:06PM UTC coverage: 58.6% (-10.2%) from 68.789%
13980275562

Pull #9623

github

web-flow
Merge b9b960345 into 09b674508
Pull Request #9623: Size msg test msg

0 of 1518 new or added lines in 42 files covered. (0.0%)

26603 existing lines in 443 files now uncovered.

96807 of 165200 relevant lines covered (58.6%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/channeldb/migration30/iterator.go
1
package migration30
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8

9
        mig25 "github.com/lightningnetwork/lnd/channeldb/migration25"
10
        "github.com/lightningnetwork/lnd/kvdb"
11
)
12

13
var (
14
        // openChannelBucket stores all the currently open channels. This bucket
15
        // has a second, nested bucket which is keyed by a node's ID. Within
16
        // that node ID bucket, all attributes required to track, update, and
17
        // close a channel are stored.
18
        openChannelBucket = []byte("open-chan-bucket")
19

20
        // errExit is a sentinel returned by an iteration callback to stop the
21
        // iteration early; it signals an exit condition, not a failure.
22
        errExit = errors.New("exit condition met")
23
)
24

25
// updateLocator defines a locator that can be used to find the next record to
26
// be migrated. This is useful when an interrupted migration has left mixed
27
// revocation log formats saved in our database: we can then restart the
28
// migration using the locator to continue migrating the rest.
29
type updateLocator struct {
30
        // nodePub, chainHash and fundingOutpoint are the nested bucket keys
31
        // (node, then chain, then channel) used to locate the channel bucket.
32
        nodePub         []byte
33
        chainHash       []byte
34
        fundingOutpoint []byte
35

36
        // nextHeight is used to locate the next old revocation log to be
37
        // migrated. A nil value means we've finished the migration.
38
        nextHeight []byte
39
}
40

41
// fetchChanBucket is a helper function that returns the bucket where a
42
// channel's data resides in given: the public key for the node, the outpoint,
43
// and the chainhash that the channel resides on.
44
func (ul *updateLocator) locateChanBucket(rootBucket kvdb.RwBucket) (
UNCOV
45
        kvdb.RwBucket, error) {
×
UNCOV
46

×
UNCOV
47
        // Within this top level bucket, fetch the bucket dedicated to storing
×
UNCOV
48
        // open channel data specific to the remote node.
×
UNCOV
49
        nodeChanBucket := rootBucket.NestedReadWriteBucket(ul.nodePub)
×
UNCOV
50
        if nodeChanBucket == nil {
×
UNCOV
51
                return nil, mig25.ErrNoActiveChannels
×
UNCOV
52
        }
×
53

54
        // We'll then recurse down an additional layer in order to fetch the
55
        // bucket for this particular chain.
UNCOV
56
        chainBucket := nodeChanBucket.NestedReadWriteBucket(ul.chainHash)
×
UNCOV
57
        if chainBucket == nil {
×
UNCOV
58
                return nil, mig25.ErrNoActiveChannels
×
UNCOV
59
        }
×
60

61
        // With the bucket for the node and chain fetched, we can now go down
62
        // another level, for this channel itself.
UNCOV
63
        chanBucket := chainBucket.NestedReadWriteBucket(ul.fundingOutpoint)
×
UNCOV
64
        if chanBucket == nil {
×
UNCOV
65
                return nil, mig25.ErrChannelNotFound
×
UNCOV
66
        }
×
67

UNCOV
68
        return chanBucket, nil
×
69
}
70

71
// findNextMigrateHeight finds the next commit height that's not migrated. It
72
// returns the commit height bytes found. A nil return value means the
73
// migration has been completed for this particular channel bucket.
UNCOV
74
func findNextMigrateHeight(chanBucket kvdb.RwBucket) []byte {
×
UNCOV
75
        // Read the old log bucket. The old bucket doesn't exist, indicating
×
UNCOV
76
        // either we don't have any old logs for this channel, or the migration
×
UNCOV
77
        // has been finished and the old bucket has been deleted.
×
UNCOV
78
        oldBucket := chanBucket.NestedReadBucket(
×
UNCOV
79
                revocationLogBucketDeprecated,
×
UNCOV
80
        )
×
UNCOV
81
        if oldBucket == nil {
×
UNCOV
82
                return nil
×
UNCOV
83
        }
×
84

85
        // Acquire a read cursor for the old bucket.
UNCOV
86
        oldCursor := oldBucket.ReadCursor()
×
UNCOV
87

×
UNCOV
88
        // Read the new log bucket. The sub-bucket hasn't been created yet,
×
UNCOV
89
        // indicating we haven't migrated any logs under this channel. In this
×
UNCOV
90
        // case, we'll return the first commit height found from the old
×
UNCOV
91
        // revocation log bucket as the next height.
×
UNCOV
92
        logBucket := chanBucket.NestedReadBucket(revocationLogBucket)
×
UNCOV
93
        if logBucket == nil {
×
UNCOV
94
                nextHeight, _ := oldCursor.First()
×
UNCOV
95
                return nextHeight
×
UNCOV
96
        }
×
97

98
        // Acquire a read cursor for the new bucket.
UNCOV
99
        cursor := logBucket.ReadCursor()
×
UNCOV
100

×
UNCOV
101
        // Read the last migrated record. If the key is nil, we haven't
×
UNCOV
102
        // migrated any logs yet. In this case we return the first commit
×
UNCOV
103
        // height found from the old revocation log bucket. For instance,
×
UNCOV
104
        // - old log: [1, 2]
×
UNCOV
105
        // - new log: []
×
UNCOV
106
        // We will return the first key [1].
×
UNCOV
107
        migratedHeight, _ := cursor.Last()
×
UNCOV
108
        if migratedHeight == nil {
×
109
                nextHeight, _ := oldCursor.First()
×
110
                return nextHeight
×
111
        }
×
112

113
        // Read the last height from the old log bucket.
UNCOV
114
        endHeight, _ := oldCursor.Last()
×
UNCOV
115

×
UNCOV
116
        switch bytes.Compare(migratedHeight, endHeight) {
×
117
        // If the height of the last old revocation equals to the migrated
118
        // height, we've done migrating for this channel. For instance,
119
        // - old log: [1, 2]
120
        // - new log: [1, 2]
UNCOV
121
        case 0:
×
UNCOV
122
                return nil
×
123

124
        // If the migrated height is smaller, it means this is a resumed
125
        // migration. In this case we will return the next height found in the
126
        // old bucket. For instance,
127
        // - old log: [1, 2]
128
        // - new log: [1]
129
        // We will return the key [2].
UNCOV
130
        case -1:
×
UNCOV
131
                // Now point the cursor to the migratedHeight. If we cannot
×
UNCOV
132
                // find this key from the old log bucket, the database might be
×
UNCOV
133
                // corrupted. In this case, we would return the first key so
×
UNCOV
134
                // that we would redo the migration for this chan bucket.
×
UNCOV
135
                matchedHeight, _ := oldCursor.Seek(migratedHeight)
×
UNCOV
136

×
UNCOV
137
                // NOTE: because Seek will return the next key when the passed
×
UNCOV
138
                // key cannot be found, we need to compare the `matchedHeight`
×
UNCOV
139
                // to decide whether `migratedHeight` is found or not.
×
UNCOV
140
                if !bytes.Equal(matchedHeight, migratedHeight) {
×
141
                        log.Warnf("Old revocation bucket doesn't have "+
×
142
                                "CommitHeight=%v yet it's found in the new "+
×
143
                                "bucket. It's likely the new revocation log "+
×
144
                                "bucket is corrupted. Migrations will be"+
×
145
                                "applied again.",
×
146
                                binary.BigEndian.Uint64(migratedHeight))
×
147

×
148
                        // Now return the first height found in the old bucket
×
149
                        // so we can redo the migration.
×
150
                        nextHeight, _ := oldCursor.First()
×
151
                        return nextHeight
×
152
                }
×
153

154
                // Otherwise, find the next height to be migrated.
UNCOV
155
                nextHeight, _ := oldCursor.Next()
×
UNCOV
156
                return nextHeight
×
157

158
        // If the migrated height is greater, it means this node has new logs
159
        // saved after v0.15.0. In this case, we need to further decide whether
160
        // the old logs have been migrated or not.
UNCOV
161
        case 1:
×
162
        }
163

164
        // If we ever reached here, it means we have a mixed of new and old
165
        // logs saved. Suppose we have old logs as,
166
        //   - old log: [1, 2]
167
        // We'd have four possible scenarios,
168
        //   - new log: [      3, 4] <- no migration happened, return [1].
169
        //   - new log: [1,    3, 4] <- resumed migration, return [2].
170
        //   - new log: [   2, 3, 4] <- corrupted migration, return [1].
171
        //   - new log: [1, 2, 3, 4] <- finished migration, return nil.
172
        // To find the next migration height, we will iterate the old logs to
173
        // grab the heights and query them in the new bucket until an height
174
        // cannot be found, which is our next migration height. Or, if the old
175
        // heights can all be found, it indicates a finished migration.
176

177
        // Move the cursor to the first record.
UNCOV
178
        oldKey, _ := oldCursor.First()
×
UNCOV
179

×
UNCOV
180
        // NOTE: this action can be time-consuming as we are iterating the
×
UNCOV
181
        // records and compare them. However, we would only ever hit here if
×
UNCOV
182
        // this is a resumed migration with new logs created after v.0.15.0.
×
UNCOV
183
        for {
×
UNCOV
184
                // Try to locate the old key in the new bucket. If it cannot be
×
UNCOV
185
                // found, it will be the next migrate height.
×
UNCOV
186
                newKey, _ := cursor.Seek(oldKey)
×
UNCOV
187

×
UNCOV
188
                // If the old key is not found in the new bucket, return it as
×
UNCOV
189
                // our next migration height.
×
UNCOV
190
                //
×
UNCOV
191
                // NOTE: because Seek will return the next key when the passed
×
UNCOV
192
                // key cannot be found, we need to compare the keys to deicde
×
UNCOV
193
                // whether the old key is found or not.
×
UNCOV
194
                if !bytes.Equal(newKey, oldKey) {
×
UNCOV
195
                        return oldKey
×
UNCOV
196
                }
×
197

198
                // Otherwise, keep iterating the old bucket.
UNCOV
199
                oldKey, _ = oldCursor.Next()
×
UNCOV
200

×
UNCOV
201
                // If we've done iterating, yet all the old keys can be found
×
UNCOV
202
                // in the new bucket, this means the migration has been
×
UNCOV
203
                // finished.
×
UNCOV
204
                if oldKey == nil {
×
UNCOV
205
                        return nil
×
UNCOV
206
                }
×
207
        }
208
}
209

210
// locateNextUpdateNum returns a locator that's used to start our migration. A
211
// nil locator means the migration has been finished.
UNCOV
212
func locateNextUpdateNum(openChanBucket kvdb.RwBucket) (*updateLocator, error) {
×
UNCOV
213
        locator := &updateLocator{}
×
UNCOV
214

×
UNCOV
215
        // cb is the callback function to be used when iterating the buckets.
×
UNCOV
216
        cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
×
UNCOV
217
                locator = l
×
UNCOV
218

×
UNCOV
219
                updateNum := findNextMigrateHeight(chanBucket)
×
UNCOV
220

×
UNCOV
221
                // We've found the next commit height and can now exit.
×
UNCOV
222
                if updateNum != nil {
×
UNCOV
223
                        locator.nextHeight = updateNum
×
UNCOV
224
                        return errExit
×
UNCOV
225
                }
×
UNCOV
226
                return nil
×
227
        }
228

229
        // Iterate the buckets. If we received an exit signal, return the
230
        // locator.
UNCOV
231
        err := iterateBuckets(openChanBucket, nil, cb)
×
UNCOV
232
        if err == errExit {
×
UNCOV
233
                log.Debugf("found locator: nodePub=%x, fundingOutpoint=%x, "+
×
UNCOV
234
                        "nextHeight=%x", locator.nodePub, locator.chainHash,
×
UNCOV
235
                        locator.nextHeight)
×
UNCOV
236
                return locator, nil
×
UNCOV
237
        }
×
238

239
        // If the err is nil, we've iterated all the sub-buckets and the
240
        // migration is finished.
UNCOV
241
        return nil, err
×
242
}
243

244
// callback defines the function type that iterator invokes for every
// key/value pair it visits. Returning a non-nil error, errExit included, stops
// the iteration and is passed back to iterator's caller.
245
type callback func(k, v []byte) error
246

247
// iterator is a helper function that iterates a given bucket and performs the
248
// callback function on each key. If a seeker is specified, it will move the
249
// cursor to the given position otherwise it will start from the first item.
UNCOV
250
func iterator(bucket kvdb.RBucket, seeker []byte, cb callback) error {
×
UNCOV
251
        c := bucket.ReadCursor()
×
UNCOV
252
        k, v := c.First()
×
UNCOV
253

×
UNCOV
254
        // Move the cursor to the specified position if seeker is non-nil.
×
UNCOV
255
        if seeker != nil {
×
UNCOV
256
                k, v = c.Seek(seeker)
×
UNCOV
257
        }
×
258

259
        // Start the iteration and exit on condition.
UNCOV
260
        for k, v := k, v; k != nil; k, v = c.Next() {
×
UNCOV
261
                // cb might return errExit to signal exiting the iteration.
×
UNCOV
262
                if err := cb(k, v); err != nil {
×
UNCOV
263
                        return err
×
UNCOV
264
                }
×
265
        }
UNCOV
266
        return nil
×
267
}
268

269
// step defines the callback type that iterateBuckets invokes for every channel
// bucket it reaches. It receives the channel bucket together with a locator
// holding the node, chain and funding outpoint keys of that bucket. Returning
// a non-nil error stops the iteration.
270
type step func(bucket kvdb.RwBucket, l *updateLocator) error
271

272
// iterateBuckets locates the cursor at a given position specified by the
273
// updateLocator and starts the iteration. If a nil locator is passed, it will
274
// start the iteration from the beginning. During each iteration, the callback
275
// function is called and it may exit the iteration when the callback returns
276
// an errExit to signal an exit condition.
277
func iterateBuckets(openChanBucket kvdb.RwBucket,
UNCOV
278
        l *updateLocator, cb step) error {
×
UNCOV
279

×
UNCOV
280
        // If the locator is nil, we will initiate an empty one, which is
×
UNCOV
281
        // further used by the iterator.
×
UNCOV
282
        if l == nil {
×
UNCOV
283
                l = &updateLocator{}
×
UNCOV
284
        }
×
285

286
        // iterChanBucket iterates the chain bucket to act on each of the
287
        // channel buckets.
UNCOV
288
        iterChanBucket := func(chain kvdb.RwBucket,
×
UNCOV
289
                k1, k2, _ []byte, cb step) error {
×
UNCOV
290

×
UNCOV
291
                return iterator(
×
UNCOV
292
                        chain, l.fundingOutpoint,
×
UNCOV
293
                        func(k3, _ []byte) error {
×
UNCOV
294
                                // Read the sub-bucket level 3.
×
UNCOV
295
                                chanBucket := chain.NestedReadWriteBucket(k3)
×
UNCOV
296
                                if chanBucket == nil {
×
297
                                        return fmt.Errorf("no bucket for "+
×
298
                                                "chanPoint=%x", k3)
×
299
                                }
×
300

301
                                // Construct a new locator at this position.
UNCOV
302
                                locator := &updateLocator{
×
UNCOV
303
                                        nodePub:         k1,
×
UNCOV
304
                                        chainHash:       k2,
×
UNCOV
305
                                        fundingOutpoint: k3,
×
UNCOV
306
                                }
×
UNCOV
307

×
UNCOV
308
                                // Set the seeker to nil so it won't affect
×
UNCOV
309
                                // other buckets.
×
UNCOV
310
                                l.fundingOutpoint = nil
×
UNCOV
311

×
UNCOV
312
                                return cb(chanBucket, locator)
×
313
                        })
314
        }
315

UNCOV
316
        return iterator(openChanBucket, l.nodePub, func(k1, v []byte) error {
×
UNCOV
317
                // Read the sub-bucket level 1.
×
UNCOV
318
                node := openChanBucket.NestedReadWriteBucket(k1)
×
UNCOV
319
                if node == nil {
×
320
                        return fmt.Errorf("no bucket for node %x", k1)
×
321
                }
×
322

UNCOV
323
                return iterator(node, l.chainHash, func(k2, v []byte) error {
×
UNCOV
324
                        // Read the sub-bucket level 2.
×
UNCOV
325
                        chain := node.NestedReadWriteBucket(k2)
×
UNCOV
326
                        if chain == nil {
×
327
                                return fmt.Errorf("no bucket for chain=%x", k2)
×
328
                        }
×
329

330
                        // Set the seeker to nil so it won't affect other
331
                        // buckets.
UNCOV
332
                        l.chainHash = nil
×
UNCOV
333

×
UNCOV
334
                        return iterChanBucket(chain, k1, k2, v, cb)
×
335
                })
336
        })
337
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc