• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / reporting / 742364226

pending completion
742364226

Pull #87

gitlab-ci

Fabio Tranchitella
refac: use snake case everywhere when reading and writing data to OS
Pull Request #87: MEN-5930: index device deployment objects

783 of 896 new or added lines in 16 files covered. (87.39%)

1 existing line in 1 file now uncovered.

2694 of 3150 relevant lines covered (85.52%)

17.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.83
/store/opensearch/store.go
1
// Copyright 2022 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package opensearch
16

17
import (
18
        "bytes"
19
        "context"
20
        "encoding/json"
21
        "fmt"
22
        "io/ioutil"
23
        "net/http"
24
        "strings"
25

26
        "github.com/opensearch-project/opensearch-go"
27
        "github.com/opensearch-project/opensearch-go/opensearchapi"
28
        "github.com/pkg/errors"
29

30
        "github.com/mendersoftware/go-lib-micro/identity"
31
        "github.com/mendersoftware/go-lib-micro/log"
32
        _ "github.com/mendersoftware/go-lib-micro/log"
33

34
        "github.com/mendersoftware/reporting/model"
35
        "github.com/mendersoftware/reporting/store"
36
)
37

38
// StoreOption is a functional option applied by NewStore to the store under
// construction, before the OpenSearch client is created.
type StoreOption func(*opensearchStore)
39

40
type opensearchStore struct {
41
        addresses                []string
42
        devicesIndexName         string
43
        devicesIndexShards       int
44
        devicesIndexReplicas     int
45
        deploymentsIndexName     string
46
        deploymentsIndexShards   int
47
        deploymentsIndexReplicas int
48
        client                   *opensearch.Client
49
}
50

51
func NewStore(opts ...StoreOption) (store.Store, error) {
4✔
52
        store := &opensearchStore{}
4✔
53
        for _, opt := range opts {
32✔
54
                opt(store)
28✔
55
        }
28✔
56

57
        cfg := opensearch.Config{
4✔
58
                Addresses: store.addresses,
4✔
59
        }
4✔
60
        osClient, err := opensearch.NewClient(cfg)
4✔
61
        if err != nil {
4✔
62
                return nil, errors.Wrap(err, "invalid OpenSearch configuration")
×
63
        }
×
64

65
        store.client = osClient
4✔
66
        return store, nil
4✔
67
}
68

69
func WithServerAddresses(addresses []string) StoreOption {
4✔
70
        return func(s *opensearchStore) {
8✔
71
                s.addresses = addresses
4✔
72
        }
4✔
73
}
74

75
func WithDevicesIndexName(indexName string) StoreOption {
4✔
76
        return func(s *opensearchStore) {
8✔
77
                s.devicesIndexName = indexName
4✔
78
        }
4✔
79
}
80

81
func WithDevicesIndexShards(indexShards int) StoreOption {
4✔
82
        return func(s *opensearchStore) {
8✔
83
                s.devicesIndexShards = indexShards
4✔
84
        }
4✔
85
}
86

87
func WithDevicesIndexReplicas(indexReplicas int) StoreOption {
4✔
88
        return func(s *opensearchStore) {
8✔
89
                s.devicesIndexReplicas = indexReplicas
4✔
90
        }
4✔
91
}
92

93
func WithDeploymentsIndexName(indexName string) StoreOption {
4✔
94
        return func(s *opensearchStore) {
8✔
95
                s.deploymentsIndexName = indexName
4✔
96
        }
4✔
97
}
98

99
func WithDeploymentsIndexShards(indexShards int) StoreOption {
4✔
100
        return func(s *opensearchStore) {
8✔
101
                s.deploymentsIndexShards = indexShards
4✔
102
        }
4✔
103
}
104

105
func WithDeploymentsIndexReplicas(indexReplicas int) StoreOption {
4✔
106
        return func(s *opensearchStore) {
8✔
107
                s.deploymentsIndexReplicas = indexReplicas
4✔
108
        }
4✔
109
}
110

111
// BulkAction is the action line of one operation in a bulk request body.
// It serializes as a single-key JSON object: {"<Type>": <Desc>}.
type BulkAction struct {
	// Type is used verbatim as the JSON key (this file uses "index" and
	// "delete").
	Type string
	// Desc is the action metadata; a nil Desc serializes as null.
	Desc *BulkActionDesc
}

// BulkActionDesc carries the metadata of a bulk action.
//
// Only ID, Index and Routing are written by MarshalJSON; IfSeqNo,
// IfPrimaryTerm and Tenant are never serialized, regardless of struct tags.
type BulkActionDesc struct {
	ID            string `json:"_id"`
	Index         string `json:"_index"`
	IfSeqNo       int64  `json:"_if_seq_no"`
	IfPrimaryTerm int64  `json:"_if_primary_term"`
	Routing       string `json:"routing"`
	Tenant        string
}

// BulkItem pairs a bulk action with the optional document it applies to.
type BulkItem struct {
	Action *BulkAction
	// Doc is the document line; when nil, only the action line is emitted.
	Doc interface{}
}

// MarshalJSON serializes the action metadata, emitting only the "_id",
// "_index" and "routing" keys.
func (bad BulkActionDesc) MarshalJSON() ([]byte, error) {
	return json.Marshal(struct {
		ID      string `json:"_id"`
		Index   string `json:"_index"`
		Routing string `json:"routing"`
	}{
		ID:      bad.ID,
		Index:   bad.Index,
		Routing: bad.Routing,
	})
}

// MarshalJSON serializes the action as a one-entry object keyed by Type.
func (ba BulkAction) MarshalJSON() ([]byte, error) {
	a := map[string]*BulkActionDesc{
		ba.Type: ba.Desc,
	}
	return json.Marshal(a)
}

// Marshal renders the item as newline-terminated JSON lines: the action line,
// followed by the document line when Doc is not nil.
//
// It returns the first JSON encoding error encountered, if any.
func (bi BulkItem) Marshal() ([]byte, error) {
	action, err := json.Marshal(bi.Action)
	if err != nil {
		return nil, err
	}
	buf := bytes.NewBuffer(action)
	buf.WriteByte('\n')

	// Actions without a document consist of the action line only.
	if bi.Doc == nil {
		return buf.Bytes(), nil
	}

	doc, err := json.Marshal(bi.Doc)
	if err != nil {
		return nil, err
	}
	buf.Write(doc)
	buf.WriteByte('\n')

	return buf.Bytes(), nil
}
172

173
func (s *opensearchStore) BulkIndexDeployments(ctx context.Context,
174
        deployments []*model.Deployment) error {
2✔
175
        var data strings.Builder
2✔
176

2✔
177
        for _, deployment := range deployments {
6✔
178
                actionJSON, err := json.Marshal(BulkAction{
4✔
179
                        Type: "index",
4✔
180
                        Desc: &BulkActionDesc{
4✔
181
                                ID:      deployment.ID,
4✔
182
                                Index:   s.GetDeploymentsIndex(deployment.TenantID),
4✔
183
                                Routing: s.GetDeploymentsRoutingKey(deployment.TenantID),
4✔
184
                        },
4✔
185
                })
4✔
186
                if err != nil {
4✔
NEW
187
                        return err
×
NEW
188
                }
×
189
                deploymentJSON, err := json.Marshal(deployment)
4✔
190
                if err != nil {
4✔
NEW
191
                        return err
×
NEW
192
                }
×
193
                data.WriteString(string(actionJSON) + "\n" + string(deploymentJSON) + "\n")
4✔
194
        }
195
        dataString := data.String()
2✔
196

2✔
197
        l := log.FromContext(ctx)
2✔
198
        l.Debugf("opensearch request: %s", dataString)
2✔
199

2✔
200
        req := opensearchapi.BulkRequest{
2✔
201
                Body: strings.NewReader(dataString),
2✔
202
        }
2✔
203
        res, err := req.Do(ctx, s.client)
2✔
204
        if err != nil {
2✔
NEW
205
                return errors.Wrap(err, "failed to bulk index")
×
NEW
206
        }
×
207
        defer res.Body.Close()
2✔
208

2✔
209
        return nil
2✔
210
}
211

212
func (s *opensearchStore) BulkIndexDevices(ctx context.Context, devices []*model.Device,
213
        removedDevices []*model.Device) error {
6✔
214
        var data strings.Builder
6✔
215

6✔
216
        for _, device := range devices {
21✔
217
                actionJSON, err := json.Marshal(BulkAction{
15✔
218
                        Type: "index",
15✔
219
                        Desc: &BulkActionDesc{
15✔
220
                                ID:      device.GetID(),
15✔
221
                                Index:   s.GetDevicesIndex(device.GetTenantID()),
15✔
222
                                Routing: s.GetDevicesRoutingKey(device.GetTenantID()),
15✔
223
                        },
15✔
224
                })
15✔
225
                if err != nil {
15✔
226
                        return err
×
227
                }
×
228
                deviceJSON, err := json.Marshal(device)
15✔
229
                if err != nil {
15✔
230
                        return err
×
231
                }
×
232
                data.WriteString(string(actionJSON) + "\n" + string(deviceJSON) + "\n")
15✔
233
        }
234
        for _, device := range removedDevices {
6✔
235
                actionJSON, err := json.Marshal(BulkAction{
×
236
                        Type: "delete",
×
237
                        Desc: &BulkActionDesc{
×
238
                                ID:      device.GetID(),
×
239
                                Index:   s.GetDevicesIndex(device.GetTenantID()),
×
240
                                Routing: s.GetDevicesRoutingKey(device.GetTenantID()),
×
241
                        },
×
242
                })
×
243
                if err != nil {
×
244
                        return err
×
245
                }
×
246
                data.WriteString(string(actionJSON) + "\n")
×
247
        }
248

249
        dataString := data.String()
6✔
250

6✔
251
        l := log.FromContext(ctx)
6✔
252
        l.Debugf("opensearch request: %s", dataString)
6✔
253

6✔
254
        req := opensearchapi.BulkRequest{
6✔
255
                Body: strings.NewReader(dataString),
6✔
256
        }
6✔
257
        res, err := req.Do(ctx, s.client)
6✔
258
        if err != nil {
6✔
259
                return errors.Wrap(err, "failed to bulk index")
×
260
        }
×
261
        defer res.Body.Close()
6✔
262

6✔
263
        return nil
6✔
264
}
265

266
func (s *opensearchStore) Migrate(ctx context.Context) error {
4✔
267
        indexName := s.GetDevicesIndex("")
4✔
268
        template := fmt.Sprintf(indexDevicesTemplate,
4✔
269
                indexName,
4✔
270
                s.devicesIndexShards,
4✔
271
                s.devicesIndexReplicas,
4✔
272
        )
4✔
273
        err := s.migratePutIndexTemplate(ctx, indexName, template)
4✔
274
        if err == nil {
8✔
275
                err = s.migrateCreateIndex(ctx, indexName)
4✔
276
        }
4✔
277
        if err == nil {
7✔
278
                indexName = s.GetDeploymentsIndex("")
3✔
279
                template = fmt.Sprintf(indexDeploymentsTemplate,
3✔
280
                        indexName,
3✔
281
                        s.devicesIndexShards,
3✔
282
                        s.devicesIndexReplicas,
3✔
283
                )
3✔
284
                err = s.migratePutIndexTemplate(ctx, indexName, template)
3✔
285
        }
3✔
286
        if err == nil {
7✔
287
                err = s.migrateCreateIndex(ctx, indexName)
3✔
288
        }
3✔
289
        return err
4✔
290
}
291

292
func (s *opensearchStore) migratePutIndexTemplate(ctx context.Context,
293
        indexName, template string) error {
7✔
294
        l := log.FromContext(ctx)
7✔
295
        l.Infof("put the index template for %s", indexName)
7✔
296

7✔
297
        req := opensearchapi.IndicesPutIndexTemplateRequest{
7✔
298
                Name: indexName,
7✔
299
                Body: strings.NewReader(template),
7✔
300
        }
7✔
301

7✔
302
        res, err := req.Do(ctx, s.client)
7✔
303
        if err != nil {
7✔
304
                return errors.Wrap(err, "failed to put the index template")
×
305
        }
×
306
        defer res.Body.Close()
7✔
307

7✔
308
        if res.StatusCode != http.StatusOK {
7✔
NEW
309
                body, _ := ioutil.ReadAll(res.Body)
×
NEW
310
                return errors.Errorf("failed to set up the index template: %s", string(body))
×
UNCOV
311
        }
×
312
        return nil
7✔
313
}
314

315
func (s *opensearchStore) migrateCreateIndex(ctx context.Context, indexName string) error {
7✔
316
        l := log.FromContext(ctx)
7✔
317
        l.Infof("verify if the index %s exists", indexName)
7✔
318

7✔
319
        req := opensearchapi.IndicesExistsRequest{
7✔
320
                Index: []string{indexName},
7✔
321
        }
7✔
322
        res, err := req.Do(ctx, s.client)
7✔
323
        if err != nil {
7✔
324
                return errors.Wrap(err, "failed to verify the index")
×
325
        }
×
326
        defer res.Body.Close()
7✔
327

7✔
328
        if res.StatusCode == http.StatusNotFound {
11✔
329
                l.Infof("create the index %s", indexName)
4✔
330

4✔
331
                req := opensearchapi.IndicesCreateRequest{
4✔
332
                        Index: indexName,
4✔
333
                }
4✔
334
                res, err := req.Do(ctx, s.client)
4✔
335
                if err != nil {
4✔
336
                        return errors.Wrap(err, "failed to create the index")
×
337
                }
×
338
                defer res.Body.Close()
4✔
339

4✔
340
                if res.StatusCode != http.StatusOK {
6✔
341
                        return errors.New("failed to create the index")
2✔
342
                }
2✔
343
        } else if res.StatusCode != http.StatusOK {
3✔
344
                return errors.New("failed to verify the index")
×
345
        }
×
346

347
        return nil
5✔
348
}
349

350
func (s *opensearchStore) Ping(ctx context.Context) error {
118✔
351
        pingRequest := s.client.Ping.WithContext(ctx)
118✔
352
        _, err := s.client.Ping(pingRequest)
118✔
353
        return errors.Wrap(err, "failed to ping opensearch")
118✔
354
}
118✔
355

356
func (s *opensearchStore) AggregateDevices(ctx context.Context,
357
        query model.Query) (model.M, error) {
2✔
358
        id := identity.FromContext(ctx)
2✔
359
        indexName := s.GetDevicesIndex(id.Tenant)
2✔
360
        routingKey := s.GetDevicesRoutingKey(id.Tenant)
2✔
361
        return s.aggregate(ctx, indexName, routingKey, query)
2✔
362
}
2✔
363

364
func (s *opensearchStore) AggregateDeployments(ctx context.Context,
365
        query model.Query) (model.M, error) {
2✔
366
        id := identity.FromContext(ctx)
2✔
367
        indexName := s.GetDeploymentsIndex(id.Tenant)
2✔
368
        routingKey := s.GetDeploymentsRoutingKey(id.Tenant)
2✔
369
        return s.aggregate(ctx, indexName, routingKey, query)
2✔
370
}
2✔
371

372
func (s *opensearchStore) aggregate(ctx context.Context, indexName, routingKey string,
373
        query model.Query) (model.M, error) {
4✔
374
        l := log.FromContext(ctx)
4✔
375

4✔
376
        var buf bytes.Buffer
4✔
377
        if err := json.NewEncoder(&buf).Encode(query); err != nil {
4✔
378
                return nil, err
×
379
        }
×
380

381
        l.Debugf("es query: %v", buf.String())
4✔
382

4✔
383
        searchRequests := []func(*opensearchapi.SearchRequest){
4✔
384
                s.client.Search.WithContext(ctx),
4✔
385
                s.client.Search.WithIndex(indexName),
4✔
386
                s.client.Search.WithBody(&buf),
4✔
387
                s.client.Search.WithTrackTotalHits(false),
4✔
388
        }
4✔
389
        if routingKey != "" {
8✔
390
                searchRequests = append(searchRequests, s.client.Search.WithRouting(routingKey))
4✔
391
        }
4✔
392
        resp, err := s.client.Search(searchRequests...)
4✔
393
        defer resp.Body.Close()
4✔
394

4✔
395
        if err != nil {
4✔
396
                return nil, err
×
397
        }
×
398

399
        if resp.IsError() {
4✔
400
                return nil, errors.New(resp.String())
×
401
        }
×
402

403
        var ret map[string]interface{}
4✔
404
        if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
4✔
405
                return nil, err
×
406
        }
×
407

408
        l.Debugf("opensearch response: %v", ret)
4✔
409
        return ret, nil
4✔
410
}
411

412
func (s *opensearchStore) SearchDevices(ctx context.Context, query model.Query) (model.M, error) {
17✔
413
        id := identity.FromContext(ctx)
17✔
414
        indexName := s.GetDevicesIndex(id.Tenant)
17✔
415
        routingKey := s.GetDevicesRoutingKey(id.Tenant)
17✔
416
        return s.search(ctx, indexName, routingKey, query)
17✔
417
}
17✔
418

419
func (s *opensearchStore) SearchDeployments(ctx context.Context,
420
        query model.Query) (model.M, error) {
6✔
421
        id := identity.FromContext(ctx)
6✔
422
        indexName := s.GetDeploymentsIndex(id.Tenant)
6✔
423
        routingKey := s.GetDeploymentsRoutingKey(id.Tenant)
6✔
424
        return s.search(ctx, indexName, routingKey, query)
6✔
425
}
6✔
426

427
func (s *opensearchStore) search(ctx context.Context, indexName, routingKey string,
428
        query model.Query) (model.M, error) {
23✔
429
        l := log.FromContext(ctx)
23✔
430

23✔
431
        var buf bytes.Buffer
23✔
432
        if err := json.NewEncoder(&buf).Encode(query); err != nil {
23✔
433
                return nil, err
×
434
        }
×
435

436
        l.Debugf("es query: %v", buf.String())
23✔
437

23✔
438
        searchRequests := []func(*opensearchapi.SearchRequest){
23✔
439
                s.client.Search.WithContext(ctx),
23✔
440
                s.client.Search.WithIndex(indexName),
23✔
441
                s.client.Search.WithBody(&buf),
23✔
442
                s.client.Search.WithTrackTotalHits(true),
23✔
443
        }
23✔
444
        if routingKey != "" {
46✔
445
                searchRequests = append(searchRequests, s.client.Search.WithRouting(routingKey))
23✔
446
        }
23✔
447
        resp, err := s.client.Search(searchRequests...)
23✔
448
        defer resp.Body.Close()
23✔
449

23✔
450
        if err != nil {
23✔
451
                return nil, err
×
452
        }
×
453

454
        if resp.IsError() {
23✔
455
                return nil, errors.New(resp.String())
×
456
        }
×
457

458
        var ret map[string]interface{}
23✔
459
        if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
23✔
460
                return nil, err
×
461
        }
×
462

463
        l.Debugf("opensearch response: %v", ret)
23✔
464
        return ret, nil
23✔
465
}
466

467
// GetDevicesIndexMapping retrieves the "devices*" index definition for tenant 'tid'
468
// existing fields, incl. inventory attributes, are found under 'properties'
469
// see: https://opensearch.org/docs/latest/api-reference/index-apis/get-index/
470
func (s *opensearchStore) GetDevicesIndexMapping(ctx context.Context,
471
        tid string) (map[string]interface{}, error) {
×
472
        idx := s.GetDevicesIndex(tid)
×
NEW
473
        return s.getIndexMapping(ctx, tid, idx)
×
NEW
474
}
×
475

476
// GetDeploymentsIndexMapping retrieves the "deployments*" index definition for tenant 'tid'
477
// existing fields, incl. inventory attributes, are found under 'properties'
478
// see: https://opensearch.org/docs/latest/api-reference/index-apis/get-index/
479
func (s *opensearchStore) GetDeploymentsIndexMapping(ctx context.Context,
NEW
480
        tid string) (map[string]interface{}, error) {
×
NEW
481
        idx := s.GetDeploymentsIndex(tid)
×
NEW
482
        return s.getIndexMapping(ctx, tid, idx)
×
NEW
483
}
×
484

485
func (s *opensearchStore) getIndexMapping(ctx context.Context,
NEW
486
        tid, idx string) (map[string]interface{}, error) {
×
NEW
487
        l := log.FromContext(ctx)
×
488
        req := opensearchapi.IndicesGetRequest{
×
489
                Index: []string{idx},
×
490
        }
×
491

×
492
        res, err := req.Do(ctx, s.client)
×
493
        if err != nil {
×
494
                return nil, errors.Wrapf(err, "failed to get devices index from store, tid %s", tid)
×
495
        }
×
496
        defer res.Body.Close()
×
497

×
498
        if res.IsError() {
×
499
                return nil, errors.Errorf(
×
500
                        "failed to get devices index from store, tid %s, code %d",
×
501
                        tid, res.StatusCode,
×
502
                )
×
503
        }
×
504

505
        var indexRes map[string]interface{}
×
506
        if err := json.NewDecoder(res.Body).Decode(&indexRes); err != nil {
×
507
                return nil, err
×
508
        }
×
509

510
        index, ok := indexRes[idx]
×
511
        if !ok {
×
512
                return nil, errors.New("can't parse index defintion response")
×
513
        }
×
514

515
        indexM, ok := index.(map[string]interface{})
×
516
        if !ok {
×
517
                return nil, errors.New("can't parse index defintion response")
×
518
        }
×
519

NEW
520
        l.Debugf("index for tid %s\n%s\n", tid, indexM)
×
521
        return indexM, nil
×
522
}
523

524
// GetDevicesIndex returns the index name for the tenant tid
525
func (s *opensearchStore) GetDevicesIndex(tid string) string {
38✔
526
        return s.devicesIndexName
38✔
527
}
38✔
528

529
// GetDeploymentsIndex returns the index name for the tenant tid
530
func (s *opensearchStore) GetDeploymentsIndex(tid string) string {
15✔
531
        return s.deploymentsIndexName
15✔
532
}
15✔
533

534
// GetDevicesRoutingKey returns the routing key for the tenant tid
535
func (s *opensearchStore) GetDevicesRoutingKey(tid string) string {
34✔
536
        return tid
34✔
537
}
34✔
538

539
// GetDeploymentsRoutingKey returns the routing key for the tenant tid
540
func (s *opensearchStore) GetDeploymentsRoutingKey(tid string) string {
12✔
541
        return tid
12✔
542
}
12✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc