• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

mendersoftware / reporting / 732592385

pending completion
732592385

Pull #87

gitlab-ci

Fabio Tranchitella
refac: add support for processing deployments reindex jobs
Pull Request #87: MEN-5930: index device deployment objects

396 of 468 new or added lines in 13 files covered. (84.62%)

7 existing lines in 2 files now uncovered.

2207 of 2620 relevant lines covered (84.24%)

17.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.65
/app/indexer/jobs.go
1
// Copyright 2022 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package indexer
16

17
import (
18
        "context"
19
        "encoding/json"
20

21
        natsio "github.com/nats-io/nats.go"
22
        "github.com/pkg/errors"
23

24
        "github.com/mendersoftware/go-lib-micro/config"
25
        "github.com/mendersoftware/go-lib-micro/log"
26

27
        "github.com/mendersoftware/reporting/client/deviceauth"
28
        "github.com/mendersoftware/reporting/client/inventory"
29
        rconfig "github.com/mendersoftware/reporting/config"
30
        "github.com/mendersoftware/reporting/model"
31
)
32

33
type (
	// IDs is a set-like collection of object identifiers stored as map keys.
	IDs map[string]bool

	// ActionIDs maps a job action name to the IDs that action applies to.
	ActionIDs map[string]IDs

	// TenantActionIDs maps a tenant ID to the actions (and their IDs)
	// requested for that tenant.
	TenantActionIDs map[string]ActionIDs
)
36

37
func (i *indexer) GetJobs(ctx context.Context, jobs chan *model.Job) error {
7✔
38
        l := log.FromContext(ctx)
7✔
39

7✔
40
        streamName := config.Config.GetString(rconfig.SettingNatsStreamName)
7✔
41
        err := i.nats.JetStreamCreateStream(streamName)
7✔
42
        if err != nil {
7✔
43
                return errors.Wrap(err, "failed to create the nats JetStream stream")
×
44
        }
×
45

46
        topic := config.Config.GetString(rconfig.SettingNatsSubscriberTopic)
7✔
47
        subject := streamName + "." + topic
7✔
48
        durableName := config.Config.GetString(rconfig.SettingNatsSubscriberDurable)
7✔
49

7✔
50
        channel := make(chan *natsio.Msg, 1)
7✔
51
        unsubscribe, err := i.nats.JetStreamSubscribe(ctx, subject, durableName, channel)
7✔
52
        if err != nil {
9✔
53
                return errors.Wrap(err, "failed to subscribe to the nats JetStream")
2✔
54
        }
2✔
55

56
        go func() {
10✔
57
                l.Info("Reindexer ready to receive messages")
5✔
58
                defer func() {
7✔
59
                        _ = unsubscribe()
2✔
60
                }()
2✔
61

62
                for {
29✔
63
                        select {
24✔
64
                        case msg := <-channel:
19✔
65
                                job := &model.Job{}
19✔
66
                                err := json.Unmarshal(msg.Data, job)
19✔
67
                                if err != nil {
21✔
68
                                        err = errors.Wrap(err, "failed to unmarshall message")
2✔
69
                                        l.Error(err)
2✔
70
                                        if err := msg.Term(); err != nil {
4✔
71
                                                err = errors.Wrap(err, "failed to term the message")
2✔
72
                                                l.Error(err)
2✔
73
                                        }
2✔
74
                                        continue
2✔
75
                                }
76
                                if err = msg.Ack(); err != nil {
19✔
77
                                        err = errors.Wrap(err, "failed to ack the message")
2✔
78
                                        l.Error(err)
2✔
79
                                }
2✔
80
                                jobs <- job
17✔
81

82
                        case <-ctx.Done():
2✔
83
                                return
2✔
84
                        }
85
                }
86
        }()
87

88
        return nil
5✔
89
}
90

91
func (i *indexer) ProcessJobs(ctx context.Context, jobs []*model.Job) {
13✔
92
        l := log.FromContext(ctx)
13✔
93
        l.Debugf("Processing %d jobs", len(jobs))
13✔
94
        tenantsActionIDs := groupJobsIntoTenantActionIDs(jobs)
13✔
95
        for tenant, actionIDs := range tenantsActionIDs {
29✔
96
                for action, IDs := range actionIDs {
32✔
97
                        if action == model.ActionReindex {
32✔
98
                                i.processJobDevices(ctx, tenant, IDs)
16✔
99
                        } else if action == model.ActionReindexDeployment {
16✔
NEW
100
                                i.processJobDeployments(ctx, tenant, IDs)
×
NEW
101
                        } else {
×
NEW
102
                                l.Warnf("ignoring unknown job action: %v", action)
×
NEW
103
                        }
×
104
                }
105
        }
106
}
107

108
func (i *indexer) processJobDevices(
109
        ctx context.Context,
110
        tenant string,
111
        IDs IDs,
112
) {
16✔
113
        l := log.FromContext(ctx)
16✔
114
        devices := make([]*model.Device, 0, len(IDs))
16✔
115
        removedDevices := make([]*model.Device, 0, len(IDs))
16✔
116

16✔
117
        deviceIDs := make([]string, 0, len(IDs))
16✔
118
        for deviceID := range IDs {
61✔
119
                deviceIDs = append(deviceIDs, deviceID)
45✔
120
        }
45✔
121
        // get devices from deviceauth
122
        deviceAuthDevices, err := i.devClient.GetDevices(ctx, tenant, deviceIDs)
16✔
123
        if err != nil {
18✔
124
                l.Error(errors.Wrap(err, "failed to get devices from deviceauth"))
2✔
125
                return
2✔
126
        }
2✔
127
        // get devices from inventory
128
        inventoryDevices, err := i.invClient.GetDevices(ctx, tenant, deviceIDs)
14✔
129
        if err != nil {
16✔
130
                l.Error(errors.Wrap(err, "failed to get devices from inventory"))
2✔
131
                return
2✔
132
        }
2✔
133
        // process the results
134
        devices = devices[:0]
12✔
135
        removedDevices = removedDevices[:0]
12✔
136
        for _, deviceID := range deviceIDs {
45✔
137
                var deviceAuthDevice *deviceauth.DeviceAuthDevice
33✔
138
                var inventoryDevice *inventory.Device
33✔
139
                for _, d := range deviceAuthDevices {
96✔
140
                        if d.ID == deviceID {
90✔
141
                                deviceAuthDevice = &d
27✔
142
                                break
27✔
143
                        }
144
                }
145
                for _, d := range inventoryDevices {
96✔
146
                        if d.ID == inventory.DeviceID(deviceID) {
90✔
147
                                inventoryDevice = &d
27✔
148
                                break
27✔
149
                        }
150
                }
151
                if deviceAuthDevice == nil || inventoryDevice == nil {
39✔
152
                        removedDevices = append(removedDevices, &model.Device{
6✔
153
                                ID:       &deviceID,
6✔
154
                                TenantID: &tenant,
6✔
155
                        })
6✔
156
                        continue
6✔
157
                }
158
                device := i.processJobDevice(ctx, tenant, deviceAuthDevice, inventoryDevice)
27✔
159
                if device == nil {
27✔
UNCOV
160
                        continue
×
161
                }
162
                devices = append(devices, device)
27✔
163
        }
164
        // bulk index the device
165
        if len(devices) > 0 || len(removedDevices) > 0 {
24✔
166
                err = i.store.BulkIndexDevices(ctx, devices, removedDevices)
12✔
167
                if err != nil {
14✔
168
                        err = errors.Wrap(err, "failed to bulk index the devices")
2✔
169
                        l.Error(err)
2✔
170
                }
2✔
171
        }
172
}
173

174
func (i *indexer) processJobDevice(
175
        ctx context.Context,
176
        tenant string,
177
        deviceAuthDevice *deviceauth.DeviceAuthDevice,
178
        inventoryDevice *inventory.Device,
179
) *model.Device {
27✔
180
        l := log.FromContext(ctx)
27✔
181
        //
27✔
182
        device := model.NewDevice(tenant, string(inventoryDevice.ID))
27✔
183
        // data from inventory
27✔
184
        device.SetUpdatedAt(inventoryDevice.UpdatedTs)
27✔
185
        attributes, err := i.mapper.MapInventoryAttributes(ctx, tenant,
27✔
186
                inventoryDevice.Attributes, true, false)
27✔
187
        if err != nil {
27✔
NEW
188
                err = errors.Wrapf(err,
×
NEW
189
                        "failed to map device data for tenant %s, "+
×
NEW
190
                                "device %s", tenant, inventoryDevice.ID)
×
NEW
191
                l.Warn(err)
×
192
        } else {
27✔
193
                for _, invattr := range attributes {
54✔
194
                        attr := model.NewInventoryAttribute(invattr.Scope).
27✔
195
                                SetName(invattr.Name).
27✔
196
                                SetVal(invattr.Value)
27✔
197
                        if err := device.AppendAttr(attr); err != nil {
27✔
198
                                err = errors.Wrapf(err,
×
NEW
199
                                        "failed to convert device data for tenant %s, "+
×
NEW
200
                                                "device %s", tenant, inventoryDevice.ID)
×
UNCOV
201
                                l.Warn(err)
×
UNCOV
202
                        }
×
203
                }
204
        }
205
        // data from device auth
206
        _ = device.AppendAttr(&model.InventoryAttribute{
27✔
207
                Scope:  model.ScopeIdentity,
27✔
208
                Name:   model.AttrNameStatus,
27✔
209
                String: []string{deviceAuthDevice.Status},
27✔
210
        })
27✔
211
        for name, value := range deviceAuthDevice.IdDataStruct {
39✔
212
                attr := model.NewInventoryAttribute(model.ScopeIdentity).
12✔
213
                        SetName(name).
12✔
214
                        SetVal(value)
12✔
215
                if err := device.AppendAttr(attr); err != nil {
12✔
NEW
216
                        err = errors.Wrapf(err,
×
NEW
217
                                "failed to convert identity data for tenant %s, "+
×
NEW
218
                                        "device %s", tenant, inventoryDevice.ID)
×
NEW
219
                        l.Warn(err)
×
UNCOV
220
                }
×
221
        }
222
        // latest deployment
223
        deviceDeployment, err := i.deplClient.GetLatestFinishedDeployment(ctx, tenant,
27✔
224
                string(inventoryDevice.ID))
27✔
225
        if err != nil {
27✔
NEW
226
                l.Error(errors.Wrap(err, "failed to get device deployments from deployments"))
×
NEW
227
                return nil
×
228
        } else if deviceDeployment != nil {
46✔
229
                _ = device.AppendAttr(&model.InventoryAttribute{
19✔
230
                        Scope:  model.ScopeSystem,
19✔
231
                        Name:   model.AttrNameLatestDeploymentStatus,
19✔
232
                        String: []string{deviceDeployment.Device.Status},
19✔
233
                })
19✔
234
        }
19✔
235
        // return the device
236
        return device
27✔
237
}
238

239
func (i *indexer) processJobDeployments(
240
        ctx context.Context,
241
        tenant string,
242
        IDs IDs,
NEW
243
) {
×
NEW
244

×
UNCOV
245
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc