• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1057599036

01 Nov 2023 12:43PM UTC coverage: 80.276% (+0.07%) from 80.207%
1057599036

push

gitlab-ci

kacf
fix: Make sure Cancel is called if resumer body reader is destroyed.

This was already the case in most cases, since the inner reader would
call cancel, but not in case the reader was destroyed in between
retries, in which case there is no inner reader.

Changelog: None
Ticket: None

Signed-off-by: Kristian Amlie <kristian.amlie@northern.tech>

6 of 6 new or added lines in 1 file covered. (100.0%)

6911 of 8609 relevant lines covered (80.28%)

9347.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.95
/src/common/io/io.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/io.hpp>
16

17
#include <config.h>
18

19
#include <cerrno>
20
#include <cstdint>
21
#include <cstring>
22
#include <istream>
23
#include <memory>
24
#include <streambuf>
25
#include <vector>
26
#include <fstream>
27

28
namespace mender {
29
namespace common {
30
namespace io {
31

32
namespace error = mender::common::error;
33
namespace expected = mender::common::expected;
34

35
void AsyncReader::RepeatedAsyncRead(
3✔
36
        vector<uint8_t>::iterator start,
37
        vector<uint8_t>::iterator end,
38
        RepeatedAsyncIoHandler handler) {
39
        class Functor {
8,453✔
40
        public:
41
                AsyncReader &reader;
42
                vector<uint8_t>::iterator start;
43
                vector<uint8_t>::iterator end;
44
                RepeatedAsyncIoHandler handler;
45
                void ScheduleNextRead(Repeat repeat) {
3,173✔
46
                        while (repeat == Repeat::Yes) {
3,173✔
47
                                auto err = reader.AsyncRead(start, end, *this);
6,340✔
48
                                if (err == error::NoError) {
3,170✔
49
                                        break;
50
                                } else {
51
                                        repeat = handler(expected::unexpected(err));
×
52
                                }
53
                        }
54
                }
3,173✔
55
                void operator()(ExpectedSize num_read) {
3,170✔
56
                        auto repeat = handler(num_read);
3,170✔
57
                        ScheduleNextRead(repeat);
3,170✔
58
                }
3,170✔
59
        };
60
        Functor func {*this, start, end, handler};
3✔
61
        func.ScheduleNextRead(Repeat::Yes);
3✔
62
}
3✔
63

64
Error Copy(Writer &dst, Reader &src) {
198✔
65
        vector<uint8_t> buffer(MENDER_BUFSIZE);
198✔
66
        return Copy(dst, src, buffer);
396✔
67
}
68

69
Error Copy(Writer &dst, Reader &src, vector<uint8_t> &buffer) {
198✔
70
        while (true) {
71
                auto r_result = src.Read(buffer.begin(), buffer.end());
1,399✔
72
                if (!r_result) {
1,399✔
73
                        return r_result.error();
3✔
74
                } else if (r_result.value() == 0) {
1,396✔
75
                        return NoError;
192✔
76
                } else if (r_result.value() > buffer.size()) {
1,204✔
77
                        return error::MakeError(
78
                                error::ProgrammingError,
79
                                "Read returned more bytes than requested. This is a bug in the Read function.");
×
80
                }
81

82
                auto w_result = dst.Write(buffer.cbegin(), buffer.cbegin() + r_result.value());
1,204✔
83
                if (!w_result) {
1,204✔
84
                        return w_result.error();
1✔
85
                } else if (w_result.value() == 0) {
1,203✔
86
                        // Should this even happen?
87
                        return Error(std::error_condition(std::errc::io_error), "Zero write when copying data");
2✔
88
                } else if (r_result.value() != w_result.value()) {
1,202✔
89
                        return Error(
90
                                std::error_condition(std::errc::io_error), "Short write when copying data");
2✔
91
                }
92
        }
93
}
94

95
struct CopyData {
112✔
96
        CopyData(size_t limit) :
112✔
97
                buf(MENDER_BUFSIZE),
98
                limit {limit} {
112✔
99
        }
112✔
100

101
        vector<uint8_t> buf;
102
        size_t copied {0};
103
        size_t limit;
104
};
105

106
void AsyncCopy(
1✔
107
        Writer &dst, AsyncReader &src, function<void(Error)> finished_handler, size_t stop_after) {
108
        AsyncCopy(
1✔
109
                WriterPtr(&dst, [](Writer *) {}),
1✔
110
                AsyncReaderPtr(&src, [](AsyncReader *) {}),
1✔
111
                finished_handler,
112
                stop_after);
1✔
113
}
1✔
114

115
void AsyncCopy(
100✔
116
        WriterPtr dst, AsyncReaderPtr src, function<void(Error)> finished_handler, size_t stop_after) {
117
        auto data = make_shared<CopyData>(stop_after);
100✔
118
        class Functor {
119
        public:
120
                void operator()(ExpectedSize size) {
2,226✔
121
                        if (!size) {
2,226✔
122
                                CallFinishedHandler(size.error());
9✔
123
                                return;
79✔
124
                        }
125

126

127
                        if (*size == 0) {
2,217✔
128
                                CallFinishedHandler(error::NoError);
66✔
129
                                return;
66✔
130
                        }
131

132
                        auto n = writer->Write(data->buf.begin(), data->buf.begin() + *size);
2,151✔
133
                        if (!n) {
2,151✔
134
                                CallFinishedHandler(n.error());
2✔
135
                                return;
136
                        } else if (*n != *size) {
2,149✔
137
                                CallFinishedHandler(Error(
×
138
                                        make_error_condition(std::errc::io_error), "Short write when copying data"));
×
139
                                return;
×
140
                        }
141

142
                        data->copied += *n;
2,149✔
143

144
                        size_t to_copy = min(data->limit - data->copied, data->buf.size());
2,149✔
145
                        if (to_copy == 0) {
2,149✔
146
                                CallFinishedHandler(error::NoError);
2✔
147
                                return;
148
                        }
149

150
                        auto err = reader->AsyncRead(
151
                                data->buf.begin(),
152
                                data->buf.begin() + to_copy,
153
                                Functor {writer, reader, data, finished_handler});
6,441✔
154
                        if (err != error::NoError) {
2,147✔
155
                                CallFinishedHandler(err);
×
156
                        }
157
                }
158

159
                void CallFinishedHandler(const error::Error &err) {
79✔
160
                        // Sometimes the functor is kept on the event loop longer than we expect. It
161
                        // will eventually be destroyed, but destroy what we own now, so that
162
                        // we don't keep references to them after we're done.
163
                        auto handler = finished_handler;
79✔
164
                        *this = {};
79✔
165
                        handler(err);
158✔
166
                }
79✔
167

168
                WriterPtr writer;
169
                AsyncReaderPtr reader;
170
                shared_ptr<CopyData> data;
171
                function<void(Error)> finished_handler;
172
        };
173
        size_t to_copy = min(data->limit, data->buf.size());
200✔
174
        auto err = src->AsyncRead(
175
                data->buf.begin(), data->buf.begin() + to_copy, Functor {dst, src, data, finished_handler});
300✔
176
        if (err != error::NoError) {
100✔
177
                finished_handler(err);
×
178
        }
179
}
100✔
180

181
void AsyncCopy(
×
182
        AsyncWriter &dst, Reader &src, function<void(Error)> finished_handler, size_t stop_after) {
183
        AsyncCopy(
×
184
                AsyncWriterPtr(&dst, [](AsyncWriter *) {}),
×
185
                ReaderPtr(&src, [](Reader *) {}),
×
186
                finished_handler,
187
                stop_after);
×
188
}
×
189

190
void AsyncCopy(
4✔
191
        AsyncWriterPtr dst, ReaderPtr src, function<void(Error)> finished_handler, size_t stop_after) {
192
        auto data = make_shared<CopyData>(stop_after);
4✔
193

194
        class Functor {
195
        public:
196
                void operator()(ExpectedSize exp_written) {
157✔
197
                        if (!exp_written) {
157✔
198
                                CallFinishedHandler(exp_written.error());
×
199
                                return;
2✔
200
                        } else if (exp_written.value() != expected_written) {
157✔
201
                                CallFinishedHandler(Error(
×
202
                                        make_error_condition(std::errc::io_error), "Short write when copying data"));
×
203
                                return;
×
204
                        }
205

206
                        data->copied += *exp_written;
157✔
207

208
                        size_t to_copy = min(data->limit - data->copied, data->buf.size());
157✔
209
                        if (to_copy == 0) {
157✔
210
                                CallFinishedHandler(error::NoError);
×
211
                                return;
×
212
                        }
213

214
                        auto exp_read = reader->Read(data->buf.begin(), data->buf.begin() + to_copy);
157✔
215
                        if (!exp_read) {
157✔
216
                                CallFinishedHandler(exp_read.error());
×
217
                                return;
218
                        }
219
                        auto &read = exp_read.value();
157✔
220

221
                        if (read == 0) {
157✔
222
                                CallFinishedHandler(error::NoError);
2✔
223
                                return;
224
                        }
225

226
                        auto err = writer->AsyncWrite(
227
                                data->buf.begin(),
228
                                data->buf.begin() + read,
155✔
229
                                Functor {writer, reader, finished_handler, data, read});
465✔
230
                        if (err != error::NoError) {
155✔
231
                                CallFinishedHandler(err);
×
232
                        }
233
                }
234

235
                void CallFinishedHandler(const error::Error &err) {
2✔
236
                        // Sometimes the functor is kept on the event loop longer than we expect. It
237
                        // will eventually be destroyed, but destroy what we own now, so that
238
                        // we don't keep references to them after we're done.
239
                        auto handler = finished_handler;
2✔
240
                        *this = {};
2✔
241
                        handler(err);
4✔
242
                }
2✔
243

244
                AsyncWriterPtr writer;
245
                ReaderPtr reader;
246
                function<void(Error)> finished_handler;
247
                shared_ptr<CopyData> data;
248
                size_t expected_written;
249
        };
250

251
        Functor initial {dst, src, finished_handler, data, 0};
8✔
252
        initial(0);
8✔
253
}
4✔
254

255
void AsyncCopy(
×
256
        AsyncWriter &dst, AsyncReader &src, function<void(Error)> finished_handler, size_t stop_after) {
257
        AsyncCopy(
×
258
                AsyncWriterPtr(&dst, [](AsyncWriter *) {}),
×
259
                AsyncReaderPtr(&src, [](AsyncReader *) {}),
×
260
                finished_handler,
261
                stop_after);
×
262
}
×
263

264
class AsyncCopyReaderFunctor {
265
public:
266
        void operator()(io::ExpectedSize exp_size);
267

268
        void CallFinishedHandler(const error::Error &err) {
6✔
269
                // Sometimes the functor is kept on the event loop longer than we expect. It will
270
                // eventually be destroyed, but destroy what we own now, so that we don't keep
271
                // references to them after we're done.
272
                auto handler = finished_handler;
6✔
273
                *this = {};
6✔
274
                handler(err);
12✔
275
        }
6✔
276

277
        AsyncWriterPtr writer;
278
        AsyncReaderPtr reader;
279
        function<void(Error)> finished_handler;
280
        shared_ptr<CopyData> data;
281
};
282

283
class AsyncCopyWriterFunctor {
284
public:
285
        void operator()(io::ExpectedSize exp_size);
286

287
        void CallFinishedHandler(const error::Error &err) {
×
288
                // Sometimes the functor is kept on the event loop longer than we expect. It will
289
                // eventually be destroyed, but destroy what we own now, so that we don't keep
290
                // references to them after we're done.
291
                auto handler = finished_handler;
×
292
                *this = {};
×
293
                handler(err);
×
294
        }
×
295

296
        AsyncWriterPtr writer;
297
        AsyncReaderPtr reader;
298
        function<void(Error)> finished_handler;
299
        shared_ptr<CopyData> data;
300
        size_t expected_written;
301
};
302

303
void AsyncCopyReaderFunctor::operator()(io::ExpectedSize exp_size) {
160✔
304
        if (!exp_size) {
160✔
305
                CallFinishedHandler(exp_size.error());
6✔
306
                return;
6✔
307
        }
308
        if (exp_size.value() == 0) {
154✔
309
                CallFinishedHandler(error::NoError);
×
310
                return;
×
311
        }
312

313
        auto err = writer->AsyncWrite(
314
                data->buf.begin(),
315
                data->buf.begin() + exp_size.value(),
154✔
316
                AsyncCopyWriterFunctor {writer, reader, finished_handler, data, exp_size.value()});
616✔
317
        if (err != error::NoError) {
154✔
318
                CallFinishedHandler(err);
×
319
        }
320
}
321

322
void AsyncCopyWriterFunctor::operator()(io::ExpectedSize exp_size) {
153✔
323
        if (!exp_size) {
153✔
324
                CallFinishedHandler(exp_size.error());
×
325
                return;
×
326
        }
327
        if (exp_size.value() != expected_written) {
153✔
328
                CallFinishedHandler(
×
329
                        error::Error(make_error_condition(errc::io_error), "Short write in AsyncCopy"));
×
330
                return;
×
331
        }
332

333
        data->copied += *exp_size;
153✔
334

335
        size_t to_copy = min(data->limit - data->copied, data->buf.size());
153✔
336
        if (to_copy == 0) {
153✔
337
                CallFinishedHandler(error::NoError);
×
338
                return;
×
339
        }
340

341
        auto err = reader->AsyncRead(
342
                data->buf.begin(),
343
                data->buf.begin() + to_copy,
344
                AsyncCopyReaderFunctor {writer, reader, finished_handler, data});
612✔
345
        if (err != error::NoError) {
153✔
346
                CallFinishedHandler(err);
×
347
        }
348
}
349

350
void AsyncCopy(
8✔
351
        AsyncWriterPtr dst,
352
        AsyncReaderPtr src,
353
        function<void(Error)> finished_handler,
354
        size_t stop_after) {
355
        auto data = make_shared<CopyData>(stop_after);
8✔
356

357
        size_t to_copy = min(data->limit, data->buf.size());
16✔
358
        auto err = src->AsyncRead(
359
                data->buf.begin(),
360
                data->buf.begin() + to_copy,
361
                AsyncCopyReaderFunctor {dst, src, finished_handler, data});
32✔
362
        if (err != error::NoError) {
8✔
363
                finished_handler(err);
×
364
        }
365
}
8✔
366

367
void ByteWriter::SetUnlimited(bool enabled) {
225✔
368
        unlimited_ = enabled;
225✔
369
}
225✔
370

371
ExpectedSize ByteWriter::Write(
5,920✔
372
        vector<uint8_t>::const_iterator start, vector<uint8_t>::const_iterator end) {
373
        assert(end > start);
374
        Vsize max_write {receiver_->size() - bytes_written_};
5,920✔
375
        if (max_write == 0 && !unlimited_) {
5,920✔
376
                return expected::unexpected(Error(make_error_condition(errc::no_space_on_device), ""));
×
377
        }
378
        Vsize iterator_size {static_cast<Vsize>(end - start)};
5,920✔
379
        Vsize bytes_to_write;
380
        if (unlimited_) {
5,920✔
381
                bytes_to_write = iterator_size;
382
                if (max_write < bytes_to_write) {
5,886✔
383
                        receiver_->resize(bytes_written_ + bytes_to_write);
5,886✔
384
                        max_write = bytes_to_write;
385
                }
386
        } else {
387
                bytes_to_write = min(iterator_size, max_write);
34✔
388
        }
389
        auto it = next(receiver_->begin(), bytes_written_);
5,920✔
390
        std::copy_n(start, bytes_to_write, it);
5,920✔
391
        bytes_written_ += bytes_to_write;
5,920✔
392
        return bytes_to_write;
393
}
394

395

396
ExpectedSize StreamWriter::Write(
13✔
397
        vector<uint8_t>::const_iterator start, vector<uint8_t>::const_iterator end) {
398
        os_->write(reinterpret_cast<const char *>(&*start), end - start);
13✔
399
        if (!(*(os_.get()))) {
13✔
400
                return expected::unexpected(Error(make_error_condition(errc::io_error), ""));
×
401
        }
402
        return end - start;
403
}
404

405
class ReaderStreamBuffer : public streambuf {
406
public:
407
        ReaderStreamBuffer(Reader &reader) :
484✔
408
                reader_ {reader},
409
                buf_(buf_size_) {};
484✔
410
        streambuf::int_type underflow() override;
411

412
private:
413
        static const Vsize buf_size_ = MENDER_BUFSIZE;
414
        Reader &reader_;
415
        vector<uint8_t> buf_;
416
};
417

418
streambuf::int_type ReaderStreamBuffer::underflow() {
912✔
419
        // eback -- pointer to the first char (byte)
420
        // gptr  -- pointer to the current char (byte)
421
        // egptr -- pointer past the last char (byte)
422

423
        // This function is only called if gptr() == nullptr or gptr() >= egptr(),
424
        // i.e. if there's nothing more to read.
425
        if (this->gptr() >= this->egptr()) {
912✔
426
                errno = 0;
912✔
427
                auto ex_n_read = reader_.Read(buf_.begin(), buf_.end());
912✔
428
                streamsize n_read;
429
                if (ex_n_read) {
912✔
430
                        n_read = ex_n_read.value();
912✔
431
                } else {
432
                        // There is no way to return an error from underflow(), generally
433
                        // the streams only care about how much data was read. No data or
434
                        // less data then requested by the caller of istream.read() means
435
                        // eofbit and failbit are set. If the user code wants to get the
436
                        // error or check if there was an error, it needs to check errno.
437
                        //
438
                        // So as long as we don't clear errno after a failure in the
439
                        // reader_.Read() above, error handling works as usual and returning
440
                        // eof below is all that needs to happen here.
441
                        //
442
                        // In case errno is not set for some reason, let's try to get it
443
                        // from the error with a fallback to a generic I/O error.
444
                        if (errno == 0) {
×
445
                                if (ex_n_read.error().code.category() == generic_category()) {
×
446
                                        errno = ex_n_read.error().code.value();
×
447
                                } else {
448
                                        errno = EIO;
×
449
                                }
450
                        }
451
                        n_read = 0;
452
                }
453

454
                streambuf::char_type *first = reinterpret_cast<streambuf::char_type *>(buf_.data());
455

456
                // set eback, gptr, egptr
457
                this->setg(first, first, first + n_read);
912✔
458
        }
459

460
        return this->gptr() == this->egptr() ? std::char_traits<char>::eof()
912✔
461
                                                                                 : std::char_traits<char>::to_int_type(*this->gptr());
912✔
462
};
463

464
/**
465
 * A variant of the #istream class that takes ownership of the #streambuf buffer
466
 * created for it.
467
 *
468
 * @note Base #istream is designed to work on shared buffers so it doesn't
469
 *       destruct/delete the buffer.
470
 */
471
class istreamWithUniqueBuffer : public istream {
472
public:
473
        // The unique_ptr, &&buf and std::move() model this really nicely -- a
474
        // unique_ptr rvalue (i.e. temporary) is required and it's moved into the
475
        // object. The default destructor then takes care of cleaning up properly.
476
        istreamWithUniqueBuffer(unique_ptr<streambuf> &&buf) :
484✔
477
                istream(buf.get()),
478
                buf_ {std::move(buf)} {};
484✔
479

480
private:
481
        unique_ptr<streambuf> buf_;
482
};
483

484
unique_ptr<istream> Reader::GetStream() {
484✔
485
        return unique_ptr<istream>(
486
                new istreamWithUniqueBuffer(unique_ptr<ReaderStreamBuffer>(new ReaderStreamBuffer(*this))));
484✔
487
};
488

489
ExpectedIfstream OpenIfstream(const string &path) {
280✔
490
        ifstream is;
560✔
491
        errno = 0;
280✔
492
        is.open(path);
280✔
493
        if (!is) {
280✔
494
                int io_errno = errno;
8✔
495
                return ExpectedIfstream(expected::unexpected(error::Error(
8✔
496
                        generic_category().default_error_condition(io_errno),
16✔
497
                        "Failed to open '" + path + "' for reading")));
24✔
498
        }
499
        return ExpectedIfstream(std::move(is));
272✔
500
}
501

502
ExpectedSharedIfstream OpenSharedIfstream(const string &path) {
7✔
503
        auto exp_is = OpenIfstream(path);
7✔
504
        if (!exp_is) {
7✔
505
                return expected::unexpected(exp_is.error());
×
506
        }
507
        return make_shared<ifstream>(std::move(exp_is.value()));
14✔
508
}
509

510
ExpectedOfstream OpenOfstream(const string &path, bool append) {
1,042✔
511
        ofstream os;
2,084✔
512
        errno = 0;
1,042✔
513
        os.open(path, append ? ios::app : ios::out);
1,988✔
514
        if (!os) {
1,042✔
515
                int io_errno = errno;
1✔
516
                return ExpectedOfstream(expected::unexpected(error::Error(
1✔
517
                        generic_category().default_error_condition(io_errno),
2✔
518
                        "Failed to open '" + path + "' for writing")));
3✔
519
        }
520
        return os;
1,041✔
521
}
522

523
ExpectedSharedOfstream OpenSharedOfstream(const string &path, bool append) {
×
524
        auto exp_is = OpenOfstream(path, append);
×
525
        if (!exp_is) {
×
526
                return expected::unexpected(exp_is.error());
×
527
        }
528
        return make_shared<ofstream>(std::move(exp_is.value()));
×
529
}
530

531
error::Error WriteStringIntoOfstream(ofstream &os, const string &data) {
762✔
532
        errno = 0;
762✔
533
        os.write(data.data(), data.size());
762✔
534
        if (os.bad() || os.fail()) {
762✔
535
                int io_errno = errno;
1✔
536
                return error::Error(
537
                        std::generic_category().default_error_condition(io_errno),
2✔
538
                        "Failed to write data into the stream");
2✔
539
        }
540

541
        return error::NoError;
761✔
542
}
543

544
ExpectedSize StreamReader::Read(vector<uint8_t>::iterator start, vector<uint8_t>::iterator end) {
1,552✔
545
        is_->read(reinterpret_cast<char *>(&*start), end - start);
1,552✔
546
        if (!is_) {
1,552✔
547
                int io_error = errno;
×
548
                return expected::unexpected(
×
549
                        Error(std::generic_category().default_error_condition(io_error), ""));
×
550
        }
551
        return is_->gcount();
552
}
553

554
error::Error FileReader::Rewind() {
4✔
555
        if (!is_) {
4✔
556
                auto ex_is = OpenSharedIfstream(path_);
3✔
557
                if (!ex_is) {
3✔
558
                        return ex_is.error();
×
559
                }
560
                is_ = ex_is.value();
3✔
561
        }
562
        if (!(*is_)) {
4✔
563
                return Error(std::error_condition(std::errc::io_error), "Bad stream, cannot rewind");
×
564
        }
565
        errno = 0;
4✔
566
        is_->seekg(0, ios::beg);
4✔
567
        int io_errno = errno;
4✔
568
        if (!(*is_)) {
4✔
569
                return Error(
570
                        generic_category().default_error_condition(io_errno),
×
571
                        "Failed to seek to the beginning of the stream");
×
572
        }
573
        return error::NoError;
4✔
574
}
575

576
} // namespace io
577
} // namespace common
578
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc