• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / reporting / 732508278

pending completion
732508278

Pull #87

gitlab-ci

Fabio Tranchitella
refac: rename internal search end-point to `/tenants/{tenant_id}/devices/search`
Pull Request #87: MEN-5930: index device deployment objects

331 of 402 new or added lines in 12 files covered. (82.34%)

135 existing lines in 5 files now uncovered.

2196 of 2600 relevant lines covered (84.46%)

17.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.9
/app/indexer/jobs.go
1
// Copyright 2022 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package indexer
16

17
import (
18
        "context"
19
        "encoding/json"
20

21
        natsio "github.com/nats-io/nats.go"
22
        "github.com/pkg/errors"
23

24
        "github.com/mendersoftware/go-lib-micro/config"
25
        "github.com/mendersoftware/go-lib-micro/log"
26

27
        "github.com/mendersoftware/reporting/client/deviceauth"
28
        "github.com/mendersoftware/reporting/client/inventory"
29
        rconfig "github.com/mendersoftware/reporting/config"
30
        "github.com/mendersoftware/reporting/model"
31
)
32

33
type (
	// Services is a set of flags keyed by string.
	// NOTE(review): keys are presumably service names — confirm against
	// groupJobsIntoTenantDeviceServices.
	Services map[string]bool

	// DeviceServices associates a string key with its Services set.
	// NOTE(review): keys look like device IDs (ProcessJobs collects them as
	// deviceIDs) — confirm.
	DeviceServices map[string]Services

	// TenantDeviceServices associates a string key with its DeviceServices.
	// NOTE(review): keys look like tenant IDs (ProcessJobs passes them as the
	// tenant argument to the service clients) — confirm.
	TenantDeviceServices map[string]DeviceServices
)
36

37
func (i *indexer) GetJobs(ctx context.Context, jobs chan *model.Job) error {
7✔
38
        l := log.FromContext(ctx)
7✔
39

7✔
40
        streamName := config.Config.GetString(rconfig.SettingNatsStreamName)
7✔
41
        err := i.nats.JetStreamCreateStream(streamName)
7✔
42
        if err != nil {
7✔
UNCOV
43
                return errors.Wrap(err, "failed to create the nats JetStream stream")
×
44
        }
×
45

46
        topic := config.Config.GetString(rconfig.SettingNatsSubscriberTopic)
7✔
47
        subject := streamName + "." + topic
7✔
48
        durableName := config.Config.GetString(rconfig.SettingNatsSubscriberDurable)
7✔
49

7✔
50
        channel := make(chan *natsio.Msg, 1)
7✔
51
        unsubscribe, err := i.nats.JetStreamSubscribe(ctx, subject, durableName, channel)
7✔
52
        if err != nil {
9✔
53
                return errors.Wrap(err, "failed to subscribe to the nats JetStream")
2✔
54
        }
2✔
55

56
        go func() {
10✔
57
                l.Info("Reindexer ready to receive messages")
5✔
58
                defer func() {
7✔
59
                        _ = unsubscribe()
2✔
60
                }()
2✔
61

62
                for {
29✔
63
                        select {
24✔
64
                        case msg := <-channel:
19✔
65
                                job := &model.Job{}
19✔
66
                                err := json.Unmarshal(msg.Data, job)
19✔
67
                                if err != nil {
21✔
68
                                        err = errors.Wrap(err, "failed to unmarshall message")
2✔
69
                                        l.Error(err)
2✔
70
                                        if err := msg.Term(); err != nil {
4✔
71
                                                err = errors.Wrap(err, "failed to term the message")
2✔
72
                                                l.Error(err)
2✔
73
                                        }
2✔
74
                                        continue
2✔
75
                                }
76
                                if err = msg.Ack(); err != nil {
19✔
77
                                        err = errors.Wrap(err, "failed to ack the message")
2✔
78
                                        l.Error(err)
2✔
79
                                }
2✔
80
                                jobs <- job
17✔
81

82
                        case <-ctx.Done():
2✔
83
                                return
2✔
84
                        }
85
                }
86
        }()
87

88
        return nil
5✔
89
}
90

91
func (i *indexer) ProcessJobs(ctx context.Context, jobs []*model.Job) {
13✔
92
        l := log.FromContext(ctx)
13✔
93

13✔
94
        devices := make([]*model.Device, 0, len(jobs))
13✔
95
        removedDevices := make([]*model.Device, 0, len(jobs))
13✔
96

13✔
97
        l.Debugf("Processing %d jobs", len(jobs))
13✔
98
        tenantsDevicesServices := groupJobsIntoTenantDeviceServices(jobs)
13✔
99
        for tenant, deviceServices := range tenantsDevicesServices {
29✔
100
                deviceIDs := make([]string, 0, len(deviceServices))
16✔
101
                for deviceID := range deviceServices {
61✔
102
                        deviceIDs = append(deviceIDs, deviceID)
45✔
103
                }
45✔
104
                // get devices from deviceauth
105
                deviceAuthDevices, err := i.devClient.GetDevices(ctx, tenant, deviceIDs)
16✔
106
                if err != nil {
18✔
107
                        l.Error(errors.Wrap(err, "failed to get devices from deviceauth"))
2✔
108
                        continue
2✔
109
                }
110
                // get devices from inventory
111
                inventoryDevices, err := i.invClient.GetDevices(ctx, tenant, deviceIDs)
14✔
112
                if err != nil {
16✔
113
                        l.Error(errors.Wrap(err, "failed to get devices from inventory"))
2✔
114
                        continue
2✔
115
                }
116
                // process the results
117
                devices = devices[:0]
12✔
118
                removedDevices = removedDevices[:0]
12✔
119
                for _, deviceID := range deviceIDs {
45✔
120
                        var deviceAuthDevice *deviceauth.DeviceAuthDevice
33✔
121
                        var inventoryDevice *inventory.Device
33✔
122
                        for _, d := range deviceAuthDevices {
96✔
123
                                if d.ID == deviceID {
90✔
124
                                        deviceAuthDevice = &d
27✔
125
                                        break
27✔
126
                                }
127
                        }
128
                        for _, d := range inventoryDevices {
96✔
129
                                if d.ID == inventory.DeviceID(deviceID) {
90✔
130
                                        inventoryDevice = &d
27✔
131
                                        break
27✔
132
                                }
133
                        }
134
                        if deviceAuthDevice == nil || inventoryDevice == nil {
39✔
135
                                removedDevices = append(removedDevices, &model.Device{
6✔
136
                                        ID:       &deviceID,
6✔
137
                                        TenantID: &tenant,
6✔
138
                                })
6✔
139
                                continue
6✔
140
                        }
141
                        device := i.processJobDevice(ctx, tenant, deviceAuthDevice, inventoryDevice)
27✔
142
                        if device == nil {
27✔
NEW
143
                                continue
×
144
                        }
145
                        devices = append(devices, device)
27✔
146
                }
147
                // bulk index the device
148
                if len(devices) > 0 || len(removedDevices) > 0 {
24✔
149
                        err = i.store.BulkIndexDevices(ctx, devices, removedDevices)
12✔
150
                        if err != nil {
14✔
151
                                err = errors.Wrap(err, "failed to bulk index the devices")
2✔
152
                                l.Error(err)
2✔
153
                        }
2✔
154
                }
155
        }
156
}
157

158
func (i *indexer) processJobDevice(
159
        ctx context.Context,
160
        tenant string,
161
        deviceAuthDevice *deviceauth.DeviceAuthDevice,
162
        inventoryDevice *inventory.Device,
163
) *model.Device {
27✔
164
        l := log.FromContext(ctx)
27✔
165
        //
27✔
166
        device := model.NewDevice(tenant, string(inventoryDevice.ID))
27✔
167
        // data from inventory
27✔
168
        device.SetUpdatedAt(inventoryDevice.UpdatedTs)
27✔
169
        attributes, err := i.mapper.MapInventoryAttributes(ctx, tenant,
27✔
170
                inventoryDevice.Attributes, true, false)
27✔
171
        if err != nil {
27✔
NEW
UNCOV
172
                err = errors.Wrapf(err,
×
NEW
173
                        "failed to map device data for tenant %s, "+
×
NEW
174
                                "device %s", tenant, inventoryDevice.ID)
×
NEW
175
                l.Warn(err)
×
176
        } else {
27✔
177
                for _, invattr := range attributes {
54✔
178
                        attr := model.NewInventoryAttribute(invattr.Scope).
27✔
179
                                SetName(invattr.Name).
27✔
180
                                SetVal(invattr.Value)
27✔
181
                        if err := device.AppendAttr(attr); err != nil {
27✔
NEW
UNCOV
182
                                err = errors.Wrapf(err,
×
NEW
UNCOV
183
                                        "failed to convert device data for tenant %s, "+
×
NEW
UNCOV
184
                                                "device %s", tenant, inventoryDevice.ID)
×
NEW
UNCOV
185
                                l.Warn(err)
×
NEW
UNCOV
186
                        }
×
187
                }
188
        }
189
        // data from device auth
190
        _ = device.AppendAttr(&model.InventoryAttribute{
27✔
191
                Scope:  model.ScopeIdentity,
27✔
192
                Name:   model.AttrNameStatus,
27✔
193
                String: []string{deviceAuthDevice.Status},
27✔
194
        })
27✔
195
        for name, value := range deviceAuthDevice.IdDataStruct {
39✔
196
                attr := model.NewInventoryAttribute(model.ScopeIdentity).
12✔
197
                        SetName(name).
12✔
198
                        SetVal(value)
12✔
199
                if err := device.AppendAttr(attr); err != nil {
12✔
NEW
UNCOV
200
                        err = errors.Wrapf(err,
×
NEW
UNCOV
201
                                "failed to convert identity data for tenant %s, "+
×
NEW
UNCOV
202
                                        "device %s", tenant, inventoryDevice.ID)
×
NEW
UNCOV
203
                        l.Warn(err)
×
NEW
UNCOV
204
                }
×
205
        }
206
        // latest deployment
207
        deviceDeployment, err := i.deplClient.GetLatestFinishedDeployment(ctx, tenant,
27✔
208
                string(inventoryDevice.ID))
27✔
209
        if err != nil {
27✔
NEW
UNCOV
210
                l.Error(errors.Wrap(err, "failed to get device deployments from deployments"))
×
NEW
UNCOV
211
                return nil
×
212
        } else if deviceDeployment != nil {
46✔
213
                _ = device.AppendAttr(&model.InventoryAttribute{
19✔
214
                        Scope:  model.ScopeSystem,
19✔
215
                        Name:   model.AttrNameLatestDeploymentStatus,
19✔
216
                        String: []string{deviceDeployment.Device.Status},
19✔
217
                })
19✔
218
        }
19✔
219
        // return the device
220
        return device
27✔
221
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc