• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13586005509

28 Feb 2025 10:14AM UTC coverage: 68.629% (+9.9%) from 58.77%
13586005509

Pull #9521

github

web-flow
Merge 37d3a70a5 into 8532955b3
Pull Request #9521: unit: remove GOACC, use Go 1.20 native coverage functionality

129950 of 189351 relevant lines covered (68.63%)

23726.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.67
/watchtower/wtdb/queue.go
1
package wtdb
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8

9
        "github.com/btcsuite/btcwallet/walletdb"
10
        "github.com/lightningnetwork/lnd/kvdb"
11
)
12

13
var (
14
        // queueMainBkt will hold the main queue contents. It will have the
15
        // following structure:
16
        //                         => oldestIndexKey => oldest index
17
        //                      => nextIndexKey => newest index
18
        //                        => itemsBkt => <index> -> item
19
        //
20
        // Any items added to the queue via Push, will be added to this queue.
21
        // Items will only be popped from this queue if the head queue is empty.
22
        queueMainBkt = []byte("queue-main")
23

24
        // queueHeadBkt will hold the items that have been pushed to the head
25
        // of the queue. It will have the following structure:
26
        //                         => oldestIndexKey => oldest index
27
        //                      => nextIndexKey => newest index
28
        //                        => itemsBkt => <index> -> item
29
        //
30
        // If PushHead is called with a new set of items, then first all
31
        // remaining items in the head queue will be popped and added ot the
32
        // given set of items. Then, once the head queue is empty, the set of
33
        // items will be pushed to the queue. If this queue is not empty, then
34
        // Pop will pop items from this queue before popping from the main
35
        // queue.
36
        queueHeadBkt = []byte("queue-head")
37

38
        // itemsBkt is a sub-bucket of both the main and head queue storing:
39
        //                 index -> encoded item
40
        itemsBkt = []byte("items")
41

42
        // oldestIndexKey is a key of both the main and head queue storing the
43
        // index of the item at the head of the queue.
44
        oldestIndexKey = []byte("oldest-index")
45

46
        // nextIndexKey is a key of both the main and head queue storing the
47
        // index of the item at the tail of the queue.
48
        nextIndexKey = []byte("next-index")
49

50
        // ErrEmptyQueue is returned from Pop if there are no items left in
51
        // the queue.
52
        ErrEmptyQueue = errors.New("queue is empty")
53
)
54

55
// Queue is an interface describing a FIFO queue for any generic type T.
56
type Queue[T any] interface {
57
        // Len returns the number of tasks in the queue.
58
        Len() (uint64, error)
59

60
        // Push pushes new T items to the tail of the queue.
61
        Push(items ...T) error
62

63
        // PopUpTo attempts to pop up to n items from the head of the queue. If
64
        // no more items are in the queue then ErrEmptyQueue is returned.
65
        PopUpTo(n int) ([]T, error)
66

67
        // PushHead pushes new T items to the head of the queue.
68
        PushHead(items ...T) error
69
}
70

71
// Serializable is an interface must be satisfied for any type that the
72
// DiskQueueDB should handle.
73
type Serializable interface {
74
        Encode(w io.Writer) error
75
        Decode(r io.Reader) error
76
}
77

78
// DiskQueueDB is a generic Bolt DB implementation of the Queue interface.
79
type DiskQueueDB[T Serializable] struct {
80
        db          kvdb.Backend
81
        topLevelBkt []byte
82
        constructor func() T
83
        onItemWrite func(tx kvdb.RwTx, item T) error
84
}
85

86
// A compile-time check to ensure that DiskQueueDB implements the Queue
87
// interface.
88
var _ Queue[Serializable] = (*DiskQueueDB[Serializable])(nil)
89

90
// NewQueueDB constructs a new DiskQueueDB. A queueBktName must be provided so
91
// that the DiskQueueDB can create its own namespace in the bolt db.
92
func NewQueueDB[T Serializable](db kvdb.Backend, queueBktName []byte,
93
        constructor func() T,
94
        onItemWrite func(tx kvdb.RwTx, item T) error) Queue[T] {
43✔
95

43✔
96
        return &DiskQueueDB[T]{
43✔
97
                db:          db,
43✔
98
                topLevelBkt: queueBktName,
43✔
99
                constructor: constructor,
43✔
100
                onItemWrite: onItemWrite,
43✔
101
        }
43✔
102
}
43✔
103

104
// Len returns the number of tasks in the queue.
105
//
106
// NOTE: This is part of the Queue interface.
107
func (d *DiskQueueDB[T]) Len() (uint64, error) {
81✔
108
        var res uint64
81✔
109
        err := kvdb.View(d.db, func(tx kvdb.RTx) error {
162✔
110
                var err error
81✔
111
                res, err = d.len(tx)
81✔
112

81✔
113
                return err
81✔
114
        }, func() {
162✔
115
                res = 0
81✔
116
        })
81✔
117
        if err != nil {
81✔
118
                return 0, err
×
119
        }
×
120

121
        return res, nil
81✔
122
}
123

124
// Push adds a T to the tail of the queue.
125
//
126
// NOTE: This is part of the Queue interface.
127
func (d *DiskQueueDB[T]) Push(items ...T) error {
531✔
128
        return d.db.Update(func(tx walletdb.ReadWriteTx) error {
1,062✔
129
                for _, item := range items {
192,104✔
130
                        err := d.addItem(tx, queueMainBkt, item)
191,573✔
131
                        if err != nil {
191,573✔
132
                                return err
×
133
                        }
×
134
                }
135

136
                return nil
531✔
137
        }, func() {})
531✔
138
}
139

140
// PopUpTo attempts to pop up to n items from the queue. If the queue is empty,
141
// then ErrEmptyQueue is returned.
142
//
143
// NOTE: This is part of the Queue interface.
144
func (d *DiskQueueDB[T]) PopUpTo(n int) ([]T, error) {
761✔
145
        var items []T
761✔
146

761✔
147
        err := d.db.Update(func(tx walletdb.ReadWriteTx) error {
1,522✔
148
                // Get the number of items in the queue.
761✔
149
                l, err := d.len(tx)
761✔
150
                if err != nil {
761✔
151
                        return err
×
152
                }
×
153

154
                // If there are no items, then we are done.
155
                if l == 0 {
842✔
156
                        return ErrEmptyQueue
81✔
157
                }
81✔
158

159
                // If the number of items in the queue is less than the maximum
160
                // specified by the caller, then set the maximum to the number
161
                // of items that there actually are.
162
                num := n
680✔
163
                if l < uint64(n) {
686✔
164
                        num = int(l)
6✔
165
                }
6✔
166

167
                // Pop the specified number of items off of the queue.
168
                items = make([]T, 0, num)
680✔
169
                for i := 0; i < num; i++ {
233,021✔
170
                        item, err := d.pop(tx)
232,341✔
171
                        if err != nil {
232,341✔
172
                                return err
×
173
                        }
×
174

175
                        items = append(items, item)
232,341✔
176
                }
177

178
                return err
680✔
179
        }, func() {
761✔
180
                items = nil
761✔
181
        })
761✔
182
        if err != nil {
842✔
183
                return nil, err
81✔
184
        }
81✔
185

186
        return items, nil
680✔
187
}
188

189
// PushHead pushes new T items to the head of the queue. For this implementation
190
// of the Queue interface, this will require popping all items currently in the
191
// head queue and adding them after first adding the given list of items. Care
192
// should thus be taken to never have an unbounded number of items in the head
193
// queue.
194
//
195
// NOTE: This is part of the Queue interface.
196
func (d *DiskQueueDB[T]) PushHead(items ...T) error {
70✔
197
        return d.db.Update(func(tx walletdb.ReadWriteTx) error {
140✔
198
                // Determine how many items are still in the head queue.
70✔
199
                numHead, err := d.numItems(tx, queueHeadBkt)
70✔
200
                if err != nil {
70✔
201
                        return err
×
202
                }
×
203

204
                // Create a new in-memory list that will contain all the new
205
                // items along with the items currently in the queue.
206
                itemList := make([]T, 0, int(numHead)+len(items))
70✔
207

70✔
208
                // Insert all the given items into the list first since these
70✔
209
                // should be at the head of the queue.
70✔
210
                itemList = append(itemList, items...)
70✔
211

70✔
212
                // Now, read out all the items that are currently in the
70✔
213
                // persisted head queue and add them to the back of the list
70✔
214
                // of items to be added.
70✔
215
                for {
145✔
216
                        t, err := d.nextItem(tx, queueHeadBkt)
75✔
217
                        if errors.Is(err, ErrEmptyQueue) {
145✔
218
                                break
70✔
219
                        } else if err != nil {
5✔
220
                                return err
×
221
                        }
×
222

223
                        itemList = append(itemList, t)
5✔
224
                }
225

226
                // Now the head queue is empty, the items can be pushed to the
227
                // queue.
228
                for _, item := range itemList {
40,843✔
229
                        err := d.addItem(tx, queueHeadBkt, item)
40,773✔
230
                        if err != nil {
40,773✔
231
                                return err
×
232
                        }
×
233
                }
234

235
                return nil
70✔
236
        }, func() {})
70✔
237
}
238

239
// pop gets the next T item from the head of the queue. If no more items are in
240
// the queue then ErrEmptyQueue is returned.
241
func (d *DiskQueueDB[T]) pop(tx walletdb.ReadWriteTx) (T, error) {
232,341✔
242
        // First, check if there are items left in the head queue.
232,341✔
243
        item, err := d.nextItem(tx, queueHeadBkt)
232,341✔
244

232,341✔
245
        // No error means that an item was found in the head queue.
232,341✔
246
        if err == nil {
273,109✔
247
                return item, nil
40,768✔
248
        }
40,768✔
249

250
        // Else, if error is not ErrEmptyQueue, then return the error.
251
        if !errors.Is(err, ErrEmptyQueue) {
191,573✔
252
                return item, err
×
253
        }
×
254

255
        // Otherwise, the head queue is empty, so we now check if there are
256
        // items in the main queue.
257
        return d.nextItem(tx, queueMainBkt)
191,573✔
258
}
259

260
// addItem adds the given item to the back of the given queue.
261
func (d *DiskQueueDB[T]) addItem(tx kvdb.RwTx, queueName []byte, item T) error {
232,346✔
262
        var (
232,346✔
263
                namespacedBkt = tx.ReadWriteBucket(d.topLevelBkt)
232,346✔
264
                err           error
232,346✔
265
        )
232,346✔
266
        if namespacedBkt == nil {
232,373✔
267
                namespacedBkt, err = tx.CreateTopLevelBucket(d.topLevelBkt)
27✔
268
                if err != nil {
27✔
269
                        return err
×
270
                }
×
271
        }
272

273
        mainTasksBucket, err := namespacedBkt.CreateBucketIfNotExists(
232,346✔
274
                cTaskQueue,
232,346✔
275
        )
232,346✔
276
        if err != nil {
232,346✔
277
                return err
×
278
        }
×
279

280
        bucket, err := mainTasksBucket.CreateBucketIfNotExists(queueName)
232,346✔
281
        if err != nil {
232,346✔
282
                return err
×
283
        }
×
284

285
        if d.onItemWrite != nil {
464,692✔
286
                err = d.onItemWrite(tx, item)
232,346✔
287
                if err != nil {
232,346✔
288
                        return err
×
289
                }
×
290
        }
291

292
        // Find the index to use for placing this new item at the back of the
293
        // queue.
294
        var nextIndex uint64
232,346✔
295
        nextIndexB := bucket.Get(nextIndexKey)
232,346✔
296
        if nextIndexB != nil {
464,665✔
297
                nextIndex, err = readBigSize(nextIndexB)
232,319✔
298
                if err != nil {
232,319✔
299
                        return err
×
300
                }
×
301
        } else {
27✔
302
                nextIndexB, err = writeBigSize(0)
27✔
303
                if err != nil {
27✔
304
                        return err
×
305
                }
×
306
        }
307

308
        tasksBucket, err := bucket.CreateBucketIfNotExists(itemsBkt)
232,346✔
309
        if err != nil {
232,346✔
310
                return err
×
311
        }
×
312

313
        var buff bytes.Buffer
232,346✔
314
        err = item.Encode(&buff)
232,346✔
315
        if err != nil {
232,346✔
316
                return err
×
317
        }
×
318

319
        // Put the new task in the assigned index.
320
        err = tasksBucket.Put(nextIndexB, buff.Bytes())
232,346✔
321
        if err != nil {
232,346✔
322
                return err
×
323
        }
×
324

325
        // Increment the next-index counter.
326
        nextIndex++
232,346✔
327
        nextIndexB, err = writeBigSize(nextIndex)
232,346✔
328
        if err != nil {
232,346✔
329
                return err
×
330
        }
×
331

332
        return bucket.Put(nextIndexKey, nextIndexB)
232,346✔
333
}
334

335
// nextItem pops an item of the queue identified by the given namespace. If
336
// there are no items on the queue then ErrEmptyQueue is returned.
337
func (d *DiskQueueDB[T]) nextItem(tx kvdb.RwTx, queueName []byte) (T, error) {
423,989✔
338
        task := d.constructor()
423,989✔
339

423,989✔
340
        namespacedBkt := tx.ReadWriteBucket(d.topLevelBkt)
423,989✔
341
        if namespacedBkt == nil {
423,996✔
342
                return task, ErrEmptyQueue
7✔
343
        }
7✔
344

345
        mainTasksBucket := namespacedBkt.NestedReadWriteBucket(cTaskQueue)
423,982✔
346
        if mainTasksBucket == nil {
423,982✔
347
                return task, ErrEmptyQueue
×
348
        }
×
349

350
        bucket, err := mainTasksBucket.CreateBucketIfNotExists(queueName)
423,982✔
351
        if err != nil {
423,982✔
352
                return task, err
×
353
        }
×
354

355
        // Get the index of the tail of the queue.
356
        var nextIndex uint64
423,982✔
357
        nextIndexB := bucket.Get(nextIndexKey)
423,982✔
358
        if nextIndexB != nil {
847,937✔
359
                nextIndex, err = readBigSize(nextIndexB)
423,955✔
360
                if err != nil {
423,955✔
361
                        return task, err
×
362
                }
×
363
        }
364

365
        // Get the index of the head of the queue.
366
        var oldestIndex uint64
423,982✔
367
        oldestIndexB := bucket.Get(oldestIndexKey)
423,982✔
368
        if oldestIndexB != nil {
847,910✔
369
                oldestIndex, err = readBigSize(oldestIndexB)
423,928✔
370
                if err != nil {
423,928✔
371
                        return task, err
×
372
                }
×
373
        } else {
54✔
374
                oldestIndexB, err = writeBigSize(0)
54✔
375
                if err != nil {
54✔
376
                        return task, err
×
377
                }
×
378
        }
379

380
        // If the head and tail are equal, then there are no items in the queue.
381
        if oldestIndex == nextIndex {
615,618✔
382
                // Take this opportunity to reset both indexes to zero.
191,636✔
383
                zeroIndexB, err := writeBigSize(0)
191,636✔
384
                if err != nil {
191,636✔
385
                        return task, err
×
386
                }
×
387

388
                err = bucket.Put(oldestIndexKey, zeroIndexB)
191,636✔
389
                if err != nil {
191,636✔
390
                        return task, err
×
391
                }
×
392

393
                err = bucket.Put(nextIndexKey, zeroIndexB)
191,636✔
394
                if err != nil {
191,636✔
395
                        return task, err
×
396
                }
×
397

398
                return task, ErrEmptyQueue
191,636✔
399
        }
400

401
        // Otherwise, pop the item at the oldest index.
402
        tasksBucket := bucket.NestedReadWriteBucket(itemsBkt)
232,346✔
403
        if tasksBucket == nil {
232,346✔
404
                return task, fmt.Errorf("client-tasks bucket not found")
×
405
        }
×
406

407
        item := tasksBucket.Get(oldestIndexB)
232,346✔
408
        if item == nil {
232,346✔
409
                return task, fmt.Errorf("no task found under index")
×
410
        }
×
411

412
        err = tasksBucket.Delete(oldestIndexB)
232,346✔
413
        if err != nil {
232,346✔
414
                return task, err
×
415
        }
×
416

417
        // Increment the oldestIndex value so that it now points to the new
418
        // oldest item.
419
        oldestIndex++
232,346✔
420
        oldestIndexB, err = writeBigSize(oldestIndex)
232,346✔
421
        if err != nil {
232,346✔
422
                return task, err
×
423
        }
×
424

425
        err = bucket.Put(oldestIndexKey, oldestIndexB)
232,346✔
426
        if err != nil {
232,346✔
427
                return task, err
×
428
        }
×
429

430
        if err = task.Decode(bytes.NewBuffer(item)); err != nil {
232,346✔
431
                return task, err
×
432
        }
×
433

434
        return task, nil
232,346✔
435
}
436

437
// len returns the number of items in the queue. This will be the addition of
438
// the number of items in the main queue and the number in the head queue.
439
func (d *DiskQueueDB[T]) len(tx kvdb.RTx) (uint64, error) {
842✔
440
        numMain, err := d.numItems(tx, queueMainBkt)
842✔
441
        if err != nil {
842✔
442
                return 0, err
×
443
        }
×
444

445
        numHead, err := d.numItems(tx, queueHeadBkt)
842✔
446
        if err != nil {
842✔
447
                return 0, err
×
448
        }
×
449

450
        return numMain + numHead, nil
842✔
451
}
452

453
// numItems returns the number of items in the given queue.
454
func (d *DiskQueueDB[T]) numItems(tx kvdb.RTx, queueName []byte) (uint64,
455
        error) {
1,748✔
456

1,748✔
457
        // Get the queue bucket at the correct namespace.
1,748✔
458
        namespacedBkt := tx.ReadBucket(d.topLevelBkt)
1,748✔
459
        if namespacedBkt == nil {
1,821✔
460
                return 0, nil
73✔
461
        }
73✔
462

463
        mainTasksBucket := namespacedBkt.NestedReadBucket(cTaskQueue)
1,675✔
464
        if mainTasksBucket == nil {
1,675✔
465
                return 0, nil
×
466
        }
×
467

468
        bucket := mainTasksBucket.NestedReadBucket(queueName)
1,675✔
469
        if bucket == nil {
1,703✔
470
                return 0, nil
28✔
471
        }
28✔
472

473
        var (
1,647✔
474
                oldestIndex uint64
1,647✔
475
                nextIndex   uint64
1,647✔
476
                err         error
1,647✔
477
        )
1,647✔
478

1,647✔
479
        // Get the next index key.
1,647✔
480
        nextIndexB := bucket.Get(nextIndexKey)
1,647✔
481
        if nextIndexB != nil {
3,294✔
482
                nextIndex, err = readBigSize(nextIndexB)
1,647✔
483
                if err != nil {
1,647✔
484
                        return 0, err
×
485
                }
×
486
        }
487

488
        // Get the oldest index.
489
        oldestIndexB := bucket.Get(oldestIndexKey)
1,647✔
490
        if oldestIndexB != nil {
3,264✔
491
                oldestIndex, err = readBigSize(oldestIndexB)
1,617✔
492
                if err != nil {
1,617✔
493
                        return 0, err
×
494
                }
×
495
        }
496

497
        return nextIndex - oldestIndex, nil
1,647✔
498
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc