• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14358372723

09 Apr 2025 01:26PM UTC coverage: 56.696% (-12.3%) from 69.037%
14358372723

Pull #9696

github

web-flow
Merge e2837e400 into 867d27d68
Pull Request #9696: Add `development_guidelines.md` for both human and machine

107055 of 188823 relevant lines covered (56.7%)

22721.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.65
/channeldb/db.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "fmt"
7
        "net"
8
        "os"
9
        "testing"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/go-errors/errors"
15
        mig "github.com/lightningnetwork/lnd/channeldb/migration"
16
        "github.com/lightningnetwork/lnd/channeldb/migration12"
17
        "github.com/lightningnetwork/lnd/channeldb/migration13"
18
        "github.com/lightningnetwork/lnd/channeldb/migration16"
19
        "github.com/lightningnetwork/lnd/channeldb/migration20"
20
        "github.com/lightningnetwork/lnd/channeldb/migration21"
21
        "github.com/lightningnetwork/lnd/channeldb/migration23"
22
        "github.com/lightningnetwork/lnd/channeldb/migration24"
23
        "github.com/lightningnetwork/lnd/channeldb/migration25"
24
        "github.com/lightningnetwork/lnd/channeldb/migration26"
25
        "github.com/lightningnetwork/lnd/channeldb/migration27"
26
        "github.com/lightningnetwork/lnd/channeldb/migration29"
27
        "github.com/lightningnetwork/lnd/channeldb/migration30"
28
        "github.com/lightningnetwork/lnd/channeldb/migration31"
29
        "github.com/lightningnetwork/lnd/channeldb/migration32"
30
        "github.com/lightningnetwork/lnd/channeldb/migration33"
31
        "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
32
        "github.com/lightningnetwork/lnd/clock"
33
        graphdb "github.com/lightningnetwork/lnd/graph/db"
34
        "github.com/lightningnetwork/lnd/invoices"
35
        "github.com/lightningnetwork/lnd/kvdb"
36
        "github.com/lightningnetwork/lnd/lnwire"
37
        "github.com/stretchr/testify/require"
38
)
39

40
const (
41
        dbName = "channel.db"
42
)
43

44
var (
45
        // ErrDryRunMigrationOK signals that a migration executed successful,
46
        // but we intentionally did not commit the result.
47
        ErrDryRunMigrationOK = errors.New("dry run migration successful")
48

49
        // ErrFinalHtlcsBucketNotFound signals that the top-level final htlcs
50
        // bucket does not exist.
51
        ErrFinalHtlcsBucketNotFound = errors.New("final htlcs bucket not " +
52
                "found")
53

54
        // ErrFinalChannelBucketNotFound signals that the channel bucket for
55
        // final htlc outcomes does not exist.
56
        ErrFinalChannelBucketNotFound = errors.New("final htlcs channel " +
57
                "bucket not found")
58
)
59

60
// migration is a function which takes a prior outdated version of the database
61
// instances and mutates the key/bucket structure to arrive at a more
62
// up-to-date version of the database.
63
type migration func(tx kvdb.RwTx) error
64

65
// mandatoryVersion defines a db version that must be applied before the lnd
66
// starts.
67
type mandatoryVersion struct {
68
        number    uint32
69
        migration migration
70
}
71

72
// MigrationConfig is an interface combines the config interfaces of all
73
// optional migrations.
74
type MigrationConfig interface {
75
        migration30.MigrateRevLogConfig
76
}
77

78
// MigrationConfigImpl is a super set of all the various migration configs and
79
// an implementation of MigrationConfig.
80
type MigrationConfigImpl struct {
81
        migration30.MigrateRevLogConfigImpl
82
}
83

84
// optionalMigration defines an optional migration function. When a migration
85
// is optional, it usually involves a large scale of changes that might touch
86
// millions of keys. Due to OOM concern, the update cannot be safely done
87
// within one db transaction. Thus, for optional migrations, they must take the
88
// db backend and construct transactions as needed.
89
type optionalMigration func(db kvdb.Backend, cfg MigrationConfig) error
90

91
// optionalVersion defines a db version that can be optionally applied. When
92
// applying migrations, we must apply all the mandatory migrations first before
93
// attempting optional ones.
94
type optionalVersion struct {
95
        name      string
96
        migration optionalMigration
97
}
98

99
var (
100
        // dbVersions is storing all mandatory versions of database. If current
101
        // version of database don't match with latest version this list will
102
        // be used for retrieving all migration function that are need to apply
103
        // to the current db.
104
        dbVersions = []mandatoryVersion{
105
                {
106
                        // The base DB version requires no migration.
107
                        number:    0,
108
                        migration: nil,
109
                },
110
                {
111
                        // The version of the database where two new indexes
112
                        // for the update time of node and channel updates were
113
                        // added.
114
                        number:    1,
115
                        migration: migration_01_to_11.MigrateNodeAndEdgeUpdateIndex,
116
                },
117
                {
118
                        // The DB version that added the invoice event time
119
                        // series.
120
                        number:    2,
121
                        migration: migration_01_to_11.MigrateInvoiceTimeSeries,
122
                },
123
                {
124
                        // The DB version that updated the embedded invoice in
125
                        // outgoing payments to match the new format.
126
                        number:    3,
127
                        migration: migration_01_to_11.MigrateInvoiceTimeSeriesOutgoingPayments,
128
                },
129
                {
130
                        // The version of the database where every channel
131
                        // always has two entries in the edges bucket. If
132
                        // a policy is unknown, this will be represented
133
                        // by a special byte sequence.
134
                        number:    4,
135
                        migration: migration_01_to_11.MigrateEdgePolicies,
136
                },
137
                {
138
                        // The DB version where we persist each attempt to send
139
                        // an HTLC to a payment hash, and track whether the
140
                        // payment is in-flight, succeeded, or failed.
141
                        number:    5,
142
                        migration: migration_01_to_11.PaymentStatusesMigration,
143
                },
144
                {
145
                        // The DB version that properly prunes stale entries
146
                        // from the edge update index.
147
                        number:    6,
148
                        migration: migration_01_to_11.MigratePruneEdgeUpdateIndex,
149
                },
150
                {
151
                        // The DB version that migrates the ChannelCloseSummary
152
                        // to a format where optional fields are indicated with
153
                        // boolean flags.
154
                        number:    7,
155
                        migration: migration_01_to_11.MigrateOptionalChannelCloseSummaryFields,
156
                },
157
                {
158
                        // The DB version that changes the gossiper's message
159
                        // store keys to account for the message's type and
160
                        // ShortChannelID.
161
                        number:    8,
162
                        migration: migration_01_to_11.MigrateGossipMessageStoreKeys,
163
                },
164
                {
165
                        // The DB version where the payments and payment
166
                        // statuses are moved to being stored in a combined
167
                        // bucket.
168
                        number:    9,
169
                        migration: migration_01_to_11.MigrateOutgoingPayments,
170
                },
171
                {
172
                        // The DB version where we started to store legacy
173
                        // payload information for all routes, as well as the
174
                        // optional TLV records.
175
                        number:    10,
176
                        migration: migration_01_to_11.MigrateRouteSerialization,
177
                },
178
                {
179
                        // Add invoice htlc and cltv delta fields.
180
                        number:    11,
181
                        migration: migration_01_to_11.MigrateInvoices,
182
                },
183
                {
184
                        // Migrate to TLV invoice bodies, add payment address
185
                        // and features, remove receipt.
186
                        number:    12,
187
                        migration: migration12.MigrateInvoiceTLV,
188
                },
189
                {
190
                        // Migrate to multi-path payments.
191
                        number:    13,
192
                        migration: migration13.MigrateMPP,
193
                },
194
                {
195
                        // Initialize payment address index and begin using it
196
                        // as the default index, falling back to payment hash
197
                        // index.
198
                        number:    14,
199
                        migration: mig.CreateTLB(payAddrIndexBucket),
200
                },
201
                {
202
                        // Initialize payment index bucket which will be used
203
                        // to index payments by sequence number. This index will
204
                        // be used to allow more efficient ListPayments queries.
205
                        number:    15,
206
                        migration: mig.CreateTLB(paymentsIndexBucket),
207
                },
208
                {
209
                        // Add our existing payments to the index bucket created
210
                        // in migration 15.
211
                        number:    16,
212
                        migration: migration16.MigrateSequenceIndex,
213
                },
214
                {
215
                        // Create a top level bucket which will store extra
216
                        // information about channel closes.
217
                        number:    17,
218
                        migration: mig.CreateTLB(closeSummaryBucket),
219
                },
220
                {
221
                        // Create a top level bucket which holds information
222
                        // about our peers.
223
                        number:    18,
224
                        migration: mig.CreateTLB(peersBucket),
225
                },
226
                {
227
                        // Create a top level bucket which holds outpoint
228
                        // information.
229
                        number:    19,
230
                        migration: mig.CreateTLB(outpointBucket),
231
                },
232
                {
233
                        // Migrate some data to the outpoint index.
234
                        number:    20,
235
                        migration: migration20.MigrateOutpointIndex,
236
                },
237
                {
238
                        // Migrate to length prefixed wire messages everywhere
239
                        // in the database.
240
                        number:    21,
241
                        migration: migration21.MigrateDatabaseWireMessages,
242
                },
243
                {
244
                        // Initialize set id index so that invoices can be
245
                        // queried by individual htlc sets.
246
                        number:    22,
247
                        migration: mig.CreateTLB(setIDIndexBucket),
248
                },
249
                {
250
                        number:    23,
251
                        migration: migration23.MigrateHtlcAttempts,
252
                },
253
                {
254
                        // Remove old forwarding packages of closed channels.
255
                        number:    24,
256
                        migration: migration24.MigrateFwdPkgCleanup,
257
                },
258
                {
259
                        // Save the initial local/remote balances in channel
260
                        // info.
261
                        number:    25,
262
                        migration: migration25.MigrateInitialBalances,
263
                },
264
                {
265
                        // Migrate the initial local/remote balance fields into
266
                        // tlv records.
267
                        number:    26,
268
                        migration: migration26.MigrateBalancesToTlvRecords,
269
                },
270
                {
271
                        // Patch the initial local/remote balance fields with
272
                        // empty values for historical channels.
273
                        number:    27,
274
                        migration: migration27.MigrateHistoricalBalances,
275
                },
276
                {
277
                        number:    28,
278
                        migration: mig.CreateTLB(chanIDBucket),
279
                },
280
                {
281
                        number:    29,
282
                        migration: migration29.MigrateChanID,
283
                },
284
                {
285
                        // Removes the "sweeper-last-tx" bucket. Although we
286
                        // do not have a mandatory version 30 we skip this
287
                        // version because its naming is already used for the
288
                        // first optional migration.
289
                        number:    31,
290
                        migration: migration31.DeleteLastPublishedTxTLB,
291
                },
292
                {
293
                        number:    32,
294
                        migration: migration32.MigrateMCRouteSerialisation,
295
                },
296
                {
297
                        number:    33,
298
                        migration: migration33.MigrateMCStoreNameSpacedResults,
299
                },
300
        }
301

302
        // optionalVersions stores all optional migrations that are applied
303
        // after dbVersions.
304
        //
305
        // NOTE: optional migrations must be fault-tolerant and re-run already
306
        // migrated data must be noop, which means the migration must be able
307
        // to determine its state.
308
        optionalVersions = []optionalVersion{
309
                {
310
                        name: "prune revocation log",
311
                        migration: func(db kvdb.Backend,
312
                                cfg MigrationConfig) error {
×
313

×
314
                                return migration30.MigrateRevocationLog(db, cfg)
×
315
                        },
×
316
                },
317
        }
318

319
        // Big endian is the preferred byte order, due to cursor scans over
320
        // integer keys iterating in order.
321
        byteOrder = binary.BigEndian
322

323
        // channelOpeningStateBucket is the database bucket used to store the
324
        // channelOpeningState for each channel that is currently in the process
325
        // of being opened.
326
        channelOpeningStateBucket = []byte("channelOpeningState")
327
)
328

329
// DB is the primary datastore for the lnd daemon. The database stores
330
// information related to nodes, routing data, open/closed channels, fee
331
// schedules, and reputation data.
332
type DB struct {
333
        kvdb.Backend
334

335
        // channelStateDB separates all DB operations on channel state.
336
        channelStateDB *ChannelStateDB
337

338
        dbPath                    string
339
        clock                     clock.Clock
340
        dryRun                    bool
341
        keepFailedPaymentAttempts bool
342
        storeFinalHtlcResolutions bool
343

344
        // noRevLogAmtData if true, means that commitment transaction amount
345
        // data should not be stored in the revocation log.
346
        noRevLogAmtData bool
347
}
348

349
// OpenForTesting opens or creates a channeldb to be used for tests. Any
350
// necessary schemas migrations due to updates will take place as necessary.
351
func OpenForTesting(t testing.TB, dbPath string,
352
        modifiers ...OptionModifier) *DB {
1,456✔
353

1,456✔
354
        backend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
1,456✔
355
                DBPath:            dbPath,
1,456✔
356
                DBFileName:        dbName,
1,456✔
357
                NoFreelistSync:    true,
1,456✔
358
                AutoCompact:       false,
1,456✔
359
                AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
1,456✔
360
                DBTimeout:         kvdb.DefaultDBTimeout,
1,456✔
361
        })
1,456✔
362
        require.NoError(t, err)
1,456✔
363

1,456✔
364
        db, err := CreateWithBackend(backend, modifiers...)
1,456✔
365
        require.NoError(t, err)
1,456✔
366

1,456✔
367
        db.dbPath = dbPath
1,456✔
368

1,456✔
369
        t.Cleanup(func() {
2,912✔
370
                require.NoError(t, db.Close())
1,456✔
371
        })
1,456✔
372

373
        return db
1,456✔
374
}
375

376
// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
377
// Any necessary schemas migrations due to updates will take place as necessary.
378
func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
379
        error) {
1,749✔
380

1,749✔
381
        opts := DefaultOptions()
1,749✔
382
        for _, modifier := range modifiers {
1,926✔
383
                modifier(&opts)
177✔
384
        }
177✔
385

386
        if !opts.NoMigration {
3,498✔
387
                if err := initChannelDB(backend); err != nil {
1,750✔
388
                        return nil, err
1✔
389
                }
1✔
390
        }
391

392
        chanDB := &DB{
1,748✔
393
                Backend: backend,
1,748✔
394
                channelStateDB: &ChannelStateDB{
1,748✔
395
                        linkNodeDB: &LinkNodeDB{
1,748✔
396
                                backend: backend,
1,748✔
397
                        },
1,748✔
398
                        backend: backend,
1,748✔
399
                },
1,748✔
400
                clock:                     opts.clock,
1,748✔
401
                dryRun:                    opts.dryRun,
1,748✔
402
                keepFailedPaymentAttempts: opts.keepFailedPaymentAttempts,
1,748✔
403
                storeFinalHtlcResolutions: opts.storeFinalHtlcResolutions,
1,748✔
404
                noRevLogAmtData:           opts.NoRevLogAmtData,
1,748✔
405
        }
1,748✔
406

1,748✔
407
        // Set the parent pointer (only used in tests).
1,748✔
408
        chanDB.channelStateDB.parent = chanDB
1,748✔
409

1,748✔
410
        // Synchronize the version of database and apply migrations if needed.
1,748✔
411
        if !opts.NoMigration {
3,496✔
412
                if err := chanDB.syncVersions(dbVersions); err != nil {
1,749✔
413
                        backend.Close()
1✔
414
                        return nil, err
1✔
415
                }
1✔
416

417
                // Grab the optional migration config.
418
                omc := opts.OptionalMiragtionConfig
1,747✔
419
                if err := chanDB.applyOptionalVersions(omc); err != nil {
1,747✔
420
                        backend.Close()
×
421
                        return nil, err
×
422
                }
×
423
        }
424

425
        return chanDB, nil
1,747✔
426
}
427

428
// Path returns the file path to the channel database.
429
func (d *DB) Path() string {
197✔
430
        return d.dbPath
197✔
431
}
197✔
432

433
var dbTopLevelBuckets = [][]byte{
434
        openChannelBucket,
435
        closedChannelBucket,
436
        forwardingLogBucket,
437
        fwdPackagesKey,
438
        invoiceBucket,
439
        payAddrIndexBucket,
440
        setIDIndexBucket,
441
        paymentsIndexBucket,
442
        peersBucket,
443
        nodeInfoBucket,
444
        metaBucket,
445
        closeSummaryBucket,
446
        outpointBucket,
447
        chanIDBucket,
448
        historicalChannelBucket,
449
}
450

451
// Wipe completely deletes all saved state within all used buckets within the
452
// database. The deletion is done in a single transaction, therefore this
453
// operation is fully atomic.
454
func (d *DB) Wipe() error {
177✔
455
        err := kvdb.Update(d, func(tx kvdb.RwTx) error {
354✔
456
                for _, tlb := range dbTopLevelBuckets {
2,832✔
457
                        err := tx.DeleteTopLevelBucket(tlb)
2,655✔
458
                        if err != nil && err != kvdb.ErrBucketNotFound {
2,655✔
459
                                return err
×
460
                        }
×
461
                }
462
                return nil
177✔
463
        }, func() {})
177✔
464
        if err != nil {
177✔
465
                return err
×
466
        }
×
467

468
        return initChannelDB(d.Backend)
177✔
469
}
470

471
// initChannelDB creates and initializes a fresh version of channeldb. In the
472
// case that the target path has not yet been created or doesn't yet exist, then
473
// the path is created. Additionally, all required top-level buckets used within
474
// the database are created.
475
func initChannelDB(db kvdb.Backend) error {
1,926✔
476
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
3,852✔
477
                // Check if DB was marked as inactive with a tomb stone.
1,926✔
478
                if err := EnsureNoTombstone(tx); err != nil {
1,927✔
479
                        return err
1✔
480
                }
1✔
481

482
                meta := &Meta{}
1,925✔
483
                // Check if DB is already initialized.
1,925✔
484
                err := FetchMeta(meta, tx)
1,925✔
485
                if err == nil {
2,132✔
486
                        return nil
207✔
487
                }
207✔
488

489
                for _, tlb := range dbTopLevelBuckets {
27,488✔
490
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
25,770✔
491
                                return err
×
492
                        }
×
493
                }
494

495
                meta.DbVersionNumber = getLatestDBVersion(dbVersions)
1,718✔
496
                return putMeta(meta, tx)
1,718✔
497
        }, func() {})
1,926✔
498
        if err != nil {
1,927✔
499
                return fmt.Errorf("unable to create new channeldb: %w", err)
1✔
500
        }
1✔
501

502
        return nil
1,925✔
503
}
504

505
// fileExists returns true if the file exists, and false otherwise.
506
func fileExists(path string) bool {
1✔
507
        if _, err := os.Stat(path); err != nil {
1✔
508
                if os.IsNotExist(err) {
×
509
                        return false
×
510
                }
×
511
        }
512

513
        return true
1✔
514
}
515

516
// ChannelStateDB is a database that keeps track of all channel state.
517
type ChannelStateDB struct {
518
        // linkNodeDB separates all DB operations on LinkNodes.
519
        linkNodeDB *LinkNodeDB
520

521
        // parent holds a pointer to the "main" channeldb.DB object. This is
522
        // only used for testing and should never be used in production code.
523
        // For testing use the ChannelStateDB.GetParentDB() function to retrieve
524
        // this pointer.
525
        parent *DB
526

527
        // backend points to the actual backend holding the channel state
528
        // database. This may be a real backend or a cache middleware.
529
        backend kvdb.Backend
530
}
531

532
// GetParentDB returns the "main" channeldb.DB object that is the owner of this
533
// ChannelStateDB instance. Use this function only in tests where passing around
534
// pointers makes testing less readable. Never to be used in production code!
535
func (c *ChannelStateDB) GetParentDB() *DB {
517✔
536
        return c.parent
517✔
537
}
517✔
538

539
// LinkNodeDB returns the current instance of the link node database.
540
func (c *ChannelStateDB) LinkNodeDB() *LinkNodeDB {
×
541
        return c.linkNodeDB
×
542
}
×
543

544
// FetchOpenChannels starts a new database transaction and returns all stored
545
// currently active/open channels associated with the target nodeID. In the case
546
// that no active channels are known to have been created with this node, then a
547
// zero-length slice is returned.
548
func (c *ChannelStateDB) FetchOpenChannels(nodeID *btcec.PublicKey) (
549
        []*OpenChannel, error) {
252✔
550

252✔
551
        var channels []*OpenChannel
252✔
552
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
504✔
553
                var err error
252✔
554
                channels, err = c.fetchOpenChannels(tx, nodeID)
252✔
555
                return err
252✔
556
        }, func() {
504✔
557
                channels = nil
252✔
558
        })
252✔
559

560
        return channels, err
252✔
561
}
562

563
// fetchOpenChannels uses and existing database transaction and returns all
564
// stored currently active/open channels associated with the target nodeID. In
565
// the case that no active channels are known to have been created with this
566
// node, then a zero-length slice is returned.
567
func (c *ChannelStateDB) fetchOpenChannels(tx kvdb.RTx,
568
        nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
259✔
569

259✔
570
        // Get the bucket dedicated to storing the metadata for open channels.
259✔
571
        openChanBucket := tx.ReadBucket(openChannelBucket)
259✔
572
        if openChanBucket == nil {
259✔
573
                return nil, nil
×
574
        }
×
575

576
        // Within this top level bucket, fetch the bucket dedicated to storing
577
        // open channel data specific to the remote node.
578
        pub := nodeID.SerializeCompressed()
259✔
579
        nodeChanBucket := openChanBucket.NestedReadBucket(pub)
259✔
580
        if nodeChanBucket == nil {
311✔
581
                return nil, nil
52✔
582
        }
52✔
583

584
        // Next, we'll need to go down an additional layer in order to retrieve
585
        // the channels for each chain the node knows of.
586
        var channels []*OpenChannel
207✔
587
        err := nodeChanBucket.ForEach(func(chainHash, v []byte) error {
414✔
588
                // If there's a value, it's not a bucket so ignore it.
207✔
589
                if v != nil {
207✔
590
                        return nil
×
591
                }
×
592

593
                // If we've found a valid chainhash bucket, then we'll retrieve
594
                // that so we can extract all the channels.
595
                chainBucket := nodeChanBucket.NestedReadBucket(chainHash)
207✔
596
                if chainBucket == nil {
207✔
597
                        return fmt.Errorf("unable to read bucket for chain=%x",
×
598
                                chainHash[:])
×
599
                }
×
600

601
                // Finally, we both of the necessary buckets retrieved, fetch
602
                // all the active channels related to this node.
603
                nodeChannels, err := c.fetchNodeChannels(chainBucket)
207✔
604
                if err != nil {
207✔
605
                        return fmt.Errorf("unable to read channel for "+
×
606
                                "chain_hash=%x, node_key=%x: %v",
×
607
                                chainHash[:], pub, err)
×
608
                }
×
609

610
                channels = append(channels, nodeChannels...)
207✔
611
                return nil
207✔
612
        })
613

614
        return channels, err
207✔
615
}
616

617
// fetchNodeChannels retrieves all active channels from the target chainBucket
618
// which is under a node's dedicated channel bucket. This function is typically
619
// used to fetch all the active channels related to a particular node.
620
func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) (
621
        []*OpenChannel, error) {
713✔
622

713✔
623
        var channels []*OpenChannel
713✔
624

713✔
625
        // A node may have channels on several chains, so for each known chain,
713✔
626
        // we'll extract all the channels.
713✔
627
        err := chainBucket.ForEach(func(chanPoint, v []byte) error {
1,484✔
628
                // If there's a value, it's not a bucket so ignore it.
771✔
629
                if v != nil {
771✔
630
                        return nil
×
631
                }
×
632

633
                // Once we've found a valid channel bucket, we'll extract it
634
                // from the node's chain bucket.
635
                chanBucket := chainBucket.NestedReadBucket(chanPoint)
771✔
636

771✔
637
                var outPoint wire.OutPoint
771✔
638
                err := graphdb.ReadOutpoint(
771✔
639
                        bytes.NewReader(chanPoint), &outPoint,
771✔
640
                )
771✔
641
                if err != nil {
771✔
642
                        return err
×
643
                }
×
644
                oChannel, err := fetchOpenChannel(chanBucket, &outPoint)
771✔
645
                if err != nil {
771✔
646
                        return fmt.Errorf("unable to read channel data for "+
×
647
                                "chan_point=%v: %w", outPoint, err)
×
648
                }
×
649
                oChannel.Db = c
771✔
650

771✔
651
                channels = append(channels, oChannel)
771✔
652

771✔
653
                return nil
771✔
654
        })
655
        if err != nil {
713✔
656
                return nil, err
×
657
        }
×
658

659
        return channels, nil
713✔
660
}
661

662
// FetchChannel attempts to locate a channel specified by the passed channel
663
// point. If the channel cannot be found, then an error will be returned.
664
func (c *ChannelStateDB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel,
665
        error) {
10✔
666

10✔
667
        var targetChanPoint bytes.Buffer
10✔
668
        err := graphdb.WriteOutpoint(&targetChanPoint, &chanPoint)
10✔
669
        if err != nil {
10✔
670
                return nil, err
×
671
        }
×
672

673
        targetChanPointBytes := targetChanPoint.Bytes()
10✔
674
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
10✔
675
                error) {
20✔
676

10✔
677
                return targetChanPointBytes, &chanPoint, nil
10✔
678
        }
10✔
679

680
        return c.channelScanner(nil, selector)
10✔
681
}
682

683
// FetchChannelByID attempts to locate a channel specified by the passed channel
684
// ID. If the channel cannot be found, then an error will be returned.
685
// Optionally an existing db tx can be supplied.
686
func (c *ChannelStateDB) FetchChannelByID(tx kvdb.RTx, id lnwire.ChannelID) (
687
        *OpenChannel, error) {
2✔
688

2✔
689
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
2✔
690
                error) {
4✔
691

2✔
692
                var (
2✔
693
                        targetChanPointBytes []byte
2✔
694
                        targetChanPoint      *wire.OutPoint
2✔
695

2✔
696
                        // errChanFound is used to signal that the channel has
2✔
697
                        // been found so that iteration through the DB buckets
2✔
698
                        // can stop.
2✔
699
                        errChanFound = errors.New("channel found")
2✔
700
                )
2✔
701
                err := chainBkt.ForEach(func(k, _ []byte) error {
4✔
702
                        var outPoint wire.OutPoint
2✔
703
                        err := graphdb.ReadOutpoint(
2✔
704
                                bytes.NewReader(k), &outPoint,
2✔
705
                        )
2✔
706
                        if err != nil {
2✔
707
                                return err
×
708
                        }
×
709

710
                        chanID := lnwire.NewChanIDFromOutPoint(outPoint)
2✔
711
                        if chanID != id {
3✔
712
                                return nil
1✔
713
                        }
1✔
714

715
                        targetChanPoint = &outPoint
1✔
716
                        targetChanPointBytes = k
1✔
717

1✔
718
                        return errChanFound
1✔
719
                })
720
                if err != nil && !errors.Is(err, errChanFound) {
2✔
721
                        return nil, nil, err
×
722
                }
×
723
                if targetChanPoint == nil {
3✔
724
                        return nil, nil, ErrChannelNotFound
1✔
725
                }
1✔
726

727
                return targetChanPointBytes, targetChanPoint, nil
1✔
728
        }
729

730
        return c.channelScanner(tx, selector)
2✔
731
}
732

733
// ChanCount is used by the server in determining access control.
734
type ChanCount struct {
735
        HasOpenOrClosedChan bool
736
        PendingOpenCount    uint64
737
}
738

739
// FetchPermAndTempPeers returns a map where the key is the remote node's
740
// public key and the value is a struct that has a tally of the pending-open
741
// channels and whether the peer has an open or closed channel with us.
742
func (c *ChannelStateDB) FetchPermAndTempPeers(
743
        chainHash []byte) (map[string]ChanCount, error) {
1✔
744

1✔
745
        peerCounts := make(map[string]ChanCount)
1✔
746

1✔
747
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
2✔
748
                openChanBucket := tx.ReadBucket(openChannelBucket)
1✔
749
                if openChanBucket == nil {
1✔
750
                        return ErrNoChanDBExists
×
751
                }
×
752

753
                openChanErr := openChanBucket.ForEach(func(nodePub,
1✔
754
                        v []byte) error {
4✔
755

3✔
756
                        // If there is a value, this is not a bucket.
3✔
757
                        if v != nil {
3✔
758
                                return nil
×
759
                        }
×
760

761
                        nodeChanBucket := openChanBucket.NestedReadBucket(
3✔
762
                                nodePub,
3✔
763
                        )
3✔
764
                        if nodeChanBucket == nil {
3✔
765
                                return nil
×
766
                        }
×
767

768
                        chainBucket := nodeChanBucket.NestedReadBucket(
3✔
769
                                chainHash,
3✔
770
                        )
3✔
771
                        if chainBucket == nil {
3✔
772
                                return fmt.Errorf("no chain bucket exists")
×
773
                        }
×
774

775
                        var isPermPeer bool
3✔
776
                        var pendingOpenCount uint64
3✔
777

3✔
778
                        internalErr := chainBucket.ForEach(func(chanPoint,
3✔
779
                                val []byte) error {
5✔
780

2✔
781
                                // If there is a value, this is not a bucket.
2✔
782
                                if val != nil {
2✔
783
                                        return nil
×
784
                                }
×
785

786
                                chanBucket := chainBucket.NestedReadBucket(
2✔
787
                                        chanPoint,
2✔
788
                                )
2✔
789
                                if chanBucket == nil {
2✔
790
                                        return nil
×
791
                                }
×
792

793
                                var op wire.OutPoint
2✔
794
                                readErr := graphdb.ReadOutpoint(
2✔
795
                                        bytes.NewReader(chanPoint), &op,
2✔
796
                                )
2✔
797
                                if readErr != nil {
2✔
798
                                        return readErr
×
799
                                }
×
800

801
                                // We need to go through each channel and look
802
                                // at the IsPending status.
803
                                openChan, err := fetchOpenChannel(
2✔
804
                                        chanBucket, &op,
2✔
805
                                )
2✔
806
                                if err != nil {
2✔
807
                                        return err
×
808
                                }
×
809

810
                                if openChan.IsPending {
3✔
811
                                        // Add to the pending-open count since
1✔
812
                                        // this is a temp peer.
1✔
813
                                        pendingOpenCount++
1✔
814
                                        return nil
1✔
815
                                }
1✔
816

817
                                // Since IsPending is false, this is a perm
818
                                // peer.
819
                                isPermPeer = true
1✔
820

1✔
821
                                return nil
1✔
822
                        })
823
                        if internalErr != nil {
3✔
824
                                return internalErr
×
825
                        }
×
826

827
                        peerCount := ChanCount{
3✔
828
                                HasOpenOrClosedChan: isPermPeer,
3✔
829
                                PendingOpenCount:    pendingOpenCount,
3✔
830
                        }
3✔
831
                        peerCounts[string(nodePub)] = peerCount
3✔
832

3✔
833
                        return nil
3✔
834
                })
835
                if openChanErr != nil {
1✔
836
                        return openChanErr
×
837
                }
×
838

839
                // Now check the closed channel bucket.
840
                historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
1✔
841
                if historicalChanBucket == nil {
1✔
842
                        return ErrNoHistoricalBucket
×
843
                }
×
844

845
                historicalErr := historicalChanBucket.ForEach(func(chanPoint,
1✔
846
                        v []byte) error {
2✔
847
                        // Parse each nested bucket and the chanInfoKey to get
1✔
848
                        // the IsPending bool. This determines whether the
1✔
849
                        // peer is protected or not.
1✔
850
                        if v != nil {
1✔
851
                                // This is not a bucket. This is currently not
×
852
                                // possible.
×
853
                                return nil
×
854
                        }
×
855

856
                        chanBucket := historicalChanBucket.NestedReadBucket(
1✔
857
                                chanPoint,
1✔
858
                        )
1✔
859
                        if chanBucket == nil {
1✔
860
                                // This is not possible.
×
861
                                return fmt.Errorf("no historical channel " +
×
862
                                        "bucket exists")
×
863
                        }
×
864

865
                        var op wire.OutPoint
1✔
866
                        readErr := graphdb.ReadOutpoint(
1✔
867
                                bytes.NewReader(chanPoint), &op,
1✔
868
                        )
1✔
869
                        if readErr != nil {
1✔
870
                                return readErr
×
871
                        }
×
872

873
                        // This channel is closed, but the structure of the
874
                        // historical bucket is the same. This is by design,
875
                        // which means we can call fetchOpenChannel.
876
                        channel, fetchErr := fetchOpenChannel(chanBucket, &op)
1✔
877
                        if fetchErr != nil {
1✔
878
                                return fetchErr
×
879
                        }
×
880

881
                        // Only include this peer in the protected class if
882
                        // the closing transaction confirmed. Note that
883
                        // CloseChannel can be called in the funding manager
884
                        // while IsPending is true which is why we need this
885
                        // special-casing to not count premature funding
886
                        // manager calls to CloseChannel.
887
                        if !channel.IsPending {
2✔
888
                                // Fetch the public key of the remote node. We
1✔
889
                                // need to use the string-ified serialized,
1✔
890
                                // compressed bytes as the key.
1✔
891
                                remotePub := channel.IdentityPub
1✔
892
                                remoteSer := remotePub.SerializeCompressed()
1✔
893
                                remoteKey := string(remoteSer)
1✔
894

1✔
895
                                count, exists := peerCounts[remoteKey]
1✔
896
                                if exists {
2✔
897
                                        count.HasOpenOrClosedChan = true
1✔
898
                                        peerCounts[remoteKey] = count
1✔
899
                                } else {
1✔
900
                                        peerCount := ChanCount{
×
901
                                                HasOpenOrClosedChan: true,
×
902
                                        }
×
903
                                        peerCounts[remoteKey] = peerCount
×
904
                                }
×
905
                        }
906

907
                        return nil
1✔
908
                })
909
                if historicalErr != nil {
1✔
910
                        return historicalErr
×
911
                }
×
912

913
                return nil
1✔
914
        }, func() {
1✔
915
                clear(peerCounts)
1✔
916
        })
1✔
917

918
        return peerCounts, err
1✔
919
}
920

921
// channelSelector describes a function that takes a chain-hash bucket from
922
// within the open-channel DB and returns the wanted channel point bytes, and
923
// channel point. It must return the ErrChannelNotFound error if the wanted
924
// channel is not in the given bucket.
925
type channelSelector func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
926
        error)
927

928
// channelScanner will traverse the DB to each chain-hash bucket of each node
929
// pub-key bucket in the open-channel-bucket. The chanSelector will then be used
930
// to fetch the wanted channel outpoint from the chain bucket.
931
func (c *ChannelStateDB) channelScanner(tx kvdb.RTx,
932
        chanSelect channelSelector) (*OpenChannel, error) {
12✔
933

12✔
934
        var (
12✔
935
                targetChan *OpenChannel
12✔
936

12✔
937
                // errChanFound is used to signal that the channel has been
12✔
938
                // found so that iteration through the DB buckets can stop.
12✔
939
                errChanFound = errors.New("channel found")
12✔
940
        )
12✔
941

12✔
942
        // chanScan will traverse the following bucket structure:
12✔
943
        //  * nodePub => chainHash => chanPoint
12✔
944
        //
12✔
945
        // At each level we go one further, ensuring that we're traversing the
12✔
946
        // proper key (that's actually a bucket). By only reading the bucket
12✔
947
        // structure and skipping fully decoding each channel, we save a good
12✔
948
        // bit of CPU as we don't need to do things like decompress public
12✔
949
        // keys.
12✔
950
        chanScan := func(tx kvdb.RTx) error {
24✔
951
                // Get the bucket dedicated to storing the metadata for open
12✔
952
                // channels.
12✔
953
                openChanBucket := tx.ReadBucket(openChannelBucket)
12✔
954
                if openChanBucket == nil {
12✔
955
                        return ErrNoActiveChannels
×
956
                }
×
957

958
                // Within the node channel bucket, are the set of node pubkeys
959
                // we have channels with, we don't know the entire set, so we'll
960
                // check them all.
961
                return openChanBucket.ForEach(func(nodePub, v []byte) error {
24✔
962
                        // Ensure that this is a key the same size as a pubkey,
12✔
963
                        // and also that it leads directly to a bucket.
12✔
964
                        if len(nodePub) != 33 || v != nil {
12✔
965
                                return nil
×
966
                        }
×
967

968
                        nodeChanBucket := openChanBucket.NestedReadBucket(
12✔
969
                                nodePub,
12✔
970
                        )
12✔
971
                        if nodeChanBucket == nil {
12✔
972
                                return nil
×
973
                        }
×
974

975
                        // The next layer down is all the chains that this node
976
                        // has channels on with us.
977
                        return nodeChanBucket.ForEach(func(chainHash,
12✔
978
                                v []byte) error {
24✔
979

12✔
980
                                // If there's a value, it's not a bucket so
12✔
981
                                // ignore it.
12✔
982
                                if v != nil {
12✔
983
                                        return nil
×
984
                                }
×
985

986
                                chainBucket := nodeChanBucket.NestedReadBucket(
12✔
987
                                        chainHash,
12✔
988
                                )
12✔
989
                                if chainBucket == nil {
12✔
990
                                        return fmt.Errorf("unable to read "+
×
991
                                                "bucket for chain=%x",
×
992
                                                chainHash)
×
993
                                }
×
994

995
                                // Finally, we reach the leaf bucket that stores
996
                                // all the chanPoints for this node.
997
                                targetChanBytes, chanPoint, err := chanSelect(
12✔
998
                                        chainBucket,
12✔
999
                                )
12✔
1000
                                if errors.Is(err, ErrChannelNotFound) {
13✔
1001
                                        return nil
1✔
1002
                                } else if err != nil {
12✔
1003
                                        return err
×
1004
                                }
×
1005

1006
                                chanBucket := chainBucket.NestedReadBucket(
11✔
1007
                                        targetChanBytes,
11✔
1008
                                )
11✔
1009
                                if chanBucket == nil {
15✔
1010
                                        return nil
4✔
1011
                                }
4✔
1012

1013
                                channel, err := fetchOpenChannel(
7✔
1014
                                        chanBucket, chanPoint,
7✔
1015
                                )
7✔
1016
                                if err != nil {
7✔
1017
                                        return err
×
1018
                                }
×
1019

1020
                                targetChan = channel
7✔
1021
                                targetChan.Db = c
7✔
1022

7✔
1023
                                return errChanFound
7✔
1024
                        })
1025
                })
1026
        }
1027

1028
        var err error
12✔
1029
        if tx == nil {
24✔
1030
                err = kvdb.View(c.backend, chanScan, func() {})
24✔
1031
        } else {
×
1032
                err = chanScan(tx)
×
1033
        }
×
1034
        if err != nil && !errors.Is(err, errChanFound) {
12✔
1035
                return nil, err
×
1036
        }
×
1037

1038
        if targetChan != nil {
19✔
1039
                return targetChan, nil
7✔
1040
        }
7✔
1041

1042
        // If we can't find the channel, then we return with an error, as we
1043
        // have nothing to back up.
1044
        return nil, ErrChannelNotFound
5✔
1045
}
1046

1047
// FetchAllChannels attempts to retrieve all open channels currently stored
1048
// within the database, including pending open, fully open and channels waiting
1049
// for a closing transaction to confirm.
1050
func (c *ChannelStateDB) FetchAllChannels() ([]*OpenChannel, error) {
563✔
1051
        return fetchChannels(c)
563✔
1052
}
563✔
1053

1054
// FetchAllOpenChannels will return all channels that have the funding
1055
// transaction confirmed, and is not waiting for a closing transaction to be
1056
// confirmed.
1057
func (c *ChannelStateDB) FetchAllOpenChannels() ([]*OpenChannel, error) {
405✔
1058
        return fetchChannels(
405✔
1059
                c,
405✔
1060
                pendingChannelFilter(false),
405✔
1061
                waitingCloseFilter(false),
405✔
1062
        )
405✔
1063
}
405✔
1064

1065
// FetchPendingChannels will return channels that have completed the process of
1066
// generating and broadcasting funding transactions, but whose funding
1067
// transactions have yet to be confirmed on the blockchain.
1068
func (c *ChannelStateDB) FetchPendingChannels() ([]*OpenChannel, error) {
79✔
1069
        return fetchChannels(c,
79✔
1070
                pendingChannelFilter(true),
79✔
1071
                waitingCloseFilter(false),
79✔
1072
        )
79✔
1073
}
79✔
1074

1075
// FetchWaitingCloseChannels will return all channels that have been opened,
1076
// but are now waiting for a closing transaction to be confirmed.
1077
//
1078
// NOTE: This includes channels that are also pending to be opened.
1079
func (c *ChannelStateDB) FetchWaitingCloseChannels() ([]*OpenChannel, error) {
1✔
1080
        return fetchChannels(
1✔
1081
                c, waitingCloseFilter(true),
1✔
1082
        )
1✔
1083
}
1✔
1084

1085
// fetchChannelsFilter applies a filter to channels retrieved in fetchchannels.
1086
// A set of filters can be combined to filter across multiple dimensions.
1087
type fetchChannelsFilter func(channel *OpenChannel) bool
1088

1089
// pendingChannelFilter returns a filter based on whether channels are pending
1090
// (ie, their funding transaction still needs to confirm). If pending is false,
1091
// channels with confirmed funding transactions are returned.
1092
func pendingChannelFilter(pending bool) fetchChannelsFilter {
493✔
1093
        return func(channel *OpenChannel) bool {
715✔
1094
                return channel.IsPending == pending
222✔
1095
        }
222✔
1096
}
1097

1098
// waitingCloseFilter returns a filter which filters channels based on whether
1099
// they are awaiting the confirmation of their closing transaction. If waiting
1100
// close is true, channels that have had their closing tx broadcast are
1101
// included. If it is false, channels that are not awaiting confirmation of
1102
// their close transaction are returned.
1103
func waitingCloseFilter(waitingClose bool) fetchChannelsFilter {
491✔
1104
        return func(channel *OpenChannel) bool {
699✔
1105
                // If the channel is in any other state than Default,
208✔
1106
                // then it means it is waiting to be closed.
208✔
1107
                channelWaitingClose :=
208✔
1108
                        channel.ChanStatus() != ChanStatusDefault
208✔
1109

208✔
1110
                // Include the channel if it matches the value for
208✔
1111
                // waiting close that we are filtering on.
208✔
1112
                return channelWaitingClose == waitingClose
208✔
1113
        }
208✔
1114
}
1115

1116
// fetchChannels attempts to retrieve channels currently stored in the
1117
// database. It takes a set of filters which are applied to each channel to
1118
// obtain a set of channels with the desired set of properties. Only channels
1119
// which have a true value returned for *all* of the filters will be returned.
1120
// If no filters are provided, every channel in the open channels bucket will
1121
// be returned.
1122
func fetchChannels(c *ChannelStateDB, filters ...fetchChannelsFilter) (
1123
        []*OpenChannel, error) {
1,060✔
1124

1,060✔
1125
        var channels []*OpenChannel
1,060✔
1126

1,060✔
1127
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
2,120✔
1128
                // Get the bucket dedicated to storing the metadata for open
1,060✔
1129
                // channels.
1,060✔
1130
                openChanBucket := tx.ReadBucket(openChannelBucket)
1,060✔
1131
                if openChanBucket == nil {
1,060✔
1132
                        return ErrNoActiveChannels
×
1133
                }
×
1134

1135
                // Next, fetch the bucket dedicated to storing metadata related
1136
                // to all nodes. All keys within this bucket are the serialized
1137
                // public keys of all our direct counterparties.
1138
                nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
1,060✔
1139
                if nodeMetaBucket == nil {
1,060✔
1140
                        return fmt.Errorf("node bucket not created")
×
1141
                }
×
1142

1143
                // Finally for each node public key in the bucket, fetch all
1144
                // the channels related to this particular node.
1145
                return nodeMetaBucket.ForEach(func(k, v []byte) error {
1,566✔
1146
                        nodeChanBucket := openChanBucket.NestedReadBucket(k)
506✔
1147
                        if nodeChanBucket == nil {
506✔
1148
                                return nil
×
1149
                        }
×
1150

1151
                        return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
1,012✔
1152
                                // If there's a value, it's not a bucket so
506✔
1153
                                // ignore it.
506✔
1154
                                if v != nil {
506✔
1155
                                        return nil
×
1156
                                }
×
1157

1158
                                // If we've found a valid chainhash bucket,
1159
                                // then we'll retrieve that so we can extract
1160
                                // all the channels.
1161
                                chainBucket := nodeChanBucket.NestedReadBucket(
506✔
1162
                                        chainHash,
506✔
1163
                                )
506✔
1164
                                if chainBucket == nil {
506✔
1165
                                        return fmt.Errorf("unable to read "+
×
1166
                                                "bucket for chain=%x", chainHash[:])
×
1167
                                }
×
1168

1169
                                nodeChans, err := c.fetchNodeChannels(chainBucket)
506✔
1170
                                if err != nil {
506✔
1171
                                        return fmt.Errorf("unable to read "+
×
1172
                                                "channel for chain_hash=%x, "+
×
1173
                                                "node_key=%x: %v", chainHash[:], k, err)
×
1174
                                }
×
1175
                                for _, channel := range nodeChans {
1,049✔
1176
                                        // includeChannel indicates whether the channel
543✔
1177
                                        // meets the criteria specified by our filters.
543✔
1178
                                        includeChannel := true
543✔
1179

543✔
1180
                                        // Run through each filter and check whether the
543✔
1181
                                        // channel should be included.
543✔
1182
                                        for _, f := range filters {
973✔
1183
                                                // If the channel fails the filter, set
430✔
1184
                                                // includeChannel to false and don't bother
430✔
1185
                                                // checking the remaining filters.
430✔
1186
                                                if !f(channel) {
455✔
1187
                                                        includeChannel = false
25✔
1188
                                                        break
25✔
1189
                                                }
1190
                                        }
1191

1192
                                        // If the channel passed every filter, include it in
1193
                                        // our set of channels.
1194
                                        if includeChannel {
1,061✔
1195
                                                channels = append(channels, channel)
518✔
1196
                                        }
518✔
1197
                                }
1198
                                return nil
506✔
1199
                        })
1200

1201
                })
1202
        }, func() {
1,060✔
1203
                channels = nil
1,060✔
1204
        })
1,060✔
1205
        if err != nil {
1,060✔
1206
                return nil, err
×
1207
        }
×
1208

1209
        return channels, nil
1,060✔
1210
}
1211

1212
// FetchClosedChannels attempts to fetch all closed channels from the database.
1213
// The pendingOnly bool toggles if channels that aren't yet fully closed should
1214
// be returned in the response or not. When a channel was cooperatively closed,
1215
// it becomes fully closed after a single confirmation.  When a channel was
1216
// forcibly closed, it will become fully closed after _all_ the pending funds
1217
// (if any) have been swept.
1218
func (c *ChannelStateDB) FetchClosedChannels(pendingOnly bool) (
1219
        []*ChannelCloseSummary, error) {
500✔
1220

500✔
1221
        var chanSummaries []*ChannelCloseSummary
500✔
1222

500✔
1223
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
1,000✔
1224
                closeBucket := tx.ReadBucket(closedChannelBucket)
500✔
1225
                if closeBucket == nil {
500✔
1226
                        return ErrNoClosedChannels
×
1227
                }
×
1228

1229
                return closeBucket.ForEach(func(chanID []byte, summaryBytes []byte) error {
520✔
1230
                        summaryReader := bytes.NewReader(summaryBytes)
20✔
1231
                        chanSummary, err := deserializeCloseChannelSummary(summaryReader)
20✔
1232
                        if err != nil {
20✔
1233
                                return err
×
1234
                        }
×
1235

1236
                        // If the query specified to only include pending
1237
                        // channels, then we'll skip any channels which aren't
1238
                        // currently pending.
1239
                        if !chanSummary.IsPending && pendingOnly {
21✔
1240
                                return nil
1✔
1241
                        }
1✔
1242

1243
                        chanSummaries = append(chanSummaries, chanSummary)
19✔
1244
                        return nil
19✔
1245
                })
1246
        }, func() {
500✔
1247
                chanSummaries = nil
500✔
1248
        }); err != nil {
500✔
1249
                return nil, err
×
1250
        }
×
1251

1252
        return chanSummaries, nil
500✔
1253
}
1254

1255
// ErrClosedChannelNotFound signals that a closed channel could not be found in
1256
// the channeldb.
1257
var ErrClosedChannelNotFound = errors.New("unable to find closed channel summary")
1258

1259
// FetchClosedChannel queries for a channel close summary using the channel
1260
// point of the channel in question.
1261
func (c *ChannelStateDB) FetchClosedChannel(chanID *wire.OutPoint) (
1262
        *ChannelCloseSummary, error) {
3✔
1263

3✔
1264
        var chanSummary *ChannelCloseSummary
3✔
1265
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1266
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1267
                if closeBucket == nil {
3✔
1268
                        return ErrClosedChannelNotFound
×
1269
                }
×
1270

1271
                var b bytes.Buffer
3✔
1272
                var err error
3✔
1273
                if err = graphdb.WriteOutpoint(&b, chanID); err != nil {
3✔
1274
                        return err
×
1275
                }
×
1276

1277
                summaryBytes := closeBucket.Get(b.Bytes())
3✔
1278
                if summaryBytes == nil {
4✔
1279
                        return ErrClosedChannelNotFound
1✔
1280
                }
1✔
1281

1282
                summaryReader := bytes.NewReader(summaryBytes)
2✔
1283
                chanSummary, err = deserializeCloseChannelSummary(summaryReader)
2✔
1284

2✔
1285
                return err
2✔
1286
        }, func() {
3✔
1287
                chanSummary = nil
3✔
1288
        }); err != nil {
4✔
1289
                return nil, err
1✔
1290
        }
1✔
1291

1292
        return chanSummary, nil
2✔
1293
}
1294

1295
// FetchClosedChannelForID queries for a channel close summary using the
1296
// channel ID of the channel in question.
1297
func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) (
1298
        *ChannelCloseSummary, error) {
102✔
1299

102✔
1300
        var chanSummary *ChannelCloseSummary
102✔
1301
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
204✔
1302
                closeBucket := tx.ReadBucket(closedChannelBucket)
102✔
1303
                if closeBucket == nil {
102✔
1304
                        return ErrClosedChannelNotFound
×
1305
                }
×
1306

1307
                // The first 30 bytes of the channel ID and outpoint will be
1308
                // equal.
1309
                cursor := closeBucket.ReadCursor()
102✔
1310
                op, c := cursor.Seek(cid[:30])
102✔
1311

102✔
1312
                // We scan over all possible candidates for this channel ID.
102✔
1313
                for ; op != nil && bytes.Compare(cid[:30], op[:30]) <= 0; op, c = cursor.Next() {
5,354✔
1314
                        var outPoint wire.OutPoint
5,252✔
1315
                        err := graphdb.ReadOutpoint(
5,252✔
1316
                                bytes.NewReader(op), &outPoint,
5,252✔
1317
                        )
5,252✔
1318
                        if err != nil {
5,252✔
1319
                                return err
×
1320
                        }
×
1321

1322
                        // If the found outpoint does not correspond to this
1323
                        // channel ID, we continue.
1324
                        if !cid.IsChanPoint(&outPoint) {
10,403✔
1325
                                continue
5,151✔
1326
                        }
1327

1328
                        // Deserialize the close summary and return.
1329
                        r := bytes.NewReader(c)
101✔
1330
                        chanSummary, err = deserializeCloseChannelSummary(r)
101✔
1331
                        if err != nil {
101✔
1332
                                return err
×
1333
                        }
×
1334

1335
                        return nil
101✔
1336
                }
1337
                return ErrClosedChannelNotFound
1✔
1338
        }, func() {
102✔
1339
                chanSummary = nil
102✔
1340
        }); err != nil {
103✔
1341
                return nil, err
1✔
1342
        }
1✔
1343

1344
        return chanSummary, nil
101✔
1345
}
1346

1347
// MarkChanFullyClosed marks a channel as fully closed within the database. A
1348
// channel should be marked as fully closed if the channel was initially
1349
// cooperatively closed and it's reached a single confirmation, or after all
1350
// the pending funds in a channel that has been forcibly closed have been
1351
// swept.
1352
func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
7✔
1353
        var (
7✔
1354
                openChannels  []*OpenChannel
7✔
1355
                pruneLinkNode *btcec.PublicKey
7✔
1356
        )
7✔
1357
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
14✔
1358
                var b bytes.Buffer
7✔
1359
                if err := graphdb.WriteOutpoint(&b, chanPoint); err != nil {
7✔
1360
                        return err
×
1361
                }
×
1362

1363
                chanID := b.Bytes()
7✔
1364

7✔
1365
                closedChanBucket, err := tx.CreateTopLevelBucket(
7✔
1366
                        closedChannelBucket,
7✔
1367
                )
7✔
1368
                if err != nil {
7✔
1369
                        return err
×
1370
                }
×
1371

1372
                chanSummaryBytes := closedChanBucket.Get(chanID)
7✔
1373
                if chanSummaryBytes == nil {
7✔
1374
                        return fmt.Errorf("no closed channel for "+
×
1375
                                "chan_point=%v found", chanPoint)
×
1376
                }
×
1377

1378
                chanSummaryReader := bytes.NewReader(chanSummaryBytes)
7✔
1379
                chanSummary, err := deserializeCloseChannelSummary(
7✔
1380
                        chanSummaryReader,
7✔
1381
                )
7✔
1382
                if err != nil {
7✔
1383
                        return err
×
1384
                }
×
1385

1386
                chanSummary.IsPending = false
7✔
1387

7✔
1388
                var newSummary bytes.Buffer
7✔
1389
                err = serializeChannelCloseSummary(&newSummary, chanSummary)
7✔
1390
                if err != nil {
7✔
1391
                        return err
×
1392
                }
×
1393

1394
                err = closedChanBucket.Put(chanID, newSummary.Bytes())
7✔
1395
                if err != nil {
7✔
1396
                        return err
×
1397
                }
×
1398

1399
                // Now that the channel is closed, we'll check if we have any
1400
                // other open channels with this peer. If we don't we'll
1401
                // garbage collect it to ensure we don't establish persistent
1402
                // connections to peers without open channels.
1403
                pruneLinkNode = chanSummary.RemotePub
7✔
1404
                openChannels, err = c.fetchOpenChannels(
7✔
1405
                        tx, pruneLinkNode,
7✔
1406
                )
7✔
1407
                if err != nil {
7✔
1408
                        return fmt.Errorf("unable to fetch open channels for "+
×
1409
                                "peer %x: %v",
×
1410
                                pruneLinkNode.SerializeCompressed(), err)
×
1411
                }
×
1412

1413
                return nil
7✔
1414
        }, func() {
7✔
1415
                openChannels = nil
7✔
1416
                pruneLinkNode = nil
7✔
1417
        })
7✔
1418
        if err != nil {
7✔
1419
                return err
×
1420
        }
×
1421

1422
        // Decide whether we want to remove the link node, based upon the number
1423
        // of still open channels.
1424
        return c.pruneLinkNode(openChannels, pruneLinkNode)
7✔
1425
}
1426

1427
// pruneLinkNode determines whether we should garbage collect a link node from
1428
// the database due to no longer having any open channels with it. If there are
1429
// any left, then this acts as a no-op.
1430
func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
1431
        remotePub *btcec.PublicKey) error {
7✔
1432

7✔
1433
        if len(openChannels) > 0 {
7✔
1434
                return nil
×
1435
        }
×
1436

1437
        log.Infof("Pruning link node %x with zero open channels from database",
7✔
1438
                remotePub.SerializeCompressed())
7✔
1439

7✔
1440
        return c.linkNodeDB.DeleteLinkNode(remotePub)
7✔
1441
}
1442

1443
// PruneLinkNodes attempts to prune all link nodes found within the database
1444
// with whom we no longer have any open channels with.
1445
func (c *ChannelStateDB) PruneLinkNodes() error {
×
1446
        allLinkNodes, err := c.linkNodeDB.FetchAllLinkNodes()
×
1447
        if err != nil {
×
1448
                return err
×
1449
        }
×
1450

1451
        for _, linkNode := range allLinkNodes {
×
1452
                var (
×
1453
                        openChannels []*OpenChannel
×
1454
                        linkNode     = linkNode
×
1455
                )
×
1456
                err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
×
1457
                        var err error
×
1458
                        openChannels, err = c.fetchOpenChannels(
×
1459
                                tx, linkNode.IdentityPub,
×
1460
                        )
×
1461
                        return err
×
1462
                }, func() {
×
1463
                        openChannels = nil
×
1464
                })
×
1465
                if err != nil {
×
1466
                        return err
×
1467
                }
×
1468

1469
                err = c.pruneLinkNode(openChannels, linkNode.IdentityPub)
×
1470
                if err != nil {
×
1471
                        return err
×
1472
                }
×
1473
        }
1474

1475
        return nil
×
1476
}
1477

1478
// ChannelShell is a shell of a channel that is meant to be used for channel
1479
// recovery purposes. It contains a minimal OpenChannel instance along with
1480
// addresses for that target node.
1481
type ChannelShell struct {
1482
        // NodeAddrs the set of addresses that this node has known to be
1483
        // reachable at in the past.
1484
        NodeAddrs []net.Addr
1485

1486
        // Chan is a shell of an OpenChannel, it contains only the items
1487
        // required to restore the channel on disk.
1488
        Chan *OpenChannel
1489
}
1490

1491
// RestoreChannelShells is a method that allows the caller to reconstruct the
1492
// state of an OpenChannel from the ChannelShell. We'll attempt to write the
1493
// new channel to disk, create a LinkNode instance with the passed node
1494
// addresses, and finally create an edge within the graph for the channel as
1495
// well. This method is idempotent, so repeated calls with the same set of
1496
// channel shells won't modify the database after the initial call.
1497
func (c *ChannelStateDB) RestoreChannelShells(channelShells ...*ChannelShell) error {
1✔
1498
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
2✔
1499
                for _, channelShell := range channelShells {
2✔
1500
                        channel := channelShell.Chan
1✔
1501

1✔
1502
                        // When we make a channel, we mark that the channel has
1✔
1503
                        // been restored, this will signal to other sub-systems
1✔
1504
                        // to not attempt to use the channel as if it was a
1✔
1505
                        // regular one.
1✔
1506
                        channel.chanStatus |= ChanStatusRestored
1✔
1507

1✔
1508
                        // First, we'll attempt to create a new open channel
1✔
1509
                        // and link node for this channel. If the channel
1✔
1510
                        // already exists, then in order to ensure this method
1✔
1511
                        // is idempotent, we'll continue to the next step.
1✔
1512
                        channel.Db = c
1✔
1513
                        err := syncNewChannel(
1✔
1514
                                tx, channel, channelShell.NodeAddrs,
1✔
1515
                        )
1✔
1516
                        if err != nil {
1✔
1517
                                return err
×
1518
                        }
×
1519
                }
1520

1521
                return nil
1✔
1522
        }, func() {})
1✔
1523
        if err != nil {
1✔
1524
                return err
×
1525
        }
×
1526

1527
        return nil
1✔
1528
}
1529

1530
// AddrsForNode consults the channel database for all addresses known to the
1531
// passed node public key. The returned boolean indicates if the given node is
1532
// unknown to the channel DB or not.
1533
//
1534
// NOTE: this is part of the AddrSource interface.
1535
func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr, error) {
1✔
1536
        linkNode, err := d.channelStateDB.linkNodeDB.FetchLinkNode(nodePub)
1✔
1537
        // Only if the error is something other than ErrNodeNotFound do we
1✔
1538
        // return it.
1✔
1539
        switch {
1✔
1540
        case err != nil && !errors.Is(err, ErrNodeNotFound):
×
1541
                return false, nil, err
×
1542

1543
        case errors.Is(err, ErrNodeNotFound):
×
1544
                return false, nil, nil
×
1545
        }
1546

1547
        return true, linkNode.Addresses, nil
1✔
1548
}
1549

1550
// AbandonChannel attempts to remove the target channel from the open channel
1551
// database. If the channel was already removed (has a closed channel entry),
1552
// then we'll return a nil error. Otherwise, we'll insert a new close summary
1553
// into the database.
1554
func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint,
1555
        bestHeight uint32) error {
4✔
1556

4✔
1557
        // With the chanPoint constructed, we'll attempt to find the target
4✔
1558
        // channel in the database. If we can't find the channel, then we'll
4✔
1559
        // return the error back to the caller.
4✔
1560
        dbChan, err := c.FetchChannel(*chanPoint)
4✔
1561
        switch {
4✔
1562
        // If the channel wasn't found, then it's possible that it was already
1563
        // abandoned from the database.
1564
        case err == ErrChannelNotFound:
2✔
1565
                _, closedErr := c.FetchClosedChannel(chanPoint)
2✔
1566
                if closedErr != nil {
3✔
1567
                        return closedErr
1✔
1568
                }
1✔
1569

1570
                // If the channel was already closed, then we don't return an
1571
                // error as we'd like this step to be repeatable.
1572
                return nil
1✔
1573
        case err != nil:
×
1574
                return err
×
1575
        }
1576

1577
        // Now that we've found the channel, we'll populate a close summary for
1578
        // the channel, so we can store as much information for this abounded
1579
        // channel as possible. We also ensure that we set Pending to false, to
1580
        // indicate that this channel has been "fully" closed.
1581
        summary := &ChannelCloseSummary{
2✔
1582
                CloseType:               Abandoned,
2✔
1583
                ChanPoint:               *chanPoint,
2✔
1584
                ChainHash:               dbChan.ChainHash,
2✔
1585
                CloseHeight:             bestHeight,
2✔
1586
                RemotePub:               dbChan.IdentityPub,
2✔
1587
                Capacity:                dbChan.Capacity,
2✔
1588
                SettledBalance:          dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
2✔
1589
                ShortChanID:             dbChan.ShortChanID(),
2✔
1590
                RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
2✔
1591
                RemoteNextRevocation:    dbChan.RemoteNextRevocation,
2✔
1592
                LocalChanConfig:         dbChan.LocalChanCfg,
2✔
1593
        }
2✔
1594

2✔
1595
        // Finally, we'll close the channel in the DB, and return back to the
2✔
1596
        // caller. We set ourselves as the close initiator because we abandoned
2✔
1597
        // the channel.
2✔
1598
        return dbChan.CloseChannel(summary, ChanStatusLocalCloseInitiator)
2✔
1599
}
1600

1601
// SaveChannelOpeningState saves the serialized channel state for the provided
1602
// chanPoint to the channelOpeningStateBucket.
1603
func (c *ChannelStateDB) SaveChannelOpeningState(outPoint,
1604
        serializedState []byte) error {
92✔
1605

92✔
1606
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
184✔
1607
                bucket, err := tx.CreateTopLevelBucket(channelOpeningStateBucket)
92✔
1608
                if err != nil {
92✔
1609
                        return err
×
1610
                }
×
1611

1612
                return bucket.Put(outPoint, serializedState)
92✔
1613
        }, func() {})
92✔
1614
}
1615

1616
// GetChannelOpeningState fetches the serialized channel state for the provided
1617
// outPoint from the database, or returns ErrChannelNotFound if the channel
1618
// is not found.
1619
func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte,
1620
        error) {
249✔
1621

249✔
1622
        var serializedState []byte
249✔
1623
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
498✔
1624
                bucket := tx.ReadBucket(channelOpeningStateBucket)
249✔
1625
                if bucket == nil {
249✔
1626
                        // If the bucket does not exist, it means we never added
×
1627
                        //  a channel to the db, so return ErrChannelNotFound.
×
1628
                        return ErrChannelNotFound
×
1629
                }
×
1630

1631
                stateBytes := bucket.Get(outPoint)
249✔
1632
                if stateBytes == nil {
296✔
1633
                        return ErrChannelNotFound
47✔
1634
                }
47✔
1635

1636
                serializedState = append(serializedState, stateBytes...)
202✔
1637

202✔
1638
                return nil
202✔
1639
        }, func() {
249✔
1640
                serializedState = nil
249✔
1641
        })
249✔
1642
        return serializedState, err
249✔
1643
}
1644

1645
// DeleteChannelOpeningState removes any state for outPoint from the database.
1646
func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
24✔
1647
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
48✔
1648
                bucket := tx.ReadWriteBucket(channelOpeningStateBucket)
24✔
1649
                if bucket == nil {
24✔
1650
                        return ErrChannelNotFound
×
1651
                }
×
1652

1653
                return bucket.Delete(outPoint)
24✔
1654
        }, func() {})
24✔
1655
}
1656

1657
// syncVersions function is used for safe db version synchronization. It
1658
// applies migration functions to the current database and recovers the
1659
// previous state of db if at least one error/panic appeared during migration.
1660
func (d *DB) syncVersions(versions []mandatoryVersion) error {
1,752✔
1661
        meta, err := d.FetchMeta()
1,752✔
1662
        if err != nil {
1,752✔
1663
                if err == ErrMetaNotFound {
×
1664
                        meta = &Meta{}
×
1665
                } else {
×
1666
                        return err
×
1667
                }
×
1668
        }
1669

1670
        latestVersion := getLatestDBVersion(versions)
1,752✔
1671
        log.Infof("Checking for schema update: latest_version=%v, "+
1,752✔
1672
                "db_version=%v", latestVersion, meta.DbVersionNumber)
1,752✔
1673

1,752✔
1674
        switch {
1,752✔
1675

1676
        // If the database reports a higher version that we are aware of, the
1677
        // user is probably trying to revert to a prior version of lnd. We fail
1678
        // here to prevent reversions and unintended corruption.
1679
        case meta.DbVersionNumber > latestVersion:
1✔
1680
                log.Errorf("Refusing to revert from db_version=%d to "+
1✔
1681
                        "lower version=%d", meta.DbVersionNumber,
1✔
1682
                        latestVersion)
1✔
1683
                return ErrDBReversion
1✔
1684

1685
        // If the current database version matches the latest version number,
1686
        // then we don't need to perform any migrations.
1687
        case meta.DbVersionNumber == latestVersion:
1,747✔
1688
                return nil
1,747✔
1689
        }
1690

1691
        log.Infof("Performing database schema migration")
4✔
1692

4✔
1693
        // Otherwise, we fetch the migrations which need to applied, and
4✔
1694
        // execute them serially within a single database transaction to ensure
4✔
1695
        // the migration is atomic.
4✔
1696
        migrations, migrationVersions := getMigrationsToApply(
4✔
1697
                versions, meta.DbVersionNumber,
4✔
1698
        )
4✔
1699
        return kvdb.Update(d, func(tx kvdb.RwTx) error {
8✔
1700
                for i, migration := range migrations {
8✔
1701
                        if migration == nil {
4✔
1702
                                continue
×
1703
                        }
1704

1705
                        log.Infof("Applying migration #%v",
4✔
1706
                                migrationVersions[i])
4✔
1707

4✔
1708
                        if err := migration(tx); err != nil {
5✔
1709
                                log.Infof("Unable to apply migration #%v",
1✔
1710
                                        migrationVersions[i])
1✔
1711
                                return err
1✔
1712
                        }
1✔
1713
                }
1714

1715
                meta.DbVersionNumber = latestVersion
2✔
1716
                err := putMeta(meta, tx)
2✔
1717
                if err != nil {
2✔
1718
                        return err
×
1719
                }
×
1720

1721
                // In dry-run mode, return an error to prevent the transaction
1722
                // from committing.
1723
                if d.dryRun {
3✔
1724
                        return ErrDryRunMigrationOK
1✔
1725
                }
1✔
1726

1727
                return nil
1✔
1728
        }, func() {})
4✔
1729
}
1730

1731
// applyOptionalVersions takes a config to determine whether the optional
1732
// migrations will be applied.
1733
//
1734
// NOTE: only support the prune_revocation_log optional migration atm.
1735
func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error {
1,750✔
1736
        // TODO(yy): need to design the db to support dry run for optional
1,750✔
1737
        // migrations.
1,750✔
1738
        if d.dryRun {
1,751✔
1739
                log.Info("Skipped optional migrations as dry run mode is not " +
1✔
1740
                        "supported yet")
1✔
1741
                return nil
1✔
1742
        }
1✔
1743

1744
        om, err := d.fetchOptionalMeta()
1,749✔
1745
        if err != nil {
1,749✔
1746
                if err == ErrMetaNotFound {
×
1747
                        om = &OptionalMeta{
×
1748
                                Versions: make(map[uint64]string),
×
1749
                        }
×
1750
                } else {
×
1751
                        return err
×
1752
                }
×
1753
        }
1754

1755
        log.Infof("Checking for optional update: prune_revocation_log=%v, "+
1,749✔
1756
                "db_version=%s", cfg.PruneRevocationLog, om)
1,749✔
1757

1,749✔
1758
        // Exit early if the optional migration is not specified.
1,749✔
1759
        if !cfg.PruneRevocationLog {
3,496✔
1760
                return nil
1,747✔
1761
        }
1,747✔
1762

1763
        // Exit early if the optional migration has already been applied.
1764
        if _, ok := om.Versions[0]; ok {
3✔
1765
                return nil
1✔
1766
        }
1✔
1767

1768
        // Get the optional version.
1769
        version := optionalVersions[0]
1✔
1770
        log.Infof("Performing database optional migration: %s", version.name)
1✔
1771

1✔
1772
        migrationCfg := &MigrationConfigImpl{
1✔
1773
                migration30.MigrateRevLogConfigImpl{
1✔
1774
                        NoAmountData: d.noRevLogAmtData,
1✔
1775
                },
1✔
1776
        }
1✔
1777

1✔
1778
        // Migrate the data.
1✔
1779
        if err := version.migration(d, migrationCfg); err != nil {
1✔
1780
                log.Errorf("Unable to apply optional migration: %s, error: %v",
×
1781
                        version.name, err)
×
1782
                return err
×
1783
        }
×
1784

1785
        // Update the optional meta. Notice that unlike the mandatory db
1786
        // migrations where we perform the migration and updating meta in a
1787
        // single db transaction, we use different transactions here. Even when
1788
        // the following update is failed, we should be fine here as we would
1789
        // re-run the optional migration again, which is a noop, during next
1790
        // startup.
1791
        om.Versions[0] = version.name
1✔
1792
        if err := d.putOptionalMeta(om); err != nil {
1✔
1793
                log.Errorf("Unable to update optional meta: %v", err)
×
1794
                return err
×
1795
        }
×
1796

1797
        return nil
1✔
1798
}
1799

1800
// ChannelStateDB returns the sub database that is concerned with the channel
1801
// state.
1802
func (d *DB) ChannelStateDB() *ChannelStateDB {
2,168✔
1803
        return d.channelStateDB
2,168✔
1804
}
2,168✔
1805

1806
// LatestDBVersion returns the number of the latest database version currently
1807
// known to the channel DB.
1808
func LatestDBVersion() uint32 {
1✔
1809
        return getLatestDBVersion(dbVersions)
1✔
1810
}
1✔
1811

1812
func getLatestDBVersion(versions []mandatoryVersion) uint32 {
3,474✔
1813
        return versions[len(versions)-1].number
3,474✔
1814
}
3,474✔
1815

1816
// getMigrationsToApply retrieves the migration function that should be
1817
// applied to the database.
1818
func getMigrationsToApply(versions []mandatoryVersion,
1819
        version uint32) ([]migration, []uint32) {
5✔
1820

5✔
1821
        migrations := make([]migration, 0, len(versions))
5✔
1822
        migrationVersions := make([]uint32, 0, len(versions))
5✔
1823

5✔
1824
        for _, v := range versions {
17✔
1825
                if v.number > version {
18✔
1826
                        migrations = append(migrations, v.migration)
6✔
1827
                        migrationVersions = append(migrationVersions, v.number)
6✔
1828
                }
6✔
1829
        }
1830

1831
        return migrations, migrationVersions
5✔
1832
}
1833

1834
// fetchHistoricalChanBucket returns a the channel bucket for a given outpoint
1835
// from the historical channel bucket. If the bucket does not exist,
1836
// ErrNoHistoricalBucket is returned.
1837
func fetchHistoricalChanBucket(tx kvdb.RTx,
1838
        outPoint *wire.OutPoint) (kvdb.RBucket, error) {
4✔
1839

4✔
1840
        // First fetch the top level bucket which stores all data related to
4✔
1841
        // historically stored channels.
4✔
1842
        historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
4✔
1843
        if historicalChanBucket == nil {
4✔
1844
                return nil, ErrNoHistoricalBucket
×
1845
        }
×
1846

1847
        // With the bucket for the node and chain fetched, we can now go down
1848
        // another level, for the channel itself.
1849
        var chanPointBuf bytes.Buffer
4✔
1850
        if err := graphdb.WriteOutpoint(&chanPointBuf, outPoint); err != nil {
4✔
1851
                return nil, err
×
1852
        }
×
1853
        chanBucket := historicalChanBucket.NestedReadBucket(
4✔
1854
                chanPointBuf.Bytes(),
4✔
1855
        )
4✔
1856
        if chanBucket == nil {
6✔
1857
                return nil, ErrChannelNotFound
2✔
1858
        }
2✔
1859

1860
        return chanBucket, nil
2✔
1861
}
1862

1863
// FetchHistoricalChannel fetches open channel data from the historical channel
1864
// bucket.
1865
func (c *ChannelStateDB) FetchHistoricalChannel(outPoint *wire.OutPoint) (
1866
        *OpenChannel, error) {
4✔
1867

4✔
1868
        var channel *OpenChannel
4✔
1869
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
8✔
1870
                chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
4✔
1871
                if err != nil {
6✔
1872
                        return err
2✔
1873
                }
2✔
1874

1875
                channel, err = fetchOpenChannel(chanBucket, outPoint)
2✔
1876
                if err != nil {
2✔
1877
                        return err
×
1878
                }
×
1879

1880
                channel.Db = c
2✔
1881
                return nil
2✔
1882
        }, func() {
4✔
1883
                channel = nil
4✔
1884
        })
4✔
1885
        if err != nil {
6✔
1886
                return nil, err
2✔
1887
        }
2✔
1888

1889
        return channel, nil
2✔
1890
}
1891

1892
func fetchFinalHtlcsBucket(tx kvdb.RTx,
1893
        chanID lnwire.ShortChannelID) (kvdb.RBucket, error) {
10✔
1894

10✔
1895
        finalHtlcsBucket := tx.ReadBucket(finalHtlcsBucket)
10✔
1896
        if finalHtlcsBucket == nil {
16✔
1897
                return nil, ErrFinalHtlcsBucketNotFound
6✔
1898
        }
6✔
1899

1900
        var chanIDBytes [8]byte
4✔
1901
        byteOrder.PutUint64(chanIDBytes[:], chanID.ToUint64())
4✔
1902

4✔
1903
        chanBucket := finalHtlcsBucket.NestedReadBucket(chanIDBytes[:])
4✔
1904
        if chanBucket == nil {
4✔
1905
                return nil, ErrFinalChannelBucketNotFound
×
1906
        }
×
1907

1908
        return chanBucket, nil
4✔
1909
}
1910

1911
var ErrHtlcUnknown = errors.New("htlc unknown")
1912

1913
// LookupFinalHtlc retrieves a final htlc resolution from the database. If the
1914
// htlc has no final resolution yet, ErrHtlcUnknown is returned.
1915
func (c *ChannelStateDB) LookupFinalHtlc(chanID lnwire.ShortChannelID,
1916
        htlcIndex uint64) (*FinalHtlcInfo, error) {
10✔
1917

10✔
1918
        var idBytes [8]byte
10✔
1919
        byteOrder.PutUint64(idBytes[:], htlcIndex)
10✔
1920

10✔
1921
        var settledByte byte
10✔
1922

10✔
1923
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
20✔
1924
                finalHtlcsBucket, err := fetchFinalHtlcsBucket(
10✔
1925
                        tx, chanID,
10✔
1926
                )
10✔
1927
                switch {
10✔
1928
                case errors.Is(err, ErrFinalHtlcsBucketNotFound):
6✔
1929
                        fallthrough
6✔
1930

1931
                case errors.Is(err, ErrFinalChannelBucketNotFound):
6✔
1932
                        return ErrHtlcUnknown
6✔
1933

1934
                case err != nil:
×
1935
                        return fmt.Errorf("cannot fetch final htlcs bucket: %w",
×
1936
                                err)
×
1937
                }
1938

1939
                value := finalHtlcsBucket.Get(idBytes[:])
4✔
1940
                if value == nil {
5✔
1941
                        return ErrHtlcUnknown
1✔
1942
                }
1✔
1943

1944
                if len(value) != 1 {
3✔
1945
                        return errors.New("unexpected final htlc value length")
×
1946
                }
×
1947

1948
                settledByte = value[0]
3✔
1949

3✔
1950
                return nil
3✔
1951
        }, func() {
10✔
1952
                settledByte = 0
10✔
1953
        })
10✔
1954
        if err != nil {
17✔
1955
                return nil, err
7✔
1956
        }
7✔
1957

1958
        info := FinalHtlcInfo{
3✔
1959
                Settled:  settledByte&byte(FinalHtlcSettledBit) != 0,
3✔
1960
                Offchain: settledByte&byte(FinalHtlcOffchainBit) != 0,
3✔
1961
        }
3✔
1962

3✔
1963
        return &info, nil
3✔
1964
}
1965

1966
// PutOnchainFinalHtlcOutcome stores the final on-chain outcome of an htlc in
1967
// the database.
1968
func (c *ChannelStateDB) PutOnchainFinalHtlcOutcome(
1969
        chanID lnwire.ShortChannelID, htlcID uint64, settled bool) error {
1✔
1970

1✔
1971
        // Skip if the user did not opt in to storing final resolutions.
1✔
1972
        if !c.parent.storeFinalHtlcResolutions {
1✔
1973
                return nil
×
1974
        }
×
1975

1976
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
2✔
1977
                finalHtlcsBucket, err := fetchFinalHtlcsBucketRw(tx, chanID)
1✔
1978
                if err != nil {
1✔
1979
                        return err
×
1980
                }
×
1981

1982
                return putFinalHtlc(
1✔
1983
                        finalHtlcsBucket, htlcID,
1✔
1984
                        FinalHtlcInfo{
1✔
1985
                                Settled:  settled,
1✔
1986
                                Offchain: false,
1✔
1987
                        },
1✔
1988
                )
1✔
1989
        }, func() {})
1✔
1990
}
1991

1992
// MakeTestInvoiceDB is used to create a test invoice database for testing
1993
// purposes. It simply calls into MakeTestDB so the same modifiers can be used.
1994
func MakeTestInvoiceDB(t *testing.T, modifiers ...OptionModifier) (
1995
        invoices.InvoiceDB, error) {
154✔
1996

154✔
1997
        return MakeTestDB(t, modifiers...)
154✔
1998
}
154✔
1999

2000
// MakeTestDB creates a new instance of the ChannelDB for testing purposes.
2001
// A callback which cleans up the created temporary directories is also
2002
// returned and intended to be executed after the test completes.
2003
func MakeTestDB(t *testing.T, modifiers ...OptionModifier) (*DB, error) {
288✔
2004
        // First, create a temporary directory to be used for the duration of
288✔
2005
        // this test.
288✔
2006
        tempDirName := t.TempDir()
288✔
2007

288✔
2008
        // Next, create channeldb for the first time.
288✔
2009
        backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cdb")
288✔
2010
        if err != nil {
288✔
2011
                backendCleanup()
×
2012
                return nil, err
×
2013
        }
×
2014

2015
        cdb, err := CreateWithBackend(backend, modifiers...)
288✔
2016
        if err != nil {
288✔
2017
                backendCleanup()
×
2018
                return nil, err
×
2019
        }
×
2020

2021
        t.Cleanup(func() {
576✔
2022
                cdb.Close()
288✔
2023
                backendCleanup()
288✔
2024
        })
288✔
2025

2026
        return cdb, nil
288✔
2027
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc