• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12986279612

27 Jan 2025 09:51AM UTC coverage: 57.652% (-1.1%) from 58.788%
12986279612

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown", as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In the fee bumper we don't have the info to distinguish the above cases, so
we leave them to be further handled by the sweeper, which has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19578 existing lines in 256 files now uncovered.

103448 of 179434 relevant lines covered (57.65%)

24884.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.5
/sweep/store.go
1
package sweep
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "io"
8

9
        "github.com/btcsuite/btcd/chaincfg/chainhash"
10
        "github.com/btcsuite/btcd/wire"
11
        "github.com/lightningnetwork/lnd/kvdb"
12
        "github.com/lightningnetwork/lnd/tlv"
13
)
14

15
var (
	// txHashesBucketKey names the top-level bucket in which the sweeper
	// keeps one entry per sweep tx.
	//
	// maps: txHash -> TxRecord
	txHashesBucketKey = []byte("sweeper-tx-hashes")

	// utxnChainPrefix is prepended to the chain hash to form the key of
	// the nursery's chain bucket.
	utxnChainPrefix = []byte("utxn")

	// utxnHeightIndexKey is the key of the sub bucket, nested inside the
	// nursery chain bucket, that holds the height index.
	utxnHeightIndexKey = []byte("height-index")

	// utxnFinalizedKndrTxnKey is the static key under which a height
	// bucket stores the nursery's finalized kindergarten sweep txn.
	utxnFinalizedKndrTxnKey = []byte("finalized-kndr-txn")

	// byteOrder is the byte order used to decode the height keys of the
	// nursery height index.
	byteOrder = binary.BigEndian

	// errNoTxHashesBucket is returned by the store methods when the tx
	// hashes bucket cannot be found in the database.
	errNoTxHashesBucket = errors.New("tx hashes bucket does not exist")

	// ErrTxNotFound is returned when the queried txid has no entry in our
	// db.
	ErrTxNotFound = errors.New("tx not found")
)
41

42
// TxRecord specifies a record of a tx that's stored in the database.
43
type TxRecord struct {
44
        // Txid is the sweeping tx's txid, which is used as the key to store
45
        // the following values.
46
        Txid chainhash.Hash
47

48
        // FeeRate is the fee rate of the sweeping tx, unit is sats/kw.
49
        FeeRate uint64
50

51
        // Fee is the fee of the sweeping tx, unit is sat.
52
        Fee uint64
53

54
        // Published indicates whether the tx has been published.
55
        Published bool
56
}
57

58
// toTlvStream converts TxRecord into a tlv representation.
59
func (t *TxRecord) toTlvStream() (*tlv.Stream, error) {
7✔
60
        const (
7✔
61
                // A set of tlv type definitions used to serialize TxRecord.
7✔
62
                // We define it here instead of the head of the file to avoid
7✔
63
                // naming conflicts.
7✔
64
                //
7✔
65
                // NOTE: A migration should be added whenever the existing type
7✔
66
                // changes.
7✔
67
                //
7✔
68
                // NOTE: Txid is stored as the key, so it's not included here.
7✔
69
                feeRateType tlv.Type = 0
7✔
70
                feeType     tlv.Type = 1
7✔
71
                boolType    tlv.Type = 2
7✔
72
        )
7✔
73

7✔
74
        return tlv.NewStream(
7✔
75
                tlv.MakeBigSizeRecord(feeRateType, &t.FeeRate),
7✔
76
                tlv.MakeBigSizeRecord(feeType, &t.Fee),
7✔
77
                tlv.MakePrimitiveRecord(boolType, &t.Published),
7✔
78
        )
7✔
79
}
7✔
80

81
// serializeTxRecord serializes a TxRecord based on tlv format.
82
func serializeTxRecord(w io.Writer, tx *TxRecord) error {
5✔
83
        // Create the tlv stream.
5✔
84
        tlvStream, err := tx.toTlvStream()
5✔
85
        if err != nil {
5✔
86
                return err
×
87
        }
×
88

89
        // Encode the tlv stream.
90
        var buf bytes.Buffer
5✔
91
        if err := tlvStream.Encode(&buf); err != nil {
5✔
92
                return err
×
93
        }
×
94

95
        // Write the tlv stream.
96
        if _, err = w.Write(buf.Bytes()); err != nil {
5✔
97
                return err
×
98
        }
×
99

100
        return nil
5✔
101
}
102

103
// deserializeTxRecord deserializes a TxRecord based on tlv format.
104
func deserializeTxRecord(r io.Reader) (*TxRecord, error) {
2✔
105
        var tx TxRecord
2✔
106

2✔
107
        // Create the tlv stream.
2✔
108
        tlvStream, err := tx.toTlvStream()
2✔
109
        if err != nil {
2✔
110
                return nil, err
×
111
        }
×
112

113
        if err := tlvStream.Decode(r); err != nil {
2✔
114
                return nil, err
×
115
        }
×
116

117
        return &tx, nil
2✔
118
}
119

120
// SweeperStore stores published txes.
121
type SweeperStore interface {
122
        // IsOurTx determines whether a tx is published by us, based on its
123
        // hash.
124
        IsOurTx(hash chainhash.Hash) (bool, error)
125

126
        // StoreTx stores a tx hash we are about to publish.
127
        StoreTx(*TxRecord) error
128

129
        // ListSweeps lists all the sweeps we have successfully published.
130
        ListSweeps() ([]chainhash.Hash, error)
131

132
        // GetTx queries the database to find the tx that matches the given
133
        // txid. Returns ErrTxNotFound if it cannot be found.
134
        GetTx(hash chainhash.Hash) (*TxRecord, error)
135

136
        // DeleteTx removes a tx specified by the hash from the store.
137
        DeleteTx(hash chainhash.Hash) error
138
}
139

140
type sweeperStore struct {
141
        db kvdb.Backend
142
}
143

144
// NewSweeperStore returns a new store instance.
145
func NewSweeperStore(db kvdb.Backend, chainHash *chainhash.Hash) (
146
        SweeperStore, error) {
5✔
147

5✔
148
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
10✔
149
                if tx.ReadWriteBucket(txHashesBucketKey) != nil {
6✔
150
                        return nil
1✔
151
                }
1✔
152

153
                txHashesBucket, err := tx.CreateTopLevelBucket(
4✔
154
                        txHashesBucketKey,
4✔
155
                )
4✔
156
                if err != nil {
4✔
157
                        return err
×
158
                }
×
159

160
                // Use non-existence of tx hashes bucket as a signal to migrate
161
                // nursery finalized txes.
162
                err = migrateTxHashes(tx, txHashesBucket, chainHash)
4✔
163

4✔
164
                return err
4✔
165
        }, func() {})
5✔
166
        if err != nil {
5✔
167
                return nil, err
×
168
        }
×
169

170
        return &sweeperStore{
5✔
171
                db: db,
5✔
172
        }, nil
5✔
173
}
174

175
// migrateTxHashes migrates nursery finalized txes to the tx hashes bucket. This
176
// is not implemented as a database migration, to keep the downgrade path open.
177
//
178
// TODO(yy): delete this function once nursery is removed.
179
func migrateTxHashes(tx kvdb.RwTx, txHashesBucket kvdb.RwBucket,
180
        chainHash *chainhash.Hash) error {
4✔
181

4✔
182
        log.Infof("Migrating UTXO nursery finalized TXIDs")
4✔
183

4✔
184
        // Compose chain bucket key.
4✔
185
        var b bytes.Buffer
4✔
186
        if _, err := b.Write(utxnChainPrefix); err != nil {
4✔
187
                return err
×
188
        }
×
189

190
        if _, err := b.Write(chainHash[:]); err != nil {
4✔
191
                return err
×
192
        }
×
193

194
        // Get chain bucket if exists.
195
        chainBucket := tx.ReadWriteBucket(b.Bytes())
4✔
196
        if chainBucket == nil {
8✔
197
                return nil
4✔
198
        }
4✔
199

200
        // Retrieve the existing height index.
201
        hghtIndex := chainBucket.NestedReadWriteBucket(utxnHeightIndexKey)
×
202
        if hghtIndex == nil {
×
203
                return nil
×
204
        }
×
205

206
        // Retrieve all heights.
207
        err := hghtIndex.ForEach(func(k, v []byte) error {
×
208
                heightBucket := hghtIndex.NestedReadWriteBucket(k)
×
209
                if heightBucket == nil {
×
210
                        return nil
×
211
                }
×
212

213
                // Get finalized tx for height.
214
                txBytes := heightBucket.Get(utxnFinalizedKndrTxnKey)
×
215
                if txBytes == nil {
×
216
                        return nil
×
217
                }
×
218

219
                // Deserialize and skip tx if it cannot be deserialized.
220
                tx := &wire.MsgTx{}
×
221
                err := tx.Deserialize(bytes.NewReader(txBytes))
×
222
                if err != nil {
×
223
                        log.Warnf("Cannot deserialize utxn tx")
×
224
                        return nil
×
225
                }
×
226

227
                // Calculate hash.
228
                hash := tx.TxHash()
×
229

×
230
                // Insert utxn tx hash in hashes bucket.
×
231
                log.Debugf("Inserting nursery tx %v in hash list "+
×
232
                        "(height=%v)", hash, byteOrder.Uint32(k))
×
233

×
234
                // Create the transaction record. Since this is an old record,
×
235
                // we can assume it's already been published. Although it's
×
236
                // possible to calculate the fees and fee rate used here, we
×
237
                // skip it as it's unlikely we'd perform RBF on these old
×
238
                // sweeping transactions.
×
239
                tr := &TxRecord{
×
240
                        Txid:      hash,
×
241
                        Published: true,
×
242
                }
×
243

×
244
                // Serialize tx record.
×
245
                var b bytes.Buffer
×
246
                err = serializeTxRecord(&b, tr)
×
247
                if err != nil {
×
248
                        return err
×
249
                }
×
250

251
                return txHashesBucket.Put(tr.Txid[:], b.Bytes())
×
252
        })
253
        if err != nil {
×
254
                return err
×
255
        }
×
256

257
        return nil
×
258
}
259

260
// StoreTx stores that we are about to publish a tx.
261
func (s *sweeperStore) StoreTx(tr *TxRecord) error {
4✔
262
        return kvdb.Update(s.db, func(tx kvdb.RwTx) error {
8✔
263
                txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey)
4✔
264
                if txHashesBucket == nil {
4✔
265
                        return errNoTxHashesBucket
×
266
                }
×
267

268
                // Serialize tx record.
269
                var b bytes.Buffer
4✔
270
                err := serializeTxRecord(&b, tr)
4✔
271
                if err != nil {
4✔
272
                        return err
×
273
                }
×
274

275
                return txHashesBucket.Put(tr.Txid[:], b.Bytes())
4✔
276
        }, func() {})
4✔
277
}
278

279
// IsOurTx determines whether a tx is published by us, based on its
280
// hash.
281
func (s *sweeperStore) IsOurTx(hash chainhash.Hash) (bool, error) {
3✔
282
        var ours bool
3✔
283

3✔
284
        err := kvdb.View(s.db, func(tx kvdb.RTx) error {
6✔
285
                txHashesBucket := tx.ReadBucket(txHashesBucketKey)
3✔
286
                if txHashesBucket == nil {
3✔
287
                        return errNoTxHashesBucket
×
288
                }
×
289

290
                ours = txHashesBucket.Get(hash[:]) != nil
3✔
291

3✔
292
                return nil
3✔
293
        }, func() {
3✔
294
                ours = false
3✔
295
        })
3✔
296
        if err != nil {
3✔
UNCOV
297
                return false, err
×
UNCOV
298
        }
×
299

300
        return ours, nil
3✔
301
}
302

303
// ListSweeps lists all the sweep transactions we have in the sweeper store.
304
func (s *sweeperStore) ListSweeps() ([]chainhash.Hash, error) {
1✔
305
        var sweepTxns []chainhash.Hash
1✔
306

1✔
307
        if err := kvdb.View(s.db, func(tx kvdb.RTx) error {
2✔
308
                txHashesBucket := tx.ReadBucket(txHashesBucketKey)
1✔
309
                if txHashesBucket == nil {
1✔
UNCOV
310
                        return errNoTxHashesBucket
×
UNCOV
311
                }
×
312

313
                return txHashesBucket.ForEach(func(resKey, _ []byte) error {
3✔
314
                        txid, err := chainhash.NewHash(resKey)
2✔
315
                        if err != nil {
2✔
UNCOV
316
                                return err
×
UNCOV
317
                        }
×
318

319
                        sweepTxns = append(sweepTxns, *txid)
2✔
320

2✔
321
                        return nil
2✔
322
                })
323
        }, func() {
1✔
324
                sweepTxns = nil
1✔
325
        }); err != nil {
1✔
UNCOV
326
                return nil, err
×
UNCOV
327
        }
×
328

329
        return sweepTxns, nil
1✔
330
}
331

332
// GetTx queries the database to find the tx that matches the given txid.
333
// Returns ErrTxNotFound if it cannot be found.
334
func (s *sweeperStore) GetTx(txid chainhash.Hash) (*TxRecord, error) {
4✔
335
        // Create a record.
4✔
336
        tr := &TxRecord{}
4✔
337

4✔
338
        var err error
4✔
339
        err = kvdb.View(s.db, func(tx kvdb.RTx) error {
8✔
340
                txHashesBucket := tx.ReadBucket(txHashesBucketKey)
4✔
341
                if txHashesBucket == nil {
4✔
UNCOV
342
                        return errNoTxHashesBucket
×
UNCOV
343
                }
×
344

345
                txBytes := txHashesBucket.Get(txid[:])
4✔
346
                if txBytes == nil {
6✔
347
                        return ErrTxNotFound
2✔
348
                }
2✔
349

350
                // For old records, we'd get an empty byte slice here. We can
351
                // assume it's already been published. Although it's possible
352
                // to calculate the fees and fee rate used here, we skip it as
353
                // it's unlikely we'd perform RBF on these old sweeping
354
                // transactions.
355
                //
356
                // TODO(yy): remove this check once migration is added.
357
                if len(txBytes) == 0 {
3✔
358
                        tr.Published = true
1✔
359
                        return nil
1✔
360
                }
1✔
361

362
                tr, err = deserializeTxRecord(bytes.NewReader(txBytes))
1✔
363
                if err != nil {
1✔
UNCOV
364
                        return err
×
UNCOV
365
                }
×
366

367
                return nil
1✔
368
        }, func() {
4✔
369
                tr = &TxRecord{}
4✔
370
        })
4✔
371
        if err != nil {
6✔
372
                return nil, err
2✔
373
        }
2✔
374

375
        // Attach the txid to the record.
376
        tr.Txid = txid
2✔
377

2✔
378
        return tr, nil
2✔
379
}
380

381
// DeleteTx removes the given tx from db.
382
func (s *sweeperStore) DeleteTx(txid chainhash.Hash) error {
2✔
383
        return kvdb.Update(s.db, func(tx kvdb.RwTx) error {
4✔
384
                txHashesBucket := tx.ReadWriteBucket(txHashesBucketKey)
2✔
385
                if txHashesBucket == nil {
2✔
UNCOV
386
                        return errNoTxHashesBucket
×
UNCOV
387
                }
×
388

389
                return txHashesBucket.Delete(txid[:])
2✔
390
        }, func() {})
2✔
391
}
392

393
// Compile-time constraint to ensure sweeperStore implements SweeperStore.
394
var _ SweeperStore = (*sweeperStore)(nil)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc