• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13536249039

26 Feb 2025 03:42AM UTC coverage: 57.462% (-1.4%) from 58.835%
13536249039

Pull #8453

github

Roasbeef
peer: update chooseDeliveryScript to gen script if needed

In this commit, we update `chooseDeliveryScript` to generate a new
script if needed. This allows us to fold in a few other lines that
always followed this function into this expanded function.

The tests have been updated accordingly.
Pull Request #8453: [4/4] - multi: integrate new rbf coop close FSM into the existing peer flow

275 of 1318 new or added lines in 22 files covered. (20.86%)

19521 existing lines in 257 files now uncovered.

103858 of 180741 relevant lines covered (57.46%)

24750.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.16
/watchtower/wtdb/client_db.go
1
package wtdb
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "math"
8
        "net"
9
        "sync"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/lightningnetwork/lnd/fn/v2"
13
        "github.com/lightningnetwork/lnd/kvdb"
14
        "github.com/lightningnetwork/lnd/lnwire"
15
        "github.com/lightningnetwork/lnd/tlv"
16
        "github.com/lightningnetwork/lnd/watchtower/blob"
17
)
18

19
var (
20
        // cSessionKeyIndexBkt is a top-level bucket storing:
21
        //   tower-id -> reserved-session-key-index (uint32).
22
        cSessionKeyIndexBkt = []byte("client-session-key-index-bucket")
23

24
        // cChanDetailsBkt is a top-level bucket storing:
25
        //   channel-id => cChannelSummary -> encoded ClientChanSummary.
26
        //                  => cChanDBID -> db-assigned-id
27
        //                 => cChanSessions => db-session-id -> 1
28
        //                 => cChanClosedHeight -> block-height
29
        //                 => cChanMaxCommitmentHeight -> commitment-height
30
        cChanDetailsBkt = []byte("client-channel-detail-bucket")
31

32
        // cChanSessions is a sub-bucket of cChanDetailsBkt which stores:
33
        //    db-session-id -> 1
34
        cChanSessions = []byte("client-channel-sessions")
35

36
        // cChanDBID is a key used in the cChanDetailsBkt to store the
37
        // db-assigned-id of a channel.
38
        cChanDBID = []byte("client-channel-db-id")
39

40
        // cChanClosedHeight is a key used in the cChanDetailsBkt to store the
41
        // block height at which the channel's closing transaction was mined in.
42
        // If this there is no associated value for this key, then the channel
43
        // has not yet been marked as closed.
44
        cChanClosedHeight = []byte("client-channel-closed-height")
45

46
        // cChannelSummary is a key used in cChanDetailsBkt to store the encoded
47
        // body of ClientChanSummary.
48
        cChannelSummary = []byte("client-channel-summary")
49

50
        // cChanMaxCommitmentHeight is a key used in the cChanDetailsBkt used
51
        // to store the highest commitment height for this channel that the
52
        // tower has been handed.
53
        cChanMaxCommitmentHeight = []byte(
54
                "client-channel-max-commitment-height",
55
        )
56

57
        // cSessionBkt is a top-level bucket storing:
58
        //   session-id => cSessionBody -> encoded ClientSessionBody
59
        //                 => cSessionDBID -> db-assigned-id
60
        //              => cSessionCommits => seqnum -> encoded CommittedUpdate
61
        //              => cSessionAckRangeIndex => db-chan-id => start -> end
62
        //                 => cSessionRogueUpdateCount -> count
63
        cSessionBkt = []byte("client-session-bucket")
64

65
        // cSessionDBID is a key used in the cSessionBkt to store the
66
        // db-assigned-id of a session.
67
        cSessionDBID = []byte("client-session-db-id")
68

69
        // cSessionBody is a sub-bucket of cSessionBkt storing only the body of
70
        // the ClientSession.
71
        cSessionBody = []byte("client-session-body")
72

73
        // cSessionBody is a sub-bucket of cSessionBkt storing:
74
        //    seqnum -> encoded CommittedUpdate.
75
        cSessionCommits = []byte("client-session-commits")
76

77
        // cSessionAckRangeIndex is a sub-bucket of cSessionBkt storing
78
        //    chan-id => start -> end
79
        cSessionAckRangeIndex = []byte("client-session-ack-range-index")
80

81
        // cSessionRogueUpdateCount is a key in the cSessionBkt bucket storing
82
        // the number of rogue updates that were backed up using the session.
83
        // Rogue updates are updates for channels that have been closed already
84
        // at the time of the back-up.
85
        cSessionRogueUpdateCount = []byte("client-session-rogue-update-count")
86

87
        // cChanIDIndexBkt is a top-level bucket storing:
88
        //    db-assigned-id -> channel-ID
89
        cChanIDIndexBkt = []byte("client-channel-id-index")
90

91
        // cSessionIDIndexBkt is a top-level bucket storing:
92
        //    db-assigned-id -> session-id
93
        cSessionIDIndexBkt = []byte("client-session-id-index")
94

95
        // cTowerBkt is a top-level bucket storing:
96
        //    tower-id -> encoded Tower.
97
        cTowerBkt = []byte("client-tower-bucket")
98

99
        // cTowerIndexBkt is a top-level bucket storing:
100
        //    tower-pubkey -> tower-id.
101
        cTowerIndexBkt = []byte("client-tower-index-bucket")
102

103
        // cTowerToSessionIndexBkt is a top-level bucket storing:
104
        //         tower-id -> session-id -> 1
105
        cTowerToSessionIndexBkt = []byte(
106
                "client-tower-to-session-index-bucket",
107
        )
108

109
        // cClosableSessionsBkt is a top-level bucket storing:
110
        //         db-session-id -> last-channel-close-height
111
        cClosableSessionsBkt = []byte("client-closable-sessions-bucket")
112

113
        // cTaskQueue is a top-level bucket where the disk queue may store its
114
        // content.
115
        cTaskQueue = []byte("client-task-queue")
116

117
        // ErrTowerNotFound signals that the target tower was not found in the
118
        // database.
119
        ErrTowerNotFound = errors.New("tower not found")
120

121
        // ErrTowerUnackedUpdates is an error returned when we attempt to mark a
122
        // tower's sessions as inactive, but one of its sessions has unacked
123
        // updates.
124
        ErrTowerUnackedUpdates = errors.New("tower has unacked updates")
125

126
        // ErrCorruptClientSession signals that the client session's on-disk
127
        // structure deviates from what is expected.
128
        ErrCorruptClientSession = errors.New("client session corrupted")
129

130
        // ErrCorruptChanDetails signals that the clients channel detail's
131
        // on-disk structure deviates from what is expected.
132
        ErrCorruptChanDetails = errors.New("channel details corrupted")
133

134
        // ErrClientSessionAlreadyExists signals an attempt to reinsert a client
135
        // session that has already been created.
136
        ErrClientSessionAlreadyExists = errors.New(
137
                "client session already exists",
138
        )
139

140
        // ErrChannelAlreadyRegistered signals a duplicate attempt to register a
141
        // channel with the client database.
142
        ErrChannelAlreadyRegistered = errors.New("channel already registered")
143

144
        // ErrChannelNotRegistered signals a channel has not yet been registered
145
        // in the client database.
146
        ErrChannelNotRegistered = errors.New("channel not registered")
147

148
        // ErrClientSessionNotFound signals that the requested client session
149
        // was not found in the database.
150
        ErrClientSessionNotFound = errors.New("client session not found")
151

152
        // ErrUpdateAlreadyCommitted signals that the chosen sequence number has
153
        // already been committed to an update with a different breach hint.
154
        ErrUpdateAlreadyCommitted = errors.New("update already committed")
155

156
        // ErrCommitUnorderedUpdate signals the client tried to commit a
157
        // sequence number other than the next unallocated sequence number.
158
        ErrCommitUnorderedUpdate = errors.New("update seqnum not monotonic")
159

160
        // ErrCommittedUpdateNotFound signals that the tower tried to ACK a
161
        // sequence number that has not yet been allocated by the client.
162
        ErrCommittedUpdateNotFound = errors.New("committed update not found")
163

164
        // ErrUnallocatedLastApplied signals that the tower tried to provide a
165
        // LastApplied value greater than any allocated sequence number.
166
        ErrUnallocatedLastApplied = errors.New("tower echoed last appiled " +
167
                "greater than allocated seqnum")
168

169
        // ErrNoReservedKeyIndex signals that a client session could not be
170
        // created because no session key index was reserved.
171
        ErrNoReservedKeyIndex = errors.New("key index not reserved")
172

173
        // ErrIncorrectKeyIndex signals that the client session could not be
174
        // created because session key index differs from the reserved key
175
        // index.
176
        ErrIncorrectKeyIndex = errors.New("incorrect key index")
177

178
        // ErrLastTowerAddr is an error returned when the last address of a
179
        // watchtower is attempted to be removed.
180
        ErrLastTowerAddr = errors.New("cannot remove last tower address")
181

182
        // ErrNoRangeIndexFound is returned when there is no persisted
183
        // range-index found for the given session ID to channel ID pair.
184
        ErrNoRangeIndexFound = errors.New("no range index found for the " +
185
                "given session-channel pair")
186

187
        // ErrSessionFailedFilterFn indicates that a particular session did
188
        // not pass the filter func provided by the caller.
189
        ErrSessionFailedFilterFn = errors.New("session failed filter func")
190

191
        // ErrSessionNotClosable is returned when a session is not found in the
192
        // closable list.
193
        ErrSessionNotClosable = errors.New("session is not closable")
194

195
        // errSessionHasOpenChannels is an error used to indicate that a
196
        // session has updates for channels that are still open.
197
        errSessionHasOpenChannels = errors.New("session has open channels")
198

199
        // ErrSessionHasUnackedUpdates is an error used to indicate that a
200
        // session has un-acked updates.
201
        ErrSessionHasUnackedUpdates = errors.New("session has un-acked updates")
202

203
        // errChannelHasMoreSessions is an error used to indicate that a channel
204
        // has updates in other non-closed sessions.
205
        errChannelHasMoreSessions = errors.New("channel has updates in " +
206
                "other sessions")
207
)
208

209
// NewBoltBackendCreator returns a function that creates a new bbolt backend for
210
// the watchtower database.
211
func NewBoltBackendCreator(active bool, dbPath,
212
        dbFileName string) func(boltCfg *kvdb.BoltConfig) (kvdb.Backend,
213
        error) {
106✔
214

106✔
215
        // If the watchtower client isn't active, we return a function that
106✔
216
        // always returns a nil DB to make sure we don't create empty database
106✔
217
        // files.
106✔
218
        if !active {
106✔
219
                return func(_ *kvdb.BoltConfig) (kvdb.Backend, error) {
×
220
                        return nil, nil
×
221
                }
×
222
        }
223

224
        return func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {
212✔
225
                cfg := &kvdb.BoltBackendConfig{
106✔
226
                        DBPath:            dbPath,
106✔
227
                        DBFileName:        dbFileName,
106✔
228
                        NoFreelistSync:    boltCfg.NoFreelistSync,
106✔
229
                        AutoCompact:       boltCfg.AutoCompact,
106✔
230
                        AutoCompactMinAge: boltCfg.AutoCompactMinAge,
106✔
231
                        DBTimeout:         boltCfg.DBTimeout,
106✔
232
                }
106✔
233

106✔
234
                db, err := kvdb.GetBoltBackend(cfg)
106✔
235
                if err != nil {
106✔
236
                        return nil, fmt.Errorf("could not open boltdb: %w", err)
×
237
                }
×
238

239
                return db, nil
106✔
240
        }
241
}
242

243
// ClientDB is single database providing a persistent storage engine for the
244
// wtclient.
245
type ClientDB struct {
246
        db kvdb.Backend
247

248
        // ackedRangeIndex is a map from session ID to channel ID to a
249
        // RangeIndex which represents the backups that have been acked for that
250
        // channel using that session.
251
        ackedRangeIndex   map[SessionID]map[lnwire.ChannelID]*RangeIndex
252
        ackedRangeIndexMu sync.Mutex
253
}
254

255
// OpenClientDB opens the client database given the path to the database's
256
// directory. If no such database exists, this method will initialize a fresh
257
// one using the latest version number and bucket structure. If a database
258
// exists but has a lower version number than the current version, any necessary
259
// migrations will be applied before returning. Any attempt to open a database
260
// with a version number higher that the latest version will fail to prevent
261
// accidental reversion.
262
func OpenClientDB(db kvdb.Backend) (*ClientDB, error) {
67✔
263
        firstInit, err := isFirstInit(db)
67✔
264
        if err != nil {
67✔
265
                return nil, err
×
266
        }
×
267

268
        clientDB := &ClientDB{
67✔
269
                db: db,
67✔
270
                ackedRangeIndex: make(
67✔
271
                        map[SessionID]map[lnwire.ChannelID]*RangeIndex,
67✔
272
                ),
67✔
273
        }
67✔
274

67✔
275
        err = initOrSyncVersions(clientDB, firstInit, clientDBVersions)
67✔
276
        if err != nil {
67✔
277
                db.Close()
×
278
                return nil, err
×
279
        }
×
280

281
        // Now that the database version fully consistent with our latest known
282
        // version, ensure that all top-level buckets known to this version are
283
        // initialized. This allows us to assume their presence throughout all
284
        // operations. If an known top-level bucket is expected to exist but is
285
        // missing, this will trigger a ErrUninitializedDB error.
286
        err = kvdb.Update(clientDB.db, initClientDBBuckets, func() {})
134✔
287
        if err != nil {
67✔
288
                db.Close()
×
289
                return nil, err
×
290
        }
×
291

292
        return clientDB, nil
67✔
293
}
294

295
// initClientDBBuckets creates all top-level buckets required to handle database
296
// operations required by the latest version.
297
func initClientDBBuckets(tx kvdb.RwTx) error {
67✔
298
        buckets := [][]byte{
67✔
299
                cSessionKeyIndexBkt,
67✔
300
                cChanDetailsBkt,
67✔
301
                cSessionBkt,
67✔
302
                cTowerBkt,
67✔
303
                cTowerIndexBkt,
67✔
304
                cTowerToSessionIndexBkt,
67✔
305
                cChanIDIndexBkt,
67✔
306
                cSessionIDIndexBkt,
67✔
307
                cClosableSessionsBkt,
67✔
308
        }
67✔
309

67✔
310
        for _, bucket := range buckets {
670✔
311
                _, err := tx.CreateTopLevelBucket(bucket)
603✔
312
                if err != nil {
603✔
313
                        return err
×
314
                }
×
315
        }
316

317
        return nil
67✔
318
}
319

320
// bdb returns the backing bbolt.DB instance.
321
//
322
// NOTE: Part of the versionedDB interface.
323
func (c *ClientDB) bdb() kvdb.Backend {
55✔
324
        return c.db
55✔
325
}
55✔
326

327
// Version returns the database's current version number.
328
//
329
// NOTE: Part of the versionedDB interface.
330
func (c *ClientDB) Version() (uint32, error) {
12✔
331
        var version uint32
12✔
332
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
24✔
333
                var err error
12✔
334
                version, err = getDBVersion(tx)
12✔
335
                return err
12✔
336
        }, func() {
24✔
337
                version = 0
12✔
338
        })
12✔
339
        if err != nil {
12✔
340
                return 0, err
×
341
        }
×
342

343
        return version, nil
12✔
344
}
345

346
// Close closes the underlying database.
347
func (c *ClientDB) Close() error {
66✔
348
        return c.db.Close()
66✔
349
}
66✔
350

351
// CreateTower initialize an address record used to communicate with a
352
// watchtower. Each Tower is assigned a unique ID, that is used to amortize
353
// storage costs of the public key when used by multiple sessions. If the tower
354
// already exists, the address is appended to the list of all addresses used to
355
// that tower previously and its corresponding sessions are marked as active.
356
func (c *ClientDB) CreateTower(lnAddr *lnwire.NetAddress) (*Tower, error) {
83✔
357
        var towerPubKey [33]byte
83✔
358
        copy(towerPubKey[:], lnAddr.IdentityKey.SerializeCompressed())
83✔
359

83✔
360
        var tower *Tower
83✔
361
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
166✔
362
                towerIndex := tx.ReadWriteBucket(cTowerIndexBkt)
83✔
363
                if towerIndex == nil {
83✔
364
                        return ErrUninitializedDB
×
365
                }
×
366

367
                towers := tx.ReadWriteBucket(cTowerBkt)
83✔
368
                if towers == nil {
83✔
369
                        return ErrUninitializedDB
×
370
                }
×
371

372
                towerToSessionIndex := tx.ReadWriteBucket(
83✔
373
                        cTowerToSessionIndexBkt,
83✔
374
                )
83✔
375
                if towerToSessionIndex == nil {
83✔
376
                        return ErrUninitializedDB
×
377
                }
×
378

379
                // Check if the tower index already knows of this pubkey.
380
                towerIDBytes := towerIndex.Get(towerPubKey[:])
83✔
381
                if len(towerIDBytes) == 8 {
104✔
382
                        // The tower already exists, deserialize the existing
21✔
383
                        // record.
21✔
384
                        var err error
21✔
385
                        tower, err = getTower(towers, towerIDBytes)
21✔
386
                        if err != nil {
21✔
387
                                return err
×
388
                        }
×
389

390
                        // Set its status to active.
391
                        tower.Status = TowerStatusActive
21✔
392

21✔
393
                        // Add the new address to the existing tower. If the
21✔
394
                        // address is a duplicate, this will result in no
21✔
395
                        // change.
21✔
396
                        tower.AddAddress(lnAddr.Address)
21✔
397
                } else {
62✔
398
                        // No such tower exists, create a new tower id for our
62✔
399
                        // new tower. The error is unhandled since NextSequence
62✔
400
                        // never fails in an Update.
62✔
401
                        towerID, _ := towerIndex.NextSequence()
62✔
402

62✔
403
                        tower = &Tower{
62✔
404
                                ID:          TowerID(towerID),
62✔
405
                                IdentityKey: lnAddr.IdentityKey,
62✔
406
                                Addresses:   []net.Addr{lnAddr.Address},
62✔
407
                                Status:      TowerStatusActive,
62✔
408
                        }
62✔
409

62✔
410
                        towerIDBytes = tower.ID.Bytes()
62✔
411

62✔
412
                        // Since this tower is new, record the mapping from
62✔
413
                        // tower pubkey to tower id in the tower index.
62✔
414
                        err := towerIndex.Put(towerPubKey[:], towerIDBytes)
62✔
415
                        if err != nil {
62✔
416
                                return err
×
417
                        }
×
418

419
                        // Create a new bucket for this tower in the
420
                        // tower-to-sessions index.
421
                        _, err = towerToSessionIndex.CreateBucket(towerIDBytes)
62✔
422
                        if err != nil {
62✔
423
                                return err
×
424
                        }
×
425
                }
426

427
                // Store the new or updated tower under its tower id.
428
                return putTower(towers, tower)
83✔
429
        }, func() {
83✔
430
                tower = nil
83✔
431
        })
83✔
432
        if err != nil {
83✔
433
                return nil, err
×
434
        }
×
435

436
        return tower, nil
83✔
437
}
438

439
// RemoveTower modifies a tower's record within the database. If an address is
440
// provided, then _only_ the address record should be removed from the tower's
441
// persisted state. Otherwise, we'll attempt to mark the tower as inactive. If
442
// any of its sessions has unacked updates, then ErrTowerUnackedUpdates is
443
// returned. If the tower doesn't have any sessions at all, it'll be completely
444
// removed from the database.
445
//
446
// NOTE: An error is not returned if the tower doesn't exist.
447
func (c *ClientDB) RemoveTower(pubKey *btcec.PublicKey, addr net.Addr) error {
24✔
448
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
48✔
449
                towers := tx.ReadWriteBucket(cTowerBkt)
24✔
450
                if towers == nil {
24✔
451
                        return ErrUninitializedDB
×
452
                }
×
453

454
                towerIndex := tx.ReadWriteBucket(cTowerIndexBkt)
24✔
455
                if towerIndex == nil {
24✔
456
                        return ErrUninitializedDB
×
457
                }
×
458

459
                towersToSessionsIndex := tx.ReadWriteBucket(
24✔
460
                        cTowerToSessionIndexBkt,
24✔
461
                )
24✔
462
                if towersToSessionsIndex == nil {
24✔
463
                        return ErrUninitializedDB
×
464
                }
×
465

466
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
24✔
467
                if chanIDIndexBkt == nil {
24✔
468
                        return ErrUninitializedDB
×
469
                }
×
470

471
                // Don't return an error if the watchtower doesn't exist to act
472
                // as a NOP.
473
                pubKeyBytes := pubKey.SerializeCompressed()
24✔
474
                towerIDBytes := towerIndex.Get(pubKeyBytes)
24✔
475
                if towerIDBytes == nil {
26✔
476
                        return nil
2✔
477
                }
2✔
478

479
                tower, err := getTower(towers, towerIDBytes)
22✔
480
                if err != nil {
22✔
481
                        return err
×
482
                }
×
483

484
                // If an address is provided, then we should _only_ remove the
485
                // address record from the database.
486
                if addr != nil {
28✔
487
                        // Towers should always have at least one address saved.
6✔
488
                        tower.RemoveAddress(addr)
6✔
489
                        if len(tower.Addresses) == 0 {
8✔
490
                                return ErrLastTowerAddr
2✔
491
                        }
2✔
492

493
                        return putTower(towers, tower)
4✔
494
                }
495

496
                // Otherwise, we should attempt to mark the tower's sessions as
497
                // inactive.
498
                sessions := tx.ReadWriteBucket(cSessionBkt)
16✔
499
                if sessions == nil {
16✔
500
                        return ErrUninitializedDB
×
501
                }
×
502
                towerID := TowerIDFromBytes(towerIDBytes)
16✔
503

16✔
504
                committedUpdateCount := make(map[SessionID]uint16)
16✔
505
                perCommittedUpdate := func(s *ClientSession,
16✔
506
                        _ *CommittedUpdate) {
18✔
507

2✔
508
                        committedUpdateCount[s.ID]++
2✔
509
                }
2✔
510

511
                towerSessions, err := c.listTowerSessions(
16✔
512
                        towerID, sessions, chanIDIndexBkt,
16✔
513
                        towersToSessionsIndex,
16✔
514
                        WithPerCommittedUpdate(perCommittedUpdate),
16✔
515
                )
16✔
516
                if err != nil {
16✔
517
                        return err
×
518
                }
×
519

520
                // If it doesn't have any, we can completely remove it from the
521
                // database.
522
                if len(towerSessions) == 0 {
20✔
523
                        if err := towerIndex.Delete(pubKeyBytes); err != nil {
4✔
524
                                return err
×
525
                        }
×
526

527
                        if err := towers.Delete(towerIDBytes); err != nil {
4✔
528
                                return err
×
529
                        }
×
530

531
                        return towersToSessionsIndex.DeleteNestedBucket(
4✔
532
                                towerIDBytes,
4✔
533
                        )
4✔
534
                }
535

536
                // Otherwise, we mark the tower as inactive.
537
                tower.Status = TowerStatusInactive
12✔
538
                err = putTower(towers, tower)
12✔
539
                if err != nil {
12✔
540
                        return err
×
541
                }
×
542

543
                // We'll do a check to ensure that the tower's sessions don't
544
                // have any pending back-ups.
545
                for _, session := range towerSessions {
27✔
546
                        if committedUpdateCount[session.ID] > 0 {
17✔
547
                                return ErrTowerUnackedUpdates
2✔
548
                        }
2✔
549
                }
550

551
                return nil
10✔
552
        }, func() {})
24✔
553
}
554

555
// DeactivateTower sets the given tower's status to inactive. This means that
556
// this tower's sessions won't be loaded and used for backups. CreateTower can
557
// be used to reactivate the tower again.
558
func (c *ClientDB) DeactivateTower(pubKey *btcec.PublicKey) error {
4✔
559
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
560
                towers := tx.ReadWriteBucket(cTowerBkt)
4✔
561
                if towers == nil {
4✔
562
                        return ErrUninitializedDB
×
563
                }
×
564

565
                towerIndex := tx.ReadWriteBucket(cTowerIndexBkt)
4✔
566
                if towerIndex == nil {
4✔
567
                        return ErrUninitializedDB
×
568
                }
×
569

570
                towersToSessionsIndex := tx.ReadWriteBucket(
4✔
571
                        cTowerToSessionIndexBkt,
4✔
572
                )
4✔
573
                if towersToSessionsIndex == nil {
4✔
574
                        return ErrUninitializedDB
×
575
                }
×
576

577
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
4✔
578
                if chanIDIndexBkt == nil {
4✔
579
                        return ErrUninitializedDB
×
580
                }
×
581

582
                pubKeyBytes := pubKey.SerializeCompressed()
4✔
583
                towerIDBytes := towerIndex.Get(pubKeyBytes)
4✔
584
                if towerIDBytes == nil {
4✔
585
                        return ErrTowerNotFound
×
586
                }
×
587

588
                tower, err := getTower(towers, towerIDBytes)
4✔
589
                if err != nil {
4✔
590
                        return err
×
591
                }
×
592

593
                // If the tower already has the desired status, then we can exit
594
                // here.
595
                if tower.Status == TowerStatusInactive {
4✔
596
                        return nil
×
597
                }
×
598

599
                // Otherwise, we update the status and re-store the tower.
600
                tower.Status = TowerStatusInactive
4✔
601

4✔
602
                return putTower(towers, tower)
4✔
603
        }, func() {})
4✔
604
}
605

606
// LoadTowerByID retrieves a tower by its tower ID.
607
func (c *ClientDB) LoadTowerByID(towerID TowerID) (*Tower, error) {
17✔
608
        var tower *Tower
17✔
609
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
34✔
610
                towers := tx.ReadBucket(cTowerBkt)
17✔
611
                if towers == nil {
17✔
612
                        return ErrUninitializedDB
×
613
                }
×
614

615
                var err error
17✔
616
                tower, err = getTower(towers, towerID.Bytes())
17✔
617
                return err
17✔
618
        }, func() {
17✔
619
                tower = nil
17✔
620
        })
17✔
621
        if err != nil {
19✔
622
                return nil, err
2✔
623
        }
2✔
624

625
        return tower, nil
15✔
626
}
627

628
// LoadTower retrieves a tower by its public key.
629
func (c *ClientDB) LoadTower(pubKey *btcec.PublicKey) (*Tower, error) {
32✔
630
        var tower *Tower
32✔
631
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
64✔
632
                towers := tx.ReadBucket(cTowerBkt)
32✔
633
                if towers == nil {
32✔
634
                        return ErrUninitializedDB
×
635
                }
×
636
                towerIndex := tx.ReadBucket(cTowerIndexBkt)
32✔
637
                if towerIndex == nil {
32✔
638
                        return ErrUninitializedDB
×
639
                }
×
640

641
                towerIDBytes := towerIndex.Get(pubKey.SerializeCompressed())
32✔
642
                if towerIDBytes == nil {
36✔
643
                        return ErrTowerNotFound
4✔
644
                }
4✔
645

646
                var err error
28✔
647
                tower, err = getTower(towers, towerIDBytes)
28✔
648
                return err
28✔
649
        }, func() {
32✔
650
                tower = nil
32✔
651
        })
32✔
652
        if err != nil {
36✔
653
                return nil, err
4✔
654
        }
4✔
655

656
        return tower, nil
28✔
657
}
658

659
// TowerFilterFn is the signature of a call-back function that can be used to
660
// skip certain towers in the ListTowers method.
661
type TowerFilterFn func(*Tower) bool
662

663
// ListTowers retrieves the list of towers available within the database that
664
// have a status matching the given status. The filter function may be set in
665
// order to filter out the towers to be returned.
666
func (c *ClientDB) ListTowers(filter TowerFilterFn) ([]*Tower, error) {
44✔
667
        var towers []*Tower
44✔
668
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
88✔
669
                towerBucket := tx.ReadBucket(cTowerBkt)
44✔
670
                if towerBucket == nil {
44✔
671
                        return ErrUninitializedDB
×
672
                }
×
673

674
                return towerBucket.ForEach(func(towerIDBytes, _ []byte) error {
61✔
675
                        tower, err := getTower(towerBucket, towerIDBytes)
17✔
676
                        if err != nil {
17✔
677
                                return err
×
678
                        }
×
679

680
                        if filter != nil && !filter(tower) {
17✔
681
                                return nil
×
682
                        }
×
683

684
                        towers = append(towers, tower)
17✔
685

17✔
686
                        return nil
17✔
687
                })
688
        }, func() {
44✔
689
                towers = nil
44✔
690
        })
44✔
691
        if err != nil {
44✔
692
                return nil, err
×
693
        }
×
694

695
        return towers, nil
44✔
696
}
697

698
// NextSessionKeyIndex reserves a new session key derivation index for a
699
// particular tower id. The index is reserved for that tower until
700
// CreateClientSession is invoked for that tower and index, at which point a new
701
// index for that tower can be reserved. Multiple calls to this method before
702
// CreateClientSession is invoked should return the same index unless forceNext
703
// is true.
704
func (c *ClientDB) NextSessionKeyIndex(towerID TowerID,
705
        blobType blob.Type, forceNext bool) (uint32, error) {
149✔
706

149✔
707
        var index uint32
149✔
708
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
298✔
709
                keyIndex := tx.ReadWriteBucket(cSessionKeyIndexBkt)
149✔
710
                if keyIndex == nil {
149✔
711
                        return ErrUninitializedDB
×
712
                }
×
713

714
                var err error
149✔
715
                if !forceNext {
295✔
716
                        // Check the session key index to see if a key has
146✔
717
                        // already been reserved for this tower. If so, we'll
146✔
718
                        // deserialize and return the index directly.
146✔
719
                        index, err = getSessionKeyIndex(
146✔
720
                                keyIndex, towerID, blobType,
146✔
721
                        )
146✔
722
                        if err == nil {
187✔
723
                                return nil
41✔
724
                        }
41✔
725
                }
726

727
                // By default, we use the next available bucket sequence as the
728
                // key index. But if forceNext is true, then it is assumed that
729
                // some data loss occurred and so the sequence is incremented a
730
                // by a jump of 1000 so that we can arrive at a brand new key
731
                // index quicker.
732
                currentSequence := keyIndex.Sequence()
108✔
733
                nextIndex := currentSequence + 1
108✔
734
                if forceNext {
111✔
735
                        nextIndex = currentSequence + 1000
3✔
736
                }
3✔
737

738
                if err = keyIndex.SetSequence(nextIndex); err != nil {
108✔
739
                        return fmt.Errorf("could not set next bucket "+
×
740
                                "sequence: %w", err)
×
741
                }
×
742

743
                // As a sanity check, assert that the index is still in the
744
                // valid range of unhardened pubkeys. In the future, we should
745
                // move to only using hardened keys, and this will prevent any
746
                // overlap from occurring until then. This also prevents us from
747
                // overflowing uint32s.
748
                if nextIndex > math.MaxInt32 {
108✔
749
                        return fmt.Errorf("exhausted session key indexes")
×
750
                }
×
751

752
                // Create the key that will used to be store the reserved index.
753
                keyBytes := createSessionKeyIndexKey(towerID, blobType)
108✔
754

108✔
755
                index = uint32(nextIndex)
108✔
756

108✔
757
                var indexBuf [4]byte
108✔
758
                byteOrder.PutUint32(indexBuf[:], index)
108✔
759

108✔
760
                // Record the reserved session key index under this tower's id.
108✔
761
                return keyIndex.Put(keyBytes, indexBuf[:])
108✔
762
        }, func() {
149✔
763
                index = 0
149✔
764
        })
149✔
765
        if err != nil {
149✔
766
                return 0, err
×
767
        }
×
768

769
        return index, nil
149✔
770
}
771

772
// CreateClientSession records a newly negotiated client session in the set of
773
// active sessions. The session can be identified by its SessionID.
774
func (c *ClientDB) CreateClientSession(session *ClientSession) error {
108✔
775
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
216✔
776
                keyIndexes := tx.ReadWriteBucket(cSessionKeyIndexBkt)
108✔
777
                if keyIndexes == nil {
108✔
778
                        return ErrUninitializedDB
×
779
                }
×
780

781
                sessions := tx.ReadWriteBucket(cSessionBkt)
108✔
782
                if sessions == nil {
108✔
783
                        return ErrUninitializedDB
×
784
                }
×
785

786
                towers := tx.ReadBucket(cTowerBkt)
108✔
787
                if towers == nil {
108✔
788
                        return ErrUninitializedDB
×
789
                }
×
790

791
                towerToSessionIndex := tx.ReadWriteBucket(
108✔
792
                        cTowerToSessionIndexBkt,
108✔
793
                )
108✔
794
                if towerToSessionIndex == nil {
108✔
795
                        return ErrUninitializedDB
×
796
                }
×
797

798
                // Check that  client session with this session id doesn't
799
                // already exist.
800
                existingSessionBytes := sessions.NestedReadWriteBucket(
108✔
801
                        session.ID[:],
108✔
802
                )
108✔
803
                if existingSessionBytes != nil {
110✔
804
                        return ErrClientSessionAlreadyExists
2✔
805
                }
2✔
806

807
                // Ensure that a tower with the given ID actually exists in the
808
                // DB.
809
                towerID := session.TowerID
106✔
810
                if _, err := getTower(towers, towerID.Bytes()); err != nil {
106✔
811
                        return err
×
812
                }
×
813

814
                blobType := session.Policy.BlobType
106✔
815

106✔
816
                // Check that this tower has a reserved key index.
106✔
817
                index, err := getSessionKeyIndex(keyIndexes, towerID, blobType)
106✔
818
                if err != nil {
108✔
819
                        return err
2✔
820
                }
2✔
821

822
                // Assert that the key index of the inserted session matches the
823
                // reserved session key index.
824
                if index != session.KeyIndex {
106✔
825
                        return ErrIncorrectKeyIndex
2✔
826
                }
2✔
827

828
                // Remove the key index reservation. For altruist commit
829
                // sessions, we'll also purge under the old legacy key format.
830
                key := createSessionKeyIndexKey(towerID, blobType)
102✔
831
                err = keyIndexes.Delete(key)
102✔
832
                if err != nil {
102✔
833
                        return err
×
834
                }
×
835
                if blobType == blob.TypeAltruistCommit {
128✔
836
                        err = keyIndexes.Delete(towerID.Bytes())
26✔
837
                        if err != nil {
26✔
838
                                return err
×
839
                        }
×
840
                }
841

842
                // Get the session-ID index bucket.
843
                dbIDIndex := tx.ReadWriteBucket(cSessionIDIndexBkt)
102✔
844
                if dbIDIndex == nil {
102✔
845
                        return ErrUninitializedDB
×
846
                }
×
847

848
                // Get a new, unique, ID for this session from the session-ID
849
                // index bucket.
850
                nextSeq, err := dbIDIndex.NextSequence()
102✔
851
                if err != nil {
102✔
852
                        return err
×
853
                }
×
854

855
                // Add the new entry to the dbID-to-SessionID index.
856
                newIndex, err := writeBigSize(nextSeq)
102✔
857
                if err != nil {
102✔
858
                        return err
×
859
                }
×
860

861
                err = dbIDIndex.Put(newIndex, session.ID[:])
102✔
862
                if err != nil {
102✔
863
                        return err
×
864
                }
×
865

866
                // Also add the db-assigned-id to the session bucket under the
867
                // cSessionDBID key.
868
                sessionBkt, err := sessions.CreateBucket(session.ID[:])
102✔
869
                if err != nil {
102✔
870
                        return err
×
871
                }
×
872

873
                err = sessionBkt.Put(cSessionDBID, newIndex)
102✔
874
                if err != nil {
102✔
875
                        return err
×
876
                }
×
877

878
                // TODO(elle): migrate the towerID-to-SessionID to use the
879
                // new db-assigned sessionID's rather.
880

881
                // Add the new entry to the towerID-to-SessionID index.
882
                towerSessions := towerToSessionIndex.NestedReadWriteBucket(
102✔
883
                        towerID.Bytes(),
102✔
884
                )
102✔
885
                if towerSessions == nil {
102✔
886
                        return ErrTowerNotFound
×
887
                }
×
888

889
                err = towerSessions.Put(session.ID[:], []byte{1})
102✔
890
                if err != nil {
102✔
891
                        return err
×
892
                }
×
893

894
                // Finally, write the client session's body in the sessions
895
                // bucket.
896
                return putClientSessionBody(sessionBkt, session)
102✔
897
        }, func() {})
108✔
898
}
899

900
// readRangeIndex reads a persisted RangeIndex from the passed bucket and into
901
// a new in-memory RangeIndex.
902
func readRangeIndex(rangesBkt kvdb.RBucket) (*RangeIndex, error) {
92✔
903
        ranges := make(map[uint64]uint64)
92✔
904
        err := rangesBkt.ForEach(func(k, v []byte) error {
100✔
905
                start, err := readBigSize(k)
8✔
906
                if err != nil {
8✔
907
                        return err
×
908
                }
×
909

910
                end, err := readBigSize(v)
8✔
911
                if err != nil {
8✔
912
                        return err
×
913
                }
×
914

915
                ranges[start] = end
8✔
916

8✔
917
                return nil
8✔
918
        })
919
        if err != nil {
92✔
920
                return nil, err
×
921
        }
×
922

923
        return NewRangeIndex(ranges, WithSerializeUint64Fn(writeBigSize))
92✔
924
}
925

926
// getRangeIndex checks the ClientDB's in-memory range index map to see if it
927
// has an entry for the given session and channel ID. If it does, this is
928
// returned, otherwise the range index is loaded from the DB. An optional db
929
// transaction parameter may be provided. If one is provided then it will be
930
// used to query the DB for the range index, otherwise, a new transaction will
931
// be created and used.
932
func (c *ClientDB) getRangeIndex(tx kvdb.RTx, sID SessionID,
933
        chanID lnwire.ChannelID) (*RangeIndex, error) {
512✔
934

512✔
935
        c.ackedRangeIndexMu.Lock()
512✔
936
        defer c.ackedRangeIndexMu.Unlock()
512✔
937

512✔
938
        if _, ok := c.ackedRangeIndex[sID]; !ok {
587✔
939
                c.ackedRangeIndex[sID] = make(map[lnwire.ChannelID]*RangeIndex)
75✔
940
        }
75✔
941

942
        // If the in-memory range-index map already includes an entry for this
943
        // session ID and channel ID pair, then return it.
944
        if index, ok := c.ackedRangeIndex[sID][chanID]; ok {
940✔
945
                return index, nil
428✔
946
        }
428✔
947

948
        // readRangeIndexFromBkt is a helper that is used to read in a
949
        // RangeIndex structure from the passed in bucket and store it in the
950
        // ackedRangeIndex map.
951
        readRangeIndexFromBkt := func(rangesBkt kvdb.RBucket) (*RangeIndex,
84✔
952
                error) {
168✔
953

84✔
954
                // Create a new in-memory RangeIndex by reading in ranges from
84✔
955
                // the DB.
84✔
956
                rangeIndex, err := readRangeIndex(rangesBkt)
84✔
957
                if err != nil {
84✔
958
                        return nil, err
×
959
                }
×
960

961
                c.ackedRangeIndex[sID][chanID] = rangeIndex
84✔
962

84✔
963
                return rangeIndex, nil
84✔
964
        }
965

966
        // If a DB transaction is provided then use it to fetch the ranges
967
        // bucket from the DB.
968
        if tx != nil {
168✔
969
                rangesBkt, err := getRangesReadBucket(tx, sID, chanID)
84✔
970
                if err != nil {
84✔
971
                        return nil, err
×
972
                }
×
973

974
                return readRangeIndexFromBkt(rangesBkt)
84✔
975
        }
976

977
        // No DB transaction was provided. So create and use a new one.
978
        var index *RangeIndex
×
979
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
×
980
                rangesBkt, err := getRangesReadBucket(tx, sID, chanID)
×
981
                if err != nil {
×
982
                        return err
×
983
                }
×
984

985
                index, err = readRangeIndexFromBkt(rangesBkt)
×
986

×
987
                return err
×
988
        }, func() {})
×
989
        if err != nil {
×
990
                return nil, err
×
991
        }
×
992

993
        return index, nil
×
994
}
995

996
// getRangesReadBucket gets the range index bucket where the range index for the
997
// given session-channel pair is stored. If any sub-buckets along the way do not
998
// exist, then an error is returned. If the sub-buckets should be created
999
// instead, then use getRangesWriteBucket.
1000
func getRangesReadBucket(tx kvdb.RTx, sID SessionID, chanID lnwire.ChannelID) (
1001
        kvdb.RBucket, error) {
84✔
1002

84✔
1003
        sessions := tx.ReadBucket(cSessionBkt)
84✔
1004
        if sessions == nil {
84✔
1005
                return nil, ErrUninitializedDB
×
1006
        }
×
1007

1008
        chanDetailsBkt := tx.ReadBucket(cChanDetailsBkt)
84✔
1009
        if chanDetailsBkt == nil {
84✔
1010
                return nil, ErrUninitializedDB
×
1011
        }
×
1012

1013
        sessionBkt := sessions.NestedReadBucket(sID[:])
84✔
1014
        if sessionsBkt == nil {
84✔
1015
                return nil, ErrNoRangeIndexFound
×
1016
        }
×
1017

1018
        // Get the DB representation of the channel-ID.
1019
        _, dbChanIDBytes, err := getDBChanID(chanDetailsBkt, chanID)
84✔
1020
        if err != nil {
84✔
1021
                return nil, err
×
1022
        }
×
1023

1024
        sessionAckRanges := sessionBkt.NestedReadBucket(cSessionAckRangeIndex)
84✔
1025
        if sessionAckRanges == nil {
84✔
1026
                return nil, ErrNoRangeIndexFound
×
1027
        }
×
1028

1029
        return sessionAckRanges.NestedReadBucket(dbChanIDBytes), nil
84✔
1030
}
1031

1032
// getRangesWriteBucket gets the range index bucket where the range index for
1033
// the given session-channel pair is stored. If any sub-buckets along the way do
1034
// not exist, then they are created.
1035
func getRangesWriteBucket(sessionBkt kvdb.RwBucket, dbChanIDBytes []byte) (
1036
        kvdb.RwBucket, error) {
498✔
1037

498✔
1038
        sessionAckRanges, err := sessionBkt.CreateBucketIfNotExists(
498✔
1039
                cSessionAckRangeIndex,
498✔
1040
        )
498✔
1041
        if err != nil {
498✔
1042
                return nil, err
×
1043
        }
×
1044

1045
        return sessionAckRanges.CreateBucketIfNotExists(dbChanIDBytes)
498✔
1046
}
1047

1048
// createSessionKeyIndexKey returns the identifier used in the
1049
// session-key-index index, created as tower-id||blob-type.
1050
//
1051
// NOTE: The original serialization only used tower-id, which prevents
1052
// concurrent client types from reserving sessions with the same tower.
1053
func createSessionKeyIndexKey(towerID TowerID, blobType blob.Type) []byte {
462✔
1054
        towerIDBytes := towerID.Bytes()
462✔
1055

462✔
1056
        // Session key indexes are stored under as tower-id||blob-type.
462✔
1057
        var keyBytes [6]byte
462✔
1058
        copy(keyBytes[:4], towerIDBytes)
462✔
1059
        byteOrder.PutUint16(keyBytes[4:], uint16(blobType))
462✔
1060

462✔
1061
        return keyBytes[:]
462✔
1062
}
462✔
1063

1064
// getSessionKeyIndex is a helper method.
1065
func getSessionKeyIndex(keyIndexes kvdb.RwBucket, towerID TowerID,
1066
        blobType blob.Type) (uint32, error) {
252✔
1067

252✔
1068
        // Session key indexes are store under as tower-id||blob-type. The
252✔
1069
        // original serialization only used tower-id, which prevents concurrent
252✔
1070
        // client types from reserving sessions with the same tower.
252✔
1071
        keyBytes := createSessionKeyIndexKey(towerID, blobType)
252✔
1072

252✔
1073
        // Retrieve the index using the key bytes. If the key wasn't found, we
252✔
1074
        // will fall back to the legacy format that only uses the tower id, but
252✔
1075
        // _only_ if the blob type is for altruist commit sessions since that
252✔
1076
        // was the only operational session type prior to changing the key
252✔
1077
        // format.
252✔
1078
        keyIndexBytes := keyIndexes.Get(keyBytes)
252✔
1079
        if keyIndexBytes == nil && blobType == blob.TypeAltruistCommit {
278✔
1080
                keyIndexBytes = keyIndexes.Get(towerID.Bytes())
26✔
1081
        }
26✔
1082

1083
        // All session key indexes should be serialized uint32's. If no key
1084
        // index was found, the length of keyIndexBytes will be 0.
1085
        if len(keyIndexBytes) != 4 {
359✔
1086
                return 0, ErrNoReservedKeyIndex
107✔
1087
        }
107✔
1088

1089
        return byteOrder.Uint32(keyIndexBytes), nil
145✔
1090
}
1091

1092
// GetClientSession loads the ClientSession with the given ID from the DB.
1093
func (c *ClientDB) GetClientSession(id SessionID,
1094
        opts ...ClientSessionListOption) (*ClientSession, error) {
9✔
1095

9✔
1096
        var sess *ClientSession
9✔
1097
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
18✔
1098
                sessionsBkt := tx.ReadBucket(cSessionBkt)
9✔
1099
                if sessionsBkt == nil {
9✔
1100
                        return ErrUninitializedDB
×
1101
                }
×
1102

1103
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
9✔
1104
                if chanIDIndexBkt == nil {
9✔
1105
                        return ErrUninitializedDB
×
1106
                }
×
1107

1108
                session, err := c.getClientSession(
9✔
1109
                        sessionsBkt, chanIDIndexBkt, id[:], opts...,
9✔
1110
                )
9✔
1111
                if err != nil {
9✔
1112
                        return err
×
1113
                }
×
1114

1115
                sess = session
9✔
1116

9✔
1117
                return nil
9✔
1118
        }, func() {})
9✔
1119

1120
        return sess, err
9✔
1121
}
1122

1123
// ListClientSessions returns the set of all client sessions known to the db. An
1124
// optional tower ID can be used to filter out any client sessions in the
1125
// response that do not correspond to this tower.
1126
func (c *ClientDB) ListClientSessions(id *TowerID,
1127
        opts ...ClientSessionListOption) (map[SessionID]*ClientSession, error) {
134✔
1128

134✔
1129
        var clientSessions map[SessionID]*ClientSession
134✔
1130
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
268✔
1131
                sessions := tx.ReadBucket(cSessionBkt)
134✔
1132
                if sessions == nil {
134✔
1133
                        return ErrUninitializedDB
×
1134
                }
×
1135

1136
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
134✔
1137
                if chanIDIndexBkt == nil {
134✔
1138
                        return ErrUninitializedDB
×
1139
                }
×
1140

1141
                // If no tower ID is specified, then fetch all the sessions
1142
                // known to the db.
1143
                var err error
134✔
1144
                if id == nil {
147✔
1145
                        clientSessions, err = c.listClientAllSessions(
13✔
1146
                                sessions, chanIDIndexBkt, opts...,
13✔
1147
                        )
13✔
1148
                        return err
13✔
1149
                }
13✔
1150

1151
                // Otherwise, fetch the sessions for the given tower.
1152
                towerToSessionIndex := tx.ReadBucket(cTowerToSessionIndexBkt)
121✔
1153
                if towerToSessionIndex == nil {
121✔
1154
                        return ErrUninitializedDB
×
1155
                }
×
1156

1157
                clientSessions, err = c.listTowerSessions(
121✔
1158
                        *id, sessions, chanIDIndexBkt, towerToSessionIndex,
121✔
1159
                        opts...,
121✔
1160
                )
121✔
1161
                return err
121✔
1162
        }, func() {
134✔
1163
                clientSessions = nil
134✔
1164
        })
134✔
1165
        if err != nil {
134✔
1166
                return nil, err
×
1167
        }
×
1168

1169
        return clientSessions, nil
134✔
1170
}
1171

1172
// listClientAllSessions returns the set of all client sessions known to the db.
1173
func (c *ClientDB) listClientAllSessions(sessions, chanIDIndexBkt kvdb.RBucket,
1174
        opts ...ClientSessionListOption) (map[SessionID]*ClientSession, error) {
13✔
1175

13✔
1176
        clientSessions := make(map[SessionID]*ClientSession)
13✔
1177
        err := sessions.ForEach(func(k, _ []byte) error {
33✔
1178
                // We'll load the full client session since the client will need
20✔
1179
                // the CommittedUpdates and AckedUpdates on startup to resume
20✔
1180
                // committed updates and compute the highest known commit height
20✔
1181
                // for each channel.
20✔
1182
                session, err := c.getClientSession(
20✔
1183
                        sessions, chanIDIndexBkt, k, opts...,
20✔
1184
                )
20✔
1185
                if errors.Is(err, ErrSessionFailedFilterFn) {
20✔
1186
                        return nil
×
1187
                } else if err != nil {
20✔
1188
                        return err
×
1189
                }
×
1190

1191
                clientSessions[session.ID] = session
20✔
1192

20✔
1193
                return nil
20✔
1194
        })
1195
        if err != nil {
13✔
1196
                return nil, err
×
1197
        }
×
1198

1199
        return clientSessions, nil
13✔
1200
}
1201

1202
// listTowerSessions returns the set of all client sessions known to the db
1203
// that are associated with the given tower id.
1204
func (c *ClientDB) listTowerSessions(id TowerID, sessionsBkt, chanIDIndexBkt,
1205
        towerToSessionIndex kvdb.RBucket, opts ...ClientSessionListOption) (
1206
        map[SessionID]*ClientSession, error) {
137✔
1207

137✔
1208
        towerIndexBkt := towerToSessionIndex.NestedReadBucket(id.Bytes())
137✔
1209
        if towerIndexBkt == nil {
137✔
1210
                return nil, ErrTowerNotFound
×
1211
        }
×
1212

1213
        clientSessions := make(map[SessionID]*ClientSession)
137✔
1214
        err := towerIndexBkt.ForEach(func(k, _ []byte) error {
214✔
1215
                // We'll load the full client session since the client will need
77✔
1216
                // the CommittedUpdates and AckedUpdates on startup to resume
77✔
1217
                // committed updates and compute the highest known commit height
77✔
1218
                // for each channel.
77✔
1219
                session, err := c.getClientSession(
77✔
1220
                        sessionsBkt, chanIDIndexBkt, k, opts...,
77✔
1221
                )
77✔
1222
                if errors.Is(err, ErrSessionFailedFilterFn) {
82✔
1223
                        return nil
5✔
1224
                } else if err != nil {
77✔
1225
                        return err
×
1226
                }
×
1227

1228
                clientSessions[session.ID] = session
72✔
1229
                return nil
72✔
1230
        })
1231
        if err != nil {
137✔
1232
                return nil, err
×
1233
        }
×
1234

1235
        return clientSessions, nil
137✔
1236
}
1237

1238
// FetchSessionCommittedUpdates retrieves the current set of un-acked updates
1239
// of the given session.
1240
func (c *ClientDB) FetchSessionCommittedUpdates(id *SessionID) (
1241
        []CommittedUpdate, error) {
191✔
1242

191✔
1243
        var committedUpdates []CommittedUpdate
191✔
1244
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
382✔
1245
                sessions := tx.ReadBucket(cSessionBkt)
191✔
1246
                if sessions == nil {
191✔
1247
                        return ErrUninitializedDB
×
1248
                }
×
1249

1250
                sessionBkt := sessions.NestedReadBucket(id[:])
191✔
1251
                if sessionBkt == nil {
193✔
1252
                        return ErrClientSessionNotFound
2✔
1253
                }
2✔
1254

1255
                var err error
189✔
1256
                committedUpdates, err = getClientSessionCommits(
189✔
1257
                        sessionBkt, nil, nil,
189✔
1258
                )
189✔
1259
                return err
189✔
1260
        }, func() {})
191✔
1261
        if err != nil {
193✔
1262
                return nil, err
2✔
1263
        }
2✔
1264

1265
        return committedUpdates, nil
189✔
1266
}
1267

1268
// IsAcked returns true if the given backup has been backed up using the given
1269
// session.
1270
func (c *ClientDB) IsAcked(id *SessionID, backupID *BackupID) (bool, error) {
6✔
1271
        index, err := c.getRangeIndex(nil, *id, backupID.ChanID)
6✔
1272
        if errors.Is(err, ErrNoRangeIndexFound) {
6✔
1273
                return false, nil
×
1274
        } else if err != nil {
6✔
1275
                return false, err
×
1276
        }
×
1277

1278
        return index.IsInIndex(backupID.CommitHeight), nil
6✔
1279
}
1280

1281
// NumAckedUpdates returns the number of backups that have been successfully
1282
// backed up using the given session.
1283
func (c *ClientDB) NumAckedUpdates(id *SessionID) (uint64, error) {
22✔
1284
        var numAcked uint64
22✔
1285
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
44✔
1286
                sessions := tx.ReadBucket(cSessionBkt)
22✔
1287
                if sessions == nil {
22✔
1288
                        return ErrUninitializedDB
×
1289
                }
×
1290

1291
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
22✔
1292
                if chanIDIndexBkt == nil {
22✔
1293
                        return ErrUninitializedDB
×
1294
                }
×
1295

1296
                sessionBkt := sessions.NestedReadBucket(id[:])
22✔
1297
                if sessionBkt == nil {
22✔
1298
                        return nil
×
1299
                }
×
1300

1301
                // First, account for any rogue updates.
1302
                rogueCountBytes := sessionBkt.Get(cSessionRogueUpdateCount)
22✔
1303
                if len(rogueCountBytes) != 0 {
24✔
1304
                        rogueCount, err := readBigSize(rogueCountBytes)
2✔
1305
                        if err != nil {
2✔
1306
                                return err
×
1307
                        }
×
1308

1309
                        numAcked += rogueCount
2✔
1310
                }
1311

1312
                // Then, check if the session-ack-ranges contains any entries
1313
                // to account for.
1314
                sessionAckRanges := sessionBkt.NestedReadBucket(
22✔
1315
                        cSessionAckRangeIndex,
22✔
1316
                )
22✔
1317
                if sessionAckRanges == nil {
38✔
1318
                        return nil
16✔
1319
                }
16✔
1320

1321
                // Iterate over the channel ID's in the sessionAckRanges
1322
                // bucket.
1323
                return sessionAckRanges.ForEach(func(dbChanID, _ []byte) error {
14✔
1324
                        // Get the range index for the session-channel pair.
8✔
1325
                        chanIDBytes := chanIDIndexBkt.Get(dbChanID)
8✔
1326
                        var chanID lnwire.ChannelID
8✔
1327
                        copy(chanID[:], chanIDBytes)
8✔
1328

8✔
1329
                        index, err := c.getRangeIndex(tx, *id, chanID)
8✔
1330
                        if err != nil {
8✔
1331
                                return err
×
1332
                        }
×
1333

1334
                        numAcked += index.NumInSet()
8✔
1335

8✔
1336
                        return nil
8✔
1337
                })
1338
        }, func() {
22✔
1339
                numAcked = 0
22✔
1340
        })
22✔
1341
        if err != nil {
22✔
1342
                return 0, err
×
1343
        }
×
1344

1345
        return numAcked, nil
22✔
1346
}
1347

1348
// FetchChanInfos loads a mapping from all registered channels to their
1349
// ChannelInfo. Only the channels that have not yet been marked as closed will
1350
// be loaded.
1351
func (c *ClientDB) FetchChanInfos() (ChannelInfos, error) {
47✔
1352
        var infos ChannelInfos
47✔
1353

47✔
1354
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
94✔
1355
                chanDetailsBkt := tx.ReadBucket(cChanDetailsBkt)
47✔
1356
                if chanDetailsBkt == nil {
47✔
1357
                        return ErrUninitializedDB
×
1358
                }
×
1359

1360
                return chanDetailsBkt.ForEach(func(k, _ []byte) error {
63✔
1361
                        chanDetails := chanDetailsBkt.NestedReadBucket(k)
16✔
1362
                        if chanDetails == nil {
16✔
1363
                                return ErrCorruptChanDetails
×
1364
                        }
×
1365
                        // If this channel has already been marked as closed,
1366
                        // then its summary does not need to be loaded.
1367
                        closedHeight := chanDetails.Get(cChanClosedHeight)
16✔
1368
                        if len(closedHeight) > 0 {
16✔
1369
                                return nil
×
1370
                        }
×
1371
                        var chanID lnwire.ChannelID
16✔
1372
                        copy(chanID[:], k)
16✔
1373
                        summary, err := getChanSummary(chanDetails)
16✔
1374
                        if err != nil {
16✔
1375
                                return err
×
1376
                        }
×
1377

1378
                        info := &ChannelInfo{
16✔
1379
                                ClientChanSummary: *summary,
16✔
1380
                        }
16✔
1381

16✔
1382
                        maxHeightBytes := chanDetails.Get(
16✔
1383
                                cChanMaxCommitmentHeight,
16✔
1384
                        )
16✔
1385
                        if len(maxHeightBytes) != 0 {
28✔
1386
                                height, err := readBigSize(maxHeightBytes)
12✔
1387
                                if err != nil {
12✔
1388
                                        return err
×
1389
                                }
×
1390

1391
                                info.MaxHeight = fn.Some(height)
12✔
1392
                        }
1393

1394
                        infos[chanID] = info
16✔
1395

16✔
1396
                        return nil
16✔
1397
                })
1398
        }, func() {
47✔
1399
                infos = make(ChannelInfos)
47✔
1400
        })
47✔
1401
        if err != nil {
47✔
1402
                return nil, err
×
1403
        }
×
1404

1405
        return infos, nil
47✔
1406
}
1407

1408
// RegisterChannel registers a channel for use within the client database. For
1409
// now, all that is stored in the channel summary is the sweep pkscript that
1410
// we'd like any tower sweeps to pay into. In the future, this will be extended
1411
// to contain more info to allow the client efficiently request historical
1412
// states to be backed up under the client's active policy.
1413
func (c *ClientDB) RegisterChannel(chanID lnwire.ChannelID,
1414
        sweepPkScript []byte) error {
69✔
1415

69✔
1416
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
138✔
1417
                chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
69✔
1418
                if chanDetailsBkt == nil {
69✔
1419
                        return ErrUninitializedDB
×
1420
                }
×
1421

1422
                chanDetails := chanDetailsBkt.NestedReadWriteBucket(chanID[:])
69✔
1423
                if chanDetails != nil {
71✔
1424
                        // Channel is already registered.
2✔
1425
                        return ErrChannelAlreadyRegistered
2✔
1426
                }
2✔
1427

1428
                chanDetails, err := chanDetailsBkt.CreateBucket(chanID[:])
67✔
1429
                if err != nil {
67✔
1430
                        return err
×
1431
                }
×
1432

1433
                // Get the channel-id-index bucket.
1434
                indexBkt := tx.ReadWriteBucket(cChanIDIndexBkt)
67✔
1435
                if indexBkt == nil {
67✔
1436
                        return ErrUninitializedDB
×
1437
                }
×
1438

1439
                // Request the next unique id from the bucket.
1440
                nextSeq, err := indexBkt.NextSequence()
67✔
1441
                if err != nil {
67✔
1442
                        return err
×
1443
                }
×
1444

1445
                // Use BigSize encoding to encode the db-assigned index.
1446
                newIndex, err := writeBigSize(nextSeq)
67✔
1447
                if err != nil {
67✔
1448
                        return err
×
1449
                }
×
1450

1451
                // Add the new db-assigned ID to channel-ID pair.
1452
                err = indexBkt.Put(newIndex, chanID[:])
67✔
1453
                if err != nil {
67✔
1454
                        return err
×
1455
                }
×
1456

1457
                // Add the db-assigned ID to the channel's channel details
1458
                // bucket under the cChanDBID key.
1459
                err = chanDetails.Put(cChanDBID, newIndex)
67✔
1460
                if err != nil {
67✔
1461
                        return err
×
1462
                }
×
1463

1464
                summary := ClientChanSummary{
67✔
1465
                        SweepPkScript: sweepPkScript,
67✔
1466
                }
67✔
1467

67✔
1468
                return putChanSummary(chanDetails, &summary)
67✔
1469
        }, func() {})
69✔
1470
}
1471

1472
// MarkBackupIneligible records that the state identified by the (channel id,
1473
// commit height) tuple was ineligible for being backed up under the current
1474
// policy. This state can be retried later under a different policy.
1475
func (c *ClientDB) MarkBackupIneligible(chanID lnwire.ChannelID,
1476
        commitHeight uint64) error {
5✔
1477

5✔
1478
        return nil
5✔
1479
}
5✔
1480

1481
// ListClosableSessions fetches and returns the IDs for all sessions marked as
1482
// closable.
1483
func (c *ClientDB) ListClosableSessions() (map[SessionID]uint32, error) {
61✔
1484
        sessions := make(map[SessionID]uint32)
61✔
1485
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
122✔
1486
                csBkt := tx.ReadBucket(cClosableSessionsBkt)
61✔
1487
                if csBkt == nil {
61✔
1488
                        return ErrUninitializedDB
×
1489
                }
×
1490

1491
                sessIDIndexBkt := tx.ReadBucket(cSessionIDIndexBkt)
61✔
1492
                if sessIDIndexBkt == nil {
61✔
1493
                        return ErrUninitializedDB
×
1494
                }
×
1495

1496
                return csBkt.ForEach(func(dbIDBytes, heightBytes []byte) error {
81✔
1497
                        dbID, err := readBigSize(dbIDBytes)
20✔
1498
                        if err != nil {
20✔
1499
                                return err
×
1500
                        }
×
1501

1502
                        sessID, err := getRealSessionID(sessIDIndexBkt, dbID)
20✔
1503
                        if err != nil {
20✔
1504
                                return err
×
1505
                        }
×
1506

1507
                        sessions[*sessID] = byteOrder.Uint32(heightBytes)
20✔
1508

20✔
1509
                        return nil
20✔
1510
                })
1511
        }, func() {
61✔
1512
                sessions = make(map[SessionID]uint32)
61✔
1513
        })
61✔
1514
        if err != nil {
61✔
1515
                return nil, err
×
1516
        }
×
1517

1518
        return sessions, nil
61✔
1519
}
1520

1521
// DeleteSession can be called when a session should be deleted from the DB.
1522
// All references to the session will also be deleted from the DB. Note that a
1523
// session will only be deleted if was previously marked as closable.
1524
func (c *ClientDB) DeleteSession(id SessionID) error {
14✔
1525
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
28✔
1526
                sessionsBkt := tx.ReadWriteBucket(cSessionBkt)
14✔
1527
                if sessionsBkt == nil {
14✔
1528
                        return ErrUninitializedDB
×
1529
                }
×
1530

1531
                closableBkt := tx.ReadWriteBucket(cClosableSessionsBkt)
14✔
1532
                if closableBkt == nil {
14✔
1533
                        return ErrUninitializedDB
×
1534
                }
×
1535

1536
                chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
14✔
1537
                if chanDetailsBkt == nil {
14✔
1538
                        return ErrUninitializedDB
×
1539
                }
×
1540

1541
                sessIDIndexBkt := tx.ReadWriteBucket(cSessionIDIndexBkt)
14✔
1542
                if sessIDIndexBkt == nil {
14✔
1543
                        return ErrUninitializedDB
×
1544
                }
×
1545

1546
                chanIDIndexBkt := tx.ReadWriteBucket(cChanIDIndexBkt)
14✔
1547
                if chanIDIndexBkt == nil {
14✔
1548
                        return ErrUninitializedDB
×
1549
                }
×
1550

1551
                towerToSessBkt := tx.ReadWriteBucket(cTowerToSessionIndexBkt)
14✔
1552
                if towerToSessBkt == nil {
14✔
1553
                        return ErrUninitializedDB
×
1554
                }
×
1555

1556
                // Get the sub-bucket for this session ID. If it does not exist
1557
                // then the session has already been deleted and so our work is
1558
                // done.
1559
                sessionBkt := sessionsBkt.NestedReadBucket(id[:])
14✔
1560
                if sessionBkt == nil {
14✔
1561
                        return nil
×
1562
                }
×
1563

1564
                _, dbIDBytes, err := getDBSessionID(sessionsBkt, id)
14✔
1565
                if err != nil {
14✔
1566
                        return err
×
1567
                }
×
1568

1569
                // First we check if the session has actually been marked as
1570
                // closable.
1571
                if closableBkt.Get(dbIDBytes) == nil {
18✔
1572
                        return ErrSessionNotClosable
4✔
1573
                }
4✔
1574

1575
                sess, err := getClientSessionBody(sessionsBkt, id[:])
10✔
1576
                if err != nil {
10✔
1577
                        return err
×
1578
                }
×
1579

1580
                // Delete from the tower-to-sessionID index.
1581
                towerIndexBkt := towerToSessBkt.NestedReadWriteBucket(
10✔
1582
                        sess.TowerID.Bytes(),
10✔
1583
                )
10✔
1584
                if towerIndexBkt == nil {
10✔
1585
                        return fmt.Errorf("no entry in the tower-to-session "+
×
1586
                                "index found for tower ID %v", sess.TowerID)
×
1587
                }
×
1588

1589
                err = towerIndexBkt.Delete(id[:])
10✔
1590
                if err != nil {
10✔
1591
                        return err
×
1592
                }
×
1593

1594
                // Delete entry from session ID index.
1595
                err = sessIDIndexBkt.Delete(dbIDBytes)
10✔
1596
                if err != nil {
10✔
1597
                        return err
×
1598
                }
×
1599

1600
                // Delete the entry from the closable sessions index.
1601
                err = closableBkt.Delete(dbIDBytes)
10✔
1602
                if err != nil {
10✔
1603
                        return err
×
1604
                }
×
1605

1606
                ackRanges := sessionBkt.NestedReadBucket(cSessionAckRangeIndex)
10✔
1607

10✔
1608
                // There is a small chance that the session only contains rogue
10✔
1609
                // updates. In that case, there will be no ack-ranges index but
10✔
1610
                // the rogue update count will be equal the MaxUpdates.
10✔
1611
                rogueCountBytes := sessionBkt.Get(cSessionRogueUpdateCount)
10✔
1612
                if len(rogueCountBytes) != 0 {
10✔
1613
                        rogueCount, err := readBigSize(rogueCountBytes)
×
1614
                        if err != nil {
×
1615
                                return err
×
1616
                        }
×
1617

1618
                        maxUpdates := sess.ClientSessionBody.Policy.MaxUpdates
×
1619
                        if rogueCount == uint64(maxUpdates) {
×
1620
                                // Do a sanity check to ensure that the acked
×
1621
                                // ranges bucket does not exist in this case.
×
1622
                                if ackRanges != nil {
×
1623
                                        return fmt.Errorf("acked updates "+
×
1624
                                                "exist for session with a "+
×
1625
                                                "max-updates(%d) rogue count",
×
1626
                                                rogueCount)
×
1627
                                }
×
1628

1629
                                return sessionsBkt.DeleteNestedBucket(id[:])
×
1630
                        }
1631
                }
1632

1633
                // A session would only be considered closable if it was
1634
                // exhausted. Meaning that it should not be the case that it has
1635
                // no acked-updates.
1636
                if ackRanges == nil {
10✔
1637
                        return fmt.Errorf("cannot delete session %s since it "+
×
1638
                                "is not yet exhausted", id)
×
1639
                }
×
1640

1641
                // For each of the channels, delete the session ID entry.
1642
                err = ackRanges.ForEach(func(chanDBID, _ []byte) error {
27✔
1643
                        chanDBIDInt, err := readBigSize(chanDBID)
17✔
1644
                        if err != nil {
17✔
1645
                                return err
×
1646
                        }
×
1647

1648
                        chanID, err := getRealChannelID(
17✔
1649
                                chanIDIndexBkt, chanDBIDInt,
17✔
1650
                        )
17✔
1651
                        if err != nil {
17✔
1652
                                return err
×
1653
                        }
×
1654

1655
                        chanDetails := chanDetailsBkt.NestedReadWriteBucket(
17✔
1656
                                chanID[:],
17✔
1657
                        )
17✔
1658
                        if chanDetails == nil {
17✔
1659
                                return ErrChannelNotRegistered
×
1660
                        }
×
1661

1662
                        chanSessions := chanDetails.NestedReadWriteBucket(
17✔
1663
                                cChanSessions,
17✔
1664
                        )
17✔
1665
                        if chanSessions == nil {
17✔
1666
                                return fmt.Errorf("no session list found for "+
×
1667
                                        "channel %s", chanID)
×
1668
                        }
×
1669

1670
                        // Check that this session was actually listed in the
1671
                        // session list for this channel.
1672
                        if len(chanSessions.Get(dbIDBytes)) == 0 {
17✔
1673
                                return fmt.Errorf("session %s not found in "+
×
1674
                                        "the session list for channel %s", id,
×
1675
                                        chanID)
×
1676
                        }
×
1677

1678
                        // If it was, then delete it.
1679
                        err = chanSessions.Delete(dbIDBytes)
17✔
1680
                        if err != nil {
17✔
1681
                                return err
×
1682
                        }
×
1683

1684
                        // If this was the last session for this channel, we can
1685
                        // now delete the channel details for this channel
1686
                        // completely.
1687
                        err = chanSessions.ForEach(func(_, _ []byte) error {
18✔
1688
                                return errChannelHasMoreSessions
1✔
1689
                        })
1✔
1690
                        if errors.Is(err, errChannelHasMoreSessions) {
18✔
1691
                                return nil
1✔
1692
                        } else if err != nil {
17✔
1693
                                return err
×
1694
                        }
×
1695

1696
                        // Delete the channel's entry from the channel-id-index.
1697
                        dbID := chanDetails.Get(cChanDBID)
16✔
1698
                        err = chanIDIndexBkt.Delete(dbID)
16✔
1699
                        if err != nil {
16✔
1700
                                return err
×
1701
                        }
×
1702

1703
                        // Delete the channel details.
1704
                        return chanDetailsBkt.DeleteNestedBucket(chanID[:])
16✔
1705
                })
1706
                if err != nil {
10✔
1707
                        return err
×
1708
                }
×
1709

1710
                // Delete the actual session.
1711
                return sessionsBkt.DeleteNestedBucket(id[:])
10✔
1712
        }, func() {})
14✔
1713
}
1714

1715
// MarkChannelClosed will mark a registered channel as closed by setting its
1716
// closed-height as the given block height. It returns a list of session IDs for
1717
// sessions that are now considered closable due to the close of this channel.
1718
// The details for this channel will be deleted from the DB if there are no more
1719
// sessions in the DB that contain updates for this channel.
1720
func (c *ClientDB) MarkChannelClosed(chanID lnwire.ChannelID,
1721
        blockHeight uint32) ([]SessionID, error) {
23✔
1722

23✔
1723
        var closableSessions []SessionID
23✔
1724
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
46✔
1725
                sessionsBkt := tx.ReadBucket(cSessionBkt)
23✔
1726
                if sessionsBkt == nil {
23✔
1727
                        return ErrUninitializedDB
×
1728
                }
×
1729

1730
                chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
23✔
1731
                if chanDetailsBkt == nil {
23✔
1732
                        return ErrUninitializedDB
×
1733
                }
×
1734

1735
                closableSessBkt := tx.ReadWriteBucket(cClosableSessionsBkt)
23✔
1736
                if closableSessBkt == nil {
23✔
1737
                        return ErrUninitializedDB
×
1738
                }
×
1739

1740
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
23✔
1741
                if chanIDIndexBkt == nil {
23✔
1742
                        return ErrUninitializedDB
×
1743
                }
×
1744

1745
                sessIDIndexBkt := tx.ReadBucket(cSessionIDIndexBkt)
23✔
1746
                if sessIDIndexBkt == nil {
23✔
1747
                        return ErrUninitializedDB
×
1748
                }
×
1749

1750
                chanDetails := chanDetailsBkt.NestedReadWriteBucket(chanID[:])
23✔
1751
                if chanDetails == nil {
25✔
1752
                        return ErrChannelNotRegistered
2✔
1753
                }
2✔
1754

1755
                // If there are no sessions for this channel, the channel
1756
                // details can be deleted.
1757
                chanSessIDsBkt := chanDetails.NestedReadBucket(cChanSessions)
21✔
1758
                if chanSessIDsBkt == nil {
25✔
1759
                        return chanDetailsBkt.DeleteNestedBucket(chanID[:])
4✔
1760
                }
4✔
1761

1762
                // Otherwise, mark the channel as closed.
1763
                var height [4]byte
17✔
1764
                byteOrder.PutUint32(height[:], blockHeight)
17✔
1765

17✔
1766
                err := chanDetails.Put(cChanClosedHeight, height[:])
17✔
1767
                if err != nil {
17✔
1768
                        return err
×
1769
                }
×
1770

1771
                // Now iterate through all the sessions of the channel to check
1772
                // if any of them are closeable.
1773
                return chanSessIDsBkt.ForEach(func(sessDBID, _ []byte) error {
35✔
1774
                        sessDBIDInt, err := readBigSize(sessDBID)
18✔
1775
                        if err != nil {
18✔
1776
                                return err
×
1777
                        }
×
1778

1779
                        // Use the session-ID index to get the real session ID.
1780
                        sID, err := getRealSessionID(
18✔
1781
                                sessIDIndexBkt, sessDBIDInt,
18✔
1782
                        )
18✔
1783
                        if err != nil {
18✔
1784
                                return err
×
1785
                        }
×
1786

1787
                        isClosable, err := isSessionClosable(
18✔
1788
                                sessionsBkt, chanDetailsBkt, chanIDIndexBkt,
18✔
1789
                                sID,
18✔
1790
                        )
18✔
1791
                        if err != nil {
18✔
1792
                                return err
×
1793
                        }
×
1794

1795
                        if !isClosable {
26✔
1796
                                return nil
8✔
1797
                        }
8✔
1798

1799
                        // Add session to "closableSessions" list and add the
1800
                        // block height that this last channel was closed in.
1801
                        // This will be used in future to determine when we
1802
                        // should delete the session.
1803
                        var height [4]byte
10✔
1804
                        byteOrder.PutUint32(height[:], blockHeight)
10✔
1805
                        err = closableSessBkt.Put(sessDBID, height[:])
10✔
1806
                        if err != nil {
10✔
1807
                                return err
×
1808
                        }
×
1809

1810
                        closableSessions = append(closableSessions, *sID)
10✔
1811

10✔
1812
                        return nil
10✔
1813
                })
1814
        }, func() {
23✔
1815
                closableSessions = nil
23✔
1816
        })
23✔
1817
        if err != nil {
25✔
1818
                return nil, err
2✔
1819
        }
2✔
1820

1821
        return closableSessions, nil
21✔
1822
}
1823

1824
// isSessionClosable returns true if a session is considered closable. A session
1825
// is considered closable only if all the following points are true:
1826
//  1. It has no un-acked updates.
1827
//  2. It is exhausted (ie it can't accept any more updates) OR it has been
1828
//     marked as terminal.
1829
//  3. All the channels that it has acked updates for are closed.
1830
func isSessionClosable(sessionsBkt, chanDetailsBkt, chanIDIndexBkt kvdb.RBucket,
1831
        id *SessionID) (bool, error) {
18✔
1832

18✔
1833
        sessBkt := sessionsBkt.NestedReadBucket(id[:])
18✔
1834
        if sessBkt == nil {
18✔
1835
                return false, ErrSessionNotFound
×
1836
        }
×
1837

1838
        // Since the DeleteCommittedUpdates method deletes the cSessionCommits
1839
        // bucket in one go, it is possible for the session to be closable even
1840
        // if this bucket no longer exists.
1841
        commitsBkt := sessBkt.NestedReadBucket(cSessionCommits)
18✔
1842
        if commitsBkt != nil {
34✔
1843
                // If the session has any un-acked updates, then it is not yet
16✔
1844
                // closable.
16✔
1845
                err := commitsBkt.ForEach(func(_, _ []byte) error {
18✔
1846
                        return ErrSessionHasUnackedUpdates
2✔
1847
                })
2✔
1848
                if errors.Is(err, ErrSessionHasUnackedUpdates) {
18✔
1849
                        return false, nil
2✔
1850
                } else if err != nil {
16✔
1851
                        return false, err
×
1852
                }
×
1853
        }
1854

1855
        session, err := getClientSessionBody(sessionsBkt, id[:])
16✔
1856
        if err != nil {
16✔
1857
                return false, err
×
1858
        }
×
1859

1860
        isTerminal := session.Status == CSessionTerminal
16✔
1861

16✔
1862
        // We have already checked that the session has no more committed
16✔
1863
        // updates. So now we can check if the session is exhausted or has a
16✔
1864
        // terminal state.
16✔
1865
        if !isTerminal && session.SeqNum < session.Policy.MaxUpdates {
20✔
1866
                // If the session is not yet exhausted, and it is not yet in a
4✔
1867
                // terminal state then it is not yet closable.
4✔
1868
                return false, nil
4✔
1869
        }
4✔
1870

1871
        // Either the acked-update bucket should exist _or_ the rogue update
1872
        // count must be equal to the session's MaxUpdates value, otherwise
1873
        // something is wrong because the above check ensures that the session
1874
        // has been exhausted.
1875
        rogueCountBytes := sessBkt.Get(cSessionRogueUpdateCount)
12✔
1876
        if len(rogueCountBytes) != 0 {
12✔
1877
                rogueCount, err := readBigSize(rogueCountBytes)
×
1878
                if err != nil {
×
1879
                        return false, err
×
1880
                }
×
1881

1882
                if rogueCount == uint64(session.Policy.MaxUpdates) {
×
1883
                        return true, nil
×
1884
                }
×
1885
        }
1886

1887
        ackedRangeBkt := sessBkt.NestedReadBucket(cSessionAckRangeIndex)
12✔
1888
        if ackedRangeBkt == nil {
12✔
1889
                if isTerminal {
×
1890
                        return true, nil
×
1891
                }
×
1892

1893
                // If the session has no acked-updates, and it is not in a
1894
                // terminal state then something is wrong since the above check
1895
                // ensures that this session has been exhausted meaning that it
1896
                // should have MaxUpdates acked updates.
1897
                return false, fmt.Errorf("no acked-updates found for "+
×
1898
                        "exhausted session %s", id)
×
1899
        }
1900

1901
        // Iterate over each of the channels that the session has acked-updates
1902
        // for. If any of those channels are not closed, then the session is
1903
        // not yet closable.
1904
        err = ackedRangeBkt.ForEach(func(dbChanID, _ []byte) error {
37✔
1905
                dbChanIDInt, err := readBigSize(dbChanID)
25✔
1906
                if err != nil {
25✔
1907
                        return err
×
1908
                }
×
1909

1910
                chanID, err := getRealChannelID(chanIDIndexBkt, dbChanIDInt)
25✔
1911
                if err != nil {
25✔
1912
                        return err
×
1913
                }
×
1914

1915
                // Get the channel details bucket for the channel.
1916
                chanDetails := chanDetailsBkt.NestedReadBucket(chanID[:])
25✔
1917
                if chanDetails == nil {
25✔
1918
                        return fmt.Errorf("no channel details found for "+
×
1919
                                "channel %s referenced by session %s", chanID,
×
1920
                                id)
×
1921
                }
×
1922

1923
                // If a closed height has been set, then the channel is closed.
1924
                closedHeight := chanDetails.Get(cChanClosedHeight)
25✔
1925
                if len(closedHeight) > 0 {
48✔
1926
                        return nil
23✔
1927
                }
23✔
1928

1929
                // Otherwise, the channel is not yet closed meaning that the
1930
                // session is not yet closable. We break the ForEach by
1931
                // returning an error to indicate this.
1932
                return errSessionHasOpenChannels
2✔
1933
        })
1934
        if errors.Is(err, errSessionHasOpenChannels) {
14✔
1935
                return false, nil
2✔
1936
        } else if err != nil {
12✔
1937
                return false, err
×
1938
        }
×
1939

1940
        return true, nil
10✔
1941
}
1942

1943
// CommitUpdate persists the CommittedUpdate provided in the slot for (session,
1944
// seqNum). This allows the client to retransmit this update on startup.
1945
func (c *ClientDB) CommitUpdate(id *SessionID,
1946
        update *CommittedUpdate) (uint16, error) {
746✔
1947

746✔
1948
        var lastApplied uint16
746✔
1949
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
1,492✔
1950
                sessions := tx.ReadWriteBucket(cSessionBkt)
746✔
1951
                if sessions == nil {
746✔
1952
                        return ErrUninitializedDB
×
1953
                }
×
1954

1955
                // We'll only load the ClientSession body for performance, since
1956
                // we primarily need to inspect its SeqNum and TowerLastApplied
1957
                // fields. The CommittedUpdates will be modified on disk
1958
                // directly.
1959
                session, err := getClientSessionBody(sessions, id[:])
746✔
1960
                if err != nil {
748✔
1961
                        return err
2✔
1962
                }
2✔
1963

1964
                // Can't fail if the above didn't fail.
1965
                sessionBkt := sessions.NestedReadWriteBucket(id[:])
744✔
1966

744✔
1967
                // Ensure the session commits sub-bucket is initialized.
744✔
1968
                sessionCommits, err := sessionBkt.CreateBucketIfNotExists(
744✔
1969
                        cSessionCommits,
744✔
1970
                )
744✔
1971
                if err != nil {
744✔
1972
                        return err
×
1973
                }
×
1974

1975
                var seqNumBuf [2]byte
744✔
1976
                byteOrder.PutUint16(seqNumBuf[:], update.SeqNum)
744✔
1977

744✔
1978
                // Check to see if a committed update already exists for this
744✔
1979
                // sequence number.
744✔
1980
                committedUpdateBytes := sessionCommits.Get(seqNumBuf[:])
744✔
1981
                if committedUpdateBytes != nil {
962✔
1982
                        var dbUpdate CommittedUpdate
218✔
1983
                        err := dbUpdate.Decode(
218✔
1984
                                bytes.NewReader(committedUpdateBytes),
218✔
1985
                        )
218✔
1986
                        if err != nil {
218✔
1987
                                return err
×
1988
                        }
×
1989

1990
                        // If an existing committed update has a different hint,
1991
                        // we'll reject this newer update.
1992
                        if dbUpdate.Hint != update.Hint {
220✔
1993
                                return ErrUpdateAlreadyCommitted
2✔
1994
                        }
2✔
1995

1996
                        // Otherwise, capture the last applied value and
1997
                        // succeed.
1998
                        lastApplied = session.TowerLastApplied
216✔
1999
                        return nil
216✔
2000
                }
2001

2002
                // There's no committed update for this sequence number, ensure
2003
                // that we are committing the next unallocated one.
2004
                if update.SeqNum != session.SeqNum+1 {
528✔
2005
                        return ErrCommitUnorderedUpdate
2✔
2006
                }
2✔
2007

2008
                // Increment the session's sequence number and store the updated
2009
                // client session.
2010
                //
2011
                // TODO(conner): split out seqnum and last applied own bucket to
2012
                // eliminate serialization of full struct during CommitUpdate?
2013
                // Can also read/write directly to byes [:2] without migration.
2014
                session.SeqNum++
524✔
2015
                err = putClientSessionBody(sessionBkt, session)
524✔
2016
                if err != nil {
524✔
2017
                        return err
×
2018
                }
×
2019

2020
                // Encode and store the committed update in the sessionCommits
2021
                // sub-bucket under the requested sequence number.
2022
                var b bytes.Buffer
524✔
2023
                err = update.Encode(&b)
524✔
2024
                if err != nil {
524✔
2025
                        return err
×
2026
                }
×
2027

2028
                err = sessionCommits.Put(seqNumBuf[:], b.Bytes())
524✔
2029
                if err != nil {
524✔
2030
                        return err
×
2031
                }
×
2032

2033
                // Update the channel's max commitment height if needed.
2034
                err = maybeUpdateMaxCommitHeight(tx, update.BackupID)
524✔
2035
                if err != nil {
524✔
2036
                        return err
×
2037
                }
×
2038

2039
                // Finally, capture the session's last applied value so it can
2040
                // be sent in the next state update to the tower.
2041
                lastApplied = session.TowerLastApplied
524✔
2042

524✔
2043
                return nil
524✔
2044
        }, func() {
746✔
2045
                lastApplied = 0
746✔
2046
        })
746✔
2047
        if err != nil {
752✔
2048
                return 0, err
6✔
2049
        }
6✔
2050

2051
        return lastApplied, nil
740✔
2052
}
2053

2054
// AckUpdate persists an acknowledgment for a given (session, seqnum) pair. This
2055
// removes the update from the set of committed updates, and validates the
2056
// lastApplied value returned from the tower.
2057
func (c *ClientDB) AckUpdate(id *SessionID, seqNum uint16,
2058
        lastApplied uint16) error {
528✔
2059

528✔
2060
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
1,056✔
2061
                sessions := tx.ReadWriteBucket(cSessionBkt)
528✔
2062
                if sessions == nil {
528✔
2063
                        return ErrUninitializedDB
×
2064
                }
×
2065

2066
                chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
528✔
2067
                if chanDetailsBkt == nil {
528✔
2068
                        return ErrUninitializedDB
×
2069
                }
×
2070

2071
                // We'll only load the ClientSession body for performance, since
2072
                // we primarily need to inspect its SeqNum and TowerLastApplied
2073
                // fields. The CommittedUpdates and AckedUpdates will be
2074
                // modified on disk directly.
2075
                session, err := getClientSessionBody(sessions, id[:])
528✔
2076
                if err != nil {
530✔
2077
                        return err
2✔
2078
                }
2✔
2079

2080
                // If the tower has acked a sequence number beyond our highest
2081
                // sequence number, fail.
2082
                if lastApplied > session.SeqNum {
530✔
2083
                        return ErrUnallocatedLastApplied
4✔
2084
                }
4✔
2085

2086
                // If the tower acked with a lower sequence number than it gave
2087
                // us prior, fail.
2088
                if lastApplied < session.TowerLastApplied {
526✔
2089
                        return ErrLastAppliedReversion
4✔
2090
                }
4✔
2091

2092
                // TODO(conner): split out seqnum and last applied own bucket to
2093
                // eliminate serialization of full struct during AckUpdate?  Can
2094
                // also read/write directly to byes [2:4] without migration.
2095
                session.TowerLastApplied = lastApplied
518✔
2096

518✔
2097
                // Can't fail because getClientSession succeeded.
518✔
2098
                sessionBkt := sessions.NestedReadWriteBucket(id[:])
518✔
2099

518✔
2100
                // Write the client session with the updated last applied value.
518✔
2101
                err = putClientSessionBody(sessionBkt, session)
518✔
2102
                if err != nil {
518✔
2103
                        return err
×
2104
                }
×
2105

2106
                // If the commits sub-bucket doesn't exist, there can't possibly
2107
                // be a corresponding committed update to remove.
2108
                sessionCommits := sessionBkt.NestedReadWriteBucket(
518✔
2109
                        cSessionCommits,
518✔
2110
                )
518✔
2111
                if sessionCommits == nil {
520✔
2112
                        return ErrCommittedUpdateNotFound
2✔
2113
                }
2✔
2114

2115
                var seqNumBuf [2]byte
516✔
2116
                byteOrder.PutUint16(seqNumBuf[:], seqNum)
516✔
2117

516✔
2118
                // Assert that a committed update exists for this sequence
516✔
2119
                // number.
516✔
2120
                committedUpdateBytes := sessionCommits.Get(seqNumBuf[:])
516✔
2121
                if committedUpdateBytes == nil {
520✔
2122
                        return ErrCommittedUpdateNotFound
4✔
2123
                }
4✔
2124

2125
                var committedUpdate CommittedUpdate
512✔
2126
                err = committedUpdate.Decode(
512✔
2127
                        bytes.NewReader(committedUpdateBytes),
512✔
2128
                )
512✔
2129
                if err != nil {
512✔
2130
                        return err
×
2131
                }
×
2132

2133
                // Remove the corresponding committed update.
2134
                err = sessionCommits.Delete(seqNumBuf[:])
512✔
2135
                if err != nil {
512✔
2136
                        return err
×
2137
                }
×
2138

2139
                dbSessionID, dbSessIDBytes, err := getDBSessionID(sessions, *id)
512✔
2140
                if err != nil {
512✔
2141
                        return err
×
2142
                }
×
2143

2144
                chanID := committedUpdate.BackupID.ChanID
512✔
2145
                height := committedUpdate.BackupID.CommitHeight
512✔
2146

512✔
2147
                // Get the DB representation of the channel-ID. There is a
512✔
2148
                // chance that the channel corresponding to this update has been
512✔
2149
                // closed and that the details for this channel no longer exist
512✔
2150
                // in the tower client DB. In that case, we consider this a
512✔
2151
                // rogue update and all we do is make sure to keep track of the
512✔
2152
                // number of rogue updates for this session.
512✔
2153
                _, dbChanIDBytes, err := getDBChanID(chanDetailsBkt, chanID)
512✔
2154
                if errors.Is(err, ErrChannelNotRegistered) {
526✔
2155
                        var (
14✔
2156
                                count uint64
14✔
2157
                                err   error
14✔
2158
                        )
14✔
2159

14✔
2160
                        rogueCountBytes := sessionBkt.Get(
14✔
2161
                                cSessionRogueUpdateCount,
14✔
2162
                        )
14✔
2163
                        if len(rogueCountBytes) != 0 {
25✔
2164
                                count, err = readBigSize(rogueCountBytes)
11✔
2165
                                if err != nil {
11✔
2166
                                        return err
×
2167
                                }
×
2168
                        }
2169

2170
                        rogueCount := count + 1
14✔
2171
                        countBytes, err := writeBigSize(rogueCount)
14✔
2172
                        if err != nil {
14✔
2173
                                return err
×
2174
                        }
×
2175

2176
                        err = sessionBkt.Put(
14✔
2177
                                cSessionRogueUpdateCount, countBytes,
14✔
2178
                        )
14✔
2179
                        if err != nil {
14✔
2180
                                return err
×
2181
                        }
×
2182

2183
                        // In the rare chance that this session only has rogue
2184
                        // updates, we check here if the count is equal to the
2185
                        // MaxUpdate of the session. If it is, then we mark the
2186
                        // session as closable.
2187
                        if rogueCount != uint64(session.Policy.MaxUpdates) {
26✔
2188
                                return nil
12✔
2189
                        }
12✔
2190

2191
                        // Before we mark the session as closable, we do a
2192
                        // sanity check to ensure that this session has no
2193
                        // acked-update index.
2194
                        sessionAckRanges := sessionBkt.NestedReadBucket(
2✔
2195
                                cSessionAckRangeIndex,
2✔
2196
                        )
2✔
2197
                        if sessionAckRanges != nil {
2✔
2198
                                return fmt.Errorf("session(%s) has an "+
×
2199
                                        "acked ranges index but has a rogue "+
×
2200
                                        "count indicating saturation",
×
2201
                                        session.ID)
×
2202
                        }
×
2203

2204
                        closableSessBkt := tx.ReadWriteBucket(
2✔
2205
                                cClosableSessionsBkt,
2✔
2206
                        )
2✔
2207
                        if closableSessBkt == nil {
2✔
2208
                                return ErrUninitializedDB
×
2209
                        }
×
2210

2211
                        var height [4]byte
2✔
2212
                        byteOrder.PutUint32(height[:], 0)
2✔
2213

2✔
2214
                        return closableSessBkt.Put(dbSessIDBytes, height[:])
2✔
2215
                } else if err != nil {
498✔
2216
                        return err
×
2217
                }
×
2218

2219
                // Get the ranges write bucket before getting the range index to
2220
                // ensure that the session acks sub-bucket is initialized, so
2221
                // that we can insert an entry.
2222
                rangesBkt, err := getRangesWriteBucket(
498✔
2223
                        sessionBkt, dbChanIDBytes,
498✔
2224
                )
498✔
2225
                if err != nil {
498✔
2226
                        return err
×
2227
                }
×
2228

2229
                chanDetails := chanDetailsBkt.NestedReadWriteBucket(
498✔
2230
                        committedUpdate.BackupID.ChanID[:],
498✔
2231
                )
498✔
2232
                if chanDetails == nil {
498✔
2233
                        return ErrChannelNotRegistered
×
2234
                }
×
2235

2236
                err = putChannelToSessionMapping(chanDetails, dbSessionID)
498✔
2237
                if err != nil {
498✔
2238
                        return err
×
2239
                }
×
2240

2241
                // Get the range index for the given session-channel pair.
2242
                index, err := c.getRangeIndex(tx, *id, chanID)
498✔
2243
                if err != nil {
498✔
2244
                        return err
×
2245
                }
×
2246

2247
                return index.Add(height, rangesBkt)
498✔
2248
        }, func() {})
528✔
2249
}
2250

2251
// GetDBQueue returns a BackupID Queue instance under the given namespace.
2252
func (c *ClientDB) GetDBQueue(namespace []byte) Queue[*BackupID] {
40✔
2253
        return NewQueueDB(
40✔
2254
                c.db, namespace, func() *BackupID {
323,603✔
2255
                        return &BackupID{}
323,563✔
2256
                }, func(tx kvdb.RwTx, item *BackupID) error {
490,760✔
2257
                        return maybeUpdateMaxCommitHeight(tx, *item)
167,197✔
2258
                },
167,197✔
2259
        )
2260
}
2261

2262
// TerminateSession sets the given session's status to CSessionTerminal meaning
2263
// that it will not be usable again. An error will be returned if the given
2264
// session still has un-acked updates that should be attended to.
2265
func (c *ClientDB) TerminateSession(id SessionID) error {
6✔
2266
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
12✔
2267
                sessions := tx.ReadWriteBucket(cSessionBkt)
6✔
2268
                if sessions == nil {
6✔
2269
                        return ErrUninitializedDB
×
2270
                }
×
2271

2272
                sessionsBkt := tx.ReadBucket(cSessionBkt)
6✔
2273
                if sessionsBkt == nil {
6✔
2274
                        return ErrUninitializedDB
×
2275
                }
×
2276

2277
                chanIDIndexBkt := tx.ReadBucket(cChanIDIndexBkt)
6✔
2278
                if chanIDIndexBkt == nil {
6✔
2279
                        return ErrUninitializedDB
×
2280
                }
×
2281

2282
                // Collect any un-acked updates for this session.
2283
                committedUpdateCount := make(map[SessionID]uint16)
6✔
2284
                perCommittedUpdate := func(s *ClientSession,
6✔
2285
                        _ *CommittedUpdate) {
9✔
2286

3✔
2287
                        committedUpdateCount[s.ID]++
3✔
2288
                }
3✔
2289

2290
                session, err := c.getClientSession(
6✔
2291
                        sessionsBkt, chanIDIndexBkt, id[:],
6✔
2292
                        WithPerCommittedUpdate(perCommittedUpdate),
6✔
2293
                )
6✔
2294
                if err != nil {
6✔
2295
                        return err
×
2296
                }
×
2297

2298
                // If there are any un-acked updates for this session then
2299
                // we don't allow the change of status as these updates must
2300
                // first be dealt with somehow.
2301
                if committedUpdateCount[id] > 0 {
9✔
2302
                        return ErrSessionHasUnackedUpdates
3✔
2303
                }
3✔
2304

2305
                return markSessionStatus(sessions, session, CSessionTerminal)
3✔
2306
        }, func() {})
6✔
2307
}
2308

2309
// DeleteCommittedUpdates deletes all the committed updates for the given
2310
// session.
2311
func (c *ClientDB) DeleteCommittedUpdates(id *SessionID) error {
18✔
2312
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
36✔
2313
                sessions := tx.ReadWriteBucket(cSessionBkt)
18✔
2314
                if sessions == nil {
18✔
2315
                        return ErrUninitializedDB
×
2316
                }
×
2317

2318
                sessionBkt := sessions.NestedReadWriteBucket(id[:])
18✔
2319
                if sessionBkt == nil {
18✔
2320
                        return fmt.Errorf("session bucket %s not found",
×
2321
                                id.String())
×
2322
                }
×
2323

2324
                // If the commits sub-bucket doesn't exist, there can't possibly
2325
                // be corresponding updates to remove.
2326
                sessionCommits := sessionBkt.NestedReadWriteBucket(
18✔
2327
                        cSessionCommits,
18✔
2328
                )
18✔
2329
                if sessionCommits == nil {
20✔
2330
                        return nil
2✔
2331
                }
2✔
2332

2333
                // errFoundUpdates is an error we will use to exit early from
2334
                // the ForEach loop. The return of this error means that at
2335
                // least one committed update exists.
2336
                var errFoundUpdates = fmt.Errorf("found committed updates")
16✔
2337
                err := sessionCommits.ForEach(func(k, v []byte) error {
26✔
2338
                        return errFoundUpdates
10✔
2339
                })
10✔
2340
                switch {
16✔
2341
                // If the errFoundUpdates signal error was returned then there
2342
                // are some updates that need to be deleted.
2343
                case errors.Is(err, errFoundUpdates):
10✔
2344

2345
                // If no error is returned then the ForEach call back was never
2346
                // entered meaning that there are no un-acked committed updates.
2347
                // So we can exit now as there is nothing left to do.
2348
                case err == nil:
6✔
2349
                        return nil
6✔
2350

2351
                // If an expected error is returned, return that error.
2352
                default:
×
2353
                        return err
×
2354
                }
2355

2356
                session, err := getClientSessionBody(sessions, id[:])
10✔
2357
                if err != nil {
10✔
2358
                        return err
×
2359
                }
×
2360

2361
                // Once we delete a committed update from the session, the
2362
                // SeqNum of the session will be incorrect and so the session
2363
                // should be marked as terminal.
2364
                session.Status = CSessionTerminal
10✔
2365
                err = putClientSessionBody(sessionBkt, session)
10✔
2366
                if err != nil {
10✔
2367
                        return err
×
2368
                }
×
2369

2370
                // Delete all the committed updates in one go by deleting the
2371
                // session commits bucket.
2372
                return sessionBkt.DeleteNestedBucket(cSessionCommits)
10✔
2373
        }, func() {})
18✔
2374
}
2375

2376
// putChannelToSessionMapping adds the given session ID to a channel's
2377
// cChanSessions bucket.
2378
func putChannelToSessionMapping(chanDetails kvdb.RwBucket,
2379
        dbSessID uint64) error {
498✔
2380

498✔
2381
        chanSessIDsBkt, err := chanDetails.CreateBucketIfNotExists(
498✔
2382
                cChanSessions,
498✔
2383
        )
498✔
2384
        if err != nil {
498✔
2385
                return err
×
2386
        }
×
2387

2388
        b, err := writeBigSize(dbSessID)
498✔
2389
        if err != nil {
498✔
2390
                return err
×
2391
        }
×
2392

2393
        return chanSessIDsBkt.Put(b, []byte{1})
498✔
2394
}
2395

2396
// getClientSessionBody loads the body of a ClientSession from the sessions
2397
// bucket corresponding to the serialized session id. This does not deserialize
2398
// the CommittedUpdates, AckUpdates or the Tower associated with the session.
2399
// If the caller requires this info, use getClientSession.
2400
func getClientSessionBody(sessions kvdb.RBucket,
2401
        idBytes []byte) (*ClientSession, error) {
1,422✔
2402

1,422✔
2403
        sessionBkt := sessions.NestedReadBucket(idBytes)
1,422✔
2404
        if sessionBkt == nil {
1,426✔
2405
                return nil, ErrClientSessionNotFound
4✔
2406
        }
4✔
2407

2408
        // Should never have a sessionBkt without also having its body.
2409
        sessionBody := sessionBkt.Get(cSessionBody)
1,418✔
2410
        if sessionBody == nil {
1,418✔
2411
                return nil, ErrCorruptClientSession
×
2412
        }
×
2413

2414
        var session ClientSession
1,418✔
2415
        copy(session.ID[:], idBytes)
1,418✔
2416

1,418✔
2417
        err := session.Decode(bytes.NewReader(sessionBody))
1,418✔
2418
        if err != nil {
1,418✔
2419
                return nil, err
×
2420
        }
×
2421

2422
        return &session, nil
1,418✔
2423
}
2424

2425
// ClientSessionFilterFn describes the signature of a callback function that can
2426
// be used to filter the sessions that are returned in any of the DB methods
2427
// that read sessions from the DB.
2428
type ClientSessionFilterFn func(*ClientSession) bool
2429

2430
// ClientSessWithNumCommittedUpdatesFilterFn describes the signature of a
2431
// callback function that can be used to filter out a session based on the
2432
// contents of ClientSession along with the number of un-acked committed updates
2433
// that the session has.
2434
type ClientSessWithNumCommittedUpdatesFilterFn func(*ClientSession, uint16) bool
2435

2436
// PerMaxHeightCB describes the signature of a callback function that can be
2437
// called for each channel that a session has updates for to communicate the
2438
// maximum commitment height that the session has backed up for the channel.
2439
type PerMaxHeightCB func(*ClientSession, lnwire.ChannelID, uint64)
2440

2441
// PerNumAckedUpdatesCB describes the signature of a callback function that can
2442
// be called for each channel that a session has updates for to communicate the
2443
// number of updates that the session has for the channel.
2444
type PerNumAckedUpdatesCB func(*ClientSession, lnwire.ChannelID, uint16)
2445

2446
// PerRogueUpdateCountCB describes the signature of a callback function that can
2447
// be called for each session with the number of rogue updates that the session
2448
// has.
2449
type PerRogueUpdateCountCB func(*ClientSession, uint16)
2450

2451
// PerAckedUpdateCB describes the signature of a callback function that can be
2452
// called for each of a session's acked updates.
2453
type PerAckedUpdateCB func(*ClientSession, uint16, BackupID)
2454

2455
// PerCommittedUpdateCB describes the signature of a callback function that can
2456
// be called for each of a session's committed updates (updates that the client
2457
// has not yet received an ACK for).
2458
type PerCommittedUpdateCB func(*ClientSession, *CommittedUpdate)
2459

2460
// ClientSessionListOption describes the signature of a functional option that
2461
// can be used when listing client sessions in order to provide any extra
2462
// instruction to the query.
2463
type ClientSessionListOption func(cfg *ClientSessionListCfg)
2464

2465
// ClientSessionListCfg defines various query parameters that will be used when
2466
// querying the DB for client sessions.
2467
type ClientSessionListCfg struct {
2468
        // PerNumAckedUpdates will, if set, be called for each of the session's
2469
        // channels to communicate the number of updates stored for that
2470
        // channel.
2471
        PerNumAckedUpdates PerNumAckedUpdatesCB
2472

2473
        // PerRogueUpdateCount will, if set, be called with the number of rogue
2474
        // updates that the session has backed up.
2475
        PerRogueUpdateCount PerRogueUpdateCountCB
2476

2477
        // PerMaxHeight will, if set, be called for each of the session's
2478
        // channels to communicate the highest commit height of updates stored
2479
        // for that channel.
2480
        PerMaxHeight PerMaxHeightCB
2481

2482
        // PerCommittedUpdate will, if set, be called for each of the session's
2483
        // committed (un-acked) updates.
2484
        PerCommittedUpdate PerCommittedUpdateCB
2485

2486
        // PreEvaluateFilterFn will be run after loading a session from the DB
2487
        // and _before_ any of the other call-back functions in
2488
        // ClientSessionListCfg. Therefore, if a session fails this filter
2489
        // function, then it will not be passed to any of the other call backs
2490
        // and won't be included in the return list.
2491
        PreEvaluateFilterFn ClientSessionFilterFn
2492

2493
        // PostEvaluateFilterFn will be run _after_ all the other call-back
2494
        // functions in ClientSessionListCfg. If a session fails this filter
2495
        // function then all it means is that it won't be included in the list
2496
        // of sessions to return.
2497
        PostEvaluateFilterFn ClientSessWithNumCommittedUpdatesFilterFn
2498
}
2499

2500
// NewClientSessionCfg constructs a new ClientSessionListCfg.
2501
func NewClientSessionCfg() *ClientSessionListCfg {
112✔
2502
        return &ClientSessionListCfg{}
112✔
2503
}
112✔
2504

2505
// WithPerMaxHeight constructs a functional option that will set a call-back
2506
// function to be called for each of a session's channels to communicate the
2507
// maximum commitment height that the session has stored for the channel.
2508
func WithPerMaxHeight(cb PerMaxHeightCB) ClientSessionListOption {
×
2509
        return func(cfg *ClientSessionListCfg) {
×
2510
                cfg.PerMaxHeight = cb
×
2511
        }
×
2512
}
2513

2514
// WithPerNumAckedUpdates constructs a functional option that will set a
2515
// call-back function to be called for each of a session's channels to
2516
// communicate the number of updates that the session has stored for the
2517
// channel.
2518
func WithPerNumAckedUpdates(cb PerNumAckedUpdatesCB) ClientSessionListOption {
4✔
2519
        return func(cfg *ClientSessionListCfg) {
13✔
2520
                cfg.PerNumAckedUpdates = cb
9✔
2521
        }
9✔
2522
}
2523

2524
// WithPerRogueUpdateCount constructs a functional option that will set a
2525
// call-back function to be called with the number of rogue updates that the
2526
// session has backed up.
UNCOV
2527
func WithPerRogueUpdateCount(cb PerRogueUpdateCountCB) ClientSessionListOption {
×
UNCOV
2528
        return func(cfg *ClientSessionListCfg) {
×
UNCOV
2529
                cfg.PerRogueUpdateCount = cb
×
UNCOV
2530
        }
×
2531
}
2532

2533
// WithPerCommittedUpdate constructs a functional option that will set a
2534
// call-back function to be called for each of a client's un-acked updates.
2535
func WithPerCommittedUpdate(cb PerCommittedUpdateCB) ClientSessionListOption {
25✔
2536
        return func(cfg *ClientSessionListCfg) {
52✔
2537
                cfg.PerCommittedUpdate = cb
27✔
2538
        }
27✔
2539
}
2540

2541
// WithPreEvalFilterFn constructs a functional option that will set a call-back
2542
// function that will be called immediately after loading a session. If the
2543
// session fails this filter function, then it will not be passed to any of the
2544
// other evaluation call-back functions.
2545
func WithPreEvalFilterFn(fn ClientSessionFilterFn) ClientSessionListOption {
85✔
2546
        return func(cfg *ClientSessionListCfg) {
113✔
2547
                cfg.PreEvaluateFilterFn = fn
28✔
2548
        }
28✔
2549
}
2550

2551
// WithPostEvalFilterFn constructs a functional option that will set a call-back
2552
// function that will be used to determine if a session should be included in
2553
// the returned list. This differs from WithPreEvalFilterFn since that call-back
2554
// is used to determine if the session should be evaluated at all (and thus
2555
// run against the other ClientSessionListCfg call-backs) whereas the session
2556
// will only reach the PostEvalFilterFn call-back once it has already been
2557
// evaluated by all the other call-backs.
2558
func WithPostEvalFilterFn(
2559
        fn ClientSessWithNumCommittedUpdatesFilterFn) ClientSessionListOption {
83✔
2560

83✔
2561
        return func(cfg *ClientSessionListCfg) {
107✔
2562
                cfg.PostEvaluateFilterFn = fn
24✔
2563
        }
24✔
2564
}
2565

2566
// getClientSession loads the full ClientSession associated with the serialized
2567
// session id. This method populates the CommittedUpdates, AckUpdates and Tower
2568
// in addition to the ClientSession's body.
2569
func (c *ClientDB) getClientSession(sessionsBkt, chanIDIndexBkt kvdb.RBucket,
2570
        idBytes []byte, opts ...ClientSessionListOption) (*ClientSession,
2571
        error) {
112✔
2572

112✔
2573
        cfg := NewClientSessionCfg()
112✔
2574
        for _, o := range opts {
200✔
2575
                o(cfg)
88✔
2576
        }
88✔
2577

2578
        session, err := getClientSessionBody(sessionsBkt, idBytes)
112✔
2579
        if err != nil {
112✔
2580
                return nil, err
×
2581
        }
×
2582

2583
        if cfg.PreEvaluateFilterFn != nil && !cfg.PreEvaluateFilterFn(session) {
112✔
UNCOV
2584
                return nil, ErrSessionFailedFilterFn
×
UNCOV
2585
        }
×
2586

2587
        // Can't fail because client session body has already been read.
2588
        sessionBkt := sessionsBkt.NestedReadBucket(idBytes)
112✔
2589

112✔
2590
        // Pass the session's committed (un-acked) updates through the call-back
112✔
2591
        // if one is provided.
112✔
2592
        numCommittedUpdates, err := filterClientSessionCommits(
112✔
2593
                sessionBkt, session, cfg.PerCommittedUpdate,
112✔
2594
        )
112✔
2595
        if err != nil {
112✔
2596
                return nil, err
×
2597
        }
×
2598

2599
        // Pass the session's acked updates through the call-back if one is
2600
        // provided.
2601
        err = c.filterClientSessionAcks(
112✔
2602
                sessionBkt, chanIDIndexBkt, session, cfg.PerMaxHeight,
112✔
2603
                cfg.PerNumAckedUpdates, cfg.PerRogueUpdateCount,
112✔
2604
        )
112✔
2605
        if err != nil {
112✔
2606
                return nil, err
×
2607
        }
×
2608

2609
        if cfg.PostEvaluateFilterFn != nil &&
112✔
2610
                !cfg.PostEvaluateFilterFn(session, numCommittedUpdates) {
117✔
2611

5✔
2612
                return nil, ErrSessionFailedFilterFn
5✔
2613
        }
5✔
2614

2615
        return session, nil
107✔
2616
}
2617

2618
// getClientSessionCommits retrieves all committed updates for the session
2619
// identified by the serialized session id. If a PerCommittedUpdateCB is
2620
// provided, then it will be called for each of the session's committed updates.
2621
func getClientSessionCommits(sessionBkt kvdb.RBucket, s *ClientSession,
2622
        cb PerCommittedUpdateCB) ([]CommittedUpdate, error) {
189✔
2623

189✔
2624
        // Initialize committedUpdates so that we can return an initialized map
189✔
2625
        // if no committed updates exist.
189✔
2626
        committedUpdates := make([]CommittedUpdate, 0)
189✔
2627

189✔
2628
        sessionCommits := sessionBkt.NestedReadBucket(cSessionCommits)
189✔
2629
        if sessionCommits == nil {
283✔
2630
                return committedUpdates, nil
94✔
2631
        }
94✔
2632

2633
        err := sessionCommits.ForEach(func(k, v []byte) error {
119✔
2634
                var committedUpdate CommittedUpdate
24✔
2635
                err := committedUpdate.Decode(bytes.NewReader(v))
24✔
2636
                if err != nil {
24✔
2637
                        return err
×
2638
                }
×
2639
                committedUpdate.SeqNum = byteOrder.Uint16(k)
24✔
2640

24✔
2641
                committedUpdates = append(committedUpdates, committedUpdate)
24✔
2642

24✔
2643
                if cb != nil {
24✔
2644
                        cb(s, &committedUpdate)
×
2645
                }
×
2646

2647
                return nil
24✔
2648
        })
2649
        if err != nil {
95✔
2650
                return nil, err
×
2651
        }
×
2652

2653
        return committedUpdates, nil
95✔
2654
}
2655

2656
// filterClientSessionAcks retrieves all acked updates for the session
2657
// identified by the serialized session id and passes them to the provided
2658
// call back if one is provided.
2659
func (c *ClientDB) filterClientSessionAcks(sessionBkt,
2660
        chanIDIndexBkt kvdb.RBucket, s *ClientSession, perMaxCb PerMaxHeightCB,
2661
        perNumAckedUpdates PerNumAckedUpdatesCB,
2662
        perRogueUpdateCount PerRogueUpdateCountCB) error {
112✔
2663

112✔
2664
        if perRogueUpdateCount != nil {
112✔
UNCOV
2665
                var (
×
UNCOV
2666
                        count uint64
×
UNCOV
2667
                        err   error
×
UNCOV
2668
                )
×
UNCOV
2669
                rogueCountBytes := sessionBkt.Get(cSessionRogueUpdateCount)
×
UNCOV
2670
                if len(rogueCountBytes) != 0 {
×
2671
                        count, err = readBigSize(rogueCountBytes)
×
2672
                        if err != nil {
×
2673
                                return err
×
2674
                        }
×
2675
                }
2676

UNCOV
2677
                perRogueUpdateCount(s, uint16(count))
×
2678
        }
2679

2680
        if perMaxCb == nil && perNumAckedUpdates == nil {
215✔
2681
                return nil
103✔
2682
        }
103✔
2683

2684
        sessionAcksRanges := sessionBkt.NestedReadBucket(cSessionAckRangeIndex)
9✔
2685
        if sessionAcksRanges == nil {
10✔
2686
                return nil
1✔
2687
        }
1✔
2688

2689
        return sessionAcksRanges.ForEach(func(dbChanID, _ []byte) error {
16✔
2690
                rangeBkt := sessionAcksRanges.NestedReadBucket(dbChanID)
8✔
2691
                if rangeBkt == nil {
8✔
2692
                        return nil
×
2693
                }
×
2694

2695
                index, err := readRangeIndex(rangeBkt)
8✔
2696
                if err != nil {
8✔
2697
                        return err
×
2698
                }
×
2699

2700
                chanIDBytes := chanIDIndexBkt.Get(dbChanID)
8✔
2701
                var chanID lnwire.ChannelID
8✔
2702
                copy(chanID[:], chanIDBytes)
8✔
2703

8✔
2704
                if perMaxCb != nil {
8✔
2705
                        perMaxCb(s, chanID, index.MaxHeight())
×
2706
                }
×
2707

2708
                if perNumAckedUpdates != nil {
16✔
2709
                        perNumAckedUpdates(s, chanID, uint16(index.NumInSet()))
8✔
2710
                }
8✔
2711
                return nil
8✔
2712
        })
2713
}
2714

2715
// filterClientSessionCommits retrieves all committed updates for the session
2716
// identified by the serialized session id and passes them to the given
2717
// PerCommittedUpdateCB callback.
2718
func filterClientSessionCommits(sessionBkt kvdb.RBucket, s *ClientSession,
2719
        cb PerCommittedUpdateCB) (uint16, error) {
112✔
2720

112✔
2721
        sessionCommits := sessionBkt.NestedReadBucket(cSessionCommits)
112✔
2722
        if sessionCommits == nil {
164✔
2723
                return 0, nil
52✔
2724
        }
52✔
2725

2726
        var numUpdates uint16
60✔
2727
        err := sessionCommits.ForEach(func(k, v []byte) error {
79✔
2728
                numUpdates++
19✔
2729

19✔
2730
                if cb == nil {
30✔
2731
                        return nil
11✔
2732
                }
11✔
2733

2734
                var committedUpdate CommittedUpdate
8✔
2735
                err := committedUpdate.Decode(bytes.NewReader(v))
8✔
2736
                if err != nil {
8✔
2737
                        return err
×
2738
                }
×
2739
                committedUpdate.SeqNum = byteOrder.Uint16(k)
8✔
2740

8✔
2741
                cb(s, &committedUpdate)
8✔
2742

8✔
2743
                return nil
8✔
2744
        })
2745
        if err != nil {
60✔
2746
                return 0, err
×
2747
        }
×
2748

2749
        return numUpdates, nil
60✔
2750
}
2751

2752
// putClientSessionBody stores the body of the ClientSession (everything but the
2753
// CommittedUpdates and AckedUpdates).
2754
func putClientSessionBody(sessionBkt kvdb.RwBucket,
2755
        session *ClientSession) error {
1,157✔
2756

1,157✔
2757
        var b bytes.Buffer
1,157✔
2758
        err := session.Encode(&b)
1,157✔
2759
        if err != nil {
1,157✔
2760
                return err
×
2761
        }
×
2762

2763
        return sessionBkt.Put(cSessionBody, b.Bytes())
1,157✔
2764
}
2765

2766
// markSessionStatus updates the persisted state of the session to the new
2767
// status.
2768
func markSessionStatus(sessions kvdb.RwBucket, session *ClientSession,
2769
        status CSessionStatus) error {
3✔
2770

3✔
2771
        sessionBkt, err := sessions.CreateBucketIfNotExists(session.ID[:])
3✔
2772
        if err != nil {
3✔
2773
                return err
×
2774
        }
×
2775

2776
        session.Status = status
3✔
2777

3✔
2778
        return putClientSessionBody(sessionBkt, session)
3✔
2779
}
2780

2781
// getChanSummary loads a ClientChanSummary for the passed chanID.
2782
func getChanSummary(chanDetails kvdb.RBucket) (*ClientChanSummary, error) {
16✔
2783
        chanSummaryBytes := chanDetails.Get(cChannelSummary)
16✔
2784
        if chanSummaryBytes == nil {
16✔
2785
                return nil, ErrChannelNotRegistered
×
2786
        }
×
2787

2788
        var summary ClientChanSummary
16✔
2789
        err := summary.Decode(bytes.NewReader(chanSummaryBytes))
16✔
2790
        if err != nil {
16✔
2791
                return nil, err
×
2792
        }
×
2793

2794
        return &summary, nil
16✔
2795
}
2796

2797
// putChanSummary stores a ClientChanSummary for the passed chanID.
2798
func putChanSummary(chanDetails kvdb.RwBucket,
2799
        summary *ClientChanSummary) error {
67✔
2800

67✔
2801
        var b bytes.Buffer
67✔
2802
        err := summary.Encode(&b)
67✔
2803
        if err != nil {
67✔
2804
                return err
×
2805
        }
×
2806

2807
        return chanDetails.Put(cChannelSummary, b.Bytes())
67✔
2808
}
2809

2810
// getTower loads a Tower identified by its serialized tower id.
2811
func getTower(towers kvdb.RBucket, id []byte) (*Tower, error) {
215✔
2812
        towerBytes := towers.Get(id)
215✔
2813
        if towerBytes == nil {
217✔
2814
                return nil, ErrTowerNotFound
2✔
2815
        }
2✔
2816

2817
        var tower Tower
213✔
2818
        err := tower.Decode(bytes.NewReader(towerBytes))
213✔
2819
        if err != nil {
213✔
2820
                return nil, err
×
2821
        }
×
2822

2823
        tower.ID = TowerIDFromBytes(id)
213✔
2824

213✔
2825
        return &tower, nil
213✔
2826
}
2827

2828
// putTower stores a Tower identified by its serialized tower id.
2829
func putTower(towers kvdb.RwBucket, tower *Tower) error {
103✔
2830
        var b bytes.Buffer
103✔
2831
        err := tower.Encode(&b)
103✔
2832
        if err != nil {
103✔
2833
                return err
×
2834
        }
×
2835

2836
        return towers.Put(tower.ID.Bytes(), b.Bytes())
103✔
2837
}
2838

2839
// getDBChanID returns the db-assigned channel ID for the given real channel ID.
2840
// It returns both the uint64 and byte representation.
2841
func getDBChanID(chanDetailsBkt kvdb.RBucket, chanID lnwire.ChannelID) (uint64,
2842
        []byte, error) {
596✔
2843

596✔
2844
        chanDetails := chanDetailsBkt.NestedReadBucket(chanID[:])
596✔
2845
        if chanDetails == nil {
610✔
2846
                return 0, nil, ErrChannelNotRegistered
14✔
2847
        }
14✔
2848

2849
        idBytes := chanDetails.Get(cChanDBID)
582✔
2850
        if len(idBytes) == 0 {
582✔
2851
                return 0, nil, fmt.Errorf("no db-assigned ID found for "+
×
2852
                        "channel ID %s", chanID)
×
2853
        }
×
2854

2855
        id, err := readBigSize(idBytes)
582✔
2856
        if err != nil {
582✔
2857
                return 0, nil, err
×
2858
        }
×
2859

2860
        return id, idBytes, nil
582✔
2861
}
2862

2863
// getDBSessionID returns the db-assigned session ID for the given real session
2864
// ID. It returns both the uint64 and byte representation.
2865
func getDBSessionID(sessionsBkt kvdb.RBucket, sessionID SessionID) (uint64,
2866
        []byte, error) {
526✔
2867

526✔
2868
        sessionBkt := sessionsBkt.NestedReadBucket(sessionID[:])
526✔
2869
        if sessionBkt == nil {
526✔
2870
                return 0, nil, ErrClientSessionNotFound
×
2871
        }
×
2872

2873
        idBytes := sessionBkt.Get(cSessionDBID)
526✔
2874
        if len(idBytes) == 0 {
526✔
2875
                return 0, nil, fmt.Errorf("no db-assigned ID found for "+
×
2876
                        "session ID %s", sessionID)
×
2877
        }
×
2878

2879
        id, err := readBigSize(idBytes)
526✔
2880
        if err != nil {
526✔
2881
                return 0, nil, err
×
2882
        }
×
2883

2884
        return id, idBytes, nil
526✔
2885
}
2886

2887
// maybeUpdateMaxCommitHeight updates the given channel details bucket with the
2888
// given height if it is larger than the current max height stored for the
2889
// channel.
2890
func maybeUpdateMaxCommitHeight(tx kvdb.RwTx, backupID BackupID) error {
167,721✔
2891
        chanDetailsBkt := tx.ReadWriteBucket(cChanDetailsBkt)
167,721✔
2892
        if chanDetailsBkt == nil {
167,721✔
2893
                return ErrUninitializedDB
×
2894
        }
×
2895

2896
        // If an entry for this channel does not exist in the channel details
2897
        // bucket then we exit here as this means that the channel has been
2898
        // closed.
2899
        chanDetails := chanDetailsBkt.NestedReadWriteBucket(backupID.ChanID[:])
167,721✔
2900
        if chanDetails == nil {
334,478✔
2901
                return nil
166,757✔
2902
        }
166,757✔
2903

2904
        putHeight := func() error {
1,440✔
2905
                b, err := writeBigSize(backupID.CommitHeight)
476✔
2906
                if err != nil {
476✔
2907
                        return err
×
2908
                }
×
2909

2910
                return chanDetails.Put(
476✔
2911
                        cChanMaxCommitmentHeight, b,
476✔
2912
                )
476✔
2913
        }
2914

2915
        // Get current height.
2916
        heightBytes := chanDetails.Get(cChanMaxCommitmentHeight)
964✔
2917

964✔
2918
        // The height might have not been set yet, in which case
964✔
2919
        // we can just write the new height.
964✔
2920
        if len(heightBytes) == 0 {
1,023✔
2921
                return putHeight()
59✔
2922
        }
59✔
2923

2924
        // Otherwise, read in the current max commitment height for the channel.
2925
        currentHeight, err := readBigSize(heightBytes)
905✔
2926
        if err != nil {
905✔
2927
                return err
×
2928
        }
×
2929

2930
        // If the new height is not larger than the current persisted height,
2931
        // then there is nothing left for us to do.
2932
        if backupID.CommitHeight <= currentHeight {
1,393✔
2933
                return nil
488✔
2934
        }
488✔
2935

2936
        return putHeight()
417✔
2937
}
2938

2939
func getRealSessionID(sessIDIndexBkt kvdb.RBucket, dbID uint64) (*SessionID,
2940
        error) {
38✔
2941

38✔
2942
        dbIDBytes, err := writeBigSize(dbID)
38✔
2943
        if err != nil {
38✔
2944
                return nil, err
×
2945
        }
×
2946

2947
        sessIDBytes := sessIDIndexBkt.Get(dbIDBytes)
38✔
2948
        if len(sessIDBytes) != SessionIDSize {
38✔
2949
                return nil, fmt.Errorf("session ID not found")
×
2950
        }
×
2951

2952
        var sessID SessionID
38✔
2953
        copy(sessID[:], sessIDBytes)
38✔
2954

38✔
2955
        return &sessID, nil
38✔
2956
}
2957

2958
func getRealChannelID(chanIDIndexBkt kvdb.RBucket,
2959
        dbID uint64) (*lnwire.ChannelID, error) {
42✔
2960

42✔
2961
        dbIDBytes, err := writeBigSize(dbID)
42✔
2962
        if err != nil {
42✔
2963
                return nil, err
×
2964
        }
×
2965

2966
        chanIDBytes := chanIDIndexBkt.Get(dbIDBytes)
42✔
2967
        if len(chanIDBytes) != 32 { //nolint:gomnd
42✔
2968
                return nil, fmt.Errorf("channel ID not found")
×
2969
        }
×
2970

2971
        var chanIDS lnwire.ChannelID
42✔
2972
        copy(chanIDS[:], chanIDBytes)
42✔
2973

42✔
2974
        return &chanIDS, nil
42✔
2975
}
2976

2977
// writeBigSize will encode the given uint64 as a BigSize byte slice.
2978
func writeBigSize(i uint64) ([]byte, error) {
493,067✔
2979
        var b bytes.Buffer
493,067✔
2980
        err := tlv.WriteVarInt(&b, i, &[8]byte{})
493,067✔
2981
        if err != nil {
493,067✔
2982
                return nil, err
×
2983
        }
×
2984

2985
        return b.Bytes(), nil
493,067✔
2986
}
2987

2988
// readBigSize converts the given byte slice into a uint64 and assumes that the
2989
// bytes slice is using BigSize encoding.
2990
func readBigSize(b []byte) (uint64, error) {
819,290✔
2991
        r := bytes.NewReader(b)
819,290✔
2992
        i, err := tlv.ReadVarInt(r, &[8]byte{})
819,290✔
2993
        if err != nil {
819,290✔
2994
                return 0, err
×
2995
        }
×
2996

2997
        return i, nil
819,290✔
2998
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc