• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11216766535

07 Oct 2024 01:37PM UTC coverage: 57.817% (-1.0%) from 58.817%
11216766535

Pull #9148

github

ProofOfKeags
lnwire: remove kickoff feerate from propose/commit
Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*

571 of 879 new or added lines in 16 files covered. (64.96%)

23253 existing lines in 251 files now uncovered.

99022 of 171268 relevant lines covered (57.82%)

38420.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.27
/channeldb/db.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "fmt"
7
        "net"
8
        "os"
9
        "testing"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/go-errors/errors"
15
        mig "github.com/lightningnetwork/lnd/channeldb/migration"
16
        "github.com/lightningnetwork/lnd/channeldb/migration12"
17
        "github.com/lightningnetwork/lnd/channeldb/migration13"
18
        "github.com/lightningnetwork/lnd/channeldb/migration16"
19
        "github.com/lightningnetwork/lnd/channeldb/migration20"
20
        "github.com/lightningnetwork/lnd/channeldb/migration21"
21
        "github.com/lightningnetwork/lnd/channeldb/migration23"
22
        "github.com/lightningnetwork/lnd/channeldb/migration24"
23
        "github.com/lightningnetwork/lnd/channeldb/migration25"
24
        "github.com/lightningnetwork/lnd/channeldb/migration26"
25
        "github.com/lightningnetwork/lnd/channeldb/migration27"
26
        "github.com/lightningnetwork/lnd/channeldb/migration29"
27
        "github.com/lightningnetwork/lnd/channeldb/migration30"
28
        "github.com/lightningnetwork/lnd/channeldb/migration31"
29
        "github.com/lightningnetwork/lnd/channeldb/migration32"
30
        "github.com/lightningnetwork/lnd/channeldb/migration33"
31
        "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
32
        "github.com/lightningnetwork/lnd/clock"
33
        "github.com/lightningnetwork/lnd/invoices"
34
        "github.com/lightningnetwork/lnd/kvdb"
35
        "github.com/lightningnetwork/lnd/lnwire"
36
        "github.com/lightningnetwork/lnd/routing/route"
37
)
38

39
const (
40
        dbName = "channel.db"
41
)
42

43
var (
44
        // ErrDryRunMigrationOK signals that a migration executed successful,
45
        // but we intentionally did not commit the result.
46
        ErrDryRunMigrationOK = errors.New("dry run migration successful")
47

48
        // ErrFinalHtlcsBucketNotFound signals that the top-level final htlcs
49
        // bucket does not exist.
50
        ErrFinalHtlcsBucketNotFound = errors.New("final htlcs bucket not " +
51
                "found")
52

53
        // ErrFinalChannelBucketNotFound signals that the channel bucket for
54
        // final htlc outcomes does not exist.
55
        ErrFinalChannelBucketNotFound = errors.New("final htlcs channel " +
56
                "bucket not found")
57
)
58

59
// migration is a function which takes a prior outdated version of the database
60
// instances and mutates the key/bucket structure to arrive at a more
61
// up-to-date version of the database.
62
type migration func(tx kvdb.RwTx) error
63

64
// mandatoryVersion defines a db version that must be applied before the lnd
65
// starts.
66
type mandatoryVersion struct {
67
        number    uint32
68
        migration migration
69
}
70

71
// MigrationConfig is an interface combines the config interfaces of all
72
// optional migrations.
73
type MigrationConfig interface {
74
        migration30.MigrateRevLogConfig
75
}
76

77
// MigrationConfigImpl is a super set of all the various migration configs and
78
// an implementation of MigrationConfig.
79
type MigrationConfigImpl struct {
80
        migration30.MigrateRevLogConfigImpl
81
}
82

83
// optionalMigration defines an optional migration function. When a migration
84
// is optional, it usually involves a large scale of changes that might touch
85
// millions of keys. Due to OOM concern, the update cannot be safely done
86
// within one db transaction. Thus, for optional migrations, they must take the
87
// db backend and construct transactions as needed.
88
type optionalMigration func(db kvdb.Backend, cfg MigrationConfig) error
89

90
// optionalVersion defines a db version that can be optionally applied. When
91
// applying migrations, we must apply all the mandatory migrations first before
92
// attempting optional ones.
93
type optionalVersion struct {
94
        name      string
95
        migration optionalMigration
96
}
97

98
var (
99
        // dbVersions is storing all mandatory versions of database. If current
100
        // version of database don't match with latest version this list will
101
        // be used for retrieving all migration function that are need to apply
102
        // to the current db.
103
        dbVersions = []mandatoryVersion{
104
                {
105
                        // The base DB version requires no migration.
106
                        number:    0,
107
                        migration: nil,
108
                },
109
                {
110
                        // The version of the database where two new indexes
111
                        // for the update time of node and channel updates were
112
                        // added.
113
                        number:    1,
114
                        migration: migration_01_to_11.MigrateNodeAndEdgeUpdateIndex,
115
                },
116
                {
117
                        // The DB version that added the invoice event time
118
                        // series.
119
                        number:    2,
120
                        migration: migration_01_to_11.MigrateInvoiceTimeSeries,
121
                },
122
                {
123
                        // The DB version that updated the embedded invoice in
124
                        // outgoing payments to match the new format.
125
                        number:    3,
126
                        migration: migration_01_to_11.MigrateInvoiceTimeSeriesOutgoingPayments,
127
                },
128
                {
129
                        // The version of the database where every channel
130
                        // always has two entries in the edges bucket. If
131
                        // a policy is unknown, this will be represented
132
                        // by a special byte sequence.
133
                        number:    4,
134
                        migration: migration_01_to_11.MigrateEdgePolicies,
135
                },
136
                {
137
                        // The DB version where we persist each attempt to send
138
                        // an HTLC to a payment hash, and track whether the
139
                        // payment is in-flight, succeeded, or failed.
140
                        number:    5,
141
                        migration: migration_01_to_11.PaymentStatusesMigration,
142
                },
143
                {
144
                        // The DB version that properly prunes stale entries
145
                        // from the edge update index.
146
                        number:    6,
147
                        migration: migration_01_to_11.MigratePruneEdgeUpdateIndex,
148
                },
149
                {
150
                        // The DB version that migrates the ChannelCloseSummary
151
                        // to a format where optional fields are indicated with
152
                        // boolean flags.
153
                        number:    7,
154
                        migration: migration_01_to_11.MigrateOptionalChannelCloseSummaryFields,
155
                },
156
                {
157
                        // The DB version that changes the gossiper's message
158
                        // store keys to account for the message's type and
159
                        // ShortChannelID.
160
                        number:    8,
161
                        migration: migration_01_to_11.MigrateGossipMessageStoreKeys,
162
                },
163
                {
164
                        // The DB version where the payments and payment
165
                        // statuses are moved to being stored in a combined
166
                        // bucket.
167
                        number:    9,
168
                        migration: migration_01_to_11.MigrateOutgoingPayments,
169
                },
170
                {
171
                        // The DB version where we started to store legacy
172
                        // payload information for all routes, as well as the
173
                        // optional TLV records.
174
                        number:    10,
175
                        migration: migration_01_to_11.MigrateRouteSerialization,
176
                },
177
                {
178
                        // Add invoice htlc and cltv delta fields.
179
                        number:    11,
180
                        migration: migration_01_to_11.MigrateInvoices,
181
                },
182
                {
183
                        // Migrate to TLV invoice bodies, add payment address
184
                        // and features, remove receipt.
185
                        number:    12,
186
                        migration: migration12.MigrateInvoiceTLV,
187
                },
188
                {
189
                        // Migrate to multi-path payments.
190
                        number:    13,
191
                        migration: migration13.MigrateMPP,
192
                },
193
                {
194
                        // Initialize payment address index and begin using it
195
                        // as the default index, falling back to payment hash
196
                        // index.
197
                        number:    14,
198
                        migration: mig.CreateTLB(payAddrIndexBucket),
199
                },
200
                {
201
                        // Initialize payment index bucket which will be used
202
                        // to index payments by sequence number. This index will
203
                        // be used to allow more efficient ListPayments queries.
204
                        number:    15,
205
                        migration: mig.CreateTLB(paymentsIndexBucket),
206
                },
207
                {
208
                        // Add our existing payments to the index bucket created
209
                        // in migration 15.
210
                        number:    16,
211
                        migration: migration16.MigrateSequenceIndex,
212
                },
213
                {
214
                        // Create a top level bucket which will store extra
215
                        // information about channel closes.
216
                        number:    17,
217
                        migration: mig.CreateTLB(closeSummaryBucket),
218
                },
219
                {
220
                        // Create a top level bucket which holds information
221
                        // about our peers.
222
                        number:    18,
223
                        migration: mig.CreateTLB(peersBucket),
224
                },
225
                {
226
                        // Create a top level bucket which holds outpoint
227
                        // information.
228
                        number:    19,
229
                        migration: mig.CreateTLB(outpointBucket),
230
                },
231
                {
232
                        // Migrate some data to the outpoint index.
233
                        number:    20,
234
                        migration: migration20.MigrateOutpointIndex,
235
                },
236
                {
237
                        // Migrate to length prefixed wire messages everywhere
238
                        // in the database.
239
                        number:    21,
240
                        migration: migration21.MigrateDatabaseWireMessages,
241
                },
242
                {
243
                        // Initialize set id index so that invoices can be
244
                        // queried by individual htlc sets.
245
                        number:    22,
246
                        migration: mig.CreateTLB(setIDIndexBucket),
247
                },
248
                {
249
                        number:    23,
250
                        migration: migration23.MigrateHtlcAttempts,
251
                },
252
                {
253
                        // Remove old forwarding packages of closed channels.
254
                        number:    24,
255
                        migration: migration24.MigrateFwdPkgCleanup,
256
                },
257
                {
258
                        // Save the initial local/remote balances in channel
259
                        // info.
260
                        number:    25,
261
                        migration: migration25.MigrateInitialBalances,
262
                },
263
                {
264
                        // Migrate the initial local/remote balance fields into
265
                        // tlv records.
266
                        number:    26,
267
                        migration: migration26.MigrateBalancesToTlvRecords,
268
                },
269
                {
270
                        // Patch the initial local/remote balance fields with
271
                        // empty values for historical channels.
272
                        number:    27,
273
                        migration: migration27.MigrateHistoricalBalances,
274
                },
275
                {
276
                        number:    28,
277
                        migration: mig.CreateTLB(chanIDBucket),
278
                },
279
                {
280
                        number:    29,
281
                        migration: migration29.MigrateChanID,
282
                },
283
                {
284
                        // Removes the "sweeper-last-tx" bucket. Although we
285
                        // do not have a mandatory version 30 we skip this
286
                        // version because its naming is already used for the
287
                        // first optional migration.
288
                        number:    31,
289
                        migration: migration31.DeleteLastPublishedTxTLB,
290
                },
291
                {
292
                        number:    32,
293
                        migration: migration32.MigrateMCRouteSerialisation,
294
                },
295
                {
296
                        number:    33,
297
                        migration: migration33.MigrateMCStoreNameSpacedResults,
298
                },
299
        }
300

301
        // optionalVersions stores all optional migrations that are applied
302
        // after dbVersions.
303
        //
304
        // NOTE: optional migrations must be fault-tolerant and re-run already
305
        // migrated data must be noop, which means the migration must be able
306
        // to determine its state.
307
        optionalVersions = []optionalVersion{
308
                {
309
                        name: "prune revocation log",
310
                        migration: func(db kvdb.Backend,
311
                                cfg MigrationConfig) error {
×
312

×
313
                                return migration30.MigrateRevocationLog(db, cfg)
×
314
                        },
×
315
                },
316
        }
317

318
        // Big endian is the preferred byte order, due to cursor scans over
319
        // integer keys iterating in order.
320
        byteOrder = binary.BigEndian
321

322
        // channelOpeningStateBucket is the database bucket used to store the
323
        // channelOpeningState for each channel that is currently in the process
324
        // of being opened.
325
        channelOpeningStateBucket = []byte("channelOpeningState")
326
)
327

328
// DB is the primary datastore for the lnd daemon. The database stores
329
// information related to nodes, routing data, open/closed channels, fee
330
// schedules, and reputation data.
331
type DB struct {
332
        kvdb.Backend
333

334
        // channelStateDB separates all DB operations on channel state.
335
        channelStateDB *ChannelStateDB
336

337
        dbPath                    string
338
        graph                     *ChannelGraph
339
        clock                     clock.Clock
340
        dryRun                    bool
341
        keepFailedPaymentAttempts bool
342
        storeFinalHtlcResolutions bool
343

344
        // noRevLogAmtData if true, means that commitment transaction amount
345
        // data should not be stored in the revocation log.
346
        noRevLogAmtData bool
347
}
348

349
// Open opens or creates channeldb. Any necessary schemas migrations due
350
// to updates will take place as necessary.
351
// TODO(bhandras): deprecate this function.
352
func Open(dbPath string, modifiers ...OptionModifier) (*DB, error) {
1,451✔
353
        opts := DefaultOptions()
1,451✔
354
        for _, modifier := range modifiers {
1,473✔
355
                modifier(&opts)
22✔
356
        }
22✔
357

358
        backend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
1,451✔
359
                DBPath:            dbPath,
1,451✔
360
                DBFileName:        dbName,
1,451✔
361
                NoFreelistSync:    opts.NoFreelistSync,
1,451✔
362
                AutoCompact:       opts.AutoCompact,
1,451✔
363
                AutoCompactMinAge: opts.AutoCompactMinAge,
1,451✔
364
                DBTimeout:         opts.DBTimeout,
1,451✔
365
        })
1,451✔
366
        if err != nil {
1,451✔
367
                return nil, err
×
368
        }
×
369

370
        db, err := CreateWithBackend(backend, modifiers...)
1,451✔
371
        if err == nil {
2,902✔
372
                db.dbPath = dbPath
1,451✔
373
        }
1,451✔
374
        return db, err
1,451✔
375
}
376

377
// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
378
// Any necessary schemas migrations due to updates will take place as necessary.
379
func CreateWithBackend(backend kvdb.Backend,
380
        modifiers ...OptionModifier) (*DB, error) {
1,739✔
381

1,739✔
382
        opts := DefaultOptions()
1,739✔
383
        for _, modifier := range modifiers {
1,913✔
384
                modifier(&opts)
174✔
385
        }
174✔
386

387
        if !opts.NoMigration {
3,478✔
388
                if err := initChannelDB(backend); err != nil {
1,740✔
389
                        return nil, err
1✔
390
                }
1✔
391
        }
392

393
        chanDB := &DB{
1,738✔
394
                Backend: backend,
1,738✔
395
                channelStateDB: &ChannelStateDB{
1,738✔
396
                        linkNodeDB: &LinkNodeDB{
1,738✔
397
                                backend: backend,
1,738✔
398
                        },
1,738✔
399
                        backend: backend,
1,738✔
400
                },
1,738✔
401
                clock:                     opts.clock,
1,738✔
402
                dryRun:                    opts.dryRun,
1,738✔
403
                keepFailedPaymentAttempts: opts.keepFailedPaymentAttempts,
1,738✔
404
                storeFinalHtlcResolutions: opts.storeFinalHtlcResolutions,
1,738✔
405
                noRevLogAmtData:           opts.NoRevLogAmtData,
1,738✔
406
        }
1,738✔
407

1,738✔
408
        // Set the parent pointer (only used in tests).
1,738✔
409
        chanDB.channelStateDB.parent = chanDB
1,738✔
410

1,738✔
411
        var err error
1,738✔
412
        chanDB.graph, err = NewChannelGraph(
1,738✔
413
                backend, opts.RejectCacheSize, opts.ChannelCacheSize,
1,738✔
414
                opts.BatchCommitInterval, opts.PreAllocCacheNumNodes,
1,738✔
415
                opts.UseGraphCache, opts.NoMigration,
1,738✔
416
        )
1,738✔
417
        if err != nil {
1,738✔
418
                return nil, err
×
419
        }
×
420

421
        // Synchronize the version of database and apply migrations if needed.
422
        if !opts.NoMigration {
3,476✔
423
                if err := chanDB.syncVersions(dbVersions); err != nil {
1,739✔
424
                        backend.Close()
1✔
425
                        return nil, err
1✔
426
                }
1✔
427

428
                // Grab the optional migration config.
429
                omc := opts.OptionalMiragtionConfig
1,737✔
430
                if err := chanDB.applyOptionalVersions(omc); err != nil {
1,737✔
431
                        backend.Close()
×
432
                        return nil, err
×
433
                }
×
434
        }
435

436
        return chanDB, nil
1,737✔
437
}
438

439
// Path returns the file path to the channel database.
440
func (d *DB) Path() string {
197✔
441
        return d.dbPath
197✔
442
}
197✔
443

444
var dbTopLevelBuckets = [][]byte{
445
        openChannelBucket,
446
        closedChannelBucket,
447
        forwardingLogBucket,
448
        fwdPackagesKey,
449
        invoiceBucket,
450
        payAddrIndexBucket,
451
        setIDIndexBucket,
452
        paymentsIndexBucket,
453
        peersBucket,
454
        nodeInfoBucket,
455
        metaBucket,
456
        closeSummaryBucket,
457
        outpointBucket,
458
        chanIDBucket,
459
        historicalChannelBucket,
460
}
461

462
// Wipe completely deletes all saved state within all used buckets within the
463
// database. The deletion is done in a single transaction, therefore this
464
// operation is fully atomic.
465
func (d *DB) Wipe() error {
169✔
466
        err := kvdb.Update(d, func(tx kvdb.RwTx) error {
338✔
467
                for _, tlb := range dbTopLevelBuckets {
2,704✔
468
                        err := tx.DeleteTopLevelBucket(tlb)
2,535✔
469
                        if err != nil && err != kvdb.ErrBucketNotFound {
2,535✔
470
                                return err
×
471
                        }
×
472
                }
473
                return nil
169✔
474
        }, func() {})
169✔
475
        if err != nil {
169✔
476
                return err
×
477
        }
×
478

479
        return initChannelDB(d.Backend)
169✔
480
}
481

482
// initChannelDB creates and initializes a fresh version of channeldb. In the
483
// case that the target path has not yet been created or doesn't yet exist, then
484
// the path is created. Additionally, all required top-level buckets used within
485
// the database are created.
486
func initChannelDB(db kvdb.Backend) error {
1,908✔
487
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
3,816✔
488
                // Check if DB was marked as inactive with a tomb stone.
1,908✔
489
                if err := EnsureNoTombstone(tx); err != nil {
1,909✔
490
                        return err
1✔
491
                }
1✔
492

493
                meta := &Meta{}
1,907✔
494
                // Check if DB is already initialized.
1,907✔
495
                err := FetchMeta(meta, tx)
1,907✔
496
                if err == nil {
2,113✔
497
                        return nil
206✔
498
                }
206✔
499

500
                for _, tlb := range dbTopLevelBuckets {
27,216✔
501
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
25,515✔
502
                                return err
×
503
                        }
×
504
                }
505

506
                meta.DbVersionNumber = getLatestDBVersion(dbVersions)
1,701✔
507
                return putMeta(meta, tx)
1,701✔
508
        }, func() {})
1,908✔
509
        if err != nil {
1,909✔
510
                return fmt.Errorf("unable to create new channeldb: %w", err)
1✔
511
        }
1✔
512

513
        return nil
1,907✔
514
}
515

516
// fileExists returns true if the file exists, and false otherwise.
517
func fileExists(path string) bool {
1✔
518
        if _, err := os.Stat(path); err != nil {
1✔
519
                if os.IsNotExist(err) {
×
520
                        return false
×
521
                }
×
522
        }
523

524
        return true
1✔
525
}
526

527
// ChannelStateDB is a database that keeps track of all channel state.
528
type ChannelStateDB struct {
529
        // linkNodeDB separates all DB operations on LinkNodes.
530
        linkNodeDB *LinkNodeDB
531

532
        // parent holds a pointer to the "main" channeldb.DB object. This is
533
        // only used for testing and should never be used in production code.
534
        // For testing use the ChannelStateDB.GetParentDB() function to retrieve
535
        // this pointer.
536
        parent *DB
537

538
        // backend points to the actual backend holding the channel state
539
        // database. This may be a real backend or a cache middleware.
540
        backend kvdb.Backend
541
}
542

543
// GetParentDB returns the "main" channeldb.DB object that is the owner of this
544
// ChannelStateDB instance. Use this function only in tests where passing around
545
// pointers makes testing less readable. Never to be used in production code!
546
func (c *ChannelStateDB) GetParentDB() *DB {
509✔
547
        return c.parent
509✔
548
}
509✔
549

550
// LinkNodeDB returns the current instance of the link node database.
UNCOV
551
func (c *ChannelStateDB) LinkNodeDB() *LinkNodeDB {
×
UNCOV
552
        return c.linkNodeDB
×
UNCOV
553
}
×
554

555
// FetchOpenChannels starts a new database transaction and returns all stored
556
// currently active/open channels associated with the target nodeID. In the case
557
// that no active channels are known to have been created with this node, then a
558
// zero-length slice is returned.
559
func (c *ChannelStateDB) FetchOpenChannels(nodeID *btcec.PublicKey) (
560
        []*OpenChannel, error) {
253✔
561

253✔
562
        var channels []*OpenChannel
253✔
563
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
506✔
564
                var err error
253✔
565
                channels, err = c.fetchOpenChannels(tx, nodeID)
253✔
566
                return err
253✔
567
        }, func() {
506✔
568
                channels = nil
253✔
569
        })
253✔
570

571
        return channels, err
253✔
572
}
573

574
// fetchOpenChannels uses and existing database transaction and returns all
575
// stored currently active/open channels associated with the target nodeID. In
576
// the case that no active channels are known to have been created with this
577
// node, then a zero-length slice is returned.
578
func (c *ChannelStateDB) fetchOpenChannels(tx kvdb.RTx,
579
        nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
260✔
580

260✔
581
        // Get the bucket dedicated to storing the metadata for open channels.
260✔
582
        openChanBucket := tx.ReadBucket(openChannelBucket)
260✔
583
        if openChanBucket == nil {
260✔
584
                return nil, nil
×
585
        }
×
586

587
        // Within this top level bucket, fetch the bucket dedicated to storing
588
        // open channel data specific to the remote node.
589
        pub := nodeID.SerializeCompressed()
260✔
590
        nodeChanBucket := openChanBucket.NestedReadBucket(pub)
260✔
591
        if nodeChanBucket == nil {
311✔
592
                return nil, nil
51✔
593
        }
51✔
594

595
        // Next, we'll need to go down an additional layer in order to retrieve
596
        // the channels for each chain the node knows of.
597
        var channels []*OpenChannel
209✔
598
        err := nodeChanBucket.ForEach(func(chainHash, v []byte) error {
418✔
599
                // If there's a value, it's not a bucket so ignore it.
209✔
600
                if v != nil {
209✔
601
                        return nil
×
602
                }
×
603

604
                // If we've found a valid chainhash bucket, then we'll retrieve
605
                // that so we can extract all the channels.
606
                chainBucket := nodeChanBucket.NestedReadBucket(chainHash)
209✔
607
                if chainBucket == nil {
209✔
608
                        return fmt.Errorf("unable to read bucket for chain=%x",
×
609
                                chainHash[:])
×
610
                }
×
611

612
                // Finally, we both of the necessary buckets retrieved, fetch
613
                // all the active channels related to this node.
614
                nodeChannels, err := c.fetchNodeChannels(chainBucket)
209✔
615
                if err != nil {
209✔
616
                        return fmt.Errorf("unable to read channel for "+
×
617
                                "chain_hash=%x, node_key=%x: %v",
×
618
                                chainHash[:], pub, err)
×
619
                }
×
620

621
                channels = append(channels, nodeChannels...)
209✔
622
                return nil
209✔
623
        })
624

625
        return channels, err
209✔
626
}
627

628
// fetchNodeChannels retrieves all active channels from the target chainBucket
629
// which is under a node's dedicated channel bucket. This function is typically
630
// used to fetch all the active channels related to a particular node.
631
func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) (
632
        []*OpenChannel, error) {
715✔
633

715✔
634
        var channels []*OpenChannel
715✔
635

715✔
636
        // A node may have channels on several chains, so for each known chain,
715✔
637
        // we'll extract all the channels.
715✔
638
        err := chainBucket.ForEach(func(chanPoint, v []byte) error {
1,491✔
639
                // If there's a value, it's not a bucket so ignore it.
776✔
640
                if v != nil {
776✔
641
                        return nil
×
642
                }
×
643

644
                // Once we've found a valid channel bucket, we'll extract it
645
                // from the node's chain bucket.
646
                chanBucket := chainBucket.NestedReadBucket(chanPoint)
776✔
647

776✔
648
                var outPoint wire.OutPoint
776✔
649
                err := readOutpoint(bytes.NewReader(chanPoint), &outPoint)
776✔
650
                if err != nil {
776✔
651
                        return err
×
652
                }
×
653
                oChannel, err := fetchOpenChannel(chanBucket, &outPoint)
776✔
654
                if err != nil {
776✔
655
                        return fmt.Errorf("unable to read channel data for "+
×
656
                                "chan_point=%v: %w", outPoint, err)
×
657
                }
×
658
                oChannel.Db = c
776✔
659

776✔
660
                channels = append(channels, oChannel)
776✔
661

776✔
662
                return nil
776✔
663
        })
664
        if err != nil {
715✔
665
                return nil, err
×
666
        }
×
667

668
        return channels, nil
715✔
669
}
670

671
// FetchChannel attempts to locate a channel specified by the passed channel
672
// point. If the channel cannot be found, then an error will be returned.
673
// Optionally an existing db tx can be supplied.
674
func (c *ChannelStateDB) FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) (
675
        *OpenChannel, error) {
8✔
676

8✔
677
        var targetChanPoint bytes.Buffer
8✔
678
        if err := writeOutpoint(&targetChanPoint, &chanPoint); err != nil {
8✔
679
                return nil, err
×
680
        }
×
681

682
        targetChanPointBytes := targetChanPoint.Bytes()
8✔
683
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
8✔
684
                error) {
15✔
685

7✔
686
                return targetChanPointBytes, &chanPoint, nil
7✔
687
        }
7✔
688

689
        return c.channelScanner(tx, selector)
8✔
690
}
691

692
// FetchChannelByID attempts to locate a channel specified by the passed channel
693
// ID. If the channel cannot be found, then an error will be returned.
694
// Optionally an existing db tx can be supplied.
695
func (c *ChannelStateDB) FetchChannelByID(tx kvdb.RTx, id lnwire.ChannelID) (
696
        *OpenChannel, error) {
2✔
697

2✔
698
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
2✔
699
                error) {
4✔
700

2✔
701
                var (
2✔
702
                        targetChanPointBytes []byte
2✔
703
                        targetChanPoint      *wire.OutPoint
2✔
704

2✔
705
                        // errChanFound is used to signal that the channel has
2✔
706
                        // been found so that iteration through the DB buckets
2✔
707
                        // can stop.
2✔
708
                        errChanFound = errors.New("channel found")
2✔
709
                )
2✔
710
                err := chainBkt.ForEach(func(k, _ []byte) error {
4✔
711
                        var outPoint wire.OutPoint
2✔
712
                        err := readOutpoint(bytes.NewReader(k), &outPoint)
2✔
713
                        if err != nil {
2✔
714
                                return err
×
715
                        }
×
716

717
                        chanID := lnwire.NewChanIDFromOutPoint(outPoint)
2✔
718
                        if chanID != id {
3✔
719
                                return nil
1✔
720
                        }
1✔
721

722
                        targetChanPoint = &outPoint
1✔
723
                        targetChanPointBytes = k
1✔
724

1✔
725
                        return errChanFound
1✔
726
                })
727
                if err != nil && !errors.Is(err, errChanFound) {
2✔
728
                        return nil, nil, err
×
729
                }
×
730
                if targetChanPoint == nil {
3✔
731
                        return nil, nil, ErrChannelNotFound
1✔
732
                }
1✔
733

734
                return targetChanPointBytes, targetChanPoint, nil
1✔
735
        }
736

737
        return c.channelScanner(tx, selector)
2✔
738
}
739

740
// channelSelector describes a function that takes a chain-hash bucket from
741
// within the open-channel DB and returns the wanted channel point bytes, and
742
// channel point. It must return the ErrChannelNotFound error if the wanted
743
// channel is not in the given bucket.
744
type channelSelector func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
745
        error)
746

747
// channelScanner will traverse the DB to each chain-hash bucket of each node
748
// pub-key bucket in the open-channel-bucket. The chanSelector will then be used
749
// to fetch the wanted channel outpoint from the chain bucket.
750
func (c *ChannelStateDB) channelScanner(tx kvdb.RTx,
751
        chanSelect channelSelector) (*OpenChannel, error) {
10✔
752

10✔
753
        var (
10✔
754
                targetChan *OpenChannel
10✔
755

10✔
756
                // errChanFound is used to signal that the channel has been
10✔
757
                // found so that iteration through the DB buckets can stop.
10✔
758
                errChanFound = errors.New("channel found")
10✔
759
        )
10✔
760

10✔
761
        // chanScan will traverse the following bucket structure:
10✔
762
        //  * nodePub => chainHash => chanPoint
10✔
763
        //
10✔
764
        // At each level we go one further, ensuring that we're traversing the
10✔
765
        // proper key (that's actually a bucket). By only reading the bucket
10✔
766
        // structure and skipping fully decoding each channel, we save a good
10✔
767
        // bit of CPU as we don't need to do things like decompress public
10✔
768
        // keys.
10✔
769
        chanScan := func(tx kvdb.RTx) error {
20✔
770
                // Get the bucket dedicated to storing the metadata for open
10✔
771
                // channels.
10✔
772
                openChanBucket := tx.ReadBucket(openChannelBucket)
10✔
773
                if openChanBucket == nil {
10✔
774
                        return ErrNoActiveChannels
×
775
                }
×
776

777
                // Within the node channel bucket, are the set of node pubkeys
778
                // we have channels with, we don't know the entire set, so we'll
779
                // check them all.
780
                return openChanBucket.ForEach(func(nodePub, v []byte) error {
19✔
781
                        // Ensure that this is a key the same size as a pubkey,
9✔
782
                        // and also that it leads directly to a bucket.
9✔
783
                        if len(nodePub) != 33 || v != nil {
9✔
784
                                return nil
×
785
                        }
×
786

787
                        nodeChanBucket := openChanBucket.NestedReadBucket(
9✔
788
                                nodePub,
9✔
789
                        )
9✔
790
                        if nodeChanBucket == nil {
9✔
791
                                return nil
×
792
                        }
×
793

794
                        // The next layer down is all the chains that this node
795
                        // has channels on with us.
796
                        return nodeChanBucket.ForEach(func(chainHash,
9✔
797
                                v []byte) error {
18✔
798

9✔
799
                                // If there's a value, it's not a bucket so
9✔
800
                                // ignore it.
9✔
801
                                if v != nil {
9✔
802
                                        return nil
×
803
                                }
×
804

805
                                chainBucket := nodeChanBucket.NestedReadBucket(
9✔
806
                                        chainHash,
9✔
807
                                )
9✔
808
                                if chainBucket == nil {
9✔
809
                                        return fmt.Errorf("unable to read "+
×
810
                                                "bucket for chain=%x",
×
811
                                                chainHash)
×
812
                                }
×
813

814
                                // Finally, we reach the leaf bucket that stores
815
                                // all the chanPoints for this node.
816
                                targetChanBytes, chanPoint, err := chanSelect(
9✔
817
                                        chainBucket,
9✔
818
                                )
9✔
819
                                if errors.Is(err, ErrChannelNotFound) {
10✔
820
                                        return nil
1✔
821
                                } else if err != nil {
9✔
822
                                        return err
×
823
                                }
×
824

825
                                chanBucket := chainBucket.NestedReadBucket(
8✔
826
                                        targetChanBytes,
8✔
827
                                )
8✔
828
                                if chanBucket == nil {
11✔
829
                                        return nil
3✔
830
                                }
3✔
831

832
                                channel, err := fetchOpenChannel(
5✔
833
                                        chanBucket, chanPoint,
5✔
834
                                )
5✔
835
                                if err != nil {
5✔
836
                                        return err
×
837
                                }
×
838

839
                                targetChan = channel
5✔
840
                                targetChan.Db = c
5✔
841

5✔
842
                                return errChanFound
5✔
843
                        })
844
                })
845
        }
846

847
        var err error
10✔
848
        if tx == nil {
20✔
849
                err = kvdb.View(c.backend, chanScan, func() {})
20✔
UNCOV
850
        } else {
×
UNCOV
851
                err = chanScan(tx)
×
UNCOV
852
        }
×
853
        if err != nil && !errors.Is(err, errChanFound) {
10✔
854
                return nil, err
×
855
        }
×
856

857
        if targetChan != nil {
15✔
858
                return targetChan, nil
5✔
859
        }
5✔
860

861
        // If we can't find the channel, then we return with an error, as we
862
        // have nothing to back up.
863
        return nil, ErrChannelNotFound
5✔
864
}
865

866
// FetchAllChannels attempts to retrieve all open channels currently stored
867
// within the database, including pending open, fully open and channels waiting
868
// for a closing transaction to confirm.
869
func (c *ChannelStateDB) FetchAllChannels() ([]*OpenChannel, error) {
563✔
870
        return fetchChannels(c)
563✔
871
}
563✔
872

873
// FetchAllOpenChannels will return all channels that have the funding
874
// transaction confirmed, and is not waiting for a closing transaction to be
875
// confirmed.
876
func (c *ChannelStateDB) FetchAllOpenChannels() ([]*OpenChannel, error) {
405✔
877
        return fetchChannels(
405✔
878
                c,
405✔
879
                pendingChannelFilter(false),
405✔
880
                waitingCloseFilter(false),
405✔
881
        )
405✔
882
}
405✔
883

884
// FetchPendingChannels will return channels that have completed the process of
885
// generating and broadcasting funding transactions, but whose funding
886
// transactions have yet to be confirmed on the blockchain.
887
func (c *ChannelStateDB) FetchPendingChannels() ([]*OpenChannel, error) {
78✔
888
        return fetchChannels(c,
78✔
889
                pendingChannelFilter(true),
78✔
890
                waitingCloseFilter(false),
78✔
891
        )
78✔
892
}
78✔
893

894
// FetchWaitingCloseChannels will return all channels that have been opened,
895
// but are now waiting for a closing transaction to be confirmed.
896
//
897
// NOTE: This includes channels that are also pending to be opened.
898
func (c *ChannelStateDB) FetchWaitingCloseChannels() ([]*OpenChannel, error) {
1✔
899
        return fetchChannels(
1✔
900
                c, waitingCloseFilter(true),
1✔
901
        )
1✔
902
}
1✔
903

904
// fetchChannelsFilter applies a filter to channels retrieved in fetchchannels.
905
// A set of filters can be combined to filter across multiple dimensions.
906
type fetchChannelsFilter func(channel *OpenChannel) bool
907

908
// pendingChannelFilter returns a filter based on whether channels are pending
909
// (ie, their funding transaction still needs to confirm). If pending is false,
910
// channels with confirmed funding transactions are returned.
911
func pendingChannelFilter(pending bool) fetchChannelsFilter {
492✔
912
        return func(channel *OpenChannel) bool {
714✔
913
                return channel.IsPending == pending
222✔
914
        }
222✔
915
}
916

917
// waitingCloseFilter returns a filter which filters channels based on whether
918
// they are awaiting the confirmation of their closing transaction. If waiting
919
// close is true, channels that have had their closing tx broadcast are
920
// included. If it is false, channels that are not awaiting confirmation of
921
// their close transaction are returned.
922
func waitingCloseFilter(waitingClose bool) fetchChannelsFilter {
490✔
923
        return func(channel *OpenChannel) bool {
698✔
924
                // If the channel is in any other state than Default,
208✔
925
                // then it means it is waiting to be closed.
208✔
926
                channelWaitingClose :=
208✔
927
                        channel.ChanStatus() != ChanStatusDefault
208✔
928

208✔
929
                // Include the channel if it matches the value for
208✔
930
                // waiting close that we are filtering on.
208✔
931
                return channelWaitingClose == waitingClose
208✔
932
        }
208✔
933
}
934

935
// fetchChannels attempts to retrieve channels currently stored in the
936
// database. It takes a set of filters which are applied to each channel to
937
// obtain a set of channels with the desired set of properties. Only channels
938
// which have a true value returned for *all* of the filters will be returned.
939
// If no filters are provided, every channel in the open channels bucket will
940
// be returned.
941
func fetchChannels(c *ChannelStateDB, filters ...fetchChannelsFilter) (
942
        []*OpenChannel, error) {
1,059✔
943

1,059✔
944
        var channels []*OpenChannel
1,059✔
945

1,059✔
946
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
2,118✔
947
                // Get the bucket dedicated to storing the metadata for open
1,059✔
948
                // channels.
1,059✔
949
                openChanBucket := tx.ReadBucket(openChannelBucket)
1,059✔
950
                if openChanBucket == nil {
1,059✔
951
                        return ErrNoActiveChannels
×
952
                }
×
953

954
                // Next, fetch the bucket dedicated to storing metadata related
955
                // to all nodes. All keys within this bucket are the serialized
956
                // public keys of all our direct counterparties.
957
                nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
1,059✔
958
                if nodeMetaBucket == nil {
1,059✔
959
                        return fmt.Errorf("node bucket not created")
×
960
                }
×
961

962
                // Finally for each node public key in the bucket, fetch all
963
                // the channels related to this particular node.
964
                return nodeMetaBucket.ForEach(func(k, v []byte) error {
1,565✔
965
                        nodeChanBucket := openChanBucket.NestedReadBucket(k)
506✔
966
                        if nodeChanBucket == nil {
506✔
967
                                return nil
×
968
                        }
×
969

970
                        return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
1,012✔
971
                                // If there's a value, it's not a bucket so
506✔
972
                                // ignore it.
506✔
973
                                if v != nil {
506✔
974
                                        return nil
×
975
                                }
×
976

977
                                // If we've found a valid chainhash bucket,
978
                                // then we'll retrieve that so we can extract
979
                                // all the channels.
980
                                chainBucket := nodeChanBucket.NestedReadBucket(
506✔
981
                                        chainHash,
506✔
982
                                )
506✔
983
                                if chainBucket == nil {
506✔
984
                                        return fmt.Errorf("unable to read "+
×
985
                                                "bucket for chain=%x", chainHash[:])
×
986
                                }
×
987

988
                                nodeChans, err := c.fetchNodeChannels(chainBucket)
506✔
989
                                if err != nil {
506✔
990
                                        return fmt.Errorf("unable to read "+
×
991
                                                "channel for chain_hash=%x, "+
×
992
                                                "node_key=%x: %v", chainHash[:], k, err)
×
993
                                }
×
994
                                for _, channel := range nodeChans {
1,049✔
995
                                        // includeChannel indicates whether the channel
543✔
996
                                        // meets the criteria specified by our filters.
543✔
997
                                        includeChannel := true
543✔
998

543✔
999
                                        // Run through each filter and check whether the
543✔
1000
                                        // channel should be included.
543✔
1001
                                        for _, f := range filters {
973✔
1002
                                                // If the channel fails the filter, set
430✔
1003
                                                // includeChannel to false and don't bother
430✔
1004
                                                // checking the remaining filters.
430✔
1005
                                                if !f(channel) {
455✔
1006
                                                        includeChannel = false
25✔
1007
                                                        break
25✔
1008
                                                }
1009
                                        }
1010

1011
                                        // If the channel passed every filter, include it in
1012
                                        // our set of channels.
1013
                                        if includeChannel {
1,061✔
1014
                                                channels = append(channels, channel)
518✔
1015
                                        }
518✔
1016
                                }
1017
                                return nil
506✔
1018
                        })
1019

1020
                })
1021
        }, func() {
1,059✔
1022
                channels = nil
1,059✔
1023
        })
1,059✔
1024
        if err != nil {
1,059✔
1025
                return nil, err
×
1026
        }
×
1027

1028
        return channels, nil
1,059✔
1029
}
1030

1031
// FetchClosedChannels attempts to fetch all closed channels from the database.
1032
// The pendingOnly bool toggles if channels that aren't yet fully closed should
1033
// be returned in the response or not. When a channel was cooperatively closed,
1034
// it becomes fully closed after a single confirmation.  When a channel was
1035
// forcibly closed, it will become fully closed after _all_ the pending funds
1036
// (if any) have been swept.
1037
func (c *ChannelStateDB) FetchClosedChannels(pendingOnly bool) (
1038
        []*ChannelCloseSummary, error) {
500✔
1039

500✔
1040
        var chanSummaries []*ChannelCloseSummary
500✔
1041

500✔
1042
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
1,000✔
1043
                closeBucket := tx.ReadBucket(closedChannelBucket)
500✔
1044
                if closeBucket == nil {
500✔
1045
                        return ErrNoClosedChannels
×
1046
                }
×
1047

1048
                return closeBucket.ForEach(func(chanID []byte, summaryBytes []byte) error {
520✔
1049
                        summaryReader := bytes.NewReader(summaryBytes)
20✔
1050
                        chanSummary, err := deserializeCloseChannelSummary(summaryReader)
20✔
1051
                        if err != nil {
20✔
1052
                                return err
×
1053
                        }
×
1054

1055
                        // If the query specified to only include pending
1056
                        // channels, then we'll skip any channels which aren't
1057
                        // currently pending.
1058
                        if !chanSummary.IsPending && pendingOnly {
21✔
1059
                                return nil
1✔
1060
                        }
1✔
1061

1062
                        chanSummaries = append(chanSummaries, chanSummary)
19✔
1063
                        return nil
19✔
1064
                })
1065
        }, func() {
500✔
1066
                chanSummaries = nil
500✔
1067
        }); err != nil {
500✔
1068
                return nil, err
×
1069
        }
×
1070

1071
        return chanSummaries, nil
500✔
1072
}
1073

1074
// ErrClosedChannelNotFound signals that a closed channel could not be found in
1075
// the channeldb.
1076
var ErrClosedChannelNotFound = errors.New("unable to find closed channel summary")
1077

1078
// FetchClosedChannel queries for a channel close summary using the channel
1079
// point of the channel in question.
1080
func (c *ChannelStateDB) FetchClosedChannel(chanID *wire.OutPoint) (
1081
        *ChannelCloseSummary, error) {
3✔
1082

3✔
1083
        var chanSummary *ChannelCloseSummary
3✔
1084
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
6✔
1085
                closeBucket := tx.ReadBucket(closedChannelBucket)
3✔
1086
                if closeBucket == nil {
3✔
1087
                        return ErrClosedChannelNotFound
×
1088
                }
×
1089

1090
                var b bytes.Buffer
3✔
1091
                var err error
3✔
1092
                if err = writeOutpoint(&b, chanID); err != nil {
3✔
1093
                        return err
×
1094
                }
×
1095

1096
                summaryBytes := closeBucket.Get(b.Bytes())
3✔
1097
                if summaryBytes == nil {
4✔
1098
                        return ErrClosedChannelNotFound
1✔
1099
                }
1✔
1100

1101
                summaryReader := bytes.NewReader(summaryBytes)
2✔
1102
                chanSummary, err = deserializeCloseChannelSummary(summaryReader)
2✔
1103

2✔
1104
                return err
2✔
1105
        }, func() {
3✔
1106
                chanSummary = nil
3✔
1107
        }); err != nil {
4✔
1108
                return nil, err
1✔
1109
        }
1✔
1110

1111
        return chanSummary, nil
2✔
1112
}
1113

1114
// FetchClosedChannelForID queries for a channel close summary using the
1115
// channel ID of the channel in question.
1116
func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) (
1117
        *ChannelCloseSummary, error) {
102✔
1118

102✔
1119
        var chanSummary *ChannelCloseSummary
102✔
1120
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
204✔
1121
                closeBucket := tx.ReadBucket(closedChannelBucket)
102✔
1122
                if closeBucket == nil {
102✔
1123
                        return ErrClosedChannelNotFound
×
1124
                }
×
1125

1126
                // The first 30 bytes of the channel ID and outpoint will be
1127
                // equal.
1128
                cursor := closeBucket.ReadCursor()
102✔
1129
                op, c := cursor.Seek(cid[:30])
102✔
1130

102✔
1131
                // We scan over all possible candidates for this channel ID.
102✔
1132
                for ; op != nil && bytes.Compare(cid[:30], op[:30]) <= 0; op, c = cursor.Next() {
5,354✔
1133
                        var outPoint wire.OutPoint
5,252✔
1134
                        err := readOutpoint(bytes.NewReader(op), &outPoint)
5,252✔
1135
                        if err != nil {
5,252✔
1136
                                return err
×
1137
                        }
×
1138

1139
                        // If the found outpoint does not correspond to this
1140
                        // channel ID, we continue.
1141
                        if !cid.IsChanPoint(&outPoint) {
10,403✔
1142
                                continue
5,151✔
1143
                        }
1144

1145
                        // Deserialize the close summary and return.
1146
                        r := bytes.NewReader(c)
101✔
1147
                        chanSummary, err = deserializeCloseChannelSummary(r)
101✔
1148
                        if err != nil {
101✔
1149
                                return err
×
1150
                        }
×
1151

1152
                        return nil
101✔
1153
                }
1154
                return ErrClosedChannelNotFound
1✔
1155
        }, func() {
102✔
1156
                chanSummary = nil
102✔
1157
        }); err != nil {
103✔
1158
                return nil, err
1✔
1159
        }
1✔
1160

1161
        return chanSummary, nil
101✔
1162
}
1163

1164
// MarkChanFullyClosed marks a channel as fully closed within the database. A
1165
// channel should be marked as fully closed if the channel was initially
1166
// cooperatively closed and it's reached a single confirmation, or after all
1167
// the pending funds in a channel that has been forcibly closed have been
1168
// swept.
1169
func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
7✔
1170
        var (
7✔
1171
                openChannels  []*OpenChannel
7✔
1172
                pruneLinkNode *btcec.PublicKey
7✔
1173
        )
7✔
1174
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
14✔
1175
                var b bytes.Buffer
7✔
1176
                if err := writeOutpoint(&b, chanPoint); err != nil {
7✔
1177
                        return err
×
1178
                }
×
1179

1180
                chanID := b.Bytes()
7✔
1181

7✔
1182
                closedChanBucket, err := tx.CreateTopLevelBucket(
7✔
1183
                        closedChannelBucket,
7✔
1184
                )
7✔
1185
                if err != nil {
7✔
1186
                        return err
×
1187
                }
×
1188

1189
                chanSummaryBytes := closedChanBucket.Get(chanID)
7✔
1190
                if chanSummaryBytes == nil {
7✔
1191
                        return fmt.Errorf("no closed channel for "+
×
1192
                                "chan_point=%v found", chanPoint)
×
1193
                }
×
1194

1195
                chanSummaryReader := bytes.NewReader(chanSummaryBytes)
7✔
1196
                chanSummary, err := deserializeCloseChannelSummary(
7✔
1197
                        chanSummaryReader,
7✔
1198
                )
7✔
1199
                if err != nil {
7✔
1200
                        return err
×
1201
                }
×
1202

1203
                chanSummary.IsPending = false
7✔
1204

7✔
1205
                var newSummary bytes.Buffer
7✔
1206
                err = serializeChannelCloseSummary(&newSummary, chanSummary)
7✔
1207
                if err != nil {
7✔
1208
                        return err
×
1209
                }
×
1210

1211
                err = closedChanBucket.Put(chanID, newSummary.Bytes())
7✔
1212
                if err != nil {
7✔
1213
                        return err
×
1214
                }
×
1215

1216
                // Now that the channel is closed, we'll check if we have any
1217
                // other open channels with this peer. If we don't we'll
1218
                // garbage collect it to ensure we don't establish persistent
1219
                // connections to peers without open channels.
1220
                pruneLinkNode = chanSummary.RemotePub
7✔
1221
                openChannels, err = c.fetchOpenChannels(
7✔
1222
                        tx, pruneLinkNode,
7✔
1223
                )
7✔
1224
                if err != nil {
7✔
1225
                        return fmt.Errorf("unable to fetch open channels for "+
×
1226
                                "peer %x: %v",
×
1227
                                pruneLinkNode.SerializeCompressed(), err)
×
1228
                }
×
1229

1230
                return nil
7✔
1231
        }, func() {
7✔
1232
                openChannels = nil
7✔
1233
                pruneLinkNode = nil
7✔
1234
        })
7✔
1235
        if err != nil {
7✔
1236
                return err
×
1237
        }
×
1238

1239
        // Decide whether we want to remove the link node, based upon the number
1240
        // of still open channels.
1241
        return c.pruneLinkNode(openChannels, pruneLinkNode)
7✔
1242
}
1243

1244
// pruneLinkNode determines whether we should garbage collect a link node from
1245
// the database due to no longer having any open channels with it. If there are
1246
// any left, then this acts as a no-op.
1247
func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
1248
        remotePub *btcec.PublicKey) error {
7✔
1249

7✔
1250
        if len(openChannels) > 0 {
7✔
UNCOV
1251
                return nil
×
UNCOV
1252
        }
×
1253

1254
        log.Infof("Pruning link node %x with zero open channels from database",
7✔
1255
                remotePub.SerializeCompressed())
7✔
1256

7✔
1257
        return c.linkNodeDB.DeleteLinkNode(remotePub)
7✔
1258
}
1259

1260
// PruneLinkNodes attempts to prune all link nodes found within the database
1261
// with whom we no longer have any open channels with.
UNCOV
1262
func (c *ChannelStateDB) PruneLinkNodes() error {
×
UNCOV
1263
        allLinkNodes, err := c.linkNodeDB.FetchAllLinkNodes()
×
UNCOV
1264
        if err != nil {
×
1265
                return err
×
1266
        }
×
1267

UNCOV
1268
        for _, linkNode := range allLinkNodes {
×
UNCOV
1269
                var (
×
UNCOV
1270
                        openChannels []*OpenChannel
×
UNCOV
1271
                        linkNode     = linkNode
×
UNCOV
1272
                )
×
UNCOV
1273
                err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
×
UNCOV
1274
                        var err error
×
UNCOV
1275
                        openChannels, err = c.fetchOpenChannels(
×
UNCOV
1276
                                tx, linkNode.IdentityPub,
×
UNCOV
1277
                        )
×
UNCOV
1278
                        return err
×
UNCOV
1279
                }, func() {
×
UNCOV
1280
                        openChannels = nil
×
UNCOV
1281
                })
×
UNCOV
1282
                if err != nil {
×
1283
                        return err
×
1284
                }
×
1285

UNCOV
1286
                err = c.pruneLinkNode(openChannels, linkNode.IdentityPub)
×
UNCOV
1287
                if err != nil {
×
1288
                        return err
×
1289
                }
×
1290
        }
1291

UNCOV
1292
        return nil
×
1293
}
1294

1295
// ChannelShell is a shell of a channel that is meant to be used for channel
1296
// recovery purposes. It contains a minimal OpenChannel instance along with
1297
// addresses for that target node.
1298
type ChannelShell struct {
1299
        // NodeAddrs the set of addresses that this node has known to be
1300
        // reachable at in the past.
1301
        NodeAddrs []net.Addr
1302

1303
        // Chan is a shell of an OpenChannel, it contains only the items
1304
        // required to restore the channel on disk.
1305
        Chan *OpenChannel
1306
}
1307

1308
// RestoreChannelShells is a method that allows the caller to reconstruct the
1309
// state of an OpenChannel from the ChannelShell. We'll attempt to write the
1310
// new channel to disk, create a LinkNode instance with the passed node
1311
// addresses, and finally create an edge within the graph for the channel as
1312
// well. This method is idempotent, so repeated calls with the same set of
1313
// channel shells won't modify the database after the initial call.
1314
func (c *ChannelStateDB) RestoreChannelShells(channelShells ...*ChannelShell) error {
1✔
1315
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
2✔
1316
                for _, channelShell := range channelShells {
2✔
1317
                        channel := channelShell.Chan
1✔
1318

1✔
1319
                        // When we make a channel, we mark that the channel has
1✔
1320
                        // been restored, this will signal to other sub-systems
1✔
1321
                        // to not attempt to use the channel as if it was a
1✔
1322
                        // regular one.
1✔
1323
                        channel.chanStatus |= ChanStatusRestored
1✔
1324

1✔
1325
                        // First, we'll attempt to create a new open channel
1✔
1326
                        // and link node for this channel. If the channel
1✔
1327
                        // already exists, then in order to ensure this method
1✔
1328
                        // is idempotent, we'll continue to the next step.
1✔
1329
                        channel.Db = c
1✔
1330
                        err := syncNewChannel(
1✔
1331
                                tx, channel, channelShell.NodeAddrs,
1✔
1332
                        )
1✔
1333
                        if err != nil {
1✔
UNCOV
1334
                                return err
×
UNCOV
1335
                        }
×
1336
                }
1337

1338
                return nil
1✔
1339
        }, func() {})
1✔
1340
        if err != nil {
1✔
UNCOV
1341
                return err
×
UNCOV
1342
        }
×
1343

1344
        return nil
1✔
1345
}
1346

1347
// AddrsForNode consults the graph and channel database for all addresses known
1348
// to the passed node public key.
1349
func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr,
1350
        error) {
1✔
1351

1✔
1352
        linkNode, err := d.channelStateDB.linkNodeDB.FetchLinkNode(nodePub)
1✔
1353
        if err != nil {
1✔
1354
                return nil, err
×
1355
        }
×
1356

1357
        // We'll also query the graph for this peer to see if they have any
1358
        // addresses that we don't currently have stored within the link node
1359
        // database.
1360
        pubKey, err := route.NewVertexFromBytes(nodePub.SerializeCompressed())
1✔
1361
        if err != nil {
1✔
1362
                return nil, err
×
1363
        }
×
1364
        graphNode, err := d.graph.FetchLightningNode(pubKey)
1✔
1365
        if err != nil && err != ErrGraphNodeNotFound {
1✔
1366
                return nil, err
×
1367
        } else if err == ErrGraphNodeNotFound {
1✔
UNCOV
1368
                // If the node isn't found, then that's OK, as we still have the
×
UNCOV
1369
                // link node data. But any other error needs to be returned.
×
UNCOV
1370
                graphNode = &LightningNode{}
×
UNCOV
1371
        }
×
1372

1373
        // Now that we have both sources of addrs for this node, we'll use a
1374
        // map to de-duplicate any addresses between the two sources, and
1375
        // produce a final list of the combined addrs.
1376
        addrs := make(map[string]net.Addr)
1✔
1377
        for _, addr := range linkNode.Addresses {
2✔
1378
                addrs[addr.String()] = addr
1✔
1379
        }
1✔
1380
        for _, addr := range graphNode.Addresses {
2✔
1381
                addrs[addr.String()] = addr
1✔
1382
        }
1✔
1383
        dedupedAddrs := make([]net.Addr, 0, len(addrs))
1✔
1384
        for _, addr := range addrs {
3✔
1385
                dedupedAddrs = append(dedupedAddrs, addr)
2✔
1386
        }
2✔
1387

1388
        return dedupedAddrs, nil
1✔
1389
}
1390

1391
// AbandonChannel attempts to remove the target channel from the open channel
1392
// database. If the channel was already removed (has a closed channel entry),
1393
// then we'll return a nil error. Otherwise, we'll insert a new close summary
1394
// into the database.
1395
func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint,
1396
        bestHeight uint32) error {
4✔
1397

4✔
1398
        // With the chanPoint constructed, we'll attempt to find the target
4✔
1399
        // channel in the database. If we can't find the channel, then we'll
4✔
1400
        // return the error back to the caller.
4✔
1401
        dbChan, err := c.FetchChannel(nil, *chanPoint)
4✔
1402
        switch {
4✔
1403
        // If the channel wasn't found, then it's possible that it was already
1404
        // abandoned from the database.
1405
        case err == ErrChannelNotFound:
2✔
1406
                _, closedErr := c.FetchClosedChannel(chanPoint)
2✔
1407
                if closedErr != nil {
3✔
1408
                        return closedErr
1✔
1409
                }
1✔
1410

1411
                // If the channel was already closed, then we don't return an
1412
                // error as we'd like this step to be repeatable.
1413
                return nil
1✔
1414
        case err != nil:
×
1415
                return err
×
1416
        }
1417

1418
        // Now that we've found the channel, we'll populate a close summary for
1419
        // the channel, so we can store as much information for this abounded
1420
        // channel as possible. We also ensure that we set Pending to false, to
1421
        // indicate that this channel has been "fully" closed.
1422
        summary := &ChannelCloseSummary{
2✔
1423
                CloseType:               Abandoned,
2✔
1424
                ChanPoint:               *chanPoint,
2✔
1425
                ChainHash:               dbChan.ChainHash,
2✔
1426
                CloseHeight:             bestHeight,
2✔
1427
                RemotePub:               dbChan.IdentityPub,
2✔
1428
                Capacity:                dbChan.Capacity,
2✔
1429
                SettledBalance:          dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
2✔
1430
                ShortChanID:             dbChan.ShortChanID(),
2✔
1431
                RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
2✔
1432
                RemoteNextRevocation:    dbChan.RemoteNextRevocation,
2✔
1433
                LocalChanConfig:         dbChan.LocalChanCfg,
2✔
1434
        }
2✔
1435

2✔
1436
        // Finally, we'll close the channel in the DB, and return back to the
2✔
1437
        // caller. We set ourselves as the close initiator because we abandoned
2✔
1438
        // the channel.
2✔
1439
        return dbChan.CloseChannel(summary, ChanStatusLocalCloseInitiator)
2✔
1440
}
1441

1442
// SaveChannelOpeningState saves the serialized channel state for the provided
1443
// chanPoint to the channelOpeningStateBucket.
1444
func (c *ChannelStateDB) SaveChannelOpeningState(outPoint,
1445
        serializedState []byte) error {
92✔
1446

92✔
1447
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
184✔
1448
                bucket, err := tx.CreateTopLevelBucket(channelOpeningStateBucket)
92✔
1449
                if err != nil {
92✔
1450
                        return err
×
1451
                }
×
1452

1453
                return bucket.Put(outPoint, serializedState)
92✔
1454
        }, func() {})
92✔
1455
}
1456

1457
// GetChannelOpeningState fetches the serialized channel state for the provided
1458
// outPoint from the database, or returns ErrChannelNotFound if the channel
1459
// is not found.
1460
func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte,
1461
        error) {
252✔
1462

252✔
1463
        var serializedState []byte
252✔
1464
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
504✔
1465
                bucket := tx.ReadBucket(channelOpeningStateBucket)
252✔
1466
                if bucket == nil {
252✔
UNCOV
1467
                        // If the bucket does not exist, it means we never added
×
UNCOV
1468
                        //  a channel to the db, so return ErrChannelNotFound.
×
UNCOV
1469
                        return ErrChannelNotFound
×
UNCOV
1470
                }
×
1471

1472
                stateBytes := bucket.Get(outPoint)
252✔
1473
                if stateBytes == nil {
299✔
1474
                        return ErrChannelNotFound
47✔
1475
                }
47✔
1476

1477
                serializedState = append(serializedState, stateBytes...)
205✔
1478

205✔
1479
                return nil
205✔
1480
        }, func() {
252✔
1481
                serializedState = nil
252✔
1482
        })
252✔
1483
        return serializedState, err
252✔
1484
}
1485

1486
// DeleteChannelOpeningState removes any state for outPoint from the database.
1487
func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
24✔
1488
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
48✔
1489
                bucket := tx.ReadWriteBucket(channelOpeningStateBucket)
24✔
1490
                if bucket == nil {
24✔
1491
                        return ErrChannelNotFound
×
1492
                }
×
1493

1494
                return bucket.Delete(outPoint)
24✔
1495
        }, func() {})
24✔
1496
}
1497

1498
// syncVersions function is used for safe db version synchronization. It
1499
// applies migration functions to the current database and recovers the
1500
// previous state of db if at least one error/panic appeared during migration.
1501
func (d *DB) syncVersions(versions []mandatoryVersion) error {
1,742✔
1502
        meta, err := d.FetchMeta()
1,742✔
1503
        if err != nil {
1,742✔
1504
                if err == ErrMetaNotFound {
×
1505
                        meta = &Meta{}
×
1506
                } else {
×
1507
                        return err
×
1508
                }
×
1509
        }
1510

1511
        latestVersion := getLatestDBVersion(versions)
1,742✔
1512
        log.Infof("Checking for schema update: latest_version=%v, "+
1,742✔
1513
                "db_version=%v", latestVersion, meta.DbVersionNumber)
1,742✔
1514

1,742✔
1515
        switch {
1,742✔
1516

1517
        // If the database reports a higher version that we are aware of, the
1518
        // user is probably trying to revert to a prior version of lnd. We fail
1519
        // here to prevent reversions and unintended corruption.
1520
        case meta.DbVersionNumber > latestVersion:
1✔
1521
                log.Errorf("Refusing to revert from db_version=%d to "+
1✔
1522
                        "lower version=%d", meta.DbVersionNumber,
1✔
1523
                        latestVersion)
1✔
1524
                return ErrDBReversion
1✔
1525

1526
        // If the current database version matches the latest version number,
1527
        // then we don't need to perform any migrations.
1528
        case meta.DbVersionNumber == latestVersion:
1,737✔
1529
                return nil
1,737✔
1530
        }
1531

1532
        log.Infof("Performing database schema migration")
4✔
1533

4✔
1534
        // Otherwise, we fetch the migrations which need to applied, and
4✔
1535
        // execute them serially within a single database transaction to ensure
4✔
1536
        // the migration is atomic.
4✔
1537
        migrations, migrationVersions := getMigrationsToApply(
4✔
1538
                versions, meta.DbVersionNumber,
4✔
1539
        )
4✔
1540
        return kvdb.Update(d, func(tx kvdb.RwTx) error {
8✔
1541
                for i, migration := range migrations {
8✔
1542
                        if migration == nil {
4✔
1543
                                continue
×
1544
                        }
1545

1546
                        log.Infof("Applying migration #%v",
4✔
1547
                                migrationVersions[i])
4✔
1548

4✔
1549
                        if err := migration(tx); err != nil {
5✔
1550
                                log.Infof("Unable to apply migration #%v",
1✔
1551
                                        migrationVersions[i])
1✔
1552
                                return err
1✔
1553
                        }
1✔
1554
                }
1555

1556
                meta.DbVersionNumber = latestVersion
2✔
1557
                err := putMeta(meta, tx)
2✔
1558
                if err != nil {
2✔
1559
                        return err
×
1560
                }
×
1561

1562
                // In dry-run mode, return an error to prevent the transaction
1563
                // from committing.
1564
                if d.dryRun {
3✔
1565
                        return ErrDryRunMigrationOK
1✔
1566
                }
1✔
1567

1568
                return nil
1✔
1569
        }, func() {})
4✔
1570
}
1571

1572
// applyOptionalVersions takes a config to determine whether the optional
1573
// migrations will be applied.
1574
//
1575
// NOTE: only support the prune_revocation_log optional migration atm.
1576
func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error {
1,740✔
1577
        // TODO(yy): need to design the db to support dry run for optional
1,740✔
1578
        // migrations.
1,740✔
1579
        if d.dryRun {
1,741✔
1580
                log.Info("Skipped optional migrations as dry run mode is not " +
1✔
1581
                        "supported yet")
1✔
1582
                return nil
1✔
1583
        }
1✔
1584

1585
        om, err := d.fetchOptionalMeta()
1,739✔
1586
        if err != nil {
1,739✔
1587
                if err == ErrMetaNotFound {
×
1588
                        om = &OptionalMeta{
×
1589
                                Versions: make(map[uint64]string),
×
1590
                        }
×
1591
                } else {
×
1592
                        return err
×
1593
                }
×
1594
        }
1595

1596
        log.Infof("Checking for optional update: prune_revocation_log=%v, "+
1,739✔
1597
                "db_version=%s", cfg.PruneRevocationLog, om)
1,739✔
1598

1,739✔
1599
        // Exit early if the optional migration is not specified.
1,739✔
1600
        if !cfg.PruneRevocationLog {
3,476✔
1601
                return nil
1,737✔
1602
        }
1,737✔
1603

1604
        // Exit early if the optional migration has already been applied.
1605
        if _, ok := om.Versions[0]; ok {
3✔
1606
                return nil
1✔
1607
        }
1✔
1608

1609
        // Get the optional version.
1610
        version := optionalVersions[0]
1✔
1611
        log.Infof("Performing database optional migration: %s", version.name)
1✔
1612

1✔
1613
        migrationCfg := &MigrationConfigImpl{
1✔
1614
                migration30.MigrateRevLogConfigImpl{
1✔
1615
                        NoAmountData: d.noRevLogAmtData,
1✔
1616
                },
1✔
1617
        }
1✔
1618

1✔
1619
        // Migrate the data.
1✔
1620
        if err := version.migration(d, migrationCfg); err != nil {
1✔
1621
                log.Errorf("Unable to apply optional migration: %s, error: %v",
×
1622
                        version.name, err)
×
1623
                return err
×
1624
        }
×
1625

1626
        // Update the optional meta. Notice that unlike the mandatory db
1627
        // migrations where we perform the migration and updating meta in a
1628
        // single db transaction, we use different transactions here. Even when
1629
        // the following update is failed, we should be fine here as we would
1630
        // re-run the optional migration again, which is a noop, during next
1631
        // startup.
1632
        om.Versions[0] = version.name
1✔
1633
        if err := d.putOptionalMeta(om); err != nil {
1✔
1634
                log.Errorf("Unable to update optional meta: %v", err)
×
1635
                return err
×
1636
        }
×
1637

1638
        return nil
1✔
1639
}
1640

1641
// ChannelGraph returns the current instance of the directed channel graph.
1642
func (d *DB) ChannelGraph() *ChannelGraph {
34✔
1643
        return d.graph
34✔
1644
}
34✔
1645

1646
// ChannelStateDB returns the sub database that is concerned with the channel
1647
// state.
1648
func (d *DB) ChannelStateDB() *ChannelStateDB {
2,157✔
1649
        return d.channelStateDB
2,157✔
1650
}
2,157✔
1651

1652
// LatestDBVersion returns the number of the latest database version currently
1653
// known to the channel DB.
1654
func LatestDBVersion() uint32 {
1✔
1655
        return getLatestDBVersion(dbVersions)
1✔
1656
}
1✔
1657

1658
func getLatestDBVersion(versions []mandatoryVersion) uint32 {
3,447✔
1659
        return versions[len(versions)-1].number
3,447✔
1660
}
3,447✔
1661

1662
// getMigrationsToApply retrieves the migration function that should be
1663
// applied to the database.
1664
func getMigrationsToApply(versions []mandatoryVersion,
1665
        version uint32) ([]migration, []uint32) {
5✔
1666

5✔
1667
        migrations := make([]migration, 0, len(versions))
5✔
1668
        migrationVersions := make([]uint32, 0, len(versions))
5✔
1669

5✔
1670
        for _, v := range versions {
17✔
1671
                if v.number > version {
18✔
1672
                        migrations = append(migrations, v.migration)
6✔
1673
                        migrationVersions = append(migrationVersions, v.number)
6✔
1674
                }
6✔
1675
        }
1676

1677
        return migrations, migrationVersions
5✔
1678
}
1679

1680
// fetchHistoricalChanBucket returns a the channel bucket for a given outpoint
1681
// from the historical channel bucket. If the bucket does not exist,
1682
// ErrNoHistoricalBucket is returned.
1683
func fetchHistoricalChanBucket(tx kvdb.RTx,
1684
        outPoint *wire.OutPoint) (kvdb.RBucket, error) {
4✔
1685

4✔
1686
        // First fetch the top level bucket which stores all data related to
4✔
1687
        // historically stored channels.
4✔
1688
        historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
4✔
1689
        if historicalChanBucket == nil {
4✔
1690
                return nil, ErrNoHistoricalBucket
×
1691
        }
×
1692

1693
        // With the bucket for the node and chain fetched, we can now go down
1694
        // another level, for the channel itself.
1695
        var chanPointBuf bytes.Buffer
4✔
1696
        if err := writeOutpoint(&chanPointBuf, outPoint); err != nil {
4✔
1697
                return nil, err
×
1698
        }
×
1699
        chanBucket := historicalChanBucket.NestedReadBucket(
4✔
1700
                chanPointBuf.Bytes(),
4✔
1701
        )
4✔
1702
        if chanBucket == nil {
6✔
1703
                return nil, ErrChannelNotFound
2✔
1704
        }
2✔
1705

1706
        return chanBucket, nil
2✔
1707
}
1708

1709
// FetchHistoricalChannel fetches open channel data from the historical channel
1710
// bucket.
1711
func (c *ChannelStateDB) FetchHistoricalChannel(outPoint *wire.OutPoint) (
1712
        *OpenChannel, error) {
4✔
1713

4✔
1714
        var channel *OpenChannel
4✔
1715
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
8✔
1716
                chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
4✔
1717
                if err != nil {
6✔
1718
                        return err
2✔
1719
                }
2✔
1720

1721
                channel, err = fetchOpenChannel(chanBucket, outPoint)
2✔
1722
                if err != nil {
2✔
1723
                        return err
×
1724
                }
×
1725

1726
                channel.Db = c
2✔
1727
                return nil
2✔
1728
        }, func() {
4✔
1729
                channel = nil
4✔
1730
        })
4✔
1731
        if err != nil {
6✔
1732
                return nil, err
2✔
1733
        }
2✔
1734

1735
        return channel, nil
2✔
1736
}
1737

1738
func fetchFinalHtlcsBucket(tx kvdb.RTx,
1739
        chanID lnwire.ShortChannelID) (kvdb.RBucket, error) {
10✔
1740

10✔
1741
        finalHtlcsBucket := tx.ReadBucket(finalHtlcsBucket)
10✔
1742
        if finalHtlcsBucket == nil {
16✔
1743
                return nil, ErrFinalHtlcsBucketNotFound
6✔
1744
        }
6✔
1745

1746
        var chanIDBytes [8]byte
4✔
1747
        byteOrder.PutUint64(chanIDBytes[:], chanID.ToUint64())
4✔
1748

4✔
1749
        chanBucket := finalHtlcsBucket.NestedReadBucket(chanIDBytes[:])
4✔
1750
        if chanBucket == nil {
4✔
1751
                return nil, ErrFinalChannelBucketNotFound
×
1752
        }
×
1753

1754
        return chanBucket, nil
4✔
1755
}
1756

1757
var ErrHtlcUnknown = errors.New("htlc unknown")
1758

1759
// LookupFinalHtlc retrieves a final htlc resolution from the database. If the
1760
// htlc has no final resolution yet, ErrHtlcUnknown is returned.
1761
func (c *ChannelStateDB) LookupFinalHtlc(chanID lnwire.ShortChannelID,
1762
        htlcIndex uint64) (*FinalHtlcInfo, error) {
10✔
1763

10✔
1764
        var idBytes [8]byte
10✔
1765
        byteOrder.PutUint64(idBytes[:], htlcIndex)
10✔
1766

10✔
1767
        var settledByte byte
10✔
1768

10✔
1769
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
20✔
1770
                finalHtlcsBucket, err := fetchFinalHtlcsBucket(
10✔
1771
                        tx, chanID,
10✔
1772
                )
10✔
1773
                switch {
10✔
1774
                case errors.Is(err, ErrFinalHtlcsBucketNotFound):
6✔
1775
                        fallthrough
6✔
1776

1777
                case errors.Is(err, ErrFinalChannelBucketNotFound):
6✔
1778
                        return ErrHtlcUnknown
6✔
1779

1780
                case err != nil:
×
1781
                        return fmt.Errorf("cannot fetch final htlcs bucket: %w",
×
1782
                                err)
×
1783
                }
1784

1785
                value := finalHtlcsBucket.Get(idBytes[:])
4✔
1786
                if value == nil {
5✔
1787
                        return ErrHtlcUnknown
1✔
1788
                }
1✔
1789

1790
                if len(value) != 1 {
3✔
1791
                        return errors.New("unexpected final htlc value length")
×
1792
                }
×
1793

1794
                settledByte = value[0]
3✔
1795

3✔
1796
                return nil
3✔
1797
        }, func() {
10✔
1798
                settledByte = 0
10✔
1799
        })
10✔
1800
        if err != nil {
17✔
1801
                return nil, err
7✔
1802
        }
7✔
1803

1804
        info := FinalHtlcInfo{
3✔
1805
                Settled:  settledByte&byte(FinalHtlcSettledBit) != 0,
3✔
1806
                Offchain: settledByte&byte(FinalHtlcOffchainBit) != 0,
3✔
1807
        }
3✔
1808

3✔
1809
        return &info, nil
3✔
1810
}
1811

1812
// PutOnchainFinalHtlcOutcome stores the final on-chain outcome of an htlc in
1813
// the database.
1814
func (c *ChannelStateDB) PutOnchainFinalHtlcOutcome(
1815
        chanID lnwire.ShortChannelID, htlcID uint64, settled bool) error {
1✔
1816

1✔
1817
        // Skip if the user did not opt in to storing final resolutions.
1✔
1818
        if !c.parent.storeFinalHtlcResolutions {
1✔
UNCOV
1819
                return nil
×
UNCOV
1820
        }
×
1821

1822
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
2✔
1823
                finalHtlcsBucket, err := fetchFinalHtlcsBucketRw(tx, chanID)
1✔
1824
                if err != nil {
1✔
1825
                        return err
×
1826
                }
×
1827

1828
                return putFinalHtlc(
1✔
1829
                        finalHtlcsBucket, htlcID,
1✔
1830
                        FinalHtlcInfo{
1✔
1831
                                Settled:  settled,
1✔
1832
                                Offchain: false,
1✔
1833
                        },
1✔
1834
                )
1✔
1835
        }, func() {})
1✔
1836
}
1837

1838
// MakeTestInvoiceDB is used to create a test invoice database for testing
1839
// purposes. It simply calls into MakeTestDB so the same modifiers can be used.
1840
func MakeTestInvoiceDB(t *testing.T, modifiers ...OptionModifier) (
1841
        invoices.InvoiceDB, error) {
151✔
1842

151✔
1843
        return MakeTestDB(t, modifiers...)
151✔
1844
}
151✔
1845

1846
// MakeTestDB creates a new instance of the ChannelDB for testing purposes.
1847
// A callback which cleans up the created temporary directories is also
1848
// returned and intended to be executed after the test completes.
1849
func MakeTestDB(t *testing.T, modifiers ...OptionModifier) (*DB, error) {
283✔
1850
        // First, create a temporary directory to be used for the duration of
283✔
1851
        // this test.
283✔
1852
        tempDirName := t.TempDir()
283✔
1853

283✔
1854
        // Next, create channeldb for the first time.
283✔
1855
        backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cdb")
283✔
1856
        if err != nil {
283✔
1857
                backendCleanup()
×
1858
                return nil, err
×
1859
        }
×
1860

1861
        cdb, err := CreateWithBackend(backend, modifiers...)
283✔
1862
        if err != nil {
283✔
1863
                backendCleanup()
×
1864
                return nil, err
×
1865
        }
×
1866

1867
        t.Cleanup(func() {
566✔
1868
                cdb.Close()
283✔
1869
                backendCleanup()
283✔
1870
        })
283✔
1871

1872
        return cdb, nil
283✔
1873
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc