• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12199391122

06 Dec 2024 01:10PM UTC coverage: 49.807% (-9.1%) from 58.933%
12199391122

push

github

web-flow
Merge pull request #9337 from Guayaba221/patch-1

chore: fix typo in ruby.md

100137 of 201051 relevant lines covered (49.81%)

2.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.32
/pool/worker.go
1
package pool
2

3
import (
4
        "errors"
5
        "sync"
6
        "time"
7
)
8

9
// ErrWorkerPoolExiting signals that a shutdown of the Worker has been
// requested. Submit returns it when the pool's quit channel is closed before
// the submitted task could be handed off or before its result was delivered.
var ErrWorkerPoolExiting = errors.New("worker pool exiting")
12

13
// DefaultWorkerTimeout is the default duration after which a worker goroutine
// will exit to free up resources after having received no newly submitted
// tasks.
//
// NOTE: Nothing in this file applies the default automatically; callers that
// want it are expected to assign it to WorkerConfig.WorkerTimeout themselves.
const DefaultWorkerTimeout = 90 * time.Second
17

18
type (
	// WorkerState is an interface used by the Worker to abstract the
	// lifecycle of internal state used by a worker goroutine.
	WorkerState interface {
		// Reset clears any internal state that may have been dirtied in
		// processing a prior task. A worker goroutine calls it after
		// every task it executes, before waiting for the next one.
		Reset()

		// Cleanup releases any shared state before a worker goroutine
		// exits. It is called exactly once per worker goroutine.
		Cleanup()
	}

	// WorkerConfig parametrizes the behavior of a Worker pool.
	WorkerConfig struct {
		// NewWorkerState allocates a new state for a worker goroutine.
		// This method is called each time a new worker goroutine is
		// spawned by the pool.
		NewWorkerState func() WorkerState

		// NumWorkers is the maximum number of workers the Worker pool
		// will permit to be allocated. Once the maximum number is
		// reached, any newly submitted tasks are forced to be processed
		// by existing worker goroutines.
		NumWorkers int

		// WorkerTimeout is the duration after which a worker goroutine
		// will exit after having received no newly submitted tasks.
		WorkerTimeout time.Duration
	}

	// Worker maintains a pool of goroutines that process submitted function
	// closures, and enable more efficient reuse of expensive state.
	Worker struct {
		// started and stopped ensure that the bodies of Start and Stop
		// each run at most once.
		started sync.Once
		stopped sync.Once

		// cfg holds the parameters the pool was created with.
		cfg *WorkerConfig

		// requests is a channel where new tasks are submitted. Tasks
		// submitted through this channel may cause a new worker
		// goroutine to be allocated.
		requests chan *request

		// work is a channel where new tasks are submitted, but is only
		// read by active worker goroutines.
		work chan *request

		// workerSem is a channel-based semaphore that is used to limit
		// the total number of worker goroutines to the number
		// prescribed by the WorkerConfig.
		workerSem chan struct{}

		// wg tracks the request handler and every worker goroutine so
		// that Stop can wait for all of them to exit.
		wg sync.WaitGroup

		// quit is closed by Stop to signal all goroutines, and any
		// blocked Submit calls, that the pool is shutting down.
		quit chan struct{}
	}

	// request is a tuple of task closure and error channel that is used to
	// both submit a task to the pool and respond with any errors
	// encountered during the task's execution.
	request struct {
		// fn is the task to run against the executing worker's state.
		fn func(WorkerState) error

		// errChan receives the value returned by fn.
		errChan chan error
	}
)
83

84
// NewWorker initializes a new Worker pool using the provided WorkerConfig.
85
func NewWorker(cfg *WorkerConfig) *Worker {
4✔
86
        return &Worker{
4✔
87
                cfg:       cfg,
4✔
88
                requests:  make(chan *request),
4✔
89
                workerSem: make(chan struct{}, cfg.NumWorkers),
4✔
90
                work:      make(chan *request),
4✔
91
                quit:      make(chan struct{}),
4✔
92
        }
4✔
93
}
4✔
94

95
// Start safely spins up the Worker pool.
96
func (w *Worker) Start() error {
4✔
97
        w.started.Do(func() {
8✔
98
                w.wg.Add(1)
4✔
99
                go w.requestHandler()
4✔
100
        })
4✔
101
        return nil
4✔
102
}
103

104
// Stop safely shuts down the Worker pool.
105
func (w *Worker) Stop() error {
4✔
106
        w.stopped.Do(func() {
8✔
107
                close(w.quit)
4✔
108
                w.wg.Wait()
4✔
109
        })
4✔
110
        return nil
4✔
111
}
112

113
// Submit accepts a function closure to the worker pool. The returned error will
114
// be either the result of the closure's execution or an ErrWorkerPoolExiting if
115
// a shutdown is requested.
116
func (w *Worker) Submit(fn func(WorkerState) error) error {
4✔
117
        req := &request{
4✔
118
                fn:      fn,
4✔
119
                errChan: make(chan error, 1),
4✔
120
        }
4✔
121

4✔
122
        select {
4✔
123

124
        // Send request to requestHandler, where either a new worker is spawned
125
        // or the task will be handed to an existing worker.
126
        case w.requests <- req:
4✔
127

128
        // Fast path directly to existing worker.
129
        case w.work <- req:
4✔
130

131
        case <-w.quit:
×
132
                return ErrWorkerPoolExiting
×
133
        }
134

135
        select {
4✔
136

137
        // Wait for task to be processed.
138
        case err := <-req.errChan:
4✔
139
                return err
4✔
140

141
        case <-w.quit:
×
142
                return ErrWorkerPoolExiting
×
143
        }
144
}
145

146
// requestHandler processes incoming tasks by either allocating new worker
147
// goroutines to process the incoming tasks, or by feeding a submitted task to
148
// an already running worker goroutine.
149
func (w *Worker) requestHandler() {
4✔
150
        defer w.wg.Done()
4✔
151

4✔
152
        for {
8✔
153
                select {
4✔
154
                case req := <-w.requests:
4✔
155
                        select {
4✔
156

157
                        // If we have not reached our maximum number of workers,
158
                        // spawn one to process the submitted request.
159
                        case w.workerSem <- struct{}{}:
4✔
160
                                w.wg.Add(1)
4✔
161
                                go w.spawnWorker(req)
4✔
162

163
                        // Otherwise, submit the task to any of the active
164
                        // workers.
165
                        case w.work <- req:
4✔
166

167
                        case <-w.quit:
×
168
                                return
×
169
                        }
170

171
                case <-w.quit:
4✔
172
                        return
4✔
173
                }
174
        }
175
}
176

177
// spawnWorker is used when the Worker pool wishes to create a new worker
178
// goroutine. The worker's state is initialized by calling the config's
179
// NewWorkerState method, and will continue to process incoming tasks until the
180
// pool is shut down or no new tasks are received before the worker's timeout
181
// elapses.
182
//
183
// NOTE: This method MUST be run as a goroutine.
184
func (w *Worker) spawnWorker(req *request) {
4✔
185
        defer w.wg.Done()
4✔
186
        defer func() { <-w.workerSem }()
8✔
187

188
        state := w.cfg.NewWorkerState()
4✔
189
        defer state.Cleanup()
4✔
190

4✔
191
        req.errChan <- req.fn(state)
4✔
192

4✔
193
        // We'll use a timer to implement the worker timeouts, as this reduces
4✔
194
        // the number of total allocations that would otherwise be necessary
4✔
195
        // with time.After.
4✔
196
        var t *time.Timer
4✔
197
        for {
8✔
198
                // Before processing another request, we'll reset the worker
4✔
199
                // state to that each request is processed against a clean
4✔
200
                // state.
4✔
201
                state.Reset()
4✔
202

4✔
203
                select {
4✔
204

205
                // Process any new requests that get submitted. We use a
206
                // non-blocking case first so that under high load we can spare
207
                // allocating a timeout.
208
                case req := <-w.work:
×
209
                        req.errChan <- req.fn(state)
×
210
                        continue
×
211

212
                case <-w.quit:
×
213
                        return
×
214

215
                default:
4✔
216
                }
217

218
                // There were no new requests that could be taken immediately
219
                // from the work channel. Initialize or reset the timeout, which
220
                // will fire if the worker doesn't receive a new task before
221
                // needing to exit.
222
                if t != nil {
8✔
223
                        t.Reset(w.cfg.WorkerTimeout)
4✔
224
                } else {
8✔
225
                        t = time.NewTimer(w.cfg.WorkerTimeout)
4✔
226
                }
4✔
227

228
                select {
4✔
229

230
                // Process any new requests that get submitted.
231
                case req := <-w.work:
4✔
232
                        req.errChan <- req.fn(state)
4✔
233

4✔
234
                        // Stop the timer, draining the timer's channel if a
4✔
235
                        // notification was already delivered.
4✔
236
                        if !t.Stop() {
4✔
237
                                <-t.C
×
238
                        }
×
239

240
                // The timeout has elapsed, meaning the worker did not receive
241
                // any new tasks. Exit to allow the worker to return and free
242
                // its resources.
243
                case <-t.C:
4✔
244
                        return
4✔
245

246
                case <-w.quit:
4✔
247
                        return
4✔
248
                }
249
        }
250
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc