• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / reporting / 992949688

05 Sep 2023 09:18AM UTC coverage: 89.001% (+4.0%) from 85.036%
992949688

Pull #138

gitlab-ci

web-flow
Merge pull request #163 from mendersoftware/combined-prs-branch

Combined PRs
Pull Request #138: Align staging with master

177 of 199 new or added lines in 8 files covered. (88.94%)

2 existing lines in 1 file now uncovered.

2476 of 2782 relevant lines covered (89.0%)

18.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.44
/app/indexer/jobs.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package indexer
16

17
import (
18
        "context"
19
        "strconv"
20

21
        "github.com/pkg/errors"
22

23
        "github.com/mendersoftware/go-lib-micro/config"
24
        "github.com/mendersoftware/go-lib-micro/log"
25

26
        "github.com/mendersoftware/reporting/client/deployments"
27
        "github.com/mendersoftware/reporting/client/deviceauth"
28
        "github.com/mendersoftware/reporting/client/inventory"
29
        rconfig "github.com/mendersoftware/reporting/config"
30
        "github.com/mendersoftware/reporting/model"
31
)
32

33
// undefinedCoordinateIdx is the sentinel index used while scanning device
// attributes to signal that a coordinate attribute has not been found yet.
const undefinedCoordinateIdx = -1

type (
	// IDs is a set of identifiers keyed by ID; the code in this file only
	// iterates over (or looks up) the keys.
	IDs map[string]bool

	// ActionIDs maps a job action to the set of IDs that action applies to.
	ActionIDs map[string]IDs

	// TenantActionIDs maps a tenant ID to its per-action sets of IDs.
	TenantActionIDs map[string]ActionIDs
)
38

39
func (i *indexer) GetJobs(ctx context.Context, jobs chan model.Job) error {
4✔
40
        streamName := config.Config.GetString(rconfig.SettingNatsStreamName)
4✔
41

4✔
42
        topic := config.Config.GetString(rconfig.SettingNatsSubscriberTopic)
4✔
43
        subject := streamName + "." + topic
4✔
44
        durableName := config.Config.GetString(rconfig.SettingNatsSubscriberDurable)
4✔
45

4✔
46
        err := i.nats.JetStreamSubscribe(ctx, subject, durableName, jobs)
4✔
47
        if err != nil {
6✔
48
                return errors.Wrap(err, "failed to subscribe to the nats JetStream")
2✔
49
        }
2✔
50

51
        return nil
2✔
52
}
53

54
func (i *indexer) ProcessJobs(ctx context.Context, jobs []model.Job) {
11✔
55
        l := log.FromContext(ctx)
11✔
56
        l.Debugf("Processing %d jobs", len(jobs))
11✔
57
        tenantsActionIDs := groupJobsIntoTenantActionIDs(jobs)
11✔
58
        for tenant, actionIDs := range tenantsActionIDs {
25✔
59
                for action, IDs := range actionIDs {
28✔
60
                        if action == model.ActionReindex {
25✔
61
                                i.processJobDevices(ctx, tenant, IDs)
11✔
62
                        } else if action == model.ActionReindexDeployment {
17✔
63
                                i.processJobDeployments(ctx, tenant, IDs)
3✔
64
                        } else {
3✔
65
                                l.Warnf("ignoring unknown job action: %v", action)
×
66
                        }
×
67
                }
68
        }
69
}
70

71
func (i *indexer) processJobDevices(
72
        ctx context.Context,
73
        tenant string,
74
        IDs IDs,
75
) {
11✔
76
        l := log.FromContext(ctx)
11✔
77
        devices := make([]*model.Device, 0, len(IDs))
11✔
78
        removedDevices := make([]*model.Device, 0, len(IDs))
11✔
79

11✔
80
        deviceIDs := make([]string, 0, len(IDs))
11✔
81
        for deviceID := range IDs {
41✔
82
                deviceIDs = append(deviceIDs, deviceID)
30✔
83
        }
30✔
84
        // get devices from deviceauth
85
        deviceAuthDevices, err := i.devClient.GetDevices(ctx, tenant, deviceIDs)
11✔
86
        if err != nil {
12✔
87
                l.Error(errors.Wrap(err, "failed to get devices from deviceauth"))
1✔
88
                return
1✔
89
        }
1✔
90
        // get devices from inventory
91
        inventoryDevices, err := i.invClient.GetDevices(ctx, tenant, deviceIDs)
10✔
92
        if err != nil {
11✔
93
                l.Error(errors.Wrap(err, "failed to get devices from inventory"))
1✔
94
                return
1✔
95
        }
1✔
96
        // get last deployment statuses from deployment
97
        deploymentsDevices, err := i.deplClient.GetLatestFinishedDeployment(ctx, tenant, deviceIDs)
9✔
98
        if err != nil {
9✔
99
                l.Error(errors.Wrap(err, "failed to get last device deployments from deployments"))
×
100
                return
×
101
        }
×
102

103
        // process the results
104
        devices = devices[:0]
9✔
105
        removedDevices = removedDevices[:0]
9✔
106
        for _, deviceID := range deviceIDs {
33✔
107
                var deviceAuthDevice *deviceauth.DeviceAuthDevice
24✔
108
                var inventoryDevice *inventory.Device
24✔
109
                var deploymentsDevice *deployments.LastDeviceDeployment
24✔
110
                if d, ok := deviceAuthDevices[deviceID]; ok {
45✔
111
                        deviceAuthDevice = &d
21✔
112
                }
21✔
113
                for _, d := range inventoryDevices {
72✔
114
                        if d.ID == inventory.DeviceID(deviceID) {
69✔
115
                                inventoryDevice = &d
21✔
116
                                break
21✔
117
                        }
118
                }
119
                for _, d := range deploymentsDevices {
44✔
120
                        if d.DeviceID == deviceID {
25✔
121
                                deploymentsDevice = &d
5✔
122
                                break
5✔
123
                        }
124
                }
125
                if deviceAuthDevice == nil || inventoryDevice == nil {
27✔
126
                        removedDevices = append(removedDevices, &model.Device{
3✔
127
                                ID:       &deviceID,
3✔
128
                                TenantID: &tenant,
3✔
129
                        })
3✔
130
                        continue
3✔
131
                }
132
                device := i.processJobDevice(
21✔
133
                        ctx,
21✔
134
                        tenant,
21✔
135
                        deviceAuthDevice,
21✔
136
                        inventoryDevice,
21✔
137
                        deploymentsDevice,
21✔
138
                )
21✔
139
                if device != nil {
42✔
140
                        devices = append(devices, device)
21✔
141
                }
21✔
142
        }
143
        // bulk index the device
144
        if len(devices) > 0 || len(removedDevices) > 0 {
18✔
145
                err = i.store.BulkIndexDevices(ctx, devices, removedDevices)
9✔
146
                if err != nil {
10✔
147
                        err = errors.Wrap(err, "failed to bulk index the devices")
1✔
148
                        l.Error(err)
1✔
149
                }
1✔
150
        }
151
}
152

153
func (i *indexer) processJobDevice(
154
        ctx context.Context,
155
        tenant string,
156
        deviceAuthDevice *deviceauth.DeviceAuthDevice,
157
        inventoryDevice *inventory.Device,
158
        deploymentsDevice *deployments.LastDeviceDeployment,
159
) *model.Device {
21✔
160
        l := log.FromContext(ctx)
21✔
161
        //
21✔
162
        device := model.NewDevice(tenant, string(inventoryDevice.ID))
21✔
163
        // data from inventory
21✔
164
        device.SetUpdatedAt(inventoryDevice.UpdatedTs)
21✔
165
        // last checkin date
21✔
166
        if deviceAuthDevice != nil {
42✔
167
                device.SetLastCheckIn(deviceAuthDevice.LastCheckinDate)
21✔
168
        }
21✔
169
        // extract location from attributes
170
        ok, location := extractLocation(inventoryDevice.Attributes)
21✔
171
        if ok {
24✔
172
                device.Location = &location
3✔
173
        }
3✔
174
        attributes, err := i.mapper.MapInventoryAttributes(ctx, tenant,
21✔
175
                inventoryDevice.Attributes, true, false)
21✔
176
        if err != nil {
21✔
177
                err = errors.Wrapf(err,
×
178
                        "failed to map device data for tenant %s, "+
×
179
                                "device %s", tenant, inventoryDevice.ID)
×
180
                l.Warn(err)
×
181
        } else {
21✔
182
                for _, invattr := range attributes {
55✔
183
                        attr := model.NewInventoryAttribute(invattr.Scope).
34✔
184
                                SetName(invattr.Name).
34✔
185
                                SetVal(invattr.Value)
34✔
186
                        if err := device.AppendAttr(attr); err != nil {
34✔
187
                                err = errors.Wrapf(err,
×
188
                                        "failed to convert device data for tenant %s, "+
×
189
                                                "device %s", tenant, inventoryDevice.ID)
×
190
                                l.Warn(err)
×
191
                        }
×
192
                }
193
        }
194
        // data from device auth
195
        _ = device.AppendAttr(&model.InventoryAttribute{
21✔
196
                Scope:  model.ScopeIdentity,
21✔
197
                Name:   model.AttrNameStatus,
21✔
198
                String: []string{deviceAuthDevice.Status},
21✔
199
        })
21✔
200
        for name, value := range deviceAuthDevice.IdDataStruct {
27✔
201
                attr := model.NewInventoryAttribute(model.ScopeIdentity).
6✔
202
                        SetName(name).
6✔
203
                        SetVal(value)
6✔
204
                if err := device.AppendAttr(attr); err != nil {
6✔
205
                        err = errors.Wrapf(err,
×
206
                                "failed to convert identity data for tenant %s, "+
×
207
                                        "device %s", tenant, inventoryDevice.ID)
×
208
                        l.Warn(err)
×
209
                }
×
210
        }
211

212
        // data from deployments
213
        if deploymentsDevice != nil {
26✔
214
                _ = device.AppendAttr(&model.InventoryAttribute{
5✔
215
                        Scope:  model.ScopeSystem,
5✔
216
                        Name:   model.AttrNameLatestDeploymentStatus,
5✔
217
                        String: []string{deploymentsDevice.DeviceDeploymentStatus},
5✔
218
                })
5✔
219
        }
5✔
220

221
        // return the device
222
        return device
21✔
223
}
224

225
func extractLocation(
226
        attrs inventory.DeviceAttributes,
227
) (bool, string) {
21✔
228
        latIdx := undefinedCoordinateIdx
21✔
229
        lonIdx := undefinedCoordinateIdx
21✔
230

21✔
231
        for i, attr := range attrs {
55✔
232
                if attr.Name == model.AttrNameGeoLatitude {
37✔
233
                        latIdx = i
3✔
234
                } else if attr.Name == model.AttrNameGeoLongitude {
37✔
235
                        lonIdx = i
3✔
236
                }
3✔
237
                if latIdx != undefinedCoordinateIdx && lonIdx != undefinedCoordinateIdx {
37✔
238
                        break
3✔
239
                }
240
        }
241
        if latIdx != undefinedCoordinateIdx && lonIdx != undefinedCoordinateIdx {
24✔
242
                latStr, ok := attrs[latIdx].Value.(string)
3✔
243
                if !ok {
3✔
NEW
244
                        return false, ""
×
NEW
245
                }
×
246
                lonStr, ok := attrs[lonIdx].Value.(string)
3✔
247
                if !ok {
3✔
NEW
248
                        return false, ""
×
NEW
249
                }
×
250
                if validLocation(latStr, lonStr) {
6✔
251
                        return true, latStr + "," + lonStr
3✔
252
                }
3✔
253
        }
254
        return false, ""
18✔
255
}
256

257
// validLocation reports whether latStr and lonStr both parse as floating-point
// numbers and lie within the valid coordinate ranges: latitude in [-90, 90]
// and longitude in [-180, 180]. Unparsable input, NaN and infinities are
// rejected.
func validLocation(latStr, lonStr string) bool {
	lat, err := strconv.ParseFloat(latStr, 32)
	if err != nil {
		return false
	}
	lon, err := strconv.ParseFloat(lonStr, 32)
	if err != nil {
		return false
	}
	// ParseFloat accepts "NaN", and every ordered comparison with NaN is
	// false. Checking that the values ARE in range (instead of rejecting the
	// ones that are out of range) makes NaN fail the validation.
	return lat >= -90 && lat <= 90 && lon >= -180 && lon <= 180
}
274

275
func (i *indexer) processJobDeployments(
276
        ctx context.Context,
277
        tenant string,
278
        IDs IDs,
279
) {
3✔
280
        l := log.FromContext(ctx)
3✔
281
        depls := make([]*model.Deployment, 0, len(IDs))
3✔
282
        deploymentIDs := make([]string, 0, len(IDs))
3✔
283
        for deploymentID := range IDs {
10✔
284
                deploymentIDs = append(deploymentIDs, deploymentID)
7✔
285
        }
7✔
286
        // get device deployments from deployments
287
        deviceDeployments, err := i.deplClient.GetDeployments(ctx, tenant, deploymentIDs)
3✔
288
        if err != nil {
3✔
289
                l.Error(errors.Wrap(err, "failed to get device deployments from device deployments"))
×
290
                return
×
291
        }
×
292
        // process the results
293
        for deploymentID := range IDs {
10✔
294
                for _, d := range deviceDeployments {
19✔
295
                        if d.ID == deploymentID {
19✔
296
                                depl := i.processJobDeployment(ctx, tenant, d)
7✔
297
                                if depl != nil {
14✔
298
                                        depls = append(depls, depl)
7✔
299
                                }
7✔
300
                                break
7✔
301
                        }
302
                }
303
        }
304
        // bulk index the device
305
        if len(depls) > 0 {
6✔
306
                err = i.store.BulkIndexDeployments(ctx, depls)
3✔
307
                if err != nil {
3✔
308
                        err = errors.Wrap(err, "failed to bulk index the deployments")
×
309
                        l.Error(err)
×
310
                }
×
311
        }
312
}
313

314
func (i *indexer) processJobDeployment(
315
        ctx context.Context,
316
        tenant string,
317
        deployment *deployments.DeviceDeployment,
318
) *model.Deployment {
7✔
319
        deviceElapsedSeconds := uint(0)
7✔
320
        if deployment.Device == nil ||
7✔
321
                deployment.Deployment == nil {
7✔
322
                return nil
×
323
        } else if deployment.Device.Finished != nil && deployment.Device.Created != nil {
13✔
324
                deviceElapsedSeconds = uint(deployment.Device.Finished.Sub(
6✔
325
                        *deployment.Device.Created).Seconds())
6✔
326
        }
6✔
327
        res := &model.Deployment{
7✔
328
                ID:                          deployment.ID,
7✔
329
                DeviceID:                    deployment.Device.Id,
7✔
330
                DeploymentID:                deployment.Deployment.Id,
7✔
331
                TenantID:                    tenant,
7✔
332
                DeploymentName:              deployment.Deployment.Name,
7✔
333
                DeploymentArtifactName:      deployment.Deployment.ArtifactName,
7✔
334
                DeploymentType:              deployment.Deployment.Type,
7✔
335
                DeploymentCreated:           deployment.Deployment.Created,
7✔
336
                DeploymentFilterID:          deployment.Deployment.FilterId,
7✔
337
                DeploymentAllDevices:        deployment.Deployment.AllDevices,
7✔
338
                DeploymentForceInstallation: deployment.Deployment.ForceInstallation,
7✔
339
                DeploymentGroups:            deployment.Deployment.Groups,
7✔
340
                DeploymentPhased:            deployment.Deployment.PhaseId != "",
7✔
341
                DeploymentPhaseId:           deployment.Deployment.PhaseId,
7✔
342
                DeploymentRetries:           deployment.Deployment.Retries,
7✔
343
                DeploymentMaxDevices:        uint(deployment.Deployment.MaxDevices),
7✔
344
                DeploymentAutogenerateDelta: deployment.Deployment.AutogenerateDelta,
7✔
345
                DeviceCreated:               deployment.Device.Created,
7✔
346
                DeviceFinished:              deployment.Device.Finished,
7✔
347
                DeviceElapsedSeconds:        deviceElapsedSeconds,
7✔
348
                DeviceDeleted:               deployment.Device.Deleted,
7✔
349
                DeviceStatus:                deployment.Device.Status,
7✔
350
                DeviceIsLogAvailable:        deployment.Device.IsLogAvailable,
7✔
351
                DeviceRetries:               deployment.Device.Retries,
7✔
352
                DeviceAttempts:              deployment.Device.Attempts,
7✔
353
        }
7✔
354
        if deployment.Device.Image != nil {
12✔
355
                res.ImageID = deployment.Device.Image.Id
5✔
356
                res.ImageDescription = deployment.Device.Image.Description
5✔
357
                res.ImageArtifactName = deployment.Device.Image.Name
5✔
358
                res.ImageDeviceTypes = deployment.Device.Image.DeviceTypesCompatible
5✔
359
                res.ImageSigned = deployment.Device.Image.Signed
5✔
360
                res.ImageProvides = deployment.Device.Image.Provides
5✔
361
                res.ImageDepends = deployment.Device.Image.Depends
5✔
362
                res.ImageClearsProvides = deployment.Device.Image.ClearsProvides
5✔
363
                res.ImageSize = deployment.Device.Image.Size
5✔
364
                if deployment.Device.Image.Info != nil {
10✔
365
                        res.ImageArtifactInfoFormat = deployment.Device.Image.Info.Format
5✔
366
                        res.ImageArtifactInfoVersion = deployment.Device.Image.Info.Version
5✔
367
                }
5✔
368
        }
369
        return res
7✔
370
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc