• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15733724825

18 Jun 2025 01:09PM UTC coverage: 68.248% (+0.02%) from 68.231%
15733724825

push

github

web-flow
Merge pull request #9962 from yyforyongyu/fix-panic

chainio: use package logger instead of instance logger

5 of 7 new or added lines in 1 file covered. (71.43%)

50 existing lines in 13 files now uncovered.

134464 of 197024 relevant lines covered (68.25%)

22189.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.92
/watchtower/wtclient/queue.go
1
package wtclient
2

3
import (
4
        "container/list"
5
        "errors"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btclog/v2"
11
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
12
)
13

14
const (
	// dbErrorBackoff is the length of time we will back off before retrying
	// any DB action that failed. It is used by both pushToActiveQueue and
	// feedMemQueue when a disk read/write returns an error.
	dbErrorBackoff = time.Second * 5
)
19

20
// internalTask wraps a queued task of type T with a success channel. The
// channel is used by feedMemQueue to report back to drainInputList whether the
// task was accepted into the in-memory queue.
//
// NOTE(review): the original comment said "BackupID task", but the type is
// generic over T — confirm T is always *wtdb.BackupID at the call sites.
type internalTask[T any] struct {
	task    T
	success chan bool
}

// newInternalTask wraps the given task together with a fresh, unbuffered
// success channel.
func newInternalTask[T any](task T) *internalTask[T] {
	it := internalTask[T]{
		task:    task,
		success: make(chan bool),
	}

	return &it
}
33

34
// DiskOverflowQueue is a queue that must be initialised with a certain maximum
// buffer size which represents the maximum number of elements that the queue
// should hold in memory. If the queue is full, then any new elements added to
// the queue will be persisted to disk instead. Once a consumer starts reading
// from the front of the queue again then items on disk will be moved into the
// queue again. The queue is also re-start safe. When it is stopped, any items
// in the memory queue, will be persisted to disk. On start up, the queue will
// be re-initialised with the items on disk.
type DiskOverflowQueue[T any] struct {
	// startOnce and stopOnce ensure that Start and Stop each execute their
	// work at most once.
	startOnce sync.Once
	stopOnce  sync.Once

	// log is the logger used to report DB errors encountered by the
	// queue's goroutines.
	log btclog.Logger

	// db is the database that will be used to persist queue items to disk.
	db wtdb.Queue[T]

	// toDisk represents the current mode of operation of the queue.
	toDisk atomic.Bool

	// We used an unbound list for the input of the queue so that producers
	// putting items into the queue are never blocked.
	inputListMu   sync.Mutex
	inputListCond *sync.Cond
	inputList     *list.List

	// inputChan is an unbuffered channel used to pass items from
	// drainInputList to feedMemQueue.
	inputChan chan *internalTask[T]

	// memQueue is a buffered channel used to pass items from
	// feedMemQueue to feedOutputChan.
	memQueue chan T

	// outputChan is an unbuffered channel from which items at the head of
	// the queue can be read.
	outputChan chan T

	// newDiskItemSignal is used to signal that there is a new item in the
	// main disk queue. There should only be one reader and one writer for
	// this channel.
	newDiskItemSignal chan struct{}

	// leftOverItem1 will be a non-nil task on shutdown if the
	// feedOutputChan method was holding an unhandled tasks at shutdown
	// time. Since feedOutputChan handles the very head of the queue, this
	// item should be the first to be reloaded on restart.
	leftOverItem1 *T

	// leftOverItems2 will be non-empty on shutdown if the feedMemQueue
	// method was holding any unhandled tasks at shutdown time. Since
	// feedMemQueue manages the input to the queue, the tasks should be
	// pushed to the head of the disk queue.
	leftOverItems2 []T

	// leftOverItem3 will be non-nil on shutdown if drainInputList was
	// holding an unhandled task at shutdown time. This task should be put
	// at the tail of the disk queue but should come before any input list
	// task.
	leftOverItem3 *T

	// quit is closed by stop to signal all goroutines to exit, and wg
	// tracks the three goroutines launched by start.
	quit chan struct{}
	wg   sync.WaitGroup
}
98

99
// NewDiskOverflowQueue constructs a new DiskOverflowQueue.
100
func NewDiskOverflowQueue[T any](db wtdb.Queue[T], maxQueueSize uint64,
101
        logger btclog.Logger) (*DiskOverflowQueue[T], error) {
57✔
102

57✔
103
        if maxQueueSize < 2 {
57✔
104
                return nil, errors.New("the in-memory queue buffer size " +
×
105
                        "must be larger than 2")
×
106
        }
×
107

108
        q := &DiskOverflowQueue[T]{
57✔
109
                log:               logger,
57✔
110
                db:                db,
57✔
111
                inputList:         list.New(),
57✔
112
                newDiskItemSignal: make(chan struct{}, 1),
57✔
113
                inputChan:         make(chan *internalTask[T]),
57✔
114
                memQueue:          make(chan T, maxQueueSize-2),
57✔
115
                outputChan:        make(chan T),
57✔
116
                quit:              make(chan struct{}),
57✔
117
        }
57✔
118
        q.inputListCond = sync.NewCond(&q.inputListMu)
57✔
119

57✔
120
        return q, nil
57✔
121
}
122

123
// Start kicks off all the goroutines that are required to manage the queue.
124
func (q *DiskOverflowQueue[T]) Start() error {
57✔
125
        var err error
57✔
126
        q.startOnce.Do(func() {
114✔
127
                err = q.start()
57✔
128
        })
57✔
129

130
        return err
57✔
131
}
132

133
// start kicks off all the goroutines that are required to manage the queue.
func (q *DiskOverflowQueue[T]) start() error {
	// If items were persisted on a previous run, begin in disk mode so
	// that feedMemQueue reloads them before serving new input.
	numDisk, err := q.db.Len()
	if err != nil {
		return err
	}
	if numDisk != 0 {
		q.toDisk.Store(true)
	}

	// Kick off the three goroutines which will handle the input list, the
	// in-memory queue and the output channel.
	// The three goroutines are moving items according to the following
	// diagram:
	//
	//         ┌─────────┐ drainInputList  ┌──────────┐
	//         │inputList├─────┬──────────►│disk/db   │
	//         └─────────┘     │           └──────────┘
	//                         │ (depending on mode)
	//                         │           ┌──────────┐
	//                         └──────────►│inputChan │
	//                                     └──────────┘
	//
	//         ┌─────────┐ feedMemQueue    ┌──────────┐
	//         │disk/db  ├───────┬────────►│memQueue  │
	//         └─────────┘       │         └──────────┘
	//                           │ (depending on mode)
	//         ┌─────────┐       │
	//         │inputChan├───────┘
	//         └─────────┘
	//
	//         ┌─────────┐ feedOutputChan  ┌──────────┐
	//         │memQueue ├────────────────►│outputChan│
	//         └─────────┘                 └──────────┘
	//
	q.wg.Add(3)
	go q.drainInputList()
	go q.feedMemQueue()
	go q.feedOutputChan()

	return nil
}
175

176
// Stop stops the queue and persists any items in the memory queue to disk.
177
func (q *DiskOverflowQueue[T]) Stop() error {
57✔
178
        var err error
57✔
179
        q.stopOnce.Do(func() {
114✔
180
                err = q.stop()
57✔
181
        })
57✔
182

183
        return err
57✔
184
}
185

186
// stop the queue and persists any items in the memory queue to disk.
func (q *DiskOverflowQueue[T]) stop() error {
	close(q.quit)

	// Signal on the inputListCond until all the goroutines have returned.
	// This is needed because drainInputList may be blocked in
	// inputListCond.Wait and only re-checks the quit channel after being
	// woken.
	shutdown := make(chan struct{})
	go func() {
		for {
			select {
			case <-time.After(time.Millisecond):
				q.inputListCond.Signal()
			case <-shutdown:
				return
			}
		}
	}()

	q.wg.Wait()
	close(shutdown)

	// queueHead holds the items that will be pushed to the head of the
	// disk queue.
	var queueHead []T

	// First, we append leftOverItem1 since this task is the current head
	// of the queue.
	if q.leftOverItem1 != nil {
		queueHead = append(queueHead, *q.leftOverItem1)
	}

	// Next, drain the buffered queue. This loop terminates because
	// feedMemQueue closes memQueue on exit, and all goroutines have
	// returned at this point.
	for {
		task, ok := <-q.memQueue
		if !ok {
			break
		}

		queueHead = append(queueHead, task)
	}

	// Then, any items held in leftOverItems2 would have been next to join
	// the memQueue. So those get added next.
	if len(q.leftOverItems2) != 0 {
		queueHead = append(queueHead, q.leftOverItems2...)
	}

	// Now, push these items to the head of the queue. On error we can only
	// log: the queue is shutting down and those items will be lost.
	err := q.db.PushHead(queueHead...)
	if err != nil {
		q.log.Errorf("Could not add tasks to queue head: %v", err)
	}

	// Next we handle any items that need to be added to the main disk
	// queue.
	var diskQueue []T

	// Any item in leftOverItem3 is the first item that should join the
	// disk queue.
	if q.leftOverItem3 != nil {
		diskQueue = append(diskQueue, *q.leftOverItem3)
	}

	// Lastly, drain any items in the unbounded input list.
	q.inputListCond.L.Lock()
	for q.inputList.Front() != nil {
		e := q.inputList.Front()

		//nolint:forcetypeassert
		task := q.inputList.Remove(e).(T)

		diskQueue = append(diskQueue, task)
	}
	q.inputListCond.L.Unlock()

	// Now persist these items to the main disk queue.
	err = q.db.Push(diskQueue...)
	if err != nil {
		q.log.Errorf("Could not add tasks to queue tail: %v", err)
	}

	return nil
}
268

269
// QueueBackupID adds a wtdb.BackupID to the queue. It will only return an
// error if the queue has been stopped. It is non-blocking.
//
// NOTE(review): the queue is generic over T, yet this method accepts a
// *wtdb.BackupID and drainInputList asserts list items to T. This appears to
// require T == *wtdb.BackupID — confirm at the call sites.
func (q *DiskOverflowQueue[T]) QueueBackupID(item *wtdb.BackupID) error {
	// Return an error if the queue has been stopped.
	select {
	case <-q.quit:
		return ErrClientExiting
	default:
	}

	// Add the new item to the unbounded input list.
	q.inputListCond.L.Lock()
	q.inputList.PushBack(item)
	q.inputListCond.L.Unlock()

	// Signal that there is a new item in the input list so that
	// drainInputList wakes up if it was waiting.
	q.inputListCond.Signal()

	return nil
}
289

290
// NextBackupID can be used to read from the head of the DiskOverflowQueue.
// The returned channel is closed when the queue shuts down (feedOutputChan
// closes it on exit).
func (q *DiskOverflowQueue[T]) NextBackupID() <-chan T {
	return q.outputChan
}
200,545✔
294

295
// drainInputList handles the input to the DiskOverflowQueue. It takes from the
// un-bounded input list and then, depending on what mode the queue is in,
// either puts the new item straight onto the persisted disk queue or attempts
// to feed it into the memQueue. On exit, any unhandled task will be assigned
// to leftOverItem3.
//
// NOTE: This must be run as a goroutine.
func (q *DiskOverflowQueue[T]) drainInputList() {
	defer q.wg.Done()

	for {
		// Wait for the input list to not be empty. During shutdown,
		// stop spawns a helper that repeatedly signals the condition
		// so this Wait is guaranteed to wake and observe the closed
		// quit channel.
		q.inputListCond.L.Lock()
		for q.inputList.Front() == nil {
			q.inputListCond.Wait()

			select {
			case <-q.quit:
				q.inputListCond.L.Unlock()
				return
			default:
			}
		}

		// Pop the first element from the queue.
		e := q.inputList.Front()

		//nolint:forcetypeassert
		task := q.inputList.Remove(e).(T)
		q.inputListCond.L.Unlock()

		// What we do with this new item depends on what the mode of
		// the queue currently is.
		for q.pushToActiveQueue(task) {
			// We retry until the task is handled or the quit
			// channel is closed.
		}

		// If the above returned false because the quit channel was
		// closed, then we exit.
		select {
		case <-q.quit:
			return
		default:
		}
	}
}
340

341
// pushToActiveQueue handles the input of a new task to the queue. It returns
// true if the task should be retried and false if the task was handled or the
// quit channel fired.
func (q *DiskOverflowQueue[T]) pushToActiveQueue(task T) bool {
	// If the queue is in disk mode then any new items should be put
	// straight into the disk queue.
	if q.toDisk.Load() {
		err := q.db.Push(task)
		if err != nil {
			// Log and back off for a few seconds and then try
			// again with the same task.
			q.log.Errorf("could not persist %s to disk. "+
				"Retrying after backoff", task)

			select {
			// Backoff for a bit and then re-check the mode and try
			// again to handle the task.
			case <-time.After(dbErrorBackoff):
				return true

			// If the queue is quit at this moment, then the
			// unhandled task is assigned to leftOverItem3 so that
			// it can be handled by the stop method.
			case <-q.quit:
				q.leftOverItem3 = &task

				return false
			}
		}

		// Send a signal that there is a new item in the main disk
		// queue.
		select {
		case q.newDiskItemSignal <- struct{}{}:
		case <-q.quit:

		// Because there might already be a signal in the
		// newDiskItemSignal channel, we can skip sending another
		// signal. The channel only has a buffer of one, so we would
		// block here if we didn't have a default case.
		default:
		}

		// If we got here, we were able to store the task in the disk
		// queue, so we can return false as no retry is necessary.
		return false
	}

	// If the mode is memory mode, then try to feed it to the feedMemQueue
	// handler via the un-buffered inputChan channel. We wrap it in an
	// internal task so that we can find out if feedMemQueue successfully
	// handled the item. If it did, we continue in memory mode and if not,
	// then we switch to disk mode so that we can persist the item to the
	// disk queue instead.
	it := newInternalTask(task)

	select {
	// Try to feed the task to the feedMemQueue handler. The handler, if
	// it does take the task, is guaranteed to respond via the success
	// channel of the task to indicate if the task was successfully added
	// to the in-mem queue. This is guaranteed even if the queue is being
	// stopped.
	case q.inputChan <- it:

	// If the queue is quit at this moment, then the unhandled task is
	// assigned to leftOverItem3 so that it can be handled by the stop
	// method.
	case <-q.quit:
		q.leftOverItem3 = &task

		return false

	default:
		// The task was not accepted. So maybe the mode changed.
		return true
	}

	// If we get here, it means that the feedMemQueue handler took the
	// task. It is guaranteed to respond via the success channel, so we
	// wait for that response here.
	s := <-it.success
	if s {
		return false
	}

	// If the task was not successfully handled by feedMemQueue, then we
	// switch to disk mode so that the task can be persisted in the disk
	// queue instead.
	q.toDisk.Store(true)

	return true
}
432

433
// feedMemQueue manages which items should be fed onto the buffered
// memQueue. If the queue is then in disk mode, then the handler will read new
// tasks from the disk queue until it is empty. After that, it will switch
// between reading from the input channel or the disk queue depending on the
// queue mode.
//
// NOTE: This must be run as a goroutine. It is the sole closer of memQueue.
func (q *DiskOverflowQueue[T]) feedMemQueue() {
	defer func() {
		close(q.memQueue)
		q.wg.Done()
	}()

	// feedFromDisk moves items from the disk queue into memQueue until the
	// disk queue is empty (at which point it flips the mode back to
	// memory) or the queue quits.
	feedFromDisk := func() {
		select {
		case <-q.quit:
			return
		default:
		}

		for {
			// Ideally, we want to do batch reads from the DB. So
			// we check how much capacity there is in the memQueue
			// and fetch enough tasks to fill that capacity. If
			// there is no capacity, however, then we at least want
			// to fetch one task.
			numToPop := cap(q.memQueue) - len(q.memQueue)
			if numToPop == 0 {
				numToPop = 1
			}

			tasks, err := q.db.PopUpTo(numToPop)
			if errors.Is(err, wtdb.ErrEmptyQueue) {
				q.toDisk.Store(false)

				return
			} else if err != nil {
				q.log.Errorf("Could not load next task from " +
					"disk. Retrying.")

				select {
				case <-time.After(dbErrorBackoff):
					continue
				case <-q.quit:
					return
				}
			}

			// If we did manage to fetch a task from disk, we make
			// sure to set the toDisk mode to true since we may
			// block indefinitely while trying to push the tasks to
			// the memQueue in which case we want the drainInputList
			// goroutine to write any new tasks to disk.
			q.toDisk.Store(true)

			for i, task := range tasks {
				select {
				case q.memQueue <- task:

				// If the queue is quit at this moment, then the
				// unhandled tasks are assigned to
				// leftOverItems2 so that they can be handled
				// by the stop method.
				case <-q.quit:
					q.leftOverItems2 = tasks[i:]
					return
				}
			}
		}
	}

	// If the queue is in disk mode, then the memQueue is fed with tasks
	// from the disk queue until it is empty.
	if q.toDisk.Load() {
		feedFromDisk()
	}

	// Now the queue enters its normal operation.
	for {
		select {
		case <-q.quit:
			return

		// If there is a signal that a new item has been added to disk
		// then we use the disk queue as the source of the next task
		// to feed into memQueue.
		case <-q.newDiskItemSignal:
			feedFromDisk()

		// If any items come through on the inputChan, then we try feed
		// these directly into the memQueue. If there is space in the
		// memQueue then we respond with success to the producer,
		// otherwise we respond with failure so that the producer can
		// instead persist the task to disk. After the producer,
		// drainInputList, has pushed an item to inputChan, it is
		// guaranteed to await a response on the task's success channel
		// before quitting. Therefore, it is not required to listen on
		// the quit channel here.
		case task := <-q.inputChan:
			select {
			case q.memQueue <- task.task:
				task.success <- true
				continue
			default:
				task.success <- false
			}
		}
	}
}
540

541
// feedOutputChan will pop an item from the buffered memQueue and block until
// the item is taken from the un-buffered outputChan. This is done repeatedly
// for the lifetime of the DiskOverflowQueue. On shutdown of the queue, any
// item not consumed by the outputChan but held by this method is assigned to
// the leftOverItem1 member so that the Stop method can persist the item to
// disk so that it is reloaded on restart.
//
// NOTE: This must be run as a goroutine. It is the sole closer of outputChan.
func (q *DiskOverflowQueue[T]) feedOutputChan() {
	defer func() {
		close(q.outputChan)
		q.wg.Done()
	}()

	for {
		select {
		case nextTask, ok := <-q.memQueue:
			// If the memQueue is closed, then the queue is
			// stopping.
			if !ok {
				return
			}

			// Hand the task to a consumer, or stash it in
			// leftOverItem1 if the queue quits before one arrives.
			select {
			case q.outputChan <- nextTask:
			case <-q.quit:
				q.leftOverItem1 = &nextTask
				return
			}

		case <-q.quit:
			return
		}
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc