• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / workflows / 981419520

25 Aug 2023 09:40AM UTC coverage: 83.342% (-0.1%) from 83.45%
981419520

Pull #281

gitlab-ci

oldgiova
feat: added multiplatform build

Ticket: QA-613

Changelog: Added multiplatform build

Signed-off-by: Roberto Giovanardi <roberto.giovanardi@northern.tech>
Pull Request #281: WIP Multiplatform build

1551 of 1861 relevant lines covered (83.34%)

14.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.26
/app/worker/worker.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package worker
16

17
import (
18
        "context"
19
        "encoding/json"
20
        "os"
21
        "os/signal"
22
        "sync"
23
        "time"
24

25
        natsio "github.com/nats-io/nats.go"
26
        "github.com/pkg/errors"
27
        "golang.org/x/sys/unix"
28

29
        "github.com/mendersoftware/go-lib-micro/config"
30
        "github.com/mendersoftware/go-lib-micro/log"
31

32
        "github.com/mendersoftware/workflows/client/nats"
33
        dconfig "github.com/mendersoftware/workflows/config"
34
        "github.com/mendersoftware/workflows/model"
35
        "github.com/mendersoftware/workflows/store"
36
)
37

38
// Workflows describes which workflows a worker should execute, expressed
// as an include list and an exclude list.
//
// NOTE(review): nothing in the visible part of this file reads these fields;
// the entries are presumably workflow names — confirm against the caller.
type Workflows struct {
	// Included is the list of workflows selected for this worker.
	Included []string
	// Excluded is the list of workflows filtered out for this worker.
	Excluded []string
}
43

44
// InitAndRun initializes the worker and runs it
45
func InitAndRun(
46
        conf config.Reader,
47
        workflows Workflows,
48
        dataStore store.DataStore,
49
        natsClient nats.Client,
50
) error {
1✔
51
        ctx, cancel := context.WithCancel(context.Background())
1✔
52
        // Calling cancel() before returning should shut down
1✔
53
        // all workers. However, the new driver is not
1✔
54
        // particularly good at listening to the context in the
1✔
55
        // current state, but it'll be forced to shut down
1✔
56
        // eventually.
1✔
57
        defer cancel()
1✔
58

1✔
59
        if conf.GetBool(dconfig.SettingDebugLog) {
1✔
60
                log.Setup(true)
×
61
        }
×
62
        l := log.FromContext(ctx)
1✔
63

1✔
64
        err := dataStore.LoadWorkflows(ctx, l)
1✔
65
        if err != nil {
1✔
66
                return errors.Wrap(err, "failed to load workflows")
×
67
        }
×
68

69
        streamName := config.Config.GetString(dconfig.SettingNatsStreamName)
1✔
70
        topic := config.Config.GetString(dconfig.SettingNatsSubscriberTopic)
1✔
71
        subject := streamName + "." + topic
1✔
72
        durableName := config.Config.GetString(dconfig.SettingNatsSubscriberDurable)
1✔
73
        concurrency := conf.GetInt(dconfig.SettingConcurrency)
1✔
74

1✔
75
        cfg, err := natsClient.GetConsumerConfig(durableName)
1✔
76
        if err != nil {
1✔
77
                return err
×
78
        }
×
79
        notifyPeriod := cfg.AckWait * 8 / 10
1✔
80
        if notifyPeriod < time.Second {
1✔
81
                notifyPeriod = time.Second
×
82
        }
×
83

84
        channel := make(chan *natsio.Msg, concurrency)
1✔
85
        unsubscribe, err := natsClient.JetStreamSubscribe(
1✔
86
                ctx,
1✔
87
                subject,
1✔
88
                durableName,
1✔
89
                channel,
1✔
90
        )
1✔
91
        if err != nil {
1✔
92
                return errors.Wrap(err, "failed to subscribe to the nats JetStream")
×
93
        }
×
94
        defer func() {
2✔
95
                _ = unsubscribe()
1✔
96
        }()
1✔
97

98
        quit := make(chan os.Signal, 1)
1✔
99
        signal.Notify(quit, unix.SIGINT, unix.SIGTERM)
1✔
100

1✔
101
        // Spawn worker pool
1✔
102
        var wg = new(sync.WaitGroup)
1✔
103
        wg.Add(concurrency)
1✔
104
        for i := 0; i < concurrency; i++ {
11✔
105
                workerID := i
10✔
106
                go func() {
20✔
107
                        defer func() {
20✔
108
                                wg.Done()
10✔
109
                                cancel() // Unblock main thread if it's waiting
10✔
110
                        }()
10✔
111
                        wl := l.F(log.Ctx{"worker_id": workerID})
10✔
112
                        wCtx := log.WithContext(ctx, wl)
10✔
113

10✔
114
                        wl.Info("worker starting up")
10✔
115
                        workerMain(wCtx, channel, notifyPeriod, natsClient, dataStore)
10✔
116
                        wl.Info("worker shut down")
10✔
117
                }()
118
        }
119

120
        // Run until a SIGTERM or SIGINT is received or one of the workers
121
        // stops unexpectedly
122
        select {
1✔
123
        case <-quit:
1✔
124
        case <-ctx.Done():
×
125
                err = ctx.Err()
×
126
        }
127
        // Notify workers that we're done
128
        close(channel)
1✔
129

1✔
130
        // Wait for all workers to finish their current task
1✔
131
        wg.Wait()
1✔
132
        return err
1✔
133
}
134

135
func workerMain(
136
        ctx context.Context,
137
        msgIn <-chan *natsio.Msg,
138
        notifyPeriod time.Duration,
139
        nc nats.Client,
140
        ds store.DataStore) {
10✔
141
        l := log.FromContext(ctx)
10✔
142
        sidecarChan := make(chan *natsio.Msg, 1)
10✔
143
        sidecarTimer := (*reusableTimer)(time.NewTimer(0))
10✔
144
        defer close(sidecarChan)
10✔
145
        done := ctx.Done()
10✔
146

10✔
147
        // workerSidecar is responsible for notifying the broker about slow workflows
10✔
148
        go workerSidecar(ctx, sidecarChan, notifyPeriod)
10✔
149
        var isOpen bool
10✔
150
        for {
30✔
151
                var msg *natsio.Msg
20✔
152
                select {
20✔
153
                case <-done:
×
154
                        return
×
155
                case msg, isOpen = <-msgIn:
20✔
156
                        if !isOpen {
30✔
157
                                return
10✔
158
                        }
10✔
159
                        // Notify the sidecar routine about the new message
160
                        select {
10✔
161
                        case <-sidecarTimer.After(notifyPeriod / 8):
×
162
                                l.Warn("timeout notifying sidecar routine about message")
×
163

164
                        case sidecarChan <- msg:
10✔
165
                        }
166
                }
167

168
                job := &model.Job{}
10✔
169
                err := json.Unmarshal(msg.Data, job)
10✔
170
                if err != nil {
10✔
171
                        l.Error(errors.Wrap(err, "failed to unmarshall message"))
×
172
                        if err := msg.Term(); err != nil {
×
173
                                l.Error(errors.Wrap(err, "failed to term the message"))
×
174
                        }
×
175
                        continue
×
176
                }
177
                // process the job
178
                l.Infof("Worker: processing job %s workflow %s", job.ID, job.WorkflowName)
10✔
179
                err = processJob(ctx, job, ds, nc)
10✔
180
                if err != nil {
10✔
181
                        l.Errorf("error: %v", err)
×
182
                }
×
183
                // Release message
184
                sidecarChan <- nil
10✔
185
                // stop the in progress ticker and ack the message
10✔
186
                if err := msg.AckSync(); err != nil {
10✔
187
                        l.Error(errors.Wrap(err, "failed to ack the message"))
×
188
                }
×
189
        }
190
}
191

192
// workerSidecar helps notifying the NATS server about slow workflows.
193
// When workerMain picks up a new task, this routine is woken up and starts
194
// a timer that sends an "IN PROGRESS" package back to the broker if the worker
195
// takes too long.
196
func workerSidecar(ctx context.Context, msgIn <-chan *natsio.Msg, notifyPeriod time.Duration) {
10✔
197
        var (
10✔
198
                isOpen        bool
10✔
199
                msgInProgress *natsio.Msg
10✔
200
        )
10✔
201
        done := ctx.Done()
10✔
202
        t := (*reusableTimer)(time.NewTimer(0))
10✔
203
        for {
30✔
204
                select {
20✔
205
                case <-done:
×
206
                        return
×
207
                case msgInProgress, isOpen = <-msgIn:
20✔
208
                        if !isOpen {
30✔
209
                                return
10✔
210
                        }
10✔
211
                }
212
                for msgInProgress != nil {
20✔
213
                        select {
10✔
214
                        case <-t.After(notifyPeriod):
×
215
                                _ = msgInProgress.InProgress()
×
216
                        case <-done:
×
217
                                return
×
218
                        case msgInProgress, isOpen = <-msgIn:
10✔
219
                                if !isOpen {
10✔
220
                                        return
×
221
                                }
×
222
                        }
223
                }
224
        }
225
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc