• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1057366047

01 Nov 2023 08:50AM UTC coverage: 80.121% (-0.07%) from 80.195%
1057366047

push

gitlab-ci

kacf
fix: Don't cache server URL and token locally in process.

The problem if we do is that is that we may then connect to a
forwarder address which doesn't exist anymore. This produces
"connection refused", not "Not authorized", and therefore it does not
trigger an authentication request by itself. So always ask.

This can be easily reproduced by bringing up both mender-auth and
mender-update, and then restarting mender-auth after the first auth
cycle.

This makes the whole `AuthenticatorExternalTokenUpdateTest` test
irrelevant, since we no longer need to test the caching capability of
the client side authenticator. So just remove it.

Changelog: None
Ticket: None

Signed-off-by: Kristian Amlie <kristian.amlie@northern.tech>

6 of 6 new or added lines in 1 file covered. (100.0%)

6884 of 8592 relevant lines covered (80.12%)

9363.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.35
/src/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio.hpp>
20
#include <boost/asio/ip/tcp.hpp>
21
#include <boost/asio/ssl/host_name_verification.hpp>
22
#include <boost/asio/ssl/verify_mode.hpp>
23

24
#include <common/common.hpp>
25

26
namespace mender {
27
namespace http {
28

29
namespace common = mender::common;
30

31
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
32
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
33
const unsigned int BeastHttpVersion = 11;
34

35
namespace asio = boost::asio;
36
namespace http = boost::beast::http;
37

38
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
39

40
static http::verb MethodToBeastVerb(Method method) {
234✔
41
        switch (method) {
234✔
42
        case Method::GET:
43
                return http::verb::get;
44
        case Method::HEAD:
45
                return http::verb::head;
46
        case Method::POST:
47
                return http::verb::post;
48
        case Method::PUT:
49
                return http::verb::put;
50
        case Method::PATCH:
51
                return http::verb::patch;
52
        case Method::CONNECT:
53
                return http::verb::connect;
54
        case Method::Invalid:
55
                // Fallthrough to end (no-op).
56
                break;
57
        }
58
        // Don't use "default" case. This should generate a warning if we ever add any methods. But
59
        // still assert here for safety.
60
        assert(false);
61
        return http::verb::get;
62
}
63

64
static expected::expected<Method, error::Error> BeastVerbToMethod(
217✔
65
        http::verb verb, const string &verb_string) {
66
        switch (verb) {
217✔
67
        case http::verb::get:
181✔
68
                return Method::GET;
69
        case http::verb::head:
×
70
                return Method::HEAD;
71
        case http::verb::post:
13✔
72
                return Method::POST;
73
        case http::verb::put:
23✔
74
                return Method::PUT;
75
        case http::verb::patch:
×
76
                return Method::PATCH;
77
        case http::verb::connect:
×
78
                return Method::CONNECT;
79
        default:
×
80
                return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
×
81
        }
82
}
83

84
template <typename StreamType>
85
class BodyAsyncReader : virtual public io::AsyncReader {
86
public:
87
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
144✔
88
                stream_ {stream},
89
                cancelled_ {cancelled} {
288✔
90
        }
144✔
91
        ~BodyAsyncReader() {
38✔
92
                Cancel();
38✔
93
        }
76✔
94

95
        error::Error AsyncRead(
2,034✔
96
                vector<uint8_t>::iterator start,
97
                vector<uint8_t>::iterator end,
98
                io::AsyncIoHandler handler) override {
99
                if (*cancelled_) {
2,034✔
100
                        return error::MakeError(
×
101
                                error::ProgrammingError,
102
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
103
                }
104
                stream_.AsyncReadNextBodyPart(start, end, handler);
2,034✔
105
                return error::NoError;
2,034✔
106
        }
107

108
        void Cancel() override {
40✔
109
                if (!*cancelled_) {
40✔
110
                        stream_.Cancel();
4✔
111
                }
112
        }
40✔
113

114
private:
115
        StreamType &stream_;
116
        shared_ptr<bool> cancelled_;
117

118
        friend class Client;
119
        friend class Server;
120
};
121

122
template <typename StreamType>
123
class RawSocket : virtual public io::AsyncReadWriter {
124
public:
125
        RawSocket(shared_ptr<StreamType> stream, shared_ptr<beast::flat_buffer> buffered) :
×
126
                destroying_ {make_shared<bool>(false)},
×
127
                stream_ {stream},
128
                buffered_ {buffered} {
×
129
                // If there are no buffered bytes, then we don't need it.
130
                if (buffered_ && buffered_->size() == 0) {
×
131
                        buffered_.reset();
×
132
                }
133
        }
×
134

135
        ~RawSocket() {
15✔
136
                *destroying_ = true;
15✔
137
                Cancel();
15✔
138
        }
30✔
139

140
        error::Error AsyncRead(
320✔
141
                vector<uint8_t>::iterator start,
142
                vector<uint8_t>::iterator end,
143
                io::AsyncIoHandler handler) override {
144
                // If we have prebuffered bytes, which can happen if the HTTP parser read the
145
                // header and parts of the body in one block, return those first.
146
                if (buffered_) {
320✔
147
                        return DrainPrebufferedData(start, end, handler);
8✔
148
                }
149

150
                read_buffer_ = asio::buffer(&*start, end - start);
316✔
151
                auto &destroying = destroying_;
152
                stream_->async_read_some(
632✔
153
                        read_buffer_,
316✔
154
                        [destroying, handler](const boost::system::error_code &ec, size_t num_read) {
313✔
155
                                if (*destroying) {
313✔
156
                                        return;
157
                                }
158

159
                                if (ec == asio::error::operation_aborted) {
313✔
160
                                        handler(expected::unexpected(error::Error(
12✔
161
                                                make_error_condition(errc::operation_canceled),
6✔
162
                                                "Could not read from socket")));
163
                                } else if (ec) {
310✔
164
                                        handler(expected::unexpected(
12✔
165
                                                error::Error(ec.default_error_condition(), "Could not read from socket")));
12✔
166
                                } else {
167
                                        handler(num_read);
608✔
168
                                }
169
                        });
170
                return error::NoError;
316✔
171
        }
172

173
        error::Error AsyncWrite(
309✔
174
                vector<uint8_t>::const_iterator start,
175
                vector<uint8_t>::const_iterator end,
176
                io::AsyncIoHandler handler) override {
177
                write_buffer_ = asio::buffer(&*start, end - start);
309✔
178
                auto &destroying = destroying_;
179
                stream_->async_write_some(
618✔
180
                        write_buffer_,
309✔
181
                        [destroying, handler](const boost::system::error_code &ec, size_t num_written) {
306✔
182
                                if (*destroying) {
306✔
183
                                        return;
184
                                }
185

186
                                if (ec == asio::error::operation_aborted) {
306✔
187
                                        handler(expected::unexpected(error::Error(
×
188
                                                make_error_condition(errc::operation_canceled),
×
189
                                                "Could not write to socket")));
190
                                } else if (ec) {
306✔
191
                                        handler(expected::unexpected(
×
192
                                                error::Error(ec.default_error_condition(), "Could not write to socket")));
×
193
                                } else {
194
                                        handler(num_written);
612✔
195
                                }
196
                        });
197
                return error::NoError;
309✔
198
        }
199

200
        void Cancel() override {
28✔
201
                if (stream_->lowest_layer().is_open()) {
28✔
202
                        stream_->lowest_layer().cancel();
15✔
203
                        stream_->lowest_layer().close();
15✔
204
                }
205
        }
28✔
206

207
private:
208
        error::Error DrainPrebufferedData(
4✔
209
                vector<uint8_t>::iterator start,
210
                vector<uint8_t>::iterator end,
211
                io::AsyncIoHandler handler) {
212
                size_t to_copy = min(static_cast<size_t>(end - start), buffered_->size());
4✔
213

214
                // These two lines are equivalent to:
215
                //   copy_n(static_cast<const uint8_t *>(buffered_->cdata().data()), to_copy, start);
216
                // but compatible with Boost 1.67.
217
                const beast::flat_buffer &cbuffered = *buffered_;
218
                copy_n(static_cast<const uint8_t *>(cbuffered.data().data()), to_copy, start);
4✔
219
                buffered_->consume(to_copy);
4✔
220
                if (buffered_->size() == 0) {
4✔
221
                        // We don't need it anymore.
222
                        buffered_.reset();
4✔
223
                }
224
                handler(to_copy);
4✔
225
                return error::NoError;
4✔
226
        }
227

228
        shared_ptr<bool> destroying_;
229
        shared_ptr<StreamType> stream_;
230
        shared_ptr<beast::flat_buffer> buffered_;
231
        asio::mutable_buffer read_buffer_;
232
        asio::const_buffer write_buffer_;
233
};
234

235
Client::Client(
344✔
236
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
237
        event_loop_ {event_loop},
238
        logger_name_ {logger_name},
239
        client_config_ {client},
240
        http_proxy_ {client.http_proxy},
344✔
241
        https_proxy_ {client.https_proxy},
344✔
242
        no_proxy_ {client.no_proxy},
344✔
243
        cancelled_ {make_shared<bool>(true)},
344✔
244
        disable_keep_alive_ {client.disable_keep_alive},
245
        resolver_(GetAsioIoContext(event_loop)),
246
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,376✔
247
}
344✔
248

249
Client::~Client() {
2,064✔
250
        if (!*cancelled_) {
344✔
251
                logger_.Warning("Client destroyed while request is still active!");
32✔
252
        }
253
        DoCancel();
344✔
254
}
344✔
255

256
error::Error Client::Initialize() {
272✔
257
        if (initialized_) {
272✔
258
                return error::NoError;
66✔
259
        }
260

261
        for (auto i = 0; i < MENDER_BOOST_BEAST_SSL_CTX_COUNT; i++) {
608✔
262
                ssl_ctx_[i].set_verify_mode(
812✔
263
                        client_config_.skip_verify ? ssl::verify_none : ssl::verify_peer);
264

265
                beast::error_code ec {};
407✔
266
                if (client_config_.client_cert_path != "" and client_config_.client_cert_key_path != "") {
407✔
267
                        ssl_ctx_[i].set_options(boost::asio::ssl::context::default_workarounds);
4✔
268
                        ssl_ctx_[i].use_certificate_file(
269
                                client_config_.client_cert_path, boost::asio::ssl::context_base::pem, ec);
4✔
270
                        if (ec) {
4✔
271
                                return error::Error(
272
                                        ec.default_error_condition(), "Could not load client certificate");
2✔
273
                        }
274
                        ssl_ctx_[i].use_private_key_file(
275
                                client_config_.client_cert_key_path, boost::asio::ssl::context_base::pem, ec);
3✔
276
                        if (ec) {
3✔
277
                                return error::Error(
278
                                        ec.default_error_condition(), "Could not load client certificate private key");
2✔
279
                        }
280
                } else if (
281
                        client_config_.client_cert_path != "" or client_config_.client_cert_key_path != "") {
403✔
282
                        return error::Error(
283
                                make_error_condition(errc::invalid_argument),
4✔
284
                                "Cannot set only one of client certificate, and client certificate private key");
4✔
285
                }
286

287
                ssl_ctx_[i].set_default_verify_paths(ec); // Load the default CAs
403✔
288
                if (ec) {
403✔
289
                        auto err = error::Error(
290
                                ec.default_error_condition(), "Failed to load the SSL default directory");
×
291
                        if (client_config_.server_cert_path == "") {
×
292
                                // We aren't going to have any valid certificates then.
293
                                return err;
×
294
                        } else {
295
                                // We have a dedicated certificate, so this is not fatal.
296
                                log::Info(err.String());
×
297
                        }
298
                }
299
                if (client_config_.server_cert_path != "") {
403✔
300
                        ssl_ctx_[i].load_verify_file(client_config_.server_cert_path, ec);
47✔
301
                        if (ec) {
47✔
302
                                return error::Error(
303
                                        ec.default_error_condition(), "Failed to load the server certificate!");
2✔
304
                        }
305
                }
306
        }
307

308
        initialized_ = true;
201✔
309

310
        return error::NoError;
201✔
311
}
312

313
error::Error Client::AsyncCall(
272✔
314
        OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
315
        auto err = Initialize();
272✔
316
        if (err != error::NoError) {
272✔
317
                return err;
5✔
318
        }
319

320
        if (!*cancelled_ && status_ != TransactionStatus::Done) {
267✔
321
                return error::Error(
322
                        make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
×
323
        }
324

325
        if (req->address_.protocol == "" || req->address_.host == "" || req->address_.port < 0) {
267✔
326
                return error::MakeError(error::ProgrammingError, "Request is not ready");
4✔
327
        }
328

329
        if (!header_handler || !body_handler) {
265✔
330
                return error::MakeError(
331
                        error::ProgrammingError, "header_handler and body_handler can not be nullptr");
2✔
332
        }
333

334
        if (req->address_.protocol != "http" && req->address_.protocol != "https") {
264✔
335
                return error::Error(
336
                        make_error_condition(errc::protocol_not_supported), req->address_.protocol);
2✔
337
        }
338

339
        logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));
263✔
340

341
        request_ = req;
342

343
        err = HandleProxySetup();
263✔
344
        if (err != error::NoError) {
263✔
345
                return err;
4✔
346
        }
347

348
        // NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
349
        // request to route to our k8s cluster. Set this in all cases.
350
        req->SetHeader("HOST", req->address_.host);
518✔
351

352
        header_handler_ = header_handler;
259✔
353
        body_handler_ = body_handler;
259✔
354
        status_ = TransactionStatus::None;
259✔
355

356
        cancelled_ = make_shared<bool>(false);
259✔
357

358
        auto &cancelled = cancelled_;
359

360
        resolver_.async_resolve(
518✔
361
                request_->address_.host,
362
                to_string(request_->address_.port),
518✔
363
                [this, cancelled](
517✔
364
                        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
257✔
365
                        if (!*cancelled) {
258✔
366
                                ResolveHandler(ec, results);
257✔
367
                        }
368
                });
258✔
369

370
        return error::NoError;
259✔
371
}
372

373
error::Error Client::HandleProxySetup() {
263✔
374
        secondary_req_.reset();
263✔
375

376
        if (request_->address_.protocol == "http") {
263✔
377
                socket_mode_ = SocketMode::Plain;
240✔
378

379
                if (http_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
240✔
380
                        // Make a modified proxy request.
381
                        BrokenDownUrl proxy_address;
18✔
382
                        auto err = BreakDownUrl(http_proxy_, proxy_address);
10✔
383
                        if (err != error::NoError) {
10✔
384
                                return err.WithContext("HTTP proxy URL is invalid");
2✔
385
                        }
386
                        if (proxy_address.path != "" && proxy_address.path != "/") {
9✔
387
                                return MakeError(
388
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
2✔
389
                        }
390

391
                        request_->address_.path = request_->address_.protocol + "://" + request_->address_.host
16✔
392
                                                                          + ":" + to_string(request_->address_.port)
24✔
393
                                                                          + request_->address_.path;
24✔
394
                        request_->address_.host = proxy_address.host;
8✔
395
                        request_->address_.port = proxy_address.port;
8✔
396
                        request_->address_.protocol = proxy_address.protocol;
8✔
397

398
                        if (proxy_address.protocol == "https") {
8✔
399
                                socket_mode_ = SocketMode::Tls;
5✔
400
                        } else if (proxy_address.protocol == "http") {
3✔
401
                                socket_mode_ = SocketMode::Plain;
3✔
402
                        } else {
403
                                // Should never get here.
404
                                assert(false);
405
                        }
406
                }
407
        } else if (request_->address_.protocol == "https") {
23✔
408
                socket_mode_ = SocketMode::Tls;
23✔
409

410
                if (https_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
23✔
411
                        // Save the original request for later, so that we can make a new request
412
                        // over the channel established by CONNECT.
413
                        secondary_req_ = std::move(request_);
414

415
                        request_ = make_shared<OutgoingRequest>();
26✔
416
                        request_->SetMethod(Method::CONNECT);
13✔
417
                        BrokenDownUrl proxy_address;
24✔
418
                        auto err = BreakDownUrl(https_proxy_, proxy_address);
13✔
419
                        if (err != error::NoError) {
13✔
420
                                return err.WithContext("HTTPS proxy URL is invalid");
2✔
421
                        }
422
                        if (proxy_address.path != "" && proxy_address.path != "/") {
12✔
423
                                return MakeError(
424
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
2✔
425
                        }
426

427
                        request_->address_.path =
428
                                secondary_req_->address_.host + ":" + to_string(secondary_req_->address_.port);
22✔
429
                        request_->address_.host = proxy_address.host;
11✔
430
                        request_->address_.port = proxy_address.port;
11✔
431
                        request_->address_.protocol = proxy_address.protocol;
11✔
432

433
                        if (proxy_address.protocol == "https") {
11✔
434
                                socket_mode_ = SocketMode::Tls;
6✔
435
                        } else if (proxy_address.protocol == "http") {
5✔
436
                                socket_mode_ = SocketMode::Plain;
5✔
437
                        } else {
438
                                // Should never get here.
439
                                assert(false);
440
                        }
441
                }
442
        } else {
443
                // Should never get here
444
                assert(false);
445
        }
446

447
        return error::NoError;
259✔
448
}
449

450
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
163✔
451
        if (status_ != TransactionStatus::HeaderHandlerCalled) {
163✔
452
                return expected::unexpected(error::Error(
2✔
453
                        make_error_condition(errc::operation_in_progress),
4✔
454
                        "MakeBodyAsyncReader called while reading is in progress"));
6✔
455
        }
456

457
        if (response_body_length_ == 0) {
161✔
458
                return expected::unexpected(
17✔
459
                        MakeError(BodyMissingError, "Response does not contain a body"));
51✔
460
        }
461

462
        status_ = TransactionStatus::ReaderCreated;
144✔
463
        return make_shared<BodyAsyncReader<Client>>(resp->client_.GetHttpClient(), resp->cancelled_);
288✔
464
}
465

466
io::ExpectedAsyncReadWriterPtr Client::SwitchProtocol(IncomingResponsePtr req) {
7✔
467
        if (*cancelled_) {
7✔
468
                return expected::unexpected(error::Error(
×
469
                        make_error_condition(errc::not_connected),
×
470
                        "Cannot switch protocols if endpoint is not connected"));
×
471
        }
472

473
        // Rest of the connection is done directly on the socket, we are done here.
474
        status_ = TransactionStatus::Done;
7✔
475
        *cancelled_ = true;
7✔
476
        cancelled_ = make_shared<bool>(false);
14✔
477

478
        auto stream = stream_;
479
        // This no longer belongs to us.
480
        stream_.reset();
7✔
481

482
        switch (socket_mode_) {
7✔
483
        case SocketMode::TlsTls:
×
484
                return make_shared<RawSocket<ssl::stream<ssl::stream<tcp::socket>>>>(
×
485
                        stream, response_data_.response_buffer_);
×
486
        case SocketMode::Tls:
×
487
                return make_shared<RawSocket<ssl::stream<tcp::socket>>>(
×
488
                        make_shared<ssl::stream<tcp::socket>>(std::move(stream->next_layer())),
×
489
                        response_data_.response_buffer_);
×
490
        case SocketMode::Plain:
7✔
491
                return make_shared<RawSocket<tcp::socket>>(
7✔
492
                        make_shared<tcp::socket>(std::move(stream->next_layer().next_layer())),
14✔
493
                        response_data_.response_buffer_);
7✔
494
        }
495

496
        AssertOrReturnUnexpected(false);
×
497
}
498

499
void Client::CallHandler(ResponseHandler handler) {
289✔
500
        // This function exists to make sure we have a copy of the handler we're calling (in the
501
        // argument list). This is important in case the handler owns the client instance through a
502
        // capture, and it replaces the handler with a different one (using `AsyncCall`). If it
503
        // does, then it destroys the final copy of the handler, and therefore also the client,
504
        // which is why we need to make a copy here, before calling it.
505
        handler(response_);
289✔
506
}
289✔
507

508
void Client::CallErrorHandler(
39✔
509
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
510
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
117✔
511
}
39✔
512

513
void Client::CallErrorHandler(
168✔
514
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
515
        *cancelled_ = true;
168✔
516
        cancelled_ = make_shared<bool>(true);
168✔
517
        stream_.reset();
168✔
518
        status_ = TransactionStatus::Done;
168✔
519
        handler(expected::unexpected(
336✔
520
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
672✔
521
}
168✔
522

523
void Client::ResolveHandler(
257✔
524
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
525
        if (ec) {
257✔
526
                CallErrorHandler(ec, request_, header_handler_);
×
527
                return;
×
528
        }
529

530
        if (logger_.Level() >= log::LogLevel::Debug) {
257✔
531
                string ips = "[";
237✔
532
                string sep;
533
                for (auto r : results) {
1,008✔
534
                        ips += sep;
267✔
535
                        ips += r.endpoint().address().to_string();
267✔
536
                        sep = ", ";
267✔
537
                }
538
                ips += "]";
237✔
539
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
474✔
540
        }
541

542
        resolver_results_ = results;
543

544
        stream_ = make_shared<ssl::stream<ssl::stream<tcp::socket>>>(
257✔
545
                ssl::stream<tcp::socket>(GetAsioIoContext(event_loop_), ssl_ctx_[0]), ssl_ctx_[1]);
514✔
546

547
        if (!response_data_.response_buffer_) {
257✔
548
                // We can reuse this if preexisting.
549
                response_data_.response_buffer_ = make_shared<beast::flat_buffer>();
382✔
550

551
                // This is equivalent to:
552
                //   response_data_.response_buffer_.reserve(body_buffer_.size());
553
                // but compatible with Boost 1.67.
554
                response_data_.response_buffer_->prepare(
555
                        body_buffer_.size() - response_data_.response_buffer_->size());
191✔
556
        }
557

558
        auto &cancelled = cancelled_;
559

560
        asio::async_connect(
257✔
561
                stream_->lowest_layer(),
562
                resolver_results_,
257✔
563
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
514✔
564
                        if (!*cancelled) {
257✔
565
                                switch (socket_mode_) {
257✔
566
                                case SocketMode::TlsTls:
×
567
                                        // Should never happen because we always need to handshake
568
                                        // the innermost Tls first, then the outermost, but the
569
                                        // latter doesn't happen here.
570
                                        assert(false);
571
                                        CallErrorHandler(
×
572
                                                error::MakeError(
×
573
                                                        error::ProgrammingError, "TlsTls mode is invalid in ResolveHandler"),
×
574
                                                request_,
×
575
                                                header_handler_);
×
576
                                case SocketMode::Tls:
20✔
577
                                        return HandshakeHandler(stream_->next_layer(), ec, endpoint);
20✔
578
                                case SocketMode::Plain:
237✔
579
                                        return ConnectHandler(ec, endpoint);
237✔
580
                                }
581
                        }
582
                });
583
}
584

585
template <typename StreamType>
586
void Client::HandshakeHandler(
23✔
587
        StreamType &stream, const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
588
        if (ec) {
23✔
589
                CallErrorHandler(ec, request_, header_handler_);
2✔
590
                return;
2✔
591
        }
592

593
        if (not disable_keep_alive_) {
21✔
594
                boost::asio::socket_base::keep_alive option(true);
595
                stream_->lowest_layer().set_option(option);
21✔
596
        }
597

598
        // Set SNI Hostname (many hosts need this to handshake successfully)
599
        if (!SSL_set_tlsext_host_name(stream.native_handle(), request_->address_.host.c_str())) {
21✔
600
                beast::error_code ec2 {
×
601
                        static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
×
602
                logger_.Error("Failed to set SNI host name: " + ec2.message());
×
603
        }
604

605
        // Enable host name verification (not done automatically and we don't have
606
        // enough access to the TLS internals to use X509_VERIFY_PARAM_set1_host(),
607
        // hence the callback that boost provides).
608
        boost::system::error_code b_ec;
21✔
609
        stream.set_verify_callback(ssl::host_name_verification(request_->address_.host), b_ec);
42✔
610
        if (b_ec) {
21✔
611
                logger_.Error("Failed to enable host name verification: " + b_ec.message());
×
612
                CallErrorHandler(b_ec, request_, header_handler_);
×
613
                return;
×
614
        }
615

616
        auto &cancelled = cancelled_;
617

618
        stream.async_handshake(
42✔
619
                ssl::stream_base::client, [this, cancelled, endpoint](const error_code &ec) {
21✔
620
                        if (*cancelled) {
23✔
621
                                return;
622
                        }
623
                        if (ec) {
23✔
624
                                logger_.Error("https: Failed to perform the SSL handshake: " + ec.message());
20✔
625
                                CallErrorHandler(ec, request_, header_handler_);
10✔
626
                                return;
10✔
627
                        }
628
                        logger_.Debug("https: Successful SSL handshake");
26✔
629
                        ConnectHandler(ec, endpoint);
13✔
630
                });
631
}
632

633

634
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
250✔
635
        if (ec) {
250✔
636
                CallErrorHandler(ec, request_, header_handler_);
16✔
637
                return;
16✔
638
        }
639

640
        if (not disable_keep_alive_) {
234✔
641
                boost::asio::socket_base::keep_alive option(true);
642
                stream_->lowest_layer().set_option(option);
234✔
643
        }
644

645
        logger_.Debug("Connected to " + endpoint.address().to_string());
468✔
646

647
        request_data_.http_request_ = make_shared<http::request<http::buffer_body>>(
234✔
648
                MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);
468✔
649

650
        for (const auto &header : request_->headers_) {
619✔
651
                request_data_.http_request_->set(header.first, header.second);
385✔
652
        }
653

654
        request_data_.http_request_serializer_ =
655
                make_shared<http::request_serializer<http::buffer_body>>(*request_data_.http_request_);
234✔
656

657
        response_data_.http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();
468✔
658

659
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
660
        // if they do, they should be handled higher up in the application logic.
661
        //
662
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
663
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
664
        // `has_value()` in their code, causing their subsequent comparison operation to
665
        // misbehave. So pass highest possible value instead.
666
        response_data_.http_response_parser_->body_limit(numeric_limits<uint64_t>::max());
667

668
        auto &cancelled = cancelled_;
669
        auto &request_data = request_data_;
234✔
670

671
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
234✔
672
                if (!*cancelled) {
234✔
673
                        WriteHeaderHandler(ec, num_written);
234✔
674
                }
675
        };
468✔
676

677
        switch (socket_mode_) {
234✔
678
        case SocketMode::TlsTls:
1✔
679
                http::async_write_header(*stream_, *request_data_.http_request_serializer_, handler);
1✔
680
                break;
681
        case SocketMode::Tls:
12✔
682
                http::async_write_header(
12✔
683
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
684
                break;
685
        case SocketMode::Plain:
221✔
686
                http::async_write_header(
221✔
687
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
688
                break;
689
        }
690
}
691

692
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
234✔
693
        if (num_written > 0) {
234✔
694
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
468✔
695
        }
696

697
        if (ec) {
234✔
698
                CallErrorHandler(ec, request_, header_handler_);
×
699
                return;
195✔
700
        }
701

702
        auto header = request_->GetHeader("Content-Length");
468✔
703
        if (!header || header.value() == "0") {
234✔
704
                ReadHeader();
194✔
705
                return;
706
        }
707

708
        auto length = common::StringToLongLong(header.value());
40✔
709
        if (!length || length.value() < 0) {
40✔
710
                auto err = error::Error(
711
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
712
                CallErrorHandler(err, request_, header_handler_);
×
713
                return;
714
        }
715
        request_body_length_ = length.value();
40✔
716

717
        if (!request_->body_gen_ && !request_->async_body_gen_) {
40✔
718
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
719
                CallErrorHandler(err, request_, header_handler_);
2✔
720
                return;
721
        }
722

723
        assert(!(request_->body_gen_ && request_->async_body_gen_));
724

725
        if (request_->body_gen_) {
39✔
726
                auto body_reader = request_->body_gen_();
33✔
727
                if (!body_reader) {
33✔
728
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
729
                        return;
730
                }
731
                request_->body_reader_ = body_reader.value();
33✔
732
        } else {
733
                auto body_reader = request_->async_body_gen_();
6✔
734
                if (!body_reader) {
6✔
735
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
736
                        return;
737
                }
738
                request_->async_body_reader_ = body_reader.value();
6✔
739
        }
740

741
        PrepareAndWriteNewBodyBuffer();
39✔
742
}
743

744
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,124✔
745
        if (num_written > 0) {
2,124✔
746
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
2,088✔
747
        }
748

749
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,124✔
750
                // Write next block of the body.
751
                PrepareAndWriteNewBodyBuffer();
1,044✔
752
        } else if (ec) {
1,080✔
753
                CallErrorHandler(ec, request_, header_handler_);
8✔
754
        } else if (num_written > 0) {
1,076✔
755
                // We are still writing the body.
756
                WriteBody();
1,044✔
757
        } else {
758
                // We are ready to receive the response.
759
                ReadHeader();
32✔
760
        }
761
}
2,124✔
762

763
void Client::PrepareAndWriteNewBodyBuffer() {
1,083✔
764
        // request_->body_reader_ XOR request_->async_body_reader_
765
        assert(
766
                (request_->body_reader_ || request_->async_body_reader_)
767
                && !(request_->body_reader_ && request_->async_body_reader_));
768

769
        auto cancelled = cancelled_;
770
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
3,438✔
771
                if (!*cancelled) {
1,083✔
772
                        if (!read) {
1,082✔
773
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
774
                                return;
2✔
775
                        }
776
                        WriteNewBodyBuffer(read.value());
1,080✔
777
                }
778
        };
1,083✔
779

780

781
        if (request_->body_reader_) {
1,083✔
782
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
1,316✔
783
        } else {
784
                auto err = request_->async_body_reader_->AsyncRead(
785
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
850✔
786
                if (err != error::NoError) {
425✔
787
                        CallErrorHandler(err, request_, header_handler_);
×
788
                }
789
        }
790
}
1,083✔
791

792
void Client::WriteNewBodyBuffer(size_t size) {
1,080✔
793
        request_data_.http_request_->body().data = body_buffer_.data();
1,080✔
794
        request_data_.http_request_->body().size = size;
1,080✔
795

796
        if (size > 0) {
1,080✔
797
                request_data_.http_request_->body().more = true;
1,048✔
798
        } else {
799
                // Release ownership of Body reader.
800
                request_->body_reader_.reset();
32✔
801
                request_->async_body_reader_.reset();
32✔
802
                request_data_.http_request_->body().more = false;
32✔
803
        }
804

805
        WriteBody();
1,080✔
806
}
1,080✔
807

808
void Client::WriteBody() {
2,124✔
809
        auto &cancelled = cancelled_;
810
        auto &request_data = request_data_;
2,124✔
811

812
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
2,124✔
813
                if (!*cancelled) {
2,124✔
814
                        WriteBodyHandler(ec, num_written);
2,124✔
815
                }
816
        };
4,248✔
817

818
        switch (socket_mode_) {
2,124✔
819
        case SocketMode::TlsTls:
×
820
                http::async_write_some(*stream_, *request_data_.http_request_serializer_, handler);
821
                break;
822
        case SocketMode::Tls:
×
823
                http::async_write_some(
824
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
825
                break;
826
        case SocketMode::Plain:
2,124✔
827
                http::async_write_some(
828
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
829
                break;
830
        }
831
}
2,124✔
832

833
void Client::ReadHeader() {
226✔
834
        auto &cancelled = cancelled_;
835
        auto &response_data = response_data_;
226✔
836

837
        auto handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
223✔
838
                if (!*cancelled) {
223✔
839
                        ReadHeaderHandler(ec, num_read);
223✔
840
                }
841
        };
452✔
842

843
        switch (socket_mode_) {
226✔
844
        case SocketMode::TlsTls:
1✔
845
                http::async_read_some(
1✔
846
                        *stream_,
847
                        *response_data_.response_buffer_,
848
                        *response_data_.http_response_parser_,
849
                        handler);
850
                break;
851
        case SocketMode::Tls:
12✔
852
                http::async_read_some(
12✔
853
                        stream_->next_layer(),
854
                        *response_data_.response_buffer_,
855
                        *response_data_.http_response_parser_,
856
                        handler);
857
                break;
858
        case SocketMode::Plain:
213✔
859
                http::async_read_some(
213✔
860
                        stream_->next_layer().next_layer(),
861
                        *response_data_.response_buffer_,
862
                        *response_data_.http_response_parser_,
863
                        handler);
864
                break;
865
        }
866
}
226✔
867

868
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
223✔
869
        if (num_read > 0) {
223✔
870
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
436✔
871
        }
872

873
        if (ec) {
223✔
874
                CallErrorHandler(ec, request_, header_handler_);
5✔
875
                return;
78✔
876
        }
877

878
        if (!response_data_.http_response_parser_->is_header_done()) {
218✔
879
                ReadHeader();
×
880
                return;
×
881
        }
882

883
        auto content_length = response_data_.http_response_parser_->content_length();
218✔
884
        if (content_length) {
218✔
885
                response_body_length_ = content_length.value();
179✔
886
        } else {
887
                response_body_length_ = 0;
39✔
888
        }
889

890
        if (secondary_req_) {
218✔
891
                HandleSecondaryRequest();
7✔
892
                return;
7✔
893
        }
894

895
        response_.reset(new IncomingResponse(*this, cancelled_));
422✔
896
        response_->status_code_ = response_data_.http_response_parser_->get().result_int();
211✔
897
        response_->status_message_ = string {response_data_.http_response_parser_->get().reason()};
211✔
898

899
        logger_.Debug(
422✔
900
                "Received response: " + to_string(response_->status_code_) + " "
422✔
901
                + response_->status_message_);
633✔
902

903
        string debug_str;
904
        for (auto header = response_data_.http_response_parser_->get().cbegin();
247✔
905
                 header != response_data_.http_response_parser_->get().cend();
458✔
906
                 header++) {
907
                response_->headers_[string {header->name_string()}] = string {header->value()};
741✔
908
                if (logger_.Level() >= log::LogLevel::Debug) {
247✔
909
                        debug_str += string {header->name_string()};
233✔
910
                        debug_str += ": ";
233✔
911
                        debug_str += string {header->value()};
233✔
912
                        debug_str += "\n";
233✔
913
                }
914
        }
915

916
        logger_.Debug("Received headers:\n" + debug_str);
422✔
917
        debug_str.clear();
918

919
        if (response_data_.http_response_parser_->chunked()) {
211✔
920
                auto cancelled = cancelled_;
921
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
922
                CallHandler(header_handler_);
2✔
923
                if (!*cancelled) {
1✔
924
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
925
                        CallErrorHandler(err, request_, body_handler_);
2✔
926
                }
927
                return;
928
        }
929

930
        response_body_read_ = 0;
210✔
931

932
        if (response_body_read_ >= response_body_length_) {
210✔
933
                auto cancelled = cancelled_;
934
                status_ = TransactionStatus::HeaderHandlerCalled;
46✔
935
                CallHandler(header_handler_);
92✔
936
                if (!*cancelled) {
46✔
937
                        status_ = TransactionStatus::Done;
40✔
938
                        CallHandler(body_handler_);
80✔
939

940
                        // After body handler has run, set the request to cancelled. The body
941
                        // handler may have made a new request, so this is not necessarily the same
942
                        // request as is currently active (note use of shared_ptr copy, not
943
                        // `cancelled_`).
944
                        *cancelled = true;
40✔
945
                }
946
                return;
947
        }
948

949
        auto cancelled = cancelled_;
950
        status_ = TransactionStatus::HeaderHandlerCalled;
164✔
951
        CallHandler(header_handler_);
328✔
952
        if (*cancelled) {
164✔
953
                return;
954
        }
955

956
        // We know that a body reader is required here, because of the `response_body_read_ >=
957
        // response_body_length_` check above.
958
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
145✔
959
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
4✔
960
        }
961
}
962

963
void Client::HandleSecondaryRequest() {
7✔
964
        logger_.Debug(
14✔
965
                "Received proxy response: "
966
                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
14✔
967
                + string {response_data_.http_response_parser_->get().reason()});
28✔
968

969
        request_ = std::move(secondary_req_);
970

971
        if (response_data_.http_response_parser_->get().result_int() != StatusOK) {
7✔
972
                auto err = MakeError(
973
                        ProxyError,
974
                        "Proxy returned unexpected response: "
975
                                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
4✔
976
                                + string {response_data_.http_response_parser_->get().reason()});
6✔
977
                CallErrorHandler(err, request_, header_handler_);
4✔
978
                return;
979
        }
980

981
        if (response_body_length_ != 0 || response_data_.http_response_parser_->chunked()) {
5✔
982
                auto err = MakeError(ProxyError, "Body not allowed in proxy response");
×
983
                CallErrorHandler(err, request_, header_handler_);
×
984
                return;
985
        }
986

987
        // We are connected. Now repeat the request cycle with the original request. Pretend
988
        // we were just connected.
989

990
        assert(request_->GetProtocol() == "https");
991

992
        // Make sure that no data is "lost" inside the buffering mechanism, since when switching to
993
        // a different layer, this will get out of sync.
994
        assert(response_data_.response_buffer_->size() == 0);
995

996
        switch (socket_mode_) {
5✔
997
        case SocketMode::TlsTls:
×
998
                // Should never get here, because this is the only place where TlsTls mode
999
                // is supposed to be turned on.
1000
                assert(false);
1001
                CallErrorHandler(
×
1002
                        error::MakeError(
×
1003
                                error::ProgrammingError,
1004
                                "Any other mode than Tls is not valid when handling secondary request"),
×
1005
                        request_,
×
1006
                        header_handler_);
×
1007
                break;
×
1008
        case SocketMode::Tls:
2✔
1009
                // Upgrade to TLS inside TLS.
1010
                socket_mode_ = SocketMode::TlsTls;
2✔
1011
                HandshakeHandler(*stream_, error_code {}, stream_->lowest_layer().remote_endpoint());
2✔
1012
                break;
2✔
1013
        case SocketMode::Plain:
3✔
1014
                // Upgrade to TLS.
1015
                socket_mode_ = SocketMode::Tls;
3✔
1016
                HandshakeHandler(
3✔
1017
                        stream_->next_layer(), error_code {}, stream_->lowest_layer().remote_endpoint());
3✔
1018
                break;
3✔
1019
        }
1020
}
1021

1022
void Client::AsyncReadNextBodyPart(
3,803✔
1023
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
1024
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
1025

1026
        if (status_ == TransactionStatus::ReaderCreated) {
3,803✔
1027
                status_ = TransactionStatus::BodyReadingInProgress;
143✔
1028
        }
1029

1030
        if (AtLeast(status_, TransactionStatus::BodyReadingFinished)) {
3,803✔
1031
                auto cancelled = cancelled_;
1032
                handler(0);
76✔
1033
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
38✔
1034
                        status_ = TransactionStatus::Done;
38✔
1035
                        CallHandler(body_handler_);
76✔
1036

1037
                        // After body handler has run, set the request to cancelled. The body
1038
                        // handler may have made a new request, so this is not necessarily the same
1039
                        // request as is currently active (note use of shared_ptr copy, not
1040
                        // `cancelled_`).
1041
                        *cancelled = true;
38✔
1042
                }
1043
                return;
1044
        }
1045

1046
        reader_buf_start_ = start;
3,765✔
1047
        reader_buf_end_ = end;
3,765✔
1048
        reader_handler_ = handler;
3,765✔
1049
        size_t read_size = end - start;
3,765✔
1050
        size_t smallest = min(body_buffer_.size(), read_size);
5,878✔
1051

1052
        response_data_.http_response_parser_->get().body().data = body_buffer_.data();
3,765✔
1053
        response_data_.http_response_parser_->get().body().size = smallest;
3,765✔
1054

1055
        auto &cancelled = cancelled_;
1056
        auto &response_data = response_data_;
3,765✔
1057

1058
        auto async_handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
3,764✔
1059
                if (!*cancelled) {
3,764✔
1060
                        ReadBodyHandler(ec, num_read);
3,763✔
1061
                }
1062
        };
7,530✔
1063

1064
        switch (socket_mode_) {
3,765✔
1065
        case SocketMode::TlsTls:
1✔
1066
                http::async_read_some(
1✔
1067
                        *stream_,
1068
                        *response_data_.response_buffer_,
1069
                        *response_data_.http_response_parser_,
1070
                        async_handler);
1071
                break;
1072
        case SocketMode::Tls:
3✔
1073
                http::async_read_some(
3✔
1074
                        stream_->next_layer(),
1075
                        *response_data_.response_buffer_,
1076
                        *response_data_.http_response_parser_,
1077
                        async_handler);
1078
                break;
1079
        case SocketMode::Plain:
3,761✔
1080
                http::async_read_some(
3,761✔
1081
                        stream_->next_layer().next_layer(),
1082
                        *response_data_.response_buffer_,
1083
                        *response_data_.http_response_parser_,
1084
                        async_handler);
1085
                break;
1086
        }
1087
}
1088

1089
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
3,763✔
1090
        if (num_read > 0) {
3,763✔
1091
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
7,428✔
1092
                response_body_read_ += num_read;
3,714✔
1093
        }
1094

1095
        if (ec == http::make_error_code(http::error::need_buffer)) {
3,763✔
1096
                // This can be ignored. We always reset the buffer between reads anyway.
1097
                ec = error_code();
1,958✔
1098
        }
1099

1100
        assert(reader_handler_);
1101

1102
        if (response_body_read_ >= response_body_length_) {
3,763✔
1103
                status_ = TransactionStatus::BodyReadingFinished;
90✔
1104
        }
1105

1106
        auto cancelled = cancelled_;
1107

1108
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
3,763✔
1109
        size_t smallest = min(num_read, buf_size);
3,763✔
1110
        copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
3,763✔
1111
        if (ec) {
3,763✔
1112
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
98✔
1113
                reader_handler_(expected::unexpected(err));
147✔
1114
        } else {
1115
                reader_handler_(smallest);
7,428✔
1116
        }
1117

1118
        if (!*cancelled && ec) {
3,763✔
1119
                CallErrorHandler(ec, request_, body_handler_);
4✔
1120
                return;
1121
        }
1122
}
1123

1124
void Client::Cancel() {
195✔
1125
        auto cancelled = cancelled_;
1126

1127
        if (!*cancelled) {
195✔
1128
                auto err =
1129
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
246✔
1130
                switch (status_) {
123✔
1131
                case TransactionStatus::None:
1✔
1132
                        CallErrorHandler(err, request_, header_handler_);
1✔
1133
                        break;
1✔
1134
                case TransactionStatus::HeaderHandlerCalled:
120✔
1135
                case TransactionStatus::ReaderCreated:
1136
                case TransactionStatus::BodyReadingInProgress:
1137
                case TransactionStatus::BodyReadingFinished:
1138
                        CallErrorHandler(err, request_, body_handler_);
120✔
1139
                        break;
120✔
1140
                case TransactionStatus::Replying:
1141
                case TransactionStatus::SwitchingProtocol:
1142
                        // Not used by client.
1143
                        assert(false);
1144
                        break;
1145
                case TransactionStatus::BodyHandlerCalled:
1146
                case TransactionStatus::Done:
1147
                        break;
1148
                }
1149
        }
1150

1151
        if (!*cancelled) {
195✔
1152
                DoCancel();
2✔
1153
        }
1154
}
195✔
1155

1156
void Client::DoCancel() {
346✔
1157
        resolver_.cancel();
346✔
1158
        if (stream_) {
346✔
1159
                stream_->lowest_layer().cancel();
72✔
1160
                stream_->lowest_layer().close();
72✔
1161
                stream_.reset();
72✔
1162
        }
1163

1164
        request_.reset();
346✔
1165
        response_.reset();
346✔
1166

1167
        // Reset logger to no connection.
1168
        logger_ = log::Logger(logger_name_);
346✔
1169

1170
        // Set cancel state and then make a new one. Those who are interested should have their own
1171
        // pointer to the old one.
1172
        *cancelled_ = true;
346✔
1173
        cancelled_ = make_shared<bool>(true);
346✔
1174
}
346✔
1175

1176
Stream::Stream(Server &server) :
428✔
1177
        server_ {server},
1178
        logger_ {"http"},
1179
        cancelled_(make_shared<bool>(true)),
428✔
1180
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
428✔
1181
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,284✔
1182
        request_data_.request_buffer_ = make_shared<beast::flat_buffer>();
856✔
1183

1184
        // This is equivalent to:
1185
        //   request_data_.request_buffer_.reserve(body_buffer_.size());
1186
        // but compatible with Boost 1.67.
1187
        request_data_.request_buffer_->prepare(
1188
                body_buffer_.size() - request_data_.request_buffer_->size());
428✔
1189

1190
        request_data_.http_request_parser_ = make_shared<http::request_parser<http::buffer_body>>();
856✔
1191

1192
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
1193
        // they do, they should be handled higher up in the application logic.
1194
        //
1195
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
1196
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
1197
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
1198
        // possible value instead.
1199
        request_data_.http_request_parser_->body_limit(numeric_limits<uint64_t>::max());
1200
}
428✔
1201

1202
Stream::~Stream() {
1,284✔
1203
        DoCancel();
428✔
1204
}
428✔
1205

1206
void Stream::Cancel() {
7✔
1207
        auto cancelled = cancelled_;
1208

1209
        if (!*cancelled) {
7✔
1210
                auto err =
1211
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
1212
                switch (status_) {
7✔
1213
                case TransactionStatus::None:
×
1214
                        CallErrorHandler(err, request_, server_.header_handler_);
×
1215
                        break;
×
1216
                case TransactionStatus::HeaderHandlerCalled:
5✔
1217
                case TransactionStatus::ReaderCreated:
1218
                case TransactionStatus::BodyReadingInProgress:
1219
                case TransactionStatus::BodyReadingFinished:
1220
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
1221
                        break;
5✔
1222
                case TransactionStatus::BodyHandlerCalled:
×
1223
                        // In between body handler and reply finished. No one to handle the status
1224
                        // here.
1225
                        server_.RemoveStream(shared_from_this());
×
1226
                        break;
×
1227
                case TransactionStatus::Replying:
1✔
1228
                        CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1229
                        break;
1✔
1230
                case TransactionStatus::SwitchingProtocol:
1✔
1231
                        CallErrorHandler(err, request_, switch_protocol_handler_);
3✔
1232
                        break;
1✔
1233
                case TransactionStatus::Done:
1234
                        break;
1235
                }
1236
        }
1237

1238
        if (!*cancelled) {
7✔
1239
                DoCancel();
×
1240
        }
1241
}
7✔
1242

1243
void Stream::DoCancel() {
607✔
1244
        if (socket_.is_open()) {
607✔
1245
                socket_.cancel();
209✔
1246
                socket_.close();
209✔
1247
        }
1248

1249
        // Set cancel state and then make a new one. Those who are interested should have their own
1250
        // pointer to the old one.
1251
        *cancelled_ = true;
607✔
1252
        cancelled_ = make_shared<bool>(true);
607✔
1253
}
607✔
1254

1255
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
1256
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1257
}
×
1258

1259
void Stream::CallErrorHandler(
×
1260
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
1261
        *cancelled_ = true;
×
1262
        cancelled_ = make_shared<bool>(true);
×
1263
        status_ = TransactionStatus::Done;
×
1264
        handler(expected::unexpected(err.WithContext(
×
1265
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
1266

1267
        server_.RemoveStream(shared_from_this());
×
1268
}
×
1269

1270
void Stream::CallErrorHandler(
2✔
1271
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1272
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
6✔
1273
}
2✔
1274

1275
void Stream::CallErrorHandler(
9✔
1276
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1277
        *cancelled_ = true;
9✔
1278
        cancelled_ = make_shared<bool>(true);
9✔
1279
        status_ = TransactionStatus::Done;
9✔
1280
        handler(
9✔
1281
                req,
1282
                err.WithContext(
9✔
1283
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
27✔
1284

1285
        server_.RemoveStream(shared_from_this());
9✔
1286
}
9✔
1287

1288
void Stream::CallErrorHandler(
4✔
1289
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
1290
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
1291
}
4✔
1292

1293
void Stream::CallErrorHandler(
7✔
1294
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
1295
        *cancelled_ = true;
7✔
1296
        cancelled_ = make_shared<bool>(true);
7✔
1297
        status_ = TransactionStatus::Done;
7✔
1298
        handler(err.WithContext(
14✔
1299
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
14✔
1300

1301
        server_.RemoveStream(shared_from_this());
7✔
1302
}
7✔
1303

1304
void Stream::CallErrorHandler(
×
1305
        const error_code &ec, const RequestPtr &req, SwitchProtocolHandler handler) {
1306
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1307
}
×
1308

1309
void Stream::CallErrorHandler(
1✔
1310
        const error::Error &err, const RequestPtr &req, SwitchProtocolHandler handler) {
1311
        *cancelled_ = true;
1✔
1312
        cancelled_ = make_shared<bool>(true);
1✔
1313
        status_ = TransactionStatus::Done;
1✔
1314
        handler(expected::unexpected(err.WithContext(
2✔
1315
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
4✔
1316

1317
        server_.RemoveStream(shared_from_this());
1✔
1318
}
1✔
1319

1320
void Stream::AcceptHandler(const error_code &ec) {
217✔
1321
        if (ec) {
217✔
1322
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
1323
                return;
×
1324
        }
1325

1326
        auto ip = socket_.remote_endpoint().address().to_string();
434✔
1327

1328
        // Use IP as context for logging.
1329
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
217✔
1330

1331
        logger_.Debug("Accepted connection.");
434✔
1332

1333
        request_.reset(new IncomingRequest(*this, cancelled_));
434✔
1334

1335
        request_->address_.host = ip;
217✔
1336

1337
        *cancelled_ = false;
217✔
1338

1339
        ReadHeader();
217✔
1340
}
1341

1342
void Stream::ReadHeader() {
217✔
1343
        auto &cancelled = cancelled_;
1344
        auto &request_data = request_data_;
217✔
1345

1346
        http::async_read_some(
434✔
1347
                socket_,
217✔
1348
                *request_data_.request_buffer_,
1349
                *request_data_.http_request_parser_,
1350
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
217✔
1351
                        if (!*cancelled) {
217✔
1352
                                ReadHeaderHandler(ec, num_read);
217✔
1353
                        }
1354
                });
217✔
1355
}
217✔
1356

1357
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
217✔
1358
        if (num_read > 0) {
217✔
1359
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
434✔
1360
        }
1361

1362
        if (ec) {
217✔
1363
                CallErrorHandler(ec, request_, server_.header_handler_);
×
1364
                return;
178✔
1365
        }
1366

1367
        if (!request_data_.http_request_parser_->is_header_done()) {
217✔
1368
                ReadHeader();
×
1369
                return;
×
1370
        }
1371

1372
        auto method_result = BeastVerbToMethod(
1373
                request_data_.http_request_parser_->get().base().method(),
1374
                string {request_data_.http_request_parser_->get().base().method_string()});
434✔
1375
        if (!method_result) {
217✔
1376
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
1377
                return;
×
1378
        }
1379
        request_->method_ = method_result.value();
217✔
1380
        request_->address_.path = string(request_data_.http_request_parser_->get().base().target());
217✔
1381

1382
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
217✔
1383

1384
        string debug_str;
1385
        for (auto header = request_data_.http_request_parser_->get().cbegin();
379✔
1386
                 header != request_data_.http_request_parser_->get().cend();
596✔
1387
                 header++) {
1388
                request_->headers_[string {header->name_string()}] = string {header->value()};
1,137✔
1389
                if (logger_.Level() >= log::LogLevel::Debug) {
379✔
1390
                        debug_str += string {header->name_string()};
318✔
1391
                        debug_str += ": ";
318✔
1392
                        debug_str += string {header->value()};
318✔
1393
                        debug_str += "\n";
318✔
1394
                }
1395
        }
1396

1397
        logger_.Debug("Received headers:\n" + debug_str);
434✔
1398
        debug_str.clear();
1399

1400
        if (request_data_.http_request_parser_->chunked()) {
217✔
1401
                auto cancelled = cancelled_;
1402
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
1403
                server_.header_handler_(request_);
2✔
1404
                if (!*cancelled) {
1✔
1405
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
1406
                        CallErrorHandler(err, request_, server_.body_handler_);
2✔
1407
                }
1408
                return;
1409
        }
1410

1411
        auto content_length = request_data_.http_request_parser_->content_length();
216✔
1412
        if (content_length) {
216✔
1413
                request_body_length_ = content_length.value();
40✔
1414
        } else {
1415
                request_body_length_ = 0;
176✔
1416
        }
1417
        request_body_read_ = 0;
216✔
1418

1419
        if (request_body_read_ >= request_body_length_) {
216✔
1420
                auto cancelled = cancelled_;
1421
                status_ = TransactionStatus::HeaderHandlerCalled;
176✔
1422
                server_.header_handler_(request_);
352✔
1423
                if (!*cancelled) {
176✔
1424
                        status_ = TransactionStatus::BodyHandlerCalled;
176✔
1425
                        CallBodyHandler();
176✔
1426
                }
1427
                return;
1428
        }
1429

1430
        auto cancelled = cancelled_;
1431
        status_ = TransactionStatus::HeaderHandlerCalled;
40✔
1432
        server_.header_handler_(request_);
80✔
1433
        if (*cancelled) {
40✔
1434
                return;
1435
        }
1436

1437
        // We know that a body reader is required here, because of the `request_body_read_ >=
1438
        // request_body_length_` check above.
1439
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
39✔
1440
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
2✔
1441
        }
1442
}
1443

1444
void Stream::AsyncReadNextBodyPart(
2,034✔
1445
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
1446
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
1447

1448
        if (status_ == TransactionStatus::ReaderCreated) {
2,034✔
1449
                status_ = TransactionStatus::BodyReadingInProgress;
38✔
1450
        }
1451

1452
        if (status_ != TransactionStatus::BodyReadingInProgress) {
2,034✔
1453
                auto cancelled = cancelled_;
1454
                handler(0);
64✔
1455
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
32✔
1456
                        status_ = TransactionStatus::BodyHandlerCalled;
32✔
1457
                        CallBodyHandler();
32✔
1458
                }
1459
                return;
1460
        }
1461

1462
        reader_buf_start_ = start;
2,002✔
1463
        reader_buf_end_ = end;
2,002✔
1464
        reader_handler_ = handler;
2,002✔
1465
        size_t read_size = end - start;
2,002✔
1466
        size_t smallest = min(body_buffer_.size(), read_size);
3,058✔
1467

1468
        request_data_.http_request_parser_->get().body().data = body_buffer_.data();
2,002✔
1469
        request_data_.http_request_parser_->get().body().size = smallest;
2,002✔
1470

1471
        auto &cancelled = cancelled_;
1472
        auto &request_data = request_data_;
2,002✔
1473

1474
        http::async_read_some(
4,004✔
1475
                socket_,
2,002✔
1476
                *request_data_.request_buffer_,
1477
                *request_data_.http_request_parser_,
1478
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
2,002✔
1479
                        if (!*cancelled) {
2,002✔
1480
                                ReadBodyHandler(ec, num_read);
2,002✔
1481
                        }
1482
                });
2,002✔
1483
}
1484

1485
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
2,002✔
1486
        if (num_read > 0) {
2,002✔
1487
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
3,996✔
1488
                request_body_read_ += num_read;
1,998✔
1489
        }
1490

1491
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,002✔
1492
                // This can be ignored. We always reset the buffer between reads anyway.
1493
                ec = error_code();
979✔
1494
        }
1495

1496
        assert(reader_handler_);
1497

1498
        if (request_body_read_ >= request_body_length_) {
2,002✔
1499
                status_ = TransactionStatus::BodyReadingFinished;
32✔
1500
        }
1501

1502
        auto cancelled = cancelled_;
1503

1504
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
2,002✔
1505
        size_t smallest = min(num_read, buf_size);
2,002✔
1506
        copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
2,002✔
1507
        if (ec) {
2,002✔
1508
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
8✔
1509
                reader_handler_(expected::unexpected(err));
12✔
1510
        } else {
1511
                reader_handler_(smallest);
3,996✔
1512
        }
1513

1514
        if (!*cancelled && ec) {
2,002✔
1515
                CallErrorHandler(ec, request_, server_.body_handler_);
4✔
1516
                return;
1517
        }
1518
}
1519

1520
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
194✔
1521
        SetupResponse();
194✔
1522

1523
        reply_finished_handler_ = reply_finished_handler;
194✔
1524

1525
        auto &cancelled = cancelled_;
1526
        auto &response_data = response_data_;
194✔
1527

1528
        http::async_write_header(
388✔
1529
                socket_,
194✔
1530
                *response_data_.http_response_serializer_,
1531
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
194✔
1532
                        if (!*cancelled) {
194✔
1533
                                WriteHeaderHandler(ec, num_written);
193✔
1534
                        }
1535
                });
194✔
1536
}
194✔
1537

1538
void Stream::SetupResponse() {
203✔
1539
        auto response = maybe_response_.lock();
203✔
1540
        // Only called from existing responses, so this should always be true.
1541
        assert(response);
1542

1543
        assert(status_ == TransactionStatus::BodyHandlerCalled);
1544
        status_ = TransactionStatus::Replying;
203✔
1545

1546
        // From here on we take shared ownership.
1547
        response_ = response;
1548

1549
        response_data_.http_response_ = make_shared<http::response<http::buffer_body>>();
406✔
1550

1551
        for (const auto &header : response->headers_) {
424✔
1552
                response_data_.http_response_->base().set(header.first, header.second);
221✔
1553
        }
1554

1555
        response_data_.http_response_->result(response->GetStatusCode());
203✔
1556
        response_data_.http_response_->reason(response->GetStatusMessage());
406✔
1557

1558
        response_data_.http_response_serializer_ =
1559
                make_shared<http::response_serializer<http::buffer_body>>(*response_data_.http_response_);
406✔
1560
}
203✔
1561

1562
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
193✔
1563
        if (num_written > 0) {
193✔
1564
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
386✔
1565
        }
1566

1567
        if (ec) {
193✔
1568
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1569
                return;
36✔
1570
        }
1571

1572
        auto header = response_->GetHeader("Content-Length");
386✔
1573
        if (!header || header.value() == "0") {
193✔
1574
                FinishReply();
35✔
1575
                return;
1576
        }
1577

1578
        auto length = common::StringToLongLong(header.value());
158✔
1579
        if (!length || length.value() < 0) {
158✔
1580
                auto err = error::Error(
1581
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
1582
                CallErrorHandler(err, request_, reply_finished_handler_);
×
1583
                return;
1584
        }
1585

1586
        if (!response_->body_reader_ && !response_->async_body_reader_) {
158✔
1587
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
1588
                CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1589
                return;
1590
        }
1591

1592
        PrepareAndWriteNewBodyBuffer();
157✔
1593
}
1594

1595
void Stream::PrepareAndWriteNewBodyBuffer() {
1,873✔
1596
        // response_->body_reader_ XOR response_->async_body_reader_
1597
        assert(
1598
                (response_->body_reader_ || response_->async_body_reader_)
1599
                && !(response_->body_reader_ && response_->async_body_reader_));
1600

1601
        auto read_handler = [this](io::ExpectedSize read) {
1,874✔
1602
                if (!read) {
1,873✔
1603
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
3✔
1604
                        return;
1✔
1605
                }
1606
                WriteNewBodyBuffer(read.value());
1,872✔
1607
        };
1,873✔
1608

1609
        if (response_->body_reader_) {
1,873✔
1610
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
3,198✔
1611
        } else {
1612
                auto err = response_->async_body_reader_->AsyncRead(
1613
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
274✔
1614
                if (err != error::NoError) {
274✔
1615
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1616
                }
1617
        }
1618
}
1,873✔
1619

1620
void Stream::WriteNewBodyBuffer(size_t size) {
1,872✔
1621
        response_data_.http_response_->body().data = body_buffer_.data();
1,872✔
1622
        response_data_.http_response_->body().size = size;
1,872✔
1623

1624
        if (size > 0) {
1,872✔
1625
                response_data_.http_response_->body().more = true;
1,750✔
1626
        } else {
1627
                response_data_.http_response_->body().more = false;
122✔
1628
        }
1629

1630
        WriteBody();
1,872✔
1631
}
1,872✔
1632

1633
void Stream::WriteBody() {
3,601✔
1634
        auto &cancelled = cancelled_;
1635
        auto &response_data = response_data_;
3,601✔
1636

1637
        http::async_write_some(
7,202✔
1638
                socket_,
3,601✔
1639
                *response_data_.http_response_serializer_,
1640
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
3,562✔
1641
                        if (!*cancelled) {
3,562✔
1642
                                WriteBodyHandler(ec, num_written);
3,562✔
1643
                        }
1644
                });
3,562✔
1645
}
3,601✔
1646

1647
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
3,562✔
1648
        if (num_written > 0) {
3,562✔
1649
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
3,458✔
1650
        }
1651

1652
        if (ec == http::make_error_code(http::error::need_buffer)) {
3,562✔
1653
                // Write next body block.
1654
                PrepareAndWriteNewBodyBuffer();
1,716✔
1655
        } else if (ec) {
1,846✔
1656
                CallErrorHandler(ec, request_, reply_finished_handler_);
12✔
1657
        } else if (num_written > 0) {
1,842✔
1658
                // We are still writing the body.
1659
                WriteBody();
1,729✔
1660
        } else {
1661
                // We are finished.
1662
                FinishReply();
113✔
1663
        }
1664
}
3,562✔
1665

1666
void Stream::FinishReply() {
148✔
1667
        // We are done.
1668
        *cancelled_ = true;
148✔
1669
        cancelled_ = make_shared<bool>(true);
148✔
1670
        status_ = TransactionStatus::Done;
148✔
1671
        // Release ownership of Body reader.
1672
        response_->body_reader_.reset();
148✔
1673
        response_->async_body_reader_.reset();
148✔
1674
        reply_finished_handler_(error::NoError);
148✔
1675
        server_.RemoveStream(shared_from_this());
148✔
1676
}
148✔
1677

1678
error::Error Stream::AsyncSwitchProtocol(SwitchProtocolHandler handler) {
9✔
1679
        SetupResponse();
9✔
1680

1681
        switch_protocol_handler_ = handler;
9✔
1682
        status_ = TransactionStatus::SwitchingProtocol;
9✔
1683

1684
        auto &cancelled = cancelled_;
1685
        auto &response_data = response_data_;
9✔
1686

1687
        http::async_write_header(
18✔
1688
                socket_,
9✔
1689
                *response_data_.http_response_serializer_,
1690
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
9✔
1691
                        if (!*cancelled) {
9✔
1692
                                SwitchingProtocolHandler(ec, num_written);
8✔
1693
                        }
1694
                });
9✔
1695

1696
        return error::NoError;
9✔
1697
}
1698

1699
void Stream::SwitchingProtocolHandler(error_code ec, size_t num_written) {
8✔
1700
        if (num_written > 0) {
8✔
1701
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
16✔
1702
        }
1703

1704
        if (ec) {
8✔
1705
                CallErrorHandler(ec, request_, switch_protocol_handler_);
×
1706
                return;
×
1707
        }
1708

1709
        auto socket = make_shared<RawSocket<tcp::socket>>(
1710
                make_shared<tcp::socket>(std::move(socket_)), request_data_.request_buffer_);
8✔
1711

1712
        auto switch_protocol_handler = switch_protocol_handler_;
8✔
1713

1714
        // Rest of the connection is done directly on the socket, we are done here.
1715
        status_ = TransactionStatus::Done;
8✔
1716
        *cancelled_ = true;
8✔
1717
        cancelled_ = make_shared<bool>(true);
8✔
1718
        server_.RemoveStream(shared_from_this());
16✔
1719

1720
        switch_protocol_handler(socket);
16✔
1721
}
1722

1723
void Stream::CallBodyHandler() {
208✔
1724
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1725
        // it immediately destroys, which would destroy this stream as well. At the end of this
1726
        // function, it's ok to destroy it.
1727
        auto stream_ref = shared_from_this();
1728

1729
        server_.body_handler_(request_, error::NoError);
624✔
1730

1731
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1732
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1733
        // request has not been handled correctly.
1734
        auto response = maybe_response_.lock();
208✔
1735
        if (!response) {
208✔
1736
                logger_.Error("Handler produced no response. Closing stream prematurely.");
4✔
1737
                *cancelled_ = true;
2✔
1738
                cancelled_ = make_shared<bool>(true);
2✔
1739
                server_.RemoveStream(shared_from_this());
6✔
1740
        }
1741
}
208✔
1742

1743
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
232✔
1744
        event_loop_ {event_loop},
1745
        acceptor_(GetAsioIoContext(event_loop_)) {
418✔
1746
}
232✔
1747

1748
Server::~Server() {
464✔
1749
        Cancel();
232✔
1750
}
232✔
1751

1752
error::Error Server::AsyncServeUrl(
197✔
1753
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
1754
        return AsyncServeUrl(
1755
                url, header_handler, [body_handler](IncomingRequestPtr req, error::Error err) {
804✔
1756
                        if (err != error::NoError) {
204✔
1757
                                body_handler(expected::unexpected(err));
14✔
1758
                        } else {
1759
                                body_handler(req);
394✔
1760
                        }
1761
                });
598✔
1762
}
1763

1764
error::Error Server::AsyncServeUrl(
212✔
1765
        const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
1766
        auto err = BreakDownUrl(url, address_);
212✔
1767
        if (error::NoError != err) {
212✔
1768
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
×
1769
        }
1770

1771
        if (address_.protocol != "http") {
212✔
1772
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
×
1773
        }
1774

1775
        if (address_.path.size() > 0 && address_.path != "/") {
212✔
1776
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
2✔
1777
        }
1778

1779
        boost::system::error_code ec;
211✔
1780
        auto address = asio::ip::make_address(address_.host, ec);
211✔
1781
        if (ec) {
211✔
1782
                return error::Error(
1783
                        ec.default_error_condition(),
×
1784
                        "Could not construct endpoint from address " + address_.host);
×
1785
        }
1786

1787
        asio::ip::tcp::endpoint endpoint(address, address_.port);
211✔
1788

1789
        ec.clear();
1790
        acceptor_.open(endpoint.protocol(), ec);
211✔
1791
        if (ec) {
211✔
1792
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
×
1793
        }
1794

1795
        // Allow address reuse, otherwise we can't re-bind later.
1796
        ec.clear();
1797
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
211✔
1798
        if (ec) {
211✔
1799
                return error::Error(ec.default_error_condition(), "Could not set socket options");
×
1800
        }
1801

1802
        ec.clear();
1803
        acceptor_.bind(endpoint, ec);
211✔
1804
        if (ec) {
211✔
1805
                return error::Error(ec.default_error_condition(), "Could not bind socket");
×
1806
        }
1807

1808
        ec.clear();
1809
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
211✔
1810
        if (ec) {
211✔
1811
                return error::Error(ec.default_error_condition(), "Could not start listening");
×
1812
        }
1813

1814
        header_handler_ = header_handler;
211✔
1815
        body_handler_ = body_handler;
211✔
1816

1817
        PrepareNewStream();
211✔
1818

1819
        return error::NoError;
211✔
1820
}
1821

1822
void Server::Cancel() {
252✔
1823
        if (acceptor_.is_open()) {
252✔
1824
                acceptor_.cancel();
211✔
1825
                acceptor_.close();
211✔
1826
        }
1827
        streams_.clear();
1828
}
252✔
1829

1830
uint16_t Server::GetPort() const {
17✔
1831
        return acceptor_.local_endpoint().port();
17✔
1832
}
1833

1834
string Server::GetUrl() const {
16✔
1835
        return "http://127.0.0.1:" + to_string(GetPort());
32✔
1836
}
1837

1838
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
207✔
1839
        if (*req->cancelled_) {
207✔
1840
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
×
1841
        }
1842
        OutgoingResponsePtr response {new OutgoingResponse(req->stream_, req->cancelled_)};
414✔
1843
        req->stream_.maybe_response_ = response;
207✔
1844
        return response;
207✔
1845
}
1846

1847
error::Error Server::AsyncReply(
194✔
1848
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
1849
        if (*resp->cancelled_) {
194✔
1850
                return MakeError(StreamCancelledError, "Cannot send response");
×
1851
        }
1852

1853
        resp->stream_.AsyncReply(reply_finished_handler);
194✔
1854
        return error::NoError;
194✔
1855
}
1856

1857
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
55✔
1858
        if (*req->cancelled_) {
55✔
1859
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
×
1860
        }
1861

1862
        auto &stream = req->stream_;
55✔
1863
        if (stream.status_ != TransactionStatus::HeaderHandlerCalled) {
55✔
1864
                return expected::unexpected(error::Error(
1✔
1865
                        make_error_condition(errc::operation_in_progress),
2✔
1866
                        "MakeBodyAsyncReader called while reading is in progress"));
3✔
1867
        }
1868

1869
        if (stream.request_body_length_ == 0) {
54✔
1870
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
48✔
1871
        }
1872

1873
        stream.status_ = TransactionStatus::ReaderCreated;
38✔
1874
        return make_shared<BodyAsyncReader<Stream>>(stream, req->cancelled_);
76✔
1875
}
1876

1877
error::Error Server::AsyncSwitchProtocol(OutgoingResponsePtr resp, SwitchProtocolHandler handler) {
9✔
1878
        return resp->stream_.AsyncSwitchProtocol(handler);
18✔
1879
}
1880

1881
void Server::PrepareNewStream() {
428✔
1882
        StreamPtr new_stream {new Stream(*this)};
428✔
1883
        streams_.insert(new_stream);
1884
        AsyncAccept(new_stream);
856✔
1885
}
428✔
1886

1887
void Server::AsyncAccept(StreamPtr stream) {
428✔
1888
        acceptor_.async_accept(stream->socket_, [this, stream](const error_code &ec) {
648✔
1889
                if (ec) {
220✔
1890
                        if (ec != errc::operation_canceled) {
3✔
1891
                                log::Error("Could not accept connection: " + ec.message());
×
1892
                        }
1893
                        return;
3✔
1894
                }
1895

1896
                stream->AcceptHandler(ec);
217✔
1897

1898
                this->PrepareNewStream();
217✔
1899
        });
1900
}
428✔
1901

1902
void Server::RemoveStream(StreamPtr stream) {
179✔
1903
        streams_.erase(stream);
179✔
1904

1905
        stream->DoCancel();
179✔
1906
}
179✔
1907

1908
} // namespace http
1909
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc