• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13035292482

29 Jan 2025 03:59PM UTC coverage: 49.3% (-9.5%) from 58.777%
13035292482

Pull #9456

github

mohamedawnallah
docs: update release-notes-0.19.0.md

In this commit, we warn users about the removal
of RPCs `SendToRoute`, `SendToRouteSync`, `SendPayment`,
and `SendPaymentSync` in the next release 0.20.
Pull Request #9456: lnrpc+docs: deprecate warning `SendToRoute`, `SendToRouteSync`, `SendPayment`, and `SendPaymentSync` in Release 0.19

100634 of 204126 relevant lines covered (49.3%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

23.84
/watchtower/wtdb/queue.go
1
package wtdb
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "io"
8

9
        "github.com/btcsuite/btcwallet/walletdb"
10
        "github.com/lightningnetwork/lnd/kvdb"
11
)
12

13
var (
14
        // queueMainBkt will hold the main queue contents. It will have the
15
        // following structure:
16
        //                         => oldestIndexKey => oldest index
17
        //                      => nextIndexKey => newest index
18
        //                        => itemsBkt => <index> -> item
19
        //
20
        // Any items added to the queue via Push, will be added to this queue.
21
        // Items will only be popped from this queue if the head queue is empty.
22
        queueMainBkt = []byte("queue-main")
23

24
        // queueHeadBkt will hold the items that have been pushed to the head
25
        // of the queue. It will have the following structure:
26
        //                         => oldestIndexKey => oldest index
27
        //                      => nextIndexKey => newest index
28
        //                        => itemsBkt => <index> -> item
29
        //
30
        // If PushHead is called with a new set of items, then first all
31
        // remaining items in the head queue will be popped and added ot the
32
        // given set of items. Then, once the head queue is empty, the set of
33
        // items will be pushed to the queue. If this queue is not empty, then
34
        // Pop will pop items from this queue before popping from the main
35
        // queue.
36
        queueHeadBkt = []byte("queue-head")
37

38
        // itemsBkt is a sub-bucket of both the main and head queue storing:
39
        //                 index -> encoded item
40
        itemsBkt = []byte("items")
41

42
        // oldestIndexKey is a key of both the main and head queue storing the
43
        // index of the item at the head of the queue.
44
        oldestIndexKey = []byte("oldest-index")
45

46
        // nextIndexKey is a key of both the main and head queue storing the
47
        // index of the item at the tail of the queue.
48
        nextIndexKey = []byte("next-index")
49

50
        // ErrEmptyQueue is returned from Pop if there are no items left in
51
        // the queue.
52
        ErrEmptyQueue = errors.New("queue is empty")
53
)
54

55
// Queue is an interface describing a FIFO queue for any generic type T.
56
type Queue[T any] interface {
57
        // Len returns the number of tasks in the queue.
58
        Len() (uint64, error)
59

60
        // Push pushes new T items to the tail of the queue.
61
        Push(items ...T) error
62

63
        // PopUpTo attempts to pop up to n items from the head of the queue. If
64
        // no more items are in the queue then ErrEmptyQueue is returned.
65
        PopUpTo(n int) ([]T, error)
66

67
        // PushHead pushes new T items to the head of the queue.
68
        PushHead(items ...T) error
69
}
70

71
// Serializable is an interface must be satisfied for any type that the
72
// DiskQueueDB should handle.
73
type Serializable interface {
74
        Encode(w io.Writer) error
75
        Decode(r io.Reader) error
76
}
77

78
// DiskQueueDB is a generic Bolt DB implementation of the Queue interface.
79
type DiskQueueDB[T Serializable] struct {
80
        db          kvdb.Backend
81
        topLevelBkt []byte
82
        constructor func() T
83
        onItemWrite func(tx kvdb.RwTx, item T) error
84
}
85

86
// A compile-time check to ensure that DiskQueueDB implements the Queue
87
// interface.
88
var _ Queue[Serializable] = (*DiskQueueDB[Serializable])(nil)
89

90
// NewQueueDB constructs a new DiskQueueDB. A queueBktName must be provided so
91
// that the DiskQueueDB can create its own namespace in the bolt db.
92
func NewQueueDB[T Serializable](db kvdb.Backend, queueBktName []byte,
93
        constructor func() T,
94
        onItemWrite func(tx kvdb.RwTx, item T) error) Queue[T] {
3✔
95

3✔
96
        return &DiskQueueDB[T]{
3✔
97
                db:          db,
3✔
98
                topLevelBkt: queueBktName,
3✔
99
                constructor: constructor,
3✔
100
                onItemWrite: onItemWrite,
3✔
101
        }
3✔
102
}
3✔
103

104
// Len returns the number of tasks in the queue.
105
//
106
// NOTE: This is part of the Queue interface.
107
func (d *DiskQueueDB[T]) Len() (uint64, error) {
3✔
108
        var res uint64
3✔
109
        err := kvdb.View(d.db, func(tx kvdb.RTx) error {
6✔
110
                var err error
3✔
111
                res, err = d.len(tx)
3✔
112

3✔
113
                return err
3✔
114
        }, func() {
6✔
115
                res = 0
3✔
116
        })
3✔
117
        if err != nil {
3✔
118
                return 0, err
×
119
        }
×
120

121
        return res, nil
3✔
122
}
123

124
// Push adds a T to the tail of the queue.
125
//
126
// NOTE: This is part of the Queue interface.
127
func (d *DiskQueueDB[T]) Push(items ...T) error {
3✔
128
        return d.db.Update(func(tx walletdb.ReadWriteTx) error {
6✔
129
                for _, item := range items {
3✔
130
                        err := d.addItem(tx, queueMainBkt, item)
×
131
                        if err != nil {
×
132
                                return err
×
133
                        }
×
134
                }
135

136
                return nil
3✔
137
        }, func() {})
3✔
138
}
139

140
// PopUpTo attempts to pop up to n items from the queue. If the queue is empty,
141
// then ErrEmptyQueue is returned.
142
//
143
// NOTE: This is part of the Queue interface.
144
func (d *DiskQueueDB[T]) PopUpTo(n int) ([]T, error) {
×
145
        var items []T
×
146

×
147
        err := d.db.Update(func(tx walletdb.ReadWriteTx) error {
×
148
                // Get the number of items in the queue.
×
149
                l, err := d.len(tx)
×
150
                if err != nil {
×
151
                        return err
×
152
                }
×
153

154
                // If there are no items, then we are done.
155
                if l == 0 {
×
156
                        return ErrEmptyQueue
×
157
                }
×
158

159
                // If the number of items in the queue is less than the maximum
160
                // specified by the caller, then set the maximum to the number
161
                // of items that there actually are.
162
                num := n
×
163
                if l < uint64(n) {
×
164
                        num = int(l)
×
165
                }
×
166

167
                // Pop the specified number of items off of the queue.
168
                items = make([]T, 0, num)
×
169
                for i := 0; i < num; i++ {
×
170
                        item, err := d.pop(tx)
×
171
                        if err != nil {
×
172
                                return err
×
173
                        }
×
174

175
                        items = append(items, item)
×
176
                }
177

178
                return err
×
179
        }, func() {
×
180
                items = nil
×
181
        })
×
182
        if err != nil {
×
183
                return nil, err
×
184
        }
×
185

186
        return items, nil
×
187
}
188

189
// PushHead pushes new T items to the head of the queue. For this implementation
190
// of the Queue interface, this will require popping all items currently in the
191
// head queue and adding them after first adding the given list of items. Care
192
// should thus be taken to never have an unbounded number of items in the head
193
// queue.
194
//
195
// NOTE: This is part of the Queue interface.
196
func (d *DiskQueueDB[T]) PushHead(items ...T) error {
3✔
197
        return d.db.Update(func(tx walletdb.ReadWriteTx) error {
6✔
198
                // Determine how many items are still in the head queue.
3✔
199
                numHead, err := d.numItems(tx, queueHeadBkt)
3✔
200
                if err != nil {
3✔
201
                        return err
×
202
                }
×
203

204
                // Create a new in-memory list that will contain all the new
205
                // items along with the items currently in the queue.
206
                itemList := make([]T, 0, int(numHead)+len(items))
3✔
207

3✔
208
                // Insert all the given items into the list first since these
3✔
209
                // should be at the head of the queue.
3✔
210
                itemList = append(itemList, items...)
3✔
211

3✔
212
                // Now, read out all the items that are currently in the
3✔
213
                // persisted head queue and add them to the back of the list
3✔
214
                // of items to be added.
3✔
215
                for {
6✔
216
                        t, err := d.nextItem(tx, queueHeadBkt)
3✔
217
                        if errors.Is(err, ErrEmptyQueue) {
6✔
218
                                break
3✔
219
                        } else if err != nil {
×
220
                                return err
×
221
                        }
×
222

223
                        itemList = append(itemList, t)
×
224
                }
225

226
                // Now the head queue is empty, the items can be pushed to the
227
                // queue.
228
                for _, item := range itemList {
3✔
229
                        err := d.addItem(tx, queueHeadBkt, item)
×
230
                        if err != nil {
×
231
                                return err
×
232
                        }
×
233
                }
234

235
                return nil
3✔
236
        }, func() {})
3✔
237
}
238

239
// pop gets the next T item from the head of the queue. If no more items are in
240
// the queue then ErrEmptyQueue is returned.
241
func (d *DiskQueueDB[T]) pop(tx walletdb.ReadWriteTx) (T, error) {
×
242
        // First, check if there are items left in the head queue.
×
243
        item, err := d.nextItem(tx, queueHeadBkt)
×
244

×
245
        // No error means that an item was found in the head queue.
×
246
        if err == nil {
×
247
                return item, nil
×
248
        }
×
249

250
        // Else, if error is not ErrEmptyQueue, then return the error.
251
        if !errors.Is(err, ErrEmptyQueue) {
×
252
                return item, err
×
253
        }
×
254

255
        // Otherwise, the head queue is empty, so we now check if there are
256
        // items in the main queue.
257
        return d.nextItem(tx, queueMainBkt)
×
258
}
259

260
// addItem adds the given item to the back of the given queue.
261
func (d *DiskQueueDB[T]) addItem(tx kvdb.RwTx, queueName []byte, item T) error {
×
262
        var (
×
263
                namespacedBkt = tx.ReadWriteBucket(d.topLevelBkt)
×
264
                err           error
×
265
        )
×
266
        if namespacedBkt == nil {
×
267
                namespacedBkt, err = tx.CreateTopLevelBucket(d.topLevelBkt)
×
268
                if err != nil {
×
269
                        return err
×
270
                }
×
271
        }
272

273
        mainTasksBucket, err := namespacedBkt.CreateBucketIfNotExists(
×
274
                cTaskQueue,
×
275
        )
×
276
        if err != nil {
×
277
                return err
×
278
        }
×
279

280
        bucket, err := mainTasksBucket.CreateBucketIfNotExists(queueName)
×
281
        if err != nil {
×
282
                return err
×
283
        }
×
284

285
        if d.onItemWrite != nil {
×
286
                err = d.onItemWrite(tx, item)
×
287
                if err != nil {
×
288
                        return err
×
289
                }
×
290
        }
291

292
        // Find the index to use for placing this new item at the back of the
293
        // queue.
294
        var nextIndex uint64
×
295
        nextIndexB := bucket.Get(nextIndexKey)
×
296
        if nextIndexB != nil {
×
297
                nextIndex, err = readBigSize(nextIndexB)
×
298
                if err != nil {
×
299
                        return err
×
300
                }
×
301
        } else {
×
302
                nextIndexB, err = writeBigSize(0)
×
303
                if err != nil {
×
304
                        return err
×
305
                }
×
306
        }
307

308
        tasksBucket, err := bucket.CreateBucketIfNotExists(itemsBkt)
×
309
        if err != nil {
×
310
                return err
×
311
        }
×
312

313
        var buff bytes.Buffer
×
314
        err = item.Encode(&buff)
×
315
        if err != nil {
×
316
                return err
×
317
        }
×
318

319
        // Put the new task in the assigned index.
320
        err = tasksBucket.Put(nextIndexB, buff.Bytes())
×
321
        if err != nil {
×
322
                return err
×
323
        }
×
324

325
        // Increment the next-index counter.
326
        nextIndex++
×
327
        nextIndexB, err = writeBigSize(nextIndex)
×
328
        if err != nil {
×
329
                return err
×
330
        }
×
331

332
        return bucket.Put(nextIndexKey, nextIndexB)
×
333
}
334

335
// nextItem pops an item of the queue identified by the given namespace. If
336
// there are no items on the queue then ErrEmptyQueue is returned.
337
func (d *DiskQueueDB[T]) nextItem(tx kvdb.RwTx, queueName []byte) (T, error) {
3✔
338
        task := d.constructor()
3✔
339

3✔
340
        namespacedBkt := tx.ReadWriteBucket(d.topLevelBkt)
3✔
341
        if namespacedBkt == nil {
6✔
342
                return task, ErrEmptyQueue
3✔
343
        }
3✔
344

345
        mainTasksBucket := namespacedBkt.NestedReadWriteBucket(cTaskQueue)
×
346
        if mainTasksBucket == nil {
×
347
                return task, ErrEmptyQueue
×
348
        }
×
349

350
        bucket, err := mainTasksBucket.CreateBucketIfNotExists(queueName)
×
351
        if err != nil {
×
352
                return task, err
×
353
        }
×
354

355
        // Get the index of the tail of the queue.
356
        var nextIndex uint64
×
357
        nextIndexB := bucket.Get(nextIndexKey)
×
358
        if nextIndexB != nil {
×
359
                nextIndex, err = readBigSize(nextIndexB)
×
360
                if err != nil {
×
361
                        return task, err
×
362
                }
×
363
        }
364

365
        // Get the index of the head of the queue.
366
        var oldestIndex uint64
×
367
        oldestIndexB := bucket.Get(oldestIndexKey)
×
368
        if oldestIndexB != nil {
×
369
                oldestIndex, err = readBigSize(oldestIndexB)
×
370
                if err != nil {
×
371
                        return task, err
×
372
                }
×
373
        } else {
×
374
                oldestIndexB, err = writeBigSize(0)
×
375
                if err != nil {
×
376
                        return task, err
×
377
                }
×
378
        }
379

380
        // If the head and tail are equal, then there are no items in the queue.
381
        if oldestIndex == nextIndex {
×
382
                // Take this opportunity to reset both indexes to zero.
×
383
                zeroIndexB, err := writeBigSize(0)
×
384
                if err != nil {
×
385
                        return task, err
×
386
                }
×
387

388
                err = bucket.Put(oldestIndexKey, zeroIndexB)
×
389
                if err != nil {
×
390
                        return task, err
×
391
                }
×
392

393
                err = bucket.Put(nextIndexKey, zeroIndexB)
×
394
                if err != nil {
×
395
                        return task, err
×
396
                }
×
397

398
                return task, ErrEmptyQueue
×
399
        }
400

401
        // Otherwise, pop the item at the oldest index.
402
        tasksBucket := bucket.NestedReadWriteBucket(itemsBkt)
×
403
        if tasksBucket == nil {
×
404
                return task, fmt.Errorf("client-tasks bucket not found")
×
405
        }
×
406

407
        item := tasksBucket.Get(oldestIndexB)
×
408
        if item == nil {
×
409
                return task, fmt.Errorf("no task found under index")
×
410
        }
×
411

412
        err = tasksBucket.Delete(oldestIndexB)
×
413
        if err != nil {
×
414
                return task, err
×
415
        }
×
416

417
        // Increment the oldestIndex value so that it now points to the new
418
        // oldest item.
419
        oldestIndex++
×
420
        oldestIndexB, err = writeBigSize(oldestIndex)
×
421
        if err != nil {
×
422
                return task, err
×
423
        }
×
424

425
        err = bucket.Put(oldestIndexKey, oldestIndexB)
×
426
        if err != nil {
×
427
                return task, err
×
428
        }
×
429

430
        if err = task.Decode(bytes.NewBuffer(item)); err != nil {
×
431
                return task, err
×
432
        }
×
433

434
        return task, nil
×
435
}
436

437
// len returns the number of items in the queue. This will be the addition of
438
// the number of items in the main queue and the number in the head queue.
439
func (d *DiskQueueDB[T]) len(tx kvdb.RTx) (uint64, error) {
3✔
440
        numMain, err := d.numItems(tx, queueMainBkt)
3✔
441
        if err != nil {
3✔
442
                return 0, err
×
443
        }
×
444

445
        numHead, err := d.numItems(tx, queueHeadBkt)
3✔
446
        if err != nil {
3✔
447
                return 0, err
×
448
        }
×
449

450
        return numMain + numHead, nil
3✔
451
}
452

453
// numItems returns the number of items in the given queue.
454
func (d *DiskQueueDB[T]) numItems(tx kvdb.RTx, queueName []byte) (uint64,
455
        error) {
3✔
456

3✔
457
        // Get the queue bucket at the correct namespace.
3✔
458
        namespacedBkt := tx.ReadBucket(d.topLevelBkt)
3✔
459
        if namespacedBkt == nil {
6✔
460
                return 0, nil
3✔
461
        }
3✔
462

463
        mainTasksBucket := namespacedBkt.NestedReadBucket(cTaskQueue)
×
464
        if mainTasksBucket == nil {
×
465
                return 0, nil
×
466
        }
×
467

468
        bucket := mainTasksBucket.NestedReadBucket(queueName)
×
469
        if bucket == nil {
×
470
                return 0, nil
×
471
        }
×
472

473
        var (
×
474
                oldestIndex uint64
×
475
                nextIndex   uint64
×
476
                err         error
×
477
        )
×
478

×
479
        // Get the next index key.
×
480
        nextIndexB := bucket.Get(nextIndexKey)
×
481
        if nextIndexB != nil {
×
482
                nextIndex, err = readBigSize(nextIndexB)
×
483
                if err != nil {
×
484
                        return 0, err
×
485
                }
×
486
        }
487

488
        // Get the oldest index.
489
        oldestIndexB := bucket.Get(oldestIndexKey)
×
490
        if oldestIndexB != nil {
×
491
                oldestIndex, err = readBigSize(oldestIndexB)
×
492
                if err != nil {
×
493
                        return 0, err
×
494
                }
×
495
        }
496

497
        return nextIndex - oldestIndex, nil
×
498
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc