• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13481548301

23 Feb 2025 09:06AM UTC coverage: 4.031% (-54.8%) from 58.825%
13481548301

Pull #9521

github

web-flow
Merge 1ffbe99fe into 5fe900d18
Pull Request #9521: unit: remove GOACC, use Go 1.20 native coverage functionality

2852 of 70750 relevant lines covered (4.03%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/queue/queue.go
1
package queue
2

3
import (
4
        "container/list"
5
        "sync"
6
)
7

8
// ConcurrentQueue is a FIFO queue with unbounded capacity that is safe for
// concurrent use. Producers send items on the channel returned by ChanIn and
// consumers receive them, in the same order, from the channel returned by
// ChanOut. A background goroutine shuttles items between the two channels; it
// does not run until Start() is called.
type ConcurrentQueue struct {
	// started ensures the forwarding goroutine is launched at most once.
	started sync.Once

	// stopped ensures the quit channel is closed at most once.
	stopped sync.Once

	// chanIn is the channel producers push items into.
	chanIn chan interface{}

	// chanOut is the channel consumers pop items from.
	chanOut chan interface{}

	// overflow holds, in arrival order, the items that could not be
	// placed on chanOut immediately.
	overflow *list.List

	// wg tracks the forwarding goroutine so Stop can wait for it to exit.
	wg sync.WaitGroup

	// quit is closed to tell the forwarding goroutine to return.
	quit chan struct{}
}
24

25
// NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is
26
// the capacity of the output channel. When the size of the queue is below this
27
// threshold, pushes do not incur the overhead of the less efficient overflow
28
// structure.
29
func NewConcurrentQueue(bufferSize int) *ConcurrentQueue {
×
30
        return &ConcurrentQueue{
×
31
                chanIn:   make(chan interface{}),
×
32
                chanOut:  make(chan interface{}, bufferSize),
×
33
                overflow: list.New(),
×
34
                quit:     make(chan struct{}),
×
35
        }
×
36
}
×
37

38
// ChanIn returns a channel that can be used to push new items into the queue.
39
func (cq *ConcurrentQueue) ChanIn() chan<- interface{} {
×
40
        return cq.chanIn
×
41
}
×
42

43
// ChanOut returns a channel that can be used to pop items from the queue.
44
func (cq *ConcurrentQueue) ChanOut() <-chan interface{} {
×
45
        return cq.chanOut
×
46
}
×
47

48
// Start begins a goroutine that manages moving items from the in channel to the
49
// out channel. The queue tries to move items directly to the out channel
50
// minimize overhead, but if the out channel is full it pushes items to an
51
// overflow queue. This must be called before using the queue.
52
func (cq *ConcurrentQueue) Start() {
×
53
        cq.started.Do(cq.start)
×
54
}
×
55

56
func (cq *ConcurrentQueue) start() {
×
57
        cq.wg.Add(1)
×
58
        go func() {
×
59
                defer cq.wg.Done()
×
60

×
61
        readLoop:
×
62
                for {
×
63
                        nextElement := cq.overflow.Front()
×
64
                        if nextElement == nil {
×
65
                                // Overflow queue is empty so incoming items can be pushed
×
66
                                // directly to the output channel. If output channel is full
×
67
                                // though, push to overflow.
×
68
                                select {
×
69
                                case item, ok := <-cq.chanIn:
×
70
                                        if !ok {
×
71
                                                break readLoop
×
72
                                        }
73
                                        select {
×
74
                                        case cq.chanOut <- item:
×
75
                                                // Optimistically push directly to chanOut
76
                                        default:
×
77
                                                cq.overflow.PushBack(item)
×
78
                                        }
79
                                case <-cq.quit:
×
80
                                        return
×
81
                                }
82
                        } else {
×
83
                                // Overflow queue is not empty, so any new items get pushed to
×
84
                                // the back to preserve order.
×
85
                                select {
×
86
                                case item, ok := <-cq.chanIn:
×
87
                                        if !ok {
×
88
                                                break readLoop
×
89
                                        }
90
                                        cq.overflow.PushBack(item)
×
91
                                case cq.chanOut <- nextElement.Value:
×
92
                                        cq.overflow.Remove(nextElement)
×
93
                                case <-cq.quit:
×
94
                                        return
×
95
                                }
96
                        }
97
                }
98

99
                // Incoming channel has been closed. Empty overflow queue into
100
                // the outgoing channel.
101
                nextElement := cq.overflow.Front()
×
102
                for nextElement != nil {
×
103
                        select {
×
104
                        case cq.chanOut <- nextElement.Value:
×
105
                                cq.overflow.Remove(nextElement)
×
106
                        case <-cq.quit:
×
107
                                return
×
108
                        }
109
                        nextElement = cq.overflow.Front()
×
110
                }
111

112
                // Close outgoing channel.
113
                close(cq.chanOut)
×
114
        }()
115
}
116

117
// Stop ends the goroutine that moves items from the in channel to the out
118
// channel. This does not clear the queue state, so the queue can be restarted
119
// without dropping items.
120
func (cq *ConcurrentQueue) Stop() {
×
121
        cq.stopped.Do(func() {
×
122
                close(cq.quit)
×
123
                cq.wg.Wait()
×
124
        })
×
125
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc