• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1057637229

01 Nov 2023 01:17PM UTC coverage: 79.933% (-0.3%) from 80.207%
1057637229

push

gitlab-ci

oleorhagen
chore: Alias mender-authd.service -> mender-client.service

This aliases the systemd.service for `mender-authd` to `mender-client`, so that
other existing third-party services relying on it will still work transparently.

Ticket: MEN-6812

Signed-off-by: Ole Petter <ole.orhagen@northern.tech>

6899 of 8631 relevant lines covered (79.93%)

9322.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.28
/src/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio.hpp>
20
#include <boost/asio/ip/tcp.hpp>
21
#include <boost/asio/ssl/host_name_verification.hpp>
22
#include <boost/asio/ssl/verify_mode.hpp>
23

24
#include <common/common.hpp>
25
#include <common/crypto.hpp>
26

27
namespace mender {
28
namespace http {
29

30
namespace common = mender::common;
31
namespace crypto = mender::common::crypto;
32

33
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
34
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
35
const unsigned int BeastHttpVersion = 11;
36

37
namespace asio = boost::asio;
38
namespace http = boost::beast::http;
39

40
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
41

42
static http::verb MethodToBeastVerb(Method method) {
236✔
43
        switch (method) {
236✔
44
        case Method::GET:
45
                return http::verb::get;
46
        case Method::HEAD:
47
                return http::verb::head;
48
        case Method::POST:
49
                return http::verb::post;
50
        case Method::PUT:
51
                return http::verb::put;
52
        case Method::PATCH:
53
                return http::verb::patch;
54
        case Method::CONNECT:
55
                return http::verb::connect;
56
        case Method::Invalid:
57
                // Fallthrough to end (no-op).
58
                break;
59
        }
60
        // Don't use "default" case. This should generate a warning if we ever add any methods. But
61
        // still assert here for safety.
62
        assert(false);
63
        return http::verb::get;
64
}
65

66
static expected::expected<Method, error::Error> BeastVerbToMethod(
219✔
67
        http::verb verb, const string &verb_string) {
68
        switch (verb) {
219✔
69
        case http::verb::get:
183✔
70
                return Method::GET;
71
        case http::verb::head:
×
72
                return Method::HEAD;
73
        case http::verb::post:
13✔
74
                return Method::POST;
75
        case http::verb::put:
23✔
76
                return Method::PUT;
77
        case http::verb::patch:
×
78
                return Method::PATCH;
79
        case http::verb::connect:
×
80
                return Method::CONNECT;
81
        default:
×
82
                return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
×
83
        }
84
}
85

86
template <typename StreamType>
87
class BodyAsyncReader : virtual public io::AsyncReader {
88
public:
89
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
146✔
90
                stream_ {stream},
91
                cancelled_ {cancelled} {
292✔
92
        }
146✔
93
        ~BodyAsyncReader() {
38✔
94
                Cancel();
38✔
95
        }
76✔
96

97
        error::Error AsyncRead(
2,034✔
98
                vector<uint8_t>::iterator start,
99
                vector<uint8_t>::iterator end,
100
                io::AsyncIoHandler handler) override {
101
                if (*cancelled_) {
2,034✔
102
                        return error::MakeError(
×
103
                                error::ProgrammingError,
104
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
105
                }
106
                stream_.AsyncReadNextBodyPart(start, end, handler);
2,034✔
107
                return error::NoError;
2,034✔
108
        }
109

110
        void Cancel() override {
40✔
111
                if (!*cancelled_) {
40✔
112
                        stream_.Cancel();
4✔
113
                }
114
        }
40✔
115

116
private:
117
        StreamType &stream_;
118
        shared_ptr<bool> cancelled_;
119

120
        friend class Client;
121
        friend class Server;
122
};
123

124
template <typename StreamType>
125
class RawSocket : virtual public io::AsyncReadWriter {
126
public:
127
        RawSocket(shared_ptr<StreamType> stream, shared_ptr<beast::flat_buffer> buffered) :
×
128
                destroying_ {make_shared<bool>(false)},
×
129
                stream_ {stream},
130
                buffered_ {buffered} {
×
131
                // If there are no buffered bytes, then we don't need it.
132
                if (buffered_ && buffered_->size() == 0) {
×
133
                        buffered_.reset();
×
134
                }
135
        }
×
136

137
        ~RawSocket() {
15✔
138
                *destroying_ = true;
15✔
139
                Cancel();
15✔
140
        }
30✔
141

142
        error::Error AsyncRead(
320✔
143
                vector<uint8_t>::iterator start,
144
                vector<uint8_t>::iterator end,
145
                io::AsyncIoHandler handler) override {
146
                // If we have prebuffered bytes, which can happen if the HTTP parser read the
147
                // header and parts of the body in one block, return those first.
148
                if (buffered_) {
320✔
149
                        return DrainPrebufferedData(start, end, handler);
8✔
150
                }
151

152
                read_buffer_ = asio::buffer(&*start, end - start);
316✔
153
                auto &destroying = destroying_;
154
                stream_->async_read_some(
632✔
155
                        read_buffer_,
316✔
156
                        [destroying, handler](const boost::system::error_code &ec, size_t num_read) {
313✔
157
                                if (*destroying) {
313✔
158
                                        return;
159
                                }
160

161
                                if (ec == asio::error::operation_aborted) {
313✔
162
                                        handler(expected::unexpected(error::Error(
12✔
163
                                                make_error_condition(errc::operation_canceled),
6✔
164
                                                "Could not read from socket")));
165
                                } else if (ec) {
310✔
166
                                        handler(expected::unexpected(
12✔
167
                                                error::Error(ec.default_error_condition(), "Could not read from socket")));
12✔
168
                                } else {
169
                                        handler(num_read);
608✔
170
                                }
171
                        });
172
                return error::NoError;
316✔
173
        }
174

175
        error::Error AsyncWrite(
309✔
176
                vector<uint8_t>::const_iterator start,
177
                vector<uint8_t>::const_iterator end,
178
                io::AsyncIoHandler handler) override {
179
                write_buffer_ = asio::buffer(&*start, end - start);
309✔
180
                auto &destroying = destroying_;
181
                stream_->async_write_some(
618✔
182
                        write_buffer_,
309✔
183
                        [destroying, handler](const boost::system::error_code &ec, size_t num_written) {
306✔
184
                                if (*destroying) {
306✔
185
                                        return;
186
                                }
187

188
                                if (ec == asio::error::operation_aborted) {
306✔
189
                                        handler(expected::unexpected(error::Error(
×
190
                                                make_error_condition(errc::operation_canceled),
×
191
                                                "Could not write to socket")));
192
                                } else if (ec) {
306✔
193
                                        handler(expected::unexpected(
×
194
                                                error::Error(ec.default_error_condition(), "Could not write to socket")));
×
195
                                } else {
196
                                        handler(num_written);
612✔
197
                                }
198
                        });
199
                return error::NoError;
309✔
200
        }
201

202
        void Cancel() override {
28✔
203
                if (stream_->lowest_layer().is_open()) {
28✔
204
                        stream_->lowest_layer().cancel();
15✔
205
                        stream_->lowest_layer().close();
15✔
206
                }
207
        }
28✔
208

209
private:
210
        error::Error DrainPrebufferedData(
4✔
211
                vector<uint8_t>::iterator start,
212
                vector<uint8_t>::iterator end,
213
                io::AsyncIoHandler handler) {
214
                size_t to_copy = min(static_cast<size_t>(end - start), buffered_->size());
4✔
215

216
                // These two lines are equivalent to:
217
                //   copy_n(static_cast<const uint8_t *>(buffered_->cdata().data()), to_copy, start);
218
                // but compatible with Boost 1.67.
219
                const beast::flat_buffer &cbuffered = *buffered_;
220
                copy_n(static_cast<const uint8_t *>(cbuffered.data().data()), to_copy, start);
4✔
221
                buffered_->consume(to_copy);
4✔
222
                if (buffered_->size() == 0) {
4✔
223
                        // We don't need it anymore.
224
                        buffered_.reset();
4✔
225
                }
226
                handler(to_copy);
4✔
227
                return error::NoError;
4✔
228
        }
229

230
        shared_ptr<bool> destroying_;
231
        shared_ptr<StreamType> stream_;
232
        shared_ptr<beast::flat_buffer> buffered_;
233
        asio::mutable_buffer read_buffer_;
234
        asio::const_buffer write_buffer_;
235
};
236

237
Client::Client(
345✔
238
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
239
        event_loop_ {event_loop},
240
        logger_name_ {logger_name},
241
        client_config_ {client},
242
        http_proxy_ {client.http_proxy},
345✔
243
        https_proxy_ {client.https_proxy},
345✔
244
        no_proxy_ {client.no_proxy},
345✔
245
        cancelled_ {make_shared<bool>(true)},
345✔
246
        disable_keep_alive_ {client.disable_keep_alive},
247
        resolver_(GetAsioIoContext(event_loop)),
248
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,380✔
249
}
345✔
250

251
Client::~Client() {
2,070✔
252
        if (!*cancelled_) {
345✔
253
                logger_.Warning("Client destroyed while request is still active!");
32✔
254
        }
255
        DoCancel();
345✔
256
}
345✔
257

258
error::Error Client::Initialize() {
275✔
259
        if (initialized_) {
275✔
260
                return error::NoError;
68✔
261
        }
262

263
        for (auto i = 0; i < MENDER_BOOST_BEAST_SSL_CTX_COUNT; i++) {
611✔
264
                ssl_ctx_[i].set_verify_mode(
816✔
265
                        client_config_.skip_verify ? ssl::verify_none : ssl::verify_peer);
266

267
                beast::error_code ec {};
409✔
268
                if (client_config_.client_cert_path != "" and client_config_.client_cert_key_path != "") {
409✔
269
                        ssl_ctx_[i].set_options(boost::asio::ssl::context::default_workarounds);
4✔
270
                        ssl_ctx_[i].use_certificate_file(
271
                                client_config_.client_cert_path, boost::asio::ssl::context_base::pem, ec);
4✔
272
                        if (ec) {
4✔
273
                                return error::Error(
274
                                        ec.default_error_condition(), "Could not load client certificate");
2✔
275
                        }
276
                        auto exp_key = crypto::PrivateKey::Load(
277
                                {client_config_.client_cert_key_path, "", client_config_.ssl_engine});
6✔
278
                        if (!exp_key) {
3✔
279
                                return exp_key.error().WithContext(
280
                                        "Error loading private key from " + client_config_.client_cert_key_path);
2✔
281
                        }
282

283
                        const int ret =
284
                                SSL_CTX_use_PrivateKey(ssl_ctx_[i].native_handle(), exp_key.value().Get());
2✔
285
                        if (ret != 1) {
2✔
286
                                return MakeError(
287
                                        HTTPInitError,
288
                                        "Failed to add the PrivateKey: " + client_config_.client_cert_key_path
×
289
                                                + " to the SSL CTX");
×
290
                        }
291
                } else if (
292
                        client_config_.client_cert_path != "" or client_config_.client_cert_key_path != "") {
405✔
293
                        return error::Error(
294
                                make_error_condition(errc::invalid_argument),
4✔
295
                                "Cannot set only one of client certificate, and client certificate private key");
4✔
296
                }
297

298
                ssl_ctx_[i].set_default_verify_paths(ec); // Load the default CAs
405✔
299
                if (ec) {
405✔
300
                        auto err = error::Error(
301
                                ec.default_error_condition(), "Failed to load the SSL default directory");
×
302
                        if (client_config_.server_cert_path == "") {
×
303
                                // We aren't going to have any valid certificates then.
304
                                return err;
×
305
                        } else {
306
                                // We have a dedicated certificate, so this is not fatal.
307
                                log::Info(err.String());
×
308
                        }
309
                }
310
                if (client_config_.server_cert_path != "") {
405✔
311
                        ssl_ctx_[i].load_verify_file(client_config_.server_cert_path, ec);
47✔
312
                        if (ec) {
47✔
313
                                return error::Error(
314
                                        ec.default_error_condition(), "Failed to load the server certificate!");
2✔
315
                        }
316
                }
317
        }
318

319
        initialized_ = true;
202✔
320

321
        return error::NoError;
202✔
322
}
323

324
error::Error Client::AsyncCall(
275✔
325
        OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
326
        auto err = Initialize();
275✔
327
        if (err != error::NoError) {
275✔
328
                return err;
5✔
329
        }
330

331
        if (!*cancelled_ && status_ != TransactionStatus::Done) {
270✔
332
                return error::Error(
333
                        make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
×
334
        }
335

336
        if (req->address_.protocol == "" || req->address_.host == "" || req->address_.port < 0) {
270✔
337
                return error::MakeError(error::ProgrammingError, "Request is not ready");
4✔
338
        }
339

340
        if (!header_handler || !body_handler) {
268✔
341
                return error::MakeError(
342
                        error::ProgrammingError, "header_handler and body_handler can not be nullptr");
2✔
343
        }
344

345
        if (req->address_.protocol != "http" && req->address_.protocol != "https") {
267✔
346
                return error::Error(
347
                        make_error_condition(errc::protocol_not_supported), req->address_.protocol);
2✔
348
        }
349

350
        logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));
266✔
351

352
        request_ = req;
353

354
        err = HandleProxySetup();
266✔
355
        if (err != error::NoError) {
266✔
356
                return err;
4✔
357
        }
358

359
        // NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
360
        // request to route to our k8s cluster. Set this in all cases.
361
        req->SetHeader("HOST", req->address_.host);
524✔
362

363
        header_handler_ = header_handler;
262✔
364
        body_handler_ = body_handler;
262✔
365
        status_ = TransactionStatus::None;
262✔
366

367
        cancelled_ = make_shared<bool>(false);
262✔
368

369
        auto &cancelled = cancelled_;
370

371
        resolver_.async_resolve(
524✔
372
                request_->address_.host,
373
                to_string(request_->address_.port),
524✔
374
                [this, cancelled](
522✔
375
                        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
259✔
376
                        if (!*cancelled) {
260✔
377
                                ResolveHandler(ec, results);
259✔
378
                        }
379
                });
260✔
380

381
        return error::NoError;
262✔
382
}
383

384
error::Error Client::HandleProxySetup() {
266✔
385
        secondary_req_.reset();
266✔
386

387
        if (request_->address_.protocol == "http") {
266✔
388
                socket_mode_ = SocketMode::Plain;
243✔
389

390
                if (http_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
243✔
391
                        // Make a modified proxy request.
392
                        BrokenDownUrl proxy_address;
18✔
393
                        auto err = BreakDownUrl(http_proxy_, proxy_address);
10✔
394
                        if (err != error::NoError) {
10✔
395
                                return err.WithContext("HTTP proxy URL is invalid");
2✔
396
                        }
397
                        if (proxy_address.path != "" && proxy_address.path != "/") {
9✔
398
                                return MakeError(
399
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
2✔
400
                        }
401

402
                        request_->address_.path = request_->address_.protocol + "://" + request_->address_.host
16✔
403
                                                                          + ":" + to_string(request_->address_.port)
24✔
404
                                                                          + request_->address_.path;
24✔
405
                        request_->address_.host = proxy_address.host;
8✔
406
                        request_->address_.port = proxy_address.port;
8✔
407
                        request_->address_.protocol = proxy_address.protocol;
8✔
408

409
                        if (proxy_address.protocol == "https") {
8✔
410
                                socket_mode_ = SocketMode::Tls;
5✔
411
                        } else if (proxy_address.protocol == "http") {
3✔
412
                                socket_mode_ = SocketMode::Plain;
3✔
413
                        } else {
414
                                // Should never get here.
415
                                assert(false);
416
                        }
417
                }
418
        } else if (request_->address_.protocol == "https") {
23✔
419
                socket_mode_ = SocketMode::Tls;
23✔
420

421
                if (https_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
23✔
422
                        // Save the original request for later, so that we can make a new request
423
                        // over the channel established by CONNECT.
424
                        secondary_req_ = std::move(request_);
425

426
                        request_ = make_shared<OutgoingRequest>();
26✔
427
                        request_->SetMethod(Method::CONNECT);
13✔
428
                        BrokenDownUrl proxy_address;
24✔
429
                        auto err = BreakDownUrl(https_proxy_, proxy_address);
13✔
430
                        if (err != error::NoError) {
13✔
431
                                return err.WithContext("HTTPS proxy URL is invalid");
2✔
432
                        }
433
                        if (proxy_address.path != "" && proxy_address.path != "/") {
12✔
434
                                return MakeError(
435
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
2✔
436
                        }
437

438
                        request_->address_.path =
439
                                secondary_req_->address_.host + ":" + to_string(secondary_req_->address_.port);
22✔
440
                        request_->address_.host = proxy_address.host;
11✔
441
                        request_->address_.port = proxy_address.port;
11✔
442
                        request_->address_.protocol = proxy_address.protocol;
11✔
443

444
                        if (proxy_address.protocol == "https") {
11✔
445
                                socket_mode_ = SocketMode::Tls;
6✔
446
                        } else if (proxy_address.protocol == "http") {
5✔
447
                                socket_mode_ = SocketMode::Plain;
5✔
448
                        } else {
449
                                // Should never get here.
450
                                assert(false);
451
                        }
452
                }
453
        } else {
454
                // Should never get here
455
                assert(false);
456
        }
457

458
        return error::NoError;
262✔
459
}
460

461
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
165✔
462
        if (status_ != TransactionStatus::HeaderHandlerCalled) {
165✔
463
                return expected::unexpected(error::Error(
2✔
464
                        make_error_condition(errc::operation_in_progress),
4✔
465
                        "MakeBodyAsyncReader called while reading is in progress"));
6✔
466
        }
467

468
        if (response_body_length_ == 0) {
163✔
469
                return expected::unexpected(
17✔
470
                        MakeError(BodyMissingError, "Response does not contain a body"));
51✔
471
        }
472

473
        status_ = TransactionStatus::ReaderCreated;
146✔
474
        return make_shared<BodyAsyncReader<Client>>(resp->client_.GetHttpClient(), resp->cancelled_);
292✔
475
}
476

477
io::ExpectedAsyncReadWriterPtr Client::SwitchProtocol(IncomingResponsePtr req) {
7✔
478
        if (*cancelled_) {
7✔
479
                return expected::unexpected(error::Error(
×
480
                        make_error_condition(errc::not_connected),
×
481
                        "Cannot switch protocols if endpoint is not connected"));
×
482
        }
483

484
        // Rest of the connection is done directly on the socket, we are done here.
485
        status_ = TransactionStatus::Done;
7✔
486
        *cancelled_ = true;
7✔
487
        cancelled_ = make_shared<bool>(false);
14✔
488

489
        auto stream = stream_;
490
        // This no longer belongs to us.
491
        stream_.reset();
7✔
492

493
        switch (socket_mode_) {
7✔
494
        case SocketMode::TlsTls:
×
495
                return make_shared<RawSocket<ssl::stream<ssl::stream<tcp::socket>>>>(
×
496
                        stream, response_data_.response_buffer_);
×
497
        case SocketMode::Tls:
×
498
                return make_shared<RawSocket<ssl::stream<tcp::socket>>>(
×
499
                        make_shared<ssl::stream<tcp::socket>>(std::move(stream->next_layer())),
×
500
                        response_data_.response_buffer_);
×
501
        case SocketMode::Plain:
7✔
502
                return make_shared<RawSocket<tcp::socket>>(
7✔
503
                        make_shared<tcp::socket>(std::move(stream->next_layer().next_layer())),
14✔
504
                        response_data_.response_buffer_);
7✔
505
        }
506

507
        AssertOrReturnUnexpected(false);
×
508
}
509

510
void Client::CallHandler(ResponseHandler handler) {
291✔
511
        // This function exists to make sure we have a copy of the handler we're calling (in the
512
        // argument list). This is important in case the handler owns the client instance through a
513
        // capture, and it replaces the handler with a different one (using `AsyncCall`). If it
514
        // does, then it destroys the final copy of the handler, and therefore also the client,
515
        // which is why we need to make a copy here, before calling it.
516
        handler(response_);
291✔
517
}
291✔
518

519
void Client::CallErrorHandler(
82✔
520
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
521
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
246✔
522
}
82✔
523

524
void Client::CallErrorHandler(
171✔
525
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
526
        *cancelled_ = true;
171✔
527
        cancelled_ = make_shared<bool>(true);
171✔
528
        stream_.reset();
171✔
529
        status_ = TransactionStatus::Done;
171✔
530
        handler(expected::unexpected(
342✔
531
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
684✔
532
}
171✔
533

534
void Client::ResolveHandler(
259✔
535
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
536
        if (ec) {
259✔
537
                CallErrorHandler(ec, request_, header_handler_);
×
538
                return;
×
539
        }
540

541
        if (logger_.Level() >= log::LogLevel::Debug) {
259✔
542
                string ips = "[";
239✔
543
                string sep;
544
                for (auto r : results) {
1,016✔
545
                        ips += sep;
269✔
546
                        ips += r.endpoint().address().to_string();
269✔
547
                        sep = ", ";
269✔
548
                }
549
                ips += "]";
239✔
550
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
478✔
551
        }
552

553
        resolver_results_ = results;
554

555
        stream_ = make_shared<ssl::stream<ssl::stream<tcp::socket>>>(
259✔
556
                ssl::stream<tcp::socket>(GetAsioIoContext(event_loop_), ssl_ctx_[0]), ssl_ctx_[1]);
518✔
557

558
        if (!response_data_.response_buffer_) {
259✔
559
                // We can reuse this if preexisting.
560
                response_data_.response_buffer_ = make_shared<beast::flat_buffer>();
384✔
561

562
                // This is equivalent to:
563
                //   response_data_.response_buffer_.reserve(body_buffer_.size());
564
                // but compatible with Boost 1.67.
565
                response_data_.response_buffer_->prepare(
566
                        body_buffer_.size() - response_data_.response_buffer_->size());
192✔
567
        }
568

569
        auto &cancelled = cancelled_;
570

571
        asio::async_connect(
259✔
572
                stream_->lowest_layer(),
573
                resolver_results_,
259✔
574
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
518✔
575
                        if (!*cancelled) {
259✔
576
                                switch (socket_mode_) {
259✔
577
                                case SocketMode::TlsTls:
×
578
                                        // Should never happen because we always need to handshake
579
                                        // the innermost Tls first, then the outermost, but the
580
                                        // latter doesn't happen here.
581
                                        assert(false);
582
                                        CallErrorHandler(
×
583
                                                error::MakeError(
×
584
                                                        error::ProgrammingError, "TlsTls mode is invalid in ResolveHandler"),
×
585
                                                request_,
×
586
                                                header_handler_);
×
587
                                case SocketMode::Tls:
20✔
588
                                        return HandshakeHandler(stream_->next_layer(), ec, endpoint);
20✔
589
                                case SocketMode::Plain:
239✔
590
                                        return ConnectHandler(ec, endpoint);
239✔
591
                                }
592
                        }
593
                });
594
}
595

596
template <typename StreamType>
597
void Client::HandshakeHandler(
23✔
598
        StreamType &stream, const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
599
        if (ec) {
23✔
600
                CallErrorHandler(ec, request_, header_handler_);
2✔
601
                return;
2✔
602
        }
603

604
        if (not disable_keep_alive_) {
21✔
605
                boost::asio::socket_base::keep_alive option(true);
606
                stream_->lowest_layer().set_option(option);
21✔
607
        }
608

609
        // Set SNI Hostname (many hosts need this to handshake successfully)
610
        if (!SSL_set_tlsext_host_name(stream.native_handle(), request_->address_.host.c_str())) {
21✔
611
                beast::error_code ec2 {
×
612
                        static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
×
613
                logger_.Error("Failed to set SNI host name: " + ec2.message());
×
614
        }
615

616
        // Enable host name verification (not done automatically and we don't have
617
        // enough access to the TLS internals to use X509_VERIFY_PARAM_set1_host(),
618
        // hence the callback that boost provides).
619
        boost::system::error_code b_ec;
21✔
620
        stream.set_verify_callback(ssl::host_name_verification(request_->address_.host), b_ec);
42✔
621
        if (b_ec) {
21✔
622
                logger_.Error("Failed to enable host name verification: " + b_ec.message());
×
623
                CallErrorHandler(b_ec, request_, header_handler_);
×
624
                return;
×
625
        }
626

627
        auto &cancelled = cancelled_;
628

629
        stream.async_handshake(
42✔
630
                ssl::stream_base::client, [this, cancelled, endpoint](const error_code &ec) {
21✔
631
                        if (*cancelled) {
23✔
632
                                return;
633
                        }
634
                        if (ec) {
23✔
635
                                logger_.Error("https: Failed to perform the SSL handshake: " + ec.message());
20✔
636
                                CallErrorHandler(ec, request_, header_handler_);
10✔
637
                                return;
10✔
638
                        }
639
                        logger_.Debug("https: Successful SSL handshake");
26✔
640
                        ConnectHandler(ec, endpoint);
13✔
641
                });
642
}
643

644

645
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
252✔
646
        if (ec) {
252✔
647
                CallErrorHandler(ec, request_, header_handler_);
16✔
648
                return;
16✔
649
        }
650

651
        if (not disable_keep_alive_) {
236✔
652
                boost::asio::socket_base::keep_alive option(true);
653
                stream_->lowest_layer().set_option(option);
236✔
654
        }
655

656
        logger_.Debug("Connected to " + endpoint.address().to_string());
472✔
657

658
        request_data_.http_request_ = make_shared<http::request<http::buffer_body>>(
236✔
659
                MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);
472✔
660

661
        for (const auto &header : request_->headers_) {
623✔
662
                request_data_.http_request_->set(header.first, header.second);
387✔
663
        }
664

665
        request_data_.http_request_serializer_ =
666
                make_shared<http::request_serializer<http::buffer_body>>(*request_data_.http_request_);
236✔
667

668
        response_data_.http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();
472✔
669

670
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
671
        // if they do, they should be handled higher up in the application logic.
672
        //
673
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
674
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
675
        // `has_value()` in their code, causing their subsequent comparison operation to
676
        // misbehave. So pass highest possible value instead.
677
        response_data_.http_response_parser_->body_limit(numeric_limits<uint64_t>::max());
678

679
        auto &cancelled = cancelled_;
680
        auto &request_data = request_data_;
236✔
681

682
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
236✔
683
                if (!*cancelled) {
236✔
684
                        WriteHeaderHandler(ec, num_written);
236✔
685
                }
686
        };
472✔
687

688
        switch (socket_mode_) {
236✔
689
        case SocketMode::TlsTls:
1✔
690
                http::async_write_header(*stream_, *request_data_.http_request_serializer_, handler);
1✔
691
                break;
692
        case SocketMode::Tls:
12✔
693
                http::async_write_header(
12✔
694
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
695
                break;
696
        case SocketMode::Plain:
223✔
697
                http::async_write_header(
223✔
698
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
699
                break;
700
        }
701
}
702

703
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
236✔
704
        if (num_written > 0) {
236✔
705
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
472✔
706
        }
707

708
        if (ec) {
236✔
709
                CallErrorHandler(ec, request_, header_handler_);
×
710
                return;
197✔
711
        }
712

713
        auto header = request_->GetHeader("Content-Length");
472✔
714
        if (!header || header.value() == "0") {
236✔
715
                ReadHeader();
196✔
716
                return;
717
        }
718

719
        auto length = common::StringToLongLong(header.value());
40✔
720
        if (!length || length.value() < 0) {
40✔
721
                auto err = error::Error(
722
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
723
                CallErrorHandler(err, request_, header_handler_);
×
724
                return;
725
        }
726
        request_body_length_ = length.value();
40✔
727

728
        if (!request_->body_gen_ && !request_->async_body_gen_) {
40✔
729
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
730
                CallErrorHandler(err, request_, header_handler_);
2✔
731
                return;
732
        }
733

734
        assert(!(request_->body_gen_ && request_->async_body_gen_));
735

736
        if (request_->body_gen_) {
39✔
737
                auto body_reader = request_->body_gen_();
33✔
738
                if (!body_reader) {
33✔
739
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
740
                        return;
741
                }
742
                request_->body_reader_ = body_reader.value();
33✔
743
        } else {
744
                auto body_reader = request_->async_body_gen_();
6✔
745
                if (!body_reader) {
6✔
746
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
747
                        return;
748
                }
749
                request_->async_body_reader_ = body_reader.value();
6✔
750
        }
751

752
        PrepareAndWriteNewBodyBuffer();
39✔
753
}
754

755
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,124✔
756
        if (num_written > 0) {
2,124✔
757
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
2,088✔
758
        }
759

760
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,124✔
761
                // Write next block of the body.
762
                PrepareAndWriteNewBodyBuffer();
1,044✔
763
        } else if (ec) {
1,080✔
764
                CallErrorHandler(ec, request_, header_handler_);
8✔
765
        } else if (num_written > 0) {
1,076✔
766
                // We are still writing the body.
767
                WriteBody();
1,044✔
768
        } else {
769
                // We are ready to receive the response.
770
                ReadHeader();
32✔
771
        }
772
}
2,124✔
773

774
void Client::PrepareAndWriteNewBodyBuffer() {
1,083✔
775
        // request_->body_reader_ XOR request_->async_body_reader_
776
        assert(
777
                (request_->body_reader_ || request_->async_body_reader_)
778
                && !(request_->body_reader_ && request_->async_body_reader_));
779

780
        auto cancelled = cancelled_;
781
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
3,438✔
782
                if (!*cancelled) {
1,083✔
783
                        if (!read) {
1,082✔
784
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
785
                                return;
2✔
786
                        }
787
                        WriteNewBodyBuffer(read.value());
1,080✔
788
                }
789
        };
1,083✔
790

791

792
        if (request_->body_reader_) {
1,083✔
793
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
1,316✔
794
        } else {
795
                auto err = request_->async_body_reader_->AsyncRead(
796
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
850✔
797
                if (err != error::NoError) {
425✔
798
                        CallErrorHandler(err, request_, header_handler_);
×
799
                }
800
        }
801
}
1,083✔
802

803
void Client::WriteNewBodyBuffer(size_t size) {
1,080✔
804
        request_data_.http_request_->body().data = body_buffer_.data();
1,080✔
805
        request_data_.http_request_->body().size = size;
1,080✔
806

807
        if (size > 0) {
1,080✔
808
                request_data_.http_request_->body().more = true;
1,048✔
809
        } else {
810
                // Release ownership of Body reader.
811
                request_->body_reader_.reset();
32✔
812
                request_->async_body_reader_.reset();
32✔
813
                request_data_.http_request_->body().more = false;
32✔
814
        }
815

816
        WriteBody();
1,080✔
817
}
1,080✔
818

819
void Client::WriteBody() {
2,124✔
820
        auto &cancelled = cancelled_;
821
        auto &request_data = request_data_;
2,124✔
822

823
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
2,124✔
824
                if (!*cancelled) {
2,124✔
825
                        WriteBodyHandler(ec, num_written);
2,124✔
826
                }
827
        };
4,248✔
828

829
        switch (socket_mode_) {
2,124✔
830
        case SocketMode::TlsTls:
×
831
                http::async_write_some(*stream_, *request_data_.http_request_serializer_, handler);
832
                break;
833
        case SocketMode::Tls:
×
834
                http::async_write_some(
835
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
836
                break;
837
        case SocketMode::Plain:
2,124✔
838
                http::async_write_some(
839
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
840
                break;
841
        }
842
}
2,124✔
843

844
void Client::ReadHeader() {
228✔
845
        auto &cancelled = cancelled_;
846
        auto &response_data = response_data_;
228✔
847

848
        auto handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
225✔
849
                if (!*cancelled) {
225✔
850
                        ReadHeaderHandler(ec, num_read);
225✔
851
                }
852
        };
456✔
853

854
        switch (socket_mode_) {
228✔
855
        case SocketMode::TlsTls:
1✔
856
                http::async_read_some(
1✔
857
                        *stream_,
858
                        *response_data_.response_buffer_,
859
                        *response_data_.http_response_parser_,
860
                        handler);
861
                break;
862
        case SocketMode::Tls:
12✔
863
                http::async_read_some(
12✔
864
                        stream_->next_layer(),
865
                        *response_data_.response_buffer_,
866
                        *response_data_.http_response_parser_,
867
                        handler);
868
                break;
869
        case SocketMode::Plain:
215✔
870
                http::async_read_some(
215✔
871
                        stream_->next_layer().next_layer(),
872
                        *response_data_.response_buffer_,
873
                        *response_data_.http_response_parser_,
874
                        handler);
875
                break;
876
        }
877
}
228✔
878

879
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
225✔
880
        if (num_read > 0) {
225✔
881
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
440✔
882
        }
883

884
        if (ec) {
225✔
885
                CallErrorHandler(ec, request_, header_handler_);
5✔
886
                return;
62✔
887
        }
888

889
        if (!response_data_.http_response_parser_->is_header_done()) {
220✔
890
                ReadHeader();
×
891
                return;
×
892
        }
893

894
        auto content_length = response_data_.http_response_parser_->content_length();
220✔
895
        if (content_length) {
220✔
896
                response_body_length_ = content_length.value();
181✔
897
        } else {
898
                response_body_length_ = 0;
39✔
899
        }
900

901
        if (secondary_req_) {
220✔
902
                HandleSecondaryRequest();
7✔
903
                return;
7✔
904
        }
905

906
        response_.reset(new IncomingResponse(*this, cancelled_));
426✔
907
        response_->status_code_ = response_data_.http_response_parser_->get().result_int();
213✔
908
        response_->status_message_ = string {response_data_.http_response_parser_->get().reason()};
213✔
909

910
        logger_.Debug(
426✔
911
                "Received response: " + to_string(response_->status_code_) + " "
426✔
912
                + response_->status_message_);
639✔
913

914
        string debug_str;
915
        for (auto header = response_data_.http_response_parser_->get().cbegin();
249✔
916
                 header != response_data_.http_response_parser_->get().cend();
462✔
917
                 header++) {
918
                response_->headers_[string {header->name_string()}] = string {header->value()};
747✔
919
                if (logger_.Level() >= log::LogLevel::Debug) {
249✔
920
                        debug_str += string {header->name_string()};
235✔
921
                        debug_str += ": ";
235✔
922
                        debug_str += string {header->value()};
235✔
923
                        debug_str += "\n";
235✔
924
                }
925
        }
926

927
        logger_.Debug("Received headers:\n" + debug_str);
426✔
928
        debug_str.clear();
929

930
        if (response_data_.http_response_parser_->chunked()) {
213✔
931
                auto cancelled = cancelled_;
932
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
933
                CallHandler(header_handler_);
2✔
934
                if (!*cancelled) {
1✔
935
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
936
                        CallErrorHandler(err, request_, body_handler_);
2✔
937
                }
938
                return;
939
        }
940

941
        response_body_read_ = 0;
212✔
942

943
        if (response_body_read_ >= response_body_length_) {
212✔
944
                auto cancelled = cancelled_;
945
                status_ = TransactionStatus::HeaderHandlerCalled;
46✔
946
                CallHandler(header_handler_);
92✔
947
                if (!*cancelled) {
46✔
948
                        status_ = TransactionStatus::Done;
40✔
949
                        CallHandler(body_handler_);
80✔
950

951
                        // After body handler has run, set the request to cancelled. The body
952
                        // handler may have made a new request, so this is not necessarily the same
953
                        // request as is currently active (note use of shared_ptr copy, not
954
                        // `cancelled_`).
955
                        *cancelled = true;
40✔
956
                }
957
                return;
958
        }
959

960
        auto cancelled = cancelled_;
961
        status_ = TransactionStatus::HeaderHandlerCalled;
166✔
962
        CallHandler(header_handler_);
332✔
963
        if (*cancelled) {
166✔
964
                return;
965
        }
966

967
        // We know that a body reader is required here, because of the `response_body_read_ >=
968
        // response_body_length_` check above.
969
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
163✔
970
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
36✔
971
        }
972
}
973

974
void Client::HandleSecondaryRequest() {
7✔
975
        logger_.Debug(
14✔
976
                "Received proxy response: "
977
                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
14✔
978
                + string {response_data_.http_response_parser_->get().reason()});
28✔
979

980
        request_ = std::move(secondary_req_);
981

982
        if (response_data_.http_response_parser_->get().result_int() != StatusOK) {
7✔
983
                auto err = MakeError(
984
                        ProxyError,
985
                        "Proxy returned unexpected response: "
986
                                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
4✔
987
                                + string {response_data_.http_response_parser_->get().reason()});
6✔
988
                CallErrorHandler(err, request_, header_handler_);
4✔
989
                return;
990
        }
991

992
        if (response_body_length_ != 0 || response_data_.http_response_parser_->chunked()) {
5✔
993
                auto err = MakeError(ProxyError, "Body not allowed in proxy response");
×
994
                CallErrorHandler(err, request_, header_handler_);
×
995
                return;
996
        }
997

998
        // We are connected. Now repeat the request cycle with the original request. Pretend
999
        // we were just connected.
1000

1001
        assert(request_->GetProtocol() == "https");
1002

1003
        // Make sure that no data is "lost" inside the buffering mechanism, since when switching to
1004
        // a different layer, this will get out of sync.
1005
        assert(response_data_.response_buffer_->size() == 0);
1006

1007
        switch (socket_mode_) {
5✔
1008
        case SocketMode::TlsTls:
×
1009
                // Should never get here, because this is the only place where TlsTls mode
1010
                // is supposed to be turned on.
1011
                assert(false);
1012
                CallErrorHandler(
×
1013
                        error::MakeError(
×
1014
                                error::ProgrammingError,
1015
                                "Any other mode than Tls is not valid when handling secondary request"),
×
1016
                        request_,
×
1017
                        header_handler_);
×
1018
                break;
×
1019
        case SocketMode::Tls:
2✔
1020
                // Upgrade to TLS inside TLS.
1021
                socket_mode_ = SocketMode::TlsTls;
2✔
1022
                HandshakeHandler(*stream_, error_code {}, stream_->lowest_layer().remote_endpoint());
2✔
1023
                break;
2✔
1024
        case SocketMode::Plain:
3✔
1025
                // Upgrade to TLS.
1026
                socket_mode_ = SocketMode::Tls;
3✔
1027
                HandshakeHandler(
3✔
1028
                        stream_->next_layer(), error_code {}, stream_->lowest_layer().remote_endpoint());
3✔
1029
                break;
3✔
1030
        }
1031
}
1032

1033
void Client::AsyncReadNextBodyPart(
3,802✔
1034
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
1035
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
1036

1037
        if (status_ == TransactionStatus::ReaderCreated) {
3,802✔
1038
                status_ = TransactionStatus::BodyReadingInProgress;
144✔
1039
        }
1040

1041
        if (AtLeast(status_, TransactionStatus::BodyReadingFinished)) {
3,802✔
1042
                auto cancelled = cancelled_;
1043
                handler(0);
76✔
1044
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
38✔
1045
                        status_ = TransactionStatus::Done;
38✔
1046
                        CallHandler(body_handler_);
76✔
1047

1048
                        // After body handler has run, set the request to cancelled. The body
1049
                        // handler may have made a new request, so this is not necessarily the same
1050
                        // request as is currently active (note use of shared_ptr copy, not
1051
                        // `cancelled_`).
1052
                        *cancelled = true;
38✔
1053
                }
1054
                return;
1055
        }
1056

1057
        reader_buf_start_ = start;
3,764✔
1058
        reader_buf_end_ = end;
3,764✔
1059
        reader_handler_ = handler;
3,764✔
1060
        size_t read_size = end - start;
3,764✔
1061
        size_t smallest = min(body_buffer_.size(), read_size);
5,877✔
1062

1063
        response_data_.http_response_parser_->get().body().data = body_buffer_.data();
3,764✔
1064
        response_data_.http_response_parser_->get().body().size = smallest;
3,764✔
1065

1066
        auto &cancelled = cancelled_;
1067
        auto &response_data = response_data_;
3,764✔
1068

1069
        auto async_handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
3,763✔
1070
                if (!*cancelled) {
3,763✔
1071
                        ReadBodyHandler(ec, num_read);
3,762✔
1072
                }
1073
        };
7,528✔
1074

1075
        switch (socket_mode_) {
3,764✔
1076
        case SocketMode::TlsTls:
1✔
1077
                http::async_read_some(
1✔
1078
                        *stream_,
1079
                        *response_data_.response_buffer_,
1080
                        *response_data_.http_response_parser_,
1081
                        async_handler);
1082
                break;
1083
        case SocketMode::Tls:
3✔
1084
                http::async_read_some(
3✔
1085
                        stream_->next_layer(),
1086
                        *response_data_.response_buffer_,
1087
                        *response_data_.http_response_parser_,
1088
                        async_handler);
1089
                break;
1090
        case SocketMode::Plain:
3,760✔
1091
                http::async_read_some(
3,760✔
1092
                        stream_->next_layer().next_layer(),
1093
                        *response_data_.response_buffer_,
1094
                        *response_data_.http_response_parser_,
1095
                        async_handler);
1096
                break;
1097
        }
1098
}
1099

1100
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
3,762✔
1101
        if (num_read > 0) {
3,762✔
1102
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
7,426✔
1103
                response_body_read_ += num_read;
3,713✔
1104
        }
1105

1106
        if (ec == http::make_error_code(http::error::need_buffer)) {
3,762✔
1107
                // This can be ignored. We always reset the buffer between reads anyway.
1108
                ec = error_code();
1,958✔
1109
        }
1110

1111
        assert(reader_handler_);
1112

1113
        if (response_body_read_ >= response_body_length_) {
3,762✔
1114
                status_ = TransactionStatus::BodyReadingFinished;
91✔
1115
        }
1116

1117
        auto cancelled = cancelled_;
1118

1119
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
3,762✔
1120
        size_t smallest = min(num_read, buf_size);
3,762✔
1121
        copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
3,762✔
1122
        if (ec) {
3,762✔
1123
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
98✔
1124
                reader_handler_(expected::unexpected(err));
147✔
1125
        } else {
1126
                reader_handler_(smallest);
7,426✔
1127
        }
1128

1129
        if (!*cancelled && ec) {
3,762✔
1130
                CallErrorHandler(ec, request_, body_handler_);
90✔
1131
                return;
1132
        }
1133
}
1134

1135
void Client::Cancel() {
189✔
1136
        auto cancelled = cancelled_;
1137

1138
        if (!*cancelled) {
189✔
1139
                auto err =
1140
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
134✔
1141
                switch (status_) {
67✔
1142
                case TransactionStatus::None:
2✔
1143
                        CallErrorHandler(err, request_, header_handler_);
2✔
1144
                        break;
2✔
1145
                case TransactionStatus::HeaderHandlerCalled:
63✔
1146
                case TransactionStatus::ReaderCreated:
1147
                case TransactionStatus::BodyReadingInProgress:
1148
                case TransactionStatus::BodyReadingFinished:
1149
                        CallErrorHandler(err, request_, body_handler_);
63✔
1150
                        break;
63✔
1151
                case TransactionStatus::Replying:
1152
                case TransactionStatus::SwitchingProtocol:
1153
                        // Not used by client.
1154
                        assert(false);
1155
                        break;
1156
                case TransactionStatus::BodyHandlerCalled:
1157
                case TransactionStatus::Done:
1158
                        break;
1159
                }
1160
        }
1161

1162
        if (!*cancelled) {
189✔
1163
                DoCancel();
2✔
1164
        }
1165
}
189✔
1166

1167
void Client::DoCancel() {
347✔
1168
        resolver_.cancel();
347✔
1169
        if (stream_) {
347✔
1170
                stream_->lowest_layer().cancel();
72✔
1171
                stream_->lowest_layer().close();
72✔
1172
                stream_.reset();
72✔
1173
        }
1174

1175
        request_.reset();
347✔
1176
        response_.reset();
347✔
1177

1178
        // Reset logger to no connection.
1179
        logger_ = log::Logger(logger_name_);
347✔
1180

1181
        // Set cancel state and then make a new one. Those who are interested should have their own
1182
        // pointer to the old one.
1183
        *cancelled_ = true;
347✔
1184
        cancelled_ = make_shared<bool>(true);
347✔
1185
}
347✔
1186

1187
Stream::Stream(Server &server) :
431✔
1188
        server_ {server},
1189
        logger_ {"http"},
1190
        cancelled_(make_shared<bool>(true)),
431✔
1191
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
431✔
1192
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,293✔
1193
        request_data_.request_buffer_ = make_shared<beast::flat_buffer>();
862✔
1194

1195
        // This is equivalent to:
1196
        //   request_data_.request_buffer_.reserve(body_buffer_.size());
1197
        // but compatible with Boost 1.67.
1198
        request_data_.request_buffer_->prepare(
1199
                body_buffer_.size() - request_data_.request_buffer_->size());
431✔
1200

1201
        request_data_.http_request_parser_ = make_shared<http::request_parser<http::buffer_body>>();
862✔
1202

1203
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
1204
        // they do, they should be handled higher up in the application logic.
1205
        //
1206
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
1207
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
1208
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
1209
        // possible value instead.
1210
        request_data_.http_request_parser_->body_limit(numeric_limits<uint64_t>::max());
1211
}
431✔
1212

1213
Stream::~Stream() {
1,293✔
1214
        DoCancel();
431✔
1215
}
431✔
1216

1217
void Stream::Cancel() {
7✔
1218
        auto cancelled = cancelled_;
1219

1220
        if (!*cancelled) {
7✔
1221
                auto err =
1222
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
1223
                switch (status_) {
7✔
1224
                case TransactionStatus::None:
×
1225
                        CallErrorHandler(err, request_, server_.header_handler_);
×
1226
                        break;
×
1227
                case TransactionStatus::HeaderHandlerCalled:
5✔
1228
                case TransactionStatus::ReaderCreated:
1229
                case TransactionStatus::BodyReadingInProgress:
1230
                case TransactionStatus::BodyReadingFinished:
1231
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
1232
                        break;
5✔
1233
                case TransactionStatus::BodyHandlerCalled:
×
1234
                        // In between body handler and reply finished. No one to handle the status
1235
                        // here.
1236
                        server_.RemoveStream(shared_from_this());
×
1237
                        break;
×
1238
                case TransactionStatus::Replying:
1✔
1239
                        CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1240
                        break;
1✔
1241
                case TransactionStatus::SwitchingProtocol:
1✔
1242
                        CallErrorHandler(err, request_, switch_protocol_handler_);
3✔
1243
                        break;
1✔
1244
                case TransactionStatus::Done:
1245
                        break;
1246
                }
1247
        }
1248

1249
        if (!*cancelled) {
7✔
1250
                DoCancel();
×
1251
        }
1252
}
7✔
1253

1254
void Stream::DoCancel() {
611✔
1255
        if (socket_.is_open()) {
611✔
1256
                socket_.cancel();
211✔
1257
                socket_.close();
211✔
1258
        }
1259

1260
        // Set cancel state and then make a new one. Those who are interested should have their own
1261
        // pointer to the old one.
1262
        *cancelled_ = true;
611✔
1263
        cancelled_ = make_shared<bool>(true);
611✔
1264
}
611✔
1265

1266
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
1267
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1268
}
×
1269

1270
void Stream::CallErrorHandler(
×
1271
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
1272
        *cancelled_ = true;
×
1273
        cancelled_ = make_shared<bool>(true);
×
1274
        status_ = TransactionStatus::Done;
×
1275
        handler(expected::unexpected(err.WithContext(
×
1276
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
1277

1278
        server_.RemoveStream(shared_from_this());
×
1279
}
×
1280

1281
void Stream::CallErrorHandler(
2✔
1282
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1283
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
6✔
1284
}
2✔
1285

1286
void Stream::CallErrorHandler(
9✔
1287
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1288
        *cancelled_ = true;
9✔
1289
        cancelled_ = make_shared<bool>(true);
9✔
1290
        status_ = TransactionStatus::Done;
9✔
1291
        handler(
9✔
1292
                req,
1293
                err.WithContext(
9✔
1294
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
27✔
1295

1296
        server_.RemoveStream(shared_from_this());
9✔
1297
}
9✔
1298

1299
void Stream::CallErrorHandler(
4✔
1300
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
1301
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
1302
}
4✔
1303

1304
void Stream::CallErrorHandler(
7✔
1305
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
1306
        *cancelled_ = true;
7✔
1307
        cancelled_ = make_shared<bool>(true);
7✔
1308
        status_ = TransactionStatus::Done;
7✔
1309
        handler(err.WithContext(
14✔
1310
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
14✔
1311

1312
        server_.RemoveStream(shared_from_this());
7✔
1313
}
7✔
1314

1315
void Stream::CallErrorHandler(
×
1316
        const error_code &ec, const RequestPtr &req, SwitchProtocolHandler handler) {
1317
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1318
}
×
1319

1320
void Stream::CallErrorHandler(
1✔
1321
        const error::Error &err, const RequestPtr &req, SwitchProtocolHandler handler) {
1322
        *cancelled_ = true;
1✔
1323
        cancelled_ = make_shared<bool>(true);
1✔
1324
        status_ = TransactionStatus::Done;
1✔
1325
        handler(expected::unexpected(err.WithContext(
2✔
1326
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
4✔
1327

1328
        server_.RemoveStream(shared_from_this());
1✔
1329
}
1✔
1330

1331
void Stream::AcceptHandler(const error_code &ec) {
219✔
1332
        if (ec) {
219✔
1333
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
1334
                return;
×
1335
        }
1336

1337
        auto ip = socket_.remote_endpoint().address().to_string();
438✔
1338

1339
        // Use IP as context for logging.
1340
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
219✔
1341

1342
        logger_.Debug("Accepted connection.");
438✔
1343

1344
        request_.reset(new IncomingRequest(*this, cancelled_));
438✔
1345

1346
        request_->address_.host = ip;
219✔
1347

1348
        *cancelled_ = false;
219✔
1349

1350
        ReadHeader();
219✔
1351
}
1352

1353
void Stream::ReadHeader() {
219✔
1354
        auto &cancelled = cancelled_;
1355
        auto &request_data = request_data_;
219✔
1356

1357
        http::async_read_some(
438✔
1358
                socket_,
219✔
1359
                *request_data_.request_buffer_,
1360
                *request_data_.http_request_parser_,
1361
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
219✔
1362
                        if (!*cancelled) {
219✔
1363
                                ReadHeaderHandler(ec, num_read);
219✔
1364
                        }
1365
                });
219✔
1366
}
219✔
1367

1368
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
219✔
1369
        if (num_read > 0) {
219✔
1370
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
438✔
1371
        }
1372

1373
        if (ec) {
219✔
1374
                CallErrorHandler(ec, request_, server_.header_handler_);
×
1375
                return;
180✔
1376
        }
1377

1378
        if (!request_data_.http_request_parser_->is_header_done()) {
219✔
1379
                ReadHeader();
×
1380
                return;
×
1381
        }
1382

1383
        auto method_result = BeastVerbToMethod(
1384
                request_data_.http_request_parser_->get().base().method(),
1385
                string {request_data_.http_request_parser_->get().base().method_string()});
438✔
1386
        if (!method_result) {
219✔
1387
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
1388
                return;
×
1389
        }
1390
        request_->method_ = method_result.value();
219✔
1391
        request_->address_.path = string(request_data_.http_request_parser_->get().base().target());
219✔
1392

1393
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
219✔
1394

1395
        string debug_str;
1396
        for (auto header = request_data_.http_request_parser_->get().cbegin();
381✔
1397
                 header != request_data_.http_request_parser_->get().cend();
600✔
1398
                 header++) {
1399
                request_->headers_[string {header->name_string()}] = string {header->value()};
1,143✔
1400
                if (logger_.Level() >= log::LogLevel::Debug) {
381✔
1401
                        debug_str += string {header->name_string()};
320✔
1402
                        debug_str += ": ";
320✔
1403
                        debug_str += string {header->value()};
320✔
1404
                        debug_str += "\n";
320✔
1405
                }
1406
        }
1407

1408
        logger_.Debug("Received headers:\n" + debug_str);
438✔
1409
        debug_str.clear();
1410

1411
        if (request_data_.http_request_parser_->chunked()) {
219✔
1412
                auto cancelled = cancelled_;
1413
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
1414
                server_.header_handler_(request_);
2✔
1415
                if (!*cancelled) {
1✔
1416
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
1417
                        CallErrorHandler(err, request_, server_.body_handler_);
2✔
1418
                }
1419
                return;
1420
        }
1421

1422
        auto content_length = request_data_.http_request_parser_->content_length();
218✔
1423
        if (content_length) {
218✔
1424
                request_body_length_ = content_length.value();
40✔
1425
        } else {
1426
                request_body_length_ = 0;
178✔
1427
        }
1428
        request_body_read_ = 0;
218✔
1429

1430
        if (request_body_read_ >= request_body_length_) {
218✔
1431
                auto cancelled = cancelled_;
1432
                status_ = TransactionStatus::HeaderHandlerCalled;
178✔
1433
                server_.header_handler_(request_);
356✔
1434
                if (!*cancelled) {
178✔
1435
                        status_ = TransactionStatus::BodyHandlerCalled;
178✔
1436
                        CallBodyHandler();
178✔
1437
                }
1438
                return;
1439
        }
1440

1441
        auto cancelled = cancelled_;
1442
        status_ = TransactionStatus::HeaderHandlerCalled;
40✔
1443
        server_.header_handler_(request_);
80✔
1444
        if (*cancelled) {
40✔
1445
                return;
1446
        }
1447

1448
        // We know that a body reader is required here, because of the `request_body_read_ >=
1449
        // request_body_length_` check above.
1450
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
39✔
1451
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
2✔
1452
        }
1453
}
1454

1455
void Stream::AsyncReadNextBodyPart(
2,034✔
1456
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
1457
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
1458

1459
        if (status_ == TransactionStatus::ReaderCreated) {
2,034✔
1460
                status_ = TransactionStatus::BodyReadingInProgress;
38✔
1461
        }
1462

1463
        if (status_ != TransactionStatus::BodyReadingInProgress) {
2,034✔
1464
                auto cancelled = cancelled_;
1465
                handler(0);
64✔
1466
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
32✔
1467
                        status_ = TransactionStatus::BodyHandlerCalled;
32✔
1468
                        CallBodyHandler();
32✔
1469
                }
1470
                return;
1471
        }
1472

1473
        reader_buf_start_ = start;
2,002✔
1474
        reader_buf_end_ = end;
2,002✔
1475
        reader_handler_ = handler;
2,002✔
1476
        size_t read_size = end - start;
2,002✔
1477
        size_t smallest = min(body_buffer_.size(), read_size);
3,058✔
1478

1479
        request_data_.http_request_parser_->get().body().data = body_buffer_.data();
2,002✔
1480
        request_data_.http_request_parser_->get().body().size = smallest;
2,002✔
1481

1482
        auto &cancelled = cancelled_;
1483
        auto &request_data = request_data_;
2,002✔
1484

1485
        http::async_read_some(
4,004✔
1486
                socket_,
2,002✔
1487
                *request_data_.request_buffer_,
1488
                *request_data_.http_request_parser_,
1489
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
2,002✔
1490
                        if (!*cancelled) {
2,002✔
1491
                                ReadBodyHandler(ec, num_read);
2,002✔
1492
                        }
1493
                });
2,002✔
1494
}
1495

1496
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
2,002✔
1497
        if (num_read > 0) {
2,002✔
1498
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
3,996✔
1499
                request_body_read_ += num_read;
1,998✔
1500
        }
1501

1502
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,002✔
1503
                // This can be ignored. We always reset the buffer between reads anyway.
1504
                ec = error_code();
979✔
1505
        }
1506

1507
        assert(reader_handler_);
1508

1509
        if (request_body_read_ >= request_body_length_) {
2,002✔
1510
                status_ = TransactionStatus::BodyReadingFinished;
32✔
1511
        }
1512

1513
        auto cancelled = cancelled_;
1514

1515
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
2,002✔
1516
        size_t smallest = min(num_read, buf_size);
2,002✔
1517
        copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
2,002✔
1518
        if (ec) {
2,002✔
1519
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
8✔
1520
                reader_handler_(expected::unexpected(err));
12✔
1521
        } else {
1522
                reader_handler_(smallest);
3,996✔
1523
        }
1524

1525
        if (!*cancelled && ec) {
2,002✔
1526
                CallErrorHandler(ec, request_, server_.body_handler_);
4✔
1527
                return;
1528
        }
1529
}
1530

1531
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
196✔
1532
        SetupResponse();
196✔
1533

1534
        reply_finished_handler_ = reply_finished_handler;
196✔
1535

1536
        auto &cancelled = cancelled_;
1537
        auto &response_data = response_data_;
196✔
1538

1539
        http::async_write_header(
392✔
1540
                socket_,
196✔
1541
                *response_data_.http_response_serializer_,
1542
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
196✔
1543
                        if (!*cancelled) {
196✔
1544
                                WriteHeaderHandler(ec, num_written);
195✔
1545
                        }
1546
                });
196✔
1547
}
196✔
1548

1549
void Stream::SetupResponse() {
205✔
1550
        auto response = maybe_response_.lock();
205✔
1551
        // Only called from existing responses, so this should always be true.
1552
        assert(response);
1553

1554
        assert(status_ == TransactionStatus::BodyHandlerCalled);
1555
        status_ = TransactionStatus::Replying;
205✔
1556

1557
        // From here on we take shared ownership.
1558
        response_ = response;
1559

1560
        response_data_.http_response_ = make_shared<http::response<http::buffer_body>>();
410✔
1561

1562
        for (const auto &header : response->headers_) {
428✔
1563
                response_data_.http_response_->base().set(header.first, header.second);
223✔
1564
        }
1565

1566
        response_data_.http_response_->result(response->GetStatusCode());
205✔
1567
        response_data_.http_response_->reason(response->GetStatusMessage());
410✔
1568

1569
        response_data_.http_response_serializer_ =
1570
                make_shared<http::response_serializer<http::buffer_body>>(*response_data_.http_response_);
410✔
1571
}
205✔
1572

1573
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
195✔
1574
        if (num_written > 0) {
195✔
1575
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
390✔
1576
        }
1577

1578
        if (ec) {
195✔
1579
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1580
                return;
36✔
1581
        }
1582

1583
        auto header = response_->GetHeader("Content-Length");
390✔
1584
        if (!header || header.value() == "0") {
195✔
1585
                FinishReply();
35✔
1586
                return;
1587
        }
1588

1589
        auto length = common::StringToLongLong(header.value());
160✔
1590
        if (!length || length.value() < 0) {
160✔
1591
                auto err = error::Error(
1592
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
1593
                CallErrorHandler(err, request_, reply_finished_handler_);
×
1594
                return;
1595
        }
1596

1597
        if (!response_->body_reader_ && !response_->async_body_reader_) {
160✔
1598
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
1599
                CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1600
                return;
1601
        }
1602

1603
        PrepareAndWriteNewBodyBuffer();
159✔
1604
}
1605

1606
void Stream::PrepareAndWriteNewBodyBuffer() {
1,901✔
1607
        // response_->body_reader_ XOR response_->async_body_reader_
1608
        assert(
1609
                (response_->body_reader_ || response_->async_body_reader_)
1610
                && !(response_->body_reader_ && response_->async_body_reader_));
1611

1612
        auto read_handler = [this](io::ExpectedSize read) {
1,902✔
1613
                if (!read) {
1,901✔
1614
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
3✔
1615
                        return;
1✔
1616
                }
1617
                WriteNewBodyBuffer(read.value());
1,900✔
1618
        };
1,901✔
1619

1620
        if (response_->body_reader_) {
1,901✔
1621
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
3,254✔
1622
        } else {
1623
                auto err = response_->async_body_reader_->AsyncRead(
1624
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
274✔
1625
                if (err != error::NoError) {
274✔
1626
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1627
                }
1628
        }
1629
}
1,901✔
1630

1631
void Stream::WriteNewBodyBuffer(size_t size) {
1,900✔
1632
        response_data_.http_response_->body().data = body_buffer_.data();
1,900✔
1633
        response_data_.http_response_->body().size = size;
1,900✔
1634

1635
        if (size > 0) {
1,900✔
1636
                response_data_.http_response_->body().more = true;
1,777✔
1637
        } else {
1638
                response_data_.http_response_->body().more = false;
123✔
1639
        }
1640

1641
        WriteBody();
1,900✔
1642
}
1,900✔
1643

1644
void Stream::WriteBody() {
3,656✔
1645
        auto &cancelled = cancelled_;
1646
        auto &response_data = response_data_;
3,656✔
1647

1648
        http::async_write_some(
7,312✔
1649
                socket_,
3,656✔
1650
                *response_data_.http_response_serializer_,
1651
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
3,616✔
1652
                        if (!*cancelled) {
3,616✔
1653
                                WriteBodyHandler(ec, num_written);
3,616✔
1654
                        }
1655
                });
3,616✔
1656
}
3,656✔
1657

1658
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
3,616✔
1659
        if (num_written > 0) {
3,616✔
1660
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
3,512✔
1661
        }
1662

1663
        if (ec == http::make_error_code(http::error::need_buffer)) {
3,616✔
1664
                // Write next body block.
1665
                PrepareAndWriteNewBodyBuffer();
1,742✔
1666
        } else if (ec) {
1,874✔
1667
                CallErrorHandler(ec, request_, reply_finished_handler_);
12✔
1668
        } else if (num_written > 0) {
1,870✔
1669
                // We are still writing the body.
1670
                WriteBody();
1,756✔
1671
        } else {
1672
                // We are finished.
1673
                FinishReply();
114✔
1674
        }
1675
}
3,616✔
1676

1677
void Stream::FinishReply() {
149✔
1678
        // We are done.
1679
        *cancelled_ = true;
149✔
1680
        cancelled_ = make_shared<bool>(true);
149✔
1681
        status_ = TransactionStatus::Done;
149✔
1682
        // Release ownership of Body reader.
1683
        response_->body_reader_.reset();
149✔
1684
        response_->async_body_reader_.reset();
149✔
1685
        reply_finished_handler_(error::NoError);
149✔
1686
        server_.RemoveStream(shared_from_this());
149✔
1687
}
149✔
1688

1689
error::Error Stream::AsyncSwitchProtocol(SwitchProtocolHandler handler) {
9✔
1690
        SetupResponse();
9✔
1691

1692
        switch_protocol_handler_ = handler;
9✔
1693
        status_ = TransactionStatus::SwitchingProtocol;
9✔
1694

1695
        auto &cancelled = cancelled_;
1696
        auto &response_data = response_data_;
9✔
1697

1698
        http::async_write_header(
18✔
1699
                socket_,
9✔
1700
                *response_data_.http_response_serializer_,
1701
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
9✔
1702
                        if (!*cancelled) {
9✔
1703
                                SwitchingProtocolHandler(ec, num_written);
8✔
1704
                        }
1705
                });
9✔
1706

1707
        return error::NoError;
9✔
1708
}
1709

1710
void Stream::SwitchingProtocolHandler(error_code ec, size_t num_written) {
8✔
1711
        if (num_written > 0) {
8✔
1712
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
16✔
1713
        }
1714

1715
        if (ec) {
8✔
1716
                CallErrorHandler(ec, request_, switch_protocol_handler_);
×
1717
                return;
×
1718
        }
1719

1720
        auto socket = make_shared<RawSocket<tcp::socket>>(
1721
                make_shared<tcp::socket>(std::move(socket_)), request_data_.request_buffer_);
8✔
1722

1723
        auto switch_protocol_handler = switch_protocol_handler_;
8✔
1724

1725
        // Rest of the connection is done directly on the socket, we are done here.
1726
        status_ = TransactionStatus::Done;
8✔
1727
        *cancelled_ = true;
8✔
1728
        cancelled_ = make_shared<bool>(true);
8✔
1729
        server_.RemoveStream(shared_from_this());
16✔
1730

1731
        switch_protocol_handler(socket);
16✔
1732
}
1733

1734
void Stream::CallBodyHandler() {
210✔
1735
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1736
        // it immediately destroys, which would destroy this stream as well. At the end of this
1737
        // function, it's ok to destroy it.
1738
        auto stream_ref = shared_from_this();
1739

1740
        server_.body_handler_(request_, error::NoError);
630✔
1741

1742
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1743
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1744
        // request has not been handled correctly.
1745
        auto response = maybe_response_.lock();
210✔
1746
        if (!response) {
210✔
1747
                logger_.Error("Handler produced no response. Closing stream prematurely.");
4✔
1748
                *cancelled_ = true;
2✔
1749
                cancelled_ = make_shared<bool>(true);
2✔
1750
                server_.RemoveStream(shared_from_this());
6✔
1751
        }
1752
}
210✔
1753

1754
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
233✔
1755
        event_loop_ {event_loop},
1756
        acceptor_(GetAsioIoContext(event_loop_)) {
420✔
1757
}
233✔
1758

1759
Server::~Server() {
466✔
1760
        Cancel();
233✔
1761
}
233✔
1762

1763
error::Error Server::AsyncServeUrl(
198✔
1764
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
1765
        return AsyncServeUrl(
1766
                url, header_handler, [body_handler](IncomingRequestPtr req, error::Error err) {
808✔
1767
                        if (err != error::NoError) {
206✔
1768
                                body_handler(expected::unexpected(err));
14✔
1769
                        } else {
1770
                                body_handler(req);
398✔
1771
                        }
1772
                });
602✔
1773
}
1774

1775
error::Error Server::AsyncServeUrl(
213✔
1776
        const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
1777
        auto err = BreakDownUrl(url, address_);
213✔
1778
        if (error::NoError != err) {
213✔
1779
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
×
1780
        }
1781

1782
        if (address_.protocol != "http") {
213✔
1783
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
×
1784
        }
1785

1786
        if (address_.path.size() > 0 && address_.path != "/") {
213✔
1787
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
2✔
1788
        }
1789

1790
        boost::system::error_code ec;
212✔
1791
        auto address = asio::ip::make_address(address_.host, ec);
212✔
1792
        if (ec) {
212✔
1793
                return error::Error(
1794
                        ec.default_error_condition(),
×
1795
                        "Could not construct endpoint from address " + address_.host);
×
1796
        }
1797

1798
        asio::ip::tcp::endpoint endpoint(address, address_.port);
212✔
1799

1800
        ec.clear();
1801
        acceptor_.open(endpoint.protocol(), ec);
212✔
1802
        if (ec) {
212✔
1803
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
×
1804
        }
1805

1806
        // Allow address reuse, otherwise we can't re-bind later.
1807
        ec.clear();
1808
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
212✔
1809
        if (ec) {
212✔
1810
                return error::Error(ec.default_error_condition(), "Could not set socket options");
×
1811
        }
1812

1813
        ec.clear();
1814
        acceptor_.bind(endpoint, ec);
212✔
1815
        if (ec) {
212✔
1816
                return error::Error(ec.default_error_condition(), "Could not bind socket");
×
1817
        }
1818

1819
        ec.clear();
1820
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
212✔
1821
        if (ec) {
212✔
1822
                return error::Error(ec.default_error_condition(), "Could not start listening");
×
1823
        }
1824

1825
        header_handler_ = header_handler;
212✔
1826
        body_handler_ = body_handler;
212✔
1827

1828
        PrepareNewStream();
212✔
1829

1830
        return error::NoError;
212✔
1831
}
1832

1833
void Server::Cancel() {
253✔
1834
        if (acceptor_.is_open()) {
253✔
1835
                acceptor_.cancel();
212✔
1836
                acceptor_.close();
212✔
1837
        }
1838
        streams_.clear();
1839
}
253✔
1840

1841
uint16_t Server::GetPort() const {
17✔
1842
        return acceptor_.local_endpoint().port();
17✔
1843
}
1844

1845
string Server::GetUrl() const {
16✔
1846
        return "http://127.0.0.1:" + to_string(GetPort());
32✔
1847
}
1848

1849
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
209✔
1850
        if (*req->cancelled_) {
209✔
1851
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
×
1852
        }
1853
        OutgoingResponsePtr response {new OutgoingResponse(req->stream_, req->cancelled_)};
418✔
1854
        req->stream_.maybe_response_ = response;
209✔
1855
        return response;
209✔
1856
}
1857

1858
error::Error Server::AsyncReply(
196✔
1859
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
1860
        if (*resp->cancelled_) {
196✔
1861
                return MakeError(StreamCancelledError, "Cannot send response");
×
1862
        }
1863

1864
        resp->stream_.AsyncReply(reply_finished_handler);
196✔
1865
        return error::NoError;
196✔
1866
}
1867

1868
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
55✔
1869
        if (*req->cancelled_) {
55✔
1870
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
×
1871
        }
1872

1873
        auto &stream = req->stream_;
55✔
1874
        if (stream.status_ != TransactionStatus::HeaderHandlerCalled) {
55✔
1875
                return expected::unexpected(error::Error(
1✔
1876
                        make_error_condition(errc::operation_in_progress),
2✔
1877
                        "MakeBodyAsyncReader called while reading is in progress"));
3✔
1878
        }
1879

1880
        if (stream.request_body_length_ == 0) {
54✔
1881
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
48✔
1882
        }
1883

1884
        stream.status_ = TransactionStatus::ReaderCreated;
38✔
1885
        return make_shared<BodyAsyncReader<Stream>>(stream, req->cancelled_);
76✔
1886
}
1887

1888
error::Error Server::AsyncSwitchProtocol(OutgoingResponsePtr resp, SwitchProtocolHandler handler) {
9✔
1889
        return resp->stream_.AsyncSwitchProtocol(handler);
18✔
1890
}
1891

1892
void Server::PrepareNewStream() {
431✔
1893
        StreamPtr new_stream {new Stream(*this)};
431✔
1894
        streams_.insert(new_stream);
1895
        AsyncAccept(new_stream);
862✔
1896
}
431✔
1897

1898
void Server::AsyncAccept(StreamPtr stream) {
431✔
1899
        acceptor_.async_accept(stream->socket_, [this, stream](const error_code &ec) {
653✔
1900
                if (ec) {
222✔
1901
                        log::Error("Could not accept connection: " + ec.message());
6✔
1902
                        return;
3✔
1903
                }
1904

1905
                stream->AcceptHandler(ec);
219✔
1906

1907
                this->PrepareNewStream();
219✔
1908
        });
1909
}
431✔
1910

1911
void Server::RemoveStream(StreamPtr stream) {
180✔
1912
        streams_.erase(stream);
180✔
1913

1914
        stream->DoCancel();
180✔
1915
}
180✔
1916

1917
} // namespace http
1918
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc