• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15561477203

10 Jun 2025 01:54PM UTC coverage: 58.351% (-10.1%) from 68.487%
15561477203

Pull #9356

github

web-flow
Merge 6440b25db into c6d6d4c0b
Pull Request #9356: lnrpc: add incoming/outgoing channel ids filter to forwarding history request

33 of 36 new or added lines in 2 files covered. (91.67%)

28366 existing lines in 455 files now uncovered.

97715 of 167461 relevant lines covered (58.35%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.45
/channeldb/db.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "fmt"
7
        "net"
8
        "os"
9
        "testing"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/go-errors/errors"
15
        mig "github.com/lightningnetwork/lnd/channeldb/migration"
16
        "github.com/lightningnetwork/lnd/channeldb/migration12"
17
        "github.com/lightningnetwork/lnd/channeldb/migration13"
18
        "github.com/lightningnetwork/lnd/channeldb/migration16"
19
        "github.com/lightningnetwork/lnd/channeldb/migration20"
20
        "github.com/lightningnetwork/lnd/channeldb/migration21"
21
        "github.com/lightningnetwork/lnd/channeldb/migration23"
22
        "github.com/lightningnetwork/lnd/channeldb/migration24"
23
        "github.com/lightningnetwork/lnd/channeldb/migration25"
24
        "github.com/lightningnetwork/lnd/channeldb/migration26"
25
        "github.com/lightningnetwork/lnd/channeldb/migration27"
26
        "github.com/lightningnetwork/lnd/channeldb/migration29"
27
        "github.com/lightningnetwork/lnd/channeldb/migration30"
28
        "github.com/lightningnetwork/lnd/channeldb/migration31"
29
        "github.com/lightningnetwork/lnd/channeldb/migration32"
30
        "github.com/lightningnetwork/lnd/channeldb/migration33"
31
        "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
32
        "github.com/lightningnetwork/lnd/clock"
33
        graphdb "github.com/lightningnetwork/lnd/graph/db"
34
        "github.com/lightningnetwork/lnd/invoices"
35
        "github.com/lightningnetwork/lnd/kvdb"
36
        "github.com/lightningnetwork/lnd/lnwire"
37
        "github.com/stretchr/testify/require"
38
)
39

40
// dbName is the file name used for the channel database file.
const dbName = "channel.db"
43

44
var (
	// ErrDryRunMigrationOK signals that a migration executed successfully,
	// but we intentionally did not commit the result.
	ErrDryRunMigrationOK = errors.New("dry run migration successful")

	// ErrFinalHtlcsBucketNotFound signals that the top-level final htlcs
	// bucket does not exist.
	ErrFinalHtlcsBucketNotFound = errors.New("final htlcs bucket not " +
		"found")

	// ErrFinalChannelBucketNotFound signals that the channel bucket for
	// final htlc outcomes does not exist.
	ErrFinalChannelBucketNotFound = errors.New("final htlcs channel " +
		"bucket not found")
)
59

60
// migration is a function which takes a prior outdated version of the database
61
// instances and mutates the key/bucket structure to arrive at a more
62
// up-to-date version of the database.
63
type migration func(tx kvdb.RwTx) error
64

65
// mandatoryVersion defines a db version that must be applied before the lnd
66
// starts.
67
type mandatoryVersion struct {
68
        number    uint32
69
        migration migration
70
}
71

72
// MigrationConfig is an interface combines the config interfaces of all
73
// optional migrations.
74
type MigrationConfig interface {
75
        migration30.MigrateRevLogConfig
76
}
77

78
// MigrationConfigImpl is a super set of all the various migration configs and
79
// an implementation of MigrationConfig.
80
type MigrationConfigImpl struct {
81
        migration30.MigrateRevLogConfigImpl
82
}
83

84
// optionalMigration defines an optional migration function. When a migration
85
// is optional, it usually involves a large scale of changes that might touch
86
// millions of keys. Due to OOM concern, the update cannot be safely done
87
// within one db transaction. Thus, for optional migrations, they must take the
88
// db backend and construct transactions as needed.
89
type optionalMigration func(db kvdb.Backend, cfg MigrationConfig) error
90

91
// optionalVersion defines a db version that can be optionally applied. When
92
// applying migrations, we must apply all the mandatory migrations first before
93
// attempting optional ones.
94
type optionalVersion struct {
95
        name      string
96
        migration optionalMigration
97
}
98

99
var (
100
        // dbVersions stores all mandatory versions of the database. If the
101
        // current version of the database doesn't match the latest version,
102
        // this list is used to retrieve all migration functions that need
103
        // to be applied to the current db.
104
        dbVersions = []mandatoryVersion{
105
                {
106
                        // The base DB version requires no migration.
107
                        number:    0,
108
                        migration: nil,
109
                },
110
                {
111
                        // The version of the database where two new indexes
112
                        // for the update time of node and channel updates were
113
                        // added.
114
                        number:    1,
115
                        migration: migration_01_to_11.MigrateNodeAndEdgeUpdateIndex,
116
                },
117
                {
118
                        // The DB version that added the invoice event time
119
                        // series.
120
                        number:    2,
121
                        migration: migration_01_to_11.MigrateInvoiceTimeSeries,
122
                },
123
                {
124
                        // The DB version that updated the embedded invoice in
125
                        // outgoing payments to match the new format.
126
                        number:    3,
127
                        migration: migration_01_to_11.MigrateInvoiceTimeSeriesOutgoingPayments,
128
                },
129
                {
130
                        // The version of the database where every channel
131
                        // always has two entries in the edges bucket. If
132
                        // a policy is unknown, this will be represented
133
                        // by a special byte sequence.
134
                        number:    4,
135
                        migration: migration_01_to_11.MigrateEdgePolicies,
136
                },
137
                {
138
                        // The DB version where we persist each attempt to send
139
                        // an HTLC to a payment hash, and track whether the
140
                        // payment is in-flight, succeeded, or failed.
141
                        number:    5,
142
                        migration: migration_01_to_11.PaymentStatusesMigration,
143
                },
144
                {
145
                        // The DB version that properly prunes stale entries
146
                        // from the edge update index.
147
                        number:    6,
148
                        migration: migration_01_to_11.MigratePruneEdgeUpdateIndex,
149
                },
150
                {
151
                        // The DB version that migrates the ChannelCloseSummary
152
                        // to a format where optional fields are indicated with
153
                        // boolean flags.
154
                        number:    7,
155
                        migration: migration_01_to_11.MigrateOptionalChannelCloseSummaryFields,
156
                },
157
                {
158
                        // The DB version that changes the gossiper's message
159
                        // store keys to account for the message's type and
160
                        // ShortChannelID.
161
                        number:    8,
162
                        migration: migration_01_to_11.MigrateGossipMessageStoreKeys,
163
                },
164
                {
165
                        // The DB version where the payments and payment
166
                        // statuses are moved to being stored in a combined
167
                        // bucket.
168
                        number:    9,
169
                        migration: migration_01_to_11.MigrateOutgoingPayments,
170
                },
171
                {
172
                        // The DB version where we started to store legacy
173
                        // payload information for all routes, as well as the
174
                        // optional TLV records.
175
                        number:    10,
176
                        migration: migration_01_to_11.MigrateRouteSerialization,
177
                },
178
                {
179
                        // Add invoice htlc and cltv delta fields.
180
                        number:    11,
181
                        migration: migration_01_to_11.MigrateInvoices,
182
                },
183
                {
184
                        // Migrate to TLV invoice bodies, add payment address
185
                        // and features, remove receipt.
186
                        number:    12,
187
                        migration: migration12.MigrateInvoiceTLV,
188
                },
189
                {
190
                        // Migrate to multi-path payments.
191
                        number:    13,
192
                        migration: migration13.MigrateMPP,
193
                },
194
                {
195
                        // Initialize payment address index and begin using it
196
                        // as the default index, falling back to payment hash
197
                        // index.
198
                        number:    14,
199
                        migration: mig.CreateTLB(payAddrIndexBucket),
200
                },
201
                {
202
                        // Initialize payment index bucket which will be used
203
                        // to index payments by sequence number. This index will
204
                        // be used to allow more efficient ListPayments queries.
205
                        number:    15,
206
                        migration: mig.CreateTLB(paymentsIndexBucket),
207
                },
208
                {
209
                        // Add our existing payments to the index bucket created
210
                        // in migration 15.
211
                        number:    16,
212
                        migration: migration16.MigrateSequenceIndex,
213
                },
214
                {
215
                        // Create a top level bucket which will store extra
216
                        // information about channel closes.
217
                        number:    17,
218
                        migration: mig.CreateTLB(closeSummaryBucket),
219
                },
220
                {
221
                        // Create a top level bucket which holds information
222
                        // about our peers.
223
                        number:    18,
224
                        migration: mig.CreateTLB(peersBucket),
225
                },
226
                {
227
                        // Create a top level bucket which holds outpoint
228
                        // information.
229
                        number:    19,
230
                        migration: mig.CreateTLB(outpointBucket),
231
                },
232
                {
233
                        // Migrate some data to the outpoint index.
234
                        number:    20,
235
                        migration: migration20.MigrateOutpointIndex,
236
                },
237
                {
238
                        // Migrate to length prefixed wire messages everywhere
239
                        // in the database.
240
                        number:    21,
241
                        migration: migration21.MigrateDatabaseWireMessages,
242
                },
243
                {
244
                        // Initialize set id index so that invoices can be
245
                        // queried by individual htlc sets.
246
                        number:    22,
247
                        migration: mig.CreateTLB(setIDIndexBucket),
248
                },
249
                {
250
                        number:    23,
251
                        migration: migration23.MigrateHtlcAttempts,
252
                },
253
                {
254
                        // Remove old forwarding packages of closed channels.
255
                        number:    24,
256
                        migration: migration24.MigrateFwdPkgCleanup,
257
                },
258
                {
259
                        // Save the initial local/remote balances in channel
260
                        // info.
261
                        number:    25,
262
                        migration: migration25.MigrateInitialBalances,
263
                },
264
                {
265
                        // Migrate the initial local/remote balance fields into
266
                        // tlv records.
267
                        number:    26,
268
                        migration: migration26.MigrateBalancesToTlvRecords,
269
                },
270
                {
271
                        // Patch the initial local/remote balance fields with
272
                        // empty values for historical channels.
273
                        number:    27,
274
                        migration: migration27.MigrateHistoricalBalances,
275
                },
276
                {
277
                        number:    28,
278
                        migration: mig.CreateTLB(chanIDBucket),
279
                },
280
                {
281
                        number:    29,
282
                        migration: migration29.MigrateChanID,
283
                },
284
                {
285
                        // Removes the "sweeper-last-tx" bucket. Although we
286
                        // do not have a mandatory version 30 we skip this
287
                        // version because its naming is already used for the
288
                        // first optional migration.
289
                        number:    31,
290
                        migration: migration31.DeleteLastPublishedTxTLB,
291
                },
292
                {
293
                        number:    32,
294
                        migration: migration32.MigrateMCRouteSerialisation,
295
                },
296
                {
297
                        number:    33,
298
                        migration: migration33.MigrateMCStoreNameSpacedResults,
299
                },
300
        }
301

302
        // optionalVersions stores all optional migrations that are applied
303
        // after dbVersions.
304
        //
305
        // NOTE: optional migrations must be fault-tolerant, and re-running
306
        // them on already migrated data must be a noop, which means the
307
        // migration must be able to determine its state.
308
        optionalVersions = []optionalVersion{
309
                {
310
                        name: "prune revocation log",
311
                        migration: func(db kvdb.Backend,
312
                                cfg MigrationConfig) error {
×
313

×
314
                                return migration30.MigrateRevocationLog(db, cfg)
×
315
                        },
×
316
                },
317
        }
318

319
        // Big endian is the preferred byte order, due to cursor scans over
320
        // integer keys iterating in order.
321
        byteOrder = binary.BigEndian
322

323
        // channelOpeningStateBucket is the database bucket used to store the
324
        // channelOpeningState for each channel that is currently in the process
325
        // of being opened.
326
        channelOpeningStateBucket = []byte("channelOpeningState")
327
)
328

329
// DB is the primary datastore for the lnd daemon. The database stores
330
// information related to nodes, routing data, open/closed channels, fee
331
// schedules, and reputation data.
332
type DB struct {
333
        kvdb.Backend
334

335
        // channelStateDB separates all DB operations on channel state.
336
        channelStateDB *ChannelStateDB
337

338
        dbPath                    string
339
        clock                     clock.Clock
340
        dryRun                    bool
341
        keepFailedPaymentAttempts bool
342
        storeFinalHtlcResolutions bool
343

344
        // noRevLogAmtData if true, means that commitment transaction amount
345
        // data should not be stored in the revocation log.
346
        noRevLogAmtData bool
347
}
348

349
// OpenForTesting opens or creates a channeldb to be used for tests. Any
350
// necessary schemas migrations due to updates will take place as necessary.
351
func OpenForTesting(t testing.TB, dbPath string,
UNCOV
352
        modifiers ...OptionModifier) *DB {
×
UNCOV
353

×
UNCOV
354
        backend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
×
UNCOV
355
                DBPath:            dbPath,
×
UNCOV
356
                DBFileName:        dbName,
×
UNCOV
357
                NoFreelistSync:    true,
×
UNCOV
358
                AutoCompact:       false,
×
UNCOV
359
                AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
×
UNCOV
360
                DBTimeout:         kvdb.DefaultDBTimeout,
×
UNCOV
361
        })
×
UNCOV
362
        require.NoError(t, err)
×
UNCOV
363

×
UNCOV
364
        db, err := CreateWithBackend(backend, modifiers...)
×
UNCOV
365
        require.NoError(t, err)
×
UNCOV
366

×
UNCOV
367
        db.dbPath = dbPath
×
UNCOV
368

×
UNCOV
369
        t.Cleanup(func() {
×
UNCOV
370
                require.NoError(t, db.Close())
×
UNCOV
371
        })
×
372

UNCOV
373
        return db
×
374
}
375

376
// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
377
// Any necessary schemas migrations due to updates will take place as necessary.
378
func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
379
        error) {
3✔
380

3✔
381
        opts := DefaultOptions()
3✔
382
        for _, modifier := range modifiers {
6✔
383
                modifier(&opts)
3✔
384
        }
3✔
385

386
        if !opts.NoMigration {
6✔
387
                if err := initChannelDB(backend); err != nil {
3✔
UNCOV
388
                        return nil, err
×
UNCOV
389
                }
×
390
        }
391

392
        chanDB := &DB{
3✔
393
                Backend: backend,
3✔
394
                channelStateDB: &ChannelStateDB{
3✔
395
                        linkNodeDB: &LinkNodeDB{
3✔
396
                                backend: backend,
3✔
397
                        },
3✔
398
                        backend: backend,
3✔
399
                },
3✔
400
                clock:                     opts.clock,
3✔
401
                dryRun:                    opts.dryRun,
3✔
402
                keepFailedPaymentAttempts: opts.keepFailedPaymentAttempts,
3✔
403
                storeFinalHtlcResolutions: opts.storeFinalHtlcResolutions,
3✔
404
                noRevLogAmtData:           opts.NoRevLogAmtData,
3✔
405
        }
3✔
406

3✔
407
        // Set the parent pointer (only used in tests).
3✔
408
        chanDB.channelStateDB.parent = chanDB
3✔
409

3✔
410
        // Synchronize the version of database and apply migrations if needed.
3✔
411
        if !opts.NoMigration {
6✔
412
                if err := chanDB.syncVersions(dbVersions); err != nil {
3✔
UNCOV
413
                        backend.Close()
×
UNCOV
414
                        return nil, err
×
UNCOV
415
                }
×
416

417
                // Grab the optional migration config.
418
                omc := opts.OptionalMiragtionConfig
3✔
419
                if err := chanDB.applyOptionalVersions(omc); err != nil {
3✔
420
                        backend.Close()
×
421
                        return nil, err
×
422
                }
×
423
        }
424

425
        return chanDB, nil
3✔
426
}
427

428
// Path returns the file path to the channel database.
UNCOV
429
func (d *DB) Path() string {
×
UNCOV
430
        return d.dbPath
×
UNCOV
431
}
×
432

433
var dbTopLevelBuckets = [][]byte{
434
        openChannelBucket,
435
        closedChannelBucket,
436
        forwardingLogBucket,
437
        fwdPackagesKey,
438
        invoiceBucket,
439
        payAddrIndexBucket,
440
        setIDIndexBucket,
441
        paymentsIndexBucket,
442
        peersBucket,
443
        nodeInfoBucket,
444
        metaBucket,
445
        closeSummaryBucket,
446
        outpointBucket,
447
        chanIDBucket,
448
        historicalChannelBucket,
449
}
450

451
// Wipe completely deletes all saved state within all used buckets within the
452
// database. The deletion is done in a single transaction, therefore this
453
// operation is fully atomic.
UNCOV
454
func (d *DB) Wipe() error {
×
UNCOV
455
        err := kvdb.Update(d, func(tx kvdb.RwTx) error {
×
UNCOV
456
                for _, tlb := range dbTopLevelBuckets {
×
UNCOV
457
                        err := tx.DeleteTopLevelBucket(tlb)
×
UNCOV
458
                        if err != nil && err != kvdb.ErrBucketNotFound {
×
459
                                return err
×
460
                        }
×
461
                }
UNCOV
462
                return nil
×
UNCOV
463
        }, func() {})
×
UNCOV
464
        if err != nil {
×
465
                return err
×
466
        }
×
467

UNCOV
468
        return initChannelDB(d.Backend)
×
469
}
470

471
// initChannelDB creates and initializes a fresh version of channeldb. In the
472
// case that the target path has not yet been created or doesn't yet exist, then
473
// the path is created. Additionally, all required top-level buckets used within
474
// the database are created.
475
func initChannelDB(db kvdb.Backend) error {
3✔
476
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
6✔
477
                // Check if DB was marked as inactive with a tomb stone.
3✔
478
                if err := EnsureNoTombstone(tx); err != nil {
3✔
UNCOV
479
                        return err
×
UNCOV
480
                }
×
481

482
                meta := &Meta{}
3✔
483
                // Check if DB is already initialized.
3✔
484
                err := FetchMeta(meta, tx)
3✔
485
                if err == nil {
6✔
486
                        return nil
3✔
487
                }
3✔
488

489
                for _, tlb := range dbTopLevelBuckets {
6✔
490
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
3✔
491
                                return err
×
492
                        }
×
493
                }
494

495
                meta.DbVersionNumber = getLatestDBVersion(dbVersions)
3✔
496
                return putMeta(meta, tx)
3✔
497
        }, func() {})
3✔
498
        if err != nil {
3✔
UNCOV
499
                return fmt.Errorf("unable to create new channeldb: %w", err)
×
UNCOV
500
        }
×
501

502
        return nil
3✔
503
}
504

505
// fileExists reports whether the file at path exists. It returns false only
// when os.Stat reports that the path does not exist; any other stat failure
// (for example a permission error) is treated as the file being present.
func fileExists(path string) bool {
	_, err := os.Stat(path)

	// os.IsNotExist(nil) is false, so a successful stat yields true.
	return !os.IsNotExist(err)
}
515

516
// ChannelStateDB is a database that keeps track of all channel state.
517
type ChannelStateDB struct {
518
        // linkNodeDB separates all DB operations on LinkNodes.
519
        linkNodeDB *LinkNodeDB
520

521
        // parent holds a pointer to the "main" channeldb.DB object. This is
522
        // only used for testing and should never be used in production code.
523
        // For testing use the ChannelStateDB.GetParentDB() function to retrieve
524
        // this pointer.
525
        parent *DB
526

527
        // backend points to the actual backend holding the channel state
528
        // database. This may be a real backend or a cache middleware.
529
        backend kvdb.Backend
530
}
531

532
// GetParentDB returns the "main" channeldb.DB object that is the owner of this
533
// ChannelStateDB instance. Use this function only in tests where passing around
534
// pointers makes testing less readable. Never to be used in production code!
UNCOV
535
func (c *ChannelStateDB) GetParentDB() *DB {
×
UNCOV
536
        return c.parent
×
UNCOV
537
}
×
538

539
// LinkNodeDB returns the current instance of the link node database.
540
func (c *ChannelStateDB) LinkNodeDB() *LinkNodeDB {
3✔
541
        return c.linkNodeDB
3✔
542
}
3✔
543

544
// FetchOpenChannels starts a new database transaction and returns all stored
545
// currently active/open channels associated with the target nodeID. In the case
546
// that no active channels are known to have been created with this node, then a
547
// zero-length slice is returned.
548
func (c *ChannelStateDB) FetchOpenChannels(nodeID *btcec.PublicKey) (
549
        []*OpenChannel, error) {
3✔
550

3✔
551
        var channels []*OpenChannel
3✔
552
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
553
                var err error
3✔
554
                channels, err = c.fetchOpenChannels(tx, nodeID)
3✔
555
                return err
3✔
556
        }, func() {
6✔
557
                channels = nil
3✔
558
        })
3✔
559

560
        return channels, err
3✔
561
}
562

563
// fetchOpenChannels uses and existing database transaction and returns all
564
// stored currently active/open channels associated with the target nodeID. In
565
// the case that no active channels are known to have been created with this
566
// node, then a zero-length slice is returned.
567
func (c *ChannelStateDB) fetchOpenChannels(tx kvdb.RTx,
568
        nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
3✔
569

3✔
570
        // Get the bucket dedicated to storing the metadata for open channels.
3✔
571
        openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
572
        if openChanBucket == nil {
3✔
573
                return nil, nil
×
574
        }
×
575

576
        // Within this top level bucket, fetch the bucket dedicated to storing
577
        // open channel data specific to the remote node.
578
        pub := nodeID.SerializeCompressed()
3✔
579
        nodeChanBucket := openChanBucket.NestedReadBucket(pub)
3✔
580
        if nodeChanBucket == nil {
6✔
581
                return nil, nil
3✔
582
        }
3✔
583

584
        // Next, we'll need to go down an additional layer in order to retrieve
585
        // the channels for each chain the node knows of.
586
        var channels []*OpenChannel
3✔
587
        err := nodeChanBucket.ForEach(func(chainHash, v []byte) error {
6✔
588
                // If there's a value, it's not a bucket so ignore it.
3✔
589
                if v != nil {
3✔
590
                        return nil
×
591
                }
×
592

593
                // If we've found a valid chainhash bucket, then we'll retrieve
594
                // that so we can extract all the channels.
595
                chainBucket := nodeChanBucket.NestedReadBucket(chainHash)
3✔
596
                if chainBucket == nil {
3✔
597
                        return fmt.Errorf("unable to read bucket for chain=%x",
×
598
                                chainHash[:])
×
599
                }
×
600

601
                // Finally, we both of the necessary buckets retrieved, fetch
602
                // all the active channels related to this node.
603
                nodeChannels, err := c.fetchNodeChannels(chainBucket)
3✔
604
                if err != nil {
3✔
605
                        return fmt.Errorf("unable to read channel for "+
×
606
                                "chain_hash=%x, node_key=%x: %v",
×
607
                                chainHash[:], pub, err)
×
608
                }
×
609

610
                channels = append(channels, nodeChannels...)
3✔
611
                return nil
3✔
612
        })
613

614
        return channels, err
3✔
615
}
616

617
// fetchNodeChannels retrieves all active channels from the target chainBucket
618
// which is under a node's dedicated channel bucket. This function is typically
619
// used to fetch all the active channels related to a particular node.
620
func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) (
621
        []*OpenChannel, error) {
3✔
622

3✔
623
        var channels []*OpenChannel
3✔
624

3✔
625
        // A node may have channels on several chains, so for each known chain,
3✔
626
        // we'll extract all the channels.
3✔
627
        err := chainBucket.ForEach(func(chanPoint, v []byte) error {
6✔
628
                // If there's a value, it's not a bucket so ignore it.
3✔
629
                if v != nil {
3✔
630
                        return nil
×
631
                }
×
632

633
                // Once we've found a valid channel bucket, we'll extract it
634
                // from the node's chain bucket.
635
                chanBucket := chainBucket.NestedReadBucket(chanPoint)
3✔
636

3✔
637
                var outPoint wire.OutPoint
3✔
638
                err := graphdb.ReadOutpoint(
3✔
639
                        bytes.NewReader(chanPoint), &outPoint,
3✔
640
                )
3✔
641
                if err != nil {
3✔
642
                        return err
×
643
                }
×
644
                oChannel, err := fetchOpenChannel(chanBucket, &outPoint)
3✔
645
                if err != nil {
3✔
646
                        return fmt.Errorf("unable to read channel data for "+
×
647
                                "chan_point=%v: %w", outPoint, err)
×
648
                }
×
649
                oChannel.Db = c
3✔
650

3✔
651
                channels = append(channels, oChannel)
3✔
652

3✔
653
                return nil
3✔
654
        })
655
        if err != nil {
3✔
656
                return nil, err
×
657
        }
×
658

659
        return channels, nil
3✔
660
}
661

662
// FetchChannel attempts to locate a channel specified by the passed channel
663
// point. If the channel cannot be found, then an error will be returned.
664
func (c *ChannelStateDB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel,
665
        error) {
3✔
666

3✔
667
        var targetChanPoint bytes.Buffer
3✔
668
        err := graphdb.WriteOutpoint(&targetChanPoint, &chanPoint)
3✔
669
        if err != nil {
3✔
670
                return nil, err
×
671
        }
×
672

673
        targetChanPointBytes := targetChanPoint.Bytes()
3✔
674
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
3✔
675
                error) {
6✔
676

3✔
677
                return targetChanPointBytes, &chanPoint, nil
3✔
678
        }
3✔
679

680
        return c.channelScanner(nil, selector)
3✔
681
}
682

683
// FetchChannelByID attempts to locate a channel specified by the passed channel
684
// ID. If the channel cannot be found, then an error will be returned.
685
// Optionally an existing db tx can be supplied.
686
func (c *ChannelStateDB) FetchChannelByID(tx kvdb.RTx, id lnwire.ChannelID) (
687
        *OpenChannel, error) {
3✔
688

3✔
689
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
3✔
690
                error) {
6✔
691

3✔
692
                var (
3✔
693
                        targetChanPointBytes []byte
3✔
694
                        targetChanPoint      *wire.OutPoint
3✔
695

3✔
696
                        // errChanFound is used to signal that the channel has
3✔
697
                        // been found so that iteration through the DB buckets
3✔
698
                        // can stop.
3✔
699
                        errChanFound = errors.New("channel found")
3✔
700
                )
3✔
701
                err := chainBkt.ForEach(func(k, _ []byte) error {
6✔
702
                        var outPoint wire.OutPoint
3✔
703
                        err := graphdb.ReadOutpoint(
3✔
704
                                bytes.NewReader(k), &outPoint,
3✔
705
                        )
3✔
706
                        if err != nil {
3✔
707
                                return err
×
708
                        }
×
709

710
                        chanID := lnwire.NewChanIDFromOutPoint(outPoint)
3✔
711
                        if chanID != id {
3✔
UNCOV
712
                                return nil
×
UNCOV
713
                        }
×
714

715
                        targetChanPoint = &outPoint
3✔
716
                        targetChanPointBytes = k
3✔
717

3✔
718
                        return errChanFound
3✔
719
                })
720
                if err != nil && !errors.Is(err, errChanFound) {
3✔
721
                        return nil, nil, err
×
722
                }
×
723
                if targetChanPoint == nil {
3✔
UNCOV
724
                        return nil, nil, ErrChannelNotFound
×
UNCOV
725
                }
×
726

727
                return targetChanPointBytes, targetChanPoint, nil
3✔
728
        }
729

730
        return c.channelScanner(tx, selector)
3✔
731
}
732

733
// ChanCount is a per-peer channel tally that the server uses when determining
// access control.
type ChanCount struct {
	// HasOpenOrClosedChan is true when the peer has at least one
	// non-pending open channel or closed channel with us.
	HasOpenOrClosedChan bool

	// PendingOpenCount is the number of pending-open channels with the
	// peer.
	PendingOpenCount uint64
}
738

739
// FetchPermAndTempPeers returns a map where the key is the remote node's
740
// public key and the value is a struct that has a tally of the pending-open
741
// channels and whether the peer has an open or closed channel with us.
742
func (c *ChannelStateDB) FetchPermAndTempPeers(
743
        chainHash []byte) (map[string]ChanCount, error) {
3✔
744

3✔
745
        peerCounts := make(map[string]ChanCount)
3✔
746

3✔
747
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
748
                openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
749
                if openChanBucket == nil {
3✔
750
                        return ErrNoChanDBExists
×
751
                }
×
752

753
                openChanErr := openChanBucket.ForEach(func(nodePub,
3✔
754
                        v []byte) error {
6✔
755

3✔
756
                        // If there is a value, this is not a bucket.
3✔
757
                        if v != nil {
3✔
758
                                return nil
×
759
                        }
×
760

761
                        nodeChanBucket := openChanBucket.NestedReadBucket(
3✔
762
                                nodePub,
3✔
763
                        )
3✔
764
                        if nodeChanBucket == nil {
3✔
765
                                return nil
×
766
                        }
×
767

768
                        chainBucket := nodeChanBucket.NestedReadBucket(
3✔
769
                                chainHash,
3✔
770
                        )
3✔
771
                        if chainBucket == nil {
3✔
772
                                return fmt.Errorf("no chain bucket exists")
×
773
                        }
×
774

775
                        var isPermPeer bool
3✔
776
                        var pendingOpenCount uint64
3✔
777

3✔
778
                        internalErr := chainBucket.ForEach(func(chanPoint,
3✔
779
                                val []byte) error {
6✔
780

3✔
781
                                // If there is a value, this is not a bucket.
3✔
782
                                if val != nil {
3✔
783
                                        return nil
×
784
                                }
×
785

786
                                chanBucket := chainBucket.NestedReadBucket(
3✔
787
                                        chanPoint,
3✔
788
                                )
3✔
789
                                if chanBucket == nil {
3✔
790
                                        return nil
×
791
                                }
×
792

793
                                var op wire.OutPoint
3✔
794
                                readErr := graphdb.ReadOutpoint(
3✔
795
                                        bytes.NewReader(chanPoint), &op,
3✔
796
                                )
3✔
797
                                if readErr != nil {
3✔
798
                                        return readErr
×
799
                                }
×
800

801
                                // We need to go through each channel and look
802
                                // at the IsPending status.
803
                                openChan, err := fetchOpenChannel(
3✔
804
                                        chanBucket, &op,
3✔
805
                                )
3✔
806
                                if err != nil {
3✔
807
                                        return err
×
808
                                }
×
809

810
                                if openChan.IsPending {
6✔
811
                                        // Add to the pending-open count since
3✔
812
                                        // this is a temp peer.
3✔
813
                                        pendingOpenCount++
3✔
814
                                        return nil
3✔
815
                                }
3✔
816

817
                                // Since IsPending is false, this is a perm
818
                                // peer.
819
                                isPermPeer = true
3✔
820

3✔
821
                                return nil
3✔
822
                        })
823
                        if internalErr != nil {
3✔
824
                                return internalErr
×
825
                        }
×
826

827
                        peerCount := ChanCount{
3✔
828
                                HasOpenOrClosedChan: isPermPeer,
3✔
829
                                PendingOpenCount:    pendingOpenCount,
3✔
830
                        }
3✔
831
                        peerCounts[string(nodePub)] = peerCount
3✔
832

3✔
833
                        return nil
3✔
834
                })
835
                if openChanErr != nil {
3✔
836
                        return openChanErr
×
837
                }
×
838

839
                // Now check the closed channel bucket.
840
                historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
3✔
841
                if historicalChanBucket == nil {
3✔
842
                        return ErrNoHistoricalBucket
×
843
                }
×
844

845
                historicalErr := historicalChanBucket.ForEach(func(chanPoint,
3✔
846
                        v []byte) error {
6✔
847
                        // Parse each nested bucket and the chanInfoKey to get
3✔
848
                        // the IsPending bool. This determines whether the
3✔
849
                        // peer is protected or not.
3✔
850
                        if v != nil {
3✔
851
                                // This is not a bucket. This is currently not
×
852
                                // possible.
×
853
                                return nil
×
854
                        }
×
855

856
                        chanBucket := historicalChanBucket.NestedReadBucket(
3✔
857
                                chanPoint,
3✔
858
                        )
3✔
859
                        if chanBucket == nil {
3✔
860
                                // This is not possible.
×
861
                                return fmt.Errorf("no historical channel " +
×
862
                                        "bucket exists")
×
863
                        }
×
864

865
                        var op wire.OutPoint
3✔
866
                        readErr := graphdb.ReadOutpoint(
3✔
867
                                bytes.NewReader(chanPoint), &op,
3✔
868
                        )
3✔
869
                        if readErr != nil {
3✔
870
                                return readErr
×
871
                        }
×
872

873
                        // This channel is closed, but the structure of the
874
                        // historical bucket is the same. This is by design,
875
                        // which means we can call fetchOpenChannel.
876
                        channel, fetchErr := fetchOpenChannel(chanBucket, &op)
3✔
877
                        if fetchErr != nil {
3✔
878
                                return fetchErr
×
879
                        }
×
880

881
                        // Only include this peer in the protected class if
882
                        // the closing transaction confirmed. Note that
883
                        // CloseChannel can be called in the funding manager
884
                        // while IsPending is true which is why we need this
885
                        // special-casing to not count premature funding
886
                        // manager calls to CloseChannel.
887
                        if !channel.IsPending {
6✔
888
                                // Fetch the public key of the remote node. We
3✔
889
                                // need to use the string-ified serialized,
3✔
890
                                // compressed bytes as the key.
3✔
891
                                remotePub := channel.IdentityPub
3✔
892
                                remoteSer := remotePub.SerializeCompressed()
3✔
893
                                remoteKey := string(remoteSer)
3✔
894

3✔
895
                                count, exists := peerCounts[remoteKey]
3✔
896
                                if exists {
6✔
897
                                        count.HasOpenOrClosedChan = true
3✔
898
                                        peerCounts[remoteKey] = count
3✔
899
                                } else {
3✔
900
                                        peerCount := ChanCount{
×
901
                                                HasOpenOrClosedChan: true,
×
902
                                        }
×
903
                                        peerCounts[remoteKey] = peerCount
×
904
                                }
×
905
                        }
906

907
                        return nil
3✔
908
                })
909
                if historicalErr != nil {
3✔
910
                        return historicalErr
×
911
                }
×
912

913
                return nil
3✔
914
        }, func() {
3✔
915
                clear(peerCounts)
3✔
916
        })
3✔
917

918
        return peerCounts, err
3✔
919
}
920

921
// channelSelector describes a function that takes a chain-hash bucket from
922
// within the open-channel DB and returns the wanted channel point bytes, and
923
// channel point. It must return the ErrChannelNotFound error if the wanted
924
// channel is not in the given bucket.
925
type channelSelector func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
926
        error)
927

928
// channelScanner will traverse the DB to each chain-hash bucket of each node
929
// pub-key bucket in the open-channel-bucket. The chanSelector will then be used
930
// to fetch the wanted channel outpoint from the chain bucket.
931
func (c *ChannelStateDB) channelScanner(tx kvdb.RTx,
932
        chanSelect channelSelector) (*OpenChannel, error) {
3✔
933

3✔
934
        var (
3✔
935
                targetChan *OpenChannel
3✔
936

3✔
937
                // errChanFound is used to signal that the channel has been
3✔
938
                // found so that iteration through the DB buckets can stop.
3✔
939
                errChanFound = errors.New("channel found")
3✔
940
        )
3✔
941

3✔
942
        // chanScan will traverse the following bucket structure:
3✔
943
        //  * nodePub => chainHash => chanPoint
3✔
944
        //
3✔
945
        // At each level we go one further, ensuring that we're traversing the
3✔
946
        // proper key (that's actually a bucket). By only reading the bucket
3✔
947
        // structure and skipping fully decoding each channel, we save a good
3✔
948
        // bit of CPU as we don't need to do things like decompress public
3✔
949
        // keys.
3✔
950
        chanScan := func(tx kvdb.RTx) error {
6✔
951
                // Get the bucket dedicated to storing the metadata for open
3✔
952
                // channels.
3✔
953
                openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
954
                if openChanBucket == nil {
3✔
955
                        return ErrNoActiveChannels
×
956
                }
×
957

958
                // Within the node channel bucket, are the set of node pubkeys
959
                // we have channels with, we don't know the entire set, so we'll
960
                // check them all.
961
                return openChanBucket.ForEach(func(nodePub, v []byte) error {
6✔
962
                        // Ensure that this is a key the same size as a pubkey,
3✔
963
                        // and also that it leads directly to a bucket.
3✔
964
                        if len(nodePub) != 33 || v != nil {
3✔
965
                                return nil
×
966
                        }
×
967

968
                        nodeChanBucket := openChanBucket.NestedReadBucket(
3✔
969
                                nodePub,
3✔
970
                        )
3✔
971
                        if nodeChanBucket == nil {
3✔
972
                                return nil
×
973
                        }
×
974

975
                        // The next layer down is all the chains that this node
976
                        // has channels on with us.
977
                        return nodeChanBucket.ForEach(func(chainHash,
3✔
978
                                v []byte) error {
6✔
979

3✔
980
                                // If there's a value, it's not a bucket so
3✔
981
                                // ignore it.
3✔
982
                                if v != nil {
3✔
983
                                        return nil
×
984
                                }
×
985

986
                                chainBucket := nodeChanBucket.NestedReadBucket(
3✔
987
                                        chainHash,
3✔
988
                                )
3✔
989
                                if chainBucket == nil {
3✔
990
                                        return fmt.Errorf("unable to read "+
×
991
                                                "bucket for chain=%x",
×
992
                                                chainHash)
×
993
                                }
×
994

995
                                // Finally, we reach the leaf bucket that stores
996
                                // all the chanPoints for this node.
997
                                targetChanBytes, chanPoint, err := chanSelect(
3✔
998
                                        chainBucket,
3✔
999
                                )
3✔
1000
                                if errors.Is(err, ErrChannelNotFound) {
3✔
UNCOV
1001
                                        return nil
×
1002
                                } else if err != nil {
3✔
1003
                                        return err
×
1004
                                }
×
1005

1006
                                chanBucket := chainBucket.NestedReadBucket(
3✔
1007
                                        targetChanBytes,
3✔
1008
                                )
3✔
1009
                                if chanBucket == nil {
6✔
1010
                                        return nil
3✔
1011
                                }
3✔
1012

1013
                                channel, err := fetchOpenChannel(
3✔
1014
                                        chanBucket, chanPoint,
3✔
1015
                                )
3✔
1016
                                if err != nil {
3✔
1017
                                        return err
×
1018
                                }
×
1019

1020
                                targetChan = channel
3✔
1021
                                targetChan.Db = c
3✔
1022

3✔
1023
                                return errChanFound
3✔
1024
                        })
1025
                })
1026
        }
1027

1028
        var err error
3✔
1029
        if tx == nil {
6✔
1030
                err = kvdb.View(c.backend, chanScan, func() {})
6✔
1031
        } else {
×
1032
                err = chanScan(tx)
×
1033
        }
×
1034
        if err != nil && !errors.Is(err, errChanFound) {
3✔
1035
                return nil, err
×
1036
        }
×
1037

1038
        if targetChan != nil {
6✔
1039
                return targetChan, nil
3✔
1040
        }
3✔
1041

1042
        // If we can't find the channel, then we return with an error, as we
1043
        // have nothing to back up.
1044
        return nil, ErrChannelNotFound
3✔
1045
}
1046

1047
// FetchAllChannels attempts to retrieve all open channels currently stored
1048
// within the database, including pending open, fully open and channels waiting
1049
// for a closing transaction to confirm.
1050
func (c *ChannelStateDB) FetchAllChannels() ([]*OpenChannel, error) {
3✔
1051
        return fetchChannels(c)
3✔
1052
}
3✔
1053

1054
// FetchAllOpenChannels will return all channels that have the funding
1055
// transaction confirmed, and is not waiting for a closing transaction to be
1056
// confirmed.
1057
func (c *ChannelStateDB) FetchAllOpenChannels() ([]*OpenChannel, error) {
3✔
1058
        return fetchChannels(
3✔
1059
                c,
3✔
1060
                pendingChannelFilter(false),
3✔
1061
                waitingCloseFilter(false),
3✔
1062
        )
3✔
1063
}
3✔
1064

1065
// FetchPendingChannels will return channels that have completed the process of
1066
// generating and broadcasting funding transactions, but whose funding
1067
// transactions have yet to be confirmed on the blockchain.
1068
func (c *ChannelStateDB) FetchPendingChannels() ([]*OpenChannel, error) {
3✔
1069
        return fetchChannels(c,
3✔
1070
                pendingChannelFilter(true),
3✔
1071
                waitingCloseFilter(false),
3✔
1072
        )
3✔
1073
}
3✔
1074

1075
// FetchWaitingCloseChannels will return all channels that have been opened,
1076
// but are now waiting for a closing transaction to be confirmed.
1077
//
1078
// NOTE: This includes channels that are also pending to be opened.
1079
func (c *ChannelStateDB) FetchWaitingCloseChannels() ([]*OpenChannel, error) {
3✔
1080
        return fetchChannels(
3✔
1081
                c, waitingCloseFilter(true),
3✔
1082
        )
3✔
1083
}
3✔
1084

1085
// fetchChannelsFilter applies a filter to channels retrieved in fetchchannels.
1086
// A set of filters can be combined to filter across multiple dimensions.
1087
type fetchChannelsFilter func(channel *OpenChannel) bool
1088

1089
// pendingChannelFilter returns a filter based on whether channels are pending
1090
// (ie, their funding transaction still needs to confirm). If pending is false,
1091
// channels with confirmed funding transactions are returned.
1092
func pendingChannelFilter(pending bool) fetchChannelsFilter {
3✔
1093
        return func(channel *OpenChannel) bool {
6✔
1094
                return channel.IsPending == pending
3✔
1095
        }
3✔
1096
}
1097

1098
// waitingCloseFilter returns a filter which filters channels based on whether
1099
// they are awaiting the confirmation of their closing transaction. If waiting
1100
// close is true, channels that have had their closing tx broadcast are
1101
// included. If it is false, channels that are not awaiting confirmation of
1102
// their close transaction are returned.
1103
func waitingCloseFilter(waitingClose bool) fetchChannelsFilter {
3✔
1104
        return func(channel *OpenChannel) bool {
6✔
1105
                // If the channel is in any other state than Default,
3✔
1106
                // then it means it is waiting to be closed.
3✔
1107
                channelWaitingClose :=
3✔
1108
                        channel.ChanStatus() != ChanStatusDefault
3✔
1109

3✔
1110
                // Include the channel if it matches the value for
3✔
1111
                // waiting close that we are filtering on.
3✔
1112
                return channelWaitingClose == waitingClose
3✔
1113
        }
3✔
1114
}
1115

1116
// fetchChannels attempts to retrieve channels currently stored in the
1117
// database. It takes a set of filters which are applied to each channel to
1118
// obtain a set of channels with the desired set of properties. Only channels
1119
// which have a true value returned for *all* of the filters will be returned.
1120
// If no filters are provided, every channel in the open channels bucket will
1121
// be returned.
1122
func fetchChannels(c *ChannelStateDB, filters ...fetchChannelsFilter) (
1123
        []*OpenChannel, error) {
3✔
1124

3✔
1125
        var channels []*OpenChannel
3✔
1126

3✔
1127
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1128
                // Get the bucket dedicated to storing the metadata for open
3✔
1129
                // channels.
3✔
1130
                openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
1131
                if openChanBucket == nil {
3✔
1132
                        return ErrNoActiveChannels
×
1133
                }
×
1134

1135
                // Next, fetch the bucket dedicated to storing metadata related
1136
                // to all nodes. All keys within this bucket are the serialized
1137
                // public keys of all our direct counterparties.
1138
                nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
3✔
1139
                if nodeMetaBucket == nil {
3✔
1140
                        return fmt.Errorf("node bucket not created")
×
1141
                }
×
1142

1143
                // Finally for each node public key in the bucket, fetch all
1144
                // the channels related to this particular node.
1145
                return nodeMetaBucket.ForEach(func(k, v []byte) error {
6✔
1146
                        nodeChanBucket := openChanBucket.NestedReadBucket(k)
3✔
1147
                        if nodeChanBucket == nil {
3✔
1148
                                return nil
×
1149
                        }
×
1150

1151
                        return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
6✔
1152
                                // If there's a value, it's not a bucket so
3✔
1153
                                // ignore it.
3✔
1154
                                if v != nil {
3✔
1155
                                        return nil
×
1156
                                }
×
1157

1158
                                // If we've found a valid chainhash bucket,
1159
                                // then we'll retrieve that so we can extract
1160
                                // all the channels.
1161
                                chainBucket := nodeChanBucket.NestedReadBucket(
3✔
1162
                                        chainHash,
3✔
1163
                                )
3✔
1164
                                if chainBucket == nil {
3✔
1165
                                        return fmt.Errorf("unable to read "+
×
1166
                                                "bucket for chain=%x", chainHash[:])
×
1167
                                }
×
1168

1169
                                nodeChans, err := c.fetchNodeChannels(chainBucket)
3✔
1170
                                if err != nil {
3✔
1171
                                        return fmt.Errorf("unable to read "+
×
1172
                                                "channel for chain_hash=%x, "+
×
1173
                                                "node_key=%x: %v", chainHash[:], k, err)
×
1174
                                }
×
1175
                                for _, channel := range nodeChans {
6✔
1176
                                        // includeChannel indicates whether the channel
3✔
1177
                                        // meets the criteria specified by our filters.
3✔
1178
                                        includeChannel := true
3✔
1179

3✔
1180
                                        // Run through each filter and check whether the
3✔
1181
                                        // channel should be included.
3✔
1182
                                        for _, f := range filters {
6✔
1183
                                                // If the channel fails the filter, set
3✔
1184
                                                // includeChannel to false and don't bother
3✔
1185
                                                // checking the remaining filters.
3✔
1186
                                                if !f(channel) {
6✔
1187
                                                        includeChannel = false
3✔
1188
                                                        break
3✔
1189
                                                }
1190
                                        }
1191

1192
                                        // If the channel passed every filter, include it in
1193
                                        // our set of channels.
1194
                                        if includeChannel {
6✔
1195
                                                channels = append(channels, channel)
3✔
1196
                                        }
3✔
1197
                                }
1198
                                return nil
3✔
1199
                        })
1200

1201
                })
1202
        }, func() {
3✔
1203
                channels = nil
3✔
1204
        })
3✔
1205
        if err != nil {
3✔
1206
                return nil, err
×
1207
        }
×
1208

1209
        return channels, nil
3✔
1210
}
1211

1212
// FetchClosedChannels attempts to fetch all closed channels from the database.
1213
// The pendingOnly bool toggles if channels that aren't yet fully closed should
1214
// be returned in the response or not. When a channel was cooperatively closed,
1215
// it becomes fully closed after a single confirmation.  When a channel was
1216
// forcibly closed, it will become fully closed after _all_ the pending funds
1217
// (if any) have been swept.
1218
func (c *ChannelStateDB) FetchClosedChannels(pendingOnly bool) (
1219
        []*ChannelCloseSummary, error) {
3✔
1220

3✔
1221
        var chanSummaries []*ChannelCloseSummary
3✔
1222

3✔
1223
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1224
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1225
                if closeBucket == nil {
3✔
1226
                        return ErrNoClosedChannels
×
1227
                }
×
1228

1229
                return closeBucket.ForEach(func(chanID []byte, summaryBytes []byte) error {
6✔
1230
                        summaryReader := bytes.NewReader(summaryBytes)
3✔
1231
                        chanSummary, err := deserializeCloseChannelSummary(summaryReader)
3✔
1232
                        if err != nil {
3✔
1233
                                return err
×
1234
                        }
×
1235

1236
                        // If the query specified to only include pending
1237
                        // channels, then we'll skip any channels which aren't
1238
                        // currently pending.
1239
                        if !chanSummary.IsPending && pendingOnly {
6✔
1240
                                return nil
3✔
1241
                        }
3✔
1242

1243
                        chanSummaries = append(chanSummaries, chanSummary)
3✔
1244
                        return nil
3✔
1245
                })
1246
        }, func() {
3✔
1247
                chanSummaries = nil
3✔
1248
        }); err != nil {
3✔
1249
                return nil, err
×
1250
        }
×
1251

1252
        return chanSummaries, nil
3✔
1253
}
1254

1255
// ErrClosedChannelNotFound is returned when no close summary for the requested
// channel exists in the channeldb.
var ErrClosedChannelNotFound = errors.New(
	"unable to find closed channel summary",
)
1258

1259
// FetchClosedChannel queries for a channel close summary using the channel
1260
// point of the channel in question.
1261
func (c *ChannelStateDB) FetchClosedChannel(chanID *wire.OutPoint) (
1262
        *ChannelCloseSummary, error) {
3✔
1263

3✔
1264
        var chanSummary *ChannelCloseSummary
3✔
1265
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1266
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1267
                if closeBucket == nil {
3✔
1268
                        return ErrClosedChannelNotFound
×
1269
                }
×
1270

1271
                var b bytes.Buffer
3✔
1272
                var err error
3✔
1273
                if err = graphdb.WriteOutpoint(&b, chanID); err != nil {
3✔
1274
                        return err
×
1275
                }
×
1276

1277
                summaryBytes := closeBucket.Get(b.Bytes())
3✔
1278
                if summaryBytes == nil {
3✔
UNCOV
1279
                        return ErrClosedChannelNotFound
×
UNCOV
1280
                }
×
1281

1282
                summaryReader := bytes.NewReader(summaryBytes)
3✔
1283
                chanSummary, err = deserializeCloseChannelSummary(summaryReader)
3✔
1284

3✔
1285
                return err
3✔
1286
        }, func() {
3✔
1287
                chanSummary = nil
3✔
1288
        }); err != nil {
3✔
UNCOV
1289
                return nil, err
×
UNCOV
1290
        }
×
1291

1292
        return chanSummary, nil
3✔
1293
}
1294

1295
// FetchClosedChannelForID queries for a channel close summary using the
1296
// channel ID of the channel in question.
1297
func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) (
1298
        *ChannelCloseSummary, error) {
3✔
1299

3✔
1300
        var chanSummary *ChannelCloseSummary
3✔
1301
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1302
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1303
                if closeBucket == nil {
3✔
1304
                        return ErrClosedChannelNotFound
×
1305
                }
×
1306

1307
                // The first 30 bytes of the channel ID and outpoint will be
1308
                // equal.
1309
                cursor := closeBucket.ReadCursor()
3✔
1310
                op, c := cursor.Seek(cid[:30])
3✔
1311

3✔
1312
                // We scan over all possible candidates for this channel ID.
3✔
1313
                for ; op != nil && bytes.Compare(cid[:30], op[:30]) <= 0; op, c = cursor.Next() {
6✔
1314
                        var outPoint wire.OutPoint
3✔
1315
                        err := graphdb.ReadOutpoint(
3✔
1316
                                bytes.NewReader(op), &outPoint,
3✔
1317
                        )
3✔
1318
                        if err != nil {
3✔
1319
                                return err
×
1320
                        }
×
1321

1322
                        // If the found outpoint does not correspond to this
1323
                        // channel ID, we continue.
1324
                        if !cid.IsChanPoint(&outPoint) {
3✔
UNCOV
1325
                                continue
×
1326
                        }
1327

1328
                        // Deserialize the close summary and return.
1329
                        r := bytes.NewReader(c)
3✔
1330
                        chanSummary, err = deserializeCloseChannelSummary(r)
3✔
1331
                        if err != nil {
3✔
1332
                                return err
×
1333
                        }
×
1334

1335
                        return nil
3✔
1336
                }
1337
                return ErrClosedChannelNotFound
3✔
1338
        }, func() {
3✔
1339
                chanSummary = nil
3✔
1340
        }); err != nil {
6✔
1341
                return nil, err
3✔
1342
        }
3✔
1343

1344
        return chanSummary, nil
3✔
1345
}
1346

1347
// MarkChanFullyClosed marks a channel as fully closed within the database. A
1348
// channel should be marked as fully closed if the channel was initially
1349
// cooperatively closed and it's reached a single confirmation, or after all
1350
// the pending funds in a channel that has been forcibly closed have been
1351
// swept.
1352
func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
3✔
1353
        var (
3✔
1354
                openChannels  []*OpenChannel
3✔
1355
                pruneLinkNode *btcec.PublicKey
3✔
1356
        )
3✔
1357
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1358
                var b bytes.Buffer
3✔
1359
                if err := graphdb.WriteOutpoint(&b, chanPoint); err != nil {
3✔
1360
                        return err
×
1361
                }
×
1362

1363
                chanID := b.Bytes()
3✔
1364

3✔
1365
                closedChanBucket, err := tx.CreateTopLevelBucket(
3✔
1366
                        closedChannelBucket,
3✔
1367
                )
3✔
1368
                if err != nil {
3✔
1369
                        return err
×
1370
                }
×
1371

1372
                chanSummaryBytes := closedChanBucket.Get(chanID)
3✔
1373
                if chanSummaryBytes == nil {
3✔
1374
                        return fmt.Errorf("no closed channel for "+
×
1375
                                "chan_point=%v found", chanPoint)
×
1376
                }
×
1377

1378
                chanSummaryReader := bytes.NewReader(chanSummaryBytes)
3✔
1379
                chanSummary, err := deserializeCloseChannelSummary(
3✔
1380
                        chanSummaryReader,
3✔
1381
                )
3✔
1382
                if err != nil {
3✔
1383
                        return err
×
1384
                }
×
1385

1386
                chanSummary.IsPending = false
3✔
1387

3✔
1388
                var newSummary bytes.Buffer
3✔
1389
                err = serializeChannelCloseSummary(&newSummary, chanSummary)
3✔
1390
                if err != nil {
3✔
1391
                        return err
×
1392
                }
×
1393

1394
                err = closedChanBucket.Put(chanID, newSummary.Bytes())
3✔
1395
                if err != nil {
3✔
1396
                        return err
×
1397
                }
×
1398

1399
                // Now that the channel is closed, we'll check if we have any
1400
                // other open channels with this peer. If we don't we'll
1401
                // garbage collect it to ensure we don't establish persistent
1402
                // connections to peers without open channels.
1403
                pruneLinkNode = chanSummary.RemotePub
3✔
1404
                openChannels, err = c.fetchOpenChannels(
3✔
1405
                        tx, pruneLinkNode,
3✔
1406
                )
3✔
1407
                if err != nil {
3✔
1408
                        return fmt.Errorf("unable to fetch open channels for "+
×
1409
                                "peer %x: %v",
×
1410
                                pruneLinkNode.SerializeCompressed(), err)
×
1411
                }
×
1412

1413
                return nil
3✔
1414
        }, func() {
3✔
1415
                openChannels = nil
3✔
1416
                pruneLinkNode = nil
3✔
1417
        })
3✔
1418
        if err != nil {
3✔
1419
                return err
×
1420
        }
×
1421

1422
        // Decide whether we want to remove the link node, based upon the number
1423
        // of still open channels.
1424
        return c.pruneLinkNode(openChannels, pruneLinkNode)
3✔
1425
}
1426

1427
// pruneLinkNode determines whether we should garbage collect a link node from
1428
// the database due to no longer having any open channels with it. If there are
1429
// any left, then this acts as a no-op.
1430
func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
1431
        remotePub *btcec.PublicKey) error {
3✔
1432

3✔
1433
        if len(openChannels) > 0 {
6✔
1434
                return nil
3✔
1435
        }
3✔
1436

1437
        log.Infof("Pruning link node %x with zero open channels from database",
3✔
1438
                remotePub.SerializeCompressed())
3✔
1439

3✔
1440
        return c.linkNodeDB.DeleteLinkNode(remotePub)
3✔
1441
}
1442

1443
// PruneLinkNodes attempts to prune all link nodes found within the database
1444
// with whom we no longer have any open channels with.
1445
func (c *ChannelStateDB) PruneLinkNodes() error {
3✔
1446
        allLinkNodes, err := c.linkNodeDB.FetchAllLinkNodes()
3✔
1447
        if err != nil {
3✔
1448
                return err
×
1449
        }
×
1450

1451
        for _, linkNode := range allLinkNodes {
6✔
1452
                var (
3✔
1453
                        openChannels []*OpenChannel
3✔
1454
                        linkNode     = linkNode
3✔
1455
                )
3✔
1456
                err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1457
                        var err error
3✔
1458
                        openChannels, err = c.fetchOpenChannels(
3✔
1459
                                tx, linkNode.IdentityPub,
3✔
1460
                        )
3✔
1461
                        return err
3✔
1462
                }, func() {
6✔
1463
                        openChannels = nil
3✔
1464
                })
3✔
1465
                if err != nil {
3✔
1466
                        return err
×
1467
                }
×
1468

1469
                err = c.pruneLinkNode(openChannels, linkNode.IdentityPub)
3✔
1470
                if err != nil {
3✔
1471
                        return err
×
1472
                }
×
1473
        }
1474

1475
        return nil
3✔
1476
}
1477

1478
// ChannelShell is a shell of a channel that is meant to be used for channel
1479
// recovery purposes. It contains a minimal OpenChannel instance along with
1480
// addresses for that target node.
1481
type ChannelShell struct {
1482
        // NodeAddrs the set of addresses that this node has known to be
1483
        // reachable at in the past.
1484
        NodeAddrs []net.Addr
1485

1486
        // Chan is a shell of an OpenChannel, it contains only the items
1487
        // required to restore the channel on disk.
1488
        Chan *OpenChannel
1489
}
1490

1491
// RestoreChannelShells is a method that allows the caller to reconstruct the
1492
// state of an OpenChannel from the ChannelShell. We'll attempt to write the
1493
// new channel to disk, create a LinkNode instance with the passed node
1494
// addresses, and finally create an edge within the graph for the channel as
1495
// well. This method is idempotent, so repeated calls with the same set of
1496
// channel shells won't modify the database after the initial call.
1497
func (c *ChannelStateDB) RestoreChannelShells(channelShells ...*ChannelShell) error {
3✔
1498
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1499
                for _, channelShell := range channelShells {
6✔
1500
                        channel := channelShell.Chan
3✔
1501

3✔
1502
                        // When we make a channel, we mark that the channel has
3✔
1503
                        // been restored, this will signal to other sub-systems
3✔
1504
                        // to not attempt to use the channel as if it was a
3✔
1505
                        // regular one.
3✔
1506
                        channel.chanStatus |= ChanStatusRestored
3✔
1507

3✔
1508
                        // First, we'll attempt to create a new open channel
3✔
1509
                        // and link node for this channel. If the channel
3✔
1510
                        // already exists, then in order to ensure this method
3✔
1511
                        // is idempotent, we'll continue to the next step.
3✔
1512
                        channel.Db = c
3✔
1513
                        err := syncNewChannel(
3✔
1514
                                tx, channel, channelShell.NodeAddrs,
3✔
1515
                        )
3✔
1516
                        if err != nil {
6✔
1517
                                return err
3✔
1518
                        }
3✔
1519
                }
1520

1521
                return nil
3✔
1522
        }, func() {})
3✔
1523
        if err != nil {
6✔
1524
                return err
3✔
1525
        }
3✔
1526

1527
        return nil
3✔
1528
}
1529

1530
// AddrsForNode consults the channel database for all addresses known to the
1531
// passed node public key. The returned boolean indicates if the given node is
1532
// unknown to the channel DB or not.
1533
//
1534
// NOTE: this is part of the AddrSource interface.
1535
func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr, error) {
3✔
1536
        linkNode, err := d.channelStateDB.linkNodeDB.FetchLinkNode(nodePub)
3✔
1537
        // Only if the error is something other than ErrNodeNotFound do we
3✔
1538
        // return it.
3✔
1539
        switch {
3✔
1540
        case err != nil && !errors.Is(err, ErrNodeNotFound):
×
1541
                return false, nil, err
×
1542

1543
        case errors.Is(err, ErrNodeNotFound):
×
1544
                return false, nil, nil
×
1545
        }
1546

1547
        return true, linkNode.Addresses, nil
3✔
1548
}
1549

1550
// AbandonChannel attempts to remove the target channel from the open channel
1551
// database. If the channel was already removed (has a closed channel entry),
1552
// then we'll return a nil error. Otherwise, we'll insert a new close summary
1553
// into the database.
1554
func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint,
1555
        bestHeight uint32) error {
3✔
1556

3✔
1557
        // With the chanPoint constructed, we'll attempt to find the target
3✔
1558
        // channel in the database. If we can't find the channel, then we'll
3✔
1559
        // return the error back to the caller.
3✔
1560
        dbChan, err := c.FetchChannel(*chanPoint)
3✔
1561
        switch {
3✔
1562
        // If the channel wasn't found, then it's possible that it was already
1563
        // abandoned from the database.
1564
        case err == ErrChannelNotFound:
3✔
1565
                _, closedErr := c.FetchClosedChannel(chanPoint)
3✔
1566
                if closedErr != nil {
3✔
UNCOV
1567
                        return closedErr
×
UNCOV
1568
                }
×
1569

1570
                // If the channel was already closed, then we don't return an
1571
                // error as we'd like this step to be repeatable.
1572
                return nil
3✔
1573
        case err != nil:
×
1574
                return err
×
1575
        }
1576

1577
        // Now that we've found the channel, we'll populate a close summary for
1578
        // the channel, so we can store as much information for this abounded
1579
        // channel as possible. We also ensure that we set Pending to false, to
1580
        // indicate that this channel has been "fully" closed.
1581
        summary := &ChannelCloseSummary{
3✔
1582
                CloseType:               Abandoned,
3✔
1583
                ChanPoint:               *chanPoint,
3✔
1584
                ChainHash:               dbChan.ChainHash,
3✔
1585
                CloseHeight:             bestHeight,
3✔
1586
                RemotePub:               dbChan.IdentityPub,
3✔
1587
                Capacity:                dbChan.Capacity,
3✔
1588
                SettledBalance:          dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
3✔
1589
                ShortChanID:             dbChan.ShortChanID(),
3✔
1590
                RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
3✔
1591
                RemoteNextRevocation:    dbChan.RemoteNextRevocation,
3✔
1592
                LocalChanConfig:         dbChan.LocalChanCfg,
3✔
1593
        }
3✔
1594

3✔
1595
        // Finally, we'll close the channel in the DB, and return back to the
3✔
1596
        // caller. We set ourselves as the close initiator because we abandoned
3✔
1597
        // the channel.
3✔
1598
        return dbChan.CloseChannel(summary, ChanStatusLocalCloseInitiator)
3✔
1599
}
1600

1601
// SaveChannelOpeningState saves the serialized channel state for the provided
1602
// chanPoint to the channelOpeningStateBucket.
1603
func (c *ChannelStateDB) SaveChannelOpeningState(outPoint,
1604
        serializedState []byte) error {
3✔
1605

3✔
1606
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1607
                bucket, err := tx.CreateTopLevelBucket(channelOpeningStateBucket)
3✔
1608
                if err != nil {
3✔
1609
                        return err
×
1610
                }
×
1611

1612
                return bucket.Put(outPoint, serializedState)
3✔
1613
        }, func() {})
3✔
1614
}
1615

1616
// GetChannelOpeningState fetches the serialized channel state for the provided
1617
// outPoint from the database, or returns ErrChannelNotFound if the channel
1618
// is not found.
1619
func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte,
1620
        error) {
3✔
1621

3✔
1622
        var serializedState []byte
3✔
1623
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1624
                bucket := tx.ReadBucket(channelOpeningStateBucket)
3✔
1625
                if bucket == nil {
6✔
1626
                        // If the bucket does not exist, it means we never added
3✔
1627
                        //  a channel to the db, so return ErrChannelNotFound.
3✔
1628
                        return ErrChannelNotFound
3✔
1629
                }
3✔
1630

1631
                stateBytes := bucket.Get(outPoint)
3✔
1632
                if stateBytes == nil {
6✔
1633
                        return ErrChannelNotFound
3✔
1634
                }
3✔
1635

1636
                serializedState = append(serializedState, stateBytes...)
3✔
1637

3✔
1638
                return nil
3✔
1639
        }, func() {
3✔
1640
                serializedState = nil
3✔
1641
        })
3✔
1642
        return serializedState, err
3✔
1643
}
1644

1645
// DeleteChannelOpeningState removes any state for outPoint from the database.
1646
func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
3✔
1647
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1648
                bucket := tx.ReadWriteBucket(channelOpeningStateBucket)
3✔
1649
                if bucket == nil {
3✔
1650
                        return ErrChannelNotFound
×
1651
                }
×
1652

1653
                return bucket.Delete(outPoint)
3✔
1654
        }, func() {})
3✔
1655
}
1656

1657
// syncVersions function is used for safe db version synchronization. It
1658
// applies migration functions to the current database and recovers the
1659
// previous state of db if at least one error/panic appeared during migration.
1660
func (d *DB) syncVersions(versions []mandatoryVersion) error {
3✔
1661
        meta, err := d.FetchMeta()
3✔
1662
        if err != nil {
3✔
1663
                if err == ErrMetaNotFound {
×
1664
                        meta = &Meta{}
×
1665
                } else {
×
1666
                        return err
×
1667
                }
×
1668
        }
1669

1670
        latestVersion := getLatestDBVersion(versions)
3✔
1671
        log.Infof("Checking for schema update: latest_version=%v, "+
3✔
1672
                "db_version=%v", latestVersion, meta.DbVersionNumber)
3✔
1673

3✔
1674
        switch {
3✔
1675

1676
        // If the database reports a higher version that we are aware of, the
1677
        // user is probably trying to revert to a prior version of lnd. We fail
1678
        // here to prevent reversions and unintended corruption.
UNCOV
1679
        case meta.DbVersionNumber > latestVersion:
×
UNCOV
1680
                log.Errorf("Refusing to revert from db_version=%d to "+
×
UNCOV
1681
                        "lower version=%d", meta.DbVersionNumber,
×
UNCOV
1682
                        latestVersion)
×
UNCOV
1683
                return ErrDBReversion
×
1684

1685
        // If the current database version matches the latest version number,
1686
        // then we don't need to perform any migrations.
1687
        case meta.DbVersionNumber == latestVersion:
3✔
1688
                return nil
3✔
1689
        }
1690

UNCOV
1691
        log.Infof("Performing database schema migration")
×
UNCOV
1692

×
UNCOV
1693
        // Otherwise, we fetch the migrations which need to applied, and
×
UNCOV
1694
        // execute them serially within a single database transaction to ensure
×
UNCOV
1695
        // the migration is atomic.
×
UNCOV
1696
        migrations, migrationVersions := getMigrationsToApply(
×
UNCOV
1697
                versions, meta.DbVersionNumber,
×
UNCOV
1698
        )
×
UNCOV
1699
        return kvdb.Update(d, func(tx kvdb.RwTx) error {
×
UNCOV
1700
                for i, migration := range migrations {
×
UNCOV
1701
                        if migration == nil {
×
1702
                                continue
×
1703
                        }
1704

UNCOV
1705
                        log.Infof("Applying migration #%v",
×
UNCOV
1706
                                migrationVersions[i])
×
UNCOV
1707

×
UNCOV
1708
                        if err := migration(tx); err != nil {
×
UNCOV
1709
                                log.Infof("Unable to apply migration #%v",
×
UNCOV
1710
                                        migrationVersions[i])
×
UNCOV
1711
                                return err
×
UNCOV
1712
                        }
×
1713
                }
1714

UNCOV
1715
                meta.DbVersionNumber = latestVersion
×
UNCOV
1716
                err := putMeta(meta, tx)
×
UNCOV
1717
                if err != nil {
×
1718
                        return err
×
1719
                }
×
1720

1721
                // In dry-run mode, return an error to prevent the transaction
1722
                // from committing.
UNCOV
1723
                if d.dryRun {
×
UNCOV
1724
                        return ErrDryRunMigrationOK
×
UNCOV
1725
                }
×
1726

UNCOV
1727
                return nil
×
UNCOV
1728
        }, func() {})
×
1729
}
1730

1731
// applyOptionalVersions takes a config to determine whether the optional
1732
// migrations will be applied.
1733
//
1734
// NOTE: only support the prune_revocation_log optional migration atm.
1735
func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error {
3✔
1736
        // TODO(yy): need to design the db to support dry run for optional
3✔
1737
        // migrations.
3✔
1738
        if d.dryRun {
3✔
UNCOV
1739
                log.Info("Skipped optional migrations as dry run mode is not " +
×
UNCOV
1740
                        "supported yet")
×
UNCOV
1741
                return nil
×
UNCOV
1742
        }
×
1743

1744
        om, err := d.fetchOptionalMeta()
3✔
1745
        if err != nil {
3✔
1746
                if err == ErrMetaNotFound {
×
1747
                        om = &OptionalMeta{
×
1748
                                Versions: make(map[uint64]string),
×
1749
                        }
×
1750
                } else {
×
1751
                        return err
×
1752
                }
×
1753
        }
1754

1755
        log.Infof("Checking for optional update: prune_revocation_log=%v, "+
3✔
1756
                "db_version=%s", cfg.PruneRevocationLog, om)
3✔
1757

3✔
1758
        // Exit early if the optional migration is not specified.
3✔
1759
        if !cfg.PruneRevocationLog {
6✔
1760
                return nil
3✔
1761
        }
3✔
1762

1763
        // Exit early if the optional migration has already been applied.
UNCOV
1764
        if _, ok := om.Versions[0]; ok {
×
UNCOV
1765
                return nil
×
UNCOV
1766
        }
×
1767

1768
        // Get the optional version.
UNCOV
1769
        version := optionalVersions[0]
×
UNCOV
1770
        log.Infof("Performing database optional migration: %s", version.name)
×
UNCOV
1771

×
UNCOV
1772
        migrationCfg := &MigrationConfigImpl{
×
UNCOV
1773
                migration30.MigrateRevLogConfigImpl{
×
UNCOV
1774
                        NoAmountData: d.noRevLogAmtData,
×
UNCOV
1775
                },
×
UNCOV
1776
        }
×
UNCOV
1777

×
UNCOV
1778
        // Migrate the data.
×
UNCOV
1779
        if err := version.migration(d, migrationCfg); err != nil {
×
1780
                log.Errorf("Unable to apply optional migration: %s, error: %v",
×
1781
                        version.name, err)
×
1782
                return err
×
1783
        }
×
1784

1785
        // Update the optional meta. Notice that unlike the mandatory db
1786
        // migrations where we perform the migration and updating meta in a
1787
        // single db transaction, we use different transactions here. Even when
1788
        // the following update is failed, we should be fine here as we would
1789
        // re-run the optional migration again, which is a noop, during next
1790
        // startup.
UNCOV
1791
        om.Versions[0] = version.name
×
UNCOV
1792
        if err := d.putOptionalMeta(om); err != nil {
×
1793
                log.Errorf("Unable to update optional meta: %v", err)
×
1794
                return err
×
1795
        }
×
1796

UNCOV
1797
        return nil
×
1798
}
1799

1800
// ChannelStateDB returns the sub database that is concerned with the channel
1801
// state.
1802
func (d *DB) ChannelStateDB() *ChannelStateDB {
3✔
1803
        return d.channelStateDB
3✔
1804
}
3✔
1805

1806
// LatestDBVersion returns the number of the latest database version currently
1807
// known to the channel DB.
UNCOV
1808
func LatestDBVersion() uint32 {
×
UNCOV
1809
        return getLatestDBVersion(dbVersions)
×
UNCOV
1810
}
×
1811

1812
func getLatestDBVersion(versions []mandatoryVersion) uint32 {
3✔
1813
        return versions[len(versions)-1].number
3✔
1814
}
3✔
1815

1816
// getMigrationsToApply retrieves the migration function that should be
1817
// applied to the database.
1818
func getMigrationsToApply(versions []mandatoryVersion,
UNCOV
1819
        version uint32) ([]migration, []uint32) {
×
UNCOV
1820

×
UNCOV
1821
        migrations := make([]migration, 0, len(versions))
×
UNCOV
1822
        migrationVersions := make([]uint32, 0, len(versions))
×
UNCOV
1823

×
UNCOV
1824
        for _, v := range versions {
×
UNCOV
1825
                if v.number > version {
×
UNCOV
1826
                        migrations = append(migrations, v.migration)
×
UNCOV
1827
                        migrationVersions = append(migrationVersions, v.number)
×
UNCOV
1828
                }
×
1829
        }
1830

UNCOV
1831
        return migrations, migrationVersions
×
1832
}
1833

1834
// fetchHistoricalChanBucket returns a the channel bucket for a given outpoint
1835
// from the historical channel bucket. If the bucket does not exist,
1836
// ErrNoHistoricalBucket is returned.
1837
func fetchHistoricalChanBucket(tx kvdb.RTx,
1838
        outPoint *wire.OutPoint) (kvdb.RBucket, error) {
3✔
1839

3✔
1840
        // First fetch the top level bucket which stores all data related to
3✔
1841
        // historically stored channels.
3✔
1842
        historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
3✔
1843
        if historicalChanBucket == nil {
3✔
1844
                return nil, ErrNoHistoricalBucket
×
1845
        }
×
1846

1847
        // With the bucket for the node and chain fetched, we can now go down
1848
        // another level, for the channel itself.
1849
        var chanPointBuf bytes.Buffer
3✔
1850
        if err := graphdb.WriteOutpoint(&chanPointBuf, outPoint); err != nil {
3✔
1851
                return nil, err
×
1852
        }
×
1853
        chanBucket := historicalChanBucket.NestedReadBucket(
3✔
1854
                chanPointBuf.Bytes(),
3✔
1855
        )
3✔
1856
        if chanBucket == nil {
3✔
UNCOV
1857
                return nil, ErrChannelNotFound
×
UNCOV
1858
        }
×
1859

1860
        return chanBucket, nil
3✔
1861
}
1862

1863
// FetchHistoricalChannel fetches open channel data from the historical channel
1864
// bucket.
1865
func (c *ChannelStateDB) FetchHistoricalChannel(outPoint *wire.OutPoint) (
1866
        *OpenChannel, error) {
3✔
1867

3✔
1868
        var channel *OpenChannel
3✔
1869
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1870
                chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
3✔
1871
                if err != nil {
3✔
UNCOV
1872
                        return err
×
UNCOV
1873
                }
×
1874

1875
                channel, err = fetchOpenChannel(chanBucket, outPoint)
3✔
1876
                if err != nil {
3✔
1877
                        return err
×
1878
                }
×
1879

1880
                channel.Db = c
3✔
1881
                return nil
3✔
1882
        }, func() {
3✔
1883
                channel = nil
3✔
1884
        })
3✔
1885
        if err != nil {
3✔
UNCOV
1886
                return nil, err
×
UNCOV
1887
        }
×
1888

1889
        return channel, nil
3✔
1890
}
1891

1892
func fetchFinalHtlcsBucket(tx kvdb.RTx,
1893
        chanID lnwire.ShortChannelID) (kvdb.RBucket, error) {
3✔
1894

3✔
1895
        finalHtlcsBucket := tx.ReadBucket(finalHtlcsBucket)
3✔
1896
        if finalHtlcsBucket == nil {
3✔
UNCOV
1897
                return nil, ErrFinalHtlcsBucketNotFound
×
UNCOV
1898
        }
×
1899

1900
        var chanIDBytes [8]byte
3✔
1901
        byteOrder.PutUint64(chanIDBytes[:], chanID.ToUint64())
3✔
1902

3✔
1903
        chanBucket := finalHtlcsBucket.NestedReadBucket(chanIDBytes[:])
3✔
1904
        if chanBucket == nil {
3✔
1905
                return nil, ErrFinalChannelBucketNotFound
×
1906
        }
×
1907

1908
        return chanBucket, nil
3✔
1909
}
1910

1911
// ErrHtlcUnknown is returned by LookupFinalHtlc when no final resolution is
// stored for the requested htlc.
var ErrHtlcUnknown = errors.New("htlc unknown")
1912

1913
// LookupFinalHtlc retrieves a final htlc resolution from the database. If the
1914
// htlc has no final resolution yet, ErrHtlcUnknown is returned.
1915
func (c *ChannelStateDB) LookupFinalHtlc(chanID lnwire.ShortChannelID,
1916
        htlcIndex uint64) (*FinalHtlcInfo, error) {
3✔
1917

3✔
1918
        var idBytes [8]byte
3✔
1919
        byteOrder.PutUint64(idBytes[:], htlcIndex)
3✔
1920

3✔
1921
        var settledByte byte
3✔
1922

3✔
1923
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1924
                finalHtlcsBucket, err := fetchFinalHtlcsBucket(
3✔
1925
                        tx, chanID,
3✔
1926
                )
3✔
1927
                switch {
3✔
UNCOV
1928
                case errors.Is(err, ErrFinalHtlcsBucketNotFound):
×
UNCOV
1929
                        fallthrough
×
1930

UNCOV
1931
                case errors.Is(err, ErrFinalChannelBucketNotFound):
×
UNCOV
1932
                        return ErrHtlcUnknown
×
1933

1934
                case err != nil:
×
1935
                        return fmt.Errorf("cannot fetch final htlcs bucket: %w",
×
1936
                                err)
×
1937
                }
1938

1939
                value := finalHtlcsBucket.Get(idBytes[:])
3✔
1940
                if value == nil {
3✔
UNCOV
1941
                        return ErrHtlcUnknown
×
UNCOV
1942
                }
×
1943

1944
                if len(value) != 1 {
3✔
1945
                        return errors.New("unexpected final htlc value length")
×
1946
                }
×
1947

1948
                settledByte = value[0]
3✔
1949

3✔
1950
                return nil
3✔
1951
        }, func() {
3✔
1952
                settledByte = 0
3✔
1953
        })
3✔
1954
        if err != nil {
3✔
UNCOV
1955
                return nil, err
×
UNCOV
1956
        }
×
1957

1958
        info := FinalHtlcInfo{
3✔
1959
                Settled:  settledByte&byte(FinalHtlcSettledBit) != 0,
3✔
1960
                Offchain: settledByte&byte(FinalHtlcOffchainBit) != 0,
3✔
1961
        }
3✔
1962

3✔
1963
        return &info, nil
3✔
1964
}
1965

1966
// PutOnchainFinalHtlcOutcome stores the final on-chain outcome of an htlc in
1967
// the database.
1968
func (c *ChannelStateDB) PutOnchainFinalHtlcOutcome(
1969
        chanID lnwire.ShortChannelID, htlcID uint64, settled bool) error {
3✔
1970

3✔
1971
        // Skip if the user did not opt in to storing final resolutions.
3✔
1972
        if !c.parent.storeFinalHtlcResolutions {
6✔
1973
                return nil
3✔
1974
        }
3✔
1975

UNCOV
1976
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
×
UNCOV
1977
                finalHtlcsBucket, err := fetchFinalHtlcsBucketRw(tx, chanID)
×
UNCOV
1978
                if err != nil {
×
1979
                        return err
×
1980
                }
×
1981

UNCOV
1982
                return putFinalHtlc(
×
UNCOV
1983
                        finalHtlcsBucket, htlcID,
×
UNCOV
1984
                        FinalHtlcInfo{
×
UNCOV
1985
                                Settled:  settled,
×
UNCOV
1986
                                Offchain: false,
×
UNCOV
1987
                        },
×
UNCOV
1988
                )
×
UNCOV
1989
        }, func() {})
×
1990
}
1991

1992
// MakeTestInvoiceDB is used to create a test invoice database for testing
1993
// purposes. It simply calls into MakeTestDB so the same modifiers can be used.
1994
func MakeTestInvoiceDB(t *testing.T, modifiers ...OptionModifier) (
UNCOV
1995
        invoices.InvoiceDB, error) {
×
UNCOV
1996

×
UNCOV
1997
        return MakeTestDB(t, modifiers...)
×
UNCOV
1998
}
×
1999

2000
// MakeTestDB creates a new instance of the ChannelDB for testing purposes.
2001
// A callback which cleans up the created temporary directories is also
2002
// returned and intended to be executed after the test completes.
UNCOV
2003
func MakeTestDB(t *testing.T, modifiers ...OptionModifier) (*DB, error) {
×
UNCOV
2004
        // First, create a temporary directory to be used for the duration of
×
UNCOV
2005
        // this test.
×
UNCOV
2006
        tempDirName := t.TempDir()
×
UNCOV
2007

×
UNCOV
2008
        // Next, create channeldb for the first time.
×
UNCOV
2009
        backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cdb")
×
UNCOV
2010
        if err != nil {
×
2011
                backendCleanup()
×
2012
                return nil, err
×
2013
        }
×
2014

UNCOV
2015
        cdb, err := CreateWithBackend(backend, modifiers...)
×
UNCOV
2016
        if err != nil {
×
2017
                backendCleanup()
×
2018
                return nil, err
×
2019
        }
×
2020

UNCOV
2021
        t.Cleanup(func() {
×
UNCOV
2022
                cdb.Close()
×
UNCOV
2023
                backendCleanup()
×
UNCOV
2024
        })
×
2025

UNCOV
2026
        return cdb, nil
×
2027
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc