• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-connect / 920889739

pending completion
920889739

push

gitlab-ci

web-flow
Merge pull request #101 from lluiscampos/ci-update-unit-tests

ci: Update unit tests job to the latest template

2460 of 3151 relevant lines covered (78.07%)

6.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.74
/session/filetransfer.go
1
// Copyright 2021 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package session
16

17
import (
18
        "fmt"
19
        "io"
20
        "os"
21
        "path"
22
        "runtime"
23
        "strconv"
24
        "sync/atomic"
25
        "syscall"
26

27
        "github.com/pkg/errors"
28
        log "github.com/sirupsen/logrus"
29
        "github.com/vmihailenco/msgpack/v5"
30

31
        "github.com/mendersoftware/go-lib-micro/ws"
32
        wsft "github.com/mendersoftware/go-lib-micro/ws/filetransfer"
33

34
        "github.com/mendersoftware/mender-connect/config"
35
        "github.com/mendersoftware/mender-connect/limits/filetransfer"
36
        "github.com/mendersoftware/mender-connect/session/model"
37
)
38

39
const (
40
        ACKSlidingWindowSend = 10
41
        ACKSlidingWindowRecv = 20
42
        FileTransferBufSize  = 4096
43
)
44

45
var errFileTransferAbort = errors.New("handler aborted")
46

47
type FileTransferHandler struct {
48
        // mutex is used for protecting the async handler. A channel with cap(1)
49
        // is used instead of sync.Mutex to be able to test acquiring the
50
        // mutex without blocking.
51
        mutex chan struct{}
52
        // msgChan is used to pass messages down to the async file transfer handler routine.
53
        msgChan chan *ws.ProtoMsg
54
        permit  *filetransfer.Permit
55
}
56

57
// FileTransfer creates a new filetransfer constructor
58
func FileTransfer(limits config.Limits) Constructor {
33✔
59
        return func() SessionHandler {
66✔
60
                return &FileTransferHandler{
33✔
61
                        mutex:   make(chan struct{}, 1),
33✔
62
                        msgChan: make(chan *ws.ProtoMsg),
33✔
63
                        permit:  filetransfer.NewPermit(limits),
33✔
64
                }
33✔
65
        }
33✔
66
}
67

68
func (h *FileTransferHandler) Error(msg *ws.ProtoMsg, w ResponseWriter, err error) {
102✔
69
        errMsg := err.Error()
102✔
70
        msgErr := wsft.Error{
102✔
71
                Error:       &errMsg,
102✔
72
                MessageType: &msg.Header.MsgType,
102✔
73
        }
102✔
74
        rsp := *msg
102✔
75
        rsp.Header.MsgType = wsft.MessageTypeError
102✔
76
        rsp.Body, _ = msgpack.Marshal(msgErr)
102✔
77
        w.WriteProtoMsg(&rsp) //nolint:errcheck
102✔
78
}
102✔
79

80
func (h *FileTransferHandler) Close() error {
19✔
81
        close(h.msgChan)
19✔
82
        return nil
19✔
83
}
19✔
84

85
func (h *FileTransferHandler) ServeProtoMsg(msg *ws.ProtoMsg, w ResponseWriter) {
226✔
86
        switch msg.Header.MsgType {
226✔
87
        case wsft.MessageTypePut:
13✔
88
                _ = h.InitFileUpload(msg, w)
13✔
89

90
        case wsft.MessageTypeStat:
4✔
91
                h.StatFile(msg, w)
4✔
92

93
        case wsft.MessageTypeGet:
12✔
94
                _ = h.InitFileDownload(msg, w)
12✔
95

96
        case wsft.MessageTypeACK, wsft.MessageTypeChunk:
191✔
97
                // Messages are digested by async go-routine.
191✔
98
                select {
191✔
99
                // If we can grab the mutex, there are no async handlers running.
100
                case h.mutex <- struct{}{}:
80✔
101
                        <-h.mutex
80✔
102
                        h.Error(msg, w, errors.New("no file transfer in progress"))
80✔
103

104
                case h.msgChan <- msg:
111✔
105
                }
106

107
        case wsft.MessageTypeError:
5✔
108
                select {
5✔
109
                // If there's an active async handler, pass the error down,
110
                // otherwise, log the error.
111
                case h.mutex <- struct{}{}:
2✔
112
                        <-h.mutex
2✔
113
                        var erro wsft.Error
2✔
114
                        err := msgpack.Unmarshal(msg.Body, &erro)
2✔
115
                        if err != nil {
3✔
116
                                log.Errorf("Error decoding error message from client: %s", err.Error())
1✔
117
                        } else {
2✔
118
                                log.Errorf("Received error from client: %s", *erro.Error)
1✔
119
                        }
1✔
120
                case h.msgChan <- msg:
3✔
121
                }
122

123
        default:
1✔
124
                h.Error(msg, w, errors.Errorf(
1✔
125
                        "session: filetransfer message type '%s' not supported",
1✔
126
                        msg.Header.MsgType,
1✔
127
                ))
1✔
128
        }
129
}
130

131
func (h *FileTransferHandler) StatFile(msg *ws.ProtoMsg, w ResponseWriter) {
4✔
132
        var params model.StatFile
4✔
133
        err := msgpack.Unmarshal(msg.Body, &params)
4✔
134
        if err != nil {
5✔
135
                h.Error(msg, w, errors.Wrap(err, "malformed request parameters"))
1✔
136
                return
1✔
137
        } else if err = params.Validate(); err != nil {
5✔
138
                h.Error(msg, w, errors.Wrap(err, "invalid request parameters"))
1✔
139
                return
1✔
140
        }
1✔
141
        stat, err := os.Stat(*params.Path)
2✔
142
        if err != nil {
3✔
143
                h.Error(msg, w, errors.Wrapf(err,
1✔
144
                        "failed to get file info from path '%s'", *params.Path))
1✔
145
                return
1✔
146
        }
1✔
147
        mode := uint32(stat.Mode())
1✔
148
        size := stat.Size()
1✔
149
        modTime := stat.ModTime()
1✔
150
        fileInfo := wsft.FileInfo{
1✔
151
                Path:    params.Path,
1✔
152
                Size:    &size,
1✔
153
                Mode:    &mode,
1✔
154
                ModTime: &modTime,
1✔
155
        }
1✔
156
        if statT, ok := stat.Sys().(*syscall.Stat_t); ok {
2✔
157
                // Only return UID/GID if the filesystem/OS supports it
1✔
158
                fileInfo.UID = &statT.Uid
1✔
159
                fileInfo.GID = &statT.Gid
1✔
160
        }
1✔
161
        b, _ := msgpack.Marshal(fileInfo)
1✔
162

1✔
163
        err = w.WriteProtoMsg(&ws.ProtoMsg{
1✔
164
                Header: ws.ProtoHdr{
1✔
165
                        Proto:     ws.ProtoTypeFileTransfer,
1✔
166
                        MsgType:   wsft.MessageTypeFileInfo,
1✔
167
                        SessionID: msg.Header.SessionID,
1✔
168
                },
1✔
169
                Body: b,
1✔
170
        })
1✔
171
        if err != nil {
1✔
172
                log.Errorf("error sending FileInfo to client: %s", err.Error())
×
173
        }
×
174
}
175

176
// chunkWriter is used for packaging writes into ProtoMsg chunks before
177
// sending it on the connection.
178
type chunkWriter struct {
179
        SessionID string
180
        Offset    int64
181
        W         ResponseWriter
182
}
183

184
func (c *chunkWriter) Write(b []byte) (int, error) {
27✔
185
        msg := ws.ProtoMsg{
27✔
186
                Header: ws.ProtoHdr{
27✔
187
                        Proto:     ws.ProtoTypeFileTransfer,
27✔
188
                        MsgType:   wsft.MessageTypeChunk,
27✔
189
                        SessionID: c.SessionID,
27✔
190
                        Properties: map[string]interface{}{
27✔
191
                                "offset": c.Offset,
27✔
192
                        },
27✔
193
                },
27✔
194
                Body: b,
27✔
195
        }
27✔
196
        err := c.W.WriteProtoMsg(&msg)
27✔
197
        if err != nil {
27✔
198
                return 0, err
×
199
        }
×
200
        c.Offset += int64(len(b))
27✔
201
        return len(b), err
27✔
202
}
203

204
func (h *FileTransferHandler) InitFileDownload(msg *ws.ProtoMsg, w ResponseWriter) (err error) {
12✔
205
        var params model.GetFile
12✔
206
        defer func() {
24✔
207
                if err != nil {
17✔
208
                        log.Error(err.Error())
5✔
209
                        h.Error(msg, w, err)
5✔
210
                }
5✔
211
        }()
212
        if err = msgpack.Unmarshal(msg.Body, &params); err != nil {
13✔
213
                err = errors.Wrap(err, "malformed request parameters")
1✔
214
                return err
1✔
215
        } else if err = params.Validate(); err != nil {
13✔
216
                err = errors.Wrap(err, "invalid request parameters")
1✔
217
                return err
1✔
218
        } else if err = h.permit.DownloadFile(params); err != nil {
12✔
219
                log.Warnf("file download access denied: %s", err.Error())
1✔
220
                err = errors.Wrap(err, "access denied")
1✔
221
                return err
1✔
222
        }
1✔
223
        belowLimit := h.permit.BytesSent(uint64(0))
9✔
224
        if !belowLimit {
9✔
225
                log.Warnf("file download tx bytes limit reached.")
×
226
                return filetransfer.ErrTxBytesLimitExhausted
×
227
        }
×
228

229
        fd, err := os.Open(*params.Path)
9✔
230
        if err != nil {
10✔
231
                err = errors.Wrap(err, "failed to open file for reading")
1✔
232
                return err
1✔
233
        }
1✔
234
        select {
8✔
235
        case h.mutex <- struct{}{}:
7✔
236
                go h.DownloadHandler(fd, msg, w) //nolint:errcheck
7✔
237
        default:
1✔
238
                errClose := fd.Close()
1✔
239
                if errClose != nil {
1✔
240
                        log.Warnf("error closing file: %s", err.Error())
×
241
                }
×
242
                return errors.New("another file transfer is in progress")
1✔
243
        }
244
        return nil
7✔
245
}
246

247
func (h *FileTransferHandler) DownloadHandler(
248
        fd *os.File,
249
        msg *ws.ProtoMsg,
250
        w ResponseWriter,
251
) (err error) {
7✔
252
        var (
7✔
253
                ackOffset int64
7✔
254
                N         int64
7✔
255
        )
7✔
256
        defer func() {
14✔
257
                errClose := fd.Close()
7✔
258
                if errClose != nil {
7✔
259
                        log.Warnf("error closing file descriptor: %s", errClose.Error())
×
260
                }
×
261
                if err != nil && err != errFileTransferAbort {
10✔
262
                        h.Error(msg, w, err)
3✔
263
                        log.Error(err.Error())
3✔
264
                }
3✔
265
                <-h.mutex
7✔
266
        }()
267

268
        chunker := &chunkWriter{
7✔
269
                SessionID: msg.Header.SessionID,
7✔
270
                W:         w,
7✔
271
        }
7✔
272

7✔
273
        waitAck := func() (*ws.ProtoMsg, error) {
36✔
274
                msg, open := <-h.msgChan
29✔
275
                if !open {
29✔
276
                        return nil, errFileTransferAbort
×
277
                }
×
278
                switch msg.Header.MsgType {
29✔
279
                case wsft.MessageTypeACK:
26✔
280

281
                case wsft.MessageTypeError:
2✔
282
                        var erro wsft.Error
2✔
283
                        msgpack.Unmarshal(msg.Body, &erro) //nolint:errcheck
2✔
284
                        if erro.Error != nil {
3✔
285
                                log.Errorf("received error message from client: %s", *erro.Error)
1✔
286
                        } else {
2✔
287
                                log.Error("received malformed error message from client: aborting")
1✔
288
                        }
1✔
289
                        return msg, errFileTransferAbort
2✔
290

291
                default:
1✔
292
                        return msg, errors.Errorf(
1✔
293
                                "received unexpected message type '%s'; expected 'ack'",
1✔
294
                                msg.Header.MsgType,
1✔
295
                        )
1✔
296
                }
297
                if off, ok := msg.Header.Properties["offset"]; ok {
51✔
298
                        t, ok := off.(int64)
25✔
299
                        if !ok {
26✔
300
                                return msg, errors.New("invalid offset data type: require int64")
1✔
301
                        }
1✔
302
                        ackOffset = t
24✔
303
                } else {
1✔
304
                        return msg, errors.New("ack message: offset property cannot be blank")
1✔
305
                }
1✔
306
                return msg, nil
24✔
307
        }
308

309
        buf := make([]byte, FileTransferBufSize)
7✔
310
        for {
17✔
311
                windowBytes := ackOffset - chunker.Offset +
10✔
312
                        ACKSlidingWindowRecv*FileTransferBufSize
10✔
313
                if windowBytes > 0 {
19✔
314
                        N, err = io.CopyBuffer(chunker, io.LimitReader(fd, windowBytes), buf)
9✔
315
                        if err != nil {
9✔
316
                                err = errors.Wrap(err, "failed to copy file chunk to session")
×
317
                                return err
×
318
                        }
×
319
                        belowLimit := h.permit.BytesSent(uint64(N))
9✔
320
                        if !belowLimit {
9✔
321
                                log.Warnf("file download tx bytes limit reached.")
×
322
                                return filetransfer.ErrTxBytesLimitExhausted
×
323
                        }
×
324
                        if N < windowBytes {
16✔
325
                                break
7✔
326
                        }
327
                }
328

329
                msg, err = waitAck()
3✔
330
                if err != nil {
3✔
331
                        return err
×
332
                }
×
333
        }
334

335
        // Send EOF chunk
336
        err = w.WriteProtoMsg(&ws.ProtoMsg{
7✔
337
                Header: ws.ProtoHdr{
7✔
338
                        Proto:     ws.ProtoTypeFileTransfer,
7✔
339
                        MsgType:   wsft.MessageTypeChunk,
7✔
340
                        SessionID: msg.Header.SessionID,
7✔
341
                        Properties: map[string]interface{}{
7✔
342
                                "offset": chunker.Offset,
7✔
343
                        },
7✔
344
                },
7✔
345
        })
7✔
346
        if err != nil {
7✔
347
                log.Errorf("failed to send EOF message to client: %s", err.Error())
×
348
                return err
×
349
        }
×
350

351
        for ackOffset < chunker.Offset {
33✔
352
                msg, err = waitAck()
26✔
353
                if err != nil {
31✔
354
                        return err
5✔
355
                }
5✔
356
        }
357
        return nil
2✔
358
}
359

360
func (h *FileTransferHandler) InitFileUpload(msg *ws.ProtoMsg, w ResponseWriter) (err error) {
13✔
361
        var (
13✔
362
                defaultMode uint32 = 0644
13✔
363
                defaultUID  uint32 = uint32(os.Getuid())
13✔
364
                defaultGID  uint32 = uint32(os.Getgid())
13✔
365
        )
13✔
366
        params := model.UploadRequest{
13✔
367
                UID:  &defaultUID,
13✔
368
                GID:  &defaultGID,
13✔
369
                Mode: &defaultMode,
13✔
370
        }
13✔
371
        defer func() {
26✔
372
                if err != nil {
18✔
373
                        h.Error(msg, w, err)
5✔
374
                }
5✔
375
        }()
376

377
        err = msgpack.Unmarshal(msg.Body, &params)
13✔
378
        log.Debugf("InitFileUpload getting upload file: %+v", params)
13✔
379
        if err != nil {
14✔
380
                return errors.Wrap(err, "malformed request parameters")
1✔
381
        } else if err = params.Validate(); err != nil {
14✔
382
                return errors.Wrap(err, "invalid request parameters")
1✔
383
        } else if err = h.permit.UploadFile(params); err != nil {
13✔
384
                return errors.Wrap(err, "access denied")
1✔
385
        }
1✔
386

387
        belowLimit := h.permit.BytesReceived(uint64(0))
10✔
388
        if !belowLimit {
10✔
389
                log.Warnf("file upload rx bytes limit reached.")
×
390
                err = filetransfer.ErrTxBytesLimitExhausted
×
391
                return filetransfer.ErrTxBytesLimitExhausted
×
392
        }
×
393

394
StatAgain:
395
        if info, errStat := os.Lstat(*params.Path); errStat != nil {
19✔
396
                if !os.IsNotExist(errStat) {
8✔
397
                        err = errors.Wrap(errStat, "error checking file location")
×
398
                        return err
×
399
                }
×
400
        } else if info.IsDir() {
5✔
401
                if params.SrcPath != nil {
3✔
402
                        *params.Path = path.Join(*params.Path, path.Base(*params.SrcPath))
1✔
403
                        params.SrcPath = nil
1✔
404
                        goto StatAgain
1✔
405
                } else {
1✔
406
                        err = errors.New("conflicting file path: cannot overwrite directory")
1✔
407
                        return err
1✔
408
                }
1✔
409
        } else if !info.Mode().IsRegular() {
1✔
410
                err = errors.New(
×
411
                        "conflicting file path: cannot overwrite irregular file",
×
412
                )
×
413
                return err
×
414
        }
×
415

416
        select {
9✔
417
        case h.mutex <- struct{}{}:
8✔
418
                go h.FileUploadHandler(msg, params, w) //nolint:errcheck
8✔
419
        default:
1✔
420
                err = errors.New("another file transfer is in progress")
1✔
421
                return err
1✔
422
        }
423
        return nil
8✔
424
}
425

426
var atomicSuffix uint32
427

428
func createWrOnlyTempFile(params model.UploadRequest) (fd *os.File, err error) {
8✔
429
        for i := 0; i < 100; i++ {
16✔
430
                suffix := atomic.AddUint32(&atomicSuffix, 1)
8✔
431
                filename := *params.Path + fmt.Sprintf(".%08x%02x", suffix, i)
8✔
432
                fd, err = os.OpenFile(filename, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0200)
8✔
433
                if os.IsExist(err) {
8✔
434
                        continue
×
435
                } else if err != nil {
9✔
436
                        break
1✔
437
                }
438
                break
7✔
439
        }
440
        if err != nil {
9✔
441
                return nil, errors.Wrap(err, "failed to create file")
1✔
442
        }
1✔
443
        return fd, nil
7✔
444
}
445

446
func (h *FileTransferHandler) FileUploadHandler(
447
        msg *ws.ProtoMsg,
448
        params model.UploadRequest,
449
        w ResponseWriter,
450
) (err error) {
8✔
451
        var (
8✔
452
                fd      *os.File
8✔
453
                closeFd bool
8✔
454
        )
8✔
455
        defer func() {
16✔
456
                if fd != nil {
13✔
457
                        if closeFd {
10✔
458
                                errClose := fd.Close()
5✔
459
                                if errClose != nil {
5✔
460
                                        log.Warnf("error closing file: %s", errClose.Error())
×
461
                                }
×
462
                        }
463
                        errRm := os.Remove(fd.Name())
5✔
464
                        if errRm != nil {
5✔
465
                                log.Errorf(
×
466
                                        "error removing file after aborting upload: %s",
×
467
                                        errRm.Error(),
×
468
                                )
×
469
                        }
×
470
                }
471
                if err != nil {
14✔
472
                        log.Error(err.Error())
6✔
473
                        if errors.Cause(err) != errFileTransferAbort {
10✔
474
                                h.Error(msg, w, err)
4✔
475
                        }
4✔
476
                }
477
                <-h.mutex
8✔
478
        }()
479

480
        fd, err = createWrOnlyTempFile(params)
8✔
481
        if err != nil {
9✔
482
                h.Error(msg, w, errors.Wrap(err, "failed to create target file"))
1✔
483
                return err
1✔
484
        }
1✔
485
        closeFd = true
7✔
486

7✔
487
        err = w.WriteProtoMsg(&ws.ProtoMsg{
7✔
488
                Header: ws.ProtoHdr{
7✔
489
                        Proto:      ws.ProtoTypeFileTransfer,
7✔
490
                        MsgType:    wsft.MessageTypeACK,
7✔
491
                        SessionID:  msg.Header.SessionID,
7✔
492
                        Properties: map[string]interface{}{"offset": int64(0)},
7✔
493
                },
7✔
494
        })
7✔
495
        if err != nil {
8✔
496
                log.Errorf("failed to respond to client: %s", err.Error())
1✔
497
                return errFileTransferAbort
1✔
498
        }
1✔
499

500
        _, err = h.writeFile(w, fd)
6✔
501
        if err != nil {
10✔
502
                return err
4✔
503
        }
4✔
504
        // Set the final permissions and owner.
505
        err = fd.Chmod(os.FileMode(*params.Mode) & os.ModePerm)
2✔
506
        if err != nil {
2✔
507
                return errors.Wrap(err, "failed to set file permissions")
×
508
        }
×
509
        err = fd.Chown(int(*params.UID), int(*params.GID))
2✔
510
        if err != nil {
2✔
511
                return errors.Wrap(err, "failed to set file owner")
×
512
        }
×
513

514
        closeFd = false
2✔
515
        filename := fd.Name()
2✔
516
        errClose := fd.Close()
2✔
517
        if errClose != nil {
2✔
518
                log.Warnf("error closing file: %s", errClose.Error())
×
519
        }
×
520
        err = os.Rename(filename, *params.Path)
2✔
521
        if err != nil {
2✔
522
                return errors.Wrap(err, "failed to commit uploaded file")
×
523
        }
×
524

525
        err = h.permit.PreserveOwnerGroup(*params.Path, int(*params.UID), int(*params.GID))
2✔
526
        if err != nil {
2✔
527
                return errors.Wrap(err, "failed to preserve file owner/group "+
×
528
                        strconv.Itoa(int(*params.UID))+"/"+strconv.Itoa(int(*params.GID)))
×
529
        }
×
530

531
        err = h.permit.PreserveModes(*params.Path, os.FileMode(*params.Mode))
2✔
532
        if err != nil {
2✔
533
                return errors.Wrap(err, "failed to preserve file mode "+
×
534
                        "("+os.FileMode(*params.Mode).String()+")")
×
535
        }
×
536

537
        fd = nil
2✔
538
        return err
2✔
539
}
540

541
func (h *FileTransferHandler) dstWrite(dst *os.File, body []byte, offset int64) (int, error) {
79✔
542
        n, err := dst.Write(body)
79✔
543
        offset += int64(n)
79✔
544
        belowLimit := h.permit.BytesReceived(uint64(n))
79✔
545
        if !belowLimit || !h.permit.BelowMaxAllowedFileSize(offset) {
79✔
546
                log.Warnf("file upload rx bytes limit reached.")
×
547
                return n, filetransfer.ErrTxBytesLimitExhausted
×
548
        } else {
79✔
549
                return n, err
79✔
550
        }
79✔
551
}
552

553
func (h *FileTransferHandler) writeFile(w ResponseWriter, dst *os.File) (int64, error) {
6✔
554
        var (
6✔
555
                done   bool
6✔
556
                open   bool
6✔
557
                err    error
6✔
558
                i      int
6✔
559
                offset int64
6✔
560
                msg    *ws.ProtoMsg
6✔
561
        )
6✔
562
        // Convenience clojure for decoding file chunk and writing to destination file.
6✔
563
        writeChunk := func(msg *ws.ProtoMsg) error {
91✔
564
                switch msg.Header.MsgType {
85✔
565
                case wsft.MessageTypeChunk:
83✔
566

567
                case wsft.MessageTypeError:
1✔
568
                        var cerr wsft.Error
1✔
569
                        packErr := msgpack.Unmarshal(msg.Body, &cerr)
1✔
570
                        if packErr == nil && cerr.Error != nil {
2✔
571
                                log.Errorf("Received error during upload: %s", *cerr.Error)
1✔
572
                        } else {
1✔
573
                                log.Error("Received malformed error during upload: aborting")
×
574
                        }
×
575
                        return errFileTransferAbort
1✔
576

577
                default:
1✔
578
                        return errors.Errorf(
1✔
579
                                "received unexpected message type '%s' during file upload",
1✔
580
                                msg.Header.MsgType,
1✔
581
                        )
1✔
582
                }
583
                chunkOffset, ok := msg.Header.Properties["offset"].(int64)
83✔
584
                if !ok {
84✔
585
                        return errors.New("invalid file chunk message: missing offset property")
1✔
586
                } else {
83✔
587
                        if chunkOffset != offset {
83✔
588
                                return errors.New("received unexpected chunk offset")
1✔
589
                        }
1✔
590
                }
591
                if len(msg.Body) > 0 {
160✔
592
                        n, err := h.dstWrite(dst, msg.Body, offset)
79✔
593
                        offset += int64(n)
79✔
594
                        if err != nil {
79✔
595
                                return errors.Wrap(err, "failed to write file chunk")
×
596
                        }
×
597
                } else {
2✔
598
                        // EOF
2✔
599
                        return io.EOF
2✔
600
                }
2✔
601
                return nil
79✔
602
        }
603

604
        for !done {
20✔
605
                msg, open = <-h.msgChan
14✔
606
                if !open {
14✔
607
                        return offset, errFileTransferAbort
×
608
                }
×
609
                err = writeChunk(msg)
14✔
610
                if err == io.EOF {
14✔
611
                        done = true
×
612
                } else if err != nil {
18✔
613
                        return offset, err
4✔
614
                }
4✔
615
                // Receive up to ACKSlidingWindowSend file chunks before
616
                // responding with an ACK.
617
        InnerLoop:
10✔
618
                for i = 1; i < ACKSlidingWindowSend; i++ {
84✔
619
                        runtime.Gosched()
74✔
620
                        select {
74✔
621
                        case msg, open = <-h.msgChan:
71✔
622
                                if !open {
71✔
623
                                        return offset, errFileTransferAbort
×
624
                                }
×
625
                                err = writeChunk(msg)
71✔
626
                                if err == io.EOF {
73✔
627
                                        done = true
2✔
628
                                } else if err != nil {
71✔
629
                                        return offset, err
×
630
                                }
×
631
                        default:
3✔
632
                                break InnerLoop
3✔
633
                        }
634
                }
635

636
                // Copy message headers to response and change message type to ACK.
637
                rsp := &ws.ProtoMsg{Header: msg.Header}
10✔
638
                rsp.Header.MsgType = wsft.MessageTypeACK
10✔
639
                err = w.WriteProtoMsg(rsp)
10✔
640
                if err != nil {
10✔
641
                        log.Errorf("failed to ack file chunk: %s", err.Error())
×
642
                        return offset, errFileTransferAbort
×
643
                }
×
644
        }
645
        return offset, nil
2✔
646
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc