• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 9915780197

13 Jul 2024 12:30AM UTC coverage: 49.268% (-9.1%) from 58.413%
9915780197

push

github

web-flow
Merge pull request #8653 from ProofOfKeags/fn-prim

DynComms [0/n]: `fn` package additions

92837 of 188433 relevant lines covered (49.27%)

1.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.19
/routing/missioncontrol_store.go
1
package routing
2

3
import (
4
        "bytes"
5
        "container/list"
6
        "encoding/binary"
7
        "fmt"
8
        "math"
9
        "sync"
10
        "time"
11

12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/lightningnetwork/lnd/channeldb"
14
        "github.com/lightningnetwork/lnd/kvdb"
15
        "github.com/lightningnetwork/lnd/lnwire"
16
)
17

18
// resultsKey is the fixed name of the top-level bucket in which all payment
// attempt results are stored.
var resultsKey = []byte("missioncontrol-results")

// byteOrder is the byte order used for integer keys. Big endian is preferred
// because cursor scans over big-endian integer keys iterate in numerical
// order.
var byteOrder = binary.BigEndian
27

28
// unknownFailureSourceIdx is the value written to the database in place of a
// failure source index when the source of the error is unknown.
const unknownFailureSourceIdx = -1
33

34
// missionControlStore is a bolt db based implementation of a mission control
35
// store. It stores the raw payment attempt data from which the internal mission
36
// controls state can be rederived on startup. This allows the mission control
37
// internal data structure to be changed without requiring a database migration.
38
// Also changes to mission control parameters can be applied to historical data.
39
// Finally, it enables importing raw data from an external source.
40
type missionControlStore struct {
41
        done chan struct{}
42
        wg   sync.WaitGroup
43
        db   kvdb.Backend
44

45
        // queueCond is signalled when items are put into the queue.
46
        queueCond *sync.Cond
47

48
        // queue stores all pending payment results not yet added to the store.
49
        // Access is protected by the queueCond.L mutex.
50
        queue *list.List
51

52
        // keys holds the stored MC store item keys in the order of storage.
53
        // We use this list when adding/deleting items from the database to
54
        // avoid cursor use which may be slow in the remote DB case.
55
        keys *list.List
56

57
        // keysMap holds the stored MC store item keys. We use this map to check
58
        // if a new payment result has already been stored.
59
        keysMap map[string]struct{}
60

61
        // maxRecords is the maximum amount of records we will store in the db.
62
        maxRecords int
63

64
        // flushInterval is the configured interval we use to store new results
65
        // and delete outdated ones from the db.
66
        flushInterval time.Duration
67
}
68

69
func newMissionControlStore(db kvdb.Backend, maxRecords int,
70
        flushInterval time.Duration) (*missionControlStore, error) {
3✔
71

3✔
72
        var (
3✔
73
                keys    *list.List
3✔
74
                keysMap map[string]struct{}
3✔
75
        )
3✔
76

3✔
77
        // Create buckets if not yet existing.
3✔
78
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
6✔
79
                resultsBucket, err := tx.CreateTopLevelBucket(resultsKey)
3✔
80
                if err != nil {
3✔
81
                        return fmt.Errorf("cannot create results bucket: %w",
×
82
                                err)
×
83
                }
×
84

85
                // Collect all keys to be able to quickly calculate the
86
                // difference when updating the DB state.
87
                c := resultsBucket.ReadCursor()
3✔
88
                for k, _ := c.First(); k != nil; k, _ = c.Next() {
6✔
89
                        keys.PushBack(string(k))
3✔
90
                        keysMap[string(k)] = struct{}{}
3✔
91
                }
3✔
92

93
                return nil
3✔
94
        }, func() {
3✔
95
                keys = list.New()
3✔
96
                keysMap = make(map[string]struct{})
3✔
97
        })
3✔
98
        if err != nil {
3✔
99
                return nil, err
×
100
        }
×
101

102
        log.Infof("Loaded %d mission control entries", len(keysMap))
3✔
103

3✔
104
        return &missionControlStore{
3✔
105
                done:          make(chan struct{}),
3✔
106
                db:            db,
3✔
107
                queueCond:     sync.NewCond(&sync.Mutex{}),
3✔
108
                queue:         list.New(),
3✔
109
                keys:          keys,
3✔
110
                keysMap:       keysMap,
3✔
111
                maxRecords:    maxRecords,
3✔
112
                flushInterval: flushInterval,
3✔
113
        }, nil
3✔
114
}
115

116
// clear removes all results from the db.
117
func (b *missionControlStore) clear() error {
3✔
118
        b.queueCond.L.Lock()
3✔
119
        defer b.queueCond.L.Unlock()
3✔
120

3✔
121
        err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
6✔
122
                if err := tx.DeleteTopLevelBucket(resultsKey); err != nil {
3✔
123
                        return err
×
124
                }
×
125

126
                _, err := tx.CreateTopLevelBucket(resultsKey)
3✔
127
                return err
3✔
128
        }, func() {})
3✔
129

130
        if err != nil {
3✔
131
                return err
×
132
        }
×
133

134
        b.queue = list.New()
3✔
135
        return nil
3✔
136
}
137

138
// fetchAll returns all results currently stored in the database.
139
func (b *missionControlStore) fetchAll() ([]*paymentResult, error) {
3✔
140
        var results []*paymentResult
3✔
141

3✔
142
        err := kvdb.View(b.db, func(tx kvdb.RTx) error {
6✔
143
                resultBucket := tx.ReadBucket(resultsKey)
3✔
144
                results = make([]*paymentResult, 0)
3✔
145

3✔
146
                return resultBucket.ForEach(func(k, v []byte) error {
6✔
147
                        result, err := deserializeResult(k, v)
3✔
148
                        if err != nil {
3✔
149
                                return err
×
150
                        }
×
151

152
                        results = append(results, result)
3✔
153

3✔
154
                        return nil
3✔
155
                })
156

157
        }, func() {
3✔
158
                results = nil
3✔
159
        })
3✔
160
        if err != nil {
3✔
161
                return nil, err
×
162
        }
×
163

164
        return results, nil
3✔
165
}
166

167
// serializeResult serializes a payment result and returns a key and value byte
168
// slice to insert into the bucket.
169
func serializeResult(rp *paymentResult) ([]byte, []byte, error) {
3✔
170
        // Write timestamps, success status, failure source index and route.
3✔
171
        var b bytes.Buffer
3✔
172

3✔
173
        var dbFailureSourceIdx int32
3✔
174
        if rp.failureSourceIdx == nil {
6✔
175
                dbFailureSourceIdx = unknownFailureSourceIdx
3✔
176
        } else {
6✔
177
                dbFailureSourceIdx = int32(*rp.failureSourceIdx)
3✔
178
        }
3✔
179

180
        err := channeldb.WriteElements(
3✔
181
                &b,
3✔
182
                uint64(rp.timeFwd.UnixNano()),
3✔
183
                uint64(rp.timeReply.UnixNano()),
3✔
184
                rp.success, dbFailureSourceIdx,
3✔
185
        )
3✔
186
        if err != nil {
3✔
187
                return nil, nil, err
×
188
        }
×
189

190
        if err := channeldb.SerializeRoute(&b, *rp.route); err != nil {
3✔
191
                return nil, nil, err
×
192
        }
×
193

194
        // Write failure. If there is no failure message, write an empty
195
        // byte slice.
196
        var failureBytes bytes.Buffer
3✔
197
        if rp.failure != nil {
6✔
198
                err := lnwire.EncodeFailureMessage(&failureBytes, rp.failure, 0)
3✔
199
                if err != nil {
3✔
200
                        return nil, nil, err
×
201
                }
×
202
        }
203
        err = wire.WriteVarBytes(&b, 0, failureBytes.Bytes())
3✔
204
        if err != nil {
3✔
205
                return nil, nil, err
×
206
        }
×
207

208
        // Compose key that identifies this result.
209
        key := getResultKey(rp)
3✔
210

3✔
211
        return key, b.Bytes(), nil
3✔
212
}
213

214
// deserializeResult deserializes a payment result.
215
func deserializeResult(k, v []byte) (*paymentResult, error) {
3✔
216
        // Parse payment id.
3✔
217
        result := paymentResult{
3✔
218
                id: byteOrder.Uint64(k[8:]),
3✔
219
        }
3✔
220

3✔
221
        r := bytes.NewReader(v)
3✔
222

3✔
223
        // Read timestamps, success status and failure source index.
3✔
224
        var (
3✔
225
                timeFwd, timeReply uint64
3✔
226
                dbFailureSourceIdx int32
3✔
227
        )
3✔
228

3✔
229
        err := channeldb.ReadElements(
3✔
230
                r, &timeFwd, &timeReply, &result.success, &dbFailureSourceIdx,
3✔
231
        )
3✔
232
        if err != nil {
3✔
233
                return nil, err
×
234
        }
×
235

236
        // Convert time stamps to local time zone for consistent logging.
237
        result.timeFwd = time.Unix(0, int64(timeFwd)).Local()
3✔
238
        result.timeReply = time.Unix(0, int64(timeReply)).Local()
3✔
239

3✔
240
        // Convert from unknown index magic number to nil value.
3✔
241
        if dbFailureSourceIdx != unknownFailureSourceIdx {
6✔
242
                failureSourceIdx := int(dbFailureSourceIdx)
3✔
243
                result.failureSourceIdx = &failureSourceIdx
3✔
244
        }
3✔
245

246
        // Read route.
247
        route, err := channeldb.DeserializeRoute(r)
3✔
248
        if err != nil {
3✔
249
                return nil, err
×
250
        }
×
251
        result.route = &route
3✔
252

3✔
253
        // Read failure.
3✔
254
        failureBytes, err := wire.ReadVarBytes(
3✔
255
                r, 0, math.MaxUint16, "failure",
3✔
256
        )
3✔
257
        if err != nil {
3✔
258
                return nil, err
×
259
        }
×
260
        if len(failureBytes) > 0 {
6✔
261
                result.failure, err = lnwire.DecodeFailureMessage(
3✔
262
                        bytes.NewReader(failureBytes), 0,
3✔
263
                )
3✔
264
                if err != nil {
3✔
265
                        return nil, err
×
266
                }
×
267
        }
268

269
        return &result, nil
3✔
270
}
271

272
// AddResult adds a new result to the db.
273
func (b *missionControlStore) AddResult(rp *paymentResult) {
3✔
274
        b.queueCond.L.Lock()
3✔
275
        b.queue.PushBack(rp)
3✔
276
        b.queueCond.L.Unlock()
3✔
277

3✔
278
        b.queueCond.Signal()
3✔
279
}
3✔
280

281
// stop stops the store ticker goroutine.
282
func (b *missionControlStore) stop() {
3✔
283
        close(b.done)
3✔
284

3✔
285
        b.queueCond.Signal()
3✔
286

3✔
287
        b.wg.Wait()
3✔
288
}
3✔
289

290
// run runs the MC store ticker goroutine.
291
func (b *missionControlStore) run() {
3✔
292
        b.wg.Add(1)
3✔
293

3✔
294
        go func() {
6✔
295
                defer b.wg.Done()
3✔
296

3✔
297
                timer := time.NewTimer(b.flushInterval)
3✔
298

3✔
299
                // Immediately stop the timer. It will be started once new
3✔
300
                // items are added to the store. As the doc for time.Timer
3✔
301
                // states, every call to Stop() done on a timer that is not
3✔
302
                // known to have been fired needs to be checked and the timer's
3✔
303
                // channel needs to be drained appropriately. This could happen
3✔
304
                // if the flushInterval is very small (e.g. 1 nanosecond).
3✔
305
                if !timer.Stop() {
3✔
306
                        <-timer.C
×
307
                }
×
308

309
                for {
6✔
310
                        // Wait for the queue to not be empty.
3✔
311
                        b.queueCond.L.Lock()
3✔
312
                        for b.queue.Front() == nil {
6✔
313
                                b.queueCond.Wait()
3✔
314

3✔
315
                                select {
3✔
316
                                case <-b.done:
3✔
317
                                        b.queueCond.L.Unlock()
3✔
318

3✔
319
                                        return
3✔
320
                                default:
3✔
321
                                }
322
                        }
323
                        b.queueCond.L.Unlock()
3✔
324

3✔
325
                        // Restart the timer.
3✔
326
                        timer.Reset(b.flushInterval)
3✔
327

3✔
328
                        select {
3✔
329
                        case <-timer.C:
3✔
330
                                if err := b.storeResults(); err != nil {
3✔
331
                                        log.Errorf("Failed to update mission "+
×
332
                                                "control store: %v", err)
×
333
                                }
×
334

335
                        case <-b.done:
2✔
336
                                // Release the timer's resources.
2✔
337
                                if !timer.Stop() {
2✔
338
                                        <-timer.C
×
339
                                }
×
340
                                return
2✔
341
                        }
342
                }
343
        }()
344
}
345

346
// storeResults stores all accumulated results.
347
func (b *missionControlStore) storeResults() error {
3✔
348
        // We copy a reference to the queue and clear the original queue to be
3✔
349
        // able to release the lock.
3✔
350
        b.queueCond.L.Lock()
3✔
351
        l := b.queue
3✔
352

3✔
353
        if l.Len() == 0 {
3✔
354
                b.queueCond.L.Unlock()
×
355

×
356
                return nil
×
357
        }
×
358
        b.queue = list.New()
3✔
359
        b.queueCond.L.Unlock()
3✔
360

3✔
361
        var (
3✔
362
                newKeys    map[string]struct{}
3✔
363
                delKeys    []string
3✔
364
                storeCount int
3✔
365
                pruneCount int
3✔
366
        )
3✔
367

3✔
368
        // Create a deduped list of new entries.
3✔
369
        newKeys = make(map[string]struct{}, l.Len())
3✔
370
        for e := l.Front(); e != nil; e = e.Next() {
6✔
371
                pr, ok := e.Value.(*paymentResult)
3✔
372
                if !ok {
3✔
373
                        return fmt.Errorf("wrong type %T (not *paymentResult)",
×
374
                                e.Value)
×
375
                }
×
376
                key := string(getResultKey(pr))
3✔
377
                if _, ok := b.keysMap[key]; ok {
3✔
378
                        l.Remove(e)
×
379
                        continue
×
380
                }
381
                if _, ok := newKeys[key]; ok {
3✔
382
                        l.Remove(e)
×
383
                        continue
×
384
                }
385
                newKeys[key] = struct{}{}
3✔
386
        }
387

388
        // Create a list of entries to delete.
389
        toDelete := b.keys.Len() + len(newKeys) - b.maxRecords
3✔
390
        if b.maxRecords > 0 && toDelete > 0 {
3✔
391
                delKeys = make([]string, 0, toDelete)
×
392

×
393
                // Delete as many as needed from old keys.
×
394
                for e := b.keys.Front(); len(delKeys) < toDelete && e != nil; {
×
395
                        key, ok := e.Value.(string)
×
396
                        if !ok {
×
397
                                return fmt.Errorf("wrong type %T (not string)",
×
398
                                        e.Value)
×
399
                        }
×
400
                        delKeys = append(delKeys, key)
×
401
                        e = e.Next()
×
402
                }
403

404
                // If more deletions are needed, simply do not add from the
405
                // list of new keys.
406
                for e := l.Front(); len(delKeys) < toDelete && e != nil; {
×
407
                        toDelete--
×
408
                        pr, ok := e.Value.(*paymentResult)
×
409
                        if !ok {
×
410
                                return fmt.Errorf("wrong type %T (not "+
×
411
                                        "*paymentResult )", e.Value)
×
412
                        }
×
413
                        key := string(getResultKey(pr))
×
414
                        delete(newKeys, key)
×
415
                        l.Remove(e)
×
416
                        e = l.Front()
×
417
                }
418
        }
419

420
        err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
6✔
421
                bucket := tx.ReadWriteBucket(resultsKey)
3✔
422

3✔
423
                for e := l.Front(); e != nil; e = e.Next() {
6✔
424
                        pr, ok := e.Value.(*paymentResult)
3✔
425
                        if !ok {
3✔
426
                                return fmt.Errorf("wrong type %T (not "+
×
427
                                        "*paymentResult)", e.Value)
×
428
                        }
×
429

430
                        // Serialize result into key and value byte slices.
431
                        k, v, err := serializeResult(pr)
3✔
432
                        if err != nil {
3✔
433
                                return err
×
434
                        }
×
435

436
                        // Put into results bucket.
437
                        if err := bucket.Put(k, v); err != nil {
3✔
438
                                return err
×
439
                        }
×
440

441
                        storeCount++
3✔
442
                }
443

444
                // Prune oldest entries.
445
                for _, key := range delKeys {
3✔
446
                        if err := bucket.Delete([]byte(key)); err != nil {
×
447
                                return err
×
448
                        }
×
449
                        pruneCount++
×
450
                }
451

452
                return nil
3✔
453
        }, func() {
3✔
454
                storeCount, pruneCount = 0, 0
3✔
455
        })
3✔
456

457
        if err != nil {
3✔
458
                return err
×
459
        }
×
460

461
        log.Debugf("Stored mission control results: %d added, %d deleted",
3✔
462
                storeCount, pruneCount)
3✔
463

3✔
464
        // DB Update was successful, update the in-memory cache.
3✔
465
        for _, key := range delKeys {
3✔
466
                delete(b.keysMap, key)
×
467
                b.keys.Remove(b.keys.Front())
×
468
        }
×
469
        for e := l.Front(); e != nil; e = e.Next() {
6✔
470
                pr, ok := e.Value.(*paymentResult)
3✔
471
                if !ok {
3✔
472
                        return fmt.Errorf("wrong type %T (not *paymentResult)",
×
473
                                e.Value)
×
474
                }
×
475
                key := string(getResultKey(pr))
3✔
476
                b.keys.PushBack(key)
3✔
477
        }
478

479
        return nil
3✔
480
}
481

482
// getResultKey returns a byte slice representing a unique key for this payment
483
// result.
484
func getResultKey(rp *paymentResult) []byte {
3✔
485
        var keyBytes [8 + 8 + 33]byte
3✔
486

3✔
487
        // Identify records by a combination of time, payment id and sender pub
3✔
488
        // key. This allows importing mission control data from an external
3✔
489
        // source without key collisions and keeps the records sorted
3✔
490
        // chronologically.
3✔
491
        byteOrder.PutUint64(keyBytes[:], uint64(rp.timeReply.UnixNano()))
3✔
492
        byteOrder.PutUint64(keyBytes[8:], rp.id)
3✔
493
        copy(keyBytes[16:], rp.route.SourcePubKey[:])
3✔
494

3✔
495
        return keyBytes[:]
3✔
496
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc