• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / deviceconnect / 1094563519

04 Dec 2023 02:24PM UTC coverage: 80.0% (+3.7%) from 76.254%
1094563519

push

gitlab-ci

web-flow
Merge pull request #336 from alfrunes/revert-ALV-182

Revert "Merge pull request #334 from alfrunes/ALV-182"

109 of 132 new or added lines in 1 file covered. (82.58%)

2 existing lines in 1 file now uncovered.

1508 of 1885 relevant lines covered (80.0%)

37.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.69
/api/http/management_filetransfer.go
1
// Copyright 2021 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package http
16

17
import (
18
        "fmt"
19
        "io"
20
        "net/http"
21
        "os"
22
        "path"
23
        "strconv"
24
        "strings"
25
        "time"
26

27
        "github.com/gin-gonic/gin"
28
        "github.com/google/uuid"
29
        natsio "github.com/nats-io/nats.go"
30
        "github.com/pkg/errors"
31
        "github.com/vmihailenco/msgpack/v5"
32

33
        "github.com/mendersoftware/go-lib-micro/identity"
34
        "github.com/mendersoftware/go-lib-micro/log"
35
        "github.com/mendersoftware/go-lib-micro/ws"
36
        wsft "github.com/mendersoftware/go-lib-micro/ws/filetransfer"
37

38
        "github.com/mendersoftware/deviceconnect/app"
39
        "github.com/mendersoftware/deviceconnect/model"
40
)
41

42
type fileTransferParams struct {
43
        TenantID  string
44
        UserID    string
45
        SessionID string
46
        Device    *model.Device
47
}
48

49
const (
50
        hdrContentType            = "Content-Type"
51
        hdrContentDisposition     = "Content-Disposition"
52
        hdrMenderFileTransferPath = "X-MEN-File-Path"
53
        hdrMenderFileTransferUID  = "X-MEN-File-UID"
54
        hdrMenderFileTransferGID  = "X-MEN-File-GID"
55
        hdrMenderFileTransferMode = "X-MEN-File-Mode"
56
        hdrMenderFileTransferSize = "X-MEN-File-Size"
57
)
58

59
const (
60
        fieldUploadPath = "path"
61
        fieldUploadUID  = "uid"
62
        fieldUploadGID  = "gid"
63
        fieldUploadMode = "mode"
64
        fieldUploadFile = "file"
65

66
        PropertyOffset = "offset"
67

68
        paramDownloadPath = "path"
69
)
70

71
var fileTransferPingInterval = 30 * time.Second
72
var fileTransferTimeout = 60 * time.Second
73
var fileTransferBufferSize = 4096
74
var ackSlidingWindowSend = 10
75
var ackSlidingWindowRecv = 20
76

77
var (
78
        errFileTransferMarshalling    = errors.New("failed to marshal the request")
79
        errFileTransferUnmarshalling  = errors.New("failed to unmarshal the request")
80
        errFileTransferPublishing     = errors.New("failed to publish the message")
81
        errFileTransferSubscribing    = errors.New("failed to subscribe to the mesages")
82
        errFileTransferTimeout        = errors.New("file transfer timed out")
83
        errFileTransferFailed         = errors.New("file transfer failed")
84
        errFileTransferNotImplemented = errors.New("file transfer not implemented on device")
85
        errFileTransferDisabled       = errors.New("file transfer disabled on device")
86
)
87

88
var newFileTransferSessionID = func() (uuid.UUID, error) {
×
89
        return uuid.NewRandom()
×
90
}
×
91

92
func (h ManagementController) getFileTransferParams(c *gin.Context) (*fileTransferParams, int,
93
        error) {
35✔
94
        ctx := c.Request.Context()
35✔
95

35✔
96
        idata := identity.FromContext(ctx)
35✔
97
        if idata == nil || !idata.IsUser {
37✔
98
                return nil, http.StatusUnauthorized, ErrMissingUserAuthentication
2✔
99
        }
2✔
100
        tenantID := idata.Tenant
33✔
101
        deviceID := c.Param("deviceId")
33✔
102

33✔
103
        device, err := h.app.GetDevice(ctx, tenantID, deviceID)
33✔
104
        if err == app.ErrDeviceNotFound {
35✔
105
                return nil, http.StatusNotFound, err
2✔
106
        } else if err != nil {
35✔
107
                return nil, http.StatusBadRequest, err
2✔
108
        } else if device.Status != model.DeviceStatusConnected {
33✔
109
                return nil, http.StatusConflict, app.ErrDeviceNotConnected
2✔
110
        }
2✔
111

112
        if c.Request.Method != http.MethodGet && c.Request.Body == nil {
29✔
113
                return nil, http.StatusBadRequest, errors.New("missing request body")
2✔
114
        }
2✔
115

116
        sessionID, err := newFileTransferSessionID()
25✔
117
        if err != nil {
25✔
118
                return nil, http.StatusInternalServerError,
×
119
                        errors.New("failed to generate session ID")
×
120
        }
×
121

122
        return &fileTransferParams{
25✔
123
                TenantID:  idata.Tenant,
25✔
124
                UserID:    idata.Subject,
25✔
125
                SessionID: sessionID.String(),
25✔
126
                Device:    device,
25✔
127
        }, 0, nil
25✔
128
}
129

130
func (h ManagementController) publishFileTransferProtoMessage(sessionID, userID, deviceTopic,
131
        msgType string, body interface{}, offset int64) error {
31✔
132
        var msgBody []byte
31✔
133
        if msgType == wsft.MessageTypeChunk && body != nil {
34✔
134
                msgBody = body.([]byte)
3✔
135
        } else if msgType == wsft.MessageTypeACK {
39✔
136
                msgBody = nil
8✔
137
        } else if body != nil {
47✔
138
                var err error
19✔
139
                msgBody, err = msgpack.Marshal(body)
19✔
140
                if err != nil {
19✔
141
                        return errors.Wrap(err, errFileTransferMarshalling.Error())
×
142
                }
×
143
        }
144
        proto := ws.ProtoTypeFileTransfer
31✔
145
        if msgType == ws.MessageTypePing || msgType == ws.MessageTypePong {
31✔
146
                proto = ws.ProtoTypeControl
×
147
        }
×
148
        msg := &ws.ProtoMsg{
31✔
149
                Header: ws.ProtoHdr{
31✔
150
                        Proto:     proto,
31✔
151
                        MsgType:   msgType,
31✔
152
                        SessionID: sessionID,
31✔
153
                        Properties: map[string]interface{}{
31✔
154
                                PropertyUserID: userID,
31✔
155
                        },
31✔
156
                },
31✔
157
                Body: msgBody,
31✔
158
        }
31✔
159
        if msgType == wsft.MessageTypeChunk || msgType == wsft.MessageTypeACK {
43✔
160
                msg.Header.Properties[PropertyOffset] = offset
12✔
161
        }
12✔
162
        data, err := msgpack.Marshal(msg)
31✔
163
        if err != nil {
31✔
164
                return errors.Wrap(err, errFileTransferMarshalling.Error())
×
165
        }
×
166

167
        err = h.nats.Publish(deviceTopic, data)
31✔
168
        if err != nil {
31✔
169
                return errors.Wrap(err, errFileTransferPublishing.Error())
×
170
        }
×
171
        return nil
31✔
172
}
173

174
func (h ManagementController) publishControlMessage(
175
        sessionID, deviceTopic, messageType string, body interface{},
176
) error {
39✔
177
        msg := &ws.ProtoMsg{
39✔
178
                Header: ws.ProtoHdr{
39✔
179
                        Proto:     ws.ProtoTypeControl,
39✔
180
                        MsgType:   messageType,
39✔
181
                        SessionID: sessionID,
39✔
182
                },
39✔
183
        }
39✔
184

39✔
185
        if body != nil {
56✔
186
                if b, ok := body.([]byte); ok {
17✔
187
                        msg.Body = b
×
188
                } else {
17✔
189
                        b, err := msgpack.Marshal(body)
17✔
190
                        if err != nil {
17✔
191
                                return errors.Wrap(errFileTransferMarshalling, err.Error())
×
192
                        }
×
193
                        msg.Body = b
17✔
194
                }
195
        }
196

197
        data, err := msgpack.Marshal(msg)
39✔
198
        if err != nil {
39✔
199
                return errors.Wrap(errFileTransferMarshalling, err.Error())
×
200
        }
×
201
        err = h.nats.Publish(deviceTopic, data)
39✔
202
        if err != nil {
39✔
203
                return errors.Wrap(errFileTransferPublishing, err.Error())
×
204
        }
×
205
        return err
39✔
206
}
207

208
func (h ManagementController) decodeFileTransferProtoMessage(data []byte) (*ws.ProtoMsg,
209
        interface{}, error) {
25✔
210
        msg := &ws.ProtoMsg{}
25✔
211
        err := msgpack.Unmarshal(data, msg)
25✔
212
        if err != nil {
25✔
213
                return nil, nil, errors.Wrap(err, errFileTransferUnmarshalling.Error())
×
214
        }
×
215

216
        switch msg.Header.MsgType {
25✔
217
        case wsft.MessageTypeError:
4✔
218
                msgBody := &wsft.Error{}
4✔
219
                err := msgpack.Unmarshal(msg.Body, msgBody)
4✔
220
                if err != nil {
4✔
221
                        return nil, nil, errors.Wrap(err, errFileTransferUnmarshalling.Error())
×
222
                }
×
223
                return msg, msgBody, nil
4✔
224
        case wsft.MessageTypeFileInfo:
6✔
225
                msgBody := &wsft.FileInfo{}
6✔
226
                err := msgpack.Unmarshal(msg.Body, msgBody)
6✔
227
                if err != nil {
6✔
228
                        return nil, nil, errors.Wrap(err, errFileTransferUnmarshalling.Error())
×
229
                }
×
230
                return msg, msgBody, nil
6✔
231
        case wsft.MessageTypeACK, wsft.MessageTypeChunk, ws.MessageTypePing, ws.MessageTypePong:
13✔
232
                return msg, nil, nil
13✔
233
        }
234

235
        return nil, nil, errors.Errorf("unexpected message type '%s'", msg.Header.MsgType)
2✔
236
}
237

238
func writeHeaders(c *gin.Context, fileInfo *wsft.FileInfo) {
5✔
239
        c.Writer.WriteHeader(http.StatusOK)
5✔
240
        c.Writer.Header().Add(hdrContentType, "application/octet-stream")
5✔
241
        if fileInfo.Path != nil {
10✔
242
                filename := path.Base(*fileInfo.Path)
5✔
243
                c.Writer.Header().Add(hdrContentDisposition,
5✔
244
                        "attachment; filename=\""+filename+"\"")
5✔
245
                c.Writer.Header().Add(hdrMenderFileTransferPath, *fileInfo.Path)
5✔
246
        }
5✔
247
        if fileInfo.UID != nil {
6✔
248
                c.Writer.Header().Add(hdrMenderFileTransferUID, fmt.Sprintf("%d", *fileInfo.UID))
1✔
249
        }
1✔
250
        if fileInfo.GID != nil {
6✔
251
                c.Writer.Header().Add(hdrMenderFileTransferGID, fmt.Sprintf("%d", *fileInfo.GID))
1✔
252
        }
1✔
253
        if fileInfo.Mode != nil {
10✔
254
                c.Writer.Header().Add(hdrMenderFileTransferMode, fmt.Sprintf("%o", *fileInfo.Mode))
5✔
255
        }
5✔
256
        if fileInfo.Size != nil {
10✔
257
                c.Writer.Header().Add(hdrMenderFileTransferSize, fmt.Sprintf("%d", *fileInfo.Size))
5✔
258
        }
5✔
259
}
260

261
func (h ManagementController) downloadFileResponseError(c *gin.Context,
262
        responseHeaderSent *bool, responseError *error) {
9✔
263
        l := log.FromContext(c.Request.Context())
9✔
264
        if !*responseHeaderSent && *responseError != nil {
13✔
265
                l.Error((*responseError).Error())
4✔
266
                status := http.StatusInternalServerError
4✔
267
                // errFileTranserFailed is a special case, we return 400 instead of 500
4✔
268
                if strings.Contains((*responseError).Error(), errFileTransferFailed.Error()) {
6✔
269
                        status = http.StatusBadRequest
2✔
270
                } else if *responseError == errFileTransferTimeout {
5✔
271
                        status = http.StatusRequestTimeout
1✔
272
                } else if *responseError == errFileTransferNotImplemented ||
2✔
273
                        *responseError == errFileTransferDisabled {
2✔
274
                        status = http.StatusBadGateway
1✔
275
                }
1✔
276
                c.JSON(status, gin.H{
4✔
277
                        "error": (*responseError).Error(),
4✔
278
                })
4✔
279
                return
4✔
280
        }
281
}
282

283
func (h ManagementController) downloadFileResponse(c *gin.Context, params *fileTransferParams,
284
        request *model.DownloadFileRequest) {
9✔
285
        // send a JSON-encoded error message in case of failure
9✔
286
        var responseError error
9✔
287
        var responseHeaderSent bool
9✔
288
        defer h.downloadFileResponseError(c, &responseHeaderSent, &responseError)
9✔
289

9✔
290
        // subscribe to messages from the device
9✔
291
        deviceTopic := model.GetDeviceSubject(params.TenantID, params.Device.ID)
9✔
292
        sessionTopic := model.GetSessionSubject(params.TenantID, params.SessionID)
9✔
293
        msgChan := make(chan *natsio.Msg, channelSize)
9✔
294
        sub, err := h.nats.ChanSubscribe(sessionTopic, msgChan)
9✔
295
        if err != nil {
9✔
NEW
296
                responseError = errors.Wrap(err, errFileTransferSubscribing.Error())
×
297
                return
×
298
        }
×
299

300
        if err = h.filetransferHandshake(msgChan, params.SessionID, deviceTopic); err != nil {
10✔
301
                responseError = err
1✔
302
                return
1✔
303
        }
1✔
304

305
        //nolint:errcheck
306
        defer sub.Unsubscribe()
8✔
307

8✔
308
        // stat the remote file
8✔
309
        req := wsft.StatFile{
8✔
310
                Path: request.Path,
8✔
311
        }
8✔
312
        if err := h.publishFileTransferProtoMessage(params.SessionID,
8✔
313
                params.UserID, deviceTopic, wsft.MessageTypeStat, req, 0); err != nil {
8✔
NEW
314
                responseError = err
×
315
                return
×
316
        }
×
317

318
        // Inform the device that we're closing the session
319
        //nolint:errcheck
320
        defer h.publishControlMessage(params.SessionID, deviceTopic, ws.MessageTypeClose, nil)
8✔
321

8✔
322
        ticker := time.NewTicker(fileTransferPingInterval)
8✔
323
        defer ticker.Stop()
8✔
324

8✔
325
        // handle messages from the device
8✔
326
        timeout := time.NewTimer(fileTransferTimeout)
8✔
327
        latestOffset := int64(0)
8✔
328
        numberOfChunks := 0
8✔
329
        var fileInfo wsft.FileInfo
8✔
330
        for {
35✔
331
                select {
27✔
332
                case wsMessage := <-msgChan:
17✔
333
                        // reset the timeout ticket
17✔
334
                        timeout.Reset(fileTransferTimeout)
17✔
335
                        // process the message
17✔
336
                        err := h.downloadFileResponseProcessMessage(c, params, request,
17✔
337
                                wsMessage, deviceTopic, &latestOffset, &numberOfChunks,
17✔
338
                                &responseHeaderSent, &fileInfo, ticker)
17✔
339
                        if err == io.EOF {
19✔
340
                                return
2✔
341
                        } else if err != nil {
21✔
342
                                responseError = err
4✔
343
                                return
4✔
344
                        }
4✔
345
                // send a Ping message to keep the session alive
346
                case <-ticker.C:
8✔
347
                        responseError = h.publishControlMessage(
8✔
348
                                params.SessionID, deviceTopic, ws.MessageTypePing, nil,
8✔
349
                        )
8✔
350
                        if responseError != nil {
8✔
NEW
351
                                return
×
UNCOV
352
                        }
×
353

354
                // no message after timeout expired, stop here
355
                case <-timeout.C:
2✔
356
                        responseError = errFileTransferTimeout
2✔
357
                        return
2✔
358
                }
359
        }
360
}
361

362
func (h ManagementController) downloadFileResponseProcessMessage(c *gin.Context,
363
        params *fileTransferParams, request *model.DownloadFileRequest, wsMessage *natsio.Msg,
364
        deviceTopic string, latestOffset *int64, numberOfChunks *int, responseHeaderSent *bool,
365
        fileInfo *wsft.FileInfo, ticker *time.Ticker) error {
17✔
366
        msg, msgBody, err := h.decodeFileTransferProtoMessage(wsMessage.Data)
17✔
367
        if err != nil {
17✔
NEW
368
                return err
×
NEW
369
        }
×
370

371
        // process incoming messages from the device by type
372
        switch msg.Header.MsgType {
17✔
373

374
        // error message, stop here
375
        case wsft.MessageTypeError:
2✔
376
                errorMsg := msgBody.(*wsft.Error)
2✔
377
                if *errorMsg.MessageType == wsft.MessageTypeStat {
4✔
378
                        return errors.Wrap(errors.New(*errorMsg.Error),
2✔
379
                                errFileTransferFailed.Error())
2✔
380
                } else {
2✔
NEW
381
                        return errors.New(*errorMsg.Error)
×
NEW
382
                }
×
383

384
        // file stat response, if okay, let's get the file
385
        case wsft.MessageTypeFileInfo:
6✔
386
                req := wsft.GetFile{
6✔
387
                        Path: request.Path,
6✔
388
                }
6✔
389
                if err := h.publishFileTransferProtoMessage(params.SessionID,
6✔
390
                        params.UserID, deviceTopic, wsft.MessageTypeGet,
6✔
391
                        req, 0); err != nil {
6✔
NEW
392
                        return err
×
NEW
393
                }
×
394
                *fileInfo = *msgBody.(*wsft.FileInfo)
6✔
395
                if (os.FileMode(*fileInfo.Mode) & os.ModeType) != 0 {
7✔
396
                        err := errors.New("path is not a regular file")
1✔
397
                        return errors.Wrap(err, errFileTransferFailed.Error())
1✔
398
                }
1✔
399

400
        // file data chunk
401
        case wsft.MessageTypeChunk:
9✔
402
                if !*responseHeaderSent {
14✔
403
                        writeHeaders(c, fileInfo)
5✔
404
                        *responseHeaderSent = true
5✔
405
                }
5✔
406
                if msg.Body == nil {
11✔
407
                        if err := h.publishFileTransferProtoMessage(
2✔
408
                                params.SessionID, params.UserID, deviceTopic,
2✔
409
                                wsft.MessageTypeACK, nil,
2✔
410
                                *latestOffset); err != nil {
2✔
NEW
411
                                return err
×
NEW
412
                        }
×
413
                        return io.EOF
2✔
414
                }
415

416
                // verify the offset property
417
                propOffset, _ := msg.Header.Properties[PropertyOffset].(int64)
7✔
418
                if propOffset != *latestOffset {
8✔
419
                        return errors.Wrap(errFileTransferFailed,
1✔
420
                                "wrong offset received")
1✔
421
                }
1✔
422
                *latestOffset += int64(len(msg.Body))
6✔
423

6✔
424
                _, err := c.Writer.Write(msg.Body)
6✔
425
                if err != nil {
6✔
NEW
426
                        return err
×
NEW
427
                }
×
428

429
                (*numberOfChunks)++
6✔
430
                if *numberOfChunks >= ackSlidingWindowSend {
12✔
431
                        if err := h.publishFileTransferProtoMessage(
6✔
432
                                params.SessionID, params.UserID, deviceTopic,
6✔
433
                                wsft.MessageTypeACK, nil,
6✔
434
                                *latestOffset); err != nil {
6✔
NEW
435
                                return err
×
436
                        }
×
437
                        *numberOfChunks = 0
6✔
438
                }
439

NEW
440
        case ws.MessageTypePing:
×
NEW
441
                if err := h.publishFileTransferProtoMessage(
×
NEW
442
                        params.SessionID, params.UserID, deviceTopic,
×
NEW
443
                        ws.MessageTypePong, nil,
×
NEW
444
                        -1); err != nil {
×
NEW
445
                        return err
×
UNCOV
446
                }
×
NEW
447
                fallthrough
×
448

NEW
449
        case ws.MessageTypePong:
×
NEW
450
                ticker.Reset(fileTransferPingInterval)
×
451
        }
452

453
        return nil
11✔
454
}
455

456
func (h ManagementController) DownloadFile(c *gin.Context) {
17✔
457
        l := log.FromContext(c.Request.Context())
17✔
458

17✔
459
        params, statusCode, err := h.getFileTransferParams(c)
17✔
460
        if err != nil {
21✔
461
                l.Error(err)
4✔
462
                c.JSON(statusCode, gin.H{"error": err.Error()})
4✔
463
                return
4✔
464
        }
4✔
465

466
        path := c.Request.URL.Query().Get(paramDownloadPath)
13✔
467
        request := &model.DownloadFileRequest{
13✔
468
                Path: &path,
13✔
469
        }
13✔
470

13✔
471
        if err := request.Validate(); err != nil {
16✔
472
                l.Error(err)
3✔
473
                c.JSON(http.StatusBadRequest, gin.H{
3✔
474
                        "error": errors.Wrap(err, "bad request").Error(),
3✔
475
                })
3✔
476
                return
3✔
477
        }
3✔
478

479
        ctx := c.Request.Context()
10✔
480
        if err := h.app.DownloadFile(ctx, params.UserID, params.Device.ID,
10✔
481
                *request.Path); err != nil {
11✔
482
                l.Error(err)
1✔
483
                c.JSON(http.StatusInternalServerError, gin.H{
1✔
484
                        "error": errors.Wrap(err, "bad request").Error(),
1✔
485
                })
1✔
486
                return
1✔
487
        }
1✔
488

489
        h.downloadFileResponse(c, params, request)
9✔
490
}
491

492
func (h ManagementController) uploadFileResponseHandleInboundMessages(
493
        c *gin.Context, params *fileTransferParams,
494
        msgChan chan *natsio.Msg, errorChan chan error,
495
        latestAckOffsets chan int64,
496
) {
3✔
497
        var latestAckOffset int64
3✔
498
        deviceTopic := model.GetDeviceSubject(params.TenantID, params.Device.ID)
3✔
499
        for {
7✔
500
                select {
4✔
501
                case wsMessage := <-msgChan:
4✔
502
                        msg, msgBody, err := h.decodeFileTransferProtoMessage(
4✔
503
                                wsMessage.Data)
4✔
504
                        if err != nil {
6✔
505
                                errorChan <- err
2✔
506
                                return
2✔
507
                        }
2✔
508

509
                        // process incoming messages from the device by type
510
                        switch msg.Header.MsgType {
2✔
511

512
                        // error message, stop here
513
                        case wsft.MessageTypeError:
1✔
514
                                errorMsg := msgBody.(*wsft.Error)
1✔
515
                                errorChan <- errors.New(*errorMsg.Error)
1✔
516
                                return
1✔
517

518
                        // you can continue the upload
519
                        case wsft.MessageTypeACK:
1✔
520
                                propValue := msg.Header.Properties[PropertyOffset]
1✔
521
                                propOffset, _ := propValue.(int64)
1✔
522
                                if propOffset > latestAckOffset {
2✔
523
                                        latestAckOffset = propOffset
1✔
524
                                        select {
1✔
525
                                        case latestAckOffsets <- latestAckOffset:
1✔
526
                                        case <-latestAckOffsets:
×
527
                                                // Replace ack offset with the latest one
×
528
                                                latestAckOffsets <- latestAckOffset
×
529
                                        }
530
                                }
531

532
                        // handle ping messages
533
                        case ws.MessageTypePing:
×
534
                                if err := h.publishFileTransferProtoMessage(
×
535
                                        params.SessionID, params.UserID, deviceTopic,
×
536
                                        ws.MessageTypePong, nil,
×
537
                                        -1); err != nil {
×
538
                                        errorChan <- err
×
539
                                }
×
540
                        }
541
                case <-c.Done():
×
542
                        return
×
543
                }
544
        }
545
}
546

547
// filetransferHandshake initiates a handshake and checks that the device
548
// is willing to accept file transfer requests.
549
func (h ManagementController) filetransferHandshake(
550
        sessChan <-chan *natsio.Msg, sessionID, deviceTopic string,
551
) error {
17✔
552
        if err := h.publishControlMessage(
17✔
553
                sessionID, deviceTopic,
17✔
554
                ws.MessageTypeOpen, ws.Open{
17✔
555
                        Versions: []int{ws.ProtocolVersion},
17✔
556
                }); err != nil {
17✔
557
                return errFileTransferPublishing
×
558
        }
×
559
        select {
17✔
560
        case natsMsg := <-sessChan:
16✔
561
                var msg ws.ProtoMsg
16✔
562
                err := msgpack.Unmarshal(natsMsg.Data, &msg)
16✔
563
                if err != nil {
16✔
564
                        return errFileTransferUnmarshalling
×
565
                }
×
566

567
                if msg.Header.MsgType == ws.MessageTypeError {
17✔
568
                        erro := new(ws.Error)
1✔
569
                        //nolint:errcheck
1✔
570
                        msgpack.Unmarshal(natsMsg.Data, erro)
1✔
571
                        return errors.Errorf("handshake error from client: %s", erro.Error)
1✔
572
                } else if msg.Header.MsgType != ws.MessageTypeAccept {
17✔
573
                        return errFileTransferNotImplemented
1✔
574
                }
1✔
575
                accept := new(ws.Accept)
14✔
576
                err = msgpack.Unmarshal(msg.Body, accept)
14✔
577
                if err != nil {
14✔
578
                        return errFileTransferUnmarshalling
×
579
                }
×
580

581
                for _, proto := range accept.Protocols {
29✔
582
                        if proto == ws.ProtoTypeFileTransfer {
28✔
583
                                return nil
13✔
584
                        }
13✔
585
                }
586
                // Let's try to be polite and close the session before returning
587
                //nolint:errcheck
588
                h.publishControlMessage(sessionID, deviceTopic, ws.MessageTypeClose, nil)
1✔
589
                return errFileTransferDisabled
1✔
590

591
        case <-time.After(fileTransferTimeout):
1✔
592
                return errFileTransferTimeout
1✔
593
        }
594
}
595

596
func (h ManagementController) uploadFileResponse(c *gin.Context, params *fileTransferParams,
597
        request *model.UploadFileRequest) {
8✔
598
        l := log.FromContext(c.Request.Context())
8✔
599

8✔
600
        // send a JSON-encoded error message in case of failure
8✔
601
        var responseError error
8✔
602
        errorStatusCode := http.StatusInternalServerError
8✔
603
        defer func() {
16✔
604
                if responseError != nil {
15✔
605
                        l.Error(responseError.Error())
7✔
606
                        c.JSON(errorStatusCode, gin.H{
7✔
607
                                "error": responseError.Error(),
7✔
608
                        })
7✔
609
                        return
7✔
610
                }
7✔
611
        }()
612

613
        // subscribe to messages from the device
614
        deviceTopic := model.GetDeviceSubject(params.TenantID, params.Device.ID)
8✔
615
        sessionTopic := model.GetSessionSubject(params.TenantID, params.SessionID)
8✔
616
        msgChan := make(chan *natsio.Msg, channelSize)
8✔
617
        sub, err := h.nats.ChanSubscribe(sessionTopic, msgChan)
8✔
618
        if err != nil {
8✔
619
                responseError = errors.Wrap(err, errFileTransferSubscribing.Error())
×
620
                return
×
621
        }
×
622

623
        //nolint:errcheck
624
        defer sub.Unsubscribe()
8✔
625

8✔
626
        if err = h.filetransferHandshake(msgChan, params.SessionID, deviceTopic); err != nil {
11✔
627
                switch err {
3✔
628
                case errFileTransferTimeout:
1✔
629
                        errorStatusCode = http.StatusRequestTimeout
1✔
630
                case errFileTransferNotImplemented, errFileTransferDisabled:
1✔
631
                        errorStatusCode = http.StatusBadGateway
1✔
632
                }
633
                responseError = err
3✔
634
                return
3✔
635
        }
636

637
        // Inform the device that we're closing the session
638
        //nolint:errcheck
639
        defer h.publishControlMessage(params.SessionID, deviceTopic, ws.MessageTypeClose, nil)
5✔
640

5✔
641
        // initialize the file transfer
5✔
642
        req := wsft.UploadRequest{
5✔
643
                SrcPath: request.SrcPath,
5✔
644
                Path:    request.Path,
5✔
645
                UID:     request.UID,
5✔
646
                GID:     request.GID,
5✔
647
                Mode:    request.Mode,
5✔
648
        }
5✔
649
        if err := h.publishFileTransferProtoMessage(params.SessionID,
5✔
650
                params.UserID, deviceTopic, wsft.MessageTypePut, req, 0); err != nil {
5✔
651
                responseError = err
×
652
                return
×
653
        }
×
654

655
        // receive the message from the device
656
        select {
5✔
657
        case wsMessage := <-msgChan:
4✔
658
                msg, msgBody, err := h.decodeFileTransferProtoMessage(wsMessage.Data)
4✔
659
                if err != nil {
4✔
660
                        responseError = err
×
661
                        return
×
662
                }
×
663

664
                // process incoming messages from the device by type
665
                switch msg.Header.MsgType {
4✔
666

667
                // error message, stop here
668
                case wsft.MessageTypeError:
1✔
669
                        errorMsg := msgBody.(*wsft.Error)
1✔
670
                        errorStatusCode = http.StatusBadRequest
1✔
671
                        responseError = errors.New(*errorMsg.Error)
1✔
672
                        return
1✔
673

674
                // you can continue the upload
675
                case wsft.MessageTypeACK:
3✔
676
                }
677

678
        // no message after timeout expired, stop here
679
        case <-time.After(fileTransferTimeout):
1✔
680
                errorStatusCode = http.StatusRequestTimeout
1✔
681
                responseError = errFileTransferTimeout
1✔
682
                return
1✔
683
        }
684

685
        // receive the ack message from the device
686
        latestAckOffsets := make(chan int64, 1)
3✔
687
        errorChan := make(chan error)
3✔
688
        go h.uploadFileResponseHandleInboundMessages(
3✔
689
                c, params, msgChan, errorChan, latestAckOffsets,
3✔
690
        )
3✔
691

3✔
692
        h.uploadFileResponseWriter(
3✔
693
                c, params, request, errorChan, latestAckOffsets, &errorStatusCode, &responseError,
3✔
694
        )
3✔
695
}
696

697
func (h ManagementController) uploadFileResponseWriter(c *gin.Context,
698
        params *fileTransferParams, request *model.UploadFileRequest,
699
        errorChan chan error, latestAckOffsets <-chan int64,
700
        errorStatusCode *int, responseError *error) {
3✔
701
        var (
3✔
702
                offset          int64
3✔
703
                latestAckOffset int64
3✔
704
        )
3✔
705
        deviceTopic := model.GetDeviceSubject(params.TenantID, params.Device.ID)
3✔
706

3✔
707
        timeout := time.NewTimer(fileTransferTimeout)
3✔
708
        data := make([]byte, fileTransferBufferSize)
3✔
709
        for {
7✔
710
                n, err := request.File.Read(data)
4✔
711
                if err != nil && err != io.EOF {
4✔
712
                        if err == io.ErrUnexpectedEOF {
×
713
                                *errorStatusCode = http.StatusBadRequest
×
714
                                *responseError = errors.New(
×
715
                                        "malformed request body: " +
×
716
                                                "did not find closing multipart boundary",
×
717
                                )
×
718
                        } else {
×
719
                                *responseError = err
×
720
                        }
×
721
                        return
×
722
                } else if n == 0 {
5✔
723
                        if err := h.publishFileTransferProtoMessage(params.SessionID,
1✔
724
                                params.UserID, deviceTopic, wsft.MessageTypeChunk, nil,
1✔
725
                                offset); err != nil {
1✔
726
                                *responseError = err
×
727
                                return
×
728
                        }
×
729
                        break
1✔
730
                }
731

732
                // send the chunk
733
                if err := h.publishFileTransferProtoMessage(params.SessionID,
3✔
734
                        params.UserID, deviceTopic, wsft.MessageTypeChunk, data[0:n],
3✔
735
                        offset); err != nil {
3✔
736
                        *responseError = err
×
737
                        return
×
738
                }
×
739

740
                // update the offset
741
                offset += int64(n)
3✔
742

3✔
743
                // wait for acks, in case the ack sliding window is over
3✔
744
                if offset > latestAckOffset+int64(fileTransferBufferSize*ackSlidingWindowRecv) {
6✔
745
                        timeout.Reset(fileTransferTimeout)
3✔
746
                        select {
3✔
747
                        case err := <-errorChan:
1✔
748
                                *errorStatusCode = http.StatusBadRequest
1✔
749
                                *responseError = err
1✔
750
                                return
1✔
751
                        case latestAckOffset = <-latestAckOffsets:
1✔
752
                        case <-timeout.C:
1✔
753
                                *errorStatusCode = http.StatusRequestTimeout
1✔
754
                                *responseError = errFileTransferTimeout
1✔
755
                                return
1✔
756
                        }
757
                } else {
×
758
                        // in case of error, report it
×
759
                        select {
×
760
                        case err := <-errorChan:
×
761
                                *errorStatusCode = http.StatusBadRequest
×
762
                                *responseError = err
×
763
                                return
×
764
                        default:
×
765
                        }
766
                }
767

768
        }
769

770
        for offset > latestAckOffset {
1✔
771
                timeout.Reset(fileTransferTimeout)
×
772
                select {
×
773
                case latestAckOffset = <-latestAckOffsets:
×
774
                case <-timeout.C:
×
775
                        *errorStatusCode = http.StatusRequestTimeout
×
776
                        *responseError = errFileTransferTimeout
×
777
                        return
×
778
                }
779
        }
780

781
        c.Writer.WriteHeader(http.StatusCreated)
1✔
782
}
783

784
func (h ManagementController) parseUploadFileRequest(c *gin.Context) (*model.UploadFileRequest,
785
        error) {
12✔
786
        reader, err := c.Request.MultipartReader()
12✔
787
        if err != nil {
12✔
788
                return nil, err
×
789
        }
×
790

791
        request := &model.UploadFileRequest{}
12✔
792
        for {
72✔
793
                part, err := reader.NextPart()
60✔
794
                if err == io.EOF {
61✔
795
                        break
1✔
796
                }
797
                if err != nil {
59✔
798
                        return nil, err
×
799
                }
×
800
                var n int
59✔
801
                data := make([]byte, fileTransferBufferSize)
59✔
802
                partName := part.FormName()
59✔
803
                switch partName {
59✔
804
                case fieldUploadPath, fieldUploadUID, fieldUploadGID, fieldUploadMode:
48✔
805
                        n, err = part.Read(data)
48✔
806
                        var value string
48✔
807
                        if err == nil || err == io.EOF {
96✔
808
                                value = string(data[:n])
48✔
809
                        }
48✔
810
                        switch partName {
48✔
811
                        case fieldUploadPath:
12✔
812
                                request.Path = &value
12✔
813
                        case fieldUploadUID:
12✔
814
                                v, err := strconv.Atoi(string(data[:n]))
12✔
815
                                if err != nil {
12✔
816
                                        return nil, err
×
817
                                }
×
818
                                nUID := uint32(v)
12✔
819
                                request.UID = &nUID
12✔
820
                        case fieldUploadGID:
12✔
821
                                v, err := strconv.Atoi(string(data[:n]))
12✔
822
                                if err != nil {
12✔
823
                                        return nil, err
×
824
                                }
×
825
                                nGID := uint32(v)
12✔
826
                                request.GID = &nGID
12✔
827
                        case fieldUploadMode:
12✔
828
                                v, err := strconv.ParseUint(string(data[:n]), 8, 32)
12✔
829
                                if err != nil {
12✔
830
                                        return nil, err
×
831
                                }
×
832
                                nMode := uint32(v)
12✔
833
                                request.Mode = &nMode
12✔
834
                        }
835
                        part.Close()
48✔
836
                case fieldUploadFile:
11✔
837
                        filename := part.FileName()
11✔
838
                        request.SrcPath = &filename
11✔
839
                        request.File = part
11✔
840
                }
841
                // file is the last part we can process, in order to avoid loading it in memory
842
                if request.File != nil {
70✔
843
                        break
11✔
844
                }
845
        }
846

847
        return request, nil
12✔
848
}
849

850
func (h ManagementController) UploadFile(c *gin.Context) {
18✔
851
        l := log.FromContext(c.Request.Context())
18✔
852

18✔
853
        params, statusCode, err := h.getFileTransferParams(c)
18✔
854
        if err != nil {
24✔
855
                l.Error(err.Error())
6✔
856
                c.JSON(statusCode, gin.H{"error": err.Error()})
6✔
857
                return
6✔
858
        }
6✔
859

860
        request, err := h.parseUploadFileRequest(c)
12✔
861
        if err != nil {
12✔
862
                l.Error(err.Error())
×
863
                c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
×
864
                return
×
865
        }
×
866

867
        if err := request.Validate(); err != nil {
15✔
868
                l.Error(err.Error())
3✔
869
                c.JSON(http.StatusBadRequest, gin.H{
3✔
870
                        "error": errors.Wrap(err, "bad request").Error(),
3✔
871
                })
3✔
872
                return
3✔
873
        }
3✔
874

875
        defer request.File.Close()
9✔
876

9✔
877
        ctx := c.Request.Context()
9✔
878
        if err := h.app.UploadFile(ctx, params.UserID, params.Device.ID,
9✔
879
                *request.Path); err != nil {
10✔
880
                l.Error(err)
1✔
881
                c.JSON(http.StatusInternalServerError, gin.H{
1✔
882
                        "error": errors.Wrap(err, "bad request").Error(),
1✔
883
                })
1✔
884
                return
1✔
885
        }
1✔
886

887
        h.uploadFileResponse(c, params, request)
8✔
888
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc