lightningnetwork / lnd — build 13440912774

20 Feb 2025 05:14PM UTC coverage: 57.697% (-1.1%) from 58.802%

Pull Request #9535 (guggero): GitHub: remove duplicate caching

Turns out that actions/setup-go starting with @v4 also adds caching. With
that, our cache size on disk has almost doubled, leading to the GitHub
runner running out of space in certain situations. We fix that by disabling
the automated caching since we already have our own, custom-tailored
version.

103519 of 179417 relevant lines covered (57.7%)

24825.3 hits per line
Source File: /htlcswitch/decayedlog.go (59.0% covered)
package htlcswitch

import (
        "bytes"
        "encoding/binary"
        "errors"
        "fmt"
        "sync"
        "sync/atomic"

        sphinx "github.com/lightningnetwork/lightning-onion"
        "github.com/lightningnetwork/lnd/chainntnfs"
        "github.com/lightningnetwork/lnd/kvdb"
)

const (
        // defaultDbDirectory is the default directory where our decayed log
        // will store our (sharedHash, CLTV) key-value pairs.
        defaultDbDirectory = "sharedhashes"
)

var (
        // sharedHashBucket is a bucket which houses the first HashPrefixSize
        // bytes of a received HTLC's hashed shared secret as the key and the
        // HTLC's CLTV expiry as the value.
        sharedHashBucket = []byte("shared-hash")

        // batchReplayBucket is a bucket that maps batch identifiers to
        // serialized ReplaySets. This is used to give idempotency in the
        // event that a batch is processed more than once.
        batchReplayBucket = []byte("batch-replay")
)
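
// Editor's note (illustrative, not part of the original file): an entry in
// sharedHashBucket thus pairs a truncated hash with a serialized expiry:
//
//	key:   first HashPrefixSize bytes of the hashed shared secret
//	value: the HTLC's CLTV expiry as a 4-byte big-endian uint32 (see Put)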

var (
        // ErrDecayedLogInit is used to indicate a decayed log failed to
        // create the proper bucketing structure on startup.
        ErrDecayedLogInit = errors.New("unable to initialize decayed log")

        // ErrDecayedLogCorrupted signals that the anticipated bucketing
        // structure has diverged since initialization.
        ErrDecayedLogCorrupted = errors.New("decayed log structure corrupted")
)

// NewBoltBackendCreator returns a function that creates a new bbolt backend
// for the decayed logs database.
func NewBoltBackendCreator(dbPath,
        dbFileName string) func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {

        return func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {
                cfg := &kvdb.BoltBackendConfig{
                        DBPath:            dbPath,
                        DBFileName:        dbFileName,
                        NoFreelistSync:    boltCfg.NoFreelistSync,
                        AutoCompact:       boltCfg.AutoCompact,
                        AutoCompactMinAge: boltCfg.AutoCompactMinAge,
                        DBTimeout:         boltCfg.DBTimeout,
                }

                // Use default path for log database.
                if dbPath == "" {
                        cfg.DBPath = defaultDbDirectory
                }

                db, err := kvdb.GetBoltBackend(cfg)
                if err != nil {
                        return nil, fmt.Errorf("could not open boltdb: %w", err)
                }

                return db, nil
        }
}
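
// Editor's sketch (illustrative, not part of the original file): the
// returned closure defers backend construction until the node's bolt
// settings are known; the path and file name below are hypothetical:
//
//	createBackend := NewBoltBackendCreator("/path/to/db", "sharedhashes.db")
//	backend, err := createBackend(&kvdb.BoltConfig{AutoCompact: true})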

// DecayedLog implements the sphinx.ReplayLog interface. It stores the first
// HashPrefixSize bytes of a sha256-hashed shared secret along with a node's
// CLTV value. It is a decaying log meaning there will be a garbage collector
// to collect entries which are expired according to their stored CLTV value
// and the current block height. DecayedLog wraps boltdb for simplicity and
// batches writes to the database to decrease write contention.
type DecayedLog struct {
        started int32 // To be used atomically.
        stopped int32 // To be used atomically.

        db kvdb.Backend

        notifier chainntnfs.ChainNotifier

        wg   sync.WaitGroup
        quit chan struct{}
}

// NewDecayedLog creates a new DecayedLog, which caches recently seen hashed
// shared secrets. Entries are evicted as their CLTV expires using block
// epochs from the given notifier.
func NewDecayedLog(db kvdb.Backend,
        notifier chainntnfs.ChainNotifier) *DecayedLog {

        return &DecayedLog{
                db:       db,
                notifier: notifier,
                quit:     make(chan struct{}),
        }
}

// Start opens the database we will be using to store hashed shared secrets.
// It also starts the garbage collector in a goroutine to remove stale
// database entries.
func (d *DecayedLog) Start() error {
        if !atomic.CompareAndSwapInt32(&d.started, 0, 1) {
                return nil
        }

        // Initialize the primary buckets used by the decayed log.
        if err := d.initBuckets(); err != nil {
                return err
        }

        // Start garbage collector.
        if d.notifier != nil {
                epochClient, err := d.notifier.RegisterBlockEpochNtfn(nil)
                if err != nil {
                        return fmt.Errorf("unable to register for epoch "+
                                "notifications: %v", err)
                }

                d.wg.Add(1)
                go d.garbageCollector(epochClient)
        }

        return nil
}

// initBuckets initializes the primary buckets used by the decayed log,
// namely the shared hash bucket and the batch replay bucket.
func (d *DecayedLog) initBuckets() error {
        return kvdb.Update(d.db, func(tx kvdb.RwTx) error {
                _, err := tx.CreateTopLevelBucket(sharedHashBucket)
                if err != nil {
                        return ErrDecayedLogInit
                }

                _, err = tx.CreateTopLevelBucket(batchReplayBucket)
                if err != nil {
                        return ErrDecayedLogInit
                }

                return nil
        }, func() {})
}

// Stop halts the garbage collector and closes boltdb.
func (d *DecayedLog) Stop() error {
        log.Debugf("DecayedLog shutting down...")
        defer log.Debugf("DecayedLog shutdown complete")

        if !atomic.CompareAndSwapInt32(&d.stopped, 0, 1) {
                return nil
        }

        // Stop garbage collector.
        close(d.quit)

        d.wg.Wait()

        return nil
}

// garbageCollector deletes entries from sharedHashBucket whose expiry height
// has already passed. This function MUST be run as a goroutine.
func (d *DecayedLog) garbageCollector(epochClient *chainntnfs.BlockEpochEvent) {
        defer d.wg.Done()
        defer epochClient.Cancel()

        for {
                select {
                case epoch, ok := <-epochClient.Epochs:
                        if !ok {
                                // Block epoch was canceled, shutting down.
                                log.Infof("Block epoch canceled, " +
                                        "decaying hash log shutting down")
                                return
                        }

                        // Perform a bout of garbage collection using the
                        // epoch's block height.
                        height := uint32(epoch.Height)
                        numExpired, err := d.gcExpiredHashes(height)
                        if err != nil {
                                log.Errorf("unable to expire hashes at "+
                                        "height=%d: %v", height, err)
                        }

                        if numExpired > 0 {
                                log.Infof("Garbage collected %v shared "+
                                        "secret hashes at height=%v",
                                        numExpired, height)
                        }

                case <-d.quit:
                        // Received shutdown request.
                        log.Infof("Decaying hash log received " +
                                "shutdown request")
                        return
                }
        }
}

// gcExpiredHashes purges the decaying log of all entries whose CLTV expires
// below the provided height.
func (d *DecayedLog) gcExpiredHashes(height uint32) (uint32, error) {
        var numExpiredHashes uint32

        err := kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
                numExpiredHashes = 0

                // Grab the shared hash bucket.
                sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
                if sharedHashes == nil {
                        return fmt.Errorf("sharedHashBucket is nil")
                }

                var expiredCltv [][]byte
                if err := sharedHashes.ForEach(func(k, v []byte) error {
                        // Deserialize the CLTV value for this entry.
                        cltv := uint32(binary.BigEndian.Uint32(v))

                        if cltv < height {
                                // This CLTV is expired. We must add it to an
                                // array which we'll loop over and delete
                                // every contained hash from the db.
                                expiredCltv = append(expiredCltv, k)
                                numExpiredHashes++
                        }

                        return nil
                }); err != nil {
                        return err
                }

                // Delete every item in the array. This must be done
                // explicitly outside of the ForEach function for safety
                // reasons.
                for _, hash := range expiredCltv {
                        err := sharedHashes.Delete(hash)
                        if err != nil {
                                return err
                        }
                }

                return nil
        })
        if err != nil {
                return 0, err
        }

        return numExpiredHashes, nil
}
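
// Editor's note (not part of the original file): the cltv < height
// comparison above makes expiry strict: an entry stored with CLTV 100
// survives the block epoch at height 100 and is only collected once an
// epoch with height 101 or greater arrives.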

// Delete removes a <shared secret hash, CLTV> key-pair from the
// sharedHashBucket.
func (d *DecayedLog) Delete(hash *sphinx.HashPrefix) error {
        return kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
                sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
                if sharedHashes == nil {
                        return ErrDecayedLogCorrupted
                }

                return sharedHashes.Delete(hash[:])
        })
}

// Get retrieves the CLTV of a processed HTLC given the first 20 bytes of the
// SHA-256 hash of the shared secret.
func (d *DecayedLog) Get(hash *sphinx.HashPrefix) (uint32, error) {
        var value uint32

        err := kvdb.View(d.db, func(tx kvdb.RTx) error {
                // Grab the shared hash bucket which stores the mapping from
                // truncated SHA-256 hashes of shared secrets to CLTVs.
                sharedHashes := tx.ReadBucket(sharedHashBucket)
                if sharedHashes == nil {
                        return fmt.Errorf("sharedHashes is nil, could " +
                                "not retrieve CLTV value")
                }

                // Retrieve the bytes which represent the CLTV.
                valueBytes := sharedHashes.Get(hash[:])
                if valueBytes == nil {
                        return sphinx.ErrLogEntryNotFound
                }

                // The first 4 bytes represent the CLTV, store it in value.
                value = uint32(binary.BigEndian.Uint32(valueBytes))

                return nil
        }, func() {
                value = 0
        })
        if err != nil {
                return value, err
        }

        return value, nil
}

// Put stores a shared secret hash as the key and the CLTV as the value.
func (d *DecayedLog) Put(hash *sphinx.HashPrefix, cltv uint32) error {
        // Optimistically serialize the cltv value into the scratch buffer.
        var scratch [4]byte
        binary.BigEndian.PutUint32(scratch[:], cltv)

        return kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
                sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
                if sharedHashes == nil {
                        return ErrDecayedLogCorrupted
                }

                // Check to see if this hash prefix has been recorded before.
                // If a value is found, this packet is being replayed.
                valueBytes := sharedHashes.Get(hash[:])
                if valueBytes != nil {
                        return sphinx.ErrReplayedPacket
                }

                return sharedHashes.Put(hash[:], scratch[:])
        })
}
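
// Editor's sketch (illustrative, not part of the original file): replay
// detection falls out of the Get-before-Put above; assuming a started log d
// and a previously unseen prefix:
//
//	err := d.Put(prefix, 100) // nil: first observation is recorded
//	err = d.Put(prefix, 100)  // sphinx.ErrReplayedPacket: prefix known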

// PutBatch accepts a pending batch of hashed secret entries to write to
// disk. Each hashed secret is inserted with a corresponding time value,
// dictating when the entry will be evicted from the log.
// NOTE: This method enforces idempotency by writing the replay set obtained
// from the first attempt for a particular batch ID, and returning the
// decoded value on subsequent calls. For the indices of the replay set to be
// aligned properly, the batch MUST be constructed identically to the first
// attempt; pruning will cause the indices to become invalid.
func (d *DecayedLog) PutBatch(b *sphinx.Batch) (*sphinx.ReplaySet, error) {
        // Since batched boltdb txns may be executed multiple times before
        // succeeding, we will create a new replay set for each invocation to
        // avoid any side-effects. If the txn is successful, this replay set
        // will be merged with the replay set computed during batch
        // construction to generate the complete replay set. If this batch
        // was previously processed, the replay set will be deserialized from
        // disk.
        var replays *sphinx.ReplaySet
        if err := kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
                sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
                if sharedHashes == nil {
                        return ErrDecayedLogCorrupted
                }

                // Load the batch replay bucket, which will be used to either
                // retrieve the result of previously processing this batch,
                // or to write the result of this operation.
                batchReplayBkt := tx.ReadWriteBucket(batchReplayBucket)
                if batchReplayBkt == nil {
                        return ErrDecayedLogCorrupted
                }

                // Check for the existence of this batch's id in the replay
                // bucket. If a non-nil value is found, this indicates that
                // we have already processed this batch before. We
                // deserialize the resulting replay set and return it to
                // ensure calls to PutBatch are idempotent.
                replayBytes := batchReplayBkt.Get(b.ID)
                if replayBytes != nil {
                        replays = sphinx.NewReplaySet()
                        return replays.Decode(bytes.NewReader(replayBytes))
                }

                // The CLTV will be stored into scratch and then stored into
                // the sharedHashBucket.
                var scratch [4]byte

                replays = sphinx.NewReplaySet()
                err := b.ForEach(func(seqNum uint16,
                        hashPrefix *sphinx.HashPrefix, cltv uint32) error {

                        // Retrieve the bytes which represent the CLTV.
                        valueBytes := sharedHashes.Get(hashPrefix[:])
                        if valueBytes != nil {
                                replays.Add(seqNum)
                                return nil
                        }

                        // Serialize the cltv value and write an entry keyed
                        // by the hash prefix.
                        binary.BigEndian.PutUint32(scratch[:], cltv)
                        return sharedHashes.Put(hashPrefix[:], scratch[:])
                })
                if err != nil {
                        return err
                }

                // Merge the replay set computed from checking the on-disk
                // entries with the in-batch replays computed during this
                // batch's construction.
                replays.Merge(b.ReplaySet)

                // Write the replay set under the batch identifier to the
                // batch replays bucket. This can be used during recovery to
                // test (1) that a particular batch was successfully
                // processed and (2) recover the indexes of the adds that
                // were rejected as replays.
                var replayBuf bytes.Buffer
                if err := replays.Encode(&replayBuf); err != nil {
                        return err
                }

                return batchReplayBkt.Put(b.ID, replayBuf.Bytes())
        }); err != nil {
                return nil, err
        }

        b.ReplaySet = replays
        b.IsCommitted = true

        return replays, nil
}

// A compile time check to ensure that DecayedLog adheres to the
// sphinx.ReplayLog interface.
var _ sphinx.ReplayLog = (*DecayedLog)(nil)
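
For orientation, here is a minimal sketch of how a caller might drive the API
above. It is an editor's illustration, not code from the lnd repository: the
function name, database path and file name are hypothetical, the notifier
argument stands for a running chainntnfs.ChainNotifier, the hash prefix for a
*sphinx.HashPrefix produced by onion processing, and the CLTV of 754321 is an
arbitrary absolute block height. The sketch assumes it lives alongside the
code above in package htlcswitch.

        // exampleUsage walks the DecayedLog lifecycle: open a backend,
        // start the log, record a shared secret hash, read it back, and
        // remove it ahead of garbage collection.
        func exampleUsage(notifier chainntnfs.ChainNotifier,
                hashPrefix *sphinx.HashPrefix) (uint32, error) {

                createBackend := NewBoltBackendCreator(
                        "/path/to/db", "sharedhashes.db",
                )
                backend, err := createBackend(&kvdb.BoltConfig{})
                if err != nil {
                        return 0, err
                }

                // A nil notifier is allowed; Start then skips the garbage
                // collector entirely.
                d := NewDecayedLog(backend, notifier)
                if err := d.Start(); err != nil {
                        return 0, err
                }
                defer d.Stop()

                // Remember the secret until block height 754321 has passed.
                if err := d.Put(hashPrefix, 754321); err != nil {
                        return 0, err
                }

                // Later: recover the stored expiry.
                cltv, err := d.Get(hashPrefix)
                if err != nil {
                        return 0, err
                }

                // Entries can also be removed explicitly, ahead of garbage
                // collection.
                if err := d.Delete(hashPrefix); err != nil {
                        return 0, err
                }

                return cltv, nil
        }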