• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / workflows / 813731514

pending completion
813731514

push

gitlab-ci

GitHub
Merge pull request #267 from kjaskiewiczz/update-deps

1472 of 1729 relevant lines covered (85.14%)

20.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.03
/app/worker/worker.go
1
// Copyright 2022 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package worker
16

17
import (
18
        "context"
19
        "encoding/json"
20
        "os"
21
        "os/signal"
22
        "sync"
23
        "time"
24

25
        natsio "github.com/nats-io/nats.go"
26
        "github.com/pkg/errors"
27
        "golang.org/x/sys/unix"
28

29
        "github.com/mendersoftware/go-lib-micro/config"
30
        "github.com/mendersoftware/go-lib-micro/log"
31

32
        "github.com/mendersoftware/workflows/client/nats"
33
        dconfig "github.com/mendersoftware/workflows/config"
34
        "github.com/mendersoftware/workflows/model"
35
        "github.com/mendersoftware/workflows/store"
36
)
37

38
// notifyPeriod is the frequency for notifying the
39
// NATS server about slow workflows.
40
const notifyPeriod = nats.AckWait * 8 / 10
41

42
// Workflows describes a filter on the workflows executed by a worker.
type Workflows struct {
	// Included presumably lists the workflow names the worker is allowed
	// to run; the filter is not applied in this file - TODO confirm.
	Included []string

	// Excluded presumably lists the workflow names the worker must skip;
	// the filter is not applied in this file - TODO confirm.
	Excluded []string
}
47

48
// InitAndRun initializes the worker and runs it
49
func InitAndRun(
50
        conf config.Reader,
51
        workflows Workflows,
52
        dataStore store.DataStore,
53
        natsClient nats.Client,
54
) error {
1✔
55
        ctx, cancel := context.WithCancel(context.Background())
1✔
56
        // Calling cancel() before returning should shut down
1✔
57
        // all workers. However, the new driver is not
1✔
58
        // particularly good at listening to the context in the
1✔
59
        // current state, but it'll be forced to shut down
1✔
60
        // eventually.
1✔
61
        defer cancel()
1✔
62

1✔
63
        if conf.GetBool(dconfig.SettingDebugLog) {
1✔
64
                log.Setup(true)
×
65
        }
×
66
        l := log.FromContext(ctx)
1✔
67

1✔
68
        err := dataStore.LoadWorkflows(ctx, l)
1✔
69
        if err != nil {
1✔
70
                return errors.Wrap(err, "failed to load workflows")
×
71
        }
×
72

73
        streamName := config.Config.GetString(dconfig.SettingNatsStreamName)
1✔
74
        topic := config.Config.GetString(dconfig.SettingNatsSubscriberTopic)
1✔
75
        subject := streamName + "." + topic
1✔
76
        durableName := config.Config.GetString(dconfig.SettingNatsSubscriberDurable)
1✔
77
        concurrency := conf.GetInt(dconfig.SettingConcurrency)
1✔
78

1✔
79
        channel := make(chan *natsio.Msg, concurrency)
1✔
80
        unsubscribe, err := natsClient.JetStreamSubscribe(
1✔
81
                ctx,
1✔
82
                subject,
1✔
83
                durableName,
1✔
84
                concurrency,
1✔
85
                channel,
1✔
86
        )
1✔
87
        if err != nil {
1✔
88
                return errors.Wrap(err, "failed to subscribe to the nats JetStream")
×
89
        }
×
90
        defer func() {
2✔
91
                _ = unsubscribe()
1✔
92
        }()
1✔
93

94
        quit := make(chan os.Signal, 1)
1✔
95
        signal.Notify(quit, unix.SIGINT, unix.SIGTERM)
1✔
96

1✔
97
        // Spawn worker pool
1✔
98
        var wg = new(sync.WaitGroup)
1✔
99
        wg.Add(concurrency)
1✔
100
        for i := 0; i < concurrency; i++ {
11✔
101
                workerID := i
10✔
102
                go func() {
20✔
103
                        defer func() {
20✔
104
                                wg.Done()
10✔
105
                                cancel() // Unblock main thread if it's waiting
10✔
106
                        }()
10✔
107
                        wl := l.F(log.Ctx{"worker_id": workerID})
10✔
108
                        wCtx := log.WithContext(ctx, wl)
10✔
109

10✔
110
                        wl.Info("worker starting up")
10✔
111
                        workerMain(wCtx, channel, natsClient, dataStore)
10✔
112
                        wl.Info("worker shut down")
10✔
113
                }()
114
        }
115

116
        // Run until a SIGTERM or SIGINT is received or one of the workers
117
        // stops unexpectedly
118
        select {
1✔
119
        case <-quit:
1✔
120
        case <-ctx.Done():
×
121
                err = ctx.Err()
×
122
        }
123
        // Notify workers that we're done
124
        close(channel)
1✔
125

1✔
126
        // Wait for all workers to finish their current task
1✔
127
        wg.Wait()
1✔
128
        return err
1✔
129
}
130

131
func workerMain(ctx context.Context, msgIn <-chan *natsio.Msg, nc nats.Client, ds store.DataStore) {
10✔
132
        l := log.FromContext(ctx)
10✔
133
        sidecarChan := make(chan *natsio.Msg, 1)
10✔
134
        sidecarTimer := (*reusableTimer)(time.NewTimer(0))
10✔
135
        defer close(sidecarChan)
10✔
136
        done := ctx.Done()
10✔
137

10✔
138
        // workerSidecar is responsible for notifying the broker about slow workflows
10✔
139
        go workerSidecar(ctx, sidecarChan)
10✔
140
        var isOpen bool
10✔
141
        for {
30✔
142
                var msg *natsio.Msg
20✔
143
                select {
20✔
144
                case <-done:
×
145
                        return
×
146
                case msg, isOpen = <-msgIn:
20✔
147
                        if !isOpen {
30✔
148
                                return
10✔
149
                        }
10✔
150
                        // Notify the sidecar routine about the new message
151
                        select {
10✔
152
                        case <-sidecarTimer.After(notifyPeriod / 8):
×
153
                                l.Warn("timeout notifying sidecar routine about message")
×
154

155
                        case sidecarChan <- msg:
10✔
156
                        }
157
                }
158

159
                job := &model.Job{}
10✔
160
                err := json.Unmarshal(msg.Data, job)
10✔
161
                if err != nil {
10✔
162
                        l.Error(errors.Wrap(err, "failed to unmarshall message"))
×
163
                        if err := msg.Term(); err != nil {
×
164
                                l.Error(errors.Wrap(err, "failed to term the message"))
×
165
                        }
×
166
                        continue
×
167
                }
168
                // process the job
169
                l.Infof("Worker: processing job %s workflow %s", job.ID, job.WorkflowName)
10✔
170
                err = processJob(ctx, job, ds, nc)
10✔
171
                if err != nil {
10✔
172
                        l.Errorf("error: %v", err)
×
173
                }
×
174
                // Release message
175
                sidecarChan <- nil
10✔
176
                // stop the in progress ticker and ack the message
10✔
177
                if err := msg.AckSync(); err != nil {
10✔
178
                        l.Error(errors.Wrap(err, "failed to ack the message"))
×
179
                }
×
180
        }
181
}
182

183
// workerSidecar helps notifying the NATS server about slow workflows.
184
// When workerMain picks up a new task, this routine is woken up and starts
185
// a timer that sends an "IN PROGRESS" package back to the broker if the worker
186
// takes too long.
187
func workerSidecar(ctx context.Context, msgIn <-chan *natsio.Msg) {
10✔
188
        var (
10✔
189
                isOpen        bool
10✔
190
                msgInProgress *natsio.Msg
10✔
191
        )
10✔
192
        done := ctx.Done()
10✔
193
        t := (*reusableTimer)(time.NewTimer(0))
10✔
194
        for {
30✔
195
                select {
20✔
196
                case <-done:
×
197
                        return
×
198
                case msgInProgress, isOpen = <-msgIn:
20✔
199
                        if !isOpen {
30✔
200
                                return
10✔
201
                        }
10✔
202
                }
203
                for msgInProgress != nil {
20✔
204
                        select {
10✔
205
                        case <-t.After(notifyPeriod):
×
206
                                _ = msgInProgress.InProgress()
×
207
                        case <-done:
×
208
                                return
×
209
                        case msgInProgress, isOpen = <-msgIn:
10✔
210
                                if !isOpen {
10✔
211
                                        return
×
212
                                }
×
213
                        }
214
                }
215
        }
216
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc