• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11219354629

07 Oct 2024 03:56PM UTC coverage: 58.585% (-0.2%) from 58.814%
11219354629

Pull #9147

github

ziggie1984
fixup! sqlc: migration up script for payments.
Pull Request #9147: [Part 1|3] Introduce SQL Payment schema into LND

130227 of 222287 relevant lines covered (58.59%)

29106.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.54
/channeldb/db.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "fmt"
7
        "net"
8
        "os"
9
        "testing"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/go-errors/errors"
15
        mig "github.com/lightningnetwork/lnd/channeldb/migration"
16
        "github.com/lightningnetwork/lnd/channeldb/migration12"
17
        "github.com/lightningnetwork/lnd/channeldb/migration13"
18
        "github.com/lightningnetwork/lnd/channeldb/migration16"
19
        "github.com/lightningnetwork/lnd/channeldb/migration20"
20
        "github.com/lightningnetwork/lnd/channeldb/migration21"
21
        "github.com/lightningnetwork/lnd/channeldb/migration23"
22
        "github.com/lightningnetwork/lnd/channeldb/migration24"
23
        "github.com/lightningnetwork/lnd/channeldb/migration25"
24
        "github.com/lightningnetwork/lnd/channeldb/migration26"
25
        "github.com/lightningnetwork/lnd/channeldb/migration27"
26
        "github.com/lightningnetwork/lnd/channeldb/migration29"
27
        "github.com/lightningnetwork/lnd/channeldb/migration30"
28
        "github.com/lightningnetwork/lnd/channeldb/migration31"
29
        "github.com/lightningnetwork/lnd/channeldb/migration32"
30
        "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
31
        "github.com/lightningnetwork/lnd/clock"
32
        "github.com/lightningnetwork/lnd/invoices"
33
        "github.com/lightningnetwork/lnd/kvdb"
34
        "github.com/lightningnetwork/lnd/lnwire"
35
        "github.com/lightningnetwork/lnd/routing/route"
36
)
37

38
const (
39
        dbName = "channel.db"
40
)
41

42
var (
43
        // ErrDryRunMigrationOK signals that a migration executed successful,
44
        // but we intentionally did not commit the result.
45
        ErrDryRunMigrationOK = errors.New("dry run migration successful")
46

47
        // ErrFinalHtlcsBucketNotFound signals that the top-level final htlcs
48
        // bucket does not exist.
49
        ErrFinalHtlcsBucketNotFound = errors.New("final htlcs bucket not " +
50
                "found")
51

52
        // ErrFinalChannelBucketNotFound signals that the channel bucket for
53
        // final htlc outcomes does not exist.
54
        ErrFinalChannelBucketNotFound = errors.New("final htlcs channel " +
55
                "bucket not found")
56
)
57

58
// migration is a function which takes a prior outdated version of the database
59
// instances and mutates the key/bucket structure to arrive at a more
60
// up-to-date version of the database.
61
type migration func(tx kvdb.RwTx) error
62

63
// mandatoryVersion defines a db version that must be applied before the lnd
64
// starts.
65
type mandatoryVersion struct {
66
        number    uint32
67
        migration migration
68
}
69

70
// MigrationConfig is an interface combines the config interfaces of all
71
// optional migrations.
72
type MigrationConfig interface {
73
        migration30.MigrateRevLogConfig
74
}
75

76
// MigrationConfigImpl is a super set of all the various migration configs and
77
// an implementation of MigrationConfig.
78
type MigrationConfigImpl struct {
79
        migration30.MigrateRevLogConfigImpl
80
}
81

82
// optionalMigration defines an optional migration function. When a migration
83
// is optional, it usually involves a large scale of changes that might touch
84
// millions of keys. Due to OOM concern, the update cannot be safely done
85
// within one db transaction. Thus, for optional migrations, they must take the
86
// db backend and construct transactions as needed.
87
type optionalMigration func(db kvdb.Backend, cfg MigrationConfig) error
88

89
// optionalVersion defines a db version that can be optionally applied. When
90
// applying migrations, we must apply all the mandatory migrations first before
91
// attempting optional ones.
92
type optionalVersion struct {
93
        name      string
94
        migration optionalMigration
95
}
96

97
var (
98
        // dbVersions is storing all mandatory versions of database. If current
99
        // version of database don't match with latest version this list will
100
        // be used for retrieving all migration function that are need to apply
101
        // to the current db.
102
        dbVersions = []mandatoryVersion{
103
                {
104
                        // The base DB version requires no migration.
105
                        number:    0,
106
                        migration: nil,
107
                },
108
                {
109
                        // The version of the database where two new indexes
110
                        // for the update time of node and channel updates were
111
                        // added.
112
                        number:    1,
113
                        migration: migration_01_to_11.MigrateNodeAndEdgeUpdateIndex,
114
                },
115
                {
116
                        // The DB version that added the invoice event time
117
                        // series.
118
                        number:    2,
119
                        migration: migration_01_to_11.MigrateInvoiceTimeSeries,
120
                },
121
                {
122
                        // The DB version that updated the embedded invoice in
123
                        // outgoing payments to match the new format.
124
                        number:    3,
125
                        migration: migration_01_to_11.MigrateInvoiceTimeSeriesOutgoingPayments,
126
                },
127
                {
128
                        // The version of the database where every channel
129
                        // always has two entries in the edges bucket. If
130
                        // a policy is unknown, this will be represented
131
                        // by a special byte sequence.
132
                        number:    4,
133
                        migration: migration_01_to_11.MigrateEdgePolicies,
134
                },
135
                {
136
                        // The DB version where we persist each attempt to send
137
                        // an HTLC to a payment hash, and track whether the
138
                        // payment is in-flight, succeeded, or failed.
139
                        number:    5,
140
                        migration: migration_01_to_11.PaymentStatusesMigration,
141
                },
142
                {
143
                        // The DB version that properly prunes stale entries
144
                        // from the edge update index.
145
                        number:    6,
146
                        migration: migration_01_to_11.MigratePruneEdgeUpdateIndex,
147
                },
148
                {
149
                        // The DB version that migrates the ChannelCloseSummary
150
                        // to a format where optional fields are indicated with
151
                        // boolean flags.
152
                        number:    7,
153
                        migration: migration_01_to_11.MigrateOptionalChannelCloseSummaryFields,
154
                },
155
                {
156
                        // The DB version that changes the gossiper's message
157
                        // store keys to account for the message's type and
158
                        // ShortChannelID.
159
                        number:    8,
160
                        migration: migration_01_to_11.MigrateGossipMessageStoreKeys,
161
                },
162
                {
163
                        // The DB version where the payments and payment
164
                        // statuses are moved to being stored in a combined
165
                        // bucket.
166
                        number:    9,
167
                        migration: migration_01_to_11.MigrateOutgoingPayments,
168
                },
169
                {
170
                        // The DB version where we started to store legacy
171
                        // payload information for all routes, as well as the
172
                        // optional TLV records.
173
                        number:    10,
174
                        migration: migration_01_to_11.MigrateRouteSerialization,
175
                },
176
                {
177
                        // Add invoice htlc and cltv delta fields.
178
                        number:    11,
179
                        migration: migration_01_to_11.MigrateInvoices,
180
                },
181
                {
182
                        // Migrate to TLV invoice bodies, add payment address
183
                        // and features, remove receipt.
184
                        number:    12,
185
                        migration: migration12.MigrateInvoiceTLV,
186
                },
187
                {
188
                        // Migrate to multi-path payments.
189
                        number:    13,
190
                        migration: migration13.MigrateMPP,
191
                },
192
                {
193
                        // Initialize payment address index and begin using it
194
                        // as the default index, falling back to payment hash
195
                        // index.
196
                        number:    14,
197
                        migration: mig.CreateTLB(payAddrIndexBucket),
198
                },
199
                {
200
                        // Initialize payment index bucket which will be used
201
                        // to index payments by sequence number. This index will
202
                        // be used to allow more efficient ListPayments queries.
203
                        number:    15,
204
                        migration: mig.CreateTLB(paymentsIndexBucket),
205
                },
206
                {
207
                        // Add our existing payments to the index bucket created
208
                        // in migration 15.
209
                        number:    16,
210
                        migration: migration16.MigrateSequenceIndex,
211
                },
212
                {
213
                        // Create a top level bucket which will store extra
214
                        // information about channel closes.
215
                        number:    17,
216
                        migration: mig.CreateTLB(closeSummaryBucket),
217
                },
218
                {
219
                        // Create a top level bucket which holds information
220
                        // about our peers.
221
                        number:    18,
222
                        migration: mig.CreateTLB(peersBucket),
223
                },
224
                {
225
                        // Create a top level bucket which holds outpoint
226
                        // information.
227
                        number:    19,
228
                        migration: mig.CreateTLB(outpointBucket),
229
                },
230
                {
231
                        // Migrate some data to the outpoint index.
232
                        number:    20,
233
                        migration: migration20.MigrateOutpointIndex,
234
                },
235
                {
236
                        // Migrate to length prefixed wire messages everywhere
237
                        // in the database.
238
                        number:    21,
239
                        migration: migration21.MigrateDatabaseWireMessages,
240
                },
241
                {
242
                        // Initialize set id index so that invoices can be
243
                        // queried by individual htlc sets.
244
                        number:    22,
245
                        migration: mig.CreateTLB(setIDIndexBucket),
246
                },
247
                {
248
                        number:    23,
249
                        migration: migration23.MigrateHtlcAttempts,
250
                },
251
                {
252
                        // Remove old forwarding packages of closed channels.
253
                        number:    24,
254
                        migration: migration24.MigrateFwdPkgCleanup,
255
                },
256
                {
257
                        // Save the initial local/remote balances in channel
258
                        // info.
259
                        number:    25,
260
                        migration: migration25.MigrateInitialBalances,
261
                },
262
                {
263
                        // Migrate the initial local/remote balance fields into
264
                        // tlv records.
265
                        number:    26,
266
                        migration: migration26.MigrateBalancesToTlvRecords,
267
                },
268
                {
269
                        // Patch the initial local/remote balance fields with
270
                        // empty values for historical channels.
271
                        number:    27,
272
                        migration: migration27.MigrateHistoricalBalances,
273
                },
274
                {
275
                        number:    28,
276
                        migration: mig.CreateTLB(chanIDBucket),
277
                },
278
                {
279
                        number:    29,
280
                        migration: migration29.MigrateChanID,
281
                },
282
                {
283
                        // Removes the "sweeper-last-tx" bucket. Although we
284
                        // do not have a mandatory version 30 we skip this
285
                        // version because its naming is already used for the
286
                        // first optional migration.
287
                        number:    31,
288
                        migration: migration31.DeleteLastPublishedTxTLB,
289
                },
290
                {
291
                        number:    32,
292
                        migration: migration32.MigrateMCRouteSerialisation,
293
                },
294
        }
295

296
        // optionalVersions stores all optional migrations that are applied
297
        // after dbVersions.
298
        //
299
        // NOTE: optional migrations must be fault-tolerant and re-run already
300
        // migrated data must be noop, which means the migration must be able
301
        // to determine its state.
302
        optionalVersions = []optionalVersion{
303
                {
304
                        name: "prune revocation log",
305
                        migration: func(db kvdb.Backend,
306
                                cfg MigrationConfig) error {
307

308
                                return migration30.MigrateRevocationLog(db, cfg)
309
                        },
310
                },
311
        }
×
312

×
313
        // Big endian is the preferred byte order, due to cursor scans over
×
314
        // integer keys iterating in order.
×
315
        byteOrder = binary.BigEndian
316

317
        // channelOpeningStateBucket is the database bucket used to store the
318
        // channelOpeningState for each channel that is currently in the process
319
        // of being opened.
320
        channelOpeningStateBucket = []byte("channelOpeningState")
321
)
322

323
// DB is the primary datastore for the lnd daemon. The database stores
324
// information related to nodes, routing data, open/closed channels, fee
325
// schedules, and reputation data.
326
type DB struct {
327
        kvdb.Backend
328

329
        // channelStateDB separates all DB operations on channel state.
330
        channelStateDB *ChannelStateDB
331

332
        dbPath                    string
333
        graph                     *ChannelGraph
334
        clock                     clock.Clock
335
        dryRun                    bool
336
        keepFailedPaymentAttempts bool
337
        storeFinalHtlcResolutions bool
338

339
        // noRevLogAmtData if true, means that commitment transaction amount
340
        // data should not be stored in the revocation log.
341
        noRevLogAmtData bool
342
}
343

344
// Open opens or creates channeldb. Any necessary schemas migrations due
345
// to updates will take place as necessary.
346
// TODO(bhandras): deprecate this function.
347
func Open(dbPath string, modifiers ...OptionModifier) (*DB, error) {
348
        opts := DefaultOptions()
349
        for _, modifier := range modifiers {
350
                modifier(&opts)
351
        }
352

1,447✔
353
        backend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
1,447✔
354
                DBPath:            dbPath,
1,469✔
355
                DBFileName:        dbName,
22✔
356
                NoFreelistSync:    opts.NoFreelistSync,
22✔
357
                AutoCompact:       opts.AutoCompact,
358
                AutoCompactMinAge: opts.AutoCompactMinAge,
1,447✔
359
                DBTimeout:         opts.DBTimeout,
1,447✔
360
        })
1,447✔
361
        if err != nil {
1,447✔
362
                return nil, err
1,447✔
363
        }
1,447✔
364

1,447✔
365
        db, err := CreateWithBackend(backend, modifiers...)
1,447✔
366
        if err == nil {
1,447✔
367
                db.dbPath = dbPath
×
368
        }
×
369
        return db, err
370
}
1,447✔
371

2,894✔
372
// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
1,447✔
373
// Any necessary schemas migrations due to updates will take place as necessary.
1,447✔
374
func CreateWithBackend(backend kvdb.Backend,
1,447✔
375
        modifiers ...OptionModifier) (*DB, error) {
376

377
        opts := DefaultOptions()
378
        for _, modifier := range modifiers {
379
                modifier(&opts)
380
        }
1,737✔
381

1,737✔
382
        if !opts.NoMigration {
1,737✔
383
                if err := initChannelDB(backend); err != nil {
1,913✔
384
                        return nil, err
176✔
385
                }
176✔
386
        }
387

3,474✔
388
        chanDB := &DB{
1,738✔
389
                Backend: backend,
1✔
390
                channelStateDB: &ChannelStateDB{
1✔
391
                        linkNodeDB: &LinkNodeDB{
392
                                backend: backend,
393
                        },
1,736✔
394
                        backend: backend,
1,736✔
395
                },
1,736✔
396
                clock:                     opts.clock,
1,736✔
397
                dryRun:                    opts.dryRun,
1,736✔
398
                keepFailedPaymentAttempts: opts.keepFailedPaymentAttempts,
1,736✔
399
                storeFinalHtlcResolutions: opts.storeFinalHtlcResolutions,
1,736✔
400
                noRevLogAmtData:           opts.NoRevLogAmtData,
1,736✔
401
        }
1,736✔
402

1,736✔
403
        // Set the parent pointer (only used in tests).
1,736✔
404
        chanDB.channelStateDB.parent = chanDB
1,736✔
405

1,736✔
406
        var err error
1,736✔
407
        chanDB.graph, err = NewChannelGraph(
1,736✔
408
                backend, opts.RejectCacheSize, opts.ChannelCacheSize,
1,736✔
409
                opts.BatchCommitInterval, opts.PreAllocCacheNumNodes,
1,736✔
410
                opts.UseGraphCache, opts.NoMigration,
1,736✔
411
        )
1,736✔
412
        if err != nil {
1,736✔
413
                return nil, err
1,736✔
414
        }
1,736✔
415

1,736✔
416
        // Synchronize the version of database and apply migrations if needed.
1,736✔
417
        if !opts.NoMigration {
1,736✔
418
                if err := chanDB.syncVersions(dbVersions); err != nil {
×
419
                        backend.Close()
×
420
                        return nil, err
421
                }
422

3,472✔
423
                // Grab the optional migration config.
1,737✔
424
                omc := opts.OptionalMiragtionConfig
1✔
425
                if err := chanDB.applyOptionalVersions(omc); err != nil {
1✔
426
                        backend.Close()
1✔
427
                        return nil, err
428
                }
429
        }
1,735✔
430

1,735✔
431
        return chanDB, nil
×
432
}
×
433

×
434
// Path returns the file path to the channel database.
435
func (d *DB) Path() string {
436
        return d.dbPath
1,735✔
437
}
438

439
var dbTopLevelBuckets = [][]byte{
440
        openChannelBucket,
197✔
441
        closedChannelBucket,
197✔
442
        forwardingLogBucket,
197✔
443
        fwdPackagesKey,
444
        invoiceBucket,
445
        payAddrIndexBucket,
446
        setIDIndexBucket,
447
        paymentsIndexBucket,
448
        peersBucket,
449
        nodeInfoBucket,
450
        metaBucket,
451
        closeSummaryBucket,
452
        outpointBucket,
453
        chanIDBucket,
454
        historicalChannelBucket,
455
}
456

457
// Wipe completely deletes all saved state within all used buckets within the
458
// database. The deletion is done in a single transaction, therefore this
459
// operation is fully atomic.
460
func (d *DB) Wipe() error {
461
        err := kvdb.Update(d, func(tx kvdb.RwTx) error {
462
                for _, tlb := range dbTopLevelBuckets {
463
                        err := tx.DeleteTopLevelBucket(tlb)
464
                        if err != nil && err != kvdb.ErrBucketNotFound {
465
                                return err
169✔
466
                        }
338✔
467
                }
2,704✔
468
                return nil
2,535✔
469
        }, func() {})
2,535✔
470
        if err != nil {
×
471
                return err
×
472
        }
473

169✔
474
        return initChannelDB(d.Backend)
169✔
475
}
169✔
476

×
477
// initChannelDB creates and initializes a fresh version of channeldb. In the
×
478
// case that the target path has not yet been created or doesn't yet exist, then
479
// the path is created. Additionally, all required top-level buckets used within
169✔
480
// the database are created.
481
func initChannelDB(db kvdb.Backend) error {
482
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
483
                // Check if DB was marked as inactive with a tomb stone.
484
                if err := EnsureNoTombstone(tx); err != nil {
485
                        return err
486
                }
1,906✔
487

3,812✔
488
                meta := &Meta{}
1,906✔
489
                // Check if DB is already initialized.
1,907✔
490
                err := FetchMeta(meta, tx)
1✔
491
                if err == nil {
1✔
492
                        return nil
493
                }
1,905✔
494

1,905✔
495
                for _, tlb := range dbTopLevelBuckets {
1,905✔
496
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
2,113✔
497
                                return err
208✔
498
                        }
208✔
499
                }
500

27,156✔
501
                meta.DbVersionNumber = getLatestDBVersion(dbVersions)
25,457✔
502
                return putMeta(meta, tx)
×
503
        }, func() {})
×
504
        if err != nil {
505
                return fmt.Errorf("unable to create new channeldb: %w", err)
506
        }
1,699✔
507

1,699✔
508
        return nil
1,906✔
509
}
1,907✔
510

1✔
511
// fileExists returns true if the file exists, and false otherwise.
1✔
512
func fileExists(path string) bool {
513
        if _, err := os.Stat(path); err != nil {
1,905✔
514
                if os.IsNotExist(err) {
515
                        return false
516
                }
517
        }
1✔
518

1✔
519
        return true
×
520
}
×
521

×
522
// ChannelStateDB is a database that keeps track of all channel state.
523
type ChannelStateDB struct {
524
        // linkNodeDB separates all DB operations on LinkNodes.
1✔
525
        linkNodeDB *LinkNodeDB
526

527
        // parent holds a pointer to the "main" channeldb.DB object. This is
528
        // only used for testing and should never be used in production code.
529
        // For testing use the ChannelStateDB.GetParentDB() function to retrieve
530
        // this pointer.
531
        parent *DB
532

533
        // backend points to the actual backend holding the channel state
534
        // database. This may be a real backend or a cache middleware.
535
        backend kvdb.Backend
536
}
537

538
// GetParentDB returns the "main" channeldb.DB object that is the owner of this
539
// ChannelStateDB instance. Use this function only in tests where passing around
540
// pointers makes testing less readable. Never to be used in production code!
541
func (c *ChannelStateDB) GetParentDB() *DB {
542
        return c.parent
543
}
544

545
// LinkNodeDB returns the current instance of the link node database.
546
func (c *ChannelStateDB) LinkNodeDB() *LinkNodeDB {
507✔
547
        return c.linkNodeDB
507✔
548
}
507✔
549

550
// FetchOpenChannels starts a new database transaction and returns all stored
551
// currently active/open channels associated with the target nodeID. In the case
2✔
552
// that no active channels are known to have been created with this node, then a
2✔
553
// zero-length slice is returned.
2✔
554
func (c *ChannelStateDB) FetchOpenChannels(nodeID *btcec.PublicKey) (
555
        []*OpenChannel, error) {
556

557
        var channels []*OpenChannel
558
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
559
                var err error
560
                channels, err = c.fetchOpenChannels(tx, nodeID)
256✔
561
                return err
256✔
562
        }, func() {
256✔
563
                channels = nil
512✔
564
        })
256✔
565

256✔
566
        return channels, err
256✔
567
}
512✔
568

256✔
569
// fetchOpenChannels uses and existing database transaction and returns all
256✔
570
// stored currently active/open channels associated with the target nodeID. In
571
// the case that no active channels are known to have been created with this
256✔
572
// node, then a zero-length slice is returned.
573
func (c *ChannelStateDB) fetchOpenChannels(tx kvdb.RTx,
574
        nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
575

576
        // Get the bucket dedicated to storing the metadata for open channels.
577
        openChanBucket := tx.ReadBucket(openChannelBucket)
578
        if openChanBucket == nil {
579
                return nil, nil
263✔
580
        }
263✔
581

263✔
582
        // Within this top level bucket, fetch the bucket dedicated to storing
263✔
583
        // open channel data specific to the remote node.
263✔
584
        pub := nodeID.SerializeCompressed()
×
585
        nodeChanBucket := openChanBucket.NestedReadBucket(pub)
×
586
        if nodeChanBucket == nil {
587
                return nil, nil
588
        }
589

263✔
590
        // Next, we'll need to go down an additional layer in order to retrieve
263✔
591
        // the channels for each chain the node knows of.
317✔
592
        var channels []*OpenChannel
54✔
593
        err := nodeChanBucket.ForEach(func(chainHash, v []byte) error {
54✔
594
                // If there's a value, it's not a bucket so ignore it.
595
                if v != nil {
596
                        return nil
597
                }
211✔
598

422✔
599
                // If we've found a valid chainhash bucket, then we'll retrieve
211✔
600
                // that so we can extract all the channels.
211✔
601
                chainBucket := nodeChanBucket.NestedReadBucket(chainHash)
×
602
                if chainBucket == nil {
×
603
                        return fmt.Errorf("unable to read bucket for chain=%x",
604
                                chainHash[:])
605
                }
606

211✔
607
                // Finally, we both of the necessary buckets retrieved, fetch
211✔
608
                // all the active channels related to this node.
×
609
                nodeChannels, err := c.fetchNodeChannels(chainBucket)
×
610
                if err != nil {
×
611
                        return fmt.Errorf("unable to read channel for "+
612
                                "chain_hash=%x, node_key=%x: %v",
613
                                chainHash[:], pub, err)
614
                }
211✔
615

211✔
616
                channels = append(channels, nodeChannels...)
×
617
                return nil
×
618
        })
×
619

×
620
        return channels, err
621
}
211✔
622

211✔
623
// fetchNodeChannels retrieves all active channels from the target chainBucket
624
// which is under a node's dedicated channel bucket. This function is typically
625
// used to fetch all the active channels related to a particular node.
211✔
626
func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) (
627
        []*OpenChannel, error) {
628

629
        var channels []*OpenChannel
630

631
        // A node may have channels on several chains, so for each known chain,
632
        // we'll extract all the channels.
713✔
633
        err := chainBucket.ForEach(func(chanPoint, v []byte) error {
713✔
634
                // If there's a value, it's not a bucket so ignore it.
713✔
635
                if v != nil {
713✔
636
                        return nil
713✔
637
                }
713✔
638

1,487✔
639
                // Once we've found a valid channel bucket, we'll extract it
774✔
640
                // from the node's chain bucket.
774✔
641
                chanBucket := chainBucket.NestedReadBucket(chanPoint)
×
642

×
643
                var outPoint wire.OutPoint
644
                err := readOutpoint(bytes.NewReader(chanPoint), &outPoint)
645
                if err != nil {
646
                        return err
774✔
647
                }
774✔
648
                oChannel, err := fetchOpenChannel(chanBucket, &outPoint)
774✔
649
                if err != nil {
774✔
650
                        return fmt.Errorf("unable to read channel data for "+
774✔
651
                                "chan_point=%v: %w", outPoint, err)
×
652
                }
×
653
                oChannel.Db = c
774✔
654

774✔
655
                channels = append(channels, oChannel)
×
656

×
657
                return nil
×
658
        })
774✔
659
        if err != nil {
774✔
660
                return nil, err
774✔
661
        }
774✔
662

774✔
663
        return channels, nil
664
}
713✔
665

×
666
// FetchChannel attempts to locate a channel specified by the passed channel
×
667
// point. If the channel cannot be found, then an error will be returned.
668
// Optionally an existing db tx can be supplied.
713✔
669
func (c *ChannelStateDB) FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) (
670
        *OpenChannel, error) {
671

672
        var targetChanPoint bytes.Buffer
673
        if err := writeOutpoint(&targetChanPoint, &chanPoint); err != nil {
674
                return nil, err
675
        }
10✔
676

10✔
677
        targetChanPointBytes := targetChanPoint.Bytes()
10✔
678
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
10✔
679
                error) {
×
680

×
681
                return targetChanPointBytes, &chanPoint, nil
682
        }
10✔
683

10✔
684
        return c.channelScanner(tx, selector)
19✔
685
}
9✔
686

9✔
687
// FetchChannelByID attempts to locate a channel specified by the passed channel
9✔
688
// ID. If the channel cannot be found, then an error will be returned.
689
// Optionally an existing db tx can be supplied.
10✔
690
func (c *ChannelStateDB) FetchChannelByID(tx kvdb.RTx, id lnwire.ChannelID) (
691
        *OpenChannel, error) {
692

693
        selector := func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
694
                error) {
695

696
                var (
4✔
697
                        targetChanPointBytes []byte
4✔
698
                        targetChanPoint      *wire.OutPoint
4✔
699

8✔
700
                        // errChanFound is used to signal that the channel has
4✔
701
                        // been found so that iteration through the DB buckets
4✔
702
                        // can stop.
4✔
703
                        errChanFound = errors.New("channel found")
4✔
704
                )
4✔
705
                err := chainBkt.ForEach(func(k, _ []byte) error {
4✔
706
                        var outPoint wire.OutPoint
4✔
707
                        err := readOutpoint(bytes.NewReader(k), &outPoint)
4✔
708
                        if err != nil {
4✔
709
                                return err
4✔
710
                        }
8✔
711

4✔
712
                        chanID := lnwire.NewChanIDFromOutPoint(outPoint)
4✔
713
                        if chanID != id {
4✔
714
                                return nil
×
715
                        }
×
716

717
                        targetChanPoint = &outPoint
4✔
718
                        targetChanPointBytes = k
5✔
719

1✔
720
                        return errChanFound
1✔
721
                })
722
                if err != nil && !errors.Is(err, errChanFound) {
3✔
723
                        return nil, nil, err
3✔
724
                }
3✔
725
                if targetChanPoint == nil {
3✔
726
                        return nil, nil, ErrChannelNotFound
727
                }
4✔
728

×
729
                return targetChanPointBytes, targetChanPoint, nil
×
730
        }
5✔
731

1✔
732
        return c.channelScanner(tx, selector)
1✔
733
}
734

3✔
735
// channelSelector describes a function that takes a chain-hash bucket from
736
// within the open-channel DB and returns the wanted channel point bytes, and
737
// channel point. It must return the ErrChannelNotFound error if the wanted
4✔
738
// channel is not in the given bucket.
739
type channelSelector func(chainBkt walletdb.ReadBucket) ([]byte, *wire.OutPoint,
740
        error)
741

742
// channelScanner will traverse the DB to each chain-hash bucket of each node
743
// pub-key bucket in the open-channel-bucket. The chanSelector will then be used
744
// to fetch the wanted channel outpoint from the chain bucket.
745
func (c *ChannelStateDB) channelScanner(tx kvdb.RTx,
746
        chanSelect channelSelector) (*OpenChannel, error) {
747

748
        var (
749
                targetChan *OpenChannel
750

751
                // errChanFound is used to signal that the channel has been
12✔
752
                // found so that iteration through the DB buckets can stop.
12✔
753
                errChanFound = errors.New("channel found")
12✔
754
        )
12✔
755

12✔
756
        // chanScan will traverse the following bucket structure:
12✔
757
        //  * nodePub => chainHash => chanPoint
12✔
758
        //
12✔
759
        // At each level we go one further, ensuring that we're traversing the
12✔
760
        // proper key (that's actually a bucket). By only reading the bucket
12✔
761
        // structure and skipping fully decoding each channel, we save a good
12✔
762
        // bit of CPU as we don't need to do things like decompress public
12✔
763
        // keys.
12✔
764
        chanScan := func(tx kvdb.RTx) error {
12✔
765
                // Get the bucket dedicated to storing the metadata for open
12✔
766
                // channels.
12✔
767
                openChanBucket := tx.ReadBucket(openChannelBucket)
12✔
768
                if openChanBucket == nil {
12✔
769
                        return ErrNoActiveChannels
24✔
770
                }
12✔
771

12✔
772
                // Within the node channel bucket, are the set of node pubkeys
12✔
773
                // we have channels with, we don't know the entire set, so we'll
12✔
774
                // check them all.
×
775
                return openChanBucket.ForEach(func(nodePub, v []byte) error {
×
776
                        // Ensure that this is a key the same size as a pubkey,
777
                        // and also that it leads directly to a bucket.
778
                        if len(nodePub) != 33 || v != nil {
779
                                return nil
780
                        }
23✔
781

11✔
782
                        nodeChanBucket := openChanBucket.NestedReadBucket(
11✔
783
                                nodePub,
11✔
784
                        )
×
785
                        if nodeChanBucket == nil {
×
786
                                return nil
787
                        }
11✔
788

11✔
789
                        // The next layer down is all the chains that this node
11✔
790
                        // has channels on with us.
11✔
791
                        return nodeChanBucket.ForEach(func(chainHash,
×
792
                                v []byte) error {
×
793

794
                                // If there's a value, it's not a bucket so
795
                                // ignore it.
796
                                if v != nil {
11✔
797
                                        return nil
22✔
798
                                }
11✔
799

11✔
800
                                chainBucket := nodeChanBucket.NestedReadBucket(
11✔
801
                                        chainHash,
11✔
802
                                )
×
803
                                if chainBucket == nil {
×
804
                                        return fmt.Errorf("unable to read "+
805
                                                "bucket for chain=%x",
11✔
806
                                                chainHash)
11✔
807
                                }
11✔
808

11✔
809
                                // Finally, we reach the leaf bucket that stores
×
810
                                // all the chanPoints for this node.
×
811
                                targetChanBytes, chanPoint, err := chanSelect(
×
812
                                        chainBucket,
×
813
                                )
814
                                if errors.Is(err, ErrChannelNotFound) {
815
                                        return nil
816
                                } else if err != nil {
11✔
817
                                        return err
11✔
818
                                }
11✔
819

12✔
820
                                chanBucket := chainBucket.NestedReadBucket(
1✔
821
                                        targetChanBytes,
11✔
822
                                )
×
823
                                if chanBucket == nil {
×
824
                                        return nil
825
                                }
10✔
826

10✔
827
                                channel, err := fetchOpenChannel(
10✔
828
                                        chanBucket, chanPoint,
15✔
829
                                )
5✔
830
                                if err != nil {
5✔
831
                                        return err
832
                                }
7✔
833

7✔
834
                                targetChan = channel
7✔
835
                                targetChan.Db = c
7✔
836

×
837
                                return errChanFound
×
838
                        })
839
                })
7✔
840
        }
7✔
841

7✔
842
        var err error
7✔
843
        if tx == nil {
844
                err = kvdb.View(c.backend, chanScan, func() {})
845
        } else {
846
                err = chanScan(tx)
847
        }
12✔
848
        if err != nil && !errors.Is(err, errChanFound) {
24✔
849
                return nil, err
24✔
850
        }
2✔
851

2✔
852
        if targetChan != nil {
2✔
853
                return targetChan, nil
12✔
854
        }
×
855

×
856
        // If we can't find the channel, then we return with an error, as we
857
        // have nothing to back up.
19✔
858
        return nil, ErrChannelNotFound
7✔
859
}
7✔
860

861
// FetchAllChannels attempts to retrieve all open channels currently stored
862
// within the database, including pending open, fully open and channels waiting
863
// for a closing transaction to confirm.
7✔
864
func (c *ChannelStateDB) FetchAllChannels() ([]*OpenChannel, error) {
865
        return fetchChannels(c)
866
}
867

868
// FetchAllOpenChannels will return all channels that have the funding
869
// transaction confirmed, and is not waiting for a closing transaction to be
563✔
870
// confirmed.
563✔
871
func (c *ChannelStateDB) FetchAllOpenChannels() ([]*OpenChannel, error) {
563✔
872
        return fetchChannels(
873
                c,
874
                pendingChannelFilter(false),
875
                waitingCloseFilter(false),
876
        )
405✔
877
}
405✔
878

405✔
879
// FetchPendingChannels will return channels that have completed the process of
405✔
880
// generating and broadcasting funding transactions, but whose funding
405✔
881
// transactions have yet to be confirmed on the blockchain.
405✔
882
func (c *ChannelStateDB) FetchPendingChannels() ([]*OpenChannel, error) {
405✔
883
        return fetchChannels(c,
884
                pendingChannelFilter(true),
885
                waitingCloseFilter(false),
886
        )
887
}
81✔
888

81✔
889
// FetchWaitingCloseChannels will return all channels that have been opened,
81✔
890
// but are now waiting for a closing transaction to be confirmed.
81✔
891
//
81✔
892
// NOTE: This includes channels that are also pending to be opened.
81✔
893
func (c *ChannelStateDB) FetchWaitingCloseChannels() ([]*OpenChannel, error) {
894
        return fetchChannels(
895
                c, waitingCloseFilter(true),
896
        )
897
}
898

3✔
899
// fetchChannelsFilter applies a filter to channels retrieved in fetchchannels.
3✔
900
// A set of filters can be combined to filter across multiple dimensions.
3✔
901
type fetchChannelsFilter func(channel *OpenChannel) bool
3✔
902

3✔
903
// pendingChannelFilter returns a filter based on whether channels are pending
904
// (ie, their funding transaction still needs to confirm). If pending is false,
905
// channels with confirmed funding transactions are returned.
906
func pendingChannelFilter(pending bool) fetchChannelsFilter {
907
        return func(channel *OpenChannel) bool {
908
                return channel.IsPending == pending
909
        }
910
}
911

493✔
912
// waitingCloseFilter returns a filter which filters channels based on whether
715✔
913
// they are awaiting the confirmation of their closing transaction. If waiting
222✔
914
// close is true, channels that have had their closing tx broadcast are
222✔
915
// included. If it is false, channels that are not awaiting confirmation of
916
// their close transaction are returned.
917
func waitingCloseFilter(waitingClose bool) fetchChannelsFilter {
918
        return func(channel *OpenChannel) bool {
919
                // If the channel is in any other state than Default,
920
                // then it means it is waiting to be closed.
921
                channelWaitingClose :=
922
                        channel.ChanStatus() != ChanStatusDefault
491✔
923

699✔
924
                // Include the channel if it matches the value for
208✔
925
                // waiting close that we are filtering on.
208✔
926
                return channelWaitingClose == waitingClose
208✔
927
        }
208✔
928
}
208✔
929

208✔
930
// fetchChannels attempts to retrieve channels currently stored in the
208✔
931
// database. It takes a set of filters which are applied to each channel to
208✔
932
// obtain a set of channels with the desired set of properties. Only channels
208✔
933
// which have a true value returned for *all* of the filters will be returned.
934
// If no filters are provided, every channel in the open channels bucket will
935
// be returned.
936
func fetchChannels(c *ChannelStateDB, filters ...fetchChannelsFilter) (
937
        []*OpenChannel, error) {
938

939
        var channels []*OpenChannel
940

941
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
942
                // Get the bucket dedicated to storing the metadata for open
1,058✔
943
                // channels.
1,058✔
944
                openChanBucket := tx.ReadBucket(openChannelBucket)
1,058✔
945
                if openChanBucket == nil {
1,058✔
946
                        return ErrNoActiveChannels
2,116✔
947
                }
1,058✔
948

1,058✔
949
                // Next, fetch the bucket dedicated to storing metadata related
1,058✔
950
                // to all nodes. All keys within this bucket are the serialized
1,058✔
951
                // public keys of all our direct counterparties.
×
952
                nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
×
953
                if nodeMetaBucket == nil {
954
                        return fmt.Errorf("node bucket not created")
955
                }
956

957
                // Finally for each node public key in the bucket, fetch all
1,058✔
958
                // the channels related to this particular node.
1,058✔
959
                return nodeMetaBucket.ForEach(func(k, v []byte) error {
×
960
                        nodeChanBucket := openChanBucket.NestedReadBucket(k)
×
961
                        if nodeChanBucket == nil {
962
                                return nil
963
                        }
964

1,562✔
965
                        return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
504✔
966
                                // If there's a value, it's not a bucket so
504✔
967
                                // ignore it.
×
968
                                if v != nil {
×
969
                                        return nil
970
                                }
1,008✔
971

504✔
972
                                // If we've found a valid chainhash bucket,
504✔
973
                                // then we'll retrieve that so we can extract
504✔
974
                                // all the channels.
×
975
                                chainBucket := nodeChanBucket.NestedReadBucket(
×
976
                                        chainHash,
977
                                )
978
                                if chainBucket == nil {
979
                                        return fmt.Errorf("unable to read "+
980
                                                "bucket for chain=%x", chainHash[:])
504✔
981
                                }
504✔
982

504✔
983
                                nodeChans, err := c.fetchNodeChannels(chainBucket)
504✔
984
                                if err != nil {
×
985
                                        return fmt.Errorf("unable to read "+
×
986
                                                "channel for chain_hash=%x, "+
×
987
                                                "node_key=%x: %v", chainHash[:], k, err)
988
                                }
504✔
989
                                for _, channel := range nodeChans {
504✔
990
                                        // includeChannel indicates whether the channel
×
991
                                        // meets the criteria specified by our filters.
×
992
                                        includeChannel := true
×
993

×
994
                                        // Run through each filter and check whether the
1,045✔
995
                                        // channel should be included.
541✔
996
                                        for _, f := range filters {
541✔
997
                                                // If the channel fails the filter, set
541✔
998
                                                // includeChannel to false and don't bother
541✔
999
                                                // checking the remaining filters.
541✔
1000
                                                if !f(channel) {
541✔
1001
                                                        includeChannel = false
969✔
1002
                                                        break
428✔
1003
                                                }
428✔
1004
                                        }
428✔
1005

455✔
1006
                                        // If the channel passed every filter, include it in
27✔
1007
                                        // our set of channels.
27✔
1008
                                        if includeChannel {
1009
                                                channels = append(channels, channel)
1010
                                        }
1011
                                }
1012
                                return nil
1013
                        })
1,057✔
1014

516✔
1015
                })
516✔
1016
        }, func() {
1017
                channels = nil
504✔
1018
        })
1019
        if err != nil {
1020
                return nil, err
1021
        }
1,058✔
1022

1,058✔
1023
        return channels, nil
1,058✔
1024
}
1,058✔
1025

×
1026
// FetchClosedChannels attempts to fetch all closed channels from the database.
×
1027
// The pendingOnly bool toggles if channels that aren't yet fully closed should
1028
// be returned in the response or not. When a channel was cooperatively closed,
1,058✔
1029
// it becomes fully closed after a single confirmation.  When a channel was
1030
// forcibly closed, it will become fully closed after _all_ the pending funds
1031
// (if any) have been swept.
1032
func (c *ChannelStateDB) FetchClosedChannels(pendingOnly bool) (
1033
        []*ChannelCloseSummary, error) {
1034

1035
        var chanSummaries []*ChannelCloseSummary
1036

1037
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
1038
                closeBucket := tx.ReadBucket(closedChannelBucket)
500✔
1039
                if closeBucket == nil {
500✔
1040
                        return ErrNoClosedChannels
500✔
1041
                }
500✔
1042

1,000✔
1043
                return closeBucket.ForEach(func(chanID []byte, summaryBytes []byte) error {
500✔
1044
                        summaryReader := bytes.NewReader(summaryBytes)
500✔
1045
                        chanSummary, err := deserializeCloseChannelSummary(summaryReader)
×
1046
                        if err != nil {
×
1047
                                return err
1048
                        }
522✔
1049

22✔
1050
                        // If the query specified to only include pending
22✔
1051
                        // channels, then we'll skip any channels which aren't
22✔
1052
                        // currently pending.
×
1053
                        if !chanSummary.IsPending && pendingOnly {
×
1054
                                return nil
1055
                        }
1056

1057
                        chanSummaries = append(chanSummaries, chanSummary)
1058
                        return nil
25✔
1059
                })
3✔
1060
        }, func() {
3✔
1061
                chanSummaries = nil
1062
        }); err != nil {
21✔
1063
                return nil, err
21✔
1064
        }
1065

500✔
1066
        return chanSummaries, nil
500✔
1067
}
500✔
1068

×
1069
// ErrClosedChannelNotFound signals that a closed channel could not be found in
×
1070
// the channeldb.
1071
var ErrClosedChannelNotFound = errors.New("unable to find closed channel summary")
500✔
1072

1073
// FetchClosedChannel queries for a channel close summary using the channel
1074
// point of the channel in question.
1075
func (c *ChannelStateDB) FetchClosedChannel(chanID *wire.OutPoint) (
1076
        *ChannelCloseSummary, error) {
1077

1078
        var chanSummary *ChannelCloseSummary
1079
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
1080
                closeBucket := tx.ReadBucket(closedChannelBucket)
1081
                if closeBucket == nil {
5✔
1082
                        return ErrClosedChannelNotFound
5✔
1083
                }
5✔
1084

10✔
1085
                var b bytes.Buffer
5✔
1086
                var err error
5✔
1087
                if err = writeOutpoint(&b, chanID); err != nil {
×
1088
                        return err
×
1089
                }
1090

5✔
1091
                summaryBytes := closeBucket.Get(b.Bytes())
5✔
1092
                if summaryBytes == nil {
5✔
1093
                        return ErrClosedChannelNotFound
×
1094
                }
×
1095

1096
                summaryReader := bytes.NewReader(summaryBytes)
5✔
1097
                chanSummary, err = deserializeCloseChannelSummary(summaryReader)
6✔
1098

1✔
1099
                return err
1✔
1100
        }, func() {
1101
                chanSummary = nil
4✔
1102
        }); err != nil {
4✔
1103
                return nil, err
4✔
1104
        }
4✔
1105

5✔
1106
        return chanSummary, nil
5✔
1107
}
6✔
1108

1✔
1109
// FetchClosedChannelForID queries for a channel close summary using the
1✔
1110
// channel ID of the channel in question.
1111
func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) (
4✔
1112
        *ChannelCloseSummary, error) {
1113

1114
        var chanSummary *ChannelCloseSummary
1115
        if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
1116
                closeBucket := tx.ReadBucket(closedChannelBucket)
1117
                if closeBucket == nil {
104✔
1118
                        return ErrClosedChannelNotFound
104✔
1119
                }
104✔
1120

208✔
1121
                // The first 30 bytes of the channel ID and outpoint will be
104✔
1122
                // equal.
104✔
1123
                cursor := closeBucket.ReadCursor()
×
1124
                op, c := cursor.Seek(cid[:30])
×
1125

1126
                // We scan over all possible candidates for this channel ID.
1127
                for ; op != nil && bytes.Compare(cid[:30], op[:30]) <= 0; op, c = cursor.Next() {
1128
                        var outPoint wire.OutPoint
104✔
1129
                        err := readOutpoint(bytes.NewReader(op), &outPoint)
104✔
1130
                        if err != nil {
104✔
1131
                                return err
104✔
1132
                        }
5,358✔
1133

5,254✔
1134
                        // If the found outpoint does not correspond to this
5,254✔
1135
                        // channel ID, we continue.
5,254✔
1136
                        if !cid.IsChanPoint(&outPoint) {
×
1137
                                continue
×
1138
                        }
1139

1140
                        // Deserialize the close summary and return.
1141
                        r := bytes.NewReader(c)
10,405✔
1142
                        chanSummary, err = deserializeCloseChannelSummary(r)
5,151✔
1143
                        if err != nil {
1144
                                return err
1145
                        }
1146

103✔
1147
                        return nil
103✔
1148
                }
103✔
1149
                return ErrClosedChannelNotFound
×
1150
        }, func() {
×
1151
                chanSummary = nil
1152
        }); err != nil {
103✔
1153
                return nil, err
1154
        }
3✔
1155

104✔
1156
        return chanSummary, nil
104✔
1157
}
107✔
1158

3✔
1159
// MarkChanFullyClosed marks a channel as fully closed within the database. A
3✔
1160
// channel should be marked as fully closed if the channel was initially
1161
// cooperatively closed and it's reached a single confirmation, or after all
103✔
1162
// the pending funds in a channel that has been forcibly closed have been
1163
// swept.
1164
func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
1165
        var (
1166
                openChannels  []*OpenChannel
1167
                pruneLinkNode *btcec.PublicKey
1168
        )
1169
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
9✔
1170
                var b bytes.Buffer
9✔
1171
                if err := writeOutpoint(&b, chanPoint); err != nil {
9✔
1172
                        return err
9✔
1173
                }
9✔
1174

18✔
1175
                chanID := b.Bytes()
9✔
1176

9✔
1177
                closedChanBucket, err := tx.CreateTopLevelBucket(
×
1178
                        closedChannelBucket,
×
1179
                )
1180
                if err != nil {
9✔
1181
                        return err
9✔
1182
                }
9✔
1183

9✔
1184
                chanSummaryBytes := closedChanBucket.Get(chanID)
9✔
1185
                if chanSummaryBytes == nil {
9✔
1186
                        return fmt.Errorf("no closed channel for "+
×
1187
                                "chan_point=%v found", chanPoint)
×
1188
                }
1189

9✔
1190
                chanSummaryReader := bytes.NewReader(chanSummaryBytes)
9✔
1191
                chanSummary, err := deserializeCloseChannelSummary(
×
1192
                        chanSummaryReader,
×
1193
                )
×
1194
                if err != nil {
1195
                        return err
9✔
1196
                }
9✔
1197

9✔
1198
                chanSummary.IsPending = false
9✔
1199

9✔
1200
                var newSummary bytes.Buffer
×
1201
                err = serializeChannelCloseSummary(&newSummary, chanSummary)
×
1202
                if err != nil {
1203
                        return err
9✔
1204
                }
9✔
1205

9✔
1206
                err = closedChanBucket.Put(chanID, newSummary.Bytes())
9✔
1207
                if err != nil {
9✔
1208
                        return err
×
1209
                }
×
1210

1211
                // Now that the channel is closed, we'll check if we have any
9✔
1212
                // other open channels with this peer. If we don't we'll
9✔
1213
                // garbage collect it to ensure we don't establish persistent
×
1214
                // connections to peers without open channels.
×
1215
                pruneLinkNode = chanSummary.RemotePub
1216
                openChannels, err = c.fetchOpenChannels(
1217
                        tx, pruneLinkNode,
1218
                )
1219
                if err != nil {
1220
                        return fmt.Errorf("unable to fetch open channels for "+
9✔
1221
                                "peer %x: %v",
9✔
1222
                                pruneLinkNode.SerializeCompressed(), err)
9✔
1223
                }
9✔
1224

9✔
1225
                return nil
×
1226
        }, func() {
×
1227
                openChannels = nil
×
1228
                pruneLinkNode = nil
×
1229
        })
1230
        if err != nil {
9✔
1231
                return err
9✔
1232
        }
9✔
1233

9✔
1234
        // Decide whether we want to remove the link node, based upon the number
9✔
1235
        // of still open channels.
9✔
1236
        return c.pruneLinkNode(openChannels, pruneLinkNode)
×
1237
}
×
1238

1239
// pruneLinkNode determines whether we should garbage collect a link node from
1240
// the database due to no longer having any open channels with it. If there are
1241
// any left, then this acts as a no-op.
9✔
1242
func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
1243
        remotePub *btcec.PublicKey) error {
1244

1245
        if len(openChannels) > 0 {
1246
                return nil
1247
        }
1248

9✔
1249
        log.Infof("Pruning link node %x with zero open channels from database",
9✔
1250
                remotePub.SerializeCompressed())
11✔
1251

2✔
1252
        return c.linkNodeDB.DeleteLinkNode(remotePub)
2✔
1253
}
1254

9✔
1255
// PruneLinkNodes attempts to prune all link nodes found within the database
9✔
1256
// with whom we no longer have any open channels with.
9✔
1257
func (c *ChannelStateDB) PruneLinkNodes() error {
9✔
1258
        allLinkNodes, err := c.linkNodeDB.FetchAllLinkNodes()
1259
        if err != nil {
1260
                return err
1261
        }
1262

2✔
1263
        for _, linkNode := range allLinkNodes {
2✔
1264
                var (
2✔
1265
                        openChannels []*OpenChannel
×
1266
                        linkNode     = linkNode
×
1267
                )
1268
                err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
4✔
1269
                        var err error
2✔
1270
                        openChannels, err = c.fetchOpenChannels(
2✔
1271
                                tx, linkNode.IdentityPub,
2✔
1272
                        )
2✔
1273
                        return err
4✔
1274
                }, func() {
2✔
1275
                        openChannels = nil
2✔
1276
                })
2✔
1277
                if err != nil {
2✔
1278
                        return err
2✔
1279
                }
4✔
1280

2✔
1281
                err = c.pruneLinkNode(openChannels, linkNode.IdentityPub)
2✔
1282
                if err != nil {
2✔
1283
                        return err
×
1284
                }
×
1285
        }
1286

2✔
1287
        return nil
2✔
1288
}
×
1289

×
1290
// ChannelShell is a shell of a channel that is meant to be used for channel
1291
// recovery purposes. It contains a minimal OpenChannel instance along with
1292
// addresses for that target node.
2✔
1293
type ChannelShell struct {
1294
        // NodeAddrs the set of addresses that this node has known to be
1295
        // reachable at in the past.
1296
        NodeAddrs []net.Addr
1297

1298
        // Chan is a shell of an OpenChannel, it contains only the items
1299
        // required to restore the channel on disk.
1300
        Chan *OpenChannel
1301
}
1302

1303
// RestoreChannelShells is a method that allows the caller to reconstruct the
1304
// state of an OpenChannel from the ChannelShell. We'll attempt to write the
1305
// new channel to disk, create a LinkNode instance with the passed node
1306
// addresses, and finally create an edge within the graph for the channel as
1307
// well. This method is idempotent, so repeated calls with the same set of
1308
// channel shells won't modify the database after the initial call.
1309
func (c *ChannelStateDB) RestoreChannelShells(channelShells ...*ChannelShell) error {
1310
        err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
1311
                for _, channelShell := range channelShells {
1312
                        channel := channelShell.Chan
1313

1314
                        // When we make a channel, we mark that the channel has
3✔
1315
                        // been restored, this will signal to other sub-systems
6✔
1316
                        // to not attempt to use the channel as if it was a
6✔
1317
                        // regular one.
3✔
1318
                        channel.chanStatus |= ChanStatusRestored
3✔
1319

3✔
1320
                        // First, we'll attempt to create a new open channel
3✔
1321
                        // and link node for this channel. If the channel
3✔
1322
                        // already exists, then in order to ensure this method
3✔
1323
                        // is idempotent, we'll continue to the next step.
3✔
1324
                        channel.Db = c
3✔
1325
                        err := syncNewChannel(
3✔
1326
                                tx, channel, channelShell.NodeAddrs,
3✔
1327
                        )
3✔
1328
                        if err != nil {
3✔
1329
                                return err
3✔
1330
                        }
3✔
1331
                }
3✔
1332

3✔
1333
                return nil
5✔
1334
        }, func() {})
2✔
1335
        if err != nil {
2✔
1336
                return err
1337
        }
1338

3✔
1339
        return nil
3✔
1340
}
5✔
1341

2✔
1342
// AddrsForNode consults the graph and channel database for all addresses known
2✔
1343
// to the passed node public key.
1344
func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr,
3✔
1345
        error) {
1346

1347
        linkNode, err := d.channelStateDB.linkNodeDB.FetchLinkNode(nodePub)
1348
        if err != nil {
1349
                return nil, err
1350
        }
3✔
1351

3✔
1352
        // We'll also query the graph for this peer to see if they have any
3✔
1353
        // addresses that we don't currently have stored within the link node
3✔
1354
        // database.
×
1355
        pubKey, err := route.NewVertexFromBytes(nodePub.SerializeCompressed())
×
1356
        if err != nil {
1357
                return nil, err
1358
        }
1359
        graphNode, err := d.graph.FetchLightningNode(pubKey)
1360
        if err != nil && err != ErrGraphNodeNotFound {
3✔
1361
                return nil, err
3✔
1362
        } else if err == ErrGraphNodeNotFound {
×
1363
                // If the node isn't found, then that's OK, as we still have the
×
1364
                // link node data. But any other error needs to be returned.
3✔
1365
                graphNode = &LightningNode{}
3✔
1366
        }
×
1367

5✔
1368
        // Now that we have both sources of addrs for this node, we'll use a
2✔
1369
        // map to de-duplicate any addresses between the two sources, and
2✔
1370
        // produce a final list of the combined addrs.
2✔
1371
        addrs := make(map[string]net.Addr)
2✔
1372
        for _, addr := range linkNode.Addresses {
1373
                addrs[addr.String()] = addr
1374
        }
1375
        for _, addr := range graphNode.Addresses {
1376
                addrs[addr.String()] = addr
3✔
1377
        }
6✔
1378
        dedupedAddrs := make([]net.Addr, 0, len(addrs))
3✔
1379
        for _, addr := range addrs {
3✔
1380
                dedupedAddrs = append(dedupedAddrs, addr)
6✔
1381
        }
3✔
1382

3✔
1383
        return dedupedAddrs, nil
3✔
1384
}
7✔
1385

4✔
1386
// AbandonChannel attempts to remove the target channel from the open channel
4✔
1387
// database. If the channel was already removed (has a closed channel entry),
1388
// then we'll return a nil error. Otherwise, we'll insert a new close summary
3✔
1389
// into the database.
1390
func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint,
1391
        bestHeight uint32) error {
1392

1393
        // With the chanPoint constructed, we'll attempt to find the target
1394
        // channel in the database. If we can't find the channel, then we'll
1395
        // return the error back to the caller.
1396
        dbChan, err := c.FetchChannel(nil, *chanPoint)
6✔
1397
        switch {
6✔
1398
        // If the channel wasn't found, then it's possible that it was already
6✔
1399
        // abandoned from the database.
6✔
1400
        case err == ErrChannelNotFound:
6✔
1401
                _, closedErr := c.FetchClosedChannel(chanPoint)
6✔
1402
                if closedErr != nil {
6✔
1403
                        return closedErr
1404
                }
1405

4✔
1406
                // If the channel was already closed, then we don't return an
4✔
1407
                // error as we'd like this step to be repeatable.
5✔
1408
                return nil
1✔
1409
        case err != nil:
1✔
1410
                return err
1411
        }
1412

1413
        // Now that we've found the channel, we'll populate a close summary for
3✔
1414
        // the channel, so we can store as much information for this abounded
×
1415
        // channel as possible. We also ensure that we set Pending to false, to
×
1416
        // indicate that this channel has been "fully" closed.
1417
        summary := &ChannelCloseSummary{
1418
                CloseType:               Abandoned,
1419
                ChanPoint:               *chanPoint,
1420
                ChainHash:               dbChan.ChainHash,
1421
                CloseHeight:             bestHeight,
1422
                RemotePub:               dbChan.IdentityPub,
4✔
1423
                Capacity:                dbChan.Capacity,
4✔
1424
                SettledBalance:          dbChan.LocalCommitment.LocalBalance.ToSatoshis(),
4✔
1425
                ShortChanID:             dbChan.ShortChanID(),
4✔
1426
                RemoteCurrentRevocation: dbChan.RemoteCurrentRevocation,
4✔
1427
                RemoteNextRevocation:    dbChan.RemoteNextRevocation,
4✔
1428
                LocalChanConfig:         dbChan.LocalChanCfg,
4✔
1429
        }
4✔
1430

4✔
1431
        // Finally, we'll close the channel in the DB, and return back to the
4✔
1432
        // caller. We set ourselves as the close initiator because we abandoned
4✔
1433
        // the channel.
4✔
1434
        return dbChan.CloseChannel(summary, ChanStatusLocalCloseInitiator)
4✔
1435
}
4✔
1436

4✔
1437
// SaveChannelOpeningState saves the serialized channel state for the provided
4✔
1438
// chanPoint to the channelOpeningStateBucket.
4✔
1439
func (c *ChannelStateDB) SaveChannelOpeningState(outPoint,
4✔
1440
        serializedState []byte) error {
1441

1442
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
1443
                bucket, err := tx.CreateTopLevelBucket(channelOpeningStateBucket)
1444
                if err != nil {
1445
                        return err
94✔
1446
                }
94✔
1447

188✔
1448
                return bucket.Put(outPoint, serializedState)
94✔
1449
        }, func() {})
94✔
1450
}
×
1451

×
1452
// GetChannelOpeningState fetches the serialized channel state for the provided
1453
// outPoint from the database, or returns ErrChannelNotFound if the channel
94✔
1454
// is not found.
94✔
1455
func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte,
1456
        error) {
1457

1458
        var serializedState []byte
1459
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
1460
                bucket := tx.ReadBucket(channelOpeningStateBucket)
1461
                if bucket == nil {
254✔
1462
                        // If the bucket does not exist, it means we never added
254✔
1463
                        //  a channel to the db, so return ErrChannelNotFound.
254✔
1464
                        return ErrChannelNotFound
508✔
1465
                }
254✔
1466

256✔
1467
                stateBytes := bucket.Get(outPoint)
2✔
1468
                if stateBytes == nil {
2✔
1469
                        return ErrChannelNotFound
2✔
1470
                }
2✔
1471

1472
                serializedState = append(serializedState, stateBytes...)
254✔
1473

303✔
1474
                return nil
49✔
1475
        }, func() {
49✔
1476
                serializedState = nil
1477
        })
207✔
1478
        return serializedState, err
207✔
1479
}
207✔
1480

254✔
1481
// DeleteChannelOpeningState removes any state for outPoint from the database.
254✔
1482
func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
254✔
1483
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
254✔
1484
                bucket := tx.ReadWriteBucket(channelOpeningStateBucket)
1485
                if bucket == nil {
1486
                        return ErrChannelNotFound
1487
                }
26✔
1488

52✔
1489
                return bucket.Delete(outPoint)
26✔
1490
        }, func() {})
26✔
1491
}
×
1492

×
1493
// syncVersions function is used for safe db version synchronization. It
1494
// applies migration functions to the current database and recovers the
26✔
1495
// previous state of db if at least one error/panic appeared during migration.
26✔
1496
func (d *DB) syncVersions(versions []mandatoryVersion) error {
1497
        meta, err := d.FetchMeta()
1498
        if err != nil {
1499
                if err == ErrMetaNotFound {
1500
                        meta = &Meta{}
1501
                } else {
1,740✔
1502
                        return err
1,740✔
1503
                }
1,740✔
1504
        }
×
1505

×
1506
        latestVersion := getLatestDBVersion(versions)
×
1507
        log.Infof("Checking for schema update: latest_version=%v, "+
×
1508
                "db_version=%v", latestVersion, meta.DbVersionNumber)
×
1509

1510
        switch {
1511

1,740✔
1512
        // If the database reports a higher version that we are aware of, the
1,740✔
1513
        // user is probably trying to revert to a prior version of lnd. We fail
1,740✔
1514
        // here to prevent reversions and unintended corruption.
1,740✔
1515
        case meta.DbVersionNumber > latestVersion:
1,740✔
1516
                log.Errorf("Refusing to revert from db_version=%d to "+
1517
                        "lower version=%d", meta.DbVersionNumber,
1518
                        latestVersion)
1519
                return ErrDBReversion
1520

1✔
1521
        // If the current database version matches the latest version number,
1✔
1522
        // then we don't need to perform any migrations.
1✔
1523
        case meta.DbVersionNumber == latestVersion:
1✔
1524
                return nil
1✔
1525
        }
1526

1527
        log.Infof("Performing database schema migration")
1528

1,735✔
1529
        // Otherwise, we fetch the migrations which need to applied, and
1,735✔
1530
        // execute them serially within a single database transaction to ensure
1531
        // the migration is atomic.
1532
        migrations, migrationVersions := getMigrationsToApply(
4✔
1533
                versions, meta.DbVersionNumber,
4✔
1534
        )
4✔
1535
        return kvdb.Update(d, func(tx kvdb.RwTx) error {
4✔
1536
                for i, migration := range migrations {
4✔
1537
                        if migration == nil {
4✔
1538
                                continue
4✔
1539
                        }
4✔
1540

8✔
1541
                        log.Infof("Applying migration #%v",
8✔
1542
                                migrationVersions[i])
4✔
1543

×
1544
                        if err := migration(tx); err != nil {
1545
                                log.Infof("Unable to apply migration #%v",
1546
                                        migrationVersions[i])
4✔
1547
                                return err
4✔
1548
                        }
4✔
1549
                }
5✔
1550

1✔
1551
                meta.DbVersionNumber = latestVersion
1✔
1552
                err := putMeta(meta, tx)
1✔
1553
                if err != nil {
1✔
1554
                        return err
1555
                }
1556

2✔
1557
                // In dry-run mode, return an error to prevent the transaction
2✔
1558
                // from committing.
2✔
1559
                if d.dryRun {
×
1560
                        return ErrDryRunMigrationOK
×
1561
                }
1562

1563
                return nil
1564
        }, func() {})
3✔
1565
}
1✔
1566

1✔
1567
// applyOptionalVersions takes a config to determine whether the optional
1568
// migrations will be applied.
1✔
1569
//
4✔
1570
// NOTE: only support the prune_revocation_log optional migration atm.
1571
func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error {
1572
        // TODO(yy): need to design the db to support dry run for optional
1573
        // migrations.
1574
        if d.dryRun {
1575
                log.Info("Skipped optional migrations as dry run mode is not " +
1576
                        "supported yet")
1,738✔
1577
                return nil
1,738✔
1578
        }
1,738✔
1579

1,739✔
1580
        om, err := d.fetchOptionalMeta()
1✔
1581
        if err != nil {
1✔
1582
                if err == ErrMetaNotFound {
1✔
1583
                        om = &OptionalMeta{
1✔
1584
                                Versions: make(map[uint64]string),
1585
                        }
1,737✔
1586
                } else {
1,737✔
1587
                        return err
×
1588
                }
×
1589
        }
×
1590

×
1591
        log.Infof("Checking for optional update: prune_revocation_log=%v, "+
×
1592
                "db_version=%s", cfg.PruneRevocationLog, om)
×
1593

×
1594
        // Exit early if the optional migration is not specified.
1595
        if !cfg.PruneRevocationLog {
1596
                return nil
1,737✔
1597
        }
1,737✔
1598

1,737✔
1599
        // Exit early if the optional migration has already been applied.
1,737✔
1600
        if _, ok := om.Versions[0]; ok {
3,472✔
1601
                return nil
1,735✔
1602
        }
1,735✔
1603

1604
        // Get the optional version.
1605
        version := optionalVersions[0]
3✔
1606
        log.Infof("Performing database optional migration: %s", version.name)
1✔
1607

1✔
1608
        migrationCfg := &MigrationConfigImpl{
1609
                migration30.MigrateRevLogConfigImpl{
1610
                        NoAmountData: d.noRevLogAmtData,
1✔
1611
                },
1✔
1612
        }
1✔
1613

1✔
1614
        // Migrate the data.
1✔
1615
        if err := version.migration(d, migrationCfg); err != nil {
1✔
1616
                log.Errorf("Unable to apply optional migration: %s, error: %v",
1✔
1617
                        version.name, err)
1✔
1618
                return err
1✔
1619
        }
1✔
1620

1✔
1621
        // Update the optional meta. Notice that unlike the mandatory db
×
1622
        // migrations where we perform the migration and updating meta in a
×
1623
        // single db transaction, we use different transactions here. Even when
×
1624
        // the following update is failed, we should be fine here as we would
×
1625
        // re-run the optional migration again, which is a noop, during next
1626
        // startup.
1627
        om.Versions[0] = version.name
1628
        if err := d.putOptionalMeta(om); err != nil {
1629
                log.Errorf("Unable to update optional meta: %v", err)
1630
                return err
1631
        }
1632

1✔
1633
        return nil
1✔
1634
}
×
1635

×
1636
// ChannelGraph returns the current instance of the directed channel graph.
×
1637
func (d *DB) ChannelGraph() *ChannelGraph {
1638
        return d.graph
1✔
1639
}
1640

1641
// ChannelStateDB returns the sub database that is concerned with the channel
1642
// state.
36✔
1643
func (d *DB) ChannelStateDB() *ChannelStateDB {
36✔
1644
        return d.channelStateDB
36✔
1645
}
1646

1647
// LatestDBVersion returns the number of the latest database version currently
1648
// known to the channel DB.
2,151✔
1649
func LatestDBVersion() uint32 {
2,151✔
1650
        return getLatestDBVersion(dbVersions)
2,151✔
1651
}
1652

1653
func getLatestDBVersion(versions []mandatoryVersion) uint32 {
1654
        return versions[len(versions)-1].number
1✔
1655
}
1✔
1656

1✔
1657
// getMigrationsToApply retrieves the migration function that should be
1658
// applied to the database.
3,441✔
1659
func getMigrationsToApply(versions []mandatoryVersion,
3,441✔
1660
        version uint32) ([]migration, []uint32) {
3,441✔
1661

1662
        migrations := make([]migration, 0, len(versions))
1663
        migrationVersions := make([]uint32, 0, len(versions))
1664

1665
        for _, v := range versions {
5✔
1666
                if v.number > version {
5✔
1667
                        migrations = append(migrations, v.migration)
5✔
1668
                        migrationVersions = append(migrationVersions, v.number)
5✔
1669
                }
5✔
1670
        }
17✔
1671

18✔
1672
        return migrations, migrationVersions
6✔
1673
}
6✔
1674

6✔
1675
// fetchHistoricalChanBucket returns a the channel bucket for a given outpoint
1676
// from the historical channel bucket. If the bucket does not exist,
1677
// ErrNoHistoricalBucket is returned.
5✔
1678
func fetchHistoricalChanBucket(tx kvdb.RTx,
1679
        outPoint *wire.OutPoint) (kvdb.RBucket, error) {
1680

1681
        // First fetch the top level bucket which stores all data related to
1682
        // historically stored channels.
1683
        historicalChanBucket := tx.ReadBucket(historicalChannelBucket)
1684
        if historicalChanBucket == nil {
6✔
1685
                return nil, ErrNoHistoricalBucket
6✔
1686
        }
6✔
1687

6✔
1688
        // With the bucket for the node and chain fetched, we can now go down
6✔
1689
        // another level, for the channel itself.
6✔
1690
        var chanPointBuf bytes.Buffer
×
1691
        if err := writeOutpoint(&chanPointBuf, outPoint); err != nil {
×
1692
                return nil, err
1693
        }
1694
        chanBucket := historicalChanBucket.NestedReadBucket(
1695
                chanPointBuf.Bytes(),
6✔
1696
        )
6✔
1697
        if chanBucket == nil {
×
1698
                return nil, ErrChannelNotFound
×
1699
        }
6✔
1700

6✔
1701
        return chanBucket, nil
6✔
1702
}
8✔
1703

2✔
1704
// FetchHistoricalChannel fetches open channel data from the historical channel
2✔
1705
// bucket.
1706
func (c *ChannelStateDB) FetchHistoricalChannel(outPoint *wire.OutPoint) (
4✔
1707
        *OpenChannel, error) {
1708

1709
        var channel *OpenChannel
1710
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
1711
                chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
1712
                if err != nil {
6✔
1713
                        return err
6✔
1714
                }
6✔
1715

12✔
1716
                channel, err = fetchOpenChannel(chanBucket, outPoint)
6✔
1717
                if err != nil {
8✔
1718
                        return err
2✔
1719
                }
2✔
1720

1721
                channel.Db = c
4✔
1722
                return nil
4✔
1723
        }, func() {
×
1724
                channel = nil
×
1725
        })
1726
        if err != nil {
4✔
1727
                return nil, err
4✔
1728
        }
6✔
1729

6✔
1730
        return channel, nil
6✔
1731
}
8✔
1732

2✔
1733
func fetchFinalHtlcsBucket(tx kvdb.RTx,
2✔
1734
        chanID lnwire.ShortChannelID) (kvdb.RBucket, error) {
1735

4✔
1736
        finalHtlcsBucket := tx.ReadBucket(finalHtlcsBucket)
1737
        if finalHtlcsBucket == nil {
1738
                return nil, ErrFinalHtlcsBucketNotFound
1739
        }
12✔
1740

12✔
1741
        var chanIDBytes [8]byte
12✔
1742
        byteOrder.PutUint64(chanIDBytes[:], chanID.ToUint64())
18✔
1743

6✔
1744
        chanBucket := finalHtlcsBucket.NestedReadBucket(chanIDBytes[:])
6✔
1745
        if chanBucket == nil {
1746
                return nil, ErrFinalChannelBucketNotFound
6✔
1747
        }
6✔
1748

6✔
1749
        return chanBucket, nil
6✔
1750
}
6✔
1751

×
1752
var ErrHtlcUnknown = errors.New("htlc unknown")
×
1753

1754
// LookupFinalHtlc retrieves a final htlc resolution from the database. If the
6✔
1755
// htlc has no final resolution yet, ErrHtlcUnknown is returned.
1756
func (c *ChannelStateDB) LookupFinalHtlc(chanID lnwire.ShortChannelID,
1757
        htlcIndex uint64) (*FinalHtlcInfo, error) {
1758

1759
        var idBytes [8]byte
1760
        byteOrder.PutUint64(idBytes[:], htlcIndex)
1761

1762
        var settledByte byte
12✔
1763

12✔
1764
        err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
12✔
1765
                finalHtlcsBucket, err := fetchFinalHtlcsBucket(
12✔
1766
                        tx, chanID,
12✔
1767
                )
12✔
1768
                switch {
12✔
1769
                case errors.Is(err, ErrFinalHtlcsBucketNotFound):
24✔
1770
                        fallthrough
12✔
1771

12✔
1772
                case errors.Is(err, ErrFinalChannelBucketNotFound):
12✔
1773
                        return ErrHtlcUnknown
12✔
1774

6✔
1775
                case err != nil:
6✔
1776
                        return fmt.Errorf("cannot fetch final htlcs bucket: %w",
1777
                                err)
6✔
1778
                }
6✔
1779

1780
                value := finalHtlcsBucket.Get(idBytes[:])
×
1781
                if value == nil {
×
1782
                        return ErrHtlcUnknown
×
1783
                }
1784

1785
                if len(value) != 1 {
6✔
1786
                        return errors.New("unexpected final htlc value length")
7✔
1787
                }
1✔
1788

1✔
1789
                settledByte = value[0]
1790

5✔
1791
                return nil
×
1792
        }, func() {
×
1793
                settledByte = 0
1794
        })
5✔
1795
        if err != nil {
5✔
1796
                return nil, err
5✔
1797
        }
12✔
1798

12✔
1799
        info := FinalHtlcInfo{
12✔
1800
                Settled:  settledByte&byte(FinalHtlcSettledBit) != 0,
19✔
1801
                Offchain: settledByte&byte(FinalHtlcOffchainBit) != 0,
7✔
1802
        }
7✔
1803

1804
        return &info, nil
5✔
1805
}
5✔
1806

5✔
1807
// PutOnchainFinalHtlcOutcome stores the final on-chain outcome of an htlc in
5✔
1808
// the database.
5✔
1809
func (c *ChannelStateDB) PutOnchainFinalHtlcOutcome(
5✔
1810
        chanID lnwire.ShortChannelID, htlcID uint64, settled bool) error {
1811

1812
        // Skip if the user did not opt in to storing final resolutions.
1813
        if !c.parent.storeFinalHtlcResolutions {
1814
                return nil
1815
        }
3✔
1816

3✔
1817
        return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
3✔
1818
                finalHtlcsBucket, err := fetchFinalHtlcsBucketRw(tx, chanID)
5✔
1819
                if err != nil {
2✔
1820
                        return err
2✔
1821
                }
1822

2✔
1823
                return putFinalHtlc(
1✔
1824
                        finalHtlcsBucket, htlcID,
1✔
1825
                        FinalHtlcInfo{
×
1826
                                Settled:  settled,
×
1827
                                Offchain: false,
1828
                        },
1✔
1829
                )
1✔
1830
        }, func() {})
1✔
1831
}
1✔
1832

1✔
1833
// MakeTestInvoiceDB is used to create a test invoice database for testing
1✔
1834
// purposes. It simply calls into MakeTestDB so the same modifiers can be used.
1✔
1835
func MakeTestInvoiceDB(t *testing.T, modifiers ...OptionModifier) (
1✔
1836
        invoices.InvoiceDB, error) {
1837

1838
        return MakeTestDB(t, modifiers...)
1839
}
1840

1841
// MakeTestDB creates a new instance of the ChannelDB for testing purposes.
151✔
1842
// A callback which cleans up the created temporary directories is also
151✔
1843
// returned and intended to be executed after the test completes.
151✔
1844
func MakeTestDB(t *testing.T, modifiers ...OptionModifier) (*DB, error) {
151✔
1845
        // First, create a temporary directory to be used for the duration of
1846
        // this test.
1847
        tempDirName := t.TempDir()
1848

1849
        // Next, create channeldb for the first time.
283✔
1850
        backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cdb")
283✔
1851
        if err != nil {
283✔
1852
                backendCleanup()
283✔
1853
                return nil, err
283✔
1854
        }
283✔
1855

283✔
1856
        cdb, err := CreateWithBackend(backend, modifiers...)
283✔
1857
        if err != nil {
×
1858
                backendCleanup()
×
1859
                return nil, err
×
1860
        }
1861

283✔
1862
        t.Cleanup(func() {
283✔
1863
                cdb.Close()
×
1864
                backendCleanup()
×
1865
        })
×
1866

1867
        return cdb, nil
566✔
1868
}
283✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc