• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-server / 1592525883

17 Dec 2024 01:50PM UTC coverage: 73.526% (+0.7%) from 72.839%
1592525883

Pull #270

gitlab-ci

bahaa-ghazal
test: testing 'deploy to all devices' feature

Changelog = Title
Ticket = MEN-4272
Signed-off-by: Bahaa Aldeen Ghazal <bahaa.ghazal@northern.tech>
Pull Request #270: test: testing 'deploy to all devices' feature

4244 of 6144 branches covered (69.08%)

Branch coverage included in aggregate %.

40043 of 54089 relevant lines covered (74.03%)

23.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.22
/backend/services/workflows/store/mongo/datastore_mongo.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package mongo
16

17
import (
18
        "context"
19
        "crypto/tls"
20
        "fmt"
21
        "strconv"
22
        "strings"
23
        "time"
24

25
        "github.com/pkg/errors"
26
        "go.mongodb.org/mongo-driver/bson"
27
        "go.mongodb.org/mongo-driver/bson/primitive"
28
        "go.mongodb.org/mongo-driver/mongo"
29
        "go.mongodb.org/mongo-driver/mongo/options"
30
        mopts "go.mongodb.org/mongo-driver/mongo/options"
31

32
        "github.com/mendersoftware/mender-server/pkg/config"
33
        "github.com/mendersoftware/mender-server/pkg/log"
34

35
        dconfig "github.com/mendersoftware/mender-server/services/workflows/config"
36
        "github.com/mendersoftware/mender-server/services/workflows/model"
37
        "github.com/mendersoftware/mender-server/services/workflows/store"
38
)
39

40
const (
41
        // JobsCollectionName refers to the collection of finished or
42
        // jobs in progress.
43
        JobsCollectionName = "jobs"
44

45
        // WorkflowCollectionName refers to the collection of stored workflows
46
        WorkflowCollectionName = "workflows"
47
)
48

49
var (
50
        ErrNoSuchWorkflowByVersion = errors.New("Workflow of the given version not found")
51
)
52

53
// SetupDataStore returns the mongo data store and optionally runs migrations
54
func SetupDataStore(automigrate bool) (*DataStoreMongo, error) {
3✔
55
        ctx := context.Background()
3✔
56
        dbClient, err := NewClient(ctx, config.Config)
3✔
57
        if err != nil {
3✔
58
                return nil, errors.New(fmt.Sprintf("failed to connect to db: %v", err))
×
59
        }
×
60
        err = doMigrations(ctx, dbClient, automigrate)
3✔
61
        if err != nil {
3✔
62
                return nil, err
×
63
        }
×
64
        dataStore := NewDataStoreWithClient(dbClient, config.Config)
3✔
65
        return dataStore, nil
3✔
66
}
67

68
func doMigrations(ctx context.Context, client *mongo.Client,
69
        automigrate bool) error {
3✔
70
        db := config.Config.GetString(dconfig.SettingDbName)
3✔
71
        err := Migrate(ctx, db, DbVersion, client, automigrate)
3✔
72
        if err != nil {
3✔
73
                return errors.New(fmt.Sprintf("failed to run migrations: %v", err))
×
74
        }
×
75

76
        return nil
3✔
77
}
78

79
func disconnectClient(parentCtx context.Context, client *mongo.Client) {
2✔
80
        ctx, cancel := context.WithTimeout(parentCtx, 10*time.Second)
2✔
81
        defer cancel()
2✔
82
        _ = client.Disconnect(ctx)
2✔
83
}
2✔
84

85
// NewClient returns a mongo client
86
func NewClient(_ context.Context, c config.Reader) (*mongo.Client, error) {
3✔
87

3✔
88
        clientOptions := mopts.Client()
3✔
89
        mongoURL := c.GetString(dconfig.SettingMongo)
3✔
90
        if !strings.Contains(mongoURL, "://") {
3✔
91
                return nil, errors.Errorf("Invalid mongoURL %q: missing schema.",
×
92
                        mongoURL)
×
93
        }
×
94
        clientOptions.ApplyURI(mongoURL).
3✔
95
                SetRegistry(newRegistry())
3✔
96

3✔
97
        username := c.GetString(dconfig.SettingDbUsername)
3✔
98
        if username != "" {
3✔
99
                credentials := mopts.Credential{
×
100
                        Username: c.GetString(dconfig.SettingDbUsername),
×
101
                }
×
102
                password := c.GetString(dconfig.SettingDbPassword)
×
103
                if password != "" {
×
104
                        credentials.Password = password
×
105
                        credentials.PasswordSet = true
×
106
                }
×
107
                clientOptions.SetAuth(credentials)
×
108
        }
109

110
        if c.GetBool(dconfig.SettingDbSSL) {
3✔
111
                tlsConfig := &tls.Config{}
×
112
                tlsConfig.InsecureSkipVerify = c.GetBool(dconfig.SettingDbSSLSkipVerify)
×
113
                clientOptions.SetTLSConfig(tlsConfig)
×
114
        }
×
115

116
        // Set 10s timeout
117
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
3✔
118
        defer cancel()
3✔
119
        client, err := mongo.Connect(ctx, clientOptions)
3✔
120
        if err != nil {
3✔
121
                return nil, errors.Wrap(err, "Failed to connect to mongo server")
×
122
        }
×
123

124
        // Validate connection
125
        if err = client.Ping(ctx, nil); err != nil {
3✔
126
                return nil, errors.Wrap(err, "Error reaching mongo server")
×
127
        }
×
128

129
        return client, nil
3✔
130
}
131

132
// DataStoreMongo is the data storage service
133
type DataStoreMongo struct {
134
        // client holds the reference to the client used to communicate with the
135
        // mongodb server.
136
        client *mongo.Client
137
        // dbName contains the name of the workflow database.
138
        dbName string
139
        // workflows holds a local cache of workflows - a worker should NEVER
140
        // access this cache directly, but through
141
        // DataStoreMongo.GetWorkflowByName.
142
        workflows map[string]*model.Workflow
143
}
144

145
// NewDataStoreWithClient initializes a DataStore object
146
func NewDataStoreWithClient(client *mongo.Client, c config.Reader) *DataStoreMongo {
3✔
147
        dbName := c.GetString(dconfig.SettingDbName)
3✔
148
        ctx := context.Background()
3✔
149

3✔
150
        // Maybe initialize workflows
3✔
151
        var findResults []*model.Workflow
3✔
152
        workflows := make(map[string]*model.Workflow)
3✔
153
        database := client.Database(dbName)
3✔
154
        collWflows := database.Collection(WorkflowCollectionName)
3✔
155
        cur, err := collWflows.Find(ctx, bson.M{})
3✔
156
        if err == nil {
6✔
157
                if err = cur.All(ctx, &findResults); err == nil {
6✔
158
                        for _, workflow := range findResults {
5✔
159
                                workflows[workflow.Name+"."+strconv.Itoa(workflow.Version)] = workflow
2✔
160
                        }
2✔
161
                }
162
        }
163

164
        return &DataStoreMongo{
3✔
165
                client:    client,
3✔
166
                dbName:    dbName,
3✔
167
                workflows: workflows,
3✔
168
        }
3✔
169
}
170

171
func (db *DataStoreMongo) Ping(ctx context.Context) error {
3✔
172
        res := db.client.Database(db.dbName).
3✔
173
                RunCommand(ctx, bson.M{"ping": 1})
3✔
174
        return res.Err()
3✔
175
}
3✔
176

177
// LoadWorkflows from filesystem if the workflowsPath setting is provided
178
func (db *DataStoreMongo) LoadWorkflows(ctx context.Context, l *log.Logger) error {
3✔
179
        workflowsPath := config.Config.GetString(dconfig.SettingWorkflowsPath)
3✔
180
        if workflowsPath != "" {
6✔
181
                workflows := model.GetWorkflowsFromPath(workflowsPath)
3✔
182
                l.Infof("LoadWorkflows: loading %d workflows from %s.", len(workflows), workflowsPath)
3✔
183
                for _, workflow := range workflows {
6✔
184
                        l.Infof("LoadWorkflows: loading %s v%d.", workflow.Name, workflow.Version)
3✔
185
                        count, err := db.InsertWorkflows(ctx, *workflow)
3✔
186
                        if count != 1 {
3✔
187
                                l.Infof("LoadWorkflows: not loaded: %s v%d.", workflow.Name, workflow.Version)
×
188
                        }
×
189
                        if err != nil {
5✔
190
                                l.Infof(
2✔
191
                                        "LoadWorkflows: error loading: %s v%d: %s.",
2✔
192
                                        workflow.Name,
2✔
193
                                        workflow.Version,
2✔
194
                                        err.Error(),
2✔
195
                                )
2✔
196
                        }
2✔
197
                }
198
        } else {
×
199
                l.Info("LoadWorkflows: empty workflowsPath, not loading workflows")
×
200
        }
×
201
        return nil
3✔
202
}
203

204
// InsertWorkflows inserts a workflow to the database and cache and returns the number of
205
// inserted elements or an error for the first error generated.
206
func (db *DataStoreMongo) InsertWorkflows(
207
        ctx context.Context,
208
        workflows ...model.Workflow,
209
) (int, error) {
3✔
210
        database := db.client.Database(db.dbName)
3✔
211
        collWflows := database.Collection(WorkflowCollectionName)
3✔
212
        for i, workflow := range workflows {
6✔
213
                if workflow.Name == "" {
4✔
214
                        return i, store.ErrWorkflowMissingName
1✔
215
                }
1✔
216
                workflowDb, _ := db.GetWorkflowByName(ctx, workflow.Name, strconv.Itoa(workflow.Version))
3✔
217
                if workflowDb != nil && workflowDb.Version >= workflow.Version {
6✔
218
                        return i + 1, store.ErrWorkflowAlreadyExists
3✔
219
                }
3✔
220
                if workflowDb == nil || workflowDb.Version < workflow.Version {
6✔
221
                        upsert := true
3✔
222
                        opt := &mopts.UpdateOptions{
3✔
223
                                Upsert: &upsert,
3✔
224
                        }
3✔
225
                        query := bson.M{"_id": workflow.Name}
3✔
226
                        update := bson.M{"$set": workflow}
3✔
227
                        if _, err := collWflows.UpdateOne(ctx, query, update, opt); err != nil {
3✔
228
                                return i + 1, err
×
229
                        }
×
230
                }
231
                db.workflows[workflow.Name] = &workflow
3✔
232
        }
233
        return len(workflows), nil
3✔
234
}
235

236
// GetWorkflowByName gets the workflow with the given name - either from the
237
// cache, or searches the database if the workflow is not cached.
238
func (db *DataStoreMongo) GetWorkflowByName(
239
        ctx context.Context,
240
        workflowName string,
241
        version string,
242
) (*model.Workflow, error) {
3✔
243
        workflow, ok := db.workflows[workflowName]
3✔
244
        l := log.FromContext(ctx)
3✔
245

3✔
246
        versionNumber, e := strconv.Atoi(version)
3✔
247
        if e != nil {
5✔
248
                versionNumber = 0
2✔
249
        }
2✔
250

251
        if !ok || workflow.Version < versionNumber {
6✔
252
                var result model.Workflow
3✔
253
                database := db.client.Database(db.dbName)
3✔
254
                collWflows := database.Collection(WorkflowCollectionName)
3✔
255
                err := collWflows.FindOne(ctx, bson.M{"_id": workflowName}).
3✔
256
                        Decode(&result)
3✔
257
                if err != nil {
6✔
258
                        return nil, err
3✔
259
                }
3✔
260
                if result.Version < versionNumber {
4✔
261
                        l.Errorf("workflow found but with version too low: %s v%s", workflowName, version)
1✔
262
                        return nil, ErrNoSuchWorkflowByVersion
1✔
263
                }
1✔
264
                db.workflows[result.Name] = &result
3✔
265
                return &result, err
3✔
266
        } else {
3✔
267
                l.Debugf("cache hit: %s v%s", workflowName, version)
3✔
268
        }
3✔
269
        return workflow, nil
3✔
270
}
271

272
// GetWorkflows gets all workflows from the cache as a list
273
// (should only be used by the server process)
274
func (db *DataStoreMongo) GetWorkflows(ctx context.Context) []model.Workflow {
2✔
275
        workflows := make([]model.Workflow, len(db.workflows))
2✔
276
        var i int
2✔
277
        for _, workflow := range db.workflows {
4✔
278
                workflows[i] = *workflow
2✔
279
                i++
2✔
280
        }
2✔
281

282
        return workflows
2✔
283
}
284

285
// UpsertJob inserts the job in the queue
286
func (db *DataStoreMongo) UpsertJob(
287
        ctx context.Context, job *model.Job) (*model.Job, error) {
3✔
288
        if job.ID == "" {
4✔
289
                job.ID = primitive.NewObjectID().Hex()
1✔
290
        }
1✔
291
        query := bson.M{
3✔
292
                "_id": job.ID,
3✔
293
        }
3✔
294
        update := bson.M{
3✔
295
                "$set": job,
3✔
296
        }
3✔
297
        findUpdateOptions := &mopts.FindOneAndUpdateOptions{}
3✔
298
        findUpdateOptions.SetReturnDocument(mopts.After)
3✔
299
        findUpdateOptions.SetUpsert(true)
3✔
300

3✔
301
        database := db.client.Database(db.dbName)
3✔
302
        collJobs := database.Collection(JobsCollectionName)
3✔
303

3✔
304
        err := collJobs.FindOneAndUpdate(ctx, query, update, findUpdateOptions).Decode(job)
3✔
305
        if err != nil {
3✔
306
                return nil, err
×
307
        }
×
308

309
        return job, nil
3✔
310
}
311

312
// UpdateJobAddResult add a task execution result to a job status
313
func (db *DataStoreMongo) UpdateJobAddResult(ctx context.Context,
314
        job *model.Job, result *model.TaskResult) error {
3✔
315
        options := &mopts.UpdateOptions{}
3✔
316
        options.SetUpsert(true)
3✔
317

3✔
318
        update := bson.M{
3✔
319
                "$addToSet": bson.M{
3✔
320
                        "results": result,
3✔
321
                },
3✔
322
                "$setOnInsert": bson.M{
3✔
323
                        "workflow_name":    job.WorkflowName,
3✔
324
                        "input_parameters": job.InputParameters,
3✔
325
                        "status":           job.Status,
3✔
326
                        "insert_time":      job.InsertTime,
3✔
327
                        "version":          job.WorkflowVersion,
3✔
328
                },
3✔
329
        }
3✔
330

3✔
331
        collection := db.client.Database(db.dbName).
3✔
332
                Collection(JobsCollectionName)
3✔
333
        _, err := collection.UpdateOne(ctx, bson.M{"_id": job.ID}, update, options)
3✔
334
        if err != nil {
3✔
335
                return err
×
336
        }
×
337

338
        return nil
3✔
339
}
340

341
// UpdateJobStatus set the task execution status for a job status
342
func (db *DataStoreMongo) UpdateJobStatus(
343
        ctx context.Context, job *model.Job, status int32) error {
3✔
344
        if model.StatusToString(status) == "unknown" {
4✔
345
                return model.ErrInvalidStatus
1✔
346
        }
1✔
347

348
        options := &mopts.UpdateOptions{}
3✔
349
        options.SetUpsert(true)
3✔
350

3✔
351
        collection := db.client.Database(db.dbName).
3✔
352
                Collection(JobsCollectionName)
3✔
353
        _, err := collection.UpdateOne(ctx, bson.M{
3✔
354
                "_id": job.ID,
3✔
355
        }, bson.M{
3✔
356
                "$set": bson.M{
3✔
357
                        "status": status,
3✔
358
                },
3✔
359
                "$setOnInsert": bson.M{
3✔
360
                        "workflow_name":    job.WorkflowName,
3✔
361
                        "input_parameters": job.InputParameters,
3✔
362
                        "results":          job.Results,
3✔
363
                        "insert_time":      job.InsertTime,
3✔
364
                        "version":          job.WorkflowVersion,
3✔
365
                },
3✔
366
        }, options)
3✔
367
        if err != nil {
3✔
368
                return err
×
369
        }
×
370

371
        return nil
3✔
372
}
373

374
// GetJobByNameAndID get the task execution status for a job
375
// by workflow name and ID
376
func (db *DataStoreMongo) GetJobByNameAndID(
377
        ctx context.Context, name string, ID string) (*model.Job, error) {
3✔
378
        collection := db.client.Database(db.dbName).
3✔
379
                Collection(JobsCollectionName)
3✔
380
        cur := collection.FindOne(ctx, bson.M{
3✔
381
                "_id":           ID,
3✔
382
                "workflow_name": name,
3✔
383
        })
3✔
384
        var job model.Job
3✔
385
        err := cur.Decode(&job)
3✔
386
        if err == mongo.ErrNoDocuments {
4✔
387
                return nil, nil
1✔
388
        } else if err != nil {
3✔
389
                return nil, err
×
390
        }
×
391

392
        return &job, nil
2✔
393
}
394

395
// GetJobByID get the task execution status for a job by ID
396
func (db *DataStoreMongo) GetJobByID(
397
        ctx context.Context, ID string) (*model.Job, error) {
2✔
398
        collection := db.client.Database(db.dbName).
2✔
399
                Collection(JobsCollectionName)
2✔
400
        cur := collection.FindOne(ctx, bson.M{
2✔
401
                "_id": ID,
2✔
402
        })
2✔
403
        var job model.Job
2✔
404
        err := cur.Decode(&job)
2✔
405
        if err == mongo.ErrNoDocuments {
4✔
406
                return nil, nil
2✔
407
        } else if err != nil {
3✔
408
                return nil, err
×
409
        }
×
410

411
        return &job, nil
1✔
412
}
413

414
func (db *DataStoreMongo) GetAllJobs(
415
        ctx context.Context, page int64, perPage int64) ([]model.Job, int64, error) {
1✔
416
        collection := db.client.Database(db.dbName).
1✔
417
                Collection(JobsCollectionName)
1✔
418
        findOptions := &options.FindOptions{}
1✔
419
        findOptions.SetSkip(int64((page - 1) * perPage))
1✔
420
        findOptions.SetLimit(int64(perPage))
1✔
421
        sortField := bson.M{}
1✔
422
        sortField["insert_time"] = -1
1✔
423
        findOptions.SetSort(sortField)
1✔
424
        cur, err := collection.Find(ctx, bson.M{}, findOptions)
1✔
425
        if err != nil {
1✔
426
                return []model.Job{}, 0, err
×
427
        }
×
428

429
        var jobs []model.Job
1✔
430
        err = cur.All(ctx, &jobs)
1✔
431
        if err == mongo.ErrNoDocuments {
1✔
432
                return []model.Job{}, 0, nil
×
433
        } else if err != nil {
1✔
434
                return []model.Job{}, 0, err
×
435
        }
×
436

437
        count, err := collection.CountDocuments(ctx, bson.M{})
1✔
438
        if err != nil {
1✔
439
                return []model.Job{}, 0, err
×
440
        }
×
441
        return jobs, count, nil
1✔
442
}
443

444
// Close disconnects the client
445
func (db *DataStoreMongo) Close() {
2✔
446
        ctx := context.Background()
2✔
447
        disconnectClient(ctx, db.client)
2✔
448
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc