• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / workflows / 1565759653

29 Nov 2024 07:56AM UTC coverage: 67.786% (-14.5%) from 82.255%
1565759653

push

gitlab-ci

web-flow
Merge pull request #336 from alfrunes/2.6.x

chore(deps): Update golang builder images to latest

1050 of 1549 relevant lines covered (67.79%)

5.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/app/worker/worker_group.go
1
// Copyright 2024 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package worker
16

17
import (
18
        "context"
19
        "encoding/json"
20
        "sync"
21
        "sync/atomic"
22
        "time"
23

24
        natsio "github.com/nats-io/nats.go"
25
        "github.com/pkg/errors"
26

27
        "github.com/mendersoftware/go-lib-micro/log"
28

29
        "github.com/mendersoftware/workflows/client/nats"
30
        "github.com/mendersoftware/workflows/model"
31
        "github.com/mendersoftware/workflows/store"
32
)
33

34
type workerGroup struct {
35
        mu        sync.Mutex    // Mutex to protect the shared firstDone channel
36
        firstDone chan struct{} // First worker finished
37
        done      chan struct{} // All workers finished
38
        termID    int32         // ID of the first worker to finish
39

40
        sidecarDone chan struct{}
41
        workerCount int32
42

43
        input        <-chan *natsio.Msg
44
        notifyPeriod time.Duration
45

46
        client nats.Client
47
        store  store.DataStore
48
}
49

50
func NewWorkGroup(
51
        input <-chan *natsio.Msg,
52
        notifyPeriod time.Duration,
53
        nc nats.Client,
54
        ds store.DataStore,
55
) *workerGroup {
×
56
        return &workerGroup{
×
57
                sidecarDone: make(chan struct{}),
×
58
                done:        make(chan struct{}),
×
59
                firstDone:   make(chan struct{}),
×
60

×
61
                input:        input,
×
62
                notifyPeriod: notifyPeriod,
×
63
                client:       nc,
×
64
                store:        ds,
×
65
        }
×
66
}
×
67

68
// Done returns a channel (barrier) that is closed when the last worker
69
// has exited.
70
func (w *workerGroup) Done() <-chan struct{} {
×
71
        return w.done
×
72
}
×
73

74
// FirstDone returns a channel (barrier) that is closed when the first
75
// worker has exited.
76
func (w *workerGroup) FirstDone() <-chan struct{} {
×
77
        return w.firstDone
×
78
}
×
79

80
// TermID is the ID of the first worker that quit.
81
func (w *workerGroup) TermID() int32 {
×
82
        return w.termID
×
83
}
×
84

85
func (w *workerGroup) RunWorker(ctx context.Context) {
×
86
        id := atomic.AddInt32(&w.workerCount, 1)
×
87
        l := log.FromContext(ctx)
×
88
        l = l.F(log.Ctx{"worker_id": id})
×
89
        ctx = log.WithContext(ctx, l)
×
90

×
91
        sidecarChan := make(chan *natsio.Msg, 1)
×
92
        sidecarDone := make(chan struct{})
×
93
        defer func() {
×
94
                l.Info("worker shutting down")
×
95
                close(sidecarChan)
×
96

×
97
                w.mu.Lock()
×
98
                remaining := atomic.AddInt32(&w.workerCount, -1)
×
99
                // Is this the last worker to quit?
×
100
                if remaining <= 0 {
×
101
                        select {
×
102
                        case <-w.done:
×
103

104
                        default:
×
105
                                close(w.done)
×
106
                        }
107
                } else {
×
108
                        // Is this the first worker to quit?
×
109
                        select {
×
110
                        case <-w.firstDone:
×
111

112
                        default:
×
113
                                w.termID = id
×
114
                                close(w.firstDone)
×
115
                        }
116
                }
117
                w.mu.Unlock()
×
118
        }()
119
        l.Info("worker starting up")
×
120
        // workerSidecar is responsible for notifying the broker about slow workflows
×
121
        go w.workerSidecar(ctx, sidecarChan, sidecarDone)
×
122
        w.workerMain(ctx, sidecarChan, sidecarDone)
×
123
}
124

125
func (w *workerGroup) workerMain(
126
        ctx context.Context,
127
        sidecarChan chan *natsio.Msg,
128
        sidecarDone chan struct{},
129
) {
×
130
        l := log.FromContext(ctx)
×
131
        ctxDone := ctx.Done()
×
132
        timeoutTimer := newStoppedTimer()
×
133
        for {
×
134
                var (
×
135
                        msg    *natsio.Msg
×
136
                        isOpen bool
×
137
                )
×
138
                select {
×
139
                case msg, isOpen = <-w.input:
×
140
                        if !isOpen {
×
141
                                return
×
142
                        }
×
143

144
                case <-sidecarDone:
×
145
                        return
×
146
                case <-ctxDone:
×
147
                        return
×
148
                }
149

150
                // Notify the sidecar routine about the new message
151
                select {
×
152
                case sidecarChan <- msg:
×
153

154
                case <-timeoutTimer.After(w.notifyPeriod / 8):
×
155
                        l.Warn("timeout notifying sidecar routine about message")
×
156

157
                case <-sidecarDone:
×
158
                        return
×
159
                case <-ctxDone:
×
160
                        return
×
161
                }
162

163
                job := &model.Job{}
×
164
                err := json.Unmarshal(msg.Data, job)
×
165
                if err != nil {
×
166
                        l.Error(errors.Wrap(err, "failed to unmarshall message"))
×
167
                        if err := msg.Term(); err != nil {
×
168
                                l.Error(errors.Wrap(err, "failed to term the message"))
×
169
                        }
×
170
                        continue
×
171
                }
172
                // process the job
173
                l.Infof("processing job %s workflow %s", job.ID, job.WorkflowName)
×
174
                err = processJob(ctx, job, w.store, w.client)
×
175
                if err != nil {
×
176
                        l.Errorf("error processing job: %s", err.Error())
×
177
                } else {
×
178
                        l.Infof("finished job %s workflow %s", job.ID, job.WorkflowName)
×
179
                }
×
180
                // stop the in progress ticker and ack the message
181
                select {
×
182
                case sidecarChan <- nil:
×
183

184
                case <-timeoutTimer.After(w.notifyPeriod):
×
185
                        l.Errorf("timeout notifying sidecar about job completion")
×
186

187
                case <-ctxDone:
×
188
                        return
×
189
                case <-sidecarDone:
×
190
                        return
×
191
                }
192
                // Release message
193
                if err := msg.AckSync(); err != nil {
×
194
                        l.Error(errors.Wrap(err, "failed to ack the message"))
×
195
                }
×
196
        }
197
}
198

199
// workerSidecar helps notifying the NATS server about slow workflows.
200
// When workerMain picks up a new task, this routine is woken up and starts
201
// a timer that sends an "IN PROGRESS" package back to the broker if the worker
202
// takes too long.
203
func (w *workerGroup) workerSidecar(
204
        ctx context.Context,
205
        msgIn <-chan *natsio.Msg,
206
        done chan<- struct{},
207
) {
×
208
        var (
×
209
                isOpen        bool
×
210
                msgInProgress *natsio.Msg
×
211
                ctxDone       = ctx.Done()
×
212
                l             = log.FromContext(ctx)
×
213
        )
×
214
        defer close(done)
×
215

×
216
        t := newStoppedTimer()
×
217
        for {
×
218
                select {
×
219
                case <-t.C:
×
220
                        ctx, cancel := context.WithTimeout(ctx, w.notifyPeriod)
×
221
                        err := msgInProgress.InProgress(natsio.Context(ctx))
×
222
                        cancel()
×
223
                        if err != nil {
×
224
                                l.Errorf("error notifying broker about message in progress: %s", err)
×
225
                                // If the +WPI message fails, let's not try again, but
×
226
                                // wait for the next message.
×
227
                        } else {
×
228
                                t.Reset(w.notifyPeriod)
×
229
                        }
×
230
                case msgInProgress, isOpen = <-msgIn:
×
231
                        if !isOpen {
×
232
                                return
×
233
                        } else if msgInProgress == nil {
×
234
                                t.Stop()
×
235
                        } else {
×
236
                                t.Reset(w.notifyPeriod)
×
237
                        }
×
238
                case <-ctxDone:
×
239
                        return
×
240
                }
241
        }
242
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc