• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-server / 1622978334

13 Jan 2025 03:51PM UTC coverage: 72.802% (-3.8%) from 76.608%
1622978334

Pull #300

gitlab-ci

alfrunes
fix: Deployment device count should not exceed max devices

Added a condition to skip deployments when the device count reaches max
devices.

Changelog: Title
Ticket: MEN-7847
Signed-off-by: Alf-Rune Siqveland <alf.rune@northern.tech>
Pull Request #300: fix: Deployment device count should not exceed max devices

4251 of 6164 branches covered (68.96%)

Branch coverage included in aggregate %.

0 of 18 new or added lines in 1 file covered. (0.0%)

2544 existing lines in 83 files now uncovered.

42741 of 58384 relevant lines covered (73.21%)

21.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.0
/backend/services/deployments/main.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package main
16

17
import (
18
        "context"
19
        "encoding/json"
20
        "fmt"
21
        "os"
22
        "strings"
23
        "time"
24

25
        "github.com/pkg/errors"
26
        "github.com/urfave/cli"
27

28
        "github.com/mendersoftware/mender-server/pkg/config"
29
        "github.com/mendersoftware/mender-server/pkg/identity"
30
        "github.com/mendersoftware/mender-server/pkg/log"
31
        mstore "github.com/mendersoftware/mender-server/pkg/store"
32
        "github.com/mendersoftware/mender-server/pkg/version"
33

34
        "github.com/mendersoftware/mender-server/services/deployments/app"
35
        "github.com/mendersoftware/mender-server/services/deployments/client/workflows"
36
        dconfig "github.com/mendersoftware/mender-server/services/deployments/config"
37
        "github.com/mendersoftware/mender-server/services/deployments/store"
38
        "github.com/mendersoftware/mender-server/services/deployments/store/mongo"
39
)
40

41
const (
42
        // deviceDeploymentsBatchSize is the page size used when reindexing:
        // reindexDeploymentsReporting requests this many device deployments
        // per store query and advances its skip offset by the same amount.
        deviceDeploymentsBatchSize = 512
43

44
        // cliDefaultRateLimit is the default value of the "rate-limit" flag
        // of the propagate-reporting command (batch requests per second).
        cliDefaultRateLimit = 50
45
)
46

47
// appVersion is the version information returned by version.Get. Its Version
// field is used as the CLI application version and the whole value is printed
// by the "version" command.
var appVersion = version.Get()
48

49
func main() {
1✔
50
        doMain(os.Args)
1✔
51
}
1✔
52

53
func doMain(args []string) {
1✔
54

1✔
55
        var configPath string
1✔
56

1✔
57
        app := cli.NewApp()
1✔
58
        app.Usage = "Deployments Service"
1✔
59
        app.Version = appVersion.Version
1✔
60

1✔
61
        app.Flags = []cli.Flag{
1✔
62
                cli.StringFlag{
1✔
63
                        Name: "config",
1✔
64
                        Usage: "Configuration `FILE`." +
1✔
65
                                " Supports JSON, TOML, YAML and HCL formatted configs.",
1✔
66
                        Destination: &configPath,
1✔
67
                },
1✔
68
        }
1✔
69

1✔
70
        app.Commands = []cli.Command{
1✔
71
                {
1✔
72
                        Name:  "server",
1✔
73
                        Usage: "Run the service as a server",
1✔
74
                        Flags: []cli.Flag{
1✔
75
                                cli.BoolFlag{
1✔
76
                                        Name:  "automigrate",
1✔
77
                                        Usage: "Run database migrations before starting.",
1✔
78
                                },
1✔
79
                        },
1✔
80

1✔
81
                        Action: cmdServer,
1✔
82
                },
1✔
83
                {
1✔
84
                        Name:  "migrate",
1✔
85
                        Usage: "Run migrations and exit",
1✔
86
                        Flags: []cli.Flag{
1✔
87
                                cli.StringFlag{
1✔
88
                                        Name:  "tenant",
1✔
89
                                        Usage: "Tenant ID (optional).",
1✔
90
                                },
1✔
91
                        },
1✔
92

1✔
93
                        Action: cmdMigrate,
1✔
94
                },
1✔
95
                {
1✔
96
                        Name:  "propagate-reporting",
1✔
97
                        Usage: "Trigger a reindex of all the device deployments in the reporting services ",
1✔
98
                        Flags: []cli.Flag{
1✔
99
                                cli.StringFlag{
1✔
100
                                        Name:  "tenant_id",
1✔
101
                                        Usage: "Tenant ID (optional) - propagate for just a single tenant.",
1✔
102
                                },
1✔
103
                                cli.UintFlag{
1✔
104
                                        Name:  "rate-limit",
1✔
105
                                        Usage: "`N`umber of reindexing batch requests per second",
1✔
106
                                        Value: cliDefaultRateLimit,
1✔
107
                                },
1✔
108
                                cli.BoolFlag{
1✔
109
                                        Name: "dry-run",
1✔
110
                                        Usage: "Do not perform any modifications," +
1✔
111
                                                " just scan and print devices.",
1✔
112
                                },
1✔
113
                        },
1✔
114

1✔
115
                        Action: cmdPropagateReporting,
1✔
116
                },
1✔
117
                {
1✔
118
                        Name:  "storage-daemon",
1✔
119
                        Usage: "Start storage daemon cleaning up expired objects from storage",
1✔
120
                        Flags: []cli.Flag{
1✔
121
                                cli.DurationFlag{
1✔
122
                                        Name: "interval",
1✔
123
                                        Usage: "Time interval to run cleanup routine; " +
1✔
124
                                                "a value of 0 runs the daemon for one " +
1✔
125
                                                "iteration and terminates (cron mode).",
1✔
126
                                        Value: 0,
1✔
127
                                },
1✔
128
                                cli.DurationFlag{
1✔
129
                                        Name: "time-jitter",
1✔
130
                                        Usage: "The time jitter added for expired links. " +
1✔
131
                                                "Links must be expired for `DURATION` " +
1✔
132
                                                "to be removed.",
1✔
133
                                        Value: time.Second * 3,
1✔
134
                                },
1✔
135
                        },
1✔
136
                        Action: cmdStorageDaemon,
1✔
137
                },
1✔
138
                {
1✔
139
                        Name:  "version",
1✔
140
                        Usage: "Show version information",
1✔
141
                        Flags: []cli.Flag{
1✔
142
                                cli.StringFlag{
1✔
143
                                        Name:  "output",
1✔
144
                                        Usage: "Output format <json|text>",
1✔
145
                                        Value: "text",
1✔
146
                                },
1✔
147
                        },
1✔
148
                        Action: func(args *cli.Context) error {
1✔
149
                                switch strings.ToLower(args.String("output")) {
×
150
                                case "text":
×
151
                                        fmt.Print(appVersion)
×
152
                                case "json":
×
153
                                        _ = json.NewEncoder(os.Stdout).Encode(appVersion)
×
154
                                default:
×
155
                                        return fmt.Errorf("Unknown output format %q", args.String("output"))
×
156
                                }
157
                                return nil
×
158
                        },
159
                },
160
        }
161

162
        app.Action = cmdServer
1✔
163
        app.Before = func(args *cli.Context) error {
2✔
164
                if err := dconfig.Setup(configPath); err != nil {
1✔
165
                        return cli.NewExitError(err.Error(), 1)
×
166
                }
×
167

168
                return nil
1✔
169
        }
170

171
        err := app.Run(args)
1✔
172
        if err != nil {
1✔
173
                log.NewEmpty().Fatal(err.Error())
×
174
        }
×
175
}
176

UNCOV
177
func cmdServer(args *cli.Context) error {
×
UNCOV
178
        devSetup := args.GlobalBool("dev")
×
UNCOV
179

×
UNCOV
180
        l := log.New(log.Ctx{})
×
UNCOV
181

×
UNCOV
182
        if devSetup {
×
183
                l.Infof("setting up development configuration")
×
184
                config.Config.Set(dconfig.SettingMiddleware, dconfig.EnvDev)
×
185
        }
×
186

UNCOV
187
        l.Print("Deployments Service starting up")
×
UNCOV
188
        err := migrate("", args.Bool("automigrate"))
×
UNCOV
189
        if err != nil {
×
190
                return err
×
191
        }
×
192

UNCOV
193
        setupContext, cancel := context.WithTimeout(
×
UNCOV
194
                context.Background(),
×
UNCOV
195
                time.Second*30,
×
UNCOV
196
        )
×
UNCOV
197
        err = RunServer(setupContext)
×
UNCOV
198
        cancel()
×
UNCOV
199
        if err != nil {
×
200
                return cli.NewExitError(err.Error(), 4)
×
201
        }
×
202

UNCOV
203
        return nil
×
204
}
205

206
func cmdMigrate(args *cli.Context) error {
1✔
207
        tenant := args.String("tenant")
1✔
208
        return migrate(tenant, true)
1✔
209
}
1✔
210

211
func migrate(tenant string, automigrate bool) error {
1✔
212
        ctx := context.Background()
1✔
213

1✔
214
        dbClient, err := mongo.NewMongoClient(ctx, config.Config)
1✔
215
        if err != nil {
1✔
216
                return cli.NewExitError(
×
217
                        fmt.Sprintf("failed to connect to db: %v", err),
×
218
                        3)
×
219
        }
×
220
        defer func() {
2✔
221
                _ = dbClient.Disconnect(ctx)
1✔
222
        }()
1✔
223

224
        dbVersion := mongo.DbVersion
1✔
225
        if !automigrate {
1✔
226
                dbVersion = mongo.DbMinimumVersion
×
227
        }
×
228

229
        if tenant != "" {
1✔
230
                db := mstore.DbNameForTenant(tenant, mongo.DbName)
×
231
                err = mongo.MigrateSingle(ctx, db, dbVersion, dbClient, automigrate)
×
232
        } else {
1✔
233
                err = mongo.Migrate(ctx, dbVersion, dbClient, automigrate)
1✔
234
        }
1✔
235
        if err != nil {
1✔
236
                return cli.NewExitError(
×
237
                        fmt.Sprintf("failed to run migrations: %v", err),
×
238
                        3)
×
239
        }
×
240

241
        return nil
1✔
242
}
243

244
func cmdStorageDaemon(args *cli.Context) error {
×
245
        ctx := context.Background()
×
246
        objectStorage, err := SetupObjectStorage(ctx)
×
247
        if err != nil {
×
248
                return err
×
249
        }
×
250
        mgo, err := mongo.NewMongoClient(ctx, config.Config)
×
251
        if err != nil {
×
252
                return err
×
253
        }
×
254
        database := mongo.NewDataStoreMongoWithClient(mgo)
×
255
        app := app.NewDeployments(database, objectStorage, 0, false)
×
256
        return app.CleanupExpiredUploads(
×
257
                ctx,
×
258
                args.Duration("interval"),
×
259
                args.Duration("time-jitter"),
×
260
        )
×
261
}
262

263
func cmdPropagateReporting(args *cli.Context) error {
×
264
        if config.Config.GetString(dconfig.SettingReportingAddr) == "" {
×
265
                return cli.NewExitError(errors.New("reporting address not configured"), 1)
×
266
        }
×
267
        c := config.Config
×
268
        ctx, cancel := context.WithTimeout(
×
269
                context.Background(),
×
270
                time.Second*30,
×
271
        )
×
272
        defer cancel()
×
273
        dbClient, err := mongo.NewMongoClient(ctx, c)
×
274
        if err != nil {
×
275
                return err
×
276
        }
×
277
        defer func() {
×
278
                _ = dbClient.Disconnect(context.Background())
×
279
        }()
×
280

281
        db := mongo.NewDataStoreMongoWithClient(dbClient)
×
282

×
283
        wflows := workflows.NewClient()
×
284

×
285
        var requestPeriod time.Duration
×
286
        rateLimit := args.Uint("rate-limit")
×
287
        if rateLimit > 0 {
×
288
                requestPeriod = time.Second / time.Duration(args.Uint("rate-limit"))
×
289
        }
×
290

291
        err = propagateReporting(
×
292
                db,
×
293
                wflows,
×
294
                args.String("tenant_id"),
×
295
                requestPeriod,
×
296
                args.Bool("dry-run"),
×
297
        )
×
298
        if err != nil {
×
299
                return cli.NewExitError(err, 7)
×
300
        }
×
301
        return nil
×
302
}
303

304
func propagateReporting(
305
        db store.DataStore,
306
        wflows workflows.Client,
307
        tenant string,
308
        requestPeriod time.Duration,
309
        dryRun bool,
310
) error {
1✔
311
        l := log.NewEmpty()
1✔
312

1✔
313
        dbs, err := selectDbs(db, tenant)
1✔
314
        if err != nil {
1✔
315
                return errors.Wrap(err, "aborting")
×
316
        }
×
317

318
        var errReturned error
1✔
319
        for _, d := range dbs {
2✔
320
                err := tryPropagateReportingForDb(db, wflows, d, requestPeriod, dryRun)
1✔
321
                if err != nil {
1✔
322
                        errReturned = err
×
323
                        l.Errorf("giving up on DB %s due to fatal error: %s", d, err.Error())
×
324
                        continue
×
325
                }
326
        }
327

328
        l.Info("all DBs processed, exiting.")
1✔
329
        return errReturned
1✔
330
}
331

332
func selectDbs(db store.DataStore, tenant string) ([]string, error) {
1✔
333
        l := log.NewEmpty()
1✔
334

1✔
335
        var dbs []string
1✔
336

1✔
337
        if tenant != "" {
1✔
338
                l.Infof("propagating deployments history for user-specified tenant %s", tenant)
×
339
                n := mstore.DbNameForTenant(tenant, mongo.DbName)
×
340
                dbs = []string{n}
×
341
        } else {
1✔
342
                l.Infof("propagating deployments history for all tenants")
1✔
343

1✔
344
                // infer if we're in ST or MT
1✔
345
                tdbs, err := db.GetTenantDbs()
1✔
346
                if err != nil {
1✔
347
                        return nil, errors.Wrap(err, "failed to retrieve tenant DBs")
×
348
                }
×
349

350
                if len(tdbs) == 0 {
1✔
351
                        l.Infof("no tenant DBs found - will try the default database %s", mongo.DbName)
×
352
                        dbs = []string{mongo.DbName}
×
353
                } else {
1✔
354
                        dbs = tdbs
1✔
355
                }
1✔
356
        }
357

358
        return dbs, nil
1✔
359
}
360

361
func tryPropagateReportingForDb(
362
        db store.DataStore,
363
        wflows workflows.Client,
364
        dbname string,
365
        requestPeriod time.Duration,
366
        dryRun bool,
367
) error {
1✔
368
        l := log.NewEmpty()
1✔
369

1✔
370
        l.Infof("propagating deployments data to reporting from DB: %s", dbname)
1✔
371

1✔
372
        tenant := mstore.TenantFromDbName(dbname, mongo.DbName)
1✔
373

1✔
374
        ctx := context.Background()
1✔
375
        if tenant != "" {
1✔
376
                ctx = identity.WithContext(ctx, &identity.Identity{
×
377
                        Tenant: tenant,
×
378
                })
×
379
        }
×
380

381
        err := reindexDeploymentsReporting(ctx, db, wflows, tenant, requestPeriod, dryRun)
1✔
382
        if err != nil {
1✔
383
                l.Infof("Done with DB %s, but there were errors: %s.", dbname, err.Error())
×
384
        } else {
1✔
385
                l.Infof("Done with DB %s", dbname)
1✔
386
        }
1✔
387

388
        return err
1✔
389
}
390

391
func reindexDeploymentsReporting(
392
        ctx context.Context,
393
        db store.DataStore,
394
        wflows workflows.Client,
395
        tenant string,
396
        requestPeriod time.Duration,
397
        dryRun bool,
398
) error {
1✔
399
        var skip int
1✔
400

1✔
401
        done := ctx.Done()
1✔
402
        ticker := time.NewTicker(requestPeriod)
1✔
403
        defer ticker.Stop()
1✔
404
        skip = 0
1✔
405
        for {
2✔
406
                dd, err := db.GetDeviceDeployments(ctx, skip, deviceDeploymentsBatchSize, "", nil, true)
1✔
407
                if err != nil {
1✔
408
                        return errors.Wrap(err, "failed to get device deployments")
×
409
                }
×
410

411
                if len(dd) < 1 {
1✔
412
                        break
×
413
                }
414

415
                if !dryRun {
2✔
416
                        deviceDeployments := make([]workflows.DeviceDeploymentShortInfo, len(dd))
1✔
417
                        for i, d := range dd {
2✔
418
                                deviceDeployments[i].ID = d.Id
1✔
419
                                deviceDeployments[i].DeviceID = d.DeviceId
1✔
420
                                deviceDeployments[i].DeploymentID = d.DeploymentId
1✔
421
                        }
1✔
422
                        err := wflows.StartReindexReportingDeploymentBatch(ctx, deviceDeployments)
1✔
423
                        if err != nil {
1✔
424
                                return err
×
425
                        }
×
426
                }
427

428
                skip += deviceDeploymentsBatchSize
1✔
429
                if len(dd) < deviceDeploymentsBatchSize {
2✔
430
                        break
1✔
431
                }
432
                select {
×
433
                case <-ticker.C:
×
434

435
                case <-done:
×
436
                        return ctx.Err()
×
437
                }
438
        }
439
        return nil
1✔
440
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc