• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been set to done!

mendersoftware / reporting / 732508278

pending completion
732508278

Pull #87

gitlab-ci

Fabio Tranchitella
refac: rename internal search end-point to `/tenants/{tenant_id}/devices/search`
Pull Request #87: MEN-5930: index device deployment objects

331 of 402 new or added lines in 12 files covered. (82.34%)

135 existing lines in 5 files now uncovered.

2196 of 2600 relevant lines covered (84.46%)

17.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.22
/store/opensearch/store.go
1
// Copyright 2022 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package opensearch
16

17
import (
18
        "bytes"
19
        "context"
20
        "encoding/json"
21
        "fmt"
22
        "net/http"
23
        "strings"
24

25
        "github.com/opensearch-project/opensearch-go"
26
        "github.com/opensearch-project/opensearch-go/opensearchapi"
27
        "github.com/pkg/errors"
28

29
        "github.com/mendersoftware/go-lib-micro/identity"
30
        "github.com/mendersoftware/go-lib-micro/log"
31
        _ "github.com/mendersoftware/go-lib-micro/log"
32

33
        "github.com/mendersoftware/reporting/model"
34
        "github.com/mendersoftware/reporting/store"
35
)
36

37
// StoreOption is a functional option: a function that receives the
// opensearchStore under construction and mutates its configuration.
type StoreOption func(*opensearchStore)
38

39
type opensearchStore struct {
40
        addresses                []string
41
        devicesIndexName         string
42
        devicesIndexShards       int
43
        devicesIndexReplicas     int
44
        deploymentsIndexName     string
45
        deploymentsIndexShards   int
46
        deploymentsIndexReplicas int
47
        client                   *opensearch.Client
48
}
49

50
func NewStore(opts ...StoreOption) (store.Store, error) {
3✔
51
        store := &opensearchStore{}
3✔
52
        for _, opt := range opts {
24✔
53
                opt(store)
21✔
54
        }
21✔
55

56
        cfg := opensearch.Config{
3✔
57
                Addresses: store.addresses,
3✔
58
        }
3✔
59
        osClient, err := opensearch.NewClient(cfg)
3✔
60
        if err != nil {
3✔
UNCOV
61
                return nil, errors.Wrap(err, "invalid OpenSearch configuration")
×
62
        }
×
63

64
        store.client = osClient
3✔
65
        return store, nil
3✔
66
}
67

68
func WithServerAddresses(addresses []string) StoreOption {
3✔
69
        return func(s *opensearchStore) {
6✔
70
                s.addresses = addresses
3✔
71
        }
3✔
72
}
73

74
func WithDevicesIndexName(indexName string) StoreOption {
3✔
75
        return func(s *opensearchStore) {
6✔
76
                s.devicesIndexName = indexName
3✔
77
        }
3✔
78
}
79

80
func WithDevicesIndexShards(indexShards int) StoreOption {
3✔
81
        return func(s *opensearchStore) {
6✔
82
                s.devicesIndexShards = indexShards
3✔
83
        }
3✔
84
}
85

86
func WithDevicesIndexReplicas(indexReplicas int) StoreOption {
3✔
87
        return func(s *opensearchStore) {
6✔
88
                s.devicesIndexReplicas = indexReplicas
3✔
89
        }
3✔
90
}
91

92
func WithDeploymentsIndexName(indexName string) StoreOption {
3✔
93
        return func(s *opensearchStore) {
6✔
94
                s.deploymentsIndexName = indexName
3✔
95
        }
3✔
96
}
97

98
func WithDeploymentsIndexShards(indexShards int) StoreOption {
3✔
99
        return func(s *opensearchStore) {
6✔
100
                s.deploymentsIndexShards = indexShards
3✔
101
        }
3✔
102
}
103

104
func WithDeploymentsIndexReplicas(indexReplicas int) StoreOption {
3✔
105
        return func(s *opensearchStore) {
6✔
106
                s.deploymentsIndexReplicas = indexReplicas
3✔
107
        }
3✔
108
}
109

110
type BulkAction struct {
111
        Type string
112
        Desc *BulkActionDesc
113
}
114

115
// BulkActionDesc holds the metadata attached to a bulk action: the document
// ID, the target index, sequence-number/primary-term fields, the routing
// value and a tenant string.
type BulkActionDesc struct {
	ID    string `json:"_id"`
	Index string `json:"_index"`

	IfSeqNo       int64 `json:"_if_seq_no"`
	IfPrimaryTerm int64 `json:"_if_primary_term"`

	Routing string `json:"routing"`
	Tenant  string
}
123

124
type BulkItem struct {
125
        Action *BulkAction
126
        Doc    interface{}
127
}
128

129
func (bad BulkActionDesc) MarshalJSON() ([]byte, error) {
15✔
130
        return json.Marshal(struct {
15✔
131
                ID      string `json:"_id"`
15✔
132
                Index   string `json:"_index"`
15✔
133
                Routing string `json:"routing"`
15✔
134
        }{
15✔
135
                ID:      bad.ID,
15✔
136
                Index:   bad.Index,
15✔
137
                Routing: bad.Routing,
15✔
138
        })
15✔
139
}
15✔
140

141
func (ba BulkAction) MarshalJSON() ([]byte, error) {
15✔
142
        a := map[string]*BulkActionDesc{
15✔
143
                ba.Type: ba.Desc,
15✔
144
        }
15✔
145
        return json.Marshal(a)
15✔
146
}
15✔
147

UNCOV
148
func (bi BulkItem) Marshal() ([]byte, error) {
×
149
        action, err := json.Marshal(bi.Action)
×
150
        if err != nil {
×
151
                return nil, err
×
152
        }
×
153
        buf := bytes.NewBuffer(action)
×
154
        buf.WriteString("\n")
×
155

×
156
        if bi.Doc == nil {
×
157
                return buf.Bytes(), nil
×
158
        }
×
159

UNCOV
160
        if bi.Doc != nil {
×
161
                doc, err := json.Marshal(bi.Doc)
×
162
                if err != nil {
×
163
                        return nil, err
×
164
                }
×
165
                buf.Write(doc)
×
166
                buf.WriteString("\n")
×
167
        }
168

UNCOV
169
        return buf.Bytes(), nil
×
170
}
171

172
func (s *opensearchStore) BulkIndexDevices(ctx context.Context, devices []*model.Device,
173
        removedDevices []*model.Device) error {
6✔
174
        var data strings.Builder
6✔
175

6✔
176
        for _, device := range devices {
21✔
177
                actionJSON, err := json.Marshal(BulkAction{
15✔
178
                        Type: "index",
15✔
179
                        Desc: &BulkActionDesc{
15✔
180
                                ID:      device.GetID(),
15✔
181
                                Index:   s.GetDevicesIndex(device.GetTenantID()),
15✔
182
                                Routing: s.GetDevicesRoutingKey(device.GetTenantID()),
15✔
183
                        },
15✔
184
                })
15✔
185
                if err != nil {
15✔
186
                        return err
×
187
                }
×
188
                deviceJSON, err := json.Marshal(device)
15✔
189
                if err != nil {
15✔
190
                        return err
×
191
                }
×
192
                data.WriteString(string(actionJSON) + "\n" + string(deviceJSON) + "\n")
15✔
193
        }
194
        for _, device := range removedDevices {
6✔
195
                actionJSON, err := json.Marshal(BulkAction{
×
196
                        Type: "delete",
×
197
                        Desc: &BulkActionDesc{
×
198
                                ID:      device.GetID(),
×
199
                                Index:   s.GetDevicesIndex(device.GetTenantID()),
×
200
                                Routing: s.GetDevicesRoutingKey(device.GetTenantID()),
×
201
                        },
×
202
                })
×
203
                if err != nil {
×
204
                        return err
×
205
                }
×
206
                data.WriteString(string(actionJSON) + "\n")
×
207
        }
208

209
        dataString := data.String()
6✔
210

6✔
211
        l := log.FromContext(ctx)
6✔
212
        l.Debugf("opensearch request: %s", dataString)
6✔
213

6✔
214
        req := opensearchapi.BulkRequest{
6✔
215
                Body: strings.NewReader(dataString),
6✔
216
        }
6✔
217
        res, err := req.Do(ctx, s.client)
6✔
218
        if err != nil {
6✔
UNCOV
219
                return errors.Wrap(err, "failed to bulk index")
×
UNCOV
220
        }
×
221
        defer res.Body.Close()
6✔
222

6✔
223
        return nil
6✔
224
}
225

226
func (s *opensearchStore) Migrate(ctx context.Context) error {
3✔
227
        indexName := s.GetDevicesIndex("")
3✔
228
        template := fmt.Sprintf(indexDevicesTemplate,
3✔
229
                indexName,
3✔
230
                s.devicesIndexShards,
3✔
231
                s.devicesIndexReplicas,
3✔
232
        )
3✔
233
        err := s.migratePutIndexTemplate(ctx, indexName, template)
3✔
234
        if err == nil {
6✔
235
                indexName = s.GetDeploymentsIndex("")
3✔
236
                template = fmt.Sprintf(indexDeploymentsTemplate,
3✔
237
                        indexName,
3✔
238
                        s.devicesIndexShards,
3✔
239
                        s.devicesIndexReplicas,
3✔
240
                )
3✔
241
                err = s.migratePutIndexTemplate(ctx, indexName, template)
3✔
242
        }
3✔
243
        if err == nil {
6✔
244
                err = s.migrateCreateIndex(ctx, indexName)
3✔
245
        }
3✔
246
        return err
3✔
247
}
248

249
func (s *opensearchStore) migratePutIndexTemplate(ctx context.Context,
250
        indexName, template string) error {
6✔
251
        l := log.FromContext(ctx)
6✔
252
        l.Infof("put the index template for %s", indexName)
6✔
253

6✔
254
        req := opensearchapi.IndicesPutIndexTemplateRequest{
6✔
255
                Name: indexName,
6✔
256
                Body: strings.NewReader(template),
6✔
257
        }
6✔
258

6✔
259
        res, err := req.Do(ctx, s.client)
6✔
260
        if err != nil {
6✔
UNCOV
261
                return errors.Wrap(err, "failed to put the index template")
×
UNCOV
262
        }
×
263
        defer res.Body.Close()
6✔
264

6✔
265
        if res.StatusCode != http.StatusOK {
6✔
UNCOV
266
                return errors.New("failed to set up the index template")
×
UNCOV
267
        }
×
268
        return nil
6✔
269
}
270

271
func (s *opensearchStore) migrateCreateIndex(ctx context.Context, indexName string) error {
3✔
272
        l := log.FromContext(ctx)
3✔
273
        l.Infof("verify if the index %s exists", indexName)
3✔
274

3✔
275
        req := opensearchapi.IndicesExistsRequest{
3✔
276
                Index: []string{indexName},
3✔
277
        }
3✔
278
        res, err := req.Do(ctx, s.client)
3✔
279
        if err != nil {
3✔
280
                return errors.Wrap(err, "failed to verify the index")
×
281
        }
×
282
        defer res.Body.Close()
3✔
283

3✔
284
        if res.StatusCode == http.StatusNotFound {
5✔
285
                l.Infof("create the index %s", indexName)
2✔
286

2✔
287
                req := opensearchapi.IndicesCreateRequest{
2✔
288
                        Index: indexName,
2✔
289
                }
2✔
290
                res, err := req.Do(ctx, s.client)
2✔
291
                if err != nil {
2✔
292
                        return errors.Wrap(err, "failed to create the index")
×
293
                }
×
294
                defer res.Body.Close()
2✔
295

2✔
296
                if res.StatusCode != http.StatusOK {
3✔
297
                        return errors.New("failed to create the index")
1✔
298
                }
1✔
299
        } else if res.StatusCode != http.StatusOK {
1✔
UNCOV
300
                return errors.New("failed to verify the index")
×
UNCOV
301
        }
×
302

303
        return nil
2✔
304
}
305

306
func (s *opensearchStore) Ping(ctx context.Context) error {
123✔
307
        pingRequest := s.client.Ping.WithContext(ctx)
123✔
308
        _, err := s.client.Ping(pingRequest)
123✔
309
        return errors.Wrap(err, "failed to ping opensearch")
123✔
310
}
123✔
311

312
func (s *opensearchStore) AggregateDevices(ctx context.Context,
313
        query model.Query) (model.M, error) {
2✔
314
        id := identity.FromContext(ctx)
2✔
315
        indexName := s.GetDevicesIndex(id.Tenant)
2✔
316
        routingKey := s.GetDevicesRoutingKey(id.Tenant)
2✔
317
        return s.aggregate(ctx, indexName, routingKey, query)
2✔
318
}
2✔
319

320
func (s *opensearchStore) AggregateDeployments(ctx context.Context,
NEW
UNCOV
321
        query model.Query) (model.M, error) {
×
NEW
UNCOV
322
        id := identity.FromContext(ctx)
×
NEW
UNCOV
323
        indexName := s.GetDeploymentsIndex(id.Tenant)
×
NEW
324
        routingKey := s.GetDeploymentsRoutingKey(id.Tenant)
×
NEW
325
        return s.aggregate(ctx, indexName, routingKey, query)
×
NEW
UNCOV
326
}
×
327

328
func (s *opensearchStore) aggregate(ctx context.Context, indexName, routingKey string,
329
        query model.Query) (model.M, error) {
2✔
330
        l := log.FromContext(ctx)
2✔
331

2✔
332
        var buf bytes.Buffer
2✔
333
        if err := json.NewEncoder(&buf).Encode(query); err != nil {
2✔
UNCOV
334
                return nil, err
×
UNCOV
335
        }
×
336

337
        l.Debugf("es query: %v", buf.String())
2✔
338

2✔
339
        searchRequests := []func(*opensearchapi.SearchRequest){
2✔
340
                s.client.Search.WithContext(ctx),
2✔
341
                s.client.Search.WithIndex(indexName),
2✔
342
                s.client.Search.WithBody(&buf),
2✔
343
                s.client.Search.WithTrackTotalHits(false),
2✔
344
        }
2✔
345
        if routingKey != "" {
4✔
346
                searchRequests = append(searchRequests, s.client.Search.WithRouting(routingKey))
2✔
347
        }
2✔
348
        resp, err := s.client.Search(searchRequests...)
2✔
349
        defer resp.Body.Close()
2✔
350

2✔
351
        if err != nil {
2✔
UNCOV
352
                return nil, err
×
UNCOV
353
        }
×
354

355
        if resp.IsError() {
2✔
356
                return nil, errors.New(resp.String())
×
357
        }
×
358

359
        var ret map[string]interface{}
2✔
360
        if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
2✔
361
                return nil, err
×
362
        }
×
363

364
        l.Debugf("opensearch response: %v", ret)
2✔
365
        return ret, nil
2✔
366
}
367

368
func (s *opensearchStore) SearchDevices(ctx context.Context, query model.Query) (model.M, error) {
17✔
369
        id := identity.FromContext(ctx)
17✔
370
        indexName := s.GetDevicesIndex(id.Tenant)
17✔
371
        routingKey := s.GetDevicesRoutingKey(id.Tenant)
17✔
372
        return s.search(ctx, indexName, routingKey, query)
17✔
373
}
17✔
374

375
func (s *opensearchStore) SearchDeployments(ctx context.Context,
NEW
UNCOV
376
        query model.Query) (model.M, error) {
×
NEW
UNCOV
377
        id := identity.FromContext(ctx)
×
NEW
378
        indexName := s.GetDeploymentsIndex(id.Tenant)
×
NEW
379
        routingKey := s.GetDeploymentsRoutingKey(id.Tenant)
×
NEW
UNCOV
380
        return s.search(ctx, indexName, routingKey, query)
×
NEW
UNCOV
381
}
×
382

383
func (s *opensearchStore) search(ctx context.Context, indexName, routingKey string,
384
        query model.Query) (model.M, error) {
17✔
385
        l := log.FromContext(ctx)
17✔
386

17✔
387
        var buf bytes.Buffer
17✔
388
        if err := json.NewEncoder(&buf).Encode(query); err != nil {
17✔
UNCOV
389
                return nil, err
×
UNCOV
390
        }
×
391

392
        l.Debugf("es query: %v", buf.String())
17✔
393

17✔
394
        searchRequests := []func(*opensearchapi.SearchRequest){
17✔
395
                s.client.Search.WithContext(ctx),
17✔
396
                s.client.Search.WithIndex(indexName),
17✔
397
                s.client.Search.WithBody(&buf),
17✔
398
                s.client.Search.WithTrackTotalHits(true),
17✔
399
        }
17✔
400
        if routingKey != "" {
34✔
401
                searchRequests = append(searchRequests, s.client.Search.WithRouting(routingKey))
17✔
402
        }
17✔
403
        resp, err := s.client.Search(searchRequests...)
17✔
404
        defer resp.Body.Close()
17✔
405

17✔
406
        if err != nil {
17✔
UNCOV
407
                return nil, err
×
UNCOV
408
        }
×
409

410
        if resp.IsError() {
17✔
UNCOV
411
                return nil, errors.New(resp.String())
×
412
        }
×
413

414
        var ret map[string]interface{}
17✔
415
        if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
17✔
416
                return nil, err
×
417
        }
×
418

419
        l.Debugf("opensearch response: %v", ret)
17✔
420
        return ret, nil
17✔
421
}
422

423
// GetDevicesIndexMapping retrieves the "devices*" index definition for tenant 'tid'
424
// existing fields, incl. inventory attributes, are found under 'properties'
425
// see: https://opensearch.org/docs/latest/api-reference/index-apis/get-index/
426
func (s *opensearchStore) GetDevicesIndexMapping(ctx context.Context,
427
        tid string) (map[string]interface{}, error) {
×
428
        idx := s.GetDevicesIndex(tid)
×
NEW
UNCOV
429
        return s.getIndexMapping(ctx, tid, idx)
×
NEW
UNCOV
430
}
×
431

432
// GetDeploymentsIndexMapping retrieves the "deployments*" index definition for tenant 'tid'
433
// existing fields, incl. inventory attributes, are found under 'properties'
434
// see: https://opensearch.org/docs/latest/api-reference/index-apis/get-index/
435
func (s *opensearchStore) GetDeploymentsIndexMapping(ctx context.Context,
NEW
UNCOV
436
        tid string) (map[string]interface{}, error) {
×
NEW
UNCOV
437
        idx := s.GetDeploymentsIndex(tid)
×
NEW
UNCOV
438
        return s.getIndexMapping(ctx, tid, idx)
×
NEW
UNCOV
439
}
×
440

441
func (s *opensearchStore) getIndexMapping(ctx context.Context,
NEW
UNCOV
442
        tid, idx string) (map[string]interface{}, error) {
×
NEW
UNCOV
443
        l := log.FromContext(ctx)
×
UNCOV
444
        req := opensearchapi.IndicesGetRequest{
×
UNCOV
445
                Index: []string{idx},
×
UNCOV
446
        }
×
UNCOV
447

×
UNCOV
448
        res, err := req.Do(ctx, s.client)
×
UNCOV
449
        if err != nil {
×
UNCOV
450
                return nil, errors.Wrapf(err, "failed to get devices index from store, tid %s", tid)
×
451
        }
×
452
        defer res.Body.Close()
×
UNCOV
453

×
UNCOV
454
        if res.IsError() {
×
455
                return nil, errors.Errorf(
×
456
                        "failed to get devices index from store, tid %s, code %d",
×
UNCOV
457
                        tid, res.StatusCode,
×
UNCOV
458
                )
×
UNCOV
459
        }
×
460

461
        var indexRes map[string]interface{}
×
UNCOV
462
        if err := json.NewDecoder(res.Body).Decode(&indexRes); err != nil {
×
UNCOV
463
                return nil, err
×
UNCOV
464
        }
×
465

UNCOV
466
        index, ok := indexRes[idx]
×
UNCOV
467
        if !ok {
×
UNCOV
468
                return nil, errors.New("can't parse index defintion response")
×
UNCOV
469
        }
×
470

471
        indexM, ok := index.(map[string]interface{})
×
472
        if !ok {
×
473
                return nil, errors.New("can't parse index defintion response")
×
474
        }
×
475

NEW
476
        l.Debugf("index for tid %s\n%s\n", tid, indexM)
×
477
        return indexM, nil
×
478
}
479

480
// GetDevicesIndex returns the index name for the tenant tid
481
func (s *opensearchStore) GetDevicesIndex(tid string) string {
37✔
482
        return s.devicesIndexName
37✔
483
}
37✔
484

485
// GetDeploymentsIndex returns the index name for the tenant tid
486
func (s *opensearchStore) GetDeploymentsIndex(tid string) string {
3✔
487
        return s.deploymentsIndexName
3✔
488
}
3✔
489

490
// GetDevicesRoutingKey returns the routing key for the tenant tid
491
func (s *opensearchStore) GetDevicesRoutingKey(tid string) string {
34✔
492
        return tid
34✔
493
}
34✔
494

495
// GetDeploymentsRoutingKey returns the routing key for the tenant tid
NEW
496
func (s *opensearchStore) GetDeploymentsRoutingKey(tid string) string {
×
NEW
497
        return tid
×
NEW
498
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc