• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / reporting / 715476761

pending completion
715476761

Pull #79

gitlab-ci

Fabio Tranchitella
feat: end-point to list the filterable attributes usage and limits
Pull Request #79: MEN-5598: map inventory attributes to sequential fields

441 of 510 new or added lines in 14 files covered. (86.47%)

13 existing lines in 2 files now uncovered.

1832 of 2225 relevant lines covered (82.34%)

15.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

/app/indexer/jobs.go
1
// Copyright 2022 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package indexer
16

17
import (
18
        "context"
19
        "encoding/json"
20

21
        natsio "github.com/nats-io/nats.go"
22
        "github.com/pkg/errors"
23

24
        "github.com/mendersoftware/go-lib-micro/config"
25
        "github.com/mendersoftware/go-lib-micro/log"
26

27
        "github.com/mendersoftware/reporting/client/deviceauth"
28
        "github.com/mendersoftware/reporting/client/inventory"
29
        rconfig "github.com/mendersoftware/reporting/config"
30
        "github.com/mendersoftware/reporting/model"
31
)
32

33
// Services maps a string key to a boolean flag.
// NOTE(review): the keys are presumably service names taken from the jobs —
// confirm against groupJobsIntoTenantDeviceServices.
type Services map[string]bool
34
// DeviceServices maps a string key to a Services value. ProcessJobs ranges
// over values of this type and uses the keys as device IDs.
type DeviceServices map[string]Services
35
// TenantDeviceServices maps a string key to a DeviceServices value.
// ProcessJobs ranges over values of this type and uses the keys as tenant
// identifiers when querying deviceauth and inventory.
type TenantDeviceServices map[string]DeviceServices
36

37
func (i *indexer) GetJobs(ctx context.Context, jobs chan *model.Job) error {
7✔
38
        l := log.FromContext(ctx)
7✔
39

7✔
40
        streamName := config.Config.GetString(rconfig.SettingNatsStreamName)
7✔
41
        err := i.nats.JetStreamCreateStream(streamName)
7✔
42
        if err != nil {
7✔
43
                return errors.Wrap(err, "failed to create the nats JetStream stream")
×
44
        }
×
45

46
        topic := config.Config.GetString(rconfig.SettingNatsSubscriberTopic)
7✔
47
        subject := streamName + "." + topic
7✔
48
        durableName := config.Config.GetString(rconfig.SettingNatsSubscriberDurable)
7✔
49

7✔
50
        channel := make(chan *natsio.Msg, 1)
7✔
51
        unsubscribe, err := i.nats.JetStreamSubscribe(ctx, subject, durableName, channel)
7✔
52
        if err != nil {
9✔
53
                return errors.Wrap(err, "failed to subscribe to the nats JetStream")
2✔
54
        }
2✔
55

56
        go func() {
10✔
57
                l.Info("Reindexer ready to receive messages")
5✔
58
                defer func() {
9✔
59
                        _ = unsubscribe()
4✔
60
                }()
4✔
61

62
                for {
24✔
63
                        select {
19✔
64
                        case msg := <-channel:
14✔
65
                                job := &model.Job{}
14✔
66
                                err := json.Unmarshal(msg.Data, job)
14✔
67
                                if err != nil {
16✔
68
                                        err = errors.Wrap(err, "failed to unmarshall message")
2✔
69
                                        l.Error(err)
2✔
70
                                        if err := msg.Term(); err != nil {
4✔
71
                                                err = errors.Wrap(err, "failed to term the message")
2✔
72
                                                l.Error(err)
2✔
73
                                        }
2✔
74
                                        continue
2✔
75
                                }
76
                                if err = msg.Ack(); err != nil {
14✔
77
                                        err = errors.Wrap(err, "failed to ack the message")
2✔
78
                                        l.Error(err)
2✔
79
                                }
2✔
80
                                jobs <- job
12✔
81

82
                        case <-ctx.Done():
4✔
83
                                return
4✔
84
                        }
85
                }
86
        }()
87

88
        return nil
5✔
89
}
90

91
func (i *indexer) ProcessJobs(ctx context.Context, jobs []*model.Job) {
10✔
92
        l := log.FromContext(ctx)
10✔
93

10✔
94
        devices := make([]*model.Device, 0, len(jobs))
10✔
95
        removedDevices := make([]*model.Device, 0, len(jobs))
10✔
96

10✔
97
        l.Debugf("Processing %d jobs", len(jobs))
10✔
98
        tenantsDevicesServices := groupJobsIntoTenantDeviceServices(jobs)
10✔
99
        for tenant, deviceServices := range tenantsDevicesServices {
22✔
100
                deviceIDs := make([]string, 0, len(deviceServices))
12✔
101
                for deviceID := range deviceServices {
46✔
102
                        deviceIDs = append(deviceIDs, deviceID)
34✔
103
                }
34✔
104
                // get devices from deviceauth
105
                deviceAuthDevices, err := i.devClient.GetDevices(ctx, tenant, deviceIDs)
12✔
106
                if err != nil {
14✔
107
                        l.Error(errors.Wrap(err, "failed to get devices from deviceauth"))
2✔
108
                        continue
2✔
109
                }
110
                // get devices from inventory
111
                inventoryDevices, err := i.invClient.GetDevices(ctx, tenant, deviceIDs)
10✔
112
                if err != nil {
12✔
113
                        l.Error(errors.Wrap(err, "failed to get devices from inventory"))
2✔
114
                        continue
2✔
115
                }
116
                // process the results
117
                devices = devices[:0]
8✔
118
                removedDevices = removedDevices[:0]
8✔
119
                for _, deviceID := range deviceIDs {
30✔
120
                        var deviceAuthDevice *deviceauth.DeviceAuthDevice
22✔
121
                        var inventoryDevice *inventory.Device
22✔
122
                        for _, d := range deviceAuthDevices {
64✔
123
                                if d.ID == deviceID {
60✔
124
                                        deviceAuthDevice = &d
18✔
125
                                        break
18✔
126
                                }
127
                        }
128
                        for _, d := range inventoryDevices {
64✔
129
                                if d.ID == inventory.DeviceID(deviceID) {
60✔
130
                                        inventoryDevice = &d
18✔
131
                                        break
18✔
132
                                }
133
                        }
134
                        if deviceAuthDevice == nil || inventoryDevice == nil {
26✔
135
                                removedDevices = append(removedDevices, &model.Device{
4✔
136
                                        ID:       &deviceID,
4✔
137
                                        TenantID: &tenant,
4✔
138
                                })
4✔
139
                                continue
4✔
140
                        }
141
                        device := model.NewDevice(tenant, string(inventoryDevice.ID))
18✔
142
                        // data from inventory
18✔
143
                        device.SetUpdatedAt(inventoryDevice.UpdatedTs)
18✔
144
                        attributes, err := i.mapper.MapInventoryAttributes(ctx, tenant,
18✔
145
                                inventoryDevice.Attributes, true)
18✔
146
                        if err != nil {
18✔
NEW
147
                                err = errors.Wrapf(err,
×
NEW
148
                                        "failed to map inventory data for tenant %s, "+
×
NEW
149
                                                "device %s", tenant, deviceID)
×
NEW
150
                                l.Warn(err)
×
151
                        } else {
18✔
152
                                for _, invattr := range attributes {
36✔
153
                                        attr := model.NewInventoryAttribute(invattr.Scope).
18✔
154
                                                SetName(invattr.Name).
18✔
155
                                                SetVal(invattr.Value)
18✔
156
                                        if err := device.AppendAttr(attr); err != nil {
18✔
NEW
157
                                                err = errors.Wrapf(err,
×
NEW
158
                                                        "failed to convert inventory data for tenant %s, "+
×
NEW
159
                                                                "device %s", tenant, deviceID)
×
NEW
160
                                                l.Warn(err)
×
NEW
161
                                        }
×
162
                                }
163
                        }
164
                        // data from device auth
165
                        _ = device.AppendAttr(&model.InventoryAttribute{
18✔
166
                                Scope:  model.ScopeIdentity,
18✔
167
                                Name:   model.AttrNameStatus,
18✔
168
                                String: []string{deviceAuthDevice.Status},
18✔
169
                        })
18✔
170
                        for name, value := range deviceAuthDevice.IdDataStruct {
26✔
171
                                attr := model.NewInventoryAttribute(model.ScopeIdentity).
8✔
172
                                        SetName(name).
8✔
173
                                        SetVal(value)
8✔
174
                                if err := device.AppendAttr(attr); err != nil {
8✔
175
                                        err = errors.Wrapf(err,
×
176
                                                "failed to convert identity data for tenant %s, "+
×
177
                                                        "device %s", tenant, deviceID)
×
178
                                        l.Warn(err)
×
179
                                }
×
180
                        }
181
                        // append the device
182
                        devices = append(devices, device)
18✔
183
                }
184
                // bulk index the device
185
                if len(devices) > 0 || len(removedDevices) > 0 {
16✔
186
                        err = i.store.BulkIndexDevices(ctx, devices, removedDevices)
8✔
187
                        if err != nil {
10✔
188
                                err = errors.Wrap(err, "failed to bulk index the devices")
2✔
189
                                l.Error(err)
2✔
190
                        }
2✔
191
                }
192
        }
193
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc