• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.08
/routing/missioncontrol_store.go
1
package routing
2

3
import (
4
        "bytes"
5
        "container/list"
6
        "encoding/binary"
7
        "fmt"
8
        "io"
9
        "sync"
10
        "time"
11

12
        "github.com/lightningnetwork/lnd/kvdb"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
        "github.com/lightningnetwork/lnd/tlv"
15
)
16

17
var (
18
        // resultsKey is the fixed key under which the attempt results are
19
        // stored.
20
        resultsKey = []byte("missioncontrol-results")
21

22
        // byteOrder is the byte order used when encoding integers. Big endian
23
        // is preferred because cursor scans over integer keys iterate in order.
24
        byteOrder = binary.BigEndian
25
)
26

27
// missionControlDB is an interface that defines the database methods that a
28
// single missionControlStore has access to. It allows the missionControlStore
29
// to be unaware of the overall DB structure and restricts its access to the DB
30
// by only providing it the bucket that it needs to care about.
31
type missionControlDB interface {
32
        // update can be used to perform reads and writes on the given bucket.
        // NOTE(review): reset is presumably invoked before f runs (and again
        // before any retry) so callers can re-initialise captured state -
        // confirm against the implementation.
33
        update(f func(bkt kvdb.RwBucket) error, reset func()) error
34

35
        // view can be used to perform reads on the given bucket.
        // NOTE(review): reset presumably has the same role as in update -
        // confirm against the implementation.
36
        view(f func(bkt kvdb.RBucket) error, reset func()) error
37

38
        // purge will delete all the contents in this store.
39
        purge() error
40
}
41

42
// missionControlStore is a bolt db based implementation of a mission control
43
// store. It stores the raw payment attempt data from which the internal mission
44
// control's state can be rederived on startup. This allows the mission control
45
// internal data structure to be changed without requiring a database migration.
46
// Also changes to mission control parameters can be applied to historical data.
47
// Finally, it enables importing raw data from an external source.
48
type missionControlStore struct {
49
        done chan struct{}
50
        wg   sync.WaitGroup
51
        db   missionControlDB
52

53
        // TODO(yy): Remove the usage of sync.Cond - we are better off using
54
        // channels than a Cond as suggested in the official godoc.
55
        //
56
        // queueCond is signalled when items are put into the queue.
57
        queueCond *sync.Cond
58

59
        // queue stores all pending payment results not yet added to the store.
60
        // Access is protected by the queueCond.L mutex.
61
        queue *list.List
62

63
        // keys holds the stored MC store item keys in the order of storage.
64
        // We use this list when adding/deleting items from the database to
65
        // avoid cursor use which may be slow in the remote DB case.
66
        keys *list.List
67

68
        // keysMap holds the stored MC store item keys. We use this map to check
69
        // if a new payment result has already been stored.
70
        keysMap map[string]struct{}
71

72
        // maxRecords is the maximum number of records we will store in the db.
73
        maxRecords int
74

75
        // flushInterval is the configured interval we use to store new results
76
        // and delete outdated ones from the db.
77
        flushInterval time.Duration
78
}
79

80
func newMissionControlStore(db missionControlDB, maxRecords int,
81
        flushInterval time.Duration) (*missionControlStore, error) {
3✔
82

3✔
83
        var (
3✔
84
                keys    *list.List
3✔
85
                keysMap map[string]struct{}
3✔
86
        )
3✔
87

3✔
88
        // Create buckets if not yet existing.
3✔
89
        err := db.update(func(resultsBucket kvdb.RwBucket) error {
6✔
90
                // Collect all keys to be able to quickly calculate the
3✔
91
                // difference when updating the DB state.
3✔
92
                c := resultsBucket.ReadCursor()
3✔
93
                for k, _ := c.First(); k != nil; k, _ = c.Next() {
6✔
94
                        keys.PushBack(string(k))
3✔
95
                        keysMap[string(k)] = struct{}{}
3✔
96
                }
3✔
97

98
                return nil
3✔
99
        }, func() {
3✔
100
                keys = list.New()
3✔
101
                keysMap = make(map[string]struct{})
3✔
102
        })
3✔
103
        if err != nil {
3✔
104
                return nil, err
×
105
        }
×
106

107
        log.Infof("Loaded %d mission control entries", len(keysMap))
3✔
108

3✔
109
        return &missionControlStore{
3✔
110
                done:          make(chan struct{}),
3✔
111
                db:            db,
3✔
112
                queueCond:     sync.NewCond(&sync.Mutex{}),
3✔
113
                queue:         list.New(),
3✔
114
                keys:          keys,
3✔
115
                keysMap:       keysMap,
3✔
116
                maxRecords:    maxRecords,
3✔
117
                flushInterval: flushInterval,
3✔
118
        }, nil
3✔
119
}
120

121
// clear removes all results from the db.
122
func (b *missionControlStore) clear() error {
3✔
123
        b.queueCond.L.Lock()
3✔
124
        defer b.queueCond.L.Unlock()
3✔
125

3✔
126
        if err := b.db.purge(); err != nil {
3✔
127
                return err
×
128
        }
×
129

130
        b.queue = list.New()
3✔
131

3✔
132
        return nil
3✔
133
}
134

135
// fetchAll returns all results currently stored in the database.
136
func (b *missionControlStore) fetchAll() ([]*paymentResult, error) {
3✔
137
        var results []*paymentResult
3✔
138

3✔
139
        err := b.db.view(func(resultBucket kvdb.RBucket) error {
6✔
140
                results = make([]*paymentResult, 0)
3✔
141

3✔
142
                return resultBucket.ForEach(func(k, v []byte) error {
6✔
143
                        result, err := deserializeResult(k, v)
3✔
144
                        if err != nil {
3✔
145
                                return err
×
146
                        }
×
147

148
                        results = append(results, result)
3✔
149

3✔
150
                        return nil
3✔
151
                })
152

153
        }, func() {
3✔
154
                results = nil
3✔
155
        })
3✔
156
        if err != nil {
3✔
157
                return nil, err
×
158
        }
×
159

160
        return results, nil
3✔
161
}
162

163
// serializeResult serializes a payment result and returns a key and value byte
164
// slice to insert into the bucket.
165
func serializeResult(rp *paymentResult) ([]byte, []byte, error) {
3✔
166
        recordProducers := []tlv.RecordProducer{
3✔
167
                &rp.timeFwd,
3✔
168
                &rp.timeReply,
3✔
169
                &rp.route,
3✔
170
        }
3✔
171

3✔
172
        rp.failure.WhenSome(
3✔
173
                func(failure tlv.RecordT[tlv.TlvType3, paymentFailure]) {
6✔
174
                        recordProducers = append(recordProducers, &failure)
3✔
175
                },
3✔
176
        )
177

178
        // Compose key that identifies this result.
179
        key := getResultKey(rp)
3✔
180

3✔
181
        var buff bytes.Buffer
3✔
182
        err := lnwire.EncodeRecordsTo(
3✔
183
                &buff, lnwire.ProduceRecordsSorted(recordProducers...),
3✔
184
        )
3✔
185
        if err != nil {
3✔
186
                return nil, nil, err
×
187
        }
×
188

189
        return key, buff.Bytes(), err
3✔
190
}
191

192
// deserializeResult deserializes a payment result.
193
func deserializeResult(k, v []byte) (*paymentResult, error) {
3✔
194
        // Parse payment id.
3✔
195
        result := paymentResult{
3✔
196
                id: byteOrder.Uint64(k[8:]),
3✔
197
        }
3✔
198

3✔
199
        failure := tlv.ZeroRecordT[tlv.TlvType3, paymentFailure]()
3✔
200
        recordProducers := []tlv.RecordProducer{
3✔
201
                &result.timeFwd,
3✔
202
                &result.timeReply,
3✔
203
                &result.route,
3✔
204
                &failure,
3✔
205
        }
3✔
206

3✔
207
        r := bytes.NewReader(v)
3✔
208
        typeMap, err := lnwire.DecodeRecords(
3✔
209
                r, lnwire.ProduceRecordsSorted(recordProducers...)...,
3✔
210
        )
3✔
211
        if err != nil {
3✔
212
                return nil, err
×
213
        }
×
214

215
        if _, ok := typeMap[result.failure.TlvType()]; ok {
6✔
216
                result.failure = tlv.SomeRecordT(failure)
3✔
217
        }
3✔
218

219
        return &result, nil
3✔
220
}
221

222
// serializeRoute serializes a mcRoute and writes the resulting bytes to the
223
// given io.Writer.
224
func serializeRoute(w io.Writer, r *mcRoute) error {
3✔
225
        records := lnwire.ProduceRecordsSorted(
3✔
226
                &r.sourcePubKey,
3✔
227
                &r.totalAmount,
3✔
228
                &r.hops,
3✔
229
        )
3✔
230

3✔
231
        return lnwire.EncodeRecordsTo(w, records)
3✔
232
}
3✔
233

234
// deserializeRoute deserializes the mcRoute from the given io.Reader.
235
func deserializeRoute(r io.Reader) (*mcRoute, error) {
3✔
236
        var rt mcRoute
3✔
237
        records := lnwire.ProduceRecordsSorted(
3✔
238
                &rt.sourcePubKey,
3✔
239
                &rt.totalAmount,
3✔
240
                &rt.hops,
3✔
241
        )
3✔
242

3✔
243
        _, err := lnwire.DecodeRecords(r, records...)
3✔
244
        if err != nil {
3✔
245
                return nil, err
×
246
        }
×
247

248
        return &rt, nil
3✔
249
}
250

251
// deserializeHop deserializes the mcHop from the given io.Reader.
252
func deserializeHop(r io.Reader) (*mcHop, error) {
3✔
253
        var (
3✔
254
                h        mcHop
3✔
255
                blinding = tlv.ZeroRecordT[tlv.TlvType3, lnwire.TrueBoolean]()
3✔
256
                custom   = tlv.ZeroRecordT[tlv.TlvType4, lnwire.TrueBoolean]()
3✔
257
        )
3✔
258
        records := lnwire.ProduceRecordsSorted(
3✔
259
                &h.channelID,
3✔
260
                &h.pubKeyBytes,
3✔
261
                &h.amtToFwd,
3✔
262
                &blinding,
3✔
263
                &custom,
3✔
264
        )
3✔
265

3✔
266
        typeMap, err := lnwire.DecodeRecords(r, records...)
3✔
267
        if err != nil {
3✔
268
                return nil, err
×
269
        }
×
270

271
        if _, ok := typeMap[h.hasBlindingPoint.TlvType()]; ok {
3✔
272
                h.hasBlindingPoint = tlv.SomeRecordT(blinding)
×
273
        }
×
274

275
        if _, ok := typeMap[h.hasCustomRecords.TlvType()]; ok {
6✔
276
                h.hasCustomRecords = tlv.SomeRecordT(custom)
3✔
277
        }
3✔
278

279
        return &h, nil
3✔
280
}
281

282
// serializeHop serializes a mcHop and writes the resulting bytes to the given
283
// io.Writer.
284
func serializeHop(w io.Writer, h *mcHop) error {
3✔
285
        recordProducers := []tlv.RecordProducer{
3✔
286
                &h.channelID,
3✔
287
                &h.pubKeyBytes,
3✔
288
                &h.amtToFwd,
3✔
289
        }
3✔
290

3✔
291
        h.hasBlindingPoint.WhenSome(func(
3✔
292
                hasBlinding tlv.RecordT[tlv.TlvType3, lnwire.TrueBoolean]) {
6✔
293

3✔
294
                recordProducers = append(recordProducers, &hasBlinding)
3✔
295
        })
3✔
296

297
        h.hasCustomRecords.WhenSome(func(
3✔
298
                hasCustom tlv.RecordT[tlv.TlvType4, lnwire.TrueBoolean]) {
6✔
299

3✔
300
                recordProducers = append(recordProducers, &hasCustom)
3✔
301
        })
3✔
302

303
        return lnwire.EncodeRecordsTo(
3✔
304
                w, lnwire.ProduceRecordsSorted(recordProducers...),
3✔
305
        )
3✔
306
}
307

308
// AddResult adds a new result to the db.
309
func (b *missionControlStore) AddResult(rp *paymentResult) {
3✔
310
        b.queueCond.L.Lock()
3✔
311
        b.queue.PushBack(rp)
3✔
312
        b.queueCond.L.Unlock()
3✔
313

3✔
314
        b.queueCond.Signal()
3✔
315
}
3✔
316

317
// stop stops the store ticker goroutine.
318
func (b *missionControlStore) stop() {
3✔
319
        close(b.done)
3✔
320

3✔
321
        b.queueCond.Signal()
3✔
322

3✔
323
        b.wg.Wait()
3✔
324
}
3✔
325

326
// run runs the MC store ticker goroutine.
327
func (b *missionControlStore) run() {
3✔
328
        b.wg.Add(1)
3✔
329

3✔
330
        go func() {
6✔
331
                defer b.wg.Done()
3✔
332

3✔
333
                timer := time.NewTimer(b.flushInterval)
3✔
334

3✔
335
                // Immediately stop the timer. It will be started once new
3✔
336
                // items are added to the store. As the doc for time.Timer
3✔
337
                // states, every call to Stop() done on a timer that is not
3✔
338
                // known to have been fired needs to be checked and the timer's
3✔
339
                // channel needs to be drained appropriately. This could happen
3✔
340
                // if the flushInterval is very small (e.g. 1 nanosecond).
3✔
341
                if !timer.Stop() {
3✔
342
                        select {
×
343
                        case <-timer.C:
×
344
                        case <-b.done:
×
345
                                log.Debugf("Stopping mission control store")
×
346
                        }
347
                }
348

349
                for {
6✔
350
                        // Wait for the queue to not be empty.
3✔
351
                        b.queueCond.L.Lock()
3✔
352
                        for b.queue.Front() == nil {
6✔
353
                                // To make sure we can properly stop, we must
3✔
354
                                // read the `done` channel first before
3✔
355
                                // attempting to call `Wait()`. This is due to
3✔
356
                                // the fact when `Signal` is called before the
3✔
357
                                // `Wait` call, the `Wait` call will block
3✔
358
                                // indefinitely.
3✔
359
                                //
3✔
360
                                // TODO(yy): replace this with channels.
3✔
361
                                select {
3✔
362
                                case <-b.done:
3✔
363
                                        b.queueCond.L.Unlock()
3✔
364

3✔
365
                                        return
3✔
366
                                default:
3✔
367
                                }
368

369
                                b.queueCond.Wait()
3✔
370
                        }
371
                        b.queueCond.L.Unlock()
3✔
372

3✔
373
                        // Restart the timer.
3✔
374
                        timer.Reset(b.flushInterval)
3✔
375

3✔
376
                        select {
3✔
377
                        case <-timer.C:
3✔
378
                                if err := b.storeResults(); err != nil {
3✔
379
                                        log.Errorf("Failed to update mission "+
×
380
                                                "control store: %v", err)
×
381
                                }
×
382

383
                        case <-b.done:
3✔
384
                                // Release the timer's resources.
3✔
385
                                if !timer.Stop() {
3✔
386
                                        select {
×
387
                                        case <-timer.C:
×
388
                                        case <-b.done:
×
389
                                                log.Debugf("Mission control " +
×
390
                                                        "store stopped")
×
391
                                        }
392
                                }
393
                                return
3✔
394
                        }
395
                }
396
        }()
397
}
398

399
// storeResults stores all accumulated results.
400
func (b *missionControlStore) storeResults() error {
3✔
401
        // We copy a reference to the queue and clear the original queue to be
3✔
402
        // able to release the lock.
3✔
403
        b.queueCond.L.Lock()
3✔
404
        l := b.queue
3✔
405

3✔
406
        if l.Len() == 0 {
3✔
407
                b.queueCond.L.Unlock()
×
408

×
409
                return nil
×
410
        }
×
411
        b.queue = list.New()
3✔
412
        b.queueCond.L.Unlock()
3✔
413

3✔
414
        var (
3✔
415
                newKeys    map[string]struct{}
3✔
416
                delKeys    []string
3✔
417
                storeCount int
3✔
418
                pruneCount int
3✔
419
        )
3✔
420

3✔
421
        // Create a deduped list of new entries.
3✔
422
        newKeys = make(map[string]struct{}, l.Len())
3✔
423
        for e := l.Front(); e != nil; e = e.Next() {
6✔
424
                pr, ok := e.Value.(*paymentResult)
3✔
425
                if !ok {
3✔
426
                        return fmt.Errorf("wrong type %T (not *paymentResult)",
×
427
                                e.Value)
×
428
                }
×
429
                key := string(getResultKey(pr))
3✔
430
                if _, ok := b.keysMap[key]; ok {
3✔
431
                        l.Remove(e)
×
432
                        continue
×
433
                }
434
                if _, ok := newKeys[key]; ok {
3✔
UNCOV
435
                        l.Remove(e)
×
UNCOV
436
                        continue
×
437
                }
438
                newKeys[key] = struct{}{}
3✔
439
        }
440

441
        // Create a list of entries to delete.
442
        toDelete := b.keys.Len() + len(newKeys) - b.maxRecords
3✔
443
        if b.maxRecords > 0 && toDelete > 0 {
3✔
UNCOV
444
                delKeys = make([]string, 0, toDelete)
×
UNCOV
445

×
UNCOV
446
                // Delete as many as needed from old keys.
×
UNCOV
447
                for e := b.keys.Front(); len(delKeys) < toDelete && e != nil; {
×
UNCOV
448
                        key, ok := e.Value.(string)
×
UNCOV
449
                        if !ok {
×
450
                                return fmt.Errorf("wrong type %T (not string)",
×
451
                                        e.Value)
×
452
                        }
×
UNCOV
453
                        delKeys = append(delKeys, key)
×
UNCOV
454
                        e = e.Next()
×
455
                }
456

457
                // If more deletions are needed, simply do not add from the
458
                // list of new keys.
UNCOV
459
                for e := l.Front(); len(delKeys) < toDelete && e != nil; {
×
460
                        toDelete--
×
461
                        pr, ok := e.Value.(*paymentResult)
×
462
                        if !ok {
×
463
                                return fmt.Errorf("wrong type %T (not "+
×
464
                                        "*paymentResult )", e.Value)
×
465
                        }
×
466
                        key := string(getResultKey(pr))
×
467
                        delete(newKeys, key)
×
468
                        l.Remove(e)
×
469
                        e = l.Front()
×
470
                }
471
        }
472

473
        err := b.db.update(func(bucket kvdb.RwBucket) error {
6✔
474
                for e := l.Front(); e != nil; e = e.Next() {
6✔
475
                        pr, ok := e.Value.(*paymentResult)
3✔
476
                        if !ok {
3✔
477
                                return fmt.Errorf("wrong type %T (not "+
×
478
                                        "*paymentResult)", e.Value)
×
479
                        }
×
480

481
                        // Serialize result into key and value byte slices.
482
                        k, v, err := serializeResult(pr)
3✔
483
                        if err != nil {
3✔
484
                                return err
×
485
                        }
×
486

487
                        // Put into results bucket.
488
                        if err := bucket.Put(k, v); err != nil {
3✔
489
                                return err
×
490
                        }
×
491

492
                        storeCount++
3✔
493
                }
494

495
                // Prune oldest entries.
496
                for _, key := range delKeys {
3✔
UNCOV
497
                        if err := bucket.Delete([]byte(key)); err != nil {
×
498
                                return err
×
499
                        }
×
UNCOV
500
                        pruneCount++
×
501
                }
502

503
                return nil
3✔
504
        }, func() {
3✔
505
                storeCount, pruneCount = 0, 0
3✔
506
        })
3✔
507

508
        if err != nil {
3✔
509
                return err
×
510
        }
×
511

512
        log.Debugf("Stored mission control results: %d added, %d deleted",
3✔
513
                storeCount, pruneCount)
3✔
514

3✔
515
        // DB Update was successful, update the in-memory cache.
3✔
516
        for _, key := range delKeys {
3✔
UNCOV
517
                delete(b.keysMap, key)
×
UNCOV
518
                b.keys.Remove(b.keys.Front())
×
UNCOV
519
        }
×
520
        for e := l.Front(); e != nil; e = e.Next() {
6✔
521
                pr, ok := e.Value.(*paymentResult)
3✔
522
                if !ok {
3✔
523
                        return fmt.Errorf("wrong type %T (not *paymentResult)",
×
524
                                e.Value)
×
525
                }
×
526
                key := string(getResultKey(pr))
3✔
527
                b.keys.PushBack(key)
3✔
528
        }
529

530
        return nil
3✔
531
}
532

533
// getResultKey returns a byte slice representing a unique key for this payment
534
// result.
535
func getResultKey(rp *paymentResult) []byte {
3✔
536
        var keyBytes [8 + 8 + 33]byte
3✔
537

3✔
538
        // Identify records by a combination of time, payment id and sender pub
3✔
539
        // key. This allows importing mission control data from an external
3✔
540
        // source without key collisions and keeps the records sorted
3✔
541
        // chronologically.
3✔
542
        byteOrder.PutUint64(keyBytes[:], rp.timeReply.Val)
3✔
543
        byteOrder.PutUint64(keyBytes[8:], rp.id)
3✔
544
        copy(keyBytes[16:], rp.route.Val.sourcePubKey.Val[:])
3✔
545

3✔
546
        return keyBytes[:]
3✔
547
}
3✔
548

549
// failureMessage wraps the lnwire.FailureMessage interface such that we can
550
// apply a Record method and use the failureMessage in a TLV encoded type.
551
type failureMessage struct {
552
        lnwire.FailureMessage
553
}
554

555
// Record returns a TLV record that can be used to encode/decode a list of
556
// failureMessage to/from a TLV stream.
557
func (r *failureMessage) Record() tlv.Record {
3✔
558
        recordSize := func() uint64 {
6✔
559
                var (
3✔
560
                        b   bytes.Buffer
3✔
561
                        buf [8]byte
3✔
562
                )
3✔
563
                if err := encodeFailureMessage(&b, r, &buf); err != nil {
3✔
564
                        panic(err)
×
565
                }
566

567
                return uint64(len(b.Bytes()))
3✔
568
        }
569

570
        return tlv.MakeDynamicRecord(
3✔
571
                0, r, recordSize, encodeFailureMessage, decodeFailureMessage,
3✔
572
        )
3✔
573
}
574

575
func encodeFailureMessage(w io.Writer, val interface{}, _ *[8]byte) error {
3✔
576
        if v, ok := val.(*failureMessage); ok {
6✔
577
                var b bytes.Buffer
3✔
578
                err := lnwire.EncodeFailureMessage(&b, v.FailureMessage, 0)
3✔
579
                if err != nil {
3✔
580
                        return err
×
581
                }
×
582

583
                _, err = w.Write(b.Bytes())
3✔
584

3✔
585
                return err
3✔
586
        }
587

588
        return tlv.NewTypeForEncodingErr(val, "routing.failureMessage")
×
589
}
590

591
func decodeFailureMessage(r io.Reader, val interface{}, _ *[8]byte,
592
        l uint64) error {
3✔
593

3✔
594
        if v, ok := val.(*failureMessage); ok {
6✔
595
                msg, err := lnwire.DecodeFailureMessage(r, 0)
3✔
596
                if err != nil {
3✔
597
                        return err
×
598
                }
×
599

600
                *v = failureMessage{
3✔
601
                        FailureMessage: msg,
3✔
602
                }
3✔
603

3✔
604
                return nil
3✔
605
        }
606

607
        return tlv.NewTypeForDecodingErr(val, "routing.failureMessage", l, l)
×
608
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc