• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12250203495

10 Dec 2024 05:45AM UTC coverage: 49.842% (+0.07%) from 49.773%
12250203495

Pull #9342

github

ellemouton
docs: add release notes entry
Pull Request #9342: protofsm: update GR Manager usage and start using structured logging

100221 of 201077 relevant lines covered (49.84%)

2.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.59
/watchtower/wtdb/client_db.go
1
package wtdb
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "math"
8
        "net"
9
        "sync"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/lightningnetwork/lnd/fn/v2"
13
        "github.com/lightningnetwork/lnd/kvdb"
14
        "github.com/lightningnetwork/lnd/lnwire"
15
        "github.com/lightningnetwork/lnd/tlv"
16
        "github.com/lightningnetwork/lnd/watchtower/blob"
17
)
18

19
var (
20
        // cSessionKeyIndexBkt is a top-level bucket storing:
21
        //   tower-id -> reserved-session-key-index (uint32).
22
        cSessionKeyIndexBkt = []byte("client-session-key-index-bucket")
23

24
        // cChanDetailsBkt is a top-level bucket storing:
25
        //   channel-id => cChannelSummary -> encoded ClientChanSummary.
26
        //                  => cChanDBID -> db-assigned-id
27
        //                 => cChanSessions => db-session-id -> 1
28
        //                 => cChanClosedHeight -> block-height
29
        //                 => cChanMaxCommitmentHeight -> commitment-height
30
        cChanDetailsBkt = []byte("client-channel-detail-bucket")
31

32
        // cChanSessions is a sub-bucket of cChanDetailsBkt which stores:
33
        //    db-session-id -> 1
34
        cChanSessions = []byte("client-channel-sessions")
35

36
        // cChanDBID is a key used in the cChanDetailsBkt to store the
37
        // db-assigned-id of a channel.
38
        cChanDBID = []byte("client-channel-db-id")
39

40
        // cChanClosedHeight is a key used in the cChanDetailsBkt to store the
41
        // block height at which the channel's closing transaction was mined in.
42
        // If there is no associated value for this key, then the channel
43
        // has not yet been marked as closed.
44
        cChanClosedHeight = []byte("client-channel-closed-height")
45

46
        // cChannelSummary is a key used in cChanDetailsBkt to store the encoded
47
        // body of ClientChanSummary.
48
        cChannelSummary = []byte("client-channel-summary")
49

50
        // cChanMaxCommitmentHeight is a key used in the cChanDetailsBkt used
51
        // to store the highest commitment height for this channel that the
52
        // tower has been handed.
53
        cChanMaxCommitmentHeight = []byte(
54
                "client-channel-max-commitment-height",
55
        )
56

57
        // cSessionBkt is a top-level bucket storing:
58
        //   session-id => cSessionBody -> encoded ClientSessionBody
59
        //                 => cSessionDBID -> db-assigned-id
60
        //              => cSessionCommits => seqnum -> encoded CommittedUpdate
61
        //              => cSessionAckRangeIndex => db-chan-id => start -> end
62
        //                 => cSessionRogueUpdateCount -> count
63
        cSessionBkt = []byte("client-session-bucket")
64

65
        // cSessionDBID is a key used in the cSessionBkt to store the
66
        // db-assigned-id of a session.
67
        cSessionDBID = []byte("client-session-db-id")
68

69
        // cSessionBody is a sub-bucket of cSessionBkt storing only the body of
70
        // the ClientSession.
71
        cSessionBody = []byte("client-session-body")
72

73
        // cSessionCommits is a sub-bucket of cSessionBkt storing:
74
        //    seqnum -> encoded CommittedUpdate.
75
        cSessionCommits = []byte("client-session-commits")
76

77
        // cSessionAckRangeIndex is a sub-bucket of cSessionBkt storing
78
        //    chan-id => start -> end
79
        cSessionAckRangeIndex = []byte("client-session-ack-range-index")
80

81
        // cSessionRogueUpdateCount is a key in the cSessionBkt bucket storing
82
        // the number of rogue updates that were backed up using the session.
83
        // Rogue updates are updates for channels that have been closed already
84
        // at the time of the back-up.
85
        cSessionRogueUpdateCount = []byte("client-session-rogue-update-count")
86

87
        // cChanIDIndexBkt is a top-level bucket storing:
88
        //    db-assigned-id -> channel-ID
89
        cChanIDIndexBkt = []byte("client-channel-id-index")
90

91
        // cSessionIDIndexBkt is a top-level bucket storing:
92
        //    db-assigned-id -> session-id
93
        cSessionIDIndexBkt = []byte("client-session-id-index")
94

95
        // cTowerBkt is a top-level bucket storing:
96
        //    tower-id -> encoded Tower.
97
        cTowerBkt = []byte("client-tower-bucket")
98

99
        // cTowerIndexBkt is a top-level bucket storing:
100
        //    tower-pubkey -> tower-id.
101
        cTowerIndexBkt = []byte("client-tower-index-bucket")
102

103
        // cTowerToSessionIndexBkt is a top-level bucket storing:
104
        //         tower-id -> session-id -> 1
105
        cTowerToSessionIndexBkt = []byte(
106
                "client-tower-to-session-index-bucket",
107
        )
108

109
        // cClosableSessionsBkt is a top-level bucket storing:
110
        //         db-session-id -> last-channel-close-height
111
        cClosableSessionsBkt = []byte("client-closable-sessions-bucket")
112

113
        // cTaskQueue is a top-level bucket where the disk queue may store its
114
        // content.
115
        cTaskQueue = []byte("client-task-queue")
116

117
        // ErrTowerNotFound signals that the target tower was not found in the
118
        // database.
119
        ErrTowerNotFound = errors.New("tower not found")
120

121
        // ErrTowerUnackedUpdates is an error returned when we attempt to mark a
122
        // tower's sessions as inactive, but one of its sessions has unacked
123
        // updates.
124
        ErrTowerUnackedUpdates = errors.New("tower has unacked updates")
125

126
        // ErrCorruptClientSession signals that the client session's on-disk
127
        // structure deviates from what is expected.
128
        ErrCorruptClientSession = errors.New("client session corrupted")
129

130
        // ErrCorruptChanDetails signals that the client's channel details'
131
        // on-disk structure deviates from what is expected.
132
        ErrCorruptChanDetails = errors.New("channel details corrupted")
133

134
        // ErrClientSessionAlreadyExists signals an attempt to reinsert a client
135
        // session that has already been created.
136
        ErrClientSessionAlreadyExists = errors.New(
137
                "client session already exists",
138
        )
139

140
        // ErrChannelAlreadyRegistered signals a duplicate attempt to register a
141
        // channel with the client database.
142
        ErrChannelAlreadyRegistered = errors.New("channel already registered")
143

144
        // ErrChannelNotRegistered signals a channel has not yet been registered
145
        // in the client database.
146
        ErrChannelNotRegistered = errors.New("channel not registered")
147

148
        // ErrClientSessionNotFound signals that the requested client session
149
        // was not found in the database.
150
        ErrClientSessionNotFound = errors.New("client session not found")
151

152
        // ErrUpdateAlreadyCommitted signals that the chosen sequence number has
153
        // already been committed to an update with a different breach hint.
154
        ErrUpdateAlreadyCommitted = errors.New("update already committed")
155

156
        // ErrCommitUnorderedUpdate signals the client tried to commit a
157
        // sequence number other than the next unallocated sequence number.
158
        ErrCommitUnorderedUpdate = errors.New("update seqnum not monotonic")
159

160
        // ErrCommittedUpdateNotFound signals that the tower tried to ACK a
161
        // sequence number that has not yet been allocated by the client.
162
        ErrCommittedUpdateNotFound = errors.New("committed update not found")
163

164
        // ErrUnallocatedLastApplied signals that the tower tried to provide a
165
        // LastApplied value greater than any allocated sequence number.
166
        ErrUnallocatedLastApplied = errors.New("tower echoed last appiled " +
167
                "greater than allocated seqnum")
168

169
        // ErrNoReservedKeyIndex signals that a client session could not be
170
        // created because no session key index was reserved.
171
        ErrNoReservedKeyIndex = errors.New("key index not reserved")
172

173
        // ErrIncorrectKeyIndex signals that the client session could not be
174
        // created because session key index differs from the reserved key
175
        // index.
176
        ErrIncorrectKeyIndex = errors.New("incorrect key index")
177

178
        // ErrLastTowerAddr is an error returned when the last address of a
179
        // watchtower is attempted to be removed.
180
        ErrLastTowerAddr = errors.New("cannot remove last tower address")
181

182
        // ErrNoRangeIndexFound is returned when there is no persisted
183
        // range-index found for the given session ID to channel ID pair.
184
        ErrNoRangeIndexFound = errors.New("no range index found for the " +
185
                "given session-channel pair")
186

187
        // ErrSessionFailedFilterFn indicates that a particular session did
188
        // not pass the filter func provided by the caller.
189
        ErrSessionFailedFilterFn = errors.New("session failed filter func")
190

191
        // ErrSessionNotClosable is returned when a session is not found in the
192
        // closable list.
193
        ErrSessionNotClosable = errors.New("session is not closable")
194

195
        // errSessionHasOpenChannels is an error used to indicate that a
196
        // session has updates for channels that are still open.
197
        errSessionHasOpenChannels = errors.New("session has open channels")
198

199
        // ErrSessionHasUnackedUpdates is an error used to indicate that a
200
        // session has un-acked updates.
201
        ErrSessionHasUnackedUpdates = errors.New("session has un-acked updates")
202

203
        // errChannelHasMoreSessions is an error used to indicate that a channel
204
        // has updates in other non-closed sessions.
205
        errChannelHasMoreSessions = errors.New("channel has updates in " +
206
                "other sessions")
207
)
208

209
// NewBoltBackendCreator returns a function that creates a new bbolt backend for
210
// the watchtower database.
211
func NewBoltBackendCreator(active bool, dbPath,
212
        dbFileName string) func(boltCfg *kvdb.BoltConfig) (kvdb.Backend,
213
        error) {
×
214

×
215
        // If the watchtower client isn't active, we return a function that
×
216
        // always returns a nil DB to make sure we don't create empty database
×
217
        // files.
×
218
        if !active {
×
219
                return func(_ *kvdb.BoltConfig) (kvdb.Backend, error) {
×
220
                        return nil, nil
×
221
                }
×
222
        }
223

224
        return func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {
×
225
                cfg := &kvdb.BoltBackendConfig{
×
226
                        DBPath:            dbPath,
×
227
                        DBFileName:        dbFileName,
×
228
                        NoFreelistSync:    boltCfg.NoFreelistSync,
×
229
                        AutoCompact:       boltCfg.AutoCompact,
×
230
                        AutoCompactMinAge: boltCfg.AutoCompactMinAge,
×
231
                        DBTimeout:         boltCfg.DBTimeout,
×
232
                }
×
233

×
234
                db, err := kvdb.GetBoltBackend(cfg)
×
235
                if err != nil {
×
236
                        return nil, fmt.Errorf("could not open boltdb: %w", err)
×
237
                }
×
238

239
                return db, nil
×
240
        }
241
}
242

243
// ClientDB is single database providing a persistent storage engine for the
244
// wtclient.
245
type ClientDB struct {
246
        db kvdb.Backend
247

248
        // ackedRangeIndex is a map from session ID to channel ID to a
249
        // RangeIndex which represents the backups that have been acked for that
250
        // channel using that session.
251
        ackedRangeIndex   map[SessionID]map[lnwire.ChannelID]*RangeIndex
252
        ackedRangeIndexMu sync.Mutex
253
}
254

255
// OpenClientDB opens the client database given the path to the database's
256
// directory. If no such database exists, this method will initialize a fresh
257
// one using the latest version number and bucket structure. If a database
258
// exists but has a lower version number than the current version, any necessary
259
// migrations will be applied before returning. Any attempt to open a database
260
// with a version number higher that the latest version will fail to prevent
261
// accidental reversion.
262
func OpenClientDB(db kvdb.Backend) (*ClientDB, error) {
4✔
263
        firstInit, err := isFirstInit(db)
4✔
264
        if err != nil {
4✔
265
                return nil, err
×
266
        }
×
267

268
        clientDB := &ClientDB{
4✔
269
                db: db,
4✔
270
                ackedRangeIndex: make(
4✔
271
                        map[SessionID]map[lnwire.ChannelID]*RangeIndex,
4✔
272
                ),
4✔
273
        }
4✔
274

4✔
275
        err = initOrSyncVersions(clientDB, firstInit, clientDBVersions)
4✔
276
        if err != nil {
4✔
277
                db.Close()
×
278
                return nil, err
×
279
        }
×
280

281
        // Now that the database version fully consistent with our latest known
282
        // version, ensure that all top-level buckets known to this version are
283
        // initialized. This allows us to assume their presence throughout all
284
        // operations. If an known top-level bucket is expected to exist but is
285
        // missing, this will trigger a ErrUninitializedDB error.
286
        err = kvdb.Update(clientDB.db, initClientDBBuckets, func() {})
8✔
287
        if err != nil {
4✔
288
                db.Close()
×
289
                return nil, err
×
290
        }
×
291

292
        return clientDB, nil
4✔
293
}
294

295
// initClientDBBuckets creates all top-level buckets required to handle database
296
// operations required by the latest version.
297
func initClientDBBuckets(tx kvdb.RwTx) error {
4✔
298
        buckets := [][]byte{
4✔
299
                cSessionKeyIndexBkt,
4✔
300
                cChanDetailsBkt,
4✔
301
                cSessionBkt,
4✔
302
                cTowerBkt,
4✔
303
                cTowerIndexBkt,
4✔
304
                cTowerToSessionIndexBkt,
4✔
305
                cChanIDIndexBkt,
4✔
306
                cSessionIDIndexBkt,
4✔
307
                cClosableSessionsBkt,
4✔
308
        }
4✔
309

4✔
310
        for _, bucket := range buckets {
8✔
311
                _, err := tx.CreateTopLevelBucket(bucket)
4✔
312
                if err != nil {
4✔
313
                        return err
×
314
                }
×
315
        }
316

317
        return nil
4✔
318
}
319

320
// bdb returns the backing bbolt.DB instance.
321
//
322
// NOTE: Part of the versionedDB interface.
323
func (c *ClientDB) bdb() kvdb.Backend {
4✔
324
        return c.db
4✔
325
}
4✔
326

327
// Version returns the database's current version number.
328
//
329
// NOTE: Part of the versionedDB interface.
330
func (c *ClientDB) Version() (uint32, error) {
4✔
331
        var version uint32
4✔
332
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
333
                var err error
4✔
334
                version, err = getDBVersion(tx)
4✔
335
                return err
4✔
336
        }, func() {
8✔
337
                version = 0
4✔
338
        })
4✔
339
        if err != nil {
4✔
340
                return 0, err
×
341
        }
×
342

343
        return version, nil
4✔
344
}
345

346
// Close closes the underlying database.
347
func (c *ClientDB) Close() error {
×
348
        return c.db.Close()
×
349
}
×
350

351
// CreateTower initialize an address record used to communicate with a
352
// watchtower. Each Tower is assigned a unique ID, that is used to amortize
353
// storage costs of the public key when used by multiple sessions. If the tower
354
// already exists, the address is appended to the list of all addresses used to
355
// that tower previously and its corresponding sessions are marked as active.
356
func (c *ClientDB) CreateTower(lnAddr *lnwire.NetAddress) (*Tower, error) {
4✔
357
        var towerPubKey [33]byte
4✔
358
        copy(towerPubKey[:], lnAddr.IdentityKey.SerializeCompressed())
4✔
359

4✔
360
        var tower *Tower
4✔
361
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
362
                towerIndex := tx.ReadWriteBucket(cTowerIndexBkt)
4✔
363
                if towerIndex == nil {
4✔
364
                        return ErrUninitializedDB
×
365
                }
×
366

367
                towers := tx.ReadWriteBucket(cTowerBkt)
4✔
368
                if towers == nil {
4✔
369
                        return ErrUninitializedDB
×
370
                }
×
371

372
                towerToSessionIndex := tx.ReadWriteBucket(
4✔
373
                        cTowerToSessionIndexBkt,
4✔
374
                )
4✔
375
                if towerToSessionIndex == nil {
4✔
376
                        return ErrUninitializedDB
×
377
                }
×
378

379
                // Check if the tower index already knows of this pubkey.
380
                towerIDBytes := towerIndex.Get(towerPubKey[:])
4✔
381
                if len(towerIDBytes) == 8 {
8✔
382
                        // The tower already exists, deserialize the existing
4✔
383
                        // record.
4✔
384
                        var err error
4✔
385
                        tower, err = getTower(towers, towerIDBytes)
4✔
386
                        if err != nil {
4✔
387
                                return err
×
388
                        }
×
389

390
                        // Set its status to active.
391
                        tower.Status = TowerStatusActive
4✔
392

4✔
393
                        // Add the new address to the existing tower. If the
4✔
394
                        // address is a duplicate, this will result in no
4✔
395
                        // change.
4✔
396
                        tower.AddAddress(lnAddr.Address)
4✔
397
                } else {
4✔
398
                        // No such tower exists, create a new tower id for our
4✔
399
                        // new tower. The error is unhandled since NextSequence
4✔
400
                        // never fails in an Update.
4✔
401
                        towerID, _ := towerIndex.NextSequence()
4✔
402

4✔
403
                        tower = &Tower{
4✔
404
                                ID:          TowerID(towerID),
4✔
405
                                IdentityKey: lnAddr.IdentityKey,
4✔
406
                                Addresses:   []net.Addr{lnAddr.Address},
4✔
407
                                Status:      TowerStatusActive,
4✔
408
                        }
4✔
409

4✔
410
                        towerIDBytes = tower.ID.Bytes()
4✔
411

4✔
412
                        // Since this tower is new, record the mapping from
4✔
413
                        // tower pubkey to tower id in the tower index.
4✔
414
                        err := towerIndex.Put(towerPubKey[:], towerIDBytes)
4✔
415
                        if err != nil {
4✔
416
                                return err
×
417
                        }
×
418

419
                        // Create a new bucket for this tower in the
420
                        // tower-to-sessions index.
421
                        _, err = towerToSessionIndex.CreateBucket(towerIDBytes)
4✔
422
                        if err != nil {
4✔
423
                                return err
×
424
                        }
×
425
                }
426

427
                // Store the new or updated tower under its tower id.
428
                return putTower(towers, tower)
4✔
429
        }, func() {
4✔
430
                tower = nil
4✔
431
        })
4✔
432
        if err != nil {
4✔
433
                return nil, err
×
434
        }
×
435

436
        return tower, nil
4✔
437
}
438

439
// RemoveTower modifies a tower's record within the database. If an address is
440
// provided, then _only_ the address record should be removed from the tower's
441
// persisted state. Otherwise, we'll attempt to mark the tower as inactive. If
442
// any of its sessions has unacked updates, then ErrTowerUnackedUpdates is
443
// returned. If the tower doesn't have any sessions at all, it'll be completely
444
// removed from the database.
445
//
446
// NOTE: An error is not returned if the tower doesn't exist.
447
func (c *ClientDB) RemoveTower(pubKey *btcec.PublicKey, addr net.Addr) error {
4✔
448
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
449
                towers := tx.ReadWriteBucket(cTowerBkt)
4✔
450
                if towers == nil {
4✔
451
                        return ErrUninitializedDB
×
452
                }
×
453

454
                towerIndex := tx.ReadWriteBucket(cTowerIndexBkt)
4✔
455
                if towerIndex == nil {
4✔
456
                        return ErrUninitializedDB
×
457
                }
×
458

459
                towersToSessionsIndex := tx.ReadWriteBucket(
4✔
460
                        cTowerToSessionIndexBkt,
4✔
461
                )
4✔
462
                if towersToSessionsIndex == nil {
4✔
463
                        return ErrUninitializedDB
×
464
                }
×
465

466
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
4✔
467
                if chanIDIndexBkt == nil {
4✔
468
                        return ErrUninitializedDB
×
469
                }
×
470

471
                // Don't return an error if the watchtower doesn't exist to act
472
                // as a NOP.
473
                pubKeyBytes := pubKey.SerializeCompressed()
4✔
474
                towerIDBytes := towerIndex.Get(pubKeyBytes)
4✔
475
                if towerIDBytes == nil {
4✔
476
                        return nil
×
477
                }
×
478

479
                tower, err := getTower(towers, towerIDBytes)
4✔
480
                if err != nil {
4✔
481
                        return err
×
482
                }
×
483

484
                // If an address is provided, then we should _only_ remove the
485
                // address record from the database.
486
                if addr != nil {
4✔
487
                        // Towers should always have at least one address saved.
×
488
                        tower.RemoveAddress(addr)
×
489
                        if len(tower.Addresses) == 0 {
×
490
                                return ErrLastTowerAddr
×
491
                        }
×
492

493
                        return putTower(towers, tower)
×
494
                }
495

496
                // Otherwise, we should attempt to mark the tower's sessions as
497
                // inactive.
498
                sessions := tx.ReadWriteBucket(cSessionBkt)
4✔
499
                if sessions == nil {
4✔
500
                        return ErrUninitializedDB
×
501
                }
×
502
                towerID := TowerIDFromBytes(towerIDBytes)
4✔
503

4✔
504
                committedUpdateCount := make(map[SessionID]uint16)
4✔
505
                perCommittedUpdate := func(s *ClientSession,
4✔
506
                        _ *CommittedUpdate) {
4✔
507

×
508
                        committedUpdateCount[s.ID]++
×
509
                }
×
510

511
                towerSessions, err := c.listTowerSessions(
4✔
512
                        towerID, sessions, chanIDIndexBkt,
4✔
513
                        towersToSessionsIndex,
4✔
514
                        WithPerCommittedUpdate(perCommittedUpdate),
4✔
515
                )
4✔
516
                if err != nil {
4✔
517
                        return err
×
518
                }
×
519

520
                // If it doesn't have any, we can completely remove it from the
521
                // database.
522
                if len(towerSessions) == 0 {
4✔
523
                        if err := towerIndex.Delete(pubKeyBytes); err != nil {
×
524
                                return err
×
525
                        }
×
526

527
                        if err := towers.Delete(towerIDBytes); err != nil {
×
528
                                return err
×
529
                        }
×
530

531
                        return towersToSessionsIndex.DeleteNestedBucket(
×
532
                                towerIDBytes,
×
533
                        )
×
534
                }
535

536
                // Otherwise, we mark the tower as inactive.
537
                tower.Status = TowerStatusInactive
4✔
538
                err = putTower(towers, tower)
4✔
539
                if err != nil {
4✔
540
                        return err
×
541
                }
×
542

543
                // We'll do a check to ensure that the tower's sessions don't
544
                // have any pending back-ups.
545
                for _, session := range towerSessions {
8✔
546
                        if committedUpdateCount[session.ID] > 0 {
4✔
547
                                return ErrTowerUnackedUpdates
×
548
                        }
×
549
                }
550

551
                return nil
4✔
552
        }, func() {})
4✔
553
}
554

555
// DeactivateTower sets the given tower's status to inactive. This means that
556
// this tower's sessions won't be loaded and used for backups. CreateTower can
557
// be used to reactivate the tower again.
558
func (c *ClientDB) DeactivateTower(pubKey *btcec.PublicKey) error {
4✔
559
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
560
                towers := tx.ReadWriteBucket(cTowerBkt)
4✔
561
                if towers == nil {
4✔
562
                        return ErrUninitializedDB
×
563
                }
×
564

565
                towerIndex := tx.ReadWriteBucket(cTowerIndexBkt)
4✔
566
                if towerIndex == nil {
4✔
567
                        return ErrUninitializedDB
×
568
                }
×
569

570
                towersToSessionsIndex := tx.ReadWriteBucket(
4✔
571
                        cTowerToSessionIndexBkt,
4✔
572
                )
4✔
573
                if towersToSessionsIndex == nil {
4✔
574
                        return ErrUninitializedDB
×
575
                }
×
576

577
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
4✔
578
                if chanIDIndexBkt == nil {
4✔
579
                        return ErrUninitializedDB
×
580
                }
×
581

582
                pubKeyBytes := pubKey.SerializeCompressed()
4✔
583
                towerIDBytes := towerIndex.Get(pubKeyBytes)
4✔
584
                if towerIDBytes == nil {
4✔
585
                        return ErrTowerNotFound
×
586
                }
×
587

588
                tower, err := getTower(towers, towerIDBytes)
4✔
589
                if err != nil {
4✔
590
                        return err
×
591
                }
×
592

593
                // If the tower already has the desired status, then we can exit
594
                // here.
595
                if tower.Status == TowerStatusInactive {
4✔
596
                        return nil
×
597
                }
×
598

599
                // Otherwise, we update the status and re-store the tower.
600
                tower.Status = TowerStatusInactive
4✔
601

4✔
602
                return putTower(towers, tower)
4✔
603
        }, func() {})
4✔
604
}
605

606
// LoadTowerByID retrieves a tower by its tower ID.
607
func (c *ClientDB) LoadTowerByID(towerID TowerID) (*Tower, error) {
4✔
608
        var tower *Tower
4✔
609
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
610
                towers := tx.ReadBucket(cTowerBkt)
4✔
611
                if towers == nil {
4✔
612
                        return ErrUninitializedDB
×
613
                }
×
614

615
                var err error
4✔
616
                tower, err = getTower(towers, towerID.Bytes())
4✔
617
                return err
4✔
618
        }, func() {
4✔
619
                tower = nil
4✔
620
        })
4✔
621
        if err != nil {
4✔
622
                return nil, err
×
623
        }
×
624

625
        return tower, nil
4✔
626
}
627

628
// LoadTower retrieves a tower by its public key.
629
func (c *ClientDB) LoadTower(pubKey *btcec.PublicKey) (*Tower, error) {
4✔
630
        var tower *Tower
4✔
631
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
632
                towers := tx.ReadBucket(cTowerBkt)
4✔
633
                if towers == nil {
4✔
634
                        return ErrUninitializedDB
×
635
                }
×
636
                towerIndex := tx.ReadBucket(cTowerIndexBkt)
4✔
637
                if towerIndex == nil {
4✔
638
                        return ErrUninitializedDB
×
639
                }
×
640

641
                towerIDBytes := towerIndex.Get(pubKey.SerializeCompressed())
4✔
642
                if towerIDBytes == nil {
4✔
643
                        return ErrTowerNotFound
×
644
                }
×
645

646
                var err error
4✔
647
                tower, err = getTower(towers, towerIDBytes)
4✔
648
                return err
4✔
649
        }, func() {
4✔
650
                tower = nil
4✔
651
        })
4✔
652
        if err != nil {
4✔
653
                return nil, err
×
654
        }
×
655

656
        return tower, nil
4✔
657
}
658

659
// TowerFilterFn is the signature of a call-back function that can be used to
660
// skip certain towers in the ListTowers method.
661
type TowerFilterFn func(*Tower) bool
662

663
// ListTowers retrieves the list of towers available within the database that
664
// have a status matching the given status. The filter function may be set in
665
// order to filter out the towers to be returned.
666
func (c *ClientDB) ListTowers(filter TowerFilterFn) ([]*Tower, error) {
4✔
667
        var towers []*Tower
4✔
668
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
669
                towerBucket := tx.ReadBucket(cTowerBkt)
4✔
670
                if towerBucket == nil {
4✔
671
                        return ErrUninitializedDB
×
672
                }
×
673

674
                return towerBucket.ForEach(func(towerIDBytes, _ []byte) error {
8✔
675
                        tower, err := getTower(towerBucket, towerIDBytes)
4✔
676
                        if err != nil {
4✔
677
                                return err
×
678
                        }
×
679

680
                        if filter != nil && !filter(tower) {
4✔
681
                                return nil
×
682
                        }
×
683

684
                        towers = append(towers, tower)
4✔
685

4✔
686
                        return nil
4✔
687
                })
688
        }, func() {
4✔
689
                towers = nil
4✔
690
        })
4✔
691
        if err != nil {
4✔
692
                return nil, err
×
693
        }
×
694

695
        return towers, nil
4✔
696
}
697

698
// NextSessionKeyIndex reserves a new session key derivation index for a
699
// particular tower id. The index is reserved for that tower until
700
// CreateClientSession is invoked for that tower and index, at which point a new
701
// index for that tower can be reserved. Multiple calls to this method before
702
// CreateClientSession is invoked should return the same index unless forceNext
703
// is true.
704
func (c *ClientDB) NextSessionKeyIndex(towerID TowerID,
705
        blobType blob.Type, forceNext bool) (uint32, error) {
4✔
706

4✔
707
        var index uint32
4✔
708
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
709
                keyIndex := tx.ReadWriteBucket(cSessionKeyIndexBkt)
4✔
710
                if keyIndex == nil {
4✔
711
                        return ErrUninitializedDB
×
712
                }
×
713

714
                var err error
4✔
715
                if !forceNext {
8✔
716
                        // Check the session key index to see if a key has
4✔
717
                        // already been reserved for this tower. If so, we'll
4✔
718
                        // deserialize and return the index directly.
4✔
719
                        index, err = getSessionKeyIndex(
4✔
720
                                keyIndex, towerID, blobType,
4✔
721
                        )
4✔
722
                        if err == nil {
4✔
723
                                return nil
×
724
                        }
×
725
                }
726

727
                // By default, we use the next available bucket sequence as the
728
                // key index. But if forceNext is true, then it is assumed that
729
                // some data loss occurred and so the sequence is incremented a
730
                // by a jump of 1000 so that we can arrive at a brand new key
731
                // index quicker.
732
                currentSequence := keyIndex.Sequence()
4✔
733
                nextIndex := currentSequence + 1
4✔
734
                if forceNext {
4✔
735
                        nextIndex = currentSequence + 1000
×
736
                }
×
737

738
                if err = keyIndex.SetSequence(nextIndex); err != nil {
4✔
739
                        return fmt.Errorf("could not set next bucket "+
×
740
                                "sequence: %w", err)
×
741
                }
×
742

743
                // As a sanity check, assert that the index is still in the
744
                // valid range of unhardened pubkeys. In the future, we should
745
                // move to only using hardened keys, and this will prevent any
746
                // overlap from occurring until then. This also prevents us from
747
                // overflowing uint32s.
748
                if nextIndex > math.MaxInt32 {
4✔
749
                        return fmt.Errorf("exhausted session key indexes")
×
750
                }
×
751

752
                // Create the key that will used to be store the reserved index.
753
                keyBytes := createSessionKeyIndexKey(towerID, blobType)
4✔
754

4✔
755
                index = uint32(nextIndex)
4✔
756

4✔
757
                var indexBuf [4]byte
4✔
758
                byteOrder.PutUint32(indexBuf[:], index)
4✔
759

4✔
760
                // Record the reserved session key index under this tower's id.
4✔
761
                return keyIndex.Put(keyBytes, indexBuf[:])
4✔
762
        }, func() {
4✔
763
                index = 0
4✔
764
        })
4✔
765
        if err != nil {
4✔
766
                return 0, err
×
767
        }
×
768

769
        return index, nil
4✔
770
}
771

772
// CreateClientSession records a newly negotiated client session in the set of
773
// active sessions. The session can be identified by its SessionID.
774
func (c *ClientDB) CreateClientSession(session *ClientSession) error {
4✔
775
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
776
                keyIndexes := tx.ReadWriteBucket(cSessionKeyIndexBkt)
4✔
777
                if keyIndexes == nil {
4✔
778
                        return ErrUninitializedDB
×
779
                }
×
780

781
                sessions := tx.ReadWriteBucket(cSessionBkt)
4✔
782
                if sessions == nil {
4✔
783
                        return ErrUninitializedDB
×
784
                }
×
785

786
                towers := tx.ReadBucket(cTowerBkt)
4✔
787
                if towers == nil {
4✔
788
                        return ErrUninitializedDB
×
789
                }
×
790

791
                towerToSessionIndex := tx.ReadWriteBucket(
4✔
792
                        cTowerToSessionIndexBkt,
4✔
793
                )
4✔
794
                if towerToSessionIndex == nil {
4✔
795
                        return ErrUninitializedDB
×
796
                }
×
797

798
                // Check that  client session with this session id doesn't
799
                // already exist.
800
                existingSessionBytes := sessions.NestedReadWriteBucket(
4✔
801
                        session.ID[:],
4✔
802
                )
4✔
803
                if existingSessionBytes != nil {
4✔
804
                        return ErrClientSessionAlreadyExists
×
805
                }
×
806

807
                // Ensure that a tower with the given ID actually exists in the
808
                // DB.
809
                towerID := session.TowerID
4✔
810
                if _, err := getTower(towers, towerID.Bytes()); err != nil {
4✔
811
                        return err
×
812
                }
×
813

814
                blobType := session.Policy.BlobType
4✔
815

4✔
816
                // Check that this tower has a reserved key index.
4✔
817
                index, err := getSessionKeyIndex(keyIndexes, towerID, blobType)
4✔
818
                if err != nil {
4✔
819
                        return err
×
820
                }
×
821

822
                // Assert that the key index of the inserted session matches the
823
                // reserved session key index.
824
                if index != session.KeyIndex {
4✔
825
                        return ErrIncorrectKeyIndex
×
826
                }
×
827

828
                // Remove the key index reservation. For altruist commit
829
                // sessions, we'll also purge under the old legacy key format.
830
                key := createSessionKeyIndexKey(towerID, blobType)
4✔
831
                err = keyIndexes.Delete(key)
4✔
832
                if err != nil {
4✔
833
                        return err
×
834
                }
×
835
                if blobType == blob.TypeAltruistCommit {
8✔
836
                        err = keyIndexes.Delete(towerID.Bytes())
4✔
837
                        if err != nil {
4✔
838
                                return err
×
839
                        }
×
840
                }
841

842
                // Get the session-ID index bucket.
843
                dbIDIndex := tx.ReadWriteBucket(cSessionIDIndexBkt)
4✔
844
                if dbIDIndex == nil {
4✔
845
                        return ErrUninitializedDB
×
846
                }
×
847

848
                // Get a new, unique, ID for this session from the session-ID
849
                // index bucket.
850
                nextSeq, err := dbIDIndex.NextSequence()
4✔
851
                if err != nil {
4✔
852
                        return err
×
853
                }
×
854

855
                // Add the new entry to the dbID-to-SessionID index.
856
                newIndex, err := writeBigSize(nextSeq)
4✔
857
                if err != nil {
4✔
858
                        return err
×
859
                }
×
860

861
                err = dbIDIndex.Put(newIndex, session.ID[:])
4✔
862
                if err != nil {
4✔
863
                        return err
×
864
                }
×
865

866
                // Also add the db-assigned-id to the session bucket under the
867
                // cSessionDBID key.
868
                sessionBkt, err := sessions.CreateBucket(session.ID[:])
4✔
869
                if err != nil {
4✔
870
                        return err
×
871
                }
×
872

873
                err = sessionBkt.Put(cSessionDBID, newIndex)
4✔
874
                if err != nil {
4✔
875
                        return err
×
876
                }
×
877

878
                // TODO(elle): migrate the towerID-to-SessionID to use the
879
                // new db-assigned sessionID's rather.
880

881
                // Add the new entry to the towerID-to-SessionID index.
882
                towerSessions := towerToSessionIndex.NestedReadWriteBucket(
4✔
883
                        towerID.Bytes(),
4✔
884
                )
4✔
885
                if towerSessions == nil {
4✔
886
                        return ErrTowerNotFound
×
887
                }
×
888

889
                err = towerSessions.Put(session.ID[:], []byte{1})
4✔
890
                if err != nil {
4✔
891
                        return err
×
892
                }
×
893

894
                // Finally, write the client session's body in the sessions
895
                // bucket.
896
                return putClientSessionBody(sessionBkt, session)
4✔
897
        }, func() {})
4✔
898
}
899

900
// readRangeIndex reads a persisted RangeIndex from the passed bucket and into
901
// a new in-memory RangeIndex.
902
func readRangeIndex(rangesBkt kvdb.RBucket) (*RangeIndex, error) {
4✔
903
        ranges := make(map[uint64]uint64)
4✔
904
        err := rangesBkt.ForEach(func(k, v []byte) error {
8✔
905
                start, err := readBigSize(k)
4✔
906
                if err != nil {
4✔
907
                        return err
×
908
                }
×
909

910
                end, err := readBigSize(v)
4✔
911
                if err != nil {
4✔
912
                        return err
×
913
                }
×
914

915
                ranges[start] = end
4✔
916

4✔
917
                return nil
4✔
918
        })
919
        if err != nil {
4✔
920
                return nil, err
×
921
        }
×
922

923
        return NewRangeIndex(ranges, WithSerializeUint64Fn(writeBigSize))
4✔
924
}
925

926
// getRangeIndex checks the ClientDB's in-memory range index map to see if it
927
// has an entry for the given session and channel ID. If it does, this is
928
// returned, otherwise the range index is loaded from the DB. An optional db
929
// transaction parameter may be provided. If one is provided then it will be
930
// used to query the DB for the range index, otherwise, a new transaction will
931
// be created and used.
932
func (c *ClientDB) getRangeIndex(tx kvdb.RTx, sID SessionID,
933
        chanID lnwire.ChannelID) (*RangeIndex, error) {
4✔
934

4✔
935
        c.ackedRangeIndexMu.Lock()
4✔
936
        defer c.ackedRangeIndexMu.Unlock()
4✔
937

4✔
938
        if _, ok := c.ackedRangeIndex[sID]; !ok {
8✔
939
                c.ackedRangeIndex[sID] = make(map[lnwire.ChannelID]*RangeIndex)
4✔
940
        }
4✔
941

942
        // If the in-memory range-index map already includes an entry for this
943
        // session ID and channel ID pair, then return it.
944
        if index, ok := c.ackedRangeIndex[sID][chanID]; ok {
8✔
945
                return index, nil
4✔
946
        }
4✔
947

948
        // readRangeIndexFromBkt is a helper that is used to read in a
949
        // RangeIndex structure from the passed in bucket and store it in the
950
        // ackedRangeIndex map.
951
        readRangeIndexFromBkt := func(rangesBkt kvdb.RBucket) (*RangeIndex,
4✔
952
                error) {
8✔
953

4✔
954
                // Create a new in-memory RangeIndex by reading in ranges from
4✔
955
                // the DB.
4✔
956
                rangeIndex, err := readRangeIndex(rangesBkt)
4✔
957
                if err != nil {
4✔
958
                        return nil, err
×
959
                }
×
960

961
                c.ackedRangeIndex[sID][chanID] = rangeIndex
4✔
962

4✔
963
                return rangeIndex, nil
4✔
964
        }
965

966
        // If a DB transaction is provided then use it to fetch the ranges
967
        // bucket from the DB.
968
        if tx != nil {
8✔
969
                rangesBkt, err := getRangesReadBucket(tx, sID, chanID)
4✔
970
                if err != nil {
4✔
971
                        return nil, err
×
972
                }
×
973

974
                return readRangeIndexFromBkt(rangesBkt)
4✔
975
        }
976

977
        // No DB transaction was provided. So create and use a new one.
978
        var index *RangeIndex
×
979
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
×
980
                rangesBkt, err := getRangesReadBucket(tx, sID, chanID)
×
981
                if err != nil {
×
982
                        return err
×
983
                }
×
984

985
                index, err = readRangeIndexFromBkt(rangesBkt)
×
986

×
987
                return err
×
988
        }, func() {})
×
989
        if err != nil {
×
990
                return nil, err
×
991
        }
×
992

993
        return index, nil
×
994
}
995

996
// getRangesReadBucket gets the range index bucket where the range index for the
997
// given session-channel pair is stored. If any sub-buckets along the way do not
998
// exist, then an error is returned. If the sub-buckets should be created
999
// instead, then use getRangesWriteBucket.
1000
func getRangesReadBucket(tx kvdb.RTx, sID SessionID, chanID lnwire.ChannelID) (
1001
        kvdb.RBucket, error) {
4✔
1002

4✔
1003
        sessions := tx.ReadBucket(cSessionBkt)
4✔
1004
        if sessions == nil {
4✔
1005
                return nil, ErrUninitializedDB
×
1006
        }
×
1007

1008
        chanDetailsBkt := tx.ReadBucket(cChanDetailsBkt)
4✔
1009
        if chanDetailsBkt == nil {
4✔
1010
                return nil, ErrUninitializedDB
×
1011
        }
×
1012

1013
        sessionBkt := sessions.NestedReadBucket(sID[:])
4✔
1014
        if sessionsBkt == nil {
4✔
1015
                return nil, ErrNoRangeIndexFound
×
1016
        }
×
1017

1018
        // Get the DB representation of the channel-ID.
1019
        _, dbChanIDBytes, err := getDBChanID(chanDetailsBkt, chanID)
4✔
1020
        if err != nil {
4✔
1021
                return nil, err
×
1022
        }
×
1023

1024
        sessionAckRanges := sessionBkt.NestedReadBucket(cSessionAckRangeIndex)
4✔
1025
        if sessionAckRanges == nil {
4✔
1026
                return nil, ErrNoRangeIndexFound
×
1027
        }
×
1028

1029
        return sessionAckRanges.NestedReadBucket(dbChanIDBytes), nil
4✔
1030
}
1031

1032
// getRangesWriteBucket gets the range index bucket where the range index for
1033
// the given session-channel pair is stored. If any sub-buckets along the way do
1034
// not exist, then they are created.
1035
func getRangesWriteBucket(sessionBkt kvdb.RwBucket, dbChanIDBytes []byte) (
1036
        kvdb.RwBucket, error) {
4✔
1037

4✔
1038
        sessionAckRanges, err := sessionBkt.CreateBucketIfNotExists(
4✔
1039
                cSessionAckRangeIndex,
4✔
1040
        )
4✔
1041
        if err != nil {
4✔
1042
                return nil, err
×
1043
        }
×
1044

1045
        return sessionAckRanges.CreateBucketIfNotExists(dbChanIDBytes)
4✔
1046
}
1047

1048
// createSessionKeyIndexKey returns the identifier used in the
1049
// session-key-index index, created as tower-id||blob-type.
1050
//
1051
// NOTE: The original serialization only used tower-id, which prevents
1052
// concurrent client types from reserving sessions with the same tower.
1053
func createSessionKeyIndexKey(towerID TowerID, blobType blob.Type) []byte {
4✔
1054
        towerIDBytes := towerID.Bytes()
4✔
1055

4✔
1056
        // Session key indexes are stored under as tower-id||blob-type.
4✔
1057
        var keyBytes [6]byte
4✔
1058
        copy(keyBytes[:4], towerIDBytes)
4✔
1059
        byteOrder.PutUint16(keyBytes[4:], uint16(blobType))
4✔
1060

4✔
1061
        return keyBytes[:]
4✔
1062
}
4✔
1063

1064
// getSessionKeyIndex is a helper method.
1065
func getSessionKeyIndex(keyIndexes kvdb.RwBucket, towerID TowerID,
1066
        blobType blob.Type) (uint32, error) {
4✔
1067

4✔
1068
        // Session key indexes are store under as tower-id||blob-type. The
4✔
1069
        // original serialization only used tower-id, which prevents concurrent
4✔
1070
        // client types from reserving sessions with the same tower.
4✔
1071
        keyBytes := createSessionKeyIndexKey(towerID, blobType)
4✔
1072

4✔
1073
        // Retrieve the index using the key bytes. If the key wasn't found, we
4✔
1074
        // will fall back to the legacy format that only uses the tower id, but
4✔
1075
        // _only_ if the blob type is for altruist commit sessions since that
4✔
1076
        // was the only operational session type prior to changing the key
4✔
1077
        // format.
4✔
1078
        keyIndexBytes := keyIndexes.Get(keyBytes)
4✔
1079
        if keyIndexBytes == nil && blobType == blob.TypeAltruistCommit {
8✔
1080
                keyIndexBytes = keyIndexes.Get(towerID.Bytes())
4✔
1081
        }
4✔
1082

1083
        // All session key indexes should be serialized uint32's. If no key
1084
        // index was found, the length of keyIndexBytes will be 0.
1085
        if len(keyIndexBytes) != 4 {
8✔
1086
                return 0, ErrNoReservedKeyIndex
4✔
1087
        }
4✔
1088

1089
        return byteOrder.Uint32(keyIndexBytes), nil
4✔
1090
}
1091

1092
// GetClientSession loads the ClientSession with the given ID from the DB.
1093
func (c *ClientDB) GetClientSession(id SessionID,
1094
        opts ...ClientSessionListOption) (*ClientSession, error) {
4✔
1095

4✔
1096
        var sess *ClientSession
4✔
1097
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
1098
                sessionsBkt := tx.ReadBucket(cSessionBkt)
4✔
1099
                if sessionsBkt == nil {
4✔
1100
                        return ErrUninitializedDB
×
1101
                }
×
1102

1103
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
4✔
1104
                if chanIDIndexBkt == nil {
4✔
1105
                        return ErrUninitializedDB
×
1106
                }
×
1107

1108
                session, err := c.getClientSession(
4✔
1109
                        sessionsBkt, chanIDIndexBkt, id[:], opts...,
4✔
1110
                )
4✔
1111
                if err != nil {
4✔
1112
                        return err
×
1113
                }
×
1114

1115
                sess = session
4✔
1116

4✔
1117
                return nil
4✔
1118
        }, func() {})
4✔
1119

1120
        return sess, err
4✔
1121
}
1122

1123
// ListClientSessions returns the set of all client sessions known to the db. An
1124
// optional tower ID can be used to filter out any client sessions in the
1125
// response that do not correspond to this tower.
1126
func (c *ClientDB) ListClientSessions(id *TowerID,
1127
        opts ...ClientSessionListOption) (map[SessionID]*ClientSession, error) {
4✔
1128

4✔
1129
        var clientSessions map[SessionID]*ClientSession
4✔
1130
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
1131
                sessions := tx.ReadBucket(cSessionBkt)
4✔
1132
                if sessions == nil {
4✔
1133
                        return ErrUninitializedDB
×
1134
                }
×
1135

1136
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
4✔
1137
                if chanIDIndexBkt == nil {
4✔
1138
                        return ErrUninitializedDB
×
1139
                }
×
1140

1141
                // If no tower ID is specified, then fetch all the sessions
1142
                // known to the db.
1143
                var err error
4✔
1144
                if id == nil {
4✔
1145
                        clientSessions, err = c.listClientAllSessions(
×
1146
                                sessions, chanIDIndexBkt, opts...,
×
1147
                        )
×
1148
                        return err
×
1149
                }
×
1150

1151
                // Otherwise, fetch the sessions for the given tower.
1152
                towerToSessionIndex := tx.ReadBucket(cTowerToSessionIndexBkt)
4✔
1153
                if towerToSessionIndex == nil {
4✔
1154
                        return ErrUninitializedDB
×
1155
                }
×
1156

1157
                clientSessions, err = c.listTowerSessions(
4✔
1158
                        *id, sessions, chanIDIndexBkt, towerToSessionIndex,
4✔
1159
                        opts...,
4✔
1160
                )
4✔
1161
                return err
4✔
1162
        }, func() {
4✔
1163
                clientSessions = nil
4✔
1164
        })
4✔
1165
        if err != nil {
4✔
1166
                return nil, err
×
1167
        }
×
1168

1169
        return clientSessions, nil
4✔
1170
}
1171

1172
// listClientAllSessions returns the set of all client sessions known to the db.
1173
func (c *ClientDB) listClientAllSessions(sessions, chanIDIndexBkt kvdb.RBucket,
1174
        opts ...ClientSessionListOption) (map[SessionID]*ClientSession, error) {
×
1175

×
1176
        clientSessions := make(map[SessionID]*ClientSession)
×
1177
        err := sessions.ForEach(func(k, _ []byte) error {
×
1178
                // We'll load the full client session since the client will need
×
1179
                // the CommittedUpdates and AckedUpdates on startup to resume
×
1180
                // committed updates and compute the highest known commit height
×
1181
                // for each channel.
×
1182
                session, err := c.getClientSession(
×
1183
                        sessions, chanIDIndexBkt, k, opts...,
×
1184
                )
×
1185
                if errors.Is(err, ErrSessionFailedFilterFn) {
×
1186
                        return nil
×
1187
                } else if err != nil {
×
1188
                        return err
×
1189
                }
×
1190

1191
                clientSessions[session.ID] = session
×
1192

×
1193
                return nil
×
1194
        })
1195
        if err != nil {
×
1196
                return nil, err
×
1197
        }
×
1198

1199
        return clientSessions, nil
×
1200
}
1201

1202
// listTowerSessions returns the set of all client sessions known to the db
1203
// that are associated with the given tower id.
1204
func (c *ClientDB) listTowerSessions(id TowerID, sessionsBkt, chanIDIndexBkt,
1205
        towerToSessionIndex kvdb.RBucket, opts ...ClientSessionListOption) (
1206
        map[SessionID]*ClientSession, error) {
4✔
1207

4✔
1208
        towerIndexBkt := towerToSessionIndex.NestedReadBucket(id.Bytes())
4✔
1209
        if towerIndexBkt == nil {
4✔
1210
                return nil, ErrTowerNotFound
×
1211
        }
×
1212

1213
        clientSessions := make(map[SessionID]*ClientSession)
4✔
1214
        err := towerIndexBkt.ForEach(func(k, _ []byte) error {
8✔
1215
                // We'll load the full client session since the client will need
4✔
1216
                // the CommittedUpdates and AckedUpdates on startup to resume
4✔
1217
                // committed updates and compute the highest known commit height
4✔
1218
                // for each channel.
4✔
1219
                session, err := c.getClientSession(
4✔
1220
                        sessionsBkt, chanIDIndexBkt, k, opts...,
4✔
1221
                )
4✔
1222
                if errors.Is(err, ErrSessionFailedFilterFn) {
8✔
1223
                        return nil
4✔
1224
                } else if err != nil {
8✔
1225
                        return err
×
1226
                }
×
1227

1228
                clientSessions[session.ID] = session
4✔
1229
                return nil
4✔
1230
        })
1231
        if err != nil {
4✔
1232
                return nil, err
×
1233
        }
×
1234

1235
        return clientSessions, nil
4✔
1236
}
1237

1238
// FetchSessionCommittedUpdates retrieves the current set of un-acked updates
1239
// of the given session.
1240
func (c *ClientDB) FetchSessionCommittedUpdates(id *SessionID) (
1241
        []CommittedUpdate, error) {
4✔
1242

4✔
1243
        var committedUpdates []CommittedUpdate
4✔
1244
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
1245
                sessions := tx.ReadBucket(cSessionBkt)
4✔
1246
                if sessions == nil {
4✔
1247
                        return ErrUninitializedDB
×
1248
                }
×
1249

1250
                sessionBkt := sessions.NestedReadBucket(id[:])
4✔
1251
                if sessionBkt == nil {
4✔
1252
                        return ErrClientSessionNotFound
×
1253
                }
×
1254

1255
                var err error
4✔
1256
                committedUpdates, err = getClientSessionCommits(
4✔
1257
                        sessionBkt, nil, nil,
4✔
1258
                )
4✔
1259
                return err
4✔
1260
        }, func() {})
4✔
1261
        if err != nil {
4✔
1262
                return nil, err
×
1263
        }
×
1264

1265
        return committedUpdates, nil
4✔
1266
}
1267

1268
// IsAcked returns true if the given backup has been backed up using the given
1269
// session.
1270
func (c *ClientDB) IsAcked(id *SessionID, backupID *BackupID) (bool, error) {
×
1271
        index, err := c.getRangeIndex(nil, *id, backupID.ChanID)
×
1272
        if errors.Is(err, ErrNoRangeIndexFound) {
×
1273
                return false, nil
×
1274
        } else if err != nil {
×
1275
                return false, err
×
1276
        }
×
1277

1278
        return index.IsInIndex(backupID.CommitHeight), nil
×
1279
}
1280

1281
// NumAckedUpdates returns the number of backups that have been successfully
1282
// backed up using the given session.
1283
func (c *ClientDB) NumAckedUpdates(id *SessionID) (uint64, error) {
×
1284
        var numAcked uint64
×
1285
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
×
1286
                sessions := tx.ReadBucket(cSessionBkt)
×
1287
                if sessions == nil {
×
1288
                        return ErrUninitializedDB
×
1289
                }
×
1290

1291
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
×
1292
                if chanIDIndexBkt == nil {
×
1293
                        return ErrUninitializedDB
×
1294
                }
×
1295

1296
                sessionBkt := sessions.NestedReadBucket(id[:])
×
1297
                if sessionBkt == nil {
×
1298
                        return nil
×
1299
                }
×
1300

1301
                // First, account for any rogue updates.
1302
                rogueCountBytes := sessionBkt.Get(cSessionRogueUpdateCount)
×
1303
                if len(rogueCountBytes) != 0 {
×
1304
                        rogueCount, err := readBigSize(rogueCountBytes)
×
1305
                        if err != nil {
×
1306
                                return err
×
1307
                        }
×
1308

1309
                        numAcked += rogueCount
×
1310
                }
1311

1312
                // Then, check if the session-ack-ranges contains any entries
1313
                // to account for.
1314
                sessionAckRanges := sessionBkt.NestedReadBucket(
×
1315
                        cSessionAckRangeIndex,
×
1316
                )
×
1317
                if sessionAckRanges == nil {
×
1318
                        return nil
×
1319
                }
×
1320

1321
                // Iterate over the channel ID's in the sessionAckRanges
1322
                // bucket.
1323
                return sessionAckRanges.ForEach(func(dbChanID, _ []byte) error {
×
1324
                        // Get the range index for the session-channel pair.
×
1325
                        chanIDBytes := chanIDIndexBkt.Get(dbChanID)
×
1326
                        var chanID lnwire.ChannelID
×
1327
                        copy(chanID[:], chanIDBytes)
×
1328

×
1329
                        index, err := c.getRangeIndex(tx, *id, chanID)
×
1330
                        if err != nil {
×
1331
                                return err
×
1332
                        }
×
1333

1334
                        numAcked += index.NumInSet()
×
1335

×
1336
                        return nil
×
1337
                })
1338
        }, func() {
×
1339
                numAcked = 0
×
1340
        })
×
1341
        if err != nil {
×
1342
                return 0, err
×
1343
        }
×
1344

1345
        return numAcked, nil
×
1346
}
1347

1348
// FetchChanInfos loads a mapping from all registered channels to their
1349
// ChannelInfo. Only the channels that have not yet been marked as closed will
1350
// be loaded.
1351
func (c *ClientDB) FetchChanInfos() (ChannelInfos, error) {
4✔
1352
        var infos ChannelInfos
4✔
1353

4✔
1354
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
1355
                chanDetailsBkt := tx.ReadBucket(cChanDetailsBkt)
4✔
1356
                if chanDetailsBkt == nil {
4✔
1357
                        return ErrUninitializedDB
×
1358
                }
×
1359

1360
                return chanDetailsBkt.ForEach(func(k, _ []byte) error {
8✔
1361
                        chanDetails := chanDetailsBkt.NestedReadBucket(k)
4✔
1362
                        if chanDetails == nil {
4✔
1363
                                return ErrCorruptChanDetails
×
1364
                        }
×
1365
                        // If this channel has already been marked as closed,
1366
                        // then its summary does not need to be loaded.
1367
                        closedHeight := chanDetails.Get(cChanClosedHeight)
4✔
1368
                        if len(closedHeight) > 0 {
4✔
1369
                                return nil
×
1370
                        }
×
1371
                        var chanID lnwire.ChannelID
4✔
1372
                        copy(chanID[:], k)
4✔
1373
                        summary, err := getChanSummary(chanDetails)
4✔
1374
                        if err != nil {
4✔
1375
                                return err
×
1376
                        }
×
1377

1378
                        info := &ChannelInfo{
4✔
1379
                                ClientChanSummary: *summary,
4✔
1380
                        }
4✔
1381

4✔
1382
                        maxHeightBytes := chanDetails.Get(
4✔
1383
                                cChanMaxCommitmentHeight,
4✔
1384
                        )
4✔
1385
                        if len(maxHeightBytes) != 0 {
8✔
1386
                                height, err := readBigSize(maxHeightBytes)
4✔
1387
                                if err != nil {
4✔
1388
                                        return err
×
1389
                                }
×
1390

1391
                                info.MaxHeight = fn.Some(height)
4✔
1392
                        }
1393

1394
                        infos[chanID] = info
4✔
1395

4✔
1396
                        return nil
4✔
1397
                })
1398
        }, func() {
4✔
1399
                infos = make(ChannelInfos)
4✔
1400
        })
4✔
1401
        if err != nil {
4✔
1402
                return nil, err
×
1403
        }
×
1404

1405
        return infos, nil
4✔
1406
}
1407

1408
// RegisterChannel registers a channel for use within the client database. For
1409
// now, all that is stored in the channel summary is the sweep pkscript that
1410
// we'd like any tower sweeps to pay into. In the future, this will be extended
1411
// to contain more info to allow the client to efficiently request historical
1412
// states to be backed up under the client's active policy.
1413
func (c *ClientDB) RegisterChannel(chanID lnwire.ChannelID,
1414
        sweepPkScript []byte) error {
4✔
1415

4✔
1416
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
1417
                chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
4✔
1418
                if chanDetailsBkt == nil {
4✔
1419
                        return ErrUninitializedDB
×
1420
                }
×
1421

1422
                chanDetails := chanDetailsBkt.NestedReadWriteBucket(chanID[:])
4✔
1423
                if chanDetails != nil {
4✔
1424
                        // Channel is already registered.
×
1425
                        return ErrChannelAlreadyRegistered
×
1426
                }
×
1427

1428
                chanDetails, err := chanDetailsBkt.CreateBucket(chanID[:])
4✔
1429
                if err != nil {
4✔
1430
                        return err
×
1431
                }
×
1432

1433
                // Get the channel-id-index bucket.
1434
                indexBkt := tx.ReadWriteBucket(cChanIDIndexBkt)
4✔
1435
                if indexBkt == nil {
4✔
1436
                        return ErrUninitializedDB
×
1437
                }
×
1438

1439
                // Request the next unique id from the bucket.
1440
                nextSeq, err := indexBkt.NextSequence()
4✔
1441
                if err != nil {
4✔
1442
                        return err
×
1443
                }
×
1444

1445
                // Use BigSize encoding to encode the db-assigned index.
1446
                newIndex, err := writeBigSize(nextSeq)
4✔
1447
                if err != nil {
4✔
1448
                        return err
×
1449
                }
×
1450

1451
                // Add the new db-assigned ID to channel-ID pair.
1452
                err = indexBkt.Put(newIndex, chanID[:])
4✔
1453
                if err != nil {
4✔
1454
                        return err
×
1455
                }
×
1456

1457
                // Add the db-assigned ID to the channel's channel details
1458
                // bucket under the cChanDBID key.
1459
                err = chanDetails.Put(cChanDBID, newIndex)
4✔
1460
                if err != nil {
4✔
1461
                        return err
×
1462
                }
×
1463

1464
                summary := ClientChanSummary{
4✔
1465
                        SweepPkScript: sweepPkScript,
4✔
1466
                }
4✔
1467

4✔
1468
                return putChanSummary(chanDetails, &summary)
4✔
1469
        }, func() {})
4✔
1470
}
1471

1472
// MarkBackupIneligible records that the state identified by the (channel id,
1473
// commit height) tuple was ineligible for being backed up under the current
1474
// policy. This state can be retried later under a different policy.
1475
func (c *ClientDB) MarkBackupIneligible(chanID lnwire.ChannelID,
1476
        commitHeight uint64) error {
×
1477

×
1478
        return nil
×
1479
}
×
1480

1481
// ListClosableSessions fetches and returns the IDs for all sessions marked as
1482
// closable.
1483
func (c *ClientDB) ListClosableSessions() (map[SessionID]uint32, error) {
4✔
1484
        sessions := make(map[SessionID]uint32)
4✔
1485
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
1486
                csBkt := tx.ReadBucket(cClosableSessionsBkt)
4✔
1487
                if csBkt == nil {
4✔
1488
                        return ErrUninitializedDB
×
1489
                }
×
1490

1491
                sessIDIndexBkt := tx.ReadBucket(cSessionIDIndexBkt)
4✔
1492
                if sessIDIndexBkt == nil {
4✔
1493
                        return ErrUninitializedDB
×
1494
                }
×
1495

1496
                return csBkt.ForEach(func(dbIDBytes, heightBytes []byte) error {
4✔
1497
                        dbID, err := readBigSize(dbIDBytes)
×
1498
                        if err != nil {
×
1499
                                return err
×
1500
                        }
×
1501

1502
                        sessID, err := getRealSessionID(sessIDIndexBkt, dbID)
×
1503
                        if err != nil {
×
1504
                                return err
×
1505
                        }
×
1506

1507
                        sessions[*sessID] = byteOrder.Uint32(heightBytes)
×
1508

×
1509
                        return nil
×
1510
                })
1511
        }, func() {
4✔
1512
                sessions = make(map[SessionID]uint32)
4✔
1513
        })
4✔
1514
        if err != nil {
4✔
1515
                return nil, err
×
1516
        }
×
1517

1518
        return sessions, nil
4✔
1519
}
1520

1521
// DeleteSession can be called when a session should be deleted from the DB.
1522
// All references to the session will also be deleted from the DB. Note that a
1523
// session will only be deleted if was previously marked as closable.
1524
func (c *ClientDB) DeleteSession(id SessionID) error {
4✔
1525
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
1526
                sessionsBkt := tx.ReadWriteBucket(cSessionBkt)
4✔
1527
                if sessionsBkt == nil {
4✔
1528
                        return ErrUninitializedDB
×
1529
                }
×
1530

1531
                closableBkt := tx.ReadWriteBucket(cClosableSessionsBkt)
4✔
1532
                if closableBkt == nil {
4✔
1533
                        return ErrUninitializedDB
×
1534
                }
×
1535

1536
                chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
4✔
1537
                if chanDetailsBkt == nil {
4✔
1538
                        return ErrUninitializedDB
×
1539
                }
×
1540

1541
                sessIDIndexBkt := tx.ReadWriteBucket(cSessionIDIndexBkt)
4✔
1542
                if sessIDIndexBkt == nil {
4✔
1543
                        return ErrUninitializedDB
×
1544
                }
×
1545

1546
                chanIDIndexBkt := tx.ReadWriteBucket(cChanIDIndexBkt)
4✔
1547
                if chanIDIndexBkt == nil {
4✔
1548
                        return ErrUninitializedDB
×
1549
                }
×
1550

1551
                towerToSessBkt := tx.ReadWriteBucket(cTowerToSessionIndexBkt)
4✔
1552
                if towerToSessBkt == nil {
4✔
1553
                        return ErrUninitializedDB
×
1554
                }
×
1555

1556
                // Get the sub-bucket for this session ID. If it does not exist
1557
                // then the session has already been deleted and so our work is
1558
                // done.
1559
                sessionBkt := sessionsBkt.NestedReadBucket(id[:])
4✔
1560
                if sessionBkt == nil {
4✔
1561
                        return nil
×
1562
                }
×
1563

1564
                _, dbIDBytes, err := getDBSessionID(sessionsBkt, id)
4✔
1565
                if err != nil {
4✔
1566
                        return err
×
1567
                }
×
1568

1569
                // First we check if the session has actually been marked as
1570
                // closable.
1571
                if closableBkt.Get(dbIDBytes) == nil {
4✔
1572
                        return ErrSessionNotClosable
×
1573
                }
×
1574

1575
                sess, err := getClientSessionBody(sessionsBkt, id[:])
4✔
1576
                if err != nil {
4✔
1577
                        return err
×
1578
                }
×
1579

1580
                // Delete from the tower-to-sessionID index.
1581
                towerIndexBkt := towerToSessBkt.NestedReadWriteBucket(
4✔
1582
                        sess.TowerID.Bytes(),
4✔
1583
                )
4✔
1584
                if towerIndexBkt == nil {
4✔
1585
                        return fmt.Errorf("no entry in the tower-to-session "+
×
1586
                                "index found for tower ID %v", sess.TowerID)
×
1587
                }
×
1588

1589
                err = towerIndexBkt.Delete(id[:])
4✔
1590
                if err != nil {
4✔
1591
                        return err
×
1592
                }
×
1593

1594
                // Delete entry from session ID index.
1595
                err = sessIDIndexBkt.Delete(dbIDBytes)
4✔
1596
                if err != nil {
4✔
1597
                        return err
×
1598
                }
×
1599

1600
                // Delete the entry from the closable sessions index.
1601
                err = closableBkt.Delete(dbIDBytes)
4✔
1602
                if err != nil {
4✔
1603
                        return err
×
1604
                }
×
1605

1606
                ackRanges := sessionBkt.NestedReadBucket(cSessionAckRangeIndex)
4✔
1607

4✔
1608
                // There is a small chance that the session only contains rogue
4✔
1609
                // updates. In that case, there will be no ack-ranges index but
4✔
1610
                // the rogue update count will be equal the MaxUpdates.
4✔
1611
                rogueCountBytes := sessionBkt.Get(cSessionRogueUpdateCount)
4✔
1612
                if len(rogueCountBytes) != 0 {
4✔
1613
                        rogueCount, err := readBigSize(rogueCountBytes)
×
1614
                        if err != nil {
×
1615
                                return err
×
1616
                        }
×
1617

1618
                        maxUpdates := sess.ClientSessionBody.Policy.MaxUpdates
×
1619
                        if rogueCount == uint64(maxUpdates) {
×
1620
                                // Do a sanity check to ensure that the acked
×
1621
                                // ranges bucket does not exist in this case.
×
1622
                                if ackRanges != nil {
×
1623
                                        return fmt.Errorf("acked updates "+
×
1624
                                                "exist for session with a "+
×
1625
                                                "max-updates(%d) rogue count",
×
1626
                                                rogueCount)
×
1627
                                }
×
1628

1629
                                return sessionsBkt.DeleteNestedBucket(id[:])
×
1630
                        }
1631
                }
1632

1633
                // A session would only be considered closable if it was
1634
                // exhausted. Meaning that it should not be the case that it has
1635
                // no acked-updates.
1636
                if ackRanges == nil {
4✔
1637
                        return fmt.Errorf("cannot delete session %s since it "+
×
1638
                                "is not yet exhausted", id)
×
1639
                }
×
1640

1641
                // For each of the channels, delete the session ID entry.
1642
                err = ackRanges.ForEach(func(chanDBID, _ []byte) error {
8✔
1643
                        chanDBIDInt, err := readBigSize(chanDBID)
4✔
1644
                        if err != nil {
4✔
1645
                                return err
×
1646
                        }
×
1647

1648
                        chanID, err := getRealChannelID(
4✔
1649
                                chanIDIndexBkt, chanDBIDInt,
4✔
1650
                        )
4✔
1651
                        if err != nil {
4✔
1652
                                return err
×
1653
                        }
×
1654

1655
                        chanDetails := chanDetailsBkt.NestedReadWriteBucket(
4✔
1656
                                chanID[:],
4✔
1657
                        )
4✔
1658
                        if chanDetails == nil {
4✔
1659
                                return ErrChannelNotRegistered
×
1660
                        }
×
1661

1662
                        chanSessions := chanDetails.NestedReadWriteBucket(
4✔
1663
                                cChanSessions,
4✔
1664
                        )
4✔
1665
                        if chanSessions == nil {
4✔
1666
                                return fmt.Errorf("no session list found for "+
×
1667
                                        "channel %s", chanID)
×
1668
                        }
×
1669

1670
                        // Check that this session was actually listed in the
1671
                        // session list for this channel.
1672
                        if len(chanSessions.Get(dbIDBytes)) == 0 {
4✔
1673
                                return fmt.Errorf("session %s not found in "+
×
1674
                                        "the session list for channel %s", id,
×
1675
                                        chanID)
×
1676
                        }
×
1677

1678
                        // If it was, then delete it.
1679
                        err = chanSessions.Delete(dbIDBytes)
4✔
1680
                        if err != nil {
4✔
1681
                                return err
×
1682
                        }
×
1683

1684
                        // If this was the last session for this channel, we can
1685
                        // now delete the channel details for this channel
1686
                        // completely.
1687
                        err = chanSessions.ForEach(func(_, _ []byte) error {
8✔
1688
                                return errChannelHasMoreSessions
4✔
1689
                        })
4✔
1690
                        if errors.Is(err, errChannelHasMoreSessions) {
8✔
1691
                                return nil
4✔
1692
                        } else if err != nil {
8✔
1693
                                return err
×
1694
                        }
×
1695

1696
                        // Delete the channel's entry from the channel-id-index.
1697
                        dbID := chanDetails.Get(cChanDBID)
4✔
1698
                        err = chanIDIndexBkt.Delete(dbID)
4✔
1699
                        if err != nil {
4✔
1700
                                return err
×
1701
                        }
×
1702

1703
                        // Delete the channel details.
1704
                        return chanDetailsBkt.DeleteNestedBucket(chanID[:])
4✔
1705
                })
1706
                if err != nil {
4✔
1707
                        return err
×
1708
                }
×
1709

1710
                // Delete the actual session.
1711
                return sessionsBkt.DeleteNestedBucket(id[:])
4✔
1712
        }, func() {})
4✔
1713
}
1714

1715
// MarkChannelClosed will mark a registered channel as closed by setting its
1716
// closed-height as the given block height. It returns a list of session IDs for
1717
// sessions that are now considered closable due to the close of this channel.
1718
// The details for this channel will be deleted from the DB if there are no more
1719
// sessions in the DB that contain updates for this channel.
1720
func (c *ClientDB) MarkChannelClosed(chanID lnwire.ChannelID,
1721
        blockHeight uint32) ([]SessionID, error) {
4✔
1722

4✔
1723
        var closableSessions []SessionID
4✔
1724
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
1725
                sessionsBkt := tx.ReadBucket(cSessionBkt)
4✔
1726
                if sessionsBkt == nil {
4✔
1727
                        return ErrUninitializedDB
×
1728
                }
×
1729

1730
                chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
4✔
1731
                if chanDetailsBkt == nil {
4✔
1732
                        return ErrUninitializedDB
×
1733
                }
×
1734

1735
                closableSessBkt := tx.ReadWriteBucket(cClosableSessionsBkt)
4✔
1736
                if closableSessBkt == nil {
4✔
1737
                        return ErrUninitializedDB
×
1738
                }
×
1739

1740
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
4✔
1741
                if chanIDIndexBkt == nil {
4✔
1742
                        return ErrUninitializedDB
×
1743
                }
×
1744

1745
                sessIDIndexBkt := tx.ReadBucket(cSessionIDIndexBkt)
4✔
1746
                if sessIDIndexBkt == nil {
4✔
1747
                        return ErrUninitializedDB
×
1748
                }
×
1749

1750
                chanDetails := chanDetailsBkt.NestedReadWriteBucket(chanID[:])
4✔
1751
                if chanDetails == nil {
4✔
1752
                        return ErrChannelNotRegistered
×
1753
                }
×
1754

1755
                // If there are no sessions for this channel, the channel
1756
                // details can be deleted.
1757
                chanSessIDsBkt := chanDetails.NestedReadBucket(cChanSessions)
4✔
1758
                if chanSessIDsBkt == nil {
4✔
1759
                        return chanDetailsBkt.DeleteNestedBucket(chanID[:])
×
1760
                }
×
1761

1762
                // Otherwise, mark the channel as closed.
1763
                var height [4]byte
4✔
1764
                byteOrder.PutUint32(height[:], blockHeight)
4✔
1765

4✔
1766
                err := chanDetails.Put(cChanClosedHeight, height[:])
4✔
1767
                if err != nil {
4✔
1768
                        return err
×
1769
                }
×
1770

1771
                // Now iterate through all the sessions of the channel to check
1772
                // if any of them are closeable.
1773
                return chanSessIDsBkt.ForEach(func(sessDBID, _ []byte) error {
8✔
1774
                        sessDBIDInt, err := readBigSize(sessDBID)
4✔
1775
                        if err != nil {
4✔
1776
                                return err
×
1777
                        }
×
1778

1779
                        // Use the session-ID index to get the real session ID.
1780
                        sID, err := getRealSessionID(
4✔
1781
                                sessIDIndexBkt, sessDBIDInt,
4✔
1782
                        )
4✔
1783
                        if err != nil {
4✔
1784
                                return err
×
1785
                        }
×
1786

1787
                        isClosable, err := isSessionClosable(
4✔
1788
                                sessionsBkt, chanDetailsBkt, chanIDIndexBkt,
4✔
1789
                                sID,
4✔
1790
                        )
4✔
1791
                        if err != nil {
4✔
1792
                                return err
×
1793
                        }
×
1794

1795
                        if !isClosable {
8✔
1796
                                return nil
4✔
1797
                        }
4✔
1798

1799
                        // Add session to "closableSessions" list and add the
1800
                        // block height that this last channel was closed in.
1801
                        // This will be used in future to determine when we
1802
                        // should delete the session.
1803
                        var height [4]byte
4✔
1804
                        byteOrder.PutUint32(height[:], blockHeight)
4✔
1805
                        err = closableSessBkt.Put(sessDBID, height[:])
4✔
1806
                        if err != nil {
4✔
1807
                                return err
×
1808
                        }
×
1809

1810
                        closableSessions = append(closableSessions, *sID)
4✔
1811

4✔
1812
                        return nil
4✔
1813
                })
1814
        }, func() {
4✔
1815
                closableSessions = nil
4✔
1816
        })
4✔
1817
        if err != nil {
4✔
1818
                return nil, err
×
1819
        }
×
1820

1821
        return closableSessions, nil
4✔
1822
}
1823

1824
// isSessionClosable returns true if a session is considered closable. A session
1825
// is considered closable only if all the following points are true:
1826
//  1. It has no un-acked updates.
1827
//  2. It is exhausted (ie it can't accept any more updates) OR it has been
1828
//     marked as terminal.
1829
//  3. All the channels that it has acked updates for are closed.
1830
func isSessionClosable(sessionsBkt, chanDetailsBkt, chanIDIndexBkt kvdb.RBucket,
1831
        id *SessionID) (bool, error) {
4✔
1832

4✔
1833
        sessBkt := sessionsBkt.NestedReadBucket(id[:])
4✔
1834
        if sessBkt == nil {
4✔
1835
                return false, ErrSessionNotFound
×
1836
        }
×
1837

1838
        // Since the DeleteCommittedUpdates method deletes the cSessionCommits
1839
        // bucket in one go, it is possible for the session to be closable even
1840
        // if this bucket no longer exists.
1841
        commitsBkt := sessBkt.NestedReadBucket(cSessionCommits)
4✔
1842
        if commitsBkt != nil {
8✔
1843
                // If the session has any un-acked updates, then it is not yet
4✔
1844
                // closable.
4✔
1845
                err := commitsBkt.ForEach(func(_, _ []byte) error {
4✔
1846
                        return ErrSessionHasUnackedUpdates
×
1847
                })
×
1848
                if errors.Is(err, ErrSessionHasUnackedUpdates) {
4✔
1849
                        return false, nil
×
1850
                } else if err != nil {
4✔
1851
                        return false, err
×
1852
                }
×
1853
        }
1854

1855
        session, err := getClientSessionBody(sessionsBkt, id[:])
4✔
1856
        if err != nil {
4✔
1857
                return false, err
×
1858
        }
×
1859

1860
        isTerminal := session.Status == CSessionTerminal
4✔
1861

4✔
1862
        // We have already checked that the session has no more committed
4✔
1863
        // updates. So now we can check if the session is exhausted or has a
4✔
1864
        // terminal state.
4✔
1865
        if !isTerminal && session.SeqNum < session.Policy.MaxUpdates {
8✔
1866
                // If the session is not yet exhausted, and it is not yet in a
4✔
1867
                // terminal state then it is not yet closable.
4✔
1868
                return false, nil
4✔
1869
        }
4✔
1870

1871
        // Either the acked-update bucket should exist _or_ the rogue update
1872
        // count must be equal to the session's MaxUpdates value, otherwise
1873
        // something is wrong because the above check ensures that the session
1874
        // has been exhausted.
1875
        rogueCountBytes := sessBkt.Get(cSessionRogueUpdateCount)
4✔
1876
        if len(rogueCountBytes) != 0 {
4✔
1877
                rogueCount, err := readBigSize(rogueCountBytes)
×
1878
                if err != nil {
×
1879
                        return false, err
×
1880
                }
×
1881

1882
                if rogueCount == uint64(session.Policy.MaxUpdates) {
×
1883
                        return true, nil
×
1884
                }
×
1885
        }
1886

1887
        ackedRangeBkt := sessBkt.NestedReadBucket(cSessionAckRangeIndex)
4✔
1888
        if ackedRangeBkt == nil {
4✔
1889
                if isTerminal {
×
1890
                        return true, nil
×
1891
                }
×
1892

1893
                // If the session has no acked-updates, and it is not in a
1894
                // terminal state then something is wrong since the above check
1895
                // ensures that this session has been exhausted meaning that it
1896
                // should have MaxUpdates acked updates.
1897
                return false, fmt.Errorf("no acked-updates found for "+
×
1898
                        "exhausted session %s", id)
×
1899
        }
1900

1901
        // Iterate over each of the channels that the session has acked-updates
1902
        // for. If any of those channels are not closed, then the session is
1903
        // not yet closable.
1904
        err = ackedRangeBkt.ForEach(func(dbChanID, _ []byte) error {
8✔
1905
                dbChanIDInt, err := readBigSize(dbChanID)
4✔
1906
                if err != nil {
4✔
1907
                        return err
×
1908
                }
×
1909

1910
                chanID, err := getRealChannelID(chanIDIndexBkt, dbChanIDInt)
4✔
1911
                if err != nil {
4✔
1912
                        return err
×
1913
                }
×
1914

1915
                // Get the channel details bucket for the channel.
1916
                chanDetails := chanDetailsBkt.NestedReadBucket(chanID[:])
4✔
1917
                if chanDetails == nil {
4✔
1918
                        return fmt.Errorf("no channel details found for "+
×
1919
                                "channel %s referenced by session %s", chanID,
×
1920
                                id)
×
1921
                }
×
1922

1923
                // If a closed height has been set, then the channel is closed.
1924
                closedHeight := chanDetails.Get(cChanClosedHeight)
4✔
1925
                if len(closedHeight) > 0 {
8✔
1926
                        return nil
4✔
1927
                }
4✔
1928

1929
                // Otherwise, the channel is not yet closed meaning that the
1930
                // session is not yet closable. We break the ForEach by
1931
                // returning an error to indicate this.
1932
                return errSessionHasOpenChannels
×
1933
        })
1934
        if errors.Is(err, errSessionHasOpenChannels) {
4✔
1935
                return false, nil
×
1936
        } else if err != nil {
4✔
1937
                return false, err
×
1938
        }
×
1939

1940
        return true, nil
4✔
1941
}
1942

1943
// CommitUpdate persists the CommittedUpdate provided in the slot for (session,
1944
// seqNum). This allows the client to retransmit this update on startup.
1945
func (c *ClientDB) CommitUpdate(id *SessionID,
1946
        update *CommittedUpdate) (uint16, error) {
4✔
1947

4✔
1948
        var lastApplied uint16
4✔
1949
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
1950
                sessions := tx.ReadWriteBucket(cSessionBkt)
4✔
1951
                if sessions == nil {
4✔
1952
                        return ErrUninitializedDB
×
1953
                }
×
1954

1955
                // We'll only load the ClientSession body for performance, since
1956
                // we primarily need to inspect its SeqNum and TowerLastApplied
1957
                // fields. The CommittedUpdates will be modified on disk
1958
                // directly.
1959
                session, err := getClientSessionBody(sessions, id[:])
4✔
1960
                if err != nil {
4✔
1961
                        return err
×
1962
                }
×
1963

1964
                // Can't fail if the above didn't fail.
1965
                sessionBkt := sessions.NestedReadWriteBucket(id[:])
4✔
1966

4✔
1967
                // Ensure the session commits sub-bucket is initialized.
4✔
1968
                sessionCommits, err := sessionBkt.CreateBucketIfNotExists(
4✔
1969
                        cSessionCommits,
4✔
1970
                )
4✔
1971
                if err != nil {
4✔
1972
                        return err
×
1973
                }
×
1974

1975
                var seqNumBuf [2]byte
4✔
1976
                byteOrder.PutUint16(seqNumBuf[:], update.SeqNum)
4✔
1977

4✔
1978
                // Check to see if a committed update already exists for this
4✔
1979
                // sequence number.
4✔
1980
                committedUpdateBytes := sessionCommits.Get(seqNumBuf[:])
4✔
1981
                if committedUpdateBytes != nil {
4✔
1982
                        var dbUpdate CommittedUpdate
×
1983
                        err := dbUpdate.Decode(
×
1984
                                bytes.NewReader(committedUpdateBytes),
×
1985
                        )
×
1986
                        if err != nil {
×
1987
                                return err
×
1988
                        }
×
1989

1990
                        // If an existing committed update has a different hint,
1991
                        // we'll reject this newer update.
1992
                        if dbUpdate.Hint != update.Hint {
×
1993
                                return ErrUpdateAlreadyCommitted
×
1994
                        }
×
1995

1996
                        // Otherwise, capture the last applied value and
1997
                        // succeed.
1998
                        lastApplied = session.TowerLastApplied
×
1999
                        return nil
×
2000
                }
2001

2002
                // There's no committed update for this sequence number, ensure
2003
                // that we are committing the next unallocated one.
2004
                if update.SeqNum != session.SeqNum+1 {
4✔
2005
                        return ErrCommitUnorderedUpdate
×
2006
                }
×
2007

2008
                // Increment the session's sequence number and store the updated
2009
                // client session.
2010
                //
2011
                // TODO(conner): split out seqnum and last applied own bucket to
2012
                // eliminate serialization of full struct during CommitUpdate?
2013
                // Can also read/write directly to byes [:2] without migration.
2014
                session.SeqNum++
4✔
2015
                err = putClientSessionBody(sessionBkt, session)
4✔
2016
                if err != nil {
4✔
2017
                        return err
×
2018
                }
×
2019

2020
                // Encode and store the committed update in the sessionCommits
2021
                // sub-bucket under the requested sequence number.
2022
                var b bytes.Buffer
4✔
2023
                err = update.Encode(&b)
4✔
2024
                if err != nil {
4✔
2025
                        return err
×
2026
                }
×
2027

2028
                err = sessionCommits.Put(seqNumBuf[:], b.Bytes())
4✔
2029
                if err != nil {
4✔
2030
                        return err
×
2031
                }
×
2032

2033
                // Update the channel's max commitment height if needed.
2034
                err = maybeUpdateMaxCommitHeight(tx, update.BackupID)
4✔
2035
                if err != nil {
4✔
2036
                        return err
×
2037
                }
×
2038

2039
                // Finally, capture the session's last applied value so it can
2040
                // be sent in the next state update to the tower.
2041
                lastApplied = session.TowerLastApplied
4✔
2042

4✔
2043
                return nil
4✔
2044
        }, func() {
4✔
2045
                lastApplied = 0
4✔
2046
        })
4✔
2047
        if err != nil {
4✔
2048
                return 0, err
×
2049
        }
×
2050

2051
        return lastApplied, nil
4✔
2052
}
2053

2054
// AckUpdate persists an acknowledgment for a given (session, seqnum) pair. This
2055
// removes the update from the set of committed updates, and validates the
2056
// lastApplied value returned from the tower.
2057
func (c *ClientDB) AckUpdate(id *SessionID, seqNum uint16,
2058
        lastApplied uint16) error {
4✔
2059

4✔
2060
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
2061
                sessions := tx.ReadWriteBucket(cSessionBkt)
4✔
2062
                if sessions == nil {
4✔
2063
                        return ErrUninitializedDB
×
2064
                }
×
2065

2066
                chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
4✔
2067
                if chanDetailsBkt == nil {
4✔
2068
                        return ErrUninitializedDB
×
2069
                }
×
2070

2071
                // We'll only load the ClientSession body for performance, since
2072
                // we primarily need to inspect its SeqNum and TowerLastApplied
2073
                // fields. The CommittedUpdates and AckedUpdates will be
2074
                // modified on disk directly.
2075
                session, err := getClientSessionBody(sessions, id[:])
4✔
2076
                if err != nil {
4✔
2077
                        return err
×
2078
                }
×
2079

2080
                // If the tower has acked a sequence number beyond our highest
2081
                // sequence number, fail.
2082
                if lastApplied > session.SeqNum {
4✔
2083
                        return ErrUnallocatedLastApplied
×
2084
                }
×
2085

2086
                // If the tower acked with a lower sequence number than it gave
2087
                // us prior, fail.
2088
                if lastApplied < session.TowerLastApplied {
4✔
2089
                        return ErrLastAppliedReversion
×
2090
                }
×
2091

2092
                // TODO(conner): split out seqnum and last applied own bucket to
2093
                // eliminate serialization of full struct during AckUpdate?  Can
2094
                // also read/write directly to byes [2:4] without migration.
2095
                session.TowerLastApplied = lastApplied
4✔
2096

4✔
2097
                // Can't fail because getClientSession succeeded.
4✔
2098
                sessionBkt := sessions.NestedReadWriteBucket(id[:])
4✔
2099

4✔
2100
                // Write the client session with the updated last applied value.
4✔
2101
                err = putClientSessionBody(sessionBkt, session)
4✔
2102
                if err != nil {
4✔
2103
                        return err
×
2104
                }
×
2105

2106
                // If the commits sub-bucket doesn't exist, there can't possibly
2107
                // be a corresponding committed update to remove.
2108
                sessionCommits := sessionBkt.NestedReadWriteBucket(
4✔
2109
                        cSessionCommits,
4✔
2110
                )
4✔
2111
                if sessionCommits == nil {
4✔
2112
                        return ErrCommittedUpdateNotFound
×
2113
                }
×
2114

2115
                var seqNumBuf [2]byte
4✔
2116
                byteOrder.PutUint16(seqNumBuf[:], seqNum)
4✔
2117

4✔
2118
                // Assert that a committed update exists for this sequence
4✔
2119
                // number.
4✔
2120
                committedUpdateBytes := sessionCommits.Get(seqNumBuf[:])
4✔
2121
                if committedUpdateBytes == nil {
4✔
2122
                        return ErrCommittedUpdateNotFound
×
2123
                }
×
2124

2125
                var committedUpdate CommittedUpdate
4✔
2126
                err = committedUpdate.Decode(
4✔
2127
                        bytes.NewReader(committedUpdateBytes),
4✔
2128
                )
4✔
2129
                if err != nil {
4✔
2130
                        return err
×
2131
                }
×
2132

2133
                // Remove the corresponding committed update.
2134
                err = sessionCommits.Delete(seqNumBuf[:])
4✔
2135
                if err != nil {
4✔
2136
                        return err
×
2137
                }
×
2138

2139
                dbSessionID, dbSessIDBytes, err := getDBSessionID(sessions, *id)
4✔
2140
                if err != nil {
4✔
2141
                        return err
×
2142
                }
×
2143

2144
                chanID := committedUpdate.BackupID.ChanID
4✔
2145
                height := committedUpdate.BackupID.CommitHeight
4✔
2146

4✔
2147
                // Get the DB representation of the channel-ID. There is a
4✔
2148
                // chance that the channel corresponding to this update has been
4✔
2149
                // closed and that the details for this channel no longer exist
4✔
2150
                // in the tower client DB. In that case, we consider this a
4✔
2151
                // rogue update and all we do is make sure to keep track of the
4✔
2152
                // number of rogue updates for this session.
4✔
2153
                _, dbChanIDBytes, err := getDBChanID(chanDetailsBkt, chanID)
4✔
2154
                if errors.Is(err, ErrChannelNotRegistered) {
4✔
2155
                        var (
×
2156
                                count uint64
×
2157
                                err   error
×
2158
                        )
×
2159

×
2160
                        rogueCountBytes := sessionBkt.Get(
×
2161
                                cSessionRogueUpdateCount,
×
2162
                        )
×
2163
                        if len(rogueCountBytes) != 0 {
×
2164
                                count, err = readBigSize(rogueCountBytes)
×
2165
                                if err != nil {
×
2166
                                        return err
×
2167
                                }
×
2168
                        }
2169

2170
                        rogueCount := count + 1
×
2171
                        countBytes, err := writeBigSize(rogueCount)
×
2172
                        if err != nil {
×
2173
                                return err
×
2174
                        }
×
2175

2176
                        err = sessionBkt.Put(
×
2177
                                cSessionRogueUpdateCount, countBytes,
×
2178
                        )
×
2179
                        if err != nil {
×
2180
                                return err
×
2181
                        }
×
2182

2183
                        // In the rare chance that this session only has rogue
2184
                        // updates, we check here if the count is equal to the
2185
                        // MaxUpdate of the session. If it is, then we mark the
2186
                        // session as closable.
2187
                        if rogueCount != uint64(session.Policy.MaxUpdates) {
×
2188
                                return nil
×
2189
                        }
×
2190

2191
                        // Before we mark the session as closable, we do a
2192
                        // sanity check to ensure that this session has no
2193
                        // acked-update index.
2194
                        sessionAckRanges := sessionBkt.NestedReadBucket(
×
2195
                                cSessionAckRangeIndex,
×
2196
                        )
×
2197
                        if sessionAckRanges != nil {
×
2198
                                return fmt.Errorf("session(%s) has an "+
×
2199
                                        "acked ranges index but has a rogue "+
×
2200
                                        "count indicating saturation",
×
2201
                                        session.ID)
×
2202
                        }
×
2203

2204
                        closableSessBkt := tx.ReadWriteBucket(
×
2205
                                cClosableSessionsBkt,
×
2206
                        )
×
2207
                        if closableSessBkt == nil {
×
2208
                                return ErrUninitializedDB
×
2209
                        }
×
2210

2211
                        var height [4]byte
×
2212
                        byteOrder.PutUint32(height[:], 0)
×
2213

×
2214
                        return closableSessBkt.Put(dbSessIDBytes, height[:])
×
2215
                } else if err != nil {
4✔
2216
                        return err
×
2217
                }
×
2218

2219
                // Get the ranges write bucket before getting the range index to
2220
                // ensure that the session acks sub-bucket is initialized, so
2221
                // that we can insert an entry.
2222
                rangesBkt, err := getRangesWriteBucket(
4✔
2223
                        sessionBkt, dbChanIDBytes,
4✔
2224
                )
4✔
2225
                if err != nil {
4✔
2226
                        return err
×
2227
                }
×
2228

2229
                chanDetails := chanDetailsBkt.NestedReadWriteBucket(
4✔
2230
                        committedUpdate.BackupID.ChanID[:],
4✔
2231
                )
4✔
2232
                if chanDetails == nil {
4✔
2233
                        return ErrChannelNotRegistered
×
2234
                }
×
2235

2236
                err = putChannelToSessionMapping(chanDetails, dbSessionID)
4✔
2237
                if err != nil {
4✔
2238
                        return err
×
2239
                }
×
2240

2241
                // Get the range index for the given session-channel pair.
2242
                index, err := c.getRangeIndex(tx, *id, chanID)
4✔
2243
                if err != nil {
4✔
2244
                        return err
×
2245
                }
×
2246

2247
                return index.Add(height, rangesBkt)
4✔
2248
        }, func() {})
4✔
2249
}
2250

2251
// GetDBQueue returns a BackupID Queue instance under the given namespace.
2252
func (c *ClientDB) GetDBQueue(namespace []byte) Queue[*BackupID] {
4✔
2253
        return NewQueueDB(
4✔
2254
                c.db, namespace, func() *BackupID {
8✔
2255
                        return &BackupID{}
4✔
2256
                }, func(tx kvdb.RwTx, item *BackupID) error {
4✔
2257
                        return maybeUpdateMaxCommitHeight(tx, *item)
×
2258
                },
×
2259
        )
2260
}
2261

2262
// TerminateSession sets the given session's status to CSessionTerminal meaning
2263
// that it will not be usable again. An error will be returned if the given
2264
// session still has un-acked updates that should be attended to.
2265
func (c *ClientDB) TerminateSession(id SessionID) error {
4✔
2266
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
2267
                sessions := tx.ReadWriteBucket(cSessionBkt)
4✔
2268
                if sessions == nil {
4✔
2269
                        return ErrUninitializedDB
×
2270
                }
×
2271

2272
                sessionsBkt := tx.ReadBucket(cSessionBkt)
4✔
2273
                if sessionsBkt == nil {
4✔
2274
                        return ErrUninitializedDB
×
2275
                }
×
2276

2277
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
4✔
2278
                if chanIDIndexBkt == nil {
4✔
2279
                        return ErrUninitializedDB
×
2280
                }
×
2281

2282
                // Collect any un-acked updates for this session.
2283
                committedUpdateCount := make(map[SessionID]uint16)
4✔
2284
                perCommittedUpdate := func(s *ClientSession,
4✔
2285
                        _ *CommittedUpdate) {
4✔
2286

×
2287
                        committedUpdateCount[s.ID]++
×
2288
                }
×
2289

2290
                session, err := c.getClientSession(
4✔
2291
                        sessionsBkt, chanIDIndexBkt, id[:],
4✔
2292
                        WithPerCommittedUpdate(perCommittedUpdate),
4✔
2293
                )
4✔
2294
                if err != nil {
4✔
2295
                        return err
×
2296
                }
×
2297

2298
                // If there are any un-acked updates for this session then
2299
                // we don't allow the change of status as these updates must
2300
                // first be dealt with somehow.
2301
                if committedUpdateCount[id] > 0 {
4✔
2302
                        return ErrSessionHasUnackedUpdates
×
2303
                }
×
2304

2305
                return markSessionStatus(sessions, session, CSessionTerminal)
4✔
2306
        }, func() {})
4✔
2307
}
2308

2309
// DeleteCommittedUpdates deletes all the committed updates for the given
2310
// session.
2311
func (c *ClientDB) DeleteCommittedUpdates(id *SessionID) error {
4✔
2312
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
2313
                sessions := tx.ReadWriteBucket(cSessionBkt)
4✔
2314
                if sessions == nil {
4✔
2315
                        return ErrUninitializedDB
×
2316
                }
×
2317

2318
                sessionBkt := sessions.NestedReadWriteBucket(id[:])
4✔
2319
                if sessionBkt == nil {
4✔
2320
                        return fmt.Errorf("session bucket %s not found",
×
2321
                                id.String())
×
2322
                }
×
2323

2324
                // If the commits sub-bucket doesn't exist, there can't possibly
2325
                // be corresponding updates to remove.
2326
                sessionCommits := sessionBkt.NestedReadWriteBucket(
4✔
2327
                        cSessionCommits,
4✔
2328
                )
4✔
2329
                if sessionCommits == nil {
8✔
2330
                        return nil
4✔
2331
                }
4✔
2332

2333
                // errFoundUpdates is an error we will use to exit early from
2334
                // the ForEach loop. The return of this error means that at
2335
                // least one committed update exists.
2336
                var errFoundUpdates = fmt.Errorf("found committed updates")
4✔
2337
                err := sessionCommits.ForEach(func(k, v []byte) error {
4✔
2338
                        return errFoundUpdates
×
2339
                })
×
2340
                switch {
4✔
2341
                // If the errFoundUpdates signal error was returned then there
2342
                // are some updates that need to be deleted.
2343
                case errors.Is(err, errFoundUpdates):
×
2344

2345
                // If no error is returned then the ForEach call back was never
2346
                // entered meaning that there are no un-acked committed updates.
2347
                // So we can exit now as there is nothing left to do.
2348
                case err == nil:
4✔
2349
                        return nil
4✔
2350

2351
                // If an expected error is returned, return that error.
2352
                default:
×
2353
                        return err
×
2354
                }
2355

2356
                session, err := getClientSessionBody(sessions, id[:])
×
2357
                if err != nil {
×
2358
                        return err
×
2359
                }
×
2360

2361
                // Once we delete a committed update from the session, the
2362
                // SeqNum of the session will be incorrect and so the session
2363
                // should be marked as terminal.
2364
                session.Status = CSessionTerminal
×
2365
                err = putClientSessionBody(sessionBkt, session)
×
2366
                if err != nil {
×
2367
                        return err
×
2368
                }
×
2369

2370
                // Delete all the committed updates in one go by deleting the
2371
                // session commits bucket.
2372
                return sessionBkt.DeleteNestedBucket(cSessionCommits)
×
2373
        }, func() {})
4✔
2374
}
2375

2376
// putChannelToSessionMapping adds the given session ID to a channel's
2377
// cChanSessions bucket.
2378
func putChannelToSessionMapping(chanDetails kvdb.RwBucket,
2379
        dbSessID uint64) error {
4✔
2380

4✔
2381
        chanSessIDsBkt, err := chanDetails.CreateBucketIfNotExists(
4✔
2382
                cChanSessions,
4✔
2383
        )
4✔
2384
        if err != nil {
4✔
2385
                return err
×
2386
        }
×
2387

2388
        b, err := writeBigSize(dbSessID)
4✔
2389
        if err != nil {
4✔
2390
                return err
×
2391
        }
×
2392

2393
        return chanSessIDsBkt.Put(b, []byte{1})
4✔
2394
}
2395

2396
// getClientSessionBody loads the body of a ClientSession from the sessions
2397
// bucket corresponding to the serialized session id. This does not deserialize
2398
// the CommittedUpdates, AckUpdates or the Tower associated with the session.
2399
// If the caller requires this info, use getClientSession.
2400
func getClientSessionBody(sessions kvdb.RBucket,
2401
        idBytes []byte) (*ClientSession, error) {
4✔
2402

4✔
2403
        sessionBkt := sessions.NestedReadBucket(idBytes)
4✔
2404
        if sessionBkt == nil {
4✔
2405
                return nil, ErrClientSessionNotFound
×
2406
        }
×
2407

2408
        // Should never have a sessionBkt without also having its body.
2409
        sessionBody := sessionBkt.Get(cSessionBody)
4✔
2410
        if sessionBody == nil {
4✔
2411
                return nil, ErrCorruptClientSession
×
2412
        }
×
2413

2414
        var session ClientSession
4✔
2415
        copy(session.ID[:], idBytes)
4✔
2416

4✔
2417
        err := session.Decode(bytes.NewReader(sessionBody))
4✔
2418
        if err != nil {
4✔
2419
                return nil, err
×
2420
        }
×
2421

2422
        return &session, nil
4✔
2423
}
2424

2425
// ClientSessionFilterFn describes the signature of a callback function that can
2426
// be used to filter the sessions that are returned in any of the DB methods
2427
// that read sessions from the DB.
2428
type ClientSessionFilterFn func(*ClientSession) bool
2429

2430
// ClientSessWithNumCommittedUpdatesFilterFn describes the signature of a
2431
// callback function that can be used to filter out a session based on the
2432
// contents of ClientSession along with the number of un-acked committed updates
2433
// that the session has.
2434
type ClientSessWithNumCommittedUpdatesFilterFn func(*ClientSession, uint16) bool
2435

2436
// PerMaxHeightCB describes the signature of a callback function that can be
2437
// called for each channel that a session has updates for to communicate the
2438
// maximum commitment height that the session has backed up for the channel.
2439
type PerMaxHeightCB func(*ClientSession, lnwire.ChannelID, uint64)
2440

2441
// PerNumAckedUpdatesCB describes the signature of a callback function that can
2442
// be called for each channel that a session has updates for to communicate the
2443
// number of updates that the session has for the channel.
2444
type PerNumAckedUpdatesCB func(*ClientSession, lnwire.ChannelID, uint16)
2445

2446
// PerRogueUpdateCountCB describes the signature of a callback function that can
2447
// be called for each session with the number of rogue updates that the session
2448
// has.
2449
type PerRogueUpdateCountCB func(*ClientSession, uint16)
2450

2451
// PerAckedUpdateCB describes the signature of a callback function that can be
2452
// called for each of a session's acked updates.
2453
type PerAckedUpdateCB func(*ClientSession, uint16, BackupID)
2454

2455
// PerCommittedUpdateCB describes the signature of a callback function that can
2456
// be called for each of a session's committed updates (updates that the client
2457
// has not yet received an ACK for).
2458
type PerCommittedUpdateCB func(*ClientSession, *CommittedUpdate)
2459

2460
// ClientSessionListOption describes the signature of a functional option that
2461
// can be used when listing client sessions in order to provide any extra
2462
// instruction to the query.
2463
type ClientSessionListOption func(cfg *ClientSessionListCfg)
2464

2465
// ClientSessionListCfg defines various query parameters that will be used when
// querying the DB for client sessions. All fields are optional call-backs; a
// nil field means that the corresponding step is skipped.
type ClientSessionListCfg struct {
	// PerNumAckedUpdates will, if set, be called for each of the session's
	// channels to communicate the number of acked updates stored for that
	// channel.
	PerNumAckedUpdates PerNumAckedUpdatesCB

	// PerRogueUpdateCount will, if set, be called with the number of rogue
	// updates that the session has backed up.
	PerRogueUpdateCount PerRogueUpdateCountCB

	// PerMaxHeight will, if set, be called for each of the session's
	// channels to communicate the highest commit height of updates stored
	// for that channel.
	PerMaxHeight PerMaxHeightCB

	// PerCommittedUpdate will, if set, be called for each of the session's
	// committed (un-acked) updates.
	PerCommittedUpdate PerCommittedUpdateCB

	// PreEvaluateFilterFn will be run after loading a session from the DB
	// and _before_ any of the other call-back functions in
	// ClientSessionListCfg. Therefore, if a session fails this filter
	// function, then it will not be passed to any of the other call backs
	// and won't be included in the return list.
	PreEvaluateFilterFn ClientSessionFilterFn

	// PostEvaluateFilterFn will be run _after_ all the other call-back
	// functions in ClientSessionListCfg. It additionally receives the
	// number of committed (un-acked) updates of the session. If a session
	// fails this filter function then all it means is that it won't be
	// included in the list of sessions to return.
	PostEvaluateFilterFn ClientSessWithNumCommittedUpdatesFilterFn
}
2499

2500
// NewClientSessionCfg constructs a new ClientSessionListCfg.
2501
func NewClientSessionCfg() *ClientSessionListCfg {
4✔
2502
        return &ClientSessionListCfg{}
4✔
2503
}
4✔
2504

2505
// WithPerMaxHeight constructs a functional option that will set a call-back
2506
// function to be called for each of a session's channels to communicate the
2507
// maximum commitment height that the session has stored for the channel.
2508
func WithPerMaxHeight(cb PerMaxHeightCB) ClientSessionListOption {
×
2509
        return func(cfg *ClientSessionListCfg) {
×
2510
                cfg.PerMaxHeight = cb
×
2511
        }
×
2512
}
2513

2514
// WithPerNumAckedUpdates constructs a functional option that will set a
2515
// call-back function to be called for each of a session's channels to
2516
// communicate the number of updates that the session has stored for the
2517
// channel.
2518
func WithPerNumAckedUpdates(cb PerNumAckedUpdatesCB) ClientSessionListOption {
4✔
2519
        return func(cfg *ClientSessionListCfg) {
8✔
2520
                cfg.PerNumAckedUpdates = cb
4✔
2521
        }
4✔
2522
}
2523

2524
// WithPerRogueUpdateCount constructs a functional option that will set a
2525
// call-back function to be called with the number of rogue updates that the
2526
// session has backed up.
2527
func WithPerRogueUpdateCount(cb PerRogueUpdateCountCB) ClientSessionListOption {
4✔
2528
        return func(cfg *ClientSessionListCfg) {
8✔
2529
                cfg.PerRogueUpdateCount = cb
4✔
2530
        }
4✔
2531
}
2532

2533
// WithPerCommittedUpdate constructs a functional option that will set a
2534
// call-back function to be called for each of a client's un-acked updates.
2535
func WithPerCommittedUpdate(cb PerCommittedUpdateCB) ClientSessionListOption {
4✔
2536
        return func(cfg *ClientSessionListCfg) {
8✔
2537
                cfg.PerCommittedUpdate = cb
4✔
2538
        }
4✔
2539
}
2540

2541
// WithPreEvalFilterFn constructs a functional option that will set a call-back
2542
// function that will be called immediately after loading a session. If the
2543
// session fails this filter function, then it will not be passed to any of the
2544
// other evaluation call-back functions.
2545
func WithPreEvalFilterFn(fn ClientSessionFilterFn) ClientSessionListOption {
4✔
2546
        return func(cfg *ClientSessionListCfg) {
8✔
2547
                cfg.PreEvaluateFilterFn = fn
4✔
2548
        }
4✔
2549
}
2550

2551
// WithPostEvalFilterFn constructs a functional option that will set a call-back
2552
// function that will be used to determine if a session should be included in
2553
// the returned list. This differs from WithPreEvalFilterFn since that call-back
2554
// is used to determine if the session should be evaluated at all (and thus
2555
// run against the other ClientSessionListCfg call-backs) whereas the session
2556
// will only reach the PostEvalFilterFn call-back once it has already been
2557
// evaluated by all the other call-backs.
2558
func WithPostEvalFilterFn(
2559
        fn ClientSessWithNumCommittedUpdatesFilterFn) ClientSessionListOption {
4✔
2560

4✔
2561
        return func(cfg *ClientSessionListCfg) {
8✔
2562
                cfg.PostEvaluateFilterFn = fn
4✔
2563
        }
4✔
2564
}
2565

2566
// getClientSession loads the full ClientSession associated with the serialized
2567
// session id. This method populates the CommittedUpdates, AckUpdates and Tower
2568
// in addition to the ClientSession's body.
2569
func (c *ClientDB) getClientSession(sessionsBkt, chanIDIndexBkt kvdb.RBucket,
2570
        idBytes []byte, opts ...ClientSessionListOption) (*ClientSession,
2571
        error) {
4✔
2572

4✔
2573
        cfg := NewClientSessionCfg()
4✔
2574
        for _, o := range opts {
8✔
2575
                o(cfg)
4✔
2576
        }
4✔
2577

2578
        session, err := getClientSessionBody(sessionsBkt, idBytes)
4✔
2579
        if err != nil {
4✔
2580
                return nil, err
×
2581
        }
×
2582

2583
        if cfg.PreEvaluateFilterFn != nil && !cfg.PreEvaluateFilterFn(session) {
8✔
2584
                return nil, ErrSessionFailedFilterFn
4✔
2585
        }
4✔
2586

2587
        // Can't fail because client session body has already been read.
2588
        sessionBkt := sessionsBkt.NestedReadBucket(idBytes)
4✔
2589

4✔
2590
        // Pass the session's committed (un-acked) updates through the call-back
4✔
2591
        // if one is provided.
4✔
2592
        numCommittedUpdates, err := filterClientSessionCommits(
4✔
2593
                sessionBkt, session, cfg.PerCommittedUpdate,
4✔
2594
        )
4✔
2595
        if err != nil {
4✔
2596
                return nil, err
×
2597
        }
×
2598

2599
        // Pass the session's acked updates through the call-back if one is
2600
        // provided.
2601
        err = c.filterClientSessionAcks(
4✔
2602
                sessionBkt, chanIDIndexBkt, session, cfg.PerMaxHeight,
4✔
2603
                cfg.PerNumAckedUpdates, cfg.PerRogueUpdateCount,
4✔
2604
        )
4✔
2605
        if err != nil {
4✔
2606
                return nil, err
×
2607
        }
×
2608

2609
        if cfg.PostEvaluateFilterFn != nil &&
4✔
2610
                !cfg.PostEvaluateFilterFn(session, numCommittedUpdates) {
4✔
2611

×
2612
                return nil, ErrSessionFailedFilterFn
×
2613
        }
×
2614

2615
        return session, nil
4✔
2616
}
2617

2618
// getClientSessionCommits retrieves all committed updates for the session
2619
// identified by the serialized session id. If a PerCommittedUpdateCB is
2620
// provided, then it will be called for each of the session's committed updates.
2621
func getClientSessionCommits(sessionBkt kvdb.RBucket, s *ClientSession,
2622
        cb PerCommittedUpdateCB) ([]CommittedUpdate, error) {
4✔
2623

4✔
2624
        // Initialize committedUpdates so that we can return an initialized map
4✔
2625
        // if no committed updates exist.
4✔
2626
        committedUpdates := make([]CommittedUpdate, 0)
4✔
2627

4✔
2628
        sessionCommits := sessionBkt.NestedReadBucket(cSessionCommits)
4✔
2629
        if sessionCommits == nil {
8✔
2630
                return committedUpdates, nil
4✔
2631
        }
4✔
2632

2633
        err := sessionCommits.ForEach(func(k, v []byte) error {
4✔
2634
                var committedUpdate CommittedUpdate
×
2635
                err := committedUpdate.Decode(bytes.NewReader(v))
×
2636
                if err != nil {
×
2637
                        return err
×
2638
                }
×
2639
                committedUpdate.SeqNum = byteOrder.Uint16(k)
×
2640

×
2641
                committedUpdates = append(committedUpdates, committedUpdate)
×
2642

×
2643
                if cb != nil {
×
2644
                        cb(s, &committedUpdate)
×
2645
                }
×
2646

2647
                return nil
×
2648
        })
2649
        if err != nil {
4✔
2650
                return nil, err
×
2651
        }
×
2652

2653
        return committedUpdates, nil
4✔
2654
}
2655

2656
// filterClientSessionAcks retrieves all acked updates for the session
2657
// identified by the serialized session id and passes them to the provided
2658
// call back if one is provided.
2659
func (c *ClientDB) filterClientSessionAcks(sessionBkt,
2660
        chanIDIndexBkt kvdb.RBucket, s *ClientSession, perMaxCb PerMaxHeightCB,
2661
        perNumAckedUpdates PerNumAckedUpdatesCB,
2662
        perRogueUpdateCount PerRogueUpdateCountCB) error {
4✔
2663

4✔
2664
        if perRogueUpdateCount != nil {
8✔
2665
                var (
4✔
2666
                        count uint64
4✔
2667
                        err   error
4✔
2668
                )
4✔
2669
                rogueCountBytes := sessionBkt.Get(cSessionRogueUpdateCount)
4✔
2670
                if len(rogueCountBytes) != 0 {
4✔
2671
                        count, err = readBigSize(rogueCountBytes)
×
2672
                        if err != nil {
×
2673
                                return err
×
2674
                        }
×
2675
                }
2676

2677
                perRogueUpdateCount(s, uint16(count))
4✔
2678
        }
2679

2680
        if perMaxCb == nil && perNumAckedUpdates == nil {
8✔
2681
                return nil
4✔
2682
        }
4✔
2683

2684
        sessionAcksRanges := sessionBkt.NestedReadBucket(cSessionAckRangeIndex)
4✔
2685
        if sessionAcksRanges == nil {
8✔
2686
                return nil
4✔
2687
        }
4✔
2688

2689
        return sessionAcksRanges.ForEach(func(dbChanID, _ []byte) error {
8✔
2690
                rangeBkt := sessionAcksRanges.NestedReadBucket(dbChanID)
4✔
2691
                if rangeBkt == nil {
4✔
2692
                        return nil
×
2693
                }
×
2694

2695
                index, err := readRangeIndex(rangeBkt)
4✔
2696
                if err != nil {
4✔
2697
                        return err
×
2698
                }
×
2699

2700
                chanIDBytes := chanIDIndexBkt.Get(dbChanID)
4✔
2701
                var chanID lnwire.ChannelID
4✔
2702
                copy(chanID[:], chanIDBytes)
4✔
2703

4✔
2704
                if perMaxCb != nil {
4✔
2705
                        perMaxCb(s, chanID, index.MaxHeight())
×
2706
                }
×
2707

2708
                if perNumAckedUpdates != nil {
8✔
2709
                        perNumAckedUpdates(s, chanID, uint16(index.NumInSet()))
4✔
2710
                }
4✔
2711
                return nil
4✔
2712
        })
2713
}
2714

2715
// filterClientSessionCommits retrieves all committed updates for the session
2716
// identified by the serialized session id and passes them to the given
2717
// PerCommittedUpdateCB callback.
2718
func filterClientSessionCommits(sessionBkt kvdb.RBucket, s *ClientSession,
2719
        cb PerCommittedUpdateCB) (uint16, error) {
4✔
2720

4✔
2721
        sessionCommits := sessionBkt.NestedReadBucket(cSessionCommits)
4✔
2722
        if sessionCommits == nil {
8✔
2723
                return 0, nil
4✔
2724
        }
4✔
2725

2726
        var numUpdates uint16
4✔
2727
        err := sessionCommits.ForEach(func(k, v []byte) error {
4✔
2728
                numUpdates++
×
2729

×
2730
                if cb == nil {
×
2731
                        return nil
×
2732
                }
×
2733

2734
                var committedUpdate CommittedUpdate
×
2735
                err := committedUpdate.Decode(bytes.NewReader(v))
×
2736
                if err != nil {
×
2737
                        return err
×
2738
                }
×
2739
                committedUpdate.SeqNum = byteOrder.Uint16(k)
×
2740

×
2741
                cb(s, &committedUpdate)
×
2742

×
2743
                return nil
×
2744
        })
2745
        if err != nil {
4✔
2746
                return 0, err
×
2747
        }
×
2748

2749
        return numUpdates, nil
4✔
2750
}
2751

2752
// putClientSessionBody stores the body of the ClientSession (everything but the
2753
// CommittedUpdates and AckedUpdates).
2754
func putClientSessionBody(sessionBkt kvdb.RwBucket,
2755
        session *ClientSession) error {
4✔
2756

4✔
2757
        var b bytes.Buffer
4✔
2758
        err := session.Encode(&b)
4✔
2759
        if err != nil {
4✔
2760
                return err
×
2761
        }
×
2762

2763
        return sessionBkt.Put(cSessionBody, b.Bytes())
4✔
2764
}
2765

2766
// markSessionStatus updates the persisted state of the session to the new
2767
// status.
2768
func markSessionStatus(sessions kvdb.RwBucket, session *ClientSession,
2769
        status CSessionStatus) error {
4✔
2770

4✔
2771
        sessionBkt, err := sessions.CreateBucketIfNotExists(session.ID[:])
4✔
2772
        if err != nil {
4✔
2773
                return err
×
2774
        }
×
2775

2776
        session.Status = status
4✔
2777

4✔
2778
        return putClientSessionBody(sessionBkt, session)
4✔
2779
}
2780

2781
// getChanSummary loads a ClientChanSummary for the passed chanID.
2782
func getChanSummary(chanDetails kvdb.RBucket) (*ClientChanSummary, error) {
4✔
2783
        chanSummaryBytes := chanDetails.Get(cChannelSummary)
4✔
2784
        if chanSummaryBytes == nil {
4✔
2785
                return nil, ErrChannelNotRegistered
×
2786
        }
×
2787

2788
        var summary ClientChanSummary
4✔
2789
        err := summary.Decode(bytes.NewReader(chanSummaryBytes))
4✔
2790
        if err != nil {
4✔
2791
                return nil, err
×
2792
        }
×
2793

2794
        return &summary, nil
4✔
2795
}
2796

2797
// putChanSummary stores a ClientChanSummary for the passed chanID.
2798
func putChanSummary(chanDetails kvdb.RwBucket,
2799
        summary *ClientChanSummary) error {
4✔
2800

4✔
2801
        var b bytes.Buffer
4✔
2802
        err := summary.Encode(&b)
4✔
2803
        if err != nil {
4✔
2804
                return err
×
2805
        }
×
2806

2807
        return chanDetails.Put(cChannelSummary, b.Bytes())
4✔
2808
}
2809

2810
// getTower loads a Tower identified by its serialized tower id.
2811
func getTower(towers kvdb.RBucket, id []byte) (*Tower, error) {
4✔
2812
        towerBytes := towers.Get(id)
4✔
2813
        if towerBytes == nil {
4✔
2814
                return nil, ErrTowerNotFound
×
2815
        }
×
2816

2817
        var tower Tower
4✔
2818
        err := tower.Decode(bytes.NewReader(towerBytes))
4✔
2819
        if err != nil {
4✔
2820
                return nil, err
×
2821
        }
×
2822

2823
        tower.ID = TowerIDFromBytes(id)
4✔
2824

4✔
2825
        return &tower, nil
4✔
2826
}
2827

2828
// putTower stores a Tower identified by its serialized tower id.
2829
func putTower(towers kvdb.RwBucket, tower *Tower) error {
4✔
2830
        var b bytes.Buffer
4✔
2831
        err := tower.Encode(&b)
4✔
2832
        if err != nil {
4✔
2833
                return err
×
2834
        }
×
2835

2836
        return towers.Put(tower.ID.Bytes(), b.Bytes())
4✔
2837
}
2838

2839
// getDBChanID returns the db-assigned channel ID for the given real channel ID.
2840
// It returns both the uint64 and byte representation.
2841
func getDBChanID(chanDetailsBkt kvdb.RBucket, chanID lnwire.ChannelID) (uint64,
2842
        []byte, error) {
4✔
2843

4✔
2844
        chanDetails := chanDetailsBkt.NestedReadBucket(chanID[:])
4✔
2845
        if chanDetails == nil {
4✔
2846
                return 0, nil, ErrChannelNotRegistered
×
2847
        }
×
2848

2849
        idBytes := chanDetails.Get(cChanDBID)
4✔
2850
        if len(idBytes) == 0 {
4✔
2851
                return 0, nil, fmt.Errorf("no db-assigned ID found for "+
×
2852
                        "channel ID %s", chanID)
×
2853
        }
×
2854

2855
        id, err := readBigSize(idBytes)
4✔
2856
        if err != nil {
4✔
2857
                return 0, nil, err
×
2858
        }
×
2859

2860
        return id, idBytes, nil
4✔
2861
}
2862

2863
// getDBSessionID returns the db-assigned session ID for the given real session
2864
// ID. It returns both the uint64 and byte representation.
2865
func getDBSessionID(sessionsBkt kvdb.RBucket, sessionID SessionID) (uint64,
2866
        []byte, error) {
4✔
2867

4✔
2868
        sessionBkt := sessionsBkt.NestedReadBucket(sessionID[:])
4✔
2869
        if sessionBkt == nil {
4✔
2870
                return 0, nil, ErrClientSessionNotFound
×
2871
        }
×
2872

2873
        idBytes := sessionBkt.Get(cSessionDBID)
4✔
2874
        if len(idBytes) == 0 {
4✔
2875
                return 0, nil, fmt.Errorf("no db-assigned ID found for "+
×
2876
                        "session ID %s", sessionID)
×
2877
        }
×
2878

2879
        id, err := readBigSize(idBytes)
4✔
2880
        if err != nil {
4✔
2881
                return 0, nil, err
×
2882
        }
×
2883

2884
        return id, idBytes, nil
4✔
2885
}
2886

2887
// maybeUpdateMaxCommitHeight updates the given channel details bucket with the
2888
// given height if it is larger than the current max height stored for the
2889
// channel.
2890
func maybeUpdateMaxCommitHeight(tx kvdb.RwTx, backupID BackupID) error {
4✔
2891
        chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
4✔
2892
        if chanDetailsBkt == nil {
4✔
2893
                return ErrUninitializedDB
×
2894
        }
×
2895

2896
        // If an entry for this channel does not exist in the channel details
2897
        // bucket then we exit here as this means that the channel has been
2898
        // closed.
2899
        chanDetails := chanDetailsBkt.NestedReadWriteBucket(backupID.ChanID[:])
4✔
2900
        if chanDetails == nil {
4✔
2901
                return nil
×
2902
        }
×
2903

2904
        putHeight := func() error {
8✔
2905
                b, err := writeBigSize(backupID.CommitHeight)
4✔
2906
                if err != nil {
4✔
2907
                        return err
×
2908
                }
×
2909

2910
                return chanDetails.Put(
4✔
2911
                        cChanMaxCommitmentHeight, b,
4✔
2912
                )
4✔
2913
        }
2914

2915
        // Get current height.
2916
        heightBytes := chanDetails.Get(cChanMaxCommitmentHeight)
4✔
2917

4✔
2918
        // The height might have not been set yet, in which case
4✔
2919
        // we can just write the new height.
4✔
2920
        if len(heightBytes) == 0 {
8✔
2921
                return putHeight()
4✔
2922
        }
4✔
2923

2924
        // Otherwise, read in the current max commitment height for the channel.
2925
        currentHeight, err := readBigSize(heightBytes)
4✔
2926
        if err != nil {
4✔
2927
                return err
×
2928
        }
×
2929

2930
        // If the new height is not larger than the current persisted height,
2931
        // then there is nothing left for us to do.
2932
        if backupID.CommitHeight <= currentHeight {
4✔
2933
                return nil
×
2934
        }
×
2935

2936
        return putHeight()
4✔
2937
}
2938

2939
func getRealSessionID(sessIDIndexBkt kvdb.RBucket, dbID uint64) (*SessionID,
2940
        error) {
4✔
2941

4✔
2942
        dbIDBytes, err := writeBigSize(dbID)
4✔
2943
        if err != nil {
4✔
2944
                return nil, err
×
2945
        }
×
2946

2947
        sessIDBytes := sessIDIndexBkt.Get(dbIDBytes)
4✔
2948
        if len(sessIDBytes) != SessionIDSize {
4✔
2949
                return nil, fmt.Errorf("session ID not found")
×
2950
        }
×
2951

2952
        var sessID SessionID
4✔
2953
        copy(sessID[:], sessIDBytes)
4✔
2954

4✔
2955
        return &sessID, nil
4✔
2956
}
2957

2958
func getRealChannelID(chanIDIndexBkt kvdb.RBucket,
2959
        dbID uint64) (*lnwire.ChannelID, error) {
4✔
2960

4✔
2961
        dbIDBytes, err := writeBigSize(dbID)
4✔
2962
        if err != nil {
4✔
2963
                return nil, err
×
2964
        }
×
2965

2966
        chanIDBytes := chanIDIndexBkt.Get(dbIDBytes)
4✔
2967
        if len(chanIDBytes) != 32 { //nolint:gomnd
4✔
2968
                return nil, fmt.Errorf("channel ID not found")
×
2969
        }
×
2970

2971
        var chanIDS lnwire.ChannelID
4✔
2972
        copy(chanIDS[:], chanIDBytes)
4✔
2973

4✔
2974
        return &chanIDS, nil
4✔
2975
}
2976

2977
// writeBigSize will encode the given uint64 as a BigSize byte slice.
2978
func writeBigSize(i uint64) ([]byte, error) {
4✔
2979
        var b bytes.Buffer
4✔
2980
        err := tlv.WriteVarInt(&b, i, &[8]byte{})
4✔
2981
        if err != nil {
4✔
2982
                return nil, err
×
2983
        }
×
2984

2985
        return b.Bytes(), nil
4✔
2986
}
2987

2988
// readBigSize converts the given byte slice into a uint64 and assumes that the
2989
// bytes slice is using BigSize encoding.
2990
func readBigSize(b []byte) (uint64, error) {
4✔
2991
        r := bytes.NewReader(b)
4✔
2992
        i, err := tlv.ReadVarInt(r, &[8]byte{})
4✔
2993
        if err != nil {
4✔
2994
                return 0, err
×
2995
        }
×
2996

2997
        return i, nil
4✔
2998
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc