• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / workflows / 1565759653

29 Nov 2024 07:56AM UTC coverage: 67.786% (-14.5%) from 82.255%
1565759653

push

gitlab-ci

web-flow
Merge pull request #336 from alfrunes/2.6.x

chore(deps): Update golang builder images to latest

1050 of 1549 relevant lines covered (67.79%)

5.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/app/worker/worker.go
1
// Copyright 2024 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package worker
16

17
import (
18
        "context"
19
        "os"
20
        "os/signal"
21
        "time"
22

23
        natsio "github.com/nats-io/nats.go"
24
        "github.com/pkg/errors"
25
        "golang.org/x/sys/unix"
26

27
        "github.com/mendersoftware/go-lib-micro/config"
28
        "github.com/mendersoftware/go-lib-micro/log"
29

30
        "github.com/mendersoftware/workflows/client/nats"
31
        dconfig "github.com/mendersoftware/workflows/config"
32
        "github.com/mendersoftware/workflows/store"
33
)
34

35
// Workflows filters workflows executed by a worker
36
type Workflows struct {
37
        Included []string
38
        Excluded []string
39
}
40

41
// InitAndRun initializes the worker and runs it
42
func InitAndRun(
43
        conf config.Reader,
44
        workflows Workflows,
45
        dataStore store.DataStore,
46
        natsClient nats.Client,
47
) error {
×
48
        ctx, cancel := context.WithCancel(context.Background())
×
49
        // Calling cancel() before returning should shut down
×
50
        // all workers. However, the new driver is not
×
51
        // particularly good at listening to the context in the
×
52
        // current state, but it'll be forced to shut down
×
53
        // eventually.
×
54
        defer cancel()
×
55

×
56
        if conf.GetBool(dconfig.SettingDebugLog) {
×
57
                log.Setup(true)
×
58
        }
×
59
        l := log.FromContext(ctx)
×
60

×
61
        err := dataStore.LoadWorkflows(ctx, l)
×
62
        if err != nil {
×
63
                return errors.Wrap(err, "failed to load workflows")
×
64
        }
×
65

66
        streamName := config.Config.GetString(dconfig.SettingNatsStreamName)
×
67
        topic := config.Config.GetString(dconfig.SettingNatsSubscriberTopic)
×
68
        subject := streamName + "." + topic
×
69
        durableName := config.Config.GetString(dconfig.SettingNatsSubscriberDurable)
×
70
        concurrency := conf.GetInt(dconfig.SettingConcurrency)
×
71

×
72
        cfg, err := natsClient.GetConsumerConfig(durableName)
×
73
        if err != nil {
×
74
                return err
×
75
        }
×
76
        notifyPeriod := cfg.AckWait * 8 / 10
×
77
        if notifyPeriod < time.Second {
×
78
                notifyPeriod = time.Second
×
79
        }
×
80

81
        jobChan := make(chan *natsio.Msg, concurrency)
×
82
        unsubscribe, err := natsClient.JetStreamSubscribe(
×
83
                ctx,
×
84
                subject,
×
85
                durableName,
×
86
                jobChan,
×
87
        )
×
88
        if err != nil {
×
89
                return errors.Wrap(err, "failed to subscribe to the nats JetStream")
×
90
        }
×
91

92
        quit := make(chan os.Signal, 1)
×
93
        signal.Notify(quit, unix.SIGINT, unix.SIGTERM)
×
94

×
95
        // Spawn worker pool
×
96
        wg := NewWorkGroup(jobChan, notifyPeriod, natsClient, dataStore)
×
97
        for i := 0; i < concurrency; i++ {
×
98
                go wg.RunWorker(ctx)
×
99
        }
×
100

101
        // Run until a SIGTERM or SIGINT is received or one of the workers
102
        // stops unexpectedly
103
        select {
×
104
        case <-wg.FirstDone():
×
105
                l.Warnf("worker %d terminated, application is terminating", wg.TermID())
×
106
        case sig := <-quit:
×
107
                l.Warnf("received signal %s: terminating workers", sig)
×
108
        case <-ctx.Done():
×
109
                err = ctx.Err()
×
110
        }
111
        errSub := unsubscribe()
×
112
        if errSub != nil {
×
113
                l.Errorf("error unsubscribing from Jetstream: %s", errSub.Error())
×
114
        }
×
115
        // Notify workers that we're done
116
        close(jobChan)
×
117
        if err == nil {
×
118
                l.Infof("waiting up to %s for all workers to finish", cfg.AckWait)
×
119
                select {
×
120
                case <-wg.Done():
×
121
                case sig := <-quit:
×
122
                        l.Warnf("received signal %s while waiting: aborting", sig)
×
123
                case <-time.After(cfg.AckWait):
×
124
                }
125
        }
126
        return err
×
127
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc