• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15951470896

29 Jun 2025 04:23AM UTC coverage: 67.594% (-0.01%) from 67.606%
15951470896

Pull #9751

github

web-flow
Merge 599d9b051 into 6290edf14
Pull Request #9751: multi: update Go to 1.23.10 and update some packages

135088 of 199851 relevant lines covered (67.59%)

21909.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.67
/watchtower/wtdb/queue.go
1
package wtdb
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8

9
        "github.com/btcsuite/btcwallet/walletdb"
10
        "github.com/lightningnetwork/lnd/kvdb"
11
)
12

13
var (
14
        // queueMainBkt will hold the main queue contents. It will have the
15
        // following structure:
16
        //      => oldestIndexKey => index of the oldest item
17
        //      => nextIndexKey => index to assign to the next item
18
        //      => itemsBkt => <index> -> item
19
        //
20
        // Any items added to the queue via Push will be added to this queue.
21
        // Items will only be popped from this queue if the head queue is empty.
22
        queueMainBkt = []byte("queue-main")
23

24
        // queueHeadBkt will hold the items that have been pushed to the head
25
        // of the queue. It will have the following structure:
26
        //      => oldestIndexKey => index of the oldest item
27
        //      => nextIndexKey => index to assign to the next item
28
        //      => itemsBkt => <index> -> item
29
        //
30
        // If PushHead is called with a new set of items, then first all
31
        // remaining items in the head queue will be popped and added to the
32
        // given set of items. Then, once the head queue is empty, the set of
33
        // items will be pushed to the queue. If this queue is not empty, then
34
        // PopUpTo will pop items from this queue before popping from the main
35
        // queue.
36
        queueHeadBkt = []byte("queue-head")
37

38
        // itemsBkt is a sub-bucket of both the main and head queue storing:
39
        //      index -> encoded item
40
        itemsBkt = []byte("items")
41

42
        // oldestIndexKey is a key of both the main and head queue storing the
43
        // index of the item at the head of the queue.
44
        oldestIndexKey = []byte("oldest-index")
45

46
        // nextIndexKey is a key of both the main and head queue storing the
47
        // index that will be assigned to the next item added to the queue.
48
        nextIndexKey = []byte("next-index")
49

50
        // ErrEmptyQueue is returned from PopUpTo if there are no items left
51
        // in the queue.
52
        ErrEmptyQueue = errors.New("queue is empty")
53
)
54

55
// Queue is an interface describing a FIFO queue for any generic type T.
56
type Queue[T any] interface {
57
        // Len returns the number of items in the queue.
58
        Len() (uint64, error)
59

60
        // Push pushes new T items to the tail of the queue.
61
        Push(items ...T) error
62

63
        // PopUpTo attempts to pop up to n items from the head of the queue. If
64
        // the queue holds no items at all then ErrEmptyQueue is returned.
65
        PopUpTo(n int) ([]T, error)
66

67
        // PushHead pushes new T items to the head of the queue.
68
        PushHead(items ...T) error
69
}
70

71
// Serializable is an interface that must be satisfied by any type that the
72
// DiskQueueDB should handle.
73
type Serializable interface {
74
        Encode(w io.Writer) error // writes the item's encoded form to w
75
        Decode(r io.Reader) error // populates the item from the encoded form in r
76
}
77

78
// DiskQueueDB is a generic, kvdb-backed implementation of the Queue interface.
79
type DiskQueueDB[T Serializable] struct {
80
        db          kvdb.Backend // backend in which the queue is stored
81
        topLevelBkt []byte // name of the top-level bucket namespacing this queue
82
        constructor func() T // returns a fresh T for a stored item to be decoded into
83
        onItemWrite func(tx kvdb.RwTx, item T) error // optional; if non-nil, called in the write tx for each item added
84
}
85

86
// A compile-time check to ensure that DiskQueueDB implements the Queue
87
// interface.
88
var _ Queue[Serializable] = (*DiskQueueDB[Serializable])(nil)
89

90
// NewQueueDB constructs a new DiskQueueDB. A queueBktName must be provided so
91
// that the DiskQueueDB can create its own namespace in the bolt db.
92
func NewQueueDB[T Serializable](db kvdb.Backend, queueBktName []byte,
93
        constructor func() T,
94
        onItemWrite func(tx kvdb.RwTx, item T) error) Queue[T] {
43✔
95

43✔
96
        return &DiskQueueDB[T]{
43✔
97
                db:          db,
43✔
98
                topLevelBkt: queueBktName,
43✔
99
                constructor: constructor,
43✔
100
                onItemWrite: onItemWrite,
43✔
101
        }
43✔
102
}
43✔
103

104
// Len returns the number of tasks in the queue.
105
//
106
// NOTE: This is part of the Queue interface.
107
func (d *DiskQueueDB[T]) Len() (uint64, error) {
73✔
108
        var res uint64
73✔
109
        err := kvdb.View(d.db, func(tx kvdb.RTx) error {
146✔
110
                var err error
73✔
111
                res, err = d.len(tx)
73✔
112

73✔
113
                return err
73✔
114
        }, func() {
146✔
115
                res = 0
73✔
116
        })
73✔
117
        if err != nil {
73✔
118
                return 0, err
×
119
        }
×
120

121
        return res, nil
73✔
122
}
123

124
// Push adds a T to the tail of the queue.
125
//
126
// NOTE: This is part of the Queue interface.
127
func (d *DiskQueueDB[T]) Push(items ...T) error {
527✔
128
        return d.db.Update(func(tx walletdb.ReadWriteTx) error {
1,054✔
129
                for _, item := range items {
179,639✔
130
                        err := d.addItem(tx, queueMainBkt, item)
179,112✔
131
                        if err != nil {
179,112✔
132
                                return err
×
133
                        }
×
134
                }
135

136
                return nil
527✔
137
        }, func() {})
527✔
138
}
139

140
// PopUpTo attempts to pop up to n items from the queue. If the queue is empty,
141
// then ErrEmptyQueue is returned.
142
//
143
// NOTE: This is part of the Queue interface.
144
func (d *DiskQueueDB[T]) PopUpTo(n int) ([]T, error) {
732✔
145
        var items []T
732✔
146

732✔
147
        err := d.db.Update(func(tx walletdb.ReadWriteTx) error {
1,464✔
148
                // Get the number of items in the queue.
732✔
149
                l, err := d.len(tx)
732✔
150
                if err != nil {
732✔
151
                        return err
×
152
                }
×
153

154
                // If there are no items, then we are done.
155
                if l == 0 {
805✔
156
                        return ErrEmptyQueue
73✔
157
                }
73✔
158

159
                // If the number of items in the queue is less than the maximum
160
                // specified by the caller, then set the maximum to the number
161
                // of items that there actually are.
162
                num := n
659✔
163
                if l < uint64(n) {
662✔
164
                        num = int(l)
3✔
165
                }
3✔
166

167
                // Pop the specified number of items off of the queue.
168
                items = make([]T, 0, num)
659✔
169
                for i := 0; i < num; i++ {
200,953✔
170
                        item, err := d.pop(tx)
200,294✔
171
                        if err != nil {
200,294✔
172
                                return err
×
173
                        }
×
174

175
                        items = append(items, item)
200,294✔
176
                }
177

178
                return err
659✔
179
        }, func() {
732✔
180
                items = nil
732✔
181
        })
732✔
182
        if err != nil {
805✔
183
                return nil, err
73✔
184
        }
73✔
185

186
        return items, nil
659✔
187
}
188

189
// PushHead pushes new T items to the head of the queue. For this implementation
190
// of the Queue interface, this will require popping all items currently in the
191
// head queue and adding them after first adding the given list of items. Care
192
// should thus be taken to never have an unbounded number of items in the head
193
// queue.
194
//
195
// NOTE: This is part of the Queue interface.
196
func (d *DiskQueueDB[T]) PushHead(items ...T) error {
62✔
197
        return d.db.Update(func(tx walletdb.ReadWriteTx) error {
124✔
198
                // Determine how many items are still in the head queue.
62✔
199
                numHead, err := d.numItems(tx, queueHeadBkt)
62✔
200
                if err != nil {
62✔
201
                        return err
×
202
                }
×
203

204
                // Create a new in-memory list that will contain all the new
205
                // items along with the items currently in the queue.
206
                itemList := make([]T, 0, int(numHead)+len(items))
62✔
207

62✔
208
                // Insert all the given items into the list first since these
62✔
209
                // should be at the head of the queue.
62✔
210
                itemList = append(itemList, items...)
62✔
211

62✔
212
                // Now, read out all the items that are currently in the
62✔
213
                // persisted head queue and add them to the back of the list
62✔
214
                // of items to be added.
62✔
215
                for {
129✔
216
                        t, err := d.nextItem(tx, queueHeadBkt)
67✔
217
                        if errors.Is(err, ErrEmptyQueue) {
129✔
218
                                break
62✔
219
                        } else if err != nil {
5✔
220
                                return err
×
221
                        }
×
222

223
                        itemList = append(itemList, t)
5✔
224
                }
225

226
                // Now the head queue is empty, the items can be pushed to the
227
                // queue.
228
                for _, item := range itemList {
21,249✔
229
                        err := d.addItem(tx, queueHeadBkt, item)
21,187✔
230
                        if err != nil {
21,187✔
231
                                return err
×
232
                        }
×
233
                }
234

235
                return nil
62✔
236
        }, func() {})
62✔
237
}
238

239
// pop gets the next T item from the head of the queue. If no more items are in
240
// the queue then ErrEmptyQueue is returned.
241
func (d *DiskQueueDB[T]) pop(tx walletdb.ReadWriteTx) (T, error) {
200,294✔
242
        // First, check if there are items left in the head queue.
200,294✔
243
        item, err := d.nextItem(tx, queueHeadBkt)
200,294✔
244

200,294✔
245
        // No error means that an item was found in the head queue.
200,294✔
246
        if err == nil {
221,476✔
247
                return item, nil
21,182✔
248
        }
21,182✔
249

250
        // Else, if error is not ErrEmptyQueue, then return the error.
251
        if !errors.Is(err, ErrEmptyQueue) {
179,112✔
252
                return item, err
×
253
        }
×
254

255
        // Otherwise, the head queue is empty, so we now check if there are
256
        // items in the main queue.
257
        return d.nextItem(tx, queueMainBkt)
179,112✔
258
}
259

260
// addItem adds the given item to the back of the given queue.
261
func (d *DiskQueueDB[T]) addItem(tx kvdb.RwTx, queueName []byte, item T) error {
200,299✔
262
        var (
200,299✔
263
                namespacedBkt = tx.ReadWriteBucket(d.topLevelBkt)
200,299✔
264
                err           error
200,299✔
265
        )
200,299✔
266
        if namespacedBkt == nil {
200,326✔
267
                namespacedBkt, err = tx.CreateTopLevelBucket(d.topLevelBkt)
27✔
268
                if err != nil {
27✔
269
                        return err
×
270
                }
×
271
        }
272

273
        mainTasksBucket, err := namespacedBkt.CreateBucketIfNotExists(
200,299✔
274
                cTaskQueue,
200,299✔
275
        )
200,299✔
276
        if err != nil {
200,299✔
277
                return err
×
278
        }
×
279

280
        bucket, err := mainTasksBucket.CreateBucketIfNotExists(queueName)
200,299✔
281
        if err != nil {
200,299✔
282
                return err
×
283
        }
×
284

285
        if d.onItemWrite != nil {
400,598✔
286
                err = d.onItemWrite(tx, item)
200,299✔
287
                if err != nil {
200,299✔
288
                        return err
×
289
                }
×
290
        }
291

292
        // Find the index to use for placing this new item at the back of the
293
        // queue.
294
        var nextIndex uint64
200,299✔
295
        nextIndexB := bucket.Get(nextIndexKey)
200,299✔
296
        if nextIndexB != nil {
400,570✔
297
                nextIndex, err = readBigSize(nextIndexB)
200,271✔
298
                if err != nil {
200,271✔
299
                        return err
×
300
                }
×
301
        } else {
28✔
302
                nextIndexB, err = writeBigSize(0)
28✔
303
                if err != nil {
28✔
304
                        return err
×
305
                }
×
306
        }
307

308
        tasksBucket, err := bucket.CreateBucketIfNotExists(itemsBkt)
200,299✔
309
        if err != nil {
200,299✔
310
                return err
×
311
        }
×
312

313
        var buff bytes.Buffer
200,299✔
314
        err = item.Encode(&buff)
200,299✔
315
        if err != nil {
200,299✔
316
                return err
×
317
        }
×
318

319
        // Put the new task in the assigned index.
320
        err = tasksBucket.Put(nextIndexB, buff.Bytes())
200,299✔
321
        if err != nil {
200,299✔
322
                return err
×
323
        }
×
324

325
        // Increment the next-index counter.
326
        nextIndex++
200,299✔
327
        nextIndexB, err = writeBigSize(nextIndex)
200,299✔
328
        if err != nil {
200,299✔
329
                return err
×
330
        }
×
331

332
        return bucket.Put(nextIndexKey, nextIndexB)
200,299✔
333
}
334

335
// nextItem pops an item of the queue identified by the given namespace. If
336
// there are no items on the queue then ErrEmptyQueue is returned.
337
func (d *DiskQueueDB[T]) nextItem(tx kvdb.RwTx, queueName []byte) (T, error) {
379,473✔
338
        task := d.constructor()
379,473✔
339

379,473✔
340
        namespacedBkt := tx.ReadWriteBucket(d.topLevelBkt)
379,473✔
341
        if namespacedBkt == nil {
379,481✔
342
                return task, ErrEmptyQueue
8✔
343
        }
8✔
344

345
        mainTasksBucket := namespacedBkt.NestedReadWriteBucket(cTaskQueue)
379,465✔
346
        if mainTasksBucket == nil {
379,465✔
347
                return task, ErrEmptyQueue
×
348
        }
×
349

350
        bucket, err := mainTasksBucket.CreateBucketIfNotExists(queueName)
379,465✔
351
        if err != nil {
379,465✔
352
                return task, err
×
353
        }
×
354

355
        // Get the index of the tail of the queue.
356
        var nextIndex uint64
379,465✔
357
        nextIndexB := bucket.Get(nextIndexKey)
379,465✔
358
        if nextIndexB != nil {
758,904✔
359
                nextIndex, err = readBigSize(nextIndexB)
379,439✔
360
                if err != nil {
379,439✔
361
                        return task, err
×
362
                }
×
363
        }
364

365
        // Get the index of the head of the queue.
366
        var oldestIndex uint64
379,465✔
367
        oldestIndexB := bucket.Get(oldestIndexKey)
379,465✔
368
        if oldestIndexB != nil {
758,876✔
369
                oldestIndex, err = readBigSize(oldestIndexB)
379,411✔
370
                if err != nil {
379,411✔
371
                        return task, err
×
372
                }
×
373
        } else {
54✔
374
                oldestIndexB, err = writeBigSize(0)
54✔
375
                if err != nil {
54✔
376
                        return task, err
×
377
                }
×
378
        }
379

380
        // If the head and tail are equal, then there are no items in the queue.
381
        if oldestIndex == nextIndex {
558,631✔
382
                // Take this opportunity to reset both indexes to zero.
179,166✔
383
                zeroIndexB, err := writeBigSize(0)
179,166✔
384
                if err != nil {
179,166✔
385
                        return task, err
×
386
                }
×
387

388
                err = bucket.Put(oldestIndexKey, zeroIndexB)
179,166✔
389
                if err != nil {
179,166✔
390
                        return task, err
×
391
                }
×
392

393
                err = bucket.Put(nextIndexKey, zeroIndexB)
179,166✔
394
                if err != nil {
179,166✔
395
                        return task, err
×
396
                }
×
397

398
                return task, ErrEmptyQueue
179,166✔
399
        }
400

401
        // Otherwise, pop the item at the oldest index.
402
        tasksBucket := bucket.NestedReadWriteBucket(itemsBkt)
200,299✔
403
        if tasksBucket == nil {
200,299✔
404
                return task, fmt.Errorf("client-tasks bucket not found")
×
405
        }
×
406

407
        item := tasksBucket.Get(oldestIndexB)
200,299✔
408
        if item == nil {
200,299✔
409
                return task, fmt.Errorf("no task found under index")
×
410
        }
×
411

412
        err = tasksBucket.Delete(oldestIndexB)
200,299✔
413
        if err != nil {
200,299✔
414
                return task, err
×
415
        }
×
416

417
        // Increment the oldestIndex value so that it now points to the new
418
        // oldest item.
419
        oldestIndex++
200,299✔
420
        oldestIndexB, err = writeBigSize(oldestIndex)
200,299✔
421
        if err != nil {
200,299✔
422
                return task, err
×
423
        }
×
424

425
        err = bucket.Put(oldestIndexKey, oldestIndexB)
200,299✔
426
        if err != nil {
200,299✔
427
                return task, err
×
428
        }
×
429

430
        if err = task.Decode(bytes.NewBuffer(item)); err != nil {
200,299✔
431
                return task, err
×
432
        }
×
433

434
        return task, nil
200,299✔
435
}
436

437
// len returns the number of items in the queue. This will be the addition of
438
// the number of items in the main queue and the number in the head queue.
439
func (d *DiskQueueDB[T]) len(tx kvdb.RTx) (uint64, error) {
805✔
440
        numMain, err := d.numItems(tx, queueMainBkt)
805✔
441
        if err != nil {
805✔
442
                return 0, err
×
443
        }
×
444

445
        numHead, err := d.numItems(tx, queueHeadBkt)
805✔
446
        if err != nil {
805✔
447
                return 0, err
×
448
        }
×
449

450
        return numMain + numHead, nil
805✔
451
}
452

453
// numItems returns the number of items in the given queue.
454
func (d *DiskQueueDB[T]) numItems(tx kvdb.RTx, queueName []byte) (uint64,
455
        error) {
1,666✔
456

1,666✔
457
        // Get the queue bucket at the correct namespace.
1,666✔
458
        namespacedBkt := tx.ReadBucket(d.topLevelBkt)
1,666✔
459
        if namespacedBkt == nil {
1,740✔
460
                return 0, nil
74✔
461
        }
74✔
462

463
        mainTasksBucket := namespacedBkt.NestedReadBucket(cTaskQueue)
1,592✔
464
        if mainTasksBucket == nil {
1,592✔
465
                return 0, nil
×
466
        }
×
467

468
        bucket := mainTasksBucket.NestedReadBucket(queueName)
1,592✔
469
        if bucket == nil {
1,619✔
470
                return 0, nil
27✔
471
        }
27✔
472

473
        var (
1,565✔
474
                oldestIndex uint64
1,565✔
475
                nextIndex   uint64
1,565✔
476
                err         error
1,565✔
477
        )
1,565✔
478

1,565✔
479
        // Get the next index key.
1,565✔
480
        nextIndexB := bucket.Get(nextIndexKey)
1,565✔
481
        if nextIndexB != nil {
3,130✔
482
                nextIndex, err = readBigSize(nextIndexB)
1,565✔
483
                if err != nil {
1,565✔
484
                        return 0, err
×
485
                }
×
486
        }
487

488
        // Get the oldest index.
489
        oldestIndexB := bucket.Get(oldestIndexKey)
1,565✔
490
        if oldestIndexB != nil {
3,097✔
491
                oldestIndex, err = readBigSize(oldestIndexB)
1,532✔
492
                if err != nil {
1,532✔
493
                        return 0, err
×
494
                }
×
495
        }
496

497
        return nextIndex - oldestIndex, nil
1,565✔
498
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc