• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / deployments / 843450222

pending completion
843450222

Pull #854

gitlab-ci

Alf-Rune Siqveland
chore: Add `--throttle` flag to `propagate-reporting` command
Pull Request #854: chore: Add `--throttle` flag to `propagate-reporting` command

8 of 11 new or added lines in 1 file covered. (72.73%)

434 existing lines in 4 files now uncovered.

6943 of 8758 relevant lines covered (79.28%)

70.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.09
/main.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package main
16

17
import (
18
        "context"
19
        "fmt"
20
        "os"
21
        "time"
22

23
        "github.com/mendersoftware/go-lib-micro/config"
24
        "github.com/mendersoftware/go-lib-micro/identity"
25
        "github.com/mendersoftware/go-lib-micro/log"
26
        mstore "github.com/mendersoftware/go-lib-micro/store"
27
        "github.com/pkg/errors"
28
        "github.com/urfave/cli"
29

30
        "github.com/mendersoftware/deployments/app"
31
        "github.com/mendersoftware/deployments/client/workflows"
32
        dconfig "github.com/mendersoftware/deployments/config"
33
        "github.com/mendersoftware/deployments/store"
34
        "github.com/mendersoftware/deployments/store/mongo"
35
)
36

37
const (
38
        deviceDeploymentsBatchSize = 512
39
)
40

41
func main() {
×
42
        doMain(os.Args)
×
43
}
×
44

45
func doMain(args []string) {
1✔
46

1✔
47
        var configPath string
1✔
48

1✔
49
        app := cli.NewApp()
1✔
50
        app.Usage = "Deployments Service"
1✔
51

1✔
52
        app.Flags = []cli.Flag{
1✔
53
                cli.StringFlag{
1✔
54
                        Name: "config",
1✔
55
                        Usage: "Configuration `FILE`." +
1✔
56
                                " Supports JSON, TOML, YAML and HCL formatted configs.",
1✔
57
                        Destination: &configPath,
1✔
58
                },
1✔
59
        }
1✔
60

1✔
61
        app.Commands = []cli.Command{
1✔
62
                {
1✔
63
                        Name:  "server",
1✔
64
                        Usage: "Run the service as a server",
1✔
65
                        Flags: []cli.Flag{
1✔
66
                                cli.BoolFlag{
1✔
67
                                        Name:  "automigrate",
1✔
68
                                        Usage: "Run database migrations before starting.",
1✔
69
                                },
1✔
70
                        },
1✔
71

1✔
72
                        Action: cmdServer,
1✔
73
                },
1✔
74
                {
1✔
75
                        Name:  "migrate",
1✔
76
                        Usage: "Run migrations and exit",
1✔
77
                        Flags: []cli.Flag{
1✔
78
                                cli.StringFlag{
1✔
79
                                        Name:  "tenant",
1✔
80
                                        Usage: "Tenant ID (optional).",
1✔
81
                                },
1✔
82
                        },
1✔
83

1✔
84
                        Action: cmdMigrate,
1✔
85
                },
1✔
86
                {
1✔
87
                        Name:  "propagate-reporting",
1✔
88
                        Usage: "Trigger a reindex of all the device deployments in the reporting services ",
1✔
89
                        Flags: []cli.Flag{
1✔
90
                                cli.StringFlag{
1✔
91
                                        Name:  "tenant_id",
1✔
92
                                        Usage: "Tenant ID (optional) - propagate for just a single tenant.",
1✔
93
                                },
1✔
94
                                cli.DurationFlag{
1✔
95
                                        Name:  "throttle",
1✔
96
                                        Usage: "`DURATION` to wait for each reindexing batch request",
1✔
97
                                        Value: time.Millisecond * 25,
1✔
98
                                },
1✔
99
                                cli.BoolFlag{
1✔
100
                                        Name: "dry-run",
1✔
101
                                        Usage: "Do not perform any modifications," +
1✔
102
                                                " just scan and print devices.",
1✔
103
                                },
1✔
104
                        },
1✔
105

1✔
106
                        Action: cmdPropagateReporting,
1✔
107
                },
1✔
108
                {
1✔
109
                        Name:  "storage-daemon",
1✔
110
                        Usage: "Start storage daemon cleaning up expired objects from storage",
1✔
111
                        Flags: []cli.Flag{
1✔
112
                                cli.DurationFlag{
1✔
113
                                        Name: "interval",
1✔
114
                                        Usage: "Time interval to run cleanup routine; " +
1✔
115
                                                "a value of 0 runs the daemon for one " +
1✔
116
                                                "iteration and terminates (cron mode).",
1✔
117
                                        Value: 0,
1✔
118
                                },
1✔
119
                                cli.DurationFlag{
1✔
120
                                        Name: "time-jitter",
1✔
121
                                        Usage: "The time jitter added for expired links. " +
1✔
122
                                                "Links must be expired for `DURATION` " +
1✔
123
                                                "to be removed.",
1✔
124
                                        Value: time.Second * 3,
1✔
125
                                },
1✔
126
                        },
1✔
127
                        Action: cmdStorageDaemon,
1✔
128
                },
1✔
129
        }
1✔
130

1✔
131
        app.Action = cmdServer
1✔
132
        app.Before = func(args *cli.Context) error {
2✔
133
                if err := dconfig.Setup(configPath); err != nil {
1✔
134
                        return cli.NewExitError(err.Error(), 1)
×
135
                }
×
136

137
                return nil
1✔
138
        }
139

140
        err := app.Run(args)
1✔
141
        if err != nil {
1✔
142
                log.NewEmpty().Fatal(err.Error())
×
143
        }
×
144
}
145

146
func cmdServer(args *cli.Context) error {
1✔
147
        devSetup := args.GlobalBool("dev")
1✔
148

1✔
149
        l := log.New(log.Ctx{})
1✔
150

1✔
151
        if devSetup {
1✔
152
                l.Infof("setting up development configuration")
×
153
                config.Config.Set(dconfig.SettingMiddleware, dconfig.EnvDev)
×
154
        }
×
155

156
        l.Print("Deployments Service starting up")
1✔
157
        err := migrate("", args.Bool("automigrate"))
1✔
158
        if err != nil {
1✔
159
                return err
×
160
        }
×
161

162
        setupContext, cancel := context.WithTimeout(
1✔
163
                context.Background(),
1✔
164
                time.Second*30,
1✔
165
        )
1✔
166
        err = RunServer(setupContext)
1✔
167
        cancel()
1✔
168
        if err != nil {
1✔
169
                return cli.NewExitError(err.Error(), 4)
×
170
        }
×
171

172
        return nil
×
173
}
174

175
func cmdMigrate(args *cli.Context) error {
1✔
176
        tenant := args.String("tenant")
1✔
177
        return migrate(tenant, true)
1✔
178
}
1✔
179

180
func migrate(tenant string, automigrate bool) error {
1✔
181
        ctx := context.Background()
1✔
182

1✔
183
        dbClient, err := mongo.NewMongoClient(ctx, config.Config)
1✔
184
        if err != nil {
1✔
185
                return cli.NewExitError(
×
186
                        fmt.Sprintf("failed to connect to db: %v", err),
×
187
                        3)
×
188
        }
×
189
        defer func() {
2✔
190
                _ = dbClient.Disconnect(ctx)
1✔
191
        }()
1✔
192

193
        if tenant != "" {
1✔
194
                db := mstore.DbNameForTenant(tenant, mongo.DbName)
×
195
                err = mongo.MigrateSingle(ctx, db, mongo.DbVersion, dbClient, true)
×
196
        } else {
1✔
197
                err = mongo.Migrate(ctx, mongo.DbVersion, dbClient, true)
1✔
198
        }
1✔
199
        if err != nil {
1✔
200
                return cli.NewExitError(
×
201
                        fmt.Sprintf("failed to run migrations: %v", err),
×
202
                        3)
×
203
        }
×
204

205
        return nil
1✔
206
}
207

208
func cmdStorageDaemon(args *cli.Context) error {
×
209
        ctx := context.Background()
×
210
        objectStorage, err := SetupObjectStorage(ctx)
×
211
        if err != nil {
×
212
                return err
×
213
        }
×
214
        mgo, err := mongo.NewMongoClient(ctx, config.Config)
×
215
        if err != nil {
×
216
                return err
×
217
        }
×
218
        database := mongo.NewDataStoreMongoWithClient(mgo)
×
219
        app := app.NewDeployments(database, objectStorage)
×
220
        return app.CleanupExpiredUploads(
×
221
                ctx,
×
222
                args.Duration("interval"),
×
223
                args.Duration("time-jitter"),
×
224
        )
×
225
}
226

227
func cmdPropagateReporting(args *cli.Context) error {
×
228
        if config.Config.GetString(dconfig.SettingReportingAddr) == "" {
×
229
                return cli.NewExitError(errors.New("reporting address not configured"), 1)
×
230
        }
×
231
        c := config.Config
×
232
        ctx, cancel := context.WithTimeout(
×
233
                context.Background(),
×
234
                time.Second*30,
×
235
        )
×
236
        defer cancel()
×
237
        dbClient, err := mongo.NewMongoClient(ctx, c)
×
238
        if err != nil {
×
239
                return err
×
240
        }
×
241
        defer func() {
×
242
                _ = dbClient.Disconnect(context.Background())
×
243
        }()
×
244

245
        db := mongo.NewDataStoreMongoWithClient(dbClient)
×
246

×
247
        wflows := workflows.NewClient()
×
248

×
249
        err = propagateReporting(
×
250
                db,
×
251
                wflows,
×
252
                args.String("tenant_id"),
×
NEW
253
                args.Duration("throttle"),
×
254
                args.Bool("dry-run"),
×
255
        )
×
256
        if err != nil {
×
257
                return cli.NewExitError(err, 7)
×
258
        }
×
259
        return nil
×
260
}
261

262
func propagateReporting(
263
        db store.DataStore,
264
        wflows workflows.Client,
265
        tenant string,
266
        throttleDuration time.Duration,
267
        dryRun bool,
268
) error {
4✔
269
        l := log.NewEmpty()
4✔
270

4✔
271
        dbs, err := selectDbs(db, tenant)
4✔
272
        if err != nil {
4✔
273
                return errors.Wrap(err, "aborting")
×
274
        }
×
275

276
        var errReturned error
4✔
277
        for _, d := range dbs {
8✔
278
                err := tryPropagateReportingForDb(db, wflows, d, throttleDuration, dryRun)
4✔
279
                if err != nil {
4✔
280
                        errReturned = err
×
281
                        l.Errorf("giving up on DB %s due to fatal error: %s", d, err.Error())
×
282
                        continue
×
283
                }
284
        }
285

286
        l.Info("all DBs processed, exiting.")
4✔
287
        return errReturned
4✔
288
}
289

290
func selectDbs(db store.DataStore, tenant string) ([]string, error) {
4✔
291
        l := log.NewEmpty()
4✔
292

4✔
293
        var dbs []string
4✔
294

4✔
295
        if tenant != "" {
4✔
296
                l.Infof("propagating deployments history for user-specified tenant %s", tenant)
×
297
                n := mstore.DbNameForTenant(tenant, mongo.DbName)
×
298
                dbs = []string{n}
×
299
        } else {
4✔
300
                l.Infof("propagating deployments history for all tenants")
4✔
301

4✔
302
                // infer if we're in ST or MT
4✔
303
                tdbs, err := db.GetTenantDbs()
4✔
304
                if err != nil {
4✔
305
                        return nil, errors.Wrap(err, "failed to retrieve tenant DBs")
×
306
                }
×
307

308
                if len(tdbs) == 0 {
4✔
309
                        l.Infof("no tenant DBs found - will try the default database %s", mongo.DbName)
×
310
                        dbs = []string{mongo.DbName}
×
311
                } else {
4✔
312
                        dbs = tdbs
4✔
313
                }
4✔
314
        }
315

316
        return dbs, nil
4✔
317
}
318

319
func tryPropagateReportingForDb(
320
        db store.DataStore,
321
        wflows workflows.Client,
322
        dbname string,
323
        throttleDuration time.Duration,
324
        dryRun bool,
325
) error {
4✔
326
        l := log.NewEmpty()
4✔
327

4✔
328
        l.Infof("propagating deployments data to reporting from DB: %s", dbname)
4✔
329

4✔
330
        tenant := mstore.TenantFromDbName(dbname, mongo.DbName)
4✔
331

4✔
332
        ctx := context.Background()
4✔
333
        if tenant != "" {
4✔
334
                ctx = identity.WithContext(ctx, &identity.Identity{
×
335
                        Tenant: tenant,
×
336
                })
×
337
        }
×
338

339
        err := reindexDeploymentsReporting(ctx, db, wflows, tenant, throttleDuration, dryRun)
4✔
340
        if err != nil {
4✔
341
                l.Infof("Done with DB %s, but there were errors: %s.", dbname, err.Error())
×
342
        } else {
4✔
343
                l.Infof("Done with DB %s", dbname)
4✔
344
        }
4✔
345

346
        return err
4✔
347
}
348

349
func reindexDeploymentsReporting(
350
        ctx context.Context,
351
        db store.DataStore,
352
        wflows workflows.Client,
353
        tenant string,
354
        throttleDuration time.Duration,
355
        dryRun bool,
356
) error {
4✔
357
        var skip int
4✔
358

4✔
359
        nextIterAt := time.Now()
4✔
360
        skip = 0
4✔
361
        for {
8✔
362
                dd, err := db.GetDeviceDeployments(ctx, skip, deviceDeploymentsBatchSize, "", nil, true)
4✔
363
                if err != nil {
4✔
364
                        return errors.Wrap(err, "failed to get device deployments")
×
365
                }
×
366

367
                if len(dd) < 1 {
4✔
368
                        break
×
369
                }
370

371
                if !dryRun {
6✔
372
                        deviceDeployments := make([]workflows.DeviceDeploymentShortInfo, len(dd))
2✔
373
                        for i, d := range dd {
6✔
374
                                deviceDeployments[i].ID = d.Id
4✔
375
                                deviceDeployments[i].DeviceID = d.DeviceId
4✔
376
                                deviceDeployments[i].DeploymentID = d.DeploymentId
4✔
377
                        }
4✔
378
                        err := wflows.StartReindexReportingDeploymentBatch(ctx, deviceDeployments)
2✔
379
                        if err != nil {
2✔
380
                                return err
×
381
                        }
×
382
                }
383

384
                skip += deviceDeploymentsBatchSize
4✔
385
                if len(dd) < deviceDeploymentsBatchSize {
8✔
386
                        break
4✔
387
                }
NEW
388
                nextIterAt.Add(throttleDuration)
×
NEW
389
                time.Sleep(time.Until(nextIterAt))
×
390
        }
391
        return nil
4✔
392
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc