• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12252204654

10 Dec 2024 08:24AM UTC coverage: 49.833% (+0.06%) from 49.773%
12252204654

Pull #9343

github

ellemouton
fn: rework the ContextGuard and add tests

In this commit, the ContextGuard struct is re-worked such that the
context that its new main WithCtx method provides is cancelled in sync
with a parent context being cancelled or with it's quit channel being
cancelled. Tests are added to assert the behaviour. In order for the
close of the quit channel to be consistent with the cancelling of the
derived context, the quit channel _must_ be contained internal to the
ContextGuard so that callers are only able to close the channel via the
exposed Quit method which will then take care to first cancel any
derived context that depend on the quit channel before returning.
Pull Request #9343: fn: expand the ContextGuard and add tests

100202 of 201077 relevant lines covered (49.83%)

2.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.59
/watchtower/wtdb/client_db.go
1
package wtdb
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "math"
8
        "net"
9
        "sync"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/lightningnetwork/lnd/fn/v2"
13
        "github.com/lightningnetwork/lnd/kvdb"
14
        "github.com/lightningnetwork/lnd/lnwire"
15
        "github.com/lightningnetwork/lnd/tlv"
16
        "github.com/lightningnetwork/lnd/watchtower/blob"
17
)
18

19
var (
20
        // cSessionKeyIndexBkt is a top-level bucket storing:
21
        //   tower-id -> reserved-session-key-index (uint32).
22
        cSessionKeyIndexBkt = []byte("client-session-key-index-bucket")
23

24
        // cChanDetailsBkt is a top-level bucket storing:
25
        //   channel-id => cChannelSummary -> encoded ClientChanSummary.
26
        //                  => cChanDBID -> db-assigned-id
27
        //                 => cChanSessions => db-session-id -> 1
28
        //                 => cChanClosedHeight -> block-height
29
        //                 => cChanMaxCommitmentHeight -> commitment-height
30
        cChanDetailsBkt = []byte("client-channel-detail-bucket")
31

32
        // cChanSessions is a sub-bucket of cChanDetailsBkt which stores:
33
        //    db-session-id -> 1
34
        cChanSessions = []byte("client-channel-sessions")
35

36
        // cChanDBID is a key used in the cChanDetailsBkt to store the
37
        // db-assigned-id of a channel.
38
        cChanDBID = []byte("client-channel-db-id")
39

40
        // cChanClosedHeight is a key used in the cChanDetailsBkt to store the
41
        // block height at which the channel's closing transaction was mined in.
42
        // If this there is no associated value for this key, then the channel
43
        // has not yet been marked as closed.
44
        cChanClosedHeight = []byte("client-channel-closed-height")
45

46
        // cChannelSummary is a key used in cChanDetailsBkt to store the encoded
47
        // body of ClientChanSummary.
48
        cChannelSummary = []byte("client-channel-summary")
49

50
        // cChanMaxCommitmentHeight is a key used in the cChanDetailsBkt used
51
        // to store the highest commitment height for this channel that the
52
        // tower has been handed.
53
        cChanMaxCommitmentHeight = []byte(
54
                "client-channel-max-commitment-height",
55
        )
56

57
        // cSessionBkt is a top-level bucket storing:
58
        //   session-id => cSessionBody -> encoded ClientSessionBody
59
        //                 => cSessionDBID -> db-assigned-id
60
        //              => cSessionCommits => seqnum -> encoded CommittedUpdate
61
        //              => cSessionAckRangeIndex => db-chan-id => start -> end
62
        //                 => cSessionRogueUpdateCount -> count
63
        cSessionBkt = []byte("client-session-bucket")
64

65
        // cSessionDBID is a key used in the cSessionBkt to store the
66
        // db-assigned-id of a session.
67
        cSessionDBID = []byte("client-session-db-id")
68

69
        // cSessionBody is a sub-bucket of cSessionBkt storing only the body of
70
        // the ClientSession.
71
        cSessionBody = []byte("client-session-body")
72

73
        // cSessionBody is a sub-bucket of cSessionBkt storing:
74
        //    seqnum -> encoded CommittedUpdate.
75
        cSessionCommits = []byte("client-session-commits")
76

77
        // cSessionAckRangeIndex is a sub-bucket of cSessionBkt storing
78
        //    chan-id => start -> end
79
        cSessionAckRangeIndex = []byte("client-session-ack-range-index")
80

81
        // cSessionRogueUpdateCount is a key in the cSessionBkt bucket storing
82
        // the number of rogue updates that were backed up using the session.
83
        // Rogue updates are updates for channels that have been closed already
84
        // at the time of the back-up.
85
        cSessionRogueUpdateCount = []byte("client-session-rogue-update-count")
86

87
        // cChanIDIndexBkt is a top-level bucket storing:
88
        //    db-assigned-id -> channel-ID
89
        cChanIDIndexBkt = []byte("client-channel-id-index")
90

91
        // cSessionIDIndexBkt is a top-level bucket storing:
92
        //    db-assigned-id -> session-id
93
        cSessionIDIndexBkt = []byte("client-session-id-index")
94

95
        // cTowerBkt is a top-level bucket storing:
96
        //    tower-id -> encoded Tower.
97
        cTowerBkt = []byte("client-tower-bucket")
98

99
        // cTowerIndexBkt is a top-level bucket storing:
100
        //    tower-pubkey -> tower-id.
101
        cTowerIndexBkt = []byte("client-tower-index-bucket")
102

103
        // cTowerToSessionIndexBkt is a top-level bucket storing:
104
        //         tower-id -> session-id -> 1
105
        cTowerToSessionIndexBkt = []byte(
106
                "client-tower-to-session-index-bucket",
107
        )
108

109
        // cClosableSessionsBkt is a top-level bucket storing:
110
        //         db-session-id -> last-channel-close-height
111
        cClosableSessionsBkt = []byte("client-closable-sessions-bucket")
112

113
        // cTaskQueue is a top-level bucket where the disk queue may store its
114
        // content.
115
        cTaskQueue = []byte("client-task-queue")
116

117
        // ErrTowerNotFound signals that the target tower was not found in the
118
        // database.
119
        ErrTowerNotFound = errors.New("tower not found")
120

121
        // ErrTowerUnackedUpdates is an error returned when we attempt to mark a
122
        // tower's sessions as inactive, but one of its sessions has unacked
123
        // updates.
124
        ErrTowerUnackedUpdates = errors.New("tower has unacked updates")
125

126
        // ErrCorruptClientSession signals that the client session's on-disk
127
        // structure deviates from what is expected.
128
        ErrCorruptClientSession = errors.New("client session corrupted")
129

130
        // ErrCorruptChanDetails signals that the clients channel detail's
131
        // on-disk structure deviates from what is expected.
132
        ErrCorruptChanDetails = errors.New("channel details corrupted")
133

134
        // ErrClientSessionAlreadyExists signals an attempt to reinsert a client
135
        // session that has already been created.
136
        ErrClientSessionAlreadyExists = errors.New(
137
                "client session already exists",
138
        )
139

140
        // ErrChannelAlreadyRegistered signals a duplicate attempt to register a
141
        // channel with the client database.
142
        ErrChannelAlreadyRegistered = errors.New("channel already registered")
143

144
        // ErrChannelNotRegistered signals a channel has not yet been registered
145
        // in the client database.
146
        ErrChannelNotRegistered = errors.New("channel not registered")
147

148
        // ErrClientSessionNotFound signals that the requested client session
149
        // was not found in the database.
150
        ErrClientSessionNotFound = errors.New("client session not found")
151

152
        // ErrUpdateAlreadyCommitted signals that the chosen sequence number has
153
        // already been committed to an update with a different breach hint.
154
        ErrUpdateAlreadyCommitted = errors.New("update already committed")
155

156
        // ErrCommitUnorderedUpdate signals the client tried to commit a
157
        // sequence number other than the next unallocated sequence number.
158
        ErrCommitUnorderedUpdate = errors.New("update seqnum not monotonic")
159

160
        // ErrCommittedUpdateNotFound signals that the tower tried to ACK a
161
        // sequence number that has not yet been allocated by the client.
162
        ErrCommittedUpdateNotFound = errors.New("committed update not found")
163

164
        // ErrUnallocatedLastApplied signals that the tower tried to provide a
165
        // LastApplied value greater than any allocated sequence number.
166
        ErrUnallocatedLastApplied = errors.New("tower echoed last appiled " +
167
                "greater than allocated seqnum")
168

169
        // ErrNoReservedKeyIndex signals that a client session could not be
170
        // created because no session key index was reserved.
171
        ErrNoReservedKeyIndex = errors.New("key index not reserved")
172

173
        // ErrIncorrectKeyIndex signals that the client session could not be
174
        // created because session key index differs from the reserved key
175
        // index.
176
        ErrIncorrectKeyIndex = errors.New("incorrect key index")
177

178
        // ErrLastTowerAddr is an error returned when the last address of a
179
        // watchtower is attempted to be removed.
180
        ErrLastTowerAddr = errors.New("cannot remove last tower address")
181

182
        // ErrNoRangeIndexFound is returned when there is no persisted
183
        // range-index found for the given session ID to channel ID pair.
184
        ErrNoRangeIndexFound = errors.New("no range index found for the " +
185
                "given session-channel pair")
186

187
        // ErrSessionFailedFilterFn indicates that a particular session did
188
        // not pass the filter func provided by the caller.
189
        ErrSessionFailedFilterFn = errors.New("session failed filter func")
190

191
        // ErrSessionNotClosable is returned when a session is not found in the
192
        // closable list.
193
        ErrSessionNotClosable = errors.New("session is not closable")
194

195
        // errSessionHasOpenChannels is an error used to indicate that a
196
        // session has updates for channels that are still open.
197
        errSessionHasOpenChannels = errors.New("session has open channels")
198

199
        // ErrSessionHasUnackedUpdates is an error used to indicate that a
200
        // session has un-acked updates.
201
        ErrSessionHasUnackedUpdates = errors.New("session has un-acked updates")
202

203
        // errChannelHasMoreSessions is an error used to indicate that a channel
204
        // has updates in other non-closed sessions.
205
        errChannelHasMoreSessions = errors.New("channel has updates in " +
206
                "other sessions")
207
)
208

209
// NewBoltBackendCreator returns a function that creates a new bbolt backend for
210
// the watchtower database.
211
func NewBoltBackendCreator(active bool, dbPath,
212
        dbFileName string) func(boltCfg *kvdb.BoltConfig) (kvdb.Backend,
213
        error) {
×
214

×
215
        // If the watchtower client isn't active, we return a function that
×
216
        // always returns a nil DB to make sure we don't create empty database
×
217
        // files.
×
218
        if !active {
×
219
                return func(_ *kvdb.BoltConfig) (kvdb.Backend, error) {
×
220
                        return nil, nil
×
221
                }
×
222
        }
223

224
        return func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {
×
225
                cfg := &kvdb.BoltBackendConfig{
×
226
                        DBPath:            dbPath,
×
227
                        DBFileName:        dbFileName,
×
228
                        NoFreelistSync:    boltCfg.NoFreelistSync,
×
229
                        AutoCompact:       boltCfg.AutoCompact,
×
230
                        AutoCompactMinAge: boltCfg.AutoCompactMinAge,
×
231
                        DBTimeout:         boltCfg.DBTimeout,
×
232
                }
×
233

×
234
                db, err := kvdb.GetBoltBackend(cfg)
×
235
                if err != nil {
×
236
                        return nil, fmt.Errorf("could not open boltdb: %w", err)
×
237
                }
×
238

239
                return db, nil
×
240
        }
241
}
242

243
// ClientDB is single database providing a persistent storage engine for the
244
// wtclient.
245
type ClientDB struct {
246
        db kvdb.Backend
247

248
        // ackedRangeIndex is a map from session ID to channel ID to a
249
        // RangeIndex which represents the backups that have been acked for that
250
        // channel using that session.
251
        ackedRangeIndex   map[SessionID]map[lnwire.ChannelID]*RangeIndex
252
        ackedRangeIndexMu sync.Mutex
253
}
254

255
// OpenClientDB opens the client database given the path to the database's
256
// directory. If no such database exists, this method will initialize a fresh
257
// one using the latest version number and bucket structure. If a database
258
// exists but has a lower version number than the current version, any necessary
259
// migrations will be applied before returning. Any attempt to open a database
260
// with a version number higher that the latest version will fail to prevent
261
// accidental reversion.
262
func OpenClientDB(db kvdb.Backend) (*ClientDB, error) {
4✔
263
        firstInit, err := isFirstInit(db)
4✔
264
        if err != nil {
4✔
265
                return nil, err
×
266
        }
×
267

268
        clientDB := &ClientDB{
4✔
269
                db: db,
4✔
270
                ackedRangeIndex: make(
4✔
271
                        map[SessionID]map[lnwire.ChannelID]*RangeIndex,
4✔
272
                ),
4✔
273
        }
4✔
274

4✔
275
        err = initOrSyncVersions(clientDB, firstInit, clientDBVersions)
4✔
276
        if err != nil {
4✔
277
                db.Close()
×
278
                return nil, err
×
279
        }
×
280

281
        // Now that the database version fully consistent with our latest known
282
        // version, ensure that all top-level buckets known to this version are
283
        // initialized. This allows us to assume their presence throughout all
284
        // operations. If an known top-level bucket is expected to exist but is
285
        // missing, this will trigger a ErrUninitializedDB error.
286
        err = kvdb.Update(clientDB.db, initClientDBBuckets, func() {})
8✔
287
        if err != nil {
4✔
288
                db.Close()
×
289
                return nil, err
×
290
        }
×
291

292
        return clientDB, nil
4✔
293
}
294

295
// initClientDBBuckets creates all top-level buckets required to handle database
296
// operations required by the latest version.
297
func initClientDBBuckets(tx kvdb.RwTx) error {
4✔
298
        buckets := [][]byte{
4✔
299
                cSessionKeyIndexBkt,
4✔
300
                cChanDetailsBkt,
4✔
301
                cSessionBkt,
4✔
302
                cTowerBkt,
4✔
303
                cTowerIndexBkt,
4✔
304
                cTowerToSessionIndexBkt,
4✔
305
                cChanIDIndexBkt,
4✔
306
                cSessionIDIndexBkt,
4✔
307
                cClosableSessionsBkt,
4✔
308
        }
4✔
309

4✔
310
        for _, bucket := range buckets {
8✔
311
                _, err := tx.CreateTopLevelBucket(bucket)
4✔
312
                if err != nil {
4✔
313
                        return err
×
314
                }
×
315
        }
316

317
        return nil
4✔
318
}
319

320
// bdb returns the backing bbolt.DB instance.
321
//
322
// NOTE: Part of the versionedDB interface.
323
func (c *ClientDB) bdb() kvdb.Backend {
4✔
324
        return c.db
4✔
325
}
4✔
326

327
// Version returns the database's current version number.
328
//
329
// NOTE: Part of the versionedDB interface.
330
func (c *ClientDB) Version() (uint32, error) {
4✔
331
        var version uint32
4✔
332
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
333
                var err error
4✔
334
                version, err = getDBVersion(tx)
4✔
335
                return err
4✔
336
        }, func() {
8✔
337
                version = 0
4✔
338
        })
4✔
339
        if err != nil {
4✔
340
                return 0, err
×
341
        }
×
342

343
        return version, nil
4✔
344
}
345

346
// Close closes the underlying database.
347
func (c *ClientDB) Close() error {
×
348
        return c.db.Close()
×
349
}
×
350

351
// CreateTower initialize an address record used to communicate with a
352
// watchtower. Each Tower is assigned a unique ID, that is used to amortize
353
// storage costs of the public key when used by multiple sessions. If the tower
354
// already exists, the address is appended to the list of all addresses used to
355
// that tower previously and its corresponding sessions are marked as active.
356
func (c *ClientDB) CreateTower(lnAddr *lnwire.NetAddress) (*Tower, error) {
4✔
357
        var towerPubKey [33]byte
4✔
358
        copy(towerPubKey[:], lnAddr.IdentityKey.SerializeCompressed())
4✔
359

4✔
360
        var tower *Tower
4✔
361
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
362
                towerIndex := tx.ReadWriteBucket(cTowerIndexBkt)
4✔
363
                if towerIndex == nil {
4✔
364
                        return ErrUninitializedDB
×
365
                }
×
366

367
                towers := tx.ReadWriteBucket(cTowerBkt)
4✔
368
                if towers == nil {
4✔
369
                        return ErrUninitializedDB
×
370
                }
×
371

372
                towerToSessionIndex := tx.ReadWriteBucket(
4✔
373
                        cTowerToSessionIndexBkt,
4✔
374
                )
4✔
375
                if towerToSessionIndex == nil {
4✔
376
                        return ErrUninitializedDB
×
377
                }
×
378

379
                // Check if the tower index already knows of this pubkey.
380
                towerIDBytes := towerIndex.Get(towerPubKey[:])
4✔
381
                if len(towerIDBytes) == 8 {
8✔
382
                        // The tower already exists, deserialize the existing
4✔
383
                        // record.
4✔
384
                        var err error
4✔
385
                        tower, err = getTower(towers, towerIDBytes)
4✔
386
                        if err != nil {
4✔
387
                                return err
×
388
                        }
×
389

390
                        // Set its status to active.
391
                        tower.Status = TowerStatusActive
4✔
392

4✔
393
                        // Add the new address to the existing tower. If the
4✔
394
                        // address is a duplicate, this will result in no
4✔
395
                        // change.
4✔
396
                        tower.AddAddress(lnAddr.Address)
4✔
397
                } else {
4✔
398
                        // No such tower exists, create a new tower id for our
4✔
399
                        // new tower. The error is unhandled since NextSequence
4✔
400
                        // never fails in an Update.
4✔
401
                        towerID, _ := towerIndex.NextSequence()
4✔
402

4✔
403
                        tower = &Tower{
4✔
404
                                ID:          TowerID(towerID),
4✔
405
                                IdentityKey: lnAddr.IdentityKey,
4✔
406
                                Addresses:   []net.Addr{lnAddr.Address},
4✔
407
                                Status:      TowerStatusActive,
4✔
408
                        }
4✔
409

4✔
410
                        towerIDBytes = tower.ID.Bytes()
4✔
411

4✔
412
                        // Since this tower is new, record the mapping from
4✔
413
                        // tower pubkey to tower id in the tower index.
4✔
414
                        err := towerIndex.Put(towerPubKey[:], towerIDBytes)
4✔
415
                        if err != nil {
4✔
416
                                return err
×
417
                        }
×
418

419
                        // Create a new bucket for this tower in the
420
                        // tower-to-sessions index.
421
                        _, err = towerToSessionIndex.CreateBucket(towerIDBytes)
4✔
422
                        if err != nil {
4✔
423
                                return err
×
424
                        }
×
425
                }
426

427
                // Store the new or updated tower under its tower id.
428
                return putTower(towers, tower)
4✔
429
        }, func() {
4✔
430
                tower = nil
4✔
431
        })
4✔
432
        if err != nil {
4✔
433
                return nil, err
×
434
        }
×
435

436
        return tower, nil
4✔
437
}
438

439
// RemoveTower modifies a tower's record within the database. If an address is
440
// provided, then _only_ the address record should be removed from the tower's
441
// persisted state. Otherwise, we'll attempt to mark the tower as inactive. If
442
// any of its sessions has unacked updates, then ErrTowerUnackedUpdates is
443
// returned. If the tower doesn't have any sessions at all, it'll be completely
444
// removed from the database.
445
//
446
// NOTE: An error is not returned if the tower doesn't exist.
447
func (c *ClientDB) RemoveTower(pubKey *btcec.PublicKey, addr net.Addr) error {
4✔
448
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
449
                towers := tx.ReadWriteBucket(cTowerBkt)
4✔
450
                if towers == nil {
4✔
451
                        return ErrUninitializedDB
×
452
                }
×
453

454
                towerIndex := tx.ReadWriteBucket(cTowerIndexBkt)
4✔
455
                if towerIndex == nil {
4✔
456
                        return ErrUninitializedDB
×
457
                }
×
458

459
                towersToSessionsIndex := tx.ReadWriteBucket(
4✔
460
                        cTowerToSessionIndexBkt,
4✔
461
                )
4✔
462
                if towersToSessionsIndex == nil {
4✔
463
                        return ErrUninitializedDB
×
464
                }
×
465

466
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
4✔
467
                if chanIDIndexBkt == nil {
4✔
468
                        return ErrUninitializedDB
×
469
                }
×
470

471
                // Don't return an error if the watchtower doesn't exist to act
472
                // as a NOP.
473
                pubKeyBytes := pubKey.SerializeCompressed()
4✔
474
                towerIDBytes := towerIndex.Get(pubKeyBytes)
4✔
475
                if towerIDBytes == nil {
4✔
476
                        return nil
×
477
                }
×
478

479
                tower, err := getTower(towers, towerIDBytes)
4✔
480
                if err != nil {
4✔
481
                        return err
×
482
                }
×
483

484
                // If an address is provided, then we should _only_ remove the
485
                // address record from the database.
486
                if addr != nil {
4✔
487
                        // Towers should always have at least one address saved.
×
488
                        tower.RemoveAddress(addr)
×
489
                        if len(tower.Addresses) == 0 {
×
490
                                return ErrLastTowerAddr
×
491
                        }
×
492

493
                        return putTower(towers, tower)
×
494
                }
495

496
                // Otherwise, we should attempt to mark the tower's sessions as
497
                // inactive.
498
                sessions := tx.ReadWriteBucket(cSessionBkt)
4✔
499
                if sessions == nil {
4✔
500
                        return ErrUninitializedDB
×
501
                }
×
502
                towerID := TowerIDFromBytes(towerIDBytes)
4✔
503

4✔
504
                committedUpdateCount := make(map[SessionID]uint16)
4✔
505
                perCommittedUpdate := func(s *ClientSession,
4✔
506
                        _ *CommittedUpdate) {
4✔
507

×
508
                        committedUpdateCount[s.ID]++
×
509
                }
×
510

511
                towerSessions, err := c.listTowerSessions(
4✔
512
                        towerID, sessions, chanIDIndexBkt,
4✔
513
                        towersToSessionsIndex,
4✔
514
                        WithPerCommittedUpdate(perCommittedUpdate),
4✔
515
                )
4✔
516
                if err != nil {
4✔
517
                        return err
×
518
                }
×
519

520
                // If it doesn't have any, we can completely remove it from the
521
                // database.
522
                if len(towerSessions) == 0 {
4✔
523
                        if err := towerIndex.Delete(pubKeyBytes); err != nil {
×
524
                                return err
×
525
                        }
×
526

527
                        if err := towers.Delete(towerIDBytes); err != nil {
×
528
                                return err
×
529
                        }
×
530

531
                        return towersToSessionsIndex.DeleteNestedBucket(
×
532
                                towerIDBytes,
×
533
                        )
×
534
                }
535

536
                // Otherwise, we mark the tower as inactive.
537
                tower.Status = TowerStatusInactive
4✔
538
                err = putTower(towers, tower)
4✔
539
                if err != nil {
4✔
540
                        return err
×
541
                }
×
542

543
                // We'll do a check to ensure that the tower's sessions don't
544
                // have any pending back-ups.
545
                for _, session := range towerSessions {
8✔
546
                        if committedUpdateCount[session.ID] > 0 {
4✔
547
                                return ErrTowerUnackedUpdates
×
548
                        }
×
549
                }
550

551
                return nil
4✔
552
        }, func() {})
4✔
553
}
554

555
// DeactivateTower sets the given tower's status to inactive. This means that
556
// this tower's sessions won't be loaded and used for backups. CreateTower can
557
// be used to reactivate the tower again.
558
func (c *ClientDB) DeactivateTower(pubKey *btcec.PublicKey) error {
4✔
559
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
560
                towers := tx.ReadWriteBucket(cTowerBkt)
4✔
561
                if towers == nil {
4✔
562
                        return ErrUninitializedDB
×
563
                }
×
564

565
                towerIndex := tx.ReadWriteBucket(cTowerIndexBkt)
4✔
566
                if towerIndex == nil {
4✔
567
                        return ErrUninitializedDB
×
568
                }
×
569

570
                towersToSessionsIndex := tx.ReadWriteBucket(
4✔
571
                        cTowerToSessionIndexBkt,
4✔
572
                )
4✔
573
                if towersToSessionsIndex == nil {
4✔
574
                        return ErrUninitializedDB
×
575
                }
×
576

577
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
4✔
578
                if chanIDIndexBkt == nil {
4✔
579
                        return ErrUninitializedDB
×
580
                }
×
581

582
                pubKeyBytes := pubKey.SerializeCompressed()
4✔
583
                towerIDBytes := towerIndex.Get(pubKeyBytes)
4✔
584
                if towerIDBytes == nil {
4✔
585
                        return ErrTowerNotFound
×
586
                }
×
587

588
                tower, err := getTower(towers, towerIDBytes)
4✔
589
                if err != nil {
4✔
590
                        return err
×
591
                }
×
592

593
                // If the tower already has the desired status, then we can exit
594
                // here.
595
                if tower.Status == TowerStatusInactive {
4✔
596
                        return nil
×
597
                }
×
598

599
                // Otherwise, we update the status and re-store the tower.
600
                tower.Status = TowerStatusInactive
4✔
601

4✔
602
                return putTower(towers, tower)
4✔
603
        }, func() {})
4✔
604
}
605

606
// LoadTowerByID retrieves a tower by its tower ID.
607
func (c *ClientDB) LoadTowerByID(towerID TowerID) (*Tower, error) {
4✔
608
        var tower *Tower
4✔
609
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
610
                towers := tx.ReadBucket(cTowerBkt)
4✔
611
                if towers == nil {
4✔
612
                        return ErrUninitializedDB
×
613
                }
×
614

615
                var err error
4✔
616
                tower, err = getTower(towers, towerID.Bytes())
4✔
617
                return err
4✔
618
        }, func() {
4✔
619
                tower = nil
4✔
620
        })
4✔
621
        if err != nil {
4✔
622
                return nil, err
×
623
        }
×
624

625
        return tower, nil
4✔
626
}
627

628
// LoadTower retrieves a tower by its public key.
629
func (c *ClientDB) LoadTower(pubKey *btcec.PublicKey) (*Tower, error) {
4✔
630
        var tower *Tower
4✔
631
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
632
                towers := tx.ReadBucket(cTowerBkt)
4✔
633
                if towers == nil {
4✔
634
                        return ErrUninitializedDB
×
635
                }
×
636
                towerIndex := tx.ReadBucket(cTowerIndexBkt)
4✔
637
                if towerIndex == nil {
4✔
638
                        return ErrUninitializedDB
×
639
                }
×
640

641
                towerIDBytes := towerIndex.Get(pubKey.SerializeCompressed())
4✔
642
                if towerIDBytes == nil {
4✔
643
                        return ErrTowerNotFound
×
644
                }
×
645

646
                var err error
4✔
647
                tower, err = getTower(towers, towerIDBytes)
4✔
648
                return err
4✔
649
        }, func() {
4✔
650
                tower = nil
4✔
651
        })
4✔
652
        if err != nil {
4✔
653
                return nil, err
×
654
        }
×
655

656
        return tower, nil
4✔
657
}
658

659
// TowerFilterFn is the signature of a call-back function that can be used to
660
// skip certain towers in the ListTowers method.
661
type TowerFilterFn func(*Tower) bool
662

663
// ListTowers retrieves the list of towers available within the database that
664
// have a status matching the given status. The filter function may be set in
665
// order to filter out the towers to be returned.
666
func (c *ClientDB) ListTowers(filter TowerFilterFn) ([]*Tower, error) {
4✔
667
        var towers []*Tower
4✔
668
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
669
                towerBucket := tx.ReadBucket(cTowerBkt)
4✔
670
                if towerBucket == nil {
4✔
671
                        return ErrUninitializedDB
×
672
                }
×
673

674
                return towerBucket.ForEach(func(towerIDBytes, _ []byte) error {
8✔
675
                        tower, err := getTower(towerBucket, towerIDBytes)
4✔
676
                        if err != nil {
4✔
677
                                return err
×
678
                        }
×
679

680
                        if filter != nil && !filter(tower) {
4✔
681
                                return nil
×
682
                        }
×
683

684
                        towers = append(towers, tower)
4✔
685

4✔
686
                        return nil
4✔
687
                })
688
        }, func() {
4✔
689
                towers = nil
4✔
690
        })
4✔
691
        if err != nil {
4✔
692
                return nil, err
×
693
        }
×
694

695
        return towers, nil
4✔
696
}
697

698
// NextSessionKeyIndex reserves a new session key derivation index for a
699
// particular tower id. The index is reserved for that tower until
700
// CreateClientSession is invoked for that tower and index, at which point a new
701
// index for that tower can be reserved. Multiple calls to this method before
702
// CreateClientSession is invoked should return the same index unless forceNext
703
// is true.
704
func (c *ClientDB) NextSessionKeyIndex(towerID TowerID,
705
        blobType blob.Type, forceNext bool) (uint32, error) {
4✔
706

4✔
707
        var index uint32
4✔
708
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
709
                keyIndex := tx.ReadWriteBucket(cSessionKeyIndexBkt)
4✔
710
                if keyIndex == nil {
4✔
711
                        return ErrUninitializedDB
×
712
                }
×
713

714
                var err error
4✔
715
                if !forceNext {
8✔
716
                        // Check the session key index to see if a key has
4✔
717
                        // already been reserved for this tower. If so, we'll
4✔
718
                        // deserialize and return the index directly.
4✔
719
                        index, err = getSessionKeyIndex(
4✔
720
                                keyIndex, towerID, blobType,
4✔
721
                        )
4✔
722
                        if err == nil {
4✔
723
                                return nil
×
724
                        }
×
725
                }
726

727
                // By default, we use the next available bucket sequence as the
728
                // key index. But if forceNext is true, then it is assumed that
729
                // some data loss occurred and so the sequence is incremented a
730
                // by a jump of 1000 so that we can arrive at a brand new key
731
                // index quicker.
732
                currentSequence := keyIndex.Sequence()
4✔
733
                nextIndex := currentSequence + 1
4✔
734
                if forceNext {
4✔
735
                        nextIndex = currentSequence + 1000
×
736
                }
×
737

738
                if err = keyIndex.SetSequence(nextIndex); err != nil {
4✔
739
                        return fmt.Errorf("could not set next bucket "+
×
740
                                "sequence: %w", err)
×
741
                }
×
742

743
                // As a sanity check, assert that the index is still in the
744
                // valid range of unhardened pubkeys. In the future, we should
745
                // move to only using hardened keys, and this will prevent any
746
                // overlap from occurring until then. This also prevents us from
747
                // overflowing uint32s.
748
                if nextIndex > math.MaxInt32 {
4✔
749
                        return fmt.Errorf("exhausted session key indexes")
×
750
                }
×
751

752
                // Create the key that will used to be store the reserved index.
753
                keyBytes := createSessionKeyIndexKey(towerID, blobType)
4✔
754

4✔
755
                index = uint32(nextIndex)
4✔
756

4✔
757
                var indexBuf [4]byte
4✔
758
                byteOrder.PutUint32(indexBuf[:], index)
4✔
759

4✔
760
                // Record the reserved session key index under this tower's id.
4✔
761
                return keyIndex.Put(keyBytes, indexBuf[:])
4✔
762
        }, func() {
4✔
763
                index = 0
4✔
764
        })
4✔
765
        if err != nil {
4✔
766
                return 0, err
×
767
        }
×
768

769
        return index, nil
4✔
770
}
771

772
// CreateClientSession records a newly negotiated client session in the set of
773
// active sessions. The session can be identified by its SessionID.
774
func (c *ClientDB) CreateClientSession(session *ClientSession) error {
4✔
775
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
776
                keyIndexes := tx.ReadWriteBucket(cSessionKeyIndexBkt)
4✔
777
                if keyIndexes == nil {
4✔
778
                        return ErrUninitializedDB
×
779
                }
×
780

781
                sessions := tx.ReadWriteBucket(cSessionBkt)
4✔
782
                if sessions == nil {
4✔
783
                        return ErrUninitializedDB
×
784
                }
×
785

786
                towers := tx.ReadBucket(cTowerBkt)
4✔
787
                if towers == nil {
4✔
788
                        return ErrUninitializedDB
×
789
                }
×
790

791
                towerToSessionIndex := tx.ReadWriteBucket(
4✔
792
                        cTowerToSessionIndexBkt,
4✔
793
                )
4✔
794
                if towerToSessionIndex == nil {
4✔
795
                        return ErrUninitializedDB
×
796
                }
×
797

798
                // Check that  client session with this session id doesn't
799
                // already exist.
800
                existingSessionBytes := sessions.NestedReadWriteBucket(
4✔
801
                        session.ID[:],
4✔
802
                )
4✔
803
                if existingSessionBytes != nil {
4✔
804
                        return ErrClientSessionAlreadyExists
×
805
                }
×
806

807
                // Ensure that a tower with the given ID actually exists in the
808
                // DB.
809
                towerID := session.TowerID
4✔
810
                if _, err := getTower(towers, towerID.Bytes()); err != nil {
4✔
811
                        return err
×
812
                }
×
813

814
                blobType := session.Policy.BlobType
4✔
815

4✔
816
                // Check that this tower has a reserved key index.
4✔
817
                index, err := getSessionKeyIndex(keyIndexes, towerID, blobType)
4✔
818
                if err != nil {
4✔
819
                        return err
×
820
                }
×
821

822
                // Assert that the key index of the inserted session matches the
823
                // reserved session key index.
824
                if index != session.KeyIndex {
4✔
825
                        return ErrIncorrectKeyIndex
×
826
                }
×
827

828
                // Remove the key index reservation. For altruist commit
829
                // sessions, we'll also purge under the old legacy key format.
830
                key := createSessionKeyIndexKey(towerID, blobType)
4✔
831
                err = keyIndexes.Delete(key)
4✔
832
                if err != nil {
4✔
833
                        return err
×
834
                }
×
835
                if blobType == blob.TypeAltruistCommit {
8✔
836
                        err = keyIndexes.Delete(towerID.Bytes())
4✔
837
                        if err != nil {
4✔
838
                                return err
×
839
                        }
×
840
                }
841

842
                // Get the session-ID index bucket.
843
                dbIDIndex := tx.ReadWriteBucket(cSessionIDIndexBkt)
4✔
844
                if dbIDIndex == nil {
4✔
845
                        return ErrUninitializedDB
×
846
                }
×
847

848
                // Get a new, unique, ID for this session from the session-ID
849
                // index bucket.
850
                nextSeq, err := dbIDIndex.NextSequence()
4✔
851
                if err != nil {
4✔
852
                        return err
×
853
                }
×
854

855
                // Add the new entry to the dbID-to-SessionID index.
856
                newIndex, err := writeBigSize(nextSeq)
4✔
857
                if err != nil {
4✔
858
                        return err
×
859
                }
×
860

861
                err = dbIDIndex.Put(newIndex, session.ID[:])
4✔
862
                if err != nil {
4✔
863
                        return err
×
864
                }
×
865

866
                // Also add the db-assigned-id to the session bucket under the
867
                // cSessionDBID key.
868
                sessionBkt, err := sessions.CreateBucket(session.ID[:])
4✔
869
                if err != nil {
4✔
870
                        return err
×
871
                }
×
872

873
                err = sessionBkt.Put(cSessionDBID, newIndex)
4✔
874
                if err != nil {
4✔
875
                        return err
×
876
                }
×
877

878
                // TODO(elle): migrate the towerID-to-SessionID to use the
879
                // new db-assigned sessionID's rather.
880

881
                // Add the new entry to the towerID-to-SessionID index.
882
                towerSessions := towerToSessionIndex.NestedReadWriteBucket(
4✔
883
                        towerID.Bytes(),
4✔
884
                )
4✔
885
                if towerSessions == nil {
4✔
886
                        return ErrTowerNotFound
×
887
                }
×
888

889
                err = towerSessions.Put(session.ID[:], []byte{1})
4✔
890
                if err != nil {
4✔
891
                        return err
×
892
                }
×
893

894
                // Finally, write the client session's body in the sessions
895
                // bucket.
896
                return putClientSessionBody(sessionBkt, session)
4✔
897
        }, func() {})
4✔
898
}
899

900
// readRangeIndex reads a persisted RangeIndex from the passed bucket and into
901
// a new in-memory RangeIndex.
902
func readRangeIndex(rangesBkt kvdb.RBucket) (*RangeIndex, error) {
4✔
903
        ranges := make(map[uint64]uint64)
4✔
904
        err := rangesBkt.ForEach(func(k, v []byte) error {
8✔
905
                start, err := readBigSize(k)
4✔
906
                if err != nil {
4✔
907
                        return err
×
908
                }
×
909

910
                end, err := readBigSize(v)
4✔
911
                if err != nil {
4✔
912
                        return err
×
913
                }
×
914

915
                ranges[start] = end
4✔
916

4✔
917
                return nil
4✔
918
        })
919
        if err != nil {
4✔
920
                return nil, err
×
921
        }
×
922

923
        return NewRangeIndex(ranges, WithSerializeUint64Fn(writeBigSize))
4✔
924
}
925

926
// getRangeIndex checks the ClientDB's in-memory range index map to see if it
927
// has an entry for the given session and channel ID. If it does, this is
928
// returned, otherwise the range index is loaded from the DB. An optional db
929
// transaction parameter may be provided. If one is provided then it will be
930
// used to query the DB for the range index, otherwise, a new transaction will
931
// be created and used.
932
func (c *ClientDB) getRangeIndex(tx kvdb.RTx, sID SessionID,
933
        chanID lnwire.ChannelID) (*RangeIndex, error) {
4✔
934

4✔
935
        c.ackedRangeIndexMu.Lock()
4✔
936
        defer c.ackedRangeIndexMu.Unlock()
4✔
937

4✔
938
        if _, ok := c.ackedRangeIndex[sID]; !ok {
8✔
939
                c.ackedRangeIndex[sID] = make(map[lnwire.ChannelID]*RangeIndex)
4✔
940
        }
4✔
941

942
        // If the in-memory range-index map already includes an entry for this
943
        // session ID and channel ID pair, then return it.
944
        if index, ok := c.ackedRangeIndex[sID][chanID]; ok {
8✔
945
                return index, nil
4✔
946
        }
4✔
947

948
        // readRangeIndexFromBkt is a helper that is used to read in a
949
        // RangeIndex structure from the passed in bucket and store it in the
950
        // ackedRangeIndex map.
951
        readRangeIndexFromBkt := func(rangesBkt kvdb.RBucket) (*RangeIndex,
4✔
952
                error) {
8✔
953

4✔
954
                // Create a new in-memory RangeIndex by reading in ranges from
4✔
955
                // the DB.
4✔
956
                rangeIndex, err := readRangeIndex(rangesBkt)
4✔
957
                if err != nil {
4✔
958
                        return nil, err
×
959
                }
×
960

961
                c.ackedRangeIndex[sID][chanID] = rangeIndex
4✔
962

4✔
963
                return rangeIndex, nil
4✔
964
        }
965

966
        // If a DB transaction is provided then use it to fetch the ranges
967
        // bucket from the DB.
968
        if tx != nil {
8✔
969
                rangesBkt, err := getRangesReadBucket(tx, sID, chanID)
4✔
970
                if err != nil {
4✔
971
                        return nil, err
×
972
                }
×
973

974
                return readRangeIndexFromBkt(rangesBkt)
4✔
975
        }
976

977
        // No DB transaction was provided. So create and use a new one.
978
        var index *RangeIndex
×
979
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
×
980
                rangesBkt, err := getRangesReadBucket(tx, sID, chanID)
×
981
                if err != nil {
×
982
                        return err
×
983
                }
×
984

985
                index, err = readRangeIndexFromBkt(rangesBkt)
×
986

×
987
                return err
×
988
        }, func() {})
×
989
        if err != nil {
×
990
                return nil, err
×
991
        }
×
992

993
        return index, nil
×
994
}
995

996
// getRangesReadBucket gets the range index bucket where the range index for the
997
// given session-channel pair is stored. If any sub-buckets along the way do not
998
// exist, then an error is returned. If the sub-buckets should be created
999
// instead, then use getRangesWriteBucket.
1000
func getRangesReadBucket(tx kvdb.RTx, sID SessionID, chanID lnwire.ChannelID) (
1001
        kvdb.RBucket, error) {
4✔
1002

4✔
1003
        sessions := tx.ReadBucket(cSessionBkt)
4✔
1004
        if sessions == nil {
4✔
1005
                return nil, ErrUninitializedDB
×
1006
        }
×
1007

1008
        chanDetailsBkt := tx.ReadBucket(cChanDetailsBkt)
4✔
1009
        if chanDetailsBkt == nil {
4✔
1010
                return nil, ErrUninitializedDB
×
1011
        }
×
1012

1013
        sessionBkt := sessions.NestedReadBucket(sID[:])
4✔
1014
        if sessionsBkt == nil {
4✔
1015
                return nil, ErrNoRangeIndexFound
×
1016
        }
×
1017

1018
        // Get the DB representation of the channel-ID.
1019
        _, dbChanIDBytes, err := getDBChanID(chanDetailsBkt, chanID)
4✔
1020
        if err != nil {
4✔
1021
                return nil, err
×
1022
        }
×
1023

1024
        sessionAckRanges := sessionBkt.NestedReadBucket(cSessionAckRangeIndex)
4✔
1025
        if sessionAckRanges == nil {
4✔
1026
                return nil, ErrNoRangeIndexFound
×
1027
        }
×
1028

1029
        return sessionAckRanges.NestedReadBucket(dbChanIDBytes), nil
4✔
1030
}
1031

1032
// getRangesWriteBucket gets the range index bucket where the range index for
1033
// the given session-channel pair is stored. If any sub-buckets along the way do
1034
// not exist, then they are created.
1035
func getRangesWriteBucket(sessionBkt kvdb.RwBucket, dbChanIDBytes []byte) (
1036
        kvdb.RwBucket, error) {
4✔
1037

4✔
1038
        sessionAckRanges, err := sessionBkt.CreateBucketIfNotExists(
4✔
1039
                cSessionAckRangeIndex,
4✔
1040
        )
4✔
1041
        if err != nil {
4✔
1042
                return nil, err
×
1043
        }
×
1044

1045
        return sessionAckRanges.CreateBucketIfNotExists(dbChanIDBytes)
4✔
1046
}
1047

1048
// createSessionKeyIndexKey returns the identifier used in the
1049
// session-key-index index, created as tower-id||blob-type.
1050
//
1051
// NOTE: The original serialization only used tower-id, which prevents
1052
// concurrent client types from reserving sessions with the same tower.
1053
func createSessionKeyIndexKey(towerID TowerID, blobType blob.Type) []byte {
4✔
1054
        towerIDBytes := towerID.Bytes()
4✔
1055

4✔
1056
        // Session key indexes are stored under as tower-id||blob-type.
4✔
1057
        var keyBytes [6]byte
4✔
1058
        copy(keyBytes[:4], towerIDBytes)
4✔
1059
        byteOrder.PutUint16(keyBytes[4:], uint16(blobType))
4✔
1060

4✔
1061
        return keyBytes[:]
4✔
1062
}
4✔
1063

1064
// getSessionKeyIndex is a helper method.
1065
func getSessionKeyIndex(keyIndexes kvdb.RwBucket, towerID TowerID,
1066
        blobType blob.Type) (uint32, error) {
4✔
1067

4✔
1068
        // Session key indexes are store under as tower-id||blob-type. The
4✔
1069
        // original serialization only used tower-id, which prevents concurrent
4✔
1070
        // client types from reserving sessions with the same tower.
4✔
1071
        keyBytes := createSessionKeyIndexKey(towerID, blobType)
4✔
1072

4✔
1073
        // Retrieve the index using the key bytes. If the key wasn't found, we
4✔
1074
        // will fall back to the legacy format that only uses the tower id, but
4✔
1075
        // _only_ if the blob type is for altruist commit sessions since that
4✔
1076
        // was the only operational session type prior to changing the key
4✔
1077
        // format.
4✔
1078
        keyIndexBytes := keyIndexes.Get(keyBytes)
4✔
1079
        if keyIndexBytes == nil && blobType == blob.TypeAltruistCommit {
8✔
1080
                keyIndexBytes = keyIndexes.Get(towerID.Bytes())
4✔
1081
        }
4✔
1082

1083
        // All session key indexes should be serialized uint32's. If no key
1084
        // index was found, the length of keyIndexBytes will be 0.
1085
        if len(keyIndexBytes) != 4 {
8✔
1086
                return 0, ErrNoReservedKeyIndex
4✔
1087
        }
4✔
1088

1089
        return byteOrder.Uint32(keyIndexBytes), nil
4✔
1090
}
1091

1092
// GetClientSession loads the ClientSession with the given ID from the DB.
1093
func (c *ClientDB) GetClientSession(id SessionID,
1094
        opts ...ClientSessionListOption) (*ClientSession, error) {
4✔
1095

4✔
1096
        var sess *ClientSession
4✔
1097
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
1098
                sessionsBkt := tx.ReadBucket(cSessionBkt)
4✔
1099
                if sessionsBkt == nil {
4✔
1100
                        return ErrUninitializedDB
×
1101
                }
×
1102

1103
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
4✔
1104
                if chanIDIndexBkt == nil {
4✔
1105
                        return ErrUninitializedDB
×
1106
                }
×
1107

1108
                session, err := c.getClientSession(
4✔
1109
                        sessionsBkt, chanIDIndexBkt, id[:], opts...,
4✔
1110
                )
4✔
1111
                if err != nil {
4✔
1112
                        return err
×
1113
                }
×
1114

1115
                sess = session
4✔
1116

4✔
1117
                return nil
4✔
1118
        }, func() {})
4✔
1119

1120
        return sess, err
4✔
1121
}
1122

1123
// ListClientSessions returns the set of all client sessions known to the db. An
1124
// optional tower ID can be used to filter out any client sessions in the
1125
// response that do not correspond to this tower.
1126
func (c *ClientDB) ListClientSessions(id *TowerID,
1127
        opts ...ClientSessionListOption) (map[SessionID]*ClientSession, error) {
4✔
1128

4✔
1129
        var clientSessions map[SessionID]*ClientSession
4✔
1130
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
1131
                sessions := tx.ReadBucket(cSessionBkt)
4✔
1132
                if sessions == nil {
4✔
1133
                        return ErrUninitializedDB
×
1134
                }
×
1135

1136
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
4✔
1137
                if chanIDIndexBkt == nil {
4✔
1138
                        return ErrUninitializedDB
×
1139
                }
×
1140

1141
                // If no tower ID is specified, then fetch all the sessions
1142
                // known to the db.
1143
                var err error
4✔
1144
                if id == nil {
4✔
1145
                        clientSessions, err = c.listClientAllSessions(
×
1146
                                sessions, chanIDIndexBkt, opts...,
×
1147
                        )
×
1148
                        return err
×
1149
                }
×
1150

1151
                // Otherwise, fetch the sessions for the given tower.
1152
                towerToSessionIndex := tx.ReadBucket(cTowerToSessionIndexBkt)
4✔
1153
                if towerToSessionIndex == nil {
4✔
1154
                        return ErrUninitializedDB
×
1155
                }
×
1156

1157
                clientSessions, err = c.listTowerSessions(
4✔
1158
                        *id, sessions, chanIDIndexBkt, towerToSessionIndex,
4✔
1159
                        opts...,
4✔
1160
                )
4✔
1161
                return err
4✔
1162
        }, func() {
4✔
1163
                clientSessions = nil
4✔
1164
        })
4✔
1165
        if err != nil {
4✔
1166
                return nil, err
×
1167
        }
×
1168

1169
        return clientSessions, nil
4✔
1170
}
1171

1172
// listClientAllSessions returns the set of all client sessions known to the db.
1173
func (c *ClientDB) listClientAllSessions(sessions, chanIDIndexBkt kvdb.RBucket,
1174
        opts ...ClientSessionListOption) (map[SessionID]*ClientSession, error) {
×
1175

×
1176
        clientSessions := make(map[SessionID]*ClientSession)
×
1177
        err := sessions.ForEach(func(k, _ []byte) error {
×
1178
                // We'll load the full client session since the client will need
×
1179
                // the CommittedUpdates and AckedUpdates on startup to resume
×
1180
                // committed updates and compute the highest known commit height
×
1181
                // for each channel.
×
1182
                session, err := c.getClientSession(
×
1183
                        sessions, chanIDIndexBkt, k, opts...,
×
1184
                )
×
1185
                if errors.Is(err, ErrSessionFailedFilterFn) {
×
1186
                        return nil
×
1187
                } else if err != nil {
×
1188
                        return err
×
1189
                }
×
1190

1191
                clientSessions[session.ID] = session
×
1192

×
1193
                return nil
×
1194
        })
1195
        if err != nil {
×
1196
                return nil, err
×
1197
        }
×
1198

1199
        return clientSessions, nil
×
1200
}
1201

1202
// listTowerSessions returns the set of all client sessions known to the db
1203
// that are associated with the given tower id.
1204
func (c *ClientDB) listTowerSessions(id TowerID, sessionsBkt, chanIDIndexBkt,
1205
        towerToSessionIndex kvdb.RBucket, opts ...ClientSessionListOption) (
1206
        map[SessionID]*ClientSession, error) {
4✔
1207

4✔
1208
        towerIndexBkt := towerToSessionIndex.NestedReadBucket(id.Bytes())
4✔
1209
        if towerIndexBkt == nil {
4✔
1210
                return nil, ErrTowerNotFound
×
1211
        }
×
1212

1213
        clientSessions := make(map[SessionID]*ClientSession)
4✔
1214
        err := towerIndexBkt.ForEach(func(k, _ []byte) error {
8✔
1215
                // We'll load the full client session since the client will need
4✔
1216
                // the CommittedUpdates and AckedUpdates on startup to resume
4✔
1217
                // committed updates and compute the highest known commit height
4✔
1218
                // for each channel.
4✔
1219
                session, err := c.getClientSession(
4✔
1220
                        sessionsBkt, chanIDIndexBkt, k, opts...,
4✔
1221
                )
4✔
1222
                if errors.Is(err, ErrSessionFailedFilterFn) {
8✔
1223
                        return nil
4✔
1224
                } else if err != nil {
8✔
1225
                        return err
×
1226
                }
×
1227

1228
                clientSessions[session.ID] = session
4✔
1229
                return nil
4✔
1230
        })
1231
        if err != nil {
4✔
1232
                return nil, err
×
1233
        }
×
1234

1235
        return clientSessions, nil
4✔
1236
}
1237

1238
// FetchSessionCommittedUpdates retrieves the current set of un-acked updates
1239
// of the given session.
1240
func (c *ClientDB) FetchSessionCommittedUpdates(id *SessionID) (
1241
        []CommittedUpdate, error) {
4✔
1242

4✔
1243
        var committedUpdates []CommittedUpdate
4✔
1244
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
1245
                sessions := tx.ReadBucket(cSessionBkt)
4✔
1246
                if sessions == nil {
4✔
1247
                        return ErrUninitializedDB
×
1248
                }
×
1249

1250
                sessionBkt := sessions.NestedReadBucket(id[:])
4✔
1251
                if sessionBkt == nil {
4✔
1252
                        return ErrClientSessionNotFound
×
1253
                }
×
1254

1255
                var err error
4✔
1256
                committedUpdates, err = getClientSessionCommits(
4✔
1257
                        sessionBkt, nil, nil,
4✔
1258
                )
4✔
1259
                return err
4✔
1260
        }, func() {})
4✔
1261
        if err != nil {
4✔
1262
                return nil, err
×
1263
        }
×
1264

1265
        return committedUpdates, nil
4✔
1266
}
1267

1268
// IsAcked returns true if the given backup has been backed up using the given
1269
// session.
1270
func (c *ClientDB) IsAcked(id *SessionID, backupID *BackupID) (bool, error) {
×
1271
        index, err := c.getRangeIndex(nil, *id, backupID.ChanID)
×
1272
        if errors.Is(err, ErrNoRangeIndexFound) {
×
1273
                return false, nil
×
1274
        } else if err != nil {
×
1275
                return false, err
×
1276
        }
×
1277

1278
        return index.IsInIndex(backupID.CommitHeight), nil
×
1279
}
1280

1281
// NumAckedUpdates returns the number of backups that have been successfully
1282
// backed up using the given session.
1283
func (c *ClientDB) NumAckedUpdates(id *SessionID) (uint64, error) {
×
1284
        var numAcked uint64
×
1285
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
×
1286
                sessions := tx.ReadBucket(cSessionBkt)
×
1287
                if sessions == nil {
×
1288
                        return ErrUninitializedDB
×
1289
                }
×
1290

1291
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
×
1292
                if chanIDIndexBkt == nil {
×
1293
                        return ErrUninitializedDB
×
1294
                }
×
1295

1296
                sessionBkt := sessions.NestedReadBucket(id[:])
×
1297
                if sessionBkt == nil {
×
1298
                        return nil
×
1299
                }
×
1300

1301
                // First, account for any rogue updates.
1302
                rogueCountBytes := sessionBkt.Get(cSessionRogueUpdateCount)
×
1303
                if len(rogueCountBytes) != 0 {
×
1304
                        rogueCount, err := readBigSize(rogueCountBytes)
×
1305
                        if err != nil {
×
1306
                                return err
×
1307
                        }
×
1308

1309
                        numAcked += rogueCount
×
1310
                }
1311

1312
                // Then, check if the session-ack-ranges contains any entries
1313
                // to account for.
1314
                sessionAckRanges := sessionBkt.NestedReadBucket(
×
1315
                        cSessionAckRangeIndex,
×
1316
                )
×
1317
                if sessionAckRanges == nil {
×
1318
                        return nil
×
1319
                }
×
1320

1321
                // Iterate over the channel ID's in the sessionAckRanges
1322
                // bucket.
1323
                return sessionAckRanges.ForEach(func(dbChanID, _ []byte) error {
×
1324
                        // Get the range index for the session-channel pair.
×
1325
                        chanIDBytes := chanIDIndexBkt.Get(dbChanID)
×
1326
                        var chanID lnwire.ChannelID
×
1327
                        copy(chanID[:], chanIDBytes)
×
1328

×
1329
                        index, err := c.getRangeIndex(tx, *id, chanID)
×
1330
                        if err != nil {
×
1331
                                return err
×
1332
                        }
×
1333

1334
                        numAcked += index.NumInSet()
×
1335

×
1336
                        return nil
×
1337
                })
1338
        }, func() {
×
1339
                numAcked = 0
×
1340
        })
×
1341
        if err != nil {
×
1342
                return 0, err
×
1343
        }
×
1344

1345
        return numAcked, nil
×
1346
}
1347

1348
// FetchChanInfos loads a mapping from all registered channels to their
1349
// ChannelInfo. Only the channels that have not yet been marked as closed will
1350
// be loaded.
1351
func (c *ClientDB) FetchChanInfos() (ChannelInfos, error) {
4✔
1352
        var infos ChannelInfos
4✔
1353

4✔
1354
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
1355
                chanDetailsBkt := tx.ReadBucket(cChanDetailsBkt)
4✔
1356
                if chanDetailsBkt == nil {
4✔
1357
                        return ErrUninitializedDB
×
1358
                }
×
1359

1360
                return chanDetailsBkt.ForEach(func(k, _ []byte) error {
8✔
1361
                        chanDetails := chanDetailsBkt.NestedReadBucket(k)
4✔
1362
                        if chanDetails == nil {
4✔
1363
                                return ErrCorruptChanDetails
×
1364
                        }
×
1365
                        // If this channel has already been marked as closed,
1366
                        // then its summary does not need to be loaded.
1367
                        closedHeight := chanDetails.Get(cChanClosedHeight)
4✔
1368
                        if len(closedHeight) > 0 {
4✔
1369
                                return nil
×
1370
                        }
×
1371
                        var chanID lnwire.ChannelID
4✔
1372
                        copy(chanID[:], k)
4✔
1373
                        summary, err := getChanSummary(chanDetails)
4✔
1374
                        if err != nil {
4✔
1375
                                return err
×
1376
                        }
×
1377

1378
                        info := &ChannelInfo{
4✔
1379
                                ClientChanSummary: *summary,
4✔
1380
                        }
4✔
1381

4✔
1382
                        maxHeightBytes := chanDetails.Get(
4✔
1383
                                cChanMaxCommitmentHeight,
4✔
1384
                        )
4✔
1385
                        if len(maxHeightBytes) != 0 {
8✔
1386
                                height, err := readBigSize(maxHeightBytes)
4✔
1387
                                if err != nil {
4✔
1388
                                        return err
×
1389
                                }
×
1390

1391
                                info.MaxHeight = fn.Some(height)
4✔
1392
                        }
1393

1394
                        infos[chanID] = info
4✔
1395

4✔
1396
                        return nil
4✔
1397
                })
1398
        }, func() {
4✔
1399
                infos = make(ChannelInfos)
4✔
1400
        })
4✔
1401
        if err != nil {
4✔
1402
                return nil, err
×
1403
        }
×
1404

1405
        return infos, nil
4✔
1406
}
1407

1408
// RegisterChannel registers a channel for use within the client database. For
1409
// now, all that is stored in the channel summary is the sweep pkscript that
1410
// we'd like any tower sweeps to pay into. In the future, this will be extended
1411
// to contain more info to allow the client efficiently request historical
1412
// states to be backed up under the client's active policy.
1413
func (c *ClientDB) RegisterChannel(chanID lnwire.ChannelID,
1414
        sweepPkScript []byte) error {
4✔
1415

4✔
1416
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
1417
                chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
4✔
1418
                if chanDetailsBkt == nil {
4✔
1419
                        return ErrUninitializedDB
×
1420
                }
×
1421

1422
                chanDetails := chanDetailsBkt.NestedReadWriteBucket(chanID[:])
4✔
1423
                if chanDetails != nil {
4✔
1424
                        // Channel is already registered.
×
1425
                        return ErrChannelAlreadyRegistered
×
1426
                }
×
1427

1428
                chanDetails, err := chanDetailsBkt.CreateBucket(chanID[:])
4✔
1429
                if err != nil {
4✔
1430
                        return err
×
1431
                }
×
1432

1433
                // Get the channel-id-index bucket.
1434
                indexBkt := tx.ReadWriteBucket(cChanIDIndexBkt)
4✔
1435
                if indexBkt == nil {
4✔
1436
                        return ErrUninitializedDB
×
1437
                }
×
1438

1439
                // Request the next unique id from the bucket.
1440
                nextSeq, err := indexBkt.NextSequence()
4✔
1441
                if err != nil {
4✔
1442
                        return err
×
1443
                }
×
1444

1445
                // Use BigSize encoding to encode the db-assigned index.
1446
                newIndex, err := writeBigSize(nextSeq)
4✔
1447
                if err != nil {
4✔
1448
                        return err
×
1449
                }
×
1450

1451
                // Add the new db-assigned ID to channel-ID pair.
1452
                err = indexBkt.Put(newIndex, chanID[:])
4✔
1453
                if err != nil {
4✔
1454
                        return err
×
1455
                }
×
1456

1457
                // Add the db-assigned ID to the channel's channel details
1458
                // bucket under the cChanDBID key.
1459
                err = chanDetails.Put(cChanDBID, newIndex)
4✔
1460
                if err != nil {
4✔
1461
                        return err
×
1462
                }
×
1463

1464
                summary := ClientChanSummary{
4✔
1465
                        SweepPkScript: sweepPkScript,
4✔
1466
                }
4✔
1467

4✔
1468
                return putChanSummary(chanDetails, &summary)
4✔
1469
        }, func() {})
4✔
1470
}
1471

1472
// MarkBackupIneligible records that the state identified by the (channel id,
1473
// commit height) tuple was ineligible for being backed up under the current
1474
// policy. This state can be retried later under a different policy.
1475
func (c *ClientDB) MarkBackupIneligible(chanID lnwire.ChannelID,
1476
        commitHeight uint64) error {
×
1477

×
1478
        return nil
×
1479
}
×
1480

1481
// ListClosableSessions fetches and returns the IDs for all sessions marked as
1482
// closable.
1483
func (c *ClientDB) ListClosableSessions() (map[SessionID]uint32, error) {
4✔
1484
        sessions := make(map[SessionID]uint32)
4✔
1485
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
1486
                csBkt := tx.ReadBucket(cClosableSessionsBkt)
4✔
1487
                if csBkt == nil {
4✔
1488
                        return ErrUninitializedDB
×
1489
                }
×
1490

1491
                sessIDIndexBkt := tx.ReadBucket(cSessionIDIndexBkt)
4✔
1492
                if sessIDIndexBkt == nil {
4✔
1493
                        return ErrUninitializedDB
×
1494
                }
×
1495

1496
                return csBkt.ForEach(func(dbIDBytes, heightBytes []byte) error {
4✔
1497
                        dbID, err := readBigSize(dbIDBytes)
×
1498
                        if err != nil {
×
1499
                                return err
×
1500
                        }
×
1501

1502
                        sessID, err := getRealSessionID(sessIDIndexBkt, dbID)
×
1503
                        if err != nil {
×
1504
                                return err
×
1505
                        }
×
1506

1507
                        sessions[*sessID] = byteOrder.Uint32(heightBytes)
×
1508

×
1509
                        return nil
×
1510
                })
1511
        }, func() {
4✔
1512
                sessions = make(map[SessionID]uint32)
4✔
1513
        })
4✔
1514
        if err != nil {
4✔
1515
                return nil, err
×
1516
        }
×
1517

1518
        return sessions, nil
4✔
1519
}
1520

1521
// DeleteSession can be called when a session should be deleted from the DB.
1522
// All references to the session will also be deleted from the DB. Note that a
1523
// session will only be deleted if was previously marked as closable.
1524
func (c *ClientDB) DeleteSession(id SessionID) error {
4✔
1525
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
1526
                sessionsBkt := tx.ReadWriteBucket(cSessionBkt)
4✔
1527
                if sessionsBkt == nil {
4✔
1528
                        return ErrUninitializedDB
×
1529
                }
×
1530

1531
                closableBkt := tx.ReadWriteBucket(cClosableSessionsBkt)
4✔
1532
                if closableBkt == nil {
4✔
1533
                        return ErrUninitializedDB
×
1534
                }
×
1535

1536
                chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
4✔
1537
                if chanDetailsBkt == nil {
4✔
1538
                        return ErrUninitializedDB
×
1539
                }
×
1540

1541
                sessIDIndexBkt := tx.ReadWriteBucket(cSessionIDIndexBkt)
4✔
1542
                if sessIDIndexBkt == nil {
4✔
1543
                        return ErrUninitializedDB
×
1544
                }
×
1545

1546
                chanIDIndexBkt := tx.ReadWriteBucket(cChanIDIndexBkt)
4✔
1547
                if chanIDIndexBkt == nil {
4✔
1548
                        return ErrUninitializedDB
×
1549
                }
×
1550

1551
                towerToSessBkt := tx.ReadWriteBucket(cTowerToSessionIndexBkt)
4✔
1552
                if towerToSessBkt == nil {
4✔
1553
                        return ErrUninitializedDB
×
1554
                }
×
1555

1556
                // Get the sub-bucket for this session ID. If it does not exist
1557
                // then the session has already been deleted and so our work is
1558
                // done.
1559
                sessionBkt := sessionsBkt.NestedReadBucket(id[:])
4✔
1560
                if sessionBkt == nil {
4✔
1561
                        return nil
×
1562
                }
×
1563

1564
                _, dbIDBytes, err := getDBSessionID(sessionsBkt, id)
4✔
1565
                if err != nil {
4✔
1566
                        return err
×
1567
                }
×
1568

1569
                // First we check if the session has actually been marked as
1570
                // closable.
1571
                if closableBkt.Get(dbIDBytes) == nil {
4✔
1572
                        return ErrSessionNotClosable
×
1573
                }
×
1574

1575
                sess, err := getClientSessionBody(sessionsBkt, id[:])
4✔
1576
                if err != nil {
4✔
1577
                        return err
×
1578
                }
×
1579

1580
                // Delete from the tower-to-sessionID index.
1581
                towerIndexBkt := towerToSessBkt.NestedReadWriteBucket(
4✔
1582
                        sess.TowerID.Bytes(),
4✔
1583
                )
4✔
1584
                if towerIndexBkt == nil {
4✔
1585
                        return fmt.Errorf("no entry in the tower-to-session "+
×
1586
                                "index found for tower ID %v", sess.TowerID)
×
1587
                }
×
1588

1589
                err = towerIndexBkt.Delete(id[:])
4✔
1590
                if err != nil {
4✔
1591
                        return err
×
1592
                }
×
1593

1594
                // Delete entry from session ID index.
1595
                err = sessIDIndexBkt.Delete(dbIDBytes)
4✔
1596
                if err != nil {
4✔
1597
                        return err
×
1598
                }
×
1599

1600
                // Delete the entry from the closable sessions index.
1601
                err = closableBkt.Delete(dbIDBytes)
4✔
1602
                if err != nil {
4✔
1603
                        return err
×
1604
                }
×
1605

1606
                ackRanges := sessionBkt.NestedReadBucket(cSessionAckRangeIndex)
4✔
1607

4✔
1608
                // There is a small chance that the session only contains rogue
4✔
1609
                // updates. In that case, there will be no ack-ranges index but
4✔
1610
                // the rogue update count will be equal the MaxUpdates.
4✔
1611
                rogueCountBytes := sessionBkt.Get(cSessionRogueUpdateCount)
4✔
1612
                if len(rogueCountBytes) != 0 {
4✔
1613
                        rogueCount, err := readBigSize(rogueCountBytes)
×
1614
                        if err != nil {
×
1615
                                return err
×
1616
                        }
×
1617

1618
                        maxUpdates := sess.ClientSessionBody.Policy.MaxUpdates
×
1619
                        if rogueCount == uint64(maxUpdates) {
×
1620
                                // Do a sanity check to ensure that the acked
×
1621
                                // ranges bucket does not exist in this case.
×
1622
                                if ackRanges != nil {
×
1623
                                        return fmt.Errorf("acked updates "+
×
1624
                                                "exist for session with a "+
×
1625
                                                "max-updates(%d) rogue count",
×
1626
                                                rogueCount)
×
1627
                                }
×
1628

1629
                                return sessionsBkt.DeleteNestedBucket(id[:])
×
1630
                        }
1631
                }
1632

1633
                // A session would only be considered closable if it was
1634
                // exhausted. Meaning that it should not be the case that it has
1635
                // no acked-updates.
1636
                if ackRanges == nil {
4✔
1637
                        return fmt.Errorf("cannot delete session %s since it "+
×
1638
                                "is not yet exhausted", id)
×
1639
                }
×
1640

1641
                // For each of the channels, delete the session ID entry.
1642
                err = ackRanges.ForEach(func(chanDBID, _ []byte) error {
8✔
1643
                        chanDBIDInt, err := readBigSize(chanDBID)
4✔
1644
                        if err != nil {
4✔
1645
                                return err
×
1646
                        }
×
1647

1648
                        chanID, err := getRealChannelID(
4✔
1649
                                chanIDIndexBkt, chanDBIDInt,
4✔
1650
                        )
4✔
1651
                        if err != nil {
4✔
1652
                                return err
×
1653
                        }
×
1654

1655
                        chanDetails := chanDetailsBkt.NestedReadWriteBucket(
4✔
1656
                                chanID[:],
4✔
1657
                        )
4✔
1658
                        if chanDetails == nil {
4✔
1659
                                return ErrChannelNotRegistered
×
1660
                        }
×
1661

1662
                        chanSessions := chanDetails.NestedReadWriteBucket(
4✔
1663
                                cChanSessions,
4✔
1664
                        )
4✔
1665
                        if chanSessions == nil {
4✔
1666
                                return fmt.Errorf("no session list found for "+
×
1667
                                        "channel %s", chanID)
×
1668
                        }
×
1669

1670
                        // Check that this session was actually listed in the
1671
                        // session list for this channel.
1672
                        if len(chanSessions.Get(dbIDBytes)) == 0 {
4✔
1673
                                return fmt.Errorf("session %s not found in "+
×
1674
                                        "the session list for channel %s", id,
×
1675
                                        chanID)
×
1676
                        }
×
1677

1678
                        // If it was, then delete it.
1679
                        err = chanSessions.Delete(dbIDBytes)
4✔
1680
                        if err != nil {
4✔
1681
                                return err
×
1682
                        }
×
1683

1684
                        // If this was the last session for this channel, we can
1685
                        // now delete the channel details for this channel
1686
                        // completely.
1687
                        err = chanSessions.ForEach(func(_, _ []byte) error {
8✔
1688
                                return errChannelHasMoreSessions
4✔
1689
                        })
4✔
1690
                        if errors.Is(err, errChannelHasMoreSessions) {
8✔
1691
                                return nil
4✔
1692
                        } else if err != nil {
8✔
1693
                                return err
×
1694
                        }
×
1695

1696
                        // Delete the channel's entry from the channel-id-index.
1697
                        dbID := chanDetails.Get(cChanDBID)
4✔
1698
                        err = chanIDIndexBkt.Delete(dbID)
4✔
1699
                        if err != nil {
4✔
1700
                                return err
×
1701
                        }
×
1702

1703
                        // Delete the channel details.
1704
                        return chanDetailsBkt.DeleteNestedBucket(chanID[:])
4✔
1705
                })
1706
                if err != nil {
4✔
1707
                        return err
×
1708
                }
×
1709

1710
                // Delete the actual session.
1711
                return sessionsBkt.DeleteNestedBucket(id[:])
4✔
1712
        }, func() {})
4✔
1713
}
1714

1715
// MarkChannelClosed will mark a registered channel as closed by setting its
1716
// closed-height as the given block height. It returns a list of session IDs for
1717
// sessions that are now considered closable due to the close of this channel.
1718
// The details for this channel will be deleted from the DB if there are no more
1719
// sessions in the DB that contain updates for this channel.
1720
func (c *ClientDB) MarkChannelClosed(chanID lnwire.ChannelID,
1721
        blockHeight uint32) ([]SessionID, error) {
4✔
1722

4✔
1723
        var closableSessions []SessionID
4✔
1724
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
1725
                sessionsBkt := tx.ReadBucket(cSessionBkt)
4✔
1726
                if sessionsBkt == nil {
4✔
1727
                        return ErrUninitializedDB
×
1728
                }
×
1729

1730
                chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
4✔
1731
                if chanDetailsBkt == nil {
4✔
1732
                        return ErrUninitializedDB
×
1733
                }
×
1734

1735
                closableSessBkt := tx.ReadWriteBucket(cClosableSessionsBkt)
4✔
1736
                if closableSessBkt == nil {
4✔
1737
                        return ErrUninitializedDB
×
1738
                }
×
1739

1740
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
4✔
1741
                if chanIDIndexBkt == nil {
4✔
1742
                        return ErrUninitializedDB
×
1743
                }
×
1744

1745
                sessIDIndexBkt := tx.ReadBucket(cSessionIDIndexBkt)
4✔
1746
                if sessIDIndexBkt == nil {
4✔
1747
                        return ErrUninitializedDB
×
1748
                }
×
1749

1750
                chanDetails := chanDetailsBkt.NestedReadWriteBucket(chanID[:])
4✔
1751
                if chanDetails == nil {
4✔
1752
                        return ErrChannelNotRegistered
×
1753
                }
×
1754

1755
                // If there are no sessions for this channel, the channel
1756
                // details can be deleted.
1757
                chanSessIDsBkt := chanDetails.NestedReadBucket(cChanSessions)
4✔
1758
                if chanSessIDsBkt == nil {
4✔
1759
                        return chanDetailsBkt.DeleteNestedBucket(chanID[:])
×
1760
                }
×
1761

1762
                // Otherwise, mark the channel as closed.
1763
                var height [4]byte
4✔
1764
                byteOrder.PutUint32(height[:], blockHeight)
4✔
1765

4✔
1766
                err := chanDetails.Put(cChanClosedHeight, height[:])
4✔
1767
                if err != nil {
4✔
1768
                        return err
×
1769
                }
×
1770

1771
                // Now iterate through all the sessions of the channel to check
1772
                // if any of them are closeable.
1773
                return chanSessIDsBkt.ForEach(func(sessDBID, _ []byte) error {
8✔
1774
                        sessDBIDInt, err := readBigSize(sessDBID)
4✔
1775
                        if err != nil {
4✔
1776
                                return err
×
1777
                        }
×
1778

1779
                        // Use the session-ID index to get the real session ID.
1780
                        sID, err := getRealSessionID(
4✔
1781
                                sessIDIndexBkt, sessDBIDInt,
4✔
1782
                        )
4✔
1783
                        if err != nil {
4✔
1784
                                return err
×
1785
                        }
×
1786

1787
                        isClosable, err := isSessionClosable(
4✔
1788
                                sessionsBkt, chanDetailsBkt, chanIDIndexBkt,
4✔
1789
                                sID,
4✔
1790
                        )
4✔
1791
                        if err != nil {
4✔
1792
                                return err
×
1793
                        }
×
1794

1795
                        if !isClosable {
8✔
1796
                                return nil
4✔
1797
                        }
4✔
1798

1799
                        // Add session to "closableSessions" list and add the
1800
                        // block height that this last channel was closed in.
1801
                        // This will be used in future to determine when we
1802
                        // should delete the session.
1803
                        var height [4]byte
4✔
1804
                        byteOrder.PutUint32(height[:], blockHeight)
4✔
1805
                        err = closableSessBkt.Put(sessDBID, height[:])
4✔
1806
                        if err != nil {
4✔
1807
                                return err
×
1808
                        }
×
1809

1810
                        closableSessions = append(closableSessions, *sID)
4✔
1811

4✔
1812
                        return nil
4✔
1813
                })
1814
        }, func() {
4✔
1815
                closableSessions = nil
4✔
1816
        })
4✔
1817
        if err != nil {
4✔
1818
                return nil, err
×
1819
        }
×
1820

1821
        return closableSessions, nil
4✔
1822
}
1823

1824
// isSessionClosable returns true if a session is considered closable. A session
1825
// is considered closable only if all the following points are true:
1826
//  1. It has no un-acked updates.
1827
//  2. It is exhausted (ie it can't accept any more updates) OR it has been
1828
//     marked as terminal.
1829
//  3. All the channels that it has acked updates for are closed.
1830
func isSessionClosable(sessionsBkt, chanDetailsBkt, chanIDIndexBkt kvdb.RBucket,
1831
        id *SessionID) (bool, error) {
4✔
1832

4✔
1833
        sessBkt := sessionsBkt.NestedReadBucket(id[:])
4✔
1834
        if sessBkt == nil {
4✔
1835
                return false, ErrSessionNotFound
×
1836
        }
×
1837

1838
        // Since the DeleteCommittedUpdates method deletes the cSessionCommits
1839
        // bucket in one go, it is possible for the session to be closable even
1840
        // if this bucket no longer exists.
1841
        commitsBkt := sessBkt.NestedReadBucket(cSessionCommits)
4✔
1842
        if commitsBkt != nil {
8✔
1843
                // If the session has any un-acked updates, then it is not yet
4✔
1844
                // closable.
4✔
1845
                err := commitsBkt.ForEach(func(_, _ []byte) error {
4✔
1846
                        return ErrSessionHasUnackedUpdates
×
1847
                })
×
1848
                if errors.Is(err, ErrSessionHasUnackedUpdates) {
4✔
1849
                        return false, nil
×
1850
                } else if err != nil {
4✔
1851
                        return false, err
×
1852
                }
×
1853
        }
1854

1855
        session, err := getClientSessionBody(sessionsBkt, id[:])
4✔
1856
        if err != nil {
4✔
1857
                return false, err
×
1858
        }
×
1859

1860
        isTerminal := session.Status == CSessionTerminal
4✔
1861

4✔
1862
        // We have already checked that the session has no more committed
4✔
1863
        // updates. So now we can check if the session is exhausted or has a
4✔
1864
        // terminal state.
4✔
1865
        if !isTerminal && session.SeqNum < session.Policy.MaxUpdates {
8✔
1866
                // If the session is not yet exhausted, and it is not yet in a
4✔
1867
                // terminal state then it is not yet closable.
4✔
1868
                return false, nil
4✔
1869
        }
4✔
1870

1871
        // Either the acked-update bucket should exist _or_ the rogue update
1872
        // count must be equal to the session's MaxUpdates value, otherwise
1873
        // something is wrong because the above check ensures that the session
1874
        // has been exhausted.
1875
        rogueCountBytes := sessBkt.Get(cSessionRogueUpdateCount)
4✔
1876
        if len(rogueCountBytes) != 0 {
4✔
1877
                rogueCount, err := readBigSize(rogueCountBytes)
×
1878
                if err != nil {
×
1879
                        return false, err
×
1880
                }
×
1881

1882
                if rogueCount == uint64(session.Policy.MaxUpdates) {
×
1883
                        return true, nil
×
1884
                }
×
1885
        }
1886

1887
        ackedRangeBkt := sessBkt.NestedReadBucket(cSessionAckRangeIndex)
4✔
1888
        if ackedRangeBkt == nil {
4✔
1889
                if isTerminal {
×
1890
                        return true, nil
×
1891
                }
×
1892

1893
                // If the session has no acked-updates, and it is not in a
1894
                // terminal state then something is wrong since the above check
1895
                // ensures that this session has been exhausted meaning that it
1896
                // should have MaxUpdates acked updates.
1897
                return false, fmt.Errorf("no acked-updates found for "+
×
1898
                        "exhausted session %s", id)
×
1899
        }
1900

1901
        // Iterate over each of the channels that the session has acked-updates
1902
        // for. If any of those channels are not closed, then the session is
1903
        // not yet closable.
1904
        err = ackedRangeBkt.ForEach(func(dbChanID, _ []byte) error {
8✔
1905
                dbChanIDInt, err := readBigSize(dbChanID)
4✔
1906
                if err != nil {
4✔
1907
                        return err
×
1908
                }
×
1909

1910
                chanID, err := getRealChannelID(chanIDIndexBkt, dbChanIDInt)
4✔
1911
                if err != nil {
4✔
1912
                        return err
×
1913
                }
×
1914

1915
                // Get the channel details bucket for the channel.
1916
                chanDetails := chanDetailsBkt.NestedReadBucket(chanID[:])
4✔
1917
                if chanDetails == nil {
4✔
1918
                        return fmt.Errorf("no channel details found for "+
×
1919
                                "channel %s referenced by session %s", chanID,
×
1920
                                id)
×
1921
                }
×
1922

1923
                // If a closed height has been set, then the channel is closed.
1924
                closedHeight := chanDetails.Get(cChanClosedHeight)
4✔
1925
                if len(closedHeight) > 0 {
8✔
1926
                        return nil
4✔
1927
                }
4✔
1928

1929
                // Otherwise, the channel is not yet closed meaning that the
1930
                // session is not yet closable. We break the ForEach by
1931
                // returning an error to indicate this.
1932
                return errSessionHasOpenChannels
×
1933
        })
1934
        if errors.Is(err, errSessionHasOpenChannels) {
4✔
1935
                return false, nil
×
1936
        } else if err != nil {
4✔
1937
                return false, err
×
1938
        }
×
1939

1940
        return true, nil
4✔
1941
}
1942

1943
// CommitUpdate persists the CommittedUpdate provided in the slot for (session,
1944
// seqNum). This allows the client to retransmit this update on startup.
1945
func (c *ClientDB) CommitUpdate(id *SessionID,
1946
        update *CommittedUpdate) (uint16, error) {
4✔
1947

4✔
1948
        var lastApplied uint16
4✔
1949
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
1950
                sessions := tx.ReadWriteBucket(cSessionBkt)
4✔
1951
                if sessions == nil {
4✔
1952
                        return ErrUninitializedDB
×
1953
                }
×
1954

1955
                // We'll only load the ClientSession body for performance, since
1956
                // we primarily need to inspect its SeqNum and TowerLastApplied
1957
                // fields. The CommittedUpdates will be modified on disk
1958
                // directly.
1959
                session, err := getClientSessionBody(sessions, id[:])
4✔
1960
                if err != nil {
4✔
1961
                        return err
×
1962
                }
×
1963

1964
                // Can't fail if the above didn't fail.
1965
                sessionBkt := sessions.NestedReadWriteBucket(id[:])
4✔
1966

4✔
1967
                // Ensure the session commits sub-bucket is initialized.
4✔
1968
                sessionCommits, err := sessionBkt.CreateBucketIfNotExists(
4✔
1969
                        cSessionCommits,
4✔
1970
                )
4✔
1971
                if err != nil {
4✔
1972
                        return err
×
1973
                }
×
1974

1975
                var seqNumBuf [2]byte
4✔
1976
                byteOrder.PutUint16(seqNumBuf[:], update.SeqNum)
4✔
1977

4✔
1978
                // Check to see if a committed update already exists for this
4✔
1979
                // sequence number.
4✔
1980
                committedUpdateBytes := sessionCommits.Get(seqNumBuf[:])
4✔
1981
                if committedUpdateBytes != nil {
4✔
1982
                        var dbUpdate CommittedUpdate
×
1983
                        err := dbUpdate.Decode(
×
1984
                                bytes.NewReader(committedUpdateBytes),
×
1985
                        )
×
1986
                        if err != nil {
×
1987
                                return err
×
1988
                        }
×
1989

1990
                        // If an existing committed update has a different hint,
1991
                        // we'll reject this newer update.
1992
                        if dbUpdate.Hint != update.Hint {
×
1993
                                return ErrUpdateAlreadyCommitted
×
1994
                        }
×
1995

1996
                        // Otherwise, capture the last applied value and
1997
                        // succeed.
1998
                        lastApplied = session.TowerLastApplied
×
1999
                        return nil
×
2000
                }
2001

2002
                // There's no committed update for this sequence number, ensure
2003
                // that we are committing the next unallocated one.
2004
                if update.SeqNum != session.SeqNum+1 {
4✔
2005
                        return ErrCommitUnorderedUpdate
×
2006
                }
×
2007

2008
                // Increment the session's sequence number and store the updated
2009
                // client session.
2010
                //
2011
                // TODO(conner): split out seqnum and last applied own bucket to
2012
                // eliminate serialization of full struct during CommitUpdate?
2013
                // Can also read/write directly to byes [:2] without migration.
2014
                session.SeqNum++
4✔
2015
                err = putClientSessionBody(sessionBkt, session)
4✔
2016
                if err != nil {
4✔
2017
                        return err
×
2018
                }
×
2019

2020
                // Encode and store the committed update in the sessionCommits
2021
                // sub-bucket under the requested sequence number.
2022
                var b bytes.Buffer
4✔
2023
                err = update.Encode(&b)
4✔
2024
                if err != nil {
4✔
2025
                        return err
×
2026
                }
×
2027

2028
                err = sessionCommits.Put(seqNumBuf[:], b.Bytes())
4✔
2029
                if err != nil {
4✔
2030
                        return err
×
2031
                }
×
2032

2033
                // Update the channel's max commitment height if needed.
2034
                err = maybeUpdateMaxCommitHeight(tx, update.BackupID)
4✔
2035
                if err != nil {
4✔
2036
                        return err
×
2037
                }
×
2038

2039
                // Finally, capture the session's last applied value so it can
2040
                // be sent in the next state update to the tower.
2041
                lastApplied = session.TowerLastApplied
4✔
2042

4✔
2043
                return nil
4✔
2044
        }, func() {
4✔
2045
                lastApplied = 0
4✔
2046
        })
4✔
2047
        if err != nil {
4✔
2048
                return 0, err
×
2049
        }
×
2050

2051
        return lastApplied, nil
4✔
2052
}
2053

2054
// AckUpdate persists an acknowledgment for a given (session, seqnum) pair. This
2055
// removes the update from the set of committed updates, and validates the
2056
// lastApplied value returned from the tower.
2057
func (c *ClientDB) AckUpdate(id *SessionID, seqNum uint16,
2058
        lastApplied uint16) error {
4✔
2059

4✔
2060
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
2061
                sessions := tx.ReadWriteBucket(cSessionBkt)
4✔
2062
                if sessions == nil {
4✔
2063
                        return ErrUninitializedDB
×
2064
                }
×
2065

2066
                chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
4✔
2067
                if chanDetailsBkt == nil {
4✔
2068
                        return ErrUninitializedDB
×
2069
                }
×
2070

2071
                // We'll only load the ClientSession body for performance, since
2072
                // we primarily need to inspect its SeqNum and TowerLastApplied
2073
                // fields. The CommittedUpdates and AckedUpdates will be
2074
                // modified on disk directly.
2075
                session, err := getClientSessionBody(sessions, id[:])
4✔
2076
                if err != nil {
4✔
2077
                        return err
×
2078
                }
×
2079

2080
                // If the tower has acked a sequence number beyond our highest
2081
                // sequence number, fail.
2082
                if lastApplied > session.SeqNum {
4✔
2083
                        return ErrUnallocatedLastApplied
×
2084
                }
×
2085

2086
                // If the tower acked with a lower sequence number than it gave
2087
                // us prior, fail.
2088
                if lastApplied < session.TowerLastApplied {
4✔
2089
                        return ErrLastAppliedReversion
×
2090
                }
×
2091

2092
                // TODO(conner): split out seqnum and last applied own bucket to
2093
                // eliminate serialization of full struct during AckUpdate?  Can
2094
                // also read/write directly to byes [2:4] without migration.
2095
                session.TowerLastApplied = lastApplied
4✔
2096

4✔
2097
                // Can't fail because getClientSession succeeded.
4✔
2098
                sessionBkt := sessions.NestedReadWriteBucket(id[:])
4✔
2099

4✔
2100
                // Write the client session with the updated last applied value.
4✔
2101
                err = putClientSessionBody(sessionBkt, session)
4✔
2102
                if err != nil {
4✔
2103
                        return err
×
2104
                }
×
2105

2106
                // If the commits sub-bucket doesn't exist, there can't possibly
2107
                // be a corresponding committed update to remove.
2108
                sessionCommits := sessionBkt.NestedReadWriteBucket(
4✔
2109
                        cSessionCommits,
4✔
2110
                )
4✔
2111
                if sessionCommits == nil {
4✔
2112
                        return ErrCommittedUpdateNotFound
×
2113
                }
×
2114

2115
                var seqNumBuf [2]byte
4✔
2116
                byteOrder.PutUint16(seqNumBuf[:], seqNum)
4✔
2117

4✔
2118
                // Assert that a committed update exists for this sequence
4✔
2119
                // number.
4✔
2120
                committedUpdateBytes := sessionCommits.Get(seqNumBuf[:])
4✔
2121
                if committedUpdateBytes == nil {
4✔
2122
                        return ErrCommittedUpdateNotFound
×
2123
                }
×
2124

2125
                var committedUpdate CommittedUpdate
4✔
2126
                err = committedUpdate.Decode(
4✔
2127
                        bytes.NewReader(committedUpdateBytes),
4✔
2128
                )
4✔
2129
                if err != nil {
4✔
2130
                        return err
×
2131
                }
×
2132

2133
                // Remove the corresponding committed update.
2134
                err = sessionCommits.Delete(seqNumBuf[:])
4✔
2135
                if err != nil {
4✔
2136
                        return err
×
2137
                }
×
2138

2139
                dbSessionID, dbSessIDBytes, err := getDBSessionID(sessions, *id)
4✔
2140
                if err != nil {
4✔
2141
                        return err
×
2142
                }
×
2143

2144
                chanID := committedUpdate.BackupID.ChanID
4✔
2145
                height := committedUpdate.BackupID.CommitHeight
4✔
2146

4✔
2147
                // Get the DB representation of the channel-ID. There is a
4✔
2148
                // chance that the channel corresponding to this update has been
4✔
2149
                // closed and that the details for this channel no longer exist
4✔
2150
                // in the tower client DB. In that case, we consider this a
4✔
2151
                // rogue update and all we do is make sure to keep track of the
4✔
2152
                // number of rogue updates for this session.
4✔
2153
                _, dbChanIDBytes, err := getDBChanID(chanDetailsBkt, chanID)
4✔
2154
                if errors.Is(err, ErrChannelNotRegistered) {
4✔
2155
                        var (
×
2156
                                count uint64
×
2157
                                err   error
×
2158
                        )
×
2159

×
2160
                        rogueCountBytes := sessionBkt.Get(
×
2161
                                cSessionRogueUpdateCount,
×
2162
                        )
×
2163
                        if len(rogueCountBytes) != 0 {
×
2164
                                count, err = readBigSize(rogueCountBytes)
×
2165
                                if err != nil {
×
2166
                                        return err
×
2167
                                }
×
2168
                        }
2169

2170
                        rogueCount := count + 1
×
2171
                        countBytes, err := writeBigSize(rogueCount)
×
2172
                        if err != nil {
×
2173
                                return err
×
2174
                        }
×
2175

2176
                        err = sessionBkt.Put(
×
2177
                                cSessionRogueUpdateCount, countBytes,
×
2178
                        )
×
2179
                        if err != nil {
×
2180
                                return err
×
2181
                        }
×
2182

2183
                        // In the rare chance that this session only has rogue
2184
                        // updates, we check here if the count is equal to the
2185
                        // MaxUpdate of the session. If it is, then we mark the
2186
                        // session as closable.
2187
                        if rogueCount != uint64(session.Policy.MaxUpdates) {
×
2188
                                return nil
×
2189
                        }
×
2190

2191
                        // Before we mark the session as closable, we do a
2192
                        // sanity check to ensure that this session has no
2193
                        // acked-update index.
2194
                        sessionAckRanges := sessionBkt.NestedReadBucket(
×
2195
                                cSessionAckRangeIndex,
×
2196
                        )
×
2197
                        if sessionAckRanges != nil {
×
2198
                                return fmt.Errorf("session(%s) has an "+
×
2199
                                        "acked ranges index but has a rogue "+
×
2200
                                        "count indicating saturation",
×
2201
                                        session.ID)
×
2202
                        }
×
2203

2204
                        closableSessBkt := tx.ReadWriteBucket(
×
2205
                                cClosableSessionsBkt,
×
2206
                        )
×
2207
                        if closableSessBkt == nil {
×
2208
                                return ErrUninitializedDB
×
2209
                        }
×
2210

2211
                        var height [4]byte
×
2212
                        byteOrder.PutUint32(height[:], 0)
×
2213

×
2214
                        return closableSessBkt.Put(dbSessIDBytes, height[:])
×
2215
                } else if err != nil {
4✔
2216
                        return err
×
2217
                }
×
2218

2219
                // Get the ranges write bucket before getting the range index to
2220
                // ensure that the session acks sub-bucket is initialized, so
2221
                // that we can insert an entry.
2222
                rangesBkt, err := getRangesWriteBucket(
4✔
2223
                        sessionBkt, dbChanIDBytes,
4✔
2224
                )
4✔
2225
                if err != nil {
4✔
2226
                        return err
×
2227
                }
×
2228

2229
                chanDetails := chanDetailsBkt.NestedReadWriteBucket(
4✔
2230
                        committedUpdate.BackupID.ChanID[:],
4✔
2231
                )
4✔
2232
                if chanDetails == nil {
4✔
2233
                        return ErrChannelNotRegistered
×
2234
                }
×
2235

2236
                err = putChannelToSessionMapping(chanDetails, dbSessionID)
4✔
2237
                if err != nil {
4✔
2238
                        return err
×
2239
                }
×
2240

2241
                // Get the range index for the given session-channel pair.
2242
                index, err := c.getRangeIndex(tx, *id, chanID)
4✔
2243
                if err != nil {
4✔
2244
                        return err
×
2245
                }
×
2246

2247
                return index.Add(height, rangesBkt)
4✔
2248
        }, func() {})
4✔
2249
}
2250

2251
// GetDBQueue returns a BackupID Queue instance under the given namespace.
2252
func (c *ClientDB) GetDBQueue(namespace []byte) Queue[*BackupID] {
4✔
2253
        return NewQueueDB(
4✔
2254
                c.db, namespace, func() *BackupID {
8✔
2255
                        return &BackupID{}
4✔
2256
                }, func(tx kvdb.RwTx, item *BackupID) error {
4✔
2257
                        return maybeUpdateMaxCommitHeight(tx, *item)
×
2258
                },
×
2259
        )
2260
}
2261

2262
// TerminateSession sets the given session's status to CSessionTerminal meaning
2263
// that it will not be usable again. An error will be returned if the given
2264
// session still has un-acked updates that should be attended to.
2265
func (c *ClientDB) TerminateSession(id SessionID) error {
4✔
2266
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
2267
                sessions := tx.ReadWriteBucket(cSessionBkt)
4✔
2268
                if sessions == nil {
4✔
2269
                        return ErrUninitializedDB
×
2270
                }
×
2271

2272
                sessionsBkt := tx.ReadBucket(cSessionBkt)
4✔
2273
                if sessionsBkt == nil {
4✔
2274
                        return ErrUninitializedDB
×
2275
                }
×
2276

2277
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
4✔
2278
                if chanIDIndexBkt == nil {
4✔
2279
                        return ErrUninitializedDB
×
2280
                }
×
2281

2282
                // Collect any un-acked updates for this session.
2283
                committedUpdateCount := make(map[SessionID]uint16)
4✔
2284
                perCommittedUpdate := func(s *ClientSession,
4✔
2285
                        _ *CommittedUpdate) {
4✔
2286

×
2287
                        committedUpdateCount[s.ID]++
×
2288
                }
×
2289

2290
                session, err := c.getClientSession(
4✔
2291
                        sessionsBkt, chanIDIndexBkt, id[:],
4✔
2292
                        WithPerCommittedUpdate(perCommittedUpdate),
4✔
2293
                )
4✔
2294
                if err != nil {
4✔
2295
                        return err
×
2296
                }
×
2297

2298
                // If there are any un-acked updates for this session then
2299
                // we don't allow the change of status as these updates must
2300
                // first be dealt with somehow.
2301
                if committedUpdateCount[id] > 0 {
4✔
2302
                        return ErrSessionHasUnackedUpdates
×
2303
                }
×
2304

2305
                return markSessionStatus(sessions, session, CSessionTerminal)
4✔
2306
        }, func() {})
4✔
2307
}
2308

2309
// DeleteCommittedUpdates deletes all the committed updates for the given
2310
// session.
2311
func (c *ClientDB) DeleteCommittedUpdates(id *SessionID) error {
4✔
2312
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
2313
                sessions := tx.ReadWriteBucket(cSessionBkt)
4✔
2314
                if sessions == nil {
4✔
2315
                        return ErrUninitializedDB
×
2316
                }
×
2317

2318
                sessionBkt := sessions.NestedReadWriteBucket(id[:])
4✔
2319
                if sessionBkt == nil {
4✔
2320
                        return fmt.Errorf("session bucket %s not found",
×
2321
                                id.String())
×
2322
                }
×
2323

2324
                // If the commits sub-bucket doesn't exist, there can't possibly
2325
                // be corresponding updates to remove.
2326
                sessionCommits := sessionBkt.NestedReadWriteBucket(
4✔
2327
                        cSessionCommits,
4✔
2328
                )
4✔
2329
                if sessionCommits == nil {
8✔
2330
                        return nil
4✔
2331
                }
4✔
2332

2333
                // errFoundUpdates is an error we will use to exit early from
2334
                // the ForEach loop. The return of this error means that at
2335
                // least one committed update exists.
2336
                var errFoundUpdates = fmt.Errorf("found committed updates")
4✔
2337
                err := sessionCommits.ForEach(func(k, v []byte) error {
4✔
2338
                        return errFoundUpdates
×
2339
                })
×
2340
                switch {
4✔
2341
                // If the errFoundUpdates signal error was returned then there
2342
                // are some updates that need to be deleted.
2343
                case errors.Is(err, errFoundUpdates):
×
2344

2345
                // If no error is returned then the ForEach call back was never
2346
                // entered meaning that there are no un-acked committed updates.
2347
                // So we can exit now as there is nothing left to do.
2348
                case err == nil:
4✔
2349
                        return nil
4✔
2350

2351
                // If an expected error is returned, return that error.
2352
                default:
×
2353
                        return err
×
2354
                }
2355

2356
                session, err := getClientSessionBody(sessions, id[:])
×
2357
                if err != nil {
×
2358
                        return err
×
2359
                }
×
2360

2361
                // Once we delete a committed update from the session, the
2362
                // SeqNum of the session will be incorrect and so the session
2363
                // should be marked as terminal.
2364
                session.Status = CSessionTerminal
×
2365
                err = putClientSessionBody(sessionBkt, session)
×
2366
                if err != nil {
×
2367
                        return err
×
2368
                }
×
2369

2370
                // Delete all the committed updates in one go by deleting the
2371
                // session commits bucket.
2372
                return sessionBkt.DeleteNestedBucket(cSessionCommits)
×
2373
        }, func() {})
4✔
2374
}
2375

2376
// putChannelToSessionMapping adds the given session ID to a channel's
2377
// cChanSessions bucket.
2378
func putChannelToSessionMapping(chanDetails kvdb.RwBucket,
2379
        dbSessID uint64) error {
4✔
2380

4✔
2381
        chanSessIDsBkt, err := chanDetails.CreateBucketIfNotExists(
4✔
2382
                cChanSessions,
4✔
2383
        )
4✔
2384
        if err != nil {
4✔
2385
                return err
×
2386
        }
×
2387

2388
        b, err := writeBigSize(dbSessID)
4✔
2389
        if err != nil {
4✔
2390
                return err
×
2391
        }
×
2392

2393
        return chanSessIDsBkt.Put(b, []byte{1})
4✔
2394
}
2395

2396
// getClientSessionBody loads the body of a ClientSession from the sessions
2397
// bucket corresponding to the serialized session id. This does not deserialize
2398
// the CommittedUpdates, AckUpdates or the Tower associated with the session.
2399
// If the caller requires this info, use getClientSession.
2400
func getClientSessionBody(sessions kvdb.RBucket,
2401
        idBytes []byte) (*ClientSession, error) {
4✔
2402

4✔
2403
        sessionBkt := sessions.NestedReadBucket(idBytes)
4✔
2404
        if sessionBkt == nil {
4✔
2405
                return nil, ErrClientSessionNotFound
×
2406
        }
×
2407

2408
        // Should never have a sessionBkt without also having its body.
2409
        sessionBody := sessionBkt.Get(cSessionBody)
4✔
2410
        if sessionBody == nil {
4✔
2411
                return nil, ErrCorruptClientSession
×
2412
        }
×
2413

2414
        var session ClientSession
4✔
2415
        copy(session.ID[:], idBytes)
4✔
2416

4✔
2417
        err := session.Decode(bytes.NewReader(sessionBody))
4✔
2418
        if err != nil {
4✔
2419
                return nil, err
×
2420
        }
×
2421

2422
        return &session, nil
4✔
2423
}
2424

2425
// ClientSessionFilterFn describes the signature of a callback function that can
2426
// be used to filter the sessions that are returned in any of the DB methods
2427
// that read sessions from the DB.
2428
type ClientSessionFilterFn func(*ClientSession) bool
2429

2430
// ClientSessWithNumCommittedUpdatesFilterFn describes the signature of a
2431
// callback function that can be used to filter out a session based on the
2432
// contents of ClientSession along with the number of un-acked committed updates
2433
// that the session has.
2434
type ClientSessWithNumCommittedUpdatesFilterFn func(*ClientSession, uint16) bool
2435

2436
// PerMaxHeightCB describes the signature of a callback function that can be
2437
// called for each channel that a session has updates for to communicate the
2438
// maximum commitment height that the session has backed up for the channel.
2439
type PerMaxHeightCB func(*ClientSession, lnwire.ChannelID, uint64)
2440

2441
// PerNumAckedUpdatesCB describes the signature of a callback function that can
2442
// be called for each channel that a session has updates for to communicate the
2443
// number of updates that the session has for the channel.
2444
type PerNumAckedUpdatesCB func(*ClientSession, lnwire.ChannelID, uint16)
2445

2446
// PerRogueUpdateCountCB describes the signature of a callback function that can
2447
// be called for each session with the number of rogue updates that the session
2448
// has.
2449
type PerRogueUpdateCountCB func(*ClientSession, uint16)
2450

2451
// PerAckedUpdateCB describes the signature of a callback function that can be
2452
// called for each of a session's acked updates.
2453
type PerAckedUpdateCB func(*ClientSession, uint16, BackupID)
2454

2455
// PerCommittedUpdateCB describes the signature of a callback function that can
2456
// be called for each of a session's committed updates (updates that the client
2457
// has not yet received an ACK for).
2458
type PerCommittedUpdateCB func(*ClientSession, *CommittedUpdate)
2459

2460
// ClientSessionListOption describes the signature of a functional option that
2461
// can be used when listing client sessions in order to provide any extra
2462
// instruction to the query.
2463
type ClientSessionListOption func(cfg *ClientSessionListCfg)
2464

2465
// ClientSessionListCfg defines various query parameters that will be used when
2466
// querying the DB for client sessions.
2467
type ClientSessionListCfg struct {
2468
        // PerNumAckedUpdates will, if set, be called for each of the session's
2469
        // channels to communicate the number of updates stored for that
2470
        // channel.
2471
        PerNumAckedUpdates PerNumAckedUpdatesCB
2472

2473
        // PerRogueUpdateCount will, if set, be called with the number of rogue
2474
        // updates that the session has backed up.
2475
        PerRogueUpdateCount PerRogueUpdateCountCB
2476

2477
        // PerMaxHeight will, if set, be called for each of the session's
2478
        // channels to communicate the highest commit height of updates stored
2479
        // for that channel.
2480
        PerMaxHeight PerMaxHeightCB
2481

2482
        // PerCommittedUpdate will, if set, be called for each of the session's
2483
        // committed (un-acked) updates.
2484
        PerCommittedUpdate PerCommittedUpdateCB
2485

2486
        // PreEvaluateFilterFn will be run after loading a session from the DB
2487
        // and _before_ any of the other call-back functions in
2488
        // ClientSessionListCfg. Therefore, if a session fails this filter
2489
        // function, then it will not be passed to any of the other call backs
2490
        // and won't be included in the return list.
2491
        PreEvaluateFilterFn ClientSessionFilterFn
2492

2493
        // PostEvaluateFilterFn will be run _after_ all the other call-back
2494
        // functions in ClientSessionListCfg. If a session fails this filter
2495
        // function then all it means is that it won't be included in the list
2496
        // of sessions to return.
2497
        PostEvaluateFilterFn ClientSessWithNumCommittedUpdatesFilterFn
2498
}
2499

2500
// NewClientSessionCfg constructs a new ClientSessionListCfg.
2501
func NewClientSessionCfg() *ClientSessionListCfg {
4✔
2502
        return &ClientSessionListCfg{}
4✔
2503
}
4✔
2504

2505
// WithPerMaxHeight constructs a functional option that will set a call-back
2506
// function to be called for each of a session's channels to communicate the
2507
// maximum commitment height that the session has stored for the channel.
2508
func WithPerMaxHeight(cb PerMaxHeightCB) ClientSessionListOption {
×
2509
        return func(cfg *ClientSessionListCfg) {
×
2510
                cfg.PerMaxHeight = cb
×
2511
        }
×
2512
}
2513

2514
// WithPerNumAckedUpdates constructs a functional option that will set a
2515
// call-back function to be called for each of a session's channels to
2516
// communicate the number of updates that the session has stored for the
2517
// channel.
2518
func WithPerNumAckedUpdates(cb PerNumAckedUpdatesCB) ClientSessionListOption {
4✔
2519
        return func(cfg *ClientSessionListCfg) {
8✔
2520
                cfg.PerNumAckedUpdates = cb
4✔
2521
        }
4✔
2522
}
2523

2524
// WithPerRogueUpdateCount constructs a functional option that will set a
2525
// call-back function to be called with the number of rogue updates that the
2526
// session has backed up.
2527
func WithPerRogueUpdateCount(cb PerRogueUpdateCountCB) ClientSessionListOption {
4✔
2528
        return func(cfg *ClientSessionListCfg) {
8✔
2529
                cfg.PerRogueUpdateCount = cb
4✔
2530
        }
4✔
2531
}
2532

2533
// WithPerCommittedUpdate constructs a functional option that will set a
2534
// call-back function to be called for each of a client's un-acked updates.
2535
func WithPerCommittedUpdate(cb PerCommittedUpdateCB) ClientSessionListOption {
4✔
2536
        return func(cfg *ClientSessionListCfg) {
8✔
2537
                cfg.PerCommittedUpdate = cb
4✔
2538
        }
4✔
2539
}
2540

2541
// WithPreEvalFilterFn constructs a functional option that will set a call-back
2542
// function that will be called immediately after loading a session. If the
2543
// session fails this filter function, then it will not be passed to any of the
2544
// other evaluation call-back functions.
2545
func WithPreEvalFilterFn(fn ClientSessionFilterFn) ClientSessionListOption {
4✔
2546
        return func(cfg *ClientSessionListCfg) {
8✔
2547
                cfg.PreEvaluateFilterFn = fn
4✔
2548
        }
4✔
2549
}
2550

2551
// WithPostEvalFilterFn constructs a functional option that will set a call-back
2552
// function that will be used to determine if a session should be included in
2553
// the returned list. This differs from WithPreEvalFilterFn since that call-back
2554
// is used to determine if the session should be evaluated at all (and thus
2555
// run against the other ClientSessionListCfg call-backs) whereas the session
2556
// will only reach the PostEvalFilterFn call-back once it has already been
2557
// evaluated by all the other call-backs.
2558
func WithPostEvalFilterFn(
2559
        fn ClientSessWithNumCommittedUpdatesFilterFn) ClientSessionListOption {
4✔
2560

4✔
2561
        return func(cfg *ClientSessionListCfg) {
8✔
2562
                cfg.PostEvaluateFilterFn = fn
4✔
2563
        }
4✔
2564
}
2565

2566
// getClientSession loads the full ClientSession associated with the serialized
2567
// session id. This method populates the CommittedUpdates, AckUpdates and Tower
2568
// in addition to the ClientSession's body.
2569
func (c *ClientDB) getClientSession(sessionsBkt, chanIDIndexBkt kvdb.RBucket,
2570
        idBytes []byte, opts ...ClientSessionListOption) (*ClientSession,
2571
        error) {
4✔
2572

4✔
2573
        cfg := NewClientSessionCfg()
4✔
2574
        for _, o := range opts {
8✔
2575
                o(cfg)
4✔
2576
        }
4✔
2577

2578
        session, err := getClientSessionBody(sessionsBkt, idBytes)
4✔
2579
        if err != nil {
4✔
2580
                return nil, err
×
2581
        }
×
2582

2583
        if cfg.PreEvaluateFilterFn != nil && !cfg.PreEvaluateFilterFn(session) {
8✔
2584
                return nil, ErrSessionFailedFilterFn
4✔
2585
        }
4✔
2586

2587
        // Can't fail because client session body has already been read.
2588
        sessionBkt := sessionsBkt.NestedReadBucket(idBytes)
4✔
2589

4✔
2590
        // Pass the session's committed (un-acked) updates through the call-back
4✔
2591
        // if one is provided.
4✔
2592
        numCommittedUpdates, err := filterClientSessionCommits(
4✔
2593
                sessionBkt, session, cfg.PerCommittedUpdate,
4✔
2594
        )
4✔
2595
        if err != nil {
4✔
2596
                return nil, err
×
2597
        }
×
2598

2599
        // Pass the session's acked updates through the call-back if one is
2600
        // provided.
2601
        err = c.filterClientSessionAcks(
4✔
2602
                sessionBkt, chanIDIndexBkt, session, cfg.PerMaxHeight,
4✔
2603
                cfg.PerNumAckedUpdates, cfg.PerRogueUpdateCount,
4✔
2604
        )
4✔
2605
        if err != nil {
4✔
2606
                return nil, err
×
2607
        }
×
2608

2609
        if cfg.PostEvaluateFilterFn != nil &&
4✔
2610
                !cfg.PostEvaluateFilterFn(session, numCommittedUpdates) {
4✔
2611

×
2612
                return nil, ErrSessionFailedFilterFn
×
2613
        }
×
2614

2615
        return session, nil
4✔
2616
}
2617

2618
// getClientSessionCommits retrieves all committed updates for the session
2619
// identified by the serialized session id. If a PerCommittedUpdateCB is
2620
// provided, then it will be called for each of the session's committed updates.
2621
func getClientSessionCommits(sessionBkt kvdb.RBucket, s *ClientSession,
2622
        cb PerCommittedUpdateCB) ([]CommittedUpdate, error) {
4✔
2623

4✔
2624
        // Initialize committedUpdates so that we can return an initialized map
4✔
2625
        // if no committed updates exist.
4✔
2626
        committedUpdates := make([]CommittedUpdate, 0)
4✔
2627

4✔
2628
        sessionCommits := sessionBkt.NestedReadBucket(cSessionCommits)
4✔
2629
        if sessionCommits == nil {
8✔
2630
                return committedUpdates, nil
4✔
2631
        }
4✔
2632

2633
        err := sessionCommits.ForEach(func(k, v []byte) error {
4✔
2634
                var committedUpdate CommittedUpdate
×
2635
                err := committedUpdate.Decode(bytes.NewReader(v))
×
2636
                if err != nil {
×
2637
                        return err
×
2638
                }
×
2639
                committedUpdate.SeqNum = byteOrder.Uint16(k)
×
2640

×
2641
                committedUpdates = append(committedUpdates, committedUpdate)
×
2642

×
2643
                if cb != nil {
×
2644
                        cb(s, &committedUpdate)
×
2645
                }
×
2646

2647
                return nil
×
2648
        })
2649
        if err != nil {
4✔
2650
                return nil, err
×
2651
        }
×
2652

2653
        return committedUpdates, nil
4✔
2654
}
2655

2656
// filterClientSessionAcks retrieves all acked updates for the session
2657
// identified by the serialized session id and passes them to the provided
2658
// call back if one is provided.
2659
func (c *ClientDB) filterClientSessionAcks(sessionBkt,
2660
        chanIDIndexBkt kvdb.RBucket, s *ClientSession, perMaxCb PerMaxHeightCB,
2661
        perNumAckedUpdates PerNumAckedUpdatesCB,
2662
        perRogueUpdateCount PerRogueUpdateCountCB) error {
4✔
2663

4✔
2664
        if perRogueUpdateCount != nil {
8✔
2665
                var (
4✔
2666
                        count uint64
4✔
2667
                        err   error
4✔
2668
                )
4✔
2669
                rogueCountBytes := sessionBkt.Get(cSessionRogueUpdateCount)
4✔
2670
                if len(rogueCountBytes) != 0 {
4✔
2671
                        count, err = readBigSize(rogueCountBytes)
×
2672
                        if err != nil {
×
2673
                                return err
×
2674
                        }
×
2675
                }
2676

2677
                perRogueUpdateCount(s, uint16(count))
4✔
2678
        }
2679

2680
        if perMaxCb == nil && perNumAckedUpdates == nil {
8✔
2681
                return nil
4✔
2682
        }
4✔
2683

2684
        sessionAcksRanges := sessionBkt.NestedReadBucket(cSessionAckRangeIndex)
4✔
2685
        if sessionAcksRanges == nil {
8✔
2686
                return nil
4✔
2687
        }
4✔
2688

2689
        return sessionAcksRanges.ForEach(func(dbChanID, _ []byte) error {
8✔
2690
                rangeBkt := sessionAcksRanges.NestedReadBucket(dbChanID)
4✔
2691
                if rangeBkt == nil {
4✔
2692
                        return nil
×
2693
                }
×
2694

2695
                index, err := readRangeIndex(rangeBkt)
4✔
2696
                if err != nil {
4✔
2697
                        return err
×
2698
                }
×
2699

2700
                chanIDBytes := chanIDIndexBkt.Get(dbChanID)
4✔
2701
                var chanID lnwire.ChannelID
4✔
2702
                copy(chanID[:], chanIDBytes)
4✔
2703

4✔
2704
                if perMaxCb != nil {
4✔
2705
                        perMaxCb(s, chanID, index.MaxHeight())
×
2706
                }
×
2707

2708
                if perNumAckedUpdates != nil {
8✔
2709
                        perNumAckedUpdates(s, chanID, uint16(index.NumInSet()))
4✔
2710
                }
4✔
2711
                return nil
4✔
2712
        })
2713
}
2714

2715
// filterClientSessionCommits retrieves all committed updates for the session
2716
// identified by the serialized session id and passes them to the given
2717
// PerCommittedUpdateCB callback.
2718
func filterClientSessionCommits(sessionBkt kvdb.RBucket, s *ClientSession,
2719
        cb PerCommittedUpdateCB) (uint16, error) {
4✔
2720

4✔
2721
        sessionCommits := sessionBkt.NestedReadBucket(cSessionCommits)
4✔
2722
        if sessionCommits == nil {
8✔
2723
                return 0, nil
4✔
2724
        }
4✔
2725

2726
        var numUpdates uint16
4✔
2727
        err := sessionCommits.ForEach(func(k, v []byte) error {
4✔
2728
                numUpdates++
×
2729

×
2730
                if cb == nil {
×
2731
                        return nil
×
2732
                }
×
2733

2734
                var committedUpdate CommittedUpdate
×
2735
                err := committedUpdate.Decode(bytes.NewReader(v))
×
2736
                if err != nil {
×
2737
                        return err
×
2738
                }
×
2739
                committedUpdate.SeqNum = byteOrder.Uint16(k)
×
2740

×
2741
                cb(s, &committedUpdate)
×
2742

×
2743
                return nil
×
2744
        })
2745
        if err != nil {
4✔
2746
                return 0, err
×
2747
        }
×
2748

2749
        return numUpdates, nil
4✔
2750
}
2751

2752
// putClientSessionBody stores the body of the ClientSession (everything but the
2753
// CommittedUpdates and AckedUpdates).
2754
func putClientSessionBody(sessionBkt kvdb.RwBucket,
2755
        session *ClientSession) error {
4✔
2756

4✔
2757
        var b bytes.Buffer
4✔
2758
        err := session.Encode(&b)
4✔
2759
        if err != nil {
4✔
2760
                return err
×
2761
        }
×
2762

2763
        return sessionBkt.Put(cSessionBody, b.Bytes())
4✔
2764
}
2765

2766
// markSessionStatus updates the persisted state of the session to the new
2767
// status.
2768
func markSessionStatus(sessions kvdb.RwBucket, session *ClientSession,
2769
        status CSessionStatus) error {
4✔
2770

4✔
2771
        sessionBkt, err := sessions.CreateBucketIfNotExists(session.ID[:])
4✔
2772
        if err != nil {
4✔
2773
                return err
×
2774
        }
×
2775

2776
        session.Status = status
4✔
2777

4✔
2778
        return putClientSessionBody(sessionBkt, session)
4✔
2779
}
2780

2781
// getChanSummary loads a ClientChanSummary for the passed chanID.
2782
func getChanSummary(chanDetails kvdb.RBucket) (*ClientChanSummary, error) {
4✔
2783
        chanSummaryBytes := chanDetails.Get(cChannelSummary)
4✔
2784
        if chanSummaryBytes == nil {
4✔
2785
                return nil, ErrChannelNotRegistered
×
2786
        }
×
2787

2788
        var summary ClientChanSummary
4✔
2789
        err := summary.Decode(bytes.NewReader(chanSummaryBytes))
4✔
2790
        if err != nil {
4✔
2791
                return nil, err
×
2792
        }
×
2793

2794
        return &summary, nil
4✔
2795
}
2796

2797
// putChanSummary stores a ClientChanSummary for the passed chanID.
2798
func putChanSummary(chanDetails kvdb.RwBucket,
2799
        summary *ClientChanSummary) error {
4✔
2800

4✔
2801
        var b bytes.Buffer
4✔
2802
        err := summary.Encode(&b)
4✔
2803
        if err != nil {
4✔
2804
                return err
×
2805
        }
×
2806

2807
        return chanDetails.Put(cChannelSummary, b.Bytes())
4✔
2808
}
2809

2810
// getTower loads a Tower identified by its serialized tower id.
2811
func getTower(towers kvdb.RBucket, id []byte) (*Tower, error) {
4✔
2812
        towerBytes := towers.Get(id)
4✔
2813
        if towerBytes == nil {
4✔
2814
                return nil, ErrTowerNotFound
×
2815
        }
×
2816

2817
        var tower Tower
4✔
2818
        err := tower.Decode(bytes.NewReader(towerBytes))
4✔
2819
        if err != nil {
4✔
2820
                return nil, err
×
2821
        }
×
2822

2823
        tower.ID = TowerIDFromBytes(id)
4✔
2824

4✔
2825
        return &tower, nil
4✔
2826
}
2827

2828
// putTower stores a Tower identified by its serialized tower id.
2829
func putTower(towers kvdb.RwBucket, tower *Tower) error {
4✔
2830
        var b bytes.Buffer
4✔
2831
        err := tower.Encode(&b)
4✔
2832
        if err != nil {
4✔
2833
                return err
×
2834
        }
×
2835

2836
        return towers.Put(tower.ID.Bytes(), b.Bytes())
4✔
2837
}
2838

2839
// getDBChanID returns the db-assigned channel ID for the given real channel ID.
2840
// It returns both the uint64 and byte representation.
2841
func getDBChanID(chanDetailsBkt kvdb.RBucket, chanID lnwire.ChannelID) (uint64,
2842
        []byte, error) {
4✔
2843

4✔
2844
        chanDetails := chanDetailsBkt.NestedReadBucket(chanID[:])
4✔
2845
        if chanDetails == nil {
4✔
2846
                return 0, nil, ErrChannelNotRegistered
×
2847
        }
×
2848

2849
        idBytes := chanDetails.Get(cChanDBID)
4✔
2850
        if len(idBytes) == 0 {
4✔
2851
                return 0, nil, fmt.Errorf("no db-assigned ID found for "+
×
2852
                        "channel ID %s", chanID)
×
2853
        }
×
2854

2855
        id, err := readBigSize(idBytes)
4✔
2856
        if err != nil {
4✔
2857
                return 0, nil, err
×
2858
        }
×
2859

2860
        return id, idBytes, nil
4✔
2861
}
2862

2863
// getDBSessionID returns the db-assigned session ID for the given real session
2864
// ID. It returns both the uint64 and byte representation.
2865
func getDBSessionID(sessionsBkt kvdb.RBucket, sessionID SessionID) (uint64,
2866
        []byte, error) {
4✔
2867

4✔
2868
        sessionBkt := sessionsBkt.NestedReadBucket(sessionID[:])
4✔
2869
        if sessionBkt == nil {
4✔
2870
                return 0, nil, ErrClientSessionNotFound
×
2871
        }
×
2872

2873
        idBytes := sessionBkt.Get(cSessionDBID)
4✔
2874
        if len(idBytes) == 0 {
4✔
2875
                return 0, nil, fmt.Errorf("no db-assigned ID found for "+
×
2876
                        "session ID %s", sessionID)
×
2877
        }
×
2878

2879
        id, err := readBigSize(idBytes)
4✔
2880
        if err != nil {
4✔
2881
                return 0, nil, err
×
2882
        }
×
2883

2884
        return id, idBytes, nil
4✔
2885
}
2886

2887
// maybeUpdateMaxCommitHeight updates the given channel details bucket with the
2888
// given height if it is larger than the current max height stored for the
2889
// channel.
2890
func maybeUpdateMaxCommitHeight(tx kvdb.RwTx, backupID BackupID) error {
4✔
2891
        chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
4✔
2892
        if chanDetailsBkt == nil {
4✔
2893
                return ErrUninitializedDB
×
2894
        }
×
2895

2896
        // If an entry for this channel does not exist in the channel details
2897
        // bucket then we exit here as this means that the channel has been
2898
        // closed.
2899
        chanDetails := chanDetailsBkt.NestedReadWriteBucket(backupID.ChanID[:])
4✔
2900
        if chanDetails == nil {
4✔
2901
                return nil
×
2902
        }
×
2903

2904
        putHeight := func() error {
8✔
2905
                b, err := writeBigSize(backupID.CommitHeight)
4✔
2906
                if err != nil {
4✔
2907
                        return err
×
2908
                }
×
2909

2910
                return chanDetails.Put(
4✔
2911
                        cChanMaxCommitmentHeight, b,
4✔
2912
                )
4✔
2913
        }
2914

2915
        // Get current height.
2916
        heightBytes := chanDetails.Get(cChanMaxCommitmentHeight)
4✔
2917

4✔
2918
        // The height might have not been set yet, in which case
4✔
2919
        // we can just write the new height.
4✔
2920
        if len(heightBytes) == 0 {
8✔
2921
                return putHeight()
4✔
2922
        }
4✔
2923

2924
        // Otherwise, read in the current max commitment height for the channel.
2925
        currentHeight, err := readBigSize(heightBytes)
4✔
2926
        if err != nil {
4✔
2927
                return err
×
2928
        }
×
2929

2930
        // If the new height is not larger than the current persisted height,
2931
        // then there is nothing left for us to do.
2932
        if backupID.CommitHeight <= currentHeight {
4✔
2933
                return nil
×
2934
        }
×
2935

2936
        return putHeight()
4✔
2937
}
2938

2939
func getRealSessionID(sessIDIndexBkt kvdb.RBucket, dbID uint64) (*SessionID,
2940
        error) {
4✔
2941

4✔
2942
        dbIDBytes, err := writeBigSize(dbID)
4✔
2943
        if err != nil {
4✔
2944
                return nil, err
×
2945
        }
×
2946

2947
        sessIDBytes := sessIDIndexBkt.Get(dbIDBytes)
4✔
2948
        if len(sessIDBytes) != SessionIDSize {
4✔
2949
                return nil, fmt.Errorf("session ID not found")
×
2950
        }
×
2951

2952
        var sessID SessionID
4✔
2953
        copy(sessID[:], sessIDBytes)
4✔
2954

4✔
2955
        return &sessID, nil
4✔
2956
}
2957

2958
func getRealChannelID(chanIDIndexBkt kvdb.RBucket,
2959
        dbID uint64) (*lnwire.ChannelID, error) {
4✔
2960

4✔
2961
        dbIDBytes, err := writeBigSize(dbID)
4✔
2962
        if err != nil {
4✔
2963
                return nil, err
×
2964
        }
×
2965

2966
        chanIDBytes := chanIDIndexBkt.Get(dbIDBytes)
4✔
2967
        if len(chanIDBytes) != 32 { //nolint:gomnd
4✔
2968
                return nil, fmt.Errorf("channel ID not found")
×
2969
        }
×
2970

2971
        var chanIDS lnwire.ChannelID
4✔
2972
        copy(chanIDS[:], chanIDBytes)
4✔
2973

4✔
2974
        return &chanIDS, nil
4✔
2975
}
2976

2977
// writeBigSize will encode the given uint64 as a BigSize byte slice.
2978
func writeBigSize(i uint64) ([]byte, error) {
4✔
2979
        var b bytes.Buffer
4✔
2980
        err := tlv.WriteVarInt(&b, i, &[8]byte{})
4✔
2981
        if err != nil {
4✔
2982
                return nil, err
×
2983
        }
×
2984

2985
        return b.Bytes(), nil
4✔
2986
}
2987

2988
// readBigSize converts the given byte slice into a uint64 and assumes that the
2989
// bytes slice is using BigSize encoding.
2990
func readBigSize(b []byte) (uint64, error) {
4✔
2991
        r := bytes.NewReader(b)
4✔
2992
        i, err := tlv.ReadVarInt(r, &[8]byte{})
4✔
2993
        if err != nil {
4✔
2994
                return 0, err
×
2995
        }
×
2996

2997
        return i, nil
4✔
2998
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc