• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15858991938

24 Jun 2025 06:51PM UTC coverage: 55.808% (-2.4%) from 58.173%
15858991938

Pull #9148

github

web-flow
Merge 0e921d6a5 into 29ff13d83
Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*

232 of 267 new or added lines in 5 files covered. (86.89%)

24606 existing lines in 281 files now uncovered.

108380 of 194201 relevant lines covered (55.81%)

22488.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.29
/channeldb/db.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/binary"
7
        "fmt"
8
        "net"
9
        "os"
10
        "testing"
11

12
        "github.com/btcsuite/btcd/btcec/v2"
13
        "github.com/btcsuite/btcd/wire"
14
        "github.com/btcsuite/btcwallet/walletdb"
15
        "github.com/go-errors/errors"
16
        mig "github.com/lightningnetwork/lnd/channeldb/migration"
17
        "github.com/lightningnetwork/lnd/channeldb/migration12"
18
        "github.com/lightningnetwork/lnd/channeldb/migration13"
19
        "github.com/lightningnetwork/lnd/channeldb/migration16"
20
        "github.com/lightningnetwork/lnd/channeldb/migration20"
21
        "github.com/lightningnetwork/lnd/channeldb/migration21"
22
        "github.com/lightningnetwork/lnd/channeldb/migration23"
23
        "github.com/lightningnetwork/lnd/channeldb/migration24"
24
        "github.com/lightningnetwork/lnd/channeldb/migration25"
25
        "github.com/lightningnetwork/lnd/channeldb/migration26"
26
        "github.com/lightningnetwork/lnd/channeldb/migration27"
27
        "github.com/lightningnetwork/lnd/channeldb/migration29"
28
        "github.com/lightningnetwork/lnd/channeldb/migration30"
29
        "github.com/lightningnetwork/lnd/channeldb/migration31"
30
        "github.com/lightningnetwork/lnd/channeldb/migration32"
31
        "github.com/lightningnetwork/lnd/channeldb/migration33"
32
        "github.com/lightningnetwork/lnd/channeldb/migration34"
33
        "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
34
        "github.com/lightningnetwork/lnd/clock"
35
        graphdb "github.com/lightningnetwork/lnd/graph/db"
36
        "github.com/lightningnetwork/lnd/invoices"
37
        "github.com/lightningnetwork/lnd/kvdb"
38
        "github.com/lightningnetwork/lnd/lnwire"
39
        "github.com/stretchr/testify/require"
40
)
41

42
const (
43
        dbName = "channel.db"
44
)
45

46
var (
47
        // ErrDryRunMigrationOK signals that a migration executed successful,
48
        // but we intentionally did not commit the result.
49
        ErrDryRunMigrationOK = errors.New("dry run migration successful")
50

51
        // ErrFinalHtlcsBucketNotFound signals that the top-level final htlcs
52
        // bucket does not exist.
53
        ErrFinalHtlcsBucketNotFound = errors.New("final htlcs bucket not " +
54
                "found")
55

56
        // ErrFinalChannelBucketNotFound signals that the channel bucket for
57
        // final htlc outcomes does not exist.
58
        ErrFinalChannelBucketNotFound = errors.New("final htlcs channel " +
59
                "bucket not found")
60
)
61

62
// migration is a function which takes a prior outdated version of the database
63
// instances and mutates the key/bucket structure to arrive at a more
64
// up-to-date version of the database.
65
type migration func(tx kvdb.RwTx) error
66

67
// mandatoryVersion defines a db version that must be applied before the lnd
68
// starts.
69
type mandatoryVersion struct {
70
        number    uint32
71
        migration migration
72
}
73

74
// MigrationConfig is an interface combines the config interfaces of all
75
// optional migrations.
76
type MigrationConfig interface {
77
        migration30.MigrateRevLogConfig
78
        migration34.MigrationConfig
79
}
80

81
// MigrationConfigImpl is a super set of all the various migration configs and
82
// an implementation of MigrationConfig.
83
type MigrationConfigImpl struct {
84
        migration30.MigrateRevLogConfigImpl
85
        migration34.MigrationConfigImpl
86
}
87

88
// optionalMigration defines an optional migration function. When a migration
89
// is optional, it usually involves a large scale of changes that might touch
90
// millions of keys. Due to OOM concern, the update cannot be safely done
91
// within one db transaction. Thus, for optional migrations, they must take the
92
// db backend and construct transactions as needed.
93
type optionalMigration func(db kvdb.Backend, cfg MigrationConfig) error
94

95
// optionalVersion defines a db version that can be optionally applied. When
96
// applying migrations, we must apply all the mandatory migrations first before
97
// attempting optional ones.
98
type optionalVersion struct {
99
        name      string
100
        migration optionalMigration
101
}
102

103
var (
104
        // dbVersions is storing all mandatory versions of database. If current
105
        // version of database don't match with latest version this list will
106
        // be used for retrieving all migration function that are need to apply
107
        // to the current db.
108
        dbVersions = []mandatoryVersion{
109
                {
110
                        // The base DB version requires no migration.
111
                        number:    0,
112
                        migration: nil,
113
                },
114
                {
115
                        // The version of the database where two new indexes
116
                        // for the update time of node and channel updates were
117
                        // added.
118
                        number:    1,
119
                        migration: migration_01_to_11.MigrateNodeAndEdgeUpdateIndex,
120
                },
121
                {
122
                        // The DB version that added the invoice event time
123
                        // series.
124
                        number:    2,
125
                        migration: migration_01_to_11.MigrateInvoiceTimeSeries,
126
                },
127
                {
128
                        // The DB version that updated the embedded invoice in
129
                        // outgoing payments to match the new format.
130
                        number:    3,
131
                        migration: migration_01_to_11.MigrateInvoiceTimeSeriesOutgoingPayments,
132
                },
133
                {
134
                        // The version of the database where every channel
135
                        // always has two entries in the edges bucket. If
136
                        // a policy is unknown, this will be represented
137
                        // by a special byte sequence.
138
                        number:    4,
139
                        migration: migration_01_to_11.MigrateEdgePolicies,
140
                },
141
                {
142
                        // The DB version where we persist each attempt to send
143
                        // an HTLC to a payment hash, and track whether the
144
                        // payment is in-flight, succeeded, or failed.
145
                        number:    5,
146
                        migration: migration_01_to_11.PaymentStatusesMigration,
147
                },
148
                {
149
                        // The DB version that properly prunes stale entries
150
                        // from the edge update index.
151
                        number:    6,
152
                        migration: migration_01_to_11.MigratePruneEdgeUpdateIndex,
153
                },
154
                {
155
                        // The DB version that migrates the ChannelCloseSummary
156
                        // to a format where optional fields are indicated with
157
                        // boolean flags.
158
                        number:    7,
159
                        migration: migration_01_to_11.MigrateOptionalChannelCloseSummaryFields,
160
                },
161
                {
162
                        // The DB version that changes the gossiper's message
163
                        // store keys to account for the message's type and
164
                        // ShortChannelID.
165
                        number:    8,
166
                        migration: migration_01_to_11.MigrateGossipMessageStoreKeys,
167
                },
168
                {
169
                        // The DB version where the payments and payment
170
                        // statuses are moved to being stored in a combined
171
                        // bucket.
172
                        number:    9,
173
                        migration: migration_01_to_11.MigrateOutgoingPayments,
174
                },
175
                {
176
                        // The DB version where we started to store legacy
177
                        // payload information for all routes, as well as the
178
                        // optional TLV records.
179
                        number:    10,
180
                        migration: migration_01_to_11.MigrateRouteSerialization,
181
                },
182
                {
183
                        // Add invoice htlc and cltv delta fields.
184
                        number:    11,
185
                        migration: migration_01_to_11.MigrateInvoices,
186
                },
187
                {
188
                        // Migrate to TLV invoice bodies, add payment address
189
                        // and features, remove receipt.
190
                        number:    12,
191
                        migration: migration12.MigrateInvoiceTLV,
192
                },
193
                {
194
                        // Migrate to multi-path payments.
195
                        number:    13,
196
                        migration: migration13.MigrateMPP,
197
                },
198
                {
199
                        // Initialize payment address index and begin using it
200
                        // as the default index, falling back to payment hash
201
                        // index.
202
                        number:    14,
203
                        migration: mig.CreateTLB(payAddrIndexBucket),
204
                },
205
                {
206
                        // Initialize payment index bucket which will be used
207
                        // to index payments by sequence number. This index will
208
                        // be used to allow more efficient ListPayments queries.
209
                        number:    15,
210
                        migration: mig.CreateTLB(paymentsIndexBucket),
211
                },
212
                {
213
                        // Add our existing payments to the index bucket created
214
                        // in migration 15.
215
                        number:    16,
216
                        migration: migration16.MigrateSequenceIndex,
217
                },
218
                {
219
                        // Create a top level bucket which will store extra
220
                        // information about channel closes.
221
                        number:    17,
222
                        migration: mig.CreateTLB(closeSummaryBucket),
223
                },
224
                {
225
                        // Create a top level bucket which holds information
226
                        // about our peers.
227
                        number:    18,
228
                        migration: mig.CreateTLB(peersBucket),
229
                },
230
                {
231
                        // Create a top level bucket which holds outpoint
232
                        // information.
233
                        number:    19,
234
                        migration: mig.CreateTLB(outpointBucket),
235
                },
236
                {
237
                        // Migrate some data to the outpoint index.
238
                        number:    20,
239
                        migration: migration20.MigrateOutpointIndex,
240
                },
241
                {
242
                        // Migrate to length prefixed wire messages everywhere
243
                        // in the database.
244
                        number:    21,
245
                        migration: migration21.MigrateDatabaseWireMessages,
246
                },
247
                {
248
                        // Initialize set id index so that invoices can be
249
                        // queried by individual htlc sets.
250
                        number:    22,
251
                        migration: mig.CreateTLB(setIDIndexBucket),
252
                },
253
                {
254
                        number:    23,
255
                        migration: migration23.MigrateHtlcAttempts,
256
                },
257
                {
258
                        // Remove old forwarding packages of closed channels.
259
                        number:    24,
260
                        migration: migration24.MigrateFwdPkgCleanup,
261
                },
262
                {
263
                        // Save the initial local/remote balances in channel
264
                        // info.
265
                        number:    25,
266
                        migration: migration25.MigrateInitialBalances,
267
                },
268
                {
269
                        // Migrate the initial local/remote balance fields into
270
                        // tlv records.
271
                        number:    26,
272
                        migration: migration26.MigrateBalancesToTlvRecords,
273
                },
274
                {
275
                        // Patch the initial local/remote balance fields with
276
                        // empty values for historical channels.
277
                        number:    27,
278
                        migration: migration27.MigrateHistoricalBalances,
279
                },
280
                {
281
                        number:    28,
282
                        migration: mig.CreateTLB(chanIDBucket),
283
                },
284
                {
285
                        number:    29,
286
                        migration: migration29.MigrateChanID,
287
                },
288
                {
289
                        // Removes the "sweeper-last-tx" bucket. Although we
290
                        // do not have a mandatory version 30 we skip this
291
                        // version because its naming is already used for the
292
                        // first optional migration.
293
                        number:    31,
294
                        migration: migration31.DeleteLastPublishedTxTLB,
295
                },
296
                {
297
                        number:    32,
298
                        migration: migration32.MigrateMCRouteSerialisation,
299
                },
300
                {
301
                        number:    33,
302
                        migration: migration33.MigrateMCStoreNameSpacedResults,
303
                },
304
        }
305

306
        // optionalVersions stores all optional migrations that are applied
307
        // after dbVersions.
308
        //
309
        // NOTE: optional migrations must be fault-tolerant and re-run already
310
        // migrated data must be noop, which means the migration must be able
311
        // to determine its state.
312
        optionalVersions = []optionalVersion{
313
                {
314
                        name: "prune_revocation_log",
315
                        migration: func(db kvdb.Backend,
316
                                cfg MigrationConfig) error {
×
317

×
318
                                return migration30.MigrateRevocationLog(db, cfg)
×
319
                        },
×
320
                },
321
                {
322
                        name: "gc_decayed_log",
323
                        migration: func(db kvdb.Backend,
UNCOV
324
                                cfg MigrationConfig) error {
×
UNCOV
325

×
UNCOV
326
                                return migration34.MigrateDecayedLog(
×
UNCOV
327
                                        db, cfg,
×
UNCOV
328
                                )
×
UNCOV
329
                        },
×
330
                },
331
        }
332

333
        // Big endian is the preferred byte order, due to cursor scans over
334
        // integer keys iterating in order.
335
        byteOrder = binary.BigEndian
336

337
        // channelOpeningStateBucket is the database bucket used to store the
338
        // channelOpeningState for each channel that is currently in the process
339
        // of being opened.
340
        channelOpeningStateBucket = []byte("channelOpeningState")
341
)
342

343
// DB is the primary datastore for the lnd daemon. The database stores
344
// information related to nodes, routing data, open/closed channels, fee
345
// schedules, and reputation data.
346
type DB struct {
347
        kvdb.Backend
348

349
        // channelStateDB separates all DB operations on channel state.
350
        channelStateDB *ChannelStateDB
351

352
        dbPath                    string
353
        clock                     clock.Clock
354
        dryRun                    bool
355
        keepFailedPaymentAttempts bool
356
        storeFinalHtlcResolutions bool
357

358
        // noRevLogAmtData if true, means that commitment transaction amount
359
        // data should not be stored in the revocation log.
360
        noRevLogAmtData bool
361
}
362

363
// OpenForTesting opens or creates a channeldb to be used for tests. Any
364
// necessary schemas migrations due to updates will take place as necessary.
365
func OpenForTesting(t testing.TB, dbPath string,
366
        modifiers ...OptionModifier) *DB {
1,456✔
367

1,456✔
368
        backend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
1,456✔
369
                DBPath:            dbPath,
1,456✔
370
                DBFileName:        dbName,
1,456✔
371
                NoFreelistSync:    true,
1,456✔
372
                AutoCompact:       false,
1,456✔
373
                AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
1,456✔
374
                DBTimeout:         kvdb.DefaultDBTimeout,
1,456✔
375
        })
1,456✔
376
        require.NoError(t, err)
1,456✔
377

1,456✔
378
        db, err := CreateWithBackend(backend, modifiers...)
1,456✔
379
        require.NoError(t, err)
1,456✔
380

1,456✔
381
        db.dbPath = dbPath
1,456✔
382

1,456✔
383
        t.Cleanup(func() {
2,912✔
384
                require.NoError(t, db.Close())
1,456✔
385
        })
1,456✔
386

387
        return db
1,456✔
388
}
389

390
// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
391
// Any necessary schemas migrations due to updates will take place as necessary.
392
func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
393
        error) {
1,751✔
394

1,751✔
395
        opts := DefaultOptions()
1,751✔
396
        for _, modifier := range modifiers {
1,928✔
397
                modifier(&opts)
177✔
398
        }
177✔
399

400
        if !opts.NoMigration {
3,502✔
401
                if err := initChannelDB(backend); err != nil {
1,752✔
402
                        return nil, err
1✔
403
                }
1✔
404
        }
405

406
        chanDB := &DB{
1,750✔
407
                Backend: backend,
1,750✔
408
                channelStateDB: &ChannelStateDB{
1,750✔
409
                        linkNodeDB: &LinkNodeDB{
1,750✔
410
                                backend: backend,
1,750✔
411
                        },
1,750✔
412
                        backend: backend,
1,750✔
413
                },
1,750✔
414
                clock:                     opts.clock,
1,750✔
415
                dryRun:                    opts.dryRun,
1,750✔
416
                keepFailedPaymentAttempts: opts.keepFailedPaymentAttempts,
1,750✔
417
                storeFinalHtlcResolutions: opts.storeFinalHtlcResolutions,
1,750✔
418
                noRevLogAmtData:           opts.NoRevLogAmtData,
1,750✔
419
        }
1,750✔
420

1,750✔
421
        // Set the parent pointer (only used in tests).
1,750✔
422
        chanDB.channelStateDB.parent = chanDB
1,750✔
423

1,750✔
424
        // Synchronize the version of database and apply migrations if needed.
1,750✔
425
        if !opts.NoMigration {
3,500✔
426
                if err := chanDB.syncVersions(dbVersions); err != nil {
1,751✔
427
                        backend.Close()
1✔
428
                        return nil, err
1✔
429
                }
1✔
430

431
                // Grab the optional migration config.
432
                omc := opts.OptionalMiragtionConfig
1,749✔
433
                if err := chanDB.applyOptionalVersions(omc); err != nil {
1,749✔
434
                        backend.Close()
×
435
                        return nil, err
×
436
                }
×
437
        }
438

439
        return chanDB, nil
1,749✔
440
}
441

442
// Path returns the file path to the channel database.
443
func (d *DB) Path() string {
197✔
444
        return d.dbPath
197✔
445
}
197✔
446

447
var dbTopLevelBuckets = [][]byte{
448
        openChannelBucket,
449
        closedChannelBucket,
450
        forwardingLogBucket,
451
        fwdPackagesKey,
452
        invoiceBucket,
453
        payAddrIndexBucket,
454
        setIDIndexBucket,
455
        paymentsIndexBucket,
456
        peersBucket,
457
        nodeInfoBucket,
458
        metaBucket,
459
        closeSummaryBucket,
460
        outpointBucket,
461
        chanIDBucket,
462
        historicalChannelBucket,
463
}
464

465
// Wipe completely deletes all saved state within all used buckets within the
466
// database. The deletion is done in a single transaction, therefore this
467
// operation is fully atomic.
468
func (d *DB) Wipe() error {
177✔
469
        err := kvdb.Update(d, func(tx kvdb.RwTx) error {
354✔
470
                for _, tlb := range dbTopLevelBuckets {
2,832✔
471
                        err := tx.DeleteTopLevelBucket(tlb)
2,655✔
472
                        if err != nil && err != kvdb.ErrBucketNotFound {
2,655✔
473
                                return err
×
474
                        }
×
475
                }
476
                return nil
177✔
477
        }, func() {})
177✔
478
        if err != nil {
177✔
479
                return err
×
480
        }
×
481

482
        return initChannelDB(d.Backend)
177✔
483
}
484

485
// initChannelDB creates and initializes a fresh version of channeldb. In the
486
// case that the target path has not yet been created or doesn't yet exist, then
487
// the path is created. Additionally, all required top-level buckets used within
488
// the database are created.
489
func initChannelDB(db kvdb.Backend) error {
1,928✔
490
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
3,856✔
491
                // Check if DB was marked as inactive with a tomb stone.
1,928✔
492
                if err := EnsureNoTombstone(tx); err != nil {
1,929✔
493
                        return err
1✔
494
                }
1✔
495

496
                for _, tlb := range dbTopLevelBuckets {
30,832✔
497
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
28,905✔
498
                                return err
×
499
                        }
×
500
                }
501

502
                meta := &Meta{}
1,927✔
503
                // Check if DB is already initialized.
1,927✔
504
                err := FetchMeta(meta, tx)
1,927✔
505
                if err == nil {
3,854✔
506
                        return nil
1,927✔
507
                }
1,927✔
508

509
                meta.DbVersionNumber = getLatestDBVersion(dbVersions)
×
510
                return putMeta(meta, tx)
×
511
        }, func() {})
1,928✔
512
        if err != nil {
1,929✔
513
                return fmt.Errorf("unable to create new channeldb: %w", err)
1✔
514
        }
1✔
515

516
        return nil
1,927✔
517
}
518

519
// fileExists returns true if the file exists, and false otherwise.
520
func fileExists(path string) bool {
1✔
521
        if _, err := os.Stat(path); err != nil {
1✔
522
                if os.IsNotExist(err) {
×
523
                        return false
×
524
                }
×
525
        }
526

527
        return true
1✔
528
}
529

530
// ChannelStateDB is a database that keeps track of all channel state.
531
type ChannelStateDB struct {
532
        // linkNodeDB separates all DB operations on LinkNodes.
533
        linkNodeDB *LinkNodeDB
534

535
        // parent holds a pointer to the "main" channeldb.DB object. This is
536
        // only used for testing and should never be used in production code.
537
        // For testing use the ChannelStateDB.GetParentDB() function to retrieve
538
        // this pointer.
539
        parent *DB
540

541
        // backend points to the actual backend holding the channel state
542
        // database. This may be a real backend or a cache middleware.
543
        backend kvdb.Backend
544
}
545

546
// GetParentDB returns the "main" channeldb.DB object that is the owner of this
547
// ChannelStateDB instance. Use this function only in tests where passing around
548
// pointers makes testing less readable. Never to be used in production code!
549
func (c *ChannelStateDB) GetParentDB() *DB {
517✔
550
        return c.parent
517✔
551
}
517✔
552

553
// LinkNodeDB returns the current instance of the link node database.
UNCOV
554
func (c *ChannelStateDB) LinkNodeDB() *LinkNodeDB {
×
UNCOV
555
        return c.linkNodeDB
×
UNCOV
556
}
×
557

558
// FetchOpenChannels starts a new database transaction and returns all stored
559
// currently active/open channels associated with the target nodeID. In the case
560
// that no active channels are known to have been created with this node, then a
561
// zero-length slice is returned.
562
func (c *ChannelStateDB) FetchOpenChannels(nodeID *btcec.PublicKey) (
563
        []*OpenChannel, error) {
254✔
564

254✔
565
        var channels []*OpenChannel
254✔
566
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
508✔
567
                var err error
254✔
568
                channels, err = c.fetchOpenChannels(tx, nodeID)
254✔
569
                return err
254✔
570
        }, func() {
508✔
571
                channels = nil
254✔
572
        })
254✔
573

574
        return channels, err
254✔
575
}
576

577
// fetchOpenChannels uses and existing database transaction and returns all
578
// stored currently active/open channels associated with the target nodeID. In
579
// the case that no active channels are known to have been created with this
580
// node, then a zero-length slice is returned.
581
func (c *ChannelStateDB) fetchOpenChannels(tx kvdb.RTx,
582
        nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
261✔
583

261✔
584
        // Get the bucket dedicated to storing the metadata for open channels.
261✔
585
        openChanBucket := tx.ReadBucket(openChannelBucket)
261✔
586
        if openChanBucket == nil {
261✔
587
                return nil, nil
×
588
        }
×
589

590
        // Within this top level bucket, fetch the bucket dedicated to storing
591
        // open channel data specific to the remote node.
592
        pub := nodeID.SerializeCompressed()
261✔
593
        nodeChanBucket := openChanBucket.NestedReadBucket(pub)
261✔
594
        if nodeChanBucket == nil {
313✔
595
                return nil, nil
52✔
596
        }
52✔
597

598
        // Next, we'll need to go down an additional layer in order to retrieve
599
        // the channels for each chain the node knows of.
600
        var channels []*OpenChannel
209✔
601
        err := nodeChanBucket.ForEach(func(chainHash, v []byte) error {
418✔
602
                // If there's a value, it's not a bucket so ignore it.
209✔
603
                if v != nil {
209✔
604
                        return nil
×
605
                }
×
606

607
                // If we've found a valid chainhash bucket, then we'll retrieve
608
                // that so we can extract all the channels.
609
                chainBucket := nodeChanBucket.NestedReadBucket(chainHash)
209✔
610
                if chainBucket == nil {
209✔
611
                        return fmt.Errorf("unable to read bucket for chain=%x",
×
612
                                chainHash[:])
×
613
                }
×
614

615
                // Finally, we both of the necessary buckets retrieved, fetch
616
                // all the active channels related to this node.
617
                nodeChannels, err := c.fetchNodeChannels(chainBucket)
209✔
618
                if err != nil {
209✔
619
                        return fmt.Errorf("unable to read channel for "+
×
620
                                "chain_hash=%x, node_key=%x: %v",
×
621
                                chainHash[:], pub, err)
×
622
                }
×
623

624
                channels = append(channels, nodeChannels...)
209✔
625
                return nil
209✔
626
        })
627

628
        return channels, err
209✔
629
}
630

631
// fetchNodeChannels retrieves all active channels from the target chainBucket
632
// which is under a node's dedicated channel bucket. This function is typically
633
// used to fetch all the active channels related to a particular node.
634
func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) (
635
        []*OpenChannel, error) {
715✔
636

715✔
637
        var channels []*OpenChannel
715✔
638

715✔
639
        // A node may have channels on several chains, so for each known chain,
715✔
640
        // we'll extract all the channels.
715✔
641
        err := chainBucket.ForEach(func(chanPoint, v []byte) error {
1,491✔
642
                // If there's a value, it's not a bucket so ignore it.
776✔
643
                if v != nil {
776✔
644
                        return nil
×
645
                }
×
646

647
                // Once we've found a valid channel bucket, we'll extract it
648
                // from the node's chain bucket.
649
                chanBucket := chainBucket.NestedReadBucket(chanPoint)
776✔
650

776✔
651
                var outPoint wire.OutPoint
776✔
652
                err := graphdb.ReadOutpoint(
776✔
653
                        bytes.NewReader(chanPoint), &outPoint,
776✔
654
                )
776✔
655
                if err != nil {
776✔
656
                        return err
×
657
                }
×
658
                oChannel, err := fetchOpenChannel(chanBucket, &outPoint)
776✔
659
                if err != nil {
776✔
660
                        return fmt.Errorf("unable to read channel data for "+
×
661
                                "chan_point=%v: %w", outPoint, err)
×
662
                }
×
663
                oChannel.Db = c
776✔
664

776✔
665
                channels = append(channels, oChannel)
776✔
666

776✔
667
                return nil
776✔
668
        })
669
        if err != nil {
715✔
670
                return nil, err
×
671
        }
×
672

673
        return channels, nil
715✔
674
}
675

676
// FetchChannel attempts to locate a channel specified by the passed channel
677
// point. If the channel cannot be found, then an error will be returned.
678
func (c *ChannelStateDB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel,
679
        error) {
10✔
680

10✔
681
        var targetChanPoint bytes.Buffer
10✔
682
        err := graphdb.WriteOutpoint(&targetChanPoint, &chanPoint)
10✔
683
        if err != nil {
10✔
684
                return nil, err
×
685
        }
×
686

687
        targetChanPointBytes := targetChanPoint.Bytes()
10✔
688
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
10✔
689
                error) {
20✔
690

10✔
691
                return targetChanPointBytes, &chanPoint, nil
10✔
692
        }
10✔
693

694
        return c.channelScanner(nil, selector)
10✔
695
}
696

697
// FetchChannelByID attempts to locate a channel specified by the passed channel
698
// ID. If the channel cannot be found, then an error will be returned.
699
// Optionally an existing db tx can be supplied.
700
func (c *ChannelStateDB) FetchChannelByID(tx kvdb.RTx, id lnwire.ChannelID) (
701
        *OpenChannel, error) {
2✔
702

2✔
703
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
2✔
704
                error) {
4✔
705

2✔
706
                var (
2✔
707
                        targetChanPointBytes []byte
2✔
708
                        targetChanPoint      *wire.OutPoint
2✔
709

2✔
710
                        // errChanFound is used to signal that the channel has
2✔
711
                        // been found so that iteration through the DB buckets
2✔
712
                        // can stop.
2✔
713
                        errChanFound = errors.New("channel found")
2✔
714
                )
2✔
715
                err := chainBkt.ForEach(func(k, _ []byte) error {
4✔
716
                        var outPoint wire.OutPoint
2✔
717
                        err := graphdb.ReadOutpoint(
2✔
718
                                bytes.NewReader(k), &outPoint,
2✔
719
                        )
2✔
720
                        if err != nil {
2✔
721
                                return err
×
722
                        }
×
723

724
                        chanID := lnwire.NewChanIDFromOutPoint(outPoint)
2✔
725
                        if chanID != id {
3✔
726
                                return nil
1✔
727
                        }
1✔
728

729
                        targetChanPoint = &outPoint
1✔
730
                        targetChanPointBytes = k
1✔
731

1✔
732
                        return errChanFound
1✔
733
                })
734
                if err != nil && !errors.Is(err, errChanFound) {
2✔
735
                        return nil, nil, err
×
736
                }
×
737
                if targetChanPoint == nil {
3✔
738
                        return nil, nil, ErrChannelNotFound
1✔
739
                }
1✔
740

741
                return targetChanPointBytes, targetChanPoint, nil
1✔
742
        }
743

744
        return c.channelScanner(tx, selector)
2✔
745
}
746

747
// ChanCount is used by the server in determining access control.
748
type ChanCount struct {
749
        HasOpenOrClosedChan bool
750
        PendingOpenCount    uint64
751
}
752

753
// FetchPermAndTempPeers returns a map where the key is the remote node's
754
// public key and the value is a struct that has a tally of the pending-open
755
// channels and whether the peer has an open or closed channel with us.
756
func (c *ChannelStateDB) FetchPermAndTempPeers(
757
        chainHash []byte) (map[string]ChanCount, error) {
1✔
758

1✔
759
        peerChanInfo := make(map[string]ChanCount)
1✔
760

1✔
761
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
2✔
762
                openChanBucket := tx.ReadBucket(openChannelBucket)
1✔
763
                if openChanBucket == nil {
1✔
764
                        return ErrNoChanDBExists
×
765
                }
×
766

767
                openChanErr := openChanBucket.ForEach(func(nodePub,
1✔
768
                        v []byte) error {
4✔
769

3✔
770
                        // If there is a value, this is not a bucket.
3✔
771
                        if v != nil {
3✔
772
                                return nil
×
773
                        }
×
774

775
                        nodeChanBucket := openChanBucket.NestedReadBucket(
3✔
776
                                nodePub,
3✔
777
                        )
3✔
778
                        if nodeChanBucket == nil {
3✔
779
                                return nil
×
780
                        }
×
781

782
                        chainBucket := nodeChanBucket.NestedReadBucket(
3✔
783
                                chainHash,
3✔
784
                        )
3✔
785
                        if chainBucket == nil {
3✔
786
                                return fmt.Errorf("no chain bucket exists")
×
787
                        }
×
788

789
                        var isPermPeer bool
3✔
790
                        var pendingOpenCount uint64
3✔
791

3✔
792
                        internalErr := chainBucket.ForEach(func(chanPoint,
3✔
793
                                val []byte) error {
5✔
794

2✔
795
                                // If there is a value, this is not a bucket.
2✔
796
                                if val != nil {
2✔
797
                                        return nil
×
798
                                }
×
799

800
                                chanBucket := chainBucket.NestedReadBucket(
2✔
801
                                        chanPoint,
2✔
802
                                )
2✔
803
                                if chanBucket == nil {
2✔
804
                                        return nil
×
805
                                }
×
806

807
                                var op wire.OutPoint
2✔
808
                                readErr := graphdb.ReadOutpoint(
2✔
809
                                        bytes.NewReader(chanPoint), &op,
2✔
810
                                )
2✔
811
                                if readErr != nil {
2✔
812
                                        return readErr
×
813
                                }
×
814

815
                                // We need to go through each channel and look
816
                                // at the IsPending status.
817
                                openChan, err := fetchOpenChannel(
2✔
818
                                        chanBucket, &op,
2✔
819
                                )
2✔
820
                                if err != nil {
2✔
821
                                        return err
×
822
                                }
×
823

824
                                if openChan.IsPending {
3✔
825
                                        // Add to the pending-open count since
1✔
826
                                        // this is a temp peer.
1✔
827
                                        pendingOpenCount++
1✔
828
                                        return nil
1✔
829
                                }
1✔
830

831
                                // Since IsPending is false, this is a perm
832
                                // peer.
833
                                isPermPeer = true
1✔
834

1✔
835
                                return nil
1✔
836
                        })
837
                        if internalErr != nil {
3✔
838
                                return internalErr
×
839
                        }
×
840

841
                        peerCount := ChanCount{
3✔
842
                                HasOpenOrClosedChan: isPermPeer,
3✔
843
                                PendingOpenCount:    pendingOpenCount,
3✔
844
                        }
3✔
845
                        peerChanInfo[string(nodePub)] = peerCount
3✔
846

3✔
847
                        return nil
3✔
848
                })
849
                if openChanErr != nil {
1✔
850
                        return openChanErr
×
851
                }
×
852

853
                // Now check the closed channel bucket.
854
                historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
1✔
855
                if historicalChanBucket == nil {
1✔
856
                        return ErrNoHistoricalBucket
×
857
                }
×
858

859
                historicalErr := historicalChanBucket.ForEach(func(chanPoint,
1✔
860
                        v []byte) error {
2✔
861
                        // Parse each nested bucket and the chanInfoKey to get
1✔
862
                        // the IsPending bool. This determines whether the
1✔
863
                        // peer is protected or not.
1✔
864
                        if v != nil {
1✔
865
                                // This is not a bucket. This is currently not
×
866
                                // possible.
×
867
                                return nil
×
868
                        }
×
869

870
                        chanBucket := historicalChanBucket.NestedReadBucket(
1✔
871
                                chanPoint,
1✔
872
                        )
1✔
873
                        if chanBucket == nil {
1✔
874
                                // This is not possible.
×
875
                                return fmt.Errorf("no historical channel " +
×
876
                                        "bucket exists")
×
877
                        }
×
878

879
                        var op wire.OutPoint
1✔
880
                        readErr := graphdb.ReadOutpoint(
1✔
881
                                bytes.NewReader(chanPoint), &op,
1✔
882
                        )
1✔
883
                        if readErr != nil {
1✔
884
                                return readErr
×
885
                        }
×
886

887
                        // This channel is closed, but the structure of the
888
                        // historical bucket is the same. This is by design,
889
                        // which means we can call fetchOpenChannel.
890
                        channel, fetchErr := fetchOpenChannel(chanBucket, &op)
1✔
891
                        if fetchErr != nil {
1✔
892
                                return fetchErr
×
893
                        }
×
894

895
                        // Only include this peer in the protected class if
896
                        // the closing transaction confirmed. Note that
897
                        // CloseChannel can be called in the funding manager
898
                        // while IsPending is true which is why we need this
899
                        // special-casing to not count premature funding
900
                        // manager calls to CloseChannel.
901
                        if !channel.IsPending {
2✔
902
                                // Fetch the public key of the remote node. We
1✔
903
                                // need to use the string-ified serialized,
1✔
904
                                // compressed bytes as the key.
1✔
905
                                remotePub := channel.IdentityPub
1✔
906
                                remoteSer := remotePub.SerializeCompressed()
1✔
907
                                remoteKey := string(remoteSer)
1✔
908

1✔
909
                                count, exists := peerChanInfo[remoteKey]
1✔
910
                                if exists {
2✔
911
                                        count.HasOpenOrClosedChan = true
1✔
912
                                        peerChanInfo[remoteKey] = count
1✔
913
                                } else {
1✔
914
                                        peerCount := ChanCount{
×
915
                                                HasOpenOrClosedChan: true,
×
916
                                        }
×
917
                                        peerChanInfo[remoteKey] = peerCount
×
918
                                }
×
919
                        }
920

921
                        return nil
1✔
922
                })
923
                if historicalErr != nil {
1✔
924
                        return historicalErr
×
925
                }
×
926

927
                return nil
1✔
928
        }, func() {
1✔
929
                clear(peerChanInfo)
1✔
930
        })
1✔
931

932
        return peerChanInfo, err
1✔
933
}
934

935
// channelSelector describes a function that takes a chain-hash bucket from
936
// within the open-channel DB and returns the wanted channel point bytes, and
937
// channel point. It must return the ErrChannelNotFound error if the wanted
938
// channel is not in the given bucket.
939
type channelSelector func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
940
        error)
941

942
// channelScanner will traverse the DB to each chain-hash bucket of each node
943
// pub-key bucket in the open-channel-bucket. The chanSelector will then be used
944
// to fetch the wanted channel outpoint from the chain bucket.
945
func (c *ChannelStateDB) channelScanner(tx kvdb.RTx,
946
        chanSelect channelSelector) (*OpenChannel, error) {
12✔
947

12✔
948
        var (
12✔
949
                targetChan *OpenChannel
12✔
950

12✔
951
                // errChanFound is used to signal that the channel has been
12✔
952
                // found so that iteration through the DB buckets can stop.
12✔
953
                errChanFound = errors.New("channel found")
12✔
954
        )
12✔
955

12✔
956
        // chanScan will traverse the following bucket structure:
12✔
957
        //  * nodePub => chainHash => chanPoint
12✔
958
        //
12✔
959
        // At each level we go one further, ensuring that we're traversing the
12✔
960
        // proper key (that's actually a bucket). By only reading the bucket
12✔
961
        // structure and skipping fully decoding each channel, we save a good
12✔
962
        // bit of CPU as we don't need to do things like decompress public
12✔
963
        // keys.
12✔
964
        chanScan := func(tx kvdb.RTx) error {
24✔
965
                // Get the bucket dedicated to storing the metadata for open
12✔
966
                // channels.
12✔
967
                openChanBucket := tx.ReadBucket(openChannelBucket)
12✔
968
                if openChanBucket == nil {
12✔
969
                        return ErrNoActiveChannels
×
970
                }
×
971

972
                // Within the node channel bucket, are the set of node pubkeys
973
                // we have channels with, we don't know the entire set, so we'll
974
                // check them all.
975
                return openChanBucket.ForEach(func(nodePub, v []byte) error {
24✔
976
                        // Ensure that this is a key the same size as a pubkey,
12✔
977
                        // and also that it leads directly to a bucket.
12✔
978
                        if len(nodePub) != 33 || v != nil {
12✔
979
                                return nil
×
980
                        }
×
981

982
                        nodeChanBucket := openChanBucket.NestedReadBucket(
12✔
983
                                nodePub,
12✔
984
                        )
12✔
985
                        if nodeChanBucket == nil {
12✔
986
                                return nil
×
987
                        }
×
988

989
                        // The next layer down is all the chains that this node
990
                        // has channels on with us.
991
                        return nodeChanBucket.ForEach(func(chainHash,
12✔
992
                                v []byte) error {
24✔
993

12✔
994
                                // If there's a value, it's not a bucket so
12✔
995
                                // ignore it.
12✔
996
                                if v != nil {
12✔
997
                                        return nil
×
998
                                }
×
999

1000
                                chainBucket := nodeChanBucket.NestedReadBucket(
12✔
1001
                                        chainHash,
12✔
1002
                                )
12✔
1003
                                if chainBucket == nil {
12✔
1004
                                        return fmt.Errorf("unable to read "+
×
1005
                                                "bucket for chain=%x",
×
1006
                                                chainHash)
×
1007
                                }
×
1008

1009
                                // Finally, we reach the leaf bucket that stores
1010
                                // all the chanPoints for this node.
1011
                                targetChanBytes, chanPoint, err := chanSelect(
12✔
1012
                                        chainBucket,
12✔
1013
                                )
12✔
1014
                                if errors.Is(err, ErrChannelNotFound) {
13✔
1015
                                        return nil
1✔
1016
                                } else if err != nil {
12✔
1017
                                        return err
×
1018
                                }
×
1019

1020
                                chanBucket := chainBucket.NestedReadBucket(
11✔
1021
                                        targetChanBytes,
11✔
1022
                                )
11✔
1023
                                if chanBucket == nil {
15✔
1024
                                        return nil
4✔
1025
                                }
4✔
1026

1027
                                channel, err := fetchOpenChannel(
7✔
1028
                                        chanBucket, chanPoint,
7✔
1029
                                )
7✔
1030
                                if err != nil {
7✔
1031
                                        return err
×
1032
                                }
×
1033

1034
                                targetChan = channel
7✔
1035
                                targetChan.Db = c
7✔
1036

7✔
1037
                                return errChanFound
7✔
1038
                        })
1039
                })
1040
        }
1041

1042
        var err error
12✔
1043
        if tx == nil {
24✔
1044
                err = kvdb.View(c.backend, chanScan, func() {})
24✔
1045
        } else {
×
1046
                err = chanScan(tx)
×
1047
        }
×
1048
        if err != nil && !errors.Is(err, errChanFound) {
12✔
1049
                return nil, err
×
1050
        }
×
1051

1052
        if targetChan != nil {
19✔
1053
                return targetChan, nil
7✔
1054
        }
7✔
1055

1056
        // If we can't find the channel, then we return with an error, as we
1057
        // have nothing to back up.
1058
        return nil, ErrChannelNotFound
5✔
1059
}
1060

1061
// FetchAllChannels attempts to retrieve all open channels currently stored
1062
// within the database, including pending open, fully open and channels waiting
1063
// for a closing transaction to confirm.
1064
func (c *ChannelStateDB) FetchAllChannels() ([]*OpenChannel, error) {
563✔
1065
        return fetchChannels(c)
563✔
1066
}
563✔
1067

1068
// FetchAllOpenChannels will return all channels that have the funding
1069
// transaction confirmed, and is not waiting for a closing transaction to be
1070
// confirmed.
1071
func (c *ChannelStateDB) FetchAllOpenChannels() ([]*OpenChannel, error) {
405✔
1072
        return fetchChannels(
405✔
1073
                c,
405✔
1074
                pendingChannelFilter(false),
405✔
1075
                waitingCloseFilter(false),
405✔
1076
        )
405✔
1077
}
405✔
1078

1079
// FetchPendingChannels will return channels that have completed the process of
1080
// generating and broadcasting funding transactions, but whose funding
1081
// transactions have yet to be confirmed on the blockchain.
1082
func (c *ChannelStateDB) FetchPendingChannels() ([]*OpenChannel, error) {
79✔
1083
        return fetchChannels(c,
79✔
1084
                pendingChannelFilter(true),
79✔
1085
                waitingCloseFilter(false),
79✔
1086
        )
79✔
1087
}
79✔
1088

1089
// FetchWaitingCloseChannels will return all channels that have been opened,
1090
// but are now waiting for a closing transaction to be confirmed.
1091
//
1092
// NOTE: This includes channels that are also pending to be opened.
1093
func (c *ChannelStateDB) FetchWaitingCloseChannels() ([]*OpenChannel, error) {
1✔
1094
        return fetchChannels(
1✔
1095
                c, waitingCloseFilter(true),
1✔
1096
        )
1✔
1097
}
1✔
1098

1099
// fetchChannelsFilter applies a filter to channels retrieved in fetchchannels.
1100
// A set of filters can be combined to filter across multiple dimensions.
1101
type fetchChannelsFilter func(channel *OpenChannel) bool
1102

1103
// pendingChannelFilter returns a filter based on whether channels are pending
1104
// (ie, their funding transaction still needs to confirm). If pending is false,
1105
// channels with confirmed funding transactions are returned.
1106
func pendingChannelFilter(pending bool) fetchChannelsFilter {
493✔
1107
        return func(channel *OpenChannel) bool {
715✔
1108
                return channel.IsPending == pending
222✔
1109
        }
222✔
1110
}
1111

1112
// waitingCloseFilter returns a filter which filters channels based on whether
1113
// they are awaiting the confirmation of their closing transaction. If waiting
1114
// close is true, channels that have had their closing tx broadcast are
1115
// included. If it is false, channels that are not awaiting confirmation of
1116
// their close transaction are returned.
1117
func waitingCloseFilter(waitingClose bool) fetchChannelsFilter {
491✔
1118
        return func(channel *OpenChannel) bool {
699✔
1119
                // If the channel is in any other state than Default,
208✔
1120
                // then it means it is waiting to be closed.
208✔
1121
                channelWaitingClose :=
208✔
1122
                        channel.ChanStatus() != ChanStatusDefault
208✔
1123

208✔
1124
                // Include the channel if it matches the value for
208✔
1125
                // waiting close that we are filtering on.
208✔
1126
                return channelWaitingClose == waitingClose
208✔
1127
        }
208✔
1128
}
1129

1130
// fetchChannels attempts to retrieve channels currently stored in the
1131
// database. It takes a set of filters which are applied to each channel to
1132
// obtain a set of channels with the desired set of properties. Only channels
1133
// which have a true value returned for *all* of the filters will be returned.
1134
// If no filters are provided, every channel in the open channels bucket will
1135
// be returned.
1136
func fetchChannels(c *ChannelStateDB, filters ...fetchChannelsFilter) (
1137
        []*OpenChannel, error) {
1,060✔
1138

1,060✔
1139
        var channels []*OpenChannel
1,060✔
1140

1,060✔
1141
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
2,120✔
1142
                // Get the bucket dedicated to storing the metadata for open
1,060✔
1143
                // channels.
1,060✔
1144
                openChanBucket := tx.ReadBucket(openChannelBucket)
1,060✔
1145
                if openChanBucket == nil {
1,060✔
1146
                        return ErrNoActiveChannels
×
1147
                }
×
1148

1149
                // Next, fetch the bucket dedicated to storing metadata related
1150
                // to all nodes. All keys within this bucket are the serialized
1151
                // public keys of all our direct counterparties.
1152
                nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
1,060✔
1153
                if nodeMetaBucket == nil {
1,060✔
1154
                        return fmt.Errorf("node bucket not created")
×
1155
                }
×
1156

1157
                // Finally for each node public key in the bucket, fetch all
1158
                // the channels related to this particular node.
1159
                return nodeMetaBucket.ForEach(func(k, v []byte) error {
1,566✔
1160
                        nodeChanBucket := openChanBucket.NestedReadBucket(k)
506✔
1161
                        if nodeChanBucket == nil {
506✔
1162
                                return nil
×
1163
                        }
×
1164

1165
                        return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
1,012✔
1166
                                // If there's a value, it's not a bucket so
506✔
1167
                                // ignore it.
506✔
1168
                                if v != nil {
506✔
1169
                                        return nil
×
1170
                                }
×
1171

1172
                                // If we've found a valid chainhash bucket,
1173
                                // then we'll retrieve that so we can extract
1174
                                // all the channels.
1175
                                chainBucket := nodeChanBucket.NestedReadBucket(
506✔
1176
                                        chainHash,
506✔
1177
                                )
506✔
1178
                                if chainBucket == nil {
506✔
1179
                                        return fmt.Errorf("unable to read "+
×
1180
                                                "bucket for chain=%x", chainHash[:])
×
1181
                                }
×
1182

1183
                                nodeChans, err := c.fetchNodeChannels(chainBucket)
506✔
1184
                                if err != nil {
506✔
1185
                                        return fmt.Errorf("unable to read "+
×
1186
                                                "channel for chain_hash=%x, "+
×
1187
                                                "node_key=%x: %v", chainHash[:], k, err)
×
1188
                                }
×
1189
                                for _, channel := range nodeChans {
1,049✔
1190
                                        // includeChannel indicates whether the channel
543✔
1191
                                        // meets the criteria specified by our filters.
543✔
1192
                                        includeChannel := true
543✔
1193

543✔
1194
                                        // Run through each filter and check whether the
543✔
1195
                                        // channel should be included.
543✔
1196
                                        for _, f := range filters {
973✔
1197
                                                // If the channel fails the filter, set
430✔
1198
                                                // includeChannel to false and don't bother
430✔
1199
                                                // checking the remaining filters.
430✔
1200
                                                if !f(channel) {
455✔
1201
                                                        includeChannel = false
25✔
1202
                                                        break
25✔
1203
                                                }
1204
                                        }
1205

1206
                                        // If the channel passed every filter, include it in
1207
                                        // our set of channels.
1208
                                        if includeChannel {
1,061✔
1209
                                                channels = append(channels, channel)
518✔
1210
                                        }
518✔
1211
                                }
1212
                                return nil
506✔
1213
                        })
1214

1215
                })
1216
        }, func() {
1,060✔
1217
                channels = nil
1,060✔
1218
        })
1,060✔
1219
        if err != nil {
1,060✔
1220
                return nil, err
×
1221
        }
×
1222

1223
        return channels, nil
1,060✔
1224
}
1225

1226
// FetchClosedChannels attempts to fetch all closed channels from the database.
1227
// The pendingOnly bool toggles if channels that aren't yet fully closed should
1228
// be returned in the response or not. When a channel was cooperatively closed,
1229
// it becomes fully closed after a single confirmation.  When a channel was
1230
// forcibly closed, it will become fully closed after _all_ the pending funds
1231
// (if any) have been swept.
1232
func (c *ChannelStateDB) FetchClosedChannels(pendingOnly bool) (
1233
        []*ChannelCloseSummary, error) {
500✔
1234

500✔
1235
        var chanSummaries []*ChannelCloseSummary
500✔
1236

500✔
1237
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
1,000✔
1238
                closeBucket := tx.ReadBucket(closedChannelBucket)
500✔
1239
                if closeBucket == nil {
500✔
1240
                        return ErrNoClosedChannels
×
1241
                }
×
1242

1243
                return closeBucket.ForEach(func(chanID []byte, summaryBytes []byte) error {
520✔
1244
                        summaryReader := bytes.NewReader(summaryBytes)
20✔
1245
                        chanSummary, err := deserializeCloseChannelSummary(summaryReader)
20✔
1246
                        if err != nil {
20✔
1247
                                return err
×
1248
                        }
×
1249

1250
                        // If the query specified to only include pending
1251
                        // channels, then we'll skip any channels which aren't
1252
                        // currently pending.
1253
                        if !chanSummary.IsPending && pendingOnly {
21✔
1254
                                return nil
1✔
1255
                        }
1✔
1256

1257
                        chanSummaries = append(chanSummaries, chanSummary)
19✔
1258
                        return nil
19✔
1259
                })
1260
        }, func() {
500✔
1261
                chanSummaries = nil
500✔
1262
        }); err != nil {
500✔
1263
                return nil, err
×
1264
        }
×
1265

1266
        return chanSummaries, nil
500✔
1267
}
1268

1269
// ErrClosedChannelNotFound signals that a closed channel could not be found in
1270
// the channeldb.
1271
var ErrClosedChannelNotFound = errors.New("unable to find closed channel summary")
1272

1273
// FetchClosedChannel queries for a channel close summary using the channel
1274
// point of the channel in question.
1275
func (c *ChannelStateDB) FetchClosedChannel(chanID *wire.OutPoint) (
1276
        *ChannelCloseSummary, error) {
3✔
1277

3✔
1278
        var chanSummary *ChannelCloseSummary
3✔
1279
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1280
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1281
                if closeBucket == nil {
3✔
1282
                        return ErrClosedChannelNotFound
×
1283
                }
×
1284

1285
                var b bytes.Buffer
3✔
1286
                var err error
3✔
1287
                if err = graphdb.WriteOutpoint(&b, chanID); err != nil {
3✔
1288
                        return err
×
1289
                }
×
1290

1291
                summaryBytes := closeBucket.Get(b.Bytes())
3✔
1292
                if summaryBytes == nil {
4✔
1293
                        return ErrClosedChannelNotFound
1✔
1294
                }
1✔
1295

1296
                summaryReader := bytes.NewReader(summaryBytes)
2✔
1297
                chanSummary, err = deserializeCloseChannelSummary(summaryReader)
2✔
1298

2✔
1299
                return err
2✔
1300
        }, func() {
3✔
1301
                chanSummary = nil
3✔
1302
        }); err != nil {
4✔
1303
                return nil, err
1✔
1304
        }
1✔
1305

1306
        return chanSummary, nil
2✔
1307
}
1308

1309
// FetchClosedChannelForID queries for a channel close summary using the
1310
// channel ID of the channel in question.
1311
func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) (
1312
        *ChannelCloseSummary, error) {
102✔
1313

102✔
1314
        var chanSummary *ChannelCloseSummary
102✔
1315
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
204✔
1316
                closeBucket := tx.ReadBucket(closedChannelBucket)
102✔
1317
                if closeBucket == nil {
102✔
1318
                        return ErrClosedChannelNotFound
×
1319
                }
×
1320

1321
                // The first 30 bytes of the channel ID and outpoint will be
1322
                // equal.
1323
                cursor := closeBucket.ReadCursor()
102✔
1324
                op, c := cursor.Seek(cid[:30])
102✔
1325

102✔
1326
                // We scan over all possible candidates for this channel ID.
102✔
1327
                for ; op != nil && bytes.Compare(cid[:30], op[:30]) <= 0; op, c = cursor.Next() {
5,354✔
1328
                        var outPoint wire.OutPoint
5,252✔
1329
                        err := graphdb.ReadOutpoint(
5,252✔
1330
                                bytes.NewReader(op), &outPoint,
5,252✔
1331
                        )
5,252✔
1332
                        if err != nil {
5,252✔
1333
                                return err
×
1334
                        }
×
1335

1336
                        // If the found outpoint does not correspond to this
1337
                        // channel ID, we continue.
1338
                        if !cid.IsChanPoint(&outPoint) {
10,403✔
1339
                                continue
5,151✔
1340
                        }
1341

1342
                        // Deserialize the close summary and return.
1343
                        r := bytes.NewReader(c)
101✔
1344
                        chanSummary, err = deserializeCloseChannelSummary(r)
101✔
1345
                        if err != nil {
101✔
1346
                                return err
×
1347
                        }
×
1348

1349
                        return nil
101✔
1350
                }
1351
                return ErrClosedChannelNotFound
1✔
1352
        }, func() {
102✔
1353
                chanSummary = nil
102✔
1354
        }); err != nil {
103✔
1355
                return nil, err
1✔
1356
        }
1✔
1357

1358
        return chanSummary, nil
101✔
1359
}
1360

1361
// MarkChanFullyClosed marks a channel as fully closed within the database. A
1362
// channel should be marked as fully closed if the channel was initially
1363
// cooperatively closed and it's reached a single confirmation, or after all
1364
// the pending funds in a channel that has been forcibly closed have been
1365
// swept.
1366
func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
7✔
1367
        var (
7✔
1368
                openChannels  []*OpenChannel
7✔
1369
                pruneLinkNode *btcec.PublicKey
7✔
1370
        )
7✔
1371
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
14✔
1372
                var b bytes.Buffer
7✔
1373
                if err := graphdb.WriteOutpoint(&b, chanPoint); err != nil {
7✔
1374
                        return err
×
1375
                }
×
1376

1377
                chanID := b.Bytes()
7✔
1378

7✔
1379
                closedChanBucket, err := tx.CreateTopLevelBucket(
7✔
1380
                        closedChannelBucket,
7✔
1381
                )
7✔
1382
                if err != nil {
7✔
1383
                        return err
×
1384
                }
×
1385

1386
                chanSummaryBytes := closedChanBucket.Get(chanID)
7✔
1387
                if chanSummaryBytes == nil {
7✔
1388
                        return fmt.Errorf("no closed channel for "+
×
1389
                                "chan_point=%v found", chanPoint)
×
1390
                }
×
1391

1392
                chanSummaryReader := bytes.NewReader(chanSummaryBytes)
7✔
1393
                chanSummary, err := deserializeCloseChannelSummary(
7✔
1394
                        chanSummaryReader,
7✔
1395
                )
7✔
1396
                if err != nil {
7✔
1397
                        return err
×
1398
                }
×
1399

1400
                chanSummary.IsPending = false
7✔
1401

7✔
1402
                var newSummary bytes.Buffer
7✔
1403
                err = serializeChannelCloseSummary(&newSummary, chanSummary)
7✔
1404
                if err != nil {
7✔
1405
                        return err
×
1406
                }
×
1407

1408
                err = closedChanBucket.Put(chanID, newSummary.Bytes())
7✔
1409
                if err != nil {
7✔
1410
                        return err
×
1411
                }
×
1412

1413
                // Now that the channel is closed, we'll check if we have any
1414
                // other open channels with this peer. If we don't we'll
1415
                // garbage collect it to ensure we don't establish persistent
1416
                // connections to peers without open channels.
1417
                pruneLinkNode = chanSummary.RemotePub
7✔
1418
                openChannels, err = c.fetchOpenChannels(
7✔
1419
                        tx, pruneLinkNode,
7✔
1420
                )
7✔
1421
                if err != nil {
7✔
1422
                        return fmt.Errorf("unable to fetch open channels for "+
×
1423
                                "peer %x: %v",
×
1424
                                pruneLinkNode.SerializeCompressed(), err)
×
1425
                }
×
1426

1427
                return nil
7✔
1428
        }, func() {
7✔
1429
                openChannels = nil
7✔
1430
                pruneLinkNode = nil
7✔
1431
        })
7✔
1432
        if err != nil {
7✔
1433
                return err
×
1434
        }
×
1435

1436
        // Decide whether we want to remove the link node, based upon the number
1437
        // of still open channels.
1438
        return c.pruneLinkNode(openChannels, pruneLinkNode)
7✔
1439
}
1440

1441
// pruneLinkNode determines whether we should garbage collect a link node from
1442
// the database due to no longer having any open channels with it. If there are
1443
// any left, then this acts as a no-op.
1444
func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
1445
        remotePub *btcec.PublicKey) error {
7✔
1446

7✔
1447
        if len(openChannels) > 0 {
7✔
UNCOV
1448
                return nil
×
UNCOV
1449
        }
×
1450

1451
        log.Infof("Pruning link node %x with zero open channels from database",
7✔
1452
                remotePub.SerializeCompressed())
7✔
1453

7✔
1454
        return c.linkNodeDB.DeleteLinkNode(remotePub)
7✔
1455
}
1456

1457
// PruneLinkNodes attempts to prune all link nodes found within the database
1458
// with whom we no longer have any open channels with.
UNCOV
1459
func (c *ChannelStateDB) PruneLinkNodes() error {
×
UNCOV
1460
        allLinkNodes, err := c.linkNodeDB.FetchAllLinkNodes()
×
UNCOV
1461
        if err != nil {
×
1462
                return err
×
1463
        }
×
1464

UNCOV
1465
        for _, linkNode := range allLinkNodes {
×
UNCOV
1466
                var (
×
UNCOV
1467
                        openChannels []*OpenChannel
×
UNCOV
1468
                        linkNode     = linkNode
×
UNCOV
1469
                )
×
UNCOV
1470
                err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
×
UNCOV
1471
                        var err error
×
UNCOV
1472
                        openChannels, err = c.fetchOpenChannels(
×
UNCOV
1473
                                tx, linkNode.IdentityPub,
×
UNCOV
1474
                        )
×
UNCOV
1475
                        return err
×
UNCOV
1476
                }, func() {
×
UNCOV
1477
                        openChannels = nil
×
UNCOV
1478
                })
×
UNCOV
1479
                if err != nil {
×
1480
                        return err
×
1481
                }
×
1482

UNCOV
1483
                err = c.pruneLinkNode(openChannels, linkNode.IdentityPub)
×
UNCOV
1484
                if err != nil {
×
1485
                        return err
×
1486
                }
×
1487
        }
1488

UNCOV
1489
        return nil
×
1490
}
1491

1492
// ChannelShell is a shell of a channel that is meant to be used for channel
1493
// recovery purposes. It contains a minimal OpenChannel instance along with
1494
// addresses for that target node.
1495
type ChannelShell struct {
1496
        // NodeAddrs the set of addresses that this node has known to be
1497
        // reachable at in the past.
1498
        NodeAddrs []net.Addr
1499

1500
        // Chan is a shell of an OpenChannel, it contains only the items
1501
        // required to restore the channel on disk.
1502
        Chan *OpenChannel
1503
}
1504

1505
// RestoreChannelShells is a method that allows the caller to reconstruct the
1506
// state of an OpenChannel from the ChannelShell. We'll attempt to write the
1507
// new channel to disk, create a LinkNode instance with the passed node
1508
// addresses, and finally create an edge within the graph for the channel as
1509
// well. This method is idempotent, so repeated calls with the same set of
1510
// channel shells won't modify the database after the initial call.
1511
func (c *ChannelStateDB) RestoreChannelShells(channelShells ...*ChannelShell) error {
1✔
1512
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
2✔
1513
                for _, channelShell := range channelShells {
2✔
1514
                        channel := channelShell.Chan
1✔
1515

1✔
1516
                        // When we make a channel, we mark that the channel has
1✔
1517
                        // been restored, this will signal to other sub-systems
1✔
1518
                        // to not attempt to use the channel as if it was a
1✔
1519
                        // regular one.
1✔
1520
                        channel.chanStatus |= ChanStatusRestored
1✔
1521

1✔
1522
                        // First, we'll attempt to create a new open channel
1✔
1523
                        // and link node for this channel. If the channel
1✔
1524
                        // already exists, then in order to ensure this method
1✔
1525
                        // is idempotent, we'll continue to the next step.
1✔
1526
                        channel.Db = c
1✔
1527
                        err := syncNewChannel(
1✔
1528
                                tx, channel, channelShell.NodeAddrs,
1✔
1529
                        )
1✔
1530
                        if err != nil {
1✔
UNCOV
1531
                                return err
×
UNCOV
1532
                        }
×
1533
                }
1534

1535
                return nil
1✔
1536
        }, func() {})
1✔
1537
        if err != nil {
1✔
UNCOV
1538
                return err
×
UNCOV
1539
        }
×
1540

1541
        return nil
1✔
1542
}
1543

1544
// AddrsForNode consults the channel database for all addresses known to the
1545
// passed node public key. The returned boolean indicates if the given node is
1546
// unknown to the channel DB or not.
1547
//
1548
// NOTE: this is part of the AddrSource interface.
1549
func (d *DB) AddrsForNode(_ context.Context, nodePub *btcec.PublicKey) (bool,
1550
        []net.Addr, error) {
1✔
1551

1✔
1552
        linkNode, err := d.channelStateDB.linkNodeDB.FetchLinkNode(nodePub)
1✔
1553
        // Only if the error is something other than ErrNodeNotFound do we
1✔
1554
        // return it.
1✔
1555
        switch {
1✔
1556
        case err != nil && !errors.Is(err, ErrNodeNotFound):
×
1557
                return false, nil, err
×
1558

1559
        case errors.Is(err, ErrNodeNotFound):
×
1560
                return false, nil, nil
×
1561
        }
1562

1563
        return true, linkNode.Addresses, nil
1✔
1564
}
1565

1566
// AbandonChannel attempts to remove the target channel from the open channel
1567
// database. If the channel was already removed (has a closed channel entry),
1568
// then we'll return a nil error. Otherwise, we'll insert a new close summary
1569
// into the database.
1570
func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint,
1571
        bestHeight uint32) error {
4✔
1572

4✔
1573
        // With the chanPoint constructed, we'll attempt to find the target
4✔
1574
        // channel in the database. If we can't find the channel, then we'll
4✔
1575
        // return the error back to the caller.
4✔
1576
        dbChan, err := c.FetchChannel(*chanPoint)
4✔
1577
        switch {
4✔
1578
        // If the channel wasn't found, then it's possible that it was already
1579
        // abandoned from the database.
1580
        case err == ErrChannelNotFound:
2✔
1581
                _, closedErr := c.FetchClosedChannel(chanPoint)
2✔
1582
                if closedErr != nil {
3✔
1583
                        return closedErr
1✔
1584
                }
1✔
1585

1586
                // If the channel was already closed, then we don't return an
1587
                // error as we'd like this step to be repeatable.
1588
                return nil
1✔
1589
        case err != nil:
×
1590
                return err
×
1591
        }
1592

1593
        // Now that we've found the channel, we'll populate a close summary for
1594
        // the channel, so we can store as much information for this abounded
1595
        // channel as possible. We also ensure that we set Pending to false, to
1596
        // indicate that this channel has been "fully" closed.
1597
        summary := &ChannelCloseSummary{
2✔
1598
                CloseType:               Abandoned,
2✔
1599
                ChanPoint:               *chanPoint,
2✔
1600
                ChainHash:               dbChan.ChainHash,
2✔
1601
                CloseHeight:             bestHeight,
2✔
1602
                RemotePub:               dbChan.IdentityPub,
2✔
1603
                Capacity:                dbChan.Capacity,
2✔
1604
                SettledBalance:          dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
2✔
1605
                ShortChanID:             dbChan.ShortChanID(),
2✔
1606
                RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
2✔
1607
                RemoteNextRevocation:    dbChan.RemoteNextRevocation,
2✔
1608
                LocalChanConfig:         dbChan.LocalChanCfg,
2✔
1609
        }
2✔
1610

2✔
1611
        // Finally, we'll close the channel in the DB, and return back to the
2✔
1612
        // caller. We set ourselves as the close initiator because we abandoned
2✔
1613
        // the channel.
2✔
1614
        return dbChan.CloseChannel(summary, ChanStatusLocalCloseInitiator)
2✔
1615
}
1616

1617
// SaveChannelOpeningState saves the serialized channel state for the provided
1618
// chanPoint to the channelOpeningStateBucket.
1619
func (c *ChannelStateDB) SaveChannelOpeningState(outPoint,
1620
        serializedState []byte) error {
92✔
1621

92✔
1622
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
184✔
1623
                bucket, err := tx.CreateTopLevelBucket(channelOpeningStateBucket)
92✔
1624
                if err != nil {
92✔
1625
                        return err
×
1626
                }
×
1627

1628
                return bucket.Put(outPoint, serializedState)
92✔
1629
        }, func() {})
92✔
1630
}
1631

1632
// GetChannelOpeningState fetches the serialized channel state for the provided
1633
// outPoint from the database, or returns ErrChannelNotFound if the channel
1634
// is not found.
1635
func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte,
1636
        error) {
251✔
1637

251✔
1638
        var serializedState []byte
251✔
1639
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
502✔
1640
                bucket := tx.ReadBucket(channelOpeningStateBucket)
251✔
1641
                if bucket == nil {
251✔
UNCOV
1642
                        // If the bucket does not exist, it means we never added
×
UNCOV
1643
                        //  a channel to the db, so return ErrChannelNotFound.
×
UNCOV
1644
                        return ErrChannelNotFound
×
UNCOV
1645
                }
×
1646

1647
                stateBytes := bucket.Get(outPoint)
251✔
1648
                if stateBytes == nil {
298✔
1649
                        return ErrChannelNotFound
47✔
1650
                }
47✔
1651

1652
                serializedState = append(serializedState, stateBytes...)
204✔
1653

204✔
1654
                return nil
204✔
1655
        }, func() {
251✔
1656
                serializedState = nil
251✔
1657
        })
251✔
1658
        return serializedState, err
251✔
1659
}
1660

1661
// DeleteChannelOpeningState removes any state for outPoint from the database.
1662
func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
24✔
1663
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
48✔
1664
                bucket := tx.ReadWriteBucket(channelOpeningStateBucket)
24✔
1665
                if bucket == nil {
24✔
1666
                        return ErrChannelNotFound
×
1667
                }
×
1668

1669
                return bucket.Delete(outPoint)
24✔
1670
        }, func() {})
24✔
1671
}
1672

1673
// syncVersions function is used for safe db version synchronization. It
1674
// applies migration functions to the current database and recovers the
1675
// previous state of db if at least one error/panic appeared during migration.
1676
func (d *DB) syncVersions(versions []mandatoryVersion) error {
1,754✔
1677
        meta, err := d.FetchMeta()
1,754✔
1678
        if err != nil {
1,754✔
1679
                if err == ErrMetaNotFound {
×
1680
                        meta = &Meta{}
×
1681
                } else {
×
1682
                        return err
×
1683
                }
×
1684
        }
1685

1686
        latestVersion := getLatestDBVersion(versions)
1,754✔
1687
        log.Infof("Checking for schema update: latest_version=%v, "+
1,754✔
1688
                "db_version=%v", latestVersion, meta.DbVersionNumber)
1,754✔
1689

1,754✔
1690
        switch {
1,754✔
1691

1692
        // If the database reports a higher version that we are aware of, the
1693
        // user is probably trying to revert to a prior version of lnd. We fail
1694
        // here to prevent reversions and unintended corruption.
1695
        case meta.DbVersionNumber > latestVersion:
1✔
1696
                log.Errorf("Refusing to revert from db_version=%d to "+
1✔
1697
                        "lower version=%d", meta.DbVersionNumber,
1✔
1698
                        latestVersion)
1✔
1699
                return ErrDBReversion
1✔
1700

1701
        // If the current database version matches the latest version number,
1702
        // then we don't need to perform any migrations.
1703
        case meta.DbVersionNumber == latestVersion:
1,749✔
1704
                return nil
1,749✔
1705
        }
1706

1707
        log.Infof("Performing database schema migration")
4✔
1708

4✔
1709
        // Otherwise, we fetch the migrations which need to applied, and
4✔
1710
        // execute them serially within a single database transaction to ensure
4✔
1711
        // the migration is atomic.
4✔
1712
        migrations, migrationVersions := getMigrationsToApply(
4✔
1713
                versions, meta.DbVersionNumber,
4✔
1714
        )
4✔
1715
        return kvdb.Update(d, func(tx kvdb.RwTx) error {
8✔
1716
                for i, migration := range migrations {
8✔
1717
                        if migration == nil {
4✔
1718
                                continue
×
1719
                        }
1720

1721
                        log.Infof("Applying migration #%v",
4✔
1722
                                migrationVersions[i])
4✔
1723

4✔
1724
                        if err := migration(tx); err != nil {
5✔
1725
                                log.Infof("Unable to apply migration #%v",
1✔
1726
                                        migrationVersions[i])
1✔
1727
                                return err
1✔
1728
                        }
1✔
1729
                }
1730

1731
                meta.DbVersionNumber = latestVersion
2✔
1732
                err := putMeta(meta, tx)
2✔
1733
                if err != nil {
2✔
1734
                        return err
×
1735
                }
×
1736

1737
                // In dry-run mode, return an error to prevent the transaction
1738
                // from committing.
1739
                if d.dryRun {
3✔
1740
                        return ErrDryRunMigrationOK
1✔
1741
                }
1✔
1742

1743
                return nil
1✔
1744
        }, func() {})
4✔
1745
}
1746

1747
// applyOptionalVersions applies the optional migrations to the database if
1748
// specified in the config.
1749
func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error {
1,752✔
1750
        // TODO(yy): need to design the db to support dry run for optional
1,752✔
1751
        // migrations.
1,752✔
1752
        if d.dryRun {
1,753✔
1753
                log.Info("Skipped optional migrations as dry run mode is not " +
1✔
1754
                        "supported yet")
1✔
1755
                return nil
1✔
1756
        }
1✔
1757

1758
        om, err := d.fetchOptionalMeta()
1,751✔
1759
        if err != nil {
1,751✔
1760
                if err == ErrMetaNotFound {
×
1761
                        om = &OptionalMeta{
×
1762
                                Versions: make(map[uint64]string),
×
1763
                        }
×
1764
                } else {
×
1765
                        return fmt.Errorf("unable to fetch optional "+
×
1766
                                "meta: %w", err)
×
1767
                }
×
1768
        }
1769

1770
        // migrationCfg is the parent configuration which implements the config
1771
        // interfaces of all the single optional migrations.
1772
        migrationCfg := &MigrationConfigImpl{
1,751✔
1773
                migration30.MigrateRevLogConfigImpl{
1,751✔
1774
                        NoAmountData: d.noRevLogAmtData,
1,751✔
1775
                },
1,751✔
1776
                migration34.MigrationConfigImpl{
1,751✔
1777
                        DecayedLog: cfg.DecayedLog,
1,751✔
1778
                },
1,751✔
1779
        }
1,751✔
1780

1,751✔
1781
        log.Infof("Applying %d optional migrations", len(optionalVersions))
1,751✔
1782

1,751✔
1783
        // Apply the optional migrations if requested.
1,751✔
1784
        for number, version := range optionalVersions {
5,253✔
1785
                log.Infof("Checking for optional update: name=%v", version.name)
3,502✔
1786

3,502✔
1787
                // Exit early if the optional migration is not specified.
3,502✔
1788
                if !cfg.MigrationFlags[number] {
7,000✔
1789
                        log.Debugf("Skipping optional migration: name=%s as "+
3,498✔
1790
                                "it is not specified in the config",
3,498✔
1791
                                version.name)
3,498✔
1792

3,498✔
1793
                        continue
3,498✔
1794
                }
1795

1796
                // Exit early if the optional migration has already been
1797
                // applied.
1798
                if _, ok := om.Versions[uint64(number)]; ok {
6✔
1799
                        log.Debugf("Skipping optional migration: name=%s as "+
2✔
1800
                                "it has already been applied", version.name)
2✔
1801

2✔
1802
                        continue
2✔
1803
                }
1804

1805
                log.Infof("Performing database optional migration: %s",
2✔
1806
                        version.name)
2✔
1807

2✔
1808
                // Call the migration function for the specific optional
2✔
1809
                // migration.
2✔
1810
                if err := version.migration(d, migrationCfg); err != nil {
2✔
1811
                        log.Errorf("Unable to apply optional migration: %s, "+
×
1812
                                "error: %v", version.name, err)
×
1813
                        return err
×
1814
                }
×
1815

1816
                // Update the optional meta. Notice that unlike the mandatory db
1817
                // migrations where we perform the migration and updating meta
1818
                // in a single db transaction, we use different transactions
1819
                // here. Even when the following update is failed, we should be
1820
                // fine here as we would re-run the optional migration again,
1821
                // which is a noop, during next startup.
1822
                om.Versions[uint64(number)] = version.name
2✔
1823
                if err := d.putOptionalMeta(om); err != nil {
2✔
1824
                        log.Errorf("Unable to update optional meta: %v", err)
×
1825
                        return err
×
1826
                }
×
1827

1828
                log.Infof("Successfully applied optional migration: %s",
2✔
1829
                        version.name)
2✔
1830
        }
1831

1832
        return nil
1,751✔
1833
}
1834

1835
// ChannelStateDB returns the sub database that is concerned with the channel
1836
// state.
1837
func (d *DB) ChannelStateDB() *ChannelStateDB {
2,168✔
1838
        return d.channelStateDB
2,168✔
1839
}
2,168✔
1840

1841
// LatestDBVersion returns the number of the latest database version currently
1842
// known to the channel DB.
1843
func LatestDBVersion() uint32 {
1✔
1844
        return getLatestDBVersion(dbVersions)
1✔
1845
}
1✔
1846

1847
func getLatestDBVersion(versions []mandatoryVersion) uint32 {
5,435✔
1848
        return versions[len(versions)-1].number
5,435✔
1849
}
5,435✔
1850

1851
// getMigrationsToApply retrieves the migration function that should be
1852
// applied to the database.
1853
func getMigrationsToApply(versions []mandatoryVersion,
1854
        version uint32) ([]migration, []uint32) {
5✔
1855

5✔
1856
        migrations := make([]migration, 0, len(versions))
5✔
1857
        migrationVersions := make([]uint32, 0, len(versions))
5✔
1858

5✔
1859
        for _, v := range versions {
17✔
1860
                if v.number > version {
18✔
1861
                        migrations = append(migrations, v.migration)
6✔
1862
                        migrationVersions = append(migrationVersions, v.number)
6✔
1863
                }
6✔
1864
        }
1865

1866
        return migrations, migrationVersions
5✔
1867
}
1868

1869
// fetchHistoricalChanBucket returns a the channel bucket for a given outpoint
1870
// from the historical channel bucket. If the bucket does not exist,
1871
// ErrNoHistoricalBucket is returned.
1872
func fetchHistoricalChanBucket(tx kvdb.RTx,
1873
        outPoint *wire.OutPoint) (kvdb.RBucket, error) {
4✔
1874

4✔
1875
        // First fetch the top level bucket which stores all data related to
4✔
1876
        // historically stored channels.
4✔
1877
        historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
4✔
1878
        if historicalChanBucket == nil {
4✔
1879
                return nil, ErrNoHistoricalBucket
×
1880
        }
×
1881

1882
        // With the bucket for the node and chain fetched, we can now go down
1883
        // another level, for the channel itself.
1884
        var chanPointBuf bytes.Buffer
4✔
1885
        if err := graphdb.WriteOutpoint(&chanPointBuf, outPoint); err != nil {
4✔
1886
                return nil, err
×
1887
        }
×
1888
        chanBucket := historicalChanBucket.NestedReadBucket(
4✔
1889
                chanPointBuf.Bytes(),
4✔
1890
        )
4✔
1891
        if chanBucket == nil {
6✔
1892
                return nil, ErrChannelNotFound
2✔
1893
        }
2✔
1894

1895
        return chanBucket, nil
2✔
1896
}
1897

1898
// FetchHistoricalChannel fetches open channel data from the historical channel
1899
// bucket.
1900
func (c *ChannelStateDB) FetchHistoricalChannel(outPoint *wire.OutPoint) (
1901
        *OpenChannel, error) {
4✔
1902

4✔
1903
        var channel *OpenChannel
4✔
1904
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
8✔
1905
                chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
4✔
1906
                if err != nil {
6✔
1907
                        return err
2✔
1908
                }
2✔
1909

1910
                channel, err = fetchOpenChannel(chanBucket, outPoint)
2✔
1911
                if err != nil {
2✔
1912
                        return err
×
1913
                }
×
1914

1915
                channel.Db = c
2✔
1916
                return nil
2✔
1917
        }, func() {
4✔
1918
                channel = nil
4✔
1919
        })
4✔
1920
        if err != nil {
6✔
1921
                return nil, err
2✔
1922
        }
2✔
1923

1924
        return channel, nil
2✔
1925
}
1926

1927
func fetchFinalHtlcsBucket(tx kvdb.RTx,
1928
        chanID lnwire.ShortChannelID) (kvdb.RBucket, error) {
10✔
1929

10✔
1930
        finalHtlcsBucket := tx.ReadBucket(finalHtlcsBucket)
10✔
1931
        if finalHtlcsBucket == nil {
16✔
1932
                return nil, ErrFinalHtlcsBucketNotFound
6✔
1933
        }
6✔
1934

1935
        var chanIDBytes [8]byte
4✔
1936
        byteOrder.PutUint64(chanIDBytes[:], chanID.ToUint64())
4✔
1937

4✔
1938
        chanBucket := finalHtlcsBucket.NestedReadBucket(chanIDBytes[:])
4✔
1939
        if chanBucket == nil {
4✔
1940
                return nil, ErrFinalChannelBucketNotFound
×
1941
        }
×
1942

1943
        return chanBucket, nil
4✔
1944
}
1945

1946
var ErrHtlcUnknown = errors.New("htlc unknown")
1947

1948
// LookupFinalHtlc retrieves a final htlc resolution from the database. If the
1949
// htlc has no final resolution yet, ErrHtlcUnknown is returned.
1950
func (c *ChannelStateDB) LookupFinalHtlc(chanID lnwire.ShortChannelID,
1951
        htlcIndex uint64) (*FinalHtlcInfo, error) {
10✔
1952

10✔
1953
        var idBytes [8]byte
10✔
1954
        byteOrder.PutUint64(idBytes[:], htlcIndex)
10✔
1955

10✔
1956
        var settledByte byte
10✔
1957

10✔
1958
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
20✔
1959
                finalHtlcsBucket, err := fetchFinalHtlcsBucket(
10✔
1960
                        tx, chanID,
10✔
1961
                )
10✔
1962
                switch {
10✔
1963
                case errors.Is(err, ErrFinalHtlcsBucketNotFound):
6✔
1964
                        fallthrough
6✔
1965

1966
                case errors.Is(err, ErrFinalChannelBucketNotFound):
6✔
1967
                        return ErrHtlcUnknown
6✔
1968

1969
                case err != nil:
×
1970
                        return fmt.Errorf("cannot fetch final htlcs bucket: %w",
×
1971
                                err)
×
1972
                }
1973

1974
                value := finalHtlcsBucket.Get(idBytes[:])
4✔
1975
                if value == nil {
5✔
1976
                        return ErrHtlcUnknown
1✔
1977
                }
1✔
1978

1979
                if len(value) != 1 {
3✔
1980
                        return errors.New("unexpected final htlc value length")
×
1981
                }
×
1982

1983
                settledByte = value[0]
3✔
1984

3✔
1985
                return nil
3✔
1986
        }, func() {
10✔
1987
                settledByte = 0
10✔
1988
        })
10✔
1989
        if err != nil {
17✔
1990
                return nil, err
7✔
1991
        }
7✔
1992

1993
        info := FinalHtlcInfo{
3✔
1994
                Settled:  settledByte&byte(FinalHtlcSettledBit) != 0,
3✔
1995
                Offchain: settledByte&byte(FinalHtlcOffchainBit) != 0,
3✔
1996
        }
3✔
1997

3✔
1998
        return &info, nil
3✔
1999
}
2000

2001
// PutOnchainFinalHtlcOutcome stores the final on-chain outcome of an htlc in
2002
// the database.
2003
func (c *ChannelStateDB) PutOnchainFinalHtlcOutcome(
2004
        chanID lnwire.ShortChannelID, htlcID uint64, settled bool) error {
1✔
2005

1✔
2006
        // Skip if the user did not opt in to storing final resolutions.
1✔
2007
        if !c.parent.storeFinalHtlcResolutions {
1✔
UNCOV
2008
                return nil
×
UNCOV
2009
        }
×
2010

2011
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
2✔
2012
                finalHtlcsBucket, err := fetchFinalHtlcsBucketRw(tx, chanID)
1✔
2013
                if err != nil {
1✔
2014
                        return err
×
2015
                }
×
2016

2017
                return putFinalHtlc(
1✔
2018
                        finalHtlcsBucket, htlcID,
1✔
2019
                        FinalHtlcInfo{
1✔
2020
                                Settled:  settled,
1✔
2021
                                Offchain: false,
1✔
2022
                        },
1✔
2023
                )
1✔
2024
        }, func() {})
1✔
2025
}
2026

2027
// MakeTestInvoiceDB is used to create a test invoice database for testing
2028
// purposes. It simply calls into MakeTestDB so the same modifiers can be used.
2029
func MakeTestInvoiceDB(t *testing.T, modifiers ...OptionModifier) (
2030
        invoices.InvoiceDB, error) {
154✔
2031

154✔
2032
        return MakeTestDB(t, modifiers...)
154✔
2033
}
154✔
2034

2035
// MakeTestDB creates a new instance of the ChannelDB for testing purposes.
2036
// A callback which cleans up the created temporary directories is also
2037
// returned and intended to be executed after the test completes.
2038
func MakeTestDB(t *testing.T, modifiers ...OptionModifier) (*DB, error) {
290✔
2039
        // First, create a temporary directory to be used for the duration of
290✔
2040
        // this test.
290✔
2041
        tempDirName := t.TempDir()
290✔
2042

290✔
2043
        // Next, create channeldb for the first time.
290✔
2044
        backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cdb")
290✔
2045
        if err != nil {
290✔
2046
                backendCleanup()
×
2047
                return nil, err
×
2048
        }
×
2049

2050
        cdb, err := CreateWithBackend(backend, modifiers...)
290✔
2051
        if err != nil {
290✔
2052
                backendCleanup()
×
2053
                return nil, err
×
2054
        }
×
2055

2056
        t.Cleanup(func() {
580✔
2057
                cdb.Close()
290✔
2058
                backendCleanup()
290✔
2059
        })
290✔
2060

2061
        return cdb, nil
290✔
2062
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc