• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1685329647

24 Feb 2025 08:34AM UTC coverage: 75.912% (-0.005%) from 75.917%
1685329647

Pull #1757

gitlab-ci

lluiscampos
chore: bump ubuntu from 22.04 to 24.04 in /tests

Bumps ubuntu from 22.04 to 24.04.

---
updated-dependencies:
- dependency-name: ubuntu
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: Lluis Campos <lluis.campos@northern.tech>
Pull Request #1757: Install `mender-artifact` from final tag + Bump ubuntu from 22.04 to 24.04 in /tests

7365 of 9702 relevant lines covered (75.91%)

11147.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.26
/src/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio.hpp>
20
#include <boost/asio/ip/tcp.hpp>
21
#include <boost/asio/ssl/host_name_verification.hpp>
22
#include <boost/asio/ssl/verify_mode.hpp>
23

24
#include <common/common.hpp>
25
#include <common/crypto.hpp>
26

27
namespace mender {
28
namespace common {
29
namespace http {
30

31
namespace common = mender::common;
32
namespace crypto = mender::common::crypto;
33

34
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
35
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
36
const unsigned int BeastHttpVersion = 11;
37

38
namespace asio = boost::asio;
39
namespace http = boost::beast::http;
40

41
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
42

43
// Translates our `Method` enum into the corresponding Boost.Beast verb.
//
// `Method::Invalid` (or any value outside the enum) is a programming error: it trips the
// assert below, and maps to `http::verb::get` when asserts are compiled out.
static http::verb MethodToBeastVerb(Method method) {
        // Deliberately no "default" label: leaving it out should make the compiler warn about
        // an unhandled enumerator if a new method is ever added.
        switch (method) {
        case Method::Invalid:
                // Nothing to map to; dealt with after the switch.
                break;
        case Method::CONNECT:
                return http::verb::connect;
        case Method::PATCH:
                return http::verb::patch;
        case Method::PUT:
                return http::verb::put;
        case Method::POST:
                return http::verb::post;
        case Method::HEAD:
                return http::verb::head;
        case Method::GET:
                return http::verb::get;
        }

        // Still assert for safety, in case we get here with an unmapped value.
        assert(false);
        return http::verb::get;
}
66

67
// Translates a Boost.Beast verb into our `Method` enum.
//
// `verb_string` is only used as the message of the `UnsupportedMethodError` which is returned
// for verbs that have no `Method` counterpart.
static expected::expected<Method, error::Error> BeastVerbToMethod(
        http::verb verb, const string &verb_string) {
        struct VerbMapping {
                http::verb beast_verb;
                Method method;
        };
        static const VerbMapping supported_verbs[] = {
                {http::verb::get, Method::GET},
                {http::verb::head, Method::HEAD},
                {http::verb::post, Method::POST},
                {http::verb::put, Method::PUT},
                {http::verb::patch, Method::PATCH},
                {http::verb::connect, Method::CONNECT},
        };

        for (const auto &mapping : supported_verbs) {
                if (mapping.beast_verb == verb) {
                        return mapping.method;
                }
        }

        return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
}
86

87
template <typename StreamType>
88
class BodyAsyncReader : virtual public io::AsyncReader {
89
public:
90
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
153✔
91
                stream_ {stream},
92
                cancelled_ {cancelled} {
306✔
93
        }
153✔
94
        ~BodyAsyncReader() {
39✔
95
                Cancel();
39✔
96
        }
78✔
97

98
        error::Error AsyncRead(
2,187✔
99
                vector<uint8_t>::iterator start,
100
                vector<uint8_t>::iterator end,
101
                io::AsyncIoHandler handler) override {
102
                if (eof_) {
2,187✔
103
                        handler(0);
×
104
                        return error::NoError;
×
105
                }
106

107
                if (*cancelled_) {
2,187✔
108
                        return error::MakeError(
×
109
                                error::ProgrammingError,
110
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
111
                }
112
                stream_.AsyncReadNextBodyPart(start, end, [this, handler](io::ExpectedSize size) {
12,805✔
113
                        if (size && size.value() == 0) {
6,310✔
114
                                eof_ = true;
125✔
115
                        }
116
                        handler(size);
12,620✔
117
                });
118
                return error::NoError;
2,187✔
119
        }
120

121
        void Cancel() override {
39✔
122
                if (!*cancelled_) {
39✔
123
                        stream_.Cancel();
4✔
124
                }
125
        }
39✔
126

127
private:
128
        StreamType &stream_;
129
        shared_ptr<bool> cancelled_;
130
        bool eof_ {false};
131

132
        friend class Client;
133
        friend class Server;
134
};
135

136
template <typename StreamType>
137
class RawSocket : virtual public io::AsyncReadWriter {
138
public:
139
        RawSocket(shared_ptr<StreamType> stream, shared_ptr<beast::flat_buffer> buffered) :
×
140
                destroying_ {make_shared<bool>(false)},
×
141
                stream_ {stream},
142
                buffered_ {buffered} {
×
143
                // If there are no buffered bytes, then we don't need it.
144
                if (buffered_ && buffered_->size() == 0) {
×
145
                        buffered_.reset();
×
146
                }
147
        }
×
148

149
        ~RawSocket() {
15✔
150
                *destroying_ = true;
15✔
151
                Cancel();
15✔
152
        }
30✔
153

154
        error::Error AsyncRead(
320✔
155
                vector<uint8_t>::iterator start,
156
                vector<uint8_t>::iterator end,
157
                io::AsyncIoHandler handler) override {
158
                // If we have prebuffered bytes, which can happen if the HTTP parser read the
159
                // header and parts of the body in one block, return those first.
160
                if (buffered_) {
320✔
161
                        return DrainPrebufferedData(start, end, handler);
8✔
162
                }
163

164
                read_buffer_ = asio::buffer(&*start, end - start);
316✔
165
                auto &destroying = destroying_;
166
                stream_->async_read_some(
632✔
167
                        read_buffer_,
316✔
168
                        [destroying, handler](const boost::system::error_code &ec, size_t num_read) {
313✔
169
                                if (*destroying) {
313✔
170
                                        return;
171
                                }
172

173
                                if (ec == asio::error::operation_aborted) {
313✔
174
                                        handler(expected::unexpected(error::Error(
12✔
175
                                                make_error_condition(errc::operation_canceled),
6✔
176
                                                "Could not read from socket")));
177
                                } else if (ec) {
310✔
178
                                        handler(expected::unexpected(
12✔
179
                                                error::Error(ec.default_error_condition(), "Could not read from socket")));
12✔
180
                                } else {
181
                                        handler(num_read);
608✔
182
                                }
183
                        });
184
                return error::NoError;
316✔
185
        }
186

187
        error::Error AsyncWrite(
309✔
188
                vector<uint8_t>::const_iterator start,
189
                vector<uint8_t>::const_iterator end,
190
                io::AsyncIoHandler handler) override {
191
                write_buffer_ = asio::buffer(&*start, end - start);
309✔
192
                auto &destroying = destroying_;
193
                stream_->async_write_some(
618✔
194
                        write_buffer_,
309✔
195
                        [destroying, handler](const boost::system::error_code &ec, size_t num_written) {
306✔
196
                                if (*destroying) {
306✔
197
                                        return;
198
                                }
199

200
                                if (ec == asio::error::operation_aborted) {
306✔
201
                                        handler(expected::unexpected(error::Error(
×
202
                                                make_error_condition(errc::operation_canceled),
×
203
                                                "Could not write to socket")));
204
                                } else if (ec) {
306✔
205
                                        handler(expected::unexpected(
×
206
                                                error::Error(ec.default_error_condition(), "Could not write to socket")));
×
207
                                } else {
208
                                        handler(num_written);
612✔
209
                                }
210
                        });
211
                return error::NoError;
309✔
212
        }
213

214
        void Cancel() override {
28✔
215
                if (stream_->lowest_layer().is_open()) {
28✔
216
                        stream_->lowest_layer().cancel();
15✔
217
                        stream_->lowest_layer().close();
15✔
218
                }
219
        }
28✔
220

221
private:
222
        error::Error DrainPrebufferedData(
4✔
223
                vector<uint8_t>::iterator start,
224
                vector<uint8_t>::iterator end,
225
                io::AsyncIoHandler handler) {
226
                size_t to_copy = min(static_cast<size_t>(end - start), buffered_->size());
4✔
227

228
                // These two lines are equivalent to:
229
                //   copy_n(static_cast<const uint8_t *>(buffered_->cdata().data()), to_copy, start);
230
                // but compatible with Boost 1.67.
231
                const beast::flat_buffer &cbuffered = *buffered_;
232
                copy_n(static_cast<const uint8_t *>(cbuffered.data().data()), to_copy, start);
4✔
233
                buffered_->consume(to_copy);
4✔
234
                if (buffered_->size() == 0) {
4✔
235
                        // We don't need it anymore.
236
                        buffered_.reset();
4✔
237
                }
238
                handler(to_copy);
4✔
239
                return error::NoError;
4✔
240
        }
241

242
        shared_ptr<bool> destroying_;
243
        shared_ptr<StreamType> stream_;
244
        shared_ptr<beast::flat_buffer> buffered_;
245
        asio::mutable_buffer read_buffer_;
246
        asio::const_buffer write_buffer_;
247
};
248

249
// Returns the content length reported by `parser`, or 0 if the parser does not have one.
template <typename PARSER>
int64_t GetContentLength(const PARSER &parser) {
        const auto declared_length = parser.content_length();
        if (!declared_length) {
                return 0;
        }
        return declared_length.value();
}
258

259
// Decides from the `Content-Length` and `Transfer-Encoding` header values whether a message
// carries a body.
//
// Each argument holds the header value if the header was present.
//
// Returns:
//  - true if `Transfer-Encoding` is "chunked", or if `Content-Length` is greater than zero,
//  - false if `Content-Length` is zero, or if neither header is present,
//  - a `not_supported` error for any other `Transfer-Encoding`,
//  - an error if `Content-Length` cannot be parsed as a number, or is negative.
//
// `Transfer-Encoding` takes precedence: if it is present, `Content-Length` is not looked at.
expected::ExpectedBool HasBody(
        const expected::ExpectedString &content_length,
        const expected::ExpectedString &transfer_encoding) {
        if (transfer_encoding) {
                if (transfer_encoding.value() != "chunked") {
                        return expected::unexpected(error::Error(
                                make_error_condition(errc::not_supported),
                                "Unsupported Transfer-Encoding: " + transfer_encoding.value()));
                }
                return true;
        }

        if (content_length) {
                auto length = common::StringToLongLong(content_length.value());
                if (!length) {
                        return expected::unexpected(error::Error(
                                length.error().code,
                                "Content-Length contains invalid number: " + content_length.value()));
                }
                if (length.value() < 0) {
                        // The number parsed fine, so `length` holds no error to forward here;
                        // report the negative value with an error code of our own.
                        return expected::unexpected(error::Error(
                                make_error_condition(errc::invalid_argument),
                                "Content-Length contains invalid number: " + content_length.value()));
                }
                return length.value() > 0;
        }

        return false;
}
283

284
// Creates an idle client. Proxy settings are copied out of `client`; the SSL contexts are not
// touched here.
Client::Client(
353✔
285
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
286
        event_loop_ {event_loop},
287
        logger_name_ {logger_name},
288
        client_config_ {client},
289
        http_proxy_ {client.http_proxy},
353✔
290
        https_proxy_ {client.https_proxy},
353✔
291
        no_proxy_ {client.no_proxy},
353✔
292
        // The flag starts out as true ("cancelled"), since no request is active yet.
        cancelled_ {make_shared<bool>(true)},
×
293
        resolver_(GetAsioIoContext(event_loop)),
294
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,412✔
295
}
353✔
296

297
// Cancels whatever is in flight. A warning is logged first if the client is destroyed while
// its cancellation flag says that a request is still active.
Client::~Client() {
        const bool request_active = !*cancelled_;
        if (request_active) {
                logger_.Warning("Client destroyed while request is still active!");
        }

        DoCancel();
}
353✔
303

304
// Configures every SSL context in `ssl_ctx_` from `client_config_`: peer verification mode,
// optional client certificate and private key, the default CA paths and the optional dedicated
// server certificate.
//
// Does nothing if a previous call already succeeded. After a failure `initialized_` stays
// false, so the next call starts over.
error::Error Client::Initialize() {
        if (initialized_) {
                return error::NoError;
        }

        const bool have_client_cert = client_config_.client_cert_path != "";
        const bool have_client_key = client_config_.client_cert_key_path != "";

        for (auto i = 0; i < MENDER_BOOST_BEAST_SSL_CTX_COUNT; i++) {
                auto &ctx = ssl_ctx_[i];

                ctx.set_verify_mode(client_config_.skip_verify ? ssl::verify_none : ssl::verify_peer);

                beast::error_code ec {};

                // Certificate and key only make sense as a pair.
                if (have_client_cert != have_client_key) {
                        return error::Error(
                                make_error_condition(errc::invalid_argument),
                                "Cannot set only one of client certificate, and client certificate private key");
                }

                if (have_client_cert) {
                        ctx.set_options(boost::asio::ssl::context::default_workarounds);
                        ctx.use_certificate_file(
                                client_config_.client_cert_path, boost::asio::ssl::context_base::pem, ec);
                        if (ec) {
                                return error::Error(
                                        ec.default_error_condition(), "Could not load client certificate");
                        }

                        auto exp_key = crypto::PrivateKey::Load(
                                {client_config_.client_cert_key_path, "", client_config_.ssl_engine});
                        if (!exp_key) {
                                return exp_key.error().WithContext(
                                        "Error loading private key from " + client_config_.client_cert_key_path);
                        }

                        const int key_added =
                                SSL_CTX_use_PrivateKey(ctx.native_handle(), exp_key.value()->Get());
                        if (key_added != 1) {
                                return MakeError(
                                        HTTPInitError,
                                        "Failed to add the PrivateKey: " + client_config_.client_cert_key_path
                                                + " to the SSL CTX");
                        }
                }

                // Load the default CAs.
                ctx.set_default_verify_paths(ec);
                if (ec) {
                        auto err = error::Error(
                                ec.default_error_condition(), "Failed to load the SSL default directory");
                        if (client_config_.server_cert_path == "") {
                                // Without a dedicated certificate there would be no valid
                                // certificates at all.
                                return err;
                        }
                        // A dedicated certificate is configured, so this is not fatal.
                        log::Info(err.String());
                }

                if (client_config_.server_cert_path != "") {
                        ctx.load_verify_file(client_config_.server_cert_path, ec);
                        if (ec) {
                                log::Warning("Failed to load the server certificate! Falling back to the CA store");
                        }
                }
        }

        initialized_ = true;

        return error::NoError;
}
368

369
// Create the HOST header according to:
370
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.23
371
// In short: Add the port-number if it is non-standard HTTP
372
static string CreateHOSTAddress(OutgoingRequestPtr req) {
272✔
373
        if (req->GetPort() == 80 || req->GetPort() == 443) {
272✔
374
                return req->GetHost();
4✔
375
        }
376
        return req->GetHost() + ":" + to_string(req->GetPort());
536✔
377
}
378

379
// Starts an HTTP request.
//
// After validating the arguments and setting up any proxy, this only kicks off host name
// resolution; the rest of the request continues from `ResolveHandler`. Problems detected here
// are returned directly, and no handler is called for them from this function.
//
// Fails if initialization fails, if another call is still ongoing, if `req` lacks protocol,
// host or a valid port, if either handler is empty, or if the protocol is neither "http" nor
// "https".
error::Error Client::AsyncCall(
        OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
        auto init_err = Initialize();
        if (init_err != error::NoError) {
                return init_err;
        }

        const bool call_ongoing = !*cancelled_ && status_ != TransactionStatus::Done;
        if (call_ongoing) {
                return error::Error(
                        make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
        }

        const auto &address = req->address_;
        if (address.protocol == "" || address.host == "" || address.port < 0) {
                return error::MakeError(error::ProgrammingError, "Request is not ready");
        }

        if (!header_handler || !body_handler) {
                return error::MakeError(
                        error::ProgrammingError, "header_handler and body_handler can not be nullptr");
        }

        if (address.protocol != "http" && address.protocol != "https") {
                return error::Error(
                        make_error_condition(errc::protocol_not_supported), address.protocol);
        }

        logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));

        request_ = req;

        auto proxy_err = HandleProxySetup();
        if (proxy_err != error::NoError) {
                return proxy_err;
        }

        // NOTE: The HOST header is set in all cases, because the AWS loadbalancer requires it
        // in order to route the request to our k8s cluster.
        const string host_header = CreateHOSTAddress(req);
        req->SetHeader("HOST", host_header);

        log::Trace("Setting HOST address: " + host_header);

        header_handler_ = header_handler;
        body_handler_ = body_handler;
        status_ = TransactionStatus::None;

        // A fresh flag for this call. The resolve callback keeps its own copy, so that it can
        // tell whether the call it belongs to has been cancelled in the meantime.
        cancelled_ = make_shared<bool>(false);
        const shared_ptr<bool> cancelled = cancelled_;

        resolver_.async_resolve(
                request_->address_.host,
                to_string(request_->address_.port),
                [this, cancelled](
                        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
                        if (*cancelled) {
                                return;
                        }
                        ResolveHandler(ec, results);
                });

        return error::NoError;
}
441

442
// Adds a "Proxy-Authorization: Basic ..." header to `req`, built from the URL-encoded username
// and password in `proxy_address`. Does nothing if the username is empty.
//
// Returns the error from URL decoding or Base64 encoding if either fails.
static inline error::Error AddProxyAuthHeader(OutgoingRequest &req, BrokenDownUrl &proxy_address) {
        const bool no_credentials = proxy_address.username == "";
        if (no_credentials) {
                return error::NoError;
        }

        auto username = URLDecode(proxy_address.username);
        auto password = URLDecode(proxy_address.password);
        if (!username) {
                return username.error();
        }
        if (!password) {
                return password.error();
        }

        // Basic authentication: Base64 of "<username>:<password>".
        const auto plain_credentials = username.value() + ":" + password.value();
        auto encoded_credentials =
                crypto::EncodeBase64(common::ByteVectorFromString(plain_credentials));
        if (!encoded_credentials) {
                return encoded_credentials.error();
        }

        req.SetHeader("Proxy-Authorization", "Basic " + encoded_credentials.value());
        return error::NoError;
}
464

465
// Selects the socket mode for `request_` and, if a proxy applies to its host, rewrites the
// request so that it is sent to the proxy instead.
//
//  - "http" target: the request itself is redirected to the proxy, with the complete original
//    URL as its path.
//  - "https" target: the original request is parked in `secondary_req_`, and `request_` is
//    replaced by a CONNECT request for "<host>:<port>" addressed to the proxy.
//
// In both cases the socket mode follows the protocol of the proxy URL. Hosts matching
// `no_proxy_` are never proxied.
error::Error Client::HandleProxySetup() {
        secondary_req_.reset();

        if (request_->address_.protocol == "http") {
                socket_mode_ = SocketMode::Plain;

                const bool use_proxy =
                        http_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_);
                if (!use_proxy) {
                        return error::NoError;
                }

                BrokenDownUrl proxy;
                auto err = BreakDownUrl(http_proxy_, proxy, true);
                if (err != error::NoError) {
                        return err.WithContext("HTTP proxy URL is invalid");
                }
                if (proxy.path != "" && proxy.path != "/") {
                        return MakeError(
                                InvalidUrlError, "A URL with a path is not legal for a proxy address");
                }

                // Turn the request into a proxy request: the path becomes the full original
                // URL, and the connection goes to the proxy.
                auto &target = request_->address_;
                target.path = target.protocol + "://" + target.host + ":" + to_string(target.port)
                                          + target.path;
                target.host = proxy.host;
                target.port = proxy.port;
                target.protocol = proxy.protocol;

                err = AddProxyAuthHeader(*request_, proxy);
                if (err != error::NoError) {
                        return err;
                }

                if (proxy.protocol == "https") {
                        socket_mode_ = SocketMode::Tls;
                } else if (proxy.protocol == "http") {
                        socket_mode_ = SocketMode::Plain;
                } else {
                        // Should never get here.
                        assert(false);
                }
        } else if (request_->address_.protocol == "https") {
                socket_mode_ = SocketMode::Tls;

                const bool use_proxy =
                        https_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_);
                if (!use_proxy) {
                        return error::NoError;
                }

                // Keep the original request for later, so that it can be sent over the
                // channel which CONNECT establishes.
                secondary_req_ = std::move(request_);

                request_ = make_shared<OutgoingRequest>();
                request_->SetMethod(Method::CONNECT);

                BrokenDownUrl proxy;
                auto err = BreakDownUrl(https_proxy_, proxy, true);
                if (err != error::NoError) {
                        return err.WithContext("HTTPS proxy URL is invalid");
                }
                if (proxy.path != "" && proxy.path != "/") {
                        return MakeError(
                                InvalidUrlError, "A URL with a path is not legal for a proxy address");
                }

                const auto &final_destination = secondary_req_->address_;
                auto &target = request_->address_;
                target.path = final_destination.host + ":" + to_string(final_destination.port);
                target.host = proxy.host;
                target.port = proxy.port;
                target.protocol = proxy.protocol;

                err = AddProxyAuthHeader(*request_, proxy);
                if (err != error::NoError) {
                        return err;
                }

                if (proxy.protocol == "https") {
                        socket_mode_ = SocketMode::Tls;
                } else if (proxy.protocol == "http") {
                        socket_mode_ = SocketMode::Plain;
                } else {
                        // Should never get here.
                        assert(false);
                }
        } else {
                // Should never get here
                assert(false);
        }

        return error::NoError;
}
551

552
// Creates the reader through which the body of `resp` can be read.
//
// Only valid in the `HeaderHandlerCalled` state; in any other state an `operation_in_progress`
// error is returned. A `BodyMissingError` is returned if the response has a content length of
// zero and is not chunked. On success the state moves on to `ReaderCreated`.
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
        if (status_ != TransactionStatus::HeaderHandlerCalled) {
                return expected::unexpected(error::Error(
                        make_error_condition(errc::operation_in_progress),
                        "MakeBodyAsyncReader called while reading is in progress"));
        }

        const auto &parser = *response_data_.http_response_parser_;
        const bool no_length = GetContentLength(parser) == 0;
        if (no_length && !parser.chunked()) {
                return expected::unexpected(
                        MakeError(BodyMissingError, "Response does not contain a body"));
        }

        status_ = TransactionStatus::ReaderCreated;
        return make_shared<BodyAsyncReader<Client>>(resp->client_.GetHttpClient(), resp->cancelled_);
}
568

569
// Hands the connection over to the caller as a raw socket, for use after the HTTP exchange.
//
// The returned object takes over the stream layer matching the current socket mode, together
// with the response buffer, so that bytes which were read already are not lost. The client
// itself ends the transaction and gives up its stream. `req` is not used by this function.
//
// Returns a `not_connected` error if the client is currently flagged as cancelled.
io::ExpectedAsyncReadWriterPtr Client::SwitchProtocol(IncomingResponsePtr req) {
        if (*cancelled_) {
                return expected::unexpected(error::Error(
                        make_error_condition(errc::not_connected),
                        "Cannot switch protocols if endpoint is not connected"));
        }

        // The rest of the connection is done directly on the socket, so we are done here.
        // Holders of the old flag see a cancelled transaction; the client gets a fresh flag.
        status_ = TransactionStatus::Done;
        *cancelled_ = true;
        cancelled_ = make_shared<bool>(false);

        // The stream no longer belongs to us; moving it out leaves `stream_` empty.
        auto detached_stream = std::move(stream_);

        switch (socket_mode_) {
        case SocketMode::TlsTls:
                return make_shared<RawSocket<ssl::stream<ssl::stream<tcp::socket>>>>(
                        detached_stream, response_data_.response_buffer_);
        case SocketMode::Tls:
                return make_shared<RawSocket<ssl::stream<tcp::socket>>>(
                        make_shared<ssl::stream<tcp::socket>>(std::move(detached_stream->next_layer())),
                        response_data_.response_buffer_);
        case SocketMode::Plain:
                return make_shared<RawSocket<tcp::socket>>(
                        make_shared<tcp::socket>(
                                std::move(detached_stream->next_layer().next_layer())),
                        response_data_.response_buffer_);
        }

        AssertOrReturnUnexpected(false);
}
601

602
// Calls `handler` with the current response, `response_`. The handler is taken by value on
// purpose; see below.
void Client::CallHandler(ResponseHandler handler) {
356✔
603
        // This function exists to make sure we have a copy of the handler we're calling (in the
604
        // argument list). This is important in case the handler owns the client instance through a
605
        // capture, and it replaces the handler with a different one (using `AsyncCall`). If it
606
        // does, then it destroys the final copy of the handler, and therefore also the client,
607
        // which is why we need to make a copy here, before calling it.
608
        handler(response_);
356✔
609
}
356✔
610

611
void Client::CallErrorHandler(
83✔
612
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
613
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
249✔
614
}
83✔
615

616
void Client::CallErrorHandler(
125✔
617
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
618
        status_ = TransactionStatus::Done;
125✔
619
        DoCancel();
125✔
620
        handler(expected::unexpected(
250✔
621
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
500✔
622
}
125✔
623

624
void Client::ResolveHandler(
267✔
625
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
626
        if (ec) {
267✔
627
                CallErrorHandler(ec, request_, header_handler_);
×
628
                return;
×
629
        }
630

631
        if (logger_.Level() >= log::LogLevel::Debug) {
267✔
632
                string ips = "[";
245✔
633
                string sep;
634
                for (auto r : results) {
1,028✔
635
                        ips += sep;
269✔
636
                        ips += r.endpoint().address().to_string();
269✔
637
                        sep = ", ";
269✔
638
                }
639
                ips += "]";
245✔
640
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
490✔
641
        }
642

643
        resolver_results_ = results;
644

645
        stream_ = make_shared<ssl::stream<ssl::stream<tcp::socket>>>(
267✔
646
                ssl::stream<tcp::socket>(GetAsioIoContext(event_loop_), ssl_ctx_[0]), ssl_ctx_[1]);
534✔
647

648
        if (!response_data_.response_buffer_) {
267✔
649
                // We can reuse this if preexisting.
650
                response_data_.response_buffer_ = make_shared<beast::flat_buffer>();
396✔
651

652
                // This is equivalent to:
653
                //   response_data_.response_buffer_.reserve(body_buffer_.size());
654
                // but compatible with Boost 1.67.
655
                response_data_.response_buffer_->prepare(
656
                        body_buffer_.size() - response_data_.response_buffer_->size());
198✔
657
        }
658

659
        auto &cancelled = cancelled_;
660

661
        asio::async_connect(
267✔
662
                stream_->lowest_layer(),
663
                resolver_results_,
267✔
664
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
534✔
665
                        if (!*cancelled) {
267✔
666
                                switch (socket_mode_) {
267✔
667
                                case SocketMode::TlsTls:
×
668
                                        // Should never happen because we always need to handshake
669
                                        // the innermost Tls first, then the outermost, but the
670
                                        // latter doesn't happen here.
671
                                        assert(false);
672
                                        CallErrorHandler(
×
673
                                                error::MakeError(
×
674
                                                        error::ProgrammingError, "TlsTls mode is invalid in ResolveHandler"),
×
675
                                                request_,
×
676
                                                header_handler_);
×
677
                                case SocketMode::Tls:
21✔
678
                                        return HandshakeHandler(stream_->next_layer(), ec, endpoint);
21✔
679
                                case SocketMode::Plain:
246✔
680
                                        return ConnectHandler(ec, endpoint);
246✔
681
                                }
682
                        }
683
                });
684
}
685

686
template <typename StreamType>
687
void Client::HandshakeHandler(
25✔
688
        StreamType &stream, const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
689
        if (ec) {
25✔
690
                CallErrorHandler(ec, request_, header_handler_);
2✔
691
                return;
2✔
692
        }
693

694
        // Enable TCP keepalive
695
        boost::asio::socket_base::keep_alive option(true);
696
        stream_->lowest_layer().set_option(option);
23✔
697

698
        // We can't avoid a C style cast on this next line. The usual method by which system headers
699
        // are excluded from warnings doesn't work, because `SSL_set_tlsext_host_name` is a macro,
700
        // containing a cast, which expands here, not in the original file. So just disable the
701
        // warning here.
702
#ifdef __clang__
703
#pragma clang diagnostic push
704
#pragma clang diagnostic ignored "-Wold-style-cast"
705
#else
706
#pragma GCC diagnostic push
707
#pragma GCC diagnostic ignored "-Wold-style-cast"
708
#endif
709
        // Set SNI Hostname (many hosts need this to handshake successfully)
710
        if (!SSL_set_tlsext_host_name(stream.native_handle(), request_->address_.host.c_str())) {
23✔
711
#ifdef __clang__
712
#pragma clang diagnostic pop
713
#else
714
#pragma GCC diagnostic pop
715
#endif
716
                beast::error_code ec2 {
×
717
                        static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
×
718
                logger_.Error("Failed to set SNI host name: " + ec2.message());
×
719
        }
720

721
        // Enable host name verification (not done automatically and we don't have
722
        // enough access to the TLS internals to use X509_VERIFY_PARAM_set1_host(),
723
        // hence the callback that boost provides).
724
        boost::system::error_code b_ec;
23✔
725
        stream.set_verify_callback(ssl::host_name_verification(request_->address_.host), b_ec);
46✔
726
        if (b_ec) {
23✔
727
                logger_.Error("Failed to enable host name verification: " + b_ec.message());
×
728
                CallErrorHandler(b_ec, request_, header_handler_);
×
729
                return;
×
730
        }
731

732
        auto &cancelled = cancelled_;
733

734
        stream.async_handshake(
46✔
735
                ssl::stream_base::client, [this, cancelled, endpoint](const error_code &ec) {
23✔
736
                        if (*cancelled) {
26✔
737
                                return;
738
                        }
739
                        if (ec) {
26✔
740
                                logger_.Error("https: Failed to perform the SSL handshake: " + ec.message());
20✔
741
                                CallErrorHandler(ec, request_, header_handler_);
10✔
742
                                return;
10✔
743
                        }
744
                        logger_.Debug("https: Successful SSL handshake");
32✔
745
                        ConnectHandler(ec, endpoint);
16✔
746
                });
747
}
748

749

750
// Called once the connection (and handshake, if any) to `endpoint` is established.
//
// Builds the Beast request, serializer and response parser from `request_`, then starts writing
// the request header on the stream layer selected by `socket_mode_`. A set `ec` is forwarded to
// the header handler instead.
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
        if (ec) {
                CallErrorHandler(ec, request_, header_handler_);
                return;
        }

        // Turn on TCP keepalive on the underlying socket.
        stream_->lowest_layer().set_option(boost::asio::socket_base::keep_alive(true));

        logger_.Debug("Connected to " + endpoint.address().to_string());

        request_data_.http_request_ = make_shared<http::request<http::buffer_body>>(
                MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);
        for (const auto &name_and_value : request_->headers_) {
                request_data_.http_request_->set(name_and_value.first, name_and_value.second);
        }

        request_data_.http_request_serializer_ =
                make_shared<http::request_serializer<http::buffer_body>>(*request_data_.http_request_);

        response_data_.http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();

        // No limits are enforced here: everything is streamed, so limits generally do not apply,
        // and where they do, they belong higher up in the application logic.
        //
        // Note: Beast has a bug here (tested on 1.74). An uninitialized `optional` is supposed to
        // mean unlimited, but their code does not check `has_value()`, which makes the subsequent
        // comparison misbehave. Hence the highest possible value is passed instead.
        response_data_.http_response_parser_->body_limit(numeric_limits<uint64_t>::max());

        // The completion handler holds its own copies of the cancellation flag and of the request
        // data.
        auto cancel_flag = cancelled_;
        auto request_data_copy = request_data_;

        auto on_header_written =
                [this, cancel_flag, request_data_copy](const error_code &ec, size_t num_written) {
                        if (*cancel_flag) {
                                return;
                        }
                        WriteHeaderHandler(ec, num_written);
                };

        auto &serializer = *request_data_.http_request_serializer_;

        switch (socket_mode_) {
        case SocketMode::Plain:
                http::async_write_header(
                        stream_->next_layer().next_layer(), serializer, on_header_written);
                break;
        case SocketMode::Tls:
                http::async_write_header(stream_->next_layer(), serializer, on_header_written);
                break;
        case SocketMode::TlsTls:
                http::async_write_header(*stream_, serializer, on_header_written);
                break;
        }
}
806

807
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
246✔
808
        if (num_written > 0) {
246✔
809
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
492✔
810
        }
811

812
        if (ec) {
246✔
813
                CallErrorHandler(ec, request_, header_handler_);
×
814
                return;
206✔
815
        }
816

817
        auto exp_has_body =
818
                HasBody(request_->GetHeader("Content-Length"), request_->GetHeader("Transfer-Encoding"));
492✔
819
        if (!exp_has_body) {
246✔
820
                CallErrorHandler(exp_has_body.error(), request_, header_handler_);
×
821
                return;
×
822
        }
823
        if (!exp_has_body.value()) {
246✔
824
                ReadHeader();
205✔
825
                return;
826
        }
827

828
        if (!request_->body_gen_ && !request_->async_body_gen_) {
41✔
829
                auto err = MakeError(BodyMissingError, "No body generator");
2✔
830
                CallErrorHandler(err, request_, header_handler_);
2✔
831
                return;
832
        }
833

834
        assert(!(request_->body_gen_ && request_->async_body_gen_));
835

836
        if (request_->body_gen_) {
40✔
837
                auto body_reader = request_->body_gen_();
34✔
838
                if (!body_reader) {
34✔
839
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
840
                        return;
841
                }
842
                request_->body_reader_ = body_reader.value();
34✔
843
        } else {
844
                auto body_reader = request_->async_body_gen_();
6✔
845
                if (!body_reader) {
6✔
846
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
847
                        return;
848
                }
849
                request_->async_body_reader_ = body_reader.value();
6✔
850
        }
851

852
        PrepareAndWriteNewBodyBuffer();
40✔
853
}
854

855
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,280✔
856
        if (num_written > 0) {
2,280✔
857
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
2,244✔
858
        }
859

860
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,280✔
861
                // Write next block of the body.
862
                PrepareAndWriteNewBodyBuffer();
1,121✔
863
        } else if (ec) {
1,159✔
864
                CallErrorHandler(ec, request_, header_handler_);
8✔
865
        } else if (num_written > 0) {
1,155✔
866
                // We are still writing the body.
867
                WriteBody();
1,122✔
868
        } else {
869
                // We are ready to receive the response.
870
                ReadHeader();
33✔
871
        }
872
}
2,280✔
873

874
void Client::PrepareAndWriteNewBodyBuffer() {
1,161✔
875
        // request_->body_reader_ XOR request_->async_body_reader_
876
        assert(
877
                (request_->body_reader_ || request_->async_body_reader_)
878
                && !(request_->body_reader_ && request_->async_body_reader_));
879

880
        auto cancelled = cancelled_;
881
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
3,594✔
882
                if (!*cancelled) {
1,161✔
883
                        if (!read) {
1,160✔
884
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
885
                                return;
2✔
886
                        }
887
                        WriteNewBodyBuffer(read.value());
1,158✔
888
                }
889
        };
1,161✔
890

891

892
        if (request_->body_reader_) {
1,161✔
893
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
1,472✔
894
        } else {
895
                auto err = request_->async_body_reader_->AsyncRead(
896
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
850✔
897
                if (err != error::NoError) {
425✔
898
                        CallErrorHandler(err, request_, header_handler_);
×
899
                }
900
        }
901
}
1,161✔
902

903
void Client::WriteNewBodyBuffer(size_t size) {
1,158✔
904
        request_data_.http_request_->body().data = body_buffer_.data();
1,158✔
905
        request_data_.http_request_->body().size = size;
1,158✔
906

907
        if (size > 0) {
1,158✔
908
                request_data_.http_request_->body().more = true;
1,125✔
909
        } else {
910
                // Release ownership of Body reader.
911
                request_->body_reader_.reset();
33✔
912
                request_->async_body_reader_.reset();
33✔
913
                request_data_.http_request_->body().more = false;
33✔
914
        }
915

916
        WriteBody();
1,158✔
917
}
1,158✔
918

919
void Client::WriteBody() {
2,280✔
920
        auto &cancelled = cancelled_;
921
        auto &request_data = request_data_;
2,280✔
922

923
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
2,280✔
924
                if (!*cancelled) {
2,280✔
925
                        WriteBodyHandler(ec, num_written);
2,280✔
926
                }
927
        };
4,560✔
928

929
        switch (socket_mode_) {
2,280✔
930
        case SocketMode::TlsTls:
×
931
                http::async_write_some(*stream_, *request_data_.http_request_serializer_, handler);
932
                break;
933
        case SocketMode::Tls:
×
934
                http::async_write_some(
935
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
936
                break;
937
        case SocketMode::Plain:
2,280✔
938
                http::async_write_some(
939
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
940
                break;
941
        }
942
}
2,280✔
943

944
void Client::ReadHeader() {
238✔
945
        auto &cancelled = cancelled_;
946
        auto &response_data = response_data_;
238✔
947

948
        auto handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
235✔
949
                if (!*cancelled) {
235✔
950
                        ReadHeaderHandler(ec, num_read);
235✔
951
                }
952
        };
476✔
953

954
        switch (socket_mode_) {
238✔
955
        case SocketMode::TlsTls:
2✔
956
                http::async_read_some(
2✔
957
                        *stream_,
958
                        *response_data_.response_buffer_,
959
                        *response_data_.http_response_parser_,
960
                        handler);
961
                break;
962
        case SocketMode::Tls:
14✔
963
                http::async_read_some(
14✔
964
                        stream_->next_layer(),
965
                        *response_data_.response_buffer_,
966
                        *response_data_.http_response_parser_,
967
                        handler);
968
                break;
969
        case SocketMode::Plain:
222✔
970
                http::async_read_some(
222✔
971
                        stream_->next_layer().next_layer(),
972
                        *response_data_.response_buffer_,
973
                        *response_data_.http_response_parser_,
974
                        handler);
975
                break;
976
        }
977
}
238✔
978

979
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
235✔
980
        if (num_read > 0) {
235✔
981
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
460✔
982
        }
983

984
        if (ec) {
235✔
985
                CallErrorHandler(ec, request_, header_handler_);
5✔
986
                return;
65✔
987
        }
988

989
        if (!response_data_.http_response_parser_->is_header_done()) {
230✔
990
                ReadHeader();
×
991
                return;
×
992
        }
993

994
        if (secondary_req_) {
230✔
995
                HandleSecondaryRequest();
9✔
996
                return;
9✔
997
        }
998

999
        response_.reset(new IncomingResponse(*this, cancelled_));
442✔
1000
        response_->status_code_ = response_data_.http_response_parser_->get().result_int();
221✔
1001
        response_->status_message_ = string {response_data_.http_response_parser_->get().reason()};
221✔
1002

1003
        logger_.Debug(
442✔
1004
                "Received response: " + to_string(response_->status_code_) + " "
442✔
1005
                + response_->status_message_);
663✔
1006

1007
        string debug_str;
1008
        for (auto header = response_data_.http_response_parser_->get().cbegin();
255✔
1009
                 header != response_data_.http_response_parser_->get().cend();
476✔
1010
                 header++) {
1011
                response_->headers_[string {header->name_string()}] = string {header->value()};
765✔
1012
                if (logger_.Level() >= log::LogLevel::Debug) {
255✔
1013
                        debug_str += string {header->name_string()};
240✔
1014
                        debug_str += ": ";
240✔
1015
                        debug_str += string {header->value()};
240✔
1016
                        debug_str += "\n";
240✔
1017
                }
1018
        }
1019

1020
        logger_.Debug("Received headers:\n" + debug_str);
442✔
1021
        debug_str.clear();
1022

1023
        if (GetContentLength(*response_data_.http_response_parser_) == 0
221✔
1024
                && !response_data_.http_response_parser_->chunked()) {
221✔
1025
                auto cancelled = cancelled_;
1026
                status_ = TransactionStatus::HeaderHandlerCalled;
48✔
1027
                CallHandler(header_handler_);
96✔
1028
                if (!*cancelled) {
48✔
1029
                        status_ = TransactionStatus::Done;
43✔
1030
                        if (response_->status_code_ != StatusCode::StatusSwitchingProtocols) {
43✔
1031
                                // Make an exception for 101 Switching Protocols response, where the TCP connection
1032
                                // is meant to be reused.
1033
                                DoCancel();
39✔
1034
                        }
1035
                        CallHandler(body_handler_);
86✔
1036
                }
1037
                return;
1038
        }
1039

1040
        auto cancelled = cancelled_;
1041
        status_ = TransactionStatus::HeaderHandlerCalled;
173✔
1042
        CallHandler(header_handler_);
346✔
1043
        if (*cancelled) {
173✔
1044
                return;
1045
        }
1046

1047
        // We know that a body reader is required here, because of the check for body above.
1048
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
170✔
1049
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
36✔
1050
        }
1051
}
1052

1053
void Client::HandleSecondaryRequest() {
9✔
1054
        logger_.Debug(
18✔
1055
                "Received proxy response: "
1056
                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
18✔
1057
                + string {response_data_.http_response_parser_->get().reason()});
36✔
1058

1059
        request_ = std::move(secondary_req_);
1060

1061
        if (response_data_.http_response_parser_->get().result_int() != StatusOK) {
9✔
1062
                auto err = MakeError(
1063
                        ProxyError,
1064
                        "Proxy returned unexpected response: "
1065
                                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
4✔
1066
                                + string {response_data_.http_response_parser_->get().reason()});
6✔
1067
                CallErrorHandler(err, request_, header_handler_);
4✔
1068
                return;
1069
        }
1070

1071
        if (GetContentLength(*response_data_.http_response_parser_) != 0
7✔
1072
                || response_data_.http_response_parser_->chunked()) {
7✔
1073
                auto err = MakeError(ProxyError, "Body not allowed in proxy response");
×
1074
                CallErrorHandler(err, request_, header_handler_);
×
1075
                return;
1076
        }
1077

1078
        // We are connected. Now repeat the request cycle with the original request. Pretend
1079
        // we were just connected.
1080

1081
        assert(request_->GetProtocol() == "https");
1082

1083
        // Make sure that no data is "lost" inside the buffering mechanism, since when switching to
1084
        // a different layer, this will get out of sync.
1085
        assert(response_data_.response_buffer_->size() == 0);
1086

1087
        switch (socket_mode_) {
7✔
1088
        case SocketMode::TlsTls:
×
1089
                // Should never get here, because this is the only place where TlsTls mode
1090
                // is supposed to be turned on.
1091
                assert(false);
1092
                CallErrorHandler(
×
1093
                        error::MakeError(
×
1094
                                error::ProgrammingError,
1095
                                "Any other mode than Tls is not valid when handling secondary request"),
×
1096
                        request_,
×
1097
                        header_handler_);
×
1098
                break;
×
1099
        case SocketMode::Tls:
3✔
1100
                // Upgrade to TLS inside TLS.
1101
                socket_mode_ = SocketMode::TlsTls;
3✔
1102
                HandshakeHandler(*stream_, error_code {}, stream_->lowest_layer().remote_endpoint());
3✔
1103
                break;
3✔
1104
        case SocketMode::Plain:
4✔
1105
                // Upgrade to TLS.
1106
                socket_mode_ = SocketMode::Tls;
4✔
1107
                HandshakeHandler(
4✔
1108
                        stream_->next_layer(), error_code {}, stream_->lowest_layer().remote_endpoint());
4✔
1109
                break;
4✔
1110
        }
1111
}
1112

1113
// Requests the next part of the response body for the caller's buffer `[start, end)`.
//
// If body reading has already finished, `handler` is called with 0 right away, and the
// transaction is completed (body handler called) unless it was cancelled or has moved on in
// the meantime. Otherwise the caller's buffer and `handler` are remembered, and an asynchronous
// read into `body_buffer_` is started; `ReadBodyHandler` receives the result.
void Client::AsyncReadNextBodyPart(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));

        if (status_ == TransactionStatus::ReaderCreated) {
                status_ = TransactionStatus::BodyReadingInProgress;
        }

        if (AtLeast(status_, TransactionStatus::BodyReadingFinished)) {
                auto cancel_flag = cancelled_;
                handler(0);
                const bool finish = !*cancel_flag && status_ == TransactionStatus::BodyReadingFinished;
                if (finish) {
                        status_ = TransactionStatus::Done;
                        DoCancel();
                        CallHandler(body_handler_);
                }
                return;
        }

        reader_buf_start_ = start;
        reader_buf_end_ = end;
        reader_handler_ = handler;

        // Read at most as much as fits in both our own buffer and the caller's buffer.
        const size_t caller_capacity = end - start;
        const size_t chunk_size = min(body_buffer_.size(), caller_capacity);

        auto &body = response_data_.http_response_parser_->get().body();
        body.data = body_buffer_.data();
        body.size = chunk_size;
        response_data_.last_buffer_size_ = chunk_size;

        // The completion handler holds its own copies of the cancellation flag and of the response
        // data.
        auto cancel_flag = cancelled_;
        auto response_data_copy = response_data_;

        auto on_read =
                [this, cancel_flag, response_data_copy](const error_code &ec, size_t num_read) {
                        if (*cancel_flag) {
                                return;
                        }
                        ReadBodyHandler(ec, num_read);
                };

        auto &buffer = *response_data_.response_buffer_;
        auto &parser = *response_data_.http_response_parser_;

        switch (socket_mode_) {
        case SocketMode::Plain:
                http::async_read_some(stream_->next_layer().next_layer(), buffer, parser, on_read);
                break;
        case SocketMode::Tls:
                http::async_read_some(stream_->next_layer(), buffer, parser, on_read);
                break;
        case SocketMode::TlsTls:
                http::async_read_some(*stream_, buffer, parser, on_read);
                break;
        }
}
1175

1176
// Completion handler for one read of response body data.
//
// `ec` is the result of the read (`need_buffer` is not treated as an error) and `num_read` the
// raw number of bytes read from the stream. On error, the reader's handler receives the error
// and, unless that cancelled the transaction, the body handler is called with the error too.
// On success, the payload is copied into the caller's buffer and the reader's handler is called
// with the payload size. If no payload was produced, another read is issued instead.
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
        if (num_read > 0) {
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
        }

        if (ec == http::make_error_code(http::error::need_buffer)) {
                // This can be ignored. We always reset the buffer between reads anyway.
                ec = error_code();
        }

        assert(reader_handler_);

        if (response_data_.http_response_parser_->is_done()) {
                status_ = TransactionStatus::BodyReadingFinished;
        }

        auto cancelled = cancelled_;

        // Invoke a copy of the handler, never the member itself. `AsyncReadNextBodyPart` assigns
        // to `reader_handler_`, so a handler which calls it from inside its own invocation would
        // otherwise destroy the callable that is still executing.
        auto handler = reader_handler_;

        if (ec) {
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
                handler(expected::unexpected(err));
                if (!*cancelled) {
                        CallErrorHandler(ec, request_, body_handler_);
                }
                return;
        }

        // The num_read from above includes out of band payload data, such as chunk headers, which
        // we are not interested in. So we need to calculate the payload size from the remaining
        // buffer space.
        size_t payload_read =
                response_data_.last_buffer_size_ - response_data_.http_response_parser_->get().body().size;

        size_t buf_size = reader_buf_end_ - reader_buf_start_;
        size_t smallest = min(payload_read, buf_size);

        if (smallest == 0) {
                // We read nothing, which can happen if all we read was a chunk header. We cannot
                // return 0 to the handler however, because in `io::Reader` context this means
                // EOF. So just repeat the request instead, until we get actual payload data.
                AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, handler);
        } else {
                copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
                handler(smallest);
        }
}
1222

1223
void Client::Cancel() {
203✔
1224
        auto cancelled = cancelled_;
1225

1226
        if (!*cancelled) {
203✔
1227
                auto err =
1228
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
40✔
1229
                switch (status_) {
20✔
1230
                case TransactionStatus::None:
3✔
1231
                        CallErrorHandler(err, request_, header_handler_);
3✔
1232
                        break;
3✔
1233
                case TransactionStatus::HeaderHandlerCalled:
16✔
1234
                case TransactionStatus::ReaderCreated:
1235
                case TransactionStatus::BodyReadingInProgress:
1236
                case TransactionStatus::BodyReadingFinished:
1237
                        CallErrorHandler(err, request_, body_handler_);
16✔
1238
                        break;
16✔
1239
                case TransactionStatus::Replying:
1240
                case TransactionStatus::SwitchingProtocol:
1241
                        // Not used by client.
1242
                        assert(false);
1243
                        break;
1244
                case TransactionStatus::BodyHandlerCalled:
1245
                case TransactionStatus::Done:
1246
                        break;
1247
                }
1248
        }
1249

1250
        if (!*cancelled) {
203✔
1251
                DoCancel();
1✔
1252
        }
1253
}
203✔
1254

1255
void Client::DoCancel() {
610✔
1256
        resolver_.cancel();
610✔
1257
        if (stream_) {
610✔
1258
                stream_->lowest_layer().cancel();
260✔
1259
                stream_->lowest_layer().close();
260✔
1260
                stream_.reset();
260✔
1261
        }
1262

1263
        // Reset logger to no connection.
1264
        logger_ = log::Logger(logger_name_);
610✔
1265

1266
        // Set cancel state and then make a new one. Those who are interested should have their own
1267
        // pointer to the old one.
1268
        *cancelled_ = true;
610✔
1269
        cancelled_ = make_shared<bool>(true);
610✔
1270
}
610✔
1271

1272
Stream::Stream(Server &server) :
441✔
1273
        server_ {server},
1274
        logger_ {"http"},
1275
        cancelled_(make_shared<bool>(true)),
441✔
1276
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
441✔
1277
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,323✔
1278
        request_data_.request_buffer_ = make_shared<beast::flat_buffer>();
882✔
1279

1280
        // This is equivalent to:
1281
        //   request_data_.request_buffer_.reserve(body_buffer_.size());
1282
        // but compatible with Boost 1.67.
1283
        request_data_.request_buffer_->prepare(
1284
                body_buffer_.size() - request_data_.request_buffer_->size());
441✔
1285

1286
        request_data_.http_request_parser_ = make_shared<http::request_parser<http::buffer_body>>();
882✔
1287

1288
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
1289
        // they do, they should be handled higher up in the application logic.
1290
        //
1291
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
1292
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
1293
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
1294
        // possible value instead.
1295
        request_data_.http_request_parser_->body_limit(numeric_limits<uint64_t>::max());
1296
}
441✔
1297

1298
// Cancels whatever is still in progress on the stream before it goes away.
Stream::~Stream() {
        this->DoCancel();
}
441✔
1301

1302
void Stream::Cancel() {
7✔
1303
        auto cancelled = cancelled_;
1304

1305
        if (!*cancelled) {
7✔
1306
                auto err =
1307
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
1308
                switch (status_) {
7✔
1309
                case TransactionStatus::None:
×
1310
                        CallErrorHandler(err, request_, server_.header_handler_);
×
1311
                        break;
×
1312
                case TransactionStatus::HeaderHandlerCalled:
5✔
1313
                case TransactionStatus::ReaderCreated:
1314
                case TransactionStatus::BodyReadingInProgress:
1315
                case TransactionStatus::BodyReadingFinished:
1316
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
1317
                        break;
5✔
1318
                case TransactionStatus::BodyHandlerCalled:
×
1319
                        // In between body handler and reply finished. No one to handle the status
1320
                        // here.
1321
                        server_.RemoveStream(shared_from_this());
×
1322
                        break;
×
1323
                case TransactionStatus::Replying:
1✔
1324
                        CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1325
                        break;
1✔
1326
                case TransactionStatus::SwitchingProtocol:
1✔
1327
                        CallErrorHandler(err, request_, switch_protocol_handler_);
3✔
1328
                        break;
1✔
1329
                case TransactionStatus::Done:
1330
                        break;
1331
                }
1332
        }
1333

1334
        if (!*cancelled) {
7✔
1335
                DoCancel();
×
1336
        }
1337
}
7✔
1338

1339
void Stream::DoCancel() {
795✔
1340
        if (socket_.is_open()) {
795✔
1341
                socket_.cancel();
217✔
1342
                socket_.close();
217✔
1343
        }
1344

1345
        // Set cancel state and then make a new one. Those who are interested should have their own
1346
        // pointer to the old one.
1347
        *cancelled_ = true;
795✔
1348
        cancelled_ = make_shared<bool>(true);
795✔
1349
}
795✔
1350

1351
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
1352
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1353
}
×
1354

1355
void Stream::CallErrorHandler(
×
1356
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
1357
        status_ = TransactionStatus::Done;
×
1358
        DoCancel();
×
1359
        handler(expected::unexpected(err.WithContext(
×
1360
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
1361

1362
        server_.RemoveStream(shared_from_this());
×
1363
}
×
1364

1365
void Stream::CallErrorHandler(
2✔
1366
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1367
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
6✔
1368
}
2✔
1369

1370
void Stream::CallErrorHandler(
8✔
1371
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1372
        status_ = TransactionStatus::Done;
8✔
1373
        DoCancel();
8✔
1374
        handler(
8✔
1375
                req,
1376
                err.WithContext(
8✔
1377
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
24✔
1378

1379
        server_.RemoveStream(shared_from_this());
8✔
1380
}
8✔
1381

1382
void Stream::CallErrorHandler(
4✔
1383
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
1384
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
1385
}
4✔
1386

1387
void Stream::CallErrorHandler(
7✔
1388
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
1389
        status_ = TransactionStatus::Done;
7✔
1390
        DoCancel();
7✔
1391
        handler(err.WithContext(
14✔
1392
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
14✔
1393

1394
        server_.RemoveStream(shared_from_this());
7✔
1395
}
7✔
1396

1397
void Stream::CallErrorHandler(
×
1398
        const error_code &ec, const RequestPtr &req, SwitchProtocolHandler handler) {
1399
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1400
}
×
1401

1402
void Stream::CallErrorHandler(
1✔
1403
        const error::Error &err, const RequestPtr &req, SwitchProtocolHandler handler) {
1404
        status_ = TransactionStatus::Done;
1✔
1405
        DoCancel();
1✔
1406
        handler(expected::unexpected(err.WithContext(
2✔
1407
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
4✔
1408

1409
        server_.RemoveStream(shared_from_this());
1✔
1410
}
1✔
1411

1412
void Stream::AcceptHandler(const error_code &ec) {
225✔
1413
        if (ec) {
225✔
1414
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
1415
                return;
×
1416
        }
1417

1418
        auto ip = socket_.remote_endpoint().address().to_string();
450✔
1419

1420
        // Use IP as context for logging.
1421
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
225✔
1422

1423
        logger_.Debug("Accepted connection.");
450✔
1424

1425
        request_.reset(new IncomingRequest(*this, cancelled_));
450✔
1426

1427
        request_->address_.host = ip;
225✔
1428

1429
        *cancelled_ = false;
225✔
1430

1431
        ReadHeader();
225✔
1432
}
1433

1434
void Stream::ReadHeader() {
225✔
1435
        auto &cancelled = cancelled_;
1436
        auto &request_data = request_data_;
225✔
1437

1438
        http::async_read_some(
450✔
1439
                socket_,
225✔
1440
                *request_data_.request_buffer_,
1441
                *request_data_.http_request_parser_,
1442
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
225✔
1443
                        if (!*cancelled) {
225✔
1444
                                ReadHeaderHandler(ec, num_read);
225✔
1445
                        }
1446
                });
225✔
1447
}
225✔
1448

1449
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
225✔
1450
        if (num_read > 0) {
225✔
1451
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
450✔
1452
        }
1453

1454
        if (ec) {
225✔
1455
                CallErrorHandler(ec, request_, server_.header_handler_);
×
1456
                return;
185✔
1457
        }
1458

1459
        if (!request_data_.http_request_parser_->is_header_done()) {
225✔
1460
                ReadHeader();
×
1461
                return;
×
1462
        }
1463

1464
        auto method_result = BeastVerbToMethod(
1465
                request_data_.http_request_parser_->get().base().method(),
1466
                string {request_data_.http_request_parser_->get().base().method_string()});
450✔
1467
        if (!method_result) {
225✔
1468
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
1469
                return;
×
1470
        }
1471
        request_->method_ = method_result.value();
225✔
1472
        request_->address_.path = string(request_data_.http_request_parser_->get().base().target());
225✔
1473

1474
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
225✔
1475

1476
        string debug_str;
1477
        for (auto header = request_data_.http_request_parser_->get().cbegin();
391✔
1478
                 header != request_data_.http_request_parser_->get().cend();
616✔
1479
                 header++) {
1480
                request_->headers_[string {header->name_string()}] = string {header->value()};
1,173✔
1481
                if (logger_.Level() >= log::LogLevel::Debug) {
391✔
1482
                        debug_str += string {header->name_string()};
326✔
1483
                        debug_str += ": ";
326✔
1484
                        debug_str += string {header->value()};
326✔
1485
                        debug_str += "\n";
326✔
1486
                }
1487
        }
1488

1489
        logger_.Debug("Received headers:\n" + debug_str);
450✔
1490
        debug_str.clear();
1491

1492
        if (GetContentLength(*request_data_.http_request_parser_) == 0
225✔
1493
                && !request_data_.http_request_parser_->chunked()) {
225✔
1494
                auto cancelled = cancelled_;
1495
                status_ = TransactionStatus::HeaderHandlerCalled;
184✔
1496
                server_.header_handler_(request_);
368✔
1497
                if (!*cancelled) {
184✔
1498
                        status_ = TransactionStatus::BodyHandlerCalled;
184✔
1499
                        CallBodyHandler();
184✔
1500
                }
1501
                return;
1502
        }
1503

1504
        assert(!request_data_.http_request_parser_->is_done());
1505

1506
        auto cancelled = cancelled_;
1507
        status_ = TransactionStatus::HeaderHandlerCalled;
41✔
1508
        server_.header_handler_(request_);
82✔
1509
        if (*cancelled) {
41✔
1510
                return;
1511
        }
1512

1513
        // We know that a body reader is required here, because of the check for body above.
1514
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
40✔
1515
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
2✔
1516
        }
1517
}
1518

1519
// Reads the next part of the request body into the range [start, end) and reports the result
// through `handler`.
//
// If body reading is not (or no longer) in progress, `handler` is called with 0 right away; if
// the body had been read completely, the body handler is then invoked, provided the stream was
// not cancelled by `handler`. Otherwise an asynchronous read is scheduled, which ends up in
// `ReadBodyHandler`.
void Stream::AsyncReadNextBodyPart(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));

        // First read on a freshly created reader starts the reading phase.
        if (status_ == TransactionStatus::ReaderCreated) {
                status_ = TransactionStatus::BodyReadingInProgress;
        }

        if (status_ != TransactionStatus::BodyReadingInProgress) {
                auto cancelled = cancelled_;
                handler(0);
                const bool finished =
                        !*cancelled && status_ == TransactionStatus::BodyReadingFinished;
                if (finished) {
                        status_ = TransactionStatus::BodyHandlerCalled;
                        CallBodyHandler();
                }
                return;
        }

        // Remember where the caller wants the data, the completion handler needs it.
        reader_buf_start_ = start;
        reader_buf_end_ = end;
        reader_handler_ = handler;

        // Never read more than fits in both our internal buffer and the caller's buffer.
        size_t wanted = end - start;
        size_t chunk_size = min(body_buffer_.size(), wanted);

        auto &parser_body = request_data_.http_request_parser_->get().body();
        parser_body.data = body_buffer_.data();
        parser_body.size = chunk_size;
        request_data_.last_buffer_size_ = chunk_size;

        auto cancelled = cancelled_;
        auto request_data = request_data_;
        auto on_read = [this, cancelled, request_data](const error_code &ec, size_t num_read) {
                if (*cancelled) {
                        return;
                }
                ReadBodyHandler(ec, num_read);
        };

        http::async_read_some(
                socket_,
                *request_data_.request_buffer_,
                *request_data_.http_request_parser_,
                std::move(on_read));
}
1560

1561
// Completion handler for body reads scheduled by `AsyncReadNextBodyPart`.
//
// On error, the reader handler gets the error first, and then, unless that cancelled the
// stream, the server's body handler gets it too. On success, the payload is copied into the
// reader's buffer and the reader handler is told how many bytes it received.
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
        if (num_read > 0) {
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
        }

        if (ec == http::make_error_code(http::error::need_buffer)) {
                // This can be ignored. We always reset the buffer between reads anyway.
                ec.clear();
        }

        assert(reader_handler_);

        auto &parser = *request_data_.http_request_parser_;

        if (parser.is_done()) {
                status_ = TransactionStatus::BodyReadingFinished;
        }

        // Own pointer to the token, so we can detect cancellation from inside the handler.
        auto cancelled = cancelled_;

        if (ec) {
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
                reader_handler_(expected::unexpected(err));
                if (!*cancelled) {
                        CallErrorHandler(ec, request_, server_.body_handler_);
                }
                return;
        }

        // `num_read` also counts out of band data, such as chunk headers, which we are not
        // interested in. The payload size is therefore derived from how much of the buffer
        // handed to the parser is left unused.
        const size_t payload_read = request_data_.last_buffer_size_ - parser.get().body().size;
        const size_t reader_capacity = reader_buf_end_ - reader_buf_start_;
        const size_t to_deliver = min(payload_read, reader_capacity);

        if (to_deliver > 0) {
                copy_n(body_buffer_.begin(), to_deliver, reader_buf_start_);
                reader_handler_(to_deliver);
                return;
        }

        // We read nothing, which can happen if all we read was a chunk header. We cannot return
        // 0 to the handler however, because in `io::Reader` context this means EOF. So just
        // repeat the request instead, until we get actual payload data.
        AsyncReadNextBodyPart(reader_buf_start_, reader_buf_end_, reader_handler_);
}
1607

1608
// Starts sending the response: prepares the response objects, stores the handler to call when
// the reply is finished, and schedules writing of the response header. `WriteHeaderHandler`
// takes over when the header is written, unless the stream was cancelled in the meantime.
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
        SetupResponse();

        reply_finished_handler_ = reply_finished_handler;

        // The completion handler keeps its own copies of the token and of the response data.
        auto cancelled = cancelled_;
        auto response_data = response_data_;
        auto on_written = [this, cancelled, response_data](const error_code &ec, size_t num_written) {
                if (*cancelled) {
                        return;
                }
                WriteHeaderHandler(ec, num_written);
        };

        http::async_write_header(
                socket_, *response_data_.http_response_serializer_, std::move(on_written));
}
202✔
1625

1626
void Stream::SetupResponse() {
211✔
1627
        auto response = maybe_response_.lock();
211✔
1628
        // Only called from existing responses, so this should always be true.
1629
        assert(response);
1630

1631
        assert(status_ == TransactionStatus::BodyHandlerCalled);
1632
        status_ = TransactionStatus::Replying;
211✔
1633

1634
        // From here on we take shared ownership.
1635
        response_ = response;
1636

1637
        response_data_.http_response_ = make_shared<http::response<http::buffer_body>>();
422✔
1638

1639
        for (const auto &header : response->headers_) {
438✔
1640
                response_data_.http_response_->base().set(header.first, header.second);
227✔
1641
        }
1642

1643
        response_data_.http_response_->result(response->GetStatusCode());
211✔
1644
        response_data_.http_response_->reason(response->GetStatusMessage());
422✔
1645

1646
        response_data_.http_response_serializer_ =
1647
                make_shared<http::response_serializer<http::buffer_body>>(*response_data_.http_response_);
422✔
1648
}
211✔
1649

1650
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
201✔
1651
        if (num_written > 0) {
201✔
1652
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
402✔
1653
        }
1654

1655
        if (ec) {
201✔
1656
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1657
                return;
37✔
1658
        }
1659

1660
        auto exp_has_body =
1661
                HasBody(response_->GetHeader("Content-Length"), response_->GetHeader("Transfer-Encoding"));
402✔
1662
        if (!exp_has_body) {
201✔
1663
                CallErrorHandler(exp_has_body.error(), request_, reply_finished_handler_);
×
1664
                return;
×
1665
        }
1666
        if (!exp_has_body.value()) {
201✔
1667
                FinishReply();
36✔
1668
                return;
1669
        }
1670

1671
        if (!response_->body_reader_ && !response_->async_body_reader_) {
165✔
1672
                auto err = MakeError(BodyMissingError, "No body reader");
2✔
1673
                CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1674
                return;
1675
        }
1676

1677
        PrepareAndWriteNewBodyBuffer();
164✔
1678
}
1679

1680
void Stream::PrepareAndWriteNewBodyBuffer() {
2,074✔
1681
        // response_->body_reader_ XOR response_->async_body_reader_
1682
        assert(
1683
                (response_->body_reader_ || response_->async_body_reader_)
1684
                && !(response_->body_reader_ && response_->async_body_reader_));
1685

1686
        auto read_handler = [this](io::ExpectedSize read) {
2,075✔
1687
                if (!read) {
2,074✔
1688
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
3✔
1689
                        return;
1✔
1690
                }
1691
                WriteNewBodyBuffer(read.value());
2,073✔
1692
        };
2,074✔
1693

1694
        if (response_->body_reader_) {
2,074✔
1695
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
3,600✔
1696
        } else {
1697
                auto err = response_->async_body_reader_->AsyncRead(
1698
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
274✔
1699
                if (err != error::NoError) {
274✔
1700
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1701
                }
1702
        }
1703
}
2,074✔
1704

1705
void Stream::WriteNewBodyBuffer(size_t size) {
2,073✔
1706
        response_data_.http_response_->body().data = body_buffer_.data();
2,073✔
1707
        response_data_.http_response_->body().size = size;
2,073✔
1708

1709
        if (size > 0) {
2,073✔
1710
                response_data_.http_response_->body().more = true;
1,946✔
1711
        } else {
1712
                response_data_.http_response_->body().more = false;
127✔
1713
        }
1714

1715
        WriteBody();
2,073✔
1716
}
2,073✔
1717

1718
void Stream::WriteBody() {
3,999✔
1719
        auto &cancelled = cancelled_;
1720
        auto &response_data = response_data_;
3,999✔
1721

1722
        http::async_write_some(
7,998✔
1723
                socket_,
3,999✔
1724
                *response_data_.http_response_serializer_,
1725
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
3,957✔
1726
                        if (!*cancelled) {
3,957✔
1727
                                WriteBodyHandler(ec, num_written);
3,957✔
1728
                        }
1729
                });
3,957✔
1730
}
3,999✔
1731

1732
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
3,957✔
1733
        if (num_written > 0) {
3,957✔
1734
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
3,852✔
1735
        }
1736

1737
        if (ec == http::make_error_code(http::error::need_buffer)) {
3,957✔
1738
                // Write next body block.
1739
                PrepareAndWriteNewBodyBuffer();
1,910✔
1740
        } else if (ec) {
2,047✔
1741
                CallErrorHandler(ec, request_, reply_finished_handler_);
12✔
1742
        } else if (num_written > 0) {
2,043✔
1743
                // We are still writing the body.
1744
                WriteBody();
1,926✔
1745
        } else {
1746
                // We are finished.
1747
                FinishReply();
117✔
1748
        }
1749
}
3,957✔
1750

1751
void Stream::FinishReply() {
153✔
1752
        // We are done.
1753
        status_ = TransactionStatus::Done;
153✔
1754
        DoCancel();
153✔
1755
        // Release ownership of Body reader.
1756
        response_->body_reader_.reset();
153✔
1757
        response_->async_body_reader_.reset();
153✔
1758
        reply_finished_handler_(error::NoError);
153✔
1759
        server_.RemoveStream(shared_from_this());
153✔
1760
}
153✔
1761

1762
// Starts a protocol switch: prepares the response, stores `handler`, and schedules writing of
// the response header. `SwitchingProtocolHandler` takes over once the header is written, unless
// the stream was cancelled in the meantime. Always returns `error::NoError`.
error::Error Stream::AsyncSwitchProtocol(SwitchProtocolHandler handler) {
        SetupResponse();

        switch_protocol_handler_ = handler;
        status_ = TransactionStatus::SwitchingProtocol;

        // The completion handler keeps its own copies of the token and of the response data.
        auto cancelled = cancelled_;
        auto response_data = response_data_;
        auto on_written = [this, cancelled, response_data](const error_code &ec, size_t num_written) {
                if (*cancelled) {
                        return;
                }
                SwitchingProtocolHandler(ec, num_written);
        };

        http::async_write_header(
                socket_, *response_data_.http_response_serializer_, std::move(on_written));

        return error::NoError;
}
1782

1783
// Completion handler for the header written by `AsyncSwitchProtocol`. On success, the TCP
// socket is moved out of the stream and handed to the switch protocol handler, wrapped in a
// `RawSocket` together with the request buffer. The stream is removed from the server.
void Stream::SwitchingProtocolHandler(error_code ec, size_t num_written) {
        if (num_written > 0) {
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
        }

        if (ec) {
                CallErrorHandler(ec, request_, switch_protocol_handler_);
                return;
        }

        auto tcp_socket = make_shared<tcp::socket>(std::move(socket_));
        auto socket =
                make_shared<RawSocket<tcp::socket>>(std::move(tcp_socket), request_data_.request_buffer_);

        // Take a copy of the handler before the stream is removed from the server below.
        auto handler = switch_protocol_handler_;

        // Rest of the connection is done directly on the socket, set cancelled_ but don't close it.
        *cancelled_ = true;
        cancelled_ = make_shared<bool>(true);
        server_.RemoveStream(shared_from_this());

        handler(socket);
}
1805

1806
void Stream::CallBodyHandler() {
217✔
1807
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1808
        // it immediately destroys, which would destroy this stream as well. At the end of this
1809
        // function, it's ok to destroy it.
1810
        auto stream_ref = shared_from_this();
1811

1812
        server_.body_handler_(request_, error::NoError);
651✔
1813

1814
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1815
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1816
        // request has not been handled correctly.
1817
        auto response = maybe_response_.lock();
217✔
1818
        if (!response) {
217✔
1819
                logger_.Error("Handler produced no response. Closing stream prematurely.");
6✔
1820
                *cancelled_ = true;
3✔
1821
                cancelled_ = make_shared<bool>(true);
3✔
1822
                server_.RemoveStream(shared_from_this());
9✔
1823
        }
1824
}
217✔
1825

1826
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
237✔
1827
        event_loop_ {event_loop},
1828
        acceptor_(GetAsioIoContext(event_loop_)) {
428✔
1829
}
237✔
1830

1831
// Stops listening and drops all streams when the server goes away.
Server::~Server() {
        this->Cancel();
}
237✔
1834

1835
// Variant of `AsyncServeUrl` for body handlers of the `RequestHandler` kind. The handler is
// wrapped in an adapter which passes on either the error or the request, depending on whether
// an error occurred.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
        auto adapter = [body_handler](IncomingRequestPtr req, error::Error err) {
                if (err != error::NoError) {
                        body_handler(expected::unexpected(err));
                        return;
                }
                body_handler(req);
        };

        return AsyncServeUrl(url, header_handler, adapter);
}
1846

1847
// Starts listening for HTTP connections on the address given by `url`.
//
// Only the "http" protocol is accepted, the URL must not carry a path other than "/", and the
// host must be a literal IP address. `header_handler` and `body_handler` are stored and used
// for all requests received afterwards.
//
// Returns `error::NoError` on success. On failure an error describing the failed step is
// returned, and if the acceptor was opened by this call, it is closed again, so that the
// server does not keep a half configured socket around and the call can be retried.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
        auto err = BreakDownUrl(url, address_);
        if (error::NoError != err) {
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
        }

        if (address_.protocol != "http") {
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
        }

        if (address_.path.size() > 0 && address_.path != "/") {
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
        }

        boost::system::error_code ec;
        auto address = asio::ip::make_address(address_.host, ec);
        if (ec) {
                return error::Error(
                        ec.default_error_condition(),
                        "Could not construct endpoint from address " + address_.host);
        }

        asio::ip::tcp::endpoint endpoint(address, address_.port);

        ec.clear();
        acceptor_.open(endpoint.protocol(), ec);
        if (ec) {
                // Do not close here: a failure to open may mean that the acceptor is already
                // in use by an earlier call.
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
        }

        // From here on the acceptor was opened by us, so close it again on any failure.
        auto close_and_fail = [this](const boost::system::error_code &failure, const string &msg)
                -> error::Error {
                boost::system::error_code ignored;
                acceptor_.close(ignored);
                return error::Error(failure.default_error_condition(), msg);
        };

        // Allow address reuse, otherwise we can't re-bind later.
        ec.clear();
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
        if (ec) {
                return close_and_fail(ec, "Could not set socket options");
        }

        ec.clear();
        acceptor_.bind(endpoint, ec);
        if (ec) {
                return close_and_fail(ec, "Could not bind socket");
        }

        ec.clear();
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
        if (ec) {
                return close_and_fail(ec, "Could not start listening");
        }

        header_handler_ = header_handler;
        body_handler_ = body_handler;

        PrepareNewStream();

        return error::NoError;
}
1904

1905
void Server::Cancel() {
257✔
1906
        if (acceptor_.is_open()) {
257✔
1907
                acceptor_.cancel();
216✔
1908
                acceptor_.close();
216✔
1909
        }
1910
        streams_.clear();
1911
}
257✔
1912

1913
uint16_t Server::GetPort() const {
17✔
1914
        return acceptor_.local_endpoint().port();
17✔
1915
}
1916

1917
// Returns a URL for reaching the server, consisting of the fixed host 127.0.0.1 and the port
// reported by `GetPort()`.
string Server::GetUrl() const {
        const string port = to_string(GetPort());
        return "http://127.0.0.1:" + port;
}
1920

1921
// Creates the response object for `req` and registers it (as a weak reference) with the
// request's stream. Fails with `StreamCancelledError` if the request's stream has been
// cancelled.
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
        const bool stream_cancelled = *req->cancelled_;
        if (stream_cancelled) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
        }

        auto &stream = req->stream_;
        OutgoingResponsePtr response {new OutgoingResponse(stream, req->cancelled_)};
        stream.maybe_response_ = response;
        return response;
}
1929

1930
// Starts sending `resp` on its stream. `reply_finished_handler` is passed on to the stream.
// Fails with `StreamCancelledError` if the stream has been cancelled.
error::Error Server::AsyncReply(
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
        const bool stream_cancelled = *resp->cancelled_;
        if (stream_cancelled) {
                return MakeError(StreamCancelledError, "Cannot send response");
        }

        auto &stream = resp->stream_;
        stream.AsyncReply(reply_finished_handler);
        return error::NoError;
}
1939

1940
// Creates an asynchronous reader for the body of `req`.
//
// This only succeeds while the stream is in the `HeaderHandlerCalled` state, and only for
// requests that have a body (non-zero content length, or chunked encoding). On success the
// stream moves to the `ReaderCreated` state.
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
        if (*req->cancelled_) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
        }

        auto &stream = req->stream_;

        const bool in_header_handler = stream.status_ == TransactionStatus::HeaderHandlerCalled;
        if (!in_header_handler) {
                return expected::unexpected(error::Error(
                        make_error_condition(errc::operation_in_progress),
                        "MakeBodyAsyncReader called while reading is in progress"));
        }

        auto &parser = *stream.request_data_.http_request_parser_;
        if (GetContentLength(parser) == 0 && !parser.chunked()) {
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
        }

        stream.status_ = TransactionStatus::ReaderCreated;
        return make_shared<BodyAsyncReader<Stream>>(stream, req->cancelled_);
}
1960

1961
// Starts a protocol switch on the stream that `resp` belongs to. `handler` is passed on to the
// stream.
//
// Like `AsyncReply`, this fails with `StreamCancelledError` if the stream has been cancelled,
// instead of operating on a stream that is no longer active.
error::Error Server::AsyncSwitchProtocol(OutgoingResponsePtr resp, SwitchProtocolHandler handler) {
        if (*resp->cancelled_) {
                return MakeError(StreamCancelledError, "Cannot switch protocol");
        }

        return resp->stream_.AsyncSwitchProtocol(handler);
}
1964

1965
void Server::PrepareNewStream() {
441✔
1966
        StreamPtr new_stream {new Stream(*this)};
441✔
1967
        streams_.insert(new_stream);
1968
        AsyncAccept(new_stream);
882✔
1969
}
441✔
1970

1971
// Schedules an asynchronous accept on the socket of `stream`. When a connection arrives, the
// stream's accept handler is run and a new stream is prepared for the next connection.
void Server::AsyncAccept(StreamPtr stream) {
        auto on_accept = [this, stream](const error_code &ec) {
                if (!ec) {
                        stream->AcceptHandler(ec);

                        this->PrepareNewStream();
                        return;
                }

                // A cancelled accept is not logged; anything else is.
                if (ec != errc::operation_canceled) {
                        log::Error("Could not accept connection: " + ec.message());
                }
        };

        acceptor_.async_accept(stream->socket_, std::move(on_accept));
}
441✔
1985

1986
// Unregisters `stream` from the server, and then cancels it.
void Server::RemoveStream(StreamPtr stream) {
        // `stream` is our own pointer, so it stays valid for the cancel below even after it is
        // erased from the set.
        streams_.erase(stream);
        stream->DoCancel();
}
185✔
1991

1992
} // namespace http
1993
} // namespace common
1994
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc