• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-server / 1495380963

14 Oct 2024 03:35PM UTC coverage: 70.373% (-2.5%) from 72.904%
1495380963

Pull #101

gitlab-ci

mineralsfree
feat: tenant list added

Ticket: MEN-7568
Changelog: None

Signed-off-by: Mikita Pilinka <mikita.pilinka@northern.tech>
Pull Request #101: feat: tenant list added

4406 of 6391 branches covered (68.94%)

Branch coverage included in aggregate %.

88 of 183 new or added lines in 10 files covered. (48.09%)

2623 existing lines in 65 files now uncovered.

36673 of 51982 relevant lines covered (70.55%)

31.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/backend/services/workflows/app/worker/worker_group.go
1
// Copyright 2024 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package worker
16

17
import (
18
        "context"
19
        "encoding/json"
20
        "sync"
21
        "sync/atomic"
22
        "time"
23

24
        natsio "github.com/nats-io/nats.go"
25
        "github.com/pkg/errors"
26

27
        "github.com/mendersoftware/mender-server/pkg/log"
28

29
        "github.com/mendersoftware/mender-server/services/workflows/client/nats"
30
        "github.com/mendersoftware/mender-server/services/workflows/model"
31
        "github.com/mendersoftware/mender-server/services/workflows/store"
32
)
33

34
type workerGroup struct {
35
        mu        sync.Mutex    // Mutex to protect the shared firstDone channel
36
        firstDone chan struct{} // First worker finished
37
        done      chan struct{} // All workers finished
38
        termID    int32         // ID of the first worker to finish
39

40
        workerCount int32
41

42
        input        <-chan *natsio.Msg
43
        notifyPeriod time.Duration
44

45
        client nats.Client
46
        store  store.DataStore
47
}
48

49
func NewWorkGroup(
50
        input <-chan *natsio.Msg,
51
        notifyPeriod time.Duration,
52
        nc nats.Client,
53
        ds store.DataStore,
UNCOV
54
) *workerGroup {
×
UNCOV
55
        return &workerGroup{
×
UNCOV
56
                done:      make(chan struct{}),
×
UNCOV
57
                firstDone: make(chan struct{}),
×
UNCOV
58

×
UNCOV
59
                input:        input,
×
UNCOV
60
                notifyPeriod: notifyPeriod,
×
UNCOV
61
                client:       nc,
×
UNCOV
62
                store:        ds,
×
UNCOV
63
        }
×
UNCOV
64
}
×
65

66
// Done returns a channel (barrier) that is closed when the last worker
67
// has exited.
UNCOV
68
func (w *workerGroup) Done() <-chan struct{} {
×
UNCOV
69
        return w.done
×
UNCOV
70
}
×
71

72
// FirstDone returns a channel (barrier) that is closed when the first
73
// worker has exited.
UNCOV
74
func (w *workerGroup) FirstDone() <-chan struct{} {
×
UNCOV
75
        return w.firstDone
×
UNCOV
76
}
×
77

78
// TermID is the ID of the first worker that quit.
79
func (w *workerGroup) TermID() int32 {
×
80
        return w.termID
×
81
}
×
82

UNCOV
83
func (w *workerGroup) RunWorker(ctx context.Context) {
×
UNCOV
84
        id := atomic.AddInt32(&w.workerCount, 1)
×
UNCOV
85
        l := log.FromContext(ctx)
×
UNCOV
86
        l = l.F(log.Ctx{"worker_id": id})
×
UNCOV
87
        ctx = log.WithContext(ctx, l)
×
UNCOV
88

×
UNCOV
89
        sidecarChan := make(chan *natsio.Msg, 1)
×
UNCOV
90
        sidecarDone := make(chan struct{})
×
UNCOV
91
        defer func() {
×
UNCOV
92
                l.Info("worker shutting down")
×
UNCOV
93
                close(sidecarChan)
×
UNCOV
94

×
UNCOV
95
                w.mu.Lock()
×
UNCOV
96
                remaining := atomic.AddInt32(&w.workerCount, -1)
×
UNCOV
97
                // Is this the last worker to quit?
×
UNCOV
98
                if remaining <= 0 {
×
UNCOV
99
                        select {
×
100
                        case <-w.done:
×
101

UNCOV
102
                        default:
×
UNCOV
103
                                close(w.done)
×
104
                        }
UNCOV
105
                } else {
×
UNCOV
106
                        // Is this the first worker to quit?
×
UNCOV
107
                        select {
×
UNCOV
108
                        case <-w.firstDone:
×
109

UNCOV
110
                        default:
×
UNCOV
111
                                w.termID = id
×
UNCOV
112
                                close(w.firstDone)
×
113
                        }
114
                }
UNCOV
115
                w.mu.Unlock()
×
116
        }()
UNCOV
117
        l.Info("worker starting up")
×
UNCOV
118
        // workerSidecar is responsible for notifying the broker about slow workflows
×
UNCOV
119
        go w.workerSidecar(ctx, sidecarChan, sidecarDone)
×
UNCOV
120
        w.workerMain(ctx, sidecarChan, sidecarDone)
×
121
}
122

123
func (w *workerGroup) workerMain(
124
        ctx context.Context,
125
        sidecarChan chan *natsio.Msg,
126
        sidecarDone chan struct{},
UNCOV
127
) {
×
UNCOV
128
        l := log.FromContext(ctx)
×
UNCOV
129
        ctxDone := ctx.Done()
×
UNCOV
130
        timeoutTimer := newStoppedTimer()
×
UNCOV
131
        for {
×
UNCOV
132
                var (
×
UNCOV
133
                        msg    *natsio.Msg
×
UNCOV
134
                        isOpen bool
×
UNCOV
135
                )
×
UNCOV
136
                select {
×
UNCOV
137
                case msg, isOpen = <-w.input:
×
UNCOV
138
                        if !isOpen {
×
UNCOV
139
                                return
×
UNCOV
140
                        }
×
141

142
                case <-sidecarDone:
×
143
                        return
×
144
                case <-ctxDone:
×
145
                        return
×
146
                }
147

148
                // Notify the sidecar routine about the new message
UNCOV
149
                select {
×
UNCOV
150
                case sidecarChan <- msg:
×
151

152
                case <-timeoutTimer.After(w.notifyPeriod / 8):
×
153
                        l.Warn("timeout notifying sidecar routine about message")
×
154

155
                case <-sidecarDone:
×
156
                        return
×
157
                case <-ctxDone:
×
158
                        return
×
159
                }
160

UNCOV
161
                job := &model.Job{}
×
UNCOV
162
                err := json.Unmarshal(msg.Data, job)
×
UNCOV
163
                if err != nil {
×
164
                        l.Error(errors.Wrap(err, "failed to unmarshall message"))
×
165
                        if err := msg.Term(); err != nil {
×
166
                                l.Error(errors.Wrap(err, "failed to term the message"))
×
167
                        }
×
168
                        continue
×
169
                }
170
                // process the job
UNCOV
171
                l.Infof("processing job %s workflow %s", job.ID, job.WorkflowName)
×
UNCOV
172
                err = processJob(ctx, job, w.store, w.client)
×
UNCOV
173
                if err != nil {
×
UNCOV
174
                        l.Errorf("error processing job: %s", err.Error())
×
UNCOV
175
                } else {
×
UNCOV
176
                        l.Infof("finished job %s workflow %s", job.ID, job.WorkflowName)
×
UNCOV
177
                }
×
178
                // stop the in progress ticker and ack the message
UNCOV
179
                select {
×
UNCOV
180
                case sidecarChan <- nil:
×
181

182
                case <-timeoutTimer.After(w.notifyPeriod):
×
183
                        l.Errorf("timeout notifying sidecar about job completion")
×
184

185
                case <-ctxDone:
×
186
                        return
×
187
                case <-sidecarDone:
×
188
                        return
×
189
                }
190
                // Release message
UNCOV
191
                if err := msg.AckSync(); err != nil {
×
192
                        l.Error(errors.Wrap(err, "failed to ack the message"))
×
193
                }
×
194
        }
195
}
196

197
// workerSidecar helps notifying the NATS server about slow workflows.
198
// When workerMain picks up a new task, this routine is woken up and starts
199
// a timer that sends an "IN PROGRESS" package back to the broker if the worker
200
// takes too long.
201
func (w *workerGroup) workerSidecar(
202
        ctx context.Context,
203
        msgIn <-chan *natsio.Msg,
204
        done chan<- struct{},
UNCOV
205
) {
×
UNCOV
206
        var (
×
UNCOV
207
                isOpen        bool
×
UNCOV
208
                msgInProgress *natsio.Msg
×
UNCOV
209
                ctxDone       = ctx.Done()
×
UNCOV
210
                l             = log.FromContext(ctx)
×
UNCOV
211
        )
×
UNCOV
212
        defer close(done)
×
UNCOV
213

×
UNCOV
214
        t := newStoppedTimer()
×
UNCOV
215
        for {
×
UNCOV
216
                select {
×
UNCOV
217
                case <-t.C:
×
UNCOV
218
                        ctx, cancel := context.WithTimeout(ctx, w.notifyPeriod)
×
UNCOV
219
                        err := msgInProgress.InProgress(natsio.Context(ctx))
×
UNCOV
220
                        cancel()
×
UNCOV
221
                        if err != nil {
×
UNCOV
222
                                l.Errorf("error notifying broker about message in progress: %s", err)
×
UNCOV
223
                                // If the +WPI message fails, let's not try again, but
×
UNCOV
224
                                // wait for the next message.
×
UNCOV
225
                        } else {
×
226
                                t.Reset(w.notifyPeriod)
×
227
                        }
×
UNCOV
228
                case msgInProgress, isOpen = <-msgIn:
×
UNCOV
229
                        if !isOpen {
×
UNCOV
230
                                return
×
UNCOV
231
                        } else if msgInProgress == nil {
×
UNCOV
232
                                t.Stop()
×
UNCOV
233
                        } else {
×
UNCOV
234
                                t.Reset(w.notifyPeriod)
×
UNCOV
235
                        }
×
236
                case <-ctxDone:
×
237
                        return
×
238
                }
239
        }
240
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc