• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / deployments / 890874106

pending completion
890874106

Pull #867

gitlab-ci

kjaskiewiczz
feat: make releases persistent in the database

Changelog: Title
Ticket: MEN-5180

Signed-off-by: Krzysztof Jaskiewicz <krzysztof.jaskiewicz@northern.tech>
Pull Request #867: feat: make releases persistent in the database

81 of 239 new or added lines in 5 files covered. (33.89%)

1 existing line in 1 file now uncovered.

7174 of 9174 relevant lines covered (78.2%)

67.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.57
/main.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package main
16

17
import (
18
        "context"
19
        "fmt"
20
        "os"
21
        "time"
22

23
        "github.com/mendersoftware/go-lib-micro/config"
24
        "github.com/mendersoftware/go-lib-micro/identity"
25
        "github.com/mendersoftware/go-lib-micro/log"
26
        mstore "github.com/mendersoftware/go-lib-micro/store"
27
        "github.com/pkg/errors"
28
        "github.com/urfave/cli"
29

30
        "github.com/mendersoftware/deployments/app"
31
        "github.com/mendersoftware/deployments/client/workflows"
32
        dconfig "github.com/mendersoftware/deployments/config"
33
        "github.com/mendersoftware/deployments/store"
34
        "github.com/mendersoftware/deployments/store/mongo"
35
)
36

37
// deviceDeploymentsBatchSize is the page size used when scanning device
// deployments for reindexing: it is both the limit passed to
// GetDeviceDeployments and the amount the skip offset advances per batch.
const deviceDeploymentsBatchSize = 512

// cliDefaultRateLimit is the default value of the propagate-reporting
// command's --rate-limit flag (reindexing batch requests per second).
const cliDefaultRateLimit = 50
42

43
func main() {
×
44
        doMain(os.Args)
×
45
}
×
46

47
func doMain(args []string) {
1✔
48

1✔
49
        var configPath string
1✔
50

1✔
51
        app := cli.NewApp()
1✔
52
        app.Usage = "Deployments Service"
1✔
53

1✔
54
        app.Flags = []cli.Flag{
1✔
55
                cli.StringFlag{
1✔
56
                        Name: "config",
1✔
57
                        Usage: "Configuration `FILE`." +
1✔
58
                                " Supports JSON, TOML, YAML and HCL formatted configs.",
1✔
59
                        Destination: &configPath,
1✔
60
                },
1✔
61
        }
1✔
62

1✔
63
        app.Commands = []cli.Command{
1✔
64
                {
1✔
65
                        Name:  "server",
1✔
66
                        Usage: "Run the service as a server",
1✔
67
                        Flags: []cli.Flag{
1✔
68
                                cli.BoolFlag{
1✔
69
                                        Name:  "automigrate",
1✔
70
                                        Usage: "Run database migrations before starting.",
1✔
71
                                },
1✔
72
                        },
1✔
73

1✔
74
                        Action: cmdServer,
1✔
75
                },
1✔
76
                {
1✔
77
                        Name:  "migrate",
1✔
78
                        Usage: "Run migrations and exit",
1✔
79
                        Flags: []cli.Flag{
1✔
80
                                cli.StringFlag{
1✔
81
                                        Name:  "tenant",
1✔
82
                                        Usage: "Tenant ID (optional).",
1✔
83
                                },
1✔
84
                        },
1✔
85

1✔
86
                        Action: cmdMigrate,
1✔
87
                },
1✔
88
                {
1✔
89
                        Name:  "propagate-reporting",
1✔
90
                        Usage: "Trigger a reindex of all the device deployments in the reporting services ",
1✔
91
                        Flags: []cli.Flag{
1✔
92
                                cli.StringFlag{
1✔
93
                                        Name:  "tenant_id",
1✔
94
                                        Usage: "Tenant ID (optional) - propagate for just a single tenant.",
1✔
95
                                },
1✔
96
                                cli.UintFlag{
1✔
97
                                        Name:  "rate-limit",
1✔
98
                                        Usage: "`N`umber of reindexing batch requests per second",
1✔
99
                                        Value: cliDefaultRateLimit,
1✔
100
                                },
1✔
101
                                cli.BoolFlag{
1✔
102
                                        Name: "dry-run",
1✔
103
                                        Usage: "Do not perform any modifications," +
1✔
104
                                                " just scan and print devices.",
1✔
105
                                },
1✔
106
                        },
1✔
107

1✔
108
                        Action: cmdPropagateReporting,
1✔
109
                },
1✔
110
                {
1✔
111
                        Name:  "storage-daemon",
1✔
112
                        Usage: "Start storage daemon cleaning up expired objects from storage",
1✔
113
                        Flags: []cli.Flag{
1✔
114
                                cli.DurationFlag{
1✔
115
                                        Name: "interval",
1✔
116
                                        Usage: "Time interval to run cleanup routine; " +
1✔
117
                                                "a value of 0 runs the daemon for one " +
1✔
118
                                                "iteration and terminates (cron mode).",
1✔
119
                                        Value: 0,
1✔
120
                                },
1✔
121
                                cli.DurationFlag{
1✔
122
                                        Name: "time-jitter",
1✔
123
                                        Usage: "The time jitter added for expired links. " +
1✔
124
                                                "Links must be expired for `DURATION` " +
1✔
125
                                                "to be removed.",
1✔
126
                                        Value: time.Second * 3,
1✔
127
                                },
1✔
128
                        },
1✔
129
                        Action: cmdStorageDaemon,
1✔
130
                },
1✔
131
        }
1✔
132

1✔
133
        app.Action = cmdServer
1✔
134
        app.Before = func(args *cli.Context) error {
2✔
135
                if err := dconfig.Setup(configPath); err != nil {
1✔
136
                        return cli.NewExitError(err.Error(), 1)
×
137
                }
×
138

139
                return nil
1✔
140
        }
141

142
        err := app.Run(args)
1✔
143
        if err != nil {
1✔
144
                log.NewEmpty().Fatal(err.Error())
×
145
        }
×
146
}
147

148
func cmdServer(args *cli.Context) error {
1✔
149
        devSetup := args.GlobalBool("dev")
1✔
150

1✔
151
        l := log.New(log.Ctx{})
1✔
152

1✔
153
        if devSetup {
1✔
154
                l.Infof("setting up development configuration")
×
155
                config.Config.Set(dconfig.SettingMiddleware, dconfig.EnvDev)
×
156
        }
×
157

158
        l.Print("Deployments Service starting up")
1✔
159
        err := migrate("", args.Bool("automigrate"))
1✔
160
        if err != nil {
1✔
161
                return err
×
162
        }
×
163

164
        setupContext, cancel := context.WithTimeout(
1✔
165
                context.Background(),
1✔
166
                time.Second*30,
1✔
167
        )
1✔
168
        err = RunServer(setupContext)
1✔
169
        cancel()
1✔
170
        if err != nil {
1✔
171
                return cli.NewExitError(err.Error(), 4)
×
172
        }
×
173

174
        return nil
×
175
}
176

177
func cmdMigrate(args *cli.Context) error {
1✔
178
        tenant := args.String("tenant")
1✔
179
        return migrate(tenant, true)
1✔
180
}
1✔
181

182
func migrate(tenant string, automigrate bool) error {
1✔
183
        ctx := context.Background()
1✔
184

1✔
185
        dbClient, err := mongo.NewMongoClient(ctx, config.Config)
1✔
186
        if err != nil {
1✔
187
                return cli.NewExitError(
×
188
                        fmt.Sprintf("failed to connect to db: %v", err),
×
189
                        3)
×
190
        }
×
191
        defer func() {
2✔
192
                _ = dbClient.Disconnect(ctx)
1✔
193
        }()
1✔
194

195
        dbVersion := mongo.DbVersion
1✔
196
        if !automigrate {
1✔
NEW
197
                dbVersion = mongo.DbMinimumVersion
×
NEW
198
        }
×
199

200
        if tenant != "" {
1✔
201
                db := mstore.DbNameForTenant(tenant, mongo.DbName)
×
NEW
202
                err = mongo.MigrateSingle(ctx, db, dbVersion, dbClient, automigrate)
×
203
        } else {
1✔
204
                err = mongo.Migrate(ctx, dbVersion, dbClient, automigrate)
1✔
205
        }
1✔
206
        if err != nil {
1✔
207
                return cli.NewExitError(
×
208
                        fmt.Sprintf("failed to run migrations: %v", err),
×
209
                        3)
×
210
        }
×
211

212
        return nil
1✔
213
}
214

215
func cmdStorageDaemon(args *cli.Context) error {
×
216
        ctx := context.Background()
×
217
        objectStorage, err := SetupObjectStorage(ctx)
×
218
        if err != nil {
×
219
                return err
×
220
        }
×
221
        mgo, err := mongo.NewMongoClient(ctx, config.Config)
×
222
        if err != nil {
×
223
                return err
×
224
        }
×
225
        database := mongo.NewDataStoreMongoWithClient(mgo)
×
226
        app := app.NewDeployments(database, objectStorage)
×
227
        return app.CleanupExpiredUploads(
×
228
                ctx,
×
229
                args.Duration("interval"),
×
230
                args.Duration("time-jitter"),
×
231
        )
×
232
}
233

234
func cmdPropagateReporting(args *cli.Context) error {
×
235
        if config.Config.GetString(dconfig.SettingReportingAddr) == "" {
×
236
                return cli.NewExitError(errors.New("reporting address not configured"), 1)
×
237
        }
×
238
        c := config.Config
×
239
        ctx, cancel := context.WithTimeout(
×
240
                context.Background(),
×
241
                time.Second*30,
×
242
        )
×
243
        defer cancel()
×
244
        dbClient, err := mongo.NewMongoClient(ctx, c)
×
245
        if err != nil {
×
246
                return err
×
247
        }
×
248
        defer func() {
×
249
                _ = dbClient.Disconnect(context.Background())
×
250
        }()
×
251

252
        db := mongo.NewDataStoreMongoWithClient(dbClient)
×
253

×
254
        wflows := workflows.NewClient()
×
255

×
256
        var requestPeriod time.Duration
×
257
        rateLimit := args.Uint("rate-limit")
×
258
        if rateLimit > 0 {
×
259
                requestPeriod = time.Second / time.Duration(args.Uint("rate-limit"))
×
260
        }
×
261

262
        err = propagateReporting(
×
263
                db,
×
264
                wflows,
×
265
                args.String("tenant_id"),
×
266
                requestPeriod,
×
267
                args.Bool("dry-run"),
×
268
        )
×
269
        if err != nil {
×
270
                return cli.NewExitError(err, 7)
×
271
        }
×
272
        return nil
×
273
}
274

275
func propagateReporting(
276
        db store.DataStore,
277
        wflows workflows.Client,
278
        tenant string,
279
        requestPeriod time.Duration,
280
        dryRun bool,
281
) error {
4✔
282
        l := log.NewEmpty()
4✔
283

4✔
284
        dbs, err := selectDbs(db, tenant)
4✔
285
        if err != nil {
4✔
286
                return errors.Wrap(err, "aborting")
×
287
        }
×
288

289
        var errReturned error
4✔
290
        for _, d := range dbs {
8✔
291
                err := tryPropagateReportingForDb(db, wflows, d, requestPeriod, dryRun)
4✔
292
                if err != nil {
4✔
293
                        errReturned = err
×
294
                        l.Errorf("giving up on DB %s due to fatal error: %s", d, err.Error())
×
295
                        continue
×
296
                }
297
        }
298

299
        l.Info("all DBs processed, exiting.")
4✔
300
        return errReturned
4✔
301
}
302

303
func selectDbs(db store.DataStore, tenant string) ([]string, error) {
4✔
304
        l := log.NewEmpty()
4✔
305

4✔
306
        var dbs []string
4✔
307

4✔
308
        if tenant != "" {
4✔
309
                l.Infof("propagating deployments history for user-specified tenant %s", tenant)
×
310
                n := mstore.DbNameForTenant(tenant, mongo.DbName)
×
311
                dbs = []string{n}
×
312
        } else {
4✔
313
                l.Infof("propagating deployments history for all tenants")
4✔
314

4✔
315
                // infer if we're in ST or MT
4✔
316
                tdbs, err := db.GetTenantDbs()
4✔
317
                if err != nil {
4✔
318
                        return nil, errors.Wrap(err, "failed to retrieve tenant DBs")
×
319
                }
×
320

321
                if len(tdbs) == 0 {
4✔
322
                        l.Infof("no tenant DBs found - will try the default database %s", mongo.DbName)
×
323
                        dbs = []string{mongo.DbName}
×
324
                } else {
4✔
325
                        dbs = tdbs
4✔
326
                }
4✔
327
        }
328

329
        return dbs, nil
4✔
330
}
331

332
func tryPropagateReportingForDb(
333
        db store.DataStore,
334
        wflows workflows.Client,
335
        dbname string,
336
        requestPeriod time.Duration,
337
        dryRun bool,
338
) error {
4✔
339
        l := log.NewEmpty()
4✔
340

4✔
341
        l.Infof("propagating deployments data to reporting from DB: %s", dbname)
4✔
342

4✔
343
        tenant := mstore.TenantFromDbName(dbname, mongo.DbName)
4✔
344

4✔
345
        ctx := context.Background()
4✔
346
        if tenant != "" {
4✔
347
                ctx = identity.WithContext(ctx, &identity.Identity{
×
348
                        Tenant: tenant,
×
349
                })
×
350
        }
×
351

352
        err := reindexDeploymentsReporting(ctx, db, wflows, tenant, requestPeriod, dryRun)
4✔
353
        if err != nil {
4✔
354
                l.Infof("Done with DB %s, but there were errors: %s.", dbname, err.Error())
×
355
        } else {
4✔
356
                l.Infof("Done with DB %s", dbname)
4✔
357
        }
4✔
358

359
        return err
4✔
360
}
361

362
func reindexDeploymentsReporting(
363
        ctx context.Context,
364
        db store.DataStore,
365
        wflows workflows.Client,
366
        tenant string,
367
        requestPeriod time.Duration,
368
        dryRun bool,
369
) error {
4✔
370
        var skip int
4✔
371

4✔
372
        done := ctx.Done()
4✔
373
        ticker := time.NewTicker(requestPeriod)
4✔
374
        defer ticker.Stop()
4✔
375
        skip = 0
4✔
376
        for {
8✔
377
                dd, err := db.GetDeviceDeployments(ctx, skip, deviceDeploymentsBatchSize, "", nil, true)
4✔
378
                if err != nil {
4✔
379
                        return errors.Wrap(err, "failed to get device deployments")
×
380
                }
×
381

382
                if len(dd) < 1 {
4✔
383
                        break
×
384
                }
385

386
                if !dryRun {
6✔
387
                        deviceDeployments := make([]workflows.DeviceDeploymentShortInfo, len(dd))
2✔
388
                        for i, d := range dd {
6✔
389
                                deviceDeployments[i].ID = d.Id
4✔
390
                                deviceDeployments[i].DeviceID = d.DeviceId
4✔
391
                                deviceDeployments[i].DeploymentID = d.DeploymentId
4✔
392
                        }
4✔
393
                        err := wflows.StartReindexReportingDeploymentBatch(ctx, deviceDeployments)
2✔
394
                        if err != nil {
2✔
395
                                return err
×
396
                        }
×
397
                }
398

399
                skip += deviceDeploymentsBatchSize
4✔
400
                if len(dd) < deviceDeploymentsBatchSize {
8✔
401
                        break
4✔
402
                }
403
                select {
×
404
                case <-ticker.C:
×
405

406
                case <-done:
×
407
                        return ctx.Err()
×
408
                }
409
        }
410
        return nil
4✔
411
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc