• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1057599036

01 Nov 2023 12:43PM UTC coverage: 80.276% (+0.07%) from 80.207%
1057599036

push

gitlab-ci

kacf
fix: Make sure Cancel is called if resumer body reader is destroyed.

This was already the case in most cases, since the inner reader would
call cancel, but not in case the reader was destroyed in between
retries, in which case there is no inner reader.

Changelog: None
Ticket: None

Signed-off-by: Kristian Amlie <kristian.amlie@northern.tech>

6 of 6 new or added lines in 1 file covered. (100.0%)

6911 of 8609 relevant lines covered (80.28%)

9347.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.21
/src/mender-update/http_resumer/http_resumer.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <mender-update/http_resumer.hpp>
16

17
#include <regex>
18

19
#include <common/common.hpp>
20
#include <common/expected.hpp>
21

22
namespace mender {
23
namespace update {
24
namespace http_resumer {
25

26
namespace common = mender::common;
27
namespace expected = mender::common::expected;
28
namespace http = mender::http;
29

30
// Represents the parts of a Content-Range HTTP header
31
// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range
32
struct RangeHeader {
        // First byte position of the returned range.
        long long range_start = 0;
        // Last byte position of the returned range.
        long long range_end = 0;
        // Total size announced by the server; stays 0 when the header omits it or uses "*".
        long long size = 0;
};
37
using ExpectedRangeHeader = expected::expected<RangeHeader, error::Error>;
38

39
// Parses the HTTP Content-Range header
40
// For an alternative implementation without regex dependency see:
41
// https://github.com/mendersoftware/mender/pull/1372/commits/ea711fc4dafa943266e9013fd6704da3d4518a27
42
// Parses the value of a Content-Range header of the form
// "bytes <start>-<end>[/<size>|/*]".
//
// Returns the parsed RangeHeader on success. The `size` field is left at 0 when
// the header carries no size or uses "*". Returns a NoSuchHeaderError when the
// header does not match the expected format, when a number cannot be converted,
// or when start is greater than end.
ExpectedRangeHeader ParseRangeHeader(string header) {
        // Compiling a std::regex is expensive, and the pattern never changes, so
        // build it once and reuse it for every call.
        static const std::regex content_range_regexp {
                R"(bytes\s+(\d+)\s?-\s?(\d+)\s?\/?\s?(\d+|\*)?)"};

        std::smatch range_matches;
        if (!regex_match(header, range_matches, content_range_regexp)) {
                return expected::unexpected(http::MakeError(
                        http::NoSuchHeaderError, "Invalid Content-Range returned from server: " + header));
        }

        // The regex only guarantees digits; conversion can still fail (e.g. on overflow).
        auto exp_range_start = common::StringToLongLong(range_matches[1].str());
        auto exp_range_end = common::StringToLongLong(range_matches[2].str());
        if (!exp_range_start || !exp_range_end) {
                return expected::unexpected(http::MakeError(
                        http::NoSuchHeaderError, "Content-Range contains invalid number: " + header));
        }

        RangeHeader range_header {};
        range_header.range_start = exp_range_start.value();
        range_header.range_end = exp_range_end.value();

        if (range_header.range_start > range_header.range_end) {
                return expected::unexpected(http::MakeError(
                        http::NoSuchHeaderError, "Invalid Content-Range returned from server: " + header));
        }

        // The size part is optional, and "*" means the size is unknown.
        if ((range_matches[3].matched) && (range_matches[3].str() != "*")) {
                auto exp_size = common::StringToLongLong(range_matches[3].str());
                if (!exp_size) {
                        return expected::unexpected(http::MakeError(
                                http::NoSuchHeaderError, "Content-Range contains invalid number: " + header));
                }
                range_header.size = exp_size.value();
        }

        return range_header;
}
78

79
class HeaderHandlerFunctor {
572✔
80
public:
81
        HeaderHandlerFunctor(weak_ptr<DownloadResumerClient> resumer) :
82
                resumer_client_ {resumer} {};
83

84
        void operator()(http::ExpectedIncomingResponsePtr exp_resp);
85

86
private:
87
        void HandleFirstResponse(
88
                const shared_ptr<DownloadResumerClient> &resumer_client,
89
                http::ExpectedIncomingResponsePtr exp_resp);
90
        void HandleNextResponse(
91
                const shared_ptr<DownloadResumerClient> &resumer_client,
92
                http::ExpectedIncomingResponsePtr exp_resp);
93

94
        weak_ptr<DownloadResumerClient> resumer_client_;
95
};
96

97
class BodyHandlerFunctor {
592✔
98
public:
99
        BodyHandlerFunctor(weak_ptr<DownloadResumerClient> resumer) :
100
                resumer_client_ {resumer} {};
101

102
        void operator()(http::ExpectedIncomingResponsePtr exp_resp);
103

104
private:
105
        weak_ptr<DownloadResumerClient> resumer_client_;
106
};
107

108
// Entry point for header responses coming from the inner HTTP client.
void HeaderHandlerFunctor::operator()(http::ExpectedIncomingResponsePtr exp_resp) {
        auto client = resumer_client_.lock();
        if (!client) {
                // The resumer client no longer exists; nobody is left to notify.
                return;
        }

        const bool already_resuming =
                client->resumer_state_->active_state == DownloadResumerActiveStatus::Resuming;
        if (already_resuming) {
                HandleNextResponse(client, exp_resp);
                return;
        }
        HandleFirstResponse(client, exp_resp);
}
140✔
118

119
void HeaderHandlerFunctor::HandleFirstResponse(
85✔
120
        const shared_ptr<DownloadResumerClient> &resumer_client,
121
        http::ExpectedIncomingResponsePtr exp_resp) {
122
        // The first response shall always call the user header callback. On resumable responses, we
123
        // create a our own incoming response and call the user header handler. On errors, we log a
124
        // warning and call the user handler with the original response
125

126
        if (!exp_resp) {
85✔
127
                resumer_client->logger_.Warning(exp_resp.error().String());
×
128
                resumer_client->CallUserHandler(exp_resp);
×
129
                return;
3✔
130
        }
131
        auto resp = exp_resp.value();
85✔
132

133
        if (resp->GetStatusCode() != mender::http::StatusOK) {
85✔
134
                // Non-resumable response
135
                resumer_client->CallUserHandler(exp_resp);
2✔
136
                return;
2✔
137
        }
138

139
        auto exp_header = resp->GetHeader("Content-Length");
166✔
140
        if (!exp_header || exp_header.value() == "0") {
83✔
141
                resumer_client->logger_.Warning("Response does not contain Content-Length header");
2✔
142
                resumer_client->CallUserHandler(exp_resp);
1✔
143
                return;
1✔
144
        }
145

146
        auto exp_length = common::StringToLongLong(exp_header.value());
82✔
147
        if (!exp_length || exp_length.value() < 0) {
82✔
148
                resumer_client->logger_.Warning(
×
149
                        "Content-Length contains invalid number: " + exp_header.value());
×
150
                resumer_client->CallUserHandler(exp_resp);
×
151
                return;
152
        }
153

154
        // Resumable response
155
        resumer_client->resumer_state_->active_state = DownloadResumerActiveStatus::Resuming;
82✔
156
        resumer_client->resumer_state_->offset = 0;
82✔
157
        resumer_client->resumer_state_->content_length = exp_length.value();
82✔
158

159
        // Prepare a modified response and call user handler
160
        resumer_client->response_.reset(new http::IncomingResponse(*resumer_client, resp->cancelled_));
164✔
161
        resumer_client->response_->status_code_ = resp->GetStatusCode();
82✔
162
        resumer_client->response_->status_message_ = resp->GetStatusMessage();
164✔
163
        resumer_client->response_->headers_ = resp->GetHeaders();
164
        resumer_client->CallUserHandler(resumer_client->response_);
164✔
165
}
166

167
void HeaderHandlerFunctor::HandleNextResponse(
53✔
168
        const shared_ptr<DownloadResumerClient> &resumer_client,
169
        http::ExpectedIncomingResponsePtr exp_resp) {
170
        // If an error occurs has already occurred, schedule the next AsyncCall directly
171
        // If an error occurs during handling here, cancel the resuming and call the user handler.
172
        if (!exp_resp) {
53✔
173
                resumer_client->logger_.Warning(exp_resp.error().String());
22✔
174

175
                auto err = resumer_client->ScheduleNextResumeRequest();
11✔
176
                if (err != error::NoError) {
11✔
177
                        resumer_client->logger_.Error(err.String());
2✔
178
                        resumer_client->CallUserHandler(expected::unexpected(err));
2✔
179
                }
180
                return;
181
        }
182
        auto resp = exp_resp.value();
42✔
183

184
        auto resumer_reader = resumer_client->resumer_reader_.lock();
42✔
185
        if (!resumer_reader) {
42✔
186
                // Errors should already have been handled as part of the Cancel() inside the
187
                // destructor of the reader.
188
                return;
189
        }
190

191
        auto exp_content_range = resp->GetHeader("Content-Range").and_then(ParseRangeHeader);
84✔
192
        if (!exp_content_range) {
42✔
193
                resumer_client->logger_.Error(exp_content_range.error().String());
24✔
194
                resumer_client->CallUserHandler(expected::unexpected(exp_content_range.error()));
24✔
195
                return;
12✔
196
        }
197

198
        auto content_range = exp_content_range.value();
30✔
199
        if (content_range.size != 0
200
                && content_range.size != resumer_client->resumer_state_->content_length) {
30✔
201
                auto size_changed_err = http::MakeError(
202
                        http::DownloadResumerError,
203
                        "Size of artifact changed after download was resumed (expected "
204
                                + to_string(resumer_client->resumer_state_->content_length) + ", got "
4✔
205
                                + to_string(content_range.size) + ")");
8✔
206
                resumer_client->logger_.Error(size_changed_err.String());
4✔
207
                resumer_client->CallUserHandler(expected::unexpected(size_changed_err));
4✔
208
                return;
209
        }
210

211
        if ((content_range.range_end != resumer_client->resumer_state_->content_length - 1)
28✔
212
                || (content_range.range_start != resumer_client->resumer_state_->offset)) {
28✔
213
                auto bad_range_err = http::MakeError(
214
                        http::DownloadResumerError,
215
                        "HTTP server returned an different range than requested. Requested "
216
                                + to_string(resumer_client->resumer_state_->offset) + "-"
4✔
217
                                + to_string(resumer_client->resumer_state_->content_length - 1) + ", got "
8✔
218
                                + to_string(content_range.range_start) + "-" + to_string(content_range.range_end));
8✔
219
                resumer_client->logger_.Error(bad_range_err.String());
4✔
220
                resumer_client->CallUserHandler(expected::unexpected(bad_range_err));
4✔
221
                return;
222
        }
223

224
        // Get the reader for the new response
225
        auto exp_reader = resumer_client->client_.MakeBodyAsyncReader(resp);
52✔
226
        if (!exp_reader) {
26✔
227
                auto bad_range_err = exp_reader.error().WithContext("cannot get the reader after resume");
×
228
                resumer_client->logger_.Error(bad_range_err.String());
×
229
                resumer_client->CallUserHandler(expected::unexpected(bad_range_err));
×
230
                return;
231
        }
232
        // Update the inner reader of the user reader
233
        resumer_reader->inner_reader_ = exp_reader.value();
26✔
234

235
        // Resume reading reusing last user data (start, end, handler)
236
        auto err = resumer_reader->AsyncReadResume();
26✔
237
        if (err != error::NoError) {
26✔
238
                auto bad_read_err = err.WithContext("error reading after resume");
×
239
                resumer_client->logger_.Error(bad_read_err.String());
×
240
                resumer_client->CallUserHandler(expected::unexpected(bad_read_err));
×
241
                return;
242
        }
243
}
244

245
// Entry point for body results coming from the inner HTTP client.
//
// The download is resumed if either:
// * there is any error (other than a cancellation), or
// * the read succeeded with status code Partial Content and data is still missing.
// Otherwise the result is handed to the user.
void BodyHandlerFunctor::operator()(http::ExpectedIncomingResponsePtr exp_resp) {
        auto client = resumer_client_.lock();
        if (!client) {
                return;
        }

        // Cancelled requests and requests that never became resumable are passed straight on.
        if (*client->cancelled_) {
                client->CallUserHandler(exp_resp);
                return;
        }
        if (client->resumer_state_->active_state == DownloadResumerActiveStatus::Inactive) {
                client->CallUserHandler(exp_resp);
                return;
        }

        if (!exp_resp) {
                // The current inner reader is of no use any more; drop it.
                auto user_reader = client->resumer_reader_.lock();
                if (user_reader) {
                        user_reader->inner_reader_.reset();
                }
                if (exp_resp.error().code == make_error_condition(errc::operation_canceled)) {
                        // We don't want to resume cancelled requests, as these were
                        // cancelled for a reason.
                        client->CallUserHandler(exp_resp);
                        return;
                }
                client->logger_.Info("Will try to resume after error " + exp_resp.error().String());
        } else {
                const bool partial_content =
                        exp_resp.value()->GetStatusCode() == mender::http::StatusPartialContent;
                const bool data_missing =
                        client->resumer_state_->offset < client->resumer_state_->content_length;
                if (!(partial_content && data_missing)) {
                        // Update headers with the last received server response. When resuming has
                        // taken place, the user will get different headers on header and body
                        // handlers, representing (somehow) what the resumer has been doing on its
                        // behalf.
                        auto last_resp = exp_resp.value();
                        client->response_->status_code_ = last_resp->GetStatusCode();
                        client->response_->status_message_ = last_resp->GetStatusMessage();
                        client->response_->headers_ = last_resp->GetHeaders();

                        // Finished, call the user handler \o/
                        client->logger_.Debug("Download resumed and completed successfully");
                        client->CallUserHandler(client->response_);
                        return;
                }
        }

        // Either an error happened or data is still missing: try again.
        auto schedule_err = client->ScheduleNextResumeRequest();
        if (schedule_err != error::NoError) {
                client->logger_.Error(schedule_err.String());
                client->CallUserHandler(expected::unexpected(schedule_err));
        }
}
304

305
// Destroying the reader cancels the download. Cancel() is called here because,
// in between retries, there is no inner reader that would do it.
DownloadResumerAsyncReader::~DownloadResumerAsyncReader() {
        this->Cancel();
}
81✔
308

309
void DownloadResumerAsyncReader::Cancel() {
81✔
310
        auto resumer_client = resumer_client_.lock();
81✔
311
        if (!*cancelled_ && resumer_client) {
81✔
312
                resumer_client->Cancel();
54✔
313
        }
314
}
81✔
315

316
// Starts an asynchronous read into [start, end) and remembers the arguments so
// that the same read can be re-issued after a resume.
//
// Returns a ProgrammingError if the resumer client is gone or the request has
// been cancelled; otherwise returns whatever AsyncReadResume() returns.
error::Error DownloadResumerAsyncReader::AsyncRead(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        auto client = resumer_client_.lock();
        const bool usable = client && !*cancelled_;
        if (!usable) {
                return error::MakeError(
                        error::ProgrammingError,
                        "DownloadResumerAsyncReader::AsyncRead called after stream is destroyed");
        }

        // Save user parameters for further resumes of the body read
        client->last_read_ = {.start = start, .end = end, .handler = handler};
        return AsyncReadResume();
}
328

329
// (Re-)issues the last user read, stored in the client's last_read_, on the
// current inner reader.
//
// On a successful read the offset is advanced and the user handler is called.
// On a read error only a warning is logged and the user handler is NOT called.
//
// Returns a ProgrammingError if the resumer client is gone or there is no inner
// reader to read from; otherwise returns the result of the inner AsyncRead.
error::Error DownloadResumerAsyncReader::AsyncReadResume() {
        auto resumer_client = resumer_client_.lock();
        if (!resumer_client) {
                return error::MakeError(
                        error::ProgrammingError,
                        "DownloadResumerAsyncReader::AsyncReadResume called after client is destroyed");
        }
        // The body handler resets the inner reader when a request fails, so it can be
        // empty in between retries. Report that instead of dereferencing a null pointer.
        if (!inner_reader_) {
                return error::MakeError(
                        error::ProgrammingError,
                        "DownloadResumerAsyncReader::AsyncReadResume called without an inner reader");
        }
        return inner_reader_->AsyncRead(
                resumer_client->last_read_.start,
                resumer_client->last_read_.end,
                [this](io::ExpectedSize result) {
                        if (!result) {
                                logger_.Warning(
                                        "Reading error, a new request will be re-scheduled. "
                                        + result.error().String());
                        } else {
                                // Track progress so that a later resume requests only the missing bytes.
                                resumer_state_->offset += result.value();
                                logger_.Debug("read " + to_string(result.value()) + " bytes");
                                auto resumer_client = resumer_client_.lock();
                                if (resumer_client) {
                                        resumer_client->last_read_.handler(result);
                                } else {
                                        logger_.Error(
                                                "AsyncRead finish handler called after resumer client has been destroyed.");
                                }
                        }
                });
}
357

358
DownloadResumerClient::DownloadResumerClient(
124✔
359
        const http::ClientConfig &config, events::EventLoop &event_loop) :
360
        resumer_state_ {make_shared<DownloadResumerClientState>()},
361
        client_(config, event_loop, "http_resumer:client"),
362
        logger_ {"http_resumer:client"},
363
        cancelled_ {make_shared<bool>(true)},
124✔
364
        retry_ {
365
                .backoff = http::ExponentialBackoff(chrono::minutes(1), 10),
366
                .wait_timer = events::Timer(event_loop)} {
496✔
367
}
124✔
368

369
// Warns if a request is still active at destruction time, then cancels the
// inner HTTP client.
DownloadResumerClient::~DownloadResumerClient() {
        const bool request_active = !*cancelled_;
        if (request_active) {
                logger_.Warning("DownloadResumerClient destroyed while request is still active!");
        }
        client_.Cancel();
}
124✔
375

376
// Starts a resumable HTTP request.
//
// `user_header_handler` and `user_body_handler` are stored and called through
// CallUserHandler(); the inner client gets the resumer's own handlers instead.
//
// Returns an operation_in_progress error if a call is already ongoing, in which
// case the ongoing call is left untouched. Otherwise returns the result of the
// inner client's AsyncCall.
error::Error DownloadResumerClient::AsyncCall(
        http::OutgoingRequestPtr req,
        http::ResponseHandler user_header_handler,
        http::ResponseHandler user_body_handler) {
        // Check this before touching any state: overwriting the stored request and
        // handlers here would redirect the callbacks of the call already in progress.
        if (!*cancelled_) {
                return error::Error(
                        make_error_condition(errc::operation_in_progress), "HTTP resumer call already ongoing");
        }

        HeaderHandlerFunctor resumer_header_handler {shared_from_this()};
        BodyHandlerFunctor resumer_body_handler {shared_from_this()};

        user_request_ = req;
        user_header_handler_ = user_header_handler;
        user_body_handler_ = user_body_handler;

        *cancelled_ = false;
        retry_.backoff.Reset();
        resumer_state_->active_state = DownloadResumerActiveStatus::Inactive;
        resumer_state_->user_handlers_state = DownloadResumerUserHandlersStatus::None;

        auto err = client_.AsyncCall(req, resumer_header_handler, resumer_body_handler);
        if (err != error::NoError) {
                // The request never started, so go back to the "no active request" state.
                // Otherwise every later AsyncCall would fail with "already ongoing".
                DoCancel();
        }
        return err;
}
398

399
// Creates the body reader handed to the user: a DownloadResumerAsyncReader
// wrapping the inner client's reader for `resp`. The client keeps a reference
// to it in resumer_reader_ so the inner reader can be replaced on resume.
//
// Returns the inner client's error unchanged if it cannot create a reader.
io::ExpectedAsyncReaderPtr DownloadResumerClient::MakeBodyAsyncReader(
        http::IncomingResponsePtr resp) {
        auto exp_inner_reader = client_.MakeBodyAsyncReader(resp);
        if (exp_inner_reader) {
                auto wrapping_reader = make_shared<DownloadResumerAsyncReader>(
                        exp_inner_reader.value(), resumer_state_, cancelled_, shared_from_this());
                resumer_reader_ = wrapping_reader;
                return wrapping_reader;
        }
        return exp_inner_reader;
}
410

411
// Returns a copy of the original user request with a Range header asking for
// the bytes from the current offset up to the last byte (content_length - 1).
http::OutgoingRequestPtr DownloadResumerClient::RemainingRangeRequest() const {
        const string first_byte = to_string(resumer_state_->offset);
        const string last_byte = to_string(resumer_state_->content_length - 1);

        auto range_req = make_shared<http::OutgoingRequest>(*user_request_);
        range_req->SetHeader("Range", "bytes=" + first_byte + "-" + last_byte);
        return range_req;
}
419

420
// Schedules a range request for the remaining data after the next backoff
// interval.
//
// Returns a DownloadResumerError when the backoff gives no further interval.
// Otherwise returns NoError; failures that happen later, inside the timer
// callback, are reported through CallUserHandler().
error::Error DownloadResumerClient::ScheduleNextResumeRequest() {
        auto exp_interval = retry_.backoff.NextInterval();
        if (!exp_interval) {
                return http::MakeError(
                        http::DownloadResumerError,
                        "Giving up on resuming the download: " + exp_interval.error().String());
        }

        auto interval = exp_interval.value();
        logger_.Info(
                "Resuming download after "
                + to_string(chrono::duration_cast<chrono::seconds>(interval).count()) + " seconds");

        HeaderHandlerFunctor resumer_next_header_handler {shared_from_this()};
        BodyHandlerFunctor resumer_next_body_handler {shared_from_this()};

        retry_.wait_timer.AsyncWait(
                interval,
                [this, resumer_next_header_handler, resumer_next_body_handler](error::Error timer_err) {
                        if (timer_err != error::NoError) {
                                auto err_user = http::MakeError(
                                        http::DownloadResumerError,
                                        "Unexpected error in wait timer: " + timer_err.String());
                                logger_.Error(err_user.String());
                                CallUserHandler(expected::unexpected(err_user));
                                return;
                        }

                        auto next_call_err = client_.AsyncCall(
                                RemainingRangeRequest(), resumer_next_header_handler, resumer_next_body_handler);
                        if (next_call_err != error::NoError) {
                                // Do not drop the reason silently; it explains why another retry happens.
                                logger_.Warning(
                                        "Could not send resume request, will retry: " + next_call_err.String());

                                // Schedule once more
                                auto reschedule_err = ScheduleNextResumeRequest();
                                if (reschedule_err != error::NoError) {
                                        logger_.Error(reschedule_err.String());
                                        CallUserHandler(expected::unexpected(reschedule_err));
                                }
                        }
                });

        return error::NoError;
}
460

461
// Forwards `exp_resp` to the next user handler that has not been called yet:
// first the header handler, then the body handler. Once both have been called,
// further results are only logged.
//
// Any error result marks the request as cancelled, and so does calling the
// body handler.
void DownloadResumerClient::CallUserHandler(http::ExpectedIncomingResponsePtr exp_resp) {
        if (!exp_resp) {
                DoCancel();
        }

        switch (resumer_state_->user_handlers_state) {
        case DownloadResumerUserHandlersStatus::None:
                // Record the transition before calling out to the user.
                resumer_state_->user_handlers_state =
                        DownloadResumerUserHandlersStatus::HeaderHandlerCalled;
                user_header_handler_(exp_resp);
                break;

        case DownloadResumerUserHandlersStatus::HeaderHandlerCalled:
                resumer_state_->user_handlers_state = DownloadResumerUserHandlersStatus::BodyHandlerCalled;
                DoCancel();
                user_body_handler_(exp_resp);
                break;

        default: {
                // Both handlers were already called; nothing is left to notify.
                string description;
                if (exp_resp) {
                        auto &late_resp = exp_resp.value();
                        description = "response: " + to_string(late_resp->GetStatusCode()) + " "
                                                  + late_resp->GetStatusMessage();
                } else {
                        description = "error: " + exp_resp.error().String();
                }
                logger_.Warning("Cannot call any user handler with " + description);
                break;
        }
        }
}
185✔
486

487
void DownloadResumerClient::Cancel() {
57✔
488
        DoCancel();
57✔
489
        client_.Cancel();
57✔
490
};
57✔
491

492
void DownloadResumerClient::DoCancel() {
230✔
493
        // Set cancel state and then make a new one. Those who are interested should have their own
494
        // pointer to the old one.
495
        *cancelled_ = true;
230✔
496
        cancelled_ = make_shared<bool>(true);
230✔
497
};
230✔
498

499
} // namespace http_resumer
500
} // namespace update
501
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc