lightningnetwork / lnd · build 9915780197

13 Jul 2024 12:30AM UTC · coverage: 49.268% (-9.1%) from 58.413%
push · github · web-flow

Merge pull request #8653 from ProofOfKeags/fn-prim
DynComms [0/n]: `fn` package additions

92837 of 188433 relevant lines covered (49.27%)
1.55 hits per line

Source File

/htlcswitch/decayedlog.go · 53.39% covered

package htlcswitch

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"

	sphinx "github.com/lightningnetwork/lightning-onion"
	"github.com/lightningnetwork/lnd/chainntnfs"
	"github.com/lightningnetwork/lnd/kvdb"
)

const (
	// defaultDbDirectory is the default directory where our decayed log
	// will store our (sharedHash, CLTV) key-value pairs.
	defaultDbDirectory = "sharedhashes"
)

var (
	// sharedHashBucket is a bucket which houses the first HashPrefixSize
	// bytes of a received HTLC's hashed shared secret as the key and the
	// HTLC's CLTV expiry as the value.
	sharedHashBucket = []byte("shared-hash")

	// batchReplayBucket is a bucket that maps batch identifiers to
	// serialized ReplaySets. This is used to give idempotency in the event
	// that a batch is processed more than once.
	batchReplayBucket = []byte("batch-replay")
)

var (
	// ErrDecayedLogInit is used to indicate a decayed log failed to create
	// the proper bucketing structure on startup.
	ErrDecayedLogInit = errors.New("unable to initialize decayed log")

	// ErrDecayedLogCorrupted signals that the anticipated bucketing
	// structure has diverged since initialization.
	ErrDecayedLogCorrupted = errors.New("decayed log structure corrupted")
)

// NewBoltBackendCreator returns a function that creates a new bbolt backend for
// the decayed logs database.
func NewBoltBackendCreator(dbPath,
	dbFileName string) func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {

	return func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {
		cfg := &kvdb.BoltBackendConfig{
			DBPath:            dbPath,
			DBFileName:        dbFileName,
			NoFreelistSync:    boltCfg.NoFreelistSync,
			AutoCompact:       boltCfg.AutoCompact,
			AutoCompactMinAge: boltCfg.AutoCompactMinAge,
			DBTimeout:         boltCfg.DBTimeout,
		}

		// Use default path for log database.
		if dbPath == "" {
			cfg.DBPath = defaultDbDirectory
		}

		db, err := kvdb.GetBoltBackend(cfg)
		if err != nil {
			return nil, fmt.Errorf("could not open boltdb: %w", err)
		}

		return db, nil
	}
}

// DecayedLog implements the PersistLog interface. It stores the first
// HashPrefixSize bytes of a sha256-hashed shared secret along with a node's
// CLTV value. It is a decaying log, meaning there will be a garbage collector
// to collect entries which are expired according to their stored CLTV value
// and the current block height. DecayedLog wraps boltdb for simplicity and
// batches writes to the database to decrease write contention.
type DecayedLog struct {
	started int32 // To be used atomically.
	stopped int32 // To be used atomically.

	db kvdb.Backend

	notifier chainntnfs.ChainNotifier

	wg   sync.WaitGroup
	quit chan struct{}
}

// NewDecayedLog creates a new DecayedLog, which caches recently seen hashed
// shared secrets. Entries are evicted as their CLTV expires using block
// epochs from the given notifier.
func NewDecayedLog(db kvdb.Backend,
	notifier chainntnfs.ChainNotifier) *DecayedLog {

	return &DecayedLog{
		db:       db,
		notifier: notifier,
		quit:     make(chan struct{}),
	}
}

// Start opens the database we will be using to store hashed shared secrets.
// It also starts the garbage collector in a goroutine to remove stale
// database entries.
func (d *DecayedLog) Start() error {
	if !atomic.CompareAndSwapInt32(&d.started, 0, 1) {
		return nil
	}

	// Initialize the primary buckets used by the decayed log.
	if err := d.initBuckets(); err != nil {
		return err
	}

	// Start garbage collector.
	if d.notifier != nil {
		epochClient, err := d.notifier.RegisterBlockEpochNtfn(nil)
		if err != nil {
			return fmt.Errorf("unable to register for epoch "+
				"notifications: %v", err)
		}

		d.wg.Add(1)
		go d.garbageCollector(epochClient)
	}

	return nil
}

// initBuckets initializes the primary buckets used by the decayed log, namely
// the shared hash bucket and the batch replay bucket.
func (d *DecayedLog) initBuckets() error {
	return kvdb.Update(d.db, func(tx kvdb.RwTx) error {
		_, err := tx.CreateTopLevelBucket(sharedHashBucket)
		if err != nil {
			return ErrDecayedLogInit
		}

		_, err = tx.CreateTopLevelBucket(batchReplayBucket)
		if err != nil {
			return ErrDecayedLogInit
		}

		return nil
	}, func() {})
}

// Stop halts the garbage collector and closes boltdb.
func (d *DecayedLog) Stop() error {
	if !atomic.CompareAndSwapInt32(&d.stopped, 0, 1) {
		return nil
	}

	// Stop garbage collector.
	close(d.quit)

	d.wg.Wait()

	return nil
}

// garbageCollector deletes entries from sharedHashBucket whose expiry height
// has already passed. This function MUST be run as a goroutine.
func (d *DecayedLog) garbageCollector(epochClient *chainntnfs.BlockEpochEvent) {
	defer d.wg.Done()
	defer epochClient.Cancel()

	for {
		select {
		case epoch, ok := <-epochClient.Epochs:
			if !ok {
				// Block epoch was canceled, shutting down.
				log.Infof("Block epoch canceled, " +
					"decaying hash log shutting down")
				return
			}

			// Perform a bout of garbage collection using the
			// epoch's block height.
			height := uint32(epoch.Height)
			numExpired, err := d.gcExpiredHashes(height)
			if err != nil {
				log.Errorf("unable to expire hashes at "+
					"height=%d: %v", height, err)
			}

			if numExpired > 0 {
				log.Infof("Garbage collected %v shared "+
					"secret hashes at height=%v",
					numExpired, height)
			}

		case <-d.quit:
			// Received shutdown request.
			log.Infof("Decaying hash log received " +
				"shutdown request")
			return
		}
	}
}

// gcExpiredHashes purges the decaying log of all entries whose CLTV expires
// below the provided height.
func (d *DecayedLog) gcExpiredHashes(height uint32) (uint32, error) {
	var numExpiredHashes uint32

	err := kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
		numExpiredHashes = 0

		// Grab the shared hash bucket.
		sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
		if sharedHashes == nil {
			return fmt.Errorf("sharedHashBucket " +
				"is nil")
		}

		var expiredCltv [][]byte
		if err := sharedHashes.ForEach(func(k, v []byte) error {
			// Deserialize the CLTV value for this entry.
			cltv := uint32(binary.BigEndian.Uint32(v))

			if cltv < height {
				// This CLTV is expired. We must add it to an
				// array which we'll loop over and delete every
				// hash contained from the db.
				expiredCltv = append(expiredCltv, k)
				numExpiredHashes++
			}

			return nil
		}); err != nil {
			return err
		}

		// Delete every item in the array. This must
		// be done explicitly outside of the ForEach
		// function for safety reasons.
		for _, hash := range expiredCltv {
			err := sharedHashes.Delete(hash)
			if err != nil {
				return err
			}
		}

		return nil
	})
	if err != nil {
		return 0, err
	}

	return numExpiredHashes, nil
}

// Delete removes a <shared secret hash, CLTV> key-pair from the
// sharedHashBucket.
func (d *DecayedLog) Delete(hash *sphinx.HashPrefix) error {
	return kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
		sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
		if sharedHashes == nil {
			return ErrDecayedLogCorrupted
		}

		return sharedHashes.Delete(hash[:])
	})
}

// Get retrieves the CLTV of a processed HTLC given the first 20 bytes of the
// Sha-256 hash of the shared secret.
func (d *DecayedLog) Get(hash *sphinx.HashPrefix) (uint32, error) {
	var value uint32

	err := kvdb.View(d.db, func(tx kvdb.RTx) error {
		// Grab the shared hash bucket which stores the mapping from
		// truncated sha-256 hashes of shared secrets to CLTVs.
		sharedHashes := tx.ReadBucket(sharedHashBucket)
		if sharedHashes == nil {
			return fmt.Errorf("sharedHashes is nil, could " +
				"not retrieve CLTV value")
		}

		// Retrieve the bytes which represent the CLTV.
		valueBytes := sharedHashes.Get(hash[:])
		if valueBytes == nil {
			return sphinx.ErrLogEntryNotFound
		}

		// The first 4 bytes represent the CLTV, store it in value.
		value = uint32(binary.BigEndian.Uint32(valueBytes))

		return nil
	}, func() {
		value = 0
	})
	if err != nil {
		return value, err
	}

	return value, nil
}

// Put stores a shared secret hash as the key and the CLTV as the value.
func (d *DecayedLog) Put(hash *sphinx.HashPrefix, cltv uint32) error {
	// Optimistically serialize the cltv value into the scratch buffer.
	var scratch [4]byte
	binary.BigEndian.PutUint32(scratch[:], cltv)

	return kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
		sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
		if sharedHashes == nil {
			return ErrDecayedLogCorrupted
		}

		// Check to see if this hash prefix has been recorded before.
		// If a value is found, this packet is being replayed.
		valueBytes := sharedHashes.Get(hash[:])
		if valueBytes != nil {
			return sphinx.ErrReplayedPacket
		}

		return sharedHashes.Put(hash[:], scratch[:])
	})
}

// PutBatch accepts a pending batch of hashed secret entries to write to disk.
// Each hashed secret is inserted with a corresponding time value, dictating
// when the entry will be evicted from the log.
// NOTE: This method enforces idempotency by writing the replay set obtained
// from the first attempt for a particular batch ID, and returning that decoded
// value to subsequent calls. For the indices of the replay set to be aligned
// properly, the batch MUST be constructed identically to the first attempt, as
// pruning will cause the indices to become invalid.
func (d *DecayedLog) PutBatch(b *sphinx.Batch) (*sphinx.ReplaySet, error) {
	// Since batched boltdb txns may be executed multiple times before
	// succeeding, we will create a new replay set for each invocation to
	// avoid any side-effects. If the txn is successful, this replay set
	// will be merged with the replay set computed during batch
	// construction to generate the complete replay set. If this batch was
	// previously processed, the replay set will be deserialized from disk.
	var replays *sphinx.ReplaySet
	if err := kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
		sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
		if sharedHashes == nil {
			return ErrDecayedLogCorrupted
		}

		// Load the batch replay bucket, which will be used to either
		// retrieve the result of previously processing this batch, or
		// to write the result of this operation.
		batchReplayBkt := tx.ReadWriteBucket(batchReplayBucket)
		if batchReplayBkt == nil {
			return ErrDecayedLogCorrupted
		}

		// Check for the existence of this batch's id in the replay
		// bucket. If a non-nil value is found, this indicates that we
		// have already processed this batch before. We deserialize the
		// resulting replay set and return it to ensure calls to put
		// batch are idempotent.
		replayBytes := batchReplayBkt.Get(b.ID)
		if replayBytes != nil {
			replays = sphinx.NewReplaySet()
			return replays.Decode(bytes.NewReader(replayBytes))
		}

		// The CLTV will be stored into scratch and then stored into
		// the sharedHashBucket.
		var scratch [4]byte

		replays = sphinx.NewReplaySet()
		err := b.ForEach(func(seqNum uint16,
			hashPrefix *sphinx.HashPrefix, cltv uint32) error {

			// If this hash prefix already has an entry on disk,
			// mark this add as a replay.
			valueBytes := sharedHashes.Get(hashPrefix[:])
			if valueBytes != nil {
				replays.Add(seqNum)
				return nil
			}

			// Serialize the cltv value and write an entry keyed by
			// the hash prefix.
			binary.BigEndian.PutUint32(scratch[:], cltv)
			return sharedHashes.Put(hashPrefix[:], scratch[:])
		})
		if err != nil {
			return err
		}

		// Merge the replay set computed from checking the on-disk
		// entries with the in-batch replays computed during this
		// batch's construction.
		replays.Merge(b.ReplaySet)

		// Write the replay set under the batch identifier to the batch
		// replays bucket. This can be used during recovery to test (1)
		// that a particular batch was successfully processed and (2)
		// recover the indexes of the adds that were rejected as
		// replays.
		var replayBuf bytes.Buffer
		if err := replays.Encode(&replayBuf); err != nil {
			return err
		}

		return batchReplayBkt.Put(b.ID, replayBuf.Bytes())
	}); err != nil {
		return nil, err
	}

	b.ReplaySet = replays
	b.IsCommitted = true

	return replays, nil
}

// A compile time check to see if DecayedLog adheres to the sphinx.ReplayLog
// interface.
var _ sphinx.ReplayLog = (*DecayedLog)(nil)
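
The listing above is the complete file under measurement. As a rough orientation for how the pieces fit together, the standalone sketch below wires up the bbolt backend creator, starts a DecayedLog, and exercises the Put/Get path plus the replay check. It is only an illustrative guess at typical usage, not code from the repository: the directory, database file name, hash-prefix bytes, CLTV value, and the choice of a nil notifier (which simply means the garbage collector goroutine never starts) are all assumptions made for the example.

package main

import (
	"errors"
	"fmt"
	"log"
	"os"
	"time"

	sphinx "github.com/lightningnetwork/lightning-onion"
	"github.com/lightningnetwork/lnd/htlcswitch"
	"github.com/lightningnetwork/lnd/kvdb"
)

func main() {
	// Hypothetical location for the example database.
	dbDir := "/tmp/decayedlog-example"
	if err := os.MkdirAll(dbDir, 0o700); err != nil {
		log.Fatalf("unable to create db dir: %v", err)
	}

	// Build a bbolt backend using the creator from the file above.
	createBackend := htlcswitch.NewBoltBackendCreator(dbDir, "sharedhashes.db")
	db, err := createBackend(&kvdb.BoltConfig{DBTimeout: 10 * time.Second})
	if err != nil {
		log.Fatalf("unable to open backend: %v", err)
	}
	defer db.Close()

	// A nil notifier means no block epochs are delivered, so no garbage
	// collection runs; that is enough for a quick local experiment.
	decayedLog := htlcswitch.NewDecayedLog(db, nil)
	if err := decayedLog.Start(); err != nil {
		log.Fatalf("unable to start decayed log: %v", err)
	}
	defer decayedLog.Stop()

	// Store a (hash prefix, CLTV) pair and read it back. The prefix bytes
	// here are arbitrary placeholders, not a real hashed shared secret.
	var prefix sphinx.HashPrefix
	copy(prefix[:], []byte("example-shared-secret"))

	if err := decayedLog.Put(&prefix, 750_000); err != nil {
		log.Fatalf("put failed: %v", err)
	}
	cltv, err := decayedLog.Get(&prefix)
	if err != nil {
		log.Fatalf("get failed: %v", err)
	}
	fmt.Printf("stored CLTV: %d\n", cltv)

	// Inserting the same prefix a second time is rejected as a replay.
	err = decayedLog.Put(&prefix, 750_000)
	if errors.Is(err, sphinx.ErrReplayedPacket) {
		fmt.Println("duplicate insert rejected as a replay")
	}
}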