lightningnetwork / lnd / 16466354971

23 Jul 2025 09:05AM UTC coverage: 57.54% (-9.7%) from 67.201%

Pull Request #9455: discovery+lnwire: add support for DNS host name in NodeAnnouncement msg
Merge f914ae23c into 90e211684

151 of 291 new or added lines in 7 files covered (51.89%).
28441 existing lines in 456 files now uncovered.
98864 of 171817 relevant lines covered (57.54%).
1.79 hits per line.

Source File: /htlcswitch/decayedlog.go (52.07% covered)

package htlcswitch

import (
	"encoding/binary"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"

	sphinx "github.com/lightningnetwork/lightning-onion"
	"github.com/lightningnetwork/lnd/chainntnfs"
	"github.com/lightningnetwork/lnd/kvdb"
)

const (
	// defaultDbDirectory is the default directory where our decayed log
	// will store our (sharedHash, CLTV) key-value pairs.
	defaultDbDirectory = "sharedhashes"
)

var (
	// sharedHashBucket is a bucket which houses the first HashPrefixSize
	// bytes of a received HTLC's hashed shared secret as the key and the
	// HTLC's CLTV expiry as the value.
	sharedHashBucket = []byte("shared-hash")
)

var (
	// ErrDecayedLogInit is used to indicate a decayed log failed to create
	// the proper bucketing structure on startup.
	ErrDecayedLogInit = errors.New("unable to initialize decayed log")

	// ErrDecayedLogCorrupted signals that the anticipated bucketing
	// structure has diverged since initialization.
	ErrDecayedLogCorrupted = errors.New("decayed log structure corrupted")
)

// NewBoltBackendCreator returns a function that creates a new bbolt backend
// for the decayed logs database.
func NewBoltBackendCreator(dbPath,
	dbFileName string) func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {

	return func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {
		cfg := &kvdb.BoltBackendConfig{
			DBPath:            dbPath,
			DBFileName:        dbFileName,
			NoFreelistSync:    boltCfg.NoFreelistSync,
			AutoCompact:       boltCfg.AutoCompact,
			AutoCompactMinAge: boltCfg.AutoCompactMinAge,
			DBTimeout:         boltCfg.DBTimeout,
		}

		// Use default path for log database.
		if dbPath == "" {
			cfg.DBPath = defaultDbDirectory
		}

		db, err := kvdb.GetBoltBackend(cfg)
		if err != nil {
			return nil, fmt.Errorf("could not open boltdb: %w", err)
		}

		return db, nil
	}
}

// DecayedLog implements the PersistLog interface. It stores the first
// HashPrefixSize bytes of a sha256-hashed shared secret along with a node's
// CLTV value. It is a decaying log, meaning a garbage collector removes
// entries that have expired according to their stored CLTV value and the
// current block height. DecayedLog wraps boltdb for simplicity and batches
// writes to the database to decrease write contention.
type DecayedLog struct {
	started int32 // To be used atomically.
	stopped int32 // To be used atomically.

	db kvdb.Backend

	notifier chainntnfs.ChainNotifier

	wg   sync.WaitGroup
	quit chan struct{}
}

// NewDecayedLog creates a new DecayedLog, which caches recently seen hashed
// shared secrets. Entries are evicted as their CLTV expires, using block
// epochs from the given notifier.
func NewDecayedLog(db kvdb.Backend,
	notifier chainntnfs.ChainNotifier) *DecayedLog {

	return &DecayedLog{
		db:       db,
		notifier: notifier,
		quit:     make(chan struct{}),
	}
}

// Start opens the database we will be using to store hashed shared secrets.
// It also starts the garbage collector in a goroutine to remove stale
// database entries.
func (d *DecayedLog) Start() error {
	if !atomic.CompareAndSwapInt32(&d.started, 0, 1) {
		return nil
	}

	// Initialize the primary buckets used by the decayed log.
	if err := d.initBuckets(); err != nil {
		return err
	}

	// Start garbage collector.
	if d.notifier != nil {
		epochClient, err := d.notifier.RegisterBlockEpochNtfn(nil)
		if err != nil {
			return fmt.Errorf("unable to register for epoch "+
				"notifications: %v", err)
		}

		d.wg.Add(1)
		go d.garbageCollector(epochClient)
	}

	return nil
}

// initBuckets initializes the primary buckets used by the decayed log, namely
// the shared hash bucket and the batch replay bucket.
func (d *DecayedLog) initBuckets() error {
	return kvdb.Update(d.db, func(tx kvdb.RwTx) error {
		_, err := tx.CreateTopLevelBucket(sharedHashBucket)
		if err != nil {
			return ErrDecayedLogInit
		}

		return nil
	}, func() {})
}

// Stop halts the garbage collector and closes boltdb.
func (d *DecayedLog) Stop() error {
	log.Debugf("DecayedLog shutting down...")
	defer log.Debugf("DecayedLog shutdown complete")

	if !atomic.CompareAndSwapInt32(&d.stopped, 0, 1) {
		return nil
	}

	// Stop garbage collector.
	close(d.quit)

	d.wg.Wait()

	return nil
}

// garbageCollector deletes entries from sharedHashBucket whose expiry height
// has already passed. This function MUST be run as a goroutine.
func (d *DecayedLog) garbageCollector(epochClient *chainntnfs.BlockEpochEvent) {
	defer d.wg.Done()
	defer epochClient.Cancel()

	for {
		select {
		case epoch, ok := <-epochClient.Epochs:
			if !ok {
				// Block epoch was canceled, shutting down.
				log.Infof("Block epoch canceled, " +
					"decaying hash log shutting down")
				return
			}

			// Perform a bout of garbage collection using the
			// epoch's block height.
			height := uint32(epoch.Height)
			numExpired, err := d.gcExpiredHashes(height)
			if err != nil {
				log.Errorf("unable to expire hashes at "+
					"height=%d", height)
			}

			if numExpired > 0 {
				log.Infof("Garbage collected %v shared "+
					"secret hashes at height=%v",
					numExpired, height)
			}

		case <-d.quit:
			// Received shutdown request.
			log.Infof("Decaying hash log received " +
				"shutdown request")
			return
		}
	}
}

// gcExpiredHashes purges the decaying log of all entries whose CLTV expires
// below the provided height.
func (d *DecayedLog) gcExpiredHashes(height uint32) (uint32, error) {
	var numExpiredHashes uint32

	err := kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
		numExpiredHashes = 0

		// Grab the shared hash bucket.
		sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
		if sharedHashes == nil {
			return fmt.Errorf("sharedHashBucket is nil")
		}

		var expiredCltv [][]byte
		if err := sharedHashes.ForEach(func(k, v []byte) error {
			// Deserialize the CLTV value for this entry.
			cltv := uint32(binary.BigEndian.Uint32(v))

			if cltv < height {
				// This CLTV is expired. We must add it to an
				// array which we'll loop over and delete every
				// hash contained from the db.
				expiredCltv = append(expiredCltv, k)
				numExpiredHashes++
			}

			return nil
		}); err != nil {
			return err
		}

		// Delete every item in the array. This must be done
		// explicitly outside of the ForEach function for safety
		// reasons.
		for _, hash := range expiredCltv {
			err := sharedHashes.Delete(hash)
			if err != nil {
				return err
			}
		}

		return nil
	})
	if err != nil {
		return 0, err
	}

	return numExpiredHashes, nil
}

// Delete removes a <shared secret hash, CLTV> key-pair from the
// sharedHashBucket.
func (d *DecayedLog) Delete(hash *sphinx.HashPrefix) error {
	return kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
		sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
		if sharedHashes == nil {
			return ErrDecayedLogCorrupted
		}

		return sharedHashes.Delete(hash[:])
	})
}

// Get retrieves the CLTV of a processed HTLC given the first 20 bytes of the
// Sha-256 hash of the shared secret.
func (d *DecayedLog) Get(hash *sphinx.HashPrefix) (uint32, error) {
	var value uint32

	err := kvdb.View(d.db, func(tx kvdb.RTx) error {
		// Grab the shared hash bucket which stores the mapping from
		// truncated sha-256 hashes of shared secrets to CLTVs.
		sharedHashes := tx.ReadBucket(sharedHashBucket)
		if sharedHashes == nil {
			return fmt.Errorf("sharedHashes is nil, could " +
				"not retrieve CLTV value")
		}

		// Retrieve the bytes which represent the CLTV.
		valueBytes := sharedHashes.Get(hash[:])
		if valueBytes == nil {
			return sphinx.ErrLogEntryNotFound
		}

		// The first 4 bytes represent the CLTV, store it in value.
		value = uint32(binary.BigEndian.Uint32(valueBytes))

		return nil
	}, func() {
		value = 0
	})
	if err != nil {
		return value, err
	}

	return value, nil
}

// Put stores a shared secret hash as the key and the CLTV as the value.
func (d *DecayedLog) Put(hash *sphinx.HashPrefix, cltv uint32) error {
	// Optimistically serialize the cltv value into the scratch buffer.
	var scratch [4]byte
	binary.BigEndian.PutUint32(scratch[:], cltv)

	return kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
		sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
		if sharedHashes == nil {
			return ErrDecayedLogCorrupted
		}

		// Check to see if this hash prefix has been recorded before.
		// If a value is found, this packet is being replayed.
		valueBytes := sharedHashes.Get(hash[:])
		if valueBytes != nil {
			return sphinx.ErrReplayedPacket
		}

		return sharedHashes.Put(hash[:], scratch[:])
	})
}

// PutBatch accepts a pending batch of hashed secret entries to write to disk.
// Each hashed secret is inserted with a corresponding time value, dictating
// when the entry will be evicted from the log.
//
// TODO(yy): remove this method and use `Put` instead.
func (d *DecayedLog) PutBatch(b *sphinx.Batch) (*sphinx.ReplaySet, error) {
	// Since batched boltdb txns may be executed multiple times before
	// succeeding, we will create a new replay set for each invocation to
	// avoid any side-effects. If the txn is successful, this replay set
	// will be merged with the replay set computed during batch
	// construction to generate the complete replay set. If this batch was
	// previously processed, the replay set will be deserialized from disk.
	var replays *sphinx.ReplaySet
	if err := kvdb.Batch(d.db, func(tx kvdb.RwTx) error {
		sharedHashes := tx.ReadWriteBucket(sharedHashBucket)
		if sharedHashes == nil {
			return ErrDecayedLogCorrupted
		}

		// The CLTV will be stored into scratch and then stored into
		// the sharedHashBucket.
		var scratch [4]byte

		replays = sphinx.NewReplaySet()
		err := b.ForEach(func(seqNum uint16,
			hashPrefix *sphinx.HashPrefix, cltv uint32) error {

			// Retrieve the bytes which represent the CLTV.
			valueBytes := sharedHashes.Get(hashPrefix[:])
			if valueBytes != nil {
				replays.Add(seqNum)
				return nil
			}

			// Serialize the cltv value and write an entry keyed by
			// the hash prefix.
			binary.BigEndian.PutUint32(scratch[:], cltv)
			return sharedHashes.Put(hashPrefix[:], scratch[:])
		})
		if err != nil {
			return err
		}

		// Merge the replay set computed from checking the on-disk
		// entries with the in-batch replays computed during this
		// batch's construction.
		replays.Merge(b.ReplaySet)

		return nil
	}); err != nil {
		return nil, err
	}

	b.ReplaySet = replays
	b.IsCommitted = true

	return replays, nil
}

// A compile time check to see if DecayedLog adheres to the PersistLog
// interface.
var _ sphinx.ReplayLog = (*DecayedLog)(nil)
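
For orientation, a minimal usage sketch of the API in this file. The paths, timeout, CLTV value, and sample shared secret below are illustrative assumptions, not values from lnd, and it assumes sphinx.HashPrefix is a fixed-size byte array (as suggested by the hash[:] indexing above); error handling is abbreviated.

// Illustrative sketch only; see the note above for assumptions.
package main

import (
	"crypto/sha256"
	"fmt"
	"time"

	sphinx "github.com/lightningnetwork/lightning-onion"
	"github.com/lightningnetwork/lnd/htlcswitch"
	"github.com/lightningnetwork/lnd/kvdb"
)

func main() {
	// Obtain a bbolt backend via the creator defined above. An empty
	// dbPath would fall back to defaultDbDirectory.
	newBackend := htlcswitch.NewBoltBackendCreator(
		"/tmp/decayedlog", "sharedhashes.db", // hypothetical paths
	)
	db, err := newBackend(&kvdb.BoltConfig{DBTimeout: 5 * time.Second})
	if err != nil {
		panic(err)
	}

	// With a nil notifier, Start skips the garbage collector, so expired
	// entries are never evicted; a real caller supplies a ChainNotifier.
	decayedLog := htlcswitch.NewDecayedLog(db, nil)
	if err := decayedLog.Start(); err != nil {
		panic(err)
	}
	defer decayedLog.Stop()

	// Derive a hash prefix from a made-up shared secret, record it with
	// its CLTV, and read the CLTV back.
	digest := sha256.Sum256([]byte("example shared secret"))
	var prefix sphinx.HashPrefix
	copy(prefix[:], digest[:])

	if err := decayedLog.Put(&prefix, 500000); err != nil {
		panic(err)
	}
	cltv, err := decayedLog.Get(&prefix)
	if err != nil {
		panic(err)
	}
	fmt.Println("stored CLTV:", cltv)
}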