• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1046247406

23 Oct 2023 09:07AM UTC coverage: 80.105% (-0.1%) from 80.226%
1046247406

push

gitlab-ci

oleorhagen
feat(crypto): Add HSM support

This adds support for HSM key storage. This is a wrapper around the existing
OpenSSL external crypto support, and supports both the deprecated `ENGINE` API
in OpenSSL 1.x, and the new `Provider` API in OpenSSL 3.x.

Ticket: MEN-6668
Changelog: The client's HSM crypto-module support is changed so that the
`PrivateKey` used for `authentication` is always taken from the configurations:
`security.AuthPrivateKey`, and the `HttpsClient.private_key` is only used as the
key for the associated certificate `HttpsClient.client_certificate`. The two can
still use the same key, but this now means that the same key `url` must be added
in both places.

Signed-off-by: Ole Petter <ole.orhagen@northern.tech>

66 of 66 new or added lines in 7 files covered. (100.0%)

6845 of 8545 relevant lines covered (80.11%)

9412.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.35
/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio/ip/tcp.hpp>
20
#include <boost/asio/ssl/verify_mode.hpp>
21
#include <boost/asio.hpp>
22

23
#include <common/common.hpp>
24
#include <common/crypto.hpp>
25

26
namespace mender {
27
namespace http {
28

29
namespace common = mender::common;
30
namespace crypto = mender::common::crypto;
31

32
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
33
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
34
const unsigned int BeastHttpVersion = 11;
35

36
namespace asio = boost::asio;
37
namespace http = boost::beast::http;
38

39
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
40

41
// Maps a mender `Method` onto the corresponding Beast verb.
//
// `Method::Invalid`, or any value not listed in the switch, trips the assert and, when
// asserts are compiled out, yields `http::verb::get`.
static http::verb MethodToBeastVerb(Method method) {
        http::verb beast_verb = http::verb::get;
        bool recognized = false;

        // Intentionally no "default" label: leaving it out should make the compiler warn
        // about unhandled enumerators if new methods are ever added.
        switch (method) {
        case Method::GET:
                beast_verb = http::verb::get;
                recognized = true;
                break;
        case Method::HEAD:
                beast_verb = http::verb::head;
                recognized = true;
                break;
        case Method::POST:
                beast_verb = http::verb::post;
                recognized = true;
                break;
        case Method::PUT:
                beast_verb = http::verb::put;
                recognized = true;
                break;
        case Method::PATCH:
                beast_verb = http::verb::patch;
                recognized = true;
                break;
        case Method::CONNECT:
                beast_verb = http::verb::connect;
                recognized = true;
                break;
        case Method::Invalid:
                // Nothing to translate; handled below.
                break;
        }

        if (!recognized) {
                // Still assert for safety, even though the switch is meant to be exhaustive.
                assert(false);
        }
        return beast_verb;
}
64

65
// Maps a Beast verb onto the mender `Method` enum.
//
// Returns an `UnsupportedMethodError` carrying `verb_string` for every verb that has no
// `Method` counterpart.
static expected::expected<Method, error::Error> BeastVerbToMethod(
        http::verb verb, const string &verb_string) {
        Method method = Method::Invalid;

        switch (verb) {
        case http::verb::get:
                method = Method::GET;
                break;
        case http::verb::head:
                method = Method::HEAD;
                break;
        case http::verb::post:
                method = Method::POST;
                break;
        case http::verb::put:
                method = Method::PUT;
                break;
        case http::verb::patch:
                method = Method::PATCH;
                break;
        case http::verb::connect:
                method = Method::CONNECT;
                break;
        default:
                return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
        }

        return method;
}
84

85
template <typename StreamType>
86
class BodyAsyncReader : virtual public io::AsyncReader {
87
public:
88
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
141✔
89
                stream_ {stream},
90
                cancelled_ {cancelled} {
282✔
91
        }
141✔
92
        ~BodyAsyncReader() {
36✔
93
                Cancel();
36✔
94
        }
72✔
95

96
        error::Error AsyncRead(
2,030✔
97
                vector<uint8_t>::iterator start,
98
                vector<uint8_t>::iterator end,
99
                io::AsyncIoHandler handler) override {
100
                if (*cancelled_) {
2,030✔
101
                        return error::MakeError(
×
102
                                error::ProgrammingError,
103
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
104
                }
105
                stream_.AsyncReadNextBodyPart(start, end, handler);
2,030✔
106
                return error::NoError;
2,030✔
107
        }
108

109
        void Cancel() override {
38✔
110
                if (!*cancelled_) {
38✔
111
                        stream_.Cancel();
4✔
112
                }
113
        }
38✔
114

115
private:
116
        StreamType &stream_;
117
        shared_ptr<bool> cancelled_;
118

119
        friend class Client;
120
        friend class Server;
121
};
122

123
template <typename StreamType>
124
class RawSocket : virtual public io::AsyncReadWriter {
125
public:
126
        RawSocket(shared_ptr<StreamType> stream, shared_ptr<beast::flat_buffer> buffered) :
×
127
                destroying_ {make_shared<bool>(false)},
×
128
                stream_ {stream},
129
                buffered_ {buffered} {
×
130
                // If there are no buffered bytes, then we don't need it.
131
                if (buffered_ && buffered_->size() == 0) {
×
132
                        buffered_.reset();
×
133
                }
134
        }
×
135

136
        ~RawSocket() {
15✔
137
                *destroying_ = true;
15✔
138
                Cancel();
15✔
139
        }
30✔
140

141
        error::Error AsyncRead(
320✔
142
                vector<uint8_t>::iterator start,
143
                vector<uint8_t>::iterator end,
144
                io::AsyncIoHandler handler) override {
145
                // If we have prebuffered bytes, which can happen if the HTTP parser read the
146
                // header and parts of the body in one block, return those first.
147
                if (buffered_) {
320✔
148
                        return DrainPrebufferedData(start, end, handler);
8✔
149
                }
150

151
                read_buffer_ = asio::buffer(&*start, end - start);
316✔
152
                auto &destroying = destroying_;
153
                stream_->async_read_some(
632✔
154
                        read_buffer_,
316✔
155
                        [destroying, handler](const boost::system::error_code &ec, size_t num_read) {
313✔
156
                                if (*destroying) {
313✔
157
                                        return;
158
                                }
159

160
                                if (ec == asio::error::operation_aborted) {
313✔
161
                                        handler(expected::unexpected(error::Error(
12✔
162
                                                make_error_condition(errc::operation_canceled),
6✔
163
                                                "Could not read from socket")));
164
                                } else if (ec) {
310✔
165
                                        handler(expected::unexpected(
12✔
166
                                                error::Error(ec.default_error_condition(), "Could not read from socket")));
12✔
167
                                } else {
168
                                        handler(num_read);
608✔
169
                                }
170
                        });
171
                return error::NoError;
316✔
172
        }
173

174
        error::Error AsyncWrite(
309✔
175
                vector<uint8_t>::const_iterator start,
176
                vector<uint8_t>::const_iterator end,
177
                io::AsyncIoHandler handler) override {
178
                write_buffer_ = asio::buffer(&*start, end - start);
309✔
179
                auto &destroying = destroying_;
180
                stream_->async_write_some(
618✔
181
                        write_buffer_,
309✔
182
                        [destroying, handler](const boost::system::error_code &ec, size_t num_written) {
306✔
183
                                if (*destroying) {
306✔
184
                                        return;
185
                                }
186

187
                                if (ec == asio::error::operation_aborted) {
306✔
188
                                        handler(expected::unexpected(error::Error(
×
189
                                                make_error_condition(errc::operation_canceled),
×
190
                                                "Could not write to socket")));
191
                                } else if (ec) {
306✔
192
                                        handler(expected::unexpected(
×
193
                                                error::Error(ec.default_error_condition(), "Could not write to socket")));
×
194
                                } else {
195
                                        handler(num_written);
612✔
196
                                }
197
                        });
198
                return error::NoError;
309✔
199
        }
200

201
        void Cancel() override {
28✔
202
                if (stream_->lowest_layer().is_open()) {
28✔
203
                        stream_->lowest_layer().cancel();
15✔
204
                        stream_->lowest_layer().close();
15✔
205
                }
206
        }
28✔
207

208
private:
209
        error::Error DrainPrebufferedData(
4✔
210
                vector<uint8_t>::iterator start,
211
                vector<uint8_t>::iterator end,
212
                io::AsyncIoHandler handler) {
213
                size_t to_copy = min(static_cast<size_t>(end - start), buffered_->size());
4✔
214

215
                // These two lines are equivalent to:
216
                //   copy_n(static_cast<const uint8_t *>(buffered_->cdata().data()), to_copy, start);
217
                // but compatible with Boost 1.67.
218
                const beast::flat_buffer &cbuffered = *buffered_;
219
                copy_n(static_cast<const uint8_t *>(cbuffered.data().data()), to_copy, start);
4✔
220
                buffered_->consume(to_copy);
4✔
221
                if (buffered_->size() == 0) {
4✔
222
                        // We don't need it anymore.
223
                        buffered_.reset();
4✔
224
                }
225
                handler(to_copy);
4✔
226
                return error::NoError;
4✔
227
        }
228

229
        shared_ptr<bool> destroying_;
230
        shared_ptr<StreamType> stream_;
231
        shared_ptr<beast::flat_buffer> buffered_;
232
        asio::mutable_buffer read_buffer_;
233
        asio::const_buffer write_buffer_;
234
};
235

236
Client::Client(
332✔
237
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
238
        event_loop_ {event_loop},
239
        logger_name_ {logger_name},
240
        client_config_ {client},
241
        http_proxy_ {client.http_proxy},
332✔
242
        https_proxy_ {client.https_proxy},
332✔
243
        no_proxy_ {client.no_proxy},
332✔
244
        cancelled_ {make_shared<bool>(true)},
×
245
        disable_keep_alive_ {client.disable_keep_alive},
332✔
246
        resolver_(GetAsioIoContext(event_loop)),
247
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,328✔
248
}
332✔
249

250
// Warns if a request is still in flight, then cancels whatever is outstanding.
Client::~Client() {
        const bool request_active = !*cancelled_;
        if (request_active) {
                logger_.Warning("Client destroyed while request is still active!");
        }

        DoCancel();
}
332✔
256

257
// Configures every SSL context owned by the client: peer verification mode, optional
// client certificate and private key, the default CA paths and an optional dedicated
// server certificate.
//
// Does nothing once a previous call has succeeded. Returns the first error encountered;
// `initialized_` is only set when all contexts were configured.
error::Error Client::Initialize() {
        if (initialized_) {
                return error::NoError;
        }

        const bool has_cert = client_config_.client_cert_path != "";
        const bool has_key = client_config_.client_cert_key_path != "";
        const bool has_server_cert = client_config_.server_cert_path != "";
        const auto verify_mode = client_config_.skip_verify ? ssl::verify_none : ssl::verify_peer;

        for (auto i = 0; i < MENDER_BOOST_BEAST_SSL_CTX_COUNT; i++) {
                auto &ctx = ssl_ctx_[i];
                ctx.set_verify_mode(verify_mode);

                beast::error_code ec {};

                // Certificate and key only make sense as a pair.
                if (has_cert != has_key) {
                        return error::Error(
                                make_error_condition(errc::invalid_argument),
                                "Cannot set only one of client certificate, and client certificate private key");
                }

                if (has_cert && has_key) {
                        ctx.set_options(boost::asio::ssl::context::default_workarounds);
                        ctx.use_certificate_file(
                                client_config_.client_cert_path, boost::asio::ssl::context_base::pem, ec);
                        if (ec) {
                                return error::Error(
                                        ec.default_error_condition(), "Could not load client certificate");
                        }

                        // The key is loaded through the crypto module, with the configured
                        // SSL engine, and then installed directly on the native context.
                        auto exp_key = crypto::PrivateKey::Load(
                                {client_config_.client_cert_key_path, "", client_config_.ssl_engine});
                        if (!exp_key) {
                                return exp_key.error().WithContext(
                                        "Error loading private key from " + client_config_.client_cert_key_path);
                        }

                        const int ret = SSL_CTX_use_PrivateKey(ctx.native_handle(), exp_key.value().Get());
                        if (ret != 1) {
                                // Only logged: initialization carries on regardless.
                                log::Error(
                                        "Failed to add the PrivateKey: " + client_config_.client_cert_key_path
                                        + " to the SSL CTX. The HTTP client will not be functioning normally");
                        }
                }

                ctx.set_default_verify_paths(ec); // Load the default CAs
                if (ec) {
                        auto err = error::Error(
                                ec.default_error_condition(), "Failed to load the SSL default directory");
                        if (!has_server_cert) {
                                // Without a dedicated certificate there is nothing left to verify against.
                                return err;
                        }
                        // A dedicated certificate is configured, so this is not fatal.
                        log::Info(err.String());
                }

                if (has_server_cert) {
                        ctx.load_verify_file(client_config_.server_cert_path, ec);
                        if (ec) {
                                return error::Error(
                                        ec.default_error_condition(), "Failed to load the server certificate!");
                        }
                }
        }

        initialized_ = true;

        return error::NoError;
}
321

322
// Starts an HTTP request.
//
// Validates the client state and the request, applies the proxy configuration, stores the
// handlers and kicks off asynchronous name resolution. The returned error only covers
// problems detected before the resolution is started.
error::Error Client::AsyncCall(
        OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
        auto setup_err = Initialize();
        if (setup_err != error::NoError) {
                return setup_err;
        }

        // Only one transaction at a time.
        const bool busy = !*cancelled_ && status_ != TransactionStatus::Done;
        if (busy) {
                return error::Error(
                        make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
        }

        const auto &address = req->address_;

        if (address.protocol == "" || address.host == "" || address.port < 0) {
                return error::MakeError(error::ProgrammingError, "Request is not ready");
        }

        if (!header_handler || !body_handler) {
                return error::MakeError(
                        error::ProgrammingError, "header_handler and body_handler can not be nullptr");
        }

        if (address.protocol != "http" && address.protocol != "https") {
                return error::Error(
                        make_error_condition(errc::protocol_not_supported), address.protocol);
        }

        logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));

        request_ = req;

        setup_err = HandleProxySetup();
        if (setup_err != error::NoError) {
                return setup_err;
        }

        // NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
        // request to route to our k8s cluster. Set this in all cases.
        req->SetHeader("HOST", address.host);

        header_handler_ = header_handler;
        body_handler_ = body_handler;
        status_ = TransactionStatus::None;

        // A fresh flag per request: callbacks holding an older flag stay cancelled.
        cancelled_ = make_shared<bool>(false);

        auto &cancelled = cancelled_;

        const auto &target = request_->address_;
        resolver_.async_resolve(
                target.host,
                to_string(target.port),
                [this, cancelled](
                        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
                        if (*cancelled) {
                                return;
                        }
                        ResolveHandler(ec, results);
                });

        return error::NoError;
}
381

382
// Rewrites `request_` according to the proxy configuration and selects `socket_mode_`.
//
// * "http" targets with an HTTP proxy: the request is sent to the proxy, with the full
//   original URL as its path.
// * "https" targets with an HTTPS proxy: the original request is parked in
//   `secondary_req_` and `request_` becomes a CONNECT request to the proxy.
//
// A proxy is skipped when the target host matches the no-proxy list. Returns an error if
// the proxy URL cannot be parsed or carries a path.
error::Error Client::HandleProxySetup() {
        secondary_req_.reset();

        if (request_->address_.protocol == "http") {
                socket_mode_ = SocketMode::Plain;

                // The host must be matched against the no-proxy list, not against the proxy
                // URL itself.
                if (http_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
                        // Make a modified proxy request.
                        BrokenDownUrl proxy_address;
                        auto err = BreakDownUrl(http_proxy_, proxy_address);
                        if (err != error::NoError) {
                                return err.WithContext("HTTP proxy URL is invalid");
                        }
                        if (proxy_address.path != "" && proxy_address.path != "/") {
                                return MakeError(
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
                        }

                        // The proxy expects the absolute URL of the final target as the path.
                        request_->address_.path = request_->address_.protocol + "://" + request_->address_.host
                                                                          + ":" + to_string(request_->address_.port)
                                                                          + request_->address_.path;
                        request_->address_.host = proxy_address.host;
                        request_->address_.port = proxy_address.port;
                        request_->address_.protocol = proxy_address.protocol;

                        if (proxy_address.protocol == "https") {
                                socket_mode_ = SocketMode::Tls;
                        } else if (proxy_address.protocol == "http") {
                                socket_mode_ = SocketMode::Plain;
                        } else {
                                // Should never get here.
                                assert(false);
                        }
                }
        } else if (request_->address_.protocol == "https") {
                socket_mode_ = SocketMode::Tls;

                // The host must be matched against the no-proxy list, not against the proxy
                // URL itself.
                if (https_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
                        // Save the original request for later, so that we can make a new request
                        // over the channel established by CONNECT.
                        secondary_req_ = std::move(request_);

                        request_ = make_shared<OutgoingRequest>();
                        request_->SetMethod(Method::CONNECT);
                        BrokenDownUrl proxy_address;
                        auto err = BreakDownUrl(https_proxy_, proxy_address);
                        if (err != error::NoError) {
                                return err.WithContext("HTTPS proxy URL is invalid");
                        }
                        if (proxy_address.path != "" && proxy_address.path != "/") {
                                return MakeError(
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
                        }

                        // CONNECT takes "host:port" of the final target as its request target.
                        request_->address_.path =
                                secondary_req_->address_.host + ":" + to_string(secondary_req_->address_.port);
                        request_->address_.host = proxy_address.host;
                        request_->address_.port = proxy_address.port;
                        request_->address_.protocol = proxy_address.protocol;

                        if (proxy_address.protocol == "https") {
                                socket_mode_ = SocketMode::Tls;
                        } else if (proxy_address.protocol == "http") {
                                socket_mode_ = SocketMode::Plain;
                        } else {
                                // Should never get here.
                                assert(false);
                        }
                }
        } else {
                // Should never get here
                assert(false);
        }

        return error::NoError;
}
458

459
// Creates the reader for the response body.
//
// Only valid while the status is `HeaderHandlerCalled`; any other status yields an
// `operation_in_progress` error. A response with a body length of zero yields
// `BodyMissingError`.
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
        const bool in_header_handler = (status_ == TransactionStatus::HeaderHandlerCalled);
        if (!in_header_handler) {
                return expected::unexpected(error::Error(
                        make_error_condition(errc::operation_in_progress),
                        "MakeBodyAsyncReader called while reading is in progress"));
        }

        const bool has_body = (response_body_length_ != 0);
        if (!has_body) {
                return expected::unexpected(
                        MakeError(BodyMissingError, "Response does not contain a body"));
        }

        status_ = TransactionStatus::ReaderCreated;

        auto &http_client = resp->client_.GetHttpClient();
        return make_shared<BodyAsyncReader<Client>>(http_client, resp->cancelled_);
}
474

475
// Ends the HTTP transaction and hands the connection over to the caller as a raw socket.
//
// The returned `RawSocket` is built on the layer matching `socket_mode_` and is given the
// response buffer, so bytes already read by the HTTP parser can be handed out first.
// Afterwards the client no longer owns a stream. Fails with `not_connected` if there is
// no active request.
io::ExpectedAsyncReadWriterPtr Client::SwitchProtocol(IncomingResponsePtr req) {
        if (*cancelled_) {
                return expected::unexpected(error::Error(
                        make_error_condition(errc::not_connected),
                        "Cannot switch protocols if endpoint is not connected"));
        }

        // Rest of the connection is done directly on the socket, we are done here.
        status_ = TransactionStatus::Done;
        // Silence callbacks that still hold the old flag...
        *cancelled_ = true;
        // ...and leave the client in the idle ("cancelled") state, the same state the
        // constructor and CallErrorHandler leave it in. A false flag here would make the
        // destructor report an active request although the stream has been given away.
        cancelled_ = make_shared<bool>(true);

        auto stream = stream_;
        // This no longer belongs to us.
        stream_.reset();

        switch (socket_mode_) {
        case SocketMode::TlsTls:
                return make_shared<RawSocket<ssl::stream<ssl::stream<tcp::socket>>>>(
                        stream, response_data_.response_buffer_);
        case SocketMode::Tls:
                return make_shared<RawSocket<ssl::stream<tcp::socket>>>(
                        make_shared<ssl::stream<tcp::socket>>(std::move(stream->next_layer())),
                        response_data_.response_buffer_);
        case SocketMode::Plain:
                return make_shared<RawSocket<tcp::socket>>(
                        make_shared<tcp::socket>(std::move(stream->next_layer().next_layer())),
                        response_data_.response_buffer_);
        }

        AssertOrReturnUnexpected(false);
}
507

508
// Invokes `handler` with the current response.
//
// Taking the handler by value is the whole point of this function: the argument is a
// private copy that stays alive for the duration of the call. A handler may own the
// client through a capture and replace itself (using `AsyncCall`); without the copy that
// would destroy the last instance of the handler, and with it the client, while the
// handler is still executing.
void Client::CallHandler(ResponseHandler handler) {
        handler(response_);
}
281✔
516

517
void Client::CallErrorHandler(
33✔
518
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
519
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
99✔
520
}
33✔
521

522
void Client::CallErrorHandler(
162✔
523
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
524
        *cancelled_ = true;
162✔
525
        cancelled_ = make_shared<bool>(true);
162✔
526
        stream_.reset();
162✔
527
        status_ = TransactionStatus::Done;
162✔
528
        handler(expected::unexpected(
324✔
529
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
648✔
530
}
162✔
531

532
void Client::ResolveHandler(
245✔
533
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
534
        if (ec) {
245✔
535
                CallErrorHandler(ec, request_, header_handler_);
×
536
                return;
×
537
        }
538

539
        if (logger_.Level() >= log::LogLevel::Debug) {
245✔
540
                string ips = "[";
228✔
541
                string sep;
542
                for (auto r : results) {
964✔
543
                        ips += sep;
254✔
544
                        ips += r.endpoint().address().to_string();
254✔
545
                        sep = ", ";
254✔
546
                }
547
                ips += "]";
228✔
548
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
456✔
549
        }
550

551
        resolver_results_ = results;
552

553
        stream_ = make_shared<ssl::stream<ssl::stream<tcp::socket>>>(
245✔
554
                ssl::stream<tcp::socket>(GetAsioIoContext(event_loop_), ssl_ctx_[0]), ssl_ctx_[1]);
490✔
555

556
        if (!response_data_.response_buffer_) {
245✔
557
                // We can reuse this if preexisting.
558
                response_data_.response_buffer_ = make_shared<beast::flat_buffer>();
358✔
559

560
                // This is equivalent to:
561
                //   response_data_.response_buffer_.reserve(body_buffer_.size());
562
                // but compatible with Boost 1.67.
563
                response_data_.response_buffer_->prepare(
564
                        body_buffer_.size() - response_data_.response_buffer_->size());
179✔
565
        }
566

567
        auto &cancelled = cancelled_;
568

569
        asio::async_connect(
245✔
570
                stream_->lowest_layer(),
571
                resolver_results_,
245✔
572
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
490✔
573
                        if (!*cancelled) {
245✔
574
                                switch (socket_mode_) {
245✔
575
                                case SocketMode::TlsTls:
×
576
                                        // Should never happen because we always need to handshake
577
                                        // the innermost Tls first, then the outermost, but the
578
                                        // latter doesn't happen here.
579
                                        assert(false);
580
                                        CallErrorHandler(
×
581
                                                error::MakeError(
×
582
                                                        error::ProgrammingError, "TlsTls mode is invalid in ResolveHandler"),
×
583
                                                request_,
×
584
                                                header_handler_);
×
585
                                case SocketMode::Tls:
14✔
586
                                        return HandshakeHandler(stream_->next_layer(), ec, endpoint);
14✔
587
                                case SocketMode::Plain:
231✔
588
                                        return ConnectHandler(ec, endpoint);
231✔
589
                                }
590
                        }
591
                });
592
}
593

594
template <typename StreamType>
595
void Client::HandshakeHandler(
16✔
596
        StreamType &stream, const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
597
        if (ec) {
16✔
598
                CallErrorHandler(ec, request_, header_handler_);
2✔
599
                return;
2✔
600
        }
601

602
        if (not disable_keep_alive_) {
14✔
603
                boost::asio::socket_base::keep_alive option(true);
604
                stream_->lowest_layer().set_option(option);
14✔
605
        }
606

607
        // Set SNI Hostname (many hosts need this to handshake successfully)
608
        if (!SSL_set_tlsext_host_name(stream.native_handle(), request_->address_.host.c_str())) {
14✔
609
                beast::error_code ec2 {
×
610
                        static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
×
611
                logger_.Error("Failed to set SNI host name: " + ec2.message());
×
612
        }
613

614
        auto &cancelled = cancelled_;
615

616
        stream.async_handshake(
28✔
617
                ssl::stream_base::client, [this, cancelled, endpoint](const error_code &ec) {
14✔
618
                        if (*cancelled) {
15✔
619
                                return;
620
                        }
621
                        if (ec) {
15✔
622
                                logger_.Error("https: Failed to perform the SSL handshake: " + ec.message());
8✔
623
                                CallErrorHandler(ec, request_, header_handler_);
4✔
624
                                return;
4✔
625
                        }
626
                        logger_.Debug("https: Successful SSL handshake");
22✔
627
                        ConnectHandler(ec, endpoint);
11✔
628
                });
629
}
630

631

632
// Called once the connection (and TLS handshake, if any) is established. Builds the Beast
// request, serializer and response parser, and starts writing the request header on the
// layer selected by `socket_mode_`. An error in `ec` is reported through the header handler.
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
        if (ec) {
                CallErrorHandler(ec, request_, header_handler_);
                return;
        }

        if (not disable_keep_alive_) {
                stream_->lowest_layer().set_option(boost::asio::socket_base::keep_alive(true));
        }

        logger_.Debug("Connected to " + endpoint.address().to_string());

        using BeastRequest = http::request<http::buffer_body>;

        request_data_.http_request_ = make_shared<BeastRequest>(
                MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);
        for (const auto &field : request_->headers_) {
                request_data_.http_request_->set(field.first, field.second);
        }

        request_data_.http_request_serializer_ =
                make_shared<http::request_serializer<http::buffer_body>>(*request_data_.http_request_);

        response_data_.http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();

        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
        // if they do, they should be handled higher up in the application logic.
        //
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
        // `has_value()` in their code, causing their subsequent comparison operation to
        // misbehave. So pass highest possible value instead.
        response_data_.http_response_parser_->body_limit(numeric_limits<uint64_t>::max());

        // The callback carries copies of the cancellation flag and of the request data.
        auto cancelled = cancelled_;
        auto request_data = request_data_;
        auto on_header_written =
                [this, cancelled, request_data](const error_code &write_ec, size_t num_written) {
                        if (*cancelled) {
                                return;
                        }
                        WriteHeaderHandler(write_ec, num_written);
                };

        auto &serializer = *request_data_.http_request_serializer_;

        switch (socket_mode_) {
        case SocketMode::TlsTls:
                http::async_write_header(*stream_, serializer, on_header_written);
                break;
        case SocketMode::Tls:
                http::async_write_header(stream_->next_layer(), serializer, on_header_written);
                break;
        case SocketMode::Plain:
                http::async_write_header(
                        stream_->next_layer().next_layer(), serializer, on_header_written);
                break;
        }
}
689

690
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
226✔
691
        if (num_written > 0) {
226✔
692
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
452✔
693
        }
694

695
        if (ec) {
226✔
696
                CallErrorHandler(ec, request_, header_handler_);
×
697
                return;
189✔
698
        }
699

700
        auto header = request_->GetHeader("Content-Length");
452✔
701
        if (!header || header.value() == "0") {
226✔
702
                ReadHeader();
188✔
703
                return;
704
        }
705

706
        auto length = common::StringToLongLong(header.value());
38✔
707
        if (!length || length.value() < 0) {
38✔
708
                auto err = error::Error(
709
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
710
                CallErrorHandler(err, request_, header_handler_);
×
711
                return;
712
        }
713
        request_body_length_ = length.value();
38✔
714

715
        if (!request_->body_gen_ && !request_->async_body_gen_) {
38✔
716
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
717
                CallErrorHandler(err, request_, header_handler_);
2✔
718
                return;
719
        }
720

721
        assert(!(request_->body_gen_ && request_->async_body_gen_));
722

723
        if (request_->body_gen_) {
37✔
724
                auto body_reader = request_->body_gen_();
31✔
725
                if (!body_reader) {
31✔
726
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
727
                        return;
728
                }
729
                request_->body_reader_ = body_reader.value();
31✔
730
        } else {
731
                auto body_reader = request_->async_body_gen_();
6✔
732
                if (!body_reader) {
6✔
733
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
734
                        return;
735
                }
736
                request_->async_body_reader_ = body_reader.value();
6✔
737
        }
738

739
        PrepareAndWriteNewBodyBuffer();
37✔
740
}
741

742
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,118✔
743
        if (num_written > 0) {
2,118✔
744
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
2,084✔
745
        }
746

747
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,118✔
748
                // Write next block of the body.
749
                PrepareAndWriteNewBodyBuffer();
1,042✔
750
        } else if (ec) {
1,076✔
751
                CallErrorHandler(ec, request_, header_handler_);
8✔
752
        } else if (num_written > 0) {
1,072✔
753
                // We are still writing the body.
754
                WriteBody();
1,042✔
755
        } else {
756
                // We are ready to receive the response.
757
                ReadHeader();
30✔
758
        }
759
}
2,118✔
760

761
void Client::PrepareAndWriteNewBodyBuffer() {
1,079✔
762
        // request_->body_reader_ XOR request_->async_body_reader_
763
        assert(
764
                (request_->body_reader_ || request_->async_body_reader_)
765
                && !(request_->body_reader_ && request_->async_body_reader_));
766

767
        auto cancelled = cancelled_;
768
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
3,430✔
769
                if (!*cancelled) {
1,079✔
770
                        if (!read) {
1,078✔
771
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
772
                                return;
2✔
773
                        }
774
                        WriteNewBodyBuffer(read.value());
1,076✔
775
                }
776
        };
1,079✔
777

778

779
        if (request_->body_reader_) {
1,079✔
780
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
1,308✔
781
        } else {
782
                auto err = request_->async_body_reader_->AsyncRead(
783
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
850✔
784
                if (err != error::NoError) {
425✔
785
                        CallErrorHandler(err, request_, header_handler_);
×
786
                }
787
        }
788
}
1,079✔
789

790
void Client::WriteNewBodyBuffer(size_t size) {
1,076✔
791
        request_data_.http_request_->body().data = body_buffer_.data();
1,076✔
792
        request_data_.http_request_->body().size = size;
1,076✔
793

794
        if (size > 0) {
1,076✔
795
                request_data_.http_request_->body().more = true;
1,046✔
796
        } else {
797
                // Release ownership of Body reader.
798
                request_->body_reader_.reset();
30✔
799
                request_->async_body_reader_.reset();
30✔
800
                request_data_.http_request_->body().more = false;
30✔
801
        }
802

803
        WriteBody();
1,076✔
804
}
1,076✔
805

806
void Client::WriteBody() {
2,118✔
807
        auto &cancelled = cancelled_;
808
        auto &request_data = request_data_;
2,118✔
809

810
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
2,118✔
811
                if (!*cancelled) {
2,118✔
812
                        WriteBodyHandler(ec, num_written);
2,118✔
813
                }
814
        };
4,236✔
815

816
        switch (socket_mode_) {
2,118✔
817
        case SocketMode::TlsTls:
×
818
                http::async_write_some(*stream_, *request_data_.http_request_serializer_, handler);
819
                break;
820
        case SocketMode::Tls:
×
821
                http::async_write_some(
822
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
823
                break;
824
        case SocketMode::Plain:
2,118✔
825
                http::async_write_some(
826
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
827
                break;
828
        }
829
}
2,118✔
830

831
void Client::ReadHeader() {
218✔
832
        auto &cancelled = cancelled_;
833
        auto &response_data = response_data_;
218✔
834

835
        auto handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
217✔
836
                if (!*cancelled) {
217✔
837
                        ReadHeaderHandler(ec, num_read);
217✔
838
                }
839
        };
436✔
840

841
        switch (socket_mode_) {
218✔
842
        case SocketMode::TlsTls:
1✔
843
                http::async_read_some(
1✔
844
                        *stream_,
845
                        *response_data_.response_buffer_,
846
                        *response_data_.http_response_parser_,
847
                        handler);
848
                break;
849
        case SocketMode::Tls:
10✔
850
                http::async_read_some(
10✔
851
                        stream_->next_layer(),
852
                        *response_data_.response_buffer_,
853
                        *response_data_.http_response_parser_,
854
                        handler);
855
                break;
856
        case SocketMode::Plain:
207✔
857
                http::async_read_some(
207✔
858
                        stream_->next_layer().next_layer(),
859
                        *response_data_.response_buffer_,
860
                        *response_data_.http_response_parser_,
861
                        handler);
862
                break;
863
        }
864
}
218✔
865

866
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
217✔
867
        if (num_read > 0) {
217✔
868
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
424✔
869
        }
870

871
        if (ec) {
217✔
872
                CallErrorHandler(ec, request_, header_handler_);
5✔
873
                return;
75✔
874
        }
875

876
        if (!response_data_.http_response_parser_->is_header_done()) {
212✔
877
                ReadHeader();
×
878
                return;
×
879
        }
880

881
        auto content_length = response_data_.http_response_parser_->content_length();
212✔
882
        if (content_length) {
212✔
883
                response_body_length_ = content_length.value();
175✔
884
        } else {
885
                response_body_length_ = 0;
37✔
886
        }
887

888
        if (secondary_req_) {
212✔
889
                HandleSecondaryRequest();
5✔
890
                return;
5✔
891
        }
892

893
        response_.reset(new IncomingResponse(*this, cancelled_));
414✔
894
        response_->status_code_ = response_data_.http_response_parser_->get().result_int();
207✔
895
        response_->status_message_ = string {response_data_.http_response_parser_->get().reason()};
207✔
896

897
        logger_.Debug(
414✔
898
                "Received response: " + to_string(response_->status_code_) + " "
414✔
899
                + response_->status_message_);
621✔
900

901
        string debug_str;
902
        for (auto header = response_data_.http_response_parser_->get().cbegin();
243✔
903
                 header != response_data_.http_response_parser_->get().cend();
450✔
904
                 header++) {
905
                response_->headers_[string {header->name_string()}] = string {header->value()};
729✔
906
                if (logger_.Level() >= log::LogLevel::Debug) {
243✔
907
                        debug_str += string {header->name_string()};
230✔
908
                        debug_str += ": ";
230✔
909
                        debug_str += string {header->value()};
230✔
910
                        debug_str += "\n";
230✔
911
                }
912
        }
913

914
        logger_.Debug("Received headers:\n" + debug_str);
414✔
915
        debug_str.clear();
916

917
        if (response_data_.http_response_parser_->chunked()) {
207✔
918
                auto cancelled = cancelled_;
919
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
920
                CallHandler(header_handler_);
2✔
921
                if (!*cancelled) {
1✔
922
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
923
                        CallErrorHandler(err, request_, body_handler_);
2✔
924
                }
925
                return;
926
        }
927

928
        response_body_read_ = 0;
206✔
929

930
        if (response_body_read_ >= response_body_length_) {
206✔
931
                auto cancelled = cancelled_;
932
                status_ = TransactionStatus::HeaderHandlerCalled;
45✔
933
                CallHandler(header_handler_);
90✔
934
                if (!*cancelled) {
45✔
935
                        status_ = TransactionStatus::Done;
39✔
936
                        CallHandler(body_handler_);
78✔
937

938
                        // After body handler has run, set the request to cancelled. The body
939
                        // handler may have made a new request, so this is not necessarily the same
940
                        // request as is currently active (note use of shared_ptr copy, not
941
                        // `cancelled_`).
942
                        *cancelled = true;
39✔
943
                }
944
                return;
945
        }
946

947
        auto cancelled = cancelled_;
948
        status_ = TransactionStatus::HeaderHandlerCalled;
161✔
949
        CallHandler(header_handler_);
322✔
950
        if (*cancelled) {
161✔
951
                return;
952
        }
953

954
        // We know that a body reader is required here, because of the `response_body_read_ >=
955
        // response_body_length_` check above.
956
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
142✔
957
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
4✔
958
        }
959
}
960

961
void Client::HandleSecondaryRequest() {
5✔
962
        logger_.Debug(
10✔
963
                "Received proxy response: "
964
                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
10✔
965
                + string {response_data_.http_response_parser_->get().reason()});
20✔
966

967
        request_ = std::move(secondary_req_);
968

969
        if (response_data_.http_response_parser_->get().result_int() != StatusOK) {
5✔
970
                auto err = MakeError(
971
                        ProxyError,
972
                        "Proxy returned unexpected response: "
973
                                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
4✔
974
                                + string {response_data_.http_response_parser_->get().reason()});
6✔
975
                CallErrorHandler(err, request_, header_handler_);
4✔
976
                return;
977
        }
978

979
        if (response_body_length_ != 0 || response_data_.http_response_parser_->chunked()) {
3✔
980
                auto err = MakeError(ProxyError, "Body not allowed in proxy response");
×
981
                CallErrorHandler(err, request_, header_handler_);
×
982
                return;
983
        }
984

985
        // We are connected. Now repeat the request cycle with the original request. Pretend
986
        // we were just connected.
987

988
        assert(request_->GetProtocol() == "https");
989

990
        // Make sure that no data is "lost" inside the buffering mechanism, since when switching to
991
        // a different layer, this will get out of sync.
992
        assert(response_data_.response_buffer_->size() == 0);
993

994
        switch (socket_mode_) {
3✔
995
        case SocketMode::TlsTls:
×
996
                // Should never get here, because this is the only place where TlsTls mode
997
                // is supposed to be turned on.
998
                assert(false);
999
                CallErrorHandler(
×
1000
                        error::MakeError(
×
1001
                                error::ProgrammingError,
1002
                                "Any other mode than Tls is not valid when handling secondary request"),
×
1003
                        request_,
×
1004
                        header_handler_);
×
1005
                break;
×
1006
        case SocketMode::Tls:
1✔
1007
                // Upgrade to TLS inside TLS.
1008
                socket_mode_ = SocketMode::TlsTls;
1✔
1009
                HandshakeHandler(*stream_, error_code {}, stream_->lowest_layer().remote_endpoint());
1✔
1010
                break;
1✔
1011
        case SocketMode::Plain:
2✔
1012
                // Upgrade to TLS.
1013
                socket_mode_ = SocketMode::Tls;
2✔
1014
                HandshakeHandler(
2✔
1015
                        stream_->next_layer(), error_code {}, stream_->lowest_layer().remote_endpoint());
2✔
1016
                break;
2✔
1017
        }
1018
}
1019

1020
// Reads the next part of the response body into the range [`start`, `end`) and reports the
// result through `handler`. Once the body has been fully read, `handler` is called with 0
// and, if the status is `BodyReadingFinished`, the body handler is called as well.
void Client::AsyncReadNextBodyPart(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));

        if (status_ == TransactionStatus::ReaderCreated) {
                status_ = TransactionStatus::BodyReadingInProgress;
        }

        if (AtLeast(status_, TransactionStatus::BodyReadingFinished)) {
                auto cancelled = cancelled_;
                handler(0);
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
                        status_ = TransactionStatus::Done;
                        CallHandler(body_handler_);

                        // After body handler has run, set the request to cancelled. The body
                        // handler may have made a new request, so this is not necessarily the same
                        // request as is currently active (note use of shared_ptr copy, not
                        // `cancelled_`).
                        *cancelled = true;
                }
                return;
        }

        // Remember the destination; `ReadBodyHandler()` copies the data there.
        reader_buf_start_ = start;
        reader_buf_end_ = end;
        reader_handler_ = handler;

        // Never read more than fits in both the internal buffer and the destination range.
        size_t wanted = end - start;
        size_t chunk_size = min(body_buffer_.size(), wanted);

        auto &body = response_data_.http_response_parser_->get().body();
        body.data = body_buffer_.data();
        body.size = chunk_size;

        // The callback carries copies of the cancellation flag and of the response data.
        auto cancelled = cancelled_;
        auto response_data = response_data_;
        auto on_read = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
                if (*cancelled) {
                        return;
                }
                ReadBodyHandler(ec, num_read);
        };

        auto &buffer = *response_data_.response_buffer_;
        auto &parser = *response_data_.http_response_parser_;

        switch (socket_mode_) {
        case SocketMode::TlsTls:
                http::async_read_some(*stream_, buffer, parser, on_read);
                break;
        case SocketMode::Tls:
                http::async_read_some(stream_->next_layer(), buffer, parser, on_read);
                break;
        case SocketMode::Plain:
                http::async_read_some(stream_->next_layer().next_layer(), buffer, parser, on_read);
                break;
        }
}
1086

1087
// Called when a part of the response body has been read into `body_buffer_`. Copies the
// data to the range registered in `AsyncReadNextBodyPart()` and reports the result through
// `reader_handler_`. On error, the body handler is notified as well, unless the request was
// cancelled in the meantime.
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
        if (num_read > 0) {
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
                response_body_read_ += num_read;
        }

        if (ec == http::make_error_code(http::error::need_buffer)) {
                // This can be ignored. We always reset the buffer between reads anyway.
                ec = error_code {};
        }

        assert(reader_handler_);

        if (response_body_read_ >= response_body_length_) {
                status_ = TransactionStatus::BodyReadingFinished;
        }

        // Keep a local copy of the flag; it is checked again after the reader handler.
        auto cancelled = cancelled_;

        size_t dest_size = reader_buf_end_ - reader_buf_start_;
        size_t copy_size = min(num_read, dest_size);
        copy_n(body_buffer_.begin(), copy_size, reader_buf_start_);

        if (!ec) {
                reader_handler_(copy_size);
        } else {
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
                reader_handler_(expected::unexpected(err));
        }

        if (ec && !*cancelled) {
                CallErrorHandler(ec, request_, body_handler_);
                return;
        }
}
1121

1122
void Client::Cancel() {
195✔
1123
        auto cancelled = cancelled_;
1124

1125
        if (!*cancelled) {
195✔
1126
                auto err =
1127
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
246✔
1128
                switch (status_) {
123✔
1129
                case TransactionStatus::None:
1✔
1130
                        CallErrorHandler(err, request_, header_handler_);
1✔
1131
                        break;
1✔
1132
                case TransactionStatus::HeaderHandlerCalled:
120✔
1133
                case TransactionStatus::ReaderCreated:
1134
                case TransactionStatus::BodyReadingInProgress:
1135
                case TransactionStatus::BodyReadingFinished:
1136
                        CallErrorHandler(err, request_, body_handler_);
120✔
1137
                        break;
120✔
1138
                case TransactionStatus::Replying:
1139
                case TransactionStatus::SwitchingProtocol:
1140
                        // Not used by client.
1141
                        assert(false);
1142
                        break;
1143
                case TransactionStatus::BodyHandlerCalled:
1144
                case TransactionStatus::Done:
1145
                        break;
1146
                }
1147
        }
1148

1149
        if (!*cancelled) {
195✔
1150
                DoCancel();
2✔
1151
        }
1152
}
195✔
1153

1154
void Client::DoCancel() {
334✔
1155
        resolver_.cancel();
334✔
1156
        if (stream_) {
334✔
1157
                stream_->lowest_layer().cancel();
66✔
1158
                stream_->lowest_layer().close();
66✔
1159
                stream_.reset();
66✔
1160
        }
1161

1162
        request_.reset();
334✔
1163
        response_.reset();
334✔
1164

1165
        // Reset logger to no connection.
1166
        logger_ = log::Logger(logger_name_);
334✔
1167

1168
        // Set cancel state and then make a new one. Those who are interested should have their own
1169
        // pointer to the old one.
1170
        *cancelled_ = true;
334✔
1171
        cancelled_ = make_shared<bool>(true);
334✔
1172
}
334✔
1173

1174
Stream::Stream(Server &server) :
418✔
1175
        server_ {server},
1176
        logger_ {"http"},
1177
        cancelled_(make_shared<bool>(true)),
418✔
1178
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
418✔
1179
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,254✔
1180
        request_data_.request_buffer_ = make_shared<beast::flat_buffer>();
836✔
1181

1182
        // This is equivalent to:
1183
        //   request_data_.request_buffer_.reserve(body_buffer_.size());
1184
        // but compatible with Boost 1.67.
1185
        request_data_.request_buffer_->prepare(
1186
                body_buffer_.size() - request_data_.request_buffer_->size());
418✔
1187

1188
        request_data_.http_request_parser_ = make_shared<http::request_parser<http::buffer_body>>();
836✔
1189

1190
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
1191
        // they do, they should be handled higher up in the application logic.
1192
        //
1193
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
1194
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
1195
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
1196
        // possible value instead.
1197
        request_data_.http_request_parser_->body_limit(numeric_limits<uint64_t>::max());
1198
}
418✔
1199

1200
// Cancels the stream on destruction; see `DoCancel()`, which closes the socket if it is open
// and marks the stream as cancelled.
Stream::~Stream() {
1,254✔
1201
        DoCancel();
418✔
1202
}
418✔
1203

1204
void Stream::Cancel() {
7✔
1205
        auto cancelled = cancelled_;
1206

1207
        if (!*cancelled) {
7✔
1208
                auto err =
1209
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
1210
                switch (status_) {
7✔
1211
                case TransactionStatus::None:
×
1212
                        CallErrorHandler(err, request_, server_.header_handler_);
×
1213
                        break;
×
1214
                case TransactionStatus::HeaderHandlerCalled:
5✔
1215
                case TransactionStatus::ReaderCreated:
1216
                case TransactionStatus::BodyReadingInProgress:
1217
                case TransactionStatus::BodyReadingFinished:
1218
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
1219
                        break;
5✔
1220
                case TransactionStatus::BodyHandlerCalled:
×
1221
                        // In between body handler and reply finished. No one to handle the status
1222
                        // here.
1223
                        server_.RemoveStream(shared_from_this());
×
1224
                        break;
×
1225
                case TransactionStatus::Replying:
1✔
1226
                        CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1227
                        break;
1✔
1228
                case TransactionStatus::SwitchingProtocol:
1✔
1229
                        CallErrorHandler(err, request_, switch_protocol_handler_);
3✔
1230
                        break;
1✔
1231
                case TransactionStatus::Done:
1232
                        break;
1233
                }
1234
        }
1235

1236
        if (!*cancelled) {
7✔
1237
                DoCancel();
×
1238
        }
1239
}
7✔
1240

1241
void Stream::DoCancel() {
594✔
1242
        if (socket_.is_open()) {
594✔
1243
                socket_.cancel();
204✔
1244
                socket_.close();
204✔
1245
        }
1246

1247
        // Set cancel state and then make a new one. Those who are interested should have their own
1248
        // pointer to the old one.
1249
        *cancelled_ = true;
594✔
1250
        cancelled_ = make_shared<bool>(true);
594✔
1251
}
594✔
1252

1253
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
1254
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1255
}
×
1256

1257
void Stream::CallErrorHandler(
×
1258
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
1259
        *cancelled_ = true;
×
1260
        cancelled_ = make_shared<bool>(true);
×
1261
        status_ = TransactionStatus::Done;
×
1262
        handler(expected::unexpected(err.WithContext(
×
1263
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
1264

1265
        server_.RemoveStream(shared_from_this());
×
1266
}
×
1267

1268
void Stream::CallErrorHandler(
2✔
1269
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1270
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
6✔
1271
}
2✔
1272

1273
void Stream::CallErrorHandler(
9✔
1274
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1275
        *cancelled_ = true;
9✔
1276
        cancelled_ = make_shared<bool>(true);
9✔
1277
        status_ = TransactionStatus::Done;
9✔
1278
        handler(
9✔
1279
                req,
1280
                err.WithContext(
9✔
1281
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
27✔
1282

1283
        server_.RemoveStream(shared_from_this());
9✔
1284
}
9✔
1285

1286
void Stream::CallErrorHandler(
4✔
1287
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
1288
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
1289
}
4✔
1290

1291
void Stream::CallErrorHandler(
7✔
1292
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
1293
        *cancelled_ = true;
7✔
1294
        cancelled_ = make_shared<bool>(true);
7✔
1295
        status_ = TransactionStatus::Done;
7✔
1296
        handler(err.WithContext(
14✔
1297
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
14✔
1298

1299
        server_.RemoveStream(shared_from_this());
7✔
1300
}
7✔
1301

1302
void Stream::CallErrorHandler(
×
1303
        const error_code &ec, const RequestPtr &req, SwitchProtocolHandler handler) {
1304
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1305
}
×
1306

1307
void Stream::CallErrorHandler(
1✔
1308
        const error::Error &err, const RequestPtr &req, SwitchProtocolHandler handler) {
1309
        *cancelled_ = true;
1✔
1310
        cancelled_ = make_shared<bool>(true);
1✔
1311
        status_ = TransactionStatus::Done;
1✔
1312
        handler(expected::unexpected(err.WithContext(
2✔
1313
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
4✔
1314

1315
        server_.RemoveStream(shared_from_this());
1✔
1316
}
1✔
1317

1318
void Stream::AcceptHandler(const error_code &ec) {
212✔
1319
        if (ec) {
212✔
1320
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
1321
                return;
×
1322
        }
1323

1324
        auto ip = socket_.remote_endpoint().address().to_string();
424✔
1325

1326
        // Use IP as context for logging.
1327
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
212✔
1328

1329
        logger_.Debug("Accepted connection.");
424✔
1330

1331
        request_.reset(new IncomingRequest(*this, cancelled_));
424✔
1332

1333
        request_->address_.host = ip;
212✔
1334

1335
        *cancelled_ = false;
212✔
1336

1337
        ReadHeader();
212✔
1338
}
1339

1340
void Stream::ReadHeader() {
212✔
1341
        auto &cancelled = cancelled_;
1342
        auto &request_data = request_data_;
212✔
1343

1344
        http::async_read_some(
424✔
1345
                socket_,
212✔
1346
                *request_data_.request_buffer_,
1347
                *request_data_.http_request_parser_,
1348
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
212✔
1349
                        if (!*cancelled) {
212✔
1350
                                ReadHeaderHandler(ec, num_read);
212✔
1351
                        }
1352
                });
212✔
1353
}
212✔
1354

1355
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
212✔
1356
        if (num_read > 0) {
212✔
1357
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
424✔
1358
        }
1359

1360
        if (ec) {
212✔
1361
                CallErrorHandler(ec, request_, server_.header_handler_);
×
1362
                return;
175✔
1363
        }
1364

1365
        if (!request_data_.http_request_parser_->is_header_done()) {
212✔
1366
                ReadHeader();
×
1367
                return;
×
1368
        }
1369

1370
        auto method_result = BeastVerbToMethod(
1371
                request_data_.http_request_parser_->get().base().method(),
1372
                string {request_data_.http_request_parser_->get().base().method_string()});
424✔
1373
        if (!method_result) {
212✔
1374
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
1375
                return;
×
1376
        }
1377
        request_->method_ = method_result.value();
212✔
1378
        request_->address_.path = string(request_data_.http_request_parser_->get().base().target());
212✔
1379

1380
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
212✔
1381

1382
        string debug_str;
1383
        for (auto header = request_data_.http_request_parser_->get().cbegin();
366✔
1384
                 header != request_data_.http_request_parser_->get().cend();
578✔
1385
                 header++) {
1386
                request_->headers_[string {header->name_string()}] = string {header->value()};
1,098✔
1387
                if (logger_.Level() >= log::LogLevel::Debug) {
366✔
1388
                        debug_str += string {header->name_string()};
313✔
1389
                        debug_str += ": ";
313✔
1390
                        debug_str += string {header->value()};
313✔
1391
                        debug_str += "\n";
313✔
1392
                }
1393
        }
1394

1395
        logger_.Debug("Received headers:\n" + debug_str);
424✔
1396
        debug_str.clear();
1397

1398
        if (request_data_.http_request_parser_->chunked()) {
212✔
1399
                auto cancelled = cancelled_;
1400
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
1401
                server_.header_handler_(request_);
2✔
1402
                if (!*cancelled) {
1✔
1403
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
1404
                        CallErrorHandler(err, request_, server_.body_handler_);
2✔
1405
                }
1406
                return;
1407
        }
1408

1409
        auto content_length = request_data_.http_request_parser_->content_length();
211✔
1410
        if (content_length) {
211✔
1411
                request_body_length_ = content_length.value();
38✔
1412
        } else {
1413
                request_body_length_ = 0;
173✔
1414
        }
1415
        request_body_read_ = 0;
211✔
1416

1417
        if (request_body_read_ >= request_body_length_) {
211✔
1418
                auto cancelled = cancelled_;
1419
                status_ = TransactionStatus::HeaderHandlerCalled;
173✔
1420
                server_.header_handler_(request_);
346✔
1421
                if (!*cancelled) {
173✔
1422
                        status_ = TransactionStatus::BodyHandlerCalled;
173✔
1423
                        CallBodyHandler();
173✔
1424
                }
1425
                return;
1426
        }
1427

1428
        auto cancelled = cancelled_;
1429
        status_ = TransactionStatus::HeaderHandlerCalled;
38✔
1430
        server_.header_handler_(request_);
76✔
1431
        if (*cancelled) {
38✔
1432
                return;
1433
        }
1434

1435
        // We know that a body reader is required here, because of the `request_body_read_ >=
1436
        // request_body_length_` check above.
1437
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
37✔
1438
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
2✔
1439
        }
1440
}
1441

1442
// Reads the next part of the request body into the caller's buffer `[start, end)` and reports the
// outcome through `handler`.
//
// The first call moves the status from `ReaderCreated` to `BodyReadingInProgress`. If reading is
// not in progress, `handler` is called with zero bytes, and if the body has been read completely
// (and the stream was not cancelled by `handler`), the body handler is called. Otherwise an
// asynchronous read into `body_buffer_` is started, limited to the smaller of the two buffer
// sizes; `ReadBodyHandler()` later copies the data over to the caller's buffer.
void Stream::AsyncReadNextBodyPart(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));

        if (status_ == TransactionStatus::ReaderCreated) {
                status_ = TransactionStatus::BodyReadingInProgress;
        }

        if (status_ != TransactionStatus::BodyReadingInProgress) {
                // Keep our own copy of the flag: `handler` may cancel the stream.
                auto cancelled = cancelled_;
                handler(0);
                if (*cancelled || status_ != TransactionStatus::BodyReadingFinished) {
                        return;
                }
                status_ = TransactionStatus::BodyHandlerCalled;
                CallBodyHandler();
                return;
        }

        // Remember where the data should go once it has arrived.
        reader_buf_start_ = start;
        reader_buf_end_ = end;
        reader_handler_ = handler;

        const size_t caller_capacity = end - start;
        const size_t chunk_size = min(body_buffer_.size(), caller_capacity);

        auto &body = request_data_.http_request_parser_->get().body();
        body.data = body_buffer_.data();
        body.size = chunk_size;

        auto &cancelled = cancelled_;
        auto &request_data = request_data_;

        auto on_read = [this, cancelled, request_data](const error_code &ec, size_t num_read) {
                if (*cancelled) {
                        return;
                }
                ReadBodyHandler(ec, num_read);
        };

        http::async_read_some(
                socket_,
                *request_data_.request_buffer_,
                *request_data_.http_request_parser_,
                std::move(on_read));
}
1482

1483
// Completion handler for the read started by `AsyncReadNextBodyPart()`.
//
// Accounts for the bytes read, marks body reading as finished once the announced body length has
// been reached, copies the data from `body_buffer_` to the reader's buffer and reports the result
// to the reader's handler. On error, unless the stream was cancelled by that handler, the error
// is additionally reported through the server's body handler.
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
        if (num_read > 0) {
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
                request_body_read_ += num_read;
        }

        if (ec == http::make_error_code(http::error::need_buffer)) {
                // Not an error for us: the buffer is set up anew before every read anyway.
                ec = error_code();
        }

        assert(reader_handler_);

        if (request_body_read_ >= request_body_length_) {
                status_ = TransactionStatus::BodyReadingFinished;
        }

        // Keep our own copy of the flag: the reader's handler may cancel the stream.
        auto cancelled = cancelled_;

        const size_t caller_capacity = reader_buf_end_ - reader_buf_start_;
        const size_t to_copy = min(num_read, caller_capacity);
        copy_n(body_buffer_.begin(), to_copy, reader_buf_start_);

        if (!ec) {
                reader_handler_(to_copy);
                return;
        }

        auto err = error::Error(ec.default_error_condition(), "Could not read body");
        reader_handler_(expected::unexpected(err));

        if (!*cancelled) {
                CallErrorHandler(ec, request_, server_.body_handler_);
        }
}
1517

1518
// Starts sending the response: prepares the response objects, remembers the handler to call when
// the reply is finished, and starts writing the response header asynchronously.
//
// The completion handler captures copies of the cancellation flag and of the response data, and
// only forwards to `WriteHeaderHandler()` if the flag has not been set in the meantime.
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
        SetupResponse();

        reply_finished_handler_ = reply_finished_handler;

        auto &cancelled = cancelled_;
        auto &response_data = response_data_;

        auto on_written = [this, cancelled, response_data](const error_code &ec, size_t num_written) {
                if (*cancelled) {
                        return;
                }
                WriteHeaderHandler(ec, num_written);
        };

        http::async_write_header(
                socket_, *response_data_.http_response_serializer_, std::move(on_written));
}
190✔
1535

1536
void Stream::SetupResponse() {
199✔
1537
        auto response = maybe_response_.lock();
199✔
1538
        // Only called from existing responses, so this should always be true.
1539
        assert(response);
1540

1541
        assert(status_ == TransactionStatus::BodyHandlerCalled);
1542
        status_ = TransactionStatus::Replying;
199✔
1543

1544
        // From here on we take shared ownership.
1545
        response_ = response;
1546

1547
        response_data_.http_response_ = make_shared<http::response<http::buffer_body>>();
398✔
1548

1549
        for (const auto &header : response->headers_) {
417✔
1550
                response_data_.http_response_->base().set(header.first, header.second);
218✔
1551
        }
1552

1553
        response_data_.http_response_->result(response->GetStatusCode());
199✔
1554
        response_data_.http_response_->reason(response->GetStatusMessage());
398✔
1555

1556
        response_data_.http_response_serializer_ =
1557
                make_shared<http::response_serializer<http::buffer_body>>(*response_data_.http_response_);
398✔
1558
}
199✔
1559

1560
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
189✔
1561
        if (num_written > 0) {
189✔
1562
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
378✔
1563
        }
1564

1565
        if (ec) {
189✔
1566
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1567
                return;
34✔
1568
        }
1569

1570
        auto header = response_->GetHeader("Content-Length");
378✔
1571
        if (!header || header.value() == "0") {
189✔
1572
                FinishReply();
33✔
1573
                return;
1574
        }
1575

1576
        auto length = common::StringToLongLong(header.value());
156✔
1577
        if (!length || length.value() < 0) {
156✔
1578
                auto err = error::Error(
1579
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
1580
                CallErrorHandler(err, request_, reply_finished_handler_);
×
1581
                return;
1582
        }
1583

1584
        if (!response_->body_reader_ && !response_->async_body_reader_) {
156✔
1585
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
1586
                CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1587
                return;
1588
        }
1589

1590
        PrepareAndWriteNewBodyBuffer();
155✔
1591
}
1592

1593
void Stream::PrepareAndWriteNewBodyBuffer() {
1,872✔
1594
        // response_->body_reader_ XOR response_->async_body_reader_
1595
        assert(
1596
                (response_->body_reader_ || response_->async_body_reader_)
1597
                && !(response_->body_reader_ && response_->async_body_reader_));
1598

1599
        auto read_handler = [this](io::ExpectedSize read) {
1,873✔
1600
                if (!read) {
1,872✔
1601
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
3✔
1602
                        return;
1✔
1603
                }
1604
                WriteNewBodyBuffer(read.value());
1,871✔
1605
        };
1,872✔
1606

1607
        if (response_->body_reader_) {
1,872✔
1608
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
3,196✔
1609
        } else {
1610
                auto err = response_->async_body_reader_->AsyncRead(
1611
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
274✔
1612
                if (err != error::NoError) {
274✔
1613
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1614
                }
1615
        }
1616
}
1,872✔
1617

1618
void Stream::WriteNewBodyBuffer(size_t size) {
1,871✔
1619
        response_data_.http_response_->body().data = body_buffer_.data();
1,871✔
1620
        response_data_.http_response_->body().size = size;
1,871✔
1621

1622
        if (size > 0) {
1,871✔
1623
                response_data_.http_response_->body().more = true;
1,750✔
1624
        } else {
1625
                response_data_.http_response_->body().more = false;
121✔
1626
        }
1627

1628
        WriteBody();
1,871✔
1629
}
1,871✔
1630

1631
void Stream::WriteBody() {
3,599✔
1632
        auto &cancelled = cancelled_;
1633
        auto &response_data = response_data_;
3,599✔
1634

1635
        http::async_write_some(
7,198✔
1636
                socket_,
3,599✔
1637
                *response_data_.http_response_serializer_,
1638
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
3,562✔
1639
                        if (!*cancelled) {
3,562✔
1640
                                WriteBodyHandler(ec, num_written);
3,562✔
1641
                        }
1642
                });
3,562✔
1643
}
3,599✔
1644

1645
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
3,562✔
1646
        if (num_written > 0) {
3,562✔
1647
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
3,456✔
1648
        }
1649

1650
        if (ec == http::make_error_code(http::error::need_buffer)) {
3,562✔
1651
                // Write next body block.
1652
                PrepareAndWriteNewBodyBuffer();
1,717✔
1653
        } else if (ec) {
1,845✔
1654
                CallErrorHandler(ec, request_, reply_finished_handler_);
12✔
1655
        } else if (num_written > 0) {
1,841✔
1656
                // We are still writing the body.
1657
                WriteBody();
1,728✔
1658
        } else {
1659
                // We are finished.
1660
                FinishReply();
113✔
1661
        }
1662
}
3,562✔
1663

1664
void Stream::FinishReply() {
146✔
1665
        // We are done.
1666
        *cancelled_ = true;
146✔
1667
        cancelled_ = make_shared<bool>(true);
146✔
1668
        status_ = TransactionStatus::Done;
146✔
1669
        // Release ownership of Body reader.
1670
        response_->body_reader_.reset();
146✔
1671
        response_->async_body_reader_.reset();
146✔
1672
        reply_finished_handler_(error::NoError);
146✔
1673
        server_.RemoveStream(shared_from_this());
146✔
1674
}
146✔
1675

1676
// Starts a protocol switch: prepares the response, remembers `handler`, and writes the response
// header asynchronously. Always returns `error::NoError`; failures are reported through `handler`.
//
// The completion handler captures copies of the cancellation flag and of the response data, and
// only forwards to `SwitchingProtocolHandler()` if the flag has not been set in the meantime.
error::Error Stream::AsyncSwitchProtocol(SwitchProtocolHandler handler) {
        SetupResponse();

        switch_protocol_handler_ = handler;
        status_ = TransactionStatus::SwitchingProtocol;

        auto &cancelled = cancelled_;
        auto &response_data = response_data_;

        auto on_written = [this, cancelled, response_data](const error_code &ec, size_t num_written) {
                if (*cancelled) {
                        return;
                }
                SwitchingProtocolHandler(ec, num_written);
        };

        http::async_write_header(
                socket_, *response_data_.http_response_serializer_, std::move(on_written));

        return error::NoError;
}
1696

1697
// Completion handler for the header write started by `AsyncSwitchProtocol()`.
//
// On success the TCP socket is moved out of this stream and wrapped, together with the request
// buffer, in a `RawSocket`. The stream then marks itself done, cancels itself and is removed from
// the server, and only after that the raw socket is handed to the protocol-switch handler.
void Stream::SwitchingProtocolHandler(error_code ec, size_t num_written) {
        if (num_written > 0) {
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
        }

        if (ec) {
                CallErrorHandler(ec, request_, switch_protocol_handler_);
                return;
        }

        auto tcp_socket = make_shared<tcp::socket>(std::move(socket_));
        auto socket = make_shared<RawSocket<tcp::socket>>(tcp_socket, request_data_.request_buffer_);

        // Take a copy of the handler before the stream is removed from the server below.
        auto switch_protocol_handler = switch_protocol_handler_;

        // The remainder of the connection is handled directly on the socket; this stream is done.
        status_ = TransactionStatus::Done;
        *cancelled_ = true;
        cancelled_ = make_shared<bool>(true);
        server_.RemoveStream(shared_from_this());

        switch_protocol_handler(socket);
}
1720

1721
void Stream::CallBodyHandler() {
203✔
1722
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1723
        // it immediately destroys, which would destroy this stream as well. At the end of this
1724
        // function, it's ok to destroy it.
1725
        auto stream_ref = shared_from_this();
1726

1727
        server_.body_handler_(request_, error::NoError);
609✔
1728

1729
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1730
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1731
        // request has not been handled correctly.
1732
        auto response = maybe_response_.lock();
203✔
1733
        if (!response) {
203✔
1734
                logger_.Error("Handler produced no response. Closing stream prematurely.");
4✔
1735
                *cancelled_ = true;
2✔
1736
                cancelled_ = make_shared<bool>(true);
2✔
1737
                server_.RemoveStream(shared_from_this());
6✔
1738
        }
1739
}
203✔
1740

1741
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
226✔
1742
        event_loop_ {event_loop},
1743
        acceptor_(GetAsioIoContext(event_loop_)) {
406✔
1744
}
226✔
1745

1746
// Stops listening and drops all streams by means of `Cancel()`.
Server::~Server() {
        Cancel();
}
226✔
1749

1750
// Variant of `AsyncServeUrl()` taking a plain `RequestHandler` as body handler. It is adapted to
// an `IdentifiedRequestHandler`: on error the handler receives the error as an unexpected value,
// otherwise it receives the request.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
        IdentifiedRequestHandler adapter = [body_handler](IncomingRequestPtr req, error::Error err) {
                if (err != error::NoError) {
                        body_handler(expected::unexpected(err));
                        return;
                }
                body_handler(req);
        };

        return AsyncServeUrl(url, header_handler, adapter);
}
1761

1762
// Starts listening for HTTP connections on `url` and serving them with the given handlers.
//
// Only the `http` protocol is supported, the host must be an IP address, and the URL must not
// have a path other than "/". Returns `error::NoError` once the acceptor is listening and the
// first stream is waiting for a connection; otherwise returns the error describing which step
// failed. If a step fails after the acceptor has been opened, the acceptor is closed again, so
// that a later call can start from scratch.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
        auto err = BreakDownUrl(url, address_);
        if (error::NoError != err) {
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
        }

        if (address_.protocol != "http") {
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
        }

        if (address_.path.size() > 0 && address_.path != "/") {
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
        }

        boost::system::error_code ec;
        auto address = asio::ip::make_address(address_.host, ec);
        if (ec) {
                return error::Error(
                        ec.default_error_condition(),
                        "Could not construct endpoint from address " + address_.host);
        }

        asio::ip::tcp::endpoint endpoint(address, address_.port);

        ec.clear();
        acceptor_.open(endpoint.protocol(), ec);
        if (ec) {
                // Do not close here: opening also fails when the acceptor is already open and serving.
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
        }

        // From here on we own an open acceptor. Close it again on failure, otherwise it would stay
        // open and every later attempt to serve would fail when opening it.
        auto close_and_fail =
                [this](const boost::system::error_code &cause, const string &message) -> error::Error {
                boost::system::error_code ignored;
                acceptor_.close(ignored);
                return error::Error(cause.default_error_condition(), message);
        };

        // Allow address reuse, otherwise we can't re-bind later.
        ec.clear();
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
        if (ec) {
                return close_and_fail(ec, "Could not set socket options");
        }

        ec.clear();
        acceptor_.bind(endpoint, ec);
        if (ec) {
                return close_and_fail(ec, "Could not bind socket");
        }

        ec.clear();
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
        if (ec) {
                return close_and_fail(ec, "Could not start listening");
        }

        header_handler_ = header_handler;
        body_handler_ = body_handler;

        PrepareNewStream();

        return error::NoError;
}
1819

1820
void Server::Cancel() {
241✔
1821
        if (acceptor_.is_open()) {
241✔
1822
                acceptor_.cancel();
206✔
1823
                acceptor_.close();
206✔
1824
        }
1825
        streams_.clear();
1826
}
241✔
1827

1828
uint16_t Server::GetPort() const {
13✔
1829
        return acceptor_.local_endpoint().port();
13✔
1830
}
1831

1832
// Returns an `http://127.0.0.1:<port>` URL, using the port from `GetPort()`.
string Server::GetUrl() const {
        string url {"http://127.0.0.1:"};
        url += to_string(GetPort());
        return url;
}
1835

1836
// Creates the response object for `req` and registers it (as a weak reference) with the
// request's stream. Fails with `StreamCancelledError` if the request has been cancelled.
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
        const bool cancelled = *req->cancelled_;
        if (cancelled) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
        }

        auto &stream = req->stream_;
        OutgoingResponsePtr response {new OutgoingResponse(stream, req->cancelled_)};
        stream.maybe_response_ = response;
        return response;
}
1844

1845
// Starts sending `resp` on its stream; `reply_finished_handler` is handed on to the stream. Fails
// with `StreamCancelledError` if the response has been cancelled.
error::Error Server::AsyncReply(
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
        const bool cancelled = *resp->cancelled_;
        if (cancelled) {
                return MakeError(StreamCancelledError, "Cannot send response");
        }

        auto &stream = resp->stream_;
        stream.AsyncReply(reply_finished_handler);
        return error::NoError;
}
1854

1855
// Creates an asynchronous reader for the body of `req` and moves the stream's status to
// `ReaderCreated`.
//
// Fails if the request has been cancelled, if the stream's status is anything other than
// `HeaderHandlerCalled`, or if the request has no body.
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
        if (*req->cancelled_) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
        }

        auto &stream = req->stream_;

        const bool header_stage = (stream.status_ == TransactionStatus::HeaderHandlerCalled);
        if (!header_stage) {
                error::Error in_progress(
                        make_error_condition(errc::operation_in_progress),
                        "MakeBodyAsyncReader called while reading is in progress");
                return expected::unexpected(in_progress);
        }

        if (stream.request_body_length_ == 0) {
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
        }

        stream.status_ = TransactionStatus::ReaderCreated;
        return make_shared<BodyAsyncReader<Stream>>(stream, req->cancelled_);
}
1874

1875
// Starts a protocol switch on the stream belonging to `resp`; the outcome is reported through
// `handler`. Fails with `StreamCancelledError` if the response has been cancelled, in the same
// way as `AsyncReply()` does, instead of operating on a cancelled stream.
error::Error Server::AsyncSwitchProtocol(OutgoingResponsePtr resp, SwitchProtocolHandler handler) {
        if (*resp->cancelled_) {
                return MakeError(StreamCancelledError, "Cannot switch protocol");
        }

        return resp->stream_.AsyncSwitchProtocol(handler);
}
1878

1879
void Server::PrepareNewStream() {
418✔
1880
        StreamPtr new_stream {new Stream(*this)};
418✔
1881
        streams_.insert(new_stream);
1882
        AsyncAccept(new_stream);
836✔
1883
}
418✔
1884

1885
// Waits asynchronously for a connection on the socket of `stream`. When one has been accepted,
// the stream takes over the connection and a new stream is prepared for the next one. Accept
// errors are logged, and no new stream is prepared in that case.
void Server::AsyncAccept(StreamPtr stream) {
        auto on_accept = [this, stream](const error_code &ec) {
                if (!ec) {
                        stream->AcceptHandler(ec);

                        this->PrepareNewStream();
                        return;
                }

                log::Error("Could not accept connection: " + ec.message());
        };

        acceptor_.async_accept(stream->socket_, std::move(on_accept));
}
418✔
1897

1898
// Unregisters `stream` from `streams_` and then calls `DoCancel()` on it. The by-value `stream`
// parameter keeps the stream alive for the duration of this call.
void Server::RemoveStream(StreamPtr stream) {
        streams_.erase(stream);

        stream->DoCancel();
}
176✔
1903

1904
} // namespace http
1905
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc