• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13315153898

13 Feb 2025 07:04PM UTC coverage: 58.791% (+9.4%) from 49.357%
13315153898

Pull #9458

github

Crypt-iQ
release-notes: update for 0.19.0
Pull Request #9458: multi+server.go: add initial permissions for some peers

332 of 551 new or added lines in 9 files covered. (60.25%)

25 existing lines in 8 files now uncovered.

136364 of 231946 relevant lines covered (58.79%)

19200.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.85
/channeldb/db.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "encoding/hex"
7
        "fmt"
8
        "net"
9
        "os"
10
        "testing"
11

12
        "github.com/btcsuite/btcd/btcec/v2"
13
        "github.com/btcsuite/btcd/wire"
14
        "github.com/btcsuite/btcwallet/walletdb"
15
        "github.com/go-errors/errors"
16
        mig "github.com/lightningnetwork/lnd/channeldb/migration"
17
        "github.com/lightningnetwork/lnd/channeldb/migration12"
18
        "github.com/lightningnetwork/lnd/channeldb/migration13"
19
        "github.com/lightningnetwork/lnd/channeldb/migration16"
20
        "github.com/lightningnetwork/lnd/channeldb/migration20"
21
        "github.com/lightningnetwork/lnd/channeldb/migration21"
22
        "github.com/lightningnetwork/lnd/channeldb/migration23"
23
        "github.com/lightningnetwork/lnd/channeldb/migration24"
24
        "github.com/lightningnetwork/lnd/channeldb/migration25"
25
        "github.com/lightningnetwork/lnd/channeldb/migration26"
26
        "github.com/lightningnetwork/lnd/channeldb/migration27"
27
        "github.com/lightningnetwork/lnd/channeldb/migration29"
28
        "github.com/lightningnetwork/lnd/channeldb/migration30"
29
        "github.com/lightningnetwork/lnd/channeldb/migration31"
30
        "github.com/lightningnetwork/lnd/channeldb/migration32"
31
        "github.com/lightningnetwork/lnd/channeldb/migration33"
32
        "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
33
        "github.com/lightningnetwork/lnd/clock"
34
        graphdb "github.com/lightningnetwork/lnd/graph/db"
35
        "github.com/lightningnetwork/lnd/invoices"
36
        "github.com/lightningnetwork/lnd/kvdb"
37
        "github.com/lightningnetwork/lnd/lnwire"
38
        "github.com/stretchr/testify/require"
39
)
40

41
const (
42
        dbName = "channel.db"
43
)
44

45
var (
46
        // ErrDryRunMigrationOK signals that a migration executed successful,
47
        // but we intentionally did not commit the result.
48
        ErrDryRunMigrationOK = errors.New("dry run migration successful")
49

50
        // ErrFinalHtlcsBucketNotFound signals that the top-level final htlcs
51
        // bucket does not exist.
52
        ErrFinalHtlcsBucketNotFound = errors.New("final htlcs bucket not " +
53
                "found")
54

55
        // ErrFinalChannelBucketNotFound signals that the channel bucket for
56
        // final htlc outcomes does not exist.
57
        ErrFinalChannelBucketNotFound = errors.New("final htlcs channel " +
58
                "bucket not found")
59
)
60

61
// migration is a function which takes a prior outdated version of the database
62
// instances and mutates the key/bucket structure to arrive at a more
63
// up-to-date version of the database.
64
type migration func(tx kvdb.RwTx) error
65

66
// mandatoryVersion defines a db version that must be applied before the lnd
67
// starts.
68
type mandatoryVersion struct {
69
        number    uint32
70
        migration migration
71
}
72

73
// MigrationConfig is an interface combines the config interfaces of all
74
// optional migrations.
75
type MigrationConfig interface {
76
        migration30.MigrateRevLogConfig
77
}
78

79
// MigrationConfigImpl is a super set of all the various migration configs and
80
// an implementation of MigrationConfig.
81
type MigrationConfigImpl struct {
82
        migration30.MigrateRevLogConfigImpl
83
}
84

85
// optionalMigration defines an optional migration function. When a migration
86
// is optional, it usually involves a large scale of changes that might touch
87
// millions of keys. Due to OOM concern, the update cannot be safely done
88
// within one db transaction. Thus, for optional migrations, they must take the
89
// db backend and construct transactions as needed.
90
type optionalMigration func(db kvdb.Backend, cfg MigrationConfig) error
91

92
// optionalVersion defines a db version that can be optionally applied. When
93
// applying migrations, we must apply all the mandatory migrations first before
94
// attempting optional ones.
95
type optionalVersion struct {
96
        name      string
97
        migration optionalMigration
98
}
99

100
var (
101
        // dbVersions is storing all mandatory versions of database. If current
102
        // version of database don't match with latest version this list will
103
        // be used for retrieving all migration function that are need to apply
104
        // to the current db.
105
        dbVersions = []mandatoryVersion{
106
                {
107
                        // The base DB version requires no migration.
108
                        number:    0,
109
                        migration: nil,
110
                },
111
                {
112
                        // The version of the database where two new indexes
113
                        // for the update time of node and channel updates were
114
                        // added.
115
                        number:    1,
116
                        migration: migration_01_to_11.MigrateNodeAndEdgeUpdateIndex,
117
                },
118
                {
119
                        // The DB version that added the invoice event time
120
                        // series.
121
                        number:    2,
122
                        migration: migration_01_to_11.MigrateInvoiceTimeSeries,
123
                },
124
                {
125
                        // The DB version that updated the embedded invoice in
126
                        // outgoing payments to match the new format.
127
                        number:    3,
128
                        migration: migration_01_to_11.MigrateInvoiceTimeSeriesOutgoingPayments,
129
                },
130
                {
131
                        // The version of the database where every channel
132
                        // always has two entries in the edges bucket. If
133
                        // a policy is unknown, this will be represented
134
                        // by a special byte sequence.
135
                        number:    4,
136
                        migration: migration_01_to_11.MigrateEdgePolicies,
137
                },
138
                {
139
                        // The DB version where we persist each attempt to send
140
                        // an HTLC to a payment hash, and track whether the
141
                        // payment is in-flight, succeeded, or failed.
142
                        number:    5,
143
                        migration: migration_01_to_11.PaymentStatusesMigration,
144
                },
145
                {
146
                        // The DB version that properly prunes stale entries
147
                        // from the edge update index.
148
                        number:    6,
149
                        migration: migration_01_to_11.MigratePruneEdgeUpdateIndex,
150
                },
151
                {
152
                        // The DB version that migrates the ChannelCloseSummary
153
                        // to a format where optional fields are indicated with
154
                        // boolean flags.
155
                        number:    7,
156
                        migration: migration_01_to_11.MigrateOptionalChannelCloseSummaryFields,
157
                },
158
                {
159
                        // The DB version that changes the gossiper's message
160
                        // store keys to account for the message's type and
161
                        // ShortChannelID.
162
                        number:    8,
163
                        migration: migration_01_to_11.MigrateGossipMessageStoreKeys,
164
                },
165
                {
166
                        // The DB version where the payments and payment
167
                        // statuses are moved to being stored in a combined
168
                        // bucket.
169
                        number:    9,
170
                        migration: migration_01_to_11.MigrateOutgoingPayments,
171
                },
172
                {
173
                        // The DB version where we started to store legacy
174
                        // payload information for all routes, as well as the
175
                        // optional TLV records.
176
                        number:    10,
177
                        migration: migration_01_to_11.MigrateRouteSerialization,
178
                },
179
                {
180
                        // Add invoice htlc and cltv delta fields.
181
                        number:    11,
182
                        migration: migration_01_to_11.MigrateInvoices,
183
                },
184
                {
185
                        // Migrate to TLV invoice bodies, add payment address
186
                        // and features, remove receipt.
187
                        number:    12,
188
                        migration: migration12.MigrateInvoiceTLV,
189
                },
190
                {
191
                        // Migrate to multi-path payments.
192
                        number:    13,
193
                        migration: migration13.MigrateMPP,
194
                },
195
                {
196
                        // Initialize payment address index and begin using it
197
                        // as the default index, falling back to payment hash
198
                        // index.
199
                        number:    14,
200
                        migration: mig.CreateTLB(payAddrIndexBucket),
201
                },
202
                {
203
                        // Initialize payment index bucket which will be used
204
                        // to index payments by sequence number. This index will
205
                        // be used to allow more efficient ListPayments queries.
206
                        number:    15,
207
                        migration: mig.CreateTLB(paymentsIndexBucket),
208
                },
209
                {
210
                        // Add our existing payments to the index bucket created
211
                        // in migration 15.
212
                        number:    16,
213
                        migration: migration16.MigrateSequenceIndex,
214
                },
215
                {
216
                        // Create a top level bucket which will store extra
217
                        // information about channel closes.
218
                        number:    17,
219
                        migration: mig.CreateTLB(closeSummaryBucket),
220
                },
221
                {
222
                        // Create a top level bucket which holds information
223
                        // about our peers.
224
                        number:    18,
225
                        migration: mig.CreateTLB(peersBucket),
226
                },
227
                {
228
                        // Create a top level bucket which holds outpoint
229
                        // information.
230
                        number:    19,
231
                        migration: mig.CreateTLB(outpointBucket),
232
                },
233
                {
234
                        // Migrate some data to the outpoint index.
235
                        number:    20,
236
                        migration: migration20.MigrateOutpointIndex,
237
                },
238
                {
239
                        // Migrate to length prefixed wire messages everywhere
240
                        // in the database.
241
                        number:    21,
242
                        migration: migration21.MigrateDatabaseWireMessages,
243
                },
244
                {
245
                        // Initialize set id index so that invoices can be
246
                        // queried by individual htlc sets.
247
                        number:    22,
248
                        migration: mig.CreateTLB(setIDIndexBucket),
249
                },
250
                {
251
                        number:    23,
252
                        migration: migration23.MigrateHtlcAttempts,
253
                },
254
                {
255
                        // Remove old forwarding packages of closed channels.
256
                        number:    24,
257
                        migration: migration24.MigrateFwdPkgCleanup,
258
                },
259
                {
260
                        // Save the initial local/remote balances in channel
261
                        // info.
262
                        number:    25,
263
                        migration: migration25.MigrateInitialBalances,
264
                },
265
                {
266
                        // Migrate the initial local/remote balance fields into
267
                        // tlv records.
268
                        number:    26,
269
                        migration: migration26.MigrateBalancesToTlvRecords,
270
                },
271
                {
272
                        // Patch the initial local/remote balance fields with
273
                        // empty values for historical channels.
274
                        number:    27,
275
                        migration: migration27.MigrateHistoricalBalances,
276
                },
277
                {
278
                        number:    28,
279
                        migration: mig.CreateTLB(chanIDBucket),
280
                },
281
                {
282
                        number:    29,
283
                        migration: migration29.MigrateChanID,
284
                },
285
                {
286
                        // Removes the "sweeper-last-tx" bucket. Although we
287
                        // do not have a mandatory version 30 we skip this
288
                        // version because its naming is already used for the
289
                        // first optional migration.
290
                        number:    31,
291
                        migration: migration31.DeleteLastPublishedTxTLB,
292
                },
293
                {
294
                        number:    32,
295
                        migration: migration32.MigrateMCRouteSerialisation,
296
                },
297
                {
298
                        number:    33,
299
                        migration: migration33.MigrateMCStoreNameSpacedResults,
300
                },
301
        }
302

303
        // optionalVersions stores all optional migrations that are applied
304
        // after dbVersions.
305
        //
306
        // NOTE: optional migrations must be fault-tolerant and re-run already
307
        // migrated data must be noop, which means the migration must be able
308
        // to determine its state.
309
        optionalVersions = []optionalVersion{
310
                {
311
                        name: "prune revocation log",
312
                        migration: func(db kvdb.Backend,
313
                                cfg MigrationConfig) error {
×
314

×
315
                                return migration30.MigrateRevocationLog(db, cfg)
×
316
                        },
×
317
                },
318
        }
319

320
        // Big endian is the preferred byte order, due to cursor scans over
321
        // integer keys iterating in order.
322
        byteOrder = binary.BigEndian
323

324
        // channelOpeningStateBucket is the database bucket used to store the
325
        // channelOpeningState for each channel that is currently in the process
326
        // of being opened.
327
        channelOpeningStateBucket = []byte("channelOpeningState")
328
)
329

330
// DB is the primary datastore for the lnd daemon. The database stores
331
// information related to nodes, routing data, open/closed channels, fee
332
// schedules, and reputation data.
333
type DB struct {
334
        kvdb.Backend
335

336
        // channelStateDB separates all DB operations on channel state.
337
        channelStateDB *ChannelStateDB
338

339
        dbPath                    string
340
        clock                     clock.Clock
341
        dryRun                    bool
342
        keepFailedPaymentAttempts bool
343
        storeFinalHtlcResolutions bool
344

345
        // noRevLogAmtData if true, means that commitment transaction amount
346
        // data should not be stored in the revocation log.
347
        noRevLogAmtData bool
348
}
349

350
// OpenForTesting opens or creates a channeldb to be used for tests. Any
351
// necessary schemas migrations due to updates will take place as necessary.
352
func OpenForTesting(t testing.TB, dbPath string,
353
        modifiers ...OptionModifier) *DB {
1,453✔
354

1,453✔
355
        backend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
1,453✔
356
                DBPath:            dbPath,
1,453✔
357
                DBFileName:        dbName,
1,453✔
358
                NoFreelistSync:    true,
1,453✔
359
                AutoCompact:       false,
1,453✔
360
                AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
1,453✔
361
                DBTimeout:         kvdb.DefaultDBTimeout,
1,453✔
362
        })
1,453✔
363
        require.NoError(t, err)
1,453✔
364

1,453✔
365
        db, err := CreateWithBackend(backend, modifiers...)
1,453✔
366
        require.NoError(t, err)
1,453✔
367

1,453✔
368
        db.dbPath = dbPath
1,453✔
369

1,453✔
370
        t.Cleanup(func() {
2,906✔
371
                require.NoError(t, db.Close())
1,453✔
372
        })
1,453✔
373

374
        return db
1,453✔
375
}
376

377
// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
378
// Any necessary schemas migrations due to updates will take place as necessary.
379
func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
380
        error) {
1,749✔
381

1,749✔
382
        opts := DefaultOptions()
1,749✔
383
        for _, modifier := range modifiers {
1,929✔
384
                modifier(&opts)
180✔
385
        }
180✔
386

387
        if !opts.NoMigration {
3,498✔
388
                if err := initChannelDB(backend); err != nil {
1,750✔
389
                        return nil, err
1✔
390
                }
1✔
391
        }
392

393
        chanDB := &DB{
1,748✔
394
                Backend: backend,
1,748✔
395
                channelStateDB: &ChannelStateDB{
1,748✔
396
                        linkNodeDB: &LinkNodeDB{
1,748✔
397
                                backend: backend,
1,748✔
398
                        },
1,748✔
399
                        backend: backend,
1,748✔
400
                },
1,748✔
401
                clock:                     opts.clock,
1,748✔
402
                dryRun:                    opts.dryRun,
1,748✔
403
                keepFailedPaymentAttempts: opts.keepFailedPaymentAttempts,
1,748✔
404
                storeFinalHtlcResolutions: opts.storeFinalHtlcResolutions,
1,748✔
405
                noRevLogAmtData:           opts.NoRevLogAmtData,
1,748✔
406
        }
1,748✔
407

1,748✔
408
        // Set the parent pointer (only used in tests).
1,748✔
409
        chanDB.channelStateDB.parent = chanDB
1,748✔
410

1,748✔
411
        // Synchronize the version of database and apply migrations if needed.
1,748✔
412
        if !opts.NoMigration {
3,496✔
413
                if err := chanDB.syncVersions(dbVersions); err != nil {
1,749✔
414
                        backend.Close()
1✔
415
                        return nil, err
1✔
416
                }
1✔
417

418
                // Grab the optional migration config.
419
                omc := opts.OptionalMiragtionConfig
1,747✔
420
                if err := chanDB.applyOptionalVersions(omc); err != nil {
1,747✔
421
                        backend.Close()
×
422
                        return nil, err
×
423
                }
×
424
        }
425

426
        return chanDB, nil
1,747✔
427
}
428

429
// Path returns the file path to the channel database.
430
func (d *DB) Path() string {
197✔
431
        return d.dbPath
197✔
432
}
197✔
433

434
var dbTopLevelBuckets = [][]byte{
435
        openChannelBucket,
436
        closedChannelBucket,
437
        forwardingLogBucket,
438
        fwdPackagesKey,
439
        invoiceBucket,
440
        payAddrIndexBucket,
441
        setIDIndexBucket,
442
        paymentsIndexBucket,
443
        peersBucket,
444
        nodeInfoBucket,
445
        metaBucket,
446
        closeSummaryBucket,
447
        outpointBucket,
448
        chanIDBucket,
449
        historicalChannelBucket,
450
}
451

452
// Wipe completely deletes all saved state within all used buckets within the
453
// database. The deletion is done in a single transaction, therefore this
454
// operation is fully atomic.
455
func (d *DB) Wipe() error {
177✔
456
        err := kvdb.Update(d, func(tx kvdb.RwTx) error {
354✔
457
                for _, tlb := range dbTopLevelBuckets {
2,832✔
458
                        err := tx.DeleteTopLevelBucket(tlb)
2,655✔
459
                        if err != nil && err != kvdb.ErrBucketNotFound {
2,655✔
460
                                return err
×
461
                        }
×
462
                }
463
                return nil
177✔
464
        }, func() {})
177✔
465
        if err != nil {
177✔
466
                return err
×
467
        }
×
468

469
        return initChannelDB(d.Backend)
177✔
470
}
471

472
// initChannelDB creates and initializes a fresh version of channeldb. In the
473
// case that the target path has not yet been created or doesn't yet exist, then
474
// the path is created. Additionally, all required top-level buckets used within
475
// the database are created.
476
func initChannelDB(db kvdb.Backend) error {
1,926✔
477
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
3,852✔
478
                // Check if DB was marked as inactive with a tomb stone.
1,926✔
479
                if err := EnsureNoTombstone(tx); err != nil {
1,927✔
480
                        return err
1✔
481
                }
1✔
482

483
                meta := &Meta{}
1,925✔
484
                // Check if DB is already initialized.
1,925✔
485
                err := FetchMeta(meta, tx)
1,925✔
486
                if err == nil {
2,135✔
487
                        return nil
210✔
488
                }
210✔
489

490
                for _, tlb := range dbTopLevelBuckets {
27,446✔
491
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
25,728✔
492
                                return err
×
493
                        }
×
494
                }
495

496
                meta.DbVersionNumber = getLatestDBVersion(dbVersions)
1,718✔
497
                return putMeta(meta, tx)
1,718✔
498
        }, func() {})
1,926✔
499
        if err != nil {
1,927✔
500
                return fmt.Errorf("unable to create new channeldb: %w", err)
1✔
501
        }
1✔
502

503
        return nil
1,925✔
504
}
505

506
// fileExists returns true if the file exists, and false otherwise.
507
func fileExists(path string) bool {
1✔
508
        if _, err := os.Stat(path); err != nil {
1✔
509
                if os.IsNotExist(err) {
×
510
                        return false
×
511
                }
×
512
        }
513

514
        return true
1✔
515
}
516

517
// ChannelStateDB is a database that keeps track of all channel state.
518
type ChannelStateDB struct {
519
        // linkNodeDB separates all DB operations on LinkNodes.
520
        linkNodeDB *LinkNodeDB
521

522
        // parent holds a pointer to the "main" channeldb.DB object. This is
523
        // only used for testing and should never be used in production code.
524
        // For testing use the ChannelStateDB.GetParentDB() function to retrieve
525
        // this pointer.
526
        parent *DB
527

528
        // backend points to the actual backend holding the channel state
529
        // database. This may be a real backend or a cache middleware.
530
        backend kvdb.Backend
531
}
532

533
// GetParentDB returns the "main" channeldb.DB object that is the owner of this
534
// ChannelStateDB instance. Use this function only in tests where passing around
535
// pointers makes testing less readable. Never to be used in production code!
536
func (c *ChannelStateDB) GetParentDB() *DB {
517✔
537
        return c.parent
517✔
538
}
517✔
539

540
// LinkNodeDB returns the current instance of the link node database.
541
func (c *ChannelStateDB) LinkNodeDB() *LinkNodeDB {
3✔
542
        return c.linkNodeDB
3✔
543
}
3✔
544

545
// FetchOpenChannels starts a new database transaction and returns all stored
546
// currently active/open channels associated with the target nodeID. In the case
547
// that no active channels are known to have been created with this node, then a
548
// zero-length slice is returned.
549
func (c *ChannelStateDB) FetchOpenChannels(nodeID *btcec.PublicKey) (
550
        []*OpenChannel, error) {
256✔
551

256✔
552
        var channels []*OpenChannel
256✔
553
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
512✔
554
                var err error
256✔
555
                channels, err = c.fetchOpenChannels(tx, nodeID)
256✔
556
                return err
256✔
557
        }, func() {
512✔
558
                channels = nil
256✔
559
        })
256✔
560

561
        return channels, err
256✔
562
}
563

564
// fetchOpenChannels uses and existing database transaction and returns all
565
// stored currently active/open channels associated with the target nodeID. In
566
// the case that no active channels are known to have been created with this
567
// node, then a zero-length slice is returned.
568
func (c *ChannelStateDB) fetchOpenChannels(tx kvdb.RTx,
569
        nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
263✔
570

263✔
571
        // Get the bucket dedicated to storing the metadata for open channels.
263✔
572
        openChanBucket := tx.ReadBucket(openChannelBucket)
263✔
573
        if openChanBucket == nil {
263✔
574
                return nil, nil
×
575
        }
×
576

577
        // Within this top level bucket, fetch the bucket dedicated to storing
578
        // open channel data specific to the remote node.
579
        pub := nodeID.SerializeCompressed()
263✔
580
        nodeChanBucket := openChanBucket.NestedReadBucket(pub)
263✔
581
        if nodeChanBucket == nil {
317✔
582
                return nil, nil
54✔
583
        }
54✔
584

585
        // Next, we'll need to go down an additional layer in order to retrieve
586
        // the channels for each chain the node knows of.
587
        var channels []*OpenChannel
212✔
588
        err := nodeChanBucket.ForEach(func(chainHash, v []byte) error {
424✔
589
                // If there's a value, it's not a bucket so ignore it.
212✔
590
                if v != nil {
212✔
591
                        return nil
×
592
                }
×
593

594
                // If we've found a valid chainhash bucket, then we'll retrieve
595
                // that so we can extract all the channels.
596
                chainBucket := nodeChanBucket.NestedReadBucket(chainHash)
212✔
597
                if chainBucket == nil {
212✔
598
                        return fmt.Errorf("unable to read bucket for chain=%x",
×
599
                                chainHash[:])
×
600
                }
×
601

602
                // Finally, we both of the necessary buckets retrieved, fetch
603
                // all the active channels related to this node.
604
                nodeChannels, err := c.fetchNodeChannels(chainBucket)
212✔
605
                if err != nil {
212✔
606
                        return fmt.Errorf("unable to read channel for "+
×
607
                                "chain_hash=%x, node_key=%x: %v",
×
608
                                chainHash[:], pub, err)
×
609
                }
×
610

611
                channels = append(channels, nodeChannels...)
212✔
612
                return nil
212✔
613
        })
614

615
        return channels, err
212✔
616
}
617

618
// fetchNodeChannels retrieves all active channels from the target chainBucket
619
// which is under a node's dedicated channel bucket. This function is typically
620
// used to fetch all the active channels related to a particular node.
621
func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) (
622
        []*OpenChannel, error) {
718✔
623

718✔
624
        var channels []*OpenChannel
718✔
625

718✔
626
        // A node may have channels on several chains, so for each known chain,
718✔
627
        // we'll extract all the channels.
718✔
628
        err := chainBucket.ForEach(func(chanPoint, v []byte) error {
1,497✔
629
                // If there's a value, it's not a bucket so ignore it.
779✔
630
                if v != nil {
779✔
631
                        return nil
×
632
                }
×
633

634
                // Once we've found a valid channel bucket, we'll extract it
635
                // from the node's chain bucket.
636
                chanBucket := chainBucket.NestedReadBucket(chanPoint)
779✔
637

779✔
638
                var outPoint wire.OutPoint
779✔
639
                err := graphdb.ReadOutpoint(
779✔
640
                        bytes.NewReader(chanPoint), &outPoint,
779✔
641
                )
779✔
642
                if err != nil {
779✔
643
                        return err
×
644
                }
×
645
                oChannel, err := fetchOpenChannel(chanBucket, &outPoint)
779✔
646
                if err != nil {
779✔
647
                        return fmt.Errorf("unable to read channel data for "+
×
648
                                "chan_point=%v: %w", outPoint, err)
×
649
                }
×
650
                oChannel.Db = c
779✔
651

779✔
652
                channels = append(channels, oChannel)
779✔
653

779✔
654
                return nil
779✔
655
        })
656
        if err != nil {
718✔
657
                return nil, err
×
658
        }
×
659

660
        return channels, nil
718✔
661
}
662

663
// FetchChannel attempts to locate a channel specified by the passed channel
664
// point. If the channel cannot be found, then an error will be returned.
665
func (c *ChannelStateDB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel,
666
        error) {
13✔
667

13✔
668
        var targetChanPoint bytes.Buffer
13✔
669
        err := graphdb.WriteOutpoint(&targetChanPoint, &chanPoint)
13✔
670
        if err != nil {
13✔
671
                return nil, err
×
672
        }
×
673

674
        targetChanPointBytes := targetChanPoint.Bytes()
13✔
675
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
13✔
676
                error) {
26✔
677

13✔
678
                return targetChanPointBytes, &chanPoint, nil
13✔
679
        }
13✔
680

681
        return c.channelScanner(nil, selector)
13✔
682
}
683

684
// FetchChannelByID attempts to locate a channel specified by the passed channel
685
// ID. If the channel cannot be found, then an error will be returned.
686
// Optionally an existing db tx can be supplied.
687
func (c *ChannelStateDB) FetchChannelByID(tx kvdb.RTx, id lnwire.ChannelID) (
688
        *OpenChannel, error) {
5✔
689

5✔
690
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
5✔
691
                error) {
10✔
692

5✔
693
                var (
5✔
694
                        targetChanPointBytes []byte
5✔
695
                        targetChanPoint      *wire.OutPoint
5✔
696

5✔
697
                        // errChanFound is used to signal that the channel has
5✔
698
                        // been found so that iteration through the DB buckets
5✔
699
                        // can stop.
5✔
700
                        errChanFound = errors.New("channel found")
5✔
701
                )
5✔
702
                err := chainBkt.ForEach(func(k, _ []byte) error {
10✔
703
                        var outPoint wire.OutPoint
5✔
704
                        err := graphdb.ReadOutpoint(
5✔
705
                                bytes.NewReader(k), &outPoint,
5✔
706
                        )
5✔
707
                        if err != nil {
5✔
708
                                return err
×
709
                        }
×
710

711
                        chanID := lnwire.NewChanIDFromOutPoint(outPoint)
5✔
712
                        if chanID != id {
6✔
713
                                return nil
1✔
714
                        }
1✔
715

716
                        targetChanPoint = &outPoint
4✔
717
                        targetChanPointBytes = k
4✔
718

4✔
719
                        return errChanFound
4✔
720
                })
721
                if err != nil && !errors.Is(err, errChanFound) {
5✔
722
                        return nil, nil, err
×
723
                }
×
724
                if targetChanPoint == nil {
6✔
725
                        return nil, nil, ErrChannelNotFound
1✔
726
                }
1✔
727

728
                return targetChanPointBytes, targetChanPoint, nil
4✔
729
        }
730

731
        return c.channelScanner(tx, selector)
5✔
732
}
733

734
// ChanCount is used by the server in determining access control.
735
type ChanCount struct {
736
        HasOpenOrClosedChan bool
737
        PendingOpenCount    uint64
738
}
739

740
// FetchPermAndTempPeers returns a map where the key is the remote node's
741
// public key and the value is a struct that has a tally of the pending-open
742
// channels and whether the peer has an open or closed channel with us.
743
func (c *ChannelStateDB) FetchPermAndTempPeers(
744
        chainHash []byte) (map[string]ChanCount, error) {
4✔
745

4✔
746
        peerCounts := make(map[string]ChanCount)
4✔
747

4✔
748
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
8✔
749
                openChanBucket := tx.ReadBucket(openChannelBucket)
4✔
750
                if openChanBucket == nil {
4✔
NEW
751
                        return ErrNoChanDBExists
×
NEW
752
                }
×
753

754
                openChanErr := openChanBucket.ForEach(func(nodePub,
4✔
755
                        v []byte) error {
10✔
756

6✔
757
                        // If there is a value, this is not a bucket.
6✔
758
                        if v != nil {
6✔
NEW
759
                                return nil
×
NEW
760
                        }
×
761

762
                        nodeChanBucket := openChanBucket.NestedReadBucket(
6✔
763
                                nodePub,
6✔
764
                        )
6✔
765
                        if nodeChanBucket == nil {
6✔
NEW
766
                                return nil
×
NEW
767
                        }
×
768

769
                        chainBucket := nodeChanBucket.NestedReadBucket(
6✔
770
                                chainHash,
6✔
771
                        )
6✔
772
                        if chainBucket == nil {
6✔
NEW
773
                                return fmt.Errorf("no chain bucket exists")
×
NEW
774
                        }
×
775

776
                        var isPermPeer bool
6✔
777
                        var pendingOpenCount uint64
6✔
778

6✔
779
                        internalErr := chainBucket.ForEach(func(chanPoint,
6✔
780
                                val []byte) error {
11✔
781

5✔
782
                                // If there is a value, this is not a bucket.
5✔
783
                                if val != nil {
5✔
NEW
784
                                        return nil
×
NEW
785
                                }
×
786

787
                                chanBucket := chainBucket.NestedReadBucket(
5✔
788
                                        chanPoint,
5✔
789
                                )
5✔
790
                                if chanBucket == nil {
5✔
NEW
791
                                        return nil
×
NEW
792
                                }
×
793

794
                                var op wire.OutPoint
5✔
795
                                readErr := graphdb.ReadOutpoint(
5✔
796
                                        bytes.NewReader(chanPoint), &op,
5✔
797
                                )
5✔
798
                                if readErr != nil {
5✔
NEW
799
                                        return readErr
×
NEW
800
                                }
×
801

802
                                // We need to go through each channel and look
803
                                // at the IsPending status.
804
                                openChan, err := fetchOpenChannel(
5✔
805
                                        chanBucket, &op,
5✔
806
                                )
5✔
807
                                if err != nil {
5✔
NEW
808
                                        return err
×
NEW
809
                                }
×
810

811
                                if openChan.IsPending {
9✔
812
                                        // Add to the pending-open count since
4✔
813
                                        // this is a temp peer.
4✔
814
                                        pendingOpenCount++
4✔
815
                                        return nil
4✔
816
                                }
4✔
817

818
                                // Since IsPending is false, this is a perm
819
                                // peer.
820
                                isPermPeer = true
4✔
821

4✔
822
                                return nil
4✔
823
                        })
824
                        if internalErr != nil {
6✔
NEW
825
                                return internalErr
×
NEW
826
                        }
×
827

828
                        nodeStr := hex.EncodeToString(nodePub)
6✔
829

6✔
830
                        peerCount := ChanCount{
6✔
831
                                HasOpenOrClosedChan: isPermPeer,
6✔
832
                                PendingOpenCount:    pendingOpenCount,
6✔
833
                        }
6✔
834
                        peerCounts[nodeStr] = peerCount
6✔
835

6✔
836
                        return nil
6✔
837
                })
838
                if openChanErr != nil {
4✔
NEW
839
                        return openChanErr
×
NEW
840
                }
×
841

842
                // Now check the closed channel bucket.
843
                historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
4✔
844
                if historicalChanBucket == nil {
4✔
NEW
845
                        return ErrNoHistoricalBucket
×
NEW
846
                }
×
847

848
                historicalErr := historicalChanBucket.ForEach(func(chanPoint,
4✔
849
                        v []byte) error {
8✔
850
                        // Parse each nested bucket and the chanInfoKey to get
4✔
851
                        // the IsPending bool. This determines whether the
4✔
852
                        // peer is protected or not.
4✔
853
                        if v != nil {
4✔
NEW
854
                                // This is not a bucket. This is currently not
×
NEW
855
                                // possible.
×
NEW
856
                                return nil
×
NEW
857
                        }
×
858

859
                        chanBucket := historicalChanBucket.NestedReadBucket(
4✔
860
                                chanPoint,
4✔
861
                        )
4✔
862
                        if chanBucket == nil {
4✔
NEW
863
                                // This is not possible.
×
NEW
864
                                return fmt.Errorf("no historical channel " +
×
NEW
865
                                        "bucket exists")
×
NEW
866
                        }
×
867

868
                        var op wire.OutPoint
4✔
869
                        readErr := graphdb.ReadOutpoint(
4✔
870
                                bytes.NewReader(chanPoint), &op,
4✔
871
                        )
4✔
872
                        if readErr != nil {
4✔
NEW
873
                                return readErr
×
NEW
874
                        }
×
875

876
                        channel, fetchErr := fetchOpenChannel(chanBucket, &op)
4✔
877
                        if fetchErr != nil {
4✔
NEW
878
                                return fetchErr
×
NEW
879
                        }
×
880

881
                        // Only include this peer in the protected class if
882
                        // the closing transaction confirmed. Note that
883
                        // CloseChannel can be called in the funding manager
884
                        // while IsPending is true which is why we need this
885
                        // special-casing to not count premature funding
886
                        // manager calls to CloseChannel.
887
                        if !channel.IsPending {
8✔
888
                                // Fetch the public key of the remote node.
4✔
889
                                remotePub := channel.IdentityPub
4✔
890
                                remotePubStr := hex.EncodeToString(
4✔
891
                                        remotePub.SerializeCompressed(),
4✔
892
                                )
4✔
893
                                count, exists := peerCounts[remotePubStr]
4✔
894
                                if exists {
8✔
895
                                        count.HasOpenOrClosedChan = true
4✔
896
                                        peerCounts[remotePubStr] = count
4✔
897
                                } else {
4✔
NEW
898
                                        peerCount := ChanCount{
×
NEW
899
                                                HasOpenOrClosedChan: true,
×
NEW
900
                                        }
×
NEW
901
                                        peerCounts[remotePubStr] = peerCount
×
NEW
902
                                }
×
903
                        }
904

905
                        return nil
4✔
906
                })
907
                if historicalErr != nil {
4✔
NEW
908
                        return historicalErr
×
NEW
909
                }
×
910

911
                return nil
4✔
912
        }, func() {
4✔
913
                clear(peerCounts)
4✔
914
        })
4✔
915

916
        return peerCounts, err
4✔
917
}
918

919
// channelSelector describes a function that takes a chain-hash bucket from
920
// within the open-channel DB and returns the wanted channel point bytes, and
921
// channel point. It must return the ErrChannelNotFound error if the wanted
922
// channel is not in the given bucket.
923
type channelSelector func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
924
        error)
925

926
// channelScanner will traverse the DB to each chain-hash bucket of each node
927
// pub-key bucket in the open-channel-bucket. The chanSelector will then be used
928
// to fetch the wanted channel outpoint from the chain bucket.
929
func (c *ChannelStateDB) channelScanner(tx kvdb.RTx,
930
        chanSelect channelSelector) (*OpenChannel, error) {
15✔
931

15✔
932
        var (
15✔
933
                targetChan *OpenChannel
15✔
934

15✔
935
                // errChanFound is used to signal that the channel has been
15✔
936
                // found so that iteration through the DB buckets can stop.
15✔
937
                errChanFound = errors.New("channel found")
15✔
938
        )
15✔
939

15✔
940
        // chanScan will traverse the following bucket structure:
15✔
941
        //  * nodePub => chainHash => chanPoint
15✔
942
        //
15✔
943
        // At each level we go one further, ensuring that we're traversing the
15✔
944
        // proper key (that's actually a bucket). By only reading the bucket
15✔
945
        // structure and skipping fully decoding each channel, we save a good
15✔
946
        // bit of CPU as we don't need to do things like decompress public
15✔
947
        // keys.
15✔
948
        chanScan := func(tx kvdb.RTx) error {
30✔
949
                // Get the bucket dedicated to storing the metadata for open
15✔
950
                // channels.
15✔
951
                openChanBucket := tx.ReadBucket(openChannelBucket)
15✔
952
                if openChanBucket == nil {
15✔
953
                        return ErrNoActiveChannels
×
954
                }
×
955

956
                // Within the node channel bucket, are the set of node pubkeys
957
                // we have channels with, we don't know the entire set, so we'll
958
                // check them all.
959
                return openChanBucket.ForEach(func(nodePub, v []byte) error {
30✔
960
                        // Ensure that this is a key the same size as a pubkey,
15✔
961
                        // and also that it leads directly to a bucket.
15✔
962
                        if len(nodePub) != 33 || v != nil {
15✔
963
                                return nil
×
964
                        }
×
965

966
                        nodeChanBucket := openChanBucket.NestedReadBucket(
15✔
967
                                nodePub,
15✔
968
                        )
15✔
969
                        if nodeChanBucket == nil {
15✔
970
                                return nil
×
971
                        }
×
972

973
                        // The next layer down is all the chains that this node
974
                        // has channels on with us.
975
                        return nodeChanBucket.ForEach(func(chainHash,
15✔
976
                                v []byte) error {
30✔
977

15✔
978
                                // If there's a value, it's not a bucket so
15✔
979
                                // ignore it.
15✔
980
                                if v != nil {
15✔
981
                                        return nil
×
982
                                }
×
983

984
                                chainBucket := nodeChanBucket.NestedReadBucket(
15✔
985
                                        chainHash,
15✔
986
                                )
15✔
987
                                if chainBucket == nil {
15✔
988
                                        return fmt.Errorf("unable to read "+
×
989
                                                "bucket for chain=%x",
×
990
                                                chainHash)
×
991
                                }
×
992

993
                                // Finally, we reach the leaf bucket that stores
994
                                // all the chanPoints for this node.
995
                                targetChanBytes, chanPoint, err := chanSelect(
15✔
996
                                        chainBucket,
15✔
997
                                )
15✔
998
                                if errors.Is(err, ErrChannelNotFound) {
16✔
999
                                        return nil
1✔
1000
                                } else if err != nil {
15✔
1001
                                        return err
×
1002
                                }
×
1003

1004
                                chanBucket := chainBucket.NestedReadBucket(
14✔
1005
                                        targetChanBytes,
14✔
1006
                                )
14✔
1007
                                if chanBucket == nil {
21✔
1008
                                        return nil
7✔
1009
                                }
7✔
1010

1011
                                channel, err := fetchOpenChannel(
10✔
1012
                                        chanBucket, chanPoint,
10✔
1013
                                )
10✔
1014
                                if err != nil {
10✔
1015
                                        return err
×
1016
                                }
×
1017

1018
                                targetChan = channel
10✔
1019
                                targetChan.Db = c
10✔
1020

10✔
1021
                                return errChanFound
10✔
1022
                        })
1023
                })
1024
        }
1025

1026
        var err error
15✔
1027
        if tx == nil {
30✔
1028
                err = kvdb.View(c.backend, chanScan, func() {})
30✔
1029
        } else {
×
1030
                err = chanScan(tx)
×
1031
        }
×
1032
        if err != nil && !errors.Is(err, errChanFound) {
15✔
1033
                return nil, err
×
1034
        }
×
1035

1036
        if targetChan != nil {
25✔
1037
                return targetChan, nil
10✔
1038
        }
10✔
1039

1040
        // If we can't find the channel, then we return with an error, as we
1041
        // have nothing to back up.
1042
        return nil, ErrChannelNotFound
8✔
1043
}
1044

1045
// FetchAllChannels attempts to retrieve all open channels currently stored
1046
// within the database, including pending open, fully open and channels waiting
1047
// for a closing transaction to confirm.
1048
func (c *ChannelStateDB) FetchAllChannels() ([]*OpenChannel, error) {
566✔
1049
        return fetchChannels(c)
566✔
1050
}
566✔
1051

1052
// FetchAllOpenChannels will return all channels that have the funding
1053
// transaction confirmed, and is not waiting for a closing transaction to be
1054
// confirmed.
1055
func (c *ChannelStateDB) FetchAllOpenChannels() ([]*OpenChannel, error) {
408✔
1056
        return fetchChannels(
408✔
1057
                c,
408✔
1058
                pendingChannelFilter(false),
408✔
1059
                waitingCloseFilter(false),
408✔
1060
        )
408✔
1061
}
408✔
1062

1063
// FetchPendingChannels will return channels that have completed the process of
1064
// generating and broadcasting funding transactions, but whose funding
1065
// transactions have yet to be confirmed on the blockchain.
1066
func (c *ChannelStateDB) FetchPendingChannels() ([]*OpenChannel, error) {
81✔
1067
        return fetchChannels(c,
81✔
1068
                pendingChannelFilter(true),
81✔
1069
                waitingCloseFilter(false),
81✔
1070
        )
81✔
1071
}
81✔
1072

1073
// FetchWaitingCloseChannels will return all channels that have been opened,
1074
// but are now waiting for a closing transaction to be confirmed.
1075
//
1076
// NOTE: This includes channels that are also pending to be opened.
1077
func (c *ChannelStateDB) FetchWaitingCloseChannels() ([]*OpenChannel, error) {
4✔
1078
        return fetchChannels(
4✔
1079
                c, waitingCloseFilter(true),
4✔
1080
        )
4✔
1081
}
4✔
1082

1083
// fetchChannelsFilter applies a filter to channels retrieved in fetchchannels.
1084
// A set of filters can be combined to filter across multiple dimensions.
1085
type fetchChannelsFilter func(channel *OpenChannel) bool
1086

1087
// pendingChannelFilter returns a filter based on whether channels are pending
1088
// (ie, their funding transaction still needs to confirm). If pending is false,
1089
// channels with confirmed funding transactions are returned.
1090
func pendingChannelFilter(pending bool) fetchChannelsFilter {
495✔
1091
        return func(channel *OpenChannel) bool {
720✔
1092
                return channel.IsPending == pending
225✔
1093
        }
225✔
1094
}
1095

1096
// waitingCloseFilter returns a filter which filters channels based on whether
1097
// they are awaiting the confirmation of their closing transaction. If waiting
1098
// close is true, channels that have had their closing tx broadcast are
1099
// included. If it is false, channels that are not awaiting confirmation of
1100
// their close transaction are returned.
1101
func waitingCloseFilter(waitingClose bool) fetchChannelsFilter {
493✔
1102
        return func(channel *OpenChannel) bool {
704✔
1103
                // If the channel is in any other state than Default,
211✔
1104
                // then it means it is waiting to be closed.
211✔
1105
                channelWaitingClose :=
211✔
1106
                        channel.ChanStatus() != ChanStatusDefault
211✔
1107

211✔
1108
                // Include the channel if it matches the value for
211✔
1109
                // waiting close that we are filtering on.
211✔
1110
                return channelWaitingClose == waitingClose
211✔
1111
        }
211✔
1112
}
1113

1114
// fetchChannels attempts to retrieve channels currently stored in the
1115
// database. It takes a set of filters which are applied to each channel to
1116
// obtain a set of channels with the desired set of properties. Only channels
1117
// which have a true value returned for *all* of the filters will be returned.
1118
// If no filters are provided, every channel in the open channels bucket will
1119
// be returned.
1120
func fetchChannels(c *ChannelStateDB, filters ...fetchChannelsFilter) (
1121
        []*OpenChannel, error) {
1,062✔
1122

1,062✔
1123
        var channels []*OpenChannel
1,062✔
1124

1,062✔
1125
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
2,124✔
1126
                // Get the bucket dedicated to storing the metadata for open
1,062✔
1127
                // channels.
1,062✔
1128
                openChanBucket := tx.ReadBucket(openChannelBucket)
1,062✔
1129
                if openChanBucket == nil {
1,062✔
1130
                        return ErrNoActiveChannels
×
1131
                }
×
1132

1133
                // Next, fetch the bucket dedicated to storing metadata related
1134
                // to all nodes. All keys within this bucket are the serialized
1135
                // public keys of all our direct counterparties.
1136
                nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
1,062✔
1137
                if nodeMetaBucket == nil {
1,062✔
1138
                        return fmt.Errorf("node bucket not created")
×
1139
                }
×
1140

1141
                // Finally for each node public key in the bucket, fetch all
1142
                // the channels related to this particular node.
1143
                return nodeMetaBucket.ForEach(func(k, v []byte) error {
1,571✔
1144
                        nodeChanBucket := openChanBucket.NestedReadBucket(k)
509✔
1145
                        if nodeChanBucket == nil {
509✔
1146
                                return nil
×
1147
                        }
×
1148

1149
                        return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
1,018✔
1150
                                // If there's a value, it's not a bucket so
509✔
1151
                                // ignore it.
509✔
1152
                                if v != nil {
509✔
1153
                                        return nil
×
1154
                                }
×
1155

1156
                                // If we've found a valid chainhash bucket,
1157
                                // then we'll retrieve that so we can extract
1158
                                // all the channels.
1159
                                chainBucket := nodeChanBucket.NestedReadBucket(
509✔
1160
                                        chainHash,
509✔
1161
                                )
509✔
1162
                                if chainBucket == nil {
509✔
1163
                                        return fmt.Errorf("unable to read "+
×
1164
                                                "bucket for chain=%x", chainHash[:])
×
1165
                                }
×
1166

1167
                                nodeChans, err := c.fetchNodeChannels(chainBucket)
509✔
1168
                                if err != nil {
509✔
1169
                                        return fmt.Errorf("unable to read "+
×
1170
                                                "channel for chain_hash=%x, "+
×
1171
                                                "node_key=%x: %v", chainHash[:], k, err)
×
1172
                                }
×
1173
                                for _, channel := range nodeChans {
1,055✔
1174
                                        // includeChannel indicates whether the channel
546✔
1175
                                        // meets the criteria specified by our filters.
546✔
1176
                                        includeChannel := true
546✔
1177

546✔
1178
                                        // Run through each filter and check whether the
546✔
1179
                                        // channel should be included.
546✔
1180
                                        for _, f := range filters {
979✔
1181
                                                // If the channel fails the filter, set
433✔
1182
                                                // includeChannel to false and don't bother
433✔
1183
                                                // checking the remaining filters.
433✔
1184
                                                if !f(channel) {
461✔
1185
                                                        includeChannel = false
28✔
1186
                                                        break
28✔
1187
                                                }
1188
                                        }
1189

1190
                                        // If the channel passed every filter, include it in
1191
                                        // our set of channels.
1192
                                        if includeChannel {
1,067✔
1193
                                                channels = append(channels, channel)
521✔
1194
                                        }
521✔
1195
                                }
1196
                                return nil
509✔
1197
                        })
1198

1199
                })
1200
        }, func() {
1,062✔
1201
                channels = nil
1,062✔
1202
        })
1,062✔
1203
        if err != nil {
1,062✔
1204
                return nil, err
×
1205
        }
×
1206

1207
        return channels, nil
1,062✔
1208
}
1209

1210
// FetchClosedChannels attempts to fetch all closed channels from the database.
1211
// The pendingOnly bool toggles if channels that aren't yet fully closed should
1212
// be returned in the response or not. When a channel was cooperatively closed,
1213
// it becomes fully closed after a single confirmation.  When a channel was
1214
// forcibly closed, it will become fully closed after _all_ the pending funds
1215
// (if any) have been swept.
1216
func (c *ChannelStateDB) FetchClosedChannels(pendingOnly bool) (
1217
        []*ChannelCloseSummary, error) {
503✔
1218

503✔
1219
        var chanSummaries []*ChannelCloseSummary
503✔
1220

503✔
1221
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
1,006✔
1222
                closeBucket := tx.ReadBucket(closedChannelBucket)
503✔
1223
                if closeBucket == nil {
503✔
1224
                        return ErrNoClosedChannels
×
1225
                }
×
1226

1227
                return closeBucket.ForEach(func(chanID []byte, summaryBytes []byte) error {
526✔
1228
                        summaryReader := bytes.NewReader(summaryBytes)
23✔
1229
                        chanSummary, err := deserializeCloseChannelSummary(summaryReader)
23✔
1230
                        if err != nil {
23✔
1231
                                return err
×
1232
                        }
×
1233

1234
                        // If the query specified to only include pending
1235
                        // channels, then we'll skip any channels which aren't
1236
                        // currently pending.
1237
                        if !chanSummary.IsPending && pendingOnly {
27✔
1238
                                return nil
4✔
1239
                        }
4✔
1240

1241
                        chanSummaries = append(chanSummaries, chanSummary)
22✔
1242
                        return nil
22✔
1243
                })
1244
        }, func() {
503✔
1245
                chanSummaries = nil
503✔
1246
        }); err != nil {
503✔
1247
                return nil, err
×
1248
        }
×
1249

1250
        return chanSummaries, nil
503✔
1251
}
1252

1253
// ErrClosedChannelNotFound signals that a closed channel could not be found in
1254
// the channeldb.
1255
var ErrClosedChannelNotFound = errors.New("unable to find closed channel summary")
1256

1257
// FetchClosedChannel queries for a channel close summary using the channel
1258
// point of the channel in question.
1259
func (c *ChannelStateDB) FetchClosedChannel(chanID *wire.OutPoint) (
1260
        *ChannelCloseSummary, error) {
6✔
1261

6✔
1262
        var chanSummary *ChannelCloseSummary
6✔
1263
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
12✔
1264
                closeBucket := tx.ReadBucket(closedChannelBucket)
6✔
1265
                if closeBucket == nil {
6✔
1266
                        return ErrClosedChannelNotFound
×
1267
                }
×
1268

1269
                var b bytes.Buffer
6✔
1270
                var err error
6✔
1271
                if err = graphdb.WriteOutpoint(&b, chanID); err != nil {
6✔
1272
                        return err
×
1273
                }
×
1274

1275
                summaryBytes := closeBucket.Get(b.Bytes())
6✔
1276
                if summaryBytes == nil {
7✔
1277
                        return ErrClosedChannelNotFound
1✔
1278
                }
1✔
1279

1280
                summaryReader := bytes.NewReader(summaryBytes)
5✔
1281
                chanSummary, err = deserializeCloseChannelSummary(summaryReader)
5✔
1282

5✔
1283
                return err
5✔
1284
        }, func() {
6✔
1285
                chanSummary = nil
6✔
1286
        }); err != nil {
7✔
1287
                return nil, err
1✔
1288
        }
1✔
1289

1290
        return chanSummary, nil
5✔
1291
}
1292

1293
// FetchClosedChannelForID queries for a channel close summary using the
1294
// channel ID of the channel in question.
1295
func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) (
1296
        *ChannelCloseSummary, error) {
105✔
1297

105✔
1298
        var chanSummary *ChannelCloseSummary
105✔
1299
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
210✔
1300
                closeBucket := tx.ReadBucket(closedChannelBucket)
105✔
1301
                if closeBucket == nil {
105✔
1302
                        return ErrClosedChannelNotFound
×
1303
                }
×
1304

1305
                // The first 30 bytes of the channel ID and outpoint will be
1306
                // equal.
1307
                cursor := closeBucket.ReadCursor()
105✔
1308
                op, c := cursor.Seek(cid[:30])
105✔
1309

105✔
1310
                // We scan over all possible candidates for this channel ID.
105✔
1311
                for ; op != nil && bytes.Compare(cid[:30], op[:30]) <= 0; op, c = cursor.Next() {
5,360✔
1312
                        var outPoint wire.OutPoint
5,255✔
1313
                        err := graphdb.ReadOutpoint(
5,255✔
1314
                                bytes.NewReader(op), &outPoint,
5,255✔
1315
                        )
5,255✔
1316
                        if err != nil {
5,255✔
1317
                                return err
×
1318
                        }
×
1319

1320
                        // If the found outpoint does not correspond to this
1321
                        // channel ID, we continue.
1322
                        if !cid.IsChanPoint(&outPoint) {
10,406✔
1323
                                continue
5,151✔
1324
                        }
1325

1326
                        // Deserialize the close summary and return.
1327
                        r := bytes.NewReader(c)
104✔
1328
                        chanSummary, err = deserializeCloseChannelSummary(r)
104✔
1329
                        if err != nil {
104✔
1330
                                return err
×
1331
                        }
×
1332

1333
                        return nil
104✔
1334
                }
1335
                return ErrClosedChannelNotFound
4✔
1336
        }, func() {
105✔
1337
                chanSummary = nil
105✔
1338
        }); err != nil {
109✔
1339
                return nil, err
4✔
1340
        }
4✔
1341

1342
        return chanSummary, nil
104✔
1343
}
1344

1345
// MarkChanFullyClosed marks a channel as fully closed within the database. A
1346
// channel should be marked as fully closed if the channel was initially
1347
// cooperatively closed and it's reached a single confirmation, or after all
1348
// the pending funds in a channel that has been forcibly closed have been
1349
// swept.
1350
func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
10✔
1351
        var (
10✔
1352
                openChannels  []*OpenChannel
10✔
1353
                pruneLinkNode *btcec.PublicKey
10✔
1354
        )
10✔
1355
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
20✔
1356
                var b bytes.Buffer
10✔
1357
                if err := graphdb.WriteOutpoint(&b, chanPoint); err != nil {
10✔
1358
                        return err
×
1359
                }
×
1360

1361
                chanID := b.Bytes()
10✔
1362

10✔
1363
                closedChanBucket, err := tx.CreateTopLevelBucket(
10✔
1364
                        closedChannelBucket,
10✔
1365
                )
10✔
1366
                if err != nil {
10✔
1367
                        return err
×
1368
                }
×
1369

1370
                chanSummaryBytes := closedChanBucket.Get(chanID)
10✔
1371
                if chanSummaryBytes == nil {
10✔
1372
                        return fmt.Errorf("no closed channel for "+
×
1373
                                "chan_point=%v found", chanPoint)
×
1374
                }
×
1375

1376
                chanSummaryReader := bytes.NewReader(chanSummaryBytes)
10✔
1377
                chanSummary, err := deserializeCloseChannelSummary(
10✔
1378
                        chanSummaryReader,
10✔
1379
                )
10✔
1380
                if err != nil {
10✔
1381
                        return err
×
1382
                }
×
1383

1384
                chanSummary.IsPending = false
10✔
1385

10✔
1386
                var newSummary bytes.Buffer
10✔
1387
                err = serializeChannelCloseSummary(&newSummary, chanSummary)
10✔
1388
                if err != nil {
10✔
1389
                        return err
×
1390
                }
×
1391

1392
                err = closedChanBucket.Put(chanID, newSummary.Bytes())
10✔
1393
                if err != nil {
10✔
1394
                        return err
×
1395
                }
×
1396

1397
                // Now that the channel is closed, we'll check if we have any
1398
                // other open channels with this peer. If we don't we'll
1399
                // garbage collect it to ensure we don't establish persistent
1400
                // connections to peers without open channels.
1401
                pruneLinkNode = chanSummary.RemotePub
10✔
1402
                openChannels, err = c.fetchOpenChannels(
10✔
1403
                        tx, pruneLinkNode,
10✔
1404
                )
10✔
1405
                if err != nil {
10✔
1406
                        return fmt.Errorf("unable to fetch open channels for "+
×
1407
                                "peer %x: %v",
×
1408
                                pruneLinkNode.SerializeCompressed(), err)
×
1409
                }
×
1410

1411
                return nil
10✔
1412
        }, func() {
10✔
1413
                openChannels = nil
10✔
1414
                pruneLinkNode = nil
10✔
1415
        })
10✔
1416
        if err != nil {
10✔
1417
                return err
×
1418
        }
×
1419

1420
        // Decide whether we want to remove the link node, based upon the number
1421
        // of still open channels.
1422
        return c.pruneLinkNode(openChannels, pruneLinkNode)
10✔
1423
}
1424

1425
// pruneLinkNode determines whether we should garbage collect a link node from
1426
// the database due to no longer having any open channels with it. If there are
1427
// any left, then this acts as a no-op.
1428
func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
1429
        remotePub *btcec.PublicKey) error {
10✔
1430

10✔
1431
        if len(openChannels) > 0 {
13✔
1432
                return nil
3✔
1433
        }
3✔
1434

1435
        log.Infof("Pruning link node %x with zero open channels from database",
10✔
1436
                remotePub.SerializeCompressed())
10✔
1437

10✔
1438
        return c.linkNodeDB.DeleteLinkNode(remotePub)
10✔
1439
}
1440

1441
// PruneLinkNodes attempts to prune all link nodes found within the database
1442
// with whom we no longer have any open channels with.
1443
func (c *ChannelStateDB) PruneLinkNodes() error {
3✔
1444
        allLinkNodes, err := c.linkNodeDB.FetchAllLinkNodes()
3✔
1445
        if err != nil {
3✔
1446
                return err
×
1447
        }
×
1448

1449
        for _, linkNode := range allLinkNodes {
6✔
1450
                var (
3✔
1451
                        openChannels []*OpenChannel
3✔
1452
                        linkNode     = linkNode
3✔
1453
                )
3✔
1454
                err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1455
                        var err error
3✔
1456
                        openChannels, err = c.fetchOpenChannels(
3✔
1457
                                tx, linkNode.IdentityPub,
3✔
1458
                        )
3✔
1459
                        return err
3✔
1460
                }, func() {
6✔
1461
                        openChannels = nil
3✔
1462
                })
3✔
1463
                if err != nil {
3✔
1464
                        return err
×
1465
                }
×
1466

1467
                err = c.pruneLinkNode(openChannels, linkNode.IdentityPub)
3✔
1468
                if err != nil {
3✔
1469
                        return err
×
1470
                }
×
1471
        }
1472

1473
        return nil
3✔
1474
}
1475

1476
// ChannelShell is a shell of a channel that is meant to be used for channel
1477
// recovery purposes. It contains a minimal OpenChannel instance along with
1478
// addresses for that target node.
1479
type ChannelShell struct {
1480
        // NodeAddrs the set of addresses that this node has known to be
1481
        // reachable at in the past.
1482
        NodeAddrs []net.Addr
1483

1484
        // Chan is a shell of an OpenChannel, it contains only the items
1485
        // required to restore the channel on disk.
1486
        Chan *OpenChannel
1487
}
1488

1489
// RestoreChannelShells is a method that allows the caller to reconstruct the
1490
// state of an OpenChannel from the ChannelShell. We'll attempt to write the
1491
// new channel to disk, create a LinkNode instance with the passed node
1492
// addresses, and finally create an edge within the graph for the channel as
1493
// well. This method is idempotent, so repeated calls with the same set of
1494
// channel shells won't modify the database after the initial call.
1495
func (c *ChannelStateDB) RestoreChannelShells(channelShells ...*ChannelShell) error {
4✔
1496
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
8✔
1497
                for _, channelShell := range channelShells {
8✔
1498
                        channel := channelShell.Chan
4✔
1499

4✔
1500
                        // When we make a channel, we mark that the channel has
4✔
1501
                        // been restored, this will signal to other sub-systems
4✔
1502
                        // to not attempt to use the channel as if it was a
4✔
1503
                        // regular one.
4✔
1504
                        channel.chanStatus |= ChanStatusRestored
4✔
1505

4✔
1506
                        // First, we'll attempt to create a new open channel
4✔
1507
                        // and link node for this channel. If the channel
4✔
1508
                        // already exists, then in order to ensure this method
4✔
1509
                        // is idempotent, we'll continue to the next step.
4✔
1510
                        channel.Db = c
4✔
1511
                        err := syncNewChannel(
4✔
1512
                                tx, channel, channelShell.NodeAddrs,
4✔
1513
                        )
4✔
1514
                        if err != nil {
7✔
1515
                                return err
3✔
1516
                        }
3✔
1517
                }
1518

1519
                return nil
4✔
1520
        }, func() {})
4✔
1521
        if err != nil {
7✔
1522
                return err
3✔
1523
        }
3✔
1524

1525
        return nil
4✔
1526
}
1527

1528
// AddrsForNode consults the channel database for all addresses known to the
1529
// passed node public key. The returned boolean indicates if the given node is
1530
// unknown to the channel DB or not.
1531
//
1532
// NOTE: this is part of the AddrSource interface.
1533
func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr, error) {
4✔
1534
        linkNode, err := d.channelStateDB.linkNodeDB.FetchLinkNode(nodePub)
4✔
1535
        // Only if the error is something other than ErrNodeNotFound do we
4✔
1536
        // return it.
4✔
1537
        switch {
4✔
1538
        case err != nil && !errors.Is(err, ErrNodeNotFound):
×
1539
                return false, nil, err
×
1540

1541
        case errors.Is(err, ErrNodeNotFound):
×
1542
                return false, nil, nil
×
1543
        }
1544

1545
        return true, linkNode.Addresses, nil
4✔
1546
}
1547

1548
// AbandonChannel attempts to remove the target channel from the open channel
1549
// database. If the channel was already removed (has a closed channel entry),
1550
// then we'll return a nil error. Otherwise, we'll insert a new close summary
1551
// into the database.
1552
func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint,
1553
        bestHeight uint32) error {
7✔
1554

7✔
1555
        // With the chanPoint constructed, we'll attempt to find the target
7✔
1556
        // channel in the database. If we can't find the channel, then we'll
7✔
1557
        // return the error back to the caller.
7✔
1558
        dbChan, err := c.FetchChannel(*chanPoint)
7✔
1559
        switch {
7✔
1560
        // If the channel wasn't found, then it's possible that it was already
1561
        // abandoned from the database.
1562
        case err == ErrChannelNotFound:
5✔
1563
                _, closedErr := c.FetchClosedChannel(chanPoint)
5✔
1564
                if closedErr != nil {
6✔
1565
                        return closedErr
1✔
1566
                }
1✔
1567

1568
                // If the channel was already closed, then we don't return an
1569
                // error as we'd like this step to be repeatable.
1570
                return nil
4✔
1571
        case err != nil:
×
1572
                return err
×
1573
        }
1574

1575
        // Now that we've found the channel, we'll populate a close summary for
1576
        // the channel, so we can store as much information for this abounded
1577
        // channel as possible. We also ensure that we set Pending to false, to
1578
        // indicate that this channel has been "fully" closed.
1579
        summary := &ChannelCloseSummary{
5✔
1580
                CloseType:               Abandoned,
5✔
1581
                ChanPoint:               *chanPoint,
5✔
1582
                ChainHash:               dbChan.ChainHash,
5✔
1583
                CloseHeight:             bestHeight,
5✔
1584
                RemotePub:               dbChan.IdentityPub,
5✔
1585
                Capacity:                dbChan.Capacity,
5✔
1586
                SettledBalance:          dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
5✔
1587
                ShortChanID:             dbChan.ShortChanID(),
5✔
1588
                RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
5✔
1589
                RemoteNextRevocation:    dbChan.RemoteNextRevocation,
5✔
1590
                LocalChanConfig:         dbChan.LocalChanCfg,
5✔
1591
        }
5✔
1592

5✔
1593
        // Finally, we'll close the channel in the DB, and return back to the
5✔
1594
        // caller. We set ourselves as the close initiator because we abandoned
5✔
1595
        // the channel.
5✔
1596
        return dbChan.CloseChannel(summary, ChanStatusLocalCloseInitiator)
5✔
1597
}
1598

1599
// SaveChannelOpeningState saves the serialized channel state for the provided
1600
// chanPoint to the channelOpeningStateBucket.
1601
func (c *ChannelStateDB) SaveChannelOpeningState(outPoint,
1602
        serializedState []byte) error {
95✔
1603

95✔
1604
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
190✔
1605
                bucket, err := tx.CreateTopLevelBucket(channelOpeningStateBucket)
95✔
1606
                if err != nil {
95✔
1607
                        return err
×
1608
                }
×
1609

1610
                return bucket.Put(outPoint, serializedState)
95✔
1611
        }, func() {})
95✔
1612
}
1613

1614
// GetChannelOpeningState fetches the serialized channel state for the provided
1615
// outPoint from the database, or returns ErrChannelNotFound if the channel
1616
// is not found.
1617
func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte,
1618
        error) {
254✔
1619

254✔
1620
        var serializedState []byte
254✔
1621
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
508✔
1622
                bucket := tx.ReadBucket(channelOpeningStateBucket)
254✔
1623
                if bucket == nil {
257✔
1624
                        // If the bucket does not exist, it means we never added
3✔
1625
                        //  a channel to the db, so return ErrChannelNotFound.
3✔
1626
                        return ErrChannelNotFound
3✔
1627
                }
3✔
1628

1629
                stateBytes := bucket.Get(outPoint)
254✔
1630
                if stateBytes == nil {
304✔
1631
                        return ErrChannelNotFound
50✔
1632
                }
50✔
1633

1634
                serializedState = append(serializedState, stateBytes...)
207✔
1635

207✔
1636
                return nil
207✔
1637
        }, func() {
254✔
1638
                serializedState = nil
254✔
1639
        })
254✔
1640
        return serializedState, err
254✔
1641
}
1642

1643
// DeleteChannelOpeningState removes any state for outPoint from the database.
1644
func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
27✔
1645
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
54✔
1646
                bucket := tx.ReadWriteBucket(channelOpeningStateBucket)
27✔
1647
                if bucket == nil {
27✔
1648
                        return ErrChannelNotFound
×
1649
                }
×
1650

1651
                return bucket.Delete(outPoint)
27✔
1652
        }, func() {})
27✔
1653
}
1654

1655
// syncVersions function is used for safe db version synchronization. It
1656
// applies migration functions to the current database and recovers the
1657
// previous state of db if at least one error/panic appeared during migration.
1658
func (d *DB) syncVersions(versions []mandatoryVersion) error {
1,752✔
1659
        meta, err := d.FetchMeta()
1,752✔
1660
        if err != nil {
1,752✔
1661
                if err == ErrMetaNotFound {
×
1662
                        meta = &Meta{}
×
1663
                } else {
×
1664
                        return err
×
1665
                }
×
1666
        }
1667

1668
        latestVersion := getLatestDBVersion(versions)
1,752✔
1669
        log.Infof("Checking for schema update: latest_version=%v, "+
1,752✔
1670
                "db_version=%v", latestVersion, meta.DbVersionNumber)
1,752✔
1671

1,752✔
1672
        switch {
1,752✔
1673

1674
        // If the database reports a higher version that we are aware of, the
1675
        // user is probably trying to revert to a prior version of lnd. We fail
1676
        // here to prevent reversions and unintended corruption.
1677
        case meta.DbVersionNumber > latestVersion:
1✔
1678
                log.Errorf("Refusing to revert from db_version=%d to "+
1✔
1679
                        "lower version=%d", meta.DbVersionNumber,
1✔
1680
                        latestVersion)
1✔
1681
                return ErrDBReversion
1✔
1682

1683
        // If the current database version matches the latest version number,
1684
        // then we don't need to perform any migrations.
1685
        case meta.DbVersionNumber == latestVersion:
1,747✔
1686
                return nil
1,747✔
1687
        }
1688

1689
        log.Infof("Performing database schema migration")
4✔
1690

4✔
1691
        // Otherwise, we fetch the migrations which need to applied, and
4✔
1692
        // execute them serially within a single database transaction to ensure
4✔
1693
        // the migration is atomic.
4✔
1694
        migrations, migrationVersions := getMigrationsToApply(
4✔
1695
                versions, meta.DbVersionNumber,
4✔
1696
        )
4✔
1697
        return kvdb.Update(d, func(tx kvdb.RwTx) error {
8✔
1698
                for i, migration := range migrations {
8✔
1699
                        if migration == nil {
4✔
1700
                                continue
×
1701
                        }
1702

1703
                        log.Infof("Applying migration #%v",
4✔
1704
                                migrationVersions[i])
4✔
1705

4✔
1706
                        if err := migration(tx); err != nil {
5✔
1707
                                log.Infof("Unable to apply migration #%v",
1✔
1708
                                        migrationVersions[i])
1✔
1709
                                return err
1✔
1710
                        }
1✔
1711
                }
1712

1713
                meta.DbVersionNumber = latestVersion
2✔
1714
                err := putMeta(meta, tx)
2✔
1715
                if err != nil {
2✔
1716
                        return err
×
1717
                }
×
1718

1719
                // In dry-run mode, return an error to prevent the transaction
1720
                // from committing.
1721
                if d.dryRun {
3✔
1722
                        return ErrDryRunMigrationOK
1✔
1723
                }
1✔
1724

1725
                return nil
1✔
1726
        }, func() {})
4✔
1727
}
1728

1729
// applyOptionalVersions takes a config to determine whether the optional
1730
// migrations will be applied.
1731
//
1732
// NOTE: only support the prune_revocation_log optional migration atm.
1733
func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error {
1,750✔
1734
        // TODO(yy): need to design the db to support dry run for optional
1,750✔
1735
        // migrations.
1,750✔
1736
        if d.dryRun {
1,751✔
1737
                log.Info("Skipped optional migrations as dry run mode is not " +
1✔
1738
                        "supported yet")
1✔
1739
                return nil
1✔
1740
        }
1✔
1741

1742
        om, err := d.fetchOptionalMeta()
1,749✔
1743
        if err != nil {
1,749✔
1744
                if err == ErrMetaNotFound {
×
1745
                        om = &OptionalMeta{
×
1746
                                Versions: make(map[uint64]string),
×
1747
                        }
×
1748
                } else {
×
1749
                        return err
×
1750
                }
×
1751
        }
1752

1753
        log.Infof("Checking for optional update: prune_revocation_log=%v, "+
1,749✔
1754
                "db_version=%s", cfg.PruneRevocationLog, om)
1,749✔
1755

1,749✔
1756
        // Exit early if the optional migration is not specified.
1,749✔
1757
        if !cfg.PruneRevocationLog {
3,496✔
1758
                return nil
1,747✔
1759
        }
1,747✔
1760

1761
        // Exit early if the optional migration has already been applied.
1762
        if _, ok := om.Versions[0]; ok {
3✔
1763
                return nil
1✔
1764
        }
1✔
1765

1766
        // Get the optional version.
1767
        version := optionalVersions[0]
1✔
1768
        log.Infof("Performing database optional migration: %s", version.name)
1✔
1769

1✔
1770
        migrationCfg := &MigrationConfigImpl{
1✔
1771
                migration30.MigrateRevLogConfigImpl{
1✔
1772
                        NoAmountData: d.noRevLogAmtData,
1✔
1773
                },
1✔
1774
        }
1✔
1775

1✔
1776
        // Migrate the data.
1✔
1777
        if err := version.migration(d, migrationCfg); err != nil {
1✔
1778
                log.Errorf("Unable to apply optional migration: %s, error: %v",
×
1779
                        version.name, err)
×
1780
                return err
×
1781
        }
×
1782

1783
        // Update the optional meta. Notice that unlike the mandatory db
1784
        // migrations where we perform the migration and updating meta in a
1785
        // single db transaction, we use different transactions here. Even when
1786
        // the following update is failed, we should be fine here as we would
1787
        // re-run the optional migration again, which is a noop, during next
1788
        // startup.
1789
        om.Versions[0] = version.name
1✔
1790
        if err := d.putOptionalMeta(om); err != nil {
1✔
1791
                log.Errorf("Unable to update optional meta: %v", err)
×
1792
                return err
×
1793
        }
×
1794

1795
        return nil
1✔
1796
}
1797

1798
// ChannelStateDB returns the sub database that is concerned with the channel
1799
// state.
1800
func (d *DB) ChannelStateDB() *ChannelStateDB {
2,169✔
1801
        return d.channelStateDB
2,169✔
1802
}
2,169✔
1803

1804
// LatestDBVersion returns the number of the latest database version currently
1805
// known to the channel DB.
1806
func LatestDBVersion() uint32 {
1✔
1807
        return getLatestDBVersion(dbVersions)
1✔
1808
}
1✔
1809

1810
func getLatestDBVersion(versions []mandatoryVersion) uint32 {
3,471✔
1811
        return versions[len(versions)-1].number
3,471✔
1812
}
3,471✔
1813

1814
// getMigrationsToApply retrieves the migration function that should be
1815
// applied to the database.
1816
func getMigrationsToApply(versions []mandatoryVersion,
1817
        version uint32) ([]migration, []uint32) {
5✔
1818

5✔
1819
        migrations := make([]migration, 0, len(versions))
5✔
1820
        migrationVersions := make([]uint32, 0, len(versions))
5✔
1821

5✔
1822
        for _, v := range versions {
17✔
1823
                if v.number > version {
18✔
1824
                        migrations = append(migrations, v.migration)
6✔
1825
                        migrationVersions = append(migrationVersions, v.number)
6✔
1826
                }
6✔
1827
        }
1828

1829
        return migrations, migrationVersions
5✔
1830
}
1831

1832
// fetchHistoricalChanBucket returns a the channel bucket for a given outpoint
1833
// from the historical channel bucket. If the bucket does not exist,
1834
// ErrNoHistoricalBucket is returned.
1835
func fetchHistoricalChanBucket(tx kvdb.RTx,
1836
        outPoint *wire.OutPoint) (kvdb.RBucket, error) {
7✔
1837

7✔
1838
        // First fetch the top level bucket which stores all data related to
7✔
1839
        // historically stored channels.
7✔
1840
        historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
7✔
1841
        if historicalChanBucket == nil {
7✔
1842
                return nil, ErrNoHistoricalBucket
×
1843
        }
×
1844

1845
        // With the bucket for the node and chain fetched, we can now go down
1846
        // another level, for the channel itself.
1847
        var chanPointBuf bytes.Buffer
7✔
1848
        if err := graphdb.WriteOutpoint(&chanPointBuf, outPoint); err != nil {
7✔
1849
                return nil, err
×
1850
        }
×
1851
        chanBucket := historicalChanBucket.NestedReadBucket(
7✔
1852
                chanPointBuf.Bytes(),
7✔
1853
        )
7✔
1854
        if chanBucket == nil {
9✔
1855
                return nil, ErrChannelNotFound
2✔
1856
        }
2✔
1857

1858
        return chanBucket, nil
5✔
1859
}
1860

1861
// FetchHistoricalChannel fetches open channel data from the historical channel
1862
// bucket.
1863
func (c *ChannelStateDB) FetchHistoricalChannel(outPoint *wire.OutPoint) (
1864
        *OpenChannel, error) {
7✔
1865

7✔
1866
        var channel *OpenChannel
7✔
1867
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
14✔
1868
                chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
7✔
1869
                if err != nil {
9✔
1870
                        return err
2✔
1871
                }
2✔
1872

1873
                channel, err = fetchOpenChannel(chanBucket, outPoint)
5✔
1874
                if err != nil {
5✔
1875
                        return err
×
1876
                }
×
1877

1878
                channel.Db = c
5✔
1879
                return nil
5✔
1880
        }, func() {
7✔
1881
                channel = nil
7✔
1882
        })
7✔
1883
        if err != nil {
9✔
1884
                return nil, err
2✔
1885
        }
2✔
1886

1887
        return channel, nil
5✔
1888
}
1889

1890
func fetchFinalHtlcsBucket(tx kvdb.RTx,
1891
        chanID lnwire.ShortChannelID) (kvdb.RBucket, error) {
13✔
1892

13✔
1893
        finalHtlcsBucket := tx.ReadBucket(finalHtlcsBucket)
13✔
1894
        if finalHtlcsBucket == nil {
19✔
1895
                return nil, ErrFinalHtlcsBucketNotFound
6✔
1896
        }
6✔
1897

1898
        var chanIDBytes [8]byte
7✔
1899
        byteOrder.PutUint64(chanIDBytes[:], chanID.ToUint64())
7✔
1900

7✔
1901
        chanBucket := finalHtlcsBucket.NestedReadBucket(chanIDBytes[:])
7✔
1902
        if chanBucket == nil {
7✔
1903
                return nil, ErrFinalChannelBucketNotFound
×
1904
        }
×
1905

1906
        return chanBucket, nil
7✔
1907
}
1908

1909
var ErrHtlcUnknown = errors.New("htlc unknown")
1910

1911
// LookupFinalHtlc retrieves a final htlc resolution from the database. If the
1912
// htlc has no final resolution yet, ErrHtlcUnknown is returned.
1913
func (c *ChannelStateDB) LookupFinalHtlc(chanID lnwire.ShortChannelID,
1914
        htlcIndex uint64) (*FinalHtlcInfo, error) {
13✔
1915

13✔
1916
        var idBytes [8]byte
13✔
1917
        byteOrder.PutUint64(idBytes[:], htlcIndex)
13✔
1918

13✔
1919
        var settledByte byte
13✔
1920

13✔
1921
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
26✔
1922
                finalHtlcsBucket, err := fetchFinalHtlcsBucket(
13✔
1923
                        tx, chanID,
13✔
1924
                )
13✔
1925
                switch {
13✔
1926
                case errors.Is(err, ErrFinalHtlcsBucketNotFound):
6✔
1927
                        fallthrough
6✔
1928

1929
                case errors.Is(err, ErrFinalChannelBucketNotFound):
6✔
1930
                        return ErrHtlcUnknown
6✔
1931

1932
                case err != nil:
×
1933
                        return fmt.Errorf("cannot fetch final htlcs bucket: %w",
×
1934
                                err)
×
1935
                }
1936

1937
                value := finalHtlcsBucket.Get(idBytes[:])
7✔
1938
                if value == nil {
8✔
1939
                        return ErrHtlcUnknown
1✔
1940
                }
1✔
1941

1942
                if len(value) != 1 {
6✔
1943
                        return errors.New("unexpected final htlc value length")
×
1944
                }
×
1945

1946
                settledByte = value[0]
6✔
1947

6✔
1948
                return nil
6✔
1949
        }, func() {
13✔
1950
                settledByte = 0
13✔
1951
        })
13✔
1952
        if err != nil {
20✔
1953
                return nil, err
7✔
1954
        }
7✔
1955

1956
        info := FinalHtlcInfo{
6✔
1957
                Settled:  settledByte&byte(FinalHtlcSettledBit) != 0,
6✔
1958
                Offchain: settledByte&byte(FinalHtlcOffchainBit) != 0,
6✔
1959
        }
6✔
1960

6✔
1961
        return &info, nil
6✔
1962
}
1963

1964
// PutOnchainFinalHtlcOutcome stores the final on-chain outcome of an htlc in
1965
// the database.
1966
func (c *ChannelStateDB) PutOnchainFinalHtlcOutcome(
1967
        chanID lnwire.ShortChannelID, htlcID uint64, settled bool) error {
4✔
1968

4✔
1969
        // Skip if the user did not opt in to storing final resolutions.
4✔
1970
        if !c.parent.storeFinalHtlcResolutions {
7✔
1971
                return nil
3✔
1972
        }
3✔
1973

1974
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
2✔
1975
                finalHtlcsBucket, err := fetchFinalHtlcsBucketRw(tx, chanID)
1✔
1976
                if err != nil {
1✔
1977
                        return err
×
1978
                }
×
1979

1980
                return putFinalHtlc(
1✔
1981
                        finalHtlcsBucket, htlcID,
1✔
1982
                        FinalHtlcInfo{
1✔
1983
                                Settled:  settled,
1✔
1984
                                Offchain: false,
1✔
1985
                        },
1✔
1986
                )
1✔
1987
        }, func() {})
1✔
1988
}
1989

1990
// MakeTestInvoiceDB is used to create a test invoice database for testing
1991
// purposes. It simply calls into MakeTestDB so the same modifiers can be used.
1992
func MakeTestInvoiceDB(t *testing.T, modifiers ...OptionModifier) (
1993
        invoices.InvoiceDB, error) {
154✔
1994

154✔
1995
        return MakeTestDB(t, modifiers...)
154✔
1996
}
154✔
1997

1998
// MakeTestDB creates a new instance of the ChannelDB for testing purposes.
1999
// A callback which cleans up the created temporary directories is also
2000
// returned and intended to be executed after the test completes.
2001
func MakeTestDB(t *testing.T, modifiers ...OptionModifier) (*DB, error) {
288✔
2002
        // First, create a temporary directory to be used for the duration of
288✔
2003
        // this test.
288✔
2004
        tempDirName := t.TempDir()
288✔
2005

288✔
2006
        // Next, create channeldb for the first time.
288✔
2007
        backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cdb")
288✔
2008
        if err != nil {
288✔
2009
                backendCleanup()
×
2010
                return nil, err
×
2011
        }
×
2012

2013
        cdb, err := CreateWithBackend(backend, modifiers...)
288✔
2014
        if err != nil {
288✔
2015
                backendCleanup()
×
2016
                return nil, err
×
2017
        }
×
2018

2019
        t.Cleanup(func() {
576✔
2020
                cdb.Close()
288✔
2021
                backendCleanup()
288✔
2022
        })
288✔
2023

2024
        return cdb, nil
288✔
2025
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc