• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11216766535

07 Oct 2024 01:37PM UTC coverage: 57.817% (-1.0%) from 58.817%
11216766535

Pull #9148

github

ProofOfKeags
lnwire: remove kickoff feerate from propose/commit
Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*

571 of 879 new or added lines in 16 files covered. (64.96%)

23253 existing lines in 251 files now uncovered.

99022 of 171268 relevant lines covered (57.82%)

38420.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.32
/routing/missioncontrol_store.go
1
package routing
2

3
import (
4
        "bytes"
5
        "container/list"
6
        "encoding/binary"
7
        "fmt"
8
        "io"
9
        "math"
10
        "sync"
11
        "time"
12

13
        "github.com/btcsuite/btcd/wire"
14
        "github.com/lightningnetwork/lnd/channeldb"
15
        "github.com/lightningnetwork/lnd/kvdb"
16
        "github.com/lightningnetwork/lnd/lnwire"
17
)
18

19
var (
20
        // resultsKey is the fixed key under which the attempt results are
21
        // stored.
22
        resultsKey = []byte("missioncontrol-results")
23

24
        // Big endian is the preferred byte order, due to cursor scans over
25
        // integer keys iterating in order.
26
        byteOrder = binary.BigEndian
27
)
28

29
const (
30
        // unknownFailureSourceIdx is the database encoding of an unknown error
31
        // source.
32
        unknownFailureSourceIdx = -1
33
)
34

35
// missionControlDB is an interface that defines the database methods that a
36
// single missionControlStore has access to. It allows the missionControlStore
37
// to be unaware of the overall DB structure and restricts its access to the DB
38
// by only providing it the bucket that it needs to care about.
39
type missionControlDB interface {
40
        // update can be used to perform reads and writes on the given bucket.
41
        update(f func(bkt kvdb.RwBucket) error, reset func()) error
42

43
        // view can be used to perform reads on the given bucket.
44
        view(f func(bkt kvdb.RBucket) error, reset func()) error
45

46
        // purge will delete all the contents in this store.
47
        purge() error
48
}
49

50
// missionControlStore is a bolt db based implementation of a mission control
51
// store. It stores the raw payment attempt data from which the internal mission
52
// controls state can be rederived on startup. This allows the mission control
53
// internal data structure to be changed without requiring a database migration.
54
// Also changes to mission control parameters can be applied to historical data.
55
// Finally, it enables importing raw data from an external source.
56
type missionControlStore struct {
57
        done chan struct{}
58
        wg   sync.WaitGroup
59
        db   missionControlDB
60

61
        // queueCond is signalled when items are put into the queue.
62
        queueCond *sync.Cond
63

64
        // queue stores all pending payment results not yet added to the store.
65
        // Access is protected by the queueCond.L mutex.
66
        queue *list.List
67

68
        // keys holds the stored MC store item keys in the order of storage.
69
        // We use this list when adding/deleting items from the database to
70
        // avoid cursor use which may be slow in the remote DB case.
71
        keys *list.List
72

73
        // keysMap holds the stored MC store item keys. We use this map to check
74
        // if a new payment result has already been stored.
75
        keysMap map[string]struct{}
76

77
        // maxRecords is the maximum amount of records we will store in the db.
78
        maxRecords int
79

80
        // flushInterval is the configured interval we use to store new results
81
        // and delete outdated ones from the db.
82
        flushInterval time.Duration
83
}
84

85
func newMissionControlStore(db missionControlDB, maxRecords int,
86
        flushInterval time.Duration) (*missionControlStore, error) {
37✔
87

37✔
88
        var (
37✔
89
                keys    *list.List
37✔
90
                keysMap map[string]struct{}
37✔
91
        )
37✔
92

37✔
93
        // Create buckets if not yet existing.
37✔
94
        err := db.update(func(resultsBucket kvdb.RwBucket) error {
74✔
95
                // Collect all keys to be able to quickly calculate the
37✔
96
                // difference when updating the DB state.
37✔
97
                c := resultsBucket.ReadCursor()
37✔
98
                for k, _ := c.First(); k != nil; k, _ = c.Next() {
45✔
99
                        keys.PushBack(string(k))
8✔
100
                        keysMap[string(k)] = struct{}{}
8✔
101
                }
8✔
102

103
                return nil
37✔
104
        }, func() {
37✔
105
                keys = list.New()
37✔
106
                keysMap = make(map[string]struct{})
37✔
107
        })
37✔
108
        if err != nil {
37✔
109
                return nil, err
×
110
        }
×
111

112
        log.Infof("Loaded %d mission control entries", len(keysMap))
37✔
113

37✔
114
        return &missionControlStore{
37✔
115
                done:          make(chan struct{}),
37✔
116
                db:            db,
37✔
117
                queueCond:     sync.NewCond(&sync.Mutex{}),
37✔
118
                queue:         list.New(),
37✔
119
                keys:          keys,
37✔
120
                keysMap:       keysMap,
37✔
121
                maxRecords:    maxRecords,
37✔
122
                flushInterval: flushInterval,
37✔
123
        }, nil
37✔
124
}
125

126
// clear removes all results from the db.
127
func (b *missionControlStore) clear() error {
3✔
128
        b.queueCond.L.Lock()
3✔
129
        defer b.queueCond.L.Unlock()
3✔
130

3✔
131
        if err := b.db.purge(); err != nil {
3✔
132
                return err
×
133
        }
×
134

135
        b.queue = list.New()
3✔
136

3✔
137
        return nil
3✔
138
}
139

140
// fetchAll returns all results currently stored in the database.
141
func (b *missionControlStore) fetchAll() ([]*paymentResult, error) {
46✔
142
        var results []*paymentResult
46✔
143

46✔
144
        err := b.db.view(func(resultBucket kvdb.RBucket) error {
92✔
145
                results = make([]*paymentResult, 0)
46✔
146

46✔
147
                return resultBucket.ForEach(func(k, v []byte) error {
67✔
148
                        result, err := deserializeResult(k, v)
21✔
149
                        if err != nil {
21✔
150
                                return err
×
151
                        }
×
152

153
                        results = append(results, result)
21✔
154

21✔
155
                        return nil
21✔
156
                })
157

158
        }, func() {
46✔
159
                results = nil
46✔
160
        })
46✔
161
        if err != nil {
46✔
162
                return nil, err
×
163
        }
×
164

165
        return results, nil
46✔
166
}
167

168
// serializeResult serializes a payment result and returns a key and value byte
169
// slice to insert into the bucket.
170
func serializeResult(rp *paymentResult) ([]byte, []byte, error) {
12✔
171
        // Write timestamps, success status, failure source index and route.
12✔
172
        var b bytes.Buffer
12✔
173

12✔
174
        var dbFailureSourceIdx int32
12✔
175
        if rp.failureSourceIdx == nil {
13✔
176
                dbFailureSourceIdx = unknownFailureSourceIdx
1✔
177
        } else {
12✔
178
                dbFailureSourceIdx = int32(*rp.failureSourceIdx)
11✔
179
        }
11✔
180

181
        err := channeldb.WriteElements(
12✔
182
                &b,
12✔
183
                uint64(rp.timeFwd.UnixNano()),
12✔
184
                uint64(rp.timeReply.UnixNano()),
12✔
185
                rp.success, dbFailureSourceIdx,
12✔
186
        )
12✔
187
        if err != nil {
12✔
188
                return nil, nil, err
×
189
        }
×
190

191
        if err := serializeRoute(&b, rp.route); err != nil {
12✔
192
                return nil, nil, err
×
193
        }
×
194

195
        // Write failure. If there is no failure message, write an empty
196
        // byte slice.
197
        var failureBytes bytes.Buffer
12✔
198
        if rp.failure != nil {
23✔
199
                err := lnwire.EncodeFailureMessage(&failureBytes, rp.failure, 0)
11✔
200
                if err != nil {
11✔
201
                        return nil, nil, err
×
202
                }
×
203
        }
204
        err = wire.WriteVarBytes(&b, 0, failureBytes.Bytes())
12✔
205
        if err != nil {
12✔
206
                return nil, nil, err
×
207
        }
×
208

209
        // Compose key that identifies this result.
210
        key := getResultKey(rp)
12✔
211

12✔
212
        return key, b.Bytes(), nil
12✔
213
}
214

215
// deserializeRoute deserializes the mcRoute from the given io.Reader.
216
func deserializeRoute(r io.Reader) (*mcRoute, error) {
21✔
217
        var rt mcRoute
21✔
218
        if err := channeldb.ReadElements(r, &rt.totalAmount); err != nil {
21✔
219
                return nil, err
×
220
        }
×
221

222
        var pub []byte
21✔
223
        if err := channeldb.ReadElements(r, &pub); err != nil {
21✔
224
                return nil, err
×
225
        }
×
226
        copy(rt.sourcePubKey[:], pub)
21✔
227

21✔
228
        var numHops uint32
21✔
229
        if err := channeldb.ReadElements(r, &numHops); err != nil {
21✔
230
                return nil, err
×
231
        }
×
232

233
        var hops []*mcHop
21✔
234
        for i := uint32(0); i < numHops; i++ {
46✔
235
                hop, err := deserializeHop(r)
25✔
236
                if err != nil {
25✔
237
                        return nil, err
×
238
                }
×
239
                hops = append(hops, hop)
25✔
240
        }
241
        rt.hops = hops
21✔
242

21✔
243
        return &rt, nil
21✔
244
}
245

246
// deserializeHop deserializes the mcHop from the given io.Reader.
247
func deserializeHop(r io.Reader) (*mcHop, error) {
25✔
248
        var h mcHop
25✔
249

25✔
250
        var pub []byte
25✔
251
        if err := channeldb.ReadElements(r, &pub); err != nil {
25✔
252
                return nil, err
×
253
        }
×
254
        copy(h.pubKeyBytes[:], pub)
25✔
255

25✔
256
        if err := channeldb.ReadElements(r,
25✔
257
                &h.channelID, &h.amtToFwd, &h.hasBlindingPoint,
25✔
258
                &h.hasCustomRecords,
25✔
259
        ); err != nil {
25✔
260
                return nil, err
×
261
        }
×
262

263
        return &h, nil
25✔
264
}
265

266
// serializeRoute serializes a mcRoute and writes the resulting bytes to the
267
// given io.Writer.
268
func serializeRoute(w io.Writer, r *mcRoute) error {
12✔
269
        err := channeldb.WriteElements(w, r.totalAmount, r.sourcePubKey[:])
12✔
270
        if err != nil {
12✔
271
                return err
×
272
        }
×
273

274
        if err := channeldb.WriteElements(w, uint32(len(r.hops))); err != nil {
12✔
275
                return err
×
276
        }
×
277

278
        for _, h := range r.hops {
28✔
279
                if err := serializeHop(w, h); err != nil {
16✔
280
                        return err
×
281
                }
×
282
        }
283

284
        return nil
12✔
285
}
286

287
// serializeHop serializes a mcHop and writes the resulting bytes to the given
288
// io.Writer.
289
func serializeHop(w io.Writer, h *mcHop) error {
16✔
290
        return channeldb.WriteElements(w,
16✔
291
                h.pubKeyBytes[:],
16✔
292
                h.channelID,
16✔
293
                h.amtToFwd,
16✔
294
                h.hasBlindingPoint,
16✔
295
                h.hasCustomRecords,
16✔
296
        )
16✔
297
}
16✔
298

299
// deserializeResult deserializes a payment result.
300
func deserializeResult(k, v []byte) (*paymentResult, error) {
21✔
301
        // Parse payment id.
21✔
302
        result := paymentResult{
21✔
303
                id: byteOrder.Uint64(k[8:]),
21✔
304
        }
21✔
305

21✔
306
        r := bytes.NewReader(v)
21✔
307

21✔
308
        // Read timestamps, success status and failure source index.
21✔
309
        var (
21✔
310
                timeFwd, timeReply uint64
21✔
311
                dbFailureSourceIdx int32
21✔
312
        )
21✔
313

21✔
314
        err := channeldb.ReadElements(
21✔
315
                r, &timeFwd, &timeReply, &result.success, &dbFailureSourceIdx,
21✔
316
        )
21✔
317
        if err != nil {
21✔
318
                return nil, err
×
319
        }
×
320

321
        // Convert time stamps to local time zone for consistent logging.
322
        result.timeFwd = time.Unix(0, int64(timeFwd)).Local()
21✔
323
        result.timeReply = time.Unix(0, int64(timeReply)).Local()
21✔
324

21✔
325
        // Convert from unknown index magic number to nil value.
21✔
326
        if dbFailureSourceIdx != unknownFailureSourceIdx {
41✔
327
                failureSourceIdx := int(dbFailureSourceIdx)
20✔
328
                result.failureSourceIdx = &failureSourceIdx
20✔
329
        }
20✔
330

331
        // Read route.
332
        route, err := deserializeRoute(r)
21✔
333
        if err != nil {
21✔
334
                return nil, err
×
335
        }
×
336
        result.route = route
21✔
337

21✔
338
        // Read failure.
21✔
339
        failureBytes, err := wire.ReadVarBytes(
21✔
340
                r, 0, math.MaxUint16, "failure",
21✔
341
        )
21✔
342
        if err != nil {
21✔
343
                return nil, err
×
344
        }
×
345
        if len(failureBytes) > 0 {
41✔
346
                result.failure, err = lnwire.DecodeFailureMessage(
20✔
347
                        bytes.NewReader(failureBytes), 0,
20✔
348
                )
20✔
349
                if err != nil {
20✔
350
                        return nil, err
×
351
                }
×
352
        }
353

354
        return &result, nil
21✔
355
}
356

357
// AddResult adds a new result to the db.
358
func (b *missionControlStore) AddResult(rp *paymentResult) {
77✔
359
        b.queueCond.L.Lock()
77✔
360
        b.queue.PushBack(rp)
77✔
361
        b.queueCond.L.Unlock()
77✔
362

77✔
363
        b.queueCond.Signal()
77✔
364
}
77✔
365

366
// stop stops the store ticker goroutine.
367
func (b *missionControlStore) stop() {
2✔
368
        close(b.done)
2✔
369

2✔
370
        b.queueCond.Signal()
2✔
371

2✔
372
        b.wg.Wait()
2✔
373
}
2✔
374

375
// run runs the MC store ticker goroutine.
376
func (b *missionControlStore) run() {
2✔
377
        b.wg.Add(1)
2✔
378

2✔
379
        go func() {
4✔
380
                defer b.wg.Done()
2✔
381

2✔
382
                timer := time.NewTimer(b.flushInterval)
2✔
383

2✔
384
                // Immediately stop the timer. It will be started once new
2✔
385
                // items are added to the store. As the doc for time.Timer
2✔
386
                // states, every call to Stop() done on a timer that is not
2✔
387
                // known to have been fired needs to be checked and the timer's
2✔
388
                // channel needs to be drained appropriately. This could happen
2✔
389
                // if the flushInterval is very small (e.g. 1 nanosecond).
2✔
390
                if !timer.Stop() {
2✔
391
                        select {
×
392
                        case <-timer.C:
×
393
                        case <-b.done:
×
394
                                log.Debugf("Stopping mission control store")
×
395
                        }
396
                }
397

398
                for {
7✔
399
                        // Wait for the queue to not be empty.
5✔
400
                        b.queueCond.L.Lock()
5✔
401
                        for b.queue.Front() == nil {
10✔
402
                                b.queueCond.Wait()
5✔
403

5✔
404
                                select {
5✔
405
                                case <-b.done:
2✔
406
                                        b.queueCond.L.Unlock()
2✔
407

2✔
408
                                        return
2✔
409
                                default:
3✔
410
                                }
411
                        }
412
                        b.queueCond.L.Unlock()
3✔
413

3✔
414
                        // Restart the timer.
3✔
415
                        timer.Reset(b.flushInterval)
3✔
416

3✔
417
                        select {
3✔
418
                        case <-timer.C:
3✔
419
                                if err := b.storeResults(); err != nil {
3✔
420
                                        log.Errorf("Failed to update mission "+
×
421
                                                "control store: %v", err)
×
422
                                }
×
423

UNCOV
424
                        case <-b.done:
×
UNCOV
425
                                // Release the timer's resources.
×
UNCOV
426
                                if !timer.Stop() {
×
427
                                        select {
×
428
                                        case <-timer.C:
×
429
                                        case <-b.done:
×
430
                                                log.Debugf("Mission control " +
×
431
                                                        "store stopped")
×
432
                                        }
433
                                }
UNCOV
434
                                return
×
435
                        }
436
                }
437
        }()
438
}
439

440
// storeResults stores all accumulated results.
441
func (b *missionControlStore) storeResults() error {
8✔
442
        // We copy a reference to the queue and clear the original queue to be
8✔
443
        // able to release the lock.
8✔
444
        b.queueCond.L.Lock()
8✔
445
        l := b.queue
8✔
446

8✔
447
        if l.Len() == 0 {
8✔
448
                b.queueCond.L.Unlock()
×
449

×
450
                return nil
×
451
        }
×
452
        b.queue = list.New()
8✔
453
        b.queueCond.L.Unlock()
8✔
454

8✔
455
        var (
8✔
456
                newKeys    map[string]struct{}
8✔
457
                delKeys    []string
8✔
458
                storeCount int
8✔
459
                pruneCount int
8✔
460
        )
8✔
461

8✔
462
        // Create a deduped list of new entries.
8✔
463
        newKeys = make(map[string]struct{}, l.Len())
8✔
464
        for e := l.Front(); e != nil; e = e.Next() {
20✔
465
                pr, ok := e.Value.(*paymentResult)
12✔
466
                if !ok {
12✔
467
                        return fmt.Errorf("wrong type %T (not *paymentResult)",
×
468
                                e.Value)
×
469
                }
×
470
                key := string(getResultKey(pr))
12✔
471
                if _, ok := b.keysMap[key]; ok {
12✔
472
                        l.Remove(e)
×
473
                        continue
×
474
                }
475
                if _, ok := newKeys[key]; ok {
13✔
476
                        l.Remove(e)
1✔
477
                        continue
1✔
478
                }
479
                newKeys[key] = struct{}{}
11✔
480
        }
481

482
        // Create a list of entries to delete.
483
        toDelete := b.keys.Len() + len(newKeys) - b.maxRecords
8✔
484
        if b.maxRecords > 0 && toDelete > 0 {
11✔
485
                delKeys = make([]string, 0, toDelete)
3✔
486

3✔
487
                // Delete as many as needed from old keys.
3✔
488
                for e := b.keys.Front(); len(delKeys) < toDelete && e != nil; {
7✔
489
                        key, ok := e.Value.(string)
4✔
490
                        if !ok {
4✔
491
                                return fmt.Errorf("wrong type %T (not string)",
×
492
                                        e.Value)
×
493
                        }
×
494
                        delKeys = append(delKeys, key)
4✔
495
                        e = e.Next()
4✔
496
                }
497

498
                // If more deletions are needed, simply do not add from the
499
                // list of new keys.
500
                for e := l.Front(); len(delKeys) < toDelete && e != nil; {
3✔
501
                        toDelete--
×
502
                        pr, ok := e.Value.(*paymentResult)
×
503
                        if !ok {
×
504
                                return fmt.Errorf("wrong type %T (not "+
×
505
                                        "*paymentResult )", e.Value)
×
506
                        }
×
507
                        key := string(getResultKey(pr))
×
508
                        delete(newKeys, key)
×
509
                        l.Remove(e)
×
510
                        e = l.Front()
×
511
                }
512
        }
513

514
        err := b.db.update(func(bucket kvdb.RwBucket) error {
16✔
515
                for e := l.Front(); e != nil; e = e.Next() {
20✔
516
                        pr, ok := e.Value.(*paymentResult)
12✔
517
                        if !ok {
12✔
518
                                return fmt.Errorf("wrong type %T (not "+
×
519
                                        "*paymentResult)", e.Value)
×
520
                        }
×
521

522
                        // Serialize result into key and value byte slices.
523
                        k, v, err := serializeResult(pr)
12✔
524
                        if err != nil {
12✔
525
                                return err
×
526
                        }
×
527

528
                        // Put into results bucket.
529
                        if err := bucket.Put(k, v); err != nil {
12✔
530
                                return err
×
531
                        }
×
532

533
                        storeCount++
12✔
534
                }
535

536
                // Prune oldest entries.
537
                for _, key := range delKeys {
12✔
538
                        if err := bucket.Delete([]byte(key)); err != nil {
4✔
539
                                return err
×
540
                        }
×
541
                        pruneCount++
4✔
542
                }
543

544
                return nil
8✔
545
        }, func() {
8✔
546
                storeCount, pruneCount = 0, 0
8✔
547
        })
8✔
548

549
        if err != nil {
8✔
550
                return err
×
551
        }
×
552

553
        log.Debugf("Stored mission control results: %d added, %d deleted",
8✔
554
                storeCount, pruneCount)
8✔
555

8✔
556
        // DB Update was successful, update the in-memory cache.
8✔
557
        for _, key := range delKeys {
12✔
558
                delete(b.keysMap, key)
4✔
559
                b.keys.Remove(b.keys.Front())
4✔
560
        }
4✔
561
        for e := l.Front(); e != nil; e = e.Next() {
20✔
562
                pr, ok := e.Value.(*paymentResult)
12✔
563
                if !ok {
12✔
564
                        return fmt.Errorf("wrong type %T (not *paymentResult)",
×
565
                                e.Value)
×
566
                }
×
567
                key := string(getResultKey(pr))
12✔
568
                b.keys.PushBack(key)
12✔
569
        }
570

571
        return nil
8✔
572
}
573

574
// getResultKey returns a byte slice representing a unique key for this payment
575
// result.
576
func getResultKey(rp *paymentResult) []byte {
36✔
577
        var keyBytes [8 + 8 + 33]byte
36✔
578

36✔
579
        // Identify records by a combination of time, payment id and sender pub
36✔
580
        // key. This allows importing mission control data from an external
36✔
581
        // source without key collisions and keeps the records sorted
36✔
582
        // chronologically.
36✔
583
        byteOrder.PutUint64(keyBytes[:], uint64(rp.timeReply.UnixNano()))
36✔
584
        byteOrder.PutUint64(keyBytes[8:], rp.id)
36✔
585
        copy(keyBytes[16:], rp.route.sourcePubKey[:])
36✔
586

36✔
587
        return keyBytes[:]
36✔
588
}
36✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc