• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-server / 1592525883

17 Dec 2024 01:50PM UTC coverage: 73.526% (+0.7%) from 72.839%
1592525883

Pull #270

gitlab-ci

bahaa-ghazal
test: testing 'deploy to all devices' feature

Changelog = Title
Ticket = MEN-4272
Signed-off-by: Bahaa Aldeen Ghazal <bahaa.ghazal@northern.tech>
Pull Request #270: test: testing 'deploy to all devices' feature

4244 of 6144 branches covered (69.08%)

Branch coverage included in aggregate %.

40043 of 54089 relevant lines covered (74.03%)

23.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.8
/backend/services/workflows/app/worker/worker_group.go
1
// Copyright 2024 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package worker
16

17
import (
18
        "context"
19
        "encoding/json"
20
        "sync"
21
        "sync/atomic"
22
        "time"
23

24
        natsio "github.com/nats-io/nats.go"
25
        "github.com/pkg/errors"
26

27
        "github.com/mendersoftware/mender-server/pkg/log"
28

29
        "github.com/mendersoftware/mender-server/services/workflows/client/nats"
30
        "github.com/mendersoftware/mender-server/services/workflows/model"
31
        "github.com/mendersoftware/mender-server/services/workflows/store"
32
)
33

34
// workerGroup holds the state shared by the worker goroutines started with
// RunWorker: the message input channel, the dependencies handed to
// processJob, and the barriers that signal when workers exit.
type workerGroup struct {
	// mu guards termID and the closing of the firstDone and done channels.
	mu        sync.Mutex
	firstDone chan struct{} // First worker finished
	done      chan struct{} // All workers finished
	termID    int32         // ID of the first worker to finish

	// workerCount is modified atomically: it is incremented when a worker
	// starts (the new value becomes the worker's ID) and decremented when
	// the worker exits.
	workerCount int32

	// input is the channel the workers receive messages from.
	input <-chan *natsio.Msg
	// notifyPeriod is the interval of the sidecar's "in progress" timer;
	// it also bounds how long a worker waits when notifying its sidecar.
	notifyPeriod time.Duration

	// client and store are passed to processJob for every job.
	client nats.Client
	store  store.DataStore
}
48

49
func NewWorkGroup(
50
        input <-chan *natsio.Msg,
51
        notifyPeriod time.Duration,
52
        nc nats.Client,
53
        ds store.DataStore,
54
) *workerGroup {
2✔
55
        return &workerGroup{
2✔
56
                done:      make(chan struct{}),
2✔
57
                firstDone: make(chan struct{}),
2✔
58

2✔
59
                input:        input,
2✔
60
                notifyPeriod: notifyPeriod,
2✔
61
                client:       nc,
2✔
62
                store:        ds,
2✔
63
        }
2✔
64
}
2✔
65

66
// Done returns a channel (barrier) that is closed when the last worker
67
// has exited.
68
func (w *workerGroup) Done() <-chan struct{} {
2✔
69
        return w.done
2✔
70
}
2✔
71

72
// FirstDone returns a channel (barrier) that is closed when the first
73
// worker has exited.
74
func (w *workerGroup) FirstDone() <-chan struct{} {
2✔
75
        return w.firstDone
2✔
76
}
2✔
77

78
// TermID is the ID of the first worker that quit.
79
func (w *workerGroup) TermID() int32 {
×
80
        return w.termID
×
81
}
×
82

83
func (w *workerGroup) RunWorker(ctx context.Context) {
2✔
84
        id := atomic.AddInt32(&w.workerCount, 1)
2✔
85
        l := log.FromContext(ctx)
2✔
86
        l = l.F(log.Ctx{"worker_id": id})
2✔
87
        ctx = log.WithContext(ctx, l)
2✔
88

2✔
89
        sidecarChan := make(chan *natsio.Msg, 1)
2✔
90
        sidecarDone := make(chan struct{})
2✔
91
        defer func() {
4✔
92
                l.Info("worker shutting down")
2✔
93
                close(sidecarChan)
2✔
94

2✔
95
                w.mu.Lock()
2✔
96
                remaining := atomic.AddInt32(&w.workerCount, -1)
2✔
97
                // Is this the last worker to quit?
2✔
98
                if remaining <= 0 {
4✔
99
                        select {
2✔
100
                        case <-w.done:
×
101

102
                        default:
2✔
103
                                close(w.done)
2✔
104
                        }
105
                } else {
2✔
106
                        // Is this the first worker to quit?
2✔
107
                        select {
2✔
108
                        case <-w.firstDone:
2✔
109

110
                        default:
2✔
111
                                w.termID = id
2✔
112
                                close(w.firstDone)
2✔
113
                        }
114
                }
115
                w.mu.Unlock()
2✔
116
        }()
117
        l.Info("worker starting up")
2✔
118
        // workerSidecar is responsible for notifying the broker about slow workflows
2✔
119
        go w.workerSidecar(ctx, sidecarChan, sidecarDone)
2✔
120
        w.workerMain(ctx, sidecarChan, sidecarDone)
2✔
121
}
122

123
func (w *workerGroup) workerMain(
124
        ctx context.Context,
125
        sidecarChan chan *natsio.Msg,
126
        sidecarDone chan struct{},
127
) {
2✔
128
        l := log.FromContext(ctx)
2✔
129
        ctxDone := ctx.Done()
2✔
130
        timeoutTimer := newStoppedTimer()
2✔
131
        for {
4✔
132
                var (
2✔
133
                        msg    *natsio.Msg
2✔
134
                        isOpen bool
2✔
135
                )
2✔
136
                select {
2✔
137
                case msg, isOpen = <-w.input:
2✔
138
                        if !isOpen {
4✔
139
                                return
2✔
140
                        }
2✔
141

142
                case <-sidecarDone:
×
143
                        return
×
144
                case <-ctxDone:
×
145
                        return
×
146
                }
147

148
                // Notify the sidecar routine about the new message
149
                select {
2✔
150
                case sidecarChan <- msg:
2✔
151

152
                case <-timeoutTimer.After(w.notifyPeriod / 8):
×
153
                        l.Warn("timeout notifying sidecar routine about message")
×
154

155
                case <-sidecarDone:
×
156
                        return
×
157
                case <-ctxDone:
×
158
                        return
×
159
                }
160

161
                job := &model.Job{}
2✔
162
                err := json.Unmarshal(msg.Data, job)
2✔
163
                if err != nil {
2✔
164
                        l.Error(errors.Wrap(err, "failed to unmarshall message"))
×
165
                        if err := msg.Term(); err != nil {
×
166
                                l.Error(errors.Wrap(err, "failed to term the message"))
×
167
                        }
×
168
                        continue
×
169
                }
170
                // process the job
171
                l.Infof("processing job %s workflow %s", job.ID, job.WorkflowName)
2✔
172
                err = processJob(ctx, job, w.store, w.client)
2✔
173
                if err != nil {
4✔
174
                        l.Errorf("error processing job: %s", err.Error())
2✔
175
                } else {
4✔
176
                        l.Infof("finished job %s workflow %s", job.ID, job.WorkflowName)
2✔
177
                }
2✔
178
                // stop the in progress ticker and ack the message
179
                select {
2✔
180
                case sidecarChan <- nil:
2✔
181

182
                case <-timeoutTimer.After(w.notifyPeriod):
×
183
                        l.Errorf("timeout notifying sidecar about job completion")
×
184

185
                case <-ctxDone:
×
186
                        return
×
187
                case <-sidecarDone:
×
188
                        return
×
189
                }
190
                // Release message
191
                if err := msg.AckSync(); err != nil {
2✔
192
                        l.Error(errors.Wrap(err, "failed to ack the message"))
×
193
                }
×
194
        }
195
}
196

197
// workerSidecar helps notifying the NATS server about slow workflows.
198
// When workerMain picks up a new task, this routine is woken up and starts
199
// a timer that sends an "IN PROGRESS" package back to the broker if the worker
200
// takes too long.
201
func (w *workerGroup) workerSidecar(
202
        ctx context.Context,
203
        msgIn <-chan *natsio.Msg,
204
        done chan<- struct{},
205
) {
2✔
206
        var (
2✔
207
                isOpen        bool
2✔
208
                msgInProgress *natsio.Msg
2✔
209
                ctxDone       = ctx.Done()
2✔
210
                l             = log.FromContext(ctx)
2✔
211
        )
2✔
212
        defer close(done)
2✔
213

2✔
214
        t := newStoppedTimer()
2✔
215
        for {
4✔
216
                select {
2✔
217
                case <-t.C:
×
218
                        ctx, cancel := context.WithTimeout(ctx, w.notifyPeriod)
×
219
                        err := msgInProgress.InProgress(natsio.Context(ctx))
×
220
                        cancel()
×
221
                        if err != nil {
×
222
                                l.Errorf("error notifying broker about message in progress: %s", err)
×
223
                                // If the +WPI message fails, let's not try again, but
×
224
                                // wait for the next message.
×
225
                        } else {
×
226
                                t.Reset(w.notifyPeriod)
×
227
                        }
×
228
                case msgInProgress, isOpen = <-msgIn:
2✔
229
                        if !isOpen {
4✔
230
                                return
2✔
231
                        } else if msgInProgress == nil {
6✔
232
                                t.Stop()
2✔
233
                        } else {
4✔
234
                                t.Reset(w.notifyPeriod)
2✔
235
                        }
2✔
236
                case <-ctxDone:
×
237
                        return
×
238
                }
239
        }
240
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc