• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / workflows / 1354855011

01 Jul 2024 12:34AM UTC coverage: 81.632% (-0.3%) from 81.882%
1354855011

Pull #327

gitlab-ci

web-flow
chore: bump golang in the docker-dependencies group

Bumps the docker-dependencies group with 1 update: golang.


Updates `golang` from 1.22.3-alpine3.19 to 1.22.4-alpine3.19

---
updated-dependencies:
- dependency-name: golang
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: docker-dependencies
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #327: chore: bump golang from 1.22.3-alpine3.19 to 1.22.4-alpine3.19 in the docker-dependencies group

1631 of 1998 relevant lines covered (81.63%)

14.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.8
/store/mongo/datastore_mongo.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package mongo
16

17
import (
18
        "context"
19
        "crypto/tls"
20
        "fmt"
21
        "strconv"
22
        "strings"
23
        "time"
24

25
        "github.com/pkg/errors"
26
        "go.mongodb.org/mongo-driver/bson"
27
        "go.mongodb.org/mongo-driver/bson/primitive"
28
        "go.mongodb.org/mongo-driver/mongo"
29
        "go.mongodb.org/mongo-driver/mongo/options"
30
        mopts "go.mongodb.org/mongo-driver/mongo/options"
31

32
        "github.com/mendersoftware/go-lib-micro/config"
33
        "github.com/mendersoftware/go-lib-micro/log"
34

35
        dconfig "github.com/mendersoftware/workflows/config"
36
        "github.com/mendersoftware/workflows/model"
37
        "github.com/mendersoftware/workflows/store"
38
)
39

40
const (
41
        // JobsCollectionName is the name of the collection holding jobs,
42
        // both finished and in progress.
43
        JobsCollectionName = "jobs"
44

45
        // WorkflowCollectionName is the name of the collection of stored workflows.
46
        WorkflowCollectionName = "workflows"
47
)
48

49
var (
50
        // ErrNoSuchWorkflowByVersion is returned by GetWorkflowByName when the
        // workflow stored in the database has a lower version than requested.
        ErrNoSuchWorkflowByVersion = errors.New("Workflow of the given version not found")
51
)
52

53
// SetupDataStore returns the mongo data store and optionally runs migrations
54
func SetupDataStore(automigrate bool) (*DataStoreMongo, error) {
3✔
55
        ctx := context.Background()
3✔
56
        dbClient, err := NewClient(ctx, config.Config)
3✔
57
        if err != nil {
3✔
58
                return nil, errors.New(fmt.Sprintf("failed to connect to db: %v", err))
×
59
        }
×
60
        err = doMigrations(ctx, dbClient, automigrate)
3✔
61
        if err != nil {
3✔
62
                return nil, err
×
63
        }
×
64
        dataStore := NewDataStoreWithClient(dbClient, config.Config)
3✔
65
        return dataStore, nil
3✔
66
}
67

68
func doMigrations(ctx context.Context, client *mongo.Client,
69
        automigrate bool) error {
3✔
70
        db := config.Config.GetString(dconfig.SettingDbName)
3✔
71
        err := Migrate(ctx, db, DbVersion, client, automigrate)
3✔
72
        if err != nil {
3✔
73
                return errors.New(fmt.Sprintf("failed to run migrations: %v", err))
×
74
        }
×
75

76
        return nil
3✔
77
}
78

79
func disconnectClient(parentCtx context.Context, client *mongo.Client) {
2✔
80
        ctx, cancel := context.WithTimeout(parentCtx, 10*time.Second)
2✔
81
        defer cancel()
2✔
82
        _ = client.Disconnect(ctx)
2✔
83
}
2✔
84

85
// NewClient returns a mongo client
86
func NewClient(_ context.Context, c config.Reader) (*mongo.Client, error) {
3✔
87

3✔
88
        clientOptions := mopts.Client()
3✔
89
        mongoURL := c.GetString(dconfig.SettingMongo)
3✔
90
        if !strings.Contains(mongoURL, "://") {
3✔
91
                return nil, errors.Errorf("Invalid mongoURL %q: missing schema.",
×
92
                        mongoURL)
×
93
        }
×
94
        clientOptions.ApplyURI(mongoURL)
3✔
95

3✔
96
        username := c.GetString(dconfig.SettingDbUsername)
3✔
97
        if username != "" {
3✔
98
                credentials := mopts.Credential{
×
99
                        Username: c.GetString(dconfig.SettingDbUsername),
×
100
                }
×
101
                password := c.GetString(dconfig.SettingDbPassword)
×
102
                if password != "" {
×
103
                        credentials.Password = password
×
104
                        credentials.PasswordSet = true
×
105
                }
×
106
                clientOptions.SetAuth(credentials)
×
107
        }
108

109
        if c.GetBool(dconfig.SettingDbSSL) {
3✔
110
                tlsConfig := &tls.Config{}
×
111
                tlsConfig.InsecureSkipVerify = c.GetBool(dconfig.SettingDbSSLSkipVerify)
×
112
                clientOptions.SetTLSConfig(tlsConfig)
×
113
        }
×
114

115
        // Set 10s timeout
116
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
3✔
117
        defer cancel()
3✔
118
        client, err := mongo.Connect(ctx, clientOptions)
3✔
119
        if err != nil {
3✔
120
                return nil, errors.Wrap(err, "Failed to connect to mongo server")
×
121
        }
×
122

123
        // Validate connection
124
        if err = client.Ping(ctx, nil); err != nil {
3✔
125
                return nil, errors.Wrap(err, "Error reaching mongo server")
×
126
        }
×
127

128
        return client, nil
3✔
129
}
130

131
// DataStoreMongo is the MongoDB-backed data storage service for workflows and jobs.
132
type DataStoreMongo struct {
133
        // client holds the reference to the client used to communicate with the
134
        // mongodb server.
135
        client *mongo.Client
136
        // dbName contains the name of the workflow database.
137
        dbName string
138
        // workflows holds a local cache of workflows - a worker should NEVER
139
        // access this cache directly, but only through
140
        // DataStoreMongo.GetWorkflowByName.
141
        workflows map[string]*model.Workflow
142
}
143

144
// NewDataStoreWithClient initializes a DataStore object
145
func NewDataStoreWithClient(client *mongo.Client, c config.Reader) *DataStoreMongo {
3✔
146
        dbName := c.GetString(dconfig.SettingDbName)
3✔
147
        ctx := context.Background()
3✔
148

3✔
149
        // Maybe initialize workflows
3✔
150
        var findResults []*model.Workflow
3✔
151
        workflows := make(map[string]*model.Workflow)
3✔
152
        database := client.Database(dbName)
3✔
153
        collWflows := database.Collection(WorkflowCollectionName)
3✔
154
        cur, err := collWflows.Find(ctx, bson.M{})
3✔
155
        if err == nil {
6✔
156
                if err = cur.All(ctx, &findResults); err == nil {
6✔
157
                        for _, workflow := range findResults {
3✔
158
                                workflows[workflow.Name+"."+strconv.Itoa(workflow.Version)] = workflow
×
159
                        }
×
160
                }
161
        }
162

163
        return &DataStoreMongo{
3✔
164
                client:    client,
3✔
165
                dbName:    dbName,
3✔
166
                workflows: workflows,
3✔
167
        }
3✔
168
}
169

170
func (db *DataStoreMongo) Ping(ctx context.Context) error {
1✔
171
        res := db.client.Database(db.dbName).
1✔
172
                RunCommand(ctx, bson.M{"ping": 1})
1✔
173
        return res.Err()
1✔
174
}
1✔
175

176
// LoadWorkflows from filesystem if the workflowsPath setting is provided
177
func (db *DataStoreMongo) LoadWorkflows(ctx context.Context, l *log.Logger) error {
3✔
178
        workflowsPath := config.Config.GetString(dconfig.SettingWorkflowsPath)
3✔
179
        if workflowsPath != "" {
5✔
180
                workflows := model.GetWorkflowsFromPath(workflowsPath)
2✔
181
                l.Infof("LoadWorkflows: loading %d workflows from %s.", len(workflows), workflowsPath)
2✔
182
                for _, workflow := range workflows {
12✔
183
                        l.Infof("LoadWorkflows: loading %s v%d.", workflow.Name, workflow.Version)
10✔
184
                        count, err := db.InsertWorkflows(ctx, *workflow)
10✔
185
                        if count != 1 {
10✔
186
                                l.Infof("LoadWorkflows: not loaded: %s v%d.", workflow.Name, workflow.Version)
×
187
                        }
×
188
                        if err != nil {
10✔
189
                                l.Infof(
×
190
                                        "LoadWorkflows: error loading: %s v%d: %s.",
×
191
                                        workflow.Name,
×
192
                                        workflow.Version,
×
193
                                        err.Error(),
×
194
                                )
×
195
                        }
×
196
                }
197
        } else {
1✔
198
                l.Info("LoadWorkflows: empty workflowsPath, not loading workflows")
1✔
199
        }
1✔
200
        return nil
3✔
201
}
202

203
// InsertWorkflows inserts a workflow to the database and cache and returns the number of
204
// inserted elements or an error for the first error generated.
205
func (db *DataStoreMongo) InsertWorkflows(
206
        ctx context.Context,
207
        workflows ...model.Workflow,
208
) (int, error) {
19✔
209
        database := db.client.Database(db.dbName)
19✔
210
        collWflows := database.Collection(WorkflowCollectionName)
19✔
211
        for i, workflow := range workflows {
39✔
212
                if workflow.Name == "" {
21✔
213
                        return i, store.ErrWorkflowMissingName
1✔
214
                }
1✔
215
                workflowDb, _ := db.GetWorkflowByName(ctx, workflow.Name, strconv.Itoa(workflow.Version))
19✔
216
                if workflowDb != nil && workflowDb.Version >= workflow.Version {
22✔
217
                        return i + 1, store.ErrWorkflowAlreadyExists
3✔
218
                }
3✔
219
                if workflowDb == nil || workflowDb.Version < workflow.Version {
32✔
220
                        upsert := true
16✔
221
                        opt := &mopts.UpdateOptions{
16✔
222
                                Upsert: &upsert,
16✔
223
                        }
16✔
224
                        query := bson.M{"_id": workflow.Name}
16✔
225
                        update := bson.M{"$set": workflow}
16✔
226
                        if _, err := collWflows.UpdateOne(ctx, query, update, opt); err != nil {
16✔
227
                                return i + 1, err
×
228
                        }
×
229
                }
230
                db.workflows[workflow.Name] = &workflow
16✔
231
        }
232
        return len(workflows), nil
15✔
233
}
234

235
// GetWorkflowByName gets the workflow with the given name - either from the
236
// cache, or searches the database if the workflow is not cached.
237
func (db *DataStoreMongo) GetWorkflowByName(
238
        ctx context.Context,
239
        workflowName string,
240
        version string,
241
) (*model.Workflow, error) {
47✔
242
        workflow, ok := db.workflows[workflowName]
47✔
243
        l := log.FromContext(ctx)
47✔
244

47✔
245
        versionNumber, e := strconv.Atoi(version)
47✔
246
        if e != nil {
68✔
247
                versionNumber = 0
21✔
248
        }
21✔
249

250
        if !ok || workflow.Version < versionNumber {
75✔
251
                var result model.Workflow
28✔
252
                database := db.client.Database(db.dbName)
28✔
253
                collWflows := database.Collection(WorkflowCollectionName)
28✔
254
                err := collWflows.FindOne(ctx, bson.M{"_id": workflowName}).
28✔
255
                        Decode(&result)
28✔
256
                if err != nil {
47✔
257
                        return nil, err
19✔
258
                }
19✔
259
                if result.Version < versionNumber {
9✔
260
                        l.Errorf("workflow found but with version too low: %s v%s", workflowName, version)
×
261
                        return nil, ErrNoSuchWorkflowByVersion
×
262
                }
×
263
                db.workflows[result.Name] = &result
9✔
264
                return &result, err
9✔
265
        } else {
19✔
266
                l.Debugf("cache hit: %s v%s", workflowName, version)
19✔
267
        }
19✔
268
        return workflow, nil
19✔
269
}
270

271
// GetWorkflows gets all workflows from the cache as a list
272
// (should only be used by the server process)
273
func (db *DataStoreMongo) GetWorkflows(ctx context.Context) []model.Workflow {
1✔
274
        workflows := make([]model.Workflow, len(db.workflows))
1✔
275
        var i int
1✔
276
        for _, workflow := range db.workflows {
2✔
277
                workflows[i] = *workflow
1✔
278
                i++
1✔
279
        }
1✔
280

281
        return workflows
1✔
282
}
283

284
// UpsertJob inserts the job in the queue
285
func (db *DataStoreMongo) UpsertJob(
286
        ctx context.Context, job *model.Job) (*model.Job, error) {
15✔
287
        if job.ID == "" {
20✔
288
                job.ID = primitive.NewObjectID().Hex()
5✔
289
        }
5✔
290
        query := bson.M{
15✔
291
                "_id": job.ID,
15✔
292
        }
15✔
293
        update := bson.M{
15✔
294
                "$set": job,
15✔
295
        }
15✔
296
        findUpdateOptions := &mopts.FindOneAndUpdateOptions{}
15✔
297
        findUpdateOptions.SetReturnDocument(mopts.After)
15✔
298
        findUpdateOptions.SetUpsert(true)
15✔
299

15✔
300
        database := db.client.Database(db.dbName)
15✔
301
        collJobs := database.Collection(JobsCollectionName)
15✔
302

15✔
303
        err := collJobs.FindOneAndUpdate(ctx, query, update, findUpdateOptions).Decode(job)
15✔
304
        if err != nil {
15✔
305
                return nil, err
×
306
        }
×
307

308
        return job, nil
15✔
309
}
310

311
// UpdateJobAddResult add a task execution result to a job status
312
func (db *DataStoreMongo) UpdateJobAddResult(ctx context.Context,
313
        job *model.Job, result *model.TaskResult) error {
29✔
314
        options := &mopts.UpdateOptions{}
29✔
315
        options.SetUpsert(true)
29✔
316

29✔
317
        update := bson.M{
29✔
318
                "$addToSet": bson.M{
29✔
319
                        "results": result,
29✔
320
                },
29✔
321
                "$setOnInsert": bson.M{
29✔
322
                        "workflow_name":    job.WorkflowName,
29✔
323
                        "input_parameters": job.InputParameters,
29✔
324
                        "status":           job.Status,
29✔
325
                        "insert_time":      job.InsertTime,
29✔
326
                        "version":          job.WorkflowVersion,
29✔
327
                },
29✔
328
        }
29✔
329

29✔
330
        collection := db.client.Database(db.dbName).
29✔
331
                Collection(JobsCollectionName)
29✔
332
        _, err := collection.UpdateOne(ctx, bson.M{"_id": job.ID}, update, options)
29✔
333
        if err != nil {
29✔
334
                return err
×
335
        }
×
336

337
        return nil
29✔
338
}
339

340
// UpdateJobStatus set the task execution status for a job status
341
func (db *DataStoreMongo) UpdateJobStatus(
342
        ctx context.Context, job *model.Job, status int32) error {
12✔
343
        if model.StatusToString(status) == "unknown" {
13✔
344
                return model.ErrInvalidStatus
1✔
345
        }
1✔
346

347
        options := &mopts.UpdateOptions{}
11✔
348
        options.SetUpsert(true)
11✔
349

11✔
350
        collection := db.client.Database(db.dbName).
11✔
351
                Collection(JobsCollectionName)
11✔
352
        _, err := collection.UpdateOne(ctx, bson.M{
11✔
353
                "_id": job.ID,
11✔
354
        }, bson.M{
11✔
355
                "$set": bson.M{
11✔
356
                        "status": status,
11✔
357
                },
11✔
358
                "$setOnInsert": bson.M{
11✔
359
                        "workflow_name":    job.WorkflowName,
11✔
360
                        "input_parameters": job.InputParameters,
11✔
361
                        "results":          job.Results,
11✔
362
                        "insert_time":      job.InsertTime,
11✔
363
                        "version":          job.WorkflowVersion,
11✔
364
                },
11✔
365
        }, options)
11✔
366
        if err != nil {
11✔
367
                return err
×
368
        }
×
369

370
        return nil
11✔
371
}
372

373
// GetJobByNameAndID get the task execution status for a job
374
// by workflow name and ID
375
func (db *DataStoreMongo) GetJobByNameAndID(
376
        ctx context.Context, name string, ID string) (*model.Job, error) {
31✔
377
        collection := db.client.Database(db.dbName).
31✔
378
                Collection(JobsCollectionName)
31✔
379
        cur := collection.FindOne(ctx, bson.M{
31✔
380
                "_id":           ID,
31✔
381
                "workflow_name": name,
31✔
382
        })
31✔
383
        var job model.Job
31✔
384
        err := cur.Decode(&job)
31✔
385
        if err == mongo.ErrNoDocuments {
31✔
386
                return nil, nil
×
387
        } else if err != nil {
31✔
388
                return nil, err
×
389
        }
×
390

391
        return &job, nil
31✔
392
}
393

394
// GetJobByID get the task execution status for a job by ID
395
func (db *DataStoreMongo) GetJobByID(
396
        ctx context.Context, ID string) (*model.Job, error) {
2✔
397
        collection := db.client.Database(db.dbName).
2✔
398
                Collection(JobsCollectionName)
2✔
399
        cur := collection.FindOne(ctx, bson.M{
2✔
400
                "_id": ID,
2✔
401
        })
2✔
402
        var job model.Job
2✔
403
        err := cur.Decode(&job)
2✔
404
        if err == mongo.ErrNoDocuments {
3✔
405
                return nil, nil
1✔
406
        } else if err != nil {
2✔
407
                return nil, err
×
408
        }
×
409

410
        return &job, nil
1✔
411
}
412

413
func (db *DataStoreMongo) GetAllJobs(
414
        ctx context.Context, page int64, perPage int64) ([]model.Job, int64, error) {
1✔
415
        collection := db.client.Database(db.dbName).
1✔
416
                Collection(JobsCollectionName)
1✔
417
        findOptions := &options.FindOptions{}
1✔
418
        findOptions.SetSkip(int64((page - 1) * perPage))
1✔
419
        findOptions.SetLimit(int64(perPage))
1✔
420
        sortField := bson.M{}
1✔
421
        sortField["insert_time"] = -1
1✔
422
        findOptions.SetSort(sortField)
1✔
423
        cur, err := collection.Find(ctx, bson.M{}, findOptions)
1✔
424
        if err != nil {
1✔
425
                return []model.Job{}, 0, err
×
426
        }
×
427

428
        var jobs []model.Job
1✔
429
        err = cur.All(ctx, &jobs)
1✔
430
        if err == mongo.ErrNoDocuments {
1✔
431
                return []model.Job{}, 0, nil
×
432
        } else if err != nil {
1✔
433
                return []model.Job{}, 0, err
×
434
        }
×
435

436
        count, err := collection.CountDocuments(ctx, bson.M{})
1✔
437
        if err != nil {
1✔
438
                return []model.Job{}, 0, err
×
439
        }
×
440
        return jobs, count, nil
1✔
441
}
442

443
// Close disconnects the client
444
func (db *DataStoreMongo) Close() {
2✔
445
        ctx := context.Background()
2✔
446
        disconnectClient(ctx, db.client)
2✔
447
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc