• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / deviceauth / 1507843008

13 Sep 2024 11:01AM UTC coverage: 81.326%. Remained the same
1507843008

push

gitlab-ci

web-flow
Merge pull request #727 from mzedel/chore/deprecate

Chore/deprecate

4834 of 5944 relevant lines covered (81.33%)

42.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.5
/client/orchestrator/client_orchestrator.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package orchestrator
16

17
import (
18
        "bytes"
19
        "context"
20
        "encoding/json"
21
        "io/ioutil"
22
        "net/http"
23
        "time"
24

25
        "github.com/mendersoftware/go-lib-micro/identity"
26
        "github.com/mendersoftware/go-lib-micro/log"
27
        "github.com/mendersoftware/go-lib-micro/requestid"
28
        "github.com/mendersoftware/go-lib-micro/rest_utils"
29
        "github.com/pkg/errors"
30

31
        "github.com/mendersoftware/deviceauth/utils"
32
)
33

34
const (
	// Endpoints of the orchestrator (workflows) service. Each path is joined
	// onto Config.OrchestratorAddr when a request is built.
	DeviceDecommissioningOrchestratorUri = "/api/v1/workflow/decommission_device"
	ProvisionDeviceOrchestratorUri       = "/api/v1/workflow/provision_device"
	UpdateDeviceStatusOrchestratorUri    = "/api/v1/workflow/update_device_status"
	UpdateDeviceInventoryOrchestratorUri = "/api/v1/workflow/update_device_inventory"
	HealthURI                            = "/api/v1/health"
	DeviceLimitWarningURI                = "/api/v1/workflow/device_limit_email"
	ReindexReportingURI                  = "/api/v1/workflow/reindex_reporting"
	ReindexReportingBatchURI             = "/api/v1/workflow/reindex_reporting/batch"

	// defaultReqTimeout is the request timeout (10 seconds) that NewClient
	// falls back to when Config.Timeout is zero.
	defaultReqTimeout = 10 * time.Second
)
47

48
// Config conveys client configuration
type Config struct {
	// OrchestratorAddr is the orchestrator host. The endpoint path of each
	// request is joined onto this address.
	OrchestratorAddr string
	// Timeout is the request timeout. NewClient replaces a zero value with
	// defaultReqTimeout.
	Timeout time.Duration
}
55

56
// ClientRunner is an interface of orchestrator client
57
//
58
//go:generate ../../utils/mockgen.sh
59
type ClientRunner interface {
60
        CheckHealth(ctx context.Context) error
61
        SubmitDeviceDecommisioningJob(ctx context.Context, req DecommissioningReq) error
62
        SubmitProvisionDeviceJob(ctx context.Context, req ProvisionDeviceReq) error
63
        SubmitUpdateDeviceStatusJob(ctx context.Context, req UpdateDeviceStatusReq) error
64
        SubmitDeviceLimitWarning(ctx context.Context, devWarn DeviceLimitWarning) error
65
        SubmitUpdateDeviceInventoryJob(ctx context.Context, req UpdateDeviceInventoryReq) error
66
        SubmitReindexReporting(c context.Context, device string) error
67
        SubmitReindexReportingBatch(c context.Context, devices []string) error
68
}
69

70
// Client is an opaque implementation of orchestrator client. Implements
// ClientRunner interface
type Client struct {
	// conf is the configuration the client was created with (address and
	// request timeout).
	conf Config
	// http is the HTTP client used to send every request.
	http http.Client
}
76

77
func NewClient(c Config) *Client {
39✔
78
        if c.Timeout == 0 {
65✔
79
                c.Timeout = defaultReqTimeout
26✔
80
        }
26✔
81

82
        return &Client{
39✔
83
                conf: c,
39✔
84
                http: http.Client{
39✔
85
                        Timeout: c.Timeout,
39✔
86
                },
39✔
87
        }
39✔
88
}
89

90
func (c *Client) CheckHealth(ctx context.Context) error {
8✔
91
        var (
8✔
92
                apiErr rest_utils.ApiError
8✔
93
        )
8✔
94

8✔
95
        if ctx == nil {
10✔
96
                ctx = context.Background()
2✔
97
        }
2✔
98
        if _, ok := ctx.Deadline(); !ok {
12✔
99
                var cancel context.CancelFunc
4✔
100
                ctx, cancel = context.WithTimeout(ctx, c.conf.Timeout)
4✔
101
                defer cancel()
4✔
102
        }
4✔
103
        req, _ := http.NewRequestWithContext(
8✔
104
                ctx, "GET",
8✔
105
                utils.JoinURL(c.conf.OrchestratorAddr, HealthURI), nil,
8✔
106
        )
8✔
107

8✔
108
        rsp, err := c.http.Do(req)
8✔
109
        if err != nil {
10✔
110
                return err
2✔
111
        }
2✔
112
        defer rsp.Body.Close()
6✔
113
        if rsp.StatusCode >= http.StatusOK && rsp.StatusCode < 300 {
8✔
114
                return nil
2✔
115
        }
2✔
116
        decoder := json.NewDecoder(rsp.Body)
4✔
117
        err = decoder.Decode(&apiErr)
4✔
118
        if err != nil {
6✔
119
                return errors.Errorf("health check HTTP error: %s", rsp.Status)
2✔
120
        }
2✔
121
        return &apiErr
2✔
122
}
123

124
func (co *Client) SubmitDeviceDecommisioningJob(
125
        ctx context.Context,
126
        decommissioningReq DecommissioningReq,
127
) error {
7✔
128

7✔
129
        l := log.FromContext(ctx)
7✔
130

7✔
131
        l.Debugf("Submit decommissioning job for device: %s", decommissioningReq.DeviceId)
7✔
132

7✔
133
        DecommissioningReqJson, err := json.Marshal(decommissioningReq)
7✔
134
        if err != nil {
7✔
135
                return errors.Wrapf(err, "failed to submit device decommissioning job")
×
136
        }
×
137

138
        contentReader := bytes.NewReader(DecommissioningReqJson)
7✔
139

7✔
140
        req, err := http.NewRequest(
7✔
141
                http.MethodPost,
7✔
142
                utils.JoinURL(co.conf.OrchestratorAddr, DeviceDecommissioningOrchestratorUri),
7✔
143
                contentReader)
7✔
144
        if err != nil {
7✔
145
                return errors.Wrapf(err, "failed to create request")
×
146
        }
×
147

148
        req.Header.Set("Content-Type", "application/json")
7✔
149

7✔
150
        // set the device admission request timeout
7✔
151
        ctx, cancel := context.WithTimeout(ctx, co.conf.Timeout)
7✔
152
        defer cancel()
7✔
153

7✔
154
        rsp, err := co.http.Do(req.WithContext(ctx))
7✔
155
        if err != nil {
9✔
156
                return errors.Wrapf(err, "failed to submit decommissioning job")
2✔
157
        }
2✔
158
        defer rsp.Body.Close()
5✔
159

5✔
160
        if rsp.StatusCode != http.StatusOK && rsp.StatusCode != http.StatusCreated {
8✔
161
                body, err := ioutil.ReadAll(rsp.Body)
3✔
162
                if err != nil {
3✔
163
                        body = []byte("<failed to read>")
×
164
                }
×
165
                l.Errorf("decommision request %s %s failed with status %v, response text: %s",
3✔
166
                        req.Method, req.URL, rsp.Status, body)
3✔
167

3✔
168
                return errors.Errorf(
3✔
169
                        "submit decommissioning request failed with status %v", rsp.Status)
3✔
170
        }
171
        return nil
3✔
172
}
173

174
func (co *Client) SubmitProvisionDeviceJob(
175
        ctx context.Context,
176
        provisionDeviceReq ProvisionDeviceReq,
177
) error {
1✔
178

1✔
179
        l := log.FromContext(ctx)
1✔
180

1✔
181
        l.Debugf("Submit provision device job for device: %s", provisionDeviceReq.DeviceID)
1✔
182

1✔
183
        ProvisionDeviceReqJson, err := json.Marshal(provisionDeviceReq)
1✔
184
        if err != nil {
1✔
185
                return errors.Wrapf(err, "failed to submit provision device job")
×
186
        }
×
187

188
        contentReader := bytes.NewReader(ProvisionDeviceReqJson)
1✔
189

1✔
190
        req, err := http.NewRequest(
1✔
191
                http.MethodPost,
1✔
192
                utils.JoinURL(co.conf.OrchestratorAddr, ProvisionDeviceOrchestratorUri),
1✔
193
                contentReader)
1✔
194
        if err != nil {
1✔
195
                return errors.Wrapf(err, "failed to create request")
×
196
        }
×
197

198
        req.Header.Set("Content-Type", "application/json")
1✔
199

1✔
200
        // set the device admission request timeout
1✔
201
        ctx, cancel := context.WithTimeout(ctx, co.conf.Timeout)
1✔
202
        defer cancel()
1✔
203

1✔
204
        rsp, err := co.http.Do(req.WithContext(ctx))
1✔
205
        if err != nil {
1✔
206
                return errors.Wrapf(err, "failed to submit provision device job")
×
207
        }
×
208
        defer rsp.Body.Close()
1✔
209

1✔
210
        if rsp.StatusCode != http.StatusOK && rsp.StatusCode != http.StatusCreated {
1✔
211
                body, err := ioutil.ReadAll(rsp.Body)
×
212
                if err != nil {
×
213
                        body = []byte("<failed to read>")
×
214
                }
×
215
                l.Errorf("provision device request %s %s failed with status %v, response text: %s",
×
216
                        req.Method, req.URL, rsp.Status, body)
×
217

×
218
                return errors.Errorf(
×
219
                        "submit provision device request failed with status %v", rsp.Status)
×
220
        }
221
        return nil
1✔
222
}
223

224
func (co *Client) SubmitUpdateDeviceStatusJob(
225
        ctx context.Context,
226
        updateDeviceStatusReq UpdateDeviceStatusReq,
227
) error {
1✔
228
        l := log.FromContext(ctx)
1✔
229

1✔
230
        l.Debugf("Submit update device status job for devices: %v", updateDeviceStatusReq.Devices)
1✔
231

1✔
232
        UpdateDeviceStatusReqJson, err := json.Marshal(updateDeviceStatusReq)
1✔
233
        if err != nil {
1✔
234
                return errors.Wrapf(err, "failed to submit update device status job")
×
235
        }
×
236

237
        contentReader := bytes.NewReader(UpdateDeviceStatusReqJson)
1✔
238

1✔
239
        req, err := http.NewRequestWithContext(
1✔
240
                ctx,
1✔
241
                http.MethodPost,
1✔
242
                utils.JoinURL(co.conf.OrchestratorAddr, UpdateDeviceStatusOrchestratorUri),
1✔
243
                contentReader)
1✔
244
        if err != nil {
1✔
245
                return errors.Wrapf(err, "failed to create request")
×
246
        }
×
247

248
        req.Header.Set("Content-Type", "application/json")
1✔
249

1✔
250
        // set the device admission request timeout
1✔
251
        ctx, cancel := context.WithTimeout(ctx, co.conf.Timeout)
1✔
252
        defer cancel()
1✔
253

1✔
254
        rsp, err := co.http.Do(req.WithContext(ctx))
1✔
255
        if err != nil {
1✔
256
                return errors.Wrapf(err, "failed to submit update device status job")
×
257
        }
×
258
        defer rsp.Body.Close()
1✔
259

1✔
260
        if rsp.StatusCode != http.StatusOK && rsp.StatusCode != http.StatusCreated {
2✔
261
                body, err := ioutil.ReadAll(rsp.Body)
1✔
262
                if err != nil {
1✔
263
                        body = []byte("<failed to read>")
×
264
                }
×
265
                l.Errorf("update device status request %s %s failed with status %v, response text: %s",
1✔
266
                        req.Method, req.URL, rsp.Status, body)
1✔
267

1✔
268
                return errors.Errorf(
1✔
269
                        "submit update device status request failed with status %v", rsp.Status)
1✔
270
        }
271
        return nil
1✔
272
}
273

274
func (co *Client) SubmitDeviceLimitWarning(
275
        ctx context.Context,
276
        devWarn DeviceLimitWarning,
277
) error {
12✔
278
        if err := devWarn.Validate(); err != nil {
14✔
279
                return errors.Wrap(err,
2✔
280
                        "workflows: [internal] invalid request argument",
2✔
281
                )
2✔
282
        }
2✔
283

284
        bodyJSON, _ := json.Marshal(devWarn)
10✔
285

10✔
286
        req, err := http.NewRequestWithContext(ctx, "POST",
10✔
287
                utils.JoinURL(co.conf.OrchestratorAddr, DeviceLimitWarningURI),
10✔
288
                bytes.NewReader(bodyJSON),
10✔
289
        )
10✔
290
        if err != nil {
12✔
291
                return errors.Wrap(err,
2✔
292
                        "workflows: error preparing device limit warning request",
2✔
293
                )
2✔
294
        }
2✔
295
        rsp, err := co.http.Do(req)
8✔
296
        if err != nil {
10✔
297
                return errors.Wrap(err,
2✔
298
                        "workflows: error sending device limit warning request",
2✔
299
                )
2✔
300
        }
2✔
301
        defer rsp.Body.Close()
6✔
302
        if rsp.StatusCode >= 400 {
10✔
303
                var (
4✔
304
                        apiErr    = new(rest_utils.ApiError)
4✔
305
                        jsDecoder = json.NewDecoder(rsp.Body)
4✔
306
                )
4✔
307
                err := jsDecoder.Decode(apiErr)
4✔
308
                if err != nil {
6✔
309
                        return errors.Errorf(
2✔
310
                                "workflows: unexpected HTTP response: %s",
2✔
311
                                rsp.Status,
2✔
312
                        )
2✔
313
                }
2✔
314
                return apiErr
2✔
315
        }
316
        return nil
2✔
317
}
318

319
func (co *Client) SubmitUpdateDeviceInventoryJob(
320
        ctx context.Context,
321
        updateDeviceInventoryReq UpdateDeviceInventoryReq,
322
) error {
1✔
323
        l := log.FromContext(ctx)
1✔
324

1✔
325
        l.Debugf("Submit update device inventory job for device: %q", updateDeviceInventoryReq.DeviceId)
1✔
326

1✔
327
        UpdateDeviceInventoryReqJson, err := json.Marshal(updateDeviceInventoryReq)
1✔
328
        if err != nil {
1✔
329
                return errors.Wrapf(err, "failed to submit update device inventory job")
×
330
        }
×
331

332
        contentReader := bytes.NewReader(UpdateDeviceInventoryReqJson)
1✔
333

1✔
334
        req, err := http.NewRequest(
1✔
335
                http.MethodPost,
1✔
336
                utils.JoinURL(co.conf.OrchestratorAddr, UpdateDeviceInventoryOrchestratorUri),
1✔
337
                contentReader)
1✔
338
        if err != nil {
1✔
339
                return errors.Wrapf(err, "failed to create request")
×
340
        }
×
341

342
        req.Header.Set("Content-Type", "application/json")
1✔
343

1✔
344
        // set the workflows client request timeout
1✔
345
        ctx, cancel := context.WithTimeout(ctx, co.conf.Timeout)
1✔
346
        defer cancel()
1✔
347

1✔
348
        rsp, err := co.http.Do(req.WithContext(ctx))
1✔
349
        if err != nil {
1✔
350
                return errors.Wrapf(err, "failed to submit update device inventory job")
×
351
        }
×
352
        defer rsp.Body.Close()
1✔
353

1✔
354
        if rsp.StatusCode != http.StatusOK && rsp.StatusCode != http.StatusCreated {
1✔
355
                body, err := ioutil.ReadAll(rsp.Body)
×
356
                if err != nil {
×
357
                        body = []byte("<failed to read>")
×
358
                }
×
359
                l.Errorf("update device inventory request %s %s failed with status %v, response text: %s",
×
360
                        req.Method, req.URL, rsp.Status, body)
×
361

×
362
                return errors.Errorf(
×
363
                        "submit update device inventory request failed with status %v", rsp.Status)
×
364
        }
365
        return nil
1✔
366
}
367

368
func (co *Client) SubmitReindexReporting(ctx context.Context, device string) error {
8✔
369
        ctx, cancel := context.WithTimeout(ctx, co.conf.Timeout)
8✔
370
        defer cancel()
8✔
371

8✔
372
        tenantID := ""
8✔
373
        if id := identity.FromContext(ctx); id != nil {
16✔
374
                tenantID = id.Tenant
8✔
375
        }
8✔
376
        wflow := ReindexReportingWorkflow{
8✔
377
                RequestID: requestid.FromContext(ctx),
8✔
378
                TenantID:  tenantID,
8✔
379
                DeviceID:  device,
8✔
380
                Service:   ServiceDeviceauth,
8✔
381
        }
8✔
382
        payload, _ := json.Marshal(wflow)
8✔
383
        req, err := http.NewRequestWithContext(ctx,
8✔
384
                "POST",
8✔
385
                utils.JoinURL(co.conf.OrchestratorAddr, ReindexReportingURI),
8✔
386
                bytes.NewReader(payload),
8✔
387
        )
8✔
388
        if err != nil {
8✔
389
                return errors.Wrap(err, "workflows: error preparing HTTP request")
×
390
        }
×
391

392
        req.Header.Set("Content-Type", "application/json")
8✔
393

8✔
394
        rsp, err := co.http.Do(req)
8✔
395
        if err != nil {
10✔
396
                return errors.Wrap(err, "workflows: failed to submit reindex job")
2✔
397
        }
2✔
398
        defer rsp.Body.Close()
6✔
399

6✔
400
        if rsp.StatusCode < 300 {
8✔
401
                return nil
2✔
402
        } else if rsp.StatusCode == http.StatusNotFound {
8✔
403
                return errors.New(`workflows: workflow "reindex_reporting" not defined`)
2✔
404
        }
2✔
405

406
        return errors.Errorf(
2✔
407
                "workflows: unexpected HTTP status from workflows service: %s",
2✔
408
                rsp.Status,
2✔
409
        )
2✔
410
}
411

412
func (co *Client) SubmitReindexReportingBatch(ctx context.Context, devices []string) error {
8✔
413
        ctx, cancel := context.WithTimeout(ctx, co.conf.Timeout)
8✔
414
        defer cancel()
8✔
415

8✔
416
        tenantID := ""
8✔
417
        if id := identity.FromContext(ctx); id != nil {
16✔
418
                tenantID = id.Tenant
8✔
419
        }
8✔
420
        reqID := requestid.FromContext(ctx)
8✔
421
        wflows := make([]ReindexReportingWorkflow, len(devices))
8✔
422
        for i, device := range devices {
18✔
423
                wflows[i] = ReindexReportingWorkflow{
10✔
424
                        RequestID: reqID,
10✔
425
                        TenantID:  tenantID,
10✔
426
                        DeviceID:  device,
10✔
427
                        Service:   ServiceDeviceauth,
10✔
428
                }
10✔
429
        }
10✔
430
        payload, _ := json.Marshal(wflows)
8✔
431
        req, err := http.NewRequestWithContext(ctx,
8✔
432
                "POST",
8✔
433
                utils.JoinURL(co.conf.OrchestratorAddr, ReindexReportingBatchURI),
8✔
434
                bytes.NewReader(payload),
8✔
435
        )
8✔
436
        if err != nil {
8✔
437
                return errors.Wrap(err, "workflows: error preparing HTTP request")
×
438
        }
×
439

440
        req.Header.Set("Content-Type", "application/json")
8✔
441

8✔
442
        rsp, err := co.http.Do(req)
8✔
443
        if err != nil {
10✔
444
                return errors.Wrap(err, "workflows: failed to submit reindex job")
2✔
445
        }
2✔
446
        defer rsp.Body.Close()
6✔
447

6✔
448
        if rsp.StatusCode < 300 {
8✔
449
                return nil
2✔
450
        } else if rsp.StatusCode == http.StatusNotFound {
8✔
451
                return errors.New(`workflows: workflow "reindex_reporting" not defined`)
2✔
452
        }
2✔
453

454
        return errors.Errorf(
2✔
455
                "workflows: unexpected HTTP status from workflows service: %s",
2✔
456
                rsp.Status,
2✔
457
        )
2✔
458
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc