• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 2281564137

23 Jan 2026 10:59AM UTC coverage: 81.48% (+1.7%) from 79.764%
2281564137

push

gitlab-ci

michalkopczan
fix: Schedule next deployment poll if current one failed early causing no handler to be called

Ticket: MEN-9144
Changelog: Fix a hang when polling for deployment failed early causing no handler of API response
to be called. Added handler call for this case, causing the deployment polling
to continue.

Signed-off-by: Michal Kopczan <michal.kopczan@northern.tech>

1 of 1 new or added line in 1 file covered. (100.0%)

327 existing lines in 44 files now uncovered.

8839 of 10848 relevant lines covered (81.48%)

20226.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.54
/src/common/events/events_io.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/events_io.hpp>
16

17
namespace mender {
18
namespace common {
19
namespace events {
20
namespace io {
21

22
AsyncReaderFromReader::AsyncReaderFromReader(EventLoop &loop, mio::ReaderPtr reader) :
270✔
23
        reader_ {reader},
24
        loop_ {loop} {
135✔
25
}
270✔
26

27
AsyncReaderFromReader::~AsyncReaderFromReader() {
405✔
28
        Cancel();
135✔
29
}
270✔
30

31
error::Error AsyncReaderFromReader::AsyncRead(
913✔
32
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, mio::AsyncIoHandler handler) {
33
        cancelled_ = make_shared<bool>(false);
913✔
34
        auto &cancelled = cancelled_;
35
        loop_.Post([this, cancelled, start, end, handler]() {
2,739✔
36
                if (!*cancelled) {
913✔
37
                        in_progress_ = true;
913✔
38
                        // Simple, "cheating" implementation, we just do it synchronously.
39
                        auto result = reader_->Read(start, end);
913✔
40
                        in_progress_ = false;
913✔
41
                        handler(result);
1,826✔
42
                }
43
        });
913✔
44

45
        return error::NoError;
913✔
46
}
47

48
void AsyncReaderFromReader::Cancel() {
135✔
49
        // Cancel() is not allowed on normal Readers.
50
        assert(!in_progress_);
51
        if (cancelled_) {
135✔
52
                *cancelled_ = true;
130✔
53
                cancelled_.reset();
130✔
54
        }
55
}
135✔
56

57
AsyncWriterFromWriter::AsyncWriterFromWriter(EventLoop &loop, mio::WriterPtr writer) :
2✔
58
        writer_ {writer},
59
        loop_ {loop} {
1✔
60
}
2✔
61

62
AsyncWriterFromWriter::~AsyncWriterFromWriter() {
3✔
63
        Cancel();
1✔
64
}
2✔
65

66
error::Error AsyncWriterFromWriter::AsyncWrite(
1✔
67
        vector<uint8_t>::const_iterator start,
68
        vector<uint8_t>::const_iterator end,
69
        mio::AsyncIoHandler handler) {
70
        cancelled_ = make_shared<bool>(false);
1✔
71
        auto &cancelled = cancelled_;
72
        loop_.Post([this, cancelled, start, end, handler]() {
3✔
73
                if (!*cancelled) {
1✔
74
                        in_progress_ = true;
1✔
75
                        // Simple, "cheating" implementation, we just do it synchronously.
76
                        auto result = writer_->Write(start, end);
1✔
77
                        in_progress_ = false;
1✔
78
                        handler(result);
2✔
79
                }
80
        });
1✔
81

82
        return error::NoError;
1✔
83
}
84

85
void AsyncWriterFromWriter::Cancel() {
1✔
86
        // Cancel() is not allowed on normal Writers.
87
        assert(!in_progress_);
88
        if (cancelled_) {
1✔
89
                *cancelled_ = true;
1✔
90
                cancelled_.reset();
1✔
91
        }
92
}
1✔
93

94
ReaderFromAsyncReader::ReaderFromAsyncReader(EventLoop &event_loop, mio::AsyncReaderPtr reader) :
54✔
95
        event_loop_(event_loop),
54✔
96
        reader_(reader) {
54✔
97
}
54✔
98

99
ReaderFromAsyncReader::ReaderFromAsyncReader(EventLoop &event_loop, mio::AsyncReader &reader) :
×
UNCOV
100
        event_loop_(event_loop),
×
101
        // For references, just use a destructor-less pointer.
102
        reader_(&reader, [](mio::AsyncReader *) {}) {
×
103
}
×
104

105
mio::ExpectedSize ReaderFromAsyncReader::Read(
102✔
106
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end) {
107
        mio::ExpectedSize read;
102✔
108
        bool finished = false;
102✔
109
        event_loop_.Post([start, end, this, &finished, &read]() {
102✔
110
                auto err =
111
                        reader_->AsyncRead(start, end, [this, &finished, &read](mio::ExpectedSize num_read) {
102✔
112
                                read = num_read;
102✔
113
                                finished = true;
102✔
114
                                event_loop_.Stop();
102✔
115
                        });
102✔
116
                if (err != error::NoError) {
102✔
117
                        read = expected::unexpected(err);
×
118
                        finished = true;
×
119
                        event_loop_.Stop();
×
120
                }
121
        });
102✔
122

123
        // Since the same event loop may have been used to call into this function, run the event
124
        // loop recursively to keep processing events.
125
        event_loop_.Run();
102✔
126

127
        if (!finished) {
102✔
128
                // If this happens then it means that the event loop was stopped by somebody
129
                // else. We have no choice now but to return error, since we have to get out of this
130
                // stack frame. We also need to re-stop the event loop, since the first stop was
131
                // spent on getting here.
132
                event_loop_.Stop();
×
133
                return expected::unexpected(
×
134
                        error::Error(make_error_condition(errc::operation_canceled), "Event loop was stopped"));
×
135
        }
136

137
        return read;
138
}
139

140
TeeReader::ExpectedTeeReaderLeafPtr TeeReader::MakeAsyncReader() {
8✔
141
        if (any_of(
8✔
142
                        leaf_readers_.begin(),
143
                        leaf_readers_.end(),
144
                        [](const std::pair<TeeReaderLeafPtr, TeeReaderLeafContext> r) {
145
                                return r.second.buffer_bytes_missing != 0;
4✔
146
                        })) {
147
                return expected::unexpected(error::Error(
×
148
                        make_error_condition(errc::io_error), "A Reader is already reading from the buffer"));
×
149
        }
150

151
        if (stop_done_) {
8✔
152
                return expected::unexpected(error::Error(
×
153
                        make_error_condition(errc::io_error), "Buffering stopped, no more readers allowed"));
×
154
        }
155

156
        auto ex_bytes_missing = buffered_reader_->Rewind();
8✔
157
        if (!ex_bytes_missing) {
8✔
158
                return expected::unexpected(ex_bytes_missing.error());
×
159
        }
160

161
        auto reader = make_shared<TeeReaderLeaf>(shared_from_this());
8✔
162
        leaf_readers_.insert({reader, TeeReaderLeafContext {}});
16✔
163
        leaf_readers_[reader].buffer_bytes_missing = ex_bytes_missing.value();
8✔
164
        return reader;
8✔
165
}
166

167
error::Error TeeReader::ReadyToAsyncRead(
17✔
168
        TeeReader::TeeReaderLeafPtr leaf_reader,
169
        vector<uint8_t>::iterator start,
170
        vector<uint8_t>::iterator end,
171
        mio::AsyncIoHandler handler) {
172
        // The reader must exist in the internal map.
173
        auto found = leaf_readers_.find(leaf_reader);
174
        AssertOrReturnError(found != leaf_readers_.end());
17✔
175

176
        if (leaf_readers_[leaf_reader].buffer_bytes_missing > 0) {
17✔
177
                // Special case, reading missing bytes
178
                TeeReaderLeafContext &ctx = leaf_readers_[leaf_reader];
179
                auto to_read = std::min(ctx.buffer_bytes_missing, static_cast<size_t>(end - start));
1✔
180
                auto handler_wrapper = [this, handler, &ctx](mio::ExpectedSize result) {
3✔
181
                        if (result) {
1✔
182
                                ctx.buffer_bytes_missing -= result.value();
1✔
183
                        }
184
                        auto err = MaybeDiscardBuffer();
1✔
185
                        if (err != error::NoError) {
1✔
186
                                if (!result) {
×
187
                                        err = result.error().FollowedBy(err);
×
188
                                }
189
                                result = expected::unexpected(err);
×
190
                        }
191
                        handler(result);
2✔
192
                };
2✔
193

194
                auto err = buffered_reader_->AsyncRead(start, start + to_read, handler_wrapper);
1✔
195
                if (err != error::NoError) {
1✔
196
                        handler(expected::unexpected(err));
×
197
                }
198
        } else {
199
                leaf_readers_[leaf_reader].pending_read.start = start;
16✔
200
                leaf_readers_[leaf_reader].pending_read.end = end;
16✔
201
                leaf_readers_[leaf_reader].pending_read.handler = handler;
16✔
202
                if (++ready_to_read == leaf_readers_.size()) {
16✔
203
                        DoAsyncRead();
9✔
204
                        ready_to_read = 0;
9✔
205
                }
206
        }
207

208
        return error::NoError;
17✔
209
}
210

211
void TeeReader::CallAllHandlers(mio::ExpectedSize result) {
9✔
212
        // Makes a copy of the handlers and then calls them sequentially
213
        vector<mio::AsyncIoHandler> handlers;
214
        for (auto &it : leaf_readers_) {
25✔
215
                handlers.push_back(it.second.pending_read.handler);
16✔
216
                it.second.pending_read = {};
32✔
217
        }
218
        for (const auto &h : handlers) {
25✔
219
                h(result);
32✔
220
        }
221
}
25✔
222

223
void TeeReader::DoAsyncRead() {
9✔
224
        auto handler = [this](mio::ExpectedSize result) {
9✔
225
                if (!result) {
9✔
226
                        CallAllHandlers(result);
×
227
                        return;
228
                };
229

230
                auto start_iterator = leaf_readers_.begin()->second.pending_read.start;
9✔
231
                auto read_bytes = result.value();
9✔
232
                for_each(
9✔
233
                        std::next(leaf_readers_.begin()),
234
                        leaf_readers_.end(),
9✔
235
                        [start_iterator,
7✔
236
                         read_bytes](const std::pair<TeeReaderLeafPtr, TeeReaderLeafContext> r) {
237
                                std::copy_n(start_iterator, read_bytes, r.second.pending_read.start);
7✔
238
                        });
7✔
239

240
                CallAllHandlers(result);
18✔
241
        };
9✔
242

243
        auto min_read = std::min_element(
9✔
244
                leaf_readers_.cbegin(),
245
                leaf_readers_.cend(),
246
                [](const std::pair<TeeReaderLeafPtr, TeeReaderLeafContext> r1,
247
                   std::pair<TeeReaderLeafPtr, TeeReaderLeafContext> r2) {
248
                        return (r1.second.pending_read.end - r1.second.pending_read.start)
249
                                   < (r2.second.pending_read.end - r2.second.pending_read.start);
7✔
250
                });
251
        auto bytes_to_read = min_read->second.pending_read.end - min_read->second.pending_read.start;
252

253
        auto err = buffered_reader_->AsyncRead(
9✔
254
                leaf_readers_.begin()->second.pending_read.start,
255
                leaf_readers_.begin()->second.pending_read.start + bytes_to_read,
256
                handler);
9✔
257
        if (err != error::NoError) {
9✔
258
                CallAllHandlers(expected::unexpected(err));
×
259
        }
260
}
9✔
261

262
error::Error TeeReader::MaybeDiscardBuffer() {
2✔
263
        if (stop_done_
2✔
264
                && all_of(
2✔
265
                        leaf_readers_.begin(),
266
                        leaf_readers_.end(),
267
                        [](const std::pair<TeeReaderLeafPtr, TeeReaderLeafContext> r) {
268
                                return r.second.buffer_bytes_missing == 0;
3✔
269
                        })) {
270
                return buffered_reader_->StopBufferingAndDiscard();
1✔
271
        }
272

273
        return error::NoError;
1✔
274
}
275

276
error::Error TeeReader::StopBuffering() {
1✔
277
        stop_done_ = true;
1✔
278
        return MaybeDiscardBuffer();
1✔
279
}
280

281
error::Error TeeReader::CancelReader(TeeReader::TeeReaderLeafPtr leaf_reader) {
2✔
282
        // The reader must exist in the internal map.
283
        auto found = leaf_readers_.find(leaf_reader);
284
        AssertOrReturnError(found != leaf_readers_.end());
2✔
285

286
        auto handler = found->second.pending_read.handler;
2✔
287

288
        leaf_readers_.erase(found);
2✔
289

290
        if (handler) {
2✔
291
                handler(expected::unexpected(
×
292
                        error::Error(make_error_condition(errc::operation_canceled), "Leaf reader cancelled")));
×
293
        }
294

295
        if (leaf_readers_.size() == 0) {
2✔
296
                buffered_reader_->Cancel();
1✔
297
        }
298
        return error::NoError;
2✔
299
}
300

301
TeeReader::~TeeReader() {
4✔
302
        leaf_readers_.clear();
303
}
4✔
304

305
error::Error TeeReader::TeeReaderLeaf::AsyncRead(
17✔
306
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, mio::AsyncIoHandler handler) {
307
        auto p = tee_reader_.lock();
17✔
308
        if (!p) {
17✔
309
                return error::Error(make_error_condition(errc::io_error), "TeeReader already destroyed");
×
310
        }
311

312
        return p->ReadyToAsyncRead(shared_from_this(), start, end, handler);
51✔
313
}
314

315
void TeeReader::TeeReaderLeaf::Cancel() {
2✔
316
        auto p = tee_reader_.lock();
2✔
317
        if (!p) {
2✔
318
                // Already disconnected from the tee reader. Nothing to do.
319
                return;
320
        }
321

322
        // Disconnect us from the tee reader. This reader is useless after this.
323
        tee_reader_.reset();
2✔
324

325
        p->CancelReader(shared_from_this());
4✔
326
};
327

328
} // namespace io
329
} // namespace events
330
} // namespace common
331
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc