• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / inventory / 951607174

pending completion
951607174

Pull #400

gitlab-ci

merlin-northern
fix: attributes changed: compare timestamps with configurable threshold.

Changelog: Title
Ticket: MEN-6643
Signed-off-by: Peter Grzybowski <peter@northern.tech>
Pull Request #400: fix: attributes changed: compare timestamps with configurable threshold.

11 of 13 new or added lines in 3 files covered. (84.62%)

6 existing lines in 1 file now uncovered.

3240 of 3561 relevant lines covered (90.99%)

137.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.08
/store/mongo/datastore_mongo.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package mongo
16

17
import (
18
        "context"
19
        "crypto/tls"
20
        "fmt"
21
        "io"
22
        "strings"
23
        "sync"
24
        "time"
25

26
        "go.mongodb.org/mongo-driver/bson"
27
        "go.mongodb.org/mongo-driver/bson/primitive"
28
        "go.mongodb.org/mongo-driver/mongo"
29
        mopts "go.mongodb.org/mongo-driver/mongo/options"
30

31
        "github.com/google/uuid"
32
        "github.com/pkg/errors"
33

34
        "github.com/mendersoftware/go-lib-micro/log"
35
        mstore "github.com/mendersoftware/go-lib-micro/store"
36

37
        "github.com/mendersoftware/inventory/model"
38
        "github.com/mendersoftware/inventory/store"
39
        "github.com/mendersoftware/inventory/utils"
40
)
41

42
const (
43
        DbVersion = "1.1.0"
44

45
        DbName        = "inventory"
46
        DbDevicesColl = "devices"
47

48
        DbDevId              = "_id"
49
        DbDevAttributes      = "attributes"
50
        DbDevGroup           = "group"
51
        DbDevRevision        = "revision"
52
        DbDevUpdatedTs       = "updated_ts"
53
        DbDevAttributesText  = "text"
54
        DbDevAttributesTs    = "timestamp"
55
        DbDevAttributesDesc  = "description"
56
        DbDevAttributesValue = "value"
57
        DbDevAttributesScope = "scope"
58
        DbDevAttributesName  = "name"
59
        DbDevAttributesGroup = DbDevAttributes + "." +
60
                model.AttrScopeSystem + "-" + model.AttrNameGroup
61
        DbDevAttributesGroupValue = DbDevAttributesGroup + "." +
62
                DbDevAttributesValue
63

64
        DbScopeInventory = "inventory"
65

66
        FiltersAttributesLimit = 500
67

68
        attrIdentityStatus = "identity-status"
69
)
70

71
var (
72
        //with offcial mongodb supported driver we keep client
73
        clientGlobal *mongo.Client
74

75
        // once ensures client is created only once
76
        once sync.Once
77
)
78

79
type DataStoreMongoConfig struct {
80
        // connection string
81
        ConnectionString string
82

83
        // SSL support
84
        SSL           bool
85
        SSLSkipVerify bool
86

87
        // Overwrites credentials provided in connection string if provided
88
        Username string
89
        Password string
90
}
91

92
type DataStoreMongo struct {
93
        client      *mongo.Client
94
        automigrate bool
95
}
96

97
func NewDataStoreMongoWithSession(client *mongo.Client) store.DataStore {
192✔
98
        return &DataStoreMongo{client: client}
192✔
99
}
192✔
100

101
// config.ConnectionString must contain a valid
102
func NewDataStoreMongo(config DataStoreMongoConfig) (store.DataStore, error) {
5✔
103
        //init master session
5✔
104
        var err error
5✔
105
        once.Do(func() {
9✔
106
                if !strings.Contains(config.ConnectionString, "://") {
8✔
107
                        config.ConnectionString = "mongodb://" + config.ConnectionString
4✔
108
                }
4✔
109
                clientOptions := mopts.Client().ApplyURI(config.ConnectionString)
4✔
110

4✔
111
                if config.Username != "" {
4✔
112
                        clientOptions.SetAuth(mopts.Credential{
×
113
                                Username: config.Username,
×
114
                                Password: config.Password,
×
115
                        })
×
116
                }
×
117

118
                if config.SSL {
4✔
119
                        tlsConfig := &tls.Config{}
×
120
                        tlsConfig.InsecureSkipVerify = config.SSLSkipVerify
×
121
                        clientOptions.SetTLSConfig(tlsConfig)
×
122
                }
×
123

124
                ctx := context.Background()
4✔
125
                l := log.FromContext(ctx)
4✔
126
                clientGlobal, err = mongo.Connect(ctx, clientOptions)
4✔
127
                if err != nil {
4✔
128
                        l.Errorf("mongo: error connecting to mongo '%s'", err.Error())
×
129
                        return
×
130
                }
×
131
                if clientGlobal == nil {
4✔
132
                        l.Errorf("mongo: client is nil. wow.")
×
133
                        return
×
134
                }
×
135
                // from: https://www.mongodb.com/blog/post/mongodb-go-driver-tutorial
136
                /*
137
                        It is best practice to keep a client that is connected to MongoDB around so that the
138
                        application can make use of connection pooling - you don't want to open and close a
139
                        connection for each query. However, if your application no longer requires a connection,
140
                        the connection can be closed with client.Disconnect() like so:
141
                */
142
                err = clientGlobal.Ping(ctx, nil)
4✔
143
                if err != nil {
5✔
144
                        clientGlobal = nil
1✔
145
                        l.Errorf("mongo: error pinging mongo '%s'", err.Error())
1✔
146
                        return
1✔
147
                }
1✔
148
                if clientGlobal == nil {
3✔
149
                        l.Errorf("mongo: global instance of client is nil.")
×
150
                        return
×
151
                }
×
152
        })
153

154
        if clientGlobal == nil {
6✔
155
                return nil, errors.New("failed to open mongo-driver session")
1✔
156
        }
1✔
157
        db := &DataStoreMongo{client: clientGlobal}
4✔
158

4✔
159
        return db, nil
4✔
160
}
161

162
func (db *DataStoreMongo) Ping(ctx context.Context) error {
1✔
163
        res := db.client.Database(DbName).RunCommand(ctx, bson.M{"ping": 1})
1✔
164
        return res.Err()
1✔
165
}
1✔
166

167
func (db *DataStoreMongo) GetDevices(
168
        ctx context.Context,
169
        q store.ListQuery,
170
) ([]model.Device, int, error) {
53✔
171
        c := db.client.Database(mstore.DbFromContext(ctx, DbName)).Collection(DbDevicesColl)
53✔
172

53✔
173
        queryFilters := make([]bson.M, 0)
53✔
174
        for _, filter := range q.Filters {
60✔
175
                op := mongoOperator(filter.Operator)
7✔
176
                name := fmt.Sprintf(
7✔
177
                        "%s-%s",
7✔
178
                        filter.AttrScope,
7✔
179
                        model.GetDeviceAttributeNameReplacer().Replace(filter.AttrName),
7✔
180
                )
7✔
181
                field := fmt.Sprintf("%s.%s.%s", DbDevAttributes, name, DbDevAttributesValue)
7✔
182
                switch filter.Operator {
7✔
183
                default:
7✔
184
                        if filter.ValueFloat != nil {
11✔
185
                                queryFilters = append(queryFilters, bson.M{"$or": []bson.M{
4✔
186
                                        {field: bson.M{op: filter.Value}},
4✔
187
                                        {field: bson.M{op: filter.ValueFloat}},
4✔
188
                                }})
4✔
189
                        } else if filter.ValueTime != nil {
8✔
190
                                queryFilters = append(queryFilters, bson.M{"$or": []bson.M{
1✔
191
                                        {field: bson.M{op: filter.Value}},
1✔
192
                                        {field: bson.M{op: filter.ValueTime}},
1✔
193
                                }})
1✔
194
                        } else {
3✔
195
                                queryFilters = append(queryFilters, bson.M{field: bson.M{op: filter.Value}})
2✔
196
                        }
2✔
197
                }
198
        }
199
        if q.GroupName != "" {
85✔
200
                groupFilter := bson.M{DbDevAttributesGroupValue: q.GroupName}
32✔
201
                queryFilters = append(queryFilters, groupFilter)
32✔
202
        }
32✔
203
        if q.GroupName != "" {
85✔
204
                groupFilter := bson.M{DbDevAttributesGroupValue: q.GroupName}
32✔
205
                queryFilters = append(queryFilters, groupFilter)
32✔
206
        }
32✔
207
        if q.HasGroup != nil {
89✔
208
                groupExistenceFilter := bson.M{
36✔
209
                        DbDevAttributesGroup: bson.M{
36✔
210
                                "$exists": *q.HasGroup,
36✔
211
                        },
36✔
212
                }
36✔
213
                queryFilters = append(queryFilters, groupExistenceFilter)
36✔
214
        }
36✔
215

216
        findQuery := bson.M{}
53✔
217
        if len(queryFilters) > 0 {
96✔
218
                findQuery["$and"] = queryFilters
43✔
219
        }
43✔
220

221
        findOptions := mopts.Find()
53✔
222
        if q.Skip > 0 {
63✔
223
                findOptions.SetSkip(int64(q.Skip))
10✔
224
        }
10✔
225
        if q.Limit > 0 {
101✔
226
                findOptions.SetLimit(int64(q.Limit))
48✔
227
        }
48✔
228
        if q.Sort != nil {
58✔
229
                name := fmt.Sprintf(
5✔
230
                        "%s-%s",
5✔
231
                        q.Sort.AttrScope,
5✔
232
                        model.GetDeviceAttributeNameReplacer().Replace(q.Sort.AttrName),
5✔
233
                )
5✔
234
                sortField := fmt.Sprintf("%s.%s.%s", DbDevAttributes, name, DbDevAttributesValue)
5✔
235
                sortFieldQuery := bson.D{{Key: sortField, Value: 1}}
5✔
236
                if !q.Sort.Ascending {
8✔
237
                        sortFieldQuery[0].Value = -1
3✔
238
                }
3✔
239
                findOptions.SetSort(sortFieldQuery)
5✔
240
        }
241

242
        cursor, err := c.Find(ctx, findQuery, findOptions)
53✔
243
        if err != nil {
53✔
244
                return nil, -1, errors.Wrap(err, "failed to search devices")
×
245
        }
×
246
        defer cursor.Close(ctx)
53✔
247

53✔
248
        devices := []model.Device{}
53✔
249
        if err = cursor.All(ctx, &devices); err != nil {
53✔
250
                return nil, -1, errors.Wrap(err, "failed to search devices")
×
251
        }
×
252

253
        count, err := c.CountDocuments(ctx, findQuery)
53✔
254
        if err != nil {
53✔
255
                return nil, -1, errors.Wrap(err, "failed to count devices")
×
256
        }
×
257

258
        return devices, int(count), nil
53✔
259
}
260

261
func (db *DataStoreMongo) GetDevice(
262
        ctx context.Context,
263
        id model.DeviceID,
264
) (*model.Device, error) {
36✔
265
        var res model.Device
36✔
266
        c := db.client.
36✔
267
                Database(mstore.DbFromContext(ctx, DbName)).
36✔
268
                Collection(DbDevicesColl)
36✔
269
        l := log.FromContext(ctx)
36✔
270

36✔
271
        if id == model.NilDeviceID {
38✔
272
                return nil, nil
2✔
273
        }
2✔
274
        if err := c.FindOne(ctx, bson.M{DbDevId: id}).Decode(&res); err != nil {
38✔
275
                switch err {
4✔
276
                case mongo.ErrNoDocuments:
4✔
277
                        return nil, nil
4✔
278
                default:
×
279
                        l.Errorf("GetDevice: %v", err)
×
280
                        return nil, errors.Wrap(err, "failed to fetch device")
×
281
                }
282
        }
283
        return &res, nil
30✔
284
}
285

286
func (db *DataStoreMongo) GetDevicesById(
287
        ctx context.Context,
288
        id []model.DeviceID,
289
) ([]model.Device, error) {
277✔
290
        var res []model.Device
277✔
291
        c := db.client.
277✔
292
                Database(mstore.DbFromContext(ctx, DbName)).
277✔
293
                Collection(DbDevicesColl)
277✔
294
        l := log.FromContext(ctx)
277✔
295

277✔
296
        if len(id) < 1 {
278✔
297
                return nil, nil
1✔
298
        }
1✔
299
        r, err := c.Find(ctx, bson.M{DbDevId: bson.M{"$in": id}})
276✔
300
        if err != nil {
276✔
301
                switch err {
×
302
                case mongo.ErrNoDocuments:
×
303
                        return nil, nil
×
304
                default:
×
305
                        l.Errorf("GetDevicesById Find: %v", err)
×
306
                        return nil, errors.Wrap(err, "failed to fetch devices")
×
307
                }
308
        }
309
        err = r.All(ctx, &res)
276✔
310
        if err != nil {
276✔
311
                l.Errorf("GetDevicesById deocde: %v", err)
×
312
                return nil, errors.Wrap(err, "failed to decode devices")
×
313
        }
×
314
        return res, nil
276✔
315
}
316

317
// AddDevice inserts a new device, initializing the inventory data.
318
func (db *DataStoreMongo) AddDevice(ctx context.Context, dev *model.Device) error {
256✔
319
        if dev.Group != "" {
296✔
320
                dev.Attributes = append(dev.Attributes, model.DeviceAttribute{
40✔
321
                        Scope: model.AttrScopeSystem,
40✔
322
                        Name:  model.AttrNameGroup,
40✔
323
                        Value: dev.Group,
40✔
324
                })
40✔
325
        }
40✔
326
        _, err := db.UpsertDevicesAttributesWithUpdated(
256✔
327
                ctx, []model.DeviceID{dev.ID}, dev.Attributes, "", "", 0,
256✔
328
        )
256✔
329
        if err != nil {
256✔
330
                return errors.Wrap(err, "failed to store device")
×
331
        }
×
332
        return nil
256✔
333
}
334

335
func (db *DataStoreMongo) UpsertDevicesAttributesWithRevision(
336
        ctx context.Context,
337
        devices []model.DeviceUpdate,
338
        attrs model.DeviceAttributes,
339
) (*model.UpdateResult, error) {
3✔
340
        return db.upsertAttributes(ctx, devices, attrs, false, true, "", "")
3✔
341
}
3✔
342

343
func (db *DataStoreMongo) inventoryNeedsUpdate(
344
        ctx context.Context,
345
        ids []model.DeviceID,
346
        newAttributes model.DeviceAttributes,
347
        lastUpdateSecondsThreshold int64,
348
) []model.DeviceID {
277✔
349
        devicesArray, err := db.GetDevicesById(ctx, ids)
277✔
350
        if devicesArray == nil || err != nil {
533✔
351
                return ids
256✔
352
        }
256✔
353
        devices := make(map[model.DeviceID]model.Device, len(devicesArray))
21✔
354
        for _, d := range devicesArray {
43✔
355
                devices[d.ID] = d
22✔
356
        }
22✔
357

358
        var devicesInNeedOfUpdate []model.DeviceID
21✔
359
        // all the devices that do not exist in the db (not returned
21✔
360
        // by GetDevicesById and not present in devices map)
21✔
361
        // need to be upserted, i.e.: need to be returned
21✔
362
        // from this call, and we add them to the slice here
21✔
363
        for _, id := range ids {
44✔
364
                if _, ok := devices[id]; !ok {
24✔
365
                        devicesInNeedOfUpdate = append(devicesInNeedOfUpdate, id)
1✔
366
                }
1✔
367
        }
368
        for _, device := range devices {
43✔
369
                a := device.Attributes.GetByName(model.AttrNameUpdated)
22✔
370
                if a == nil {
26✔
371
                        devicesInNeedOfUpdate = append(devicesInNeedOfUpdate, device.ID)
4✔
372
                        continue
4✔
373
                }
374

375
                if v, ok := a.Value.(primitive.DateTime); ok {
36✔
376
                        lastUpdateTime := v.Time()
18✔
377
                        now := time.Now()
18✔
378
                        if (now.Unix() - lastUpdateTime.Unix()) > lastUpdateSecondsThreshold {
18✔
NEW
379
                                devicesInNeedOfUpdate = append(devicesInNeedOfUpdate, device.ID)
×
NEW
380
                                continue
×
381
                        }
382
                }
383

384
                if !device.Attributes.Equal(newAttributes) {
36✔
385
                        devicesInNeedOfUpdate = append(devicesInNeedOfUpdate, device.ID)
18✔
386
                        continue
18✔
387
                }
388
        }
389
        return devicesInNeedOfUpdate
21✔
390
}
391

392
func (db *DataStoreMongo) UpsertDevicesAttributesWithUpdated(
393
        ctx context.Context,
394
        ids []model.DeviceID,
395
        attrs model.DeviceAttributes,
396
        scope string,
397
        etag string,
398
        lastUpdateSecondsThreshold int64,
399
) (*model.UpdateResult, error) {
277✔
400
        idsToUpdate := db.inventoryNeedsUpdate(ctx, ids, attrs, lastUpdateSecondsThreshold)
277✔
401
        if len(idsToUpdate) < 1 {
278✔
402
                return nil, nil
1✔
403
        }
1✔
404
        return db.upsertAttributes(ctx, makeDevsWithIds(idsToUpdate), attrs, true, false, scope, etag)
276✔
405
}
406

407
func (db *DataStoreMongo) UpsertDevicesAttributes(
408
        ctx context.Context,
409
        ids []model.DeviceID,
410
        attrs model.DeviceAttributes,
411
) (*model.UpdateResult, error) {
16✔
412
        return db.upsertAttributes(ctx, makeDevsWithIds(ids), attrs, false, false, "", "")
16✔
413
}
16✔
414

415
func makeDevsWithIds(ids []model.DeviceID) []model.DeviceUpdate {
292✔
416
        devices := make([]model.DeviceUpdate, len(ids))
292✔
417
        for i, id := range ids {
591✔
418
                devices[i].Id = id
299✔
419
        }
299✔
420
        return devices
292✔
421
}
422

423
func (db *DataStoreMongo) upsertAttributes(
424
        ctx context.Context,
425
        devices []model.DeviceUpdate,
426
        attrs model.DeviceAttributes,
427
        withUpdated bool,
428
        withRevision bool,
429
        scope string,
430
        etag string,
431
) (*model.UpdateResult, error) {
295✔
432
        const systemScope = DbDevAttributes + "." + model.AttrScopeSystem
295✔
433
        const createdField = systemScope + "-" + model.AttrNameCreated
295✔
434
        const etagField = model.AttrNameTagsEtag
295✔
435
        var (
295✔
436
                result *model.UpdateResult
295✔
437
                filter interface{}
295✔
438
                err    error
295✔
439
        )
295✔
440

295✔
441
        c := db.client.
295✔
442
                Database(mstore.DbFromContext(ctx, DbName)).
295✔
443
                Collection(DbDevicesColl)
295✔
444

295✔
445
        update, err := makeAttrUpsert(attrs)
295✔
446
        if err != nil {
297✔
447
                return nil, err
2✔
448
        }
2✔
449

450
        now := time.Now()
293✔
451
        oninsert := bson.M{
293✔
452
                createdField: model.DeviceAttribute{
293✔
453
                        Scope: model.AttrScopeSystem,
293✔
454
                        Name:  model.AttrNameCreated,
293✔
455
                        Value: now,
293✔
456
                },
293✔
457
        }
293✔
458
        if !withRevision {
583✔
459
                oninsert["revision"] = 0
290✔
460
        }
290✔
461

462
        const updatedField = systemScope + "-" + model.AttrNameUpdated
293✔
463
        if withUpdated {
568✔
464
                update[updatedField] = model.DeviceAttribute{
275✔
465
                        Scope: model.AttrScopeSystem,
275✔
466
                        Name:  model.AttrNameUpdated,
275✔
467
                        Value: now,
275✔
468
                }
275✔
469
        } else {
293✔
470
                oninsert[updatedField] = model.DeviceAttribute{
18✔
471
                        Scope: model.AttrScopeSystem,
18✔
472
                        Name:  model.AttrNameUpdated,
18✔
473
                        Value: now,
18✔
474
                }
18✔
475
        }
18✔
476

477
        switch len(devices) {
293✔
478
        case 0:
1✔
479
                return &model.UpdateResult{}, nil
1✔
480
        case 1:
289✔
481
                filter := bson.M{
289✔
482
                        "_id": devices[0].Id,
289✔
483
                }
289✔
484
                updateOpts := mopts.FindOneAndUpdate().
289✔
485
                        SetUpsert(true).
289✔
486
                        SetReturnDocument(mopts.After)
289✔
487

289✔
488
                if withRevision {
291✔
489
                        filter[DbDevRevision] = bson.M{"$lt": devices[0].Revision}
2✔
490
                        update[DbDevRevision] = devices[0].Revision
2✔
491
                }
2✔
492
                if scope == model.AttrScopeTags {
294✔
493
                        update[etagField] = uuid.New().String()
5✔
494
                        updateOpts = mopts.FindOneAndUpdate().
5✔
495
                                SetUpsert(false).
5✔
496
                                SetReturnDocument(mopts.After)
5✔
497
                }
5✔
498
                if etag != "" {
290✔
499
                        filter[etagField] = bson.M{"$eq": etag}
1✔
500
                }
1✔
501

502
                update = bson.M{
289✔
503
                        "$set":         update,
289✔
504
                        "$setOnInsert": oninsert,
289✔
505
                }
289✔
506

289✔
507
                device := &model.Device{}
289✔
508
                res := c.FindOneAndUpdate(ctx, filter, update, updateOpts)
289✔
509
                err = res.Decode(device)
289✔
510
                if err != nil {
291✔
511
                        if mongo.IsDuplicateKeyError(err) {
3✔
512
                                return nil, store.ErrWriteConflict
1✔
513
                        } else if err == mongo.ErrNoDocuments {
3✔
514
                                return &model.UpdateResult{}, nil
1✔
515
                        } else {
1✔
516
                                return nil, err
×
517
                        }
×
518
                }
519
                result = &model.UpdateResult{
287✔
520
                        MatchedCount: 1,
287✔
521
                        CreatedCount: 0,
287✔
522
                        Devices:      []*model.Device{device},
287✔
523
                }
287✔
524
        default:
3✔
525
                var bres *mongo.BulkWriteResult
3✔
526
                // Perform single bulk-write operation
3✔
527
                // NOTE: Can't use UpdateMany as $in query operator does not
3✔
528
                //       upsert missing devices.
3✔
529

3✔
530
                models := make([]mongo.WriteModel, len(devices))
3✔
531
                for i, dev := range devices {
12✔
532
                        umod := mongo.NewUpdateOneModel()
9✔
533
                        if withRevision {
12✔
534
                                filter = bson.M{
3✔
535
                                        "_id":         dev.Id,
3✔
536
                                        DbDevRevision: bson.M{"$lt": dev.Revision},
3✔
537
                                }
3✔
538
                                update[DbDevRevision] = dev.Revision
3✔
539
                                umod.Update = bson.M{
3✔
540
                                        "$set":         update,
3✔
541
                                        "$setOnInsert": oninsert,
3✔
542
                                }
3✔
543
                        } else {
9✔
544
                                filter = map[string]interface{}{"_id": dev.Id}
6✔
545
                                umod.Update = bson.M{
6✔
546
                                        "$set":         update,
6✔
547
                                        "$setOnInsert": oninsert,
6✔
548
                                }
6✔
549
                        }
6✔
550
                        umod.Filter = filter
9✔
551
                        umod.SetUpsert(true)
9✔
552
                        models[i] = umod
9✔
553
                }
554
                bres, err = c.BulkWrite(
3✔
555
                        ctx, models, mopts.BulkWrite().SetOrdered(false),
3✔
556
                )
3✔
557
                if err != nil {
4✔
558
                        if mongo.IsDuplicateKeyError(err) {
2✔
559
                                // bulk mode, swallow the error as we already updated the other devices
1✔
560
                                // and the Matchedcount and CreatedCount values will tell the caller if
1✔
561
                                // all the operations succeeded or not
1✔
562
                                err = nil
1✔
563
                        } else {
1✔
564
                                return nil, err
×
565
                        }
×
566
                }
567
                result = &model.UpdateResult{
3✔
568
                        MatchedCount: bres.MatchedCount,
3✔
569
                        CreatedCount: bres.UpsertedCount,
3✔
570
                }
3✔
571
        }
572
        return result, err
290✔
573
}
574

575
// makeAttrField is a convenience function for composing attribute field names.
576
func makeAttrField(attrName, attrScope string, subFields ...string) string {
4,748✔
577
        field := fmt.Sprintf(
4,748✔
578
                "%s.%s-%s",
4,748✔
579
                DbDevAttributes,
4,748✔
580
                attrScope,
4,748✔
581
                model.GetDeviceAttributeNameReplacer().Replace(attrName),
4,748✔
582
        )
4,748✔
583
        if len(subFields) > 0 {
9,495✔
584
                field = strings.Join(
4,747✔
585
                        append([]string{field}, subFields...), ".",
4,747✔
586
                )
4,747✔
587
        }
4,747✔
588
        return field
4,748✔
589
}
590

591
// makeAttrUpsert creates a new upsert document for the given attributes.
592
func makeAttrUpsert(attrs model.DeviceAttributes) (bson.M, error) {
322✔
593
        var fieldName string
322✔
594
        upsert := make(bson.M)
322✔
595

322✔
596
        for i := range attrs {
1,753✔
597
                if attrs[i].Name == "" {
1,434✔
598
                        return nil, store.ErrNoAttrName
3✔
599
                }
3✔
600
                if attrs[i].Scope == "" {
1,434✔
601
                        // Default to inventory scope
6✔
602
                        attrs[i].Scope = model.AttrScopeInventory
6✔
603
                }
6✔
604

605
                fieldName = makeAttrField(
1,428✔
606
                        attrs[i].Name,
1,428✔
607
                        attrs[i].Scope,
1,428✔
608
                        DbDevAttributesScope,
1,428✔
609
                )
1,428✔
610
                upsert[fieldName] = attrs[i].Scope
1,428✔
611

1,428✔
612
                fieldName = makeAttrField(
1,428✔
613
                        attrs[i].Name,
1,428✔
614
                        attrs[i].Scope,
1,428✔
615
                        DbDevAttributesName,
1,428✔
616
                )
1,428✔
617
                upsert[fieldName] = attrs[i].Name
1,428✔
618

1,428✔
619
                if attrs[i].Value != nil {
2,851✔
620
                        fieldName = makeAttrField(
1,423✔
621
                                attrs[i].Name,
1,423✔
622
                                attrs[i].Scope,
1,423✔
623
                                DbDevAttributesValue,
1,423✔
624
                        )
1,423✔
625
                        upsert[fieldName] = attrs[i].Value
1,423✔
626
                }
1,423✔
627

628
                if attrs[i].Description != nil {
1,884✔
629
                        fieldName = makeAttrField(
456✔
630
                                attrs[i].Name,
456✔
631
                                attrs[i].Scope,
456✔
632
                                DbDevAttributesDesc,
456✔
633
                        )
456✔
634
                        upsert[fieldName] = attrs[i].Description
456✔
635
                }
456✔
636

637
                if attrs[i].Timestamp != nil {
1,440✔
638
                        fieldName = makeAttrField(
12✔
639
                                attrs[i].Name,
12✔
640
                                attrs[i].Scope,
12✔
641
                                DbDevAttributesTs,
12✔
642
                        )
12✔
643
                        upsert[fieldName] = attrs[i].Timestamp
12✔
644
                }
12✔
645
        }
646
        return upsert, nil
319✔
647
}
648

649
// makeAttrRemove creates a new unset document to remove attributes
650
func makeAttrRemove(attrs model.DeviceAttributes) (bson.M, error) {
26✔
651
        var fieldName string
26✔
652
        remove := make(bson.M)
26✔
653

26✔
654
        for i := range attrs {
27✔
655
                if attrs[i].Name == "" {
1✔
656
                        return nil, store.ErrNoAttrName
×
657
                }
×
658
                if attrs[i].Scope == "" {
1✔
659
                        // Default to inventory scope
×
660
                        attrs[i].Scope = model.AttrScopeInventory
×
661
                }
×
662
                fieldName = makeAttrField(
1✔
663
                        attrs[i].Name,
1✔
664
                        attrs[i].Scope,
1✔
665
                )
1✔
666
                remove[fieldName] = true
1✔
667
        }
668
        return remove, nil
26✔
669
}
670

671
func mongoOperator(co store.ComparisonOperator) string {
7✔
672
        switch co {
7✔
673
        case store.Eq:
7✔
674
                return "$eq"
7✔
675
        }
676
        return ""
×
677
}
678

679
func (db *DataStoreMongo) UpsertRemoveDeviceAttributes(
680
        ctx context.Context,
681
        id model.DeviceID,
682
        updateAttrs model.DeviceAttributes,
683
        removeAttrs model.DeviceAttributes,
684
        scope string,
685
        etag string,
686
) (*model.UpdateResult, error) {
27✔
687
        const systemScope = DbDevAttributes + "." + model.AttrScopeSystem
27✔
688
        const updatedField = systemScope + "-" + model.AttrNameUpdated
27✔
689
        const createdField = systemScope + "-" + model.AttrNameCreated
27✔
690
        const etagField = model.AttrNameTagsEtag
27✔
691
        var (
27✔
692
                err error
27✔
693
        )
27✔
694

27✔
695
        c := db.client.
27✔
696
                Database(mstore.DbFromContext(ctx, DbName)).
27✔
697
                Collection(DbDevicesColl)
27✔
698

27✔
699
        update, err := makeAttrUpsert(updateAttrs)
27✔
700
        if err != nil {
28✔
701
                return nil, err
1✔
702
        }
1✔
703
        remove, err := makeAttrRemove(removeAttrs)
26✔
704
        if err != nil {
26✔
705
                return nil, err
×
706
        }
×
707
        filter := bson.M{"_id": id}
26✔
708
        if etag != "" {
31✔
709
                filter[etagField] = bson.M{"$eq": etag}
5✔
710
        }
5✔
711

712
        updateOpts := mopts.FindOneAndUpdate().
26✔
713
                SetUpsert(true).
26✔
714
                SetReturnDocument(mopts.After)
26✔
715
        if scope == model.AttrScopeTags {
38✔
716
                update[etagField] = uuid.New().String()
12✔
717
                updateOpts = updateOpts.SetUpsert(false)
12✔
718
        }
12✔
719
        now := time.Now()
26✔
720
        if scope != model.AttrScopeTags {
40✔
721
                update[updatedField] = model.DeviceAttribute{
14✔
722
                        Scope: model.AttrScopeSystem,
14✔
723
                        Name:  model.AttrNameUpdated,
14✔
724
                        Value: now,
14✔
725
                }
14✔
726
        }
14✔
727
        update = bson.M{
26✔
728
                "$set": update,
26✔
729
                "$setOnInsert": bson.M{
26✔
730
                        createdField: model.DeviceAttribute{
26✔
731
                                Scope: model.AttrScopeSystem,
26✔
732
                                Name:  model.AttrNameCreated,
26✔
733
                                Value: now,
26✔
734
                        },
26✔
735
                },
26✔
736
        }
26✔
737
        if len(remove) > 0 {
27✔
738
                update["$unset"] = remove
1✔
739
        }
1✔
740

741
        device := &model.Device{}
26✔
742
        res := c.FindOneAndUpdate(ctx, filter, update, updateOpts)
26✔
743
        err = res.Decode(device)
26✔
744
        if err == mongo.ErrNoDocuments {
28✔
745
                return &model.UpdateResult{
2✔
746
                        MatchedCount: 0,
2✔
747
                        CreatedCount: 0,
2✔
748
                        Devices:      []*model.Device{},
2✔
749
                }, nil
2✔
750
        } else if err == nil {
50✔
751
                return &model.UpdateResult{
24✔
752
                        MatchedCount: 1,
24✔
753
                        CreatedCount: 0,
24✔
754
                        Devices:      []*model.Device{device},
24✔
755
                }, nil
24✔
756
        }
24✔
757
        return nil, err
×
758
}
759

760
func (db *DataStoreMongo) UpdateDevicesGroup(
761
        ctx context.Context,
762
        devIDs []model.DeviceID,
763
        group model.GroupName,
764
) (*model.UpdateResult, error) {
61✔
765
        database := db.client.Database(mstore.DbFromContext(ctx, DbName))
61✔
766
        collDevs := database.Collection(DbDevicesColl)
61✔
767

61✔
768
        var filter = bson.M{}
61✔
769
        switch len(devIDs) {
61✔
770
        case 0:
3✔
771
                return &model.UpdateResult{}, nil
3✔
772
        case 1:
54✔
773
                filter[DbDevId] = devIDs[0]
54✔
774
        default:
4✔
775
                filter[DbDevId] = bson.M{"$in": devIDs}
4✔
776
        }
777
        update := bson.M{
58✔
778
                "$set": bson.M{
58✔
779
                        DbDevAttributesGroup: model.DeviceAttribute{
58✔
780
                                Scope: model.AttrScopeSystem,
58✔
781
                                Name:  DbDevGroup,
58✔
782
                                Value: group,
58✔
783
                        },
58✔
784
                },
58✔
785
        }
58✔
786
        res, err := collDevs.UpdateMany(ctx, filter, update)
58✔
787
        if err != nil {
58✔
788
                return nil, err
×
789
        }
×
790
        return &model.UpdateResult{
58✔
791
                MatchedCount: res.MatchedCount,
58✔
792
                UpdatedCount: res.ModifiedCount,
58✔
793
        }, nil
58✔
794
}
795

796
// UpdateDeviceText updates the device text field
797
func (db *DataStoreMongo) UpdateDeviceText(
798
        ctx context.Context,
799
        deviceID model.DeviceID,
800
        text string,
801
) error {
22✔
802
        filter := bson.M{
22✔
803
                DbDevId: deviceID.String(),
22✔
804
        }
22✔
805

22✔
806
        update := bson.M{
22✔
807
                "$set": bson.M{
22✔
808
                        DbDevAttributesText: text,
22✔
809
                },
22✔
810
        }
22✔
811

22✔
812
        database := db.client.Database(mstore.DbFromContext(ctx, DbName))
22✔
813
        collDevs := database.Collection(DbDevicesColl)
22✔
814

22✔
815
        _, err := collDevs.UpdateOne(ctx, filter, update)
22✔
816
        return err
22✔
817
}
22✔
818

819
func (db *DataStoreMongo) GetFiltersAttributes(
820
        ctx context.Context,
821
) ([]model.FilterAttribute, error) {
4✔
822
        database := db.client.Database(mstore.DbFromContext(ctx, DbName))
4✔
823
        collDevs := database.Collection(DbDevicesColl)
4✔
824

4✔
825
        const DbCount = "count"
4✔
826

4✔
827
        cur, err := collDevs.Aggregate(ctx, []bson.M{
4✔
828
                {
4✔
829
                        "$project": bson.M{
4✔
830
                                "attributes": bson.M{
4✔
831
                                        "$objectToArray": "$" + DbDevAttributes,
4✔
832
                                },
4✔
833
                        },
4✔
834
                },
4✔
835
                {
4✔
836
                        "$unwind": "$" + DbDevAttributes,
4✔
837
                },
4✔
838
                {
4✔
839
                        "$project": bson.M{
4✔
840
                                DbDevAttributesName:  "$" + DbDevAttributes + ".v." + DbDevAttributesName,
4✔
841
                                DbDevAttributesScope: "$" + DbDevAttributes + ".v." + DbDevAttributesScope,
4✔
842
                        },
4✔
843
                },
4✔
844
                {
4✔
845
                        "$group": bson.M{
4✔
846
                                DbDevId: bson.M{
4✔
847
                                        DbDevAttributesName:  "$" + DbDevAttributesName,
4✔
848
                                        DbDevAttributesScope: "$" + DbDevAttributesScope,
4✔
849
                                },
4✔
850
                                DbCount: bson.M{
4✔
851
                                        "$sum": 1,
4✔
852
                                },
4✔
853
                        },
4✔
854
                },
4✔
855
                {
4✔
856
                        "$project": bson.M{
4✔
857
                                DbDevId:              0,
4✔
858
                                DbDevAttributesName:  "$" + DbDevId + "." + DbDevAttributesName,
4✔
859
                                DbDevAttributesScope: "$" + DbDevId + "." + DbDevAttributesScope,
4✔
860
                                DbCount:              "$" + DbCount,
4✔
861
                        },
4✔
862
                },
4✔
863
                {
4✔
864
                        "$sort": bson.D{
4✔
865
                                {Key: DbCount, Value: -1},
4✔
866
                                {Key: DbDevAttributesScope, Value: 1},
4✔
867
                                {Key: DbDevAttributesName, Value: 1},
4✔
868
                        },
4✔
869
                },
4✔
870
                {
4✔
871
                        "$limit": FiltersAttributesLimit,
4✔
872
                },
4✔
873
        })
4✔
874
        if err != nil {
4✔
875
                return nil, err
×
876
        }
×
877
        defer cur.Close(ctx)
4✔
878

4✔
879
        var attributes []model.FilterAttribute
4✔
880
        err = cur.All(ctx, &attributes)
4✔
881
        if err != nil {
4✔
882
                return nil, err
×
883
        }
×
884

885
        return attributes, nil
4✔
886
}
887

888
func (db *DataStoreMongo) DeleteGroup(
889
        ctx context.Context,
890
        group model.GroupName,
891
) (chan model.DeviceID, error) {
1✔
892
        deviceIDs := make(chan model.DeviceID)
1✔
893

1✔
894
        database := db.client.Database(mstore.DbFromContext(ctx, DbName))
1✔
895
        collDevs := database.Collection(DbDevicesColl)
1✔
896

1✔
897
        filter := bson.M{DbDevAttributesGroupValue: group}
1✔
898

1✔
899
        const batchMaxSize = 100
1✔
900
        batchSize := int32(batchMaxSize)
1✔
901
        findOptions := &mopts.FindOptions{
1✔
902
                Projection: bson.M{DbDevId: 1},
1✔
903
                BatchSize:  &batchSize,
1✔
904
        }
1✔
905
        cursor, err := collDevs.Find(ctx, filter, findOptions)
1✔
906
        if err != nil {
1✔
907
                return nil, err
×
908
        }
×
909

910
        go func() {
2✔
911
                defer cursor.Close(ctx)
1✔
912
                batch := make([]model.DeviceID, batchMaxSize)
1✔
913
                batchSize := 0
1✔
914

1✔
915
                update := bson.M{"$unset": bson.M{DbDevAttributesGroup: 1}}
1✔
916
                device := &model.Device{}
1✔
917
                defer close(deviceIDs)
1✔
918

1✔
919
        next:
1✔
920
                for {
5✔
921
                        hasNext := cursor.Next(ctx)
4✔
922
                        if !hasNext {
6✔
923
                                if batchSize > 0 {
3✔
924
                                        break
1✔
925
                                }
926
                                return
1✔
927
                        }
928
                        if err = cursor.Decode(&device); err == nil {
4✔
929
                                batch[batchSize] = device.ID
2✔
930
                                batchSize++
2✔
931
                                if len(batch) == batchSize {
2✔
932
                                        break
×
933
                                }
934
                        }
935
                }
936

937
                _, _ = collDevs.UpdateMany(ctx, bson.M{DbDevId: bson.M{"$in": batch[:batchSize]}}, update)
1✔
938
                for _, item := range batch[:batchSize] {
3✔
939
                        deviceIDs <- item
2✔
940
                }
2✔
941
                batchSize = 0
1✔
942
                goto next
1✔
943
        }()
944

945
        return deviceIDs, nil
1✔
946
}
947

948
func (db *DataStoreMongo) UnsetDevicesGroup(
949
        ctx context.Context,
950
        deviceIDs []model.DeviceID,
951
        group model.GroupName,
952
) (*model.UpdateResult, error) {
14✔
953
        database := db.client.Database(mstore.DbFromContext(ctx, DbName))
14✔
954
        collDevs := database.Collection(DbDevicesColl)
14✔
955

14✔
956
        var filter bson.D
14✔
957
        // Add filter on device id (either $in or direct indexing)
14✔
958
        switch len(deviceIDs) {
14✔
959
        case 0:
1✔
960
                return &model.UpdateResult{}, nil
1✔
961
        case 1:
10✔
962
                filter = bson.D{{Key: DbDevId, Value: deviceIDs[0]}}
10✔
963
        default:
3✔
964
                filter = bson.D{{Key: DbDevId, Value: bson.M{"$in": deviceIDs}}}
3✔
965
        }
966
        // Append filter on group
967
        filter = append(
13✔
968
                filter,
13✔
969
                bson.E{Key: DbDevAttributesGroupValue, Value: group},
13✔
970
        )
13✔
971
        // Create unset operation on group attribute
13✔
972
        update := bson.M{
13✔
973
                "$unset": bson.M{
13✔
974
                        DbDevAttributesGroup: "",
13✔
975
                },
13✔
976
        }
13✔
977
        res, err := collDevs.UpdateMany(ctx, filter, update)
13✔
978
        if err != nil {
13✔
979
                return nil, err
×
980
        }
×
981
        return &model.UpdateResult{
13✔
982
                MatchedCount: res.MatchedCount,
13✔
983
                UpdatedCount: res.ModifiedCount,
13✔
984
        }, nil
13✔
985
}
986

987
func predicateToQuery(pred model.FilterPredicate) (bson.D, error) {
2✔
988
        if err := pred.Validate(); err != nil {
3✔
989
                return nil, err
1✔
990
        }
1✔
991
        name := fmt.Sprintf(
1✔
992
                "%s.%s-%s.value",
1✔
993
                DbDevAttributes,
1✔
994
                pred.Scope,
1✔
995
                model.GetDeviceAttributeNameReplacer().Replace(pred.Attribute),
1✔
996
        )
1✔
997
        return bson.D{{
1✔
998
                Key: name, Value: bson.D{{Key: pred.Type, Value: pred.Value}},
1✔
999
        }}, nil
1✔
1000
}
1001

1002
func (db *DataStoreMongo) ListGroups(
1003
        ctx context.Context,
1004
        filters []model.FilterPredicate,
1005
) ([]model.GroupName, error) {
12✔
1006
        c := db.client.
12✔
1007
                Database(mstore.DbFromContext(ctx, DbName)).
12✔
1008
                Collection(DbDevicesColl)
12✔
1009

12✔
1010
        fltr := bson.D{{
12✔
1011
                Key: DbDevAttributesGroupValue, Value: bson.M{"$exists": true},
12✔
1012
        }}
12✔
1013
        if len(fltr) > 0 {
24✔
1014
                for _, p := range filters {
14✔
1015
                        q, err := predicateToQuery(p)
2✔
1016
                        if err != nil {
3✔
1017
                                return nil, errors.Wrap(
1✔
1018
                                        err, "store: bad filter predicate",
1✔
1019
                                )
1✔
1020
                        }
1✔
1021
                        fltr = append(fltr, q...)
1✔
1022
                }
1023
        }
1024
        results, err := c.Distinct(
11✔
1025
                ctx, DbDevAttributesGroupValue, fltr,
11✔
1026
        )
11✔
1027
        if err != nil {
11✔
1028
                return nil, err
×
1029
        }
×
1030

1031
        groups := make([]model.GroupName, len(results))
11✔
1032
        for i, d := range results {
47✔
1033
                groups[i] = model.GroupName(d.(string))
36✔
1034
        }
36✔
1035
        return groups, nil
11✔
1036
}
1037

1038
func (db *DataStoreMongo) GetDevicesByGroup(
1039
        ctx context.Context,
1040
        group model.GroupName,
1041
        skip,
1042
        limit int,
1043
) ([]model.DeviceID, int, error) {
37✔
1044
        c := db.client.
37✔
1045
                Database(mstore.DbFromContext(ctx, DbName)).
37✔
1046
                Collection(DbDevicesColl)
37✔
1047

37✔
1048
        filter := bson.M{DbDevAttributesGroupValue: group}
37✔
1049
        result := c.FindOne(ctx, filter)
37✔
1050
        if result == nil {
37✔
1051
                return nil, -1, store.ErrGroupNotFound
×
1052
        }
×
1053

1054
        var dev model.Device
37✔
1055
        err := result.Decode(&dev)
37✔
1056
        if err != nil {
43✔
1057
                return nil, -1, store.ErrGroupNotFound
6✔
1058
        }
6✔
1059

1060
        hasGroup := group != ""
31✔
1061
        devices, totalDevices, e := db.GetDevices(ctx,
31✔
1062
                store.ListQuery{
31✔
1063
                        Skip:      skip,
31✔
1064
                        Limit:     limit,
31✔
1065
                        Filters:   nil,
31✔
1066
                        Sort:      nil,
31✔
1067
                        HasGroup:  &hasGroup,
31✔
1068
                        GroupName: string(group)})
31✔
1069
        if e != nil {
31✔
1070
                return nil, -1, errors.Wrap(e, "failed to get device list for group")
×
1071
        }
×
1072

1073
        resIds := make([]model.DeviceID, len(devices))
31✔
1074
        for i, d := range devices {
84✔
1075
                resIds[i] = d.ID
53✔
1076
        }
53✔
1077
        return resIds, totalDevices, nil
31✔
1078
}
1079

1080
func (db *DataStoreMongo) GetDeviceGroup(
1081
        ctx context.Context,
1082
        id model.DeviceID,
1083
) (model.GroupName, error) {
6✔
1084
        dev, err := db.GetDevice(ctx, id)
6✔
1085
        if err != nil || dev == nil {
8✔
1086
                return "", store.ErrDevNotFound
2✔
1087
        }
2✔
1088

1089
        return dev.Group, nil
4✔
1090
}
1091

1092
func (db *DataStoreMongo) DeleteDevices(
1093
        ctx context.Context, ids []model.DeviceID,
1094
) (*model.UpdateResult, error) {
3✔
1095
        var filter = bson.M{}
3✔
1096
        database := db.client.Database(mstore.DbFromContext(ctx, DbName))
3✔
1097
        collDevs := database.Collection(DbDevicesColl)
3✔
1098

3✔
1099
        switch len(ids) {
3✔
1100
        case 0:
×
1101
                // This is a no-op, don't bother requesting mongo.
×
1102
                return &model.UpdateResult{DeletedCount: 0}, nil
×
1103
        case 1:
2✔
1104
                filter[DbDevId] = ids[0]
2✔
1105
        default:
1✔
1106
                filter[DbDevId] = bson.M{"$in": ids}
1✔
1107
        }
1108
        res, err := collDevs.DeleteMany(ctx, filter)
3✔
1109
        if err != nil {
3✔
1110
                return nil, err
×
1111
        }
×
1112
        return &model.UpdateResult{
3✔
1113
                DeletedCount: res.DeletedCount,
3✔
1114
        }, nil
3✔
1115
}
1116

1117
func (db *DataStoreMongo) GetAllAttributeNames(ctx context.Context) ([]string, error) {
31✔
1118
        c := db.client.Database(mstore.DbFromContext(ctx, DbName)).Collection(DbDevicesColl)
31✔
1119

31✔
1120
        project := bson.M{
31✔
1121
                "$project": bson.M{
31✔
1122
                        "arrayofkeyvalue": bson.M{
31✔
1123
                                "$objectToArray": "$$ROOT.attributes",
31✔
1124
                        },
31✔
1125
                },
31✔
1126
        }
31✔
1127

31✔
1128
        unwind := bson.M{
31✔
1129
                "$unwind": "$arrayofkeyvalue",
31✔
1130
        }
31✔
1131

31✔
1132
        group := bson.M{
31✔
1133
                "$group": bson.M{
31✔
1134
                        "_id": nil,
31✔
1135
                        "allkeys": bson.M{
31✔
1136
                                "$addToSet": "$arrayofkeyvalue.v.name",
31✔
1137
                        },
31✔
1138
                },
31✔
1139
        }
31✔
1140

31✔
1141
        l := log.FromContext(ctx)
31✔
1142
        cursor, err := c.Aggregate(ctx, []bson.M{
31✔
1143
                project,
31✔
1144
                unwind,
31✔
1145
                group,
31✔
1146
        })
31✔
1147
        if err != nil {
31✔
1148
                return nil, err
×
1149
        }
×
1150
        defer cursor.Close(ctx)
31✔
1151

31✔
1152
        cursor.Next(ctx)
31✔
1153
        elem := &bson.D{}
31✔
1154
        err = cursor.Decode(elem)
31✔
1155
        if err != nil {
45✔
1156
                if err != io.EOF {
14✔
1157
                        return nil, errors.Wrap(err, "failed to get attributes")
×
1158
                } else {
14✔
1159
                        return make([]string, 0), nil
14✔
1160
                }
14✔
1161
        }
1162
        m := elem.Map()
17✔
1163
        results := m["allkeys"].(primitive.A)
17✔
1164
        attributeNames := make([]string, len(results))
17✔
1165
        for i, d := range results {
76✔
1166
                attributeNames[i] = d.(string)
59✔
1167
                l.Debugf("GetAllAttributeNames got: '%v'", d)
59✔
1168
        }
59✔
1169

1170
        return attributeNames, nil
17✔
1171
}
1172

1173
func (db *DataStoreMongo) SearchDevices(
1174
        ctx context.Context,
1175
        searchParams model.SearchParams,
1176
) ([]model.Device, int, error) {
15✔
1177
        c := db.client.Database(mstore.DbFromContext(ctx, DbName)).Collection(DbDevicesColl)
15✔
1178

15✔
1179
        queryFilters := make([]bson.M, 0)
15✔
1180
        for _, filter := range searchParams.Filters {
29✔
1181
                op := filter.Type
14✔
1182
                var field string
14✔
1183
                if filter.Scope == model.AttrScopeIdentity && filter.Attribute == model.AttrNameID {
16✔
1184
                        field = DbDevId
2✔
1185
                } else {
14✔
1186
                        name := fmt.Sprintf(
12✔
1187
                                "%s-%s",
12✔
1188
                                filter.Scope,
12✔
1189
                                model.GetDeviceAttributeNameReplacer().Replace(filter.Attribute),
12✔
1190
                        )
12✔
1191
                        field = fmt.Sprintf("%s.%s.%s", DbDevAttributes, name, DbDevAttributesValue)
12✔
1192
                }
12✔
1193
                queryFilters = append(queryFilters, bson.M{field: bson.M{op: filter.Value}})
14✔
1194
        }
1195

1196
        // FIXME: remove after migrating ids to attributes
1197
        if len(searchParams.DeviceIDs) > 0 {
16✔
1198
                queryFilters = append(queryFilters, bson.M{"_id": bson.M{"$in": searchParams.DeviceIDs}})
1✔
1199
        }
1✔
1200

1201
        if searchParams.Text != "" {
16✔
1202
                queryFilters = append(queryFilters, bson.M{
1✔
1203
                        "$text": bson.M{
1✔
1204
                                "$search": utils.TextToKeywords(searchParams.Text),
1✔
1205
                        },
1✔
1206
                })
1✔
1207
        }
1✔
1208

1209
        findQuery := bson.M{}
15✔
1210
        if len(queryFilters) > 0 {
28✔
1211
                findQuery["$and"] = queryFilters
13✔
1212
        }
13✔
1213

1214
        findOptions := mopts.Find()
15✔
1215
        findOptions.SetSkip(int64((searchParams.Page - 1) * searchParams.PerPage))
15✔
1216
        findOptions.SetLimit(int64(searchParams.PerPage))
15✔
1217

15✔
1218
        if len(searchParams.Attributes) > 0 {
17✔
1219
                name := fmt.Sprintf(
2✔
1220
                        "%s-%s",
2✔
1221
                        model.AttrScopeSystem,
2✔
1222
                        model.GetDeviceAttributeNameReplacer().Replace(DbDevUpdatedTs),
2✔
1223
                )
2✔
1224
                field := fmt.Sprintf("%s.%s", DbDevAttributes, name)
2✔
1225
                projection := bson.M{field: 1}
2✔
1226
                for _, attribute := range searchParams.Attributes {
5✔
1227
                        name := fmt.Sprintf(
3✔
1228
                                "%s-%s",
3✔
1229
                                attribute.Scope,
3✔
1230
                                model.GetDeviceAttributeNameReplacer().Replace(attribute.Attribute),
3✔
1231
                        )
3✔
1232
                        field := fmt.Sprintf("%s.%s", DbDevAttributes, name)
3✔
1233
                        projection[field] = 1
3✔
1234
                }
3✔
1235
                findOptions.SetProjection(projection)
2✔
1236
        }
1237

1238
        if searchParams.Text != "" {
16✔
1239
                findOptions.SetSort(bson.M{"score": bson.M{"$meta": "textScore"}})
1✔
1240
        } else if len(searchParams.Sort) > 0 {
19✔
1241
                sortField := make(bson.D, len(searchParams.Sort))
4✔
1242
                for i, sortQ := range searchParams.Sort {
9✔
1243
                        var field string
5✔
1244
                        if sortQ.Scope == model.AttrScopeIdentity && sortQ.Attribute == model.AttrNameID {
6✔
1245
                                field = DbDevId
1✔
1246
                        } else {
5✔
1247
                                name := fmt.Sprintf(
4✔
1248
                                        "%s-%s",
4✔
1249
                                        sortQ.Scope,
4✔
1250
                                        model.GetDeviceAttributeNameReplacer().Replace(sortQ.Attribute),
4✔
1251
                                )
4✔
1252
                                field = fmt.Sprintf("%s.%s", DbDevAttributes, name)
4✔
1253
                        }
4✔
1254
                        sortField[i] = bson.E{Key: field, Value: 1}
5✔
1255
                        if sortQ.Order == "desc" {
8✔
1256
                                sortField[i].Value = -1
3✔
1257
                        }
3✔
1258
                }
1259
                findOptions.SetSort(sortField)
4✔
1260
        }
1261

1262
        cursor, err := c.Find(ctx, findQuery, findOptions)
15✔
1263
        if err != nil {
16✔
1264
                return nil, -1, errors.Wrap(err, "failed to search devices")
1✔
1265
        }
1✔
1266
        defer cursor.Close(ctx)
14✔
1267

14✔
1268
        devices := []model.Device{}
14✔
1269

14✔
1270
        if err = cursor.All(ctx, &devices); err != nil {
14✔
1271
                return nil, -1, errors.Wrap(err, "failed to search devices")
×
1272
        }
×
1273

1274
        count, err := c.CountDocuments(ctx, findQuery)
14✔
1275
        if err != nil {
14✔
1276
                return nil, -1, errors.Wrap(err, "failed to search devices")
×
1277
        }
×
1278

1279
        return devices, int(count), nil
14✔
1280
}
1281

1282
func indexAttr(s *mongo.Client, ctx context.Context, attr string) error {
68✔
1283
        l := log.FromContext(ctx)
68✔
1284
        c := s.Database(mstore.DbFromContext(ctx, DbName)).Collection(DbDevicesColl)
68✔
1285

68✔
1286
        indexView := c.Indexes()
68✔
1287
        keys := bson.D{
68✔
1288
                {Key: indexAttrName(attrIdentityStatus), Value: 1},
68✔
1289
                {Key: indexAttrName(attr), Value: 1},
68✔
1290
        }
68✔
1291
        _, err := indexView.CreateOne(ctx, mongo.IndexModel{Keys: keys, Options: &mopts.IndexOptions{
68✔
1292
                Name: &attr,
68✔
1293
        }})
68✔
1294

68✔
1295
        if err != nil {
68✔
1296
                if isTooManyIndexes(err) {
×
1297
                        l.Warnf(
×
1298
                                "failed to index attr %s in db %s: too many indexes",
×
1299
                                attr,
×
1300
                                mstore.DbFromContext(ctx, DbName),
×
1301
                        )
×
1302
                } else {
×
1303
                        return errors.Wrapf(
×
1304
                                err,
×
1305
                                "failed to index attr %s in db %s",
×
1306
                                attr,
×
1307
                                mstore.DbFromContext(ctx, DbName),
×
1308
                        )
×
1309
                }
×
1310
        }
1311

1312
        return nil
68✔
1313
}
1314

1315
func indexAttrName(attr string) string {
152✔
1316
        return fmt.Sprintf("attributes.%s.value", attr)
152✔
1317
}
152✔
1318

1319
func isTooManyIndexes(e error) bool {
×
1320
        return strings.HasPrefix(e.Error(), "add index fails, too many indexes for inventory.devices")
×
1321
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc