• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / iot-manager / 1401702106

05 Aug 2024 08:32PM UTC coverage: 87.577%. Remained the same
1401702106

push

gitlab-ci

web-flow
Merge pull request #295 from mendersoftware/dependabot/docker/docker-dependencies-03b04ac819

chore: bump golang from 1.22.4-alpine3.19 to 1.22.5-alpine3.19 in the docker-dependencies group

3264 of 3727 relevant lines covered (87.58%)

11.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.43
/app/app.go
1
// Copyright 2024 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package app
16

17
import (
18
        "context"
19
        "net/http"
20
        "time"
21

22
        "github.com/google/uuid"
23
        "github.com/pkg/errors"
24

25
        "github.com/mendersoftware/go-lib-micro/identity"
26
        "github.com/mendersoftware/go-lib-micro/log"
27

28
        "github.com/mendersoftware/iot-manager/client"
29
        "github.com/mendersoftware/iot-manager/client/devauth"
30
        "github.com/mendersoftware/iot-manager/client/iotcore"
31
        "github.com/mendersoftware/iot-manager/client/iothub"
32
        "github.com/mendersoftware/iot-manager/client/workflows"
33
        "github.com/mendersoftware/iot-manager/model"
34
        "github.com/mendersoftware/iot-manager/store"
35
)
36

37
var (
38
        ErrIntegrationNotFound = errors.New("integration not found")
39
        ErrIntegrationExists   = errors.New("integration already exists")
40
        ErrUnknownIntegration  = errors.New("unknown integration provider")
41
        ErrNoCredentials       = errors.New("no connection string or credentials " +
42
                "configured for the tenant")
43
        ErrNoDeviceConnectionString = errors.New("device has no connection string")
44

45
        ErrDeviceAlreadyExists     = errors.New("device already exists")
46
        ErrDeviceNotFound          = errors.New("device not found")
47
        ErrDeviceStateConflict     = errors.New("conflict when updating the device state")
48
        ErrCannotRemoveIntegration = errors.New("cannot remove integration in use by devices")
49
)
50

51
const (
52
        confKeyPrimaryKey     = "azureConnectionString"
53
        confKeyAWSCertificate = "awsCertificate"
54
        confKeyAWSPrivateKey  = "awsPrivateKey"
55
        confKeyAWSEndpoint    = "awsEndpoint"
56
)
57

58
// App interface describes app objects
59
//
60
//nolint:lll
61
//go:generate ../utils/mockgen.sh
62
type App interface {
63
        WithIoTCore(client iotcore.Client) App
64
        WithIoTHub(client iothub.Client) App
65
        WithWebhooksTimeout(timeout uint) App
66
        HealthCheck(context.Context) error
67
        GetDeviceIntegrations(context.Context, string) ([]model.Integration, error)
68
        GetIntegrations(context.Context) ([]model.Integration, error)
69
        GetIntegrationById(context.Context, uuid.UUID) (*model.Integration, error)
70
        CreateIntegration(context.Context, model.Integration) (*model.Integration, error)
71
        SetDeviceStatus(context.Context, string, model.Status) error
72
        SetIntegrationCredentials(context.Context, uuid.UUID, model.Credentials) error
73
        RemoveIntegration(context.Context, uuid.UUID) error
74
        GetDevice(context.Context, string) (*model.Device, error)
75
        GetDeviceStateIntegration(context.Context, string, uuid.UUID) (*model.DeviceState, error)
76
        SetDeviceStateIntegration(context.Context, string, uuid.UUID, *model.DeviceState) (*model.DeviceState, error)
77
        GetDeviceStateIoTHub(context.Context, string, *model.Integration) (*model.DeviceState, error)
78
        SetDeviceStateIoTHub(context.Context, string, *model.Integration, *model.DeviceState) (*model.DeviceState, error)
79
        GetDeviceStateIoTCore(context.Context, string, *model.Integration) (*model.DeviceState, error)
80
        SetDeviceStateIoTCore(context.Context, string, *model.Integration, *model.DeviceState) (*model.DeviceState, error)
81
        ProvisionDevice(context.Context, model.DeviceEvent) error
82
        DeleteTenant(context.Context) error
83
        DecommissionDevice(context.Context, string) error
84

85
        SyncDevices(context.Context, int, bool) error
86

87
        GetEvents(ctx context.Context, filter model.EventsFilter) ([]model.Event, error)
88
        VerifyDeviceTwin(ctx context.Context, req model.PreauthRequest) error
89
}
90

91
// app is an app object
92
type app struct {
93
        store           store.DataStore
94
        iothubClient    iothub.Client
95
        iotcoreClient   iotcore.Client
96
        wf              workflows.Client
97
        devauth         devauth.Client
98
        httpClient      *http.Client
99
        webhooksTimeout time.Duration
100
}
101

102
// NewApp initialize a new iot-manager App
103
func New(ds store.DataStore, wf workflows.Client, da devauth.Client) App {
96✔
104
        c := client.New()
96✔
105
        hubClient := iothub.NewClient(
96✔
106
                iothub.NewOptions().SetClient(c),
96✔
107
        )
96✔
108
        return &app{
96✔
109
                store:        ds,
96✔
110
                wf:           wf,
96✔
111
                devauth:      da,
96✔
112
                iothubClient: hubClient,
96✔
113
                httpClient:   c,
96✔
114
        }
96✔
115
}
96✔
116

117
// WithIoTHub sets the IoT Hub client
118
func (a *app) WithIoTHub(client iothub.Client) App {
34✔
119
        a.iothubClient = client
34✔
120
        return a
34✔
121
}
34✔
122

123
// WithIoTCore sets the IoT Core client
124
func (a *app) WithIoTCore(client iotcore.Client) App {
27✔
125
        a.iotcoreClient = client
27✔
126
        return a
27✔
127
}
27✔
128

129
// WithWebhooksTimeout sets the timeout for webhooks requests
130
func (a *app) WithWebhooksTimeout(timeout uint) App {
4✔
131
        a.webhooksTimeout = time.Duration(timeout * uint(time.Second))
4✔
132
        return a
4✔
133
}
4✔
134

135
// HealthCheck performs a health check and returns an error if it fails
136
func (a *app) HealthCheck(ctx context.Context) error {
2✔
137
        return a.store.Ping(ctx)
2✔
138
}
2✔
139

140
func (a *app) GetIntegrations(ctx context.Context) ([]model.Integration, error) {
37✔
141
        return a.store.GetIntegrations(ctx, model.IntegrationFilter{})
37✔
142
}
37✔
143

144
func (a *app) GetIntegrationById(ctx context.Context, id uuid.UUID) (*model.Integration, error) {
3✔
145
        integration, err := a.store.GetIntegrationById(ctx, id)
3✔
146
        if err != nil {
5✔
147
                switch cause := errors.Cause(err); cause {
2✔
148
                case store.ErrObjectNotFound:
1✔
149
                        return nil, ErrIntegrationNotFound
1✔
150
                default:
1✔
151
                        return nil, err
1✔
152
                }
153
        }
154
        return integration, err
1✔
155
}
156

157
func (a *app) CreateIntegration(
158
        ctx context.Context,
159
        integration model.Integration,
160
) (*model.Integration, error) {
10✔
161
        result, err := a.store.CreateIntegration(ctx, integration)
10✔
162
        if err == store.ErrObjectExists {
10✔
163
                return nil, ErrIntegrationExists
×
164
        }
×
165
        return result, err
10✔
166
}
167

168
func (a *app) SetIntegrationCredentials(
169
        ctx context.Context,
170
        integrationID uuid.UUID,
171
        credentials model.Credentials,
172
) error {
3✔
173
        err := a.store.SetIntegrationCredentials(ctx, integrationID, credentials)
3✔
174
        if err != nil {
5✔
175
                switch cause := errors.Cause(err); cause {
2✔
176
                case store.ErrObjectNotFound:
1✔
177
                        return ErrIntegrationNotFound
1✔
178
                default:
1✔
179
                        return err
1✔
180
                }
181
        }
182
        return err
1✔
183
}
184

185
func (a *app) RemoveIntegration(
186
        ctx context.Context,
187
        integrationID uuid.UUID,
188
) error {
5✔
189
        itg, err := a.store.GetIntegrationById(ctx, integrationID)
5✔
190
        if err == nil {
9✔
191
                if itg.Provider != model.ProviderWebhook {
8✔
192
                        // check if there are any devices with given integration enabled
4✔
193
                        devicesExist, err := a.store.
4✔
194
                                DoDevicesExistByIntegrationID(ctx, integrationID)
4✔
195
                        if err != nil {
5✔
196
                                return err
1✔
197
                        }
1✔
198
                        if devicesExist {
4✔
199
                                return ErrCannotRemoveIntegration
1✔
200
                        }
1✔
201
                }
202
                err = a.store.RemoveIntegration(ctx, integrationID)
2✔
203
        }
204
        if errors.Is(err, store.ErrObjectNotFound) {
4✔
205
                return ErrIntegrationNotFound
1✔
206
        }
1✔
207
        return err
2✔
208
}
209

210
func (a *app) GetDeviceIntegrations(
211
        ctx context.Context,
212
        deviceID string,
213
) ([]model.Integration, error) {
×
214
        device, err := a.store.GetDevice(ctx, deviceID)
×
215
        if err != nil {
×
216
                if err == store.ErrObjectNotFound {
×
217
                        return nil, ErrDeviceNotFound
×
218
                }
×
219
                return nil, errors.Wrap(err, "app: failed to get device integrations")
×
220
        }
221
        if len(device.IntegrationIDs) > 0 {
×
222
                integrations, err := a.store.GetIntegrations(ctx,
×
223
                        model.IntegrationFilter{IDs: device.IntegrationIDs},
×
224
                )
×
225
                return integrations, errors.Wrap(err, "app: failed to get device integrations")
×
226
        }
×
227
        return []model.Integration{}, nil
×
228
}
229

230
func (a *app) SetDeviceStatus(ctx context.Context, deviceID string, status model.Status) error {
8✔
231
        go func() {
16✔
232
                ctxWithTimeout, cancel := context.WithTimeout(context.Background(), a.webhooksTimeout)
8✔
233
                ctxWithTimeout = identity.WithContext(ctxWithTimeout, identity.FromContext(ctx))
8✔
234
                defer cancel()
8✔
235
                runAndLogError(ctxWithTimeout, func() error {
16✔
236
                        return a.setDeviceStatus(ctxWithTimeout, deviceID, status)
8✔
237
                })
8✔
238
        }()
239
        return nil
8✔
240
}
241

242
func (a *app) setDeviceStatus(ctx context.Context, deviceID string, status model.Status) error {
8✔
243
        integrations, err := a.store.GetIntegrations(ctx, model.IntegrationFilter{})
8✔
244
        if err != nil {
10✔
245
                if errors.Is(err, store.ErrObjectNotFound) {
3✔
246
                        return nil
1✔
247
                }
1✔
248
                return errors.Wrap(err, "failed to retrieve integrations")
1✔
249
        }
250
        event := model.Event{
6✔
251
                WebhookEvent: model.WebhookEvent{
6✔
252
                        ID:   uuid.New(),
6✔
253
                        Type: model.EventTypeDeviceStatusChanged,
6✔
254
                        Data: model.DeviceEvent{
6✔
255
                                ID:     deviceID,
6✔
256
                                Status: status,
6✔
257
                        },
6✔
258
                        EventTS: time.Now(),
6✔
259
                },
6✔
260
                DeliveryStatus: make([]model.DeliveryStatus, 0, len(integrations)),
6✔
261
        }
6✔
262

6✔
263
        var (
6✔
264
                ok     bool
6✔
265
                device = newDevice(deviceID, a.store)
6✔
266
        )
6✔
267
        for _, integration := range integrations {
15✔
268
                deliver := model.DeliveryStatus{
9✔
269
                        IntegrationID: integration.ID,
9✔
270
                        Success:       true,
9✔
271
                }
9✔
272
                switch integration.Provider {
9✔
273
                case model.ProviderIoTHub:
1✔
274
                        ok, err = device.HasIntegration(ctx, integration.ID)
1✔
275
                        if err != nil {
1✔
276
                                break // switch
×
277
                        } else if !ok {
1✔
278
                                continue // loop
×
279
                        }
280
                        err = a.setDeviceStatusIoTHub(ctx, deviceID, status, integration)
1✔
281

282
                case model.ProviderIoTCore:
1✔
283
                        ok, err = device.HasIntegration(ctx, integration.ID)
1✔
284
                        if err != nil {
1✔
285
                                break // switch
×
286
                        } else if !ok {
1✔
287
                                continue // loop
×
288
                        }
289
                        err = a.setDeviceStatusIoTCore(ctx, deviceID, status, integration)
1✔
290

291
                case model.ProviderWebhook:
6✔
292
                        var (
6✔
293
                                req *http.Request
6✔
294
                                rsp *http.Response
6✔
295
                        )
6✔
296
                        req, err = client.NewWebhookRequest(ctx,
6✔
297
                                &integration.Credentials,
6✔
298
                                event.WebhookEvent)
6✔
299
                        if err != nil {
7✔
300
                                break // switch
1✔
301
                        }
302
                        rsp, err = a.httpClient.Do(req)
5✔
303
                        if err != nil {
7✔
304
                                break // switch
2✔
305
                        }
306
                        deliver.StatusCode = &rsp.StatusCode
3✔
307
                        if rsp.StatusCode >= 300 {
4✔
308
                                err = client.NewHTTPError(rsp.StatusCode)
1✔
309
                        }
1✔
310
                        _ = rsp.Body.Close()
3✔
311

312
                default:
1✔
313
                        continue
1✔
314
                }
315
                if err != nil {
12✔
316
                        var httpError client.HTTPError
4✔
317
                        if errors.As(err, &httpError) {
5✔
318
                                errCode := httpError.Code()
1✔
319
                                deliver.StatusCode = &errCode
1✔
320
                        }
1✔
321
                        deliver.Success = false
4✔
322
                        deliver.Error = err.Error()
4✔
323
                }
324
                event.DeliveryStatus = append(event.DeliveryStatus, deliver)
8✔
325
        }
326
        err = a.store.SaveEvent(ctx, event)
6✔
327
        return err
6✔
328
}
329

330
func (a *app) DeleteTenant(
331
        ctx context.Context,
332
) error {
2✔
333
        return a.store.DeleteTenantData(ctx)
2✔
334
}
2✔
335

336
func (a *app) ProvisionDevice(
337
        ctx context.Context,
338
        device model.DeviceEvent,
339
) error {
8✔
340
        go func() {
16✔
341
                ctxWithTimeout, cancel := context.WithTimeout(context.Background(), a.webhooksTimeout)
8✔
342
                ctxWithTimeout = identity.WithContext(ctxWithTimeout, identity.FromContext(ctx))
8✔
343
                defer cancel()
8✔
344
                runAndLogError(ctxWithTimeout, func() error {
16✔
345
                        return a.provisionDevice(ctxWithTimeout, device)
8✔
346
                })
8✔
347
        }()
348
        return nil
8✔
349
}
350

351
func (a *app) provisionDevice(
352
        ctx context.Context,
353
        device model.DeviceEvent,
354
) error {
8✔
355
        integrations, err := a.GetIntegrations(ctx)
8✔
356
        if err != nil {
10✔
357
                if errors.Is(err, store.ErrObjectNotFound) {
3✔
358
                        return nil
1✔
359
                }
1✔
360
                return errors.Wrap(err, "failed to retrieve integrations")
1✔
361
        }
362
        event := model.Event{
6✔
363
                WebhookEvent: model.WebhookEvent{
6✔
364
                        ID:      uuid.New(),
6✔
365
                        Type:    model.EventTypeDeviceProvisioned,
6✔
366
                        Data:    device,
6✔
367
                        EventTS: time.Now(),
6✔
368
                },
6✔
369
                DeliveryStatus: make([]model.DeliveryStatus, 0, len(integrations)),
6✔
370
        }
6✔
371
        integrationIDs := make([]uuid.UUID, 0, len(integrations))
6✔
372
        for _, integration := range integrations {
12✔
373
                deliver := model.DeliveryStatus{
6✔
374
                        IntegrationID: integration.ID,
6✔
375
                        Success:       true,
6✔
376
                }
6✔
377
                switch integration.Provider {
6✔
378
                case model.ProviderIoTHub:
×
379
                        err = a.provisionIoTHubDevice(ctx, device.ID, integration)
×
380
                        integrationIDs = append(integrationIDs, integration.ID)
×
381
                case model.ProviderIoTCore:
×
382
                        err = a.provisionIoTCoreDevice(ctx, device.ID, integration, &iotcore.Device{
×
383
                                Status: iotcore.StatusEnabled,
×
384
                        })
×
385
                        integrationIDs = append(integrationIDs, integration.ID)
×
386
                case model.ProviderWebhook:
5✔
387
                        var (
5✔
388
                                req *http.Request
5✔
389
                                rsp *http.Response
5✔
390
                        )
5✔
391
                        req, err = client.NewWebhookRequest(ctx,
5✔
392
                                &integration.Credentials,
5✔
393
                                event.WebhookEvent)
5✔
394
                        if err != nil {
6✔
395
                                break // switch
1✔
396
                        }
397
                        rsp, err = a.httpClient.Do(req)
4✔
398
                        if err != nil {
5✔
399
                                break // switch
1✔
400
                        }
401
                        deliver.StatusCode = &rsp.StatusCode
3✔
402
                        if rsp.StatusCode >= 300 {
4✔
403
                                err = client.NewHTTPError(rsp.StatusCode)
1✔
404
                        }
1✔
405
                        _ = rsp.Body.Close()
3✔
406

407
                default:
1✔
408
                        continue
1✔
409
                }
410
                if err != nil {
8✔
411
                        var httpError client.HTTPError
3✔
412
                        if errors.As(err, &httpError) {
4✔
413
                                errCode := httpError.Code()
1✔
414
                                deliver.StatusCode = &errCode
1✔
415
                        }
1✔
416
                        deliver.Success = false
3✔
417
                        deliver.Error = err.Error()
3✔
418
                }
419
                event.DeliveryStatus = append(event.DeliveryStatus, deliver)
5✔
420
        }
421
        _, err = a.store.UpsertDeviceIntegrations(ctx, device.ID, integrationIDs)
6✔
422
        if err != nil {
7✔
423
                var statusCodeInternal = 1500
1✔
424
                for i := range event.DeliveryStatus {
1✔
425
                        stat := &event.DeliveryStatus[i]
×
426
                        if stat.Error == "" {
×
427
                                stat.Error = "failed to connect device to integration"
×
428
                                stat.StatusCode = &statusCodeInternal
×
429
                        }
×
430
                        stat.Success = false
×
431
                }
432
                // Use 'panic' field to log potential data inconsistency
433
                log.FromContext(ctx).
1✔
434
                        WithField("panic", err.Error()).
1✔
435
                        Error("failed to connect device integration")
1✔
436
        }
437
        err = a.store.SaveEvent(ctx, event)
6✔
438
        return err
6✔
439
}
440

441
func (a *app) syncBatch(
442
        ctx context.Context,
443
        devices []model.Device,
444
        integCache map[uuid.UUID]*model.Integration,
445
        failEarly bool,
446
) error {
6✔
447
        var err error
6✔
448
        l := log.FromContext(ctx)
6✔
449

6✔
450
        deviceMap := make(map[uuid.UUID][]string, len(integCache))
6✔
451
        for _, dev := range devices {
66✔
452
                for _, id := range dev.IntegrationIDs {
120✔
453
                        deviceMap[id] = append(deviceMap[id], dev.ID)
60✔
454
                }
60✔
455
        }
456

457
        for integID, deviceIDs := range deviceMap {
12✔
458
                integration, ok := integCache[integID]
6✔
459
                if !ok {
6✔
460
                        // (Data race) Try again to fetch the integration
×
461
                        integration, err = a.store.GetIntegrationById(ctx, integID)
×
462
                        if err != nil {
×
463
                                if err == store.ErrObjectNotFound {
×
464
                                        integCache[integID] = nil
×
465
                                        continue
×
466
                                }
467
                                err = errors.Wrap(err, "failed to retrieve device integration")
×
468
                                if failEarly {
×
469
                                        return err
×
470
                                }
×
471
                                l.Errorf("failed to get device integration: %s", err)
×
472
                                continue
×
473
                        } else {
×
474
                                integCache[integID] = integration
×
475
                        }
×
476
                }
477
                if integration == nil {
6✔
478
                        // Should not occur, but is not impossible since mongo client
×
479
                        // caches batches of results.
×
480
                        _, err := a.store.RemoveDevicesFromIntegration(ctx, integID)
×
481
                        if err != nil {
×
482
                                err = errors.Wrap(err, "failed to remove integration from devices")
×
483
                                if failEarly {
×
484
                                        return err
×
485
                                }
×
486
                                l.Error(err)
×
487
                        }
488
                        continue
×
489
                }
490

491
                switch integration.Provider {
6✔
492
                case model.ProviderIoTHub:
3✔
493
                        err := a.syncIoTHubDevices(ctx, deviceIDs, *integration, failEarly)
3✔
494
                        if err != nil {
3✔
495
                                if failEarly {
×
496
                                        return err
×
497
                                }
×
498
                                l.Error(err)
×
499
                        }
500
                case model.ProviderIoTCore:
3✔
501
                        err := a.syncIoTCoreDevices(ctx, deviceIDs, *integration, failEarly)
3✔
502
                        if err != nil {
3✔
503
                                if failEarly {
×
504
                                        return err
×
505
                                }
×
506
                                l.Error(err)
×
507
                        }
508
                default:
×
509
                }
510
        }
511

512
        return nil
6✔
513
}
514

515
func (a app) syncCacheIntegrations(ctx context.Context) (map[uuid.UUID]*model.Integration, error) {
6✔
516
        // NOTE At the time of writing this, we don't allow more than one
6✔
517
        //      integration per tenant so this const doesn't matter.
6✔
518
        // TODO Will we need a more sophisticated cache data structure?
6✔
519
        const MaxIntegrationsToCache = 20
6✔
520
        // Cache integrations for the given tenant
6✔
521
        integrations, err := a.store.GetIntegrations(
6✔
522
                ctx, model.IntegrationFilter{Limit: MaxIntegrationsToCache},
6✔
523
        )
6✔
524
        if err != nil {
6✔
525
                return nil, errors.Wrap(err, "failed to get integrations for tenant")
×
526
        }
×
527
        integCache := make(map[uuid.UUID]*model.Integration, len(integrations))
6✔
528
        for i := range integrations {
10✔
529
                integCache[integrations[i].ID] = &integrations[i]
4✔
530
        }
4✔
531
        return integCache, nil
6✔
532
}
533

534
func (a *app) SyncDevices(
535
        ctx context.Context,
536
        batchSize int,
537
        failEarly bool,
538
) error {
2✔
539
        type DeviceWithTenantID struct {
2✔
540
                model.Device `bson:",inline"`
2✔
541
                TenantID     string `bson:"tenant_id"`
2✔
542
        }
2✔
543
        iter, err := a.store.GetAllDevices(ctx)
2✔
544
        if err != nil {
2✔
545
                return err
×
546
        }
×
547
        defer iter.Close(ctx)
2✔
548

2✔
549
        var (
2✔
550
                deviceBatch        = make([]model.Device, 0, batchSize)
2✔
551
                tenantID    string = ""
2✔
552
                integCache  map[uuid.UUID]*model.Integration
2✔
553
        )
2✔
554
        tCtx := identity.WithContext(ctx, &identity.Identity{
2✔
555
                Tenant: tenantID,
2✔
556
        })
2✔
557
        integCache, err = a.syncCacheIntegrations(tCtx)
2✔
558
        if err != nil {
2✔
559
                return err
×
560
        }
×
561
        for iter.Next(ctx) {
62✔
562
                dev := DeviceWithTenantID{}
60✔
563
                err := iter.Decode(&dev)
60✔
564
                if err != nil {
60✔
565
                        return err
×
566
                }
×
567
                if len(deviceBatch) == cap(deviceBatch) ||
60✔
568
                        (tenantID != dev.TenantID && len(deviceBatch) > 0) {
64✔
569
                        err := a.syncBatch(tCtx, deviceBatch, integCache, failEarly)
4✔
570
                        if err != nil {
4✔
571
                                return err
×
572
                        }
×
573
                        deviceBatch = deviceBatch[:0]
4✔
574
                }
575
                if tenantID != dev.TenantID {
64✔
576
                        tenantID = dev.TenantID
4✔
577
                        tCtx = identity.WithContext(ctx, &identity.Identity{
4✔
578
                                Tenant: tenantID,
4✔
579
                        })
4✔
580

4✔
581
                        integCache, err = a.syncCacheIntegrations(tCtx)
4✔
582
                        if err != nil {
4✔
583
                                return err
×
584
                        }
×
585

586
                }
587
                deviceBatch = append(deviceBatch, dev.Device)
60✔
588
        }
589
        if len(deviceBatch) > 0 {
4✔
590
                err := a.syncBatch(tCtx, deviceBatch, integCache, failEarly)
2✔
591
                if err != nil {
2✔
592
                        return err
×
593
                }
×
594
        }
595
        return nil
2✔
596
}
597

598
func (a *app) DecommissionDevice(ctx context.Context, deviceID string) error {
7✔
599
        go func() {
14✔
600
                ctxWithTimeout, cancel := context.WithTimeout(context.Background(), a.webhooksTimeout)
7✔
601
                ctxWithTimeout = identity.WithContext(ctxWithTimeout, identity.FromContext(ctx))
7✔
602
                defer cancel()
7✔
603
                runAndLogError(ctxWithTimeout, func() error {
14✔
604
                        return a.decommissionDevice(ctxWithTimeout, deviceID)
7✔
605
                })
7✔
606
        }()
607
        return nil
7✔
608
}
609

610
func (a *app) decommissionDevice(ctx context.Context, deviceID string) error {
19✔
611
        integrations, err := a.GetIntegrations(ctx)
19✔
612
        if err != nil {
22✔
613
                return err
3✔
614
        }
3✔
615
        var (
16✔
616
                device = newDevice(deviceID, a.store)
16✔
617
        )
16✔
618
        event := model.Event{
16✔
619
                WebhookEvent: model.WebhookEvent{
16✔
620
                        ID:   uuid.New(),
16✔
621
                        Type: model.EventTypeDeviceDecommissioned,
16✔
622
                        Data: model.DeviceEvent{
16✔
623
                                ID: deviceID,
16✔
624
                        },
16✔
625
                        EventTS: time.Now(),
16✔
626
                },
16✔
627
                DeliveryStatus: make([]model.DeliveryStatus, 0, len(integrations)),
16✔
628
        }
16✔
629
        for _, integration := range integrations {
35✔
630
                var (
19✔
631
                        err error
19✔
632
                        ok  bool
19✔
633
                )
19✔
634
                deliver := model.DeliveryStatus{
19✔
635
                        IntegrationID: integration.ID,
19✔
636
                        Success:       true,
19✔
637
                }
19✔
638
                switch integration.Provider {
19✔
639
                case model.ProviderIoTHub:
5✔
640
                        ok, err = device.HasIntegration(ctx, integration.ID)
5✔
641
                        if err != nil {
5✔
642
                                break // switch
×
643
                        } else if !ok {
6✔
644
                                continue // loop
1✔
645
                        }
646
                        err = a.decommissionIoTHubDevice(ctx, deviceID, integration)
4✔
647
                case model.ProviderIoTCore:
7✔
648
                        ok, err = device.HasIntegration(ctx, integration.ID)
7✔
649
                        if err != nil {
7✔
650
                                break // switch
×
651
                        } else if !ok {
7✔
652
                                continue // loop
×
653
                        }
654
                        err = a.decommissionIoTCoreDevice(ctx, deviceID, integration)
7✔
655
                case model.ProviderWebhook:
6✔
656
                        var (
6✔
657
                                req *http.Request
6✔
658
                                rsp *http.Response
6✔
659
                        )
6✔
660
                        req, err = client.NewWebhookRequest(ctx,
6✔
661
                                &integration.Credentials,
6✔
662
                                event.WebhookEvent)
6✔
663
                        if err != nil {
7✔
664
                                break // switch
1✔
665
                        }
666
                        rsp, err = a.httpClient.Do(req)
5✔
667
                        if err != nil {
7✔
668
                                break // switch
2✔
669
                        }
670
                        deliver.StatusCode = &rsp.StatusCode
3✔
671
                        if rsp.StatusCode >= 300 {
4✔
672
                                err = client.NewHTTPError(rsp.StatusCode)
1✔
673
                        }
1✔
674
                        _ = rsp.Body.Close()
3✔
675

676
                default:
1✔
677
                        continue
1✔
678
                }
679
                if err != nil {
25✔
680
                        var httpError client.HTTPError
8✔
681
                        if errors.As(err, &httpError) {
9✔
682
                                errCode := httpError.Code()
1✔
683
                                deliver.StatusCode = &errCode
1✔
684
                        }
1✔
685
                        deliver.Success = false
8✔
686
                        deliver.Error = err.Error()
8✔
687
                }
688
                event.DeliveryStatus = append(event.DeliveryStatus, deliver)
17✔
689
        }
690
        err = a.store.DeleteDevice(ctx, deviceID)
16✔
691
        if errors.Is(err, store.ErrObjectNotFound) {
19✔
692
                err = nil
3✔
693
        }
3✔
694
        if err != nil {
17✔
695
                // Add the `panic` keyword field so we can trace database inconsistencies
1✔
696
                log.FromContext(ctx).
1✔
697
                        WithField("panic", err.Error()).
1✔
698
                        Errorf("failed to remove device from database: %s", err.Error())
1✔
699
        }
1✔
700
        err = a.store.SaveEvent(ctx, event)
16✔
701
        return err
16✔
702
}
703

704
func (a *app) GetDevice(ctx context.Context, deviceID string) (*model.Device, error) {
3✔
705
        device, err := a.store.GetDevice(ctx, deviceID)
3✔
706
        if err == store.ErrObjectNotFound {
4✔
707
                return nil, ErrDeviceNotFound
1✔
708
        }
1✔
709
        return device, err
2✔
710
}
711

712
func (a *app) GetDeviceStateIntegration(
713
        ctx context.Context,
714
        deviceID string,
715
        integrationID uuid.UUID,
716
) (*model.DeviceState, error) {
7✔
717
        _, err := a.store.GetDeviceByIntegrationID(ctx, deviceID, integrationID)
7✔
718
        if err != nil {
9✔
719
                if err == store.ErrObjectNotFound {
3✔
720
                        return nil, ErrIntegrationNotFound
1✔
721
                }
1✔
722
                return nil, errors.Wrap(err, "failed to retrieve the device")
1✔
723
        }
724
        integration, err := a.store.GetIntegrationById(ctx, integrationID)
5✔
725
        if integration == nil && (err == nil || err == store.ErrObjectNotFound) {
6✔
726
                return nil, ErrIntegrationNotFound
1✔
727
        } else if err != nil {
6✔
728
                return nil, errors.Wrap(err, "failed to retrieve the integration")
1✔
729
        }
1✔
730
        switch integration.Provider {
3✔
731
        case model.ProviderIoTHub:
1✔
732
                return a.GetDeviceStateIoTHub(ctx, deviceID, integration)
1✔
733
        case model.ProviderIoTCore:
1✔
734
                return a.GetDeviceStateIoTCore(ctx, deviceID, integration)
1✔
735
        default:
1✔
736
                return nil, ErrUnknownIntegration
1✔
737
        }
738
}
739

740
func (a *app) SetDeviceStateIntegration(
741
        ctx context.Context,
742
        deviceID string,
743
        integrationID uuid.UUID,
744
        state *model.DeviceState,
745
) (*model.DeviceState, error) {
6✔
746
        device, err := a.store.GetDeviceByIntegrationID(ctx, deviceID, integrationID)
6✔
747
        if err != nil {
7✔
748
                return nil, errors.Wrap(err, "failed to retrieve the device")
1✔
749
        } else if device == nil {
7✔
750
                return nil, ErrIntegrationNotFound
1✔
751
        }
1✔
752
        integration, err := a.store.GetIntegrationById(ctx, integrationID)
4✔
753
        if integration == nil && (err == nil || err == store.ErrObjectNotFound) {
5✔
754
                return nil, ErrIntegrationNotFound
1✔
755
        } else if err != nil {
5✔
756
                return nil, errors.Wrap(err, "failed to retrieve the integration")
1✔
757
        }
1✔
758
        switch integration.Provider {
2✔
759
        case model.ProviderIoTHub:
1✔
760
                return a.SetDeviceStateIoTHub(ctx, deviceID, integration, state)
1✔
761
        case model.ProviderIoTCore:
×
762
                return a.SetDeviceStateIoTCore(ctx, deviceID, integration, state)
×
763
        default:
1✔
764
                return nil, ErrUnknownIntegration
1✔
765
        }
766
}
767

768
func (a *app) GetEvents(ctx context.Context, filter model.EventsFilter) ([]model.Event, error) {
4✔
769
        return a.store.GetEvents(ctx, filter)
4✔
770
}
4✔
771

772
func runAndLogError(ctx context.Context, f func() error) {
24✔
773
        var err error
24✔
774
        logger := log.FromContext(ctx)
24✔
775
        defer func() {
48✔
776
                if r := recover(); r != nil {
24✔
777
                        logger.WithField("panic", r).
×
778
                                Error(errors.Wrap(err, "panic processing asynchronous webhook"))
×
779
                } else if err != nil {
29✔
780
                        logger.WithField("error", err.Error()).
5✔
781
                                Error(errors.Wrap(err, "failed to process an asynchronous webhook"))
5✔
782
                }
5✔
783
        }()
784
        err = f()
24✔
785
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc