• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / reporting / 714256730

pending completion
714256730

Pull #79

gitlab-ci

Fabio Tranchitella
refac: optimize the mapping cache taking into account the max size
Pull Request #79: MEN-5598: map inventory attributes to sequential fields

343 of 410 new or added lines in 12 files covered. (83.66%)

4 existing lines in 2 files now uncovered.

1689 of 2125 relevant lines covered (79.48%)

12.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.13
/app/indexer/jobs.go
1
// Copyright 2022 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package indexer
16

17
import (
18
        "context"
19
        "encoding/json"
20

21
        natsio "github.com/nats-io/nats.go"
22
        "github.com/pkg/errors"
23

24
        "github.com/mendersoftware/go-lib-micro/config"
25
        "github.com/mendersoftware/go-lib-micro/log"
26

27
        "github.com/mendersoftware/reporting/client/deviceauth"
28
        "github.com/mendersoftware/reporting/client/inventory"
29
        rconfig "github.com/mendersoftware/reporting/config"
30
        "github.com/mendersoftware/reporting/model"
31
)
32

33
type Services map[string]bool
34
type DeviceServices map[string]Services
35
type TenantDeviceServices map[string]DeviceServices
36

37
func (i *indexer) GetJobs(ctx context.Context, jobs chan *model.Job) error {
7✔
38
        l := log.FromContext(ctx)
7✔
39

7✔
40
        streamName := config.Config.GetString(rconfig.SettingNatsStreamName)
7✔
41
        err := i.nats.JetStreamCreateStream(streamName)
7✔
42
        if err != nil {
7✔
43
                return errors.Wrap(err, "failed to create the nats JetStream stream")
×
44
        }
×
45

46
        topic := config.Config.GetString(rconfig.SettingNatsSubscriberTopic)
7✔
47
        subject := streamName + "." + topic
7✔
48
        durableName := config.Config.GetString(rconfig.SettingNatsSubscriberDurable)
7✔
49

7✔
50
        channel := make(chan *natsio.Msg, 1)
7✔
51
        unsubscribe, err := i.nats.JetStreamSubscribe(ctx, subject, durableName, channel)
7✔
52
        if err != nil {
9✔
53
                return errors.Wrap(err, "failed to subscribe to the nats JetStream")
2✔
54
        }
2✔
55

56
        go func() {
10✔
57
                l.Info("Reindexer ready to receive messages")
5✔
58
                defer func() {
9✔
59
                        _ = unsubscribe()
4✔
60
                }()
4✔
61

62
                for {
14✔
63
                        select {
9✔
64
                        case msg := <-channel:
4✔
65
                                job := &model.Job{}
4✔
66
                                err := json.Unmarshal(msg.Data, job)
4✔
67
                                if err != nil {
6✔
68
                                        err = errors.Wrap(err, "failed to unmarshall message")
2✔
69
                                        l.Error(err)
2✔
70
                                        if err := msg.Term(); err != nil {
4✔
71
                                                err = errors.Wrap(err, "failed to term the message")
2✔
72
                                                l.Error(err)
2✔
73
                                        }
2✔
74
                                        continue
2✔
75
                                }
76
                                if err = msg.Ack(); err != nil {
4✔
77
                                        err = errors.Wrap(err, "failed to ack the message")
2✔
78
                                        l.Error(err)
2✔
79
                                }
2✔
80
                                jobs <- job
2✔
81

82
                        case <-ctx.Done():
4✔
83
                                return
4✔
84
                        }
85
                }
86
        }()
87

88
        return nil
5✔
89
}
90

91
func (i *indexer) ProcessJobs(ctx context.Context, jobs []*model.Job) {
8✔
92
        l := log.FromContext(ctx)
8✔
93

8✔
94
        devices := make([]*model.Device, 0, len(jobs))
8✔
95
        removedDevices := make([]*model.Device, 0, len(jobs))
8✔
96

8✔
97
        l.Debugf("Processing %d jobs", len(jobs))
8✔
98
        tenantsDevicesServices := groupJobsIntoTenantDeviceServices(jobs)
8✔
99
        for tenant, deviceServices := range tenantsDevicesServices {
16✔
100
                deviceIDs := make([]string, 0, len(deviceServices))
8✔
101
                for deviceID := range deviceServices {
32✔
102
                        deviceIDs = append(deviceIDs, deviceID)
24✔
103
                }
24✔
104
                // get devices from deviceauth
105
                deviceAuthDevices, err := i.devClient.GetDevices(ctx, tenant, deviceIDs)
8✔
106
                if err != nil {
10✔
107
                        l.Error(errors.Wrap(err, "failed to get devices from deviceauth"))
2✔
108
                        continue
2✔
109
                }
110
                // get devices from inventory
111
                inventoryDevices, err := i.invClient.GetDevices(ctx, tenant, deviceIDs)
6✔
112
                if err != nil {
8✔
113
                        l.Error(errors.Wrap(err, "failed to get devices from inventory"))
2✔
114
                        continue
2✔
115
                }
116
                // process the results
117
                devices = devices[:0]
4✔
118
                removedDevices = removedDevices[:0]
4✔
119
                for _, deviceID := range deviceIDs {
16✔
120
                        var deviceAuthDevice *deviceauth.DeviceAuthDevice
12✔
121
                        var inventoryDevice *inventory.Device
12✔
122
                        for _, d := range deviceAuthDevices {
32✔
123
                                if d.ID == deviceID {
28✔
124
                                        deviceAuthDevice = &d
8✔
125
                                        break
8✔
126
                                }
127
                        }
128
                        for _, d := range inventoryDevices {
32✔
129
                                if d.ID == inventory.DeviceID(deviceID) {
28✔
130
                                        inventoryDevice = &d
8✔
131
                                        break
8✔
132
                                }
133
                        }
134
                        if deviceAuthDevice == nil || inventoryDevice == nil {
16✔
135
                                removedDevices = append(removedDevices, &model.Device{
4✔
136
                                        ID:       &deviceID,
4✔
137
                                        TenantID: &tenant,
4✔
138
                                })
4✔
139
                                continue
4✔
140
                        }
141
                        device := model.NewDevice(tenant, string(inventoryDevice.ID))
8✔
142
                        // data from inventory
8✔
143
                        device.SetUpdatedAt(inventoryDevice.UpdatedTs)
8✔
144
                        attributes, err := i.mapper.MapInventoryAttributes(ctx, tenant,
8✔
145
                                inventoryDevice.Attributes, true)
8✔
146
                        if err != nil {
8✔
NEW
147
                                err = errors.Wrapf(err,
×
NEW
148
                                        "failed to map inventory data for tenant %s, "+
×
NEW
149
                                                "device %s", tenant, deviceID)
×
NEW
150
                                l.Warn(err)
×
151
                        } else {
8✔
152
                                for _, invattr := range attributes {
8✔
NEW
153
                                        attr := model.NewInventoryAttribute(invattr.Scope).
×
NEW
154
                                                SetName(invattr.Name).
×
NEW
155
                                                SetVal(invattr.Value)
×
NEW
156
                                        if err := device.AppendAttr(attr); err != nil {
×
NEW
157
                                                err = errors.Wrapf(err,
×
NEW
158
                                                        "failed to convert inventory data for tenant %s, "+
×
NEW
159
                                                                "device %s", tenant, deviceID)
×
NEW
160
                                                l.Warn(err)
×
NEW
161
                                        }
×
162
                                }
163
                        }
164
                        // data from device auth
165
                        _ = device.AppendAttr(&model.InventoryAttribute{
8✔
166
                                Scope:  model.ScopeIdentity,
8✔
167
                                Name:   model.AttrNameStatus,
8✔
168
                                String: []string{deviceAuthDevice.Status},
8✔
169
                        })
8✔
170
                        for name, value := range deviceAuthDevice.IdDataStruct {
16✔
171
                                attr := model.NewInventoryAttribute(model.ScopeIdentity).
8✔
172
                                        SetName(name).
8✔
173
                                        SetVal(value)
8✔
174
                                if err := device.AppendAttr(attr); err != nil {
8✔
175
                                        err = errors.Wrapf(err,
×
176
                                                "failed to convert identity data for tenant %s, "+
×
177
                                                        "device %s", tenant, deviceID)
×
178
                                        l.Warn(err)
×
179
                                }
×
180
                        }
181
                        // append the device
182
                        devices = append(devices, device)
8✔
183
                }
184
                // bulk index the device
185
                if len(devices) > 0 || len(removedDevices) > 0 {
8✔
186
                        err = i.store.BulkIndexDevices(ctx, devices, removedDevices)
4✔
187
                        if err != nil {
6✔
188
                                err = errors.Wrap(err, "failed to bulk index the devices")
2✔
189
                                l.Error(err)
2✔
190
                        }
2✔
191
                }
192
        }
193
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc