• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16293817609

15 Jul 2025 12:53PM UTC coverage: 57.584% (-9.8%) from 67.338%
16293817609

Pull #10031

github

web-flow
Merge 8fd26da8b into df6c02e3a
Pull Request #10031: Skip unnecessary disconnect and wait for disconnect to finish in shutdown

36 of 40 new or added lines in 2 files covered. (90.0%)

28482 existing lines in 458 files now uncovered.

98681 of 171370 relevant lines covered (57.58%)

1.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.03
/channeldb/db.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/binary"
7
        "errors"
8
        "fmt"
9
        "net"
10
        "os"
11
        "testing"
12

13
        "github.com/btcsuite/btcd/btcec/v2"
14
        "github.com/btcsuite/btcd/wire"
15
        "github.com/btcsuite/btcwallet/walletdb"
16
        mig "github.com/lightningnetwork/lnd/channeldb/migration"
17
        "github.com/lightningnetwork/lnd/channeldb/migration12"
18
        "github.com/lightningnetwork/lnd/channeldb/migration13"
19
        "github.com/lightningnetwork/lnd/channeldb/migration16"
20
        "github.com/lightningnetwork/lnd/channeldb/migration20"
21
        "github.com/lightningnetwork/lnd/channeldb/migration21"
22
        "github.com/lightningnetwork/lnd/channeldb/migration23"
23
        "github.com/lightningnetwork/lnd/channeldb/migration24"
24
        "github.com/lightningnetwork/lnd/channeldb/migration25"
25
        "github.com/lightningnetwork/lnd/channeldb/migration26"
26
        "github.com/lightningnetwork/lnd/channeldb/migration27"
27
        "github.com/lightningnetwork/lnd/channeldb/migration29"
28
        "github.com/lightningnetwork/lnd/channeldb/migration30"
29
        "github.com/lightningnetwork/lnd/channeldb/migration31"
30
        "github.com/lightningnetwork/lnd/channeldb/migration32"
31
        "github.com/lightningnetwork/lnd/channeldb/migration33"
32
        "github.com/lightningnetwork/lnd/channeldb/migration34"
33
        "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
34
        "github.com/lightningnetwork/lnd/clock"
35
        graphdb "github.com/lightningnetwork/lnd/graph/db"
36
        "github.com/lightningnetwork/lnd/invoices"
37
        "github.com/lightningnetwork/lnd/kvdb"
38
        "github.com/lightningnetwork/lnd/lnwire"
39
        "github.com/stretchr/testify/require"
40
)
41

42
const (
43
        dbName = "channel.db"
44
)
45

46
var (
47
        // ErrDryRunMigrationOK signals that a migration executed successfully,
48
        // but we intentionally did not commit the result.
49
        ErrDryRunMigrationOK = errors.New("dry run migration successful")
50

51
        // ErrFinalHtlcsBucketNotFound signals that the top-level final htlcs
52
        // bucket does not exist.
53
        ErrFinalHtlcsBucketNotFound = errors.New("final htlcs bucket not " +
54
                "found")
55

56
        // ErrFinalChannelBucketNotFound signals that the channel bucket for
57
        // final htlc outcomes does not exist.
58
        ErrFinalChannelBucketNotFound = errors.New("final htlcs channel " +
59
                "bucket not found")
60
)
61

62
// migration is a function which takes a prior outdated version of the database
63
// instances and mutates the key/bucket structure to arrive at a more
64
// up-to-date version of the database.
65
type migration func(tx kvdb.RwTx) error
66

67
// mandatoryVersion defines a db version that must be applied before lnd
68
// starts.
69
type mandatoryVersion struct {
70
        number    uint32
71
        migration migration
72
}
73

74
// MigrationConfig is an interface that combines the config interfaces of all
75
// optional migrations.
76
type MigrationConfig interface {
77
        migration30.MigrateRevLogConfig
78
        migration34.MigrationConfig
79
}
80

81
// MigrationConfigImpl is a super set of all the various migration configs and
82
// an implementation of MigrationConfig.
83
type MigrationConfigImpl struct {
84
        migration30.MigrateRevLogConfigImpl
85
        migration34.MigrationConfigImpl
86
}
87

88
// optionalMigration defines an optional migration function. When a migration
89
// is optional, it usually involves a large scale of changes that might touch
90
// millions of keys. Due to OOM concern, the update cannot be safely done
91
// within one db transaction. Thus, for optional migrations, they must take the
92
// db backend and construct transactions as needed.
93
type optionalMigration func(db kvdb.Backend, cfg MigrationConfig) error
94

95
// optionalVersion defines a db version that can be optionally applied. When
96
// applying migrations, we must apply all the mandatory migrations first before
97
// attempting optional ones.
98
type optionalVersion struct {
99
        name      string
100
        migration optionalMigration
101
}
102

103
var (
104
        // dbVersions stores all mandatory versions of the database. If current
105
        // database version doesn't match the latest version, this list will
106
        // be used to retrieve all migration functions that need to be applied
107
        // to the current db.
108
        dbVersions = []mandatoryVersion{
109
                {
110
                        // The base DB version requires no migration.
111
                        number:    0,
112
                        migration: nil,
113
                },
114
                {
115
                        // The version of the database where two new indexes
116
                        // for the update time of node and channel updates were
117
                        // added.
118
                        number:    1,
119
                        migration: migration_01_to_11.MigrateNodeAndEdgeUpdateIndex,
120
                },
121
                {
122
                        // The DB version that added the invoice event time
123
                        // series.
124
                        number:    2,
125
                        migration: migration_01_to_11.MigrateInvoiceTimeSeries,
126
                },
127
                {
128
                        // The DB version that updated the embedded invoice in
129
                        // outgoing payments to match the new format.
130
                        number:    3,
131
                        migration: migration_01_to_11.MigrateInvoiceTimeSeriesOutgoingPayments,
132
                },
133
                {
134
                        // The version of the database where every channel
135
                        // always has two entries in the edges bucket. If
136
                        // a policy is unknown, this will be represented
137
                        // by a special byte sequence.
138
                        number:    4,
139
                        migration: migration_01_to_11.MigrateEdgePolicies,
140
                },
141
                {
142
                        // The DB version where we persist each attempt to send
143
                        // an HTLC to a payment hash, and track whether the
144
                        // payment is in-flight, succeeded, or failed.
145
                        number:    5,
146
                        migration: migration_01_to_11.PaymentStatusesMigration,
147
                },
148
                {
149
                        // The DB version that properly prunes stale entries
150
                        // from the edge update index.
151
                        number:    6,
152
                        migration: migration_01_to_11.MigratePruneEdgeUpdateIndex,
153
                },
154
                {
155
                        // The DB version that migrates the ChannelCloseSummary
156
                        // to a format where optional fields are indicated with
157
                        // boolean flags.
158
                        number:    7,
159
                        migration: migration_01_to_11.MigrateOptionalChannelCloseSummaryFields,
160
                },
161
                {
162
                        // The DB version that changes the gossiper's message
163
                        // store keys to account for the message's type and
164
                        // ShortChannelID.
165
                        number:    8,
166
                        migration: migration_01_to_11.MigrateGossipMessageStoreKeys,
167
                },
168
                {
169
                        // The DB version where the payments and payment
170
                        // statuses are moved to being stored in a combined
171
                        // bucket.
172
                        number:    9,
173
                        migration: migration_01_to_11.MigrateOutgoingPayments,
174
                },
175
                {
176
                        // The DB version where we started to store legacy
177
                        // payload information for all routes, as well as the
178
                        // optional TLV records.
179
                        number:    10,
180
                        migration: migration_01_to_11.MigrateRouteSerialization,
181
                },
182
                {
183
                        // Add invoice htlc and cltv delta fields.
184
                        number:    11,
185
                        migration: migration_01_to_11.MigrateInvoices,
186
                },
187
                {
188
                        // Migrate to TLV invoice bodies, add payment address
189
                        // and features, remove receipt.
190
                        number:    12,
191
                        migration: migration12.MigrateInvoiceTLV,
192
                },
193
                {
194
                        // Migrate to multi-path payments.
195
                        number:    13,
196
                        migration: migration13.MigrateMPP,
197
                },
198
                {
199
                        // Initialize payment address index and begin using it
200
                        // as the default index, falling back to payment hash
201
                        // index.
202
                        number:    14,
203
                        migration: mig.CreateTLB(payAddrIndexBucket),
204
                },
205
                {
206
                        // Initialize payment index bucket which will be used
207
                        // to index payments by sequence number. This index will
208
                        // be used to allow more efficient ListPayments queries.
209
                        number:    15,
210
                        migration: mig.CreateTLB(paymentsIndexBucket),
211
                },
212
                {
213
                        // Add our existing payments to the index bucket created
214
                        // in migration 15.
215
                        number:    16,
216
                        migration: migration16.MigrateSequenceIndex,
217
                },
218
                {
219
                        // Create a top level bucket which will store extra
220
                        // information about channel closes.
221
                        number:    17,
222
                        migration: mig.CreateTLB(closeSummaryBucket),
223
                },
224
                {
225
                        // Create a top level bucket which holds information
226
                        // about our peers.
227
                        number:    18,
228
                        migration: mig.CreateTLB(peersBucket),
229
                },
230
                {
231
                        // Create a top level bucket which holds outpoint
232
                        // information.
233
                        number:    19,
234
                        migration: mig.CreateTLB(outpointBucket),
235
                },
236
                {
237
                        // Migrate some data to the outpoint index.
238
                        number:    20,
239
                        migration: migration20.MigrateOutpointIndex,
240
                },
241
                {
242
                        // Migrate to length prefixed wire messages everywhere
243
                        // in the database.
244
                        number:    21,
245
                        migration: migration21.MigrateDatabaseWireMessages,
246
                },
247
                {
248
                        // Initialize set id index so that invoices can be
249
                        // queried by individual htlc sets.
250
                        number:    22,
251
                        migration: mig.CreateTLB(setIDIndexBucket),
252
                },
253
                {
254
                        number:    23,
255
                        migration: migration23.MigrateHtlcAttempts,
256
                },
257
                {
258
                        // Remove old forwarding packages of closed channels.
259
                        number:    24,
260
                        migration: migration24.MigrateFwdPkgCleanup,
261
                },
262
                {
263
                        // Save the initial local/remote balances in channel
264
                        // info.
265
                        number:    25,
266
                        migration: migration25.MigrateInitialBalances,
267
                },
268
                {
269
                        // Migrate the initial local/remote balance fields into
270
                        // tlv records.
271
                        number:    26,
272
                        migration: migration26.MigrateBalancesToTlvRecords,
273
                },
274
                {
275
                        // Patch the initial local/remote balance fields with
276
                        // empty values for historical channels.
277
                        number:    27,
278
                        migration: migration27.MigrateHistoricalBalances,
279
                },
280
                {
281
                        number:    28,
282
                        migration: mig.CreateTLB(chanIDBucket),
283
                },
284
                {
285
                        number:    29,
286
                        migration: migration29.MigrateChanID,
287
                },
288
                {
289
                        // Removes the "sweeper-last-tx" bucket. Although we
290
                        // do not have a mandatory version 30 we skip this
291
                        // version because its naming is already used for the
292
                        // first optional migration.
293
                        number:    31,
294
                        migration: migration31.DeleteLastPublishedTxTLB,
295
                },
296
                {
297
                        number:    32,
298
                        migration: migration32.MigrateMCRouteSerialisation,
299
                },
300
                {
301
                        number:    33,
302
                        migration: migration33.MigrateMCStoreNameSpacedResults,
303
                },
304
        }
305

306
        // optionalVersions stores all optional migrations that are applied
307
        // after dbVersions.
308
        //
309
        // NOTE: optional migrations must be fault-tolerant and re-run already
310
        // migrated data must be noop, which means the migration must be able
311
        // to determine its state.
312
        optionalVersions = []optionalVersion{
313
                {
314
                        name: "prune_revocation_log",
315
                        migration: func(db kvdb.Backend,
316
                                cfg MigrationConfig) error {
×
317

×
318
                                return migration30.MigrateRevocationLog(db, cfg)
×
319
                        },
×
320
                },
321
                {
322
                        name: "gc_decayed_log",
323
                        migration: func(db kvdb.Backend,
324
                                cfg MigrationConfig) error {
3✔
325

3✔
326
                                return migration34.MigrateDecayedLog(
3✔
327
                                        db, cfg,
3✔
328
                                )
3✔
329
                        },
3✔
330
                },
331
        }
332

333
        // Big endian is the preferred byte order, due to cursor scans over
334
        // integer keys iterating in order.
335
        byteOrder = binary.BigEndian
336

337
        // channelOpeningStateBucket is the database bucket used to store the
338
        // channelOpeningState for each channel that is currently in the process
339
        // of being opened.
340
        channelOpeningStateBucket = []byte("channelOpeningState")
341
)
342

343
// DB is the primary datastore for the lnd daemon. The database stores
344
// information related to nodes, routing data, open/closed channels, fee
345
// schedules, and reputation data.
346
type DB struct {
347
        kvdb.Backend
348

349
        // channelStateDB separates all DB operations on channel state.
350
        channelStateDB *ChannelStateDB
351

352
        dbPath                    string
353
        clock                     clock.Clock
354
        dryRun                    bool
355
        keepFailedPaymentAttempts bool
356
        storeFinalHtlcResolutions bool
357

358
        // noRevLogAmtData if true, means that commitment transaction amount
359
        // data should not be stored in the revocation log.
360
        noRevLogAmtData bool
361
}
362

363
// OpenForTesting opens or creates a channeldb to be used for tests. Any
364
// necessary schemas migrations due to updates will take place as necessary.
365
func OpenForTesting(t testing.TB, dbPath string,
UNCOV
366
        modifiers ...OptionModifier) *DB {
×
UNCOV
367

×
UNCOV
368
        backend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
×
UNCOV
369
                DBPath:            dbPath,
×
UNCOV
370
                DBFileName:        dbName,
×
UNCOV
371
                NoFreelistSync:    true,
×
UNCOV
372
                AutoCompact:       false,
×
UNCOV
373
                AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
×
UNCOV
374
                DBTimeout:         kvdb.DefaultDBTimeout,
×
UNCOV
375
        })
×
UNCOV
376
        require.NoError(t, err)
×
UNCOV
377

×
UNCOV
378
        db, err := CreateWithBackend(backend, modifiers...)
×
UNCOV
379
        require.NoError(t, err)
×
UNCOV
380

×
UNCOV
381
        db.dbPath = dbPath
×
UNCOV
382

×
UNCOV
383
        t.Cleanup(func() {
×
UNCOV
384
                require.NoError(t, db.Close())
×
UNCOV
385
        })
×
386

UNCOV
387
        return db
×
388
}
389

390
// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
391
// Any necessary schemas migrations due to updates will take place as necessary.
392
func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
393
        error) {
3✔
394

3✔
395
        opts := DefaultOptions()
3✔
396
        for _, modifier := range modifiers {
6✔
397
                modifier(&opts)
3✔
398
        }
3✔
399

400
        if !opts.NoMigration {
6✔
401
                if err := initChannelDB(backend); err != nil {
3✔
UNCOV
402
                        return nil, err
×
UNCOV
403
                }
×
404
        }
405

406
        chanDB := &DB{
3✔
407
                Backend: backend,
3✔
408
                channelStateDB: &ChannelStateDB{
3✔
409
                        linkNodeDB: &LinkNodeDB{
3✔
410
                                backend: backend,
3✔
411
                        },
3✔
412
                        backend: backend,
3✔
413
                },
3✔
414
                clock:                     opts.clock,
3✔
415
                dryRun:                    opts.dryRun,
3✔
416
                keepFailedPaymentAttempts: opts.keepFailedPaymentAttempts,
3✔
417
                storeFinalHtlcResolutions: opts.storeFinalHtlcResolutions,
3✔
418
                noRevLogAmtData:           opts.NoRevLogAmtData,
3✔
419
        }
3✔
420

3✔
421
        // Set the parent pointer (only used in tests).
3✔
422
        chanDB.channelStateDB.parent = chanDB
3✔
423

3✔
424
        // Synchronize the version of database and apply migrations if needed.
3✔
425
        if !opts.NoMigration {
6✔
426
                if err := chanDB.syncVersions(dbVersions); err != nil {
3✔
UNCOV
427
                        backend.Close()
×
UNCOV
428
                        return nil, err
×
UNCOV
429
                }
×
430

431
                // Grab the optional migration config.
432
                omc := opts.OptionalMiragtionConfig
3✔
433
                if err := chanDB.applyOptionalVersions(omc); err != nil {
3✔
434
                        backend.Close()
×
435
                        return nil, err
×
436
                }
×
437
        }
438

439
        return chanDB, nil
3✔
440
}
441

442
// Path returns the file path to the channel database.
UNCOV
443
func (d *DB) Path() string {
×
UNCOV
444
        return d.dbPath
×
UNCOV
445
}
×
446

447
var dbTopLevelBuckets = [][]byte{
448
        openChannelBucket,
449
        closedChannelBucket,
450
        forwardingLogBucket,
451
        fwdPackagesKey,
452
        invoiceBucket,
453
        payAddrIndexBucket,
454
        setIDIndexBucket,
455
        paymentsIndexBucket,
456
        peersBucket,
457
        nodeInfoBucket,
458
        metaBucket,
459
        closeSummaryBucket,
460
        outpointBucket,
461
        chanIDBucket,
462
        historicalChannelBucket,
463
}
464

465
// Wipe completely deletes all saved state within all used buckets within the
466
// database. The deletion is done in a single transaction, therefore this
467
// operation is fully atomic.
UNCOV
468
func (d *DB) Wipe() error {
×
UNCOV
469
        err := kvdb.Update(d, func(tx kvdb.RwTx) error {
×
UNCOV
470
                for _, tlb := range dbTopLevelBuckets {
×
UNCOV
471
                        err := tx.DeleteTopLevelBucket(tlb)
×
UNCOV
472
                        if err != nil && err != kvdb.ErrBucketNotFound {
×
473
                                return err
×
474
                        }
×
475
                }
UNCOV
476
                return nil
×
UNCOV
477
        }, func() {})
×
UNCOV
478
        if err != nil {
×
479
                return err
×
480
        }
×
481

UNCOV
482
        return initChannelDB(d.Backend)
×
483
}
484

485
// initChannelDB creates and initializes a fresh version of channeldb. In the
486
// case that the target path has not yet been created or doesn't yet exist, then
487
// the path is created. Additionally, all required top-level buckets used within
488
// the database are created.
489
func initChannelDB(db kvdb.Backend) error {
3✔
490
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
6✔
491
                // Check if DB was marked as inactive with a tomb stone.
3✔
492
                if err := EnsureNoTombstone(tx); err != nil {
3✔
UNCOV
493
                        return err
×
UNCOV
494
                }
×
495

496
                for _, tlb := range dbTopLevelBuckets {
6✔
497
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
3✔
498
                                return err
×
499
                        }
×
500
                }
501

502
                meta := &Meta{}
3✔
503
                // Check if DB is already initialized.
3✔
504
                err := FetchMeta(meta, tx)
3✔
505
                if err == nil {
6✔
506
                        return nil
3✔
507
                }
3✔
508

509
                meta.DbVersionNumber = getLatestDBVersion(dbVersions)
×
510
                return putMeta(meta, tx)
×
511
        }, func() {})
3✔
512
        if err != nil {
3✔
UNCOV
513
                return fmt.Errorf("unable to create new channeldb: %w", err)
×
UNCOV
514
        }
×
515

516
        return nil
3✔
517
}
518

519
// fileExists reports whether something exists at path. Only a "does not
// exist" failure from os.Stat yields false. Any other Stat error (for example
// a permission error) is still reported as true, exactly as before.
func fileExists(path string) bool {
	_, err := os.Stat(path)

	// errors.Is is the documented successor of os.IsNotExist and also
	// matches wrapped errors. A nil error never matches, so an existing
	// path yields true.
	return !errors.Is(err, os.ErrNotExist)
}
529

530
// ChannelStateDB is a database that keeps track of all channel state.
531
type ChannelStateDB struct {
532
        // linkNodeDB separates all DB operations on LinkNodes.
533
        linkNodeDB *LinkNodeDB
534

535
        // parent holds a pointer to the "main" channeldb.DB object. This is
536
        // only used for testing and should never be used in production code.
537
        // For testing use the ChannelStateDB.GetParentDB() function to retrieve
538
        // this pointer.
539
        parent *DB
540

541
        // backend points to the actual backend holding the channel state
542
        // database. This may be a real backend or a cache middleware.
543
        backend kvdb.Backend
544
}
545

546
// GetParentDB returns the "main" channeldb.DB object that is the owner of this
547
// ChannelStateDB instance. Use this function only in tests where passing around
548
// pointers makes testing less readable. Never to be used in production code!
UNCOV
549
func (c *ChannelStateDB) GetParentDB() *DB {
×
UNCOV
550
        return c.parent
×
UNCOV
551
}
×
552

553
// LinkNodeDB returns the current instance of the link node database.
554
func (c *ChannelStateDB) LinkNodeDB() *LinkNodeDB {
3✔
555
        return c.linkNodeDB
3✔
556
}
3✔
557

558
// FetchOpenChannels starts a new database transaction and returns all stored
559
// currently active/open channels associated with the target nodeID. In the case
560
// that no active channels are known to have been created with this node, then a
561
// zero-length slice is returned.
562
func (c *ChannelStateDB) FetchOpenChannels(nodeID *btcec.PublicKey) (
563
        []*OpenChannel, error) {
3✔
564

3✔
565
        var channels []*OpenChannel
3✔
566
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
567
                var err error
3✔
568
                channels, err = c.fetchOpenChannels(tx, nodeID)
3✔
569
                return err
3✔
570
        }, func() {
6✔
571
                channels = nil
3✔
572
        })
3✔
573

574
        return channels, err
3✔
575
}
576

577
// fetchOpenChannels uses and existing database transaction and returns all
578
// stored currently active/open channels associated with the target nodeID. In
579
// the case that no active channels are known to have been created with this
580
// node, then a zero-length slice is returned.
581
func (c *ChannelStateDB) fetchOpenChannels(tx kvdb.RTx,
582
        nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
3✔
583

3✔
584
        // Get the bucket dedicated to storing the metadata for open channels.
3✔
585
        openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
586
        if openChanBucket == nil {
3✔
587
                return nil, nil
×
588
        }
×
589

590
        // Within this top level bucket, fetch the bucket dedicated to storing
591
        // open channel data specific to the remote node.
592
        pub := nodeID.SerializeCompressed()
3✔
593
        nodeChanBucket := openChanBucket.NestedReadBucket(pub)
3✔
594
        if nodeChanBucket == nil {
6✔
595
                return nil, nil
3✔
596
        }
3✔
597

598
        // Next, we'll need to go down an additional layer in order to retrieve
599
        // the channels for each chain the node knows of.
600
        var channels []*OpenChannel
3✔
601
        err := nodeChanBucket.ForEach(func(chainHash, v []byte) error {
6✔
602
                // If there's a value, it's not a bucket so ignore it.
3✔
603
                if v != nil {
3✔
604
                        return nil
×
605
                }
×
606

607
                // If we've found a valid chainhash bucket, then we'll retrieve
608
                // that so we can extract all the channels.
609
                chainBucket := nodeChanBucket.NestedReadBucket(chainHash)
3✔
610
                if chainBucket == nil {
3✔
611
                        return fmt.Errorf("unable to read bucket for chain=%x",
×
612
                                chainHash[:])
×
613
                }
×
614

615
                // Finally, we both of the necessary buckets retrieved, fetch
616
                // all the active channels related to this node.
617
                nodeChannels, err := c.fetchNodeChannels(chainBucket)
3✔
618
                if err != nil {
3✔
619
                        return fmt.Errorf("unable to read channel for "+
×
620
                                "chain_hash=%x, node_key=%x: %v",
×
621
                                chainHash[:], pub, err)
×
622
                }
×
623

624
                channels = append(channels, nodeChannels...)
3✔
625
                return nil
3✔
626
        })
627

628
        return channels, err
3✔
629
}
630

631
// fetchNodeChannels retrieves all active channels from the target chainBucket
632
// which is under a node's dedicated channel bucket. This function is typically
633
// used to fetch all the active channels related to a particular node.
634
func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) (
635
        []*OpenChannel, error) {
3✔
636

3✔
637
        var channels []*OpenChannel
3✔
638

3✔
639
        // A node may have channels on several chains, so for each known chain,
3✔
640
        // we'll extract all the channels.
3✔
641
        err := chainBucket.ForEach(func(chanPoint, v []byte) error {
6✔
642
                // If there's a value, it's not a bucket so ignore it.
3✔
643
                if v != nil {
3✔
644
                        return nil
×
645
                }
×
646

647
                // Once we've found a valid channel bucket, we'll extract it
648
                // from the node's chain bucket.
649
                chanBucket := chainBucket.NestedReadBucket(chanPoint)
3✔
650

3✔
651
                var outPoint wire.OutPoint
3✔
652
                err := graphdb.ReadOutpoint(
3✔
653
                        bytes.NewReader(chanPoint), &outPoint,
3✔
654
                )
3✔
655
                if err != nil {
3✔
656
                        return err
×
657
                }
×
658
                oChannel, err := fetchOpenChannel(chanBucket, &outPoint)
3✔
659
                if err != nil {
3✔
660
                        return fmt.Errorf("unable to read channel data for "+
×
661
                                "chan_point=%v: %w", outPoint, err)
×
662
                }
×
663
                oChannel.Db = c
3✔
664

3✔
665
                channels = append(channels, oChannel)
3✔
666

3✔
667
                return nil
3✔
668
        })
669
        if err != nil {
3✔
670
                return nil, err
×
671
        }
×
672

673
        return channels, nil
3✔
674
}
675

676
// FetchChannel attempts to locate a channel specified by the passed channel
677
// point. If the channel cannot be found, then an error will be returned.
678
func (c *ChannelStateDB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel,
679
        error) {
3✔
680

3✔
681
        var targetChanPoint bytes.Buffer
3✔
682
        err := graphdb.WriteOutpoint(&targetChanPoint, &chanPoint)
3✔
683
        if err != nil {
3✔
684
                return nil, err
×
685
        }
×
686

687
        targetChanPointBytes := targetChanPoint.Bytes()
3✔
688
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
3✔
689
                error) {
6✔
690

3✔
691
                return targetChanPointBytes, &chanPoint, nil
3✔
692
        }
3✔
693

694
        return c.channelScanner(nil, selector)
3✔
695
}
696

697
// FetchChannelByID attempts to locate a channel specified by the passed channel
698
// ID. If the channel cannot be found, then an error will be returned.
699
// Optionally an existing db tx can be supplied.
700
func (c *ChannelStateDB) FetchChannelByID(tx kvdb.RTx, id lnwire.ChannelID) (
701
        *OpenChannel, error) {
3✔
702

3✔
703
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
3✔
704
                error) {
6✔
705

3✔
706
                var (
3✔
707
                        targetChanPointBytes []byte
3✔
708
                        targetChanPoint      *wire.OutPoint
3✔
709

3✔
710
                        // errChanFound is used to signal that the channel has
3✔
711
                        // been found so that iteration through the DB buckets
3✔
712
                        // can stop.
3✔
713
                        errChanFound = errors.New("channel found")
3✔
714
                )
3✔
715
                err := chainBkt.ForEach(func(k, _ []byte) error {
6✔
716
                        var outPoint wire.OutPoint
3✔
717
                        err := graphdb.ReadOutpoint(
3✔
718
                                bytes.NewReader(k), &outPoint,
3✔
719
                        )
3✔
720
                        if err != nil {
3✔
721
                                return err
×
722
                        }
×
723

724
                        chanID := lnwire.NewChanIDFromOutPoint(outPoint)
3✔
725
                        if chanID != id {
3✔
UNCOV
726
                                return nil
×
UNCOV
727
                        }
×
728

729
                        targetChanPoint = &outPoint
3✔
730
                        targetChanPointBytes = k
3✔
731

3✔
732
                        return errChanFound
3✔
733
                })
734
                if err != nil && !errors.Is(err, errChanFound) {
3✔
735
                        return nil, nil, err
×
736
                }
×
737
                if targetChanPoint == nil {
3✔
UNCOV
738
                        return nil, nil, ErrChannelNotFound
×
UNCOV
739
                }
×
740

741
                return targetChanPointBytes, targetChanPoint, nil
3✔
742
        }
743

744
        return c.channelScanner(tx, selector)
3✔
745
}
746

747
// ChanCount summarizes the channel relationship with a single peer. It is
// used by the server in determining access control.
type ChanCount struct {
	// HasOpenOrClosedChan is true when the peer has at least one
	// non-pending channel, either in the open channel bucket or in the
	// historical (closed) channel bucket.
	HasOpenOrClosedChan bool

	// PendingOpenCount is the number of the peer's channels in the open
	// channel bucket that are still marked as pending.
	PendingOpenCount uint64
}
752

753
// FetchPermAndTempPeers returns a map where the key is the remote node's
754
// public key and the value is a struct that has a tally of the pending-open
755
// channels and whether the peer has an open or closed channel with us.
756
func (c *ChannelStateDB) FetchPermAndTempPeers(
757
        chainHash []byte) (map[string]ChanCount, error) {
3✔
758

3✔
759
        peerChanInfo := make(map[string]ChanCount)
3✔
760

3✔
761
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
762
                openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
763
                if openChanBucket == nil {
3✔
764
                        return ErrNoChanDBExists
×
765
                }
×
766

767
                openChanErr := openChanBucket.ForEach(func(nodePub,
3✔
768
                        v []byte) error {
6✔
769

3✔
770
                        // If there is a value, this is not a bucket.
3✔
771
                        if v != nil {
3✔
772
                                return nil
×
773
                        }
×
774

775
                        nodeChanBucket := openChanBucket.NestedReadBucket(
3✔
776
                                nodePub,
3✔
777
                        )
3✔
778
                        if nodeChanBucket == nil {
3✔
779
                                return nil
×
780
                        }
×
781

782
                        chainBucket := nodeChanBucket.NestedReadBucket(
3✔
783
                                chainHash,
3✔
784
                        )
3✔
785
                        if chainBucket == nil {
3✔
786
                                return fmt.Errorf("no chain bucket exists")
×
787
                        }
×
788

789
                        var isPermPeer bool
3✔
790
                        var pendingOpenCount uint64
3✔
791

3✔
792
                        internalErr := chainBucket.ForEach(func(chanPoint,
3✔
793
                                val []byte) error {
6✔
794

3✔
795
                                // If there is a value, this is not a bucket.
3✔
796
                                if val != nil {
3✔
797
                                        return nil
×
798
                                }
×
799

800
                                chanBucket := chainBucket.NestedReadBucket(
3✔
801
                                        chanPoint,
3✔
802
                                )
3✔
803
                                if chanBucket == nil {
3✔
804
                                        return nil
×
805
                                }
×
806

807
                                var op wire.OutPoint
3✔
808
                                readErr := graphdb.ReadOutpoint(
3✔
809
                                        bytes.NewReader(chanPoint), &op,
3✔
810
                                )
3✔
811
                                if readErr != nil {
3✔
812
                                        return readErr
×
813
                                }
×
814

815
                                // We need to go through each channel and look
816
                                // at the IsPending status.
817
                                openChan, err := fetchOpenChannel(
3✔
818
                                        chanBucket, &op,
3✔
819
                                )
3✔
820
                                if err != nil {
3✔
821
                                        return err
×
822
                                }
×
823

824
                                if openChan.IsPending {
6✔
825
                                        // Add to the pending-open count since
3✔
826
                                        // this is a temp peer.
3✔
827
                                        pendingOpenCount++
3✔
828
                                        return nil
3✔
829
                                }
3✔
830

831
                                // Since IsPending is false, this is a perm
832
                                // peer.
833
                                isPermPeer = true
3✔
834

3✔
835
                                return nil
3✔
836
                        })
837
                        if internalErr != nil {
3✔
838
                                return internalErr
×
839
                        }
×
840

841
                        peerCount := ChanCount{
3✔
842
                                HasOpenOrClosedChan: isPermPeer,
3✔
843
                                PendingOpenCount:    pendingOpenCount,
3✔
844
                        }
3✔
845
                        peerChanInfo[string(nodePub)] = peerCount
3✔
846

3✔
847
                        return nil
3✔
848
                })
849
                if openChanErr != nil {
3✔
850
                        return openChanErr
×
851
                }
×
852

853
                // Now check the closed channel bucket.
854
                historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
3✔
855
                if historicalChanBucket == nil {
3✔
856
                        return ErrNoHistoricalBucket
×
857
                }
×
858

859
                historicalErr := historicalChanBucket.ForEach(func(chanPoint,
3✔
860
                        v []byte) error {
6✔
861
                        // Parse each nested bucket and the chanInfoKey to get
3✔
862
                        // the IsPending bool. This determines whether the
3✔
863
                        // peer is protected or not.
3✔
864
                        if v != nil {
3✔
865
                                // This is not a bucket. This is currently not
×
866
                                // possible.
×
867
                                return nil
×
868
                        }
×
869

870
                        chanBucket := historicalChanBucket.NestedReadBucket(
3✔
871
                                chanPoint,
3✔
872
                        )
3✔
873
                        if chanBucket == nil {
3✔
874
                                // This is not possible.
×
875
                                return fmt.Errorf("no historical channel " +
×
876
                                        "bucket exists")
×
877
                        }
×
878

879
                        var op wire.OutPoint
3✔
880
                        readErr := graphdb.ReadOutpoint(
3✔
881
                                bytes.NewReader(chanPoint), &op,
3✔
882
                        )
3✔
883
                        if readErr != nil {
3✔
884
                                return readErr
×
885
                        }
×
886

887
                        // This channel is closed, but the structure of the
888
                        // historical bucket is the same. This is by design,
889
                        // which means we can call fetchOpenChannel.
890
                        channel, fetchErr := fetchOpenChannel(chanBucket, &op)
3✔
891
                        if fetchErr != nil {
3✔
892
                                return fetchErr
×
893
                        }
×
894

895
                        // Only include this peer in the protected class if
896
                        // the closing transaction confirmed. Note that
897
                        // CloseChannel can be called in the funding manager
898
                        // while IsPending is true which is why we need this
899
                        // special-casing to not count premature funding
900
                        // manager calls to CloseChannel.
901
                        if !channel.IsPending {
6✔
902
                                // Fetch the public key of the remote node. We
3✔
903
                                // need to use the string-ified serialized,
3✔
904
                                // compressed bytes as the key.
3✔
905
                                remotePub := channel.IdentityPub
3✔
906
                                remoteSer := remotePub.SerializeCompressed()
3✔
907
                                remoteKey := string(remoteSer)
3✔
908

3✔
909
                                count, exists := peerChanInfo[remoteKey]
3✔
910
                                if exists {
6✔
911
                                        count.HasOpenOrClosedChan = true
3✔
912
                                        peerChanInfo[remoteKey] = count
3✔
913
                                } else {
3✔
914
                                        peerCount := ChanCount{
×
915
                                                HasOpenOrClosedChan: true,
×
916
                                        }
×
917
                                        peerChanInfo[remoteKey] = peerCount
×
918
                                }
×
919
                        }
920

921
                        return nil
3✔
922
                })
923
                if historicalErr != nil {
3✔
924
                        return historicalErr
×
925
                }
×
926

927
                return nil
3✔
928
        }, func() {
3✔
929
                clear(peerChanInfo)
3✔
930
        })
3✔
931

932
        return peerChanInfo, err
3✔
933
}
934

935
// channelSelector describes a function that takes a chain-hash bucket from
936
// within the open-channel DB and returns the wanted channel point bytes, and
937
// channel point. It must return the ErrChannelNotFound error if the wanted
938
// channel is not in the given bucket.
939
type channelSelector func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
940
        error)
941

942
// channelScanner will traverse the DB to each chain-hash bucket of each node
943
// pub-key bucket in the open-channel-bucket. The chanSelector will then be used
944
// to fetch the wanted channel outpoint from the chain bucket.
945
func (c *ChannelStateDB) channelScanner(tx kvdb.RTx,
946
        chanSelect channelSelector) (*OpenChannel, error) {
3✔
947

3✔
948
        var (
3✔
949
                targetChan *OpenChannel
3✔
950

3✔
951
                // errChanFound is used to signal that the channel has been
3✔
952
                // found so that iteration through the DB buckets can stop.
3✔
953
                errChanFound = errors.New("channel found")
3✔
954
        )
3✔
955

3✔
956
        // chanScan will traverse the following bucket structure:
3✔
957
        //  * nodePub => chainHash => chanPoint
3✔
958
        //
3✔
959
        // At each level we go one further, ensuring that we're traversing the
3✔
960
        // proper key (that's actually a bucket). By only reading the bucket
3✔
961
        // structure and skipping fully decoding each channel, we save a good
3✔
962
        // bit of CPU as we don't need to do things like decompress public
3✔
963
        // keys.
3✔
964
        chanScan := func(tx kvdb.RTx) error {
6✔
965
                // Get the bucket dedicated to storing the metadata for open
3✔
966
                // channels.
3✔
967
                openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
968
                if openChanBucket == nil {
3✔
969
                        return ErrNoActiveChannels
×
970
                }
×
971

972
                // Within the node channel bucket, are the set of node pubkeys
973
                // we have channels with, we don't know the entire set, so we'll
974
                // check them all.
975
                return openChanBucket.ForEach(func(nodePub, v []byte) error {
6✔
976
                        // Ensure that this is a key the same size as a pubkey,
3✔
977
                        // and also that it leads directly to a bucket.
3✔
978
                        if len(nodePub) != 33 || v != nil {
3✔
979
                                return nil
×
980
                        }
×
981

982
                        nodeChanBucket := openChanBucket.NestedReadBucket(
3✔
983
                                nodePub,
3✔
984
                        )
3✔
985
                        if nodeChanBucket == nil {
3✔
986
                                return nil
×
987
                        }
×
988

989
                        // The next layer down is all the chains that this node
990
                        // has channels on with us.
991
                        return nodeChanBucket.ForEach(func(chainHash,
3✔
992
                                v []byte) error {
6✔
993

3✔
994
                                // If there's a value, it's not a bucket so
3✔
995
                                // ignore it.
3✔
996
                                if v != nil {
3✔
997
                                        return nil
×
998
                                }
×
999

1000
                                chainBucket := nodeChanBucket.NestedReadBucket(
3✔
1001
                                        chainHash,
3✔
1002
                                )
3✔
1003
                                if chainBucket == nil {
3✔
1004
                                        return fmt.Errorf("unable to read "+
×
1005
                                                "bucket for chain=%x",
×
1006
                                                chainHash)
×
1007
                                }
×
1008

1009
                                // Finally, we reach the leaf bucket that stores
1010
                                // all the chanPoints for this node.
1011
                                targetChanBytes, chanPoint, err := chanSelect(
3✔
1012
                                        chainBucket,
3✔
1013
                                )
3✔
1014
                                if errors.Is(err, ErrChannelNotFound) {
3✔
UNCOV
1015
                                        return nil
×
1016
                                } else if err != nil {
3✔
1017
                                        return err
×
1018
                                }
×
1019

1020
                                chanBucket := chainBucket.NestedReadBucket(
3✔
1021
                                        targetChanBytes,
3✔
1022
                                )
3✔
1023
                                if chanBucket == nil {
6✔
1024
                                        return nil
3✔
1025
                                }
3✔
1026

1027
                                channel, err := fetchOpenChannel(
3✔
1028
                                        chanBucket, chanPoint,
3✔
1029
                                )
3✔
1030
                                if err != nil {
3✔
1031
                                        return err
×
1032
                                }
×
1033

1034
                                targetChan = channel
3✔
1035
                                targetChan.Db = c
3✔
1036

3✔
1037
                                return errChanFound
3✔
1038
                        })
1039
                })
1040
        }
1041

1042
        var err error
3✔
1043
        if tx == nil {
6✔
1044
                err = kvdb.View(c.backend, chanScan, func() {})
6✔
1045
        } else {
×
1046
                err = chanScan(tx)
×
1047
        }
×
1048
        if err != nil && !errors.Is(err, errChanFound) {
3✔
1049
                return nil, err
×
1050
        }
×
1051

1052
        if targetChan != nil {
6✔
1053
                return targetChan, nil
3✔
1054
        }
3✔
1055

1056
        // If we can't find the channel, then we return with an error, as we
1057
        // have nothing to back up.
1058
        return nil, ErrChannelNotFound
3✔
1059
}
1060

1061
// FetchAllChannels attempts to retrieve all open channels currently stored
1062
// within the database, including pending open, fully open and channels waiting
1063
// for a closing transaction to confirm.
1064
func (c *ChannelStateDB) FetchAllChannels() ([]*OpenChannel, error) {
3✔
1065
        return fetchChannels(c)
3✔
1066
}
3✔
1067

1068
// FetchAllOpenChannels will return all channels that have the funding
1069
// transaction confirmed, and is not waiting for a closing transaction to be
1070
// confirmed.
1071
func (c *ChannelStateDB) FetchAllOpenChannels() ([]*OpenChannel, error) {
3✔
1072
        return fetchChannels(
3✔
1073
                c,
3✔
1074
                pendingChannelFilter(false),
3✔
1075
                waitingCloseFilter(false),
3✔
1076
        )
3✔
1077
}
3✔
1078

1079
// FetchPendingChannels will return channels that have completed the process of
1080
// generating and broadcasting funding transactions, but whose funding
1081
// transactions have yet to be confirmed on the blockchain.
1082
func (c *ChannelStateDB) FetchPendingChannels() ([]*OpenChannel, error) {
3✔
1083
        return fetchChannels(c,
3✔
1084
                pendingChannelFilter(true),
3✔
1085
                waitingCloseFilter(false),
3✔
1086
        )
3✔
1087
}
3✔
1088

1089
// FetchWaitingCloseChannels will return all channels that have been opened,
1090
// but are now waiting for a closing transaction to be confirmed.
1091
//
1092
// NOTE: This includes channels that are also pending to be opened.
1093
func (c *ChannelStateDB) FetchWaitingCloseChannels() ([]*OpenChannel, error) {
3✔
1094
        return fetchChannels(
3✔
1095
                c, waitingCloseFilter(true),
3✔
1096
        )
3✔
1097
}
3✔
1098

1099
// fetchChannelsFilter applies a filter to channels retrieved in fetchchannels.
1100
// A set of filters can be combined to filter across multiple dimensions.
1101
type fetchChannelsFilter func(channel *OpenChannel) bool
1102

1103
// pendingChannelFilter returns a filter based on whether channels are pending
1104
// (ie, their funding transaction still needs to confirm). If pending is false,
1105
// channels with confirmed funding transactions are returned.
1106
func pendingChannelFilter(pending bool) fetchChannelsFilter {
3✔
1107
        return func(channel *OpenChannel) bool {
6✔
1108
                return channel.IsPending == pending
3✔
1109
        }
3✔
1110
}
1111

1112
// waitingCloseFilter returns a filter which filters channels based on whether
1113
// they are awaiting the confirmation of their closing transaction. If waiting
1114
// close is true, channels that have had their closing tx broadcast are
1115
// included. If it is false, channels that are not awaiting confirmation of
1116
// their close transaction are returned.
1117
func waitingCloseFilter(waitingClose bool) fetchChannelsFilter {
3✔
1118
        return func(channel *OpenChannel) bool {
6✔
1119
                // If the channel is in any other state than Default,
3✔
1120
                // then it means it is waiting to be closed.
3✔
1121
                channelWaitingClose :=
3✔
1122
                        channel.ChanStatus() != ChanStatusDefault
3✔
1123

3✔
1124
                // Include the channel if it matches the value for
3✔
1125
                // waiting close that we are filtering on.
3✔
1126
                return channelWaitingClose == waitingClose
3✔
1127
        }
3✔
1128
}
1129

1130
// fetchChannels attempts to retrieve channels currently stored in the
1131
// database. It takes a set of filters which are applied to each channel to
1132
// obtain a set of channels with the desired set of properties. Only channels
1133
// which have a true value returned for *all* of the filters will be returned.
1134
// If no filters are provided, every channel in the open channels bucket will
1135
// be returned.
1136
func fetchChannels(c *ChannelStateDB, filters ...fetchChannelsFilter) (
1137
        []*OpenChannel, error) {
3✔
1138

3✔
1139
        var channels []*OpenChannel
3✔
1140

3✔
1141
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1142
                // Get the bucket dedicated to storing the metadata for open
3✔
1143
                // channels.
3✔
1144
                openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
1145
                if openChanBucket == nil {
3✔
1146
                        return ErrNoActiveChannels
×
1147
                }
×
1148

1149
                // Next, fetch the bucket dedicated to storing metadata related
1150
                // to all nodes. All keys within this bucket are the serialized
1151
                // public keys of all our direct counterparties.
1152
                nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
3✔
1153
                if nodeMetaBucket == nil {
3✔
1154
                        return fmt.Errorf("node bucket not created")
×
1155
                }
×
1156

1157
                // Finally for each node public key in the bucket, fetch all
1158
                // the channels related to this particular node.
1159
                return nodeMetaBucket.ForEach(func(k, v []byte) error {
6✔
1160
                        nodeChanBucket := openChanBucket.NestedReadBucket(k)
3✔
1161
                        if nodeChanBucket == nil {
3✔
1162
                                return nil
×
1163
                        }
×
1164

1165
                        return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
6✔
1166
                                // If there's a value, it's not a bucket so
3✔
1167
                                // ignore it.
3✔
1168
                                if v != nil {
3✔
1169
                                        return nil
×
1170
                                }
×
1171

1172
                                // If we've found a valid chainhash bucket,
1173
                                // then we'll retrieve that so we can extract
1174
                                // all the channels.
1175
                                chainBucket := nodeChanBucket.NestedReadBucket(
3✔
1176
                                        chainHash,
3✔
1177
                                )
3✔
1178
                                if chainBucket == nil {
3✔
1179
                                        return fmt.Errorf("unable to read "+
×
1180
                                                "bucket for chain=%x", chainHash[:])
×
1181
                                }
×
1182

1183
                                nodeChans, err := c.fetchNodeChannels(chainBucket)
3✔
1184
                                if err != nil {
3✔
1185
                                        return fmt.Errorf("unable to read "+
×
1186
                                                "channel for chain_hash=%x, "+
×
1187
                                                "node_key=%x: %v", chainHash[:], k, err)
×
1188
                                }
×
1189
                                for _, channel := range nodeChans {
6✔
1190
                                        // includeChannel indicates whether the channel
3✔
1191
                                        // meets the criteria specified by our filters.
3✔
1192
                                        includeChannel := true
3✔
1193

3✔
1194
                                        // Run through each filter and check whether the
3✔
1195
                                        // channel should be included.
3✔
1196
                                        for _, f := range filters {
6✔
1197
                                                // If the channel fails the filter, set
3✔
1198
                                                // includeChannel to false and don't bother
3✔
1199
                                                // checking the remaining filters.
3✔
1200
                                                if !f(channel) {
6✔
1201
                                                        includeChannel = false
3✔
1202
                                                        break
3✔
1203
                                                }
1204
                                        }
1205

1206
                                        // If the channel passed every filter, include it in
1207
                                        // our set of channels.
1208
                                        if includeChannel {
6✔
1209
                                                channels = append(channels, channel)
3✔
1210
                                        }
3✔
1211
                                }
1212
                                return nil
3✔
1213
                        })
1214

1215
                })
1216
        }, func() {
3✔
1217
                channels = nil
3✔
1218
        })
3✔
1219
        if err != nil {
3✔
1220
                return nil, err
×
1221
        }
×
1222

1223
        return channels, nil
3✔
1224
}
1225

1226
// FetchClosedChannels attempts to fetch all closed channels from the database.
1227
// The pendingOnly bool toggles if channels that aren't yet fully closed should
1228
// be returned in the response or not. When a channel was cooperatively closed,
1229
// it becomes fully closed after a single confirmation.  When a channel was
1230
// forcibly closed, it will become fully closed after _all_ the pending funds
1231
// (if any) have been swept.
1232
func (c *ChannelStateDB) FetchClosedChannels(pendingOnly bool) (
1233
        []*ChannelCloseSummary, error) {
3✔
1234

3✔
1235
        var chanSummaries []*ChannelCloseSummary
3✔
1236

3✔
1237
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1238
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1239
                if closeBucket == nil {
3✔
1240
                        return ErrNoClosedChannels
×
1241
                }
×
1242

1243
                return closeBucket.ForEach(func(chanID []byte, summaryBytes []byte) error {
6✔
1244
                        summaryReader := bytes.NewReader(summaryBytes)
3✔
1245
                        chanSummary, err := deserializeCloseChannelSummary(summaryReader)
3✔
1246
                        if err != nil {
3✔
1247
                                return err
×
1248
                        }
×
1249

1250
                        // If the query specified to only include pending
1251
                        // channels, then we'll skip any channels which aren't
1252
                        // currently pending.
1253
                        if !chanSummary.IsPending && pendingOnly {
6✔
1254
                                return nil
3✔
1255
                        }
3✔
1256

1257
                        chanSummaries = append(chanSummaries, chanSummary)
3✔
1258
                        return nil
3✔
1259
                })
1260
        }, func() {
3✔
1261
                chanSummaries = nil
3✔
1262
        }); err != nil {
3✔
1263
                return nil, err
×
1264
        }
×
1265

1266
        return chanSummaries, nil
3✔
1267
}
1268

1269
// ErrClosedChannelNotFound is returned when no close summary exists in the
// channeldb for the requested closed channel.
var ErrClosedChannelNotFound = errors.New(
	"unable to find closed channel summary",
)
1272

1273
// FetchClosedChannel queries for a channel close summary using the channel
1274
// point of the channel in question.
1275
func (c *ChannelStateDB) FetchClosedChannel(chanID *wire.OutPoint) (
1276
        *ChannelCloseSummary, error) {
3✔
1277

3✔
1278
        var chanSummary *ChannelCloseSummary
3✔
1279
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1280
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1281
                if closeBucket == nil {
3✔
1282
                        return ErrClosedChannelNotFound
×
1283
                }
×
1284

1285
                var b bytes.Buffer
3✔
1286
                var err error
3✔
1287
                if err = graphdb.WriteOutpoint(&b, chanID); err != nil {
3✔
1288
                        return err
×
1289
                }
×
1290

1291
                summaryBytes := closeBucket.Get(b.Bytes())
3✔
1292
                if summaryBytes == nil {
3✔
UNCOV
1293
                        return ErrClosedChannelNotFound
×
UNCOV
1294
                }
×
1295

1296
                summaryReader := bytes.NewReader(summaryBytes)
3✔
1297
                chanSummary, err = deserializeCloseChannelSummary(summaryReader)
3✔
1298

3✔
1299
                return err
3✔
1300
        }, func() {
3✔
1301
                chanSummary = nil
3✔
1302
        }); err != nil {
3✔
UNCOV
1303
                return nil, err
×
UNCOV
1304
        }
×
1305

1306
        return chanSummary, nil
3✔
1307
}
1308

1309
// FetchClosedChannelForID queries for a channel close summary using the
1310
// channel ID of the channel in question.
1311
func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) (
1312
        *ChannelCloseSummary, error) {
3✔
1313

3✔
1314
        var chanSummary *ChannelCloseSummary
3✔
1315
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1316
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1317
                if closeBucket == nil {
3✔
1318
                        return ErrClosedChannelNotFound
×
1319
                }
×
1320

1321
                // The first 30 bytes of the channel ID and outpoint will be
1322
                // equal.
1323
                cursor := closeBucket.ReadCursor()
3✔
1324
                op, c := cursor.Seek(cid[:30])
3✔
1325

3✔
1326
                // We scan over all possible candidates for this channel ID.
3✔
1327
                for ; op != nil && bytes.Compare(cid[:30], op[:30]) <= 0; op, c = cursor.Next() {
6✔
1328
                        var outPoint wire.OutPoint
3✔
1329
                        err := graphdb.ReadOutpoint(
3✔
1330
                                bytes.NewReader(op), &outPoint,
3✔
1331
                        )
3✔
1332
                        if err != nil {
3✔
1333
                                return err
×
1334
                        }
×
1335

1336
                        // If the found outpoint does not correspond to this
1337
                        // channel ID, we continue.
1338
                        if !cid.IsChanPoint(&outPoint) {
3✔
UNCOV
1339
                                continue
×
1340
                        }
1341

1342
                        // Deserialize the close summary and return.
1343
                        r := bytes.NewReader(c)
3✔
1344
                        chanSummary, err = deserializeCloseChannelSummary(r)
3✔
1345
                        if err != nil {
3✔
1346
                                return err
×
1347
                        }
×
1348

1349
                        return nil
3✔
1350
                }
1351
                return ErrClosedChannelNotFound
3✔
1352
        }, func() {
3✔
1353
                chanSummary = nil
3✔
1354
        }); err != nil {
6✔
1355
                return nil, err
3✔
1356
        }
3✔
1357

1358
        return chanSummary, nil
3✔
1359
}
1360

1361
// MarkChanFullyClosed marks a channel as fully closed within the database. A
1362
// channel should be marked as fully closed if the channel was initially
1363
// cooperatively closed and it's reached a single confirmation, or after all
1364
// the pending funds in a channel that has been forcibly closed have been
1365
// swept.
1366
func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
3✔
1367
        var (
3✔
1368
                openChannels  []*OpenChannel
3✔
1369
                pruneLinkNode *btcec.PublicKey
3✔
1370
        )
3✔
1371
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1372
                var b bytes.Buffer
3✔
1373
                if err := graphdb.WriteOutpoint(&b, chanPoint); err != nil {
3✔
1374
                        return err
×
1375
                }
×
1376

1377
                chanID := b.Bytes()
3✔
1378

3✔
1379
                closedChanBucket, err := tx.CreateTopLevelBucket(
3✔
1380
                        closedChannelBucket,
3✔
1381
                )
3✔
1382
                if err != nil {
3✔
1383
                        return err
×
1384
                }
×
1385

1386
                chanSummaryBytes := closedChanBucket.Get(chanID)
3✔
1387
                if chanSummaryBytes == nil {
3✔
1388
                        return fmt.Errorf("no closed channel for "+
×
1389
                                "chan_point=%v found", chanPoint)
×
1390
                }
×
1391

1392
                chanSummaryReader := bytes.NewReader(chanSummaryBytes)
3✔
1393
                chanSummary, err := deserializeCloseChannelSummary(
3✔
1394
                        chanSummaryReader,
3✔
1395
                )
3✔
1396
                if err != nil {
3✔
1397
                        return err
×
1398
                }
×
1399

1400
                chanSummary.IsPending = false
3✔
1401

3✔
1402
                var newSummary bytes.Buffer
3✔
1403
                err = serializeChannelCloseSummary(&newSummary, chanSummary)
3✔
1404
                if err != nil {
3✔
1405
                        return err
×
1406
                }
×
1407

1408
                err = closedChanBucket.Put(chanID, newSummary.Bytes())
3✔
1409
                if err != nil {
3✔
1410
                        return err
×
1411
                }
×
1412

1413
                // Now that the channel is closed, we'll check if we have any
1414
                // other open channels with this peer. If we don't we'll
1415
                // garbage collect it to ensure we don't establish persistent
1416
                // connections to peers without open channels.
1417
                pruneLinkNode = chanSummary.RemotePub
3✔
1418
                openChannels, err = c.fetchOpenChannels(
3✔
1419
                        tx, pruneLinkNode,
3✔
1420
                )
3✔
1421
                if err != nil {
3✔
1422
                        return fmt.Errorf("unable to fetch open channels for "+
×
1423
                                "peer %x: %v",
×
1424
                                pruneLinkNode.SerializeCompressed(), err)
×
1425
                }
×
1426

1427
                return nil
3✔
1428
        }, func() {
3✔
1429
                openChannels = nil
3✔
1430
                pruneLinkNode = nil
3✔
1431
        })
3✔
1432
        if err != nil {
3✔
1433
                return err
×
1434
        }
×
1435

1436
        // Decide whether we want to remove the link node, based upon the number
1437
        // of still open channels.
1438
        return c.pruneLinkNode(openChannels, pruneLinkNode)
3✔
1439
}
1440

1441
// pruneLinkNode determines whether we should garbage collect a link node from
1442
// the database due to no longer having any open channels with it. If there are
1443
// any left, then this acts as a no-op.
1444
func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
1445
        remotePub *btcec.PublicKey) error {
3✔
1446

3✔
1447
        if len(openChannels) > 0 {
6✔
1448
                return nil
3✔
1449
        }
3✔
1450

1451
        log.Infof("Pruning link node %x with zero open channels from database",
3✔
1452
                remotePub.SerializeCompressed())
3✔
1453

3✔
1454
        return c.linkNodeDB.DeleteLinkNode(remotePub)
3✔
1455
}
1456

1457
// PruneLinkNodes attempts to prune all link nodes found within the database
1458
// with whom we no longer have any open channels with.
1459
func (c *ChannelStateDB) PruneLinkNodes() error {
3✔
1460
        allLinkNodes, err := c.linkNodeDB.FetchAllLinkNodes()
3✔
1461
        if err != nil {
3✔
1462
                return err
×
1463
        }
×
1464

1465
        for _, linkNode := range allLinkNodes {
6✔
1466
                var (
3✔
1467
                        openChannels []*OpenChannel
3✔
1468
                        linkNode     = linkNode
3✔
1469
                )
3✔
1470
                err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1471
                        var err error
3✔
1472
                        openChannels, err = c.fetchOpenChannels(
3✔
1473
                                tx, linkNode.IdentityPub,
3✔
1474
                        )
3✔
1475
                        return err
3✔
1476
                }, func() {
6✔
1477
                        openChannels = nil
3✔
1478
                })
3✔
1479
                if err != nil {
3✔
1480
                        return err
×
1481
                }
×
1482

1483
                err = c.pruneLinkNode(openChannels, linkNode.IdentityPub)
3✔
1484
                if err != nil {
3✔
1485
                        return err
×
1486
                }
×
1487
        }
1488

1489
        return nil
3✔
1490
}
1491

1492
// ChannelShell is a shell of a channel that is meant to be used for channel
1493
// recovery purposes. It contains a minimal OpenChannel instance along with
1494
// the addresses of the target node.
1495
type ChannelShell struct {
1496
        // NodeAddrs is the set of addresses that this node has been known to
1497
        // be reachable at in the past.
1498
        NodeAddrs []net.Addr
1499

1500
        // Chan is a shell of an OpenChannel. It contains only the items
1501
        // required to restore the channel on disk.
1502
        Chan *OpenChannel
1503
}
1504

1505
// RestoreChannelShells is a method that allows the caller to reconstruct the
1506
// state of an OpenChannel from the ChannelShell. We'll attempt to write the
1507
// new channel to disk, create a LinkNode instance with the passed node
1508
// addresses, and finally create an edge within the graph for the channel as
1509
// well. This method is idempotent, so repeated calls with the same set of
1510
// channel shells won't modify the database after the initial call.
1511
func (c *ChannelStateDB) RestoreChannelShells(channelShells ...*ChannelShell) error {
3✔
1512
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1513
                for _, channelShell := range channelShells {
6✔
1514
                        channel := channelShell.Chan
3✔
1515

3✔
1516
                        // When we make a channel, we mark that the channel has
3✔
1517
                        // been restored, this will signal to other sub-systems
3✔
1518
                        // to not attempt to use the channel as if it was a
3✔
1519
                        // regular one.
3✔
1520
                        channel.chanStatus |= ChanStatusRestored
3✔
1521

3✔
1522
                        // First, we'll attempt to create a new open channel
3✔
1523
                        // and link node for this channel. If the channel
3✔
1524
                        // already exists, then in order to ensure this method
3✔
1525
                        // is idempotent, we'll continue to the next step.
3✔
1526
                        channel.Db = c
3✔
1527
                        err := syncNewChannel(
3✔
1528
                                tx, channel, channelShell.NodeAddrs,
3✔
1529
                        )
3✔
1530
                        if err != nil {
6✔
1531
                                return err
3✔
1532
                        }
3✔
1533
                }
1534

1535
                return nil
3✔
1536
        }, func() {})
3✔
1537
        if err != nil {
6✔
1538
                return err
3✔
1539
        }
3✔
1540

1541
        return nil
3✔
1542
}
1543

1544
// AddrsForNode consults the channel database for all addresses known to the
1545
// passed node public key. The returned boolean indicates if the given node is
1546
// unknown to the channel DB or not.
1547
//
1548
// NOTE: this is part of the AddrSource interface.
1549
func (d *DB) AddrsForNode(_ context.Context, nodePub *btcec.PublicKey) (bool,
1550
        []net.Addr, error) {
3✔
1551

3✔
1552
        linkNode, err := d.channelStateDB.linkNodeDB.FetchLinkNode(nodePub)
3✔
1553
        // Only if the error is something other than ErrNodeNotFound do we
3✔
1554
        // return it.
3✔
1555
        switch {
3✔
1556
        case err != nil && !errors.Is(err, ErrNodeNotFound):
×
1557
                return false, nil, err
×
1558

1559
        case errors.Is(err, ErrNodeNotFound):
×
1560
                return false, nil, nil
×
1561
        }
1562

1563
        return true, linkNode.Addresses, nil
3✔
1564
}
1565

1566
// AbandonChannel attempts to remove the target channel from the open channel
1567
// database. If the channel was already removed (has a closed channel entry),
1568
// then we'll return a nil error. Otherwise, we'll insert a new close summary
1569
// into the database.
1570
func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint,
1571
        bestHeight uint32) error {
3✔
1572

3✔
1573
        // With the chanPoint constructed, we'll attempt to find the target
3✔
1574
        // channel in the database. If we can't find the channel, then we'll
3✔
1575
        // return the error back to the caller.
3✔
1576
        dbChan, err := c.FetchChannel(*chanPoint)
3✔
1577
        switch {
3✔
1578
        // If the channel wasn't found, then it's possible that it was already
1579
        // abandoned from the database.
1580
        case err == ErrChannelNotFound:
3✔
1581
                _, closedErr := c.FetchClosedChannel(chanPoint)
3✔
1582
                if closedErr != nil {
3✔
UNCOV
1583
                        return closedErr
×
UNCOV
1584
                }
×
1585

1586
                // If the channel was already closed, then we don't return an
1587
                // error as we'd like this step to be repeatable.
1588
                return nil
3✔
1589
        case err != nil:
×
1590
                return err
×
1591
        }
1592

1593
        // Now that we've found the channel, we'll populate a close summary for
1594
        // the channel, so we can store as much information for this abounded
1595
        // channel as possible. We also ensure that we set Pending to false, to
1596
        // indicate that this channel has been "fully" closed.
1597
        summary := &ChannelCloseSummary{
3✔
1598
                CloseType:               Abandoned,
3✔
1599
                ChanPoint:               *chanPoint,
3✔
1600
                ChainHash:               dbChan.ChainHash,
3✔
1601
                CloseHeight:             bestHeight,
3✔
1602
                RemotePub:               dbChan.IdentityPub,
3✔
1603
                Capacity:                dbChan.Capacity,
3✔
1604
                SettledBalance:          dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
3✔
1605
                ShortChanID:             dbChan.ShortChanID(),
3✔
1606
                RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
3✔
1607
                RemoteNextRevocation:    dbChan.RemoteNextRevocation,
3✔
1608
                LocalChanConfig:         dbChan.LocalChanCfg,
3✔
1609
        }
3✔
1610

3✔
1611
        // Finally, we'll close the channel in the DB, and return back to the
3✔
1612
        // caller. We set ourselves as the close initiator because we abandoned
3✔
1613
        // the channel.
3✔
1614
        return dbChan.CloseChannel(summary, ChanStatusLocalCloseInitiator)
3✔
1615
}
1616

1617
// SaveChannelOpeningState saves the serialized channel state for the provided
1618
// chanPoint to the channelOpeningStateBucket.
1619
func (c *ChannelStateDB) SaveChannelOpeningState(outPoint,
1620
        serializedState []byte) error {
3✔
1621

3✔
1622
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1623
                bucket, err := tx.CreateTopLevelBucket(channelOpeningStateBucket)
3✔
1624
                if err != nil {
3✔
1625
                        return err
×
1626
                }
×
1627

1628
                return bucket.Put(outPoint, serializedState)
3✔
1629
        }, func() {})
3✔
1630
}
1631

1632
// GetChannelOpeningState fetches the serialized channel state for the provided
1633
// outPoint from the database, or returns ErrChannelNotFound if the channel
1634
// is not found.
1635
func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte,
1636
        error) {
3✔
1637

3✔
1638
        var serializedState []byte
3✔
1639
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1640
                bucket := tx.ReadBucket(channelOpeningStateBucket)
3✔
1641
                if bucket == nil {
6✔
1642
                        // If the bucket does not exist, it means we never added
3✔
1643
                        //  a channel to the db, so return ErrChannelNotFound.
3✔
1644
                        return ErrChannelNotFound
3✔
1645
                }
3✔
1646

1647
                stateBytes := bucket.Get(outPoint)
3✔
1648
                if stateBytes == nil {
6✔
1649
                        return ErrChannelNotFound
3✔
1650
                }
3✔
1651

1652
                serializedState = append(serializedState, stateBytes...)
3✔
1653

3✔
1654
                return nil
3✔
1655
        }, func() {
3✔
1656
                serializedState = nil
3✔
1657
        })
3✔
1658
        return serializedState, err
3✔
1659
}
1660

1661
// DeleteChannelOpeningState removes any state for outPoint from the database.
1662
func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
3✔
1663
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1664
                bucket := tx.ReadWriteBucket(channelOpeningStateBucket)
3✔
1665
                if bucket == nil {
3✔
1666
                        return ErrChannelNotFound
×
1667
                }
×
1668

1669
                return bucket.Delete(outPoint)
3✔
1670
        }, func() {})
3✔
1671
}
1672

1673
// syncVersions function is used for safe db version synchronization. It
1674
// applies migration functions to the current database and recovers the
1675
// previous state of db if at least one error/panic appeared during migration.
1676
func (d *DB) syncVersions(versions []mandatoryVersion) error {
3✔
1677
        meta, err := d.FetchMeta()
3✔
1678
        if err != nil {
3✔
1679
                if err == ErrMetaNotFound {
×
1680
                        meta = &Meta{}
×
1681
                } else {
×
1682
                        return err
×
1683
                }
×
1684
        }
1685

1686
        latestVersion := getLatestDBVersion(versions)
3✔
1687
        log.Infof("Checking for schema update: latest_version=%v, "+
3✔
1688
                "db_version=%v", latestVersion, meta.DbVersionNumber)
3✔
1689

3✔
1690
        switch {
3✔
1691

1692
        // If the database reports a higher version that we are aware of, the
1693
        // user is probably trying to revert to a prior version of lnd. We fail
1694
        // here to prevent reversions and unintended corruption.
UNCOV
1695
        case meta.DbVersionNumber > latestVersion:
×
UNCOV
1696
                log.Errorf("Refusing to revert from db_version=%d to "+
×
UNCOV
1697
                        "lower version=%d", meta.DbVersionNumber,
×
UNCOV
1698
                        latestVersion)
×
UNCOV
1699
                return ErrDBReversion
×
1700

1701
        // If the current database version matches the latest version number,
1702
        // then we don't need to perform any migrations.
1703
        case meta.DbVersionNumber == latestVersion:
3✔
1704
                return nil
3✔
1705
        }
1706

UNCOV
1707
        log.Infof("Performing database schema migration")
×
UNCOV
1708

×
UNCOV
1709
        // Otherwise, we fetch the migrations which need to applied, and
×
UNCOV
1710
        // execute them serially within a single database transaction to ensure
×
UNCOV
1711
        // the migration is atomic.
×
UNCOV
1712
        migrations, migrationVersions := getMigrationsToApply(
×
UNCOV
1713
                versions, meta.DbVersionNumber,
×
UNCOV
1714
        )
×
UNCOV
1715
        return kvdb.Update(d, func(tx kvdb.RwTx) error {
×
UNCOV
1716
                for i, migration := range migrations {
×
UNCOV
1717
                        if migration == nil {
×
1718
                                continue
×
1719
                        }
1720

UNCOV
1721
                        log.Infof("Applying migration #%v",
×
UNCOV
1722
                                migrationVersions[i])
×
UNCOV
1723

×
UNCOV
1724
                        if err := migration(tx); err != nil {
×
UNCOV
1725
                                log.Infof("Unable to apply migration #%v",
×
UNCOV
1726
                                        migrationVersions[i])
×
UNCOV
1727
                                return err
×
UNCOV
1728
                        }
×
1729
                }
1730

UNCOV
1731
                meta.DbVersionNumber = latestVersion
×
UNCOV
1732
                err := putMeta(meta, tx)
×
UNCOV
1733
                if err != nil {
×
1734
                        return err
×
1735
                }
×
1736

1737
                // In dry-run mode, return an error to prevent the transaction
1738
                // from committing.
UNCOV
1739
                if d.dryRun {
×
UNCOV
1740
                        return ErrDryRunMigrationOK
×
UNCOV
1741
                }
×
1742

UNCOV
1743
                return nil
×
UNCOV
1744
        }, func() {})
×
1745
}
1746

1747
// applyOptionalVersions applies the optional migrations to the database if
1748
// specified in the config.
1749
func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error {
3✔
1750
        // TODO(yy): need to design the db to support dry run for optional
3✔
1751
        // migrations.
3✔
1752
        if d.dryRun {
3✔
UNCOV
1753
                log.Info("Skipped optional migrations as dry run mode is not " +
×
UNCOV
1754
                        "supported yet")
×
UNCOV
1755
                return nil
×
UNCOV
1756
        }
×
1757

1758
        om, err := d.fetchOptionalMeta()
3✔
1759
        if err != nil {
3✔
1760
                if err == ErrMetaNotFound {
×
1761
                        om = &OptionalMeta{
×
1762
                                Versions: make(map[uint64]string),
×
1763
                        }
×
1764
                } else {
×
1765
                        return fmt.Errorf("unable to fetch optional "+
×
1766
                                "meta: %w", err)
×
1767
                }
×
1768
        }
1769

1770
        // migrationCfg is the parent configuration which implements the config
1771
        // interfaces of all the single optional migrations.
1772
        migrationCfg := &MigrationConfigImpl{
3✔
1773
                migration30.MigrateRevLogConfigImpl{
3✔
1774
                        NoAmountData: d.noRevLogAmtData,
3✔
1775
                },
3✔
1776
                migration34.MigrationConfigImpl{
3✔
1777
                        DecayedLog: cfg.DecayedLog,
3✔
1778
                },
3✔
1779
        }
3✔
1780

3✔
1781
        log.Infof("Applying %d optional migrations", len(optionalVersions))
3✔
1782

3✔
1783
        // Apply the optional migrations if requested.
3✔
1784
        for number, version := range optionalVersions {
6✔
1785
                log.Infof("Checking for optional update: name=%v", version.name)
3✔
1786

3✔
1787
                // Exit early if the optional migration is not specified.
3✔
1788
                if !cfg.MigrationFlags[number] {
6✔
1789
                        log.Debugf("Skipping optional migration: name=%s as "+
3✔
1790
                                "it is not specified in the config",
3✔
1791
                                version.name)
3✔
1792

3✔
1793
                        continue
3✔
1794
                }
1795

1796
                // Exit early if the optional migration has already been
1797
                // applied.
1798
                if _, ok := om.Versions[uint64(number)]; ok {
6✔
1799
                        log.Debugf("Skipping optional migration: name=%s as "+
3✔
1800
                                "it has already been applied", version.name)
3✔
1801

3✔
1802
                        continue
3✔
1803
                }
1804

1805
                log.Infof("Performing database optional migration: %s",
3✔
1806
                        version.name)
3✔
1807

3✔
1808
                // Call the migration function for the specific optional
3✔
1809
                // migration.
3✔
1810
                if err := version.migration(d, migrationCfg); err != nil {
3✔
1811
                        log.Errorf("Unable to apply optional migration: %s, "+
×
1812
                                "error: %v", version.name, err)
×
1813
                        return err
×
1814
                }
×
1815

1816
                // Update the optional meta. Notice that unlike the mandatory db
1817
                // migrations where we perform the migration and updating meta
1818
                // in a single db transaction, we use different transactions
1819
                // here. Even when the following update is failed, we should be
1820
                // fine here as we would re-run the optional migration again,
1821
                // which is a noop, during next startup.
1822
                om.Versions[uint64(number)] = version.name
3✔
1823
                if err := d.putOptionalMeta(om); err != nil {
3✔
1824
                        log.Errorf("Unable to update optional meta: %v", err)
×
1825
                        return err
×
1826
                }
×
1827

1828
                log.Infof("Successfully applied optional migration: %s",
3✔
1829
                        version.name)
3✔
1830
        }
1831

1832
        return nil
3✔
1833
}
1834

1835
// ChannelStateDB returns the sub database that is concerned with the channel
1836
// state.
1837
func (d *DB) ChannelStateDB() *ChannelStateDB {
3✔
1838
        return d.channelStateDB
3✔
1839
}
3✔
1840

1841
// LatestDBVersion returns the number of the latest database version currently
1842
// known to the channel DB.
UNCOV
1843
func LatestDBVersion() uint32 {
×
UNCOV
1844
        return getLatestDBVersion(dbVersions)
×
UNCOV
1845
}
×
1846

1847
func getLatestDBVersion(versions []mandatoryVersion) uint32 {
3✔
1848
        return versions[len(versions)-1].number
3✔
1849
}
3✔
1850

1851
// getMigrationsToApply retrieves the migration function that should be
1852
// applied to the database.
1853
func getMigrationsToApply(versions []mandatoryVersion,
UNCOV
1854
        version uint32) ([]migration, []uint32) {
×
UNCOV
1855

×
UNCOV
1856
        migrations := make([]migration, 0, len(versions))
×
UNCOV
1857
        migrationVersions := make([]uint32, 0, len(versions))
×
UNCOV
1858

×
UNCOV
1859
        for _, v := range versions {
×
UNCOV
1860
                if v.number > version {
×
UNCOV
1861
                        migrations = append(migrations, v.migration)
×
UNCOV
1862
                        migrationVersions = append(migrationVersions, v.number)
×
UNCOV
1863
                }
×
1864
        }
1865

UNCOV
1866
        return migrations, migrationVersions
×
1867
}
1868

1869
// fetchHistoricalChanBucket returns a the channel bucket for a given outpoint
1870
// from the historical channel bucket. If the bucket does not exist,
1871
// ErrNoHistoricalBucket is returned.
1872
func fetchHistoricalChanBucket(tx kvdb.RTx,
1873
        outPoint *wire.OutPoint) (kvdb.RBucket, error) {
3✔
1874

3✔
1875
        // First fetch the top level bucket which stores all data related to
3✔
1876
        // historically stored channels.
3✔
1877
        historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
3✔
1878
        if historicalChanBucket == nil {
3✔
1879
                return nil, ErrNoHistoricalBucket
×
1880
        }
×
1881

1882
        // With the bucket for the node and chain fetched, we can now go down
1883
        // another level, for the channel itself.
1884
        var chanPointBuf bytes.Buffer
3✔
1885
        if err := graphdb.WriteOutpoint(&chanPointBuf, outPoint); err != nil {
3✔
1886
                return nil, err
×
1887
        }
×
1888
        chanBucket := historicalChanBucket.NestedReadBucket(
3✔
1889
                chanPointBuf.Bytes(),
3✔
1890
        )
3✔
1891
        if chanBucket == nil {
3✔
UNCOV
1892
                return nil, ErrChannelNotFound
×
UNCOV
1893
        }
×
1894

1895
        return chanBucket, nil
3✔
1896
}
1897

1898
// FetchHistoricalChannel fetches open channel data from the historical channel
1899
// bucket.
1900
func (c *ChannelStateDB) FetchHistoricalChannel(outPoint *wire.OutPoint) (
1901
        *OpenChannel, error) {
3✔
1902

3✔
1903
        var channel *OpenChannel
3✔
1904
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1905
                chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
3✔
1906
                if err != nil {
3✔
UNCOV
1907
                        return err
×
UNCOV
1908
                }
×
1909

1910
                channel, err = fetchOpenChannel(chanBucket, outPoint)
3✔
1911
                if err != nil {
3✔
1912
                        return err
×
1913
                }
×
1914

1915
                channel.Db = c
3✔
1916
                return nil
3✔
1917
        }, func() {
3✔
1918
                channel = nil
3✔
1919
        })
3✔
1920
        if err != nil {
3✔
UNCOV
1921
                return nil, err
×
UNCOV
1922
        }
×
1923

1924
        return channel, nil
3✔
1925
}
1926

1927
func fetchFinalHtlcsBucket(tx kvdb.RTx,
1928
        chanID lnwire.ShortChannelID) (kvdb.RBucket, error) {
3✔
1929

3✔
1930
        finalHtlcsBucket := tx.ReadBucket(finalHtlcsBucket)
3✔
1931
        if finalHtlcsBucket == nil {
6✔
1932
                return nil, ErrFinalHtlcsBucketNotFound
3✔
1933
        }
3✔
1934

1935
        var chanIDBytes [8]byte
3✔
1936
        byteOrder.PutUint64(chanIDBytes[:], chanID.ToUint64())
3✔
1937

3✔
1938
        chanBucket := finalHtlcsBucket.NestedReadBucket(chanIDBytes[:])
3✔
1939
        if chanBucket == nil {
3✔
1940
                return nil, ErrFinalChannelBucketNotFound
×
1941
        }
×
1942

1943
        return chanBucket, nil
3✔
1944
}
1945

1946
// ErrHtlcUnknown is returned by LookupFinalHtlc when no final resolution is
// stored for the requested htlc.
var ErrHtlcUnknown = errors.New("htlc unknown")
1947

1948
// LookupFinalHtlc retrieves a final htlc resolution from the database. If the
1949
// htlc has no final resolution yet, ErrHtlcUnknown is returned.
1950
func (c *ChannelStateDB) LookupFinalHtlc(chanID lnwire.ShortChannelID,
1951
        htlcIndex uint64) (*FinalHtlcInfo, error) {
3✔
1952

3✔
1953
        var idBytes [8]byte
3✔
1954
        byteOrder.PutUint64(idBytes[:], htlcIndex)
3✔
1955

3✔
1956
        var settledByte byte
3✔
1957

3✔
1958
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1959
                finalHtlcsBucket, err := fetchFinalHtlcsBucket(
3✔
1960
                        tx, chanID,
3✔
1961
                )
3✔
1962
                switch {
3✔
1963
                case errors.Is(err, ErrFinalHtlcsBucketNotFound):
3✔
1964
                        fallthrough
3✔
1965

1966
                case errors.Is(err, ErrFinalChannelBucketNotFound):
3✔
1967
                        return ErrHtlcUnknown
3✔
1968

1969
                case err != nil:
×
1970
                        return fmt.Errorf("cannot fetch final htlcs bucket: %w",
×
1971
                                err)
×
1972
                }
1973

1974
                value := finalHtlcsBucket.Get(idBytes[:])
3✔
1975
                if value == nil {
3✔
UNCOV
1976
                        return ErrHtlcUnknown
×
UNCOV
1977
                }
×
1978

1979
                if len(value) != 1 {
3✔
1980
                        return errors.New("unexpected final htlc value length")
×
1981
                }
×
1982

1983
                settledByte = value[0]
3✔
1984

3✔
1985
                return nil
3✔
1986
        }, func() {
3✔
1987
                settledByte = 0
3✔
1988
        })
3✔
1989
        if err != nil {
6✔
1990
                return nil, err
3✔
1991
        }
3✔
1992

1993
        info := FinalHtlcInfo{
3✔
1994
                Settled:  settledByte&byte(FinalHtlcSettledBit) != 0,
3✔
1995
                Offchain: settledByte&byte(FinalHtlcOffchainBit) != 0,
3✔
1996
        }
3✔
1997

3✔
1998
        return &info, nil
3✔
1999
}
2000

2001
// PutOnchainFinalHtlcOutcome stores the final on-chain outcome of an htlc in
2002
// the database.
2003
func (c *ChannelStateDB) PutOnchainFinalHtlcOutcome(
2004
        chanID lnwire.ShortChannelID, htlcID uint64, settled bool) error {
3✔
2005

3✔
2006
        // Skip if the user did not opt in to storing final resolutions.
3✔
2007
        if !c.parent.storeFinalHtlcResolutions {
6✔
2008
                return nil
3✔
2009
        }
3✔
2010

UNCOV
2011
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
×
UNCOV
2012
                finalHtlcsBucket, err := fetchFinalHtlcsBucketRw(tx, chanID)
×
UNCOV
2013
                if err != nil {
×
2014
                        return err
×
2015
                }
×
2016

UNCOV
2017
                return putFinalHtlc(
×
UNCOV
2018
                        finalHtlcsBucket, htlcID,
×
UNCOV
2019
                        FinalHtlcInfo{
×
UNCOV
2020
                                Settled:  settled,
×
UNCOV
2021
                                Offchain: false,
×
UNCOV
2022
                        },
×
UNCOV
2023
                )
×
UNCOV
2024
        }, func() {})
×
2025
}
2026

2027
// MakeTestInvoiceDB is used to create a test invoice database for testing
2028
// purposes. It simply calls into MakeTestDB so the same modifiers can be used.
2029
func MakeTestInvoiceDB(t *testing.T, modifiers ...OptionModifier) (
UNCOV
2030
        invoices.InvoiceDB, error) {
×
UNCOV
2031

×
UNCOV
2032
        return MakeTestDB(t, modifiers...)
×
UNCOV
2033
}
×
2034

2035
// MakeTestDB creates a new instance of the ChannelDB for testing purposes.
2036
// A callback which cleans up the created temporary directories is also
2037
// returned and intended to be executed after the test completes.
UNCOV
2038
func MakeTestDB(t *testing.T, modifiers ...OptionModifier) (*DB, error) {
×
UNCOV
2039
        // First, create a temporary directory to be used for the duration of
×
UNCOV
2040
        // this test.
×
UNCOV
2041
        tempDirName := t.TempDir()
×
UNCOV
2042

×
UNCOV
2043
        // Next, create channeldb for the first time.
×
UNCOV
2044
        backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cdb")
×
UNCOV
2045
        if err != nil {
×
2046
                backendCleanup()
×
2047
                return nil, err
×
2048
        }
×
2049

UNCOV
2050
        cdb, err := CreateWithBackend(backend, modifiers...)
×
UNCOV
2051
        if err != nil {
×
2052
                backendCleanup()
×
2053
                return nil, err
×
2054
        }
×
2055

UNCOV
2056
        t.Cleanup(func() {
×
UNCOV
2057
                cdb.Close()
×
UNCOV
2058
                backendCleanup()
×
UNCOV
2059
        })
×
2060

UNCOV
2061
        return cdb, nil
×
2062
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc