• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1385817157

24 Jul 2024 08:11AM UTC coverage: 75.647%. Remained the same
1385817157

push

gitlab-ci

vpodzime
fix: URL-decode proxy username and password

In cases where the proxy username or password contains characters that
cannot show up in URLs, they should be URL-encoded.

This is to ensure backwards compatibility with version 3 of the
Mender client.

Unfortunately, tinyproxy treats any special character in the
BasicAuth configuration entry as a syntax error, so we have no way
to test this.

Ticket: MEN-7402
Changelog: none
Signed-off-by: Vratislav Podzimek <vratislav.podzimek@northern.tech>

7 of 9 new or added lines in 2 files covered. (77.78%)

16 existing lines in 1 file now uncovered.

7104 of 9391 relevant lines covered (75.65%)

11609.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.26
/src/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio.hpp>
20
#include <boost/asio/ip/tcp.hpp>
21
#include <boost/asio/ssl/host_name_verification.hpp>
22
#include <boost/asio/ssl/verify_mode.hpp>
23

24
#include <common/common.hpp>
25
#include <common/crypto.hpp>
26

27
namespace mender {
28
namespace common {
29
namespace http {
30

31
namespace common = mender::common;
32
namespace crypto = mender::common::crypto;
33

34
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
35
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
36
const unsigned int BeastHttpVersion = 11;
37

38
namespace asio = boost::asio;
39
namespace http = boost::beast::http;
40

41
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
42

43
// Translates one of our own Method values into the corresponding Boost.Beast verb.
static http::verb MethodToBeastVerb(Method method) {
        // There is deliberately no "default" label here: leaving it out lets the compiler warn
        // about an unhandled enumerator if a new Method is ever added.
        switch (method) {
        case Method::GET:
                return http::verb::get;
        case Method::HEAD:
                return http::verb::head;
        case Method::POST:
                return http::verb::post;
        case Method::PUT:
                return http::verb::put;
        case Method::PATCH:
                return http::verb::patch;
        case Method::CONNECT:
                return http::verb::connect;
        case Method::Invalid:
                // Not a usable method; leave the switch and hit the assert below.
                break;
        }

        // Reaching this point means the caller passed Method::Invalid or a value outside the
        // enum. Assert for safety, and return something harmless if asserts are compiled out.
        assert(false);
        return http::verb::get;
}
66

67
// Translates a Boost.Beast verb into our own Method enum. `verb_string` is the textual form of
// the verb and is only used to build the error returned for verbs we do not support.
static expected::expected<Method, error::Error> BeastVerbToMethod(
        http::verb verb, const string &verb_string) {
        switch (verb) {
        case http::verb::get:
                return Method::GET;
        case http::verb::head:
                return Method::HEAD;
        case http::verb::post:
                return Method::POST;
        case http::verb::put:
                return Method::PUT;
        case http::verb::patch:
                return Method::PATCH;
        case http::verb::connect:
                return Method::CONNECT;
        default:
                // Every other verb is handled after the switch.
                break;
        }

        return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
}
86

87
template <typename StreamType>
88
class BodyAsyncReader : virtual public io::AsyncReader {
89
public:
90
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
153✔
91
                stream_ {stream},
92
                cancelled_ {cancelled} {
306✔
93
        }
153✔
94
        ~BodyAsyncReader() {
39✔
95
                Cancel();
39✔
96
        }
78✔
97

98
        error::Error AsyncRead(
2,187✔
99
                vector<uint8_t>::iterator start,
100
                vector<uint8_t>::iterator end,
101
                io::AsyncIoHandler handler) override {
102
                if (eof_) {
2,187✔
103
                        handler(0);
×
104
                        return error::NoError;
×
105
                }
106

107
                if (*cancelled_) {
2,187✔
108
                        return error::MakeError(
×
109
                                error::ProgrammingError,
110
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
111
                }
112
                stream_.AsyncReadNextBodyPart(start, end, [this, handler](io::ExpectedSize size) {
12,822✔
113
                        if (size && size.value() == 0) {
6,317✔
114
                                eof_ = true;
124✔
115
                        }
116
                        handler(size);
12,634✔
117
                });
118
                return error::NoError;
2,187✔
119
        }
120

121
        void Cancel() override {
39✔
122
                if (!*cancelled_) {
39✔
123
                        stream_.Cancel();
4✔
124
                }
125
        }
39✔
126

127
private:
128
        StreamType &stream_;
129
        shared_ptr<bool> cancelled_;
130
        bool eof_ {false};
131

132
        friend class Client;
133
        friend class Server;
134
};
135

136
template <typename StreamType>
137
class RawSocket : virtual public io::AsyncReadWriter {
138
public:
139
        RawSocket(shared_ptr<StreamType> stream, shared_ptr<beast::flat_buffer> buffered) :
×
140
                destroying_ {make_shared<bool>(false)},
×
141
                stream_ {stream},
142
                buffered_ {buffered} {
×
143
                // If there are no buffered bytes, then we don't need it.
144
                if (buffered_ && buffered_->size() == 0) {
×
145
                        buffered_.reset();
×
146
                }
147
        }
×
148

149
        ~RawSocket() {
15✔
150
                *destroying_ = true;
15✔
151
                Cancel();
15✔
152
        }
30✔
153

154
        error::Error AsyncRead(
320✔
155
                vector<uint8_t>::iterator start,
156
                vector<uint8_t>::iterator end,
157
                io::AsyncIoHandler handler) override {
158
                // If we have prebuffered bytes, which can happen if the HTTP parser read the
159
                // header and parts of the body in one block, return those first.
160
                if (buffered_) {
320✔
161
                        return DrainPrebufferedData(start, end, handler);
8✔
162
                }
163

164
                read_buffer_ = asio::buffer(&*start, end - start);
316✔
165
                auto &destroying = destroying_;
166
                stream_->async_read_some(
632✔
167
                        read_buffer_,
316✔
168
                        [destroying, handler](const boost::system::error_code &ec, size_t num_read) {
313✔
169
                                if (*destroying) {
313✔
170
                                        return;
171
                                }
172

173
                                if (ec == asio::error::operation_aborted) {
313✔
174
                                        handler(expected::unexpected(error::Error(
12✔
175
                                                make_error_condition(errc::operation_canceled),
6✔
176
                                                "Could not read from socket")));
177
                                } else if (ec) {
310✔
178
                                        handler(expected::unexpected(
12✔
179
                                                error::Error(ec.default_error_condition(), "Could not read from socket")));
12✔
180
                                } else {
181
                                        handler(num_read);
608✔
182
                                }
183
                        });
184
                return error::NoError;
316✔
185
        }
186

187
        error::Error AsyncWrite(
309✔
188
                vector<uint8_t>::const_iterator start,
189
                vector<uint8_t>::const_iterator end,
190
                io::AsyncIoHandler handler) override {
191
                write_buffer_ = asio::buffer(&*start, end - start);
309✔
192
                auto &destroying = destroying_;
193
                stream_->async_write_some(
618✔
194
                        write_buffer_,
309✔
195
                        [destroying, handler](const boost::system::error_code &ec, size_t num_written) {
306✔
196
                                if (*destroying) {
306✔
197
                                        return;
198
                                }
199

200
                                if (ec == asio::error::operation_aborted) {
306✔
201
                                        handler(expected::unexpected(error::Error(
×
202
                                                make_error_condition(errc::operation_canceled),
×
203
                                                "Could not write to socket")));
204
                                } else if (ec) {
306✔
205
                                        handler(expected::unexpected(
×
206
                                                error::Error(ec.default_error_condition(), "Could not write to socket")));
×
207
                                } else {
208
                                        handler(num_written);
612✔
209
                                }
210
                        });
211
                return error::NoError;
309✔
212
        }
213

214
        void Cancel() override {
28✔
215
                if (stream_->lowest_layer().is_open()) {
28✔
216
                        stream_->lowest_layer().cancel();
15✔
217
                        stream_->lowest_layer().close();
15✔
218
                }
219
        }
28✔
220

221
private:
222
        error::Error DrainPrebufferedData(
4✔
223
                vector<uint8_t>::iterator start,
224
                vector<uint8_t>::iterator end,
225
                io::AsyncIoHandler handler) {
226
                size_t to_copy = min(static_cast<size_t>(end - start), buffered_->size());
4✔
227

228
                // These two lines are equivalent to:
229
                //   copy_n(static_cast<const uint8_t *>(buffered_->cdata().data()), to_copy, start);
230
                // but compatible with Boost 1.67.
231
                const beast::flat_buffer &cbuffered = *buffered_;
232
                copy_n(static_cast<const uint8_t *>(cbuffered.data().data()), to_copy, start);
4✔
233
                buffered_->consume(to_copy);
4✔
234
                if (buffered_->size() == 0) {
4✔
235
                        // We don't need it anymore.
236
                        buffered_.reset();
4✔
237
                }
238
                handler(to_copy);
4✔
239
                return error::NoError;
4✔
240
        }
241

242
        shared_ptr<bool> destroying_;
243
        shared_ptr<StreamType> stream_;
244
        shared_ptr<beast::flat_buffer> buffered_;
245
        asio::mutable_buffer read_buffer_;
246
        asio::const_buffer write_buffer_;
247
};
248

249
// Returns the content length reported by `parser`, or zero if the parser has none.
template <typename PARSER>
size_t GetContentLength(const PARSER &parser) {
        const auto declared_length = parser.content_length();
        if (!declared_length) {
                return 0;
        }
        return declared_length.value();
}
258

259
// Decides from the two header lookups whether a message carries a body.
//
// Each argument holds the header value if the header was present. Returns:
//  - true if Transfer-Encoding is "chunked",
//  - an error if Transfer-Encoding is present with any other value,
//  - otherwise whether Content-Length is greater than zero, or an error if it is not a
//    valid non-negative number,
//  - false if neither header is present.
expected::ExpectedBool HasBody(
        const expected::ExpectedString &content_length,
        const expected::ExpectedString &transfer_encoding) {
        if (transfer_encoding) {
                if (transfer_encoding.value() != "chunked") {
                        return expected::unexpected(error::Error(
                                make_error_condition(errc::not_supported),
                                "Unsupported Transfer-Encoding: " + transfer_encoding.value()));
                }
                return true;
        }

        if (content_length) {
                auto length = common::StringToLongLong(content_length.value());
                if (!length) {
                        // Not parseable as a number at all: pass on the parser's error code.
                        return expected::unexpected(error::Error(
                                length.error().code,
                                "Content-Length contains invalid number: " + content_length.value()));
                }
                if (length.value() < 0) {
                        // Parsing succeeded, so `length` holds no error to read a code from; a
                        // negative length needs an error condition of its own.
                        return expected::unexpected(error::Error(
                                make_error_condition(errc::invalid_argument),
                                "Content-Length contains invalid number: " + content_length.value()));
                }
                return length.value() > 0;
        }

        return false;
}
283

284
Client::Client(
353✔
285
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
286
        event_loop_ {event_loop},
287
        logger_name_ {logger_name},
288
        client_config_ {client},
289
        http_proxy_ {client.http_proxy},
353✔
290
        https_proxy_ {client.https_proxy},
353✔
291
        no_proxy_ {client.no_proxy},
353✔
292
        cancelled_ {make_shared<bool>(true)},
×
293
        resolver_(GetAsioIoContext(event_loop)),
294
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,412✔
295
}
353✔
296

297
// Warns if the cancelled flag is still clear at destruction, then cancels unconditionally.
Client::~Client() {
        const bool request_active = !*cancelled_;
        if (request_active) {
                logger_.Warning("Client destroyed while request is still active!");
        }

        DoCancel();
}
353✔
303

304
// Configures every SSL context from `client_config_`: peer verification mode, the optional
// client certificate and private key, the default CA paths and the optional dedicated server
// certificate. Does nothing if a previous call already completed successfully.
error::Error Client::Initialize() {
        if (initialized_) {
                return error::NoError;
        }

        const bool have_cert = client_config_.client_cert_path != "";
        const bool have_key = client_config_.client_cert_key_path != "";

        for (auto idx = 0; idx < MENDER_BOOST_BEAST_SSL_CTX_COUNT; idx++) {
                auto &ctx = ssl_ctx_[idx];

                ctx.set_verify_mode(client_config_.skip_verify ? ssl::verify_none : ssl::verify_peer);

                beast::error_code ec {};
                if (have_cert and have_key) {
                        ctx.set_options(boost::asio::ssl::context::default_workarounds);
                        ctx.use_certificate_file(
                                client_config_.client_cert_path, boost::asio::ssl::context_base::pem, ec);
                        if (ec) {
                                return error::Error(
                                        ec.default_error_condition(), "Could not load client certificate");
                        }

                        auto loaded_key = crypto::PrivateKey::Load(
                                {client_config_.client_cert_key_path, "", client_config_.ssl_engine});
                        if (!loaded_key) {
                                return loaded_key.error().WithContext(
                                        "Error loading private key from " + client_config_.client_cert_key_path);
                        }

                        // The key is handed to OpenSSL directly, via the native context handle.
                        const int status =
                                SSL_CTX_use_PrivateKey(ctx.native_handle(), loaded_key.value().Get());
                        if (status != 1) {
                                return MakeError(
                                        HTTPInitError,
                                        "Failed to add the PrivateKey: " + client_config_.client_cert_key_path
                                                + " to the SSL CTX");
                        }
                } else if (have_cert or have_key) {
                        // Exactly one of the two was configured.
                        return error::Error(
                                make_error_condition(errc::invalid_argument),
                                "Cannot set only one of client certificate, and client certificate private key");
                }

                ctx.set_default_verify_paths(ec); // Load the default CAs
                if (ec) {
                        auto ca_err = error::Error(
                                ec.default_error_condition(), "Failed to load the SSL default directory");
                        if (client_config_.server_cert_path == "") {
                                // Without a dedicated certificate there would be no valid
                                // certificates at all.
                                return ca_err;
                        }
                        // A dedicated certificate is configured, so this is not fatal.
                        log::Info(ca_err.String());
                }

                if (client_config_.server_cert_path != "") {
                        ctx.load_verify_file(client_config_.server_cert_path, ec);
                        if (ec) {
                                log::Warning("Failed to load the server certificate! Falling back to the CA store");
                        }
                }
        }

        initialized_ = true;

        return error::NoError;
}
368

369
// Create the HOST header according to:
370
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.23
371
// In short: Add the port-number if it is non-standard HTTP
372
static string CreateHOSTAddress(OutgoingRequestPtr req) {
        // Ports 80 and 443 are left out of the header value; any other port is appended.
        const auto port = req->GetPort();
        const bool is_standard_port = (port == 80 || port == 443);
        if (is_standard_port) {
                return req->GetHost();
        }

        return req->GetHost() + ":" + to_string(port);
}
378

379
// Starts an asynchronous HTTP(S) request. After validating the arguments and applying any
// proxy setup, it starts resolving the target host; the rest of the transaction continues
// from ResolveHandler(). Errors detected before the resolve starts are returned directly.
error::Error Client::AsyncCall(
        OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
        auto status = Initialize();
        if (status != error::NoError) {
                return status;
        }

        const bool call_ongoing = !*cancelled_ && status_ != TransactionStatus::Done;
        if (call_ongoing) {
                return error::Error(
                        make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
        }

        const auto &address = req->address_;
        if (address.protocol == "" || address.host == "" || address.port < 0) {
                return error::MakeError(error::ProgrammingError, "Request is not ready");
        }

        if (!header_handler || !body_handler) {
                return error::MakeError(
                        error::ProgrammingError, "header_handler and body_handler can not be nullptr");
        }

        if (address.protocol != "http" && address.protocol != "https") {
                return error::Error(
                        make_error_condition(errc::protocol_not_supported), address.protocol);
        }

        logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));

        request_ = req;

        status = HandleProxySetup();
        if (status != error::NoError) {
                return status;
        }

        // NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
        // request to route to our k8s cluster. Set this in all cases.
        const string host_header = CreateHOSTAddress(req);
        req->SetHeader("HOST", host_header);

        log::Trace("Setting HOST address: " + host_header);

        header_handler_ = header_handler;
        body_handler_ = body_handler;
        status_ = TransactionStatus::None;

        // Each call gets a fresh flag; the resolve callback keeps its own copy of the pointer and
        // does nothing once the flag has been set.
        cancelled_ = make_shared<bool>(false);

        auto &call_cancelled = cancelled_;

        resolver_.async_resolve(
                request_->address_.host,
                to_string(request_->address_.port),
                [this, call_cancelled](
                        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
                        if (*call_cancelled) {
                                return;
                        }
                        ResolveHandler(ec, results);
                });

        return error::NoError;
}
441

442
// Adds a "Proxy-Authorization: Basic ..." header to `req` if the proxy URL carries a username.
// Username and password are URL-decoded before being joined with ":" and Base64-encoded.
static inline error::Error AddProxyAuthHeader(OutgoingRequest &req, BrokenDownUrl &proxy_address) {
        if (proxy_address.username == "") {
                // No credentials in the proxy URL, so there is no header to add.
                return error::NoError;
        }

        auto decoded_user = URLDecode(proxy_address.username);
        auto decoded_pass = URLDecode(proxy_address.password);
        if (!decoded_user) {
                return decoded_user.error();
        }
        if (!decoded_pass) {
                return decoded_pass.error();
        }

        auto credentials = decoded_user.value() + ":" + decoded_pass.value();
        auto encoded = crypto::EncodeBase64(common::ByteVectorFromString(credentials));
        if (!encoded) {
                return encoded.error();
        }

        req.SetHeader("Proxy-Authorization", "Basic " + encoded.value());
        return error::NoError;
}
464

465
// Chooses the socket mode for `request_` and, when a proxy applies, rewrites the request so
// that it goes through that proxy:
//  - "http" target: the request itself is redirected to the proxy, with the full target URL
//    as its path.
//  - "https" target: the original request is parked in `secondary_req_` and `request_`
//    becomes a CONNECT request to the proxy.
// A proxy applies when the matching proxy setting is non-empty and the target host does not
// match `no_proxy_`.
error::Error Client::HandleProxySetup() {
        secondary_req_.reset();

        const auto &target_protocol = request_->address_.protocol;

        if (target_protocol == "http") {
                socket_mode_ = SocketMode::Plain;

                if (http_proxy_ == "" || HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
                        return error::NoError;
                }

                // Make a modified proxy request.
                BrokenDownUrl proxy;
                auto proxy_err = BreakDownUrl(http_proxy_, proxy, true);
                if (proxy_err != error::NoError) {
                        return proxy_err.WithContext("HTTP proxy URL is invalid");
                }
                if (proxy.path != "" && proxy.path != "/") {
                        return MakeError(
                                InvalidUrlError, "A URL with a path is not legal for a proxy address");
                }

                // The full target URL has to be built before host, port and protocol are
                // overwritten with the proxy's below.
                auto &address = request_->address_;
                string full_target_url = address.protocol + "://" + address.host + ":"
                                                                 + to_string(address.port) + address.path;
                address.path = full_target_url;
                address.host = proxy.host;
                address.port = proxy.port;
                address.protocol = proxy.protocol;

                proxy_err = AddProxyAuthHeader(*request_, proxy);
                if (proxy_err != error::NoError) {
                        return proxy_err;
                }

                if (proxy.protocol == "https") {
                        socket_mode_ = SocketMode::Tls;
                } else if (proxy.protocol == "http") {
                        socket_mode_ = SocketMode::Plain;
                } else {
                        // Should never get here.
                        assert(false);
                }

                return error::NoError;
        }

        if (target_protocol == "https") {
                socket_mode_ = SocketMode::Tls;

                if (https_proxy_ == "" || HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
                        return error::NoError;
                }

                // Save the original request for later, so that we can make a new request
                // over the channel established by CONNECT.
                secondary_req_ = std::move(request_);

                request_ = make_shared<OutgoingRequest>();
                request_->SetMethod(Method::CONNECT);

                BrokenDownUrl proxy;
                auto proxy_err = BreakDownUrl(https_proxy_, proxy, true);
                if (proxy_err != error::NoError) {
                        return proxy_err.WithContext("HTTPS proxy URL is invalid");
                }
                if (proxy.path != "" && proxy.path != "/") {
                        return MakeError(
                                InvalidUrlError, "A URL with a path is not legal for a proxy address");
                }

                // The CONNECT request's path is the original target as "host:port".
                auto &address = request_->address_;
                address.path =
                        secondary_req_->address_.host + ":" + to_string(secondary_req_->address_.port);
                address.host = proxy.host;
                address.port = proxy.port;
                address.protocol = proxy.protocol;

                proxy_err = AddProxyAuthHeader(*request_, proxy);
                if (proxy_err != error::NoError) {
                        return proxy_err;
                }

                if (proxy.protocol == "https") {
                        socket_mode_ = SocketMode::Tls;
                } else if (proxy.protocol == "http") {
                        socket_mode_ = SocketMode::Plain;
                } else {
                        // Should never get here.
                        assert(false);
                }

                return error::NoError;
        }

        // Should never get here
        assert(false);

        return error::NoError;
}
551

552
// Creates a reader for the body of `resp`. This only succeeds while the transaction is in the
// HeaderHandlerCalled state and the response has a body, meaning a non-zero content length or
// chunked encoding.
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
        if (status_ != TransactionStatus::HeaderHandlerCalled) {
                return expected::unexpected(error::Error(
                        make_error_condition(errc::operation_in_progress),
                        "MakeBodyAsyncReader called while reading is in progress"));
        }

        auto &parser = response_data_.http_response_parser_;
        const bool body_missing = GetContentLength(*parser) == 0 && !parser->chunked();
        if (body_missing) {
                return expected::unexpected(
                        MakeError(BodyMissingError, "Response does not contain a body"));
        }

        status_ = TransactionStatus::ReaderCreated;
        return make_shared<BodyAsyncReader<Client>>(resp->client_.GetHttpClient(), resp->cancelled_);
}
568

569
// Hands the connection over to the caller as a raw socket. The client gives up its stream and
// marks the transaction as done; the returned RawSocket also receives the response buffer, so
// bytes that were already read are not lost.
io::ExpectedAsyncReadWriterPtr Client::SwitchProtocol(IncomingResponsePtr req) {
        if (*cancelled_) {
                return expected::unexpected(error::Error(
                        make_error_condition(errc::not_connected),
                        "Cannot switch protocols if endpoint is not connected"));
        }

        // Rest of the connection is done directly on the socket, we are done here.
        status_ = TransactionStatus::Done;
        // Set the flag that existing holders share, then start over with a fresh one.
        *cancelled_ = true;
        cancelled_ = make_shared<bool>(false);

        auto handed_over = stream_;
        // This no longer belongs to us.
        stream_.reset();

        auto &prebuffered = response_data_.response_buffer_;

        switch (socket_mode_) {
        case SocketMode::TlsTls:
                return make_shared<RawSocket<ssl::stream<ssl::stream<tcp::socket>>>>(
                        handed_over, prebuffered);
        case SocketMode::Tls:
                return make_shared<RawSocket<ssl::stream<tcp::socket>>>(
                        make_shared<ssl::stream<tcp::socket>>(std::move(handed_over->next_layer())),
                        prebuffered);
        case SocketMode::Plain:
                return make_shared<RawSocket<tcp::socket>>(
                        make_shared<tcp::socket>(std::move(handed_over->next_layer().next_layer())),
                        prebuffered);
        }

        AssertOrReturnUnexpected(false);
}
601

602
void Client::CallHandler(ResponseHandler handler) {
        // Taking `handler` by value is the whole point of this function: it guarantees that a
        // copy of the handler stays alive for the duration of the call. The handler may own the
        // client instance through a capture, and it may replace itself with a different handler
        // (using `AsyncCall`). Without the copy in our argument list, that replacement would
        // destroy the last copy of the running handler, and with it the client.
        handler(response_);
}
354✔
610

611
void Client::CallErrorHandler(
83✔
612
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
613
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
249✔
614
}
83✔
615

616
void Client::CallErrorHandler(
126✔
617
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
618
        status_ = TransactionStatus::Done;
126✔
619
        DoCancel();
126✔
620
        handler(expected::unexpected(
252✔
621
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
504✔
622
}
126✔
623

624
void Client::ResolveHandler(
267✔
625
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
626
        if (ec) {
267✔
627
                CallErrorHandler(ec, request_, header_handler_);
×
628
                return;
×
629
        }
630

631
        if (logger_.Level() >= log::LogLevel::Debug) {
267✔
632
                string ips = "[";
245✔
633
                string sep;
634
                for (auto r : results) {
1,044✔
635
                        ips += sep;
277✔
636
                        ips += r.endpoint().address().to_string();
277✔
637
                        sep = ", ";
277✔
638
                }
639
                ips += "]";
245✔
640
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
490✔
641
        }
642

643
        resolver_results_ = results;
644

645
        stream_ = make_shared<ssl::stream<ssl::stream<tcp::socket>>>(
267✔
646
                ssl::stream<tcp::socket>(GetAsioIoContext(event_loop_), ssl_ctx_[0]), ssl_ctx_[1]);
534✔
647

648
        if (!response_data_.response_buffer_) {
267✔
649
                // We can reuse this if preexisting.
650
                response_data_.response_buffer_ = make_shared<beast::flat_buffer>();
396✔
651

652
                // This is equivalent to:
653
                //   response_data_.response_buffer_.reserve(body_buffer_.size());
654
                // but compatible with Boost 1.67.
655
                response_data_.response_buffer_->prepare(
656
                        body_buffer_.size() - response_data_.response_buffer_->size());
198✔
657
        }
658

659
        auto &cancelled = cancelled_;
660

661
        asio::async_connect(
267✔
662
                stream_->lowest_layer(),
663
                resolver_results_,
267✔
664
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
534✔
665
                        if (!*cancelled) {
267✔
666
                                switch (socket_mode_) {
267✔
667
                                case SocketMode::TlsTls:
×
668
                                        // Should never happen because we always need to handshake
669
                                        // the innermost Tls first, then the outermost, but the
670
                                        // latter doesn't happen here.
671
                                        assert(false);
672
                                        CallErrorHandler(
×
673
                                                error::MakeError(
×
674
                                                        error::ProgrammingError, "TlsTls mode is invalid in ResolveHandler"),
×
675
                                                request_,
×
676
                                                header_handler_);
×
677
                                case SocketMode::Tls:
21✔
678
                                        return HandshakeHandler(stream_->next_layer(), ec, endpoint);
21✔
679
                                case SocketMode::Plain:
246✔
680
                                        return ConnectHandler(ec, endpoint);
246✔
681
                                }
682
                        }
683
                });
684
}
685

686
template <typename StreamType>
687
void Client::HandshakeHandler(
25✔
688
        StreamType &stream, const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
689
        if (ec) {
25✔
690
                CallErrorHandler(ec, request_, header_handler_);
2✔
691
                return;
2✔
692
        }
693

694
        // Enable TCP keepalive
695
        boost::asio::socket_base::keep_alive option(true);
696
        stream_->lowest_layer().set_option(option);
23✔
697

698
        // Set SNI Hostname (many hosts need this to handshake successfully)
699
        if (!SSL_set_tlsext_host_name(stream.native_handle(), request_->address_.host.c_str())) {
23✔
700
                beast::error_code ec2 {
×
701
                        static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
×
702
                logger_.Error("Failed to set SNI host name: " + ec2.message());
×
703
        }
704

705
        // Enable host name verification (not done automatically and we don't have
706
        // enough access to the TLS internals to use X509_VERIFY_PARAM_set1_host(),
707
        // hence the callback that boost provides).
708
        boost::system::error_code b_ec;
23✔
709
        stream.set_verify_callback(ssl::host_name_verification(request_->address_.host), b_ec);
46✔
710
        if (b_ec) {
23✔
711
                logger_.Error("Failed to enable host name verification: " + b_ec.message());
×
712
                CallErrorHandler(b_ec, request_, header_handler_);
×
713
                return;
×
714
        }
715

716
        auto &cancelled = cancelled_;
717

718
        stream.async_handshake(
46✔
719
                ssl::stream_base::client, [this, cancelled, endpoint](const error_code &ec) {
23✔
720
                        if (*cancelled) {
26✔
721
                                return;
722
                        }
723
                        if (ec) {
26✔
724
                                logger_.Error("https: Failed to perform the SSL handshake: " + ec.message());
20✔
725
                                CallErrorHandler(ec, request_, header_handler_);
10✔
726
                                return;
10✔
727
                        }
728
                        logger_.Debug("https: Successful SSL handshake");
32✔
729
                        ConnectHandler(ec, endpoint);
16✔
730
                });
731
}
732

733

734
// Called once the connection (and, where applicable, the TLS handshake) is established.
// Builds the outgoing HTTP request, its serializer and a fresh response parser, then starts
// writing the request header on the stream layer selected by `socket_mode_`.
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
	if (ec) {
		CallErrorHandler(ec, request_, header_handler_);
		return;
	}

	// Enable TCP keepalive
	stream_->lowest_layer().set_option(boost::asio::socket_base::keep_alive(true));

	logger_.Debug("Connected to " + endpoint.address().to_string());

	auto &http_request = request_data_.http_request_;
	http_request = make_shared<http::request<http::buffer_body>>(
		MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);

	for (const auto &name_and_value : request_->headers_) {
		http_request->set(name_and_value.first, name_and_value.second);
	}

	request_data_.http_request_serializer_ =
		make_shared<http::request_serializer<http::buffer_body>>(*http_request);

	auto &parser = response_data_.http_response_parser_;
	parser = make_shared<http::response_parser<http::buffer_body>>();

	// Don't enforce limits. Since we stream everything, limits don't generally apply, and
	// if they do, they should be handled higher up in the application logic.
	//
	// Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
	// pass an uninitialized `optional` to mean unlimited, but they do not check for
	// `has_value()` in their code, causing their subsequent comparison operation to
	// misbehave. So pass highest possible value instead.
	parser->body_limit(numeric_limits<uint64_t>::max());

	auto &cancelled = cancelled_;
	// NOTE(review): `request_data` is captured by copy but not used in the lambda body;
	// presumably this keeps the request and serializer alive while the write is pending.
	auto &request_data = request_data_;

	auto on_header_written =
		[this, cancelled, request_data](const error_code &write_ec, size_t num_written) {
			if (*cancelled) {
				return;
			}
			WriteHeaderHandler(write_ec, num_written);
		};

	auto &serializer = *request_data_.http_request_serializer_;

	switch (socket_mode_) {
	case SocketMode::TlsTls:
		http::async_write_header(*stream_, serializer, on_header_written);
		break;
	case SocketMode::Tls:
		http::async_write_header(stream_->next_layer(), serializer, on_header_written);
		break;
	case SocketMode::Plain:
		http::async_write_header(
			stream_->next_layer().next_layer(), serializer, on_header_written);
		break;
	}
}
790

791
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
246✔
792
        if (num_written > 0) {
246✔
793
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
492✔
794
        }
795

796
        if (ec) {
246✔
797
                CallErrorHandler(ec, request_, header_handler_);
×
798
                return;
206✔
799
        }
800

801
        auto exp_has_body =
802
                HasBody(request_->GetHeader("Content-Length"), request_->GetHeader("Transfer-Encoding"));
492✔
803
        if (!exp_has_body) {
246✔
804
                CallErrorHandler(exp_has_body.error(), request_, header_handler_);
×
805
                return;
×
806
        }
807
        if (!exp_has_body.value()) {
246✔
808
                ReadHeader();
205✔
809
                return;
810
        }
811

812
        if (!request_->body_gen_ && !request_->async_body_gen_) {
41✔
813
                auto err = MakeError(BodyMissingError, "No body generator");
2✔
814
                CallErrorHandler(err, request_, header_handler_);
2✔
815
                return;
816
        }
817

818
        assert(!(request_->body_gen_ && request_->async_body_gen_));
819

820
        if (request_->body_gen_) {
40✔
821
                auto body_reader = request_->body_gen_();
34✔
822
                if (!body_reader) {
34✔
823
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
824
                        return;
825
                }
826
                request_->body_reader_ = body_reader.value();
34✔
827
        } else {
828
                auto body_reader = request_->async_body_gen_();
6✔
829
                if (!body_reader) {
6✔
830
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
831
                        return;
832
                }
833
                request_->async_body_reader_ = body_reader.value();
6✔
834
        }
835

836
        PrepareAndWriteNewBodyBuffer();
40✔
837
}
838

839
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,278✔
840
        if (num_written > 0) {
2,278✔
841
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
2,242✔
842
        }
843

844
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,278✔
845
                // Write next block of the body.
846
                PrepareAndWriteNewBodyBuffer();
1,120✔
847
        } else if (ec) {
1,158✔
848
                CallErrorHandler(ec, request_, header_handler_);
8✔
849
        } else if (num_written > 0) {
1,154✔
850
                // We are still writing the body.
851
                WriteBody();
1,121✔
852
        } else {
853
                // We are ready to receive the response.
854
                ReadHeader();
33✔
855
        }
856
}
2,278✔
857

858
void Client::PrepareAndWriteNewBodyBuffer() {
1,160✔
859
        // request_->body_reader_ XOR request_->async_body_reader_
860
        assert(
861
                (request_->body_reader_ || request_->async_body_reader_)
862
                && !(request_->body_reader_ && request_->async_body_reader_));
863

864
        auto cancelled = cancelled_;
865
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
3,592✔
866
                if (!*cancelled) {
1,160✔
867
                        if (!read) {
1,159✔
868
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
869
                                return;
2✔
870
                        }
871
                        WriteNewBodyBuffer(read.value());
1,157✔
872
                }
873
        };
1,160✔
874

875

876
        if (request_->body_reader_) {
1,160✔
877
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
1,470✔
878
        } else {
879
                auto err = request_->async_body_reader_->AsyncRead(
880
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
850✔
881
                if (err != error::NoError) {
425✔
882
                        CallErrorHandler(err, request_, header_handler_);
×
883
                }
884
        }
885
}
1,160✔
886

887
void Client::WriteNewBodyBuffer(size_t size) {
1,157✔
888
        request_data_.http_request_->body().data = body_buffer_.data();
1,157✔
889
        request_data_.http_request_->body().size = size;
1,157✔
890

891
        if (size > 0) {
1,157✔
892
                request_data_.http_request_->body().more = true;
1,124✔
893
        } else {
894
                // Release ownership of Body reader.
895
                request_->body_reader_.reset();
33✔
896
                request_->async_body_reader_.reset();
33✔
897
                request_data_.http_request_->body().more = false;
33✔
898
        }
899

900
        WriteBody();
1,157✔
901
}
1,157✔
902

903
void Client::WriteBody() {
2,278✔
904
        auto &cancelled = cancelled_;
905
        auto &request_data = request_data_;
2,278✔
906

907
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
2,278✔
908
                if (!*cancelled) {
2,278✔
909
                        WriteBodyHandler(ec, num_written);
2,278✔
910
                }
911
        };
4,556✔
912

913
        switch (socket_mode_) {
2,278✔
914
        case SocketMode::TlsTls:
×
915
                http::async_write_some(*stream_, *request_data_.http_request_serializer_, handler);
916
                break;
917
        case SocketMode::Tls:
×
918
                http::async_write_some(
919
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
920
                break;
921
        case SocketMode::Plain:
2,278✔
922
                http::async_write_some(
923
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
924
                break;
925
        }
926
}
2,278✔
927

928
void Client::ReadHeader() {
238✔
929
        auto &cancelled = cancelled_;
930
        auto &response_data = response_data_;
238✔
931

932
        auto handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
235✔
933
                if (!*cancelled) {
235✔
934
                        ReadHeaderHandler(ec, num_read);
235✔
935
                }
936
        };
476✔
937

938
        switch (socket_mode_) {
238✔
939
        case SocketMode::TlsTls:
2✔
940
                http::async_read_some(
2✔
941
                        *stream_,
942
                        *response_data_.response_buffer_,
943
                        *response_data_.http_response_parser_,
944
                        handler);
945
                break;
946
        case SocketMode::Tls:
14✔
947
                http::async_read_some(
14✔
948
                        stream_->next_layer(),
949
                        *response_data_.response_buffer_,
950
                        *response_data_.http_response_parser_,
951
                        handler);
952
                break;
953
        case SocketMode::Plain:
222✔
954
                http::async_read_some(
222✔
955
                        stream_->next_layer().next_layer(),
956
                        *response_data_.response_buffer_,
957
                        *response_data_.http_response_parser_,
958
                        handler);
959
                break;
960
        }
961
}
238✔
962

963
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
235✔
964
        if (num_read > 0) {
235✔
965
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
460✔
966
        }
967

968
        if (ec) {
235✔
969
                CallErrorHandler(ec, request_, header_handler_);
5✔
970
                return;
65✔
971
        }
972

973
        if (!response_data_.http_response_parser_->is_header_done()) {
230✔
974
                ReadHeader();
×
975
                return;
×
976
        }
977

978
        if (secondary_req_) {
230✔
979
                HandleSecondaryRequest();
9✔
980
                return;
9✔
981
        }
982

983
        response_.reset(new IncomingResponse(*this, cancelled_));
442✔
984
        response_->status_code_ = response_data_.http_response_parser_->get().result_int();
221✔
985
        response_->status_message_ = string {response_data_.http_response_parser_->get().reason()};
221✔
986

987
        logger_.Debug(
442✔
988
                "Received response: " + to_string(response_->status_code_) + " "
442✔
989
                + response_->status_message_);
663✔
990

991
        string debug_str;
992
        for (auto header = response_data_.http_response_parser_->get().cbegin();
254✔
993
                 header != response_data_.http_response_parser_->get().cend();
475✔
994
                 header++) {
995
                response_->headers_[string {header->name_string()}] = string {header->value()};
762✔
996
                if (logger_.Level() >= log::LogLevel::Debug) {
254✔
997
                        debug_str += string {header->name_string()};
239✔
998
                        debug_str += ": ";
239✔
999
                        debug_str += string {header->value()};
239✔
1000
                        debug_str += "\n";
239✔
1001
                }
1002
        }
1003

1004
        logger_.Debug("Received headers:\n" + debug_str);
442✔
1005
        debug_str.clear();
1006

1007
        if (GetContentLength(*response_data_.http_response_parser_) == 0
221✔
1008
                && !response_data_.http_response_parser_->chunked()) {
221✔
1009
                auto cancelled = cancelled_;
1010
                status_ = TransactionStatus::HeaderHandlerCalled;
48✔
1011
                CallHandler(header_handler_);
96✔
1012
                if (!*cancelled) {
48✔
1013
                        status_ = TransactionStatus::Done;
42✔
1014
                        if (response_->status_code_ != StatusCode::StatusSwitchingProtocols) {
42✔
1015
                                // Make an exception for 101 Switching Protocols response, where the TCP connection
1016
                                // is meant to be reused.
1017
                                DoCancel();
38✔
1018
                        }
1019
                        CallHandler(body_handler_);
84✔
1020
                }
1021
                return;
1022
        }
1023

1024
        auto cancelled = cancelled_;
1025
        status_ = TransactionStatus::HeaderHandlerCalled;
173✔
1026
        CallHandler(header_handler_);
346✔
1027
        if (*cancelled) {
173✔
1028
                return;
1029
        }
1030

1031
        // We know that a body reader is required here, because of the check for body above.
1032
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
170✔
1033
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
36✔
1034
        }
1035
}
1036

1037
void Client::HandleSecondaryRequest() {
9✔
1038
        logger_.Debug(
18✔
1039
                "Received proxy response: "
1040
                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
18✔
1041
                + string {response_data_.http_response_parser_->get().reason()});
36✔
1042

1043
        request_ = std::move(secondary_req_);
1044

1045
        if (response_data_.http_response_parser_->get().result_int() != StatusOK) {
9✔
1046
                auto err = MakeError(
1047
                        ProxyError,
1048
                        "Proxy returned unexpected response: "
1049
                                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
4✔
1050
                                + string {response_data_.http_response_parser_->get().reason()});
6✔
1051
                CallErrorHandler(err, request_, header_handler_);
4✔
1052
                return;
1053
        }
1054

1055
        if (GetContentLength(*response_data_.http_response_parser_) != 0
7✔
1056
                || response_data_.http_response_parser_->chunked()) {
7✔
1057
                auto err = MakeError(ProxyError, "Body not allowed in proxy response");
×
1058
                CallErrorHandler(err, request_, header_handler_);
×
1059
                return;
1060
        }
1061

1062
        // We are connected. Now repeat the request cycle with the original request. Pretend
1063
        // we were just connected.
1064

1065
        assert(request_->GetProtocol() == "https");
1066

1067
        // Make sure that no data is "lost" inside the buffering mechanism, since when switching to
1068
        // a different layer, this will get out of sync.
1069
        assert(response_data_.response_buffer_->size() == 0);
1070

1071
        switch (socket_mode_) {
7✔
1072
        case SocketMode::TlsTls:
×
1073
                // Should never get here, because this is the only place where TlsTls mode
1074
                // is supposed to be turned on.
1075
                assert(false);
1076
                CallErrorHandler(
×
1077
                        error::MakeError(
×
1078
                                error::ProgrammingError,
1079
                                "Any other mode than Tls is not valid when handling secondary request"),
×
1080
                        request_,
×
1081
                        header_handler_);
×
1082
                break;
×
1083
        case SocketMode::Tls:
3✔
1084
                // Upgrade to TLS inside TLS.
1085
                socket_mode_ = SocketMode::TlsTls;
3✔
1086
                HandshakeHandler(*stream_, error_code {}, stream_->lowest_layer().remote_endpoint());
3✔
1087
                break;
3✔
1088
        case SocketMode::Plain:
4✔
1089
                // Upgrade to TLS.
1090
                socket_mode_ = SocketMode::Tls;
4✔
1091
                HandshakeHandler(
4✔
1092
                        stream_->next_layer(), error_code {}, stream_->lowest_layer().remote_endpoint());
4✔
1093
                break;
4✔
1094
        }
1095
}
1096

1097
// Requests the next piece of the response body.
//
// `start`/`end` delimit the caller's destination buffer and `handler` receives the result.
// If the body has already been read completely, `handler` is called with 0 and the
// transaction is finished; otherwise an asynchronous read into `body_buffer_` is started,
// with completion reported to `ReadBodyHandler`.
void Client::AsyncReadNextBodyPart(
	vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
	assert(AtLeast(status_, TransactionStatus::ReaderCreated));

	if (status_ == TransactionStatus::ReaderCreated) {
		status_ = TransactionStatus::BodyReadingInProgress;
	}

	if (AtLeast(status_, TransactionStatus::BodyReadingFinished)) {
		// Keep our own pointer to the flag so we can tell whether `handler` cancelled us.
		auto cancelled = cancelled_;
		handler(0);
		if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
			status_ = TransactionStatus::Done;
			DoCancel();
			CallHandler(body_handler_);
		}
		return;
	}

	// Remember the destination and callback for `ReadBodyHandler`.
	reader_buf_start_ = start;
	reader_buf_end_ = end;
	reader_handler_ = handler;

	// Read no more than fits in both our intermediate buffer and the caller's buffer.
	const size_t wanted = end - start;
	const size_t chunk_size = min(body_buffer_.size(), wanted);

	auto &body = response_data_.http_response_parser_->get().body();
	body.data = body_buffer_.data();
	body.size = chunk_size;
	response_data_.last_buffer_size_ = chunk_size;

	auto &cancelled = cancelled_;
	// NOTE(review): captured by copy but unused in the lambda body; presumably keeps the
	// response buffer and parser alive while the read is pending.
	auto &response_data = response_data_;

	auto on_read = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
		if (*cancelled) {
			return;
		}
		ReadBodyHandler(ec, num_read);
	};

	auto &buffer = *response_data_.response_buffer_;
	auto &parser = *response_data_.http_response_parser_;

	switch (socket_mode_) {
	case SocketMode::TlsTls:
		http::async_read_some(*stream_, buffer, parser, on_read);
		break;
	case SocketMode::Tls:
		http::async_read_some(stream_->next_layer(), buffer, parser, on_read);
		break;
	case SocketMode::Plain:
		http::async_read_some(stream_->next_layer().next_layer(), buffer, parser, on_read);
		break;
	}
}
1159

1160
// Completion handler for one body read. Copies the payload that arrived in `body_buffer_`
// to the reader's buffer and notifies `reader_handler_`, or reports the error to both the
// reader handler and the body handler.
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
	if (num_read > 0) {
		logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
	}

	if (ec == http::make_error_code(http::error::need_buffer)) {
		// This can be ignored. We always reset the buffer between reads anyway.
		ec = error_code();
	}

	assert(reader_handler_);

	if (response_data_.http_response_parser_->is_done()) {
		status_ = TransactionStatus::BodyReadingFinished;
	}

	auto cancelled = cancelled_;

	if (ec) {
		auto err = error::Error(ec.default_error_condition(), "Could not read body");
		reader_handler_(expected::unexpected(err));
		if (!*cancelled) {
			CallErrorHandler(ec, request_, body_handler_);
		}
		return;
	}

	// The num_read from above includes out of band payload data, such as chunk headers, which
	// we are not interested in. So we need to calculate the payload size from the remaining
	// buffer space.
	const size_t unused_space = response_data_.http_response_parser_->get().body().size;
	const size_t payload_read = response_data_.last_buffer_size_ - unused_space;

	const size_t dest_size = reader_buf_end_ - reader_buf_start_;
	const size_t to_deliver = min(payload_read, dest_size);

	if (to_deliver > 0) {
		copy_n(body_buffer_.begin(), to_deliver, reader_buf_start_);
		reader_handler_(to_deliver);
		return;
	}

	// We read nothing, which can happen if all we read was a chunk header. We cannot
	// return 0 to the handler however, because in `io::Reader` context this means
	// EOF. So just repeat the request instead, until we get actual payload data.
	AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, reader_handler_);
}
1206

1207
void Client::Cancel() {
147✔
1208
        auto cancelled = cancelled_;
1209

1210
        if (!*cancelled) {
147✔
1211
                auto err =
1212
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
42✔
1213
                switch (status_) {
21✔
1214
                case TransactionStatus::None:
3✔
1215
                        CallErrorHandler(err, request_, header_handler_);
3✔
1216
                        break;
3✔
1217
                case TransactionStatus::HeaderHandlerCalled:
17✔
1218
                case TransactionStatus::ReaderCreated:
1219
                case TransactionStatus::BodyReadingInProgress:
1220
                case TransactionStatus::BodyReadingFinished:
1221
                        CallErrorHandler(err, request_, body_handler_);
17✔
1222
                        break;
17✔
1223
                case TransactionStatus::Replying:
1224
                case TransactionStatus::SwitchingProtocol:
1225
                        // Not used by client.
1226
                        assert(false);
1227
                        break;
1228
                case TransactionStatus::BodyHandlerCalled:
1229
                case TransactionStatus::Done:
1230
                        break;
1231
                }
1232
        }
1233

1234
        if (!*cancelled) {
147✔
1235
                DoCancel();
1✔
1236
        }
1237
}
147✔
1238

1239
void Client::DoCancel() {
609✔
1240
        resolver_.cancel();
609✔
1241
        if (stream_) {
609✔
1242
                stream_->lowest_layer().cancel();
260✔
1243
                stream_->lowest_layer().close();
260✔
1244
                stream_.reset();
260✔
1245
        }
1246

1247
        // Reset logger to no connection.
1248
        logger_ = log::Logger(logger_name_);
609✔
1249

1250
        // Set cancel state and then make a new one. Those who are interested should have their own
1251
        // pointer to the old one.
1252
        *cancelled_ = true;
609✔
1253
        cancelled_ = make_shared<bool>(true);
609✔
1254
}
609✔
1255

1256
Stream::Stream(Server &server) :
441✔
1257
        server_ {server},
1258
        logger_ {"http"},
1259
        cancelled_(make_shared<bool>(true)),
441✔
1260
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
441✔
1261
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,323✔
1262
        request_data_.request_buffer_ = make_shared<beast::flat_buffer>();
882✔
1263

1264
        // This is equivalent to:
1265
        //   request_data_.request_buffer_.reserve(body_buffer_.size());
1266
        // but compatible with Boost 1.67.
1267
        request_data_.request_buffer_->prepare(
1268
                body_buffer_.size() - request_data_.request_buffer_->size());
441✔
1269

1270
        request_data_.http_request_parser_ = make_shared<http::request_parser<http::buffer_body>>();
882✔
1271

1272
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
1273
        // they do, they should be handled higher up in the application logic.
1274
        //
1275
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
1276
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
1277
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
1278
        // possible value instead.
1279
        request_data_.http_request_parser_->body_limit(numeric_limits<uint64_t>::max());
1280
}
441✔
1281

1282
Stream::~Stream() {
	// Run the stream's cancellation routine before the members are destroyed.
	DoCancel();
}
441✔
1285

1286
void Stream::Cancel() {
7✔
1287
        auto cancelled = cancelled_;
1288

1289
        if (!*cancelled) {
7✔
1290
                auto err =
1291
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
1292
                switch (status_) {
7✔
1293
                case TransactionStatus::None:
×
1294
                        CallErrorHandler(err, request_, server_.header_handler_);
×
1295
                        break;
×
1296
                case TransactionStatus::HeaderHandlerCalled:
5✔
1297
                case TransactionStatus::ReaderCreated:
1298
                case TransactionStatus::BodyReadingInProgress:
1299
                case TransactionStatus::BodyReadingFinished:
1300
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
1301
                        break;
5✔
1302
                case TransactionStatus::BodyHandlerCalled:
×
1303
                        // In between body handler and reply finished. No one to handle the status
1304
                        // here.
1305
                        server_.RemoveStream(shared_from_this());
×
1306
                        break;
×
1307
                case TransactionStatus::Replying:
1✔
1308
                        CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1309
                        break;
1✔
1310
                case TransactionStatus::SwitchingProtocol:
1✔
1311
                        CallErrorHandler(err, request_, switch_protocol_handler_);
3✔
1312
                        break;
1✔
1313
                case TransactionStatus::Done:
1314
                        break;
1315
                }
1316
        }
1317

1318
        if (!*cancelled) {
7✔
1319
                DoCancel();
×
1320
        }
1321
}
7✔
1322

1323
void Stream::DoCancel() {
799✔
1324
        if (socket_.is_open()) {
799✔
1325
                socket_.cancel();
217✔
1326
                socket_.close();
217✔
1327
        }
1328

1329
        // Set cancel state and then make a new one. Those who are interested should have their own
1330
        // pointer to the old one.
1331
        *cancelled_ = true;
799✔
1332
        cancelled_ = make_shared<bool>(true);
799✔
1333
}
799✔
1334

1335
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
1336
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1337
}
×
1338

1339
void Stream::CallErrorHandler(
×
1340
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
1341
        status_ = TransactionStatus::Done;
×
1342
        DoCancel();
×
1343
        handler(expected::unexpected(err.WithContext(
×
1344
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
1345

1346
        server_.RemoveStream(shared_from_this());
×
1347
}
×
1348

1349
void Stream::CallErrorHandler(
2✔
1350
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1351
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
6✔
1352
}
2✔
1353

1354
void Stream::CallErrorHandler(
8✔
1355
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1356
        status_ = TransactionStatus::Done;
8✔
1357
        DoCancel();
8✔
1358
        handler(
8✔
1359
                req,
1360
                err.WithContext(
8✔
1361
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
24✔
1362

1363
        server_.RemoveStream(shared_from_this());
8✔
1364
}
8✔
1365

1366
void Stream::CallErrorHandler(
4✔
1367
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
1368
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
1369
}
4✔
1370

1371
void Stream::CallErrorHandler(
7✔
1372
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
1373
        status_ = TransactionStatus::Done;
7✔
1374
        DoCancel();
7✔
1375
        handler(err.WithContext(
14✔
1376
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
14✔
1377

1378
        server_.RemoveStream(shared_from_this());
7✔
1379
}
7✔
1380

1381
void Stream::CallErrorHandler(
×
1382
        const error_code &ec, const RequestPtr &req, SwitchProtocolHandler handler) {
1383
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1384
}
×
1385

1386
void Stream::CallErrorHandler(
1✔
1387
        const error::Error &err, const RequestPtr &req, SwitchProtocolHandler handler) {
1388
        status_ = TransactionStatus::Done;
1✔
1389
        DoCancel();
1✔
1390
        handler(expected::unexpected(err.WithContext(
2✔
1391
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
4✔
1392

1393
        server_.RemoveStream(shared_from_this());
1✔
1394
}
1✔
1395

1396
void Stream::AcceptHandler(const error_code &ec) {
225✔
1397
        if (ec) {
225✔
1398
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
1399
                return;
×
1400
        }
1401

1402
        auto ip = socket_.remote_endpoint().address().to_string();
450✔
1403

1404
        // Use IP as context for logging.
1405
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
225✔
1406

1407
        logger_.Debug("Accepted connection.");
450✔
1408

1409
        request_.reset(new IncomingRequest(*this, cancelled_));
450✔
1410

1411
        request_->address_.host = ip;
225✔
1412

1413
        *cancelled_ = false;
225✔
1414

1415
        ReadHeader();
225✔
1416
}
1417

1418
void Stream::ReadHeader() {
225✔
1419
        auto &cancelled = cancelled_;
1420
        auto &request_data = request_data_;
225✔
1421

1422
        http::async_read_some(
450✔
1423
                socket_,
225✔
1424
                *request_data_.request_buffer_,
1425
                *request_data_.http_request_parser_,
1426
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
225✔
1427
                        if (!*cancelled) {
225✔
1428
                                ReadHeaderHandler(ec, num_read);
225✔
1429
                        }
1430
                });
225✔
1431
}
225✔
1432

1433
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
225✔
1434
        if (num_read > 0) {
225✔
1435
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
450✔
1436
        }
1437

1438
        if (ec) {
225✔
1439
                CallErrorHandler(ec, request_, server_.header_handler_);
×
1440
                return;
185✔
1441
        }
1442

1443
        if (!request_data_.http_request_parser_->is_header_done()) {
225✔
1444
                ReadHeader();
×
1445
                return;
×
1446
        }
1447

1448
        auto method_result = BeastVerbToMethod(
1449
                request_data_.http_request_parser_->get().base().method(),
1450
                string {request_data_.http_request_parser_->get().base().method_string()});
450✔
1451
        if (!method_result) {
225✔
1452
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
1453
                return;
×
1454
        }
1455
        request_->method_ = method_result.value();
225✔
1456
        request_->address_.path = string(request_data_.http_request_parser_->get().base().target());
225✔
1457

1458
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
225✔
1459

1460
        string debug_str;
1461
        for (auto header = request_data_.http_request_parser_->get().cbegin();
391✔
1462
                 header != request_data_.http_request_parser_->get().cend();
616✔
1463
                 header++) {
1464
                request_->headers_[string {header->name_string()}] = string {header->value()};
1,173✔
1465
                if (logger_.Level() >= log::LogLevel::Debug) {
391✔
1466
                        debug_str += string {header->name_string()};
326✔
1467
                        debug_str += ": ";
326✔
1468
                        debug_str += string {header->value()};
326✔
1469
                        debug_str += "\n";
326✔
1470
                }
1471
        }
1472

1473
        logger_.Debug("Received headers:\n" + debug_str);
450✔
1474
        debug_str.clear();
1475

1476
        if (GetContentLength(*request_data_.http_request_parser_) == 0
225✔
1477
                && !request_data_.http_request_parser_->chunked()) {
225✔
1478
                auto cancelled = cancelled_;
1479
                status_ = TransactionStatus::HeaderHandlerCalled;
184✔
1480
                server_.header_handler_(request_);
368✔
1481
                if (!*cancelled) {
184✔
1482
                        status_ = TransactionStatus::BodyHandlerCalled;
184✔
1483
                        CallBodyHandler();
184✔
1484
                }
1485
                return;
1486
        }
1487

1488
        assert(!request_data_.http_request_parser_->is_done());
1489

1490
        auto cancelled = cancelled_;
1491
        status_ = TransactionStatus::HeaderHandlerCalled;
41✔
1492
        server_.header_handler_(request_);
82✔
1493
        if (*cancelled) {
41✔
1494
                return;
1495
        }
1496

1497
        // We know that a body reader is required here, because of the check for body above.
1498
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
40✔
1499
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
2✔
1500
        }
1501
}
1502

1503
// Reads the next piece of the request body. The destination range [start, end) and
// `handler` are remembered for ReadBodyHandler(), which gets the result of the asynchronous
// read. If body reading is no longer in progress, `handler` is called with 0 instead and,
// when the body has been fully read, the body handler is called.
void Stream::AsyncReadNextBodyPart(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));

        // The first read after the reader was created moves the transaction forward.
        if (status_ == TransactionStatus::ReaderCreated) {
                status_ = TransactionStatus::BodyReadingInProgress;
        }

        if (status_ != TransactionStatus::BodyReadingInProgress) {
                // The handler may cancel the stream; only look at members if it did not.
                auto cancelled = cancelled_;
                handler(0);
                if (*cancelled || status_ != TransactionStatus::BodyReadingFinished) {
                        return;
                }
                status_ = TransactionStatus::BodyHandlerCalled;
                CallBodyHandler();
                return;
        }

        reader_buf_start_ = start;
        reader_buf_end_ = end;
        reader_handler_ = handler;

        // Never ask for more than either the caller's range or our own buffer can hold.
        const size_t requested = end - start;
        const size_t chunk_size = min(body_buffer_.size(), requested);

        auto &body = request_data_.http_request_parser_->get().body();
        body.data = body_buffer_.data();
        body.size = chunk_size;
        request_data_.last_buffer_size_ = chunk_size;

        auto &cancel_flag = cancelled_;
        auto &pinned_data = request_data_;

        http::async_read_some(
                socket_,
                *request_data_.request_buffer_,
                *request_data_.http_request_parser_,
                [this, cancel_flag, pinned_data](const error_code &ec, size_t num_read) {
                        if (*cancel_flag) {
                                return;
                        }
                        ReadBodyHandler(ec, num_read);
                });
}
1544

1545
// Completion handler for body reads. Copies the payload that was read into the range given
// to AsyncReadNextBodyPart() and reports its size to the stored reader handler. Errors are
// reported to both the reader handler and the server's body handler.
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
        if (num_read > 0) {
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
        }

        // `need_buffer` is not a real failure here, because the buffer is set up again
        // before every read anyway.
        if (ec == http::make_error_code(http::error::need_buffer)) {
                ec = error_code();
        }

        assert(reader_handler_);

        auto &parser = *request_data_.http_request_parser_;

        if (parser.is_done()) {
                status_ = TransactionStatus::BodyReadingFinished;
        }

        if (ec) {
                // The reader handler may cancel the stream; check before going on.
                auto cancelled = cancelled_;
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
                reader_handler_(expected::unexpected(err));
                if (!*cancelled) {
                        CallErrorHandler(ec, request_, server_.body_handler_);
                }
                return;
        }

        // `num_read` also counts out of band data such as chunk headers, which is of no
        // interest to the reader. The payload size is instead derived from how much of the
        // body buffer the parser has used up.
        const size_t payload_read = request_data_.last_buffer_size_ - parser.get().body().size;
        const size_t target_capacity = reader_buf_end_ - reader_buf_start_;
        const size_t to_copy = min(payload_read, target_capacity);

        if (to_copy > 0) {
                copy_n(body_buffer_.begin(), to_copy, reader_buf_start_);
                reader_handler_(to_copy);
                return;
        }

        // No payload at all, which happens when only a chunk header was read. Reporting 0 is
        // not an option, since that means EOF in `io::Reader` context, so issue the same read
        // again until actual payload arrives.
        AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, reader_handler_);
}
1591

1592
// Prepares the response and starts writing its header asynchronously.
// `reply_finished_handler` is stored as the handler to notify about the reply's outcome.
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
        SetupResponse();

        reply_finished_handler_ = reply_finished_handler;

        // Copied into the completion handler: the flag to detect cancellation, and the
        // response data as a capture of its own.
        auto &cancel_flag = cancelled_;
        auto &pinned_data = response_data_;

        http::async_write_header(
                socket_,
                *response_data_.http_response_serializer_,
                [this, cancel_flag, pinned_data](const error_code &ec, size_t num_written) {
                        if (*cancel_flag) {
                                return;
                        }
                        WriteHeaderHandler(ec, num_written);
                });
}
202✔
1609

1610
void Stream::SetupResponse() {
211✔
1611
        auto response = maybe_response_.lock();
211✔
1612
        // Only called from existing responses, so this should always be true.
1613
        assert(response);
1614

1615
        assert(status_ == TransactionStatus::BodyHandlerCalled);
1616
        status_ = TransactionStatus::Replying;
211✔
1617

1618
        // From here on we take shared ownership.
1619
        response_ = response;
1620

1621
        response_data_.http_response_ = make_shared<http::response<http::buffer_body>>();
422✔
1622

1623
        for (const auto &header : response->headers_) {
438✔
1624
                response_data_.http_response_->base().set(header.first, header.second);
227✔
1625
        }
1626

1627
        response_data_.http_response_->result(response->GetStatusCode());
211✔
1628
        response_data_.http_response_->reason(response->GetStatusMessage());
422✔
1629

1630
        response_data_.http_response_serializer_ =
1631
                make_shared<http::response_serializer<http::buffer_body>>(*response_data_.http_response_);
422✔
1632
}
211✔
1633

1634
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
201✔
1635
        if (num_written > 0) {
201✔
1636
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
402✔
1637
        }
1638

1639
        if (ec) {
201✔
1640
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1641
                return;
37✔
1642
        }
1643

1644
        auto exp_has_body =
1645
                HasBody(response_->GetHeader("Content-Length"), response_->GetHeader("Transfer-Encoding"));
402✔
1646
        if (!exp_has_body) {
201✔
1647
                CallErrorHandler(exp_has_body.error(), request_, reply_finished_handler_);
×
1648
                return;
×
1649
        }
1650
        if (!exp_has_body.value()) {
201✔
1651
                FinishReply();
36✔
1652
                return;
1653
        }
1654

1655
        if (!response_->body_reader_ && !response_->async_body_reader_) {
165✔
1656
                auto err = MakeError(BodyMissingError, "No body reader");
2✔
1657
                CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1658
                return;
1659
        }
1660

1661
        PrepareAndWriteNewBodyBuffer();
164✔
1662
}
1663

1664
void Stream::PrepareAndWriteNewBodyBuffer() {
2,104✔
1665
        // response_->body_reader_ XOR response_->async_body_reader_
1666
        assert(
1667
                (response_->body_reader_ || response_->async_body_reader_)
1668
                && !(response_->body_reader_ && response_->async_body_reader_));
1669

1670
        auto read_handler = [this](io::ExpectedSize read) {
2,105✔
1671
                if (!read) {
2,104✔
1672
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
3✔
1673
                        return;
1✔
1674
                }
1675
                WriteNewBodyBuffer(read.value());
2,103✔
1676
        };
2,104✔
1677

1678
        if (response_->body_reader_) {
2,104✔
1679
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
3,660✔
1680
        } else {
1681
                auto err = response_->async_body_reader_->AsyncRead(
1682
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
274✔
1683
                if (err != error::NoError) {
274✔
1684
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1685
                }
1686
        }
1687
}
2,104✔
1688

1689
void Stream::WriteNewBodyBuffer(size_t size) {
2,103✔
1690
        response_data_.http_response_->body().data = body_buffer_.data();
2,103✔
1691
        response_data_.http_response_->body().size = size;
2,103✔
1692

1693
        if (size > 0) {
2,103✔
1694
                response_data_.http_response_->body().more = true;
1,973✔
1695
        } else {
1696
                response_data_.http_response_->body().more = false;
130✔
1697
        }
1698

1699
        WriteBody();
2,103✔
1700
}
2,103✔
1701

1702
void Stream::WriteBody() {
4,056✔
1703
        auto &cancelled = cancelled_;
1704
        auto &response_data = response_data_;
4,056✔
1705

1706
        http::async_write_some(
8,112✔
1707
                socket_,
4,056✔
1708
                *response_data_.http_response_serializer_,
1709
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
4,016✔
1710
                        if (!*cancelled) {
4,016✔
1711
                                WriteBodyHandler(ec, num_written);
4,016✔
1712
                        }
1713
                });
4,016✔
1714
}
4,056✔
1715

1716
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
4,016✔
1717
        if (num_written > 0) {
4,016✔
1718
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
3,906✔
1719
        }
1720

1721
        if (ec == http::make_error_code(http::error::need_buffer)) {
4,016✔
1722
                // Write next body block.
1723
                PrepareAndWriteNewBodyBuffer();
1,940✔
1724
        } else if (ec) {
2,076✔
1725
                CallErrorHandler(ec, request_, reply_finished_handler_);
12✔
1726
        } else if (num_written > 0) {
2,072✔
1727
                // We are still writing the body.
1728
                WriteBody();
1,953✔
1729
        } else {
1730
                // We are finished.
1731
                FinishReply();
119✔
1732
        }
1733
}
4,016✔
1734

1735
void Stream::FinishReply() {
155✔
1736
        // We are done.
1737
        status_ = TransactionStatus::Done;
155✔
1738
        DoCancel();
155✔
1739
        // Release ownership of Body reader.
1740
        response_->body_reader_.reset();
155✔
1741
        response_->async_body_reader_.reset();
155✔
1742
        reply_finished_handler_(error::NoError);
155✔
1743
        server_.RemoveStream(shared_from_this());
155✔
1744
}
155✔
1745

1746
// Prepares the response, stores `handler` as the switch protocol handler and starts
// writing the response header asynchronously. Always returns NoError; the outcome of the
// write is delivered through SwitchingProtocolHandler().
error::Error Stream::AsyncSwitchProtocol(SwitchProtocolHandler handler) {
        SetupResponse();

        switch_protocol_handler_ = handler;
        // SetupResponse() has set the status to Replying; this is a protocol switch instead.
        status_ = TransactionStatus::SwitchingProtocol;

        // Copied into the completion handler: the flag to detect cancellation, and the
        // response data as a capture of its own.
        auto &cancel_flag = cancelled_;
        auto &pinned_data = response_data_;

        http::async_write_header(
                socket_,
                *response_data_.http_response_serializer_,
                [this, cancel_flag, pinned_data](const error_code &ec, size_t num_written) {
                        if (*cancel_flag) {
                                return;
                        }
                        SwitchingProtocolHandler(ec, num_written);
                });

        return error::NoError;
}
1766

1767
// Completion handler for the header write of a protocol switch. On success the socket is
// moved out of the stream into a RawSocket, the stream is flagged as cancelled and removed
// from the server, and the raw socket is handed to the switch protocol handler.
void Stream::SwitchingProtocolHandler(error_code ec, size_t num_written) {
        if (num_written > 0) {
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
        }

        if (ec) {
                CallErrorHandler(ec, request_, switch_protocol_handler_);
                return;
        }

        auto moved_socket = make_shared<tcp::socket>(std::move(socket_));
        auto raw_socket =
                make_shared<RawSocket<tcp::socket>>(moved_socket, request_data_.request_buffer_);

        // Take a copy of the handler before the stream is removed from the server below.
        auto handler = switch_protocol_handler_;

        // The remainder of the connection runs directly on the socket: set the cancel flag,
        // but do not close the socket.
        *cancelled_ = true;
        cancelled_ = make_shared<bool>(true);
        server_.RemoveStream(shared_from_this());

        handler(raw_socket);
}
1789

1790
void Stream::CallBodyHandler() {
217✔
1791
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1792
        // it immediately destroys, which would destroy this stream as well. At the end of this
1793
        // function, it's ok to destroy it.
1794
        auto stream_ref = shared_from_this();
1795

1796
        server_.body_handler_(request_, error::NoError);
651✔
1797

1798
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1799
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1800
        // request has not been handled correctly.
1801
        auto response = maybe_response_.lock();
217✔
1802
        if (!response) {
217✔
1803
                logger_.Error("Handler produced no response. Closing stream prematurely.");
6✔
1804
                *cancelled_ = true;
3✔
1805
                cancelled_ = make_shared<bool>(true);
3✔
1806
                server_.RemoveStream(shared_from_this());
9✔
1807
        }
1808
}
217✔
1809

1810
// Creates a server whose acceptor lives on the io context obtained from `event_loop`.
// The `server` configuration is not used by this constructor.
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
        event_loop_(event_loop),
        acceptor_(GetAsioIoContext(event_loop_)) {
}
237✔
1814

1815
// Shuts the server down through Cancel(), which closes the acceptor if it is open and
// drops all streams.
Server::~Server() {
        this->Cancel();
}
237✔
1818

1819
// Overload taking a plain RequestHandler as body handler. It wraps `body_handler` so that
// it fits the IdentifiedRequestHandler overload: an error is passed on as an unexpected
// value, otherwise the request itself is passed on.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
        auto adapter = [body_handler](IncomingRequestPtr req, error::Error err) {
                if (err != error::NoError) {
                        body_handler(expected::unexpected(err));
                        return;
                }
                body_handler(req);
        };

        return AsyncServeUrl(url, header_handler, adapter);
}
1830

1831
// Starts listening on `url` and serving incoming requests with the given handlers.
//
// `url` must use the "http" protocol, must not contain a path other than "/", and its host
// must be a literal address. Returns NoError once the acceptor is listening and the first
// stream has been prepared; otherwise returns an error describing which step failed. If a
// step fails after the acceptor has been opened, the acceptor is closed again so that the
// call can be retried on the same Server.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
        auto err = BreakDownUrl(url, address_);
        if (error::NoError != err) {
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
        }

        if (address_.protocol != "http") {
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
        }

        if (address_.path.size() > 0 && address_.path != "/") {
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
        }

        boost::system::error_code ec;
        auto address = asio::ip::make_address(address_.host, ec);
        if (ec) {
                return error::Error(
                        ec.default_error_condition(),
                        "Could not construct endpoint from address " + address_.host);
        }

        asio::ip::tcp::endpoint endpoint(address, address_.port);

        ec.clear();
        acceptor_.open(endpoint.protocol(), ec);
        if (ec) {
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
        }

        // From here on the acceptor is open. Any failure below must close it again,
        // otherwise a later call on this Server would fail in `open()` above.
        auto fail_and_close =
                [this](const boost::system::error_code &failure, const string &msg) -> error::Error {
                boost::system::error_code ignored;
                acceptor_.close(ignored);
                return error::Error(failure.default_error_condition(), msg);
        };

        // Allow address reuse, otherwise we can't re-bind later.
        ec.clear();
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
        if (ec) {
                return fail_and_close(ec, "Could not set socket options");
        }

        ec.clear();
        acceptor_.bind(endpoint, ec);
        if (ec) {
                return fail_and_close(ec, "Could not bind socket");
        }

        ec.clear();
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
        if (ec) {
                return fail_and_close(ec, "Could not start listening");
        }

        header_handler_ = header_handler;
        body_handler_ = body_handler;

        // Start accepting the first connection.
        PrepareNewStream();

        return error::NoError;
}
1888

1889
void Server::Cancel() {
257✔
1890
        if (acceptor_.is_open()) {
257✔
1891
                acceptor_.cancel();
216✔
1892
                acceptor_.close();
216✔
1893
        }
1894
        streams_.clear();
1895
}
257✔
1896

1897
uint16_t Server::GetPort() const {
17✔
1898
        return acceptor_.local_endpoint().port();
17✔
1899
}
1900

1901
// Returns an "http://127.0.0.1:<port>" URL using the port reported by GetPort().
string Server::GetUrl() const {
        string url {"http://127.0.0.1:"};
        url += to_string(GetPort());
        return url;
}
1904

1905
// Creates the response object for `req` and registers it with the request's stream.
// Returns StreamCancelledError if the request has been cancelled.
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
        const bool cancelled = *req->cancelled_;
        if (cancelled) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
        }

        OutgoingResponsePtr response(new OutgoingResponse(req->stream_, req->cancelled_));
        // The stream only remembers the response through `maybe_response_`; the returned
        // pointer is what the caller holds on to.
        req->stream_.maybe_response_ = response;
        return response;
}
1913

1914
// Starts sending `resp` on its stream; `reply_finished_handler` is passed on to the stream.
// Returns StreamCancelledError if the response has been cancelled, NoError otherwise.
error::Error Server::AsyncReply(
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
        const bool cancelled = *resp->cancelled_;
        if (cancelled) {
                return MakeError(StreamCancelledError, "Cannot send response");
        }

        auto &stream = resp->stream_;
        stream.AsyncReply(reply_finished_handler);
        return error::NoError;
}
1923

1924
// Creates an asynchronous reader for the body of `req`. This is only allowed while the
// stream is in the HeaderHandlerCalled state and only when the request has a body. On
// success the stream moves to the ReaderCreated state.
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
        if (*req->cancelled_) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
        }

        auto &stream = req->stream_;

        const bool in_header_handler = stream.status_ == TransactionStatus::HeaderHandlerCalled;
        if (!in_header_handler) {
                return expected::unexpected(error::Error(
                        make_error_condition(errc::operation_in_progress),
                        "MakeBodyAsyncReader called while reading is in progress"));
        }

        auto &parser = *stream.request_data_.http_request_parser_;
        const bool has_body = GetContentLength(parser) != 0 || parser.chunked();
        if (!has_body) {
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
        }

        stream.status_ = TransactionStatus::ReaderCreated;
        return make_shared<BodyAsyncReader<Stream>>(stream, req->cancelled_);
}
1944

1945
// Starts a protocol switch on the stream belonging to `resp`; `handler` is passed on to the
// stream. Returns StreamCancelledError if the response has been cancelled, otherwise the
// result of the stream's AsyncSwitchProtocol().
error::Error Server::AsyncSwitchProtocol(OutgoingResponsePtr resp, SwitchProtocolHandler handler) {
        // Same guard as in AsyncReply(): do not touch the stream of a cancelled response.
        if (*resp->cancelled_) {
                return MakeError(StreamCancelledError, "Cannot switch protocol");
        }

        return resp->stream_.AsyncSwitchProtocol(handler);
}
1948

1949
void Server::PrepareNewStream() {
441✔
1950
        StreamPtr new_stream {new Stream(*this)};
441✔
1951
        streams_.insert(new_stream);
1952
        AsyncAccept(new_stream);
882✔
1953
}
441✔
1954

1955
// Waits asynchronously for a connection on `stream`'s socket. Once a connection arrives,
// the stream handles it and a new stream is prepared for the next connection. Accept
// errors are logged, except for cancellation.
void Server::AsyncAccept(StreamPtr stream) {
        acceptor_.async_accept(stream->socket_, [this, stream](const error_code &ec) {
                if (!ec) {
                        stream->AcceptHandler(ec);

                        // Get ready for the next incoming connection.
                        this->PrepareNewStream();
                        return;
                }

                // A cancelled accept is not worth reporting.
                if (ec != errc::operation_canceled) {
                        log::Error("Could not accept connection: " + ec.message());
                }
        });
}
441✔
1969

1970
// Drops `stream` from the server's set of streams and cancels it.
void Server::RemoveStream(StreamPtr stream) {
        // `stream` is our own copy of the pointer, so it stays valid for the DoCancel() call
        // after being erased from the set.
        streams_.erase(stream);
        stream->DoCancel();
}
187✔
1975

1976
} // namespace http
1977
} // namespace common
1978
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc