• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1045900765

23 Oct 2023 08:35AM UTC coverage: 80.369% (+0.3%) from 80.119%
1045900765

push

gitlab-ci

lluiscampos
chore: Consolidate struct members for server URL.

Use only the list, instead of the list and the single entry.

Ticket: MEN-6662

Signed-off-by: Kristian Amlie <kristian.amlie@northern.tech>

9 of 9 new or added lines in 1 file covered. (100.0%)

6841 of 8512 relevant lines covered (80.37%)

9449.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.59
/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio/ip/tcp.hpp>
20
#include <boost/asio/ssl/verify_mode.hpp>
21
#include <boost/asio.hpp>
22

23
#include <common/common.hpp>
24

25
namespace mender {
26
namespace http {
27

28
namespace common = mender::common;
29

30
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
31
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
32
const unsigned int BeastHttpVersion = 11;
33

34
namespace asio = boost::asio;
35
namespace http = boost::beast::http;
36

37
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
38

39
// Translates our own `Method` enumerator into the matching Beast verb.
//
// The switch intentionally has no "default" label, so that the compiler can warn about
// unhandled enumerators if new methods are ever added.
static http::verb MethodToBeastVerb(Method method) {
        switch (method) {
        case Method::Invalid:
                // Not a real method, handled after the switch.
                break;
        case Method::GET:
                return http::verb::get;
        case Method::HEAD:
                return http::verb::head;
        case Method::POST:
                return http::verb::post;
        case Method::PUT:
                return http::verb::put;
        case Method::PATCH:
                return http::verb::patch;
        case Method::CONNECT:
                return http::verb::connect;
        }

        // Only reached for `Method::Invalid` or a value outside the enum: trip the assert
        // when assertions are enabled, and fall back to GET when they are not.
        assert(false);
        return http::verb::get;
}
62

63
// Translates a Beast verb into our own `Method` enum. Verbs without a counterpart yield an
// `UnsupportedMethodError` which carries `verb_string` as its message.
static expected::expected<Method, error::Error> BeastVerbToMethod(
        http::verb verb, const string &verb_string) {
        switch (verb) {
        case http::verb::get:
                return Method::GET;
        case http::verb::head:
                return Method::HEAD;
        case http::verb::post:
                return Method::POST;
        case http::verb::put:
                return Method::PUT;
        case http::verb::patch:
                return Method::PATCH;
        case http::verb::connect:
                return Method::CONNECT;
        default:
                // Everything else is reported as unsupported below.
                break;
        }

        return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
}
82

83
template <typename StreamType>
84
class BodyAsyncReader : virtual public io::AsyncReader {
85
public:
86
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
141✔
87
                stream_ {stream},
88
                cancelled_ {cancelled} {
282✔
89
        }
141✔
90
        ~BodyAsyncReader() {
36✔
91
                Cancel();
36✔
92
        }
72✔
93

94
        error::Error AsyncRead(
2,030✔
95
                vector<uint8_t>::iterator start,
96
                vector<uint8_t>::iterator end,
97
                io::AsyncIoHandler handler) override {
98
                if (*cancelled_) {
2,030✔
99
                        return error::MakeError(
×
100
                                error::ProgrammingError,
101
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
102
                }
103
                stream_.AsyncReadNextBodyPart(start, end, handler);
2,030✔
104
                return error::NoError;
2,030✔
105
        }
106

107
        void Cancel() override {
38✔
108
                if (!*cancelled_) {
38✔
109
                        stream_.Cancel();
4✔
110
                }
111
        }
38✔
112

113
private:
114
        StreamType &stream_;
115
        shared_ptr<bool> cancelled_;
116

117
        friend class Client;
118
        friend class Server;
119
};
120

121
template <typename StreamType>
122
class RawSocket : virtual public io::AsyncReadWriter {
123
public:
124
        RawSocket(shared_ptr<StreamType> stream, shared_ptr<beast::flat_buffer> buffered) :
×
125
                destroying_ {make_shared<bool>(false)},
×
126
                stream_ {stream},
127
                buffered_ {buffered} {
×
128
                // If there are no buffered bytes, then we don't need it.
129
                if (buffered_ && buffered_->size() == 0) {
×
130
                        buffered_.reset();
×
131
                }
132
        }
×
133

134
        ~RawSocket() {
15✔
135
                *destroying_ = true;
15✔
136
                Cancel();
15✔
137
        }
30✔
138

139
        error::Error AsyncRead(
320✔
140
                vector<uint8_t>::iterator start,
141
                vector<uint8_t>::iterator end,
142
                io::AsyncIoHandler handler) override {
143
                // If we have prebuffered bytes, which can happen if the HTTP parser read the
144
                // header and parts of the body in one block, return those first.
145
                if (buffered_) {
320✔
146
                        return DrainPrebufferedData(start, end, handler);
8✔
147
                }
148

149
                read_buffer_ = asio::buffer(&*start, end - start);
316✔
150
                auto &destroying = destroying_;
151
                stream_->async_read_some(
632✔
152
                        read_buffer_,
316✔
153
                        [destroying, handler](const boost::system::error_code &ec, size_t num_read) {
313✔
154
                                if (*destroying) {
313✔
155
                                        return;
156
                                }
157

158
                                if (ec == asio::error::operation_aborted) {
313✔
159
                                        handler(expected::unexpected(error::Error(
12✔
160
                                                make_error_condition(errc::operation_canceled),
6✔
161
                                                "Could not read from socket")));
162
                                } else if (ec) {
310✔
163
                                        handler(expected::unexpected(
12✔
164
                                                error::Error(ec.default_error_condition(), "Could not read from socket")));
12✔
165
                                } else {
166
                                        handler(num_read);
608✔
167
                                }
168
                        });
169
                return error::NoError;
316✔
170
        }
171

172
        error::Error AsyncWrite(
309✔
173
                vector<uint8_t>::const_iterator start,
174
                vector<uint8_t>::const_iterator end,
175
                io::AsyncIoHandler handler) override {
176
                write_buffer_ = asio::buffer(&*start, end - start);
309✔
177
                auto &destroying = destroying_;
178
                stream_->async_write_some(
618✔
179
                        write_buffer_,
309✔
180
                        [destroying, handler](const boost::system::error_code &ec, size_t num_written) {
306✔
181
                                if (*destroying) {
306✔
182
                                        return;
183
                                }
184

185
                                if (ec == asio::error::operation_aborted) {
306✔
186
                                        handler(expected::unexpected(error::Error(
×
187
                                                make_error_condition(errc::operation_canceled),
×
188
                                                "Could not write to socket")));
189
                                } else if (ec) {
306✔
190
                                        handler(expected::unexpected(
×
191
                                                error::Error(ec.default_error_condition(), "Could not write to socket")));
×
192
                                } else {
193
                                        handler(num_written);
612✔
194
                                }
195
                        });
196
                return error::NoError;
309✔
197
        }
198

199
        void Cancel() override {
28✔
200
                if (stream_->lowest_layer().is_open()) {
28✔
201
                        stream_->lowest_layer().cancel();
15✔
202
                        stream_->lowest_layer().close();
15✔
203
                }
204
        }
28✔
205

206
private:
207
        error::Error DrainPrebufferedData(
4✔
208
                vector<uint8_t>::iterator start,
209
                vector<uint8_t>::iterator end,
210
                io::AsyncIoHandler handler) {
211
                size_t to_copy = min(static_cast<size_t>(end - start), buffered_->size());
4✔
212

213
                // These two lines are equivalent to:
214
                //   copy_n(static_cast<const uint8_t *>(buffered_->cdata().data()), to_copy, start);
215
                // but compatible with Boost 1.67.
216
                const beast::flat_buffer &cbuffered = *buffered_;
217
                copy_n(static_cast<const uint8_t *>(cbuffered.data().data()), to_copy, start);
4✔
218
                buffered_->consume(to_copy);
4✔
219
                if (buffered_->size() == 0) {
4✔
220
                        // We don't need it anymore.
221
                        buffered_.reset();
4✔
222
                }
223
                handler(to_copy);
4✔
224
                return error::NoError;
4✔
225
        }
226

227
        shared_ptr<bool> destroying_;
228
        shared_ptr<StreamType> stream_;
229
        shared_ptr<beast::flat_buffer> buffered_;
230
        asio::mutable_buffer read_buffer_;
231
        asio::const_buffer write_buffer_;
232
};
233

234
Client::Client(
332✔
235
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
236
        event_loop_ {event_loop},
237
        logger_name_ {logger_name},
238
        client_config_ {client},
239
        http_proxy_ {client.http_proxy},
332✔
240
        https_proxy_ {client.https_proxy},
332✔
241
        no_proxy_ {client.no_proxy},
332✔
242
        cancelled_ {make_shared<bool>(true)},
×
243
        disable_keep_alive_ {client.disable_keep_alive},
332✔
244
        resolver_(GetAsioIoContext(event_loop)),
245
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,328✔
246
}
332✔
247

248
// Cancels whatever is still going on. Logs a warning first if the cancelled flag is not
// set at this point.
Client::~Client() {
        const bool still_active = !*cancelled_;
        if (still_active) {
                logger_.Warning("Client destroyed while request is still active!");
        }

        DoCancel();
}
332✔
254

255
// Configures every SSL context of the client: verification mode, optional client
// certificate and key, default CA paths and optional dedicated server certificate.
//
// Returns immediately if a previous call already succeeded. If any step fails, the error is
// returned and the client stays uninitialized, so the next call starts over.
error::Error Client::Initialize() {
        if (initialized_) {
                return error::NoError;
        }

        for (auto idx = 0; idx < MENDER_BOOST_BEAST_SSL_CTX_COUNT; idx++) {
                auto &ctx = ssl_ctx_[idx];

                ctx.set_verify_mode(
                        client_config_.skip_verify ? ssl::verify_none : ssl::verify_peer);

                const bool has_cert = client_config_.client_cert_path != "";
                const bool has_key = client_config_.client_cert_key_path != "";

                beast::error_code ec {};

                // The certificate and its private key only make sense as a pair.
                if (has_cert != has_key) {
                        return error::Error(
                                make_error_condition(errc::invalid_argument),
                                "Cannot set only one of client certificate, and client certificate private key");
                }

                if (has_cert) {
                        ctx.set_options(boost::asio::ssl::context::default_workarounds);

                        ctx.use_certificate_file(
                                client_config_.client_cert_path, boost::asio::ssl::context_base::pem, ec);
                        if (ec) {
                                return error::Error(
                                        ec.default_error_condition(), "Could not load client certificate");
                        }

                        ctx.use_private_key_file(
                                client_config_.client_cert_key_path, boost::asio::ssl::context_base::pem, ec);
                        if (ec) {
                                return error::Error(
                                        ec.default_error_condition(), "Could not load client certificate private key");
                        }
                }

                // Load the default CAs.
                ctx.set_default_verify_paths(ec);
                if (ec) {
                        auto err = error::Error(
                                ec.default_error_condition(), "Failed to load the SSL default directory");
                        if (client_config_.server_cert_path == "") {
                                // We aren't going to have any valid certificates then.
                                return err;
                        }
                        // We have a dedicated certificate, so this is not fatal.
                        log::Info(err.String());
                }

                if (client_config_.server_cert_path != "") {
                        ctx.load_verify_file(client_config_.server_cert_path, ec);
                        if (ec) {
                                return error::Error(
                                        ec.default_error_condition(), "Failed to load the server certificate!");
                        }
                }
        }

        initialized_ = true;

        return error::NoError;
}
311

312
// Starts an HTTP request. After validating the arguments and applying the proxy settings,
// the host name is resolved asynchronously, and the rest of the transaction continues in
// `ResolveHandler`.
//
// Returns an error, without starting anything, if:
//   - the client cannot be initialized,
//   - another call is still ongoing,
//   - the request has no protocol or host, or has a negative port,
//   - either of the handlers is empty,
//   - the protocol is neither "http" nor "https",
//   - the proxy setup fails.
error::Error Client::AsyncCall(
        OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
        auto init_err = Initialize();
        if (init_err != error::NoError) {
                return init_err;
        }

        const bool call_ongoing = !*cancelled_ && status_ != TransactionStatus::Done;
        if (call_ongoing) {
                return error::Error(
                        make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
        }

        const bool request_incomplete =
                req->address_.protocol == "" || req->address_.host == "" || req->address_.port < 0;
        if (request_incomplete) {
                return error::MakeError(error::ProgrammingError, "Request is not ready");
        }

        if (!header_handler || !body_handler) {
                return error::MakeError(
                        error::ProgrammingError, "header_handler and body_handler can not be nullptr");
        }

        const bool is_http = req->address_.protocol == "http";
        const bool is_https = req->address_.protocol == "https";
        if (!is_http && !is_https) {
                return error::Error(
                        make_error_condition(errc::protocol_not_supported), req->address_.protocol);
        }

        logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));

        request_ = req;

        auto proxy_err = HandleProxySetup();
        if (proxy_err != error::NoError) {
                return proxy_err;
        }

        // NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
        // request to route to our k8s cluster. Set this in all cases.
        req->SetHeader("HOST", req->address_.host);

        header_handler_ = header_handler;
        body_handler_ = body_handler;
        status_ = TransactionStatus::None;

        // A fresh flag for this call. The resolve callback keeps its own copy of the pointer,
        // and does nothing if the flag has been set by the time it runs.
        cancelled_ = make_shared<bool>(false);
        auto &cancelled = cancelled_;

        resolver_.async_resolve(
                request_->address_.host,
                to_string(request_->address_.port),
                [this, cancelled](
                        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
                        if (*cancelled) {
                                return;
                        }
                        ResolveHandler(ec, results);
                });

        return error::NoError;
}
371

372
// Selects the socket mode for the current request and, if a proxy applies to it, redirects
// the connection to the proxy:
//
//   - "http" requests: the path is rewritten to the full target URL, and the address is
//     replaced with the proxy address.
//   - "https" requests: the original request is stashed in `secondary_req_`, and `request_`
//     is replaced with a CONNECT request to the proxy for "<host>:<port>" of the original.
//
// In both cases the socket mode follows the protocol of the proxy URL. The proxy is skipped
// when `HostNameMatchesNoProxy` reports a match for the target host.
//
// Returns an error if the proxy URL cannot be broken down, or if it contains a path.
error::Error Client::HandleProxySetup() {
        secondary_req_.reset();

        if (request_->address_.protocol == "http") {
                socket_mode_ = SocketMode::Plain;

                // Match the host against the no-proxy setting, not against the proxy URL
                // itself. NOTE(review): assumes the second argument of
                // `HostNameMatchesNoProxy` is the no-proxy list, as its name suggests.
                if (http_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
                        // Make a modified proxy request.
                        BrokenDownUrl proxy_address;
                        auto err = BreakDownUrl(http_proxy_, proxy_address);
                        if (err != error::NoError) {
                                return err.WithContext("HTTP proxy URL is invalid");
                        }
                        if (proxy_address.path != "" && proxy_address.path != "/") {
                                return MakeError(
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
                        }

                        // The path becomes the complete original URL, the rest of the
                        // address now points to the proxy.
                        request_->address_.path = request_->address_.protocol + "://" + request_->address_.host
                                                                          + ":" + to_string(request_->address_.port)
                                                                          + request_->address_.path;
                        request_->address_.host = proxy_address.host;
                        request_->address_.port = proxy_address.port;
                        request_->address_.protocol = proxy_address.protocol;

                        if (proxy_address.protocol == "https") {
                                socket_mode_ = SocketMode::Tls;
                        } else if (proxy_address.protocol == "http") {
                                socket_mode_ = SocketMode::Plain;
                        } else {
                                // Should never get here.
                                assert(false);
                        }
                }
        } else if (request_->address_.protocol == "https") {
                socket_mode_ = SocketMode::Tls;

                // Same as above: match against the no-proxy setting, not the proxy URL.
                if (https_proxy_ != "" && !HostNameMatchesNoProxy(request_->address_.host, no_proxy_)) {
                        // Save the original request for later, so that we can make a new request
                        // over the channel established by CONNECT.
                        secondary_req_ = std::move(request_);

                        request_ = make_shared<OutgoingRequest>();
                        request_->SetMethod(Method::CONNECT);
                        BrokenDownUrl proxy_address;
                        auto err = BreakDownUrl(https_proxy_, proxy_address);
                        if (err != error::NoError) {
                                return err.WithContext("HTTPS proxy URL is invalid");
                        }
                        if (proxy_address.path != "" && proxy_address.path != "/") {
                                return MakeError(
                                        InvalidUrlError, "A URL with a path is not legal for a proxy address");
                        }

                        // The CONNECT target is "<host>:<port>" of the original request.
                        request_->address_.path =
                                secondary_req_->address_.host + ":" + to_string(secondary_req_->address_.port);
                        request_->address_.host = proxy_address.host;
                        request_->address_.port = proxy_address.port;
                        request_->address_.protocol = proxy_address.protocol;

                        if (proxy_address.protocol == "https") {
                                socket_mode_ = SocketMode::Tls;
                        } else if (proxy_address.protocol == "http") {
                                socket_mode_ = SocketMode::Plain;
                        } else {
                                // Should never get here.
                                assert(false);
                        }
                }
        } else {
                // Should never get here
                assert(false);
        }

        return error::NoError;
}
448

449
// Creates the reader which delivers the body of `resp`.
//
// Fails with `errc::operation_in_progress` unless the transaction is in the
// `HeaderHandlerCalled` state, and with `BodyMissingError` if the response body length is
// zero. On success the transaction moves to the `ReaderCreated` state.
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
        const bool header_handler_stage = status_ == TransactionStatus::HeaderHandlerCalled;
        if (!header_handler_stage) {
                return expected::unexpected(error::Error(
                        make_error_condition(errc::operation_in_progress),
                        "MakeBodyAsyncReader called while reading is in progress"));
        }

        const bool has_body = response_body_length_ != 0;
        if (!has_body) {
                return expected::unexpected(
                        MakeError(BodyMissingError, "Response does not contain a body"));
        }

        status_ = TransactionStatus::ReaderCreated;
        return make_shared<BodyAsyncReader<Client>>(resp->client_.GetHttpClient(), resp->cancelled_);
}
464

465
// Hands the connection over to the caller as a raw socket. The client gives up its stream,
// and the transaction is marked as done.
//
// Any bytes left in the response buffer are passed along to the raw socket. Fails with
// `errc::not_connected` if the cancelled flag is set.
io::ExpectedAsyncReadWriterPtr Client::SwitchProtocol(IncomingResponsePtr req) {
        if (*cancelled_) {
                return expected::unexpected(error::Error(
                        make_error_condition(errc::not_connected),
                        "Cannot switch protocols if endpoint is not connected"));
        }

        // Rest of the connection is done directly on the socket, we are done here.
        status_ = TransactionStatus::Done;
        // Set the flag which the pending handlers hold a copy of, then start over with a
        // fresh one.
        *cancelled_ = true;
        cancelled_ = make_shared<bool>(false);

        // This no longer belongs to us: take the stream out of the member, leaving it empty.
        auto detached_stream = std::move(stream_);
        stream_.reset();

        switch (socket_mode_) {
        case SocketMode::TlsTls:
                // The complete, doubly wrapped stream is handed over as it is.
                return make_shared<RawSocket<ssl::stream<ssl::stream<tcp::socket>>>>(
                        detached_stream, response_data_.response_buffer_);
        case SocketMode::Tls:
                // Only the inner TLS stream is in use, so move that one out.
                return make_shared<RawSocket<ssl::stream<tcp::socket>>>(
                        make_shared<ssl::stream<tcp::socket>>(std::move(detached_stream->next_layer())),
                        response_data_.response_buffer_);
        case SocketMode::Plain:
                // No TLS at all, so move out the innermost, plain socket.
                return make_shared<RawSocket<tcp::socket>>(
                        make_shared<tcp::socket>(std::move(detached_stream->next_layer().next_layer())),
                        response_data_.response_buffer_);
        }

        AssertOrReturnUnexpected(false);
}
497

498
// Invokes `handler` with the current response.
//
// The handler is taken by value on purpose: it gives us our own copy, which stays alive for
// the duration of the call. The handler may own the client instance through a capture, and
// it may replace itself with a different handler by calling `AsyncCall`. Without the copy,
// that would destroy the last copy of the running handler, and with it the client.
void Client::CallHandler(ResponseHandler handler) {
        handler(response_);
}
282✔
506

507
void Client::CallErrorHandler(
33✔
508
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
509
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
99✔
510
}
33✔
511

512
void Client::CallErrorHandler(
162✔
513
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
514
        *cancelled_ = true;
162✔
515
        cancelled_ = make_shared<bool>(true);
162✔
516
        stream_.reset();
162✔
517
        status_ = TransactionStatus::Done;
162✔
518
        handler(expected::unexpected(
324✔
519
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
648✔
520
}
162✔
521

522
void Client::ResolveHandler(
245✔
523
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
524
        if (ec) {
245✔
525
                CallErrorHandler(ec, request_, header_handler_);
×
526
                return;
×
527
        }
528

529
        if (logger_.Level() >= log::LogLevel::Debug) {
245✔
530
                string ips = "[";
228✔
531
                string sep;
532
                for (auto r : results) {
964✔
533
                        ips += sep;
254✔
534
                        ips += r.endpoint().address().to_string();
254✔
535
                        sep = ", ";
254✔
536
                }
537
                ips += "]";
228✔
538
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
456✔
539
        }
540

541
        resolver_results_ = results;
542

543
        stream_ = make_shared<ssl::stream<ssl::stream<tcp::socket>>>(
245✔
544
                ssl::stream<tcp::socket>(GetAsioIoContext(event_loop_), ssl_ctx_[0]), ssl_ctx_[1]);
490✔
545

546
        if (!response_data_.response_buffer_) {
245✔
547
                // We can reuse this if preexisting.
548
                response_data_.response_buffer_ = make_shared<beast::flat_buffer>();
358✔
549

550
                // This is equivalent to:
551
                //   response_data_.response_buffer_.reserve(body_buffer_.size());
552
                // but compatible with Boost 1.67.
553
                response_data_.response_buffer_->prepare(
554
                        body_buffer_.size() - response_data_.response_buffer_->size());
179✔
555
        }
556

557
        auto &cancelled = cancelled_;
558

559
        asio::async_connect(
245✔
560
                stream_->lowest_layer(),
561
                resolver_results_,
245✔
562
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
490✔
563
                        if (!*cancelled) {
245✔
564
                                switch (socket_mode_) {
245✔
565
                                case SocketMode::TlsTls:
×
566
                                        // Should never happen because we always need to handshake
567
                                        // the innermost Tls first, then the outermost, but the
568
                                        // latter doesn't happen here.
569
                                        assert(false);
570
                                        CallErrorHandler(
×
571
                                                error::MakeError(
×
572
                                                        error::ProgrammingError, "TlsTls mode is invalid in ResolveHandler"),
×
573
                                                request_,
×
574
                                                header_handler_);
×
575
                                case SocketMode::Tls:
14✔
576
                                        return HandshakeHandler(stream_->next_layer(), ec, endpoint);
14✔
577
                                case SocketMode::Plain:
231✔
578
                                        return ConnectHandler(ec, endpoint);
231✔
579
                                }
580
                        }
581
                });
582
}
583

584
template <typename StreamType>
585
void Client::HandshakeHandler(
16✔
586
        StreamType &stream, const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
587
        if (ec) {
16✔
588
                CallErrorHandler(ec, request_, header_handler_);
2✔
589
                return;
2✔
590
        }
591

592
        if (not disable_keep_alive_) {
14✔
593
                boost::asio::socket_base::keep_alive option(true);
594
                stream_->lowest_layer().set_option(option);
14✔
595
        }
596

597
        // Set SNI Hostname (many hosts need this to handshake successfully)
598
        if (!SSL_set_tlsext_host_name(stream.native_handle(), request_->address_.host.c_str())) {
14✔
599
                beast::error_code ec2 {
×
600
                        static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
×
601
                logger_.Error("Failed to set SNI host name: " + ec2.message());
×
602
        }
603

604
        auto &cancelled = cancelled_;
605

606
        stream.async_handshake(
28✔
607
                ssl::stream_base::client, [this, cancelled, endpoint](const error_code &ec) {
14✔
608
                        if (*cancelled) {
15✔
609
                                return;
610
                        }
611
                        if (ec) {
15✔
612
                                logger_.Error("https: Failed to perform the SSL handshake: " + ec.message());
8✔
613
                                CallErrorHandler(ec, request_, header_handler_);
4✔
614
                                return;
4✔
615
                        }
616
                        logger_.Debug("https: Successful SSL handshake");
22✔
617
                        ConnectHandler(ec, endpoint);
11✔
618
                });
619
}
620

621

622
// Runs once the connection is ready for HTTP traffic (directly for plain sockets, or after
// the TLS handshake). Builds the Beast request and serializer from `request_`, creates a
// fresh response parser, and starts writing the request header to the layer of `stream_`
// selected by `socket_mode_`.
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
        if (ec) {
                CallErrorHandler(ec, request_, header_handler_);
                return;
        }

        if (!disable_keep_alive_) {
                stream_->lowest_layer().set_option(boost::asio::socket_base::keep_alive(true));
        }

        logger_.Debug("Connected to " + endpoint.address().to_string());

        auto &beast_request = request_data_.http_request_;
        beast_request = make_shared<http::request<http::buffer_body>>(
                MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);
        for (const auto &field : request_->headers_) {
                beast_request->set(field.first, field.second);
        }

        request_data_.http_request_serializer_ =
                make_shared<http::request_serializer<http::buffer_body>>(*beast_request);

        auto &parser = response_data_.http_response_parser_;
        parser = make_shared<http::response_parser<http::buffer_body>>();

        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
        // if they do, they should be handled higher up in the application logic.
        //
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
        // `has_value()` in their code, causing their subsequent comparison operation to
        // misbehave. So pass highest possible value instead.
        parser->body_limit(numeric_limits<uint64_t>::max());

        // Both captures are copies: the cancellation flag is checked when the write completes,
        // and the copy of `request_data_` presumably keeps the request and serializer alive
        // for the duration of the asynchronous operation.
        auto &cancelled = cancelled_;
        auto &pinned_request_data = request_data_;
        auto on_header_written = [this, cancelled, pinned_request_data](
                                         const error_code &write_ec, size_t num_written) {
                if (*cancelled) {
                        return;
                }
                WriteHeaderHandler(write_ec, num_written);
        };

        auto &serializer = *request_data_.http_request_serializer_;
        switch (socket_mode_) {
        case SocketMode::Plain:
                http::async_write_header(
                        stream_->next_layer().next_layer(), serializer, on_header_written);
                break;
        case SocketMode::Tls:
                http::async_write_header(stream_->next_layer(), serializer, on_header_written);
                break;
        case SocketMode::TlsTls:
                http::async_write_header(*stream_, serializer, on_header_written);
                break;
        }
}
679

680
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
226✔
681
        if (num_written > 0) {
226✔
682
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
452✔
683
        }
684

685
        if (ec) {
226✔
686
                CallErrorHandler(ec, request_, header_handler_);
×
687
                return;
189✔
688
        }
689

690
        auto header = request_->GetHeader("Content-Length");
452✔
691
        if (!header || header.value() == "0") {
226✔
692
                ReadHeader();
188✔
693
                return;
694
        }
695

696
        auto length = common::StringToLongLong(header.value());
38✔
697
        if (!length || length.value() < 0) {
38✔
698
                auto err = error::Error(
699
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
700
                CallErrorHandler(err, request_, header_handler_);
×
701
                return;
702
        }
703
        request_body_length_ = length.value();
38✔
704

705
        if (!request_->body_gen_ && !request_->async_body_gen_) {
38✔
706
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
707
                CallErrorHandler(err, request_, header_handler_);
2✔
708
                return;
709
        }
710

711
        assert(!(request_->body_gen_ && request_->async_body_gen_));
712

713
        if (request_->body_gen_) {
37✔
714
                auto body_reader = request_->body_gen_();
31✔
715
                if (!body_reader) {
31✔
716
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
717
                        return;
718
                }
719
                request_->body_reader_ = body_reader.value();
31✔
720
        } else {
721
                auto body_reader = request_->async_body_gen_();
6✔
722
                if (!body_reader) {
6✔
723
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
724
                        return;
725
                }
726
                request_->async_body_reader_ = body_reader.value();
6✔
727
        }
728

729
        PrepareAndWriteNewBodyBuffer();
37✔
730
}
731

732
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,118✔
733
        if (num_written > 0) {
2,118✔
734
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
2,084✔
735
        }
736

737
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,118✔
738
                // Write next block of the body.
739
                PrepareAndWriteNewBodyBuffer();
1,042✔
740
        } else if (ec) {
1,076✔
741
                CallErrorHandler(ec, request_, header_handler_);
8✔
742
        } else if (num_written > 0) {
1,072✔
743
                // We are still writing the body.
744
                WriteBody();
1,042✔
745
        } else {
746
                // We are ready to receive the response.
747
                ReadHeader();
30✔
748
        }
749
}
2,118✔
750

751
void Client::PrepareAndWriteNewBodyBuffer() {
1,079✔
752
        // request_->body_reader_ XOR request_->async_body_reader_
753
        assert(
754
                (request_->body_reader_ || request_->async_body_reader_)
755
                && !(request_->body_reader_ && request_->async_body_reader_));
756

757
        auto cancelled = cancelled_;
758
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
3,430✔
759
                if (!*cancelled) {
1,079✔
760
                        if (!read) {
1,078✔
761
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
762
                                return;
2✔
763
                        }
764
                        WriteNewBodyBuffer(read.value());
1,076✔
765
                }
766
        };
1,079✔
767

768

769
        if (request_->body_reader_) {
1,079✔
770
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
1,308✔
771
        } else {
772
                auto err = request_->async_body_reader_->AsyncRead(
773
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
850✔
774
                if (err != error::NoError) {
425✔
775
                        CallErrorHandler(err, request_, header_handler_);
×
776
                }
777
        }
778
}
1,079✔
779

780
void Client::WriteNewBodyBuffer(size_t size) {
1,076✔
781
        request_data_.http_request_->body().data = body_buffer_.data();
1,076✔
782
        request_data_.http_request_->body().size = size;
1,076✔
783

784
        if (size > 0) {
1,076✔
785
                request_data_.http_request_->body().more = true;
1,046✔
786
        } else {
787
                // Release ownership of Body reader.
788
                request_->body_reader_.reset();
30✔
789
                request_->async_body_reader_.reset();
30✔
790
                request_data_.http_request_->body().more = false;
30✔
791
        }
792

793
        WriteBody();
1,076✔
794
}
1,076✔
795

796
void Client::WriteBody() {
2,118✔
797
        auto &cancelled = cancelled_;
798
        auto &request_data = request_data_;
2,118✔
799

800
        auto handler = [this, cancelled, request_data](const error_code &ec, size_t num_written) {
2,118✔
801
                if (!*cancelled) {
2,118✔
802
                        WriteBodyHandler(ec, num_written);
2,118✔
803
                }
804
        };
4,236✔
805

806
        switch (socket_mode_) {
2,118✔
807
        case SocketMode::TlsTls:
×
808
                http::async_write_some(*stream_, *request_data_.http_request_serializer_, handler);
809
                break;
810
        case SocketMode::Tls:
×
811
                http::async_write_some(
812
                        stream_->next_layer(), *request_data_.http_request_serializer_, handler);
813
                break;
814
        case SocketMode::Plain:
2,118✔
815
                http::async_write_some(
816
                        stream_->next_layer().next_layer(), *request_data_.http_request_serializer_, handler);
817
                break;
818
        }
819
}
2,118✔
820

821
void Client::ReadHeader() {
218✔
822
        auto &cancelled = cancelled_;
823
        auto &response_data = response_data_;
218✔
824

825
        auto handler = [this, cancelled, response_data](const error_code &ec, size_t num_read) {
217✔
826
                if (!*cancelled) {
217✔
827
                        ReadHeaderHandler(ec, num_read);
217✔
828
                }
829
        };
436✔
830

831
        switch (socket_mode_) {
218✔
832
        case SocketMode::TlsTls:
1✔
833
                http::async_read_some(
1✔
834
                        *stream_,
835
                        *response_data_.response_buffer_,
836
                        *response_data_.http_response_parser_,
837
                        handler);
838
                break;
839
        case SocketMode::Tls:
10✔
840
                http::async_read_some(
10✔
841
                        stream_->next_layer(),
842
                        *response_data_.response_buffer_,
843
                        *response_data_.http_response_parser_,
844
                        handler);
845
                break;
846
        case SocketMode::Plain:
207✔
847
                http::async_read_some(
207✔
848
                        stream_->next_layer().next_layer(),
849
                        *response_data_.response_buffer_,
850
                        *response_data_.http_response_parser_,
851
                        handler);
852
                break;
853
        }
854
}
218✔
855

856
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
217✔
857
        if (num_read > 0) {
217✔
858
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
424✔
859
        }
860

861
        if (ec) {
217✔
862
                CallErrorHandler(ec, request_, header_handler_);
5✔
863
                return;
75✔
864
        }
865

866
        if (!response_data_.http_response_parser_->is_header_done()) {
212✔
867
                ReadHeader();
×
868
                return;
×
869
        }
870

871
        auto content_length = response_data_.http_response_parser_->content_length();
212✔
872
        if (content_length) {
212✔
873
                response_body_length_ = content_length.value();
175✔
874
        } else {
875
                response_body_length_ = 0;
37✔
876
        }
877

878
        if (secondary_req_) {
212✔
879
                HandleSecondaryRequest();
5✔
880
                return;
5✔
881
        }
882

883
        response_.reset(new IncomingResponse(*this, cancelled_));
414✔
884
        response_->status_code_ = response_data_.http_response_parser_->get().result_int();
207✔
885
        response_->status_message_ = string {response_data_.http_response_parser_->get().reason()};
207✔
886

887
        logger_.Debug(
414✔
888
                "Received response: " + to_string(response_->status_code_) + " "
414✔
889
                + response_->status_message_);
621✔
890

891
        string debug_str;
892
        for (auto header = response_data_.http_response_parser_->get().cbegin();
243✔
893
                 header != response_data_.http_response_parser_->get().cend();
450✔
894
                 header++) {
895
                response_->headers_[string {header->name_string()}] = string {header->value()};
729✔
896
                if (logger_.Level() >= log::LogLevel::Debug) {
243✔
897
                        debug_str += string {header->name_string()};
230✔
898
                        debug_str += ": ";
230✔
899
                        debug_str += string {header->value()};
230✔
900
                        debug_str += "\n";
230✔
901
                }
902
        }
903

904
        logger_.Debug("Received headers:\n" + debug_str);
414✔
905
        debug_str.clear();
906

907
        if (response_data_.http_response_parser_->chunked()) {
207✔
908
                auto cancelled = cancelled_;
909
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
910
                CallHandler(header_handler_);
2✔
911
                if (!*cancelled) {
1✔
912
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
913
                        CallErrorHandler(err, request_, body_handler_);
2✔
914
                }
915
                return;
916
        }
917

918
        response_body_read_ = 0;
206✔
919

920
        if (response_body_read_ >= response_body_length_) {
206✔
921
                auto cancelled = cancelled_;
922
                status_ = TransactionStatus::HeaderHandlerCalled;
45✔
923
                CallHandler(header_handler_);
90✔
924
                if (!*cancelled) {
45✔
925
                        status_ = TransactionStatus::Done;
39✔
926
                        CallHandler(body_handler_);
78✔
927

928
                        // After body handler has run, set the request to cancelled. The body
929
                        // handler may have made a new request, so this is not necessarily the same
930
                        // request as is currently active (note use of shared_ptr copy, not
931
                        // `cancelled_`).
932
                        *cancelled = true;
39✔
933
                }
934
                return;
935
        }
936

937
        auto cancelled = cancelled_;
938
        status_ = TransactionStatus::HeaderHandlerCalled;
161✔
939
        CallHandler(header_handler_);
322✔
940
        if (*cancelled) {
161✔
941
                return;
942
        }
943

944
        // We know that a body reader is required here, because of the `response_body_read_ >=
945
        // response_body_length_` check above.
946
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
142✔
947
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
4✔
948
        }
949
}
950

951
void Client::HandleSecondaryRequest() {
5✔
952
        logger_.Debug(
10✔
953
                "Received proxy response: "
954
                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
10✔
955
                + string {response_data_.http_response_parser_->get().reason()});
20✔
956

957
        request_ = std::move(secondary_req_);
958

959
        if (response_data_.http_response_parser_->get().result_int() != StatusOK) {
5✔
960
                auto err = MakeError(
961
                        ProxyError,
962
                        "Proxy returned unexpected response: "
963
                                + to_string(response_data_.http_response_parser_->get().result_int()) + " "
4✔
964
                                + string {response_data_.http_response_parser_->get().reason()});
6✔
965
                CallErrorHandler(err, request_, header_handler_);
4✔
966
                return;
967
        }
968

969
        if (response_body_length_ != 0 || response_data_.http_response_parser_->chunked()) {
3✔
970
                auto err = MakeError(ProxyError, "Body not allowed in proxy response");
×
971
                CallErrorHandler(err, request_, header_handler_);
×
972
                return;
973
        }
974

975
        // We are connected. Now repeat the request cycle with the original request. Pretend
976
        // we were just connected.
977

978
        assert(request_->GetProtocol() == "https");
979

980
        // Make sure that no data is "lost" inside the buffering mechanism, since when switching to
981
        // a different layer, this will get out of sync.
982
        assert(response_data_.response_buffer_->size() == 0);
983

984
        switch (socket_mode_) {
3✔
985
        case SocketMode::TlsTls:
×
986
                // Should never get here, because this is the only place where TlsTls mode
987
                // is supposed to be turned on.
988
                assert(false);
989
                CallErrorHandler(
×
990
                        error::MakeError(
×
991
                                error::ProgrammingError,
992
                                "Any other mode than Tls is not valid when handling secondary request"),
×
993
                        request_,
×
994
                        header_handler_);
×
995
                break;
×
996
        case SocketMode::Tls:
1✔
997
                // Upgrade to TLS inside TLS.
998
                socket_mode_ = SocketMode::TlsTls;
1✔
999
                HandshakeHandler(*stream_, error_code {}, stream_->lowest_layer().remote_endpoint());
1✔
1000
                break;
1✔
1001
        case SocketMode::Plain:
2✔
1002
                // Upgrade to TLS.
1003
                socket_mode_ = SocketMode::Tls;
2✔
1004
                HandshakeHandler(
2✔
1005
                        stream_->next_layer(), error_code {}, stream_->lowest_layer().remote_endpoint());
2✔
1006
                break;
2✔
1007
        }
1008
}
1009

1010
// Reads the next part of the response body into [`start`, `end`) and reports the result to
// `handler`. Once the whole body has been read, `handler` is called with 0 and, the first
// time this happens, the body handler is called as well.
void Client::AsyncReadNextBodyPart(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));

        if (status_ == TransactionStatus::ReaderCreated) {
                status_ = TransactionStatus::BodyReadingInProgress;
        }

        if (AtLeast(status_, TransactionStatus::BodyReadingFinished)) {
                auto cancelled = cancelled_;
                handler(0);

                const bool finish_now =
                        !*cancelled && status_ == TransactionStatus::BodyReadingFinished;
                if (finish_now) {
                        status_ = TransactionStatus::Done;
                        CallHandler(body_handler_);

                        // After body handler has run, set the request to cancelled. The body
                        // handler may have made a new request, so this is not necessarily the same
                        // request as is currently active (note use of shared_ptr copy, not
                        // `cancelled_`).
                        *cancelled = true;
                }
                return;
        }

        // Remember the caller's destination; ReadBodyHandler() copies into it on completion.
        reader_buf_start_ = start;
        reader_buf_end_ = end;
        reader_handler_ = handler;

        // Data is first read into `body_buffer_`, so never ask for more than fits in either
        // buffer.
        const size_t requested = end - start;
        const size_t chunk_size = min(body_buffer_.size(), requested);

        auto &body = response_data_.http_response_parser_->get().body();
        body.data = body_buffer_.data();
        body.size = chunk_size;

        // Both are captured by copy: the flag is checked on completion, and the copy of
        // `response_data_` presumably keeps the buffer and parser alive while the read runs.
        auto &cancelled = cancelled_;
        auto &pinned_response_data = response_data_;
        auto on_read = [this, cancelled, pinned_response_data](
                               const error_code &read_ec, size_t num_read) {
                if (*cancelled) {
                        return;
                }
                ReadBodyHandler(read_ec, num_read);
        };

        auto &buffer = *response_data_.response_buffer_;
        auto &parser = *response_data_.http_response_parser_;
        switch (socket_mode_) {
        case SocketMode::Plain:
                http::async_read_some(stream_->next_layer().next_layer(), buffer, parser, on_read);
                break;
        case SocketMode::Tls:
                http::async_read_some(stream_->next_layer(), buffer, parser, on_read);
                break;
        case SocketMode::TlsTls:
                http::async_read_some(*stream_, buffer, parser, on_read);
                break;
        }
}
1076

1077
// Completion handler for one body read. Copies the received bytes from `body_buffer_` into
// the destination registered by AsyncReadNextBodyPart(), reports the result to the
// registered reader handler, and on error also reports to the body handler.
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
        if (num_read > 0) {
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
                response_body_read_ += num_read;
        }

        if (ec == http::make_error_code(http::error::need_buffer)) {
                // This can be ignored. We always reset the buffer between reads anyway.
                ec = error_code();
        }

        assert(reader_handler_);

        if (response_body_read_ >= response_body_length_) {
                status_ = TransactionStatus::BodyReadingFinished;
        }

        // Keep our own copy of the flag, since the reader handler may cancel this request.
        auto cancelled = cancelled_;

        size_t buf_size = reader_buf_end_ - reader_buf_start_;
        size_t smallest = min(num_read, buf_size);
        copy_n(body_buffer_.begin(), smallest, reader_buf_start_);

        // Invoke a local copy of the handler rather than the member itself. The handler will
        // typically request the next read right away, which assigns a new callable to
        // `reader_handler_`; doing that while the member is executing would destroy the
        // callable (and its captures) underneath the running call.
        auto handler = reader_handler_;
        if (ec) {
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
                handler(expected::unexpected(err));
        } else {
                handler(smallest);
        }

        if (!*cancelled && ec) {
                CallErrorHandler(ec, request_, body_handler_);
                return;
        }
}
1111

1112
void Client::Cancel() {
195✔
1113
        auto cancelled = cancelled_;
1114

1115
        if (!*cancelled) {
195✔
1116
                auto err =
1117
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
246✔
1118
                switch (status_) {
123✔
1119
                case TransactionStatus::None:
1✔
1120
                        CallErrorHandler(err, request_, header_handler_);
1✔
1121
                        break;
1✔
1122
                case TransactionStatus::HeaderHandlerCalled:
120✔
1123
                case TransactionStatus::ReaderCreated:
1124
                case TransactionStatus::BodyReadingInProgress:
1125
                case TransactionStatus::BodyReadingFinished:
1126
                        CallErrorHandler(err, request_, body_handler_);
120✔
1127
                        break;
120✔
1128
                case TransactionStatus::Replying:
1129
                case TransactionStatus::SwitchingProtocol:
1130
                        // Not used by client.
1131
                        assert(false);
1132
                        break;
1133
                case TransactionStatus::BodyHandlerCalled:
1134
                case TransactionStatus::Done:
1135
                        break;
1136
                }
1137
        }
1138

1139
        if (!*cancelled) {
195✔
1140
                DoCancel();
2✔
1141
        }
1142
}
195✔
1143

1144
void Client::DoCancel() {
334✔
1145
        resolver_.cancel();
334✔
1146
        if (stream_) {
334✔
1147
                stream_->lowest_layer().cancel();
66✔
1148
                stream_->lowest_layer().close();
66✔
1149
                stream_.reset();
66✔
1150
        }
1151

1152
        request_.reset();
334✔
1153
        response_.reset();
334✔
1154

1155
        // Reset logger to no connection.
1156
        logger_ = log::Logger(logger_name_);
334✔
1157

1158
        // Set cancel state and then make a new one. Those who are interested should have their own
1159
        // pointer to the old one.
1160
        *cancelled_ = true;
334✔
1161
        cancelled_ = make_shared<bool>(true);
334✔
1162
}
334✔
1163

1164
Stream::Stream(Server &server) :
418✔
1165
        server_ {server},
1166
        logger_ {"http"},
1167
        cancelled_(make_shared<bool>(true)),
418✔
1168
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
418✔
1169
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,254✔
1170
        request_data_.request_buffer_ = make_shared<beast::flat_buffer>();
836✔
1171

1172
        // This is equivalent to:
1173
        //   request_data_.request_buffer_.reserve(body_buffer_.size());
1174
        // but compatible with Boost 1.67.
1175
        request_data_.request_buffer_->prepare(
1176
                body_buffer_.size() - request_data_.request_buffer_->size());
418✔
1177

1178
        request_data_.http_request_parser_ = make_shared<http::request_parser<http::buffer_body>>();
836✔
1179

1180
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
1181
        // they do, they should be handled higher up in the application logic.
1182
        //
1183
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
1184
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
1185
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
1186
        // possible value instead.
1187
        request_data_.http_request_parser_->body_limit(numeric_limits<uint64_t>::max());
1188
}
418✔
1189

1190
// Closes the socket if it is still open and flags the stream as cancelled, so that
// completion handlers holding the old flag become no-ops.
Stream::~Stream() {
        this->DoCancel();
}
418✔
1193

1194
void Stream::Cancel() {
7✔
1195
        auto cancelled = cancelled_;
1196

1197
        if (!*cancelled) {
7✔
1198
                auto err =
1199
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
1200
                switch (status_) {
7✔
1201
                case TransactionStatus::None:
×
1202
                        CallErrorHandler(err, request_, server_.header_handler_);
×
1203
                        break;
×
1204
                case TransactionStatus::HeaderHandlerCalled:
5✔
1205
                case TransactionStatus::ReaderCreated:
1206
                case TransactionStatus::BodyReadingInProgress:
1207
                case TransactionStatus::BodyReadingFinished:
1208
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
1209
                        break;
5✔
1210
                case TransactionStatus::BodyHandlerCalled:
×
1211
                        // In between body handler and reply finished. No one to handle the status
1212
                        // here.
1213
                        server_.RemoveStream(shared_from_this());
×
1214
                        break;
×
1215
                case TransactionStatus::Replying:
1✔
1216
                        CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1217
                        break;
1✔
1218
                case TransactionStatus::SwitchingProtocol:
1✔
1219
                        CallErrorHandler(err, request_, switch_protocol_handler_);
3✔
1220
                        break;
1✔
1221
                case TransactionStatus::Done:
1222
                        break;
1223
                }
1224
        }
1225

1226
        if (!*cancelled) {
7✔
1227
                DoCancel();
×
1228
        }
1229
}
7✔
1230

1231
void Stream::DoCancel() {
593✔
1232
        if (socket_.is_open()) {
593✔
1233
                socket_.cancel();
204✔
1234
                socket_.close();
204✔
1235
        }
1236

1237
        // Set cancel state and then make a new one. Those who are interested should have their own
1238
        // pointer to the old one.
1239
        *cancelled_ = true;
593✔
1240
        cancelled_ = make_shared<bool>(true);
593✔
1241
}
593✔
1242

1243
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
1244
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1245
}
×
1246

1247
void Stream::CallErrorHandler(
×
1248
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
1249
        *cancelled_ = true;
×
1250
        cancelled_ = make_shared<bool>(true);
×
1251
        status_ = TransactionStatus::Done;
×
1252
        handler(expected::unexpected(err.WithContext(
×
1253
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
1254

1255
        server_.RemoveStream(shared_from_this());
×
1256
}
×
1257

1258
void Stream::CallErrorHandler(
2✔
1259
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1260
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
6✔
1261
}
2✔
1262

1263
void Stream::CallErrorHandler(
9✔
1264
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1265
        *cancelled_ = true;
9✔
1266
        cancelled_ = make_shared<bool>(true);
9✔
1267
        status_ = TransactionStatus::Done;
9✔
1268
        handler(
9✔
1269
                req,
1270
                err.WithContext(
9✔
1271
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
27✔
1272

1273
        server_.RemoveStream(shared_from_this());
9✔
1274
}
9✔
1275

1276
void Stream::CallErrorHandler(
4✔
1277
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
1278
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
1279
}
4✔
1280

1281
void Stream::CallErrorHandler(
7✔
1282
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
1283
        *cancelled_ = true;
7✔
1284
        cancelled_ = make_shared<bool>(true);
7✔
1285
        status_ = TransactionStatus::Done;
7✔
1286
        handler(err.WithContext(
14✔
1287
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
14✔
1288

1289
        server_.RemoveStream(shared_from_this());
7✔
1290
}
7✔
1291

1292
void Stream::CallErrorHandler(
×
1293
        const error_code &ec, const RequestPtr &req, SwitchProtocolHandler handler) {
1294
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1295
}
×
1296

1297
void Stream::CallErrorHandler(
1✔
1298
        const error::Error &err, const RequestPtr &req, SwitchProtocolHandler handler) {
1299
        *cancelled_ = true;
1✔
1300
        cancelled_ = make_shared<bool>(true);
1✔
1301
        status_ = TransactionStatus::Done;
1✔
1302
        handler(expected::unexpected(err.WithContext(
2✔
1303
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
4✔
1304

1305
        server_.RemoveStream(shared_from_this());
1✔
1306
}
1✔
1307

1308
void Stream::AcceptHandler(const error_code &ec) {
212✔
1309
        if (ec) {
212✔
1310
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
1311
                return;
×
1312
        }
1313

1314
        auto ip = socket_.remote_endpoint().address().to_string();
424✔
1315

1316
        // Use IP as context for logging.
1317
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
212✔
1318

1319
        logger_.Debug("Accepted connection.");
424✔
1320

1321
        request_.reset(new IncomingRequest(*this, cancelled_));
424✔
1322

1323
        request_->address_.host = ip;
212✔
1324

1325
        *cancelled_ = false;
1326

1327
        ReadHeader();
212✔
1328
}
1329

1330
void Stream::ReadHeader() {
212✔
1331
        auto &cancelled = cancelled_;
1332
        auto &request_data = request_data_;
212✔
1333

1334
        http::async_read_some(
424✔
1335
                socket_,
212✔
1336
                *request_data_.request_buffer_,
1337
                *request_data_.http_request_parser_,
1338
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
212✔
1339
                        if (!*cancelled) {
212✔
1340
                                ReadHeaderHandler(ec, num_read);
212✔
1341
                        }
1342
                });
212✔
1343
}
212✔
1344

1345
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
212✔
1346
        if (num_read > 0) {
212✔
1347
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
424✔
1348
        }
1349

1350
        if (ec) {
212✔
1351
                CallErrorHandler(ec, request_, server_.header_handler_);
×
1352
                return;
175✔
1353
        }
1354

1355
        if (!request_data_.http_request_parser_->is_header_done()) {
212✔
1356
                ReadHeader();
×
1357
                return;
×
1358
        }
1359

1360
        auto method_result = BeastVerbToMethod(
1361
                request_data_.http_request_parser_->get().base().method(),
1362
                string {request_data_.http_request_parser_->get().base().method_string()});
424✔
1363
        if (!method_result) {
212✔
1364
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
1365
                return;
×
1366
        }
1367
        request_->method_ = method_result.value();
212✔
1368
        request_->address_.path = string(request_data_.http_request_parser_->get().base().target());
212✔
1369

1370
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
212✔
1371

1372
        string debug_str;
1373
        for (auto header = request_data_.http_request_parser_->get().cbegin();
366✔
1374
                 header != request_data_.http_request_parser_->get().cend();
578✔
1375
                 header++) {
1376
                request_->headers_[string {header->name_string()}] = string {header->value()};
1,098✔
1377
                if (logger_.Level() >= log::LogLevel::Debug) {
366✔
1378
                        debug_str += string {header->name_string()};
313✔
1379
                        debug_str += ": ";
313✔
1380
                        debug_str += string {header->value()};
313✔
1381
                        debug_str += "\n";
313✔
1382
                }
1383
        }
1384

1385
        logger_.Debug("Received headers:\n" + debug_str);
424✔
1386
        debug_str.clear();
1387

1388
        if (request_data_.http_request_parser_->chunked()) {
212✔
1389
                auto cancelled = cancelled_;
1390
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
1391
                server_.header_handler_(request_);
2✔
1392
                if (!*cancelled) {
1✔
1393
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
1394
                        CallErrorHandler(err, request_, server_.body_handler_);
2✔
1395
                }
1396
                return;
1397
        }
1398

1399
        auto content_length = request_data_.http_request_parser_->content_length();
211✔
1400
        if (content_length) {
211✔
1401
                request_body_length_ = content_length.value();
38✔
1402
        } else {
1403
                request_body_length_ = 0;
173✔
1404
        }
1405
        request_body_read_ = 0;
211✔
1406

1407
        if (request_body_read_ >= request_body_length_) {
211✔
1408
                auto cancelled = cancelled_;
1409
                status_ = TransactionStatus::HeaderHandlerCalled;
173✔
1410
                server_.header_handler_(request_);
346✔
1411
                if (!*cancelled) {
173✔
1412
                        status_ = TransactionStatus::BodyHandlerCalled;
173✔
1413
                        CallBodyHandler();
173✔
1414
                }
1415
                return;
1416
        }
1417

1418
        auto cancelled = cancelled_;
1419
        status_ = TransactionStatus::HeaderHandlerCalled;
38✔
1420
        server_.header_handler_(request_);
76✔
1421
        if (*cancelled) {
38✔
1422
                return;
1423
        }
1424

1425
        // We know that a body reader is required here, because of the `request_body_read_ >=
1426
        // request_body_length_` check above.
1427
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
37✔
1428
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
2✔
1429
        }
1430
}
1431

1432
// Reads the next part of the request body into `[start, end)` and reports the outcome through
// `handler`.
//
// While body reading is in progress, an asynchronous read is started and `ReadBodyHandler()`
// delivers the data. Otherwise `handler` is called with 0 right away, and if the body has been
// read completely, the body handler is called afterwards.
void Stream::AsyncReadNextBodyPart(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));

        // First read on a freshly created reader.
        if (status_ == TransactionStatus::ReaderCreated) {
                status_ = TransactionStatus::BodyReadingInProgress;
        }

        if (status_ != TransactionStatus::BodyReadingInProgress) {
                // Copy the token first: `handler` is user code and may cancel the stream.
                auto token = cancelled_;
                handler(0);
                if (!*token && status_ == TransactionStatus::BodyReadingFinished) {
                        status_ = TransactionStatus::BodyHandlerCalled;
                        CallBodyHandler();
                }
                return;
        }

        // Remember the destination; `ReadBodyHandler()` copies into it.
        reader_buf_start_ = start;
        reader_buf_end_ = end;
        reader_handler_ = handler;

        // Never read more than either the internal buffer or the destination can hold.
        const size_t wanted = end - start;
        const size_t capacity = min(body_buffer_.size(), wanted);

        auto &beast_body = request_data_.http_request_parser_->get().body();
        beast_body.data = body_buffer_.data();
        beast_body.size = capacity;

        auto &token = cancelled_;
        // Captured by copy below but not used inside the callback; presumably this keeps the
        // buffer and parser alive while the read is pending.
        auto &keep_alive = request_data_;

        auto on_read = [this, token, keep_alive](const error_code &ec, size_t num_read) {
                if (*token) {
                        return;
                }
                ReadBodyHandler(ec, num_read);
        };

        http::async_read_some(
                socket_,
                *request_data_.request_buffer_,
                *request_data_.http_request_parser_,
                std::move(on_read));
}
1472

1473
// Completion handler for the read started by `AsyncReadNextBodyPart()`.
//
// Updates the read counter and status, copies the received data into the reader's buffer and
// calls the reader's handler with either the number of bytes or the error. On error, the
// server's body handler is notified as well, unless the stream was cancelled by the reader's
// handler.
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
        if (num_read > 0) {
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
                request_body_read_ += num_read;
        }

        if (ec == http::make_error_code(http::error::need_buffer)) {
                // Not a real error for us: the buffer is set up anew before every read.
                ec.clear();
        }

        assert(reader_handler_);

        if (request_body_read_ >= request_body_length_) {
                status_ = TransactionStatus::BodyReadingFinished;
        }

        // Copy the token before handing control to the reader's handler, which is user code.
        auto token = cancelled_;

        const size_t dest_size = reader_buf_end_ - reader_buf_start_;
        const size_t to_copy = min(num_read, dest_size);
        copy_n(body_buffer_.begin(), to_copy, reader_buf_start_);

        if (!ec) {
                reader_handler_(to_copy);
                return;
        }

        auto err = error::Error(ec.default_error_condition(), "Could not read body");
        reader_handler_(expected::unexpected(err));

        if (!*token) {
                CallErrorHandler(ec, request_, server_.body_handler_);
        }
}
1507

1508
// Starts sending the response: prepares the Beast response objects, stores the handler to call
// when the reply is finished, and writes the response header asynchronously.
// `WriteHeaderHandler()` takes over afterwards, unless the stream has been cancelled.
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
        SetupResponse();

        reply_finished_handler_ = reply_finished_handler;

        auto &token = cancelled_;
        // Captured by copy below but not used inside the callback; presumably this keeps the
        // response and serializer alive while the write is pending.
        auto &keep_alive = response_data_;

        auto on_written = [this, token, keep_alive](const error_code &ec, size_t num_written) {
                if (*token) {
                        return;
                }
                WriteHeaderHandler(ec, num_written);
        };

        http::async_write_header(
                socket_, *response_data_.http_response_serializer_, std::move(on_written));
}
190✔
1525

1526
void Stream::SetupResponse() {
199✔
1527
        auto response = maybe_response_.lock();
199✔
1528
        // Only called from existing responses, so this should always be true.
1529
        assert(response);
1530

1531
        assert(status_ == TransactionStatus::BodyHandlerCalled);
1532
        status_ = TransactionStatus::Replying;
199✔
1533

1534
        // From here on we take shared ownership.
1535
        response_ = response;
1536

1537
        response_data_.http_response_ = make_shared<http::response<http::buffer_body>>();
398✔
1538

1539
        for (const auto &header : response->headers_) {
417✔
1540
                response_data_.http_response_->base().set(header.first, header.second);
218✔
1541
        }
1542

1543
        response_data_.http_response_->result(response->GetStatusCode());
199✔
1544
        response_data_.http_response_->reason(response->GetStatusMessage());
398✔
1545

1546
        response_data_.http_response_serializer_ =
1547
                make_shared<http::response_serializer<http::buffer_body>>(*response_data_.http_response_);
398✔
1548
}
199✔
1549

1550
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
189✔
1551
        if (num_written > 0) {
189✔
1552
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
378✔
1553
        }
1554

1555
        if (ec) {
189✔
1556
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1557
                return;
34✔
1558
        }
1559

1560
        auto header = response_->GetHeader("Content-Length");
378✔
1561
        if (!header || header.value() == "0") {
189✔
1562
                FinishReply();
33✔
1563
                return;
1564
        }
1565

1566
        auto length = common::StringToLongLong(header.value());
156✔
1567
        if (!length || length.value() < 0) {
156✔
1568
                auto err = error::Error(
1569
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
1570
                CallErrorHandler(err, request_, reply_finished_handler_);
×
1571
                return;
1572
        }
1573

1574
        if (!response_->body_reader_ && !response_->async_body_reader_) {
156✔
1575
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
1576
                CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1577
                return;
1578
        }
1579

1580
        PrepareAndWriteNewBodyBuffer();
155✔
1581
}
1582

1583
void Stream::PrepareAndWriteNewBodyBuffer() {
1,871✔
1584
        // response_->body_reader_ XOR response_->async_body_reader_
1585
        assert(
1586
                (response_->body_reader_ || response_->async_body_reader_)
1587
                && !(response_->body_reader_ && response_->async_body_reader_));
1588

1589
        auto read_handler = [this](io::ExpectedSize read) {
1,872✔
1590
                if (!read) {
1,871✔
1591
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
3✔
1592
                        return;
1✔
1593
                }
1594
                WriteNewBodyBuffer(read.value());
1,870✔
1595
        };
1,871✔
1596

1597
        if (response_->body_reader_) {
1,871✔
1598
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
3,194✔
1599
        } else {
1600
                auto err = response_->async_body_reader_->AsyncRead(
1601
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
274✔
1602
                if (err != error::NoError) {
274✔
1603
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1604
                }
1605
        }
1606
}
1,871✔
1607

1608
void Stream::WriteNewBodyBuffer(size_t size) {
1,870✔
1609
        response_data_.http_response_->body().data = body_buffer_.data();
1,870✔
1610
        response_data_.http_response_->body().size = size;
1,870✔
1611

1612
        if (size > 0) {
1,870✔
1613
                response_data_.http_response_->body().more = true;
1,750✔
1614
        } else {
1615
                response_data_.http_response_->body().more = false;
120✔
1616
        }
1617

1618
        WriteBody();
1,870✔
1619
}
1,870✔
1620

1621
void Stream::WriteBody() {
3,599✔
1622
        auto &cancelled = cancelled_;
1623
        auto &response_data = response_data_;
3,599✔
1624

1625
        http::async_write_some(
7,198✔
1626
                socket_,
3,599✔
1627
                *response_data_.http_response_serializer_,
1628
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
3,561✔
1629
                        if (!*cancelled) {
3,561✔
1630
                                WriteBodyHandler(ec, num_written);
3,561✔
1631
                        }
1632
                });
3,561✔
1633
}
3,599✔
1634

1635
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
3,561✔
1636
        if (num_written > 0) {
3,561✔
1637
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
3,458✔
1638
        }
1639

1640
        if (ec == http::make_error_code(http::error::need_buffer)) {
3,561✔
1641
                // Write next body block.
1642
                PrepareAndWriteNewBodyBuffer();
1,716✔
1643
        } else if (ec) {
1,845✔
1644
                CallErrorHandler(ec, request_, reply_finished_handler_);
12✔
1645
        } else if (num_written > 0) {
1,841✔
1646
                // We are still writing the body.
1647
                WriteBody();
1,729✔
1648
        } else {
1649
                // We are finished.
1650
                FinishReply();
112✔
1651
        }
1652
}
3,561✔
1653

1654
void Stream::FinishReply() {
145✔
1655
        // We are done.
1656
        *cancelled_ = true;
145✔
1657
        cancelled_ = make_shared<bool>(true);
145✔
1658
        status_ = TransactionStatus::Done;
145✔
1659
        // Release ownership of Body reader.
1660
        response_->body_reader_.reset();
145✔
1661
        response_->async_body_reader_.reset();
145✔
1662
        reply_finished_handler_(error::NoError);
145✔
1663
        server_.RemoveStream(shared_from_this());
145✔
1664
}
145✔
1665

1666
// Starts a protocol switch: prepares the response, stores `handler`, and writes the response
// header asynchronously. `SwitchingProtocolHandler()` takes over afterwards, unless the stream
// has been cancelled. Always returns `error::NoError`.
error::Error Stream::AsyncSwitchProtocol(SwitchProtocolHandler handler) {
        SetupResponse();

        switch_protocol_handler_ = handler;
        status_ = TransactionStatus::SwitchingProtocol;

        auto &token = cancelled_;
        // Captured by copy below but not used inside the callback; presumably this keeps the
        // response and serializer alive while the write is pending.
        auto &keep_alive = response_data_;

        auto on_written = [this, token, keep_alive](const error_code &ec, size_t num_written) {
                if (*token) {
                        return;
                }
                SwitchingProtocolHandler(ec, num_written);
        };

        http::async_write_header(
                socket_, *response_data_.http_response_serializer_, std::move(on_written));

        return error::NoError;
}
1686

1687
// Completion handler for the header write started by `AsyncSwitchProtocol()`. On success, the
// TCP socket is moved out of this stream and handed to the switch-protocol handler wrapped in
// a `RawSocket`, and the stream is removed from the server.
void Stream::SwitchingProtocolHandler(error_code ec, size_t num_written) {
        if (num_written > 0) {
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
        }

        if (ec) {
                CallErrorHandler(ec, request_, switch_protocol_handler_);
                return;
        }

        auto tcp_socket = make_shared<tcp::socket>(std::move(socket_));
        auto socket = make_shared<RawSocket<tcp::socket>>(tcp_socket, request_data_.request_buffer_);

        // Take a local copy of the handler: the stream is removed from the server before the
        // handler is called, so no members are accessed after that point.
        auto handler = switch_protocol_handler_;

        // The rest of the connection happens directly on the socket, this stream is finished.
        status_ = TransactionStatus::Done;
        *cancelled_ = true;
        cancelled_ = make_shared<bool>(true);
        server_.RemoveStream(shared_from_this());

        handler(socket);
}
1710

1711
void Stream::CallBodyHandler() {
203✔
1712
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1713
        // it immediately destroys, which would destroy this stream as well. At the end of this
1714
        // function, it's ok to destroy it.
1715
        auto stream_ref = shared_from_this();
1716

1717
        server_.body_handler_(request_, error::NoError);
609✔
1718

1719
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1720
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1721
        // request has not been handled correctly.
1722
        auto response = maybe_response_.lock();
203✔
1723
        if (!response) {
203✔
1724
                logger_.Error("Handler produced no response. Closing stream prematurely.");
4✔
1725
                *cancelled_ = true;
2✔
1726
                cancelled_ = make_shared<bool>(true);
2✔
1727
                server_.RemoveStream(shared_from_this());
6✔
1728
        }
1729
}
203✔
1730

1731
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
226✔
1732
        event_loop_ {event_loop},
1733
        acceptor_(GetAsioIoContext(event_loop_)) {
406✔
1734
}
226✔
1735

1736
// Stops listening and drops all streams when the server goes away.
Server::~Server() {
        this->Cancel();
}
226✔
1739

1740
// Overload taking a plain `RequestHandler` as body handler. It is adapted to the
// `IdentifiedRequestHandler` interface: errors are delivered to `body_handler` as unexpected
// values, successful requests as the request itself.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
        auto adapter = [body_handler](IncomingRequestPtr req, error::Error err) {
                if (err != error::NoError) {
                        body_handler(expected::unexpected(err));
                        return;
                }
                body_handler(req);
        };

        return AsyncServeUrl(url, header_handler, adapter);
}
1751

1752
// Starts listening for HTTP connections on `url` and installs the two handlers.
//
// Only the "http" protocol and URLs without a path (or with "/") are accepted, and the host
// must be an IP address. Returns `error::NoError` on success, or an error describing which
// step failed. If a step fails after the acceptor has been opened by this call, the acceptor
// is closed again so that a later call can start from scratch.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
        auto err = BreakDownUrl(url, address_);
        if (error::NoError != err) {
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
        }

        if (address_.protocol != "http") {
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
        }

        if (address_.path.size() > 0 && address_.path != "/") {
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
        }

        boost::system::error_code ec;
        auto address = asio::ip::make_address(address_.host, ec);
        if (ec) {
                return error::Error(
                        ec.default_error_condition(),
                        "Could not construct endpoint from address " + address_.host);
        }

        asio::ip::tcp::endpoint endpoint(address, address_.port);

        ec.clear();
        acceptor_.open(endpoint.protocol(), ec);
        if (ec) {
                // Do not close here: a failure to open may mean that the acceptor is already open
                // and serving.
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
        }

        // From here on, the acceptor was opened by us. Close it again on failure, otherwise a
        // later attempt to serve would fail when opening it.
        auto fail = [this](const boost::system::error_code &cause, const string &message)
                -> error::Error {
                boost::system::error_code close_ec;
                acceptor_.close(close_ec);
                return error::Error(cause.default_error_condition(), message);
        };

        // Allow address reuse, otherwise we can't re-bind later.
        ec.clear();
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
        if (ec) {
                return fail(ec, "Could not set socket options");
        }

        ec.clear();
        acceptor_.bind(endpoint, ec);
        if (ec) {
                return fail(ec, "Could not bind socket");
        }

        ec.clear();
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
        if (ec) {
                return fail(ec, "Could not start listening");
        }

        header_handler_ = header_handler;
        body_handler_ = body_handler;

        PrepareNewStream();

        return error::NoError;
}
1809

1810
void Server::Cancel() {
241✔
1811
        if (acceptor_.is_open()) {
241✔
1812
                acceptor_.cancel();
206✔
1813
                acceptor_.close();
206✔
1814
        }
1815
        streams_.clear();
1816
}
241✔
1817

1818
uint16_t Server::GetPort() const {
13✔
1819
        return acceptor_.local_endpoint().port();
13✔
1820
}
1821

1822
// Returns a URL for the port the server listens on. The host part is always 127.0.0.1.
string Server::GetUrl() const {
        const string port = to_string(GetPort());
        return "http://127.0.0.1:" + port;
}
1825

1826
// Creates the response object for `req` and registers it (as a weak reference) with the
// request's stream. Returns `StreamCancelledError` if the stream has been cancelled.
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
        const auto &token = req->cancelled_;
        if (*token) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
        }

        auto &stream = req->stream_;
        OutgoingResponsePtr response {new OutgoingResponse(stream, token)};
        stream.maybe_response_ = response;
        return response;
}
1834

1835
// Starts sending `resp` on its stream. `reply_finished_handler` is passed on to the stream.
// Returns `StreamCancelledError` if the stream has been cancelled, `error::NoError` otherwise.
error::Error Server::AsyncReply(
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
        const bool stream_cancelled = *resp->cancelled_;
        if (stream_cancelled) {
                return MakeError(StreamCancelledError, "Cannot send response");
        }

        auto &stream = resp->stream_;
        stream.AsyncReply(reply_finished_handler);
        return error::NoError;
}
1844

1845
// Creates an asynchronous reader for the body of `req`.
//
// Fails if the stream has been cancelled, if the transaction is not in the
// `HeaderHandlerCalled` state, or if the request has no body. On success, the stream's status
// becomes `ReaderCreated`.
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
        const auto &token = req->cancelled_;
        if (*token) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
        }

        auto &stream = req->stream_;

        const bool in_header_handler = (stream.status_ == TransactionStatus::HeaderHandlerCalled);
        if (!in_header_handler) {
                auto wrong_state = error::Error(
                        make_error_condition(errc::operation_in_progress),
                        "MakeBodyAsyncReader called while reading is in progress");
                return expected::unexpected(wrong_state);
        }

        if (stream.request_body_length_ == 0) {
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
        }

        stream.status_ = TransactionStatus::ReaderCreated;
        return make_shared<BodyAsyncReader<Stream>>(stream, token);
}
1864

1865
// Starts a protocol switch on the stream of `resp`; `handler` is passed on to the stream.
// Returns `StreamCancelledError` if the stream has been cancelled, in line with `AsyncReply()`,
// otherwise whatever the stream returns.
error::Error Server::AsyncSwitchProtocol(OutgoingResponsePtr resp, SwitchProtocolHandler handler) {
        // Do not touch the stream of a cancelled response.
        if (*resp->cancelled_) {
                return MakeError(StreamCancelledError, "Cannot switch protocol");
        }

        return resp->stream_.AsyncSwitchProtocol(handler);
}
1868

1869
void Server::PrepareNewStream() {
418✔
1870
        StreamPtr new_stream {new Stream(*this)};
418✔
1871
        streams_.insert(new_stream);
1872
        AsyncAccept(new_stream);
836✔
1873
}
418✔
1874

1875
// Waits asynchronously for a connection on the socket of `stream`. Once one has been accepted,
// the stream starts handling it and a new stream is prepared for the next connection. On
// error, the problem is logged and no new accept is started.
void Server::AsyncAccept(StreamPtr stream) {
        auto on_accept = [this, stream](const error_code &ec) {
                if (!ec) {
                        stream->AcceptHandler(ec);
                        this->PrepareNewStream();
                        return;
                }

                log::Error("Could not accept connection: " + ec.message());
        };

        acceptor_.async_accept(stream->socket_, std::move(on_accept));
}
418✔
1887

1888
// Unregisters `stream` from the server and cancels it. The by-value `stream` argument keeps
// the object alive for the `DoCancel()` call even if the server held the last other reference.
void Server::RemoveStream(StreamPtr stream) {
        streams_.erase(stream);
        stream->DoCancel();
}
175✔
1893

1894
} // namespace http
1895
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc