• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / workflows / 792065958

pending completion
792065958

Pull #263

gitlab-ci

GitHub
chore: bump golang.org/x/sys
Pull Request #263: chore: bump golang.org/x/sys from 0.0.0-20220715151400-c0bba94af5f8 to 0.5.0

1474 of 1729 relevant lines covered (85.25%)

20.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.75
/store/mongo/datastore_mongo.go
1
// Copyright 2022 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package mongo
16

17
import (
18
        "context"
19
        "crypto/tls"
20
        "fmt"
21
        "strconv"
22
        "strings"
23
        "time"
24

25
        "github.com/pkg/errors"
26
        "go.mongodb.org/mongo-driver/bson"
27
        "go.mongodb.org/mongo-driver/bson/primitive"
28
        "go.mongodb.org/mongo-driver/mongo"
29
        "go.mongodb.org/mongo-driver/mongo/options"
30
        mopts "go.mongodb.org/mongo-driver/mongo/options"
31

32
        "github.com/mendersoftware/go-lib-micro/config"
33
        "github.com/mendersoftware/go-lib-micro/log"
34

35
        dconfig "github.com/mendersoftware/workflows/config"
36
        "github.com/mendersoftware/workflows/model"
37
        "github.com/mendersoftware/workflows/store"
38
)
39

40
const (
41
        // JobsCollectionName refers to the collection of finished or
42
        // jobs in progress.
43
        JobsCollectionName = "jobs"
44

45
        // WorkflowCollectionName refers to the collection of stored workflows
46
        WorkflowCollectionName = "workflows"
47
)
48

49
var (
50
        ErrNoSuchWorkflowByVersion = errors.New("Workflow of the given version not found")
51
)
52

53
// SetupDataStore returns the mongo data store and optionally runs migrations
54
func SetupDataStore(automigrate bool) (*DataStoreMongo, error) {
4✔
55
        ctx := context.Background()
4✔
56
        dbClient, err := NewClient(ctx, config.Config)
4✔
57
        if err != nil {
4✔
58
                return nil, errors.New(fmt.Sprintf("failed to connect to db: %v", err))
×
59
        }
×
60
        err = doMigrations(ctx, dbClient, automigrate)
4✔
61
        if err != nil {
4✔
62
                return nil, err
×
63
        }
×
64
        dataStore := NewDataStoreWithClient(dbClient, config.Config)
4✔
65
        return dataStore, nil
4✔
66
}
67

68
func doMigrations(ctx context.Context, client *Client,
69
        automigrate bool) error {
4✔
70
        db := config.Config.GetString(dconfig.SettingDbName)
4✔
71
        err := Migrate(ctx, db, DbVersion, client, automigrate)
4✔
72
        if err != nil {
4✔
73
                return errors.New(fmt.Sprintf("failed to run migrations: %v", err))
×
74
        }
×
75

76
        return nil
4✔
77
}
78

79
func disconnectClient(parentCtx context.Context, client *Client) {
2✔
80
        ctx, cancel := context.WithTimeout(parentCtx, 10*time.Second)
2✔
81
        defer cancel()
2✔
82
        _ = client.Disconnect(ctx)
2✔
83
}
2✔
84

85
// Client is a package specific mongo client
86
type Client struct {
87
        mongo.Client
88
}
89

90
// NewClient returns a mongo client
91
func NewClient(_ context.Context, c config.Reader) (*Client, error) {
4✔
92

4✔
93
        clientOptions := mopts.Client()
4✔
94
        mongoURL := c.GetString(dconfig.SettingMongo)
4✔
95
        if !strings.Contains(mongoURL, "://") {
4✔
96
                return nil, errors.Errorf("Invalid mongoURL %q: missing schema.",
×
97
                        mongoURL)
×
98
        }
×
99
        clientOptions.ApplyURI(mongoURL)
4✔
100

4✔
101
        username := c.GetString(dconfig.SettingDbUsername)
4✔
102
        if username != "" {
4✔
103
                credentials := mopts.Credential{
×
104
                        Username: c.GetString(dconfig.SettingDbUsername),
×
105
                }
×
106
                password := c.GetString(dconfig.SettingDbPassword)
×
107
                if password != "" {
×
108
                        credentials.Password = password
×
109
                        credentials.PasswordSet = true
×
110
                }
×
111
                clientOptions.SetAuth(credentials)
×
112
        }
113

114
        if c.GetBool(dconfig.SettingDbSSL) {
4✔
115
                tlsConfig := &tls.Config{}
×
116
                tlsConfig.InsecureSkipVerify = c.GetBool(dconfig.SettingDbSSLSkipVerify)
×
117
                clientOptions.SetTLSConfig(tlsConfig)
×
118
        }
×
119

120
        // Set 10s timeout
121
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
4✔
122
        defer cancel()
4✔
123
        client, err := mongo.Connect(ctx, clientOptions)
4✔
124
        if err != nil {
4✔
125
                return nil, errors.Wrap(err, "Failed to connect to mongo server")
×
126
        }
×
127

128
        // Validate connection
129
        if err = client.Ping(ctx, nil); err != nil {
4✔
130
                return nil, errors.Wrap(err, "Error reaching mongo server")
×
131
        }
×
132

133
        mongoClient := Client{Client: *client}
4✔
134
        return &mongoClient, nil
4✔
135
}
136

137
// DataStoreMongo is the data storage service
138
type DataStoreMongo struct {
139
        // client holds the reference to the client used to communicate with the
140
        // mongodb server.
141
        client *Client
142
        // dbName contains the name of the workflow database.
143
        dbName string
144
        // workflows holds a local cache of workflows - a worker should NEVER
145
        // access this cache directly, but through
146
        // DataStoreMongo.GetWorkflowByName.
147
        workflows map[string]*model.Workflow
148
}
149

150
// NewDataStoreWithClient initializes a DataStore object
151
func NewDataStoreWithClient(client *Client, c config.Reader) *DataStoreMongo {
4✔
152
        dbName := c.GetString(dconfig.SettingDbName)
4✔
153
        ctx := context.Background()
4✔
154

4✔
155
        // Maybe initialize workflows
4✔
156
        var findResults []*model.Workflow
4✔
157
        workflows := make(map[string]*model.Workflow)
4✔
158
        database := client.Database(dbName)
4✔
159
        collWflows := database.Collection(WorkflowCollectionName)
4✔
160
        cur, err := collWflows.Find(ctx, bson.M{})
4✔
161
        if err == nil {
8✔
162
                if err = cur.All(ctx, &findResults); err == nil {
8✔
163
                        for _, workflow := range findResults {
4✔
164
                                workflows[workflow.Name+"."+strconv.Itoa(workflow.Version)] = workflow
×
165
                        }
×
166
                }
167
        }
168

169
        return &DataStoreMongo{
4✔
170
                client:    client,
4✔
171
                dbName:    dbName,
4✔
172
                workflows: workflows,
4✔
173
        }
4✔
174
}
175

176
func (db *DataStoreMongo) Ping(ctx context.Context) error {
2✔
177
        res := db.client.Database(db.dbName).
2✔
178
                RunCommand(ctx, bson.M{"ping": 1})
2✔
179
        return res.Err()
2✔
180
}
2✔
181

182
// LoadWorkflows from filesystem if the workflowsPath setting is provided
183
func (db *DataStoreMongo) LoadWorkflows(ctx context.Context, l *log.Logger) error {
4✔
184
        workflowsPath := config.Config.GetString(dconfig.SettingWorkflowsPath)
4✔
185
        if workflowsPath != "" {
7✔
186
                workflows := model.GetWorkflowsFromPath(workflowsPath)
3✔
187
                l.Infof("LoadWorkflows: loading %d workflows from %s.", len(workflows), workflowsPath)
3✔
188
                for _, workflow := range workflows {
15✔
189
                        l.Infof("LoadWorkflows: loading %s v%d.", workflow.Name, workflow.Version)
12✔
190
                        count, err := db.InsertWorkflows(ctx, *workflow)
12✔
191
                        if count != 1 {
13✔
192
                                l.Infof("LoadWorkflows: not loaded: %s v%d.", workflow.Name, workflow.Version)
1✔
193
                        }
1✔
194
                        if err != nil {
13✔
195
                                l.Infof(
1✔
196
                                        "LoadWorkflows: error loading: %s v%d: %s.",
1✔
197
                                        workflow.Name,
1✔
198
                                        workflow.Version,
1✔
199
                                        err.Error(),
1✔
200
                                )
1✔
201
                        }
1✔
202
                }
203
        } else {
1✔
204
                l.Info("LoadWorkflows: empty workflowsPath, not loading workflows")
1✔
205
        }
1✔
206
        return nil
4✔
207
}
208

209
// InsertWorkflows inserts a workflow to the database and cache and returns the number of
210
// inserted elements or an error for the first error generated.
211
func (db *DataStoreMongo) InsertWorkflows(
212
        ctx context.Context,
213
        workflows ...model.Workflow,
214
) (int, error) {
30✔
215
        database := db.client.Database(db.dbName)
30✔
216
        collWflows := database.Collection(WorkflowCollectionName)
30✔
217
        for i, workflow := range workflows {
62✔
218
                if workflow.Name == "" {
35✔
219
                        return i, store.ErrWorkflowMissingName
3✔
220
                }
3✔
221
                workflowDb, _ := db.GetWorkflowByName(ctx, workflow.Name, strconv.Itoa(workflow.Version))
29✔
222
                if workflowDb != nil && workflowDb.Version >= workflow.Version {
35✔
223
                        return i + 1, store.ErrWorkflowAlreadyExists
6✔
224
                }
6✔
225
                if workflowDb == nil || workflowDb.Version < workflow.Version {
46✔
226
                        upsert := true
23✔
227
                        opt := &mopts.UpdateOptions{
23✔
228
                                Upsert: &upsert,
23✔
229
                        }
23✔
230
                        query := bson.M{"_id": workflow.Name}
23✔
231
                        update := bson.M{"$set": workflow}
23✔
232
                        if _, err := collWflows.UpdateOne(ctx, query, update, opt); err != nil {
23✔
233
                                return i + 1, err
×
234
                        }
×
235
                }
236
                db.workflows[workflow.Name] = &workflow
23✔
237
        }
238
        return len(workflows), nil
21✔
239
}
240

241
// GetWorkflowByName gets the workflow with the given name - either from the
242
// cache, or searches the database if the workflow is not cached.
243
func (db *DataStoreMongo) GetWorkflowByName(
244
        ctx context.Context,
245
        workflowName string,
246
        version string,
247
) (*model.Workflow, error) {
64✔
248
        workflow, ok := db.workflows[workflowName]
64✔
249
        l := log.FromContext(ctx)
64✔
250

64✔
251
        versionNumber, e := strconv.Atoi(version)
64✔
252
        if e != nil {
85✔
253
                versionNumber = 0
21✔
254
        }
21✔
255

256
        if !ok || workflow.Version < versionNumber {
102✔
257
                var result model.Workflow
38✔
258
                database := db.client.Database(db.dbName)
38✔
259
                collWflows := database.Collection(WorkflowCollectionName)
38✔
260
                err := collWflows.FindOne(ctx, bson.M{"_id": workflowName}).
38✔
261
                        Decode(&result)
38✔
262
                if err != nil {
66✔
263
                        return nil, err
28✔
264
                }
28✔
265
                if result.Version < versionNumber {
10✔
266
                        l.Errorf("workflow found but with version too low: %s v%s", workflowName, version)
×
267
                        return nil, ErrNoSuchWorkflowByVersion
×
268
                }
×
269
                db.workflows[result.Name] = &result
10✔
270
                return &result, err
10✔
271
        } else {
26✔
272
                l.Debugf("cache hit: %s v%s", workflowName, version)
26✔
273
        }
26✔
274
        return workflow, nil
26✔
275
}
276

277
// GetWorkflows gets all workflows from the cache as a list
278
// (should only be used by the server process)
279
func (db *DataStoreMongo) GetWorkflows(ctx context.Context) []model.Workflow {
2✔
280
        workflows := make([]model.Workflow, len(db.workflows))
2✔
281
        var i int
2✔
282
        for _, workflow := range db.workflows {
4✔
283
                workflows[i] = *workflow
2✔
284
                i++
2✔
285
        }
2✔
286

287
        return workflows
2✔
288
}
289

290
// UpsertJob inserts the job in the queue
291
func (db *DataStoreMongo) UpsertJob(
292
        ctx context.Context, job *model.Job) (*model.Job, error) {
20✔
293
        if job.ID == "" {
30✔
294
                job.ID = primitive.NewObjectID().Hex()
10✔
295
        }
10✔
296
        query := bson.M{
20✔
297
                "_id": job.ID,
20✔
298
        }
20✔
299
        update := bson.M{
20✔
300
                "$set": job,
20✔
301
        }
20✔
302
        findUpdateOptions := &mopts.FindOneAndUpdateOptions{}
20✔
303
        findUpdateOptions.SetReturnDocument(mopts.After)
20✔
304
        findUpdateOptions.SetUpsert(true)
20✔
305

20✔
306
        database := db.client.Database(db.dbName)
20✔
307
        collJobs := database.Collection(JobsCollectionName)
20✔
308

20✔
309
        err := collJobs.FindOneAndUpdate(ctx, query, update, findUpdateOptions).Decode(job)
20✔
310
        if err != nil {
20✔
311
                return nil, err
×
312
        }
×
313

314
        return job, nil
20✔
315
}
316

317
// UpdateJobAddResult add a task execution result to a job status
318
func (db *DataStoreMongo) UpdateJobAddResult(ctx context.Context,
319
        job *model.Job, result *model.TaskResult) error {
30✔
320
        options := &mopts.UpdateOptions{}
30✔
321
        options.SetUpsert(true)
30✔
322

30✔
323
        update := bson.M{
30✔
324
                "$addToSet": bson.M{
30✔
325
                        "results": result,
30✔
326
                },
30✔
327
                "$setOnInsert": bson.M{
30✔
328
                        "workflow_name":    job.WorkflowName,
30✔
329
                        "input_parameters": job.InputParameters,
30✔
330
                        "status":           job.Status,
30✔
331
                        "insert_time":      job.InsertTime,
30✔
332
                        "version":          job.WorkflowVersion,
30✔
333
                },
30✔
334
        }
30✔
335

30✔
336
        collection := db.client.Database(db.dbName).
30✔
337
                Collection(JobsCollectionName)
30✔
338
        _, err := collection.UpdateOne(ctx, bson.M{"_id": job.ID}, update, options)
30✔
339
        if err != nil {
30✔
340
                return err
×
341
        }
×
342

343
        return nil
30✔
344
}
345

346
// UpdateJobStatus set the task execution status for a job status
347
func (db *DataStoreMongo) UpdateJobStatus(
348
        ctx context.Context, job *model.Job, status int32) error {
14✔
349
        if model.StatusToString(status) == "unknown" {
16✔
350
                return model.ErrInvalidStatus
2✔
351
        }
2✔
352

353
        options := &mopts.UpdateOptions{}
12✔
354
        options.SetUpsert(true)
12✔
355

12✔
356
        collection := db.client.Database(db.dbName).
12✔
357
                Collection(JobsCollectionName)
12✔
358
        _, err := collection.UpdateOne(ctx, bson.M{
12✔
359
                "_id": job.ID,
12✔
360
        }, bson.M{
12✔
361
                "$set": bson.M{
12✔
362
                        "status": status,
12✔
363
                },
12✔
364
                "$setOnInsert": bson.M{
12✔
365
                        "workflow_name":    job.WorkflowName,
12✔
366
                        "input_parameters": job.InputParameters,
12✔
367
                        "results":          job.Results,
12✔
368
                        "insert_time":      job.InsertTime,
12✔
369
                        "version":          job.WorkflowVersion,
12✔
370
                },
12✔
371
        }, options)
12✔
372
        if err != nil {
12✔
373
                return err
×
374
        }
×
375

376
        return nil
12✔
377
}
378

379
// GetJobByNameAndID get the task execution status for a job
380
// by workflow name and ID
381
func (db *DataStoreMongo) GetJobByNameAndID(
382
        ctx context.Context, name string, ID string) (*model.Job, error) {
34✔
383
        collection := db.client.Database(db.dbName).
34✔
384
                Collection(JobsCollectionName)
34✔
385
        cur := collection.FindOne(ctx, bson.M{
34✔
386
                "_id":           ID,
34✔
387
                "workflow_name": name,
34✔
388
        })
34✔
389
        var job model.Job
34✔
390
        err := cur.Decode(&job)
34✔
391
        if err == mongo.ErrNoDocuments {
34✔
392
                return nil, nil
×
393
        } else if err != nil {
34✔
394
                return nil, err
×
395
        }
×
396

397
        return &job, nil
34✔
398
}
399

400
func (db *DataStoreMongo) GetAllJobs(
401
        ctx context.Context, page int64, perPage int64) ([]model.Job, int64, error) {
2✔
402
        collection := db.client.Database(db.dbName).
2✔
403
                Collection(JobsCollectionName)
2✔
404
        findOptions := &options.FindOptions{}
2✔
405
        findOptions.SetSkip(int64((page - 1) * perPage))
2✔
406
        findOptions.SetLimit(int64(perPage))
2✔
407
        sortField := bson.M{}
2✔
408
        sortField["insert_time"] = -1
2✔
409
        findOptions.SetSort(sortField)
2✔
410
        cur, err := collection.Find(ctx, bson.M{}, findOptions)
2✔
411
        if err != nil {
2✔
412
                return []model.Job{}, 0, err
×
413
        }
×
414

415
        var jobs []model.Job
2✔
416
        err = cur.All(ctx, &jobs)
2✔
417
        if err == mongo.ErrNoDocuments {
2✔
418
                return []model.Job{}, 0, nil
×
419
        } else if err != nil {
2✔
420
                return []model.Job{}, 0, err
×
421
        }
×
422

423
        count, err := collection.CountDocuments(ctx, bson.M{})
2✔
424
        if err != nil {
2✔
425
                return []model.Job{}, 0, err
×
426
        }
×
427
        return jobs, count, nil
2✔
428
}
429

430
// Close disconnects the client
431
func (db *DataStoreMongo) Close() {
2✔
432
        ctx := context.Background()
2✔
433
        disconnectClient(ctx, db.client)
2✔
434
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc