• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14193549836

01 Apr 2025 10:40AM UTC coverage: 69.046% (+0.007%) from 69.039%
14193549836

Pull #9665

github

web-flow
Merge e8825f209 into b01f4e514
Pull Request #9665: kvdb: bump etcd libs to v3.5.12

133439 of 193262 relevant lines covered (69.05%)

22119.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.48
/channeldb/migration30/iterator.go
1
package migration30
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8

9
        mig25 "github.com/lightningnetwork/lnd/channeldb/migration25"
10
        "github.com/lightningnetwork/lnd/kvdb"
11
)
12

13
var (
14
        // openChannelBucket stores all the currently open channels. This
15
        // bucket has a second, nested bucket which is keyed by a node's ID.
16
        // Within that node ID bucket, all attributes required to track,
17
        // update, and close a channel are stored.
18
        openChannelBucket = []byte("open-chan-bucket")
19

20
        // errExit is the sentinel error returned by a callback function used
21
        // in iterator when it needs to exit the iteration early.
22
        errExit = errors.New("exit condition met")
23
)
24

25
// updateLocator defines a locator that can be used to find the next record to
26
// be migrated. This is useful when an interrupted migration has left mixed
27
// revocation log formats saved in our database: we can then restart the
28
// migration using the locator to continue migrating the rest.
29
type updateLocator struct {
30
        // nodePub, chainHash and fundingOutpoint are the nested bucket keys
31
        // (node, chain, channel) used to locate the channel bucket.
32
        nodePub         []byte
33
        chainHash       []byte
34
        fundingOutpoint []byte
35

36
        // nextHeight is used to locate the next old revocation log to be
37
        // migrated. A nil value means we've finished the migration.
38
        nextHeight []byte
39
}
40

41
// fetchChanBucket is a helper function that returns the bucket where a
42
// channel's data resides in given: the public key for the node, the outpoint,
43
// and the chainhash that the channel resides on.
44
func (ul *updateLocator) locateChanBucket(rootBucket kvdb.RwBucket) (
45
        kvdb.RwBucket, error) {
4,186✔
46

4,186✔
47
        // Within this top level bucket, fetch the bucket dedicated to storing
4,186✔
48
        // open channel data specific to the remote node.
4,186✔
49
        nodeChanBucket := rootBucket.NestedReadWriteBucket(ul.nodePub)
4,186✔
50
        if nodeChanBucket == nil {
4,187✔
51
                return nil, mig25.ErrNoActiveChannels
1✔
52
        }
1✔
53

54
        // We'll then recurse down an additional layer in order to fetch the
55
        // bucket for this particular chain.
56
        chainBucket := nodeChanBucket.NestedReadWriteBucket(ul.chainHash)
4,185✔
57
        if chainBucket == nil {
4,186✔
58
                return nil, mig25.ErrNoActiveChannels
1✔
59
        }
1✔
60

61
        // With the bucket for the node and chain fetched, we can now go down
62
        // another level, for this channel itself.
63
        chanBucket := chainBucket.NestedReadWriteBucket(ul.fundingOutpoint)
4,184✔
64
        if chanBucket == nil {
4,185✔
65
                return nil, mig25.ErrChannelNotFound
1✔
66
        }
1✔
67

68
        return chanBucket, nil
4,183✔
69
}
70

71
// findNextMigrateHeight finds the next commit height that's not migrated. It
72
// returns the commit height bytes found. A nil return value means the
73
// migration has been completed for this particular channel bucket.
74
func findNextMigrateHeight(chanBucket kvdb.RwBucket) []byte {
10,167✔
75
        // Read the old log bucket. The old bucket doesn't exist, indicating
10,167✔
76
        // either we don't have any old logs for this channel, or the migration
10,167✔
77
        // has been finished and the old bucket has been deleted.
10,167✔
78
        oldBucket := chanBucket.NestedReadBucket(
10,167✔
79
                revocationLogBucketDeprecated,
10,167✔
80
        )
10,167✔
81
        if oldBucket == nil {
12,449✔
82
                return nil
2,282✔
83
        }
2,282✔
84

85
        // Acquire a read cursor for the old bucket.
86
        oldCursor := oldBucket.ReadCursor()
7,885✔
87

7,885✔
88
        // Read the new log bucket. The sub-bucket hasn't been created yet,
7,885✔
89
        // indicating we haven't migrated any logs under this channel. In this
7,885✔
90
        // case, we'll return the first commit height found from the old
7,885✔
91
        // revocation log bucket as the next height.
7,885✔
92
        logBucket := chanBucket.NestedReadBucket(revocationLogBucket)
7,885✔
93
        if logBucket == nil {
8,642✔
94
                nextHeight, _ := oldCursor.First()
757✔
95
                return nextHeight
757✔
96
        }
757✔
97

98
        // Acquire a read cursor for the new bucket.
99
        cursor := logBucket.ReadCursor()
7,128✔
100

7,128✔
101
        // Read the last migrated record. If the key is nil, we haven't
7,128✔
102
        // migrated any logs yet. In this case we return the first commit
7,128✔
103
        // height found from the old revocation log bucket. For instance,
7,128✔
104
        // - old log: [1, 2]
7,128✔
105
        // - new log: []
7,128✔
106
        // We will return the first key [1].
7,128✔
107
        migratedHeight, _ := cursor.Last()
7,128✔
108
        if migratedHeight == nil {
7,128✔
109
                nextHeight, _ := oldCursor.First()
×
110
                return nextHeight
×
111
        }
×
112

113
        // Read the last height from the old log bucket.
114
        endHeight, _ := oldCursor.Last()
7,128✔
115

7,128✔
116
        switch bytes.Compare(migratedHeight, endHeight) {
7,128✔
117
        // If the height of the last old revocation equals to the migrated
118
        // height, we've done migrating for this channel. For instance,
119
        // - old log: [1, 2]
120
        // - new log: [1, 2]
121
        case 0:
6,365✔
122
                return nil
6,365✔
123

124
        // If the migrated height is smaller, it means this is a resumed
125
        // migration. In this case we will return the next height found in the
126
        // old bucket. For instance,
127
        // - old log: [1, 2]
128
        // - new log: [1]
129
        // We will return the key [2].
130
        case -1:
759✔
131
                // Now point the cursor to the migratedHeight. If we cannot
759✔
132
                // find this key from the old log bucket, the database might be
759✔
133
                // corrupted. In this case, we would return the first key so
759✔
134
                // that we would redo the migration for this chan bucket.
759✔
135
                matchedHeight, _ := oldCursor.Seek(migratedHeight)
759✔
136

759✔
137
                // NOTE: because Seek will return the next key when the passed
759✔
138
                // key cannot be found, we need to compare the `matchedHeight`
759✔
139
                // to decide whether `migratedHeight` is found or not.
759✔
140
                if !bytes.Equal(matchedHeight, migratedHeight) {
759✔
141
                        log.Warnf("Old revocation bucket doesn't have "+
×
142
                                "CommitHeight=%v yet it's found in the new "+
×
143
                                "bucket. It's likely the new revocation log "+
×
144
                                "bucket is corrupted. Migrations will be"+
×
145
                                "applied again.",
×
146
                                binary.BigEndian.Uint64(migratedHeight))
×
147

×
148
                        // Now return the first height found in the old bucket
×
149
                        // so we can redo the migration.
×
150
                        nextHeight, _ := oldCursor.First()
×
151
                        return nextHeight
×
152
                }
×
153

154
                // Otherwise, find the next height to be migrated.
155
                nextHeight, _ := oldCursor.Next()
759✔
156
                return nextHeight
759✔
157

158
        // If the migrated height is greater, it means this node has new logs
159
        // saved after v0.15.0. In this case, we need to further decide whether
160
        // the old logs have been migrated or not.
161
        case 1:
4✔
162
        }
163

164
        // If we ever reached here, it means we have a mixed of new and old
165
        // logs saved. Suppose we have old logs as,
166
        //   - old log: [1, 2]
167
        // We'd have four possible scenarios,
168
        //   - new log: [      3, 4] <- no migration happened, return [1].
169
        //   - new log: [1,    3, 4] <- resumed migration, return [2].
170
        //   - new log: [   2, 3, 4] <- corrupted migration, return [1].
171
        //   - new log: [1, 2, 3, 4] <- finished migration, return nil.
172
        // To find the next migration height, we will iterate the old logs to
173
        // grab the heights and query them in the new bucket until an height
174
        // cannot be found, which is our next migration height. Or, if the old
175
        // heights can all be found, it indicates a finished migration.
176

177
        // Move the cursor to the first record.
178
        oldKey, _ := oldCursor.First()
4✔
179

4✔
180
        // NOTE: this action can be time-consuming as we are iterating the
4✔
181
        // records and compare them. However, we would only ever hit here if
4✔
182
        // this is a resumed migration with new logs created after v.0.15.0.
4✔
183
        for {
10✔
184
                // Try to locate the old key in the new bucket. If it cannot be
6✔
185
                // found, it will be the next migrate height.
6✔
186
                newKey, _ := cursor.Seek(oldKey)
6✔
187

6✔
188
                // If the old key is not found in the new bucket, return it as
6✔
189
                // our next migration height.
6✔
190
                //
6✔
191
                // NOTE: because Seek will return the next key when the passed
6✔
192
                // key cannot be found, we need to compare the keys to deicde
6✔
193
                // whether the old key is found or not.
6✔
194
                if !bytes.Equal(newKey, oldKey) {
9✔
195
                        return oldKey
3✔
196
                }
3✔
197

198
                // Otherwise, keep iterating the old bucket.
199
                oldKey, _ = oldCursor.Next()
3✔
200

3✔
201
                // If we've done iterating, yet all the old keys can be found
3✔
202
                // in the new bucket, this means the migration has been
3✔
203
                // finished.
3✔
204
                if oldKey == nil {
4✔
205
                        return nil
1✔
206
                }
1✔
207
        }
208
}
209

210
// locateNextUpdateNum returns a locator that's used to start our migration. A
211
// nil locator means the migration has been finished.
212
func locateNextUpdateNum(openChanBucket kvdb.RwBucket) (*updateLocator, error) {
4,146✔
213
        locator := &updateLocator{}
4,146✔
214

4,146✔
215
        // cb is the callback function to be used when iterating the buckets.
4,146✔
216
        cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error {
14,305✔
217
                locator = l
10,159✔
218

10,159✔
219
                updateNum := findNextMigrateHeight(chanBucket)
10,159✔
220

10,159✔
221
                // We've found the next commit height and can now exit.
10,159✔
222
                if updateNum != nil {
11,673✔
223
                        locator.nextHeight = updateNum
1,514✔
224
                        return errExit
1,514✔
225
                }
1,514✔
226
                return nil
8,645✔
227
        }
228

229
        // Iterate the buckets. If we received an exit signal, return the
230
        // locator.
231
        err := iterateBuckets(openChanBucket, nil, cb)
4,146✔
232
        if err == errExit {
5,660✔
233
                log.Debugf("found locator: nodePub=%x, fundingOutpoint=%x, "+
1,514✔
234
                        "nextHeight=%x", locator.nodePub, locator.chainHash,
1,514✔
235
                        locator.nextHeight)
1,514✔
236
                return locator, nil
1,514✔
237
        }
1,514✔
238

239
        // If the err is nil, we've iterated all the sub-buckets and the
240
        // migration is finished.
241
        return nil, err
2,632✔
242
}
243

244
// callback defines the function type that iterator calls with each key and
// value it visits. A non-nil return value stops the iteration and is returned
// by iterator.
245
type callback func(k, v []byte) error
246

247
// iterator is a helper function that iterates a given bucket and performs the
248
// callback function on each key. If a seeker is specified, it will move the
249
// cursor to the given position otherwise it will start from the first item.
250
func iterator(bucket kvdb.RBucket, seeker []byte, cb callback) error {
46,252✔
251
        c := bucket.ReadCursor()
46,252✔
252
        k, v := c.First()
46,252✔
253

46,252✔
254
        // Move the cursor to the specified position if seeker is non-nil.
46,252✔
255
        if seeker != nil {
51,548✔
256
                k, v = c.Seek(seeker)
5,296✔
257
        }
5,296✔
258

259
        // Start the iteration and exit on condition.
260
        for k, v := k, v; k != nil; k, v = c.Next() {
110,437✔
261
                // cb might return errExit to signal exiting the iteration.
64,185✔
262
                if err := cb(k, v); err != nil {
68,734✔
263
                        return err
4,549✔
264
                }
4,549✔
265
        }
266
        return nil
41,703✔
267
}
268

269
// step defines the callback type that's used when iterating the buckets. It
// is called by iterateBuckets with each channel bucket found and a locator
// holding that bucket's nodePub, chainHash and fundingOutpoint keys.
270
type step func(bucket kvdb.RwBucket, l *updateLocator) error
271

272
// iterateBuckets locates the cursor at a given position specified by the
273
// updateLocator and starts the iteration. If a nil locator is passed, it will
274
// start the iteration from the beginning. During each iteration, the callback
275
// function is called and it may exit the iteration when the callback returns
276
// an errExit to signal an exit condition.
277
func iterateBuckets(openChanBucket kvdb.RwBucket,
278
        l *updateLocator, cb step) error {
9,912✔
279

9,912✔
280
        // If the locator is nil, we will initiate an empty one, which is
9,912✔
281
        // further used by the iterator.
9,912✔
282
        if l == nil {
18,311✔
283
                l = &updateLocator{}
8,399✔
284
        }
8,399✔
285

286
        // iterChanBucket iterates the chain bucket to act on each of the
287
        // channel buckets.
288
        iterChanBucket := func(chain kvdb.RwBucket,
9,912✔
289
                k1, k2, _ []byte, cb step) error {
27,280✔
290

17,368✔
291
                return iterator(
17,368✔
292
                        chain, l.fundingOutpoint,
17,368✔
293
                        func(k3, _ []byte) error {
43,988✔
294
                                // Read the sub-bucket level 3.
26,620✔
295
                                chanBucket := chain.NestedReadWriteBucket(k3)
26,620✔
296
                                if chanBucket == nil {
26,620✔
297
                                        return fmt.Errorf("no bucket for "+
×
298
                                                "chanPoint=%x", k3)
×
299
                                }
×
300

301
                                // Construct a new locator at this position.
302
                                locator := &updateLocator{
26,620✔
303
                                        nodePub:         k1,
26,620✔
304
                                        chainHash:       k2,
26,620✔
305
                                        fundingOutpoint: k3,
26,620✔
306
                                }
26,620✔
307

26,620✔
308
                                // Set the seeker to nil so it won't affect
26,620✔
309
                                // other buckets.
26,620✔
310
                                l.fundingOutpoint = nil
26,620✔
311

26,620✔
312
                                return cb(chanBucket, locator)
26,620✔
313
                        })
314
        }
315

316
        return iterator(openChanBucket, l.nodePub, func(k1, v []byte) error {
27,280✔
317
                // Read the sub-bucket level 1.
17,368✔
318
                node := openChanBucket.NestedReadWriteBucket(k1)
17,368✔
319
                if node == nil {
17,368✔
320
                        return fmt.Errorf("no bucket for node %x", k1)
×
321
                }
×
322

323
                return iterator(node, l.chainHash, func(k2, v []byte) error {
34,736✔
324
                        // Read the sub-bucket level 2.
17,368✔
325
                        chain := node.NestedReadWriteBucket(k2)
17,368✔
326
                        if chain == nil {
17,368✔
327
                                return fmt.Errorf("no bucket for chain=%x", k2)
×
328
                        }
×
329

330
                        // Set the seeker to nil so it won't affect other
331
                        // buckets.
332
                        l.chainHash = nil
17,368✔
333

17,368✔
334
                        return iterChanBucket(chain, k1, k2, v, cb)
17,368✔
335
                })
336
        })
337
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc