• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-server / 1495380963

14 Oct 2024 03:35PM UTC coverage: 70.373% (-2.5%) from 72.904%
1495380963

Pull #101

gitlab-ci

mineralsfree
feat: tenant list added

Ticket: MEN-7568
Changelog: None

Signed-off-by: Mikita Pilinka <mikita.pilinka@northern.tech>
Pull Request #101: feat: tenant list added

4406 of 6391 branches covered (68.94%)

Branch coverage included in aggregate %.

88 of 183 new or added lines in 10 files covered. (48.09%)

2623 existing lines in 65 files now uncovered.

36673 of 51982 relevant lines covered (70.55%)

31.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/backend/services/workflows/app/worker/worker.go
1
// Copyright 2024 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package worker
16

17
import (
18
        "context"
19
        "os"
20
        "os/signal"
21
        "time"
22

23
        natsio "github.com/nats-io/nats.go"
24
        "github.com/pkg/errors"
25
        "golang.org/x/sys/unix"
26

27
        "github.com/mendersoftware/mender-server/pkg/config"
28
        "github.com/mendersoftware/mender-server/pkg/log"
29

30
        "github.com/mendersoftware/mender-server/services/workflows/client/nats"
31
        dconfig "github.com/mendersoftware/mender-server/services/workflows/config"
32
        "github.com/mendersoftware/mender-server/services/workflows/store"
33
)
34

35
// Workflows holds the include and exclude lists that filter which workflows
// a worker executes.
type Workflows struct {
	// Included and Excluded are the two filter lists.
	// NOTE(review): presumably these hold workflow names — confirm against
	// the code that consumes them.
	Included, Excluded []string
}
40

41
// InitAndRun initializes the worker and runs it
42
func InitAndRun(
43
        conf config.Reader,
44
        workflows Workflows,
45
        dataStore store.DataStore,
46
        natsClient nats.Client,
UNCOV
47
) error {
×
UNCOV
48
        ctx, cancel := context.WithCancel(context.Background())
×
UNCOV
49
        // Calling cancel() before returning should shut down
×
UNCOV
50
        // all workers. However, the new driver is not
×
UNCOV
51
        // particularly good at listening to the context in the
×
UNCOV
52
        // current state, but it'll be forced to shut down
×
UNCOV
53
        // eventually.
×
UNCOV
54
        defer cancel()
×
UNCOV
55

×
UNCOV
56
        if conf.GetBool(dconfig.SettingDebugLog) {
×
57
                log.Setup(true)
×
58
        }
×
UNCOV
59
        l := log.FromContext(ctx)
×
UNCOV
60

×
UNCOV
61
        err := dataStore.LoadWorkflows(ctx, l)
×
UNCOV
62
        if err != nil {
×
63
                return errors.Wrap(err, "failed to load workflows")
×
64
        }
×
65

UNCOV
66
        streamName := config.Config.GetString(dconfig.SettingNatsStreamName)
×
UNCOV
67
        topic := config.Config.GetString(dconfig.SettingNatsSubscriberTopic)
×
UNCOV
68
        subject := streamName + "." + topic
×
UNCOV
69
        durableName := config.Config.GetString(dconfig.SettingNatsSubscriberDurable)
×
UNCOV
70
        concurrency := conf.GetInt(dconfig.SettingConcurrency)
×
UNCOV
71

×
UNCOV
72
        cfg, err := natsClient.GetConsumerConfig(durableName)
×
UNCOV
73
        if err != nil {
×
74
                return err
×
75
        }
×
UNCOV
76
        notifyPeriod := cfg.AckWait * 8 / 10
×
UNCOV
77
        if notifyPeriod < time.Second {
×
78
                notifyPeriod = time.Second
×
79
        }
×
80

UNCOV
81
        jobChan := make(chan *natsio.Msg, concurrency)
×
UNCOV
82
        unsubscribe, err := natsClient.JetStreamSubscribe(
×
UNCOV
83
                ctx,
×
UNCOV
84
                subject,
×
UNCOV
85
                durableName,
×
UNCOV
86
                jobChan,
×
UNCOV
87
        )
×
UNCOV
88
        if err != nil {
×
89
                return errors.Wrap(err, "failed to subscribe to the nats JetStream")
×
90
        }
×
91

UNCOV
92
        quit := make(chan os.Signal, 1)
×
UNCOV
93
        signal.Notify(quit, unix.SIGINT, unix.SIGTERM)
×
UNCOV
94

×
UNCOV
95
        // Spawn worker pool
×
UNCOV
96
        wg := NewWorkGroup(jobChan, notifyPeriod, natsClient, dataStore)
×
UNCOV
97
        for i := 0; i < concurrency; i++ {
×
UNCOV
98
                go wg.RunWorker(ctx)
×
UNCOV
99
        }
×
100

101
        // Run until a SIGTERM or SIGINT is received or one of the workers
102
        // stops unexpectedly
UNCOV
103
        select {
×
104
        case <-wg.FirstDone():
×
105
                l.Warnf("worker %d terminated, application is terminating", wg.TermID())
×
UNCOV
106
        case sig := <-quit:
×
UNCOV
107
                l.Warnf("received signal %s: terminating workers", sig)
×
108
        case <-ctx.Done():
×
109
                err = ctx.Err()
×
110
        }
UNCOV
111
        errSub := unsubscribe()
×
UNCOV
112
        if errSub != nil {
×
113
                l.Errorf("error unsubscribing from Jetstream: %s", errSub.Error())
×
114
        }
×
115
        // Notify workers that we're done
UNCOV
116
        close(jobChan)
×
UNCOV
117
        if err == nil {
×
UNCOV
118
                l.Infof("waiting up to %s for all workers to finish", cfg.AckWait)
×
UNCOV
119
                select {
×
UNCOV
120
                case <-wg.Done():
×
121
                case sig := <-quit:
×
122
                        l.Warnf("received signal %s while waiting: aborting", sig)
×
123
                case <-time.After(cfg.AckWait):
×
124
                }
125
        }
UNCOV
126
        return err
×
127
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc