• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1010553510

20 Sep 2023 06:54AM UTC coverage: 78.964% (+0.02%) from 78.946%
1010553510

push

gitlab-ci

oleorhagen
feat(bootstrap-artifact): Install the bootstrap Artifact

Ticket: MEN-6671
Changelog: None

Signed-off-by: Ole Petter <ole.orhagen@northern.tech>

39 of 39 new or added lines in 3 files covered. (100.0%)

6175 of 7820 relevant lines covered (78.96%)

11616.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.98
/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio/ip/tcp.hpp>
20
#include <boost/asio/ssl/verify_mode.hpp>
21
#include <boost/asio.hpp>
22

23
#include <common/common.hpp>
24

25
namespace mender {
26
namespace http {
27

28
namespace common = mender::common;
29

30
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
31
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
32
const unsigned int BeastHttpVersion = 11;
33

34
namespace asio = boost::asio;
35
namespace http = boost::beast::http;
36

37
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
38

39
// Translates a Mender `Method` into the matching Beast verb.
//
// `Method::Invalid` has no Beast counterpart: it trips the assert after the switch, and when
// asserts are compiled out the function falls back to `http::verb::get`.
static http::verb MethodToBeastVerb(Method method) {
	// There is intentionally no "default" label here, so that the compiler can warn about
	// unhandled enumerators if a new method is ever added.
	switch (method) {
	case Method::GET:
		return http::verb::get;
	case Method::HEAD:
		return http::verb::head;
	case Method::POST:
		return http::verb::post;
	case Method::PUT:
		return http::verb::put;
	case Method::PATCH:
		return http::verb::patch;
	case Method::CONNECT:
		return http::verb::connect;
	case Method::Invalid:
		// Nothing to map to; dealt with below the switch.
		break;
	}

	// Reaching this point means the caller passed a method we cannot translate.
	assert(false);
	return http::verb::get;
}
62

63
// Translates a Beast verb into the matching Mender `Method`.
//
// `verb_string` is only used for the error message: any verb outside the supported set
// yields an `UnsupportedMethodError` carrying that string.
static expected::expected<Method, error::Error> BeastVerbToMethod(
	http::verb verb, const string &verb_string) {
	switch (verb) {
	case http::verb::get:
		return Method::GET;
	case http::verb::head:
		return Method::HEAD;
	case http::verb::post:
		return Method::POST;
	case http::verb::put:
		return Method::PUT;
	case http::verb::patch:
		return Method::PATCH;
	case http::verb::connect:
		return Method::CONNECT;
	default:
		break;
	}

	// Every verb without a case above ends up here.
	return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
}
82

83
template <typename StreamType>
84
class BodyAsyncReader : virtual public io::AsyncReader {
85
public:
86
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
88✔
87
                stream_ {stream},
88
                cancelled_ {cancelled} {
88✔
89
        }
88✔
90
        ~BodyAsyncReader() {
43✔
91
                Cancel();
43✔
92
        }
43✔
93

94
        error::Error AsyncRead(
2,044✔
95
                vector<uint8_t>::iterator start,
96
                vector<uint8_t>::iterator end,
97
                io::AsyncIoHandler handler) override {
98
                if (*cancelled_) {
2,044✔
99
                        return error::MakeError(
×
100
                                error::ProgrammingError,
101
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
102
                }
103
                stream_.AsyncReadNextBodyPart(start, end, handler);
2,044✔
104
                return error::NoError;
2,044✔
105
        }
106

107
        void Cancel() override {
45✔
108
                if (!*cancelled_) {
45✔
109
                        stream_.Cancel();
5✔
110
                }
111
        }
45✔
112

113
private:
114
        StreamType &stream_;
115
        shared_ptr<bool> cancelled_;
116

117
        friend class Client;
118
        friend class Server;
119
};
120

121
Client::Client(
93✔
122
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
256✔
123
        event_loop_ {event_loop},
124
        logger_name_ {logger_name},
125
        cancelled_ {make_shared<bool>(true)},
×
126
        resolver_(GetAsioIoContext(event_loop)),
127
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
186✔
128
        // This is equivalent to:
129
        //   response_buffer_.reserve(body_buffer_.size());
130
        // but compatible with Boost 1.67.
131
        response_buffer_.prepare(body_buffer_.size() - response_buffer_.size());
93✔
132

133
        ssl_ctx_.set_verify_mode(ssl::verify_peer);
93✔
134

135
        if (client.client_cert_path != "" and client.client_cert_key_path != "") {
93✔
136
                ssl_ctx_.set_options(boost::asio::ssl::context::default_workarounds);
×
137
                ssl_ctx_.use_certificate_file(client.client_cert_path, boost::asio::ssl::context_base::pem);
×
138
                ssl_ctx_.use_private_key_file(
×
139
                        client.client_cert_key_path, boost::asio::ssl::context_base::pem);
×
140
        }
141

142
        beast::error_code ec {};
93✔
143
        ssl_ctx_.set_default_verify_paths(ec); // Load the default CAs
93✔
144
        if (ec) {
93✔
145
                log::Error("Failed to load the SSL default directory");
×
146
        }
147
        if (client.server_cert_path != "") {
93✔
148
                ssl_ctx_.load_verify_file(client.server_cert_path, ec);
×
149
                if (ec) {
×
150
                        log::Error("Failed to load the server certificate!");
×
151
                }
152
        }
153
}
93✔
154

155
// Tears the client down. A request that is still active at this point is reported with a
// warning, and then cancelled through `DoCancel()` like everything else.
Client::~Client() {
	const bool request_active = !*cancelled_;
	if (request_active) {
		logger_.Warning("Client destroyed while request is still active!");
	}

	DoCancel();
}
161

162
// Starts an asynchronous HTTP(S) request.
//
// `header_handler` and `body_handler` are stored and used to report progress and errors for
// the request; both must be non-empty. Only one request can be active per client at a time.
//
// Returns:
//  - `errc::operation_in_progress` if a previous call is still ongoing,
//  - `ProgrammingError` if the request address is incomplete or a handler is missing,
//  - `errc::protocol_not_supported` for anything other than "http" and "https",
//  - `error::NoError` once host name resolution has been started.
error::Error Client::AsyncCall(
	OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
	if (!*cancelled_) {
		return error::Error(
			make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
	}

	if (req->address_.protocol == "" || req->address_.host == "" || req->address_.port < 0) {
		return error::MakeError(error::ProgrammingError, "Request is not ready");
	}

	if (!header_handler || !body_handler) {
		return error::MakeError(
			error::ProgrammingError, "header_handler and body_handler can not be nullptr");
	}

	if (req->address_.protocol != "http" && req->address_.protocol != "https") {
		return error::Error(
			make_error_condition(errc::protocol_not_supported), req->address_.protocol);
	}

	// Recompute this for every call. The client object can be reused, so a plain "http"
	// request that follows an "https" one must not inherit the TLS setting.
	is_https_ = (req->address_.protocol == "https");

	logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));

	// NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
	// request to route to our k8s cluster. Set this in all cases.
	req->SetHeader("HOST", req->address_.host);

	request_ = req;
	header_handler_ = header_handler;
	body_handler_ = body_handler;
	status_ = TransactionStatus::None;

	// From here on the request counts as active.
	*cancelled_ = false;

	// The callback gets its own copy of the flag pointer, so it can tell whether this
	// particular request was cancelled before the resolver finished.
	auto &cancelled = cancelled_;

	resolver_.async_resolve(
		request_->address_.host,
		to_string(request_->address_.port),
		[this, cancelled](
			const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
			if (!*cancelled) {
				ResolveHandler(ec, results);
			}
		});

	return error::NoError;
}
214

215
// Creates the reader used to stream the response body.
//
// This is only valid while the transaction is in the `HeaderHandlerCalled` state; in any
// other state `errc::operation_in_progress` is returned. A response with a body length of
// zero yields `BodyMissingError`. On success the transaction moves to `ReaderCreated`.
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
	const bool in_header_handler = (status_ == TransactionStatus::HeaderHandlerCalled);
	if (!in_header_handler) {
		return expected::unexpected(error::Error(
			make_error_condition(errc::operation_in_progress),
			"MakeBodyAsyncReader called while reading is in progress"));
	}

	if (response_body_length_ == 0) {
		return expected::unexpected(
			MakeError(BodyMissingError, "Response does not contain a body"));
	}

	status_ = TransactionStatus::ReaderCreated;
	return make_shared<BodyAsyncReader<Client>>(resp->client_, resp->cancelled_);
}
230

231
// Invokes `handler` with the current response.
//
// The handler is taken by value on purpose, so that this function holds its own copy for
// the duration of the call. The handler may own the client instance through a capture, and
// it may replace itself with a different handler (using `AsyncCall`). Doing so destroys the
// stored copy of the handler, and with it possibly the client, so a separate copy has to be
// kept alive here while it runs.
void Client::CallHandler(ResponseHandler handler) {
	handler(response_);
}
239

240
void Client::CallErrorHandler(
12✔
241
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
242
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
243
}
12✔
244

245
void Client::CallErrorHandler(
76✔
246
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
247
        *cancelled_ = true;
76✔
248
        cancelled_ = make_shared<bool>(true);
76✔
249
        stream_.reset();
76✔
250
        status_ = TransactionStatus::Done;
76✔
251
        handler(expected::unexpected(
76✔
252
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
152✔
253
}
76✔
254

255
void Client::ResolveHandler(
141✔
256
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
257
        if (ec) {
141✔
258
                CallErrorHandler(ec, request_, header_handler_);
×
259
                return;
×
260
        }
261

262
        if (logger_.Level() >= log::LogLevel::Debug) {
141✔
263
                string ips = "[";
280✔
264
                string sep;
140✔
265
                for (auto r : results) {
293✔
266
                        ips += sep;
153✔
267
                        ips += r.endpoint().address().to_string();
153✔
268
                        sep = ", ";
153✔
269
                }
270
                ips += "]";
140✔
271
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
140✔
272
        }
273

274
        resolver_results_ = results;
141✔
275

276
        stream_ = make_shared<ssl::stream<tcp::socket>>(GetAsioIoContext(event_loop_), ssl_ctx_);
141✔
277

278
        http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();
141✔
279

280
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
281
        // if they do, they should be handled higher up in the application logic.
282
        //
283
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
284
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
285
        // `has_value()` in their code, causing their subsequent comparison operation to
286
        // misbehave. So pass highest possible value instead.
287
        http_response_parser_->body_limit(numeric_limits<uint64_t>::max());
141✔
288

289
        auto &cancelled = cancelled_;
141✔
290

291
        asio::async_connect(
282✔
292
                stream_->next_layer(),
141✔
293
                resolver_results_,
141✔
294
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
423✔
295
                        if (!*cancelled) {
141✔
296
                                if (is_https_) {
141✔
297
                                        return HandshakeHandler(ec, endpoint);
5✔
298
                                }
299
                                return ConnectHandler(ec, endpoint);
136✔
300
                        }
301
                });
302
}
303

304
// Called after the TCP connection attempt of an HTTPS request. Performs the SSL handshake
// and, when that succeeds, continues with `ConnectHandler`.
//
// A connection error (`ec`) or a failed handshake is reported to the header handler. A
// failure to set the SNI host name is only logged.
void Client::HandshakeHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
	if (ec) {
		CallErrorHandler(ec, request_, header_handler_);
		return;
	}

	// Set SNI Hostname (many hosts need this to handshake successfully)
	const auto sni_ok =
		SSL_set_tlsext_host_name(stream_->native_handle(), request_->address_.host.c_str());
	if (!sni_ok) {
		beast::error_code sni_ec {
			static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
		logger_.Error("Failed to set SNI host name: " + sni_ec.message());
	}

	// The callback keeps its own copy of the flag pointer, so it can detect cancellation.
	auto cancelled = cancelled_;

	stream_->async_handshake(
		ssl::stream_base::client, [this, cancelled, endpoint](const error_code &handshake_ec) {
			if (*cancelled) {
				return;
			}

			if (handshake_ec) {
				logger_.Error(
					"https: Failed to perform the SSL handshake: " + handshake_ec.message());
				CallErrorHandler(handshake_ec, request_, header_handler_);
				return;
			}

			logger_.Debug("https: Successful SSL handshake");
			ConnectHandler(handshake_ec, endpoint);
		});
}
333

334

335
// Called once the connection (and for HTTPS, the handshake) is established. Builds the
// Beast request from the stored request, and starts writing its header.
//
// A connection error is reported to the header handler.
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
	if (ec) {
		CallErrorHandler(ec, request_, header_handler_);
		return;
	}

	logger_.Debug("Connected to " + endpoint.address().to_string());

	http_request_ = make_shared<http::request<http::buffer_body>>(
		MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);

	// Carry over every header from our own request object.
	for (const auto &name_and_value : request_->headers_) {
		http_request_->set(name_and_value.first, name_and_value.second);
	}

	http_request_serializer_ =
		make_shared<http::request_serializer<http::buffer_body>>(*http_request_);

	// Both transports use the same completion callback. It keeps its own copy of the flag
	// pointer, and does nothing if the request was cancelled in the meantime.
	auto cancelled = cancelled_;
	auto on_header_written = [this, cancelled](const error_code &ec, size_t num_written) {
		if (*cancelled) {
			return;
		}
		WriteHeaderHandler(ec, num_written);
	};

	if (is_https_) {
		// Write through the SSL stream.
		http::async_write_header(*stream_, *http_request_serializer_, on_header_written);
	} else {
		// Plain HTTP: write directly to the underlying socket.
		http::async_write_header(
			stream_->next_layer(), *http_request_serializer_, on_header_written);
	}
}
375

376
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
139✔
377
        if (num_written > 0) {
139✔
378
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
139✔
379
        }
380

381
        if (ec) {
139✔
382
                CallErrorHandler(ec, request_, header_handler_);
×
383
                return;
95✔
384
        }
385

386
        auto header = request_->GetHeader("Content-Length");
278✔
387
        if (!header || header.value() == "0") {
139✔
388
                ReadHeader();
94✔
389
                return;
94✔
390
        }
391

392
        auto length = common::StringToLongLong(header.value());
45✔
393
        if (!length || length.value() < 0) {
45✔
394
                auto err = error::Error(
395
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
396
                CallErrorHandler(err, request_, header_handler_);
×
397
                return;
×
398
        }
399
        request_body_length_ = length.value();
45✔
400

401
        if (!request_->body_gen_ && !request_->async_body_gen_) {
45✔
402
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
403
                CallErrorHandler(err, request_, header_handler_);
1✔
404
                return;
1✔
405
        }
406

407
        assert(!(request_->body_gen_ && request_->async_body_gen_));
44✔
408

409
        if (request_->body_gen_) {
44✔
410
                auto body_reader = request_->body_gen_();
38✔
411
                if (!body_reader) {
38✔
412
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
413
                        return;
×
414
                }
415
                request_->body_reader_ = body_reader.value();
38✔
416
        } else {
417
                auto body_reader = request_->async_body_gen_();
6✔
418
                if (!body_reader) {
6✔
419
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
420
                        return;
×
421
                }
422
                request_->async_body_reader_ = body_reader.value();
6✔
423
        }
424

425
        PrepareAndWriteNewBodyBuffer();
44✔
426
}
427

428
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,136✔
429
        if (num_written > 0) {
2,136✔
430
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
1,048✔
431
        }
432

433
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,136✔
434
                // Write next block of the body.
435
                PrepareAndWriteNewBodyBuffer();
1,048✔
436
        } else if (ec) {
1,088✔
437
                CallErrorHandler(ec, request_, header_handler_);
3✔
438
        } else if (num_written > 0) {
1,085✔
439
                // We are still writing the body.
440
                WriteBody();
1,048✔
441
        } else {
442
                // We are ready to receive the response.
443
                ReadHeader();
37✔
444
        }
445
}
2,136✔
446

447
void Client::PrepareAndWriteNewBodyBuffer() {
1,092✔
448
        // request_->body_reader_ XOR request_->async_body_reader_
449
        assert(
1,092✔
450
                (request_->body_reader_ || request_->async_body_reader_)
451
                && !(request_->body_reader_ && request_->async_body_reader_));
452

453
        auto cancelled = cancelled_;
2,184✔
454
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
2,185✔
455
                if (!*cancelled) {
1,092✔
456
                        if (!read) {
1,091✔
457
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
458
                                return;
2✔
459
                        }
460
                        WriteNewBodyBuffer(read.value());
1,089✔
461
                }
462
        };
2,184✔
463

464

465
        if (request_->body_reader_) {
1,092✔
466
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
667✔
467
        } else {
468
                auto err = request_->async_body_reader_->AsyncRead(
425✔
469
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
1,275✔
470
                if (err != error::NoError) {
425✔
471
                        CallErrorHandler(err, request_, header_handler_);
×
472
                }
473
        }
474
}
1,092✔
475

476
void Client::WriteNewBodyBuffer(size_t size) {
1,089✔
477
        http_request_->body().data = body_buffer_.data();
1,089✔
478
        http_request_->body().size = size;
1,089✔
479

480
        if (size > 0) {
1,089✔
481
                http_request_->body().more = true;
1,052✔
482
        } else {
483
                // Release ownership of Body reader.
484
                request_->body_reader_.reset();
37✔
485
                request_->async_body_reader_.reset();
37✔
486
                http_request_->body().more = false;
37✔
487
        }
488

489
        WriteBody();
1,089✔
490
}
1,089✔
491

492
void Client::WriteBody() {
2,137✔
493
        auto &cancelled = cancelled_;
2,137✔
494

495
        if (is_https_) {
2,137✔
496
                http::async_write_some(
×
497
                        *stream_,
×
498
                        *http_request_serializer_,
×
499
                        [this, cancelled](const error_code &ec, size_t num_written) {
×
500
                                if (!*cancelled) {
×
501
                                        WriteBodyHandler(ec, num_written);
×
502
                                }
503
                        });
×
504
        } else {
505
                http::async_write_some(
4,274✔
506
                        stream_->next_layer(),
2,137✔
507
                        *http_request_serializer_,
2,137✔
508
                        [this, cancelled](const error_code &ec, size_t num_written) {
4,272✔
509
                                if (!*cancelled) {
2,136✔
510
                                        WriteBodyHandler(ec, num_written);
2,136✔
511
                                }
512
                        });
2,136✔
513
        }
514
}
2,137✔
515

516
void Client::ReadHeader() {
131✔
517
        http_response_parser_->get().body().data = body_buffer_.data();
131✔
518
        http_response_parser_->get().body().size = body_buffer_.size();
131✔
519

520
        auto &cancelled = cancelled_;
131✔
521

522
        if (is_https_) {
131✔
523
                http::async_read_some(
8✔
524
                        *stream_,
4✔
525
                        response_buffer_,
4✔
526
                        *http_response_parser_,
4✔
527
                        [this, cancelled](const error_code &ec, size_t num_read) {
8✔
528
                                if (!*cancelled) {
4✔
529
                                        ReadHeaderHandler(ec, num_read);
4✔
530
                                }
531
                        });
4✔
532
        } else {
533
                http::async_read_some(
254✔
534
                        stream_->next_layer(),
127✔
535
                        response_buffer_,
127✔
536
                        *http_response_parser_,
127✔
537
                        [this, cancelled](const error_code &ec, size_t num_read) {
252✔
538
                                if (!*cancelled) {
126✔
539
                                        ReadHeaderHandler(ec, num_read);
126✔
540
                                }
541
                        });
126✔
542
        }
543
}
131✔
544

545
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
130✔
546
        if (num_read > 0) {
130✔
547
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
125✔
548
        }
549

550
        if (ec) {
130✔
551
                CallErrorHandler(ec, request_, header_handler_);
5✔
552
                return;
40✔
553
        }
554

555
        if (!http_response_parser_->is_header_done()) {
125✔
556
                ReadHeader();
×
557
                return;
×
558
        }
559

560
        response_.reset(new IncomingResponse(*this, cancelled_));
125✔
561
        response_->status_code_ = http_response_parser_->get().result_int();
125✔
562
        response_->status_message_ = string {http_response_parser_->get().reason()};
125✔
563

564
        string debug_str;
125✔
565
        for (auto header = http_response_parser_->get().cbegin();
248✔
566
                 header != http_response_parser_->get().cend();
496✔
567
                 header++) {
568
                response_->headers_[string {header->name_string()}] = string {header->value()};
246✔
569
                if (logger_.Level() >= log::LogLevel::Debug) {
123✔
570
                        debug_str += string {header->name_string()};
122✔
571
                        debug_str += ": ";
122✔
572
                        debug_str += string {header->value()};
122✔
573
                        debug_str += "\n";
122✔
574
                }
575
        }
576

577
        logger_.Debug("Received headers:\n" + debug_str);
125✔
578
        debug_str.clear();
125✔
579

580
        if (http_response_parser_->chunked()) {
125✔
581
                auto cancelled = cancelled_;
1✔
582
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
583
                CallHandler(header_handler_);
1✔
584
                if (!*cancelled) {
1✔
585
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
586
                        CallErrorHandler(err, request_, body_handler_);
1✔
587
                }
588
                return;
1✔
589
        }
590

591
        auto content_length = http_response_parser_->content_length();
124✔
592
        if (content_length) {
124✔
593
                response_body_length_ = content_length.value();
103✔
594
        } else {
595
                response_body_length_ = 0;
21✔
596
        }
597
        response_body_read_ = 0;
124✔
598

599
        if (response_body_read_ >= response_body_length_) {
124✔
600
                auto cancelled = cancelled_;
33✔
601
                status_ = TransactionStatus::HeaderHandlerCalled;
33✔
602
                CallHandler(header_handler_);
33✔
603
                if (!*cancelled) {
33✔
604
                        *cancelled_ = true;
31✔
605
                        cancelled_ = make_shared<bool>(true);
31✔
606
                        stream_.reset();
31✔
607
                        CallHandler(body_handler_);
31✔
608
                }
609
                return;
33✔
610
        }
611

612
        auto cancelled = cancelled_;
91✔
613
        status_ = TransactionStatus::HeaderHandlerCalled;
91✔
614
        CallHandler(header_handler_);
91✔
615
        if (*cancelled) {
91✔
616
                return;
1✔
617
        }
618

619
        // We know that a body reader is required here, because of the `response_body_read_ >=
620
        // response_body_length_` check above.
621
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
90✔
622
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
2✔
623
        }
624
}
625

626
// Reads the next part of the response body into [start, end), and calls `handler` with the
// result. This is what the body reader calls into.
//
// While the body is still being read, an asynchronous read of at most
// min(`body_buffer_.size()`, `end - start`) bytes is started, and `ReadBodyHandler` takes
// care of the rest. In any later state `handler` is called immediately with a size of
// zero, and if the end of the body had been reached, the transaction is finished and the
// body handler is called.
void Client::AsyncReadNextBodyPart(
	vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
	assert(AtLeast(status_, TransactionStatus::ReaderCreated));

	// The first read after the reader was created starts the body reading phase.
	if (status_ == TransactionStatus::ReaderCreated) {
		status_ = TransactionStatus::BodyReadingInProgress;
	}

	if (status_ != TransactionStatus::BodyReadingInProgress) {
		// Sample the flag first: the handler is free to cancel or restart the client.
		auto cancelled = cancelled_;
		handler(0);

		const bool finish_now = !*cancelled && status_ == TransactionStatus::ReachedEnd;
		if (finish_now) {
			status_ = TransactionStatus::Done;
			*cancelled_ = true;
			cancelled_ = make_shared<bool>(true);
			stream_.reset();
			CallHandler(body_handler_);
		}
		return;
	}

	// Remember where the data should go, and who to notify, for `ReadBodyHandler`.
	reader_buf_start_ = start;
	reader_buf_end_ = end;
	reader_handler_ = handler;

	// Never read more than fits in either our own buffer or the caller's.
	size_t requested = end - start;
	size_t read_limit = min(body_buffer_.size(), requested);

	http_response_parser_->get().body().data = body_buffer_.data();
	http_response_parser_->get().body().size = read_limit;

	// The callback keeps its own copy of the flag pointer, so it can detect cancellation.
	auto cancelled = cancelled_;
	auto on_read = [this, cancelled](const error_code &ec, size_t num_read) {
		if (*cancelled) {
			return;
		}
		ReadBodyHandler(ec, num_read);
	};

	if (is_https_) {
		// Read through the SSL stream.
		http::async_read_some(*stream_, response_buffer_, *http_response_parser_, on_read);
	} else {
		// Plain HTTP: read directly from the underlying socket.
		http::async_read_some(
			stream_->next_layer(), response_buffer_, *http_response_parser_, on_read);
	}
}
680

681
// Called after each read of response body data.
//
// Copies what was read from `body_buffer_` into the buffer given to the last
// `AsyncReadNextBodyPart` call, and reports the number of bytes (or the error) to the
// reader's handler. If an error occurred and the handler did not cancel the request, the
// body handler is notified about the error as well.
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
	if (num_read > 0) {
		logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
		response_body_read_ += num_read;
	}

	if (ec == http::make_error_code(http::error::need_buffer)) {
		// This can be ignored. We always reset the buffer between reads anyway.
		ec = error_code();
	}

	assert(reader_handler_);

	if (response_body_read_ >= response_body_length_) {
		status_ = TransactionStatus::ReachedEnd;
	}

	// Sample the flag before calling the handler, which is free to cancel or restart the
	// client.
	auto cancelled = cancelled_;

	// Call the handler through a local copy. The handler will typically ask for the next
	// body part right away, which assigns a new value to `reader_handler_`. Without a copy
	// that would destroy the function object while it is still executing (the same reason
	// `CallHandler` takes its handler by value).
	auto handler = reader_handler_;

	size_t buf_size = reader_buf_end_ - reader_buf_start_;
	size_t smallest = min(num_read, buf_size);
	copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
	if (ec) {
		auto err = error::Error(ec.default_error_condition(), "Could not read body");
		handler(expected::unexpected(err));
	} else {
		handler(smallest);
	}

	if (!*cancelled && ec) {
		CallErrorHandler(ec, request_, body_handler_);
	}
}
715

716
void Client::Cancel() {
59✔
717
        auto cancelled = cancelled_;
118✔
718

719
        if (!*cancelled) {
59✔
720
                auto err =
721
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
116✔
722
                if (status_ == TransactionStatus::None) {
58✔
723
                        CallErrorHandler(err, request_, header_handler_);
1✔
724
                } else if (status_ != TransactionStatus::Done) {
57✔
725
                        CallErrorHandler(err, request_, body_handler_);
57✔
726
                }
727
        }
728

729
        if (!*cancelled) {
59✔
730
                DoCancel();
×
731
        }
732
}
59✔
733

734
void Client::DoCancel() {
349✔
735
        resolver_.cancel();
349✔
736
        if (stream_) {
349✔
737
                stream_->next_layer().cancel();
5✔
738
                stream_->next_layer().close();
5✔
739
                stream_.reset();
5✔
740
        }
741

742
        request_.reset();
349✔
743
        response_.reset();
349✔
744

745
        // Reset logger to no connection.
746
        logger_ = log::Logger(logger_name_);
349✔
747

748
        // Set cancel state and then make a new one. Those who are interested should have their own
749
        // pointer to the old one.
750
        *cancelled_ = true;
349✔
751
        cancelled_ = make_shared<bool>(true);
349✔
752
}
349✔
753

754
// Default configuration: delegates to the path-taking constructor with an empty server
// certificate path.
ClientConfig::ClientConfig() : ClientConfig("") {
}
59✔
757

758
// Stores the three certificate-related paths verbatim; nothing is opened or validated here.
ClientConfig::ClientConfig(
	const string &server_cert_path,
	const string &client_cert_path,
	const string &client_cert_key_path) :
	server_cert_path(server_cert_path),
	client_cert_path(client_cert_path),
	client_cert_key_path(client_cert_key_path) {
}
335✔
765

766
// Nothing to release explicitly; members clean up after themselves.
ClientConfig::~ClientConfig() = default;
335✔
768

769
// No explicit initialization is performed.
ServerConfig::ServerConfig() = default;
148✔
771

772
// Nothing to release explicitly.
ServerConfig::~ServerConfig() = default;
148✔
774

775
Stream::Stream(Server &server) :
289✔
776
        server_ {server},
777
        logger_ {"http"},
778
        cancelled_(make_shared<bool>(true)),
×
779
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
289✔
780
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
578✔
781
        // This is equivalent to:
782
        //   request_buffer_.reserve(body_buffer_.size());
783
        // but compatible with Boost 1.67.
784
        request_buffer_.prepare(body_buffer_.size() - request_buffer_.size());
289✔
785

786
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
787
        // they do, they should be handled higher up in the application logic.
788
        //
789
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
790
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
791
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
792
        // possible value instead.
793
        http_request_parser_.body_limit(numeric_limits<uint64_t>::max());
289✔
794
}
289✔
795

796
// Closes the socket (if open) and flags the stream as cancelled before the members go away.
Stream::~Stream() {
	DoCancel();
}
289✔
799

800
void Stream::Cancel() {
7✔
801
        auto cancelled = cancelled_;
14✔
802

803
        if (!*cancelled) {
7✔
804
                auto err =
805
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
806
                if (status_ == TransactionStatus::None) {
7✔
807
                        CallErrorHandler(err, request_, server_.header_handler_);
×
808
                } else if (status_ != TransactionStatus::Done) {
7✔
809
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
810
                }
811
        }
812

813
        if (!*cancelled) {
7✔
814
                DoCancel();
2✔
815
        }
816
}
7✔
817

818
void Stream::DoCancel() {
413✔
819
        if (socket_.is_open()) {
413✔
820
                socket_.cancel();
135✔
821
                socket_.close();
135✔
822
        }
823

824
        // Set cancel state and then make a new one. Those who are interested should have their own
825
        // pointer to the old one.
826
        *cancelled_ = true;
413✔
827
        cancelled_ = make_shared<bool>(true);
413✔
828
}
413✔
829

830
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
831
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
832
}
×
833

834
void Stream::CallErrorHandler(
×
835
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
836
        *cancelled_ = true;
×
837
        cancelled_ = make_shared<bool>(true);
×
838
        status_ = TransactionStatus::Done;
×
839
        handler(expected::unexpected(err.WithContext(
×
840
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
841

842
        server_.RemoveStream(shared_from_this());
×
843
}
×
844

845
void Stream::CallErrorHandler(
2✔
846
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
847
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
2✔
848
}
2✔
849

850
void Stream::CallErrorHandler(
9✔
851
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
852
        *cancelled_ = true;
9✔
853
        cancelled_ = make_shared<bool>(true);
9✔
854
        status_ = TransactionStatus::Done;
9✔
855
        handler(
9✔
856
                req,
857
                err.WithContext(
×
858
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
18✔
859

860
        server_.RemoveStream(shared_from_this());
9✔
861
}
9✔
862

863
void Stream::CallErrorHandler(
4✔
864
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
865
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
4✔
866
}
4✔
867

868
void Stream::CallErrorHandler(
6✔
869
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
870
        *cancelled_ = true;
6✔
871
        cancelled_ = make_shared<bool>(true);
6✔
872
        status_ = TransactionStatus::Done;
6✔
873
        handler(err.WithContext(
6✔
874
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
12✔
875

876
        server_.RemoveStream(shared_from_this());
6✔
877
}
6✔
878

879
void Stream::AcceptHandler(const error_code &ec) {
135✔
880
        if (ec) {
135✔
881
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
882
                return;
×
883
        }
884

885
        auto ip = socket_.remote_endpoint().address().to_string();
270✔
886

887
        // Use IP as context for logging.
888
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
135✔
889

890
        logger_.Debug("Accepted connection.");
135✔
891

892
        request_.reset(new IncomingRequest(*this, cancelled_));
135✔
893

894
        request_->address_.host = ip;
135✔
895

896
        *cancelled_ = false;
135✔
897

898
        ReadHeader();
135✔
899
}
900

901
void Stream::ReadHeader() {
135✔
902
        http_request_parser_.get().body().data = body_buffer_.data();
135✔
903
        http_request_parser_.get().body().size = body_buffer_.size();
135✔
904

905
        auto &cancelled = cancelled_;
135✔
906

907
        http::async_read_some(
270✔
908
                socket_,
135✔
909
                request_buffer_,
135✔
910
                http_request_parser_,
911
                [this, cancelled](const error_code &ec, size_t num_read) {
270✔
912
                        if (!*cancelled) {
135✔
913
                                ReadHeaderHandler(ec, num_read);
135✔
914
                        }
915
                });
135✔
916
}
135✔
917

918
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
135✔
919
        if (num_read > 0) {
135✔
920
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
135✔
921
        }
922

923
        if (ec) {
135✔
924
                CallErrorHandler(ec, request_, server_.header_handler_);
×
925
                return;
91✔
926
        }
927

928
        if (!http_request_parser_.is_header_done()) {
135✔
929
                ReadHeader();
×
930
                return;
×
931
        }
932

933
        auto method_result = BeastVerbToMethod(
934
                http_request_parser_.get().base().method(),
135✔
935
                string {http_request_parser_.get().base().method_string()});
270✔
936
        if (!method_result) {
135✔
937
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
938
                return;
×
939
        }
940
        request_->method_ = method_result.value();
135✔
941
        request_->address_.path = string(http_request_parser_.get().base().target());
135✔
942

943
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
135✔
944

945
        string debug_str;
135✔
946
        for (auto header = http_request_parser_.get().cbegin();
413✔
947
                 header != http_request_parser_.get().cend();
826✔
948
                 header++) {
949
                request_->headers_[string {header->name_string()}] = string {header->value()};
556✔
950
                if (logger_.Level() >= log::LogLevel::Debug) {
278✔
951
                        debug_str += string {header->name_string()};
277✔
952
                        debug_str += ": ";
277✔
953
                        debug_str += string {header->value()};
277✔
954
                        debug_str += "\n";
277✔
955
                }
956
        }
957

958
        logger_.Debug("Received headers:\n" + debug_str);
135✔
959
        debug_str.clear();
135✔
960

961
        if (http_request_parser_.chunked()) {
135✔
962
                auto cancelled = cancelled_;
1✔
963
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
964
                server_.header_handler_(request_);
1✔
965
                if (!*cancelled) {
1✔
966
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
967
                        CallErrorHandler(err, request_, server_.body_handler_);
1✔
968
                }
969
                return;
1✔
970
        }
971

972
        auto content_length = http_request_parser_.content_length();
134✔
973
        if (content_length) {
134✔
974
                request_body_length_ = content_length.value();
45✔
975
        } else {
976
                request_body_length_ = 0;
89✔
977
        }
978
        request_body_read_ = 0;
134✔
979

980
        if (request_body_read_ >= request_body_length_) {
134✔
981
                auto cancelled = cancelled_;
89✔
982
                status_ = TransactionStatus::HeaderHandlerCalled;
89✔
983
                server_.header_handler_(request_);
89✔
984
                if (!*cancelled) {
89✔
985
                        CallBodyHandler();
89✔
986
                }
987
                return;
89✔
988
        }
989

990
        auto cancelled = cancelled_;
45✔
991
        status_ = TransactionStatus::HeaderHandlerCalled;
45✔
992
        server_.header_handler_(request_);
45✔
993
        if (*cancelled) {
45✔
994
                return;
1✔
995
        }
996

997
        // We know that a body reader is required here, because of the `request_body_read_ >=
998
        // request_body_length_` check above.
999
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
44✔
1000
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
1✔
1001
        }
1002
}
1003

1004
// Reads the next part of the request body into `[start, end)` and reports the result through
// `handler`.
//
// While body reading is in progress, one asynchronous read limited to the smaller of the
// caller's range and `body_buffer_` is scheduled; `ReadBodyHandler` completes it. In any other
// status `handler` is called right away with 0 bytes, and if the end of the body had been
// reached (and the stream was not cancelled by the handler), the status becomes `Done` and the
// body handler is called.
void Stream::AsyncReadNextBodyPart(
	vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
	assert(AtLeast(status_, TransactionStatus::ReaderCreated));

	// First read after the reader was created.
	if (status_ == TransactionStatus::ReaderCreated) {
		status_ = TransactionStatus::BodyReadingInProgress;
	}

	if (status_ == TransactionStatus::BodyReadingInProgress) {
		reader_buf_start_ = start;
		reader_buf_end_ = end;
		reader_handler_ = handler;

		size_t wanted = end - start;
		size_t chunk_size = min(body_buffer_.size(), wanted);

		auto &parser_body = http_request_parser_.get().body();
		parser_body.data = body_buffer_.data();
		parser_body.size = chunk_size;

		// The completion handler keeps its own copy of the flag, which stays valid even if
		// the member is replaced.
		auto cancel_flag = cancelled_;

		http::async_read_some(
			socket_,
			request_buffer_,
			http_request_parser_,
			[this, cancel_flag](const error_code &ec, size_t num_read) {
				if (*cancel_flag) {
					return;
				}
				ReadBodyHandler(ec, num_read);
			});
		return;
	}

	// Not reading: signal end of data to the caller.
	auto cancel_flag = cancelled_;
	handler(0);
	if (!*cancel_flag && status_ == TransactionStatus::ReachedEnd) {
		status_ = TransactionStatus::Done;
		CallBodyHandler();
	}
}
1043

1044
// Completion handler for one asynchronous read of request body data.
//
// `ec` is the outcome of the read and `num_read` the number of bytes that were read. At most as
// many bytes as fit into the reader's buffer are copied out of `body_buffer_`, and the reader
// handler is then given either that byte count or a "Could not read body" error. On error, and
// unless the stream got cancelled in the meantime, the error is additionally routed to the
// server's body handler through `CallErrorHandler`.
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
	if (num_read != 0) {
		logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
		request_body_read_ += num_read;
	}

	// `need_buffer` is not treated as a failure: the buffer is reset between reads anyway.
	const bool only_needs_buffer = (ec == http::make_error_code(http::error::need_buffer));
	if (only_needs_buffer) {
		ec = error_code();
	}

	assert(reader_handler_);

	if (request_body_read_ >= request_body_length_) {
		status_ = TransactionStatus::ReachedEnd;
	}

	// Hold our own reference to the cancel flag: `cancelled_` is replaced whenever the stream
	// is cancelled, so the member cannot be trusted after calling the handler.
	auto cancel_flag = cancelled_;

	size_t capacity = reader_buf_end_ - reader_buf_start_;
	size_t to_copy = min(num_read, capacity);
	copy_n(body_buffer_.begin(), to_copy, reader_buf_start_);

	if (!ec) {
		reader_handler_(to_copy);
		return;
	}

	auto read_err = error::Error(ec.default_error_condition(), "Could not read body");
	reader_handler_(expected::unexpected(read_err));

	if (!*cancel_flag) {
		CallErrorHandler(ec, request_, server_.body_handler_);
	}
}
1078

1079
// Starts sending the response previously created for this stream.
//
// Takes shared ownership of the response, builds the Beast response (headers, status code and
// reason) plus its serializer, and schedules the asynchronous header write, which is completed
// by `WriteHeaderHandler`. `reply_finished_handler` is stored for later use.
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
	// Only ever called from an existing response, so the weak pointer must still be alive.
	auto pending = maybe_response_.lock();
	assert(pending);

	// From here on the stream co-owns the response.
	response_ = pending;

	reply_finished_handler_ = reply_finished_handler;

	http_response_ = make_shared<http::response<http::buffer_body>>();

	auto &beast_headers = http_response_->base();
	for (const auto &name_and_value : pending->headers_) {
		beast_headers.set(name_and_value.first, name_and_value.second);
	}

	http_response_->result(pending->GetStatusCode());
	http_response_->reason(pending->GetStatusMessage());

	http_response_serializer_ =
		make_shared<http::response_serializer<http::buffer_body>>(*http_response_);

	// The completion handler keeps its own copy of the flag, which stays valid even if the
	// member is replaced.
	auto cancel_flag = cancelled_;

	http::async_write_header(
		socket_,
		*http_response_serializer_,
		[this, cancel_flag](const error_code &ec, size_t num_written) {
			if (*cancel_flag) {
				return;
			}
			WriteHeaderHandler(ec, num_written);
		});
}
122✔
1112

1113
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
121✔
1114
        if (num_written > 0) {
121✔
1115
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
121✔
1116
        }
1117

1118
        if (ec) {
121✔
1119
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1120
                return;
33✔
1121
        }
1122

1123
        auto header = response_->GetHeader("Content-Length");
242✔
1124
        if (!header || header.value() == "0") {
121✔
1125
                FinishReply();
32✔
1126
                return;
32✔
1127
        }
1128

1129
        auto length = common::StringToLongLong(header.value());
89✔
1130
        if (!length || length.value() < 0) {
89✔
1131
                auto err = error::Error(
1132
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
1133
                CallErrorHandler(err, request_, reply_finished_handler_);
×
1134
                return;
×
1135
        }
1136

1137
        if (!response_->body_reader_ && !response_->async_body_reader_) {
89✔
1138
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
1139
                CallErrorHandler(err, request_, reply_finished_handler_);
1✔
1140
                return;
1✔
1141
        }
1142

1143
        PrepareAndWriteNewBodyBuffer();
88✔
1144
}
1145

1146
void Stream::PrepareAndWriteNewBodyBuffer() {
874✔
1147
        // response_->body_reader_ XOR response_->async_body_reader_
1148
        assert(
874✔
1149
                (response_->body_reader_ || response_->async_body_reader_)
1150
                && !(response_->body_reader_ && response_->async_body_reader_));
1151

1152
        auto read_handler = [this](io::ExpectedSize read) {
1,749✔
1153
                if (!read) {
874✔
1154
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
1✔
1155
                        return;
1✔
1156
                }
1157
                WriteNewBodyBuffer(read.value());
873✔
1158
        };
874✔
1159

1160
        if (response_->body_reader_) {
874✔
1161
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
600✔
1162
        } else {
1163
                auto err = response_->async_body_reader_->AsyncRead(
274✔
1164
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
548✔
1165
                if (err != error::NoError) {
274✔
1166
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1167
                }
1168
        }
1169
}
874✔
1170

1171
void Stream::WriteNewBodyBuffer(size_t size) {
873✔
1172
        http_response_->body().data = body_buffer_.data();
873✔
1173
        http_response_->body().size = size;
873✔
1174

1175
        if (size > 0) {
873✔
1176
                http_response_->body().more = true;
794✔
1177
        } else {
1178
                // Release ownership of Body reader.
1179
                response_->body_reader_.reset();
79✔
1180
                response_->async_body_reader_.reset();
79✔
1181
                http_response_->body().more = false;
79✔
1182
        }
1183

1184
        WriteBody();
873✔
1185
}
873✔
1186

1187
void Stream::WriteBody() {
1,663✔
1188
        auto &cancelled = cancelled_;
1,663✔
1189

1190
        http::async_write_some(
3,326✔
1191
                socket_,
1,663✔
1192
                *http_response_serializer_,
1,663✔
1193
                [this, cancelled](const error_code &ec, size_t num_written) {
3,299✔
1194
                        if (!*cancelled) {
1,650✔
1195
                                WriteBodyHandler(ec, num_written);
1,649✔
1196
                        }
1197
                });
1,650✔
1198
}
1,663✔
1199

1200
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
1,649✔
1201
        if (num_written > 0) {
1,649✔
1202
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
790✔
1203
        }
1204

1205
        if (ec == http::make_error_code(http::error::need_buffer)) {
1,649✔
1206
                // Write next body block.
1207
                PrepareAndWriteNewBodyBuffer();
786✔
1208
        } else if (ec) {
863✔
1209
                CallErrorHandler(ec, request_, reply_finished_handler_);
4✔
1210
        } else if (num_written > 0) {
859✔
1211
                // We are still writing the body.
1212
                WriteBody();
790✔
1213
        } else {
1214
                // We are finished.
1215
                FinishReply();
69✔
1216
        }
1217
}
1,649✔
1218

1219
void Stream::FinishReply() {
101✔
1220
        // We are done.
1221
        *cancelled_ = true;
101✔
1222
        cancelled_ = make_shared<bool>(true);
101✔
1223
        reply_finished_handler_(error::NoError);
101✔
1224
        server_.RemoveStream(shared_from_this());
101✔
1225
}
101✔
1226

1227
void Stream::CallBodyHandler() {
126✔
1228
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1229
        // it immediately destroys, which would destroy this stream as well. At the end of this
1230
        // function, it's ok to destroy it.
1231
        auto stream_ref = shared_from_this();
252✔
1232

1233
        server_.body_handler_(request_, error::NoError);
126✔
1234

1235
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1236
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1237
        // request has not been handled correctly.
1238
        auto response = maybe_response_.lock();
252✔
1239
        if (!response) {
126✔
1240
                logger_.Error("Handler produced no response. Closing stream prematurely.");
2✔
1241
                *cancelled_ = true;
2✔
1242
                cancelled_ = make_shared<bool>(true);
2✔
1243
                server_.RemoveStream(shared_from_this());
2✔
1244
        }
1245
}
126✔
1246

1247
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
25✔
1248
        event_loop_ {event_loop},
1249
        acceptor_(GetAsioIoContext(event_loop_)) {
25✔
1250
}
25✔
1251

1252
// Stops listening and drops all streams before the members are destroyed.
Server::~Server() {
	Cancel();
}
25✔
1255

1256
// Overload taking a plain `RequestHandler` as body handler. It is adapted to the
// `IdentifiedRequestHandler` form: on error the handler receives the error, otherwise the
// request.
error::Error Server::AsyncServeUrl(
	const string &url, RequestHandler header_handler, RequestHandler body_handler) {
	IdentifiedRequestHandler adapter = [body_handler](IncomingRequestPtr req, error::Error err) {
		if (err != error::NoError) {
			body_handler(expected::unexpected(err));
			return;
		}
		body_handler(req);
	};
	return AsyncServeUrl(url, header_handler, adapter);
}
1267

1268
// Starts listening on `url` and serving requests through the given handlers.
//
// Only `http` URLs without a path (empty or "/") are accepted, and the host must be a literal
// IP address. Returns `error::NoError` once the acceptor is listening and the first stream is
// waiting for a connection; otherwise returns an error describing the step that failed. If a
// step after opening the acceptor fails, the acceptor is closed again so that a later call can
// start from a clean state.
error::Error Server::AsyncServeUrl(
	const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
	auto err = BreakDownUrl(url, address_);
	if (error::NoError != err) {
		return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
	}

	if (address_.protocol != "http") {
		return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
	}

	if (address_.path.size() > 0 && address_.path != "/") {
		return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
	}

	boost::system::error_code ec;
	auto address = asio::ip::make_address(address_.host, ec);
	if (ec) {
		return error::Error(
			ec.default_error_condition(),
			"Could not construct endpoint from address " + address_.host);
	}

	asio::ip::tcp::endpoint endpoint(address, address_.port);

	ec.clear();
	acceptor_.open(endpoint.protocol(), ec);
	if (ec) {
		// Deliberately not closing here: a failure to open may mean the acceptor is already
		// open and serving, and that state must not be disturbed.
		return error::Error(ec.default_error_condition(), "Could not open acceptor");
	}

	// From here on the acceptor is open. Any failure must close it again, otherwise it would
	// stay half-configured and a retry would fail on `open`.
	auto fail = [this, &ec](const string &message) {
		auto result = error::Error(ec.default_error_condition(), message);
		boost::system::error_code ignored;
		acceptor_.close(ignored);
		return result;
	};

	// Allow address reuse, otherwise we can't re-bind later.
	ec.clear();
	acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
	if (ec) {
		return fail("Could not set socket options");
	}

	ec.clear();
	acceptor_.bind(endpoint, ec);
	if (ec) {
		return fail("Could not bind socket");
	}

	ec.clear();
	acceptor_.listen(asio::socket_base::max_listen_connections, ec);
	if (ec) {
		return fail("Could not start listening");
	}

	header_handler_ = header_handler;
	body_handler_ = body_handler;

	PrepareNewStream();

	return error::NoError;
}
1325

1326
void Server::Cancel() {
162✔
1327
        if (acceptor_.is_open()) {
162✔
1328
                acceptor_.cancel();
154✔
1329
                acceptor_.close();
154✔
1330
        }
1331
        streams_.clear();
162✔
1332
}
162✔
1333

1334
uint16_t Server::GetPort() const {
7✔
1335
        return acceptor_.local_endpoint().port();
7✔
1336
}
1337

1338
// Returns an `http://127.0.0.1:<port>` URL using the port reported by `GetPort()`.
string Server::GetUrl() const {
	string url {"http://127.0.0.1:"};
	url += to_string(GetPort());
	return url;
}
1341

1342
// Creates the response object for `req` and registers it (weakly) with the request's stream.
// Fails with `StreamCancelledError` if the request has been cancelled.
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
	const bool request_cancelled = *req->cancelled_;
	if (request_cancelled) {
		return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
	}

	auto &owning_stream = req->stream_;
	OutgoingResponsePtr response {new OutgoingResponse(owning_stream, req->cancelled_)};
	owning_stream.maybe_response_ = response;
	return response;
}
1350

1351
// Starts sending `resp` on its stream; `reply_finished_handler` is passed on to the stream.
// Fails with `StreamCancelledError` if the response has been cancelled.
error::Error Server::AsyncReply(
	OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
	const bool response_cancelled = *resp->cancelled_;
	if (response_cancelled) {
		return MakeError(StreamCancelledError, "Cannot send response");
	}

	auto &owning_stream = resp->stream_;
	owning_stream.AsyncReply(reply_finished_handler);
	return error::NoError;
}
1360

1361
// Creates an asynchronous reader for the body of `req`.
//
// Fails if the request has been cancelled, if the stream is in any status other than
// `HeaderHandlerCalled`, or if the request has no body. On success the stream's status becomes
// `ReaderCreated`.
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
	if (*req->cancelled_) {
		return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
	}

	auto &owning_stream = req->stream_;

	const bool header_stage = owning_stream.status_ == TransactionStatus::HeaderHandlerCalled;
	if (!header_stage) {
		return expected::unexpected(error::Error(
			make_error_condition(errc::operation_in_progress),
			"MakeBodyAsyncReader called while reading is in progress"));
	}

	const bool has_body = owning_stream.request_body_length_ != 0;
	if (!has_body) {
		return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
	}

	owning_stream.status_ = TransactionStatus::ReaderCreated;
	return make_shared<BodyAsyncReader<Stream>>(owning_stream, req->cancelled_);
}
1380

1381
void Server::PrepareNewStream() {
289✔
1382
        StreamPtr new_stream {new Stream(*this)};
289✔
1383
        streams_.insert(new_stream);
289✔
1384
        AsyncAccept(new_stream);
289✔
1385
}
289✔
1386

1387
// Waits asynchronously for a connection on `stream`'s socket. On success the stream takes over
// the connection and a new stream is prepared for the next one. On failure the error is logged
// and no further accept is scheduled from here.
void Server::AsyncAccept(StreamPtr stream) {
	acceptor_.async_accept(stream->socket_, [this, stream](const error_code &ec) {
		if (!ec) {
			stream->AcceptHandler(ec);

			this->PrepareNewStream();
			return;
		}

		log::Error("Could not accept connection: " + ec.message());
	});
}
289✔
1399

1400
void Server::RemoveStream(const StreamPtr &stream) {
122✔
1401
        streams_.erase(stream);
122✔
1402

1403
        // Work around bug in Boost ASIO: When the handler for `async_read_some` is called with `ec
1404
        // == operation_aborted`, the handler should not access any supplied buffers, because it may
1405
        // be aborted due to object destruction. However, it does access buffers. This means it does
1406
        // not help to call `Cancel()` prior to destruction. We need to call `Cancel()` first, and
1407
        // then wait until the handler which receives `operation_aborted` has run. So do a
1408
        // `Cancel()` followed by `Post()` for this, which should queue us in the correct order:
1409
        // `operation_aborted` -> `Post` handler.
1410
        stream->DoCancel();
122✔
1411
        event_loop_.Post([stream]() {
122✔
1412
                // No-op, just keep `stream` alive until we get back to this handler.
1413
        });
244✔
1414
}
122✔
1415

1416
} // namespace http
1417
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc