• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14193549836

01 Apr 2025 10:40AM UTC coverage: 69.046% (+0.007%) from 69.039%
14193549836

Pull #9665

github

web-flow
Merge e8825f209 into b01f4e514
Pull Request #9665: kvdb: bump etcd libs to v3.5.12

133439 of 193262 relevant lines covered (69.05%)

22119.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.14
/routing/chainview/queue.go
1
package chainview
2

3
import "sync"
4

5
// blockEventType is the possible types of a blockEvent.
6
type blockEventType uint8
7

8
const (
9
        // connected is the type of a blockEvent representing a block
10
        // that was connected to our current chain.
11
        connected blockEventType = iota
12

13
        // disconnected is the type of a blockEvent representing a
14
        // block that is stale/disconnected from our current chain.
15
        disconnected
16
)
17

18
// blockEvent represent a block that was either connected
19
// or disconnected from the current chain.
20
type blockEvent struct {
21
        eventType blockEventType
22
        block     *FilteredBlock
23
}
24

25
// blockEventQueue is an ordered queue for block events sent from a
26
// FilteredChainView. The two types of possible block events are
27
// connected/new blocks, and disconnected/stale blocks. The
28
// blockEventQueue keeps the order of these events intact, while
29
// still being non-blocking. This is important in order for the
30
// chainView's call to onBlockConnected/onBlockDisconnected to not
31
// get blocked, and for the consumer of the block events to always
32
// get the events in the correct order.
33
type blockEventQueue struct {
34
        queueCond *sync.Cond
35
        queueMtx  sync.Mutex
36
        queue     []*blockEvent
37

38
        // newBlocks is the channel where the consumer of the queue
39
        // will receive connected/new blocks from the FilteredChainView.
40
        newBlocks chan *FilteredBlock
41

42
        // staleBlocks is the channel where the consumer of the queue will
43
        // receive disconnected/stale blocks from the FilteredChainView.
44
        staleBlocks chan *FilteredBlock
45

46
        wg   sync.WaitGroup
47
        quit chan struct{}
48
}
49

50
// newBlockEventQueue creates a new blockEventQueue.
51
func newBlockEventQueue() *blockEventQueue {
23✔
52
        b := &blockEventQueue{
23✔
53
                newBlocks:   make(chan *FilteredBlock),
23✔
54
                staleBlocks: make(chan *FilteredBlock),
23✔
55
                quit:        make(chan struct{}),
23✔
56
        }
23✔
57
        b.queueCond = sync.NewCond(&b.queueMtx)
23✔
58

23✔
59
        return b
23✔
60
}
23✔
61

62
// Start starts the blockEventQueue coordinator such that it can start handling
63
// events.
64
func (b *blockEventQueue) Start() {
23✔
65
        b.wg.Add(1)
23✔
66
        go b.queueCoordinator()
23✔
67
}
23✔
68

69
// Stop signals the queue coordinator to stop, such that the queue can be
70
// shut down.
71
func (b *blockEventQueue) Stop() {
23✔
72
        close(b.quit)
23✔
73

23✔
74
        b.queueCond.Signal()
23✔
75
}
23✔
76

77
// queueCoordinator is the queue's main loop, handling incoming block events
78
// and handing them off to the correct output channel.
79
//
80
// NB: MUST be run as a goroutine from the Start() method.
81
func (b *blockEventQueue) queueCoordinator() {
23✔
82
        defer b.wg.Done()
23✔
83

23✔
84
        for {
3,518✔
85
                // First, we'll check our condition. If the queue of events is
3,495✔
86
                // empty, then we'll wait until a new item is added.
3,495✔
87
                b.queueCond.L.Lock()
3,495✔
88
                for len(b.queue) == 0 {
6,052✔
89
                        b.queueCond.Wait()
2,557✔
90

2,557✔
91
                        // If we were woke up in order to exit, then we'll do
2,557✔
92
                        // so. Otherwise, we'll check the queue for any new
2,557✔
93
                        // items.
2,557✔
94
                        select {
2,557✔
95
                        case <-b.quit:
19✔
96
                                b.queueCond.L.Unlock()
19✔
97
                                return
19✔
98
                        default:
2,541✔
99
                        }
100
                }
101

102
                // Grab the first element in the queue, and nil the index to
103
                // avoid gc leak.
104
                event := b.queue[0]
3,479✔
105
                b.queue[0] = nil
3,479✔
106
                b.queue = b.queue[1:]
3,479✔
107
                b.queueCond.L.Unlock()
3,479✔
108

3,479✔
109
                // In the case this is a connected block, we'll send it on the
3,479✔
110
                // newBlocks channel. In case it is a disconnected block, we'll
3,479✔
111
                // send it on the staleBlocks channel. This send will block
3,479✔
112
                // until it is received by the consumer on the other end, making
3,479✔
113
                // sure we won't try to send any other block event before the
3,479✔
114
                // consumer is aware of this one.
3,479✔
115
                switch event.eventType {
3,479✔
116
                case connected:
1,819✔
117
                        select {
1,819✔
118
                        case b.newBlocks <- event.block:
1,815✔
119
                        case <-b.quit:
4✔
120
                                return
4✔
121
                        }
122
                case disconnected:
1,662✔
123
                        select {
1,662✔
124
                        case b.staleBlocks <- event.block:
1,662✔
125
                        case <-b.quit:
×
126
                                return
×
127
                        }
128
                }
129
        }
130
}
131

132
// Add puts the provided blockEvent at the end of the event queue, making sure
133
// it will first be received after all previous events. This method is
134
// non-blocking, in the sense that it will never wait for the consumer of the
135
// queue to read form the other end, making it safe to call from the
136
// FilteredChainView's onBlockConnected/onBlockDisconnected.
137
func (b *blockEventQueue) Add(event *blockEvent) {
3,515✔
138

3,515✔
139
        // Lock the condition, and add the event to the end of queue.
3,515✔
140
        b.queueCond.L.Lock()
3,515✔
141
        b.queue = append(b.queue, event)
3,515✔
142
        b.queueCond.L.Unlock()
3,515✔
143

3,515✔
144
        // With the event added, we signal to the queueCoordinator that
3,515✔
145
        // there are new events to handle.
3,515✔
146
        b.queueCond.Signal()
3,515✔
147
}
3,515✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc