• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1932482447

17 Jul 2025 02:00PM UTC coverage: 75.926% (+0.02%) from 75.909%
1932482447

push

gitlab-ci

web-flow
Merge pull request #1807 from danielskinstad/liboost-docker

chore: bump libboost_log to 1.83.0

7377 of 9716 relevant lines covered (75.93%)

13946.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.17
/src/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio.hpp>
20
#include <boost/asio/ip/tcp.hpp>
21
#include <boost/asio/ssl/host_name_verification.hpp>
22
#include <boost/asio/ssl/verify_mode.hpp>
23

24
#include <common/common.hpp>
25
#include <common/crypto.hpp>
26

27
#include <mender-version.h>
28

29
namespace mender {
30
namespace common {
31
namespace http {
32

33
namespace common = mender::common;
34
namespace crypto = mender::common::crypto;
35

36
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
37
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
38
const unsigned int BeastHttpVersion = 11;
39

40
namespace asio = boost::asio;
41
namespace http = boost::beast::http;
42

43
// Initial size handed to the client's `body_buffer_` member when a `Client` is constructed.
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
44

45
// Translates our own `Method` enumerator into the matching Boost.Beast verb.
//
// `Method::Invalid` has no Beast counterpart: it falls out of the switch, trips the assert in
// debug builds, and yields `http::verb::get` when asserts are compiled out.
static http::verb MethodToBeastVerb(Method method) {
        // Intentionally no "default" label, so that the compiler can warn about unhandled
        // enumerators if a new method is ever added.
        switch (method) {
        case Method::GET: return http::verb::get;
        case Method::HEAD: return http::verb::head;
        case Method::POST: return http::verb::post;
        case Method::PUT: return http::verb::put;
        case Method::PATCH: return http::verb::patch;
        case Method::CONNECT: return http::verb::connect;
        case Method::Invalid: break; // No-op, handled below.
        }

        // Safety net for `Invalid` and for values outside the enumeration.
        assert(false);
        return http::verb::get;
}
68

69
// Translates a Boost.Beast verb into our own `Method` enumerator.
//
// Verbs we do not handle produce an `UnsupportedMethodError` whose message is `verb_string`,
// the textual form of the verb as supplied by the caller.
static expected::expected<Method, error::Error> BeastVerbToMethod(
        http::verb verb, const string &verb_string) {
        switch (verb) {
        case http::verb::get: return Method::GET;
        case http::verb::head: return Method::HEAD;
        case http::verb::post: return Method::POST;
        case http::verb::put: return Method::PUT;
        case http::verb::patch: return Method::PATCH;
        case http::verb::connect: return Method::CONNECT;
        default: return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
        }
}
88

89
template <typename StreamType>
90
class BodyAsyncReader : virtual public io::AsyncReader {
91
public:
92
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
155✔
93
                stream_ {stream},
94
                cancelled_ {cancelled} {
310✔
95
        }
155✔
96
        ~BodyAsyncReader() {
39✔
97
                Cancel();
39✔
98
        }
78✔
99

100
        error::Error AsyncRead(
2,187✔
101
                vector<uint8_t>::iterator start,
102
                vector<uint8_t>::iterator end,
103
                io::AsyncIoHandler handler) override {
104
                if (eof_) {
2,187✔
105
                        handler(0);
×
106
                        return error::NoError;
×
107
                }
108

109
                if (*cancelled_) {
2,187✔
110
                        return error::MakeError(
×
111
                                error::ProgrammingError,
112
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
113
                }
114
                stream_.AsyncReadNextBodyPart(start, end, [this, handler](io::ExpectedSize size) {
13,723✔
115
                        if (size && size.value() == 0) {
6,616✔
116
                                eof_ = true;
127✔
117
                        }
118
                        handler(size);
13,232✔
119
                });
120
                return error::NoError;
2,187✔
121
        }
122

123
        void Cancel() override {
39✔
124
                if (!*cancelled_) {
39✔
125
                        stream_.Cancel();
4✔
126
                }
127
        }
39✔
128

129
private:
130
        StreamType &stream_;
131
        shared_ptr<bool> cancelled_;
132
        bool eof_ {false};
133

134
        friend class Client;
135
        friend class Server;
136
};
137

138
template <typename StreamType>
139
class RawSocket : virtual public io::AsyncReadWriter {
140
public:
141
        RawSocket(shared_ptr<StreamType> stream, shared_ptr<beast::flat_buffer> buffered) :
×
142
                destroying_ {make_shared<bool>(false)},
×
143
                stream_ {stream},
144
                buffered_ {buffered} {
×
145
                // If there are no buffered bytes, then we don't need it.
146
                if (buffered_ && buffered_->size() == 0) {
×
147
                        buffered_.reset();
×
148
                }
149
        }
×
150

151
        ~RawSocket() {
15✔
152
                *destroying_ = true;
15✔
153
                Cancel();
15✔
154
        }
30✔
155

156
        error::Error AsyncRead(
320✔
157
                vector<uint8_t>::iterator start,
158
                vector<uint8_t>::iterator end,
159
                io::AsyncIoHandler handler) override {
160
                // If we have prebuffered bytes, which can happen if the HTTP parser read the
161
                // header and parts of the body in one block, return those first.
162
                if (buffered_) {
320✔
163
                        return DrainPrebufferedData(start, end, handler);
8✔
164
                }
165

166
                read_buffer_ = asio::buffer(&*start, end - start);
316✔
167
                auto &destroying = destroying_;
168
                stream_->async_read_some(
632✔
169
                        read_buffer_,
316✔
170
                        [destroying, handler](const boost::system::error_code &ec, size_t num_read) {
313✔
171
                                if (*destroying) {
313✔
172
                                        return;
173
                                }
174

175
                                if (ec == asio::error::operation_aborted) {
313✔
176
                                        handler(expected::unexpected(error::Error(
12✔
177
                                                make_error_condition(errc::operation_canceled),
6✔
178
                                                "Could not read from socket")));
179
                                } else if (ec) {
310✔
180
                                        handler(expected::unexpected(
12✔
181
                                                error::Error(ec.default_error_condition(), "Could not read from socket")));
12✔
182
                                } else {
183
                                        handler(num_read);
608✔
184
                                }
185
                        });
186
                return error::NoError;
316✔
187
        }
188

189
        error::Error AsyncWrite(
309✔
190
                vector<uint8_t>::const_iterator start,
191
                vector<uint8_t>::const_iterator end,
192
                io::AsyncIoHandler handler) override {
193
                write_buffer_ = asio::buffer(&*start, end - start);
309✔
194
                auto &destroying = destroying_;
195
                stream_->async_write_some(
618✔
196
                        write_buffer_,
309✔
197
                        [destroying, handler](const boost::system::error_code &ec, size_t num_written) {
306✔
198
                                if (*destroying) {
306✔
199
                                        return;
200
                                }
201

202
                                if (ec == asio::error::operation_aborted) {
306✔
203
                                        handler(expected::unexpected(error::Error(
×
204
                                                make_error_condition(errc::operation_canceled),
×
205
                                                "Could not write to socket")));
206
                                } else if (ec) {
306✔
207
                                        handler(expected::unexpected(
×
208
                                                error::Error(ec.default_error_condition(), "Could not write to socket")));
×
209
                                } else {
210
                                        handler(num_written);
612✔
211
                                }
212
                        });
213
                return error::NoError;
309✔
214
        }
215

216
        void Cancel() override {
28✔
217
                if (stream_->lowest_layer().is_open()) {
28✔
218
                        stream_->lowest_layer().cancel();
15✔
219
                        stream_->lowest_layer().close();
15✔
220
                }
221
        }
28✔
222

223
private:
224
        error::Error DrainPrebufferedData(
4✔
225
                vector<uint8_t>::iterator start,
226
                vector<uint8_t>::iterator end,
227
                io::AsyncIoHandler handler) {
228
                size_t to_copy = min(static_cast<size_t>(end - start), buffered_->size());
4✔
229

230
                // These two lines are equivalent to:
231
                //   copy_n(static_cast<const uint8_t *>(buffered_->cdata().data()), to_copy, start);
232
                // but compatible with Boost 1.67.
233
                const beast::flat_buffer &cbuffered = *buffered_;
234
                copy_n(static_cast<const uint8_t *>(cbuffered.data().data()), to_copy, start);
4✔
235
                buffered_->consume(to_copy);
4✔
236
                if (buffered_->size() == 0) {
4✔
237
                        // We don't need it anymore.
238
                        buffered_.reset();
4✔
239
                }
240
                handler(to_copy);
4✔
241
                return error::NoError;
4✔
242
        }
243

244
        shared_ptr<bool> destroying_;
245
        shared_ptr<StreamType> stream_;
246
        shared_ptr<beast::flat_buffer> buffered_;
247
        asio::mutable_buffer read_buffer_;
248
        asio::const_buffer write_buffer_;
249
};
250

251
// Returns the content length announced by `parser`, or 0 if the parser does not report one.
//
// `PARSER` needs a `content_length()` member returning an optional-like object (testable for
// presence, with a `value()` accessor).
template <typename PARSER>
int64_t GetContentLength(const PARSER &parser) {
        auto announced = parser.content_length();
        if (!announced) {
                return 0;
        }
        return announced.value();
}
260

261
// Decides, from the `Content-Length` and `Transfer-Encoding` header values, whether an HTTP
// message carries a body.
//
// Both arguments are "expected" values; one that holds an error is treated as "header not
// present".
//
// Returns:
//  - true if `Transfer-Encoding` is present and equal to "chunked",
//  - otherwise `Content-Length > 0` if `Content-Length` is present,
//  - otherwise false.
//
// Errors:
//  - `errc::not_supported` for any `Transfer-Encoding` other than "chunked",
//  - the parse error's code if `Content-Length` is not a number,
//  - `errc::invalid_argument` if `Content-Length` is a negative number.
expected::ExpectedBool HasBody(
        const expected::ExpectedString &content_length,
        const expected::ExpectedString &transfer_encoding) {
        if (transfer_encoding) {
                if (transfer_encoding.value() != "chunked") {
                        return expected::unexpected(error::Error(
                                make_error_condition(errc::not_supported),
                                "Unsupported Transfer-Encoding: " + transfer_encoding.value()));
                }
                return true;
        }

        if (content_length) {
                auto length = common::StringToLongLong(content_length.value());
                if (!length) {
                        return expected::unexpected(error::Error(
                                length.error().code,
                                "Content-Length contains invalid number: " + content_length.value()));
                }
                if (length.value() < 0) {
                        // Parsing succeeded, so `length` holds no error to borrow a code from;
                        // reading `length.error()` here would be invalid.
                        return expected::unexpected(error::Error(
                                make_error_condition(errc::invalid_argument),
                                "Content-Length contains invalid number: " + content_length.value()));
                }
                return length.value() > 0;
        }

        return false;
}
285

286
// Constructs an HTTP client bound to `event_loop`.
//
// Copies the configuration and its proxy settings (`http_proxy`, `https_proxy`, `no_proxy`)
// out of `client`, and remembers `logger_name` for the logger created per request. Nothing
// else is set up here; the body is empty.
Client::Client(
354✔
287
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
288
        event_loop_ {event_loop},
289
        logger_name_ {logger_name},
290
        client_config_ {client},
291
        http_proxy_ {client.http_proxy},
354✔
292
        https_proxy_ {client.https_proxy},
354✔
293
        no_proxy_ {client.no_proxy},
354✔
294
        // Starts out as "cancelled": no request is active yet, so the destructor's
        // active-request warning stays quiet and AsyncCall's "already ongoing" check passes.
        cancelled_ {make_shared<bool>(true)},
×
295
        resolver_(GetAsioIoContext(event_loop)),
296
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,416✔
297
}
354✔
298

299
// Cancels whatever the client is doing. A warning is logged first if the cancelled flag shows
// that a request was still active.
Client::~Client() {
        const bool request_active = !*cancelled_;
        if (request_active) {
                logger_.Warning("Client destroyed while request is still active!");
        }

        DoCancel();
}
354✔
305

306
// Configures every SSL context in `ssl_ctx_` from `client_config_`. Does nothing if a previous
// call already succeeded.
//
// For each context this:
//  - sets peer verification (disabled when `skip_verify` is set),
//  - loads the client certificate and private key when both paths are configured
//    (configuring only one of them is an error),
//  - loads the default CA paths, which is only fatal when no dedicated server certificate is
//    configured,
//  - loads the dedicated server certificate, if configured; failure to load it is only fatal
//    when the default CA paths could not be loaded either.
//
// Returns `error::NoError` on success, otherwise the first error encountered; in the error
// case the client stays uninitialized.
error::Error Client::Initialize() {
        if (initialized_) {
                return error::NoError;
        }

        const string &cert_path = client_config_.client_cert_path;
        const string &key_path = client_config_.client_cert_key_path;
        const string &server_cert_path = client_config_.server_cert_path;
        const bool have_cert = !cert_path.empty();
        const bool have_key = !key_path.empty();
        const bool have_server_cert = !server_cert_path.empty();

        for (int i = 0; i < MENDER_BOOST_BEAST_SSL_CTX_COUNT; ++i) {
                auto &ctx = ssl_ctx_[i];

                ctx.set_verify_mode(client_config_.skip_verify ? ssl::verify_none : ssl::verify_peer);

                if (have_cert != have_key) {
                        return error::Error(
                                make_error_condition(errc::invalid_argument),
                                "Cannot set only one of client certificate, and client certificate private key");
                }

                beast::error_code ec {};
                if (have_cert) {
                        // Both certificate and key are configured.
                        ctx.set_options(boost::asio::ssl::context::default_workarounds);
                        ctx.use_certificate_file(cert_path, boost::asio::ssl::context_base::pem, ec);
                        if (ec) {
                                return error::Error(
                                        ec.default_error_condition(), "Could not load client certificate");
                        }

                        auto exp_key = crypto::PrivateKey::Load({key_path, "", client_config_.ssl_engine});
                        if (!exp_key) {
                                return exp_key.error().WithContext(
                                        "Error loading private key from " + key_path);
                        }

                        const int ret = SSL_CTX_use_PrivateKey(ctx.native_handle(), exp_key.value()->Get());
                        if (ret != 1) {
                                return MakeError(
                                        HTTPInitError,
                                        "Failed to add the PrivateKey: " + key_path + " to the SSL CTX");
                        }
                }

                // Load the default CAs.
                ctx.set_default_verify_paths(ec);
                const bool default_cas_loaded = !ec;
                if (!default_cas_loaded) {
                        auto err = error::Error(
                                ec.default_error_condition(), "Failed to load the SSL default directory");
                        if (!have_server_cert) {
                                // Without a dedicated certificate there would be nothing left to
                                // verify against.
                                return err;
                        }
                        // A dedicated certificate is configured, so this is not fatal.
                        log::Info(err.String());
                }

                if (have_server_cert) {
                        ctx.load_verify_file(server_cert_path, ec);
                        if (ec) {
                                log::Warning("Failed to load the server certificate! Falling back to the CA store");
                                if (!default_cas_loaded) {
                                        return error::Error(
                                                ec.default_error_condition(),
                                                "Failed to load SSL default directory and server certificate");
                                }
                        }
                }
        }

        initialized_ = true;

        return error::NoError;
}
377

378
// Create the HOST header according to:
379
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.23
380
// In short: Add the port-number if it is non-standard HTTP
381
static string CreateHOSTAddress(OutgoingRequestPtr req) {
        // Ports 80 and 443 are treated as standard and left out of the header value; any other
        // port is appended as ":<port>".
        const auto port = req->GetPort();
        const bool standard_port = (port == 80 || port == 443);
        if (standard_port) {
                return req->GetHost();
        }
        return req->GetHost() + ":" + to_string(port);
}
387

388
// Starts an asynchronous HTTP(S) request.
//
// `req` must have protocol, host and port filled in, and the protocol must be "http" or
// "https". `header_handler` and `body_handler` must both be callable. On success the request
// is stored, proxy settings are applied, the "HOST" and "User-Agent" headers are set, and name
// resolution of the host to connect to is started; everything after that happens from the
// resolver callback.
//
// Returns `error::NoError` if the call was started. Any error is returned directly, in which
// case no name resolution has been started.
error::Error Client::AsyncCall(
        OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
        auto err = Initialize();
        if (err != error::NoError) {
                return err;
        }

        const bool call_ongoing = !*cancelled_ && status_ != TransactionStatus::Done;
        if (call_ongoing) {
                return error::Error(
                        make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
        }

        const auto &address = req->address_;
        if (address.protocol.empty() || address.host.empty() || address.port < 0) {
                return error::MakeError(error::ProgrammingError, "Request is not ready");
        }

        if (!header_handler || !body_handler) {
                return error::MakeError(
                        error::ProgrammingError, "header_handler and body_handler can not be nullptr");
        }

        if (address.protocol != "http" && address.protocol != "https") {
                return error::Error(
                        make_error_condition(errc::protocol_not_supported), address.protocol);
        }

        logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));

        request_ = req;

        err = HandleProxySetup();
        if (err != error::NoError) {
                return err;
        }

        // NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
        // request to route to our k8s cluster. Set this in all cases.
        const string host_header = CreateHOSTAddress(req);
        req->SetHeader("HOST", host_header);

        log::Trace("Setting HOST address: " + host_header);

        // Every request carries a User-Agent header.
        req->SetHeader("User-Agent", "Mender/" MENDER_VERSION);

        header_handler_ = header_handler;
        body_handler_ = body_handler;
        status_ = TransactionStatus::None;

        // Fresh flag for this call. The resolver callback keeps its own copy of the pointer and
        // only proceeds while the flag is still false.
        cancelled_ = make_shared<bool>(false);
        const shared_ptr<bool> cancelled = cancelled_;

        resolver_.async_resolve(
                request_->address_.host,
                to_string(request_->address_.port),
                [this, cancelled](
                        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
                        if (*cancelled) {
                                return;
                        }
                        ResolveHandler(ec, results);
                });

        return error::NoError;
}
453

454
// Adds a "Proxy-Authorization: Basic ..." header to `req` when `proxy_address` contains a
// username. The username and password are URL-decoded, joined with ':' and Base64-encoded.
//
// Returns `error::NoError` if there is no username or the header was set, otherwise the
// decoding/encoding error.
static inline error::Error AddProxyAuthHeader(OutgoingRequest &req, BrokenDownUrl &proxy_address) {
        if (proxy_address.username.empty()) {
                // No credentials, nothing to add.
                return error::NoError;
        }

        auto username = URLDecode(proxy_address.username);
        if (!username) {
                return username.error();
        }
        auto password = URLDecode(proxy_address.password);
        if (!password) {
                return password.error();
        }

        const auto credentials = username.value() + ":" + password.value();
        auto encoded = crypto::EncodeBase64(common::ByteVectorFromString(credentials));
        if (!encoded) {
                return encoded.error();
        }

        req.SetHeader("Proxy-Authorization", "Basic " + encoded.value());
        log::Warning(
                "Avoid using basic authentication if possible, and make sure if it's used, it's through HTTPS");

        return error::NoError;
}
478

479
error::Error Client::HandleProxySetup() {
278✔
480
        secondary_req_.reset();
278✔
481

482
        if (request_->address_.protocol == "http") {
278✔
483
                socket_mode_ = SocketMode::Plain;
252✔
484

485
                if (http_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
252✔
486
                        // Make a modified proxy request.
487
                        BrokenDownUrl proxy_address;
20✔
488
                        auto err = BreakDownUrl(http_proxy_, proxy_address, true);
11✔
489
                        if (err != error::NoError) {
11✔
490
                                return err.WithContext("HTTP proxy URL is invalid");
2✔
491
                        }
492
                        if (proxy_address.path != "" && proxy_address.path != "/") {
10✔
493
                                return MakeError(
494
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
2✔
495
                        }
496

497
                        request_->address_.path = request_->address_.protocol + "://" + request_->address_.host
18✔
498
                                                                          + ":" + to_string(request_->address_.port)
27✔
499
                                                                          + request_->address_.path;
27✔
500
                        request_->address_.host = proxy_address.host;
9✔
501
                        request_->address_.port = proxy_address.port;
9✔
502
                        request_->address_.protocol = proxy_address.protocol;
9✔
503

504
                        err = AddProxyAuthHeader(*request_, proxy_address);
9✔
505
                        if (err != error::NoError) {
9✔
506
                                return err;
×
507
                        }
508

509
                        if (proxy_address.protocol == "https") {
9✔
510
                                socket_mode_ = SocketMode::Tls;
5✔
511
                        } else if (proxy_address.protocol == "http") {
4✔
512
                                socket_mode_ = SocketMode::Plain;
4✔
513
                        } else {
514
                                // Should never get here.
515
                                assert(false);
516
                        }
517
                }
518
        } else if (request_->address_.protocol == "https") {
26✔
519
                socket_mode_ = SocketMode::Tls;
26✔
520

521
                if (https_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
26✔
522
                        // Save the original request for later, so that we can make a new request
523
                        // over the channel established by CONNECT.
524
                        secondary_req_ = std::move(request_);
525

526
                        request_ = make_shared<OutgoingRequest>();
30✔
527
                        request_->SetMethod(Method::CONNECT);
15✔
528
                        BrokenDownUrl proxy_address;
28✔
529
                        auto err = BreakDownUrl(https_proxy_, proxy_address, true);
15✔
530
                        if (err != error::NoError) {
15✔
531
                                return err.WithContext("HTTPS proxy URL is invalid");
2✔
532
                        }
533
                        if (proxy_address.path != "" && proxy_address.path != "/") {
14✔
534
                                return MakeError(
535
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
2✔
536
                        }
537

538
                        request_->address_.path =
539
                                secondary_req_->address_.host + ":" + to_string(secondary_req_->address_.port);
26✔
540
                        request_->address_.host = proxy_address.host;
13✔
541
                        request_->address_.port = proxy_address.port;
13✔
542
                        request_->address_.protocol = proxy_address.protocol;
13✔
543

544
                        err = AddProxyAuthHeader(*request_, proxy_address);
13✔
545
                        if (err != error::NoError) {
13✔
546
                                return err;
×
547
                        }
548

549
                        if (proxy_address.protocol == "https") {
13✔
550
                                socket_mode_ = SocketMode::Tls;
7✔
551
                        } else if (proxy_address.protocol == "http") {
6✔
552
                                socket_mode_ = SocketMode::Plain;
6✔
553
                        } else {
554
                                // Should never get here.
555
                                assert(false);
556
                        }
557
                }
558
        } else {
559
                // Should never get here
560
                assert(false);
561
        }
562

563
        return error::NoError;
274✔
564
}
565

566
// Creates the reader through which the body of `resp` is consumed.
//
// Only valid while the transaction is in the `HeaderHandlerCalled` state; on success the state
// moves to `ReaderCreated`. Fails with `errc::operation_in_progress` in any other state, and
// with `BodyMissingError` when the response has a content length of zero and is not chunked.
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
        if (status_ != TransactionStatus::HeaderHandlerCalled) {
                return expected::unexpected(error::Error(
                        make_error_condition(errc::operation_in_progress),
                        "MakeBodyAsyncReader called while reading is in progress"));
        }

        auto &parser = *response_data_.http_response_parser_;
        const bool has_body = GetContentLength(parser) != 0 || parser.chunked();
        if (!has_body) {
                return expected::unexpected(
                        MakeError(BodyMissingError, "Response does not contain a body"));
        }

        status_ = TransactionStatus::ReaderCreated;
        return make_shared<BodyAsyncReader<Client>>(resp->client_.GetHttpClient(), resp->cancelled_);
}
582

583
// Hands the established connection over to the caller as a raw socket.
//
// The HTTP transaction is marked as done, the current cancelled flag is set (so holders of the
// old flag see the transaction as cancelled) and replaced by a fresh one, and the client gives
// up its stream. Which layer of the stream ends up in the returned `RawSocket` depends on the
// socket mode. The response buffer is passed along with it.
//
// Fails with `errc::not_connected` if the client is currently cancelled.
io::ExpectedAsyncReadWriterPtr Client::SwitchProtocol(IncomingResponsePtr req) {
        if (*cancelled_) {
                return expected::unexpected(error::Error(
                        make_error_condition(errc::not_connected),
                        "Cannot switch protocols if endpoint is not connected"));
        }

        // Rest of the connection is done directly on the socket, we are done here.
        status_ = TransactionStatus::Done;
        *cancelled_ = true;
        cancelled_ = make_shared<bool>(false);

        // The stream no longer belongs to us: take it out of the member.
        auto detached = std::move(stream_);
        stream_.reset();

        auto &leftover = response_data_.response_buffer_;

        switch (socket_mode_) {
        case SocketMode::TlsTls:
                return make_shared<RawSocket<ssl::stream<ssl::stream<tcp::socket>>>>(detached, leftover);
        case SocketMode::Tls:
                return make_shared<RawSocket<ssl::stream<tcp::socket>>>(
                        make_shared<ssl::stream<tcp::socket>>(std::move(detached->next_layer())), leftover);
        case SocketMode::Plain:
                return make_shared<RawSocket<tcp::socket>>(
                        make_shared<tcp::socket>(std::move(detached->next_layer().next_layer())), leftover);
        }

        AssertOrReturnUnexpected(false);
}
615

616
void Client::CallHandler(ResponseHandler handler) {
        // Taking `handler` by value is the whole point of this function: it guarantees that a
        // copy of the handler exists for the duration of the call. The handler may own the
        // client instance through a capture, and may replace itself with a different handler
        // (using `AsyncCall`). Doing so destroys what would otherwise be the final copy of the
        // handler, and with it the client, so the copy in the argument list has to be made
        // before the call.
        handler(response_);
}
360✔
624

625
void Client::CallErrorHandler(
83✔
626
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
627
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
249✔
628
}
83✔
629

630
void Client::CallErrorHandler(
125✔
631
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
632
        status_ = TransactionStatus::Done;
125✔
633
        DoCancel();
125✔
634
        handler(expected::unexpected(
250✔
635
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
500✔
636
}
125✔
637

638
void Client::ResolveHandler(
269✔
639
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
640
        if (ec) {
269✔
641
                CallErrorHandler(ec, request_, header_handler_);
×
642
                return;
×
643
        }
644

645
        if (logger_.Level() >= log::LogLevel::Debug) {
269✔
646
                string ips = "[";
247✔
647
                string sep;
648
                for (auto r : results) {
1,036✔
649
                        ips += sep;
271✔
650
                        ips += r.endpoint().address().to_string();
271✔
651
                        sep = ", ";
271✔
652
                }
653
                ips += "]";
247✔
654
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
494✔
655
        }
656

657
        resolver_results_ = results;
658

659
        stream_ = make_shared<ssl::stream<ssl::stream<tcp::socket>>>(
269✔
660
                ssl::stream<tcp::socket>(GetAsioIoContext(event_loop_), ssl_ctx_[0]), ssl_ctx_[1]);
538✔
661

662
        if (response_data_.response_buffer_) {
269✔
663
                // We can reuse this if preexisting, just make sure we start with a
664
                // clean state (while avoiding shrinking/discarding the buffer, see
665
                // https://www.boost.org/doc/libs/1_70_0/libs/beast/doc/html/beast/ref/boost__beast__basic_flat_buffer/clear.html
666
                // for details).
667
                // Since there should be no leftover bytes from previous responses, we
668
                // log if there are some, but let's not bother all users with a warning,
669
                // there is nothing they could do about it. However, for
670
                // testing/debugging/CI, it can be useful to have this information.
671
                if (response_data_.response_buffer_->size() > 0) {
70✔
672
                        logger_.Debug(
1✔
673
                                "Leftover data from the previous response! ("
674
                                + to_string(response_data_.response_buffer_->size()) + " bytes)");
2✔
675
                }
676
                response_data_.response_buffer_->clear();
677
        } else {
678
                response_data_.response_buffer_ = make_shared<beast::flat_buffer>();
398✔
679

680
                // This is equivalent to:
681
                //   response_data_.response_buffer_.reserve(body_buffer_.size());
682
                // but compatible with Boost 1.67.
683
                response_data_.response_buffer_->prepare(
684
                        body_buffer_.size() - response_data_.response_buffer_->size());
199✔
685
        }
686

687
        auto &cancelled = cancelled_;
688

689
        asio::async_connect(
269✔
690
                stream_->lowest_layer(),
691
                resolver_results_,
269✔
692
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
538✔
693
                        if (!*cancelled) {
269✔
694
                                switch (socket_mode_) {
269✔
695
                                case SocketMode::TlsTls:
×
696
                                        // Should never happen because we always need to handshake
697
                                        // the innermost Tls first, then the outermost, but the
698
                                        // latter doesn't happen here.
699
                                        assert(false);
700
                                        CallErrorHandler(
×
701
                                                error::MakeError(
×
702
                                                        error::ProgrammingError, "TlsTls mode is invalid in ResolveHandler"),
×
703
                                                request_,
×
704
                                                header_handler_);
×
705
                                case SocketMode::Tls:
21✔
706
                                        return HandshakeHandler(stream_->next_layer(), ec, endpoint);
21✔
707
                                case SocketMode::Plain:
248✔
708
                                        return ConnectHandler(ec, endpoint);
248✔
709
                                }
710
                        }
711
                });
712
}
713

714
template <typename StreamType>
715
void Client::HandshakeHandler(
25✔
716
        StreamType &stream, const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
717
        if (ec) {
25✔
718
                CallErrorHandler(ec, request_, header_handler_);
2✔
719
                return;
2✔
720
        }
721

722
        // Enable TCP keepalive
723
        boost::asio::socket_base::keep_alive option(true);
724
        stream_->lowest_layer().set_option(option);
23✔
725

726
        // We can't avoid a C style cast on this next line. The usual method by which system headers
727
        // are excluded from warnings doesn't work, because `SSL_set_tlsext_host_name` is a macro,
728
        // containing a cast, which expands here, not in the original file. So just disable the
729
        // warning here.
730
#ifdef __clang__
731
#pragma clang diagnostic push
732
#pragma clang diagnostic ignored "-Wold-style-cast"
733
#else
734
#pragma GCC diagnostic push
735
#pragma GCC diagnostic ignored "-Wold-style-cast"
736
#endif
737
        // Set SNI Hostname (many hosts need this to handshake successfully)
738
        if (!SSL_set_tlsext_host_name(stream.native_handle(), request_->address_.host.c_str())) {
23✔
739
#ifdef __clang__
740
#pragma clang diagnostic pop
741
#else
742
#pragma GCC diagnostic pop
743
#endif
744
                beast::error_code ec2 {
×
745
                        static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
×
746
                logger_.Error("Failed to set SNI host name: " + ec2.message());
×
747
        }
748

749
        // Enable host name verification (not done automatically and we don't have
750
        // enough access to the TLS internals to use X509_VERIFY_PARAM_set1_host(),
751
        // hence the callback that boost provides).
752
        boost::system::error_code b_ec;
23✔
753
        stream.set_verify_callback(ssl::host_name_verification(request_->address_.host), b_ec);
46✔
754
        if (b_ec) {
23✔
755
                logger_.Error("Failed to enable host name verification: " + b_ec.message());
×
756
                CallErrorHandler(b_ec, request_, header_handler_);
×
757
                return;
×
758
        }
759

760
        auto &cancelled = cancelled_;
761

762
        stream.async_handshake(
46✔
763
                ssl::stream_base::client, [this, cancelled, endpoint](const error_code &ec) {
23✔
764
                        if (*cancelled) {
26✔
765
                                return;
766
                        }
767
                        if (ec) {
26✔
768
                                logger_.Error("https: Failed to perform the SSL handshake: " + ec.message());
20✔
769
                                CallErrorHandler(ec, request_, header_handler_);
10✔
770
                                return;
10✔
771
                        }
772
                        logger_.Debug("https: Successful SSL handshake");
32✔
773
                        ConnectHandler(ec, endpoint);
16✔
774
                });
775
}
776

777

778
// Called once the connection (including any TLS handshake) is established. Builds the HTTP
// request with its serializer, creates a fresh response parser, and starts writing the request
// header on the layer selected by `socket_mode_`.
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
        if (ec) {
                CallErrorHandler(ec, request_, header_handler_);
                return;
        }

        // Turn on TCP keepalive on the lowest layer of the stream.
        boost::asio::socket_base::keep_alive keep_alive_on(true);
        stream_->lowest_layer().set_option(keep_alive_on);

        logger_.Debug("Connected to " + endpoint.address().to_string());

        auto &http_request = request_data_.http_request_;
        http_request = make_shared<http::request<http::buffer_body>>(
                MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);

        for (const auto &name_and_value : request_->headers_) {
                http_request->set(name_and_value.first, name_and_value.second);
        }

        request_data_.http_request_serializer_ =
                make_shared<http::request_serializer<http::buffer_body>>(*http_request);

        auto &response_parser = response_data_.http_response_parser_;
        response_parser = make_shared<http::response_parser<http::buffer_body>>();

        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
        // if they do, they should be handled higher up in the application logic.
        //
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
        // `has_value()` in their code, causing their subsequent comparison operation to
        // misbehave. So pass highest possible value instead.
        response_parser->body_limit(numeric_limits<uint64_t>::max());

        auto &cancelled = cancelled_;
        auto &request_data = request_data_;

        // `request_data` is captured by copy and not used in the body; presumably this keeps
        // the request and serializer alive while the write is pending.
        auto on_header_written =
                [this, cancelled, request_data](const error_code &write_ec, size_t num_written) {
                        if (*cancelled) {
                                return;
                        }
                        WriteHeaderHandler(write_ec, num_written);
                };

        auto &serializer = *request_data_.http_request_serializer_;

        switch (socket_mode_) {
        case SocketMode::TlsTls:
                http::async_write_header(*stream_, serializer, on_header_written);
                break;
        case SocketMode::Tls:
                http::async_write_header(stream_->next_layer(), serializer, on_header_written);
                break;
        case SocketMode::Plain:
                http::async_write_header(
                        stream_->next_layer().next_layer(), serializer, on_header_written);
                break;
        }
}
834

835
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
248✔
836
        if (num_written > 0) {
248✔
837
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
496✔
838
        }
839

840
        if (ec) {
248✔
841
                CallErrorHandler(ec, request_, header_handler_);
×
842
                return;
208✔
843
        }
844

845
        auto exp_has_body =
846
                HasBody(request_->GetHeader("Content-Length"), request_->GetHeader("Transfer-Encoding"));
496✔
847
        if (!exp_has_body) {
248✔
848
                CallErrorHandler(exp_has_body.error(), request_, header_handler_);
×
849
                return;
×
850
        }
851
        if (!exp_has_body.value()) {
248✔
852
                ReadHeader();
207✔
853
                return;
854
        }
855

856
        if (!request_->body_gen_ && !request_->async_body_gen_) {
41✔
857
                auto err = MakeError(BodyMissingError, "No body generator");
2✔
858
                CallErrorHandler(err, request_, header_handler_);
2✔
859
                return;
860
        }
861

862
        assert(!(request_->body_gen_ && request_->async_body_gen_));
863

864
        if (request_->body_gen_) {
40✔
865
                auto body_reader = request_->body_gen_();
34✔
866
                if (!body_reader) {
34✔
867
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
868
                        return;
869
                }
870
                request_->body_reader_ = body_reader.value();
34✔
871
        } else {
872
                auto body_reader = request_->async_body_gen_();
6✔
873
                if (!body_reader) {
6✔
874
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
875
                        return;
876
                }
877
                request_->async_body_reader_ = body_reader.value();
6✔
878
        }
879

880
        PrepareAndWriteNewBodyBuffer();
40✔
881
}
882

883
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,278✔
884
        if (num_written > 0) {
2,278✔
885
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
2,242✔
886
        }
887

888
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,278✔
889
                // Write next block of the body.
890
                PrepareAndWriteNewBodyBuffer();
1,120✔
891
        } else if (ec) {
1,158✔
892
                CallErrorHandler(ec, request_, header_handler_);
8✔
893
        } else if (num_written > 0) {
1,154✔
894
                // We are still writing the body.
895
                WriteBody();
1,121✔
896
        } else {
897
                // We are ready to receive the response.
898
                ReadHeader();
33✔
899
        }
900
}
2,278✔
901

902
void Client::PrepareAndWriteNewBodyBuffer() {
1,160✔
903
        // request_->body_reader_ XOR request_->async_body_reader_
904
        assert(
905
                (request_->body_reader_ || request_->async_body_reader_)
906
                && !(request_->body_reader_ && request_->async_body_reader_));
907

908
        auto cancelled = cancelled_;
909
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
3,592✔
910
                if (!*cancelled) {
1,160✔
911
                        if (!read) {
1,159✔
912
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
913
                                return;
2✔
914
                        }
915
                        WriteNewBodyBuffer(read.value());
1,157✔
916
                }
917
        };
1,160✔
918

919

920
        if (request_->body_reader_) {
1,160✔
921
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
1,470✔
922
        } else {
923
                auto err = request_->async_body_reader_->AsyncRead(
924
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
850✔
925
                if (err != error::NoError) {
425✔
926
                        CallErrorHandler(err, request_, header_handler_);
×
927
                }
928
        }
929
}
1,160✔
930

931
void Client::WriteNewBodyBuffer(size_t size) {
1,157✔
932
        request_data_.http_request_->body().data = body_buffer_.data();
1,157✔
933
        request_data_.http_request_->body().size = size;
1,157✔
934

935
        if (size > 0) {
1,157✔
936
                request_data_.http_request_->body().more = true;
1,124✔
937
        } else {
938
                // Release ownership of Body reader.
939
                request_->body_reader_.reset();
33✔
940
                request_->async_body_reader_.reset();
33✔
941
                request_data_.http_request_->body().more = false;
33✔
942
        }
943

944
        WriteBody();
1,157✔
945
}
1,157✔
946

947
void Client::WriteBody() {
2,278✔
948
        auto &cancelled = cancelled_;
949
        auto &request_data = request_data_;
2,278✔
950

951
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
2,278✔
952
                if (!*cancelled) {
2,278✔
953
                        WriteBodyHandler(ec, num_written);
2,278✔
954
                }
955
        };
4,556✔
956

957
        switch (socket_mode_) {
2,278✔
958
        case SocketMode::TlsTls:
×
959
                http::async_write_some(*stream_, *request_data_.http_request_serializer_, handler);
960
                break;
961
        case SocketMode::Tls:
×
962
                http::async_write_some(
963
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
964
                break;
965
        case SocketMode::Plain:
2,278✔
966
                http::async_write_some(
967
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
968
                break;
969
        }
970
}
2,278✔
971

972
void Client::ReadHeader() {
240✔
973
        auto &cancelled = cancelled_;
974
        auto &response_data = response_data_;
240✔
975

976
        auto handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
237✔
977
                if (!*cancelled) {
237✔
978
                        ReadHeaderHandler(ec, num_read);
237✔
979
                }
980
        };
480✔
981

982
        switch (socket_mode_) {
240✔
983
        case SocketMode::TlsTls:
2✔
984
                http::async_read_some(
2✔
985
                        *stream_,
986
                        *response_data_.response_buffer_,
987
                        *response_data_.http_response_parser_,
988
                        handler);
989
                break;
990
        case SocketMode::Tls:
14✔
991
                http::async_read_some(
14✔
992
                        stream_->next_layer(),
993
                        *response_data_.response_buffer_,
994
                        *response_data_.http_response_parser_,
995
                        handler);
996
                break;
997
        case SocketMode::Plain:
224✔
998
                http::async_read_some(
224✔
999
                        stream_->next_layer().next_layer(),
1000
                        *response_data_.response_buffer_,
1001
                        *response_data_.http_response_parser_,
1002
                        handler);
1003
                break;
1004
        }
1005
}
240✔
1006

1007
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
237✔
1008
        if (num_read > 0) {
237✔
1009
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
464✔
1010
        }
1011

1012
        if (ec) {
237✔
1013
                CallErrorHandler(ec, request_, header_handler_);
5✔
1014
                return;
65✔
1015
        }
1016

1017
        if (!response_data_.http_response_parser_->is_header_done()) {
232✔
1018
                ReadHeader();
×
1019
                return;
×
1020
        }
1021

1022
        if (secondary_req_) {
232✔
1023
                HandleSecondaryRequest();
9✔
1024
                return;
9✔
1025
        }
1026

1027
        response_.reset(new IncomingResponse(*this, cancelled_));
446✔
1028
        response_->status_code_ = response_data_.http_response_parser_->get().result_int();
223✔
1029
        response_->status_message_ = string {response_data_.http_response_parser_->get().reason()};
223✔
1030

1031
        logger_.Debug(
446✔
1032
                "Received response: " + to_string(response_->status_code_) + " "
446✔
1033
                + response_->status_message_);
669✔
1034

1035
        string debug_str;
1036
        for (auto header = response_data_.http_response_parser_->get().cbegin();
257✔
1037
                 header != response_data_.http_response_parser_->get().cend();
480✔
1038
                 header++) {
1039
                response_->headers_[string {header->name_string()}] = string {header->value()};
771✔
1040
                if (logger_.Level() >= log::LogLevel::Debug) {
257✔
1041
                        debug_str += string {header->name_string()};
242✔
1042
                        debug_str += ": ";
242✔
1043
                        debug_str += string {header->value()};
242✔
1044
                        debug_str += "\n";
242✔
1045
                }
1046
        }
1047

1048
        logger_.Debug("Received headers:\n" + debug_str);
446✔
1049
        debug_str.clear();
1050

1051
        if (GetContentLength(*response_data_.http_response_parser_) == 0
223✔
1052
                && !response_data_.http_response_parser_->chunked()) {
223✔
1053
                auto cancelled = cancelled_;
1054
                status_ = TransactionStatus::HeaderHandlerCalled;
48✔
1055
                CallHandler(header_handler_);
96✔
1056
                if (!*cancelled) {
48✔
1057
                        status_ = TransactionStatus::Done;
43✔
1058
                        if (response_->status_code_ != StatusCode::StatusSwitchingProtocols) {
43✔
1059
                                // Make an exception for 101 Switching Protocols response, where the TCP connection
1060
                                // is meant to be reused.
1061
                                DoCancel();
39✔
1062
                        }
1063
                        CallHandler(body_handler_);
86✔
1064
                }
1065
                return;
1066
        }
1067

1068
        auto cancelled = cancelled_;
1069
        status_ = TransactionStatus::HeaderHandlerCalled;
175✔
1070
        CallHandler(header_handler_);
350✔
1071
        if (*cancelled) {
175✔
1072
                return;
1073
        }
1074

1075
        // We know that a body reader is required here, because of the check for body above.
1076
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
172✔
1077
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
36✔
1078
        }
1079
}
1080

1081
void Client::HandleSecondaryRequest() {
9✔
1082
        logger_.Debug(
18✔
1083
                "Received proxy response: "
1084
                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
18✔
1085
                + string {response_data_.http_response_parser_->get().reason()});
36✔
1086

1087
        request_ = std::move(secondary_req_);
1088

1089
        if (response_data_.http_response_parser_->get().result_int() != StatusOK) {
9✔
1090
                auto err = MakeError(
1091
                        ProxyError,
1092
                        "Proxy returned unexpected response: "
1093
                                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
4✔
1094
                                + string {response_data_.http_response_parser_->get().reason()});
6✔
1095
                CallErrorHandler(err, request_, header_handler_);
4✔
1096
                return;
1097
        }
1098

1099
        if (GetContentLength(*response_data_.http_response_parser_) != 0
7✔
1100
                || response_data_.http_response_parser_->chunked()) {
7✔
1101
                auto err = MakeError(ProxyError, "Body not allowed in proxy response");
×
1102
                CallErrorHandler(err, request_, header_handler_);
×
1103
                return;
1104
        }
1105

1106
        // We are connected. Now repeat the request cycle with the original request. Pretend
1107
        // we were just connected.
1108

1109
        assert(request_->GetProtocol() == "https");
1110

1111
        // Make sure that no data is "lost" inside the buffering mechanism, since when switching to
1112
        // a different layer, this will get out of sync.
1113
        assert(response_data_.response_buffer_->size() == 0);
1114

1115
        switch (socket_mode_) {
7✔
1116
        case SocketMode::TlsTls:
×
1117
                // Should never get here, because this is the only place where TlsTls mode
1118
                // is supposed to be turned on.
1119
                assert(false);
1120
                CallErrorHandler(
×
1121
                        error::MakeError(
×
1122
                                error::ProgrammingError,
1123
                                "Any other mode than Tls is not valid when handling secondary request"),
×
1124
                        request_,
×
1125
                        header_handler_);
×
1126
                break;
×
1127
        case SocketMode::Tls:
3✔
1128
                // Upgrade to TLS inside TLS.
1129
                socket_mode_ = SocketMode::TlsTls;
3✔
1130
                HandshakeHandler(*stream_, error_code {}, stream_->lowest_layer().remote_endpoint());
3✔
1131
                break;
3✔
1132
        case SocketMode::Plain:
4✔
1133
                // Upgrade to TLS.
1134
                socket_mode_ = SocketMode::Tls;
4✔
1135
                HandshakeHandler(
4✔
1136
                        stream_->next_layer(), error_code {}, stream_->lowest_layer().remote_endpoint());
4✔
1137
                break;
4✔
1138
        }
1139
}
1140

1141
// Starts reading the next part of the response body. The caller's range [`start`, `end`) and
// `handler` are remembered for `ReadBodyHandler`, which receives the result of the read. If
// the body has already been read completely, `handler` is called with 0 right away and the
// transaction is finished.
void Client::AsyncReadNextBodyPart(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));

        if (status_ == TransactionStatus::ReaderCreated) {
                status_ = TransactionStatus::BodyReadingInProgress;
        }

        if (AtLeast(status_, TransactionStatus::BodyReadingFinished)) {
                // Nothing left to read: signal end of data to the reader, then finish up unless
                // the handler cancelled or otherwise changed the state.
                auto cancelled = cancelled_;
                handler(0);
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
                        status_ = TransactionStatus::Done;
                        DoCancel();
                        CallHandler(body_handler_);
                }
                return;
        }

        reader_buf_start_ = start;
        reader_buf_end_ = end;
        reader_handler_ = handler;

        // Never read more than fits in both our own buffer and the caller's range.
        const size_t requested = end - start;
        const size_t chunk_size = min(body_buffer_.size(), requested);

        auto &body = response_data_.http_response_parser_->get().body();
        body.data = body_buffer_.data();
        body.size = chunk_size;
        response_data_.last_buffer_size_ = chunk_size;

        auto &cancelled = cancelled_;
        auto &response_data = response_data_;

        // `response_data` is captured by copy and not used in the body; presumably this keeps
        // the buffer and parser alive while the read is pending.
        auto on_body_read =
                [this, cancelled, response_data](const error_code &read_ec, size_t num_read) {
                        if (*cancelled) {
                                return;
                        }
                        ReadBodyHandler(read_ec, num_read);
                };

        auto &buffer = *response_data_.response_buffer_;
        auto &parser = *response_data_.http_response_parser_;

        switch (socket_mode_) {
        case SocketMode::TlsTls:
                http::async_read_some(*stream_, buffer, parser, on_body_read);
                break;
        case SocketMode::Tls:
                http::async_read_some(stream_->next_layer(), buffer, parser, on_body_read);
                break;
        case SocketMode::Plain:
                http::async_read_some(
                        stream_->next_layer().next_layer(), buffer, parser, on_body_read);
                break;
        }
}
1203

1204
// Completion handler for reading a part of the response body. Copies the received payload into
// the range registered by `AsyncReadNextBodyPart` and passes the byte count to the reader's
// handler, or reports the error to both the reader's handler and the body handler.
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
        if (num_read > 0) {
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
        }

        const bool buffer_exhausted = (ec == http::make_error_code(http::error::need_buffer));
        if (buffer_exhausted) {
                // This can be ignored. We always reset the buffer between reads anyway.
                ec = error_code();
        }

        assert(reader_handler_);

        if (response_data_.http_response_parser_->is_done()) {
                status_ = TransactionStatus::BodyReadingFinished;
        }

        auto cancelled = cancelled_;

        if (ec) {
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
                reader_handler_(expected::unexpected(err));
                // Skip the body handler if the reader's handler cancelled the request.
                if (!*cancelled) {
                        CallErrorHandler(ec, request_, body_handler_);
                }
                return;
        }

        // The num_read from above includes out of band payload data, such as chunk headers, which
        // we are not interested in. So we need to calculate the payload size from the remaining
        // buffer space.
        const size_t payload_size =
                response_data_.last_buffer_size_ - response_data_.http_response_parser_->get().body().size;

        const size_t capacity = reader_buf_end_ - reader_buf_start_;
        const size_t to_copy = min(payload_size, capacity);

        if (to_copy > 0) {
                copy_n(body_buffer_.begin(), to_copy, reader_buf_start_);
                reader_handler_(to_copy);
                return;
        }

        // We read nothing, which can happen if all we read was a chunk header. We cannot
        // return 0 to the handler however, because in `io::Reader` context this means
        // EOF. So just repeat the request instead, until we get actual payload data.
        AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, reader_handler_);
}
1250

1251
void Client::Cancel() {
203✔
1252
        auto cancelled = cancelled_;
1253

1254
        if (!*cancelled) {
203✔
1255
                auto err =
1256
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
40✔
1257
                switch (status_) {
20✔
1258
                case TransactionStatus::None:
3✔
1259
                        CallErrorHandler(err, request_, header_handler_);
3✔
1260
                        break;
3✔
1261
                case TransactionStatus::HeaderHandlerCalled:
16✔
1262
                case TransactionStatus::ReaderCreated:
1263
                case TransactionStatus::BodyReadingInProgress:
1264
                case TransactionStatus::BodyReadingFinished:
1265
                        CallErrorHandler(err, request_, body_handler_);
16✔
1266
                        break;
16✔
1267
                case TransactionStatus::Replying:
1268
                case TransactionStatus::SwitchingProtocol:
1269
                        // Not used by client.
1270
                        assert(false);
1271
                        break;
1272
                case TransactionStatus::BodyHandlerCalled:
1273
                case TransactionStatus::Done:
1274
                        break;
1275
                }
1276
        }
1277

1278
        if (!*cancelled) {
203✔
1279
                DoCancel();
1✔
1280
        }
1281
}
203✔
1282

1283
void Client::DoCancel() {
613✔
1284
        resolver_.cancel();
613✔
1285
        if (stream_) {
613✔
1286
                stream_->lowest_layer().cancel();
262✔
1287
                stream_->lowest_layer().close();
262✔
1288
                stream_.reset();
262✔
1289
        }
1290

1291
        // Reset logger to no connection.
1292
        logger_ = log::Logger(logger_name_);
613✔
1293

1294
        // Set cancel state and then make a new one. Those who are interested should have their own
1295
        // pointer to the old one.
1296
        *cancelled_ = true;
613✔
1297
        cancelled_ = make_shared<bool>(true);
613✔
1298
}
613✔
1299

1300
Stream::Stream(Server &server) :
444✔
1301
        server_ {server},
1302
        logger_ {"http"},
1303
        cancelled_(make_shared<bool>(true)),
444✔
1304
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
444✔
1305
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,332✔
1306
        request_data_.request_buffer_ = make_shared<beast::flat_buffer>();
888✔
1307

1308
        // This is equivalent to:
1309
        //   request_data_.request_buffer_.reserve(body_buffer_.size());
1310
        // but compatible with Boost 1.67.
1311
        request_data_.request_buffer_->prepare(
1312
                body_buffer_.size() - request_data_.request_buffer_->size());
444✔
1313

1314
        request_data_.http_request_parser_ = make_shared<http::request_parser<http::buffer_body>>();
888✔
1315

1316
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
1317
        // they do, they should be handled higher up in the application logic.
1318
        //
1319
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
1320
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
1321
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
1322
        // possible value instead.
1323
        request_data_.http_request_parser_->body_limit(numeric_limits<uint64_t>::max());
1324
}
444✔
1325

1326
Stream::~Stream() {
        // Cancel and clean up without calling any handler.
        DoCancel();
}
444✔
1329

1330
void Stream::Cancel() {
7✔
1331
        auto cancelled = cancelled_;
1332

1333
        if (!*cancelled) {
7✔
1334
                auto err =
1335
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
1336
                switch (status_) {
7✔
1337
                case TransactionStatus::None:
×
1338
                        CallErrorHandler(err, request_, server_.header_handler_);
×
1339
                        break;
×
1340
                case TransactionStatus::HeaderHandlerCalled:
5✔
1341
                case TransactionStatus::ReaderCreated:
1342
                case TransactionStatus::BodyReadingInProgress:
1343
                case TransactionStatus::BodyReadingFinished:
1344
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
1345
                        break;
5✔
1346
                case TransactionStatus::BodyHandlerCalled:
×
1347
                        // In between body handler and reply finished. No one to handle the status
1348
                        // here.
1349
                        server_.RemoveStream(shared_from_this());
×
1350
                        break;
×
1351
                case TransactionStatus::Replying:
1✔
1352
                        CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1353
                        break;
1✔
1354
                case TransactionStatus::SwitchingProtocol:
1✔
1355
                        CallErrorHandler(err, request_, switch_protocol_handler_);
3✔
1356
                        break;
1✔
1357
                case TransactionStatus::Done:
1358
                        break;
1359
                }
1360
        }
1361

1362
        if (!*cancelled) {
7✔
1363
                DoCancel();
×
1364
        }
1365
}
7✔
1366

1367
void Stream::DoCancel() {
806✔
1368
        if (socket_.is_open()) {
806✔
1369
                socket_.cancel();
219✔
1370
                socket_.close();
219✔
1371
        }
1372

1373
        // Set cancel state and then make a new one. Those who are interested should have their own
1374
        // pointer to the old one.
1375
        *cancelled_ = true;
806✔
1376
        cancelled_ = make_shared<bool>(true);
806✔
1377
}
806✔
1378

1379
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
1380
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1381
}
×
1382

1383
void Stream::CallErrorHandler(
×
1384
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
1385
        status_ = TransactionStatus::Done;
×
1386
        DoCancel();
×
1387
        handler(expected::unexpected(err.WithContext(
×
1388
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
1389

1390
        server_.RemoveStream(shared_from_this());
×
1391
}
×
1392

1393
void Stream::CallErrorHandler(
2✔
1394
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1395
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
6✔
1396
}
2✔
1397

1398
void Stream::CallErrorHandler(
8✔
1399
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1400
        status_ = TransactionStatus::Done;
8✔
1401
        DoCancel();
8✔
1402
        handler(
8✔
1403
                req,
1404
                err.WithContext(
8✔
1405
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
24✔
1406

1407
        server_.RemoveStream(shared_from_this());
8✔
1408
}
8✔
1409

1410
void Stream::CallErrorHandler(
4✔
1411
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
1412
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
1413
}
4✔
1414

1415
void Stream::CallErrorHandler(
7✔
1416
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
1417
        status_ = TransactionStatus::Done;
7✔
1418
        DoCancel();
7✔
1419
        handler(err.WithContext(
14✔
1420
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
14✔
1421

1422
        server_.RemoveStream(shared_from_this());
7✔
1423
}
7✔
1424

1425
void Stream::CallErrorHandler(
×
1426
        const error_code &ec, const RequestPtr &req, SwitchProtocolHandler handler) {
1427
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1428
}
×
1429

1430
void Stream::CallErrorHandler(
1✔
1431
        const error::Error &err, const RequestPtr &req, SwitchProtocolHandler handler) {
1432
        status_ = TransactionStatus::Done;
1✔
1433
        DoCancel();
1✔
1434
        handler(expected::unexpected(err.WithContext(
2✔
1435
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
4✔
1436

1437
        server_.RemoveStream(shared_from_this());
1✔
1438
}
1✔
1439

1440
void Stream::AcceptHandler(const error_code &ec) {
227✔
1441
        if (ec) {
227✔
1442
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
1443
                return;
×
1444
        }
1445

1446
        auto ip = socket_.remote_endpoint().address().to_string();
454✔
1447

1448
        // Use IP as context for logging.
1449
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
227✔
1450

1451
        logger_.Debug("Accepted connection.");
454✔
1452

1453
        request_.reset(new IncomingRequest(*this, cancelled_));
454✔
1454

1455
        request_->address_.host = ip;
227✔
1456

1457
        *cancelled_ = false;
227✔
1458

1459
        ReadHeader();
227✔
1460
}
1461

1462
void Stream::ReadHeader() {
227✔
1463
        auto &cancelled = cancelled_;
1464
        auto &request_data = request_data_;
227✔
1465

1466
        http::async_read_some(
454✔
1467
                socket_,
227✔
1468
                *request_data_.request_buffer_,
1469
                *request_data_.http_request_parser_,
1470
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
227✔
1471
                        if (!*cancelled) {
227✔
1472
                                ReadHeaderHandler(ec, num_read);
227✔
1473
                        }
1474
                });
227✔
1475
}
227✔
1476

1477
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
227✔
1478
        if (num_read > 0) {
227✔
1479
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
454✔
1480
        }
1481

1482
        if (ec) {
227✔
1483
                CallErrorHandler(ec, request_, server_.header_handler_);
×
1484
                return;
187✔
1485
        }
1486

1487
        if (!request_data_.http_request_parser_->is_header_done()) {
227✔
1488
                ReadHeader();
×
1489
                return;
×
1490
        }
1491

1492
        auto method_result = BeastVerbToMethod(
1493
                request_data_.http_request_parser_->get().base().method(),
1494
                string {request_data_.http_request_parser_->get().base().method_string()});
454✔
1495
        if (!method_result) {
227✔
1496
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
1497
                return;
×
1498
        }
1499
        request_->method_ = method_result.value();
227✔
1500
        request_->address_.path = string(request_data_.http_request_parser_->get().base().target());
227✔
1501

1502
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
227✔
1503

1504
        string debug_str;
1505
        for (auto header = request_data_.http_request_parser_->get().cbegin();
620✔
1506
                 header != request_data_.http_request_parser_->get().cend();
847✔
1507
                 header++) {
1508
                request_->headers_[string {header->name_string()}] = string {header->value()};
1,860✔
1509
                if (logger_.Level() >= log::LogLevel::Debug) {
620✔
1510
                        debug_str += string {header->name_string()};
535✔
1511
                        debug_str += ": ";
535✔
1512
                        debug_str += string {header->value()};
535✔
1513
                        debug_str += "\n";
535✔
1514
                }
1515
        }
1516

1517
        logger_.Debug("Received headers:\n" + debug_str);
454✔
1518
        debug_str.clear();
1519

1520
        if (GetContentLength(*request_data_.http_request_parser_) == 0
227✔
1521
                && !request_data_.http_request_parser_->chunked()) {
227✔
1522
                auto cancelled = cancelled_;
1523
                status_ = TransactionStatus::HeaderHandlerCalled;
186✔
1524
                server_.header_handler_(request_);
372✔
1525
                if (!*cancelled) {
186✔
1526
                        status_ = TransactionStatus::BodyHandlerCalled;
186✔
1527
                        CallBodyHandler();
186✔
1528
                }
1529
                return;
1530
        }
1531

1532
        assert(!request_data_.http_request_parser_->is_done());
1533

1534
        auto cancelled = cancelled_;
1535
        status_ = TransactionStatus::HeaderHandlerCalled;
41✔
1536
        server_.header_handler_(request_);
82✔
1537
        if (*cancelled) {
41✔
1538
                return;
1539
        }
1540

1541
        // We know that a body reader is required here, because of the check for body above.
1542
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
40✔
1543
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
2✔
1544
        }
1545
}
1546

1547
// Reads the next piece of the request body into [`start`, `end`) and reports the result through
// `handler`. Once the body has been read completely, `handler` receives 0 and the body handler
// is called.
void Stream::AsyncReadNextBodyPart(
	vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
	assert(AtLeast(status_, TransactionStatus::ReaderCreated));

	// First read on a freshly created reader.
	if (status_ == TransactionStatus::ReaderCreated) {
		status_ = TransactionStatus::BodyReadingInProgress;
	}

	if (status_ != TransactionStatus::BodyReadingInProgress) {
		// Nothing more to read: signal EOF. The handler may cancel the stream, so check our own
		// pointer to the flag before touching members again.
		auto cancelled = cancelled_;
		handler(0);
		if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
			status_ = TransactionStatus::BodyHandlerCalled;
			CallBodyHandler();
		}
		return;
	}

	// Remember the destination so that `ReadBodyHandler` can deliver the data.
	reader_buf_start_ = start;
	reader_buf_end_ = end;
	reader_handler_ = handler;

	const size_t requested = end - start;
	const size_t chunk_size = min(body_buffer_.size(), requested);

	auto &parser_body = request_data_.http_request_parser_->get().body();
	parser_body.data = body_buffer_.data();
	parser_body.size = chunk_size;
	request_data_.last_buffer_size_ = chunk_size;

	// Copies captured by the handler: the current cancel flag, and the request data so that the
	// buffer and parser handed to Beast below stay alive.
	auto cancelled = cancelled_;
	auto request_data = request_data_;

	auto on_read = [this, cancelled, request_data](const error_code &ec, size_t num_read) {
		if (*cancelled) {
			return;
		}
		ReadBodyHandler(ec, num_read);
	};

	http::async_read_some(
		socket_, *request_data_.request_buffer_, *request_data_.http_request_parser_, on_read);
}
1588

1589
// Completion handler for body reads. Copies the payload received into the reader's buffer and
// reports its size to the reader handler; on errors both the reader handler and the server's
// body handler are informed.
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
	if (num_read > 0) {
		logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
	}

	// `need_buffer` is not a real error for us, the buffer is reset between reads anyway.
	if (ec == http::make_error_code(http::error::need_buffer)) {
		ec = error_code();
	}

	assert(reader_handler_);

	auto &parser = *request_data_.http_request_parser_;
	if (parser.is_done()) {
		status_ = TransactionStatus::BodyReadingFinished;
	}

	if (ec) {
		// The reader handler may cancel the stream; check our own pointer to the flag
		// afterwards.
		auto cancelled = cancelled_;
		auto read_err = error::Error(ec.default_error_condition(), "Could not read body");
		reader_handler_(expected::unexpected(read_err));
		if (!*cancelled) {
			CallErrorHandler(ec, request_, server_.body_handler_);
		}
		return;
	}

	// `num_read` also counts out of band data such as chunk headers, which is of no interest
	// here. The payload size is therefore derived from how much of the supplied buffer space
	// was consumed.
	const size_t payload_size = request_data_.last_buffer_size_ - parser.get().body().size;
	const size_t reader_capacity = reader_buf_end_ - reader_buf_start_;
	const size_t deliver = min(payload_size, reader_capacity);

	if (deliver == 0) {
		// Nothing but e.g. a chunk header was read. Reporting 0 is not an option, because that
		// means EOF in `io::Reader` context, so repeat the request until payload shows up.
		AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, reader_handler_);
		return;
	}

	copy_n(body_buffer_.begin(), deliver, reader_buf_start_);
	reader_handler_(deliver);
}
1635

1636
// Starts sending the response: prepares the Beast response objects, stores
// `reply_finished_handler` for later and writes the header asynchronously.
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
	SetupResponse();

	reply_finished_handler_ = reply_finished_handler;

	// Copies captured by the handler: the current cancel flag, and the response data so that
	// the serializer handed to Beast below stays alive.
	auto cancelled = cancelled_;
	auto response_data = response_data_;

	auto on_header_written =
		[this, cancelled, response_data](const error_code &ec, size_t num_written) {
			if (*cancelled) {
				return;
			}
			WriteHeaderHandler(ec, num_written);
		};

	http::async_write_header(
		socket_, *response_data_.http_response_serializer_, on_header_written);
}
204✔
1653

1654
void Stream::SetupResponse() {
213✔
1655
        auto response = maybe_response_.lock();
213✔
1656
        // Only called from existing responses, so this should always be true.
1657
        assert(response);
1658

1659
        assert(status_ == TransactionStatus::BodyHandlerCalled);
1660
        status_ = TransactionStatus::Replying;
213✔
1661

1662
        // From here on we take shared ownership.
1663
        response_ = response;
1664

1665
        response_data_.http_response_ = make_shared<http::response<http::buffer_body>>();
426✔
1666

1667
        for (const auto &header : response->headers_) {
442✔
1668
                response_data_.http_response_->base().set(header.first, header.second);
229✔
1669
        }
1670

1671
        response_data_.http_response_->result(response->GetStatusCode());
213✔
1672
        response_data_.http_response_->reason(response->GetStatusMessage());
426✔
1673

1674
        response_data_.http_response_serializer_ =
1675
                make_shared<http::response_serializer<http::buffer_body>>(*response_data_.http_response_);
426✔
1676
}
213✔
1677

1678
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
203✔
1679
        if (num_written > 0) {
203✔
1680
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
406✔
1681
        }
1682

1683
        if (ec) {
203✔
1684
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1685
                return;
37✔
1686
        }
1687

1688
        auto exp_has_body =
1689
                HasBody(response_->GetHeader("Content-Length"), response_->GetHeader("Transfer-Encoding"));
406✔
1690
        if (!exp_has_body) {
203✔
1691
                CallErrorHandler(exp_has_body.error(), request_, reply_finished_handler_);
×
1692
                return;
×
1693
        }
1694
        if (!exp_has_body.value()) {
203✔
1695
                FinishReply();
36✔
1696
                return;
1697
        }
1698

1699
        if (!response_->body_reader_ && !response_->async_body_reader_) {
167✔
1700
                auto err = MakeError(BodyMissingError, "No body reader");
2✔
1701
                CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1702
                return;
1703
        }
1704

1705
        PrepareAndWriteNewBodyBuffer();
166✔
1706
}
1707

1708
void Stream::PrepareAndWriteNewBodyBuffer() {
2,227✔
1709
        // response_->body_reader_ XOR response_->async_body_reader_
1710
        assert(
1711
                (response_->body_reader_ || response_->async_body_reader_)
1712
                && !(response_->body_reader_ && response_->async_body_reader_));
1713

1714
        auto read_handler = [this](io::ExpectedSize read) {
2,228✔
1715
                if (!read) {
2,227✔
1716
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
3✔
1717
                        return;
1✔
1718
                }
1719
                WriteNewBodyBuffer(read.value());
2,226✔
1720
        };
2,227✔
1721

1722
        if (response_->body_reader_) {
2,227✔
1723
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
3,906✔
1724
        } else {
1725
                auto err = response_->async_body_reader_->AsyncRead(
1726
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
274✔
1727
                if (err != error::NoError) {
274✔
1728
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1729
                }
1730
        }
1731
}
2,227✔
1732

1733
void Stream::WriteNewBodyBuffer(size_t size) {
2,226✔
1734
        response_data_.http_response_->body().data = body_buffer_.data();
2,226✔
1735
        response_data_.http_response_->body().size = size;
2,226✔
1736

1737
        if (size > 0) {
2,226✔
1738
                response_data_.http_response_->body().more = true;
2,097✔
1739
        } else {
1740
                response_data_.http_response_->body().more = false;
129✔
1741
        }
1742

1743
        WriteBody();
2,226✔
1744
}
2,226✔
1745

1746
void Stream::WriteBody() {
4,305✔
1747
        auto &cancelled = cancelled_;
1748
        auto &response_data = response_data_;
4,305✔
1749

1750
        http::async_write_some(
8,610✔
1751
                socket_,
4,305✔
1752
                *response_data_.http_response_serializer_,
1753
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
4,265✔
1754
                        if (!*cancelled) {
4,265✔
1755
                                WriteBodyHandler(ec, num_written);
4,265✔
1756
                        }
1757
                });
4,265✔
1758
}
4,305✔
1759

1760
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
4,265✔
1761
        if (num_written > 0) {
4,265✔
1762
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
4,158✔
1763
        }
1764

1765
        if (ec == http::make_error_code(http::error::need_buffer)) {
4,265✔
1766
                // Write next body block.
1767
                PrepareAndWriteNewBodyBuffer();
2,061✔
1768
        } else if (ec) {
2,204✔
1769
                CallErrorHandler(ec, request_, reply_finished_handler_);
12✔
1770
        } else if (num_written > 0) {
2,200✔
1771
                // We are still writing the body.
1772
                WriteBody();
2,079✔
1773
        } else {
1774
                // We are finished.
1775
                FinishReply();
121✔
1776
        }
1777
}
4,265✔
1778

1779
void Stream::FinishReply() {
157✔
1780
        // We are done.
1781
        status_ = TransactionStatus::Done;
157✔
1782
        DoCancel();
157✔
1783
        // Release ownership of Body reader.
1784
        response_->body_reader_.reset();
157✔
1785
        response_->async_body_reader_.reset();
157✔
1786
        reply_finished_handler_(error::NoError);
157✔
1787
        server_.RemoveStream(shared_from_this());
157✔
1788
}
157✔
1789

1790
// Sends the response header and afterwards hands the raw socket to `handler` (see
// `SwitchingProtocolHandler`). Always returns `error::NoError`; failures are reported through
// `handler`.
error::Error Stream::AsyncSwitchProtocol(SwitchProtocolHandler handler) {
	SetupResponse();

	switch_protocol_handler_ = handler;
	status_ = TransactionStatus::SwitchingProtocol;

	// Copies captured by the handler: the current cancel flag, and the response data so that
	// the serializer handed to Beast below stays alive.
	auto cancelled = cancelled_;
	auto response_data = response_data_;

	auto on_header_written =
		[this, cancelled, response_data](const error_code &ec, size_t num_written) {
			if (*cancelled) {
				return;
			}
			SwitchingProtocolHandler(ec, num_written);
		};

	http::async_write_header(
		socket_, *response_data_.http_response_serializer_, on_header_written);

	return error::NoError;
}
1810

1811
// Completion handler for the header write of a protocol switch. On success the socket is moved
// out of the stream, wrapped in a `RawSocket` and passed to the switch-protocol handler; the
// stream itself is removed from the server.
void Stream::SwitchingProtocolHandler(error_code ec, size_t num_written) {
	if (num_written > 0) {
		logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
	}

	if (ec) {
		CallErrorHandler(ec, request_, switch_protocol_handler_);
		return;
	}

	auto moved_socket = make_shared<tcp::socket>(std::move(socket_));
	auto raw_socket =
		make_shared<RawSocket<tcp::socket>>(moved_socket, request_data_.request_buffer_);

	// Take a copy of the handler: removing the stream below may destroy `this`.
	auto handler_copy = switch_protocol_handler_;

	// The remainder of the connection is handled directly on the socket, so flag the stream as
	// cancelled, but do not close the socket.
	*cancelled_ = true;
	cancelled_ = make_shared<bool>(true);
	server_.RemoveStream(shared_from_this());

	handler_copy(raw_socket);
}
1833

1834
void Stream::CallBodyHandler() {
219✔
1835
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1836
        // it immediately destroys, which would destroy this stream as well. At the end of this
1837
        // function, it's ok to destroy it.
1838
        auto stream_ref = shared_from_this();
1839

1840
        server_.body_handler_(request_, error::NoError);
657✔
1841

1842
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1843
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1844
        // request has not been handled correctly.
1845
        auto response = maybe_response_.lock();
219✔
1846
        if (!response) {
219✔
1847
                logger_.Error("Handler produced no response. Closing stream prematurely.");
6✔
1848
                *cancelled_ = true;
3✔
1849
                cancelled_ = make_shared<bool>(true);
3✔
1850
                server_.RemoveStream(shared_from_this());
9✔
1851
        }
1852
}
219✔
1853

1854
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
238✔
1855
        event_loop_ {event_loop},
1856
        acceptor_(GetAsioIoContext(event_loop_)) {
429✔
1857
}
238✔
1858

1859
// Destroying the server stops accepting connections and drops all its streams.
Server::~Server() {
	this->Cancel();
}
238✔
1862

1863
// Variant of AsyncServeUrl() for body handlers of the `RequestHandler` kind: adapts
// `body_handler` to the `IdentifiedRequestHandler` form and delegates to that overload. The
// adapter passes on either the error or the request, never both.
error::Error Server::AsyncServeUrl(
	const string &url, RequestHandler header_handler, RequestHandler body_handler) {
	IdentifiedRequestHandler adapter = [body_handler](IncomingRequestPtr req, error::Error err) {
		if (err != error::NoError) {
			body_handler(expected::unexpected(err));
			return;
		}
		body_handler(req);
	};

	return AsyncServeUrl(url, header_handler, adapter);
}
1874

1875
// Starts listening on `url` and serving requests.
//
// `url` must use the "http" protocol, must not contain a path other than "/", and its host must
// be a literal address. `header_handler` is called when the header of a request has been
// received, `body_handler` when the request is complete or has failed.
//
// Returns `error::NoError` on success, otherwise an error describing which step failed. If a
// step fails after the acceptor has been opened, the acceptor is closed again so that the call
// can be retried.
error::Error Server::AsyncServeUrl(
	const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
	auto err = BreakDownUrl(url, address_);
	if (error::NoError != err) {
		return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
	}

	if (address_.protocol != "http") {
		return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
	}

	if (address_.path.size() > 0 && address_.path != "/") {
		return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
	}

	boost::system::error_code ec;
	auto address = asio::ip::make_address(address_.host, ec);
	if (ec) {
		return error::Error(
			ec.default_error_condition(),
			"Could not construct endpoint from address " + address_.host);
	}

	asio::ip::tcp::endpoint endpoint(address, address_.port);

	ec.clear();
	acceptor_.open(endpoint.protocol(), ec);
	if (ec) {
		// Not closing here: the acceptor was either not opened, or was already open before
		// this call.
		return error::Error(ec.default_error_condition(), "Could not open acceptor");
	}

	// From here on the acceptor was opened by this call, so every failure path has to close it
	// again. Otherwise it would stay open and a later attempt would fail on open().
	auto close_and_fail = [this, &ec](const string &message) -> error::Error {
		error::Error failure(ec.default_error_condition(), message);
		boost::system::error_code close_ec;
		// Best effort, the original failure is what gets reported.
		acceptor_.close(close_ec);
		return failure;
	};

	// Allow address reuse, otherwise we can't re-bind later.
	ec.clear();
	acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
	if (ec) {
		return close_and_fail("Could not set socket options");
	}

	ec.clear();
	acceptor_.bind(endpoint, ec);
	if (ec) {
		return close_and_fail("Could not bind socket");
	}

	ec.clear();
	acceptor_.listen(asio::socket_base::max_listen_connections, ec);
	if (ec) {
		return close_and_fail("Could not start listening");
	}

	header_handler_ = header_handler;
	body_handler_ = body_handler;

	PrepareNewStream();

	return error::NoError;
}
1932

1933
void Server::Cancel() {
258✔
1934
        if (acceptor_.is_open()) {
258✔
1935
                acceptor_.cancel();
217✔
1936
                acceptor_.close();
217✔
1937
        }
1938
        streams_.clear();
1939
}
258✔
1940

1941
uint16_t Server::GetPort() const {
17✔
1942
        return acceptor_.local_endpoint().port();
17✔
1943
}
1944

1945
// Returns a URL built from the fixed address 127.0.0.1 and the port reported by GetPort().
string Server::GetUrl() const {
	string url {"http://127.0.0.1:"};
	url += to_string(GetPort());
	return url;
}
1948

1949
// Creates the response object for `req` and registers it (weakly) with the request's stream.
// Fails with `StreamCancelledError` if the request has been cancelled.
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
	if (*req->cancelled_) {
		return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
	}

	auto &stream = req->stream_;
	OutgoingResponsePtr response(new OutgoingResponse(stream, req->cancelled_));
	stream.maybe_response_ = response;
	return response;
}
1957

1958
// Starts sending `resp` on its stream; `reply_finished_handler` is passed on to the stream.
// Fails with `StreamCancelledError` if the response has been cancelled.
error::Error Server::AsyncReply(
	OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
	const bool stream_gone = *resp->cancelled_;
	if (stream_gone) {
		return MakeError(StreamCancelledError, "Cannot send response");
	}

	auto &stream = resp->stream_;
	stream.AsyncReply(reply_finished_handler);
	return error::NoError;
}
1967

1968
// Creates the asynchronous reader for the body of `req`. Only possible while the transaction is
// in the `HeaderHandlerCalled` state, and only if the request actually carries a body. On
// success the transaction moves to the `ReaderCreated` state.
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
	if (*req->cancelled_) {
		return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
	}

	auto &stream = req->stream_;

	const bool reader_allowed = (stream.status_ == TransactionStatus::HeaderHandlerCalled);
	if (!reader_allowed) {
		error::Error in_progress_err(
			make_error_condition(errc::operation_in_progress),
			"MakeBodyAsyncReader called while reading is in progress");
		return expected::unexpected(in_progress_err);
	}

	auto &parser = *stream.request_data_.http_request_parser_;
	if (GetContentLength(parser) == 0 && !parser.chunked()) {
		return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
	}

	stream.status_ = TransactionStatus::ReaderCreated;
	return make_shared<BodyAsyncReader<Stream>>(stream, req->cancelled_);
}
1988

1989
// Sends `resp` and afterwards hands the raw connection to `handler`, by delegating to the
// response's stream. Fails with `StreamCancelledError` if the response has been cancelled,
// mirroring the check in AsyncReply(): the stream must not be touched in that case.
error::Error Server::AsyncSwitchProtocol(OutgoingResponsePtr resp, SwitchProtocolHandler handler) {
	if (*resp->cancelled_) {
		return MakeError(StreamCancelledError, "Cannot switch protocol");
	}

	return resp->stream_.AsyncSwitchProtocol(handler);
}
1992

1993
void Server::PrepareNewStream() {
444✔
1994
        StreamPtr new_stream {new Stream(*this)};
444✔
1995
        streams_.insert(new_stream);
1996
        AsyncAccept(new_stream);
888✔
1997
}
444✔
1998

1999
// Waits asynchronously for a connection on `stream`'s socket. Once a connection is accepted,
// the stream takes over and a new stream is prepared for the next connection. Errors other than
// cancellation are logged.
void Server::AsyncAccept(StreamPtr stream) {
	auto on_accept = [this, stream](const error_code &ec) {
		if (!ec) {
			stream->AcceptHandler(ec);

			this->PrepareNewStream();
			return;
		}

		// A cancelled accept is expected, e.g. when the server is shut down.
		if (ec != errc::operation_canceled) {
			log::Error("Could not accept connection: " + ec.message());
		}
	};

	acceptor_.async_accept(stream->socket_, on_accept);
}
444✔
2013

2014
// Takes `stream` out of the server's set of streams and cancels it. The by-value `stream`
// parameter keeps the object alive until this function returns.
void Server::RemoveStream(StreamPtr stream) {
	streams_.erase(stream);

	auto &removed = *stream;
	removed.DoCancel();
}
189✔
2019

2020
} // namespace http
2021
} // namespace common
2022
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc