• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1033353997

11 Oct 2023 01:43PM UTC coverage: 79.99% (-0.2%) from 80.166%
1033353997

push

gitlab-ci

oleorhagen
style: Run clang-format on the whole repository

Signed-off-by: Ole Petter <ole.orhagen@northern.tech>

1 of 1 new or added line in 1 file covered. (100.0%)

6492 of 8116 relevant lines covered (79.99%)

9901.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio/ip/tcp.hpp>
20
#include <boost/asio/ssl/verify_mode.hpp>
21
#include <boost/asio.hpp>
22

23
#include <common/common.hpp>
24

25
namespace mender {
26
namespace http {
27

28
namespace common = mender::common;
29

30
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
31
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
32
const unsigned int BeastHttpVersion = 11;
33

34
namespace asio = boost::asio;
35
namespace http = boost::beast::http;
36

37
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
38

39
static http::verb MethodToBeastVerb(Method method) {
223✔
40
        switch (method) {
223✔
41
        case Method::GET:
42
                return http::verb::get;
43
        case Method::HEAD:
44
                return http::verb::head;
45
        case Method::POST:
46
                return http::verb::post;
47
        case Method::PUT:
48
                return http::verb::put;
49
        case Method::PATCH:
50
                return http::verb::patch;
51
        case Method::CONNECT:
52
                return http::verb::connect;
53
        case Method::Invalid:
54
                // Fallthrough to end (no-op).
55
                break;
56
        }
57
        // Don't use "default" case. This should generate a warning if we ever add any methods. But
58
        // still assert here for safety.
59
        assert(false);
60
        return http::verb::get;
61
}
62

63
static expected::expected<Method, error::Error> BeastVerbToMethod(
218✔
64
        http::verb verb, const string &verb_string) {
65
        switch (verb) {
218✔
66
        case http::verb::get:
176✔
67
                return Method::GET;
68
        case http::verb::head:
×
69
                return Method::HEAD;
70
        case http::verb::post:
20✔
71
                return Method::POST;
72
        case http::verb::put:
22✔
73
                return Method::PUT;
74
        case http::verb::patch:
×
75
                return Method::PATCH;
76
        case http::verb::connect:
×
77
                return Method::CONNECT;
78
        default:
×
79
                return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
×
80
        }
81
}
82

83
template <typename StreamType>
84
class BodyAsyncReader : virtual public io::AsyncReader {
85
public:
86
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
145✔
87
                stream_ {stream},
88
                cancelled_ {cancelled} {
290✔
89
        }
145✔
90
        ~BodyAsyncReader() {
44✔
91
                Cancel();
44✔
92
        }
88✔
93

94
        error::Error AsyncRead(
2,046✔
95
                vector<uint8_t>::iterator start,
96
                vector<uint8_t>::iterator end,
97
                io::AsyncIoHandler handler) override {
98
                if (*cancelled_) {
2,046✔
99
                        return error::MakeError(
×
100
                                error::ProgrammingError,
101
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
102
                }
103
                stream_.AsyncReadNextBodyPart(start, end, handler);
2,046✔
104
                return error::NoError;
2,046✔
105
        }
106

107
        void Cancel() override {
46✔
108
                if (!*cancelled_) {
46✔
109
                        stream_.Cancel();
4✔
110
                }
111
        }
46✔
112

113
private:
114
        StreamType &stream_;
115
        shared_ptr<bool> cancelled_;
116

117
        friend class Client;
118
        friend class Server;
119
};
120

121
template <typename StreamType>
122
class RawSocket : virtual public io::AsyncReadWriter {
123
public:
124
        RawSocket(shared_ptr<StreamType> stream, shared_ptr<beast::flat_buffer> buffered) :
×
125
                destroying_ {make_shared<bool>(false)},
×
126
                stream_ {stream},
127
                buffered_ {buffered} {
×
128
                // If there are no buffered bytes, then we don't need it.
129
                if (buffered_ && buffered_->size() == 0) {
×
130
                        buffered_.reset();
×
131
                }
132
        }
×
133

134
        ~RawSocket() {
15✔
135
                *destroying_ = true;
15✔
136
                Cancel();
15✔
137
        }
30✔
138

139
        error::Error AsyncRead(
320✔
140
                vector<uint8_t>::iterator start,
141
                vector<uint8_t>::iterator end,
142
                io::AsyncIoHandler handler) override {
143
                // If we have prebuffered bytes, which can happen if the HTTP parser read the
144
                // header and parts of the body in one block, return those first.
145
                if (buffered_) {
320✔
146
                        return DrainPrebufferedData(start, end, handler);
8✔
147
                }
148

149
                read_buffer_ = asio::buffer(&*start, end - start);
316✔
150
                auto &destroying = destroying_;
151
                stream_->async_read_some(
632✔
152
                        read_buffer_,
316✔
153
                        [destroying, handler](const boost::system::error_code &ec, size_t num_read) {
313✔
154
                                if (*destroying) {
313✔
155
                                        return;
156
                                }
157

158
                                if (ec == asio::error::operation_aborted) {
313✔
159
                                        handler(expected::unexpected(error::Error(
12✔
160
                                                make_error_condition(errc::operation_canceled),
6✔
161
                                                "Could not read from socket")));
162
                                } else if (ec) {
310✔
163
                                        handler(expected::unexpected(
12✔
164
                                                error::Error(ec.default_error_condition(), "Could not read from socket")));
12✔
165
                                } else {
166
                                        handler(num_read);
608✔
167
                                }
168
                        });
169
                return error::NoError;
316✔
170
        }
171

172
        error::Error AsyncWrite(
309✔
173
                vector<uint8_t>::const_iterator start,
174
                vector<uint8_t>::const_iterator end,
175
                io::AsyncIoHandler handler) override {
176
                write_buffer_ = asio::buffer(&*start, end - start);
309✔
177
                auto &destroying = destroying_;
178
                stream_->async_write_some(
618✔
179
                        write_buffer_,
309✔
180
                        [destroying, handler](const boost::system::error_code &ec, size_t num_written) {
306✔
181
                                if (*destroying) {
306✔
182
                                        return;
183
                                }
184

185
                                if (ec == asio::error::operation_aborted) {
306✔
186
                                        handler(expected::unexpected(error::Error(
×
187
                                                make_error_condition(errc::operation_canceled),
×
188
                                                "Could not write to socket")));
189
                                } else if (ec) {
306✔
190
                                        handler(expected::unexpected(
×
191
                                                error::Error(ec.default_error_condition(), "Could not write to socket")));
×
192
                                } else {
193
                                        handler(num_written);
612✔
194
                                }
195
                        });
196
                return error::NoError;
309✔
197
        }
198

199
        void Cancel() override {
28✔
200
                if (stream_->lowest_layer().is_open()) {
28✔
201
                        stream_->lowest_layer().cancel();
15✔
202
                        stream_->lowest_layer().close();
15✔
203
                }
204
        }
28✔
205

206
private:
207
        error::Error DrainPrebufferedData(
4✔
208
                vector<uint8_t>::iterator start,
209
                vector<uint8_t>::iterator end,
210
                io::AsyncIoHandler handler) {
211
                size_t to_copy = min(static_cast<size_t>(end - start), buffered_->size());
8✔
212
                copy_n(static_cast<const uint8_t *>(buffered_->cdata().data()), to_copy, start);
4✔
213
                buffered_->consume(to_copy);
4✔
214
                if (buffered_->size() == 0) {
4✔
215
                        // We don't need it anymore.
216
                        buffered_.reset();
4✔
217
                }
218
                handler(to_copy);
4✔
219
                return error::NoError;
4✔
220
        }
221

222
        shared_ptr<bool> destroying_;
223
        shared_ptr<StreamType> stream_;
224
        shared_ptr<beast::flat_buffer> buffered_;
225
        asio::mutable_buffer read_buffer_;
226
        asio::const_buffer write_buffer_;
227
};
228

229
Client::Client(
406✔
230
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
231
        event_loop_ {event_loop},
232
        logger_name_ {logger_name},
233
        cancelled_ {make_shared<bool>(true)},
406✔
234
        disable_keep_alive_ {client.disable_keep_alive},
406✔
235
        resolver_(GetAsioIoContext(event_loop)),
236
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,426✔
237
        ssl_ctx_.set_verify_mode(client.skip_verify ? ssl::verify_none : ssl::verify_peer);
759✔
238

239
        if (client.client_cert_path != "" and client.client_cert_key_path != "") {
406✔
240
                ssl_ctx_.set_options(boost::asio::ssl::context::default_workarounds);
1✔
241
                ssl_ctx_.use_certificate_file(client.client_cert_path, boost::asio::ssl::context_base::pem);
1✔
242
                ssl_ctx_.use_private_key_file(
1✔
243
                        client.client_cert_key_path, boost::asio::ssl::context_base::pem);
244
        }
245

246
        beast::error_code ec {};
406✔
247
        ssl_ctx_.set_default_verify_paths(ec); // Load the default CAs
406✔
248
        if (ec) {
406✔
249
                log::Error("Failed to load the SSL default directory");
×
250
        }
251
        if (client.server_cert_path != "") {
406✔
252
                ssl_ctx_.load_verify_file(client.server_cert_path, ec);
5✔
253
                if (ec) {
5✔
254
                        log::Error("Failed to load the server certificate!");
×
255
                }
256
        }
257
}
406✔
258

259
Client::~Client() {
1,624✔
260
        if (!*cancelled_) {
406✔
261
                logger_.Warning("Client destroyed while request is still active!");
26✔
262
        }
263
        DoCancel();
406✔
264
}
406✔
265

266
error::Error Client::AsyncCall(
241✔
267
        OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
268
        if (!*cancelled_ && status_ != TransactionStatus::Done) {
241✔
269
                return error::Error(
270
                        make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
×
271
        }
272

273
        if (req->address_.protocol == "" || req->address_.host == "" || req->address_.port < 0) {
241✔
274
                return error::MakeError(error::ProgrammingError, "Request is not ready");
4✔
275
        }
276

277
        if (!header_handler || !body_handler) {
239✔
278
                return error::MakeError(
279
                        error::ProgrammingError, "header_handler and body_handler can not be nullptr");
2✔
280
        }
281

282
        if (req->address_.protocol != "http" && req->address_.protocol != "https") {
238✔
283
                return error::Error(
284
                        make_error_condition(errc::protocol_not_supported), req->address_.protocol);
1✔
285
        }
286

287
        if (req->address_.protocol == "https") {
237✔
288
                is_https_ = true;
6✔
289
        }
290

291
        logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));
237✔
292

293
        // NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
294
        // request to route to our k8s cluster. Set this in all cases.
295
        req->SetHeader("HOST", req->address_.host);
474✔
296

297
        request_ = req;
298
        header_handler_ = header_handler;
237✔
299
        body_handler_ = body_handler;
237✔
300
        status_ = TransactionStatus::None;
237✔
301

302
        cancelled_ = make_shared<bool>(false);
237✔
303

304
        auto &cancelled = cancelled_;
305

306
        resolver_.async_resolve(
474✔
307
                request_->address_.host,
308
                to_string(request_->address_.port),
474✔
309
                [this, cancelled](
474✔
310
                        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
236✔
311
                        if (!*cancelled) {
237✔
312
                                ResolveHandler(ec, results);
236✔
313
                        }
314
                });
237✔
315

316
        return error::NoError;
237✔
317
}
318

319
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
163✔
320
        if (status_ != TransactionStatus::HeaderHandlerCalled) {
163✔
321
                return expected::unexpected(error::Error(
2✔
322
                        make_error_condition(errc::operation_in_progress),
4✔
323
                        "MakeBodyAsyncReader called while reading is in progress"));
6✔
324
        }
325

326
        if (response_body_length_ == 0) {
161✔
327
                return expected::unexpected(
16✔
328
                        MakeError(BodyMissingError, "Response does not contain a body"));
48✔
329
        }
330

331
        status_ = TransactionStatus::ReaderCreated;
145✔
332
        return make_shared<BodyAsyncReader<Client>>(resp->client_.GetHttpClient(), resp->cancelled_);
290✔
333
}
334

335
io::ExpectedAsyncReadWriterPtr Client::SwitchProtocol(IncomingResponsePtr req) {
7✔
336
        if (*cancelled_) {
7✔
337
                return expected::unexpected(error::Error(
×
338
                        make_error_condition(errc::not_connected),
×
339
                        "Cannot switch protocols if endpoint is not connected"));
×
340
        }
341

342
        // Rest of the connection is done directly on the socket, we are done here.
343
        status_ = TransactionStatus::Done;
7✔
344
        *cancelled_ = true;
7✔
345
        cancelled_ = make_shared<bool>(false);
14✔
346

347
        auto stream = stream_;
348
        // This no longer belongs to us.
349
        stream_.reset();
7✔
350

351
        if (is_https_) {
7✔
352
                return make_shared<RawSocket<ssl::stream<tcp::socket>>>(
×
353
                        stream, response_data_.response_buffer_);
×
354
        } else {
355
                return make_shared<RawSocket<tcp::socket>>(
7✔
356
                        make_shared<tcp::socket>(std::move(stream->next_layer())),
14✔
357
                        response_data_.response_buffer_);
7✔
358
        }
359
}
360

361
void Client::CallHandler(ResponseHandler handler) {
285✔
362
        // This function exists to make sure we have a copy of the handler we're calling (in the
363
        // argument list). This is important in case the handler owns the client instance through a
364
        // capture, and it replaces the handler with a different one (using `AsyncCall`). If it
365
        // does, then it destroys the final copy of the handler, and therefore also the client,
366
        // which is why we need to make a copy here, before calling it.
367
        handler(response_);
285✔
368
}
285✔
369

370
void Client::CallErrorHandler(
24✔
371
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
372
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
72✔
373
}
24✔
374

375
void Client::CallErrorHandler(
151✔
376
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
377
        *cancelled_ = true;
151✔
378
        cancelled_ = make_shared<bool>(true);
151✔
379
        stream_.reset();
151✔
380
        status_ = TransactionStatus::Done;
151✔
381
        handler(expected::unexpected(
302✔
382
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
604✔
383
}
151✔
384

385
void Client::ResolveHandler(
236✔
386
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
387
        if (ec) {
236✔
388
                CallErrorHandler(ec, request_, header_handler_);
×
389
                return;
×
390
        }
391

392
        if (logger_.Level() >= log::LogLevel::Debug) {
236✔
393
                string ips = "[";
233✔
394
                string sep;
395
                for (auto r : results) {
960✔
396
                        ips += sep;
247✔
397
                        ips += r.endpoint().address().to_string();
247✔
398
                        sep = ", ";
247✔
399
                }
400
                ips += "]";
233✔
401
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
466✔
402
        }
403

404
        resolver_results_ = results;
405

406
        stream_ = make_shared<ssl::stream<tcp::socket>>(GetAsioIoContext(event_loop_), ssl_ctx_);
472✔
407

408
        if (!response_data_.response_buffer_) {
236✔
409
                // We can reuse this if preexisting.
410
                response_data_.response_buffer_ = make_shared<beast::flat_buffer>();
340✔
411

412
                // This is equivalent to:
413
                //   response_data_.response_buffer_.reserve(body_buffer_.size());
414
                // but compatible with Boost 1.67.
415
                response_data_.response_buffer_->prepare(
416
                        body_buffer_.size() - response_data_.response_buffer_->size());
170✔
417
        }
418

419
        response_data_.http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();
472✔
420

421
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
422
        // if they do, they should be handled higher up in the application logic.
423
        //
424
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
425
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
426
        // `has_value()` in their code, causing their subsequent comparison operation to
427
        // misbehave. So pass highest possible value instead.
428
        response_data_.http_response_parser_->body_limit(numeric_limits<uint64_t>::max());
429

430
        auto &cancelled = cancelled_;
431

432
        asio::async_connect(
236✔
433
                stream_->next_layer(),
434
                resolver_results_,
236✔
435
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
472✔
436
                        if (!*cancelled) {
236✔
437
                                if (is_https_) {
236✔
438
                                        return HandshakeHandler(ec, endpoint);
6✔
439
                                }
440
                                return ConnectHandler(ec, endpoint);
230✔
441
                        }
442
                });
443
}
444

445
void Client::HandshakeHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
6✔
446
        if (ec) {
6✔
447
                CallErrorHandler(ec, request_, header_handler_);
×
448
                return;
×
449
        }
450

451
        if (not disable_keep_alive_) {
6✔
452
                boost::asio::socket_base::keep_alive option(true);
453
                stream_->next_layer().set_option(option);
6✔
454
        }
455

456
        // Set SNI Hostname (many hosts need this to handshake successfully)
457
        if (!SSL_set_tlsext_host_name(stream_->native_handle(), request_->address_.host.c_str())) {
6✔
458
                beast::error_code ec2 {
459
                        static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
×
460
                logger_.Error("Failed to set SNI host name: " + ec2.message());
×
461
        }
462

463
        auto &cancelled = cancelled_;
464

465
        stream_->async_handshake(
12✔
466
                ssl::stream_base::client, [this, cancelled, endpoint](const error_code &ec) {
6✔
467
                        if (*cancelled) {
6✔
468
                                return;
469
                        }
470
                        if (ec) {
6✔
471
                                logger_.Error("https: Failed to perform the SSL handshake: " + ec.message());
2✔
472
                                CallErrorHandler(ec, request_, header_handler_);
1✔
473
                                return;
1✔
474
                        }
475
                        logger_.Debug("https: Successful SSL handshake");
10✔
476
                        ConnectHandler(ec, endpoint);
5✔
477
                });
478
}
479

480

481
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
235✔
482
        if (ec) {
235✔
483
                CallErrorHandler(ec, request_, header_handler_);
12✔
484
                return;
12✔
485
        }
486

487
        if (not disable_keep_alive_) {
223✔
488
                boost::asio::socket_base::keep_alive option(true);
489
                stream_->next_layer().set_option(option);
134✔
490
        }
491

492
        logger_.Debug("Connected to " + endpoint.address().to_string());
446✔
493

494
        request_data_.http_request_ = make_shared<http::request<http::buffer_body>>(
223✔
495
                MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);
446✔
496

497
        for (const auto &header : request_->headers_) {
636✔
498
                request_data_.http_request_->set(header.first, header.second);
413✔
499
        }
500

501
        request_data_.http_request_serializer_ =
502
                make_shared<http::request_serializer<http::buffer_body>>(*request_data_.http_request_);
223✔
503

504
        auto &cancelled = cancelled_;
505
        auto &request_data = request_data_;
223✔
506

507
        if (is_https_) {
223✔
508
                http::async_write_header(
5✔
509
                        *stream_,
510
                        *request_data_.http_request_serializer_,
511
                        [this, cancelled, request_data](const error_code &ec, size_t num_written) {
5✔
512
                                if (!*cancelled) {
5✔
513
                                        WriteHeaderHandler(ec, num_written);
5✔
514
                                }
515
                        });
5✔
516
        } else {
517
                http::async_write_header(
218✔
518
                        stream_->next_layer(),
519
                        *request_data_.http_request_serializer_,
520
                        [this, cancelled, request_data](const error_code &ec, size_t num_written) {
218✔
521
                                if (!*cancelled) {
218✔
522
                                        WriteHeaderHandler(ec, num_written);
218✔
523
                                }
524
                        });
218✔
525
        }
526
}
527

528
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
223✔
529
        if (num_written > 0) {
223✔
530
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
446✔
531
        }
532

533
        if (ec) {
223✔
534
                CallErrorHandler(ec, request_, header_handler_);
×
535
                return;
178✔
536
        }
537

538
        auto header = request_->GetHeader("Content-Length");
446✔
539
        if (!header || header.value() == "0") {
223✔
540
                ReadHeader();
177✔
541
                return;
542
        }
543

544
        auto length = common::StringToLongLong(header.value());
46✔
545
        if (!length || length.value() < 0) {
46✔
546
                auto err = error::Error(
547
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
548
                CallErrorHandler(err, request_, header_handler_);
×
549
                return;
550
        }
551
        request_body_length_ = length.value();
46✔
552

553
        if (!request_->body_gen_ && !request_->async_body_gen_) {
46✔
554
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
555
                CallErrorHandler(err, request_, header_handler_);
2✔
556
                return;
557
        }
558

559
        assert(!(request_->body_gen_ && request_->async_body_gen_));
560

561
        if (request_->body_gen_) {
45✔
562
                auto body_reader = request_->body_gen_();
39✔
563
                if (!body_reader) {
39✔
564
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
565
                        return;
566
                }
567
                request_->body_reader_ = body_reader.value();
39✔
568
        } else {
569
                auto body_reader = request_->async_body_gen_();
6✔
570
                if (!body_reader) {
6✔
571
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
572
                        return;
573
                }
574
                request_->async_body_reader_ = body_reader.value();
6✔
575
        }
576

577
        PrepareAndWriteNewBodyBuffer();
45✔
578
}
579

580
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,142✔
581
        if (num_written > 0) {
2,142✔
582
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
2,100✔
583
        }
584

585
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,142✔
586
                // Write next block of the body.
587
                PrepareAndWriteNewBodyBuffer();
1,050✔
588
        } else if (ec) {
1,092✔
589
                CallErrorHandler(ec, request_, header_handler_);
8✔
590
        } else if (num_written > 0) {
1,088✔
591
                // We are still writing the body.
592
                WriteBody();
1,050✔
593
        } else {
594
                // We are ready to receive the response.
595
                ReadHeader();
38✔
596
        }
597
}
2,142✔
598

599
void Client::PrepareAndWriteNewBodyBuffer() {
1,095✔
600
        // request_->body_reader_ XOR request_->async_body_reader_
601
        assert(
602
                (request_->body_reader_ || request_->async_body_reader_)
603
                && !(request_->body_reader_ && request_->async_body_reader_));
604

605
        auto cancelled = cancelled_;
606
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
3,462✔
607
                if (!*cancelled) {
1,095✔
608
                        if (!read) {
1,094✔
609
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
610
                                return;
2✔
611
                        }
612
                        WriteNewBodyBuffer(read.value());
1,092✔
613
                }
614
        };
1,095✔
615

616

617
        if (request_->body_reader_) {
1,095✔
618
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
1,340✔
619
        } else {
620
                auto err = request_->async_body_reader_->AsyncRead(
621
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
850✔
622
                if (err != error::NoError) {
425✔
623
                        CallErrorHandler(err, request_, header_handler_);
×
624
                }
625
        }
626
}
1,095✔
627

628
void Client::WriteNewBodyBuffer(size_t size) {
1,092✔
629
        request_data_.http_request_->body().data = body_buffer_.data();
1,092✔
630
        request_data_.http_request_->body().size = size;
1,092✔
631

632
        if (size > 0) {
1,092✔
633
                request_data_.http_request_->body().more = true;
1,054✔
634
        } else {
635
                // Release ownership of Body reader.
636
                request_->body_reader_.reset();
38✔
637
                request_->async_body_reader_.reset();
38✔
638
                request_data_.http_request_->body().more = false;
38✔
639
        }
640

641
        WriteBody();
1,092✔
642
}
1,092✔
643

644
void Client::WriteBody() {
2,142✔
645
        auto &cancelled = cancelled_;
646
        auto &request_data = request_data_;
2,142✔
647

648
        if (is_https_) {
2,142✔
649
                http::async_write_some(
×
650
                        *stream_,
651
                        *request_data_.http_request_serializer_,
652
                        [this, cancelled, request_data](const error_code &ec, size_t num_written) {
×
653
                                if (!*cancelled) {
×
654
                                        WriteBodyHandler(ec, num_written);
×
655
                                }
656
                        });
×
657
        } else {
658
                http::async_write_some(
4,284✔
659
                        stream_->next_layer(),
660
                        *request_data_.http_request_serializer_,
661
                        [this, cancelled, request_data](const error_code &ec, size_t num_written) {
2,142✔
662
                                if (!*cancelled) {
2,142✔
663
                                        WriteBodyHandler(ec, num_written);
2,142✔
664
                                }
665
                        });
2,142✔
666
        }
667
}
2,142✔
668

669
void Client::ReadHeader() {
215✔
670
        auto &cancelled = cancelled_;
671
        auto &response_data = response_data_;
215✔
672

673
        if (is_https_) {
215✔
674
                http::async_read_some(
5✔
675
                        *stream_,
676
                        *response_data_.response_buffer_,
677
                        *response_data_.http_response_parser_,
678
                        [this, cancelled, response_data](const error_code &ec, size_t num_read) {
5✔
679
                                if (!*cancelled) {
5✔
680
                                        ReadHeaderHandler(ec, num_read);
5✔
681
                                }
682
                        });
5✔
683
        } else {
684
                http::async_read_some(
210✔
685
                        stream_->next_layer(),
686
                        *response_data_.response_buffer_,
687
                        *response_data_.http_response_parser_,
688
                        [this, cancelled, response_data](const error_code &ec, size_t num_read) {
209✔
689
                                if (!*cancelled) {
209✔
690
                                        ReadHeaderHandler(ec, num_read);
209✔
691
                                }
692
                        });
209✔
693
        }
694
}
215✔
695

696
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
214✔
697
        if (num_read > 0) {
214✔
698
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
418✔
699
        }
700

701
        if (ec) {
214✔
702
                CallErrorHandler(ec, request_, header_handler_);
5✔
703
                return;
68✔
704
        }
705

706
        if (!response_data_.http_response_parser_->is_header_done()) {
209✔
707
                ReadHeader();
×
708
                return;
×
709
        }
710

711
        response_.reset(new IncomingResponse(*this, cancelled_));
418✔
712
        response_->status_code_ = response_data_.http_response_parser_->get().result_int();
209✔
713
        response_->status_message_ = string {response_data_.http_response_parser_->get().reason()};
209✔
714

715
        logger_.Debug(
418✔
716
                "Received response: " + to_string(response_->status_code_) + " "
418✔
717
                + response_->status_message_);
627✔
718

719
        string debug_str;
720
        for (auto header = response_data_.http_response_parser_->get().cbegin();
239✔
721
                 header != response_data_.http_response_parser_->get().cend();
448✔
722
                 header++) {
723
                response_->headers_[string {header->name_string()}] = string {header->value()};
717✔
724
                if (logger_.Level() >= log::LogLevel::Debug) {
239✔
725
                        debug_str += string {header->name_string()};
236✔
726
                        debug_str += ": ";
236✔
727
                        debug_str += string {header->value()};
236✔
728
                        debug_str += "\n";
236✔
729
                }
730
        }
731

732
        logger_.Debug("Received headers:\n" + debug_str);
418✔
733
        debug_str.clear();
734

735
        if (response_data_.http_response_parser_->chunked()) {
209✔
736
                auto cancelled = cancelled_;
737
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
738
                CallHandler(header_handler_);
2✔
739
                if (!*cancelled) {
1✔
740
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
741
                        CallErrorHandler(err, request_, body_handler_);
2✔
742
                }
743
                return;
744
        }
745

746
        auto content_length = response_data_.http_response_parser_->content_length();
208✔
747
        if (content_length) {
208✔
748
                response_body_length_ = content_length.value();
179✔
749
        } else {
750
                response_body_length_ = 0;
29✔
751
        }
752
        response_body_read_ = 0;
208✔
753

754
        if (response_body_read_ >= response_body_length_) {
208✔
755
                auto cancelled = cancelled_;
756
                status_ = TransactionStatus::HeaderHandlerCalled;
43✔
757
                CallHandler(header_handler_);
86✔
758
                if (!*cancelled) {
43✔
759
                        status_ = TransactionStatus::Done;
37✔
760
                        CallHandler(body_handler_);
74✔
761

762
                        // After body handler has run, set the request to cancelled. The body
763
                        // handler may have made a new request, so this is not necessarily the same
764
                        // request as is currently active (note use of shared_ptr copy, not
765
                        // `cancelled_`).
766
                        *cancelled = true;
37✔
767
                }
768
                return;
769
        }
770

771
        auto cancelled = cancelled_;
772
        status_ = TransactionStatus::HeaderHandlerCalled;
165✔
773
        CallHandler(header_handler_);
330✔
774
        if (*cancelled) {
165✔
775
                return;
776
        }
777

778
        // We know that a body reader is required here, because of the `response_body_read_ >=
779
        // response_body_length_` check above.
780
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
146✔
781
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
4✔
782
        }
783
}
784

785
void Client::AsyncReadNextBodyPart(
3,805✔
786
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
787
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
788

789
        if (status_ == TransactionStatus::ReaderCreated) {
3,805✔
790
                status_ = TransactionStatus::BodyReadingInProgress;
144✔
791
        }
792

793
        if (AtLeast(status_, TransactionStatus::BodyReadingFinished)) {
3,805✔
794
                auto cancelled = cancelled_;
795
                handler(0);
78✔
796
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
39✔
797
                        status_ = TransactionStatus::Done;
39✔
798
                        CallHandler(body_handler_);
78✔
799

800
                        // After body handler has run, set the request to cancelled. The body
801
                        // handler may have made a new request, so this is not necessarily the same
802
                        // request as is currently active (note use of shared_ptr copy, not
803
                        // `cancelled_`).
804
                        *cancelled = true;
39✔
805
                }
806
                return;
807
        }
808

809
        reader_buf_start_ = start;
3,766✔
810
        reader_buf_end_ = end;
3,766✔
811
        reader_handler_ = handler;
3,766✔
812
        size_t read_size = end - start;
3,766✔
813
        size_t smallest = min(body_buffer_.size(), read_size);
5,879✔
814

815
        response_data_.http_response_parser_->get().body().data = body_buffer_.data();
3,766✔
816
        response_data_.http_response_parser_->get().body().size = smallest;
3,766✔
817

818
        auto &cancelled = cancelled_;
819
        auto &response_data = response_data_;
3,766✔
820

821
        if (is_https_) {
3,766✔
822
                http::async_read_some(
×
823
                        *stream_,
824
                        *response_data_.response_buffer_,
825
                        *response_data_.http_response_parser_,
826
                        [this, cancelled, response_data](const error_code &ec, size_t num_read) {
×
827
                                if (!*cancelled) {
×
828
                                        ReadBodyHandler(ec, num_read);
×
829
                                }
830
                        });
×
831
        } else {
832
                http::async_read_some(
3,766✔
833
                        stream_->next_layer(),
834
                        *response_data_.response_buffer_,
835
                        *response_data_.http_response_parser_,
836
                        [this, cancelled, response_data](const error_code &ec, size_t num_read) {
3,765✔
837
                                if (!*cancelled) {
3,765✔
838
                                        ReadBodyHandler(ec, num_read);
3,764✔
839
                                }
840
                        });
3,765✔
841
        }
842
}
843

844
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
3,764✔
845
        if (num_read > 0) {
3,764✔
846
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
7,430✔
847
                response_body_read_ += num_read;
3,715✔
848
        }
849

850
        if (ec == http::make_error_code(http::error::need_buffer)) {
3,764✔
851
                // This can be ignored. We always reset the buffer between reads anyway.
852
                ec = error_code();
1,958✔
853
        }
854

855
        assert(reader_handler_);
856

857
        if (response_body_read_ >= response_body_length_) {
3,764✔
858
                status_ = TransactionStatus::BodyReadingFinished;
91✔
859
        }
860

861
        auto cancelled = cancelled_;
862

863
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
3,764✔
864
        size_t smallest = min(num_read, buf_size);
3,764✔
865
        copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
3,764✔
866
        if (ec) {
3,764✔
867
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
98✔
868
                reader_handler_(expected::unexpected(err));
147✔
869
        } else {
870
                reader_handler_(smallest);
7,430✔
871
        }
872

873
        if (!*cancelled && ec) {
3,764✔
874
                CallErrorHandler(ec, request_, body_handler_);
4✔
875
                return;
876
        }
877
}
878

879
void Client::Cancel() {
195✔
880
        auto cancelled = cancelled_;
881

882
        if (!*cancelled) {
195✔
883
                auto err =
884
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
246✔
885
                switch (status_) {
123✔
886
                case TransactionStatus::None:
1✔
887
                        CallErrorHandler(err, request_, header_handler_);
1✔
888
                        break;
1✔
889
                case TransactionStatus::HeaderHandlerCalled:
120✔
890
                case TransactionStatus::ReaderCreated:
891
                case TransactionStatus::BodyReadingInProgress:
892
                case TransactionStatus::BodyReadingFinished:
893
                        CallErrorHandler(err, request_, body_handler_);
120✔
894
                        break;
120✔
895
                case TransactionStatus::Replying:
896
                case TransactionStatus::SwitchingProtocol:
897
                        // Not used by client.
898
                        assert(false);
899
                        break;
900
                case TransactionStatus::BodyHandlerCalled:
901
                case TransactionStatus::Done:
902
                        break;
903
                }
904
        }
905

906
        if (!*cancelled) {
195✔
907
                DoCancel();
2✔
908
        }
909
}
195✔
910

911
void Client::DoCancel() {
408✔
912
        resolver_.cancel();
408✔
913
        if (stream_) {
408✔
914
                stream_->next_layer().cancel();
66✔
915
                stream_->next_layer().close();
66✔
916
                stream_.reset();
66✔
917
        }
918

919
        request_.reset();
408✔
920
        response_.reset();
408✔
921

922
        // Reset logger to no connection.
923
        logger_ = log::Logger(logger_name_);
408✔
924

925
        // Set cancel state and then make a new one. Those who are interested should have their own
926
        // pointer to the old one.
927
        *cancelled_ = true;
408✔
928
        cancelled_ = make_shared<bool>(true);
408✔
929
}
408✔
930

931
Stream::Stream(Server &server) :
416✔
932
        server_ {server},
933
        logger_ {"http"},
934
        cancelled_(make_shared<bool>(true)),
416✔
935
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
416✔
936
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,248✔
937
        request_data_.request_buffer_ = make_shared<beast::flat_buffer>();
832✔
938

939
        // This is equivalent to:
940
        //   request_data_.request_buffer_.reserve(body_buffer_.size());
941
        // but compatible with Boost 1.67.
942
        request_data_.request_buffer_->prepare(
943
                body_buffer_.size() - request_data_.request_buffer_->size());
416✔
944

945
        request_data_.http_request_parser_ = make_shared<http::request_parser<http::buffer_body>>();
832✔
946

947
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
948
        // they do, they should be handled higher up in the application logic.
949
        //
950
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
951
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
952
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
953
        // possible value instead.
954
        request_data_.http_request_parser_->body_limit(numeric_limits<uint64_t>::max());
955
}
416✔
956

957
Stream::~Stream() {
1,248✔
958
        DoCancel();
416✔
959
}
416✔
960

961
void Stream::Cancel() {
7✔
962
        auto cancelled = cancelled_;
963

964
        if (!*cancelled) {
7✔
965
                auto err =
966
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
967
                switch (status_) {
7✔
968
                case TransactionStatus::None:
×
969
                        CallErrorHandler(err, request_, server_.header_handler_);
×
970
                        break;
×
971
                case TransactionStatus::HeaderHandlerCalled:
5✔
972
                case TransactionStatus::ReaderCreated:
973
                case TransactionStatus::BodyReadingInProgress:
974
                case TransactionStatus::BodyReadingFinished:
975
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
976
                        break;
5✔
977
                case TransactionStatus::BodyHandlerCalled:
×
978
                        // In between body handler and reply finished. No one to handle the status
979
                        // here.
980
                        server_.RemoveStream(shared_from_this());
×
981
                        break;
×
982
                case TransactionStatus::Replying:
1✔
983
                        CallErrorHandler(err, request_, reply_finished_handler_);
3✔
984
                        break;
1✔
985
                case TransactionStatus::SwitchingProtocol:
1✔
986
                        CallErrorHandler(err, request_, switch_protocol_handler_);
3✔
987
                        break;
1✔
988
                case TransactionStatus::Done:
989
                        break;
990
                }
991
        }
992

993
        if (!*cancelled) {
7✔
994
                DoCancel();
×
995
        }
996
}
7✔
997

998
void Stream::DoCancel() {
598✔
999
        if (socket_.is_open()) {
598✔
1000
                socket_.cancel();
210✔
1001
                socket_.close();
210✔
1002
        }
1003

1004
        // Set cancel state and then make a new one. Those who are interested should have their own
1005
        // pointer to the old one.
1006
        *cancelled_ = true;
598✔
1007
        cancelled_ = make_shared<bool>(true);
598✔
1008
}
598✔
1009

1010
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
1011
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1012
}
×
1013

1014
void Stream::CallErrorHandler(
×
1015
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
1016
        *cancelled_ = true;
×
1017
        cancelled_ = make_shared<bool>(true);
×
1018
        status_ = TransactionStatus::Done;
×
1019
        handler(expected::unexpected(err.WithContext(
×
1020
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
1021

1022
        server_.RemoveStream(shared_from_this());
×
1023
}
×
1024

1025
void Stream::CallErrorHandler(
2✔
1026
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1027
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
6✔
1028
}
2✔
1029

1030
void Stream::CallErrorHandler(
9✔
1031
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1032
        *cancelled_ = true;
9✔
1033
        cancelled_ = make_shared<bool>(true);
9✔
1034
        status_ = TransactionStatus::Done;
9✔
1035
        handler(
9✔
1036
                req,
1037
                err.WithContext(
9✔
1038
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
27✔
1039

1040
        server_.RemoveStream(shared_from_this());
9✔
1041
}
9✔
1042

1043
void Stream::CallErrorHandler(
4✔
1044
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
1045
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
1046
}
4✔
1047

1048
void Stream::CallErrorHandler(
7✔
1049
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
1050
        *cancelled_ = true;
7✔
1051
        cancelled_ = make_shared<bool>(true);
7✔
1052
        status_ = TransactionStatus::Done;
7✔
1053
        handler(err.WithContext(
14✔
1054
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
14✔
1055

1056
        server_.RemoveStream(shared_from_this());
7✔
1057
}
7✔
1058

1059
void Stream::CallErrorHandler(
×
1060
        const error_code &ec, const RequestPtr &req, SwitchProtocolHandler handler) {
1061
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1062
}
×
1063

1064
void Stream::CallErrorHandler(
1✔
1065
        const error::Error &err, const RequestPtr &req, SwitchProtocolHandler handler) {
1066
        *cancelled_ = true;
1✔
1067
        cancelled_ = make_shared<bool>(true);
1✔
1068
        status_ = TransactionStatus::Done;
1✔
1069
        handler(expected::unexpected(err.WithContext(
2✔
1070
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
4✔
1071

1072
        server_.RemoveStream(shared_from_this());
1✔
1073
}
1✔
1074

1075
void Stream::AcceptHandler(const error_code &ec) {
218✔
1076
        if (ec) {
218✔
1077
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
1078
                return;
×
1079
        }
1080

1081
        auto ip = socket_.remote_endpoint().address().to_string();
436✔
1082

1083
        // Use IP as context for logging.
1084
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
218✔
1085

1086
        logger_.Debug("Accepted connection.");
436✔
1087

1088
        request_.reset(new IncomingRequest(*this, cancelled_));
436✔
1089

1090
        request_->address_.host = ip;
218✔
1091

1092
        *cancelled_ = false;
218✔
1093

1094
        ReadHeader();
218✔
1095
}
1096

1097
void Stream::ReadHeader() {
218✔
1098
        auto &cancelled = cancelled_;
1099
        auto &request_data = request_data_;
218✔
1100

1101
        http::async_read_some(
436✔
1102
                socket_,
218✔
1103
                *request_data_.request_buffer_,
1104
                *request_data_.http_request_parser_,
1105
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
218✔
1106
                        if (!*cancelled) {
218✔
1107
                                ReadHeaderHandler(ec, num_read);
218✔
1108
                        }
1109
                });
218✔
1110
}
218✔
1111

1112
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
218✔
1113
        if (num_read > 0) {
218✔
1114
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
436✔
1115
        }
1116

1117
        if (ec) {
218✔
1118
                CallErrorHandler(ec, request_, server_.header_handler_);
×
1119
                return;
173✔
1120
        }
1121

1122
        if (!request_data_.http_request_parser_->is_header_done()) {
218✔
1123
                ReadHeader();
×
1124
                return;
×
1125
        }
1126

1127
        auto method_result = BeastVerbToMethod(
1128
                request_data_.http_request_parser_->get().base().method(),
1129
                string {request_data_.http_request_parser_->get().base().method_string()});
436✔
1130
        if (!method_result) {
218✔
1131
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
1132
                return;
×
1133
        }
1134
        request_->method_ = method_result.value();
218✔
1135
        request_->address_.path = string(request_data_.http_request_parser_->get().base().target());
218✔
1136

1137
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
218✔
1138

1139
        string debug_str;
1140
        for (auto header = request_data_.http_request_parser_->get().cbegin();
408✔
1141
                 header != request_data_.http_request_parser_->get().cend();
626✔
1142
                 header++) {
1143
                request_->headers_[string {header->name_string()}] = string {header->value()};
1,224✔
1144
                if (logger_.Level() >= log::LogLevel::Debug) {
408✔
1145
                        debug_str += string {header->name_string()};
395✔
1146
                        debug_str += ": ";
395✔
1147
                        debug_str += string {header->value()};
395✔
1148
                        debug_str += "\n";
395✔
1149
                }
1150
        }
1151

1152
        logger_.Debug("Received headers:\n" + debug_str);
436✔
1153
        debug_str.clear();
1154

1155
        if (request_data_.http_request_parser_->chunked()) {
218✔
1156
                auto cancelled = cancelled_;
1157
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
1158
                server_.header_handler_(request_);
2✔
1159
                if (!*cancelled) {
1✔
1160
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
1161
                        CallErrorHandler(err, request_, server_.body_handler_);
2✔
1162
                }
1163
                return;
1164
        }
1165

1166
        auto content_length = request_data_.http_request_parser_->content_length();
217✔
1167
        if (content_length) {
217✔
1168
                request_body_length_ = content_length.value();
46✔
1169
        } else {
1170
                request_body_length_ = 0;
171✔
1171
        }
1172
        request_body_read_ = 0;
217✔
1173

1174
        if (request_body_read_ >= request_body_length_) {
217✔
1175
                auto cancelled = cancelled_;
1176
                status_ = TransactionStatus::HeaderHandlerCalled;
171✔
1177
                server_.header_handler_(request_);
342✔
1178
                if (!*cancelled) {
171✔
1179
                        status_ = TransactionStatus::BodyHandlerCalled;
171✔
1180
                        CallBodyHandler();
171✔
1181
                }
1182
                return;
1183
        }
1184

1185
        auto cancelled = cancelled_;
1186
        status_ = TransactionStatus::HeaderHandlerCalled;
46✔
1187
        server_.header_handler_(request_);
92✔
1188
        if (*cancelled) {
46✔
1189
                return;
1190
        }
1191

1192
        // We know that a body reader is required here, because of the `request_body_read_ >=
1193
        // request_body_length_` check above.
1194
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
45✔
1195
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
2✔
1196
        }
1197
}
1198

1199
void Stream::AsyncReadNextBodyPart(
2,046✔
1200
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
1201
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
1202

1203
        if (status_ == TransactionStatus::ReaderCreated) {
2,046✔
1204
                status_ = TransactionStatus::BodyReadingInProgress;
44✔
1205
        }
1206

1207
        if (status_ != TransactionStatus::BodyReadingInProgress) {
2,046✔
1208
                auto cancelled = cancelled_;
1209
                handler(0);
76✔
1210
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
38✔
1211
                        status_ = TransactionStatus::BodyHandlerCalled;
38✔
1212
                        CallBodyHandler();
38✔
1213
                }
1214
                return;
1215
        }
1216

1217
        reader_buf_start_ = start;
2,008✔
1218
        reader_buf_end_ = end;
2,008✔
1219
        reader_handler_ = handler;
2,008✔
1220
        size_t read_size = end - start;
2,008✔
1221
        size_t smallest = min(body_buffer_.size(), read_size);
3,064✔
1222

1223
        request_data_.http_request_parser_->get().body().data = body_buffer_.data();
2,008✔
1224
        request_data_.http_request_parser_->get().body().size = smallest;
2,008✔
1225

1226
        auto &cancelled = cancelled_;
1227
        auto &request_data = request_data_;
2,008✔
1228

1229
        http::async_read_some(
4,016✔
1230
                socket_,
2,008✔
1231
                *request_data_.request_buffer_,
1232
                *request_data_.http_request_parser_,
1233
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
2,008✔
1234
                        if (!*cancelled) {
2,008✔
1235
                                ReadBodyHandler(ec, num_read);
2,008✔
1236
                        }
1237
                });
2,008✔
1238
}
1239

1240
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
2,008✔
1241
        if (num_read > 0) {
2,008✔
1242
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
4,008✔
1243
                request_body_read_ += num_read;
2,004✔
1244
        }
1245

1246
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,008✔
1247
                // This can be ignored. We always reset the buffer between reads anyway.
1248
                ec = error_code();
979✔
1249
        }
1250

1251
        assert(reader_handler_);
1252

1253
        if (request_body_read_ >= request_body_length_) {
2,008✔
1254
                status_ = TransactionStatus::BodyReadingFinished;
38✔
1255
        }
1256

1257
        auto cancelled = cancelled_;
1258

1259
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
2,008✔
1260
        size_t smallest = min(num_read, buf_size);
2,008✔
1261
        copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
2,008✔
1262
        if (ec) {
2,008✔
1263
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
8✔
1264
                reader_handler_(expected::unexpected(err));
12✔
1265
        } else {
1266
                reader_handler_(smallest);
4,008✔
1267
        }
1268

1269
        if (!*cancelled && ec) {
2,008✔
1270
                CallErrorHandler(ec, request_, server_.body_handler_);
4✔
1271
                return;
1272
        }
1273
}
1274

1275
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
196✔
1276
        SetupResponse();
196✔
1277

1278
        reply_finished_handler_ = reply_finished_handler;
196✔
1279

1280
        auto &cancelled = cancelled_;
1281
        auto &response_data = response_data_;
196✔
1282

1283
        http::async_write_header(
392✔
1284
                socket_,
196✔
1285
                *response_data_.http_response_serializer_,
1286
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
196✔
1287
                        if (!*cancelled) {
196✔
1288
                                WriteHeaderHandler(ec, num_written);
195✔
1289
                        }
1290
                });
196✔
1291
}
196✔
1292

1293
void Stream::SetupResponse() {
205✔
1294
        auto response = maybe_response_.lock();
205✔
1295
        // Only called from existing responses, so this should always be true.
1296
        assert(response);
1297

1298
        assert(status_ == TransactionStatus::BodyHandlerCalled);
1299
        status_ = TransactionStatus::Replying;
205✔
1300

1301
        // From here on we take shared ownership.
1302
        response_ = response;
1303

1304
        response_data_.http_response_ = make_shared<http::response<http::buffer_body>>();
410✔
1305

1306
        for (const auto &header : response->headers_) {
429✔
1307
                response_data_.http_response_->base().set(header.first, header.second);
224✔
1308
        }
1309

1310
        response_data_.http_response_->result(response->GetStatusCode());
205✔
1311
        response_data_.http_response_->reason(response->GetStatusMessage());
410✔
1312

1313
        response_data_.http_response_serializer_ =
1314
                make_shared<http::response_serializer<http::buffer_body>>(*response_data_.http_response_);
410✔
1315
}
205✔
1316

1317
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
195✔
1318
        if (num_written > 0) {
195✔
1319
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
390✔
1320
        }
1321

1322
        if (ec) {
195✔
1323
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1324
                return;
34✔
1325
        }
1326

1327
        auto header = response_->GetHeader("Content-Length");
390✔
1328
        if (!header || header.value() == "0") {
195✔
1329
                FinishReply();
33✔
1330
                return;
1331
        }
1332

1333
        auto length = common::StringToLongLong(header.value());
162✔
1334
        if (!length || length.value() < 0) {
162✔
1335
                auto err = error::Error(
1336
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
1337
                CallErrorHandler(err, request_, reply_finished_handler_);
×
1338
                return;
1339
        }
1340

1341
        if (!response_->body_reader_ && !response_->async_body_reader_) {
162✔
1342
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
1343
                CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1344
                return;
1345
        }
1346

1347
        PrepareAndWriteNewBodyBuffer();
161✔
1348
}
1349

1350
void Stream::PrepareAndWriteNewBodyBuffer() {
1,883✔
1351
        // response_->body_reader_ XOR response_->async_body_reader_
1352
        assert(
1353
                (response_->body_reader_ || response_->async_body_reader_)
1354
                && !(response_->body_reader_ && response_->async_body_reader_));
1355

1356
        auto read_handler = [this](io::ExpectedSize read) {
1,884✔
1357
                if (!read) {
1,883✔
1358
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
3✔
1359
                        return;
1✔
1360
                }
1361
                WriteNewBodyBuffer(read.value());
1,882✔
1362
        };
1,883✔
1363

1364
        if (response_->body_reader_) {
1,883✔
1365
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
3,218✔
1366
        } else {
1367
                auto err = response_->async_body_reader_->AsyncRead(
1368
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
274✔
1369
                if (err != error::NoError) {
274✔
1370
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1371
                }
1372
        }
1373
}
1,883✔
1374

1375
void Stream::WriteNewBodyBuffer(size_t size) {
1,882✔
1376
        response_data_.http_response_->body().data = body_buffer_.data();
1,882✔
1377
        response_data_.http_response_->body().size = size;
1,882✔
1378

1379
        if (size > 0) {
1,882✔
1380
                response_data_.http_response_->body().more = true;
1,756✔
1381
        } else {
1382
                response_data_.http_response_->body().more = false;
126✔
1383
        }
1384

1385
        WriteBody();
1,882✔
1386
}
1,882✔
1387

1388
void Stream::WriteBody() {
3,617✔
1389
        auto &cancelled = cancelled_;
1390
        auto &response_data = response_data_;
3,617✔
1391

1392
        http::async_write_some(
7,234✔
1393
                socket_,
3,617✔
1394
                *response_data_.http_response_serializer_,
1395
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
3,580✔
1396
                        if (!*cancelled) {
3,580✔
1397
                                WriteBodyHandler(ec, num_written);
3,580✔
1398
                        }
1399
                });
3,580✔
1400
}
3,617✔
1401

1402
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
3,580✔
1403
        if (num_written > 0) {
3,580✔
1404
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
3,470✔
1405
        }
1406

1407
        if (ec == http::make_error_code(http::error::need_buffer)) {
3,580✔
1408
                // Write next body block.
1409
                PrepareAndWriteNewBodyBuffer();
1,722✔
1410
        } else if (ec) {
1,858✔
1411
                CallErrorHandler(ec, request_, reply_finished_handler_);
12✔
1412
        } else if (num_written > 0) {
1,854✔
1413
                // We are still writing the body.
1414
                WriteBody();
1,735✔
1415
        } else {
1416
                // We are finished.
1417
                FinishReply();
119✔
1418
        }
1419
}
3,580✔
1420

1421
void Stream::FinishReply() {
152✔
1422
        // We are done.
1423
        *cancelled_ = true;
152✔
1424
        cancelled_ = make_shared<bool>(true);
152✔
1425
        status_ = TransactionStatus::Done;
152✔
1426
        // Release ownership of Body reader.
1427
        response_->body_reader_.reset();
152✔
1428
        response_->async_body_reader_.reset();
152✔
1429
        reply_finished_handler_(error::NoError);
152✔
1430
        server_.RemoveStream(shared_from_this());
152✔
1431
}
152✔
1432

1433
error::Error Stream::AsyncSwitchProtocol(SwitchProtocolHandler handler) {
9✔
1434
        SetupResponse();
9✔
1435

1436
        switch_protocol_handler_ = handler;
9✔
1437
        status_ = TransactionStatus::SwitchingProtocol;
9✔
1438

1439
        auto &cancelled = cancelled_;
1440
        auto &response_data = response_data_;
9✔
1441

1442
        http::async_write_header(
18✔
1443
                socket_,
9✔
1444
                *response_data_.http_response_serializer_,
1445
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
9✔
1446
                        if (!*cancelled) {
9✔
1447
                                SwitchingProtocolHandler(ec, num_written);
8✔
1448
                        }
1449
                });
9✔
1450

1451
        return error::NoError;
9✔
1452
}
1453

1454
void Stream::SwitchingProtocolHandler(error_code ec, size_t num_written) {
8✔
1455
        if (num_written > 0) {
8✔
1456
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
16✔
1457
        }
1458

1459
        if (ec) {
8✔
1460
                CallErrorHandler(ec, request_, switch_protocol_handler_);
×
1461
                return;
×
1462
        }
1463

1464
        auto socket = make_shared<RawSocket<tcp::socket>>(
1465
                make_shared<tcp::socket>(std::move(socket_)), request_data_.request_buffer_);
8✔
1466

1467
        auto switch_protocol_handler = switch_protocol_handler_;
8✔
1468

1469
        // Rest of the connection is done directly on the socket, we are done here.
1470
        status_ = TransactionStatus::Done;
8✔
1471
        *cancelled_ = true;
8✔
1472
        cancelled_ = make_shared<bool>(true);
8✔
1473
        server_.RemoveStream(shared_from_this());
16✔
1474

1475
        switch_protocol_handler(socket);
16✔
1476
}
1477

1478
void Stream::CallBodyHandler() {
209✔
1479
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1480
        // it immediately destroys, which would destroy this stream as well. At the end of this
1481
        // function, it's ok to destroy it.
1482
        auto stream_ref = shared_from_this();
1483

1484
        server_.body_handler_(request_, error::NoError);
627✔
1485

1486
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1487
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1488
        // request has not been handled correctly.
1489
        auto response = maybe_response_.lock();
209✔
1490
        if (!response) {
209✔
1491
                logger_.Error("Handler produced no response. Closing stream prematurely.");
4✔
1492
                *cancelled_ = true;
2✔
1493
                cancelled_ = make_shared<bool>(true);
2✔
1494
                server_.RemoveStream(shared_from_this());
6✔
1495
        }
1496
}
209✔
1497

1498
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
218✔
1499
        event_loop_ {event_loop},
1500
        acceptor_(GetAsioIoContext(event_loop_)) {
390✔
1501
}
218✔
1502

1503
Server::~Server() {
436✔
1504
        Cancel();
218✔
1505
}
218✔
1506

1507
error::Error Server::AsyncServeUrl(
186✔
1508
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
1509
        return AsyncServeUrl(
1510
                url, header_handler, [body_handler](IncomingRequestPtr req, error::Error err) {
760✔
1511
                        if (err != error::NoError) {
206✔
1512
                                body_handler(expected::unexpected(err));
14✔
1513
                        } else {
1514
                                body_handler(req);
398✔
1515
                        }
1516
                });
578✔
1517
}
1518

1519
error::Error Server::AsyncServeUrl(
199✔
1520
        const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
1521
        auto err = BreakDownUrl(url, address_);
199✔
1522
        if (error::NoError != err) {
199✔
1523
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
×
1524
        }
1525

1526
        if (address_.protocol != "http") {
199✔
1527
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
×
1528
        }
1529

1530
        if (address_.path.size() > 0 && address_.path != "/") {
199✔
1531
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
2✔
1532
        }
1533

1534
        boost::system::error_code ec;
198✔
1535
        auto address = asio::ip::make_address(address_.host, ec);
198✔
1536
        if (ec) {
198✔
1537
                return error::Error(
1538
                        ec.default_error_condition(),
×
1539
                        "Could not construct endpoint from address " + address_.host);
×
1540
        }
1541

1542
        asio::ip::tcp::endpoint endpoint(address, address_.port);
198✔
1543

1544
        ec.clear();
1545
        acceptor_.open(endpoint.protocol(), ec);
198✔
1546
        if (ec) {
198✔
1547
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
×
1548
        }
1549

1550
        // Allow address reuse, otherwise we can't re-bind later.
1551
        ec.clear();
1552
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
198✔
1553
        if (ec) {
198✔
1554
                return error::Error(ec.default_error_condition(), "Could not set socket options");
×
1555
        }
1556

1557
        ec.clear();
1558
        acceptor_.bind(endpoint, ec);
198✔
1559
        if (ec) {
198✔
1560
                return error::Error(ec.default_error_condition(), "Could not bind socket");
×
1561
        }
1562

1563
        ec.clear();
1564
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
198✔
1565
        if (ec) {
198✔
1566
                return error::Error(ec.default_error_condition(), "Could not start listening");
×
1567
        }
1568

1569
        header_handler_ = header_handler;
198✔
1570
        body_handler_ = body_handler;
198✔
1571

1572
        PrepareNewStream();
198✔
1573

1574
        return error::NoError;
198✔
1575
}
1576

1577
void Server::Cancel() {
233✔
1578
        if (acceptor_.is_open()) {
233✔
1579
                acceptor_.cancel();
198✔
1580
                acceptor_.close();
198✔
1581
        }
1582
        streams_.clear();
1583
}
233✔
1584

1585
uint16_t Server::GetPort() const {
13✔
1586
        return acceptor_.local_endpoint().port();
13✔
1587
}
1588

1589
string Server::GetUrl() const {
12✔
1590
        return "http://127.0.0.1:" + to_string(GetPort());
24✔
1591
}
1592

1593
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
208✔
1594
        if (*req->cancelled_) {
208✔
1595
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
×
1596
        }
1597
        OutgoingResponsePtr response {new OutgoingResponse(req->stream_, req->cancelled_)};
416✔
1598
        req->stream_.maybe_response_ = response;
208✔
1599
        return response;
208✔
1600
}
1601

1602
error::Error Server::AsyncReply(
196✔
1603
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
1604
        if (*resp->cancelled_) {
196✔
1605
                return MakeError(StreamCancelledError, "Cannot send response");
×
1606
        }
1607

1608
        resp->stream_.AsyncReply(reply_finished_handler);
196✔
1609
        return error::NoError;
196✔
1610
}
1611

1612
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
60✔
1613
        if (*req->cancelled_) {
60✔
1614
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
×
1615
        }
1616

1617
        auto &stream = req->stream_;
60✔
1618
        if (stream.status_ != TransactionStatus::HeaderHandlerCalled) {
60✔
1619
                return expected::unexpected(error::Error(
1✔
1620
                        make_error_condition(errc::operation_in_progress),
2✔
1621
                        "MakeBodyAsyncReader called while reading is in progress"));
3✔
1622
        }
1623

1624
        if (stream.request_body_length_ == 0) {
59✔
1625
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
45✔
1626
        }
1627

1628
        stream.status_ = TransactionStatus::ReaderCreated;
44✔
1629
        return make_shared<BodyAsyncReader<Stream>>(stream, req->cancelled_);
88✔
1630
}
1631

1632
error::Error Server::AsyncSwitchProtocol(OutgoingResponsePtr resp, SwitchProtocolHandler handler) {
9✔
1633
        return resp->stream_.AsyncSwitchProtocol(handler);
18✔
1634
}
1635

1636
void Server::PrepareNewStream() {
416✔
1637
        StreamPtr new_stream {new Stream(*this)};
416✔
1638
        streams_.insert(new_stream);
1639
        AsyncAccept(new_stream);
832✔
1640
}
416✔
1641

1642
void Server::AsyncAccept(StreamPtr stream) {
416✔
1643
        acceptor_.async_accept(stream->socket_, [this, stream](const error_code &ec) {
637✔
1644
                if (ec) {
221✔
1645
                        log::Error("Could not accept connection: " + ec.message());
6✔
1646
                        return;
3✔
1647
                }
1648

1649
                stream->AcceptHandler(ec);
218✔
1650

1651
                this->PrepareNewStream();
218✔
1652
        });
1653
}
416✔
1654

1655
void Server::RemoveStream(StreamPtr stream) {
182✔
1656
        streams_.erase(stream);
182✔
1657

1658
        stream->DoCancel();
182✔
1659
}
182✔
1660

1661
} // namespace http
1662
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc