• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / deviceconnect / 1376315854

12 Jul 2024 04:21PM UTC coverage: 76.156% (-0.2%) from 76.366%
1376315854

push

gitlab-ci

web-flow
Merge pull request #381 from mendersoftware/dependabot/go_modules/golang-dependencies-f8f4aaf149

chore: bump the golang-dependencies group with 4 updates

0 of 6 new or added lines in 2 files covered. (0.0%)

2 existing lines in 1 file now uncovered.

2453 of 3221 relevant lines covered (76.16%)

22.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.82
/store/mongo/datastore_mongo.go
1
// Copyright 2024 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package mongo
16

17
import (
18
        "context"
19
        "crypto/tls"
20
        "fmt"
21
        "io"
22
        "strings"
23
        "time"
24

25
        "github.com/google/uuid"
26
        "github.com/pkg/errors"
27
        "github.com/vmihailenco/msgpack/v5"
28
        "go.mongodb.org/mongo-driver/bson"
29
        "go.mongodb.org/mongo-driver/mongo"
30
        mopts "go.mongodb.org/mongo-driver/mongo/options"
31
        "go.mongodb.org/mongo-driver/mongo/writeconcern"
32

33
        "github.com/mendersoftware/go-lib-micro/config"
34
        "github.com/mendersoftware/go-lib-micro/identity"
35
        "github.com/mendersoftware/go-lib-micro/log"
36
        mdoc "github.com/mendersoftware/go-lib-micro/mongo/doc"
37
        "github.com/mendersoftware/go-lib-micro/mongo/migrate"
38
        mstore "github.com/mendersoftware/go-lib-micro/store/v2"
39
        "github.com/mendersoftware/go-lib-micro/ws"
40
        "github.com/mendersoftware/go-lib-micro/ws/shell"
41

42
        "github.com/mendersoftware/deviceconnect/app"
43
        dconfig "github.com/mendersoftware/deviceconnect/config"
44
        "github.com/mendersoftware/deviceconnect/model"
45
        "github.com/mendersoftware/deviceconnect/store"
46
        "github.com/mendersoftware/deviceconnect/utils"
47
)
48

49
var (
50
        clock                        utils.Clock = utils.RealClock{}
51
        recordingReadBufferSize                  = 1024
52
        ErrUnknownControlMessageType             = errors.New("unknown control message type")
53
        ErrRecordingDataInconsistent             = errors.New("recording data corrupt")
54
)
55

56
// Names of the MongoDB collections used by this store.
const (
	// DevicesCollectionName is the collection holding the stored devices.
	DevicesCollectionName = "devices"

	// SessionsCollectionName is the collection holding the sessions.
	SessionsCollectionName = "sessions"

	// RecordingsCollectionName is the collection holding session recordings.
	RecordingsCollectionName = "recordings"

	// ControlCollectionName is the collection holding session control data.
	ControlCollectionName = "control"
)

// Document field names referenced in filters and updates.
const (
	dbFieldID        = "_id"
	dbFieldVersion   = "version"
	dbFieldSessionID = "session_id"
	dbFieldDeviceID  = "device_id"
	dbFieldStatus    = "status"
	dbFieldCreatedTs = "created_ts"
	dbFieldUpdatedTs = "updated_ts"
)
77

78
// SetupDataStore returns the mongo data store and optionally runs migrations
79
func SetupDataStore(automigrate bool) (store.DataStore, error) {
×
80
        ctx := context.Background()
×
81
        dbClient, err := NewClient(ctx, config.Config)
×
82
        if err != nil {
×
83
                return nil, errors.New(fmt.Sprintf("failed to connect to db: %v", err))
×
84
        }
×
85
        err = doMigrations(ctx, dbClient, automigrate)
×
86
        if err != nil {
×
87
                return nil, err
×
88
        }
×
89
        dataStore := NewDataStoreWithClient(dbClient,
×
90
                time.Second*time.Duration(config.Config.GetInt(dconfig.SettingRecordingExpireSec)))
×
91
        return dataStore, nil
×
92
}
93

94
func doMigrations(ctx context.Context, client *mongo.Client,
95
        automigrate bool) error {
×
96
        db := config.Config.GetString(dconfig.SettingDbName)
×
97
        dbs, err := migrate.GetTenantDbs(ctx, client, mstore.IsTenantDb(db))
×
98
        if err != nil {
×
99
                return errors.Wrap(err, "failed go retrieve tenant DBs")
×
100
        }
×
101
        if len(dbs) == 0 {
×
102
                dbs = []string{DbName}
×
103
        }
×
104

105
        for _, d := range dbs {
×
106
                err := Migrate(ctx, d, DbVersion, client, automigrate)
×
107
                if err != nil {
×
108
                        return errors.New(fmt.Sprintf("failed to run migrations: %v", err))
×
109
                }
×
110
        }
111
        return nil
×
112
}
113

114
// NewClient returns a mongo client
115
func NewClient(ctx context.Context, c config.Reader) (*mongo.Client, error) {
×
116

×
117
        clientOptions := mopts.Client()
×
118
        mongoURL := c.GetString(dconfig.SettingMongo)
×
119
        if !strings.Contains(mongoURL, "://") {
×
120
                return nil, errors.Errorf("Invalid mongoURL %q: missing schema.",
×
121
                        mongoURL)
×
122
        }
×
NEW
123
        clientOptions.ApplyURI(mongoURL).SetRegistry(newRegistry())
×
124

×
125
        username := c.GetString(dconfig.SettingDbUsername)
×
126
        if username != "" {
×
127
                credentials := mopts.Credential{
×
128
                        Username: c.GetString(dconfig.SettingDbUsername),
×
129
                }
×
130
                password := c.GetString(dconfig.SettingDbPassword)
×
131
                if password != "" {
×
132
                        credentials.Password = password
×
133
                        credentials.PasswordSet = true
×
134
                }
×
135
                clientOptions.SetAuth(credentials)
×
136
        }
137

138
        if c.GetBool(dconfig.SettingDbSSL) {
×
139
                tlsConfig := &tls.Config{}
×
140
                tlsConfig.InsecureSkipVerify = c.GetBool(dconfig.SettingDbSSLSkipVerify)
×
141
                clientOptions.SetTLSConfig(tlsConfig)
×
142
        }
×
143

144
        // Set writeconcern to acknowlage after write has propagated to the
145
        // mongod instance and commited to the file system journal.
146
        var wc *writeconcern.WriteConcern
×
147
        journal := true
×
148
        wc = &writeconcern.WriteConcern{
×
149
                W:       1,
×
150
                Journal: &journal,
×
151
        }
×
152
        clientOptions.SetWriteConcern(wc)
×
153

×
154
        // Set 10s timeout
×
155
        if _, ok := ctx.Deadline(); !ok {
×
156
                var cancel context.CancelFunc
×
157
                ctx, cancel = context.WithTimeout(ctx, 10*time.Second)
×
158
                defer cancel()
×
159
        }
×
160
        client, err := mongo.Connect(ctx, clientOptions)
×
161
        if err != nil {
×
162
                return nil, errors.Wrap(err, "Failed to connect to mongo server")
×
163
        }
×
164

165
        // Validate connection
166
        if err = client.Ping(ctx, nil); err != nil {
×
167
                return nil, errors.Wrap(err, "Error reaching mongo server")
×
168
        }
×
169

170
        return client, nil
×
171
}
172

173
// DataStoreMongo is the data storage service
174
type DataStoreMongo struct {
175
        // client holds the reference to the client used to communicate with the
176
        // mongodb server.
177
        client          *mongo.Client
178
        recordingExpire time.Duration
179
}
180

181
// NewDataStoreWithClient initializes a DataStore object
182
func NewDataStoreWithClient(client *mongo.Client, expire time.Duration) store.DataStore {
1✔
183
        return &DataStoreMongo{
1✔
184
                client:          client,
1✔
185
                recordingExpire: expire,
1✔
186
        }
1✔
187
}
1✔
188

189
// Ping verifies the connection to the database
190
func (db *DataStoreMongo) Ping(ctx context.Context) error {
1✔
191
        res := db.client.Database(DbName).RunCommand(ctx, bson.M{"ping": 1})
1✔
192
        return res.Err()
1✔
193
}
1✔
194

195
// ProvisionDevice provisions a new device
196
func (db *DataStoreMongo) ProvisionDevice(ctx context.Context, tenantID, deviceID string) error {
2✔
197
        coll := db.client.Database(DbName).Collection(DevicesCollectionName)
2✔
198

2✔
199
        now := clock.Now().UTC()
2✔
200

2✔
201
        updateOpts := &mopts.UpdateOptions{}
2✔
202
        updateOpts.SetUpsert(true)
2✔
203
        _, err := coll.UpdateOne(ctx,
2✔
204
                bson.M{dbFieldID: deviceID, mstore.FieldTenantID: tenantID},
2✔
205
                bson.M{
2✔
206
                        "$setOnInsert": bson.M{
2✔
207
                                dbFieldStatus:        model.DeviceStatusUnknown,
2✔
208
                                dbFieldCreatedTs:     &now,
2✔
209
                                dbFieldUpdatedTs:     &now,
2✔
210
                                mstore.FieldTenantID: tenantID,
2✔
211
                        },
2✔
212
                },
2✔
213
                updateOpts,
2✔
214
        )
2✔
215
        return err
2✔
216
}
2✔
217

218
// DeleteDevice deletes a device
219
func (db *DataStoreMongo) DeleteDevice(ctx context.Context, tenantID, deviceID string) error {
1✔
220
        coll := db.client.Database(DbName).Collection(DevicesCollectionName)
1✔
221

1✔
222
        _, err := coll.DeleteOne(ctx, bson.M{dbFieldID: deviceID, mstore.FieldTenantID: tenantID})
1✔
223
        return err
1✔
224
}
1✔
225

226
// GetDevice returns a device
227
func (db *DataStoreMongo) GetDevice(
228
        ctx context.Context,
229
        tenantID string,
230
        deviceID string,
231
) (*model.Device, error) {
3✔
232
        coll := db.client.Database(DbName).Collection(DevicesCollectionName)
3✔
233

3✔
234
        cur := coll.FindOne(ctx, bson.M{dbFieldID: deviceID, mstore.FieldTenantID: tenantID})
3✔
235

3✔
236
        device := &model.Device{}
3✔
237
        if err := cur.Decode(&device); err != nil {
5✔
238
                if err == mongo.ErrNoDocuments {
4✔
239
                        return nil, nil
2✔
240
                }
2✔
241
                return nil, err
×
242
        }
243

244
        return device, nil
1✔
245
}
246

247
func (db *DataStoreMongo) SetDeviceConnected(
248
        ctx context.Context,
249
        tenantID, deviceID string,
250
) (int64, error) {
2✔
251
        coll := db.client.Database(DbName).Collection(DevicesCollectionName)
2✔
252

2✔
253
        updateOpts := mopts.FindOneAndUpdate().
2✔
254
                SetUpsert(true).
2✔
255
                SetReturnDocument(mopts.After).
2✔
256
                SetProjection(bson.M{"version": 1})
2✔
257

2✔
258
        now := clock.Now().UTC()
2✔
259

2✔
260
        var version struct {
2✔
261
                Version int64 `bson:"version"`
2✔
262
        }
2✔
263

2✔
264
        err := coll.FindOneAndUpdate(ctx,
2✔
265
                bson.M{dbFieldID: deviceID, mstore.FieldTenantID: tenantID},
2✔
266
                bson.M{
2✔
267
                        "$set": bson.M{
2✔
268
                                dbFieldStatus:    model.DeviceStatusConnected,
2✔
269
                                dbFieldUpdatedTs: &now,
2✔
270
                        },
2✔
271
                        "$inc": bson.M{"version": 1},
2✔
272
                        "$setOnInsert": bson.M{
2✔
273
                                dbFieldCreatedTs:     &now,
2✔
274
                                mstore.FieldTenantID: tenantID,
2✔
275
                        },
2✔
276
                },
2✔
277
                updateOpts,
2✔
278
        ).Decode(&version)
2✔
279

2✔
280
        return version.Version, err
2✔
281
}
2✔
282
func (db *DataStoreMongo) SetDeviceDisconnected(
283
        ctx context.Context,
284
        tenantID, deviceID string,
285
        version int64,
286
) error {
2✔
287
        coll := db.client.Database(DbName).Collection(DevicesCollectionName)
2✔
288

2✔
289
        now := clock.Now().UTC()
2✔
290

2✔
291
        _, err := coll.UpdateOne(ctx,
2✔
292
                bson.M{
2✔
293
                        dbFieldID:            deviceID,
2✔
294
                        mstore.FieldTenantID: tenantID,
2✔
295
                        dbFieldVersion:       version,
2✔
296
                },
2✔
297
                bson.M{
2✔
298
                        "$set": bson.M{
2✔
299
                                dbFieldStatus:    model.DeviceStatusDisconnected,
2✔
300
                                dbFieldUpdatedTs: &now,
2✔
301
                        },
2✔
302
                        "$setOnInsert": bson.M{
2✔
303
                                dbFieldCreatedTs:     &now,
2✔
304
                                mstore.FieldTenantID: tenantID,
2✔
305
                        },
2✔
306
                },
2✔
307
        )
2✔
308
        return err
2✔
309
}
2✔
310

311
// AllocateSession allocates a new session.
312
func (db *DataStoreMongo) AllocateSession(ctx context.Context, sess *model.Session) error {
3✔
313

3✔
314
        if err := sess.Validate(); err != nil {
4✔
315
                return errors.Wrap(err, "store: cannot allocate invalid Session")
1✔
316
        }
1✔
317

318
        coll := db.client.Database(DbName).Collection(SessionsCollectionName)
2✔
319
        tenantElem := bson.E{Key: mstore.FieldTenantID, Value: sess.TenantID}
2✔
320
        _, err := coll.InsertOne(ctx, mdoc.DocumentFromStruct(*sess, tenantElem))
2✔
321
        if err != nil {
3✔
322
                return errors.Wrap(err, "store: failed to allocate session")
1✔
323
        }
1✔
324

325
        return nil
1✔
326
}
327

328
// DeleteSession deletes a session
329
func (db *DataStoreMongo) DeleteSession(
330
        ctx context.Context, sessionID string,
331
) (*model.Session, error) {
4✔
332
        collSess := db.client.Database(DbName).
4✔
333
                Collection(SessionsCollectionName)
4✔
334

4✔
335
        sess := new(model.Session)
4✔
336
        err := collSess.FindOneAndDelete(
4✔
337
                ctx, mstore.WithTenantID(ctx, bson.D{{Key: dbFieldID, Value: sessionID}}),
4✔
338
        ).Decode(sess)
4✔
339
        if err != nil {
6✔
340
                if err == mongo.ErrNoDocuments {
3✔
341
                        return nil, store.ErrSessionNotFound
1✔
342
                } else {
2✔
343
                        return nil, err
1✔
344
                }
1✔
345
        }
346
        if idty := identity.FromContext(ctx); idty != nil {
3✔
347
                sess.TenantID = idty.Tenant
1✔
348
        }
1✔
349
        return sess, nil
2✔
350
}
351

352
// GetSession returns a session
353
func (db *DataStoreMongo) GetSession(
354
        ctx context.Context,
355
        sessionID string,
356
) (*model.Session, error) {
4✔
357
        collSess := db.client.
4✔
358
                Database(DbName).
4✔
359
                Collection(SessionsCollectionName)
4✔
360

4✔
361
        session := &model.Session{}
4✔
362
        err := collSess.
4✔
363
                FindOne(ctx, mstore.WithTenantID(ctx, bson.M{dbFieldID: sessionID})).
4✔
364
                Decode(session)
4✔
365
        if err != nil {
6✔
366
                if err == mongo.ErrNoDocuments {
3✔
367
                        return nil, store.ErrSessionNotFound
1✔
368
                }
1✔
369
                return nil, err
1✔
370
        }
371
        idty := identity.FromContext(ctx)
2✔
372
        if idty != nil {
3✔
373
                session.TenantID = idty.Tenant
1✔
374
        }
1✔
375

376
        return session, nil
2✔
377
}
378

379
func sendControlMessage(control app.Control, sessionID string, w io.Writer) (int, error) {
6✔
380
        messageType := ""
6✔
381
        var data []byte
6✔
382
        properties := make(map[string]interface{})
6✔
383

6✔
384
        switch control.Type {
6✔
385
        case app.DelayMessage:
6✔
386
                messageType = model.DelayMessageName
6✔
387
                properties[model.DelayMessageValueField] = control.DelayMs
6✔
388
        case app.ResizeMessage:
×
389
                messageType = shell.MessageTypeResizeShell
×
390
                properties[model.ResizeMessageTermHeightField] = control.TerminalHeight
×
391
                properties[model.ResizeMessageTermWidthField] = control.TerminalWidth
×
392
        default:
×
393
                return 0, ErrUnknownControlMessageType
×
394
        }
395

396
        msg := ws.ProtoMsg{
6✔
397
                Header: ws.ProtoHdr{
6✔
398
                        Proto:      ws.ProtoTypeShell,
6✔
399
                        MsgType:    messageType,
6✔
400
                        SessionID:  sessionID,
6✔
401
                        Properties: properties,
6✔
402
                },
6✔
403
                Body: data,
6✔
404
        }
6✔
405
        messagePacked, err := msgpack.Marshal(&msg)
6✔
406
        if err != nil {
6✔
407
                return 0, err
×
408
        } else {
6✔
409
                return w.Write(messagePacked)
6✔
410
        }
6✔
411
}
412

413
// GetSession writes session recordings to given io.Writer
414
func (db *DataStoreMongo) WriteSessionRecords(ctx context.Context,
415
        sessionID string,
416
        w io.Writer) error {
2✔
417
        l := log.FromContext(ctx)
2✔
418
        collRecording := db.client.Database(DbName).
2✔
419
                Collection(RecordingsCollectionName)
2✔
420
        collControl := db.client.Database(DbName).
2✔
421
                Collection(ControlCollectionName)
2✔
422

2✔
423
        findOptions := mopts.Find()
2✔
424
        sortField := bson.M{
2✔
425
                "created_ts": 1,
2✔
426
        }
2✔
427
        findOptions.SetSort(sortField)
2✔
428
        recordingsCursor, err := collRecording.Find(ctx,
2✔
429
                mstore.WithTenantID(ctx, bson.M{
2✔
430
                        dbFieldSessionID: sessionID,
2✔
431
                }),
2✔
432
                findOptions,
2✔
433
        )
2✔
434
        if err != nil {
2✔
435
                return err
×
436
        }
×
437
        defer recordingsCursor.Close(ctx)
2✔
438

2✔
439
        controlCursor, err := collControl.Find(ctx,
2✔
440
                mstore.WithTenantID(ctx, bson.M{
2✔
441
                        dbFieldSessionID: sessionID,
2✔
442
                }),
2✔
443
                findOptions,
2✔
444
        )
2✔
445
        if err != nil {
2✔
446
                return err
×
447
        }
×
448
        defer controlCursor.Close(ctx)
2✔
449

2✔
450
        controlReader := NewControlMessageReader(ctx, controlCursor)
2✔
451
        recordingReader := NewRecordingReader(ctx, recordingsCursor)
2✔
452

2✔
453
        recordingWriter := NewRecordingWriter(sessionID, w)
2✔
454
        recordingBuffer := make([]byte, recordingReadBufferSize)
2✔
455
        recordingBytesSent := 0
2✔
456
        for {
10✔
457
                control := controlReader.Pop()
8✔
458
                if control == nil {
10✔
459
                        l.Debug("WriteSessionRecords: no more control " +
2✔
460
                                "messages, flushing the recording upstream.")
2✔
461
                        //no more control messages, we send the whole recording
2✔
462
                        n, err := io.Copy(recordingWriter, recordingReader)
2✔
463
                        if err != nil && err != io.ErrShortWrite && n < 1 {
2✔
464
                                l.Errorf("WriteSessionRecords: "+
×
465
                                        "error writing recording data, err: %+v n:%d",
×
466
                                        err, n)
×
467
                        }
×
468
                        if n == 0 {
2✔
469
                                l.Errorf("WriteSessionRecords: "+
×
470
                                        "failed to write any recording data, err: %+v",
×
471
                                        err)
×
472
                        } else {
2✔
473
                                recordingBytesSent += int(n)
2✔
474
                        }
2✔
475
                        break
2✔
476
                } else {
6✔
477
                        if recordingBytesSent > control.Offset {
6✔
478
                                //this should never happen, we missed
×
479
                                //the control message, data inconsistency
×
480
                                l.Errorf("WriteSessionRecords: recordingBytesSent > control.Offset")
×
481
                                err = ErrRecordingDataInconsistent
×
482
                                break
×
483
                        }
484

485
                        bytesUntilControlMessage := control.Offset - recordingBytesSent
6✔
486
                        l.Debugf("(1) WriteSessionRecords: control.Offset:%d"+
6✔
487
                                " recordingBytesSent:%d "+
6✔
488
                                " bytesUntilControlMessage:%d (recordingBuffer.len=%d) "+
6✔
489
                                "reading up to %d bytes of recording and sending.",
6✔
490
                                control.Offset,
6✔
491
                                recordingBytesSent,
6✔
492
                                bytesUntilControlMessage,
6✔
493
                                len(recordingBuffer),
6✔
494
                                control.Offset-recordingBytesSent)
6✔
495
                        //it is possible that the recording is larger than one recordingBuffer,
6✔
496
                        //we need to send until we have the control.Offset in the buffer
6✔
497
                        for bytesUntilControlMessage > len(recordingBuffer) {
6✔
498
                                n, e := recordingReader.Read(recordingBuffer)
×
499
                                if n > 0 {
×
500
                                        _, err = sendRecordingMessage(recordingBuffer[:n],
×
501
                                                sessionID,
×
502
                                                w)
×
503
                                        if err != nil {
×
504
                                                l.Errorf("error sending recording data: %s",
×
505
                                                        err.Error())
×
506
                                                break
×
507
                                        }
508
                                        recordingBytesSent += n
×
509
                                        bytesUntilControlMessage = control.Offset -
×
510
                                                recordingBytesSent
×
511
                                }
512
                                if e != nil || n == 0 {
×
513
                                        break
×
514
                                }
515
                        }
516
                        if err != nil {
6✔
517
                                break
×
518
                        }
519

520
                        bytesUntilControlMessage = control.Offset - recordingBytesSent
6✔
521
                        l.Debugf("(2) WriteSessionRecords: control.Offset:%d"+
6✔
522
                                " recordingBytesSent:%d "+
6✔
523
                                " bytesUntilControlMessage:%d (recordingBuffer.len=%d) "+
6✔
524
                                "reading up to %d bytes of recording and sending.",
6✔
525
                                control.Offset,
6✔
526
                                recordingBytesSent,
6✔
527
                                bytesUntilControlMessage,
6✔
528
                                len(recordingBuffer),
6✔
529
                                bytesUntilControlMessage)
6✔
530
                        //this means that the control offset is in the future
6✔
531
                        //part of the recording buffer
6✔
532
                        //we can send up to control.Offset-recordingBytesSent
6✔
533
                        //bytes and then send the control message
6✔
534
                        n, e := recordingReader.Read(recordingBuffer[:bytesUntilControlMessage])
6✔
535
                        l.Debugf("recordingReader.Read(len=%d)=%d,%+v",
6✔
536
                                bytesUntilControlMessage, n, e)
6✔
537
                        if n > 0 {
12✔
538
                                _, err = sendRecordingMessage(recordingBuffer[:n], sessionID, w)
6✔
539
                                if err != nil {
6✔
540
                                        l.Errorf("error sending recording data: %s",
×
541
                                                err.Error())
×
542
                                        break
×
543
                                }
544
                                recordingBytesSent += n
6✔
545
                        }
546
                        l.Debugf("WriteSessionRecords: sending %+v.", *control)
6✔
547
                        _, err = sendControlMessage(*control, sessionID, w)
6✔
548
                        if err != nil {
6✔
549
                                l.Errorf("error sending recording data: %s",
×
550
                                        err.Error())
×
551
                                break
×
552
                        }
553
                }
554
        }
555
        l.Infof("session playback: WriteSessionRecords: sent %d bytes.", recordingBytesSent)
2✔
556

2✔
557
        return err
2✔
558
}
559

560
// SetSession saves a session recording
561
func (db *DataStoreMongo) InsertSessionRecording(ctx context.Context,
562
        sessionID string,
563
        sessionBytes []byte) error {
2✔
564
        coll := db.client.Database(DbName).Collection(RecordingsCollectionName)
2✔
565

2✔
566
        now := clock.Now().UTC()
2✔
567
        recording := model.Recording{
2✔
568
                ID:        uuid.New(),
2✔
569
                SessionID: sessionID,
2✔
570
                Recording: sessionBytes,
2✔
571
                CreatedTs: now,
2✔
572
                ExpireTs:  now.Add(db.recordingExpire),
2✔
573
        }
2✔
574
        _, err := coll.InsertOne(ctx,
2✔
575
                mstore.WithTenantID(ctx, &recording),
2✔
576
        )
2✔
577
        return err
2✔
578
}
2✔
579

580
// Inserts control data recording
581
func (db *DataStoreMongo) InsertControlRecording(ctx context.Context,
582
        sessionID string,
583
        sessionBytes []byte) error {
×
584
        coll := db.client.Database(DbName).
×
585
                Collection(ControlCollectionName)
×
586

×
587
        now := clock.Now().UTC()
×
588
        recording := model.ControlData{
×
589
                ID:        uuid.New(),
×
590
                SessionID: sessionID,
×
591
                Control:   sessionBytes,
×
592
                CreatedTs: now,
×
593
                ExpireTs:  now.Add(db.recordingExpire),
×
594
        }
×
595
        _, err := coll.InsertOne(ctx,
×
596
                mstore.WithTenantID(ctx, &recording),
×
597
        )
×
598
        return err
×
599
}
×
600

601
// Close disconnects the client
602
func (db *DataStoreMongo) Close() error {
×
603
        ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
×
604
        defer cancel()
×
605
        err := db.client.Disconnect(ctx)
×
606
        return err
×
607
}
×
608

609
//nolint:unused
610
func (db *DataStoreMongo) DropDatabase() error {
26✔
611
        ctx := context.Background()
26✔
612
        err := db.client.Database(DbName).Drop(ctx)
26✔
613
        return err
26✔
614
}
26✔
615

616
func (db *DataStoreMongo) DeleteTenant(ctx context.Context, tenantID string) error {
1✔
617
        database := db.client.Database(DbName)
1✔
618
        collectionNames, err := database.ListCollectionNames(ctx, mopts.ListCollectionsOptions{})
1✔
619
        if err != nil {
1✔
620
                return err
×
621
        }
×
622
        for _, collName := range collectionNames {
2✔
623
                collection := database.Collection(collName)
1✔
624
                _, e := collection.DeleteMany(ctx, mstore.WithTenantID(ctx, bson.D{}))
1✔
625
                if e != nil {
1✔
626
                        return e
×
627
                }
×
628
        }
629
        return nil
1✔
630
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc