• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / reporting / 728455154

pending completion
728455154

Pull #83

gitlab-ci

Fabio Tranchitella
ci(tests): device inventory data aggregation acceptance tests
Pull Request #83: feat: device inventory data aggregation

198 of 234 new or added lines in 6 files covered. (84.62%)

2 existing lines in 1 file now uncovered.

2090 of 2464 relevant lines covered (84.82%)

16.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.33
/store/opensearch/store.go
1
// Copyright 2022 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package opensearch
16

17
import (
18
        "bytes"
19
        "context"
20
        "encoding/json"
21
        "fmt"
22
        "net/http"
23
        "strings"
24

25
        "github.com/opensearch-project/opensearch-go"
26
        "github.com/opensearch-project/opensearch-go/opensearchapi"
27
        "github.com/pkg/errors"
28

29
        "github.com/mendersoftware/go-lib-micro/identity"
30
        "github.com/mendersoftware/go-lib-micro/log"
31
        _ "github.com/mendersoftware/go-lib-micro/log"
32

33
        "github.com/mendersoftware/reporting/model"
34
        "github.com/mendersoftware/reporting/store"
35
)
36

37
// StoreOption is a functional option that mutates an opensearchStore; NewStore
// applies the given options in order before creating the OpenSearch client.
type StoreOption func(*opensearchStore)
38

39
type opensearchStore struct {
40
        addresses            []string
41
        devicesIndexName     string
42
        devicesIndexShards   int
43
        devicesIndexReplicas int
44
        client               *opensearch.Client
45
}
46

47
func NewStore(opts ...StoreOption) (store.Store, error) {
3✔
48
        store := &opensearchStore{}
3✔
49
        for _, opt := range opts {
15✔
50
                opt(store)
12✔
51
        }
12✔
52

53
        cfg := opensearch.Config{
3✔
54
                Addresses: store.addresses,
3✔
55
        }
3✔
56
        osClient, err := opensearch.NewClient(cfg)
3✔
57
        if err != nil {
3✔
58
                return nil, errors.Wrap(err, "invalid OpenSearch configuration")
×
59
        }
×
60

61
        store.client = osClient
3✔
62
        return store, nil
3✔
63
}
64

65
func WithServerAddresses(addresses []string) StoreOption {
3✔
66
        return func(s *opensearchStore) {
6✔
67
                s.addresses = addresses
3✔
68
        }
3✔
69
}
70

71
func WithDevicesIndexName(indexName string) StoreOption {
3✔
72
        return func(s *opensearchStore) {
6✔
73
                s.devicesIndexName = indexName
3✔
74
        }
3✔
75
}
76

77
func WithDevicesIndexShards(indexShards int) StoreOption {
3✔
78
        return func(s *opensearchStore) {
6✔
79
                s.devicesIndexShards = indexShards
3✔
80
        }
3✔
81
}
82

83
func WithDevicesIndexReplicas(indexReplicas int) StoreOption {
3✔
84
        return func(s *opensearchStore) {
6✔
85
                s.devicesIndexReplicas = indexReplicas
3✔
86
        }
3✔
87
}
88

89
// BulkAction is the action line of one bulk operation: the operation name
// (Type) mapped to its metadata (Desc).
type BulkAction struct {
	Type string
	Desc *BulkActionDesc
}

// BulkActionDesc carries the metadata of a bulk action.
//
// Only ID, Index and Routing are written by MarshalJSON; IfSeqNo,
// IfPrimaryTerm and Tenant are never serialized, regardless of their tags.
type BulkActionDesc struct {
	ID            string `json:"_id"`
	Index         string `json:"_index"`
	IfSeqNo       int64  `json:"_if_seq_no"`
	IfPrimaryTerm int64  `json:"_if_primary_term"`
	Routing       string `json:"routing"`
	Tenant        string
}

// BulkItem pairs a bulk action with its optional document. Doc is left nil
// for actions that carry no document line.
type BulkItem struct {
	Action *BulkAction
	Doc    interface{}
}

// MarshalJSON encodes the action metadata as an object holding exactly the
// "_id", "_index" and "routing" keys.
func (bad BulkActionDesc) MarshalJSON() ([]byte, error) {
	return json.Marshal(struct {
		ID      string `json:"_id"`
		Index   string `json:"_index"`
		Routing string `json:"routing"`
	}{
		ID:      bad.ID,
		Index:   bad.Index,
		Routing: bad.Routing,
	})
}

// MarshalJSON encodes the action as a single-key object: {"<Type>": <Desc>}.
func (ba BulkAction) MarshalJSON() ([]byte, error) {
	a := map[string]*BulkActionDesc{
		ba.Type: ba.Desc,
	}
	return json.Marshal(a)
}

// Marshal renders the item as newline-delimited JSON: the action line,
// followed by the document line when Doc is not nil. Every emitted line is
// terminated by "\n". It returns the first JSON encoding error encountered.
func (bi BulkItem) Marshal() ([]byte, error) {
	action, err := json.Marshal(bi.Action)
	if err != nil {
		return nil, err
	}
	buf := bytes.NewBuffer(action)
	buf.WriteByte('\n')

	// Items without a document consist of the action line only.
	if bi.Doc == nil {
		return buf.Bytes(), nil
	}

	doc, err := json.Marshal(bi.Doc)
	if err != nil {
		return nil, err
	}
	buf.Write(doc)
	buf.WriteByte('\n')

	return buf.Bytes(), nil
}
150

151
func (s *opensearchStore) BulkIndexDevices(ctx context.Context, devices []*model.Device,
152
        removedDevices []*model.Device) error {
7✔
153
        var data strings.Builder
7✔
154

7✔
155
        for _, device := range devices {
22✔
156
                actionJSON, err := json.Marshal(BulkAction{
15✔
157
                        Type: "index",
15✔
158
                        Desc: &BulkActionDesc{
15✔
159
                                ID:      device.GetID(),
15✔
160
                                Index:   s.GetDevicesIndex(device.GetTenantID()),
15✔
161
                                Routing: s.GetDevicesRoutingKey(device.GetTenantID()),
15✔
162
                        },
15✔
163
                })
15✔
164
                if err != nil {
15✔
165
                        return err
×
166
                }
×
167
                deviceJSON, err := json.Marshal(device)
15✔
168
                if err != nil {
15✔
169
                        return err
×
170
                }
×
171
                data.WriteString(string(actionJSON) + "\n" + string(deviceJSON) + "\n")
15✔
172
        }
173
        for _, device := range removedDevices {
7✔
174
                actionJSON, err := json.Marshal(BulkAction{
×
175
                        Type: "delete",
×
176
                        Desc: &BulkActionDesc{
×
177
                                ID:      device.GetID(),
×
178
                                Index:   s.GetDevicesIndex(device.GetTenantID()),
×
179
                                Routing: s.GetDevicesRoutingKey(device.GetTenantID()),
×
180
                        },
×
181
                })
×
182
                if err != nil {
×
183
                        return err
×
184
                }
×
185
                data.WriteString(string(actionJSON) + "\n")
×
186
        }
187

188
        dataString := data.String()
7✔
189

7✔
190
        l := log.FromContext(ctx)
7✔
191
        l.Debugf("opensearch request: %s", dataString)
7✔
192

7✔
193
        req := opensearchapi.BulkRequest{
7✔
194
                Body: strings.NewReader(dataString),
7✔
195
        }
7✔
196
        res, err := req.Do(ctx, s.client)
7✔
197
        if err != nil {
7✔
198
                return errors.Wrap(err, "failed to bulk index")
×
199
        }
×
200
        defer res.Body.Close()
7✔
201

7✔
202
        return nil
7✔
203
}
204

205
func (s *opensearchStore) Migrate(ctx context.Context) error {
3✔
206
        indexName := s.GetDevicesIndex("")
3✔
207
        err := s.migratePutIndexTemplate(ctx, indexName)
3✔
208
        if err == nil {
6✔
209
                err = s.migrateCreateIndex(ctx, indexName)
3✔
210
        }
3✔
211
        return err
3✔
212
}
213

214
func (s *opensearchStore) migratePutIndexTemplate(ctx context.Context, indexName string) error {
3✔
215
        l := log.FromContext(ctx)
3✔
216
        l.Infof("put the index template for %s", indexName)
3✔
217

3✔
218
        template := fmt.Sprintf(indexDevicesTemplate,
3✔
219
                indexName,
3✔
220
                s.devicesIndexShards,
3✔
221
                s.devicesIndexReplicas,
3✔
222
        )
3✔
223
        req := opensearchapi.IndicesPutIndexTemplateRequest{
3✔
224
                Name: indexName,
3✔
225
                Body: strings.NewReader(template),
3✔
226
        }
3✔
227

3✔
228
        res, err := req.Do(ctx, s.client)
3✔
229
        if err != nil {
3✔
230
                return errors.Wrap(err, "failed to put the index template")
×
231
        }
×
232
        defer res.Body.Close()
3✔
233

3✔
234
        if res.StatusCode != http.StatusOK {
3✔
235
                return errors.New("failed to set up the index template")
×
236
        }
×
237
        return nil
3✔
238
}
239

240
func (s *opensearchStore) migrateCreateIndex(ctx context.Context, indexName string) error {
3✔
241
        l := log.FromContext(ctx)
3✔
242
        l.Infof("verify if the index %s exists", indexName)
3✔
243

3✔
244
        req := opensearchapi.IndicesExistsRequest{
3✔
245
                Index: []string{indexName},
3✔
246
        }
3✔
247
        res, err := req.Do(ctx, s.client)
3✔
248
        if err != nil {
3✔
249
                return errors.Wrap(err, "failed to verify the index")
×
250
        }
×
251
        defer res.Body.Close()
3✔
252

3✔
253
        if res.StatusCode == http.StatusNotFound {
5✔
254
                l.Infof("create the index %s", indexName)
2✔
255

2✔
256
                req := opensearchapi.IndicesCreateRequest{
2✔
257
                        Index: indexName,
2✔
258
                }
2✔
259
                res, err := req.Do(ctx, s.client)
2✔
260
                if err != nil {
2✔
261
                        return errors.Wrap(err, "failed to create the index")
×
262
                }
×
263
                defer res.Body.Close()
2✔
264

2✔
265
                if res.StatusCode != http.StatusOK {
3✔
266
                        return errors.New("failed to create the index")
1✔
267
                }
1✔
268
        } else if res.StatusCode != http.StatusOK {
1✔
269
                return errors.New("failed to verify the index")
×
270
        }
×
271

272
        return nil
2✔
273
}
274

275
func (s *opensearchStore) Ping(ctx context.Context) error {
121✔
276
        pingRequest := s.client.Ping.WithContext(ctx)
121✔
277
        _, err := s.client.Ping(pingRequest)
121✔
278
        return errors.Wrap(err, "failed to ping opensearch")
121✔
279
}
121✔
280

281
func (s *opensearchStore) Aggregate(ctx context.Context, query model.Query) (model.M, error) {
2✔
282
        l := log.FromContext(ctx)
2✔
283

2✔
284
        var buf bytes.Buffer
2✔
285
        if err := json.NewEncoder(&buf).Encode(query); err != nil {
2✔
NEW
286
                return nil, err
×
NEW
287
        }
×
288

289
        l.Debugf("es query: %v", buf.String())
2✔
290

2✔
291
        id := identity.FromContext(ctx)
2✔
292

2✔
293
        searchRequests := []func(*opensearchapi.SearchRequest){
2✔
294
                s.client.Search.WithContext(ctx),
2✔
295
                s.client.Search.WithIndex(s.GetDevicesIndex(id.Tenant)),
2✔
296
                s.client.Search.WithBody(&buf),
2✔
297
                s.client.Search.WithTrackTotalHits(false),
2✔
298
        }
2✔
299
        routingKey := s.GetDevicesRoutingKey(id.Tenant)
2✔
300
        if routingKey != "" {
4✔
301
                searchRequests = append(searchRequests, s.client.Search.WithRouting(routingKey))
2✔
302
        }
2✔
303
        resp, err := s.client.Search(searchRequests...)
2✔
304
        defer resp.Body.Close()
2✔
305

2✔
306
        if err != nil {
2✔
NEW
307
                return nil, err
×
NEW
308
        }
×
309

310
        if resp.IsError() {
2✔
NEW
311
                return nil, errors.New(resp.String())
×
NEW
312
        }
×
313

314
        var ret map[string]interface{}
2✔
315
        if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
2✔
NEW
316
                return nil, err
×
NEW
317
        }
×
318

319
        l.Debugf("opensearch response: %v", ret)
2✔
320

2✔
321
        return ret, nil
2✔
322
}
323

324
func (s *opensearchStore) Search(ctx context.Context, query model.Query) (model.M, error) {
15✔
325
        l := log.FromContext(ctx)
15✔
326

15✔
327
        var buf bytes.Buffer
15✔
328
        if err := json.NewEncoder(&buf).Encode(query); err != nil {
15✔
329
                return nil, err
×
330
        }
×
331

332
        l.Debugf("es query: %v", buf.String())
15✔
333

15✔
334
        id := identity.FromContext(ctx)
15✔
335

15✔
336
        searchRequests := []func(*opensearchapi.SearchRequest){
15✔
337
                s.client.Search.WithContext(ctx),
15✔
338
                s.client.Search.WithIndex(s.GetDevicesIndex(id.Tenant)),
15✔
339
                s.client.Search.WithBody(&buf),
15✔
340
                s.client.Search.WithTrackTotalHits(true),
15✔
341
        }
15✔
342
        routingKey := s.GetDevicesRoutingKey(id.Tenant)
15✔
343
        if routingKey != "" {
30✔
344
                searchRequests = append(searchRequests, s.client.Search.WithRouting(routingKey))
15✔
345
        }
15✔
346
        resp, err := s.client.Search(searchRequests...)
15✔
347
        defer resp.Body.Close()
15✔
348

15✔
349
        if err != nil {
15✔
350
                return nil, err
×
351
        }
×
352

353
        if resp.IsError() {
15✔
354
                return nil, errors.New(resp.String())
×
355
        }
×
356

357
        var ret map[string]interface{}
15✔
358
        if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
15✔
359
                return nil, err
×
360
        }
×
361

362
        l.Debugf("opensearch response: %v", ret)
15✔
363

15✔
364
        return ret, nil
15✔
365
}
366

367
// GetDevicesIndexMapping retrieves the "devices*" index definition for tenant 'tid'
368
// existing fields, incl. inventory attributes, are found under 'properties'
369
// see: https://opensearch.org/docs/latest/api-reference/index-apis/get-index/
370
func (s *opensearchStore) GetDevicesIndexMapping(ctx context.Context,
371
        tid string) (map[string]interface{}, error) {
×
372
        l := log.FromContext(ctx)
×
373
        idx := s.GetDevicesIndex(tid)
×
374

×
375
        req := opensearchapi.IndicesGetRequest{
×
376
                Index: []string{idx},
×
377
        }
×
378

×
379
        res, err := req.Do(ctx, s.client)
×
380
        if err != nil {
×
381
                return nil, errors.Wrapf(err, "failed to get devices index from store, tid %s", tid)
×
382
        }
×
383
        defer res.Body.Close()
×
384

×
385
        if res.IsError() {
×
386
                return nil, errors.Errorf(
×
387
                        "failed to get devices index from store, tid %s, code %d",
×
388
                        tid, res.StatusCode,
×
389
                )
×
390
        }
×
391

392
        var indexRes map[string]interface{}
×
393
        if err := json.NewDecoder(res.Body).Decode(&indexRes); err != nil {
×
394
                return nil, err
×
395
        }
×
396

397
        index, ok := indexRes[idx]
×
398
        if !ok {
×
399
                return nil, errors.New("can't parse index defintion response")
×
400
        }
×
401

402
        indexM, ok := index.(map[string]interface{})
×
403
        if !ok {
×
404
                return nil, errors.New("can't parse index defintion response")
×
405
        }
×
406

407
        l.Debugf("devices index for tid %s\n%s\n", tid, indexM)
×
408

×
409
        return indexM, nil
×
410
}
411

412
// GetDevicesIndex returns the index name for the tenant tid
413
func (s *opensearchStore) GetDevicesIndex(tid string) string {
35✔
414
        return s.devicesIndexName
35✔
415
}
35✔
416

417
// GetDevicesRoutingKey returns the routing key for the tenant tid
418
func (s *opensearchStore) GetDevicesRoutingKey(tid string) string {
32✔
419
        return tid
32✔
420
}
32✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc