• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / iot-manager / 1445709825

09 Sep 2024 11:23AM UTC coverage: 86.917% (-0.3%) from 87.172%
1445709825

Pull #303

gitlab-ci

alfrunes
ci: update gitlab runner

Moving from the deprecated Docker GitLab public runner to a self-hosted runner

Ticket: SEC-1133
Changelog: none

Signed-off-by: Roberto Giovanardi <roberto.giovanardi@northern.tech>
(cherry picked from commit bb026f77c)
Pull Request #303: Cherry-pick MEN-7478 to 1.3.x (3.7.x)

42 of 54 new or added lines in 2 files covered. (77.78%)

128 existing lines in 10 files now uncovered.

3169 of 3646 relevant lines covered (86.92%)

9.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.77
/app/app.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package app
16

17
import (
18
        "context"
19
        "net/http"
20
        "time"
21

22
        "github.com/google/uuid"
23
        "github.com/pkg/errors"
24

25
        "github.com/mendersoftware/go-lib-micro/identity"
26
        "github.com/mendersoftware/go-lib-micro/log"
27

28
        "github.com/mendersoftware/iot-manager/client"
29
        "github.com/mendersoftware/iot-manager/client/devauth"
30
        "github.com/mendersoftware/iot-manager/client/iotcore"
31
        "github.com/mendersoftware/iot-manager/client/iothub"
32
        "github.com/mendersoftware/iot-manager/client/workflows"
33
        "github.com/mendersoftware/iot-manager/model"
34
        "github.com/mendersoftware/iot-manager/store"
35
)
36

37
// Sentinel errors exposed by the app package.
var (
	// ErrIntegrationNotFound is returned when the requested integration
	// (or the device/integration pairing) does not exist in the store.
	ErrIntegrationNotFound = errors.New("integration not found")

	// ErrIntegrationExists is returned when creating an integration that
	// the store reports as already existing.
	ErrIntegrationExists = errors.New("integration already exists")

	// ErrUnknownIntegration is returned when an integration's provider is
	// not one the requested operation supports.
	ErrUnknownIntegration = errors.New("unknown integration provider")

	// ErrNoCredentials signals missing connection string / credentials.
	ErrNoCredentials = errors.New("no connection string or credentials " +
		"configured for the tenant")

	// ErrNoDeviceConnectionString signals a device without a connection string.
	ErrNoDeviceConnectionString = errors.New("device has no connection string")

	// ErrDeviceAlreadyExists signals a duplicate device.
	ErrDeviceAlreadyExists = errors.New("device already exists")

	// ErrDeviceNotFound is returned when the store has no such device.
	ErrDeviceNotFound = errors.New("device not found")

	// ErrDeviceStateConflict signals a conflicting device state update.
	ErrDeviceStateConflict = errors.New("conflict when updating the device state")

	// ErrCannotRemoveIntegration is returned when devices still reference
	// the integration that is being removed.
	ErrCannotRemoveIntegration = errors.New("cannot remove integration in use by devices")
)
50

51
// Configuration key names.
// NOTE(review): these keys are not referenced in this part of the file;
// presumably they name entries of a per-device/provider configuration map —
// confirm against the IoT Hub / IoT Core helpers.
const (
	confKeyPrimaryKey     = "azureConnectionString"
	confKeyAWSCertificate = "awsCertificate"
	confKeyAWSPrivateKey  = "awsPrivateKey"
	confKeyAWSEndpoint    = "awsEndpoint"
)
57

58
// App interface describes app objects
59
//
60
//nolint:lll
61
//go:generate ../utils/mockgen.sh
62
type App interface {
63
        WithIoTCore(client iotcore.Client) App
64
        WithIoTHub(client iothub.Client) App
65
        HealthCheck(context.Context) error
66
        GetDeviceIntegrations(context.Context, string) ([]model.Integration, error)
67
        GetIntegrations(context.Context) ([]model.Integration, error)
68
        GetIntegrationById(context.Context, uuid.UUID) (*model.Integration, error)
69
        CreateIntegration(context.Context, model.Integration) (*model.Integration, error)
70
        SetDeviceStatus(context.Context, string, model.Status) error
71
        SetIntegrationCredentials(context.Context, uuid.UUID, model.Credentials) error
72
        RemoveIntegration(context.Context, uuid.UUID) error
73
        GetDevice(context.Context, string) (*model.Device, error)
74
        GetDeviceStateIntegration(context.Context, string, uuid.UUID) (*model.DeviceState, error)
75
        SetDeviceStateIntegration(context.Context, string, uuid.UUID, *model.DeviceState) (*model.DeviceState, error)
76
        GetDeviceStateIoTHub(context.Context, string, *model.Integration) (*model.DeviceState, error)
77
        SetDeviceStateIoTHub(context.Context, string, *model.Integration, *model.DeviceState) (*model.DeviceState, error)
78
        GetDeviceStateIoTCore(context.Context, string, *model.Integration) (*model.DeviceState, error)
79
        SetDeviceStateIoTCore(context.Context, string, *model.Integration, *model.DeviceState) (*model.DeviceState, error)
80
        ProvisionDevice(context.Context, model.DeviceEvent) error
81
        DecommissionDevice(context.Context, string) error
82

83
        SyncDevices(context.Context, int, bool) error
84

85
        GetEvents(ctx context.Context, filter model.EventsFilter) ([]model.Event, error)
86
        VerifyDeviceTwin(ctx context.Context, req model.PreauthRequest) error
87
}
88

89
// app is an app object
90
type app struct {
91
        store         store.DataStore
92
        iothubClient  iothub.Client
93
        iotcoreClient iotcore.Client
94
        wf            workflows.Client
95
        devauth       devauth.Client
96
        httpClient    *http.Client
97
}
98

99
// NewApp initialize a new iot-manager App
100
func New(ds store.DataStore, wf workflows.Client, da devauth.Client) App {
91✔
101
        c := client.New()
91✔
102
        hubClient := iothub.NewClient(
91✔
103
                iothub.NewOptions().SetClient(c),
91✔
104
        )
91✔
105
        return &app{
91✔
106
                store:        ds,
91✔
107
                wf:           wf,
91✔
108
                devauth:      da,
91✔
109
                iothubClient: hubClient,
91✔
110
                httpClient:   c,
91✔
111
        }
91✔
112
}
91✔
113

114
// WithIoTHub sets the IoT Hub client
115
func (a *app) WithIoTHub(client iothub.Client) App {
33✔
116
        a.iothubClient = client
33✔
117
        return a
33✔
118
}
33✔
119

120
// WithIoTCore sets the IoT Core client
121
func (a *app) WithIoTCore(client iotcore.Client) App {
25✔
122
        a.iotcoreClient = client
25✔
123
        return a
25✔
124
}
25✔
125

126
// HealthCheck performs a health check and returns an error if it fails
127
func (a *app) HealthCheck(ctx context.Context) error {
2✔
128
        return a.store.Ping(ctx)
2✔
129
}
2✔
130

131
func (a *app) GetIntegrations(ctx context.Context) ([]model.Integration, error) {
33✔
132
        return a.store.GetIntegrations(ctx, model.IntegrationFilter{})
33✔
133
}
33✔
134

135
func (a *app) GetIntegrationById(ctx context.Context, id uuid.UUID) (*model.Integration, error) {
3✔
136
        integration, err := a.store.GetIntegrationById(ctx, id)
3✔
137
        if err != nil {
5✔
138
                switch cause := errors.Cause(err); cause {
2✔
139
                case store.ErrObjectNotFound:
1✔
140
                        return nil, ErrIntegrationNotFound
1✔
141
                default:
1✔
142
                        return nil, err
1✔
143
                }
144
        }
145
        return integration, err
1✔
146
}
147

148
func (a *app) CreateIntegration(
149
        ctx context.Context,
150
        integration model.Integration,
151
) (*model.Integration, error) {
8✔
152
        result, err := a.store.CreateIntegration(ctx, integration)
8✔
153
        if err == store.ErrObjectExists {
8✔
UNCOV
154
                return nil, ErrIntegrationExists
×
UNCOV
155
        }
×
156
        return result, err
8✔
157
}
158

159
func (a *app) SetIntegrationCredentials(
160
        ctx context.Context,
161
        integrationID uuid.UUID,
162
        credentials model.Credentials,
163
) error {
3✔
164
        err := a.store.SetIntegrationCredentials(ctx, integrationID, credentials)
3✔
165
        if err != nil {
5✔
166
                switch cause := errors.Cause(err); cause {
2✔
167
                case store.ErrObjectNotFound:
1✔
168
                        return ErrIntegrationNotFound
1✔
169
                default:
1✔
170
                        return err
1✔
171
                }
172
        }
173
        return err
1✔
174
}
175

176
func (a *app) RemoveIntegration(
177
        ctx context.Context,
178
        integrationID uuid.UUID,
179
) error {
5✔
180
        itg, err := a.store.GetIntegrationById(ctx, integrationID)
5✔
181
        if err == nil {
9✔
182
                if itg.Provider != model.ProviderWebhook {
8✔
183
                        // check if there are any devices with given integration enabled
4✔
184
                        devicesExist, err := a.store.
4✔
185
                                DoDevicesExistByIntegrationID(ctx, integrationID)
4✔
186
                        if err != nil {
5✔
187
                                return err
1✔
188
                        }
1✔
189
                        if devicesExist {
4✔
190
                                return ErrCannotRemoveIntegration
1✔
191
                        }
1✔
192
                }
193
                err = a.store.RemoveIntegration(ctx, integrationID)
2✔
194
        }
195
        if errors.Is(err, store.ErrObjectNotFound) {
4✔
196
                return ErrIntegrationNotFound
1✔
197
        }
1✔
198
        return err
2✔
199
}
200

201
func (a *app) GetDeviceIntegrations(
202
        ctx context.Context,
203
        deviceID string,
UNCOV
204
) ([]model.Integration, error) {
×
UNCOV
205
        device, err := a.store.GetDevice(ctx, deviceID)
×
UNCOV
206
        if err != nil {
×
UNCOV
207
                if err == store.ErrObjectNotFound {
×
UNCOV
208
                        return nil, ErrDeviceNotFound
×
UNCOV
209
                }
×
UNCOV
210
                return nil, errors.Wrap(err, "app: failed to get device integrations")
×
211
        }
UNCOV
212
        if len(device.IntegrationIDs) > 0 {
×
213
                integrations, err := a.store.GetIntegrations(ctx,
×
214
                        model.IntegrationFilter{IDs: device.IntegrationIDs},
×
215
                )
×
216
                return integrations, errors.Wrap(err, "app: failed to get device integrations")
×
217
        }
×
218
        return []model.Integration{}, nil
×
219
}
220

221
func (a *app) SetDeviceStatus(ctx context.Context, deviceID string, status model.Status) error {
7✔
222
        integrations, err := a.store.GetIntegrations(ctx, model.IntegrationFilter{})
7✔
223
        if err != nil {
8✔
224
                return errors.Wrap(err, "failed to retrieve integrations")
1✔
225
        }
1✔
226
        event := model.Event{
6✔
227
                WebhookEvent: model.WebhookEvent{
6✔
228
                        ID:   uuid.New(),
6✔
229
                        Type: model.EventTypeDeviceStatusChanged,
6✔
230
                        Data: model.DeviceEvent{
6✔
231
                                ID:     deviceID,
6✔
232
                                Status: status,
6✔
233
                        },
6✔
234
                        EventTS: time.Now(),
6✔
235
                },
6✔
236
                DeliveryStatus: make([]model.DeliveryStatus, 0, len(integrations)),
6✔
237
        }
6✔
238

6✔
239
        var (
6✔
240
                ok       bool
6✔
241
                errStack model.ErrorStack
6✔
242
                device   = newDevice(deviceID, a.store)
6✔
243
        )
6✔
244
        for _, integration := range integrations {
15✔
245
                deliver := model.DeliveryStatus{
9✔
246
                        IntegrationID: integration.ID,
9✔
247
                        Success:       true,
9✔
248
                }
9✔
249
                switch integration.Provider {
9✔
250
                case model.ProviderIoTHub:
1✔
251
                        ok, err = device.HasIntegration(ctx, integration.ID)
1✔
252
                        if err != nil {
1✔
UNCOV
253
                                break // switch
×
254
                        } else if !ok {
1✔
UNCOV
255
                                continue // loop
×
256
                        }
257
                        err = a.setDeviceStatusIoTHub(ctx, deviceID, status, integration)
1✔
258

259
                case model.ProviderIoTCore:
1✔
260
                        ok, err = device.HasIntegration(ctx, integration.ID)
1✔
261
                        if err != nil {
1✔
UNCOV
262
                                break // switch
×
263
                        } else if !ok {
1✔
UNCOV
264
                                continue // loop
×
265
                        }
266
                        err = a.setDeviceStatusIoTCore(ctx, deviceID, status, integration)
1✔
267

268
                case model.ProviderWebhook:
6✔
269
                        var (
6✔
270
                                req *http.Request
6✔
271
                                rsp *http.Response
6✔
272
                        )
6✔
273
                        req, err = client.NewWebhookRequest(ctx,
6✔
274
                                &integration.Credentials,
6✔
275
                                event.WebhookEvent)
6✔
276
                        if err != nil {
7✔
277
                                break // switch
1✔
278
                        }
279
                        rsp, err = a.httpClient.Do(req)
5✔
280
                        if err != nil {
7✔
281
                                break // switch
2✔
282
                        }
283
                        deliver.StatusCode = &rsp.StatusCode
3✔
284
                        if rsp.StatusCode >= 300 {
4✔
285
                                err = client.NewHTTPError(rsp.StatusCode)
1✔
286
                        }
1✔
287
                        _ = rsp.Body.Close()
3✔
288

289
                default:
1✔
290
                        continue
1✔
291
                }
292
                if err != nil {
12✔
293
                        var httpError client.HTTPError
4✔
294
                        if errors.As(err, &httpError) {
5✔
295
                                errCode := httpError.Code()
1✔
296
                                deliver.StatusCode = &errCode
1✔
297
                        }
1✔
298
                        deliver.Success = false
4✔
299
                        deliver.Error = err.Error()
4✔
300
                        _ = errStack.Push(err)
4✔
301
                }
302
                event.DeliveryStatus = append(event.DeliveryStatus, deliver)
8✔
303
        }
304
        err = a.store.SaveEvent(ctx, event)
6✔
305
        if errStack != nil {
10✔
306
                if err != nil {
5✔
307
                        err = errors.WithMessage(err, errStack.Error())
1✔
308
                } else {
4✔
309
                        err = errStack
3✔
310
                }
3✔
311
        }
312
        return err
6✔
313
}
314

315
func (a *app) ProvisionDevice(
316
        ctx context.Context,
317
        device model.DeviceEvent,
318
) error {
7✔
319
        integrations, err := a.GetIntegrations(ctx)
7✔
320
        if err != nil {
8✔
321
                return errors.Wrap(err, "failed to retrieve integrations")
1✔
322
        }
1✔
323
        var errStack model.ErrorStack
6✔
324
        event := model.Event{
6✔
325
                WebhookEvent: model.WebhookEvent{
6✔
326
                        ID:      uuid.New(),
6✔
327
                        Type:    model.EventTypeDeviceProvisioned,
6✔
328
                        Data:    device,
6✔
329
                        EventTS: time.Now(),
6✔
330
                },
6✔
331
                DeliveryStatus: make([]model.DeliveryStatus, 0, len(integrations)),
6✔
332
        }
6✔
333
        integrationIDs := make([]uuid.UUID, 0, len(integrations))
6✔
334
        for _, integration := range integrations {
12✔
335
                deliver := model.DeliveryStatus{
6✔
336
                        IntegrationID: integration.ID,
6✔
337
                        Success:       true,
6✔
338
                }
6✔
339
                switch integration.Provider {
6✔
UNCOV
340
                case model.ProviderIoTHub:
×
UNCOV
341
                        err = a.provisionIoTHubDevice(ctx, device.ID, integration)
×
UNCOV
342
                        integrationIDs = append(integrationIDs, integration.ID)
×
UNCOV
343
                case model.ProviderIoTCore:
×
UNCOV
344
                        err = a.provisionIoTCoreDevice(ctx, device.ID, integration, &iotcore.Device{
×
UNCOV
345
                                Status: iotcore.StatusEnabled,
×
UNCOV
346
                        })
×
UNCOV
347
                        integrationIDs = append(integrationIDs, integration.ID)
×
348
                case model.ProviderWebhook:
5✔
349
                        var (
5✔
350
                                req *http.Request
5✔
351
                                rsp *http.Response
5✔
352
                        )
5✔
353
                        req, err = client.NewWebhookRequest(ctx,
5✔
354
                                &integration.Credentials,
5✔
355
                                event.WebhookEvent)
5✔
356
                        if err != nil {
6✔
357
                                break // switch
1✔
358
                        }
359
                        rsp, err = a.httpClient.Do(req)
4✔
360
                        if err != nil {
5✔
361
                                break // switch
1✔
362
                        }
363
                        deliver.StatusCode = &rsp.StatusCode
3✔
364
                        if rsp.StatusCode >= 300 {
4✔
365
                                err = client.NewHTTPError(rsp.StatusCode)
1✔
366
                        }
1✔
367
                        _ = rsp.Body.Close()
3✔
368

369
                default:
1✔
370
                        continue
1✔
371
                }
372
                if err != nil {
8✔
373
                        var httpError client.HTTPError
3✔
374
                        if errors.As(err, &httpError) {
4✔
375
                                errCode := httpError.Code()
1✔
376
                                deliver.StatusCode = &errCode
1✔
377
                        }
1✔
378
                        deliver.Success = false
3✔
379
                        deliver.Error = err.Error()
3✔
380
                        _ = errStack.Push(err)
3✔
381
                }
382
                event.DeliveryStatus = append(event.DeliveryStatus, deliver)
5✔
383
        }
384
        _, err = a.store.UpsertDeviceIntegrations(ctx, device.ID, integrationIDs)
6✔
385
        errSave := a.store.SaveEvent(ctx, event)
6✔
386
        if errSave != nil {
7✔
387
                _ = errStack.Push(errSave)
1✔
388
        }
1✔
389
        if errStack != nil {
10✔
390
                if err != nil {
5✔
391
                        err = errors.Wrap(err, errStack.Error())
1✔
392
                } else {
4✔
393
                        err = errStack
3✔
394
                }
3✔
395
        }
396
        return err
6✔
397
}
398

399
func (a *app) syncBatch(
400
        ctx context.Context,
401
        devices []model.Device,
402
        integCache map[uuid.UUID]*model.Integration,
403
        failEarly bool,
404
) error {
3✔
405
        var err error
3✔
406
        l := log.FromContext(ctx)
3✔
407

3✔
408
        deviceMap := make(map[uuid.UUID][]string, len(integCache))
3✔
409
        for _, dev := range devices {
33✔
410
                for _, id := range dev.IntegrationIDs {
60✔
411
                        deviceMap[id] = append(deviceMap[id], dev.ID)
30✔
412
                }
30✔
413
        }
414

415
        for integID, deviceIDs := range deviceMap {
6✔
416
                integration, ok := integCache[integID]
3✔
417
                if !ok {
3✔
UNCOV
418
                        // (Data race) Try again to fetch the integration
×
UNCOV
419
                        integration, err = a.store.GetIntegrationById(ctx, integID)
×
UNCOV
420
                        if err != nil {
×
UNCOV
421
                                if err == store.ErrObjectNotFound {
×
UNCOV
422
                                        integCache[integID] = nil
×
UNCOV
423
                                        continue
×
424
                                }
425
                                err = errors.Wrap(err, "failed to retrieve device integration")
×
426
                                if failEarly {
×
427
                                        return err
×
428
                                }
×
429
                                l.Errorf("failed to get device integration: %s", err)
×
430
                                continue
×
UNCOV
431
                        } else {
×
UNCOV
432
                                integCache[integID] = integration
×
UNCOV
433
                        }
×
434
                }
435
                if integration == nil {
3✔
UNCOV
436
                        // Should not occur, but is not impossible since mongo client
×
UNCOV
437
                        // caches batches of results.
×
UNCOV
438
                        _, err := a.store.RemoveDevicesFromIntegration(ctx, integID)
×
UNCOV
439
                        if err != nil {
×
UNCOV
440
                                err = errors.Wrap(err, "failed to remove integration from devices")
×
UNCOV
441
                                if failEarly {
×
UNCOV
442
                                        return err
×
UNCOV
443
                                }
×
UNCOV
444
                                l.Error(err)
×
445
                        }
UNCOV
446
                        continue
×
447
                }
448

449
                switch integration.Provider {
3✔
450
                case model.ProviderIoTHub:
3✔
451
                        err := a.syncIoTHubDevices(ctx, deviceIDs, *integration, failEarly)
3✔
452
                        if err != nil {
3✔
UNCOV
453
                                if failEarly {
×
UNCOV
454
                                        return err
×
UNCOV
455
                                }
×
UNCOV
456
                                l.Error(err)
×
457
                        }
UNCOV
458
                case model.ProviderIoTCore:
×
UNCOV
459
                        err := a.syncIoTCoreDevices(ctx, deviceIDs, *integration, failEarly)
×
460
                        if err != nil {
×
461
                                if failEarly {
×
462
                                        return err
×
463
                                }
×
464
                                l.Error(err)
×
465
                        }
UNCOV
466
                default:
×
467
                }
468
        }
469

470
        return nil
3✔
471
}
472

473
func (a app) syncCacheIntegrations(ctx context.Context) (map[uuid.UUID]*model.Integration, error) {
3✔
474
        // NOTE At the time of writing this, we don't allow more than one
3✔
475
        //      integration per tenant so this const doesn't matter.
3✔
476
        // TODO Will we need a more sophisticated cache data structure?
3✔
477
        const MaxIntegrationsToCache = 20
3✔
478
        // Cache integrations for the given tenant
3✔
479
        integrations, err := a.store.GetIntegrations(
3✔
480
                ctx, model.IntegrationFilter{Limit: MaxIntegrationsToCache},
3✔
481
        )
3✔
482
        if err != nil {
3✔
483
                return nil, errors.Wrap(err, "failed to get integrations for tenant")
×
484
        }
×
485
        integCache := make(map[uuid.UUID]*model.Integration, len(integrations))
3✔
486
        for i := range integrations {
5✔
487
                integCache[integrations[i].ID] = &integrations[i]
2✔
488
        }
2✔
489
        return integCache, nil
3✔
490
}
491

492
func (a *app) SyncDevices(
493
        ctx context.Context,
494
        batchSize int,
495
        failEarly bool,
496
) error {
1✔
497
        type DeviceWithTenantID struct {
1✔
498
                model.Device `bson:",inline"`
1✔
499
                TenantID     string `bson:"tenant_id"`
1✔
500
        }
1✔
501
        iter, err := a.store.GetAllDevices(ctx)
1✔
502
        if err != nil {
1✔
503
                return err
×
504
        }
×
505
        defer iter.Close(ctx)
1✔
506

1✔
507
        var (
1✔
508
                deviceBatch        = make([]model.Device, 0, batchSize)
1✔
509
                tenantID    string = ""
1✔
510
                integCache  map[uuid.UUID]*model.Integration
1✔
511
        )
1✔
512
        tCtx := identity.WithContext(ctx, &identity.Identity{
1✔
513
                Tenant: tenantID,
1✔
514
        })
1✔
515
        integCache, err = a.syncCacheIntegrations(tCtx)
1✔
516
        if err != nil {
1✔
UNCOV
517
                return err
×
UNCOV
518
        }
×
519
        for iter.Next(ctx) {
31✔
520
                dev := DeviceWithTenantID{}
30✔
521
                err := iter.Decode(&dev)
30✔
522
                if err != nil {
30✔
UNCOV
523
                        return err
×
UNCOV
524
                }
×
525
                if len(deviceBatch) == cap(deviceBatch) ||
30✔
526
                        (tenantID != dev.TenantID && len(deviceBatch) > 0) {
32✔
527
                        err := a.syncBatch(tCtx, deviceBatch, integCache, failEarly)
2✔
528
                        if err != nil {
2✔
UNCOV
529
                                return err
×
UNCOV
530
                        }
×
531
                        deviceBatch = deviceBatch[:0]
2✔
532
                }
533
                if tenantID != dev.TenantID {
32✔
534
                        tenantID = dev.TenantID
2✔
535
                        tCtx = identity.WithContext(ctx, &identity.Identity{
2✔
536
                                Tenant: tenantID,
2✔
537
                        })
2✔
538

2✔
539
                        integCache, err = a.syncCacheIntegrations(tCtx)
2✔
540
                        if err != nil {
2✔
UNCOV
541
                                return err
×
UNCOV
542
                        }
×
543

544
                }
545
                deviceBatch = append(deviceBatch, dev.Device)
30✔
546
        }
547
        if len(deviceBatch) > 0 {
2✔
548
                err := a.syncBatch(tCtx, deviceBatch, integCache, failEarly)
1✔
549
                if err != nil {
1✔
UNCOV
550
                        return err
×
UNCOV
551
                }
×
552
        }
553
        return nil
1✔
554
}
555

556
func (a *app) DecommissionDevice(ctx context.Context, deviceID string) error {
16✔
557
        integrations, err := a.GetIntegrations(ctx)
16✔
558
        if err != nil {
19✔
559
                return err
3✔
560
        }
3✔
561
        var (
13✔
562
                errStack model.ErrorStack
13✔
563
                device   = newDevice(deviceID, a.store)
13✔
564
        )
13✔
565
        event := model.Event{
13✔
566
                WebhookEvent: model.WebhookEvent{
13✔
567
                        ID:   uuid.New(),
13✔
568
                        Type: model.EventTypeDeviceDecommissioned,
13✔
569
                        Data: model.DeviceEvent{
13✔
570
                                ID: deviceID,
13✔
571
                        },
13✔
572
                        EventTS: time.Now(),
13✔
573
                },
13✔
574
                DeliveryStatus: make([]model.DeliveryStatus, 0, len(integrations)),
13✔
575
        }
13✔
576
        for _, integration := range integrations {
29✔
577
                var (
16✔
578
                        err error
16✔
579
                        ok  bool
16✔
580
                )
16✔
581
                deliver := model.DeliveryStatus{
16✔
582
                        IntegrationID: integration.ID,
16✔
583
                        Success:       true,
16✔
584
                }
16✔
585
                switch integration.Provider {
16✔
586
                case model.ProviderIoTHub:
5✔
587
                        ok, err = device.HasIntegration(ctx, integration.ID)
5✔
588
                        if err != nil {
5✔
UNCOV
589
                                break // switch
×
590
                        } else if !ok {
6✔
591
                                continue // loop
1✔
592
                        }
593
                        err = a.decommissionIoTHubDevice(ctx, deviceID, integration)
4✔
594
                case model.ProviderIoTCore:
4✔
595
                        ok, err = device.HasIntegration(ctx, integration.ID)
4✔
596
                        if err != nil {
4✔
UNCOV
597
                                break // switch
×
598
                        } else if !ok {
4✔
UNCOV
599
                                continue // loop
×
600
                        }
601
                        err = a.decommissionIoTCoreDevice(ctx, deviceID, integration)
4✔
602
                case model.ProviderWebhook:
6✔
603
                        var (
6✔
604
                                req *http.Request
6✔
605
                                rsp *http.Response
6✔
606
                        )
6✔
607
                        req, err = client.NewWebhookRequest(ctx,
6✔
608
                                &integration.Credentials,
6✔
609
                                event.WebhookEvent)
6✔
610
                        if err != nil {
7✔
611
                                break // switch
1✔
612
                        }
613
                        rsp, err = a.httpClient.Do(req)
5✔
614
                        if err != nil {
7✔
615
                                break // switch
2✔
616
                        }
617
                        deliver.StatusCode = &rsp.StatusCode
3✔
618
                        if rsp.StatusCode >= 300 {
4✔
619
                                err = client.NewHTTPError(rsp.StatusCode)
1✔
620
                        }
1✔
621
                        _ = rsp.Body.Close()
3✔
622

623
                default:
1✔
624
                        continue
1✔
625
                }
626
                if err != nil {
19✔
627
                        var httpError client.HTTPError
5✔
628
                        if errors.As(err, &httpError) {
6✔
629
                                errCode := httpError.Code()
1✔
630
                                deliver.StatusCode = &errCode
1✔
631
                        }
1✔
632
                        deliver.Success = false
5✔
633
                        deliver.Error = err.Error()
5✔
634
                        _ = errStack.Push(err)
5✔
635
                }
636
                event.DeliveryStatus = append(event.DeliveryStatus, deliver)
14✔
637
        }
638
        err = a.store.DeleteDevice(ctx, deviceID)
13✔
639
        if err == store.ErrObjectNotFound {
16✔
640
                err = ErrDeviceNotFound
3✔
641
        }
3✔
642
        errSave := a.store.SaveEvent(ctx, event)
13✔
643
        if errSave != nil {
13✔
UNCOV
644
                _ = errStack.Push(errSave)
×
UNCOV
645
        }
×
646
        if errStack != nil {
18✔
647
                if err != nil {
6✔
648
                        err = errors.Wrap(err, errStack.Error())
1✔
649
                } else {
5✔
650
                        err = errStack
4✔
651
                }
4✔
652
        }
653
        return err
13✔
654
}
655

656
func (a *app) GetDevice(ctx context.Context, deviceID string) (*model.Device, error) {
3✔
657
        device, err := a.store.GetDevice(ctx, deviceID)
3✔
658
        if err == store.ErrObjectNotFound {
4✔
659
                return nil, ErrDeviceNotFound
1✔
660
        }
1✔
661
        return device, err
2✔
662
}
663

664
func (a *app) GetDeviceStateIntegration(
665
        ctx context.Context,
666
        deviceID string,
667
        integrationID uuid.UUID,
668
) (*model.DeviceState, error) {
7✔
669
        _, err := a.store.GetDeviceByIntegrationID(ctx, deviceID, integrationID)
7✔
670
        if err != nil {
9✔
671
                if err == store.ErrObjectNotFound {
3✔
672
                        return nil, ErrIntegrationNotFound
1✔
673
                }
1✔
674
                return nil, errors.Wrap(err, "failed to retrieve the device")
1✔
675
        }
676
        integration, err := a.store.GetIntegrationById(ctx, integrationID)
5✔
677
        if integration == nil && (err == nil || err == store.ErrObjectNotFound) {
6✔
678
                return nil, ErrIntegrationNotFound
1✔
679
        } else if err != nil {
6✔
680
                return nil, errors.Wrap(err, "failed to retrieve the integration")
1✔
681
        }
1✔
682
        switch integration.Provider {
3✔
683
        case model.ProviderIoTHub:
1✔
684
                return a.GetDeviceStateIoTHub(ctx, deviceID, integration)
1✔
685
        case model.ProviderIoTCore:
1✔
686
                return a.GetDeviceStateIoTCore(ctx, deviceID, integration)
1✔
687
        default:
1✔
688
                return nil, ErrUnknownIntegration
1✔
689
        }
690
}
691

692
func (a *app) SetDeviceStateIntegration(
693
        ctx context.Context,
694
        deviceID string,
695
        integrationID uuid.UUID,
696
        state *model.DeviceState,
697
) (*model.DeviceState, error) {
6✔
698
        device, err := a.store.GetDeviceByIntegrationID(ctx, deviceID, integrationID)
6✔
699
        if err != nil {
7✔
700
                return nil, errors.Wrap(err, "failed to retrieve the device")
1✔
701
        } else if device == nil {
7✔
702
                return nil, ErrIntegrationNotFound
1✔
703
        }
1✔
704
        integration, err := a.store.GetIntegrationById(ctx, integrationID)
4✔
705
        if integration == nil && (err == nil || err == store.ErrObjectNotFound) {
5✔
706
                return nil, ErrIntegrationNotFound
1✔
707
        } else if err != nil {
5✔
708
                return nil, errors.Wrap(err, "failed to retrieve the integration")
1✔
709
        }
1✔
710
        switch integration.Provider {
2✔
711
        case model.ProviderIoTHub:
1✔
712
                return a.SetDeviceStateIoTHub(ctx, deviceID, integration, state)
1✔
UNCOV
713
        case model.ProviderIoTCore:
×
UNCOV
714
                return a.SetDeviceStateIoTCore(ctx, deviceID, integration, state)
×
715
        default:
1✔
716
                return nil, ErrUnknownIntegration
1✔
717
        }
718
}
719

720
func (a *app) GetEvents(ctx context.Context, filter model.EventsFilter) ([]model.Event, error) {
4✔
721
        return a.store.GetEvents(ctx, filter)
4✔
722
}
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc