• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-server / 1622978334

13 Jan 2025 03:51PM UTC coverage: 72.802% (-3.8%) from 76.608%
1622978334

Pull #300

gitlab-ci

alfrunes
fix: Deployment device count should not exceed max devices

Added a condition to skip deployments when the device count reaches max
devices.

Changelog: Title
Ticket: MEN-7847
Signed-off-by: Alf-Rune Siqveland <alf.rune@northern.tech>
Pull Request #300: fix: Deployment device count should not exceed max devices

4251 of 6164 branches covered (68.96%)

Branch coverage included in aggregate %.

0 of 18 new or added lines in 1 file covered. (0.0%)

2544 existing lines in 83 files now uncovered.

42741 of 58384 relevant lines covered (73.21%)

21.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.21
/backend/services/deviceauth/client/orchestrator/client_orchestrator.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package orchestrator
16

17
import (
18
        "bytes"
19
        "context"
20
        "encoding/json"
21
        "io"
22
        "net/http"
23
        "time"
24

25
        "github.com/pkg/errors"
26

27
        "github.com/mendersoftware/mender-server/pkg/identity"
28
        "github.com/mendersoftware/mender-server/pkg/log"
29
        "github.com/mendersoftware/mender-server/pkg/requestid"
30
        "github.com/mendersoftware/mender-server/pkg/rest_utils"
31

32
        "github.com/mendersoftware/mender-server/services/deviceauth/utils"
33
)
34

35
const (
36
        // orchestrator endpoint
37
        DeviceDecommissioningOrchestratorUri = "/api/v1/workflow/decommission_device"
38
        ProvisionDeviceOrchestratorUri       = "/api/v1/workflow/provision_device"
39
        UpdateDeviceStatusOrchestratorUri    = "/api/v1/workflow/update_device_status"
40
        UpdateDeviceInventoryOrchestratorUri = "/api/v1/workflow/update_device_inventory"
41
        HealthURI                            = "/api/v1/health"
42
        DeviceLimitWarningURI                = "/api/v1/workflow/device_limit_email"
43
        ReindexReportingURI                  = "/api/v1/workflow/reindex_reporting"
44
        ReindexReportingBatchURI             = "/api/v1/workflow/reindex_reporting/batch"
45
        // default request timeout, 10s?
46
        defaultReqTimeout = time.Duration(10) * time.Second
47
)
48

49
// Config conveys client configuration
50
type Config struct {
51
        // Orchestrator host
52
        OrchestratorAddr string
53
        // Request timeout
54
        Timeout time.Duration
55
}
56

57
// ClientRunner is an interface of orchestrator client
58
//
59
//go:generate ../../../../utils/mockgen.sh
60
type ClientRunner interface {
61
        CheckHealth(ctx context.Context) error
62
        SubmitDeviceDecommisioningJob(ctx context.Context, req DecommissioningReq) error
63
        SubmitProvisionDeviceJob(ctx context.Context, req ProvisionDeviceReq) error
64
        SubmitUpdateDeviceStatusJob(ctx context.Context, req UpdateDeviceStatusReq) error
65
        SubmitDeviceLimitWarning(ctx context.Context, devWarn DeviceLimitWarning) error
66
        SubmitUpdateDeviceInventoryJob(ctx context.Context, req UpdateDeviceInventoryReq) error
67
        SubmitReindexReporting(c context.Context, device string) error
68
        SubmitReindexReportingBatch(c context.Context, devices []string) error
69
}
70

71
// Client is an opaque implementation of orchestrator client. Implements
72
// ClientRunner interface
73
type Client struct {
74
        conf Config
75
        http http.Client
76
}
77

78
func NewClient(c Config) *Client {
1✔
79
        if c.Timeout == 0 {
2✔
80
                c.Timeout = defaultReqTimeout
1✔
81
        }
1✔
82

83
        return &Client{
1✔
84
                conf: c,
1✔
85
                http: http.Client{
1✔
86
                        Timeout: c.Timeout,
1✔
87
                },
1✔
88
        }
1✔
89
}
90

91
func (c *Client) CheckHealth(ctx context.Context) error {
1✔
92
        var (
1✔
93
                apiErr rest_utils.ApiError
1✔
94
        )
1✔
95

1✔
96
        if ctx == nil {
2✔
97
                ctx = context.Background()
1✔
98
        }
1✔
99
        if _, ok := ctx.Deadline(); !ok {
2✔
100
                var cancel context.CancelFunc
1✔
101
                ctx, cancel = context.WithTimeout(ctx, c.conf.Timeout)
1✔
102
                defer cancel()
1✔
103
        }
1✔
104
        req, _ := http.NewRequestWithContext(
1✔
105
                ctx, "GET",
1✔
106
                utils.JoinURL(c.conf.OrchestratorAddr, HealthURI), nil,
1✔
107
        )
1✔
108

1✔
109
        rsp, err := c.http.Do(req)
1✔
110
        if err != nil {
2✔
111
                return err
1✔
112
        }
1✔
113
        defer rsp.Body.Close()
1✔
114
        if rsp.StatusCode >= http.StatusOK && rsp.StatusCode < 300 {
2✔
115
                return nil
1✔
116
        }
1✔
117
        decoder := json.NewDecoder(rsp.Body)
1✔
118
        err = decoder.Decode(&apiErr)
1✔
119
        if err != nil {
2✔
120
                return errors.Errorf("health check HTTP error: %s", rsp.Status)
1✔
121
        }
1✔
122
        return &apiErr
1✔
123
}
124

125
func (co *Client) SubmitDeviceDecommisioningJob(
126
        ctx context.Context,
127
        decommissioningReq DecommissioningReq,
128
) error {
1✔
129

1✔
130
        l := log.FromContext(ctx)
1✔
131

1✔
132
        l.Debugf("Submit decommissioning job for device: %s", decommissioningReq.DeviceId)
1✔
133

1✔
134
        DecommissioningReqJson, err := json.Marshal(decommissioningReq)
1✔
135
        if err != nil {
1✔
136
                return errors.Wrapf(err, "failed to submit device decommissioning job")
×
137
        }
×
138

139
        contentReader := bytes.NewReader(DecommissioningReqJson)
1✔
140

1✔
141
        req, err := http.NewRequest(
1✔
142
                http.MethodPost,
1✔
143
                utils.JoinURL(co.conf.OrchestratorAddr, DeviceDecommissioningOrchestratorUri),
1✔
144
                contentReader)
1✔
145
        if err != nil {
1✔
146
                return errors.Wrapf(err, "failed to create request")
×
147
        }
×
148

149
        req.Header.Set("Content-Type", "application/json")
1✔
150

1✔
151
        // set the device admission request timeout
1✔
152
        ctx, cancel := context.WithTimeout(ctx, co.conf.Timeout)
1✔
153
        defer cancel()
1✔
154

1✔
155
        rsp, err := co.http.Do(req.WithContext(ctx))
1✔
156
        if err != nil {
2✔
157
                return errors.Wrapf(err, "failed to submit decommissioning job")
1✔
158
        }
1✔
159
        defer rsp.Body.Close()
1✔
160

1✔
161
        if rsp.StatusCode != http.StatusOK && rsp.StatusCode != http.StatusCreated {
2✔
162
                body, err := io.ReadAll(rsp.Body)
1✔
163
                if err != nil {
1✔
164
                        body = []byte("<failed to read>")
×
165
                }
×
166
                l.Errorf("decommision request %s %s failed with status %v, response text: %s",
1✔
167
                        req.Method, req.URL, rsp.Status, body)
1✔
168

1✔
169
                return errors.Errorf(
1✔
170
                        "submit decommissioning request failed with status %v", rsp.Status)
1✔
171
        }
172
        return nil
1✔
173
}
174

175
func (co *Client) SubmitProvisionDeviceJob(
176
        ctx context.Context,
177
        provisionDeviceReq ProvisionDeviceReq,
UNCOV
178
) error {
×
UNCOV
179

×
UNCOV
180
        l := log.FromContext(ctx)
×
UNCOV
181

×
UNCOV
182
        l.Debugf("Submit provision device job for device: %s", provisionDeviceReq.DeviceID)
×
UNCOV
183

×
UNCOV
184
        ProvisionDeviceReqJson, err := json.Marshal(provisionDeviceReq)
×
UNCOV
185
        if err != nil {
×
186
                return errors.Wrapf(err, "failed to submit provision device job")
×
187
        }
×
188

UNCOV
189
        contentReader := bytes.NewReader(ProvisionDeviceReqJson)
×
UNCOV
190

×
UNCOV
191
        req, err := http.NewRequest(
×
UNCOV
192
                http.MethodPost,
×
UNCOV
193
                utils.JoinURL(co.conf.OrchestratorAddr, ProvisionDeviceOrchestratorUri),
×
UNCOV
194
                contentReader)
×
UNCOV
195
        if err != nil {
×
196
                return errors.Wrapf(err, "failed to create request")
×
197
        }
×
198

UNCOV
199
        req.Header.Set("Content-Type", "application/json")
×
UNCOV
200

×
UNCOV
201
        // set the device admission request timeout
×
UNCOV
202
        ctx, cancel := context.WithTimeout(ctx, co.conf.Timeout)
×
UNCOV
203
        defer cancel()
×
UNCOV
204

×
UNCOV
205
        rsp, err := co.http.Do(req.WithContext(ctx))
×
UNCOV
206
        if err != nil {
×
207
                return errors.Wrapf(err, "failed to submit provision device job")
×
208
        }
×
UNCOV
209
        defer rsp.Body.Close()
×
UNCOV
210

×
UNCOV
211
        if rsp.StatusCode != http.StatusOK && rsp.StatusCode != http.StatusCreated {
×
212
                body, err := io.ReadAll(rsp.Body)
×
213
                if err != nil {
×
214
                        body = []byte("<failed to read>")
×
215
                }
×
216
                l.Errorf("provision device request %s %s failed with status %v, response text: %s",
×
217
                        req.Method, req.URL, rsp.Status, body)
×
218

×
219
                return errors.Errorf(
×
220
                        "submit provision device request failed with status %v", rsp.Status)
×
221
        }
UNCOV
222
        return nil
×
223
}
224

225
func (co *Client) SubmitUpdateDeviceStatusJob(
226
        ctx context.Context,
227
        updateDeviceStatusReq UpdateDeviceStatusReq,
UNCOV
228
) error {
×
UNCOV
229
        l := log.FromContext(ctx)
×
UNCOV
230

×
UNCOV
231
        l.Debugf("Submit update device status job for devices: %v", updateDeviceStatusReq.Devices)
×
UNCOV
232

×
UNCOV
233
        UpdateDeviceStatusReqJson, err := json.Marshal(updateDeviceStatusReq)
×
UNCOV
234
        if err != nil {
×
235
                return errors.Wrapf(err, "failed to submit update device status job")
×
236
        }
×
237

UNCOV
238
        contentReader := bytes.NewReader(UpdateDeviceStatusReqJson)
×
UNCOV
239

×
UNCOV
240
        req, err := http.NewRequestWithContext(
×
UNCOV
241
                ctx,
×
UNCOV
242
                http.MethodPost,
×
UNCOV
243
                utils.JoinURL(co.conf.OrchestratorAddr, UpdateDeviceStatusOrchestratorUri),
×
UNCOV
244
                contentReader)
×
UNCOV
245
        if err != nil {
×
246
                return errors.Wrapf(err, "failed to create request")
×
247
        }
×
248

UNCOV
249
        req.Header.Set("Content-Type", "application/json")
×
UNCOV
250

×
UNCOV
251
        // set the device admission request timeout
×
UNCOV
252
        ctx, cancel := context.WithTimeout(ctx, co.conf.Timeout)
×
UNCOV
253
        defer cancel()
×
UNCOV
254

×
UNCOV
255
        rsp, err := co.http.Do(req.WithContext(ctx))
×
UNCOV
256
        if err != nil {
×
257
                return errors.Wrapf(err, "failed to submit update device status job")
×
258
        }
×
UNCOV
259
        defer rsp.Body.Close()
×
UNCOV
260

×
UNCOV
261
        if rsp.StatusCode != http.StatusOK && rsp.StatusCode != http.StatusCreated {
×
UNCOV
262
                body, err := io.ReadAll(rsp.Body)
×
UNCOV
263
                if err != nil {
×
264
                        body = []byte("<failed to read>")
×
265
                }
×
UNCOV
266
                l.Errorf("update device status request %s %s failed with status %v, response text: %s",
×
UNCOV
267
                        req.Method, req.URL, rsp.Status, body)
×
UNCOV
268

×
UNCOV
269
                return errors.Errorf(
×
UNCOV
270
                        "submit update device status request failed with status %v", rsp.Status)
×
271
        }
UNCOV
272
        return nil
×
273
}
274

275
func (co *Client) SubmitDeviceLimitWarning(
276
        ctx context.Context,
277
        devWarn DeviceLimitWarning,
278
) error {
1✔
279
        if err := devWarn.Validate(); err != nil {
2✔
280
                return errors.Wrap(err,
1✔
281
                        "workflows: [internal] invalid request argument",
1✔
282
                )
1✔
283
        }
1✔
284

285
        bodyJSON, _ := json.Marshal(devWarn)
1✔
286

1✔
287
        req, err := http.NewRequestWithContext(ctx, "POST",
1✔
288
                utils.JoinURL(co.conf.OrchestratorAddr, DeviceLimitWarningURI),
1✔
289
                bytes.NewReader(bodyJSON),
1✔
290
        )
1✔
291
        if err != nil {
2✔
292
                return errors.Wrap(err,
1✔
293
                        "workflows: error preparing device limit warning request",
1✔
294
                )
1✔
295
        }
1✔
296
        rsp, err := co.http.Do(req)
1✔
297
        if err != nil {
2✔
298
                return errors.Wrap(err,
1✔
299
                        "workflows: error sending device limit warning request",
1✔
300
                )
1✔
301
        }
1✔
302
        defer rsp.Body.Close()
1✔
303
        if rsp.StatusCode >= 400 {
2✔
304
                var (
1✔
305
                        apiErr    = new(rest_utils.ApiError)
1✔
306
                        jsDecoder = json.NewDecoder(rsp.Body)
1✔
307
                )
1✔
308
                err := jsDecoder.Decode(apiErr)
1✔
309
                if err != nil {
2✔
310
                        return errors.Errorf(
1✔
311
                                "workflows: unexpected HTTP response: %s",
1✔
312
                                rsp.Status,
1✔
313
                        )
1✔
314
                }
1✔
315
                return apiErr
1✔
316
        }
317
        return nil
1✔
318
}
319

320
func (co *Client) SubmitUpdateDeviceInventoryJob(
321
        ctx context.Context,
322
        updateDeviceInventoryReq UpdateDeviceInventoryReq,
UNCOV
323
) error {
×
UNCOV
324
        l := log.FromContext(ctx)
×
UNCOV
325

×
UNCOV
326
        l.Debugf("Submit update device inventory job for device: %q", updateDeviceInventoryReq.DeviceId)
×
UNCOV
327

×
UNCOV
328
        UpdateDeviceInventoryReqJson, err := json.Marshal(updateDeviceInventoryReq)
×
UNCOV
329
        if err != nil {
×
330
                return errors.Wrapf(err, "failed to submit update device inventory job")
×
331
        }
×
332

UNCOV
333
        contentReader := bytes.NewReader(UpdateDeviceInventoryReqJson)
×
UNCOV
334

×
UNCOV
335
        req, err := http.NewRequest(
×
UNCOV
336
                http.MethodPost,
×
UNCOV
337
                utils.JoinURL(co.conf.OrchestratorAddr, UpdateDeviceInventoryOrchestratorUri),
×
UNCOV
338
                contentReader)
×
UNCOV
339
        if err != nil {
×
340
                return errors.Wrapf(err, "failed to create request")
×
341
        }
×
342

UNCOV
343
        req.Header.Set("Content-Type", "application/json")
×
UNCOV
344

×
UNCOV
345
        // set the workflows client request timeout
×
UNCOV
346
        ctx, cancel := context.WithTimeout(ctx, co.conf.Timeout)
×
UNCOV
347
        defer cancel()
×
UNCOV
348

×
UNCOV
349
        rsp, err := co.http.Do(req.WithContext(ctx))
×
UNCOV
350
        if err != nil {
×
351
                return errors.Wrapf(err, "failed to submit update device inventory job")
×
352
        }
×
UNCOV
353
        defer rsp.Body.Close()
×
UNCOV
354

×
UNCOV
355
        if rsp.StatusCode != http.StatusOK && rsp.StatusCode != http.StatusCreated {
×
356
                body, err := io.ReadAll(rsp.Body)
×
357
                if err != nil {
×
358
                        body = []byte("<failed to read>")
×
359
                }
×
360
                l.Errorf("update device inventory request %s %s failed with status %v, response text: %s",
×
361
                        req.Method, req.URL, rsp.Status, body)
×
362

×
363
                return errors.Errorf(
×
364
                        "submit update device inventory request failed with status %v", rsp.Status)
×
365
        }
UNCOV
366
        return nil
×
367
}
368

369
func (co *Client) SubmitReindexReporting(ctx context.Context, device string) error {
1✔
370
        ctx, cancel := context.WithTimeout(ctx, co.conf.Timeout)
1✔
371
        defer cancel()
1✔
372

1✔
373
        tenantID := ""
1✔
374
        if id := identity.FromContext(ctx); id != nil {
2✔
375
                tenantID = id.Tenant
1✔
376
        }
1✔
377
        wflow := ReindexReportingWorkflow{
1✔
378
                RequestID: requestid.FromContext(ctx),
1✔
379
                TenantID:  tenantID,
1✔
380
                DeviceID:  device,
1✔
381
                Service:   ServiceDeviceauth,
1✔
382
        }
1✔
383
        payload, _ := json.Marshal(wflow)
1✔
384
        req, err := http.NewRequestWithContext(ctx,
1✔
385
                "POST",
1✔
386
                utils.JoinURL(co.conf.OrchestratorAddr, ReindexReportingURI),
1✔
387
                bytes.NewReader(payload),
1✔
388
        )
1✔
389
        if err != nil {
1✔
390
                return errors.Wrap(err, "workflows: error preparing HTTP request")
×
391
        }
×
392

393
        req.Header.Set("Content-Type", "application/json")
1✔
394

1✔
395
        rsp, err := co.http.Do(req)
1✔
396
        if err != nil {
2✔
397
                return errors.Wrap(err, "workflows: failed to submit reindex job")
1✔
398
        }
1✔
399
        defer rsp.Body.Close()
1✔
400

1✔
401
        if rsp.StatusCode < 300 {
2✔
402
                return nil
1✔
403
        } else if rsp.StatusCode == http.StatusNotFound {
3✔
404
                return errors.New(`workflows: workflow "reindex_reporting" not defined`)
1✔
405
        }
1✔
406

407
        return errors.Errorf(
1✔
408
                "workflows: unexpected HTTP status from workflows service: %s",
1✔
409
                rsp.Status,
1✔
410
        )
1✔
411
}
412

413
func (co *Client) SubmitReindexReportingBatch(ctx context.Context, devices []string) error {
1✔
414
        ctx, cancel := context.WithTimeout(ctx, co.conf.Timeout)
1✔
415
        defer cancel()
1✔
416

1✔
417
        tenantID := ""
1✔
418
        if id := identity.FromContext(ctx); id != nil {
2✔
419
                tenantID = id.Tenant
1✔
420
        }
1✔
421
        reqID := requestid.FromContext(ctx)
1✔
422
        wflows := make([]ReindexReportingWorkflow, len(devices))
1✔
423
        for i, device := range devices {
2✔
424
                wflows[i] = ReindexReportingWorkflow{
1✔
425
                        RequestID: reqID,
1✔
426
                        TenantID:  tenantID,
1✔
427
                        DeviceID:  device,
1✔
428
                        Service:   ServiceDeviceauth,
1✔
429
                }
1✔
430
        }
1✔
431
        payload, _ := json.Marshal(wflows)
1✔
432
        req, err := http.NewRequestWithContext(ctx,
1✔
433
                "POST",
1✔
434
                utils.JoinURL(co.conf.OrchestratorAddr, ReindexReportingBatchURI),
1✔
435
                bytes.NewReader(payload),
1✔
436
        )
1✔
437
        if err != nil {
1✔
438
                return errors.Wrap(err, "workflows: error preparing HTTP request")
×
439
        }
×
440

441
        req.Header.Set("Content-Type", "application/json")
1✔
442

1✔
443
        rsp, err := co.http.Do(req)
1✔
444
        if err != nil {
2✔
445
                return errors.Wrap(err, "workflows: failed to submit reindex job")
1✔
446
        }
1✔
447
        defer rsp.Body.Close()
1✔
448

1✔
449
        if rsp.StatusCode < 300 {
2✔
450
                return nil
1✔
451
        } else if rsp.StatusCode == http.StatusNotFound {
3✔
452
                return errors.New(`workflows: workflow "reindex_reporting" not defined`)
1✔
453
        }
1✔
454

455
        return errors.Errorf(
1✔
456
                "workflows: unexpected HTTP status from workflows service: %s",
1✔
457
                rsp.Status,
1✔
458
        )
1✔
459
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc