• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / workflows / 1354855011

01 Jul 2024 12:34AM UTC coverage: 81.632% (-0.3%) from 81.882%
1354855011

Pull #327

gitlab-ci

web-flow
chore: bump golang in the docker-dependencies group

Bumps the docker-dependencies group with 1 update: golang.


Updates `golang` from 1.22.3-alpine3.19 to 1.22.4-alpine3.19

---
updated-dependencies:
- dependency-name: golang
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: docker-dependencies
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #327: chore: bump golang from 1.22.3-alpine3.19 to 1.22.4-alpine3.19 in the docker-dependencies group

1631 of 1998 relevant lines covered (81.63%)

14.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.74
/app/worker/worker_group.go
1
// Copyright 2024 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package worker
16

17
import (
18
        "context"
19
        "encoding/json"
20
        "sync"
21
        "sync/atomic"
22
        "time"
23

24
        natsio "github.com/nats-io/nats.go"
25
        "github.com/pkg/errors"
26

27
        "github.com/mendersoftware/go-lib-micro/log"
28

29
        "github.com/mendersoftware/workflows/client/nats"
30
        "github.com/mendersoftware/workflows/model"
31
        "github.com/mendersoftware/workflows/store"
32
)
33

34
type workerGroup struct {
35
        mu        sync.Mutex    // Mutex to protect the shared firstDone channel
36
        firstDone chan struct{} // First worker finished
37
        done      chan struct{} // All workers finished
38
        termID    int32         // ID of the first worker to finish
39

40
        sidecarDone chan struct{}
41
        workerCount int32
42

43
        input        <-chan *natsio.Msg
44
        notifyPeriod time.Duration
45

46
        client nats.Client
47
        store  store.DataStore
48
}
49

50
func NewWorkGroup(
51
        input <-chan *natsio.Msg,
52
        notifyPeriod time.Duration,
53
        nc nats.Client,
54
        ds store.DataStore,
55
) *workerGroup {
1✔
56
        return &workerGroup{
1✔
57
                sidecarDone: make(chan struct{}),
1✔
58
                done:        make(chan struct{}),
1✔
59
                firstDone:   make(chan struct{}),
1✔
60

1✔
61
                input:        input,
1✔
62
                notifyPeriod: notifyPeriod,
1✔
63
                client:       nc,
1✔
64
                store:        ds,
1✔
65
        }
1✔
66
}
1✔
67

68
// Done returns a channel (barrier) that is closed when the last worker
69
// has exited.
70
func (w *workerGroup) Done() <-chan struct{} {
1✔
71
        return w.done
1✔
72
}
1✔
73

74
// FirstDone returns a channel (barrier) that is closed when the first
75
// worker has exited.
76
func (w *workerGroup) FirstDone() <-chan struct{} {
1✔
77
        return w.firstDone
1✔
78
}
1✔
79

80
// TermID is the ID of the first worker that quit.
81
func (w *workerGroup) TermID() int32 {
×
82
        return w.termID
×
83
}
×
84

85
func (w *workerGroup) RunWorker(ctx context.Context) {
10✔
86
        id := atomic.AddInt32(&w.workerCount, 1)
10✔
87
        l := log.FromContext(ctx)
10✔
88
        l = l.F(log.Ctx{"worker_id": id})
10✔
89
        ctx = log.WithContext(ctx, l)
10✔
90

10✔
91
        sidecarChan := make(chan *natsio.Msg, 1)
10✔
92
        sidecarDone := make(chan struct{})
10✔
93
        defer func() {
20✔
94
                l.Info("worker shutting down")
10✔
95
                close(sidecarChan)
10✔
96

10✔
97
                w.mu.Lock()
10✔
98
                remaining := atomic.AddInt32(&w.workerCount, -1)
10✔
99
                // Is this the last worker to quit?
10✔
100
                if remaining <= 0 {
11✔
101
                        select {
1✔
102
                        case <-w.done:
×
103

104
                        default:
1✔
105
                                close(w.done)
1✔
106
                        }
107
                } else {
9✔
108
                        // Is this the first worker to quit?
9✔
109
                        select {
9✔
110
                        case <-w.firstDone:
8✔
111

112
                        default:
1✔
113
                                w.termID = id
1✔
114
                                close(w.firstDone)
1✔
115
                        }
116
                }
117
                w.mu.Unlock()
10✔
118
        }()
119
        l.Info("worker starting up")
10✔
120
        // workerSidecar is responsible for notifying the broker about slow workflows
10✔
121
        go w.workerSidecar(ctx, sidecarChan, sidecarDone)
10✔
122
        w.workerMain(ctx, sidecarChan, sidecarDone)
10✔
123
}
124

125
func (w *workerGroup) workerMain(
126
        ctx context.Context,
127
        sidecarChan chan *natsio.Msg,
128
        sidecarDone chan struct{},
129
) {
10✔
130
        l := log.FromContext(ctx)
10✔
131
        ctxDone := ctx.Done()
10✔
132
        timeoutTimer := newStoppedTimer()
10✔
133
        for {
30✔
134
                var (
20✔
135
                        msg    *natsio.Msg
20✔
136
                        isOpen bool
20✔
137
                )
20✔
138
                select {
20✔
139
                case msg, isOpen = <-w.input:
20✔
140
                        if !isOpen {
30✔
141
                                return
10✔
142
                        }
10✔
143

144
                case <-sidecarDone:
×
145
                        return
×
146
                case <-ctxDone:
×
147
                        return
×
148
                }
149

150
                // Notify the sidecar routine about the new message
151
                select {
10✔
152
                case sidecarChan <- msg:
9✔
153

154
                case <-timeoutTimer.After(w.notifyPeriod / 8):
1✔
155
                        l.Warn("timeout notifying sidecar routine about message")
1✔
156

157
                case <-sidecarDone:
×
158
                        return
×
159
                case <-ctxDone:
×
160
                        return
×
161
                }
162

163
                job := &model.Job{}
10✔
164
                err := json.Unmarshal(msg.Data, job)
10✔
165
                if err != nil {
10✔
166
                        l.Error(errors.Wrap(err, "failed to unmarshall message"))
×
167
                        if err := msg.Term(); err != nil {
×
168
                                l.Error(errors.Wrap(err, "failed to term the message"))
×
169
                        }
×
170
                        continue
×
171
                }
172
                // process the job
173
                l.Infof("processing job %s workflow %s", job.ID, job.WorkflowName)
10✔
174
                err = processJob(ctx, job, w.store, w.client)
10✔
175
                if err != nil {
10✔
176
                        l.Errorf("error processing job: %s", err.Error())
×
177
                } else {
10✔
178
                        l.Infof("finished job %s workflow %s", job.ID, job.WorkflowName)
10✔
179
                }
10✔
180
                // stop the in progress ticker and ack the message
181
                select {
10✔
182
                case sidecarChan <- nil:
10✔
183

184
                case <-timeoutTimer.After(w.notifyPeriod):
×
185
                        l.Errorf("timeout notifying sidecar about job completion")
×
186

187
                case <-ctxDone:
×
188
                        return
×
189
                case <-sidecarDone:
×
190
                        return
×
191
                }
192
                // Release message
193
                if err := msg.AckSync(); err != nil {
10✔
194
                        l.Error(errors.Wrap(err, "failed to ack the message"))
×
195
                }
×
196
        }
197
}
198

199
// workerSidecar helps notifying the NATS server about slow workflows.
200
// When workerMain picks up a new task, this routine is woken up and starts
201
// a timer that sends an "IN PROGRESS" package back to the broker if the worker
202
// takes too long.
203
func (w *workerGroup) workerSidecar(
204
        ctx context.Context,
205
        msgIn <-chan *natsio.Msg,
206
        done chan<- struct{},
207
) {
10✔
208
        var (
10✔
209
                isOpen        bool
10✔
210
                msgInProgress *natsio.Msg
10✔
211
                ctxDone       = ctx.Done()
10✔
212
                l             = log.FromContext(ctx)
10✔
213
        )
10✔
214
        defer close(done)
10✔
215

10✔
216
        t := newStoppedTimer()
10✔
217
        for {
39✔
218
                select {
29✔
219
                case <-t.C:
×
220
                        ctx, cancel := context.WithTimeout(ctx, w.notifyPeriod)
×
221
                        err := msgInProgress.InProgress(natsio.Context(ctx))
×
222
                        cancel()
×
223
                        if err != nil {
×
224
                                l.Errorf("error notifying broker about message in progress: %s", err)
×
225
                                // If the +WPI message fails, let's not try again, but
×
226
                                // wait for the next message.
×
227
                        } else {
×
228
                                t.Reset(w.notifyPeriod)
×
229
                        }
×
230
                case msgInProgress, isOpen = <-msgIn:
29✔
231
                        if !isOpen {
39✔
232
                                return
10✔
233
                        } else if msgInProgress == nil {
39✔
234
                                t.Stop()
10✔
235
                        } else {
19✔
236
                                t.Reset(w.notifyPeriod)
9✔
237
                        }
9✔
238
                case <-ctxDone:
×
239
                        return
×
240
                }
241
        }
242
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc