• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16982438758

15 Aug 2025 03:47AM UTC coverage: 66.748% (-0.03%) from 66.776%
16982438758

Pull #9455

github

web-flow
Merge 9a33d5e7b into 31fc55650
Pull Request #9455: [1/2] discovery+lnwire: add support for DNS host name in NodeAnnouncement msg

209 of 304 new or added lines in 8 files covered. (68.75%)

1268 existing lines in 30 files now uncovered.

136026 of 203789 relevant lines covered (66.75%)

21540.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.73
/channeldb/db.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/binary"
7
        "errors"
8
        "fmt"
9
        "net"
10
        "os"
11
        "testing"
12

13
        "github.com/btcsuite/btcd/btcec/v2"
14
        "github.com/btcsuite/btcd/wire"
15
        "github.com/btcsuite/btcwallet/walletdb"
16
        mig "github.com/lightningnetwork/lnd/channeldb/migration"
17
        "github.com/lightningnetwork/lnd/channeldb/migration12"
18
        "github.com/lightningnetwork/lnd/channeldb/migration13"
19
        "github.com/lightningnetwork/lnd/channeldb/migration16"
20
        "github.com/lightningnetwork/lnd/channeldb/migration20"
21
        "github.com/lightningnetwork/lnd/channeldb/migration21"
22
        "github.com/lightningnetwork/lnd/channeldb/migration23"
23
        "github.com/lightningnetwork/lnd/channeldb/migration24"
24
        "github.com/lightningnetwork/lnd/channeldb/migration25"
25
        "github.com/lightningnetwork/lnd/channeldb/migration26"
26
        "github.com/lightningnetwork/lnd/channeldb/migration27"
27
        "github.com/lightningnetwork/lnd/channeldb/migration29"
28
        "github.com/lightningnetwork/lnd/channeldb/migration30"
29
        "github.com/lightningnetwork/lnd/channeldb/migration31"
30
        "github.com/lightningnetwork/lnd/channeldb/migration32"
31
        "github.com/lightningnetwork/lnd/channeldb/migration33"
32
        "github.com/lightningnetwork/lnd/channeldb/migration34"
33
        "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
34
        "github.com/lightningnetwork/lnd/clock"
35
        graphdb "github.com/lightningnetwork/lnd/graph/db"
36
        "github.com/lightningnetwork/lnd/invoices"
37
        "github.com/lightningnetwork/lnd/kvdb"
38
        "github.com/lightningnetwork/lnd/lnwire"
39
        "github.com/stretchr/testify/require"
40
)
41

42
const (
43
        dbName = "channel.db"
44
)
45

46
var (
47
        // ErrDryRunMigrationOK signals that a migration executed successful,
48
        // but we intentionally did not commit the result.
49
        ErrDryRunMigrationOK = errors.New("dry run migration successful")
50

51
        // ErrFinalHtlcsBucketNotFound signals that the top-level final htlcs
52
        // bucket does not exist.
53
        ErrFinalHtlcsBucketNotFound = errors.New("final htlcs bucket not " +
54
                "found")
55

56
        // ErrFinalChannelBucketNotFound signals that the channel bucket for
57
        // final htlc outcomes does not exist.
58
        ErrFinalChannelBucketNotFound = errors.New("final htlcs channel " +
59
                "bucket not found")
60
)
61

62
// migration is a function which takes a prior outdated version of the database
63
// instances and mutates the key/bucket structure to arrive at a more
64
// up-to-date version of the database.
65
type migration func(tx kvdb.RwTx) error
66

67
// mandatoryVersion defines a db version that must be applied before the lnd
68
// starts.
69
type mandatoryVersion struct {
70
        number    uint32
71
        migration migration
72
}
73

74
// MigrationConfig is an interface combines the config interfaces of all
75
// optional migrations.
76
type MigrationConfig interface {
77
        migration30.MigrateRevLogConfig
78
        migration34.MigrationConfig
79
}
80

81
// MigrationConfigImpl is a super set of all the various migration configs and
82
// an implementation of MigrationConfig.
83
type MigrationConfigImpl struct {
84
        migration30.MigrateRevLogConfigImpl
85
        migration34.MigrationConfigImpl
86
}
87

88
// optionalMigration defines an optional migration function. When a migration
89
// is optional, it usually involves a large scale of changes that might touch
90
// millions of keys. Due to OOM concern, the update cannot be safely done
91
// within one db transaction. Thus, for optional migrations, they must take the
92
// db backend and construct transactions as needed.
93
type optionalMigration func(db kvdb.Backend, cfg MigrationConfig) error
94

95
// optionalVersion defines a db version that can be optionally applied. When
96
// applying migrations, we must apply all the mandatory migrations first before
97
// attempting optional ones.
98
type optionalVersion struct {
99
        name      string
100
        migration optionalMigration
101
}
102

103
var (
104
        // dbVersions is storing all mandatory versions of database. If current
105
        // version of database don't match with latest version this list will
106
        // be used for retrieving all migration function that are need to apply
107
        // to the current db.
108
        dbVersions = []mandatoryVersion{
109
                {
110
                        // The base DB version requires no migration.
111
                        number:    0,
112
                        migration: nil,
113
                },
114
                {
115
                        // The version of the database where two new indexes
116
                        // for the update time of node and channel updates were
117
                        // added.
118
                        number:    1,
119
                        migration: migration_01_to_11.MigrateNodeAndEdgeUpdateIndex,
120
                },
121
                {
122
                        // The DB version that added the invoice event time
123
                        // series.
124
                        number:    2,
125
                        migration: migration_01_to_11.MigrateInvoiceTimeSeries,
126
                },
127
                {
128
                        // The DB version that updated the embedded invoice in
129
                        // outgoing payments to match the new format.
130
                        number:    3,
131
                        migration: migration_01_to_11.MigrateInvoiceTimeSeriesOutgoingPayments,
132
                },
133
                {
134
                        // The version of the database where every channel
135
                        // always has two entries in the edges bucket. If
136
                        // a policy is unknown, this will be represented
137
                        // by a special byte sequence.
138
                        number:    4,
139
                        migration: migration_01_to_11.MigrateEdgePolicies,
140
                },
141
                {
142
                        // The DB version where we persist each attempt to send
143
                        // an HTLC to a payment hash, and track whether the
144
                        // payment is in-flight, succeeded, or failed.
145
                        number:    5,
146
                        migration: migration_01_to_11.PaymentStatusesMigration,
147
                },
148
                {
149
                        // The DB version that properly prunes stale entries
150
                        // from the edge update index.
151
                        number:    6,
152
                        migration: migration_01_to_11.MigratePruneEdgeUpdateIndex,
153
                },
154
                {
155
                        // The DB version that migrates the ChannelCloseSummary
156
                        // to a format where optional fields are indicated with
157
                        // boolean flags.
158
                        number:    7,
159
                        migration: migration_01_to_11.MigrateOptionalChannelCloseSummaryFields,
160
                },
161
                {
162
                        // The DB version that changes the gossiper's message
163
                        // store keys to account for the message's type and
164
                        // ShortChannelID.
165
                        number:    8,
166
                        migration: migration_01_to_11.MigrateGossipMessageStoreKeys,
167
                },
168
                {
169
                        // The DB version where the payments and payment
170
                        // statuses are moved to being stored in a combined
171
                        // bucket.
172
                        number:    9,
173
                        migration: migration_01_to_11.MigrateOutgoingPayments,
174
                },
175
                {
176
                        // The DB version where we started to store legacy
177
                        // payload information for all routes, as well as the
178
                        // optional TLV records.
179
                        number:    10,
180
                        migration: migration_01_to_11.MigrateRouteSerialization,
181
                },
182
                {
183
                        // Add invoice htlc and cltv delta fields.
184
                        number:    11,
185
                        migration: migration_01_to_11.MigrateInvoices,
186
                },
187
                {
188
                        // Migrate to TLV invoice bodies, add payment address
189
                        // and features, remove receipt.
190
                        number:    12,
191
                        migration: migration12.MigrateInvoiceTLV,
192
                },
193
                {
194
                        // Migrate to multi-path payments.
195
                        number:    13,
196
                        migration: migration13.MigrateMPP,
197
                },
198
                {
199
                        // Initialize payment address index and begin using it
200
                        // as the default index, falling back to payment hash
201
                        // index.
202
                        number:    14,
203
                        migration: mig.CreateTLB(payAddrIndexBucket),
204
                },
205
                {
206
                        // Initialize payment index bucket which will be used
207
                        // to index payments by sequence number. This index will
208
                        // be used to allow more efficient ListPayments queries.
209
                        number:    15,
UNCOV
210
                        migration: mig.CreateTLB(paymentsIndexBucket),
×
UNCOV
211
                },
×
UNCOV
212
                {
×
213
                        // Add our existing payments to the index bucket created
214
                        // in migration 15.
215
                        number:    16,
216
                        migration: migration16.MigrateSequenceIndex,
217
                },
218
                {
219
                        // Create a top level bucket which will store extra
220
                        // information about channel closes.
221
                        number:    17,
222
                        migration: mig.CreateTLB(closeSummaryBucket),
223
                },
224
                {
225
                        // Create a top level bucket which holds information
226
                        // about our peers.
227
                        number:    18,
228
                        migration: mig.CreateTLB(peersBucket),
229
                },
230
                {
231
                        // Create a top level bucket which holds outpoint
232
                        // information.
233
                        number:    19,
234
                        migration: mig.CreateTLB(outpointBucket),
235
                },
236
                {
237
                        // Migrate some data to the outpoint index.
238
                        number:    20,
239
                        migration: migration20.MigrateOutpointIndex,
240
                },
241
                {
242
                        // Migrate to length prefixed wire messages everywhere
243
                        // in the database.
244
                        number:    21,
245
                        migration: migration21.MigrateDatabaseWireMessages,
246
                },
247
                {
248
                        // Initialize set id index so that invoices can be
249
                        // queried by individual htlc sets.
250
                        number:    22,
251
                        migration: mig.CreateTLB(setIDIndexBucket),
252
                },
253
                {
254
                        number:    23,
255
                        migration: migration23.MigrateHtlcAttempts,
256
                },
257
                {
258
                        // Remove old forwarding packages of closed channels.
259
                        number:    24,
260
                        migration: migration24.MigrateFwdPkgCleanup,
261
                },
262
                {
263
                        // Save the initial local/remote balances in channel
264
                        // info.
265
                        number:    25,
266
                        migration: migration25.MigrateInitialBalances,
267
                },
268
                {
269
                        // Migrate the initial local/remote balance fields into
270
                        // tlv records.
271
                        number:    26,
272
                        migration: migration26.MigrateBalancesToTlvRecords,
273
                },
274
                {
275
                        // Patch the initial local/remote balance fields with
276
                        // empty values for historical channels.
277
                        number:    27,
278
                        migration: migration27.MigrateHistoricalBalances,
279
                },
280
                {
281
                        number:    28,
282
                        migration: mig.CreateTLB(chanIDBucket),
283
                },
284
                {
285
                        number:    29,
286
                        migration: migration29.MigrateChanID,
287
                },
288
                {
289
                        // Removes the "sweeper-last-tx" bucket. Although we
290
                        // do not have a mandatory version 30 we skip this
291
                        // version because its naming is already used for the
292
                        // first optional migration.
293
                        number:    31,
294
                        migration: migration31.DeleteLastPublishedTxTLB,
295
                },
296
                {
297
                        number:    32,
298
                        migration: migration32.MigrateMCRouteSerialisation,
299
                },
300
                {
301
                        number:    33,
302
                        migration: migration33.MigrateMCStoreNameSpacedResults,
303
                },
304
        }
305

306
        // optionalVersions stores all optional migrations that are applied
307
        // after dbVersions.
308
        //
309
        // NOTE: optional migrations must be fault-tolerant and re-run already
310
        // migrated data must be noop, which means the migration must be able
311
        // to determine its state.
312
        optionalVersions = []optionalVersion{
313
                {
314
                        name: "prune_revocation_log",
315
                        migration: func(db kvdb.Backend,
316
                                cfg MigrationConfig) error {
317

318
                                return migration30.MigrateRevocationLog(db, cfg)
×
319
                        },
×
UNCOV
320
                },
×
UNCOV
321
                {
×
322
                        name: "gc_decayed_log",
323
                        migration: func(db kvdb.Backend,
324
                                cfg MigrationConfig) error {
325

326
                                return migration34.MigrateDecayedLog(
3✔
327
                                        db, cfg,
3✔
328
                                )
3✔
329
                        },
3✔
330
                },
3✔
331
        }
3✔
332

333
        // Big endian is the preferred byte order, due to cursor scans over
334
        // integer keys iterating in order.
335
        byteOrder = binary.BigEndian
336

337
        // channelOpeningStateBucket is the database bucket used to store the
338
        // channelOpeningState for each channel that is currently in the process
339
        // of being opened.
340
        channelOpeningStateBucket = []byte("channelOpeningState")
341
)
342

343
// DB is the primary datastore for the lnd daemon. The database stores
344
// information related to nodes, routing data, open/closed channels, fee
345
// schedules, and reputation data.
346
type DB struct {
347
        kvdb.Backend
348

349
        // channelStateDB separates all DB operations on channel state.
350
        channelStateDB *ChannelStateDB
351

352
        dbPath                    string
353
        clock                     clock.Clock
354
        dryRun                    bool
355
        storeFinalHtlcResolutions bool
356

357
        // noRevLogAmtData if true, means that commitment transaction amount
358
        // data should not be stored in the revocation log.
359
        noRevLogAmtData bool
360
}
361

362
// OpenForTesting opens or creates a channeldb to be used for tests. Any
363
// necessary schemas migrations due to updates will take place as necessary.
364
func OpenForTesting(t testing.TB, dbPath string,
365
        modifiers ...OptionModifier) *DB {
366

367
        backend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
1,464✔
368
                DBPath:            dbPath,
1,464✔
369
                DBFileName:        dbName,
1,464✔
370
                NoFreelistSync:    true,
1,464✔
371
                AutoCompact:       false,
1,464✔
372
                AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
1,464✔
373
                DBTimeout:         kvdb.DefaultDBTimeout,
1,464✔
374
        })
1,464✔
375
        require.NoError(t, err)
1,464✔
376

1,464✔
377
        db, err := CreateWithBackend(backend, modifiers...)
1,464✔
378
        require.NoError(t, err)
1,464✔
379

1,464✔
380
        db.dbPath = dbPath
1,464✔
381

1,464✔
382
        t.Cleanup(func() {
1,464✔
383
                require.NoError(t, db.Close())
1,464✔
384
        })
2,928✔
385

1,464✔
386
        return db
1,464✔
387
}
388

1,464✔
389
// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
390
// Any necessary schemas migrations due to updates will take place as necessary.
391
func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
392
        error) {
393

394
        opts := DefaultOptions()
1,729✔
395
        for _, modifier := range modifiers {
1,729✔
396
                modifier(&opts)
1,729✔
397
        }
1,900✔
398

171✔
399
        if !opts.NoMigration {
171✔
400
                if err := initChannelDB(backend); err != nil {
401
                        return nil, err
3,458✔
402
                }
1,730✔
403
        }
1✔
404

1✔
405
        chanDB := &DB{
406
                Backend: backend,
407
                channelStateDB: &ChannelStateDB{
1,728✔
408
                        linkNodeDB: &LinkNodeDB{
1,728✔
409
                                backend: backend,
1,728✔
410
                        },
1,728✔
411
                        backend: backend,
1,728✔
412
                },
1,728✔
413
                clock:                     opts.clock,
1,728✔
414
                dryRun:                    opts.dryRun,
1,728✔
415
                storeFinalHtlcResolutions: opts.storeFinalHtlcResolutions,
1,728✔
416
                noRevLogAmtData:           opts.NoRevLogAmtData,
1,728✔
417
        }
1,728✔
418

1,728✔
419
        // Set the parent pointer (only used in tests).
1,728✔
420
        chanDB.channelStateDB.parent = chanDB
1,728✔
421

1,728✔
422
        // Synchronize the version of database and apply migrations if needed.
1,728✔
423
        if !opts.NoMigration {
1,728✔
424
                if err := chanDB.syncVersions(dbVersions); err != nil {
1,728✔
425
                        backend.Close()
3,456✔
426
                        return nil, err
1,729✔
427
                }
1✔
428

1✔
429
                // Grab the optional migration config.
1✔
430
                omc := opts.OptionalMiragtionConfig
431
                if err := chanDB.applyOptionalVersions(omc); err != nil {
432
                        backend.Close()
1,727✔
433
                        return nil, err
1,727✔
434
                }
×
UNCOV
435
        }
×
UNCOV
436

×
437
        return chanDB, nil
438
}
439

1,727✔
440
// Path returns the file path to the channel database.
441
func (d *DB) Path() string {
442
        return d.dbPath
443
}
197✔
444

197✔
445
var dbTopLevelBuckets = [][]byte{
197✔
446
        openChannelBucket,
447
        closedChannelBucket,
448
        forwardingLogBucket,
449
        fwdPackagesKey,
450
        invoiceBucket,
451
        payAddrIndexBucket,
452
        setIDIndexBucket,
453
        paymentsIndexBucket,
454
        peersBucket,
455
        nodeInfoBucket,
456
        metaBucket,
457
        closeSummaryBucket,
458
        outpointBucket,
459
        chanIDBucket,
460
        historicalChannelBucket,
461
}
462

463
// Wipe completely deletes all saved state within all used buckets within the
464
// database. The deletion is done in a single transaction, therefore this
465
// operation is fully atomic.
466
func (d *DB) Wipe() error {
467
        err := kvdb.Update(d, func(tx kvdb.RwTx) error {
177✔
468
                for _, tlb := range dbTopLevelBuckets {
354✔
469
                        err := tx.DeleteTopLevelBucket(tlb)
2,655✔
470
                        if err != nil && err != kvdb.ErrBucketNotFound {
2,478✔
471
                                return err
2,478✔
472
                        }
×
UNCOV
473
                }
×
474
                return nil
475
        }, func() {})
177✔
476
        if err != nil {
177✔
477
                return err
177✔
478
        }
×
UNCOV
479

×
480
        return initChannelDB(d.Backend)
481
}
177✔
482

483
// initChannelDB creates and initializes a fresh version of channeldb. In the
484
// case that the target path has not yet been created or doesn't yet exist, then
485
// the path is created. Additionally, all required top-level buckets used within
486
// the database are created.
487
func initChannelDB(db kvdb.Backend) error {
488
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
1,906✔
489
                // Check if DB was marked as inactive with a tomb stone.
3,812✔
490
                if err := EnsureNoTombstone(tx); err != nil {
1,906✔
491
                        return err
1,907✔
492
                }
1✔
493

1✔
494
                for _, tlb := range dbTopLevelBuckets {
495
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
28,536✔
496
                                return err
26,631✔
497
                        }
×
UNCOV
498
                }
×
499

500
                meta := &Meta{}
501
                // Check if DB is already initialized.
1,905✔
502
                err := FetchMeta(meta, tx)
1,905✔
503
                if err == nil {
1,905✔
504
                        return nil
3,810✔
505
                }
1,905✔
506

1,905✔
507
                meta.DbVersionNumber = getLatestDBVersion(dbVersions)
508
                return putMeta(meta, tx)
×
UNCOV
509
        }, func() {})
×
510
        if err != nil {
1,906✔
511
                return fmt.Errorf("unable to create new channeldb: %w", err)
1,907✔
512
        }
1✔
513

1✔
514
        return nil
515
}
1,905✔
516

517
// fileExists returns true if the file exists, and false otherwise.
518
func fileExists(path string) bool {
519
        if _, err := os.Stat(path); err != nil {
1✔
520
                if os.IsNotExist(err) {
1✔
521
                        return false
×
522
                }
×
UNCOV
523
        }
×
524

525
        return true
526
}
1✔
527

528
// ChannelStateDB is a database that keeps track of all channel state.
529
type ChannelStateDB struct {
530
        // linkNodeDB separates all DB operations on LinkNodes.
531
        linkNodeDB *LinkNodeDB
532

533
        // parent holds a pointer to the "main" channeldb.DB object. This is
534
        // only used for testing and should never be used in production code.
535
        // For testing use the ChannelStateDB.GetParentDB() function to retrieve
536
        // this pointer.
537
        parent *DB
538

539
        // backend points to the actual backend holding the channel state
540
        // database. This may be a real backend or a cache middleware.
541
        backend kvdb.Backend
542
}
543

544
// GetParentDB returns the "main" channeldb.DB object that is the owner of this
545
// ChannelStateDB instance. Use this function only in tests where passing around
546
// pointers makes testing less readable. Never to be used in production code!
547
func (c *ChannelStateDB) GetParentDB() *DB {
548
        return c.parent
517✔
549
}
517✔
550

517✔
551
// LinkNodeDB returns the current instance of the link node database.
552
func (c *ChannelStateDB) LinkNodeDB() *LinkNodeDB {
553
        return c.linkNodeDB
3✔
554
}
3✔
555

3✔
556
// FetchOpenChannels starts a new database transaction and returns all stored
557
// currently active/open channels associated with the target nodeID. In the case
558
// that no active channels are known to have been created with this node, then a
559
// zero-length slice is returned.
560
func (c *ChannelStateDB) FetchOpenChannels(nodeID *btcec.PublicKey) (
561
        []*OpenChannel, error) {
562

256✔
563
        var channels []*OpenChannel
256✔
564
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
256✔
565
                var err error
512✔
566
                channels, err = c.fetchOpenChannels(tx, nodeID)
256✔
567
                return err
256✔
568
        }, func() {
256✔
569
                channels = nil
512✔
570
        })
256✔
571

256✔
572
        return channels, err
573
}
256✔
574

575
// fetchOpenChannels uses and existing database transaction and returns all
576
// stored currently active/open channels associated with the target nodeID. In
577
// the case that no active channels are known to have been created with this
578
// node, then a zero-length slice is returned.
579
func (c *ChannelStateDB) fetchOpenChannels(tx kvdb.RTx,
580
        nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
581

263✔
582
        // Get the bucket dedicated to storing the metadata for open channels.
263✔
583
        openChanBucket := tx.ReadBucket(openChannelBucket)
263✔
584
        if openChanBucket == nil {
263✔
585
                return nil, nil
263✔
586
        }
×
UNCOV
587

×
588
        // Within this top level bucket, fetch the bucket dedicated to storing
589
        // open channel data specific to the remote node.
590
        pub := nodeID.SerializeCompressed()
591
        nodeChanBucket := openChanBucket.NestedReadBucket(pub)
263✔
592
        if nodeChanBucket == nil {
263✔
593
                return nil, nil
317✔
594
        }
54✔
595

54✔
596
        // Next, we'll need to go down an additional layer in order to retrieve
597
        // the channels for each chain the node knows of.
598
        var channels []*OpenChannel
599
        err := nodeChanBucket.ForEach(func(chainHash, v []byte) error {
212✔
600
                // If there's a value, it's not a bucket so ignore it.
424✔
601
                if v != nil {
212✔
602
                        return nil
212✔
603
                }
×
UNCOV
604

×
605
                // If we've found a valid chainhash bucket, then we'll retrieve
606
                // that so we can extract all the channels.
607
                chainBucket := nodeChanBucket.NestedReadBucket(chainHash)
608
                if chainBucket == nil {
212✔
609
                        return fmt.Errorf("unable to read bucket for chain=%x",
212✔
610
                                chainHash[:])
×
611
                }
×
UNCOV
612

×
613
                // Finally, we both of the necessary buckets retrieved, fetch
614
                // all the active channels related to this node.
615
                nodeChannels, err := c.fetchNodeChannels(chainBucket)
616
                if err != nil {
212✔
617
                        return fmt.Errorf("unable to read channel for "+
212✔
618
                                "chain_hash=%x, node_key=%x: %v",
×
619
                                chainHash[:], pub, err)
×
620
                }
×
UNCOV
621

×
622
                channels = append(channels, nodeChannels...)
623
                return nil
212✔
624
        })
212✔
625

626
        return channels, err
627
}
212✔
628

629
// fetchNodeChannels retrieves all active channels from the target chainBucket
630
// which is under a node's dedicated channel bucket. This function is typically
631
// used to fetch all the active channels related to a particular node.
632
func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) (
633
        []*OpenChannel, error) {
634

718✔
635
        var channels []*OpenChannel
718✔
636

718✔
637
        // A node may have channels on several chains, so for each known chain,
718✔
638
        // we'll extract all the channels.
718✔
639
        err := chainBucket.ForEach(func(chanPoint, v []byte) error {
718✔
640
                // If there's a value, it's not a bucket so ignore it.
1,494✔
641
                if v != nil {
776✔
642
                        return nil
776✔
643
                }
×
UNCOV
644

×
645
                // Once we've found a valid channel bucket, we'll extract it
646
                // from the node's chain bucket.
647
                chanBucket := chainBucket.NestedReadBucket(chanPoint)
648

776✔
649
                var outPoint wire.OutPoint
776✔
650
                err := graphdb.ReadOutpoint(
776✔
651
                        bytes.NewReader(chanPoint), &outPoint,
776✔
652
                )
776✔
653
                if err != nil {
776✔
654
                        return err
776✔
655
                }
×
UNCOV
656
                oChannel, err := fetchOpenChannel(chanBucket, &outPoint)
×
657
                if err != nil {
776✔
658
                        return fmt.Errorf("unable to read channel data for "+
776✔
659
                                "chan_point=%v: %w", outPoint, err)
×
660
                }
×
UNCOV
661
                oChannel.Db = c
×
662

776✔
663
                channels = append(channels, oChannel)
776✔
664

776✔
665
                return nil
776✔
666
        })
776✔
667
        if err != nil {
668
                return nil, err
718✔
669
        }
×
UNCOV
670

×
671
        return channels, nil
672
}
718✔
673

674
// FetchChannel attempts to locate a channel specified by the passed channel
675
// point. If the channel cannot be found, then an error will be returned.
676
func (c *ChannelStateDB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel,
677
        error) {
678

13✔
679
        var targetChanPoint bytes.Buffer
13✔
680
        err := graphdb.WriteOutpoint(&targetChanPoint, &chanPoint)
13✔
681
        if err != nil {
13✔
682
                return nil, err
13✔
683
        }
×
UNCOV
684

×
685
        targetChanPointBytes := targetChanPoint.Bytes()
686
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
13✔
687
                error) {
13✔
688

26✔
689
                return targetChanPointBytes, &chanPoint, nil
13✔
690
        }
13✔
691

13✔
692
        return c.channelScanner(nil, selector)
693
}
13✔
694

695
// FetchChannelByID attempts to locate a channel specified by the passed channel
696
// ID. If the channel cannot be found, then an error will be returned.
697
// Optionally an existing db tx can be supplied.
698
func (c *ChannelStateDB) FetchChannelByID(tx kvdb.RTx, id lnwire.ChannelID) (
699
        *OpenChannel, error) {
700

5✔
701
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
5✔
702
                error) {
5✔
703

10✔
704
                var (
5✔
705
                        targetChanPointBytes []byte
5✔
706
                        targetChanPoint      *wire.OutPoint
5✔
707

5✔
708
                        // errChanFound is used to signal that the channel has
5✔
709
                        // been found so that iteration through the DB buckets
5✔
710
                        // can stop.
5✔
711
                        errChanFound = errors.New("channel found")
5✔
712
                )
5✔
713
                err := chainBkt.ForEach(func(k, _ []byte) error {
5✔
714
                        var outPoint wire.OutPoint
10✔
715
                        err := graphdb.ReadOutpoint(
5✔
716
                                bytes.NewReader(k), &outPoint,
5✔
717
                        )
5✔
718
                        if err != nil {
5✔
719
                                return err
5✔
720
                        }
×
UNCOV
721

×
722
                        chanID := lnwire.NewChanIDFromOutPoint(outPoint)
723
                        if chanID != id {
5✔
724
                                return nil
6✔
725
                        }
1✔
726

1✔
727
                        targetChanPoint = &outPoint
728
                        targetChanPointBytes = k
4✔
729

4✔
730
                        return errChanFound
4✔
731
                })
4✔
732
                if err != nil && !errors.Is(err, errChanFound) {
733
                        return nil, nil, err
5✔
734
                }
×
UNCOV
735
                if targetChanPoint == nil {
×
736
                        return nil, nil, ErrChannelNotFound
6✔
737
                }
1✔
738

1✔
739
                return targetChanPointBytes, targetChanPoint, nil
740
        }
4✔
741

742
        return c.channelScanner(tx, selector)
743
}
5✔
744

745
// ChanCount is used by the server in determining access control.
746
type ChanCount struct {
747
        HasOpenOrClosedChan bool
748
        PendingOpenCount    uint64
749
}
750

751
// FetchPermAndTempPeers returns a map where the key is the remote node's
752
// public key and the value is a struct that has a tally of the pending-open
753
// channels and whether the peer has an open or closed channel with us.
754
func (c *ChannelStateDB) FetchPermAndTempPeers(
755
        chainHash []byte) (map[string]ChanCount, error) {
756

4✔
757
        peerChanInfo := make(map[string]ChanCount)
4✔
758

4✔
759
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
4✔
760
                openChanBucket := tx.ReadBucket(openChannelBucket)
8✔
761
                if openChanBucket == nil {
4✔
762
                        return ErrNoChanDBExists
4✔
763
                }
×
UNCOV
764

×
765
                openChanErr := openChanBucket.ForEach(func(nodePub,
766
                        v []byte) error {
4✔
767

10✔
768
                        // If there is a value, this is not a bucket.
6✔
769
                        if v != nil {
6✔
770
                                return nil
6✔
771
                        }
×
UNCOV
772

×
773
                        nodeChanBucket := openChanBucket.NestedReadBucket(
774
                                nodePub,
6✔
775
                        )
6✔
776
                        if nodeChanBucket == nil {
6✔
777
                                return nil
6✔
778
                        }
×
UNCOV
779

×
780
                        chainBucket := nodeChanBucket.NestedReadBucket(
781
                                chainHash,
6✔
782
                        )
6✔
783
                        if chainBucket == nil {
6✔
784
                                return fmt.Errorf("no chain bucket exists")
6✔
785
                        }
×
UNCOV
786

×
787
                        var isPermPeer bool
788
                        var pendingOpenCount uint64
6✔
789

6✔
790
                        internalErr := chainBucket.ForEach(func(chanPoint,
6✔
791
                                val []byte) error {
6✔
792

11✔
793
                                // If there is a value, this is not a bucket.
5✔
794
                                if val != nil {
5✔
795
                                        return nil
5✔
796
                                }
×
UNCOV
797

×
798
                                chanBucket := chainBucket.NestedReadBucket(
799
                                        chanPoint,
5✔
800
                                )
5✔
801
                                if chanBucket == nil {
5✔
802
                                        return nil
5✔
803
                                }
×
UNCOV
804

×
805
                                var op wire.OutPoint
806
                                readErr := graphdb.ReadOutpoint(
5✔
807
                                        bytes.NewReader(chanPoint), &op,
5✔
808
                                )
5✔
809
                                if readErr != nil {
5✔
810
                                        return readErr
5✔
811
                                }
×
UNCOV
812

×
813
                                // We need to go through each channel and look
814
                                // at the IsPending status.
815
                                openChan, err := fetchOpenChannel(
816
                                        chanBucket, &op,
5✔
817
                                )
5✔
818
                                if err != nil {
5✔
819
                                        return err
5✔
820
                                }
×
UNCOV
821

×
822
                                if openChan.IsPending {
823
                                        // Add to the pending-open count since
9✔
824
                                        // this is a temp peer.
4✔
825
                                        pendingOpenCount++
4✔
826
                                        return nil
4✔
827
                                }
4✔
828

4✔
829
                                // Since IsPending is false, this is a perm
830
                                // peer.
831
                                isPermPeer = true
832

4✔
833
                                return nil
4✔
834
                        })
4✔
835
                        if internalErr != nil {
836
                                return internalErr
6✔
837
                        }
×
UNCOV
838

×
839
                        peerCount := ChanCount{
840
                                HasOpenOrClosedChan: isPermPeer,
6✔
841
                                PendingOpenCount:    pendingOpenCount,
6✔
842
                        }
6✔
843
                        peerChanInfo[string(nodePub)] = peerCount
6✔
844

6✔
845
                        return nil
6✔
846
                })
6✔
847
                if openChanErr != nil {
848
                        return openChanErr
4✔
849
                }
×
UNCOV
850

×
851
                // Now check the closed channel bucket.
852
                historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
853
                if historicalChanBucket == nil {
4✔
854
                        return ErrNoHistoricalBucket
4✔
855
                }
×
UNCOV
856

×
857
                historicalErr := historicalChanBucket.ForEach(func(chanPoint,
858
                        v []byte) error {
4✔
859
                        // Parse each nested bucket and the chanInfoKey to get
8✔
860
                        // the IsPending bool. This determines whether the
4✔
861
                        // peer is protected or not.
4✔
862
                        if v != nil {
4✔
863
                                // This is not a bucket. This is currently not
4✔
864
                                // possible.
×
865
                                return nil
×
866
                        }
×
UNCOV
867

×
868
                        chanBucket := historicalChanBucket.NestedReadBucket(
869
                                chanPoint,
4✔
870
                        )
4✔
871
                        if chanBucket == nil {
4✔
872
                                // This is not possible.
4✔
873
                                return fmt.Errorf("no historical channel " +
×
874
                                        "bucket exists")
×
875
                        }
×
UNCOV
876

×
877
                        var op wire.OutPoint
878
                        readErr := graphdb.ReadOutpoint(
4✔
879
                                bytes.NewReader(chanPoint), &op,
4✔
880
                        )
4✔
881
                        if readErr != nil {
4✔
882
                                return readErr
4✔
883
                        }
×
UNCOV
884

×
885
                        // This channel is closed, but the structure of the
886
                        // historical bucket is the same. This is by design,
887
                        // which means we can call fetchOpenChannel.
888
                        channel, fetchErr := fetchOpenChannel(chanBucket, &op)
889
                        if fetchErr != nil {
4✔
890
                                return fetchErr
4✔
891
                        }
×
UNCOV
892

×
893
                        // Only include this peer in the protected class if
894
                        // the closing transaction confirmed. Note that
895
                        // CloseChannel can be called in the funding manager
896
                        // while IsPending is true which is why we need this
897
                        // special-casing to not count premature funding
898
                        // manager calls to CloseChannel.
899
                        if !channel.IsPending {
900
                                // Fetch the public key of the remote node. We
8✔
901
                                // need to use the string-ified serialized,
4✔
902
                                // compressed bytes as the key.
4✔
903
                                remotePub := channel.IdentityPub
4✔
904
                                remoteSer := remotePub.SerializeCompressed()
4✔
905
                                remoteKey := string(remoteSer)
4✔
906

4✔
907
                                count, exists := peerChanInfo[remoteKey]
4✔
908
                                if exists {
4✔
909
                                        count.HasOpenOrClosedChan = true
8✔
910
                                        peerChanInfo[remoteKey] = count
4✔
911
                                } else {
4✔
912
                                        peerCount := ChanCount{
4✔
913
                                                HasOpenOrClosedChan: true,
×
914
                                        }
×
915
                                        peerChanInfo[remoteKey] = peerCount
×
916
                                }
×
UNCOV
917
                        }
×
918

919
                        return nil
920
                })
4✔
921
                if historicalErr != nil {
922
                        return historicalErr
4✔
923
                }
×
UNCOV
924

×
925
                return nil
926
        }, func() {
4✔
927
                clear(peerChanInfo)
4✔
928
        })
4✔
929

4✔
930
        return peerChanInfo, err
931
}
4✔
932

933
// channelSelector describes a function that takes a chain-hash bucket from
934
// within the open-channel DB and returns the wanted channel point bytes, and
935
// channel point. It must return the ErrChannelNotFound error if the wanted
936
// channel is not in the given bucket.
937
type channelSelector func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
938
        error)
939

940
// channelScanner will traverse the DB to each chain-hash bucket of each node
941
// pub-key bucket in the open-channel-bucket. The chanSelector will then be used
942
// to fetch the wanted channel outpoint from the chain bucket.
943
func (c *ChannelStateDB) channelScanner(tx kvdb.RTx,
944
        chanSelect channelSelector) (*OpenChannel, error) {
945

15✔
946
        var (
15✔
947
                targetChan *OpenChannel
15✔
948

15✔
949
                // errChanFound is used to signal that the channel has been
15✔
950
                // found so that iteration through the DB buckets can stop.
15✔
951
                errChanFound = errors.New("channel found")
15✔
952
        )
15✔
953

15✔
954
        // chanScan will traverse the following bucket structure:
15✔
955
        //  * nodePub => chainHash => chanPoint
15✔
956
        //
15✔
957
        // At each level we go one further, ensuring that we're traversing the
15✔
958
        // proper key (that's actually a bucket). By only reading the bucket
15✔
959
        // structure and skipping fully decoding each channel, we save a good
15✔
960
        // bit of CPU as we don't need to do things like decompress public
15✔
961
        // keys.
15✔
962
        chanScan := func(tx kvdb.RTx) error {
15✔
963
                // Get the bucket dedicated to storing the metadata for open
30✔
964
                // channels.
15✔
965
                openChanBucket := tx.ReadBucket(openChannelBucket)
15✔
966
                if openChanBucket == nil {
15✔
967
                        return ErrNoActiveChannels
15✔
968
                }
×
UNCOV
969

×
970
                // Within the node channel bucket, are the set of node pubkeys
971
                // we have channels with, we don't know the entire set, so we'll
972
                // check them all.
973
                return openChanBucket.ForEach(func(nodePub, v []byte) error {
974
                        // Ensure that this is a key the same size as a pubkey,
30✔
975
                        // and also that it leads directly to a bucket.
15✔
976
                        if len(nodePub) != 33 || v != nil {
15✔
977
                                return nil
15✔
978
                        }
×
UNCOV
979

×
980
                        nodeChanBucket := openChanBucket.NestedReadBucket(
981
                                nodePub,
15✔
982
                        )
15✔
983
                        if nodeChanBucket == nil {
15✔
984
                                return nil
15✔
985
                        }
×
UNCOV
986

×
987
                        // The next layer down is all the chains that this node
988
                        // has channels on with us.
989
                        return nodeChanBucket.ForEach(func(chainHash,
990
                                v []byte) error {
15✔
991

30✔
992
                                // If there's a value, it's not a bucket so
15✔
993
                                // ignore it.
15✔
994
                                if v != nil {
15✔
995
                                        return nil
15✔
996
                                }
×
UNCOV
997

×
998
                                chainBucket := nodeChanBucket.NestedReadBucket(
999
                                        chainHash,
15✔
1000
                                )
15✔
1001
                                if chainBucket == nil {
15✔
1002
                                        return fmt.Errorf("unable to read "+
15✔
1003
                                                "bucket for chain=%x",
×
1004
                                                chainHash)
×
1005
                                }
×
UNCOV
1006

×
1007
                                // Finally, we reach the leaf bucket that stores
1008
                                // all the chanPoints for this node.
1009
                                targetChanBytes, chanPoint, err := chanSelect(
1010
                                        chainBucket,
15✔
1011
                                )
15✔
1012
                                if errors.Is(err, ErrChannelNotFound) {
15✔
1013
                                        return nil
16✔
1014
                                } else if err != nil {
1✔
1015
                                        return err
15✔
1016
                                }
×
UNCOV
1017

×
1018
                                chanBucket := chainBucket.NestedReadBucket(
1019
                                        targetChanBytes,
14✔
1020
                                )
14✔
1021
                                if chanBucket == nil {
14✔
1022
                                        return nil
21✔
1023
                                }
7✔
1024

7✔
1025
                                channel, err := fetchOpenChannel(
1026
                                        chanBucket, chanPoint,
10✔
1027
                                )
10✔
1028
                                if err != nil {
10✔
1029
                                        return err
10✔
1030
                                }
×
UNCOV
1031

×
1032
                                targetChan = channel
1033
                                targetChan.Db = c
10✔
1034

10✔
1035
                                return errChanFound
10✔
1036
                        })
10✔
1037
                })
1038
        }
1039

1040
        var err error
1041
        if tx == nil {
15✔
1042
                err = kvdb.View(c.backend, chanScan, func() {})
30✔
1043
        } else {
30✔
1044
                err = chanScan(tx)
×
1045
        }
×
UNCOV
1046
        if err != nil && !errors.Is(err, errChanFound) {
×
1047
                return nil, err
15✔
1048
        }
×
UNCOV
1049

×
1050
        if targetChan != nil {
1051
                return targetChan, nil
25✔
1052
        }
10✔
1053

10✔
1054
        // If we can't find the channel, then we return with an error, as we
1055
        // have nothing to back up.
1056
        return nil, ErrChannelNotFound
1057
}
8✔
1058

1059
// FetchAllChannels attempts to retrieve all open channels currently stored
1060
// within the database, including pending open, fully open and channels waiting
1061
// for a closing transaction to confirm.
1062
func (c *ChannelStateDB) FetchAllChannels() ([]*OpenChannel, error) {
1063
        return fetchChannels(c)
566✔
1064
}
566✔
1065

566✔
1066
// FetchAllOpenChannels will return all channels that have the funding
1067
// transaction confirmed, and is not waiting for a closing transaction to be
1068
// confirmed.
1069
func (c *ChannelStateDB) FetchAllOpenChannels() ([]*OpenChannel, error) {
1070
        return fetchChannels(
408✔
1071
                c,
408✔
1072
                pendingChannelFilter(false),
408✔
1073
                waitingCloseFilter(false),
408✔
1074
        )
408✔
1075
}
408✔
1076

408✔
1077
// FetchPendingChannels will return channels that have completed the process of
1078
// generating and broadcasting funding transactions, but whose funding
1079
// transactions have yet to be confirmed on the blockchain.
1080
func (c *ChannelStateDB) FetchPendingChannels() ([]*OpenChannel, error) {
1081
        return fetchChannels(c,
81✔
1082
                pendingChannelFilter(true),
81✔
1083
                waitingCloseFilter(false),
81✔
1084
        )
81✔
1085
}
81✔
1086

81✔
1087
// FetchWaitingCloseChannels will return all channels that have been opened,
1088
// but are now waiting for a closing transaction to be confirmed.
1089
//
1090
// NOTE: This includes channels that are also pending to be opened.
1091
func (c *ChannelStateDB) FetchWaitingCloseChannels() ([]*OpenChannel, error) {
1092
        return fetchChannels(
4✔
1093
                c, waitingCloseFilter(true),
4✔
1094
        )
4✔
1095
}
4✔
1096

4✔
1097
// fetchChannelsFilter applies a filter to channels retrieved in fetchchannels.
1098
// A set of filters can be combined to filter across multiple dimensions.
1099
type fetchChannelsFilter func(channel *OpenChannel) bool
1100

1101
// pendingChannelFilter returns a filter based on whether channels are pending
1102
// (ie, their funding transaction still needs to confirm). If pending is false,
1103
// channels with confirmed funding transactions are returned.
1104
func pendingChannelFilter(pending bool) fetchChannelsFilter {
1105
        return func(channel *OpenChannel) bool {
495✔
1106
                return channel.IsPending == pending
720✔
1107
        }
225✔
1108
}
225✔
1109

1110
// waitingCloseFilter returns a filter which filters channels based on whether
1111
// they are awaiting the confirmation of their closing transaction. If waiting
1112
// close is true, channels that have had their closing tx broadcast are
1113
// included. If it is false, channels that are not awaiting confirmation of
1114
// their close transaction are returned.
1115
func waitingCloseFilter(waitingClose bool) fetchChannelsFilter {
1116
        return func(channel *OpenChannel) bool {
493✔
1117
                // If the channel is in any other state than Default,
704✔
1118
                // then it means it is waiting to be closed.
211✔
1119
                channelWaitingClose :=
211✔
1120
                        channel.ChanStatus() != ChanStatusDefault
211✔
1121

211✔
1122
                // Include the channel if it matches the value for
211✔
1123
                // waiting close that we are filtering on.
211✔
1124
                return channelWaitingClose == waitingClose
211✔
1125
        }
211✔
1126
}
211✔
1127

1128
// fetchChannels attempts to retrieve channels currently stored in the
1129
// database. It takes a set of filters which are applied to each channel to
1130
// obtain a set of channels with the desired set of properties. Only channels
1131
// which have a true value returned for *all* of the filters will be returned.
1132
// If no filters are provided, every channel in the open channels bucket will
1133
// be returned.
1134
func fetchChannels(c *ChannelStateDB, filters ...fetchChannelsFilter) (
1135
        []*OpenChannel, error) {
1136

1,062✔
1137
        var channels []*OpenChannel
1,062✔
1138

1,062✔
1139
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
1,062✔
1140
                // Get the bucket dedicated to storing the metadata for open
2,124✔
1141
                // channels.
1,062✔
1142
                openChanBucket := tx.ReadBucket(openChannelBucket)
1,062✔
1143
                if openChanBucket == nil {
1,062✔
1144
                        return ErrNoActiveChannels
1,062✔
1145
                }
×
UNCOV
1146

×
1147
                // Next, fetch the bucket dedicated to storing metadata related
1148
                // to all nodes. All keys within this bucket are the serialized
1149
                // public keys of all our direct counterparties.
1150
                nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
1151
                if nodeMetaBucket == nil {
1,062✔
1152
                        return fmt.Errorf("node bucket not created")
1,062✔
1153
                }
×
UNCOV
1154

×
1155
                // Finally for each node public key in the bucket, fetch all
1156
                // the channels related to this particular node.
1157
                return nodeMetaBucket.ForEach(func(k, v []byte) error {
1158
                        nodeChanBucket := openChanBucket.NestedReadBucket(k)
1,571✔
1159
                        if nodeChanBucket == nil {
509✔
1160
                                return nil
509✔
1161
                        }
×
UNCOV
1162

×
1163
                        return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
1164
                                // If there's a value, it's not a bucket so
1,018✔
1165
                                // ignore it.
509✔
1166
                                if v != nil {
509✔
1167
                                        return nil
509✔
1168
                                }
×
UNCOV
1169

×
1170
                                // If we've found a valid chainhash bucket,
1171
                                // then we'll retrieve that so we can extract
1172
                                // all the channels.
1173
                                chainBucket := nodeChanBucket.NestedReadBucket(
1174
                                        chainHash,
509✔
1175
                                )
509✔
1176
                                if chainBucket == nil {
509✔
1177
                                        return fmt.Errorf("unable to read "+
509✔
1178
                                                "bucket for chain=%x", chainHash[:])
×
1179
                                }
×
UNCOV
1180

×
1181
                                nodeChans, err := c.fetchNodeChannels(chainBucket)
1182
                                if err != nil {
509✔
1183
                                        return fmt.Errorf("unable to read "+
509✔
1184
                                                "channel for chain_hash=%x, "+
×
1185
                                                "node_key=%x: %v", chainHash[:], k, err)
×
1186
                                }
×
UNCOV
1187
                                for _, channel := range nodeChans {
×
1188
                                        // includeChannel indicates whether the channel
1,055✔
1189
                                        // meets the criteria specified by our filters.
546✔
1190
                                        includeChannel := true
546✔
1191

546✔
1192
                                        // Run through each filter and check whether the
546✔
1193
                                        // channel should be included.
546✔
1194
                                        for _, f := range filters {
546✔
1195
                                                // If the channel fails the filter, set
979✔
1196
                                                // includeChannel to false and don't bother
433✔
1197
                                                // checking the remaining filters.
433✔
1198
                                                if !f(channel) {
433✔
1199
                                                        includeChannel = false
461✔
1200
                                                        break
28✔
1201
                                                }
28✔
1202
                                        }
1203

1204
                                        // If the channel passed every filter, include it in
1205
                                        // our set of channels.
1206
                                        if includeChannel {
1207
                                                channels = append(channels, channel)
1,067✔
1208
                                        }
521✔
1209
                                }
521✔
1210
                                return nil
1211
                        })
509✔
1212

1213
                })
1214
        }, func() {
1215
                channels = nil
1,062✔
1216
        })
1,062✔
1217
        if err != nil {
1,062✔
1218
                return nil, err
1,062✔
1219
        }
×
UNCOV
1220

×
1221
        return channels, nil
1222
}
1,062✔
1223

1224
// FetchClosedChannels attempts to fetch all closed channels from the database.
1225
// The pendingOnly bool toggles if channels that aren't yet fully closed should
1226
// be returned in the response or not. When a channel was cooperatively closed,
1227
// it becomes fully closed after a single confirmation.  When a channel was
1228
// forcibly closed, it will become fully closed after _all_ the pending funds
1229
// (if any) have been swept.
1230
func (c *ChannelStateDB) FetchClosedChannels(pendingOnly bool) (
1231
        []*ChannelCloseSummary, error) {
1232

503✔
1233
        var chanSummaries []*ChannelCloseSummary
503✔
1234

503✔
1235
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
503✔
1236
                closeBucket := tx.ReadBucket(closedChannelBucket)
1,006✔
1237
                if closeBucket == nil {
503✔
1238
                        return ErrNoClosedChannels
503✔
1239
                }
×
UNCOV
1240

×
1241
                return closeBucket.ForEach(func(chanID []byte, summaryBytes []byte) error {
1242
                        summaryReader := bytes.NewReader(summaryBytes)
526✔
1243
                        chanSummary, err := deserializeCloseChannelSummary(summaryReader)
23✔
1244
                        if err != nil {
23✔
1245
                                return err
23✔
1246
                        }
×
UNCOV
1247

×
1248
                        // If the query specified to only include pending
1249
                        // channels, then we'll skip any channels which aren't
1250
                        // currently pending.
1251
                        if !chanSummary.IsPending && pendingOnly {
1252
                                return nil
27✔
1253
                        }
4✔
1254

4✔
1255
                        chanSummaries = append(chanSummaries, chanSummary)
1256
                        return nil
22✔
1257
                })
22✔
1258
        }, func() {
1259
                chanSummaries = nil
503✔
1260
        }); err != nil {
503✔
1261
                return nil, err
503✔
1262
        }
×
UNCOV
1263

×
1264
        return chanSummaries, nil
1265
}
503✔
1266

1267
// ErrClosedChannelNotFound signals that a closed channel could not be found in
1268
// the channeldb.
1269
var ErrClosedChannelNotFound = errors.New("unable to find closed channel summary")
1270

1271
// FetchClosedChannel queries for a channel close summary using the channel
1272
// point of the channel in question.
1273
func (c *ChannelStateDB) FetchClosedChannel(chanID *wire.OutPoint) (
1274
        *ChannelCloseSummary, error) {
1275

6✔
1276
        var chanSummary *ChannelCloseSummary
6✔
1277
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1278
                closeBucket := tx.ReadBucket(closedChannelBucket)
12✔
1279
                if closeBucket == nil {
6✔
1280
                        return ErrClosedChannelNotFound
6✔
1281
                }
×
UNCOV
1282

×
1283
                var b bytes.Buffer
1284
                var err error
6✔
1285
                if err = graphdb.WriteOutpoint(&b, chanID); err != nil {
6✔
1286
                        return err
6✔
1287
                }
×
UNCOV
1288

×
1289
                summaryBytes := closeBucket.Get(b.Bytes())
1290
                if summaryBytes == nil {
6✔
1291
                        return ErrClosedChannelNotFound
7✔
1292
                }
1✔
1293

1✔
1294
                summaryReader := bytes.NewReader(summaryBytes)
1295
                chanSummary, err = deserializeCloseChannelSummary(summaryReader)
5✔
1296

5✔
1297
                return err
5✔
1298
        }, func() {
5✔
1299
                chanSummary = nil
6✔
1300
        }); err != nil {
6✔
1301
                return nil, err
7✔
1302
        }
1✔
1303

1✔
1304
        return chanSummary, nil
1305
}
5✔
1306

1307
// FetchClosedChannelForID queries for a channel close summary using the
1308
// channel ID of the channel in question.
1309
func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) (
1310
        *ChannelCloseSummary, error) {
1311

105✔
1312
        var chanSummary *ChannelCloseSummary
105✔
1313
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
105✔
1314
                closeBucket := tx.ReadBucket(closedChannelBucket)
210✔
1315
                if closeBucket == nil {
105✔
1316
                        return ErrClosedChannelNotFound
105✔
1317
                }
×
UNCOV
1318

×
1319
                // The first 30 bytes of the channel ID and outpoint will be
1320
                // equal.
1321
                cursor := closeBucket.ReadCursor()
1322
                op, c := cursor.Seek(cid[:30])
105✔
1323

105✔
1324
                // We scan over all possible candidates for this channel ID.
105✔
1325
                for ; op != nil && bytes.Compare(cid[:30], op[:30]) <= 0; op, c = cursor.Next() {
105✔
1326
                        var outPoint wire.OutPoint
5,360✔
1327
                        err := graphdb.ReadOutpoint(
5,255✔
1328
                                bytes.NewReader(op), &outPoint,
5,255✔
1329
                        )
5,255✔
1330
                        if err != nil {
5,255✔
1331
                                return err
5,255✔
1332
                        }
×
UNCOV
1333

×
1334
                        // If the found outpoint does not correspond to this
1335
                        // channel ID, we continue.
1336
                        if !cid.IsChanPoint(&outPoint) {
1337
                                continue
10,406✔
1338
                        }
5,151✔
1339

1340
                        // Deserialize the close summary and return.
1341
                        r := bytes.NewReader(c)
1342
                        chanSummary, err = deserializeCloseChannelSummary(r)
104✔
1343
                        if err != nil {
104✔
1344
                                return err
104✔
1345
                        }
×
UNCOV
1346

×
1347
                        return nil
1348
                }
104✔
1349
                return ErrClosedChannelNotFound
1350
        }, func() {
4✔
1351
                chanSummary = nil
105✔
1352
        }); err != nil {
105✔
1353
                return nil, err
109✔
1354
        }
4✔
1355

4✔
1356
        return chanSummary, nil
1357
}
104✔
1358

1359
// MarkChanFullyClosed marks a channel as fully closed within the database. A
1360
// channel should be marked as fully closed if the channel was initially
1361
// cooperatively closed and it's reached a single confirmation, or after all
1362
// the pending funds in a channel that has been forcibly closed have been
1363
// swept.
1364
func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
1365
        var (
10✔
1366
                openChannels  []*OpenChannel
10✔
1367
                pruneLinkNode *btcec.PublicKey
10✔
1368
        )
10✔
1369
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
10✔
1370
                var b bytes.Buffer
20✔
1371
                if err := graphdb.WriteOutpoint(&b, chanPoint); err != nil {
10✔
1372
                        return err
10✔
1373
                }
×
UNCOV
1374

×
1375
                chanID := b.Bytes()
1376

10✔
1377
                closedChanBucket, err := tx.CreateTopLevelBucket(
10✔
1378
                        closedChannelBucket,
10✔
1379
                )
10✔
1380
                if err != nil {
10✔
1381
                        return err
10✔
1382
                }
×
UNCOV
1383

×
1384
                chanSummaryBytes := closedChanBucket.Get(chanID)
1385
                if chanSummaryBytes == nil {
10✔
1386
                        return fmt.Errorf("no closed channel for "+
10✔
1387
                                "chan_point=%v found", chanPoint)
×
1388
                }
×
UNCOV
1389

×
1390
                chanSummaryReader := bytes.NewReader(chanSummaryBytes)
1391
                chanSummary, err := deserializeCloseChannelSummary(
10✔
1392
                        chanSummaryReader,
10✔
1393
                )
10✔
1394
                if err != nil {
10✔
1395
                        return err
10✔
1396
                }
×
UNCOV
1397

×
1398
                chanSummary.IsPending = false
1399

10✔
1400
                var newSummary bytes.Buffer
10✔
1401
                err = serializeChannelCloseSummary(&newSummary, chanSummary)
10✔
1402
                if err != nil {
10✔
1403
                        return err
10✔
1404
                }
×
UNCOV
1405

×
1406
                err = closedChanBucket.Put(chanID, newSummary.Bytes())
1407
                if err != nil {
10✔
1408
                        return err
10✔
1409
                }
×
UNCOV
1410

×
1411
                // Now that the channel is closed, we'll check if we have any
1412
                // other open channels with this peer. If we don't we'll
1413
                // garbage collect it to ensure we don't establish persistent
1414
                // connections to peers without open channels.
1415
                pruneLinkNode = chanSummary.RemotePub
1416
                openChannels, err = c.fetchOpenChannels(
10✔
1417
                        tx, pruneLinkNode,
10✔
1418
                )
10✔
1419
                if err != nil {
10✔
1420
                        return fmt.Errorf("unable to fetch open channels for "+
10✔
1421
                                "peer %x: %v",
×
1422
                                pruneLinkNode.SerializeCompressed(), err)
×
1423
                }
×
UNCOV
1424

×
1425
                return nil
1426
        }, func() {
10✔
1427
                openChannels = nil
10✔
1428
                pruneLinkNode = nil
10✔
1429
        })
10✔
1430
        if err != nil {
10✔
1431
                return err
10✔
1432
        }
×
UNCOV
1433

×
1434
        // Decide whether we want to remove the link node, based upon the number
1435
        // of still open channels.
1436
        return c.pruneLinkNode(openChannels, pruneLinkNode)
1437
}
10✔
1438

1439
// pruneLinkNode determines whether we should garbage collect a link node from
1440
// the database due to no longer having any open channels with it. If there are
1441
// any left, then this acts as a no-op.
1442
func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
1443
        remotePub *btcec.PublicKey) error {
1444

10✔
1445
        if len(openChannels) > 0 {
10✔
1446
                return nil
13✔
1447
        }
3✔
1448

3✔
1449
        log.Infof("Pruning link node %x with zero open channels from database",
1450
                remotePub.SerializeCompressed())
10✔
1451

10✔
1452
        return c.linkNodeDB.DeleteLinkNode(remotePub)
10✔
1453
}
10✔
1454

1455
// PruneLinkNodes attempts to prune all link nodes found within the database
1456
// with whom we no longer have any open channels with.
1457
func (c *ChannelStateDB) PruneLinkNodes() error {
1458
        allLinkNodes, err := c.linkNodeDB.FetchAllLinkNodes()
3✔
1459
        if err != nil {
3✔
1460
                return err
3✔
1461
        }
×
UNCOV
1462

×
1463
        for _, linkNode := range allLinkNodes {
1464
                var (
6✔
1465
                        openChannels []*OpenChannel
3✔
1466
                        linkNode     = linkNode
3✔
1467
                )
3✔
1468
                err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
3✔
1469
                        var err error
6✔
1470
                        openChannels, err = c.fetchOpenChannels(
3✔
1471
                                tx, linkNode.IdentityPub,
3✔
1472
                        )
3✔
1473
                        return err
3✔
1474
                }, func() {
3✔
1475
                        openChannels = nil
6✔
1476
                })
3✔
1477
                if err != nil {
3✔
1478
                        return err
3✔
1479
                }
×
UNCOV
1480

×
1481
                err = c.pruneLinkNode(openChannels, linkNode.IdentityPub)
1482
                if err != nil {
3✔
1483
                        return err
3✔
1484
                }
×
UNCOV
1485
        }
×
1486

1487
        return nil
1488
}
3✔
1489

1490
// ChannelShell is a shell of a channel that is meant to be used for channel
1491
// recovery purposes. It contains a minimal OpenChannel instance along with
1492
// addresses for that target node.
1493
type ChannelShell struct {
1494
        // NodeAddrs the set of addresses that this node has known to be
1495
        // reachable at in the past.
1496
        NodeAddrs []net.Addr
1497

1498
        // Chan is a shell of an OpenChannel, it contains only the items
1499
        // required to restore the channel on disk.
1500
        Chan *OpenChannel
1501
}
1502

1503
// RestoreChannelShells is a method that allows the caller to reconstruct the
1504
// state of an OpenChannel from the ChannelShell. We'll attempt to write the
1505
// new channel to disk, create a LinkNode instance with the passed node
1506
// addresses, and finally create an edge within the graph for the channel as
1507
// well. This method is idempotent, so repeated calls with the same set of
1508
// channel shells won't modify the database after the initial call.
1509
func (c *ChannelStateDB) RestoreChannelShells(channelShells ...*ChannelShell) error {
1510
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
4✔
1511
                for _, channelShell := range channelShells {
8✔
1512
                        channel := channelShell.Chan
8✔
1513

4✔
1514
                        // When we make a channel, we mark that the channel has
4✔
1515
                        // been restored, this will signal to other sub-systems
4✔
1516
                        // to not attempt to use the channel as if it was a
4✔
1517
                        // regular one.
4✔
1518
                        channel.chanStatus |= ChanStatusRestored
4✔
1519

4✔
1520
                        // First, we'll attempt to create a new open channel
4✔
1521
                        // and link node for this channel. If the channel
4✔
1522
                        // already exists, then in order to ensure this method
4✔
1523
                        // is idempotent, we'll continue to the next step.
4✔
1524
                        channel.Db = c
4✔
1525
                        err := syncNewChannel(
4✔
1526
                                tx, channel, channelShell.NodeAddrs,
4✔
1527
                        )
4✔
1528
                        if err != nil {
4✔
1529
                                return err
7✔
1530
                        }
3✔
1531
                }
3✔
1532

1533
                return nil
1534
        }, func() {})
4✔
1535
        if err != nil {
4✔
1536
                return err
7✔
1537
        }
3✔
1538

3✔
1539
        return nil
1540
}
4✔
1541

1542
// AddrsForNode consults the channel database for all addresses known to the
1543
// passed node public key. The returned boolean indicates if the given node is
1544
// unknown to the channel DB or not.
1545
//
1546
// NOTE: this is part of the AddrSource interface.
1547
func (d *DB) AddrsForNode(_ context.Context, nodePub *btcec.PublicKey) (bool,
1548
        []net.Addr, error) {
1549

4✔
1550
        linkNode, err := d.channelStateDB.linkNodeDB.FetchLinkNode(nodePub)
4✔
1551
        // Only if the error is something other than ErrNodeNotFound do we
4✔
1552
        // return it.
4✔
1553
        switch {
4✔
1554
        case err != nil && !errors.Is(err, ErrNodeNotFound):
4✔
1555
                return false, nil, err
×
UNCOV
1556

×
1557
        case errors.Is(err, ErrNodeNotFound):
1558
                return false, nil, nil
×
UNCOV
1559
        }
×
1560

1561
        return true, linkNode.Addresses, nil
1562
}
4✔
1563

1564
// AbandonChannel attempts to remove the target channel from the open channel
1565
// database. If the channel was already removed (has a closed channel entry),
1566
// then we'll return a nil error. Otherwise, we'll insert a new close summary
1567
// into the database.
1568
func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint,
1569
        bestHeight uint32) error {
1570

7✔
1571
        // With the chanPoint constructed, we'll attempt to find the target
7✔
1572
        // channel in the database. If we can't find the channel, then we'll
7✔
1573
        // return the error back to the caller.
7✔
1574
        dbChan, err := c.FetchChannel(*chanPoint)
7✔
1575
        switch {
7✔
1576
        // If the channel wasn't found, then it's possible that it was already
7✔
1577
        // abandoned from the database.
1578
        case err == ErrChannelNotFound:
1579
                _, closedErr := c.FetchClosedChannel(chanPoint)
5✔
1580
                if closedErr != nil {
5✔
1581
                        return closedErr
6✔
1582
                }
1✔
1583

1✔
1584
                // If the channel was already closed, then we don't return an
1585
                // error as we'd like this step to be repeatable.
1586
                return nil
1587
        case err != nil:
4✔
1588
                return err
×
UNCOV
1589
        }
×
1590

1591
        // Now that we've found the channel, we'll populate a close summary for
1592
        // the channel, so we can store as much information for this abounded
1593
        // channel as possible. We also ensure that we set Pending to false, to
1594
        // indicate that this channel has been "fully" closed.
1595
        summary := &ChannelCloseSummary{
1596
                CloseType:               Abandoned,
5✔
1597
                ChanPoint:               *chanPoint,
5✔
1598
                ChainHash:               dbChan.ChainHash,
5✔
1599
                CloseHeight:             bestHeight,
5✔
1600
                RemotePub:               dbChan.IdentityPub,
5✔
1601
                Capacity:                dbChan.Capacity,
5✔
1602
                SettledBalance:          dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
5✔
1603
                ShortChanID:             dbChan.ShortChanID(),
5✔
1604
                RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
5✔
1605
                RemoteNextRevocation:    dbChan.RemoteNextRevocation,
5✔
1606
                LocalChanConfig:         dbChan.LocalChanCfg,
5✔
1607
        }
5✔
1608

5✔
1609
        // Finally, we'll close the channel in the DB, and return back to the
5✔
1610
        // caller. We set ourselves as the close initiator because we abandoned
5✔
1611
        // the channel.
5✔
1612
        return dbChan.CloseChannel(summary, ChanStatusLocalCloseInitiator)
5✔
1613
}
5✔
1614

1615
// SaveChannelOpeningState saves the serialized channel state for the provided
1616
// chanPoint to the channelOpeningStateBucket.
1617
func (c *ChannelStateDB) SaveChannelOpeningState(outPoint,
1618
        serializedState []byte) error {
1619

95✔
1620
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
95✔
1621
                bucket, err := tx.CreateTopLevelBucket(channelOpeningStateBucket)
190✔
1622
                if err != nil {
95✔
1623
                        return err
95✔
1624
                }
×
UNCOV
1625

×
1626
                return bucket.Put(outPoint, serializedState)
1627
        }, func() {})
95✔
1628
}
95✔
1629

1630
// GetChannelOpeningState fetches the serialized channel state for the provided
1631
// outPoint from the database, or returns ErrChannelNotFound if the channel
1632
// is not found.
1633
func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte,
1634
        error) {
1635

254✔
1636
        var serializedState []byte
254✔
1637
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
254✔
1638
                bucket := tx.ReadBucket(channelOpeningStateBucket)
508✔
1639
                if bucket == nil {
254✔
1640
                        // If the bucket does not exist, it means we never added
257✔
1641
                        //  a channel to the db, so return ErrChannelNotFound.
3✔
1642
                        return ErrChannelNotFound
3✔
1643
                }
3✔
1644

3✔
1645
                stateBytes := bucket.Get(outPoint)
1646
                if stateBytes == nil {
254✔
1647
                        return ErrChannelNotFound
304✔
1648
                }
50✔
1649

50✔
1650
                serializedState = append(serializedState, stateBytes...)
1651

207✔
1652
                return nil
207✔
1653
        }, func() {
207✔
1654
                serializedState = nil
254✔
1655
        })
254✔
1656
        return serializedState, err
254✔
1657
}
254✔
1658

1659
// DeleteChannelOpeningState removes any state for outPoint from the database.
1660
func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
1661
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
27✔
1662
                bucket := tx.ReadWriteBucket(channelOpeningStateBucket)
54✔
1663
                if bucket == nil {
27✔
1664
                        return ErrChannelNotFound
27✔
1665
                }
×
UNCOV
1666

×
1667
                return bucket.Delete(outPoint)
1668
        }, func() {})
27✔
1669
}
27✔
1670

1671
// syncVersions function is used for safe db version synchronization. It
1672
// applies migration functions to the current database and recovers the
1673
// previous state of db if at least one error/panic appeared during migration.
1674
func (d *DB) syncVersions(versions []mandatoryVersion) error {
1675
        meta, err := d.FetchMeta()
1,732✔
1676
        if err != nil {
1,732✔
1677
                if err == ErrMetaNotFound {
1,732✔
1678
                        meta = &Meta{}
×
1679
                } else {
×
1680
                        return err
×
1681
                }
×
UNCOV
1682
        }
×
1683

1684
        latestVersion := getLatestDBVersion(versions)
1685
        log.Infof("Checking for schema update: latest_version=%v, "+
1,732✔
1686
                "db_version=%v", latestVersion, meta.DbVersionNumber)
1,732✔
1687

1,732✔
1688
        switch {
1,732✔
1689

1,732✔
1690
        // If the database reports a higher version that we are aware of, the
1691
        // user is probably trying to revert to a prior version of lnd. We fail
1692
        // here to prevent reversions and unintended corruption.
1693
        case meta.DbVersionNumber > latestVersion:
1694
                log.Errorf("Refusing to revert from db_version=%d to "+
1✔
1695
                        "lower version=%d", meta.DbVersionNumber,
1✔
1696
                        latestVersion)
1✔
1697
                return ErrDBReversion
1✔
1698

1✔
1699
        // If the current database version matches the latest version number,
1700
        // then we don't need to perform any migrations.
1701
        case meta.DbVersionNumber == latestVersion:
1702
                return nil
1,727✔
1703
        }
1,727✔
1704

1705
        log.Infof("Performing database schema migration")
1706

4✔
1707
        // Otherwise, we fetch the migrations which need to applied, and
4✔
1708
        // execute them serially within a single database transaction to ensure
4✔
1709
        // the migration is atomic.
4✔
1710
        migrations, migrationVersions := getMigrationsToApply(
4✔
1711
                versions, meta.DbVersionNumber,
4✔
1712
        )
4✔
1713
        return kvdb.Update(d, func(tx kvdb.RwTx) error {
4✔
1714
                for i, migration := range migrations {
8✔
1715
                        if migration == nil {
8✔
1716
                                continue
4✔
UNCOV
1717
                        }
×
1718

1719
                        log.Infof("Applying migration #%v",
1720
                                migrationVersions[i])
4✔
1721

4✔
1722
                        if err := migration(tx); err != nil {
4✔
1723
                                log.Infof("Unable to apply migration #%v",
5✔
1724
                                        migrationVersions[i])
1✔
1725
                                return err
1✔
1726
                        }
1✔
1727
                }
1✔
1728

1729
                meta.DbVersionNumber = latestVersion
1730
                err := putMeta(meta, tx)
2✔
1731
                if err != nil {
2✔
1732
                        return err
2✔
1733
                }
×
UNCOV
1734

×
1735
                // In dry-run mode, return an error to prevent the transaction
1736
                // from committing.
1737
                if d.dryRun {
1738
                        return ErrDryRunMigrationOK
3✔
1739
                }
1✔
1740

1✔
1741
                return nil
1742
        }, func() {})
1✔
1743
}
4✔
1744

1745
// applyOptionalVersions applies the optional migrations to the database if
1746
// specified in the config.
1747
func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error {
1748
        // TODO(yy): need to design the db to support dry run for optional
1,730✔
1749
        // migrations.
1,730✔
1750
        if d.dryRun {
1,730✔
1751
                log.Info("Skipped optional migrations as dry run mode is not " +
1,731✔
1752
                        "supported yet")
1✔
1753
                return nil
1✔
1754
        }
1✔
1755

1✔
1756
        om, err := d.fetchOptionalMeta()
1757
        if err != nil {
1,729✔
1758
                if err == ErrMetaNotFound {
1,729✔
1759
                        om = &OptionalMeta{
×
1760
                                Versions: make(map[uint64]string),
×
1761
                        }
×
1762
                } else {
×
1763
                        return fmt.Errorf("unable to fetch optional "+
×
1764
                                "meta: %w", err)
×
1765
                }
×
UNCOV
1766
        }
×
1767

1768
        // migrationCfg is the parent configuration which implements the config
1769
        // interfaces of all the single optional migrations.
1770
        migrationCfg := &MigrationConfigImpl{
1771
                migration30.MigrateRevLogConfigImpl{
1,729✔
1772
                        NoAmountData: d.noRevLogAmtData,
1,729✔
1773
                },
1,729✔
1774
                migration34.MigrationConfigImpl{
1,729✔
1775
                        DecayedLog: cfg.DecayedLog,
1,729✔
1776
                },
1,729✔
1777
        }
1,729✔
1778

1,729✔
1779
        log.Infof("Applying %d optional migrations", len(optionalVersions))
1,729✔
1780

1,729✔
1781
        // Apply the optional migrations if requested.
1,729✔
1782
        for number, version := range optionalVersions {
1,729✔
1783
                log.Infof("Checking for optional update: name=%v", version.name)
5,184✔
1784

3,455✔
1785
                // Exit early if the optional migration is not specified.
3,455✔
1786
                if !cfg.MigrationFlags[number] {
3,455✔
1787
                        log.Debugf("Skipping optional migration: name=%s as "+
6,906✔
1788
                                "it is not specified in the config",
3,451✔
1789
                                version.name)
3,451✔
1790

3,451✔
1791
                        continue
3,451✔
1792
                }
3,451✔
1793

1794
                // Exit early if the optional migration has already been
1795
                // applied.
1796
                if _, ok := om.Versions[uint64(number)]; ok {
1797
                        log.Debugf("Skipping optional migration: name=%s as "+
12✔
1798
                                "it has already been applied", version.name)
5✔
1799

5✔
1800
                        continue
5✔
1801
                }
5✔
1802

1803
                log.Infof("Performing database optional migration: %s",
1804
                        version.name)
5✔
1805

5✔
1806
                // Call the migration function for the specific optional
5✔
1807
                // migration.
5✔
1808
                if err := version.migration(d, migrationCfg); err != nil {
5✔
1809
                        log.Errorf("Unable to apply optional migration: %s, "+
5✔
1810
                                "error: %v", version.name, err)
×
1811
                        return err
×
1812
                }
×
UNCOV
1813

×
1814
                // Update the optional meta. Notice that unlike the mandatory db
1815
                // migrations where we perform the migration and updating meta
1816
                // in a single db transaction, we use different transactions
1817
                // here. Even when the following update is failed, we should be
1818
                // fine here as we would re-run the optional migration again,
1819
                // which is a noop, during next startup.
1820
                om.Versions[uint64(number)] = version.name
1821
                if err := d.putOptionalMeta(om); err != nil {
5✔
1822
                        log.Errorf("Unable to update optional meta: %v", err)
5✔
1823
                        return err
×
1824
                }
×
UNCOV
1825

×
1826
                log.Infof("Successfully applied optional migration: %s",
1827
                        version.name)
5✔
1828
        }
5✔
1829

1830
        return nil
1831
}
1,729✔
1832

1833
// ChannelStateDB returns the sub database that is concerned with the channel
1834
// state.
1835
func (d *DB) ChannelStateDB() *ChannelStateDB {
1836
        return d.channelStateDB
2,179✔
1837
}
2,179✔
1838

2,179✔
1839
// LatestDBVersion returns the number of the latest database version currently
1840
// known to the channel DB.
1841
func LatestDBVersion() uint32 {
1842
        return getLatestDBVersion(dbVersions)
1✔
1843
}
1✔
1844

1✔
1845
func getLatestDBVersion(versions []mandatoryVersion) uint32 {
1846
        return versions[len(versions)-1].number
5,363✔
1847
}
5,363✔
1848

5,363✔
1849
// getMigrationsToApply retrieves the migration function that should be
1850
// applied to the database.
1851
func getMigrationsToApply(versions []mandatoryVersion,
1852
        version uint32) ([]migration, []uint32) {
1853

5✔
1854
        migrations := make([]migration, 0, len(versions))
5✔
1855
        migrationVersions := make([]uint32, 0, len(versions))
5✔
1856

5✔
1857
        for _, v := range versions {
5✔
1858
                if v.number > version {
17✔
1859
                        migrations = append(migrations, v.migration)
18✔
1860
                        migrationVersions = append(migrationVersions, v.number)
6✔
1861
                }
6✔
1862
        }
6✔
1863

1864
        return migrations, migrationVersions
1865
}
5✔
1866

1867
// fetchHistoricalChanBucket returns a the channel bucket for a given outpoint
1868
// from the historical channel bucket. If the bucket does not exist,
1869
// ErrNoHistoricalBucket is returned.
1870
func fetchHistoricalChanBucket(tx kvdb.RTx,
1871
        outPoint *wire.OutPoint) (kvdb.RBucket, error) {
1872

7✔
1873
        // First fetch the top level bucket which stores all data related to
7✔
1874
        // historically stored channels.
7✔
1875
        historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
7✔
1876
        if historicalChanBucket == nil {
7✔
1877
                return nil, ErrNoHistoricalBucket
7✔
1878
        }
×
UNCOV
1879

×
1880
        // With the bucket for the node and chain fetched, we can now go down
1881
        // another level, for the channel itself.
1882
        var chanPointBuf bytes.Buffer
1883
        if err := graphdb.WriteOutpoint(&chanPointBuf, outPoint); err != nil {
7✔
1884
                return nil, err
7✔
1885
        }
×
UNCOV
1886
        chanBucket := historicalChanBucket.NestedReadBucket(
×
1887
                chanPointBuf.Bytes(),
7✔
1888
        )
7✔
1889
        if chanBucket == nil {
7✔
1890
                return nil, ErrChannelNotFound
9✔
1891
        }
2✔
1892

2✔
1893
        return chanBucket, nil
1894
}
5✔
1895

1896
// FetchHistoricalChannel fetches open channel data from the historical channel
1897
// bucket.
1898
func (c *ChannelStateDB) FetchHistoricalChannel(outPoint *wire.OutPoint) (
1899
        *OpenChannel, error) {
1900

7✔
1901
        var channel *OpenChannel
7✔
1902
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
7✔
1903
                chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
14✔
1904
                if err != nil {
7✔
1905
                        return err
9✔
1906
                }
2✔
1907

2✔
1908
                channel, err = fetchOpenChannel(chanBucket, outPoint)
1909
                if err != nil {
5✔
1910
                        return err
5✔
1911
                }
×
UNCOV
1912

×
1913
                channel.Db = c
1914
                return nil
5✔
1915
        }, func() {
5✔
1916
                channel = nil
7✔
1917
        })
7✔
1918
        if err != nil {
7✔
1919
                return nil, err
9✔
1920
        }
2✔
1921

2✔
1922
        return channel, nil
1923
}
5✔
1924

1925
func fetchFinalHtlcsBucket(tx kvdb.RTx,
1926
        chanID lnwire.ShortChannelID) (kvdb.RBucket, error) {
1927

13✔
1928
        finalHtlcsBucket := tx.ReadBucket(finalHtlcsBucket)
13✔
1929
        if finalHtlcsBucket == nil {
13✔
1930
                return nil, ErrFinalHtlcsBucketNotFound
22✔
1931
        }
9✔
1932

9✔
1933
        var chanIDBytes [8]byte
1934
        byteOrder.PutUint64(chanIDBytes[:], chanID.ToUint64())
7✔
1935

7✔
1936
        chanBucket := finalHtlcsBucket.NestedReadBucket(chanIDBytes[:])
7✔
1937
        if chanBucket == nil {
7✔
1938
                return nil, ErrFinalChannelBucketNotFound
7✔
1939
        }
×
UNCOV
1940

×
1941
        return chanBucket, nil
1942
}
7✔
1943

1944
var ErrHtlcUnknown = errors.New("htlc unknown")
1945

1946
// LookupFinalHtlc retrieves a final htlc resolution from the database. If the
1947
// htlc has no final resolution yet, ErrHtlcUnknown is returned.
1948
func (c *ChannelStateDB) LookupFinalHtlc(chanID lnwire.ShortChannelID,
1949
        htlcIndex uint64) (*FinalHtlcInfo, error) {
1950

13✔
1951
        var idBytes [8]byte
13✔
1952
        byteOrder.PutUint64(idBytes[:], htlcIndex)
13✔
1953

13✔
1954
        var settledByte byte
13✔
1955

13✔
1956
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
13✔
1957
                finalHtlcsBucket, err := fetchFinalHtlcsBucket(
26✔
1958
                        tx, chanID,
13✔
1959
                )
13✔
1960
                switch {
13✔
1961
                case errors.Is(err, ErrFinalHtlcsBucketNotFound):
13✔
1962
                        fallthrough
9✔
1963

9✔
1964
                case errors.Is(err, ErrFinalChannelBucketNotFound):
1965
                        return ErrHtlcUnknown
9✔
1966

9✔
1967
                case err != nil:
1968
                        return fmt.Errorf("cannot fetch final htlcs bucket: %w",
×
1969
                                err)
×
UNCOV
1970
                }
×
1971

1972
                value := finalHtlcsBucket.Get(idBytes[:])
1973
                if value == nil {
7✔
1974
                        return ErrHtlcUnknown
8✔
1975
                }
1✔
1976

1✔
1977
                if len(value) != 1 {
1978
                        return errors.New("unexpected final htlc value length")
6✔
1979
                }
×
UNCOV
1980

×
1981
                settledByte = value[0]
1982

6✔
1983
                return nil
6✔
1984
        }, func() {
6✔
1985
                settledByte = 0
13✔
1986
        })
13✔
1987
        if err != nil {
13✔
1988
                return nil, err
23✔
1989
        }
10✔
1990

10✔
1991
        info := FinalHtlcInfo{
1992
                Settled:  settledByte&byte(FinalHtlcSettledBit) != 0,
6✔
1993
                Offchain: settledByte&byte(FinalHtlcOffchainBit) != 0,
6✔
1994
        }
6✔
1995

6✔
1996
        return &info, nil
6✔
1997
}
6✔
1998

1999
// PutOnchainFinalHtlcOutcome stores the final on-chain outcome of an htlc in
2000
// the database.
2001
func (c *ChannelStateDB) PutOnchainFinalHtlcOutcome(
2002
        chanID lnwire.ShortChannelID, htlcID uint64, settled bool) error {
2003

4✔
2004
        // Skip if the user did not opt in to storing final resolutions.
4✔
2005
        if !c.parent.storeFinalHtlcResolutions {
4✔
2006
                return nil
7✔
2007
        }
3✔
2008

3✔
2009
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
2010
                finalHtlcsBucket, err := fetchFinalHtlcsBucketRw(tx, chanID)
2✔
2011
                if err != nil {
1✔
2012
                        return err
1✔
2013
                }
×
UNCOV
2014

×
2015
                return putFinalHtlc(
2016
                        finalHtlcsBucket, htlcID,
1✔
2017
                        FinalHtlcInfo{
1✔
2018
                                Settled:  settled,
1✔
2019
                                Offchain: false,
1✔
2020
                        },
1✔
2021
                )
1✔
2022
        }, func() {})
1✔
2023
}
1✔
2024

2025
// MakeTestInvoiceDB is used to create a test invoice database for testing
2026
// purposes. It simply calls into MakeTestDB so the same modifiers can be used.
2027
func MakeTestInvoiceDB(t *testing.T, modifiers ...OptionModifier) (
2028
        invoices.InvoiceDB, error) {
2029

154✔
2030
        return MakeTestDB(t, modifiers...)
154✔
2031
}
154✔
2032

154✔
2033
// MakeTestDB creates a new instance of the ChannelDB for testing purposes.
2034
// A callback which cleans up the created temporary directories is also
2035
// returned and intended to be executed after the test completes.
2036
func MakeTestDB(t *testing.T, modifiers ...OptionModifier) (*DB, error) {
2037
        // First, create a temporary directory to be used for the duration of
257✔
2038
        // this test.
257✔
2039
        tempDirName := t.TempDir()
257✔
2040

257✔
2041
        // Next, create channeldb for the first time.
257✔
2042
        backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cdb")
257✔
2043
        if err != nil {
257✔
2044
                backendCleanup()
257✔
2045
                return nil, err
×
2046
        }
×
UNCOV
2047

×
2048
        cdb, err := CreateWithBackend(backend, modifiers...)
2049
        if err != nil {
257✔
2050
                backendCleanup()
257✔
2051
                return nil, err
×
2052
        }
×
UNCOV
2053

×
2054
        t.Cleanup(func() {
2055
                cdb.Close()
514✔
2056
                backendCleanup()
257✔
2057
        })
257✔
2058

257✔
2059
        return cdb, nil
2060
}
257✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc