• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14448270482

14 Apr 2025 02:30PM UTC coverage: 69.061% (-0.03%) from 69.091%
14448270482

Pull #9674

github

web-flow
Merge 417d2de43 into 6d648ad90
Pull Request #9674: Move neutrino db also to postgres when using postgres as a backend

1 of 62 new or added lines in 1 file covered. (1.61%)

96 existing lines in 22 files now uncovered.

133554 of 193385 relevant lines covered (69.06%)

22152.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.95
/watchtower/wtdb/queue.go
1
package wtdb
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8

9
        "github.com/btcsuite/btcwallet/walletdb"
10
        "github.com/lightningnetwork/lnd/kvdb"
11
)
12

13
var (
14
        // queueMainBkt will hold the main queue contents. It will have the
15
        // following structure:
16
        //                         => oldestIndexKey => oldest index
17
        //                      => nextIndexKey => newest index
18
        //                        => itemsBkt => <index> -> item
19
        //
20
        // Any items added to the queue via Push, will be added to this queue.
21
        // Items will only be popped from this queue if the head queue is empty.
22
        queueMainBkt = []byte("queue-main")
23

24
        // queueHeadBkt will hold the items that have been pushed to the head
25
        // of the queue. It will have the following structure:
26
        //                         => oldestIndexKey => oldest index
27
        //                      => nextIndexKey => newest index
28
        //                        => itemsBkt => <index> -> item
29
        //
30
        // If PushHead is called with a new set of items, then first all
31
        // remaining items in the head queue will be popped and added ot the
32
        // given set of items. Then, once the head queue is empty, the set of
33
        // items will be pushed to the queue. If this queue is not empty, then
34
        // Pop will pop items from this queue before popping from the main
35
        // queue.
36
        queueHeadBkt = []byte("queue-head")
37

38
        // itemsBkt is a sub-bucket of both the main and head queue storing:
39
        //                 index -> encoded item
40
        itemsBkt = []byte("items")
41

42
        // oldestIndexKey is a key of both the main and head queue storing the
43
        // index of the item at the head of the queue.
44
        oldestIndexKey = []byte("oldest-index")
45

46
        // nextIndexKey is a key of both the main and head queue storing the
47
        // index of the item at the tail of the queue.
48
        nextIndexKey = []byte("next-index")
49

50
        // ErrEmptyQueue is returned from Pop if there are no items left in
51
        // the queue.
52
        ErrEmptyQueue = errors.New("queue is empty")
53
)
54

55
// Queue is an interface describing a FIFO queue for any generic type T.
56
type Queue[T any] interface {
57
        // Len returns the number of tasks in the queue.
58
        Len() (uint64, error)
59

60
        // Push pushes new T items to the tail of the queue.
61
        Push(items ...T) error
62

63
        // PopUpTo attempts to pop up to n items from the head of the queue. If
64
        // no more items are in the queue then ErrEmptyQueue is returned.
65
        PopUpTo(n int) ([]T, error)
66

67
        // PushHead pushes new T items to the head of the queue.
68
        PushHead(items ...T) error
69
}
70

71
// Serializable is an interface must be satisfied for any type that the
72
// DiskQueueDB should handle.
73
type Serializable interface {
74
        Encode(w io.Writer) error
75
        Decode(r io.Reader) error
76
}
77

78
// DiskQueueDB is a generic Bolt DB implementation of the Queue interface.
79
type DiskQueueDB[T Serializable] struct {
80
        db          kvdb.Backend
81
        topLevelBkt []byte
82
        constructor func() T
83
        onItemWrite func(tx kvdb.RwTx, item T) error
84
}
85

86
// A compile-time check to ensure that DiskQueueDB implements the Queue
87
// interface.
88
var _ Queue[Serializable] = (*DiskQueueDB[Serializable])(nil)
89

90
// NewQueueDB constructs a new DiskQueueDB. A queueBktName must be provided so
91
// that the DiskQueueDB can create its own namespace in the bolt db.
92
func NewQueueDB[T Serializable](db kvdb.Backend, queueBktName []byte,
93
        constructor func() T,
94
        onItemWrite func(tx kvdb.RwTx, item T) error) Queue[T] {
43✔
95

43✔
96
        return &DiskQueueDB[T]{
43✔
97
                db:          db,
43✔
98
                topLevelBkt: queueBktName,
43✔
99
                constructor: constructor,
43✔
100
                onItemWrite: onItemWrite,
43✔
101
        }
43✔
102
}
43✔
103

104
// Len returns the number of tasks in the queue.
105
//
106
// NOTE: This is part of the Queue interface.
107
func (d *DiskQueueDB[T]) Len() (uint64, error) {
69✔
108
        var res uint64
69✔
109
        err := kvdb.View(d.db, func(tx kvdb.RTx) error {
138✔
110
                var err error
69✔
111
                res, err = d.len(tx)
69✔
112

69✔
113
                return err
69✔
114
        }, func() {
138✔
115
                res = 0
69✔
116
        })
69✔
117
        if err != nil {
69✔
118
                return 0, err
×
119
        }
×
120

121
        return res, nil
69✔
122
}
123

124
// Push adds a T to the tail of the queue.
125
//
126
// NOTE: This is part of the Queue interface.
127
func (d *DiskQueueDB[T]) Push(items ...T) error {
533✔
128
        return d.db.Update(func(tx walletdb.ReadWriteTx) error {
1,066✔
129
                for _, item := range items {
181,813✔
130
                        err := d.addItem(tx, queueMainBkt, item)
181,280✔
131
                        if err != nil {
181,280✔
132
                                return err
×
133
                        }
×
134
                }
135

136
                return nil
533✔
137
        }, func() {})
533✔
138
}
139

140
// PopUpTo attempts to pop up to n items from the queue. If the queue is empty,
141
// then ErrEmptyQueue is returned.
142
//
143
// NOTE: This is part of the Queue interface.
144
func (d *DiskQueueDB[T]) PopUpTo(n int) ([]T, error) {
720✔
145
        var items []T
720✔
146

720✔
147
        err := d.db.Update(func(tx walletdb.ReadWriteTx) error {
1,440✔
148
                // Get the number of items in the queue.
720✔
149
                l, err := d.len(tx)
720✔
150
                if err != nil {
720✔
151
                        return err
×
152
                }
×
153

154
                // If there are no items, then we are done.
155
                if l == 0 {
791✔
156
                        return ErrEmptyQueue
71✔
157
                }
71✔
158

159
                // If the number of items in the queue is less than the maximum
160
                // specified by the caller, then set the maximum to the number
161
                // of items that there actually are.
162
                num := n
649✔
163
                if l < uint64(n) {
649✔
UNCOV
164
                        num = int(l)
×
UNCOV
165
                }
×
166

167
                // Pop the specified number of items off of the queue.
168
                items = make([]T, 0, num)
649✔
169
                for i := 0; i < num; i++ {
195,353✔
170
                        item, err := d.pop(tx)
194,704✔
171
                        if err != nil {
194,704✔
172
                                return err
×
173
                        }
×
174

175
                        items = append(items, item)
194,704✔
176
                }
177

178
                return err
649✔
179
        }, func() {
720✔
180
                items = nil
720✔
181
        })
720✔
182
        if err != nil {
791✔
183
                return nil, err
71✔
184
        }
71✔
185

186
        return items, nil
649✔
187
}
188

189
// PushHead pushes new T items to the head of the queue. For this implementation
190
// of the Queue interface, this will require popping all items currently in the
191
// head queue and adding them after first adding the given list of items. Care
192
// should thus be taken to never have an unbounded number of items in the head
193
// queue.
194
//
195
// NOTE: This is part of the Queue interface.
196
func (d *DiskQueueDB[T]) PushHead(items ...T) error {
58✔
197
        return d.db.Update(func(tx walletdb.ReadWriteTx) error {
116✔
198
                // Determine how many items are still in the head queue.
58✔
199
                numHead, err := d.numItems(tx, queueHeadBkt)
58✔
200
                if err != nil {
58✔
201
                        return err
×
202
                }
×
203

204
                // Create a new in-memory list that will contain all the new
205
                // items along with the items currently in the queue.
206
                itemList := make([]T, 0, int(numHead)+len(items))
58✔
207

58✔
208
                // Insert all the given items into the list first since these
58✔
209
                // should be at the head of the queue.
58✔
210
                itemList = append(itemList, items...)
58✔
211

58✔
212
                // Now, read out all the items that are currently in the
58✔
213
                // persisted head queue and add them to the back of the list
58✔
214
                // of items to be added.
58✔
215
                for {
121✔
216
                        t, err := d.nextItem(tx, queueHeadBkt)
63✔
217
                        if errors.Is(err, ErrEmptyQueue) {
121✔
218
                                break
58✔
219
                        } else if err != nil {
5✔
220
                                return err
×
221
                        }
×
222

223
                        itemList = append(itemList, t)
5✔
224
                }
225

226
                // Now the head queue is empty, the items can be pushed to the
227
                // queue.
228
                for _, item := range itemList {
13,487✔
229
                        err := d.addItem(tx, queueHeadBkt, item)
13,429✔
230
                        if err != nil {
13,429✔
231
                                return err
×
232
                        }
×
233
                }
234

235
                return nil
58✔
236
        }, func() {})
58✔
237
}
238

239
// pop gets the next T item from the head of the queue. If no more items are in
240
// the queue then ErrEmptyQueue is returned.
241
func (d *DiskQueueDB[T]) pop(tx walletdb.ReadWriteTx) (T, error) {
194,704✔
242
        // First, check if there are items left in the head queue.
194,704✔
243
        item, err := d.nextItem(tx, queueHeadBkt)
194,704✔
244

194,704✔
245
        // No error means that an item was found in the head queue.
194,704✔
246
        if err == nil {
208,128✔
247
                return item, nil
13,424✔
248
        }
13,424✔
249

250
        // Else, if error is not ErrEmptyQueue, then return the error.
251
        if !errors.Is(err, ErrEmptyQueue) {
181,280✔
252
                return item, err
×
253
        }
×
254

255
        // Otherwise, the head queue is empty, so we now check if there are
256
        // items in the main queue.
257
        return d.nextItem(tx, queueMainBkt)
181,280✔
258
}
259

260
// addItem adds the given item to the back of the given queue.
261
func (d *DiskQueueDB[T]) addItem(tx kvdb.RwTx, queueName []byte, item T) error {
194,709✔
262
        var (
194,709✔
263
                namespacedBkt = tx.ReadWriteBucket(d.topLevelBkt)
194,709✔
264
                err           error
194,709✔
265
        )
194,709✔
266
        if namespacedBkt == nil {
194,736✔
267
                namespacedBkt, err = tx.CreateTopLevelBucket(d.topLevelBkt)
27✔
268
                if err != nil {
27✔
269
                        return err
×
270
                }
×
271
        }
272

273
        mainTasksBucket, err := namespacedBkt.CreateBucketIfNotExists(
194,709✔
274
                cTaskQueue,
194,709✔
275
        )
194,709✔
276
        if err != nil {
194,709✔
277
                return err
×
278
        }
×
279

280
        bucket, err := mainTasksBucket.CreateBucketIfNotExists(queueName)
194,709✔
281
        if err != nil {
194,709✔
282
                return err
×
283
        }
×
284

285
        if d.onItemWrite != nil {
389,418✔
286
                err = d.onItemWrite(tx, item)
194,709✔
287
                if err != nil {
194,709✔
288
                        return err
×
289
                }
×
290
        }
291

292
        // Find the index to use for placing this new item at the back of the
293
        // queue.
294
        var nextIndex uint64
194,709✔
295
        nextIndexB := bucket.Get(nextIndexKey)
194,709✔
296
        if nextIndexB != nil {
389,390✔
297
                nextIndex, err = readBigSize(nextIndexB)
194,681✔
298
                if err != nil {
194,681✔
299
                        return err
×
300
                }
×
301
        } else {
28✔
302
                nextIndexB, err = writeBigSize(0)
28✔
303
                if err != nil {
28✔
304
                        return err
×
305
                }
×
306
        }
307

308
        tasksBucket, err := bucket.CreateBucketIfNotExists(itemsBkt)
194,709✔
309
        if err != nil {
194,709✔
310
                return err
×
311
        }
×
312

313
        var buff bytes.Buffer
194,709✔
314
        err = item.Encode(&buff)
194,709✔
315
        if err != nil {
194,709✔
316
                return err
×
317
        }
×
318

319
        // Put the new task in the assigned index.
320
        err = tasksBucket.Put(nextIndexB, buff.Bytes())
194,709✔
321
        if err != nil {
194,709✔
322
                return err
×
323
        }
×
324

325
        // Increment the next-index counter.
326
        nextIndex++
194,709✔
327
        nextIndexB, err = writeBigSize(nextIndex)
194,709✔
328
        if err != nil {
194,709✔
329
                return err
×
330
        }
×
331

332
        return bucket.Put(nextIndexKey, nextIndexB)
194,709✔
333
}
334

335
// nextItem pops an item of the queue identified by the given namespace. If
336
// there are no items on the queue then ErrEmptyQueue is returned.
337
func (d *DiskQueueDB[T]) nextItem(tx kvdb.RwTx, queueName []byte) (T, error) {
376,047✔
338
        task := d.constructor()
376,047✔
339

376,047✔
340
        namespacedBkt := tx.ReadWriteBucket(d.topLevelBkt)
376,047✔
341
        if namespacedBkt == nil {
376,055✔
342
                return task, ErrEmptyQueue
8✔
343
        }
8✔
344

345
        mainTasksBucket := namespacedBkt.NestedReadWriteBucket(cTaskQueue)
376,039✔
346
        if mainTasksBucket == nil {
376,039✔
347
                return task, ErrEmptyQueue
×
348
        }
×
349

350
        bucket, err := mainTasksBucket.CreateBucketIfNotExists(queueName)
376,039✔
351
        if err != nil {
376,039✔
352
                return task, err
×
353
        }
×
354

355
        // Get the index of the tail of the queue.
356
        var nextIndex uint64
376,039✔
357
        nextIndexB := bucket.Get(nextIndexKey)
376,039✔
358
        if nextIndexB != nil {
752,052✔
359
                nextIndex, err = readBigSize(nextIndexB)
376,013✔
360
                if err != nil {
376,013✔
361
                        return task, err
×
362
                }
×
363
        }
364

365
        // Get the index of the head of the queue.
366
        var oldestIndex uint64
376,039✔
367
        oldestIndexB := bucket.Get(oldestIndexKey)
376,039✔
368
        if oldestIndexB != nil {
752,024✔
369
                oldestIndex, err = readBigSize(oldestIndexB)
375,985✔
370
                if err != nil {
375,985✔
371
                        return task, err
×
372
                }
×
373
        } else {
54✔
374
                oldestIndexB, err = writeBigSize(0)
54✔
375
                if err != nil {
54✔
376
                        return task, err
×
377
                }
×
378
        }
379

380
        // If the head and tail are equal, then there are no items in the queue.
381
        if oldestIndex == nextIndex {
557,369✔
382
                // Take this opportunity to reset both indexes to zero.
181,330✔
383
                zeroIndexB, err := writeBigSize(0)
181,330✔
384
                if err != nil {
181,330✔
385
                        return task, err
×
386
                }
×
387

388
                err = bucket.Put(oldestIndexKey, zeroIndexB)
181,330✔
389
                if err != nil {
181,330✔
390
                        return task, err
×
391
                }
×
392

393
                err = bucket.Put(nextIndexKey, zeroIndexB)
181,330✔
394
                if err != nil {
181,330✔
395
                        return task, err
×
396
                }
×
397

398
                return task, ErrEmptyQueue
181,330✔
399
        }
400

401
        // Otherwise, pop the item at the oldest index.
402
        tasksBucket := bucket.NestedReadWriteBucket(itemsBkt)
194,709✔
403
        if tasksBucket == nil {
194,709✔
404
                return task, fmt.Errorf("client-tasks bucket not found")
×
405
        }
×
406

407
        item := tasksBucket.Get(oldestIndexB)
194,709✔
408
        if item == nil {
194,709✔
409
                return task, fmt.Errorf("no task found under index")
×
410
        }
×
411

412
        err = tasksBucket.Delete(oldestIndexB)
194,709✔
413
        if err != nil {
194,709✔
414
                return task, err
×
415
        }
×
416

417
        // Increment the oldestIndex value so that it now points to the new
418
        // oldest item.
419
        oldestIndex++
194,709✔
420
        oldestIndexB, err = writeBigSize(oldestIndex)
194,709✔
421
        if err != nil {
194,709✔
422
                return task, err
×
423
        }
×
424

425
        err = bucket.Put(oldestIndexKey, oldestIndexB)
194,709✔
426
        if err != nil {
194,709✔
427
                return task, err
×
428
        }
×
429

430
        if err = task.Decode(bytes.NewBuffer(item)); err != nil {
194,709✔
431
                return task, err
×
432
        }
×
433

434
        return task, nil
194,709✔
435
}
436

437
// len returns the number of items in the queue. This will be the addition of
438
// the number of items in the main queue and the number in the head queue.
439
func (d *DiskQueueDB[T]) len(tx kvdb.RTx) (uint64, error) {
789✔
440
        numMain, err := d.numItems(tx, queueMainBkt)
789✔
441
        if err != nil {
789✔
442
                return 0, err
×
443
        }
×
444

445
        numHead, err := d.numItems(tx, queueHeadBkt)
789✔
446
        if err != nil {
789✔
447
                return 0, err
×
448
        }
×
449

450
        return numMain + numHead, nil
789✔
451
}
452

453
// numItems returns the number of items in the given queue.
454
func (d *DiskQueueDB[T]) numItems(tx kvdb.RTx, queueName []byte) (uint64,
455
        error) {
1,630✔
456

1,630✔
457
        // Get the queue bucket at the correct namespace.
1,630✔
458
        namespacedBkt := tx.ReadBucket(d.topLevelBkt)
1,630✔
459
        if namespacedBkt == nil {
1,704✔
460
                return 0, nil
74✔
461
        }
74✔
462

463
        mainTasksBucket := namespacedBkt.NestedReadBucket(cTaskQueue)
1,556✔
464
        if mainTasksBucket == nil {
1,556✔
465
                return 0, nil
×
466
        }
×
467

468
        bucket := mainTasksBucket.NestedReadBucket(queueName)
1,556✔
469
        if bucket == nil {
1,583✔
470
                return 0, nil
27✔
471
        }
27✔
472

473
        var (
1,529✔
474
                oldestIndex uint64
1,529✔
475
                nextIndex   uint64
1,529✔
476
                err         error
1,529✔
477
        )
1,529✔
478

1,529✔
479
        // Get the next index key.
1,529✔
480
        nextIndexB := bucket.Get(nextIndexKey)
1,529✔
481
        if nextIndexB != nil {
3,058✔
482
                nextIndex, err = readBigSize(nextIndexB)
1,529✔
483
                if err != nil {
1,529✔
484
                        return 0, err
×
485
                }
×
486
        }
487

488
        // Get the oldest index.
489
        oldestIndexB := bucket.Get(oldestIndexKey)
1,529✔
490
        if oldestIndexB != nil {
3,025✔
491
                oldestIndex, err = readBigSize(oldestIndexB)
1,496✔
492
                if err != nil {
1,496✔
493
                        return 0, err
×
494
                }
×
495
        }
496

497
        return nextIndex - oldestIndex, nil
1,529✔
498
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc