• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-server / 1802588196

06 May 2025 10:48AM UTC coverage: 66.322% (+1.0%) from 65.288%
1802588196

Pull #631

gitlab-ci

lluiscampos
feat(create-artifact-worker): Update `mender-artifact` to latest v4.1.0

Modifying also the integration to build the tool from source instead or
repurposing the upstream Debian package. This has the main advantage
that we can compile it statically (by disabling a feature that we don't
use) and that we have control of the compatibility aspects of the
binary.

Ticket: MEN-8337

Signed-off-by: Lluis Campos <lluis.campos@northern.tech>
Pull Request #631: MEN-8337: feat(create-artifact-worker): Update `mender-artifact` to latest v4.1.0

29522 of 44513 relevant lines covered (66.32%)

1.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.22
/backend/services/workflows/store/mongo/datastore_mongo.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package mongo
16

17
import (
18
        "context"
19
        "crypto/tls"
20
        "fmt"
21
        "strconv"
22
        "strings"
23
        "time"
24

25
        "github.com/pkg/errors"
26
        "go.mongodb.org/mongo-driver/bson"
27
        "go.mongodb.org/mongo-driver/bson/primitive"
28
        "go.mongodb.org/mongo-driver/mongo"
29
        "go.mongodb.org/mongo-driver/mongo/options"
30
        mopts "go.mongodb.org/mongo-driver/mongo/options"
31

32
        "github.com/mendersoftware/mender-server/pkg/config"
33
        "github.com/mendersoftware/mender-server/pkg/log"
34

35
        dconfig "github.com/mendersoftware/mender-server/services/workflows/config"
36
        "github.com/mendersoftware/mender-server/services/workflows/model"
37
        "github.com/mendersoftware/mender-server/services/workflows/store"
38
)
39

40
const (
41
        // JobsCollectionName refers to the collection of finished or
42
        // jobs in progress.
43
        JobsCollectionName = "jobs"
44

45
        // WorkflowCollectionName refers to the collection of stored workflows
46
        WorkflowCollectionName = "workflows"
47
)
48

49
var (
50
        ErrNoSuchWorkflowByVersion = errors.New("Workflow of the given version not found")
51
)
52

53
// SetupDataStore returns the mongo data store and optionally runs migrations
54
func SetupDataStore(automigrate bool) (*DataStoreMongo, error) {
3✔
55
        ctx := context.Background()
3✔
56
        dbClient, err := NewClient(ctx, config.Config)
3✔
57
        if err != nil {
3✔
58
                return nil, errors.New(fmt.Sprintf("failed to connect to db: %v", err))
×
59
        }
×
60
        err = doMigrations(ctx, dbClient, automigrate)
3✔
61
        if err != nil {
3✔
62
                return nil, err
×
63
        }
×
64
        dataStore := NewDataStoreWithClient(dbClient, config.Config)
3✔
65
        return dataStore, nil
3✔
66
}
67

68
func doMigrations(ctx context.Context, client *mongo.Client,
69
        automigrate bool) error {
3✔
70
        db := config.Config.GetString(dconfig.SettingDbName)
3✔
71
        err := Migrate(ctx, db, DbVersion, client, automigrate)
3✔
72
        if err != nil {
3✔
73
                return errors.New(fmt.Sprintf("failed to run migrations: %v", err))
×
74
        }
×
75

76
        return nil
3✔
77
}
78

79
func disconnectClient(parentCtx context.Context, client *mongo.Client) {
2✔
80
        ctx, cancel := context.WithTimeout(parentCtx, 10*time.Second)
2✔
81
        defer cancel()
2✔
82
        _ = client.Disconnect(ctx)
2✔
83
}
2✔
84

85
// NewClient returns a mongo client
86
func NewClient(_ context.Context, c config.Reader) (*mongo.Client, error) {
3✔
87

3✔
88
        clientOptions := mopts.Client()
3✔
89
        mongoURL := c.GetString(dconfig.SettingMongo)
3✔
90
        if !strings.Contains(mongoURL, "://") {
3✔
91
                return nil, errors.Errorf("Invalid mongoURL %q: missing schema.",
×
92
                        mongoURL)
×
93
        }
×
94
        clientOptions.ApplyURI(mongoURL).
3✔
95
                SetRegistry(newRegistry())
3✔
96

3✔
97
        username := c.GetString(dconfig.SettingDbUsername)
3✔
98
        if username != "" {
3✔
99
                credentials := mopts.Credential{
×
100
                        Username: c.GetString(dconfig.SettingDbUsername),
×
101
                }
×
102
                password := c.GetString(dconfig.SettingDbPassword)
×
103
                if password != "" {
×
104
                        credentials.Password = password
×
105
                        credentials.PasswordSet = true
×
106
                }
×
107
                clientOptions.SetAuth(credentials)
×
108
        }
109

110
        if c.GetBool(dconfig.SettingDbSSL) {
3✔
111
                tlsConfig := &tls.Config{}
×
112
                tlsConfig.InsecureSkipVerify = c.GetBool(dconfig.SettingDbSSLSkipVerify)
×
113
                clientOptions.SetTLSConfig(tlsConfig)
×
114
        }
×
115

116
        // Set 10s timeout
117
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
3✔
118
        defer cancel()
3✔
119
        client, err := mongo.Connect(ctx, clientOptions)
3✔
120
        if err != nil {
3✔
121
                return nil, errors.Wrap(err, "Failed to connect to mongo server")
×
122
        }
×
123

124
        // Validate connection
125
        if err = client.Ping(ctx, nil); err != nil {
3✔
126
                return nil, errors.Wrap(err, "Error reaching mongo server")
×
127
        }
×
128

129
        return client, nil
3✔
130
}
131

132
// DataStoreMongo is the data storage service
133
type DataStoreMongo struct {
134
        // client holds the reference to the client used to communicate with the
135
        // mongodb server.
136
        client *mongo.Client
137
        // dbName contains the name of the workflow database.
138
        dbName string
139
        // workflows holds a local cache of workflows - a worker should NEVER
140
        // access this cache directly, but through
141
        // DataStoreMongo.GetWorkflowByName.
142
        workflows map[string]*model.Workflow
143
}
144

145
// NewDataStoreWithClient initializes a DataStore object
146
func NewDataStoreWithClient(client *mongo.Client, c config.Reader) *DataStoreMongo {
3✔
147
        dbName := c.GetString(dconfig.SettingDbName)
3✔
148
        ctx := context.Background()
3✔
149

3✔
150
        // Maybe initialize workflows
3✔
151
        var findResults []*model.Workflow
3✔
152
        workflows := make(map[string]*model.Workflow)
3✔
153
        database := client.Database(dbName)
3✔
154
        collWflows := database.Collection(WorkflowCollectionName)
3✔
155
        cur, err := collWflows.Find(ctx, bson.M{})
3✔
156
        if err == nil {
6✔
157
                if err = cur.All(ctx, &findResults); err == nil {
6✔
158
                        for _, workflow := range findResults {
5✔
159
                                workflows[workflow.Name+"."+strconv.Itoa(workflow.Version)] = workflow
2✔
160
                        }
2✔
161
                }
162
        }
163

164
        return &DataStoreMongo{
3✔
165
                client:    client,
3✔
166
                dbName:    dbName,
3✔
167
                workflows: workflows,
3✔
168
        }
3✔
169
}
170

171
func (db *DataStoreMongo) Ping(ctx context.Context) error {
3✔
172
        res := db.client.Database(db.dbName).
3✔
173
                RunCommand(ctx, bson.M{"ping": 1})
3✔
174
        return res.Err()
3✔
175
}
3✔
176

177
// LoadWorkflows from filesystem if the workflowsPath setting is provided
178
func (db *DataStoreMongo) LoadWorkflows(ctx context.Context, l *log.Logger) error {
3✔
179
        workflowsPath := config.Config.GetString(dconfig.SettingWorkflowsPath)
3✔
180
        if workflowsPath != "" {
6✔
181
                workflows := model.GetWorkflowsFromPath(workflowsPath)
3✔
182
                l.Infof("LoadWorkflows: loading %d workflows from %s.", len(workflows), workflowsPath)
3✔
183
                for _, workflow := range workflows {
6✔
184
                        l.Infof("LoadWorkflows: loading %s v%d.", workflow.Name, workflow.Version)
3✔
185
                        count, err := db.InsertWorkflows(ctx, *workflow)
3✔
186
                        if count != 1 {
3✔
187
                                l.Infof("LoadWorkflows: not loaded: %s v%d.", workflow.Name, workflow.Version)
×
188
                        }
×
189
                        if err != nil {
5✔
190
                                l.Infof(
2✔
191
                                        "LoadWorkflows: error loading: %s v%d: %s.",
2✔
192
                                        workflow.Name,
2✔
193
                                        workflow.Version,
2✔
194
                                        err.Error(),
2✔
195
                                )
2✔
196
                        }
2✔
197
                }
198
        } else {
×
199
                l.Info("LoadWorkflows: empty workflowsPath, not loading workflows")
×
200
        }
×
201
        return nil
3✔
202
}
203

204
// InsertWorkflows inserts a workflow to the database and cache and returns the number of
205
// inserted elements or an error for the first error generated.
206
func (db *DataStoreMongo) InsertWorkflows(
207
        ctx context.Context,
208
        workflows ...model.Workflow,
209
) (int, error) {
3✔
210
        database := db.client.Database(db.dbName)
3✔
211
        collWflows := database.Collection(WorkflowCollectionName)
3✔
212
        for i, workflow := range workflows {
6✔
213
                if workflow.Name == "" {
4✔
214
                        return i, store.ErrWorkflowMissingName
1✔
215
                }
1✔
216
                workflowDb, _ := db.GetWorkflowByName(ctx, workflow.Name, strconv.Itoa(workflow.Version))
3✔
217
                if workflowDb != nil && workflowDb.Version >= workflow.Version {
6✔
218
                        return i + 1, store.ErrWorkflowAlreadyExists
3✔
219
                }
3✔
220
                if workflowDb == nil || workflowDb.Version < workflow.Version {
6✔
221
                        upsert := true
3✔
222
                        opt := &mopts.UpdateOptions{
3✔
223
                                Upsert: &upsert,
3✔
224
                        }
3✔
225
                        query := bson.M{"_id": workflow.Name}
3✔
226
                        update := bson.M{"$set": workflow}
3✔
227
                        if _, err := collWflows.UpdateOne(ctx, query, update, opt); err != nil {
3✔
228
                                return i + 1, err
×
229
                        }
×
230
                }
231
                db.workflows[workflow.Name] = &workflow
3✔
232
        }
233
        return len(workflows), nil
3✔
234
}
235

236
// GetWorkflowByName gets the workflow with the given name - either from the
237
// cache, or searches the database if the workflow is not cached.
238
func (db *DataStoreMongo) GetWorkflowByName(
239
        ctx context.Context,
240
        workflowName string,
241
        version string,
242
) (*model.Workflow, error) {
3✔
243
        workflow, ok := db.workflows[workflowName]
3✔
244
        l := log.FromContext(ctx)
3✔
245

3✔
246
        versionNumber, e := strconv.Atoi(version)
3✔
247
        if e != nil {
5✔
248
                versionNumber = 0
2✔
249
        }
2✔
250

251
        if !ok || workflow.Version < versionNumber {
6✔
252
                var result model.Workflow
3✔
253
                database := db.client.Database(db.dbName)
3✔
254
                collWflows := database.Collection(WorkflowCollectionName)
3✔
255
                err := collWflows.FindOne(ctx, bson.M{"_id": workflowName}).
3✔
256
                        Decode(&result)
3✔
257
                if err != nil {
6✔
258
                        return nil, err
3✔
259
                }
3✔
260
                if result.Version < versionNumber {
4✔
261
                        l.Errorf("workflow found but with version too low: %s v%s", workflowName, version)
1✔
262
                        return nil, ErrNoSuchWorkflowByVersion
1✔
263
                }
1✔
264
                db.workflows[result.Name] = &result
3✔
265
                return &result, err
3✔
266
        } else {
3✔
267
                l.Debugf("cache hit: %s v%s", workflowName, version)
3✔
268
        }
3✔
269
        return workflow, nil
3✔
270
}
271

272
// GetWorkflows gets all workflows from the cache as a list
273
// (should only be used by the server process)
274
func (db *DataStoreMongo) GetWorkflows(ctx context.Context) []model.Workflow {
2✔
275
        workflows := make([]model.Workflow, len(db.workflows))
2✔
276
        var i int
2✔
277
        for _, workflow := range db.workflows {
4✔
278
                workflows[i] = *workflow
2✔
279
                i++
2✔
280
        }
2✔
281

282
        return workflows
2✔
283
}
284

285
// UpsertJob inserts the job in the queue
286
func (db *DataStoreMongo) UpsertJob(
287
        ctx context.Context, job *model.Job) (*model.Job, error) {
3✔
288
        if job.ID == "" {
4✔
289
                job.ID = primitive.NewObjectID().Hex()
1✔
290
        }
1✔
291
        query := bson.M{
3✔
292
                "_id": job.ID,
3✔
293
        }
3✔
294
        update := bson.M{
3✔
295
                "$set": job,
3✔
296
        }
3✔
297
        findUpdateOptions := &mopts.FindOneAndUpdateOptions{}
3✔
298
        findUpdateOptions.SetReturnDocument(mopts.After)
3✔
299
        findUpdateOptions.SetUpsert(true)
3✔
300

3✔
301
        database := db.client.Database(db.dbName)
3✔
302
        collJobs := database.Collection(JobsCollectionName)
3✔
303

3✔
304
        err := collJobs.FindOneAndUpdate(ctx, query, update, findUpdateOptions).Decode(job)
3✔
305
        if err != nil {
3✔
306
                return nil, err
×
307
        }
×
308

309
        return job, nil
3✔
310
}
311

312
// UpdateJobAddResult add a task execution result to a job status
313
func (db *DataStoreMongo) UpdateJobAddResult(ctx context.Context,
314
        job *model.Job, result *model.TaskResult) error {
3✔
315
        options := &mopts.UpdateOptions{}
3✔
316
        options.SetUpsert(true)
3✔
317

3✔
318
        update := bson.M{
3✔
319
                "$addToSet": bson.M{
3✔
320
                        "results": result,
3✔
321
                },
3✔
322
                "$setOnInsert": bson.M{
3✔
323
                        "workflow_name":    job.WorkflowName,
3✔
324
                        "input_parameters": job.InputParameters,
3✔
325
                        "status":           job.Status,
3✔
326
                        "insert_time":      job.InsertTime,
3✔
327
                        "version":          job.WorkflowVersion,
3✔
328
                },
3✔
329
        }
3✔
330

3✔
331
        collection := db.client.Database(db.dbName).
3✔
332
                Collection(JobsCollectionName)
3✔
333
        _, err := collection.UpdateOne(ctx, bson.M{"_id": job.ID}, update, options)
3✔
334
        if err != nil {
3✔
335
                return err
×
336
        }
×
337

338
        return nil
3✔
339
}
340

341
// UpdateJobStatus set the task execution status for a job status
342
func (db *DataStoreMongo) UpdateJobStatus(
343
        ctx context.Context, job *model.Job, status int32) error {
3✔
344
        if model.StatusToString(status) == "unknown" {
4✔
345
                return model.ErrInvalidStatus
1✔
346
        }
1✔
347

348
        options := &mopts.UpdateOptions{}
3✔
349
        options.SetUpsert(true)
3✔
350

3✔
351
        collection := db.client.Database(db.dbName).
3✔
352
                Collection(JobsCollectionName)
3✔
353
        _, err := collection.UpdateOne(ctx, bson.M{
3✔
354
                "_id": job.ID,
3✔
355
        }, bson.M{
3✔
356
                "$set": bson.M{
3✔
357
                        "status": status,
3✔
358
                },
3✔
359
                "$setOnInsert": bson.M{
3✔
360
                        "workflow_name":    job.WorkflowName,
3✔
361
                        "input_parameters": job.InputParameters,
3✔
362
                        "results":          job.Results,
3✔
363
                        "insert_time":      job.InsertTime,
3✔
364
                        "version":          job.WorkflowVersion,
3✔
365
                },
3✔
366
        }, options)
3✔
367
        if err != nil {
3✔
368
                return err
×
369
        }
×
370

371
        return nil
3✔
372
}
373

374
// GetJobByNameAndID get the task execution status for a job
375
// by workflow name and ID
376
func (db *DataStoreMongo) GetJobByNameAndID(
377
        ctx context.Context, name string, ID string) (*model.Job, error) {
3✔
378
        collection := db.client.Database(db.dbName).
3✔
379
                Collection(JobsCollectionName)
3✔
380
        cur := collection.FindOne(ctx, bson.M{
3✔
381
                "_id":           ID,
3✔
382
                "workflow_name": name,
3✔
383
        })
3✔
384
        var job model.Job
3✔
385
        err := cur.Decode(&job)
3✔
386
        if err == mongo.ErrNoDocuments {
4✔
387
                return nil, nil
1✔
388
        } else if err != nil {
3✔
389
                return nil, err
×
390
        }
×
391

392
        return &job, nil
2✔
393
}
394

395
// GetJobByID get the task execution status for a job by ID
396
func (db *DataStoreMongo) GetJobByID(
397
        ctx context.Context, ID string) (*model.Job, error) {
2✔
398
        collection := db.client.Database(db.dbName).
2✔
399
                Collection(JobsCollectionName)
2✔
400
        cur := collection.FindOne(ctx, bson.M{
2✔
401
                "_id": ID,
2✔
402
        })
2✔
403
        var job model.Job
2✔
404
        err := cur.Decode(&job)
2✔
405
        if err == mongo.ErrNoDocuments {
4✔
406
                return nil, nil
2✔
407
        } else if err != nil {
3✔
408
                return nil, err
×
409
        }
×
410

411
        return &job, nil
1✔
412
}
413

414
func (db *DataStoreMongo) GetAllJobs(
415
        ctx context.Context, page int64, perPage int64) ([]model.Job, int64, error) {
1✔
416
        collection := db.client.Database(db.dbName).
1✔
417
                Collection(JobsCollectionName)
1✔
418
        findOptions := &options.FindOptions{}
1✔
419
        findOptions.SetSkip(int64((page - 1) * perPage))
1✔
420
        findOptions.SetLimit(int64(perPage))
1✔
421
        sortField := bson.M{}
1✔
422
        sortField["insert_time"] = -1
1✔
423
        findOptions.SetSort(sortField)
1✔
424
        cur, err := collection.Find(ctx, bson.M{}, findOptions)
1✔
425
        if err != nil {
1✔
426
                return []model.Job{}, 0, err
×
427
        }
×
428

429
        var jobs []model.Job
1✔
430
        err = cur.All(ctx, &jobs)
1✔
431
        if err == mongo.ErrNoDocuments {
1✔
432
                return []model.Job{}, 0, nil
×
433
        } else if err != nil {
1✔
434
                return []model.Job{}, 0, err
×
435
        }
×
436

437
        count, err := collection.CountDocuments(ctx, bson.M{})
1✔
438
        if err != nil {
1✔
439
                return []model.Job{}, 0, err
×
440
        }
×
441
        return jobs, count, nil
1✔
442
}
443

444
// Close disconnects the client
445
func (db *DataStoreMongo) Close() {
2✔
446
        ctx := context.Background()
2✔
447
        disconnectClient(ctx, db.client)
2✔
448
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc