• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / deviceauth / 826658230

pending completion
826658230

Pull #638

gitlab-ci

Peter Grzybowski
chore: moving to single db
Pull Request #638: chore: moving to single db

334 of 405 new or added lines in 5 files covered. (82.47%)

38 existing lines in 3 files now uncovered.

4669 of 5588 relevant lines covered (83.55%)

75.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.9
/cmd/commands.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//        Licensed under the Apache License, Version 2.0 (the "License");
4
//        you may not use this file except in compliance with the License.
5
//        You may obtain a copy of the License at
6
//
7
//            http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//        Unless required by applicable law or agreed to in writing, software
10
//        distributed under the License is distributed on an "AS IS" BASIS,
11
//        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//        See the License for the specific language governing permissions and
13
//        limitations under the License.
14
package cmd
15

16
import (
17
        "context"
18
        "fmt"
19

20
        "github.com/mendersoftware/go-lib-micro/config"
21
        "github.com/mendersoftware/go-lib-micro/identity"
22
        "github.com/mendersoftware/go-lib-micro/log"
23
        "github.com/mendersoftware/go-lib-micro/mongo/migrate"
24
        mstore "github.com/mendersoftware/go-lib-micro/store"
25
        "github.com/pkg/errors"
26

27
        cinv "github.com/mendersoftware/deviceauth/client/inventory"
28
        "github.com/mendersoftware/deviceauth/client/orchestrator"
29
        "github.com/mendersoftware/deviceauth/client/tenant"
30
        dconfig "github.com/mendersoftware/deviceauth/config"
31
        "github.com/mendersoftware/deviceauth/model"
32
        "github.com/mendersoftware/deviceauth/store"
33
        "github.com/mendersoftware/deviceauth/store/mongo"
34
        "github.com/mendersoftware/deviceauth/utils"
35
)
36

37
var NowUnixMilis = utils.UnixMilis
38

39
func makeDataStoreConfig() mongo.DataStoreMongoConfig {
1✔
40
        return mongo.DataStoreMongoConfig{
1✔
41
                ConnectionString: config.Config.GetString(dconfig.SettingDb),
1✔
42

1✔
43
                SSL:           config.Config.GetBool(dconfig.SettingDbSSL),
1✔
44
                SSLSkipVerify: config.Config.GetBool(dconfig.SettingDbSSLSkipVerify),
1✔
45

1✔
46
                Username: config.Config.GetString(dconfig.SettingDbUsername),
1✔
47
                Password: config.Config.GetString(dconfig.SettingDbPassword),
1✔
48
        }
1✔
49

1✔
50
}
1✔
51

52
func Migrate(c config.Reader, tenant string, listTenantsFlag bool) error {
1✔
53
        db, err := mongo.NewDataStoreMongo(makeDataStoreConfig())
1✔
54

1✔
55
        if err != nil {
1✔
56
                return errors.Wrap(err, "failed to connect to db")
×
57
        }
×
58

59
        // list tenants only
60
        if listTenantsFlag {
2✔
61
                return listTenants(db)
1✔
62
        }
1✔
63

64
        db = db.WithAutomigrate().(*mongo.DataStoreMongo)
1✔
65

1✔
66
        if config.Config.Get(dconfig.SettingTenantAdmAddr) != "" {
2✔
67
                db = db.WithMultitenant()
1✔
68
        }
1✔
69

70
        ctx := context.Background()
1✔
71
        if tenant == "" {
2✔
72
                err = db.Migrate(ctx, mongo.DbVersion)
1✔
73
        } else {
2✔
74
                tenantCtx := identity.WithContext(ctx, &identity.Identity{
1✔
75
                        Tenant: tenant,
1✔
76
                })
1✔
77
                dbname := mstore.DbFromContext(tenantCtx, mongo.DbName)
1✔
78
                err = db.MigrateTenant(tenantCtx, dbname, mongo.DbVersion)
1✔
79
        }
1✔
80
        if err != nil {
1✔
81
                return errors.Wrap(err, "failed to run migrations")
×
82
        }
×
83

84
        return nil
1✔
85
}
86

87
func listTenants(db *mongo.DataStoreMongo) error {
1✔
88
        tdbs, err := db.ListTenantsIds(context.Background())
1✔
89
        if err != nil {
1✔
NEW
90
                return errors.Wrap(err, "failed to retrieve tenant ids")
×
91
        }
×
92

93
        for _, tenant := range tdbs {
2✔
94
                fmt.Println(tenant)
1✔
95
        }
1✔
96

97
        return nil
1✔
98
}
99

100
func Maintenance(decommissioningCleanupFlag bool, tenant string, dryRunFlag bool) error {
×
101
        db, err := mongo.NewDataStoreMongo(makeDataStoreConfig())
×
102
        if err != nil {
×
103
                return errors.Wrap(err, "failed to connect to db")
×
104
        }
×
105

106
        return maintenanceWithDataStore(decommissioningCleanupFlag, tenant, dryRunFlag, db)
×
107
}
108

109
func maintenanceWithDataStore(
110
        decommissioningCleanupFlag bool,
111
        tenant string,
112
        dryRunFlag bool,
113
        db *mongo.DataStoreMongo,
114
) error {
16✔
115
        // cleanup devauth database from leftovers after failed decommissioning
16✔
116
        if decommissioningCleanupFlag {
28✔
117
                return decommissioningCleanup(db, tenant, dryRunFlag)
12✔
118
        }
12✔
119

120
        return nil
4✔
121
}
122

123
func decommissioningCleanup(db *mongo.DataStoreMongo, tenant string, dryRunFlag bool) error {
12✔
124
        if dryRunFlag {
18✔
125
                return decommissioningCleanupDryRun(db, tenant)
6✔
126
        } else {
12✔
127
                return decommissioningCleanupExecute(db, tenant)
6✔
128
        }
6✔
129
}
130

131
func decommissioningCleanupDryRun(db *mongo.DataStoreMongo, tenantId string) error {
12✔
132
        //devices
12✔
133
        devices, err := db.GetDevicesBeingDecommissioned(tenantId)
12✔
134
        if err != nil {
12✔
135
                return err
×
136
        }
×
137
        if len(devices) > 0 {
16✔
138
                fmt.Println("devices with decommissioning flag set:")
4✔
139
                for _, dev := range devices {
8✔
140
                        fmt.Println(dev.Id)
4✔
141
                }
4✔
142
        }
143

144
        //auth sets
145
        authSetIds, err := db.GetBrokenAuthSets(tenantId)
12✔
146
        if err != nil {
12✔
147
                return err
×
148
        }
×
149
        if len(authSetIds) > 0 {
16✔
150
                fmt.Println("authentication sets to be removed:")
4✔
151
                for _, authSetId := range authSetIds {
8✔
152
                        fmt.Println(authSetId)
4✔
153
                }
4✔
154
        }
155

156
        return nil
12✔
157
}
158

159
func decommissioningCleanupExecute(db *mongo.DataStoreMongo, tenantId string) error {
6✔
160
        if err := decommissioningCleanupDryRun(db, tenantId); err != nil {
6✔
161
                return err
×
162
        }
×
163

164
        if err := db.DeleteDevicesBeingDecommissioned(tenantId); err != nil {
6✔
165
                return err
×
166
        }
×
167

168
        if err := db.DeleteBrokenAuthSets(tenantId); err != nil {
6✔
169
                return err
×
170
        }
×
171

172
        return nil
6✔
173
}
174

175
func PropagateStatusesInventory(
176
        db store.DataStore,
177
        c cinv.Client,
178
        tenant string,
179
        migrationVersion string,
180
        dryRun bool,
181
) error {
12✔
182
        var err error
12✔
183

12✔
184
        l := log.NewEmpty()
12✔
185
        tenants := []string{tenant}
12✔
186
        if tenant == "" {
20✔
187
                tenants, err = db.ListTenantsIds(context.Background())
8✔
188
                if err != nil {
10✔
189
                        return errors.Wrap(err, "cant list tenants")
2✔
190
                }
2✔
191
        }
192

193
        var errReturned error
10✔
194
        for _, t := range tenants {
32✔
195
                err = tryPropagateStatusesInventoryForTenant(db, c, t, migrationVersion, dryRun)
22✔
196
                if err != nil {
36✔
197
                        errReturned = err
14✔
198
                        l.Errorf("giving up on tenant %s due to fatal error: %s", t, err.Error())
14✔
199
                        continue
14✔
200
                }
201
        }
202

203
        l.Info("all tenants processed, exiting.")
10✔
204
        return errReturned
10✔
205
}
206

207
func PropagateIdDataInventory(db store.DataStore, c cinv.Client, tenant string, dryRun bool) error {
×
208
        l := log.NewEmpty()
×
209

×
210
        dbs, err := selectDbs(db, tenant)
×
211
        if err != nil {
×
212
                return errors.Wrap(err, "aborting")
×
213
        }
×
214

215
        var errReturned error
×
216
        for _, d := range dbs {
×
217
                err := tryPropagateIdDataInventoryForDb(db, c, d, dryRun)
×
218
                if err != nil {
×
219
                        errReturned = err
×
220
                        l.Errorf("giving up on DB %s due to fatal error: %s", d, err.Error())
×
221
                        continue
×
222
                }
223
        }
224

225
        l.Info("all DBs processed, exiting.")
×
226
        return errReturned
×
227
}
228

229
func PropagateReporting(db store.DataStore, wflows orchestrator.ClientRunner, tenant string,
230
        dryRun bool) error {
8✔
231
        l := log.NewEmpty()
8✔
232

8✔
233
        mapFunc := func(ctx context.Context) error {
24✔
234
                id := identity.FromContext(ctx)
16✔
235
                if id == nil || id.Tenant == "" {
16✔
NEW
236
                        // Not a tenant db - skip!
×
NEW
237
                        return nil
×
NEW
238
                }
×
239
                tenantId := id.Tenant
16✔
240
                return tryPropagateReportingForTenant(db, wflows, tenantId, dryRun)
16✔
241
        }
242
        if tenant != "" {
8✔
NEW
243
                ctx := identity.WithContext(context.Background(),
×
NEW
244
                        &identity.Identity{
×
NEW
245
                                Tenant: tenant,
×
NEW
246
                        },
×
NEW
247
                )
×
NEW
248
                err := mapFunc(ctx)
×
UNCOV
249
                if err != nil {
×
NEW
250
                        return errors.Wrap(err, "failed to propagate for given tenant")
×
UNCOV
251
                }
×
NEW
252
                l.Infof("tenant processed, exiting.")
×
253
        } else {
8✔
254
                err := db.ForEachTenant(context.Background(), mapFunc)
8✔
255
                if err != nil {
8✔
NEW
256
                        return errors.Wrap(err, "failed to propagate for all tenant")
×
NEW
257
                }
×
258
                l.Info("all tenants processed, exiting.")
8✔
259
        }
260
        return nil
8✔
261
}
262

UNCOV
263
func selectDbs(db store.DataStore, tenant string) ([]string, error) {
×
UNCOV
264
        l := log.NewEmpty()
×
UNCOV
265

×
UNCOV
266
        var dbs []string
×
UNCOV
267

×
UNCOV
268
        if tenant != "" {
×
UNCOV
269
                l.Infof("propagating inventory for user-specified tenant %s", tenant)
×
UNCOV
270
                n := mstore.DbNameForTenant(tenant, mongo.DbName)
×
UNCOV
271
                dbs = []string{n}
×
UNCOV
272
        }
×
273

UNCOV
274
        return dbs, nil
×
275
}
276

277
const (
278
        devicesBatchSize = 512
279
)
280

281
func updateDevicesStatus(
282
        ctx context.Context,
283
        db store.DataStore,
284
        c cinv.Client,
285
        tenant string,
286
        status string,
287
        dryRun bool,
288
) error {
110✔
289
        var skip uint
110✔
290

110✔
291
        skip = 0
110✔
292
        for {
220✔
293
                devices, err := db.GetDevices(ctx,
110✔
294
                        skip,
110✔
295
                        devicesBatchSize,
110✔
296
                        model.DeviceFilter{Status: []string{status}},
110✔
297
                )
110✔
298
                if err != nil {
140✔
299
                        return errors.Wrap(err, "failed to get devices")
30✔
300
                }
30✔
301

302
                if len(devices) < 1 {
80✔
303
                        break
×
304
                }
305

306
                deviceUpdates := make([]model.DeviceInventoryUpdate, len(devices))
80✔
307

80✔
308
                for i, d := range devices {
240✔
309
                        deviceUpdates[i].Id = d.Id
160✔
310
                        deviceUpdates[i].Revision = d.Revision
160✔
311
                }
160✔
312

313
                if !dryRun {
160✔
314
                        err = c.SetDeviceStatus(ctx, tenant, deviceUpdates, status)
80✔
315
                        if err != nil {
110✔
316
                                return err
30✔
317
                        }
30✔
318
                }
319

320
                if len(devices) < devicesBatchSize {
100✔
321
                        break
50✔
322
                } else {
×
323
                        skip += devicesBatchSize
×
324
                }
×
325
        }
326
        return nil
50✔
327
}
328

329
func updateDevicesIdData(
330
        ctx context.Context,
331
        db store.DataStore,
332
        c cinv.Client,
333
        tenant string,
334
        dryRun bool,
335
) error {
×
336
        var skip uint
×
337

×
338
        skip = 0
×
339
        for {
×
340
                devices, err := db.GetDevices(ctx, skip, devicesBatchSize, model.DeviceFilter{})
×
341
                if err != nil {
×
342
                        return errors.Wrap(err, "failed to get devices")
×
343
                }
×
344

345
                if len(devices) < 1 {
×
346
                        break
×
347
                }
348

349
                if !dryRun {
×
350
                        for _, d := range devices {
×
351
                                err := c.SetDeviceIdentity(ctx, tenant, d.Id, d.IdDataStruct)
×
352
                                if err != nil {
×
353
                                        return err
×
354
                                }
×
355
                        }
356
                }
357

358
                skip += devicesBatchSize
×
359
                if len(devices) < devicesBatchSize {
×
360
                        break
×
361
                }
362
        }
363
        return nil
×
364
}
365

366
func tryPropagateStatusesInventoryForTenant(
367
        db store.DataStore,
368
        c cinv.Client,
369
        tenant string,
370
        migrationVersion string,
371
        dryRun bool,
372
) error {
22✔
373
        l := log.NewEmpty()
22✔
374

22✔
375
        l.Infof("propagating device statuses to inventory from tenant: %s", tenant)
22✔
376

22✔
377
        ctx := context.Background()
22✔
378
        if tenant != "" {
44✔
379
                ctx = identity.WithContext(ctx, &identity.Identity{
22✔
380
                        Tenant: tenant,
22✔
381
                })
22✔
382
        }
22✔
383

384
        var err error
22✔
385
        var errReturned error
22✔
386
        for _, status := range model.DevStatuses {
132✔
387
                err = updateDevicesStatus(ctx, db, c, tenant, status, dryRun)
110✔
388
                if err != nil {
170✔
389
                        l.Infof(
60✔
390
                                "Done with tenant %s status=%s, but there were errors: %s.",
60✔
391
                                tenant,
60✔
392
                                status,
60✔
393
                                err.Error(),
60✔
394
                        )
60✔
395
                        errReturned = err
60✔
396
                } else {
110✔
397
                        l.Infof("Done with tenant %s status=%s", tenant, status)
50✔
398
                }
50✔
399
        }
400
        if migrationVersion != "" && !dryRun {
26✔
401
                if errReturned != nil {
4✔
402
                        l.Warnf(
×
NEW
403
                                "Will not store %s migration version for tenant %s due to errors.",
×
404
                                migrationVersion,
×
NEW
405
                                tenant,
×
406
                        )
×
407
                } else {
4✔
408
                        version, err := migrate.NewVersion(migrationVersion)
4✔
409
                        if version == nil || err != nil {
6✔
410
                                l.Warnf(
2✔
411
                                        "Will not store %s migration version in %s.migration_info due to bad version"+
2✔
412
                                                " provided.",
2✔
413
                                        migrationVersion,
2✔
414
                                        tenant,
2✔
415
                                )
2✔
416
                                errReturned = err
2✔
417
                        } else {
4✔
418
                                _ = db.StoreMigrationVersion(ctx, version)
2✔
419
                        }
2✔
420
                }
421
        }
422

423
        return errReturned
22✔
424
}
425

426
func tryPropagateIdDataInventoryForDb(
427
        db store.DataStore,
428
        c cinv.Client,
429
        dbname string,
430
        dryRun bool,
431
) error {
×
432
        l := log.NewEmpty()
×
433

×
434
        l.Infof("propagating device id_data to inventory from DB: %s", dbname)
×
435

×
436
        tenant := mstore.TenantFromDbName(dbname, mongo.DbName)
×
437

×
438
        ctx := context.Background()
×
439
        if tenant != "" {
×
440
                ctx = identity.WithContext(ctx, &identity.Identity{
×
441
                        Tenant: tenant,
×
442
                })
×
443
        }
×
444

445
        err := updateDevicesIdData(ctx, db, c, tenant, dryRun)
×
446
        if err != nil {
×
447
                l.Infof("Done with DB %s, but there were errors: %s.", dbname, err.Error())
×
448
        } else {
×
449
                l.Infof("Done with DB %s", dbname)
×
450
        }
×
451

452
        return err
×
453
}
454

455
func tryPropagateReportingForTenant(
456
        db store.DataStore,
457
        wflows orchestrator.ClientRunner,
458
        tenant string,
459
        dryRun bool,
460
) error {
16✔
461
        l := log.NewEmpty()
16✔
462

16✔
463
        l.Infof("propagating device data to reporting for tenant %s", tenant)
16✔
464

16✔
465
        ctx := context.Background()
16✔
466
        if tenant != "" {
32✔
467
                ctx = identity.WithContext(ctx, &identity.Identity{
16✔
468
                        Tenant: tenant,
16✔
469
                })
16✔
470
        } else {
16✔
NEW
471
                return errors.New("you must provide a tenant id")
×
UNCOV
472
        }
×
473

474
        err := reindexDevicesReporting(ctx, db, wflows, dryRun)
16✔
475
        if err != nil {
20✔
476
                l.Infof("Done with tenant %s, but there were errors: %s.", tenant, err.Error())
4✔
477
        } else {
16✔
478
                l.Infof("Done with tenant %s", tenant)
12✔
479
        }
12✔
480

481
        return err
16✔
482
}
483

484
func reindexDevicesReporting(
485
        ctx context.Context,
486
        db store.DataStore,
487
        wflows orchestrator.ClientRunner,
488
        dryRun bool,
489
) error {
16✔
490
        var skip uint
16✔
491

16✔
492
        skip = 0
16✔
493
        for {
32✔
494
                devices, err := db.GetDevices(ctx, skip, devicesBatchSize, model.DeviceFilter{})
16✔
495
                if err != nil {
18✔
496
                        return errors.Wrap(err, "failed to get devices")
2✔
497
                }
2✔
498

499
                if len(devices) < 1 {
14✔
500
                        break
×
501
                }
502

503
                if !dryRun {
22✔
504
                        deviceIDs := make([]string, len(devices))
8✔
505
                        for i, d := range devices {
22✔
506
                                deviceIDs[i] = d.Id
14✔
507
                        }
14✔
508
                        err := wflows.SubmitReindexReportingBatch(ctx, deviceIDs)
8✔
509
                        if err != nil {
10✔
510
                                return err
2✔
511
                        }
2✔
512
                }
513

514
                skip += devicesBatchSize
12✔
515
                if len(devices) < devicesBatchSize {
24✔
516
                        break
12✔
517
                }
518
        }
519
        return nil
12✔
520
}
521

522
const (
523
        WorkflowsDeviceLimitText    = "@/etc/workflows-enterprise/data/device_limit_email.txt"
524
        WorkflowsDeviceLimitHTML    = "@/etc/workflows-enterprise/data/device_limit_email.html"
525
        WorkflowsDeviceLimitSubject = "Device limit almost reached"
526
)
527

528
func warnTenantUsers(
529
        ctx context.Context,
530
        tenantID string,
531
        tadm tenant.ClientRunner,
532
        wflows orchestrator.ClientRunner,
533
        remainingDevices uint,
534
) error {
11✔
535
        users, err := tadm.GetTenantUsers(ctx, tenantID)
11✔
536
        if err != nil {
13✔
537
                // Log the event and continue with the other tenants
2✔
538
                return err
2✔
539
        }
2✔
540
        for i := range users {
24✔
541
                warnWFlow := orchestrator.DeviceLimitWarning{
15✔
542
                        RequestID:      "deviceAuthAdmin",
15✔
543
                        RecipientEmail: users[i].Email,
15✔
544

15✔
545
                        Subject:          WorkflowsDeviceLimitSubject,
15✔
546
                        Body:             WorkflowsDeviceLimitText,
15✔
547
                        BodyHTML:         WorkflowsDeviceLimitHTML,
15✔
548
                        RemainingDevices: &remainingDevices,
15✔
549
                }
15✔
550
                err = wflows.SubmitDeviceLimitWarning(ctx, warnWFlow)
15✔
551
                if err != nil {
17✔
552
                        return err
2✔
553
                }
2✔
554
        }
555
        return nil
7✔
556
}
557

558
// CheckDeviceLimits goes through all tenant databases and checks if the number
559
// of accepted devices is above a given threshold (in %) and sends an email
560
// to all registered users registered under the given tenant.
561
func CheckDeviceLimits(
562
        threshold float64,
563
        ds store.DataStore,
564
        tadm tenant.ClientRunner,
565
        wflows orchestrator.ClientRunner,
566
) error {
11✔
567
        // Sanitize threshold
11✔
568
        if threshold > 100.0 {
13✔
569
                threshold = 100.0
2✔
570
        } else if threshold < 0.0 {
13✔
571
                threshold = 0.0
2✔
572
        }
2✔
573
        threshProportion := threshold / 100.0
11✔
574

11✔
575
        // mapFunc is applied to all existing databases in datastore.
11✔
576
        mapFunc := func(ctx context.Context) error {
36✔
577
                id := identity.FromContext(ctx)
25✔
578
                if id == nil || id.Tenant == "" {
27✔
579
                        // Not a tenant db - skip!
2✔
580
                        return nil
2✔
581
                }
2✔
582
                tenantID := id.Tenant
23✔
583
                l := log.FromContext(ctx)
23✔
584

23✔
585
                lim, err := ds.GetLimit(ctx, model.LimitMaxDeviceCount)
23✔
586
                if err != nil {
25✔
587
                        return err
2✔
588
                }
2✔
589
                n, err := ds.GetDevCountByStatus(ctx, model.DevStatusAccepted)
21✔
590
                if err != nil {
23✔
591
                        return err
2✔
592
                }
2✔
593
                if float64(n) >= (float64(lim.Value) * threshProportion) {
30✔
594
                        // User is above limit
11✔
595

11✔
596
                        remainingUsers := uint(n) - uint(lim.Value)
11✔
597
                        err := warnTenantUsers(ctx, tenantID, tadm, wflows, remainingUsers)
11✔
598
                        if err != nil {
15✔
599
                                l.Warnf(`Failed to warn tenant "%s" `+
4✔
600
                                        `users nearing device limit: %s`,
4✔
601
                                        tenantID, err.Error(),
4✔
602
                                )
4✔
603
                        }
4✔
604
                }
605
                return nil
19✔
606
        }
607
        // Start looping through the databases.
608
        return ds.ForEachTenant(
11✔
609
                context.Background(),
11✔
610
                mapFunc,
11✔
611
        )
11✔
612
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc