• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-server / 1689576603

26 Feb 2025 01:36PM UTC coverage: 76.183% (-0.02%) from 76.199%
1689576603

Pull #483

gitlab-ci

merlin-northern
feat(iot-manager): getting events by integration by ID

Allows to:
* get device events from iot-manager by integration ID
* creating new integration creates a new ID
* maintains backward compatibility for GET /event
* with UI calling the new endpoint this fixes the showing
  of events from old webhooks in the integration details

Changelog: Do not show events from previous webhooks in the integration details by making it possible for the UI to get the events by integration ID; at the same time, create a new ID on each integration creation.
Ticket: MEN-7801
Signed-off-by: Peter Grzybowski <peter@northern.tech>
Pull Request #483: feat(iot-manager): getting events by integration by ID

36 of 46 new or added lines in 6 files covered. (78.26%)

13 existing lines in 4 files now uncovered.

36875 of 48403 relevant lines covered (76.18%)

1.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.45
/backend/services/workflows/main.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package main
16

17
import (
18
        "context"
19
        "encoding/json"
20
        "fmt"
21
        "log"
22
        "os"
23
        "strings"
24

25
        "github.com/pkg/errors"
26
        "github.com/urfave/cli"
27

28
        "github.com/mendersoftware/mender-server/pkg/config"
29
        "github.com/mendersoftware/mender-server/pkg/version"
30

31
        "github.com/mendersoftware/mender-server/services/workflows/app/server"
32
        "github.com/mendersoftware/mender-server/services/workflows/app/worker"
33
        "github.com/mendersoftware/mender-server/services/workflows/client/nats"
34
        dconfig "github.com/mendersoftware/mender-server/services/workflows/config"
35
        "github.com/mendersoftware/mender-server/services/workflows/model"
36
        store "github.com/mendersoftware/mender-server/services/workflows/store/mongo"
37
)
38

39
// appVersion is the version information obtained from version.Get() when the
// package is initialized.
var appVersion = version.Get()
40

41
func main() {
2✔
42
        doMain(os.Args)
2✔
43
}
2✔
44

45
func doMain(args []string) {
2✔
46
        var configPath string
2✔
47

2✔
48
        app := &cli.App{
2✔
49
                Flags: []cli.Flag{
2✔
50
                        &cli.StringFlag{
2✔
51
                                Name: "config",
2✔
52
                                Usage: "Configuration `FILE`." +
2✔
53
                                        " Supports JSON, TOML, YAML and HCL formatted configs.",
2✔
54
                                Destination: &configPath,
2✔
55
                        },
2✔
56
                },
2✔
57
                Commands: []cli.Command{
2✔
58
                        {
2✔
59
                                Name:   "server",
2✔
60
                                Usage:  "Run the HTTP API server",
2✔
61
                                Action: cmdServer,
2✔
62
                                Flags: []cli.Flag{
2✔
63
                                        &cli.BoolFlag{
2✔
64
                                                Name:  "automigrate",
2✔
65
                                                Usage: "Run database migrations before starting.",
2✔
66
                                        },
2✔
67
                                },
2✔
68
                        },
2✔
69
                        {
2✔
70
                                Name:   "worker",
2✔
71
                                Usage:  "Run the worker process",
2✔
72
                                Action: cmdWorker,
2✔
73
                                Flags: []cli.Flag{
2✔
74
                                        &cli.BoolFlag{
2✔
75
                                                Name:  "automigrate",
2✔
76
                                                Usage: "Run database migrations before starting.",
2✔
77
                                        },
2✔
78
                                        &cli.StringFlag{
2✔
79
                                                Name:  "workflows",
2✔
80
                                                Usage: "Comma-separated list of workflows executed by this worker",
2✔
81
                                        },
2✔
82
                                        &cli.StringFlag{
2✔
83
                                                Name:  "excluded-workflows",
2✔
84
                                                Usage: "Comma-separated list of workflows NOT executed by this worker",
2✔
85
                                        },
2✔
86
                                },
2✔
87
                        },
2✔
88
                        {
2✔
89
                                Name:   "migrate",
2✔
90
                                Usage:  "Run the migrations",
2✔
91
                                Action: cmdMigrate,
2✔
92
                                Flags: []cli.Flag{
2✔
93
                                        cli.BoolFlag{
2✔
94
                                                Name:   "skip-nats",
2✔
95
                                                Usage:  "Skip migrating the NATS Jetstream configuration",
2✔
96
                                                EnvVar: "WORKFLOWS_MIGRATION_SKIP_NATS",
2✔
97
                                        },
2✔
98
                                        cli.BoolFlag{
2✔
99
                                                Name:   "skip-database",
2✔
100
                                                Usage:  "Skip migrating the database",
2✔
101
                                                EnvVar: "WORKFLOWS_MIGRATION_SKIP_DATABASE",
2✔
102
                                        },
2✔
103
                                },
2✔
104
                        },
2✔
105
                        {
2✔
106
                                Name:   "list-jobs",
2✔
107
                                Usage:  "List jobs",
2✔
108
                                Action: cmdListJobs,
2✔
109
                                Flags: []cli.Flag{
2✔
110
                                        cli.Int64Flag{
2✔
111
                                                Name:  "page",
2✔
112
                                                Usage: "page number to show",
2✔
113
                                        },
2✔
114
                                        cli.Int64Flag{
2✔
115
                                                Name:  "perPage",
2✔
116
                                                Usage: "number of results per page",
2✔
117
                                        },
2✔
118
                                },
2✔
119
                        },
2✔
120
                        {
2✔
121
                                Name:  "version",
2✔
122
                                Usage: "Show version information",
2✔
123
                                Flags: []cli.Flag{
2✔
124
                                        cli.StringFlag{
2✔
125
                                                Name:  "output",
2✔
126
                                                Usage: "Output format <json|text>",
2✔
127
                                                Value: "text",
2✔
128
                                        },
2✔
129
                                },
2✔
130
                                Action: func(args *cli.Context) error {
2✔
131
                                        switch strings.ToLower(args.String("output")) {
×
132
                                        case "text":
×
133
                                                fmt.Print(appVersion)
×
134
                                        case "json":
×
135
                                                _ = json.NewEncoder(os.Stdout).Encode(appVersion)
×
136
                                        default:
×
137
                                                return fmt.Errorf("Unknown output format %q", args.String("output"))
×
138
                                        }
139
                                        return nil
×
140
                                },
141
                        },
142
                },
143
                Version: appVersion.Version,
144
        }
145
        app.Usage = "Workflows"
2✔
146
        app.Action = cmdServer
2✔
147

2✔
148
        app.Before = func(args *cli.Context) error {
4✔
149
                err := config.FromConfigFile(configPath, dconfig.Defaults)
2✔
150
                if err != nil {
2✔
151
                        return cli.NewExitError(
×
152
                                fmt.Sprintf("error loading configuration: %s", err),
×
153
                                1)
×
154
                }
×
155

156
                // Enable setting config values by environment variables
157
                config.Config.SetEnvPrefix("WORKFLOWS")
2✔
158
                config.Config.AutomaticEnv()
2✔
159
                config.Config.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_"))
2✔
160

2✔
161
                return nil
2✔
162
        }
163

164
        err := app.Run(args)
2✔
165
        if err != nil {
2✔
UNCOV
166
                log.Fatal(err)
×
UNCOV
167
        }
×
168
}
169

170
func getNatsClient() (nats.Client, error) {
2✔
171
        natsURI := config.Config.GetString(dconfig.SettingNatsURI)
2✔
172
        streamName := config.Config.GetString(dconfig.SettingNatsStreamName)
2✔
173
        nats, err := nats.NewClientWithDefaults(natsURI, streamName)
2✔
174
        if err != nil {
2✔
175
                return nil, errors.Wrap(err, "failed to connect to nats")
×
176
        }
×
177
        return nats, err
2✔
178
}
179

180
func initJetstream(nc nats.Client, producer, upsert bool) (err error) {
2✔
181
        durableName := config.Config.GetString(dconfig.SettingNatsSubscriberDurable)
2✔
182
        if producer {
4✔
183
                err = nc.JetStreamCreateStream(nc.StreamName())
2✔
184
                if err != nil {
2✔
185
                        return err
×
186
                }
×
187
        } else {
2✔
188
                var cfg nats.ConsumerConfig
2✔
189
                cfg, err = dconfig.GetNatsConsumerConfig(config.Config)
2✔
190
                if err != nil {
2✔
191
                        return err
×
192
                }
×
193
                err = nc.CreateConsumer(durableName, upsert, cfg)
2✔
194
        }
195
        return err
2✔
196
}
197

198
func cmdServer(args *cli.Context) error {
2✔
199
        dataStore, err := store.SetupDataStore(args.Bool("automigrate"))
2✔
200
        if err != nil {
2✔
201
                return err
×
202
        }
×
203
        defer dataStore.Close()
2✔
204

2✔
205
        nats, err := getNatsClient()
2✔
206
        if err != nil {
2✔
207
                return err
×
208
        }
×
209
        defer nats.Close()
2✔
210

2✔
211
        if err = initJetstream(nats, true, args.Bool("automigrate")); err != nil {
2✔
212
                return errors.WithMessage(err, "failed to apply Jetstream migrations")
×
213
        }
×
214

215
        return server.InitAndRun(config.Config, dataStore, nats)
2✔
216
}
217

218
func cmdWorker(args *cli.Context) error {
2✔
219
        dataStore, err := store.SetupDataStore(args.Bool("automigrate"))
2✔
220
        if err != nil {
2✔
UNCOV
221
                return err
×
UNCOV
222
        }
×
223
        defer dataStore.Close()
2✔
224

2✔
225
        nats, err := getNatsClient()
2✔
226
        if err != nil {
2✔
227
                return err
×
228
        }
×
229
        defer nats.Close()
2✔
230

2✔
231
        if err = initJetstream(nats, false, args.Bool("automigrate")); err != nil {
2✔
232
                return errors.WithMessage(err, "failed to apply Jetstream consumer migrations")
×
233
        }
×
234

235
        var included, excluded []string
2✔
236

2✔
237
        includedWorkflows := args.String("workflows")
2✔
238
        if includedWorkflows != "" {
2✔
239
                included = strings.Split(includedWorkflows, ",")
×
240
        }
×
241

242
        excludedWorkflows := args.String("excluded-workflows")
2✔
243
        if excludedWorkflows != "" {
2✔
244
                excluded = strings.Split(excludedWorkflows, ",")
×
245
        }
×
246

247
        workflows := worker.Workflows{
2✔
248
                Included: included,
2✔
249
                Excluded: excluded,
2✔
250
        }
2✔
251
        return worker.InitAndRun(config.Config, workflows, dataStore, nats)
2✔
252
}
253

254
func cmdMigrate(args *cli.Context) error {
×
255
        var err error
×
256
        if !args.Bool("skip-database") {
×
257
                _, err = store.SetupDataStore(true)
×
258
                if err != nil {
×
259
                        return err
×
260
                }
×
261
        }
262
        if !args.Bool("skip-nats") {
×
263
                var nc nats.Client
×
264
                nc, err = getNatsClient()
×
265
                if err == nil {
×
266
                        if err = initJetstream(nc, true, true); err == nil {
×
267
                                err = initJetstream(nc, false, true)
×
268
                        }
×
269
                }
270

271
        }
272
        return err
×
273
}
274

275
func cmdListJobs(args *cli.Context) error {
×
276
        dataStore, err := store.SetupDataStore(false)
×
277
        if err != nil {
×
278
                return err
×
279
        }
×
280
        defer dataStore.Close()
×
281

×
282
        ctx, cancel := context.WithCancel(context.Background())
×
283
        defer cancel()
×
284

×
285
        var page int64
×
286
        var perPage int64
×
287
        page = args.Int64("page")
×
288
        perPage = args.Int64("perPage")
×
289

×
290
        if page < 1 {
×
291
                page = 1
×
292
        }
×
293
        if perPage < 1 {
×
294
                perPage = 75
×
295
        }
×
296
        jobs, count, _ := dataStore.GetAllJobs(ctx, page, perPage)
×
297
        fmt.Printf("all jobs: %d; page: %d/%d perPage:%d\n%29s %24s %10s %s\n",
×
298
                count, page, count/perPage, perPage, "insert time", "id", "status", "workflow")
×
299
        for _, j := range jobs {
×
300
                format := "Mon, 2 Jan 2006 15:04:05 MST"
×
301
                fmt.Printf(
×
302
                        "%29s %24s %10s %s\n",
×
303
                        j.InsertTime.Format(format),
×
304
                        j.ID, model.StatusToString(j.Status),
×
305
                        j.WorkflowName,
×
306
                )
×
307
        }
×
308
        fmt.Printf("all jobs: %d; page: %d/%d\n", count, page, count/perPage)
×
309

×
310
        return nil
×
311
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc