• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15031268339

14 May 2025 09:15PM UTC coverage: 58.592% (-10.4%) from 68.997%
15031268339

Pull #9801

github

web-flow
Merge 748c3fe22 into b0cba7dd0
Pull Request #9801: peer+lnd: add new CLI option to control if we D/C on slow pongs

5 of 79 new or added lines in 3 files covered. (6.33%)

28199 existing lines in 450 files now uncovered.

97428 of 166282 relevant lines covered (58.59%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.97
/htlcswitch/decayedlog.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8
        "sync"
9
        "sync/atomic"
10

11
        sphinx "github.com/lightningnetwork/lightning-onion"
12
        "github.com/lightningnetwork/lnd/chainntnfs"
13
        "github.com/lightningnetwork/lnd/kvdb"
14
)
15

16
// defaultDbDirectory is the directory the decayed log falls back to for
// storing its (sharedHash, CLTV) key-value pairs when no database path is
// supplied.
const defaultDbDirectory = "sharedhashes"
21

22
// Top-level buckets used by the decayed log.
var (
	// sharedHashBucket is keyed by the first HashPrefixSize bytes of a
	// received HTLC's hashed shared secret. Each value is that HTLC's CLTV
	// expiry.
	sharedHashBucket = []byte("shared-hash")

	// batchReplayBucket maps a batch identifier to its serialized
	// ReplaySet. Storing the result makes processing a batch idempotent
	// should the same batch be submitted more than once.
	batchReplayBucket = []byte("batch-replay")
)
33

34
// Sentinel errors returned by the decayed log.
var (
	// ErrDecayedLogInit is returned when the decayed log could not create
	// its bucket structure on startup.
	ErrDecayedLogInit = errors.New("unable to initialize decayed log")

	// ErrDecayedLogCorrupted is returned when a bucket that should have
	// been created during initialization can no longer be found.
	ErrDecayedLogCorrupted = errors.New("decayed log structure corrupted")
)
43

44
// NewBoltBackendCreator returns a function that creates a new bbolt backend for
45
// the decayed logs database.
46
func NewBoltBackendCreator(dbPath,
UNCOV
47
        dbFileName string) func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {
×
UNCOV
48

×
UNCOV
49
        return func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {
×
UNCOV
50
                cfg := &kvdb.BoltBackendConfig{
×
UNCOV
51
                        DBPath:            dbPath,
×
UNCOV
52
                        DBFileName:        dbFileName,
×
UNCOV
53
                        NoFreelistSync:    boltCfg.NoFreelistSync,
×
UNCOV
54
                        AutoCompact:       boltCfg.AutoCompact,
×
UNCOV
55
                        AutoCompactMinAge: boltCfg.AutoCompactMinAge,
×
UNCOV
56
                        DBTimeout:         boltCfg.DBTimeout,
×
UNCOV
57
                }
×
UNCOV
58

×
UNCOV
59
                // Use default path for log database.
×
UNCOV
60
                if dbPath == "" {
×
61
                        cfg.DBPath = defaultDbDirectory
×
62
                }
×
63

UNCOV
64
                db, err := kvdb.GetBoltBackend(cfg)
×
UNCOV
65
                if err != nil {
×
66
                        return nil, fmt.Errorf("could not open boltdb: %w", err)
×
67
                }
×
68

UNCOV
69
                return db, nil
×
70
        }
71
}
72

73
// DecayedLog implements the PersistLog interface. It stores the first
74
// HashPrefixSize bytes of a sha256-hashed shared secret along with a node's
75
// CLTV value. It is a decaying log meaning there will be a garbage collector
76
// to collect entries which are expired according to their stored CLTV value
77
// and the current block height. DecayedLog wraps boltdb for simplicity and
78
// batches writes to the database to decrease write contention.
79
type DecayedLog struct {
80
        started int32 // To be used atomically.
81
        stopped int32 // To be used atomically.
82

83
        db kvdb.Backend
84

85
        notifier chainntnfs.ChainNotifier
86

87
        wg   sync.WaitGroup
88
        quit chan struct{}
89
}
90

91
// NewDecayedLog creates a new DecayedLog, which caches recently seen hash
92
// shared secrets. Entries are evicted as their cltv expires using block epochs
93
// from the given notifier.
94
func NewDecayedLog(db kvdb.Backend,
95
        notifier chainntnfs.ChainNotifier) *DecayedLog {
3✔
96

3✔
97
        return &DecayedLog{
3✔
98
                db:       db,
3✔
99
                notifier: notifier,
3✔
100
                quit:     make(chan struct{}),
3✔
101
        }
3✔
102
}
3✔
103

104
// Start opens the database we will be using to store hashed shared secrets.
105
// It also starts the garbage collector in a goroutine to remove stale
106
// database entries.
107
func (d *DecayedLog) Start() error {
3✔
108
        if !atomic.CompareAndSwapInt32(&d.started, 0, 1) {
3✔
109
                return nil
×
110
        }
×
111

112
        // Initialize the primary buckets used by the decayed log.
113
        if err := d.initBuckets(); err != nil {
3✔
114
                return err
×
115
        }
×
116

117
        // Start garbage collector.
118
        if d.notifier != nil {
6✔
119
                epochClient, err := d.notifier.RegisterBlockEpochNtfn(nil)
3✔
120
                if err != nil {
3✔
121
                        return fmt.Errorf("unable to register for epoch "+
×
122
                                "notifications: %v", err)
×
123
                }
×
124

125
                d.wg.Add(1)
3✔
126
                go d.garbageCollector(epochClient)
3✔
127
        }
128

129
        return nil
3✔
130
}
131

132
// initBuckets initializes the primary buckets used by the decayed log, namely
133
// the shared hash bucket, and batch replay
134
func (d *DecayedLog) initBuckets() error {
3✔
135
        return kvdb.Update(d.db, func(tx kvdb.RwTx) error {
6✔
136
                _, err := tx.CreateTopLevelBucket(sharedHashBucket)
3✔
137
                if err != nil {
3✔
138
                        return ErrDecayedLogInit
×
139
                }
×
140

141
                _, err = tx.CreateTopLevelBucket(batchReplayBucket)
3✔
142
                if err != nil {
3✔
143
                        return ErrDecayedLogInit
×
144
                }
×
145

146
                return nil
3✔
147
        }, func() {})
3✔
148
}
149

150
// Stop halts the garbage collector and closes boltdb.
151
func (d *DecayedLog) Stop() error {
3✔
152
        log.Debugf("DecayedLog shutting down...")
3✔
153
        defer log.Debugf("DecayedLog shutdown complete")
3✔
154

3✔
155
        if !atomic.CompareAndSwapInt32(&d.stopped, 0, 1) {
3✔
UNCOV
156
                return nil
×
UNCOV
157
        }
×
158

159
        // Stop garbage collector.
160
        close(d.quit)
3✔
161

3✔
162
        d.wg.Wait()
3✔
163

3✔
164
        return nil
3✔
165
}
166

167
// garbageCollector deletes entries from sharedHashBucket whose expiry height
168
// has already past. This function MUST be run as a goroutine.
169
func (d *DecayedLog) garbageCollector(epochClient *chainntnfs.BlockEpochEvent) {
3✔
170
        defer d.wg.Done()
3✔
171
        defer epochClient.Cancel()
3✔
172

3✔
173
        for {
6✔
174
                select {
3✔
175
                case epoch, ok := <-epochClient.Epochs:
3✔
176
                        if !ok {
3✔
177
                                // Block epoch was canceled, shutting down.
×
178
                                log.Infof("Block epoch canceled, " +
×
179
                                        "decaying hash log shutting down")
×
180
                                return
×
181
                        }
×
182

183
                        // Perform a bout of garbage collection using the
184
                        // epoch's block height.
185
                        height := uint32(epoch.Height)
3✔
186
                        numExpired, err := d.gcExpiredHashes(height)
3✔
187
                        if err != nil {
3✔
188
                                log.Errorf("unable to expire hashes at "+
×
189
                                        "height=%d", height)
×
190
                        }
×
191

192
                        if numExpired > 0 {
6✔
193
                                log.Infof("Garbage collected %v shared "+
3✔
194
                                        "secret hashes at height=%v",
3✔
195
                                        numExpired, height)
3✔
196
                        }
3✔
197

198
                case <-d.quit:
3✔
199
                        // Received shutdown request.
3✔
200
                        log.Infof("Decaying hash log received " +
3✔
201
                                "shutdown request")
3✔
202
                        return
3✔
203
                }
204
        }
205
}
206

207
// gcExpiredHashes purges the decaying log of all entries whose CLTV expires
208
// below the provided height.
209
func (d *DecayedLog) gcExpiredHashes(height uint32) (uint32, error) {
3✔
210
        var numExpiredHashes uint32
3✔
211

3✔
212
        err := kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
6✔
213
                numExpiredHashes = 0
3✔
214

3✔
215
                // Grab the shared hash bucket
3✔
216
                sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
3✔
217
                if sharedHashes == nil {
3✔
218
                        return fmt.Errorf("sharedHashBucket " +
×
219
                                "is nil")
×
220
                }
×
221

222
                var expiredCltv [][]byte
3✔
223
                if err := sharedHashes.ForEach(func(k, v []byte) error {
6✔
224
                        // Deserialize the CLTV value for this entry.
3✔
225
                        cltv := uint32(binary.BigEndian.Uint32(v))
3✔
226

3✔
227
                        if cltv < height {
6✔
228
                                // This CLTV is expired. We must add it to an
3✔
229
                                // array which we'll loop over and delete every
3✔
230
                                // hash contained from the db.
3✔
231
                                expiredCltv = append(expiredCltv, k)
3✔
232
                                numExpiredHashes++
3✔
233
                        }
3✔
234

235
                        return nil
3✔
236
                }); err != nil {
×
237
                        return err
×
238
                }
×
239

240
                // Delete every item in the array. This must
241
                // be done explicitly outside of the ForEach
242
                // function for safety reasons.
243
                for _, hash := range expiredCltv {
6✔
244
                        err := sharedHashes.Delete(hash)
3✔
245
                        if err != nil {
3✔
246
                                return err
×
247
                        }
×
248
                }
249

250
                return nil
3✔
251
        })
252
        if err != nil {
3✔
253
                return 0, err
×
254
        }
×
255

256
        return numExpiredHashes, nil
3✔
257
}
258

259
// Delete removes a <shared secret hash, CLTV> key-pair from the
260
// sharedHashBucket.
UNCOV
261
func (d *DecayedLog) Delete(hash *sphinx.HashPrefix) error {
×
UNCOV
262
        return kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
×
UNCOV
263
                sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
×
UNCOV
264
                if sharedHashes == nil {
×
265
                        return ErrDecayedLogCorrupted
×
266
                }
×
267

UNCOV
268
                return sharedHashes.Delete(hash[:])
×
269
        })
270
}
271

272
// Get retrieves the CLTV of a processed HTLC given the first 20 bytes of the
273
// Sha-256 hash of the shared secret.
UNCOV
274
func (d *DecayedLog) Get(hash *sphinx.HashPrefix) (uint32, error) {
×
UNCOV
275
        var value uint32
×
UNCOV
276

×
UNCOV
277
        err := kvdb.View(d.db, func(tx kvdb.RTx) error {
×
UNCOV
278
                // Grab the shared hash bucket which stores the mapping from
×
UNCOV
279
                // truncated sha-256 hashes of shared secrets to CLTV's.
×
UNCOV
280
                sharedHashes := tx.ReadBucket(sharedHashBucket)
×
UNCOV
281
                if sharedHashes == nil {
×
282
                        return fmt.Errorf("sharedHashes is nil, could " +
×
283
                                "not retrieve CLTV value")
×
284
                }
×
285

286
                // Retrieve the bytes which represents the CLTV
UNCOV
287
                valueBytes := sharedHashes.Get(hash[:])
×
UNCOV
288
                if valueBytes == nil {
×
UNCOV
289
                        return sphinx.ErrLogEntryNotFound
×
UNCOV
290
                }
×
291

292
                // The first 4 bytes represent the CLTV, store it in value.
UNCOV
293
                value = uint32(binary.BigEndian.Uint32(valueBytes))
×
UNCOV
294

×
UNCOV
295
                return nil
×
UNCOV
296
        }, func() {
×
UNCOV
297
                value = 0
×
UNCOV
298
        })
×
UNCOV
299
        if err != nil {
×
UNCOV
300
                return value, err
×
UNCOV
301
        }
×
302

UNCOV
303
        return value, nil
×
304
}
305

306
// Put stores a shared secret hash as the key and the CLTV as the value.
UNCOV
307
func (d *DecayedLog) Put(hash *sphinx.HashPrefix, cltv uint32) error {
×
UNCOV
308
        // Optimisitically serialize the cltv value into the scratch buffer.
×
UNCOV
309
        var scratch [4]byte
×
UNCOV
310
        binary.BigEndian.PutUint32(scratch[:], cltv)
×
UNCOV
311

×
UNCOV
312
        return kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
×
UNCOV
313
                sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
×
UNCOV
314
                if sharedHashes == nil {
×
315
                        return ErrDecayedLogCorrupted
×
316
                }
×
317

318
                // Check to see if this hash prefix has been recorded before. If
319
                // a value is found, this packet is being replayed.
UNCOV
320
                valueBytes := sharedHashes.Get(hash[:])
×
UNCOV
321
                if valueBytes != nil {
×
322
                        return sphinx.ErrReplayedPacket
×
323
                }
×
324

UNCOV
325
                return sharedHashes.Put(hash[:], scratch[:])
×
326
        })
327
}
328

329
// PutBatch accepts a pending batch of hashed secret entries to write to disk.
330
// Each hashed secret is inserted with a corresponding time value, dictating
331
// when the entry will be evicted from the log.
332
// NOTE: This method enforces idempotency by writing the replay set obtained
333
// from the first attempt for a particular batch ID, and decoding the return
334
// value to subsequent calls. For the indices of the replay set to be aligned
335
// properly, the batch MUST be constructed identically to the first attempt,
336
// pruning will cause the indices to become invalid.
337
func (d *DecayedLog) PutBatch(b *sphinx.Batch) (*sphinx.ReplaySet, error) {
3✔
338
        // Since batched boltdb txns may be executed multiple times before
3✔
339
        // succeeding, we will create a new replay set for each invocation to
3✔
340
        // avoid any side-effects. If the txn is successful, this replay set
3✔
341
        // will be merged with the replay set computed during batch construction
3✔
342
        // to generate the complete replay set. If this batch was previously
3✔
343
        // processed, the replay set will be deserialized from disk.
3✔
344
        var replays *sphinx.ReplaySet
3✔
345
        if err := kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
6✔
346
                sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
3✔
347
                if sharedHashes == nil {
3✔
348
                        return ErrDecayedLogCorrupted
×
349
                }
×
350

351
                // Load the batch replay bucket, which will be used to either
352
                // retrieve the result of previously processing this batch, or
353
                // to write the result of this operation.
354
                batchReplayBkt := tx.ReadWriteBucket(batchReplayBucket)
3✔
355
                if batchReplayBkt == nil {
3✔
356
                        return ErrDecayedLogCorrupted
×
357
                }
×
358

359
                // Check for the existence of this batch's id in the replay
360
                // bucket. If a non-nil value is found, this indicates that we
361
                // have already processed this batch before. We deserialize the
362
                // resulting and return it to ensure calls to put batch are
363
                // idempotent.
364
                replayBytes := batchReplayBkt.Get(b.ID)
3✔
365
                if replayBytes != nil {
6✔
366
                        replays = sphinx.NewReplaySet()
3✔
367
                        return replays.Decode(bytes.NewReader(replayBytes))
3✔
368
                }
3✔
369

370
                // The CLTV will be stored into scratch and then stored into the
371
                // sharedHashBucket.
372
                var scratch [4]byte
3✔
373

3✔
374
                replays = sphinx.NewReplaySet()
3✔
375
                err := b.ForEach(func(seqNum uint16, hashPrefix *sphinx.HashPrefix, cltv uint32) error {
6✔
376
                        // Retrieve the bytes which represents the CLTV
3✔
377
                        valueBytes := sharedHashes.Get(hashPrefix[:])
3✔
378
                        if valueBytes != nil {
6✔
379
                                replays.Add(seqNum)
3✔
380
                                return nil
3✔
381
                        }
3✔
382

383
                        // Serialize the cltv value and write an entry keyed by
384
                        // the hash prefix.
385
                        binary.BigEndian.PutUint32(scratch[:], cltv)
3✔
386
                        return sharedHashes.Put(hashPrefix[:], scratch[:])
3✔
387
                })
388
                if err != nil {
3✔
389
                        return err
×
390
                }
×
391

392
                // Merge the replay set computed from checking the on-disk
393
                // entries with the in-batch replays computed during this
394
                // batch's construction.
395
                replays.Merge(b.ReplaySet)
3✔
396

3✔
397
                // Write the replay set under the batch identifier to the batch
3✔
398
                // replays bucket. This can be used during recovery to test (1)
3✔
399
                // that a particular batch was successfully processed and (2)
3✔
400
                // recover the indexes of the adds that were rejected as
3✔
401
                // replays.
3✔
402
                var replayBuf bytes.Buffer
3✔
403
                if err := replays.Encode(&replayBuf); err != nil {
3✔
404
                        return err
×
405
                }
×
406

407
                return batchReplayBkt.Put(b.ID, replayBuf.Bytes())
3✔
408
        }); err != nil {
×
409
                return nil, err
×
410
        }
×
411

412
        b.ReplaySet = replays
3✔
413
        b.IsCommitted = true
3✔
414

3✔
415
        return replays, nil
3✔
416
}
417

418
// A compile time check to see if DecayedLog adheres to the sphinx.ReplayLog
419
// interface.
420
var _ sphinx.ReplayLog = (*DecayedLog)(nil)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc