• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15736109134

18 Jun 2025 02:46PM UTC coverage: 58.197% (-10.1%) from 68.248%
15736109134

Pull #9752

github

web-flow
Merge d2634a68c into 31c74f20f
Pull Request #9752: routerrpc: reject payment to invoice that don't have payment secret or blinded paths

6 of 13 new or added lines in 2 files covered. (46.15%)

28331 existing lines in 455 files now uncovered.

97860 of 168153 relevant lines covered (58.2%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.52
/watchtower/wtdb/migration4/client_db.go
1
package migration4
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8

9
        "github.com/lightningnetwork/lnd/kvdb"
10
        "github.com/lightningnetwork/lnd/tlv"
11
)
12

13
var (
14
        // cChanDetailsBkt is a top-level bucket storing:
15
        //   channel-id => cChannelSummary -> encoded ClientChanSummary.
16
        //                 => cChanDBID -> db-assigned-id
17
        cChanDetailsBkt = []byte("client-channel-detail-bucket")
18

19
        // cChanDBID is a key used in the cChanDetailsBkt to store the
20
        // db-assigned-id of a channel.
21
        cChanDBID = []byte("client-channel-db-id")
22

23
        // cSessionBkt is a top-level bucket storing:
24
        //   session-id => cSessionBody -> encoded ClientSessionBody
25
        //              => cSessionCommits => seqnum -> encoded CommittedUpdate
26
        //              => cSessionAcks => seqnum -> encoded BackupID
27
        cSessionBkt = []byte("client-session-bucket")
28

29
        // cSessionAcks is a sub-bucket of cSessionBkt storing:
30
        //    seqnum -> encoded BackupID.
31
        cSessionAcks = []byte("client-session-acks")
32

33
        // cSessionAckRangeIndex is a sub-bucket of cSessionBkt storing:
34
        //    chan-id => start -> end
35
        cSessionAckRangeIndex = []byte("client-session-ack-range-index")
36

37
        // ErrUninitializedDB signals that top-level buckets for the database
38
        // have not been initialized.
39
        ErrUninitializedDB = errors.New("db not initialized")
40

41
        // ErrClientSessionNotFound signals that the requested client session
42
        // was not found in the database.
43
        ErrClientSessionNotFound = errors.New("client session not found")
44

45
        // ErrCorruptChanDetails signals that the clients channel detail's
46
        // on-disk structure deviates from what is expected.
47
        ErrCorruptChanDetails = errors.New("channel details corrupted")
48

49
        // ErrChannelNotRegistered signals a channel has not yet been registered
50
        // in the client database.
51
        ErrChannelNotRegistered = errors.New("channel not registered")
52

53
        // byteOrder is the default endianness used when serializing integers.
54
        byteOrder = binary.BigEndian
55

56
        // errExit is an error used to signal that the sessionIterator should
57
        // exit.
58
        errExit = errors.New("the exit condition has been met")
59
)
60

61
// DefaultSessionsPerTx is the default number of sessions that should be
62
// migrated per db transaction.
63
const DefaultSessionsPerTx = 5000
64

65
// MigrateAckedUpdates migrates the tower client DB. It takes the individual
66
// Acked Updates that are stored for each session and re-stores them using the
67
// RangeIndex representation.
68
func MigrateAckedUpdates(sessionsPerTx int) func(kvdb.Backend) error {
3✔
69
        return func(db kvdb.Backend) error {
3✔
UNCOV
70
                log.Infof("Migrating the tower client db to move all Acked " +
×
UNCOV
71
                        "Updates to the new Range Index representation.")
×
UNCOV
72

×
UNCOV
73
                // Migrate the old acked-updates.
×
UNCOV
74
                err := migrateAckedUpdates(db, sessionsPerTx)
×
UNCOV
75
                if err != nil {
×
UNCOV
76
                        return fmt.Errorf("migration failed: %w", err)
×
UNCOV
77
                }
×
78

UNCOV
79
                log.Infof("Migrating old session acked updates finished, now " +
×
UNCOV
80
                        "checking the migration results...")
×
UNCOV
81

×
UNCOV
82
                // Before we can safety delete the old buckets, we perform a
×
UNCOV
83
                // check to make sure the sessions have been migrated as
×
UNCOV
84
                // expected.
×
UNCOV
85
                err = kvdb.View(db, validateMigration, func() {})
×
UNCOV
86
                if err != nil {
×
87
                        return fmt.Errorf("validate migration failed: %w", err)
×
88
                }
×
89

90
                // Delete old acked updates.
UNCOV
91
                err = kvdb.Update(db, deleteOldAckedUpdates, func() {})
×
UNCOV
92
                if err != nil {
×
93
                        return fmt.Errorf("failed to delete old acked "+
×
94
                                "updates: %w", err)
×
95
                }
×
96

UNCOV
97
                return nil
×
98
        }
99
}
100

101
// migrateAckedUpdates migrates the acked updates of each session in the
102
// wtclient db into the new RangeIndex form. This is done over multiple db
103
// transactions in order to prevent the migration from taking up too much RAM.
104
// The sessionsPerTx parameter can be used to set the maximum number of sessions
105
// that should be migrated per transaction.
UNCOV
106
func migrateAckedUpdates(db kvdb.Backend, sessionsPerTx int) error {
×
UNCOV
107
        // Get migration progress stats.
×
UNCOV
108
        total, migrated, err := logMigrationStats(db)
×
UNCOV
109
        if err != nil {
×
UNCOV
110
                return err
×
UNCOV
111
        }
×
UNCOV
112
        log.Infof("Total sessions=%d, migrated=%d", total, migrated)
×
UNCOV
113

×
UNCOV
114
        // Exit early if the old session acked updates have already been
×
UNCOV
115
        // migrated and deleted.
×
UNCOV
116
        if total == 0 {
×
UNCOV
117
                log.Info("Migration already finished!")
×
UNCOV
118
                return nil
×
UNCOV
119
        }
×
120

UNCOV
121
        var (
×
UNCOV
122
                finished bool
×
UNCOV
123
                startKey []byte
×
UNCOV
124
        )
×
UNCOV
125
        for {
×
UNCOV
126
                // Process the migration.
×
UNCOV
127
                err = kvdb.Update(db, func(tx kvdb.RwTx) error {
×
UNCOV
128
                        startKey, finished, err = processMigration(
×
UNCOV
129
                                tx, startKey, sessionsPerTx,
×
UNCOV
130
                        )
×
UNCOV
131

×
UNCOV
132
                        return err
×
UNCOV
133
                }, func() {})
×
UNCOV
134
                if err != nil {
×
135
                        return err
×
136
                }
×
137

UNCOV
138
                if finished {
×
139
                        break
×
140
                }
141

142
                // Each time we finished the above process, we'd read the stats
143
                // again to understand the current progress.
UNCOV
144
                total, migrated, err = logMigrationStats(db)
×
UNCOV
145
                if err != nil {
×
146
                        return err
×
147
                }
×
148

149
                // Calculate and log the progress if the progress is less than
150
                // one hundred percent.
UNCOV
151
                progress := float64(migrated) / float64(total) * 100
×
UNCOV
152
                if progress >= 100 {
×
UNCOV
153
                        break
×
154
                }
155

UNCOV
156
                log.Infof("Migration progress: %.3f%%, still have: %d",
×
UNCOV
157
                        progress, total-migrated)
×
158
        }
159

UNCOV
160
        return nil
×
161
}
162

UNCOV
163
func validateMigration(tx kvdb.RTx) error {
×
UNCOV
164
        mainSessionsBkt := tx.ReadBucket(cSessionBkt)
×
UNCOV
165
        if mainSessionsBkt == nil {
×
166
                return ErrUninitializedDB
×
167
        }
×
168

UNCOV
169
        chanDetailsBkt := tx.ReadBucket(cChanDetailsBkt)
×
UNCOV
170
        if chanDetailsBkt == nil {
×
171
                return ErrUninitializedDB
×
172
        }
×
173

UNCOV
174
        return mainSessionsBkt.ForEach(func(sessID, _ []byte) error {
×
UNCOV
175
                // Get the bucket for this particular session.
×
UNCOV
176
                sessionBkt := mainSessionsBkt.NestedReadBucket(sessID)
×
UNCOV
177
                if sessionBkt == nil {
×
178
                        return ErrClientSessionNotFound
×
179
                }
×
180

181
                // Get the bucket where any old acked updates would be stored.
UNCOV
182
                oldAcksBucket := sessionBkt.NestedReadBucket(cSessionAcks)
×
UNCOV
183

×
UNCOV
184
                // Get the bucket where any new acked updates would be stored.
×
UNCOV
185
                newAcksBucket := sessionBkt.NestedReadBucket(
×
UNCOV
186
                        cSessionAckRangeIndex,
×
UNCOV
187
                )
×
UNCOV
188

×
UNCOV
189
                switch {
×
190
                // If both the old and new acked updates buckets are nil, then
191
                // we can safely skip this session.
192
                case oldAcksBucket == nil && newAcksBucket == nil:
×
193
                        return nil
×
194

195
                case oldAcksBucket == nil:
×
196
                        return fmt.Errorf("no old acks but do have new acks")
×
197

198
                case newAcksBucket == nil:
×
199
                        return fmt.Errorf("no new acks but have old acks")
×
200

UNCOV
201
                default:
×
202
                }
203

204
                // Collect acked ranges for this session.
UNCOV
205
                ackedRanges := make(map[uint64]*RangeIndex)
×
UNCOV
206
                err := newAcksBucket.ForEach(func(dbChanID, _ []byte) error {
×
UNCOV
207
                        rangeIndexBkt := newAcksBucket.NestedReadBucket(
×
UNCOV
208
                                dbChanID,
×
UNCOV
209
                        )
×
UNCOV
210
                        if rangeIndexBkt == nil {
×
211
                                return fmt.Errorf("no acked updates bucket "+
×
212
                                        "found for channel %x", dbChanID)
×
213
                        }
×
214

215
                        // Read acked ranges from new bucket.
UNCOV
216
                        ri, err := readRangeIndex(rangeIndexBkt)
×
UNCOV
217
                        if err != nil {
×
218
                                return err
×
219
                        }
×
220

UNCOV
221
                        dbChanIDNum, err := readBigSize(dbChanID)
×
UNCOV
222
                        if err != nil {
×
223
                                return err
×
224
                        }
×
225

UNCOV
226
                        ackedRanges[dbChanIDNum] = ri
×
UNCOV
227

×
UNCOV
228
                        return nil
×
229
                })
UNCOV
230
                if err != nil {
×
231
                        return err
×
232
                }
×
233

234
                // Now we will iterate through each of the old acked updates and
235
                // make sure that the update appears in the new bucket.
UNCOV
236
                return oldAcksBucket.ForEach(func(_, v []byte) error {
×
UNCOV
237
                        var backupID BackupID
×
UNCOV
238
                        err := backupID.Decode(bytes.NewReader(v))
×
UNCOV
239
                        if err != nil {
×
240
                                return err
×
241
                        }
×
242

UNCOV
243
                        dbChanID, _, err := getDBChanID(
×
UNCOV
244
                                chanDetailsBkt, backupID.ChanID,
×
UNCOV
245
                        )
×
UNCOV
246
                        if err != nil {
×
247
                                return err
×
248
                        }
×
249

UNCOV
250
                        index, ok := ackedRanges[dbChanID]
×
UNCOV
251
                        if !ok {
×
252
                                return fmt.Errorf("no index found for this " +
×
253
                                        "channel")
×
254
                        }
×
255

UNCOV
256
                        if !index.IsInIndex(backupID.CommitHeight) {
×
257
                                return fmt.Errorf("commit height not found " +
×
258
                                        "in index")
×
259
                        }
×
260

UNCOV
261
                        return nil
×
262
                })
263
        })
264
}
265

UNCOV
266
func readRangeIndex(rangesBkt kvdb.RBucket) (*RangeIndex, error) {
×
UNCOV
267
        ranges := make(map[uint64]uint64)
×
UNCOV
268
        err := rangesBkt.ForEach(func(k, v []byte) error {
×
UNCOV
269
                start, err := readBigSize(k)
×
UNCOV
270
                if err != nil {
×
271
                        return err
×
272
                }
×
273

UNCOV
274
                end, err := readBigSize(v)
×
UNCOV
275
                if err != nil {
×
276
                        return err
×
277
                }
×
278

UNCOV
279
                ranges[start] = end
×
UNCOV
280

×
UNCOV
281
                return nil
×
282
        })
UNCOV
283
        if err != nil {
×
284
                return nil, err
×
285
        }
×
286

UNCOV
287
        return NewRangeIndex(ranges, WithSerializeUint64Fn(writeBigSize))
×
288
}
289

UNCOV
290
func deleteOldAckedUpdates(tx kvdb.RwTx) error {
×
UNCOV
291
        mainSessionsBkt := tx.ReadWriteBucket(cSessionBkt)
×
UNCOV
292
        if mainSessionsBkt == nil {
×
293
                return ErrUninitializedDB
×
294
        }
×
295

UNCOV
296
        return mainSessionsBkt.ForEach(func(sessID, _ []byte) error {
×
UNCOV
297
                // Get the bucket for this particular session.
×
UNCOV
298
                sessionBkt := mainSessionsBkt.NestedReadWriteBucket(
×
UNCOV
299
                        sessID,
×
UNCOV
300
                )
×
UNCOV
301
                if sessionBkt == nil {
×
302
                        return ErrClientSessionNotFound
×
303
                }
×
304

305
                // Get the bucket where any old acked updates would be stored.
UNCOV
306
                oldAcksBucket := sessionBkt.NestedReadBucket(cSessionAcks)
×
UNCOV
307
                if oldAcksBucket == nil {
×
308
                        return nil
×
309
                }
×
310

311
                // Now that we have read everything that we need to from
312
                // the cSessionAcks sub-bucket, we can delete it.
UNCOV
313
                return sessionBkt.DeleteNestedBucket(cSessionAcks)
×
314
        })
315
}
316

317
// processMigration uses the given transaction to perform a maximum of
318
// sessionsPerTx session migrations. If startKey is non-nil, it is used to
319
// determine the first session to start the migration at. The first return
320
// item is the key of the last session that was migrated successfully and the
321
// boolean is true if there are no more sessions left to migrate.
322
func processMigration(tx kvdb.RwTx, startKey []byte, sessionsPerTx int) ([]byte,
UNCOV
323
        bool, error) {
×
UNCOV
324

×
UNCOV
325
        chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
×
UNCOV
326
        if chanDetailsBkt == nil {
×
327
                return nil, false, ErrUninitializedDB
×
328
        }
×
329

330
        // sessionCount keeps track of the number of sessions that have been
331
        // migrated under the current db transaction.
UNCOV
332
        var sessionCount int
×
UNCOV
333

×
UNCOV
334
        // migrateSessionCB is a callback function that calls migrateSession
×
UNCOV
335
        // in order to migrate a single session. Upon success, the sessionCount
×
UNCOV
336
        // is incremented and is then compared against sessionsPerTx to
×
UNCOV
337
        // determine if we should continue migrating more sessions in this db
×
UNCOV
338
        // transaction.
×
UNCOV
339
        migrateSessionCB := func(sessionBkt kvdb.RwBucket) error {
×
UNCOV
340
                err := migrateSession(chanDetailsBkt, sessionBkt)
×
UNCOV
341
                if err != nil {
×
342
                        return err
×
343
                }
×
344

UNCOV
345
                sessionCount++
×
UNCOV
346

×
UNCOV
347
                // If we have migrated sessionsPerTx sessions in this tx, then
×
UNCOV
348
                // we return errExit in order to signal that this tx should be
×
UNCOV
349
                // committed and the migration should be continued in a new
×
UNCOV
350
                // transaction.
×
UNCOV
351
                if sessionCount >= sessionsPerTx {
×
UNCOV
352
                        return errExit
×
UNCOV
353
                }
×
354

UNCOV
355
                return nil
×
356
        }
357

358
        // Starting at startKey, iterate over the sessions in the db and migrate
359
        // them until either all are migrated or until the errExit signal is
360
        // received.
UNCOV
361
        lastKey, err := sessionIterator(tx, startKey, migrateSessionCB)
×
UNCOV
362
        if err != nil && errors.Is(err, errExit) {
×
UNCOV
363
                return lastKey, false, nil
×
UNCOV
364
        } else if err != nil {
×
365
                return nil, false, err
×
366
        }
×
367

368
        // The migration is complete.
369
        return nil, true, nil
×
370
}
371

372
// migrateSession migrates a single session's acked-updates to the new
373
// RangeIndex form.
374
func migrateSession(chanDetailsBkt kvdb.RBucket,
UNCOV
375
        sessionBkt kvdb.RwBucket) error {
×
UNCOV
376

×
UNCOV
377
        // Get the existing cSessionAcks bucket. If there is no such bucket,
×
UNCOV
378
        // then there are no acked-updates to migrate for this session.
×
UNCOV
379
        sessionAcks := sessionBkt.NestedReadBucket(cSessionAcks)
×
UNCOV
380
        if sessionAcks == nil {
×
381
                return nil
×
382
        }
×
383

384
        // If there is already a new cSessionAckedRangeIndex bucket, then this
385
        // session has already been migrated.
UNCOV
386
        sessionAckRangesBkt := sessionBkt.NestedReadBucket(
×
UNCOV
387
                cSessionAckRangeIndex,
×
UNCOV
388
        )
×
UNCOV
389
        if sessionAckRangesBkt != nil {
×
UNCOV
390
                return nil
×
UNCOV
391
        }
×
392

393
        // Otherwise, we will iterate over each of the acked-updates, and we
394
        // will construct a new RangeIndex for each channel.
UNCOV
395
        m := make(map[ChannelID]*RangeIndex)
×
UNCOV
396
        if err := sessionAcks.ForEach(func(_, v []byte) error {
×
UNCOV
397
                var backupID BackupID
×
UNCOV
398
                err := backupID.Decode(bytes.NewReader(v))
×
UNCOV
399
                if err != nil {
×
400
                        return err
×
401
                }
×
402

UNCOV
403
                if _, ok := m[backupID.ChanID]; !ok {
×
UNCOV
404
                        index, err := NewRangeIndex(nil)
×
UNCOV
405
                        if err != nil {
×
406
                                return err
×
407
                        }
×
408

UNCOV
409
                        m[backupID.ChanID] = index
×
410
                }
411

UNCOV
412
                return m[backupID.ChanID].Add(backupID.CommitHeight, nil)
×
413
        }); err != nil {
×
414
                return err
×
415
        }
×
416

417
        // Create a new sub-bucket that will be used to store the new RangeIndex
418
        // representation of the acked updates.
UNCOV
419
        ackRangeBkt, err := sessionBkt.CreateBucket(cSessionAckRangeIndex)
×
UNCOV
420
        if err != nil {
×
421
                return err
×
422
        }
×
423

424
        // Iterate over each of the new range indexes that we will add for this
425
        // session.
UNCOV
426
        for chanID, rangeIndex := range m {
×
UNCOV
427
                // Get db chanID.
×
UNCOV
428
                chanDetails := chanDetailsBkt.NestedReadBucket(chanID[:])
×
UNCOV
429
                if chanDetails == nil {
×
430
                        return ErrCorruptChanDetails
×
431
                }
×
432

433
                // Create a sub-bucket for this channel using the db-assigned ID
434
                // for the channel.
UNCOV
435
                dbChanID := chanDetails.Get(cChanDBID)
×
UNCOV
436
                chanAcksBkt, err := ackRangeBkt.CreateBucket(dbChanID)
×
UNCOV
437
                if err != nil {
×
438
                        return err
×
439
                }
×
440

441
                // Iterate over the range pairs that we need to add to the DB.
UNCOV
442
                for k, v := range rangeIndex.GetAllRanges() {
×
UNCOV
443
                        start, err := writeBigSize(k)
×
UNCOV
444
                        if err != nil {
×
445
                                return err
×
446
                        }
×
447

UNCOV
448
                        end, err := writeBigSize(v)
×
UNCOV
449
                        if err != nil {
×
450
                                return err
×
451
                        }
×
452

UNCOV
453
                        err = chanAcksBkt.Put(start, end)
×
UNCOV
454
                        if err != nil {
×
455
                                return err
×
456
                        }
×
457
                }
458
        }
459

UNCOV
460
        return nil
×
461
}
462

463
// logMigrationStats reads the buckets to provide stats over current migration
464
// progress. The returned values are the numbers of total records and already
465
// migrated records.
UNCOV
466
func logMigrationStats(db kvdb.Backend) (uint64, uint64, error) {
×
UNCOV
467
        var (
×
UNCOV
468
                err        error
×
UNCOV
469
                total      uint64
×
UNCOV
470
                unmigrated uint64
×
UNCOV
471
        )
×
UNCOV
472

×
UNCOV
473
        err = kvdb.View(db, func(tx kvdb.RTx) error {
×
UNCOV
474
                total, unmigrated, err = getMigrationStats(tx)
×
UNCOV
475

×
UNCOV
476
                return err
×
UNCOV
477
        }, func() {})
×
478

UNCOV
479
        log.Debugf("Total sessions=%d, unmigrated=%d", total, unmigrated)
×
UNCOV
480

×
UNCOV
481
        return total, total - unmigrated, err
×
482
}
483

484
// getMigrationStats iterates over all sessions. It counts the total number of
485
// sessions as well as the total number of unmigrated sessions.
UNCOV
486
func getMigrationStats(tx kvdb.RTx) (uint64, uint64, error) {
×
UNCOV
487
        var (
×
UNCOV
488
                total      uint64
×
UNCOV
489
                unmigrated uint64
×
UNCOV
490
        )
×
UNCOV
491

×
UNCOV
492
        // Get sessions bucket.
×
UNCOV
493
        mainSessionsBkt := tx.ReadBucket(cSessionBkt)
×
UNCOV
494
        if mainSessionsBkt == nil {
×
495
                return 0, 0, ErrUninitializedDB
×
496
        }
×
497

498
        // Iterate over each session ID in the bucket.
UNCOV
499
        err := mainSessionsBkt.ForEach(func(sessID, _ []byte) error {
×
UNCOV
500
                // Get the bucket for this particular session.
×
UNCOV
501
                sessionBkt := mainSessionsBkt.NestedReadBucket(sessID)
×
UNCOV
502
                if sessionBkt == nil {
×
UNCOV
503
                        return ErrClientSessionNotFound
×
UNCOV
504
                }
×
505

UNCOV
506
                total++
×
UNCOV
507

×
UNCOV
508
                // Get the cSessionAckRangeIndex bucket.
×
UNCOV
509
                sessionAcksBkt := sessionBkt.NestedReadBucket(cSessionAcks)
×
UNCOV
510

×
UNCOV
511
                // Get the cSessionAckRangeIndex bucket.
×
UNCOV
512
                sessionAckRangesBkt := sessionBkt.NestedReadBucket(
×
UNCOV
513
                        cSessionAckRangeIndex,
×
UNCOV
514
                )
×
UNCOV
515

×
UNCOV
516
                // If both buckets do not exist, then this session is empty and
×
UNCOV
517
                // does not need to be migrated.
×
UNCOV
518
                if sessionAckRangesBkt == nil && sessionAcksBkt == nil {
×
519
                        return nil
×
520
                }
×
521

522
                // If the sessionAckRangesBkt is not nil, then the session has
523
                // already been migrated.
UNCOV
524
                if sessionAckRangesBkt != nil {
×
UNCOV
525
                        return nil
×
UNCOV
526
                }
×
527

528
                // Else the session has not yet been migrated.
UNCOV
529
                unmigrated++
×
UNCOV
530

×
UNCOV
531
                return nil
×
532
        })
UNCOV
533
        if err != nil {
×
UNCOV
534
                return 0, 0, err
×
UNCOV
535
        }
×
536

UNCOV
537
        return total, unmigrated, nil
×
538
}
539

540
// getDBChanID returns the db-assigned channel ID for the given real channel ID.
541
// It returns both the uint64 and byte representation.
542
func getDBChanID(chanDetailsBkt kvdb.RBucket, chanID ChannelID) (uint64,
UNCOV
543
        []byte, error) {
×
UNCOV
544

×
UNCOV
545
        chanDetails := chanDetailsBkt.NestedReadBucket(chanID[:])
×
UNCOV
546
        if chanDetails == nil {
×
547
                return 0, nil, ErrChannelNotRegistered
×
548
        }
×
549

UNCOV
550
        idBytes := chanDetails.Get(cChanDBID)
×
UNCOV
551
        if len(idBytes) == 0 {
×
552
                return 0, nil, fmt.Errorf("no db-assigned ID found for "+
×
553
                        "channel ID %s", chanID)
×
554
        }
×
555

UNCOV
556
        id, err := readBigSize(idBytes)
×
UNCOV
557
        if err != nil {
×
558
                return 0, nil, err
×
559
        }
×
560

UNCOV
561
        return id, idBytes, nil
×
562
}
563

564
// callback defines a type that's used by the sessionIterator.
565
type callback func(bkt kvdb.RwBucket) error
566

567
// sessionIterator is a helper function that iterates over the main sessions
568
// bucket and performs the callback function on each individual session. If a
569
// seeker is specified, it will move the cursor to the given position otherwise
570
// it will start from the first item.
UNCOV
571
func sessionIterator(tx kvdb.RwTx, seeker []byte, cb callback) ([]byte, error) {
×
UNCOV
572
        // Get sessions bucket.
×
UNCOV
573
        mainSessionsBkt := tx.ReadWriteBucket(cSessionBkt)
×
UNCOV
574
        if mainSessionsBkt == nil {
×
575
                return nil, ErrUninitializedDB
×
576
        }
×
577

UNCOV
578
        c := mainSessionsBkt.ReadCursor()
×
UNCOV
579
        k, _ := c.First()
×
UNCOV
580

×
UNCOV
581
        // Move the cursor to the specified position if seeker is non-nil.
×
UNCOV
582
        if seeker != nil {
×
UNCOV
583
                k, _ = c.Seek(seeker)
×
UNCOV
584
        }
×
585

586
        // Start the iteration and exit on condition.
UNCOV
587
        for k := k; k != nil; k, _ = c.Next() {
×
UNCOV
588
                // Get the bucket for this particular session.
×
UNCOV
589
                bkt := mainSessionsBkt.NestedReadWriteBucket(k)
×
UNCOV
590
                if bkt == nil {
×
591
                        return nil, ErrClientSessionNotFound
×
592
                }
×
593

594
                // Call the callback function with the session's bucket.
UNCOV
595
                if err := cb(bkt); err != nil {
×
UNCOV
596
                        // return k, err
×
UNCOV
597
                        lastIndex := make([]byte, len(k))
×
UNCOV
598
                        copy(lastIndex, k)
×
UNCOV
599
                        return lastIndex, err
×
UNCOV
600
                }
×
601
        }
602

603
        return nil, nil
×
604
}
605

606
// writeBigSize will encode the given uint64 as a BigSize byte slice.
UNCOV
607
func writeBigSize(i uint64) ([]byte, error) {
×
UNCOV
608
        var b bytes.Buffer
×
UNCOV
609
        err := tlv.WriteVarInt(&b, i, &[8]byte{})
×
UNCOV
610
        if err != nil {
×
611
                return nil, err
×
612
        }
×
613

UNCOV
614
        return b.Bytes(), nil
×
615
}
616

617
// readBigSize converts the given byte slice into a uint64 and assumes that the
618
// bytes slice is using BigSize encoding.
UNCOV
619
func readBigSize(b []byte) (uint64, error) {
×
UNCOV
620
        r := bytes.NewReader(b)
×
UNCOV
621
        i, err := tlv.ReadVarInt(r, &[8]byte{})
×
UNCOV
622
        if err != nil {
×
623
                return 0, err
×
624
        }
×
625

UNCOV
626
        return i, nil
×
627
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc