• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12349698563

16 Dec 2024 09:29AM UTC coverage: 58.55% (-0.09%) from 58.636%
12349698563

Pull #9357

github

GeorgeTsagk
contractcourt: include custom records on replayed htlc

When notifying the invoice registry for an exit hop htlc we also want to
include its custom records. The channelLink, the other caller of this
method, already populates this field. So we make sure the contest
resolver does so too.
Pull Request #9357: contractcourt: include custom records on replayed htlc

2 of 2 new or added lines in 1 file covered. (100.0%)

262 existing lines in 24 files now uncovered.

134243 of 229278 relevant lines covered (58.55%)

19277.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.73
/watchtower/wtclient/queue.go
1
package wtclient
2

3
import (
4
        "container/list"
5
        "errors"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btclog/v2"
11
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
12
)
13

14
const (
	// dbErrorBackoff is the length of time we will back off before retrying
	// any DB action that failed.
	dbErrorBackoff = time.Second * 5
)
19

20
// internalTask pairs a queued item with an unbuffered success channel over
// which the consumer (feedMemQueue) reports whether the item was accepted
// into the in-memory queue.
type internalTask[T any] struct {
	task    T
	success chan bool
}

// newInternalTask wraps the given item in an internalTask, attaching a fresh
// unbuffered success channel for the accept/reject hand-shake.
func newInternalTask[T any](task T) *internalTask[T] {
	it := &internalTask[T]{
		success: make(chan bool),
	}
	it.task = task

	return it
}
64,429✔
33

34
// DiskOverflowQueue is a queue that must be initialised with a certain maximum
// buffer size which represents the maximum number of elements that the queue
// should hold in memory. If the queue is full, then any new elements added to
// the queue will be persisted to disk instead. Once a consumer starts reading
// from the front of the queue again then items on disk will be moved into the
// queue again. The queue is also re-start safe. When it is stopped, any items
// in the memory queue, will be persisted to disk. On start up, the queue will
// be re-initialised with the items on disk.
type DiskOverflowQueue[T any] struct {
	// startOnce and stopOnce ensure that the Start and Stop logic each
	// run at most once.
	startOnce sync.Once
	stopOnce  sync.Once

	// log is used to report persistence errors, which are logged rather
	// than surfaced to callers.
	log btclog.Logger

	// db is the database that will be used to persist queue items to disk.
	db wtdb.Queue[T]

	// toDisk represents the current mode of operation of the queue: true
	// means new items are pushed to the disk queue, false means they are
	// fed to the in-memory queue.
	toDisk atomic.Bool

	// We use an unbounded list for the input of the queue so that
	// producers putting items into the queue are never blocked.
	// inputListCond must be created over inputListMu; both guard
	// inputList.
	inputListMu   sync.Mutex
	inputListCond *sync.Cond
	inputList     *list.List

	// inputChan is an unbuffered channel used to pass items from
	// drainInputList to feedMemQueue.
	inputChan chan *internalTask[T]

	// memQueue is a buffered channel used to pass items from
	// feedMemQueue to feedOutputChan.
	memQueue chan T

	// outputChan is an unbuffered channel from which items at the head of
	// the queue can be read.
	outputChan chan T

	// newDiskItemSignal is used to signal that there is a new item in the
	// main disk queue. There should only be one reader and one writer for
	// this channel.
	newDiskItemSignal chan struct{}

	// leftOverItem1 will be a non-nil task on shutdown if the
	// feedOutputChan method was holding an unhandled task at shutdown
	// time. Since feedOutputChan handles the very head of the queue, this
	// item should be the first to be reloaded on restart.
	leftOverItem1 *T

	// leftOverItems2 will be non-empty on shutdown if the feedMemQueue
	// method was holding any unhandled tasks at shutdown time. Since
	// feedMemQueue manages the input to the queue, the tasks should be
	// pushed to the head of the disk queue.
	leftOverItems2 []T

	// leftOverItem3 will be non-nil on shutdown if drainInputList was
	// holding an unhandled task at shutdown time. This task should be put
	// at the tail of the disk queue but should come before any input list
	// task.
	leftOverItem3 *T

	// quit is closed to signal all goroutines to exit; wg waits for them.
	quit chan struct{}
	wg   sync.WaitGroup
}
98

99
// NewDiskOverflowQueue constructs a new DiskOverflowQueue.
100
func NewDiskOverflowQueue[T any](db wtdb.Queue[T], maxQueueSize uint64,
101
        logger btclog.Logger) (*DiskOverflowQueue[T], error) {
51✔
102

51✔
103
        if maxQueueSize < 2 {
51✔
104
                return nil, errors.New("the in-memory queue buffer size " +
×
105
                        "must be larger than 2")
×
106
        }
×
107

108
        q := &DiskOverflowQueue[T]{
51✔
109
                log:               logger,
51✔
110
                db:                db,
51✔
111
                inputList:         list.New(),
51✔
112
                newDiskItemSignal: make(chan struct{}, 1),
51✔
113
                inputChan:         make(chan *internalTask[T]),
51✔
114
                memQueue:          make(chan T, maxQueueSize-2),
51✔
115
                outputChan:        make(chan T),
51✔
116
                quit:              make(chan struct{}),
51✔
117
        }
51✔
118
        q.inputListCond = sync.NewCond(&q.inputListMu)
51✔
119

51✔
120
        return q, nil
51✔
121
}
122

123
// Start kicks off all the goroutines that are required to manage the queue.
124
func (q *DiskOverflowQueue[T]) Start() error {
51✔
125
        var err error
51✔
126
        q.startOnce.Do(func() {
102✔
127
                err = q.start()
51✔
128
        })
51✔
129

130
        return err
51✔
131
}
132

133
// start kicks off all the goroutines that are required to manage the queue.
func (q *DiskOverflowQueue[T]) start() error {
	// If any items were persisted by a previous run, begin in disk mode so
	// that feedMemQueue drains them before serving fresh input.
	numDisk, err := q.db.Len()
	if err != nil {
		return err
	}
	if numDisk != 0 {
		q.toDisk.Store(true)
	}

	// Kick off the three goroutines which will handle the input list, the
	// in-memory queue and the output channel.
	// The three goroutines are moving items according to the following
	// diagram:
	//
	//         ┌─────────┐ drainInputList  ┌──────────┐
	//         │inputList├─────┬──────────►│disk/db   │
	//         └─────────┘     │           └──────────┘
	//                         │ (depending on mode)
	//                         │           ┌──────────┐
	//                         └──────────►│inputChan │
	//                                     └──────────┘
	//
	//         ┌─────────┐ feedMemQueue    ┌──────────┐
	//         │disk/db  ├───────┬────────►│memQueue  │
	//         └─────────┘       │         └──────────┘
	//                           │ (depending on mode)
	//         ┌─────────┐       │
	//         │inputChan├───────┘
	//         └─────────┘
	//
	//         ┌─────────┐ feedOutputChan  ┌──────────┐
	//         │memQueue ├────────────────►│outputChan│
	//         └─────────┘                 └──────────┘
	//
	// Note: wg.Add must happen before the goroutines launch so that a
	// concurrent Stop cannot observe a zero WaitGroup.
	q.wg.Add(3)
	go q.drainInputList()
	go q.feedMemQueue()
	go q.feedOutputChan()

	return nil
}
175

176
// Stop stops the queue and persists any items in the memory queue to disk.
177
func (q *DiskOverflowQueue[T]) Stop() error {
51✔
178
        var err error
51✔
179
        q.stopOnce.Do(func() {
102✔
180
                err = q.stop()
51✔
181
        })
51✔
182

183
        return err
51✔
184
}
185

186
// stop the queue and persists any items in the memory queue to disk.
//
// NOTE: this always returns nil. DB errors encountered while persisting the
// left-over items are logged via q.log rather than returned.
func (q *DiskOverflowQueue[T]) stop() error {
	close(q.quit)

	// Signal on the inputListCond until all the goroutines have returned.
	// This is needed because drainInputList may be parked in
	// inputListCond.Wait() and only re-checks the quit channel after
	// being woken.
	shutdown := make(chan struct{})
	go func() {
		for {
			select {
			case <-time.After(time.Millisecond):
				q.inputListCond.Signal()
			case <-shutdown:
				return
			}
		}
	}()

	q.wg.Wait()
	close(shutdown)

	// queueHead will be the items that will be pushed to the head of
	// the queue.
	var queueHead []T

	// First, we append leftOverItem1 since this task is the current head
	// of the queue.
	if q.leftOverItem1 != nil {
		queueHead = append(queueHead, *q.leftOverItem1)
	}

	// Next, drain the buffered queue. feedMemQueue has exited by now and
	// closed memQueue, so this loop is guaranteed to terminate.
	for {
		task, ok := <-q.memQueue
		if !ok {
			break
		}

		queueHead = append(queueHead, task)
	}

	// Then, any items held in leftOverItems2 would have been next to join
	// the memQueue. So those get added next.
	if len(q.leftOverItems2) != 0 {
		queueHead = append(queueHead, q.leftOverItems2...)
	}

	// Now, push these items to the head of the queue.
	err := q.db.PushHead(queueHead...)
	if err != nil {
		q.log.Errorf("Could not add tasks to queue head: %v", err)
	}

	// Next we handle any items that need to be added to the main disk
	// queue.
	var diskQueue []T

	// Any item in leftOverItem3 is the first item that should join the
	// disk queue.
	if q.leftOverItem3 != nil {
		diskQueue = append(diskQueue, *q.leftOverItem3)
	}

	// Lastly, drain any items in the unbuffered input list. All worker
	// goroutines have exited, so the lock is only held against external
	// producers here.
	q.inputListCond.L.Lock()
	for q.inputList.Front() != nil {
		e := q.inputList.Front()

		//nolint:forcetypeassert
		task := q.inputList.Remove(e).(T)

		diskQueue = append(diskQueue, task)
	}
	q.inputListCond.L.Unlock()

	// Now persist these items to the main disk queue.
	err = q.db.Push(diskQueue...)
	if err != nil {
		q.log.Errorf("Could not add tasks to queue tail: %v", err)
	}

	return nil
}
268

269
// QueueBackupID adds a wtdb.BackupID to the queue. It will only return an error
270
// if the queue has been stopped. It is non-blocking.
271
func (q *DiskOverflowQueue[T]) QueueBackupID(item *wtdb.BackupID) error {
200,495✔
272
        // Return an error if the queue has been stopped
200,495✔
273
        select {
200,495✔
274
        case <-q.quit:
×
275
                return ErrClientExiting
×
276
        default:
200,495✔
277
        }
278

279
        // Add the new item to the unbound input list.
280
        q.inputListCond.L.Lock()
200,495✔
281
        q.inputList.PushBack(item)
200,495✔
282
        q.inputListCond.L.Unlock()
200,495✔
283

200,495✔
284
        // Signal that there is a new item in the input list.
200,495✔
285
        q.inputListCond.Signal()
200,495✔
286

200,495✔
287
        return nil
200,495✔
288
}
289

290
// NextBackupID can be used to read from the head of the DiskOverflowQueue.
// The returned channel is unbuffered and is closed when the queue shuts
// down (see feedOutputChan).
func (q *DiskOverflowQueue[T]) NextBackupID() <-chan T {
	return q.outputChan
}
200,545✔
294

295
// drainInputList handles the input to the DiskOverflowQueue. It takes from the
296
// un-bounded input list and then, depending on what mode the queue is in,
297
// either puts the new item straight onto the persisted disk queue or attempts
298
// to feed it into the memQueue. On exit, any unhandled task will be assigned to
299
// leftOverItem3.
300
func (q *DiskOverflowQueue[T]) drainInputList() {
51✔
301
        defer q.wg.Done()
51✔
302

51✔
303
        for {
46,671✔
304
                // Wait for the input list to not be empty.
46,620✔
305
                q.inputListCond.L.Lock()
46,620✔
306
                for q.inputList.Front() == nil {
46,721✔
307
                        q.inputListCond.Wait()
101✔
308

101✔
309
                        select {
101✔
310
                        case <-q.quit:
49✔
311
                                q.inputListCond.L.Unlock()
49✔
312
                                return
49✔
313
                        default:
55✔
314
                        }
315
                }
316

317
                // Pop the first element from the queue.
318
                e := q.inputList.Front()
46,574✔
319

46,574✔
320
                //nolint:forcetypeassert
46,574✔
321
                task := q.inputList.Remove(e).(T)
46,574✔
322
                q.inputListCond.L.Unlock()
46,574✔
323

46,574✔
324
                // What we do with this new item depends on what the mode of the
46,574✔
325
                // queue currently is.
46,574✔
326
                for q.pushToActiveQueue(task) {
64,865✔
327
                        // We retry until the task is handled or the quit
18,291✔
328
                        // channel is closed.
18,291✔
329
                }
18,291✔
330

331
                // If the above returned false because the quit channel was
332
                // closed, then we exit.
333
                select {
46,574✔
334
                case <-q.quit:
2✔
335
                        return
2✔
336
                default:
46,572✔
337
                }
338
        }
339
}
340

341
// pushToActiveQueue handles the input of a new task to the queue. It returns
342
// true if the task should be retried and false if the task was handled or the
343
// quit channel fired.
344
func (q *DiskOverflowQueue[T]) pushToActiveQueue(task T) bool {
64,865✔
345
        // If the queue is in disk mode then any new items should be put
64,865✔
346
        // straight into the disk queue.
64,865✔
347
        if q.toDisk.Load() {
65,301✔
348
                err := q.db.Push(task)
436✔
349
                if err != nil {
436✔
350
                        // Log and back off for a few seconds and then
×
351
                        // try again with the same task.
×
352
                        q.log.Errorf("could not persist %s to disk. "+
×
353
                                "Retrying after backoff", task)
×
354

×
355
                        select {
×
356
                        // Backoff for a bit and then re-check the mode
357
                        // and try again to handle the task.
358
                        case <-time.After(dbErrorBackoff):
×
359
                                return true
×
360

361
                        // If the queue is quit at this moment, then the
362
                        // unhandled task is assigned to leftOverItem3
363
                        // so that it can be handled by the stop method.
364
                        case <-q.quit:
×
365
                                q.leftOverItem3 = &task
×
366

×
367
                                return false
×
368
                        }
369
                }
370

371
                // Send a signal that there is a new item in the main
372
                // disk queue.
373
                select {
436✔
374
                case q.newDiskItemSignal <- struct{}{}:
74✔
UNCOV
375
                case <-q.quit:
×
376

377
                // Because there might already be a signal in the
378
                // newDiskItemSignal channel, we can skip sending another
379
                // signal. The channel only has a buffer of one, so we would
380
                // block here if we didn't have a default case.
381
                default:
362✔
382
                }
383

384
                // If we got here, we were able to store the task in the disk
385
                // queue, so we can return false as no retry is necessary.
386
                return false
436✔
387
        }
388

389
        // If the mode is memory mode, then try feed it to the feedMemQueue
390
        // handler via the un-buffered inputChan channel. We wrap it in an
391
        // internal task so that we can find out if feedMemQueue successfully
392
        // handled the item. If it did, we continue in memory mode and if not,
393
        // then we switch to disk mode so that we can persist the item to the
394
        // disk queue instead.
395
        it := newInternalTask(task)
64,429✔
396

64,429✔
397
        select {
64,429✔
398
        // Try feed the task to the feedMemQueue handler. The handler, if it
399
        // does take the task, is guaranteed to respond via the success channel
400
        // of the task to indicate if the task was successfully added to the
401
        // in-mem queue. This is guaranteed even if the queue is being stopped.
402
        case q.inputChan <- it:
46,178✔
403

404
        // If the queue is quit at this moment, then the unhandled task is
405
        // assigned to leftOverItem3 so that it can be handled by the stop
406
        // method.
407
        case <-q.quit:
1✔
408
                q.leftOverItem3 = &task
1✔
409

1✔
410
                return false
1✔
411

412
        default:
18,250✔
413
                // The task was not accepted. So maybe the mode changed.
18,250✔
414
                return true
18,250✔
415
        }
416

417
        // If we get here, it means that the feedMemQueue handler took the task.
418
        // It is guaranteed to respond via the success channel, so we wait for
419
        // that response here.
420
        s := <-it.success
46,178✔
421
        if s {
92,315✔
422
                return false
46,137✔
423
        }
46,137✔
424

425
        // If the task was not successfully handled by feedMemQueue, then we
426
        // switch to disk mode so that the task can be persisted in the disk
427
        // queue instead.
428
        q.toDisk.Store(true)
41✔
429

41✔
430
        return true
41✔
431
}
432

433
// feedMemQueue manages which items should be fed onto the buffered
434
// memQueue. If the queue is then in disk mode, then the handler will read new
435
// tasks from the disk queue until it is empty. After that, it will switch
436
// between reading from the input channel or the disk queue depending on the
437
// queue mode.
438
func (q *DiskOverflowQueue[T]) feedMemQueue() {
51✔
439
        defer func() {
102✔
440
                close(q.memQueue)
51✔
441
                q.wg.Done()
51✔
442
        }()
51✔
443

444
        feedFromDisk := func() {
138✔
445
                select {
87✔
446
                case <-q.quit:
2✔
447
                        return
2✔
448
                default:
85✔
449
                }
450

451
                for {
779✔
452
                        // Ideally, we want to do batch reads from the DB. So
694✔
453
                        // we check how much capacity there is in the memQueue
694✔
454
                        // and fetch enough tasks to fill that capacity. If
694✔
455
                        // there is no capacity, however, then we at least want
694✔
456
                        // to fetch one task.
694✔
457
                        numToPop := cap(q.memQueue) - len(q.memQueue)
694✔
458
                        if numToPop == 0 {
1,216✔
459
                                numToPop = 1
522✔
460
                        }
522✔
461

462
                        tasks, err := q.db.PopUpTo(numToPop)
694✔
463
                        if errors.Is(err, wtdb.ErrEmptyQueue) {
768✔
464
                                q.toDisk.Store(false)
74✔
465

74✔
466
                                return
74✔
467
                        } else if err != nil {
694✔
468
                                q.log.Errorf("Could not load next task from " +
×
469
                                        "disk. Retrying.")
×
470

×
471
                                select {
×
472
                                case <-time.After(dbErrorBackoff):
×
473
                                        continue
×
474
                                case <-q.quit:
×
475
                                        return
×
476
                                }
477
                        }
478

479
                        // If we did manage to fetch a task from disk, we make
480
                        // sure to set the toDisk mode to true since we may
481
                        // block indefinitely while trying to push the tasks to
482
                        // the memQueue in which case we want the drainInputList
483
                        // goroutine to write any new tasks to disk.
484
                        q.toDisk.Store(true)
620✔
485

620✔
486
                        for i, task := range tasks {
161,023✔
487
                                select {
160,403✔
488
                                case q.memQueue <- task:
160,392✔
489

490
                                // If the queue is quit at this moment, then the
491
                                // unhandled tasks are assigned to
492
                                // leftOverItems2 so that they can be handled
493
                                // by the stop method.
494
                                case <-q.quit:
11✔
495
                                        q.leftOverItems2 = tasks[i:]
11✔
496
                                        return
11✔
497
                                }
498
                        }
499
                }
500
        }
501

502
        // If the queue is in disk mode, then the memQueue is fed with tasks
503
        // from the disk queue until it is empty.
504
        if q.toDisk.Load() {
65✔
505
                feedFromDisk()
14✔
506
        }
14✔
507

508
        // Now the queue enters its normal operation.
509
        for {
46,350✔
510
                select {
46,299✔
511
                case <-q.quit:
51✔
512
                        return
51✔
513

514
                // If there is a signal that a new item has been added to disk
515
                // then we use the disk queue as the source of the next task
516
                // to feed into memQueue.
517
                case <-q.newDiskItemSignal:
73✔
518
                        feedFromDisk()
73✔
519

520
                // If any items come through on the inputChan, then we try feed
521
                // these directly into the memQueue. If there is space in the
522
                // memeQueue then we respond with success to the producer,
523
                // otherwise we respond with failure so that the producer can
524
                // instead persist the task to disk. After the producer,
525
                // drainInputList, has pushed an item to inputChan, it is
526
                // guaranteed to await a response on the task's success channel
527
                // before quitting. Therefore, it is not required to listen on
528
                // the quit channel here.
529
                case task := <-q.inputChan:
46,178✔
530
                        select {
46,178✔
531
                        case q.memQueue <- task.task:
46,137✔
532
                                task.success <- true
46,137✔
533
                                continue
46,137✔
534
                        default:
41✔
535
                                task.success <- false
41✔
536
                        }
537
                }
538
        }
539
}
540

541
// feedOutputChan will pop an item from the buffered memQueue and block until
542
// the item is taken from the un-buffered outputChan. This is done repeatedly
543
// for the lifetime of the DiskOverflowQueue. On shutdown of the queue, any
544
// item not consumed by the outputChan but held by this method is assigned to
545
// the leftOverItem1 member so that the Stop method can persist the item to
546
// disk so that it is reloaded on restart.
547
//
548
// NOTE: This must be run as a goroutine.
549
func (q *DiskOverflowQueue[T]) feedOutputChan() {
51✔
550
        defer func() {
102✔
551
                close(q.outputChan)
51✔
552
                q.wg.Done()
51✔
553
        }()
51✔
554

555
        for {
200,594✔
556
                select {
200,543✔
557
                case nextTask, ok := <-q.memQueue:
200,509✔
558
                        // If the memQueue is closed, then the queue is
200,509✔
559
                        // stopping.
200,509✔
560
                        if !ok {
200,509✔
561
                                return
×
562
                        }
×
563

564
                        select {
200,509✔
565
                        case q.outputChan <- nextTask:
200,495✔
566
                        case <-q.quit:
14✔
567
                                q.leftOverItem1 = &nextTask
14✔
568
                                return
14✔
569
                        }
570

571
                case <-q.quit:
37✔
572
                        return
37✔
573
                }
574
        }
575
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc