• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 2281564137

23 Jan 2026 10:59AM UTC coverage: 81.48% (+1.7%) from 79.764%
2281564137

push

gitlab-ci

michalkopczan
fix: Schedule next deployment poll if current one failed early causing no handler to be called

Ticket: MEN-9144
Changelog: Fix a hang that occurred when polling for a deployment failed early, so that no
handler for the API response was called. A handler call was added for this case, so that
deployment polling continues.

Signed-off-by: Michal Kopczan <michal.kopczan@northern.tech>

1 of 1 new or added line in 1 file covered. (100.0%)

327 existing lines in 44 files now uncovered.

8839 of 10848 relevant lines covered (81.48%)

20226.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.72
/src/common/io.hpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#ifndef MENDER_COMMON_IO_HPP
16
#define MENDER_COMMON_IO_HPP
17

18
#include <common/error.hpp>
19
#include <common/expected.hpp>
20

21
#include <algorithm>
22
#include <cstdint>
23
#include <fstream>
24
#include <iostream>
25
#include <istream>
26
#include <iterator>
27
#include <limits>
28
#include <memory>
29
#include <ostream>
30
#include <sstream>
31
#include <string>
32
#include <system_error>
33
#include <vector>
34

35
namespace mender {
36
namespace common {
37
namespace io {
38

39
using namespace std;
40

41
namespace expected = mender::common::expected;
42

43
using mender::common::error::Error;
44
using mender::common::error::NoError;
45

46
using mender::common::expected::ExpectedSize;
47

48
// Path constants shared by the I/O helpers. Only declarations live in this header.
namespace paths {
49
// Declared `extern`, so the value is defined elsewhere and is not visible here.
// NOTE(review): presumably the path name that denotes standard input -- confirm at the
// definition.
extern const string Stdin;
50
}
51

52
// Abstract interface for a synchronous byte source.
class Reader {
9✔
53
public:
54
        virtual ~Reader() {};
55

56
        // Pure virtual: implementations fill the byte range delimited by `start` and `end`.
        // NOTE(review): the returned ExpectedSize presumably carries the number of bytes read,
        // or an error -- confirm against the implementations.
        virtual ExpectedSize Read(vector<uint8_t>::iterator start, vector<uint8_t>::iterator end) = 0;
57

58
        // Non-virtual, defined outside this header.
        // NOTE(review): presumably exposes this reader through an `istream` -- confirm in the
        // implementation, including how long `*this` must outlive the returned stream.
        unique_ptr<istream> GetStream();
59
};
60
// Shared-ownership handle to a Reader.
using ReaderPtr = shared_ptr<Reader>;
61
// Either a ReaderPtr or an Error.
using ExpectedReaderPtr = expected::expected<ReaderPtr, Error>;
62

63
// Abstract interface for a synchronous byte sink.
class Writer {
64
public:
65
        virtual ~Writer() {};
66

67
        // Pure virtual: implementations consume the byte range delimited by `start` and `end`.
        // NOTE(review): the returned ExpectedSize presumably carries the number of bytes
        // written, or an error -- confirm against the implementations.
        virtual ExpectedSize Write(
68
                vector<uint8_t>::const_iterator start, vector<uint8_t>::const_iterator end) = 0;
69
};
70
// Shared-ownership handle to a Writer.
using WriterPtr = shared_ptr<Writer>;
71
// Either a WriterPtr or an Error.
using ExpectedWriterPtr = expected::expected<WriterPtr, Error>;
72

73
// Combined interface: both a Reader and a Writer. Adds no members of its own; the bases are
// inherited virtually.
class ReadWriter : virtual public Reader, virtual public Writer {};
74
// Shared-ownership handle to a ReadWriter.
using ReadWriterPtr = shared_ptr<ReadWriter>;
75
// Either a ReadWriterPtr or an Error.
using ExpectedReadWriterPtr = expected::expected<ReadWriterPtr, Error>;
76

77
// Abstract interface for objects whose asynchronous operations can be cancelled.
class Canceller {
2,202✔
78
public:
79
        virtual ~Canceller() {
80
        }
81

82
        /** Cancel() has the following semantics:
83
                - When called and there are no asynchronous operations ongoing, it is a no-op.
84
                - When called and there are asynchronous operations, the operation should be cancelled,
85
                  and the callers should get `errc::operation_canceled`.
86
                - If called as part of the destructor, then the operation should be canceled, but
87
                  callers should not receive anything (because they may already have been destroyed).
88
         */
89
        virtual void Cancel() = 0;
90
};
91

92
// Return type of RepeatedAsyncIoHandler.
// NOTE(review): presumably tells the repeated read whether to schedule another round (`Yes`)
// or stop (`No`) -- confirm in the RepeatedAsyncRead implementation.
enum class Repeat {
93
        Yes,
94
        No,
95
};
96

97
// Completion callback for one asynchronous I/O operation; receives an ExpectedSize.
// NOTE(review): `function` is `std::function`, but <functional> is not among this header's
// direct includes, so it presumably arrives transitively -- confirm, and consider including
// it here.
using AsyncIoHandler = function<void(ExpectedSize)>;
98
// Like AsyncIoHandler, but returns a Repeat value.
using RepeatedAsyncIoHandler = function<Repeat(ExpectedSize)>;
99

100
// Abstract interface for an asynchronous byte source. Inherits Cancel() from Canceller.
class AsyncReader : virtual public Canceller {
101
public:
102
        // Note: iterators generally need to remain valid until either the handler or `Cancel()` is
103
        // called.
104
        // Pure virtual. Returns an error::Error directly, in addition to whatever is delivered
        // through `handler`.
        virtual error::Error AsyncRead(
105
                vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, AsyncIoHandler handler) = 0;
106

107
        // Calls AsyncRead repeatedly with the same iterators and handler, until the stream is
108
        // exhausted or an error occurs. All errors will be returned through the handler, even
109
        // initial errors from AsyncRead.
110
        // Non-virtual and defined outside this header; returns nothing.
        void RepeatedAsyncRead(
111
                vector<uint8_t>::iterator start,
112
                vector<uint8_t>::iterator end,
113
                RepeatedAsyncIoHandler handler);
114
};
115
// Shared-ownership handle to an AsyncReader.
using AsyncReaderPtr = shared_ptr<AsyncReader>;
116

117
// Abstract interface for an asynchronous byte sink. Inherits Cancel() from Canceller.
class AsyncWriter : virtual public Canceller {
118
public:
119
        // Note: iterators generally need to remain valid until either the handler or `Cancel()` is
120
        // called.
121
        // Pure virtual. Returns an error::Error directly, in addition to whatever is delivered
        // through `handler`.
        virtual error::Error AsyncWrite(
122
                vector<uint8_t>::const_iterator start,
123
                vector<uint8_t>::const_iterator end,
124
                AsyncIoHandler handler) = 0;
125
};
126
// Shared-ownership handle to an AsyncWriter.
using AsyncWriterPtr = shared_ptr<AsyncWriter>;
127

128
// Combined interface: both an AsyncReader and an AsyncWriter. Adds no members of its own; the
// bases are inherited virtually.
class AsyncReadWriter : virtual public AsyncReader, virtual public AsyncWriter {};
129
// Shared-ownership handle to an AsyncReadWriter.
using AsyncReadWriterPtr = shared_ptr<AsyncReadWriter>;
130

131
// Each alias below holds either the named pointer type or an error::Error.
using ExpectedAsyncReaderPtr = expected::expected<AsyncReaderPtr, error::Error>;
132
using ExpectedAsyncWriterPtr = expected::expected<AsyncWriterPtr, error::Error>;
133
using ExpectedAsyncReadWriterPtr = expected::expected<AsyncReadWriterPtr, error::Error>;
134

135
/**
136
 * Stream the data from `src` to `dst` until encountering EOF or an error.
137
 *
 * NOTE(review): presumably returns `NoError` on success -- confirm in the implementation.
 */
138
Error Copy(Writer &dst, Reader &src);
139

140
/**
141
 * Stream the data from `src` to `dst` until encountering EOF or an error, using `buffer` as an
142
 * intermediate. The block size will be the size of `buffer`.
143
 *
 * `buffer` is taken by non-const reference, so its contents are overwritten by the copy.
 */
144
Error Copy(Writer &dst, Reader &src, vector<uint8_t> &buffer);
145

146
/**
146
 * Stream the data from `src` to `dst` until encountering EOF or an error. The reading end is async,
147
 * the writing end is not, so it should be "quick", otherwise it may still stall.
148
 *
 * The result is delivered as an `Error` through `finished_handler`; the function itself
 * returns nothing. `stop_after` defaults to the largest `int64_t` value.
 * NOTE(review): `stop_after` is presumably a byte limit for the copy -- confirm in the
 * implementation.
 */
149
// Reference overload: takes `dst` and `src` by reference.
// NOTE(review): presumably the caller must keep both alive until `finished_handler` runs --
// confirm.
void AsyncCopy(
150
        Writer &dst,
151
        AsyncReader &src,
152
        function<void(Error)> finished_handler,
153
        int64_t stop_after = numeric_limits<int64_t>::max());
154
// Shared-pointer overload of the same operation.
void AsyncCopy(
155
        WriterPtr dst,
156
        AsyncReaderPtr src,
157
        function<void(Error)> finished_handler,
158
        int64_t stop_after = numeric_limits<int64_t>::max());
160

161
/**
161
 * Stream the data from `src` to `dst` until encountering EOF or an error. The writing end is async,
162
 * the reading end is not, so it should be "quick", otherwise it may still stall.
163
 *
 * The result is delivered as an `Error` through `finished_handler`; the function itself
 * returns nothing. `stop_after` defaults to the largest `int64_t` value.
 * NOTE(review): `stop_after` is presumably a byte limit for the copy -- confirm in the
 * implementation.
 */
164
// Reference overload: takes `dst` and `src` by reference.
// NOTE(review): presumably the caller must keep both alive until `finished_handler` runs --
// confirm.
void AsyncCopy(
165
        AsyncWriter &dst,
166
        Reader &src,
167
        function<void(Error)> finished_handler,
168
        int64_t stop_after = numeric_limits<int64_t>::max());
169
// Shared-pointer overload of the same operation.
void AsyncCopy(
170
        AsyncWriterPtr dst,
171
        ReaderPtr src,
172
        function<void(Error)> finished_handler,
173
        int64_t stop_after = numeric_limits<int64_t>::max());
175

176
/**
176
 * Stream the data from `src` to `dst` until encountering EOF or an error.
177
 *
 * Both ends are asynchronous in this overload pair. The result is delivered as an `Error`
 * through `finished_handler`; the function itself returns nothing. `stop_after` defaults to
 * the largest `int64_t` value.
 * NOTE(review): `stop_after` is presumably a byte limit for the copy -- confirm in the
 * implementation.
 */
178
// Reference overload: takes `dst` and `src` by reference.
// NOTE(review): presumably the caller must keep both alive until `finished_handler` runs --
// confirm.
void AsyncCopy(
179
        AsyncWriter &dst,
180
        AsyncReader &src,
181
        function<void(Error)> finished_handler,
182
        int64_t stop_after = numeric_limits<int64_t>::max());
183
// Shared-pointer overload of the same operation.
void AsyncCopy(
184
        AsyncWriterPtr dst,
185
        AsyncReaderPtr src,
186
        function<void(Error)> finished_handler,
187
        int64_t stop_after = numeric_limits<int64_t>::max());
189

190
// Reader backed by a `std::istream`, held through a shared_ptr.
class StreamReader : virtual public Reader {
190
protected:
191
        // The underlying stream. May be null: FileReader constructs this base with an empty
        // pointer and fills it in later.
        shared_ptr<std::istream> is_;
192

193
public:
194
        // Non-owning constructor: the caller must keep `stream` alive for as long as this
        // reader is used.
        StreamReader(std::istream &stream) :
416✔
195
                // For references, initialize a shared_ptr with a null deleter, since we don't own
196
                // the object.
197
                is_ {&stream, [](std::istream *stream) {}} {
208✔
198
        }
416✔
199
        // Shared-ownership constructor: stores a copy of the given shared_ptr.
        StreamReader(shared_ptr<std::istream> stream) :
246✔
200
                is_ {stream} {
123✔
201
        }
246✔
202
        // Defined outside this header.
        ExpectedSize Read(vector<uint8_t>::iterator start, vector<uint8_t>::iterator end) override;
203
};
205

206
// Result types for the file-opening helpers below: each holds either the stream (by value or
// behind a shared_ptr) or an error::Error.
using ExpectedIfstream = expected::expected<ifstream, error::Error>;
207
using ExpectedSharedIfstream = expected::expected<shared_ptr<ifstream>, error::Error>;
208
using ExpectedOfstream = expected::expected<ofstream, error::Error>;
209
using ExpectedSharedOfstream = expected::expected<shared_ptr<ofstream>, error::Error>;
210
// File-opening helpers, defined outside this header. Failures are reported through the
// returned expected value.
ExpectedIfstream OpenIfstream(const string &path);
211
ExpectedSharedIfstream OpenSharedIfstream(const string &path);
212
// `append` defaults to false.
// NOTE(review): presumably selects append mode instead of truncation -- confirm in the
// implementation.
ExpectedOfstream OpenOfstream(const string &path, bool append = false);
213
ExpectedSharedOfstream OpenSharedOfstream(const string &path, bool append = false);
214

215
// Reader for the file at a given path. The file is not opened in the constructor; it is opened
// on the first Read() call.
class FileReader : virtual public StreamReader {
216
public:
217
        // Stores `path` and initializes the StreamReader base with an empty stream pointer, which
        // Read() uses as the "not opened yet" marker.
        FileReader(const string &path) :
16✔
218
                StreamReader(shared_ptr<std::istream>()),
8✔
219
                path_ {path} {};
24✔
220

221
        // Opens the file if no stream is set yet, then delegates to StreamReader::Read().
        // If opening fails, the error from OpenSharedIfstream() is returned and `is_` stays
        // empty, so a later call attempts the open again.
        ExpectedSize Read(vector<uint8_t>::iterator start, vector<uint8_t>::iterator end) override {
66✔
222
                // We cannot open the stream in the constructor because it can fail and
223
                // there's no way to report that error. However, the check below is
224
                // cheap compared to the I/O that happens in this function, so the
225
                // overhead is very small.
226
                if (!is_) {
66✔
227
                        auto ex_is = OpenSharedIfstream(path_);
5✔
228
                        if (!ex_is) {
5✔
UNCOV
229
                                return expected::unexpected(ex_is.error());
×
230
                        }
231
                        is_ = ex_is.value();
5✔
232
                }
233
                return StreamReader::Read(start, end);
66✔
234
        }
235

236
        // Defined outside this header.
        // NOTE(review): presumably repositions the stream at the start of the file -- confirm,
        // including what happens when the file has not been opened yet.
        error::Error Rewind();
237

238
private:
239
        // Path of the file to open lazily in Read().
        string path_;
240
};
241

242
/* Discards all data written to it */
243
// NOTE(review): Write() appears before any access specifier, so it is private in this class
// and is reachable only through the Writer interface -- confirm this is intended.
class Discard : virtual public Writer {
76✔
244
        // Reports the whole range as written without reading any of it.
        ExpectedSize Write(
1,084✔
245
                vector<uint8_t>::const_iterator start, vector<uint8_t>::const_iterator end) override {
246
                // The length of the range is the iterator difference.
                return end - start;
1,084✔
247
        }
248
};
249

250
// Reader over an in-memory string. Owns a stringstream holding the data and a StreamReader
// that refers to that stringstream.
class StringReader : virtual public Reader {
251
private:
252
        // Declared before `reader_` so that it is constructed first; `reader_` is built from a
        // reference to it.
        std::stringstream s_;
253
        unique_ptr<StreamReader> reader_;
254

255
public:
256
        // Initializes the stream from `str`.
        StringReader(const string &str) :
158✔
257
                s_ {str},
79✔
258
                reader_ {new StreamReader(s_)} {
79✔
259
        }
158✔
260
        // NOTE(review): `str` is a named rvalue reference and therefore an lvalue in `s_ {str}`,
        // so the string is passed without `std::move` -- confirm whether a move was intended.
        StringReader(string &&str) :
26✔
261
                s_ {str},
13✔
262
                reader_ {new StreamReader(s_)} {
13✔
263
        }
26✔
264
        // Move constructor: takes over the stream and builds a fresh StreamReader, so that
        // `reader_` refers to this object's `s_` rather than to `sr.s_`.
        StringReader(StringReader &&sr) :
265
                s_ {std::move(sr.s_)},
266
                reader_ {new StreamReader(s_)} {
267
        }
268
        // Move assignment: same idea as the move constructor.
        // NOTE(review): there is no guard against self-assignment -- confirm that self-move of
        // the stringstream is acceptable here.
        StringReader &operator=(StringReader &&sr) {
1✔
269
                s_ = std::move(sr.s_);
1✔
270
                reader_.reset(new StreamReader(s_));
1✔
271
                return *this;
1✔
272
        }
273

274
        // Forwards to the internal StreamReader.
        ExpectedSize Read(vector<uint8_t>::iterator start, vector<uint8_t>::iterator end) override {
160✔
275
                return reader_->Read(start, end);
160✔
276
        }
277
};
278

279
// Size type of the byte vectors used by the byte-oriented readers and writers.
using Vsize = vector<uint8_t>::size_type;
280

281
// Reader whose data source is a byte vector.
class ByteReader : virtual public Reader {
282
private:
283
        // The vector to read from; owned or merely referenced depending on the constructor used.
        shared_ptr<vector<uint8_t>> emitter_;
284
        // Starts at zero.
        // NOTE(review): presumably the read position within `emitter_` -- confirm in the
        // implementation.
        Vsize bytes_read_ {0};
285

286
public:
287
        // Non-owning constructor: wraps the address of `emitter` in a shared_ptr with a no-op
        // deleter, so the caller must keep the vector alive while this reader is used.
        ByteReader(vector<uint8_t> &emitter) :
18✔
288
                emitter_ {&emitter, [](vector<uint8_t> *vec) {}} {
9✔
289
        }
18✔
290

291
        // Shared-ownership constructor: stores a copy of the given shared_ptr.
        ByteReader(shared_ptr<vector<uint8_t>> emitter) :
292
                emitter_ {emitter} {
293
        }
294

295
        // Defined outside this header.
        ExpectedSize Read(vector<uint8_t>::iterator start, vector<uint8_t>::iterator end) override;
296

297
        // Defined outside this header.
        // NOTE(review): presumably resets the read position to the start -- confirm.
        void Rewind();
298
};
299

300
// Reader that wraps another Reader (held by reference, not owned) together with an internal
// byte buffer and a ByteReader over that buffer.
// NOTE(review): judging by the member functions, the buffer presumably records what has been
// read so that it can be replayed after Rewind() -- confirm in the implementation.
// NOTE(review): an implicitly generated copy would copy `buffer_reader_`'s pointer and so keep
// referring to the source object's `buffer_` -- confirm that instances are never copied.
class BufferedReader : virtual public Reader {
301
private:
302
        // Not owned; the wrapped reader must outlive this object.
        Reader &wrapped_reader_;
303
        // State flags, all initially false; their exact meaning is defined by the implementation.
        bool rewind_done_ {false};
304
        bool rewind_consumed_ {false};
305
        bool stop_done_ {false};
306

307
protected:
308
        // Must be declared before `buffer_reader_`: members are initialized in declaration order
        // and `buffer_reader_` is constructed from a reference to this vector.
        vector<uint8_t> buffer_;
309
        ByteReader buffer_reader_;
310
        size_t buffer_remaining_ {0};
311

312
public:
313
        // Binds the wrapped reader and points `buffer_reader_` at the (initially empty)
        // `buffer_`.
        BufferedReader(Reader &reader) :
4✔
314
                wrapped_reader_ {reader},
2✔
315
                buffer_reader_ {buffer_} {};
4✔
316

317
        // Defined outside this header.
        ExpectedSize Read(vector<uint8_t>::iterator start, vector<uint8_t>::iterator end) override;
318

319
        // The three operations below are defined outside this header; see the implementation for
        // their exact semantics and return values.
        ExpectedSize Rewind();
320

321
        ExpectedSize StopBufferingAndRewind();
322
        error::Error StopBufferingAndDiscard();
323
};
324
// Shared-ownership handle to a BufferedReader.
using BufferedReaderPtr = shared_ptr<BufferedReader>;
325

326
// Asynchronous counterpart of BufferedReader: wraps an AsyncReader (held by reference, not
// owned) together with an internal byte buffer and a ByteReader over that buffer.
// NOTE(review): an implicitly generated copy would copy `buffer_reader_`'s pointer and so keep
// referring to the source object's `buffer_` -- confirm that instances are never copied.
class AsyncBufferedReader : virtual public AsyncReader {
327
private:
328
        // Not owned; the wrapped reader must outlive this object.
        AsyncReader &wrapped_reader_;
329
        // State flags, all initially false; Cancel() resets them to false.
        bool rewind_done_ {false};
330
        bool rewind_consumed_ {false};
331
        bool stop_done_ {false};
332

333
protected:
334
        // Must be declared before `buffer_reader_`: members are initialized in declaration order
        // and `buffer_reader_` is constructed from a reference to this vector.
        vector<uint8_t> buffer_;
335
        ByteReader buffer_reader_;
336
        size_t buffer_remaining_ {0};
337

338
public:
339
        // Binds the wrapped reader and points `buffer_reader_` at the (initially empty)
        // `buffer_`.
        AsyncBufferedReader(AsyncReader &reader) :
10✔
340
                wrapped_reader_ {reader},
5✔
341
                buffer_reader_ {buffer_} {};
10✔
342

343
        // Defined outside this header.
        error::Error AsyncRead(
344
                vector<uint8_t>::iterator start,
345
                vector<uint8_t>::iterator end,
346
                AsyncIoHandler handler) override;
347

348
        // Drops the buffered data, rewinds the buffer reader, resets the three state flags and
        // then forwards the cancellation to the wrapped reader.
        // NOTE(review): `buffer_remaining_` is not reset here even though `buffer_` is cleared
        // -- confirm whether that is intended.
        void Cancel() override {
1✔
349
                buffer_.clear();
1✔
UNCOV
350
                buffer_reader_.Rewind();
×
351
                rewind_done_ = false;
1✔
352
                rewind_consumed_ = false;
1✔
353
                stop_done_ = false;
1✔
354
                wrapped_reader_.Cancel();
1✔
355
        };
1✔
356

357
        // The three operations below are defined outside this header; see the implementation for
        // their exact semantics and return values.
        ExpectedSize Rewind();
358

359
        ExpectedSize StopBufferingAndRewind();
360
        error::Error StopBufferingAndDiscard();
361
};
362
// Shared-ownership handle to an AsyncBufferedReader.
using AsyncBufferedReaderPtr = shared_ptr<AsyncBufferedReader>;
363

364
// Writer whose destination is a byte vector.
class ByteWriter : virtual public Writer {
365
private:
366
        // The vector to write into; owned or merely referenced depending on the constructor used.
        shared_ptr<vector<uint8_t>> receiver_;
367
        // Starts at zero.
        // NOTE(review): presumably the write position within `receiver_` -- confirm in the
        // implementation.
        Vsize bytes_written_ {0};
368
        // Off by default; toggled through SetUnlimited().
        bool unlimited_ {false};
369

370
public:
371
        // Non-owning constructor: wraps the address of `receiver` in a shared_ptr with a no-op
        // deleter, so the caller must keep the vector alive while this writer is used.
        ByteWriter(vector<uint8_t> &receiver) :
574✔
372
                receiver_ {&receiver, [](vector<uint8_t> *vec) {}} {
287✔
373
        }
574✔
374

375
        // Shared-ownership constructor: stores a copy of the given shared_ptr.
        ByteWriter(shared_ptr<vector<uint8_t>> receiver) :
98✔
376
                receiver_ {receiver} {
49✔
377
        }
98✔
378

379
        // Will extend the vector if necessary. Probably a bad idea in production code, but useful
380
        // in tests.
381
        void SetUnlimited(bool enabled);
382

383
        // Defined outside this header.
        ExpectedSize Write(
384
                vector<uint8_t>::const_iterator start, vector<uint8_t>::const_iterator end) override;
385
};
386

387
// Writer backed by a `std::ostream`, held through a shared_ptr.
class StreamWriter : virtual public Writer {
388
private:
389
        // The underlying stream.
        shared_ptr<std::ostream> os_;
390

391
public:
392
        // Non-owning constructor: the shared_ptr gets a no-op deleter, so the caller must keep
        // `stream` alive for as long as this writer is used.
        StreamWriter(std::ostream &stream) :
2,052✔
393
                os_ {&stream, [](std::ostream *str) {}} {
1,026✔
394
        }
2,052✔
395
        // Shared-ownership constructor: stores a copy of the given shared_ptr.
        StreamWriter(shared_ptr<std::ostream> stream) :
396
                os_ {stream} {
397
        }
398
        // Defined outside this header.
        ExpectedSize Write(
399
                vector<uint8_t>::const_iterator start, vector<uint8_t>::const_iterator end) override;
400
};
401

402
// Free helper functions, all defined outside this header.

// Takes an already opened `ofstream` and the string to write; reports the outcome as an
// error::Error.
error::Error WriteStringIntoOfstream(ofstream &os, const string &data);
403

// NOTE(review): presumably the size of the file at `path` in bytes, or an error -- confirm in
// the implementation.
404
expected::ExpectedSize FileSize(const string &path);
405

// NOTE(review): presumably the free space on the filesystem that contains `path`, or an error
// -- confirm units and semantics in the implementation.
406
expected::ExpectedUintMax GetAvailableSpace(const string &path);
407

408
} // namespace io
409
} // namespace common
410
} // namespace mender
411

412
#endif // MENDER_COMMON_IO_HPP
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc