• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 978727395

24 Aug 2023 11:26AM UTC coverage: 79.085% (+0.2%) from 78.84%
978727395

push

gitlab-ci

lluiscampos
feat: Implement `http::DownloadResumer`

Implement class to download the Artifact, which will react to server
disconnections or other sorts of short read by scheduling new HTTP
requests with `Range` header.

See https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests for
an introduction to the feature, and read the specification for more
details.

The user calls _once_ `AsyncCall` with the header and body handlers, and
`DownloadResumer` will call back these handlers _once_ (each). The data
is passed to the user at operation completion.

The validation of the `Content-Range` header and the cases for the unit
tests are heavily inspired by the legacy client. See:
* https://github.com/mendersoftware/mender/blob/d9010526d35d3ac861ea1e4210d36c2fef748ef8/client/update_resumer.go#L113
* https://github.com/mendersoftware/mender/blob/d9010526d35d3ac861ea1e4210d36c2fef748ef8/client/update_resumer_test.go#L197

Ticket: MEN-6498
Changelog: None

Signed-off-by: Lluis Campos <lluis.campos@northern.tech>

231 of 231 new or added lines in 3 files covered. (100.0%)

5706 of 7215 relevant lines covered (79.09%)

278.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.33
/common/io/io.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/io.hpp>
16

17
#include <config.h>
18

19
#include <cerrno>
20
#include <cstdint>
21
#include <cstring>
22
#include <istream>
23
#include <memory>
24
#include <streambuf>
25
#include <vector>
26
#include <fstream>
27

28
namespace mender {
29
namespace common {
30
namespace io {
31

32
namespace error = mender::common::error;
33
namespace expected = mender::common::expected;
34

35
void AsyncReader::RepeatedAsyncRead(
2✔
36
        vector<uint8_t>::iterator start,
37
        vector<uint8_t>::iterator end,
38
        RepeatedAsyncIoHandler handler) {
39
        class Functor {
40
        public:
41
                AsyncReader &reader;
42
                vector<uint8_t>::iterator start;
43
                vector<uint8_t>::iterator end;
44
                RepeatedAsyncIoHandler handler;
45
                void ScheduleNextRead(Repeat repeat) {
2,115✔
46
                        while (repeat == Repeat::Yes) {
2,115✔
47
                                auto err = reader.AsyncRead(start, end, *this);
4,228✔
48
                                if (err == error::NoError) {
2,114✔
49
                                        break;
2,114✔
50
                                } else {
51
                                        repeat = handler(expected::unexpected(err));
×
52
                                }
53
                        }
54
                }
2,115✔
55
                void operator()(ExpectedSize num_read) {
2,113✔
56
                        auto repeat = handler(num_read);
2,113✔
57
                        ScheduleNextRead(repeat);
2,113✔
58
                }
2,113✔
59
        };
60
        Functor func {*this, start, end, handler};
4✔
61
        func.ScheduleNextRead(Repeat::Yes);
2✔
62
}
2✔
63

64
// Convenience overload: copies everything from `src` to `dst` using a
// temporary scratch buffer of MENDER_BUFSIZE bytes.
Error Copy(Writer &dst, Reader &src) {
    vector<uint8_t> scratch(MENDER_BUFSIZE);
    return Copy(dst, src, scratch);
}
68

69
// Copies data from `src` to `dst` through the caller-provided `buffer`, one
// buffer-full at a time, until `src` reports a zero-length read.
//
// Returns NoError once a read of 0 bytes is seen. Returns the underlying error
// if a read or write fails, a ProgrammingError if the reader claims to have
// produced more bytes than the buffer can hold, and an io_error if a write
// consumes zero bytes or fewer bytes than were read.
Error Copy(Writer &dst, Reader &src, vector<uint8_t> &buffer) {
    for (;;) {
        auto read_res = src.Read(buffer.begin(), buffer.end());
        if (!read_res) {
            return read_res.error();
        }

        const auto n_read = read_res.value();
        if (n_read == 0) {
            // Zero-length read: nothing more to copy.
            return NoError;
        }
        if (n_read > buffer.size()) {
            return error::MakeError(
                error::ProgrammingError,
                "Read returned more bytes than requested. This is a bug in the Read function.");
        }

        auto write_res = dst.Write(buffer.cbegin(), buffer.cbegin() + n_read);
        if (!write_res) {
            return write_res.error();
        }

        const auto n_written = write_res.value();
        if (n_written == 0) {
            // Should this even happen?
            return Error(std::error_condition(std::errc::io_error), "Zero write when copying data");
        }
        if (n_read != n_written) {
            return Error(
                std::error_condition(std::errc::io_error), "Short write when copying data");
        }
    }
}
94

95
// Copies the next unread bytes of the backing vector (`emitter_`) into
// [start, end) and advances the read position.
//
// Returns the number of bytes copied, which is the smaller of the destination
// range size and the number of bytes not yet read. Returns 0 once everything
// has been consumed.
ExpectedSize ByteReader::Read(vector<uint8_t>::iterator start, vector<uint8_t>::iterator end) {
    const Vsize total = emitter_->size();
    if (bytes_read_ == total) {
        return 0;
    }
    assert(end > start);

    const Vsize remaining {total - bytes_read_};
    const Vsize capacity {static_cast<Vsize>(end - start)};
    const Vsize count = min(capacity, remaining);

    const auto src_begin = std::next(emitter_->begin(), bytes_read_);
    std::copy(src_begin, std::next(src_begin, count), start);
    bytes_read_ += count;
    return count;
}
109

110
void ByteWriter::SetUnlimited(bool enabled) {
219✔
111
        unlimited_ = enabled;
219✔
112
}
219✔
113

114
// Appends the bytes in [start, end) to the receiving vector (`receiver_`) at
// the current write position and advances that position.
//
// Limited mode (default): writes at most as many bytes as still fit in the
// receiver's current size; fails with no_space_on_device when no room is left.
// Unlimited mode: the receiver is resized when needed so that the whole input
// always fits.
//
// Returns the number of bytes actually written.
ExpectedSize ByteWriter::Write(
    vector<uint8_t>::const_iterator start, vector<uint8_t>::const_iterator end) {
    assert(end > start);

    const Vsize room {receiver_->size() - bytes_written_};
    if (!unlimited_ && room == 0) {
        return expected::unexpected(Error(make_error_condition(errc::no_space_on_device), ""));
    }

    const Vsize requested {static_cast<Vsize>(end - start)};
    Vsize count = requested;
    if (unlimited_) {
        if (room < requested) {
            // Grow the receiver so the whole input fits after what was
            // already written.
            receiver_->resize(bytes_written_ + requested);
        }
    } else {
        // Truncate the write to the space that is left.
        count = min(requested, room);
    }

    // The destination iterator is taken after any resize above, since resizing
    // may reallocate the vector.
    std::copy_n(start, count, next(receiver_->begin(), bytes_written_));
    bytes_written_ += count;
    return count;
}
137

138

139
// Writes the bytes in [start, end) to the wrapped output stream.
//
// Returns the number of bytes in the range on success, or an io_error if the
// stream is in a failed state after the write.
ExpectedSize StreamWriter::Write(
    vector<uint8_t>::const_iterator start, vector<uint8_t>::const_iterator end) {
    const auto length = end - start;
    os_->write(reinterpret_cast<const char *>(&*start), length);
    if (os_->fail()) {
        return expected::unexpected(Error(make_error_condition(errc::io_error), ""));
    }
    return length;
}
147

148
class ReaderStreamBuffer : public streambuf {
149
public:
150
        ReaderStreamBuffer(Reader &reader) :
350✔
151
                reader_ {reader},
152
                buf_(buf_size_) {};
700✔
153
        streambuf::int_type underflow() override;
154

155
private:
156
        static const Vsize buf_size_ = MENDER_BUFSIZE;
157
        Reader &reader_;
158
        vector<uint8_t> buf_;
159
};
160

161
// Refills the get area from the Reader when it is exhausted and returns the
// next available character without consuming it, or eof if nothing could be
// read.
streambuf::int_type ReaderStreamBuffer::underflow() {
    // Get-area pointers:
    //   eback -- first char (byte) of the area
    //   gptr  -- current char (byte)
    //   egptr -- one past the last char (byte)
    //
    // underflow() is only invoked when gptr() == nullptr or gptr() >= egptr(),
    // that is, when there is nothing left to hand out.
    if (this->gptr() >= this->egptr()) {
        errno = 0;
        auto read_res = reader_.Read(buf_.begin(), buf_.end());

        streamsize available = 0;
        if (read_res) {
            available = read_res.value();
        } else if (errno == 0) {
            // underflow() has no channel for reporting errors; streams only
            // look at how much data arrived, and "no data" ends up as
            // eofbit/failbit. Callers that want the cause have to inspect
            // errno. If the failed Read() left errno set, we simply leave it
            // alone. Otherwise derive it from the error itself, falling back
            // to a generic I/O error when the error is not an errno-style
            // code.
            const auto &code = read_res.error().code;
            errno = (code.category() == generic_category()) ? code.value() : EIO;
        }

        auto *base = reinterpret_cast<streambuf::char_type *>(buf_.data());

        // Install the new get area: eback, gptr, egptr.
        this->setg(base, base, base + available);
    }

    if (this->gptr() == this->egptr()) {
        return std::char_traits<char>::eof();
    }
    return std::char_traits<char>::to_int_type(*this->gptr());
}
206

207
/**
208
 * A variant of the #istream class that takes ownership of the #streambuf buffer
209
 * created for it.
210
 *
211
 * @note Base #istream is designed to work on shared buffers so it doesn't
212
 *       destruct/delete the buffer.
213
 */
214
class istreamWithUniqueBuffer : public istream {
public:
    // Taking the buffer as a unique_ptr rvalue makes the ownership transfer
    // explicit at the call site: the caller must give up a temporary or a
    // std::move()'d pointer. The base istream only receives the raw pointer,
    // while the owning pointer is stored in this object and releases the
    // buffer through the implicitly generated destructor.
    istreamWithUniqueBuffer(unique_ptr<streambuf> &&owned_buf) :
        istream(owned_buf.get()),
        buf_ {std::move(owned_buf)} {
    }

private:
    // Owns the stream buffer the base istream reads from.
    unique_ptr<streambuf> buf_;
};
226

227
// Creates an input stream that pulls its data from this Reader. The returned
// stream owns its stream buffer, but the buffer only references this Reader,
// so the Reader must stay alive for as long as the stream is used.
unique_ptr<istream> Reader::GetStream() {
    unique_ptr<streambuf> buffer {new ReaderStreamBuffer(*this)};
    return unique_ptr<istream>(new istreamWithUniqueBuffer(std::move(buffer)));
}
231

232
// Opens the file at `path` for reading.
//
// Returns the opened stream, or an error built from the errno value observed
// right after the failed open.
ExpectedIfstream OpenIfstream(const string &path) {
    ifstream input;
    // Reset errno so that a stale value is not reported if open() fails
    // without setting it.
    errno = 0;
    input.open(path);
    if (input) {
        return ExpectedIfstream(std::move(input));
    }

    const int open_errno = errno;
    return ExpectedIfstream(expected::unexpected(error::Error(
        generic_category().default_error_condition(open_errno),
        "Failed to open '" + path + "' for reading")));
}
244

245
// Same as OpenIfstream(), but the opened stream is moved into a shared_ptr.
// Errors from OpenIfstream() are passed through unchanged.
ExpectedSharedIfstream OpenSharedIfstream(const string &path) {
    auto opened = OpenIfstream(path);
    if (opened) {
        return make_shared<ifstream>(std::move(opened.value()));
    }
    return expected::unexpected(opened.error());
}
252

253
// Opens the file at `path` for writing, using ios::app when `append` is true
// and ios::out otherwise.
//
// Returns the opened stream, or an error built from the errno value observed
// right after the failed open.
ExpectedOfstream OpenOfstream(const string &path, bool append) {
    ofstream output;
    // Reset errno so that a stale value is not reported if open() fails
    // without setting it.
    errno = 0;
    output.open(path, append ? ios::app : ios::out);
    if (output) {
        return ExpectedOfstream(std::move(output));
    }

    const int open_errno = errno;
    return ExpectedOfstream(expected::unexpected(error::Error(
        generic_category().default_error_condition(open_errno),
        "Failed to open '" + path + "' for writing")));
}
265

266
// Same as OpenOfstream(), but the opened stream is moved into a shared_ptr.
// Errors from OpenOfstream() are passed through unchanged.
ExpectedSharedOfstream OpenSharedOfstream(const string &path, bool append) {
    auto opened = OpenOfstream(path, append);
    if (opened) {
        return make_shared<ofstream>(std::move(opened.value()));
    }
    return expected::unexpected(opened.error());
}
273

274
// Writes the whole of `data` into `os`.
//
// Returns NoError on success. If the stream is in a failed state after the
// write, returns an error built from the errno value observed at that point.
error::Error WriteStringIntoOfstream(ofstream &os, const string &data) {
    errno = 0;
    os.write(data.data(), data.size());
    // fail() reports both failbit and badbit.
    if (!os.fail()) {
        return error::NoError;
    }

    const int write_errno = errno;
    return error::Error(
        std::generic_category().default_error_condition(write_errno),
        "Failed to write data into the stream");
}
286

287
// Reads up to (end - start) bytes from the wrapped input stream into
// [start, end).
//
// Returns the number of bytes actually read, which is smaller than requested
// (possibly 0) when the end of the stream is reached. Returns an error only
// when the stream reports an unrecoverable I/O failure (badbit).
ExpectedSize StreamReader::Read(vector<uint8_t>::iterator start, vector<uint8_t>::iterator end) {
    // Reset errno so that a value left over from an unrelated call is not
    // reported as the cause of a failure here.
    errno = 0;
    is_->read(reinterpret_cast<char *>(&*start), end - start);
    // Inspect the stream state, not the pointer holding the stream. Only
    // badbit is treated as an error: istream::read() sets eofbit and failbit
    // on a short read at end of stream, which is a normal outcome here and is
    // reported through the byte count.
    if (is_->bad()) {
        // Fall back to a generic I/O error if the failure did not set errno.
        const int io_error = (errno != 0) ? errno : EIO;
        return expected::unexpected(
            Error(std::generic_category().default_error_condition(io_error), ""));
    }
    return is_->gcount();
}
296

297
// Repositions the file stream at the beginning of the file, opening the file
// at `path_` first if no stream is held yet.
//
// Returns NoError on success; the error from opening the file; an io_error if
// the stream is already in a failed state; or an errno-based error if the seek
// itself fails.
//
// NOTE(review): a stream that was previously read up to end-of-file has
// failbit set, so it appears it would be rejected here as a "Bad stream" --
// confirm whether that is intended.
error::Error FileReader::Rewind() {
    if (!is_) {
        // No stream yet: open it lazily.
        auto opened = OpenSharedIfstream(path_);
        if (!opened) {
            return opened.error();
        }
        is_ = opened.value();
    }

    if (is_->fail()) {
        return Error(std::error_condition(std::errc::io_error), "Bad stream, cannot rewind");
    }

    errno = 0;
    is_->seekg(0, ios::beg);
    // Capture errno right after the seek, before anything else can change it.
    const int seek_errno = errno;
    if (is_->fail()) {
        return Error(
            generic_category().default_error_condition(seek_errno),
            "Failed to seek to the beginning of the stream");
    }
    return error::NoError;
}
318

319
} // namespace io
320
} // namespace common
321
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc