• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / reporting / 844901443

pending completion
844901443

Pull #134

gitlab-ci

Krzysztof Jaskiewicz
feat: index last device deployment status
Pull Request #134: feat: index last device deployment status

37 of 42 new or added lines in 2 files covered. (88.1%)

4 existing lines in 2 files now uncovered.

2830 of 3328 relevant lines covered (85.04%)

16.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.79
/app/indexer/jobs.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package indexer
16

17
import (
18
        "context"
19

20
        "github.com/pkg/errors"
21

22
        "github.com/mendersoftware/go-lib-micro/config"
23
        "github.com/mendersoftware/go-lib-micro/log"
24

25
        "github.com/mendersoftware/reporting/client/deployments"
26
        "github.com/mendersoftware/reporting/client/deviceauth"
27
        "github.com/mendersoftware/reporting/client/inventory"
28
        rconfig "github.com/mendersoftware/reporting/config"
29
        "github.com/mendersoftware/reporting/model"
30
)
31

32
type IDs map[string]bool
33
type ActionIDs map[string]IDs
34
type TenantActionIDs map[string]ActionIDs
35

36
// GetJobs subscribes the jobs channel to the NATS JetStream subject
// "<stream name>.<subscriber topic>", using the durable name read from the
// configuration.
//
// It returns a wrapped error when the subscription fails and nil otherwise.
func (i *indexer) GetJobs(ctx context.Context, jobs chan model.Job) error {
	subject := config.Config.GetString(rconfig.SettingNatsStreamName) +
		"." +
		config.Config.GetString(rconfig.SettingNatsSubscriberTopic)
	durable := config.Config.GetString(rconfig.SettingNatsSubscriberDurable)

	if err := i.nats.JetStreamSubscribe(ctx, subject, durable, jobs); err != nil {
		return errors.Wrap(err, "failed to subscribe to the nats JetStream")
	}
	return nil
}
50

51
// ProcessJobs groups the given jobs by tenant and by action, then hands each
// group of IDs to the matching handler: model.ActionReindex jobs go to
// processJobDevices and model.ActionReindexDeployment jobs go to
// processJobDeployments. Any other action is logged as a warning and skipped.
func (i *indexer) ProcessJobs(ctx context.Context, jobs []model.Job) {
	logger := log.FromContext(ctx)
	logger.Debugf("Processing %d jobs", len(jobs))

	for tenant, byAction := range groupJobsIntoTenantActionIDs(jobs) {
		for action, ids := range byAction {
			switch action {
			case model.ActionReindex:
				i.processJobDevices(ctx, tenant, ids)
			case model.ActionReindexDeployment:
				i.processJobDeployments(ctx, tenant, ids)
			default:
				logger.Warnf("ignoring unknown job action: %v", action)
			}
		}
	}
}
67

68
func (i *indexer) processJobDevices(
69
        ctx context.Context,
70
        tenant string,
71
        IDs IDs,
72
) {
73
        l := log.FromContext(ctx)
74
        devices := make([]*model.Device, 0, len(IDs))
75
        removedDevices := make([]*model.Device, 0, len(IDs))
76

77
        deviceIDs := make([]string, 0, len(IDs))
78
        for deviceID := range IDs {
79
                deviceIDs = append(deviceIDs, deviceID)
80
        }
81
        // get devices from deviceauth
82
        deviceAuthDevices, err := i.devClient.GetDevices(ctx, tenant, deviceIDs)
83
        if err != nil {
84
                l.Error(errors.Wrap(err, "failed to get devices from deviceauth"))
85
                return
86
        }
87
        // get devices from inventory
88
        inventoryDevices, err := i.invClient.GetDevices(ctx, tenant, deviceIDs)
89
        if err != nil {
90
                l.Error(errors.Wrap(err, "failed to get devices from inventory"))
91
                return
92
        }
93
        // get last deployment statuses from deployment
94
        deploymentsDevices, err := i.deplClient.GetLatestFinishedDeployment(ctx, tenant, deviceIDs)
95
        if err != nil {
NEW
96
                l.Error(errors.Wrap(err, "failed to get last device deployments from deployments"))
NEW
97
                return
NEW
98
        }
99

100
        // process the results
101
        devices = devices[:0]
102
        removedDevices = removedDevices[:0]
103
        for _, deviceID := range deviceIDs {
104
                var deviceAuthDevice *deviceauth.DeviceAuthDevice
105
                var inventoryDevice *inventory.Device
106
                var deploymentsDevice *deployments.LastDeviceDeployment
107
                for _, d := range deviceAuthDevices {
108
                        if d.ID == deviceID {
109
                                deviceAuthDevice = &d
110
                                break
111
                        }
112
                }
113
                for _, d := range inventoryDevices {
114
                        if d.ID == inventory.DeviceID(deviceID) {
115
                                inventoryDevice = &d
116
                                break
117
                        }
118
                }
119
                for _, d := range deploymentsDevices {
120
                        if d.DeviceID == deviceID {
121
                                deploymentsDevice = &d
122
                                break
123
                        }
124
                }
125
                if deviceAuthDevice == nil || inventoryDevice == nil {
126
                        removedDevices = append(removedDevices, &model.Device{
127
                                ID:       &deviceID,
128
                                TenantID: &tenant,
129
                        })
130
                        continue
131
                }
132
                device := i.processJobDevice(
133
                        ctx,
134
                        tenant,
135
                        deviceAuthDevice,
136
                        inventoryDevice,
137
                        deploymentsDevice,
138
                )
139
                if device != nil {
140
                        devices = append(devices, device)
141
                }
142
        }
143
        // bulk index the device
144
        if len(devices) > 0 || len(removedDevices) > 0 {
145
                err = i.store.BulkIndexDevices(ctx, devices, removedDevices)
146
                if err != nil {
147
                        err = errors.Wrap(err, "failed to bulk index the devices")
148
                        l.Error(err)
149
                }
150
        }
151
}
152

153
func (i *indexer) processJobDevice(
154
        ctx context.Context,
155
        tenant string,
156
        deviceAuthDevice *deviceauth.DeviceAuthDevice,
157
        inventoryDevice *inventory.Device,
158
        deploymentsDevice *deployments.LastDeviceDeployment,
159
) *model.Device {
160
        l := log.FromContext(ctx)
161
        //
162
        device := model.NewDevice(tenant, string(inventoryDevice.ID))
163
        // data from inventory
164
        device.SetUpdatedAt(inventoryDevice.UpdatedTs)
165
        attributes, err := i.mapper.MapInventoryAttributes(ctx, tenant,
166
                inventoryDevice.Attributes, true, false)
167
        if err != nil {
168
                err = errors.Wrapf(err,
169
                        "failed to map device data for tenant %s, "+
170
                                "device %s", tenant, inventoryDevice.ID)
171
                l.Warn(err)
172
        } else {
173
                for _, invattr := range attributes {
174
                        attr := model.NewInventoryAttribute(invattr.Scope).
175
                                SetName(invattr.Name).
176
                                SetVal(invattr.Value)
177
                        if err := device.AppendAttr(attr); err != nil {
178
                                err = errors.Wrapf(err,
179
                                        "failed to convert device data for tenant %s, "+
180
                                                "device %s", tenant, inventoryDevice.ID)
181
                                l.Warn(err)
182
                        }
183
                }
184
        }
185
        // data from device auth
186
        _ = device.AppendAttr(&model.InventoryAttribute{
187
                Scope:  model.ScopeIdentity,
188
                Name:   model.AttrNameStatus,
189
                String: []string{deviceAuthDevice.Status},
190
        })
191
        for name, value := range deviceAuthDevice.IdDataStruct {
192
                attr := model.NewInventoryAttribute(model.ScopeIdentity).
193
                        SetName(name).
194
                        SetVal(value)
195
                if err := device.AppendAttr(attr); err != nil {
196
                        err = errors.Wrapf(err,
197
                                "failed to convert identity data for tenant %s, "+
198
                                        "device %s", tenant, inventoryDevice.ID)
199
                        l.Warn(err)
200
                }
201
        }
202

203
        // data from deployments
204
        if deploymentsDevice != nil {
205
                _ = device.AppendAttr(&model.InventoryAttribute{
206
                        Scope:  model.ScopeSystem,
207
                        Name:   model.AttrNameLatestDeploymentStatus,
208
                        String: []string{deploymentsDevice.DeviceDeploymentStatus},
209
                })
210
        }
211

212
        // return the device
213
        return device
214
}
215

216
func (i *indexer) processJobDeployments(
217
        ctx context.Context,
218
        tenant string,
219
        IDs IDs,
220
) {
221
        l := log.FromContext(ctx)
222
        depls := make([]*model.Deployment, 0, len(IDs))
223
        deploymentIDs := make([]string, 0, len(IDs))
224
        for deploymentID := range IDs {
225
                deploymentIDs = append(deploymentIDs, deploymentID)
226
        }
227
        // get device deployments from deployments
228
        deviceDeployments, err := i.deplClient.GetDeployments(ctx, tenant, deploymentIDs)
229
        if err != nil {
230
                l.Error(errors.Wrap(err, "failed to get device deployments from device deployments"))
231
                return
232
        }
233
        // process the results
234
        for deploymentID := range IDs {
235
                for _, d := range deviceDeployments {
236
                        if d.ID == deploymentID {
237
                                depl := i.processJobDeployment(ctx, tenant, d)
238
                                if depl != nil {
239
                                        depls = append(depls, depl)
240
                                }
241
                                break
242
                        }
243
                }
244
        }
245
        // bulk index the device
246
        if len(depls) > 0 {
247
                err = i.store.BulkIndexDeployments(ctx, depls)
248
                if err != nil {
249
                        err = errors.Wrap(err, "failed to bulk index the deployments")
250
                        l.Error(err)
251
                }
252
        }
253
}
254

255
func (i *indexer) processJobDeployment(
256
        ctx context.Context,
257
        tenant string,
258
        deployment *deployments.DeviceDeployment,
259
) *model.Deployment {
260
        deviceElapsedSeconds := uint(0)
261
        if deployment.Device == nil ||
262
                deployment.Deployment == nil {
263
                return nil
264
        } else if deployment.Device.Finished != nil && deployment.Device.Created != nil {
265
                deviceElapsedSeconds = uint(deployment.Device.Finished.Sub(
266
                        *deployment.Device.Created).Seconds())
267
        }
268
        res := &model.Deployment{
269
                ID:                          deployment.ID,
270
                DeviceID:                    deployment.Device.Id,
271
                DeploymentID:                deployment.Deployment.Id,
272
                TenantID:                    tenant,
273
                DeploymentName:              deployment.Deployment.Name,
274
                DeploymentArtifactName:      deployment.Deployment.ArtifactName,
275
                DeploymentType:              deployment.Deployment.Type,
276
                DeploymentCreated:           deployment.Deployment.Created,
277
                DeploymentFilterID:          deployment.Deployment.FilterId,
278
                DeploymentAllDevices:        deployment.Deployment.AllDevices,
279
                DeploymentForceInstallation: deployment.Deployment.ForceInstallation,
280
                DeploymentGroups:            deployment.Deployment.Groups,
281
                DeploymentPhased:            deployment.Deployment.PhaseId != "",
282
                DeploymentPhaseId:           deployment.Deployment.PhaseId,
283
                DeploymentRetries:           deployment.Deployment.Retries,
284
                DeploymentMaxDevices:        uint(deployment.Deployment.MaxDevices),
285
                DeploymentAutogenerateDelta: deployment.Deployment.AutogenerateDelta,
286
                DeviceCreated:               deployment.Device.Created,
287
                DeviceFinished:              deployment.Device.Finished,
288
                DeviceElapsedSeconds:        deviceElapsedSeconds,
289
                DeviceDeleted:               deployment.Device.Deleted,
290
                DeviceStatus:                deployment.Device.Status,
291
                DeviceIsLogAvailable:        deployment.Device.IsLogAvailable,
292
                DeviceRetries:               deployment.Device.Retries,
293
                DeviceAttempts:              deployment.Device.Attempts,
294
        }
295
        if deployment.Device.Image != nil {
296
                res.ImageID = deployment.Device.Image.Id
297
                res.ImageDescription = deployment.Device.Image.Description
298
                res.ImageArtifactName = deployment.Device.Image.Name
299
                res.ImageDeviceTypes = deployment.Device.Image.DeviceTypesCompatible
300
                res.ImageSigned = deployment.Device.Image.Signed
301
                res.ImageProvides = deployment.Device.Image.Provides
302
                res.ImageDepends = deployment.Device.Image.Depends
303
                res.ImageClearsProvides = deployment.Device.Image.ClearsProvides
304
                res.ImageSize = deployment.Device.Image.Size
305
                if deployment.Device.Image.Info != nil {
306
                        res.ImageArtifactInfoFormat = deployment.Device.Image.Info.Format
307
                        res.ImageArtifactInfoVersion = deployment.Device.Image.Info.Version
308
                }
309
        }
310
        return res
311
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc