• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15628278788

13 Jun 2025 06:45AM UTC coverage: 68.511% (+10.2%) from 58.333%
15628278788

Pull #9945

github

web-flow
Merge e78253ccc into 35102e7c3
Pull Request #9945: Decayed log optional migration

104 of 128 new or added lines in 10 files covered. (81.25%)

42 existing lines in 10 files now uncovered.

134495 of 196311 relevant lines covered (68.51%)

22245.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.19
/channeldb/db.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "fmt"
7
        "net"
8
        "os"
9
        "testing"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/go-errors/errors"
15
        mig "github.com/lightningnetwork/lnd/channeldb/migration"
16
        "github.com/lightningnetwork/lnd/channeldb/migration12"
17
        "github.com/lightningnetwork/lnd/channeldb/migration13"
18
        "github.com/lightningnetwork/lnd/channeldb/migration16"
19
        "github.com/lightningnetwork/lnd/channeldb/migration20"
20
        "github.com/lightningnetwork/lnd/channeldb/migration21"
21
        "github.com/lightningnetwork/lnd/channeldb/migration23"
22
        "github.com/lightningnetwork/lnd/channeldb/migration24"
23
        "github.com/lightningnetwork/lnd/channeldb/migration25"
24
        "github.com/lightningnetwork/lnd/channeldb/migration26"
25
        "github.com/lightningnetwork/lnd/channeldb/migration27"
26
        "github.com/lightningnetwork/lnd/channeldb/migration29"
27
        "github.com/lightningnetwork/lnd/channeldb/migration30"
28
        "github.com/lightningnetwork/lnd/channeldb/migration31"
29
        "github.com/lightningnetwork/lnd/channeldb/migration32"
30
        "github.com/lightningnetwork/lnd/channeldb/migration33"
31
        "github.com/lightningnetwork/lnd/channeldb/migration34"
32
        "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
33
        "github.com/lightningnetwork/lnd/clock"
34
        graphdb "github.com/lightningnetwork/lnd/graph/db"
35
        "github.com/lightningnetwork/lnd/invoices"
36
        "github.com/lightningnetwork/lnd/kvdb"
37
        "github.com/lightningnetwork/lnd/lnwire"
38
        "github.com/stretchr/testify/require"
39
)
40

41
const (
42
        dbName = "channel.db"
43
)
44

45
var (
46
        // ErrDryRunMigrationOK signals that a migration executed successful,
47
        // but we intentionally did not commit the result.
48
        ErrDryRunMigrationOK = errors.New("dry run migration successful")
49

50
        // ErrFinalHtlcsBucketNotFound signals that the top-level final htlcs
51
        // bucket does not exist.
52
        ErrFinalHtlcsBucketNotFound = errors.New("final htlcs bucket not " +
53
                "found")
54

55
        // ErrFinalChannelBucketNotFound signals that the channel bucket for
56
        // final htlc outcomes does not exist.
57
        ErrFinalChannelBucketNotFound = errors.New("final htlcs channel " +
58
                "bucket not found")
59
)
60

61
// migration is a function which takes a prior outdated version of the database
62
// instances and mutates the key/bucket structure to arrive at a more
63
// up-to-date version of the database.
64
type migration func(tx kvdb.RwTx) error
65

66
// mandatoryVersion defines a db version that must be applied before the lnd
67
// starts.
68
type mandatoryVersion struct {
69
        number    uint32
70
        migration migration
71
}
72

73
// MigrationConfig is an interface combines the config interfaces of all
74
// optional migrations.
75
type MigrationConfig interface {
76
        migration30.MigrateRevLogConfig
77
        migration34.MigrationConfig
78
}
79

80
// MigrationConfigImpl is a super set of all the various migration configs and
81
// an implementation of MigrationConfig.
82
type MigrationConfigImpl struct {
83
        migration30.MigrateRevLogConfigImpl
84
        migration34.MigrationConfigImpl
85
}
86

87
// optionalMigration defines an optional migration function. When a migration
88
// is optional, it usually involves a large scale of changes that might touch
89
// millions of keys. Due to OOM concern, the update cannot be safely done
90
// within one db transaction. Thus, for optional migrations, they must take the
91
// db backend and construct transactions as needed.
92
type optionalMigration func(db kvdb.Backend, cfg MigrationConfig) error
93

94
// optionalVersion defines a db version that can be optionally applied. When
95
// applying migrations, we must apply all the mandatory migrations first before
96
// attempting optional ones.
97
type optionalVersion struct {
98
        name      string
99
        migration optionalMigration
100
}
101

102
var (
103
        // dbVersions is storing all mandatory versions of database. If current
104
        // version of database don't match with latest version this list will
105
        // be used for retrieving all migration function that are need to apply
106
        // to the current db.
107
        dbVersions = []mandatoryVersion{
108
                {
109
                        // The base DB version requires no migration.
110
                        number:    0,
111
                        migration: nil,
112
                },
113
                {
114
                        // The version of the database where two new indexes
115
                        // for the update time of node and channel updates were
116
                        // added.
117
                        number:    1,
118
                        migration: migration_01_to_11.MigrateNodeAndEdgeUpdateIndex,
119
                },
120
                {
121
                        // The DB version that added the invoice event time
122
                        // series.
123
                        number:    2,
124
                        migration: migration_01_to_11.MigrateInvoiceTimeSeries,
125
                },
126
                {
127
                        // The DB version that updated the embedded invoice in
128
                        // outgoing payments to match the new format.
129
                        number:    3,
130
                        migration: migration_01_to_11.MigrateInvoiceTimeSeriesOutgoingPayments,
131
                },
132
                {
133
                        // The version of the database where every channel
134
                        // always has two entries in the edges bucket. If
135
                        // a policy is unknown, this will be represented
136
                        // by a special byte sequence.
137
                        number:    4,
138
                        migration: migration_01_to_11.MigrateEdgePolicies,
139
                },
140
                {
141
                        // The DB version where we persist each attempt to send
142
                        // an HTLC to a payment hash, and track whether the
143
                        // payment is in-flight, succeeded, or failed.
144
                        number:    5,
145
                        migration: migration_01_to_11.PaymentStatusesMigration,
146
                },
147
                {
148
                        // The DB version that properly prunes stale entries
149
                        // from the edge update index.
150
                        number:    6,
151
                        migration: migration_01_to_11.MigratePruneEdgeUpdateIndex,
152
                },
153
                {
154
                        // The DB version that migrates the ChannelCloseSummary
155
                        // to a format where optional fields are indicated with
156
                        // boolean flags.
157
                        number:    7,
158
                        migration: migration_01_to_11.MigrateOptionalChannelCloseSummaryFields,
159
                },
160
                {
161
                        // The DB version that changes the gossiper's message
162
                        // store keys to account for the message's type and
163
                        // ShortChannelID.
164
                        number:    8,
165
                        migration: migration_01_to_11.MigrateGossipMessageStoreKeys,
166
                },
167
                {
168
                        // The DB version where the payments and payment
169
                        // statuses are moved to being stored in a combined
170
                        // bucket.
171
                        number:    9,
172
                        migration: migration_01_to_11.MigrateOutgoingPayments,
173
                },
174
                {
175
                        // The DB version where we started to store legacy
176
                        // payload information for all routes, as well as the
177
                        // optional TLV records.
178
                        number:    10,
179
                        migration: migration_01_to_11.MigrateRouteSerialization,
180
                },
181
                {
182
                        // Add invoice htlc and cltv delta fields.
183
                        number:    11,
184
                        migration: migration_01_to_11.MigrateInvoices,
185
                },
186
                {
187
                        // Migrate to TLV invoice bodies, add payment address
188
                        // and features, remove receipt.
189
                        number:    12,
190
                        migration: migration12.MigrateInvoiceTLV,
191
                },
192
                {
193
                        // Migrate to multi-path payments.
194
                        number:    13,
195
                        migration: migration13.MigrateMPP,
196
                },
197
                {
198
                        // Initialize payment address index and begin using it
199
                        // as the default index, falling back to payment hash
200
                        // index.
201
                        number:    14,
202
                        migration: mig.CreateTLB(payAddrIndexBucket),
203
                },
204
                {
205
                        // Initialize payment index bucket which will be used
206
                        // to index payments by sequence number. This index will
207
                        // be used to allow more efficient ListPayments queries.
208
                        number:    15,
209
                        migration: mig.CreateTLB(paymentsIndexBucket),
210
                },
211
                {
212
                        // Add our existing payments to the index bucket created
213
                        // in migration 15.
214
                        number:    16,
215
                        migration: migration16.MigrateSequenceIndex,
216
                },
217
                {
218
                        // Create a top level bucket which will store extra
219
                        // information about channel closes.
220
                        number:    17,
221
                        migration: mig.CreateTLB(closeSummaryBucket),
222
                },
223
                {
224
                        // Create a top level bucket which holds information
225
                        // about our peers.
226
                        number:    18,
227
                        migration: mig.CreateTLB(peersBucket),
228
                },
229
                {
230
                        // Create a top level bucket which holds outpoint
231
                        // information.
232
                        number:    19,
233
                        migration: mig.CreateTLB(outpointBucket),
234
                },
235
                {
236
                        // Migrate some data to the outpoint index.
237
                        number:    20,
238
                        migration: migration20.MigrateOutpointIndex,
239
                },
240
                {
241
                        // Migrate to length prefixed wire messages everywhere
242
                        // in the database.
243
                        number:    21,
244
                        migration: migration21.MigrateDatabaseWireMessages,
245
                },
246
                {
247
                        // Initialize set id index so that invoices can be
248
                        // queried by individual htlc sets.
249
                        number:    22,
250
                        migration: mig.CreateTLB(setIDIndexBucket),
251
                },
252
                {
253
                        number:    23,
254
                        migration: migration23.MigrateHtlcAttempts,
255
                },
256
                {
257
                        // Remove old forwarding packages of closed channels.
258
                        number:    24,
259
                        migration: migration24.MigrateFwdPkgCleanup,
260
                },
261
                {
262
                        // Save the initial local/remote balances in channel
263
                        // info.
264
                        number:    25,
265
                        migration: migration25.MigrateInitialBalances,
266
                },
267
                {
268
                        // Migrate the initial local/remote balance fields into
269
                        // tlv records.
270
                        number:    26,
271
                        migration: migration26.MigrateBalancesToTlvRecords,
272
                },
273
                {
274
                        // Patch the initial local/remote balance fields with
275
                        // empty values for historical channels.
276
                        number:    27,
277
                        migration: migration27.MigrateHistoricalBalances,
278
                },
279
                {
280
                        number:    28,
281
                        migration: mig.CreateTLB(chanIDBucket),
282
                },
283
                {
284
                        number:    29,
285
                        migration: migration29.MigrateChanID,
286
                },
287
                {
288
                        // Removes the "sweeper-last-tx" bucket. Although we
289
                        // do not have a mandatory version 30 we skip this
290
                        // version because its naming is already used for the
291
                        // first optional migration.
292
                        number:    31,
293
                        migration: migration31.DeleteLastPublishedTxTLB,
294
                },
295
                {
296
                        number:    32,
297
                        migration: migration32.MigrateMCRouteSerialisation,
298
                },
299
                {
300
                        number:    33,
301
                        migration: migration33.MigrateMCStoreNameSpacedResults,
302
                },
303
        }
304

305
        // optionalVersions stores all optional migrations that are applied
306
        // after dbVersions.
307
        //
308
        // NOTE: optional migrations must be fault-tolerant and re-run already
309
        // migrated data must be noop, which means the migration must be able
310
        // to determine its state.
311
        optionalVersions = []optionalVersion{
312
                {
313
                        name: "prune_revocation_log",
314
                        migration: func(db kvdb.Backend,
315
                                cfg MigrationConfig) error {
×
316

×
317
                                return migration30.MigrateRevocationLog(db, cfg)
×
318
                        },
×
319
                },
320
                {
321
                        name: "gc_decayed_log",
322
                        migration: func(db kvdb.Backend,
323
                                cfg MigrationConfig) error {
3✔
324

3✔
325
                                return migration34.MigrateDecayedLog(
3✔
326
                                        db, cfg,
3✔
327
                                )
3✔
328
                        },
3✔
329
                },
330
        }
331

332
        // Big endian is the preferred byte order, due to cursor scans over
333
        // integer keys iterating in order.
334
        byteOrder = binary.BigEndian
335

336
        // channelOpeningStateBucket is the database bucket used to store the
337
        // channelOpeningState for each channel that is currently in the process
338
        // of being opened.
339
        channelOpeningStateBucket = []byte("channelOpeningState")
340
)
341

342
// DB is the primary datastore for the lnd daemon. The database stores
343
// information related to nodes, routing data, open/closed channels, fee
344
// schedules, and reputation data.
345
type DB struct {
346
        kvdb.Backend
347

348
        // channelStateDB separates all DB operations on channel state.
349
        channelStateDB *ChannelStateDB
350

351
        dbPath                    string
352
        clock                     clock.Clock
353
        dryRun                    bool
354
        keepFailedPaymentAttempts bool
355
        storeFinalHtlcResolutions bool
356

357
        // noRevLogAmtData if true, means that commitment transaction amount
358
        // data should not be stored in the revocation log.
359
        noRevLogAmtData bool
360
}
361

362
// OpenForTesting opens or creates a channeldb to be used for tests. Any
363
// necessary schemas migrations due to updates will take place as necessary.
364
func OpenForTesting(t testing.TB, dbPath string,
365
        modifiers ...OptionModifier) *DB {
1,456✔
366

1,456✔
367
        backend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
1,456✔
368
                DBPath:            dbPath,
1,456✔
369
                DBFileName:        dbName,
1,456✔
370
                NoFreelistSync:    true,
1,456✔
371
                AutoCompact:       false,
1,456✔
372
                AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
1,456✔
373
                DBTimeout:         kvdb.DefaultDBTimeout,
1,456✔
374
        })
1,456✔
375
        require.NoError(t, err)
1,456✔
376

1,456✔
377
        db, err := CreateWithBackend(backend, modifiers...)
1,456✔
378
        require.NoError(t, err)
1,456✔
379

1,456✔
380
        db.dbPath = dbPath
1,456✔
381

1,456✔
382
        t.Cleanup(func() {
2,912✔
383
                require.NoError(t, db.Close())
1,456✔
384
        })
1,456✔
385

386
        return db
1,456✔
387
}
388

389
// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
390
// Any necessary schemas migrations due to updates will take place as necessary.
391
func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
392
        error) {
1,753✔
393

1,753✔
394
        opts := DefaultOptions()
1,753✔
395
        for _, modifier := range modifiers {
1,933✔
396
                modifier(&opts)
180✔
397
        }
180✔
398

399
        if !opts.NoMigration {
3,506✔
400
                if err := initChannelDB(backend); err != nil {
1,754✔
401
                        return nil, err
1✔
402
                }
1✔
403
        }
404

405
        chanDB := &DB{
1,752✔
406
                Backend: backend,
1,752✔
407
                channelStateDB: &ChannelStateDB{
1,752✔
408
                        linkNodeDB: &LinkNodeDB{
1,752✔
409
                                backend: backend,
1,752✔
410
                        },
1,752✔
411
                        backend: backend,
1,752✔
412
                },
1,752✔
413
                clock:                     opts.clock,
1,752✔
414
                dryRun:                    opts.dryRun,
1,752✔
415
                keepFailedPaymentAttempts: opts.keepFailedPaymentAttempts,
1,752✔
416
                storeFinalHtlcResolutions: opts.storeFinalHtlcResolutions,
1,752✔
417
                noRevLogAmtData:           opts.NoRevLogAmtData,
1,752✔
418
        }
1,752✔
419

1,752✔
420
        // Set the parent pointer (only used in tests).
1,752✔
421
        chanDB.channelStateDB.parent = chanDB
1,752✔
422

1,752✔
423
        // Synchronize the version of database and apply migrations if needed.
1,752✔
424
        if !opts.NoMigration {
3,504✔
425
                if err := chanDB.syncVersions(dbVersions); err != nil {
1,753✔
426
                        backend.Close()
1✔
427
                        return nil, err
1✔
428
                }
1✔
429

430
                // Grab the optional migration config.
431
                omc := opts.OptionalMiragtionConfig
1,751✔
432
                if err := chanDB.applyOptionalVersions(omc); err != nil {
1,751✔
433
                        backend.Close()
×
434
                        return nil, err
×
435
                }
×
436
        }
437

438
        return chanDB, nil
1,751✔
439
}
440

441
// Path returns the file path to the channel database.
442
func (d *DB) Path() string {
197✔
443
        return d.dbPath
197✔
444
}
197✔
445

446
var dbTopLevelBuckets = [][]byte{
447
        openChannelBucket,
448
        closedChannelBucket,
449
        forwardingLogBucket,
450
        fwdPackagesKey,
451
        invoiceBucket,
452
        payAddrIndexBucket,
453
        setIDIndexBucket,
454
        paymentsIndexBucket,
455
        peersBucket,
456
        nodeInfoBucket,
457
        metaBucket,
458
        closeSummaryBucket,
459
        outpointBucket,
460
        chanIDBucket,
461
        historicalChannelBucket,
462
}
463

464
// Wipe completely deletes all saved state within all used buckets within the
465
// database. The deletion is done in a single transaction, therefore this
466
// operation is fully atomic.
467
func (d *DB) Wipe() error {
177✔
468
        err := kvdb.Update(d, func(tx kvdb.RwTx) error {
354✔
469
                for _, tlb := range dbTopLevelBuckets {
2,832✔
470
                        err := tx.DeleteTopLevelBucket(tlb)
2,655✔
471
                        if err != nil && err != kvdb.ErrBucketNotFound {
2,655✔
472
                                return err
×
473
                        }
×
474
                }
475
                return nil
177✔
476
        }, func() {})
177✔
477
        if err != nil {
177✔
478
                return err
×
479
        }
×
480

481
        return initChannelDB(d.Backend)
177✔
482
}
483

484
// initChannelDB creates and initializes a fresh version of channeldb. In the
485
// case that the target path has not yet been created or doesn't yet exist, then
486
// the path is created. Additionally, all required top-level buckets used within
487
// the database are created.
488
func initChannelDB(db kvdb.Backend) error {
1,930✔
489
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
3,860✔
490
                // Check if DB was marked as inactive with a tomb stone.
1,930✔
491
                if err := EnsureNoTombstone(tx); err != nil {
1,931✔
492
                        return err
1✔
493
                }
1✔
494

495
                meta := &Meta{}
1,929✔
496
                // Check if DB is already initialized.
1,929✔
497
                err := FetchMeta(meta, tx)
1,929✔
498
                if err == nil {
2,139✔
499
                        return nil
210✔
500
                }
210✔
501

502
                for _, tlb := range dbTopLevelBuckets {
27,510✔
503
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
25,788✔
504
                                return err
×
505
                        }
×
506
                }
507

508
                meta.DbVersionNumber = getLatestDBVersion(dbVersions)
1,722✔
509
                return putMeta(meta, tx)
1,722✔
510
        }, func() {})
1,930✔
511
        if err != nil {
1,931✔
512
                return fmt.Errorf("unable to create new channeldb: %w", err)
1✔
513
        }
1✔
514

515
        return nil
1,929✔
516
}
517

518
// fileExists returns true if the file exists, and false otherwise.
519
func fileExists(path string) bool {
1✔
520
        if _, err := os.Stat(path); err != nil {
1✔
521
                if os.IsNotExist(err) {
×
522
                        return false
×
523
                }
×
524
        }
525

526
        return true
1✔
527
}
528

529
// ChannelStateDB is a database that keeps track of all channel state.
530
type ChannelStateDB struct {
531
        // linkNodeDB separates all DB operations on LinkNodes.
532
        linkNodeDB *LinkNodeDB
533

534
        // parent holds a pointer to the "main" channeldb.DB object. This is
535
        // only used for testing and should never be used in production code.
536
        // For testing use the ChannelStateDB.GetParentDB() function to retrieve
537
        // this pointer.
538
        parent *DB
539

540
        // backend points to the actual backend holding the channel state
541
        // database. This may be a real backend or a cache middleware.
542
        backend kvdb.Backend
543
}
544

545
// GetParentDB returns the "main" channeldb.DB object that is the owner of this
546
// ChannelStateDB instance. Use this function only in tests where passing around
547
// pointers makes testing less readable. Never to be used in production code!
548
func (c *ChannelStateDB) GetParentDB() *DB {
517✔
549
        return c.parent
517✔
550
}
517✔
551

552
// LinkNodeDB returns the current instance of the link node database.
553
func (c *ChannelStateDB) LinkNodeDB() *LinkNodeDB {
3✔
554
        return c.linkNodeDB
3✔
555
}
3✔
556

557
// FetchOpenChannels starts a new database transaction and returns all stored
558
// currently active/open channels associated with the target nodeID. In the case
559
// that no active channels are known to have been created with this node, then a
560
// zero-length slice is returned.
561
func (c *ChannelStateDB) FetchOpenChannels(nodeID *btcec.PublicKey) (
562
        []*OpenChannel, error) {
257✔
563

257✔
564
        var channels []*OpenChannel
257✔
565
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
514✔
566
                var err error
257✔
567
                channels, err = c.fetchOpenChannels(tx, nodeID)
257✔
568
                return err
257✔
569
        }, func() {
514✔
570
                channels = nil
257✔
571
        })
257✔
572

573
        return channels, err
257✔
574
}
575

576
// fetchOpenChannels uses and existing database transaction and returns all
577
// stored currently active/open channels associated with the target nodeID. In
578
// the case that no active channels are known to have been created with this
579
// node, then a zero-length slice is returned.
580
func (c *ChannelStateDB) fetchOpenChannels(tx kvdb.RTx,
581
        nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
264✔
582

264✔
583
        // Get the bucket dedicated to storing the metadata for open channels.
264✔
584
        openChanBucket := tx.ReadBucket(openChannelBucket)
264✔
585
        if openChanBucket == nil {
264✔
586
                return nil, nil
×
587
        }
×
588

589
        // Within this top level bucket, fetch the bucket dedicated to storing
590
        // open channel data specific to the remote node.
591
        pub := nodeID.SerializeCompressed()
264✔
592
        nodeChanBucket := openChanBucket.NestedReadBucket(pub)
264✔
593
        if nodeChanBucket == nil {
319✔
594
                return nil, nil
55✔
595
        }
55✔
596

597
        // Next, we'll need to go down an additional layer in order to retrieve
598
        // the channels for each chain the node knows of.
599
        var channels []*OpenChannel
212✔
600
        err := nodeChanBucket.ForEach(func(chainHash, v []byte) error {
424✔
601
                // If there's a value, it's not a bucket so ignore it.
212✔
602
                if v != nil {
212✔
603
                        return nil
×
604
                }
×
605

606
                // If we've found a valid chainhash bucket, then we'll retrieve
607
                // that so we can extract all the channels.
608
                chainBucket := nodeChanBucket.NestedReadBucket(chainHash)
212✔
609
                if chainBucket == nil {
212✔
610
                        return fmt.Errorf("unable to read bucket for chain=%x",
×
611
                                chainHash[:])
×
612
                }
×
613

614
                // Finally, we both of the necessary buckets retrieved, fetch
615
                // all the active channels related to this node.
616
                nodeChannels, err := c.fetchNodeChannels(chainBucket)
212✔
617
                if err != nil {
212✔
618
                        return fmt.Errorf("unable to read channel for "+
×
619
                                "chain_hash=%x, node_key=%x: %v",
×
620
                                chainHash[:], pub, err)
×
621
                }
×
622

623
                channels = append(channels, nodeChannels...)
212✔
624
                return nil
212✔
625
        })
626

627
        return channels, err
212✔
628
}
629

630
// fetchNodeChannels retrieves all active channels from the target chainBucket
631
// which is under a node's dedicated channel bucket. This function is typically
632
// used to fetch all the active channels related to a particular node.
633
func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) (
634
        []*OpenChannel, error) {
718✔
635

718✔
636
        var channels []*OpenChannel
718✔
637

718✔
638
        // A node may have channels on several chains, so for each known chain,
718✔
639
        // we'll extract all the channels.
718✔
640
        err := chainBucket.ForEach(func(chanPoint, v []byte) error {
1,497✔
641
                // If there's a value, it's not a bucket so ignore it.
779✔
642
                if v != nil {
779✔
643
                        return nil
×
644
                }
×
645

646
                // Once we've found a valid channel bucket, we'll extract it
647
                // from the node's chain bucket.
648
                chanBucket := chainBucket.NestedReadBucket(chanPoint)
779✔
649

779✔
650
                var outPoint wire.OutPoint
779✔
651
                err := graphdb.ReadOutpoint(
779✔
652
                        bytes.NewReader(chanPoint), &outPoint,
779✔
653
                )
779✔
654
                if err != nil {
779✔
655
                        return err
×
656
                }
×
657
                oChannel, err := fetchOpenChannel(chanBucket, &outPoint)
779✔
658
                if err != nil {
779✔
659
                        return fmt.Errorf("unable to read channel data for "+
×
660
                                "chan_point=%v: %w", outPoint, err)
×
661
                }
×
662
                oChannel.Db = c
779✔
663

779✔
664
                channels = append(channels, oChannel)
779✔
665

779✔
666
                return nil
779✔
667
        })
668
        if err != nil {
718✔
669
                return nil, err
×
670
        }
×
671

672
        return channels, nil
718✔
673
}
674

675
// FetchChannel attempts to locate a channel specified by the passed channel
676
// point. If the channel cannot be found, then an error will be returned.
677
func (c *ChannelStateDB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel,
678
        error) {
13✔
679

13✔
680
        var targetChanPoint bytes.Buffer
13✔
681
        err := graphdb.WriteOutpoint(&targetChanPoint, &chanPoint)
13✔
682
        if err != nil {
13✔
683
                return nil, err
×
684
        }
×
685

686
        targetChanPointBytes := targetChanPoint.Bytes()
13✔
687
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
13✔
688
                error) {
25✔
689

12✔
690
                return targetChanPointBytes, &chanPoint, nil
12✔
691
        }
12✔
692

693
        return c.channelScanner(nil, selector)
13✔
694
}
695

696
// FetchChannelByID attempts to locate a channel specified by the passed channel
697
// ID. If the channel cannot be found, then an error will be returned.
698
// Optionally an existing db tx can be supplied.
699
func (c *ChannelStateDB) FetchChannelByID(tx kvdb.RTx, id lnwire.ChannelID) (
700
        *OpenChannel, error) {
5✔
701

5✔
702
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
5✔
703
                error) {
10✔
704

5✔
705
                var (
5✔
706
                        targetChanPointBytes []byte
5✔
707
                        targetChanPoint      *wire.OutPoint
5✔
708

5✔
709
                        // errChanFound is used to signal that the channel has
5✔
710
                        // been found so that iteration through the DB buckets
5✔
711
                        // can stop.
5✔
712
                        errChanFound = errors.New("channel found")
5✔
713
                )
5✔
714
                err := chainBkt.ForEach(func(k, _ []byte) error {
10✔
715
                        var outPoint wire.OutPoint
5✔
716
                        err := graphdb.ReadOutpoint(
5✔
717
                                bytes.NewReader(k), &outPoint,
5✔
718
                        )
5✔
719
                        if err != nil {
5✔
720
                                return err
×
721
                        }
×
722

723
                        chanID := lnwire.NewChanIDFromOutPoint(outPoint)
5✔
724
                        if chanID != id {
6✔
725
                                return nil
1✔
726
                        }
1✔
727

728
                        targetChanPoint = &outPoint
4✔
729
                        targetChanPointBytes = k
4✔
730

4✔
731
                        return errChanFound
4✔
732
                })
733
                if err != nil && !errors.Is(err, errChanFound) {
5✔
734
                        return nil, nil, err
×
735
                }
×
736
                if targetChanPoint == nil {
6✔
737
                        return nil, nil, ErrChannelNotFound
1✔
738
                }
1✔
739

740
                return targetChanPointBytes, targetChanPoint, nil
4✔
741
        }
742

743
        return c.channelScanner(tx, selector)
5✔
744
}
745

746
// ChanCount is used by the server in determining access control.
747
type ChanCount struct {
748
        HasOpenOrClosedChan bool
749
        PendingOpenCount    uint64
750
}
751

752
// FetchPermAndTempPeers returns a map where the key is the remote node's
753
// public key and the value is a struct that has a tally of the pending-open
754
// channels and whether the peer has an open or closed channel with us.
755
func (c *ChannelStateDB) FetchPermAndTempPeers(
756
        chainHash []byte) (map[string]ChanCount, error) {
4✔
757

4✔
758
        peerCounts := make(map[string]ChanCount)
4✔
759

4✔
760
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
8✔
761
                openChanBucket := tx.ReadBucket(openChannelBucket)
4✔
762
                if openChanBucket == nil {
4✔
763
                        return ErrNoChanDBExists
×
764
                }
×
765

766
                openChanErr := openChanBucket.ForEach(func(nodePub,
4✔
767
                        v []byte) error {
10✔
768

6✔
769
                        // If there is a value, this is not a bucket.
6✔
770
                        if v != nil {
6✔
771
                                return nil
×
772
                        }
×
773

774
                        nodeChanBucket := openChanBucket.NestedReadBucket(
6✔
775
                                nodePub,
6✔
776
                        )
6✔
777
                        if nodeChanBucket == nil {
6✔
778
                                return nil
×
779
                        }
×
780

781
                        chainBucket := nodeChanBucket.NestedReadBucket(
6✔
782
                                chainHash,
6✔
783
                        )
6✔
784
                        if chainBucket == nil {
6✔
785
                                return fmt.Errorf("no chain bucket exists")
×
786
                        }
×
787

788
                        var isPermPeer bool
6✔
789
                        var pendingOpenCount uint64
6✔
790

6✔
791
                        internalErr := chainBucket.ForEach(func(chanPoint,
6✔
792
                                val []byte) error {
11✔
793

5✔
794
                                // If there is a value, this is not a bucket.
5✔
795
                                if val != nil {
5✔
796
                                        return nil
×
797
                                }
×
798

799
                                chanBucket := chainBucket.NestedReadBucket(
5✔
800
                                        chanPoint,
5✔
801
                                )
5✔
802
                                if chanBucket == nil {
5✔
803
                                        return nil
×
804
                                }
×
805

806
                                var op wire.OutPoint
5✔
807
                                readErr := graphdb.ReadOutpoint(
5✔
808
                                        bytes.NewReader(chanPoint), &op,
5✔
809
                                )
5✔
810
                                if readErr != nil {
5✔
811
                                        return readErr
×
812
                                }
×
813

814
                                // We need to go through each channel and look
815
                                // at the IsPending status.
816
                                openChan, err := fetchOpenChannel(
5✔
817
                                        chanBucket, &op,
5✔
818
                                )
5✔
819
                                if err != nil {
5✔
820
                                        return err
×
821
                                }
×
822

823
                                if openChan.IsPending {
9✔
824
                                        // Add to the pending-open count since
4✔
825
                                        // this is a temp peer.
4✔
826
                                        pendingOpenCount++
4✔
827
                                        return nil
4✔
828
                                }
4✔
829

830
                                // Since IsPending is false, this is a perm
831
                                // peer.
832
                                isPermPeer = true
4✔
833

4✔
834
                                return nil
4✔
835
                        })
836
                        if internalErr != nil {
6✔
837
                                return internalErr
×
838
                        }
×
839

840
                        peerCount := ChanCount{
6✔
841
                                HasOpenOrClosedChan: isPermPeer,
6✔
842
                                PendingOpenCount:    pendingOpenCount,
6✔
843
                        }
6✔
844
                        peerCounts[string(nodePub)] = peerCount
6✔
845

6✔
846
                        return nil
6✔
847
                })
848
                if openChanErr != nil {
4✔
849
                        return openChanErr
×
850
                }
×
851

852
                // Now check the closed channel bucket.
853
                historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
4✔
854
                if historicalChanBucket == nil {
4✔
855
                        return ErrNoHistoricalBucket
×
856
                }
×
857

858
                historicalErr := historicalChanBucket.ForEach(func(chanPoint,
4✔
859
                        v []byte) error {
8✔
860
                        // Parse each nested bucket and the chanInfoKey to get
4✔
861
                        // the IsPending bool. This determines whether the
4✔
862
                        // peer is protected or not.
4✔
863
                        if v != nil {
4✔
864
                                // This is not a bucket. This is currently not
×
865
                                // possible.
×
866
                                return nil
×
867
                        }
×
868

869
                        chanBucket := historicalChanBucket.NestedReadBucket(
4✔
870
                                chanPoint,
4✔
871
                        )
4✔
872
                        if chanBucket == nil {
4✔
873
                                // This is not possible.
×
874
                                return fmt.Errorf("no historical channel " +
×
875
                                        "bucket exists")
×
876
                        }
×
877

878
                        var op wire.OutPoint
4✔
879
                        readErr := graphdb.ReadOutpoint(
4✔
880
                                bytes.NewReader(chanPoint), &op,
4✔
881
                        )
4✔
882
                        if readErr != nil {
4✔
883
                                return readErr
×
884
                        }
×
885

886
                        // This channel is closed, but the structure of the
887
                        // historical bucket is the same. This is by design,
888
                        // which means we can call fetchOpenChannel.
889
                        channel, fetchErr := fetchOpenChannel(chanBucket, &op)
4✔
890
                        if fetchErr != nil {
4✔
891
                                return fetchErr
×
892
                        }
×
893

894
                        // Only include this peer in the protected class if
895
                        // the closing transaction confirmed. Note that
896
                        // CloseChannel can be called in the funding manager
897
                        // while IsPending is true which is why we need this
898
                        // special-casing to not count premature funding
899
                        // manager calls to CloseChannel.
900
                        if !channel.IsPending {
8✔
901
                                // Fetch the public key of the remote node. We
4✔
902
                                // need to use the string-ified serialized,
4✔
903
                                // compressed bytes as the key.
4✔
904
                                remotePub := channel.IdentityPub
4✔
905
                                remoteSer := remotePub.SerializeCompressed()
4✔
906
                                remoteKey := string(remoteSer)
4✔
907

4✔
908
                                count, exists := peerCounts[remoteKey]
4✔
909
                                if exists {
8✔
910
                                        count.HasOpenOrClosedChan = true
4✔
911
                                        peerCounts[remoteKey] = count
4✔
912
                                } else {
4✔
913
                                        peerCount := ChanCount{
×
914
                                                HasOpenOrClosedChan: true,
×
915
                                        }
×
916
                                        peerCounts[remoteKey] = peerCount
×
917
                                }
×
918
                        }
919

920
                        return nil
4✔
921
                })
922
                if historicalErr != nil {
4✔
923
                        return historicalErr
×
924
                }
×
925

926
                return nil
4✔
927
        }, func() {
4✔
928
                clear(peerCounts)
4✔
929
        })
4✔
930

931
        return peerCounts, err
4✔
932
}
933

934
// channelSelector describes a function that takes a chain-hash bucket from
935
// within the open-channel DB and returns the wanted channel point bytes, and
936
// channel point. It must return the ErrChannelNotFound error if the wanted
937
// channel is not in the given bucket.
938
type channelSelector func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
939
        error)
940

941
// channelScanner will traverse the DB to each chain-hash bucket of each node
942
// pub-key bucket in the open-channel-bucket. The chanSelector will then be used
943
// to fetch the wanted channel outpoint from the chain bucket.
944
func (c *ChannelStateDB) channelScanner(tx kvdb.RTx,
945
        chanSelect channelSelector) (*OpenChannel, error) {
15✔
946

15✔
947
        var (
15✔
948
                targetChan *OpenChannel
15✔
949

15✔
950
                // errChanFound is used to signal that the channel has been
15✔
951
                // found so that iteration through the DB buckets can stop.
15✔
952
                errChanFound = errors.New("channel found")
15✔
953
        )
15✔
954

15✔
955
        // chanScan will traverse the following bucket structure:
15✔
956
        //  * nodePub => chainHash => chanPoint
15✔
957
        //
15✔
958
        // At each level we go one further, ensuring that we're traversing the
15✔
959
        // proper key (that's actually a bucket). By only reading the bucket
15✔
960
        // structure and skipping fully decoding each channel, we save a good
15✔
961
        // bit of CPU as we don't need to do things like decompress public
15✔
962
        // keys.
15✔
963
        chanScan := func(tx kvdb.RTx) error {
30✔
964
                // Get the bucket dedicated to storing the metadata for open
15✔
965
                // channels.
15✔
966
                openChanBucket := tx.ReadBucket(openChannelBucket)
15✔
967
                if openChanBucket == nil {
15✔
968
                        return ErrNoActiveChannels
×
969
                }
×
970

971
                // Within the node channel bucket, are the set of node pubkeys
972
                // we have channels with, we don't know the entire set, so we'll
973
                // check them all.
974
                return openChanBucket.ForEach(func(nodePub, v []byte) error {
29✔
975
                        // Ensure that this is a key the same size as a pubkey,
14✔
976
                        // and also that it leads directly to a bucket.
14✔
977
                        if len(nodePub) != 33 || v != nil {
14✔
978
                                return nil
×
979
                        }
×
980

981
                        nodeChanBucket := openChanBucket.NestedReadBucket(
14✔
982
                                nodePub,
14✔
983
                        )
14✔
984
                        if nodeChanBucket == nil {
14✔
985
                                return nil
×
986
                        }
×
987

988
                        // The next layer down is all the chains that this node
989
                        // has channels on with us.
990
                        return nodeChanBucket.ForEach(func(chainHash,
14✔
991
                                v []byte) error {
28✔
992

14✔
993
                                // If there's a value, it's not a bucket so
14✔
994
                                // ignore it.
14✔
995
                                if v != nil {
14✔
996
                                        return nil
×
997
                                }
×
998

999
                                chainBucket := nodeChanBucket.NestedReadBucket(
14✔
1000
                                        chainHash,
14✔
1001
                                )
14✔
1002
                                if chainBucket == nil {
14✔
1003
                                        return fmt.Errorf("unable to read "+
×
1004
                                                "bucket for chain=%x",
×
1005
                                                chainHash)
×
1006
                                }
×
1007

1008
                                // Finally, we reach the leaf bucket that stores
1009
                                // all the chanPoints for this node.
1010
                                targetChanBytes, chanPoint, err := chanSelect(
14✔
1011
                                        chainBucket,
14✔
1012
                                )
14✔
1013
                                if errors.Is(err, ErrChannelNotFound) {
15✔
1014
                                        return nil
1✔
1015
                                } else if err != nil {
14✔
1016
                                        return err
×
1017
                                }
×
1018

1019
                                chanBucket := chainBucket.NestedReadBucket(
13✔
1020
                                        targetChanBytes,
13✔
1021
                                )
13✔
1022
                                if chanBucket == nil {
19✔
1023
                                        return nil
6✔
1024
                                }
6✔
1025

1026
                                channel, err := fetchOpenChannel(
10✔
1027
                                        chanBucket, chanPoint,
10✔
1028
                                )
10✔
1029
                                if err != nil {
10✔
1030
                                        return err
×
1031
                                }
×
1032

1033
                                targetChan = channel
10✔
1034
                                targetChan.Db = c
10✔
1035

10✔
1036
                                return errChanFound
10✔
1037
                        })
1038
                })
1039
        }
1040

1041
        var err error
15✔
1042
        if tx == nil {
30✔
1043
                err = kvdb.View(c.backend, chanScan, func() {})
30✔
1044
        } else {
×
1045
                err = chanScan(tx)
×
1046
        }
×
1047
        if err != nil && !errors.Is(err, errChanFound) {
15✔
1048
                return nil, err
×
1049
        }
×
1050

1051
        if targetChan != nil {
25✔
1052
                return targetChan, nil
10✔
1053
        }
10✔
1054

1055
        // If we can't find the channel, then we return with an error, as we
1056
        // have nothing to back up.
1057
        return nil, ErrChannelNotFound
8✔
1058
}
1059

1060
// FetchAllChannels attempts to retrieve all open channels currently stored
1061
// within the database, including pending open, fully open and channels waiting
1062
// for a closing transaction to confirm.
1063
func (c *ChannelStateDB) FetchAllChannels() ([]*OpenChannel, error) {
566✔
1064
        return fetchChannels(c)
566✔
1065
}
566✔
1066

1067
// FetchAllOpenChannels will return all channels that have the funding
1068
// transaction confirmed, and is not waiting for a closing transaction to be
1069
// confirmed.
1070
func (c *ChannelStateDB) FetchAllOpenChannels() ([]*OpenChannel, error) {
408✔
1071
        return fetchChannels(
408✔
1072
                c,
408✔
1073
                pendingChannelFilter(false),
408✔
1074
                waitingCloseFilter(false),
408✔
1075
        )
408✔
1076
}
408✔
1077

1078
// FetchPendingChannels will return channels that have completed the process of
1079
// generating and broadcasting funding transactions, but whose funding
1080
// transactions have yet to be confirmed on the blockchain.
1081
func (c *ChannelStateDB) FetchPendingChannels() ([]*OpenChannel, error) {
82✔
1082
        return fetchChannels(c,
82✔
1083
                pendingChannelFilter(true),
82✔
1084
                waitingCloseFilter(false),
82✔
1085
        )
82✔
1086
}
82✔
1087

1088
// FetchWaitingCloseChannels will return all channels that have been opened,
1089
// but are now waiting for a closing transaction to be confirmed.
1090
//
1091
// NOTE: This includes channels that are also pending to be opened.
1092
func (c *ChannelStateDB) FetchWaitingCloseChannels() ([]*OpenChannel, error) {
4✔
1093
        return fetchChannels(
4✔
1094
                c, waitingCloseFilter(true),
4✔
1095
        )
4✔
1096
}
4✔
1097

1098
// fetchChannelsFilter applies a filter to channels retrieved in fetchchannels.
1099
// A set of filters can be combined to filter across multiple dimensions.
1100
type fetchChannelsFilter func(channel *OpenChannel) bool
1101

1102
// pendingChannelFilter returns a filter based on whether channels are pending
1103
// (ie, their funding transaction still needs to confirm). If pending is false,
1104
// channels with confirmed funding transactions are returned.
1105
func pendingChannelFilter(pending bool) fetchChannelsFilter {
496✔
1106
        return func(channel *OpenChannel) bool {
721✔
1107
                return channel.IsPending == pending
225✔
1108
        }
225✔
1109
}
1110

1111
// waitingCloseFilter returns a filter which filters channels based on whether
1112
// they are awaiting the confirmation of their closing transaction. If waiting
1113
// close is true, channels that have had their closing tx broadcast are
1114
// included. If it is false, channels that are not awaiting confirmation of
1115
// their close transaction are returned.
1116
func waitingCloseFilter(waitingClose bool) fetchChannelsFilter {
494✔
1117
        return func(channel *OpenChannel) bool {
705✔
1118
                // If the channel is in any other state than Default,
211✔
1119
                // then it means it is waiting to be closed.
211✔
1120
                channelWaitingClose :=
211✔
1121
                        channel.ChanStatus() != ChanStatusDefault
211✔
1122

211✔
1123
                // Include the channel if it matches the value for
211✔
1124
                // waiting close that we are filtering on.
211✔
1125
                return channelWaitingClose == waitingClose
211✔
1126
        }
211✔
1127
}
1128

1129
// fetchChannels attempts to retrieve channels currently stored in the
1130
// database. It takes a set of filters which are applied to each channel to
1131
// obtain a set of channels with the desired set of properties. Only channels
1132
// which have a true value returned for *all* of the filters will be returned.
1133
// If no filters are provided, every channel in the open channels bucket will
1134
// be returned.
1135
func fetchChannels(c *ChannelStateDB, filters ...fetchChannelsFilter) (
1136
        []*OpenChannel, error) {
1,063✔
1137

1,063✔
1138
        var channels []*OpenChannel
1,063✔
1139

1,063✔
1140
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
2,126✔
1141
                // Get the bucket dedicated to storing the metadata for open
1,063✔
1142
                // channels.
1,063✔
1143
                openChanBucket := tx.ReadBucket(openChannelBucket)
1,063✔
1144
                if openChanBucket == nil {
1,063✔
1145
                        return ErrNoActiveChannels
×
1146
                }
×
1147

1148
                // Next, fetch the bucket dedicated to storing metadata related
1149
                // to all nodes. All keys within this bucket are the serialized
1150
                // public keys of all our direct counterparties.
1151
                nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
1,063✔
1152
                if nodeMetaBucket == nil {
1,063✔
1153
                        return fmt.Errorf("node bucket not created")
×
1154
                }
×
1155

1156
                // Finally for each node public key in the bucket, fetch all
1157
                // the channels related to this particular node.
1158
                return nodeMetaBucket.ForEach(func(k, v []byte) error {
1,572✔
1159
                        nodeChanBucket := openChanBucket.NestedReadBucket(k)
509✔
1160
                        if nodeChanBucket == nil {
509✔
1161
                                return nil
×
1162
                        }
×
1163

1164
                        return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
1,018✔
1165
                                // If there's a value, it's not a bucket so
509✔
1166
                                // ignore it.
509✔
1167
                                if v != nil {
509✔
1168
                                        return nil
×
1169
                                }
×
1170

1171
                                // If we've found a valid chainhash bucket,
1172
                                // then we'll retrieve that so we can extract
1173
                                // all the channels.
1174
                                chainBucket := nodeChanBucket.NestedReadBucket(
509✔
1175
                                        chainHash,
509✔
1176
                                )
509✔
1177
                                if chainBucket == nil {
509✔
1178
                                        return fmt.Errorf("unable to read "+
×
1179
                                                "bucket for chain=%x", chainHash[:])
×
1180
                                }
×
1181

1182
                                nodeChans, err := c.fetchNodeChannels(chainBucket)
509✔
1183
                                if err != nil {
509✔
1184
                                        return fmt.Errorf("unable to read "+
×
1185
                                                "channel for chain_hash=%x, "+
×
1186
                                                "node_key=%x: %v", chainHash[:], k, err)
×
1187
                                }
×
1188
                                for _, channel := range nodeChans {
1,055✔
1189
                                        // includeChannel indicates whether the channel
546✔
1190
                                        // meets the criteria specified by our filters.
546✔
1191
                                        includeChannel := true
546✔
1192

546✔
1193
                                        // Run through each filter and check whether the
546✔
1194
                                        // channel should be included.
546✔
1195
                                        for _, f := range filters {
979✔
1196
                                                // If the channel fails the filter, set
433✔
1197
                                                // includeChannel to false and don't bother
433✔
1198
                                                // checking the remaining filters.
433✔
1199
                                                if !f(channel) {
461✔
1200
                                                        includeChannel = false
28✔
1201
                                                        break
28✔
1202
                                                }
1203
                                        }
1204

1205
                                        // If the channel passed every filter, include it in
1206
                                        // our set of channels.
1207
                                        if includeChannel {
1,067✔
1208
                                                channels = append(channels, channel)
521✔
1209
                                        }
521✔
1210
                                }
1211
                                return nil
509✔
1212
                        })
1213

1214
                })
1215
        }, func() {
1,063✔
1216
                channels = nil
1,063✔
1217
        })
1,063✔
1218
        if err != nil {
1,063✔
1219
                return nil, err
×
1220
        }
×
1221

1222
        return channels, nil
1,063✔
1223
}
1224

1225
// FetchClosedChannels attempts to fetch all closed channels from the database.
1226
// The pendingOnly bool toggles if channels that aren't yet fully closed should
1227
// be returned in the response or not. When a channel was cooperatively closed,
1228
// it becomes fully closed after a single confirmation.  When a channel was
1229
// forcibly closed, it will become fully closed after _all_ the pending funds
1230
// (if any) have been swept.
1231
func (c *ChannelStateDB) FetchClosedChannels(pendingOnly bool) (
1232
        []*ChannelCloseSummary, error) {
503✔
1233

503✔
1234
        var chanSummaries []*ChannelCloseSummary
503✔
1235

503✔
1236
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
1,006✔
1237
                closeBucket := tx.ReadBucket(closedChannelBucket)
503✔
1238
                if closeBucket == nil {
503✔
1239
                        return ErrNoClosedChannels
×
1240
                }
×
1241

1242
                return closeBucket.ForEach(func(chanID []byte, summaryBytes []byte) error {
526✔
1243
                        summaryReader := bytes.NewReader(summaryBytes)
23✔
1244
                        chanSummary, err := deserializeCloseChannelSummary(summaryReader)
23✔
1245
                        if err != nil {
23✔
1246
                                return err
×
1247
                        }
×
1248

1249
                        // If the query specified to only include pending
1250
                        // channels, then we'll skip any channels which aren't
1251
                        // currently pending.
1252
                        if !chanSummary.IsPending && pendingOnly {
27✔
1253
                                return nil
4✔
1254
                        }
4✔
1255

1256
                        chanSummaries = append(chanSummaries, chanSummary)
22✔
1257
                        return nil
22✔
1258
                })
1259
        }, func() {
503✔
1260
                chanSummaries = nil
503✔
1261
        }); err != nil {
503✔
1262
                return nil, err
×
1263
        }
×
1264

1265
        return chanSummaries, nil
503✔
1266
}
1267

1268
// ErrClosedChannelNotFound signals that a closed channel could not be found in
1269
// the channeldb.
1270
var ErrClosedChannelNotFound = errors.New("unable to find closed channel summary")
1271

1272
// FetchClosedChannel queries for a channel close summary using the channel
1273
// point of the channel in question.
1274
func (c *ChannelStateDB) FetchClosedChannel(chanID *wire.OutPoint) (
1275
        *ChannelCloseSummary, error) {
6✔
1276

6✔
1277
        var chanSummary *ChannelCloseSummary
6✔
1278
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
12✔
1279
                closeBucket := tx.ReadBucket(closedChannelBucket)
6✔
1280
                if closeBucket == nil {
6✔
1281
                        return ErrClosedChannelNotFound
×
1282
                }
×
1283

1284
                var b bytes.Buffer
6✔
1285
                var err error
6✔
1286
                if err = graphdb.WriteOutpoint(&b, chanID); err != nil {
6✔
1287
                        return err
×
1288
                }
×
1289

1290
                summaryBytes := closeBucket.Get(b.Bytes())
6✔
1291
                if summaryBytes == nil {
7✔
1292
                        return ErrClosedChannelNotFound
1✔
1293
                }
1✔
1294

1295
                summaryReader := bytes.NewReader(summaryBytes)
5✔
1296
                chanSummary, err = deserializeCloseChannelSummary(summaryReader)
5✔
1297

5✔
1298
                return err
5✔
1299
        }, func() {
6✔
1300
                chanSummary = nil
6✔
1301
        }); err != nil {
7✔
1302
                return nil, err
1✔
1303
        }
1✔
1304

1305
        return chanSummary, nil
5✔
1306
}
1307

1308
// FetchClosedChannelForID queries for a channel close summary using the
1309
// channel ID of the channel in question.
1310
func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) (
1311
        *ChannelCloseSummary, error) {
105✔
1312

105✔
1313
        var chanSummary *ChannelCloseSummary
105✔
1314
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
210✔
1315
                closeBucket := tx.ReadBucket(closedChannelBucket)
105✔
1316
                if closeBucket == nil {
105✔
1317
                        return ErrClosedChannelNotFound
×
1318
                }
×
1319

1320
                // The first 30 bytes of the channel ID and outpoint will be
1321
                // equal.
1322
                cursor := closeBucket.ReadCursor()
105✔
1323
                op, c := cursor.Seek(cid[:30])
105✔
1324

105✔
1325
                // We scan over all possible candidates for this channel ID.
105✔
1326
                for ; op != nil && bytes.Compare(cid[:30], op[:30]) <= 0; op, c = cursor.Next() {
5,360✔
1327
                        var outPoint wire.OutPoint
5,255✔
1328
                        err := graphdb.ReadOutpoint(
5,255✔
1329
                                bytes.NewReader(op), &outPoint,
5,255✔
1330
                        )
5,255✔
1331
                        if err != nil {
5,255✔
1332
                                return err
×
1333
                        }
×
1334

1335
                        // If the found outpoint does not correspond to this
1336
                        // channel ID, we continue.
1337
                        if !cid.IsChanPoint(&outPoint) {
10,406✔
1338
                                continue
5,151✔
1339
                        }
1340

1341
                        // Deserialize the close summary and return.
1342
                        r := bytes.NewReader(c)
104✔
1343
                        chanSummary, err = deserializeCloseChannelSummary(r)
104✔
1344
                        if err != nil {
104✔
1345
                                return err
×
1346
                        }
×
1347

1348
                        return nil
104✔
1349
                }
1350
                return ErrClosedChannelNotFound
4✔
1351
        }, func() {
105✔
1352
                chanSummary = nil
105✔
1353
        }); err != nil {
109✔
1354
                return nil, err
4✔
1355
        }
4✔
1356

1357
        return chanSummary, nil
104✔
1358
}
1359

1360
// MarkChanFullyClosed marks a channel as fully closed within the database. A
1361
// channel should be marked as fully closed if the channel was initially
1362
// cooperatively closed and it's reached a single confirmation, or after all
1363
// the pending funds in a channel that has been forcibly closed have been
1364
// swept.
1365
func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
10✔
1366
        var (
10✔
1367
                openChannels  []*OpenChannel
10✔
1368
                pruneLinkNode *btcec.PublicKey
10✔
1369
        )
10✔
1370
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
20✔
1371
                var b bytes.Buffer
10✔
1372
                if err := graphdb.WriteOutpoint(&b, chanPoint); err != nil {
10✔
1373
                        return err
×
1374
                }
×
1375

1376
                chanID := b.Bytes()
10✔
1377

10✔
1378
                closedChanBucket, err := tx.CreateTopLevelBucket(
10✔
1379
                        closedChannelBucket,
10✔
1380
                )
10✔
1381
                if err != nil {
10✔
1382
                        return err
×
1383
                }
×
1384

1385
                chanSummaryBytes := closedChanBucket.Get(chanID)
10✔
1386
                if chanSummaryBytes == nil {
10✔
1387
                        return fmt.Errorf("no closed channel for "+
×
1388
                                "chan_point=%v found", chanPoint)
×
1389
                }
×
1390

1391
                chanSummaryReader := bytes.NewReader(chanSummaryBytes)
10✔
1392
                chanSummary, err := deserializeCloseChannelSummary(
10✔
1393
                        chanSummaryReader,
10✔
1394
                )
10✔
1395
                if err != nil {
10✔
1396
                        return err
×
1397
                }
×
1398

1399
                chanSummary.IsPending = false
10✔
1400

10✔
1401
                var newSummary bytes.Buffer
10✔
1402
                err = serializeChannelCloseSummary(&newSummary, chanSummary)
10✔
1403
                if err != nil {
10✔
1404
                        return err
×
1405
                }
×
1406

1407
                err = closedChanBucket.Put(chanID, newSummary.Bytes())
10✔
1408
                if err != nil {
10✔
1409
                        return err
×
1410
                }
×
1411

1412
                // Now that the channel is closed, we'll check if we have any
1413
                // other open channels with this peer. If we don't we'll
1414
                // garbage collect it to ensure we don't establish persistent
1415
                // connections to peers without open channels.
1416
                pruneLinkNode = chanSummary.RemotePub
10✔
1417
                openChannels, err = c.fetchOpenChannels(
10✔
1418
                        tx, pruneLinkNode,
10✔
1419
                )
10✔
1420
                if err != nil {
10✔
1421
                        return fmt.Errorf("unable to fetch open channels for "+
×
1422
                                "peer %x: %v",
×
1423
                                pruneLinkNode.SerializeCompressed(), err)
×
1424
                }
×
1425

1426
                return nil
10✔
1427
        }, func() {
10✔
1428
                openChannels = nil
10✔
1429
                pruneLinkNode = nil
10✔
1430
        })
10✔
1431
        if err != nil {
10✔
1432
                return err
×
1433
        }
×
1434

1435
        // Decide whether we want to remove the link node, based upon the number
1436
        // of still open channels.
1437
        return c.pruneLinkNode(openChannels, pruneLinkNode)
10✔
1438
}
1439

1440
// pruneLinkNode determines whether we should garbage collect a link node from
1441
// the database due to no longer having any open channels with it. If there are
1442
// any left, then this acts as a no-op.
1443
func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
1444
        remotePub *btcec.PublicKey) error {
10✔
1445

10✔
1446
        if len(openChannels) > 0 {
13✔
1447
                return nil
3✔
1448
        }
3✔
1449

1450
        log.Infof("Pruning link node %x with zero open channels from database",
10✔
1451
                remotePub.SerializeCompressed())
10✔
1452

10✔
1453
        return c.linkNodeDB.DeleteLinkNode(remotePub)
10✔
1454
}
1455

1456
// PruneLinkNodes attempts to prune all link nodes found within the database
1457
// with whom we no longer have any open channels with.
1458
func (c *ChannelStateDB) PruneLinkNodes() error {
3✔
1459
        allLinkNodes, err := c.linkNodeDB.FetchAllLinkNodes()
3✔
1460
        if err != nil {
3✔
1461
                return err
×
1462
        }
×
1463

1464
        for _, linkNode := range allLinkNodes {
6✔
1465
                var (
3✔
1466
                        openChannels []*OpenChannel
3✔
1467
                        linkNode     = linkNode
3✔
1468
                )
3✔
1469
                err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1470
                        var err error
3✔
1471
                        openChannels, err = c.fetchOpenChannels(
3✔
1472
                                tx, linkNode.IdentityPub,
3✔
1473
                        )
3✔
1474
                        return err
3✔
1475
                }, func() {
6✔
1476
                        openChannels = nil
3✔
1477
                })
3✔
1478
                if err != nil {
3✔
1479
                        return err
×
1480
                }
×
1481

1482
                err = c.pruneLinkNode(openChannels, linkNode.IdentityPub)
3✔
1483
                if err != nil {
3✔
1484
                        return err
×
1485
                }
×
1486
        }
1487

1488
        return nil
3✔
1489
}
1490

1491
// ChannelShell is a shell of a channel that is meant to be used for channel
1492
// recovery purposes. It contains a minimal OpenChannel instance along with
1493
// addresses for that target node.
1494
type ChannelShell struct {
1495
        // NodeAddrs the set of addresses that this node has known to be
1496
        // reachable at in the past.
1497
        NodeAddrs []net.Addr
1498

1499
        // Chan is a shell of an OpenChannel, it contains only the items
1500
        // required to restore the channel on disk.
1501
        Chan *OpenChannel
1502
}
1503

1504
// RestoreChannelShells is a method that allows the caller to reconstruct the
1505
// state of an OpenChannel from the ChannelShell. We'll attempt to write the
1506
// new channel to disk, create a LinkNode instance with the passed node
1507
// addresses, and finally create an edge within the graph for the channel as
1508
// well. This method is idempotent, so repeated calls with the same set of
1509
// channel shells won't modify the database after the initial call.
1510
func (c *ChannelStateDB) RestoreChannelShells(channelShells ...*ChannelShell) error {
4✔
1511
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
8✔
1512
                for _, channelShell := range channelShells {
8✔
1513
                        channel := channelShell.Chan
4✔
1514

4✔
1515
                        // When we make a channel, we mark that the channel has
4✔
1516
                        // been restored, this will signal to other sub-systems
4✔
1517
                        // to not attempt to use the channel as if it was a
4✔
1518
                        // regular one.
4✔
1519
                        channel.chanStatus |= ChanStatusRestored
4✔
1520

4✔
1521
                        // First, we'll attempt to create a new open channel
4✔
1522
                        // and link node for this channel. If the channel
4✔
1523
                        // already exists, then in order to ensure this method
4✔
1524
                        // is idempotent, we'll continue to the next step.
4✔
1525
                        channel.Db = c
4✔
1526
                        err := syncNewChannel(
4✔
1527
                                tx, channel, channelShell.NodeAddrs,
4✔
1528
                        )
4✔
1529
                        if err != nil {
7✔
1530
                                return err
3✔
1531
                        }
3✔
1532
                }
1533

1534
                return nil
4✔
1535
        }, func() {})
4✔
1536
        if err != nil {
7✔
1537
                return err
3✔
1538
        }
3✔
1539

1540
        return nil
4✔
1541
}
1542

1543
// AddrsForNode consults the channel database for all addresses known to the
1544
// passed node public key. The returned boolean indicates if the given node is
1545
// unknown to the channel DB or not.
1546
//
1547
// NOTE: this is part of the AddrSource interface.
1548
func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr, error) {
4✔
1549
        linkNode, err := d.channelStateDB.linkNodeDB.FetchLinkNode(nodePub)
4✔
1550
        // Only if the error is something other than ErrNodeNotFound do we
4✔
1551
        // return it.
4✔
1552
        switch {
4✔
1553
        case err != nil && !errors.Is(err, ErrNodeNotFound):
×
1554
                return false, nil, err
×
1555

1556
        case errors.Is(err, ErrNodeNotFound):
×
1557
                return false, nil, nil
×
1558
        }
1559

1560
        return true, linkNode.Addresses, nil
4✔
1561
}
1562

1563
// AbandonChannel attempts to remove the target channel from the open channel
1564
// database. If the channel was already removed (has a closed channel entry),
1565
// then we'll return a nil error. Otherwise, we'll insert a new close summary
1566
// into the database.
1567
func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint,
1568
        bestHeight uint32) error {
7✔
1569

7✔
1570
        // With the chanPoint constructed, we'll attempt to find the target
7✔
1571
        // channel in the database. If we can't find the channel, then we'll
7✔
1572
        // return the error back to the caller.
7✔
1573
        dbChan, err := c.FetchChannel(*chanPoint)
7✔
1574
        switch {
7✔
1575
        // If the channel wasn't found, then it's possible that it was already
1576
        // abandoned from the database.
1577
        case err == ErrChannelNotFound:
5✔
1578
                _, closedErr := c.FetchClosedChannel(chanPoint)
5✔
1579
                if closedErr != nil {
6✔
1580
                        return closedErr
1✔
1581
                }
1✔
1582

1583
                // If the channel was already closed, then we don't return an
1584
                // error as we'd like this step to be repeatable.
1585
                return nil
4✔
1586
        case err != nil:
×
1587
                return err
×
1588
        }
1589

1590
        // Now that we've found the channel, we'll populate a close summary for
1591
        // the channel, so we can store as much information for this abounded
1592
        // channel as possible. We also ensure that we set Pending to false, to
1593
        // indicate that this channel has been "fully" closed.
1594
        summary := &ChannelCloseSummary{
5✔
1595
                CloseType:               Abandoned,
5✔
1596
                ChanPoint:               *chanPoint,
5✔
1597
                ChainHash:               dbChan.ChainHash,
5✔
1598
                CloseHeight:             bestHeight,
5✔
1599
                RemotePub:               dbChan.IdentityPub,
5✔
1600
                Capacity:                dbChan.Capacity,
5✔
1601
                SettledBalance:          dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
5✔
1602
                ShortChanID:             dbChan.ShortChanID(),
5✔
1603
                RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
5✔
1604
                RemoteNextRevocation:    dbChan.RemoteNextRevocation,
5✔
1605
                LocalChanConfig:         dbChan.LocalChanCfg,
5✔
1606
        }
5✔
1607

5✔
1608
        // Finally, we'll close the channel in the DB, and return back to the
5✔
1609
        // caller. We set ourselves as the close initiator because we abandoned
5✔
1610
        // the channel.
5✔
1611
        return dbChan.CloseChannel(summary, ChanStatusLocalCloseInitiator)
5✔
1612
}
1613

1614
// SaveChannelOpeningState saves the serialized channel state for the provided
1615
// chanPoint to the channelOpeningStateBucket.
1616
func (c *ChannelStateDB) SaveChannelOpeningState(outPoint,
1617
        serializedState []byte) error {
95✔
1618

95✔
1619
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
190✔
1620
                bucket, err := tx.CreateTopLevelBucket(channelOpeningStateBucket)
95✔
1621
                if err != nil {
95✔
1622
                        return err
×
1623
                }
×
1624

1625
                return bucket.Put(outPoint, serializedState)
95✔
1626
        }, func() {})
95✔
1627
}
1628

1629
// GetChannelOpeningState fetches the serialized channel state for the provided
1630
// outPoint from the database, or returns ErrChannelNotFound if the channel
1631
// is not found.
1632
func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte,
1633
        error) {
255✔
1634

255✔
1635
        var serializedState []byte
255✔
1636
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
510✔
1637
                bucket := tx.ReadBucket(channelOpeningStateBucket)
255✔
1638
                if bucket == nil {
258✔
1639
                        // If the bucket does not exist, it means we never added
3✔
1640
                        //  a channel to the db, so return ErrChannelNotFound.
3✔
1641
                        return ErrChannelNotFound
3✔
1642
                }
3✔
1643

1644
                stateBytes := bucket.Get(outPoint)
255✔
1645
                if stateBytes == nil {
305✔
1646
                        return ErrChannelNotFound
50✔
1647
                }
50✔
1648

1649
                serializedState = append(serializedState, stateBytes...)
208✔
1650

208✔
1651
                return nil
208✔
1652
        }, func() {
255✔
1653
                serializedState = nil
255✔
1654
        })
255✔
1655
        return serializedState, err
255✔
1656
}
1657

1658
// DeleteChannelOpeningState removes any state for outPoint from the database.
1659
func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
27✔
1660
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
54✔
1661
                bucket := tx.ReadWriteBucket(channelOpeningStateBucket)
27✔
1662
                if bucket == nil {
27✔
1663
                        return ErrChannelNotFound
×
1664
                }
×
1665

1666
                return bucket.Delete(outPoint)
27✔
1667
        }, func() {})
27✔
1668
}
1669

1670
// syncVersions function is used for safe db version synchronization. It
1671
// applies migration functions to the current database and recovers the
1672
// previous state of db if at least one error/panic appeared during migration.
1673
func (d *DB) syncVersions(versions []mandatoryVersion) error {
1,756✔
1674
        meta, err := d.FetchMeta()
1,756✔
1675
        if err != nil {
1,756✔
1676
                if err == ErrMetaNotFound {
×
1677
                        meta = &Meta{}
×
1678
                } else {
×
1679
                        return err
×
1680
                }
×
1681
        }
1682

1683
        latestVersion := getLatestDBVersion(versions)
1,756✔
1684
        log.Infof("Checking for schema update: latest_version=%v, "+
1,756✔
1685
                "db_version=%v", latestVersion, meta.DbVersionNumber)
1,756✔
1686

1,756✔
1687
        switch {
1,756✔
1688

1689
        // If the database reports a higher version that we are aware of, the
1690
        // user is probably trying to revert to a prior version of lnd. We fail
1691
        // here to prevent reversions and unintended corruption.
1692
        case meta.DbVersionNumber > latestVersion:
1✔
1693
                log.Errorf("Refusing to revert from db_version=%d to "+
1✔
1694
                        "lower version=%d", meta.DbVersionNumber,
1✔
1695
                        latestVersion)
1✔
1696
                return ErrDBReversion
1✔
1697

1698
        // If the current database version matches the latest version number,
1699
        // then we don't need to perform any migrations.
1700
        case meta.DbVersionNumber == latestVersion:
1,751✔
1701
                return nil
1,751✔
1702
        }
1703

1704
        log.Infof("Performing database schema migration")
4✔
1705

4✔
1706
        // Otherwise, we fetch the migrations which need to applied, and
4✔
1707
        // execute them serially within a single database transaction to ensure
4✔
1708
        // the migration is atomic.
4✔
1709
        migrations, migrationVersions := getMigrationsToApply(
4✔
1710
                versions, meta.DbVersionNumber,
4✔
1711
        )
4✔
1712
        return kvdb.Update(d, func(tx kvdb.RwTx) error {
8✔
1713
                for i, migration := range migrations {
8✔
1714
                        if migration == nil {
4✔
1715
                                continue
×
1716
                        }
1717

1718
                        log.Infof("Applying migration #%v",
4✔
1719
                                migrationVersions[i])
4✔
1720

4✔
1721
                        if err := migration(tx); err != nil {
5✔
1722
                                log.Infof("Unable to apply migration #%v",
1✔
1723
                                        migrationVersions[i])
1✔
1724
                                return err
1✔
1725
                        }
1✔
1726
                }
1727

1728
                meta.DbVersionNumber = latestVersion
2✔
1729
                err := putMeta(meta, tx)
2✔
1730
                if err != nil {
2✔
1731
                        return err
×
1732
                }
×
1733

1734
                // In dry-run mode, return an error to prevent the transaction
1735
                // from committing.
1736
                if d.dryRun {
3✔
1737
                        return ErrDryRunMigrationOK
1✔
1738
                }
1✔
1739

1740
                return nil
1✔
1741
        }, func() {})
4✔
1742
}
1743

1744
// applyOptionalVersions applies the optional migrations to the database if
1745
// specified in the config.
1746
func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error {
1,754✔
1747
        // TODO(yy): need to design the db to support dry run for optional
1,754✔
1748
        // migrations.
1,754✔
1749
        if d.dryRun {
1,755✔
1750
                log.Info("Skipped optional migrations as dry run mode is not " +
1✔
1751
                        "supported yet")
1✔
1752
                return nil
1✔
1753
        }
1✔
1754

1755
        om, err := d.fetchOptionalMeta()
1,753✔
1756
        if err != nil {
1,753✔
1757
                if err == ErrMetaNotFound {
×
1758
                        om = &OptionalMeta{
×
1759
                                Versions: make(map[uint64]string),
×
1760
                        }
×
1761
                } else {
×
1762
                        return err
×
1763
                }
×
1764
        }
1765

1766
        // migrationCfg is the parent migration which implements the config
1767
        // interfaces of the single optional migrations.
1768
        migrationCfg := &MigrationConfigImpl{
1,753✔
1769
                migration30.MigrateRevLogConfigImpl{
1,753✔
1770
                        NoAmountData: d.noRevLogAmtData,
1,753✔
1771
                },
1,753✔
1772
                migration34.MigrationConfigImpl{
1,753✔
1773
                        DecayedLog: cfg.DecayedLog,
1,753✔
1774
                },
1,753✔
1775
        }
1,753✔
1776

1,753✔
1777
        log.Infof("Applying %d optional migrations", len(optionalVersions))
1,753✔
1778

1,753✔
1779
        // Apply the optional migrations if requested.
1,753✔
1780
        for number, version := range optionalVersions {
5,256✔
1781
                log.Infof("Checking for optional update: name=%v", version.name)
3,503✔
1782

3,503✔
1783
                // Exit early if the optional migration is not specified.
3,503✔
1784
                if !cfg.MigrationFlags[number] {
7,002✔
1785
                        log.Debugf("Skipping optional migration: name=%s as "+
3,499✔
1786
                                "it is not specified in the config",
3,499✔
1787
                                version.name)
3,499✔
1788

3,499✔
1789
                        continue
3,499✔
1790
                }
1791

1792
                // Exit early if the optional migration has already been
1793
                // applied.
1794
                if _, ok := om.Versions[uint64(number)]; ok {
12✔
1795
                        log.Debugf("Skipping optional migration: name=%s as "+
5✔
1796
                                "it has already been applied", version.name)
5✔
1797

5✔
1798
                        continue
5✔
1799
                }
1800

1801
                log.Infof("Performing database optional migration: %s",
5✔
1802
                        version.name)
5✔
1803

5✔
1804
                // Call the migration function for the specific optional
5✔
1805
                // migration.
5✔
1806
                if err := version.migration(d, migrationCfg); err != nil {
5✔
NEW
1807
                        log.Errorf("Unable to apply optional migration: %s, "+
×
NEW
1808
                                "error: %v", version.name, err)
×
NEW
1809
                        return err
×
NEW
1810
                }
×
1811

1812
                // Update the optional meta. Notice that unlike the mandatory db
1813
                // migrations where we perform the migration and updating meta
1814
                // in a single db transaction, we use different transactions
1815
                // here. Even when the following update is failed, we should be
1816
                // fine here as we would re-run the optional migration again,
1817
                // which is a noop, during next startup.
1818
                om.Versions[uint64(number)] = version.name
5✔
1819
                if err := d.putOptionalMeta(om); err != nil {
5✔
NEW
1820
                        log.Errorf("Unable to update optional meta: %v", err)
×
NEW
1821
                        return err
×
NEW
1822
                }
×
1823

1824
                log.Infof("Successfully applied optional migration: %s",
5✔
1825
                        version.name)
5✔
1826
        }
1827

1828
        return nil
1,753✔
1829
}
1830

1831
// ChannelStateDB returns the sub database that is concerned with the channel
1832
// state.
1833
func (d *DB) ChannelStateDB() *ChannelStateDB {
2,171✔
1834
        return d.channelStateDB
2,171✔
1835
}
2,171✔
1836

1837
// LatestDBVersion returns the number of the latest database version currently
1838
// known to the channel DB.
1839
func LatestDBVersion() uint32 {
1✔
1840
        return getLatestDBVersion(dbVersions)
1✔
1841
}
1✔
1842

1843
func getLatestDBVersion(versions []mandatoryVersion) uint32 {
3,479✔
1844
        return versions[len(versions)-1].number
3,479✔
1845
}
3,479✔
1846

1847
// getMigrationsToApply retrieves the migration function that should be
1848
// applied to the database.
1849
func getMigrationsToApply(versions []mandatoryVersion,
1850
        version uint32) ([]migration, []uint32) {
5✔
1851

5✔
1852
        migrations := make([]migration, 0, len(versions))
5✔
1853
        migrationVersions := make([]uint32, 0, len(versions))
5✔
1854

5✔
1855
        for _, v := range versions {
17✔
1856
                if v.number > version {
18✔
1857
                        migrations = append(migrations, v.migration)
6✔
1858
                        migrationVersions = append(migrationVersions, v.number)
6✔
1859
                }
6✔
1860
        }
1861

1862
        return migrations, migrationVersions
5✔
1863
}
1864

1865
// fetchHistoricalChanBucket returns a the channel bucket for a given outpoint
1866
// from the historical channel bucket. If the bucket does not exist,
1867
// ErrNoHistoricalBucket is returned.
1868
func fetchHistoricalChanBucket(tx kvdb.RTx,
1869
        outPoint *wire.OutPoint) (kvdb.RBucket, error) {
7✔
1870

7✔
1871
        // First fetch the top level bucket which stores all data related to
7✔
1872
        // historically stored channels.
7✔
1873
        historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
7✔
1874
        if historicalChanBucket == nil {
7✔
1875
                return nil, ErrNoHistoricalBucket
×
1876
        }
×
1877

1878
        // With the bucket for the node and chain fetched, we can now go down
1879
        // another level, for the channel itself.
1880
        var chanPointBuf bytes.Buffer
7✔
1881
        if err := graphdb.WriteOutpoint(&chanPointBuf, outPoint); err != nil {
7✔
1882
                return nil, err
×
1883
        }
×
1884
        chanBucket := historicalChanBucket.NestedReadBucket(
7✔
1885
                chanPointBuf.Bytes(),
7✔
1886
        )
7✔
1887
        if chanBucket == nil {
9✔
1888
                return nil, ErrChannelNotFound
2✔
1889
        }
2✔
1890

1891
        return chanBucket, nil
5✔
1892
}
1893

1894
// FetchHistoricalChannel fetches open channel data from the historical channel
1895
// bucket.
1896
func (c *ChannelStateDB) FetchHistoricalChannel(outPoint *wire.OutPoint) (
1897
        *OpenChannel, error) {
7✔
1898

7✔
1899
        var channel *OpenChannel
7✔
1900
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
14✔
1901
                chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
7✔
1902
                if err != nil {
9✔
1903
                        return err
2✔
1904
                }
2✔
1905

1906
                channel, err = fetchOpenChannel(chanBucket, outPoint)
5✔
1907
                if err != nil {
5✔
1908
                        return err
×
1909
                }
×
1910

1911
                channel.Db = c
5✔
1912
                return nil
5✔
1913
        }, func() {
7✔
1914
                channel = nil
7✔
1915
        })
7✔
1916
        if err != nil {
9✔
1917
                return nil, err
2✔
1918
        }
2✔
1919

1920
        return channel, nil
5✔
1921
}
1922

1923
func fetchFinalHtlcsBucket(tx kvdb.RTx,
1924
        chanID lnwire.ShortChannelID) (kvdb.RBucket, error) {
13✔
1925

13✔
1926
        finalHtlcsBucket := tx.ReadBucket(finalHtlcsBucket)
13✔
1927
        if finalHtlcsBucket == nil {
19✔
1928
                return nil, ErrFinalHtlcsBucketNotFound
6✔
1929
        }
6✔
1930

1931
        var chanIDBytes [8]byte
7✔
1932
        byteOrder.PutUint64(chanIDBytes[:], chanID.ToUint64())
7✔
1933

7✔
1934
        chanBucket := finalHtlcsBucket.NestedReadBucket(chanIDBytes[:])
7✔
1935
        if chanBucket == nil {
7✔
1936
                return nil, ErrFinalChannelBucketNotFound
×
1937
        }
×
1938

1939
        return chanBucket, nil
7✔
1940
}
1941

1942
var ErrHtlcUnknown = errors.New("htlc unknown")
1943

1944
// LookupFinalHtlc retrieves a final htlc resolution from the database. If the
1945
// htlc has no final resolution yet, ErrHtlcUnknown is returned.
1946
func (c *ChannelStateDB) LookupFinalHtlc(chanID lnwire.ShortChannelID,
1947
        htlcIndex uint64) (*FinalHtlcInfo, error) {
13✔
1948

13✔
1949
        var idBytes [8]byte
13✔
1950
        byteOrder.PutUint64(idBytes[:], htlcIndex)
13✔
1951

13✔
1952
        var settledByte byte
13✔
1953

13✔
1954
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
26✔
1955
                finalHtlcsBucket, err := fetchFinalHtlcsBucket(
13✔
1956
                        tx, chanID,
13✔
1957
                )
13✔
1958
                switch {
13✔
1959
                case errors.Is(err, ErrFinalHtlcsBucketNotFound):
6✔
1960
                        fallthrough
6✔
1961

1962
                case errors.Is(err, ErrFinalChannelBucketNotFound):
6✔
1963
                        return ErrHtlcUnknown
6✔
1964

1965
                case err != nil:
×
1966
                        return fmt.Errorf("cannot fetch final htlcs bucket: %w",
×
1967
                                err)
×
1968
                }
1969

1970
                value := finalHtlcsBucket.Get(idBytes[:])
7✔
1971
                if value == nil {
8✔
1972
                        return ErrHtlcUnknown
1✔
1973
                }
1✔
1974

1975
                if len(value) != 1 {
6✔
1976
                        return errors.New("unexpected final htlc value length")
×
1977
                }
×
1978

1979
                settledByte = value[0]
6✔
1980

6✔
1981
                return nil
6✔
1982
        }, func() {
13✔
1983
                settledByte = 0
13✔
1984
        })
13✔
1985
        if err != nil {
20✔
1986
                return nil, err
7✔
1987
        }
7✔
1988

1989
        info := FinalHtlcInfo{
6✔
1990
                Settled:  settledByte&byte(FinalHtlcSettledBit) != 0,
6✔
1991
                Offchain: settledByte&byte(FinalHtlcOffchainBit) != 0,
6✔
1992
        }
6✔
1993

6✔
1994
        return &info, nil
6✔
1995
}
1996

1997
// PutOnchainFinalHtlcOutcome stores the final on-chain outcome of an htlc in
1998
// the database.
1999
func (c *ChannelStateDB) PutOnchainFinalHtlcOutcome(
2000
        chanID lnwire.ShortChannelID, htlcID uint64, settled bool) error {
4✔
2001

4✔
2002
        // Skip if the user did not opt in to storing final resolutions.
4✔
2003
        if !c.parent.storeFinalHtlcResolutions {
7✔
2004
                return nil
3✔
2005
        }
3✔
2006

2007
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
2✔
2008
                finalHtlcsBucket, err := fetchFinalHtlcsBucketRw(tx, chanID)
1✔
2009
                if err != nil {
1✔
2010
                        return err
×
2011
                }
×
2012

2013
                return putFinalHtlc(
1✔
2014
                        finalHtlcsBucket, htlcID,
1✔
2015
                        FinalHtlcInfo{
1✔
2016
                                Settled:  settled,
1✔
2017
                                Offchain: false,
1✔
2018
                        },
1✔
2019
                )
1✔
2020
        }, func() {})
1✔
2021
}
2022

2023
// MakeTestInvoiceDB is used to create a test invoice database for testing
2024
// purposes. It simply calls into MakeTestDB so the same modifiers can be used.
2025
func MakeTestInvoiceDB(t *testing.T, modifiers ...OptionModifier) (
2026
        invoices.InvoiceDB, error) {
154✔
2027

154✔
2028
        return MakeTestDB(t, modifiers...)
154✔
2029
}
154✔
2030

2031
// MakeTestDB creates a new instance of the ChannelDB for testing purposes.
2032
// A callback which cleans up the created temporary directories is also
2033
// returned and intended to be executed after the test completes.
2034
func MakeTestDB(t *testing.T, modifiers ...OptionModifier) (*DB, error) {
289✔
2035
        // First, create a temporary directory to be used for the duration of
289✔
2036
        // this test.
289✔
2037
        tempDirName := t.TempDir()
289✔
2038

289✔
2039
        // Next, create channeldb for the first time.
289✔
2040
        backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cdb")
289✔
2041
        if err != nil {
289✔
2042
                backendCleanup()
×
2043
                return nil, err
×
2044
        }
×
2045

2046
        cdb, err := CreateWithBackend(backend, modifiers...)
289✔
2047
        if err != nil {
289✔
2048
                backendCleanup()
×
2049
                return nil, err
×
2050
        }
×
2051

2052
        t.Cleanup(func() {
578✔
2053
                cdb.Close()
289✔
2054
                backendCleanup()
289✔
2055
        })
289✔
2056

2057
        return cdb, nil
289✔
2058
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc