• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / reporting / 739771832

pending completion
739771832

Pull #87

gitlab-ci

Fabio Tranchitella
chore: bump the copyright year in the LICENSE file
Pull Request #87: MEN-5930: index device deployment objects

644 of 887 new or added lines in 15 files covered. (72.6%)

2 existing lines in 2 files now uncovered.

2555 of 3142 relevant lines covered (81.32%)

16.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.31
/app/indexer/jobs.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package indexer
16

17
import (
18
        "context"
19
        "encoding/json"
20

21
        natsio "github.com/nats-io/nats.go"
22
        "github.com/pkg/errors"
23

24
        "github.com/mendersoftware/go-lib-micro/config"
25
        "github.com/mendersoftware/go-lib-micro/log"
26

27
        "github.com/mendersoftware/reporting/client/deployments"
28
        "github.com/mendersoftware/reporting/client/deviceauth"
29
        "github.com/mendersoftware/reporting/client/inventory"
30
        rconfig "github.com/mendersoftware/reporting/config"
31
        "github.com/mendersoftware/reporting/model"
32
)
33

34
// IDs is a set of object IDs stored as the keys of the map.
type IDs map[string]bool
35
// ActionIDs maps a job action to the set of IDs that action applies to.
type ActionIDs map[string]IDs
36
// TenantActionIDs maps a tenant to its per-action sets of IDs.
type TenantActionIDs map[string]ActionIDs
37

38
func (i *indexer) GetJobs(ctx context.Context, jobs chan *model.Job) error {
7✔
39
        l := log.FromContext(ctx)
7✔
40

7✔
41
        streamName := config.Config.GetString(rconfig.SettingNatsStreamName)
7✔
42
        err := i.nats.JetStreamCreateStream(streamName)
7✔
43
        if err != nil {
7✔
44
                return errors.Wrap(err, "failed to create the nats JetStream stream")
×
45
        }
×
46

47
        topic := config.Config.GetString(rconfig.SettingNatsSubscriberTopic)
7✔
48
        subject := streamName + "." + topic
7✔
49
        durableName := config.Config.GetString(rconfig.SettingNatsSubscriberDurable)
7✔
50

7✔
51
        channel := make(chan *natsio.Msg, 1)
7✔
52
        unsubscribe, err := i.nats.JetStreamSubscribe(ctx, subject, durableName, channel)
7✔
53
        if err != nil {
9✔
54
                return errors.Wrap(err, "failed to subscribe to the nats JetStream")
2✔
55
        }
2✔
56

57
        go func() {
10✔
58
                l.Info("Reindexer ready to receive messages")
5✔
59
                defer func() {
10✔
60
                        _ = unsubscribe()
5✔
61
                }()
5✔
62

63
                for {
29✔
64
                        select {
24✔
65
                        case msg := <-channel:
19✔
66
                                job := &model.Job{}
19✔
67
                                err := json.Unmarshal(msg.Data, job)
19✔
68
                                if err != nil {
21✔
69
                                        err = errors.Wrap(err, "failed to unmarshall message")
2✔
70
                                        l.Error(err)
2✔
71
                                        if err := msg.Term(); err != nil {
4✔
72
                                                err = errors.Wrap(err, "failed to term the message")
2✔
73
                                                l.Error(err)
2✔
74
                                        }
2✔
75
                                        continue
2✔
76
                                }
77
                                if err = msg.Ack(); err != nil {
19✔
78
                                        err = errors.Wrap(err, "failed to ack the message")
2✔
79
                                        l.Error(err)
2✔
80
                                }
2✔
81
                                jobs <- job
17✔
82

83
                        case <-ctx.Done():
5✔
84
                                return
5✔
85
                        }
86
                }
87
        }()
88

89
        return nil
5✔
90
}
91

92
func (i *indexer) ProcessJobs(ctx context.Context, jobs []*model.Job) {
15✔
93
        l := log.FromContext(ctx)
15✔
94
        l.Debugf("Processing %d jobs", len(jobs))
15✔
95
        tenantsActionIDs := groupJobsIntoTenantActionIDs(jobs)
15✔
96
        for tenant, actionIDs := range tenantsActionIDs {
33✔
97
                for action, IDs := range actionIDs {
36✔
98
                        if action == model.ActionReindex {
34✔
99
                                i.processJobDevices(ctx, tenant, IDs)
16✔
100
                        } else if action == model.ActionReindexDeployment {
20✔
101
                                i.processJobDeployments(ctx, tenant, IDs)
2✔
102
                        } else {
2✔
NEW
103
                                l.Warnf("ignoring unknown job action: %v", action)
×
NEW
104
                        }
×
105
                }
106
        }
107
}
108

109
func (i *indexer) processJobDevices(
110
        ctx context.Context,
111
        tenant string,
112
        IDs IDs,
113
) {
16✔
114
        l := log.FromContext(ctx)
16✔
115
        devices := make([]*model.Device, 0, len(IDs))
16✔
116
        removedDevices := make([]*model.Device, 0, len(IDs))
16✔
117

16✔
118
        deviceIDs := make([]string, 0, len(IDs))
16✔
119
        for deviceID := range IDs {
61✔
120
                deviceIDs = append(deviceIDs, deviceID)
45✔
121
        }
45✔
122
        // get devices from deviceauth
123
        deviceAuthDevices, err := i.devClient.GetDevices(ctx, tenant, deviceIDs)
16✔
124
        if err != nil {
18✔
125
                l.Error(errors.Wrap(err, "failed to get devices from deviceauth"))
2✔
126
                return
2✔
127
        }
2✔
128
        // get devices from inventory
129
        inventoryDevices, err := i.invClient.GetDevices(ctx, tenant, deviceIDs)
14✔
130
        if err != nil {
16✔
131
                l.Error(errors.Wrap(err, "failed to get devices from inventory"))
2✔
132
                return
2✔
133
        }
2✔
134
        // process the results
135
        devices = devices[:0]
12✔
136
        removedDevices = removedDevices[:0]
12✔
137
        for _, deviceID := range deviceIDs {
45✔
138
                var deviceAuthDevice *deviceauth.DeviceAuthDevice
33✔
139
                var inventoryDevice *inventory.Device
33✔
140
                for _, d := range deviceAuthDevices {
96✔
141
                        if d.ID == deviceID {
90✔
142
                                deviceAuthDevice = &d
27✔
143
                                break
27✔
144
                        }
145
                }
146
                for _, d := range inventoryDevices {
96✔
147
                        if d.ID == inventory.DeviceID(deviceID) {
90✔
148
                                inventoryDevice = &d
27✔
149
                                break
27✔
150
                        }
151
                }
152
                if deviceAuthDevice == nil || inventoryDevice == nil {
39✔
153
                        removedDevices = append(removedDevices, &model.Device{
6✔
154
                                ID:       &deviceID,
6✔
155
                                TenantID: &tenant,
6✔
156
                        })
6✔
157
                        continue
6✔
158
                }
159
                device := i.processJobDevice(ctx, tenant, deviceAuthDevice, inventoryDevice)
27✔
160
                if device != nil {
54✔
161
                        devices = append(devices, device)
27✔
162
                }
27✔
163
        }
164
        // bulk index the device
165
        if len(devices) > 0 || len(removedDevices) > 0 {
24✔
166
                err = i.store.BulkIndexDevices(ctx, devices, removedDevices)
12✔
167
                if err != nil {
14✔
168
                        err = errors.Wrap(err, "failed to bulk index the devices")
2✔
169
                        l.Error(err)
2✔
170
                }
2✔
171
        }
172
}
173

174
func (i *indexer) processJobDevice(
175
        ctx context.Context,
176
        tenant string,
177
        deviceAuthDevice *deviceauth.DeviceAuthDevice,
178
        inventoryDevice *inventory.Device,
179
) *model.Device {
27✔
180
        l := log.FromContext(ctx)
27✔
181
        //
27✔
182
        device := model.NewDevice(tenant, string(inventoryDevice.ID))
27✔
183
        // data from inventory
27✔
184
        device.SetUpdatedAt(inventoryDevice.UpdatedTs)
27✔
185
        attributes, err := i.mapper.MapInventoryAttributes(ctx, tenant,
27✔
186
                inventoryDevice.Attributes, true, false)
27✔
187
        if err != nil {
27✔
188
                err = errors.Wrapf(err,
×
NEW
189
                        "failed to map device data for tenant %s, "+
×
190
                                "device %s", tenant, inventoryDevice.ID)
×
191
                l.Warn(err)
×
192
        } else {
27✔
193
                for _, invattr := range attributes {
56✔
194
                        attr := model.NewInventoryAttribute(invattr.Scope).
29✔
195
                                SetName(invattr.Name).
29✔
196
                                SetVal(invattr.Value)
29✔
197
                        if err := device.AppendAttr(attr); err != nil {
29✔
198
                                err = errors.Wrapf(err,
×
NEW
199
                                        "failed to convert device data for tenant %s, "+
×
200
                                                "device %s", tenant, inventoryDevice.ID)
×
201
                                l.Warn(err)
×
202
                        }
×
203
                }
204
        }
205
        // data from device auth
206
        _ = device.AppendAttr(&model.InventoryAttribute{
27✔
207
                Scope:  model.ScopeIdentity,
27✔
208
                Name:   model.AttrNameStatus,
27✔
209
                String: []string{deviceAuthDevice.Status},
27✔
210
        })
27✔
211
        for name, value := range deviceAuthDevice.IdDataStruct {
39✔
212
                attr := model.NewInventoryAttribute(model.ScopeIdentity).
12✔
213
                        SetName(name).
12✔
214
                        SetVal(value)
12✔
215
                if err := device.AppendAttr(attr); err != nil {
12✔
216
                        err = errors.Wrapf(err,
×
217
                                "failed to convert identity data for tenant %s, "+
×
218
                                        "device %s", tenant, inventoryDevice.ID)
×
219
                        l.Warn(err)
×
220
                }
×
221
        }
222
        // latest deployment
223
        deviceDeployment, err := i.deplClient.GetLatestFinishedDeployment(ctx, tenant,
27✔
224
                string(inventoryDevice.ID))
27✔
225
        if err != nil {
27✔
226
                l.Error(errors.Wrap(err, "failed to get device deployments from deployments"))
×
227
                return nil
×
228
        } else if deviceDeployment != nil {
46✔
229
                _ = device.AppendAttr(&model.InventoryAttribute{
19✔
230
                        Scope:  model.ScopeSystem,
19✔
231
                        Name:   model.AttrNameLatestDeploymentStatus,
19✔
232
                        String: []string{deviceDeployment.Device.Status},
19✔
233
                })
19✔
234
        }
19✔
235
        // return the device
236
        return device
27✔
237
}
238

239
func (i *indexer) processJobDeployments(
240
        ctx context.Context,
241
        tenant string,
242
        IDs IDs,
243
) {
2✔
244
        l := log.FromContext(ctx)
2✔
245
        depls := make([]*model.Deployment, 0, len(IDs))
2✔
246
        deploymentIDs := make([]string, 0, len(IDs))
2✔
247
        for deploymentID := range IDs {
8✔
248
                deploymentIDs = append(deploymentIDs, deploymentID)
6✔
249
        }
6✔
250
        // get device deployments from deployments
251
        deviceDeployments, err := i.deplClient.GetDeployments(ctx, tenant, deploymentIDs)
2✔
252
        if err != nil {
2✔
NEW
253
                l.Error(errors.Wrap(err, "failed to get device deployments from device deployments"))
×
NEW
254
                return
×
NEW
255
        }
×
256
        // process the results
257
        for deploymentID := range IDs {
8✔
258
                for _, d := range deviceDeployments {
18✔
259
                        if d.ID == deploymentID {
18✔
260
                                depl := i.processJobDeployment(ctx, tenant, d)
6✔
261
                                if depl != nil {
12✔
262
                                        depls = append(depls, depl)
6✔
263
                                }
6✔
264
                                break
6✔
265
                        }
266
                }
267
        }
268
        // bulk index the device
269
        if len(depls) > 0 {
4✔
270
                err = i.store.BulkIndexDeployments(ctx, depls)
2✔
271
                if err != nil {
2✔
NEW
272
                        err = errors.Wrap(err, "failed to bulk index the deployments")
×
NEW
273
                        l.Error(err)
×
NEW
274
                }
×
275
        }
276
}
277

278
func (i *indexer) processJobDeployment(
279
        ctx context.Context,
280
        tenant string,
281
        deployment *deployments.DeviceDeployment,
282
) *model.Deployment {
6✔
283
        deviceElapsedSeconds := uint(0)
6✔
284
        if deployment.Device == nil ||
6✔
285
                deployment.Deployment == nil {
6✔
NEW
286
                return nil
×
287
        } else if deployment.Device.Finished != nil && deployment.Device.Created != nil {
10✔
288
                deviceElapsedSeconds = uint(deployment.Device.Finished.Sub(
4✔
289
                        *deployment.Device.Created).Seconds())
4✔
290
        }
4✔
291
        res := &model.Deployment{
6✔
292
                ID:                          deployment.ID,
6✔
293
                DeviceID:                    deployment.Device.Id,
6✔
294
                DeploymentID:                deployment.Deployment.Id,
6✔
295
                TenantID:                    tenant,
6✔
296
                DeploymentName:              deployment.Deployment.Name,
6✔
297
                DeploymentArtifactName:      deployment.Deployment.ArtifactName,
6✔
298
                DeploymentFilterID:          deployment.Deployment.FilterId,
6✔
299
                DeploymentAllDevices:        deployment.Deployment.AllDevices,
6✔
300
                DeploymentForceInstallation: deployment.Deployment.ForceInstallation,
6✔
301
                DeploymentGroup:             deployment.Deployment.Group,
6✔
302
                DeploymentPhased:            deployment.Deployment.PhaseId != "",
6✔
303
                DeploymentPhaseId:           deployment.Deployment.PhaseId,
6✔
304
                DeploymentRetries:           deployment.Deployment.Retries,
6✔
305
                DeploymentMaxDevices:        uint(deployment.Deployment.MaxDevices),
6✔
306
                DeploymentAutogenerateDelta: deployment.Deployment.AutogenerateDelta,
6✔
307
                DeviceCreated:               deployment.Device.Created,
6✔
308
                DeviceFinished:              deployment.Device.Finished,
6✔
309
                DeviceElapsedSeconds:        deviceElapsedSeconds,
6✔
310
                DeviceDeleted:               deployment.Device.Deleted,
6✔
311
                DeviceStatus:                deployment.Device.Status,
6✔
312
                DeviceIsLogAvailable:        deployment.Device.IsLogAvailable,
6✔
313
                DeviceRetries:               deployment.Device.Retries,
6✔
314
                DeviceAttempts:              deployment.Device.Attempts,
6✔
315
        }
6✔
316
        if deployment.Device.Image != nil {
12✔
317
                res.ImageID = deployment.Device.Image.Id
6✔
318
                res.ImageDescription = deployment.Device.Image.Description
6✔
319
                res.ImageArtifactName = deployment.Device.Image.Name
6✔
320
                res.ImageDeviceTypes = deployment.Device.Image.DeviceTypesCompatible
6✔
321
                res.ImageSigned = deployment.Device.Image.Signed
6✔
322
                res.ImageProvides = deployment.Device.Image.Provides
6✔
323
                res.ImageDepends = deployment.Device.Image.Depends
6✔
324
                res.ImageClearsProvides = deployment.Device.Image.ClearsProvides
6✔
325
                res.ImageSize = deployment.Device.Image.Size
6✔
326
                if deployment.Device.Image.Info != nil {
12✔
327
                        res.ImageArtifactInfoFormat = deployment.Device.Image.Info.Format
6✔
328
                        res.ImageArtifactInfoVersion = deployment.Device.Image.Info.Version
6✔
329
                }
6✔
330
        }
331
        return res
6✔
332
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc