• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11170835610

03 Oct 2024 10:41PM UTC coverage: 49.188% (-9.6%) from 58.738%
11170835610

push

github

web-flow
Merge pull request #9154 from ziggie1984/master

multi: bump btcd version.

3 of 6 new or added lines in 6 files covered. (50.0%)

26110 existing lines in 428 files now uncovered.

97359 of 197934 relevant lines covered (49.19%)

1.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.58
/channeldb/db.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "fmt"
7
        "net"
8
        "os"
9
        "testing"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/go-errors/errors"
15
        mig "github.com/lightningnetwork/lnd/channeldb/migration"
16
        "github.com/lightningnetwork/lnd/channeldb/migration12"
17
        "github.com/lightningnetwork/lnd/channeldb/migration13"
18
        "github.com/lightningnetwork/lnd/channeldb/migration16"
19
        "github.com/lightningnetwork/lnd/channeldb/migration20"
20
        "github.com/lightningnetwork/lnd/channeldb/migration21"
21
        "github.com/lightningnetwork/lnd/channeldb/migration23"
22
        "github.com/lightningnetwork/lnd/channeldb/migration24"
23
        "github.com/lightningnetwork/lnd/channeldb/migration25"
24
        "github.com/lightningnetwork/lnd/channeldb/migration26"
25
        "github.com/lightningnetwork/lnd/channeldb/migration27"
26
        "github.com/lightningnetwork/lnd/channeldb/migration29"
27
        "github.com/lightningnetwork/lnd/channeldb/migration30"
28
        "github.com/lightningnetwork/lnd/channeldb/migration31"
29
        "github.com/lightningnetwork/lnd/channeldb/migration32"
30
        "github.com/lightningnetwork/lnd/channeldb/migration33"
31
        "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
32
        "github.com/lightningnetwork/lnd/clock"
33
        "github.com/lightningnetwork/lnd/invoices"
34
        "github.com/lightningnetwork/lnd/kvdb"
35
        "github.com/lightningnetwork/lnd/lnwire"
36
        "github.com/lightningnetwork/lnd/routing/route"
37
)
38

39
const (
40
        dbName = "channel.db"
41
)
42

43
var (
44
        // ErrDryRunMigrationOK signals that a migration executed successful,
45
        // but we intentionally did not commit the result.
46
        ErrDryRunMigrationOK = errors.New("dry run migration successful")
47

48
        // ErrFinalHtlcsBucketNotFound signals that the top-level final htlcs
49
        // bucket does not exist.
50
        ErrFinalHtlcsBucketNotFound = errors.New("final htlcs bucket not " +
51
                "found")
52

53
        // ErrFinalChannelBucketNotFound signals that the channel bucket for
54
        // final htlc outcomes does not exist.
55
        ErrFinalChannelBucketNotFound = errors.New("final htlcs channel " +
56
                "bucket not found")
57
)
58

59
// migration is a function which takes a prior outdated version of the database
60
// instances and mutates the key/bucket structure to arrive at a more
61
// up-to-date version of the database.
62
type migration func(tx kvdb.RwTx) error
63

64
// mandatoryVersion defines a db version that must be applied before the lnd
65
// starts.
66
type mandatoryVersion struct {
67
        number    uint32
68
        migration migration
69
}
70

71
// MigrationConfig is an interface combines the config interfaces of all
72
// optional migrations.
73
type MigrationConfig interface {
74
        migration30.MigrateRevLogConfig
75
}
76

77
// MigrationConfigImpl is a super set of all the various migration configs and
78
// an implementation of MigrationConfig.
79
type MigrationConfigImpl struct {
80
        migration30.MigrateRevLogConfigImpl
81
}
82

83
// optionalMigration defines an optional migration function. When a migration
84
// is optional, it usually involves a large scale of changes that might touch
85
// millions of keys. Due to OOM concern, the update cannot be safely done
86
// within one db transaction. Thus, for optional migrations, they must take the
87
// db backend and construct transactions as needed.
88
type optionalMigration func(db kvdb.Backend, cfg MigrationConfig) error
89

90
// optionalVersion defines a db version that can be optionally applied. When
91
// applying migrations, we must apply all the mandatory migrations first before
92
// attempting optional ones.
93
type optionalVersion struct {
94
        name      string
95
        migration optionalMigration
96
}
97

98
var (
99
        // dbVersions is storing all mandatory versions of database. If current
100
        // version of database don't match with latest version this list will
101
        // be used for retrieving all migration function that are need to apply
102
        // to the current db.
103
        dbVersions = []mandatoryVersion{
104
                {
105
                        // The base DB version requires no migration.
106
                        number:    0,
107
                        migration: nil,
108
                },
109
                {
110
                        // The version of the database where two new indexes
111
                        // for the update time of node and channel updates were
112
                        // added.
113
                        number:    1,
114
                        migration: migration_01_to_11.MigrateNodeAndEdgeUpdateIndex,
115
                },
116
                {
117
                        // The DB version that added the invoice event time
118
                        // series.
119
                        number:    2,
120
                        migration: migration_01_to_11.MigrateInvoiceTimeSeries,
121
                },
122
                {
123
                        // The DB version that updated the embedded invoice in
124
                        // outgoing payments to match the new format.
125
                        number:    3,
126
                        migration: migration_01_to_11.MigrateInvoiceTimeSeriesOutgoingPayments,
127
                },
128
                {
129
                        // The version of the database where every channel
130
                        // always has two entries in the edges bucket. If
131
                        // a policy is unknown, this will be represented
132
                        // by a special byte sequence.
133
                        number:    4,
134
                        migration: migration_01_to_11.MigrateEdgePolicies,
135
                },
136
                {
137
                        // The DB version where we persist each attempt to send
138
                        // an HTLC to a payment hash, and track whether the
139
                        // payment is in-flight, succeeded, or failed.
140
                        number:    5,
141
                        migration: migration_01_to_11.PaymentStatusesMigration,
142
                },
143
                {
144
                        // The DB version that properly prunes stale entries
145
                        // from the edge update index.
146
                        number:    6,
147
                        migration: migration_01_to_11.MigratePruneEdgeUpdateIndex,
148
                },
149
                {
150
                        // The DB version that migrates the ChannelCloseSummary
151
                        // to a format where optional fields are indicated with
152
                        // boolean flags.
153
                        number:    7,
154
                        migration: migration_01_to_11.MigrateOptionalChannelCloseSummaryFields,
155
                },
156
                {
157
                        // The DB version that changes the gossiper's message
158
                        // store keys to account for the message's type and
159
                        // ShortChannelID.
160
                        number:    8,
161
                        migration: migration_01_to_11.MigrateGossipMessageStoreKeys,
162
                },
163
                {
164
                        // The DB version where the payments and payment
165
                        // statuses are moved to being stored in a combined
166
                        // bucket.
167
                        number:    9,
168
                        migration: migration_01_to_11.MigrateOutgoingPayments,
169
                },
170
                {
171
                        // The DB version where we started to store legacy
172
                        // payload information for all routes, as well as the
173
                        // optional TLV records.
174
                        number:    10,
175
                        migration: migration_01_to_11.MigrateRouteSerialization,
176
                },
177
                {
178
                        // Add invoice htlc and cltv delta fields.
179
                        number:    11,
180
                        migration: migration_01_to_11.MigrateInvoices,
181
                },
182
                {
183
                        // Migrate to TLV invoice bodies, add payment address
184
                        // and features, remove receipt.
185
                        number:    12,
186
                        migration: migration12.MigrateInvoiceTLV,
187
                },
188
                {
189
                        // Migrate to multi-path payments.
190
                        number:    13,
191
                        migration: migration13.MigrateMPP,
192
                },
193
                {
194
                        // Initialize payment address index and begin using it
195
                        // as the default index, falling back to payment hash
196
                        // index.
197
                        number:    14,
198
                        migration: mig.CreateTLB(payAddrIndexBucket),
199
                },
200
                {
201
                        // Initialize payment index bucket which will be used
202
                        // to index payments by sequence number. This index will
203
                        // be used to allow more efficient ListPayments queries.
204
                        number:    15,
205
                        migration: mig.CreateTLB(paymentsIndexBucket),
206
                },
207
                {
208
                        // Add our existing payments to the index bucket created
209
                        // in migration 15.
210
                        number:    16,
211
                        migration: migration16.MigrateSequenceIndex,
212
                },
213
                {
214
                        // Create a top level bucket which will store extra
215
                        // information about channel closes.
216
                        number:    17,
217
                        migration: mig.CreateTLB(closeSummaryBucket),
218
                },
219
                {
220
                        // Create a top level bucket which holds information
221
                        // about our peers.
222
                        number:    18,
223
                        migration: mig.CreateTLB(peersBucket),
224
                },
225
                {
226
                        // Create a top level bucket which holds outpoint
227
                        // information.
228
                        number:    19,
229
                        migration: mig.CreateTLB(outpointBucket),
230
                },
231
                {
232
                        // Migrate some data to the outpoint index.
233
                        number:    20,
234
                        migration: migration20.MigrateOutpointIndex,
235
                },
236
                {
237
                        // Migrate to length prefixed wire messages everywhere
238
                        // in the database.
239
                        number:    21,
240
                        migration: migration21.MigrateDatabaseWireMessages,
241
                },
242
                {
243
                        // Initialize set id index so that invoices can be
244
                        // queried by individual htlc sets.
245
                        number:    22,
246
                        migration: mig.CreateTLB(setIDIndexBucket),
247
                },
248
                {
249
                        number:    23,
250
                        migration: migration23.MigrateHtlcAttempts,
251
                },
252
                {
253
                        // Remove old forwarding packages of closed channels.
254
                        number:    24,
255
                        migration: migration24.MigrateFwdPkgCleanup,
256
                },
257
                {
258
                        // Save the initial local/remote balances in channel
259
                        // info.
260
                        number:    25,
261
                        migration: migration25.MigrateInitialBalances,
262
                },
263
                {
264
                        // Migrate the initial local/remote balance fields into
265
                        // tlv records.
266
                        number:    26,
267
                        migration: migration26.MigrateBalancesToTlvRecords,
268
                },
269
                {
270
                        // Patch the initial local/remote balance fields with
271
                        // empty values for historical channels.
272
                        number:    27,
273
                        migration: migration27.MigrateHistoricalBalances,
274
                },
275
                {
276
                        number:    28,
277
                        migration: mig.CreateTLB(chanIDBucket),
278
                },
279
                {
280
                        number:    29,
281
                        migration: migration29.MigrateChanID,
282
                },
283
                {
284
                        // Removes the "sweeper-last-tx" bucket. Although we
285
                        // do not have a mandatory version 30 we skip this
286
                        // version because its naming is already used for the
287
                        // first optional migration.
288
                        number:    31,
289
                        migration: migration31.DeleteLastPublishedTxTLB,
290
                },
291
                {
292
                        number:    32,
293
                        migration: migration32.MigrateMCRouteSerialisation,
294
                },
295
                {
296
                        number:    33,
297
                        migration: migration33.MigrateMCStoreNameSpacedResults,
298
                },
299
        }
300

301
        // optionalVersions stores all optional migrations that are applied
302
        // after dbVersions.
303
        //
304
        // NOTE: optional migrations must be fault-tolerant and re-run already
305
        // migrated data must be noop, which means the migration must be able
306
        // to determine its state.
307
        optionalVersions = []optionalVersion{
308
                {
309
                        name: "prune revocation log",
310
                        migration: func(db kvdb.Backend,
311
                                cfg MigrationConfig) error {
×
312

×
313
                                return migration30.MigrateRevocationLog(db, cfg)
×
314
                        },
×
315
                },
316
        }
317

318
        // Big endian is the preferred byte order, due to cursor scans over
319
        // integer keys iterating in order.
320
        byteOrder = binary.BigEndian
321

322
        // channelOpeningStateBucket is the database bucket used to store the
323
        // channelOpeningState for each channel that is currently in the process
324
        // of being opened.
325
        channelOpeningStateBucket = []byte("channelOpeningState")
326
)
327

328
// DB is the primary datastore for the lnd daemon. The database stores
329
// information related to nodes, routing data, open/closed channels, fee
330
// schedules, and reputation data.
331
type DB struct {
332
        kvdb.Backend
333

334
        // channelStateDB separates all DB operations on channel state.
335
        channelStateDB *ChannelStateDB
336

337
        dbPath                    string
338
        graph                     *ChannelGraph
339
        clock                     clock.Clock
340
        dryRun                    bool
341
        keepFailedPaymentAttempts bool
342
        storeFinalHtlcResolutions bool
343

344
        // noRevLogAmtData if true, means that commitment transaction amount
345
        // data should not be stored in the revocation log.
346
        noRevLogAmtData bool
347
}
348

349
// Open opens or creates channeldb. Any necessary schemas migrations due
350
// to updates will take place as necessary.
351
// TODO(bhandras): deprecate this function.
UNCOV
352
func Open(dbPath string, modifiers ...OptionModifier) (*DB, error) {
×
UNCOV
353
        opts := DefaultOptions()
×
UNCOV
354
        for _, modifier := range modifiers {
×
UNCOV
355
                modifier(&opts)
×
UNCOV
356
        }
×
357

UNCOV
358
        backend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
×
UNCOV
359
                DBPath:            dbPath,
×
UNCOV
360
                DBFileName:        dbName,
×
UNCOV
361
                NoFreelistSync:    opts.NoFreelistSync,
×
UNCOV
362
                AutoCompact:       opts.AutoCompact,
×
UNCOV
363
                AutoCompactMinAge: opts.AutoCompactMinAge,
×
UNCOV
364
                DBTimeout:         opts.DBTimeout,
×
UNCOV
365
        })
×
UNCOV
366
        if err != nil {
×
367
                return nil, err
×
368
        }
×
369

UNCOV
370
        db, err := CreateWithBackend(backend, modifiers...)
×
UNCOV
371
        if err == nil {
×
UNCOV
372
                db.dbPath = dbPath
×
UNCOV
373
        }
×
UNCOV
374
        return db, err
×
375
}
376

377
// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
378
// Any necessary schemas migrations due to updates will take place as necessary.
379
func CreateWithBackend(backend kvdb.Backend,
380
        modifiers ...OptionModifier) (*DB, error) {
2✔
381

2✔
382
        opts := DefaultOptions()
2✔
383
        for _, modifier := range modifiers {
4✔
384
                modifier(&opts)
2✔
385
        }
2✔
386

387
        if !opts.NoMigration {
4✔
388
                if err := initChannelDB(backend); err != nil {
2✔
UNCOV
389
                        return nil, err
×
UNCOV
390
                }
×
391
        }
392

393
        chanDB := &DB{
2✔
394
                Backend: backend,
2✔
395
                channelStateDB: &ChannelStateDB{
2✔
396
                        linkNodeDB: &LinkNodeDB{
2✔
397
                                backend: backend,
2✔
398
                        },
2✔
399
                        backend: backend,
2✔
400
                },
2✔
401
                clock:                     opts.clock,
2✔
402
                dryRun:                    opts.dryRun,
2✔
403
                keepFailedPaymentAttempts: opts.keepFailedPaymentAttempts,
2✔
404
                storeFinalHtlcResolutions: opts.storeFinalHtlcResolutions,
2✔
405
                noRevLogAmtData:           opts.NoRevLogAmtData,
2✔
406
        }
2✔
407

2✔
408
        // Set the parent pointer (only used in tests).
2✔
409
        chanDB.channelStateDB.parent = chanDB
2✔
410

2✔
411
        var err error
2✔
412
        chanDB.graph, err = NewChannelGraph(
2✔
413
                backend, opts.RejectCacheSize, opts.ChannelCacheSize,
2✔
414
                opts.BatchCommitInterval, opts.PreAllocCacheNumNodes,
2✔
415
                opts.UseGraphCache, opts.NoMigration,
2✔
416
        )
2✔
417
        if err != nil {
2✔
418
                return nil, err
×
419
        }
×
420

421
        // Synchronize the version of database and apply migrations if needed.
422
        if !opts.NoMigration {
4✔
423
                if err := chanDB.syncVersions(dbVersions); err != nil {
2✔
UNCOV
424
                        backend.Close()
×
UNCOV
425
                        return nil, err
×
UNCOV
426
                }
×
427

428
                // Grab the optional migration config.
429
                omc := opts.OptionalMiragtionConfig
2✔
430
                if err := chanDB.applyOptionalVersions(omc); err != nil {
2✔
431
                        backend.Close()
×
432
                        return nil, err
×
433
                }
×
434
        }
435

436
        return chanDB, nil
2✔
437
}
438

439
// Path returns the file path to the channel database.
UNCOV
440
func (d *DB) Path() string {
×
UNCOV
441
        return d.dbPath
×
UNCOV
442
}
×
443

444
var dbTopLevelBuckets = [][]byte{
445
        openChannelBucket,
446
        closedChannelBucket,
447
        forwardingLogBucket,
448
        fwdPackagesKey,
449
        invoiceBucket,
450
        payAddrIndexBucket,
451
        setIDIndexBucket,
452
        paymentsIndexBucket,
453
        peersBucket,
454
        nodeInfoBucket,
455
        metaBucket,
456
        closeSummaryBucket,
457
        outpointBucket,
458
        chanIDBucket,
459
        historicalChannelBucket,
460
}
461

462
// Wipe completely deletes all saved state within all used buckets within the
463
// database. The deletion is done in a single transaction, therefore this
464
// operation is fully atomic.
UNCOV
465
func (d *DB) Wipe() error {
×
UNCOV
466
        err := kvdb.Update(d, func(tx kvdb.RwTx) error {
×
UNCOV
467
                for _, tlb := range dbTopLevelBuckets {
×
UNCOV
468
                        err := tx.DeleteTopLevelBucket(tlb)
×
UNCOV
469
                        if err != nil && err != kvdb.ErrBucketNotFound {
×
470
                                return err
×
471
                        }
×
472
                }
UNCOV
473
                return nil
×
UNCOV
474
        }, func() {})
×
UNCOV
475
        if err != nil {
×
476
                return err
×
477
        }
×
478

UNCOV
479
        return initChannelDB(d.Backend)
×
480
}
481

482
// initChannelDB creates and initializes a fresh version of channeldb. In the
483
// case that the target path has not yet been created or doesn't yet exist, then
484
// the path is created. Additionally, all required top-level buckets used within
485
// the database are created.
486
func initChannelDB(db kvdb.Backend) error {
2✔
487
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
4✔
488
                // Check if DB was marked as inactive with a tomb stone.
2✔
489
                if err := EnsureNoTombstone(tx); err != nil {
2✔
UNCOV
490
                        return err
×
UNCOV
491
                }
×
492

493
                meta := &Meta{}
2✔
494
                // Check if DB is already initialized.
2✔
495
                err := FetchMeta(meta, tx)
2✔
496
                if err == nil {
4✔
497
                        return nil
2✔
498
                }
2✔
499

500
                for _, tlb := range dbTopLevelBuckets {
4✔
501
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
2✔
502
                                return err
×
503
                        }
×
504
                }
505

506
                meta.DbVersionNumber = getLatestDBVersion(dbVersions)
2✔
507
                return putMeta(meta, tx)
2✔
508
        }, func() {})
2✔
509
        if err != nil {
2✔
UNCOV
510
                return fmt.Errorf("unable to create new channeldb: %w", err)
×
UNCOV
511
        }
×
512

513
        return nil
2✔
514
}
515

516
// fileExists returns true if the file exists, and false otherwise.
UNCOV
517
func fileExists(path string) bool {
×
UNCOV
518
        if _, err := os.Stat(path); err != nil {
×
519
                if os.IsNotExist(err) {
×
520
                        return false
×
521
                }
×
522
        }
523

UNCOV
524
        return true
×
525
}
526

527
// ChannelStateDB is a database that keeps track of all channel state.
528
type ChannelStateDB struct {
529
        // linkNodeDB separates all DB operations on LinkNodes.
530
        linkNodeDB *LinkNodeDB
531

532
        // parent holds a pointer to the "main" channeldb.DB object. This is
533
        // only used for testing and should never be used in production code.
534
        // For testing use the ChannelStateDB.GetParentDB() function to retrieve
535
        // this pointer.
536
        parent *DB
537

538
        // backend points to the actual backend holding the channel state
539
        // database. This may be a real backend or a cache middleware.
540
        backend kvdb.Backend
541
}
542

543
// GetParentDB returns the "main" channeldb.DB object that is the owner of this
544
// ChannelStateDB instance. Use this function only in tests where passing around
545
// pointers makes testing less readable. Never to be used in production code!
UNCOV
546
func (c *ChannelStateDB) GetParentDB() *DB {
×
UNCOV
547
        return c.parent
×
UNCOV
548
}
×
549

550
// LinkNodeDB returns the current instance of the link node database.
551
func (c *ChannelStateDB) LinkNodeDB() *LinkNodeDB {
2✔
552
        return c.linkNodeDB
2✔
553
}
2✔
554

555
// FetchOpenChannels starts a new database transaction and returns all stored
556
// currently active/open channels associated with the target nodeID. In the case
557
// that no active channels are known to have been created with this node, then a
558
// zero-length slice is returned.
559
func (c *ChannelStateDB) FetchOpenChannels(nodeID *btcec.PublicKey) (
560
        []*OpenChannel, error) {
2✔
561

2✔
562
        var channels []*OpenChannel
2✔
563
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
4✔
564
                var err error
2✔
565
                channels, err = c.fetchOpenChannels(tx, nodeID)
2✔
566
                return err
2✔
567
        }, func() {
4✔
568
                channels = nil
2✔
569
        })
2✔
570

571
        return channels, err
2✔
572
}
573

574
// fetchOpenChannels uses and existing database transaction and returns all
575
// stored currently active/open channels associated with the target nodeID. In
576
// the case that no active channels are known to have been created with this
577
// node, then a zero-length slice is returned.
578
func (c *ChannelStateDB) fetchOpenChannels(tx kvdb.RTx,
579
        nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
2✔
580

2✔
581
        // Get the bucket dedicated to storing the metadata for open channels.
2✔
582
        openChanBucket := tx.ReadBucket(openChannelBucket)
2✔
583
        if openChanBucket == nil {
2✔
584
                return nil, nil
×
585
        }
×
586

587
        // Within this top level bucket, fetch the bucket dedicated to storing
588
        // open channel data specific to the remote node.
589
        pub := nodeID.SerializeCompressed()
2✔
590
        nodeChanBucket := openChanBucket.NestedReadBucket(pub)
2✔
591
        if nodeChanBucket == nil {
4✔
592
                return nil, nil
2✔
593
        }
2✔
594

595
        // Next, we'll need to go down an additional layer in order to retrieve
596
        // the channels for each chain the node knows of.
597
        var channels []*OpenChannel
2✔
598
        err := nodeChanBucket.ForEach(func(chainHash, v []byte) error {
4✔
599
                // If there's a value, it's not a bucket so ignore it.
2✔
600
                if v != nil {
2✔
601
                        return nil
×
602
                }
×
603

604
                // If we've found a valid chainhash bucket, then we'll retrieve
605
                // that so we can extract all the channels.
606
                chainBucket := nodeChanBucket.NestedReadBucket(chainHash)
2✔
607
                if chainBucket == nil {
2✔
608
                        return fmt.Errorf("unable to read bucket for chain=%x",
×
609
                                chainHash[:])
×
610
                }
×
611

612
                // Finally, we both of the necessary buckets retrieved, fetch
613
                // all the active channels related to this node.
614
                nodeChannels, err := c.fetchNodeChannels(chainBucket)
2✔
615
                if err != nil {
2✔
616
                        return fmt.Errorf("unable to read channel for "+
×
617
                                "chain_hash=%x, node_key=%x: %v",
×
618
                                chainHash[:], pub, err)
×
619
                }
×
620

621
                channels = append(channels, nodeChannels...)
2✔
622
                return nil
2✔
623
        })
624

625
        return channels, err
2✔
626
}
627

628
// fetchNodeChannels retrieves all active channels from the target chainBucket
629
// which is under a node's dedicated channel bucket. This function is typically
630
// used to fetch all the active channels related to a particular node.
631
func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) (
632
        []*OpenChannel, error) {
2✔
633

2✔
634
        var channels []*OpenChannel
2✔
635

2✔
636
        // A node may have channels on several chains, so for each known chain,
2✔
637
        // we'll extract all the channels.
2✔
638
        err := chainBucket.ForEach(func(chanPoint, v []byte) error {
4✔
639
                // If there's a value, it's not a bucket so ignore it.
2✔
640
                if v != nil {
2✔
641
                        return nil
×
642
                }
×
643

644
                // Once we've found a valid channel bucket, we'll extract it
645
                // from the node's chain bucket.
646
                chanBucket := chainBucket.NestedReadBucket(chanPoint)
2✔
647

2✔
648
                var outPoint wire.OutPoint
2✔
649
                err := readOutpoint(bytes.NewReader(chanPoint), &outPoint)
2✔
650
                if err != nil {
2✔
651
                        return err
×
652
                }
×
653
                oChannel, err := fetchOpenChannel(chanBucket, &outPoint)
2✔
654
                if err != nil {
2✔
655
                        return fmt.Errorf("unable to read channel data for "+
×
656
                                "chan_point=%v: %w", outPoint, err)
×
657
                }
×
658
                oChannel.Db = c
2✔
659

2✔
660
                channels = append(channels, oChannel)
2✔
661

2✔
662
                return nil
2✔
663
        })
664
        if err != nil {
2✔
665
                return nil, err
×
666
        }
×
667

668
        return channels, nil
2✔
669
}
670

671
// FetchChannel attempts to locate a channel specified by the passed channel
672
// point. If the channel cannot be found, then an error will be returned.
673
// Optionally an existing db tx can be supplied.
674
func (c *ChannelStateDB) FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) (
675
        *OpenChannel, error) {
2✔
676

2✔
677
        var targetChanPoint bytes.Buffer
2✔
678
        if err := writeOutpoint(&targetChanPoint, &chanPoint); err != nil {
2✔
679
                return nil, err
×
680
        }
×
681

682
        targetChanPointBytes := targetChanPoint.Bytes()
2✔
683
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
2✔
684
                error) {
4✔
685

2✔
686
                return targetChanPointBytes, &chanPoint, nil
2✔
687
        }
2✔
688

689
        return c.channelScanner(tx, selector)
2✔
690
}
691

692
// FetchChannelByID attempts to locate a channel specified by the passed channel
693
// ID. If the channel cannot be found, then an error will be returned.
694
// Optionally an existing db tx can be supplied.
695
func (c *ChannelStateDB) FetchChannelByID(tx kvdb.RTx, id lnwire.ChannelID) (
696
        *OpenChannel, error) {
2✔
697

2✔
698
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
2✔
699
                error) {
4✔
700

2✔
701
                var (
2✔
702
                        targetChanPointBytes []byte
2✔
703
                        targetChanPoint      *wire.OutPoint
2✔
704

2✔
705
                        // errChanFound is used to signal that the channel has
2✔
706
                        // been found so that iteration through the DB buckets
2✔
707
                        // can stop.
2✔
708
                        errChanFound = errors.New("channel found")
2✔
709
                )
2✔
710
                err := chainBkt.ForEach(func(k, _ []byte) error {
4✔
711
                        var outPoint wire.OutPoint
2✔
712
                        err := readOutpoint(bytes.NewReader(k), &outPoint)
2✔
713
                        if err != nil {
2✔
714
                                return err
×
715
                        }
×
716

717
                        chanID := lnwire.NewChanIDFromOutPoint(outPoint)
2✔
718
                        if chanID != id {
2✔
UNCOV
719
                                return nil
×
UNCOV
720
                        }
×
721

722
                        targetChanPoint = &outPoint
2✔
723
                        targetChanPointBytes = k
2✔
724

2✔
725
                        return errChanFound
2✔
726
                })
727
                if err != nil && !errors.Is(err, errChanFound) {
2✔
728
                        return nil, nil, err
×
729
                }
×
730
                if targetChanPoint == nil {
2✔
UNCOV
731
                        return nil, nil, ErrChannelNotFound
×
UNCOV
732
                }
×
733

734
                return targetChanPointBytes, targetChanPoint, nil
2✔
735
        }
736

737
        return c.channelScanner(tx, selector)
2✔
738
}
739

740
// channelSelector describes a function that takes a chain-hash bucket from
741
// within the open-channel DB and returns the wanted channel point bytes, and
742
// channel point. It must return the ErrChannelNotFound error if the wanted
743
// channel is not in the given bucket.
744
type channelSelector func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
745
        error)
746

747
// channelScanner will traverse the DB to each chain-hash bucket of each node
748
// pub-key bucket in the open-channel-bucket. The chanSelector will then be used
749
// to fetch the wanted channel outpoint from the chain bucket.
750
func (c *ChannelStateDB) channelScanner(tx kvdb.RTx,
751
        chanSelect channelSelector) (*OpenChannel, error) {
2✔
752

2✔
753
        var (
2✔
754
                targetChan *OpenChannel
2✔
755

2✔
756
                // errChanFound is used to signal that the channel has been
2✔
757
                // found so that iteration through the DB buckets can stop.
2✔
758
                errChanFound = errors.New("channel found")
2✔
759
        )
2✔
760

2✔
761
        // chanScan will traverse the following bucket structure:
2✔
762
        //  * nodePub => chainHash => chanPoint
2✔
763
        //
2✔
764
        // At each level we go one further, ensuring that we're traversing the
2✔
765
        // proper key (that's actually a bucket). By only reading the bucket
2✔
766
        // structure and skipping fully decoding each channel, we save a good
2✔
767
        // bit of CPU as we don't need to do things like decompress public
2✔
768
        // keys.
2✔
769
        chanScan := func(tx kvdb.RTx) error {
4✔
770
                // Get the bucket dedicated to storing the metadata for open
2✔
771
                // channels.
2✔
772
                openChanBucket := tx.ReadBucket(openChannelBucket)
2✔
773
                if openChanBucket == nil {
2✔
774
                        return ErrNoActiveChannels
×
775
                }
×
776

777
                // Within the node channel bucket, are the set of node pubkeys
778
                // we have channels with, we don't know the entire set, so we'll
779
                // check them all.
780
                return openChanBucket.ForEach(func(nodePub, v []byte) error {
4✔
781
                        // Ensure that this is a key the same size as a pubkey,
2✔
782
                        // and also that it leads directly to a bucket.
2✔
783
                        if len(nodePub) != 33 || v != nil {
2✔
784
                                return nil
×
785
                        }
×
786

787
                        nodeChanBucket := openChanBucket.NestedReadBucket(
2✔
788
                                nodePub,
2✔
789
                        )
2✔
790
                        if nodeChanBucket == nil {
2✔
791
                                return nil
×
792
                        }
×
793

794
                        // The next layer down is all the chains that this node
795
                        // has channels on with us.
796
                        return nodeChanBucket.ForEach(func(chainHash,
2✔
797
                                v []byte) error {
4✔
798

2✔
799
                                // If there's a value, it's not a bucket so
2✔
800
                                // ignore it.
2✔
801
                                if v != nil {
2✔
802
                                        return nil
×
803
                                }
×
804

805
                                chainBucket := nodeChanBucket.NestedReadBucket(
2✔
806
                                        chainHash,
2✔
807
                                )
2✔
808
                                if chainBucket == nil {
2✔
809
                                        return fmt.Errorf("unable to read "+
×
810
                                                "bucket for chain=%x",
×
811
                                                chainHash)
×
812
                                }
×
813

814
                                // Finally, we reach the leaf bucket that stores
815
                                // all the chanPoints for this node.
816
                                targetChanBytes, chanPoint, err := chanSelect(
2✔
817
                                        chainBucket,
2✔
818
                                )
2✔
819
                                if errors.Is(err, ErrChannelNotFound) {
2✔
UNCOV
820
                                        return nil
×
821
                                } else if err != nil {
2✔
822
                                        return err
×
823
                                }
×
824

825
                                chanBucket := chainBucket.NestedReadBucket(
2✔
826
                                        targetChanBytes,
2✔
827
                                )
2✔
828
                                if chanBucket == nil {
4✔
829
                                        return nil
2✔
830
                                }
2✔
831

832
                                channel, err := fetchOpenChannel(
2✔
833
                                        chanBucket, chanPoint,
2✔
834
                                )
2✔
835
                                if err != nil {
2✔
836
                                        return err
×
837
                                }
×
838

839
                                targetChan = channel
2✔
840
                                targetChan.Db = c
2✔
841

2✔
842
                                return errChanFound
2✔
843
                        })
844
                })
845
        }
846

847
        var err error
2✔
848
        if tx == nil {
4✔
849
                err = kvdb.View(c.backend, chanScan, func() {})
4✔
850
        } else {
2✔
851
                err = chanScan(tx)
2✔
852
        }
2✔
853
        if err != nil && !errors.Is(err, errChanFound) {
2✔
854
                return nil, err
×
855
        }
×
856

857
        if targetChan != nil {
4✔
858
                return targetChan, nil
2✔
859
        }
2✔
860

861
        // If we can't find the channel, then we return with an error, as we
862
        // have nothing to back up.
863
        return nil, ErrChannelNotFound
2✔
864
}
865

866
// FetchAllChannels attempts to retrieve all open channels currently stored
867
// within the database, including pending open, fully open and channels waiting
868
// for a closing transaction to confirm.
869
func (c *ChannelStateDB) FetchAllChannels() ([]*OpenChannel, error) {
2✔
870
        return fetchChannels(c)
2✔
871
}
2✔
872

873
// FetchAllOpenChannels will return all channels that have the funding
874
// transaction confirmed, and is not waiting for a closing transaction to be
875
// confirmed.
876
func (c *ChannelStateDB) FetchAllOpenChannels() ([]*OpenChannel, error) {
2✔
877
        return fetchChannels(
2✔
878
                c,
2✔
879
                pendingChannelFilter(false),
2✔
880
                waitingCloseFilter(false),
2✔
881
        )
2✔
882
}
2✔
883

884
// FetchPendingChannels will return channels that have completed the process of
885
// generating and broadcasting funding transactions, but whose funding
886
// transactions have yet to be confirmed on the blockchain.
887
func (c *ChannelStateDB) FetchPendingChannels() ([]*OpenChannel, error) {
2✔
888
        return fetchChannels(c,
2✔
889
                pendingChannelFilter(true),
2✔
890
                waitingCloseFilter(false),
2✔
891
        )
2✔
892
}
2✔
893

894
// FetchWaitingCloseChannels will return all channels that have been opened,
895
// but are now waiting for a closing transaction to be confirmed.
896
//
897
// NOTE: This includes channels that are also pending to be opened.
898
func (c *ChannelStateDB) FetchWaitingCloseChannels() ([]*OpenChannel, error) {
2✔
899
        return fetchChannels(
2✔
900
                c, waitingCloseFilter(true),
2✔
901
        )
2✔
902
}
2✔
903

904
// fetchChannelsFilter applies a filter to channels retrieved in fetchchannels.
905
// A set of filters can be combined to filter across multiple dimensions.
906
type fetchChannelsFilter func(channel *OpenChannel) bool
907

908
// pendingChannelFilter returns a filter based on whether channels are pending
909
// (ie, their funding transaction still needs to confirm). If pending is false,
910
// channels with confirmed funding transactions are returned.
911
func pendingChannelFilter(pending bool) fetchChannelsFilter {
2✔
912
        return func(channel *OpenChannel) bool {
4✔
913
                return channel.IsPending == pending
2✔
914
        }
2✔
915
}
916

917
// waitingCloseFilter returns a filter which filters channels based on whether
918
// they are awaiting the confirmation of their closing transaction. If waiting
919
// close is true, channels that have had their closing tx broadcast are
920
// included. If it is false, channels that are not awaiting confirmation of
921
// their close transaction are returned.
922
func waitingCloseFilter(waitingClose bool) fetchChannelsFilter {
2✔
923
        return func(channel *OpenChannel) bool {
4✔
924
                // If the channel is in any other state than Default,
2✔
925
                // then it means it is waiting to be closed.
2✔
926
                channelWaitingClose :=
2✔
927
                        channel.ChanStatus() != ChanStatusDefault
2✔
928

2✔
929
                // Include the channel if it matches the value for
2✔
930
                // waiting close that we are filtering on.
2✔
931
                return channelWaitingClose == waitingClose
2✔
932
        }
2✔
933
}
934

935
// fetchChannels attempts to retrieve channels currently stored in the
936
// database. It takes a set of filters which are applied to each channel to
937
// obtain a set of channels with the desired set of properties. Only channels
938
// which have a true value returned for *all* of the filters will be returned.
939
// If no filters are provided, every channel in the open channels bucket will
940
// be returned.
941
func fetchChannels(c *ChannelStateDB, filters ...fetchChannelsFilter) (
942
        []*OpenChannel, error) {
2✔
943

2✔
944
        var channels []*OpenChannel
2✔
945

2✔
946
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
4✔
947
                // Get the bucket dedicated to storing the metadata for open
2✔
948
                // channels.
2✔
949
                openChanBucket := tx.ReadBucket(openChannelBucket)
2✔
950
                if openChanBucket == nil {
2✔
951
                        return ErrNoActiveChannels
×
952
                }
×
953

954
                // Next, fetch the bucket dedicated to storing metadata related
955
                // to all nodes. All keys within this bucket are the serialized
956
                // public keys of all our direct counterparties.
957
                nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
2✔
958
                if nodeMetaBucket == nil {
2✔
959
                        return fmt.Errorf("node bucket not created")
×
960
                }
×
961

962
                // Finally for each node public key in the bucket, fetch all
963
                // the channels related to this particular node.
964
                return nodeMetaBucket.ForEach(func(k, v []byte) error {
4✔
965
                        nodeChanBucket := openChanBucket.NestedReadBucket(k)
2✔
966
                        if nodeChanBucket == nil {
2✔
967
                                return nil
×
968
                        }
×
969

970
                        return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
4✔
971
                                // If there's a value, it's not a bucket so
2✔
972
                                // ignore it.
2✔
973
                                if v != nil {
2✔
974
                                        return nil
×
975
                                }
×
976

977
                                // If we've found a valid chainhash bucket,
978
                                // then we'll retrieve that so we can extract
979
                                // all the channels.
980
                                chainBucket := nodeChanBucket.NestedReadBucket(
2✔
981
                                        chainHash,
2✔
982
                                )
2✔
983
                                if chainBucket == nil {
2✔
984
                                        return fmt.Errorf("unable to read "+
×
985
                                                "bucket for chain=%x", chainHash[:])
×
986
                                }
×
987

988
                                nodeChans, err := c.fetchNodeChannels(chainBucket)
2✔
989
                                if err != nil {
2✔
990
                                        return fmt.Errorf("unable to read "+
×
991
                                                "channel for chain_hash=%x, "+
×
992
                                                "node_key=%x: %v", chainHash[:], k, err)
×
993
                                }
×
994
                                for _, channel := range nodeChans {
4✔
995
                                        // includeChannel indicates whether the channel
2✔
996
                                        // meets the criteria specified by our filters.
2✔
997
                                        includeChannel := true
2✔
998

2✔
999
                                        // Run through each filter and check whether the
2✔
1000
                                        // channel should be included.
2✔
1001
                                        for _, f := range filters {
4✔
1002
                                                // If the channel fails the filter, set
2✔
1003
                                                // includeChannel to false and don't bother
2✔
1004
                                                // checking the remaining filters.
2✔
1005
                                                if !f(channel) {
4✔
1006
                                                        includeChannel = false
2✔
1007
                                                        break
2✔
1008
                                                }
1009
                                        }
1010

1011
                                        // If the channel passed every filter, include it in
1012
                                        // our set of channels.
1013
                                        if includeChannel {
4✔
1014
                                                channels = append(channels, channel)
2✔
1015
                                        }
2✔
1016
                                }
1017
                                return nil
2✔
1018
                        })
1019

1020
                })
1021
        }, func() {
2✔
1022
                channels = nil
2✔
1023
        })
2✔
1024
        if err != nil {
2✔
1025
                return nil, err
×
1026
        }
×
1027

1028
        return channels, nil
2✔
1029
}
1030

1031
// FetchClosedChannels attempts to fetch all closed channels from the database.
1032
// The pendingOnly bool toggles if channels that aren't yet fully closed should
1033
// be returned in the response or not. When a channel was cooperatively closed,
1034
// it becomes fully closed after a single confirmation.  When a channel was
1035
// forcibly closed, it will become fully closed after _all_ the pending funds
1036
// (if any) have been swept.
1037
func (c *ChannelStateDB) FetchClosedChannels(pendingOnly bool) (
1038
        []*ChannelCloseSummary, error) {
2✔
1039

2✔
1040
        var chanSummaries []*ChannelCloseSummary
2✔
1041

2✔
1042
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
4✔
1043
                closeBucket := tx.ReadBucket(closedChannelBucket)
2✔
1044
                if closeBucket == nil {
2✔
1045
                        return ErrNoClosedChannels
×
1046
                }
×
1047

1048
                return closeBucket.ForEach(func(chanID []byte, summaryBytes []byte) error {
4✔
1049
                        summaryReader := bytes.NewReader(summaryBytes)
2✔
1050
                        chanSummary, err := deserializeCloseChannelSummary(summaryReader)
2✔
1051
                        if err != nil {
2✔
1052
                                return err
×
1053
                        }
×
1054

1055
                        // If the query specified to only include pending
1056
                        // channels, then we'll skip any channels which aren't
1057
                        // currently pending.
1058
                        if !chanSummary.IsPending && pendingOnly {
4✔
1059
                                return nil
2✔
1060
                        }
2✔
1061

1062
                        chanSummaries = append(chanSummaries, chanSummary)
2✔
1063
                        return nil
2✔
1064
                })
1065
        }, func() {
2✔
1066
                chanSummaries = nil
2✔
1067
        }); err != nil {
2✔
1068
                return nil, err
×
1069
        }
×
1070

1071
        return chanSummaries, nil
2✔
1072
}
1073

1074
// ErrClosedChannelNotFound signals that a closed channel could not be found in
1075
// the channeldb.
1076
var ErrClosedChannelNotFound = errors.New("unable to find closed channel summary")
1077

1078
// FetchClosedChannel queries for a channel close summary using the channel
1079
// point of the channel in question.
1080
func (c *ChannelStateDB) FetchClosedChannel(chanID *wire.OutPoint) (
1081
        *ChannelCloseSummary, error) {
2✔
1082

2✔
1083
        var chanSummary *ChannelCloseSummary
2✔
1084
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
4✔
1085
                closeBucket := tx.ReadBucket(closedChannelBucket)
2✔
1086
                if closeBucket == nil {
2✔
1087
                        return ErrClosedChannelNotFound
×
1088
                }
×
1089

1090
                var b bytes.Buffer
2✔
1091
                var err error
2✔
1092
                if err = writeOutpoint(&b, chanID); err != nil {
2✔
1093
                        return err
×
1094
                }
×
1095

1096
                summaryBytes := closeBucket.Get(b.Bytes())
2✔
1097
                if summaryBytes == nil {
2✔
UNCOV
1098
                        return ErrClosedChannelNotFound
×
UNCOV
1099
                }
×
1100

1101
                summaryReader := bytes.NewReader(summaryBytes)
2✔
1102
                chanSummary, err = deserializeCloseChannelSummary(summaryReader)
2✔
1103

2✔
1104
                return err
2✔
1105
        }, func() {
2✔
1106
                chanSummary = nil
2✔
1107
        }); err != nil {
2✔
UNCOV
1108
                return nil, err
×
UNCOV
1109
        }
×
1110

1111
        return chanSummary, nil
2✔
1112
}
1113

1114
// FetchClosedChannelForID queries for a channel close summary using the
1115
// channel ID of the channel in question.
1116
func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) (
1117
        *ChannelCloseSummary, error) {
2✔
1118

2✔
1119
        var chanSummary *ChannelCloseSummary
2✔
1120
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
4✔
1121
                closeBucket := tx.ReadBucket(closedChannelBucket)
2✔
1122
                if closeBucket == nil {
2✔
1123
                        return ErrClosedChannelNotFound
×
1124
                }
×
1125

1126
                // The first 30 bytes of the channel ID and outpoint will be
1127
                // equal.
1128
                cursor := closeBucket.ReadCursor()
2✔
1129
                op, c := cursor.Seek(cid[:30])
2✔
1130

2✔
1131
                // We scan over all possible candidates for this channel ID.
2✔
1132
                for ; op != nil && bytes.Compare(cid[:30], op[:30]) <= 0; op, c = cursor.Next() {
4✔
1133
                        var outPoint wire.OutPoint
2✔
1134
                        err := readOutpoint(bytes.NewReader(op), &outPoint)
2✔
1135
                        if err != nil {
2✔
1136
                                return err
×
1137
                        }
×
1138

1139
                        // If the found outpoint does not correspond to this
1140
                        // channel ID, we continue.
1141
                        if !cid.IsChanPoint(&outPoint) {
2✔
UNCOV
1142
                                continue
×
1143
                        }
1144

1145
                        // Deserialize the close summary and return.
1146
                        r := bytes.NewReader(c)
2✔
1147
                        chanSummary, err = deserializeCloseChannelSummary(r)
2✔
1148
                        if err != nil {
2✔
1149
                                return err
×
1150
                        }
×
1151

1152
                        return nil
2✔
1153
                }
1154
                return ErrClosedChannelNotFound
2✔
1155
        }, func() {
2✔
1156
                chanSummary = nil
2✔
1157
        }); err != nil {
4✔
1158
                return nil, err
2✔
1159
        }
2✔
1160

1161
        return chanSummary, nil
2✔
1162
}
1163

1164
// MarkChanFullyClosed marks a channel as fully closed within the database. A
1165
// channel should be marked as fully closed if the channel was initially
1166
// cooperatively closed and it's reached a single confirmation, or after all
1167
// the pending funds in a channel that has been forcibly closed have been
1168
// swept.
1169
func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
2✔
1170
        var (
2✔
1171
                openChannels  []*OpenChannel
2✔
1172
                pruneLinkNode *btcec.PublicKey
2✔
1173
        )
2✔
1174
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
4✔
1175
                var b bytes.Buffer
2✔
1176
                if err := writeOutpoint(&b, chanPoint); err != nil {
2✔
1177
                        return err
×
1178
                }
×
1179

1180
                chanID := b.Bytes()
2✔
1181

2✔
1182
                closedChanBucket, err := tx.CreateTopLevelBucket(
2✔
1183
                        closedChannelBucket,
2✔
1184
                )
2✔
1185
                if err != nil {
2✔
1186
                        return err
×
1187
                }
×
1188

1189
                chanSummaryBytes := closedChanBucket.Get(chanID)
2✔
1190
                if chanSummaryBytes == nil {
2✔
1191
                        return fmt.Errorf("no closed channel for "+
×
1192
                                "chan_point=%v found", chanPoint)
×
1193
                }
×
1194

1195
                chanSummaryReader := bytes.NewReader(chanSummaryBytes)
2✔
1196
                chanSummary, err := deserializeCloseChannelSummary(
2✔
1197
                        chanSummaryReader,
2✔
1198
                )
2✔
1199
                if err != nil {
2✔
1200
                        return err
×
1201
                }
×
1202

1203
                chanSummary.IsPending = false
2✔
1204

2✔
1205
                var newSummary bytes.Buffer
2✔
1206
                err = serializeChannelCloseSummary(&newSummary, chanSummary)
2✔
1207
                if err != nil {
2✔
1208
                        return err
×
1209
                }
×
1210

1211
                err = closedChanBucket.Put(chanID, newSummary.Bytes())
2✔
1212
                if err != nil {
2✔
1213
                        return err
×
1214
                }
×
1215

1216
                // Now that the channel is closed, we'll check if we have any
1217
                // other open channels with this peer. If we don't we'll
1218
                // garbage collect it to ensure we don't establish persistent
1219
                // connections to peers without open channels.
1220
                pruneLinkNode = chanSummary.RemotePub
2✔
1221
                openChannels, err = c.fetchOpenChannels(
2✔
1222
                        tx, pruneLinkNode,
2✔
1223
                )
2✔
1224
                if err != nil {
2✔
1225
                        return fmt.Errorf("unable to fetch open channels for "+
×
1226
                                "peer %x: %v",
×
1227
                                pruneLinkNode.SerializeCompressed(), err)
×
1228
                }
×
1229

1230
                return nil
2✔
1231
        }, func() {
2✔
1232
                openChannels = nil
2✔
1233
                pruneLinkNode = nil
2✔
1234
        })
2✔
1235
        if err != nil {
2✔
1236
                return err
×
1237
        }
×
1238

1239
        // Decide whether we want to remove the link node, based upon the number
1240
        // of still open channels.
1241
        return c.pruneLinkNode(openChannels, pruneLinkNode)
2✔
1242
}
1243

1244
// pruneLinkNode determines whether we should garbage collect a link node from
1245
// the database due to no longer having any open channels with it. If there are
1246
// any left, then this acts as a no-op.
1247
func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
1248
        remotePub *btcec.PublicKey) error {
2✔
1249

2✔
1250
        if len(openChannels) > 0 {
4✔
1251
                return nil
2✔
1252
        }
2✔
1253

1254
        log.Infof("Pruning link node %x with zero open channels from database",
2✔
1255
                remotePub.SerializeCompressed())
2✔
1256

2✔
1257
        return c.linkNodeDB.DeleteLinkNode(remotePub)
2✔
1258
}
1259

1260
// PruneLinkNodes attempts to prune all link nodes found within the database
1261
// with whom we no longer have any open channels with.
1262
func (c *ChannelStateDB) PruneLinkNodes() error {
2✔
1263
        allLinkNodes, err := c.linkNodeDB.FetchAllLinkNodes()
2✔
1264
        if err != nil {
2✔
1265
                return err
×
1266
        }
×
1267

1268
        for _, linkNode := range allLinkNodes {
4✔
1269
                var (
2✔
1270
                        openChannels []*OpenChannel
2✔
1271
                        linkNode     = linkNode
2✔
1272
                )
2✔
1273
                err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
4✔
1274
                        var err error
2✔
1275
                        openChannels, err = c.fetchOpenChannels(
2✔
1276
                                tx, linkNode.IdentityPub,
2✔
1277
                        )
2✔
1278
                        return err
2✔
1279
                }, func() {
4✔
1280
                        openChannels = nil
2✔
1281
                })
2✔
1282
                if err != nil {
2✔
1283
                        return err
×
1284
                }
×
1285

1286
                err = c.pruneLinkNode(openChannels, linkNode.IdentityPub)
2✔
1287
                if err != nil {
2✔
1288
                        return err
×
1289
                }
×
1290
        }
1291

1292
        return nil
2✔
1293
}
1294

1295
// ChannelShell is a shell of a channel that is meant to be used for channel
1296
// recovery purposes. It contains a minimal OpenChannel instance along with
1297
// addresses for that target node.
1298
type ChannelShell struct {
1299
        // NodeAddrs the set of addresses that this node has known to be
1300
        // reachable at in the past.
1301
        NodeAddrs []net.Addr
1302

1303
        // Chan is a shell of an OpenChannel, it contains only the items
1304
        // required to restore the channel on disk.
1305
        Chan *OpenChannel
1306
}
1307

1308
// RestoreChannelShells is a method that allows the caller to reconstruct the
1309
// state of an OpenChannel from the ChannelShell. We'll attempt to write the
1310
// new channel to disk, create a LinkNode instance with the passed node
1311
// addresses, and finally create an edge within the graph for the channel as
1312
// well. This method is idempotent, so repeated calls with the same set of
1313
// channel shells won't modify the database after the initial call.
1314
func (c *ChannelStateDB) RestoreChannelShells(channelShells ...*ChannelShell) error {
2✔
1315
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
4✔
1316
                for _, channelShell := range channelShells {
4✔
1317
                        channel := channelShell.Chan
2✔
1318

2✔
1319
                        // When we make a channel, we mark that the channel has
2✔
1320
                        // been restored, this will signal to other sub-systems
2✔
1321
                        // to not attempt to use the channel as if it was a
2✔
1322
                        // regular one.
2✔
1323
                        channel.chanStatus |= ChanStatusRestored
2✔
1324

2✔
1325
                        // First, we'll attempt to create a new open channel
2✔
1326
                        // and link node for this channel. If the channel
2✔
1327
                        // already exists, then in order to ensure this method
2✔
1328
                        // is idempotent, we'll continue to the next step.
2✔
1329
                        channel.Db = c
2✔
1330
                        err := syncNewChannel(
2✔
1331
                                tx, channel, channelShell.NodeAddrs,
2✔
1332
                        )
2✔
1333
                        if err != nil {
4✔
1334
                                return err
2✔
1335
                        }
2✔
1336
                }
1337

1338
                return nil
2✔
1339
        }, func() {})
2✔
1340
        if err != nil {
4✔
1341
                return err
2✔
1342
        }
2✔
1343

1344
        return nil
2✔
1345
}
1346

1347
// AddrsForNode consults the graph and channel database for all addresses known
1348
// to the passed node public key.
1349
func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr,
1350
        error) {
2✔
1351

2✔
1352
        linkNode, err := d.channelStateDB.linkNodeDB.FetchLinkNode(nodePub)
2✔
1353
        if err != nil {
2✔
1354
                return nil, err
×
1355
        }
×
1356

1357
        // We'll also query the graph for this peer to see if they have any
1358
        // addresses that we don't currently have stored within the link node
1359
        // database.
1360
        pubKey, err := route.NewVertexFromBytes(nodePub.SerializeCompressed())
2✔
1361
        if err != nil {
2✔
1362
                return nil, err
×
1363
        }
×
1364
        graphNode, err := d.graph.FetchLightningNode(pubKey)
2✔
1365
        if err != nil && err != ErrGraphNodeNotFound {
2✔
1366
                return nil, err
×
1367
        } else if err == ErrGraphNodeNotFound {
4✔
1368
                // If the node isn't found, then that's OK, as we still have the
2✔
1369
                // link node data. But any other error needs to be returned.
2✔
1370
                graphNode = &LightningNode{}
2✔
1371
        }
2✔
1372

1373
        // Now that we have both sources of addrs for this node, we'll use a
1374
        // map to de-duplicate any addresses between the two sources, and
1375
        // produce a final list of the combined addrs.
1376
        addrs := make(map[string]net.Addr)
2✔
1377
        for _, addr := range linkNode.Addresses {
4✔
1378
                addrs[addr.String()] = addr
2✔
1379
        }
2✔
1380
        for _, addr := range graphNode.Addresses {
4✔
1381
                addrs[addr.String()] = addr
2✔
1382
        }
2✔
1383
        dedupedAddrs := make([]net.Addr, 0, len(addrs))
2✔
1384
        for _, addr := range addrs {
4✔
1385
                dedupedAddrs = append(dedupedAddrs, addr)
2✔
1386
        }
2✔
1387

1388
        return dedupedAddrs, nil
2✔
1389
}
1390

1391
// AbandonChannel attempts to remove the target channel from the open channel
1392
// database. If the channel was already removed (has a closed channel entry),
1393
// then we'll return a nil error. Otherwise, we'll insert a new close summary
1394
// into the database.
1395
func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint,
1396
        bestHeight uint32) error {
2✔
1397

2✔
1398
        // With the chanPoint constructed, we'll attempt to find the target
2✔
1399
        // channel in the database. If we can't find the channel, then we'll
2✔
1400
        // return the error back to the caller.
2✔
1401
        dbChan, err := c.FetchChannel(nil, *chanPoint)
2✔
1402
        switch {
2✔
1403
        // If the channel wasn't found, then it's possible that it was already
1404
        // abandoned from the database.
1405
        case err == ErrChannelNotFound:
2✔
1406
                _, closedErr := c.FetchClosedChannel(chanPoint)
2✔
1407
                if closedErr != nil {
2✔
UNCOV
1408
                        return closedErr
×
UNCOV
1409
                }
×
1410

1411
                // If the channel was already closed, then we don't return an
1412
                // error as we'd like this step to be repeatable.
1413
                return nil
2✔
1414
        case err != nil:
×
1415
                return err
×
1416
        }
1417

1418
        // Now that we've found the channel, we'll populate a close summary for
1419
        // the channel, so we can store as much information for this abounded
1420
        // channel as possible. We also ensure that we set Pending to false, to
1421
        // indicate that this channel has been "fully" closed.
1422
        summary := &ChannelCloseSummary{
2✔
1423
                CloseType:               Abandoned,
2✔
1424
                ChanPoint:               *chanPoint,
2✔
1425
                ChainHash:               dbChan.ChainHash,
2✔
1426
                CloseHeight:             bestHeight,
2✔
1427
                RemotePub:               dbChan.IdentityPub,
2✔
1428
                Capacity:                dbChan.Capacity,
2✔
1429
                SettledBalance:          dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
2✔
1430
                ShortChanID:             dbChan.ShortChanID(),
2✔
1431
                RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
2✔
1432
                RemoteNextRevocation:    dbChan.RemoteNextRevocation,
2✔
1433
                LocalChanConfig:         dbChan.LocalChanCfg,
2✔
1434
        }
2✔
1435

2✔
1436
        // Finally, we'll close the channel in the DB, and return back to the
2✔
1437
        // caller. We set ourselves as the close initiator because we abandoned
2✔
1438
        // the channel.
2✔
1439
        return dbChan.CloseChannel(summary, ChanStatusLocalCloseInitiator)
2✔
1440
}
1441

1442
// SaveChannelOpeningState saves the serialized channel state for the provided
1443
// chanPoint to the channelOpeningStateBucket.
1444
func (c *ChannelStateDB) SaveChannelOpeningState(outPoint,
1445
        serializedState []byte) error {
2✔
1446

2✔
1447
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
4✔
1448
                bucket, err := tx.CreateTopLevelBucket(channelOpeningStateBucket)
2✔
1449
                if err != nil {
2✔
1450
                        return err
×
1451
                }
×
1452

1453
                return bucket.Put(outPoint, serializedState)
2✔
1454
        }, func() {})
2✔
1455
}
1456

1457
// GetChannelOpeningState fetches the serialized channel state for the provided
1458
// outPoint from the database, or returns ErrChannelNotFound if the channel
1459
// is not found.
1460
func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte,
1461
        error) {
2✔
1462

2✔
1463
        var serializedState []byte
2✔
1464
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
4✔
1465
                bucket := tx.ReadBucket(channelOpeningStateBucket)
2✔
1466
                if bucket == nil {
4✔
1467
                        // If the bucket does not exist, it means we never added
2✔
1468
                        //  a channel to the db, so return ErrChannelNotFound.
2✔
1469
                        return ErrChannelNotFound
2✔
1470
                }
2✔
1471

1472
                stateBytes := bucket.Get(outPoint)
2✔
1473
                if stateBytes == nil {
4✔
1474
                        return ErrChannelNotFound
2✔
1475
                }
2✔
1476

1477
                serializedState = append(serializedState, stateBytes...)
2✔
1478

2✔
1479
                return nil
2✔
1480
        }, func() {
2✔
1481
                serializedState = nil
2✔
1482
        })
2✔
1483
        return serializedState, err
2✔
1484
}
1485

1486
// DeleteChannelOpeningState removes any state for outPoint from the database.
1487
func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
2✔
1488
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
4✔
1489
                bucket := tx.ReadWriteBucket(channelOpeningStateBucket)
2✔
1490
                if bucket == nil {
2✔
1491
                        return ErrChannelNotFound
×
1492
                }
×
1493

1494
                return bucket.Delete(outPoint)
2✔
1495
        }, func() {})
2✔
1496
}
1497

1498
// syncVersions function is used for safe db version synchronization. It
1499
// applies migration functions to the current database and recovers the
1500
// previous state of db if at least one error/panic appeared during migration.
1501
func (d *DB) syncVersions(versions []mandatoryVersion) error {
2✔
1502
        meta, err := d.FetchMeta()
2✔
1503
        if err != nil {
2✔
1504
                if err == ErrMetaNotFound {
×
1505
                        meta = &Meta{}
×
1506
                } else {
×
1507
                        return err
×
1508
                }
×
1509
        }
1510

1511
        latestVersion := getLatestDBVersion(versions)
2✔
1512
        log.Infof("Checking for schema update: latest_version=%v, "+
2✔
1513
                "db_version=%v", latestVersion, meta.DbVersionNumber)
2✔
1514

2✔
1515
        switch {
2✔
1516

1517
        // If the database reports a higher version that we are aware of, the
1518
        // user is probably trying to revert to a prior version of lnd. We fail
1519
        // here to prevent reversions and unintended corruption.
UNCOV
1520
        case meta.DbVersionNumber > latestVersion:
×
UNCOV
1521
                log.Errorf("Refusing to revert from db_version=%d to "+
×
UNCOV
1522
                        "lower version=%d", meta.DbVersionNumber,
×
UNCOV
1523
                        latestVersion)
×
UNCOV
1524
                return ErrDBReversion
×
1525

1526
        // If the current database version matches the latest version number,
1527
        // then we don't need to perform any migrations.
1528
        case meta.DbVersionNumber == latestVersion:
2✔
1529
                return nil
2✔
1530
        }
1531

UNCOV
1532
        log.Infof("Performing database schema migration")
×
UNCOV
1533

×
UNCOV
1534
        // Otherwise, we fetch the migrations which need to applied, and
×
UNCOV
1535
        // execute them serially within a single database transaction to ensure
×
UNCOV
1536
        // the migration is atomic.
×
UNCOV
1537
        migrations, migrationVersions := getMigrationsToApply(
×
UNCOV
1538
                versions, meta.DbVersionNumber,
×
UNCOV
1539
        )
×
UNCOV
1540
        return kvdb.Update(d, func(tx kvdb.RwTx) error {
×
UNCOV
1541
                for i, migration := range migrations {
×
UNCOV
1542
                        if migration == nil {
×
1543
                                continue
×
1544
                        }
1545

UNCOV
1546
                        log.Infof("Applying migration #%v",
×
UNCOV
1547
                                migrationVersions[i])
×
UNCOV
1548

×
UNCOV
1549
                        if err := migration(tx); err != nil {
×
UNCOV
1550
                                log.Infof("Unable to apply migration #%v",
×
UNCOV
1551
                                        migrationVersions[i])
×
UNCOV
1552
                                return err
×
UNCOV
1553
                        }
×
1554
                }
1555

UNCOV
1556
                meta.DbVersionNumber = latestVersion
×
UNCOV
1557
                err := putMeta(meta, tx)
×
UNCOV
1558
                if err != nil {
×
1559
                        return err
×
1560
                }
×
1561

1562
                // In dry-run mode, return an error to prevent the transaction
1563
                // from committing.
UNCOV
1564
                if d.dryRun {
×
UNCOV
1565
                        return ErrDryRunMigrationOK
×
UNCOV
1566
                }
×
1567

UNCOV
1568
                return nil
×
UNCOV
1569
        }, func() {})
×
1570
}
1571

1572
// applyOptionalVersions takes a config to determine whether the optional
1573
// migrations will be applied.
1574
//
1575
// NOTE: only support the prune_revocation_log optional migration atm.
1576
func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error {
2✔
1577
        // TODO(yy): need to design the db to support dry run for optional
2✔
1578
        // migrations.
2✔
1579
        if d.dryRun {
2✔
UNCOV
1580
                log.Info("Skipped optional migrations as dry run mode is not " +
×
UNCOV
1581
                        "supported yet")
×
UNCOV
1582
                return nil
×
UNCOV
1583
        }
×
1584

1585
        om, err := d.fetchOptionalMeta()
2✔
1586
        if err != nil {
2✔
1587
                if err == ErrMetaNotFound {
×
1588
                        om = &OptionalMeta{
×
1589
                                Versions: make(map[uint64]string),
×
1590
                        }
×
1591
                } else {
×
1592
                        return err
×
1593
                }
×
1594
        }
1595

1596
        log.Infof("Checking for optional update: prune_revocation_log=%v, "+
2✔
1597
                "db_version=%s", cfg.PruneRevocationLog, om)
2✔
1598

2✔
1599
        // Exit early if the optional migration is not specified.
2✔
1600
        if !cfg.PruneRevocationLog {
4✔
1601
                return nil
2✔
1602
        }
2✔
1603

1604
        // Exit early if the optional migration has already been applied.
UNCOV
1605
        if _, ok := om.Versions[0]; ok {
×
UNCOV
1606
                return nil
×
UNCOV
1607
        }
×
1608

1609
        // Get the optional version.
UNCOV
1610
        version := optionalVersions[0]
×
UNCOV
1611
        log.Infof("Performing database optional migration: %s", version.name)
×
UNCOV
1612

×
UNCOV
1613
        migrationCfg := &MigrationConfigImpl{
×
UNCOV
1614
                migration30.MigrateRevLogConfigImpl{
×
UNCOV
1615
                        NoAmountData: d.noRevLogAmtData,
×
UNCOV
1616
                },
×
UNCOV
1617
        }
×
UNCOV
1618

×
UNCOV
1619
        // Migrate the data.
×
UNCOV
1620
        if err := version.migration(d, migrationCfg); err != nil {
×
1621
                log.Errorf("Unable to apply optional migration: %s, error: %v",
×
1622
                        version.name, err)
×
1623
                return err
×
1624
        }
×
1625

1626
        // Update the optional meta. Notice that unlike the mandatory db
1627
        // migrations where we perform the migration and updating meta in a
1628
        // single db transaction, we use different transactions here. Even when
1629
        // the following update is failed, we should be fine here as we would
1630
        // re-run the optional migration again, which is a noop, during next
1631
        // startup.
UNCOV
1632
        om.Versions[0] = version.name
×
UNCOV
1633
        if err := d.putOptionalMeta(om); err != nil {
×
1634
                log.Errorf("Unable to update optional meta: %v", err)
×
1635
                return err
×
1636
        }
×
1637

UNCOV
1638
        return nil
×
1639
}
1640

1641
// ChannelGraph returns the current instance of the directed channel graph.
1642
func (d *DB) ChannelGraph() *ChannelGraph {
2✔
1643
        return d.graph
2✔
1644
}
2✔
1645

1646
// ChannelStateDB returns the sub database that is concerned with the channel
1647
// state.
1648
func (d *DB) ChannelStateDB() *ChannelStateDB {
2✔
1649
        return d.channelStateDB
2✔
1650
}
2✔
1651

1652
// LatestDBVersion returns the number of the latest database version currently
1653
// known to the channel DB.
UNCOV
1654
func LatestDBVersion() uint32 {
×
UNCOV
1655
        return getLatestDBVersion(dbVersions)
×
UNCOV
1656
}
×
1657

1658
func getLatestDBVersion(versions []mandatoryVersion) uint32 {
2✔
1659
        return versions[len(versions)-1].number
2✔
1660
}
2✔
1661

1662
// getMigrationsToApply retrieves the migration function that should be
1663
// applied to the database.
1664
func getMigrationsToApply(versions []mandatoryVersion,
UNCOV
1665
        version uint32) ([]migration, []uint32) {
×
UNCOV
1666

×
UNCOV
1667
        migrations := make([]migration, 0, len(versions))
×
UNCOV
1668
        migrationVersions := make([]uint32, 0, len(versions))
×
UNCOV
1669

×
UNCOV
1670
        for _, v := range versions {
×
UNCOV
1671
                if v.number > version {
×
UNCOV
1672
                        migrations = append(migrations, v.migration)
×
UNCOV
1673
                        migrationVersions = append(migrationVersions, v.number)
×
UNCOV
1674
                }
×
1675
        }
1676

UNCOV
1677
        return migrations, migrationVersions
×
1678
}
1679

1680
// fetchHistoricalChanBucket returns a the channel bucket for a given outpoint
1681
// from the historical channel bucket. If the bucket does not exist,
1682
// ErrNoHistoricalBucket is returned.
1683
func fetchHistoricalChanBucket(tx kvdb.RTx,
1684
        outPoint *wire.OutPoint) (kvdb.RBucket, error) {
2✔
1685

2✔
1686
        // First fetch the top level bucket which stores all data related to
2✔
1687
        // historically stored channels.
2✔
1688
        historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
2✔
1689
        if historicalChanBucket == nil {
2✔
1690
                return nil, ErrNoHistoricalBucket
×
1691
        }
×
1692

1693
        // With the bucket for the node and chain fetched, we can now go down
1694
        // another level, for the channel itself.
1695
        var chanPointBuf bytes.Buffer
2✔
1696
        if err := writeOutpoint(&chanPointBuf, outPoint); err != nil {
2✔
1697
                return nil, err
×
1698
        }
×
1699
        chanBucket := historicalChanBucket.NestedReadBucket(
2✔
1700
                chanPointBuf.Bytes(),
2✔
1701
        )
2✔
1702
        if chanBucket == nil {
2✔
UNCOV
1703
                return nil, ErrChannelNotFound
×
UNCOV
1704
        }
×
1705

1706
        return chanBucket, nil
2✔
1707
}
1708

1709
// FetchHistoricalChannel fetches open channel data from the historical channel
1710
// bucket.
1711
func (c *ChannelStateDB) FetchHistoricalChannel(outPoint *wire.OutPoint) (
1712
        *OpenChannel, error) {
2✔
1713

2✔
1714
        var channel *OpenChannel
2✔
1715
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
4✔
1716
                chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
2✔
1717
                if err != nil {
2✔
UNCOV
1718
                        return err
×
UNCOV
1719
                }
×
1720

1721
                channel, err = fetchOpenChannel(chanBucket, outPoint)
2✔
1722
                if err != nil {
2✔
1723
                        return err
×
1724
                }
×
1725

1726
                channel.Db = c
2✔
1727
                return nil
2✔
1728
        }, func() {
2✔
1729
                channel = nil
2✔
1730
        })
2✔
1731
        if err != nil {
2✔
UNCOV
1732
                return nil, err
×
UNCOV
1733
        }
×
1734

1735
        return channel, nil
2✔
1736
}
1737

1738
func fetchFinalHtlcsBucket(tx kvdb.RTx,
1739
        chanID lnwire.ShortChannelID) (kvdb.RBucket, error) {
2✔
1740

2✔
1741
        finalHtlcsBucket := tx.ReadBucket(finalHtlcsBucket)
2✔
1742
        if finalHtlcsBucket == nil {
2✔
UNCOV
1743
                return nil, ErrFinalHtlcsBucketNotFound
×
UNCOV
1744
        }
×
1745

1746
        var chanIDBytes [8]byte
2✔
1747
        byteOrder.PutUint64(chanIDBytes[:], chanID.ToUint64())
2✔
1748

2✔
1749
        chanBucket := finalHtlcsBucket.NestedReadBucket(chanIDBytes[:])
2✔
1750
        if chanBucket == nil {
2✔
1751
                return nil, ErrFinalChannelBucketNotFound
×
1752
        }
×
1753

1754
        return chanBucket, nil
2✔
1755
}
1756

1757
var ErrHtlcUnknown = errors.New("htlc unknown")
1758

1759
// LookupFinalHtlc retrieves a final htlc resolution from the database. If the
1760
// htlc has no final resolution yet, ErrHtlcUnknown is returned.
1761
func (c *ChannelStateDB) LookupFinalHtlc(chanID lnwire.ShortChannelID,
1762
        htlcIndex uint64) (*FinalHtlcInfo, error) {
2✔
1763

2✔
1764
        var idBytes [8]byte
2✔
1765
        byteOrder.PutUint64(idBytes[:], htlcIndex)
2✔
1766

2✔
1767
        var settledByte byte
2✔
1768

2✔
1769
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
4✔
1770
                finalHtlcsBucket, err := fetchFinalHtlcsBucket(
2✔
1771
                        tx, chanID,
2✔
1772
                )
2✔
1773
                switch {
2✔
UNCOV
1774
                case errors.Is(err, ErrFinalHtlcsBucketNotFound):
×
UNCOV
1775
                        fallthrough
×
1776

UNCOV
1777
                case errors.Is(err, ErrFinalChannelBucketNotFound):
×
UNCOV
1778
                        return ErrHtlcUnknown
×
1779

1780
                case err != nil:
×
1781
                        return fmt.Errorf("cannot fetch final htlcs bucket: %w",
×
1782
                                err)
×
1783
                }
1784

1785
                value := finalHtlcsBucket.Get(idBytes[:])
2✔
1786
                if value == nil {
2✔
UNCOV
1787
                        return ErrHtlcUnknown
×
UNCOV
1788
                }
×
1789

1790
                if len(value) != 1 {
2✔
1791
                        return errors.New("unexpected final htlc value length")
×
1792
                }
×
1793

1794
                settledByte = value[0]
2✔
1795

2✔
1796
                return nil
2✔
1797
        }, func() {
2✔
1798
                settledByte = 0
2✔
1799
        })
2✔
1800
        if err != nil {
2✔
UNCOV
1801
                return nil, err
×
UNCOV
1802
        }
×
1803

1804
        info := FinalHtlcInfo{
2✔
1805
                Settled:  settledByte&byte(FinalHtlcSettledBit) != 0,
2✔
1806
                Offchain: settledByte&byte(FinalHtlcOffchainBit) != 0,
2✔
1807
        }
2✔
1808

2✔
1809
        return &info, nil
2✔
1810
}
1811

1812
// PutOnchainFinalHtlcOutcome stores the final on-chain outcome of an htlc in
1813
// the database.
1814
func (c *ChannelStateDB) PutOnchainFinalHtlcOutcome(
1815
        chanID lnwire.ShortChannelID, htlcID uint64, settled bool) error {
2✔
1816

2✔
1817
        // Skip if the user did not opt in to storing final resolutions.
2✔
1818
        if !c.parent.storeFinalHtlcResolutions {
4✔
1819
                return nil
2✔
1820
        }
2✔
1821

UNCOV
1822
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
×
UNCOV
1823
                finalHtlcsBucket, err := fetchFinalHtlcsBucketRw(tx, chanID)
×
UNCOV
1824
                if err != nil {
×
1825
                        return err
×
1826
                }
×
1827

UNCOV
1828
                return putFinalHtlc(
×
UNCOV
1829
                        finalHtlcsBucket, htlcID,
×
UNCOV
1830
                        FinalHtlcInfo{
×
UNCOV
1831
                                Settled:  settled,
×
UNCOV
1832
                                Offchain: false,
×
UNCOV
1833
                        },
×
UNCOV
1834
                )
×
UNCOV
1835
        }, func() {})
×
1836
}
1837

1838
// MakeTestInvoiceDB is used to create a test invoice database for testing
1839
// purposes. It simply calls into MakeTestDB so the same modifiers can be used.
1840
func MakeTestInvoiceDB(t *testing.T, modifiers ...OptionModifier) (
UNCOV
1841
        invoices.InvoiceDB, error) {
×
UNCOV
1842

×
UNCOV
1843
        return MakeTestDB(t, modifiers...)
×
UNCOV
1844
}
×
1845

1846
// MakeTestDB creates a new instance of the ChannelDB for testing purposes.
1847
// A callback which cleans up the created temporary directories is also
1848
// returned and intended to be executed after the test completes.
UNCOV
1849
func MakeTestDB(t *testing.T, modifiers ...OptionModifier) (*DB, error) {
×
UNCOV
1850
        // First, create a temporary directory to be used for the duration of
×
UNCOV
1851
        // this test.
×
UNCOV
1852
        tempDirName := t.TempDir()
×
UNCOV
1853

×
UNCOV
1854
        // Next, create channeldb for the first time.
×
UNCOV
1855
        backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cdb")
×
UNCOV
1856
        if err != nil {
×
1857
                backendCleanup()
×
1858
                return nil, err
×
1859
        }
×
1860

UNCOV
1861
        cdb, err := CreateWithBackend(backend, modifiers...)
×
UNCOV
1862
        if err != nil {
×
1863
                backendCleanup()
×
1864
                return nil, err
×
1865
        }
×
1866

UNCOV
1867
        t.Cleanup(func() {
×
UNCOV
1868
                cdb.Close()
×
UNCOV
1869
                backendCleanup()
×
UNCOV
1870
        })
×
1871

UNCOV
1872
        return cdb, nil
×
1873
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc