• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-server / 1783459323

24 Apr 2025 06:49AM UTC coverage: 65.258% (-0.008%) from 65.266%
1783459323

Pull #573

gitlab-ci

mzedel
chore: debug

Signed-off-by: Manuel Zedel <manuel.zedel@northern.tech>
Pull Request #573: QA-977 - reuse auth info across e2e tests

31802 of 48733 relevant lines covered (65.26%)

1.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.0
/backend/services/deviceconnect/api/http/management.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package http
16

17
import (
18
        "bufio"
19
        "context"
20
        "encoding/binary"
21
        "encoding/json"
22
        "io"
23
        "net/http"
24
        "strconv"
25
        "sync"
26
        "time"
27

28
        "github.com/gin-gonic/gin"
29
        validation "github.com/go-ozzo/ozzo-validation/v4"
30
        "github.com/gorilla/websocket"
31
        natsio "github.com/nats-io/nats.go"
32
        "github.com/pkg/errors"
33
        "github.com/vmihailenco/msgpack/v5"
34

35
        "github.com/mendersoftware/mender-server/pkg/identity"
36
        "github.com/mendersoftware/mender-server/pkg/log"
37
        "github.com/mendersoftware/mender-server/pkg/requestid"
38
        "github.com/mendersoftware/mender-server/pkg/rest.utils"
39
        "github.com/mendersoftware/mender-server/pkg/ws"
40
        "github.com/mendersoftware/mender-server/pkg/ws/menderclient"
41
        "github.com/mendersoftware/mender-server/pkg/ws/shell"
42

43
        "github.com/mendersoftware/mender-server/services/deviceconnect/app"
44
        "github.com/mendersoftware/mender-server/services/deviceconnect/client/nats"
45
        "github.com/mendersoftware/mender-server/services/deviceconnect/model"
46
)
47

48
// HTTP errors
var (
	// ErrMissingUserAuthentication is reported by the handlers when the
	// request carries no identity or the identity is not a user.
	ErrMissingUserAuthentication = errors.New(
		"missing or non-user identity in the authorization headers",
	)
	// ErrMsgSessionLimit is the message body sent to both the user and the
	// device when a session goes over the byte limit.
	ErrMsgSessionLimit = "session byte limit exceeded"

	// The name of the field holding the number of milliseconds to sleep
	// between consecutive writes of session recording data. Note that it has
	// nothing to do with the sleep between the keystrokes sent, lines
	// printed, or screen blinks; we are only aware of the stream of bytes.
	PlaybackSleepIntervalMsField = "sleep_ms"

	// The name of the request parameter of the playback GET that holds the
	// id of a session (read with gin's c.Param).
	PlaybackSessionIDField = "sessionId"

	// The threshold between the shell commands received (keystrokes) above
	// which the delay control message is saved (1.5 seconds, in nanoseconds).
	keyStrokeDelayRecordingThresholdNs = int64(1500 * 1000000)

	// The keystroke delay is recorded in two bytes (as milliseconds), so this
	// is the maximal possible delay, expressed in nanoseconds. We round down
	// to this if the real delay is larger.
	keyStrokeMaxDelayRecording = int64(65535 * 1000000)
)
72

73
// channelSize is the buffer capacity of the channels that carry device
// messages to the websocket writer.
const channelSize = 25 // TODO make configurable

const (
	// PropertyUserID is the key under which the session's user ID is stored
	// in the header properties of messages sent to the device.
	PropertyUserID = "user_id"
)
78

79
var wsUpgrader = websocket.Upgrader{
80
        Subprotocols: []string{"protomsg/msgpack"},
81
        CheckOrigin:  allowAllOrigins,
82
        Error: func(
83
                w http.ResponseWriter, r *http.Request, s int, e error,
84
        ) {
3✔
85
                w.WriteHeader(s)
3✔
86
                enc := json.NewEncoder(w)
3✔
87
                _ = enc.Encode(rest.Error{
3✔
88
                        Err:       e.Error(),
3✔
89
                        RequestID: requestid.FromContext(r.Context())},
3✔
90
                )
3✔
91
        },
3✔
92
}
93

94
// ManagementController container for end-points
type ManagementController struct {
	// app is the application layer the handlers delegate to.
	app app.App
	// nats is the client used to subscribe to session subjects and to
	// publish messages to devices.
	nats nats.Client
}
99

100
// NewManagementController returns a new ManagementController
101
func NewManagementController(
102
        app app.App,
103
        nc nats.Client,
104
) *ManagementController {
3✔
105
        return &ManagementController{
3✔
106
                app:  app,
3✔
107
                nats: nc,
3✔
108
        }
3✔
109
}
3✔
110

111
// GetDevice returns a device
112
func (h ManagementController) GetDevice(c *gin.Context) {
3✔
113
        ctx := c.Request.Context()
3✔
114

3✔
115
        idata := identity.FromContext(ctx)
3✔
116
        if idata == nil || !idata.IsUser {
3✔
117
                c.JSON(http.StatusBadRequest, gin.H{
×
118
                        "error": ErrMissingUserAuthentication.Error(),
×
119
                })
×
120
                return
×
121
        }
×
122
        tenantID := idata.Tenant
3✔
123
        deviceID := c.Param("deviceId")
3✔
124

3✔
125
        device, err := h.app.GetDevice(ctx, tenantID, deviceID)
3✔
126
        if err == app.ErrDeviceNotFound {
4✔
127
                c.JSON(http.StatusNotFound, gin.H{
1✔
128
                        "error": err.Error(),
1✔
129
                })
1✔
130
                return
1✔
131
        } else if err != nil {
5✔
132
                c.JSON(http.StatusBadRequest, gin.H{
1✔
133
                        "error": err.Error(),
1✔
134
                })
1✔
135
                return
1✔
136
        }
1✔
137

138
        c.JSON(http.StatusOK, device)
3✔
139
}
140

141
// Connect extracts identity from request, checks user permissions
142
// and calls ConnectDevice
143
func (h ManagementController) Connect(c *gin.Context) {
2✔
144
        ctx := c.Request.Context()
2✔
145
        l := log.FromContext(ctx)
2✔
146

2✔
147
        idata := identity.FromContext(ctx)
2✔
148
        if !idata.IsUser {
2✔
149
                c.JSON(http.StatusBadRequest, gin.H{
×
150
                        "error": ErrMissingUserAuthentication.Error(),
×
151
                })
×
152
                return
×
153
        }
×
154

155
        tenantID := idata.Tenant
2✔
156
        userID := idata.Subject
2✔
157
        deviceID := c.Param("deviceId")
2✔
158

2✔
159
        session := &model.Session{
2✔
160
                TenantID:           tenantID,
2✔
161
                UserID:             userID,
2✔
162
                DeviceID:           deviceID,
2✔
163
                StartTS:            time.Now(),
2✔
164
                BytesRecordedMutex: &sync.Mutex{},
2✔
165
                Types:              []string{},
2✔
166
        }
2✔
167

2✔
168
        // Prepare the user session
2✔
169
        err := h.app.PrepareUserSession(ctx, session)
2✔
170
        if err == app.ErrDeviceNotFound || err == app.ErrDeviceNotConnected {
4✔
171
                c.JSON(http.StatusNotFound, gin.H{
2✔
172
                        "error": err.Error(),
2✔
173
                })
2✔
174
                return
2✔
175
        } else if _, ok := errors.Cause(err).(validation.Errors); ok {
4✔
176
                c.JSON(http.StatusBadRequest, gin.H{
×
177
                        "error": err.Error(),
×
178
                })
×
179
                return
×
180
        } else if err != nil {
3✔
181
                l.Error(err)
1✔
182
                c.JSON(http.StatusInternalServerError, gin.H{
1✔
183
                        "error": err.Error(),
1✔
184
                })
1✔
185
                return
1✔
186
        }
1✔
187
        defer func() {
4✔
188
                err := h.app.FreeUserSession(ctx, session.ID, session.Types)
2✔
189
                if err != nil {
2✔
190
                        l.Warnf("failed to free session: %s", err.Error())
×
191
                }
×
192
        }()
193

194
        deviceChan := make(chan *natsio.Msg, channelSize)
2✔
195
        sub, err := h.nats.ChanSubscribe(session.Subject(tenantID), deviceChan)
2✔
196
        if err != nil {
2✔
197
                l.Error(err)
×
198
                c.JSON(http.StatusInternalServerError, gin.H{
×
199
                        "error": "failed to establish internal device session",
×
200
                })
×
201
                return
×
202
        }
×
203
        //nolint:errcheck
204
        defer sub.Unsubscribe()
2✔
205

2✔
206
        // upgrade get request to websocket protocol
2✔
207
        conn, err := wsUpgrader.Upgrade(c.Writer, c.Request, nil)
2✔
208
        if err != nil {
4✔
209
                err = errors.Wrap(err, "unable to upgrade the request to websocket protocol")
2✔
210
                l.Error(err)
2✔
211
                // upgrader.Upgrade has already responded
2✔
212
                return
2✔
213
        }
2✔
214
        conn.SetReadLimit(int64(app.MessageSizeLimit))
2✔
215

2✔
216
        //nolint:errcheck
2✔
217
        h.ConnectServeWS(ctx, conn, session, deviceChan)
2✔
218
}
219

220
func (h ManagementController) Playback(c *gin.Context) {
2✔
221
        ctx := c.Request.Context()
2✔
222
        l := log.FromContext(ctx)
2✔
223

2✔
224
        idata := identity.FromContext(ctx)
2✔
225
        if !idata.IsUser {
2✔
226
                c.JSON(http.StatusBadRequest, gin.H{
×
227
                        "error": ErrMissingUserAuthentication.Error(),
×
228
                })
×
229
                return
×
230
        }
×
231

232
        tenantID := idata.Tenant
2✔
233
        userID := idata.Subject
2✔
234
        sessionID := c.Param(PlaybackSessionIDField)
2✔
235
        session := &model.Session{
2✔
236
                TenantID:           tenantID,
2✔
237
                UserID:             userID,
2✔
238
                StartTS:            time.Now(),
2✔
239
                BytesRecordedMutex: &sync.Mutex{},
2✔
240
        }
2✔
241
        sleepInterval := c.Param(PlaybackSleepIntervalMsField)
2✔
242
        sleepMilliseconds := uint(app.DefaultPlaybackSleepIntervalMs)
2✔
243
        if len(sleepInterval) > 1 {
2✔
244
                n, err := strconv.ParseUint(sleepInterval, 10, 32)
×
245
                if err != nil {
×
246
                        sleepMilliseconds = uint(n)
×
247
                }
×
248
        }
249

250
        l.Infof("Playing back the session session_id=%s", sessionID)
2✔
251

2✔
252
        // upgrade get request to websocket protocol
2✔
253
        conn, err := wsUpgrader.Upgrade(c.Writer, c.Request, nil)
2✔
254
        if err != nil {
4✔
255
                err = errors.Wrap(err, "unable to upgrade the request to websocket protocol")
2✔
256
                l.Error(err)
2✔
257
                return
2✔
258
        }
2✔
259
        conn.SetReadLimit(int64(app.MessageSizeLimit))
1✔
260

1✔
261
        deviceChan := make(chan *natsio.Msg, channelSize)
1✔
262
        errChan := make(chan error, 1)
1✔
263

1✔
264
        //nolint:errcheck
1✔
265
        go h.websocketWriter(ctx,
1✔
266
                conn,
1✔
267
                session,
1✔
268
                deviceChan,
1✔
269
                errChan,
1✔
270
                bufio.NewWriterSize(io.Discard, app.RecorderBufferSize),
1✔
271
                bufio.NewWriterSize(io.Discard, app.RecorderBufferSize))
1✔
272

1✔
273
        go func() {
2✔
274
                err = h.app.GetSessionRecording(ctx,
1✔
275
                        sessionID,
1✔
276
                        app.NewPlayback(deviceChan, sleepMilliseconds))
1✔
277
                if err != nil {
1✔
278
                        err = errors.Wrap(err, "unable to get the session.")
×
279
                        errChan <- err
×
280
                        return
×
281
                }
×
282
        }()
283
        // We need to keep reading in order to keep ping/pong handlers functioning.
284
        for ; err == nil; _, _, err = conn.NextReader() {
2✔
285
        }
1✔
286
}
287

288
func writerFinalizer(conn *websocket.Conn, e *error, l *log.Logger) {
2✔
289
        err := *e
2✔
290
        if err != nil {
4✔
291
                if !websocket.IsUnexpectedCloseError(errors.Cause(err)) {
4✔
292
                        errMsg := err.Error()
2✔
293
                        errBody := make([]byte, len(errMsg)+2)
2✔
294
                        binary.BigEndian.PutUint16(errBody,
2✔
295
                                websocket.CloseInternalServerErr)
2✔
296
                        copy(errBody[2:], errMsg)
2✔
297
                        errClose := conn.WriteControl(
2✔
298
                                websocket.CloseMessage,
2✔
299
                                errBody,
2✔
300
                                time.Now().Add(writeWait),
2✔
301
                        )
2✔
302
                        if errClose != nil {
3✔
303
                                err = errors.Wrapf(err,
1✔
304
                                        "error sending websocket close frame: %s",
1✔
305
                                        errClose.Error(),
1✔
306
                                )
1✔
307
                        }
1✔
308
                }
309
                l.Errorf("websocket closed with error: %s", err.Error())
2✔
310
        }
311
        conn.Close()
2✔
312
}
313

314
// websocketWriter is the go-routine responsible for the writing end of the
315
// websocket. The routine forwards messages posted on the NATS session subject
316
// and periodically pings the connection. If the connection times out or a
317
// protocol violation occurs, the routine closes the connection.
318
func (h ManagementController) websocketWriter(
319
        ctx context.Context,
320
        conn *websocket.Conn,
321
        session *model.Session,
322
        deviceChan <-chan *natsio.Msg,
323
        errChan <-chan error,
324
        recorderBuffered *bufio.Writer,
325
        controlRecorderBuffered *bufio.Writer,
326
) (err error) {
2✔
327
        l := log.FromContext(ctx)
2✔
328
        defer writerFinalizer(conn, &err, l)
2✔
329

2✔
330
        // handle the ping-pong connection health check
2✔
331
        if err != nil {
2✔
332
                l.Error(err)
×
333
                return err
×
334
        }
×
335

336
        conn.SetPingHandler(func(msg string) error {
3✔
337
                if err != nil {
1✔
338
                        return err
×
339
                }
×
340
                return conn.WriteControl(
1✔
341
                        websocket.PongMessage,
1✔
342
                        []byte(msg),
1✔
343
                        time.Now().Add(writeWait),
1✔
344
                )
1✔
345
        })
346

347
        defer recorderBuffered.Flush()
2✔
348
        defer controlRecorderBuffered.Flush()
2✔
349
        recordedBytes := 0
2✔
350
        controlBytes := 0
2✔
351

2✔
352
        sessOverLimit := false
2✔
353
        sessOverLimitHandled := false
2✔
354

2✔
355
        lastKeystrokeAt := time.Now().UTC().UnixNano()
2✔
356
Loop:
2✔
357
        for {
4✔
358
                var forwardedMsg []byte
2✔
359

2✔
360
                select {
2✔
361
                case msg := <-deviceChan:
2✔
362
                        mr := &ws.ProtoMsg{}
2✔
363
                        err = msgpack.Unmarshal(msg.Data, mr)
2✔
364
                        if err != nil {
2✔
365
                                return err
×
366
                        }
×
367

368
                        forwardedMsg = msg.Data
2✔
369

2✔
370
                        if mr.Header.Proto == ws.ProtoTypeShell {
4✔
371
                                switch mr.Header.MsgType {
2✔
372
                                case shell.MessageTypeShellCommand:
2✔
373

2✔
374
                                        if recordedBytes >= app.MessageSizeLimit ||
2✔
375
                                                controlBytes >= app.MessageSizeLimit {
3✔
376
                                                sessOverLimit = true
1✔
377

1✔
378
                                                errMsg := h.handleSessLimit(ctx,
1✔
379
                                                        session,
1✔
380
                                                        &sessOverLimitHandled,
1✔
381
                                                )
1✔
382

1✔
383
                                                //override original message with shell error
1✔
384
                                                if errMsg != nil {
2✔
385
                                                        forwardedMsg = errMsg
1✔
386
                                                }
1✔
387
                                        } else {
2✔
388
                                                if err = recordSession(ctx,
2✔
389
                                                        mr,
2✔
390
                                                        recorderBuffered,
2✔
391
                                                        controlRecorderBuffered,
2✔
392
                                                        &recordedBytes,
2✔
393
                                                        &controlBytes,
2✔
394
                                                        &lastKeystrokeAt,
2✔
395
                                                        session,
2✔
396
                                                ); err != nil {
2✔
397
                                                        return err
×
398
                                                }
×
399
                                        }
400

401
                                case shell.MessageTypeStopShell:
×
402
                                        l.Debugf("session logging: recorderBuffered.Flush()"+
×
403
                                                " at %d on stop shell", recordedBytes)
×
404
                                        recorderBuffered.Flush()
×
405
                                }
406
                        }
407

408
                        if !sessOverLimit {
4✔
409
                                err = conn.WriteMessage(websocket.BinaryMessage, forwardedMsg)
2✔
410
                                if err != nil {
2✔
411
                                        l.Error(err)
×
412
                                        break Loop
×
413
                                }
414
                        }
415
                case <-ctx.Done():
1✔
416
                        break Loop
1✔
417
                case err := <-errChan:
2✔
418
                        return err
2✔
419
                }
420
        }
421
        return err
1✔
422
}
423

424
func (h ManagementController) handleSessLimit(ctx context.Context,
425
        session *model.Session,
426
        handled *bool,
427
) []byte {
1✔
428
        l := log.FromContext(ctx)
1✔
429

1✔
430
        // possible error return message (ws->user)
1✔
431
        var retMsg []byte
1✔
432

1✔
433
        // attempt to clean up once
1✔
434
        if !(*handled) {
2✔
435
                sendLimitErrDevice(ctx, session, h.nats)
1✔
436
                userErrMsg, err := prepLimitErrUser(ctx, session)
1✔
437
                if err != nil {
1✔
438
                        l.Errorf("session limit: " +
×
439
                                "failed to notify user")
×
440
                }
×
441

442
                retMsg = userErrMsg
1✔
443

1✔
444
                err = h.app.FreeUserSession(ctx, session.ID, session.Types)
1✔
445
                if err != nil {
1✔
446
                        l.Warnf("failed to free session"+
×
447
                                "that went over limit: %s", err.Error())
×
448
                }
×
449

450
                *handled = true
1✔
451
        }
452

453
        return retMsg
1✔
454
}
455

456
func recordSession(ctx context.Context,
457
        msg *ws.ProtoMsg,
458
        recorder io.Writer,
459
        recorderCtrl io.Writer,
460
        recBytes *int,
461
        ctrlBytes *int,
462
        lastKeystrokeAt *int64,
463
        session *model.Session) error {
2✔
464
        l := log.FromContext(ctx)
2✔
465

2✔
466
        b, e := recorder.Write(msg.Body)
2✔
467
        if e != nil {
2✔
468
                l.Errorf("session logging: "+
×
469
                        "recorderBuffered.Write"+
×
470
                        "(len=%d)=%d,%+v",
×
471
                        len(msg.Body), b, e)
×
472
        }
×
473
        timeNowUTC := time.Now().UTC().UnixNano()
2✔
474
        keystrokeDelay := timeNowUTC - (*lastKeystrokeAt)
2✔
475
        if keystrokeDelay >= keyStrokeDelayRecordingThresholdNs {
2✔
476
                if keystrokeDelay > keyStrokeMaxDelayRecording {
×
477
                        keystrokeDelay = keyStrokeMaxDelayRecording
×
478
                }
×
479

480
                controlMsg := app.Control{
×
481
                        Type:   app.DelayMessage,
×
482
                        Offset: *recBytes,
×
483
                        DelayMs: uint16(float64(keystrokeDelay) *
×
484
                                0.000001),
×
485
                        TerminalHeight: 0,
×
486
                        TerminalWidth:  0,
×
487
                }
×
488
                n, _ := recorderCtrl.Write(
×
489
                        controlMsg.MarshalBinary())
×
490
                l.Debugf("saving control delay message: %+v/%d",
×
491
                        controlMsg, n)
×
492
                (*ctrlBytes) += n
×
493
        }
494

495
        (*lastKeystrokeAt) = timeNowUTC
2✔
496

2✔
497
        (*recBytes) += len(msg.Body)
2✔
498
        session.BytesRecordedMutex.Lock()
2✔
499
        session.BytesRecorded = *recBytes
2✔
500
        session.BytesRecordedMutex.Unlock()
2✔
501

2✔
502
        return nil
2✔
503
}
504

505
// prepLimitErrUser preps a session limit exceeded error for the user (shell cmd + err status)
506
func prepLimitErrUser(ctx context.Context, session *model.Session) ([]byte, error) {
1✔
507
        userErrMsg := ws.ProtoMsg{
1✔
508
                Header: ws.ProtoHdr{
1✔
509
                        Proto:     ws.ProtoTypeShell,
1✔
510
                        MsgType:   shell.MessageTypeShellCommand,
1✔
511
                        SessionID: session.ID,
1✔
512
                        Properties: map[string]interface{}{
1✔
513
                                "status": shell.ErrorMessage,
1✔
514
                        },
1✔
515
                },
1✔
516
                Body: []byte(ErrMsgSessionLimit),
1✔
517
        }
1✔
518

1✔
519
        return msgpack.Marshal(userErrMsg)
1✔
520
}
1✔
521

522
// sendLimitErrDevice preps and sends
523
// session limit exceeded error to device (stop shell + err status)
524
// this is best effort, log and swallow errors
525
func sendLimitErrDevice(ctx context.Context, session *model.Session, nats nats.Client) {
1✔
526
        l := log.FromContext(ctx)
1✔
527

1✔
528
        msg := ws.ProtoMsg{
1✔
529
                Header: ws.ProtoHdr{
1✔
530
                        Proto:     ws.ProtoTypeShell,
1✔
531
                        MsgType:   shell.MessageTypeStopShell,
1✔
532
                        SessionID: session.ID,
1✔
533
                        Properties: map[string]interface{}{
1✔
534
                                "status":       shell.ErrorMessage,
1✔
535
                                PropertyUserID: session.UserID,
1✔
536
                        },
1✔
537
                },
1✔
538
                Body: []byte(ErrMsgSessionLimit),
1✔
539
        }
1✔
540
        data, err := msgpack.Marshal(msg)
1✔
541
        if err != nil {
1✔
542
                l.Errorf(
×
543
                        "session limit: "+
×
544
                                "failed to prep stop session"+
×
545
                                "%s message to device: %s, error %v",
×
546
                        session.ID,
×
547
                        session.DeviceID,
×
548
                        err,
×
549
                )
×
550
        }
×
551
        err = nats.Publish(model.GetDeviceSubject(
1✔
552
                session.TenantID, session.DeviceID),
1✔
553
                data,
1✔
554
        )
1✔
555
        if err != nil {
1✔
556
                l.Errorf(
×
557
                        "session limit: failed to send stop session"+
×
558
                                "%s message to device: %s, error %v",
×
559
                        session.ID,
×
560
                        session.DeviceID,
×
561
                        err,
×
562
                )
×
563
        }
×
564
}
565

566
// ConnectServeWS starts a websocket connection with the device
567
// Currently this handler only properly handles a single terminal session.
568
func (h ManagementController) ConnectServeWS(
569
        ctx context.Context,
570
        conn *websocket.Conn,
571
        sess *model.Session,
572
        deviceChan chan *natsio.Msg,
573
) (err error) {
2✔
574
        l := log.FromContext(ctx)
2✔
575
        id := identity.FromContext(ctx)
2✔
576
        errChan := make(chan error, 1)
2✔
577
        remoteTerminalRunning := false
2✔
578

2✔
579
        defer func() {
4✔
580
                if err != nil {
4✔
581
                        select {
2✔
582
                        case errChan <- err:
2✔
583

584
                        case <-time.After(time.Second):
×
585
                                l.Warn("Failed to propagate error to client")
×
586
                        }
587
                }
588
                if remoteTerminalRunning {
3✔
589
                        msg := ws.ProtoMsg{
1✔
590
                                Header: ws.ProtoHdr{
1✔
591
                                        Proto:     ws.ProtoTypeShell,
1✔
592
                                        MsgType:   shell.MessageTypeStopShell,
1✔
593
                                        SessionID: sess.ID,
1✔
594
                                        Properties: map[string]interface{}{
1✔
595
                                                "status":       shell.ErrorMessage,
1✔
596
                                                PropertyUserID: sess.UserID,
1✔
597
                                        },
1✔
598
                                },
1✔
599
                                Body: []byte("user disconnected"),
1✔
600
                        }
1✔
601
                        data, _ := msgpack.Marshal(msg)
1✔
602
                        errPublish := h.nats.Publish(model.GetDeviceSubject(
1✔
603
                                id.Tenant, sess.DeviceID),
1✔
604
                                data,
1✔
605
                        )
1✔
606
                        if errPublish != nil {
1✔
607
                                l.Warnf(
×
608
                                        "failed to propagate stop session "+
×
609
                                                "message to device: %s",
×
610
                                        errPublish.Error(),
×
611
                                )
×
612
                        }
×
613
                }
614
                close(errChan)
2✔
615
        }()
616

617
        controlRecorder := h.app.GetControlRecorder(ctx, sess.ID)
2✔
618
        controlRecorderBuffered := bufio.NewWriterSize(controlRecorder, app.RecorderBufferSize)
2✔
619
        defer controlRecorderBuffered.Flush()
2✔
620

2✔
621
        sessionRecorder := h.app.GetRecorder(ctx, sess.ID)
2✔
622
        sessionRecorderBuffered := bufio.NewWriterSize(sessionRecorder, app.RecorderBufferSize)
2✔
623
        defer sessionRecorderBuffered.Flush()
2✔
624

2✔
625
        // websocketWriter is responsible for closing the websocket
2✔
626
        //nolint:errcheck
2✔
627
        go h.websocketWriter(ctx,
2✔
628
                conn,
2✔
629
                sess,
2✔
630
                deviceChan,
2✔
631
                errChan,
2✔
632
                sessionRecorderBuffered,
2✔
633
                controlRecorderBuffered)
2✔
634

2✔
635
        return h.connectServeWSProcessMessages(ctx, conn, sess, deviceChan,
2✔
636
                &remoteTerminalRunning, controlRecorderBuffered)
2✔
637
}
638

639
func (h ManagementController) connectServeWSProcessMessages(
640
        ctx context.Context,
641
        conn *websocket.Conn,
642
        sess *model.Session,
643
        deviceChan chan *natsio.Msg,
644
        remoteTerminalRunning *bool,
645
        controlRecorderBuffered *bufio.Writer,
646
) (err error) {
2✔
647
        l := log.FromContext(ctx)
2✔
648
        id := identity.FromContext(ctx)
2✔
649
        logTerminal := false
2✔
650
        logPortForward := false
2✔
651

2✔
652
        var data []byte
2✔
653
        controlBytes := 0
2✔
654
        ignoreControlMessages := false
2✔
655
        for {
4✔
656
                _, data, err = conn.ReadMessage()
2✔
657
                if err != nil {
4✔
658
                        if _, ok := err.(*websocket.CloseError); ok {
4✔
659
                                return nil
2✔
660
                        }
2✔
661
                        return err
1✔
662
                }
663
                m := &ws.ProtoMsg{}
2✔
664
                err = msgpack.Unmarshal(data, m)
2✔
665
                if err != nil {
3✔
666
                        return err
1✔
667
                }
1✔
668

669
                m.Header.SessionID = sess.ID
2✔
670
                if m.Header.Properties == nil {
3✔
671
                        m.Header.Properties = make(map[string]interface{})
1✔
672
                }
1✔
673
                m.Header.Properties[PropertyUserID] = sess.UserID
2✔
674
                data, _ = msgpack.Marshal(m)
2✔
675
                switch m.Header.Proto {
2✔
676
                case ws.ProtoTypeShell:
2✔
677
                        // send the audit log for remote terminal
2✔
678
                        if !logTerminal {
4✔
679
                                if err := h.app.LogUserSession(ctx, sess,
2✔
680
                                        model.SessionTypeTerminal); err != nil {
2✔
681
                                        return err
×
682
                                }
×
683
                                sess.Types = append(sess.Types, model.SessionTypeTerminal)
2✔
684
                                logTerminal = true
2✔
685
                        }
686
                        // handle remote terminal-specific messages
687
                        switch m.Header.MsgType {
2✔
688
                        case shell.MessageTypeSpawnShell:
1✔
689
                                *remoteTerminalRunning = true
1✔
690
                        case shell.MessageTypeStopShell:
1✔
691
                                *remoteTerminalRunning = false
1✔
692
                        case shell.MessageTypeResizeShell:
×
693
                                if ignoreControlMessages {
×
694
                                        continue
×
695
                                }
696
                                if controlBytes >= app.MessageSizeLimit {
×
697
                                        l.Infof("session_id=%s control data limit reached.",
×
698
                                                sess.ID)
×
699
                                        //see https://northerntech.atlassian.net/browse/MEN-4448
×
700
                                        ignoreControlMessages = true
×
701
                                        continue
×
702
                                }
703

704
                                controlBytes += sendResizeMessage(m, sess, controlRecorderBuffered)
×
705
                        }
706
                case ws.ProtoTypePortForward:
×
707
                        if !logPortForward {
×
708
                                if err := h.app.LogUserSession(ctx, sess,
×
709
                                        model.SessionTypePortForward); err != nil {
×
710
                                        return err
×
711
                                }
×
712
                                sess.Types = append(sess.Types, model.SessionTypePortForward)
×
713
                                logPortForward = true
×
714
                        }
715
                }
716

717
                err = h.nats.Publish(model.GetDeviceSubject(id.Tenant, sess.DeviceID), data)
2✔
718
                if err != nil {
2✔
719
                        return err
×
720
                }
×
721
        }
722
}
723

724
func sendResizeMessage(m *ws.ProtoMsg,
725
        sess *model.Session,
726
        controlRecorderBuffered *bufio.Writer) (n int) {
×
727
        if _, ok := m.Header.Properties[model.ResizeMessageTermHeightField]; ok {
×
728
                return 0
×
729
        }
×
730
        if _, ok := m.Header.Properties[model.ResizeMessageTermWidthField]; ok {
×
731
                return 0
×
732
        }
×
733

734
        var height uint16 = 0
×
735
        switch m.Header.Properties[model.ResizeMessageTermHeightField].(type) {
×
736
        case uint8:
×
737
                height = uint16(m.Header.Properties[model.ResizeMessageTermHeightField].(uint8))
×
738
        case int8:
×
739
                height = uint16(m.Header.Properties[model.ResizeMessageTermHeightField].(int8))
×
740
        }
741

742
        var width uint16 = 0
×
743
        switch m.Header.Properties[model.ResizeMessageTermWidthField].(type) {
×
744
        case uint8:
×
745
                width = uint16(m.Header.Properties[model.ResizeMessageTermWidthField].(uint8))
×
746
        case int8:
×
747
                width = uint16(m.Header.Properties[model.ResizeMessageTermWidthField].(int8))
×
748
        }
749

750
        sess.BytesRecordedMutex.Lock()
×
751
        controlMsg := app.Control{
×
752
                Type:           app.ResizeMessage,
×
753
                Offset:         sess.BytesRecorded,
×
754
                DelayMs:        0,
×
755
                TerminalHeight: height,
×
756
                TerminalWidth:  width,
×
757
        }
×
758
        sess.BytesRecordedMutex.Unlock()
×
759

×
760
        n, _ = controlRecorderBuffered.Write(
×
761
                controlMsg.MarshalBinary(),
×
762
        )
×
763
        return n
×
764
}
765

766
func (h ManagementController) CheckUpdate(c *gin.Context) {
1✔
767
        h.sendMenderCommand(c, menderclient.MessageTypeMenderClientCheckUpdate)
1✔
768
}
1✔
769

770
func (h ManagementController) SendInventory(c *gin.Context) {
1✔
771
        h.sendMenderCommand(c, menderclient.MessageTypeMenderClientSendInventory)
1✔
772
}
1✔
773

774
func (h ManagementController) sendMenderCommand(c *gin.Context, msgType string) {
1✔
775
        ctx := c.Request.Context()
1✔
776

1✔
777
        idata := identity.FromContext(ctx)
1✔
778
        if idata == nil || !idata.IsUser {
1✔
779
                c.JSON(http.StatusBadRequest, gin.H{
×
780
                        "error": ErrMissingUserAuthentication.Error(),
×
781
                })
×
782
                return
×
783
        }
×
784
        tenantID := idata.Tenant
1✔
785
        deviceID := c.Param("deviceId")
1✔
786

1✔
787
        device, err := h.app.GetDevice(ctx, tenantID, deviceID)
1✔
788
        if err == app.ErrDeviceNotFound {
2✔
789
                c.JSON(http.StatusNotFound, gin.H{
1✔
790
                        "error": err.Error(),
1✔
791
                })
1✔
792
                return
1✔
793
        } else if err != nil {
3✔
794
                c.JSON(http.StatusBadRequest, gin.H{
1✔
795
                        "error": err.Error(),
1✔
796
                })
1✔
797
                return
1✔
798
        } else if device.Status != model.DeviceStatusConnected {
3✔
799
                c.JSON(http.StatusConflict, gin.H{
1✔
800
                        "error": app.ErrDeviceNotConnected,
1✔
801
                })
1✔
802
                return
1✔
803
        }
1✔
804

805
        msg := &ws.ProtoMsg{
1✔
806
                Header: ws.ProtoHdr{
1✔
807
                        Proto:   ws.ProtoTypeMenderClient,
1✔
808
                        MsgType: msgType,
1✔
809
                        Properties: map[string]interface{}{
1✔
810
                                PropertyUserID: idata.Subject,
1✔
811
                        },
1✔
812
                },
1✔
813
        }
1✔
814
        data, _ := msgpack.Marshal(msg)
1✔
815

1✔
816
        err = h.nats.Publish(model.GetDeviceSubject(idata.Tenant, device.ID), data)
1✔
817
        if err != nil {
2✔
818
                c.JSON(http.StatusInternalServerError, gin.H{
1✔
819
                        "error": err.Error(),
1✔
820
                })
1✔
821
        }
1✔
822

823
        c.JSON(http.StatusAccepted, nil)
1✔
824
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc