• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-server / 1495380963

14 Oct 2024 03:35PM UTC coverage: 70.373% (-2.5%) from 72.904%
1495380963

Pull #101

gitlab-ci

mineralsfree
feat: tenant list added

Ticket: MEN-7568
Changelog: None

Signed-off-by: Mikita Pilinka <mikita.pilinka@northern.tech>
Pull Request #101: feat: tenant list added

4406 of 6391 branches covered (68.94%)

Branch coverage included in aggregate %.

88 of 183 new or added lines in 10 files covered. (48.09%)

2623 existing lines in 65 files now uncovered.

36673 of 51982 relevant lines covered (70.55%)

31.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.09
/backend/services/iot-manager/app/app.go
1
// Copyright 2024 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package app
16

17
import (
18
        "context"
19
        "net/http"
20
        "time"
21

22
        "github.com/google/uuid"
23
        "github.com/pkg/errors"
24

25
        "github.com/mendersoftware/mender-server/pkg/identity"
26
        "github.com/mendersoftware/mender-server/pkg/log"
27

28
        "github.com/mendersoftware/mender-server/services/iot-manager/client"
29
        "github.com/mendersoftware/mender-server/services/iot-manager/client/devauth"
30
        "github.com/mendersoftware/mender-server/services/iot-manager/client/iotcore"
31
        "github.com/mendersoftware/mender-server/services/iot-manager/client/iothub"
32
        "github.com/mendersoftware/mender-server/services/iot-manager/client/workflows"
33
        "github.com/mendersoftware/mender-server/services/iot-manager/model"
34
        "github.com/mendersoftware/mender-server/services/iot-manager/store"
35
)
36

37
var (
38
        ErrIntegrationNotFound = errors.New("integration not found")
39
        ErrIntegrationExists   = errors.New("integration already exists")
40
        ErrUnknownIntegration  = errors.New("unknown integration provider")
41
        ErrNoCredentials       = errors.New("no connection string or credentials " +
42
                "configured for the tenant")
43
        ErrNoDeviceConnectionString = errors.New("device has no connection string")
44

45
        ErrDeviceAlreadyExists     = errors.New("device already exists")
46
        ErrDeviceNotFound          = errors.New("device not found")
47
        ErrDeviceStateConflict     = errors.New("conflict when updating the device state")
48
        ErrCannotRemoveIntegration = errors.New("cannot remove integration in use by devices")
49
)
50

51
// Configuration key names for provider credentials.
// NOTE(review): the code consuming these keys is not in this part of the
// file — confirm usage before renaming or removing any of them.
const (
	confKeyPrimaryKey     = "azureConnectionString"
	confKeyAWSCertificate = "awsCertificate"
	confKeyAWSPrivateKey  = "awsPrivateKey"
	confKeyAWSEndpoint    = "awsEndpoint"
)
57

58
// App interface describes app objects
59
//
60
//nolint:lll
61
//go:generate ../../../utils/mockgen.sh
62
type App interface {
63
        WithIoTCore(client iotcore.Client) App
64
        WithIoTHub(client iothub.Client) App
65
        WithWebhooksTimeout(timeout uint) App
66
        HealthCheck(context.Context) error
67
        GetDeviceIntegrations(context.Context, string) ([]model.Integration, error)
68
        GetIntegrations(context.Context) ([]model.Integration, error)
69
        GetIntegrationById(context.Context, uuid.UUID) (*model.Integration, error)
70
        CreateIntegration(context.Context, model.Integration) (*model.Integration, error)
71
        SetDeviceStatus(context.Context, string, model.Status) error
72
        SetIntegrationCredentials(context.Context, uuid.UUID, model.Credentials) error
73
        RemoveIntegration(context.Context, uuid.UUID) error
74
        GetDevice(context.Context, string) (*model.Device, error)
75
        GetDeviceStateIntegration(context.Context, string, uuid.UUID) (*model.DeviceState, error)
76
        SetDeviceStateIntegration(context.Context, string, uuid.UUID, *model.DeviceState) (*model.DeviceState, error)
77
        GetDeviceStateIoTHub(context.Context, string, *model.Integration) (*model.DeviceState, error)
78
        SetDeviceStateIoTHub(context.Context, string, *model.Integration, *model.DeviceState) (*model.DeviceState, error)
79
        GetDeviceStateIoTCore(context.Context, string, *model.Integration) (*model.DeviceState, error)
80
        SetDeviceStateIoTCore(context.Context, string, *model.Integration, *model.DeviceState) (*model.DeviceState, error)
81
        ProvisionDevice(context.Context, model.DeviceEvent) error
82
        DeleteTenant(context.Context) error
83
        DecommissionDevice(context.Context, string) error
84

85
        SyncDevices(context.Context, int, bool) error
86

87
        GetEvents(ctx context.Context, filter model.EventsFilter) ([]model.Event, error)
88
        VerifyDeviceTwin(ctx context.Context, req model.PreauthRequest) error
89
}
90

91
// app is an app object
92
type app struct {
93
        store           store.DataStore
94
        iothubClient    iothub.Client
95
        iotcoreClient   iotcore.Client
96
        wf              workflows.Client
97
        devauth         devauth.Client
98
        httpClient      *http.Client
99
        webhooksTimeout time.Duration
100
}
101

102
// NewApp initialize a new iot-manager App
103
func New(ds store.DataStore, wf workflows.Client, da devauth.Client) App {
1✔
104
        c := client.New()
1✔
105
        hubClient := iothub.NewClient(
1✔
106
                iothub.NewOptions().SetClient(c),
1✔
107
        )
1✔
108
        return &app{
1✔
109
                store:        ds,
1✔
110
                wf:           wf,
1✔
111
                devauth:      da,
1✔
112
                iothubClient: hubClient,
1✔
113
                httpClient:   c,
1✔
114
        }
1✔
115
}
1✔
116

117
// WithIoTHub sets the IoT Hub client
118
func (a *app) WithIoTHub(client iothub.Client) App {
1✔
119
        a.iothubClient = client
1✔
120
        return a
1✔
121
}
1✔
122

123
// WithIoTCore sets the IoT Core client
124
func (a *app) WithIoTCore(client iotcore.Client) App {
1✔
125
        a.iotcoreClient = client
1✔
126
        return a
1✔
127
}
1✔
128

129
// WithWebhooksTimeout sets the timeout for webhooks requests
130
func (a *app) WithWebhooksTimeout(timeout uint) App {
1✔
131
        a.webhooksTimeout = time.Duration(timeout * uint(time.Second))
1✔
132
        return a
1✔
133
}
1✔
134

135
// HealthCheck performs a health check and returns an error if it fails
136
func (a *app) HealthCheck(ctx context.Context) error {
1✔
137
        return a.store.Ping(ctx)
1✔
138
}
1✔
139

140
func (a *app) GetIntegrations(ctx context.Context) ([]model.Integration, error) {
1✔
141
        return a.store.GetIntegrations(ctx, model.IntegrationFilter{})
1✔
142
}
1✔
143

144
func (a *app) GetIntegrationById(ctx context.Context, id uuid.UUID) (*model.Integration, error) {
1✔
145
        integration, err := a.store.GetIntegrationById(ctx, id)
1✔
146
        if err != nil {
2✔
147
                switch cause := errors.Cause(err); cause {
1✔
148
                case store.ErrObjectNotFound:
1✔
149
                        return nil, ErrIntegrationNotFound
1✔
150
                default:
1✔
151
                        return nil, err
1✔
152
                }
153
        }
154
        return integration, err
1✔
155
}
156

157
func (a *app) CreateIntegration(
158
        ctx context.Context,
159
        integration model.Integration,
160
) (*model.Integration, error) {
1✔
161
        result, err := a.store.CreateIntegration(ctx, integration)
1✔
162
        if err == store.ErrObjectExists {
1✔
163
                return nil, ErrIntegrationExists
×
164
        }
×
165
        return result, err
1✔
166
}
167

168
func (a *app) SetIntegrationCredentials(
169
        ctx context.Context,
170
        integrationID uuid.UUID,
171
        credentials model.Credentials,
172
) error {
1✔
173
        err := a.store.SetIntegrationCredentials(ctx, integrationID, credentials)
1✔
174
        if err != nil {
2✔
175
                switch cause := errors.Cause(err); cause {
1✔
176
                case store.ErrObjectNotFound:
1✔
177
                        return ErrIntegrationNotFound
1✔
178
                default:
1✔
179
                        return err
1✔
180
                }
181
        }
182
        return err
1✔
183
}
184

185
func (a *app) RemoveIntegration(
186
        ctx context.Context,
187
        integrationID uuid.UUID,
188
) error {
1✔
189
        itg, err := a.store.GetIntegrationById(ctx, integrationID)
1✔
190
        if err == nil {
2✔
191
                if itg.Provider != model.ProviderWebhook {
2✔
192
                        // check if there are any devices with given integration enabled
1✔
193
                        devicesExist, err := a.store.
1✔
194
                                DoDevicesExistByIntegrationID(ctx, integrationID)
1✔
195
                        if err != nil {
2✔
196
                                return err
1✔
197
                        }
1✔
198
                        if devicesExist {
2✔
199
                                return ErrCannotRemoveIntegration
1✔
200
                        }
1✔
201
                }
202
                err = a.store.RemoveIntegration(ctx, integrationID)
1✔
203
        }
204
        if errors.Is(err, store.ErrObjectNotFound) {
2✔
205
                return ErrIntegrationNotFound
1✔
206
        }
1✔
207
        return err
1✔
208
}
209

210
func (a *app) GetDeviceIntegrations(
211
        ctx context.Context,
212
        deviceID string,
213
) ([]model.Integration, error) {
×
214
        device, err := a.store.GetDevice(ctx, deviceID)
×
215
        if err != nil {
×
216
                if err == store.ErrObjectNotFound {
×
217
                        return nil, ErrDeviceNotFound
×
218
                }
×
219
                return nil, errors.Wrap(err, "app: failed to get device integrations")
×
220
        }
221
        if len(device.IntegrationIDs) > 0 {
×
222
                integrations, err := a.store.GetIntegrations(ctx,
×
223
                        model.IntegrationFilter{IDs: device.IntegrationIDs},
×
224
                )
×
225
                return integrations, errors.Wrap(err, "app: failed to get device integrations")
×
226
        }
×
227
        return []model.Integration{}, nil
×
228
}
229

230
func (a *app) SetDeviceStatus(ctx context.Context, deviceID string, status model.Status) error {
1✔
231
        go func() {
2✔
232
                ctxWithTimeout, cancel := context.WithTimeout(context.Background(), a.webhooksTimeout)
1✔
233
                ctxWithTimeout = identity.WithContext(ctxWithTimeout, identity.FromContext(ctx))
1✔
234
                defer cancel()
1✔
235
                runAndLogError(ctxWithTimeout, func() error {
2✔
236
                        return a.setDeviceStatus(ctxWithTimeout, deviceID, status)
1✔
237
                })
1✔
238
        }()
239
        return nil
1✔
240
}
241

242
func (a *app) setDeviceStatus(ctx context.Context, deviceID string, status model.Status) error {
1✔
243
        integrations, err := a.store.GetIntegrations(ctx, model.IntegrationFilter{})
1✔
244
        if err != nil {
2✔
245
                if errors.Is(err, store.ErrObjectNotFound) {
2✔
246
                        return nil
1✔
247
                }
1✔
248
                return errors.Wrap(err, "failed to retrieve integrations")
1✔
249
        }
250
        event := model.Event{
1✔
251
                WebhookEvent: model.WebhookEvent{
1✔
252
                        ID:   uuid.New(),
1✔
253
                        Type: model.EventTypeDeviceStatusChanged,
1✔
254
                        Data: model.DeviceEvent{
1✔
255
                                ID:     deviceID,
1✔
256
                                Status: status,
1✔
257
                        },
1✔
258
                        EventTS: time.Now(),
1✔
259
                },
1✔
260
                DeliveryStatus: make([]model.DeliveryStatus, 0, len(integrations)),
1✔
261
        }
1✔
262

1✔
263
        var (
1✔
264
                ok     bool
1✔
265
                device = newDevice(deviceID, a.store)
1✔
266
        )
1✔
267
        for _, integration := range integrations {
2✔
268
                deliver := model.DeliveryStatus{
1✔
269
                        IntegrationID: integration.ID,
1✔
270
                        Success:       true,
1✔
271
                }
1✔
272
                switch integration.Provider {
1✔
273
                case model.ProviderIoTHub:
1✔
274
                        ok, err = device.HasIntegration(ctx, integration.ID)
1✔
275
                        if err != nil {
1✔
276
                                break // switch
×
277
                        } else if !ok {
1✔
278
                                continue // loop
×
279
                        }
280
                        err = a.setDeviceStatusIoTHub(ctx, deviceID, status, integration)
1✔
281

282
                case model.ProviderIoTCore:
1✔
283
                        ok, err = device.HasIntegration(ctx, integration.ID)
1✔
284
                        if err != nil {
1✔
285
                                break // switch
×
286
                        } else if !ok {
1✔
287
                                continue // loop
×
288
                        }
289
                        err = a.setDeviceStatusIoTCore(ctx, deviceID, status, integration)
1✔
290

291
                case model.ProviderWebhook:
1✔
292
                        var (
1✔
293
                                req *http.Request
1✔
294
                                rsp *http.Response
1✔
295
                        )
1✔
296
                        req, err = client.NewWebhookRequest(ctx,
1✔
297
                                &integration.Credentials,
1✔
298
                                event.WebhookEvent)
1✔
299
                        if err != nil {
2✔
300
                                break // switch
1✔
301
                        }
302
                        rsp, err = a.httpClient.Do(req)
1✔
303
                        if err != nil {
2✔
304
                                break // switch
1✔
305
                        }
306
                        deliver.StatusCode = &rsp.StatusCode
1✔
307
                        if rsp.StatusCode >= 300 {
2✔
308
                                err = client.NewHTTPError(rsp.StatusCode)
1✔
309
                        }
1✔
310
                        _ = rsp.Body.Close()
1✔
311

312
                default:
1✔
313
                        continue
1✔
314
                }
315
                if err != nil {
2✔
316
                        var httpError client.HTTPError
1✔
317
                        if errors.As(err, &httpError) {
2✔
318
                                errCode := httpError.Code()
1✔
319
                                deliver.StatusCode = &errCode
1✔
320
                        }
1✔
321
                        deliver.Success = false
1✔
322
                        deliver.Error = err.Error()
1✔
323
                }
324
                event.DeliveryStatus = append(event.DeliveryStatus, deliver)
1✔
325
        }
326
        err = a.store.SaveEvent(ctx, event)
1✔
327
        return err
1✔
328
}
329

330
func (a *app) DeleteTenant(
331
        ctx context.Context,
332
) error {
1✔
333
        return a.store.DeleteTenantData(ctx)
1✔
334
}
1✔
335

336
func (a *app) ProvisionDevice(
337
        ctx context.Context,
338
        device model.DeviceEvent,
339
) error {
1✔
340
        go func() {
2✔
341
                ctxWithTimeout, cancel := context.WithTimeout(context.Background(), a.webhooksTimeout)
1✔
342
                ctxWithTimeout = identity.WithContext(ctxWithTimeout, identity.FromContext(ctx))
1✔
343
                defer cancel()
1✔
344
                runAndLogError(ctxWithTimeout, func() error {
2✔
345
                        return a.provisionDevice(ctxWithTimeout, device)
1✔
346
                })
1✔
347
        }()
348
        return nil
1✔
349
}
350

351
func (a *app) provisionDevice(
352
        ctx context.Context,
353
        device model.DeviceEvent,
354
) error {
1✔
355
        integrations, err := a.GetIntegrations(ctx)
1✔
356
        if err != nil {
2✔
357
                if errors.Is(err, store.ErrObjectNotFound) {
2✔
358
                        return nil
1✔
359
                }
1✔
360
                return errors.Wrap(err, "failed to retrieve integrations")
1✔
361
        }
362
        event := model.Event{
1✔
363
                WebhookEvent: model.WebhookEvent{
1✔
364
                        ID:      uuid.New(),
1✔
365
                        Type:    model.EventTypeDeviceProvisioned,
1✔
366
                        Data:    device,
1✔
367
                        EventTS: time.Now(),
1✔
368
                },
1✔
369
                DeliveryStatus: make([]model.DeliveryStatus, 0, len(integrations)),
1✔
370
        }
1✔
371
        integrationIDs := make([]uuid.UUID, 0, len(integrations))
1✔
372
        for _, integration := range integrations {
2✔
373
                deliver := model.DeliveryStatus{
1✔
374
                        IntegrationID: integration.ID,
1✔
375
                        Success:       true,
1✔
376
                }
1✔
377
                switch integration.Provider {
1✔
378
                case model.ProviderIoTHub:
×
379
                        err = a.provisionIoTHubDevice(ctx, device.ID, integration)
×
380
                        integrationIDs = append(integrationIDs, integration.ID)
×
381
                case model.ProviderIoTCore:
×
382
                        err = a.provisionIoTCoreDevice(ctx, device.ID, integration, &iotcore.Device{
×
383
                                Status: iotcore.StatusEnabled,
×
384
                        })
×
385
                        integrationIDs = append(integrationIDs, integration.ID)
×
386
                case model.ProviderWebhook:
1✔
387
                        var (
1✔
388
                                req *http.Request
1✔
389
                                rsp *http.Response
1✔
390
                        )
1✔
391
                        req, err = client.NewWebhookRequest(ctx,
1✔
392
                                &integration.Credentials,
1✔
393
                                event.WebhookEvent)
1✔
394
                        if err != nil {
2✔
395
                                break // switch
1✔
396
                        }
397
                        rsp, err = a.httpClient.Do(req)
1✔
398
                        if err != nil {
2✔
399
                                break // switch
1✔
400
                        }
401
                        deliver.StatusCode = &rsp.StatusCode
1✔
402
                        if rsp.StatusCode >= 300 {
2✔
403
                                err = client.NewHTTPError(rsp.StatusCode)
1✔
404
                        }
1✔
405
                        _ = rsp.Body.Close()
1✔
406

407
                default:
1✔
408
                        continue
1✔
409
                }
410
                if err != nil {
2✔
411
                        var httpError client.HTTPError
1✔
412
                        if errors.As(err, &httpError) {
2✔
413
                                errCode := httpError.Code()
1✔
414
                                deliver.StatusCode = &errCode
1✔
415
                        }
1✔
416
                        deliver.Success = false
1✔
417
                        deliver.Error = err.Error()
1✔
418
                }
419
                event.DeliveryStatus = append(event.DeliveryStatus, deliver)
1✔
420
        }
421
        _, err = a.store.UpsertDeviceIntegrations(ctx, device.ID, integrationIDs)
1✔
422
        if err != nil {
2✔
423
                var statusCodeInternal = 1500
1✔
424
                for i := range event.DeliveryStatus {
1✔
425
                        stat := &event.DeliveryStatus[i]
×
426
                        if stat.Error == "" {
×
427
                                stat.Error = "failed to connect device to integration"
×
428
                                stat.StatusCode = &statusCodeInternal
×
429
                        }
×
430
                        stat.Success = false
×
431
                }
432
                // Use 'panic' field to log potential data inconsistency
433
                log.FromContext(ctx).
1✔
434
                        WithField("panic", err.Error()).
1✔
435
                        Error("failed to connect device integration")
1✔
436
        }
437
        err = a.store.SaveEvent(ctx, event)
1✔
438
        return err
1✔
439
}
440

441
func (a *app) syncBatch(
442
        ctx context.Context,
443
        devices []model.Device,
444
        integCache map[uuid.UUID]*model.Integration,
445
        failEarly bool,
UNCOV
446
) error {
×
UNCOV
447
        var err error
×
UNCOV
448
        l := log.FromContext(ctx)
×
UNCOV
449

×
UNCOV
450
        deviceMap := make(map[uuid.UUID][]string, len(integCache))
×
UNCOV
451
        for _, dev := range devices {
×
UNCOV
452
                for _, id := range dev.IntegrationIDs {
×
UNCOV
453
                        deviceMap[id] = append(deviceMap[id], dev.ID)
×
UNCOV
454
                }
×
455
        }
456

UNCOV
457
        for integID, deviceIDs := range deviceMap {
×
UNCOV
458
                integration, ok := integCache[integID]
×
UNCOV
459
                if !ok {
×
460
                        // (Data race) Try again to fetch the integration
×
461
                        integration, err = a.store.GetIntegrationById(ctx, integID)
×
462
                        if err != nil {
×
463
                                if err == store.ErrObjectNotFound {
×
464
                                        integCache[integID] = nil
×
465
                                        continue
×
466
                                }
467
                                err = errors.Wrap(err, "failed to retrieve device integration")
×
468
                                if failEarly {
×
469
                                        return err
×
470
                                }
×
471
                                l.Errorf("failed to get device integration: %s", err)
×
472
                                continue
×
473
                        } else {
×
474
                                integCache[integID] = integration
×
475
                        }
×
476
                }
UNCOV
477
                if integration == nil {
×
478
                        // Should not occur, but is not impossible since mongo client
×
479
                        // caches batches of results.
×
480
                        _, err := a.store.RemoveDevicesFromIntegration(ctx, integID)
×
481
                        if err != nil {
×
482
                                err = errors.Wrap(err, "failed to remove integration from devices")
×
483
                                if failEarly {
×
484
                                        return err
×
485
                                }
×
486
                                l.Error(err)
×
487
                        }
488
                        continue
×
489
                }
490

UNCOV
491
                switch integration.Provider {
×
UNCOV
492
                case model.ProviderIoTHub:
×
UNCOV
493
                        err := a.syncIoTHubDevices(ctx, deviceIDs, *integration, failEarly)
×
UNCOV
494
                        if err != nil {
×
495
                                if failEarly {
×
496
                                        return err
×
497
                                }
×
498
                                l.Error(err)
×
499
                        }
UNCOV
500
                case model.ProviderIoTCore:
×
UNCOV
501
                        err := a.syncIoTCoreDevices(ctx, deviceIDs, *integration, failEarly)
×
UNCOV
502
                        if err != nil {
×
503
                                if failEarly {
×
504
                                        return err
×
505
                                }
×
506
                                l.Error(err)
×
507
                        }
508
                default:
×
509
                }
510
        }
511

UNCOV
512
        return nil
×
513
}
514

UNCOV
515
func (a app) syncCacheIntegrations(ctx context.Context) (map[uuid.UUID]*model.Integration, error) {
×
UNCOV
516
        // NOTE At the time of writing this, we don't allow more than one
×
UNCOV
517
        //      integration per tenant so this const doesn't matter.
×
UNCOV
518
        // TODO Will we need a more sophisticated cache data structure?
×
UNCOV
519
        const MaxIntegrationsToCache = 20
×
UNCOV
520
        // Cache integrations for the given tenant
×
UNCOV
521
        integrations, err := a.store.GetIntegrations(
×
UNCOV
522
                ctx, model.IntegrationFilter{Limit: MaxIntegrationsToCache},
×
UNCOV
523
        )
×
UNCOV
524
        if err != nil {
×
525
                return nil, errors.Wrap(err, "failed to get integrations for tenant")
×
526
        }
×
UNCOV
527
        integCache := make(map[uuid.UUID]*model.Integration, len(integrations))
×
UNCOV
528
        for i := range integrations {
×
UNCOV
529
                integCache[integrations[i].ID] = &integrations[i]
×
UNCOV
530
        }
×
UNCOV
531
        return integCache, nil
×
532
}
533

534
func (a *app) SyncDevices(
535
        ctx context.Context,
536
        batchSize int,
537
        failEarly bool,
UNCOV
538
) error {
×
UNCOV
539
        type DeviceWithTenantID struct {
×
UNCOV
540
                model.Device `bson:",inline"`
×
UNCOV
541
                TenantID     string `bson:"tenant_id"`
×
UNCOV
542
        }
×
UNCOV
543
        iter, err := a.store.GetAllDevices(ctx)
×
UNCOV
544
        if err != nil {
×
545
                return err
×
546
        }
×
UNCOV
547
        defer iter.Close(ctx)
×
UNCOV
548

×
UNCOV
549
        var (
×
UNCOV
550
                deviceBatch        = make([]model.Device, 0, batchSize)
×
UNCOV
551
                tenantID    string = ""
×
UNCOV
552
                integCache  map[uuid.UUID]*model.Integration
×
UNCOV
553
        )
×
UNCOV
554
        tCtx := identity.WithContext(ctx, &identity.Identity{
×
UNCOV
555
                Tenant: tenantID,
×
UNCOV
556
        })
×
UNCOV
557
        integCache, err = a.syncCacheIntegrations(tCtx)
×
UNCOV
558
        if err != nil {
×
559
                return err
×
560
        }
×
UNCOV
561
        for iter.Next(ctx) {
×
UNCOV
562
                dev := DeviceWithTenantID{}
×
UNCOV
563
                err := iter.Decode(&dev)
×
UNCOV
564
                if err != nil {
×
565
                        return err
×
566
                }
×
UNCOV
567
                if len(deviceBatch) == cap(deviceBatch) ||
×
UNCOV
568
                        (tenantID != dev.TenantID && len(deviceBatch) > 0) {
×
UNCOV
569
                        err := a.syncBatch(tCtx, deviceBatch, integCache, failEarly)
×
UNCOV
570
                        if err != nil {
×
571
                                return err
×
572
                        }
×
UNCOV
573
                        deviceBatch = deviceBatch[:0]
×
574
                }
UNCOV
575
                if tenantID != dev.TenantID {
×
UNCOV
576
                        tenantID = dev.TenantID
×
UNCOV
577
                        tCtx = identity.WithContext(ctx, &identity.Identity{
×
UNCOV
578
                                Tenant: tenantID,
×
UNCOV
579
                        })
×
UNCOV
580

×
UNCOV
581
                        integCache, err = a.syncCacheIntegrations(tCtx)
×
UNCOV
582
                        if err != nil {
×
583
                                return err
×
584
                        }
×
585

586
                }
UNCOV
587
                deviceBatch = append(deviceBatch, dev.Device)
×
588
        }
UNCOV
589
        if len(deviceBatch) > 0 {
×
UNCOV
590
                err := a.syncBatch(tCtx, deviceBatch, integCache, failEarly)
×
UNCOV
591
                if err != nil {
×
592
                        return err
×
593
                }
×
594
        }
UNCOV
595
        return nil
×
596
}
597

598
func (a *app) DecommissionDevice(ctx context.Context, deviceID string) error {
1✔
599
        go func() {
2✔
600
                ctxWithTimeout, cancel := context.WithTimeout(context.Background(), a.webhooksTimeout)
1✔
601
                ctxWithTimeout = identity.WithContext(ctxWithTimeout, identity.FromContext(ctx))
1✔
602
                defer cancel()
1✔
603
                runAndLogError(ctxWithTimeout, func() error {
2✔
604
                        return a.decommissionDevice(ctxWithTimeout, deviceID)
1✔
605
                })
1✔
606
        }()
607
        return nil
1✔
608
}
609

610
func (a *app) decommissionDevice(ctx context.Context, deviceID string) error {
1✔
611
        integrations, err := a.GetIntegrations(ctx)
1✔
612
        if err != nil {
2✔
613
                return err
1✔
614
        }
1✔
615
        var (
1✔
616
                device = newDevice(deviceID, a.store)
1✔
617
        )
1✔
618
        event := model.Event{
1✔
619
                WebhookEvent: model.WebhookEvent{
1✔
620
                        ID:   uuid.New(),
1✔
621
                        Type: model.EventTypeDeviceDecommissioned,
1✔
622
                        Data: model.DeviceEvent{
1✔
623
                                ID: deviceID,
1✔
624
                        },
1✔
625
                        EventTS: time.Now(),
1✔
626
                },
1✔
627
                DeliveryStatus: make([]model.DeliveryStatus, 0, len(integrations)),
1✔
628
        }
1✔
629
        for _, integration := range integrations {
2✔
630
                var (
1✔
631
                        err error
1✔
632
                        ok  bool
1✔
633
                )
1✔
634
                deliver := model.DeliveryStatus{
1✔
635
                        IntegrationID: integration.ID,
1✔
636
                        Success:       true,
1✔
637
                }
1✔
638
                switch integration.Provider {
1✔
639
                case model.ProviderIoTHub:
1✔
640
                        ok, err = device.HasIntegration(ctx, integration.ID)
1✔
641
                        if err != nil {
1✔
642
                                break // switch
×
643
                        } else if !ok {
2✔
644
                                continue // loop
1✔
645
                        }
646
                        err = a.decommissionIoTHubDevice(ctx, deviceID, integration)
1✔
647
                case model.ProviderIoTCore:
1✔
648
                        ok, err = device.HasIntegration(ctx, integration.ID)
1✔
649
                        if err != nil {
1✔
650
                                break // switch
×
651
                        } else if !ok {
1✔
652
                                continue // loop
×
653
                        }
654
                        err = a.decommissionIoTCoreDevice(ctx, deviceID, integration)
1✔
655
                case model.ProviderWebhook:
1✔
656
                        var (
1✔
657
                                req *http.Request
1✔
658
                                rsp *http.Response
1✔
659
                        )
1✔
660
                        req, err = client.NewWebhookRequest(ctx,
1✔
661
                                &integration.Credentials,
1✔
662
                                event.WebhookEvent)
1✔
663
                        if err != nil {
2✔
664
                                break // switch
1✔
665
                        }
666
                        rsp, err = a.httpClient.Do(req)
1✔
667
                        if err != nil {
2✔
668
                                break // switch
1✔
669
                        }
670
                        deliver.StatusCode = &rsp.StatusCode
1✔
671
                        if rsp.StatusCode >= 300 {
2✔
672
                                err = client.NewHTTPError(rsp.StatusCode)
1✔
673
                        }
1✔
674
                        _ = rsp.Body.Close()
1✔
675

676
                default:
1✔
677
                        continue
1✔
678
                }
679
                if err != nil {
2✔
680
                        var httpError client.HTTPError
1✔
681
                        if errors.As(err, &httpError) {
2✔
682
                                errCode := httpError.Code()
1✔
683
                                deliver.StatusCode = &errCode
1✔
684
                        }
1✔
685
                        deliver.Success = false
1✔
686
                        deliver.Error = err.Error()
1✔
687
                }
688
                event.DeliveryStatus = append(event.DeliveryStatus, deliver)
1✔
689
        }
690
        err = a.store.DeleteDevice(ctx, deviceID)
1✔
691
        if errors.Is(err, store.ErrObjectNotFound) {
2✔
692
                err = nil
1✔
693
        }
1✔
694
        if err != nil {
2✔
695
                // Add the `panic` keyword field so we can trace database inconsistencies
1✔
696
                log.FromContext(ctx).
1✔
697
                        WithField("panic", err.Error()).
1✔
698
                        Errorf("failed to remove device from database: %s", err.Error())
1✔
699
        }
1✔
700
        err = a.store.SaveEvent(ctx, event)
1✔
701
        return err
1✔
702
}
703

704
func (a *app) GetDevice(ctx context.Context, deviceID string) (*model.Device, error) {
1✔
705
        device, err := a.store.GetDevice(ctx, deviceID)
1✔
706
        if err == store.ErrObjectNotFound {
2✔
707
                return nil, ErrDeviceNotFound
1✔
708
        }
1✔
709
        return device, err
1✔
710
}
711

712
func (a *app) GetDeviceStateIntegration(
713
        ctx context.Context,
714
        deviceID string,
715
        integrationID uuid.UUID,
716
) (*model.DeviceState, error) {
1✔
717
        _, err := a.store.GetDeviceByIntegrationID(ctx, deviceID, integrationID)
1✔
718
        if err != nil {
2✔
719
                if err == store.ErrObjectNotFound {
2✔
720
                        return nil, ErrIntegrationNotFound
1✔
721
                }
1✔
722
                return nil, errors.Wrap(err, "failed to retrieve the device")
1✔
723
        }
724
        integration, err := a.store.GetIntegrationById(ctx, integrationID)
1✔
725
        if integration == nil && (err == nil || err == store.ErrObjectNotFound) {
2✔
726
                return nil, ErrIntegrationNotFound
1✔
727
        } else if err != nil {
3✔
728
                return nil, errors.Wrap(err, "failed to retrieve the integration")
1✔
729
        }
1✔
730
        switch integration.Provider {
1✔
731
        case model.ProviderIoTHub:
1✔
732
                return a.GetDeviceStateIoTHub(ctx, deviceID, integration)
1✔
733
        case model.ProviderIoTCore:
1✔
734
                return a.GetDeviceStateIoTCore(ctx, deviceID, integration)
1✔
735
        default:
1✔
736
                return nil, ErrUnknownIntegration
1✔
737
        }
738
}
739

740
func (a *app) SetDeviceStateIntegration(
741
        ctx context.Context,
742
        deviceID string,
743
        integrationID uuid.UUID,
744
        state *model.DeviceState,
745
) (*model.DeviceState, error) {
1✔
746
        device, err := a.store.GetDeviceByIntegrationID(ctx, deviceID, integrationID)
1✔
747
        if err != nil {
2✔
748
                return nil, errors.Wrap(err, "failed to retrieve the device")
1✔
749
        } else if device == nil {
3✔
750
                return nil, ErrIntegrationNotFound
1✔
751
        }
1✔
752
        integration, err := a.store.GetIntegrationById(ctx, integrationID)
1✔
753
        if integration == nil && (err == nil || err == store.ErrObjectNotFound) {
2✔
754
                return nil, ErrIntegrationNotFound
1✔
755
        } else if err != nil {
3✔
756
                return nil, errors.Wrap(err, "failed to retrieve the integration")
1✔
757
        }
1✔
758
        switch integration.Provider {
1✔
759
        case model.ProviderIoTHub:
1✔
760
                return a.SetDeviceStateIoTHub(ctx, deviceID, integration, state)
1✔
761
        case model.ProviderIoTCore:
×
762
                return a.SetDeviceStateIoTCore(ctx, deviceID, integration, state)
×
763
        default:
1✔
764
                return nil, ErrUnknownIntegration
1✔
765
        }
766
}
767

768
func (a *app) GetEvents(ctx context.Context, filter model.EventsFilter) ([]model.Event, error) {
1✔
769
        return a.store.GetEvents(ctx, filter)
1✔
770
}
1✔
771

772
func runAndLogError(ctx context.Context, f func() error) {
1✔
773
        var err error
1✔
774
        logger := log.FromContext(ctx)
1✔
775
        defer func() {
2✔
776
                if r := recover(); r != nil {
1✔
777
                        logger.WithField("panic", r).
×
778
                                Error(errors.Wrap(err, "panic processing asynchronous webhook"))
×
779
                } else if err != nil {
2✔
780
                        logger.WithField("error", err.Error()).
1✔
781
                                Error(errors.Wrap(err, "failed to process an asynchronous webhook"))
1✔
782
                }
1✔
783
        }()
784
        err = f()
1✔
785
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc