• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / reporting / 850601221

pending completion
850601221

Pull #138

gitlab-ci

GitHub
Merge pull request #130 from kjaskiewiczz/men-5912
Pull Request #138: Align staging with master

139 of 151 new or added lines in 5 files covered. (92.05%)

2 existing lines in 1 file now uncovered.

2964 of 3474 relevant lines covered (85.32%)

17.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.33
/app/indexer/jobs.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package indexer
16

17
import (
18
        "context"
19
        "strconv"
20

21
        "github.com/pkg/errors"
22

23
        "github.com/mendersoftware/go-lib-micro/config"
24
        "github.com/mendersoftware/go-lib-micro/log"
25

26
        "github.com/mendersoftware/reporting/client/deployments"
27
        "github.com/mendersoftware/reporting/client/deviceauth"
28
        "github.com/mendersoftware/reporting/client/inventory"
29
        rconfig "github.com/mendersoftware/reporting/config"
30
        "github.com/mendersoftware/reporting/model"
31
)
32

33
// undefinedCoordinateIdx is the sentinel index used by extractLocation to mark
// a latitude or longitude attribute that has not been found yet.
const undefinedCoordinateIdx = -1
34

35
// IDs is a collection of identifiers stored as map keys. In this file the keys
// are device IDs (reindex jobs) or deployment IDs (deployment reindex jobs).
type IDs map[string]bool
36
// ActionIDs maps a job action (compared against model.ActionReindex and
// model.ActionReindexDeployment in ProcessJobs) to the IDs it applies to.
type ActionIDs map[string]IDs
37
// TenantActionIDs maps a tenant identifier to the per-action IDs of that tenant.
type TenantActionIDs map[string]ActionIDs
38

39
func (i *indexer) GetJobs(ctx context.Context, jobs chan model.Job) error {
7✔
40
        streamName := config.Config.GetString(rconfig.SettingNatsStreamName)
7✔
41

7✔
42
        topic := config.Config.GetString(rconfig.SettingNatsSubscriberTopic)
7✔
43
        subject := streamName + "." + topic
7✔
44
        durableName := config.Config.GetString(rconfig.SettingNatsSubscriberDurable)
7✔
45

7✔
46
        err := i.nats.JetStreamSubscribe(ctx, subject, durableName, jobs)
7✔
47
        if err != nil {
11✔
48
                return errors.Wrap(err, "failed to subscribe to the nats JetStream")
4✔
49
        }
4✔
50

51
        return nil
3✔
52
}
53

54
func (i *indexer) ProcessJobs(ctx context.Context, jobs []model.Job) {
17✔
55
        l := log.FromContext(ctx)
17✔
56
        l.Debugf("Processing %d jobs", len(jobs))
17✔
57
        tenantsActionIDs := groupJobsIntoTenantActionIDs(jobs)
17✔
58
        for tenant, actionIDs := range tenantsActionIDs {
37✔
59
                for action, IDs := range actionIDs {
40✔
60
                        if action == model.ActionReindex {
36✔
61
                                i.processJobDevices(ctx, tenant, IDs)
16✔
62
                        } else if action == model.ActionReindexDeployment {
24✔
63
                                i.processJobDeployments(ctx, tenant, IDs)
4✔
64
                        } else {
4✔
65
                                l.Warnf("ignoring unknown job action: %v", action)
×
66
                        }
×
67
                }
68
        }
69
}
70

71
func (i *indexer) processJobDevices(
72
        ctx context.Context,
73
        tenant string,
74
        IDs IDs,
75
) {
16✔
76
        l := log.FromContext(ctx)
16✔
77
        devices := make([]*model.Device, 0, len(IDs))
16✔
78
        removedDevices := make([]*model.Device, 0, len(IDs))
16✔
79

16✔
80
        deviceIDs := make([]string, 0, len(IDs))
16✔
81
        for deviceID := range IDs {
61✔
82
                deviceIDs = append(deviceIDs, deviceID)
45✔
83
        }
45✔
84
        // get devices from deviceauth
85
        deviceAuthDevices, err := i.devClient.GetDevices(ctx, tenant, deviceIDs)
16✔
86
        if err != nil {
18✔
87
                l.Error(errors.Wrap(err, "failed to get devices from deviceauth"))
2✔
88
                return
2✔
89
        }
2✔
90
        // get devices from inventory
91
        inventoryDevices, err := i.invClient.GetDevices(ctx, tenant, deviceIDs)
14✔
92
        if err != nil {
16✔
93
                l.Error(errors.Wrap(err, "failed to get devices from inventory"))
2✔
94
                return
2✔
95
        }
2✔
96
        // get last deployment statuses from deployment
97
        deploymentsDevices, err := i.deplClient.GetLatestFinishedDeployment(ctx, tenant, deviceIDs)
12✔
98
        if err != nil {
12✔
99
                l.Error(errors.Wrap(err, "failed to get last device deployments from deployments"))
×
100
                return
×
101
        }
×
102

103
        // process the results
104
        devices = devices[:0]
12✔
105
        removedDevices = removedDevices[:0]
12✔
106
        for _, deviceID := range deviceIDs {
45✔
107
                var deviceAuthDevice *deviceauth.DeviceAuthDevice
33✔
108
                var inventoryDevice *inventory.Device
33✔
109
                var deploymentsDevice *deployments.LastDeviceDeployment
33✔
110
                for _, d := range deviceAuthDevices {
96✔
111
                        if d.ID == deviceID {
90✔
112
                                deviceAuthDevice = &d
27✔
113
                                break
27✔
114
                        }
115
                }
116
                for _, d := range inventoryDevices {
96✔
117
                        if d.ID == inventory.DeviceID(deviceID) {
90✔
118
                                inventoryDevice = &d
27✔
119
                                break
27✔
120
                        }
121
                }
122
                for _, d := range deploymentsDevices {
58✔
123
                        if d.DeviceID == deviceID {
32✔
124
                                deploymentsDevice = &d
7✔
125
                                break
7✔
126
                        }
127
                }
128
                if deviceAuthDevice == nil || inventoryDevice == nil {
39✔
129
                        removedDevices = append(removedDevices, &model.Device{
6✔
130
                                ID:       &deviceID,
6✔
131
                                TenantID: &tenant,
6✔
132
                        })
6✔
133
                        continue
6✔
134
                }
135
                device := i.processJobDevice(
27✔
136
                        ctx,
27✔
137
                        tenant,
27✔
138
                        deviceAuthDevice,
27✔
139
                        inventoryDevice,
27✔
140
                        deploymentsDevice,
27✔
141
                )
27✔
142
                if device != nil {
54✔
143
                        devices = append(devices, device)
27✔
144
                }
27✔
145
        }
146
        // bulk index the device
147
        if len(devices) > 0 || len(removedDevices) > 0 {
24✔
148
                err = i.store.BulkIndexDevices(ctx, devices, removedDevices)
12✔
149
                if err != nil {
14✔
150
                        err = errors.Wrap(err, "failed to bulk index the devices")
2✔
151
                        l.Error(err)
2✔
152
                }
2✔
153
        }
154
}
155

156
func (i *indexer) processJobDevice(
157
        ctx context.Context,
158
        tenant string,
159
        deviceAuthDevice *deviceauth.DeviceAuthDevice,
160
        inventoryDevice *inventory.Device,
161
        deploymentsDevice *deployments.LastDeviceDeployment,
162
) *model.Device {
27✔
163
        l := log.FromContext(ctx)
27✔
164
        //
27✔
165
        device := model.NewDevice(tenant, string(inventoryDevice.ID))
27✔
166
        // data from inventory
27✔
167
        device.SetUpdatedAt(inventoryDevice.UpdatedTs)
27✔
168
        // extract location from attributes
27✔
169
        ok, location := extractLocation(inventoryDevice.Attributes)
27✔
170
        if ok {
30✔
171
                device.Location = &location
3✔
172
        }
3✔
173
        attributes, err := i.mapper.MapInventoryAttributes(ctx, tenant,
27✔
174
                inventoryDevice.Attributes, true, false)
27✔
175
        if err != nil {
27✔
176
                err = errors.Wrapf(err,
×
177
                        "failed to map device data for tenant %s, "+
×
178
                                "device %s", tenant, inventoryDevice.ID)
×
179
                l.Warn(err)
×
180
        } else {
27✔
181
                for _, invattr := range attributes {
62✔
182
                        attr := model.NewInventoryAttribute(invattr.Scope).
35✔
183
                                SetName(invattr.Name).
35✔
184
                                SetVal(invattr.Value)
35✔
185
                        if err := device.AppendAttr(attr); err != nil {
35✔
186
                                err = errors.Wrapf(err,
×
187
                                        "failed to convert device data for tenant %s, "+
×
188
                                                "device %s", tenant, inventoryDevice.ID)
×
189
                                l.Warn(err)
×
190
                        }
×
191
                }
192
        }
193
        // data from device auth
194
        _ = device.AppendAttr(&model.InventoryAttribute{
27✔
195
                Scope:  model.ScopeIdentity,
27✔
196
                Name:   model.AttrNameStatus,
27✔
197
                String: []string{deviceAuthDevice.Status},
27✔
198
        })
27✔
199
        for name, value := range deviceAuthDevice.IdDataStruct {
39✔
200
                attr := model.NewInventoryAttribute(model.ScopeIdentity).
12✔
201
                        SetName(name).
12✔
202
                        SetVal(value)
12✔
203
                if err := device.AppendAttr(attr); err != nil {
12✔
204
                        err = errors.Wrapf(err,
×
205
                                "failed to convert identity data for tenant %s, "+
×
206
                                        "device %s", tenant, inventoryDevice.ID)
×
207
                        l.Warn(err)
×
208
                }
×
209
        }
210

211
        // data from deployments
212
        if deploymentsDevice != nil {
34✔
213
                _ = device.AppendAttr(&model.InventoryAttribute{
7✔
214
                        Scope:  model.ScopeSystem,
7✔
215
                        Name:   model.AttrNameLatestDeploymentStatus,
7✔
216
                        String: []string{deploymentsDevice.DeviceDeploymentStatus},
7✔
217
                })
7✔
218
        }
7✔
219

220
        // return the device
221
        return device
27✔
222
}
223

224
func extractLocation(
225
        attrs inventory.DeviceAttributes,
226
) (bool, string) {
27✔
227
        latIdx := undefinedCoordinateIdx
27✔
228
        lonIdx := undefinedCoordinateIdx
27✔
229

27✔
230
        for i, attr := range attrs {
62✔
231
                if attr.Name == model.AttrNameGeoLatitude {
38✔
232
                        latIdx = i
3✔
233
                } else if attr.Name == model.AttrNameGeoLongitude {
38✔
234
                        lonIdx = i
3✔
235
                }
3✔
236
                if latIdx != undefinedCoordinateIdx && lonIdx != undefinedCoordinateIdx {
38✔
237
                        break
3✔
238
                }
239
        }
240
        if latIdx != undefinedCoordinateIdx && lonIdx != undefinedCoordinateIdx {
30✔
241
                latStr, ok := attrs[latIdx].Value.(string)
3✔
242
                if !ok {
3✔
NEW
243
                        return false, ""
×
NEW
244
                }
×
245
                lonStr, ok := attrs[lonIdx].Value.(string)
3✔
246
                if !ok {
3✔
NEW
247
                        return false, ""
×
NEW
248
                }
×
249
                if validLocation(latStr, lonStr) {
6✔
250
                        return true, latStr + "," + lonStr
3✔
251
                }
3✔
252
        }
253
        return false, ""
24✔
254
}
255

256
// validLocation reports whether latStr and lonStr parse as floating-point
// numbers describing a valid coordinate: latitude within [-90, 90] and
// longitude within [-180, 180].
//
// strconv.ParseFloat accepts "NaN" without returning an error, so the range
// checks are written in their positive form: every comparison with NaN is
// false, which makes NaN fail the check. Do not simplify them to
// "x < min || x > max", which lets NaN through.
func validLocation(latStr, lonStr string) bool {
	lat, err := strconv.ParseFloat(latStr, 32)
	if err != nil || !(lat >= -90 && lat <= 90) {
		return false
	}
	lon, err := strconv.ParseFloat(lonStr, 32)
	if err != nil || !(lon >= -180 && lon <= 180) {
		return false
	}
	return true
}
273

274
func (i *indexer) processJobDeployments(
275
        ctx context.Context,
276
        tenant string,
277
        IDs IDs,
278
) {
4✔
279
        l := log.FromContext(ctx)
4✔
280
        depls := make([]*model.Deployment, 0, len(IDs))
4✔
281
        deploymentIDs := make([]string, 0, len(IDs))
4✔
282
        for deploymentID := range IDs {
14✔
283
                deploymentIDs = append(deploymentIDs, deploymentID)
10✔
284
        }
10✔
285
        // get device deployments from deployments
286
        deviceDeployments, err := i.deplClient.GetDeployments(ctx, tenant, deploymentIDs)
4✔
287
        if err != nil {
4✔
288
                l.Error(errors.Wrap(err, "failed to get device deployments from device deployments"))
×
289
                return
×
290
        }
×
291
        // process the results
292
        for deploymentID := range IDs {
14✔
293
                for _, d := range deviceDeployments {
28✔
294
                        if d.ID == deploymentID {
28✔
295
                                depl := i.processJobDeployment(ctx, tenant, d)
10✔
296
                                if depl != nil {
20✔
297
                                        depls = append(depls, depl)
10✔
298
                                }
10✔
299
                                break
10✔
300
                        }
301
                }
302
        }
303
        // bulk index the device
304
        if len(depls) > 0 {
8✔
305
                err = i.store.BulkIndexDeployments(ctx, depls)
4✔
306
                if err != nil {
4✔
307
                        err = errors.Wrap(err, "failed to bulk index the deployments")
×
308
                        l.Error(err)
×
309
                }
×
310
        }
311
}
312

313
func (i *indexer) processJobDeployment(
314
        ctx context.Context,
315
        tenant string,
316
        deployment *deployments.DeviceDeployment,
317
) *model.Deployment {
10✔
318
        deviceElapsedSeconds := uint(0)
10✔
319
        if deployment.Device == nil ||
10✔
320
                deployment.Deployment == nil {
10✔
321
                return nil
×
322
        } else if deployment.Device.Finished != nil && deployment.Device.Created != nil {
18✔
323
                deviceElapsedSeconds = uint(deployment.Device.Finished.Sub(
8✔
324
                        *deployment.Device.Created).Seconds())
8✔
325
        }
8✔
326
        res := &model.Deployment{
10✔
327
                ID:                          deployment.ID,
10✔
328
                DeviceID:                    deployment.Device.Id,
10✔
329
                DeploymentID:                deployment.Deployment.Id,
10✔
330
                TenantID:                    tenant,
10✔
331
                DeploymentName:              deployment.Deployment.Name,
10✔
332
                DeploymentArtifactName:      deployment.Deployment.ArtifactName,
10✔
333
                DeploymentType:              deployment.Deployment.Type,
10✔
334
                DeploymentCreated:           deployment.Deployment.Created,
10✔
335
                DeploymentFilterID:          deployment.Deployment.FilterId,
10✔
336
                DeploymentAllDevices:        deployment.Deployment.AllDevices,
10✔
337
                DeploymentForceInstallation: deployment.Deployment.ForceInstallation,
10✔
338
                DeploymentGroups:            deployment.Deployment.Groups,
10✔
339
                DeploymentPhased:            deployment.Deployment.PhaseId != "",
10✔
340
                DeploymentPhaseId:           deployment.Deployment.PhaseId,
10✔
341
                DeploymentRetries:           deployment.Deployment.Retries,
10✔
342
                DeploymentMaxDevices:        uint(deployment.Deployment.MaxDevices),
10✔
343
                DeploymentAutogenerateDelta: deployment.Deployment.AutogenerateDelta,
10✔
344
                DeviceCreated:               deployment.Device.Created,
10✔
345
                DeviceFinished:              deployment.Device.Finished,
10✔
346
                DeviceElapsedSeconds:        deviceElapsedSeconds,
10✔
347
                DeviceDeleted:               deployment.Device.Deleted,
10✔
348
                DeviceStatus:                deployment.Device.Status,
10✔
349
                DeviceIsLogAvailable:        deployment.Device.IsLogAvailable,
10✔
350
                DeviceRetries:               deployment.Device.Retries,
10✔
351
                DeviceAttempts:              deployment.Device.Attempts,
10✔
352
        }
10✔
353
        if deployment.Device.Image != nil {
18✔
354
                res.ImageID = deployment.Device.Image.Id
8✔
355
                res.ImageDescription = deployment.Device.Image.Description
8✔
356
                res.ImageArtifactName = deployment.Device.Image.Name
8✔
357
                res.ImageDeviceTypes = deployment.Device.Image.DeviceTypesCompatible
8✔
358
                res.ImageSigned = deployment.Device.Image.Signed
8✔
359
                res.ImageProvides = deployment.Device.Image.Provides
8✔
360
                res.ImageDepends = deployment.Device.Image.Depends
8✔
361
                res.ImageClearsProvides = deployment.Device.Image.ClearsProvides
8✔
362
                res.ImageSize = deployment.Device.Image.Size
8✔
363
                if deployment.Device.Image.Info != nil {
16✔
364
                        res.ImageArtifactInfoFormat = deployment.Device.Image.Info.Format
8✔
365
                        res.ImageArtifactInfoVersion = deployment.Device.Image.Info.Version
8✔
366
                }
8✔
367
        }
368
        return res
10✔
369
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc