• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 17027244024

17 Aug 2025 11:32PM UTC coverage: 57.287% (-9.5%) from 66.765%
17027244024

Pull #10167

github

web-flow
Merge fcb4f4303 into fb1adfc21
Pull Request #10167: multi: bump Go to 1.24.6

3 of 18 new or added lines in 6 files covered. (16.67%)

28537 existing lines in 457 files now uncovered.

99094 of 172978 relevant lines covered (57.29%)

1.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.83
/channeldb/db.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/binary"
7
        "errors"
8
        "fmt"
9
        "net"
10
        "os"
11
        "testing"
12

13
        "github.com/btcsuite/btcd/btcec/v2"
14
        "github.com/btcsuite/btcd/wire"
15
        "github.com/btcsuite/btcwallet/walletdb"
16
        mig "github.com/lightningnetwork/lnd/channeldb/migration"
17
        "github.com/lightningnetwork/lnd/channeldb/migration12"
18
        "github.com/lightningnetwork/lnd/channeldb/migration13"
19
        "github.com/lightningnetwork/lnd/channeldb/migration16"
20
        "github.com/lightningnetwork/lnd/channeldb/migration20"
21
        "github.com/lightningnetwork/lnd/channeldb/migration21"
22
        "github.com/lightningnetwork/lnd/channeldb/migration23"
23
        "github.com/lightningnetwork/lnd/channeldb/migration24"
24
        "github.com/lightningnetwork/lnd/channeldb/migration25"
25
        "github.com/lightningnetwork/lnd/channeldb/migration26"
26
        "github.com/lightningnetwork/lnd/channeldb/migration27"
27
        "github.com/lightningnetwork/lnd/channeldb/migration29"
28
        "github.com/lightningnetwork/lnd/channeldb/migration30"
29
        "github.com/lightningnetwork/lnd/channeldb/migration31"
30
        "github.com/lightningnetwork/lnd/channeldb/migration32"
31
        "github.com/lightningnetwork/lnd/channeldb/migration33"
32
        "github.com/lightningnetwork/lnd/channeldb/migration34"
33
        "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
34
        "github.com/lightningnetwork/lnd/clock"
35
        graphdb "github.com/lightningnetwork/lnd/graph/db"
36
        "github.com/lightningnetwork/lnd/invoices"
37
        "github.com/lightningnetwork/lnd/kvdb"
38
        "github.com/lightningnetwork/lnd/lnwire"
39
        "github.com/stretchr/testify/require"
40
)
41

42
const (
43
        dbName = "channel.db"
44
)
45

46
var (
47
        // ErrDryRunMigrationOK signals that a migration executed successfully,
48
        // but we intentionally did not commit the result.
49
        ErrDryRunMigrationOK = errors.New("dry run migration successful")
50

51
        // ErrFinalHtlcsBucketNotFound signals that the top-level final htlcs
52
        // bucket does not exist.
53
        ErrFinalHtlcsBucketNotFound = errors.New("final htlcs bucket not " +
54
                "found")
55

56
        // ErrFinalChannelBucketNotFound signals that the channel bucket for
57
        // final htlc outcomes does not exist.
58
        ErrFinalChannelBucketNotFound = errors.New("final htlcs channel " +
59
                "bucket not found")
60
)
61

62
// migration is a function which takes a prior outdated version of the database
63
// instances and mutates the key/bucket structure to arrive at a more
64
// up-to-date version of the database.
65
type migration func(tx kvdb.RwTx) error
66

67
// mandatoryVersion defines a db version that must be applied before the lnd
68
// starts.
69
type mandatoryVersion struct {
70
        number    uint32
71
        migration migration
72
}
73

74
// MigrationConfig is an interface that combines the config interfaces of all
75
// optional migrations.
76
type MigrationConfig interface {
77
        migration30.MigrateRevLogConfig
78
        migration34.MigrationConfig
79
}
80

81
// MigrationConfigImpl is a super set of all the various migration configs and
82
// an implementation of MigrationConfig.
83
type MigrationConfigImpl struct {
84
        migration30.MigrateRevLogConfigImpl
85
        migration34.MigrationConfigImpl
86
}
87

88
// optionalMigration defines an optional migration function. When a migration
89
// is optional, it usually involves a large scale of changes that might touch
90
// millions of keys. Due to OOM concern, the update cannot be safely done
91
// within one db transaction. Thus, for optional migrations, they must take the
92
// db backend and construct transactions as needed.
93
type optionalMigration func(db kvdb.Backend, cfg MigrationConfig) error
94

95
// optionalVersion defines a db version that can be optionally applied. When
96
// applying migrations, we must apply all the mandatory migrations first before
97
// attempting optional ones.
98
type optionalVersion struct {
99
        name      string
100
        migration optionalMigration
101
}
102

103
var (
104
        // dbVersions stores all mandatory versions of the database. If the
105
        // current version of the database doesn't match the latest version,
106
        // this list is used to retrieve all migration functions that need to
107
        // be applied to the current db.
108
        dbVersions = []mandatoryVersion{
109
                {
110
                        // The base DB version requires no migration.
111
                        number:    0,
112
                        migration: nil,
113
                },
114
                {
115
                        // The version of the database where two new indexes
116
                        // for the update time of node and channel updates were
117
                        // added.
118
                        number:    1,
119
                        migration: migration_01_to_11.MigrateNodeAndEdgeUpdateIndex,
120
                },
121
                {
122
                        // The DB version that added the invoice event time
123
                        // series.
124
                        number:    2,
125
                        migration: migration_01_to_11.MigrateInvoiceTimeSeries,
126
                },
127
                {
128
                        // The DB version that updated the embedded invoice in
129
                        // outgoing payments to match the new format.
130
                        number:    3,
131
                        migration: migration_01_to_11.MigrateInvoiceTimeSeriesOutgoingPayments,
132
                },
133
                {
134
                        // The version of the database where every channel
135
                        // always has two entries in the edges bucket. If
136
                        // a policy is unknown, this will be represented
137
                        // by a special byte sequence.
138
                        number:    4,
139
                        migration: migration_01_to_11.MigrateEdgePolicies,
140
                },
141
                {
142
                        // The DB version where we persist each attempt to send
143
                        // an HTLC to a payment hash, and track whether the
144
                        // payment is in-flight, succeeded, or failed.
145
                        number:    5,
146
                        migration: migration_01_to_11.PaymentStatusesMigration,
147
                },
148
                {
149
                        // The DB version that properly prunes stale entries
150
                        // from the edge update index.
151
                        number:    6,
152
                        migration: migration_01_to_11.MigratePruneEdgeUpdateIndex,
153
                },
154
                {
155
                        // The DB version that migrates the ChannelCloseSummary
156
                        // to a format where optional fields are indicated with
157
                        // boolean flags.
158
                        number:    7,
159
                        migration: migration_01_to_11.MigrateOptionalChannelCloseSummaryFields,
160
                },
161
                {
162
                        // The DB version that changes the gossiper's message
163
                        // store keys to account for the message's type and
164
                        // ShortChannelID.
165
                        number:    8,
166
                        migration: migration_01_to_11.MigrateGossipMessageStoreKeys,
167
                },
168
                {
169
                        // The DB version where the payments and payment
170
                        // statuses are moved to being stored in a combined
171
                        // bucket.
172
                        number:    9,
173
                        migration: migration_01_to_11.MigrateOutgoingPayments,
174
                },
175
                {
176
                        // The DB version where we started to store legacy
177
                        // payload information for all routes, as well as the
178
                        // optional TLV records.
179
                        number:    10,
180
                        migration: migration_01_to_11.MigrateRouteSerialization,
181
                },
182
                {
183
                        // Add invoice htlc and cltv delta fields.
184
                        number:    11,
185
                        migration: migration_01_to_11.MigrateInvoices,
186
                },
187
                {
188
                        // Migrate to TLV invoice bodies, add payment address
189
                        // and features, remove receipt.
190
                        number:    12,
191
                        migration: migration12.MigrateInvoiceTLV,
192
                },
193
                {
194
                        // Migrate to multi-path payments.
195
                        number:    13,
196
                        migration: migration13.MigrateMPP,
197
                },
198
                {
199
                        // Initialize payment address index and begin using it
200
                        // as the default index, falling back to payment hash
201
                        // index.
202
                        number:    14,
203
                        migration: mig.CreateTLB(payAddrIndexBucket),
204
                },
205
                {
206
                        // This used to create payment related top-level
207
                        // buckets, however this is now done by the payment
208
                        // package.
209
                        number: 15,
210
                        migration: func(tx kvdb.RwTx) error {
×
211
                                return nil
×
212
                        },
×
213
                },
214
                {
215
                        // Add our existing payments to the index bucket created
216
                        // in migration 15.
217
                        number:    16,
218
                        migration: migration16.MigrateSequenceIndex,
219
                },
220
                {
221
                        // Create a top level bucket which will store extra
222
                        // information about channel closes.
223
                        number:    17,
224
                        migration: mig.CreateTLB(closeSummaryBucket),
225
                },
226
                {
227
                        // Create a top level bucket which holds information
228
                        // about our peers.
229
                        number:    18,
230
                        migration: mig.CreateTLB(peersBucket),
231
                },
232
                {
233
                        // Create a top level bucket which holds outpoint
234
                        // information.
235
                        number:    19,
236
                        migration: mig.CreateTLB(outpointBucket),
237
                },
238
                {
239
                        // Migrate some data to the outpoint index.
240
                        number:    20,
241
                        migration: migration20.MigrateOutpointIndex,
242
                },
243
                {
244
                        // Migrate to length prefixed wire messages everywhere
245
                        // in the database.
246
                        number:    21,
247
                        migration: migration21.MigrateDatabaseWireMessages,
248
                },
249
                {
250
                        // Initialize set id index so that invoices can be
251
                        // queried by individual htlc sets.
252
                        number:    22,
253
                        migration: mig.CreateTLB(setIDIndexBucket),
254
                },
255
                {
256
                        number:    23,
257
                        migration: migration23.MigrateHtlcAttempts,
258
                },
259
                {
260
                        // Remove old forwarding packages of closed channels.
261
                        number:    24,
262
                        migration: migration24.MigrateFwdPkgCleanup,
263
                },
264
                {
265
                        // Save the initial local/remote balances in channel
266
                        // info.
267
                        number:    25,
268
                        migration: migration25.MigrateInitialBalances,
269
                },
270
                {
271
                        // Migrate the initial local/remote balance fields into
272
                        // tlv records.
273
                        number:    26,
274
                        migration: migration26.MigrateBalancesToTlvRecords,
275
                },
276
                {
277
                        // Patch the initial local/remote balance fields with
278
                        // empty values for historical channels.
279
                        number:    27,
280
                        migration: migration27.MigrateHistoricalBalances,
281
                },
282
                {
283
                        number:    28,
284
                        migration: mig.CreateTLB(chanIDBucket),
285
                },
286
                {
287
                        number:    29,
288
                        migration: migration29.MigrateChanID,
289
                },
290
                {
291
                        // Removes the "sweeper-last-tx" bucket. Although we
292
                        // do not have a mandatory version 30 we skip this
293
                        // version because its naming is already used for the
294
                        // first optional migration.
295
                        number:    31,
296
                        migration: migration31.DeleteLastPublishedTxTLB,
297
                },
298
                {
299
                        number:    32,
300
                        migration: migration32.MigrateMCRouteSerialisation,
301
                },
302
                {
303
                        number:    33,
304
                        migration: migration33.MigrateMCStoreNameSpacedResults,
305
                },
306
        }
307

308
        // optionalVersions stores all optional migrations that are applied
309
        // after dbVersions.
310
        //
311
        // NOTE: optional migrations must be fault-tolerant and re-running on
312
        // already migrated data must be a noop, so the migration must be able
313
        // to determine its state.
314
        optionalVersions = []optionalVersion{
315
                {
316
                        name: "prune_revocation_log",
317
                        migration: func(db kvdb.Backend,
318
                                cfg MigrationConfig) error {
×
319

×
320
                                return migration30.MigrateRevocationLog(db, cfg)
×
321
                        },
×
322
                },
323
                {
324
                        name: "gc_decayed_log",
325
                        migration: func(db kvdb.Backend,
326
                                cfg MigrationConfig) error {
3✔
327

3✔
328
                                return migration34.MigrateDecayedLog(
3✔
329
                                        db, cfg,
3✔
330
                                )
3✔
331
                        },
3✔
332
                },
333
        }
334

335
        // Big endian is the preferred byte order, due to cursor scans over
336
        // integer keys iterating in order.
337
        byteOrder = binary.BigEndian
338

339
        // channelOpeningStateBucket is the database bucket used to store the
340
        // channelOpeningState for each channel that is currently in the process
341
        // of being opened.
342
        channelOpeningStateBucket = []byte("channelOpeningState")
343
)
344

345
// DB is the primary datastore for the lnd daemon. The database stores
346
// information related to nodes, routing data, open/closed channels, fee
347
// schedules, and reputation data.
348
type DB struct {
349
        kvdb.Backend
350

351
        // channelStateDB separates all DB operations on channel state.
352
        channelStateDB *ChannelStateDB
353

354
        dbPath                    string
355
        clock                     clock.Clock
356
        dryRun                    bool
357
        storeFinalHtlcResolutions bool
358

359
        // noRevLogAmtData if true, means that commitment transaction amount
360
        // data should not be stored in the revocation log.
361
        noRevLogAmtData bool
362
}
363

364
// OpenForTesting opens or creates a channeldb to be used for tests. Any
365
// necessary schemas migrations due to updates will take place as necessary.
366
func OpenForTesting(t testing.TB, dbPath string,
UNCOV
367
        modifiers ...OptionModifier) *DB {
×
UNCOV
368

×
UNCOV
369
        backend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
×
UNCOV
370
                DBPath:            dbPath,
×
UNCOV
371
                DBFileName:        dbName,
×
UNCOV
372
                NoFreelistSync:    true,
×
UNCOV
373
                AutoCompact:       false,
×
UNCOV
374
                AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
×
UNCOV
375
                DBTimeout:         kvdb.DefaultDBTimeout,
×
UNCOV
376
        })
×
UNCOV
377
        require.NoError(t, err)
×
UNCOV
378

×
UNCOV
379
        db, err := CreateWithBackend(backend, modifiers...)
×
UNCOV
380
        require.NoError(t, err)
×
UNCOV
381

×
UNCOV
382
        db.dbPath = dbPath
×
UNCOV
383

×
UNCOV
384
        t.Cleanup(func() {
×
UNCOV
385
                require.NoError(t, db.Close())
×
UNCOV
386
        })
×
387

UNCOV
388
        return db
×
389
}
390

391
// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
392
// Any necessary schemas migrations due to updates will take place as necessary.
393
func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
394
        error) {
3✔
395

3✔
396
        opts := DefaultOptions()
3✔
397
        for _, modifier := range modifiers {
6✔
398
                modifier(&opts)
3✔
399
        }
3✔
400

401
        if !opts.NoMigration {
6✔
402
                if err := initChannelDB(backend); err != nil {
3✔
UNCOV
403
                        return nil, err
×
UNCOV
404
                }
×
405
        }
406

407
        chanDB := &DB{
3✔
408
                Backend: backend,
3✔
409
                channelStateDB: &ChannelStateDB{
3✔
410
                        linkNodeDB: &LinkNodeDB{
3✔
411
                                backend: backend,
3✔
412
                        },
3✔
413
                        backend: backend,
3✔
414
                },
3✔
415
                clock:                     opts.clock,
3✔
416
                dryRun:                    opts.dryRun,
3✔
417
                storeFinalHtlcResolutions: opts.storeFinalHtlcResolutions,
3✔
418
                noRevLogAmtData:           opts.NoRevLogAmtData,
3✔
419
        }
3✔
420

3✔
421
        // Set the parent pointer (only used in tests).
3✔
422
        chanDB.channelStateDB.parent = chanDB
3✔
423

3✔
424
        // Synchronize the version of database and apply migrations if needed.
3✔
425
        if !opts.NoMigration {
6✔
426
                if err := chanDB.syncVersions(dbVersions); err != nil {
3✔
UNCOV
427
                        backend.Close()
×
UNCOV
428
                        return nil, err
×
UNCOV
429
                }
×
430

431
                // Grab the optional migration config.
432
                omc := opts.OptionalMiragtionConfig
3✔
433
                if err := chanDB.applyOptionalVersions(omc); err != nil {
3✔
434
                        backend.Close()
×
435
                        return nil, err
×
436
                }
×
437
        }
438

439
        return chanDB, nil
3✔
440
}
441

442
// Path returns the file path to the channel database.
UNCOV
443
func (d *DB) Path() string {
×
UNCOV
444
        return d.dbPath
×
UNCOV
445
}
×
446

447
var dbTopLevelBuckets = [][]byte{
448
        openChannelBucket,
449
        closedChannelBucket,
450
        forwardingLogBucket,
451
        fwdPackagesKey,
452
        invoiceBucket,
453
        payAddrIndexBucket,
454
        setIDIndexBucket,
455
        peersBucket,
456
        nodeInfoBucket,
457
        metaBucket,
458
        closeSummaryBucket,
459
        outpointBucket,
460
        chanIDBucket,
461
        historicalChannelBucket,
462
}
463

464
// Wipe completely deletes all saved state within all used buckets within the
465
// database. The deletion is done in a single transaction, therefore this
466
// operation is fully atomic.
UNCOV
467
func (d *DB) Wipe() error {
×
UNCOV
468
        err := kvdb.Update(d, func(tx kvdb.RwTx) error {
×
UNCOV
469
                for _, tlb := range dbTopLevelBuckets {
×
UNCOV
470
                        err := tx.DeleteTopLevelBucket(tlb)
×
UNCOV
471
                        if err != nil && err != kvdb.ErrBucketNotFound {
×
472
                                return err
×
473
                        }
×
474
                }
UNCOV
475
                return nil
×
UNCOV
476
        }, func() {})
×
UNCOV
477
        if err != nil {
×
478
                return err
×
479
        }
×
480

UNCOV
481
        return initChannelDB(d.Backend)
×
482
}
483

484
// initChannelDB creates and initializes a fresh version of channeldb. In the
485
// case that the target path has not yet been created or doesn't yet exist, then
486
// the path is created. Additionally, all required top-level buckets used within
487
// the database are created.
488
func initChannelDB(db kvdb.Backend) error {
3✔
489
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
6✔
490
                // Check if DB was marked as inactive with a tomb stone.
3✔
491
                if err := EnsureNoTombstone(tx); err != nil {
3✔
UNCOV
492
                        return err
×
UNCOV
493
                }
×
494

495
                for _, tlb := range dbTopLevelBuckets {
6✔
496
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
3✔
497
                                return err
×
498
                        }
×
499
                }
500

501
                meta := &Meta{}
3✔
502
                // Check if DB is already initialized.
3✔
503
                err := FetchMeta(meta, tx)
3✔
504
                if err == nil {
6✔
505
                        return nil
3✔
506
                }
3✔
507

508
                meta.DbVersionNumber = getLatestDBVersion(dbVersions)
×
509
                return putMeta(meta, tx)
×
510
        }, func() {})
3✔
511
        if err != nil {
3✔
UNCOV
512
                return fmt.Errorf("unable to create new channeldb: %w", err)
×
UNCOV
513
        }
×
514

515
        return nil
3✔
516
}
517

518
// fileExists returns false only when os.Stat reports that nothing exists at
// path. It returns true when the stat succeeds and also when it fails for any
// other reason (for example a permission error), because in that case the
// absence of the file has not been established.
func fileExists(path string) bool {
	_, err := os.Stat(path)

	// errors.Is is the recommended replacement for os.IsNotExist and also
	// matches wrapped errors.
	return !errors.Is(err, os.ErrNotExist)
}
528

529
// ChannelStateDB is a database that keeps track of all channel state.
530
type ChannelStateDB struct {
531
        // linkNodeDB separates all DB operations on LinkNodes.
532
        linkNodeDB *LinkNodeDB
533

534
        // parent holds a pointer to the "main" channeldb.DB object. This is
535
        // only used for testing and should never be used in production code.
536
        // For testing use the ChannelStateDB.GetParentDB() function to retrieve
537
        // this pointer.
538
        parent *DB
539

540
        // backend points to the actual backend holding the channel state
541
        // database. This may be a real backend or a cache middleware.
542
        backend kvdb.Backend
543
}
544

545
// GetParentDB returns the "main" channeldb.DB object that is the owner of this
546
// ChannelStateDB instance. Use this function only in tests where passing around
547
// pointers makes testing less readable. Never to be used in production code!
UNCOV
548
func (c *ChannelStateDB) GetParentDB() *DB {
×
UNCOV
549
        return c.parent
×
UNCOV
550
}
×
551

552
// LinkNodeDB returns the current instance of the link node database.
553
func (c *ChannelStateDB) LinkNodeDB() *LinkNodeDB {
3✔
554
        return c.linkNodeDB
3✔
555
}
3✔
556

557
// FetchOpenChannels starts a new database transaction and returns all stored
558
// currently active/open channels associated with the target nodeID. In the case
559
// that no active channels are known to have been created with this node, then a
560
// zero-length slice is returned.
561
func (c *ChannelStateDB) FetchOpenChannels(nodeID *btcec.PublicKey) (
562
        []*OpenChannel, error) {
3✔
563

3✔
564
        var channels []*OpenChannel
3✔
565
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
566
                var err error
3✔
567
                channels, err = c.fetchOpenChannels(tx, nodeID)
3✔
568
                return err
3✔
569
        }, func() {
6✔
570
                channels = nil
3✔
571
        })
3✔
572

573
        return channels, err
3✔
574
}
575

576
// fetchOpenChannels uses and existing database transaction and returns all
577
// stored currently active/open channels associated with the target nodeID. In
578
// the case that no active channels are known to have been created with this
579
// node, then a zero-length slice is returned.
580
func (c *ChannelStateDB) fetchOpenChannels(tx kvdb.RTx,
581
        nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
3✔
582

3✔
583
        // Get the bucket dedicated to storing the metadata for open channels.
3✔
584
        openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
585
        if openChanBucket == nil {
3✔
586
                return nil, nil
×
587
        }
×
588

589
        // Within this top level bucket, fetch the bucket dedicated to storing
590
        // open channel data specific to the remote node.
591
        pub := nodeID.SerializeCompressed()
3✔
592
        nodeChanBucket := openChanBucket.NestedReadBucket(pub)
3✔
593
        if nodeChanBucket == nil {
6✔
594
                return nil, nil
3✔
595
        }
3✔
596

597
        // Next, we'll need to go down an additional layer in order to retrieve
598
        // the channels for each chain the node knows of.
599
        var channels []*OpenChannel
3✔
600
        err := nodeChanBucket.ForEach(func(chainHash, v []byte) error {
6✔
601
                // If there's a value, it's not a bucket so ignore it.
3✔
602
                if v != nil {
3✔
603
                        return nil
×
604
                }
×
605

606
                // If we've found a valid chainhash bucket, then we'll retrieve
607
                // that so we can extract all the channels.
608
                chainBucket := nodeChanBucket.NestedReadBucket(chainHash)
3✔
609
                if chainBucket == nil {
3✔
610
                        return fmt.Errorf("unable to read bucket for chain=%x",
×
611
                                chainHash[:])
×
612
                }
×
613

614
                // Finally, we both of the necessary buckets retrieved, fetch
615
                // all the active channels related to this node.
616
                nodeChannels, err := c.fetchNodeChannels(chainBucket)
3✔
617
                if err != nil {
3✔
618
                        return fmt.Errorf("unable to read channel for "+
×
619
                                "chain_hash=%x, node_key=%x: %v",
×
620
                                chainHash[:], pub, err)
×
621
                }
×
622

623
                channels = append(channels, nodeChannels...)
3✔
624
                return nil
3✔
625
        })
626

627
        return channels, err
3✔
628
}
629

630
// fetchNodeChannels retrieves all active channels from the target chainBucket
631
// which is under a node's dedicated channel bucket. This function is typically
632
// used to fetch all the active channels related to a particular node.
633
func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) (
634
        []*OpenChannel, error) {
3✔
635

3✔
636
        var channels []*OpenChannel
3✔
637

3✔
638
        // A node may have channels on several chains, so for each known chain,
3✔
639
        // we'll extract all the channels.
3✔
640
        err := chainBucket.ForEach(func(chanPoint, v []byte) error {
6✔
641
                // If there's a value, it's not a bucket so ignore it.
3✔
642
                if v != nil {
3✔
643
                        return nil
×
644
                }
×
645

646
                // Once we've found a valid channel bucket, we'll extract it
647
                // from the node's chain bucket.
648
                chanBucket := chainBucket.NestedReadBucket(chanPoint)
3✔
649

3✔
650
                var outPoint wire.OutPoint
3✔
651
                err := graphdb.ReadOutpoint(
3✔
652
                        bytes.NewReader(chanPoint), &outPoint,
3✔
653
                )
3✔
654
                if err != nil {
3✔
655
                        return err
×
656
                }
×
657
                oChannel, err := fetchOpenChannel(chanBucket, &outPoint)
3✔
658
                if err != nil {
3✔
659
                        return fmt.Errorf("unable to read channel data for "+
×
660
                                "chan_point=%v: %w", outPoint, err)
×
661
                }
×
662
                oChannel.Db = c
3✔
663

3✔
664
                channels = append(channels, oChannel)
3✔
665

3✔
666
                return nil
3✔
667
        })
668
        if err != nil {
3✔
669
                return nil, err
×
670
        }
×
671

672
        return channels, nil
3✔
673
}
674

675
// FetchChannel attempts to locate a channel specified by the passed channel
676
// point. If the channel cannot be found, then an error will be returned.
677
func (c *ChannelStateDB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel,
678
        error) {
3✔
679

3✔
680
        var targetChanPoint bytes.Buffer
3✔
681
        err := graphdb.WriteOutpoint(&targetChanPoint, &chanPoint)
3✔
682
        if err != nil {
3✔
683
                return nil, err
×
684
        }
×
685

686
        targetChanPointBytes := targetChanPoint.Bytes()
3✔
687
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
3✔
688
                error) {
6✔
689

3✔
690
                return targetChanPointBytes, &chanPoint, nil
3✔
691
        }
3✔
692

693
        return c.channelScanner(nil, selector)
3✔
694
}
695

696
// FetchChannelByID attempts to locate a channel specified by the passed channel
697
// ID. If the channel cannot be found, then an error will be returned.
698
// Optionally an existing db tx can be supplied.
699
func (c *ChannelStateDB) FetchChannelByID(tx kvdb.RTx, id lnwire.ChannelID) (
700
        *OpenChannel, error) {
3✔
701

3✔
702
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
3✔
703
                error) {
6✔
704

3✔
705
                var (
3✔
706
                        targetChanPointBytes []byte
3✔
707
                        targetChanPoint      *wire.OutPoint
3✔
708

3✔
709
                        // errChanFound is used to signal that the channel has
3✔
710
                        // been found so that iteration through the DB buckets
3✔
711
                        // can stop.
3✔
712
                        errChanFound = errors.New("channel found")
3✔
713
                )
3✔
714
                err := chainBkt.ForEach(func(k, _ []byte) error {
6✔
715
                        var outPoint wire.OutPoint
3✔
716
                        err := graphdb.ReadOutpoint(
3✔
717
                                bytes.NewReader(k), &outPoint,
3✔
718
                        )
3✔
719
                        if err != nil {
3✔
720
                                return err
×
721
                        }
×
722

723
                        chanID := lnwire.NewChanIDFromOutPoint(outPoint)
3✔
724
                        if chanID != id {
3✔
UNCOV
725
                                return nil
×
UNCOV
726
                        }
×
727

728
                        targetChanPoint = &outPoint
3✔
729
                        targetChanPointBytes = k
3✔
730

3✔
731
                        return errChanFound
3✔
732
                })
733
                if err != nil && !errors.Is(err, errChanFound) {
3✔
734
                        return nil, nil, err
×
735
                }
×
736
                if targetChanPoint == nil {
3✔
UNCOV
737
                        return nil, nil, ErrChannelNotFound
×
UNCOV
738
                }
×
739

740
                return targetChanPointBytes, targetChanPoint, nil
3✔
741
        }
742

743
        return c.channelScanner(tx, selector)
3✔
744
}
745

746
// ChanCount summarizes a single peer's channels. The server consults it when
// making access control decisions.
type ChanCount struct {
	// HasOpenOrClosedChan is set once the peer has at least one channel,
	// open or historical, that is no longer pending.
	HasOpenOrClosedChan bool

	// PendingOpenCount is the number of the peer's channels that are still
	// pending open.
	PendingOpenCount uint64
}
751

752
// FetchPermAndTempPeers returns a map where the key is the remote node's
753
// public key and the value is a struct that has a tally of the pending-open
754
// channels and whether the peer has an open or closed channel with us.
755
func (c *ChannelStateDB) FetchPermAndTempPeers(
756
        chainHash []byte) (map[string]ChanCount, error) {
3✔
757

3✔
758
        peerChanInfo := make(map[string]ChanCount)
3✔
759

3✔
760
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
761
                openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
762
                if openChanBucket == nil {
3✔
763
                        return ErrNoChanDBExists
×
764
                }
×
765

766
                openChanErr := openChanBucket.ForEach(func(nodePub,
3✔
767
                        v []byte) error {
6✔
768

3✔
769
                        // If there is a value, this is not a bucket.
3✔
770
                        if v != nil {
3✔
771
                                return nil
×
772
                        }
×
773

774
                        nodeChanBucket := openChanBucket.NestedReadBucket(
3✔
775
                                nodePub,
3✔
776
                        )
3✔
777
                        if nodeChanBucket == nil {
3✔
778
                                return nil
×
779
                        }
×
780

781
                        chainBucket := nodeChanBucket.NestedReadBucket(
3✔
782
                                chainHash,
3✔
783
                        )
3✔
784
                        if chainBucket == nil {
3✔
785
                                return fmt.Errorf("no chain bucket exists")
×
786
                        }
×
787

788
                        var isPermPeer bool
3✔
789
                        var pendingOpenCount uint64
3✔
790

3✔
791
                        internalErr := chainBucket.ForEach(func(chanPoint,
3✔
792
                                val []byte) error {
6✔
793

3✔
794
                                // If there is a value, this is not a bucket.
3✔
795
                                if val != nil {
3✔
796
                                        return nil
×
797
                                }
×
798

799
                                chanBucket := chainBucket.NestedReadBucket(
3✔
800
                                        chanPoint,
3✔
801
                                )
3✔
802
                                if chanBucket == nil {
3✔
803
                                        return nil
×
804
                                }
×
805

806
                                var op wire.OutPoint
3✔
807
                                readErr := graphdb.ReadOutpoint(
3✔
808
                                        bytes.NewReader(chanPoint), &op,
3✔
809
                                )
3✔
810
                                if readErr != nil {
3✔
811
                                        return readErr
×
812
                                }
×
813

814
                                // We need to go through each channel and look
815
                                // at the IsPending status.
816
                                openChan, err := fetchOpenChannel(
3✔
817
                                        chanBucket, &op,
3✔
818
                                )
3✔
819
                                if err != nil {
3✔
820
                                        return err
×
821
                                }
×
822

823
                                if openChan.IsPending {
6✔
824
                                        // Add to the pending-open count since
3✔
825
                                        // this is a temp peer.
3✔
826
                                        pendingOpenCount++
3✔
827
                                        return nil
3✔
828
                                }
3✔
829

830
                                // Since IsPending is false, this is a perm
831
                                // peer.
832
                                isPermPeer = true
3✔
833

3✔
834
                                return nil
3✔
835
                        })
836
                        if internalErr != nil {
3✔
837
                                return internalErr
×
838
                        }
×
839

840
                        peerCount := ChanCount{
3✔
841
                                HasOpenOrClosedChan: isPermPeer,
3✔
842
                                PendingOpenCount:    pendingOpenCount,
3✔
843
                        }
3✔
844
                        peerChanInfo[string(nodePub)] = peerCount
3✔
845

3✔
846
                        return nil
3✔
847
                })
848
                if openChanErr != nil {
3✔
849
                        return openChanErr
×
850
                }
×
851

852
                // Now check the closed channel bucket.
853
                historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
3✔
854
                if historicalChanBucket == nil {
3✔
855
                        return ErrNoHistoricalBucket
×
856
                }
×
857

858
                historicalErr := historicalChanBucket.ForEach(func(chanPoint,
3✔
859
                        v []byte) error {
6✔
860
                        // Parse each nested bucket and the chanInfoKey to get
3✔
861
                        // the IsPending bool. This determines whether the
3✔
862
                        // peer is protected or not.
3✔
863
                        if v != nil {
3✔
864
                                // This is not a bucket. This is currently not
×
865
                                // possible.
×
866
                                return nil
×
867
                        }
×
868

869
                        chanBucket := historicalChanBucket.NestedReadBucket(
3✔
870
                                chanPoint,
3✔
871
                        )
3✔
872
                        if chanBucket == nil {
3✔
873
                                // This is not possible.
×
874
                                return fmt.Errorf("no historical channel " +
×
875
                                        "bucket exists")
×
876
                        }
×
877

878
                        var op wire.OutPoint
3✔
879
                        readErr := graphdb.ReadOutpoint(
3✔
880
                                bytes.NewReader(chanPoint), &op,
3✔
881
                        )
3✔
882
                        if readErr != nil {
3✔
883
                                return readErr
×
884
                        }
×
885

886
                        // This channel is closed, but the structure of the
887
                        // historical bucket is the same. This is by design,
888
                        // which means we can call fetchOpenChannel.
889
                        channel, fetchErr := fetchOpenChannel(chanBucket, &op)
3✔
890
                        if fetchErr != nil {
3✔
891
                                return fetchErr
×
892
                        }
×
893

894
                        // Only include this peer in the protected class if
895
                        // the closing transaction confirmed. Note that
896
                        // CloseChannel can be called in the funding manager
897
                        // while IsPending is true which is why we need this
898
                        // special-casing to not count premature funding
899
                        // manager calls to CloseChannel.
900
                        if !channel.IsPending {
6✔
901
                                // Fetch the public key of the remote node. We
3✔
902
                                // need to use the string-ified serialized,
3✔
903
                                // compressed bytes as the key.
3✔
904
                                remotePub := channel.IdentityPub
3✔
905
                                remoteSer := remotePub.SerializeCompressed()
3✔
906
                                remoteKey := string(remoteSer)
3✔
907

3✔
908
                                count, exists := peerChanInfo[remoteKey]
3✔
909
                                if exists {
6✔
910
                                        count.HasOpenOrClosedChan = true
3✔
911
                                        peerChanInfo[remoteKey] = count
3✔
912
                                } else {
3✔
913
                                        peerCount := ChanCount{
×
914
                                                HasOpenOrClosedChan: true,
×
915
                                        }
×
916
                                        peerChanInfo[remoteKey] = peerCount
×
917
                                }
×
918
                        }
919

920
                        return nil
3✔
921
                })
922
                if historicalErr != nil {
3✔
923
                        return historicalErr
×
924
                }
×
925

926
                return nil
3✔
927
        }, func() {
3✔
928
                clear(peerChanInfo)
3✔
929
        })
3✔
930

931
        return peerChanInfo, err
3✔
932
}
933

934
// channelSelector describes a function that takes a chain-hash bucket from
935
// within the open-channel DB and returns the wanted channel point bytes, and
936
// channel point. It must return the ErrChannelNotFound error if the wanted
937
// channel is not in the given bucket.
938
type channelSelector func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
939
        error)
940

941
// channelScanner will traverse the DB to each chain-hash bucket of each node
942
// pub-key bucket in the open-channel-bucket. The chanSelector will then be used
943
// to fetch the wanted channel outpoint from the chain bucket.
944
func (c *ChannelStateDB) channelScanner(tx kvdb.RTx,
945
        chanSelect channelSelector) (*OpenChannel, error) {
3✔
946

3✔
947
        var (
3✔
948
                targetChan *OpenChannel
3✔
949

3✔
950
                // errChanFound is used to signal that the channel has been
3✔
951
                // found so that iteration through the DB buckets can stop.
3✔
952
                errChanFound = errors.New("channel found")
3✔
953
        )
3✔
954

3✔
955
        // chanScan will traverse the following bucket structure:
3✔
956
        //  * nodePub => chainHash => chanPoint
3✔
957
        //
3✔
958
        // At each level we go one further, ensuring that we're traversing the
3✔
959
        // proper key (that's actually a bucket). By only reading the bucket
3✔
960
        // structure and skipping fully decoding each channel, we save a good
3✔
961
        // bit of CPU as we don't need to do things like decompress public
3✔
962
        // keys.
3✔
963
        chanScan := func(tx kvdb.RTx) error {
6✔
964
                // Get the bucket dedicated to storing the metadata for open
3✔
965
                // channels.
3✔
966
                openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
967
                if openChanBucket == nil {
3✔
968
                        return ErrNoActiveChannels
×
969
                }
×
970

971
                // Within the node channel bucket, are the set of node pubkeys
972
                // we have channels with, we don't know the entire set, so we'll
973
                // check them all.
974
                return openChanBucket.ForEach(func(nodePub, v []byte) error {
6✔
975
                        // Ensure that this is a key the same size as a pubkey,
3✔
976
                        // and also that it leads directly to a bucket.
3✔
977
                        if len(nodePub) != 33 || v != nil {
3✔
978
                                return nil
×
979
                        }
×
980

981
                        nodeChanBucket := openChanBucket.NestedReadBucket(
3✔
982
                                nodePub,
3✔
983
                        )
3✔
984
                        if nodeChanBucket == nil {
3✔
985
                                return nil
×
986
                        }
×
987

988
                        // The next layer down is all the chains that this node
989
                        // has channels on with us.
990
                        return nodeChanBucket.ForEach(func(chainHash,
3✔
991
                                v []byte) error {
6✔
992

3✔
993
                                // If there's a value, it's not a bucket so
3✔
994
                                // ignore it.
3✔
995
                                if v != nil {
3✔
996
                                        return nil
×
997
                                }
×
998

999
                                chainBucket := nodeChanBucket.NestedReadBucket(
3✔
1000
                                        chainHash,
3✔
1001
                                )
3✔
1002
                                if chainBucket == nil {
3✔
1003
                                        return fmt.Errorf("unable to read "+
×
1004
                                                "bucket for chain=%x",
×
1005
                                                chainHash)
×
1006
                                }
×
1007

1008
                                // Finally, we reach the leaf bucket that stores
1009
                                // all the chanPoints for this node.
1010
                                targetChanBytes, chanPoint, err := chanSelect(
3✔
1011
                                        chainBucket,
3✔
1012
                                )
3✔
1013
                                if errors.Is(err, ErrChannelNotFound) {
3✔
UNCOV
1014
                                        return nil
×
1015
                                } else if err != nil {
3✔
1016
                                        return err
×
1017
                                }
×
1018

1019
                                chanBucket := chainBucket.NestedReadBucket(
3✔
1020
                                        targetChanBytes,
3✔
1021
                                )
3✔
1022
                                if chanBucket == nil {
6✔
1023
                                        return nil
3✔
1024
                                }
3✔
1025

1026
                                channel, err := fetchOpenChannel(
3✔
1027
                                        chanBucket, chanPoint,
3✔
1028
                                )
3✔
1029
                                if err != nil {
3✔
1030
                                        return err
×
1031
                                }
×
1032

1033
                                targetChan = channel
3✔
1034
                                targetChan.Db = c
3✔
1035

3✔
1036
                                return errChanFound
3✔
1037
                        })
1038
                })
1039
        }
1040

1041
        var err error
3✔
1042
        if tx == nil {
6✔
1043
                err = kvdb.View(c.backend, chanScan, func() {})
6✔
1044
        } else {
×
1045
                err = chanScan(tx)
×
1046
        }
×
1047
        if err != nil && !errors.Is(err, errChanFound) {
3✔
1048
                return nil, err
×
1049
        }
×
1050

1051
        if targetChan != nil {
6✔
1052
                return targetChan, nil
3✔
1053
        }
3✔
1054

1055
        // If we can't find the channel, then we return with an error, as we
1056
        // have nothing to back up.
1057
        return nil, ErrChannelNotFound
3✔
1058
}
1059

1060
// FetchAllChannels attempts to retrieve all open channels currently stored
1061
// within the database, including pending open, fully open and channels waiting
1062
// for a closing transaction to confirm.
1063
func (c *ChannelStateDB) FetchAllChannels() ([]*OpenChannel, error) {
3✔
1064
        return fetchChannels(c)
3✔
1065
}
3✔
1066

1067
// FetchAllOpenChannels will return all channels that have the funding
1068
// transaction confirmed, and is not waiting for a closing transaction to be
1069
// confirmed.
1070
func (c *ChannelStateDB) FetchAllOpenChannels() ([]*OpenChannel, error) {
3✔
1071
        return fetchChannels(
3✔
1072
                c,
3✔
1073
                pendingChannelFilter(false),
3✔
1074
                waitingCloseFilter(false),
3✔
1075
        )
3✔
1076
}
3✔
1077

1078
// FetchPendingChannels will return channels that have completed the process of
1079
// generating and broadcasting funding transactions, but whose funding
1080
// transactions have yet to be confirmed on the blockchain.
1081
func (c *ChannelStateDB) FetchPendingChannels() ([]*OpenChannel, error) {
3✔
1082
        return fetchChannels(c,
3✔
1083
                pendingChannelFilter(true),
3✔
1084
                waitingCloseFilter(false),
3✔
1085
        )
3✔
1086
}
3✔
1087

1088
// FetchWaitingCloseChannels will return all channels that have been opened,
1089
// but are now waiting for a closing transaction to be confirmed.
1090
//
1091
// NOTE: This includes channels that are also pending to be opened.
1092
func (c *ChannelStateDB) FetchWaitingCloseChannels() ([]*OpenChannel, error) {
3✔
1093
        return fetchChannels(
3✔
1094
                c, waitingCloseFilter(true),
3✔
1095
        )
3✔
1096
}
3✔
1097

1098
// fetchChannelsFilter applies a filter to channels retrieved in fetchchannels.
1099
// A set of filters can be combined to filter across multiple dimensions.
1100
type fetchChannelsFilter func(channel *OpenChannel) bool
1101

1102
// pendingChannelFilter returns a filter based on whether channels are pending
1103
// (ie, their funding transaction still needs to confirm). If pending is false,
1104
// channels with confirmed funding transactions are returned.
1105
func pendingChannelFilter(pending bool) fetchChannelsFilter {
3✔
1106
        return func(channel *OpenChannel) bool {
6✔
1107
                return channel.IsPending == pending
3✔
1108
        }
3✔
1109
}
1110

1111
// waitingCloseFilter returns a filter which filters channels based on whether
1112
// they are awaiting the confirmation of their closing transaction. If waiting
1113
// close is true, channels that have had their closing tx broadcast are
1114
// included. If it is false, channels that are not awaiting confirmation of
1115
// their close transaction are returned.
1116
func waitingCloseFilter(waitingClose bool) fetchChannelsFilter {
3✔
1117
        return func(channel *OpenChannel) bool {
6✔
1118
                // If the channel is in any other state than Default,
3✔
1119
                // then it means it is waiting to be closed.
3✔
1120
                channelWaitingClose :=
3✔
1121
                        channel.ChanStatus() != ChanStatusDefault
3✔
1122

3✔
1123
                // Include the channel if it matches the value for
3✔
1124
                // waiting close that we are filtering on.
3✔
1125
                return channelWaitingClose == waitingClose
3✔
1126
        }
3✔
1127
}
1128

1129
// fetchChannels attempts to retrieve channels currently stored in the
1130
// database. It takes a set of filters which are applied to each channel to
1131
// obtain a set of channels with the desired set of properties. Only channels
1132
// which have a true value returned for *all* of the filters will be returned.
1133
// If no filters are provided, every channel in the open channels bucket will
1134
// be returned.
1135
func fetchChannels(c *ChannelStateDB, filters ...fetchChannelsFilter) (
1136
        []*OpenChannel, error) {
3✔
1137

3✔
1138
        var channels []*OpenChannel
3✔
1139

3✔
1140
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1141
                // Get the bucket dedicated to storing the metadata for open
3✔
1142
                // channels.
3✔
1143
                openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
1144
                if openChanBucket == nil {
3✔
1145
                        return ErrNoActiveChannels
×
1146
                }
×
1147

1148
                // Next, fetch the bucket dedicated to storing metadata related
1149
                // to all nodes. All keys within this bucket are the serialized
1150
                // public keys of all our direct counterparties.
1151
                nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
3✔
1152
                if nodeMetaBucket == nil {
3✔
1153
                        return fmt.Errorf("node bucket not created")
×
1154
                }
×
1155

1156
                // Finally for each node public key in the bucket, fetch all
1157
                // the channels related to this particular node.
1158
                return nodeMetaBucket.ForEach(func(k, v []byte) error {
6✔
1159
                        nodeChanBucket := openChanBucket.NestedReadBucket(k)
3✔
1160
                        if nodeChanBucket == nil {
3✔
1161
                                return nil
×
1162
                        }
×
1163

1164
                        return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
6✔
1165
                                // If there's a value, it's not a bucket so
3✔
1166
                                // ignore it.
3✔
1167
                                if v != nil {
3✔
1168
                                        return nil
×
1169
                                }
×
1170

1171
                                // If we've found a valid chainhash bucket,
1172
                                // then we'll retrieve that so we can extract
1173
                                // all the channels.
1174
                                chainBucket := nodeChanBucket.NestedReadBucket(
3✔
1175
                                        chainHash,
3✔
1176
                                )
3✔
1177
                                if chainBucket == nil {
3✔
1178
                                        return fmt.Errorf("unable to read "+
×
1179
                                                "bucket for chain=%x", chainHash[:])
×
1180
                                }
×
1181

1182
                                nodeChans, err := c.fetchNodeChannels(chainBucket)
3✔
1183
                                if err != nil {
3✔
1184
                                        return fmt.Errorf("unable to read "+
×
1185
                                                "channel for chain_hash=%x, "+
×
1186
                                                "node_key=%x: %v", chainHash[:], k, err)
×
1187
                                }
×
1188
                                for _, channel := range nodeChans {
6✔
1189
                                        // includeChannel indicates whether the channel
3✔
1190
                                        // meets the criteria specified by our filters.
3✔
1191
                                        includeChannel := true
3✔
1192

3✔
1193
                                        // Run through each filter and check whether the
3✔
1194
                                        // channel should be included.
3✔
1195
                                        for _, f := range filters {
6✔
1196
                                                // If the channel fails the filter, set
3✔
1197
                                                // includeChannel to false and don't bother
3✔
1198
                                                // checking the remaining filters.
3✔
1199
                                                if !f(channel) {
6✔
1200
                                                        includeChannel = false
3✔
1201
                                                        break
3✔
1202
                                                }
1203
                                        }
1204

1205
                                        // If the channel passed every filter, include it in
1206
                                        // our set of channels.
1207
                                        if includeChannel {
6✔
1208
                                                channels = append(channels, channel)
3✔
1209
                                        }
3✔
1210
                                }
1211
                                return nil
3✔
1212
                        })
1213

1214
                })
1215
        }, func() {
3✔
1216
                channels = nil
3✔
1217
        })
3✔
1218
        if err != nil {
3✔
1219
                return nil, err
×
1220
        }
×
1221

1222
        return channels, nil
3✔
1223
}
1224

1225
// FetchClosedChannels attempts to fetch all closed channels from the database.
1226
// The pendingOnly bool toggles if channels that aren't yet fully closed should
1227
// be returned in the response or not. When a channel was cooperatively closed,
1228
// it becomes fully closed after a single confirmation.  When a channel was
1229
// forcibly closed, it will become fully closed after _all_ the pending funds
1230
// (if any) have been swept.
1231
func (c *ChannelStateDB) FetchClosedChannels(pendingOnly bool) (
1232
        []*ChannelCloseSummary, error) {
3✔
1233

3✔
1234
        var chanSummaries []*ChannelCloseSummary
3✔
1235

3✔
1236
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1237
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1238
                if closeBucket == nil {
3✔
1239
                        return ErrNoClosedChannels
×
1240
                }
×
1241

1242
                return closeBucket.ForEach(func(chanID []byte, summaryBytes []byte) error {
6✔
1243
                        summaryReader := bytes.NewReader(summaryBytes)
3✔
1244
                        chanSummary, err := deserializeCloseChannelSummary(summaryReader)
3✔
1245
                        if err != nil {
3✔
1246
                                return err
×
1247
                        }
×
1248

1249
                        // If the query specified to only include pending
1250
                        // channels, then we'll skip any channels which aren't
1251
                        // currently pending.
1252
                        if !chanSummary.IsPending && pendingOnly {
6✔
1253
                                return nil
3✔
1254
                        }
3✔
1255

1256
                        chanSummaries = append(chanSummaries, chanSummary)
3✔
1257
                        return nil
3✔
1258
                })
1259
        }, func() {
3✔
1260
                chanSummaries = nil
3✔
1261
        }); err != nil {
3✔
1262
                return nil, err
×
1263
        }
×
1264

1265
        return chanSummaries, nil
3✔
1266
}
1267

1268
// ErrClosedChannelNotFound is returned when the close summary of a channel
// cannot be found in the channeldb.
var ErrClosedChannelNotFound = errors.New(
	"unable to find closed channel summary",
)
1271

1272
// FetchClosedChannel queries for a channel close summary using the channel
1273
// point of the channel in question.
1274
func (c *ChannelStateDB) FetchClosedChannel(chanID *wire.OutPoint) (
1275
        *ChannelCloseSummary, error) {
3✔
1276

3✔
1277
        var chanSummary *ChannelCloseSummary
3✔
1278
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1279
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1280
                if closeBucket == nil {
3✔
1281
                        return ErrClosedChannelNotFound
×
1282
                }
×
1283

1284
                var b bytes.Buffer
3✔
1285
                var err error
3✔
1286
                if err = graphdb.WriteOutpoint(&b, chanID); err != nil {
3✔
1287
                        return err
×
1288
                }
×
1289

1290
                summaryBytes := closeBucket.Get(b.Bytes())
3✔
1291
                if summaryBytes == nil {
3✔
UNCOV
1292
                        return ErrClosedChannelNotFound
×
UNCOV
1293
                }
×
1294

1295
                summaryReader := bytes.NewReader(summaryBytes)
3✔
1296
                chanSummary, err = deserializeCloseChannelSummary(summaryReader)
3✔
1297

3✔
1298
                return err
3✔
1299
        }, func() {
3✔
1300
                chanSummary = nil
3✔
1301
        }); err != nil {
3✔
UNCOV
1302
                return nil, err
×
UNCOV
1303
        }
×
1304

1305
        return chanSummary, nil
3✔
1306
}
1307

1308
// FetchClosedChannelForID queries for a channel close summary using the
1309
// channel ID of the channel in question.
1310
func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) (
1311
        *ChannelCloseSummary, error) {
3✔
1312

3✔
1313
        var chanSummary *ChannelCloseSummary
3✔
1314
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1315
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1316
                if closeBucket == nil {
3✔
1317
                        return ErrClosedChannelNotFound
×
1318
                }
×
1319

1320
                // The first 30 bytes of the channel ID and outpoint will be
1321
                // equal.
1322
                cursor := closeBucket.ReadCursor()
3✔
1323
                op, c := cursor.Seek(cid[:30])
3✔
1324

3✔
1325
                // We scan over all possible candidates for this channel ID.
3✔
1326
                for ; op != nil && bytes.Compare(cid[:30], op[:30]) <= 0; op, c = cursor.Next() {
6✔
1327
                        var outPoint wire.OutPoint
3✔
1328
                        err := graphdb.ReadOutpoint(
3✔
1329
                                bytes.NewReader(op), &outPoint,
3✔
1330
                        )
3✔
1331
                        if err != nil {
3✔
1332
                                return err
×
1333
                        }
×
1334

1335
                        // If the found outpoint does not correspond to this
1336
                        // channel ID, we continue.
1337
                        if !cid.IsChanPoint(&outPoint) {
3✔
UNCOV
1338
                                continue
×
1339
                        }
1340

1341
                        // Deserialize the close summary and return.
1342
                        r := bytes.NewReader(c)
3✔
1343
                        chanSummary, err = deserializeCloseChannelSummary(r)
3✔
1344
                        if err != nil {
3✔
1345
                                return err
×
1346
                        }
×
1347

1348
                        return nil
3✔
1349
                }
1350
                return ErrClosedChannelNotFound
3✔
1351
        }, func() {
3✔
1352
                chanSummary = nil
3✔
1353
        }); err != nil {
6✔
1354
                return nil, err
3✔
1355
        }
3✔
1356

1357
        return chanSummary, nil
3✔
1358
}
1359

1360
// MarkChanFullyClosed marks a channel as fully closed within the database. A
1361
// channel should be marked as fully closed if the channel was initially
1362
// cooperatively closed and it's reached a single confirmation, or after all
1363
// the pending funds in a channel that has been forcibly closed have been
1364
// swept.
1365
func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
3✔
1366
        var (
3✔
1367
                openChannels  []*OpenChannel
3✔
1368
                pruneLinkNode *btcec.PublicKey
3✔
1369
        )
3✔
1370
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1371
                var b bytes.Buffer
3✔
1372
                if err := graphdb.WriteOutpoint(&b, chanPoint); err != nil {
3✔
1373
                        return err
×
1374
                }
×
1375

1376
                chanID := b.Bytes()
3✔
1377

3✔
1378
                closedChanBucket, err := tx.CreateTopLevelBucket(
3✔
1379
                        closedChannelBucket,
3✔
1380
                )
3✔
1381
                if err != nil {
3✔
1382
                        return err
×
1383
                }
×
1384

1385
                chanSummaryBytes := closedChanBucket.Get(chanID)
3✔
1386
                if chanSummaryBytes == nil {
3✔
1387
                        return fmt.Errorf("no closed channel for "+
×
1388
                                "chan_point=%v found", chanPoint)
×
1389
                }
×
1390

1391
                chanSummaryReader := bytes.NewReader(chanSummaryBytes)
3✔
1392
                chanSummary, err := deserializeCloseChannelSummary(
3✔
1393
                        chanSummaryReader,
3✔
1394
                )
3✔
1395
                if err != nil {
3✔
1396
                        return err
×
1397
                }
×
1398

1399
                chanSummary.IsPending = false
3✔
1400

3✔
1401
                var newSummary bytes.Buffer
3✔
1402
                err = serializeChannelCloseSummary(&newSummary, chanSummary)
3✔
1403
                if err != nil {
3✔
1404
                        return err
×
1405
                }
×
1406

1407
                err = closedChanBucket.Put(chanID, newSummary.Bytes())
3✔
1408
                if err != nil {
3✔
1409
                        return err
×
1410
                }
×
1411

1412
                // Now that the channel is closed, we'll check if we have any
1413
                // other open channels with this peer. If we don't we'll
1414
                // garbage collect it to ensure we don't establish persistent
1415
                // connections to peers without open channels.
1416
                pruneLinkNode = chanSummary.RemotePub
3✔
1417
                openChannels, err = c.fetchOpenChannels(
3✔
1418
                        tx, pruneLinkNode,
3✔
1419
                )
3✔
1420
                if err != nil {
3✔
1421
                        return fmt.Errorf("unable to fetch open channels for "+
×
1422
                                "peer %x: %v",
×
1423
                                pruneLinkNode.SerializeCompressed(), err)
×
1424
                }
×
1425

1426
                return nil
3✔
1427
        }, func() {
3✔
1428
                openChannels = nil
3✔
1429
                pruneLinkNode = nil
3✔
1430
        })
3✔
1431
        if err != nil {
3✔
1432
                return err
×
1433
        }
×
1434

1435
        // Decide whether we want to remove the link node, based upon the number
1436
        // of still open channels.
1437
        return c.pruneLinkNode(openChannels, pruneLinkNode)
3✔
1438
}
1439

1440
// pruneLinkNode determines whether we should garbage collect a link node from
1441
// the database due to no longer having any open channels with it. If there are
1442
// any left, then this acts as a no-op.
1443
func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
1444
        remotePub *btcec.PublicKey) error {
3✔
1445

3✔
1446
        if len(openChannels) > 0 {
6✔
1447
                return nil
3✔
1448
        }
3✔
1449

1450
        log.Infof("Pruning link node %x with zero open channels from database",
3✔
1451
                remotePub.SerializeCompressed())
3✔
1452

3✔
1453
        return c.linkNodeDB.DeleteLinkNode(remotePub)
3✔
1454
}
1455

1456
// PruneLinkNodes attempts to prune all link nodes found within the database
1457
// with whom we no longer have any open channels with.
1458
func (c *ChannelStateDB) PruneLinkNodes() error {
3✔
1459
        allLinkNodes, err := c.linkNodeDB.FetchAllLinkNodes()
3✔
1460
        if err != nil {
3✔
1461
                return err
×
1462
        }
×
1463

1464
        for _, linkNode := range allLinkNodes {
6✔
1465
                var (
3✔
1466
                        openChannels []*OpenChannel
3✔
1467
                        linkNode     = linkNode
3✔
1468
                )
3✔
1469
                err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1470
                        var err error
3✔
1471
                        openChannels, err = c.fetchOpenChannels(
3✔
1472
                                tx, linkNode.IdentityPub,
3✔
1473
                        )
3✔
1474
                        return err
3✔
1475
                }, func() {
6✔
1476
                        openChannels = nil
3✔
1477
                })
3✔
1478
                if err != nil {
3✔
1479
                        return err
×
1480
                }
×
1481

1482
                err = c.pruneLinkNode(openChannels, linkNode.IdentityPub)
3✔
1483
                if err != nil {
3✔
1484
                        return err
×
1485
                }
×
1486
        }
1487

1488
        return nil
3✔
1489
}
1490

1491
// ChannelShell is a shell of a channel that is meant to be used for channel
1492
// recovery purposes. It contains a minimal OpenChannel instance along with
1493
// addresses for that target node.
1494
type ChannelShell struct {
1495
        // NodeAddrs the set of addresses that this node has known to be
1496
        // reachable at in the past.
1497
        NodeAddrs []net.Addr
1498

1499
        // Chan is a shell of an OpenChannel, it contains only the items
1500
        // required to restore the channel on disk.
1501
        Chan *OpenChannel
1502
}
1503

1504
// RestoreChannelShells is a method that allows the caller to reconstruct the
1505
// state of an OpenChannel from the ChannelShell. We'll attempt to write the
1506
// new channel to disk, create a LinkNode instance with the passed node
1507
// addresses, and finally create an edge within the graph for the channel as
1508
// well. This method is idempotent, so repeated calls with the same set of
1509
// channel shells won't modify the database after the initial call.
1510
func (c *ChannelStateDB) RestoreChannelShells(channelShells ...*ChannelShell) error {
3✔
1511
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1512
                for _, channelShell := range channelShells {
6✔
1513
                        channel := channelShell.Chan
3✔
1514

3✔
1515
                        // When we make a channel, we mark that the channel has
3✔
1516
                        // been restored, this will signal to other sub-systems
3✔
1517
                        // to not attempt to use the channel as if it was a
3✔
1518
                        // regular one.
3✔
1519
                        channel.chanStatus |= ChanStatusRestored
3✔
1520

3✔
1521
                        // First, we'll attempt to create a new open channel
3✔
1522
                        // and link node for this channel. If the channel
3✔
1523
                        // already exists, then in order to ensure this method
3✔
1524
                        // is idempotent, we'll continue to the next step.
3✔
1525
                        channel.Db = c
3✔
1526
                        err := syncNewChannel(
3✔
1527
                                tx, channel, channelShell.NodeAddrs,
3✔
1528
                        )
3✔
1529
                        if err != nil {
6✔
1530
                                return err
3✔
1531
                        }
3✔
1532
                }
1533

1534
                return nil
3✔
1535
        }, func() {})
3✔
1536
        if err != nil {
6✔
1537
                return err
3✔
1538
        }
3✔
1539

1540
        return nil
3✔
1541
}
1542

1543
// AddrsForNode consults the channel database for all addresses known to the
1544
// passed node public key. The returned boolean indicates if the given node is
1545
// unknown to the channel DB or not.
1546
//
1547
// NOTE: this is part of the AddrSource interface.
1548
func (d *DB) AddrsForNode(_ context.Context, nodePub *btcec.PublicKey) (bool,
1549
        []net.Addr, error) {
3✔
1550

3✔
1551
        linkNode, err := d.channelStateDB.linkNodeDB.FetchLinkNode(nodePub)
3✔
1552
        // Only if the error is something other than ErrNodeNotFound do we
3✔
1553
        // return it.
3✔
1554
        switch {
3✔
1555
        case err != nil && !errors.Is(err, ErrNodeNotFound):
×
1556
                return false, nil, err
×
1557

1558
        case errors.Is(err, ErrNodeNotFound):
×
1559
                return false, nil, nil
×
1560
        }
1561

1562
        return true, linkNode.Addresses, nil
3✔
1563
}
1564

1565
// AbandonChannel attempts to remove the target channel from the open channel
1566
// database. If the channel was already removed (has a closed channel entry),
1567
// then we'll return a nil error. Otherwise, we'll insert a new close summary
1568
// into the database.
1569
func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint,
1570
        bestHeight uint32) error {
3✔
1571

3✔
1572
        // With the chanPoint constructed, we'll attempt to find the target
3✔
1573
        // channel in the database. If we can't find the channel, then we'll
3✔
1574
        // return the error back to the caller.
3✔
1575
        dbChan, err := c.FetchChannel(*chanPoint)
3✔
1576
        switch {
3✔
1577
        // If the channel wasn't found, then it's possible that it was already
1578
        // abandoned from the database.
1579
        case err == ErrChannelNotFound:
3✔
1580
                _, closedErr := c.FetchClosedChannel(chanPoint)
3✔
1581
                if closedErr != nil {
3✔
UNCOV
1582
                        return closedErr
×
UNCOV
1583
                }
×
1584

1585
                // If the channel was already closed, then we don't return an
1586
                // error as we'd like this step to be repeatable.
1587
                return nil
3✔
1588
        case err != nil:
×
1589
                return err
×
1590
        }
1591

1592
        // Now that we've found the channel, we'll populate a close summary for
1593
        // the channel, so we can store as much information for this abounded
1594
        // channel as possible. We also ensure that we set Pending to false, to
1595
        // indicate that this channel has been "fully" closed.
1596
        summary := &ChannelCloseSummary{
3✔
1597
                CloseType:               Abandoned,
3✔
1598
                ChanPoint:               *chanPoint,
3✔
1599
                ChainHash:               dbChan.ChainHash,
3✔
1600
                CloseHeight:             bestHeight,
3✔
1601
                RemotePub:               dbChan.IdentityPub,
3✔
1602
                Capacity:                dbChan.Capacity,
3✔
1603
                SettledBalance:          dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
3✔
1604
                ShortChanID:             dbChan.ShortChanID(),
3✔
1605
                RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
3✔
1606
                RemoteNextRevocation:    dbChan.RemoteNextRevocation,
3✔
1607
                LocalChanConfig:         dbChan.LocalChanCfg,
3✔
1608
        }
3✔
1609

3✔
1610
        // Finally, we'll close the channel in the DB, and return back to the
3✔
1611
        // caller. We set ourselves as the close initiator because we abandoned
3✔
1612
        // the channel.
3✔
1613
        return dbChan.CloseChannel(summary, ChanStatusLocalCloseInitiator)
3✔
1614
}
1615

1616
// SaveChannelOpeningState saves the serialized channel state for the provided
1617
// chanPoint to the channelOpeningStateBucket.
1618
func (c *ChannelStateDB) SaveChannelOpeningState(outPoint,
1619
        serializedState []byte) error {
3✔
1620

3✔
1621
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1622
                bucket, err := tx.CreateTopLevelBucket(channelOpeningStateBucket)
3✔
1623
                if err != nil {
3✔
1624
                        return err
×
1625
                }
×
1626

1627
                return bucket.Put(outPoint, serializedState)
3✔
1628
        }, func() {})
3✔
1629
}
1630

1631
// GetChannelOpeningState fetches the serialized channel state for the provided
1632
// outPoint from the database, or returns ErrChannelNotFound if the channel
1633
// is not found.
1634
func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte,
1635
        error) {
3✔
1636

3✔
1637
        var serializedState []byte
3✔
1638
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1639
                bucket := tx.ReadBucket(channelOpeningStateBucket)
3✔
1640
                if bucket == nil {
6✔
1641
                        // If the bucket does not exist, it means we never added
3✔
1642
                        //  a channel to the db, so return ErrChannelNotFound.
3✔
1643
                        return ErrChannelNotFound
3✔
1644
                }
3✔
1645

1646
                stateBytes := bucket.Get(outPoint)
3✔
1647
                if stateBytes == nil {
6✔
1648
                        return ErrChannelNotFound
3✔
1649
                }
3✔
1650

1651
                serializedState = append(serializedState, stateBytes...)
3✔
1652

3✔
1653
                return nil
3✔
1654
        }, func() {
3✔
1655
                serializedState = nil
3✔
1656
        })
3✔
1657
        return serializedState, err
3✔
1658
}
1659

1660
// DeleteChannelOpeningState removes any state for outPoint from the database.
1661
func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
3✔
1662
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1663
                bucket := tx.ReadWriteBucket(channelOpeningStateBucket)
3✔
1664
                if bucket == nil {
3✔
1665
                        return ErrChannelNotFound
×
1666
                }
×
1667

1668
                return bucket.Delete(outPoint)
3✔
1669
        }, func() {})
3✔
1670
}
1671

1672
// syncVersions function is used for safe db version synchronization. It
1673
// applies migration functions to the current database and recovers the
1674
// previous state of db if at least one error/panic appeared during migration.
1675
func (d *DB) syncVersions(versions []mandatoryVersion) error {
3✔
1676
        meta, err := d.FetchMeta()
3✔
1677
        if err != nil {
3✔
1678
                if err == ErrMetaNotFound {
×
1679
                        meta = &Meta{}
×
1680
                } else {
×
1681
                        return err
×
1682
                }
×
1683
        }
1684

1685
        latestVersion := getLatestDBVersion(versions)
3✔
1686
        log.Infof("Checking for schema update: latest_version=%v, "+
3✔
1687
                "db_version=%v", latestVersion, meta.DbVersionNumber)
3✔
1688

3✔
1689
        switch {
3✔
1690

1691
        // If the database reports a higher version that we are aware of, the
1692
        // user is probably trying to revert to a prior version of lnd. We fail
1693
        // here to prevent reversions and unintended corruption.
UNCOV
1694
        case meta.DbVersionNumber > latestVersion:
×
UNCOV
1695
                log.Errorf("Refusing to revert from db_version=%d to "+
×
UNCOV
1696
                        "lower version=%d", meta.DbVersionNumber,
×
UNCOV
1697
                        latestVersion)
×
UNCOV
1698
                return ErrDBReversion
×
1699

1700
        // If the current database version matches the latest version number,
1701
        // then we don't need to perform any migrations.
1702
        case meta.DbVersionNumber == latestVersion:
3✔
1703
                return nil
3✔
1704
        }
1705

UNCOV
1706
        log.Infof("Performing database schema migration")
×
UNCOV
1707

×
UNCOV
1708
        // Otherwise, we fetch the migrations which need to applied, and
×
UNCOV
1709
        // execute them serially within a single database transaction to ensure
×
UNCOV
1710
        // the migration is atomic.
×
UNCOV
1711
        migrations, migrationVersions := getMigrationsToApply(
×
UNCOV
1712
                versions, meta.DbVersionNumber,
×
UNCOV
1713
        )
×
UNCOV
1714
        return kvdb.Update(d, func(tx kvdb.RwTx) error {
×
UNCOV
1715
                for i, migration := range migrations {
×
UNCOV
1716
                        if migration == nil {
×
1717
                                continue
×
1718
                        }
1719

UNCOV
1720
                        log.Infof("Applying migration #%v",
×
UNCOV
1721
                                migrationVersions[i])
×
UNCOV
1722

×
UNCOV
1723
                        if err := migration(tx); err != nil {
×
UNCOV
1724
                                log.Infof("Unable to apply migration #%v",
×
UNCOV
1725
                                        migrationVersions[i])
×
UNCOV
1726
                                return err
×
UNCOV
1727
                        }
×
1728
                }
1729

UNCOV
1730
                meta.DbVersionNumber = latestVersion
×
UNCOV
1731
                err := putMeta(meta, tx)
×
UNCOV
1732
                if err != nil {
×
1733
                        return err
×
1734
                }
×
1735

1736
                // In dry-run mode, return an error to prevent the transaction
1737
                // from committing.
UNCOV
1738
                if d.dryRun {
×
UNCOV
1739
                        return ErrDryRunMigrationOK
×
UNCOV
1740
                }
×
1741

UNCOV
1742
                return nil
×
UNCOV
1743
        }, func() {})
×
1744
}
1745

1746
// applyOptionalVersions applies the optional migrations to the database if
1747
// specified in the config.
1748
func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error {
3✔
1749
        // TODO(yy): need to design the db to support dry run for optional
3✔
1750
        // migrations.
3✔
1751
        if d.dryRun {
3✔
UNCOV
1752
                log.Info("Skipped optional migrations as dry run mode is not " +
×
UNCOV
1753
                        "supported yet")
×
UNCOV
1754
                return nil
×
UNCOV
1755
        }
×
1756

1757
        om, err := d.fetchOptionalMeta()
3✔
1758
        if err != nil {
3✔
1759
                if err == ErrMetaNotFound {
×
1760
                        om = &OptionalMeta{
×
1761
                                Versions: make(map[uint64]string),
×
1762
                        }
×
1763
                } else {
×
1764
                        return fmt.Errorf("unable to fetch optional "+
×
1765
                                "meta: %w", err)
×
1766
                }
×
1767
        }
1768

1769
        // migrationCfg is the parent configuration which implements the config
1770
        // interfaces of all the single optional migrations.
1771
        migrationCfg := &MigrationConfigImpl{
3✔
1772
                migration30.MigrateRevLogConfigImpl{
3✔
1773
                        NoAmountData: d.noRevLogAmtData,
3✔
1774
                },
3✔
1775
                migration34.MigrationConfigImpl{
3✔
1776
                        DecayedLog: cfg.DecayedLog,
3✔
1777
                },
3✔
1778
        }
3✔
1779

3✔
1780
        log.Infof("Applying %d optional migrations", len(optionalVersions))
3✔
1781

3✔
1782
        // Apply the optional migrations if requested.
3✔
1783
        for number, version := range optionalVersions {
6✔
1784
                log.Infof("Checking for optional update: name=%v", version.name)
3✔
1785

3✔
1786
                // Exit early if the optional migration is not specified.
3✔
1787
                if !cfg.MigrationFlags[number] {
6✔
1788
                        log.Debugf("Skipping optional migration: name=%s as "+
3✔
1789
                                "it is not specified in the config",
3✔
1790
                                version.name)
3✔
1791

3✔
1792
                        continue
3✔
1793
                }
1794

1795
                // Exit early if the optional migration has already been
1796
                // applied.
1797
                if _, ok := om.Versions[uint64(number)]; ok {
6✔
1798
                        log.Debugf("Skipping optional migration: name=%s as "+
3✔
1799
                                "it has already been applied", version.name)
3✔
1800

3✔
1801
                        continue
3✔
1802
                }
1803

1804
                log.Infof("Performing database optional migration: %s",
3✔
1805
                        version.name)
3✔
1806

3✔
1807
                // Call the migration function for the specific optional
3✔
1808
                // migration.
3✔
1809
                if err := version.migration(d, migrationCfg); err != nil {
3✔
1810
                        log.Errorf("Unable to apply optional migration: %s, "+
×
1811
                                "error: %v", version.name, err)
×
1812
                        return err
×
1813
                }
×
1814

1815
                // Update the optional meta. Notice that unlike the mandatory db
1816
                // migrations where we perform the migration and updating meta
1817
                // in a single db transaction, we use different transactions
1818
                // here. Even when the following update is failed, we should be
1819
                // fine here as we would re-run the optional migration again,
1820
                // which is a noop, during next startup.
1821
                om.Versions[uint64(number)] = version.name
3✔
1822
                if err := d.putOptionalMeta(om); err != nil {
3✔
1823
                        log.Errorf("Unable to update optional meta: %v", err)
×
1824
                        return err
×
1825
                }
×
1826

1827
                log.Infof("Successfully applied optional migration: %s",
3✔
1828
                        version.name)
3✔
1829
        }
1830

1831
        return nil
3✔
1832
}
1833

1834
// ChannelStateDB returns the sub database that is concerned with the channel
1835
// state.
1836
func (d *DB) ChannelStateDB() *ChannelStateDB {
3✔
1837
        return d.channelStateDB
3✔
1838
}
3✔
1839

1840
// LatestDBVersion returns the number of the latest database version currently
1841
// known to the channel DB.
UNCOV
1842
func LatestDBVersion() uint32 {
×
UNCOV
1843
        return getLatestDBVersion(dbVersions)
×
UNCOV
1844
}
×
1845

1846
func getLatestDBVersion(versions []mandatoryVersion) uint32 {
3✔
1847
        return versions[len(versions)-1].number
3✔
1848
}
3✔
1849

1850
// getMigrationsToApply retrieves the migration function that should be
1851
// applied to the database.
1852
func getMigrationsToApply(versions []mandatoryVersion,
UNCOV
1853
        version uint32) ([]migration, []uint32) {
×
UNCOV
1854

×
UNCOV
1855
        migrations := make([]migration, 0, len(versions))
×
UNCOV
1856
        migrationVersions := make([]uint32, 0, len(versions))
×
UNCOV
1857

×
UNCOV
1858
        for _, v := range versions {
×
UNCOV
1859
                if v.number > version {
×
UNCOV
1860
                        migrations = append(migrations, v.migration)
×
UNCOV
1861
                        migrationVersions = append(migrationVersions, v.number)
×
UNCOV
1862
                }
×
1863
        }
1864

UNCOV
1865
        return migrations, migrationVersions
×
1866
}
1867

1868
// fetchHistoricalChanBucket returns a the channel bucket for a given outpoint
1869
// from the historical channel bucket. If the bucket does not exist,
1870
// ErrNoHistoricalBucket is returned.
1871
func fetchHistoricalChanBucket(tx kvdb.RTx,
1872
        outPoint *wire.OutPoint) (kvdb.RBucket, error) {
3✔
1873

3✔
1874
        // First fetch the top level bucket which stores all data related to
3✔
1875
        // historically stored channels.
3✔
1876
        historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
3✔
1877
        if historicalChanBucket == nil {
3✔
1878
                return nil, ErrNoHistoricalBucket
×
1879
        }
×
1880

1881
        // With the bucket for the node and chain fetched, we can now go down
1882
        // another level, for the channel itself.
1883
        var chanPointBuf bytes.Buffer
3✔
1884
        if err := graphdb.WriteOutpoint(&chanPointBuf, outPoint); err != nil {
3✔
1885
                return nil, err
×
1886
        }
×
1887
        chanBucket := historicalChanBucket.NestedReadBucket(
3✔
1888
                chanPointBuf.Bytes(),
3✔
1889
        )
3✔
1890
        if chanBucket == nil {
3✔
UNCOV
1891
                return nil, ErrChannelNotFound
×
UNCOV
1892
        }
×
1893

1894
        return chanBucket, nil
3✔
1895
}
1896

1897
// FetchHistoricalChannel fetches open channel data from the historical channel
1898
// bucket.
1899
func (c *ChannelStateDB) FetchHistoricalChannel(outPoint *wire.OutPoint) (
1900
        *OpenChannel, error) {
3✔
1901

3✔
1902
        var channel *OpenChannel
3✔
1903
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1904
                chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
3✔
1905
                if err != nil {
3✔
UNCOV
1906
                        return err
×
UNCOV
1907
                }
×
1908

1909
                channel, err = fetchOpenChannel(chanBucket, outPoint)
3✔
1910
                if err != nil {
3✔
1911
                        return err
×
1912
                }
×
1913

1914
                channel.Db = c
3✔
1915
                return nil
3✔
1916
        }, func() {
3✔
1917
                channel = nil
3✔
1918
        })
3✔
1919
        if err != nil {
3✔
UNCOV
1920
                return nil, err
×
UNCOV
1921
        }
×
1922

1923
        return channel, nil
3✔
1924
}
1925

1926
func fetchFinalHtlcsBucket(tx kvdb.RTx,
1927
        chanID lnwire.ShortChannelID) (kvdb.RBucket, error) {
3✔
1928

3✔
1929
        finalHtlcsBucket := tx.ReadBucket(finalHtlcsBucket)
3✔
1930
        if finalHtlcsBucket == nil {
6✔
1931
                return nil, ErrFinalHtlcsBucketNotFound
3✔
1932
        }
3✔
1933

1934
        var chanIDBytes [8]byte
3✔
1935
        byteOrder.PutUint64(chanIDBytes[:], chanID.ToUint64())
3✔
1936

3✔
1937
        chanBucket := finalHtlcsBucket.NestedReadBucket(chanIDBytes[:])
3✔
1938
        if chanBucket == nil {
3✔
1939
                return nil, ErrFinalChannelBucketNotFound
×
1940
        }
×
1941

1942
        return chanBucket, nil
3✔
1943
}
1944

1945
// ErrHtlcUnknown is returned by LookupFinalHtlc when no final resolution is
// stored for the requested htlc.
var ErrHtlcUnknown = errors.New("htlc unknown")
1946

1947
// LookupFinalHtlc retrieves a final htlc resolution from the database. If the
1948
// htlc has no final resolution yet, ErrHtlcUnknown is returned.
1949
func (c *ChannelStateDB) LookupFinalHtlc(chanID lnwire.ShortChannelID,
1950
        htlcIndex uint64) (*FinalHtlcInfo, error) {
3✔
1951

3✔
1952
        var idBytes [8]byte
3✔
1953
        byteOrder.PutUint64(idBytes[:], htlcIndex)
3✔
1954

3✔
1955
        var settledByte byte
3✔
1956

3✔
1957
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1958
                finalHtlcsBucket, err := fetchFinalHtlcsBucket(
3✔
1959
                        tx, chanID,
3✔
1960
                )
3✔
1961
                switch {
3✔
1962
                case errors.Is(err, ErrFinalHtlcsBucketNotFound):
3✔
1963
                        fallthrough
3✔
1964

1965
                case errors.Is(err, ErrFinalChannelBucketNotFound):
3✔
1966
                        return ErrHtlcUnknown
3✔
1967

1968
                case err != nil:
×
1969
                        return fmt.Errorf("cannot fetch final htlcs bucket: %w",
×
1970
                                err)
×
1971
                }
1972

1973
                value := finalHtlcsBucket.Get(idBytes[:])
3✔
1974
                if value == nil {
3✔
UNCOV
1975
                        return ErrHtlcUnknown
×
UNCOV
1976
                }
×
1977

1978
                if len(value) != 1 {
3✔
1979
                        return errors.New("unexpected final htlc value length")
×
1980
                }
×
1981

1982
                settledByte = value[0]
3✔
1983

3✔
1984
                return nil
3✔
1985
        }, func() {
3✔
1986
                settledByte = 0
3✔
1987
        })
3✔
1988
        if err != nil {
6✔
1989
                return nil, err
3✔
1990
        }
3✔
1991

1992
        info := FinalHtlcInfo{
3✔
1993
                Settled:  settledByte&byte(FinalHtlcSettledBit) != 0,
3✔
1994
                Offchain: settledByte&byte(FinalHtlcOffchainBit) != 0,
3✔
1995
        }
3✔
1996

3✔
1997
        return &info, nil
3✔
1998
}
1999

2000
// PutOnchainFinalHtlcOutcome stores the final on-chain outcome of an htlc in
2001
// the database.
2002
func (c *ChannelStateDB) PutOnchainFinalHtlcOutcome(
2003
        chanID lnwire.ShortChannelID, htlcID uint64, settled bool) error {
3✔
2004

3✔
2005
        // Skip if the user did not opt in to storing final resolutions.
3✔
2006
        if !c.parent.storeFinalHtlcResolutions {
6✔
2007
                return nil
3✔
2008
        }
3✔
2009

UNCOV
2010
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
×
UNCOV
2011
                finalHtlcsBucket, err := fetchFinalHtlcsBucketRw(tx, chanID)
×
UNCOV
2012
                if err != nil {
×
2013
                        return err
×
2014
                }
×
2015

UNCOV
2016
                return putFinalHtlc(
×
UNCOV
2017
                        finalHtlcsBucket, htlcID,
×
UNCOV
2018
                        FinalHtlcInfo{
×
UNCOV
2019
                                Settled:  settled,
×
UNCOV
2020
                                Offchain: false,
×
UNCOV
2021
                        },
×
UNCOV
2022
                )
×
UNCOV
2023
        }, func() {})
×
2024
}
2025

2026
// MakeTestInvoiceDB is used to create a test invoice database for testing
2027
// purposes. It simply calls into MakeTestDB so the same modifiers can be used.
2028
func MakeTestInvoiceDB(t *testing.T, modifiers ...OptionModifier) (
UNCOV
2029
        invoices.InvoiceDB, error) {
×
UNCOV
2030

×
UNCOV
2031
        return MakeTestDB(t, modifiers...)
×
UNCOV
2032
}
×
2033

2034
// MakeTestDB creates a new instance of the ChannelDB for testing purposes.
2035
// A callback which cleans up the created temporary directories is also
2036
// returned and intended to be executed after the test completes.
UNCOV
2037
func MakeTestDB(t *testing.T, modifiers ...OptionModifier) (*DB, error) {
×
UNCOV
2038
        // First, create a temporary directory to be used for the duration of
×
UNCOV
2039
        // this test.
×
UNCOV
2040
        tempDirName := t.TempDir()
×
UNCOV
2041

×
UNCOV
2042
        // Next, create channeldb for the first time.
×
UNCOV
2043
        backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cdb")
×
UNCOV
2044
        if err != nil {
×
2045
                backendCleanup()
×
2046
                return nil, err
×
2047
        }
×
2048

UNCOV
2049
        cdb, err := CreateWithBackend(backend, modifiers...)
×
UNCOV
2050
        if err != nil {
×
2051
                backendCleanup()
×
2052
                return nil, err
×
2053
        }
×
2054

UNCOV
2055
        t.Cleanup(func() {
×
UNCOV
2056
                cdb.Close()
×
UNCOV
2057
                backendCleanup()
×
UNCOV
2058
        })
×
2059

UNCOV
2060
        return cdb, nil
×
2061
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc