• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-server / 1704099547

06 Mar 2025 04:08PM UTC coverage: 65.477% (-0.03%) from 65.502%
1704099547

push

gitlab-ci

web-flow
Merge pull request #506 from merlin-northern/men_7868_iot_manager_service_stores_deviceauth_related_events_even_if_no_integration_is_configured_

fix(iot-manager): do not store events if there are no integrations.

9 of 9 new or added lines in 1 file covered. (100.0%)

18 existing lines in 5 files now uncovered.

31645 of 48330 relevant lines covered (65.48%)

1.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.06
/backend/services/iot-manager/app/app.go
1
// Copyright 2024 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package app
16

17
import (
18
        "context"
19
        "net/http"
20
        "time"
21

22
        "github.com/google/uuid"
23
        "github.com/pkg/errors"
24

25
        "github.com/mendersoftware/mender-server/pkg/identity"
26
        "github.com/mendersoftware/mender-server/pkg/log"
27

28
        "github.com/mendersoftware/mender-server/services/iot-manager/client"
29
        "github.com/mendersoftware/mender-server/services/iot-manager/client/devauth"
30
        "github.com/mendersoftware/mender-server/services/iot-manager/client/iotcore"
31
        "github.com/mendersoftware/mender-server/services/iot-manager/client/iothub"
32
        "github.com/mendersoftware/mender-server/services/iot-manager/client/workflows"
33
        "github.com/mendersoftware/mender-server/services/iot-manager/model"
34
        "github.com/mendersoftware/mender-server/services/iot-manager/store"
35
)
36

37
var (
38
        ErrIntegrationNotFound = errors.New("integration not found")
39
        ErrIntegrationExists   = errors.New("integration already exists")
40
        ErrUnknownIntegration  = errors.New("unknown integration provider")
41
        ErrNoCredentials       = errors.New("no connection string or credentials " +
42
                "configured for the tenant")
43
        ErrNoDeviceConnectionString = errors.New("device has no connection string")
44

45
        ErrDeviceAlreadyExists     = errors.New("device already exists")
46
        ErrDeviceNotFound          = errors.New("device not found")
47
        ErrDeviceStateConflict     = errors.New("conflict when updating the device state")
48
        ErrCannotRemoveIntegration = errors.New("cannot remove integration in use by devices")
49
)
50

51
const (
52
        confKeyPrimaryKey     = "azureConnectionString"
53
        confKeyAWSCertificate = "awsCertificate"
54
        confKeyAWSPrivateKey  = "awsPrivateKey"
55
        confKeyAWSEndpoint    = "awsEndpoint"
56
)
57

58
// App interface describes app objects
59
//
60
//nolint:lll
61
//go:generate ../../../utils/mockgen.sh
62
type App interface {
63
        WithIoTCore(client iotcore.Client) App
64
        WithIoTHub(client iothub.Client) App
65
        WithWebhooksTimeout(timeout uint) App
66
        HealthCheck(context.Context) error
67
        GetDeviceIntegrations(context.Context, string) ([]model.Integration, error)
68
        GetIntegrations(context.Context) ([]model.Integration, error)
69
        GetIntegrationById(context.Context, uuid.UUID) (*model.Integration, error)
70
        CreateIntegration(context.Context, model.Integration) (*model.Integration, error)
71
        SetDeviceStatus(context.Context, string, model.Status) error
72
        SetIntegrationCredentials(context.Context, uuid.UUID, model.Credentials) error
73
        RemoveIntegration(context.Context, uuid.UUID) error
74
        GetDevice(context.Context, string) (*model.Device, error)
75
        GetDeviceStateIntegration(context.Context, string, uuid.UUID) (*model.DeviceState, error)
76
        SetDeviceStateIntegration(context.Context, string, uuid.UUID, *model.DeviceState) (*model.DeviceState, error)
77
        GetDeviceStateIoTHub(context.Context, string, *model.Integration) (*model.DeviceState, error)
78
        SetDeviceStateIoTHub(context.Context, string, *model.Integration, *model.DeviceState) (*model.DeviceState, error)
79
        GetDeviceStateIoTCore(context.Context, string, *model.Integration) (*model.DeviceState, error)
80
        SetDeviceStateIoTCore(context.Context, string, *model.Integration, *model.DeviceState) (*model.DeviceState, error)
81
        ProvisionDevice(context.Context, model.DeviceEvent) error
82
        DeleteTenant(context.Context) error
83
        DecommissionDevice(context.Context, string) error
84

85
        SyncDevices(context.Context, int, bool) error
86

87
        GetEvents(ctx context.Context, filter model.EventsFilter) ([]model.Event, error)
88
        VerifyDeviceTwin(ctx context.Context, req model.PreauthRequest) error
89
}
90

91
// app is an app object
92
type app struct {
93
        store           store.DataStore
94
        iothubClient    iothub.Client
95
        iotcoreClient   iotcore.Client
96
        wf              workflows.Client
97
        devauth         devauth.Client
98
        httpClient      *http.Client
99
        webhooksTimeout time.Duration
100
}
101

102
// NewApp initialize a new iot-manager App
103
func New(ds store.DataStore, wf workflows.Client, da devauth.Client) App {
3✔
104
        c := client.New()
3✔
105
        hubClient := iothub.NewClient(
3✔
106
                iothub.NewOptions().SetClient(c),
3✔
107
        )
3✔
108
        return &app{
3✔
109
                store:        ds,
3✔
110
                wf:           wf,
3✔
111
                devauth:      da,
3✔
112
                iothubClient: hubClient,
3✔
113
                httpClient:   c,
3✔
114
        }
3✔
115
}
3✔
116

117
// WithIoTHub sets the IoT Hub client
118
func (a *app) WithIoTHub(client iothub.Client) App {
3✔
119
        a.iothubClient = client
3✔
120
        return a
3✔
121
}
3✔
122

123
// WithIoTCore sets the IoT Core client
124
func (a *app) WithIoTCore(client iotcore.Client) App {
3✔
125
        a.iotcoreClient = client
3✔
126
        return a
3✔
127
}
3✔
128

129
// WithWebhooksTimeout sets the timeout for webhooks requests
130
func (a *app) WithWebhooksTimeout(timeout uint) App {
3✔
131
        a.webhooksTimeout = time.Duration(timeout * uint(time.Second))
3✔
132
        return a
3✔
133
}
3✔
134

135
// HealthCheck performs a health check and returns an error if it fails
136
func (a *app) HealthCheck(ctx context.Context) error {
2✔
137
        return a.store.Ping(ctx)
2✔
138
}
2✔
139

140
func (a *app) GetIntegrations(ctx context.Context) ([]model.Integration, error) {
3✔
141
        return a.store.GetIntegrations(ctx, model.IntegrationFilter{})
3✔
142
}
3✔
143

144
func (a *app) GetIntegrationById(ctx context.Context, id uuid.UUID) (*model.Integration, error) {
1✔
145
        integration, err := a.store.GetIntegrationById(ctx, id)
1✔
146
        if err != nil {
2✔
147
                switch cause := errors.Cause(err); cause {
1✔
148
                case store.ErrObjectNotFound:
1✔
149
                        return nil, ErrIntegrationNotFound
1✔
150
                default:
1✔
151
                        return nil, err
1✔
152
                }
153
        }
154
        return integration, err
1✔
155
}
156

157
func (a *app) CreateIntegration(
158
        ctx context.Context,
159
        integration model.Integration,
160
) (*model.Integration, error) {
3✔
161
        result, err := a.store.CreateIntegration(ctx, integration)
3✔
162
        if err == store.ErrObjectExists {
3✔
163
                return nil, ErrIntegrationExists
×
164
        }
×
165
        return result, err
3✔
166
}
167

168
func (a *app) SetIntegrationCredentials(
169
        ctx context.Context,
170
        integrationID uuid.UUID,
171
        credentials model.Credentials,
172
) error {
1✔
173
        err := a.store.SetIntegrationCredentials(ctx, integrationID, credentials)
1✔
174
        if err != nil {
2✔
175
                switch cause := errors.Cause(err); cause {
1✔
176
                case store.ErrObjectNotFound:
1✔
177
                        return ErrIntegrationNotFound
1✔
178
                default:
1✔
179
                        return err
1✔
180
                }
181
        }
182
        return err
1✔
183
}
184

185
func (a *app) RemoveIntegration(
186
        ctx context.Context,
187
        integrationID uuid.UUID,
188
) error {
1✔
189
        itg, err := a.store.GetIntegrationById(ctx, integrationID)
1✔
190
        if err == nil {
2✔
191
                if itg.Provider != model.ProviderWebhook {
2✔
192
                        // check if there are any devices with given integration enabled
1✔
193
                        devicesExist, err := a.store.
1✔
194
                                DoDevicesExistByIntegrationID(ctx, integrationID)
1✔
195
                        if err != nil {
2✔
196
                                return err
1✔
197
                        }
1✔
198
                        if devicesExist {
2✔
199
                                return ErrCannotRemoveIntegration
1✔
200
                        }
1✔
201
                }
202
                err = a.store.RemoveIntegration(ctx, integrationID)
1✔
203
        }
204
        if errors.Is(err, store.ErrObjectNotFound) {
2✔
205
                return ErrIntegrationNotFound
1✔
206
        }
1✔
207
        return err
1✔
208
}
209

210
func (a *app) GetDeviceIntegrations(
211
        ctx context.Context,
212
        deviceID string,
213
) ([]model.Integration, error) {
×
214
        device, err := a.store.GetDevice(ctx, deviceID)
×
215
        if err != nil {
×
216
                if err == store.ErrObjectNotFound {
×
217
                        return nil, ErrDeviceNotFound
×
218
                }
×
219
                return nil, errors.Wrap(err, "app: failed to get device integrations")
×
220
        }
221
        if len(device.IntegrationIDs) > 0 {
×
222
                integrations, err := a.store.GetIntegrations(ctx,
×
223
                        model.IntegrationFilter{IDs: device.IntegrationIDs},
×
224
                )
×
225
                return integrations, errors.Wrap(err, "app: failed to get device integrations")
×
226
        }
×
227
        return []model.Integration{}, nil
×
228
}
229

230
func (a *app) SetDeviceStatus(ctx context.Context, deviceID string, status model.Status) error {
3✔
231
        go func() {
6✔
232
                ctxWithTimeout, cancel := context.WithTimeout(context.Background(), a.webhooksTimeout)
3✔
233
                ctxWithTimeout = identity.WithContext(ctxWithTimeout, identity.FromContext(ctx))
3✔
234
                defer cancel()
3✔
235
                runAndLogError(ctxWithTimeout, func() error {
6✔
236
                        return a.setDeviceStatus(ctxWithTimeout, deviceID, status)
3✔
237
                })
3✔
238
        }()
239
        return nil
3✔
240
}
241

242
func (a *app) setDeviceStatus(ctx context.Context, deviceID string, status model.Status) error {
3✔
243
        integrations, err := a.store.GetIntegrations(ctx, model.IntegrationFilter{})
3✔
244
        if err != nil {
4✔
245
                if errors.Is(err, store.ErrObjectNotFound) {
2✔
246
                        return nil
1✔
247
                }
1✔
248
                return errors.Wrap(err, "failed to retrieve integrations")
1✔
249
        }
250
        event := model.Event{
3✔
251
                WebhookEvent: model.WebhookEvent{
3✔
252
                        ID:   uuid.New(),
3✔
253
                        Type: model.EventTypeDeviceStatusChanged,
3✔
254
                        Data: model.DeviceEvent{
3✔
255
                                ID:     deviceID,
3✔
256
                                Status: status,
3✔
257
                        },
3✔
258
                        EventTS: time.Now(),
3✔
259
                },
3✔
260
                DeliveryStatus: make([]model.DeliveryStatus, 0, len(integrations)),
3✔
261
        }
3✔
262

3✔
263
        var (
3✔
264
                ok     bool
3✔
265
                device = newDevice(deviceID, a.store)
3✔
266
        )
3✔
267
        for _, integration := range integrations {
6✔
268
                deliver := model.DeliveryStatus{
3✔
269
                        IntegrationID: integration.ID,
3✔
270
                        Success:       true,
3✔
271
                }
3✔
272
                switch integration.Provider {
3✔
273
                case model.ProviderIoTHub:
1✔
274
                        ok, err = device.HasIntegration(ctx, integration.ID)
1✔
275
                        if err != nil {
1✔
276
                                break // switch
×
277
                        } else if !ok {
1✔
278
                                continue // loop
×
279
                        }
280
                        err = a.setDeviceStatusIoTHub(ctx, deviceID, status, integration)
1✔
281

282
                case model.ProviderIoTCore:
1✔
283
                        ok, err = device.HasIntegration(ctx, integration.ID)
1✔
284
                        if err != nil {
1✔
285
                                break // switch
×
286
                        } else if !ok {
1✔
287
                                continue // loop
×
288
                        }
289
                        err = a.setDeviceStatusIoTCore(ctx, deviceID, status, integration)
1✔
290

291
                case model.ProviderWebhook:
3✔
292
                        var (
3✔
293
                                req *http.Request
3✔
294
                                rsp *http.Response
3✔
295
                        )
3✔
296
                        req, err = client.NewWebhookRequest(ctx,
3✔
297
                                &integration.Credentials,
3✔
298
                                event.WebhookEvent)
3✔
299
                        if err != nil {
4✔
300
                                break // switch
1✔
301
                        }
302
                        rsp, err = a.httpClient.Do(req)
3✔
303
                        if err != nil {
4✔
304
                                break // switch
1✔
305
                        }
306
                        deliver.StatusCode = &rsp.StatusCode
3✔
307
                        if rsp.StatusCode >= 300 {
4✔
308
                                err = client.NewHTTPError(rsp.StatusCode)
1✔
309
                        }
1✔
310
                        _ = rsp.Body.Close()
3✔
311

312
                default:
1✔
313
                        continue
1✔
314
                }
315
                if err != nil {
4✔
316
                        var httpError client.HTTPError
1✔
317
                        if errors.As(err, &httpError) {
2✔
318
                                errCode := httpError.Code()
1✔
319
                                deliver.StatusCode = &errCode
1✔
320
                        }
1✔
321
                        deliver.Success = false
1✔
322
                        deliver.Error = err.Error()
1✔
323
                }
324
                event.DeliveryStatus = append(event.DeliveryStatus, deliver)
3✔
325
        }
326
        if len(integrations) > 0 {
6✔
327
                err = a.store.SaveEvent(ctx, event)
3✔
328
        }
3✔
329
        return err
3✔
330
}
331

332
func (a *app) DeleteTenant(
333
        ctx context.Context,
334
) error {
2✔
335
        return a.store.DeleteTenantData(ctx)
2✔
336
}
2✔
337

338
func (a *app) ProvisionDevice(
339
        ctx context.Context,
340
        device model.DeviceEvent,
341
) error {
3✔
342
        go func() {
6✔
343
                ctxWithTimeout, cancel := context.WithTimeout(context.Background(), a.webhooksTimeout)
3✔
344
                ctxWithTimeout = identity.WithContext(ctxWithTimeout, identity.FromContext(ctx))
3✔
345
                defer cancel()
3✔
346
                runAndLogError(ctxWithTimeout, func() error {
6✔
347
                        return a.provisionDevice(ctxWithTimeout, device)
3✔
348
                })
3✔
349
        }()
350
        return nil
3✔
351
}
352

353
func (a *app) provisionDevice(
354
        ctx context.Context,
355
        device model.DeviceEvent,
356
) error {
3✔
357
        integrations, err := a.GetIntegrations(ctx)
3✔
358
        if err != nil {
4✔
359
                if errors.Is(err, store.ErrObjectNotFound) {
2✔
360
                        return nil
1✔
361
                }
1✔
362
                return errors.Wrap(err, "failed to retrieve integrations")
1✔
363
        }
364
        event := model.Event{
3✔
365
                WebhookEvent: model.WebhookEvent{
3✔
366
                        ID:      uuid.New(),
3✔
367
                        Type:    model.EventTypeDeviceProvisioned,
3✔
368
                        Data:    device,
3✔
369
                        EventTS: time.Now(),
3✔
370
                },
3✔
371
                DeliveryStatus: make([]model.DeliveryStatus, 0, len(integrations)),
3✔
372
        }
3✔
373
        integrationIDs := make([]uuid.UUID, 0, len(integrations))
3✔
374
        for _, integration := range integrations {
6✔
375
                deliver := model.DeliveryStatus{
3✔
376
                        IntegrationID: integration.ID,
3✔
377
                        Success:       true,
3✔
378
                }
3✔
379
                switch integration.Provider {
3✔
380
                case model.ProviderIoTHub:
×
381
                        err = a.provisionIoTHubDevice(ctx, device.ID, integration)
×
382
                        integrationIDs = append(integrationIDs, integration.ID)
×
383
                case model.ProviderIoTCore:
×
384
                        err = a.provisionIoTCoreDevice(ctx, device.ID, integration, &iotcore.Device{
×
385
                                Status: iotcore.StatusEnabled,
×
386
                        })
×
387
                        integrationIDs = append(integrationIDs, integration.ID)
×
388
                case model.ProviderWebhook:
3✔
389
                        var (
3✔
390
                                req *http.Request
3✔
391
                                rsp *http.Response
3✔
392
                        )
3✔
393
                        req, err = client.NewWebhookRequest(ctx,
3✔
394
                                &integration.Credentials,
3✔
395
                                event.WebhookEvent)
3✔
396
                        if err != nil {
4✔
397
                                break // switch
1✔
398
                        }
399
                        rsp, err = a.httpClient.Do(req)
3✔
400
                        if err != nil {
4✔
401
                                break // switch
1✔
402
                        }
403
                        deliver.StatusCode = &rsp.StatusCode
3✔
404
                        if rsp.StatusCode >= 300 {
4✔
405
                                err = client.NewHTTPError(rsp.StatusCode)
1✔
406
                        }
1✔
407
                        _ = rsp.Body.Close()
3✔
408

409
                default:
1✔
410
                        continue
1✔
411
                }
412
                if err != nil {
4✔
413
                        var httpError client.HTTPError
1✔
414
                        if errors.As(err, &httpError) {
2✔
415
                                errCode := httpError.Code()
1✔
416
                                deliver.StatusCode = &errCode
1✔
417
                        }
1✔
418
                        deliver.Success = false
1✔
419
                        deliver.Error = err.Error()
1✔
420
                }
421
                event.DeliveryStatus = append(event.DeliveryStatus, deliver)
3✔
422
        }
423
        _, err = a.store.UpsertDeviceIntegrations(ctx, device.ID, integrationIDs)
3✔
424
        if err != nil {
3✔
UNCOV
425
                var statusCodeInternal = 1500
×
UNCOV
426
                for i := range event.DeliveryStatus {
×
427
                        stat := &event.DeliveryStatus[i]
×
428
                        if stat.Error == "" {
×
429
                                stat.Error = "failed to connect device to integration"
×
430
                                stat.StatusCode = &statusCodeInternal
×
431
                        }
×
432
                        stat.Success = false
×
433
                }
434
                // Use 'panic' field to log potential data inconsistency
UNCOV
435
                log.FromContext(ctx).
×
UNCOV
436
                        WithField("panic", err.Error()).
×
UNCOV
437
                        Error("failed to connect device integration")
×
438
        }
439
        if len(integrations) > 0 {
6✔
440
                err = a.store.SaveEvent(ctx, event)
3✔
441
        }
3✔
442
        return err
3✔
443
}
444

445
func (a *app) syncBatch(
446
        ctx context.Context,
447
        devices []model.Device,
448
        integCache map[uuid.UUID]*model.Integration,
449
        failEarly bool,
450
) error {
1✔
451
        var err error
1✔
452
        l := log.FromContext(ctx)
1✔
453

1✔
454
        deviceMap := make(map[uuid.UUID][]string, len(integCache))
1✔
455
        for _, dev := range devices {
2✔
456
                for _, id := range dev.IntegrationIDs {
2✔
457
                        deviceMap[id] = append(deviceMap[id], dev.ID)
1✔
458
                }
1✔
459
        }
460

461
        for integID, deviceIDs := range deviceMap {
2✔
462
                integration, ok := integCache[integID]
1✔
463
                if !ok {
1✔
464
                        // (Data race) Try again to fetch the integration
×
465
                        integration, err = a.store.GetIntegrationById(ctx, integID)
×
466
                        if err != nil {
×
467
                                if err == store.ErrObjectNotFound {
×
468
                                        integCache[integID] = nil
×
469
                                        continue
×
470
                                }
471
                                err = errors.Wrap(err, "failed to retrieve device integration")
×
472
                                if failEarly {
×
473
                                        return err
×
474
                                }
×
475
                                l.Errorf("failed to get device integration: %s", err)
×
476
                                continue
×
477
                        } else {
×
478
                                integCache[integID] = integration
×
479
                        }
×
480
                }
481
                if integration == nil {
1✔
482
                        // Should not occur, but is not impossible since mongo client
×
483
                        // caches batches of results.
×
484
                        _, err := a.store.RemoveDevicesFromIntegration(ctx, integID)
×
485
                        if err != nil {
×
486
                                err = errors.Wrap(err, "failed to remove integration from devices")
×
487
                                if failEarly {
×
488
                                        return err
×
489
                                }
×
490
                                l.Error(err)
×
491
                        }
492
                        continue
×
493
                }
494

495
                switch integration.Provider {
1✔
496
                case model.ProviderIoTHub:
1✔
497
                        err := a.syncIoTHubDevices(ctx, deviceIDs, *integration, failEarly)
1✔
498
                        if err != nil {
1✔
499
                                if failEarly {
×
500
                                        return err
×
501
                                }
×
502
                                l.Error(err)
×
503
                        }
504
                case model.ProviderIoTCore:
1✔
505
                        err := a.syncIoTCoreDevices(ctx, deviceIDs, *integration, failEarly)
1✔
506
                        if err != nil {
1✔
507
                                if failEarly {
×
508
                                        return err
×
509
                                }
×
510
                                l.Error(err)
×
511
                        }
512
                default:
×
513
                }
514
        }
515

516
        return nil
1✔
517
}
518

519
func (a app) syncCacheIntegrations(ctx context.Context) (map[uuid.UUID]*model.Integration, error) {
1✔
520
        // NOTE At the time of writing this, we don't allow more than one
1✔
521
        //      integration per tenant so this const doesn't matter.
1✔
522
        // TODO Will we need a more sophisticated cache data structure?
1✔
523
        const MaxIntegrationsToCache = 20
1✔
524
        // Cache integrations for the given tenant
1✔
525
        integrations, err := a.store.GetIntegrations(
1✔
526
                ctx, model.IntegrationFilter{Limit: MaxIntegrationsToCache},
1✔
527
        )
1✔
528
        if err != nil {
1✔
529
                return nil, errors.Wrap(err, "failed to get integrations for tenant")
×
530
        }
×
531
        integCache := make(map[uuid.UUID]*model.Integration, len(integrations))
1✔
532
        for i := range integrations {
2✔
533
                integCache[integrations[i].ID] = &integrations[i]
1✔
534
        }
1✔
535
        return integCache, nil
1✔
536
}
537

538
func (a *app) SyncDevices(
539
        ctx context.Context,
540
        batchSize int,
541
        failEarly bool,
542
) error {
1✔
543
        type DeviceWithTenantID struct {
1✔
544
                model.Device `bson:",inline"`
1✔
545
                TenantID     string `bson:"tenant_id"`
1✔
546
        }
1✔
547
        iter, err := a.store.GetAllDevices(ctx)
1✔
548
        if err != nil {
1✔
549
                return err
×
550
        }
×
551
        defer iter.Close(ctx)
1✔
552

1✔
553
        var (
1✔
554
                deviceBatch        = make([]model.Device, 0, batchSize)
1✔
555
                tenantID    string = ""
1✔
556
                integCache  map[uuid.UUID]*model.Integration
1✔
557
        )
1✔
558
        tCtx := identity.WithContext(ctx, &identity.Identity{
1✔
559
                Tenant: tenantID,
1✔
560
        })
1✔
561
        integCache, err = a.syncCacheIntegrations(tCtx)
1✔
562
        if err != nil {
1✔
563
                return err
×
564
        }
×
565
        for iter.Next(ctx) {
2✔
566
                dev := DeviceWithTenantID{}
1✔
567
                err := iter.Decode(&dev)
1✔
568
                if err != nil {
1✔
569
                        return err
×
570
                }
×
571
                if len(deviceBatch) == cap(deviceBatch) ||
1✔
572
                        (tenantID != dev.TenantID && len(deviceBatch) > 0) {
2✔
573
                        err := a.syncBatch(tCtx, deviceBatch, integCache, failEarly)
1✔
574
                        if err != nil {
1✔
575
                                return err
×
576
                        }
×
577
                        deviceBatch = deviceBatch[:0]
1✔
578
                }
579
                if tenantID != dev.TenantID {
2✔
580
                        tenantID = dev.TenantID
1✔
581
                        tCtx = identity.WithContext(ctx, &identity.Identity{
1✔
582
                                Tenant: tenantID,
1✔
583
                        })
1✔
584

1✔
585
                        integCache, err = a.syncCacheIntegrations(tCtx)
1✔
586
                        if err != nil {
1✔
587
                                return err
×
588
                        }
×
589

590
                }
591
                deviceBatch = append(deviceBatch, dev.Device)
1✔
592
        }
593
        if len(deviceBatch) > 0 {
2✔
594
                err := a.syncBatch(tCtx, deviceBatch, integCache, failEarly)
1✔
595
                if err != nil {
1✔
596
                        return err
×
597
                }
×
598
        }
599
        return nil
1✔
600
}
601

602
func (a *app) DecommissionDevice(ctx context.Context, deviceID string) error {
3✔
603
        go func() {
6✔
604
                ctxWithTimeout, cancel := context.WithTimeout(context.Background(), a.webhooksTimeout)
3✔
605
                ctxWithTimeout = identity.WithContext(ctxWithTimeout, identity.FromContext(ctx))
3✔
606
                defer cancel()
3✔
607
                runAndLogError(ctxWithTimeout, func() error {
6✔
608
                        return a.decommissionDevice(ctxWithTimeout, deviceID)
3✔
609
                })
3✔
610
        }()
611
        return nil
3✔
612
}
613

614
func (a *app) decommissionDevice(ctx context.Context, deviceID string) error {
3✔
615
        integrations, err := a.GetIntegrations(ctx)
3✔
616
        if err != nil {
4✔
617
                return err
1✔
618
        }
1✔
619
        var (
3✔
620
                device = newDevice(deviceID, a.store)
3✔
621
        )
3✔
622
        event := model.Event{
3✔
623
                WebhookEvent: model.WebhookEvent{
3✔
624
                        ID:   uuid.New(),
3✔
625
                        Type: model.EventTypeDeviceDecommissioned,
3✔
626
                        Data: model.DeviceEvent{
3✔
627
                                ID: deviceID,
3✔
628
                        },
3✔
629
                        EventTS: time.Now(),
3✔
630
                },
3✔
631
                DeliveryStatus: make([]model.DeliveryStatus, 0, len(integrations)),
3✔
632
        }
3✔
633
        for _, integration := range integrations {
6✔
634
                var (
3✔
635
                        err error
3✔
636
                        ok  bool
3✔
637
                )
3✔
638
                deliver := model.DeliveryStatus{
3✔
639
                        IntegrationID: integration.ID,
3✔
640
                        Success:       true,
3✔
641
                }
3✔
642
                switch integration.Provider {
3✔
643
                case model.ProviderIoTHub:
2✔
644
                        ok, err = device.HasIntegration(ctx, integration.ID)
2✔
645
                        if err != nil {
2✔
646
                                break // switch
×
647
                        } else if !ok {
3✔
648
                                continue // loop
1✔
649
                        }
650
                        err = a.decommissionIoTHubDevice(ctx, deviceID, integration)
2✔
651
                case model.ProviderIoTCore:
2✔
652
                        ok, err = device.HasIntegration(ctx, integration.ID)
2✔
653
                        if err != nil {
2✔
654
                                break // switch
×
655
                        } else if !ok {
2✔
656
                                continue // loop
×
657
                        }
658
                        err = a.decommissionIoTCoreDevice(ctx, deviceID, integration)
2✔
659
                case model.ProviderWebhook:
3✔
660
                        var (
3✔
661
                                req *http.Request
3✔
662
                                rsp *http.Response
3✔
663
                        )
3✔
664
                        req, err = client.NewWebhookRequest(ctx,
3✔
665
                                &integration.Credentials,
3✔
666
                                event.WebhookEvent)
3✔
667
                        if err != nil {
4✔
668
                                break // switch
1✔
669
                        }
670
                        rsp, err = a.httpClient.Do(req)
3✔
671
                        if err != nil {
4✔
672
                                break // switch
1✔
673
                        }
674
                        deliver.StatusCode = &rsp.StatusCode
3✔
675
                        if rsp.StatusCode >= 300 {
4✔
676
                                err = client.NewHTTPError(rsp.StatusCode)
1✔
677
                        }
1✔
678
                        _ = rsp.Body.Close()
3✔
679

680
                default:
1✔
681
                        continue
1✔
682
                }
683
                if err != nil {
5✔
684
                        var httpError client.HTTPError
2✔
685
                        if errors.As(err, &httpError) {
3✔
686
                                errCode := httpError.Code()
1✔
687
                                deliver.StatusCode = &errCode
1✔
688
                        }
1✔
689
                        deliver.Success = false
2✔
690
                        deliver.Error = err.Error()
2✔
691
                }
692
                event.DeliveryStatus = append(event.DeliveryStatus, deliver)
3✔
693
        }
694
        err = a.store.DeleteDevice(ctx, deviceID)
3✔
695
        if errors.Is(err, store.ErrObjectNotFound) {
6✔
696
                err = nil
3✔
697
        }
3✔
698
        if err != nil {
4✔
699
                // Add the `panic` keyword field so we can trace database inconsistencies
1✔
700
                log.FromContext(ctx).
1✔
701
                        WithField("panic", err.Error()).
1✔
702
                        Errorf("failed to remove device from database: %s", err.Error())
1✔
703
        }
1✔
704
        if len(integrations) > 0 {
6✔
705
                err = a.store.SaveEvent(ctx, event)
3✔
706
        }
3✔
707
        return err
3✔
708
}
709

710
func (a *app) GetDevice(ctx context.Context, deviceID string) (*model.Device, error) {
1✔
711
        device, err := a.store.GetDevice(ctx, deviceID)
1✔
712
        if err == store.ErrObjectNotFound {
2✔
713
                return nil, ErrDeviceNotFound
1✔
714
        }
1✔
715
        return device, err
1✔
716
}
717

718
func (a *app) GetDeviceStateIntegration(
719
        ctx context.Context,
720
        deviceID string,
721
        integrationID uuid.UUID,
722
) (*model.DeviceState, error) {
1✔
723
        _, err := a.store.GetDeviceByIntegrationID(ctx, deviceID, integrationID)
1✔
724
        if err != nil {
2✔
725
                if err == store.ErrObjectNotFound {
2✔
726
                        return nil, ErrIntegrationNotFound
1✔
727
                }
1✔
728
                return nil, errors.Wrap(err, "failed to retrieve the device")
1✔
729
        }
730
        integration, err := a.store.GetIntegrationById(ctx, integrationID)
1✔
731
        if integration == nil && (err == nil || err == store.ErrObjectNotFound) {
2✔
732
                return nil, ErrIntegrationNotFound
1✔
733
        } else if err != nil {
3✔
734
                return nil, errors.Wrap(err, "failed to retrieve the integration")
1✔
735
        }
1✔
736
        switch integration.Provider {
1✔
737
        case model.ProviderIoTHub:
1✔
738
                return a.GetDeviceStateIoTHub(ctx, deviceID, integration)
1✔
739
        case model.ProviderIoTCore:
1✔
740
                return a.GetDeviceStateIoTCore(ctx, deviceID, integration)
1✔
741
        default:
1✔
742
                return nil, ErrUnknownIntegration
1✔
743
        }
744
}
745

746
func (a *app) SetDeviceStateIntegration(
747
        ctx context.Context,
748
        deviceID string,
749
        integrationID uuid.UUID,
750
        state *model.DeviceState,
751
) (*model.DeviceState, error) {
1✔
752
        device, err := a.store.GetDeviceByIntegrationID(ctx, deviceID, integrationID)
1✔
753
        if err != nil {
2✔
754
                return nil, errors.Wrap(err, "failed to retrieve the device")
1✔
755
        } else if device == nil {
3✔
756
                return nil, ErrIntegrationNotFound
1✔
757
        }
1✔
758
        integration, err := a.store.GetIntegrationById(ctx, integrationID)
1✔
759
        if integration == nil && (err == nil || err == store.ErrObjectNotFound) {
2✔
760
                return nil, ErrIntegrationNotFound
1✔
761
        } else if err != nil {
3✔
762
                return nil, errors.Wrap(err, "failed to retrieve the integration")
1✔
763
        }
1✔
764
        switch integration.Provider {
1✔
765
        case model.ProviderIoTHub:
1✔
766
                return a.SetDeviceStateIoTHub(ctx, deviceID, integration, state)
1✔
767
        case model.ProviderIoTCore:
×
768
                return a.SetDeviceStateIoTCore(ctx, deviceID, integration, state)
×
769
        default:
1✔
770
                return nil, ErrUnknownIntegration
1✔
771
        }
772
}
773

774
func (a *app) GetEvents(ctx context.Context, filter model.EventsFilter) ([]model.Event, error) {
3✔
775
        return a.store.GetEvents(ctx, filter)
3✔
776
}
3✔
777

778
func runAndLogError(ctx context.Context, f func() error) {
3✔
779
        var err error
3✔
780
        logger := log.FromContext(ctx)
3✔
781
        defer func() {
6✔
782
                if r := recover(); r != nil {
4✔
783
                        logger.WithField("panic", r).
1✔
784
                                Error(errors.Wrap(err, "panic processing asynchronous webhook"))
1✔
785
                } else if err != nil {
5✔
786
                        logger.WithField("error", err.Error()).
1✔
787
                                Error(errors.Wrap(err, "failed to process an asynchronous webhook"))
1✔
788
                }
1✔
789
        }()
790
        err = f()
3✔
791
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc