• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1018307341

27 Sep 2023 12:04PM UTC coverage: 77.682% (-0.4%) from 78.076%
1018307341

push

gitlab-ci

vpodzime
fix: Wait repeatedly for I/O events on DBus file descriptors

ASIO's `stream_descriptor::async_read/wait/error()` all register
one-off handlers. We actually need to watch the given file
descriptors for all events until we cancel the watch.

We can do that by using functors re-registering themselves
whenever called.

Also, add a tip and commented code to DBus tests helping debug
DBus issues.

Ticket: MEN-6655
Changelog: none
Signed-off-by: Vratislav Podzimek <v.podzimek@mykolab.com>

17 of 17 new or added lines in 1 file covered. (100.0%)

6474 of 8334 relevant lines covered (77.68%)

11035.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.33
/common/io/io.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/io.hpp>
16

17
#include <config.h>
18

19
#include <cerrno>
20
#include <cstdint>
21
#include <cstring>
22
#include <istream>
23
#include <memory>
24
#include <streambuf>
25
#include <vector>
26
#include <fstream>
27

28
namespace mender {
29
namespace common {
30
namespace io {
31

32
namespace error = mender::common::error;
33
namespace expected = mender::common::expected;
34

35
void AsyncReader::RepeatedAsyncRead(
73✔
36
        vector<uint8_t>::iterator start,
37
        vector<uint8_t>::iterator end,
38
        RepeatedAsyncIoHandler handler) {
39
        class Functor {
40
        public:
41
                AsyncReader &reader;
42
                vector<uint8_t>::iterator start;
43
                vector<uint8_t>::iterator end;
44
                RepeatedAsyncIoHandler handler;
45
                void ScheduleNextRead(Repeat repeat) {
3,309✔
46
                        while (repeat == Repeat::Yes) {
3,309✔
47
                                auto err = reader.AsyncRead(start, end, *this);
6,472✔
48
                                if (err == error::NoError) {
3,236✔
49
                                        break;
3,236✔
50
                                } else {
51
                                        repeat = handler(expected::unexpected(err));
×
52
                                }
53
                        }
54
                }
3,309✔
55
                void operator()(ExpectedSize num_read) {
3,236✔
56
                        auto repeat = handler(num_read);
3,236✔
57
                        ScheduleNextRead(repeat);
3,236✔
58
                }
3,236✔
59
        };
60
        Functor func {*this, start, end, handler};
146✔
61
        func.ScheduleNextRead(Repeat::Yes);
73✔
62
}
73✔
63

64
// Copies everything from src to dst, using a temporary buffer of
// MENDER_BUFSIZE bytes allocated for the duration of the call.
Error Copy(Writer &dst, Reader &src) {
	vector<uint8_t> scratch(MENDER_BUFSIZE);
	return Copy(dst, src, scratch);
}
68

69
// Copies from src to dst through the caller-provided buffer until src.Read()
// reports zero bytes. Each Read() fills at most the whole buffer, and the
// bytes read are passed to a single dst.Write().
//
// Returns NoError once Read() returns 0, otherwise the first error met: a
// Read() or Write() error, a ProgrammingError if Read() claims more bytes
// than the buffer holds, or an io_error if Write() wrote nothing or fewer
// bytes than were read.
Error Copy(Writer &dst, Reader &src, vector<uint8_t> &buffer) {
	for (;;) {
		auto read_res = src.Read(buffer.begin(), buffer.end());
		if (!read_res) {
			return read_res.error();
		}

		const auto n_read = read_res.value();
		if (n_read == 0) {
			// Nothing more to read, we are done.
			return NoError;
		}
		if (n_read > buffer.size()) {
			return error::MakeError(
				error::ProgrammingError,
				"Read returned more bytes than requested. This is a bug in the Read function.");
		}

		auto write_res = dst.Write(buffer.cbegin(), buffer.cbegin() + n_read);
		if (!write_res) {
			return write_res.error();
		}

		const auto n_written = write_res.value();
		if (n_written == 0) {
			// Should this even happen?
			return Error(std::error_condition(std::errc::io_error), "Zero write when copying data");
		}
		if (n_written != n_read) {
			return Error(
				std::error_condition(std::errc::io_error), "Short write when copying data");
		}
	}
}
94

95
// Convenience overload taking references. The objects are wrapped in shared
// pointers with no-op deleters, i.e. the pointers do NOT own dst and src.
// NOTE(review): presumably the caller has to keep both alive until
// finished_handler has been called -- confirm against the header docs.
void AsyncCopy(Writer &dst, AsyncReader &src, function<void(Error)> finished_handler) {
	WriterPtr non_owning_dst(&dst, [](Writer *) {});
	AsyncReaderPtr non_owning_src(&src, [](AsyncReader *) {});
	AsyncCopy(std::move(non_owning_dst), std::move(non_owning_src), finished_handler);
}
×
101

102
// Asynchronously copies from src to dst in chunks of up to MENDER_BUFSIZE
// bytes. finished_handler is called exactly once per terminating event:
// with the error of a failed read or write, with an io_error on a short
// write, or with error::NoError once a read reports zero bytes.
void AsyncCopy(WriterPtr dst, AsyncReaderPtr src, function<void(Error)> finished_handler) {
	// The buffer is shared so that it stays alive for as long as any copy of
	// the handler below exists.
	auto buf = make_shared<vector<uint8_t>>(MENDER_BUFSIZE);

	// Called for every finished read; its return value tells
	// RepeatedAsyncRead() whether to schedule another one.
	auto on_chunk = [dst, src, buf, finished_handler](ExpectedSize size) {
		if (!size) {
			finished_handler(size.error());
			return Repeat::No;
		}

		const auto n_read = *size;
		if (n_read == 0) {
			finished_handler(error::NoError);
			return Repeat::No;
		}

		auto written = dst->Write(buf->begin(), buf->begin() + n_read);
		if (!written) {
			finished_handler(written.error());
			return Repeat::No;
		}
		if (*written != n_read) {
			finished_handler(Error(
				make_error_condition(std::errc::io_error), "Short write when copying data"));
			return Repeat::No;
		}

		return Repeat::Yes;
	};

	src->RepeatedAsyncRead(buf->begin(), buf->end(), on_chunk);
}
71✔
129

130
void ByteWriter::SetUnlimited(bool enabled) {
187✔
131
        unlimited_ = enabled;
187✔
132
}
187✔
133

134
// Copies bytes from [start, end) into the receiving vector, right after the
// bytes written by previous calls.
//
// Limited mode: at most as many bytes as still fit into the vector are
// copied; if nothing fits, a no_space_on_device error is returned.
// Unlimited mode: the vector is resized when the data does not fit, so the
// whole range is always copied.
//
// Returns the number of bytes copied. The range must not be empty (asserted).
ExpectedSize ByteWriter::Write(
	vector<uint8_t>::const_iterator start, vector<uint8_t>::const_iterator end) {
	assert(end > start);

	const Vsize available {receiver_->size() - bytes_written_};
	const Vsize requested {static_cast<Vsize>(end - start)};

	if (!unlimited_ && available == 0) {
		return expected::unexpected(Error(make_error_condition(errc::no_space_on_device), ""));
	}

	Vsize to_copy = requested;
	if (unlimited_) {
		if (available < requested) {
			// Make room for exactly the data we are about to store.
			receiver_->resize(bytes_written_ + requested);
		}
	} else {
		to_copy = min(requested, available);
	}

	std::copy_n(start, to_copy, next(receiver_->begin(), bytes_written_));
	bytes_written_ += to_copy;
	return to_copy;
}
157

158

159
// Writes the bytes in [start, end) to the wrapped output stream.
//
// Returns the number of bytes in the range on success, or an io_error if
// the stream is in a failed state after the write.
ExpectedSize StreamWriter::Write(
	vector<uint8_t>::const_iterator start, vector<uint8_t>::const_iterator end) {
	const auto count = end - start;
	// Only dereference `start` when there is data: for an empty range it may
	// be an end iterator, and dereferencing that is undefined behaviour.
	if (count > 0) {
		os_->write(reinterpret_cast<const char *>(&*start), count);
	}
	// Checked even for an empty range, so a stream that is already broken is
	// still reported.
	if (!(*(os_.get()))) {
		return expected::unexpected(Error(make_error_condition(errc::io_error), ""));
	}
	return count;
}
167

168
class ReaderStreamBuffer : public streambuf {
169
public:
170
        ReaderStreamBuffer(Reader &reader) :
480✔
171
                reader_ {reader},
172
                buf_(buf_size_) {};
960✔
173
        streambuf::int_type underflow() override;
174

175
private:
176
        static const Vsize buf_size_ = MENDER_BUFSIZE;
177
        Reader &reader_;
178
        vector<uint8_t> buf_;
179
};
180

181
// Provides the next character of the get area without consuming it,
// refilling the get area from the reader when it is exhausted.
//
// Returns eof() if the reader delivered no data (end of data or an error),
// otherwise the character at the current get position.
streambuf::int_type ReaderStreamBuffer::underflow() {
	// eback -- pointer to the first char (byte)
	// gptr  -- pointer to the current char (byte)
	// egptr -- pointer past the last char (byte)

	// Normally we only get here when gptr() == nullptr or gptr() >= egptr().
	// Should there still be unread data, just hand out the current char.
	if (this->gptr() < this->egptr()) {
		return std::char_traits<char>::to_int_type(*this->gptr());
	}

	errno = 0;
	auto ex_n_read = reader_.Read(buf_.begin(), buf_.end());

	streamsize n_read = 0;
	if (ex_n_read) {
		n_read = ex_n_read.value();
	} else if (errno == 0) {
		// underflow() has no way to return an error; streams only care about
		// how much data was read. No data, or less data than requested by the
		// caller of istream.read(), makes the stream set eofbit and failbit,
		// and user code that wants the reason has to check errno.
		//
		// So if reader_.Read() left errno set, there is nothing to do but
		// return eof below. Here errno was NOT set, so take it from the error
		// if it carries a generic (errno) code, with EIO as the fallback.
		const bool has_errno_code = ex_n_read.error().code.category() == generic_category();
		errno = has_errno_code ? ex_n_read.error().code.value() : EIO;
	}

	auto *first = reinterpret_cast<streambuf::char_type *>(buf_.data());

	// set eback, gptr, egptr
	this->setg(first, first, first + n_read);

	if (this->gptr() == this->egptr()) {
		return std::char_traits<char>::eof();
	}
	return std::char_traits<char>::to_int_type(*this->gptr());
}
226

227
/**
228
 * A variant of the #istream class that takes ownership of the #streambuf buffer
229
 * created for it.
230
 *
231
 * @note Base #istream is designed to work on shared buffers so it doesn't
232
 *       destruct/delete the buffer.
233
 */
234
class istreamWithUniqueBuffer : public istream {
235
public:
236
        // The unique_ptr, &&buf and std::move() model this really nicely -- a
237
        // unique_ptr rvalue (i.e. temporary) is required and it's moved into the
238
        // object. The default destructor then takes care of cleaning up properly.
239
        istreamWithUniqueBuffer(unique_ptr<streambuf> &&buf) :
480✔
240
                istream(buf.get()),
241
                buf_ {std::move(buf)} {};
480✔
242

243
private:
244
        unique_ptr<streambuf> buf_;
245
};
246

247
// Creates an istream that reads its data from this reader. The stream owns
// its buffer, but the buffer only references the reader, so the reader must
// stay alive for as long as the returned stream is used.
unique_ptr<istream> Reader::GetStream() {
	unique_ptr<streambuf> backing(new ReaderStreamBuffer(*this));
	return unique_ptr<istream>(new istreamWithUniqueBuffer(std::move(backing)));
}
251

252
// Opens the file at `path` for reading.
//
// Returns the opened stream, or an error built from the errno value seen
// right after the failed open().
ExpectedIfstream OpenIfstream(const string &path) {
	ifstream is;
	// Reset errno so that a stale value cannot end up in the error below.
	errno = 0;
	is.open(path);
	if (is) {
		return ExpectedIfstream(std::move(is));
	}

	// Grab errno before anything else gets a chance to change it.
	const int open_errno = errno;
	return ExpectedIfstream(expected::unexpected(error::Error(
		generic_category().default_error_condition(open_errno),
		"Failed to open '" + path + "' for reading")));
}
264

265
// Same as OpenIfstream(), but the opened stream is moved into a shared_ptr.
// Errors from OpenIfstream() are passed on unchanged.
ExpectedSharedIfstream OpenSharedIfstream(const string &path) {
	auto opened = OpenIfstream(path);
	if (opened) {
		return make_shared<ifstream>(std::move(opened.value()));
	}
	return expected::unexpected(opened.error());
}
272

273
// Opens the file at `path` for writing, in append mode (ios::app) if
// `append` is true and in plain output mode (ios::out) otherwise.
//
// Returns the opened stream, or an error built from the errno value seen
// right after the failed open().
ExpectedOfstream OpenOfstream(const string &path, bool append) {
	const auto mode = append ? ios::app : ios::out;

	ofstream os;
	// Reset errno so that a stale value cannot end up in the error below.
	errno = 0;
	os.open(path, mode);
	if (os) {
		return os;
	}

	// Grab errno before anything else gets a chance to change it.
	const int open_errno = errno;
	return ExpectedOfstream(expected::unexpected(error::Error(
		generic_category().default_error_condition(open_errno),
		"Failed to open '" + path + "' for writing")));
}
285

286
// Same as OpenOfstream(), but the opened stream is moved into a shared_ptr.
// Errors from OpenOfstream() are passed on unchanged.
ExpectedSharedOfstream OpenSharedOfstream(const string &path, bool append) {
	auto opened = OpenOfstream(path, append);
	if (opened) {
		return make_shared<ofstream>(std::move(opened.value()));
	}
	return expected::unexpected(opened.error());
}
293

294
// Writes the whole of `data` into `os`.
//
// Returns error::NoError on success. If the stream reports a failure after
// the write, an error built from the errno value seen at that point is
// returned.
error::Error WriteStringIntoOfstream(ofstream &os, const string &data) {
	// Reset errno so that a stale value cannot end up in the error below.
	errno = 0;
	os.write(data.data(), data.size());

	// fail() is true if either failbit or badbit is set.
	if (!os.fail()) {
		return error::NoError;
	}

	const int write_errno = errno;
	return error::Error(
		std::generic_category().default_error_condition(write_errno),
		"Failed to write data into the stream");
}
306

307
// Reads up to (end - start) bytes from the wrapped input stream into
// [start, end).
//
// Returns the number of bytes actually read, which is less than requested
// (possibly 0) once the end of the stream is reached. An error is returned
// only if the stream reports an unrecoverable I/O failure (badbit).
ExpectedSize StreamReader::Read(vector<uint8_t>::iterator start, vector<uint8_t>::iterator end) {
	const auto requested = end - start;
	if (requested == 0) {
		// Nothing to read into. `start` may be an end iterator here, so it
		// must not be dereferenced below.
		return requested;
	}

	// Reset errno so that a stale value cannot end up in the error below.
	errno = 0;
	is_->read(reinterpret_cast<char *>(&*start), requested);

	// Check the state of the stream, not the pointer to it. Only badbit means
	// a real error: a short read at the end of the data sets eofbit and
	// failbit, which is a normal result reported through the byte count.
	if (is_->bad()) {
		// Fall back to a generic I/O error if errno was not set.
		const int io_error = (errno != 0) ? errno : EIO;
		return expected::unexpected(
			Error(std::generic_category().default_error_condition(io_error), ""));
	}
	return is_->gcount();
}
316

317
// Moves the read position back to the beginning of the file, opening the
// file at path_ first if there is no stream yet.
//
// Returns error::NoError on success, the error from opening the file, an
// io_error if the stream is already in a failed state, or an error built
// from errno if seeking fails.
// NOTE(review): a stream that previously hit the end of the file in a short
// read has failbit set and so is rejected as "Bad stream" here -- confirm
// that this is intended.
error::Error FileReader::Rewind() {
	if (!is_) {
		// No stream yet, open the file.
		auto reopened = OpenSharedIfstream(path_);
		if (!reopened) {
			return reopened.error();
		}
		is_ = reopened.value();
	}

	if (!(*is_)) {
		return Error(std::error_condition(std::errc::io_error), "Bad stream, cannot rewind");
	}

	errno = 0;
	is_->seekg(0, ios::beg);
	// Grab errno right away, before anything else gets a chance to change it.
	const int seek_errno = errno;
	if (*is_) {
		return error::NoError;
	}
	return Error(
		generic_category().default_error_condition(seek_errno),
		"Failed to seek to the beginning of the stream");
}
338

339
} // namespace io
340
} // namespace common
341
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc