• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

23.84
/watchtower/wtdb/queue.go
1
package wtdb
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8

9
        "github.com/btcsuite/btcwallet/walletdb"
10
        "github.com/lightningnetwork/lnd/kvdb"
11
)
12

13
var (
14
        // queueMainBkt will hold the main queue contents. It will have the
15
        // following structure:
16
        //                         => oldestIndexKey => oldest index
17
        //                      => nextIndexKey => newest index
18
        //                        => itemsBkt => <index> -> item
19
        //
20
        // Any items added to the queue via Push, will be added to this queue.
21
        // Items will only be popped from this queue if the head queue is empty.
22
        queueMainBkt = []byte("queue-main")
23

24
        // queueHeadBkt will hold the items that have been pushed to the head
25
        // of the queue. It will have the following structure:
26
        //                         => oldestIndexKey => oldest index
27
        //                      => nextIndexKey => newest index
28
        //                        => itemsBkt => <index> -> item
29
        //
30
        // If PushHead is called with a new set of items, then first all
31
        // remaining items in the head queue will be popped and added ot the
32
        // given set of items. Then, once the head queue is empty, the set of
33
        // items will be pushed to the queue. If this queue is not empty, then
34
        // Pop will pop items from this queue before popping from the main
35
        // queue.
36
        queueHeadBkt = []byte("queue-head")
37

38
        // itemsBkt is a sub-bucket of both the main and head queue storing:
39
        //                 index -> encoded item
40
        itemsBkt = []byte("items")
41

42
        // oldestIndexKey is a key of both the main and head queue storing the
43
        // index of the item at the head of the queue.
44
        oldestIndexKey = []byte("oldest-index")
45

46
        // nextIndexKey is a key of both the main and head queue storing the
47
        // index of the item at the tail of the queue.
48
        nextIndexKey = []byte("next-index")
49

50
        // ErrEmptyQueue is returned from Pop if there are no items left in
51
        // the queue.
52
        ErrEmptyQueue = errors.New("queue is empty")
53
)
54

55
// Queue is an interface describing a FIFO queue for any generic type T.
56
type Queue[T any] interface {
57
        // Len returns the number of tasks in the queue.
58
        Len() (uint64, error)
59

60
        // Push pushes new T items to the tail of the queue.
61
        Push(items ...T) error
62

63
        // PopUpTo attempts to pop up to n items from the head of the queue. If
64
        // no more items are in the queue then ErrEmptyQueue is returned.
65
        PopUpTo(n int) ([]T, error)
66

67
        // PushHead pushes new T items to the head of the queue.
68
        PushHead(items ...T) error
69
}
70

71
// Serializable is an interface must be satisfied for any type that the
72
// DiskQueueDB should handle.
73
type Serializable interface {
74
        Encode(w io.Writer) error
75
        Decode(r io.Reader) error
76
}
77

78
// DiskQueueDB is a generic Bolt DB implementation of the Queue interface.
79
type DiskQueueDB[T Serializable] struct {
80
        db          kvdb.Backend
81
        topLevelBkt []byte
82
        constructor func() T
83
        onItemWrite func(tx kvdb.RwTx, item T) error
84
}
85

86
// A compile-time check to ensure that DiskQueueDB implements the Queue
87
// interface.
88
var _ Queue[Serializable] = (*DiskQueueDB[Serializable])(nil)
89

90
// NewQueueDB constructs a new DiskQueueDB. A queueBktName must be provided so
91
// that the DiskQueueDB can create its own namespace in the bolt db.
92
func NewQueueDB[T Serializable](db kvdb.Backend, queueBktName []byte,
93
        constructor func() T,
94
        onItemWrite func(tx kvdb.RwTx, item T) error) Queue[T] {
3✔
95

3✔
96
        return &DiskQueueDB[T]{
3✔
97
                db:          db,
3✔
98
                topLevelBkt: queueBktName,
3✔
99
                constructor: constructor,
3✔
100
                onItemWrite: onItemWrite,
3✔
101
        }
3✔
102
}
3✔
103

104
// Len returns the number of tasks in the queue.
105
//
106
// NOTE: This is part of the Queue interface.
107
func (d *DiskQueueDB[T]) Len() (uint64, error) {
3✔
108
        var res uint64
3✔
109
        err := kvdb.View(d.db, func(tx kvdb.RTx) error {
6✔
110
                var err error
3✔
111
                res, err = d.len(tx)
3✔
112

3✔
113
                return err
3✔
114
        }, func() {
6✔
115
                res = 0
3✔
116
        })
3✔
117
        if err != nil {
3✔
118
                return 0, err
×
119
        }
×
120

121
        return res, nil
3✔
122
}
123

124
// Push adds a T to the tail of the queue.
125
//
126
// NOTE: This is part of the Queue interface.
127
func (d *DiskQueueDB[T]) Push(items ...T) error {
3✔
128
        return d.db.Update(func(tx walletdb.ReadWriteTx) error {
6✔
129
                for _, item := range items {
3✔
UNCOV
130
                        err := d.addItem(tx, queueMainBkt, item)
×
UNCOV
131
                        if err != nil {
×
132
                                return err
×
133
                        }
×
134
                }
135

136
                return nil
3✔
137
        }, func() {})
3✔
138
}
139

140
// PopUpTo attempts to pop up to n items from the queue. If the queue is empty,
141
// then ErrEmptyQueue is returned.
142
//
143
// NOTE: This is part of the Queue interface.
UNCOV
144
func (d *DiskQueueDB[T]) PopUpTo(n int) ([]T, error) {
×
UNCOV
145
        var items []T
×
UNCOV
146

×
UNCOV
147
        err := d.db.Update(func(tx walletdb.ReadWriteTx) error {
×
UNCOV
148
                // Get the number of items in the queue.
×
UNCOV
149
                l, err := d.len(tx)
×
UNCOV
150
                if err != nil {
×
151
                        return err
×
152
                }
×
153

154
                // If there are no items, then we are done.
UNCOV
155
                if l == 0 {
×
UNCOV
156
                        return ErrEmptyQueue
×
UNCOV
157
                }
×
158

159
                // If the number of items in the queue is less than the maximum
160
                // specified by the caller, then set the maximum to the number
161
                // of items that there actually are.
UNCOV
162
                num := n
×
UNCOV
163
                if l < uint64(n) {
×
UNCOV
164
                        num = int(l)
×
UNCOV
165
                }
×
166

167
                // Pop the specified number of items off of the queue.
UNCOV
168
                items = make([]T, 0, num)
×
UNCOV
169
                for i := 0; i < num; i++ {
×
UNCOV
170
                        item, err := d.pop(tx)
×
UNCOV
171
                        if err != nil {
×
172
                                return err
×
173
                        }
×
174

UNCOV
175
                        items = append(items, item)
×
176
                }
177

UNCOV
178
                return err
×
UNCOV
179
        }, func() {
×
UNCOV
180
                items = nil
×
UNCOV
181
        })
×
UNCOV
182
        if err != nil {
×
UNCOV
183
                return nil, err
×
UNCOV
184
        }
×
185

UNCOV
186
        return items, nil
×
187
}
188

189
// PushHead pushes new T items to the head of the queue. For this implementation
190
// of the Queue interface, this will require popping all items currently in the
191
// head queue and adding them after first adding the given list of items. Care
192
// should thus be taken to never have an unbounded number of items in the head
193
// queue.
194
//
195
// NOTE: This is part of the Queue interface.
196
func (d *DiskQueueDB[T]) PushHead(items ...T) error {
3✔
197
        return d.db.Update(func(tx walletdb.ReadWriteTx) error {
6✔
198
                // Determine how many items are still in the head queue.
3✔
199
                numHead, err := d.numItems(tx, queueHeadBkt)
3✔
200
                if err != nil {
3✔
201
                        return err
×
202
                }
×
203

204
                // Create a new in-memory list that will contain all the new
205
                // items along with the items currently in the queue.
206
                itemList := make([]T, 0, int(numHead)+len(items))
3✔
207

3✔
208
                // Insert all the given items into the list first since these
3✔
209
                // should be at the head of the queue.
3✔
210
                itemList = append(itemList, items...)
3✔
211

3✔
212
                // Now, read out all the items that are currently in the
3✔
213
                // persisted head queue and add them to the back of the list
3✔
214
                // of items to be added.
3✔
215
                for {
6✔
216
                        t, err := d.nextItem(tx, queueHeadBkt)
3✔
217
                        if errors.Is(err, ErrEmptyQueue) {
6✔
218
                                break
3✔
UNCOV
219
                        } else if err != nil {
×
220
                                return err
×
221
                        }
×
222

UNCOV
223
                        itemList = append(itemList, t)
×
224
                }
225

226
                // Now the head queue is empty, the items can be pushed to the
227
                // queue.
228
                for _, item := range itemList {
3✔
UNCOV
229
                        err := d.addItem(tx, queueHeadBkt, item)
×
UNCOV
230
                        if err != nil {
×
231
                                return err
×
232
                        }
×
233
                }
234

235
                return nil
3✔
236
        }, func() {})
3✔
237
}
238

239
// pop gets the next T item from the head of the queue. If no more items are in
240
// the queue then ErrEmptyQueue is returned.
UNCOV
241
func (d *DiskQueueDB[T]) pop(tx walletdb.ReadWriteTx) (T, error) {
×
UNCOV
242
        // First, check if there are items left in the head queue.
×
UNCOV
243
        item, err := d.nextItem(tx, queueHeadBkt)
×
UNCOV
244

×
UNCOV
245
        // No error means that an item was found in the head queue.
×
UNCOV
246
        if err == nil {
×
UNCOV
247
                return item, nil
×
UNCOV
248
        }
×
249

250
        // Else, if error is not ErrEmptyQueue, then return the error.
UNCOV
251
        if !errors.Is(err, ErrEmptyQueue) {
×
252
                return item, err
×
253
        }
×
254

255
        // Otherwise, the head queue is empty, so we now check if there are
256
        // items in the main queue.
UNCOV
257
        return d.nextItem(tx, queueMainBkt)
×
258
}
259

260
// addItem adds the given item to the back of the given queue.
UNCOV
261
func (d *DiskQueueDB[T]) addItem(tx kvdb.RwTx, queueName []byte, item T) error {
×
UNCOV
262
        var (
×
UNCOV
263
                namespacedBkt = tx.ReadWriteBucket(d.topLevelBkt)
×
UNCOV
264
                err           error
×
UNCOV
265
        )
×
UNCOV
266
        if namespacedBkt == nil {
×
UNCOV
267
                namespacedBkt, err = tx.CreateTopLevelBucket(d.topLevelBkt)
×
UNCOV
268
                if err != nil {
×
269
                        return err
×
270
                }
×
271
        }
272

UNCOV
273
        mainTasksBucket, err := namespacedBkt.CreateBucketIfNotExists(
×
UNCOV
274
                cTaskQueue,
×
UNCOV
275
        )
×
UNCOV
276
        if err != nil {
×
277
                return err
×
278
        }
×
279

UNCOV
280
        bucket, err := mainTasksBucket.CreateBucketIfNotExists(queueName)
×
UNCOV
281
        if err != nil {
×
282
                return err
×
283
        }
×
284

UNCOV
285
        if d.onItemWrite != nil {
×
UNCOV
286
                err = d.onItemWrite(tx, item)
×
UNCOV
287
                if err != nil {
×
288
                        return err
×
289
                }
×
290
        }
291

292
        // Find the index to use for placing this new item at the back of the
293
        // queue.
UNCOV
294
        var nextIndex uint64
×
UNCOV
295
        nextIndexB := bucket.Get(nextIndexKey)
×
UNCOV
296
        if nextIndexB != nil {
×
UNCOV
297
                nextIndex, err = readBigSize(nextIndexB)
×
UNCOV
298
                if err != nil {
×
299
                        return err
×
300
                }
×
UNCOV
301
        } else {
×
UNCOV
302
                nextIndexB, err = writeBigSize(0)
×
UNCOV
303
                if err != nil {
×
304
                        return err
×
305
                }
×
306
        }
307

UNCOV
308
        tasksBucket, err := bucket.CreateBucketIfNotExists(itemsBkt)
×
UNCOV
309
        if err != nil {
×
310
                return err
×
311
        }
×
312

UNCOV
313
        var buff bytes.Buffer
×
UNCOV
314
        err = item.Encode(&buff)
×
UNCOV
315
        if err != nil {
×
316
                return err
×
317
        }
×
318

319
        // Put the new task in the assigned index.
UNCOV
320
        err = tasksBucket.Put(nextIndexB, buff.Bytes())
×
UNCOV
321
        if err != nil {
×
322
                return err
×
323
        }
×
324

325
        // Increment the next-index counter.
UNCOV
326
        nextIndex++
×
UNCOV
327
        nextIndexB, err = writeBigSize(nextIndex)
×
UNCOV
328
        if err != nil {
×
329
                return err
×
330
        }
×
331

UNCOV
332
        return bucket.Put(nextIndexKey, nextIndexB)
×
333
}
334

335
// nextItem pops an item of the queue identified by the given namespace. If
336
// there are no items on the queue then ErrEmptyQueue is returned.
337
func (d *DiskQueueDB[T]) nextItem(tx kvdb.RwTx, queueName []byte) (T, error) {
3✔
338
        task := d.constructor()
3✔
339

3✔
340
        namespacedBkt := tx.ReadWriteBucket(d.topLevelBkt)
3✔
341
        if namespacedBkt == nil {
6✔
342
                return task, ErrEmptyQueue
3✔
343
        }
3✔
344

UNCOV
345
        mainTasksBucket := namespacedBkt.NestedReadWriteBucket(cTaskQueue)
×
UNCOV
346
        if mainTasksBucket == nil {
×
347
                return task, ErrEmptyQueue
×
348
        }
×
349

UNCOV
350
        bucket, err := mainTasksBucket.CreateBucketIfNotExists(queueName)
×
UNCOV
351
        if err != nil {
×
352
                return task, err
×
353
        }
×
354

355
        // Get the index of the tail of the queue.
UNCOV
356
        var nextIndex uint64
×
UNCOV
357
        nextIndexB := bucket.Get(nextIndexKey)
×
UNCOV
358
        if nextIndexB != nil {
×
UNCOV
359
                nextIndex, err = readBigSize(nextIndexB)
×
UNCOV
360
                if err != nil {
×
361
                        return task, err
×
362
                }
×
363
        }
364

365
        // Get the index of the head of the queue.
UNCOV
366
        var oldestIndex uint64
×
UNCOV
367
        oldestIndexB := bucket.Get(oldestIndexKey)
×
UNCOV
368
        if oldestIndexB != nil {
×
UNCOV
369
                oldestIndex, err = readBigSize(oldestIndexB)
×
UNCOV
370
                if err != nil {
×
371
                        return task, err
×
372
                }
×
UNCOV
373
        } else {
×
UNCOV
374
                oldestIndexB, err = writeBigSize(0)
×
UNCOV
375
                if err != nil {
×
376
                        return task, err
×
377
                }
×
378
        }
379

380
        // If the head and tail are equal, then there are no items in the queue.
UNCOV
381
        if oldestIndex == nextIndex {
×
UNCOV
382
                // Take this opportunity to reset both indexes to zero.
×
UNCOV
383
                zeroIndexB, err := writeBigSize(0)
×
UNCOV
384
                if err != nil {
×
385
                        return task, err
×
386
                }
×
387

UNCOV
388
                err = bucket.Put(oldestIndexKey, zeroIndexB)
×
UNCOV
389
                if err != nil {
×
390
                        return task, err
×
391
                }
×
392

UNCOV
393
                err = bucket.Put(nextIndexKey, zeroIndexB)
×
UNCOV
394
                if err != nil {
×
395
                        return task, err
×
396
                }
×
397

UNCOV
398
                return task, ErrEmptyQueue
×
399
        }
400

401
        // Otherwise, pop the item at the oldest index.
UNCOV
402
        tasksBucket := bucket.NestedReadWriteBucket(itemsBkt)
×
UNCOV
403
        if tasksBucket == nil {
×
404
                return task, fmt.Errorf("client-tasks bucket not found")
×
405
        }
×
406

UNCOV
407
        item := tasksBucket.Get(oldestIndexB)
×
UNCOV
408
        if item == nil {
×
409
                return task, fmt.Errorf("no task found under index")
×
410
        }
×
411

UNCOV
412
        err = tasksBucket.Delete(oldestIndexB)
×
UNCOV
413
        if err != nil {
×
414
                return task, err
×
415
        }
×
416

417
        // Increment the oldestIndex value so that it now points to the new
418
        // oldest item.
UNCOV
419
        oldestIndex++
×
UNCOV
420
        oldestIndexB, err = writeBigSize(oldestIndex)
×
UNCOV
421
        if err != nil {
×
422
                return task, err
×
423
        }
×
424

UNCOV
425
        err = bucket.Put(oldestIndexKey, oldestIndexB)
×
UNCOV
426
        if err != nil {
×
427
                return task, err
×
428
        }
×
429

UNCOV
430
        if err = task.Decode(bytes.NewBuffer(item)); err != nil {
×
431
                return task, err
×
432
        }
×
433

UNCOV
434
        return task, nil
×
435
}
436

437
// len returns the number of items in the queue. This will be the addition of
438
// the number of items in the main queue and the number in the head queue.
439
func (d *DiskQueueDB[T]) len(tx kvdb.RTx) (uint64, error) {
3✔
440
        numMain, err := d.numItems(tx, queueMainBkt)
3✔
441
        if err != nil {
3✔
442
                return 0, err
×
443
        }
×
444

445
        numHead, err := d.numItems(tx, queueHeadBkt)
3✔
446
        if err != nil {
3✔
447
                return 0, err
×
448
        }
×
449

450
        return numMain + numHead, nil
3✔
451
}
452

453
// numItems returns the number of items in the given queue.
454
func (d *DiskQueueDB[T]) numItems(tx kvdb.RTx, queueName []byte) (uint64,
455
        error) {
3✔
456

3✔
457
        // Get the queue bucket at the correct namespace.
3✔
458
        namespacedBkt := tx.ReadBucket(d.topLevelBkt)
3✔
459
        if namespacedBkt == nil {
6✔
460
                return 0, nil
3✔
461
        }
3✔
462

UNCOV
463
        mainTasksBucket := namespacedBkt.NestedReadBucket(cTaskQueue)
×
UNCOV
464
        if mainTasksBucket == nil {
×
465
                return 0, nil
×
466
        }
×
467

UNCOV
468
        bucket := mainTasksBucket.NestedReadBucket(queueName)
×
UNCOV
469
        if bucket == nil {
×
UNCOV
470
                return 0, nil
×
UNCOV
471
        }
×
472

UNCOV
473
        var (
×
UNCOV
474
                oldestIndex uint64
×
UNCOV
475
                nextIndex   uint64
×
UNCOV
476
                err         error
×
UNCOV
477
        )
×
UNCOV
478

×
UNCOV
479
        // Get the next index key.
×
UNCOV
480
        nextIndexB := bucket.Get(nextIndexKey)
×
UNCOV
481
        if nextIndexB != nil {
×
UNCOV
482
                nextIndex, err = readBigSize(nextIndexB)
×
UNCOV
483
                if err != nil {
×
484
                        return 0, err
×
485
                }
×
486
        }
487

488
        // Get the oldest index.
UNCOV
489
        oldestIndexB := bucket.Get(oldestIndexKey)
×
UNCOV
490
        if oldestIndexB != nil {
×
UNCOV
491
                oldestIndex, err = readBigSize(oldestIndexB)
×
UNCOV
492
                if err != nil {
×
493
                        return 0, err
×
494
                }
×
495
        }
496

UNCOV
497
        return nextIndex - oldestIndex, nil
×
498
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc