• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15561477203

10 Jun 2025 01:54PM UTC coverage: 58.351% (-10.1%) from 68.487%
15561477203

Pull #9356

github

web-flow
Merge 6440b25db into c6d6d4c0b
Pull Request #9356: lnrpc: add incoming/outgoing channel ids filter to forwarding history request

33 of 36 new or added lines in 2 files covered. (91.67%)

28366 existing lines in 455 files now uncovered.

97715 of 167461 relevant lines covered (58.35%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.23
/sweep/store.go
1
package sweep
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "io"
8

9
        "github.com/btcsuite/btcd/chaincfg/chainhash"
10
        "github.com/btcsuite/btcd/wire"
11
        "github.com/lightningnetwork/lnd/kvdb"
12
        "github.com/lightningnetwork/lnd/tlv"
13
)
14

15
var (
	// txHashesBucketKey names the top-level bucket holding the hashes of
	// all sweep txes that were published successfully.
	//
	// maps: txHash -> TxRecord
	txHashesBucketKey = []byte("sweeper-tx-hashes")

	// utxnChainPrefix is prepended to the chain hash to form the name of
	// a nursery bucket.
	utxnChainPrefix = []byte("utxn")

	// utxnHeightIndexKey names the nursery sub bucket that holds the
	// height index.
	utxnHeightIndexKey = []byte("height-index")

	// utxnFinalizedKndrTxnKey is the static key under which the nursery
	// keeps its finalized kindergarten sweep txn.
	utxnFinalizedKndrTxnKey = []byte("finalized-kndr-txn")

	// byteOrder is the byte order used when decoding integers read from
	// the database.
	byteOrder = binary.BigEndian

	// errNoTxHashesBucket is returned when the tx hashes bucket cannot be
	// found in the database.
	errNoTxHashesBucket = errors.New("tx hashes bucket does not exist")

	// ErrTxNotFound is returned when a lookup by txid finds no matching
	// entry in our db.
	ErrTxNotFound = errors.New("tx not found")
)
41

42
// TxRecord specifies a record of a tx that's stored in the database.
43
type TxRecord struct {
44
        // Txid is the sweeping tx's txid, which is used as the key to store
45
        // the following values.
46
        Txid chainhash.Hash
47

48
        // FeeRate is the fee rate of the sweeping tx, unit is sats/kw.
49
        FeeRate uint64
50

51
        // Fee is the fee of the sweeping tx, unit is sat.
52
        Fee uint64
53

54
        // Published indicates whether the tx has been published.
55
        Published bool
56
}
57

58
// toTlvStream converts TxRecord into a tlv representation.
59
func (t *TxRecord) toTlvStream() (*tlv.Stream, error) {
3✔
60
        const (
3✔
61
                // A set of tlv type definitions used to serialize TxRecord.
3✔
62
                // We define it here instead of the head of the file to avoid
3✔
63
                // naming conflicts.
3✔
64
                //
3✔
65
                // NOTE: A migration should be added whenever the existing type
3✔
66
                // changes.
3✔
67
                //
3✔
68
                // NOTE: Txid is stored as the key, so it's not included here.
3✔
69
                feeRateType tlv.Type = 0
3✔
70
                feeType     tlv.Type = 1
3✔
71
                boolType    tlv.Type = 2
3✔
72
        )
3✔
73

3✔
74
        return tlv.NewStream(
3✔
75
                tlv.MakeBigSizeRecord(feeRateType, &t.FeeRate),
3✔
76
                tlv.MakeBigSizeRecord(feeType, &t.Fee),
3✔
77
                tlv.MakePrimitiveRecord(boolType, &t.Published),
3✔
78
        )
3✔
79
}
3✔
80

81
// serializeTxRecord serializes a TxRecord based on tlv format.
82
func serializeTxRecord(w io.Writer, tx *TxRecord) error {
3✔
83
        // Create the tlv stream.
3✔
84
        tlvStream, err := tx.toTlvStream()
3✔
85
        if err != nil {
3✔
86
                return err
×
87
        }
×
88

89
        // Encode the tlv stream.
90
        var buf bytes.Buffer
3✔
91
        if err := tlvStream.Encode(&buf); err != nil {
3✔
92
                return err
×
93
        }
×
94

95
        // Write the tlv stream.
96
        if _, err = w.Write(buf.Bytes()); err != nil {
3✔
97
                return err
×
98
        }
×
99

100
        return nil
3✔
101
}
102

103
// deserializeTxRecord deserializes a TxRecord based on tlv format.
104
func deserializeTxRecord(r io.Reader) (*TxRecord, error) {
3✔
105
        var tx TxRecord
3✔
106

3✔
107
        // Create the tlv stream.
3✔
108
        tlvStream, err := tx.toTlvStream()
3✔
109
        if err != nil {
3✔
110
                return nil, err
×
111
        }
×
112

113
        if err := tlvStream.Decode(r); err != nil {
3✔
114
                return nil, err
×
115
        }
×
116

117
        return &tx, nil
3✔
118
}
119

120
// SweeperStore stores published txes.
121
type SweeperStore interface {
122
        // IsOurTx determines whether a tx is published by us, based on its
123
        // hash.
124
        IsOurTx(hash chainhash.Hash) bool
125

126
        // StoreTx stores a tx hash we are about to publish.
127
        StoreTx(*TxRecord) error
128

129
        // ListSweeps lists all the sweeps we have successfully published.
130
        ListSweeps() ([]chainhash.Hash, error)
131

132
        // GetTx queries the database to find the tx that matches the given
133
        // txid. Returns ErrTxNotFound if it cannot be found.
134
        GetTx(hash chainhash.Hash) (*TxRecord, error)
135

136
        // DeleteTx removes a tx specified by the hash from the store.
137
        DeleteTx(hash chainhash.Hash) error
138
}
139

140
type sweeperStore struct {
141
        db kvdb.Backend
142
}
143

144
// NewSweeperStore returns a new store instance.
145
func NewSweeperStore(db kvdb.Backend, chainHash *chainhash.Hash) (
146
        SweeperStore, error) {
3✔
147

3✔
148
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
6✔
149
                if tx.ReadWriteBucket(txHashesBucketKey) != nil {
6✔
150
                        return nil
3✔
151
                }
3✔
152

153
                txHashesBucket, err := tx.CreateTopLevelBucket(
3✔
154
                        txHashesBucketKey,
3✔
155
                )
3✔
156
                if err != nil {
3✔
157
                        return err
×
158
                }
×
159

160
                // Use non-existence of tx hashes bucket as a signal to migrate
161
                // nursery finalized txes.
162
                err = migrateTxHashes(tx, txHashesBucket, chainHash)
3✔
163

3✔
164
                return err
3✔
165
        }, func() {})
3✔
166
        if err != nil {
3✔
167
                return nil, err
×
168
        }
×
169

170
        return &sweeperStore{
3✔
171
                db: db,
3✔
172
        }, nil
3✔
173
}
174

175
// migrateTxHashes migrates nursery finalized txes to the tx hashes bucket. This
176
// is not implemented as a database migration, to keep the downgrade path open.
177
//
178
// TODO(yy): delete this function once nursery is removed.
179
func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket,
180
        chainHash *chainhash.Hash) error {
3✔
181

3✔
182
        log.Infof("Migrating UTXO nursery finalized TXIDs")
3✔
183

3✔
184
        // Compose chain bucket key.
3✔
185
        var b bytes.Buffer
3✔
186
        if _, err := b.Write(utxnChainPrefix); err != nil {
3✔
187
                return err
×
188
        }
×
189

190
        if _, err := b.Write(chainHash[:]); err != nil {
3✔
191
                return err
×
192
        }
×
193

194
        // Get chain bucket if exists.
195
        chainBucket := tx.ReadWriteBucket(b.Bytes())
3✔
196
        if chainBucket == nil {
6✔
197
                return nil
3✔
198
        }
3✔
199

200
        // Retrieve the existing height index.
201
        hghtIndex := chainBucket.NestedReadWriteBucket(utxnHeightIndexKey)
×
202
        if hghtIndex == nil {
×
203
                return nil
×
204
        }
×
205

206
        // Retrieve all heights.
207
        err := hghtIndex.ForEach(func(k, v []byte) error {
×
208
                heightBucket := hghtIndex.NestedReadWriteBucket(k)
×
209
                if heightBucket == nil {
×
210
                        return nil
×
211
                }
×
212

213
                // Get finalized tx for height.
214
                txBytes := heightBucket.Get(utxnFinalizedKndrTxnKey)
×
215
                if txBytes == nil {
×
216
                        return nil
×
217
                }
×
218

219
                // Deserialize and skip tx if it cannot be deserialized.
220
                tx := &wire.MsgTx{}
×
221
                err := tx.Deserialize(bytes.NewReader(txBytes))
×
222
                if err != nil {
×
223
                        log.Warnf("Cannot deserialize utxn tx")
×
224
                        return nil
×
225
                }
×
226

227
                // Calculate hash.
228
                hash := tx.TxHash()
×
229

×
230
                // Insert utxn tx hash in hashes bucket.
×
231
                log.Debugf("Inserting nursery tx %v in hash list "+
×
232
                        "(height=%v)", hash, byteOrder.Uint32(k))
×
233

×
234
                // Create the transaction record. Since this is an old record,
×
235
                // we can assume it's already been published. Although it's
×
236
                // possible to calculate the fees and fee rate used here, we
×
237
                // skip it as it's unlikely we'd perform RBF on these old
×
238
                // sweeping transactions.
×
239
                tr := &TxRecord{
×
240
                        Txid:      hash,
×
241
                        Published: true,
×
242
                }
×
243

×
244
                // Serialize tx record.
×
245
                var b bytes.Buffer
×
246
                err = serializeTxRecord(&b, tr)
×
247
                if err != nil {
×
248
                        return err
×
249
                }
×
250

251
                return txHashesBucket.Put(tr.Txid[:], b.Bytes())
×
252
        })
253
        if err != nil {
×
254
                return err
×
255
        }
×
256

257
        return nil
×
258
}
259

260
// StoreTx stores that we are about to publish a tx.
261
func (s *sweeperStore) StoreTx(tr *TxRecord) error {
3✔
262
        return kvdb.Update(s.db, func(tx kvdb.RwTx) error {
6✔
263
                txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey)
3✔
264
                if txHashesBucket == nil {
3✔
265
                        return errNoTxHashesBucket
×
266
                }
×
267

268
                // Serialize tx record.
269
                var b bytes.Buffer
3✔
270
                err := serializeTxRecord(&b, tr)
3✔
271
                if err != nil {
3✔
272
                        return err
×
273
                }
×
274

275
                return txHashesBucket.Put(tr.Txid[:], b.Bytes())
3✔
276
        }, func() {})
3✔
277
}
278

279
// IsOurTx determines whether a tx is published by us, based on its hash.
280
func (s *sweeperStore) IsOurTx(hash chainhash.Hash) bool {
3✔
281
        var ours bool
3✔
282

3✔
283
        err := kvdb.View(s.db, func(tx kvdb.RTx) error {
6✔
284
                txHashesBucket := tx.ReadBucket(txHashesBucketKey)
3✔
285
                // If the root bucket cannot be found, we consider the tx to be
3✔
286
                // not found in our db.
3✔
287
                if txHashesBucket == nil {
3✔
288
                        log.Error("Tx hashes bucket not found in sweeper store")
×
289
                        return nil
×
290
                }
×
291

292
                ours = txHashesBucket.Get(hash[:]) != nil
3✔
293

3✔
294
                return nil
3✔
295
        }, func() {
3✔
296
                ours = false
3✔
297
        })
3✔
298
        if err != nil {
3✔
299
                return false
×
300
        }
×
301

302
        return ours
3✔
303
}
304

305
// ListSweeps lists all the sweep transactions we have in the sweeper store.
306
func (s *sweeperStore) ListSweeps() ([]chainhash.Hash, error) {
3✔
307
        var sweepTxns []chainhash.Hash
3✔
308

3✔
309
        if err := kvdb.View(s.db, func(tx kvdb.RTx) error {
6✔
310
                txHashesBucket := tx.ReadBucket(txHashesBucketKey)
3✔
311
                if txHashesBucket == nil {
3✔
312
                        return errNoTxHashesBucket
×
313
                }
×
314

315
                return txHashesBucket.ForEach(func(resKey, _ []byte) error {
6✔
316
                        txid, err := chainhash.NewHash(resKey)
3✔
317
                        if err != nil {
3✔
318
                                return err
×
319
                        }
×
320

321
                        sweepTxns = append(sweepTxns, *txid)
3✔
322

3✔
323
                        return nil
3✔
324
                })
325
        }, func() {
3✔
326
                sweepTxns = nil
3✔
327
        }); err != nil {
3✔
328
                return nil, err
×
329
        }
×
330

331
        return sweepTxns, nil
3✔
332
}
333

334
// GetTx queries the database to find the tx that matches the given txid.
335
// Returns ErrTxNotFound if it cannot be found.
336
func (s *sweeperStore) GetTx(txid chainhash.Hash) (*TxRecord, error) {
3✔
337
        // Create a record.
3✔
338
        tr := &TxRecord{}
3✔
339

3✔
340
        var err error
3✔
341
        err = kvdb.View(s.db, func(tx kvdb.RTx) error {
6✔
342
                txHashesBucket := tx.ReadBucket(txHashesBucketKey)
3✔
343
                if txHashesBucket == nil {
3✔
344
                        return errNoTxHashesBucket
×
345
                }
×
346

347
                txBytes := txHashesBucket.Get(txid[:])
3✔
348
                if txBytes == nil {
6✔
349
                        return ErrTxNotFound
3✔
350
                }
3✔
351

352
                // For old records, we'd get an empty byte slice here. We can
353
                // assume it's already been published. Although it's possible
354
                // to calculate the fees and fee rate used here, we skip it as
355
                // it's unlikely we'd perform RBF on these old sweeping
356
                // transactions.
357
                //
358
                // TODO(yy): remove this check once migration is added.
359
                if len(txBytes) == 0 {
3✔
UNCOV
360
                        tr.Published = true
×
UNCOV
361
                        return nil
×
UNCOV
362
                }
×
363

364
                tr, err = deserializeTxRecord(bytes.NewReader(txBytes))
3✔
365
                if err != nil {
3✔
366
                        return err
×
367
                }
×
368

369
                return nil
3✔
370
        }, func() {
3✔
371
                tr = &TxRecord{}
3✔
372
        })
3✔
373
        if err != nil {
6✔
374
                return nil, err
3✔
375
        }
3✔
376

377
        // Attach the txid to the record.
378
        tr.Txid = txid
3✔
379

3✔
380
        return tr, nil
3✔
381
}
382

383
// DeleteTx removes the given tx from db.
384
func (s *sweeperStore) DeleteTx(txid chainhash.Hash) error {
3✔
385
        return kvdb.Update(s.db, func(tx kvdb.RwTx) error {
6✔
386
                txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey)
3✔
387
                if txHashesBucket == nil {
3✔
388
                        return errNoTxHashesBucket
×
389
                }
×
390

391
                return txHashesBucket.Delete(txid[:])
3✔
392
        }, func() {})
3✔
393
}
394

395
// Compile-time constraint to ensure sweeperStore implements SweeperStore.
396
var _ SweeperStore = (*sweeperStore)(nil)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc