• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11219354629

07 Oct 2024 03:56PM UTC coverage: 58.585% (-0.2%) from 58.814%
11219354629

Pull #9147

github

ziggie1984
fixup! sqlc: migration up script for payments.
Pull Request #9147: [Part 1|3] Introduce SQL Payment schema into LND

130227 of 222287 relevant lines covered (58.59%)

29106.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.33
/routing/missioncontrol_store.go
1
package routing
2

3
import (
4
        "bytes"
5
        "container/list"
6
        "encoding/binary"
7
        "fmt"
8
        "io"
9
        "math"
10
        "sync"
11
        "time"
12

13
        "github.com/btcsuite/btcd/wire"
14
        "github.com/lightningnetwork/lnd/channeldb"
15
        "github.com/lightningnetwork/lnd/kvdb"
16
        "github.com/lightningnetwork/lnd/lnwire"
17
)
18

19
var (
20
        // resultsKey is the fixed key under which the attempt results are
21
        // stored.
22
        resultsKey = []byte("missioncontrol-results")
23

24
        // Big endian is the preferred byte order, due to cursor scans over
25
        // integer keys iterating in order.
26
        byteOrder = binary.BigEndian
27
)
28

29
const (
30
        // unknownFailureSourceIdx is the database encoding of an unknown error
31
        // source.
32
        unknownFailureSourceIdx = -1
33
)
34

35
// missionControlStore is a bolt db based implementation of a mission control
36
// store. It stores the raw payment attempt data from which the internal mission
37
// controls state can be rederived on startup. This allows the mission control
38
// internal data structure to be changed without requiring a database migration.
39
// Also changes to mission control parameters can be applied to historical data.
40
// Finally, it enables importing raw data from an external source.
41
type missionControlStore struct {
42
        done chan struct{}
43
        wg   sync.WaitGroup
44
        db   kvdb.Backend
45

46
        // queueCond is signalled when items are put into the queue.
47
        queueCond *sync.Cond
48

49
        // queue stores all pending payment results not yet added to the store.
50
        // Access is protected by the queueCond.L mutex.
51
        queue *list.List
52

53
        // keys holds the stored MC store item keys in the order of storage.
54
        // We use this list when adding/deleting items from the database to
55
        // avoid cursor use which may be slow in the remote DB case.
56
        keys *list.List
57

58
        // keysMap holds the stored MC store item keys. We use this map to check
59
        // if a new payment result has already been stored.
60
        keysMap map[string]struct{}
61

62
        // maxRecords is the maximum amount of records we will store in the db.
63
        maxRecords int
64

65
        // flushInterval is the configured interval we use to store new results
66
        // and delete outdated ones from the db.
67
        flushInterval time.Duration
68
}
69

70
func newMissionControlStore(db kvdb.Backend, maxRecords int,
71
        flushInterval time.Duration) (*missionControlStore, error) {
72

73
        var (
74
                keys    *list.List
75
                keysMap map[string]struct{}
76
        )
77

78
        // Create buckets if not yet existing.
79
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
80
                resultsBucket, err := tx.CreateTopLevelBucket(resultsKey)
81
                if err != nil {
82
                        return fmt.Errorf("cannot create results bucket: %w",
83
                                err)
84
                }
85

86
                // Collect all keys to be able to quickly calculate the
39✔
87
                // difference when updating the DB state.
39✔
88
                c := resultsBucket.ReadCursor()
39✔
89
                for k, _ := c.First(); k != nil; k, _ = c.Next() {
39✔
90
                        keys.PushBack(string(k))
39✔
91
                        keysMap[string(k)] = struct{}{}
39✔
92
                }
39✔
93

39✔
94
                return nil
78✔
95
        }, func() {
39✔
96
                keys = list.New()
39✔
97
                keysMap = make(map[string]struct{})
39✔
98
        })
49✔
99
        if err != nil {
10✔
100
                return nil, err
10✔
101
        }
10✔
102

103
        log.Infof("Loaded %d mission control entries", len(keysMap))
39✔
104

39✔
105
        return &missionControlStore{
39✔
106
                done:          make(chan struct{}),
39✔
107
                db:            db,
39✔
108
                queueCond:     sync.NewCond(&sync.Mutex{}),
39✔
109
                queue:         list.New(),
×
110
                keys:          keys,
×
111
                keysMap:       keysMap,
112
                maxRecords:    maxRecords,
39✔
113
                flushInterval: flushInterval,
39✔
114
        }, nil
39✔
115
}
39✔
116

39✔
117
// clear removes all results from the db.
39✔
118
func (b *missionControlStore) clear() error {
39✔
119
        b.queueCond.L.Lock()
39✔
120
        defer b.queueCond.L.Unlock()
39✔
121

39✔
122
        err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
39✔
123
                if err := tx.DeleteTopLevelBucket(resultsKey); err != nil {
39✔
124
                        return err
125
                }
126

127
                _, err := tx.CreateTopLevelBucket(resultsKey)
5✔
128
                return err
5✔
129
        }, func() {})
5✔
130

5✔
131
        if err != nil {
5✔
132
                return err
×
133
        }
×
134

135
        b.queue = list.New()
5✔
136
        return nil
5✔
137
}
5✔
138

139
// fetchAll returns all results currently stored in the database.
140
func (b *missionControlStore) fetchAll() ([]*paymentResult, error) {
141
        var results []*paymentResult
48✔
142

48✔
143
        err := kvdb.View(b.db, func(tx kvdb.RTx) error {
48✔
144
                resultBucket := tx.ReadBucket(resultsKey)
96✔
145
                results = make([]*paymentResult, 0)
48✔
146

48✔
147
                return resultBucket.ForEach(func(k, v []byte) error {
71✔
148
                        result, err := deserializeResult(k, v)
23✔
149
                        if err != nil {
23✔
150
                                return err
×
151
                        }
×
152

153
                        results = append(results, result)
23✔
154

23✔
155
                        return nil
23✔
156
                })
157

158
        }, func() {
48✔
159
                results = nil
48✔
160
        })
48✔
161
        if err != nil {
48✔
162
                return nil, err
×
163
        }
×
164

165
        return results, nil
48✔
166
}
167

168
// serializeResult serializes a payment result and returns a key and value byte
169
// slice to insert into the bucket.
170
func serializeResult(rp *paymentResult) ([]byte, []byte, error) {
14✔
171
        // Write timestamps, success status, failure source index and route.
14✔
172
        var b bytes.Buffer
14✔
173

14✔
174
        var dbFailureSourceIdx int32
14✔
175
        if rp.failureSourceIdx == nil {
17✔
176
                dbFailureSourceIdx = unknownFailureSourceIdx
3✔
177
        } else {
16✔
178
                dbFailureSourceIdx = int32(*rp.failureSourceIdx)
13✔
179
        }
13✔
180

181
        err := channeldb.WriteElements(
14✔
182
                &b,
14✔
183
                uint64(rp.timeFwd.UnixNano()),
14✔
184
                uint64(rp.timeReply.UnixNano()),
14✔
185
                rp.success, dbFailureSourceIdx,
14✔
186
        )
14✔
187
        if err != nil {
14✔
188
                return nil, nil, err
×
189
        }
×
190

191
        if err := serializeRoute(&b, rp.route); err != nil {
14✔
192
                return nil, nil, err
×
193
        }
×
194

195
        // Write failure. If there is no failure message, write an empty
196
        // byte slice.
197
        var failureBytes bytes.Buffer
14✔
198
        if rp.failure != nil {
27✔
199
                err := lnwire.EncodeFailureMessage(&failureBytes, rp.failure, 0)
13✔
200
                if err != nil {
13✔
201
                        return nil, nil, err
×
202
                }
×
203
        }
204
        err = wire.WriteVarBytes(&b, 0, failureBytes.Bytes())
14✔
205
        if err != nil {
14✔
206
                return nil, nil, err
×
207
        }
×
208

209
        // Compose key that identifies this result.
210
        key := getResultKey(rp)
14✔
211

14✔
212
        return key, b.Bytes(), nil
14✔
213
}
214

215
// deserializeRoute deserializes the mcRoute from the given io.Reader.
216
func deserializeRoute(r io.Reader) (*mcRoute, error) {
23✔
217
        var rt mcRoute
23✔
218
        if err := channeldb.ReadElements(r, &rt.totalAmount); err != nil {
23✔
219
                return nil, err
×
220
        }
×
221

222
        var pub []byte
23✔
223
        if err := channeldb.ReadElements(r, &pub); err != nil {
23✔
224
                return nil, err
×
225
        }
×
226
        copy(rt.sourcePubKey[:], pub)
23✔
227

23✔
228
        var numHops uint32
23✔
229
        if err := channeldb.ReadElements(r, &numHops); err != nil {
23✔
230
                return nil, err
×
231
        }
×
232

233
        var hops []*mcHop
23✔
234
        for i := uint32(0); i < numHops; i++ {
50✔
235
                hop, err := deserializeHop(r)
27✔
236
                if err != nil {
27✔
237
                        return nil, err
×
238
                }
×
239
                hops = append(hops, hop)
27✔
240
        }
241
        rt.hops = hops
23✔
242

23✔
243
        return &rt, nil
23✔
244
}
245

246
// deserializeHop deserializes the mcHop from the given io.Reader.
247
func deserializeHop(r io.Reader) (*mcHop, error) {
27✔
248
        var h mcHop
27✔
249

27✔
250
        var pub []byte
27✔
251
        if err := channeldb.ReadElements(r, &pub); err != nil {
27✔
252
                return nil, err
×
253
        }
×
254
        copy(h.pubKeyBytes[:], pub)
27✔
255

27✔
256
        if err := channeldb.ReadElements(r,
27✔
257
                &h.channelID, &h.amtToFwd, &h.hasBlindingPoint,
27✔
258
                &h.hasCustomRecords,
27✔
259
        ); err != nil {
27✔
260
                return nil, err
×
261
        }
×
262

263
        return &h, nil
27✔
264
}
265

266
// serializeRoute serializes a mcRoute and writes the resulting bytes to the
267
// given io.Writer.
268
func serializeRoute(w io.Writer, r *mcRoute) error {
14✔
269
        err := channeldb.WriteElements(w, r.totalAmount, r.sourcePubKey[:])
14✔
270
        if err != nil {
14✔
271
                return err
×
272
        }
×
273

274
        if err := channeldb.WriteElements(w, uint32(len(r.hops))); err != nil {
14✔
275
                return err
×
276
        }
×
277

278
        for _, h := range r.hops {
32✔
279
                if err := serializeHop(w, h); err != nil {
18✔
280
                        return err
×
281
                }
×
282
        }
283

284
        return nil
14✔
285
}
286

287
// serializeHop serializes a mcHop and writes the resulting bytes to the given
288
// io.Writer.
289
func serializeHop(w io.Writer, h *mcHop) error {
18✔
290
        return channeldb.WriteElements(w,
18✔
291
                h.pubKeyBytes[:],
18✔
292
                h.channelID,
18✔
293
                h.amtToFwd,
18✔
294
                h.hasBlindingPoint,
18✔
295
                h.hasCustomRecords,
18✔
296
        )
18✔
297
}
18✔
298

299
// deserializeResult deserializes a payment result.
300
func deserializeResult(k, v []byte) (*paymentResult, error) {
23✔
301
        // Parse payment id.
23✔
302
        result := paymentResult{
23✔
303
                id: byteOrder.Uint64(k[8:]),
23✔
304
        }
23✔
305

23✔
306
        r := bytes.NewReader(v)
23✔
307

23✔
308
        // Read timestamps, success status and failure source index.
23✔
309
        var (
23✔
310
                timeFwd, timeReply uint64
23✔
311
                dbFailureSourceIdx int32
23✔
312
        )
23✔
313

23✔
314
        err := channeldb.ReadElements(
23✔
315
                r, &timeFwd, &timeReply, &result.success, &dbFailureSourceIdx,
23✔
316
        )
23✔
317
        if err != nil {
23✔
318
                return nil, err
×
319
        }
×
320

321
        // Convert time stamps to local time zone for consistent logging.
322
        result.timeFwd = time.Unix(0, int64(timeFwd)).Local()
23✔
323
        result.timeReply = time.Unix(0, int64(timeReply)).Local()
23✔
324

23✔
325
        // Convert from unknown index magic number to nil value.
23✔
326
        if dbFailureSourceIdx != unknownFailureSourceIdx {
45✔
327
                failureSourceIdx := int(dbFailureSourceIdx)
22✔
328
                result.failureSourceIdx = &failureSourceIdx
22✔
329
        }
22✔
330

331
        // Read route.
332
        route, err := deserializeRoute(r)
23✔
333
        if err != nil {
23✔
334
                return nil, err
×
335
        }
×
336
        result.route = route
23✔
337

23✔
338
        // Read failure.
23✔
339
        failureBytes, err := wire.ReadVarBytes(
23✔
340
                r, 0, math.MaxUint16, "failure",
23✔
341
        )
23✔
342
        if err != nil {
23✔
343
                return nil, err
×
344
        }
×
345
        if len(failureBytes) > 0 {
45✔
346
                result.failure, err = lnwire.DecodeFailureMessage(
22✔
347
                        bytes.NewReader(failureBytes), 0,
22✔
348
                )
22✔
349
                if err != nil {
22✔
350
                        return nil, err
×
351
                }
×
352
        }
353

354
        return &result, nil
23✔
355
}
356

357
// AddResult adds a new result to the db.
358
func (b *missionControlStore) AddResult(rp *paymentResult) {
79✔
359
        b.queueCond.L.Lock()
79✔
360
        b.queue.PushBack(rp)
79✔
361
        b.queueCond.L.Unlock()
79✔
362

79✔
363
        b.queueCond.Signal()
79✔
364
}
79✔
365

366
// stop stops the store ticker goroutine.
367
func (b *missionControlStore) stop() {
4✔
368
        close(b.done)
4✔
369

4✔
370
        b.queueCond.Signal()
4✔
371

4✔
372
        b.wg.Wait()
4✔
373
}
4✔
374

375
// run runs the MC store ticker goroutine.
376
func (b *missionControlStore) run() {
4✔
377
        b.wg.Add(1)
4✔
378

4✔
379
        go func() {
8✔
380
                defer b.wg.Done()
4✔
381

4✔
382
                timer := time.NewTimer(b.flushInterval)
4✔
383

4✔
384
                // Immediately stop the timer. It will be started once new
4✔
385
                // items are added to the store. As the doc for time.Timer
4✔
386
                // states, every call to Stop() done on a timer that is not
4✔
387
                // known to have been fired needs to be checked and the timer's
4✔
388
                // channel needs to be drained appropriately. This could happen
4✔
389
                // if the flushInterval is very small (e.g. 1 nanosecond).
4✔
390
                if !timer.Stop() {
4✔
391
                        select {
×
392
                        case <-timer.C:
×
393
                        case <-b.done:
×
394
                                log.Debugf("Stopping mission control store")
×
395
                        }
396
                }
397

398
                for {
11✔
399
                        // Wait for the queue to not be empty.
7✔
400
                        b.queueCond.L.Lock()
7✔
401
                        for b.queue.Front() == nil {
14✔
402
                                b.queueCond.Wait()
7✔
403

7✔
404
                                select {
7✔
405
                                case <-b.done:
4✔
406
                                        b.queueCond.L.Unlock()
4✔
407

4✔
408
                                        return
4✔
409
                                default:
5✔
410
                                }
411
                        }
412
                        b.queueCond.L.Unlock()
5✔
413

5✔
414
                        // Restart the timer.
5✔
415
                        timer.Reset(b.flushInterval)
5✔
416

5✔
417
                        select {
5✔
418
                        case <-timer.C:
5✔
419
                                if err := b.storeResults(); err != nil {
5✔
420
                                        log.Errorf("Failed to update mission "+
×
421
                                                "control store: %v", err)
×
422
                                }
×
423

424
                        case <-b.done:
2✔
425
                                // Release the timer's resources.
2✔
426
                                if !timer.Stop() {
2✔
427
                                        select {
×
428
                                        case <-timer.C:
×
429
                                        case <-b.done:
×
430
                                                log.Debugf("Mission control " +
×
431
                                                        "store stopped")
×
432
                                        }
433
                                }
434
                                return
2✔
435
                        }
436
                }
437
        }()
438
}
439

440
// storeResults stores all accumulated results.
441
func (b *missionControlStore) storeResults() error {
10✔
442
        // We copy a reference to the queue and clear the original queue to be
10✔
443
        // able to release the lock.
10✔
444
        b.queueCond.L.Lock()
10✔
445
        l := b.queue
10✔
446

10✔
447
        if l.Len() == 0 {
10✔
448
                b.queueCond.L.Unlock()
×
449

×
450
                return nil
×
451
        }
×
452
        b.queue = list.New()
10✔
453
        b.queueCond.L.Unlock()
10✔
454

10✔
455
        var (
10✔
456
                newKeys    map[string]struct{}
10✔
457
                delKeys    []string
10✔
458
                storeCount int
10✔
459
                pruneCount int
10✔
460
        )
10✔
461

10✔
462
        // Create a deduped list of new entries.
10✔
463
        newKeys = make(map[string]struct{}, l.Len())
10✔
464
        for e := l.Front(); e != nil; e = e.Next() {
24✔
465
                pr, ok := e.Value.(*paymentResult)
14✔
466
                if !ok {
14✔
467
                        return fmt.Errorf("wrong type %T (not *paymentResult)",
×
468
                                e.Value)
×
469
                }
×
470
                key := string(getResultKey(pr))
14✔
471
                if _, ok := b.keysMap[key]; ok {
14✔
472
                        l.Remove(e)
×
473
                        continue
×
474
                }
475
                if _, ok := newKeys[key]; ok {
15✔
476
                        l.Remove(e)
1✔
477
                        continue
1✔
478
                }
479
                newKeys[key] = struct{}{}
13✔
480
        }
481

482
        // Create a list of entries to delete.
483
        toDelete := b.keys.Len() + len(newKeys) - b.maxRecords
10✔
484
        if b.maxRecords > 0 && toDelete > 0 {
13✔
485
                delKeys = make([]string, 0, toDelete)
3✔
486

3✔
487
                // Delete as many as needed from old keys.
3✔
488
                for e := b.keys.Front(); len(delKeys) < toDelete && e != nil; {
7✔
489
                        key, ok := e.Value.(string)
4✔
490
                        if !ok {
4✔
491
                                return fmt.Errorf("wrong type %T (not string)",
×
492
                                        e.Value)
×
493
                        }
×
494
                        delKeys = append(delKeys, key)
4✔
495
                        e = e.Next()
4✔
496
                }
497

498
                // If more deletions are needed, simply do not add from the
499
                // list of new keys.
500
                for e := l.Front(); len(delKeys) < toDelete && e != nil; {
3✔
501
                        toDelete--
×
502
                        pr, ok := e.Value.(*paymentResult)
×
503
                        if !ok {
×
504
                                return fmt.Errorf("wrong type %T (not "+
×
505
                                        "*paymentResult )", e.Value)
×
506
                        }
×
507
                        key := string(getResultKey(pr))
×
508
                        delete(newKeys, key)
×
509
                        l.Remove(e)
×
510
                        e = l.Front()
×
511
                }
512
        }
513

514
        err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
20✔
515
                bucket := tx.ReadWriteBucket(resultsKey)
24✔
516

14✔
517
                for e := l.Front(); e != nil; e = e.Next() {
14✔
518
                        pr, ok := e.Value.(*paymentResult)
×
519
                        if !ok {
×
520
                                return fmt.Errorf("wrong type %T (not "+
×
521
                                        "*paymentResult)", e.Value)
522
                        }
523

14✔
524
                        // Serialize result into key and value byte slices.
14✔
525
                        k, v, err := serializeResult(pr)
×
526
                        if err != nil {
×
527
                                return err
528
                        }
529

14✔
530
                        // Put into results bucket.
×
531
                        if err := bucket.Put(k, v); err != nil {
×
532
                                return err
533
                        }
14✔
534

535
                        storeCount++
536
                }
537

14✔
538
                // Prune oldest entries.
4✔
539
                for _, key := range delKeys {
×
540
                        if err := bucket.Delete([]byte(key)); err != nil {
×
541
                                return err
4✔
542
                        }
543
                        pruneCount++
544
                }
10✔
545

10✔
546
                return nil
10✔
547
        }, func() {
10✔
548
                storeCount, pruneCount = 0, 0
549
        })
10✔
550

×
551
        if err != nil {
×
552
                return err
553
        }
10✔
554

10✔
555
        log.Debugf("Stored mission control results: %d added, %d deleted",
10✔
556
                storeCount, pruneCount)
10✔
557

14✔
558
        // DB Update was successful, update the in-memory cache.
4✔
559
        for _, key := range delKeys {
4✔
560
                delete(b.keysMap, key)
4✔
561
                b.keys.Remove(b.keys.Front())
24✔
562
        }
14✔
563
        for e := l.Front(); e != nil; e = e.Next() {
14✔
564
                pr, ok := e.Value.(*paymentResult)
×
565
                if !ok {
×
566
                        return fmt.Errorf("wrong type %T (not *paymentResult)",
×
567
                                e.Value)
14✔
568
                }
14✔
569
                key := string(getResultKey(pr))
570
                b.keys.PushBack(key)
571
        }
10✔
572

573
        return nil
574
}
575

576
// getResultKey returns a byte slice representing a unique key for this payment
38✔
577
// result.
38✔
578
func getResultKey(rp *paymentResult) []byte {
38✔
579
        var keyBytes [8 + 8 + 33]byte
38✔
580

38✔
581
        // Identify records by a combination of time, payment id and sender pub
38✔
582
        // key. This allows importing mission control data from an external
38✔
583
        // source without key collisions and keeps the records sorted
38✔
584
        // chronologically.
38✔
585
        byteOrder.PutUint64(keyBytes[:], uint64(rp.timeReply.UnixNano()))
38✔
586
        byteOrder.PutUint64(keyBytes[8:], rp.id)
38✔
587
        copy(keyBytes[16:], rp.route.sourcePubKey[:])
38✔
588

38✔
589
        return keyBytes[:]
590
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc