lightningnetwork / lnd / build 12199391122
06 Dec 2024 01:10PM UTC coverage: 49.807% (-9.1%) from 58.933%
Trigger: push via GitHub (web-flow)
Merge pull request #9337 from Guayaba221/patch-1
chore: fix typo in ruby.md

100137 of 201051 relevant lines covered (49.81%), 2.07 hits per line

Source file: /htlcswitch/decayedlog.go (53.97% covered)
package htlcswitch

import (
        "bytes"
        "encoding/binary"
        "errors"
        "fmt"
        "sync"
        "sync/atomic"

        sphinx "github.com/lightningnetwork/lightning-onion"
        "github.com/lightningnetwork/lnd/chainntnfs"
        "github.com/lightningnetwork/lnd/kvdb"
)

const (
        // defaultDbDirectory is the default directory where our decayed log
        // will store our (sharedHash, CLTV) key-value pairs.
        defaultDbDirectory = "sharedhashes"
)

var (
        // sharedHashBucket is a bucket which houses the first HashPrefixSize
        // bytes of a received HTLC's hashed shared secret as the key and the
        // HTLC's CLTV expiry as the value.
        sharedHashBucket = []byte("shared-hash")

        // batchReplayBucket is a bucket that maps batch identifiers to
        // serialized ReplaySets. This is used to give idempotency in the
        // event that a batch is processed more than once.
        batchReplayBucket = []byte("batch-replay")
)
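// The resulting on-disk layout for the shared hash bucket is, roughly:
//
//      shared-hash bucket:
//              key:   first HashPrefixSize bytes of the hashed shared secret
//              value: CLTV expiry height as a 4-byte big-endian integer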
var (
        // ErrDecayedLogInit is used to indicate a decayed log failed to
        // create the proper bucketing structure on startup.
        ErrDecayedLogInit = errors.New("unable to initialize decayed log")

        // ErrDecayedLogCorrupted signals that the anticipated bucketing
        // structure has diverged since initialization.
        ErrDecayedLogCorrupted = errors.New("decayed log structure corrupted")
)

// NewBoltBackendCreator returns a function that creates a new bbolt backend
// for the decayed logs database.
func NewBoltBackendCreator(dbPath,
        dbFileName string) func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {

        return func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {
                cfg := &kvdb.BoltBackendConfig{
                        DBPath:            dbPath,
                        DBFileName:        dbFileName,
                        NoFreelistSync:    boltCfg.NoFreelistSync,
                        AutoCompact:       boltCfg.AutoCompact,
                        AutoCompactMinAge: boltCfg.AutoCompactMinAge,
                        DBTimeout:         boltCfg.DBTimeout,
                }

                // Use default path for log database.
                if dbPath == "" {
                        cfg.DBPath = defaultDbDirectory
                }

                db, err := kvdb.GetBoltBackend(cfg)
                if err != nil {
                        return nil, fmt.Errorf("could not open boltdb: %w", err)
                }

                return db, nil
        }
}
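// A caller might obtain a backend like so (a minimal sketch; the path and
// file name below are hypothetical):
//
//      createBackend := NewBoltBackendCreator("/tmp/lnd-test", "sharedhashes.db")
//      backend, err := createBackend(&kvdb.BoltConfig{})
//      if err != nil {
//              // Handle error.
//      }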
// DecayedLog implements the sphinx.ReplayLog interface. It stores the first
// HashPrefixSize bytes of a sha256-hashed shared secret along with a node's
// CLTV value. It is a decaying log, meaning there will be a garbage collector
// to collect entries which are expired according to their stored CLTV value
// and the current block height. DecayedLog wraps boltdb for simplicity and
// batches writes to the database to decrease write contention.
type DecayedLog struct {
        started int32 // To be used atomically.
        stopped int32 // To be used atomically.

        db kvdb.Backend

        notifier chainntnfs.ChainNotifier

        wg   sync.WaitGroup
        quit chan struct{}
}

// NewDecayedLog creates a new DecayedLog, which caches recently seen hashed
// shared secrets. Entries are evicted as their CLTV expires using block
// epochs from the given notifier.
func NewDecayedLog(db kvdb.Backend,
        notifier chainntnfs.ChainNotifier) *DecayedLog {

        return &DecayedLog{
                db:       db,
                notifier: notifier,
                quit:     make(chan struct{}),
        }
}
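// A typical lifecycle, sketched assuming a kvdb.Backend and a
// chainntnfs.ChainNotifier are already in hand:
//
//      decayedLog := NewDecayedLog(backend, notifier)
//      if err := decayedLog.Start(); err != nil {
//              // Handle error.
//      }
//      defer decayedLog.Stop()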
// Start opens the database we will be using to store hashed shared secrets.
// It also starts the garbage collector in a goroutine to remove stale
// database entries.
func (d *DecayedLog) Start() error {
        if !atomic.CompareAndSwapInt32(&d.started, 0, 1) {
                return nil
        }

        // Initialize the primary buckets used by the decayed log.
        if err := d.initBuckets(); err != nil {
                return err
        }

        // Start garbage collector.
        if d.notifier != nil {
                epochClient, err := d.notifier.RegisterBlockEpochNtfn(nil)
                if err != nil {
                        return fmt.Errorf("unable to register for epoch "+
                                "notifications: %v", err)
                }

                d.wg.Add(1)
                go d.garbageCollector(epochClient)
        }

        return nil
}
// initBuckets initializes the primary buckets used by the decayed log,
// namely the shared hash bucket and the batch replay bucket.
func (d *DecayedLog) initBuckets() error {
        return kvdb.Update(d.db, func(tx kvdb.RwTx) error {
                _, err := tx.CreateTopLevelBucket(sharedHashBucket)
                if err != nil {
                        return ErrDecayedLogInit
                }

                _, err = tx.CreateTopLevelBucket(batchReplayBucket)
                if err != nil {
                        return ErrDecayedLogInit
                }

                return nil
        }, func() {})
}
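// Note that CreateTopLevelBucket returns the existing bucket if one is
// already present, so initBuckets is safe to run on every Start, including
// restarts against an existing database.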
// Stop halts the garbage collector and closes boltdb.
func (d *DecayedLog) Stop() error {
        log.Debugf("DecayedLog shutting down...")
        defer log.Debugf("DecayedLog shutdown complete")

        if !atomic.CompareAndSwapInt32(&d.stopped, 0, 1) {
                return nil
        }

        // Stop garbage collector.
        close(d.quit)

        d.wg.Wait()

        return nil
}
// garbageCollector deletes entries from sharedHashBucket whose expiry height
// has already passed. This function MUST be run as a goroutine.
func (d *DecayedLog) garbageCollector(epochClient *chainntnfs.BlockEpochEvent) {
        defer d.wg.Done()
        defer epochClient.Cancel()

        for {
                select {
                case epoch, ok := <-epochClient.Epochs:
                        if !ok {
                                // Block epoch was canceled, shutting down.
                                log.Infof("Block epoch canceled, " +
                                        "decaying hash log shutting down")
                                return
                        }

                        // Perform a bout of garbage collection using the
                        // epoch's block height.
                        height := uint32(epoch.Height)
                        numExpired, err := d.gcExpiredHashes(height)
                        if err != nil {
                                log.Errorf("unable to expire hashes at "+
                                        "height=%d: %v", height, err)
                        }

                        if numExpired > 0 {
                                log.Infof("Garbage collected %v shared "+
                                        "secret hashes at height=%v",
                                        numExpired, height)
                        }

                case <-d.quit:
                        // Received shutdown request.
                        log.Infof("Decaying hash log received " +
                                "shutdown request")
                        return
                }
        }
}
// gcExpiredHashes purges the decaying log of all entries whose CLTV expires
// below the provided height.
func (d *DecayedLog) gcExpiredHashes(height uint32) (uint32, error) {
        var numExpiredHashes uint32

        err := kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
                numExpiredHashes = 0

                // Grab the shared hash bucket.
                sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
                if sharedHashes == nil {
                        return fmt.Errorf("sharedHashBucket is nil")
                }

                var expiredCltv [][]byte
                if err := sharedHashes.ForEach(func(k, v []byte) error {
                        // Deserialize the CLTV value for this entry.
                        cltv := binary.BigEndian.Uint32(v)

                        if cltv < height {
                                // This CLTV is expired. Add its key to a
                                // slice which we'll loop over afterwards to
                                // delete every expired hash from the db.
                                expiredCltv = append(expiredCltv, k)
                                numExpiredHashes++
                        }

                        return nil
                }); err != nil {
                        return err
                }

                // Delete every item in the slice. This must be done
                // explicitly outside of the ForEach function for safety
                // reasons.
                for _, hash := range expiredCltv {
                        err := sharedHashes.Delete(hash)
                        if err != nil {
                                return err
                        }
                }

                return nil
        })
        if err != nil {
                return 0, err
        }

        return numExpiredHashes, nil
}
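// The comparison above is strict, so an entry survives the epoch at its own
// expiry height: for example, a hash stored with CLTV 800000 is only deleted
// once an epoch with height 800001 or greater arrives.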
// Delete removes a <shared secret hash, CLTV> key-pair from the
// sharedHashBucket.
func (d *DecayedLog) Delete(hash *sphinx.HashPrefix) error {
        return kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
                sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
                if sharedHashes == nil {
                        return ErrDecayedLogCorrupted
                }

                return sharedHashes.Delete(hash[:])
        })
}
// Get retrieves the CLTV of a processed HTLC given the first 20 bytes of the
// SHA-256 hash of the shared secret.
func (d *DecayedLog) Get(hash *sphinx.HashPrefix) (uint32, error) {
        var value uint32

        err := kvdb.View(d.db, func(tx kvdb.RTx) error {
                // Grab the shared hash bucket which stores the mapping from
                // truncated SHA-256 hashes of shared secrets to CLTVs.
                sharedHashes := tx.ReadBucket(sharedHashBucket)
                if sharedHashes == nil {
                        return fmt.Errorf("sharedHashes is nil, could " +
                                "not retrieve CLTV value")
                }

                // Retrieve the bytes which represent the CLTV.
                valueBytes := sharedHashes.Get(hash[:])
                if valueBytes == nil {
                        return sphinx.ErrLogEntryNotFound
                }

                // The first 4 bytes represent the CLTV, store it in value.
                value = binary.BigEndian.Uint32(valueBytes)

                return nil
        }, func() {
                value = 0
        })
        if err != nil {
                return value, err
        }

        return value, nil
}
// Put stores a shared secret hash as the key and the CLTV as the value.
func (d *DecayedLog) Put(hash *sphinx.HashPrefix, cltv uint32) error {
        // Optimistically serialize the cltv value into the scratch buffer.
        var scratch [4]byte
        binary.BigEndian.PutUint32(scratch[:], cltv)

        return kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
                sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
                if sharedHashes == nil {
                        return ErrDecayedLogCorrupted
                }

                // Check to see if this hash prefix has been recorded before.
                // If a value is found, this packet is being replayed.
                valueBytes := sharedHashes.Get(hash[:])
                if valueBytes != nil {
                        return sphinx.ErrReplayedPacket
                }

                return sharedHashes.Put(hash[:], scratch[:])
        })
}
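// A round trip through Put and Get, sketched with a hypothetical hashPrefix;
// a second Put of the same prefix fails with sphinx.ErrReplayedPacket:
//
//      if err := decayedLog.Put(hashPrefix, 800144); err != nil {
//              // Handle error.
//      }
//      cltv, err := decayedLog.Get(hashPrefix) // cltv == 800144
//      err = decayedLog.Put(hashPrefix, 800144)
//      // err == sphinx.ErrReplayedPacket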
// PutBatch accepts a pending batch of hashed secret entries to write to disk.
// Each hashed secret is inserted with a corresponding time value, dictating
// when the entry will be evicted from the log.
// NOTE: This method enforces idempotency by writing the replay set obtained
// from the first attempt for a particular batch ID, and returning the decoded
// replay set on subsequent calls. For the indices of the replay set to be
// aligned properly, the batch MUST be constructed identically to the first
// attempt; pruning will cause the indices to become invalid.
func (d *DecayedLog) PutBatch(b *sphinx.Batch) (*sphinx.ReplaySet, error) {
        // Since batched boltdb txns may be executed multiple times before
        // succeeding, we will create a new replay set for each invocation to
        // avoid any side-effects. If the txn is successful, this replay set
        // will be merged with the replay set computed during batch
        // construction to generate the complete replay set. If this batch was
        // previously processed, the replay set will be deserialized from disk.
        var replays *sphinx.ReplaySet
        if err := kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
                sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
                if sharedHashes == nil {
                        return ErrDecayedLogCorrupted
                }

                // Load the batch replay bucket, which will be used to either
                // retrieve the result of previously processing this batch, or
                // to write the result of this operation.
                batchReplayBkt := tx.ReadWriteBucket(batchReplayBucket)
                if batchReplayBkt == nil {
                        return ErrDecayedLogCorrupted
                }

                // Check for the existence of this batch's id in the replay
                // bucket. If a non-nil value is found, this indicates that we
                // have already processed this batch before. We deserialize
                // the resulting replay set and return it to ensure calls to
                // put batch are idempotent.
                replayBytes := batchReplayBkt.Get(b.ID)
                if replayBytes != nil {
                        replays = sphinx.NewReplaySet()
                        return replays.Decode(bytes.NewReader(replayBytes))
                }

                // The CLTV will be stored into scratch and then stored into
                // the sharedHashBucket.
                var scratch [4]byte

                replays = sphinx.NewReplaySet()
                err := b.ForEach(func(seqNum uint16,
                        hashPrefix *sphinx.HashPrefix, cltv uint32) error {

                        // Check for an existing entry under this hash prefix.
                        // If one is found, mark this sequence number as a
                        // replay.
                        valueBytes := sharedHashes.Get(hashPrefix[:])
                        if valueBytes != nil {
                                replays.Add(seqNum)
                                return nil
                        }

                        // Serialize the cltv value and write an entry keyed
                        // by the hash prefix.
                        binary.BigEndian.PutUint32(scratch[:], cltv)
                        return sharedHashes.Put(hashPrefix[:], scratch[:])
                })
                if err != nil {
                        return err
                }

                // Merge the replay set computed from checking the on-disk
                // entries with the in-batch replays computed during this
                // batch's construction.
                replays.Merge(b.ReplaySet)

                // Write the replay set under the batch identifier to the
                // batch replays bucket. This can be used during recovery to
                // test (1) that a particular batch was successfully processed
                // and (2) recover the indexes of the adds that were rejected
                // as replays.
                var replayBuf bytes.Buffer
                if err := replays.Encode(&replayBuf); err != nil {
                        return err
                }

                return batchReplayBkt.Put(b.ID, replayBuf.Bytes())
        }); err != nil {
                return nil, err
        }

        b.ReplaySet = replays
        b.IsCommitted = true

        return replays, nil
}
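// A sketch of the idempotency guarantee (batch construction elided): calling
// PutBatch twice with an identically constructed batch returns the same
// replay set, with the second call decoding it from the batch-replay bucket
// rather than re-writing any entries.
//
//      rs1, err := decayedLog.PutBatch(batch)
//      rs2, err := decayedLog.PutBatch(batch)
//      // rs1 and rs2 describe the same replayed sequence numbers.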
// A compile time check to ensure DecayedLog adheres to the sphinx.ReplayLog
// interface.
var _ sphinx.ReplayLog = (*DecayedLog)(nil)