• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 9915780197

13 Jul 2024 12:30AM UTC coverage: 49.268% (-9.1%) from 58.413%
9915780197

push

github

web-flow
Merge pull request #8653 from ProofOfKeags/fn-prim

DynComms [0/n]: `fn` package additions

92837 of 188433 relevant lines covered (49.27%)

1.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.58
/channeldb/db.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "fmt"
7
        "net"
8
        "os"
9
        "testing"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/go-errors/errors"
15
        mig "github.com/lightningnetwork/lnd/channeldb/migration"
16
        "github.com/lightningnetwork/lnd/channeldb/migration12"
17
        "github.com/lightningnetwork/lnd/channeldb/migration13"
18
        "github.com/lightningnetwork/lnd/channeldb/migration16"
19
        "github.com/lightningnetwork/lnd/channeldb/migration20"
20
        "github.com/lightningnetwork/lnd/channeldb/migration21"
21
        "github.com/lightningnetwork/lnd/channeldb/migration23"
22
        "github.com/lightningnetwork/lnd/channeldb/migration24"
23
        "github.com/lightningnetwork/lnd/channeldb/migration25"
24
        "github.com/lightningnetwork/lnd/channeldb/migration26"
25
        "github.com/lightningnetwork/lnd/channeldb/migration27"
26
        "github.com/lightningnetwork/lnd/channeldb/migration29"
27
        "github.com/lightningnetwork/lnd/channeldb/migration30"
28
        "github.com/lightningnetwork/lnd/channeldb/migration31"
29
        "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
30
        "github.com/lightningnetwork/lnd/clock"
31
        "github.com/lightningnetwork/lnd/invoices"
32
        "github.com/lightningnetwork/lnd/kvdb"
33
        "github.com/lightningnetwork/lnd/lnwire"
34
        "github.com/lightningnetwork/lnd/routing/route"
35
)
36

37
const (
38
        dbName = "channel.db"
39
)
40

41
var (
42
        // ErrDryRunMigrationOK signals that a migration executed successful,
43
        // but we intentionally did not commit the result.
44
        ErrDryRunMigrationOK = errors.New("dry run migration successful")
45

46
        // ErrFinalHtlcsBucketNotFound signals that the top-level final htlcs
47
        // bucket does not exist.
48
        ErrFinalHtlcsBucketNotFound = errors.New("final htlcs bucket not " +
49
                "found")
50

51
        // ErrFinalChannelBucketNotFound signals that the channel bucket for
52
        // final htlc outcomes does not exist.
53
        ErrFinalChannelBucketNotFound = errors.New("final htlcs channel " +
54
                "bucket not found")
55
)
56

57
// migration is a function which takes a prior outdated version of the database
58
// instances and mutates the key/bucket structure to arrive at a more
59
// up-to-date version of the database.
60
type migration func(tx kvdb.RwTx) error
61

62
// mandatoryVersion defines a db version that must be applied before the lnd
63
// starts.
64
type mandatoryVersion struct {
65
        number    uint32
66
        migration migration
67
}
68

69
// MigrationConfig is an interface combines the config interfaces of all
70
// optional migrations.
71
type MigrationConfig interface {
72
        migration30.MigrateRevLogConfig
73
}
74

75
// MigrationConfigImpl is a super set of all the various migration configs and
76
// an implementation of MigrationConfig.
77
type MigrationConfigImpl struct {
78
        migration30.MigrateRevLogConfigImpl
79
}
80

81
// optionalMigration defines an optional migration function. When a migration
82
// is optional, it usually involves a large scale of changes that might touch
83
// millions of keys. Due to OOM concern, the update cannot be safely done
84
// within one db transaction. Thus, for optional migrations, they must take the
85
// db backend and construct transactions as needed.
86
type optionalMigration func(db kvdb.Backend, cfg MigrationConfig) error
87

88
// optionalVersion defines a db version that can be optionally applied. When
89
// applying migrations, we must apply all the mandatory migrations first before
90
// attempting optional ones.
91
type optionalVersion struct {
92
        name      string
93
        migration optionalMigration
94
}
95

96
var (
97
        // dbVersions is storing all mandatory versions of database. If current
98
        // version of database don't match with latest version this list will
99
        // be used for retrieving all migration function that are need to apply
100
        // to the current db.
101
        dbVersions = []mandatoryVersion{
102
                {
103
                        // The base DB version requires no migration.
104
                        number:    0,
105
                        migration: nil,
106
                },
107
                {
108
                        // The version of the database where two new indexes
109
                        // for the update time of node and channel updates were
110
                        // added.
111
                        number:    1,
112
                        migration: migration_01_to_11.MigrateNodeAndEdgeUpdateIndex,
113
                },
114
                {
115
                        // The DB version that added the invoice event time
116
                        // series.
117
                        number:    2,
118
                        migration: migration_01_to_11.MigrateInvoiceTimeSeries,
119
                },
120
                {
121
                        // The DB version that updated the embedded invoice in
122
                        // outgoing payments to match the new format.
123
                        number:    3,
124
                        migration: migration_01_to_11.MigrateInvoiceTimeSeriesOutgoingPayments,
125
                },
126
                {
127
                        // The version of the database where every channel
128
                        // always has two entries in the edges bucket. If
129
                        // a policy is unknown, this will be represented
130
                        // by a special byte sequence.
131
                        number:    4,
132
                        migration: migration_01_to_11.MigrateEdgePolicies,
133
                },
134
                {
135
                        // The DB version where we persist each attempt to send
136
                        // an HTLC to a payment hash, and track whether the
137
                        // payment is in-flight, succeeded, or failed.
138
                        number:    5,
139
                        migration: migration_01_to_11.PaymentStatusesMigration,
140
                },
141
                {
142
                        // The DB version that properly prunes stale entries
143
                        // from the edge update index.
144
                        number:    6,
145
                        migration: migration_01_to_11.MigratePruneEdgeUpdateIndex,
146
                },
147
                {
148
                        // The DB version that migrates the ChannelCloseSummary
149
                        // to a format where optional fields are indicated with
150
                        // boolean flags.
151
                        number:    7,
152
                        migration: migration_01_to_11.MigrateOptionalChannelCloseSummaryFields,
153
                },
154
                {
155
                        // The DB version that changes the gossiper's message
156
                        // store keys to account for the message's type and
157
                        // ShortChannelID.
158
                        number:    8,
159
                        migration: migration_01_to_11.MigrateGossipMessageStoreKeys,
160
                },
161
                {
162
                        // The DB version where the payments and payment
163
                        // statuses are moved to being stored in a combined
164
                        // bucket.
165
                        number:    9,
166
                        migration: migration_01_to_11.MigrateOutgoingPayments,
167
                },
168
                {
169
                        // The DB version where we started to store legacy
170
                        // payload information for all routes, as well as the
171
                        // optional TLV records.
172
                        number:    10,
173
                        migration: migration_01_to_11.MigrateRouteSerialization,
174
                },
175
                {
176
                        // Add invoice htlc and cltv delta fields.
177
                        number:    11,
178
                        migration: migration_01_to_11.MigrateInvoices,
179
                },
180
                {
181
                        // Migrate to TLV invoice bodies, add payment address
182
                        // and features, remove receipt.
183
                        number:    12,
184
                        migration: migration12.MigrateInvoiceTLV,
185
                },
186
                {
187
                        // Migrate to multi-path payments.
188
                        number:    13,
189
                        migration: migration13.MigrateMPP,
190
                },
191
                {
192
                        // Initialize payment address index and begin using it
193
                        // as the default index, falling back to payment hash
194
                        // index.
195
                        number:    14,
196
                        migration: mig.CreateTLB(payAddrIndexBucket),
197
                },
198
                {
199
                        // Initialize payment index bucket which will be used
200
                        // to index payments by sequence number. This index will
201
                        // be used to allow more efficient ListPayments queries.
202
                        number:    15,
203
                        migration: mig.CreateTLB(paymentsIndexBucket),
204
                },
205
                {
206
                        // Add our existing payments to the index bucket created
207
                        // in migration 15.
208
                        number:    16,
209
                        migration: migration16.MigrateSequenceIndex,
210
                },
211
                {
212
                        // Create a top level bucket which will store extra
213
                        // information about channel closes.
214
                        number:    17,
215
                        migration: mig.CreateTLB(closeSummaryBucket),
216
                },
217
                {
218
                        // Create a top level bucket which holds information
219
                        // about our peers.
220
                        number:    18,
221
                        migration: mig.CreateTLB(peersBucket),
222
                },
223
                {
224
                        // Create a top level bucket which holds outpoint
225
                        // information.
226
                        number:    19,
227
                        migration: mig.CreateTLB(outpointBucket),
228
                },
229
                {
230
                        // Migrate some data to the outpoint index.
231
                        number:    20,
232
                        migration: migration20.MigrateOutpointIndex,
233
                },
234
                {
235
                        // Migrate to length prefixed wire messages everywhere
236
                        // in the database.
237
                        number:    21,
238
                        migration: migration21.MigrateDatabaseWireMessages,
239
                },
240
                {
241
                        // Initialize set id index so that invoices can be
242
                        // queried by individual htlc sets.
243
                        number:    22,
244
                        migration: mig.CreateTLB(setIDIndexBucket),
245
                },
246
                {
247
                        number:    23,
248
                        migration: migration23.MigrateHtlcAttempts,
249
                },
250
                {
251
                        // Remove old forwarding packages of closed channels.
252
                        number:    24,
253
                        migration: migration24.MigrateFwdPkgCleanup,
254
                },
255
                {
256
                        // Save the initial local/remote balances in channel
257
                        // info.
258
                        number:    25,
259
                        migration: migration25.MigrateInitialBalances,
260
                },
261
                {
262
                        // Migrate the initial local/remote balance fields into
263
                        // tlv records.
264
                        number:    26,
265
                        migration: migration26.MigrateBalancesToTlvRecords,
266
                },
267
                {
268
                        // Patch the initial local/remote balance fields with
269
                        // empty values for historical channels.
270
                        number:    27,
271
                        migration: migration27.MigrateHistoricalBalances,
272
                },
273
                {
274
                        number:    28,
275
                        migration: mig.CreateTLB(chanIDBucket),
276
                },
277
                {
278
                        number:    29,
279
                        migration: migration29.MigrateChanID,
280
                },
281
                {
282
                        // Removes the "sweeper-last-tx" bucket. Although we
283
                        // do not have a mandatory version 30 we skip this
284
                        // version because its naming is already used for the
285
                        // first optional migration.
286
                        number:    31,
287
                        migration: migration31.DeleteLastPublishedTxTLB,
288
                },
289
        }
290

291
        // optionalVersions stores all optional migrations that are applied
292
        // after dbVersions.
293
        //
294
        // NOTE: optional migrations must be fault-tolerant and re-run already
295
        // migrated data must be noop, which means the migration must be able
296
        // to determine its state.
297
        optionalVersions = []optionalVersion{
298
                {
299
                        name: "prune revocation log",
300
                        migration: func(db kvdb.Backend,
301
                                cfg MigrationConfig) error {
×
302

×
303
                                return migration30.MigrateRevocationLog(db, cfg)
×
304
                        },
×
305
                },
306
        }
307

308
        // Big endian is the preferred byte order, due to cursor scans over
309
        // integer keys iterating in order.
310
        byteOrder = binary.BigEndian
311

312
        // channelOpeningStateBucket is the database bucket used to store the
313
        // channelOpeningState for each channel that is currently in the process
314
        // of being opened.
315
        channelOpeningStateBucket = []byte("channelOpeningState")
316
)
317

318
// DB is the primary datastore for the lnd daemon. The database stores
319
// information related to nodes, routing data, open/closed channels, fee
320
// schedules, and reputation data.
321
type DB struct {
322
        kvdb.Backend
323

324
        // channelStateDB separates all DB operations on channel state.
325
        channelStateDB *ChannelStateDB
326

327
        dbPath                    string
328
        graph                     *ChannelGraph
329
        clock                     clock.Clock
330
        dryRun                    bool
331
        keepFailedPaymentAttempts bool
332
        storeFinalHtlcResolutions bool
333

334
        // noRevLogAmtData if true, means that commitment transaction amount
335
        // data should not be stored in the revocation log.
336
        noRevLogAmtData bool
337
}
338

339
// Open opens or creates channeldb. Any necessary schemas migrations due
340
// to updates will take place as necessary.
341
// TODO(bhandras): deprecate this function.
342
func Open(dbPath string, modifiers ...OptionModifier) (*DB, error) {
×
343
        opts := DefaultOptions()
×
344
        for _, modifier := range modifiers {
×
345
                modifier(&opts)
×
346
        }
×
347

348
        backend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
×
349
                DBPath:            dbPath,
×
350
                DBFileName:        dbName,
×
351
                NoFreelistSync:    opts.NoFreelistSync,
×
352
                AutoCompact:       opts.AutoCompact,
×
353
                AutoCompactMinAge: opts.AutoCompactMinAge,
×
354
                DBTimeout:         opts.DBTimeout,
×
355
        })
×
356
        if err != nil {
×
357
                return nil, err
×
358
        }
×
359

360
        db, err := CreateWithBackend(backend, modifiers...)
×
361
        if err == nil {
×
362
                db.dbPath = dbPath
×
363
        }
×
364
        return db, err
×
365
}
366

367
// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
368
// Any necessary schemas migrations due to updates will take place as necessary.
369
func CreateWithBackend(backend kvdb.Backend,
370
        modifiers ...OptionModifier) (*DB, error) {
3✔
371

3✔
372
        opts := DefaultOptions()
3✔
373
        for _, modifier := range modifiers {
6✔
374
                modifier(&opts)
3✔
375
        }
3✔
376

377
        if !opts.NoMigration {
6✔
378
                if err := initChannelDB(backend); err != nil {
3✔
379
                        return nil, err
×
380
                }
×
381
        }
382

383
        chanDB := &DB{
3✔
384
                Backend: backend,
3✔
385
                channelStateDB: &ChannelStateDB{
3✔
386
                        linkNodeDB: &LinkNodeDB{
3✔
387
                                backend: backend,
3✔
388
                        },
3✔
389
                        backend: backend,
3✔
390
                },
3✔
391
                clock:                     opts.clock,
3✔
392
                dryRun:                    opts.dryRun,
3✔
393
                keepFailedPaymentAttempts: opts.keepFailedPaymentAttempts,
3✔
394
                storeFinalHtlcResolutions: opts.storeFinalHtlcResolutions,
3✔
395
                noRevLogAmtData:           opts.NoRevLogAmtData,
3✔
396
        }
3✔
397

3✔
398
        // Set the parent pointer (only used in tests).
3✔
399
        chanDB.channelStateDB.parent = chanDB
3✔
400

3✔
401
        var err error
3✔
402
        chanDB.graph, err = NewChannelGraph(
3✔
403
                backend, opts.RejectCacheSize, opts.ChannelCacheSize,
3✔
404
                opts.BatchCommitInterval, opts.PreAllocCacheNumNodes,
3✔
405
                opts.UseGraphCache, opts.NoMigration,
3✔
406
        )
3✔
407
        if err != nil {
3✔
408
                return nil, err
×
409
        }
×
410

411
        // Synchronize the version of database and apply migrations if needed.
412
        if !opts.NoMigration {
6✔
413
                if err := chanDB.syncVersions(dbVersions); err != nil {
3✔
414
                        backend.Close()
×
415
                        return nil, err
×
416
                }
×
417

418
                // Grab the optional migration config.
419
                omc := opts.OptionalMiragtionConfig
3✔
420
                if err := chanDB.applyOptionalVersions(omc); err != nil {
3✔
421
                        backend.Close()
×
422
                        return nil, err
×
423
                }
×
424
        }
425

426
        return chanDB, nil
3✔
427
}
428

429
// Path returns the file path to the channel database.
430
func (d *DB) Path() string {
×
431
        return d.dbPath
×
432
}
×
433

434
var dbTopLevelBuckets = [][]byte{
435
        openChannelBucket,
436
        closedChannelBucket,
437
        forwardingLogBucket,
438
        fwdPackagesKey,
439
        invoiceBucket,
440
        payAddrIndexBucket,
441
        setIDIndexBucket,
442
        paymentsIndexBucket,
443
        peersBucket,
444
        nodeInfoBucket,
445
        metaBucket,
446
        closeSummaryBucket,
447
        outpointBucket,
448
        chanIDBucket,
449
        historicalChannelBucket,
450
}
451

452
// Wipe completely deletes all saved state within all used buckets within the
453
// database. The deletion is done in a single transaction, therefore this
454
// operation is fully atomic.
455
func (d *DB) Wipe() error {
×
456
        err := kvdb.Update(d, func(tx kvdb.RwTx) error {
×
457
                for _, tlb := range dbTopLevelBuckets {
×
458
                        err := tx.DeleteTopLevelBucket(tlb)
×
459
                        if err != nil && err != kvdb.ErrBucketNotFound {
×
460
                                return err
×
461
                        }
×
462
                }
463
                return nil
×
464
        }, func() {})
×
465
        if err != nil {
×
466
                return err
×
467
        }
×
468

469
        return initChannelDB(d.Backend)
×
470
}
471

472
// initChannelDB creates and initializes a fresh version of channeldb. In the
473
// case that the target path has not yet been created or doesn't yet exist, then
474
// the path is created. Additionally, all required top-level buckets used within
475
// the database are created.
476
func initChannelDB(db kvdb.Backend) error {
3✔
477
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
6✔
478
                // Check if DB was marked as inactive with a tomb stone.
3✔
479
                if err := EnsureNoTombstone(tx); err != nil {
3✔
480
                        return err
×
481
                }
×
482

483
                meta := &Meta{}
3✔
484
                // Check if DB is already initialized.
3✔
485
                err := FetchMeta(meta, tx)
3✔
486
                if err == nil {
6✔
487
                        return nil
3✔
488
                }
3✔
489

490
                for _, tlb := range dbTopLevelBuckets {
6✔
491
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
3✔
492
                                return err
×
493
                        }
×
494
                }
495

496
                meta.DbVersionNumber = getLatestDBVersion(dbVersions)
3✔
497
                return putMeta(meta, tx)
3✔
498
        }, func() {})
3✔
499
        if err != nil {
3✔
500
                return fmt.Errorf("unable to create new channeldb: %w", err)
×
501
        }
×
502

503
        return nil
3✔
504
}
505

506
// fileExists returns true if the file exists, and false otherwise.
507
func fileExists(path string) bool {
×
508
        if _, err := os.Stat(path); err != nil {
×
509
                if os.IsNotExist(err) {
×
510
                        return false
×
511
                }
×
512
        }
513

514
        return true
×
515
}
516

517
// ChannelStateDB is a database that keeps track of all channel state.
518
type ChannelStateDB struct {
519
        // linkNodeDB separates all DB operations on LinkNodes.
520
        linkNodeDB *LinkNodeDB
521

522
        // parent holds a pointer to the "main" channeldb.DB object. This is
523
        // only used for testing and should never be used in production code.
524
        // For testing use the ChannelStateDB.GetParentDB() function to retrieve
525
        // this pointer.
526
        parent *DB
527

528
        // backend points to the actual backend holding the channel state
529
        // database. This may be a real backend or a cache middleware.
530
        backend kvdb.Backend
531
}
532

533
// GetParentDB returns the "main" channeldb.DB object that is the owner of this
534
// ChannelStateDB instance. Use this function only in tests where passing around
535
// pointers makes testing less readable. Never to be used in production code!
536
func (c *ChannelStateDB) GetParentDB() *DB {
×
537
        return c.parent
×
538
}
×
539

540
// LinkNodeDB returns the current instance of the link node database.
541
func (c *ChannelStateDB) LinkNodeDB() *LinkNodeDB {
3✔
542
        return c.linkNodeDB
3✔
543
}
3✔
544

545
// FetchOpenChannels starts a new database transaction and returns all stored
546
// currently active/open channels associated with the target nodeID. In the case
547
// that no active channels are known to have been created with this node, then a
548
// zero-length slice is returned.
549
func (c *ChannelStateDB) FetchOpenChannels(nodeID *btcec.PublicKey) (
550
        []*OpenChannel, error) {
3✔
551

3✔
552
        var channels []*OpenChannel
3✔
553
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
554
                var err error
3✔
555
                channels, err = c.fetchOpenChannels(tx, nodeID)
3✔
556
                return err
3✔
557
        }, func() {
6✔
558
                channels = nil
3✔
559
        })
3✔
560

561
        return channels, err
3✔
562
}
563

564
// fetchOpenChannels uses and existing database transaction and returns all
565
// stored currently active/open channels associated with the target nodeID. In
566
// the case that no active channels are known to have been created with this
567
// node, then a zero-length slice is returned.
568
func (c *ChannelStateDB) fetchOpenChannels(tx kvdb.RTx,
569
        nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
3✔
570

3✔
571
        // Get the bucket dedicated to storing the metadata for open channels.
3✔
572
        openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
573
        if openChanBucket == nil {
3✔
574
                return nil, nil
×
575
        }
×
576

577
        // Within this top level bucket, fetch the bucket dedicated to storing
578
        // open channel data specific to the remote node.
579
        pub := nodeID.SerializeCompressed()
3✔
580
        nodeChanBucket := openChanBucket.NestedReadBucket(pub)
3✔
581
        if nodeChanBucket == nil {
6✔
582
                return nil, nil
3✔
583
        }
3✔
584

585
        // Next, we'll need to go down an additional layer in order to retrieve
586
        // the channels for each chain the node knows of.
587
        var channels []*OpenChannel
3✔
588
        err := nodeChanBucket.ForEach(func(chainHash, v []byte) error {
6✔
589
                // If there's a value, it's not a bucket so ignore it.
3✔
590
                if v != nil {
3✔
591
                        return nil
×
592
                }
×
593

594
                // If we've found a valid chainhash bucket, then we'll retrieve
595
                // that so we can extract all the channels.
596
                chainBucket := nodeChanBucket.NestedReadBucket(chainHash)
3✔
597
                if chainBucket == nil {
3✔
598
                        return fmt.Errorf("unable to read bucket for chain=%x",
×
599
                                chainHash[:])
×
600
                }
×
601

602
                // Finally, we both of the necessary buckets retrieved, fetch
603
                // all the active channels related to this node.
604
                nodeChannels, err := c.fetchNodeChannels(chainBucket)
3✔
605
                if err != nil {
3✔
606
                        return fmt.Errorf("unable to read channel for "+
×
607
                                "chain_hash=%x, node_key=%x: %v",
×
608
                                chainHash[:], pub, err)
×
609
                }
×
610

611
                channels = append(channels, nodeChannels...)
3✔
612
                return nil
3✔
613
        })
614

615
        return channels, err
3✔
616
}
617

618
// fetchNodeChannels retrieves all active channels from the target chainBucket
619
// which is under a node's dedicated channel bucket. This function is typically
620
// used to fetch all the active channels related to a particular node.
621
func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) (
622
        []*OpenChannel, error) {
3✔
623

3✔
624
        var channels []*OpenChannel
3✔
625

3✔
626
        // A node may have channels on several chains, so for each known chain,
3✔
627
        // we'll extract all the channels.
3✔
628
        err := chainBucket.ForEach(func(chanPoint, v []byte) error {
6✔
629
                // If there's a value, it's not a bucket so ignore it.
3✔
630
                if v != nil {
3✔
631
                        return nil
×
632
                }
×
633

634
                // Once we've found a valid channel bucket, we'll extract it
635
                // from the node's chain bucket.
636
                chanBucket := chainBucket.NestedReadBucket(chanPoint)
3✔
637

3✔
638
                var outPoint wire.OutPoint
3✔
639
                err := readOutpoint(bytes.NewReader(chanPoint), &outPoint)
3✔
640
                if err != nil {
3✔
641
                        return err
×
642
                }
×
643
                oChannel, err := fetchOpenChannel(chanBucket, &outPoint)
3✔
644
                if err != nil {
3✔
645
                        return fmt.Errorf("unable to read channel data for "+
×
646
                                "chan_point=%v: %w", outPoint, err)
×
647
                }
×
648
                oChannel.Db = c
3✔
649

3✔
650
                channels = append(channels, oChannel)
3✔
651

3✔
652
                return nil
3✔
653
        })
654
        if err != nil {
3✔
655
                return nil, err
×
656
        }
×
657

658
        return channels, nil
3✔
659
}
660

661
// FetchChannel attempts to locate a channel specified by the passed channel
662
// point. If the channel cannot be found, then an error will be returned.
663
// Optionally an existing db tx can be supplied.
664
func (c *ChannelStateDB) FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) (
665
        *OpenChannel, error) {
3✔
666

3✔
667
        var targetChanPoint bytes.Buffer
3✔
668
        if err := writeOutpoint(&targetChanPoint, &chanPoint); err != nil {
3✔
669
                return nil, err
×
670
        }
×
671

672
        targetChanPointBytes := targetChanPoint.Bytes()
3✔
673
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
3✔
674
                error) {
6✔
675

3✔
676
                return targetChanPointBytes, &chanPoint, nil
3✔
677
        }
3✔
678

679
        return c.channelScanner(tx, selector)
3✔
680
}
681

682
// FetchChannelByID attempts to locate a channel specified by the passed channel
683
// ID. If the channel cannot be found, then an error will be returned.
684
// Optionally an existing db tx can be supplied.
685
func (c *ChannelStateDB) FetchChannelByID(tx kvdb.RTx, id lnwire.ChannelID) (
686
        *OpenChannel, error) {
3✔
687

3✔
688
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
3✔
689
                error) {
6✔
690

3✔
691
                var (
3✔
692
                        targetChanPointBytes []byte
3✔
693
                        targetChanPoint      *wire.OutPoint
3✔
694

3✔
695
                        // errChanFound is used to signal that the channel has
3✔
696
                        // been found so that iteration through the DB buckets
3✔
697
                        // can stop.
3✔
698
                        errChanFound = errors.New("channel found")
3✔
699
                )
3✔
700
                err := chainBkt.ForEach(func(k, _ []byte) error {
6✔
701
                        var outPoint wire.OutPoint
3✔
702
                        err := readOutpoint(bytes.NewReader(k), &outPoint)
3✔
703
                        if err != nil {
3✔
704
                                return err
×
705
                        }
×
706

707
                        chanID := lnwire.NewChanIDFromOutPoint(outPoint)
3✔
708
                        if chanID != id {
3✔
709
                                return nil
×
710
                        }
×
711

712
                        targetChanPoint = &outPoint
3✔
713
                        targetChanPointBytes = k
3✔
714

3✔
715
                        return errChanFound
3✔
716
                })
717
                if err != nil && !errors.Is(err, errChanFound) {
3✔
718
                        return nil, nil, err
×
719
                }
×
720
                if targetChanPoint == nil {
3✔
721
                        return nil, nil, ErrChannelNotFound
×
722
                }
×
723

724
                return targetChanPointBytes, targetChanPoint, nil
3✔
725
        }
726

727
        return c.channelScanner(tx, selector)
3✔
728
}
729

730
// channelSelector describes a function that takes a chain-hash bucket from
731
// within the open-channel DB and returns the wanted channel point bytes, and
732
// channel point. It must return the ErrChannelNotFound error if the wanted
733
// channel is not in the given bucket.
734
type channelSelector func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
735
        error)
736

737
// channelScanner will traverse the DB to each chain-hash bucket of each node
738
// pub-key bucket in the open-channel-bucket. The chanSelector will then be used
739
// to fetch the wanted channel outpoint from the chain bucket.
740
func (c *ChannelStateDB) channelScanner(tx kvdb.RTx,
741
        chanSelect channelSelector) (*OpenChannel, error) {
3✔
742

3✔
743
        var (
3✔
744
                targetChan *OpenChannel
3✔
745

3✔
746
                // errChanFound is used to signal that the channel has been
3✔
747
                // found so that iteration through the DB buckets can stop.
3✔
748
                errChanFound = errors.New("channel found")
3✔
749
        )
3✔
750

3✔
751
        // chanScan will traverse the following bucket structure:
3✔
752
        //  * nodePub => chainHash => chanPoint
3✔
753
        //
3✔
754
        // At each level we go one further, ensuring that we're traversing the
3✔
755
        // proper key (that's actually a bucket). By only reading the bucket
3✔
756
        // structure and skipping fully decoding each channel, we save a good
3✔
757
        // bit of CPU as we don't need to do things like decompress public
3✔
758
        // keys.
3✔
759
        chanScan := func(tx kvdb.RTx) error {
6✔
760
                // Get the bucket dedicated to storing the metadata for open
3✔
761
                // channels.
3✔
762
                openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
763
                if openChanBucket == nil {
3✔
764
                        return ErrNoActiveChannels
×
765
                }
×
766

767
                // Within the node channel bucket, are the set of node pubkeys
768
                // we have channels with, we don't know the entire set, so we'll
769
                // check them all.
770
                return openChanBucket.ForEach(func(nodePub, v []byte) error {
6✔
771
                        // Ensure that this is a key the same size as a pubkey,
3✔
772
                        // and also that it leads directly to a bucket.
3✔
773
                        if len(nodePub) != 33 || v != nil {
3✔
774
                                return nil
×
775
                        }
×
776

777
                        nodeChanBucket := openChanBucket.NestedReadBucket(
3✔
778
                                nodePub,
3✔
779
                        )
3✔
780
                        if nodeChanBucket == nil {
3✔
781
                                return nil
×
782
                        }
×
783

784
                        // The next layer down is all the chains that this node
785
                        // has channels on with us.
786
                        return nodeChanBucket.ForEach(func(chainHash,
3✔
787
                                v []byte) error {
6✔
788

3✔
789
                                // If there's a value, it's not a bucket so
3✔
790
                                // ignore it.
3✔
791
                                if v != nil {
3✔
792
                                        return nil
×
793
                                }
×
794

795
                                chainBucket := nodeChanBucket.NestedReadBucket(
3✔
796
                                        chainHash,
3✔
797
                                )
3✔
798
                                if chainBucket == nil {
3✔
799
                                        return fmt.Errorf("unable to read "+
×
800
                                                "bucket for chain=%x",
×
801
                                                chainHash)
×
802
                                }
×
803

804
                                // Finally, we reach the leaf bucket that stores
805
                                // all the chanPoints for this node.
806
                                targetChanBytes, chanPoint, err := chanSelect(
3✔
807
                                        chainBucket,
3✔
808
                                )
3✔
809
                                if errors.Is(err, ErrChannelNotFound) {
3✔
810
                                        return nil
×
811
                                } else if err != nil {
3✔
812
                                        return err
×
813
                                }
×
814

815
                                chanBucket := chainBucket.NestedReadBucket(
3✔
816
                                        targetChanBytes,
3✔
817
                                )
3✔
818
                                if chanBucket == nil {
6✔
819
                                        return nil
3✔
820
                                }
3✔
821

822
                                channel, err := fetchOpenChannel(
3✔
823
                                        chanBucket, chanPoint,
3✔
824
                                )
3✔
825
                                if err != nil {
3✔
826
                                        return err
×
827
                                }
×
828

829
                                targetChan = channel
3✔
830
                                targetChan.Db = c
3✔
831

3✔
832
                                return errChanFound
3✔
833
                        })
834
                })
835
        }
836

837
        var err error
3✔
838
        if tx == nil {
6✔
839
                err = kvdb.View(c.backend, chanScan, func() {})
6✔
840
        } else {
3✔
841
                err = chanScan(tx)
3✔
842
        }
3✔
843
        if err != nil && !errors.Is(err, errChanFound) {
3✔
844
                return nil, err
×
845
        }
×
846

847
        if targetChan != nil {
6✔
848
                return targetChan, nil
3✔
849
        }
3✔
850

851
        // If we can't find the channel, then we return with an error, as we
852
        // have nothing to back up.
853
        return nil, ErrChannelNotFound
3✔
854
}
855

856
// FetchAllChannels attempts to retrieve all open channels currently stored
857
// within the database, including pending open, fully open and channels waiting
858
// for a closing transaction to confirm.
859
func (c *ChannelStateDB) FetchAllChannels() ([]*OpenChannel, error) {
3✔
860
        return fetchChannels(c)
3✔
861
}
3✔
862

863
// FetchAllOpenChannels will return all channels that have the funding
864
// transaction confirmed, and is not waiting for a closing transaction to be
865
// confirmed.
866
func (c *ChannelStateDB) FetchAllOpenChannels() ([]*OpenChannel, error) {
3✔
867
        return fetchChannels(
3✔
868
                c,
3✔
869
                pendingChannelFilter(false),
3✔
870
                waitingCloseFilter(false),
3✔
871
        )
3✔
872
}
3✔
873

874
// FetchPendingChannels will return channels that have completed the process of
875
// generating and broadcasting funding transactions, but whose funding
876
// transactions have yet to be confirmed on the blockchain.
877
func (c *ChannelStateDB) FetchPendingChannels() ([]*OpenChannel, error) {
3✔
878
        return fetchChannels(c,
3✔
879
                pendingChannelFilter(true),
3✔
880
                waitingCloseFilter(false),
3✔
881
        )
3✔
882
}
3✔
883

884
// FetchWaitingCloseChannels will return all channels that have been opened,
885
// but are now waiting for a closing transaction to be confirmed.
886
//
887
// NOTE: This includes channels that are also pending to be opened.
888
func (c *ChannelStateDB) FetchWaitingCloseChannels() ([]*OpenChannel, error) {
3✔
889
        return fetchChannels(
3✔
890
                c, waitingCloseFilter(true),
3✔
891
        )
3✔
892
}
3✔
893

894
// fetchChannelsFilter applies a filter to channels retrieved in fetchchannels.
895
// A set of filters can be combined to filter across multiple dimensions.
896
type fetchChannelsFilter func(channel *OpenChannel) bool
897

898
// pendingChannelFilter returns a filter based on whether channels are pending
899
// (ie, their funding transaction still needs to confirm). If pending is false,
900
// channels with confirmed funding transactions are returned.
901
func pendingChannelFilter(pending bool) fetchChannelsFilter {
3✔
902
        return func(channel *OpenChannel) bool {
6✔
903
                return channel.IsPending == pending
3✔
904
        }
3✔
905
}
906

907
// waitingCloseFilter returns a filter which filters channels based on whether
908
// they are awaiting the confirmation of their closing transaction. If waiting
909
// close is true, channels that have had their closing tx broadcast are
910
// included. If it is false, channels that are not awaiting confirmation of
911
// their close transaction are returned.
912
func waitingCloseFilter(waitingClose bool) fetchChannelsFilter {
3✔
913
        return func(channel *OpenChannel) bool {
6✔
914
                // If the channel is in any other state than Default,
3✔
915
                // then it means it is waiting to be closed.
3✔
916
                channelWaitingClose :=
3✔
917
                        channel.ChanStatus() != ChanStatusDefault
3✔
918

3✔
919
                // Include the channel if it matches the value for
3✔
920
                // waiting close that we are filtering on.
3✔
921
                return channelWaitingClose == waitingClose
3✔
922
        }
3✔
923
}
924

925
// fetchChannels attempts to retrieve channels currently stored in the
926
// database. It takes a set of filters which are applied to each channel to
927
// obtain a set of channels with the desired set of properties. Only channels
928
// which have a true value returned for *all* of the filters will be returned.
929
// If no filters are provided, every channel in the open channels bucket will
930
// be returned.
931
func fetchChannels(c *ChannelStateDB, filters ...fetchChannelsFilter) (
932
        []*OpenChannel, error) {
3✔
933

3✔
934
        var channels []*OpenChannel
3✔
935

3✔
936
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
937
                // Get the bucket dedicated to storing the metadata for open
3✔
938
                // channels.
3✔
939
                openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
940
                if openChanBucket == nil {
3✔
941
                        return ErrNoActiveChannels
×
942
                }
×
943

944
                // Next, fetch the bucket dedicated to storing metadata related
945
                // to all nodes. All keys within this bucket are the serialized
946
                // public keys of all our direct counterparties.
947
                nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
3✔
948
                if nodeMetaBucket == nil {
3✔
949
                        return fmt.Errorf("node bucket not created")
×
950
                }
×
951

952
                // Finally for each node public key in the bucket, fetch all
953
                // the channels related to this particular node.
954
                return nodeMetaBucket.ForEach(func(k, v []byte) error {
6✔
955
                        nodeChanBucket := openChanBucket.NestedReadBucket(k)
3✔
956
                        if nodeChanBucket == nil {
3✔
957
                                return nil
×
958
                        }
×
959

960
                        return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
6✔
961
                                // If there's a value, it's not a bucket so
3✔
962
                                // ignore it.
3✔
963
                                if v != nil {
3✔
964
                                        return nil
×
965
                                }
×
966

967
                                // If we've found a valid chainhash bucket,
968
                                // then we'll retrieve that so we can extract
969
                                // all the channels.
970
                                chainBucket := nodeChanBucket.NestedReadBucket(
3✔
971
                                        chainHash,
3✔
972
                                )
3✔
973
                                if chainBucket == nil {
3✔
974
                                        return fmt.Errorf("unable to read "+
×
975
                                                "bucket for chain=%x", chainHash[:])
×
976
                                }
×
977

978
                                nodeChans, err := c.fetchNodeChannels(chainBucket)
3✔
979
                                if err != nil {
3✔
980
                                        return fmt.Errorf("unable to read "+
×
981
                                                "channel for chain_hash=%x, "+
×
982
                                                "node_key=%x: %v", chainHash[:], k, err)
×
983
                                }
×
984
                                for _, channel := range nodeChans {
6✔
985
                                        // includeChannel indicates whether the channel
3✔
986
                                        // meets the criteria specified by our filters.
3✔
987
                                        includeChannel := true
3✔
988

3✔
989
                                        // Run through each filter and check whether the
3✔
990
                                        // channel should be included.
3✔
991
                                        for _, f := range filters {
6✔
992
                                                // If the channel fails the filter, set
3✔
993
                                                // includeChannel to false and don't bother
3✔
994
                                                // checking the remaining filters.
3✔
995
                                                if !f(channel) {
6✔
996
                                                        includeChannel = false
3✔
997
                                                        break
3✔
998
                                                }
999
                                        }
1000

1001
                                        // If the channel passed every filter, include it in
1002
                                        // our set of channels.
1003
                                        if includeChannel {
6✔
1004
                                                channels = append(channels, channel)
3✔
1005
                                        }
3✔
1006
                                }
1007
                                return nil
3✔
1008
                        })
1009

1010
                })
1011
        }, func() {
3✔
1012
                channels = nil
3✔
1013
        })
3✔
1014
        if err != nil {
3✔
1015
                return nil, err
×
1016
        }
×
1017

1018
        return channels, nil
3✔
1019
}
1020

1021
// FetchClosedChannels attempts to fetch all closed channels from the database.
1022
// The pendingOnly bool toggles if channels that aren't yet fully closed should
1023
// be returned in the response or not. When a channel was cooperatively closed,
1024
// it becomes fully closed after a single confirmation.  When a channel was
1025
// forcibly closed, it will become fully closed after _all_ the pending funds
1026
// (if any) have been swept.
1027
func (c *ChannelStateDB) FetchClosedChannels(pendingOnly bool) (
1028
        []*ChannelCloseSummary, error) {
3✔
1029

3✔
1030
        var chanSummaries []*ChannelCloseSummary
3✔
1031

3✔
1032
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1033
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1034
                if closeBucket == nil {
3✔
1035
                        return ErrNoClosedChannels
×
1036
                }
×
1037

1038
                return closeBucket.ForEach(func(chanID []byte, summaryBytes []byte) error {
6✔
1039
                        summaryReader := bytes.NewReader(summaryBytes)
3✔
1040
                        chanSummary, err := deserializeCloseChannelSummary(summaryReader)
3✔
1041
                        if err != nil {
3✔
1042
                                return err
×
1043
                        }
×
1044

1045
                        // If the query specified to only include pending
1046
                        // channels, then we'll skip any channels which aren't
1047
                        // currently pending.
1048
                        if !chanSummary.IsPending && pendingOnly {
6✔
1049
                                return nil
3✔
1050
                        }
3✔
1051

1052
                        chanSummaries = append(chanSummaries, chanSummary)
3✔
1053
                        return nil
3✔
1054
                })
1055
        }, func() {
3✔
1056
                chanSummaries = nil
3✔
1057
        }); err != nil {
3✔
1058
                return nil, err
×
1059
        }
×
1060

1061
        return chanSummaries, nil
3✔
1062
}
1063

1064
// ErrClosedChannelNotFound signals that a closed channel could not be found in
1065
// the channeldb.
1066
var ErrClosedChannelNotFound = errors.New("unable to find closed channel summary")
1067

1068
// FetchClosedChannel queries for a channel close summary using the channel
1069
// point of the channel in question.
1070
func (c *ChannelStateDB) FetchClosedChannel(chanID *wire.OutPoint) (
1071
        *ChannelCloseSummary, error) {
3✔
1072

3✔
1073
        var chanSummary *ChannelCloseSummary
3✔
1074
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1075
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1076
                if closeBucket == nil {
3✔
1077
                        return ErrClosedChannelNotFound
×
1078
                }
×
1079

1080
                var b bytes.Buffer
3✔
1081
                var err error
3✔
1082
                if err = writeOutpoint(&b, chanID); err != nil {
3✔
1083
                        return err
×
1084
                }
×
1085

1086
                summaryBytes := closeBucket.Get(b.Bytes())
3✔
1087
                if summaryBytes == nil {
3✔
1088
                        return ErrClosedChannelNotFound
×
1089
                }
×
1090

1091
                summaryReader := bytes.NewReader(summaryBytes)
3✔
1092
                chanSummary, err = deserializeCloseChannelSummary(summaryReader)
3✔
1093

3✔
1094
                return err
3✔
1095
        }, func() {
3✔
1096
                chanSummary = nil
3✔
1097
        }); err != nil {
3✔
1098
                return nil, err
×
1099
        }
×
1100

1101
        return chanSummary, nil
3✔
1102
}
1103

1104
// FetchClosedChannelForID queries for a channel close summary using the
1105
// channel ID of the channel in question.
1106
func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) (
1107
        *ChannelCloseSummary, error) {
3✔
1108

3✔
1109
        var chanSummary *ChannelCloseSummary
3✔
1110
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1111
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1112
                if closeBucket == nil {
3✔
1113
                        return ErrClosedChannelNotFound
×
1114
                }
×
1115

1116
                // The first 30 bytes of the channel ID and outpoint will be
1117
                // equal.
1118
                cursor := closeBucket.ReadCursor()
3✔
1119
                op, c := cursor.Seek(cid[:30])
3✔
1120

3✔
1121
                // We scan over all possible candidates for this channel ID.
3✔
1122
                for ; op != nil && bytes.Compare(cid[:30], op[:30]) <= 0; op, c = cursor.Next() {
6✔
1123
                        var outPoint wire.OutPoint
3✔
1124
                        err := readOutpoint(bytes.NewReader(op), &outPoint)
3✔
1125
                        if err != nil {
3✔
1126
                                return err
×
1127
                        }
×
1128

1129
                        // If the found outpoint does not correspond to this
1130
                        // channel ID, we continue.
1131
                        if !cid.IsChanPoint(&outPoint) {
3✔
1132
                                continue
×
1133
                        }
1134

1135
                        // Deserialize the close summary and return.
1136
                        r := bytes.NewReader(c)
3✔
1137
                        chanSummary, err = deserializeCloseChannelSummary(r)
3✔
1138
                        if err != nil {
3✔
1139
                                return err
×
1140
                        }
×
1141

1142
                        return nil
3✔
1143
                }
1144
                return ErrClosedChannelNotFound
3✔
1145
        }, func() {
3✔
1146
                chanSummary = nil
3✔
1147
        }); err != nil {
6✔
1148
                return nil, err
3✔
1149
        }
3✔
1150

1151
        return chanSummary, nil
3✔
1152
}
1153

1154
// MarkChanFullyClosed marks a channel as fully closed within the database. A
1155
// channel should be marked as fully closed if the channel was initially
1156
// cooperatively closed and it's reached a single confirmation, or after all
1157
// the pending funds in a channel that has been forcibly closed have been
1158
// swept.
1159
func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
3✔
1160
        var (
3✔
1161
                openChannels  []*OpenChannel
3✔
1162
                pruneLinkNode *btcec.PublicKey
3✔
1163
        )
3✔
1164
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1165
                var b bytes.Buffer
3✔
1166
                if err := writeOutpoint(&b, chanPoint); err != nil {
3✔
1167
                        return err
×
1168
                }
×
1169

1170
                chanID := b.Bytes()
3✔
1171

3✔
1172
                closedChanBucket, err := tx.CreateTopLevelBucket(
3✔
1173
                        closedChannelBucket,
3✔
1174
                )
3✔
1175
                if err != nil {
3✔
1176
                        return err
×
1177
                }
×
1178

1179
                chanSummaryBytes := closedChanBucket.Get(chanID)
3✔
1180
                if chanSummaryBytes == nil {
3✔
1181
                        return fmt.Errorf("no closed channel for "+
×
1182
                                "chan_point=%v found", chanPoint)
×
1183
                }
×
1184

1185
                chanSummaryReader := bytes.NewReader(chanSummaryBytes)
3✔
1186
                chanSummary, err := deserializeCloseChannelSummary(
3✔
1187
                        chanSummaryReader,
3✔
1188
                )
3✔
1189
                if err != nil {
3✔
1190
                        return err
×
1191
                }
×
1192

1193
                chanSummary.IsPending = false
3✔
1194

3✔
1195
                var newSummary bytes.Buffer
3✔
1196
                err = serializeChannelCloseSummary(&newSummary, chanSummary)
3✔
1197
                if err != nil {
3✔
1198
                        return err
×
1199
                }
×
1200

1201
                err = closedChanBucket.Put(chanID, newSummary.Bytes())
3✔
1202
                if err != nil {
3✔
1203
                        return err
×
1204
                }
×
1205

1206
                // Now that the channel is closed, we'll check if we have any
1207
                // other open channels with this peer. If we don't we'll
1208
                // garbage collect it to ensure we don't establish persistent
1209
                // connections to peers without open channels.
1210
                pruneLinkNode = chanSummary.RemotePub
3✔
1211
                openChannels, err = c.fetchOpenChannels(
3✔
1212
                        tx, pruneLinkNode,
3✔
1213
                )
3✔
1214
                if err != nil {
3✔
1215
                        return fmt.Errorf("unable to fetch open channels for "+
×
1216
                                "peer %x: %v",
×
1217
                                pruneLinkNode.SerializeCompressed(), err)
×
1218
                }
×
1219

1220
                return nil
3✔
1221
        }, func() {
3✔
1222
                openChannels = nil
3✔
1223
                pruneLinkNode = nil
3✔
1224
        })
3✔
1225
        if err != nil {
3✔
1226
                return err
×
1227
        }
×
1228

1229
        // Decide whether we want to remove the link node, based upon the number
1230
        // of still open channels.
1231
        return c.pruneLinkNode(openChannels, pruneLinkNode)
3✔
1232
}
1233

1234
// pruneLinkNode determines whether we should garbage collect a link node from
1235
// the database due to no longer having any open channels with it. If there are
1236
// any left, then this acts as a no-op.
1237
func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
1238
        remotePub *btcec.PublicKey) error {
3✔
1239

3✔
1240
        if len(openChannels) > 0 {
6✔
1241
                return nil
3✔
1242
        }
3✔
1243

1244
        log.Infof("Pruning link node %x with zero open channels from database",
3✔
1245
                remotePub.SerializeCompressed())
3✔
1246

3✔
1247
        return c.linkNodeDB.DeleteLinkNode(remotePub)
3✔
1248
}
1249

1250
// PruneLinkNodes attempts to prune all link nodes found within the database
1251
// with whom we no longer have any open channels with.
1252
func (c *ChannelStateDB) PruneLinkNodes() error {
3✔
1253
        allLinkNodes, err := c.linkNodeDB.FetchAllLinkNodes()
3✔
1254
        if err != nil {
3✔
1255
                return err
×
1256
        }
×
1257

1258
        for _, linkNode := range allLinkNodes {
6✔
1259
                var (
3✔
1260
                        openChannels []*OpenChannel
3✔
1261
                        linkNode     = linkNode
3✔
1262
                )
3✔
1263
                err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1264
                        var err error
3✔
1265
                        openChannels, err = c.fetchOpenChannels(
3✔
1266
                                tx, linkNode.IdentityPub,
3✔
1267
                        )
3✔
1268
                        return err
3✔
1269
                }, func() {
6✔
1270
                        openChannels = nil
3✔
1271
                })
3✔
1272
                if err != nil {
3✔
1273
                        return err
×
1274
                }
×
1275

1276
                err = c.pruneLinkNode(openChannels, linkNode.IdentityPub)
3✔
1277
                if err != nil {
3✔
1278
                        return err
×
1279
                }
×
1280
        }
1281

1282
        return nil
3✔
1283
}
1284

1285
// ChannelShell is a shell of a channel that is meant to be used for channel
1286
// recovery purposes. It contains a minimal OpenChannel instance along with
1287
// addresses for that target node.
1288
type ChannelShell struct {
1289
        // NodeAddrs the set of addresses that this node has known to be
1290
        // reachable at in the past.
1291
        NodeAddrs []net.Addr
1292

1293
        // Chan is a shell of an OpenChannel, it contains only the items
1294
        // required to restore the channel on disk.
1295
        Chan *OpenChannel
1296
}
1297

1298
// RestoreChannelShells is a method that allows the caller to reconstruct the
1299
// state of an OpenChannel from the ChannelShell. We'll attempt to write the
1300
// new channel to disk, create a LinkNode instance with the passed node
1301
// addresses, and finally create an edge within the graph for the channel as
1302
// well. This method is idempotent, so repeated calls with the same set of
1303
// channel shells won't modify the database after the initial call.
1304
func (c *ChannelStateDB) RestoreChannelShells(channelShells ...*ChannelShell) error {
3✔
1305
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1306
                for _, channelShell := range channelShells {
6✔
1307
                        channel := channelShell.Chan
3✔
1308

3✔
1309
                        // When we make a channel, we mark that the channel has
3✔
1310
                        // been restored, this will signal to other sub-systems
3✔
1311
                        // to not attempt to use the channel as if it was a
3✔
1312
                        // regular one.
3✔
1313
                        channel.chanStatus |= ChanStatusRestored
3✔
1314

3✔
1315
                        // First, we'll attempt to create a new open channel
3✔
1316
                        // and link node for this channel. If the channel
3✔
1317
                        // already exists, then in order to ensure this method
3✔
1318
                        // is idempotent, we'll continue to the next step.
3✔
1319
                        channel.Db = c
3✔
1320
                        err := syncNewChannel(
3✔
1321
                                tx, channel, channelShell.NodeAddrs,
3✔
1322
                        )
3✔
1323
                        if err != nil {
6✔
1324
                                return err
3✔
1325
                        }
3✔
1326
                }
1327

1328
                return nil
3✔
1329
        }, func() {})
3✔
1330
        if err != nil {
6✔
1331
                return err
3✔
1332
        }
3✔
1333

1334
        return nil
3✔
1335
}
1336

1337
// AddrsForNode consults the graph and channel database for all addresses known
1338
// to the passed node public key.
1339
func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr,
1340
        error) {
3✔
1341

3✔
1342
        linkNode, err := d.channelStateDB.linkNodeDB.FetchLinkNode(nodePub)
3✔
1343
        if err != nil {
3✔
1344
                return nil, err
×
1345
        }
×
1346

1347
        // We'll also query the graph for this peer to see if they have any
1348
        // addresses that we don't currently have stored within the link node
1349
        // database.
1350
        pubKey, err := route.NewVertexFromBytes(nodePub.SerializeCompressed())
3✔
1351
        if err != nil {
3✔
1352
                return nil, err
×
1353
        }
×
1354
        graphNode, err := d.graph.FetchLightningNode(nil, pubKey)
3✔
1355
        if err != nil && err != ErrGraphNodeNotFound {
3✔
1356
                return nil, err
×
1357
        } else if err == ErrGraphNodeNotFound {
6✔
1358
                // If the node isn't found, then that's OK, as we still have the
3✔
1359
                // link node data. But any other error needs to be returned.
3✔
1360
                graphNode = &LightningNode{}
3✔
1361
        }
3✔
1362

1363
        // Now that we have both sources of addrs for this node, we'll use a
1364
        // map to de-duplicate any addresses between the two sources, and
1365
        // produce a final list of the combined addrs.
1366
        addrs := make(map[string]net.Addr)
3✔
1367
        for _, addr := range linkNode.Addresses {
6✔
1368
                addrs[addr.String()] = addr
3✔
1369
        }
3✔
1370
        for _, addr := range graphNode.Addresses {
6✔
1371
                addrs[addr.String()] = addr
3✔
1372
        }
3✔
1373
        dedupedAddrs := make([]net.Addr, 0, len(addrs))
3✔
1374
        for _, addr := range addrs {
6✔
1375
                dedupedAddrs = append(dedupedAddrs, addr)
3✔
1376
        }
3✔
1377

1378
        return dedupedAddrs, nil
3✔
1379
}
1380

1381
// AbandonChannel attempts to remove the target channel from the open channel
1382
// database. If the channel was already removed (has a closed channel entry),
1383
// then we'll return a nil error. Otherwise, we'll insert a new close summary
1384
// into the database.
1385
func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint,
1386
        bestHeight uint32) error {
3✔
1387

3✔
1388
        // With the chanPoint constructed, we'll attempt to find the target
3✔
1389
        // channel in the database. If we can't find the channel, then we'll
3✔
1390
        // return the error back to the caller.
3✔
1391
        dbChan, err := c.FetchChannel(nil, *chanPoint)
3✔
1392
        switch {
3✔
1393
        // If the channel wasn't found, then it's possible that it was already
1394
        // abandoned from the database.
1395
        case err == ErrChannelNotFound:
3✔
1396
                _, closedErr := c.FetchClosedChannel(chanPoint)
3✔
1397
                if closedErr != nil {
3✔
1398
                        return closedErr
×
1399
                }
×
1400

1401
                // If the channel was already closed, then we don't return an
1402
                // error as we'd like this step to be repeatable.
1403
                return nil
3✔
1404
        case err != nil:
×
1405
                return err
×
1406
        }
1407

1408
        // Now that we've found the channel, we'll populate a close summary for
1409
        // the channel, so we can store as much information for this abounded
1410
        // channel as possible. We also ensure that we set Pending to false, to
1411
        // indicate that this channel has been "fully" closed.
1412
        summary := &ChannelCloseSummary{
3✔
1413
                CloseType:               Abandoned,
3✔
1414
                ChanPoint:               *chanPoint,
3✔
1415
                ChainHash:               dbChan.ChainHash,
3✔
1416
                CloseHeight:             bestHeight,
3✔
1417
                RemotePub:               dbChan.IdentityPub,
3✔
1418
                Capacity:                dbChan.Capacity,
3✔
1419
                SettledBalance:          dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
3✔
1420
                ShortChanID:             dbChan.ShortChanID(),
3✔
1421
                RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
3✔
1422
                RemoteNextRevocation:    dbChan.RemoteNextRevocation,
3✔
1423
                LocalChanConfig:         dbChan.LocalChanCfg,
3✔
1424
        }
3✔
1425

3✔
1426
        // Finally, we'll close the channel in the DB, and return back to the
3✔
1427
        // caller. We set ourselves as the close initiator because we abandoned
3✔
1428
        // the channel.
3✔
1429
        return dbChan.CloseChannel(summary, ChanStatusLocalCloseInitiator)
3✔
1430
}
1431

1432
// SaveChannelOpeningState saves the serialized channel state for the provided
1433
// chanPoint to the channelOpeningStateBucket.
1434
func (c *ChannelStateDB) SaveChannelOpeningState(outPoint,
1435
        serializedState []byte) error {
3✔
1436

3✔
1437
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1438
                bucket, err := tx.CreateTopLevelBucket(channelOpeningStateBucket)
3✔
1439
                if err != nil {
3✔
1440
                        return err
×
1441
                }
×
1442

1443
                return bucket.Put(outPoint, serializedState)
3✔
1444
        }, func() {})
3✔
1445
}
1446

1447
// GetChannelOpeningState fetches the serialized channel state for the provided
1448
// outPoint from the database, or returns ErrChannelNotFound if the channel
1449
// is not found.
1450
func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte,
1451
        error) {
3✔
1452

3✔
1453
        var serializedState []byte
3✔
1454
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1455
                bucket := tx.ReadBucket(channelOpeningStateBucket)
3✔
1456
                if bucket == nil {
6✔
1457
                        // If the bucket does not exist, it means we never added
3✔
1458
                        //  a channel to the db, so return ErrChannelNotFound.
3✔
1459
                        return ErrChannelNotFound
3✔
1460
                }
3✔
1461

1462
                stateBytes := bucket.Get(outPoint)
3✔
1463
                if stateBytes == nil {
6✔
1464
                        return ErrChannelNotFound
3✔
1465
                }
3✔
1466

1467
                serializedState = append(serializedState, stateBytes...)
3✔
1468

3✔
1469
                return nil
3✔
1470
        }, func() {
3✔
1471
                serializedState = nil
3✔
1472
        })
3✔
1473
        return serializedState, err
3✔
1474
}
1475

1476
// DeleteChannelOpeningState removes any state for outPoint from the database.
1477
func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
3✔
1478
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1479
                bucket := tx.ReadWriteBucket(channelOpeningStateBucket)
3✔
1480
                if bucket == nil {
3✔
1481
                        return ErrChannelNotFound
×
1482
                }
×
1483

1484
                return bucket.Delete(outPoint)
3✔
1485
        }, func() {})
3✔
1486
}
1487

1488
// syncVersions function is used for safe db version synchronization. It
1489
// applies migration functions to the current database and recovers the
1490
// previous state of db if at least one error/panic appeared during migration.
1491
func (d *DB) syncVersions(versions []mandatoryVersion) error {
3✔
1492
        meta, err := d.FetchMeta()
3✔
1493
        if err != nil {
3✔
1494
                if err == ErrMetaNotFound {
×
1495
                        meta = &Meta{}
×
1496
                } else {
×
1497
                        return err
×
1498
                }
×
1499
        }
1500

1501
        latestVersion := getLatestDBVersion(versions)
3✔
1502
        log.Infof("Checking for schema update: latest_version=%v, "+
3✔
1503
                "db_version=%v", latestVersion, meta.DbVersionNumber)
3✔
1504

3✔
1505
        switch {
3✔
1506

1507
        // If the database reports a higher version that we are aware of, the
1508
        // user is probably trying to revert to a prior version of lnd. We fail
1509
        // here to prevent reversions and unintended corruption.
1510
        case meta.DbVersionNumber > latestVersion:
×
1511
                log.Errorf("Refusing to revert from db_version=%d to "+
×
1512
                        "lower version=%d", meta.DbVersionNumber,
×
1513
                        latestVersion)
×
1514
                return ErrDBReversion
×
1515

1516
        // If the current database version matches the latest version number,
1517
        // then we don't need to perform any migrations.
1518
        case meta.DbVersionNumber == latestVersion:
3✔
1519
                return nil
3✔
1520
        }
1521

1522
        log.Infof("Performing database schema migration")
×
1523

×
1524
        // Otherwise, we fetch the migrations which need to applied, and
×
1525
        // execute them serially within a single database transaction to ensure
×
1526
        // the migration is atomic.
×
1527
        migrations, migrationVersions := getMigrationsToApply(
×
1528
                versions, meta.DbVersionNumber,
×
1529
        )
×
1530
        return kvdb.Update(d, func(tx kvdb.RwTx) error {
×
1531
                for i, migration := range migrations {
×
1532
                        if migration == nil {
×
1533
                                continue
×
1534
                        }
1535

1536
                        log.Infof("Applying migration #%v",
×
1537
                                migrationVersions[i])
×
1538

×
1539
                        if err := migration(tx); err != nil {
×
1540
                                log.Infof("Unable to apply migration #%v",
×
1541
                                        migrationVersions[i])
×
1542
                                return err
×
1543
                        }
×
1544
                }
1545

1546
                meta.DbVersionNumber = latestVersion
×
1547
                err := putMeta(meta, tx)
×
1548
                if err != nil {
×
1549
                        return err
×
1550
                }
×
1551

1552
                // In dry-run mode, return an error to prevent the transaction
1553
                // from committing.
1554
                if d.dryRun {
×
1555
                        return ErrDryRunMigrationOK
×
1556
                }
×
1557

1558
                return nil
×
1559
        }, func() {})
×
1560
}
1561

1562
// applyOptionalVersions takes a config to determine whether the optional
1563
// migrations will be applied.
1564
//
1565
// NOTE: only support the prune_revocation_log optional migration atm.
1566
func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error {
3✔
1567
        // TODO(yy): need to design the db to support dry run for optional
3✔
1568
        // migrations.
3✔
1569
        if d.dryRun {
3✔
1570
                log.Info("Skipped optional migrations as dry run mode is not " +
×
1571
                        "supported yet")
×
1572
                return nil
×
1573
        }
×
1574

1575
        om, err := d.fetchOptionalMeta()
3✔
1576
        if err != nil {
3✔
1577
                if err == ErrMetaNotFound {
×
1578
                        om = &OptionalMeta{
×
1579
                                Versions: make(map[uint64]string),
×
1580
                        }
×
1581
                } else {
×
1582
                        return err
×
1583
                }
×
1584
        }
1585

1586
        log.Infof("Checking for optional update: prune_revocation_log=%v, "+
3✔
1587
                "db_version=%s", cfg.PruneRevocationLog, om)
3✔
1588

3✔
1589
        // Exit early if the optional migration is not specified.
3✔
1590
        if !cfg.PruneRevocationLog {
6✔
1591
                return nil
3✔
1592
        }
3✔
1593

1594
        // Exit early if the optional migration has already been applied.
1595
        if _, ok := om.Versions[0]; ok {
×
1596
                return nil
×
1597
        }
×
1598

1599
        // Get the optional version.
1600
        version := optionalVersions[0]
×
1601
        log.Infof("Performing database optional migration: %s", version.name)
×
1602

×
1603
        migrationCfg := &MigrationConfigImpl{
×
1604
                migration30.MigrateRevLogConfigImpl{
×
1605
                        NoAmountData: d.noRevLogAmtData,
×
1606
                },
×
1607
        }
×
1608

×
1609
        // Migrate the data.
×
1610
        if err := version.migration(d, migrationCfg); err != nil {
×
1611
                log.Errorf("Unable to apply optional migration: %s, error: %v",
×
1612
                        version.name, err)
×
1613
                return err
×
1614
        }
×
1615

1616
        // Update the optional meta. Notice that unlike the mandatory db
1617
        // migrations where we perform the migration and updating meta in a
1618
        // single db transaction, we use different transactions here. Even when
1619
        // the following update is failed, we should be fine here as we would
1620
        // re-run the optional migration again, which is a noop, during next
1621
        // startup.
1622
        om.Versions[0] = version.name
×
1623
        if err := d.putOptionalMeta(om); err != nil {
×
1624
                log.Errorf("Unable to update optional meta: %v", err)
×
1625
                return err
×
1626
        }
×
1627

1628
        return nil
×
1629
}
1630

1631
// ChannelGraph returns the current instance of the directed channel graph.
1632
func (d *DB) ChannelGraph() *ChannelGraph {
3✔
1633
        return d.graph
3✔
1634
}
3✔
1635

1636
// ChannelStateDB returns the sub database that is concerned with the channel
1637
// state.
1638
func (d *DB) ChannelStateDB() *ChannelStateDB {
3✔
1639
        return d.channelStateDB
3✔
1640
}
3✔
1641

1642
// LatestDBVersion returns the number of the latest database version currently
1643
// known to the channel DB.
1644
func LatestDBVersion() uint32 {
×
1645
        return getLatestDBVersion(dbVersions)
×
1646
}
×
1647

1648
func getLatestDBVersion(versions []mandatoryVersion) uint32 {
3✔
1649
        return versions[len(versions)-1].number
3✔
1650
}
3✔
1651

1652
// getMigrationsToApply retrieves the migration function that should be
1653
// applied to the database.
1654
func getMigrationsToApply(versions []mandatoryVersion,
1655
        version uint32) ([]migration, []uint32) {
×
1656

×
1657
        migrations := make([]migration, 0, len(versions))
×
1658
        migrationVersions := make([]uint32, 0, len(versions))
×
1659

×
1660
        for _, v := range versions {
×
1661
                if v.number > version {
×
1662
                        migrations = append(migrations, v.migration)
×
1663
                        migrationVersions = append(migrationVersions, v.number)
×
1664
                }
×
1665
        }
1666

1667
        return migrations, migrationVersions
×
1668
}
1669

1670
// fetchHistoricalChanBucket returns a the channel bucket for a given outpoint
1671
// from the historical channel bucket. If the bucket does not exist,
1672
// ErrNoHistoricalBucket is returned.
1673
func fetchHistoricalChanBucket(tx kvdb.RTx,
1674
        outPoint *wire.OutPoint) (kvdb.RBucket, error) {
3✔
1675

3✔
1676
        // First fetch the top level bucket which stores all data related to
3✔
1677
        // historically stored channels.
3✔
1678
        historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
3✔
1679
        if historicalChanBucket == nil {
3✔
1680
                return nil, ErrNoHistoricalBucket
×
1681
        }
×
1682

1683
        // With the bucket for the node and chain fetched, we can now go down
1684
        // another level, for the channel itself.
1685
        var chanPointBuf bytes.Buffer
3✔
1686
        if err := writeOutpoint(&chanPointBuf, outPoint); err != nil {
3✔
1687
                return nil, err
×
1688
        }
×
1689
        chanBucket := historicalChanBucket.NestedReadBucket(
3✔
1690
                chanPointBuf.Bytes(),
3✔
1691
        )
3✔
1692
        if chanBucket == nil {
3✔
1693
                return nil, ErrChannelNotFound
×
1694
        }
×
1695

1696
        return chanBucket, nil
3✔
1697
}
1698

1699
// FetchHistoricalChannel fetches open channel data from the historical channel
1700
// bucket.
1701
func (c *ChannelStateDB) FetchHistoricalChannel(outPoint *wire.OutPoint) (
1702
        *OpenChannel, error) {
3✔
1703

3✔
1704
        var channel *OpenChannel
3✔
1705
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1706
                chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
3✔
1707
                if err != nil {
3✔
1708
                        return err
×
1709
                }
×
1710

1711
                channel, err = fetchOpenChannel(chanBucket, outPoint)
3✔
1712
                if err != nil {
3✔
1713
                        return err
×
1714
                }
×
1715

1716
                channel.Db = c
3✔
1717
                return nil
3✔
1718
        }, func() {
3✔
1719
                channel = nil
3✔
1720
        })
3✔
1721
        if err != nil {
3✔
1722
                return nil, err
×
1723
        }
×
1724

1725
        return channel, nil
3✔
1726
}
1727

1728
func fetchFinalHtlcsBucket(tx kvdb.RTx,
1729
        chanID lnwire.ShortChannelID) (kvdb.RBucket, error) {
3✔
1730

3✔
1731
        finalHtlcsBucket := tx.ReadBucket(finalHtlcsBucket)
3✔
1732
        if finalHtlcsBucket == nil {
3✔
1733
                return nil, ErrFinalHtlcsBucketNotFound
×
1734
        }
×
1735

1736
        var chanIDBytes [8]byte
3✔
1737
        byteOrder.PutUint64(chanIDBytes[:], chanID.ToUint64())
3✔
1738

3✔
1739
        chanBucket := finalHtlcsBucket.NestedReadBucket(chanIDBytes[:])
3✔
1740
        if chanBucket == nil {
3✔
1741
                return nil, ErrFinalChannelBucketNotFound
×
1742
        }
×
1743

1744
        return chanBucket, nil
3✔
1745
}
1746

1747
var ErrHtlcUnknown = errors.New("htlc unknown")
1748

1749
// LookupFinalHtlc retrieves a final htlc resolution from the database. If the
1750
// htlc has no final resolution yet, ErrHtlcUnknown is returned.
1751
func (c *ChannelStateDB) LookupFinalHtlc(chanID lnwire.ShortChannelID,
1752
        htlcIndex uint64) (*FinalHtlcInfo, error) {
3✔
1753

3✔
1754
        var idBytes [8]byte
3✔
1755
        byteOrder.PutUint64(idBytes[:], htlcIndex)
3✔
1756

3✔
1757
        var settledByte byte
3✔
1758

3✔
1759
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1760
                finalHtlcsBucket, err := fetchFinalHtlcsBucket(
3✔
1761
                        tx, chanID,
3✔
1762
                )
3✔
1763
                switch {
3✔
1764
                case errors.Is(err, ErrFinalHtlcsBucketNotFound):
×
1765
                        fallthrough
×
1766

1767
                case errors.Is(err, ErrFinalChannelBucketNotFound):
×
1768
                        return ErrHtlcUnknown
×
1769

1770
                case err != nil:
×
1771
                        return fmt.Errorf("cannot fetch final htlcs bucket: %w",
×
1772
                                err)
×
1773
                }
1774

1775
                value := finalHtlcsBucket.Get(idBytes[:])
3✔
1776
                if value == nil {
3✔
1777
                        return ErrHtlcUnknown
×
1778
                }
×
1779

1780
                if len(value) != 1 {
3✔
1781
                        return errors.New("unexpected final htlc value length")
×
1782
                }
×
1783

1784
                settledByte = value[0]
3✔
1785

3✔
1786
                return nil
3✔
1787
        }, func() {
3✔
1788
                settledByte = 0
3✔
1789
        })
3✔
1790
        if err != nil {
3✔
1791
                return nil, err
×
1792
        }
×
1793

1794
        info := FinalHtlcInfo{
3✔
1795
                Settled:  settledByte&byte(FinalHtlcSettledBit) != 0,
3✔
1796
                Offchain: settledByte&byte(FinalHtlcOffchainBit) != 0,
3✔
1797
        }
3✔
1798

3✔
1799
        return &info, nil
3✔
1800
}
1801

1802
// PutOnchainFinalHtlcOutcome stores the final on-chain outcome of an htlc in
1803
// the database.
1804
func (c *ChannelStateDB) PutOnchainFinalHtlcOutcome(
1805
        chanID lnwire.ShortChannelID, htlcID uint64, settled bool) error {
3✔
1806

3✔
1807
        // Skip if the user did not opt in to storing final resolutions.
3✔
1808
        if !c.parent.storeFinalHtlcResolutions {
6✔
1809
                return nil
3✔
1810
        }
3✔
1811

1812
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
×
1813
                finalHtlcsBucket, err := fetchFinalHtlcsBucketRw(tx, chanID)
×
1814
                if err != nil {
×
1815
                        return err
×
1816
                }
×
1817

1818
                return putFinalHtlc(
×
1819
                        finalHtlcsBucket, htlcID,
×
1820
                        FinalHtlcInfo{
×
1821
                                Settled:  settled,
×
1822
                                Offchain: false,
×
1823
                        },
×
1824
                )
×
1825
        }, func() {})
×
1826
}
1827

1828
// MakeTestInvoiceDB is used to create a test invoice database for testing
1829
// purposes. It simply calls into MakeTestDB so the same modifiers can be used.
1830
func MakeTestInvoiceDB(t *testing.T, modifiers ...OptionModifier) (
1831
        invoices.InvoiceDB, error) {
×
1832

×
1833
        return MakeTestDB(t, modifiers...)
×
1834
}
×
1835

1836
// MakeTestDB creates a new instance of the ChannelDB for testing purposes.
1837
// A callback which cleans up the created temporary directories is also
1838
// returned and intended to be executed after the test completes.
1839
func MakeTestDB(t *testing.T, modifiers ...OptionModifier) (*DB, error) {
×
1840
        // First, create a temporary directory to be used for the duration of
×
1841
        // this test.
×
1842
        tempDirName := t.TempDir()
×
1843

×
1844
        // Next, create channeldb for the first time.
×
1845
        backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cdb")
×
1846
        if err != nil {
×
1847
                backendCleanup()
×
1848
                return nil, err
×
1849
        }
×
1850

1851
        cdb, err := CreateWithBackend(backend, modifiers...)
×
1852
        if err != nil {
×
1853
                backendCleanup()
×
1854
                return nil, err
×
1855
        }
×
1856

1857
        t.Cleanup(func() {
×
1858
                cdb.Close()
×
1859
                backendCleanup()
×
1860
        })
×
1861

1862
        return cdb, nil
×
1863
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc