• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been set to done!

mendersoftware / reporting / 844885364

pending completion
844885364

Pull #135

gitlab-ci

GitHub
Merge pull request #131 from alfrunes/MEN-6448
Pull Request #135: Align staging with master

1 of 1 new or added line in 1 file covered. (100.0%)

3 existing lines in 2 files now uncovered.

2807 of 3299 relevant lines covered (85.09%)

15.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.83
/store/opensearch/store.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package opensearch
16

17
import (
18
        "bytes"
19
        "context"
20
        "encoding/json"
21
        "fmt"
22
        "io/ioutil"
23
        "net/http"
24
        "strings"
25

26
        "github.com/opensearch-project/opensearch-go"
27
        "github.com/opensearch-project/opensearch-go/opensearchapi"
28
        "github.com/pkg/errors"
29

30
        "github.com/mendersoftware/go-lib-micro/identity"
31
        "github.com/mendersoftware/go-lib-micro/log"
32
        _ "github.com/mendersoftware/go-lib-micro/log"
33

34
        "github.com/mendersoftware/reporting/model"
35
        "github.com/mendersoftware/reporting/store"
36
)
37

38
// StoreOption is a functional option that NewStore applies to the
// opensearchStore it is building.
type StoreOption func(*opensearchStore)
39

40
type opensearchStore struct {
41
        addresses                []string
42
        devicesIndexName         string
43
        devicesIndexShards       int
44
        devicesIndexReplicas     int
45
        deploymentsIndexName     string
46
        deploymentsIndexShards   int
47
        deploymentsIndexReplicas int
48
        client                   *opensearch.Client
49
}
50

51
func NewStore(opts ...StoreOption) (store.Store, error) {
3✔
52
        store := &opensearchStore{}
3✔
53
        for _, opt := range opts {
24✔
54
                opt(store)
21✔
55
        }
21✔
56

57
        cfg := opensearch.Config{
3✔
58
                Addresses: store.addresses,
3✔
59
        }
3✔
60
        osClient, err := opensearch.NewClient(cfg)
3✔
61
        if err != nil {
3✔
62
                return nil, errors.Wrap(err, "invalid OpenSearch configuration")
×
63
        }
×
64

65
        store.client = osClient
3✔
66
        return store, nil
3✔
67
}
68

69
func WithServerAddresses(addresses []string) StoreOption {
3✔
70
        return func(s *opensearchStore) {
6✔
71
                s.addresses = addresses
3✔
72
        }
3✔
73
}
74

75
func WithDevicesIndexName(indexName string) StoreOption {
3✔
76
        return func(s *opensearchStore) {
6✔
77
                s.devicesIndexName = indexName
3✔
78
        }
3✔
79
}
80

81
func WithDevicesIndexShards(indexShards int) StoreOption {
3✔
82
        return func(s *opensearchStore) {
6✔
83
                s.devicesIndexShards = indexShards
3✔
84
        }
3✔
85
}
86

87
func WithDevicesIndexReplicas(indexReplicas int) StoreOption {
3✔
88
        return func(s *opensearchStore) {
6✔
89
                s.devicesIndexReplicas = indexReplicas
3✔
90
        }
3✔
91
}
92

93
func WithDeploymentsIndexName(indexName string) StoreOption {
3✔
94
        return func(s *opensearchStore) {
6✔
95
                s.deploymentsIndexName = indexName
3✔
96
        }
3✔
97
}
98

99
func WithDeploymentsIndexShards(indexShards int) StoreOption {
3✔
100
        return func(s *opensearchStore) {
6✔
101
                s.deploymentsIndexShards = indexShards
3✔
102
        }
3✔
103
}
104

105
func WithDeploymentsIndexReplicas(indexReplicas int) StoreOption {
3✔
106
        return func(s *opensearchStore) {
6✔
107
                s.deploymentsIndexReplicas = indexReplicas
3✔
108
        }
3✔
109
}
110

111
// BulkAction is the action line of one bulk operation. Type is used as the
// JSON object key (this file uses "index" and "delete") and Desc is
// serialized as the value under that key.
type BulkAction struct {
	Type string
	Desc *BulkActionDesc
}

// BulkActionDesc holds the metadata of a bulk action.
//
// Only ID, Index and Routing are serialized: the custom MarshalJSON below
// takes precedence over the struct tags, so IfSeqNo, IfPrimaryTerm and Tenant
// are never written to the output.
type BulkActionDesc struct {
	ID            string `json:"_id"`
	Index         string `json:"_index"`
	IfSeqNo       int64  `json:"_if_seq_no"`
	IfPrimaryTerm int64  `json:"_if_primary_term"`
	Routing       string `json:"routing"`
	Tenant        string
}

// BulkItem pairs a bulk action with its optional document payload.
type BulkItem struct {
	Action *BulkAction
	Doc    interface{}
}

// MarshalJSON serializes the descriptor as an object with the keys "_id",
// "_index" and "routing" only.
func (bad BulkActionDesc) MarshalJSON() ([]byte, error) {
	return json.Marshal(struct {
		ID      string `json:"_id"`
		Index   string `json:"_index"`
		Routing string `json:"routing"`
	}{
		ID:      bad.ID,
		Index:   bad.Index,
		Routing: bad.Routing,
	})
}

// MarshalJSON serializes the action as a single-key object: {<Type>: <Desc>}.
func (ba BulkAction) MarshalJSON() ([]byte, error) {
	a := map[string]*BulkActionDesc{
		ba.Type: ba.Desc,
	}
	return json.Marshal(a)
}

// Marshal renders the item as newline-delimited JSON: the action line, then,
// when Doc is non-nil, the document line. Every line ends with "\n".
//
// It returns the first error reported by json.Marshal for the action or the
// document.
func (bi BulkItem) Marshal() ([]byte, error) {
	action, err := json.Marshal(bi.Action)
	if err != nil {
		return nil, err
	}
	buf := bytes.NewBuffer(action)
	buf.WriteString("\n")

	// Items without a payload consist of the action line only.
	if bi.Doc == nil {
		return buf.Bytes(), nil
	}

	doc, err := json.Marshal(bi.Doc)
	if err != nil {
		return nil, err
	}
	buf.Write(doc)
	buf.WriteString("\n")

	return buf.Bytes(), nil
}
172

173
func (s *opensearchStore) BulkIndexDeployments(ctx context.Context,
174
        deployments []*model.Deployment) error {
2✔
175
        var data strings.Builder
2✔
176

2✔
177
        for _, deployment := range deployments {
6✔
178
                actionJSON, err := json.Marshal(BulkAction{
4✔
179
                        Type: "index",
4✔
180
                        Desc: &BulkActionDesc{
4✔
181
                                ID:      deployment.ID,
4✔
182
                                Index:   s.GetDeploymentsIndex(deployment.TenantID),
4✔
183
                                Routing: s.GetDeploymentsRoutingKey(deployment.TenantID),
4✔
184
                        },
4✔
185
                })
4✔
186
                if err != nil {
4✔
187
                        return err
×
188
                }
×
189
                deploymentJSON, err := json.Marshal(deployment)
4✔
190
                if err != nil {
4✔
191
                        return err
×
192
                }
×
193
                data.WriteString(string(actionJSON) + "\n" + string(deploymentJSON) + "\n")
4✔
194
        }
195
        dataString := data.String()
2✔
196

2✔
197
        l := log.FromContext(ctx)
2✔
198
        l.Debugf("opensearch request: %s", dataString)
2✔
199

2✔
200
        req := opensearchapi.BulkRequest{
2✔
201
                Body: strings.NewReader(dataString),
2✔
202
        }
2✔
203
        res, err := req.Do(ctx, s.client)
2✔
204
        if err != nil {
2✔
205
                return errors.Wrap(err, "failed to bulk index")
×
206
        }
×
207
        defer res.Body.Close()
2✔
208

2✔
209
        return nil
2✔
210
}
211

212
func (s *opensearchStore) BulkIndexDevices(ctx context.Context, devices []*model.Device,
213
        removedDevices []*model.Device) error {
6✔
214
        var data strings.Builder
6✔
215

6✔
216
        for _, device := range devices {
21✔
217
                actionJSON, err := json.Marshal(BulkAction{
15✔
218
                        Type: "index",
15✔
219
                        Desc: &BulkActionDesc{
15✔
220
                                ID:      device.GetID(),
15✔
221
                                Index:   s.GetDevicesIndex(device.GetTenantID()),
15✔
222
                                Routing: s.GetDevicesRoutingKey(device.GetTenantID()),
15✔
223
                        },
15✔
224
                })
15✔
225
                if err != nil {
15✔
226
                        return err
×
227
                }
×
228
                deviceJSON, err := json.Marshal(device)
15✔
229
                if err != nil {
15✔
230
                        return err
×
231
                }
×
232
                data.WriteString(string(actionJSON) + "\n" + string(deviceJSON) + "\n")
15✔
233
        }
234
        for _, device := range removedDevices {
6✔
235
                actionJSON, err := json.Marshal(BulkAction{
×
236
                        Type: "delete",
×
237
                        Desc: &BulkActionDesc{
×
238
                                ID:      device.GetID(),
×
239
                                Index:   s.GetDevicesIndex(device.GetTenantID()),
×
240
                                Routing: s.GetDevicesRoutingKey(device.GetTenantID()),
×
241
                        },
×
242
                })
×
243
                if err != nil {
×
244
                        return err
×
245
                }
×
246
                data.WriteString(string(actionJSON) + "\n")
×
247
        }
248

249
        dataString := data.String()
6✔
250

6✔
251
        l := log.FromContext(ctx)
6✔
252
        l.Debugf("opensearch request: %s", dataString)
6✔
253

6✔
254
        req := opensearchapi.BulkRequest{
6✔
255
                Body: strings.NewReader(dataString),
6✔
256
        }
6✔
257
        res, err := req.Do(ctx, s.client)
6✔
258
        if err != nil {
6✔
259
                return errors.Wrap(err, "failed to bulk index")
×
260
        }
×
261
        defer res.Body.Close()
6✔
262

6✔
263
        return nil
6✔
264
}
265

266
func (s *opensearchStore) Migrate(ctx context.Context) error {
3✔
267
        indexName := s.GetDevicesIndex("")
3✔
268
        template := fmt.Sprintf(indexDevicesTemplate,
3✔
269
                indexName,
3✔
270
                s.devicesIndexShards,
3✔
271
                s.devicesIndexReplicas,
3✔
272
        )
3✔
273
        err := s.migratePutIndexTemplate(ctx, indexName, template)
3✔
274
        if err == nil {
6✔
275
                err = s.migrateCreateIndex(ctx, indexName)
3✔
276
        }
3✔
277
        if err == nil {
5✔
278
                indexName = s.GetDeploymentsIndex("")
2✔
279
                template = fmt.Sprintf(indexDeploymentsTemplate,
2✔
280
                        indexName,
2✔
281
                        s.devicesIndexShards,
2✔
282
                        s.devicesIndexReplicas,
2✔
283
                )
2✔
284
                err = s.migratePutIndexTemplate(ctx, indexName, template)
2✔
285
        }
2✔
286
        if err == nil {
5✔
287
                err = s.migrateCreateIndex(ctx, indexName)
2✔
288
        }
2✔
289
        return err
3✔
290
}
291

292
func (s *opensearchStore) migratePutIndexTemplate(ctx context.Context,
293
        indexName, template string) error {
5✔
294
        l := log.FromContext(ctx)
5✔
295
        l.Infof("put the index template for %s", indexName)
5✔
296

5✔
297
        req := opensearchapi.IndicesPutIndexTemplateRequest{
5✔
298
                Name: indexName,
5✔
299
                Body: strings.NewReader(template),
5✔
300
        }
5✔
301

5✔
302
        res, err := req.Do(ctx, s.client)
5✔
303
        if err != nil {
5✔
304
                return errors.Wrap(err, "failed to put the index template")
×
305
        }
×
306
        defer res.Body.Close()
5✔
307

5✔
308
        if res.StatusCode != http.StatusOK {
5✔
309
                body, _ := ioutil.ReadAll(res.Body)
×
310
                return errors.Errorf("failed to set up the index template: %s", string(body))
×
311
        }
×
312
        return nil
5✔
313
}
314

315
func (s *opensearchStore) migrateCreateIndex(ctx context.Context, indexName string) error {
5✔
316
        l := log.FromContext(ctx)
5✔
317
        l.Infof("verify if the index %s exists", indexName)
5✔
318

5✔
319
        req := opensearchapi.IndicesExistsRequest{
5✔
320
                Index: []string{indexName},
5✔
321
        }
5✔
322
        res, err := req.Do(ctx, s.client)
5✔
323
        if err != nil {
5✔
324
                return errors.Wrap(err, "failed to verify the index")
×
325
        }
×
326
        defer res.Body.Close()
5✔
327

5✔
328
        if res.StatusCode == http.StatusNotFound {
8✔
329
                l.Infof("create the index %s", indexName)
3✔
330

3✔
331
                req := opensearchapi.IndicesCreateRequest{
3✔
332
                        Index: indexName,
3✔
333
                }
3✔
334
                res, err := req.Do(ctx, s.client)
3✔
335
                if err != nil {
3✔
336
                        return errors.Wrap(err, "failed to create the index")
×
337
                }
×
338
                defer res.Body.Close()
3✔
339

3✔
340
                if res.StatusCode != http.StatusOK {
4✔
341
                        return errors.New("failed to create the index")
1✔
342
                }
1✔
343
        } else if res.StatusCode != http.StatusOK {
2✔
344
                return errors.New("failed to verify the index")
×
345
        }
×
346

347
        return nil
4✔
348
}
349

350
func (s *opensearchStore) Ping(ctx context.Context) error {
133✔
351
        pingRequest := s.client.Ping.WithContext(ctx)
133✔
352
        _, err := s.client.Ping(pingRequest)
133✔
353
        return errors.Wrap(err, "failed to ping opensearch")
133✔
354
}
133✔
355

356
func (s *opensearchStore) AggregateDevices(ctx context.Context,
357
        query model.Query) (model.M, error) {
2✔
358
        id := identity.FromContext(ctx)
2✔
359
        indexName := s.GetDevicesIndex(id.Tenant)
2✔
360
        routingKey := s.GetDevicesRoutingKey(id.Tenant)
2✔
361
        return s.aggregate(ctx, indexName, routingKey, query)
2✔
362
}
2✔
363

364
func (s *opensearchStore) AggregateDeployments(ctx context.Context,
365
        query model.Query) (model.M, error) {
2✔
366
        id := identity.FromContext(ctx)
2✔
367
        indexName := s.GetDeploymentsIndex(id.Tenant)
2✔
368
        routingKey := s.GetDeploymentsRoutingKey(id.Tenant)
2✔
369
        return s.aggregate(ctx, indexName, routingKey, query)
2✔
370
}
2✔
371

372
func (s *opensearchStore) aggregate(ctx context.Context, indexName, routingKey string,
373
        query model.Query) (model.M, error) {
4✔
374
        l := log.FromContext(ctx)
4✔
375

4✔
376
        var buf bytes.Buffer
4✔
377
        if err := json.NewEncoder(&buf).Encode(query); err != nil {
4✔
378
                return nil, err
×
379
        }
×
380

381
        l.Debugf("es query: %v", buf.String())
4✔
382

4✔
383
        searchRequests := []func(*opensearchapi.SearchRequest){
4✔
384
                s.client.Search.WithContext(ctx),
4✔
385
                s.client.Search.WithIndex(indexName),
4✔
386
                s.client.Search.WithBody(&buf),
4✔
387
                s.client.Search.WithTrackTotalHits(false),
4✔
388
        }
4✔
389
        if routingKey != "" {
8✔
390
                searchRequests = append(searchRequests, s.client.Search.WithRouting(routingKey))
4✔
391
        }
4✔
392
        resp, err := s.client.Search(searchRequests...)
4✔
393
        defer resp.Body.Close()
4✔
394

4✔
395
        if err != nil {
4✔
396
                return nil, err
×
397
        }
×
398

399
        if resp.IsError() {
4✔
400
                return nil, errors.New(resp.String())
×
401
        }
×
402

403
        var ret map[string]interface{}
4✔
404
        if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
4✔
405
                return nil, err
×
406
        }
×
407

408
        l.Debugf("opensearch response: %v", ret)
4✔
409
        return ret, nil
4✔
410
}
411

412
func (s *opensearchStore) SearchDevices(ctx context.Context, query model.Query) (model.M, error) {
15✔
413
        id := identity.FromContext(ctx)
15✔
414
        indexName := s.GetDevicesIndex(id.Tenant)
15✔
415
        routingKey := s.GetDevicesRoutingKey(id.Tenant)
15✔
416
        return s.search(ctx, indexName, routingKey, query)
15✔
417
}
15✔
418

419
func (s *opensearchStore) SearchDeployments(ctx context.Context,
420
        query model.Query) (model.M, error) {
6✔
421
        id := identity.FromContext(ctx)
6✔
422
        indexName := s.GetDeploymentsIndex(id.Tenant)
6✔
423
        routingKey := s.GetDeploymentsRoutingKey(id.Tenant)
6✔
424
        return s.search(ctx, indexName, routingKey, query)
6✔
425
}
6✔
426

427
func (s *opensearchStore) search(ctx context.Context, indexName, routingKey string,
428
        query model.Query) (model.M, error) {
21✔
429
        l := log.FromContext(ctx)
21✔
430

21✔
431
        var buf bytes.Buffer
21✔
432
        if err := json.NewEncoder(&buf).Encode(query); err != nil {
21✔
433
                return nil, err
×
434
        }
×
435

436
        l.Debugf("es query: %v", buf.String())
21✔
437

21✔
438
        searchRequests := []func(*opensearchapi.SearchRequest){
21✔
439
                s.client.Search.WithContext(ctx),
21✔
440
                s.client.Search.WithIndex(indexName),
21✔
441
                s.client.Search.WithBody(&buf),
21✔
442
                s.client.Search.WithTrackTotalHits(true),
21✔
443
        }
21✔
444
        if routingKey != "" {
42✔
445
                searchRequests = append(searchRequests, s.client.Search.WithRouting(routingKey))
21✔
446
        }
21✔
447
        resp, err := s.client.Search(searchRequests...)
21✔
448
        if err != nil {
21✔
UNCOV
449
                return nil, err
×
450
        }
×
451
        defer resp.Body.Close()
21✔
452

21✔
453
        if resp.IsError() {
21✔
454
                return nil, errors.New(resp.String())
×
455
        }
×
456

457
        var ret map[string]interface{}
21✔
458
        if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
21✔
459
                return nil, err
×
460
        }
×
461

462
        l.Debugf("opensearch response: %v", ret)
21✔
463
        return ret, nil
21✔
464
}
465

466
// GetDevicesIndexMapping retrieves the "devices*" index definition for tenant 'tid'
467
// existing fields, incl. inventory attributes, are found under 'properties'
468
// see: https://opensearch.org/docs/latest/api-reference/index-apis/get-index/
469
func (s *opensearchStore) GetDevicesIndexMapping(ctx context.Context,
470
        tid string) (map[string]interface{}, error) {
×
471
        idx := s.GetDevicesIndex(tid)
×
472
        return s.getIndexMapping(ctx, tid, idx)
×
473
}
×
474

475
// GetDeploymentsIndexMapping retrieves the "deployments*" index definition for tenant 'tid'
476
// existing fields, incl. inventory attributes, are found under 'properties'
477
// see: https://opensearch.org/docs/latest/api-reference/index-apis/get-index/
478
func (s *opensearchStore) GetDeploymentsIndexMapping(ctx context.Context,
479
        tid string) (map[string]interface{}, error) {
×
480
        idx := s.GetDeploymentsIndex(tid)
×
481
        return s.getIndexMapping(ctx, tid, idx)
×
482
}
×
483

484
func (s *opensearchStore) getIndexMapping(ctx context.Context,
485
        tid, idx string) (map[string]interface{}, error) {
×
486
        l := log.FromContext(ctx)
×
487
        req := opensearchapi.IndicesGetRequest{
×
488
                Index: []string{idx},
×
489
        }
×
490

×
491
        res, err := req.Do(ctx, s.client)
×
492
        if err != nil {
×
493
                return nil, errors.Wrapf(err, "failed to get devices index from store, tid %s", tid)
×
494
        }
×
495
        defer res.Body.Close()
×
496

×
497
        if res.IsError() {
×
498
                return nil, errors.Errorf(
×
499
                        "failed to get devices index from store, tid %s, code %d",
×
500
                        tid, res.StatusCode,
×
501
                )
×
502
        }
×
503

504
        var indexRes map[string]interface{}
×
505
        if err := json.NewDecoder(res.Body).Decode(&indexRes); err != nil {
×
506
                return nil, err
×
507
        }
×
508

509
        index, ok := indexRes[idx]
×
510
        if !ok {
×
511
                return nil, errors.New("can't parse index defintion response")
×
512
        }
×
513

514
        indexM, ok := index.(map[string]interface{})
×
515
        if !ok {
×
516
                return nil, errors.New("can't parse index defintion response")
×
517
        }
×
518

519
        l.Debugf("index for tid %s\n%s\n", tid, indexM)
×
520
        return indexM, nil
×
521
}
522

523
// GetDevicesIndex returns the index name for the tenant tid
524
func (s *opensearchStore) GetDevicesIndex(tid string) string {
35✔
525
        return s.devicesIndexName
35✔
526
}
35✔
527

528
// GetDeploymentsIndex returns the index name for the tenant tid
529
func (s *opensearchStore) GetDeploymentsIndex(tid string) string {
14✔
530
        return s.deploymentsIndexName
14✔
531
}
14✔
532

533
// GetDevicesRoutingKey returns the routing key for the tenant tid
534
func (s *opensearchStore) GetDevicesRoutingKey(tid string) string {
32✔
535
        return tid
32✔
536
}
32✔
537

538
// GetDeploymentsRoutingKey returns the routing key for the tenant tid
539
func (s *opensearchStore) GetDeploymentsRoutingKey(tid string) string {
12✔
540
        return tid
12✔
541
}
12✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc