• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13408822928

19 Feb 2025 08:59AM UTC coverage: 41.123% (-17.7%) from 58.794%
13408822928

Pull #9521

github

web-flow
Merge d2f397b3c into 0e8786348
Pull Request #9521: unit: remove GOACC, use Go 1.20 native coverage functionality

92496 of 224923 relevant lines covered (41.12%)

18825.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.82
/watchtower/wtdb/queue.go
1
package wtdb
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8

9
        "github.com/btcsuite/btcwallet/walletdb"
10
        "github.com/lightningnetwork/lnd/kvdb"
11
)
12

13
var (
14
        // queueMainBkt will hold the main queue contents. It will have the
15
        // following structure:
16
        //                         => oldestIndexKey => oldest index
17
        //                      => nextIndexKey => newest index
18
        //                        => itemsBkt => <index> -> item
19
        //
20
        // Any items added to the queue via Push, will be added to this queue.
21
        // Items will only be popped from this queue if the head queue is empty.
22
        queueMainBkt = []byte("queue-main")
23

24
        // queueHeadBkt will hold the items that have been pushed to the head
25
        // of the queue. It will have the following structure:
26
        //                         => oldestIndexKey => oldest index
27
        //                      => nextIndexKey => newest index
28
        //                        => itemsBkt => <index> -> item
29
        //
30
        // If PushHead is called with a new set of items, then first all
31
        // remaining items in the head queue will be popped and added ot the
32
        // given set of items. Then, once the head queue is empty, the set of
33
        // items will be pushed to the queue. If this queue is not empty, then
34
        // Pop will pop items from this queue before popping from the main
35
        // queue.
36
        queueHeadBkt = []byte("queue-head")
37

38
        // itemsBkt is a sub-bucket of both the main and head queue storing:
39
        //                 index -> encoded item
40
        itemsBkt = []byte("items")
41

42
        // oldestIndexKey is a key of both the main and head queue storing the
43
        // index of the item at the head of the queue.
44
        oldestIndexKey = []byte("oldest-index")
45

46
        // nextIndexKey is a key of both the main and head queue storing the
47
        // index of the item at the tail of the queue.
48
        nextIndexKey = []byte("next-index")
49

50
        // ErrEmptyQueue is returned from Pop if there are no items left in
51
        // the queue.
52
        ErrEmptyQueue = errors.New("queue is empty")
53
)
54

55
// Queue is an interface describing a FIFO queue for any generic type T.
56
type Queue[T any] interface {
57
        // Len returns the number of tasks in the queue.
58
        Len() (uint64, error)
59

60
        // Push pushes new T items to the tail of the queue.
61
        Push(items ...T) error
62

63
        // PopUpTo attempts to pop up to n items from the head of the queue. If
64
        // no more items are in the queue then ErrEmptyQueue is returned.
65
        PopUpTo(n int) ([]T, error)
66

67
        // PushHead pushes new T items to the head of the queue.
68
        PushHead(items ...T) error
69
}
70

71
// Serializable is an interface must be satisfied for any type that the
72
// DiskQueueDB should handle.
73
type Serializable interface {
74
        Encode(w io.Writer) error
75
        Decode(r io.Reader) error
76
}
77

78
// DiskQueueDB is a generic Bolt DB implementation of the Queue interface.
79
type DiskQueueDB[T Serializable] struct {
80
        db          kvdb.Backend
81
        topLevelBkt []byte
82
        constructor func() T
83
        onItemWrite func(tx kvdb.RwTx, item T) error
84
}
85

86
// A compile-time check to ensure that DiskQueueDB implements the Queue
87
// interface.
88
var _ Queue[Serializable] = (*DiskQueueDB[Serializable])(nil)
89

90
// NewQueueDB constructs a new DiskQueueDB. A queueBktName must be provided so
91
// that the DiskQueueDB can create its own namespace in the bolt db.
92
func NewQueueDB[T Serializable](db kvdb.Backend, queueBktName []byte,
93
        constructor func() T,
94
        onItemWrite func(tx kvdb.RwTx, item T) error) Queue[T] {
1✔
95

1✔
96
        return &DiskQueueDB[T]{
1✔
97
                db:          db,
1✔
98
                topLevelBkt: queueBktName,
1✔
99
                constructor: constructor,
1✔
100
                onItemWrite: onItemWrite,
1✔
101
        }
1✔
102
}
1✔
103

104
// Len returns the number of tasks in the queue.
105
//
106
// NOTE: This is part of the Queue interface.
107
func (d *DiskQueueDB[T]) Len() (uint64, error) {
5✔
108
        var res uint64
5✔
109
        err := kvdb.View(d.db, func(tx kvdb.RTx) error {
10✔
110
                var err error
5✔
111
                res, err = d.len(tx)
5✔
112

5✔
113
                return err
5✔
114
        }, func() {
10✔
115
                res = 0
5✔
116
        })
5✔
117
        if err != nil {
5✔
118
                return 0, err
×
119
        }
×
120

121
        return res, nil
5✔
122
}
123

124
// Push adds a T to the tail of the queue.
125
//
126
// NOTE: This is part of the Queue interface.
127
func (d *DiskQueueDB[T]) Push(items ...T) error {
3✔
128
        return d.db.Update(func(tx walletdb.ReadWriteTx) error {
6✔
129
                for _, item := range items {
10✔
130
                        err := d.addItem(tx, queueMainBkt, item)
7✔
131
                        if err != nil {
7✔
132
                                return err
×
133
                        }
×
134
                }
135

136
                return nil
3✔
137
        }, func() {})
3✔
138
}
139

140
// PopUpTo attempts to pop up to n items from the queue. If the queue is empty,
141
// then ErrEmptyQueue is returned.
142
//
143
// NOTE: This is part of the Queue interface.
144
func (d *DiskQueueDB[T]) PopUpTo(n int) ([]T, error) {
5✔
145
        var items []T
5✔
146

5✔
147
        err := d.db.Update(func(tx walletdb.ReadWriteTx) error {
10✔
148
                // Get the number of items in the queue.
5✔
149
                l, err := d.len(tx)
5✔
150
                if err != nil {
5✔
151
                        return err
×
152
                }
×
153

154
                // If there are no items, then we are done.
155
                if l == 0 {
5✔
156
                        return ErrEmptyQueue
×
157
                }
×
158

159
                // If the number of items in the queue is less than the maximum
160
                // specified by the caller, then set the maximum to the number
161
                // of items that there actually are.
162
                num := n
5✔
163
                if l < uint64(n) {
5✔
164
                        num = int(l)
×
165
                }
×
166

167
                // Pop the specified number of items off of the queue.
168
                items = make([]T, 0, num)
5✔
169
                for i := 0; i < num; i++ {
18✔
170
                        item, err := d.pop(tx)
13✔
171
                        if err != nil {
13✔
172
                                return err
×
173
                        }
×
174

175
                        items = append(items, item)
13✔
176
                }
177

178
                return err
5✔
179
        }, func() {
5✔
180
                items = nil
5✔
181
        })
5✔
182
        if err != nil {
5✔
183
                return nil, err
×
184
        }
×
185

186
        return items, nil
5✔
187
}
188

189
// PushHead pushes new T items to the head of the queue. For this implementation
190
// of the Queue interface, this will require popping all items currently in the
191
// head queue and adding them after first adding the given list of items. Care
192
// should thus be taken to never have an unbounded number of items in the head
193
// queue.
194
//
195
// NOTE: This is part of the Queue interface.
196
func (d *DiskQueueDB[T]) PushHead(items ...T) error {
3✔
197
        return d.db.Update(func(tx walletdb.ReadWriteTx) error {
6✔
198
                // Determine how many items are still in the head queue.
3✔
199
                numHead, err := d.numItems(tx, queueHeadBkt)
3✔
200
                if err != nil {
3✔
201
                        return err
×
202
                }
×
203

204
                // Create a new in-memory list that will contain all the new
205
                // items along with the items currently in the queue.
206
                itemList := make([]T, 0, int(numHead)+len(items))
3✔
207

3✔
208
                // Insert all the given items into the list first since these
3✔
209
                // should be at the head of the queue.
3✔
210
                itemList = append(itemList, items...)
3✔
211

3✔
212
                // Now, read out all the items that are currently in the
3✔
213
                // persisted head queue and add them to the back of the list
3✔
214
                // of items to be added.
3✔
215
                for {
8✔
216
                        t, err := d.nextItem(tx, queueHeadBkt)
5✔
217
                        if errors.Is(err, ErrEmptyQueue) {
8✔
218
                                break
3✔
219
                        } else if err != nil {
2✔
220
                                return err
×
221
                        }
×
222

223
                        itemList = append(itemList, t)
2✔
224
                }
225

226
                // Now the head queue is empty, the items can be pushed to the
227
                // queue.
228
                for _, item := range itemList {
11✔
229
                        err := d.addItem(tx, queueHeadBkt, item)
8✔
230
                        if err != nil {
8✔
231
                                return err
×
232
                        }
×
233
                }
234

235
                return nil
3✔
236
        }, func() {})
3✔
237
}
238

239
// pop gets the next T item from the head of the queue. If no more items are in
240
// the queue then ErrEmptyQueue is returned.
241
func (d *DiskQueueDB[T]) pop(tx walletdb.ReadWriteTx) (T, error) {
13✔
242
        // First, check if there are items left in the head queue.
13✔
243
        item, err := d.nextItem(tx, queueHeadBkt)
13✔
244

13✔
245
        // No error means that an item was found in the head queue.
13✔
246
        if err == nil {
19✔
247
                return item, nil
6✔
248
        }
6✔
249

250
        // Else, if error is not ErrEmptyQueue, then return the error.
251
        if !errors.Is(err, ErrEmptyQueue) {
7✔
252
                return item, err
×
253
        }
×
254

255
        // Otherwise, the head queue is empty, so we now check if there are
256
        // items in the main queue.
257
        return d.nextItem(tx, queueMainBkt)
7✔
258
}
259

260
// addItem adds the given item to the back of the given queue.
261
func (d *DiskQueueDB[T]) addItem(tx kvdb.RwTx, queueName []byte, item T) error {
15✔
262
        var (
15✔
263
                namespacedBkt = tx.ReadWriteBucket(d.topLevelBkt)
15✔
264
                err           error
15✔
265
        )
15✔
266
        if namespacedBkt == nil {
16✔
267
                namespacedBkt, err = tx.CreateTopLevelBucket(d.topLevelBkt)
1✔
268
                if err != nil {
1✔
269
                        return err
×
270
                }
×
271
        }
272

273
        mainTasksBucket, err := namespacedBkt.CreateBucketIfNotExists(
15✔
274
                cTaskQueue,
15✔
275
        )
15✔
276
        if err != nil {
15✔
277
                return err
×
278
        }
×
279

280
        bucket, err := mainTasksBucket.CreateBucketIfNotExists(queueName)
15✔
281
        if err != nil {
15✔
282
                return err
×
283
        }
×
284

285
        if d.onItemWrite != nil {
30✔
286
                err = d.onItemWrite(tx, item)
15✔
287
                if err != nil {
15✔
288
                        return err
×
289
                }
×
290
        }
291

292
        // Find the index to use for placing this new item at the back of the
293
        // queue.
294
        var nextIndex uint64
15✔
295
        nextIndexB := bucket.Get(nextIndexKey)
15✔
296
        if nextIndexB != nil {
29✔
297
                nextIndex, err = readBigSize(nextIndexB)
14✔
298
                if err != nil {
14✔
299
                        return err
×
300
                }
×
301
        } else {
1✔
302
                nextIndexB, err = writeBigSize(0)
1✔
303
                if err != nil {
1✔
304
                        return err
×
305
                }
×
306
        }
307

308
        tasksBucket, err := bucket.CreateBucketIfNotExists(itemsBkt)
15✔
309
        if err != nil {
15✔
310
                return err
×
311
        }
×
312

313
        var buff bytes.Buffer
15✔
314
        err = item.Encode(&buff)
15✔
315
        if err != nil {
15✔
316
                return err
×
317
        }
×
318

319
        // Put the new task in the assigned index.
320
        err = tasksBucket.Put(nextIndexB, buff.Bytes())
15✔
321
        if err != nil {
15✔
322
                return err
×
323
        }
×
324

325
        // Increment the next-index counter.
326
        nextIndex++
15✔
327
        nextIndexB, err = writeBigSize(nextIndex)
15✔
328
        if err != nil {
15✔
329
                return err
×
330
        }
×
331

332
        return bucket.Put(nextIndexKey, nextIndexB)
15✔
333
}
334

335
// nextItem pops an item of the queue identified by the given namespace. If
336
// there are no items on the queue then ErrEmptyQueue is returned.
337
func (d *DiskQueueDB[T]) nextItem(tx kvdb.RwTx, queueName []byte) (T, error) {
25✔
338
        task := d.constructor()
25✔
339

25✔
340
        namespacedBkt := tx.ReadWriteBucket(d.topLevelBkt)
25✔
341
        if namespacedBkt == nil {
25✔
342
                return task, ErrEmptyQueue
×
343
        }
×
344

345
        mainTasksBucket := namespacedBkt.NestedReadWriteBucket(cTaskQueue)
25✔
346
        if mainTasksBucket == nil {
25✔
347
                return task, ErrEmptyQueue
×
348
        }
×
349

350
        bucket, err := mainTasksBucket.CreateBucketIfNotExists(queueName)
25✔
351
        if err != nil {
25✔
352
                return task, err
×
353
        }
×
354

355
        // Get the index of the tail of the queue.
356
        var nextIndex uint64
25✔
357
        nextIndexB := bucket.Get(nextIndexKey)
25✔
358
        if nextIndexB != nil {
49✔
359
                nextIndex, err = readBigSize(nextIndexB)
24✔
360
                if err != nil {
24✔
361
                        return task, err
×
362
                }
×
363
        }
364

365
        // Get the index of the head of the queue.
366
        var oldestIndex uint64
25✔
367
        oldestIndexB := bucket.Get(oldestIndexKey)
25✔
368
        if oldestIndexB != nil {
48✔
369
                oldestIndex, err = readBigSize(oldestIndexB)
23✔
370
                if err != nil {
23✔
371
                        return task, err
×
372
                }
×
373
        } else {
2✔
374
                oldestIndexB, err = writeBigSize(0)
2✔
375
                if err != nil {
2✔
376
                        return task, err
×
377
                }
×
378
        }
379

380
        // If the head and tail are equal, then there are no items in the queue.
381
        if oldestIndex == nextIndex {
35✔
382
                // Take this opportunity to reset both indexes to zero.
10✔
383
                zeroIndexB, err := writeBigSize(0)
10✔
384
                if err != nil {
10✔
385
                        return task, err
×
386
                }
×
387

388
                err = bucket.Put(oldestIndexKey, zeroIndexB)
10✔
389
                if err != nil {
10✔
390
                        return task, err
×
391
                }
×
392

393
                err = bucket.Put(nextIndexKey, zeroIndexB)
10✔
394
                if err != nil {
10✔
395
                        return task, err
×
396
                }
×
397

398
                return task, ErrEmptyQueue
10✔
399
        }
400

401
        // Otherwise, pop the item at the oldest index.
402
        tasksBucket := bucket.NestedReadWriteBucket(itemsBkt)
15✔
403
        if tasksBucket == nil {
15✔
404
                return task, fmt.Errorf("client-tasks bucket not found")
×
405
        }
×
406

407
        item := tasksBucket.Get(oldestIndexB)
15✔
408
        if item == nil {
15✔
409
                return task, fmt.Errorf("no task found under index")
×
410
        }
×
411

412
        err = tasksBucket.Delete(oldestIndexB)
15✔
413
        if err != nil {
15✔
414
                return task, err
×
415
        }
×
416

417
        // Increment the oldestIndex value so that it now points to the new
418
        // oldest item.
419
        oldestIndex++
15✔
420
        oldestIndexB, err = writeBigSize(oldestIndex)
15✔
421
        if err != nil {
15✔
422
                return task, err
×
423
        }
×
424

425
        err = bucket.Put(oldestIndexKey, oldestIndexB)
15✔
426
        if err != nil {
15✔
427
                return task, err
×
428
        }
×
429

430
        if err = task.Decode(bytes.NewBuffer(item)); err != nil {
15✔
431
                return task, err
×
432
        }
×
433

434
        return task, nil
15✔
435
}
436

437
// len returns the number of items in the queue. This will be the addition of
438
// the number of items in the main queue and the number in the head queue.
439
func (d *DiskQueueDB[T]) len(tx kvdb.RTx) (uint64, error) {
10✔
440
        numMain, err := d.numItems(tx, queueMainBkt)
10✔
441
        if err != nil {
10✔
442
                return 0, err
×
443
        }
×
444

445
        numHead, err := d.numItems(tx, queueHeadBkt)
10✔
446
        if err != nil {
10✔
447
                return 0, err
×
448
        }
×
449

450
        return numMain + numHead, nil
10✔
451
}
452

453
// numItems returns the number of items in the given queue.
454
func (d *DiskQueueDB[T]) numItems(tx kvdb.RTx, queueName []byte) (uint64,
455
        error) {
23✔
456

23✔
457
        // Get the queue bucket at the correct namespace.
23✔
458
        namespacedBkt := tx.ReadBucket(d.topLevelBkt)
23✔
459
        if namespacedBkt == nil {
25✔
460
                return 0, nil
2✔
461
        }
2✔
462

463
        mainTasksBucket := namespacedBkt.NestedReadBucket(cTaskQueue)
21✔
464
        if mainTasksBucket == nil {
21✔
465
                return 0, nil
×
466
        }
×
467

468
        bucket := mainTasksBucket.NestedReadBucket(queueName)
21✔
469
        if bucket == nil {
23✔
470
                return 0, nil
2✔
471
        }
2✔
472

473
        var (
19✔
474
                oldestIndex uint64
19✔
475
                nextIndex   uint64
19✔
476
                err         error
19✔
477
        )
19✔
478

19✔
479
        // Get the next index key.
19✔
480
        nextIndexB := bucket.Get(nextIndexKey)
19✔
481
        if nextIndexB != nil {
38✔
482
                nextIndex, err = readBigSize(nextIndexB)
19✔
483
                if err != nil {
19✔
484
                        return 0, err
×
485
                }
×
486
        }
487

488
        // Get the oldest index.
489
        oldestIndexB := bucket.Get(oldestIndexKey)
19✔
490
        if oldestIndexB != nil {
36✔
491
                oldestIndex, err = readBigSize(oldestIndexB)
17✔
492
                if err != nil {
17✔
493
                        return 0, err
×
494
                }
×
495
        }
496

497
        return nextIndex - oldestIndex, nil
19✔
498
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc