• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / workflows / 1329047123

27 May 2024 12:00PM UTC coverage: 82.689%. First build
1329047123

push

gitlab-ci

web-flow
Merge pull request #324 from alfrunes/2.5.x

Backport #318 (MEN-7214) to 2.5.x

0 of 174 new or added lines in 3 files covered. (0.0%)

1605 of 1941 relevant lines covered (82.69%)

14.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.74
/app/worker/worker_group.go
1
// Copyright 2024 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package worker
16

17
import (
18
        "context"
19
        "encoding/json"
20
        "sync"
21
        "sync/atomic"
22
        "time"
23

24
        natsio "github.com/nats-io/nats.go"
25
        "github.com/pkg/errors"
26

27
        "github.com/mendersoftware/go-lib-micro/log"
28

29
        "github.com/mendersoftware/workflows/client/nats"
30
        "github.com/mendersoftware/workflows/model"
31
        "github.com/mendersoftware/workflows/store"
32
)
33

34
type workerGroup struct {
35
        mu        sync.Mutex    // Mutex to protect the shared firstDone channel
36
        firstDone chan struct{} // First worker finished
37
        done      chan struct{} // All workers finished
38
        termID    int32         // ID of the first worker to finish
39

40
        sidecarDone chan struct{}
41
        workerCount int32
42

43
        input        <-chan *natsio.Msg
44
        notifyPeriod time.Duration
45

46
        client nats.Client
47
        store  store.DataStore
48
}
49

50
func NewWorkGroup(
51
        input <-chan *natsio.Msg,
52
        notifyPeriod time.Duration,
53
        nc nats.Client,
54
        ds store.DataStore,
NEW
55
) *workerGroup {
1✔
NEW
56
        return &workerGroup{
1✔
NEW
57
                sidecarDone: make(chan struct{}),
1✔
NEW
58
                done:        make(chan struct{}),
1✔
NEW
59
                firstDone:   make(chan struct{}),
1✔
NEW
60

1✔
NEW
61
                input:        input,
1✔
NEW
62
                notifyPeriod: notifyPeriod,
1✔
NEW
63
                client:       nc,
1✔
NEW
64
                store:        ds,
1✔
NEW
65
        }
1✔
NEW
66
}
1✔
67

68
// Done returns a channel (barrier) that is closed when the last worker
69
// has exited.
NEW
70
func (w *workerGroup) Done() <-chan struct{} {
1✔
NEW
71
        return w.done
1✔
NEW
72
}
1✔
73

74
// FirstDone returns a channel (barrier) that is closed when the first
75
// worker has exited.
NEW
76
func (w *workerGroup) FirstDone() <-chan struct{} {
1✔
NEW
77
        return w.firstDone
1✔
NEW
78
}
1✔
79

80
// TermID is the ID of the first worker that quit.
NEW
81
func (w *workerGroup) TermID() int32 {
×
NEW
82
        return w.termID
×
NEW
83
}
×
84

NEW
85
func (w *workerGroup) RunWorker(ctx context.Context) {
10✔
NEW
86
        id := atomic.AddInt32(&w.workerCount, 1)
10✔
NEW
87
        l := log.FromContext(ctx)
10✔
NEW
88
        l = l.F(log.Ctx{"worker_id": id})
10✔
NEW
89
        ctx = log.WithContext(ctx, l)
10✔
NEW
90

10✔
NEW
91
        sidecarChan := make(chan *natsio.Msg, 1)
10✔
NEW
92
        sidecarDone := make(chan struct{})
10✔
NEW
93
        defer func() {
20✔
NEW
94
                l.Info("worker shutting down")
10✔
NEW
95
                close(sidecarChan)
10✔
NEW
96

10✔
NEW
97
                w.mu.Lock()
10✔
NEW
98
                remaining := atomic.AddInt32(&w.workerCount, -1)
10✔
NEW
99
                // Is this the last worker to quit?
10✔
NEW
100
                if remaining <= 0 {
11✔
NEW
101
                        select {
1✔
NEW
102
                        case <-w.done:
×
103

NEW
104
                        default:
1✔
NEW
105
                                close(w.done)
1✔
106
                        }
NEW
107
                } else {
9✔
NEW
108
                        // Is this the first worker to quit?
9✔
NEW
109
                        select {
9✔
NEW
110
                        case <-w.firstDone:
8✔
111

NEW
112
                        default:
1✔
NEW
113
                                w.termID = id
1✔
NEW
114
                                close(w.firstDone)
1✔
115
                        }
116
                }
NEW
117
                w.mu.Unlock()
10✔
118
        }()
NEW
119
        l.Info("worker starting up")
10✔
NEW
120
        // workerSidecar is responsible for notifying the broker about slow workflows
10✔
NEW
121
        go w.workerSidecar(ctx, sidecarChan, sidecarDone)
10✔
NEW
122
        w.workerMain(ctx, sidecarChan, sidecarDone)
10✔
123
}
124

125
func (w *workerGroup) workerMain(
126
        ctx context.Context,
127
        sidecarChan chan *natsio.Msg,
128
        sidecarDone chan struct{},
NEW
129
) {
10✔
NEW
130
        l := log.FromContext(ctx)
10✔
NEW
131
        ctxDone := ctx.Done()
10✔
NEW
132
        timeoutTimer := newStoppedTimer()
10✔
NEW
133
        for {
30✔
NEW
134
                var (
20✔
NEW
135
                        msg    *natsio.Msg
20✔
NEW
136
                        isOpen bool
20✔
NEW
137
                )
20✔
NEW
138
                select {
20✔
NEW
139
                case msg, isOpen = <-w.input:
20✔
NEW
140
                        if !isOpen {
30✔
NEW
141
                                return
10✔
NEW
142
                        }
10✔
143

NEW
144
                case <-sidecarDone:
×
NEW
145
                        return
×
NEW
146
                case <-ctxDone:
×
NEW
147
                        return
×
148
                }
149

150
                // Notify the sidecar routine about the new message
NEW
151
                select {
10✔
NEW
152
                case sidecarChan <- msg:
9✔
153

NEW
154
                case <-timeoutTimer.After(w.notifyPeriod / 8):
1✔
NEW
155
                        l.Warn("timeout notifying sidecar routine about message")
1✔
156

NEW
157
                case <-sidecarDone:
×
NEW
158
                        return
×
NEW
159
                case <-ctxDone:
×
NEW
160
                        return
×
161
                }
162

NEW
163
                job := &model.Job{}
10✔
NEW
164
                err := json.Unmarshal(msg.Data, job)
10✔
NEW
165
                if err != nil {
10✔
NEW
166
                        l.Error(errors.Wrap(err, "failed to unmarshall message"))
×
NEW
167
                        if err := msg.Term(); err != nil {
×
NEW
168
                                l.Error(errors.Wrap(err, "failed to term the message"))
×
NEW
169
                        }
×
NEW
170
                        continue
×
171
                }
172
                // process the job
NEW
173
                l.Infof("processing job %s workflow %s", job.ID, job.WorkflowName)
10✔
NEW
174
                err = processJob(ctx, job, w.store, w.client)
10✔
NEW
175
                if err != nil {
10✔
NEW
176
                        l.Errorf("error processing job: %s", err.Error())
×
NEW
177
                } else {
10✔
NEW
178
                        l.Infof("finished job %s workflow %s", job.ID, job.WorkflowName)
10✔
NEW
179
                }
10✔
180
                // stop the in progress ticker and ack the message
NEW
181
                select {
10✔
NEW
182
                case sidecarChan <- nil:
10✔
183

NEW
184
                case <-timeoutTimer.After(w.notifyPeriod):
×
NEW
185
                        l.Errorf("timeout notifying sidecar about job completion")
×
186

NEW
187
                case <-ctxDone:
×
NEW
188
                        return
×
NEW
189
                case <-sidecarDone:
×
NEW
190
                        return
×
191
                }
192
                // Release message
NEW
193
                if err := msg.AckSync(); err != nil {
10✔
NEW
194
                        l.Error(errors.Wrap(err, "failed to ack the message"))
×
NEW
195
                }
×
196
        }
197
}
198

199
// workerSidecar helps notifying the NATS server about slow workflows.
200
// When workerMain picks up a new task, this routine is woken up and starts
201
// a timer that sends an "IN PROGRESS" package back to the broker if the worker
202
// takes too long.
203
func (w *workerGroup) workerSidecar(
204
        ctx context.Context,
205
        msgIn <-chan *natsio.Msg,
206
        done chan<- struct{},
NEW
207
) {
10✔
NEW
208
        var (
10✔
NEW
209
                isOpen        bool
10✔
NEW
210
                msgInProgress *natsio.Msg
10✔
NEW
211
                ctxDone       = ctx.Done()
10✔
NEW
212
                l             = log.FromContext(ctx)
10✔
NEW
213
        )
10✔
NEW
214
        defer close(done)
10✔
NEW
215

10✔
NEW
216
        t := newStoppedTimer()
10✔
NEW
217
        for {
39✔
NEW
218
                select {
29✔
NEW
219
                case <-t.C:
×
NEW
220
                        ctx, cancel := context.WithTimeout(ctx, w.notifyPeriod)
×
NEW
221
                        err := msgInProgress.InProgress(natsio.Context(ctx))
×
NEW
222
                        cancel()
×
NEW
223
                        if err != nil {
×
NEW
224
                                l.Errorf("error notifying broker about message in progress: %s", err)
×
NEW
225
                                // If the +WPI message fails, let's not try again, but
×
NEW
226
                                // wait for the next message.
×
NEW
227
                        } else {
×
NEW
228
                                t.Reset(w.notifyPeriod)
×
NEW
229
                        }
×
NEW
230
                case msgInProgress, isOpen = <-msgIn:
29✔
NEW
231
                        if !isOpen {
39✔
NEW
232
                                return
10✔
NEW
233
                        } else if msgInProgress == nil {
39✔
NEW
234
                                t.Stop()
10✔
NEW
235
                        } else {
19✔
NEW
236
                                t.Reset(w.notifyPeriod)
9✔
NEW
237
                        }
9✔
NEW
238
                case <-ctxDone:
×
NEW
239
                        return
×
240
                }
241
        }
242
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc