• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / reporting / 813096809

pending completion
813096809

push

gitlab-ci

GitHub
Merge pull request #120 from mendersoftware/master

1 of 1 new or added line in 1 file covered. (100.0%)

8 existing lines in 3 files now uncovered.

2786 of 3259 relevant lines covered (85.49%)

17.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.62
/app/indexer/jobs.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package indexer
16

17
import (
18
        "context"
19

20
        "github.com/pkg/errors"
21

22
        "github.com/mendersoftware/go-lib-micro/config"
23
        "github.com/mendersoftware/go-lib-micro/log"
24

25
        "github.com/mendersoftware/reporting/client/deployments"
26
        "github.com/mendersoftware/reporting/client/deviceauth"
27
        "github.com/mendersoftware/reporting/client/inventory"
28
        rconfig "github.com/mendersoftware/reporting/config"
29
        "github.com/mendersoftware/reporting/model"
30
)
31

32
type IDs map[string]bool
33
type ActionIDs map[string]IDs
34
type TenantActionIDs map[string]ActionIDs
35

36
func (i *indexer) GetJobs(ctx context.Context, jobs chan model.Job) error {
7✔
37
        streamName := config.Config.GetString(rconfig.SettingNatsStreamName)
7✔
38

7✔
39
        topic := config.Config.GetString(rconfig.SettingNatsSubscriberTopic)
7✔
40
        subject := streamName + "." + topic
7✔
41
        durableName := config.Config.GetString(rconfig.SettingNatsSubscriberDurable)
7✔
42

7✔
43
        err := i.nats.JetStreamSubscribe(ctx, subject, durableName, jobs)
7✔
44
        if err != nil {
11✔
45
                return errors.Wrap(err, "failed to subscribe to the nats JetStream")
4✔
46
        }
4✔
47

48
        return nil
3✔
49
}
50

51
func (i *indexer) ProcessJobs(ctx context.Context, jobs []model.Job) {
17✔
52
        l := log.FromContext(ctx)
17✔
53
        l.Debugf("Processing %d jobs", len(jobs))
17✔
54
        tenantsActionIDs := groupJobsIntoTenantActionIDs(jobs)
17✔
55
        for tenant, actionIDs := range tenantsActionIDs {
37✔
56
                for action, IDs := range actionIDs {
40✔
57
                        if action == model.ActionReindex {
36✔
58
                                i.processJobDevices(ctx, tenant, IDs)
16✔
59
                        } else if action == model.ActionReindexDeployment {
24✔
60
                                i.processJobDeployments(ctx, tenant, IDs)
4✔
61
                        } else {
4✔
UNCOV
62
                                l.Warnf("ignoring unknown job action: %v", action)
×
UNCOV
63
                        }
×
64
                }
65
        }
66
}
67

68
func (i *indexer) processJobDevices(
69
        ctx context.Context,
70
        tenant string,
71
        IDs IDs,
72
) {
16✔
73
        l := log.FromContext(ctx)
16✔
74
        devices := make([]*model.Device, 0, len(IDs))
16✔
75
        removedDevices := make([]*model.Device, 0, len(IDs))
16✔
76

16✔
77
        deviceIDs := make([]string, 0, len(IDs))
16✔
78
        for deviceID := range IDs {
61✔
79
                deviceIDs = append(deviceIDs, deviceID)
45✔
80
        }
45✔
81
        // get devices from deviceauth
82
        deviceAuthDevices, err := i.devClient.GetDevices(ctx, tenant, deviceIDs)
16✔
83
        if err != nil {
18✔
84
                l.Error(errors.Wrap(err, "failed to get devices from deviceauth"))
2✔
85
                return
2✔
86
        }
2✔
87
        // get devices from inventory
88
        inventoryDevices, err := i.invClient.GetDevices(ctx, tenant, deviceIDs)
14✔
89
        if err != nil {
16✔
90
                l.Error(errors.Wrap(err, "failed to get devices from inventory"))
2✔
91
                return
2✔
92
        }
2✔
93
        // process the results
94
        devices = devices[:0]
12✔
95
        removedDevices = removedDevices[:0]
12✔
96
        for _, deviceID := range deviceIDs {
45✔
97
                var deviceAuthDevice *deviceauth.DeviceAuthDevice
33✔
98
                var inventoryDevice *inventory.Device
33✔
99
                for _, d := range deviceAuthDevices {
96✔
100
                        if d.ID == deviceID {
90✔
101
                                deviceAuthDevice = &d
27✔
102
                                break
27✔
103
                        }
104
                }
105
                for _, d := range inventoryDevices {
96✔
106
                        if d.ID == inventory.DeviceID(deviceID) {
90✔
107
                                inventoryDevice = &d
27✔
108
                                break
27✔
109
                        }
110
                }
111
                if deviceAuthDevice == nil || inventoryDevice == nil {
39✔
112
                        removedDevices = append(removedDevices, &model.Device{
6✔
113
                                ID:       &deviceID,
6✔
114
                                TenantID: &tenant,
6✔
115
                        })
6✔
116
                        continue
6✔
117
                }
118
                device := i.processJobDevice(ctx, tenant, deviceAuthDevice, inventoryDevice)
27✔
119
                if device != nil {
54✔
120
                        devices = append(devices, device)
27✔
121
                }
27✔
122
        }
123
        // bulk index the device
124
        if len(devices) > 0 || len(removedDevices) > 0 {
24✔
125
                err = i.store.BulkIndexDevices(ctx, devices, removedDevices)
12✔
126
                if err != nil {
14✔
127
                        err = errors.Wrap(err, "failed to bulk index the devices")
2✔
128
                        l.Error(err)
2✔
129
                }
2✔
130
        }
131
}
132

133
func (i *indexer) processJobDevice(
134
        ctx context.Context,
135
        tenant string,
136
        deviceAuthDevice *deviceauth.DeviceAuthDevice,
137
        inventoryDevice *inventory.Device,
138
) *model.Device {
27✔
139
        l := log.FromContext(ctx)
27✔
140
        //
27✔
141
        device := model.NewDevice(tenant, string(inventoryDevice.ID))
27✔
142
        // data from inventory
27✔
143
        device.SetUpdatedAt(inventoryDevice.UpdatedTs)
27✔
144
        attributes, err := i.mapper.MapInventoryAttributes(ctx, tenant,
27✔
145
                inventoryDevice.Attributes, true, false)
27✔
146
        if err != nil {
27✔
147
                err = errors.Wrapf(err,
×
148
                        "failed to map device data for tenant %s, "+
×
149
                                "device %s", tenant, inventoryDevice.ID)
×
150
                l.Warn(err)
×
151
        } else {
27✔
152
                for _, invattr := range attributes {
56✔
153
                        attr := model.NewInventoryAttribute(invattr.Scope).
29✔
154
                                SetName(invattr.Name).
29✔
155
                                SetVal(invattr.Value)
29✔
156
                        if err := device.AppendAttr(attr); err != nil {
29✔
157
                                err = errors.Wrapf(err,
×
158
                                        "failed to convert device data for tenant %s, "+
×
159
                                                "device %s", tenant, inventoryDevice.ID)
×
160
                                l.Warn(err)
×
161
                        }
×
162
                }
163
        }
164
        // data from device auth
165
        _ = device.AppendAttr(&model.InventoryAttribute{
27✔
166
                Scope:  model.ScopeIdentity,
27✔
167
                Name:   model.AttrNameStatus,
27✔
168
                String: []string{deviceAuthDevice.Status},
27✔
169
        })
27✔
170
        for name, value := range deviceAuthDevice.IdDataStruct {
39✔
171
                attr := model.NewInventoryAttribute(model.ScopeIdentity).
12✔
172
                        SetName(name).
12✔
173
                        SetVal(value)
12✔
174
                if err := device.AppendAttr(attr); err != nil {
12✔
175
                        err = errors.Wrapf(err,
×
176
                                "failed to convert identity data for tenant %s, "+
×
177
                                        "device %s", tenant, inventoryDevice.ID)
×
178
                        l.Warn(err)
×
179
                }
×
180
        }
181
        // latest deployment
182
        deviceDeployment, err := i.deplClient.GetLatestFinishedDeployment(ctx, tenant,
27✔
183
                string(inventoryDevice.ID))
27✔
184
        if err != nil {
27✔
185
                l.Error(errors.Wrap(err, "failed to get device deployments from deployments"))
×
186
                return nil
×
187
        } else if deviceDeployment != nil {
46✔
188
                _ = device.AppendAttr(&model.InventoryAttribute{
19✔
189
                        Scope:  model.ScopeSystem,
19✔
190
                        Name:   model.AttrNameLatestDeploymentStatus,
19✔
191
                        String: []string{deviceDeployment.Device.Status},
19✔
192
                })
19✔
193
        }
19✔
194
        // return the device
195
        return device
27✔
196
}
197

198
func (i *indexer) processJobDeployments(
199
        ctx context.Context,
200
        tenant string,
201
        IDs IDs,
202
) {
4✔
203
        l := log.FromContext(ctx)
4✔
204
        depls := make([]*model.Deployment, 0, len(IDs))
4✔
205
        deploymentIDs := make([]string, 0, len(IDs))
4✔
206
        for deploymentID := range IDs {
14✔
207
                deploymentIDs = append(deploymentIDs, deploymentID)
10✔
208
        }
10✔
209
        // get device deployments from deployments
210
        deviceDeployments, err := i.deplClient.GetDeployments(ctx, tenant, deploymentIDs)
4✔
211
        if err != nil {
4✔
212
                l.Error(errors.Wrap(err, "failed to get device deployments from device deployments"))
×
213
                return
×
214
        }
×
215
        // process the results
216
        for deploymentID := range IDs {
14✔
217
                for _, d := range deviceDeployments {
28✔
218
                        if d.ID == deploymentID {
28✔
219
                                depl := i.processJobDeployment(ctx, tenant, d)
10✔
220
                                if depl != nil {
20✔
221
                                        depls = append(depls, depl)
10✔
222
                                }
10✔
223
                                break
10✔
224
                        }
225
                }
226
        }
227
        // bulk index the device
228
        if len(depls) > 0 {
8✔
229
                err = i.store.BulkIndexDeployments(ctx, depls)
4✔
230
                if err != nil {
4✔
231
                        err = errors.Wrap(err, "failed to bulk index the deployments")
×
232
                        l.Error(err)
×
233
                }
×
234
        }
235
}
236

237
func (i *indexer) processJobDeployment(
238
        ctx context.Context,
239
        tenant string,
240
        deployment *deployments.DeviceDeployment,
241
) *model.Deployment {
10✔
242
        deviceElapsedSeconds := uint(0)
10✔
243
        if deployment.Device == nil ||
10✔
244
                deployment.Deployment == nil {
10✔
245
                return nil
×
246
        } else if deployment.Device.Finished != nil && deployment.Device.Created != nil {
18✔
247
                deviceElapsedSeconds = uint(deployment.Device.Finished.Sub(
8✔
248
                        *deployment.Device.Created).Seconds())
8✔
249
        }
8✔
250
        res := &model.Deployment{
10✔
251
                ID:                          deployment.ID,
10✔
252
                DeviceID:                    deployment.Device.Id,
10✔
253
                DeploymentID:                deployment.Deployment.Id,
10✔
254
                TenantID:                    tenant,
10✔
255
                DeploymentName:              deployment.Deployment.Name,
10✔
256
                DeploymentArtifactName:      deployment.Deployment.ArtifactName,
10✔
257
                DeploymentType:              deployment.Deployment.Type,
10✔
258
                DeploymentCreated:           deployment.Deployment.Created,
10✔
259
                DeploymentFilterID:          deployment.Deployment.FilterId,
10✔
260
                DeploymentAllDevices:        deployment.Deployment.AllDevices,
10✔
261
                DeploymentForceInstallation: deployment.Deployment.ForceInstallation,
10✔
262
                DeploymentGroup:             deployment.Deployment.Group,
10✔
263
                DeploymentPhased:            deployment.Deployment.PhaseId != "",
10✔
264
                DeploymentPhaseId:           deployment.Deployment.PhaseId,
10✔
265
                DeploymentRetries:           deployment.Deployment.Retries,
10✔
266
                DeploymentMaxDevices:        uint(deployment.Deployment.MaxDevices),
10✔
267
                DeploymentAutogenerateDelta: deployment.Deployment.AutogenerateDelta,
10✔
268
                DeviceCreated:               deployment.Device.Created,
10✔
269
                DeviceFinished:              deployment.Device.Finished,
10✔
270
                DeviceElapsedSeconds:        deviceElapsedSeconds,
10✔
271
                DeviceDeleted:               deployment.Device.Deleted,
10✔
272
                DeviceStatus:                deployment.Device.Status,
10✔
273
                DeviceIsLogAvailable:        deployment.Device.IsLogAvailable,
10✔
274
                DeviceRetries:               deployment.Device.Retries,
10✔
275
                DeviceAttempts:              deployment.Device.Attempts,
10✔
276
        }
10✔
277
        if deployment.Device.Image != nil {
18✔
278
                res.ImageID = deployment.Device.Image.Id
8✔
279
                res.ImageDescription = deployment.Device.Image.Description
8✔
280
                res.ImageArtifactName = deployment.Device.Image.Name
8✔
281
                res.ImageDeviceTypes = deployment.Device.Image.DeviceTypesCompatible
8✔
282
                res.ImageSigned = deployment.Device.Image.Signed
8✔
283
                res.ImageProvides = deployment.Device.Image.Provides
8✔
284
                res.ImageDepends = deployment.Device.Image.Depends
8✔
285
                res.ImageClearsProvides = deployment.Device.Image.ClearsProvides
8✔
286
                res.ImageSize = deployment.Device.Image.Size
8✔
287
                if deployment.Device.Image.Info != nil {
16✔
288
                        res.ImageArtifactInfoFormat = deployment.Device.Image.Info.Format
8✔
289
                        res.ImageArtifactInfoVersion = deployment.Device.Image.Info.Version
8✔
290
                }
8✔
291
        }
292
        return res
10✔
293
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc