• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-server / 1978132441

11 Aug 2025 03:08PM UTC coverage: 65.376% (-0.1%) from 65.495%
1978132441

Pull #854

gitlab-ci

bahaa-ghazal
feat(workflows): Add CLI flag to migrate consumers from push to pull mode

Signed-off-by: Bahaa Aldeen Ghazal <bahaa.ghazal@northern.tech>
Pull Request #854: MEN-8326: Migrate consumers from push to pull mode

71 of 160 new or added lines in 5 files covered. (44.38%)

31 existing lines in 3 files now uncovered.

32338 of 49465 relevant lines covered (65.38%)

1.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.26
/backend/services/workflows/app/worker/worker_group.go
1
// Copyright 2024 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package worker
16

17
import (
18
        "context"
19
        "encoding/json"
20
        "sync"
21
        "sync/atomic"
22
        "time"
23

24
        natsio "github.com/nats-io/nats.go"
25
        "github.com/pkg/errors"
26

27
        "github.com/mendersoftware/mender-server/pkg/log"
28

29
        "github.com/mendersoftware/mender-server/services/workflows/client/nats"
30
        "github.com/mendersoftware/mender-server/services/workflows/model"
31
        "github.com/mendersoftware/mender-server/services/workflows/store"
32
)
33

34
type workerGroup struct {
35
        mu        sync.Mutex    // Mutex to protect the shared firstDone channel
36
        firstDone chan struct{} // First worker finished
37
        done      chan struct{} // All workers finished
38
        termID    int32         // ID of the first worker to finish
39

40
        workerCount int32
41

42
        input        <-chan *natsio.Msg
43
        notifyPeriod time.Duration
44

45
        sub    *natsio.Subscription
46
        client nats.Client
47
        store  store.DataStore
48
}
49

50
func NewWorkGroup(
51
        input <-chan *natsio.Msg,
52
        notifyPeriod time.Duration,
53
        nc nats.Client,
54
        ds store.DataStore,
55
        sub *natsio.Subscription,
56
) *workerGroup {
2✔
57
        return &workerGroup{
2✔
58
                done:      make(chan struct{}),
2✔
59
                firstDone: make(chan struct{}),
2✔
60

2✔
61
                input:        input,
2✔
62
                notifyPeriod: notifyPeriod,
2✔
63
                client:       nc,
2✔
64
                store:        ds,
2✔
65
                sub:          sub,
2✔
66
        }
2✔
67
}
2✔
68

69
// Done returns a channel (barrier) that is closed when the last worker
70
// has exited.
71
func (w *workerGroup) Done() <-chan struct{} {
2✔
72
        return w.done
2✔
73
}
2✔
74

75
// FirstDone returns a channel (barrier) that is closed when the first
76
// worker has exited.
77
func (w *workerGroup) FirstDone() <-chan struct{} {
2✔
78
        return w.firstDone
2✔
79
}
2✔
80

81
// TermID is the ID of the first worker that quit.
82
func (w *workerGroup) TermID() int32 {
×
83
        return w.termID
×
84
}
×
85

86
func (w *workerGroup) RunWorker(ctx context.Context) {
2✔
87
        id := atomic.AddInt32(&w.workerCount, 1)
2✔
88
        l := log.FromContext(ctx)
2✔
89
        l = l.F(log.Ctx{"worker_id": id})
2✔
90
        ctx = log.WithContext(ctx, l)
2✔
91

2✔
92
        sidecarChan := make(chan *natsio.Msg, 1)
2✔
93
        sidecarDone := make(chan struct{})
2✔
94
        defer func() {
4✔
95
                l.Info("worker shutting down")
2✔
96
                close(sidecarChan)
2✔
97

2✔
98
                w.mu.Lock()
2✔
99
                remaining := atomic.AddInt32(&w.workerCount, -1)
2✔
100
                // Is this the last worker to quit?
2✔
101
                if remaining <= 0 {
4✔
102
                        select {
2✔
103
                        case <-w.done:
×
104

105
                        default:
2✔
106
                                close(w.done)
2✔
107
                        }
108
                } else {
2✔
109
                        // Is this the first worker to quit?
2✔
110
                        select {
2✔
111
                        case <-w.firstDone:
2✔
112

113
                        default:
2✔
114
                                w.termID = id
2✔
115
                                close(w.firstDone)
2✔
116
                        }
117
                }
118
                w.mu.Unlock()
2✔
119
        }()
120
        l.Info("worker starting up")
2✔
121
        // workerSidecar is responsible for notifying the broker about slow workflows
2✔
122
        go w.workerSidecar(ctx, sidecarChan, sidecarDone)
2✔
123
        switch w.sub.Type() {
2✔
124
        case natsio.PullSubscription:
2✔
125
                w.pullWorkerMain(ctx, sidecarChan, sidecarDone)
2✔
NEW
126
        case natsio.ChanSubscription:
×
NEW
127
                w.workerMain(ctx, sidecarChan, sidecarDone)
×
NEW
128
        default:
×
NEW
129
                l.Error("unsupported consumer type")
×
130
        }
131
}
132

133
func (w *workerGroup) pullWorkerMain(
134
        ctx context.Context,
135
        sidecarChan chan *natsio.Msg,
136
        sidecarDone chan struct{},
137
) {
2✔
138
        l := log.FromContext(ctx)
2✔
139
        ctxDone := ctx.Done()
2✔
140
        timeoutTimer := newStoppedTimer()
2✔
141
        for {
4✔
142
                var msg *natsio.Msg
2✔
143
                select {
2✔
NEW
144
                case <-ctx.Done():
×
NEW
145
                        return
×
NEW
146
                case <-sidecarDone:
×
NEW
147
                        return
×
148
                default:
2✔
149
                        msgs, err := w.sub.Fetch(1, natsio.MaxWait(1*time.Second))
2✔
150
                        if err != nil {
4✔
151
                                if errors.Is(err, natsio.ErrTimeout) {
4✔
152
                                        continue
2✔
153
                                }
154
                                l.Errorf("failed to fetch message: %s", err)
2✔
155
                                return
2✔
156
                        }
157
                        if len(msgs) == 0 {
2✔
NEW
158
                                continue
×
159
                        }
160
                        msg = msgs[0]
2✔
161
                }
162

163
                // Notify the sidecar routine about the new message
164
                select {
2✔
165
                case sidecarChan <- msg:
2✔
166

NEW
167
                case <-timeoutTimer.After(w.notifyPeriod / 8):
×
NEW
168
                        l.Warn("timeout notifying sidecar routine about message")
×
169

NEW
170
                case <-sidecarDone:
×
NEW
171
                        return
×
NEW
172
                case <-ctxDone:
×
NEW
173
                        return
×
174
                }
175

176
                job := &model.Job{}
2✔
177
                err := json.Unmarshal(msg.Data, job)
2✔
178
                if err != nil {
2✔
NEW
179
                        l.Error(errors.Wrap(err, "failed to unmarshall message"))
×
NEW
180
                        if err := msg.Term(); err != nil {
×
NEW
181
                                l.Error(errors.Wrap(err, "failed to term the message"))
×
NEW
182
                        }
×
NEW
183
                        continue
×
184
                }
185
                // process the job
186
                l.Infof("processing job %s workflow %s", job.ID, job.WorkflowName)
2✔
187
                err = processJob(ctx, job, w.store, w.client)
2✔
188
                if err != nil {
4✔
189
                        l.Errorf("error processing job: %s", err.Error())
2✔
190
                } else {
4✔
191
                        l.Infof("finished job %s workflow %s", job.ID, job.WorkflowName)
2✔
192
                }
2✔
193
                // stop the in progress ticker and ack the message
194
                select {
2✔
195
                case sidecarChan <- nil:
2✔
196

NEW
197
                case <-timeoutTimer.After(w.notifyPeriod):
×
NEW
198
                        l.Errorf("timeout notifying sidecar about job completion")
×
199

NEW
200
                case <-ctxDone:
×
NEW
201
                        return
×
NEW
202
                case <-sidecarDone:
×
NEW
203
                        return
×
204
                }
205
                // Release message
206
                if err := msg.AckSync(); err != nil {
2✔
NEW
207
                        l.Error(errors.Wrap(err, "failed to ack the message"))
×
NEW
208
                }
×
209
        }
210
}
211

212
func (w *workerGroup) workerMain(
213
        ctx context.Context,
214
        sidecarChan chan *natsio.Msg,
215
        sidecarDone chan struct{},
UNCOV
216
) {
×
UNCOV
217
        l := log.FromContext(ctx)
×
UNCOV
218
        ctxDone := ctx.Done()
×
UNCOV
219
        timeoutTimer := newStoppedTimer()
×
UNCOV
220
        for {
×
UNCOV
221
                var (
×
UNCOV
222
                        msg    *natsio.Msg
×
UNCOV
223
                        isOpen bool
×
UNCOV
224
                )
×
UNCOV
225
                select {
×
UNCOV
226
                case msg, isOpen = <-w.input:
×
UNCOV
227
                        if !isOpen {
×
UNCOV
228
                                return
×
UNCOV
229
                        }
×
230

231
                case <-sidecarDone:
×
232
                        return
×
233
                case <-ctxDone:
×
234
                        return
×
235
                }
236

237
                // Notify the sidecar routine about the new message
UNCOV
238
                select {
×
UNCOV
239
                case sidecarChan <- msg:
×
240

241
                case <-timeoutTimer.After(w.notifyPeriod / 8):
×
242
                        l.Warn("timeout notifying sidecar routine about message")
×
243

244
                case <-sidecarDone:
×
245
                        return
×
246
                case <-ctxDone:
×
247
                        return
×
248
                }
249

UNCOV
250
                job := &model.Job{}
×
UNCOV
251
                err := json.Unmarshal(msg.Data, job)
×
UNCOV
252
                if err != nil {
×
253
                        l.Error(errors.Wrap(err, "failed to unmarshall message"))
×
254
                        if err := msg.Term(); err != nil {
×
255
                                l.Error(errors.Wrap(err, "failed to term the message"))
×
256
                        }
×
257
                        continue
×
258
                }
259
                // process the job
UNCOV
260
                l.Infof("processing job %s workflow %s", job.ID, job.WorkflowName)
×
UNCOV
261
                err = processJob(ctx, job, w.store, w.client)
×
UNCOV
262
                if err != nil {
×
UNCOV
263
                        l.Errorf("error processing job: %s", err.Error())
×
UNCOV
264
                } else {
×
UNCOV
265
                        l.Infof("finished job %s workflow %s", job.ID, job.WorkflowName)
×
UNCOV
266
                }
×
267
                // stop the in progress ticker and ack the message
UNCOV
268
                select {
×
UNCOV
269
                case sidecarChan <- nil:
×
270

271
                case <-timeoutTimer.After(w.notifyPeriod):
×
272
                        l.Errorf("timeout notifying sidecar about job completion")
×
273

274
                case <-ctxDone:
×
275
                        return
×
276
                case <-sidecarDone:
×
277
                        return
×
278
                }
279
                // Release message
UNCOV
280
                if err := msg.AckSync(); err != nil {
×
281
                        l.Error(errors.Wrap(err, "failed to ack the message"))
×
282
                }
×
283
        }
284
}
285

286
// workerSidecar helps notifying the NATS server about slow workflows.
287
// When workerMain picks up a new task, this routine is woken up and starts
288
// a timer that sends an "IN PROGRESS" package back to the broker if the worker
289
// takes too long.
290
func (w *workerGroup) workerSidecar(
291
        ctx context.Context,
292
        msgIn <-chan *natsio.Msg,
293
        done chan<- struct{},
294
) {
2✔
295
        var (
2✔
296
                isOpen        bool
2✔
297
                msgInProgress *natsio.Msg
2✔
298
                ctxDone       = ctx.Done()
2✔
299
                l             = log.FromContext(ctx)
2✔
300
        )
2✔
301
        defer close(done)
2✔
302

2✔
303
        t := newStoppedTimer()
2✔
304
        for {
4✔
305
                select {
2✔
306
                case <-t.C:
×
307
                        ctx, cancel := context.WithTimeout(ctx, w.notifyPeriod)
×
308
                        err := msgInProgress.InProgress(natsio.Context(ctx))
×
309
                        cancel()
×
310
                        if err != nil {
×
311
                                l.Errorf("error notifying broker about message in progress: %s", err)
×
312
                                // If the +WPI message fails, let's not try again, but
×
313
                                // wait for the next message.
×
314
                        } else {
×
315
                                t.Reset(w.notifyPeriod)
×
316
                        }
×
317
                case msgInProgress, isOpen = <-msgIn:
2✔
318
                        if !isOpen {
4✔
319
                                return
2✔
320
                        } else if msgInProgress == nil {
6✔
321
                                t.Stop()
2✔
322
                        } else {
4✔
323
                                t.Reset(w.notifyPeriod)
2✔
324
                        }
2✔
325
                case <-ctxDone:
×
326
                        return
×
327
                }
328
        }
329
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc