• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.52
/watchtower/wtdb/migration4/client_db.go
1
package migration4
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8

9
        "github.com/lightningnetwork/lnd/kvdb"
10
        "github.com/lightningnetwork/lnd/tlv"
11
)
12

13
var (
14
        // cChanDetailsBkt is a top-level bucket storing:
15
        //   channel-id => cChannelSummary -> encoded ClientChanSummary.
16
        //                 => cChanDBID -> db-assigned-id
17
        cChanDetailsBkt = []byte("client-channel-detail-bucket")
18

19
        // cChanDBID is a key used in the cChanDetailsBkt to store the
20
        // db-assigned-id of a channel.
21
        cChanDBID = []byte("client-channel-db-id")
22

23
        // cSessionBkt is a top-level bucket storing:
24
        //   session-id => cSessionBody -> encoded ClientSessionBody
25
        //              => cSessionCommits => seqnum -> encoded CommittedUpdate
26
        //              => cSessionAcks => seqnum -> encoded BackupID
27
        cSessionBkt = []byte("client-session-bucket")
28

29
        // cSessionAcks is a sub-bucket of cSessionBkt storing:
30
        //    seqnum -> encoded BackupID.
31
        cSessionAcks = []byte("client-session-acks")
32

33
        // cSessionAckRangeIndex is a sub-bucket of cSessionBkt storing:
34
        //    chan-id => start -> end
35
        cSessionAckRangeIndex = []byte("client-session-ack-range-index")
36

37
        // ErrUninitializedDB signals that top-level buckets for the database
38
        // have not been initialized.
39
        ErrUninitializedDB = errors.New("db not initialized")
40

41
        // ErrClientSessionNotFound signals that the requested client session
42
        // was not found in the database.
43
        ErrClientSessionNotFound = errors.New("client session not found")
44

45
        // ErrCorruptChanDetails signals that the client's channel details'
46
        // on-disk structure deviates from what is expected.
47
        ErrCorruptChanDetails = errors.New("channel details corrupted")
48

49
        // ErrChannelNotRegistered signals a channel has not yet been registered
50
        // in the client database.
51
        ErrChannelNotRegistered = errors.New("channel not registered")
52

53
        // byteOrder is the default endianness used when serializing integers.
54
        byteOrder = binary.BigEndian
55

56
        // errExit is an error used to signal that the sessionIterator should
57
        // exit.
58
        errExit = errors.New("the exit condition has been met")
59
)
60

61
// DefaultSessionsPerTx is the default number of sessions that should be
62
// migrated per db transaction.
63
const DefaultSessionsPerTx = 5000
64

65
// MigrateAckedUpdates migrates the tower client DB. It takes the individual
66
// Acked Updates that are stored for each session and re-stores them using the
67
// RangeIndex representation.
68
func MigrateAckedUpdates(sessionsPerTx int) func(kvdb.Backend) error {
3✔
69
        return func(db kvdb.Backend) error {
3✔
UNCOV
70
                log.Infof("Migrating the tower client db to move all Acked " +
×
UNCOV
71
                        "Updates to the new Range Index representation.")
×
UNCOV
72

×
UNCOV
73
                // Migrate the old acked-updates.
×
UNCOV
74
                err := migrateAckedUpdates(db, sessionsPerTx)
×
UNCOV
75
                if err != nil {
×
UNCOV
76
                        return fmt.Errorf("migration failed: %w", err)
×
UNCOV
77
                }
×
78

UNCOV
79
                log.Infof("Migrating old session acked updates finished, now " +
×
UNCOV
80
                        "checking the migration results...")
×
UNCOV
81

×
UNCOV
82
                // Before we can safety delete the old buckets, we perform a
×
UNCOV
83
                // check to make sure the sessions have been migrated as
×
UNCOV
84
                // expected.
×
UNCOV
85
                err = kvdb.View(db, validateMigration, func() {})
×
UNCOV
86
                if err != nil {
×
87
                        return fmt.Errorf("validate migration failed: %w", err)
×
88
                }
×
89

90
                // Delete old acked updates.
UNCOV
91
                err = kvdb.Update(db, deleteOldAckedUpdates, func() {})
×
UNCOV
92
                if err != nil {
×
93
                        return fmt.Errorf("failed to delete old acked "+
×
94
                                "updates: %w", err)
×
95
                }
×
96

UNCOV
97
                return nil
×
98
        }
99
}
100

101
// migrateAckedUpdates migrates the acked updates of each session in the
102
// wtclient db into the new RangeIndex form. This is done over multiple db
103
// transactions in order to prevent the migration from taking up too much RAM.
104
// The sessionsPerTx parameter can be used to set the maximum number of sessions
105
// that should be migrated per transaction.
UNCOV
106
func migrateAckedUpdates(db kvdb.Backend, sessionsPerTx int) error {
×
UNCOV
107
        // Get migration progress stats.
×
UNCOV
108
        total, migrated, err := logMigrationStats(db)
×
UNCOV
109
        if err != nil {
×
UNCOV
110
                return err
×
UNCOV
111
        }
×
UNCOV
112
        log.Infof("Total sessions=%d, migrated=%d", total, migrated)
×
UNCOV
113

×
UNCOV
114
        // Exit early if the old session acked updates have already been
×
UNCOV
115
        // migrated and deleted.
×
UNCOV
116
        if total == 0 {
×
UNCOV
117
                log.Info("Migration already finished!")
×
UNCOV
118
                return nil
×
UNCOV
119
        }
×
120

UNCOV
121
        var (
×
UNCOV
122
                finished bool
×
UNCOV
123
                startKey []byte
×
UNCOV
124
        )
×
UNCOV
125
        for {
×
UNCOV
126
                // Process the migration.
×
UNCOV
127
                err = kvdb.Update(db, func(tx kvdb.RwTx) error {
×
UNCOV
128
                        startKey, finished, err = processMigration(
×
UNCOV
129
                                tx, startKey, sessionsPerTx,
×
UNCOV
130
                        )
×
UNCOV
131

×
UNCOV
132
                        return err
×
UNCOV
133
                }, func() {})
×
UNCOV
134
                if err != nil {
×
135
                        return err
×
136
                }
×
137

UNCOV
138
                if finished {
×
139
                        break
×
140
                }
141

142
                // Each time we finished the above process, we'd read the stats
143
                // again to understand the current progress.
UNCOV
144
                total, migrated, err = logMigrationStats(db)
×
UNCOV
145
                if err != nil {
×
146
                        return err
×
147
                }
×
148

149
                // Calculate and log the progress if the progress is less than
150
                // one hundred percent.
UNCOV
151
                progress := float64(migrated) / float64(total) * 100
×
UNCOV
152
                if progress >= 100 {
×
UNCOV
153
                        break
×
154
                }
155

UNCOV
156
                log.Infof("Migration progress: %.3f%%, still have: %d",
×
UNCOV
157
                        progress, total-migrated)
×
158
        }
159

UNCOV
160
        return nil
×
161
}
162

UNCOV
163
func validateMigration(tx kvdb.RTx) error {
×
UNCOV
164
        mainSessionsBkt := tx.ReadBucket(cSessionBkt)
×
UNCOV
165
        if mainSessionsBkt == nil {
×
166
                return ErrUninitializedDB
×
167
        }
×
168

UNCOV
169
        chanDetailsBkt := tx.ReadBucket(cChanDetailsBkt)
×
UNCOV
170
        if chanDetailsBkt == nil {
×
171
                return ErrUninitializedDB
×
172
        }
×
173

UNCOV
174
        return mainSessionsBkt.ForEach(func(sessID, _ []byte) error {
×
UNCOV
175
                // Get the bucket for this particular session.
×
UNCOV
176
                sessionBkt := mainSessionsBkt.NestedReadBucket(sessID)
×
UNCOV
177
                if sessionBkt == nil {
×
178
                        return ErrClientSessionNotFound
×
179
                }
×
180

181
                // Get the bucket where any old acked updates would be stored.
UNCOV
182
                oldAcksBucket := sessionBkt.NestedReadBucket(cSessionAcks)
×
UNCOV
183

×
UNCOV
184
                // Get the bucket where any new acked updates would be stored.
×
UNCOV
185
                newAcksBucket := sessionBkt.NestedReadBucket(
×
UNCOV
186
                        cSessionAckRangeIndex,
×
UNCOV
187
                )
×
UNCOV
188

×
UNCOV
189
                switch {
×
190
                // If both the old and new acked updates buckets are nil, then
191
                // we can safely skip this session.
192
                case oldAcksBucket == nil && newAcksBucket == nil:
×
193
                        return nil
×
194

195
                case oldAcksBucket == nil:
×
196
                        return fmt.Errorf("no old acks but do have new acks")
×
197

198
                case newAcksBucket == nil:
×
199
                        return fmt.Errorf("no new acks but have old acks")
×
200

UNCOV
201
                default:
×
202
                }
203

204
                // Collect acked ranges for this session.
UNCOV
205
                ackedRanges := make(map[uint64]*RangeIndex)
×
UNCOV
206
                err := newAcksBucket.ForEach(func(dbChanID, _ []byte) error {
×
UNCOV
207
                        rangeIndexBkt := newAcksBucket.NestedReadBucket(
×
UNCOV
208
                                dbChanID,
×
UNCOV
209
                        )
×
UNCOV
210
                        if rangeIndexBkt == nil {
×
211
                                return fmt.Errorf("no acked updates bucket "+
×
212
                                        "found for channel %x", dbChanID)
×
213
                        }
×
214

215
                        // Read acked ranges from new bucket.
UNCOV
216
                        ri, err := readRangeIndex(rangeIndexBkt)
×
UNCOV
217
                        if err != nil {
×
218
                                return err
×
219
                        }
×
220

UNCOV
221
                        dbChanIDNum, err := readBigSize(dbChanID)
×
UNCOV
222
                        if err != nil {
×
223
                                return err
×
224
                        }
×
225

UNCOV
226
                        ackedRanges[dbChanIDNum] = ri
×
UNCOV
227

×
UNCOV
228
                        return nil
×
229
                })
UNCOV
230
                if err != nil {
×
231
                        return err
×
232
                }
×
233

234
                // Now we will iterate through each of the old acked updates and
235
                // make sure that the update appears in the new bucket.
UNCOV
236
                return oldAcksBucket.ForEach(func(_, v []byte) error {
×
UNCOV
237
                        var backupID BackupID
×
UNCOV
238
                        err := backupID.Decode(bytes.NewReader(v))
×
UNCOV
239
                        if err != nil {
×
240
                                return err
×
241
                        }
×
242

UNCOV
243
                        dbChanID, _, err := getDBChanID(
×
UNCOV
244
                                chanDetailsBkt, backupID.ChanID,
×
UNCOV
245
                        )
×
UNCOV
246
                        if err != nil {
×
247
                                return err
×
248
                        }
×
249

UNCOV
250
                        index, ok := ackedRanges[dbChanID]
×
UNCOV
251
                        if !ok {
×
252
                                return fmt.Errorf("no index found for this " +
×
253
                                        "channel")
×
254
                        }
×
255

UNCOV
256
                        if !index.IsInIndex(backupID.CommitHeight) {
×
257
                                return fmt.Errorf("commit height not found " +
×
258
                                        "in index")
×
259
                        }
×
260

UNCOV
261
                        return nil
×
262
                })
263
        })
264
}
265

UNCOV
266
func readRangeIndex(rangesBkt kvdb.RBucket) (*RangeIndex, error) {
×
UNCOV
267
        ranges := make(map[uint64]uint64)
×
UNCOV
268
        err := rangesBkt.ForEach(func(k, v []byte) error {
×
UNCOV
269
                start, err := readBigSize(k)
×
UNCOV
270
                if err != nil {
×
271
                        return err
×
272
                }
×
273

UNCOV
274
                end, err := readBigSize(v)
×
UNCOV
275
                if err != nil {
×
276
                        return err
×
277
                }
×
278

UNCOV
279
                ranges[start] = end
×
UNCOV
280

×
UNCOV
281
                return nil
×
282
        })
UNCOV
283
        if err != nil {
×
284
                return nil, err
×
285
        }
×
286

UNCOV
287
        return NewRangeIndex(ranges, WithSerializeUint64Fn(writeBigSize))
×
288
}
289

UNCOV
290
func deleteOldAckedUpdates(tx kvdb.RwTx) error {
×
UNCOV
291
        mainSessionsBkt := tx.ReadWriteBucket(cSessionBkt)
×
UNCOV
292
        if mainSessionsBkt == nil {
×
293
                return ErrUninitializedDB
×
294
        }
×
295

UNCOV
296
        return mainSessionsBkt.ForEach(func(sessID, _ []byte) error {
×
UNCOV
297
                // Get the bucket for this particular session.
×
UNCOV
298
                sessionBkt := mainSessionsBkt.NestedReadWriteBucket(
×
UNCOV
299
                        sessID,
×
UNCOV
300
                )
×
UNCOV
301
                if sessionBkt == nil {
×
302
                        return ErrClientSessionNotFound
×
303
                }
×
304

305
                // Get the bucket where any old acked updates would be stored.
UNCOV
306
                oldAcksBucket := sessionBkt.NestedReadBucket(cSessionAcks)
×
UNCOV
307
                if oldAcksBucket == nil {
×
308
                        return nil
×
309
                }
×
310

311
                // Now that we have read everything that we need to from
312
                // the cSessionAcks sub-bucket, we can delete it.
UNCOV
313
                return sessionBkt.DeleteNestedBucket(cSessionAcks)
×
314
        })
315
}
316

317
// processMigration uses the given transaction to perform a maximum of
318
// sessionsPerTx session migrations. If startKey is non-nil, it is used to
319
// determine the first session to start the migration at. The first return
320
// item is the key of the last session that was migrated successfully and the
321
// boolean is true if there are no more sessions left to migrate.
322
func processMigration(tx kvdb.RwTx, startKey []byte, sessionsPerTx int) ([]byte,
UNCOV
323
        bool, error) {
×
UNCOV
324

×
UNCOV
325
        chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
×
UNCOV
326
        if chanDetailsBkt == nil {
×
327
                return nil, false, ErrUninitializedDB
×
328
        }
×
329

330
        // sessionCount keeps track of the number of sessions that have been
331
        // migrated under the current db transaction.
UNCOV
332
        var sessionCount int
×
UNCOV
333

×
UNCOV
334
        // migrateSessionCB is a callback function that calls migrateSession
×
UNCOV
335
        // in order to migrate a single session. Upon success, the sessionCount
×
UNCOV
336
        // is incremented and is then compared against sessionsPerTx to
×
UNCOV
337
        // determine if we should continue migrating more sessions in this db
×
UNCOV
338
        // transaction.
×
UNCOV
339
        migrateSessionCB := func(sessionBkt kvdb.RwBucket) error {
×
UNCOV
340
                err := migrateSession(chanDetailsBkt, sessionBkt)
×
UNCOV
341
                if err != nil {
×
342
                        return err
×
343
                }
×
344

UNCOV
345
                sessionCount++
×
UNCOV
346

×
UNCOV
347
                // If we have migrated sessionsPerTx sessions in this tx, then
×
UNCOV
348
                // we return errExit in order to signal that this tx should be
×
UNCOV
349
                // committed and the migration should be continued in a new
×
UNCOV
350
                // transaction.
×
UNCOV
351
                if sessionCount >= sessionsPerTx {
×
UNCOV
352
                        return errExit
×
UNCOV
353
                }
×
354

UNCOV
355
                return nil
×
356
        }
357

358
        // Starting at startKey, iterate over the sessions in the db and migrate
359
        // them until either all are migrated or until the errExit signal is
360
        // received.
UNCOV
361
        lastKey, err := sessionIterator(tx, startKey, migrateSessionCB)
×
UNCOV
362
        if err != nil && errors.Is(err, errExit) {
×
UNCOV
363
                return lastKey, false, nil
×
UNCOV
364
        } else if err != nil {
×
365
                return nil, false, err
×
366
        }
×
367

368
        // The migration is complete.
369
        return nil, true, nil
×
370
}
371

372
// migrateSession migrates a single session's acked-updates to the new
373
// RangeIndex form.
374
func migrateSession(chanDetailsBkt kvdb.RBucket,
UNCOV
375
        sessionBkt kvdb.RwBucket) error {
×
UNCOV
376

×
UNCOV
377
        // Get the existing cSessionAcks bucket. If there is no such bucket,
×
UNCOV
378
        // then there are no acked-updates to migrate for this session.
×
UNCOV
379
        sessionAcks := sessionBkt.NestedReadBucket(cSessionAcks)
×
UNCOV
380
        if sessionAcks == nil {
×
381
                return nil
×
382
        }
×
383

384
        // If there is already a new cSessionAckedRangeIndex bucket, then this
385
        // session has already been migrated.
UNCOV
386
        sessionAckRangesBkt := sessionBkt.NestedReadBucket(
×
UNCOV
387
                cSessionAckRangeIndex,
×
UNCOV
388
        )
×
UNCOV
389
        if sessionAckRangesBkt != nil {
×
UNCOV
390
                return nil
×
UNCOV
391
        }
×
392

393
        // Otherwise, we will iterate over each of the acked-updates, and we
394
        // will construct a new RangeIndex for each channel.
UNCOV
395
        m := make(map[ChannelID]*RangeIndex)
×
UNCOV
396
        if err := sessionAcks.ForEach(func(_, v []byte) error {
×
UNCOV
397
                var backupID BackupID
×
UNCOV
398
                err := backupID.Decode(bytes.NewReader(v))
×
UNCOV
399
                if err != nil {
×
400
                        return err
×
401
                }
×
402

UNCOV
403
                if _, ok := m[backupID.ChanID]; !ok {
×
UNCOV
404
                        index, err := NewRangeIndex(nil)
×
UNCOV
405
                        if err != nil {
×
406
                                return err
×
407
                        }
×
408

UNCOV
409
                        m[backupID.ChanID] = index
×
410
                }
411

UNCOV
412
                return m[backupID.ChanID].Add(backupID.CommitHeight, nil)
×
413
        }); err != nil {
×
414
                return err
×
415
        }
×
416

417
        // Create a new sub-bucket that will be used to store the new RangeIndex
418
        // representation of the acked updates.
UNCOV
419
        ackRangeBkt, err := sessionBkt.CreateBucket(cSessionAckRangeIndex)
×
UNCOV
420
        if err != nil {
×
421
                return err
×
422
        }
×
423

424
        // Iterate over each of the new range indexes that we will add for this
425
        // session.
UNCOV
426
        for chanID, rangeIndex := range m {
×
UNCOV
427
                // Get db chanID.
×
UNCOV
428
                chanDetails := chanDetailsBkt.NestedReadBucket(chanID[:])
×
UNCOV
429
                if chanDetails == nil {
×
430
                        return ErrCorruptChanDetails
×
431
                }
×
432

433
                // Create a sub-bucket for this channel using the db-assigned ID
434
                // for the channel.
UNCOV
435
                dbChanID := chanDetails.Get(cChanDBID)
×
UNCOV
436
                chanAcksBkt, err := ackRangeBkt.CreateBucket(dbChanID)
×
UNCOV
437
                if err != nil {
×
438
                        return err
×
439
                }
×
440

441
                // Iterate over the range pairs that we need to add to the DB.
UNCOV
442
                for k, v := range rangeIndex.GetAllRanges() {
×
UNCOV
443
                        start, err := writeBigSize(k)
×
UNCOV
444
                        if err != nil {
×
445
                                return err
×
446
                        }
×
447

UNCOV
448
                        end, err := writeBigSize(v)
×
UNCOV
449
                        if err != nil {
×
450
                                return err
×
451
                        }
×
452

UNCOV
453
                        err = chanAcksBkt.Put(start, end)
×
UNCOV
454
                        if err != nil {
×
455
                                return err
×
456
                        }
×
457
                }
458
        }
459

UNCOV
460
        return nil
×
461
}
462

463
// logMigrationStats reads the buckets to provide stats over current migration
464
// progress. The returned values are the numbers of total records and already
465
// migrated records.
UNCOV
466
func logMigrationStats(db kvdb.Backend) (uint64, uint64, error) {
×
UNCOV
467
        var (
×
UNCOV
468
                err        error
×
UNCOV
469
                total      uint64
×
UNCOV
470
                unmigrated uint64
×
UNCOV
471
        )
×
UNCOV
472

×
UNCOV
473
        err = kvdb.View(db, func(tx kvdb.RTx) error {
×
UNCOV
474
                total, unmigrated, err = getMigrationStats(tx)
×
UNCOV
475

×
UNCOV
476
                return err
×
UNCOV
477
        }, func() {})
×
478

UNCOV
479
        log.Debugf("Total sessions=%d, unmigrated=%d", total, unmigrated)
×
UNCOV
480

×
UNCOV
481
        return total, total - unmigrated, err
×
482
}
483

484
// getMigrationStats iterates over all sessions. It counts the total number of
485
// sessions as well as the total number of unmigrated sessions.
UNCOV
486
func getMigrationStats(tx kvdb.RTx) (uint64, uint64, error) {
×
UNCOV
487
        var (
×
UNCOV
488
                total      uint64
×
UNCOV
489
                unmigrated uint64
×
UNCOV
490
        )
×
UNCOV
491

×
UNCOV
492
        // Get sessions bucket.
×
UNCOV
493
        mainSessionsBkt := tx.ReadBucket(cSessionBkt)
×
UNCOV
494
        if mainSessionsBkt == nil {
×
495
                return 0, 0, ErrUninitializedDB
×
496
        }
×
497

498
        // Iterate over each session ID in the bucket.
UNCOV
499
        err := mainSessionsBkt.ForEach(func(sessID, _ []byte) error {
×
UNCOV
500
                // Get the bucket for this particular session.
×
UNCOV
501
                sessionBkt := mainSessionsBkt.NestedReadBucket(sessID)
×
UNCOV
502
                if sessionBkt == nil {
×
UNCOV
503
                        return ErrClientSessionNotFound
×
UNCOV
504
                }
×
505

UNCOV
506
                total++
×
UNCOV
507

×
UNCOV
508
                // Get the cSessionAckRangeIndex bucket.
×
UNCOV
509
                sessionAcksBkt := sessionBkt.NestedReadBucket(cSessionAcks)
×
UNCOV
510

×
UNCOV
511
                // Get the cSessionAckRangeIndex bucket.
×
UNCOV
512
                sessionAckRangesBkt := sessionBkt.NestedReadBucket(
×
UNCOV
513
                        cSessionAckRangeIndex,
×
UNCOV
514
                )
×
UNCOV
515

×
UNCOV
516
                // If both buckets do not exist, then this session is empty and
×
UNCOV
517
                // does not need to be migrated.
×
UNCOV
518
                if sessionAckRangesBkt == nil && sessionAcksBkt == nil {
×
519
                        return nil
×
520
                }
×
521

522
                // If the sessionAckRangesBkt is not nil, then the session has
523
                // already been migrated.
UNCOV
524
                if sessionAckRangesBkt != nil {
×
UNCOV
525
                        return nil
×
UNCOV
526
                }
×
527

528
                // Else the session has not yet been migrated.
UNCOV
529
                unmigrated++
×
UNCOV
530

×
UNCOV
531
                return nil
×
532
        })
UNCOV
533
        if err != nil {
×
UNCOV
534
                return 0, 0, err
×
UNCOV
535
        }
×
536

UNCOV
537
        return total, unmigrated, nil
×
538
}
539

540
// getDBChanID returns the db-assigned channel ID for the given real channel ID.
541
// It returns both the uint64 and byte representation.
542
func getDBChanID(chanDetailsBkt kvdb.RBucket, chanID ChannelID) (uint64,
UNCOV
543
        []byte, error) {
×
UNCOV
544

×
UNCOV
545
        chanDetails := chanDetailsBkt.NestedReadBucket(chanID[:])
×
UNCOV
546
        if chanDetails == nil {
×
547
                return 0, nil, ErrChannelNotRegistered
×
548
        }
×
549

UNCOV
550
        idBytes := chanDetails.Get(cChanDBID)
×
UNCOV
551
        if len(idBytes) == 0 {
×
552
                return 0, nil, fmt.Errorf("no db-assigned ID found for "+
×
553
                        "channel ID %s", chanID)
×
554
        }
×
555

UNCOV
556
        id, err := readBigSize(idBytes)
×
UNCOV
557
        if err != nil {
×
558
                return 0, nil, err
×
559
        }
×
560

UNCOV
561
        return id, idBytes, nil
×
562
}
563

564
// callback defines a type that's used by the sessionIterator.
565
type callback func(bkt kvdb.RwBucket) error
566

567
// sessionIterator is a helper function that iterates over the main sessions
568
// bucket and performs the callback function on each individual session. If a
569
// seeker is specified, it will move the cursor to the given position otherwise
570
// it will start from the first item.
UNCOV
571
func sessionIterator(tx kvdb.RwTx, seeker []byte, cb callback) ([]byte, error) {
×
UNCOV
572
        // Get sessions bucket.
×
UNCOV
573
        mainSessionsBkt := tx.ReadWriteBucket(cSessionBkt)
×
UNCOV
574
        if mainSessionsBkt == nil {
×
575
                return nil, ErrUninitializedDB
×
576
        }
×
577

UNCOV
578
        c := mainSessionsBkt.ReadCursor()
×
UNCOV
579
        k, _ := c.First()
×
UNCOV
580

×
UNCOV
581
        // Move the cursor to the specified position if seeker is non-nil.
×
UNCOV
582
        if seeker != nil {
×
UNCOV
583
                k, _ = c.Seek(seeker)
×
UNCOV
584
        }
×
585

586
        // Start the iteration and exit on condition.
UNCOV
587
        for k := k; k != nil; k, _ = c.Next() {
×
UNCOV
588
                // Get the bucket for this particular session.
×
UNCOV
589
                bkt := mainSessionsBkt.NestedReadWriteBucket(k)
×
UNCOV
590
                if bkt == nil {
×
591
                        return nil, ErrClientSessionNotFound
×
592
                }
×
593

594
                // Call the callback function with the session's bucket.
UNCOV
595
                if err := cb(bkt); err != nil {
×
UNCOV
596
                        // return k, err
×
UNCOV
597
                        lastIndex := make([]byte, len(k))
×
UNCOV
598
                        copy(lastIndex, k)
×
UNCOV
599
                        return lastIndex, err
×
UNCOV
600
                }
×
601
        }
602

603
        return nil, nil
×
604
}
605

606
// writeBigSize will encode the given uint64 as a BigSize byte slice.
UNCOV
607
func writeBigSize(i uint64) ([]byte, error) {
×
UNCOV
608
        var b bytes.Buffer
×
UNCOV
609
        err := tlv.WriteVarInt(&b, i, &[8]byte{})
×
UNCOV
610
        if err != nil {
×
611
                return nil, err
×
612
        }
×
613

UNCOV
614
        return b.Bytes(), nil
×
615
}
616

617
// readBigSize converts the given byte slice into a uint64 and assumes that the
618
// bytes slice is using BigSize encoding.
UNCOV
619
func readBigSize(b []byte) (uint64, error) {
×
UNCOV
620
        r := bytes.NewReader(b)
×
UNCOV
621
        i, err := tlv.ReadVarInt(r, &[8]byte{})
×
UNCOV
622
        if err != nil {
×
623
                return 0, err
×
624
        }
×
625

UNCOV
626
        return i, nil
×
627
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc