• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-server / 1592525883

17 Dec 2024 01:50PM UTC coverage: 73.526% (+0.7%) from 72.839%
1592525883

Pull #270

gitlab-ci

bahaa-ghazal
test: testing 'deploy to all devices' feature

Changelog = Title
Ticket = MEN-4272
Signed-off-by: Bahaa Aldeen Ghazal <bahaa.ghazal@northern.tech>
Pull Request #270: test: testing 'deploy to all devices' feature

4244 of 6144 branches covered (69.08%)

Branch coverage included in aggregate %.

40043 of 54089 relevant lines covered (74.03%)

23.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.39
/backend/services/deviceconnect/api/http/management_filetransfer.go
1
// Copyright 2021 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package http
16

17
import (
18
        "bufio"
19
        "context"
20
        "fmt"
21
        "io"
22
        "net/http"
23
        "os"
24
        "path"
25
        "strconv"
26
        "time"
27

28
        "github.com/gin-gonic/gin"
29
        "github.com/google/uuid"
30
        natsio "github.com/nats-io/nats.go"
31
        "github.com/pkg/errors"
32
        "github.com/vmihailenco/msgpack/v5"
33

34
        "github.com/mendersoftware/mender-server/pkg/identity"
35
        "github.com/mendersoftware/mender-server/pkg/log"
36
        "github.com/mendersoftware/mender-server/pkg/requestid"
37
        "github.com/mendersoftware/mender-server/pkg/ws"
38
        wsft "github.com/mendersoftware/mender-server/pkg/ws/filetransfer"
39

40
        "github.com/mendersoftware/mender-server/services/deviceconnect/app"
41
        "github.com/mendersoftware/mender-server/services/deviceconnect/model"
42
)
43

44
type fileTransferParams struct {
45
        TenantID  string
46
        UserID    string
47
        SessionID string
48
        Device    *model.Device
49
}
50

51
// HTTP header names written on file download responses (see writeHeaders).
const (
	// Standard headers.
	hdrContentType        = "Content-Type"
	hdrContentDisposition = "Content-Disposition"

	// Mender-specific headers carrying the remote file's metadata.
	hdrMenderFileTransferPath = "X-MEN-File-Path"
	hdrMenderFileTransferUID  = "X-MEN-File-UID"
	hdrMenderFileTransferGID  = "X-MEN-File-GID"
	hdrMenderFileTransferMode = "X-MEN-File-Mode"
	hdrMenderFileTransferSize = "X-MEN-File-Size"
)
60

61
const (
	// Field names used by the upload request.
	// NOTE(review): presumably multipart form field names - the upload
	// handler that reads them is outside this view.
	fieldUploadPath = "path"
	fieldUploadUID  = "uid"
	fieldUploadGID  = "gid"
	fieldUploadMode = "mode"
	fieldUploadFile = "file"

	// PropertyOffset is the protocol header property that carries the byte
	// offset on chunk and ACK messages.
	PropertyOffset = "offset"

	// paramDownloadPath is the query parameter naming the file to download.
	paramDownloadPath = "path"
)
72

73
// Tunables for the file transfer protocol. They are variables rather than
// constants (presumably so tests can override them - confirm).
var (
	// fileTransferTimeout is how long to wait for the device before the
	// transfer is considered timed out.
	fileTransferTimeout = 60 * time.Second
	// fileTransferBufferSize is a buffer size in bytes; its consumer is not
	// visible in this part of the file.
	fileTransferBufferSize = 4096
	// ackSlidingWindowSend is the number of chunks received during a download
	// after which an ACK is sent to the device.
	ackSlidingWindowSend = 10
	// ackSlidingWindowRecv is the receive-side ACK window; its consumer is not
	// visible in this part of the file.
	ackSlidingWindowRecv = 20
)
77

78
// Error couples an error with the HTTP status code that should be reported
// to the API client when the error reaches the response layer.
type Error struct {
	error      error
	statusCode int
}

// NewError wraps err together with the HTTP status code to respond with.
//
// Several callers pass a code received from a device. net/http panics when
// asked to write a status outside 100..999, so such codes are replaced with
// 500 Internal Server Error here instead of being forwarded verbatim.
func NewError(err error, code int) error {
	if code < 100 || code > 999 {
		code = http.StatusInternalServerError
	}
	return &Error{
		error:      err,
		statusCode: code,
	}
}

// Error returns the message of the wrapped error. If no error was wrapped,
// the standard text for the status code is returned instead of panicking.
func (err *Error) Error() string {
	switch {
	case err == nil:
		return "<nil>"
	case err.error == nil:
		return http.StatusText(err.statusCode)
	}
	return err.error.Error()
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can
// inspect the chain.
func (err *Error) Unwrap() error {
	if err == nil {
		return nil
	}
	return err.error
}
×
97

98
var (
99
        errFileTransferMarshalling   = errors.New("failed to marshal the request")
100
        errFileTransferUnmarshalling = errors.New("failed to unmarshal the request")
101
        errFileTransferPublishing    = errors.New("failed to publish the message")
102
        errFileTransferSubscribing   = errors.New("failed to subscribe to the mesages")
103
        errFileTransferTimeout       = &Error{
104
                error:      errors.New("file transfer timed out"),
105
                statusCode: http.StatusRequestTimeout,
106
        }
107
        errFileTransferFailed = &Error{
108
                error:      errors.New("file transfer failed"),
109
                statusCode: http.StatusBadRequest,
110
        }
111
        errFileTransferNotImplemented = &Error{
112
                error:      errors.New("file transfer not implemented on device"),
113
                statusCode: http.StatusBadGateway,
114
        }
115
        errFileTransferDisabled = &Error{
116
                error:      errors.New("file transfer disabled on device"),
117
                statusCode: http.StatusBadGateway,
118
        }
119
)
120

121
var newFileTransferSessionID = func() (uuid.UUID, error) {
1✔
122
        return uuid.NewRandom()
1✔
123
}
1✔
124

125
func (h ManagementController) getFileTransferParams(c *gin.Context) (*fileTransferParams, int,
126
        error) {
2✔
127
        ctx := c.Request.Context()
2✔
128

2✔
129
        idata := identity.FromContext(ctx)
2✔
130
        if idata == nil || !idata.IsUser {
3✔
131
                return nil, http.StatusUnauthorized, ErrMissingUserAuthentication
1✔
132
        }
1✔
133
        tenantID := idata.Tenant
2✔
134
        deviceID := c.Param("deviceId")
2✔
135

2✔
136
        device, err := h.app.GetDevice(ctx, tenantID, deviceID)
2✔
137
        if err == app.ErrDeviceNotFound {
3✔
138
                return nil, http.StatusNotFound, err
1✔
139
        } else if err != nil {
4✔
140
                return nil, http.StatusBadRequest, err
1✔
141
        } else if device.Status != model.DeviceStatusConnected {
4✔
142
                return nil, http.StatusConflict, app.ErrDeviceNotConnected
1✔
143
        }
1✔
144

145
        if c.Request.Method != http.MethodGet && c.Request.Body == nil {
3✔
146
                return nil, http.StatusBadRequest, errors.New("missing request body")
1✔
147
        }
1✔
148

149
        sessionID, err := newFileTransferSessionID()
2✔
150
        if err != nil {
2✔
151
                return nil, http.StatusInternalServerError,
×
152
                        errors.New("failed to generate session ID")
×
153
        }
×
154

155
        return &fileTransferParams{
2✔
156
                TenantID:  idata.Tenant,
2✔
157
                UserID:    idata.Subject,
2✔
158
                SessionID: sessionID.String(),
2✔
159
                Device:    device,
2✔
160
        }, 0, nil
2✔
161
}
162

163
func (h ManagementController) publishFileTransferProtoMessage(sessionID, userID, deviceTopic,
164
        msgType string, body interface{}, offset int64) error {
1✔
165
        var msgBody []byte
1✔
166
        if msgType == wsft.MessageTypeChunk && body != nil {
2✔
167
                msgBody = body.([]byte)
1✔
168
        } else if msgType == wsft.MessageTypeACK {
3✔
169
                msgBody = nil
1✔
170
        } else if body != nil {
3✔
171
                var err error
1✔
172
                msgBody, err = msgpack.Marshal(body)
1✔
173
                if err != nil {
1✔
174
                        return errors.Wrap(err, errFileTransferMarshalling.Error())
×
175
                }
×
176
        }
177
        proto := ws.ProtoTypeFileTransfer
1✔
178
        if msgType == ws.MessageTypePing || msgType == ws.MessageTypePong {
1✔
179
                proto = ws.ProtoTypeControl
×
180
        }
×
181
        msg := &ws.ProtoMsg{
1✔
182
                Header: ws.ProtoHdr{
1✔
183
                        Proto:     proto,
1✔
184
                        MsgType:   msgType,
1✔
185
                        SessionID: sessionID,
1✔
186
                        Properties: map[string]interface{}{
1✔
187
                                PropertyUserID: userID,
1✔
188
                        },
1✔
189
                },
1✔
190
                Body: msgBody,
1✔
191
        }
1✔
192
        if msgType == wsft.MessageTypeChunk || msgType == wsft.MessageTypeACK {
2✔
193
                msg.Header.Properties[PropertyOffset] = offset
1✔
194
        }
1✔
195
        data, err := msgpack.Marshal(msg)
1✔
196
        if err != nil {
1✔
197
                return errors.Wrap(err, errFileTransferMarshalling.Error())
×
198
        }
×
199

200
        err = h.nats.Publish(deviceTopic, data)
1✔
201
        if err != nil {
1✔
202
                return errors.Wrap(err, errFileTransferPublishing.Error())
×
203
        }
×
204
        return nil
1✔
205
}
206

207
func (h ManagementController) publishControlMessage(
208
        sessionID, deviceTopic, messageType string, body interface{},
209
) error {
2✔
210
        msg := &ws.ProtoMsg{
2✔
211
                Header: ws.ProtoHdr{
2✔
212
                        Proto:     ws.ProtoTypeControl,
2✔
213
                        MsgType:   messageType,
2✔
214
                        SessionID: sessionID,
2✔
215
                },
2✔
216
        }
2✔
217

2✔
218
        if body != nil {
4✔
219
                if b, ok := body.([]byte); ok {
2✔
220
                        msg.Body = b
×
221
                } else {
2✔
222
                        b, err := msgpack.Marshal(body)
2✔
223
                        if err != nil {
2✔
224
                                return errors.Wrap(errFileTransferMarshalling, err.Error())
×
225
                        }
×
226
                        msg.Body = b
2✔
227
                }
228
        }
229

230
        data, err := msgpack.Marshal(msg)
2✔
231
        if err != nil {
2✔
232
                return errors.Wrap(errFileTransferMarshalling, err.Error())
×
233
        }
×
234
        err = h.nats.Publish(deviceTopic, data)
2✔
235
        if err != nil {
2✔
236
                return errors.Wrap(errFileTransferPublishing, err.Error())
×
237
        }
×
238
        return err
2✔
239
}
240

241
func (h ManagementController) decodeFileTransferProtoMessage(data []byte) (*ws.ProtoMsg,
242
        interface{}, error) {
1✔
243
        msg := &ws.ProtoMsg{}
1✔
244
        err := msgpack.Unmarshal(data, msg)
1✔
245
        if err != nil {
1✔
246
                return nil, nil, errors.Wrap(err, errFileTransferUnmarshalling.Error())
×
247
        }
×
248

249
        switch msg.Header.MsgType {
1✔
250
        case wsft.MessageTypeError:
1✔
251
                msgBody := &ws.Error{}
1✔
252
                err := msgpack.Unmarshal(msg.Body, msgBody)
1✔
253
                if err != nil {
1✔
254
                        return nil, nil, errors.Wrap(err, errFileTransferUnmarshalling.Error())
×
255
                }
×
256
                return msg, msgBody, nil
1✔
257
        case wsft.MessageTypeFileInfo:
×
258
                msgBody := &wsft.FileInfo{}
×
259
                err := msgpack.Unmarshal(msg.Body, msgBody)
×
260
                if err != nil {
×
261
                        return nil, nil, errors.Wrap(err, errFileTransferUnmarshalling.Error())
×
262
                }
×
263
                return msg, msgBody, nil
×
264
        case wsft.MessageTypeACK, wsft.MessageTypeChunk, ws.MessageTypePing, ws.MessageTypePong:
1✔
265
                return msg, nil, nil
1✔
266
        }
267

268
        return nil, nil, errors.Errorf("unexpected message type '%s'", msg.Header.MsgType)
1✔
269
}
270

271
func writeHeaders(c *gin.Context, fileInfo *wsft.FileInfo) {
1✔
272
        c.Writer.Header().Add(hdrContentType, "application/octet-stream")
1✔
273
        if fileInfo.Path != nil {
2✔
274
                filename := path.Base(*fileInfo.Path)
1✔
275
                c.Writer.Header().Add(hdrContentDisposition,
1✔
276
                        "attachment; filename=\""+filename+"\"")
1✔
277
                c.Writer.Header().Add(hdrMenderFileTransferPath, *fileInfo.Path)
1✔
278
        }
1✔
279
        if fileInfo.UID != nil {
2✔
280
                c.Writer.Header().Add(hdrMenderFileTransferUID, fmt.Sprintf("%d", *fileInfo.UID))
1✔
281
        }
1✔
282
        if fileInfo.GID != nil {
2✔
283
                c.Writer.Header().Add(hdrMenderFileTransferGID, fmt.Sprintf("%d", *fileInfo.GID))
1✔
284
        }
1✔
285
        if fileInfo.Mode != nil {
2✔
286
                c.Writer.Header().Add(hdrMenderFileTransferMode, fmt.Sprintf("%o", *fileInfo.Mode))
1✔
287
        }
1✔
288
        if fileInfo.Size != nil {
2✔
289
                c.Writer.Header().Add(hdrMenderFileTransferSize, fmt.Sprintf("%d", *fileInfo.Size))
1✔
290
        }
1✔
291
        c.Writer.WriteHeader(http.StatusOK)
1✔
292
}
293
func (h ManagementController) handleResponseError(c *gin.Context, err error) {
2✔
294
        l := log.FromContext(c.Request.Context())
2✔
295
        l.Errorf("error handling request: %s", err.Error())
2✔
296
        if !c.Writer.Written() {
4✔
297
                var statusError *Error
2✔
298
                var errMsg string = err.Error()
2✔
299
                var statusCode int = http.StatusInternalServerError
2✔
300
                if errors.As(err, &statusError) {
4✔
301
                        statusCode = statusError.statusCode
2✔
302
                }
2✔
303
                if statusCode >= 500 {
3✔
304
                        errMsg = "internal error"
1✔
305
                }
1✔
306
                c.Writer.WriteHeader(statusCode)
2✔
307
                c.JSON(statusCode, gin.H{
2✔
308
                        "error":      errMsg,
2✔
309
                        "request_id": requestid.FromContext(c.Request.Context()),
2✔
310
                })
2✔
311
        } else {
×
312
                l.Warn("response already written")
×
313
        }
×
314
}
315

316
func chanTimeout(
317
        src <-chan *natsio.Msg,
318
        timeout time.Duration,
319
) <-chan *natsio.Msg {
2✔
320
        timer := time.NewTimer(timeout)
2✔
321
        dst := make(chan *natsio.Msg)
2✔
322
        go func() {
4✔
323
                for {
4✔
324
                        select {
2✔
325
                        case <-timer.C:
2✔
326
                                close(dst)
2✔
327
                                return
2✔
328
                        case msg, ok := <-src:
1✔
329
                                if !ok {
2✔
330
                                        close(dst)
1✔
331
                                        return
1✔
332
                                }
1✔
333
                                if !timer.Stop() {
1✔
334
                                        // Timer must be stopped and drained before calling Reset.
×
335
                                        select {
×
336
                                        case <-timer.C:
×
337
                                        default:
×
338
                                        }
339
                                }
340
                                timer.Reset(timeout)
1✔
341
                                dst <- msg
1✔
342
                        }
343
                }
344
        }()
345
        return dst
2✔
346
}
347

348
func (h ManagementController) statFile(
349
        ctx context.Context,
350
        sessChan <-chan *natsio.Msg,
351
        path, sessionID, userID, deviceTopic string) (*wsft.FileInfo, error) {
1✔
352
        // stat the remote file
1✔
353
        req := wsft.StatFile{
1✔
354
                Path: &path,
1✔
355
        }
1✔
356
        if err := h.publishFileTransferProtoMessage(sessionID,
1✔
357
                userID, deviceTopic, wsft.MessageTypeStat, req, 0); err != nil {
1✔
358
                return nil, err
×
359
        }
×
360
        var fileInfo *wsft.FileInfo
1✔
361
        select {
1✔
362
        case rsp, ok := <-sessChan:
1✔
363
                if !ok {
2✔
364
                        return nil, errFileTransferTimeout
1✔
365
                }
1✔
366
                var msg ws.ProtoMsg
1✔
367
                err := msgpack.Unmarshal(rsp.Data, &msg)
1✔
368
                if err != nil {
1✔
369
                        return nil, fmt.Errorf("malformed message from device: %w", err)
×
370
                }
×
371
                if msg.Header.MsgType == ws.MessageTypeError {
2✔
372
                        var errMsg ws.Error
1✔
373
                        _ = msgpack.Unmarshal(msg.Body, &errMsg)
1✔
374
                        errCode := http.StatusBadRequest
1✔
375
                        if errMsg.Code > 0 {
1✔
376
                                errCode = errMsg.Code
×
377
                        }
×
378
                        rspErr := NewError(
1✔
379
                                fmt.Errorf("error received from device: %s", errMsg.Error),
1✔
380
                                errCode,
1✔
381
                        )
1✔
382
                        return nil, rspErr
1✔
383
                }
384
                if msg.Header.Proto != ws.ProtoTypeFileTransfer ||
1✔
385
                        msg.Header.MsgType != wsft.MessageTypeFileInfo {
1✔
386
                        return nil, fmt.Errorf("unexpected response from device %q", msg.Header.MsgType)
×
387
                }
×
388
                err = msgpack.Unmarshal(msg.Body, &fileInfo)
1✔
389
                if err != nil {
1✔
390
                        return nil, fmt.Errorf("malformed message body from device: %w", err)
×
391
                }
×
392
        case <-ctx.Done():
×
393
                return nil, ctx.Err()
×
394
        }
395
        return fileInfo, nil
1✔
396
}
397

398
func (h ManagementController) downloadFileResponse(c *gin.Context, params *fileTransferParams,
399
        request *model.DownloadFileRequest) {
2✔
400
        ctx := c.Request.Context()
2✔
401
        // send a JSON-encoded error message in case of failure
2✔
402

2✔
403
        // subscribe to messages from the device
2✔
404
        deviceTopic := model.GetDeviceSubject(params.TenantID, params.Device.ID)
2✔
405
        sessionTopic := model.GetSessionSubject(params.TenantID, params.SessionID)
2✔
406
        subChan := make(chan *natsio.Msg, channelSize)
2✔
407
        defer close(subChan)
2✔
408
        sub, err := h.nats.ChanSubscribe(sessionTopic, subChan)
2✔
409
        if err != nil {
2✔
410
                h.handleResponseError(c, errors.Wrap(err, errFileTransferSubscribing.Error()))
×
411
                return
×
412
        }
×
413
        //nolint:errcheck
414
        defer sub.Unsubscribe()
2✔
415

2✔
416
        msgChan := chanTimeout(subChan, fileTransferTimeout)
2✔
417

2✔
418
        if err = h.filetransferHandshake(msgChan, params.SessionID, deviceTopic); err != nil {
4✔
419
                h.handleResponseError(c, err)
2✔
420
                return
2✔
421
        }
2✔
422
        // Inform the device that we're closing the session
423
        //nolint:errcheck
424
        defer h.publishControlMessage(params.SessionID, deviceTopic, ws.MessageTypeClose, nil)
1✔
425

1✔
426
        fileInfo, err := h.statFile(
1✔
427
                ctx, msgChan, *request.Path,
1✔
428
                params.SessionID, params.UserID, deviceTopic,
1✔
429
        )
1✔
430
        if err != nil {
2✔
431
                h.handleResponseError(c, fmt.Errorf("failed to retrieve file info: %w", err))
1✔
432
                return
1✔
433
        }
1✔
434
        if fileInfo.Mode == nil || !os.FileMode(*fileInfo.Mode).IsRegular() {
2✔
435
                h.handleResponseError(
1✔
436
                        c,
1✔
437
                        NewError(fmt.Errorf("path is not a regular file"), http.StatusBadRequest),
1✔
438
                )
1✔
439
                return
1✔
440
        }
1✔
441
        writeHeaders(c, fileInfo)
1✔
442
        if c.Request.Method == http.MethodHead {
1✔
443
                return
×
444
        }
×
445
        err = h.downloadFile(
1✔
446
                ctx, msgChan, c.Writer, *request.Path,
1✔
447
                params.SessionID, params.UserID, deviceTopic,
1✔
448
        )
1✔
449
        if err != nil {
2✔
450
                if !c.Writer.Written() {
2✔
451
                        h.handleResponseError(c, err)
1✔
452
                }
1✔
453
                log.FromContext(ctx).
1✔
454
                        Errorf("error downloading file from device: %s", err.Error())
1✔
455
        }
456
}
457

458
func (h ManagementController) downloadFile(
459
        ctx context.Context,
460
        msgChan <-chan *natsio.Msg,
461
        dst io.Writer,
462
        path, sessionID, userID, deviceTopic string,
463
) error {
1✔
464
        latestOffset := int64(0)
1✔
465
        bw := bufio.NewWriter(dst)
1✔
466
        numberOfChunks := 0
1✔
467
        req := wsft.GetFile{
1✔
468
                Path: &path,
1✔
469
        }
1✔
470
        if err := h.publishFileTransferProtoMessage(
1✔
471
                sessionID,
1✔
472
                userID,
1✔
473
                deviceTopic,
1✔
474
                wsft.MessageTypeGet,
1✔
475
                req, 0); err != nil {
1✔
476
                return err
×
477
        }
×
478
        for {
2✔
479
                select {
1✔
480
                case wsMessage, ok := <-msgChan:
1✔
481
                        if !ok {
2✔
482
                                return errFileTransferTimeout
1✔
483
                        }
1✔
484

485
                        // process the message
486
                        msg, msgBody, err := h.decodeFileTransferProtoMessage(wsMessage.Data)
1✔
487
                        if err != nil {
1✔
488
                                return err
×
489
                        }
×
490

491
                        // process incoming messages from the device by type
492
                        switch msg.Header.MsgType {
1✔
493

494
                        // error message, stop here
495
                        case wsft.MessageTypeError:
1✔
496
                                err := msgBody.(*ws.Error)
1✔
497
                                errCode := http.StatusInternalServerError
1✔
498
                                if err.Code > 0 {
1✔
499
                                        errCode = err.Code
×
500
                                }
×
501
                                return NewError(errors.New(err.Error), errCode)
1✔
502

503
                        // file data chunk
504
                        case wsft.MessageTypeChunk:
1✔
505
                                if msg.Body == nil {
2✔
506
                                        if err := h.publishFileTransferProtoMessage(
1✔
507
                                                sessionID, userID, deviceTopic,
1✔
508
                                                wsft.MessageTypeACK, nil,
1✔
509
                                                latestOffset); err != nil {
1✔
510
                                                return err
×
511
                                        }
×
512
                                        return bw.Flush()
1✔
513
                                }
514

515
                                // verify the offset property
516
                                propOffset, _ := msg.Header.Properties[PropertyOffset].(int64)
1✔
517
                                if propOffset != latestOffset {
2✔
518
                                        return NewError(errors.Wrap(errFileTransferFailed,
1✔
519
                                                "wrong offset received"), http.StatusInternalServerError)
1✔
520
                                }
1✔
521
                                latestOffset += int64(len(msg.Body))
1✔
522

1✔
523
                                _, err := bw.Write(msg.Body)
1✔
524
                                if err != nil {
1✔
525
                                        return err
×
526
                                }
×
527

528
                                numberOfChunks++
1✔
529
                                if numberOfChunks >= ackSlidingWindowSend {
2✔
530
                                        if err := h.publishFileTransferProtoMessage(
1✔
531
                                                sessionID, userID, deviceTopic,
1✔
532
                                                wsft.MessageTypeACK, nil,
1✔
533
                                                latestOffset); err != nil {
1✔
534
                                                return err
×
535
                                        }
×
536
                                        numberOfChunks = 0
1✔
537
                                }
538

539
                        case ws.MessageTypePing:
×
540
                                if err := h.publishFileTransferProtoMessage(
×
541
                                        sessionID, userID, deviceTopic,
×
542
                                        ws.MessageTypePong, nil,
×
543
                                        -1); err != nil {
×
544
                                        return err
×
545
                                }
×
546
                        }
547
                case <-ctx.Done():
×
548
                        return ctx.Err()
×
549
                }
550
        }
551
}
552

553
func (h ManagementController) DownloadFile(c *gin.Context) {
2✔
554
        l := log.FromContext(c.Request.Context())
2✔
555

2✔
556
        params, statusCode, err := h.getFileTransferParams(c)
2✔
557
        if err != nil {
3✔
558
                l.Error(err)
1✔
559
                c.JSON(statusCode, gin.H{"error": err.Error()})
1✔
560
                return
1✔
561
        }
1✔
562

563
        path := c.Request.URL.Query().Get(paramDownloadPath)
2✔
564
        request := &model.DownloadFileRequest{
2✔
565
                Path: &path,
2✔
566
        }
2✔
567

2✔
568
        if err := request.Validate(); err != nil {
3✔
569
                l.Error(err)
1✔
570
                c.JSON(http.StatusBadRequest, gin.H{
1✔
571
                        "error": errors.Wrap(err, "bad request").Error(),
1✔
572
                })
1✔
573
                return
1✔
574
        }
1✔
575

576
        ctx := c.Request.Context()
2✔
577
        if err := h.app.DownloadFile(ctx, params.UserID, params.Device.ID,
2✔
578
                *request.Path); err != nil {
3✔
579
                l.Error(err)
1✔
580
                c.JSON(http.StatusInternalServerError, gin.H{
1✔
581
                        "error": errors.Wrap(err, "bad request").Error(),
1✔
582
                })
1✔
583
                return
1✔
584
        }
1✔
585

586
        h.downloadFileResponse(c, params, request)
2✔
587
}
588

589
func (h ManagementController) uploadFileResponseHandleInboundMessages(
590
        c *gin.Context, params *fileTransferParams,
591
        msgChan chan *natsio.Msg, errorChan chan error,
592
        latestAckOffsets chan int64,
593
) {
1✔
594
        var latestAckOffset int64
1✔
595
        deviceTopic := model.GetDeviceSubject(params.TenantID, params.Device.ID)
1✔
596
        for {
2✔
597
                select {
1✔
598
                case wsMessage := <-msgChan:
1✔
599
                        msg, msgBody, err := h.decodeFileTransferProtoMessage(
1✔
600
                                wsMessage.Data)
1✔
601
                        if err != nil {
2✔
602
                                errorChan <- err
1✔
603
                                return
1✔
604
                        }
1✔
605

606
                        // process incoming messages from the device by type
607
                        switch msg.Header.MsgType {
1✔
608

609
                        // error message, stop here
610
                        case wsft.MessageTypeError:
1✔
611
                                errorMsg := msgBody.(*ws.Error)
1✔
612
                                errCode := http.StatusBadRequest
1✔
613
                                if errorMsg.Code > 0 {
2✔
614
                                        errCode = errorMsg.Code
1✔
615
                                }
1✔
616
                                errorChan <- NewError(errors.New(errorMsg.Error), errCode)
1✔
617
                                return
1✔
618

619
                        // you can continue the upload
620
                        case wsft.MessageTypeACK:
1✔
621
                                propValue := msg.Header.Properties[PropertyOffset]
1✔
622
                                propOffset, _ := propValue.(int64)
1✔
623
                                if propOffset > latestAckOffset {
2✔
624
                                        latestAckOffset = propOffset
1✔
625
                                        select {
1✔
626
                                        case latestAckOffsets <- latestAckOffset:
1✔
627
                                        case <-latestAckOffsets:
×
628
                                                // Replace ack offset with the latest one
×
629
                                                latestAckOffsets <- latestAckOffset
×
630
                                        }
631
                                }
632

633
                        // handle ping messages
634
                        case ws.MessageTypePing:
×
635
                                if err := h.publishFileTransferProtoMessage(
×
636
                                        params.SessionID, params.UserID, deviceTopic,
×
637
                                        ws.MessageTypePong, nil,
×
638
                                        -1); err != nil {
×
639
                                        errorChan <- err
×
640
                                }
×
641
                        }
642
                case <-c.Done():
×
643
                        return
×
644
                }
645
        }
646
}
647

648
// filetransferHandshake initiates a handshake and checks that the device
649
// is willing to accept file transfer requests.
650
func (h ManagementController) filetransferHandshake(
651
        sessChan <-chan *natsio.Msg, sessionID, deviceTopic string,
652
) error {
2✔
653
        if err := h.publishControlMessage(
2✔
654
                sessionID, deviceTopic,
2✔
655
                ws.MessageTypeOpen, ws.Open{
2✔
656
                        Versions: []int{ws.ProtocolVersion},
2✔
657
                }); err != nil {
2✔
658
                return errFileTransferPublishing
×
659
        }
×
660
        select {
2✔
661
        case natsMsg, ok := <-sessChan:
1✔
662
                if !ok {
1✔
663
                        return errFileTransferTimeout
×
664
                }
×
665
                var msg ws.ProtoMsg
1✔
666
                err := msgpack.Unmarshal(natsMsg.Data, &msg)
1✔
667
                if err != nil {
1✔
668
                        return errFileTransferUnmarshalling
×
669
                }
×
670

671
                if msg.Header.MsgType == ws.MessageTypeError {
2✔
672
                        erro := new(ws.Error)
1✔
673
                        //nolint:errcheck
1✔
674
                        msgpack.Unmarshal(natsMsg.Data, erro)
1✔
675
                        errCode := http.StatusInternalServerError
1✔
676
                        if erro.Code > 0 {
1✔
677
                                errCode = erro.Code
×
678
                        }
×
679
                        rspErr := NewError(
1✔
680
                                fmt.Errorf("handshake error from client: %s", erro.Error),
1✔
681
                                errCode,
1✔
682
                        )
1✔
683
                        return fmt.Errorf("handshake error from client: %w", rspErr)
1✔
684
                } else if msg.Header.MsgType != ws.MessageTypeAccept {
2✔
685
                        return errFileTransferNotImplemented
1✔
686
                }
1✔
687
                accept := new(ws.Accept)
1✔
688
                err = msgpack.Unmarshal(msg.Body, accept)
1✔
689
                if err != nil {
1✔
690
                        return errFileTransferUnmarshalling
×
691
                }
×
692

693
                for _, proto := range accept.Protocols {
2✔
694
                        if proto == ws.ProtoTypeFileTransfer {
2✔
695
                                return nil
1✔
696
                        }
1✔
697
                }
698
                // Let's try to be polite and close the session before returning
699
                //nolint:errcheck
700
                h.publishControlMessage(sessionID, deviceTopic, ws.MessageTypeClose, nil)
1✔
701
                return errFileTransferDisabled
1✔
702

703
        case <-time.After(fileTransferTimeout):
2✔
704
                return errFileTransferTimeout
2✔
705
        }
706
}
707

708
func (h ManagementController) uploadFileResponse(c *gin.Context, params *fileTransferParams,
709
        request *model.UploadFileRequest) {
1✔
710
        l := log.FromContext(c.Request.Context())
1✔
711

1✔
712
        // send a JSON-encoded error message in case of failure
1✔
713
        var responseError error
1✔
714
        errorStatusCode := http.StatusInternalServerError
1✔
715
        defer func() {
2✔
716
                if responseError != nil {
2✔
717
                        l.Error(responseError.Error())
1✔
718
                        var statusError *Error
1✔
719
                        if errors.As(responseError, &statusError) {
2✔
720
                                errorStatusCode = statusError.statusCode
1✔
721
                        }
1✔
722
                        c.JSON(errorStatusCode, gin.H{
1✔
723
                                "error": responseError.Error(),
1✔
724
                        })
1✔
725
                        return
1✔
726
                }
727
        }()
728

729
        // subscribe to messages from the device
730
        deviceTopic := model.GetDeviceSubject(params.TenantID, params.Device.ID)
1✔
731
        sessionTopic := model.GetSessionSubject(params.TenantID, params.SessionID)
1✔
732
        msgChan := make(chan *natsio.Msg, channelSize)
1✔
733
        sub, err := h.nats.ChanSubscribe(sessionTopic, msgChan)
1✔
734
        if err != nil {
1✔
735
                responseError = errors.Wrap(err, errFileTransferSubscribing.Error())
×
736
                return
×
737
        }
×
738

739
        //nolint:errcheck
740
        defer sub.Unsubscribe()
1✔
741

1✔
742
        if err = h.filetransferHandshake(msgChan, params.SessionID, deviceTopic); err != nil {
2✔
743
                switch err {
1✔
744
                case errFileTransferTimeout:
1✔
745
                        errorStatusCode = http.StatusRequestTimeout
1✔
746
                case errFileTransferNotImplemented, errFileTransferDisabled:
1✔
747
                        errorStatusCode = http.StatusBadGateway
1✔
748
                }
749
                responseError = err
1✔
750
                return
1✔
751
        }
752

753
        // Inform the device that we're closing the session
754
        //nolint:errcheck
755
        defer h.publishControlMessage(params.SessionID, deviceTopic, ws.MessageTypeClose, nil)
1✔
756

1✔
757
        // initialize the file transfer
1✔
758
        req := wsft.UploadRequest{
1✔
759
                SrcPath: request.SrcPath,
1✔
760
                Path:    request.Path,
1✔
761
                UID:     request.UID,
1✔
762
                GID:     request.GID,
1✔
763
                Mode:    request.Mode,
1✔
764
        }
1✔
765
        if err := h.publishFileTransferProtoMessage(params.SessionID,
1✔
766
                params.UserID, deviceTopic, wsft.MessageTypePut, req, 0); err != nil {
1✔
767
                responseError = err
×
768
                return
×
769
        }
×
770

771
        // receive the message from the device
772
        select {
1✔
773
        case wsMessage := <-msgChan:
1✔
774
                msg, msgBody, err := h.decodeFileTransferProtoMessage(wsMessage.Data)
1✔
775
                if err != nil {
1✔
776
                        responseError = err
×
777
                        return
×
778
                }
×
779

780
                // process incoming messages from the device by type
781
                switch msg.Header.MsgType {
1✔
782

783
                // error message, stop here
784
                case wsft.MessageTypeError:
1✔
785
                        errorMsg := msgBody.(*ws.Error)
1✔
786
                        errorStatusCode = http.StatusBadRequest
1✔
787
                        if errorMsg.Code > 0 {
1✔
788
                                errorStatusCode = errorMsg.Code
×
789
                        }
×
790
                        responseError = NewError(errors.New(errorMsg.Error), errorStatusCode)
1✔
791
                        return
1✔
792

793
                // you can continue the upload
794
                case wsft.MessageTypeACK:
1✔
795
                }
796

797
        // no message after timeout expired, stop here
798
        case <-time.After(fileTransferTimeout):
1✔
799
                errorStatusCode = http.StatusRequestTimeout
1✔
800
                responseError = errFileTransferTimeout
1✔
801
                return
1✔
802
        }
803

804
        // receive the ack message from the device
805
        latestAckOffsets := make(chan int64, 1)
1✔
806
        errorChan := make(chan error)
1✔
807
        go h.uploadFileResponseHandleInboundMessages(
1✔
808
                c, params, msgChan, errorChan, latestAckOffsets,
1✔
809
        )
1✔
810

1✔
811
        h.uploadFileResponseWriter(
1✔
812
                c, params, request, errorChan, latestAckOffsets, &errorStatusCode, &responseError,
1✔
813
        )
1✔
814
}
815

816
func (h ManagementController) uploadFileResponseWriter(c *gin.Context,
817
        params *fileTransferParams, request *model.UploadFileRequest,
818
        errorChan chan error, latestAckOffsets <-chan int64,
819
        errorStatusCode *int, responseError *error) {
1✔
820
        var (
1✔
821
                offset          int64
1✔
822
                latestAckOffset int64
1✔
823
        )
1✔
824
        deviceTopic := model.GetDeviceSubject(params.TenantID, params.Device.ID)
1✔
825

1✔
826
        timeout := time.NewTimer(fileTransferTimeout)
1✔
827
        data := make([]byte, fileTransferBufferSize)
1✔
828
        for {
2✔
829
                n, err := request.File.Read(data)
1✔
830
                if err != nil && err != io.EOF {
1✔
831
                        if err == io.ErrUnexpectedEOF {
×
832
                                *errorStatusCode = http.StatusBadRequest
×
833
                                *responseError = errors.New(
×
834
                                        "malformed request body: " +
×
835
                                                "did not find closing multipart boundary",
×
836
                                )
×
837
                        } else {
×
838
                                *responseError = err
×
839
                        }
×
840
                        return
×
841
                } else if n == 0 {
2✔
842
                        if err := h.publishFileTransferProtoMessage(params.SessionID,
1✔
843
                                params.UserID, deviceTopic, wsft.MessageTypeChunk, nil,
1✔
844
                                offset); err != nil {
1✔
845
                                *responseError = err
×
846
                                return
×
847
                        }
×
848
                        break
1✔
849
                }
850

851
                // send the chunk
852
                if err := h.publishFileTransferProtoMessage(params.SessionID,
1✔
853
                        params.UserID, deviceTopic, wsft.MessageTypeChunk, data[0:n],
1✔
854
                        offset); err != nil {
1✔
855
                        *responseError = err
×
856
                        return
×
857
                }
×
858

859
                // update the offset
860
                offset += int64(n)
1✔
861

1✔
862
                // wait for acks, in case the ack sliding window is over
1✔
863
                if offset > latestAckOffset+int64(fileTransferBufferSize*ackSlidingWindowRecv) {
2✔
864
                        timeout.Reset(fileTransferTimeout)
1✔
865
                        select {
1✔
866
                        case err := <-errorChan:
1✔
867
                                *errorStatusCode = http.StatusBadRequest
1✔
868
                                *responseError = err
1✔
869
                                return
1✔
870
                        case latestAckOffset = <-latestAckOffsets:
1✔
871
                        case <-timeout.C:
1✔
872
                                *errorStatusCode = http.StatusRequestTimeout
1✔
873
                                *responseError = errFileTransferTimeout
1✔
874
                                return
1✔
875
                        }
876
                } else {
×
877
                        // in case of error, report it
×
878
                        select {
×
879
                        case err := <-errorChan:
×
880
                                *errorStatusCode = http.StatusBadRequest
×
881
                                *responseError = err
×
882
                                return
×
883
                        default:
×
884
                        }
885
                }
886

887
        }
888

889
        for offset > latestAckOffset {
1✔
890
                timeout.Reset(fileTransferTimeout)
×
891
                select {
×
892
                case latestAckOffset = <-latestAckOffsets:
×
893
                case <-timeout.C:
×
894
                        *errorStatusCode = http.StatusRequestTimeout
×
895
                        *responseError = errFileTransferTimeout
×
896
                        return
×
897
                }
898
        }
899

900
        c.Writer.WriteHeader(http.StatusCreated)
1✔
901
}
902

903
func (h ManagementController) parseUploadFileRequest(c *gin.Context) (*model.UploadFileRequest,
904
        error) {
2✔
905
        reader, err := c.Request.MultipartReader()
2✔
906
        if err != nil {
3✔
907
                return nil, err
1✔
908
        }
1✔
909

910
        request := &model.UploadFileRequest{}
1✔
911
        for {
2✔
912
                part, err := reader.NextPart()
1✔
913
                if err == io.EOF {
2✔
914
                        break
1✔
915
                }
916
                if err != nil {
1✔
917
                        return nil, err
×
918
                }
×
919
                var n int
1✔
920
                data := make([]byte, fileTransferBufferSize)
1✔
921
                partName := part.FormName()
1✔
922
                switch partName {
1✔
923
                case fieldUploadPath, fieldUploadUID, fieldUploadGID, fieldUploadMode:
1✔
924
                        n, err = part.Read(data)
1✔
925
                        var value string
1✔
926
                        if err == nil || err == io.EOF {
2✔
927
                                value = string(data[:n])
1✔
928
                        }
1✔
929
                        switch partName {
1✔
930
                        case fieldUploadPath:
1✔
931
                                request.Path = &value
1✔
932
                        case fieldUploadUID:
1✔
933
                                v, err := strconv.Atoi(string(data[:n]))
1✔
934
                                if err != nil {
1✔
935
                                        return nil, err
×
936
                                }
×
937
                                nUID := uint32(v)
1✔
938
                                request.UID = &nUID
1✔
939
                        case fieldUploadGID:
1✔
940
                                v, err := strconv.Atoi(string(data[:n]))
1✔
941
                                if err != nil {
1✔
942
                                        return nil, err
×
943
                                }
×
944
                                nGID := uint32(v)
1✔
945
                                request.GID = &nGID
1✔
946
                        case fieldUploadMode:
1✔
947
                                v, err := strconv.ParseUint(string(data[:n]), 8, 32)
1✔
948
                                if err != nil {
1✔
949
                                        return nil, err
×
950
                                }
×
951
                                nMode := uint32(v)
1✔
952
                                request.Mode = &nMode
1✔
953
                        }
954
                        part.Close()
1✔
955
                case fieldUploadFile:
1✔
956
                        filename := part.FileName()
1✔
957
                        request.SrcPath = &filename
1✔
958
                        request.File = part
1✔
959
                }
960
                // file is the last part we can process, in order to avoid loading it in memory
961
                if request.File != nil {
2✔
962
                        break
1✔
963
                }
964
        }
965

966
        return request, nil
1✔
967
}
968

969
func (h ManagementController) UploadFile(c *gin.Context) {
2✔
970
        l := log.FromContext(c.Request.Context())
2✔
971

2✔
972
        params, statusCode, err := h.getFileTransferParams(c)
2✔
973
        if err != nil {
3✔
974
                l.Error(err.Error())
1✔
975
                c.JSON(statusCode, gin.H{"error": err.Error()})
1✔
976
                return
1✔
977
        }
1✔
978

979
        request, err := h.parseUploadFileRequest(c)
2✔
980
        if err != nil {
3✔
981
                l.Error(err.Error())
1✔
982
                c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
1✔
983
                return
1✔
984
        }
1✔
985

986
        if err := request.Validate(); err != nil {
2✔
987
                l.Error(err.Error())
1✔
988
                c.JSON(http.StatusBadRequest, gin.H{
1✔
989
                        "error": errors.Wrap(err, "bad request").Error(),
1✔
990
                })
1✔
991
                return
1✔
992
        }
1✔
993

994
        defer request.File.Close()
1✔
995

1✔
996
        ctx := c.Request.Context()
1✔
997
        if err := h.app.UploadFile(ctx, params.UserID, params.Device.ID,
1✔
998
                *request.Path); err != nil {
2✔
999
                l.Error(err)
1✔
1000
                c.JSON(http.StatusInternalServerError, gin.H{
1✔
1001
                        "error": errors.Wrap(err, "bad request").Error(),
1✔
1002
                })
1✔
1003
                return
1✔
1004
        }
1✔
1005

1006
        h.uploadFileResponse(c, params, request)
1✔
1007
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc