• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / reporting / 837242630

pending completion
837242630

Pull #130

gitlab-ci

Krzysztof Jaskiewicz
test: acceptance test for geo queries
Pull Request #130: support for geolocation queries

52 of 56 new or added lines in 3 files covered. (92.86%)

12 existing lines in 2 files now uncovered.

2861 of 3355 relevant lines covered (85.28%)

17.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.78
/app/indexer/jobs.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package indexer
16

17
import (
18
        "context"
19

20
        "github.com/pkg/errors"
21

22
        "github.com/mendersoftware/go-lib-micro/config"
23
        "github.com/mendersoftware/go-lib-micro/log"
24

25
        "github.com/mendersoftware/reporting/client/deployments"
26
        "github.com/mendersoftware/reporting/client/deviceauth"
27
        "github.com/mendersoftware/reporting/client/inventory"
28
        rconfig "github.com/mendersoftware/reporting/config"
29
        "github.com/mendersoftware/reporting/model"
30
)
31

32
type (
	// IDs is a set of identifiers, keyed by the identifier string.
	IDs map[string]bool

	// ActionIDs maps a job action to the set of IDs that action applies to.
	ActionIDs map[string]IDs

	// TenantActionIDs maps a tenant ID to the per-action ID sets of that
	// tenant.
	TenantActionIDs map[string]ActionIDs
)
35

36
func (i *indexer) GetJobs(ctx context.Context, jobs chan model.Job) error {
7✔
37
        streamName := config.Config.GetString(rconfig.SettingNatsStreamName)
7✔
38

7✔
39
        topic := config.Config.GetString(rconfig.SettingNatsSubscriberTopic)
7✔
40
        subject := streamName + "." + topic
7✔
41
        durableName := config.Config.GetString(rconfig.SettingNatsSubscriberDurable)
7✔
42

7✔
43
        err := i.nats.JetStreamSubscribe(ctx, subject, durableName, jobs)
7✔
44
        if err != nil {
11✔
45
                return errors.Wrap(err, "failed to subscribe to the nats JetStream")
4✔
46
        }
4✔
47

48
        return nil
3✔
49
}
50

51
func (i *indexer) ProcessJobs(ctx context.Context, jobs []model.Job) {
15✔
52
        l := log.FromContext(ctx)
15✔
53
        l.Debugf("Processing %d jobs", len(jobs))
15✔
54
        tenantsActionIDs := groupJobsIntoTenantActionIDs(jobs)
15✔
55
        for tenant, actionIDs := range tenantsActionIDs {
33✔
56
                for action, IDs := range actionIDs {
36✔
57
                        if action == model.ActionReindex {
32✔
58
                                i.processJobDevices(ctx, tenant, IDs)
14✔
59
                        } else if action == model.ActionReindexDeployment {
22✔
60
                                i.processJobDeployments(ctx, tenant, IDs)
4✔
61
                        } else {
4✔
UNCOV
62
                                l.Warnf("ignoring unknown job action: %v", action)
×
UNCOV
63
                        }
×
64
                }
65
        }
66
}
67

68
func (i *indexer) processJobDevices(
69
        ctx context.Context,
70
        tenant string,
71
        IDs IDs,
72
) {
14✔
73
        l := log.FromContext(ctx)
14✔
74
        devices := make([]*model.Device, 0, len(IDs))
14✔
75
        removedDevices := make([]*model.Device, 0, len(IDs))
14✔
76

14✔
77
        deviceIDs := make([]string, 0, len(IDs))
14✔
78
        for deviceID := range IDs {
53✔
79
                deviceIDs = append(deviceIDs, deviceID)
39✔
80
        }
39✔
81
        // get devices from deviceauth
82
        deviceAuthDevices, err := i.devClient.GetDevices(ctx, tenant, deviceIDs)
14✔
83
        if err != nil {
16✔
84
                l.Error(errors.Wrap(err, "failed to get devices from deviceauth"))
2✔
85
                return
2✔
86
        }
2✔
87
        // get devices from inventory
88
        inventoryDevices, err := i.invClient.GetDevices(ctx, tenant, deviceIDs)
12✔
89
        if err != nil {
14✔
90
                l.Error(errors.Wrap(err, "failed to get devices from inventory"))
2✔
91
                return
2✔
92
        }
2✔
93
        // process the results
94
        devices = devices[:0]
10✔
95
        removedDevices = removedDevices[:0]
10✔
96
        for _, deviceID := range deviceIDs {
37✔
97
                var deviceAuthDevice *deviceauth.DeviceAuthDevice
27✔
98
                var inventoryDevice *inventory.Device
27✔
99
                for _, d := range deviceAuthDevices {
80✔
100
                        if d.ID == deviceID {
76✔
101
                                deviceAuthDevice = &d
23✔
102
                                break
23✔
103
                        }
104
                }
105
                for _, d := range inventoryDevices {
80✔
106
                        if d.ID == inventory.DeviceID(deviceID) {
76✔
107
                                inventoryDevice = &d
23✔
108
                                break
23✔
109
                        }
110
                }
111
                if deviceAuthDevice == nil || inventoryDevice == nil {
31✔
112
                        removedDevices = append(removedDevices, &model.Device{
4✔
113
                                ID:       &deviceID,
4✔
114
                                TenantID: &tenant,
4✔
115
                        })
4✔
116
                        continue
4✔
117
                }
118
                device := i.processJobDevice(ctx, tenant, deviceAuthDevice, inventoryDevice)
23✔
119
                if device != nil {
46✔
120
                        devices = append(devices, device)
23✔
121
                }
23✔
122
        }
123
        // bulk index the device
124
        if len(devices) > 0 || len(removedDevices) > 0 {
20✔
125
                err = i.store.BulkIndexDevices(ctx, devices, removedDevices)
10✔
126
                if err != nil {
12✔
127
                        err = errors.Wrap(err, "failed to bulk index the devices")
2✔
128
                        l.Error(err)
2✔
129
                }
2✔
130
        }
131
}
132

133
func (i *indexer) processJobDevice(
134
        ctx context.Context,
135
        tenant string,
136
        deviceAuthDevice *deviceauth.DeviceAuthDevice,
137
        inventoryDevice *inventory.Device,
138
) *model.Device {
23✔
139
        l := log.FromContext(ctx)
23✔
140
        //
23✔
141
        device := model.NewDevice(tenant, string(inventoryDevice.ID))
23✔
142
        // data from inventory
23✔
143
        device.SetUpdatedAt(inventoryDevice.UpdatedTs)
23✔
144
        // extract location from attributes
23✔
145
        ok, location := extractLocation(inventoryDevice.Attributes)
23✔
146
        if ok {
26✔
147
                device.Location = &location
3✔
148
        }
3✔
149
        attributes, err := i.mapper.MapInventoryAttributes(ctx, tenant,
23✔
150
                inventoryDevice.Attributes, true, false)
23✔
151
        if err != nil {
23✔
152
                err = errors.Wrapf(err,
×
153
                        "failed to map device data for tenant %s, "+
×
UNCOV
154
                                "device %s", tenant, inventoryDevice.ID)
×
UNCOV
155
                l.Warn(err)
×
156
        } else {
23✔
157
                for _, invattr := range attributes {
58✔
158
                        attr := model.NewInventoryAttribute(invattr.Scope).
35✔
159
                                SetName(invattr.Name).
35✔
160
                                SetVal(invattr.Value)
35✔
161
                        if err := device.AppendAttr(attr); err != nil {
35✔
162
                                err = errors.Wrapf(err,
×
163
                                        "failed to convert device data for tenant %s, "+
×
164
                                                "device %s", tenant, inventoryDevice.ID)
×
UNCOV
165
                                l.Warn(err)
×
UNCOV
166
                        }
×
167
                }
168
        }
169
        // data from device auth
170
        _ = device.AppendAttr(&model.InventoryAttribute{
23✔
171
                Scope:  model.ScopeIdentity,
23✔
172
                Name:   model.AttrNameStatus,
23✔
173
                String: []string{deviceAuthDevice.Status},
23✔
174
        })
23✔
175
        for name, value := range deviceAuthDevice.IdDataStruct {
31✔
176
                attr := model.NewInventoryAttribute(model.ScopeIdentity).
8✔
177
                        SetName(name).
8✔
178
                        SetVal(value)
8✔
179
                if err := device.AppendAttr(attr); err != nil {
8✔
UNCOV
180
                        err = errors.Wrapf(err,
×
UNCOV
181
                                "failed to convert identity data for tenant %s, "+
×
UNCOV
182
                                        "device %s", tenant, inventoryDevice.ID)
×
183
                        l.Warn(err)
×
184
                }
×
185
        }
186
        // FIXME:
187
        // this part of the code will be reintroduced in a different form
188
        // see https://northerntech.atlassian.net/browse/MEN-6422 for details
189
        /*
190
                // latest deployment
191
                deviceDeployment, err := i.deplClient.GetLatestFinishedDeployment(ctx, tenant,
192
                        string(inventoryDevice.ID))
193
                if err != nil {
194
                        l.Error(errors.Wrap(err, "failed to get device deployments from deployments"))
195
                        return nil
196
                } else if deviceDeployment != nil {
197
                        _ = device.AppendAttr(&model.InventoryAttribute{
198
                                Scope:  model.ScopeSystem,
199
                                Name:   model.AttrNameLatestDeploymentStatus,
200
                                String: []string{deviceDeployment.Device.Status},
201
                        })
202
                }
203
        */
204
        // return the device
205
        return device
23✔
206
}
207

208
func extractLocation(
209
        attrs inventory.DeviceAttributes,
210
) (bool, string) {
23✔
211
        latIdx := -1
23✔
212
        lonIdx := -1
23✔
213
        check := false
23✔
214

23✔
215
        for i, attr := range attrs {
58✔
216
                if attr.Name == "geo-lat" {
38✔
217
                        latIdx = i
3✔
218
                        check = true
3✔
219
                } else if attr.Name == "geo-lon" {
38✔
220
                        lonIdx = i
3✔
221
                        check = true
3✔
222
                }
3✔
223
                if check && latIdx >= 0 && lonIdx >= 0 {
38✔
224
                        break
3✔
225
                }
226
        }
227
        if latIdx >= 0 && lonIdx >= 0 {
26✔
228
                latStr, ok := attrs[latIdx].Value.(string)
3✔
229
                if !ok {
3✔
NEW
230
                        return false, ""
×
NEW
231
                }
×
232
                lonStr, ok := attrs[lonIdx].Value.(string)
3✔
233
                if !ok {
3✔
NEW
234
                        return false, ""
×
NEW
235
                }
×
236
                return true, latStr + "," + lonStr
3✔
237
        }
238
        return false, ""
20✔
239
}
240

241
func (i *indexer) processJobDeployments(
242
        ctx context.Context,
243
        tenant string,
244
        IDs IDs,
245
) {
4✔
246
        l := log.FromContext(ctx)
4✔
247
        depls := make([]*model.Deployment, 0, len(IDs))
4✔
248
        deploymentIDs := make([]string, 0, len(IDs))
4✔
249
        for deploymentID := range IDs {
14✔
250
                deploymentIDs = append(deploymentIDs, deploymentID)
10✔
251
        }
10✔
252
        // get device deployments from deployments
253
        deviceDeployments, err := i.deplClient.GetDeployments(ctx, tenant, deploymentIDs)
4✔
254
        if err != nil {
4✔
255
                l.Error(errors.Wrap(err, "failed to get device deployments from device deployments"))
×
256
                return
×
257
        }
×
258
        // process the results
259
        for deploymentID := range IDs {
14✔
260
                for _, d := range deviceDeployments {
28✔
261
                        if d.ID == deploymentID {
28✔
262
                                depl := i.processJobDeployment(ctx, tenant, d)
10✔
263
                                if depl != nil {
20✔
264
                                        depls = append(depls, depl)
10✔
265
                                }
10✔
266
                                break
10✔
267
                        }
268
                }
269
        }
270
        // bulk index the device
271
        if len(depls) > 0 {
8✔
272
                err = i.store.BulkIndexDeployments(ctx, depls)
4✔
273
                if err != nil {
4✔
274
                        err = errors.Wrap(err, "failed to bulk index the deployments")
×
275
                        l.Error(err)
×
276
                }
×
277
        }
278
}
279

280
func (i *indexer) processJobDeployment(
281
        ctx context.Context,
282
        tenant string,
283
        deployment *deployments.DeviceDeployment,
284
) *model.Deployment {
10✔
285
        deviceElapsedSeconds := uint(0)
10✔
286
        if deployment.Device == nil ||
10✔
287
                deployment.Deployment == nil {
10✔
UNCOV
288
                return nil
×
289
        } else if deployment.Device.Finished != nil && deployment.Device.Created != nil {
18✔
290
                deviceElapsedSeconds = uint(deployment.Device.Finished.Sub(
8✔
291
                        *deployment.Device.Created).Seconds())
8✔
292
        }
8✔
293
        res := &model.Deployment{
10✔
294
                ID:                          deployment.ID,
10✔
295
                DeviceID:                    deployment.Device.Id,
10✔
296
                DeploymentID:                deployment.Deployment.Id,
10✔
297
                TenantID:                    tenant,
10✔
298
                DeploymentName:              deployment.Deployment.Name,
10✔
299
                DeploymentArtifactName:      deployment.Deployment.ArtifactName,
10✔
300
                DeploymentType:              deployment.Deployment.Type,
10✔
301
                DeploymentCreated:           deployment.Deployment.Created,
10✔
302
                DeploymentFilterID:          deployment.Deployment.FilterId,
10✔
303
                DeploymentAllDevices:        deployment.Deployment.AllDevices,
10✔
304
                DeploymentForceInstallation: deployment.Deployment.ForceInstallation,
10✔
305
                DeploymentGroups:            deployment.Deployment.Groups,
10✔
306
                DeploymentPhased:            deployment.Deployment.PhaseId != "",
10✔
307
                DeploymentPhaseId:           deployment.Deployment.PhaseId,
10✔
308
                DeploymentRetries:           deployment.Deployment.Retries,
10✔
309
                DeploymentMaxDevices:        uint(deployment.Deployment.MaxDevices),
10✔
310
                DeploymentAutogenerateDelta: deployment.Deployment.AutogenerateDelta,
10✔
311
                DeviceCreated:               deployment.Device.Created,
10✔
312
                DeviceFinished:              deployment.Device.Finished,
10✔
313
                DeviceElapsedSeconds:        deviceElapsedSeconds,
10✔
314
                DeviceDeleted:               deployment.Device.Deleted,
10✔
315
                DeviceStatus:                deployment.Device.Status,
10✔
316
                DeviceIsLogAvailable:        deployment.Device.IsLogAvailable,
10✔
317
                DeviceRetries:               deployment.Device.Retries,
10✔
318
                DeviceAttempts:              deployment.Device.Attempts,
10✔
319
        }
10✔
320
        if deployment.Device.Image != nil {
18✔
321
                res.ImageID = deployment.Device.Image.Id
8✔
322
                res.ImageDescription = deployment.Device.Image.Description
8✔
323
                res.ImageArtifactName = deployment.Device.Image.Name
8✔
324
                res.ImageDeviceTypes = deployment.Device.Image.DeviceTypesCompatible
8✔
325
                res.ImageSigned = deployment.Device.Image.Signed
8✔
326
                res.ImageProvides = deployment.Device.Image.Provides
8✔
327
                res.ImageDepends = deployment.Device.Image.Depends
8✔
328
                res.ImageClearsProvides = deployment.Device.Image.ClearsProvides
8✔
329
                res.ImageSize = deployment.Device.Image.Size
8✔
330
                if deployment.Device.Image.Info != nil {
16✔
331
                        res.ImageArtifactInfoFormat = deployment.Device.Image.Info.Format
8✔
332
                        res.ImageArtifactInfoVersion = deployment.Device.Image.Info.Version
8✔
333
                }
8✔
334
        }
335
        return res
10✔
336
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc