• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13035292482

29 Jan 2025 03:59PM UTC coverage: 49.3% (-9.5%) from 58.777%
13035292482

Pull #9456

github

mohamedawnallah
docs: update release-notes-0.19.0.md

In this commit, we warn users about the removal
of RPCs `SendToRoute`, `SendToRouteSync`, `SendPayment`,
and `SendPaymentSync` in the next release 0.20.
Pull Request #9456: lnrpc+docs: deprecate warning `SendToRoute`, `SendToRouteSync`, `SendPayment`, and `SendPaymentSync` in Release 0.19

100634 of 204126 relevant lines covered (49.3%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.01
/watchtower/wtclient/queue.go
1
package wtclient
2

3
import (
4
        "container/list"
5
        "errors"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btclog/v2"
11
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
12
)
13

14
// dbErrorBackoff is how long we wait after a failed DB operation before
// attempting it again.
const dbErrorBackoff = 5 * time.Second
19

20
// internalTask pairs a queue task with a channel on which the outcome of
// handling that task is reported back to the sender.
type internalTask[T any] struct {
	// task is the wrapped queue item.
	task T

	// success carries the result of the attempt to handle task: true if
	// it was accepted, false otherwise.
	success chan bool
}
25

26
// newInternalTask creates a new internalTask with the given task.
27
func newInternalTask[T any](task T) *internalTask[T] {
3✔
28
        return &internalTask[T]{
3✔
29
                task:    task,
3✔
30
                success: make(chan bool),
3✔
31
        }
3✔
32
}
3✔
33

34
// DiskOverflowQueue is a queue that must be initialised with a certain maximum
35
// buffer size which represents the maximum number of elements that the queue
36
// should hold in memory. If the queue is full, then any new elements added to
37
// the queue will be persisted to disk instead. Once a consumer starts reading
38
// from the front of the queue again then items on disk will be moved into the
39
// queue again. The queue is also re-start safe. When it is stopped, any items
40
// in the memory queue, will be persisted to disk. On start up, the queue will
41
// be re-initialised with the items on disk.
42
type DiskOverflowQueue[T any] struct {
43
        startOnce sync.Once
44
        stopOnce  sync.Once
45

46
        log btclog.Logger
47

48
        // db is the database that will be used to persist queue items to disk.
49
        db wtdb.Queue[T]
50

51
        // toDisk represents the current mode of operation of the queue.
52
        toDisk atomic.Bool
53

54
        // We used an unbound list for the input of the queue so that producers
55
        // putting items into the queue are never blocked.
56
        inputListMu   sync.Mutex
57
        inputListCond *sync.Cond
58
        inputList     *list.List
59

60
        // inputChan is an unbuffered channel used to pass items from
61
        // drainInputList to feedMemQueue.
62
        inputChan chan *internalTask[T]
63

64
        // memQueue is a buffered channel used to pass items from
65
        // feedMemQueue to feedOutputChan.
66
        memQueue chan T
67

68
        // outputChan is an unbuffered channel from which items at the head of
69
        // the queue can be read.
70
        outputChan chan T
71

72
        // newDiskItemSignal is used to signal that there is a new item in the
73
        // main disk queue. There should only be one reader and one writer for
74
        // this channel.
75
        newDiskItemSignal chan struct{}
76

77
        // leftOverItem1 will be a non-nil task on shutdown if the
78
        // feedOutputChan method was holding an unhandled tasks at shutdown
79
        // time. Since feedOutputChan handles the very head of the queue, this
80
        // item should be the first to be reloaded on restart.
81
        leftOverItem1 *T
82

83
        // leftOverItems2 will be non-empty on shutdown if the feedMemQueue
84
        // method was holding any unhandled tasks at shutdown time. Since
85
        // feedMemQueue manages the input to the queue, the tasks should be
86
        // pushed to the head of the disk queue.
87
        leftOverItems2 []T
88

89
        // leftOverItem3 will be non-nil on shutdown if drainInputList was
90
        // holding an unhandled task at shutdown time. This task should be put
91
        // at the tail of the disk queue but should come before any input list
92
        // task.
93
        leftOverItem3 *T
94

95
        quit chan struct{}
96
        wg   sync.WaitGroup
97
}
98

99
// NewDiskOverflowQueue constructs a new DiskOverflowQueue.
100
func NewDiskOverflowQueue[T any](db wtdb.Queue[T], maxQueueSize uint64,
101
        logger btclog.Logger) (*DiskOverflowQueue[T], error) {
3✔
102

3✔
103
        if maxQueueSize < 2 {
3✔
104
                return nil, errors.New("the in-memory queue buffer size " +
×
105
                        "must be larger than 2")
×
106
        }
×
107

108
        q := &DiskOverflowQueue[T]{
3✔
109
                log:               logger,
3✔
110
                db:                db,
3✔
111
                inputList:         list.New(),
3✔
112
                newDiskItemSignal: make(chan struct{}, 1),
3✔
113
                inputChan:         make(chan *internalTask[T]),
3✔
114
                memQueue:          make(chan T, maxQueueSize-2),
3✔
115
                outputChan:        make(chan T),
3✔
116
                quit:              make(chan struct{}),
3✔
117
        }
3✔
118
        q.inputListCond = sync.NewCond(&q.inputListMu)
3✔
119

3✔
120
        return q, nil
3✔
121
}
122

123
// Start kicks off all the goroutines that are required to manage the queue.
124
func (q *DiskOverflowQueue[T]) Start() error {
3✔
125
        var err error
3✔
126
        q.startOnce.Do(func() {
6✔
127
                err = q.start()
3✔
128
        })
3✔
129

130
        return err
3✔
131
}
132

133
// start kicks off all the goroutines that are required to manage the queue.
134
func (q *DiskOverflowQueue[T]) start() error {
3✔
135
        numDisk, err := q.db.Len()
3✔
136
        if err != nil {
3✔
137
                return err
×
138
        }
×
139
        if numDisk != 0 {
3✔
140
                q.toDisk.Store(true)
×
141
        }
×
142

143
        // Kick off the three goroutines which will handle the input list, the
144
        // in-memory queue and the output channel.
145
        // The three goroutines are moving items according to the following
146
        // diagram:
147
        //
148
        //         ┌─────────┐ drainInputList  ┌──────────┐
149
        //         │inputList├─────┬──────────►│disk/db   │
150
        //         └─────────┘     │           └──────────┘
151
        //                         │ (depending on mode)
152
        //                         │           ┌──────────┐
153
        //                         └──────────►│inputChan │
154
        //                                     └──────────┘
155
        //
156
        //         ┌─────────┐ feedMemQueue    ┌──────────┐
157
        //         │disk/db  ├───────┬────────►│memQueue  │
158
        //         └─────────┘       │         └──────────┘
159
        //                           │ (depending on mode)
160
        //         ┌─────────┐       │
161
        //         │inputChan├───────┘
162
        //         └─────────┘
163
        //
164
        //         ┌─────────┐ feedOutputChan  ┌──────────┐
165
        //         │memQueue ├────────────────►│outputChan│
166
        //         └─────────┘                 └──────────┘
167
        //
168
        q.wg.Add(3)
3✔
169
        go q.drainInputList()
3✔
170
        go q.feedMemQueue()
3✔
171
        go q.feedOutputChan()
3✔
172

3✔
173
        return nil
3✔
174
}
175

176
// Stop stops the queue and persists any items in the memory queue to disk.
177
func (q *DiskOverflowQueue[T]) Stop() error {
3✔
178
        var err error
3✔
179
        q.stopOnce.Do(func() {
6✔
180
                err = q.stop()
3✔
181
        })
3✔
182

183
        return err
3✔
184
}
185

186
// stop the queue and persists any items in the memory queue to disk.
187
func (q *DiskOverflowQueue[T]) stop() error {
3✔
188
        close(q.quit)
3✔
189

3✔
190
        // Signal on the inputListCond until all the goroutines have returned.
3✔
191
        shutdown := make(chan struct{})
3✔
192
        go func() {
6✔
193
                for {
6✔
194
                        select {
3✔
195
                        case <-time.After(time.Millisecond):
3✔
196
                                q.inputListCond.Signal()
3✔
197
                        case <-shutdown:
3✔
198
                                return
3✔
199
                        }
200
                }
201
        }()
202

203
        q.wg.Wait()
3✔
204
        close(shutdown)
3✔
205

3✔
206
        // queueHead will be the items that we will be pushed to the head of
3✔
207
        // the queue.
3✔
208
        var queueHead []T
3✔
209

3✔
210
        // First, we append leftOverItem1 since this task is the current head
3✔
211
        // of the queue.
3✔
212
        if q.leftOverItem1 != nil {
3✔
213
                queueHead = append(queueHead, *q.leftOverItem1)
×
214
        }
×
215

216
        // Next, drain the buffered queue.
217
        for {
6✔
218
                task, ok := <-q.memQueue
3✔
219
                if !ok {
6✔
220
                        break
3✔
221
                }
222

223
                queueHead = append(queueHead, task)
×
224
        }
225

226
        // Then, any items held in leftOverItems2 would have been next to join
227
        // the memQueue. So those gets added next.
228
        if len(q.leftOverItems2) != 0 {
3✔
229
                queueHead = append(queueHead, q.leftOverItems2...)
×
230
        }
×
231

232
        // Now, push these items to the head of the queue.
233
        err := q.db.PushHead(queueHead...)
3✔
234
        if err != nil {
3✔
235
                q.log.Errorf("Could not add tasks to queue head: %v", err)
×
236
        }
×
237

238
        // Next we handle any items that need to be added to the main disk
239
        // queue.
240
        var diskQueue []T
3✔
241

3✔
242
        // Any item in leftOverItem3 is the first item that should join the
3✔
243
        // disk queue.
3✔
244
        if q.leftOverItem3 != nil {
3✔
245
                diskQueue = append(diskQueue, *q.leftOverItem3)
×
246
        }
×
247

248
        // Lastly, drain any items in the unbuffered input list.
249
        q.inputListCond.L.Lock()
3✔
250
        for q.inputList.Front() != nil {
3✔
251
                e := q.inputList.Front()
×
252

×
253
                //nolint:forcetypeassert
×
254
                task := q.inputList.Remove(e).(T)
×
255

×
256
                diskQueue = append(diskQueue, task)
×
257
        }
×
258
        q.inputListCond.L.Unlock()
3✔
259

3✔
260
        // Now persist these items to the main disk queue.
3✔
261
        err = q.db.Push(diskQueue...)
3✔
262
        if err != nil {
3✔
263
                q.log.Errorf("Could not add tasks to queue tail: %v", err)
×
264
        }
×
265

266
        return nil
3✔
267
}
268

269
// QueueBackupID adds a wtdb.BackupID to the queue. It will only return an error
270
// if the queue has been stopped. It is non-blocking.
271
func (q *DiskOverflowQueue[T]) QueueBackupID(item *wtdb.BackupID) error {
3✔
272
        // Return an error if the queue has been stopped
3✔
273
        select {
3✔
274
        case <-q.quit:
×
275
                return ErrClientExiting
×
276
        default:
3✔
277
        }
278

279
        // Add the new item to the unbound input list.
280
        q.inputListCond.L.Lock()
3✔
281
        q.inputList.PushBack(item)
3✔
282
        q.inputListCond.L.Unlock()
3✔
283

3✔
284
        // Signal that there is a new item in the input list.
3✔
285
        q.inputListCond.Signal()
3✔
286

3✔
287
        return nil
3✔
288
}
289

290
// NextBackupID can be used to read from the head of the DiskOverflowQueue.
291
func (q *DiskOverflowQueue[T]) NextBackupID() <-chan T {
3✔
292
        return q.outputChan
3✔
293
}
3✔
294

295
// drainInputList handles the input to the DiskOverflowQueue. It takes from the
296
// un-bounded input list and then, depending on what mode the queue is in,
297
// either puts the new item straight onto the persisted disk queue or attempts
298
// to feed it into the memQueue. On exit, any unhandled task will be assigned to
299
// leftOverItem3.
300
func (q *DiskOverflowQueue[T]) drainInputList() {
3✔
301
        defer q.wg.Done()
3✔
302

3✔
303
        for {
6✔
304
                // Wait for the input list to not be empty.
3✔
305
                q.inputListCond.L.Lock()
3✔
306
                for q.inputList.Front() == nil {
6✔
307
                        q.inputListCond.Wait()
3✔
308

3✔
309
                        select {
3✔
310
                        case <-q.quit:
3✔
311
                                q.inputListCond.L.Unlock()
3✔
312
                                return
3✔
313
                        default:
3✔
314
                        }
315
                }
316

317
                // Pop the first element from the queue.
318
                e := q.inputList.Front()
3✔
319

3✔
320
                //nolint:forcetypeassert
3✔
321
                task := q.inputList.Remove(e).(T)
3✔
322
                q.inputListCond.L.Unlock()
3✔
323

3✔
324
                // What we do with this new item depends on what the mode of the
3✔
325
                // queue currently is.
3✔
326
                for q.pushToActiveQueue(task) {
3✔
327
                        // We retry until the task is handled or the quit
×
328
                        // channel is closed.
×
329
                }
×
330

331
                // If the above returned false because the quit channel was
332
                // closed, then we exit.
333
                select {
3✔
334
                case <-q.quit:
×
335
                        return
×
336
                default:
3✔
337
                }
338
        }
339
}
340

341
// pushToActiveQueue handles the input of a new task to the queue. It returns
342
// true if the task should be retried and false if the task was handled or the
343
// quit channel fired.
344
func (q *DiskOverflowQueue[T]) pushToActiveQueue(task T) bool {
3✔
345
        // If the queue is in disk mode then any new items should be put
3✔
346
        // straight into the disk queue.
3✔
347
        if q.toDisk.Load() {
3✔
348
                err := q.db.Push(task)
×
349
                if err != nil {
×
350
                        // Log and back off for a few seconds and then
×
351
                        // try again with the same task.
×
352
                        q.log.Errorf("could not persist %s to disk. "+
×
353
                                "Retrying after backoff", task)
×
354

×
355
                        select {
×
356
                        // Backoff for a bit and then re-check the mode
357
                        // and try again to handle the task.
358
                        case <-time.After(dbErrorBackoff):
×
359
                                return true
×
360

361
                        // If the queue is quit at this moment, then the
362
                        // unhandled task is assigned to leftOverItem3
363
                        // so that it can be handled by the stop method.
364
                        case <-q.quit:
×
365
                                q.leftOverItem3 = &task
×
366

×
367
                                return false
×
368
                        }
369
                }
370

371
                // Send a signal that there is a new item in the main
372
                // disk queue.
373
                select {
×
374
                case q.newDiskItemSignal <- struct{}{}:
×
375
                case <-q.quit:
×
376

377
                // Because there might already be a signal in the
378
                // newDiskItemSignal channel, we can skip sending another
379
                // signal. The channel only has a buffer of one, so we would
380
                // block here if we didn't have a default case.
381
                default:
×
382
                }
383

384
                // If we got here, we were able to store the task in the disk
385
                // queue, so we can return false as no retry is necessary.
386
                return false
×
387
        }
388

389
        // If the mode is memory mode, then try feed it to the feedMemQueue
390
        // handler via the un-buffered inputChan channel. We wrap it in an
391
        // internal task so that we can find out if feedMemQueue successfully
392
        // handled the item. If it did, we continue in memory mode and if not,
393
        // then we switch to disk mode so that we can persist the item to the
394
        // disk queue instead.
395
        it := newInternalTask(task)
3✔
396

3✔
397
        select {
3✔
398
        // Try feed the task to the feedMemQueue handler. The handler, if it
399
        // does take the task, is guaranteed to respond via the success channel
400
        // of the task to indicate if the task was successfully added to the
401
        // in-mem queue. This is guaranteed even if the queue is being stopped.
402
        case q.inputChan <- it:
3✔
403

404
        // If the queue is quit at this moment, then the unhandled task is
405
        // assigned to leftOverItem3 so that it can be handled by the stop
406
        // method.
407
        case <-q.quit:
×
408
                q.leftOverItem3 = &task
×
409

×
410
                return false
×
411

412
        default:
×
413
                // The task was not accepted. So maybe the mode changed.
×
414
                return true
×
415
        }
416

417
        // If we get here, it means that the feedMemQueue handler took the task.
418
        // It is guaranteed to respond via the success channel, so we wait for
419
        // that response here.
420
        s := <-it.success
3✔
421
        if s {
6✔
422
                return false
3✔
423
        }
3✔
424

425
        // If the task was not successfully handled by feedMemQueue, then we
426
        // switch to disk mode so that the task can be persisted in the disk
427
        // queue instead.
428
        q.toDisk.Store(true)
×
429

×
430
        return true
×
431
}
432

433
// feedMemQueue manages which items should be fed onto the buffered
434
// memQueue. If the queue is then in disk mode, then the handler will read new
435
// tasks from the disk queue until it is empty. After that, it will switch
436
// between reading from the input channel or the disk queue depending on the
437
// queue mode.
438
func (q *DiskOverflowQueue[T]) feedMemQueue() {
3✔
439
        defer func() {
6✔
440
                close(q.memQueue)
3✔
441
                q.wg.Done()
3✔
442
        }()
3✔
443

444
        feedFromDisk := func() {
3✔
445
                select {
×
446
                case <-q.quit:
×
447
                        return
×
448
                default:
×
449
                }
450

451
                for {
×
452
                        // Ideally, we want to do batch reads from the DB. So
×
453
                        // we check how much capacity there is in the memQueue
×
454
                        // and fetch enough tasks to fill that capacity. If
×
455
                        // there is no capacity, however, then we at least want
×
456
                        // to fetch one task.
×
457
                        numToPop := cap(q.memQueue) - len(q.memQueue)
×
458
                        if numToPop == 0 {
×
459
                                numToPop = 1
×
460
                        }
×
461

462
                        tasks, err := q.db.PopUpTo(numToPop)
×
463
                        if errors.Is(err, wtdb.ErrEmptyQueue) {
×
464
                                q.toDisk.Store(false)
×
465

×
466
                                return
×
467
                        } else if err != nil {
×
468
                                q.log.Errorf("Could not load next task from " +
×
469
                                        "disk. Retrying.")
×
470

×
471
                                select {
×
472
                                case <-time.After(dbErrorBackoff):
×
473
                                        continue
×
474
                                case <-q.quit:
×
475
                                        return
×
476
                                }
477
                        }
478

479
                        // If we did manage to fetch a task from disk, we make
480
                        // sure to set the toDisk mode to true since we may
481
                        // block indefinitely while trying to push the tasks to
482
                        // the memQueue in which case we want the drainInputList
483
                        // goroutine to write any new tasks to disk.
484
                        q.toDisk.Store(true)
×
485

×
486
                        for i, task := range tasks {
×
487
                                select {
×
488
                                case q.memQueue <- task:
×
489

490
                                // If the queue is quit at this moment, then the
491
                                // unhandled tasks are assigned to
492
                                // leftOverItems2 so that they can be handled
493
                                // by the stop method.
494
                                case <-q.quit:
×
495
                                        q.leftOverItems2 = tasks[i:]
×
496
                                        return
×
497
                                }
498
                        }
499
                }
500
        }
501

502
        // If the queue is in disk mode, then the memQueue is fed with tasks
503
        // from the disk queue until it is empty.
504
        if q.toDisk.Load() {
3✔
505
                feedFromDisk()
×
506
        }
×
507

508
        // Now the queue enters its normal operation.
509
        for {
6✔
510
                select {
3✔
511
                case <-q.quit:
3✔
512
                        return
3✔
513

514
                // If there is a signal that a new item has been added to disk
515
                // then we use the disk queue as the source of the next task
516
                // to feed into memQueue.
517
                case <-q.newDiskItemSignal:
×
518
                        feedFromDisk()
×
519

520
                // If any items come through on the inputChan, then we try feed
521
                // these directly into the memQueue. If there is space in the
522
                // memeQueue then we respond with success to the producer,
523
                // otherwise we respond with failure so that the producer can
524
                // instead persist the task to disk. After the producer,
525
                // drainInputList, has pushed an item to inputChan, it is
526
                // guaranteed to await a response on the task's success channel
527
                // before quitting. Therefore, it is not required to listen on
528
                // the quit channel here.
529
                case task := <-q.inputChan:
3✔
530
                        select {
3✔
531
                        case q.memQueue <- task.task:
3✔
532
                                task.success <- true
3✔
533
                                continue
3✔
534
                        default:
×
535
                                task.success <- false
×
536
                        }
537
                }
538
        }
539
}
540

541
// feedOutputChan will pop an item from the buffered memQueue and block until
542
// the item is taken from the un-buffered outputChan. This is done repeatedly
543
// for the lifetime of the DiskOverflowQueue. On shutdown of the queue, any
544
// item not consumed by the outputChan but held by this method is assigned to
545
// the leftOverItem1 member so that the Stop method can persist the item to
546
// disk so that it is reloaded on restart.
547
//
548
// NOTE: This must be run as a goroutine.
549
func (q *DiskOverflowQueue[T]) feedOutputChan() {
3✔
550
        defer func() {
6✔
551
                close(q.outputChan)
3✔
552
                q.wg.Done()
3✔
553
        }()
3✔
554

555
        for {
6✔
556
                select {
3✔
557
                case nextTask, ok := <-q.memQueue:
3✔
558
                        // If the memQueue is closed, then the queue is
3✔
559
                        // stopping.
3✔
560
                        if !ok {
3✔
561
                                return
×
562
                        }
×
563

564
                        select {
3✔
565
                        case q.outputChan <- nextTask:
3✔
566
                        case <-q.quit:
×
567
                                q.leftOverItem1 = &nextTask
×
568
                                return
×
569
                        }
570

571
                case <-q.quit:
3✔
572
                        return
3✔
573
                }
574
        }
575
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc