• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / workflows / 1354855011

01 Jul 2024 12:34AM UTC coverage: 81.632% (-0.3%) from 81.882%
1354855011

Pull #327

gitlab-ci

web-flow
chore: bump golang in the docker-dependencies group

Bumps the docker-dependencies group with 1 update: golang.


Updates `golang` from 1.22.3-alpine3.19 to 1.22.4-alpine3.19

---
updated-dependencies:
- dependency-name: golang
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: docker-dependencies
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #327: chore: bump golang from 1.22.3-alpine3.19 to 1.22.4-alpine3.19 in the docker-dependencies group

1631 of 1998 relevant lines covered (81.63%)

14.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.86
/app/worker/worker.go
1
// Copyright 2024 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package worker
16

17
import (
18
        "context"
19
        "os"
20
        "os/signal"
21
        "time"
22

23
        natsio "github.com/nats-io/nats.go"
24
        "github.com/pkg/errors"
25
        "golang.org/x/sys/unix"
26

27
        "github.com/mendersoftware/go-lib-micro/config"
28
        "github.com/mendersoftware/go-lib-micro/log"
29

30
        "github.com/mendersoftware/workflows/client/nats"
31
        dconfig "github.com/mendersoftware/workflows/config"
32
        "github.com/mendersoftware/workflows/store"
33
)
34

35
// Workflows filters workflows executed by a worker.
//
// NOTE(review): nothing in the visible part of this file reads these fields
// (InitAndRun accepts a Workflows value but does not reference it), so the
// exact matching semantics must be confirmed against the consumer.
type Workflows struct {
	// Included presumably lists the workflow names the worker is allowed
	// to execute — TODO confirm against the code that consumes it.
	Included []string
	// Excluded presumably lists the workflow names the worker must skip —
	// TODO confirm against the code that consumes it.
	Excluded []string
}
40

41
// InitAndRun initializes the worker and runs it
42
func InitAndRun(
43
        conf config.Reader,
44
        workflows Workflows,
45
        dataStore store.DataStore,
46
        natsClient nats.Client,
47
) error {
1✔
48
        ctx, cancel := context.WithCancel(context.Background())
1✔
49
        // Calling cancel() before returning should shut down
1✔
50
        // all workers. However, the new driver is not
1✔
51
        // particularly good at listening to the context in the
1✔
52
        // current state, but it'll be forced to shut down
1✔
53
        // eventually.
1✔
54
        defer cancel()
1✔
55

1✔
56
        if conf.GetBool(dconfig.SettingDebugLog) {
1✔
57
                log.Setup(true)
×
58
        }
×
59
        l := log.FromContext(ctx)
1✔
60

1✔
61
        err := dataStore.LoadWorkflows(ctx, l)
1✔
62
        if err != nil {
1✔
63
                return errors.Wrap(err, "failed to load workflows")
×
64
        }
×
65

66
        streamName := config.Config.GetString(dconfig.SettingNatsStreamName)
1✔
67
        topic := config.Config.GetString(dconfig.SettingNatsSubscriberTopic)
1✔
68
        subject := streamName + "." + topic
1✔
69
        durableName := config.Config.GetString(dconfig.SettingNatsSubscriberDurable)
1✔
70
        concurrency := conf.GetInt(dconfig.SettingConcurrency)
1✔
71

1✔
72
        cfg, err := natsClient.GetConsumerConfig(durableName)
1✔
73
        if err != nil {
1✔
74
                return err
×
75
        }
×
76
        notifyPeriod := cfg.AckWait * 8 / 10
1✔
77
        if notifyPeriod < time.Second {
1✔
78
                notifyPeriod = time.Second
×
79
        }
×
80

81
        jobChan := make(chan *natsio.Msg, concurrency)
1✔
82
        unsubscribe, err := natsClient.JetStreamSubscribe(
1✔
83
                ctx,
1✔
84
                subject,
1✔
85
                durableName,
1✔
86
                jobChan,
1✔
87
        )
1✔
88
        if err != nil {
1✔
89
                return errors.Wrap(err, "failed to subscribe to the nats JetStream")
×
90
        }
×
91

92
        quit := make(chan os.Signal, 1)
1✔
93
        signal.Notify(quit, unix.SIGINT, unix.SIGTERM)
1✔
94

1✔
95
        // Spawn worker pool
1✔
96
        wg := NewWorkGroup(jobChan, notifyPeriod, natsClient, dataStore)
1✔
97
        for i := 0; i < concurrency; i++ {
11✔
98
                go wg.RunWorker(ctx)
10✔
99
        }
10✔
100

101
        // Run until a SIGTERM or SIGINT is received or one of the workers
102
        // stops unexpectedly
103
        select {
1✔
104
        case <-wg.FirstDone():
×
105
                l.Warnf("worker %d terminated, application is terminating", wg.TermID())
×
106
        case sig := <-quit:
1✔
107
                l.Warnf("received signal %s: terminating workers", sig)
1✔
108
        case <-ctx.Done():
×
109
                err = ctx.Err()
×
110
        }
111
        errSub := unsubscribe()
1✔
112
        if errSub != nil {
1✔
113
                l.Errorf("error unsubscribing from Jetstream: %s", errSub.Error())
×
114
        }
×
115
        // Notify workers that we're done
116
        close(jobChan)
1✔
117
        if err == nil {
2✔
118
                l.Infof("waiting up to %s for all workers to finish", cfg.AckWait)
1✔
119
                select {
1✔
120
                case <-wg.Done():
1✔
121
                case sig := <-quit:
×
122
                        l.Warnf("received signal %s while waiting: aborting", sig)
×
123
                case <-time.After(cfg.AckWait):
×
124
                }
125
        }
126
        return err
1✔
127
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc