• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13566028875

27 Feb 2025 12:09PM UTC coverage: 49.396% (-9.4%) from 58.748%
13566028875

Pull #9555

github

ellemouton
graph/db: populate the graph cache in Start instead of during construction

In this commit, we move the graph cache population logic out of the
ChannelGraph constructor and into its Start method instead.
Pull Request #9555: graph: extract cache from CRUD [6]

34 of 54 new or added lines in 4 files covered. (62.96%)

27464 existing lines in 436 files now uncovered.

101095 of 204664 relevant lines covered (49.4%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.23
/sweep/store.go
1
package sweep
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "io"
8

9
        "github.com/btcsuite/btcd/chaincfg/chainhash"
10
        "github.com/btcsuite/btcd/wire"
11
        "github.com/lightningnetwork/lnd/kvdb"
12
        "github.com/lightningnetwork/lnd/tlv"
13
)
14

15
var (
	// txHashesBucketKey is the key of the top-level bucket in which the
	// sweeper store keeps one entry per sweep tx it has recorded.
	//
	// maps: txHash -> TxRecord
	txHashesBucketKey = []byte("sweeper-tx-hashes")

	// utxnChainPrefix is prepended to the chain hash to form the key of a
	// nursery chain bucket.
	utxnChainPrefix = []byte("utxn")

	// utxnHeightIndexKey is the key of the sub bucket in which the nursery
	// keeps its height index.
	utxnHeightIndexKey = []byte("height-index")

	// utxnFinalizedKndrTxnKey is the static key under which a nursery
	// height bucket stores its finalized kindergarten sweep txn.
	utxnFinalizedKndrTxnKey = []byte("finalized-kndr-txn")

	// byteOrder is the byte order used to decode integers read from the
	// db.
	byteOrder = binary.BigEndian

	// errNoTxHashesBucket is returned when the tx hashes bucket cannot be
	// found in the db.
	errNoTxHashesBucket = errors.New("tx hashes bucket does not exist")

	// ErrTxNotFound is returned when the queried txid has no entry in our
	// db.
	ErrTxNotFound = errors.New("tx not found")
)
41

42
// TxRecord specifies a record of a tx that's stored in the database.
43
type TxRecord struct {
44
        // Txid is the sweeping tx's txid, which is used as the key to store
45
        // the following values.
46
        Txid chainhash.Hash
47

48
        // FeeRate is the fee rate of the sweeping tx, unit is sats/kw.
49
        FeeRate uint64
50

51
        // Fee is the fee of the sweeping tx, unit is sat.
52
        Fee uint64
53

54
        // Published indicates whether the tx has been published.
55
        Published bool
56
}
57

58
// toTlvStream converts TxRecord into a tlv representation.
59
func (t *TxRecord) toTlvStream() (*tlv.Stream, error) {
3✔
60
        const (
3✔
61
                // A set of tlv type definitions used to serialize TxRecord.
3✔
62
                // We define it here instead of the head of the file to avoid
3✔
63
                // naming conflicts.
3✔
64
                //
3✔
65
                // NOTE: A migration should be added whenever the existing type
3✔
66
                // changes.
3✔
67
                //
3✔
68
                // NOTE: Txid is stored as the key, so it's not included here.
3✔
69
                feeRateType tlv.Type = 0
3✔
70
                feeType     tlv.Type = 1
3✔
71
                boolType    tlv.Type = 2
3✔
72
        )
3✔
73

3✔
74
        return tlv.NewStream(
3✔
75
                tlv.MakeBigSizeRecord(feeRateType, &t.FeeRate),
3✔
76
                tlv.MakeBigSizeRecord(feeType, &t.Fee),
3✔
77
                tlv.MakePrimitiveRecord(boolType, &t.Published),
3✔
78
        )
3✔
79
}
3✔
80

81
// serializeTxRecord serializes a TxRecord based on tlv format.
82
func serializeTxRecord(w io.Writer, tx *TxRecord) error {
3✔
83
        // Create the tlv stream.
3✔
84
        tlvStream, err := tx.toTlvStream()
3✔
85
        if err != nil {
3✔
86
                return err
×
87
        }
×
88

89
        // Encode the tlv stream.
90
        var buf bytes.Buffer
3✔
91
        if err := tlvStream.Encode(&buf); err != nil {
3✔
92
                return err
×
93
        }
×
94

95
        // Write the tlv stream.
96
        if _, err = w.Write(buf.Bytes()); err != nil {
3✔
97
                return err
×
98
        }
×
99

100
        return nil
3✔
101
}
102

103
// deserializeTxRecord deserializes a TxRecord based on tlv format.
104
func deserializeTxRecord(r io.Reader) (*TxRecord, error) {
3✔
105
        var tx TxRecord
3✔
106

3✔
107
        // Create the tlv stream.
3✔
108
        tlvStream, err := tx.toTlvStream()
3✔
109
        if err != nil {
3✔
110
                return nil, err
×
111
        }
×
112

113
        if err := tlvStream.Decode(r); err != nil {
3✔
114
                return nil, err
×
115
        }
×
116

117
        return &tx, nil
3✔
118
}
119

120
// SweeperStore stores published txes.
121
type SweeperStore interface {
122
        // IsOurTx determines whether a tx is published by us, based on its
123
        // hash.
124
        IsOurTx(hash chainhash.Hash) bool
125

126
        // StoreTx stores a tx hash we are about to publish.
127
        StoreTx(*TxRecord) error
128

129
        // ListSweeps lists all the sweeps we have successfully published.
130
        ListSweeps() ([]chainhash.Hash, error)
131

132
        // GetTx queries the database to find the tx that matches the given
133
        // txid. Returns ErrTxNotFound if it cannot be found.
134
        GetTx(hash chainhash.Hash) (*TxRecord, error)
135

136
        // DeleteTx removes a tx specified by the hash from the store.
137
        DeleteTx(hash chainhash.Hash) error
138
}
139

140
type sweeperStore struct {
141
        db kvdb.Backend
142
}
143

144
// NewSweeperStore returns a new store instance.
145
func NewSweeperStore(db kvdb.Backend, chainHash *chainhash.Hash) (
146
        SweeperStore, error) {
3✔
147

3✔
148
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
6✔
149
                if tx.ReadWriteBucket(txHashesBucketKey) != nil {
6✔
150
                        return nil
3✔
151
                }
3✔
152

153
                txHashesBucket, err := tx.CreateTopLevelBucket(
3✔
154
                        txHashesBucketKey,
3✔
155
                )
3✔
156
                if err != nil {
3✔
157
                        return err
×
158
                }
×
159

160
                // Use non-existence of tx hashes bucket as a signal to migrate
161
                // nursery finalized txes.
162
                err = migrateTxHashes(tx, txHashesBucket, chainHash)
3✔
163

3✔
164
                return err
3✔
165
        }, func() {})
3✔
166
        if err != nil {
3✔
167
                return nil, err
×
168
        }
×
169

170
        return &sweeperStore{
3✔
171
                db: db,
3✔
172
        }, nil
3✔
173
}
174

175
// migrateTxHashes migrates nursery finalized txes to the tx hashes bucket. This
176
// is not implemented as a database migration, to keep the downgrade path open.
177
//
178
// TODO(yy): delete this function once nursery is removed.
179
func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket,
180
        chainHash *chainhash.Hash) error {
3✔
181

3✔
182
        log.Infof("Migrating UTXO nursery finalized TXIDs")
3✔
183

3✔
184
        // Compose chain bucket key.
3✔
185
        var b bytes.Buffer
3✔
186
        if _, err := b.Write(utxnChainPrefix); err != nil {
3✔
187
                return err
×
188
        }
×
189

190
        if _, err := b.Write(chainHash[:]); err != nil {
3✔
191
                return err
×
192
        }
×
193

194
        // Get chain bucket if exists.
195
        chainBucket := tx.ReadWriteBucket(b.Bytes())
3✔
196
        if chainBucket == nil {
6✔
197
                return nil
3✔
198
        }
3✔
199

200
        // Retrieve the existing height index.
201
        hghtIndex := chainBucket.NestedReadWriteBucket(utxnHeightIndexKey)
×
202
        if hghtIndex == nil {
×
203
                return nil
×
204
        }
×
205

206
        // Retrieve all heights.
207
        err := hghtIndex.ForEach(func(k, v []byte) error {
×
208
                heightBucket := hghtIndex.NestedReadWriteBucket(k)
×
209
                if heightBucket == nil {
×
210
                        return nil
×
211
                }
×
212

213
                // Get finalized tx for height.
214
                txBytes := heightBucket.Get(utxnFinalizedKndrTxnKey)
×
215
                if txBytes == nil {
×
216
                        return nil
×
217
                }
×
218

219
                // Deserialize and skip tx if it cannot be deserialized.
220
                tx := &wire.MsgTx{}
×
221
                err := tx.Deserialize(bytes.NewReader(txBytes))
×
222
                if err != nil {
×
223
                        log.Warnf("Cannot deserialize utxn tx")
×
224
                        return nil
×
225
                }
×
226

227
                // Calculate hash.
228
                hash := tx.TxHash()
×
229

×
230
                // Insert utxn tx hash in hashes bucket.
×
231
                log.Debugf("Inserting nursery tx %v in hash list "+
×
232
                        "(height=%v)", hash, byteOrder.Uint32(k))
×
233

×
234
                // Create the transaction record. Since this is an old record,
×
235
                // we can assume it's already been published. Although it's
×
236
                // possible to calculate the fees and fee rate used here, we
×
237
                // skip it as it's unlikely we'd perform RBF on these old
×
238
                // sweeping transactions.
×
239
                tr := &TxRecord{
×
240
                        Txid:      hash,
×
241
                        Published: true,
×
242
                }
×
243

×
244
                // Serialize tx record.
×
245
                var b bytes.Buffer
×
246
                err = serializeTxRecord(&b, tr)
×
247
                if err != nil {
×
248
                        return err
×
249
                }
×
250

251
                return txHashesBucket.Put(tr.Txid[:], b.Bytes())
×
252
        })
253
        if err != nil {
×
254
                return err
×
255
        }
×
256

257
        return nil
×
258
}
259

260
// StoreTx stores that we are about to publish a tx.
261
func (s *sweeperStore) StoreTx(tr *TxRecord) error {
3✔
262
        return kvdb.Update(s.db, func(tx kvdb.RwTx) error {
6✔
263
                txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey)
3✔
264
                if txHashesBucket == nil {
3✔
265
                        return errNoTxHashesBucket
×
266
                }
×
267

268
                // Serialize tx record.
269
                var b bytes.Buffer
3✔
270
                err := serializeTxRecord(&b, tr)
3✔
271
                if err != nil {
3✔
272
                        return err
×
273
                }
×
274

275
                return txHashesBucket.Put(tr.Txid[:], b.Bytes())
3✔
276
        }, func() {})
3✔
277
}
278

279
// IsOurTx determines whether a tx is published by us, based on its hash.
280
func (s *sweeperStore) IsOurTx(hash chainhash.Hash) bool {
3✔
281
        var ours bool
3✔
282

3✔
283
        err := kvdb.View(s.db, func(tx kvdb.RTx) error {
6✔
284
                txHashesBucket := tx.ReadBucket(txHashesBucketKey)
3✔
285
                // If the root bucket cannot be found, we consider the tx to be
3✔
286
                // not found in our db.
3✔
287
                if txHashesBucket == nil {
3✔
288
                        log.Error("Tx hashes bucket not found in sweeper store")
×
289
                        return nil
×
290
                }
×
291

292
                ours = txHashesBucket.Get(hash[:]) != nil
3✔
293

3✔
294
                return nil
3✔
295
        }, func() {
3✔
296
                ours = false
3✔
297
        })
3✔
298
        if err != nil {
3✔
299
                return false
×
300
        }
×
301

302
        return ours
3✔
303
}
304

305
// ListSweeps lists all the sweep transactions we have in the sweeper store.
306
func (s *sweeperStore) ListSweeps() ([]chainhash.Hash, error) {
3✔
307
        var sweepTxns []chainhash.Hash
3✔
308

3✔
309
        if err := kvdb.View(s.db, func(tx kvdb.RTx) error {
6✔
310
                txHashesBucket := tx.ReadBucket(txHashesBucketKey)
3✔
311
                if txHashesBucket == nil {
3✔
312
                        return errNoTxHashesBucket
×
313
                }
×
314

315
                return txHashesBucket.ForEach(func(resKey, _ []byte) error {
6✔
316
                        txid, err := chainhash.NewHash(resKey)
3✔
317
                        if err != nil {
3✔
318
                                return err
×
319
                        }
×
320

321
                        sweepTxns = append(sweepTxns, *txid)
3✔
322

3✔
323
                        return nil
3✔
324
                })
325
        }, func() {
3✔
326
                sweepTxns = nil
3✔
327
        }); err != nil {
3✔
328
                return nil, err
×
329
        }
×
330

331
        return sweepTxns, nil
3✔
332
}
333

334
// GetTx queries the database to find the tx that matches the given txid.
335
// Returns ErrTxNotFound if it cannot be found.
336
func (s *sweeperStore) GetTx(txid chainhash.Hash) (*TxRecord, error) {
3✔
337
        // Create a record.
3✔
338
        tr := &TxRecord{}
3✔
339

3✔
340
        var err error
3✔
341
        err = kvdb.View(s.db, func(tx kvdb.RTx) error {
6✔
342
                txHashesBucket := tx.ReadBucket(txHashesBucketKey)
3✔
343
                if txHashesBucket == nil {
3✔
344
                        return errNoTxHashesBucket
×
345
                }
×
346

347
                txBytes := txHashesBucket.Get(txid[:])
3✔
348
                if txBytes == nil {
6✔
349
                        return ErrTxNotFound
3✔
350
                }
3✔
351

352
                // For old records, we'd get an empty byte slice here. We can
353
                // assume it's already been published. Although it's possible
354
                // to calculate the fees and fee rate used here, we skip it as
355
                // it's unlikely we'd perform RBF on these old sweeping
356
                // transactions.
357
                //
358
                // TODO(yy): remove this check once migration is added.
359
                if len(txBytes) == 0 {
3✔
UNCOV
360
                        tr.Published = true
×
UNCOV
361
                        return nil
×
UNCOV
362
                }
×
363

364
                tr, err = deserializeTxRecord(bytes.NewReader(txBytes))
3✔
365
                if err != nil {
3✔
366
                        return err
×
367
                }
×
368

369
                return nil
3✔
370
        }, func() {
3✔
371
                tr = &TxRecord{}
3✔
372
        })
3✔
373
        if err != nil {
6✔
374
                return nil, err
3✔
375
        }
3✔
376

377
        // Attach the txid to the record.
378
        tr.Txid = txid
3✔
379

3✔
380
        return tr, nil
3✔
381
}
382

383
// DeleteTx removes the given tx from db.
384
func (s *sweeperStore) DeleteTx(txid chainhash.Hash) error {
3✔
385
        return kvdb.Update(s.db, func(tx kvdb.RwTx) error {
6✔
386
                txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey)
3✔
387
                if txHashesBucket == nil {
3✔
388
                        return errNoTxHashesBucket
×
389
                }
×
390

391
                return txHashesBucket.Delete(txid[:])
3✔
392
        }, func() {})
3✔
393
}
394

395
// Compile-time constraint to ensure sweeperStore implements SweeperStore.
396
var _ SweeperStore = (*sweeperStore)(nil)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc