• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / iot-manager / 1336988003

18 Jun 2024 09:38AM UTC coverage: 87.602%. Remained the same
1336988003

Pull #288

gitlab-ci

alfrunes
test(accpetance): Infer Docker compose service name from host

Remove hard-coded host name from config and actually use the `--host`
pytest config.

Signed-off-by: Alf-Rune Siqveland <alf.rune@northern.tech>
Pull Request #288: test(accpetance): Infer Docker compose service name from host

3229 of 3686 relevant lines covered (87.6%)

11.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.33
/app/app.go
1
// Copyright 2024 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package app
16

17
import (
18
        "context"
19
        "net/http"
20
        "time"
21

22
        "github.com/google/uuid"
23
        "github.com/pkg/errors"
24

25
        "github.com/mendersoftware/go-lib-micro/identity"
26
        "github.com/mendersoftware/go-lib-micro/log"
27

28
        "github.com/mendersoftware/iot-manager/client"
29
        "github.com/mendersoftware/iot-manager/client/devauth"
30
        "github.com/mendersoftware/iot-manager/client/iotcore"
31
        "github.com/mendersoftware/iot-manager/client/iothub"
32
        "github.com/mendersoftware/iot-manager/client/workflows"
33
        "github.com/mendersoftware/iot-manager/model"
34
        "github.com/mendersoftware/iot-manager/store"
35
)
36

37
var (
38
        ErrIntegrationNotFound = errors.New("integration not found")
39
        ErrIntegrationExists   = errors.New("integration already exists")
40
        ErrUnknownIntegration  = errors.New("unknown integration provider")
41
        ErrNoCredentials       = errors.New("no connection string or credentials " +
42
                "configured for the tenant")
43
        ErrNoDeviceConnectionString = errors.New("device has no connection string")
44

45
        ErrDeviceAlreadyExists     = errors.New("device already exists")
46
        ErrDeviceNotFound          = errors.New("device not found")
47
        ErrDeviceStateConflict     = errors.New("conflict when updating the device state")
48
        ErrCannotRemoveIntegration = errors.New("cannot remove integration in use by devices")
49
)
50

51
const (
52
        confKeyPrimaryKey     = "azureConnectionString"
53
        confKeyAWSCertificate = "awsCertificate"
54
        confKeyAWSPrivateKey  = "awsPrivateKey"
55
        confKeyAWSEndpoint    = "awsEndpoint"
56
)
57

58
// App interface describes app objects
59
//
60
//nolint:lll
61
//go:generate ../utils/mockgen.sh
62
type App interface {
63
        WithIoTCore(client iotcore.Client) App
64
        WithIoTHub(client iothub.Client) App
65
        WithWebhooksTimeout(timeout uint) App
66
        HealthCheck(context.Context) error
67
        GetDeviceIntegrations(context.Context, string) ([]model.Integration, error)
68
        GetIntegrations(context.Context) ([]model.Integration, error)
69
        GetIntegrationById(context.Context, uuid.UUID) (*model.Integration, error)
70
        CreateIntegration(context.Context, model.Integration) (*model.Integration, error)
71
        SetDeviceStatus(context.Context, string, model.Status) error
72
        SetIntegrationCredentials(context.Context, uuid.UUID, model.Credentials) error
73
        RemoveIntegration(context.Context, uuid.UUID) error
74
        GetDevice(context.Context, string) (*model.Device, error)
75
        GetDeviceStateIntegration(context.Context, string, uuid.UUID) (*model.DeviceState, error)
76
        SetDeviceStateIntegration(context.Context, string, uuid.UUID, *model.DeviceState) (*model.DeviceState, error)
77
        GetDeviceStateIoTHub(context.Context, string, *model.Integration) (*model.DeviceState, error)
78
        SetDeviceStateIoTHub(context.Context, string, *model.Integration, *model.DeviceState) (*model.DeviceState, error)
79
        GetDeviceStateIoTCore(context.Context, string, *model.Integration) (*model.DeviceState, error)
80
        SetDeviceStateIoTCore(context.Context, string, *model.Integration, *model.DeviceState) (*model.DeviceState, error)
81
        ProvisionDevice(context.Context, model.DeviceEvent) error
82
        DecommissionDevice(context.Context, string) error
83

84
        SyncDevices(context.Context, int, bool) error
85

86
        GetEvents(ctx context.Context, filter model.EventsFilter) ([]model.Event, error)
87
        VerifyDeviceTwin(ctx context.Context, req model.PreauthRequest) error
88
}
89

90
// app is an app object
91
type app struct {
92
        store           store.DataStore
93
        iothubClient    iothub.Client
94
        iotcoreClient   iotcore.Client
95
        wf              workflows.Client
96
        devauth         devauth.Client
97
        httpClient      *http.Client
98
        webhooksTimeout time.Duration
99
}
100

101
// NewApp initialize a new iot-manager App
102
func New(ds store.DataStore, wf workflows.Client, da devauth.Client) App {
94✔
103
        c := client.New()
94✔
104
        hubClient := iothub.NewClient(
94✔
105
                iothub.NewOptions().SetClient(c),
94✔
106
        )
94✔
107
        return &app{
94✔
108
                store:        ds,
94✔
109
                wf:           wf,
94✔
110
                devauth:      da,
94✔
111
                iothubClient: hubClient,
94✔
112
                httpClient:   c,
94✔
113
        }
94✔
114
}
94✔
115

116
// WithIoTHub sets the IoT Hub client
117
func (a *app) WithIoTHub(client iothub.Client) App {
34✔
118
        a.iothubClient = client
34✔
119
        return a
34✔
120
}
34✔
121

122
// WithIoTCore sets the IoT Core client
123
func (a *app) WithIoTCore(client iotcore.Client) App {
27✔
124
        a.iotcoreClient = client
27✔
125
        return a
27✔
126
}
27✔
127

128
// WithWebhooksTimeout sets the timeout for webhooks requests
129
func (a *app) WithWebhooksTimeout(timeout uint) App {
4✔
130
        a.webhooksTimeout = time.Duration(timeout * uint(time.Second))
4✔
131
        return a
4✔
132
}
4✔
133

134
// HealthCheck performs a health check and returns an error if it fails
135
func (a *app) HealthCheck(ctx context.Context) error {
2✔
136
        return a.store.Ping(ctx)
2✔
137
}
2✔
138

139
func (a *app) GetIntegrations(ctx context.Context) ([]model.Integration, error) {
37✔
140
        return a.store.GetIntegrations(ctx, model.IntegrationFilter{})
37✔
141
}
37✔
142

143
func (a *app) GetIntegrationById(ctx context.Context, id uuid.UUID) (*model.Integration, error) {
3✔
144
        integration, err := a.store.GetIntegrationById(ctx, id)
3✔
145
        if err != nil {
5✔
146
                switch cause := errors.Cause(err); cause {
2✔
147
                case store.ErrObjectNotFound:
1✔
148
                        return nil, ErrIntegrationNotFound
1✔
149
                default:
1✔
150
                        return nil, err
1✔
151
                }
152
        }
153
        return integration, err
1✔
154
}
155

156
func (a *app) CreateIntegration(
157
        ctx context.Context,
158
        integration model.Integration,
159
) (*model.Integration, error) {
10✔
160
        result, err := a.store.CreateIntegration(ctx, integration)
10✔
161
        if err == store.ErrObjectExists {
10✔
162
                return nil, ErrIntegrationExists
×
163
        }
×
164
        return result, err
10✔
165
}
166

167
func (a *app) SetIntegrationCredentials(
168
        ctx context.Context,
169
        integrationID uuid.UUID,
170
        credentials model.Credentials,
171
) error {
3✔
172
        err := a.store.SetIntegrationCredentials(ctx, integrationID, credentials)
3✔
173
        if err != nil {
5✔
174
                switch cause := errors.Cause(err); cause {
2✔
175
                case store.ErrObjectNotFound:
1✔
176
                        return ErrIntegrationNotFound
1✔
177
                default:
1✔
178
                        return err
1✔
179
                }
180
        }
181
        return err
1✔
182
}
183

184
func (a *app) RemoveIntegration(
185
        ctx context.Context,
186
        integrationID uuid.UUID,
187
) error {
5✔
188
        itg, err := a.store.GetIntegrationById(ctx, integrationID)
5✔
189
        if err == nil {
9✔
190
                if itg.Provider != model.ProviderWebhook {
8✔
191
                        // check if there are any devices with given integration enabled
4✔
192
                        devicesExist, err := a.store.
4✔
193
                                DoDevicesExistByIntegrationID(ctx, integrationID)
4✔
194
                        if err != nil {
5✔
195
                                return err
1✔
196
                        }
1✔
197
                        if devicesExist {
4✔
198
                                return ErrCannotRemoveIntegration
1✔
199
                        }
1✔
200
                }
201
                err = a.store.RemoveIntegration(ctx, integrationID)
2✔
202
        }
203
        if errors.Is(err, store.ErrObjectNotFound) {
4✔
204
                return ErrIntegrationNotFound
1✔
205
        }
1✔
206
        return err
2✔
207
}
208

209
func (a *app) GetDeviceIntegrations(
210
        ctx context.Context,
211
        deviceID string,
212
) ([]model.Integration, error) {
×
213
        device, err := a.store.GetDevice(ctx, deviceID)
×
214
        if err != nil {
×
215
                if err == store.ErrObjectNotFound {
×
216
                        return nil, ErrDeviceNotFound
×
217
                }
×
218
                return nil, errors.Wrap(err, "app: failed to get device integrations")
×
219
        }
220
        if len(device.IntegrationIDs) > 0 {
×
221
                integrations, err := a.store.GetIntegrations(ctx,
×
222
                        model.IntegrationFilter{IDs: device.IntegrationIDs},
×
223
                )
×
224
                return integrations, errors.Wrap(err, "app: failed to get device integrations")
×
225
        }
×
226
        return []model.Integration{}, nil
×
227
}
228

229
func (a *app) SetDeviceStatus(ctx context.Context, deviceID string, status model.Status) error {
8✔
230
        go func() {
16✔
231
                ctxWithTimeout, cancel := context.WithTimeout(context.Background(), a.webhooksTimeout)
8✔
232
                ctxWithTimeout = identity.WithContext(ctxWithTimeout, identity.FromContext(ctx))
8✔
233
                defer cancel()
8✔
234
                runAndLogError(ctxWithTimeout, func() error {
16✔
235
                        return a.setDeviceStatus(ctxWithTimeout, deviceID, status)
8✔
236
                })
8✔
237
        }()
238
        return nil
8✔
239
}
240

241
func (a *app) setDeviceStatus(ctx context.Context, deviceID string, status model.Status) error {
8✔
242
        integrations, err := a.store.GetIntegrations(ctx, model.IntegrationFilter{})
8✔
243
        if err != nil {
10✔
244
                if errors.Is(err, store.ErrObjectNotFound) {
3✔
245
                        return nil
1✔
246
                }
1✔
247
                return errors.Wrap(err, "failed to retrieve integrations")
1✔
248
        }
249
        event := model.Event{
6✔
250
                WebhookEvent: model.WebhookEvent{
6✔
251
                        ID:   uuid.New(),
6✔
252
                        Type: model.EventTypeDeviceStatusChanged,
6✔
253
                        Data: model.DeviceEvent{
6✔
254
                                ID:     deviceID,
6✔
255
                                Status: status,
6✔
256
                        },
6✔
257
                        EventTS: time.Now(),
6✔
258
                },
6✔
259
                DeliveryStatus: make([]model.DeliveryStatus, 0, len(integrations)),
6✔
260
        }
6✔
261

6✔
262
        var (
6✔
263
                ok     bool
6✔
264
                device = newDevice(deviceID, a.store)
6✔
265
        )
6✔
266
        for _, integration := range integrations {
15✔
267
                deliver := model.DeliveryStatus{
9✔
268
                        IntegrationID: integration.ID,
9✔
269
                        Success:       true,
9✔
270
                }
9✔
271
                switch integration.Provider {
9✔
272
                case model.ProviderIoTHub:
1✔
273
                        ok, err = device.HasIntegration(ctx, integration.ID)
1✔
274
                        if err != nil {
1✔
275
                                break // switch
×
276
                        } else if !ok {
1✔
277
                                continue // loop
×
278
                        }
279
                        err = a.setDeviceStatusIoTHub(ctx, deviceID, status, integration)
1✔
280

281
                case model.ProviderIoTCore:
1✔
282
                        ok, err = device.HasIntegration(ctx, integration.ID)
1✔
283
                        if err != nil {
1✔
284
                                break // switch
×
285
                        } else if !ok {
1✔
286
                                continue // loop
×
287
                        }
288
                        err = a.setDeviceStatusIoTCore(ctx, deviceID, status, integration)
1✔
289

290
                case model.ProviderWebhook:
6✔
291
                        var (
6✔
292
                                req *http.Request
6✔
293
                                rsp *http.Response
6✔
294
                        )
6✔
295
                        req, err = client.NewWebhookRequest(ctx,
6✔
296
                                &integration.Credentials,
6✔
297
                                event.WebhookEvent)
6✔
298
                        if err != nil {
7✔
299
                                break // switch
1✔
300
                        }
301
                        rsp, err = a.httpClient.Do(req)
5✔
302
                        if err != nil {
7✔
303
                                break // switch
2✔
304
                        }
305
                        deliver.StatusCode = &rsp.StatusCode
3✔
306
                        if rsp.StatusCode >= 300 {
4✔
307
                                err = client.NewHTTPError(rsp.StatusCode)
1✔
308
                        }
1✔
309
                        _ = rsp.Body.Close()
3✔
310

311
                default:
1✔
312
                        continue
1✔
313
                }
314
                if err != nil {
12✔
315
                        var httpError client.HTTPError
4✔
316
                        if errors.As(err, &httpError) {
5✔
317
                                errCode := httpError.Code()
1✔
318
                                deliver.StatusCode = &errCode
1✔
319
                        }
1✔
320
                        deliver.Success = false
4✔
321
                        deliver.Error = err.Error()
4✔
322
                }
323
                event.DeliveryStatus = append(event.DeliveryStatus, deliver)
8✔
324
        }
325
        err = a.store.SaveEvent(ctx, event)
6✔
326
        return err
6✔
327
}
328

329
func (a *app) ProvisionDevice(
330
        ctx context.Context,
331
        device model.DeviceEvent,
332
) error {
8✔
333
        go func() {
16✔
334
                ctxWithTimeout, cancel := context.WithTimeout(context.Background(), a.webhooksTimeout)
8✔
335
                ctxWithTimeout = identity.WithContext(ctxWithTimeout, identity.FromContext(ctx))
8✔
336
                defer cancel()
8✔
337
                runAndLogError(ctxWithTimeout, func() error {
16✔
338
                        return a.provisionDevice(ctxWithTimeout, device)
8✔
339
                })
8✔
340
        }()
341
        return nil
8✔
342
}
343

344
func (a *app) provisionDevice(
345
        ctx context.Context,
346
        device model.DeviceEvent,
347
) error {
8✔
348
        integrations, err := a.GetIntegrations(ctx)
8✔
349
        if err != nil {
10✔
350
                if errors.Is(err, store.ErrObjectNotFound) {
3✔
351
                        return nil
1✔
352
                }
1✔
353
                return errors.Wrap(err, "failed to retrieve integrations")
1✔
354
        }
355
        event := model.Event{
6✔
356
                WebhookEvent: model.WebhookEvent{
6✔
357
                        ID:      uuid.New(),
6✔
358
                        Type:    model.EventTypeDeviceProvisioned,
6✔
359
                        Data:    device,
6✔
360
                        EventTS: time.Now(),
6✔
361
                },
6✔
362
                DeliveryStatus: make([]model.DeliveryStatus, 0, len(integrations)),
6✔
363
        }
6✔
364
        integrationIDs := make([]uuid.UUID, 0, len(integrations))
6✔
365
        for _, integration := range integrations {
12✔
366
                deliver := model.DeliveryStatus{
6✔
367
                        IntegrationID: integration.ID,
6✔
368
                        Success:       true,
6✔
369
                }
6✔
370
                switch integration.Provider {
6✔
371
                case model.ProviderIoTHub:
×
372
                        err = a.provisionIoTHubDevice(ctx, device.ID, integration)
×
373
                        integrationIDs = append(integrationIDs, integration.ID)
×
374
                case model.ProviderIoTCore:
×
375
                        err = a.provisionIoTCoreDevice(ctx, device.ID, integration, &iotcore.Device{
×
376
                                Status: iotcore.StatusEnabled,
×
377
                        })
×
378
                        integrationIDs = append(integrationIDs, integration.ID)
×
379
                case model.ProviderWebhook:
5✔
380
                        var (
5✔
381
                                req *http.Request
5✔
382
                                rsp *http.Response
5✔
383
                        )
5✔
384
                        req, err = client.NewWebhookRequest(ctx,
5✔
385
                                &integration.Credentials,
5✔
386
                                event.WebhookEvent)
5✔
387
                        if err != nil {
6✔
388
                                break // switch
1✔
389
                        }
390
                        rsp, err = a.httpClient.Do(req)
4✔
391
                        if err != nil {
5✔
392
                                break // switch
1✔
393
                        }
394
                        deliver.StatusCode = &rsp.StatusCode
3✔
395
                        if rsp.StatusCode >= 300 {
4✔
396
                                err = client.NewHTTPError(rsp.StatusCode)
1✔
397
                        }
1✔
398
                        _ = rsp.Body.Close()
3✔
399

400
                default:
1✔
401
                        continue
1✔
402
                }
403
                if err != nil {
8✔
404
                        var httpError client.HTTPError
3✔
405
                        if errors.As(err, &httpError) {
4✔
406
                                errCode := httpError.Code()
1✔
407
                                deliver.StatusCode = &errCode
1✔
408
                        }
1✔
409
                        deliver.Success = false
3✔
410
                        deliver.Error = err.Error()
3✔
411
                }
412
                event.DeliveryStatus = append(event.DeliveryStatus, deliver)
5✔
413
        }
414
        _, err = a.store.UpsertDeviceIntegrations(ctx, device.ID, integrationIDs)
6✔
415
        if err != nil {
7✔
416
                var statusCodeInternal = 1500
1✔
417
                for i := range event.DeliveryStatus {
1✔
418
                        stat := &event.DeliveryStatus[i]
×
419
                        if stat.Error == "" {
×
420
                                stat.Error = "failed to connect device to integration"
×
421
                                stat.StatusCode = &statusCodeInternal
×
422
                        }
×
423
                        stat.Success = false
×
424
                }
425
                // Use 'panic' field to log potential data inconsistency
426
                log.FromContext(ctx).
1✔
427
                        WithField("panic", err.Error()).
1✔
428
                        Error("failed to connect device integration")
1✔
429
        }
430
        err = a.store.SaveEvent(ctx, event)
6✔
431
        return err
6✔
432
}
433

434
func (a *app) syncBatch(
435
        ctx context.Context,
436
        devices []model.Device,
437
        integCache map[uuid.UUID]*model.Integration,
438
        failEarly bool,
439
) error {
6✔
440
        var err error
6✔
441
        l := log.FromContext(ctx)
6✔
442

6✔
443
        deviceMap := make(map[uuid.UUID][]string, len(integCache))
6✔
444
        for _, dev := range devices {
66✔
445
                for _, id := range dev.IntegrationIDs {
120✔
446
                        deviceMap[id] = append(deviceMap[id], dev.ID)
60✔
447
                }
60✔
448
        }
449

450
        for integID, deviceIDs := range deviceMap {
12✔
451
                integration, ok := integCache[integID]
6✔
452
                if !ok {
6✔
453
                        // (Data race) Try again to fetch the integration
×
454
                        integration, err = a.store.GetIntegrationById(ctx, integID)
×
455
                        if err != nil {
×
456
                                if err == store.ErrObjectNotFound {
×
457
                                        integCache[integID] = nil
×
458
                                        continue
×
459
                                }
460
                                err = errors.Wrap(err, "failed to retrieve device integration")
×
461
                                if failEarly {
×
462
                                        return err
×
463
                                }
×
464
                                l.Errorf("failed to get device integration: %s", err)
×
465
                                continue
×
466
                        } else {
×
467
                                integCache[integID] = integration
×
468
                        }
×
469
                }
470
                if integration == nil {
6✔
471
                        // Should not occur, but is not impossible since mongo client
×
472
                        // caches batches of results.
×
473
                        _, err := a.store.RemoveDevicesFromIntegration(ctx, integID)
×
474
                        if err != nil {
×
475
                                err = errors.Wrap(err, "failed to remove integration from devices")
×
476
                                if failEarly {
×
477
                                        return err
×
478
                                }
×
479
                                l.Error(err)
×
480
                        }
481
                        continue
×
482
                }
483

484
                switch integration.Provider {
6✔
485
                case model.ProviderIoTHub:
3✔
486
                        err := a.syncIoTHubDevices(ctx, deviceIDs, *integration, failEarly)
3✔
487
                        if err != nil {
3✔
488
                                if failEarly {
×
489
                                        return err
×
490
                                }
×
491
                                l.Error(err)
×
492
                        }
493
                case model.ProviderIoTCore:
3✔
494
                        err := a.syncIoTCoreDevices(ctx, deviceIDs, *integration, failEarly)
3✔
495
                        if err != nil {
3✔
496
                                if failEarly {
×
497
                                        return err
×
498
                                }
×
499
                                l.Error(err)
×
500
                        }
501
                default:
×
502
                }
503
        }
504

505
        return nil
6✔
506
}
507

508
func (a app) syncCacheIntegrations(ctx context.Context) (map[uuid.UUID]*model.Integration, error) {
6✔
509
        // NOTE At the time of writing this, we don't allow more than one
6✔
510
        //      integration per tenant so this const doesn't matter.
6✔
511
        // TODO Will we need a more sophisticated cache data structure?
6✔
512
        const MaxIntegrationsToCache = 20
6✔
513
        // Cache integrations for the given tenant
6✔
514
        integrations, err := a.store.GetIntegrations(
6✔
515
                ctx, model.IntegrationFilter{Limit: MaxIntegrationsToCache},
6✔
516
        )
6✔
517
        if err != nil {
6✔
518
                return nil, errors.Wrap(err, "failed to get integrations for tenant")
×
519
        }
×
520
        integCache := make(map[uuid.UUID]*model.Integration, len(integrations))
6✔
521
        for i := range integrations {
10✔
522
                integCache[integrations[i].ID] = &integrations[i]
4✔
523
        }
4✔
524
        return integCache, nil
6✔
525
}
526

527
func (a *app) SyncDevices(
528
        ctx context.Context,
529
        batchSize int,
530
        failEarly bool,
531
) error {
2✔
532
        type DeviceWithTenantID struct {
2✔
533
                model.Device `bson:",inline"`
2✔
534
                TenantID     string `bson:"tenant_id"`
2✔
535
        }
2✔
536
        iter, err := a.store.GetAllDevices(ctx)
2✔
537
        if err != nil {
2✔
538
                return err
×
539
        }
×
540
        defer iter.Close(ctx)
2✔
541

2✔
542
        var (
2✔
543
                deviceBatch        = make([]model.Device, 0, batchSize)
2✔
544
                tenantID    string = ""
2✔
545
                integCache  map[uuid.UUID]*model.Integration
2✔
546
        )
2✔
547
        tCtx := identity.WithContext(ctx, &identity.Identity{
2✔
548
                Tenant: tenantID,
2✔
549
        })
2✔
550
        integCache, err = a.syncCacheIntegrations(tCtx)
2✔
551
        if err != nil {
2✔
552
                return err
×
553
        }
×
554
        for iter.Next(ctx) {
62✔
555
                dev := DeviceWithTenantID{}
60✔
556
                err := iter.Decode(&dev)
60✔
557
                if err != nil {
60✔
558
                        return err
×
559
                }
×
560
                if len(deviceBatch) == cap(deviceBatch) ||
60✔
561
                        (tenantID != dev.TenantID && len(deviceBatch) > 0) {
64✔
562
                        err := a.syncBatch(tCtx, deviceBatch, integCache, failEarly)
4✔
563
                        if err != nil {
4✔
564
                                return err
×
565
                        }
×
566
                        deviceBatch = deviceBatch[:0]
4✔
567
                }
568
                if tenantID != dev.TenantID {
64✔
569
                        tenantID = dev.TenantID
4✔
570
                        tCtx = identity.WithContext(ctx, &identity.Identity{
4✔
571
                                Tenant: tenantID,
4✔
572
                        })
4✔
573

4✔
574
                        integCache, err = a.syncCacheIntegrations(tCtx)
4✔
575
                        if err != nil {
4✔
576
                                return err
×
577
                        }
×
578

579
                }
580
                deviceBatch = append(deviceBatch, dev.Device)
60✔
581
        }
582
        if len(deviceBatch) > 0 {
4✔
583
                err := a.syncBatch(tCtx, deviceBatch, integCache, failEarly)
2✔
584
                if err != nil {
2✔
585
                        return err
×
586
                }
×
587
        }
588
        return nil
2✔
589
}
590

591
func (a *app) DecommissionDevice(ctx context.Context, deviceID string) error {
7✔
592
        go func() {
14✔
593
                ctxWithTimeout, cancel := context.WithTimeout(context.Background(), a.webhooksTimeout)
7✔
594
                ctxWithTimeout = identity.WithContext(ctxWithTimeout, identity.FromContext(ctx))
7✔
595
                defer cancel()
7✔
596
                runAndLogError(ctxWithTimeout, func() error {
14✔
597
                        return a.decommissionDevice(ctxWithTimeout, deviceID)
7✔
598
                })
7✔
599
        }()
600
        return nil
7✔
601
}
602

603
func (a *app) decommissionDevice(ctx context.Context, deviceID string) error {
19✔
604
        integrations, err := a.GetIntegrations(ctx)
19✔
605
        if err != nil {
22✔
606
                return err
3✔
607
        }
3✔
608
        var (
16✔
609
                device = newDevice(deviceID, a.store)
16✔
610
        )
16✔
611
        event := model.Event{
16✔
612
                WebhookEvent: model.WebhookEvent{
16✔
613
                        ID:   uuid.New(),
16✔
614
                        Type: model.EventTypeDeviceDecommissioned,
16✔
615
                        Data: model.DeviceEvent{
16✔
616
                                ID: deviceID,
16✔
617
                        },
16✔
618
                        EventTS: time.Now(),
16✔
619
                },
16✔
620
                DeliveryStatus: make([]model.DeliveryStatus, 0, len(integrations)),
16✔
621
        }
16✔
622
        for _, integration := range integrations {
35✔
623
                var (
19✔
624
                        err error
19✔
625
                        ok  bool
19✔
626
                )
19✔
627
                deliver := model.DeliveryStatus{
19✔
628
                        IntegrationID: integration.ID,
19✔
629
                        Success:       true,
19✔
630
                }
19✔
631
                switch integration.Provider {
19✔
632
                case model.ProviderIoTHub:
5✔
633
                        ok, err = device.HasIntegration(ctx, integration.ID)
5✔
634
                        if err != nil {
5✔
635
                                break // switch
×
636
                        } else if !ok {
6✔
637
                                continue // loop
1✔
638
                        }
639
                        err = a.decommissionIoTHubDevice(ctx, deviceID, integration)
4✔
640
                case model.ProviderIoTCore:
7✔
641
                        ok, err = device.HasIntegration(ctx, integration.ID)
7✔
642
                        if err != nil {
7✔
643
                                break // switch
×
644
                        } else if !ok {
7✔
645
                                continue // loop
×
646
                        }
647
                        err = a.decommissionIoTCoreDevice(ctx, deviceID, integration)
7✔
648
                case model.ProviderWebhook:
6✔
649
                        var (
6✔
650
                                req *http.Request
6✔
651
                                rsp *http.Response
6✔
652
                        )
6✔
653
                        req, err = client.NewWebhookRequest(ctx,
6✔
654
                                &integration.Credentials,
6✔
655
                                event.WebhookEvent)
6✔
656
                        if err != nil {
7✔
657
                                break // switch
1✔
658
                        }
659
                        rsp, err = a.httpClient.Do(req)
5✔
660
                        if err != nil {
7✔
661
                                break // switch
2✔
662
                        }
663
                        deliver.StatusCode = &rsp.StatusCode
3✔
664
                        if rsp.StatusCode >= 300 {
4✔
665
                                err = client.NewHTTPError(rsp.StatusCode)
1✔
666
                        }
1✔
667
                        _ = rsp.Body.Close()
3✔
668

669
                default:
1✔
670
                        continue
1✔
671
                }
672
                if err != nil {
25✔
673
                        var httpError client.HTTPError
8✔
674
                        if errors.As(err, &httpError) {
9✔
675
                                errCode := httpError.Code()
1✔
676
                                deliver.StatusCode = &errCode
1✔
677
                        }
1✔
678
                        deliver.Success = false
8✔
679
                        deliver.Error = err.Error()
8✔
680
                }
681
                event.DeliveryStatus = append(event.DeliveryStatus, deliver)
17✔
682
        }
683
        err = a.store.DeleteDevice(ctx, deviceID)
16✔
684
        if errors.Is(err, store.ErrObjectNotFound) {
19✔
685
                err = nil
3✔
686
        }
3✔
687
        if err != nil {
17✔
688
                // Add the `panic` keyword field so we can trace database inconsistencies
1✔
689
                log.FromContext(ctx).
1✔
690
                        WithField("panic", err.Error()).
1✔
691
                        Errorf("failed to remove device from database: %s", err.Error())
1✔
692
        }
1✔
693
        err = a.store.SaveEvent(ctx, event)
16✔
694
        return err
16✔
695
}
696

697
func (a *app) GetDevice(ctx context.Context, deviceID string) (*model.Device, error) {
3✔
698
        device, err := a.store.GetDevice(ctx, deviceID)
3✔
699
        if err == store.ErrObjectNotFound {
4✔
700
                return nil, ErrDeviceNotFound
1✔
701
        }
1✔
702
        return device, err
2✔
703
}
704

705
func (a *app) GetDeviceStateIntegration(
706
        ctx context.Context,
707
        deviceID string,
708
        integrationID uuid.UUID,
709
) (*model.DeviceState, error) {
7✔
710
        _, err := a.store.GetDeviceByIntegrationID(ctx, deviceID, integrationID)
7✔
711
        if err != nil {
9✔
712
                if err == store.ErrObjectNotFound {
3✔
713
                        return nil, ErrIntegrationNotFound
1✔
714
                }
1✔
715
                return nil, errors.Wrap(err, "failed to retrieve the device")
1✔
716
        }
717
        integration, err := a.store.GetIntegrationById(ctx, integrationID)
5✔
718
        if integration == nil && (err == nil || err == store.ErrObjectNotFound) {
6✔
719
                return nil, ErrIntegrationNotFound
1✔
720
        } else if err != nil {
6✔
721
                return nil, errors.Wrap(err, "failed to retrieve the integration")
1✔
722
        }
1✔
723
        switch integration.Provider {
3✔
724
        case model.ProviderIoTHub:
1✔
725
                return a.GetDeviceStateIoTHub(ctx, deviceID, integration)
1✔
726
        case model.ProviderIoTCore:
1✔
727
                return a.GetDeviceStateIoTCore(ctx, deviceID, integration)
1✔
728
        default:
1✔
729
                return nil, ErrUnknownIntegration
1✔
730
        }
731
}
732

733
func (a *app) SetDeviceStateIntegration(
734
        ctx context.Context,
735
        deviceID string,
736
        integrationID uuid.UUID,
737
        state *model.DeviceState,
738
) (*model.DeviceState, error) {
6✔
739
        device, err := a.store.GetDeviceByIntegrationID(ctx, deviceID, integrationID)
6✔
740
        if err != nil {
7✔
741
                return nil, errors.Wrap(err, "failed to retrieve the device")
1✔
742
        } else if device == nil {
7✔
743
                return nil, ErrIntegrationNotFound
1✔
744
        }
1✔
745
        integration, err := a.store.GetIntegrationById(ctx, integrationID)
4✔
746
        if integration == nil && (err == nil || err == store.ErrObjectNotFound) {
5✔
747
                return nil, ErrIntegrationNotFound
1✔
748
        } else if err != nil {
5✔
749
                return nil, errors.Wrap(err, "failed to retrieve the integration")
1✔
750
        }
1✔
751
        switch integration.Provider {
2✔
752
        case model.ProviderIoTHub:
1✔
753
                return a.SetDeviceStateIoTHub(ctx, deviceID, integration, state)
1✔
754
        case model.ProviderIoTCore:
×
755
                return a.SetDeviceStateIoTCore(ctx, deviceID, integration, state)
×
756
        default:
1✔
757
                return nil, ErrUnknownIntegration
1✔
758
        }
759
}
760

761
func (a *app) GetEvents(ctx context.Context, filter model.EventsFilter) ([]model.Event, error) {
4✔
762
        return a.store.GetEvents(ctx, filter)
4✔
763
}
4✔
764

765
func runAndLogError(ctx context.Context, f func() error) {
24✔
766
        var err error
24✔
767
        logger := log.FromContext(ctx)
24✔
768
        defer func() {
48✔
769
                if r := recover(); r != nil {
24✔
770
                        logger.WithField("panic", r).
×
771
                                Error(errors.Wrap(err, "panic processing asynchronous webhook"))
×
772
                } else if err != nil {
29✔
773
                        logger.WithField("error", err.Error()).
5✔
774
                                Error(errors.Wrap(err, "failed to process an asynchronous webhook"))
5✔
775
                }
5✔
776
        }()
777
        err = f()
24✔
778
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc