• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1395024768

31 Jul 2024 08:57AM UTC coverage: 79.856%. Remained the same
1395024768

push

gitlab-ci

lluiscampos
fix: URL-decode proxy username and password

In cases where the proxy username or password contains characters that
cannot show up in URLs, they should be URL-encoded.

This is to ensure backwards compatibility with the Mender client 3.

Unfortunately, tinyproxy treats any special character in the
BasicAuth configuration entry as a syntax error, so we have no way
to test this.

Ticket: MEN-7402
Changelog: none
Signed-off-by: Vratislav Podzimek <vratislav.podzimek@northern.tech>
(cherry picked from commit 819d3d1b2)

7 of 9 new or added lines in 2 files covered. (77.78%)

89 existing lines in 2 files now uncovered.

7120 of 8916 relevant lines covered (79.86%)

12270.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.26
/src/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio.hpp>
20
#include <boost/asio/ip/tcp.hpp>
21
#include <boost/asio/ssl/host_name_verification.hpp>
22
#include <boost/asio/ssl/verify_mode.hpp>
23

24
#include <common/common.hpp>
25
#include <common/crypto.hpp>
26

27
namespace mender {
28
namespace http {
29

30
namespace common = mender::common;
31
namespace crypto = mender::common::crypto;
32

33
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
34
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
35
const unsigned int BeastHttpVersion = 11;
36

37
namespace asio = boost::asio;
38
namespace http = boost::beast::http;
39

40
// Size of the body buffer allocated by the `Client` constructor (`body_buffer_`). The value comes
// from `MENDER_BUFSIZE`, which is defined outside this file.
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
41

42
// Translates our own `Method` enumeration into the corresponding Beast verb.
//
// `Method::Invalid` has no Beast counterpart: it trips the assert at the end of the function, and
// yields `http::verb::get` when asserts are compiled out.
static http::verb MethodToBeastVerb(Method method) {
	// Deliberately no "default" label, so that the compiler can warn about an unhandled
	// enumerator if a method is ever added.
	switch (method) {
	case Method::GET:
		return http::verb::get;
	case Method::HEAD:
		return http::verb::head;
	case Method::POST:
		return http::verb::post;
	case Method::PUT:
		return http::verb::put;
	case Method::PATCH:
		return http::verb::patch;
	case Method::CONNECT:
		return http::verb::connect;
	case Method::Invalid:
		// Nothing to map to, handled below.
		break;
	}

	// Only reached for `Method::Invalid` or a value that is not a declared enumerator.
	assert(false);
	return http::verb::get;
}
65

66
// Translates a Beast verb into our own `Method` enumeration.
//
// Verbs without a `Method` counterpart produce an `UnsupportedMethodError` whose message is
// `verb_string`.
static expected::expected<Method, error::Error> BeastVerbToMethod(
	http::verb verb, const string &verb_string) {
	switch (verb) {
	case http::verb::get:
		return Method::GET;
	case http::verb::head:
		return Method::HEAD;
	case http::verb::post:
		return Method::POST;
	case http::verb::put:
		return Method::PUT;
	case http::verb::patch:
		return Method::PATCH;
	case http::verb::connect:
		return Method::CONNECT;
	default:
		// Every other Beast verb is rejected below.
		break;
	}

	return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
}
85

86
template <typename StreamType>
87
class BodyAsyncReader : virtual public io::AsyncReader {
88
public:
89
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
153✔
90
                stream_ {stream},
91
                cancelled_ {cancelled} {
306✔
92
        }
153✔
93
        ~BodyAsyncReader() {
39✔
94
                Cancel();
39✔
95
        }
78✔
96

97
        error::Error AsyncRead(
2,187✔
98
                vector<uint8_t>::iterator start,
99
                vector<uint8_t>::iterator end,
100
                io::AsyncIoHandler handler) override {
101
                if (eof_) {
2,187✔
102
                        handler(0);
×
103
                        return error::NoError;
×
104
                }
105

106
                if (*cancelled_) {
2,187✔
107
                        return error::MakeError(
×
108
                                error::ProgrammingError,
109
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
110
                }
111
                stream_.AsyncReadNextBodyPart(start, end, [this, handler](io::ExpectedSize size) {
12,822✔
112
                        if (size && size.value() == 0) {
6,317✔
113
                                eof_ = true;
124✔
114
                        }
115
                        handler(size);
12,634✔
116
                });
117
                return error::NoError;
2,187✔
118
        }
119

120
        void Cancel() override {
39✔
121
                if (!*cancelled_) {
39✔
122
                        stream_.Cancel();
4✔
123
                }
124
        }
39✔
125

126
private:
127
        StreamType &stream_;
128
        shared_ptr<bool> cancelled_;
129
        bool eof_ {false};
130

131
        friend class Client;
132
        friend class Server;
133
};
134

135
template <typename StreamType>
136
class RawSocket : virtual public io::AsyncReadWriter {
137
public:
138
        RawSocket(shared_ptr<StreamType> stream, shared_ptr<beast::flat_buffer> buffered) :
×
139
                destroying_ {make_shared<bool>(false)},
×
140
                stream_ {stream},
141
                buffered_ {buffered} {
×
142
                // If there are no buffered bytes, then we don't need it.
143
                if (buffered_ && buffered_->size() == 0) {
×
144
                        buffered_.reset();
×
145
                }
146
        }
×
147

148
        ~RawSocket() {
15✔
149
                *destroying_ = true;
15✔
150
                Cancel();
15✔
151
        }
30✔
152

153
        error::Error AsyncRead(
320✔
154
                vector<uint8_t>::iterator start,
155
                vector<uint8_t>::iterator end,
156
                io::AsyncIoHandler handler) override {
157
                // If we have prebuffered bytes, which can happen if the HTTP parser read the
158
                // header and parts of the body in one block, return those first.
159
                if (buffered_) {
320✔
160
                        return DrainPrebufferedData(start, end, handler);
8✔
161
                }
162

163
                read_buffer_ = asio::buffer(&*start, end - start);
316✔
164
                auto &destroying = destroying_;
165
                stream_->async_read_some(
632✔
166
                        read_buffer_,
316✔
167
                        [destroying, handler](const boost::system::error_code &ec, size_t num_read) {
313✔
168
                                if (*destroying) {
313✔
169
                                        return;
170
                                }
171

172
                                if (ec == asio::error::operation_aborted) {
313✔
173
                                        handler(expected::unexpected(error::Error(
12✔
174
                                                make_error_condition(errc::operation_canceled),
6✔
175
                                                "Could not read from socket")));
176
                                } else if (ec) {
310✔
177
                                        handler(expected::unexpected(
12✔
178
                                                error::Error(ec.default_error_condition(), "Could not read from socket")));
12✔
179
                                } else {
180
                                        handler(num_read);
608✔
181
                                }
182
                        });
183
                return error::NoError;
316✔
184
        }
185

186
        error::Error AsyncWrite(
309✔
187
                vector<uint8_t>::const_iterator start,
188
                vector<uint8_t>::const_iterator end,
189
                io::AsyncIoHandler handler) override {
190
                write_buffer_ = asio::buffer(&*start, end - start);
309✔
191
                auto &destroying = destroying_;
192
                stream_->async_write_some(
618✔
193
                        write_buffer_,
309✔
194
                        [destroying, handler](const boost::system::error_code &ec, size_t num_written) {
306✔
195
                                if (*destroying) {
306✔
196
                                        return;
197
                                }
198

199
                                if (ec == asio::error::operation_aborted) {
306✔
200
                                        handler(expected::unexpected(error::Error(
×
201
                                                make_error_condition(errc::operation_canceled),
×
202
                                                "Could not write to socket")));
203
                                } else if (ec) {
306✔
204
                                        handler(expected::unexpected(
×
205
                                                error::Error(ec.default_error_condition(), "Could not write to socket")));
×
206
                                } else {
207
                                        handler(num_written);
612✔
208
                                }
209
                        });
210
                return error::NoError;
309✔
211
        }
212

213
        void Cancel() override {
28✔
214
                if (stream_->lowest_layer().is_open()) {
28✔
215
                        stream_->lowest_layer().cancel();
15✔
216
                        stream_->lowest_layer().close();
15✔
217
                }
218
        }
28✔
219

220
private:
221
        error::Error DrainPrebufferedData(
4✔
222
                vector<uint8_t>::iterator start,
223
                vector<uint8_t>::iterator end,
224
                io::AsyncIoHandler handler) {
225
                size_t to_copy = min(static_cast<size_t>(end - start), buffered_->size());
4✔
226

227
                // These two lines are equivalent to:
228
                //   copy_n(static_cast<const uint8_t *>(buffered_->cdata().data()), to_copy, start);
229
                // but compatible with Boost 1.67.
230
                const beast::flat_buffer &cbuffered = *buffered_;
231
                copy_n(static_cast<const uint8_t *>(cbuffered.data().data()), to_copy, start);
4✔
232
                buffered_->consume(to_copy);
4✔
233
                if (buffered_->size() == 0) {
4✔
234
                        // We don't need it anymore.
235
                        buffered_.reset();
4✔
236
                }
237
                handler(to_copy);
4✔
238
                return error::NoError;
4✔
239
        }
240

241
        shared_ptr<bool> destroying_;
242
        shared_ptr<StreamType> stream_;
243
        shared_ptr<beast::flat_buffer> buffered_;
244
        asio::mutable_buffer read_buffer_;
245
        asio::const_buffer write_buffer_;
246
};
247

248
// Returns the content length reported by `parser`, or 0 if the parser reports none.
template <typename PARSER>
size_t GetContentLength(const PARSER &parser) {
	const auto reported_length = parser.content_length();
	return reported_length ? static_cast<size_t>(reported_length.value()) : 0;
}
257

258
// Decides from the `Content-Length` and `Transfer-Encoding` header values whether a message
// carries a body.
//
// Each argument holds the header value when it is set. `Transfer-Encoding` takes precedence:
// "chunked" means there is a body, and any other value is rejected with `errc::not_supported`.
// Otherwise a body exists if `Content-Length` is a number greater than zero. A `Content-Length`
// that cannot be parsed, or that is negative, results in an error. With neither header there is
// no body.
expected::ExpectedBool HasBody(
	const expected::ExpectedString &content_length,
	const expected::ExpectedString &transfer_encoding) {
	if (transfer_encoding) {
		if (transfer_encoding.value() != "chunked") {
			return expected::unexpected(error::Error(
				make_error_condition(errc::not_supported),
				"Unsupported Transfer-Encoding: " + transfer_encoding.value()));
		}
		return true;
	}

	if (content_length) {
		auto length = common::StringToLongLong(content_length.value());
		if (!length) {
			return expected::unexpected(error::Error(
				length.error().code,
				"Content-Length contains invalid number: " + content_length.value()));
		}
		if (length.value() < 0) {
			// The number parsed fine, so `length` holds no error to take a code from;
			// accessing `length.error()` here would be invalid.
			return expected::unexpected(error::Error(
				make_error_condition(errc::invalid_argument),
				"Content-Length contains invalid number: " + content_length.value()));
		}
		return length.value() > 0;
	}

	return false;
}
282

283
Client::Client(
353✔
284
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
285
        event_loop_ {event_loop},
286
        logger_name_ {logger_name},
287
        client_config_ {client},
288
        http_proxy_ {client.http_proxy},
353✔
289
        https_proxy_ {client.https_proxy},
353✔
290
        no_proxy_ {client.no_proxy},
353✔
291
        cancelled_ {make_shared<bool>(true)},
×
292
        resolver_(GetAsioIoContext(event_loop)),
293
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,412✔
294
}
353✔
295

296
// Cancels whatever is still going on. A warning is logged first if the cancellation flag of the
// current request has not been set.
Client::~Client() {
	const bool request_active = !*cancelled_;
	if (request_active) {
		logger_.Warning("Client destroyed while request is still active!");
	}
	DoCancel();
}
353✔
302

303
// Configures every SSL context from the client configuration: verification mode, optional client
// certificate and private key, default CA paths and optional dedicated server certificate.
//
// The work is done only once; after a successful run, later calls return immediately. Returns an
// error if the client certificate or key cannot be loaded, if only one of the two is configured,
// or if the default CA paths cannot be loaded while no server certificate is configured.
error::Error Client::Initialize() {
	if (initialized_) {
		return error::NoError;
	}

	const auto &cert_path = client_config_.client_cert_path;
	const auto &key_path = client_config_.client_cert_key_path;
	const auto &server_cert_path = client_config_.server_cert_path;

	for (auto i = 0; i < MENDER_BOOST_BEAST_SSL_CTX_COUNT; i++) {
		auto &ctx = ssl_ctx_[i];
		ctx.set_verify_mode(client_config_.skip_verify ? ssl::verify_none : ssl::verify_peer);

		beast::error_code ec {};

		const bool has_cert = cert_path != "";
		const bool has_key = key_path != "";
		if (has_cert != has_key) {
			return error::Error(
				make_error_condition(errc::invalid_argument),
				"Cannot set only one of client certificate, and client certificate private key");
		}

		if (has_cert) {
			ctx.set_options(boost::asio::ssl::context::default_workarounds);
			ctx.use_certificate_file(cert_path, boost::asio::ssl::context_base::pem, ec);
			if (ec) {
				return error::Error(
					ec.default_error_condition(), "Could not load client certificate");
			}

			auto exp_key = crypto::PrivateKey::Load({key_path, "", client_config_.ssl_engine});
			if (!exp_key) {
				return exp_key.error().WithContext("Error loading private key from " + key_path);
			}

			// SSL_CTX_use_PrivateKey() returns 1 on success.
			if (SSL_CTX_use_PrivateKey(ctx.native_handle(), exp_key.value().Get()) != 1) {
				return MakeError(
					HTTPInitError, "Failed to add the PrivateKey: " + key_path + " to the SSL CTX");
			}
		}

		ctx.set_default_verify_paths(ec); // Load the default CAs
		if (ec) {
			auto err = error::Error(
				ec.default_error_condition(), "Failed to load the SSL default directory");
			if (server_cert_path == "") {
				// We aren't going to have any valid certificates then.
				return err;
			}
			// We have a dedicated certificate, so this is not fatal.
			log::Info(err.String());
		}

		if (server_cert_path != "") {
			ctx.load_verify_file(server_cert_path, ec);
			if (ec) {
				log::Warning("Failed to load the server certificate! Falling back to the CA store");
			}
		}
	}

	initialized_ = true;

	return error::NoError;
}
367

368
// Create the HOST header according to:
369
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.23
370
// In short: Add the port-number if it is non-standard HTTP
371
static string CreateHOSTAddress(OutgoingRequestPtr req) {
272✔
372
        if (req->GetPort() == 80 || req->GetPort() == 443) {
272✔
373
                return req->GetHost();
4✔
374
        }
375
        return req->GetHost() + ":" + to_string(req->GetPort());
536✔
376
}
377

378
// Starts an HTTP(S) request.
//
// After validating the arguments and applying the proxy configuration, the host is resolved
// asynchronously; the rest of the request continues from `ResolveHandler()`. Both handlers are
// stored on the client for that continuation.
//
// Returns an error right away, without starting anything, if:
//  - `Initialize()` fails,
//  - a previous call is still ongoing,
//  - `req` is null or lacks protocol, host or a valid port,
//  - either handler is null,
//  - the protocol is neither "http" nor "https",
//  - the proxy setup fails.
error::Error Client::AsyncCall(
	OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
	auto err = Initialize();
	if (err != error::NoError) {
		return err;
	}

	if (!*cancelled_ && status_ != TransactionStatus::Done) {
		return error::Error(
			make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
	}

	// `req` is dereferenced below, so reject a null request the same way null handlers are
	// rejected.
	if (!req) {
		return error::MakeError(error::ProgrammingError, "Request can not be nullptr");
	}

	if (req->address_.protocol == "" || req->address_.host == "" || req->address_.port < 0) {
		return error::MakeError(error::ProgrammingError, "Request is not ready");
	}

	if (!header_handler || !body_handler) {
		return error::MakeError(
			error::ProgrammingError, "header_handler and body_handler can not be nullptr");
	}

	if (req->address_.protocol != "http" && req->address_.protocol != "https") {
		return error::Error(
			make_error_condition(errc::protocol_not_supported), req->address_.protocol);
	}

	logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));

	request_ = req;

	// May rewrite `request_`, or replace it with a CONNECT request, when a proxy applies.
	err = HandleProxySetup();
	if (err != error::NoError) {
		return err;
	}

	// NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
	// request to route to our k8s cluster. Set this in all cases.
	const string header_url = CreateHOSTAddress(req);
	req->SetHeader("HOST", header_url);

	log::Trace("Setting HOST address: " + header_url);

	header_handler_ = header_handler;
	body_handler_ = body_handler;
	status_ = TransactionStatus::None;

	// Fresh flag for this request. The resolve handler captures its own copy, so it can tell
	// whether this particular request was cancelled in the meantime.
	cancelled_ = make_shared<bool>(false);

	auto &cancelled = cancelled_;

	resolver_.async_resolve(
		request_->address_.host,
		to_string(request_->address_.port),
		[this, cancelled](
			const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
			if (!*cancelled) {
				ResolveHandler(ec, results);
			}
		});

	return error::NoError;
}
440

441
// Adds a `Proxy-Authorization: Basic ...` header to `req` if the proxy URL carries a username.
//
// Username and password are URL-decoded first, then joined with ":" and Base64-encoded. Nothing
// is added when the username is empty. Decoding and encoding errors are returned to the caller.
static inline error::Error AddProxyAuthHeader(OutgoingRequest &req, BrokenDownUrl &proxy_address) {
	if (proxy_address.username == "") {
		// No credentials, no header.
		return error::NoError;
	}

	auto decoded_user = URLDecode(proxy_address.username);
	auto decoded_pass = URLDecode(proxy_address.password);
	if (!decoded_user) {
		return decoded_user.error();
	}
	if (!decoded_pass) {
		return decoded_pass.error();
	}

	auto credentials = decoded_user.value() + ":" + decoded_pass.value();
	auto encoded = crypto::EncodeBase64(common::ByteVectorFromString(credentials));
	if (!encoded) {
		return encoded.error();
	}

	req.SetHeader("Proxy-Authorization", "Basic " + encoded.value());
	return error::NoError;
}
463

464
// Selects the socket mode for `request_` and, if a proxy applies to it, redirects the request to
// the proxy.
//
//  - "http" requests through a proxy are rewritten in place: the path becomes the full original
//    URL, and host, port and protocol become those of the proxy.
//  - "https" requests through a proxy are saved in `secondary_req_`, and `request_` is replaced
//    by a CONNECT request to the proxy, whose path is the original "host:port".
//
// A proxy applies if the one configured for the protocol is non-empty and the request host does
// not match `no_proxy_`. When a proxy is used, the socket mode follows the proxy's protocol.
// Returns an error if the proxy URL is invalid, contains a path, or its credentials cannot be
// processed.
error::Error Client::HandleProxySetup() {
	secondary_req_.reset();

	const bool plain_http = request_->address_.protocol == "http";
	if (!plain_http && request_->address_.protocol != "https") {
		// Should never get here
		assert(false);
		return error::NoError;
	}

	socket_mode_ = plain_http ? SocketMode::Plain : SocketMode::Tls;

	const auto &proxy_url = plain_http ? http_proxy_ : https_proxy_;
	if (proxy_url == "" || HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
		// Direct connection, nothing more to do.
		return error::NoError;
	}

	if (!plain_http) {
		// Save the original request for later, so that we can make a new request
		// over the channel established by CONNECT.
		secondary_req_ = std::move(request_);

		request_ = make_shared<OutgoingRequest>();
		request_->SetMethod(Method::CONNECT);
	}

	BrokenDownUrl proxy_address;
	auto err = BreakDownUrl(proxy_url, proxy_address, true);
	if (err != error::NoError) {
		return err.WithContext(
			plain_http ? "HTTP proxy URL is invalid" : "HTTPS proxy URL is invalid");
	}
	if (proxy_address.path != "" && proxy_address.path != "/") {
		return MakeError(InvalidUrlError, "A URL with a path is not legal for a proxy address");
	}

	if (plain_http) {
		// Make a modified proxy request: the proxy gets the complete original URL as path.
		request_->address_.path = request_->address_.protocol + "://" + request_->address_.host
								  + ":" + to_string(request_->address_.port)
								  + request_->address_.path;
	} else {
		request_->address_.path =
			secondary_req_->address_.host + ":" + to_string(secondary_req_->address_.port);
	}
	request_->address_.host = proxy_address.host;
	request_->address_.port = proxy_address.port;
	request_->address_.protocol = proxy_address.protocol;

	err = AddProxyAuthHeader(*request_, proxy_address);
	if (err != error::NoError) {
		return err;
	}

	// The connection goes to the proxy, so its protocol decides the socket mode.
	if (proxy_address.protocol == "https") {
		socket_mode_ = SocketMode::Tls;
	} else if (proxy_address.protocol == "http") {
		socket_mode_ = SocketMode::Plain;
	} else {
		// Should never get here.
		assert(false);
	}

	return error::NoError;
}
550

551
// Creates a reader for the body of the response currently being received.
//
// Only valid while the status is `HeaderHandlerCalled`; otherwise an `operation_in_progress`
// error is returned. A response that has zero content length and is not chunked yields a
// `BodyMissingError`. On success the status becomes `ReaderCreated`.
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
	if (status_ != TransactionStatus::HeaderHandlerCalled) {
		return expected::unexpected(error::Error(
			make_error_condition(errc::operation_in_progress),
			"MakeBodyAsyncReader called while reading is in progress"));
	}

	auto &parser = response_data_.http_response_parser_;
	const bool body_missing = GetContentLength(*parser) == 0 && !parser->chunked();
	if (body_missing) {
		return expected::unexpected(
			MakeError(BodyMissingError, "Response does not contain a body"));
	}

	status_ = TransactionStatus::ReaderCreated;
	return make_shared<BodyAsyncReader<Client>>(resp->client_.GetHttpClient(), resp->cancelled_);
}
567

568
// Hands the connection over to the caller as a raw socket, ending the HTTP transaction.
//
// The stream is detached from the client and wrapped in a `RawSocket` matching the current
// socket mode, together with the response buffer. Returns a `not_connected` error if the current
// request is already cancelled.
io::ExpectedAsyncReadWriterPtr Client::SwitchProtocol(IncomingResponsePtr req) {
	if (*cancelled_) {
		return expected::unexpected(error::Error(
			make_error_condition(errc::not_connected),
			"Cannot switch protocols if endpoint is not connected"));
	}

	// Rest of the connection is done directly on the socket, we are done here.
	status_ = TransactionStatus::Done;
	// Mark the old flag for everyone still holding it, then continue with a fresh, unset one.
	*cancelled_ = true;
	cancelled_ = make_shared<bool>(false);

	// This no longer belongs to us; moving leaves `stream_` empty.
	auto detached = std::move(stream_);
	auto &buffered = response_data_.response_buffer_;

	switch (socket_mode_) {
	case SocketMode::TlsTls:
		return make_shared<RawSocket<ssl::stream<ssl::stream<tcp::socket>>>>(detached, buffered);
	case SocketMode::Tls:
		return make_shared<RawSocket<ssl::stream<tcp::socket>>>(
			make_shared<ssl::stream<tcp::socket>>(std::move(detached->next_layer())), buffered);
	case SocketMode::Plain:
		return make_shared<RawSocket<tcp::socket>>(
			make_shared<tcp::socket>(std::move(detached->next_layer().next_layer())), buffered);
	}

	AssertOrReturnUnexpected(false);
}
600

601
// Invokes `handler` with the current response.
void Client::CallHandler(ResponseHandler handler) {
	// The handler is taken by value on purpose, so that we hold our own copy while it runs.
	// The handler may own the client instance through a capture, and it may replace itself
	// with a different handler (using `AsyncCall`). Without our copy, that would destroy the
	// last copy of the handler, and with it the client, in the middle of the call.
	handler(response_);
}
354✔
609

610
void Client::CallErrorHandler(
83✔
611
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
612
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
249✔
613
}
83✔
614

615
void Client::CallErrorHandler(
126✔
616
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
617
        status_ = TransactionStatus::Done;
126✔
618
        DoCancel();
126✔
619
        handler(expected::unexpected(
252✔
620
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
504✔
621
}
126✔
622

623
void Client::ResolveHandler(
267✔
624
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
625
        if (ec) {
267✔
UNCOV
626
                CallErrorHandler(ec, request_, header_handler_);
×
UNCOV
627
                return;
×
628
        }
629

630
        if (logger_.Level() >= log::LogLevel::Debug) {
267✔
631
                string ips = "[";
245✔
632
                string sep;
633
                for (auto r : results) {
1,044✔
634
                        ips += sep;
277✔
635
                        ips += r.endpoint().address().to_string();
277✔
636
                        sep = ", ";
277✔
637
                }
638
                ips += "]";
245✔
639
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
490✔
640
        }
641

642
        resolver_results_ = results;
643

644
        stream_ = make_shared<ssl::stream<ssl::stream<tcp::socket>>>(
267✔
645
                ssl::stream<tcp::socket>(GetAsioIoContext(event_loop_), ssl_ctx_[0]), ssl_ctx_[1]);
534✔
646

647
        if (!response_data_.response_buffer_) {
267✔
648
                // We can reuse this if preexisting.
649
                response_data_.response_buffer_ = make_shared<beast::flat_buffer>();
396✔
650

651
                // This is equivalent to:
652
                //   response_data_.response_buffer_.reserve(body_buffer_.size());
653
                // but compatible with Boost 1.67.
654
                response_data_.response_buffer_->prepare(
655
                        body_buffer_.size() - response_data_.response_buffer_->size());
198✔
656
        }
657

658
        auto &cancelled = cancelled_;
659

660
        asio::async_connect(
267✔
661
                stream_->lowest_layer(),
662
                resolver_results_,
267✔
663
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
534✔
664
                        if (!*cancelled) {
267✔
665
                                switch (socket_mode_) {
267✔
UNCOV
666
                                case SocketMode::TlsTls:
×
667
                                        // Should never happen because we always need to handshake
668
                                        // the innermost Tls first, then the outermost, but the
669
                                        // latter doesn't happen here.
670
                                        assert(false);
UNCOV
671
                                        CallErrorHandler(
×
UNCOV
672
                                                error::MakeError(
×
UNCOV
673
                                                        error::ProgrammingError, "TlsTls mode is invalid in ResolveHandler"),
×
674
                                                request_,
×
UNCOV
675
                                                header_handler_);
×
676
                                case SocketMode::Tls:
21✔
677
                                        return HandshakeHandler(stream_->next_layer(), ec, endpoint);
21✔
678
                                case SocketMode::Plain:
246✔
679
                                        return ConnectHandler(ec, endpoint);
246✔
680
                                }
681
                        }
682
                });
683
}
684

685
template <typename StreamType>
686
void Client::HandshakeHandler(
25✔
687
        StreamType &stream, const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
688
        if (ec) {
25✔
689
                CallErrorHandler(ec, request_, header_handler_);
2✔
690
                return;
2✔
691
        }
692

693
        // Enable TCP keepalive
694
        boost::asio::socket_base::keep_alive option(true);
695
        stream_->lowest_layer().set_option(option);
23✔
696

697
        // Set SNI Hostname (many hosts need this to handshake successfully)
698
        if (!SSL_set_tlsext_host_name(stream.native_handle(), request_->address_.host.c_str())) {
23✔
UNCOV
699
                beast::error_code ec2 {
×
UNCOV
700
                        static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
×
UNCOV
701
                logger_.Error("Failed to set SNI host name: " + ec2.message());
×
702
        }
703

704
        // Enable host name verification (not done automatically and we don't have
705
        // enough access to the TLS internals to use X509_VERIFY_PARAM_set1_host(),
706
        // hence the callback that boost provides).
707
        boost::system::error_code b_ec;
23✔
708
        stream.set_verify_callback(ssl::host_name_verification(request_->address_.host), b_ec);
46✔
709
        if (b_ec) {
23✔
UNCOV
710
                logger_.Error("Failed to enable host name verification: " + b_ec.message());
×
UNCOV
711
                CallErrorHandler(b_ec, request_, header_handler_);
×
UNCOV
712
                return;
×
713
        }
714

715
        auto &cancelled = cancelled_;
716

717
        stream.async_handshake(
46✔
718
                ssl::stream_base::client, [this, cancelled, endpoint](const error_code &ec) {
23✔
719
                        if (*cancelled) {
26✔
720
                                return;
721
                        }
722
                        if (ec) {
26✔
723
                                logger_.Error("https: Failed to perform the SSL handshake: " + ec.message());
20✔
724
                                CallErrorHandler(ec, request_, header_handler_);
10✔
725
                                return;
10✔
726
                        }
727
                        logger_.Debug("https: Successful SSL handshake");
32✔
728
                        ConnectHandler(ec, endpoint);
16✔
729
                });
730
}
731

732

733
// Called once the connection (including any TLS handshake) is established.
//
// Builds the outgoing Beast request from `request_`, creates the serializer and a fresh
// response parser, and starts writing the request header on the layer that matches the
// current socket mode. The result is delivered to `WriteHeaderHandler`.
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
        if (ec) {
                CallErrorHandler(ec, request_, header_handler_);
                return;
        }

        // Enable TCP keepalive
        boost::asio::socket_base::keep_alive keep_alive(true);
        stream_->lowest_layer().set_option(keep_alive);

        logger_.Debug("Connected to " + endpoint.address().to_string());

        request_data_.http_request_ = make_shared<http::request<http::buffer_body>>(
                MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);

        for (const auto &name_and_value : request_->headers_) {
                request_data_.http_request_->set(name_and_value.first, name_and_value.second);
        }

        request_data_.http_request_serializer_ =
                make_shared<http::request_serializer<http::buffer_body>>(*request_data_.http_request_);

        response_data_.http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();

        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
        // if they do, they should be handled higher up in the application logic.
        //
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
        // `has_value()` in their code, causing their subsequent comparison operation to
        // misbehave. So pass highest possible value instead.
        response_data_.http_response_parser_->body_limit(numeric_limits<uint64_t>::max());

        // Both are copied into the completion handler: the cancellation flag so the handler
        // can bail out, and the request data so that it travels with the pending write.
        auto &cancelled = cancelled_;
        auto &request_data = request_data_;

        auto on_header_written =
                [this, cancelled, request_data](const error_code &write_ec, size_t num_written) {
                        if (*cancelled) {
                                return;
                        }
                        WriteHeaderHandler(write_ec, num_written);
                };

        auto &serializer = *request_data_.http_request_serializer_;

        switch (socket_mode_) {
        case SocketMode::TlsTls:
                http::async_write_header(*stream_, serializer, on_header_written);
                return;
        case SocketMode::Tls:
                http::async_write_header(stream_->next_layer(), serializer, on_header_written);
                return;
        case SocketMode::Plain:
                http::async_write_header(
                        stream_->next_layer().next_layer(), serializer, on_header_written);
                return;
        }
}
789

790
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
246✔
791
        if (num_written > 0) {
246✔
792
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
492✔
793
        }
794

795
        if (ec) {
246✔
UNCOV
796
                CallErrorHandler(ec, request_, header_handler_);
×
797
                return;
206✔
798
        }
799

800
        auto exp_has_body =
801
                HasBody(request_->GetHeader("Content-Length"), request_->GetHeader("Transfer-Encoding"));
492✔
802
        if (!exp_has_body) {
246✔
UNCOV
803
                CallErrorHandler(exp_has_body.error(), request_, header_handler_);
×
804
                return;
×
805
        }
806
        if (!exp_has_body.value()) {
246✔
807
                ReadHeader();
205✔
808
                return;
809
        }
810

811
        if (!request_->body_gen_ && !request_->async_body_gen_) {
41✔
812
                auto err = MakeError(BodyMissingError, "No body generator");
2✔
813
                CallErrorHandler(err, request_, header_handler_);
2✔
814
                return;
815
        }
816

817
        assert(!(request_->body_gen_ && request_->async_body_gen_));
818

819
        if (request_->body_gen_) {
40✔
820
                auto body_reader = request_->body_gen_();
34✔
821
                if (!body_reader) {
34✔
UNCOV
822
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
823
                        return;
824
                }
825
                request_->body_reader_ = body_reader.value();
34✔
826
        } else {
827
                auto body_reader = request_->async_body_gen_();
6✔
828
                if (!body_reader) {
6✔
UNCOV
829
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
830
                        return;
831
                }
832
                request_->async_body_reader_ = body_reader.value();
6✔
833
        }
834

835
        PrepareAndWriteNewBodyBuffer();
40✔
836
}
837

838
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,278✔
839
        if (num_written > 0) {
2,278✔
840
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
2,242✔
841
        }
842

843
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,278✔
844
                // Write next block of the body.
845
                PrepareAndWriteNewBodyBuffer();
1,120✔
846
        } else if (ec) {
1,158✔
847
                CallErrorHandler(ec, request_, header_handler_);
8✔
848
        } else if (num_written > 0) {
1,154✔
849
                // We are still writing the body.
850
                WriteBody();
1,121✔
851
        } else {
852
                // We are ready to receive the response.
853
                ReadHeader();
33✔
854
        }
855
}
2,278✔
856

857
void Client::PrepareAndWriteNewBodyBuffer() {
1,160✔
858
        // request_->body_reader_ XOR request_->async_body_reader_
859
        assert(
860
                (request_->body_reader_ || request_->async_body_reader_)
861
                && !(request_->body_reader_ && request_->async_body_reader_));
862

863
        auto cancelled = cancelled_;
864
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
3,592✔
865
                if (!*cancelled) {
1,160✔
866
                        if (!read) {
1,159✔
867
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
868
                                return;
2✔
869
                        }
870
                        WriteNewBodyBuffer(read.value());
1,157✔
871
                }
872
        };
1,160✔
873

874

875
        if (request_->body_reader_) {
1,160✔
876
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
1,470✔
877
        } else {
878
                auto err = request_->async_body_reader_->AsyncRead(
879
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
850✔
880
                if (err != error::NoError) {
425✔
UNCOV
881
                        CallErrorHandler(err, request_, header_handler_);
×
882
                }
883
        }
884
}
1,160✔
885

886
void Client::WriteNewBodyBuffer(size_t size) {
1,157✔
887
        request_data_.http_request_->body().data = body_buffer_.data();
1,157✔
888
        request_data_.http_request_->body().size = size;
1,157✔
889

890
        if (size > 0) {
1,157✔
891
                request_data_.http_request_->body().more = true;
1,124✔
892
        } else {
893
                // Release ownership of Body reader.
894
                request_->body_reader_.reset();
33✔
895
                request_->async_body_reader_.reset();
33✔
896
                request_data_.http_request_->body().more = false;
33✔
897
        }
898

899
        WriteBody();
1,157✔
900
}
1,157✔
901

902
void Client::WriteBody() {
2,278✔
903
        auto &cancelled = cancelled_;
904
        auto &request_data = request_data_;
2,278✔
905

906
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
2,278✔
907
                if (!*cancelled) {
2,278✔
908
                        WriteBodyHandler(ec, num_written);
2,278✔
909
                }
910
        };
4,556✔
911

912
        switch (socket_mode_) {
2,278✔
UNCOV
913
        case SocketMode::TlsTls:
×
914
                http::async_write_some(*stream_, *request_data_.http_request_serializer_, handler);
915
                break;
UNCOV
916
        case SocketMode::Tls:
×
917
                http::async_write_some(
918
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
919
                break;
920
        case SocketMode::Plain:
2,278✔
921
                http::async_write_some(
922
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
923
                break;
924
        }
925
}
2,278✔
926

927
void Client::ReadHeader() {
238✔
928
        auto &cancelled = cancelled_;
929
        auto &response_data = response_data_;
238✔
930

931
        auto handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
235✔
932
                if (!*cancelled) {
235✔
933
                        ReadHeaderHandler(ec, num_read);
235✔
934
                }
935
        };
476✔
936

937
        switch (socket_mode_) {
238✔
938
        case SocketMode::TlsTls:
2✔
939
                http::async_read_some(
2✔
940
                        *stream_,
941
                        *response_data_.response_buffer_,
942
                        *response_data_.http_response_parser_,
943
                        handler);
944
                break;
945
        case SocketMode::Tls:
14✔
946
                http::async_read_some(
14✔
947
                        stream_->next_layer(),
948
                        *response_data_.response_buffer_,
949
                        *response_data_.http_response_parser_,
950
                        handler);
951
                break;
952
        case SocketMode::Plain:
222✔
953
                http::async_read_some(
222✔
954
                        stream_->next_layer().next_layer(),
955
                        *response_data_.response_buffer_,
956
                        *response_data_.http_response_parser_,
957
                        handler);
958
                break;
959
        }
960
}
238✔
961

962
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
235✔
963
        if (num_read > 0) {
235✔
964
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
460✔
965
        }
966

967
        if (ec) {
235✔
968
                CallErrorHandler(ec, request_, header_handler_);
5✔
969
                return;
65✔
970
        }
971

972
        if (!response_data_.http_response_parser_->is_header_done()) {
230✔
UNCOV
973
                ReadHeader();
×
UNCOV
974
                return;
×
975
        }
976

977
        if (secondary_req_) {
230✔
978
                HandleSecondaryRequest();
9✔
979
                return;
9✔
980
        }
981

982
        response_.reset(new IncomingResponse(*this, cancelled_));
442✔
983
        response_->status_code_ = response_data_.http_response_parser_->get().result_int();
221✔
984
        response_->status_message_ = string {response_data_.http_response_parser_->get().reason()};
221✔
985

986
        logger_.Debug(
442✔
987
                "Received response: " + to_string(response_->status_code_) + " "
442✔
988
                + response_->status_message_);
663✔
989

990
        string debug_str;
991
        for (auto header = response_data_.http_response_parser_->get().cbegin();
254✔
992
                 header != response_data_.http_response_parser_->get().cend();
475✔
993
                 header++) {
994
                response_->headers_[string {header->name_string()}] = string {header->value()};
762✔
995
                if (logger_.Level() >= log::LogLevel::Debug) {
254✔
996
                        debug_str += string {header->name_string()};
239✔
997
                        debug_str += ": ";
239✔
998
                        debug_str += string {header->value()};
239✔
999
                        debug_str += "\n";
239✔
1000
                }
1001
        }
1002

1003
        logger_.Debug("Received headers:\n" + debug_str);
442✔
1004
        debug_str.clear();
1005

1006
        if (GetContentLength(*response_data_.http_response_parser_) == 0
221✔
1007
                && !response_data_.http_response_parser_->chunked()) {
221✔
1008
                auto cancelled = cancelled_;
1009
                status_ = TransactionStatus::HeaderHandlerCalled;
48✔
1010
                CallHandler(header_handler_);
96✔
1011
                if (!*cancelled) {
48✔
1012
                        status_ = TransactionStatus::Done;
42✔
1013
                        if (response_->status_code_ != StatusCode::StatusSwitchingProtocols) {
42✔
1014
                                // Make an exception for 101 Switching Protocols response, where the TCP connection
1015
                                // is meant to be reused.
1016
                                DoCancel();
38✔
1017
                        }
1018
                        CallHandler(body_handler_);
84✔
1019
                }
1020
                return;
1021
        }
1022

1023
        auto cancelled = cancelled_;
1024
        status_ = TransactionStatus::HeaderHandlerCalled;
173✔
1025
        CallHandler(header_handler_);
346✔
1026
        if (*cancelled) {
173✔
1027
                return;
1028
        }
1029

1030
        // We know that a body reader is required here, because of the check for body above.
1031
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
170✔
1032
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
36✔
1033
        }
1034
}
1035

1036
void Client::HandleSecondaryRequest() {
9✔
1037
        logger_.Debug(
18✔
1038
                "Received proxy response: "
1039
                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
18✔
1040
                + string {response_data_.http_response_parser_->get().reason()});
36✔
1041

1042
        request_ = std::move(secondary_req_);
1043

1044
        if (response_data_.http_response_parser_->get().result_int() != StatusOK) {
9✔
1045
                auto err = MakeError(
1046
                        ProxyError,
1047
                        "Proxy returned unexpected response: "
1048
                                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
4✔
1049
                                + string {response_data_.http_response_parser_->get().reason()});
6✔
1050
                CallErrorHandler(err, request_, header_handler_);
4✔
1051
                return;
1052
        }
1053

1054
        if (GetContentLength(*response_data_.http_response_parser_) != 0
7✔
1055
                || response_data_.http_response_parser_->chunked()) {
7✔
UNCOV
1056
                auto err = MakeError(ProxyError, "Body not allowed in proxy response");
×
UNCOV
1057
                CallErrorHandler(err, request_, header_handler_);
×
1058
                return;
1059
        }
1060

1061
        // We are connected. Now repeat the request cycle with the original request. Pretend
1062
        // we were just connected.
1063

1064
        assert(request_->GetProtocol() == "https");
1065

1066
        // Make sure that no data is "lost" inside the buffering mechanism, since when switching to
1067
        // a different layer, this will get out of sync.
1068
        assert(response_data_.response_buffer_->size() == 0);
1069

1070
        switch (socket_mode_) {
7✔
UNCOV
1071
        case SocketMode::TlsTls:
×
1072
                // Should never get here, because this is the only place where TlsTls mode
1073
                // is supposed to be turned on.
1074
                assert(false);
UNCOV
1075
                CallErrorHandler(
×
UNCOV
1076
                        error::MakeError(
×
1077
                                error::ProgrammingError,
UNCOV
1078
                                "Any other mode than Tls is not valid when handling secondary request"),
×
1079
                        request_,
×
UNCOV
1080
                        header_handler_);
×
UNCOV
1081
                break;
×
1082
        case SocketMode::Tls:
3✔
1083
                // Upgrade to TLS inside TLS.
1084
                socket_mode_ = SocketMode::TlsTls;
3✔
1085
                HandshakeHandler(*stream_, error_code {}, stream_->lowest_layer().remote_endpoint());
3✔
1086
                break;
3✔
1087
        case SocketMode::Plain:
4✔
1088
                // Upgrade to TLS.
1089
                socket_mode_ = SocketMode::Tls;
4✔
1090
                HandshakeHandler(
4✔
1091
                        stream_->next_layer(), error_code {}, stream_->lowest_layer().remote_endpoint());
4✔
1092
                break;
4✔
1093
        }
1094
}
1095

1096
// Reads the next part of the response body into the caller's range [`start`, `end`).
//
// If the body has already been read completely, `handler` is called with 0 right away and,
// unless that cancelled the operation, the transaction is finished and the body handler is
// called. Otherwise the caller's range and handler are remembered, the parser is pointed at
// `body_buffer_`, and an asynchronous read is started whose result is delivered to
// `ReadBodyHandler`.
void Client::AsyncReadNextBodyPart(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));

        if (status_ == TransactionStatus::ReaderCreated) {
                status_ = TransactionStatus::BodyReadingInProgress;
        }

        if (AtLeast(status_, TransactionStatus::BodyReadingFinished)) {
                auto cancelled = cancelled_;
                handler(0);
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
                        status_ = TransactionStatus::Done;
                        DoCancel();
                        CallHandler(body_handler_);
                }
                return;
        }

        // Remember where the payload has to end up and whom to notify.
        reader_buf_start_ = start;
        reader_buf_end_ = end;
        reader_handler_ = handler;

        // Never read more than fits into both our own buffer and the caller's range.
        size_t caller_capacity = end - start;
        size_t chunk_size = min(body_buffer_.size(), caller_capacity);

        auto &body = response_data_.http_response_parser_->get().body();
        body.data = body_buffer_.data();
        body.size = chunk_size;
        response_data_.last_buffer_size_ = chunk_size;

        // Copied into the completion handler below.
        auto &cancelled = cancelled_;
        auto &response_data = response_data_;

        auto on_read = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
                if (*cancelled) {
                        return;
                }
                ReadBodyHandler(ec, num_read);
        };

        auto &buffer = *response_data_.response_buffer_;
        auto &parser = *response_data_.http_response_parser_;

        switch (socket_mode_) {
        case SocketMode::TlsTls:
                http::async_read_some(*stream_, buffer, parser, on_read);
                return;
        case SocketMode::Tls:
                http::async_read_some(stream_->next_layer(), buffer, parser, on_read);
                return;
        case SocketMode::Plain:
                http::async_read_some(stream_->next_layer().next_layer(), buffer, parser, on_read);
                return;
        }
}
1158

1159
// Completion handler for reading response body data.
//
// Copies the received payload from `body_buffer_` into the range registered by
// `AsyncReadNextBodyPart` and reports its size to the stored reader handler. Errors go to
// the reader handler first and then, unless that cancelled the operation, to the body
// handler.
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
        if (num_read > 0) {
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
        }

        if (ec == http::make_error_code(http::error::need_buffer)) {
                // This can be ignored. We always reset the buffer between reads anyway.
                ec = error_code();
        }

        assert(reader_handler_);

        if (response_data_.http_response_parser_->is_done()) {
                status_ = TransactionStatus::BodyReadingFinished;
        }

        auto cancelled = cancelled_;

        if (ec) {
                auto read_err = error::Error(ec.default_error_condition(), "Could not read body");
                reader_handler_(expected::unexpected(read_err));
                if (!*cancelled) {
                        CallErrorHandler(ec, request_, body_handler_);
                }
                return;
        }

        // The num_read from above includes out of band payload data, such as chunk headers, which
        // we are not interested in. So we need to calculate the payload size from the remaining
        // buffer space.
        size_t remaining_space = response_data_.http_response_parser_->get().body().size;
        size_t payload_read = response_data_.last_buffer_size_ - remaining_space;

        size_t caller_capacity = reader_buf_end_ - reader_buf_start_;
        size_t to_deliver = min(payload_read, caller_capacity);

        if (to_deliver == 0) {
                // We read nothing, which can happen if all we read was a chunk header. We cannot
                // return 0 to the handler however, because in `io::Reader` context this means
                // EOF. So just repeat the request instead, until we get actual payload data.
                AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, reader_handler_);
                return;
        }

        copy_n(body_buffer_.begin(), to_deliver, reader_buf_start_);
        reader_handler_(to_deliver);
}
1205

1206
void Client::Cancel() {
147✔
1207
        auto cancelled = cancelled_;
1208

1209
        if (!*cancelled) {
147✔
1210
                auto err =
1211
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
42✔
1212
                switch (status_) {
21✔
1213
                case TransactionStatus::None:
3✔
1214
                        CallErrorHandler(err, request_, header_handler_);
3✔
1215
                        break;
3✔
1216
                case TransactionStatus::HeaderHandlerCalled:
17✔
1217
                case TransactionStatus::ReaderCreated:
1218
                case TransactionStatus::BodyReadingInProgress:
1219
                case TransactionStatus::BodyReadingFinished:
1220
                        CallErrorHandler(err, request_, body_handler_);
17✔
1221
                        break;
17✔
1222
                case TransactionStatus::Replying:
1223
                case TransactionStatus::SwitchingProtocol:
1224
                        // Not used by client.
1225
                        assert(false);
1226
                        break;
1227
                case TransactionStatus::BodyHandlerCalled:
1228
                case TransactionStatus::Done:
1229
                        break;
1230
                }
1231
        }
1232

1233
        if (!*cancelled) {
147✔
1234
                DoCancel();
1✔
1235
        }
1236
}
147✔
1237

1238
void Client::DoCancel() {
609✔
1239
        resolver_.cancel();
609✔
1240
        if (stream_) {
609✔
1241
                stream_->lowest_layer().cancel();
260✔
1242
                stream_->lowest_layer().close();
260✔
1243
                stream_.reset();
260✔
1244
        }
1245

1246
        // Reset logger to no connection.
1247
        logger_ = log::Logger(logger_name_);
609✔
1248

1249
        // Set cancel state and then make a new one. Those who are interested should have their own
1250
        // pointer to the old one.
1251
        *cancelled_ = true;
609✔
1252
        cancelled_ = make_shared<bool>(true);
609✔
1253
}
609✔
1254

1255
Stream::Stream(Server &server) :
441✔
1256
        server_ {server},
1257
        logger_ {"http"},
1258
        cancelled_(make_shared<bool>(true)),
441✔
1259
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
441✔
1260
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,323✔
1261
        request_data_.request_buffer_ = make_shared<beast::flat_buffer>();
882✔
1262

1263
        // This is equivalent to:
1264
        //   request_data_.request_buffer_.reserve(body_buffer_.size());
1265
        // but compatible with Boost 1.67.
1266
        request_data_.request_buffer_->prepare(
1267
                body_buffer_.size() - request_data_.request_buffer_->size());
441✔
1268

1269
        request_data_.http_request_parser_ = make_shared<http::request_parser<http::buffer_body>>();
882✔
1270

1271
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
1272
        // they do, they should be handled higher up in the application logic.
1273
        //
1274
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
1275
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
1276
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
1277
        // possible value instead.
1278
        request_data_.http_request_parser_->body_limit(numeric_limits<uint64_t>::max());
1279
}
441✔
1280

1281
// Tears down whatever is still in progress on this stream when it goes away.
Stream::~Stream() {
        DoCancel();
}
441✔
1284

1285
void Stream::Cancel() {
7✔
1286
        auto cancelled = cancelled_;
1287

1288
        if (!*cancelled) {
7✔
1289
                auto err =
1290
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
1291
                switch (status_) {
7✔
UNCOV
1292
                case TransactionStatus::None:
×
UNCOV
1293
                        CallErrorHandler(err, request_, server_.header_handler_);
×
UNCOV
1294
                        break;
×
1295
                case TransactionStatus::HeaderHandlerCalled:
5✔
1296
                case TransactionStatus::ReaderCreated:
1297
                case TransactionStatus::BodyReadingInProgress:
1298
                case TransactionStatus::BodyReadingFinished:
1299
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
1300
                        break;
5✔
1301
                case TransactionStatus::BodyHandlerCalled:
×
1302
                        // In between body handler and reply finished. No one to handle the status
1303
                        // here.
UNCOV
1304
                        server_.RemoveStream(shared_from_this());
×
UNCOV
1305
                        break;
×
1306
                case TransactionStatus::Replying:
1✔
1307
                        CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1308
                        break;
1✔
1309
                case TransactionStatus::SwitchingProtocol:
1✔
1310
                        CallErrorHandler(err, request_, switch_protocol_handler_);
3✔
1311
                        break;
1✔
1312
                case TransactionStatus::Done:
1313
                        break;
1314
                }
1315
        }
1316

1317
        if (!*cancelled) {
7✔
UNCOV
1318
                DoCancel();
×
1319
        }
1320
}
7✔
1321

1322
void Stream::DoCancel() {
801✔
1323
        if (socket_.is_open()) {
801✔
1324
                socket_.cancel();
217✔
1325
                socket_.close();
217✔
1326
        }
1327

1328
        // Set cancel state and then make a new one. Those who are interested should have their own
1329
        // pointer to the old one.
1330
        *cancelled_ = true;
801✔
1331
        cancelled_ = make_shared<bool>(true);
801✔
1332
}
801✔
1333

UNCOV
1334
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
UNCOV
1335
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
UNCOV
1336
}
×
1337

UNCOV
1338
void Stream::CallErrorHandler(
×
1339
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
UNCOV
1340
        status_ = TransactionStatus::Done;
×
UNCOV
1341
        DoCancel();
×
1342
        handler(expected::unexpected(err.WithContext(
×
1343
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
1344

UNCOV
1345
        server_.RemoveStream(shared_from_this());
×
1346
}
×
1347

1348
void Stream::CallErrorHandler(
2✔
1349
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1350
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
6✔
1351
}
2✔
1352

1353
void Stream::CallErrorHandler(
8✔
1354
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1355
        status_ = TransactionStatus::Done;
8✔
1356
        DoCancel();
8✔
1357
        handler(
8✔
1358
                req,
1359
                err.WithContext(
8✔
1360
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
24✔
1361

1362
        server_.RemoveStream(shared_from_this());
8✔
1363
}
8✔
1364

1365
void Stream::CallErrorHandler(
4✔
1366
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
1367
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
1368
}
4✔
1369

1370
void Stream::CallErrorHandler(
7✔
1371
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
1372
        status_ = TransactionStatus::Done;
7✔
1373
        DoCancel();
7✔
1374
        handler(err.WithContext(
14✔
1375
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
14✔
1376

1377
        server_.RemoveStream(shared_from_this());
7✔
1378
}
7✔
1379

UNCOV
1380
void Stream::CallErrorHandler(
×
1381
        const error_code &ec, const RequestPtr &req, SwitchProtocolHandler handler) {
UNCOV
1382
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
UNCOV
1383
}
×
1384

1385
void Stream::CallErrorHandler(
1✔
1386
        const error::Error &err, const RequestPtr &req, SwitchProtocolHandler handler) {
1387
        status_ = TransactionStatus::Done;
1✔
1388
        DoCancel();
1✔
1389
        handler(expected::unexpected(err.WithContext(
2✔
1390
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
4✔
1391

1392
        server_.RemoveStream(shared_from_this());
1✔
1393
}
1✔
1394

1395
void Stream::AcceptHandler(const error_code &ec) {
225✔
1396
        if (ec) {
225✔
UNCOV
1397
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
UNCOV
1398
                return;
×
1399
        }
1400

1401
        auto ip = socket_.remote_endpoint().address().to_string();
450✔
1402

1403
        // Use IP as context for logging.
1404
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
225✔
1405

1406
        logger_.Debug("Accepted connection.");
450✔
1407

1408
        request_.reset(new IncomingRequest(*this, cancelled_));
450✔
1409

1410
        request_->address_.host = ip;
225✔
1411

1412
        *cancelled_ = false;
225✔
1413

1414
        ReadHeader();
225✔
1415
}
1416

1417
void Stream::ReadHeader() {
225✔
1418
        auto &cancelled = cancelled_;
1419
        auto &request_data = request_data_;
225✔
1420

1421
        http::async_read_some(
450✔
1422
                socket_,
225✔
1423
                *request_data_.request_buffer_,
1424
                *request_data_.http_request_parser_,
1425
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
225✔
1426
                        if (!*cancelled) {
225✔
1427
                                ReadHeaderHandler(ec, num_read);
225✔
1428
                        }
1429
                });
225✔
1430
}
225✔
1431

1432
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
225✔
1433
        if (num_read > 0) {
225✔
1434
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
450✔
1435
        }
1436

1437
        if (ec) {
225✔
UNCOV
1438
                CallErrorHandler(ec, request_, server_.header_handler_);
×
1439
                return;
185✔
1440
        }
1441

1442
        if (!request_data_.http_request_parser_->is_header_done()) {
225✔
UNCOV
1443
                ReadHeader();
×
UNCOV
1444
                return;
×
1445
        }
1446

1447
        auto method_result = BeastVerbToMethod(
1448
                request_data_.http_request_parser_->get().base().method(),
1449
                string {request_data_.http_request_parser_->get().base().method_string()});
450✔
1450
        if (!method_result) {
225✔
1451
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
1452
                return;
×
1453
        }
1454
        request_->method_ = method_result.value();
225✔
1455
        request_->address_.path = string(request_data_.http_request_parser_->get().base().target());
225✔
1456

1457
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
225✔
1458

1459
        string debug_str;
1460
        for (auto header = request_data_.http_request_parser_->get().cbegin();
391✔
1461
                 header != request_data_.http_request_parser_->get().cend();
616✔
1462
                 header++) {
1463
                request_->headers_[string {header->name_string()}] = string {header->value()};
1,173✔
1464
                if (logger_.Level() >= log::LogLevel::Debug) {
391✔
1465
                        debug_str += string {header->name_string()};
326✔
1466
                        debug_str += ": ";
326✔
1467
                        debug_str += string {header->value()};
326✔
1468
                        debug_str += "\n";
326✔
1469
                }
1470
        }
1471

1472
        logger_.Debug("Received headers:\n" + debug_str);
450✔
1473
        debug_str.clear();
1474

1475
        if (GetContentLength(*request_data_.http_request_parser_) == 0
225✔
1476
                && !request_data_.http_request_parser_->chunked()) {
225✔
1477
                auto cancelled = cancelled_;
1478
                status_ = TransactionStatus::HeaderHandlerCalled;
184✔
1479
                server_.header_handler_(request_);
368✔
1480
                if (!*cancelled) {
184✔
1481
                        status_ = TransactionStatus::BodyHandlerCalled;
184✔
1482
                        CallBodyHandler();
184✔
1483
                }
1484
                return;
1485
        }
1486

1487
        assert(!request_data_.http_request_parser_->is_done());
1488

1489
        auto cancelled = cancelled_;
1490
        status_ = TransactionStatus::HeaderHandlerCalled;
41✔
1491
        server_.header_handler_(request_);
82✔
1492
        if (*cancelled) {
41✔
1493
                return;
1494
        }
1495

1496
        // We know that a body reader is required here, because of the check for body above.
1497
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
40✔
1498
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
2✔
1499
        }
1500
}
1501

1502
// Requests the next part of the request body to be read into [`start`, `end`). `handler` is
// called with the number of bytes delivered, with 0 signalling that there is no more body.
//
// Once body reading has finished, `handler` is called with 0 immediately, and the body
// handler is called afterwards if it has not been called yet.
void Stream::AsyncReadNextBodyPart(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));

        // The first read request starts the body reading phase.
        if (status_ == TransactionStatus::ReaderCreated) {
                status_ = TransactionStatus::BodyReadingInProgress;
        }

        if (status_ != TransactionStatus::BodyReadingInProgress) {
                // Nothing (more) to read. The handler may cancel the stream, hence the flag copy.
                auto cancelled = cancelled_;
                handler(0);
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
                        status_ = TransactionStatus::BodyHandlerCalled;
                        CallBodyHandler();
                }
                return;
        }

        // Remember the destination so that `ReadBodyHandler` can deliver the data.
        reader_buf_start_ = start;
        reader_buf_end_ = end;
        reader_handler_ = handler;

        const size_t requested = end - start;
        const size_t chunk_size = min(body_buffer_.size(), requested);

        // Point the parser at our intermediate buffer, limited to what the caller can take.
        auto &body = request_data_.http_request_parser_->get().body();
        body.data = body_buffer_.data();
        body.size = chunk_size;
        request_data_.last_buffer_size_ = chunk_size;

        // Both are captured by copy; see `ReadHeader` for the same pattern.
        auto &cancel_flag = cancelled_;
        auto &keep_alive = request_data_;

        auto on_read = [this, cancel_flag, keep_alive](const error_code &ec, size_t num_read) {
                if (*cancel_flag) {
                        return;
                }
                ReadBodyHandler(ec, num_read);
        };

        http::async_read_some(
                socket_,
                *request_data_.request_buffer_,
                *request_data_.http_request_parser_,
                std::move(on_read));
}
1543

1544
// Completion handler for the read started in `AsyncReadNextBodyPart`. Copies the payload
// that was read into the reader's buffer and reports the size to the reader's handler, or
// reports the error to both the reader and the server's body handler.
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
        if (num_read > 0) {
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
        }

        // `need_buffer` can be ignored. We always reset the buffer between reads anyway.
        if (ec == http::make_error_code(http::error::need_buffer)) {
                ec.clear();
        }

        assert(reader_handler_);

        if (request_data_.http_request_parser_->is_done()) {
                status_ = TransactionStatus::BodyReadingFinished;
        }

        // The reader's handler may cancel the stream, hence the flag copy.
        auto cancelled = cancelled_;

        if (ec) {
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
                reader_handler_(expected::unexpected(err));
                if (!*cancelled) {
                        CallErrorHandler(ec, request_, server_.body_handler_);
                }
                return;
        }

        // `num_read` includes out of band data, such as chunk headers, which we are not
        // interested in. The payload size is therefore derived from how much of the buffer
        // handed to the parser is left over.
        const size_t space_left = request_data_.http_request_parser_->get().body().size;
        const size_t payload_read = request_data_.last_buffer_size_ - space_left;

        const size_t capacity = reader_buf_end_ - reader_buf_start_;
        const size_t to_copy = min(payload_read, capacity);

        if (to_copy > 0) {
                copy_n(body_buffer_.begin(), to_copy, reader_buf_start_);
                reader_handler_(to_copy);
                return;
        }

        // No payload at all, which can happen if all we read was a chunk header. Returning 0 to
        // the handler is not an option, because in `io::Reader` context that means EOF. Repeat
        // the request instead, until actual payload data arrives.
        AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, reader_handler_);
}
1590

1591
// Starts sending the response: prepares the Beast response and writes its header
// asynchronously. `reply_finished_handler` is stored and used to report the outcome of the
// whole reply.
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
        SetupResponse();

        reply_finished_handler_ = reply_finished_handler;

        // Both are captured by copy. The flag copy lets the completion handler detect
        // cancellation; the response data copy is not used by the handler itself (presumably
        // it keeps the response and serializer alive for the duration of the operation).
        auto &cancel_flag = cancelled_;
        auto &keep_alive = response_data_;

        auto on_written = [this, cancel_flag, keep_alive](const error_code &ec, size_t num_written) {
                if (*cancel_flag) {
                        return;
                }
                WriteHeaderHandler(ec, num_written);
        };

        http::async_write_header(
                socket_, *response_data_.http_response_serializer_, std::move(on_written));
}
202✔
1608

1609
void Stream::SetupResponse() {
211✔
1610
        auto response = maybe_response_.lock();
211✔
1611
        // Only called from existing responses, so this should always be true.
1612
        assert(response);
1613

1614
        assert(status_ == TransactionStatus::BodyHandlerCalled);
1615
        status_ = TransactionStatus::Replying;
211✔
1616

1617
        // From here on we take shared ownership.
1618
        response_ = response;
1619

1620
        response_data_.http_response_ = make_shared<http::response<http::buffer_body>>();
422✔
1621

1622
        for (const auto &header : response->headers_) {
438✔
1623
                response_data_.http_response_->base().set(header.first, header.second);
227✔
1624
        }
1625

1626
        response_data_.http_response_->result(response->GetStatusCode());
211✔
1627
        response_data_.http_response_->reason(response->GetStatusMessage());
422✔
1628

1629
        response_data_.http_response_serializer_ =
1630
                make_shared<http::response_serializer<http::buffer_body>>(*response_data_.http_response_);
422✔
1631
}
211✔
1632

1633
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
201✔
1634
        if (num_written > 0) {
201✔
1635
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
402✔
1636
        }
1637

1638
        if (ec) {
201✔
UNCOV
1639
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1640
                return;
37✔
1641
        }
1642

1643
        auto exp_has_body =
1644
                HasBody(response_->GetHeader("Content-Length"), response_->GetHeader("Transfer-Encoding"));
402✔
1645
        if (!exp_has_body) {
201✔
UNCOV
1646
                CallErrorHandler(exp_has_body.error(), request_, reply_finished_handler_);
×
1647
                return;
×
1648
        }
1649
        if (!exp_has_body.value()) {
201✔
1650
                FinishReply();
36✔
1651
                return;
1652
        }
1653

1654
        if (!response_->body_reader_ && !response_->async_body_reader_) {
165✔
1655
                auto err = MakeError(BodyMissingError, "No body reader");
2✔
1656
                CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1657
                return;
1658
        }
1659

1660
        PrepareAndWriteNewBodyBuffer();
164✔
1661
}
1662

1663
void Stream::PrepareAndWriteNewBodyBuffer() {
2,104✔
1664
        // response_->body_reader_ XOR response_->async_body_reader_
1665
        assert(
1666
                (response_->body_reader_ || response_->async_body_reader_)
1667
                && !(response_->body_reader_ && response_->async_body_reader_));
1668

1669
        auto read_handler = [this](io::ExpectedSize read) {
2,105✔
1670
                if (!read) {
2,104✔
1671
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
3✔
1672
                        return;
1✔
1673
                }
1674
                WriteNewBodyBuffer(read.value());
2,103✔
1675
        };
2,104✔
1676

1677
        if (response_->body_reader_) {
2,104✔
1678
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
3,660✔
1679
        } else {
1680
                auto err = response_->async_body_reader_->AsyncRead(
1681
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
274✔
1682
                if (err != error::NoError) {
274✔
UNCOV
1683
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1684
                }
1685
        }
1686
}
2,104✔
1687

1688
void Stream::WriteNewBodyBuffer(size_t size) {
2,103✔
1689
        response_data_.http_response_->body().data = body_buffer_.data();
2,103✔
1690
        response_data_.http_response_->body().size = size;
2,103✔
1691

1692
        if (size > 0) {
2,103✔
1693
                response_data_.http_response_->body().more = true;
1,974✔
1694
        } else {
1695
                response_data_.http_response_->body().more = false;
129✔
1696
        }
1697

1698
        WriteBody();
2,103✔
1699
}
2,103✔
1700

1701
void Stream::WriteBody() {
4,057✔
1702
        auto &cancelled = cancelled_;
1703
        auto &response_data = response_data_;
4,057✔
1704

1705
        http::async_write_some(
8,114✔
1706
                socket_,
4,057✔
1707
                *response_data_.http_response_serializer_,
1708
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
4,018✔
1709
                        if (!*cancelled) {
4,018✔
1710
                                WriteBodyHandler(ec, num_written);
4,018✔
1711
                        }
1712
                });
4,018✔
1713
}
4,057✔
1714

1715
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
4,018✔
1716
        if (num_written > 0) {
4,018✔
1717
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
3,908✔
1718
        }
1719

1720
        if (ec == http::make_error_code(http::error::need_buffer)) {
4,018✔
1721
                // Write next body block.
1722
                PrepareAndWriteNewBodyBuffer();
1,940✔
1723
        } else if (ec) {
2,078✔
1724
                CallErrorHandler(ec, request_, reply_finished_handler_);
12✔
1725
        } else if (num_written > 0) {
2,074✔
1726
                // We are still writing the body.
1727
                WriteBody();
1,954✔
1728
        } else {
1729
                // We are finished.
1730
                FinishReply();
120✔
1731
        }
1732
}
4,018✔
1733

1734
void Stream::FinishReply() {
156✔
1735
        // We are done.
1736
        status_ = TransactionStatus::Done;
156✔
1737
        DoCancel();
156✔
1738
        // Release ownership of Body reader.
1739
        response_->body_reader_.reset();
156✔
1740
        response_->async_body_reader_.reset();
156✔
1741
        reply_finished_handler_(error::NoError);
156✔
1742
        server_.RemoveStream(shared_from_this());
156✔
1743
}
156✔
1744

1745
// Starts a protocol switch: prepares the response, writes its header asynchronously and
// hands over to `SwitchingProtocolHandler` on completion. `handler` is stored and receives
// the final result. Always returns `error::NoError`.
error::Error Stream::AsyncSwitchProtocol(SwitchProtocolHandler handler) {
        SetupResponse();

        switch_protocol_handler_ = handler;
        // `SetupResponse` set the status to `Replying`; override it for the protocol switch.
        status_ = TransactionStatus::SwitchingProtocol;

        // Both are captured by copy. The flag copy lets the completion handler detect
        // cancellation; the response data copy is not used by the handler itself (presumably
        // it keeps the response and serializer alive for the duration of the operation).
        auto &cancel_flag = cancelled_;
        auto &keep_alive = response_data_;

        auto on_written = [this, cancel_flag, keep_alive](const error_code &ec, size_t num_written) {
                if (*cancel_flag) {
                        return;
                }
                SwitchingProtocolHandler(ec, num_written);
        };

        http::async_write_header(
                socket_, *response_data_.http_response_serializer_, std::move(on_written));

        return error::NoError;
}
1765

1766
// Completion handler for the header write started in `AsyncSwitchProtocol`. On success the
// TCP socket is moved out of the stream, wrapped together with the request buffer in a
// `RawSocket`, and given to the switch-protocol handler; the stream itself is removed from
// the server.
void Stream::SwitchingProtocolHandler(error_code ec, size_t num_written) {
        if (num_written > 0) {
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
        }

        if (ec) {
                CallErrorHandler(ec, request_, switch_protocol_handler_);
                return;
        }

        auto detached_tcp = make_shared<tcp::socket>(std::move(socket_));
        auto socket = make_shared<RawSocket<tcp::socket>>(
                std::move(detached_tcp), request_data_.request_buffer_);

        // Take a copy of the handler: it is called after the stream has been removed from the
        // server below.
        auto switch_protocol_handler = switch_protocol_handler_;

        // Rest of the connection is done directly on the socket, set cancelled_ but don't close it.
        *cancelled_ = true;
        cancelled_ = make_shared<bool>(true);
        server_.RemoveStream(shared_from_this());

        switch_protocol_handler(socket);
}
1788

1789
void Stream::CallBodyHandler() {
217✔
1790
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1791
        // it immediately destroys, which would destroy this stream as well. At the end of this
1792
        // function, it's ok to destroy it.
1793
        auto stream_ref = shared_from_this();
1794

1795
        server_.body_handler_(request_, error::NoError);
651✔
1796

1797
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1798
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1799
        // request has not been handled correctly.
1800
        auto response = maybe_response_.lock();
217✔
1801
        if (!response) {
217✔
1802
                logger_.Error("Handler produced no response. Closing stream prematurely.");
6✔
1803
                *cancelled_ = true;
3✔
1804
                cancelled_ = make_shared<bool>(true);
3✔
1805
                server_.RemoveStream(shared_from_this());
9✔
1806
        }
1807
}
217✔
1808

1809
// Creates a server whose acceptor runs on the Asio context of `event_loop`. Nothing is
// listened on until `AsyncServeUrl` is called.
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
        event_loop_(event_loop),
        acceptor_(GetAsioIoContext(event_loop_)) {
        // The configuration is not read by this constructor.
        (void) server;
}
237✔
1813

1814
// Stops accepting connections and drops all streams.
Server::~Server() {
        this->Cancel();
}
237✔
1817

1818
// Variant of `AsyncServeUrl` taking a plain `RequestHandler` as body handler. The handler is
// adapted to the `IdentifiedRequestHandler` form: it receives the request on success and the
// error (without the request) on failure.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
        IdentifiedRequestHandler adapter = [body_handler](IncomingRequestPtr req, error::Error err) {
                if (err != error::NoError) {
                        body_handler(expected::unexpected(err));
                        return;
                }
                body_handler(req);
        };

        return AsyncServeUrl(url, header_handler, adapter);
}
1829

1830
// Starts listening on `url` and accepting connections.
//
// `url` must use the "http" protocol, name a literal IP address as host, and have no path
// other than "/". `header_handler` is called when the headers of a request have been
// received, `body_handler` when the request is complete or has failed.
//
// Returns `error::NoError` on success. On failure an error is returned and the handlers are
// not installed. If a step after opening the acceptor fails, the acceptor is closed again so
// that it is not left half-configured and a later call can start from scratch.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
        auto err = BreakDownUrl(url, address_);
        if (error::NoError != err) {
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
        }

        if (address_.protocol != "http") {
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
        }

        if (address_.path.size() > 0 && address_.path != "/") {
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
        }

        boost::system::error_code ec;
        auto address = asio::ip::make_address(address_.host, ec);
        if (ec) {
                return error::Error(
                        ec.default_error_condition(),
                        "Could not construct endpoint from address " + address_.host);
        }

        asio::ip::tcp::endpoint endpoint(address, address_.port);

        ec.clear();
        acceptor_.open(endpoint.protocol(), ec);
        if (ec) {
                // Not closed here: a failing `open()` may mean the acceptor is already serving.
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
        }

        // From here on the acceptor was opened by this call, so failures must close it again.
        auto fail_and_close =
                [this](const boost::system::error_code &cause, const string &message) -> error::Error {
                boost::system::error_code ignored;
                acceptor_.close(ignored);
                return error::Error(cause.default_error_condition(), message);
        };

        // Allow address reuse, otherwise we can't re-bind later.
        ec.clear();
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
        if (ec) {
                return fail_and_close(ec, "Could not set socket options");
        }

        ec.clear();
        acceptor_.bind(endpoint, ec);
        if (ec) {
                return fail_and_close(ec, "Could not bind socket");
        }

        ec.clear();
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
        if (ec) {
                return fail_and_close(ec, "Could not start listening");
        }

        header_handler_ = header_handler;
        body_handler_ = body_handler;

        PrepareNewStream();

        return error::NoError;
}
1887

1888
void Server::Cancel() {
257✔
1889
        if (acceptor_.is_open()) {
257✔
1890
                acceptor_.cancel();
216✔
1891
                acceptor_.close();
216✔
1892
        }
1893
        streams_.clear();
1894
}
257✔
1895

1896
uint16_t Server::GetPort() const {
17✔
1897
        return acceptor_.local_endpoint().port();
17✔
1898
}
1899

1900
// Returns an "http://127.0.0.1:<port>" URL using the port from `GetPort()`. The host part is
// fixed and does not reflect the address passed to `AsyncServeUrl`.
string Server::GetUrl() const {
        string url = "http://127.0.0.1:";
        url += to_string(GetPort());
        return url;
}
1903

1904
// Creates the response object for `req` and registers it (weakly) with the request's stream.
// Fails with `StreamCancelledError` if the request has been cancelled.
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
        if (*req->cancelled_) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
        }

        auto &stream = req->stream_;
        OutgoingResponsePtr response {new OutgoingResponse(stream, req->cancelled_)};
        stream.maybe_response_ = response;
        return response;
}
1912

1913
// Starts sending `resp` on its stream. `reply_finished_handler` is called when the reply has
// been sent or has failed. Fails with `StreamCancelledError` if the response has been
// cancelled.
error::Error Server::AsyncReply(
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
        const bool cancelled = *resp->cancelled_;
        if (cancelled) {
                return MakeError(StreamCancelledError, "Cannot send response");
        }

        auto &stream = resp->stream_;
        stream.AsyncReply(reply_finished_handler);
        return error::NoError;
}
1922

1923
// Creates an asynchronous reader for the body of `req`.
//
// Fails if the request has been cancelled (`StreamCancelledError`), if the stream is not in
// the `HeaderHandlerCalled` state (`operation_in_progress`), or if the request has no body
// (`BodyMissingError`).
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
        if (*req->cancelled_) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
        }

        auto &stream = req->stream_;
        if (stream.status_ != TransactionStatus::HeaderHandlerCalled) {
                return expected::unexpected(error::Error(
                        make_error_condition(errc::operation_in_progress),
                        "MakeBodyAsyncReader called while reading is in progress"));
        }

        // A body exists if a non-zero length is announced or chunked encoding is used.
        auto &parser = *stream.request_data_.http_request_parser_;
        const bool has_body = GetContentLength(parser) != 0 || parser.chunked();
        if (!has_body) {
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
        }

        stream.status_ = TransactionStatus::ReaderCreated;
        return make_shared<BodyAsyncReader<Stream>>(stream, req->cancelled_);
}
1943

1944
// Starts a protocol switch on the stream of `resp`. `handler` receives the raw socket once
// the response header has been sent, or the error if that failed.
//
// Fails with `StreamCancelledError` if the response has been cancelled. As in `AsyncReply`,
// the stream of a cancelled response must not be used anymore.
error::Error Server::AsyncSwitchProtocol(OutgoingResponsePtr resp, SwitchProtocolHandler handler) {
        if (*resp->cancelled_) {
                return MakeError(StreamCancelledError, "Cannot switch protocol");
        }

        return resp->stream_.AsyncSwitchProtocol(handler);
}
1947

1948
void Server::PrepareNewStream() {
441✔
1949
        StreamPtr new_stream {new Stream(*this)};
441✔
1950
        streams_.insert(new_stream);
1951
        AsyncAccept(new_stream);
882✔
1952
}
441✔
1953

1954
// Waits asynchronously for a connection on `stream`'s socket. Once one is accepted, the
// stream starts handling it and a new stream is prepared for the next connection. Accept
// errors are logged, except for cancellation, which is silent.
void Server::AsyncAccept(StreamPtr stream) {
        auto on_accept = [this, stream](const error_code &ec) {
                if (!ec) {
                        stream->AcceptHandler(ec);

                        this->PrepareNewStream();
                        return;
                }

                if (ec != errc::operation_canceled) {
                        log::Error("Could not accept connection: " + ec.message());
                }
        };

        acceptor_.async_accept(stream->socket_, std::move(on_accept));
}
441✔
1968

1969
// Unregisters `stream` from the server and cancels it. `stream` is taken by value, so the
// stream object stays alive until this function returns.
void Server::RemoveStream(StreamPtr stream) {
        streams_.erase(stream);

        auto &removed = *stream;
        removed.DoCancel();
}
188✔
1974

1975
} // namespace http
1976
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc