lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%

Pull #9489 (github)

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the RPC server from allowing onion dispatches for
attempt IDs which have already been tracked by RPC clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only way to solve this issue!
It could also be addressed by careful client-side programming
that accounts for the uncertainty and asynchronous nature of
dispatching onions to a remote process via RPC. That approach,
however, would require changes to how the lnd ChannelRouter is
intended to use these RPCs.
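
To illustrate the enforced ordering, here is a minimal, hypothetical sketch of the server-side check. attemptGuard and its methods are illustrative only and are not lnd's actual switchrpc types:

package main

import (
	"fmt"
	"sync"
)

// attemptGuard sketches the send-then-track enforcement described
// above: once an RPC client has tracked an attempt ID, the server
// refuses any later onion dispatch for that same ID.
type attemptGuard struct {
	mu      sync.Mutex
	tracked map[uint64]struct{} // attempt IDs already tracked by clients
}

func newAttemptGuard() *attemptGuard {
	return &attemptGuard{tracked: make(map[uint64]struct{})}
}

// markTracked records that a client has begun tracking the attempt ID.
func (g *attemptGuard) markTracked(id uint64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.tracked[id] = struct{}{}
}

// checkSend rejects an onion dispatch for an attempt ID that a client
// is already tracking, preventing a duplicate onion attempt from
// leaking.
func (g *attemptGuard) checkSend(id uint64) error {
	g.mu.Lock()
	defer g.mu.Unlock()
	if _, ok := g.tracked[id]; ok {
		return fmt.Errorf("attempt id %d already tracked; refusing "+
			"duplicate dispatch", id)
	}
	return nil
}

func main() {
	g := newAttemptGuard()
	g.markTracked(7)
	fmt.Println(g.checkSend(7)) // rejected: already tracked
}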
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File: /pool/worker.go (86.32% covered)
package pool

import (
	"errors"
	"sync"
	"time"
)

// ErrWorkerPoolExiting signals that a shutdown of the Worker has been
// requested.
var ErrWorkerPoolExiting = errors.New("worker pool exiting")

// DefaultWorkerTimeout is the default duration after which a worker goroutine
// will exit to free up resources after having received no newly submitted
// tasks.
const DefaultWorkerTimeout = 90 * time.Second

type (
	// WorkerState is an interface used by the Worker to abstract the
	// lifecycle of internal state used by a worker goroutine.
	WorkerState interface {
		// Reset clears any internal state that may have been dirtied in
		// processing a prior task.
		Reset()

		// Cleanup releases any shared state before a worker goroutine
		// exits.
		Cleanup()
	}

	// WorkerConfig parametrizes the behavior of a Worker pool.
	WorkerConfig struct {
		// NewWorkerState allocates a new state for a worker goroutine.
		// This method is called each time a new worker goroutine is
		// spawned by the pool.
		NewWorkerState func() WorkerState

		// NumWorkers is the maximum number of workers the Worker pool
		// will permit to be allocated. Once the maximum number is
		// reached, any newly submitted tasks are forced to be processed
		// by existing worker goroutines.
		NumWorkers int

		// WorkerTimeout is the duration after which a worker goroutine
		// will exit after having received no newly submitted tasks.
		WorkerTimeout time.Duration
	}

	// Worker maintains a pool of goroutines that process submitted function
	// closures, and enable more efficient reuse of expensive state.
	Worker struct {
		started sync.Once
		stopped sync.Once

		cfg *WorkerConfig

		// requests is a channel where new tasks are submitted. Tasks
		// submitted through this channel may cause a new worker
		// goroutine to be allocated.
		requests chan *request

		// work is a channel where new tasks are submitted, but is only
		// read by active worker goroutines.
		work chan *request

		// workerSem is a channel-based semaphore that is used to limit
		// the total number of worker goroutines to the number
		// prescribed by the WorkerConfig.
		workerSem chan struct{}

		wg   sync.WaitGroup
		quit chan struct{}
	}

	// request is a tuple of task closure and error channel that is used to
	// both submit a task to the pool and respond with any errors
	// encountered during the task's execution.
	request struct {
		fn      func(WorkerState) error
		errChan chan error
	}
)

// NewWorker initializes a new Worker pool using the provided WorkerConfig.
func NewWorker(cfg *WorkerConfig) *Worker {
	return &Worker{
		cfg:       cfg,
		requests:  make(chan *request),
		workerSem: make(chan struct{}, cfg.NumWorkers),
		work:      make(chan *request),
		quit:      make(chan struct{}),
	}
}

// Start safely spins up the Worker pool.
func (w *Worker) Start() error {
	w.started.Do(func() {
		w.wg.Add(1)
		go w.requestHandler()
	})
	return nil
}

// Stop safely shuts down the Worker pool.
func (w *Worker) Stop() error {
	w.stopped.Do(func() {
		close(w.quit)
		w.wg.Wait()
	})
	return nil
}

// Submit accepts a function closure to the worker pool. The returned error
// will be either the result of the closure's execution or an
// ErrWorkerPoolExiting if a shutdown is requested.
func (w *Worker) Submit(fn func(WorkerState) error) error {
	req := &request{
		fn:      fn,
		errChan: make(chan error, 1),
	}

	select {

	// Send request to requestHandler, where either a new worker is spawned
	// or the task will be handed to an existing worker.
	case w.requests <- req:

	// Fast path directly to existing worker.
	case w.work <- req:

	case <-w.quit:
		return ErrWorkerPoolExiting
	}

	select {

	// Wait for task to be processed.
	case err := <-req.errChan:
		return err

	case <-w.quit:
		return ErrWorkerPoolExiting
	}
}

// requestHandler processes incoming tasks by either allocating new worker
// goroutines to process the incoming tasks, or by feeding a submitted task to
// an already running worker goroutine.
func (w *Worker) requestHandler() {
	defer w.wg.Done()

	for {
		select {
		case req := <-w.requests:
			select {

			// If we have not reached our maximum number of workers,
			// spawn one to process the submitted request.
			case w.workerSem <- struct{}{}:
				w.wg.Add(1)
				go w.spawnWorker(req)

			// Otherwise, submit the task to any of the active
			// workers.
			case w.work <- req:

			case <-w.quit:
				return
			}

		case <-w.quit:
			return
		}
	}
}

// spawnWorker is used when the Worker pool wishes to create a new worker
// goroutine. The worker's state is initialized by calling the config's
// NewWorkerState method, and will continue to process incoming tasks until the
// pool is shut down or no new tasks are received before the worker's timeout
// elapses.
//
// NOTE: This method MUST be run as a goroutine.
func (w *Worker) spawnWorker(req *request) {
	defer w.wg.Done()
	defer func() { <-w.workerSem }()

	state := w.cfg.NewWorkerState()
	defer state.Cleanup()

	req.errChan <- req.fn(state)

	// We'll use a timer to implement the worker timeouts, as this reduces
	// the number of total allocations that would otherwise be necessary
	// with time.After.
	var t *time.Timer
	for {
		// Before processing another request, we'll reset the worker
		// state so that each request is processed against a clean
		// state.
		state.Reset()

		select {

		// Process any new requests that get submitted. We use a
		// non-blocking case first so that under high load we can spare
		// allocating a timeout.
		case req := <-w.work:
			req.errChan <- req.fn(state)
			continue

		case <-w.quit:
			return

		default:
		}

		// There were no new requests that could be taken immediately
		// from the work channel. Initialize or reset the timeout, which
		// will fire if the worker doesn't receive a new task before
		// needing to exit.
		if t != nil {
			t.Reset(w.cfg.WorkerTimeout)
		} else {
			t = time.NewTimer(w.cfg.WorkerTimeout)
		}

		select {

		// Process any new requests that get submitted.
		case req := <-w.work:
			req.errChan <- req.fn(state)

			// Stop the timer, draining the timer's channel if a
			// notification was already delivered.
			if !t.Stop() {
				<-t.C
			}

		// The timeout has elapsed, meaning the worker did not receive
		// any new tasks. Exit to allow the worker to return and free
		// its resources.
		case <-t.C:
			return

		case <-w.quit:
			return
		}
	}
}
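
For context on how a pool like this is consumed, below is a minimal usage sketch. It assumes the package's import path is github.com/lightningnetwork/lnd/pool, and scratchState is an illustrative WorkerState implementation, not part of the package:

package main

import (
	"fmt"

	"github.com/lightningnetwork/lnd/pool"
)

// scratchState is a hypothetical WorkerState holding a reusable buffer.
type scratchState struct {
	buf []byte
}

// Reset clears the buffer so each task sees clean state.
func (s *scratchState) Reset() { s.buf = s.buf[:0] }

// Cleanup releases the buffer when the worker goroutine exits.
func (s *scratchState) Cleanup() { s.buf = nil }

func main() {
	w := pool.NewWorker(&pool.WorkerConfig{
		NewWorkerState: func() pool.WorkerState {
			return &scratchState{buf: make([]byte, 0, 1024)}
		},
		NumWorkers:    4,
		WorkerTimeout: pool.DefaultWorkerTimeout,
	})
	if err := w.Start(); err != nil {
		panic(err)
	}
	defer w.Stop()

	// Submit blocks until the closure has run on some worker (or the
	// pool shuts down) and returns the closure's error.
	err := w.Submit(func(state pool.WorkerState) error {
		s := state.(*scratchState)
		s.buf = append(s.buf, "hello from the worker pool"...)
		fmt.Println(string(s.buf))
		return nil
	})
	if err != nil {
		panic(err)
	}
}

Because Submit reports each task's error synchronously, callers get per-task error handling without managing goroutines or channels themselves.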