• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1379134403

18 Jul 2024 03:29PM UTC coverage: 75.624% (+0.05%) from 75.571%
1379134403

push

gitlab-ci

vpodzime
fix: Enable support for user:password@host in proxy connections

Ticket: MEN-7402
Changelog: Basic authentication (https://user:password@host/) is now supported for proxy URLs and connections
Signed-off-by: Vratislav Podzimek <vratislav.podzimek@northern.tech>

14 of 17 new or added lines in 1 file covered. (82.35%)

83 existing lines in 1 file now uncovered.

7089 of 9374 relevant lines covered (75.62%)

11630.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.39
/src/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio.hpp>
20
#include <boost/asio/ip/tcp.hpp>
21
#include <boost/asio/ssl/host_name_verification.hpp>
22
#include <boost/asio/ssl/verify_mode.hpp>
23

24
#include <common/common.hpp>
25
#include <common/crypto.hpp>
26

27
namespace mender {
28
namespace common {
29
namespace http {
30

31
namespace common = mender::common;
32
namespace crypto = mender::common::crypto;
33

34
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
35
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
36
const unsigned int BeastHttpVersion = 11;
37

38
namespace asio = boost::asio;
39
namespace http = boost::beast::http;
40

41
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
42

43
static http::verb MethodToBeastVerb(Method method) {
246✔
44
        switch (method) {
246✔
45
        case Method::GET:
46
                return http::verb::get;
47
        case Method::HEAD:
48
                return http::verb::head;
49
        case Method::POST:
50
                return http::verb::post;
51
        case Method::PUT:
52
                return http::verb::put;
53
        case Method::PATCH:
54
                return http::verb::patch;
55
        case Method::CONNECT:
56
                return http::verb::connect;
57
        case Method::Invalid:
58
                // Fallthrough to end (no-op).
59
                break;
60
        }
61
        // Don't use "default" case. This should generate a warning if we ever add any methods. But
62
        // still assert here for safety.
63
        assert(false);
64
        return http::verb::get;
65
}
66

67
static expected::expected<Method, error::Error> BeastVerbToMethod(
225✔
68
        http::verb verb, const string &verb_string) {
69
        switch (verb) {
225✔
70
        case http::verb::get:
189✔
71
                return Method::GET;
72
        case http::verb::head:
×
73
                return Method::HEAD;
74
        case http::verb::post:
13✔
75
                return Method::POST;
76
        case http::verb::put:
23✔
77
                return Method::PUT;
78
        case http::verb::patch:
×
79
                return Method::PATCH;
80
        case http::verb::connect:
×
81
                return Method::CONNECT;
82
        default:
×
83
                return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
×
84
        }
85
}
86

87
template <typename StreamType>
88
class BodyAsyncReader : virtual public io::AsyncReader {
89
public:
90
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
153✔
91
                stream_ {stream},
92
                cancelled_ {cancelled} {
306✔
93
        }
153✔
94
        ~BodyAsyncReader() {
39✔
95
                Cancel();
39✔
96
        }
78✔
97

98
        error::Error AsyncRead(
2,187✔
99
                vector<uint8_t>::iterator start,
100
                vector<uint8_t>::iterator end,
101
                io::AsyncIoHandler handler) override {
102
                if (eof_) {
2,187✔
103
                        handler(0);
×
104
                        return error::NoError;
×
105
                }
106

107
                if (*cancelled_) {
2,187✔
108
                        return error::MakeError(
×
109
                                error::ProgrammingError,
110
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
111
                }
112
                stream_.AsyncReadNextBodyPart(start, end, [this, handler](io::ExpectedSize size) {
12,829✔
113
                        if (size && size.value() == 0) {
6,322✔
114
                                eof_ = true;
125✔
115
                        }
116
                        handler(size);
12,644✔
117
                });
118
                return error::NoError;
2,187✔
119
        }
120

121
        void Cancel() override {
39✔
122
                if (!*cancelled_) {
39✔
123
                        stream_.Cancel();
4✔
124
                }
125
        }
39✔
126

127
private:
128
        StreamType &stream_;
129
        shared_ptr<bool> cancelled_;
130
        bool eof_ {false};
131

132
        friend class Client;
133
        friend class Server;
134
};
135

136
template <typename StreamType>
137
class RawSocket : virtual public io::AsyncReadWriter {
138
public:
139
        RawSocket(shared_ptr<StreamType> stream, shared_ptr<beast::flat_buffer> buffered) :
×
140
                destroying_ {make_shared<bool>(false)},
×
141
                stream_ {stream},
142
                buffered_ {buffered} {
×
143
                // If there are no buffered bytes, then we don't need it.
144
                if (buffered_ && buffered_->size() == 0) {
×
145
                        buffered_.reset();
×
146
                }
147
        }
×
148

149
        ~RawSocket() {
15✔
150
                *destroying_ = true;
15✔
151
                Cancel();
15✔
152
        }
30✔
153

154
        error::Error AsyncRead(
320✔
155
                vector<uint8_t>::iterator start,
156
                vector<uint8_t>::iterator end,
157
                io::AsyncIoHandler handler) override {
158
                // If we have prebuffered bytes, which can happen if the HTTP parser read the
159
                // header and parts of the body in one block, return those first.
160
                if (buffered_) {
320✔
161
                        return DrainPrebufferedData(start, end, handler);
8✔
162
                }
163

164
                read_buffer_ = asio::buffer(&*start, end - start);
316✔
165
                auto &destroying = destroying_;
166
                stream_->async_read_some(
632✔
167
                        read_buffer_,
316✔
168
                        [destroying, handler](const boost::system::error_code &ec, size_t num_read) {
313✔
169
                                if (*destroying) {
313✔
170
                                        return;
171
                                }
172

173
                                if (ec == asio::error::operation_aborted) {
313✔
174
                                        handler(expected::unexpected(error::Error(
12✔
175
                                                make_error_condition(errc::operation_canceled),
6✔
176
                                                "Could not read from socket")));
177
                                } else if (ec) {
310✔
178
                                        handler(expected::unexpected(
12✔
179
                                                error::Error(ec.default_error_condition(), "Could not read from socket")));
12✔
180
                                } else {
181
                                        handler(num_read);
608✔
182
                                }
183
                        });
184
                return error::NoError;
316✔
185
        }
186

187
        error::Error AsyncWrite(
309✔
188
                vector<uint8_t>::const_iterator start,
189
                vector<uint8_t>::const_iterator end,
190
                io::AsyncIoHandler handler) override {
191
                write_buffer_ = asio::buffer(&*start, end - start);
309✔
192
                auto &destroying = destroying_;
193
                stream_->async_write_some(
618✔
194
                        write_buffer_,
309✔
195
                        [destroying, handler](const boost::system::error_code &ec, size_t num_written) {
306✔
196
                                if (*destroying) {
306✔
197
                                        return;
198
                                }
199

200
                                if (ec == asio::error::operation_aborted) {
306✔
201
                                        handler(expected::unexpected(error::Error(
×
202
                                                make_error_condition(errc::operation_canceled),
×
203
                                                "Could not write to socket")));
204
                                } else if (ec) {
306✔
205
                                        handler(expected::unexpected(
×
206
                                                error::Error(ec.default_error_condition(), "Could not write to socket")));
×
207
                                } else {
208
                                        handler(num_written);
612✔
209
                                }
210
                        });
211
                return error::NoError;
309✔
212
        }
213

214
        void Cancel() override {
28✔
215
                if (stream_->lowest_layer().is_open()) {
28✔
216
                        stream_->lowest_layer().cancel();
15✔
217
                        stream_->lowest_layer().close();
15✔
218
                }
219
        }
28✔
220

221
private:
222
        error::Error DrainPrebufferedData(
4✔
223
                vector<uint8_t>::iterator start,
224
                vector<uint8_t>::iterator end,
225
                io::AsyncIoHandler handler) {
226
                size_t to_copy = min(static_cast<size_t>(end - start), buffered_->size());
4✔
227

228
                // These two lines are equivalent to:
229
                //   copy_n(static_cast<const uint8_t *>(buffered_->cdata().data()), to_copy, start);
230
                // but compatible with Boost 1.67.
231
                const beast::flat_buffer &cbuffered = *buffered_;
232
                copy_n(static_cast<const uint8_t *>(cbuffered.data().data()), to_copy, start);
4✔
233
                buffered_->consume(to_copy);
4✔
234
                if (buffered_->size() == 0) {
4✔
235
                        // We don't need it anymore.
236
                        buffered_.reset();
4✔
237
                }
238
                handler(to_copy);
4✔
239
                return error::NoError;
4✔
240
        }
241

242
        shared_ptr<bool> destroying_;
243
        shared_ptr<StreamType> stream_;
244
        shared_ptr<beast::flat_buffer> buffered_;
245
        asio::mutable_buffer read_buffer_;
246
        asio::const_buffer write_buffer_;
247
};
248

249
template <typename PARSER>
250
size_t GetContentLength(const PARSER &parser) {
398✔
251
        auto content_length = parser.content_length();
398✔
252
        if (content_length) {
398✔
253
                return content_length.value();
351✔
254
        } else {
255
                return 0;
256
        }
257
}
258

259
expected::ExpectedBool HasBody(
447✔
260
        const expected::ExpectedString &content_length,
261
        const expected::ExpectedString &transfer_encoding) {
262
        if (transfer_encoding) {
447✔
263
                if (transfer_encoding.value() != "chunked") {
2✔
264
                        return expected::unexpected(error::Error(
×
265
                                make_error_condition(errc::not_supported),
×
266
                                "Unsupported Transfer-Encoding: " + transfer_encoding.value()));
×
267
                }
268
                return true;
269
        }
270

271
        if (content_length) {
445✔
272
                auto length = common::StringToLongLong(content_length.value());
219✔
273
                if (!length || length.value() < 0) {
219✔
274
                        return expected::unexpected(error::Error(
×
275
                                length.error().code,
276
                                "Content-Length contains invalid number: " + content_length.value()));
×
277
                }
278
                return length.value() > 0;
219✔
279
        }
280

281
        return false;
282
}
283

284
Client::Client(
353✔
285
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
286
        event_loop_ {event_loop},
287
        logger_name_ {logger_name},
288
        client_config_ {client},
289
        http_proxy_ {client.http_proxy},
353✔
290
        https_proxy_ {client.https_proxy},
353✔
291
        no_proxy_ {client.no_proxy},
353✔
292
        cancelled_ {make_shared<bool>(true)},
×
293
        resolver_(GetAsioIoContext(event_loop)),
294
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,412✔
295
}
353✔
296

297
Client::~Client() {
2,118✔
298
        if (!*cancelled_) {
353✔
299
                logger_.Warning("Client destroyed while request is still active!");
30✔
300
        }
301
        DoCancel();
353✔
302
}
353✔
303

304
error::Error Client::Initialize() {
284✔
305
        if (initialized_) {
284✔
306
                return error::NoError;
71✔
307
        }
308

309
        for (auto i = 0; i < MENDER_BOOST_BEAST_SSL_CTX_COUNT; i++) {
631✔
310
                ssl_ctx_[i].set_verify_mode(
842✔
311
                        client_config_.skip_verify ? ssl::verify_none : ssl::verify_peer);
312

313
                beast::error_code ec {};
422✔
314
                if (client_config_.client_cert_path != "" and client_config_.client_cert_key_path != "") {
422✔
315
                        ssl_ctx_[i].set_options(boost::asio::ssl::context::default_workarounds);
4✔
316
                        ssl_ctx_[i].use_certificate_file(
317
                                client_config_.client_cert_path, boost::asio::ssl::context_base::pem, ec);
4✔
318
                        if (ec) {
4✔
319
                                return error::Error(
320
                                        ec.default_error_condition(), "Could not load client certificate");
2✔
321
                        }
322
                        auto exp_key = crypto::PrivateKey::Load(
323
                                {client_config_.client_cert_key_path, "", client_config_.ssl_engine});
6✔
324
                        if (!exp_key) {
3✔
325
                                return exp_key.error().WithContext(
326
                                        "Error loading private key from " + client_config_.client_cert_key_path);
2✔
327
                        }
328

329
                        const int ret =
330
                                SSL_CTX_use_PrivateKey(ssl_ctx_[i].native_handle(), exp_key.value().Get());
2✔
331
                        if (ret != 1) {
2✔
332
                                return MakeError(
333
                                        HTTPInitError,
334
                                        "Failed to add the PrivateKey: " + client_config_.client_cert_key_path
×
335
                                                + " to the SSL CTX");
×
336
                        }
337
                } else if (
338
                        client_config_.client_cert_path != "" or client_config_.client_cert_key_path != "") {
418✔
339
                        return error::Error(
340
                                make_error_condition(errc::invalid_argument),
4✔
341
                                "Cannot set only one of client certificate, and client certificate private key");
4✔
342
                }
343

344
                ssl_ctx_[i].set_default_verify_paths(ec); // Load the default CAs
418✔
345
                if (ec) {
418✔
346
                        auto err = error::Error(
347
                                ec.default_error_condition(), "Failed to load the SSL default directory");
×
348
                        if (client_config_.server_cert_path == "") {
×
349
                                // We aren't going to have any valid certificates then.
350
                                return err;
×
351
                        } else {
352
                                // We have a dedicated certificate, so this is not fatal.
353
                                log::Info(err.String());
×
354
                        }
355
                }
356
                if (client_config_.server_cert_path != "") {
418✔
357
                        ssl_ctx_[i].load_verify_file(client_config_.server_cert_path, ec);
52✔
358
                        if (ec) {
52✔
359
                                log::Warning("Failed to load the server certificate! Falling back to the CA store");
4✔
360
                        }
361
                }
362
        }
363

364
        initialized_ = true;
209✔
365

366
        return error::NoError;
209✔
367
}
368

369
// Create the HOST header according to:
370
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.23
371
// In short: Add the port-number if it is non-standard HTTP
372
static string CreateHOSTAddress(OutgoingRequestPtr req) {
272✔
373
        if (req->GetPort() == 80 || req->GetPort() == 443) {
272✔
374
                return req->GetHost();
4✔
375
        }
376
        return req->GetHost() + ":" + to_string(req->GetPort());
536✔
377
}
378

379
error::Error Client::AsyncCall(
284✔
380
        OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
381
        auto err = Initialize();
284✔
382
        if (err != error::NoError) {
284✔
383
                return err;
4✔
384
        }
385

386
        if (!*cancelled_ && status_ != TransactionStatus::Done) {
280✔
387
                return error::Error(
388
                        make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
×
389
        }
390

391
        if (req->address_.protocol == "" || req->address_.host == "" || req->address_.port < 0) {
280✔
392
                return error::MakeError(error::ProgrammingError, "Request is not ready");
4✔
393
        }
394

395
        if (!header_handler || !body_handler) {
278✔
396
                return error::MakeError(
397
                        error::ProgrammingError, "header_handler and body_handler can not be nullptr");
2✔
398
        }
399

400
        if (req->address_.protocol != "http" && req->address_.protocol != "https") {
277✔
401
                return error::Error(
402
                        make_error_condition(errc::protocol_not_supported), req->address_.protocol);
2✔
403
        }
404

405
        logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));
276✔
406

407
        request_ = req;
408

409
        err = HandleProxySetup();
276✔
410
        if (err != error::NoError) {
276✔
411
                return err;
4✔
412
        }
413

414
        // NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
415
        // request to route to our k8s cluster. Set this in all cases.
416
        const string header_url = CreateHOSTAddress(req);
544✔
417
        req->SetHeader("HOST", header_url);
544✔
418

419
        log::Trace("Setting HOST address: " + header_url);
272✔
420

421
        header_handler_ = header_handler;
272✔
422
        body_handler_ = body_handler;
272✔
423
        status_ = TransactionStatus::None;
272✔
424

425
        cancelled_ = make_shared<bool>(false);
272✔
426

427
        auto &cancelled = cancelled_;
428

429
        resolver_.async_resolve(
544✔
430
                request_->address_.host,
431
                to_string(request_->address_.port),
544✔
432
                [this, cancelled](
540✔
433
                        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
267✔
434
                        if (!*cancelled) {
268✔
435
                                ResolveHandler(ec, results);
267✔
436
                        }
437
                });
268✔
438

439
        return error::NoError;
272✔
440
}
441

442
static inline error::Error AddProxyAuthHeader(OutgoingRequest &req, BrokenDownUrl &proxy_address) {
22✔
443
        if (proxy_address.username == "") {
22✔
444
                // nothing to do
445
                return error::NoError;
19✔
446
        }
447
        auto creds = proxy_address.username + ":" + proxy_address.password;
3✔
448
        auto ex_encoded_creds = crypto::EncodeBase64(common::ByteVectorFromString(creds));
6✔
449
        if (!ex_encoded_creds) {
3✔
NEW
450
                return ex_encoded_creds.error();
×
451
        }
452
        req.SetHeader("Proxy-Authorization", "Basic " + ex_encoded_creds.value());
6✔
453

454
        return error::NoError;
3✔
455
}
456

457
error::Error Client::HandleProxySetup() {
276✔
458
        secondary_req_.reset();
276✔
459

460
        if (request_->address_.protocol == "http") {
276✔
461
                socket_mode_ = SocketMode::Plain;
250✔
462

463
                if (http_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
250✔
464
                        // Make a modified proxy request.
465
                        BrokenDownUrl proxy_address;
20✔
466
                        auto err = BreakDownUrl(http_proxy_, proxy_address, true);
11✔
467
                        if (err != error::NoError) {
11✔
468
                                return err.WithContext("HTTP proxy URL is invalid");
2✔
469
                        }
470
                        if (proxy_address.path != "" && proxy_address.path != "/") {
10✔
471
                                return MakeError(
472
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
2✔
473
                        }
474

475
                        request_->address_.path = request_->address_.protocol + "://" + request_->address_.host
18✔
476
                                                                          + ":" + to_string(request_->address_.port)
27✔
477
                                                                          + request_->address_.path;
27✔
478
                        request_->address_.host = proxy_address.host;
9✔
479
                        request_->address_.port = proxy_address.port;
9✔
480
                        request_->address_.protocol = proxy_address.protocol;
9✔
481

482
                        err = AddProxyAuthHeader(*request_, proxy_address);
9✔
483
                        if (err != error::NoError) {
9✔
NEW
484
                                return err;
×
485
                        }
486

487
                        if (proxy_address.protocol == "https") {
9✔
488
                                socket_mode_ = SocketMode::Tls;
5✔
489
                        } else if (proxy_address.protocol == "http") {
4✔
490
                                socket_mode_ = SocketMode::Plain;
4✔
491
                        } else {
492
                                // Should never get here.
493
                                assert(false);
494
                        }
495
                }
496
        } else if (request_->address_.protocol == "https") {
26✔
497
                socket_mode_ = SocketMode::Tls;
26✔
498

499
                if (https_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
26✔
500
                        // Save the original request for later, so that we can make a new request
501
                        // over the channel established by CONNECT.
502
                        secondary_req_ = std::move(request_);
503

504
                        request_ = make_shared<OutgoingRequest>();
30✔
505
                        request_->SetMethod(Method::CONNECT);
15✔
506
                        BrokenDownUrl proxy_address;
28✔
507
                        auto err = BreakDownUrl(https_proxy_, proxy_address, true);
15✔
508
                        if (err != error::NoError) {
15✔
509
                                return err.WithContext("HTTPS proxy URL is invalid");
2✔
510
                        }
511
                        if (proxy_address.path != "" && proxy_address.path != "/") {
14✔
512
                                return MakeError(
513
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
2✔
514
                        }
515

516
                        request_->address_.path =
517
                                secondary_req_->address_.host + ":" + to_string(secondary_req_->address_.port);
26✔
518
                        request_->address_.host = proxy_address.host;
13✔
519
                        request_->address_.port = proxy_address.port;
13✔
520
                        request_->address_.protocol = proxy_address.protocol;
13✔
521

522
                        err = AddProxyAuthHeader(*request_, proxy_address);
13✔
523
                        if (err != error::NoError) {
13✔
NEW
524
                                return err;
×
525
                        }
526

527
                        if (proxy_address.protocol == "https") {
13✔
528
                                socket_mode_ = SocketMode::Tls;
7✔
529
                        } else if (proxy_address.protocol == "http") {
6✔
530
                                socket_mode_ = SocketMode::Plain;
6✔
531
                        } else {
532
                                // Should never get here.
533
                                assert(false);
534
                        }
535
                }
536
        } else {
537
                // Should never get here
538
                assert(false);
539
        }
540

541
        return error::NoError;
272✔
542
}
543

544
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
172✔
545
        if (status_ != TransactionStatus::HeaderHandlerCalled) {
172✔
546
                return expected::unexpected(error::Error(
2✔
547
                        make_error_condition(errc::operation_in_progress),
4✔
548
                        "MakeBodyAsyncReader called while reading is in progress"));
6✔
549
        }
550

551
        if (GetContentLength(*response_data_.http_response_parser_) == 0
170✔
552
                && !response_data_.http_response_parser_->chunked()) {
170✔
553
                return expected::unexpected(
17✔
554
                        MakeError(BodyMissingError, "Response does not contain a body"));
51✔
555
        }
556

557
        status_ = TransactionStatus::ReaderCreated;
153✔
558
        return make_shared<BodyAsyncReader<Client>>(resp->client_.GetHttpClient(), resp->cancelled_);
306✔
559
}
560

561
io::ExpectedAsyncReadWriterPtr Client::SwitchProtocol(IncomingResponsePtr req) {
7✔
562
        if (*cancelled_) {
7✔
UNCOV
563
                return expected::unexpected(error::Error(
×
UNCOV
564
                        make_error_condition(errc::not_connected),
×
UNCOV
565
                        "Cannot switch protocols if endpoint is not connected"));
×
566
        }
567

568
        // Rest of the connection is done directly on the socket, we are done here.
569
        status_ = TransactionStatus::Done;
7✔
570
        *cancelled_ = true;
7✔
571
        cancelled_ = make_shared<bool>(false);
14✔
572

573
        auto stream = stream_;
574
        // This no longer belongs to us.
575
        stream_.reset();
7✔
576

577
        switch (socket_mode_) {
7✔
UNCOV
578
        case SocketMode::TlsTls:
×
UNCOV
579
                return make_shared<RawSocket<ssl::stream<ssl::stream<tcp::socket>>>>(
×
UNCOV
580
                        stream, response_data_.response_buffer_);
×
UNCOV
581
        case SocketMode::Tls:
×
UNCOV
582
                return make_shared<RawSocket<ssl::stream<tcp::socket>>>(
×
UNCOV
583
                        make_shared<ssl::stream<tcp::socket>>(std::move(stream->next_layer())),
×
UNCOV
584
                        response_data_.response_buffer_);
×
585
        case SocketMode::Plain:
7✔
586
                return make_shared<RawSocket<tcp::socket>>(
7✔
587
                        make_shared<tcp::socket>(std::move(stream->next_layer().next_layer())),
14✔
588
                        response_data_.response_buffer_);
7✔
589
        }
590

UNCOV
591
        AssertOrReturnUnexpected(false);
×
592
}
593

594
void Client::CallHandler(ResponseHandler handler) {
355✔
595
        // This function exists to make sure we have a copy of the handler we're calling (in the
596
        // argument list). This is important in case the handler owns the client instance through a
597
        // capture, and it replaces the handler with a different one (using `AsyncCall`). If it
598
        // does, then it destroys the final copy of the handler, and therefore also the client,
599
        // which is why we need to make a copy here, before calling it.
600
        handler(response_);
355✔
601
}
355✔
602

603
void Client::CallErrorHandler(
83✔
604
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
605
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
249✔
606
}
83✔
607

608
void Client::CallErrorHandler(
126✔
609
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
610
        status_ = TransactionStatus::Done;
126✔
611
        DoCancel();
126✔
612
        handler(expected::unexpected(
252✔
613
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
504✔
614
}
126✔
615

616
void Client::ResolveHandler(
267✔
617
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
618
        if (ec) {
267✔
UNCOV
619
                CallErrorHandler(ec, request_, header_handler_);
×
UNCOV
620
                return;
×
621
        }
622

623
        if (logger_.Level() >= log::LogLevel::Debug) {
267✔
624
                string ips = "[";
245✔
625
                string sep;
626
                for (auto r : results) {
1,044✔
627
                        ips += sep;
277✔
628
                        ips += r.endpoint().address().to_string();
277✔
629
                        sep = ", ";
277✔
630
                }
631
                ips += "]";
245✔
632
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
490✔
633
        }
634

635
        resolver_results_ = results;
636

637
        stream_ = make_shared<ssl::stream<ssl::stream<tcp::socket>>>(
267✔
638
                ssl::stream<tcp::socket>(GetAsioIoContext(event_loop_), ssl_ctx_[0]), ssl_ctx_[1]);
534✔
639

640
        if (!response_data_.response_buffer_) {
267✔
641
                // We can reuse this if preexisting.
642
                response_data_.response_buffer_ = make_shared<beast::flat_buffer>();
396✔
643

644
                // This is equivalent to:
645
                //   response_data_.response_buffer_.reserve(body_buffer_.size());
646
                // but compatible with Boost 1.67.
647
                response_data_.response_buffer_->prepare(
648
                        body_buffer_.size() - response_data_.response_buffer_->size());
198✔
649
        }
650

651
        auto &cancelled = cancelled_;
652

653
        asio::async_connect(
267✔
654
                stream_->lowest_layer(),
655
                resolver_results_,
267✔
656
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
534✔
657
                        if (!*cancelled) {
267✔
658
                                switch (socket_mode_) {
267✔
UNCOV
659
                                case SocketMode::TlsTls:
×
660
                                        // Should never happen because we always need to handshake
661
                                        // the innermost Tls first, then the outermost, but the
662
                                        // latter doesn't happen here.
663
                                        assert(false);
UNCOV
664
                                        CallErrorHandler(
×
UNCOV
665
                                                error::MakeError(
×
UNCOV
666
                                                        error::ProgrammingError, "TlsTls mode is invalid in ResolveHandler"),
×
UNCOV
667
                                                request_,
×
UNCOV
668
                                                header_handler_);
×
669
                                case SocketMode::Tls:
21✔
670
                                        return HandshakeHandler(stream_->next_layer(), ec, endpoint);
21✔
671
                                case SocketMode::Plain:
246✔
672
                                        return ConnectHandler(ec, endpoint);
246✔
673
                                }
674
                        }
675
                });
676
}
677

678
template <typename StreamType>
679
void Client::HandshakeHandler(
25✔
680
        StreamType &stream, const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
681
        if (ec) {
25✔
682
                CallErrorHandler(ec, request_, header_handler_);
2✔
683
                return;
2✔
684
        }
685

686
        // Enable TCP keepalive
687
        boost::asio::socket_base::keep_alive option(true);
688
        stream_->lowest_layer().set_option(option);
23✔
689

690
        // Set SNI Hostname (many hosts need this to handshake successfully)
691
        if (!SSL_set_tlsext_host_name(stream.native_handle(), request_->address_.host.c_str())) {
23✔
692
                beast::error_code ec2 {
×
693
                        static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
×
UNCOV
694
                logger_.Error("Failed to set SNI host name: " + ec2.message());
×
695
        }
696

697
        // Enable host name verification (not done automatically and we don't have
698
        // enough access to the TLS internals to use X509_VERIFY_PARAM_set1_host(),
699
        // hence the callback that boost provides).
700
        boost::system::error_code b_ec;
23✔
701
        stream.set_verify_callback(ssl::host_name_verification(request_->address_.host), b_ec);
46✔
702
        if (b_ec) {
23✔
UNCOV
703
                logger_.Error("Failed to enable host name verification: " + b_ec.message());
×
UNCOV
704
                CallErrorHandler(b_ec, request_, header_handler_);
×
UNCOV
705
                return;
×
706
        }
707

708
        auto &cancelled = cancelled_;
709

710
        stream.async_handshake(
46✔
711
                ssl::stream_base::client, [this, cancelled, endpoint](const error_code &ec) {
23✔
712
                        if (*cancelled) {
26✔
713
                                return;
714
                        }
715
                        if (ec) {
26✔
716
                                logger_.Error("https: Failed to perform the SSL handshake: " + ec.message());
20✔
717
                                CallErrorHandler(ec, request_, header_handler_);
10✔
718
                                return;
10✔
719
                        }
720
                        logger_.Debug("https: Successful SSL handshake");
32✔
721
                        ConnectHandler(ec, endpoint);
16✔
722
                });
723
}
724

725

726
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
262✔
727
        if (ec) {
262✔
728
                CallErrorHandler(ec, request_, header_handler_);
16✔
729
                return;
16✔
730
        }
731

732
        // Enable TCP keepalive
733
        boost::asio::socket_base::keep_alive option(true);
734
        stream_->lowest_layer().set_option(option);
246✔
735

736
        logger_.Debug("Connected to " + endpoint.address().to_string());
492✔
737

738
        request_data_.http_request_ = make_shared<http::request<http::buffer_body>>(
246✔
739
                MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);
492✔
740

741
        for (const auto &header : request_->headers_) {
646✔
742
                request_data_.http_request_->set(header.first, header.second);
400✔
743
        }
744

745
        request_data_.http_request_serializer_ =
746
                make_shared<http::request_serializer<http::buffer_body>>(*request_data_.http_request_);
246✔
747

748
        response_data_.http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();
492✔
749

750
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
751
        // if they do, they should be handled higher up in the application logic.
752
        //
753
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
754
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
755
        // `has_value()` in their code, causing their subsequent comparison operation to
756
        // misbehave. So pass highest possible value instead.
757
        response_data_.http_response_parser_->body_limit(numeric_limits<uint64_t>::max());
758

759
        auto &cancelled = cancelled_;
760
        auto &request_data = request_data_;
246✔
761

762
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
246✔
763
                if (!*cancelled) {
246✔
764
                        WriteHeaderHandler(ec, num_written);
246✔
765
                }
766
        };
492✔
767

768
        switch (socket_mode_) {
246✔
769
        case SocketMode::TlsTls:
2✔
770
                http::async_write_header(*stream_, *request_data_.http_request_serializer_, handler);
2✔
771
                break;
772
        case SocketMode::Tls:
14✔
773
                http::async_write_header(
14✔
774
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
775
                break;
776
        case SocketMode::Plain:
230✔
777
                http::async_write_header(
230✔
778
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
779
                break;
780
        }
781
}
782

783
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
246✔
784
        if (num_written > 0) {
246✔
785
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
492✔
786
        }
787

788
        if (ec) {
246✔
UNCOV
789
                CallErrorHandler(ec, request_, header_handler_);
×
790
                return;
206✔
791
        }
792

793
        auto exp_has_body =
794
                HasBody(request_->GetHeader("Content-Length"), request_->GetHeader("Transfer-Encoding"));
492✔
795
        if (!exp_has_body) {
246✔
UNCOV
796
                CallErrorHandler(exp_has_body.error(), request_, header_handler_);
×
UNCOV
797
                return;
×
798
        }
799
        if (!exp_has_body.value()) {
246✔
800
                ReadHeader();
205✔
801
                return;
802
        }
803

804
        if (!request_->body_gen_ && !request_->async_body_gen_) {
41✔
805
                auto err = MakeError(BodyMissingError, "No body generator");
2✔
806
                CallErrorHandler(err, request_, header_handler_);
2✔
807
                return;
808
        }
809

810
        assert(!(request_->body_gen_ && request_->async_body_gen_));
811

812
        if (request_->body_gen_) {
40✔
813
                auto body_reader = request_->body_gen_();
34✔
814
                if (!body_reader) {
34✔
UNCOV
815
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
816
                        return;
817
                }
818
                request_->body_reader_ = body_reader.value();
34✔
819
        } else {
820
                auto body_reader = request_->async_body_gen_();
6✔
821
                if (!body_reader) {
6✔
822
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
823
                        return;
824
                }
825
                request_->async_body_reader_ = body_reader.value();
6✔
826
        }
827

828
        PrepareAndWriteNewBodyBuffer();
40✔
829
}
830

831
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,278✔
832
        if (num_written > 0) {
2,278✔
833
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
2,242✔
834
        }
835

836
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,278✔
837
                // Write next block of the body.
838
                PrepareAndWriteNewBodyBuffer();
1,120✔
839
        } else if (ec) {
1,158✔
840
                CallErrorHandler(ec, request_, header_handler_);
8✔
841
        } else if (num_written > 0) {
1,154✔
842
                // We are still writing the body.
843
                WriteBody();
1,121✔
844
        } else {
845
                // We are ready to receive the response.
846
                ReadHeader();
33✔
847
        }
848
}
2,278✔
849

850
void Client::PrepareAndWriteNewBodyBuffer() {
1,160✔
851
        // request_->body_reader_ XOR request_->async_body_reader_
852
        assert(
853
                (request_->body_reader_ || request_->async_body_reader_)
854
                && !(request_->body_reader_ && request_->async_body_reader_));
855

856
        auto cancelled = cancelled_;
857
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
3,592✔
858
                if (!*cancelled) {
1,160✔
859
                        if (!read) {
1,159✔
860
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
861
                                return;
2✔
862
                        }
863
                        WriteNewBodyBuffer(read.value());
1,157✔
864
                }
865
        };
1,160✔
866

867

868
        if (request_->body_reader_) {
1,160✔
869
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
1,470✔
870
        } else {
871
                auto err = request_->async_body_reader_->AsyncRead(
872
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
850✔
873
                if (err != error::NoError) {
425✔
UNCOV
874
                        CallErrorHandler(err, request_, header_handler_);
×
875
                }
876
        }
877
}
1,160✔
878

879
void Client::WriteNewBodyBuffer(size_t size) {
1,157✔
880
        request_data_.http_request_->body().data = body_buffer_.data();
1,157✔
881
        request_data_.http_request_->body().size = size;
1,157✔
882

883
        if (size > 0) {
1,157✔
884
                request_data_.http_request_->body().more = true;
1,124✔
885
        } else {
886
                // Release ownership of Body reader.
887
                request_->body_reader_.reset();
33✔
888
                request_->async_body_reader_.reset();
33✔
889
                request_data_.http_request_->body().more = false;
33✔
890
        }
891

892
        WriteBody();
1,157✔
893
}
1,157✔
894

895
void Client::WriteBody() {
2,278✔
896
        auto &cancelled = cancelled_;
897
        auto &request_data = request_data_;
2,278✔
898

899
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
2,278✔
900
                if (!*cancelled) {
2,278✔
901
                        WriteBodyHandler(ec, num_written);
2,278✔
902
                }
903
        };
4,556✔
904

905
        switch (socket_mode_) {
2,278✔
UNCOV
906
        case SocketMode::TlsTls:
×
907
                http::async_write_some(*stream_, *request_data_.http_request_serializer_, handler);
908
                break;
UNCOV
909
        case SocketMode::Tls:
×
910
                http::async_write_some(
911
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
912
                break;
913
        case SocketMode::Plain:
2,278✔
914
                http::async_write_some(
915
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
916
                break;
917
        }
918
}
2,278✔
919

920
void Client::ReadHeader() {
238✔
921
        auto &cancelled = cancelled_;
922
        auto &response_data = response_data_;
238✔
923

924
        auto handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
235✔
925
                if (!*cancelled) {
235✔
926
                        ReadHeaderHandler(ec, num_read);
235✔
927
                }
928
        };
476✔
929

930
        switch (socket_mode_) {
238✔
931
        case SocketMode::TlsTls:
2✔
932
                http::async_read_some(
2✔
933
                        *stream_,
934
                        *response_data_.response_buffer_,
935
                        *response_data_.http_response_parser_,
936
                        handler);
937
                break;
938
        case SocketMode::Tls:
14✔
939
                http::async_read_some(
14✔
940
                        stream_->next_layer(),
941
                        *response_data_.response_buffer_,
942
                        *response_data_.http_response_parser_,
943
                        handler);
944
                break;
945
        case SocketMode::Plain:
222✔
946
                http::async_read_some(
222✔
947
                        stream_->next_layer().next_layer(),
948
                        *response_data_.response_buffer_,
949
                        *response_data_.http_response_parser_,
950
                        handler);
951
                break;
952
        }
953
}
238✔
954

955
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
235✔
956
        if (num_read > 0) {
235✔
957
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
460✔
958
        }
959

960
        if (ec) {
235✔
961
                CallErrorHandler(ec, request_, header_handler_);
5✔
962
                return;
65✔
963
        }
964

965
        if (!response_data_.http_response_parser_->is_header_done()) {
230✔
UNCOV
966
                ReadHeader();
×
UNCOV
967
                return;
×
968
        }
969

970
        if (secondary_req_) {
230✔
971
                HandleSecondaryRequest();
9✔
972
                return;
9✔
973
        }
974

975
        response_.reset(new IncomingResponse(*this, cancelled_));
442✔
976
        response_->status_code_ = response_data_.http_response_parser_->get().result_int();
221✔
977
        response_->status_message_ = string {response_data_.http_response_parser_->get().reason()};
221✔
978

979
        logger_.Debug(
442✔
980
                "Received response: " + to_string(response_->status_code_) + " "
442✔
981
                + response_->status_message_);
663✔
982

983
        string debug_str;
984
        for (auto header = response_data_.http_response_parser_->get().cbegin();
254✔
985
                 header != response_data_.http_response_parser_->get().cend();
475✔
986
                 header++) {
987
                response_->headers_[string {header->name_string()}] = string {header->value()};
762✔
988
                if (logger_.Level() >= log::LogLevel::Debug) {
254✔
989
                        debug_str += string {header->name_string()};
239✔
990
                        debug_str += ": ";
239✔
991
                        debug_str += string {header->value()};
239✔
992
                        debug_str += "\n";
239✔
993
                }
994
        }
995

996
        logger_.Debug("Received headers:\n" + debug_str);
442✔
997
        debug_str.clear();
998

999
        if (GetContentLength(*response_data_.http_response_parser_) == 0
221✔
1000
                && !response_data_.http_response_parser_->chunked()) {
221✔
1001
                auto cancelled = cancelled_;
1002
                status_ = TransactionStatus::HeaderHandlerCalled;
48✔
1003
                CallHandler(header_handler_);
96✔
1004
                if (!*cancelled) {
48✔
1005
                        status_ = TransactionStatus::Done;
42✔
1006
                        if (response_->status_code_ != StatusCode::StatusSwitchingProtocols) {
42✔
1007
                                // Make an exception for 101 Switching Protocols response, where the TCP connection
1008
                                // is meant to be reused.
1009
                                DoCancel();
38✔
1010
                        }
1011
                        CallHandler(body_handler_);
84✔
1012
                }
1013
                return;
1014
        }
1015

1016
        auto cancelled = cancelled_;
1017
        status_ = TransactionStatus::HeaderHandlerCalled;
173✔
1018
        CallHandler(header_handler_);
346✔
1019
        if (*cancelled) {
173✔
1020
                return;
1021
        }
1022

1023
        // We know that a body reader is required here, because of the check for body above.
1024
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
170✔
1025
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
36✔
1026
        }
1027
}
1028

1029
void Client::HandleSecondaryRequest() {
9✔
1030
        logger_.Debug(
18✔
1031
                "Received proxy response: "
1032
                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
18✔
1033
                + string {response_data_.http_response_parser_->get().reason()});
36✔
1034

1035
        request_ = std::move(secondary_req_);
1036

1037
        if (response_data_.http_response_parser_->get().result_int() != StatusOK) {
9✔
1038
                auto err = MakeError(
1039
                        ProxyError,
1040
                        "Proxy returned unexpected response: "
1041
                                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
4✔
1042
                                + string {response_data_.http_response_parser_->get().reason()});
6✔
1043
                CallErrorHandler(err, request_, header_handler_);
4✔
1044
                return;
1045
        }
1046

1047
        if (GetContentLength(*response_data_.http_response_parser_) != 0
7✔
1048
                || response_data_.http_response_parser_->chunked()) {
7✔
UNCOV
1049
                auto err = MakeError(ProxyError, "Body not allowed in proxy response");
×
UNCOV
1050
                CallErrorHandler(err, request_, header_handler_);
×
1051
                return;
1052
        }
1053

1054
        // We are connected. Now repeat the request cycle with the original request. Pretend
1055
        // we were just connected.
1056

1057
        assert(request_->GetProtocol() == "https");
1058

1059
        // Make sure that no data is "lost" inside the buffering mechanism, since when switching to
1060
        // a different layer, this will get out of sync.
1061
        assert(response_data_.response_buffer_->size() == 0);
1062

1063
        switch (socket_mode_) {
7✔
UNCOV
1064
        case SocketMode::TlsTls:
×
1065
                // Should never get here, because this is the only place where TlsTls mode
1066
                // is supposed to be turned on.
1067
                assert(false);
UNCOV
1068
                CallErrorHandler(
×
UNCOV
1069
                        error::MakeError(
×
1070
                                error::ProgrammingError,
UNCOV
1071
                                "Any other mode than Tls is not valid when handling secondary request"),
×
UNCOV
1072
                        request_,
×
UNCOV
1073
                        header_handler_);
×
1074
                break;
×
1075
        case SocketMode::Tls:
3✔
1076
                // Upgrade to TLS inside TLS.
1077
                socket_mode_ = SocketMode::TlsTls;
3✔
1078
                HandshakeHandler(*stream_, error_code {}, stream_->lowest_layer().remote_endpoint());
3✔
1079
                break;
3✔
1080
        case SocketMode::Plain:
4✔
1081
                // Upgrade to TLS.
1082
                socket_mode_ = SocketMode::Tls;
4✔
1083
                HandshakeHandler(
4✔
1084
                        stream_->next_layer(), error_code {}, stream_->lowest_layer().remote_endpoint());
4✔
1085
                break;
4✔
1086
        }
1087
}
1088

1089
void Client::AsyncReadNextBodyPart(
4,213✔
1090
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
1091
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
1092

1093
        if (status_ == TransactionStatus::ReaderCreated) {
4,213✔
1094
                status_ = TransactionStatus::BodyReadingInProgress;
151✔
1095
        }
1096

1097
        if (AtLeast(status_, TransactionStatus::BodyReadingFinished)) {
4,213✔
1098
                auto cancelled = cancelled_;
1099
                handler(0);
184✔
1100
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
92✔
1101
                        status_ = TransactionStatus::Done;
92✔
1102
                        DoCancel();
92✔
1103
                        CallHandler(body_handler_);
184✔
1104
                }
1105
                return;
1106
        }
1107

1108
        reader_buf_start_ = start;
4,121✔
1109
        reader_buf_end_ = end;
4,121✔
1110
        reader_handler_ = handler;
4,121✔
1111
        size_t read_size = end - start;
4,121✔
1112
        size_t smallest = min(body_buffer_.size(), read_size);
6,234✔
1113

1114
        response_data_.http_response_parser_->get().body().data = body_buffer_.data();
4,121✔
1115
        response_data_.http_response_parser_->get().body().size = smallest;
4,121✔
1116
        response_data_.last_buffer_size_ = smallest;
4,121✔
1117

1118
        auto &cancelled = cancelled_;
1119
        auto &response_data = response_data_;
4,121✔
1120

1121
        auto async_handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
4,120✔
1122
                if (!*cancelled) {
4,120✔
1123
                        ReadBodyHandler(ec, num_read);
4,120✔
1124
                }
1125
        };
8,242✔
1126

1127
        switch (socket_mode_) {
4,121✔
1128
        case SocketMode::TlsTls:
2✔
1129
                http::async_read_some(
2✔
1130
                        *stream_,
1131
                        *response_data_.response_buffer_,
1132
                        *response_data_.http_response_parser_,
1133
                        async_handler);
1134
                break;
1135
        case SocketMode::Tls:
4✔
1136
                http::async_read_some(
4✔
1137
                        stream_->next_layer(),
1138
                        *response_data_.response_buffer_,
1139
                        *response_data_.http_response_parser_,
1140
                        async_handler);
1141
                break;
1142
        case SocketMode::Plain:
4,115✔
1143
                http::async_read_some(
4,115✔
1144
                        stream_->next_layer().next_layer(),
1145
                        *response_data_.response_buffer_,
1146
                        *response_data_.http_response_parser_,
1147
                        async_handler);
1148
                break;
1149
        }
1150
}
1151

1152
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
4,120✔
1153
        if (num_read > 0) {
4,120✔
1154
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
8,140✔
1155
        }
1156

1157
        if (ec == http::make_error_code(http::error::need_buffer)) {
4,120✔
1158
                // This can be ignored. We always reset the buffer between reads anyway.
1159
                ec = error_code();
1,958✔
1160
        }
1161

1162
        assert(reader_handler_);
1163

1164
        if (response_data_.http_response_parser_->is_done()) {
4,120✔
1165
                status_ = TransactionStatus::BodyReadingFinished;
98✔
1166
        }
1167

1168
        auto cancelled = cancelled_;
1169

1170
        if (ec) {
4,120✔
1171
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
100✔
1172
                reader_handler_(expected::unexpected(err));
150✔
1173
                if (!*cancelled) {
50✔
1174
                        CallErrorHandler(ec, request_, body_handler_);
92✔
1175
                }
1176
                return;
1177
        }
1178

1179
        // The num_read from above includes out of band payload data, such as chunk headers, which
1180
        // we are not interested in. So we need to calculate the payload size from the remaining
1181
        // buffer space.
1182
        size_t payload_read =
1183
                response_data_.last_buffer_size_ - response_data_.http_response_parser_->get().body().size;
4,070✔
1184

1185
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
4,070✔
1186
        size_t smallest = min(payload_read, buf_size);
4,070✔
1187

1188
        if (smallest == 0) {
4,070✔
1189
                // We read nothing, which can happen if all we read was a chunk header. We cannot
1190
                // return 0 to the handler however, because in `io::Reader` context this means
1191
                // EOF. So just repeat the request instead, until we get actual payload data.
1192
                AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, reader_handler_);
154✔
1193
        } else {
1194
                copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
3,993✔
1195
                reader_handler_(smallest);
7,986✔
1196
        }
1197
}
1198

1199
void Client::Cancel() {
147✔
1200
        auto cancelled = cancelled_;
1201

1202
        if (!*cancelled) {
147✔
1203
                auto err =
1204
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
42✔
1205
                switch (status_) {
21✔
1206
                case TransactionStatus::None:
3✔
1207
                        CallErrorHandler(err, request_, header_handler_);
3✔
1208
                        break;
3✔
1209
                case TransactionStatus::HeaderHandlerCalled:
17✔
1210
                case TransactionStatus::ReaderCreated:
1211
                case TransactionStatus::BodyReadingInProgress:
1212
                case TransactionStatus::BodyReadingFinished:
1213
                        CallErrorHandler(err, request_, body_handler_);
17✔
1214
                        break;
17✔
1215
                case TransactionStatus::Replying:
1216
                case TransactionStatus::SwitchingProtocol:
1217
                        // Not used by client.
1218
                        assert(false);
1219
                        break;
1220
                case TransactionStatus::BodyHandlerCalled:
1221
                case TransactionStatus::Done:
1222
                        break;
1223
                }
1224
        }
1225

1226
        if (!*cancelled) {
147✔
1227
                DoCancel();
1✔
1228
        }
1229
}
147✔
1230

1231
void Client::DoCancel() {
610✔
1232
        resolver_.cancel();
610✔
1233
        if (stream_) {
610✔
1234
                stream_->lowest_layer().cancel();
260✔
1235
                stream_->lowest_layer().close();
260✔
1236
                stream_.reset();
260✔
1237
        }
1238

1239
        // Reset logger to no connection.
1240
        logger_ = log::Logger(logger_name_);
610✔
1241

1242
        // Set cancel state and then make a new one. Those who are interested should have their own
1243
        // pointer to the old one.
1244
        *cancelled_ = true;
610✔
1245
        cancelled_ = make_shared<bool>(true);
610✔
1246
}
610✔
1247

1248
Stream::Stream(Server &server) :
441✔
1249
        server_ {server},
1250
        logger_ {"http"},
1251
        cancelled_(make_shared<bool>(true)),
441✔
1252
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
441✔
1253
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,323✔
1254
        request_data_.request_buffer_ = make_shared<beast::flat_buffer>();
882✔
1255

1256
        // This is equivalent to:
1257
        //   request_data_.request_buffer_.reserve(body_buffer_.size());
1258
        // but compatible with Boost 1.67.
1259
        request_data_.request_buffer_->prepare(
1260
                body_buffer_.size() - request_data_.request_buffer_->size());
441✔
1261

1262
        request_data_.http_request_parser_ = make_shared<http::request_parser<http::buffer_body>>();
882✔
1263

1264
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
1265
        // they do, they should be handled higher up in the application logic.
1266
        //
1267
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
1268
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
1269
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
1270
        // possible value instead.
1271
        request_data_.http_request_parser_->body_limit(numeric_limits<uint64_t>::max());
1272
}
441✔
1273

1274
Stream::~Stream() {
1,323✔
1275
        DoCancel();
441✔
1276
}
441✔
1277

1278
void Stream::Cancel() {
7✔
1279
        auto cancelled = cancelled_;
1280

1281
        if (!*cancelled) {
7✔
1282
                auto err =
1283
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
1284
                switch (status_) {
7✔
UNCOV
1285
                case TransactionStatus::None:
×
UNCOV
1286
                        CallErrorHandler(err, request_, server_.header_handler_);
×
UNCOV
1287
                        break;
×
1288
                case TransactionStatus::HeaderHandlerCalled:
5✔
1289
                case TransactionStatus::ReaderCreated:
1290
                case TransactionStatus::BodyReadingInProgress:
1291
                case TransactionStatus::BodyReadingFinished:
1292
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
1293
                        break;
5✔
UNCOV
1294
                case TransactionStatus::BodyHandlerCalled:
×
1295
                        // In between body handler and reply finished. No one to handle the status
1296
                        // here.
UNCOV
1297
                        server_.RemoveStream(shared_from_this());
×
UNCOV
1298
                        break;
×
1299
                case TransactionStatus::Replying:
1✔
1300
                        CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1301
                        break;
1✔
1302
                case TransactionStatus::SwitchingProtocol:
1✔
1303
                        CallErrorHandler(err, request_, switch_protocol_handler_);
3✔
1304
                        break;
1✔
1305
                case TransactionStatus::Done:
1306
                        break;
1307
                }
1308
        }
1309

1310
        if (!*cancelled) {
7✔
1311
                DoCancel();
×
1312
        }
1313
}
7✔
1314

1315
void Stream::DoCancel() {
801✔
1316
        if (socket_.is_open()) {
801✔
1317
                socket_.cancel();
217✔
1318
                socket_.close();
217✔
1319
        }
1320

1321
        // Set cancel state and then make a new one. Those who are interested should have their own
1322
        // pointer to the old one.
1323
        *cancelled_ = true;
801✔
1324
        cancelled_ = make_shared<bool>(true);
801✔
1325
}
801✔
1326

UNCOV
1327
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
UNCOV
1328
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
UNCOV
1329
}
×
1330

UNCOV
1331
void Stream::CallErrorHandler(
×
1332
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
UNCOV
1333
        status_ = TransactionStatus::Done;
×
UNCOV
1334
        DoCancel();
×
UNCOV
1335
        handler(expected::unexpected(err.WithContext(
×
1336
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
1337

UNCOV
1338
        server_.RemoveStream(shared_from_this());
×
UNCOV
1339
}
×
1340

1341
void Stream::CallErrorHandler(
2✔
1342
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1343
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
6✔
1344
}
2✔
1345

1346
void Stream::CallErrorHandler(
8✔
1347
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1348
        status_ = TransactionStatus::Done;
8✔
1349
        DoCancel();
8✔
1350
        handler(
8✔
1351
                req,
1352
                err.WithContext(
8✔
1353
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
24✔
1354

1355
        server_.RemoveStream(shared_from_this());
8✔
1356
}
8✔
1357

1358
void Stream::CallErrorHandler(
4✔
1359
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
1360
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
1361
}
4✔
1362

1363
void Stream::CallErrorHandler(
7✔
1364
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
1365
        status_ = TransactionStatus::Done;
7✔
1366
        DoCancel();
7✔
1367
        handler(err.WithContext(
14✔
1368
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
14✔
1369

1370
        server_.RemoveStream(shared_from_this());
7✔
1371
}
7✔
1372

UNCOV
1373
void Stream::CallErrorHandler(
×
1374
        const error_code &ec, const RequestPtr &req, SwitchProtocolHandler handler) {
UNCOV
1375
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
UNCOV
1376
}
×
1377

1378
void Stream::CallErrorHandler(
1✔
1379
        const error::Error &err, const RequestPtr &req, SwitchProtocolHandler handler) {
1380
        status_ = TransactionStatus::Done;
1✔
1381
        DoCancel();
1✔
1382
        handler(expected::unexpected(err.WithContext(
2✔
1383
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
4✔
1384

1385
        server_.RemoveStream(shared_from_this());
1✔
1386
}
1✔
1387

1388
void Stream::AcceptHandler(const error_code &ec) {
225✔
1389
        if (ec) {
225✔
UNCOV
1390
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
UNCOV
1391
                return;
×
1392
        }
1393

1394
        auto ip = socket_.remote_endpoint().address().to_string();
450✔
1395

1396
        // Use IP as context for logging.
1397
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
225✔
1398

1399
        logger_.Debug("Accepted connection.");
450✔
1400

1401
        request_.reset(new IncomingRequest(*this, cancelled_));
450✔
1402

1403
        request_->address_.host = ip;
225✔
1404

1405
        *cancelled_ = false;
225✔
1406

1407
        ReadHeader();
225✔
1408
}
1409

1410
void Stream::ReadHeader() {
225✔
1411
        auto &cancelled = cancelled_;
1412
        auto &request_data = request_data_;
225✔
1413

1414
        http::async_read_some(
450✔
1415
                socket_,
225✔
1416
                *request_data_.request_buffer_,
1417
                *request_data_.http_request_parser_,
1418
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
225✔
1419
                        if (!*cancelled) {
225✔
1420
                                ReadHeaderHandler(ec, num_read);
225✔
1421
                        }
1422
                });
225✔
1423
}
225✔
1424

1425
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
225✔
1426
        if (num_read > 0) {
225✔
1427
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
450✔
1428
        }
1429

1430
        if (ec) {
225✔
UNCOV
1431
                CallErrorHandler(ec, request_, server_.header_handler_);
×
1432
                return;
185✔
1433
        }
1434

1435
        if (!request_data_.http_request_parser_->is_header_done()) {
225✔
UNCOV
1436
                ReadHeader();
×
UNCOV
1437
                return;
×
1438
        }
1439

1440
        auto method_result = BeastVerbToMethod(
1441
                request_data_.http_request_parser_->get().base().method(),
1442
                string {request_data_.http_request_parser_->get().base().method_string()});
450✔
1443
        if (!method_result) {
225✔
UNCOV
1444
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
UNCOV
1445
                return;
×
1446
        }
1447
        request_->method_ = method_result.value();
225✔
1448
        request_->address_.path = string(request_data_.http_request_parser_->get().base().target());
225✔
1449

1450
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
225✔
1451

1452
        string debug_str;
1453
        for (auto header = request_data_.http_request_parser_->get().cbegin();
391✔
1454
                 header != request_data_.http_request_parser_->get().cend();
616✔
1455
                 header++) {
1456
                request_->headers_[string {header->name_string()}] = string {header->value()};
1,173✔
1457
                if (logger_.Level() >= log::LogLevel::Debug) {
391✔
1458
                        debug_str += string {header->name_string()};
326✔
1459
                        debug_str += ": ";
326✔
1460
                        debug_str += string {header->value()};
326✔
1461
                        debug_str += "\n";
326✔
1462
                }
1463
        }
1464

1465
        logger_.Debug("Received headers:\n" + debug_str);
450✔
1466
        debug_str.clear();
1467

1468
        if (GetContentLength(*request_data_.http_request_parser_) == 0
225✔
1469
                && !request_data_.http_request_parser_->chunked()) {
225✔
1470
                auto cancelled = cancelled_;
1471
                status_ = TransactionStatus::HeaderHandlerCalled;
184✔
1472
                server_.header_handler_(request_);
368✔
1473
                if (!*cancelled) {
184✔
1474
                        status_ = TransactionStatus::BodyHandlerCalled;
184✔
1475
                        CallBodyHandler();
184✔
1476
                }
1477
                return;
1478
        }
1479

1480
        assert(!request_data_.http_request_parser_->is_done());
1481

1482
        auto cancelled = cancelled_;
1483
        status_ = TransactionStatus::HeaderHandlerCalled;
41✔
1484
        server_.header_handler_(request_);
82✔
1485
        if (*cancelled) {
41✔
1486
                return;
1487
        }
1488

1489
        // We know that a body reader is required here, because of the check for body above.
1490
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
40✔
1491
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
2✔
1492
        }
1493
}
1494

1495
void Stream::AsyncReadNextBodyPart(
2,264✔
1496
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
1497
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
1498

1499
        if (status_ == TransactionStatus::ReaderCreated) {
2,264✔
1500
                status_ = TransactionStatus::BodyReadingInProgress;
39✔
1501
        }
1502

1503
        if (status_ != TransactionStatus::BodyReadingInProgress) {
2,264✔
1504
                auto cancelled = cancelled_;
1505
                handler(0);
66✔
1506
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
33✔
1507
                        status_ = TransactionStatus::BodyHandlerCalled;
33✔
1508
                        CallBodyHandler();
33✔
1509
                }
1510
                return;
1511
        }
1512

1513
        reader_buf_start_ = start;
2,231✔
1514
        reader_buf_end_ = end;
2,231✔
1515
        reader_handler_ = handler;
2,231✔
1516
        size_t read_size = end - start;
2,231✔
1517
        size_t smallest = min(body_buffer_.size(), read_size);
3,287✔
1518

1519
        request_data_.http_request_parser_->get().body().data = body_buffer_.data();
2,231✔
1520
        request_data_.http_request_parser_->get().body().size = smallest;
2,231✔
1521
        request_data_.last_buffer_size_ = smallest;
2,231✔
1522

1523
        auto &cancelled = cancelled_;
1524
        auto &request_data = request_data_;
2,231✔
1525

1526
        http::async_read_some(
4,462✔
1527
                socket_,
2,231✔
1528
                *request_data_.request_buffer_,
1529
                *request_data_.http_request_parser_,
1530
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
2,231✔
1531
                        if (!*cancelled) {
2,231✔
1532
                                ReadBodyHandler(ec, num_read);
2,231✔
1533
                        }
1534
                });
2,231✔
1535
}
1536

1537
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
2,231✔
1538
        if (num_read > 0) {
2,231✔
1539
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
4,454✔
1540
        }
1541

1542
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,231✔
1543
                // This can be ignored. We always reset the buffer between reads anyway.
1544
                ec = error_code();
979✔
1545
        }
1546

1547
        assert(reader_handler_);
1548

1549
        if (request_data_.http_request_parser_->is_done()) {
2,231✔
1550
                status_ = TransactionStatus::BodyReadingFinished;
33✔
1551
        }
1552

1553
        auto cancelled = cancelled_;
1554

1555
        if (ec) {
2,231✔
1556
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
8✔
1557
                reader_handler_(expected::unexpected(err));
12✔
1558
                if (!*cancelled) {
4✔
1559
                        CallErrorHandler(ec, request_, server_.body_handler_);
4✔
1560
                }
1561
                return;
1562
        }
1563

1564
        // The num_read from above includes out of band payload data, such as chunk headers, which
1565
        // we are not interested in. So we need to calculate the payload size from the remaining
1566
        // buffer space.
1567
        size_t payload_read =
1568
                request_data_.last_buffer_size_ - request_data_.http_request_parser_->get().body().size;
2,227✔
1569

1570
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
2,227✔
1571
        size_t smallest = min(payload_read, buf_size);
2,227✔
1572

1573
        if (smallest == 0) {
2,227✔
1574
                // We read nothing, which can happen if all we read was a chunk header. We cannot
1575
                // return 0 to the handler however, because in `io::Reader` context this means
1576
                // EOF. So just repeat the request instead, until we get actual payload data.
1577
                AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, reader_handler_);
154✔
1578
        } else {
1579
                copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
2,150✔
1580
                reader_handler_(smallest);
4,300✔
1581
        }
1582
}
1583

1584
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
202✔
1585
        SetupResponse();
202✔
1586

1587
        reply_finished_handler_ = reply_finished_handler;
202✔
1588

1589
        auto &cancelled = cancelled_;
1590
        auto &response_data = response_data_;
202✔
1591

1592
        http::async_write_header(
404✔
1593
                socket_,
202✔
1594
                *response_data_.http_response_serializer_,
1595
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
202✔
1596
                        if (!*cancelled) {
202✔
1597
                                WriteHeaderHandler(ec, num_written);
201✔
1598
                        }
1599
                });
202✔
1600
}
202✔
1601

1602
void Stream::SetupResponse() {
211✔
1603
        auto response = maybe_response_.lock();
211✔
1604
        // Only called from existing responses, so this should always be true.
1605
        assert(response);
1606

1607
        assert(status_ == TransactionStatus::BodyHandlerCalled);
1608
        status_ = TransactionStatus::Replying;
211✔
1609

1610
        // From here on we take shared ownership.
1611
        response_ = response;
1612

1613
        response_data_.http_response_ = make_shared<http::response<http::buffer_body>>();
422✔
1614

1615
        for (const auto &header : response->headers_) {
438✔
1616
                response_data_.http_response_->base().set(header.first, header.second);
227✔
1617
        }
1618

1619
        response_data_.http_response_->result(response->GetStatusCode());
211✔
1620
        response_data_.http_response_->reason(response->GetStatusMessage());
422✔
1621

1622
        response_data_.http_response_serializer_ =
1623
                make_shared<http::response_serializer<http::buffer_body>>(*response_data_.http_response_);
422✔
1624
}
211✔
1625

1626
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
201✔
1627
        if (num_written > 0) {
201✔
1628
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
402✔
1629
        }
1630

1631
        if (ec) {
201✔
UNCOV
1632
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1633
                return;
37✔
1634
        }
1635

1636
        auto exp_has_body =
1637
                HasBody(response_->GetHeader("Content-Length"), response_->GetHeader("Transfer-Encoding"));
402✔
1638
        if (!exp_has_body) {
201✔
UNCOV
1639
                CallErrorHandler(exp_has_body.error(), request_, reply_finished_handler_);
×
UNCOV
1640
                return;
×
1641
        }
1642
        if (!exp_has_body.value()) {
201✔
1643
                FinishReply();
36✔
1644
                return;
1645
        }
1646

1647
        if (!response_->body_reader_ && !response_->async_body_reader_) {
165✔
1648
                auto err = MakeError(BodyMissingError, "No body reader");
2✔
1649
                CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1650
                return;
1651
        }
1652

1653
        PrepareAndWriteNewBodyBuffer();
164✔
1654
}
1655

1656
void Stream::PrepareAndWriteNewBodyBuffer() {
2,105✔
1657
        // response_->body_reader_ XOR response_->async_body_reader_
1658
        assert(
1659
                (response_->body_reader_ || response_->async_body_reader_)
1660
                && !(response_->body_reader_ && response_->async_body_reader_));
1661

1662
        auto read_handler = [this](io::ExpectedSize read) {
2,106✔
1663
                if (!read) {
2,105✔
1664
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
3✔
1665
                        return;
1✔
1666
                }
1667
                WriteNewBodyBuffer(read.value());
2,104✔
1668
        };
2,105✔
1669

1670
        if (response_->body_reader_) {
2,105✔
1671
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
3,662✔
1672
        } else {
1673
                auto err = response_->async_body_reader_->AsyncRead(
1674
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
274✔
1675
                if (err != error::NoError) {
274✔
UNCOV
1676
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1677
                }
1678
        }
1679
}
2,105✔
1680

1681
void Stream::WriteNewBodyBuffer(size_t size) {
2,104✔
1682
        response_data_.http_response_->body().data = body_buffer_.data();
2,104✔
1683
        response_data_.http_response_->body().size = size;
2,104✔
1684

1685
        if (size > 0) {
2,104✔
1686
                response_data_.http_response_->body().more = true;
1,974✔
1687
        } else {
1688
                response_data_.http_response_->body().more = false;
130✔
1689
        }
1690

1691
        WriteBody();
2,104✔
1692
}
2,104✔
1693

1694
void Stream::WriteBody() {
4,058✔
1695
        auto &cancelled = cancelled_;
1696
        auto &response_data = response_data_;
4,058✔
1697

1698
        http::async_write_some(
8,116✔
1699
                socket_,
4,058✔
1700
                *response_data_.http_response_serializer_,
1701
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
4,019✔
1702
                        if (!*cancelled) {
4,019✔
1703
                                WriteBodyHandler(ec, num_written);
4,019✔
1704
                        }
1705
                });
4,019✔
1706
}
4,058✔
1707

1708
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
4,019✔
1709
        if (num_written > 0) {
4,019✔
1710
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
3,908✔
1711
        }
1712

1713
        if (ec == http::make_error_code(http::error::need_buffer)) {
4,019✔
1714
                // Write next body block.
1715
                PrepareAndWriteNewBodyBuffer();
1,941✔
1716
        } else if (ec) {
2,078✔
1717
                CallErrorHandler(ec, request_, reply_finished_handler_);
12✔
1718
        } else if (num_written > 0) {
2,074✔
1719
                // We are still writing the body.
1720
                WriteBody();
1,954✔
1721
        } else {
1722
                // We are finished.
1723
                FinishReply();
120✔
1724
        }
1725
}
4,019✔
1726

1727
void Stream::FinishReply() {
156✔
1728
        // We are done.
1729
        status_ = TransactionStatus::Done;
156✔
1730
        DoCancel();
156✔
1731
        // Release ownership of Body reader.
1732
        response_->body_reader_.reset();
156✔
1733
        response_->async_body_reader_.reset();
156✔
1734
        reply_finished_handler_(error::NoError);
156✔
1735
        server_.RemoveStream(shared_from_this());
156✔
1736
}
156✔
1737

1738
error::Error Stream::AsyncSwitchProtocol(SwitchProtocolHandler handler) {
9✔
1739
        SetupResponse();
9✔
1740

1741
        switch_protocol_handler_ = handler;
9✔
1742
        status_ = TransactionStatus::SwitchingProtocol;
9✔
1743

1744
        auto &cancelled = cancelled_;
1745
        auto &response_data = response_data_;
9✔
1746

1747
        http::async_write_header(
18✔
1748
                socket_,
9✔
1749
                *response_data_.http_response_serializer_,
1750
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
9✔
1751
                        if (!*cancelled) {
9✔
1752
                                SwitchingProtocolHandler(ec, num_written);
8✔
1753
                        }
1754
                });
9✔
1755

1756
        return error::NoError;
9✔
1757
}
1758

1759
void Stream::SwitchingProtocolHandler(error_code ec, size_t num_written) {
8✔
1760
        if (num_written > 0) {
8✔
1761
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
16✔
1762
        }
1763

1764
        if (ec) {
8✔
UNCOV
1765
                CallErrorHandler(ec, request_, switch_protocol_handler_);
×
UNCOV
1766
                return;
×
1767
        }
1768

1769
        auto socket = make_shared<RawSocket<tcp::socket>>(
1770
                make_shared<tcp::socket>(std::move(socket_)), request_data_.request_buffer_);
8✔
1771

1772
        auto switch_protocol_handler = switch_protocol_handler_;
8✔
1773

1774
        // Rest of the connection is done directly on the socket, set cancelled_ but don't close it.
1775
        *cancelled_ = true;
8✔
1776
        cancelled_ = make_shared<bool>(true);
8✔
1777
        server_.RemoveStream(shared_from_this());
16✔
1778

1779
        switch_protocol_handler(socket);
16✔
1780
}
1781

1782
void Stream::CallBodyHandler() {
217✔
1783
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1784
        // it immediately destroys, which would destroy this stream as well. At the end of this
1785
        // function, it's ok to destroy it.
1786
        auto stream_ref = shared_from_this();
1787

1788
        server_.body_handler_(request_, error::NoError);
651✔
1789

1790
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1791
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1792
        // request has not been handled correctly.
1793
        auto response = maybe_response_.lock();
217✔
1794
        if (!response) {
217✔
1795
                logger_.Error("Handler produced no response. Closing stream prematurely.");
6✔
1796
                *cancelled_ = true;
3✔
1797
                cancelled_ = make_shared<bool>(true);
3✔
1798
                server_.RemoveStream(shared_from_this());
9✔
1799
        }
1800
}
217✔
1801

1802
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
237✔
1803
        event_loop_ {event_loop},
1804
        acceptor_(GetAsioIoContext(event_loop_)) {
428✔
1805
}
237✔
1806

1807
Server::~Server() {
474✔
1808
        Cancel();
237✔
1809
}
237✔
1810

1811
error::Error Server::AsyncServeUrl(
202✔
1812
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
1813
        return AsyncServeUrl(
1814
                url, header_handler, [body_handler](IncomingRequestPtr req, error::Error err) {
822✔
1815
                        if (err != error::NoError) {
212✔
1816
                                body_handler(expected::unexpected(err));
12✔
1817
                        } else {
1818
                                body_handler(req);
412✔
1819
                        }
1820
                });
616✔
1821
}
1822

1823
error::Error Server::AsyncServeUrl(
217✔
1824
        const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
1825
        auto err = BreakDownUrl(url, address_);
217✔
1826
        if (error::NoError != err) {
217✔
UNCOV
1827
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
×
1828
        }
1829

1830
        if (address_.protocol != "http") {
217✔
UNCOV
1831
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
×
1832
        }
1833

1834
        if (address_.path.size() > 0 && address_.path != "/") {
217✔
1835
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
2✔
1836
        }
1837

1838
        boost::system::error_code ec;
216✔
1839
        auto address = asio::ip::make_address(address_.host, ec);
216✔
1840
        if (ec) {
216✔
1841
                return error::Error(
UNCOV
1842
                        ec.default_error_condition(),
×
UNCOV
1843
                        "Could not construct endpoint from address " + address_.host);
×
1844
        }
1845

1846
        asio::ip::tcp::endpoint endpoint(address, address_.port);
216✔
1847

1848
        ec.clear();
1849
        acceptor_.open(endpoint.protocol(), ec);
216✔
1850
        if (ec) {
216✔
UNCOV
1851
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
×
1852
        }
1853

1854
        // Allow address reuse, otherwise we can't re-bind later.
1855
        ec.clear();
1856
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
216✔
1857
        if (ec) {
216✔
UNCOV
1858
                return error::Error(ec.default_error_condition(), "Could not set socket options");
×
1859
        }
1860

1861
        ec.clear();
1862
        acceptor_.bind(endpoint, ec);
216✔
1863
        if (ec) {
216✔
UNCOV
1864
                return error::Error(ec.default_error_condition(), "Could not bind socket");
×
1865
        }
1866

1867
        ec.clear();
1868
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
216✔
1869
        if (ec) {
216✔
UNCOV
1870
                return error::Error(ec.default_error_condition(), "Could not start listening");
×
1871
        }
1872

1873
        header_handler_ = header_handler;
216✔
1874
        body_handler_ = body_handler;
216✔
1875

1876
        PrepareNewStream();
216✔
1877

1878
        return error::NoError;
216✔
1879
}
1880

1881
void Server::Cancel() {
257✔
1882
        if (acceptor_.is_open()) {
257✔
1883
                acceptor_.cancel();
216✔
1884
                acceptor_.close();
216✔
1885
        }
1886
        streams_.clear();
1887
}
257✔
1888

1889
uint16_t Server::GetPort() const {
17✔
1890
        return acceptor_.local_endpoint().port();
17✔
1891
}
1892

1893
string Server::GetUrl() const {
16✔
1894
        return "http://127.0.0.1:" + to_string(GetPort());
32✔
1895
}
1896

1897
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
216✔
1898
        if (*req->cancelled_) {
216✔
UNCOV
1899
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
×
1900
        }
1901
        OutgoingResponsePtr response {new OutgoingResponse(req->stream_, req->cancelled_)};
432✔
1902
        req->stream_.maybe_response_ = response;
216✔
1903
        return response;
216✔
1904
}
1905

1906
error::Error Server::AsyncReply(
202✔
1907
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
1908
        if (*resp->cancelled_) {
202✔
UNCOV
1909
                return MakeError(StreamCancelledError, "Cannot send response");
×
1910
        }
1911

1912
        resp->stream_.AsyncReply(reply_finished_handler);
202✔
1913
        return error::NoError;
202✔
1914
}
1915

1916
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
58✔
1917
        if (*req->cancelled_) {
58✔
UNCOV
1918
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
×
1919
        }
1920

1921
        auto &stream = req->stream_;
58✔
1922
        if (stream.status_ != TransactionStatus::HeaderHandlerCalled) {
58✔
1923
                return expected::unexpected(error::Error(
1✔
1924
                        make_error_condition(errc::operation_in_progress),
2✔
1925
                        "MakeBodyAsyncReader called while reading is in progress"));
3✔
1926
        }
1927

1928
        if (GetContentLength(*stream.request_data_.http_request_parser_) == 0
57✔
1929
                && !stream.request_data_.http_request_parser_->chunked()) {
57✔
1930
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
54✔
1931
        }
1932

1933
        stream.status_ = TransactionStatus::ReaderCreated;
39✔
1934
        return make_shared<BodyAsyncReader<Stream>>(stream, req->cancelled_);
78✔
1935
}
1936

1937
error::Error Server::AsyncSwitchProtocol(OutgoingResponsePtr resp, SwitchProtocolHandler handler) {
9✔
1938
        return resp->stream_.AsyncSwitchProtocol(handler);
18✔
1939
}
1940

1941
void Server::PrepareNewStream() {
441✔
1942
        StreamPtr new_stream {new Stream(*this)};
441✔
1943
        streams_.insert(new_stream);
1944
        AsyncAccept(new_stream);
882✔
1945
}
441✔
1946

1947
void Server::AsyncAccept(StreamPtr stream) {
441✔
1948
        acceptor_.async_accept(stream->socket_, [this, stream](const error_code &ec) {
669✔
1949
                if (ec) {
228✔
1950
                        if (ec != errc::operation_canceled) {
3✔
UNCOV
1951
                                log::Error("Could not accept connection: " + ec.message());
×
1952
                        }
1953
                        return;
3✔
1954
                }
1955

1956
                stream->AcceptHandler(ec);
225✔
1957

1958
                this->PrepareNewStream();
225✔
1959
        });
1960
}
441✔
1961

1962
void Server::RemoveStream(StreamPtr stream) {
188✔
1963
        streams_.erase(stream);
188✔
1964

1965
        stream->DoCancel();
188✔
1966
}
188✔
1967

1968
} // namespace http
1969
} // namespace common
1970
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc