• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16948521526

13 Aug 2025 08:27PM UTC coverage: 54.877% (-12.1%) from 66.929%
16948521526

Pull #10155

github

web-flow
Merge 61c0fecf6 into c6a9116e3
Pull Request #10155: Add missing invoice index for native sql

108941 of 198518 relevant lines covered (54.88%)

22023.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.06
/htlcswitch/decayedlog.go
1
package htlcswitch
2

3
import (
4
        "encoding/binary"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        sphinx "github.com/lightningnetwork/lightning-onion"
11
        "github.com/lightningnetwork/lnd/chainntnfs"
12
        "github.com/lightningnetwork/lnd/kvdb"
13
)
14

15
const (
	// defaultDbDirectory is the default directory where our decayed log
	// will store our (sharedHash, CLTV) key-value pairs.
	defaultDbDirectory = "sharedhashes"
)

var (
	// sharedHashBucket is a bucket which houses the first HashPrefixSize
	// bytes of a received HTLC's hashed shared secret as the key and the
	// HTLC's CLTV expiry as the value.
	sharedHashBucket = []byte("shared-hash")
)

var (
	// ErrDecayedLogInit is used to indicate a decayed log failed to create
	// the proper bucketing structure on startup.
	ErrDecayedLogInit = errors.New("unable to initialize decayed log")

	// ErrDecayedLogCorrupted signals that the anticipated bucketing
	// structure has diverged since initialization.
	ErrDecayedLogCorrupted = errors.New("decayed log structure corrupted")
)
37

38
// NewBoltBackendCreator returns a function that creates a new bbolt backend for
39
// the decayed logs database.
40
func NewBoltBackendCreator(dbPath,
41
        dbFileName string) func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {
8✔
42

8✔
43
        return func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {
16✔
44
                cfg := &kvdb.BoltBackendConfig{
8✔
45
                        DBPath:            dbPath,
8✔
46
                        DBFileName:        dbFileName,
8✔
47
                        NoFreelistSync:    boltCfg.NoFreelistSync,
8✔
48
                        AutoCompact:       boltCfg.AutoCompact,
8✔
49
                        AutoCompactMinAge: boltCfg.AutoCompactMinAge,
8✔
50
                        DBTimeout:         boltCfg.DBTimeout,
8✔
51
                }
8✔
52

8✔
53
                // Use default path for log database.
8✔
54
                if dbPath == "" {
8✔
55
                        cfg.DBPath = defaultDbDirectory
×
56
                }
×
57

58
                db, err := kvdb.GetBoltBackend(cfg)
8✔
59
                if err != nil {
8✔
60
                        return nil, fmt.Errorf("could not open boltdb: %w", err)
×
61
                }
×
62

63
                return db, nil
8✔
64
        }
65
}
66

67
// DecayedLog implements the PersistLog interface. It stores the first
// HashPrefixSize bytes of a sha256-hashed shared secret along with a node's
// CLTV value. It is a decaying log meaning there will be a garbage collector
// to collect entries which are expired according to their stored CLTV value
// and the current block height. DecayedLog wraps boltdb for simplicity and
// batches writes to the database to decrease write contention.
type DecayedLog struct {
	// started/stopped guard Start/Stop so each runs at most once. To be
	// used atomically.
	started int32
	stopped int32

	// db is the backend holding the shared-hash bucket.
	db kvdb.Backend

	// notifier delivers block epochs that drive garbage collection; may be
	// nil, in which case no garbage collector is started.
	notifier chainntnfs.ChainNotifier

	// wg tracks the garbage collector goroutine; quit signals it to exit.
	wg   sync.WaitGroup
	quit chan struct{}
}
84

85
// NewDecayedLog creates a new DecayedLog, which caches recently seen hash
86
// shared secrets. Entries are evicted as their cltv expires using block epochs
87
// from the given notifier.
88
func NewDecayedLog(db kvdb.Backend,
89
        notifier chainntnfs.ChainNotifier) *DecayedLog {
8✔
90

8✔
91
        return &DecayedLog{
8✔
92
                db:       db,
8✔
93
                notifier: notifier,
8✔
94
                quit:     make(chan struct{}),
8✔
95
        }
8✔
96
}
8✔
97

98
// Start opens the database we will be using to store hashed shared secrets.
99
// It also starts the garbage collector in a goroutine to remove stale
100
// database entries.
101
func (d *DecayedLog) Start() error {
8✔
102
        if !atomic.CompareAndSwapInt32(&d.started, 0, 1) {
8✔
103
                return nil
×
104
        }
×
105

106
        // Initialize the primary buckets used by the decayed log.
107
        if err := d.initBuckets(); err != nil {
8✔
108
                return err
×
109
        }
×
110

111
        // Start garbage collector.
112
        if d.notifier != nil {
11✔
113
                epochClient, err := d.notifier.RegisterBlockEpochNtfn(nil)
3✔
114
                if err != nil {
3✔
115
                        return fmt.Errorf("unable to register for epoch "+
×
116
                                "notifications: %v", err)
×
117
                }
×
118

119
                d.wg.Add(1)
3✔
120
                go d.garbageCollector(epochClient)
3✔
121
        }
122

123
        return nil
8✔
124
}
125

126
// initBuckets initializes the primary buckets used by the decayed log, namely
127
// the shared hash bucket, and batch replay
128
func (d *DecayedLog) initBuckets() error {
8✔
129
        return kvdb.Update(d.db, func(tx kvdb.RwTx) error {
16✔
130
                _, err := tx.CreateTopLevelBucket(sharedHashBucket)
8✔
131
                if err != nil {
8✔
132
                        return ErrDecayedLogInit
×
133
                }
×
134

135
                return nil
8✔
136
        }, func() {})
8✔
137
}
138

139
// Stop halts the garbage collector and closes boltdb.
140
func (d *DecayedLog) Stop() error {
11✔
141
        log.Debugf("DecayedLog shutting down...")
11✔
142
        defer log.Debugf("DecayedLog shutdown complete")
11✔
143

11✔
144
        if !atomic.CompareAndSwapInt32(&d.stopped, 0, 1) {
14✔
145
                return nil
3✔
146
        }
3✔
147

148
        // Stop garbage collector.
149
        close(d.quit)
8✔
150

8✔
151
        d.wg.Wait()
8✔
152

8✔
153
        return nil
8✔
154
}
155

156
// garbageCollector deletes entries from sharedHashBucket whose expiry height
157
// has already past. This function MUST be run as a goroutine.
158
func (d *DecayedLog) garbageCollector(epochClient *chainntnfs.BlockEpochEvent) {
3✔
159
        defer d.wg.Done()
3✔
160
        defer epochClient.Cancel()
3✔
161

3✔
162
        for {
9✔
163
                select {
6✔
164
                case epoch, ok := <-epochClient.Epochs:
3✔
165
                        if !ok {
3✔
166
                                // Block epoch was canceled, shutting down.
×
167
                                log.Infof("Block epoch canceled, " +
×
168
                                        "decaying hash log shutting down")
×
169
                                return
×
170
                        }
×
171

172
                        // Perform a bout of garbage collection using the
173
                        // epoch's block height.
174
                        height := uint32(epoch.Height)
3✔
175
                        numExpired, err := d.gcExpiredHashes(height)
3✔
176
                        if err != nil {
3✔
177
                                log.Errorf("unable to expire hashes at "+
×
178
                                        "height=%d", height)
×
179
                        }
×
180

181
                        if numExpired > 0 {
5✔
182
                                log.Infof("Garbage collected %v shared "+
2✔
183
                                        "secret hashes at height=%v",
2✔
184
                                        numExpired, height)
2✔
185
                        }
2✔
186

187
                case <-d.quit:
3✔
188
                        // Received shutdown request.
3✔
189
                        log.Infof("Decaying hash log received " +
3✔
190
                                "shutdown request")
3✔
191
                        return
3✔
192
                }
193
        }
194
}
195

196
// gcExpiredHashes purges the decaying log of all entries whose CLTV expires
197
// below the provided height.
198
func (d *DecayedLog) gcExpiredHashes(height uint32) (uint32, error) {
3✔
199
        var numExpiredHashes uint32
3✔
200

3✔
201
        err := kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
6✔
202
                numExpiredHashes = 0
3✔
203

3✔
204
                // Grab the shared hash bucket
3✔
205
                sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
3✔
206
                if sharedHashes == nil {
3✔
207
                        return fmt.Errorf("sharedHashBucket " +
×
208
                                "is nil")
×
209
                }
×
210

211
                var expiredCltv [][]byte
3✔
212
                if err := sharedHashes.ForEach(func(k, v []byte) error {
6✔
213
                        // Deserialize the CLTV value for this entry.
3✔
214
                        cltv := uint32(binary.BigEndian.Uint32(v))
3✔
215

3✔
216
                        if cltv < height {
5✔
217
                                // This CLTV is expired. We must add it to an
2✔
218
                                // array which we'll loop over and delete every
2✔
219
                                // hash contained from the db.
2✔
220
                                expiredCltv = append(expiredCltv, k)
2✔
221
                                numExpiredHashes++
2✔
222
                        }
2✔
223

224
                        return nil
3✔
225
                }); err != nil {
×
226
                        return err
×
227
                }
×
228

229
                // Delete every item in the array. This must
230
                // be done explicitly outside of the ForEach
231
                // function for safety reasons.
232
                for _, hash := range expiredCltv {
5✔
233
                        err := sharedHashes.Delete(hash)
2✔
234
                        if err != nil {
2✔
235
                                return err
×
236
                        }
×
237
                }
238

239
                return nil
3✔
240
        })
241
        if err != nil {
3✔
242
                return 0, err
×
243
        }
×
244

245
        return numExpiredHashes, nil
3✔
246
}
247

248
// Delete removes a <shared secret hash, CLTV> key-pair from the
249
// sharedHashBucket.
250
func (d *DecayedLog) Delete(hash *sphinx.HashPrefix) error {
2✔
251
        return kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
4✔
252
                sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
2✔
253
                if sharedHashes == nil {
2✔
254
                        return ErrDecayedLogCorrupted
×
255
                }
×
256

257
                return sharedHashes.Delete(hash[:])
2✔
258
        })
259
}
260

261
// Get retrieves the CLTV of a processed HTLC given the first 20 bytes of the
262
// Sha-256 hash of the shared secret.
263
func (d *DecayedLog) Get(hash *sphinx.HashPrefix) (uint32, error) {
9✔
264
        var value uint32
9✔
265

9✔
266
        err := kvdb.View(d.db, func(tx kvdb.RTx) error {
18✔
267
                // Grab the shared hash bucket which stores the mapping from
9✔
268
                // truncated sha-256 hashes of shared secrets to CLTV's.
9✔
269
                sharedHashes := tx.ReadBucket(sharedHashBucket)
9✔
270
                if sharedHashes == nil {
9✔
271
                        return fmt.Errorf("sharedHashes is nil, could " +
×
272
                                "not retrieve CLTV value")
×
273
                }
×
274

275
                // Retrieve the bytes which represents the CLTV
276
                valueBytes := sharedHashes.Get(hash[:])
9✔
277
                if valueBytes == nil {
13✔
278
                        return sphinx.ErrLogEntryNotFound
4✔
279
                }
4✔
280

281
                // The first 4 bytes represent the CLTV, store it in value.
282
                value = uint32(binary.BigEndian.Uint32(valueBytes))
5✔
283

5✔
284
                return nil
5✔
285
        }, func() {
9✔
286
                value = 0
9✔
287
        })
9✔
288
        if err != nil {
13✔
289
                return value, err
4✔
290
        }
4✔
291

292
        return value, nil
5✔
293
}
294

295
// Put stores a shared secret hash as the key and the CLTV as the value.
296
func (d *DecayedLog) Put(hash *sphinx.HashPrefix, cltv uint32) error {
5✔
297
        // Optimisitically serialize the cltv value into the scratch buffer.
5✔
298
        var scratch [4]byte
5✔
299
        binary.BigEndian.PutUint32(scratch[:], cltv)
5✔
300

5✔
301
        return kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
10✔
302
                sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
5✔
303
                if sharedHashes == nil {
5✔
304
                        return ErrDecayedLogCorrupted
×
305
                }
×
306

307
                // Check to see if this hash prefix has been recorded before. If
308
                // a value is found, this packet is being replayed.
309
                valueBytes := sharedHashes.Get(hash[:])
5✔
310
                if valueBytes != nil {
5✔
311
                        return sphinx.ErrReplayedPacket
×
312
                }
×
313

314
                return sharedHashes.Put(hash[:], scratch[:])
5✔
315
        })
316
}
317

318
// PutBatch accepts a pending batch of hashed secret entries to write to disk.
319
// Each hashed secret is inserted with a corresponding time value, dictating
320
// when the entry will be evicted from the log.
321
//
322
// TODO(yy): remove this method and use `Put` instead.
323
func (d *DecayedLog) PutBatch(b *sphinx.Batch) (*sphinx.ReplaySet, error) {
×
324
        // Since batched boltdb txns may be executed multiple times before
×
325
        // succeeding, we will create a new replay set for each invocation to
×
326
        // avoid any side-effects. If the txn is successful, this replay set
×
327
        // will be merged with the replay set computed during batch construction
×
328
        // to generate the complete replay set. If this batch was previously
×
329
        // processed, the replay set will be deserialized from disk.
×
330
        var replays *sphinx.ReplaySet
×
331
        if err := kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
×
332
                sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
×
333
                if sharedHashes == nil {
×
334
                        return ErrDecayedLogCorrupted
×
335
                }
×
336

337
                // The CLTV will be stored into scratch and then stored into the
338
                // sharedHashBucket.
339
                var scratch [4]byte
×
340

×
341
                replays = sphinx.NewReplaySet()
×
342
                err := b.ForEach(func(seqNum uint16, hashPrefix *sphinx.HashPrefix, cltv uint32) error {
×
343
                        // Retrieve the bytes which represents the CLTV
×
344
                        valueBytes := sharedHashes.Get(hashPrefix[:])
×
345
                        if valueBytes != nil {
×
346
                                replays.Add(seqNum)
×
347
                                return nil
×
348
                        }
×
349

350
                        // Serialize the cltv value and write an entry keyed by
351
                        // the hash prefix.
352
                        binary.BigEndian.PutUint32(scratch[:], cltv)
×
353
                        return sharedHashes.Put(hashPrefix[:], scratch[:])
×
354
                })
355
                if err != nil {
×
356
                        return err
×
357
                }
×
358

359
                // Merge the replay set computed from checking the on-disk
360
                // entries with the in-batch replays computed during this
361
                // batch's construction.
362
                replays.Merge(b.ReplaySet)
×
363

×
364
                return nil
×
365
        }); err != nil {
×
366
                return nil, err
×
367
        }
×
368

369
        b.ReplaySet = replays
×
370
        b.IsCommitted = true
×
371

×
372
        return replays, nil
×
373
}
374

375
// A compile time check to ensure that DecayedLog satisfies the
// sphinx.ReplayLog interface.
var _ sphinx.ReplayLog = (*DecayedLog)(nil)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc