• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13593508312

28 Feb 2025 05:41PM UTC coverage: 58.287% (-10.4%) from 68.65%
13593508312

Pull #9458

github

web-flow
Merge d40067c0c into f1182e433
Pull Request #9458: multi+server.go: add initial permissions for some peers

346 of 548 new or added lines in 10 files covered. (63.14%)

27412 existing lines in 442 files now uncovered.

94709 of 162488 relevant lines covered (58.29%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.32
/pool/worker.go
1
package pool
2

3
import (
4
        "errors"
5
        "sync"
6
        "time"
7
)
8

9
// ErrWorkerPoolExiting signals that a shutdown of the Worker has been
10
// requested. Submit returns it when the quit channel is closed before the task
// is accepted, or before the task's result has been received.
11
var ErrWorkerPoolExiting = errors.New("worker pool exiting")
12

13
// DefaultWorkerTimeout is the default duration after which a worker goroutine
14
// will exit to free up resources after having received no newly submitted
15
// tasks.
//
// NOTE(review): nothing in this listing reads this constant; presumably callers
// assign it to WorkerConfig.WorkerTimeout themselves — confirm.
16
const DefaultWorkerTimeout = 90 * time.Second
17

18
type (
19
        // WorkerState is an interface used by the Worker to abstract the
20
        // lifecycle of internal state used by a worker goroutine.
21
        WorkerState interface {
22
                // Reset clears any internal state that may have been dirtied in
23
                // processing a prior task. A worker goroutine calls it before
                // waiting for each task after its first one.
24
                Reset()
25

26
                // Cleanup releases any shared state before a worker goroutine
27
                // exits. It is invoked via defer when the worker returns.
28
                Cleanup()
29
        }
30

31
        // WorkerConfig parametrizes the behavior of a Worker pool.
32
        WorkerConfig struct {
33
                // NewWorkerState allocates a new state for a worker goroutine.
34
                // This method is called each time a new worker goroutine is
35
                // spawned by the pool.
36
                NewWorkerState func() WorkerState
37

38
                // NumWorkers is the maximum number of workers the Worker pool
39
                // will permit to be allocated. Once the maximum number is
40
                // reached, any newly submitted tasks are forced to be processed
41
                // by existing worker goroutines. It is used as the capacity of
                // the pool's worker semaphore.
42
                NumWorkers int
43

44
                // WorkerTimeout is the duration after which a worker goroutine
45
                // will exit after having received no newly submitted tasks.
46
                WorkerTimeout time.Duration
47
        }
48

49
        // Worker maintains a pool of goroutines that process submitted function
50
        // closures, and enables more efficient reuse of expensive state.
51
        Worker struct {
52
                started sync.Once
53
                stopped sync.Once
54

55
                cfg *WorkerConfig
56

57
                // requests is a channel where new tasks are submitted. Tasks
58
                // submitted through this channel may cause a new worker
59
                // goroutine to be allocated.
60
                requests chan *request
61

62
                // work is a channel where new tasks are submitted, but is only
63
                // read by active worker goroutines.
64
                work chan *request
65

66
                // workerSem is a channel-based semaphore that is used to limit
67
                // the total number of worker goroutines to the number
68
                // prescribed by the WorkerConfig.
69
                workerSem chan struct{}
70

71
                // wg tracks the request handler goroutine and every worker
                // goroutine so that Stop can wait for all of them to return.
                wg   sync.WaitGroup
72
                // quit is closed by Stop to tell all goroutines, and any
                // blocked Submit calls, to exit.
                quit chan struct{}
73
        }
74

75
        // request is a tuple of task closure and error channel that is used to
76
        // both submit a task to the pool and respond with any errors
77
        // encountered during the task's execution.
78
        request struct {
79
                // fn is the task to run against a worker's state.
                fn      func(WorkerState) error
80
                // errChan receives the value returned by fn. Submit allocates
                // it with a buffer of one.
                errChan chan error
81
        }
82
)
83

84
// NewWorker initializes a new Worker pool using the provided WorkerConfig.
// No goroutines are launched here; the pool only begins handling requests once
// Start is called. cfg must be non-nil, as cfg.NumWorkers is read to size the
// worker semaphore. The requests and work channels are unbuffered.
85
func NewWorker(cfg *WorkerConfig) *Worker {
3✔
86
        return &Worker{
3✔
87
                cfg:       cfg,
3✔
88
                requests:  make(chan *request),
3✔
89
                workerSem: make(chan struct{}, cfg.NumWorkers),
3✔
90
                work:      make(chan *request),
3✔
91
                quit:      make(chan struct{}),
3✔
92
        }
3✔
93
}
3✔
94

95
// Start safely spins up the Worker pool. It launches the requestHandler
// goroutine, guarded by a sync.Once so that only the first call has any effect.
// It always returns nil.
96
func (w *Worker) Start() error {
3✔
97
        w.started.Do(func() {
6✔
98
                // Register the handler with the wait group before launching it
                // so that Stop waits for it to return.
                w.wg.Add(1)
3✔
99
                go w.requestHandler()
3✔
100
        })
3✔
101
        return nil
3✔
102
}
103

104
// Stop safely shuts down the Worker pool. It closes the quit channel and then
// blocks until the request handler and every worker goroutine have returned.
// Workers only check quit between tasks, so a task that is already executing
// runs to completion first. Only the first call has any effect (guarded by a
// sync.Once). It always returns nil.
105
func (w *Worker) Stop() error {
3✔
106
        w.stopped.Do(func() {
6✔
107
                close(w.quit)
3✔
108
                w.wg.Wait()
3✔
109
        })
3✔
110
        return nil
3✔
111
}
112

113
// Submit accepts a function closure to the worker pool. The returned error will
114
// be either the result of the closure's execution or an ErrWorkerPoolExiting if
115
// a shutdown is requested.
//
// Submit blocks until the closure has been executed and its result received,
// or until the pool's quit channel is closed, whichever is observed first.
116
func (w *Worker) Submit(fn func(WorkerState) error) error {
3✔
117
        // errChan is buffered with capacity one so that the worker's single
        // send of the result never blocks, even if this call has already
        // returned because quit was closed.
        req := &request{
3✔
118
                fn:      fn,
3✔
119
                errChan: make(chan error, 1),
3✔
120
        }
3✔
121

3✔
122
        select {
3✔
123

124
        // Send request to requestHandler, where either a new worker is spawned
125
        // or the task will be handed to an existing worker.
126
        case w.requests <- req:
3✔
127

128
        // Fast path directly to existing worker.
129
        case w.work <- req:
3✔
130

131
        // The pool is shutting down before the task could be handed off.
        case <-w.quit:
×
132
                return ErrWorkerPoolExiting
×
133
        }
134

135
        select {
3✔
136

137
        // Wait for task to be processed.
138
        case err := <-req.errChan:
3✔
139
                return err
3✔
140

141
        // The pool is shutting down while the task was pending or running. Any
        // result the task produces is left unread in the buffered errChan.
        case <-w.quit:
×
142
                return ErrWorkerPoolExiting
×
143
        }
144
}
145

146
// requestHandler processes incoming tasks by either allocating new worker
147
// goroutines to process the incoming tasks, or by feeding a submitted task to
148
// an already running worker goroutine. It runs until the quit channel is
// closed, and signals the wait group when it returns.
149
func (w *Worker) requestHandler() {
3✔
150
        defer w.wg.Done()
3✔
151

3✔
152
        for {
6✔
153
                select {
3✔
154
                case req := <-w.requests:
3✔
155
                        select {
3✔
156

157
                        // If we have not reached our maximum number of workers,
158
                        // spawn one to process the submitted request. The
                        // semaphore slot acquired here is released by
                        // spawnWorker when that worker returns.
159
                        case w.workerSem <- struct{}{}:
3✔
160
                                w.wg.Add(1)
3✔
161
                                go w.spawnWorker(req)
3✔
162

163
                        // Otherwise, submit the task to any of the active
164
                        // workers. NOTE: select has no case priority, so when a
                        // semaphore slot is free and an existing worker is also
                        // ready to receive, either case may be chosen.
165
                        case w.work <- req:
3✔
166

167
                        // Shutdown was requested while holding a request that
                        // has not been handed off. The request is dropped here;
                        // the corresponding Submit call also observes quit.
                        case <-w.quit:
×
168
                                return
×
169
                        }
170

171
                case <-w.quit:
3✔
172
                        return
3✔
173
                }
174
        }
175
}
176

177
// spawnWorker is used when the Worker pool wishes to create a new worker
178
// goroutine. The worker's state is initialized by calling the config's
179
// NewWorkerState method, and will continue to process incoming tasks until the
180
// pool is shut down or no new tasks are received before the worker's timeout
181
// elapses. The req argument is the first task for this worker and is executed
// immediately against the freshly created state.
182
//
183
// NOTE: This method MUST be run as a goroutine.
184
func (w *Worker) spawnWorker(req *request) {
3✔
185
        // Deferred calls run in reverse order on return: the state is cleaned
        // up first, then the semaphore slot is released, and finally the wait
        // group is signalled.
        defer w.wg.Done()
3✔
186
        defer func() { <-w.workerSem }()
6✔
187

188
        state := w.cfg.NewWorkerState()
3✔
189
        defer state.Cleanup()
3✔
190

3✔
191
        req.errChan <- req.fn(state)
3✔
192

3✔
193
        // We'll use a timer to implement the worker timeouts, as this reduces
3✔
194
        // the number of total allocations that would otherwise be necessary
3✔
195
        // with time.After.
3✔
196
        var t *time.Timer
3✔
197
        for {
6✔
198
                // Before processing another request, we'll reset the worker
3✔
199
                // state so that each request is processed against a clean
3✔
200
                // state.
3✔
201
                state.Reset()
3✔
202

3✔
203
                select {
3✔
204

205
                // Process any new requests that get submitted. We use a
206
                // non-blocking case first so that under high load we can spare
207
                // allocating a timeout.
UNCOV
208
                case req := <-w.work:
×
UNCOV
209
                        req.errChan <- req.fn(state)
×
UNCOV
210
                        continue
×
211

212
                case <-w.quit:
×
213
                        return
×
214

215
                default:
3✔
216
                }
217

218
                // There were no new requests that could be taken immediately
219
                // from the work channel. Initialize or reset the timeout, which
220
                // will fire if the worker doesn't receive a new task before
221
                // needing to exit. The timer is created lazily on the first
                // pass and reused on later ones.
222
                if t != nil {
6✔
223
                        t.Reset(w.cfg.WorkerTimeout)
3✔
224
                } else {
6✔
225
                        t = time.NewTimer(w.cfg.WorkerTimeout)
3✔
226
                }
3✔
227

228
                select {
3✔
229

230
                // Process any new requests that get submitted.
231
                case req := <-w.work:
3✔
232
                        req.errChan <- req.fn(state)
3✔
233

3✔
234
                        // Stop the timer, draining the timer's channel if a
3✔
235
                        // notification was already delivered. This leaves the
                        // timer in a state where the Reset above is safe on the
                        // next pass.
3✔
236
                        if !t.Stop() {
3✔
237
                                <-t.C
×
238
                        }
×
239

240
                // The timeout has elapsed, meaning the worker did not receive
241
                // any new tasks. Exit to allow the worker to return and free
242
                // its resources.
243
                case <-t.C:
1✔
244
                        return
1✔
245

246
                case <-w.quit:
3✔
247
                        return
3✔
248
                }
249
        }
250
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc