• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1056184062

31 Oct 2023 12:41PM UTC coverage: 80.182% (+0.3%) from 79.877%
1056184062

push

gitlab-ci

kacf
chore: Do not try to resume requests that were cancelled by callers.

This requires us to move the destruction of the inner reader until
after the body handler has been called, otherwise it will trigger
another `operation_canceled` error which we don't want.

Ticket: MEN-6807

Signed-off-by: Kristian Amlie <kristian.amlie@northern.tech>

5 of 5 new or added lines in 1 file covered. (100.0%)

6890 of 8593 relevant lines covered (80.18%)

9363.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.65
/src/mender-update/http_resumer/http_resumer.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <mender-update/http_resumer.hpp>
16

17
#include <regex>
18

19
#include <common/common.hpp>
20
#include <common/expected.hpp>
21

22
namespace mender {
23
namespace update {
24
namespace http_resumer {
25

26
namespace common = mender::common;
27
namespace expected = mender::common::expected;
28
namespace http = mender::http;
29

30
// Represents the parts of a Content-Range HTTP header
31
// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range
32
struct RangeHeader {
        // Position of the first byte of the returned range.
        long long range_start {0};
        // Position of the last byte of the returned range.
        long long range_end {0};
        // Total size reported after the slash; stays 0 unless a number is given there.
        long long size {0};
};
37
using ExpectedRangeHeader = expected::expected<RangeHeader, error::Error>;
38

39
// Parses the HTTP Content-Range header
40
// For an alternative implementation without regex dependency see:
41
// https://github.com/mendersoftware/mender/pull/1372/commits/ea711fc4dafa943266e9013fd6704da3d4518a27
42
// Expects a value such as "bytes 100-199/1000". On success the returned RangeHeader carries
// the start and end positions; `size` is only filled in when a numeric total is present and
// is left at 0 when the total is missing or given as "*".
//
// Returns an http::NoSuchHeaderError error when the value does not match the expected syntax,
// when one of the numbers cannot be converted, or when the start is greater than the end.
ExpectedRangeHeader ParseRangeHeader(string header) {
        // Compiling a std::regex is expensive, so build the pattern once and reuse it on every
        // call instead of recompiling it each time a header is parsed.
        static const std::regex content_range_regexp {
                R"(bytes\s+(\d+)\s?-\s?(\d+)\s?\/?\s?(\d+|\*)?)"};

        // Shared by the two places where a matched number fails to convert.
        auto invalid_number = [&header]() {
                return expected::unexpected(http::MakeError(
                        http::NoSuchHeaderError, "Content-Range contains invalid number: " + header));
        };

        std::smatch range_matches;
        if (!regex_match(header, range_matches, content_range_regexp)) {
                return expected::unexpected(http::MakeError(
                        http::NoSuchHeaderError, "Invalid Content-Range returned from server: " + header));
        }

        auto exp_range_start = common::StringToLongLong(range_matches[1].str());
        auto exp_range_end = common::StringToLongLong(range_matches[2].str());
        if (!exp_range_start || !exp_range_end) {
                return invalid_number();
        }

        RangeHeader range_header {};
        range_header.range_start = exp_range_start.value();
        range_header.range_end = exp_range_end.value();

        if (range_header.range_start > range_header.range_end) {
                return expected::unexpected(http::MakeError(
                        http::NoSuchHeaderError, "Invalid Content-Range returned from server: " + header));
        }

        // The total size is optional and may be "*" (unknown); only convert real numbers.
        if ((range_matches[3].matched) && (range_matches[3].str() != "*")) {
                auto exp_size = common::StringToLongLong(range_matches[3].str());
                if (!exp_size) {
                        return invalid_number();
                }
                range_header.size = exp_size.value();
        }

        return range_header;
}
78

79
class HeaderHandlerFunctor {
563✔
80
public:
81
        HeaderHandlerFunctor(weak_ptr<DownloadResumerClient> resumer) :
82
                resumer_client_ {resumer} {};
83

84
        void operator()(http::ExpectedIncomingResponsePtr exp_resp);
85

86
private:
87
        void HandleFirstResponse(
88
                const shared_ptr<DownloadResumerClient> &resumer_client,
89
                http::ExpectedIncomingResponsePtr exp_resp);
90
        void HandleNextResponse(
91
                const shared_ptr<DownloadResumerClient> &resumer_client,
92
                http::ExpectedIncomingResponsePtr exp_resp);
93

94
        weak_ptr<DownloadResumerClient> resumer_client_;
95
};
96

97
class BodyHandlerFunctor {
583✔
98
public:
99
        BodyHandlerFunctor(weak_ptr<DownloadResumerClient> resumer) :
100
                resumer_client_ {resumer} {};
101

102
        void operator()(http::ExpectedIncomingResponsePtr exp_resp);
103

104
private:
105
        weak_ptr<DownloadResumerClient> resumer_client_;
106
};
107

108
// Header callback entry point: routes the response to the handler matching the current
// resumer state. Does nothing when the resumer client no longer exists.
void HeaderHandlerFunctor::operator()(http::ExpectedIncomingResponsePtr exp_resp) {
        auto client = resumer_client_.lock();
        if (!client) {
                return;
        }

        const bool already_resuming =
                client->resumer_state_->active_state == DownloadResumerActiveStatus::Resuming;
        if (already_resuming) {
                HandleNextResponse(client, exp_resp);
        } else {
                HandleFirstResponse(client, exp_resp);
        }
}
138✔
118

119
void HeaderHandlerFunctor::HandleFirstResponse(
84✔
120
        const shared_ptr<DownloadResumerClient> &resumer_client,
121
        http::ExpectedIncomingResponsePtr exp_resp) {
122
        // The first response shall always call the user header callback. On resumable responses, we
123
        // create a our own incoming response and call the user header handler. On errors, we log a
124
        // warning and call the user handler with the original response
125

126
        if (!exp_resp) {
84✔
127
                resumer_client->logger_.Warning(exp_resp.error().String());
×
128
                resumer_client->CallUserHandler(exp_resp);
×
129
                return;
3✔
130
        }
131
        auto resp = exp_resp.value();
84✔
132

133
        if (resp->GetStatusCode() != mender::http::StatusOK) {
84✔
134
                // Non-resumable response
135
                resumer_client->CallUserHandler(exp_resp);
2✔
136
                return;
2✔
137
        }
138

139
        auto exp_header = resp->GetHeader("Content-Length");
164✔
140
        if (!exp_header || exp_header.value() == "0") {
82✔
141
                resumer_client->logger_.Warning("Response does not contain Content-Length header");
2✔
142
                resumer_client->CallUserHandler(exp_resp);
1✔
143
                return;
1✔
144
        }
145

146
        auto exp_length = common::StringToLongLong(exp_header.value());
81✔
147
        if (!exp_length || exp_length.value() < 0) {
81✔
148
                resumer_client->logger_.Warning(
×
149
                        "Content-Length contains invalid number: " + exp_header.value());
×
150
                resumer_client->CallUserHandler(exp_resp);
×
151
                return;
152
        }
153

154
        // Resumable response
155
        resumer_client->resumer_state_->active_state = DownloadResumerActiveStatus::Resuming;
81✔
156
        resumer_client->resumer_state_->offset = 0;
81✔
157
        resumer_client->resumer_state_->content_length = exp_length.value();
81✔
158

159
        // Prepare a modified response and call user handler
160
        resumer_client->response_.reset(new http::IncomingResponse(*resumer_client, resp->cancelled_));
162✔
161
        resumer_client->response_->status_code_ = resp->GetStatusCode();
81✔
162
        resumer_client->response_->status_message_ = resp->GetStatusMessage();
162✔
163
        resumer_client->response_->headers_ = resp->GetHeaders();
164
        resumer_client->CallUserHandler(resumer_client->response_);
162✔
165
}
166

167
void HeaderHandlerFunctor::HandleNextResponse(
53✔
168
        const shared_ptr<DownloadResumerClient> &resumer_client,
169
        http::ExpectedIncomingResponsePtr exp_resp) {
170
        // If an error occurs has already occurred, schedule the next AsyncCall directly
171
        // If an error occurs during handling here, cancel the resuming and call the user handler.
172
        if (!exp_resp) {
53✔
173
                resumer_client->logger_.Warning(exp_resp.error().String());
22✔
174

175
                auto err = resumer_client->ScheduleNextResumeRequest();
11✔
176
                if (err != error::NoError) {
11✔
177
                        resumer_client->logger_.Error(err.String());
2✔
178
                        resumer_client->CallUserHandler(expected::unexpected(err));
2✔
179
                }
180
                return;
181
        }
182
        auto resp = exp_resp.value();
42✔
183

184
        auto resumer_reader = resumer_client->resumer_reader_.lock();
42✔
185
        if (!resumer_reader) {
42✔
186
                // Errors should already have been handled as part of the Cancel() inside the
187
                // destructor of the reader.
188
                return;
189
        }
190

191
        auto exp_content_range = resp->GetHeader("Content-Range").and_then(ParseRangeHeader);
84✔
192
        if (!exp_content_range) {
42✔
193
                resumer_client->logger_.Error(exp_content_range.error().String());
24✔
194
                resumer_client->CallUserHandler(expected::unexpected(exp_content_range.error()));
24✔
195
                return;
12✔
196
        }
197

198
        auto content_range = exp_content_range.value();
30✔
199
        if (content_range.size != 0
200
                && content_range.size != resumer_client->resumer_state_->content_length) {
30✔
201
                auto size_changed_err = http::MakeError(
202
                        http::DownloadResumerError,
203
                        "Size of artifact changed after download was resumed (expected "
204
                                + to_string(resumer_client->resumer_state_->content_length) + ", got "
4✔
205
                                + to_string(content_range.size) + ")");
8✔
206
                resumer_client->logger_.Error(size_changed_err.String());
4✔
207
                resumer_client->CallUserHandler(expected::unexpected(size_changed_err));
4✔
208
                return;
209
        }
210

211
        if ((content_range.range_end != resumer_client->resumer_state_->content_length - 1)
28✔
212
                || (content_range.range_start != resumer_client->resumer_state_->offset)) {
28✔
213
                auto bad_range_err = http::MakeError(
214
                        http::DownloadResumerError,
215
                        "HTTP server returned an different range than requested. Requested "
216
                                + to_string(resumer_client->resumer_state_->offset) + "-"
4✔
217
                                + to_string(resumer_client->resumer_state_->content_length - 1) + ", got "
8✔
218
                                + to_string(content_range.range_start) + "-" + to_string(content_range.range_end));
8✔
219
                resumer_client->logger_.Error(bad_range_err.String());
4✔
220
                resumer_client->CallUserHandler(expected::unexpected(bad_range_err));
4✔
221
                return;
222
        }
223

224
        // Get the reader for the new response
225
        auto exp_reader = resumer_client->client_.MakeBodyAsyncReader(resp);
52✔
226
        if (!exp_reader) {
26✔
227
                auto bad_range_err = exp_reader.error().WithContext("cannot get the reader after resume");
×
228
                resumer_client->logger_.Error(bad_range_err.String());
×
229
                resumer_client->CallUserHandler(expected::unexpected(bad_range_err));
×
230
                return;
231
        }
232
        // Update the inner reader of the user reader
233
        resumer_reader->inner_reader_ = exp_reader.value();
26✔
234

235
        // Resume reading reusing last user data (start, end, handler)
236
        auto err = resumer_reader->AsyncReadResume();
26✔
237
        if (err != error::NoError) {
26✔
238
                auto bad_read_err = err.WithContext("error reading after resume");
×
239
                resumer_client->logger_.Error(bad_read_err.String());
×
240
                resumer_client->CallUserHandler(expected::unexpected(bad_read_err));
×
241
                return;
242
        }
243
}
244

245
// Body callback entry point. Forwards the result straight to the user when the request was
// cancelled or resuming is inactive; otherwise decides between finishing the download and
// scheduling another range request.
void BodyHandlerFunctor::operator()(http::ExpectedIncomingResponsePtr exp_resp) {
        auto client = resumer_client_.lock();
        if (!client) {
                return;
        }

        // Nothing to resume: pass the result through as is.
        if (*client->cancelled_
                || client->resumer_state_->active_state == DownloadResumerActiveStatus::Inactive) {
                client->CallUserHandler(exp_resp);
                return;
        }

        // We resume the download if either:
        // * there is any error or
        // * successful read with status code Partial Content and there is still data missing
        const bool got_partial_content =
                exp_resp && exp_resp.value()->GetStatusCode() == mender::http::StatusPartialContent;
        const bool bytes_remaining =
                client->resumer_state_->offset < client->resumer_state_->content_length;
        const bool should_resume = !exp_resp || (got_partial_content && bytes_remaining);

        if (!should_resume) {
                // Update headers with the last received server response. When resuming has taken
                // place, the user will get different headers on header and body handlers,
                // representing (somehow) what the resumer has been doing on its behalf.
                auto last_resp = exp_resp.value();
                client->response_->status_code_ = last_resp->GetStatusCode();
                client->response_->status_message_ = last_resp->GetStatusMessage();
                client->response_->headers_ = last_resp->GetHeaders();

                // Finished, call the user handler \o/
                client->logger_.Debug("Download resumed and completed successfully");
                client->CallUserHandler(client->response_);
                return;
        }

        if (!exp_resp) {
                // Drop the inner reader of the failed request before doing anything else.
                auto user_reader = client->resumer_reader_.lock();
                if (user_reader) {
                        user_reader->inner_reader_.reset();
                }
                if (exp_resp.error().code == make_error_condition(errc::operation_canceled)) {
                        // We don't want to resume cancelled requests, as these were
                        // cancelled for a reason.
                        client->CallUserHandler(exp_resp);
                        return;
                }
                client->logger_.Info("Will try to resume after error " + exp_resp.error().String());
        }

        auto schedule_err = client->ScheduleNextResumeRequest();
        if (schedule_err != error::NoError) {
                client->logger_.Error(schedule_err.String());
                client->CallUserHandler(expected::unexpected(schedule_err));
        }
}
304

305
void DownloadResumerAsyncReader::Cancel() {
×
306
        if (!*cancelled_) {
×
307
                inner_reader_->Cancel();
×
308
        }
309
}
×
310

311
// Starts an asynchronous read into [start, end) and remembers the arguments on the resumer
// client, so that the same read can be re-issued after a resume.
//
// Returns a ProgrammingError when the resumer client is gone or the request was cancelled;
// otherwise returns whatever AsyncReadResume() returns.
error::Error DownloadResumerAsyncReader::AsyncRead(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        auto client = resumer_client_.lock();
        const bool usable = client && !*cancelled_;
        if (!usable) {
                return error::MakeError(
                        error::ProgrammingError,
                        "DownloadResumerAsyncReader::AsyncRead called after stream is destroyed");
        }

        // Save user parameters for further resumes of the body read
        client->last_read_ = {.start = start, .end = end, .handler = handler};

        return AsyncReadResume();
}
323

324
// Issues a read on the current inner reader using the parameters saved by the last
// AsyncRead() call.
//
// On a successful read the offset is advanced and the saved user handler is invoked. On a
// read error only a warning is logged here and the user handler is not called.
//
// Returns a ProgrammingError when the resumer client is gone; otherwise the result of the
// inner reader's AsyncRead().
error::Error DownloadResumerAsyncReader::AsyncReadResume() {
        auto client = resumer_client_.lock();
        if (!client) {
                return error::MakeError(
                        error::ProgrammingError,
                        "DownloadResumerAsyncReader::AsyncReadResume called after client is destroyed");
        }

        auto on_read_done = [this](io::ExpectedSize result) {
                if (!result) {
                        logger_.Warning(
                                "Reading error, a new request will be re-scheduled. "
                                + result.error().String());
                        return;
                }

                resumer_state_->offset += result.value();
                logger_.Debug("read " + to_string(result.value()) + " bytes");

                // The client may have gone away while the read was in flight.
                auto live_client = resumer_client_.lock();
                if (!live_client) {
                        logger_.Error(
                                "AsyncRead finish handler called after resumer client has been destroyed.");
                        return;
                }
                live_client->last_read_.handler(result);
        };

        return inner_reader_->AsyncRead(
                client->last_read_.start, client->last_read_.end, on_read_done);
}
352

353
DownloadResumerClient::DownloadResumerClient(
123✔
354
        const http::ClientConfig &config, events::EventLoop &event_loop) :
355
        resumer_state_ {make_shared<DownloadResumerClientState>()},
356
        client_(config, event_loop, "http_resumer:client"),
357
        logger_ {"http_resumer:client"},
358
        cancelled_ {make_shared<bool>(true)},
123✔
359
        retry_ {
360
                .backoff = http::ExponentialBackoff(chrono::minutes(1), 10),
361
                .wait_timer = events::Timer(event_loop)} {
492✔
362
}
123✔
363

364
// Warns when a request is still active at destruction time, then cancels the inner client.
DownloadResumerClient::~DownloadResumerClient() {
        const bool request_still_active = !*cancelled_;
        if (request_still_active) {
                logger_.Warning("DownloadResumerClient destroyed while request is still active!");
        }

        client_.Cancel();
}
123✔
370

371
// Starts a download for `req`. `user_header_handler` and `user_body_handler` are stored and
// invoked through CallUserHandler(); the inner client is given the resumer's own handlers.
//
// Returns an `operation_in_progress` error when a call is already ongoing, otherwise the
// result of the inner client's AsyncCall().
error::Error DownloadResumerClient::AsyncCall(
        http::OutgoingRequestPtr req,
        http::ResponseHandler user_header_handler,
        http::ResponseHandler user_body_handler) {
        // Reject the call before touching any state: the stored request and user handlers
        // belong to the request in flight and must not be overwritten by a rejected call.
        if (!*cancelled_) {
                return error::Error(
                        make_error_condition(errc::operation_in_progress), "HTTP resumer call already ongoing");
        }

        HeaderHandlerFunctor resumer_header_handler {shared_from_this()};
        BodyHandlerFunctor resumer_body_handler {shared_from_this()};

        user_request_ = req;
        user_header_handler_ = user_header_handler;
        user_body_handler_ = user_body_handler;

        // Fresh request: mark it active and reset retry and handler bookkeeping.
        *cancelled_ = false;
        retry_.backoff.Reset();
        resumer_state_->active_state = DownloadResumerActiveStatus::Inactive;
        resumer_state_->user_handlers_state = DownloadResumerUserHandlersStatus::None;
        return client_.AsyncCall(req, resumer_header_handler, resumer_body_handler);
}
393

394
// Creates the body reader handed to the user: a DownloadResumerAsyncReader wrapping the
// inner client's reader for `resp`. The resumer keeps a reference to it in `resumer_reader_`.
// If the inner client cannot provide a reader, its error is returned unchanged.
io::ExpectedAsyncReaderPtr DownloadResumerClient::MakeBodyAsyncReader(
        http::IncomingResponsePtr resp) {
        auto exp_inner_reader = client_.MakeBodyAsyncReader(resp);
        if (exp_inner_reader) {
                auto wrapping_reader = make_shared<DownloadResumerAsyncReader>(
                        exp_inner_reader.value(), resumer_state_, cancelled_, shared_from_this());
                resumer_reader_ = wrapping_reader;
                return wrapping_reader;
        }
        return exp_inner_reader;
}
405

406
// Returns a copy of the user request with a Range header added, asking for everything from
// the current offset up to the last byte (content_length - 1).
http::OutgoingRequestPtr DownloadResumerClient::RemainingRangeRequest() const {
        const string remaining_range = "bytes=" + to_string(resumer_state_->offset) + "-"
                                                                   + to_string(resumer_state_->content_length - 1);

        auto range_req = make_shared<http::OutgoingRequest>(*user_request_);
        range_req->SetHeader("Range", remaining_range);
        return range_req;
}
414

415
// Arms the wait timer with the next backoff interval. When it fires, a range request for the
// remaining data is sent; if sending fails, another attempt is scheduled.
//
// Returns a DownloadResumerError when the backoff gives no further interval, NoError
// otherwise. Failures that happen later, inside the timer callback, are logged and reported
// through CallUserHandler().
error::Error DownloadResumerClient::ScheduleNextResumeRequest() {
        auto exp_interval = retry_.backoff.NextInterval();
        if (!exp_interval) {
                return http::MakeError(
                        http::DownloadResumerError,
                        "Giving up on resuming the download: " + exp_interval.error().String());
        }
        auto wait_interval = exp_interval.value();

        const auto wait_seconds = chrono::duration_cast<chrono::seconds>(wait_interval).count();
        logger_.Info("Resuming download after " + to_string(wait_seconds) + " seconds");

        HeaderHandlerFunctor next_header_handler {shared_from_this()};
        BodyHandlerFunctor next_body_handler {shared_from_this()};

        retry_.wait_timer.AsyncWait(
                wait_interval, [this, next_header_handler, next_body_handler](error::Error err) {
                        if (err != error::NoError) {
                                auto timer_err = http::MakeError(
                                        http::DownloadResumerError, "Unexpected error in wait timer: " + err.String());
                                logger_.Error(timer_err.String());
                                CallUserHandler(expected::unexpected(timer_err));
                                return;
                        }

                        auto call_err =
                                client_.AsyncCall(RemainingRangeRequest(), next_header_handler, next_body_handler);
                        if (call_err == error::NoError) {
                                return;
                        }

                        // Schedule once more
                        auto reschedule_err = ScheduleNextResumeRequest();
                        if (reschedule_err != error::NoError) {
                                logger_.Error(reschedule_err.String());
                                CallUserHandler(expected::unexpected(reschedule_err));
                        }
                });

        return error::NoError;
}
455

456
// Delivers a response or an error to the user. The first delivery goes to the header
// handler and the second to the body handler; any later one is only logged as a warning.
// Errors mark the request as cancelled before delivery, and so does reaching the body
// handler.
void DownloadResumerClient::CallUserHandler(http::ExpectedIncomingResponsePtr exp_resp) {
        if (!exp_resp) {
                DoCancel();
        }

        switch (resumer_state_->user_handlers_state) {
        case DownloadResumerUserHandlersStatus::None:
                resumer_state_->user_handlers_state =
                        DownloadResumerUserHandlersStatus::HeaderHandlerCalled;
                user_header_handler_(exp_resp);
                break;

        case DownloadResumerUserHandlersStatus::HeaderHandlerCalled:
                resumer_state_->user_handlers_state = DownloadResumerUserHandlersStatus::BodyHandlerCalled;
                DoCancel();
                user_body_handler_(exp_resp);
                break;

        default: {
                // Both handlers were already called; all that is left is to log what was dropped.
                string what;
                if (exp_resp) {
                        auto &resp = exp_resp.value();
                        what = "response: " + to_string(resp->GetStatusCode()) + " " + resp->GetStatusMessage();
                } else {
                        what = "error: " + exp_resp.error().String();
                }
                logger_.Warning("Cannot call any user handler with " + what);
                break;
        }
        }
}
184✔
481

482
void DownloadResumerClient::Cancel() {
3✔
483
        DoCancel();
3✔
484
        client_.Cancel();
3✔
485
};
3✔
486

487
void DownloadResumerClient::DoCancel() {
176✔
488
        // Set cancel state and then make a new one. Those who are interested should have their own
489
        // pointer to the old one.
490
        *cancelled_ = true;
176✔
491
        cancelled_ = make_shared<bool>(true);
176✔
492
};
176✔
493

494
} // namespace http_resumer
495
} // namespace update
496
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc