• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15736109134

18 Jun 2025 02:46PM UTC coverage: 58.197% (-10.1%) from 68.248%
15736109134

Pull #9752

github

web-flow
Merge d2634a68c into 31c74f20f
Pull Request #9752: routerrpc: reject payment to invoice that don't have payment secret or blinded paths

6 of 13 new or added lines in 2 files covered. (46.15%)

28331 existing lines in 455 files now uncovered.

97860 of 168153 relevant lines covered (58.2%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.3
/channeldb/db.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/binary"
7
        "fmt"
8
        "net"
9
        "os"
10
        "testing"
11

12
        "github.com/btcsuite/btcd/btcec/v2"
13
        "github.com/btcsuite/btcd/wire"
14
        "github.com/btcsuite/btcwallet/walletdb"
15
        "github.com/go-errors/errors"
16
        mig "github.com/lightningnetwork/lnd/channeldb/migration"
17
        "github.com/lightningnetwork/lnd/channeldb/migration12"
18
        "github.com/lightningnetwork/lnd/channeldb/migration13"
19
        "github.com/lightningnetwork/lnd/channeldb/migration16"
20
        "github.com/lightningnetwork/lnd/channeldb/migration20"
21
        "github.com/lightningnetwork/lnd/channeldb/migration21"
22
        "github.com/lightningnetwork/lnd/channeldb/migration23"
23
        "github.com/lightningnetwork/lnd/channeldb/migration24"
24
        "github.com/lightningnetwork/lnd/channeldb/migration25"
25
        "github.com/lightningnetwork/lnd/channeldb/migration26"
26
        "github.com/lightningnetwork/lnd/channeldb/migration27"
27
        "github.com/lightningnetwork/lnd/channeldb/migration29"
28
        "github.com/lightningnetwork/lnd/channeldb/migration30"
29
        "github.com/lightningnetwork/lnd/channeldb/migration31"
30
        "github.com/lightningnetwork/lnd/channeldb/migration32"
31
        "github.com/lightningnetwork/lnd/channeldb/migration33"
32
        "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
33
        "github.com/lightningnetwork/lnd/clock"
34
        graphdb "github.com/lightningnetwork/lnd/graph/db"
35
        "github.com/lightningnetwork/lnd/invoices"
36
        "github.com/lightningnetwork/lnd/kvdb"
37
        "github.com/lightningnetwork/lnd/lnwire"
38
        "github.com/stretchr/testify/require"
39
)
40

41
const (
	// dbName is the file name of the channel database. OpenForTesting
	// passes it to the bolt backend as the DBFileName.
	dbName = "channel.db"
)
44

45
var (
	// ErrDryRunMigrationOK signals that a migration executed successfully,
	// but we intentionally did not commit the result.
	ErrDryRunMigrationOK = errors.New("dry run migration successful")

	// ErrFinalHtlcsBucketNotFound signals that the top-level final htlcs
	// bucket does not exist.
	ErrFinalHtlcsBucketNotFound = errors.New("final htlcs bucket not " +
		"found")

	// ErrFinalChannelBucketNotFound signals that the channel bucket for
	// final htlc outcomes does not exist.
	ErrFinalChannelBucketNotFound = errors.New("final htlcs channel " +
		"bucket not found")
)
60

61
// migration is a function which takes a prior, outdated version of the
// database and mutates the key/bucket structure to arrive at a more up-to-date
// version of the database. It operates within the read-write transaction it
// is handed.
type migration func(tx kvdb.RwTx) error
65

66
// mandatoryVersion defines a db version that must be applied before lnd
// starts. It pairs the version number with the migration function for that
// version; the base version in dbVersions carries a nil migration.
type mandatoryVersion struct {
	number    uint32
	migration migration
}
72

73
// MigrationConfig is an interface that combines the config interfaces of all
// optional migrations.
type MigrationConfig interface {
	migration30.MigrateRevLogConfig
}
78

79
// MigrationConfigImpl is a superset of all the various migration configs and
// an implementation of MigrationConfig.
type MigrationConfigImpl struct {
	migration30.MigrateRevLogConfigImpl
}
84

85
// optionalMigration defines an optional migration function. When a migration
// is optional, it usually involves a large scale of changes that might touch
// millions of keys. Due to OOM concerns, the update cannot be safely done
// within one db transaction. Thus, optional migrations must take the db
// backend and construct transactions as needed.
type optionalMigration func(db kvdb.Backend, cfg MigrationConfig) error
91

92
// optionalVersion defines a db version that can be optionally applied. When
// applying migrations, we must apply all the mandatory migrations first before
// attempting optional ones. Each entry pairs a human-readable name with the
// optional migration function.
type optionalVersion struct {
	name      string
	migration optionalMigration
}
99

100
var (
	// dbVersions stores all mandatory versions of the database. If the
	// current version of the database doesn't match the latest version,
	// this list is used to retrieve all migration functions that need to
	// be applied to the current db.
	dbVersions = []mandatoryVersion{
		{
			// The base DB version requires no migration.
			number:    0,
			migration: nil,
		},
		{
			// The version of the database where two new indexes
			// for the update time of node and channel updates were
			// added.
			number:    1,
			migration: migration_01_to_11.MigrateNodeAndEdgeUpdateIndex,
		},
		{
			// The DB version that added the invoice event time
			// series.
			number:    2,
			migration: migration_01_to_11.MigrateInvoiceTimeSeries,
		},
		{
			// The DB version that updated the embedded invoice in
			// outgoing payments to match the new format.
			number:    3,
			migration: migration_01_to_11.MigrateInvoiceTimeSeriesOutgoingPayments,
		},
		{
			// The version of the database where every channel
			// always has two entries in the edges bucket. If
			// a policy is unknown, this will be represented
			// by a special byte sequence.
			number:    4,
			migration: migration_01_to_11.MigrateEdgePolicies,
		},
		{
			// The DB version where we persist each attempt to send
			// an HTLC to a payment hash, and track whether the
			// payment is in-flight, succeeded, or failed.
			number:    5,
			migration: migration_01_to_11.PaymentStatusesMigration,
		},
		{
			// The DB version that properly prunes stale entries
			// from the edge update index.
			number:    6,
			migration: migration_01_to_11.MigratePruneEdgeUpdateIndex,
		},
		{
			// The DB version that migrates the ChannelCloseSummary
			// to a format where optional fields are indicated with
			// boolean flags.
			number:    7,
			migration: migration_01_to_11.MigrateOptionalChannelCloseSummaryFields,
		},
		{
			// The DB version that changes the gossiper's message
			// store keys to account for the message's type and
			// ShortChannelID.
			number:    8,
			migration: migration_01_to_11.MigrateGossipMessageStoreKeys,
		},
		{
			// The DB version where the payments and payment
			// statuses are moved to being stored in a combined
			// bucket.
			number:    9,
			migration: migration_01_to_11.MigrateOutgoingPayments,
		},
		{
			// The DB version where we started to store legacy
			// payload information for all routes, as well as the
			// optional TLV records.
			number:    10,
			migration: migration_01_to_11.MigrateRouteSerialization,
		},
		{
			// Add invoice htlc and cltv delta fields.
			number:    11,
			migration: migration_01_to_11.MigrateInvoices,
		},
		{
			// Migrate to TLV invoice bodies, add payment address
			// and features, remove receipt.
			number:    12,
			migration: migration12.MigrateInvoiceTLV,
		},
		{
			// Migrate to multi-path payments.
			number:    13,
			migration: migration13.MigrateMPP,
		},
		{
			// Initialize payment address index and begin using it
			// as the default index, falling back to payment hash
			// index.
			number:    14,
			migration: mig.CreateTLB(payAddrIndexBucket),
		},
		{
			// Initialize payment index bucket which will be used
			// to index payments by sequence number. This index will
			// be used to allow more efficient ListPayments queries.
			number:    15,
			migration: mig.CreateTLB(paymentsIndexBucket),
		},
		{
			// Add our existing payments to the index bucket created
			// in migration 15.
			number:    16,
			migration: migration16.MigrateSequenceIndex,
		},
		{
			// Create a top level bucket which will store extra
			// information about channel closes.
			number:    17,
			migration: mig.CreateTLB(closeSummaryBucket),
		},
		{
			// Create a top level bucket which holds information
			// about our peers.
			number:    18,
			migration: mig.CreateTLB(peersBucket),
		},
		{
			// Create a top level bucket which holds outpoint
			// information.
			number:    19,
			migration: mig.CreateTLB(outpointBucket),
		},
		{
			// Migrate some data to the outpoint index.
			number:    20,
			migration: migration20.MigrateOutpointIndex,
		},
		{
			// Migrate to length prefixed wire messages everywhere
			// in the database.
			number:    21,
			migration: migration21.MigrateDatabaseWireMessages,
		},
		{
			// Initialize set id index so that invoices can be
			// queried by individual htlc sets.
			number:    22,
			migration: mig.CreateTLB(setIDIndexBucket),
		},
		{
			number:    23,
			migration: migration23.MigrateHtlcAttempts,
		},
		{
			// Remove old forwarding packages of closed channels.
			number:    24,
			migration: migration24.MigrateFwdPkgCleanup,
		},
		{
			// Save the initial local/remote balances in channel
			// info.
			number:    25,
			migration: migration25.MigrateInitialBalances,
		},
		{
			// Migrate the initial local/remote balance fields into
			// tlv records.
			number:    26,
			migration: migration26.MigrateBalancesToTlvRecords,
		},
		{
			// Patch the initial local/remote balance fields with
			// empty values for historical channels.
			number:    27,
			migration: migration27.MigrateHistoricalBalances,
		},
		{
			// Create the chanIDBucket top level bucket.
			number:    28,
			migration: mig.CreateTLB(chanIDBucket),
		},
		{
			number:    29,
			migration: migration29.MigrateChanID,
		},
		{
			// Removes the "sweeper-last-tx" bucket. There is no
			// mandatory version 30: that number is skipped because
			// its naming is already used for the first optional
			// migration.
			number:    31,
			migration: migration31.DeleteLastPublishedTxTLB,
		},
		{
			number:    32,
			migration: migration32.MigrateMCRouteSerialisation,
		},
		{
			number:    33,
			migration: migration33.MigrateMCStoreNameSpacedResults,
		},
	}

	// optionalVersions stores all optional migrations that are applied
	// after dbVersions.
	//
	// NOTE: optional migrations must be fault-tolerant, and re-running
	// them on already migrated data must be a no-op, which means the
	// migration must be able to determine its own state.
	optionalVersions = []optionalVersion{
		{
			name: "prune revocation log",
			migration: func(db kvdb.Backend,
				cfg MigrationConfig) error {

				return migration30.MigrateRevocationLog(db, cfg)
			},
		},
	}

	// Big endian is the preferred byte order, due to cursor scans over
	// integer keys iterating in order.
	byteOrder = binary.BigEndian

	// channelOpeningStateBucket is the database bucket used to store the
	// channelOpeningState for each channel that is currently in the process
	// of being opened.
	channelOpeningStateBucket = []byte("channelOpeningState")
)
329

330
// DB is the primary datastore for the lnd daemon. The database stores
// information related to nodes, routing data, open/closed channels, fee
// schedules, and reputation data.
//
// DB embeds the kvdb.Backend it was created with, so the backend's methods
// are available directly on the DB.
type DB struct {
	kvdb.Backend

	// channelStateDB separates all DB operations on channel state.
	channelStateDB *ChannelStateDB

	// dbPath is the value returned by Path; OpenForTesting sets it. The
	// remaining fields of this group are copied from the options in
	// CreateWithBackend.
	dbPath                    string
	clock                     clock.Clock
	dryRun                    bool
	keepFailedPaymentAttempts bool
	storeFinalHtlcResolutions bool

	// noRevLogAmtData if true, means that commitment transaction amount
	// data should not be stored in the revocation log.
	noRevLogAmtData bool
}
349

350
// OpenForTesting opens or creates a channeldb to be used for tests. Any
351
// necessary schemas migrations due to updates will take place as necessary.
352
func OpenForTesting(t testing.TB, dbPath string,
UNCOV
353
        modifiers ...OptionModifier) *DB {
×
UNCOV
354

×
UNCOV
355
        backend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
×
UNCOV
356
                DBPath:            dbPath,
×
UNCOV
357
                DBFileName:        dbName,
×
UNCOV
358
                NoFreelistSync:    true,
×
UNCOV
359
                AutoCompact:       false,
×
UNCOV
360
                AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
×
UNCOV
361
                DBTimeout:         kvdb.DefaultDBTimeout,
×
UNCOV
362
        })
×
UNCOV
363
        require.NoError(t, err)
×
UNCOV
364

×
UNCOV
365
        db, err := CreateWithBackend(backend, modifiers...)
×
UNCOV
366
        require.NoError(t, err)
×
UNCOV
367

×
UNCOV
368
        db.dbPath = dbPath
×
UNCOV
369

×
UNCOV
370
        t.Cleanup(func() {
×
UNCOV
371
                require.NoError(t, db.Close())
×
UNCOV
372
        })
×
373

UNCOV
374
        return db
×
375
}
376

377
// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
378
// Any necessary schemas migrations due to updates will take place as necessary.
379
func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
380
        error) {
3✔
381

3✔
382
        opts := DefaultOptions()
3✔
383
        for _, modifier := range modifiers {
6✔
384
                modifier(&opts)
3✔
385
        }
3✔
386

387
        if !opts.NoMigration {
6✔
388
                if err := initChannelDB(backend); err != nil {
3✔
UNCOV
389
                        return nil, err
×
UNCOV
390
                }
×
391
        }
392

393
        chanDB := &DB{
3✔
394
                Backend: backend,
3✔
395
                channelStateDB: &ChannelStateDB{
3✔
396
                        linkNodeDB: &LinkNodeDB{
3✔
397
                                backend: backend,
3✔
398
                        },
3✔
399
                        backend: backend,
3✔
400
                },
3✔
401
                clock:                     opts.clock,
3✔
402
                dryRun:                    opts.dryRun,
3✔
403
                keepFailedPaymentAttempts: opts.keepFailedPaymentAttempts,
3✔
404
                storeFinalHtlcResolutions: opts.storeFinalHtlcResolutions,
3✔
405
                noRevLogAmtData:           opts.NoRevLogAmtData,
3✔
406
        }
3✔
407

3✔
408
        // Set the parent pointer (only used in tests).
3✔
409
        chanDB.channelStateDB.parent = chanDB
3✔
410

3✔
411
        // Synchronize the version of database and apply migrations if needed.
3✔
412
        if !opts.NoMigration {
6✔
413
                if err := chanDB.syncVersions(dbVersions); err != nil {
3✔
UNCOV
414
                        backend.Close()
×
UNCOV
415
                        return nil, err
×
UNCOV
416
                }
×
417

418
                // Grab the optional migration config.
419
                omc := opts.OptionalMiragtionConfig
3✔
420
                if err := chanDB.applyOptionalVersions(omc); err != nil {
3✔
421
                        backend.Close()
×
422
                        return nil, err
×
423
                }
×
424
        }
425

426
        return chanDB, nil
3✔
427
}
428

429
// Path returns the file path to the channel database. The value is the
// dbPath field, which is empty unless it was set (OpenForTesting sets it).
func (d *DB) Path() string {
	return d.dbPath
}
×
433

434
// dbTopLevelBuckets lists the top-level buckets of the channel database.
// initChannelDB creates each of them and Wipe deletes each of them.
var dbTopLevelBuckets = [][]byte{
	openChannelBucket,
	closedChannelBucket,
	forwardingLogBucket,
	fwdPackagesKey,
	invoiceBucket,
	payAddrIndexBucket,
	setIDIndexBucket,
	paymentsIndexBucket,
	peersBucket,
	nodeInfoBucket,
	metaBucket,
	closeSummaryBucket,
	outpointBucket,
	chanIDBucket,
	historicalChannelBucket,
}
451

452
// Wipe completely deletes all saved state within all used buckets within the
453
// database. The deletion is done in a single transaction, therefore this
454
// operation is fully atomic.
UNCOV
455
func (d *DB) Wipe() error {
×
UNCOV
456
        err := kvdb.Update(d, func(tx kvdb.RwTx) error {
×
UNCOV
457
                for _, tlb := range dbTopLevelBuckets {
×
UNCOV
458
                        err := tx.DeleteTopLevelBucket(tlb)
×
UNCOV
459
                        if err != nil && err != kvdb.ErrBucketNotFound {
×
460
                                return err
×
461
                        }
×
462
                }
UNCOV
463
                return nil
×
UNCOV
464
        }, func() {})
×
UNCOV
465
        if err != nil {
×
466
                return err
×
467
        }
×
468

UNCOV
469
        return initChannelDB(d.Backend)
×
470
}
471

472
// initChannelDB creates and initializes a fresh version of channeldb. In the
473
// case that the target path has not yet been created or doesn't yet exist, then
474
// the path is created. Additionally, all required top-level buckets used within
475
// the database are created.
476
func initChannelDB(db kvdb.Backend) error {
3✔
477
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
6✔
478
                // Check if DB was marked as inactive with a tomb stone.
3✔
479
                if err := EnsureNoTombstone(tx); err != nil {
3✔
UNCOV
480
                        return err
×
UNCOV
481
                }
×
482

483
                for _, tlb := range dbTopLevelBuckets {
6✔
484
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
3✔
485
                                return err
×
486
                        }
×
487
                }
488

489
                meta := &Meta{}
3✔
490
                // Check if DB is already initialized.
3✔
491
                err := FetchMeta(meta, tx)
3✔
492
                if err == nil {
6✔
493
                        return nil
3✔
494
                }
3✔
495

496
                meta.DbVersionNumber = getLatestDBVersion(dbVersions)
×
497
                return putMeta(meta, tx)
×
498
        }, func() {})
3✔
499
        if err != nil {
3✔
UNCOV
500
                return fmt.Errorf("unable to create new channeldb: %w", err)
×
UNCOV
501
        }
×
502

503
        return nil
3✔
504
}
505

506
// fileExists reports whether path exists. It returns false only when os.Stat
// reports that the path does not exist. A successful stat, as well as any
// other stat error, yields true.
func fileExists(path string) bool {
	_, err := os.Stat(path)

	// errors.Is is the documented replacement for os.IsNotExist. A nil
	// error never matches, so an existing path yields true.
	return !errors.Is(err, os.ErrNotExist)
}
516

517
// ChannelStateDB is a database that keeps track of all channel state. It is
// created by CreateWithBackend, which hands the same backend to both the
// ChannelStateDB and its LinkNodeDB.
type ChannelStateDB struct {
	// linkNodeDB separates all DB operations on LinkNodes.
	linkNodeDB *LinkNodeDB

	// parent holds a pointer to the "main" channeldb.DB object. This is
	// only used for testing and should never be used in production code.
	// For testing use the ChannelStateDB.GetParentDB() function to retrieve
	// this pointer.
	parent *DB

	// backend points to the actual backend holding the channel state
	// database. This may be a real backend or a cache middleware.
	backend kvdb.Backend
}
532

533
// GetParentDB returns the "main" channeldb.DB object that is the owner of this
// ChannelStateDB instance. Use this function only in tests where passing around
// pointers makes testing less readable. Never to be used in production code!
func (c *ChannelStateDB) GetParentDB() *DB {
	return c.parent
}
×
539

540
// LinkNodeDB returns the current instance of the link node database, i.e. the
// linkNodeDB field of this ChannelStateDB.
func (c *ChannelStateDB) LinkNodeDB() *LinkNodeDB {
	return c.linkNodeDB
}
3✔
544

545
// FetchOpenChannels starts a new database transaction and returns all stored
546
// currently active/open channels associated with the target nodeID. In the case
547
// that no active channels are known to have been created with this node, then a
548
// zero-length slice is returned.
549
func (c *ChannelStateDB) FetchOpenChannels(nodeID *btcec.PublicKey) (
550
        []*OpenChannel, error) {
3✔
551

3✔
552
        var channels []*OpenChannel
3✔
553
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
554
                var err error
3✔
555
                channels, err = c.fetchOpenChannels(tx, nodeID)
3✔
556
                return err
3✔
557
        }, func() {
6✔
558
                channels = nil
3✔
559
        })
3✔
560

561
        return channels, err
3✔
562
}
563

564
// fetchOpenChannels uses and existing database transaction and returns all
565
// stored currently active/open channels associated with the target nodeID. In
566
// the case that no active channels are known to have been created with this
567
// node, then a zero-length slice is returned.
568
func (c *ChannelStateDB) fetchOpenChannels(tx kvdb.RTx,
569
        nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
3✔
570

3✔
571
        // Get the bucket dedicated to storing the metadata for open channels.
3✔
572
        openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
573
        if openChanBucket == nil {
3✔
574
                return nil, nil
×
575
        }
×
576

577
        // Within this top level bucket, fetch the bucket dedicated to storing
578
        // open channel data specific to the remote node.
579
        pub := nodeID.SerializeCompressed()
3✔
580
        nodeChanBucket := openChanBucket.NestedReadBucket(pub)
3✔
581
        if nodeChanBucket == nil {
6✔
582
                return nil, nil
3✔
583
        }
3✔
584

585
        // Next, we'll need to go down an additional layer in order to retrieve
586
        // the channels for each chain the node knows of.
587
        var channels []*OpenChannel
3✔
588
        err := nodeChanBucket.ForEach(func(chainHash, v []byte) error {
6✔
589
                // If there's a value, it's not a bucket so ignore it.
3✔
590
                if v != nil {
3✔
591
                        return nil
×
592
                }
×
593

594
                // If we've found a valid chainhash bucket, then we'll retrieve
595
                // that so we can extract all the channels.
596
                chainBucket := nodeChanBucket.NestedReadBucket(chainHash)
3✔
597
                if chainBucket == nil {
3✔
598
                        return fmt.Errorf("unable to read bucket for chain=%x",
×
599
                                chainHash[:])
×
600
                }
×
601

602
                // Finally, we both of the necessary buckets retrieved, fetch
603
                // all the active channels related to this node.
604
                nodeChannels, err := c.fetchNodeChannels(chainBucket)
3✔
605
                if err != nil {
3✔
606
                        return fmt.Errorf("unable to read channel for "+
×
607
                                "chain_hash=%x, node_key=%x: %v",
×
608
                                chainHash[:], pub, err)
×
609
                }
×
610

611
                channels = append(channels, nodeChannels...)
3✔
612
                return nil
3✔
613
        })
614

615
        return channels, err
3✔
616
}
617

618
// fetchNodeChannels retrieves all active channels from the target chainBucket
619
// which is under a node's dedicated channel bucket. This function is typically
620
// used to fetch all the active channels related to a particular node.
621
func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) (
622
        []*OpenChannel, error) {
3✔
623

3✔
624
        var channels []*OpenChannel
3✔
625

3✔
626
        // A node may have channels on several chains, so for each known chain,
3✔
627
        // we'll extract all the channels.
3✔
628
        err := chainBucket.ForEach(func(chanPoint, v []byte) error {
6✔
629
                // If there's a value, it's not a bucket so ignore it.
3✔
630
                if v != nil {
3✔
631
                        return nil
×
632
                }
×
633

634
                // Once we've found a valid channel bucket, we'll extract it
635
                // from the node's chain bucket.
636
                chanBucket := chainBucket.NestedReadBucket(chanPoint)
3✔
637

3✔
638
                var outPoint wire.OutPoint
3✔
639
                err := graphdb.ReadOutpoint(
3✔
640
                        bytes.NewReader(chanPoint), &outPoint,
3✔
641
                )
3✔
642
                if err != nil {
3✔
643
                        return err
×
644
                }
×
645
                oChannel, err := fetchOpenChannel(chanBucket, &outPoint)
3✔
646
                if err != nil {
3✔
647
                        return fmt.Errorf("unable to read channel data for "+
×
648
                                "chan_point=%v: %w", outPoint, err)
×
649
                }
×
650
                oChannel.Db = c
3✔
651

3✔
652
                channels = append(channels, oChannel)
3✔
653

3✔
654
                return nil
3✔
655
        })
656
        if err != nil {
3✔
657
                return nil, err
×
658
        }
×
659

660
        return channels, nil
3✔
661
}
662

663
// FetchChannel attempts to locate a channel specified by the passed channel
664
// point. If the channel cannot be found, then an error will be returned.
665
func (c *ChannelStateDB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel,
666
        error) {
3✔
667

3✔
668
        var targetChanPoint bytes.Buffer
3✔
669
        err := graphdb.WriteOutpoint(&targetChanPoint, &chanPoint)
3✔
670
        if err != nil {
3✔
671
                return nil, err
×
672
        }
×
673

674
        targetChanPointBytes := targetChanPoint.Bytes()
3✔
675
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
3✔
676
                error) {
6✔
677

3✔
678
                return targetChanPointBytes, &chanPoint, nil
3✔
679
        }
3✔
680

681
        return c.channelScanner(nil, selector)
3✔
682
}
683

684
// FetchChannelByID attempts to locate a channel specified by the passed channel
685
// ID. If the channel cannot be found, then an error will be returned.
686
// Optionally an existing db tx can be supplied.
687
func (c *ChannelStateDB) FetchChannelByID(tx kvdb.RTx, id lnwire.ChannelID) (
688
        *OpenChannel, error) {
3✔
689

3✔
690
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
3✔
691
                error) {
6✔
692

3✔
693
                var (
3✔
694
                        targetChanPointBytes []byte
3✔
695
                        targetChanPoint      *wire.OutPoint
3✔
696

3✔
697
                        // errChanFound is used to signal that the channel has
3✔
698
                        // been found so that iteration through the DB buckets
3✔
699
                        // can stop.
3✔
700
                        errChanFound = errors.New("channel found")
3✔
701
                )
3✔
702
                err := chainBkt.ForEach(func(k, _ []byte) error {
6✔
703
                        var outPoint wire.OutPoint
3✔
704
                        err := graphdb.ReadOutpoint(
3✔
705
                                bytes.NewReader(k), &outPoint,
3✔
706
                        )
3✔
707
                        if err != nil {
3✔
708
                                return err
×
709
                        }
×
710

711
                        chanID := lnwire.NewChanIDFromOutPoint(outPoint)
3✔
712
                        if chanID != id {
3✔
UNCOV
713
                                return nil
×
UNCOV
714
                        }
×
715

716
                        targetChanPoint = &outPoint
3✔
717
                        targetChanPointBytes = k
3✔
718

3✔
719
                        return errChanFound
3✔
720
                })
721
                if err != nil && !errors.Is(err, errChanFound) {
3✔
722
                        return nil, nil, err
×
723
                }
×
724
                if targetChanPoint == nil {
3✔
UNCOV
725
                        return nil, nil, ErrChannelNotFound
×
UNCOV
726
                }
×
727

728
                return targetChanPointBytes, targetChanPoint, nil
3✔
729
        }
730

731
        return c.channelScanner(tx, selector)
3✔
732
}
733

734
// ChanCount summarizes the channels recorded for a single peer. It is used by
// the server in determining access control.
type ChanCount struct {
	// HasOpenOrClosedChan reports whether the peer has at least one
	// non-pending channel with us, either open or closed.
	HasOpenOrClosedChan bool

	// PendingOpenCount is the number of pending-open channels tallied for
	// the peer.
	PendingOpenCount uint64
}
739

740
// FetchPermAndTempPeers returns a map where the key is the remote node's
741
// public key and the value is a struct that has a tally of the pending-open
742
// channels and whether the peer has an open or closed channel with us.
743
func (c *ChannelStateDB) FetchPermAndTempPeers(
744
        chainHash []byte) (map[string]ChanCount, error) {
3✔
745

3✔
746
        peerCounts := make(map[string]ChanCount)
3✔
747

3✔
748
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
749
                openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
750
                if openChanBucket == nil {
3✔
751
                        return ErrNoChanDBExists
×
752
                }
×
753

754
                openChanErr := openChanBucket.ForEach(func(nodePub,
3✔
755
                        v []byte) error {
6✔
756

3✔
757
                        // If there is a value, this is not a bucket.
3✔
758
                        if v != nil {
3✔
759
                                return nil
×
760
                        }
×
761

762
                        nodeChanBucket := openChanBucket.NestedReadBucket(
3✔
763
                                nodePub,
3✔
764
                        )
3✔
765
                        if nodeChanBucket == nil {
3✔
766
                                return nil
×
767
                        }
×
768

769
                        chainBucket := nodeChanBucket.NestedReadBucket(
3✔
770
                                chainHash,
3✔
771
                        )
3✔
772
                        if chainBucket == nil {
3✔
773
                                return fmt.Errorf("no chain bucket exists")
×
774
                        }
×
775

776
                        var isPermPeer bool
3✔
777
                        var pendingOpenCount uint64
3✔
778

3✔
779
                        internalErr := chainBucket.ForEach(func(chanPoint,
3✔
780
                                val []byte) error {
6✔
781

3✔
782
                                // If there is a value, this is not a bucket.
3✔
783
                                if val != nil {
3✔
784
                                        return nil
×
785
                                }
×
786

787
                                chanBucket := chainBucket.NestedReadBucket(
3✔
788
                                        chanPoint,
3✔
789
                                )
3✔
790
                                if chanBucket == nil {
3✔
791
                                        return nil
×
792
                                }
×
793

794
                                var op wire.OutPoint
3✔
795
                                readErr := graphdb.ReadOutpoint(
3✔
796
                                        bytes.NewReader(chanPoint), &op,
3✔
797
                                )
3✔
798
                                if readErr != nil {
3✔
799
                                        return readErr
×
800
                                }
×
801

802
                                // We need to go through each channel and look
803
                                // at the IsPending status.
804
                                openChan, err := fetchOpenChannel(
3✔
805
                                        chanBucket, &op,
3✔
806
                                )
3✔
807
                                if err != nil {
3✔
808
                                        return err
×
809
                                }
×
810

811
                                if openChan.IsPending {
6✔
812
                                        // Add to the pending-open count since
3✔
813
                                        // this is a temp peer.
3✔
814
                                        pendingOpenCount++
3✔
815
                                        return nil
3✔
816
                                }
3✔
817

818
                                // Since IsPending is false, this is a perm
819
                                // peer.
820
                                isPermPeer = true
3✔
821

3✔
822
                                return nil
3✔
823
                        })
824
                        if internalErr != nil {
3✔
825
                                return internalErr
×
826
                        }
×
827

828
                        peerCount := ChanCount{
3✔
829
                                HasOpenOrClosedChan: isPermPeer,
3✔
830
                                PendingOpenCount:    pendingOpenCount,
3✔
831
                        }
3✔
832
                        peerCounts[string(nodePub)] = peerCount
3✔
833

3✔
834
                        return nil
3✔
835
                })
836
                if openChanErr != nil {
3✔
837
                        return openChanErr
×
838
                }
×
839

840
                // Now check the closed channel bucket.
841
                historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
3✔
842
                if historicalChanBucket == nil {
3✔
843
                        return ErrNoHistoricalBucket
×
844
                }
×
845

846
                historicalErr := historicalChanBucket.ForEach(func(chanPoint,
3✔
847
                        v []byte) error {
6✔
848
                        // Parse each nested bucket and the chanInfoKey to get
3✔
849
                        // the IsPending bool. This determines whether the
3✔
850
                        // peer is protected or not.
3✔
851
                        if v != nil {
3✔
852
                                // This is not a bucket. This is currently not
×
853
                                // possible.
×
854
                                return nil
×
855
                        }
×
856

857
                        chanBucket := historicalChanBucket.NestedReadBucket(
3✔
858
                                chanPoint,
3✔
859
                        )
3✔
860
                        if chanBucket == nil {
3✔
861
                                // This is not possible.
×
862
                                return fmt.Errorf("no historical channel " +
×
863
                                        "bucket exists")
×
864
                        }
×
865

866
                        var op wire.OutPoint
3✔
867
                        readErr := graphdb.ReadOutpoint(
3✔
868
                                bytes.NewReader(chanPoint), &op,
3✔
869
                        )
3✔
870
                        if readErr != nil {
3✔
871
                                return readErr
×
872
                        }
×
873

874
                        // This channel is closed, but the structure of the
875
                        // historical bucket is the same. This is by design,
876
                        // which means we can call fetchOpenChannel.
877
                        channel, fetchErr := fetchOpenChannel(chanBucket, &op)
3✔
878
                        if fetchErr != nil {
3✔
879
                                return fetchErr
×
880
                        }
×
881

882
                        // Only include this peer in the protected class if
883
                        // the closing transaction confirmed. Note that
884
                        // CloseChannel can be called in the funding manager
885
                        // while IsPending is true which is why we need this
886
                        // special-casing to not count premature funding
887
                        // manager calls to CloseChannel.
888
                        if !channel.IsPending {
6✔
889
                                // Fetch the public key of the remote node. We
3✔
890
                                // need to use the string-ified serialized,
3✔
891
                                // compressed bytes as the key.
3✔
892
                                remotePub := channel.IdentityPub
3✔
893
                                remoteSer := remotePub.SerializeCompressed()
3✔
894
                                remoteKey := string(remoteSer)
3✔
895

3✔
896
                                count, exists := peerCounts[remoteKey]
3✔
897
                                if exists {
6✔
898
                                        count.HasOpenOrClosedChan = true
3✔
899
                                        peerCounts[remoteKey] = count
3✔
900
                                } else {
3✔
901
                                        peerCount := ChanCount{
×
902
                                                HasOpenOrClosedChan: true,
×
903
                                        }
×
904
                                        peerCounts[remoteKey] = peerCount
×
905
                                }
×
906
                        }
907

908
                        return nil
3✔
909
                })
910
                if historicalErr != nil {
3✔
911
                        return historicalErr
×
912
                }
×
913

914
                return nil
3✔
915
        }, func() {
3✔
916
                clear(peerCounts)
3✔
917
        })
3✔
918

919
        return peerCounts, err
3✔
920
}
921

922
// channelSelector describes a function that takes a chain-hash bucket from
923
// within the open-channel DB and returns the wanted channel point bytes, and
924
// channel point. It must return the ErrChannelNotFound error if the wanted
925
// channel is not in the given bucket.
926
type channelSelector func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
927
        error)
928

929
// channelScanner will traverse the DB to each chain-hash bucket of each node
930
// pub-key bucket in the open-channel-bucket. The chanSelector will then be used
931
// to fetch the wanted channel outpoint from the chain bucket.
932
func (c *ChannelStateDB) channelScanner(tx kvdb.RTx,
933
        chanSelect channelSelector) (*OpenChannel, error) {
3✔
934

3✔
935
        var (
3✔
936
                targetChan *OpenChannel
3✔
937

3✔
938
                // errChanFound is used to signal that the channel has been
3✔
939
                // found so that iteration through the DB buckets can stop.
3✔
940
                errChanFound = errors.New("channel found")
3✔
941
        )
3✔
942

3✔
943
        // chanScan will traverse the following bucket structure:
3✔
944
        //  * nodePub => chainHash => chanPoint
3✔
945
        //
3✔
946
        // At each level we go one further, ensuring that we're traversing the
3✔
947
        // proper key (that's actually a bucket). By only reading the bucket
3✔
948
        // structure and skipping fully decoding each channel, we save a good
3✔
949
        // bit of CPU as we don't need to do things like decompress public
3✔
950
        // keys.
3✔
951
        chanScan := func(tx kvdb.RTx) error {
6✔
952
                // Get the bucket dedicated to storing the metadata for open
3✔
953
                // channels.
3✔
954
                openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
955
                if openChanBucket == nil {
3✔
956
                        return ErrNoActiveChannels
×
957
                }
×
958

959
                // Within the node channel bucket, are the set of node pubkeys
960
                // we have channels with, we don't know the entire set, so we'll
961
                // check them all.
962
                return openChanBucket.ForEach(func(nodePub, v []byte) error {
6✔
963
                        // Ensure that this is a key the same size as a pubkey,
3✔
964
                        // and also that it leads directly to a bucket.
3✔
965
                        if len(nodePub) != 33 || v != nil {
3✔
966
                                return nil
×
967
                        }
×
968

969
                        nodeChanBucket := openChanBucket.NestedReadBucket(
3✔
970
                                nodePub,
3✔
971
                        )
3✔
972
                        if nodeChanBucket == nil {
3✔
973
                                return nil
×
974
                        }
×
975

976
                        // The next layer down is all the chains that this node
977
                        // has channels on with us.
978
                        return nodeChanBucket.ForEach(func(chainHash,
3✔
979
                                v []byte) error {
6✔
980

3✔
981
                                // If there's a value, it's not a bucket so
3✔
982
                                // ignore it.
3✔
983
                                if v != nil {
3✔
984
                                        return nil
×
985
                                }
×
986

987
                                chainBucket := nodeChanBucket.NestedReadBucket(
3✔
988
                                        chainHash,
3✔
989
                                )
3✔
990
                                if chainBucket == nil {
3✔
991
                                        return fmt.Errorf("unable to read "+
×
992
                                                "bucket for chain=%x",
×
993
                                                chainHash)
×
994
                                }
×
995

996
                                // Finally, we reach the leaf bucket that stores
997
                                // all the chanPoints for this node.
998
                                targetChanBytes, chanPoint, err := chanSelect(
3✔
999
                                        chainBucket,
3✔
1000
                                )
3✔
1001
                                if errors.Is(err, ErrChannelNotFound) {
3✔
UNCOV
1002
                                        return nil
×
1003
                                } else if err != nil {
3✔
1004
                                        return err
×
1005
                                }
×
1006

1007
                                chanBucket := chainBucket.NestedReadBucket(
3✔
1008
                                        targetChanBytes,
3✔
1009
                                )
3✔
1010
                                if chanBucket == nil {
6✔
1011
                                        return nil
3✔
1012
                                }
3✔
1013

1014
                                channel, err := fetchOpenChannel(
3✔
1015
                                        chanBucket, chanPoint,
3✔
1016
                                )
3✔
1017
                                if err != nil {
3✔
1018
                                        return err
×
1019
                                }
×
1020

1021
                                targetChan = channel
3✔
1022
                                targetChan.Db = c
3✔
1023

3✔
1024
                                return errChanFound
3✔
1025
                        })
1026
                })
1027
        }
1028

1029
        var err error
3✔
1030
        if tx == nil {
6✔
1031
                err = kvdb.View(c.backend, chanScan, func() {})
6✔
1032
        } else {
×
1033
                err = chanScan(tx)
×
1034
        }
×
1035
        if err != nil && !errors.Is(err, errChanFound) {
3✔
1036
                return nil, err
×
1037
        }
×
1038

1039
        if targetChan != nil {
6✔
1040
                return targetChan, nil
3✔
1041
        }
3✔
1042

1043
        // If we can't find the channel, then we return with an error, as we
1044
        // have nothing to back up.
1045
        return nil, ErrChannelNotFound
3✔
1046
}
1047

1048
// FetchAllChannels attempts to retrieve all open channels currently stored
1049
// within the database, including pending open, fully open and channels waiting
1050
// for a closing transaction to confirm.
1051
func (c *ChannelStateDB) FetchAllChannels() ([]*OpenChannel, error) {
3✔
1052
        return fetchChannels(c)
3✔
1053
}
3✔
1054

1055
// FetchAllOpenChannels will return all channels that have the funding
1056
// transaction confirmed, and is not waiting for a closing transaction to be
1057
// confirmed.
1058
func (c *ChannelStateDB) FetchAllOpenChannels() ([]*OpenChannel, error) {
3✔
1059
        return fetchChannels(
3✔
1060
                c,
3✔
1061
                pendingChannelFilter(false),
3✔
1062
                waitingCloseFilter(false),
3✔
1063
        )
3✔
1064
}
3✔
1065

1066
// FetchPendingChannels will return channels that have completed the process of
1067
// generating and broadcasting funding transactions, but whose funding
1068
// transactions have yet to be confirmed on the blockchain.
1069
func (c *ChannelStateDB) FetchPendingChannels() ([]*OpenChannel, error) {
3✔
1070
        return fetchChannels(c,
3✔
1071
                pendingChannelFilter(true),
3✔
1072
                waitingCloseFilter(false),
3✔
1073
        )
3✔
1074
}
3✔
1075

1076
// FetchWaitingCloseChannels will return all channels that have been opened,
1077
// but are now waiting for a closing transaction to be confirmed.
1078
//
1079
// NOTE: This includes channels that are also pending to be opened.
1080
func (c *ChannelStateDB) FetchWaitingCloseChannels() ([]*OpenChannel, error) {
3✔
1081
        return fetchChannels(
3✔
1082
                c, waitingCloseFilter(true),
3✔
1083
        )
3✔
1084
}
3✔
1085

1086
// fetchChannelsFilter applies a filter to channels retrieved in fetchchannels.
1087
// A set of filters can be combined to filter across multiple dimensions.
1088
type fetchChannelsFilter func(channel *OpenChannel) bool
1089

1090
// pendingChannelFilter returns a filter based on whether channels are pending
1091
// (ie, their funding transaction still needs to confirm). If pending is false,
1092
// channels with confirmed funding transactions are returned.
1093
func pendingChannelFilter(pending bool) fetchChannelsFilter {
3✔
1094
        return func(channel *OpenChannel) bool {
6✔
1095
                return channel.IsPending == pending
3✔
1096
        }
3✔
1097
}
1098

1099
// waitingCloseFilter returns a filter which filters channels based on whether
1100
// they are awaiting the confirmation of their closing transaction. If waiting
1101
// close is true, channels that have had their closing tx broadcast are
1102
// included. If it is false, channels that are not awaiting confirmation of
1103
// their close transaction are returned.
1104
func waitingCloseFilter(waitingClose bool) fetchChannelsFilter {
3✔
1105
        return func(channel *OpenChannel) bool {
6✔
1106
                // If the channel is in any other state than Default,
3✔
1107
                // then it means it is waiting to be closed.
3✔
1108
                channelWaitingClose :=
3✔
1109
                        channel.ChanStatus() != ChanStatusDefault
3✔
1110

3✔
1111
                // Include the channel if it matches the value for
3✔
1112
                // waiting close that we are filtering on.
3✔
1113
                return channelWaitingClose == waitingClose
3✔
1114
        }
3✔
1115
}
1116

1117
// fetchChannels attempts to retrieve channels currently stored in the
1118
// database. It takes a set of filters which are applied to each channel to
1119
// obtain a set of channels with the desired set of properties. Only channels
1120
// which have a true value returned for *all* of the filters will be returned.
1121
// If no filters are provided, every channel in the open channels bucket will
1122
// be returned.
1123
func fetchChannels(c *ChannelStateDB, filters ...fetchChannelsFilter) (
1124
        []*OpenChannel, error) {
3✔
1125

3✔
1126
        var channels []*OpenChannel
3✔
1127

3✔
1128
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1129
                // Get the bucket dedicated to storing the metadata for open
3✔
1130
                // channels.
3✔
1131
                openChanBucket := tx.ReadBucket(openChannelBucket)
3✔
1132
                if openChanBucket == nil {
3✔
1133
                        return ErrNoActiveChannels
×
1134
                }
×
1135

1136
                // Next, fetch the bucket dedicated to storing metadata related
1137
                // to all nodes. All keys within this bucket are the serialized
1138
                // public keys of all our direct counterparties.
1139
                nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
3✔
1140
                if nodeMetaBucket == nil {
3✔
1141
                        return fmt.Errorf("node bucket not created")
×
1142
                }
×
1143

1144
                // Finally for each node public key in the bucket, fetch all
1145
                // the channels related to this particular node.
1146
                return nodeMetaBucket.ForEach(func(k, v []byte) error {
6✔
1147
                        nodeChanBucket := openChanBucket.NestedReadBucket(k)
3✔
1148
                        if nodeChanBucket == nil {
3✔
1149
                                return nil
×
1150
                        }
×
1151

1152
                        return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
6✔
1153
                                // If there's a value, it's not a bucket so
3✔
1154
                                // ignore it.
3✔
1155
                                if v != nil {
3✔
1156
                                        return nil
×
1157
                                }
×
1158

1159
                                // If we've found a valid chainhash bucket,
1160
                                // then we'll retrieve that so we can extract
1161
                                // all the channels.
1162
                                chainBucket := nodeChanBucket.NestedReadBucket(
3✔
1163
                                        chainHash,
3✔
1164
                                )
3✔
1165
                                if chainBucket == nil {
3✔
1166
                                        return fmt.Errorf("unable to read "+
×
1167
                                                "bucket for chain=%x", chainHash[:])
×
1168
                                }
×
1169

1170
                                nodeChans, err := c.fetchNodeChannels(chainBucket)
3✔
1171
                                if err != nil {
3✔
1172
                                        return fmt.Errorf("unable to read "+
×
1173
                                                "channel for chain_hash=%x, "+
×
1174
                                                "node_key=%x: %v", chainHash[:], k, err)
×
1175
                                }
×
1176
                                for _, channel := range nodeChans {
6✔
1177
                                        // includeChannel indicates whether the channel
3✔
1178
                                        // meets the criteria specified by our filters.
3✔
1179
                                        includeChannel := true
3✔
1180

3✔
1181
                                        // Run through each filter and check whether the
3✔
1182
                                        // channel should be included.
3✔
1183
                                        for _, f := range filters {
6✔
1184
                                                // If the channel fails the filter, set
3✔
1185
                                                // includeChannel to false and don't bother
3✔
1186
                                                // checking the remaining filters.
3✔
1187
                                                if !f(channel) {
6✔
1188
                                                        includeChannel = false
3✔
1189
                                                        break
3✔
1190
                                                }
1191
                                        }
1192

1193
                                        // If the channel passed every filter, include it in
1194
                                        // our set of channels.
1195
                                        if includeChannel {
6✔
1196
                                                channels = append(channels, channel)
3✔
1197
                                        }
3✔
1198
                                }
1199
                                return nil
3✔
1200
                        })
1201

1202
                })
1203
        }, func() {
3✔
1204
                channels = nil
3✔
1205
        })
3✔
1206
        if err != nil {
3✔
1207
                return nil, err
×
1208
        }
×
1209

1210
        return channels, nil
3✔
1211
}
1212

1213
// FetchClosedChannels attempts to fetch all closed channels from the database.
1214
// The pendingOnly bool toggles if channels that aren't yet fully closed should
1215
// be returned in the response or not. When a channel was cooperatively closed,
1216
// it becomes fully closed after a single confirmation.  When a channel was
1217
// forcibly closed, it will become fully closed after _all_ the pending funds
1218
// (if any) have been swept.
1219
func (c *ChannelStateDB) FetchClosedChannels(pendingOnly bool) (
1220
        []*ChannelCloseSummary, error) {
3✔
1221

3✔
1222
        var chanSummaries []*ChannelCloseSummary
3✔
1223

3✔
1224
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1225
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1226
                if closeBucket == nil {
3✔
1227
                        return ErrNoClosedChannels
×
1228
                }
×
1229

1230
                return closeBucket.ForEach(func(chanID []byte, summaryBytes []byte) error {
6✔
1231
                        summaryReader := bytes.NewReader(summaryBytes)
3✔
1232
                        chanSummary, err := deserializeCloseChannelSummary(summaryReader)
3✔
1233
                        if err != nil {
3✔
1234
                                return err
×
1235
                        }
×
1236

1237
                        // If the query specified to only include pending
1238
                        // channels, then we'll skip any channels which aren't
1239
                        // currently pending.
1240
                        if !chanSummary.IsPending && pendingOnly {
6✔
1241
                                return nil
3✔
1242
                        }
3✔
1243

1244
                        chanSummaries = append(chanSummaries, chanSummary)
3✔
1245
                        return nil
3✔
1246
                })
1247
        }, func() {
3✔
1248
                chanSummaries = nil
3✔
1249
        }); err != nil {
3✔
1250
                return nil, err
×
1251
        }
×
1252

1253
        return chanSummaries, nil
3✔
1254
}
1255

1256
// ErrClosedChannelNotFound is returned when no close summary for the
// requested closed channel could be found in the channeldb.
var ErrClosedChannelNotFound = errors.New(
	"unable to find closed channel summary",
)
1259

1260
// FetchClosedChannel queries for a channel close summary using the channel
1261
// point of the channel in question.
1262
func (c *ChannelStateDB) FetchClosedChannel(chanID *wire.OutPoint) (
1263
        *ChannelCloseSummary, error) {
3✔
1264

3✔
1265
        var chanSummary *ChannelCloseSummary
3✔
1266
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1267
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1268
                if closeBucket == nil {
3✔
1269
                        return ErrClosedChannelNotFound
×
1270
                }
×
1271

1272
                var b bytes.Buffer
3✔
1273
                var err error
3✔
1274
                if err = graphdb.WriteOutpoint(&b, chanID); err != nil {
3✔
1275
                        return err
×
1276
                }
×
1277

1278
                summaryBytes := closeBucket.Get(b.Bytes())
3✔
1279
                if summaryBytes == nil {
3✔
UNCOV
1280
                        return ErrClosedChannelNotFound
×
UNCOV
1281
                }
×
1282

1283
                summaryReader := bytes.NewReader(summaryBytes)
3✔
1284
                chanSummary, err = deserializeCloseChannelSummary(summaryReader)
3✔
1285

3✔
1286
                return err
3✔
1287
        }, func() {
3✔
1288
                chanSummary = nil
3✔
1289
        }); err != nil {
3✔
UNCOV
1290
                return nil, err
×
UNCOV
1291
        }
×
1292

1293
        return chanSummary, nil
3✔
1294
}
1295

1296
// FetchClosedChannelForID queries for a channel close summary using the
1297
// channel ID of the channel in question.
1298
func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) (
1299
        *ChannelCloseSummary, error) {
3✔
1300

3✔
1301
        var chanSummary *ChannelCloseSummary
3✔
1302
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1303
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1304
                if closeBucket == nil {
3✔
1305
                        return ErrClosedChannelNotFound
×
1306
                }
×
1307

1308
                // The first 30 bytes of the channel ID and outpoint will be
1309
                // equal.
1310
                cursor := closeBucket.ReadCursor()
3✔
1311
                op, c := cursor.Seek(cid[:30])
3✔
1312

3✔
1313
                // We scan over all possible candidates for this channel ID.
3✔
1314
                for ; op != nil && bytes.Compare(cid[:30], op[:30]) <= 0; op, c = cursor.Next() {
6✔
1315
                        var outPoint wire.OutPoint
3✔
1316
                        err := graphdb.ReadOutpoint(
3✔
1317
                                bytes.NewReader(op), &outPoint,
3✔
1318
                        )
3✔
1319
                        if err != nil {
3✔
1320
                                return err
×
1321
                        }
×
1322

1323
                        // If the found outpoint does not correspond to this
1324
                        // channel ID, we continue.
1325
                        if !cid.IsChanPoint(&outPoint) {
3✔
UNCOV
1326
                                continue
×
1327
                        }
1328

1329
                        // Deserialize the close summary and return.
1330
                        r := bytes.NewReader(c)
3✔
1331
                        chanSummary, err = deserializeCloseChannelSummary(r)
3✔
1332
                        if err != nil {
3✔
1333
                                return err
×
1334
                        }
×
1335

1336
                        return nil
3✔
1337
                }
1338
                return ErrClosedChannelNotFound
3✔
1339
        }, func() {
3✔
1340
                chanSummary = nil
3✔
1341
        }); err != nil {
6✔
1342
                return nil, err
3✔
1343
        }
3✔
1344

1345
        return chanSummary, nil
3✔
1346
}
1347

1348
// MarkChanFullyClosed marks a channel as fully closed within the database. A
1349
// channel should be marked as fully closed if the channel was initially
1350
// cooperatively closed and it's reached a single confirmation, or after all
1351
// the pending funds in a channel that has been forcibly closed have been
1352
// swept.
1353
func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
3✔
1354
        var (
3✔
1355
                openChannels  []*OpenChannel
3✔
1356
                pruneLinkNode *btcec.PublicKey
3✔
1357
        )
3✔
1358
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1359
                var b bytes.Buffer
3✔
1360
                if err := graphdb.WriteOutpoint(&b, chanPoint); err != nil {
3✔
1361
                        return err
×
1362
                }
×
1363

1364
                chanID := b.Bytes()
3✔
1365

3✔
1366
                closedChanBucket, err := tx.CreateTopLevelBucket(
3✔
1367
                        closedChannelBucket,
3✔
1368
                )
3✔
1369
                if err != nil {
3✔
1370
                        return err
×
1371
                }
×
1372

1373
                chanSummaryBytes := closedChanBucket.Get(chanID)
3✔
1374
                if chanSummaryBytes == nil {
3✔
1375
                        return fmt.Errorf("no closed channel for "+
×
1376
                                "chan_point=%v found", chanPoint)
×
1377
                }
×
1378

1379
                chanSummaryReader := bytes.NewReader(chanSummaryBytes)
3✔
1380
                chanSummary, err := deserializeCloseChannelSummary(
3✔
1381
                        chanSummaryReader,
3✔
1382
                )
3✔
1383
                if err != nil {
3✔
1384
                        return err
×
1385
                }
×
1386

1387
                chanSummary.IsPending = false
3✔
1388

3✔
1389
                var newSummary bytes.Buffer
3✔
1390
                err = serializeChannelCloseSummary(&newSummary, chanSummary)
3✔
1391
                if err != nil {
3✔
1392
                        return err
×
1393
                }
×
1394

1395
                err = closedChanBucket.Put(chanID, newSummary.Bytes())
3✔
1396
                if err != nil {
3✔
1397
                        return err
×
1398
                }
×
1399

1400
                // Now that the channel is closed, we'll check if we have any
1401
                // other open channels with this peer. If we don't we'll
1402
                // garbage collect it to ensure we don't establish persistent
1403
                // connections to peers without open channels.
1404
                pruneLinkNode = chanSummary.RemotePub
3✔
1405
                openChannels, err = c.fetchOpenChannels(
3✔
1406
                        tx, pruneLinkNode,
3✔
1407
                )
3✔
1408
                if err != nil {
3✔
1409
                        return fmt.Errorf("unable to fetch open channels for "+
×
1410
                                "peer %x: %v",
×
1411
                                pruneLinkNode.SerializeCompressed(), err)
×
1412
                }
×
1413

1414
                return nil
3✔
1415
        }, func() {
3✔
1416
                openChannels = nil
3✔
1417
                pruneLinkNode = nil
3✔
1418
        })
3✔
1419
        if err != nil {
3✔
1420
                return err
×
1421
        }
×
1422

1423
        // Decide whether we want to remove the link node, based upon the number
1424
        // of still open channels.
1425
        return c.pruneLinkNode(openChannels, pruneLinkNode)
3✔
1426
}
1427

1428
// pruneLinkNode determines whether we should garbage collect a link node from
1429
// the database due to no longer having any open channels with it. If there are
1430
// any left, then this acts as a no-op.
1431
func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
1432
        remotePub *btcec.PublicKey) error {
3✔
1433

3✔
1434
        if len(openChannels) > 0 {
6✔
1435
                return nil
3✔
1436
        }
3✔
1437

1438
        log.Infof("Pruning link node %x with zero open channels from database",
3✔
1439
                remotePub.SerializeCompressed())
3✔
1440

3✔
1441
        return c.linkNodeDB.DeleteLinkNode(remotePub)
3✔
1442
}
1443

1444
// PruneLinkNodes attempts to prune all link nodes found within the database
1445
// with whom we no longer have any open channels with.
1446
func (c *ChannelStateDB) PruneLinkNodes() error {
3✔
1447
        allLinkNodes, err := c.linkNodeDB.FetchAllLinkNodes()
3✔
1448
        if err != nil {
3✔
1449
                return err
×
1450
        }
×
1451

1452
        for _, linkNode := range allLinkNodes {
6✔
1453
                var (
3✔
1454
                        openChannels []*OpenChannel
3✔
1455
                        linkNode     = linkNode
3✔
1456
                )
3✔
1457
                err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1458
                        var err error
3✔
1459
                        openChannels, err = c.fetchOpenChannels(
3✔
1460
                                tx, linkNode.IdentityPub,
3✔
1461
                        )
3✔
1462
                        return err
3✔
1463
                }, func() {
6✔
1464
                        openChannels = nil
3✔
1465
                })
3✔
1466
                if err != nil {
3✔
1467
                        return err
×
1468
                }
×
1469

1470
                err = c.pruneLinkNode(openChannels, linkNode.IdentityPub)
3✔
1471
                if err != nil {
3✔
1472
                        return err
×
1473
                }
×
1474
        }
1475

1476
        return nil
3✔
1477
}
1478

1479
// ChannelShell is a shell of a channel that is meant to be used for channel
1480
// recovery purposes. It contains a minimal OpenChannel instance along with
1481
// addresses for that target node.
1482
type ChannelShell struct {
1483
        // NodeAddrs the set of addresses that this node has known to be
1484
        // reachable at in the past.
1485
        NodeAddrs []net.Addr
1486

1487
        // Chan is a shell of an OpenChannel, it contains only the items
1488
        // required to restore the channel on disk.
1489
        Chan *OpenChannel
1490
}
1491

1492
// RestoreChannelShells is a method that allows the caller to reconstruct the
1493
// state of an OpenChannel from the ChannelShell. We'll attempt to write the
1494
// new channel to disk, create a LinkNode instance with the passed node
1495
// addresses, and finally create an edge within the graph for the channel as
1496
// well. This method is idempotent, so repeated calls with the same set of
1497
// channel shells won't modify the database after the initial call.
1498
func (c *ChannelStateDB) RestoreChannelShells(channelShells ...*ChannelShell) error {
3✔
1499
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1500
                for _, channelShell := range channelShells {
6✔
1501
                        channel := channelShell.Chan
3✔
1502

3✔
1503
                        // When we make a channel, we mark that the channel has
3✔
1504
                        // been restored, this will signal to other sub-systems
3✔
1505
                        // to not attempt to use the channel as if it was a
3✔
1506
                        // regular one.
3✔
1507
                        channel.chanStatus |= ChanStatusRestored
3✔
1508

3✔
1509
                        // First, we'll attempt to create a new open channel
3✔
1510
                        // and link node for this channel. If the channel
3✔
1511
                        // already exists, then in order to ensure this method
3✔
1512
                        // is idempotent, we'll continue to the next step.
3✔
1513
                        channel.Db = c
3✔
1514
                        err := syncNewChannel(
3✔
1515
                                tx, channel, channelShell.NodeAddrs,
3✔
1516
                        )
3✔
1517
                        if err != nil {
6✔
1518
                                return err
3✔
1519
                        }
3✔
1520
                }
1521

1522
                return nil
3✔
1523
        }, func() {})
3✔
1524
        if err != nil {
6✔
1525
                return err
3✔
1526
        }
3✔
1527

1528
        return nil
3✔
1529
}
1530

1531
// AddrsForNode consults the channel database for all addresses known to the
1532
// passed node public key. The returned boolean indicates if the given node is
1533
// unknown to the channel DB or not.
1534
//
1535
// NOTE: this is part of the AddrSource interface.
1536
func (d *DB) AddrsForNode(_ context.Context, nodePub *btcec.PublicKey) (bool,
1537
        []net.Addr, error) {
3✔
1538

3✔
1539
        linkNode, err := d.channelStateDB.linkNodeDB.FetchLinkNode(nodePub)
3✔
1540
        // Only if the error is something other than ErrNodeNotFound do we
3✔
1541
        // return it.
3✔
1542
        switch {
3✔
1543
        case err != nil && !errors.Is(err, ErrNodeNotFound):
×
1544
                return false, nil, err
×
1545

1546
        case errors.Is(err, ErrNodeNotFound):
×
1547
                return false, nil, nil
×
1548
        }
1549

1550
        return true, linkNode.Addresses, nil
3✔
1551
}
1552

1553
// AbandonChannel attempts to remove the target channel from the open channel
1554
// database. If the channel was already removed (has a closed channel entry),
1555
// then we'll return a nil error. Otherwise, we'll insert a new close summary
1556
// into the database.
1557
func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint,
1558
        bestHeight uint32) error {
3✔
1559

3✔
1560
        // With the chanPoint constructed, we'll attempt to find the target
3✔
1561
        // channel in the database. If we can't find the channel, then we'll
3✔
1562
        // return the error back to the caller.
3✔
1563
        dbChan, err := c.FetchChannel(*chanPoint)
3✔
1564
        switch {
3✔
1565
        // If the channel wasn't found, then it's possible that it was already
1566
        // abandoned from the database.
1567
        case err == ErrChannelNotFound:
3✔
1568
                _, closedErr := c.FetchClosedChannel(chanPoint)
3✔
1569
                if closedErr != nil {
3✔
UNCOV
1570
                        return closedErr
×
UNCOV
1571
                }
×
1572

1573
                // If the channel was already closed, then we don't return an
1574
                // error as we'd like this step to be repeatable.
1575
                return nil
3✔
1576
        case err != nil:
×
1577
                return err
×
1578
        }
1579

1580
        // Now that we've found the channel, we'll populate a close summary for
1581
        // the channel, so we can store as much information for this abounded
1582
        // channel as possible. We also ensure that we set Pending to false, to
1583
        // indicate that this channel has been "fully" closed.
1584
        summary := &ChannelCloseSummary{
3✔
1585
                CloseType:               Abandoned,
3✔
1586
                ChanPoint:               *chanPoint,
3✔
1587
                ChainHash:               dbChan.ChainHash,
3✔
1588
                CloseHeight:             bestHeight,
3✔
1589
                RemotePub:               dbChan.IdentityPub,
3✔
1590
                Capacity:                dbChan.Capacity,
3✔
1591
                SettledBalance:          dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
3✔
1592
                ShortChanID:             dbChan.ShortChanID(),
3✔
1593
                RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
3✔
1594
                RemoteNextRevocation:    dbChan.RemoteNextRevocation,
3✔
1595
                LocalChanConfig:         dbChan.LocalChanCfg,
3✔
1596
        }
3✔
1597

3✔
1598
        // Finally, we'll close the channel in the DB, and return back to the
3✔
1599
        // caller. We set ourselves as the close initiator because we abandoned
3✔
1600
        // the channel.
3✔
1601
        return dbChan.CloseChannel(summary, ChanStatusLocalCloseInitiator)
3✔
1602
}
1603

1604
// SaveChannelOpeningState saves the serialized channel state for the provided
1605
// chanPoint to the channelOpeningStateBucket.
1606
func (c *ChannelStateDB) SaveChannelOpeningState(outPoint,
1607
        serializedState []byte) error {
3✔
1608

3✔
1609
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1610
                bucket, err := tx.CreateTopLevelBucket(channelOpeningStateBucket)
3✔
1611
                if err != nil {
3✔
1612
                        return err
×
1613
                }
×
1614

1615
                return bucket.Put(outPoint, serializedState)
3✔
1616
        }, func() {})
3✔
1617
}
1618

1619
// GetChannelOpeningState fetches the serialized channel state for the provided
1620
// outPoint from the database, or returns ErrChannelNotFound if the channel
1621
// is not found.
1622
func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte,
1623
        error) {
3✔
1624

3✔
1625
        var serializedState []byte
3✔
1626
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1627
                bucket := tx.ReadBucket(channelOpeningStateBucket)
3✔
1628
                if bucket == nil {
6✔
1629
                        // If the bucket does not exist, it means we never added
3✔
1630
                        //  a channel to the db, so return ErrChannelNotFound.
3✔
1631
                        return ErrChannelNotFound
3✔
1632
                }
3✔
1633

1634
                stateBytes := bucket.Get(outPoint)
3✔
1635
                if stateBytes == nil {
6✔
1636
                        return ErrChannelNotFound
3✔
1637
                }
3✔
1638

1639
                serializedState = append(serializedState, stateBytes...)
3✔
1640

3✔
1641
                return nil
3✔
1642
        }, func() {
3✔
1643
                serializedState = nil
3✔
1644
        })
3✔
1645
        return serializedState, err
3✔
1646
}
1647

1648
// DeleteChannelOpeningState removes any state for outPoint from the database.
1649
func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
3✔
1650
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
6✔
1651
                bucket := tx.ReadWriteBucket(channelOpeningStateBucket)
3✔
1652
                if bucket == nil {
3✔
1653
                        return ErrChannelNotFound
×
1654
                }
×
1655

1656
                return bucket.Delete(outPoint)
3✔
1657
        }, func() {})
3✔
1658
}
1659

1660
// syncVersions function is used for safe db version synchronization. It
1661
// applies migration functions to the current database and recovers the
1662
// previous state of db if at least one error/panic appeared during migration.
1663
func (d *DB) syncVersions(versions []mandatoryVersion) error {
3✔
1664
        meta, err := d.FetchMeta()
3✔
1665
        if err != nil {
3✔
1666
                if err == ErrMetaNotFound {
×
1667
                        meta = &Meta{}
×
1668
                } else {
×
1669
                        return err
×
1670
                }
×
1671
        }
1672

1673
        latestVersion := getLatestDBVersion(versions)
3✔
1674
        log.Infof("Checking for schema update: latest_version=%v, "+
3✔
1675
                "db_version=%v", latestVersion, meta.DbVersionNumber)
3✔
1676

3✔
1677
        switch {
3✔
1678

1679
        // If the database reports a higher version that we are aware of, the
1680
        // user is probably trying to revert to a prior version of lnd. We fail
1681
        // here to prevent reversions and unintended corruption.
UNCOV
1682
        case meta.DbVersionNumber > latestVersion:
×
UNCOV
1683
                log.Errorf("Refusing to revert from db_version=%d to "+
×
UNCOV
1684
                        "lower version=%d", meta.DbVersionNumber,
×
UNCOV
1685
                        latestVersion)
×
UNCOV
1686
                return ErrDBReversion
×
1687

1688
        // If the current database version matches the latest version number,
1689
        // then we don't need to perform any migrations.
1690
        case meta.DbVersionNumber == latestVersion:
3✔
1691
                return nil
3✔
1692
        }
1693

UNCOV
1694
        log.Infof("Performing database schema migration")
×
UNCOV
1695

×
UNCOV
1696
        // Otherwise, we fetch the migrations which need to applied, and
×
UNCOV
1697
        // execute them serially within a single database transaction to ensure
×
UNCOV
1698
        // the migration is atomic.
×
UNCOV
1699
        migrations, migrationVersions := getMigrationsToApply(
×
UNCOV
1700
                versions, meta.DbVersionNumber,
×
UNCOV
1701
        )
×
UNCOV
1702
        return kvdb.Update(d, func(tx kvdb.RwTx) error {
×
UNCOV
1703
                for i, migration := range migrations {
×
UNCOV
1704
                        if migration == nil {
×
1705
                                continue
×
1706
                        }
1707

UNCOV
1708
                        log.Infof("Applying migration #%v",
×
UNCOV
1709
                                migrationVersions[i])
×
UNCOV
1710

×
UNCOV
1711
                        if err := migration(tx); err != nil {
×
UNCOV
1712
                                log.Infof("Unable to apply migration #%v",
×
UNCOV
1713
                                        migrationVersions[i])
×
UNCOV
1714
                                return err
×
UNCOV
1715
                        }
×
1716
                }
1717

UNCOV
1718
                meta.DbVersionNumber = latestVersion
×
UNCOV
1719
                err := putMeta(meta, tx)
×
UNCOV
1720
                if err != nil {
×
1721
                        return err
×
1722
                }
×
1723

1724
                // In dry-run mode, return an error to prevent the transaction
1725
                // from committing.
UNCOV
1726
                if d.dryRun {
×
UNCOV
1727
                        return ErrDryRunMigrationOK
×
UNCOV
1728
                }
×
1729

UNCOV
1730
                return nil
×
UNCOV
1731
        }, func() {})
×
1732
}
1733

1734
// applyOptionalVersions takes a config to determine whether the optional
1735
// migrations will be applied.
1736
//
1737
// NOTE: only support the prune_revocation_log optional migration atm.
1738
func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error {
3✔
1739
        // TODO(yy): need to design the db to support dry run for optional
3✔
1740
        // migrations.
3✔
1741
        if d.dryRun {
3✔
UNCOV
1742
                log.Info("Skipped optional migrations as dry run mode is not " +
×
UNCOV
1743
                        "supported yet")
×
UNCOV
1744
                return nil
×
UNCOV
1745
        }
×
1746

1747
        om, err := d.fetchOptionalMeta()
3✔
1748
        if err != nil {
3✔
1749
                if err == ErrMetaNotFound {
×
1750
                        om = &OptionalMeta{
×
1751
                                Versions: make(map[uint64]string),
×
1752
                        }
×
1753
                } else {
×
1754
                        return err
×
1755
                }
×
1756
        }
1757

1758
        log.Infof("Checking for optional update: prune_revocation_log=%v, "+
3✔
1759
                "db_version=%s", cfg.PruneRevocationLog, om)
3✔
1760

3✔
1761
        // Exit early if the optional migration is not specified.
3✔
1762
        if !cfg.PruneRevocationLog {
6✔
1763
                return nil
3✔
1764
        }
3✔
1765

1766
        // Exit early if the optional migration has already been applied.
UNCOV
1767
        if _, ok := om.Versions[0]; ok {
×
UNCOV
1768
                return nil
×
UNCOV
1769
        }
×
1770

1771
        // Get the optional version.
UNCOV
1772
        version := optionalVersions[0]
×
UNCOV
1773
        log.Infof("Performing database optional migration: %s", version.name)
×
UNCOV
1774

×
UNCOV
1775
        migrationCfg := &MigrationConfigImpl{
×
UNCOV
1776
                migration30.MigrateRevLogConfigImpl{
×
UNCOV
1777
                        NoAmountData: d.noRevLogAmtData,
×
UNCOV
1778
                },
×
UNCOV
1779
        }
×
UNCOV
1780

×
UNCOV
1781
        // Migrate the data.
×
UNCOV
1782
        if err := version.migration(d, migrationCfg); err != nil {
×
1783
                log.Errorf("Unable to apply optional migration: %s, error: %v",
×
1784
                        version.name, err)
×
1785
                return err
×
1786
        }
×
1787

1788
        // Update the optional meta. Notice that unlike the mandatory db
1789
        // migrations where we perform the migration and updating meta in a
1790
        // single db transaction, we use different transactions here. Even when
1791
        // the following update is failed, we should be fine here as we would
1792
        // re-run the optional migration again, which is a noop, during next
1793
        // startup.
UNCOV
1794
        om.Versions[0] = version.name
×
UNCOV
1795
        if err := d.putOptionalMeta(om); err != nil {
×
1796
                log.Errorf("Unable to update optional meta: %v", err)
×
1797
                return err
×
1798
        }
×
1799

UNCOV
1800
        return nil
×
1801
}
1802

1803
// ChannelStateDB returns the sub database that is concerned with the channel
1804
// state.
1805
func (d *DB) ChannelStateDB() *ChannelStateDB {
3✔
1806
        return d.channelStateDB
3✔
1807
}
3✔
1808

1809
// LatestDBVersion returns the number of the latest database version currently
1810
// known to the channel DB.
UNCOV
1811
func LatestDBVersion() uint32 {
×
UNCOV
1812
        return getLatestDBVersion(dbVersions)
×
UNCOV
1813
}
×
1814

1815
func getLatestDBVersion(versions []mandatoryVersion) uint32 {
3✔
1816
        return versions[len(versions)-1].number
3✔
1817
}
3✔
1818

1819
// getMigrationsToApply retrieves the migration function that should be
1820
// applied to the database.
1821
func getMigrationsToApply(versions []mandatoryVersion,
UNCOV
1822
        version uint32) ([]migration, []uint32) {
×
UNCOV
1823

×
UNCOV
1824
        migrations := make([]migration, 0, len(versions))
×
UNCOV
1825
        migrationVersions := make([]uint32, 0, len(versions))
×
UNCOV
1826

×
UNCOV
1827
        for _, v := range versions {
×
UNCOV
1828
                if v.number > version {
×
UNCOV
1829
                        migrations = append(migrations, v.migration)
×
UNCOV
1830
                        migrationVersions = append(migrationVersions, v.number)
×
UNCOV
1831
                }
×
1832
        }
1833

UNCOV
1834
        return migrations, migrationVersions
×
1835
}
1836

1837
// fetchHistoricalChanBucket returns a the channel bucket for a given outpoint
1838
// from the historical channel bucket. If the bucket does not exist,
1839
// ErrNoHistoricalBucket is returned.
1840
func fetchHistoricalChanBucket(tx kvdb.RTx,
1841
        outPoint *wire.OutPoint) (kvdb.RBucket, error) {
3✔
1842

3✔
1843
        // First fetch the top level bucket which stores all data related to
3✔
1844
        // historically stored channels.
3✔
1845
        historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
3✔
1846
        if historicalChanBucket == nil {
3✔
1847
                return nil, ErrNoHistoricalBucket
×
1848
        }
×
1849

1850
        // With the bucket for the node and chain fetched, we can now go down
1851
        // another level, for the channel itself.
1852
        var chanPointBuf bytes.Buffer
3✔
1853
        if err := graphdb.WriteOutpoint(&chanPointBuf, outPoint); err != nil {
3✔
1854
                return nil, err
×
1855
        }
×
1856
        chanBucket := historicalChanBucket.NestedReadBucket(
3✔
1857
                chanPointBuf.Bytes(),
3✔
1858
        )
3✔
1859
        if chanBucket == nil {
3✔
UNCOV
1860
                return nil, ErrChannelNotFound
×
UNCOV
1861
        }
×
1862

1863
        return chanBucket, nil
3✔
1864
}
1865

1866
// FetchHistoricalChannel fetches open channel data from the historical channel
1867
// bucket.
1868
func (c *ChannelStateDB) FetchHistoricalChannel(outPoint *wire.OutPoint) (
1869
        *OpenChannel, error) {
3✔
1870

3✔
1871
        var channel *OpenChannel
3✔
1872
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1873
                chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
3✔
1874
                if err != nil {
3✔
UNCOV
1875
                        return err
×
UNCOV
1876
                }
×
1877

1878
                channel, err = fetchOpenChannel(chanBucket, outPoint)
3✔
1879
                if err != nil {
3✔
1880
                        return err
×
1881
                }
×
1882

1883
                channel.Db = c
3✔
1884
                return nil
3✔
1885
        }, func() {
3✔
1886
                channel = nil
3✔
1887
        })
3✔
1888
        if err != nil {
3✔
UNCOV
1889
                return nil, err
×
UNCOV
1890
        }
×
1891

1892
        return channel, nil
3✔
1893
}
1894

1895
func fetchFinalHtlcsBucket(tx kvdb.RTx,
1896
        chanID lnwire.ShortChannelID) (kvdb.RBucket, error) {
3✔
1897

3✔
1898
        finalHtlcsBucket := tx.ReadBucket(finalHtlcsBucket)
3✔
1899
        if finalHtlcsBucket == nil {
3✔
UNCOV
1900
                return nil, ErrFinalHtlcsBucketNotFound
×
UNCOV
1901
        }
×
1902

1903
        var chanIDBytes [8]byte
3✔
1904
        byteOrder.PutUint64(chanIDBytes[:], chanID.ToUint64())
3✔
1905

3✔
1906
        chanBucket := finalHtlcsBucket.NestedReadBucket(chanIDBytes[:])
3✔
1907
        if chanBucket == nil {
3✔
1908
                return nil, ErrFinalChannelBucketNotFound
×
1909
        }
×
1910

1911
        return chanBucket, nil
3✔
1912
}
1913

1914
// ErrHtlcUnknown is returned by LookupFinalHtlc when no final resolution is
// stored for the requested htlc.
var ErrHtlcUnknown = errors.New("htlc unknown")
1915

1916
// LookupFinalHtlc retrieves a final htlc resolution from the database. If the
1917
// htlc has no final resolution yet, ErrHtlcUnknown is returned.
1918
func (c *ChannelStateDB) LookupFinalHtlc(chanID lnwire.ShortChannelID,
1919
        htlcIndex uint64) (*FinalHtlcInfo, error) {
3✔
1920

3✔
1921
        var idBytes [8]byte
3✔
1922
        byteOrder.PutUint64(idBytes[:], htlcIndex)
3✔
1923

3✔
1924
        var settledByte byte
3✔
1925

3✔
1926
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1927
                finalHtlcsBucket, err := fetchFinalHtlcsBucket(
3✔
1928
                        tx, chanID,
3✔
1929
                )
3✔
1930
                switch {
3✔
UNCOV
1931
                case errors.Is(err, ErrFinalHtlcsBucketNotFound):
×
UNCOV
1932
                        fallthrough
×
1933

UNCOV
1934
                case errors.Is(err, ErrFinalChannelBucketNotFound):
×
UNCOV
1935
                        return ErrHtlcUnknown
×
1936

1937
                case err != nil:
×
1938
                        return fmt.Errorf("cannot fetch final htlcs bucket: %w",
×
1939
                                err)
×
1940
                }
1941

1942
                value := finalHtlcsBucket.Get(idBytes[:])
3✔
1943
                if value == nil {
3✔
UNCOV
1944
                        return ErrHtlcUnknown
×
UNCOV
1945
                }
×
1946

1947
                if len(value) != 1 {
3✔
1948
                        return errors.New("unexpected final htlc value length")
×
1949
                }
×
1950

1951
                settledByte = value[0]
3✔
1952

3✔
1953
                return nil
3✔
1954
        }, func() {
3✔
1955
                settledByte = 0
3✔
1956
        })
3✔
1957
        if err != nil {
3✔
UNCOV
1958
                return nil, err
×
UNCOV
1959
        }
×
1960

1961
        info := FinalHtlcInfo{
3✔
1962
                Settled:  settledByte&byte(FinalHtlcSettledBit) != 0,
3✔
1963
                Offchain: settledByte&byte(FinalHtlcOffchainBit) != 0,
3✔
1964
        }
3✔
1965

3✔
1966
        return &info, nil
3✔
1967
}
1968

1969
// PutOnchainFinalHtlcOutcome stores the final on-chain outcome of an htlc in
1970
// the database.
1971
func (c *ChannelStateDB) PutOnchainFinalHtlcOutcome(
1972
        chanID lnwire.ShortChannelID, htlcID uint64, settled bool) error {
3✔
1973

3✔
1974
        // Skip if the user did not opt in to storing final resolutions.
3✔
1975
        if !c.parent.storeFinalHtlcResolutions {
6✔
1976
                return nil
3✔
1977
        }
3✔
1978

UNCOV
1979
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
×
UNCOV
1980
                finalHtlcsBucket, err := fetchFinalHtlcsBucketRw(tx, chanID)
×
UNCOV
1981
                if err != nil {
×
1982
                        return err
×
1983
                }
×
1984

UNCOV
1985
                return putFinalHtlc(
×
UNCOV
1986
                        finalHtlcsBucket, htlcID,
×
UNCOV
1987
                        FinalHtlcInfo{
×
UNCOV
1988
                                Settled:  settled,
×
UNCOV
1989
                                Offchain: false,
×
UNCOV
1990
                        },
×
UNCOV
1991
                )
×
UNCOV
1992
        }, func() {})
×
1993
}
1994

1995
// MakeTestInvoiceDB is used to create a test invoice database for testing
1996
// purposes. It simply calls into MakeTestDB so the same modifiers can be used.
1997
func MakeTestInvoiceDB(t *testing.T, modifiers ...OptionModifier) (
UNCOV
1998
        invoices.InvoiceDB, error) {
×
UNCOV
1999

×
UNCOV
2000
        return MakeTestDB(t, modifiers...)
×
UNCOV
2001
}
×
2002

2003
// MakeTestDB creates a new instance of the ChannelDB for testing purposes.
2004
// A callback which cleans up the created temporary directories is also
2005
// returned and intended to be executed after the test completes.
UNCOV
2006
func MakeTestDB(t *testing.T, modifiers ...OptionModifier) (*DB, error) {
×
UNCOV
2007
        // First, create a temporary directory to be used for the duration of
×
UNCOV
2008
        // this test.
×
UNCOV
2009
        tempDirName := t.TempDir()
×
UNCOV
2010

×
UNCOV
2011
        // Next, create channeldb for the first time.
×
UNCOV
2012
        backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cdb")
×
UNCOV
2013
        if err != nil {
×
2014
                backendCleanup()
×
2015
                return nil, err
×
2016
        }
×
2017

UNCOV
2018
        cdb, err := CreateWithBackend(backend, modifiers...)
×
UNCOV
2019
        if err != nil {
×
2020
                backendCleanup()
×
2021
                return nil, err
×
2022
        }
×
2023

UNCOV
2024
        t.Cleanup(func() {
×
UNCOV
2025
                cdb.Close()
×
UNCOV
2026
                backendCleanup()
×
UNCOV
2027
        })
×
2028

UNCOV
2029
        return cdb, nil
×
2030
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc