• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / deployments / 843450222

pending completion
843450222

Pull #854

gitlab-ci

Alf-Rune Siqveland
chore: Add `--throttle` flag to `propagate-reporting` command
Pull Request #854: chore: Add `--throttle` flag to `propagate-reporting` command

8 of 11 new or added lines in 1 file covered. (72.73%)

434 existing lines in 4 files now uncovered.

6943 of 8758 relevant lines covered (79.28%)

70.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.89
/app/app.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//        Licensed under the Apache License, Version 2.0 (the "License");
4
//        you may not use this file except in compliance with the License.
5
//        You may obtain a copy of the License at
6
//
7
//            http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//        Unless required by applicable law or agreed to in writing, software
10
//        distributed under the License is distributed on an "AS IS" BASIS,
11
//        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//        See the License for the specific language governing permissions and
13
//        limitations under the License.
14

15
package app
16

17
import (
18
        "bytes"
19
        "context"
20
        "encoding/json"
21
        "io"
22
        "path"
23
        "reflect"
24
        "strings"
25
        "time"
26

27
        "github.com/google/uuid"
28
        "github.com/pkg/errors"
29

30
        "github.com/mendersoftware/go-lib-micro/identity"
31
        "github.com/mendersoftware/go-lib-micro/log"
32
        "github.com/mendersoftware/mender-artifact/areader"
33
        "github.com/mendersoftware/mender-artifact/artifact"
34
        "github.com/mendersoftware/mender-artifact/awriter"
35
        "github.com/mendersoftware/mender-artifact/handlers"
36

37
        "github.com/mendersoftware/deployments/client/inventory"
38
        "github.com/mendersoftware/deployments/client/reporting"
39
        "github.com/mendersoftware/deployments/client/workflows"
40
        "github.com/mendersoftware/deployments/model"
41
        "github.com/mendersoftware/deployments/storage"
42
        "github.com/mendersoftware/deployments/store"
43
        "github.com/mendersoftware/deployments/store/mongo"
44
        "github.com/mendersoftware/deployments/utils"
45
)
46

47
const (
48
        // Artifact content type and the provides keys written into generated
        // configuration artifacts.
        ArtifactContentType              = "application/vnd.mender-artifact"
49
        ArtifactConfigureProvides        = "data-partition.mender-configure.version"
50
        ArtifactConfigureProvidesCleared = "data-partition.mender-configure.*"
51

52
        // Default link lifetimes (24 hours and 7 days) followed by
        // inventory-related paging, scope and attribute names.
        DefaultUpdateDownloadLinkExpire  = 24 * time.Hour
53
        DefaultImageGenerationLinkExpire = 7 * 24 * time.Hour
54
        PerPageInventoryDevices          = 512
55
        InventoryGroupScope              = "system"
56
        InventoryIdentityScope           = "identity"
57
        InventoryGroupAttributeName      = "group"
58
        InventoryStatusAttributeName     = "status"
59
        InventoryStatusAccepted          = "accepted"
60

61
        // fileSuffixTmp is appended to the object path of raw files and of
        // direct uploads requested through an upload link.
        fileSuffixTmp = ".tmp"
62

63
        // inprogressIdleTime: the upload processing heartbeat ticks at half
        // of this interval.
        inprogressIdleTime = time.Hour
64
)
65

66
var (
67
        // ArtifactConfigureType is the update module type used for generated
        // configuration artifacts. It is a variable because its address is
        // taken when the artifact type info is built.
        ArtifactConfigureType = "mender-configure"
68
)
69

70
// Errors expected from App interface
71
var (
72
        // images
73
        ErrImageMetaNotFound                = errors.New("Image metadata is not found")
74
        ErrModelMultipartUploadMsgMalformed = errors.New("Multipart upload message malformed")
75
        ErrModelMissingInputMetadata        = errors.New("Missing input metadata")
76
        ErrModelMissingInputArtifact        = errors.New("Missing input artifact")
77
        ErrModelInvalidMetadata             = errors.New("Metadata invalid")
78
        ErrModelArtifactNotUnique           = errors.New("Artifact not unique")
79
        ErrModelImageInActiveDeployment     = errors.New(
80
                "Image is used in active deployment and cannot be removed",
81
        )
82
        ErrModelImageUsedInAnyDeployment = errors.New("Image has already been used in deployment")
83
        ErrModelParsingArtifactFailed    = errors.New("Cannot parse artifact file")
84
        ErrUploadNotFound                = errors.New("artifact object not found")
85

86
        // ErrMsgArtifactConflict is a plain message string, not an error value.
        ErrMsgArtifactConflict = "An artifact with the same name has conflicting dependencies"
87

88
        // deployments
89
        ErrModelMissingInput       = errors.New("Missing input deployment data")
90
        ErrModelInvalidDeviceID    = errors.New("Invalid device ID")
91
        ErrModelDeploymentNotFound = errors.New("Deployment not found")
92
        ErrModelInternal           = errors.New("Internal error")
93
        ErrStorageInvalidLog       = errors.New("Invalid deployment log")
94
        ErrStorageNotFound         = errors.New("Not found")
95
        ErrDeploymentAborted       = errors.New("Deployment aborted")
96
        ErrDeviceDecommissioned    = errors.New("Device decommissioned")
97
        ErrNoArtifact              = errors.New("No artifact for the deployment")
98
        ErrNoDevices               = errors.New("No devices for the deployment")
99
        ErrDuplicateDeployment     = errors.New("Deployment with given ID already exists")
100
        ErrInvalidDeploymentID     = errors.New("Deployment ID must be a valid UUID")
101
        ErrConflictingRequestData  = errors.New("Device provided conflicting request data")
102
)
103

104
//deployments
105

106
//go:generate ../utils/mockgen.sh
107
// App declares the operations of the deployments application layer, grouped
// below into limits, storage settings, images and deployments.
type App interface {
108
        // HealthCheck reports an error when a dependency cannot be reached.
        HealthCheck(ctx context.Context) error
109
        // limits
110
        GetLimit(ctx context.Context, name string) (*model.Limit, error)
111
        ProvisionTenant(ctx context.Context, tenant_id string) error
112

113
        // Storage Settings
114
        GetStorageSettings(ctx context.Context) (*model.StorageSettings, error)
115
        SetStorageSettings(ctx context.Context, storageSettings *model.StorageSettings) error
116

117
        // images
118
        ListImages(
119
                ctx context.Context,
120
                filters *model.ReleaseOrImageFilter,
121
        ) ([]*model.Image, int, error)
122
        DownloadLink(ctx context.Context, imageID string,
123
                expire time.Duration) (*model.Link, error)
124
        UploadLink(ctx context.Context, expire time.Duration) (*model.UploadLink, error)
125
        CompleteUpload(ctx context.Context, intentID string) error
126
        GetImage(ctx context.Context, id string) (*model.Image, error)
127
        DeleteImage(ctx context.Context, imageID string) error
128
        CreateImage(ctx context.Context,
129
                multipartUploadMsg *model.MultipartUploadMsg) (string, error)
130
        GenerateImage(ctx context.Context,
131
                multipartUploadMsg *model.MultipartGenerateImageMsg) (string, error)
132
        GenerateConfigurationImage(
133
                ctx context.Context,
134
                deviceType string,
135
                deploymentID string,
136
        ) (io.Reader, error)
137
        EditImage(ctx context.Context, id string,
138
                constructorData *model.ImageMeta) (bool, error)
139

140
        // deployments
141
        CreateDeployment(ctx context.Context,
142
                constructor *model.DeploymentConstructor) (string, error)
143
        GetDeployment(ctx context.Context, deploymentID string) (*model.Deployment, error)
144
        IsDeploymentFinished(ctx context.Context, deploymentID string) (bool, error)
145
        AbortDeployment(ctx context.Context, deploymentID string) error
146
        GetDeploymentStats(ctx context.Context, deploymentID string) (model.Stats, error)
147
        GetDeploymentsStats(ctx context.Context,
148
                deploymentIDs ...string) ([]*model.DeploymentStats, error)
149
        GetDeploymentForDeviceWithCurrent(ctx context.Context, deviceID string,
150
                request *model.DeploymentNextRequest) (*model.DeploymentInstructions, error)
151
        HasDeploymentForDevice(ctx context.Context, deploymentID string,
152
                deviceID string) (bool, error)
153
        UpdateDeviceDeploymentStatus(ctx context.Context, deploymentID string,
154
                deviceID string, state model.DeviceDeploymentState) error
155
        GetDeviceStatusesForDeployment(ctx context.Context,
156
                deploymentID string) ([]model.DeviceDeployment, error)
157
        GetDevicesListForDeployment(ctx context.Context,
158
                query store.ListQuery) ([]model.DeviceDeployment, int, error)
159
        GetDeviceDeploymentListForDevice(ctx context.Context,
160
                query store.ListQueryDeviceDeployments) ([]model.DeviceDeploymentListItem, int, error)
161
        LookupDeployment(ctx context.Context,
162
                query model.Query) ([]*model.Deployment, int64, error)
163
        SaveDeviceDeploymentLog(ctx context.Context, deviceID string,
164
                deploymentID string, logs []model.LogMessage) error
165
        GetDeviceDeploymentLog(ctx context.Context,
166
                deviceID, deploymentID string) (*model.DeploymentLog, error)
167
        AbortDeviceDeployments(ctx context.Context, deviceID string) error
168
        DeleteDeviceDeploymentsHistory(ctx context.Context, deviceId string) error
169
        DecommissionDevice(ctx context.Context, deviceID string) error
170
        CreateDeviceConfigurationDeployment(
171
                ctx context.Context, constructor *model.ConfigurationDeploymentConstructor,
172
                deviceID, deploymentID string) (string, error)
173
        UpdateDeploymentsWithArtifactName(
174
                ctx context.Context,
175
                artifactName string,
176
        ) error
177
}
178

179
// Deployments is the App implementation; it holds the data store, the object
// storage and the clients of the workflows, inventory and reporting services.
type Deployments struct {
180
        db              store.DataStore
181
        objectStorage   storage.ObjectStorage
182
        workflowsClient workflows.Client
183
        inventoryClient inventory.Client
184
        // reportingClient may be nil; HealthCheck skips it in that case.
        reportingClient reporting.Client
185
}
186

187
// Compile-time check
188
var _ App = &Deployments{}
189

190
func NewDeployments(
191
        storage store.DataStore,
192
        objectStorage storage.ObjectStorage,
193
) *Deployments {
121✔
194
        return &Deployments{
121✔
195
                db:              storage,
121✔
196
                objectStorage:   objectStorage,
121✔
197
                workflowsClient: workflows.NewClient(),
121✔
198
                inventoryClient: inventory.NewClient(),
121✔
199
        }
121✔
200
}
121✔
201

202
// SetWorkflowsClient replaces the workflows client used by the application.
func (d *Deployments) SetWorkflowsClient(workflowsClient workflows.Client) {
8✔
203
        d.workflowsClient = workflowsClient
8✔
204
}
8✔
205

206
// SetInventoryClient replaces the inventory client used by the application.
func (d *Deployments) SetInventoryClient(inventoryClient inventory.Client) {
16✔
207
        d.inventoryClient = inventoryClient
16✔
208
}
16✔
209

210
func (d *Deployments) HealthCheck(ctx context.Context) error {
12✔
211
        err := d.db.Ping(ctx)
12✔
212
        if err != nil {
14✔
213
                return errors.Wrap(err, "error reaching MongoDB")
2✔
214
        }
2✔
215
        err = d.objectStorage.HealthCheck(ctx)
10✔
216
        if err != nil {
12✔
217
                return errors.Wrap(
2✔
218
                        err,
2✔
219
                        "error reaching artifact storage service",
2✔
220
                )
2✔
221
        }
2✔
222

223
        err = d.workflowsClient.CheckHealth(ctx)
8✔
224
        if err != nil {
10✔
225
                return errors.Wrap(err, "Workflows service unhealthy")
2✔
226
        }
2✔
227

228
        err = d.inventoryClient.CheckHealth(ctx)
6✔
229
        if err != nil {
8✔
230
                return errors.Wrap(err, "Inventory service unhealthy")
2✔
231
        }
2✔
232

233
        if d.reportingClient != nil {
8✔
234
                err = d.reportingClient.CheckHealth(ctx)
4✔
235
                if err != nil {
6✔
236
                        return errors.Wrap(err, "Reporting service unhealthy")
2✔
237
                }
2✔
238
        }
239
        return nil
2✔
240
}
241

242
func (d *Deployments) contextWithStorageSettings(
243
        ctx context.Context,
244
) (context.Context, error) {
43✔
245
        var err error
43✔
246
        settings, ok := storage.SettingsFromContext(ctx)
43✔
247
        if !ok {
82✔
248
                settings, err = d.db.GetStorageSettings(ctx)
39✔
249
        }
39✔
250
        if err != nil {
47✔
251
                return nil, err
4✔
252
        } else if settings != nil {
43✔
UNCOV
253
                err = settings.Validate()
×
UNCOV
254
                if err != nil {
×
UNCOV
255
                        return nil, err
×
UNCOV
256
                }
×
257
        }
258
        return storage.SettingsWithContext(ctx, settings), nil
39✔
259
}
260

261
func (d *Deployments) GetLimit(ctx context.Context, name string) (*model.Limit, error) {
6✔
262
        limit, err := d.db.GetLimit(ctx, name)
6✔
263
        if err == mongo.ErrLimitNotFound {
8✔
264
                return &model.Limit{
2✔
265
                        Name:  name,
2✔
266
                        Value: 0,
2✔
267
                }, nil
2✔
268

2✔
269
        } else if err != nil {
8✔
270
                return nil, errors.Wrap(err, "failed to obtain limit from storage")
2✔
271
        }
2✔
272
        return limit, nil
2✔
273
}
274

275
func (d *Deployments) ProvisionTenant(ctx context.Context, tenant_id string) error {
5✔
276
        if err := d.db.ProvisionTenant(ctx, tenant_id); err != nil {
7✔
277
                return errors.Wrap(err, "failed to provision tenant")
2✔
278
        }
2✔
279

280
        return nil
3✔
281
}
282

283
// CreateImage parses artifact and uploads artifact file to the file storage - in parallel,
284
// and creates image structure in the system.
285
// Returns image ID and nil on success.
286
func (d *Deployments) CreateImage(ctx context.Context,
287
        multipartUploadMsg *model.MultipartUploadMsg) (string, error) {
1✔
288
        return d.handleArtifact(ctx, multipartUploadMsg)
1✔
289
}
1✔
290

291
// handleArtifact parses artifact and uploads artifact file to the file storage - in parallel,
292
// and creates image structure in the system.
293
// Returns image ID, artifact file ID and nil on success.
294
func (d *Deployments) handleArtifact(ctx context.Context,
295
        multipartUploadMsg *model.MultipartUploadMsg) (string, error) {
5✔
296

5✔
297
        l := log.FromContext(ctx)
5✔
298
        ctx, err := d.contextWithStorageSettings(ctx)
5✔
299
        if err != nil {
5✔
UNCOV
300
                return "", err
×
UNCOV
301
        }
×
302

303
        // create pipe
304
        pR, pW := io.Pipe()
5✔
305

5✔
306
        artifactReader := utils.CountReads(multipartUploadMsg.ArtifactReader)
5✔
307

5✔
308
        tee := io.TeeReader(artifactReader, pW)
5✔
309

5✔
310
        uid, err := uuid.Parse(multipartUploadMsg.ArtifactID)
5✔
311
        if err != nil {
6✔
312
                uid, _ = uuid.NewRandom()
1✔
313
        }
1✔
314
        artifactID := uid.String()
5✔
315

5✔
316
        ch := make(chan error)
5✔
317
        // create goroutine for artifact upload
5✔
318
        //
5✔
319
        // reading from the pipe (which is done in UploadArtifact method) is a blocking operation
5✔
320
        // and cannot be done in the same goroutine as writing to the pipe
5✔
321
        //
5✔
322
        // uploading and parsing artifact in the same process will cause in a deadlock!
5✔
323
        //nolint:errcheck
5✔
324
        go func() (err error) {
10✔
325
                defer func() { ch <- err }()
10✔
326
                err = d.objectStorage.PutObject(
5✔
327
                        ctx, model.ImagePathFromContext(ctx, artifactID), pR,
5✔
328
                )
5✔
329
                if err != nil {
6✔
330
                        pR.CloseWithError(err)
1✔
331
                }
1✔
332
                return err
5✔
333
        }()
334

335
        // parse artifact
336
        // artifact library reads all the data from the given reader
337
        metaArtifactConstructor, err := getMetaFromArchive(&tee)
5✔
338
        if err != nil {
10✔
339
                _ = pW.CloseWithError(err)
5✔
340
                <-ch
5✔
341
                return artifactID, errors.Wrap(ErrModelParsingArtifactFailed, err.Error())
5✔
342
        }
5✔
343
        // validate artifact metadata
344
        if err = metaArtifactConstructor.Validate(); err != nil {
1✔
UNCOV
345
                return artifactID, ErrModelInvalidMetadata
×
UNCOV
346
        }
×
347

348
        // read the rest of the data,
349
        // just in case the artifact library did not read all the data from the reader
350
        _, err = io.Copy(io.Discard, tee)
1✔
351
        if err != nil {
1✔
352
                // CloseWithError will cause the reading end to abort upload.
×
353
                _ = pW.CloseWithError(err)
×
UNCOV
354
                <-ch
×
UNCOV
355
                return artifactID, err
×
UNCOV
356
        }
×
357

358
        // close the pipe
359
        pW.Close()
1✔
360

1✔
361
        // collect output from the goroutine
1✔
362
        if uploadResponseErr := <-ch; uploadResponseErr != nil {
1✔
363
                return artifactID, uploadResponseErr
×
UNCOV
364
        }
×
365

366
        image := model.NewImage(
1✔
367
                artifactID,
1✔
368
                multipartUploadMsg.MetaConstructor,
1✔
369
                metaArtifactConstructor,
1✔
370
                artifactReader.Count(),
1✔
371
        )
1✔
372

1✔
373
        // save image structure in the system
1✔
374
        if err = d.db.InsertImage(ctx, image); err != nil {
1✔
UNCOV
375
                // Try to remove the storage from s3.
×
UNCOV
376
                if errDelete := d.objectStorage.DeleteObject(
×
UNCOV
377
                        ctx, model.ImagePathFromContext(ctx, artifactID),
×
UNCOV
378
                ); errDelete != nil {
×
UNCOV
379
                        l.Errorf(
×
UNCOV
380
                                "failed to clean up artifact storage after failure: %s",
×
UNCOV
381
                                errDelete,
×
382
                        )
×
383
                }
×
384
                if idxErr, ok := err.(*model.ConflictError); ok {
×
385
                        return artifactID, idxErr
×
386
                }
×
387
                return artifactID, errors.Wrap(err, "Fail to store the metadata")
×
388
        }
389
        if err := d.UpdateDeploymentsWithArtifactName(ctx, metaArtifactConstructor.Name); err != nil {
1✔
390
                return "", errors.Wrap(err, "fail to update deployments")
×
391
        }
×
392

393
        return artifactID, nil
1✔
394
}
395

396
// GenerateImage parses raw data and uploads it to the file storage - in parallel,
397
// creates image structure in the system, and starts the workflow to generate the
398
// artifact from them.
399
// Returns image ID and nil on success.
400
func (d *Deployments) GenerateImage(ctx context.Context,
401
        multipartGenerateImageMsg *model.MultipartGenerateImageMsg) (string, error) {
21✔
402

21✔
403
        if multipartGenerateImageMsg == nil {
23✔
404
                return "", ErrModelMultipartUploadMsgMalformed
2✔
405
        }
2✔
406

407
        imgPath, err := d.handleRawFile(ctx, multipartGenerateImageMsg)
19✔
408
        if err != nil {
29✔
409
                return "", err
10✔
410
        }
10✔
411
        if id := identity.FromContext(ctx); id != nil && len(id.Tenant) > 0 {
11✔
412
                multipartGenerateImageMsg.TenantID = id.Tenant
2✔
413
        }
2✔
414
        err = d.workflowsClient.StartGenerateArtifact(ctx, multipartGenerateImageMsg)
9✔
415
        if err != nil {
13✔
416
                if cleanupErr := d.objectStorage.DeleteObject(ctx, imgPath); cleanupErr != nil {
6✔
417
                        return "", errors.Wrap(err, cleanupErr.Error())
2✔
418
                }
2✔
419
                return "", err
2✔
420
        }
421

422
        return multipartGenerateImageMsg.ArtifactID, err
5✔
423
}
424

425
func (d *Deployments) GenerateConfigurationImage(
426
        ctx context.Context,
427
        deviceType string,
428
        deploymentID string,
429
) (io.Reader, error) {
9✔
430
        var buf bytes.Buffer
9✔
431
        dpl, err := d.db.FindDeploymentByID(ctx, deploymentID)
9✔
432
        if err != nil {
11✔
433
                return nil, err
2✔
434
        } else if dpl == nil {
11✔
435
                return nil, ErrModelDeploymentNotFound
2✔
436
        }
2✔
437
        var metaData map[string]interface{}
5✔
438
        err = json.Unmarshal(dpl.Configuration, &metaData)
5✔
439
        if err != nil {
7✔
440
                return nil, errors.Wrapf(err, "malformed configuration in deployment")
2✔
441
        }
2✔
442

443
        artieWriter := awriter.NewWriter(&buf, artifact.NewCompressorNone())
3✔
444
        module := handlers.NewModuleImage(ArtifactConfigureType)
3✔
445
        err = artieWriter.WriteArtifact(&awriter.WriteArtifactArgs{
3✔
446
                Format:  "mender",
3✔
447
                Version: 3,
3✔
448
                Devices: []string{deviceType},
3✔
449
                Name:    dpl.ArtifactName,
3✔
450
                Updates: &awriter.Updates{Updates: []handlers.Composer{module}},
3✔
451
                Depends: &artifact.ArtifactDepends{
3✔
452
                        CompatibleDevices: []string{deviceType},
3✔
453
                },
3✔
454
                Provides: &artifact.ArtifactProvides{
3✔
455
                        ArtifactName: dpl.ArtifactName,
3✔
456
                },
3✔
457
                MetaData: metaData,
3✔
458
                TypeInfoV3: &artifact.TypeInfoV3{
3✔
459
                        Type: &ArtifactConfigureType,
3✔
460
                        ArtifactProvides: artifact.TypeInfoProvides{
3✔
461
                                ArtifactConfigureProvides: dpl.ArtifactName,
3✔
462
                        },
3✔
463
                        ArtifactDepends:        artifact.TypeInfoDepends{},
3✔
464
                        ClearsArtifactProvides: []string{ArtifactConfigureProvidesCleared},
3✔
465
                },
3✔
466
        })
3✔
467

3✔
468
        return &buf, err
3✔
469
}
470

471
// handleRawFile parses raw data, uploads it to the file storage,
472
// and starts the workflow to generate the artifact.
473
// Returns the object path to the file and nil on success.
474
func (d *Deployments) handleRawFile(ctx context.Context,
475
        multipartMsg *model.MultipartGenerateImageMsg) (filePath string, err error) {
19✔
476
        l := log.FromContext(ctx)
19✔
477
        uid, _ := uuid.NewRandom()
19✔
478
        artifactID := uid.String()
19✔
479
        multipartMsg.ArtifactID = artifactID
19✔
480
        filePath = model.ImagePathFromContext(ctx, artifactID+fileSuffixTmp)
19✔
481

19✔
482
        // check if artifact is unique
19✔
483
        // artifact is considered to be unique if there is no artifact with the same name
19✔
484
        // and supporting the same platform in the system
19✔
485
        isArtifactUnique, err := d.db.IsArtifactUnique(ctx,
19✔
486
                multipartMsg.Name,
19✔
487
                multipartMsg.DeviceTypesCompatible,
19✔
488
        )
19✔
489
        if err != nil {
21✔
490
                return "", errors.Wrap(err, "Fail to check if artifact is unique")
2✔
491
        }
2✔
492
        if !isArtifactUnique {
19✔
493
                return "", ErrModelArtifactNotUnique
2✔
494
        }
2✔
495

496
        ctx, err = d.contextWithStorageSettings(ctx)
15✔
497
        if err != nil {
15✔
UNCOV
498
                return "", err
×
UNCOV
499
        }
×
500
        err = d.objectStorage.PutObject(
15✔
501
                ctx, filePath, multipartMsg.FileReader,
15✔
502
        )
15✔
503
        if err != nil {
17✔
504
                return "", err
2✔
505
        }
2✔
506
        defer func() {
26✔
507
                if err != nil {
17✔
508
                        e := d.objectStorage.DeleteObject(ctx, filePath)
4✔
509
                        if e != nil {
8✔
510
                                l.Errorf("error cleaning up raw file '%s' from objectstorage: %s",
4✔
511
                                        filePath, e)
4✔
512
                        }
4✔
513
                }
514
        }()
515

516
        link, err := d.objectStorage.GetRequest(
13✔
517
                ctx,
13✔
518
                filePath,
13✔
519
                path.Base(filePath),
13✔
520
                DefaultImageGenerationLinkExpire,
13✔
521
        )
13✔
522
        if err != nil {
15✔
523
                return "", err
2✔
524
        }
2✔
525
        multipartMsg.GetArtifactURI = link.Uri
11✔
526

11✔
527
        link, err = d.objectStorage.DeleteRequest(ctx, filePath, DefaultImageGenerationLinkExpire)
11✔
528
        if err != nil {
13✔
529
                return "", err
2✔
530
        }
2✔
531
        multipartMsg.DeleteArtifactURI = link.Uri
9✔
532

9✔
533
        return artifactID, nil
9✔
534
}
535

536
// GetImage allows to fetch image object with specified id
537
// Nil if not found
538
func (d *Deployments) GetImage(ctx context.Context, id string) (*model.Image, error) {
1✔
539

1✔
540
        image, err := d.db.FindImageByID(ctx, id)
1✔
541
        if err != nil {
1✔
UNCOV
542
                return nil, errors.Wrap(err, "Searching for image with specified ID")
×
UNCOV
543
        }
×
544

545
        if image == nil {
2✔
546
                return nil, nil
1✔
547
        }
1✔
548

549
        return image, nil
1✔
550
}
551

552
// DeleteImage removes metadata and image file
553
// Noop for not existing images
554
// Allowed to remove image only if image is not scheduled or in progress for an updates - then image
555
// file is needed
556
// In case of already finished updates only image file is not needed, metadata is attached directly
557
// to device deployment therefore we still have some information about image that have been used
558
// (but not the file)
559
func (d *Deployments) DeleteImage(ctx context.Context, imageID string) error {
1✔
560
        found, err := d.GetImage(ctx, imageID)
1✔
561

1✔
562
        if err != nil {
1✔
UNCOV
563
                return errors.Wrap(err, "Getting image metadata")
×
UNCOV
564
        }
×
565

566
        if found == nil {
1✔
UNCOV
567
                return ErrImageMetaNotFound
×
UNCOV
568
        }
×
569

570
        inUse, err := d.ImageUsedInActiveDeployment(ctx, imageID)
1✔
571
        if err != nil {
1✔
UNCOV
572
                return errors.Wrap(err, "Checking if image is used in active deployment")
×
UNCOV
573
        }
×
574

575
        // Image is in use, not allowed to delete
576
        if inUse {
2✔
577
                return ErrModelImageInActiveDeployment
1✔
578
        }
1✔
579

580
        // Delete image file (call to external service)
581
        // Noop for not existing file
582
        ctx, err = d.contextWithStorageSettings(ctx)
1✔
583
        if err != nil {
1✔
UNCOV
584
                return err
×
UNCOV
585
        }
×
586
        imagePath := model.ImagePathFromContext(ctx, imageID)
1✔
587
        if err := d.objectStorage.DeleteObject(ctx, imagePath); err != nil {
1✔
UNCOV
588
                return errors.Wrap(err, "Deleting image file")
×
UNCOV
589
        }
×
590

591
        // Delete metadata
592
        if err := d.db.DeleteImage(ctx, imageID); err != nil {
1✔
UNCOV
593
                return errors.Wrap(err, "Deleting image metadata")
×
UNCOV
594
        }
×
595

596
        return nil
1✔
597
}
598

599
// ListImages according to specified filers.
600
func (d *Deployments) ListImages(
601
        ctx context.Context,
602
        filters *model.ReleaseOrImageFilter,
603
) ([]*model.Image, int, error) {
1✔
604
        imageList, count, err := d.db.ListImages(ctx, filters)
1✔
605
        if err != nil {
1✔
UNCOV
606
                return nil, 0, errors.Wrap(err, "Searching for image metadata")
×
UNCOV
607
        }
×
608

609
        if imageList == nil {
2✔
610
                return make([]*model.Image, 0), 0, nil
1✔
611
        }
1✔
612

613
        return imageList, count, nil
1✔
614
}
615

616
// EditObject allows editing only if image have not been used yet in any deployment.
617
func (d *Deployments) EditImage(ctx context.Context, imageID string,
UNCOV
618
        constructor *model.ImageMeta) (bool, error) {
×
UNCOV
619

×
UNCOV
620
        if err := constructor.Validate(); err != nil {
×
UNCOV
621
                return false, errors.Wrap(err, "Validating image metadata")
×
UNCOV
622
        }
×
623

UNCOV
624
        found, err := d.ImageUsedInDeployment(ctx, imageID)
×
625
        if err != nil {
×
626
                return false, errors.Wrap(err, "Searching for usage of the image among deployments")
×
627
        }
×
628

629
        if found {
×
UNCOV
630
                return false, ErrModelImageUsedInAnyDeployment
×
631
        }
×
632

633
        foundImage, err := d.db.FindImageByID(ctx, imageID)
×
634
        if err != nil {
×
UNCOV
635
                return false, errors.Wrap(err, "Searching for image with specified ID")
×
636
        }
×
637

638
        if foundImage == nil {
×
UNCOV
639
                return false, nil
×
640
        }
×
641

642
        foundImage.SetModified(time.Now())
×
643
        foundImage.ImageMeta = constructor
×
UNCOV
644

×
645
        _, err = d.db.Update(ctx, foundImage)
×
646
        if err != nil {
×
647
                return false, errors.Wrap(err, "Updating image matadata")
×
UNCOV
648
        }
×
649

650
        return true, nil
×
651
}
652

653
// DownloadLink presigned GET link to download image file.
654
// Returns error if image have not been uploaded.
655
func (d *Deployments) DownloadLink(ctx context.Context, imageID string,
656
        expire time.Duration) (*model.Link, error) {
1✔
657

1✔
658
        image, err := d.GetImage(ctx, imageID)
1✔
659
        if err != nil {
1✔
UNCOV
660
                return nil, errors.Wrap(err, "Searching for image with specified ID")
×
UNCOV
661
        }
×
662

663
        if image == nil {
1✔
UNCOV
664
                return nil, nil
×
UNCOV
665
        }
×
666

667
        ctx, err = d.contextWithStorageSettings(ctx)
1✔
668
        if err != nil {
1✔
UNCOV
669
                return nil, err
×
UNCOV
670
        }
×
671
        imagePath := model.ImagePathFromContext(ctx, imageID)
1✔
672
        _, err = d.objectStorage.StatObject(ctx, imagePath)
1✔
673
        if err != nil {
1✔
UNCOV
674
                return nil, errors.Wrap(err, "Searching for image file")
×
UNCOV
675
        }
×
676

677
        link, err := d.objectStorage.GetRequest(
1✔
678
                ctx,
1✔
679
                imagePath,
1✔
680
                image.Name+model.ArtifactFileSuffix,
1✔
681
                expire,
1✔
682
        )
1✔
683
        if err != nil {
1✔
UNCOV
684
                return nil, errors.Wrap(err, "Generating download link")
×
UNCOV
685
        }
×
686

687
        return link, nil
1✔
688
}
689

690
func (d *Deployments) UploadLink(
691
        ctx context.Context,
692
        expire time.Duration,
693
) (*model.UploadLink, error) {
11✔
694
        ctx, err := d.contextWithStorageSettings(ctx)
11✔
695
        if err != nil {
13✔
696
                return nil, err
2✔
697
        }
2✔
698

699
        artifactID := uuid.New().String()
9✔
700
        path := model.ImagePathFromContext(ctx, artifactID) + fileSuffixTmp
9✔
701
        link, err := d.objectStorage.PutRequest(ctx, path, expire)
9✔
702
        if err != nil {
11✔
703
                return nil, errors.WithMessage(err, "app: failed to generate signed URL")
2✔
704
        }
2✔
705
        upLink := &model.UploadLink{
7✔
706
                ArtifactID: artifactID,
7✔
707
                IssuedAt:   time.Now(),
7✔
708
                Link:       *link,
7✔
709
        }
7✔
710
        err = d.db.InsertUploadIntent(ctx, upLink)
7✔
711
        if err != nil {
9✔
712
                return nil, errors.WithMessage(err, "app: error recording the upload intent")
2✔
713
        }
2✔
714

715
        return upLink, err
5✔
716
}
717

718
func (d *Deployments) processUploadedArtifact(
719
        ctx context.Context,
720
        artifactID string,
721
        artifact io.ReadCloser) error {
5✔
722
        linkStatus := model.LinkStatusCompleted
5✔
723

5✔
724
        l := log.FromContext(ctx)
5✔
725
        defer artifact.Close()
5✔
726
        ctx, cancel := context.WithCancel(ctx)
5✔
727
        defer cancel()
5✔
728
        go func() { // Heatbeat routine
10✔
729
                ticker := time.NewTicker(inprogressIdleTime / 2)
5✔
730
                done := ctx.Done()
5✔
731
                defer ticker.Stop()
5✔
732
                for {
10✔
733
                        select {
5✔
UNCOV
734
                        case <-ticker.C:
×
UNCOV
735
                                err := d.db.UpdateUploadIntentStatus(
×
UNCOV
736
                                        ctx,
×
UNCOV
737
                                        artifactID,
×
UNCOV
738
                                        model.LinkStatusProcessing,
×
UNCOV
739
                                        model.LinkStatusProcessing,
×
UNCOV
740
                                )
×
741
                                if err != nil {
×
742
                                        l.Errorf("failed to update upload link timestamp: %s", err)
×
743
                                        cancel()
×
744
                                        return
×
745
                                }
×
746
                        case <-done:
5✔
747
                                return
5✔
748
                        }
749
                }
750
        }()
751
        _, err := d.handleArtifact(ctx, &model.MultipartUploadMsg{
5✔
752
                ArtifactID:     artifactID,
5✔
753
                ArtifactReader: artifact,
5✔
754
        })
5✔
755
        if err != nil {
9✔
756
                l.Warnf("failed to process artifact %s: %s", artifactID, err)
4✔
757
                linkStatus = model.LinkStatusAborted
4✔
758
        }
4✔
759
        errDB := d.db.UpdateUploadIntentStatus(
5✔
760
                ctx, artifactID,
5✔
761
                model.LinkStatusProcessing, linkStatus,
5✔
762
        )
5✔
763
        if errDB != nil {
7✔
764
                l.Warnf("failed to update upload link status: %s", errDB)
2✔
765
        }
2✔
766
        return err
5✔
767
}
768

769
func (d *Deployments) CompleteUpload(
770
        ctx context.Context,
771
        intentID string,
772
) error {
15✔
773
        l := log.FromContext(ctx)
15✔
774
        idty := identity.FromContext(ctx)
15✔
775
        ctx, err := d.contextWithStorageSettings(ctx)
15✔
776
        if err != nil {
17✔
777
                return err
2✔
778
        }
2✔
779
        // Create an async context that does'nt cancel when server connection
780
        // closes.
781
        ctxAsync := context.Background()
13✔
782
        ctxAsync = log.WithContext(ctxAsync, l)
13✔
783
        ctxAsync = identity.WithContext(ctxAsync, idty)
13✔
784

13✔
785
        settings, _ := storage.SettingsFromContext(ctx)
13✔
786
        ctxAsync = storage.SettingsWithContext(ctxAsync, settings)
13✔
787
        artifactReader, err := d.objectStorage.GetObject(
13✔
788
                ctxAsync,
13✔
789
                model.ImagePathFromContext(ctx, intentID)+fileSuffixTmp,
13✔
790
        )
13✔
791
        if err != nil {
17✔
792
                if errors.Is(err, storage.ErrObjectNotFound) {
6✔
793
                        return ErrUploadNotFound
2✔
794
                }
2✔
795
                return err
2✔
796
        }
797

798
        err = d.db.UpdateUploadIntentStatus(
9✔
799
                ctx,
9✔
800
                intentID,
9✔
801
                model.LinkStatusPending,
9✔
802
                model.LinkStatusProcessing,
9✔
803
        )
9✔
804
        if err != nil {
13✔
805
                errClose := artifactReader.Close()
4✔
806
                if errClose != nil {
6✔
807
                        l.Warnf("failed to close artifact reader: %s", errClose)
2✔
808
                }
2✔
809
                if errors.Is(err, store.ErrNotFound) {
6✔
810
                        return ErrUploadNotFound
2✔
811
                }
2✔
812
                return err
2✔
813
        }
814
        go d.processUploadedArtifact( // nolint:errcheck
5✔
815
                ctxAsync, intentID, artifactReader,
5✔
816
        )
5✔
817
        return nil
5✔
818
}
819

820
func getArtifactInfo(info artifact.Info) *model.ArtifactInfo {
1✔
821
        return &model.ArtifactInfo{
1✔
822
                Format:  info.Format,
1✔
823
                Version: uint(info.Version),
1✔
824
        }
1✔
825
}
1✔
826

827
func getUpdateFiles(uFiles []*handlers.DataFile) ([]model.UpdateFile, error) {
1✔
828
        var files []model.UpdateFile
1✔
829
        for _, u := range uFiles {
2✔
830
                files = append(files, model.UpdateFile{
1✔
831
                        Name:     u.Name,
1✔
832
                        Size:     u.Size,
1✔
833
                        Date:     &u.Date,
1✔
834
                        Checksum: string(u.Checksum),
1✔
835
                })
1✔
836
        }
1✔
837
        return files, nil
1✔
838
}
839

840
func getMetaFromArchive(r *io.Reader) (*model.ArtifactMeta, error) {
5✔
841
        metaArtifact := model.NewArtifactMeta()
5✔
842

5✔
843
        aReader := areader.NewReader(*r)
5✔
844

5✔
845
        // There is no signature verification here.
5✔
846
        // It is just simple check if artifact is signed or not.
5✔
847
        aReader.VerifySignatureCallback = func(message, sig []byte) error {
5✔
UNCOV
848
                metaArtifact.Signed = true
×
UNCOV
849
                return nil
×
UNCOV
850
        }
×
851

852
        err := aReader.ReadArtifact()
5✔
853
        if err != nil {
10✔
854
                return nil, errors.Wrap(err, "reading artifact error")
5✔
855
        }
5✔
856

857
        metaArtifact.Info = getArtifactInfo(aReader.GetInfo())
1✔
858
        metaArtifact.DeviceTypesCompatible = aReader.GetCompatibleDevices()
1✔
859

1✔
860
        metaArtifact.Name = aReader.GetArtifactName()
1✔
861
        if metaArtifact.Info.Version == 3 {
2✔
862
                metaArtifact.Depends, err = aReader.MergeArtifactDepends()
1✔
863
                if err != nil {
1✔
UNCOV
864
                        return nil, errors.Wrap(err,
×
UNCOV
865
                                "error parsing version 3 artifact")
×
UNCOV
866
                }
×
867

868
                metaArtifact.Provides, err = aReader.MergeArtifactProvides()
1✔
869
                if err != nil {
1✔
UNCOV
870
                        return nil, errors.Wrap(err,
×
871
                                "error parsing version 3 artifact")
×
872
                }
×
873

874
                metaArtifact.ClearsProvides = aReader.MergeArtifactClearsProvides()
1✔
875
        }
876

877
        for _, p := range aReader.GetHandlers() {
2✔
878
                uFiles, err := getUpdateFiles(p.GetUpdateFiles())
1✔
879
                if err != nil {
1✔
UNCOV
880
                        return nil, errors.Wrap(err, "Cannot get update files:")
×
UNCOV
881
                }
×
882

883
                uMetadata, err := p.GetUpdateMetaData()
1✔
884
                if err != nil {
1✔
UNCOV
885
                        return nil, errors.Wrap(err, "Cannot get update metadata")
×
UNCOV
886
                }
×
887

888
                metaArtifact.Updates = append(
1✔
889
                        metaArtifact.Updates,
1✔
890
                        model.Update{
1✔
891
                                TypeInfo: model.ArtifactUpdateTypeInfo{
1✔
892
                                        Type: p.GetUpdateType(),
1✔
893
                                },
1✔
894
                                Files:    uFiles,
1✔
895
                                MetaData: uMetadata,
1✔
896
                        })
1✔
897
        }
898

899
        return metaArtifact, nil
1✔
900
}
901

902
func getArtifactIDs(artifacts []*model.Image) []string {
13✔
903
        artifactIDs := make([]string, 0, len(artifacts))
13✔
904
        for _, artifact := range artifacts {
26✔
905
                artifactIDs = append(artifactIDs, artifact.Id)
13✔
906
        }
13✔
907
        return artifactIDs
13✔
908
}
909

910
// deployments
911
func inventoryDevicesToDevicesIds(devices []model.InvDevice) []string {
8✔
912
        ids := make([]string, len(devices))
8✔
913
        for i, d := range devices {
16✔
914
                ids[i] = d.ID
8✔
915
        }
8✔
916

917
        return ids
8✔
918
}
919

920
// updateDeploymentConstructor fills devices list with device ids
921
func (d *Deployments) updateDeploymentConstructor(ctx context.Context,
922
        constructor *model.DeploymentConstructor) (*model.DeploymentConstructor, error) {
10✔
923
        l := log.FromContext(ctx)
10✔
924

10✔
925
        id := identity.FromContext(ctx)
10✔
926
        if id == nil {
10✔
UNCOV
927
                l.Error("identity not present in the context")
×
UNCOV
928
                return nil, ErrModelInternal
×
UNCOV
929
        }
×
930
        searchParams := model.SearchParams{
10✔
931
                Page:    1,
10✔
932
                PerPage: PerPageInventoryDevices,
10✔
933
                Filters: []model.FilterPredicate{
10✔
934
                        {
10✔
935
                                Scope:     InventoryIdentityScope,
10✔
936
                                Attribute: InventoryStatusAttributeName,
10✔
937
                                Type:      "$eq",
10✔
938
                                Value:     InventoryStatusAccepted,
10✔
939
                        },
10✔
940
                },
10✔
941
        }
10✔
942
        if len(constructor.Group) > 0 {
20✔
943
                searchParams.Filters = append(
10✔
944
                        searchParams.Filters,
10✔
945
                        model.FilterPredicate{
10✔
946
                                Scope:     InventoryGroupScope,
10✔
947
                                Attribute: InventoryGroupAttributeName,
10✔
948
                                Type:      "$eq",
10✔
949
                                Value:     constructor.Group,
10✔
950
                        })
10✔
951
        }
10✔
952

953
        for {
22✔
954
                devices, count, err := d.search(ctx, id.Tenant, searchParams)
12✔
955
                if err != nil {
14✔
956
                        l.Errorf("error searching for devices")
2✔
957
                        return nil, ErrModelInternal
2✔
958
                }
2✔
959
                if count < 1 {
12✔
960
                        l.Errorf("no devices found")
2✔
961
                        return nil, ErrNoDevices
2✔
962
                }
2✔
963
                if len(devices) < 1 {
8✔
UNCOV
964
                        break
×
965
                }
966
                constructor.Devices = append(constructor.Devices, inventoryDevicesToDevicesIds(devices)...)
8✔
967
                if len(constructor.Devices) == count {
14✔
968
                        break
6✔
969
                }
970
                searchParams.Page++
2✔
971
        }
972

973
        return constructor, nil
6✔
974
}
975

976
// CreateDeviceConfigurationDeployment creates new configuration deployment for the device.
977
func (d *Deployments) CreateDeviceConfigurationDeployment(
978
        ctx context.Context, constructor *model.ConfigurationDeploymentConstructor,
979
        deviceID, deploymentID string) (string, error) {
9✔
980

9✔
981
        if constructor == nil {
11✔
982
                return "", ErrModelMissingInput
2✔
983
        }
2✔
984

985
        deployment, err := model.NewDeploymentFromConfigurationDeploymentConstructor(
7✔
986
                constructor,
7✔
987
                deploymentID,
7✔
988
        )
7✔
989
        if err != nil {
7✔
UNCOV
990
                return "", errors.Wrap(err, "failed to create deployment")
×
UNCOV
991
        }
×
992

993
        deployment.DeviceList = []string{deviceID}
7✔
994
        deployment.MaxDevices = 1
7✔
995
        deployment.Configuration = []byte(constructor.Configuration)
7✔
996
        deployment.Type = model.DeploymentTypeConfiguration
7✔
997

7✔
998
        groups, err := d.getDeploymentGroups(ctx, []string{deviceID})
7✔
999
        if err != nil {
9✔
1000
                return "", err
2✔
1001
        }
2✔
1002
        deployment.Groups = groups
5✔
1003

5✔
1004
        if err := d.db.InsertDeployment(ctx, deployment); err != nil {
8✔
1005
                if strings.Contains(err.Error(), "duplicate key error") {
4✔
1006
                        return "", ErrDuplicateDeployment
1✔
1007
                }
1✔
1008
                if strings.Contains(err.Error(), "id: must be a valid UUID") {
4✔
1009
                        return "", ErrInvalidDeploymentID
1✔
1010
                }
1✔
1011
                return "", errors.Wrap(err, "Storing deployment data")
2✔
1012
        }
1013

1014
        return deployment.Id, nil
3✔
1015
}
1016

1017
// CreateDeployment precomputes new deployment and schedules it for devices.
1018
func (d *Deployments) CreateDeployment(ctx context.Context,
1019
        constructor *model.DeploymentConstructor) (string, error) {
17✔
1020

17✔
1021
        var err error
17✔
1022

17✔
1023
        if constructor == nil {
19✔
1024
                return "", ErrModelMissingInput
2✔
1025
        }
2✔
1026

1027
        if err := constructor.Validate(); err != nil {
15✔
UNCOV
1028
                return "", errors.Wrap(err, "Validating deployment")
×
UNCOV
1029
        }
×
1030

1031
        if len(constructor.Group) > 0 || constructor.AllDevices {
25✔
1032
                constructor, err = d.updateDeploymentConstructor(ctx, constructor)
10✔
1033
                if err != nil {
14✔
1034
                        return "", err
4✔
1035
                }
4✔
1036
        }
1037

1038
        deployment, err := model.NewDeploymentFromConstructor(constructor)
11✔
1039
        if err != nil {
11✔
UNCOV
1040
                return "", errors.Wrap(err, "failed to create deployment")
×
UNCOV
1041
        }
×
1042

1043
        // Assign artifacts to the deployment.
1044
        // When new artifact(s) with the artifact name same as the one in the deployment
1045
        // will be uploaded to the backend, it will also become part of this deployment.
1046
        artifacts, err := d.db.ImagesByName(ctx, deployment.ArtifactName)
11✔
1047
        if err != nil {
11✔
1048
                return "", errors.Wrap(err, "Finding artifact with given name")
×
UNCOV
1049
        }
×
1050

1051
        if len(artifacts) == 0 {
12✔
1052
                return "", ErrNoArtifact
1✔
1053
        }
1✔
1054

1055
        deployment.Artifacts = getArtifactIDs(artifacts)
11✔
1056
        deployment.DeviceList = constructor.Devices
11✔
1057
        deployment.MaxDevices = len(constructor.Devices)
11✔
1058
        deployment.Type = model.DeploymentTypeSoftware
11✔
1059
        if len(constructor.Group) > 0 {
17✔
1060
                deployment.Groups = []string{constructor.Group}
6✔
1061
        }
6✔
1062

1063
        // single device deployment case
1064
        if len(deployment.Groups) == 0 && len(constructor.Devices) == 1 {
16✔
1065
                groups, err := d.getDeploymentGroups(ctx, constructor.Devices)
5✔
1066
                if err != nil {
5✔
UNCOV
1067
                        return "", err
×
UNCOV
1068
                }
×
1069
                deployment.Groups = groups
5✔
1070
        }
1071

1072
        if err := d.db.InsertDeployment(ctx, deployment); err != nil {
13✔
1073
                return "", errors.Wrap(err, "Storing deployment data")
2✔
1074
        }
2✔
1075

1076
        return deployment.Id, nil
9✔
1077
}
1078

1079
func (d *Deployments) getDeploymentGroups(
1080
        ctx context.Context,
1081
        devices []string,
1082
) ([]string, error) {
11✔
1083
        id := identity.FromContext(ctx)
11✔
1084

11✔
1085
        //only for single device deployment case
11✔
1086
        if len(devices) != 1 {
11✔
UNCOV
1087
                return nil, nil
×
UNCOV
1088
        }
×
1089

1090
        if id == nil {
12✔
1091
                id = &identity.Identity{}
1✔
1092
        }
1✔
1093

1094
        groups, err := d.inventoryClient.GetDeviceGroups(ctx, id.Tenant, devices[0])
11✔
1095
        if err != nil && err != inventory.ErrDevNotFound {
13✔
1096
                return nil, err
2✔
1097
        }
2✔
1098
        return groups, nil
9✔
1099
}
1100

1101
// IsDeploymentFinished checks if there is unfinished deployment with given ID
1102
func (d *Deployments) IsDeploymentFinished(
1103
        ctx context.Context,
1104
        deploymentID string,
1105
) (bool, error) {
1✔
1106
        deployment, err := d.db.FindUnfinishedByID(ctx, deploymentID)
1✔
1107
        if err != nil {
1✔
UNCOV
1108
                return false, errors.Wrap(err, "Searching for unfinished deployment by ID")
×
UNCOV
1109
        }
×
1110
        if deployment == nil {
2✔
1111
                return true, nil
1✔
1112
        }
1✔
1113

1114
        return false, nil
1✔
1115
}
1116

1117
// GetDeployment fetches deployment by ID
1118
func (d *Deployments) GetDeployment(ctx context.Context,
1119
        deploymentID string) (*model.Deployment, error) {
1✔
1120

1✔
1121
        deployment, err := d.db.FindDeploymentByID(ctx, deploymentID)
1✔
1122
        if err != nil {
1✔
UNCOV
1123
                return nil, errors.Wrap(err, "Searching for deployment by ID")
×
UNCOV
1124
        }
×
1125

1126
        if err := d.setDeploymentDeviceCountIfUnset(ctx, deployment); err != nil {
1✔
UNCOV
1127
                return nil, err
×
UNCOV
1128
        }
×
1129

1130
        return deployment, nil
1✔
1131
}
1132

1133
// ImageUsedInActiveDeployment checks if specified image is in use by deployments Image is
1134
// considered to be in use if it's participating in at lest one non success/error deployment.
1135
func (d *Deployments) ImageUsedInActiveDeployment(ctx context.Context,
1136
        imageID string) (bool, error) {
7✔
1137

7✔
1138
        var found bool
7✔
1139

7✔
1140
        found, err := d.db.ExistUnfinishedByArtifactId(ctx, imageID)
7✔
1141
        if err != nil {
9✔
1142
                return false, errors.Wrap(err, "Checking if image is used by active deployment")
2✔
1143
        }
2✔
1144

1145
        if found {
6✔
1146
                return found, nil
1✔
1147
        }
1✔
1148

1149
        found, err = d.db.ExistAssignedImageWithIDAndStatuses(ctx,
5✔
1150
                imageID, model.ActiveDeploymentStatuses()...)
5✔
1151
        if err != nil {
7✔
1152
                return false, errors.Wrap(err, "Checking if image is used by active deployment")
2✔
1153
        }
2✔
1154

1155
        return found, nil
3✔
1156
}
1157

1158
// ImageUsedInDeployment checks if specified image is in use by deployments
1159
// Image is considered to be in use if it's participating in any deployment.
UNCOV
1160
func (d *Deployments) ImageUsedInDeployment(ctx context.Context, imageID string) (bool, error) {
×
UNCOV
1161

×
UNCOV
1162
        var found bool
×
UNCOV
1163

×
UNCOV
1164
        found, err := d.db.ExistUnfinishedByArtifactId(ctx, imageID)
×
UNCOV
1165
        if err != nil {
×
UNCOV
1166
                return false, errors.Wrap(err, "Checking if image is used by active deployment")
×
1167
        }
×
1168

1169
        if found {
×
1170
                return found, nil
×
1171
        }
×
1172

1173
        found, err = d.db.ExistAssignedImageWithIDAndStatuses(ctx, imageID)
×
1174
        if err != nil {
×
UNCOV
1175
                return false, errors.Wrap(err, "Checking if image is used in deployment")
×
1176
        }
×
1177

1178
        return found, nil
×
1179
}
1180

1181
// Retrieves the model.Deployment and model.DeviceDeployment structures
1182
// for the device. Upon error, nil is returned for both deployment structures.
1183
func (d *Deployments) getDeploymentForDevice(ctx context.Context,
1184
        deviceID string) (*model.Deployment, *model.DeviceDeployment, error) {
3✔
1185

3✔
1186
        // Retrieve device deployment
3✔
1187
        deviceDeployment, err := d.db.FindOldestActiveDeviceDeployment(ctx, deviceID)
3✔
1188

3✔
1189
        if err != nil {
3✔
UNCOV
1190
                return nil, nil, errors.Wrap(err,
×
UNCOV
1191
                        "Searching for oldest active deployment for the device")
×
1192
        } else if deviceDeployment == nil {
4✔
1193
                return d.getNewDeploymentForDevice(ctx, deviceID)
1✔
1194
        }
1✔
1195

1196
        deployment, err := d.db.FindDeploymentByID(ctx, deviceDeployment.DeploymentId)
3✔
1197
        if err != nil {
3✔
1198
                return nil, nil, errors.Wrap(err, "checking deployment id")
×
UNCOV
1199
        }
×
1200
        if deployment == nil {
3✔
UNCOV
1201
                return nil, nil, errors.New("No deployment corresponding to device deployment")
×
UNCOV
1202
        }
×
1203

1204
        return deployment, deviceDeployment, nil
3✔
1205
}
1206

1207
// getNewDeploymentForDevice returns deployment object and creates and returns
1208
// new device deployment for the device;
1209
//
1210
// we are interested only in the deployments that are newer than the latest
1211
// deployment applied by the device;
1212
// this way we guarantee that the device will not receive deployment
1213
// that is older than the one installed on the device;
1214
func (d *Deployments) getNewDeploymentForDevice(ctx context.Context,
1215
        deviceID string) (*model.Deployment, *model.DeviceDeployment, error) {
1✔
1216

1✔
1217
        var lastDeployment *time.Time
1✔
1218
        //get latest device deployment for the device;
1✔
1219
        deviceDeployment, err := d.db.FindLatestInactiveDeviceDeployment(ctx, deviceID)
1✔
1220
        if err != nil {
1✔
UNCOV
1221
                return nil, nil, errors.Wrap(err,
×
UNCOV
1222
                        "Searching for latest active deployment for the device")
×
1223
        } else if deviceDeployment == nil {
2✔
1224
                lastDeployment = &time.Time{}
1✔
1225
        } else {
2✔
1226
                lastDeployment = deviceDeployment.Created
1✔
1227
        }
1✔
1228

1229
        //get deployments newer then last device deployment
1230
        //iterate over deployments and check if the device is part of the deployment or not
1231
        for skip := 0; true; skip += 100 {
2✔
1232
                deployments, err := d.db.FindNewerActiveDeployments(ctx, lastDeployment, skip, 100)
1✔
1233
                if err != nil {
1✔
UNCOV
1234
                        return nil, nil, errors.Wrap(err,
×
UNCOV
1235
                                "Failed to search for newer active deployments")
×
UNCOV
1236
                }
×
1237
                if len(deployments) == 0 {
2✔
1238
                        return nil, nil, nil
1✔
1239
                }
1✔
1240

1241
                for _, deployment := range deployments {
2✔
1242
                        ok, err := d.isDevicePartOfDeployment(ctx, deviceID, deployment)
1✔
1243
                        if err != nil {
1✔
UNCOV
1244
                                return nil, nil, err
×
UNCOV
1245
                        }
×
1246
                        if ok {
2✔
1247
                                deviceDeployment, err := d.createDeviceDeploymentWithStatus(ctx,
1✔
1248
                                        deviceID, deployment, model.DeviceDeploymentStatusPending)
1✔
1249
                                if err != nil {
1✔
UNCOV
1250
                                        return nil, nil, err
×
1251
                                }
×
1252
                                return deployment, deviceDeployment, nil
1✔
1253
                        }
1254
                }
1255
        }
1256

1257
        return nil, nil, nil
×
1258
}
1259

1260
func (d *Deployments) createDeviceDeploymentWithStatus(
1261
        ctx context.Context, deviceID string,
1262
        deployment *model.Deployment, status model.DeviceDeploymentStatus,
1263
) (*model.DeviceDeployment, error) {
11✔
1264
        prevStatus := model.DeviceDeploymentStatusNull
11✔
1265
        deviceDeployment, err := d.db.GetDeviceDeployment(ctx, deployment.Id, deviceID, true)
11✔
1266
        if err != nil && err != mongo.ErrStorageNotFound {
11✔
UNCOV
1267
                return nil, err
×
1268
        } else if deviceDeployment != nil {
11✔
UNCOV
1269
                prevStatus = deviceDeployment.Status
×
UNCOV
1270
        }
×
1271

1272
        deviceDeployment = model.NewDeviceDeployment(deviceID, deployment.Id)
11✔
1273
        deviceDeployment.Status = status
11✔
1274
        deviceDeployment.Active = status.Active()
11✔
1275
        deviceDeployment.Created = deployment.Created
11✔
1276

11✔
1277
        if err := d.setDeploymentDeviceCountIfUnset(ctx, deployment); err != nil {
11✔
UNCOV
1278
                return nil, err
×
UNCOV
1279
        }
×
1280

1281
        if err := d.db.InsertDeviceDeployment(ctx, deviceDeployment,
11✔
1282
                prevStatus == model.DeviceDeploymentStatusNull); err != nil {
11✔
UNCOV
1283
                return nil, err
×
UNCOV
1284
        }
×
1285

1286
        // after inserting new device deployment update deployment stats
1287
        // in the database and locally, and update deployment status
1288
        if err := d.db.UpdateStatsInc(
11✔
1289
                ctx, deployment.Id,
11✔
1290
                prevStatus, status,
11✔
1291
        ); err != nil {
11✔
UNCOV
1292
                return nil, err
×
UNCOV
1293
        }
×
1294

1295
        deployment.Stats.Inc(status)
11✔
1296

11✔
1297
        err = d.recalcDeploymentStatus(ctx, deployment)
11✔
1298
        if err != nil {
11✔
1299
                return nil, errors.Wrap(err, "failed to update deployment status")
×
1300
        }
×
1301

1302
        if !status.Active() {
21✔
1303
                err := d.reindexDevice(ctx, deviceID)
10✔
1304
                if err != nil {
10✔
UNCOV
1305
                        l := log.FromContext(ctx)
×
1306
                        l.Warn(errors.Wrap(err, "failed to trigger a device reindex"))
×
1307
                }
×
1308
                if err := d.reindexDeployment(ctx, deviceDeployment.DeviceId,
10✔
1309
                        deviceDeployment.DeploymentId, deviceDeployment.Id); err != nil {
10✔
UNCOV
1310
                        l := log.FromContext(ctx)
×
UNCOV
1311
                        l.Warn(errors.Wrap(err, "failed to trigger a device reindex"))
×
1312
                }
×
1313
        }
1314

1315
        return deviceDeployment, nil
11✔
1316
}
1317

1318
func (d *Deployments) isDevicePartOfDeployment(
1319
        ctx context.Context,
1320
        deviceID string,
1321
        deployment *model.Deployment,
1322
) (bool, error) {
15✔
1323
        for _, id := range deployment.DeviceList {
26✔
1324
                if id == deviceID {
22✔
1325
                        return true, nil
11✔
1326
                }
11✔
1327
        }
1328
        return false, nil
5✔
1329
}
1330

1331
// GetDeploymentForDeviceWithCurrent returns deployment for the device
1332
func (d *Deployments) GetDeploymentForDeviceWithCurrent(ctx context.Context, deviceID string,
1333
        request *model.DeploymentNextRequest) (*model.DeploymentInstructions, error) {
3✔
1334

3✔
1335
        deployment, deviceDeployment, err := d.getDeploymentForDevice(ctx, deviceID)
3✔
1336
        if err != nil {
3✔
UNCOV
1337
                return nil, ErrModelInternal
×
1338
        } else if deployment == nil {
4✔
1339
                return nil, nil
1✔
1340
        }
1✔
1341

1342
        err = d.saveDeviceDeploymentRequest(ctx, deviceID, deviceDeployment, request)
3✔
1343
        if err != nil {
4✔
1344
                return nil, err
1✔
1345
        }
1✔
1346
        return d.getDeploymentInstructions(ctx, deployment, deviceDeployment, request)
3✔
1347
}
1348

1349
func (d *Deployments) getDeploymentInstructions(
1350
        ctx context.Context,
1351
        deployment *model.Deployment,
1352
        deviceDeployment *model.DeviceDeployment,
1353
        request *model.DeploymentNextRequest,
1354
) (*model.DeploymentInstructions, error) {
3✔
1355

3✔
1356
        var newArtifactAssigned bool
3✔
1357

3✔
1358
        l := log.FromContext(ctx)
3✔
1359

3✔
1360
        if deployment.Type == model.DeploymentTypeConfiguration {
4✔
1361
                // There's nothing more we need to do, the link must be filled
1✔
1362
                // in by the API layer.
1✔
1363
                return &model.DeploymentInstructions{
1✔
1364
                        ID: deployment.Id,
1✔
1365
                        Artifact: model.ArtifactDeploymentInstructions{
1✔
1366
                                // configuration artifacts are created on demand, so they do not have IDs
1✔
1367
                                // use deployment ID togheter with device ID as artifact ID
1✔
1368
                                ID:                    deployment.Id + deviceDeployment.DeviceId,
1✔
1369
                                ArtifactName:          deployment.ArtifactName,
1✔
1370
                                DeviceTypesCompatible: []string{request.DeviceProvides.DeviceType},
1✔
1371
                        },
1✔
1372
                        Type: model.DeploymentTypeConfiguration,
1✔
1373
                }, nil
1✔
1374
        }
1✔
1375

1376
        // assing artifact to the device deployment
1377
        // only if it was not assgined previously
1378
        if deviceDeployment.Image == nil {
6✔
1379
                if err := d.assignArtifact(
3✔
1380
                        ctx, deployment, deviceDeployment, request.DeviceProvides); err != nil {
3✔
UNCOV
1381
                        return nil, err
×
UNCOV
1382
                }
×
1383
                newArtifactAssigned = true
3✔
1384
        }
1385

1386
        if deviceDeployment.Image == nil {
3✔
UNCOV
1387
                // No artifact - return empty response
×
1388
                return nil, nil
×
1389
        }
×
1390

1391
        // if the deployment is not forcing the installation, and
1392
        // if artifact was recognized as already installed, and this is
1393
        // a new device deployment - indicated by device deployment status "pending",
1394
        // handle already installed artifact case
1395
        if !deployment.ForceInstallation &&
3✔
1396
                d.isAlreadyInstalled(request, deviceDeployment) &&
3✔
1397
                deviceDeployment.Status == model.DeviceDeploymentStatusPending {
6✔
1398
                return nil, d.handleAlreadyInstalled(ctx, deviceDeployment)
3✔
1399
        }
3✔
1400

1401
        // if new artifact has been assigned to device deployment
1402
        // add artifact size to deployment total size,
1403
        // before returning deployment instruction to the device
1404
        if newArtifactAssigned {
2✔
1405
                if err := d.db.IncrementDeploymentTotalSize(
1✔
1406
                        ctx, deviceDeployment.DeploymentId, deviceDeployment.Image.Size); err != nil {
1✔
UNCOV
1407
                        l.Errorf("failed to increment deployment total size: %s", err.Error())
×
UNCOV
1408
                }
×
1409
        }
1410

1411
        ctx, err := d.contextWithStorageSettings(ctx)
1✔
1412
        if err != nil {
1✔
UNCOV
1413
                return nil, err
×
1414
        }
×
1415

1416
        imagePath := model.ImagePathFromContext(ctx, deviceDeployment.Image.Id)
1✔
1417
        link, err := d.objectStorage.GetRequest(
1✔
1418
                ctx,
1✔
1419
                imagePath,
1✔
1420
                deviceDeployment.Image.Name+model.ArtifactFileSuffix,
1✔
1421
                DefaultUpdateDownloadLinkExpire,
1✔
1422
        )
1✔
1423
        if err != nil {
1✔
UNCOV
1424
                return nil, errors.Wrap(err, "Generating download link for the device")
×
UNCOV
1425
        }
×
1426

1427
        instructions := &model.DeploymentInstructions{
1✔
1428
                ID: deviceDeployment.DeploymentId,
1✔
1429
                Artifact: model.ArtifactDeploymentInstructions{
1✔
1430
                        ID: deviceDeployment.Image.Id,
1✔
1431
                        ArtifactName: deviceDeployment.Image.
1✔
1432
                                ArtifactMeta.Name,
1✔
1433
                        Source: *link,
1✔
1434
                        DeviceTypesCompatible: deviceDeployment.Image.
1✔
1435
                                ArtifactMeta.DeviceTypesCompatible,
1✔
1436
                },
1✔
1437
        }
1✔
1438

1✔
1439
        return instructions, nil
1✔
1440
}
1441

1442
func (d *Deployments) saveDeviceDeploymentRequest(ctx context.Context, deviceID string,
1443
        deviceDeployment *model.DeviceDeployment, request *model.DeploymentNextRequest) error {
3✔
1444
        if deviceDeployment.Request != nil {
4✔
1445
                if !reflect.DeepEqual(deviceDeployment.Request, request) {
2✔
1446
                        // the device reported different device type and/or artifact name
1✔
1447
                        // during the update process, which should never happen;
1✔
1448
                        // mark deployment for this device as failed to force client to rollback
1✔
1449
                        l := log.FromContext(ctx)
1✔
1450
                        l.Errorf(
1✔
1451
                                "Device with id %s reported new data: %s during update process;"+
1✔
1452
                                        "old data: %s",
1✔
1453
                                deviceID, request, deviceDeployment.Request)
1✔
1454

1✔
1455
                        if err := d.UpdateDeviceDeploymentStatus(ctx, deviceDeployment.DeploymentId, deviceID,
1✔
1456
                                model.DeviceDeploymentState{
1✔
1457
                                        Status: model.DeviceDeploymentStatusFailure,
1✔
1458
                                }); err != nil {
1✔
UNCOV
1459
                                return errors.Wrap(err, "Failed to update deployment status")
×
UNCOV
1460
                        }
×
1461
                        if err := d.reindexDevice(ctx, deviceDeployment.DeviceId); err != nil {
1✔
UNCOV
1462
                                l.Warn(errors.Wrap(err, "failed to trigger a device reindex"))
×
UNCOV
1463
                        }
×
1464
                        if err := d.reindexDeployment(ctx, deviceDeployment.DeviceId,
1✔
1465
                                deviceDeployment.DeploymentId, deviceDeployment.Id); err != nil {
1✔
1466
                                l := log.FromContext(ctx)
×
1467
                                l.Warn(errors.Wrap(err, "failed to trigger a device reindex"))
×
UNCOV
1468
                        }
×
1469
                        return ErrConflictingRequestData
1✔
1470
                }
1471
        } else {
3✔
1472
                // save the request
3✔
1473
                if err := d.db.SaveDeviceDeploymentRequest(
3✔
1474
                        ctx, deviceDeployment.Id, request); err != nil {
3✔
1475
                        return err
×
UNCOV
1476
                }
×
1477
        }
1478
        return nil
3✔
1479
}
1480

1481
// UpdateDeviceDeploymentStatus will update the deployment status for device of
1482
// ID `deviceID`. Returns nil if update was successful.
1483
func (d *Deployments) UpdateDeviceDeploymentStatus(ctx context.Context, deploymentID string,
1484
        deviceID string, ddState model.DeviceDeploymentState) error {
11✔
1485

11✔
1486
        l := log.FromContext(ctx)
11✔
1487

11✔
1488
        l.Infof("New status: %s for device %s deployment: %v", ddState.Status, deviceID, deploymentID)
11✔
1489

11✔
1490
        var finishTime *time.Time = nil
11✔
1491
        if model.IsDeviceDeploymentStatusFinished(ddState.Status) {
18✔
1492
                now := time.Now()
7✔
1493
                finishTime = &now
7✔
1494
        }
7✔
1495

1496
        dd, err := d.db.GetDeviceDeployment(ctx, deploymentID, deviceID, false)
11✔
1497
        if err == mongo.ErrStorageNotFound {
13✔
1498
                return ErrStorageNotFound
2✔
1499
        } else if err != nil {
11✔
UNCOV
1500
                return err
×
UNCOV
1501
        }
×
1502

1503
        currentStatus := dd.Status
9✔
1504

9✔
1505
        if currentStatus == model.DeviceDeploymentStatusAborted {
9✔
UNCOV
1506
                return ErrDeploymentAborted
×
1507
        }
×
1508

1509
        if currentStatus == model.DeviceDeploymentStatusDecommissioned {
9✔
UNCOV
1510
                return ErrDeviceDecommissioned
×
UNCOV
1511
        }
×
1512

1513
        // nothing to do
1514
        if ddState.Status == currentStatus {
9✔
UNCOV
1515
                return nil
×
UNCOV
1516
        }
×
1517

1518
        // update finish time
1519
        ddState.FinishTime = finishTime
9✔
1520

9✔
1521
        old, err := d.db.UpdateDeviceDeploymentStatus(ctx,
9✔
1522
                deviceID, deploymentID, ddState)
9✔
1523
        if err != nil {
9✔
UNCOV
1524
                return err
×
UNCOV
1525
        }
×
1526

1527
        if err = d.db.UpdateStatsInc(ctx, deploymentID, old, ddState.Status); err != nil {
9✔
UNCOV
1528
                return err
×
UNCOV
1529
        }
×
1530

1531
        // fetch deployment stats and update deployment status
1532
        deployment, err := d.db.FindDeploymentByID(ctx, deploymentID)
9✔
1533
        if err != nil {
9✔
UNCOV
1534
                return errors.Wrap(err, "failed when searching for deployment")
×
1535
        }
×
1536

1537
        err = d.recalcDeploymentStatus(ctx, deployment)
9✔
1538
        if err != nil {
9✔
UNCOV
1539
                return errors.Wrap(err, "failed to update deployment status")
×
UNCOV
1540
        }
×
1541

1542
        if !ddState.Status.Active() {
16✔
1543
                if err := d.reindexDevice(ctx, deviceID); err != nil {
7✔
UNCOV
1544
                        l := log.FromContext(ctx)
×
UNCOV
1545
                        l.Warn(errors.Wrap(err, "failed to trigger a device reindex"))
×
1546
                }
×
1547
                if err := d.reindexDeployment(ctx, dd.DeviceId, dd.DeploymentId, dd.Id); err != nil {
7✔
UNCOV
1548
                        l := log.FromContext(ctx)
×
UNCOV
1549
                        l.Warn(errors.Wrap(err, "failed to trigger a device reindex"))
×
UNCOV
1550
                }
×
1551
        }
1552

1553
        return nil
9✔
1554
}
1555

1556
// recalcDeploymentStatus inspects the deployment stats and
1557
// recalculates and updates its status
1558
// it should be used whenever deployment stats are touched
1559
func (d *Deployments) recalcDeploymentStatus(ctx context.Context, dep *model.Deployment) error {
19✔
1560
        status := dep.GetStatus()
19✔
1561

19✔
1562
        if err := d.db.SetDeploymentStatus(ctx, dep.Id, status, time.Now()); err != nil {
19✔
UNCOV
1563
                return err
×
UNCOV
1564
        }
×
1565

1566
        return nil
19✔
1567
}
1568

1569
func (d *Deployments) GetDeploymentStats(ctx context.Context,
1570
        deploymentID string) (model.Stats, error) {
1✔
1571

1✔
1572
        deployment, err := d.db.FindDeploymentByID(ctx, deploymentID)
1✔
1573

1✔
1574
        if err != nil {
1✔
UNCOV
1575
                return nil, errors.Wrap(err, "checking deployment id")
×
UNCOV
1576
        }
×
1577

1578
        if deployment == nil {
1✔
UNCOV
1579
                return nil, nil
×
UNCOV
1580
        }
×
1581

1582
        return deployment.Stats, nil
1✔
1583
}
1584
func (d *Deployments) GetDeploymentsStats(ctx context.Context,
1585
        deploymentIDs ...string) (deploymentStats []*model.DeploymentStats, err error) {
×
UNCOV
1586

×
UNCOV
1587
        deploymentStats, err = d.db.FindDeploymentStatsByIDs(ctx, deploymentIDs...)
×
1588

×
1589
        if err != nil {
×
UNCOV
1590
                return nil, errors.Wrap(err, "checking deployment statistics for IDs")
×
UNCOV
1591
        }
×
1592

UNCOV
1593
        if deploymentStats == nil {
×
1594
                return nil, ErrModelDeploymentNotFound
×
1595
        }
×
1596

1597
        return deploymentStats, nil
×
1598
}
1599

1600
// GetDeviceStatusesForDeployment retrieve device deployment statuses for a given deployment.
1601
func (d *Deployments) GetDeviceStatusesForDeployment(ctx context.Context,
1602
        deploymentID string) ([]model.DeviceDeployment, error) {
1✔
1603

1✔
1604
        deployment, err := d.db.FindDeploymentByID(ctx, deploymentID)
1✔
1605
        if err != nil {
1✔
1606
                return nil, ErrModelInternal
×
UNCOV
1607
        }
×
1608

1609
        if deployment == nil {
1✔
UNCOV
1610
                return nil, ErrModelDeploymentNotFound
×
UNCOV
1611
        }
×
1612

1613
        statuses, err := d.db.GetDeviceStatusesForDeployment(ctx, deploymentID)
1✔
1614
        if err != nil {
1✔
1615
                return nil, ErrModelInternal
×
1616
        }
×
1617

1618
        return statuses, nil
1✔
1619
}
1620

1621
func (d *Deployments) GetDevicesListForDeployment(ctx context.Context,
1622
        query store.ListQuery) ([]model.DeviceDeployment, int, error) {
1✔
1623

1✔
1624
        deployment, err := d.db.FindDeploymentByID(ctx, query.DeploymentID)
1✔
1625
        if err != nil {
1✔
UNCOV
1626
                return nil, -1, ErrModelInternal
×
UNCOV
1627
        }
×
1628

1629
        if deployment == nil {
1✔
UNCOV
1630
                return nil, -1, ErrModelDeploymentNotFound
×
UNCOV
1631
        }
×
1632

1633
        statuses, totalCount, err := d.db.GetDevicesListForDeployment(ctx, query)
1✔
1634
        if err != nil {
1✔
1635
                return nil, -1, ErrModelInternal
×
1636
        }
×
1637

1638
        return statuses, totalCount, nil
1✔
1639
}
1640

1641
func (d *Deployments) GetDeviceDeploymentListForDevice(ctx context.Context,
1642
        query store.ListQueryDeviceDeployments) ([]model.DeviceDeploymentListItem, int, error) {
8✔
1643
        deviceDeployments, totalCount, err := d.db.GetDeviceDeploymentsForDevice(ctx, query)
8✔
1644
        if err != nil {
10✔
1645
                return nil, -1, errors.Wrap(err, "retrieving the list of deployment statuses")
2✔
1646
        }
2✔
1647

1648
        deploymentIDs := make([]string, len(deviceDeployments))
6✔
1649
        for i, deviceDeployment := range deviceDeployments {
18✔
1650
                deploymentIDs[i] = deviceDeployment.DeploymentId
12✔
1651
        }
12✔
1652

1653
        deployments, _, err := d.db.Find(ctx, model.Query{
6✔
1654
                IDs:          deploymentIDs,
6✔
1655
                Limit:        len(deviceDeployments),
6✔
1656
                DisableCount: true,
6✔
1657
        })
6✔
1658
        if err != nil {
8✔
1659
                return nil, -1, errors.Wrap(err, "retrieving the list of deployments")
2✔
1660
        }
2✔
1661

1662
        deploymentsMap := make(map[string]*model.Deployment, len(deployments))
4✔
1663
        for _, deployment := range deployments {
10✔
1664
                deploymentsMap[deployment.Id] = deployment
6✔
1665
        }
6✔
1666

1667
        res := make([]model.DeviceDeploymentListItem, 0, len(deviceDeployments))
4✔
1668
        for i, deviceDeployment := range deviceDeployments {
12✔
1669
                if deployment, ok := deploymentsMap[deviceDeployment.DeploymentId]; ok {
14✔
1670
                        res = append(res, model.DeviceDeploymentListItem{
6✔
1671
                                Id:         deviceDeployment.Id,
6✔
1672
                                Deployment: deployment,
6✔
1673
                                Device:     &deviceDeployments[i],
6✔
1674
                        })
6✔
1675
                } else {
8✔
1676
                        res = append(res, model.DeviceDeploymentListItem{
2✔
1677
                                Id:     deviceDeployment.Id,
2✔
1678
                                Device: &deviceDeployments[i],
2✔
1679
                        })
2✔
1680
                }
2✔
1681
        }
1682

1683
        return res, totalCount, nil
4✔
1684
}
1685

1686
func (d *Deployments) setDeploymentDeviceCountIfUnset(
1687
        ctx context.Context,
1688
        deployment *model.Deployment,
1689
) error {
11✔
1690
        if deployment.DeviceCount == nil {
11✔
UNCOV
1691
                deviceCount, err := d.db.DeviceCountByDeployment(ctx, deployment.Id)
×
UNCOV
1692
                if err != nil {
×
UNCOV
1693
                        return errors.Wrap(err, "counting device deployments")
×
UNCOV
1694
                }
×
UNCOV
1695
                err = d.db.SetDeploymentDeviceCount(ctx, deployment.Id, deviceCount)
×
UNCOV
1696
                if err != nil {
×
UNCOV
1697
                        return errors.Wrap(err, "setting the device count for the deployment")
×
UNCOV
1698
                }
×
UNCOV
1699
                deployment.DeviceCount = &deviceCount
×
1700
        }
1701

1702
        return nil
11✔
1703
}
1704

1705
func (d *Deployments) LookupDeployment(ctx context.Context,
1706
        query model.Query) ([]*model.Deployment, int64, error) {
1✔
1707
        list, totalCount, err := d.db.Find(ctx, query)
1✔
1708

1✔
1709
        if err != nil {
1✔
UNCOV
1710
                return nil, 0, errors.Wrap(err, "searching for deployments")
×
UNCOV
1711
        }
×
1712

1713
        if list == nil {
2✔
1714
                return make([]*model.Deployment, 0), 0, nil
1✔
1715
        }
1✔
1716

UNCOV
1717
        for _, deployment := range list {
×
UNCOV
1718
                if err := d.setDeploymentDeviceCountIfUnset(ctx, deployment); err != nil {
×
1719
                        return nil, 0, err
×
1720
                }
×
1721
        }
1722

UNCOV
1723
        return list, totalCount, nil
×
1724
}
1725

1726
// SaveDeviceDeploymentLog will save the deployment log for device of
1727
// ID `deviceID`. Returns nil if log was saved successfully.
1728
func (d *Deployments) SaveDeviceDeploymentLog(ctx context.Context, deviceID string,
1729
        deploymentID string, logs []model.LogMessage) error {
1✔
1730

1✔
1731
        // repack to temporary deployment log and validate
1✔
1732
        dlog := model.DeploymentLog{
1✔
1733
                DeviceID:     deviceID,
1✔
1734
                DeploymentID: deploymentID,
1✔
1735
                Messages:     logs,
1✔
1736
        }
1✔
1737
        if err := dlog.Validate(); err != nil {
1✔
UNCOV
1738
                return errors.Wrapf(err, ErrStorageInvalidLog.Error())
×
UNCOV
1739
        }
×
1740

1741
        if has, err := d.HasDeploymentForDevice(ctx, deploymentID, deviceID); !has {
1✔
UNCOV
1742
                if err != nil {
×
UNCOV
1743
                        return err
×
UNCOV
1744
                } else {
×
UNCOV
1745
                        return ErrModelDeploymentNotFound
×
UNCOV
1746
                }
×
1747
        }
1748

1749
        if err := d.db.SaveDeviceDeploymentLog(ctx, dlog); err != nil {
1✔
UNCOV
1750
                return err
×
1751
        }
×
1752

1753
        return d.db.UpdateDeviceDeploymentLogAvailability(ctx,
1✔
1754
                deviceID, deploymentID, true)
1✔
1755
}
1756

1757
func (d *Deployments) GetDeviceDeploymentLog(ctx context.Context,
1758
        deviceID, deploymentID string) (*model.DeploymentLog, error) {
1✔
1759

1✔
1760
        return d.db.GetDeviceDeploymentLog(ctx,
1✔
1761
                deviceID, deploymentID)
1✔
1762
}
1✔
1763

1764
func (d *Deployments) HasDeploymentForDevice(ctx context.Context,
1765
        deploymentID string, deviceID string) (bool, error) {
1✔
1766
        return d.db.HasDeploymentForDevice(ctx, deploymentID, deviceID)
1✔
1767
}
1✔
1768

1769
// AbortDeployment aborts deployment for devices and updates deployment stats
1770
func (d *Deployments) AbortDeployment(ctx context.Context, deploymentID string) error {
9✔
1771

9✔
1772
        if err := d.db.AbortDeviceDeployments(ctx, deploymentID); err != nil {
11✔
1773
                return err
2✔
1774
        }
2✔
1775

1776
        stats, err := d.db.AggregateDeviceDeploymentByStatus(
7✔
1777
                ctx, deploymentID)
7✔
1778
        if err != nil {
9✔
1779
                return err
2✔
1780
        }
2✔
1781

1782
        // update statistics
1783
        if err := d.db.UpdateStats(ctx, deploymentID, stats); err != nil {
7✔
1784
                return errors.Wrap(err, "failed to update deployment stats")
2✔
1785
        }
2✔
1786

1787
        // when aborting the deployment we need to set status directly instead of
1788
        // using recalcDeploymentStatus method;
1789
        // it is possible that the deployment does not have any device deployments yet;
1790
        // in that case, all statistics are 0 and calculating status based on statistics
1791
        // will not work - the calculated status will be "pending"
1792
        if err := d.db.SetDeploymentStatus(ctx,
3✔
1793
                deploymentID, model.DeploymentStatusFinished, time.Now()); err != nil {
3✔
UNCOV
1794
                return errors.Wrap(err, "failed to update deployment status")
×
UNCOV
1795
        }
×
1796

1797
        return nil
3✔
1798
}
1799

1800
func (d *Deployments) updateDeviceDeploymentsStatus(
1801
        ctx context.Context,
1802
        deviceId string,
1803
        status model.DeviceDeploymentStatus,
1804
) error {
30✔
1805
        var latestDeployment *time.Time
30✔
1806
        // Retrieve active device deployment for the device
30✔
1807
        deviceDeployment, err := d.db.FindOldestActiveDeviceDeployment(ctx, deviceId)
30✔
1808
        if err != nil {
34✔
1809
                return errors.Wrap(err, "Searching for active deployment for the device")
4✔
1810
        } else if deviceDeployment != nil {
34✔
1811
                now := time.Now()
4✔
1812
                ddStatus := model.DeviceDeploymentState{
4✔
1813
                        Status:     status,
4✔
1814
                        FinishTime: &now,
4✔
1815
                }
4✔
1816
                if err := d.UpdateDeviceDeploymentStatus(ctx, deviceDeployment.DeploymentId,
4✔
1817
                        deviceId, ddStatus); err != nil {
4✔
UNCOV
1818
                        return errors.Wrap(err, "updating device deployment status")
×
UNCOV
1819
                }
×
1820
                latestDeployment = deviceDeployment.Created
4✔
1821
        } else {
22✔
1822
                // get latest device deployment for the device
22✔
1823
                deviceDeployment, err := d.db.FindLatestInactiveDeviceDeployment(ctx, deviceId)
22✔
1824
                if err != nil {
22✔
UNCOV
1825
                        return errors.Wrap(err, "Searching for latest active deployment for the device")
×
1826
                } else if deviceDeployment == nil {
40✔
1827
                        latestDeployment = &time.Time{}
18✔
1828
                } else {
22✔
1829
                        latestDeployment = deviceDeployment.Created
4✔
1830
                }
4✔
1831
        }
1832

1833
        // get deployments newer then last device deployment
1834
        // iterate over deployments and check if the device is part of the deployment or not
1835
        // if the device is part of the deployment create new, decommisioned device deployment
1836
        for skip := 0; true; skip += 100 {
66✔
1837
                deployments, err := d.db.FindNewerActiveDeployments(ctx, latestDeployment, skip, 100)
40✔
1838
                if err != nil {
40✔
UNCOV
1839
                        return errors.Wrap(err, "Failed to search for newer active deployments")
×
UNCOV
1840
                }
×
1841
                if len(deployments) == 0 {
66✔
1842
                        break
26✔
1843
                }
1844
                for _, deployment := range deployments {
28✔
1845
                        ok, err := d.isDevicePartOfDeployment(ctx, deviceId, deployment)
14✔
1846
                        if err != nil {
14✔
UNCOV
1847
                                return err
×
1848
                        }
×
1849
                        if ok {
24✔
1850
                                deviceDeployment, err := d.createDeviceDeploymentWithStatus(ctx,
10✔
1851
                                        deviceId, deployment, status)
10✔
1852
                                if err != nil {
10✔
UNCOV
1853
                                        return err
×
UNCOV
1854
                                }
×
1855
                                if !status.Active() {
20✔
1856
                                        if err := d.reindexDeployment(ctx, deviceDeployment.DeviceId,
10✔
1857
                                                deviceDeployment.DeploymentId, deviceDeployment.Id); err != nil {
10✔
UNCOV
1858
                                                l := log.FromContext(ctx)
×
UNCOV
1859
                                                l.Warn(errors.Wrap(err, "failed to trigger a device reindex"))
×
UNCOV
1860
                                        }
×
1861
                                }
1862
                        }
1863
                }
1864
        }
1865

1866
        if err := d.reindexDevice(ctx, deviceId); err != nil {
26✔
1867
                l := log.FromContext(ctx)
×
1868
                l.Warn(errors.Wrap(err, "failed to trigger a device reindex"))
×
1869
        }
×
1870

1871
        return nil
26✔
1872
}
1873

1874
// DecommissionDevice updates the status of all the pending and active deployments for a device
1875
// to decommissioned
1876
func (d *Deployments) DecommissionDevice(ctx context.Context, deviceId string) error {
14✔
1877
        return d.updateDeviceDeploymentsStatus(
14✔
1878
                ctx,
14✔
1879
                deviceId,
14✔
1880
                model.DeviceDeploymentStatusDecommissioned,
14✔
1881
        )
14✔
1882
}
14✔
1883

1884
// AbortDeviceDeployments aborts all the pending and active deployments for a device
1885
func (d *Deployments) AbortDeviceDeployments(ctx context.Context, deviceId string) error {
16✔
1886
        return d.updateDeviceDeploymentsStatus(
16✔
1887
                ctx,
16✔
1888
                deviceId,
16✔
1889
                model.DeviceDeploymentStatusAborted,
16✔
1890
        )
16✔
1891
}
16✔
1892

1893
// DeleteDeviceDeploymentsHistory deletes the device deployments history
1894
func (d *Deployments) DeleteDeviceDeploymentsHistory(ctx context.Context, deviceId string) error {
4✔
1895
        // get device deployments which will be marked as deleted
4✔
1896
        f := false
4✔
1897
        dd, err := d.db.GetDeviceDeployments(ctx, 0, 0, deviceId, &f, false)
4✔
1898
        if err != nil {
4✔
UNCOV
1899
                return err
×
UNCOV
1900
        }
×
1901

1902
        // no device deployments to update
1903
        if len(dd) <= 0 {
4✔
UNCOV
1904
                return nil
×
UNCOV
1905
        }
×
1906

1907
        // mark device deployments as deleted
1908
        if err := d.db.DeleteDeviceDeploymentsHistory(ctx, deviceId); err != nil {
6✔
1909
                return err
2✔
1910
        }
2✔
1911

1912
        // trigger reindexing of updated device deployments
1913
        deviceDeployments := make([]workflows.DeviceDeploymentShortInfo, len(dd))
2✔
1914
        for i, d := range dd {
4✔
1915
                deviceDeployments[i].ID = d.Id
2✔
1916
                deviceDeployments[i].DeviceID = d.DeviceId
2✔
1917
                deviceDeployments[i].DeploymentID = d.DeploymentId
2✔
1918
        }
2✔
1919
        return d.workflowsClient.StartReindexReportingDeploymentBatch(ctx, deviceDeployments)
2✔
1920
}
1921

1922
// Storage settings
1923
func (d *Deployments) GetStorageSettings(ctx context.Context) (*model.StorageSettings, error) {
5✔
1924
        settings, err := d.db.GetStorageSettings(ctx)
5✔
1925
        if err != nil {
7✔
1926
                return nil, errors.Wrap(err, "Searching for settings failed")
2✔
1927
        }
2✔
1928

1929
        return settings, nil
3✔
1930
}
1931

1932
func (d *Deployments) SetStorageSettings(
1933
        ctx context.Context,
1934
        storageSettings *model.StorageSettings,
1935
) error {
7✔
1936
        if storageSettings != nil {
14✔
1937
                ctx = storage.SettingsWithContext(ctx, storageSettings)
7✔
1938
                if err := d.objectStorage.HealthCheck(ctx); err != nil {
7✔
UNCOV
1939
                        return errors.WithMessage(err,
×
UNCOV
1940
                                "the provided storage settings failed the health check",
×
UNCOV
1941
                        )
×
UNCOV
1942
                }
×
1943
        }
1944
        if err := d.db.SetStorageSettings(ctx, storageSettings); err != nil {
11✔
1945
                return errors.Wrap(err, "Failed to save settings")
4✔
1946
        }
4✔
1947

1948
        return nil
3✔
1949
}
1950

1951
func (d *Deployments) WithReporting(c reporting.Client) *Deployments {
15✔
1952
        d.reportingClient = c
15✔
1953
        return d
15✔
1954
}
15✔
1955

1956
func (d *Deployments) haveReporting() bool {
12✔
1957
        return d.reportingClient != nil
12✔
1958
}
12✔
1959

1960
func (d *Deployments) search(
1961
        ctx context.Context,
1962
        tid string,
1963
        parms model.SearchParams,
1964
) ([]model.InvDevice, int, error) {
12✔
1965
        if d.haveReporting() {
14✔
1966
                return d.reportingClient.Search(ctx, tid, parms)
2✔
1967
        } else {
12✔
1968
                return d.inventoryClient.Search(ctx, tid, parms)
10✔
1969
        }
10✔
1970
}
1971

1972
func (d *Deployments) UpdateDeploymentsWithArtifactName(
1973
        ctx context.Context,
1974
        artifactName string,
1975
) error {
3✔
1976
        // first check if there are pending deployments with given artifact name
3✔
1977
        exists, err := d.db.ExistUnfinishedByArtifactName(ctx, artifactName)
3✔
1978
        if err != nil {
3✔
UNCOV
1979
                return errors.Wrap(err, "looking for deployments with given artifact name")
×
UNCOV
1980
        }
×
1981
        if !exists {
4✔
1982
                return nil
1✔
1983
        }
1✔
1984

1985
        // Assign artifacts to the deployments with given artifact name
1986
        artifacts, err := d.db.ImagesByName(ctx, artifactName)
2✔
1987
        if err != nil {
2✔
1988
                return errors.Wrap(err, "Finding artifact with given name")
×
1989
        }
×
1990

1991
        if len(artifacts) == 0 {
2✔
UNCOV
1992
                return ErrNoArtifact
×
UNCOV
1993
        }
×
1994
        artifactIDs := getArtifactIDs(artifacts)
2✔
1995
        return d.db.UpdateDeploymentsWithArtifactName(ctx, artifactName, artifactIDs)
2✔
1996
}
1997

1998
func (d *Deployments) reindexDevice(ctx context.Context, deviceID string) error {
49✔
1999
        if d.reportingClient != nil {
54✔
2000
                return d.workflowsClient.StartReindexReporting(ctx, deviceID)
5✔
2001
        }
5✔
2002
        return nil
44✔
2003
}
2004

2005
func (d *Deployments) reindexDeployment(ctx context.Context,
2006
        deviceID, deploymentID, ID string) error {
33✔
2007
        if d.reportingClient != nil {
38✔
2008
                return d.workflowsClient.StartReindexReportingDeployment(ctx, deviceID, deploymentID, ID)
5✔
2009
        }
5✔
2010
        return nil
28✔
2011
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc