• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1038172795

16 Oct 2023 12:17PM UTC coverage: 80.166% (+0.6%) from 79.608%
1038172795

push

gitlab-ci

kacf
test: Add test for verifying hostname when doing TLS handshake.

Ticket: MEN-6788

Signed-off-by: Kristian Amlie <kristian.amlie@northern.tech>

6479 of 8082 relevant lines covered (80.17%)

10736.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.82
/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio/ip/tcp.hpp>
20
#include <boost/asio/ssl/verify_mode.hpp>
21
#include <boost/asio.hpp>
22

23
#include <common/common.hpp>
24

25
namespace mender {
26
namespace http {
27

28
namespace common = mender::common;
29

30
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
31
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
32
const unsigned int BeastHttpVersion = 11;
33

34
namespace asio = boost::asio;
35
namespace http = boost::beast::http;
36

37
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
38

39
static http::verb MethodToBeastVerb(Method method) {
223✔
40
        switch (method) {
223✔
41
        case Method::GET:
42
                return http::verb::get;
43
        case Method::HEAD:
44
                return http::verb::head;
45
        case Method::POST:
46
                return http::verb::post;
47
        case Method::PUT:
48
                return http::verb::put;
49
        case Method::PATCH:
50
                return http::verb::patch;
51
        case Method::CONNECT:
52
                return http::verb::connect;
53
        case Method::Invalid:
54
                // Fallthrough to end (no-op).
55
                break;
56
        }
57
        // Don't use "default" case. This should generate a warning if we ever add any methods. But
58
        // still assert here for safety.
59
        assert(false);
60
        return http::verb::get;
61
}
62

63
static expected::expected<Method, error::Error> BeastVerbToMethod(
218✔
64
        http::verb verb, const string &verb_string) {
65
        switch (verb) {
218✔
66
        case http::verb::get:
176✔
67
                return Method::GET;
68
        case http::verb::head:
×
69
                return Method::HEAD;
70
        case http::verb::post:
20✔
71
                return Method::POST;
72
        case http::verb::put:
22✔
73
                return Method::PUT;
74
        case http::verb::patch:
×
75
                return Method::PATCH;
76
        case http::verb::connect:
×
77
                return Method::CONNECT;
78
        default:
×
79
                return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
×
80
        }
81
}
82

83
template <typename StreamType>
84
class BodyAsyncReader : virtual public io::AsyncReader {
85
public:
86
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
145✔
87
                stream_ {stream},
88
                cancelled_ {cancelled} {
290✔
89
        }
145✔
90
        ~BodyAsyncReader() {
44✔
91
                Cancel();
44✔
92
        }
88✔
93

94
        error::Error AsyncRead(
2,046✔
95
                vector<uint8_t>::iterator start,
96
                vector<uint8_t>::iterator end,
97
                io::AsyncIoHandler handler) override {
98
                if (*cancelled_) {
2,046✔
99
                        return error::MakeError(
×
100
                                error::ProgrammingError,
101
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
102
                }
103
                stream_.AsyncReadNextBodyPart(start, end, handler);
2,046✔
104
                return error::NoError;
2,046✔
105
        }
106

107
        void Cancel() override {
46✔
108
                if (!*cancelled_) {
46✔
109
                        stream_.Cancel();
4✔
110
                }
111
        }
46✔
112

113
private:
114
        StreamType &stream_;
115
        shared_ptr<bool> cancelled_;
116

117
        friend class Client;
118
        friend class Server;
119
};
120

121
template <typename StreamType>
122
class RawSocket : virtual public io::AsyncReadWriter {
123
public:
124
        RawSocket(shared_ptr<StreamType> stream, shared_ptr<beast::flat_buffer> buffered) :
×
125
                destroying_ {make_shared<bool>(false)},
×
126
                stream_ {stream},
127
                buffered_ {buffered} {
×
128
                // If there are no buffered bytes, then we don't need it.
129
                if (buffered_ && buffered_->size() == 0) {
×
130
                        buffered_.reset();
×
131
                }
132
        }
×
133

134
        ~RawSocket() {
15✔
135
                *destroying_ = true;
15✔
136
                Cancel();
15✔
137
        }
30✔
138

139
        error::Error AsyncRead(
320✔
140
                vector<uint8_t>::iterator start,
141
                vector<uint8_t>::iterator end,
142
                io::AsyncIoHandler handler) override {
143
                // If we have prebuffered bytes, which can happen if the HTTP parser read the
144
                // header and parts of the body in one block, return those first.
145
                if (buffered_) {
320✔
146
                        return DrainPrebufferedData(start, end, handler);
8✔
147
                }
148

149
                read_buffer_ = asio::buffer(&*start, end - start);
316✔
150
                auto &destroying = destroying_;
151
                stream_->async_read_some(
632✔
152
                        read_buffer_,
316✔
153
                        [destroying, handler](const boost::system::error_code &ec, size_t num_read) {
313✔
154
                                if (*destroying) {
313✔
155
                                        return;
156
                                }
157

158
                                if (ec == asio::error::operation_aborted) {
313✔
159
                                        handler(expected::unexpected(error::Error(
12✔
160
                                                make_error_condition(errc::operation_canceled),
6✔
161
                                                "Could not read from socket")));
162
                                } else if (ec) {
310✔
163
                                        handler(expected::unexpected(
12✔
164
                                                error::Error(ec.default_error_condition(), "Could not read from socket")));
12✔
165
                                } else {
166
                                        handler(num_read);
608✔
167
                                }
168
                        });
169
                return error::NoError;
316✔
170
        }
171

172
        error::Error AsyncWrite(
309✔
173
                vector<uint8_t>::const_iterator start,
174
                vector<uint8_t>::const_iterator end,
175
                io::AsyncIoHandler handler) override {
176
                write_buffer_ = asio::buffer(&*start, end - start);
309✔
177
                auto &destroying = destroying_;
178
                stream_->async_write_some(
618✔
179
                        write_buffer_,
309✔
180
                        [destroying, handler](const boost::system::error_code &ec, size_t num_written) {
306✔
181
                                if (*destroying) {
306✔
182
                                        return;
183
                                }
184

185
                                if (ec == asio::error::operation_aborted) {
306✔
186
                                        handler(expected::unexpected(error::Error(
×
187
                                                make_error_condition(errc::operation_canceled),
×
188
                                                "Could not write to socket")));
189
                                } else if (ec) {
306✔
190
                                        handler(expected::unexpected(
×
191
                                                error::Error(ec.default_error_condition(), "Could not write to socket")));
×
192
                                } else {
193
                                        handler(num_written);
612✔
194
                                }
195
                        });
196
                return error::NoError;
309✔
197
        }
198

199
        void Cancel() override {
28✔
200
                if (stream_->lowest_layer().is_open()) {
28✔
201
                        stream_->lowest_layer().cancel();
15✔
202
                        stream_->lowest_layer().close();
15✔
203
                }
204
        }
28✔
205

206
private:
207
        error::Error DrainPrebufferedData(
4✔
208
                vector<uint8_t>::iterator start,
209
                vector<uint8_t>::iterator end,
210
                io::AsyncIoHandler handler) {
211
                size_t to_copy = min(static_cast<size_t>(end - start), buffered_->size());
8✔
212
                copy_n(static_cast<const uint8_t *>(buffered_->cdata().data()), to_copy, start);
4✔
213
                buffered_->consume(to_copy);
4✔
214
                if (buffered_->size() == 0) {
4✔
215
                        // We don't need it anymore.
216
                        buffered_.reset();
4✔
217
                }
218
                handler(to_copy);
4✔
219
                return error::NoError;
4✔
220
        }
221

222
        shared_ptr<bool> destroying_;
223
        shared_ptr<StreamType> stream_;
224
        shared_ptr<beast::flat_buffer> buffered_;
225
        asio::mutable_buffer read_buffer_;
226
        asio::const_buffer write_buffer_;
227
};
228

229
Client::Client(
406✔
230
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
231
        event_loop_ {event_loop},
232
        logger_name_ {logger_name},
233
        cancelled_ {make_shared<bool>(true)},
406✔
234
        resolver_(GetAsioIoContext(event_loop)),
235
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,426✔
236
        ssl_ctx_.set_verify_mode(client.skip_verify ? ssl::verify_none : ssl::verify_peer);
731✔
237

238
        if (client.client_cert_path != "" and client.client_cert_key_path != "") {
406✔
239
                ssl_ctx_.set_options(boost::asio::ssl::context::default_workarounds);
1✔
240
                ssl_ctx_.use_certificate_file(client.client_cert_path, boost::asio::ssl::context_base::pem);
1✔
241
                ssl_ctx_.use_private_key_file(
1✔
242
                        client.client_cert_key_path, boost::asio::ssl::context_base::pem);
243
        }
244

245
        beast::error_code ec {};
406✔
246
        ssl_ctx_.set_default_verify_paths(ec); // Load the default CAs
406✔
247
        if (ec) {
406✔
248
                log::Error("Failed to load the SSL default directory");
×
249
        }
250
        if (client.server_cert_path != "") {
406✔
251
                ssl_ctx_.load_verify_file(client.server_cert_path, ec);
5✔
252
                if (ec) {
5✔
253
                        log::Error("Failed to load the server certificate!");
×
254
                }
255
        }
256
}
406✔
257

258
Client::~Client() {
1,624✔
259
        if (!*cancelled_) {
406✔
260
                logger_.Warning("Client destroyed while request is still active!");
24✔
261
        }
262
        DoCancel();
406✔
263
}
406✔
264

265
error::Error Client::AsyncCall(
241✔
266
        OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
267
        if (!*cancelled_ && status_ != TransactionStatus::Done) {
241✔
268
                return error::Error(
269
                        make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
×
270
        }
271

272
        if (req->address_.protocol == "" || req->address_.host == "" || req->address_.port < 0) {
241✔
273
                return error::MakeError(error::ProgrammingError, "Request is not ready");
4✔
274
        }
275

276
        if (!header_handler || !body_handler) {
239✔
277
                return error::MakeError(
278
                        error::ProgrammingError, "header_handler and body_handler can not be nullptr");
2✔
279
        }
280

281
        if (req->address_.protocol != "http" && req->address_.protocol != "https") {
238✔
282
                return error::Error(
283
                        make_error_condition(errc::protocol_not_supported), req->address_.protocol);
1✔
284
        }
285

286
        if (req->address_.protocol == "https") {
237✔
287
                is_https_ = true;
6✔
288
        }
289

290
        logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));
237✔
291

292
        // NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
293
        // request to route to our k8s cluster. Set this in all cases.
294
        req->SetHeader("HOST", req->address_.host);
474✔
295

296
        request_ = req;
297
        header_handler_ = header_handler;
237✔
298
        body_handler_ = body_handler;
237✔
299
        status_ = TransactionStatus::None;
237✔
300

301
        cancelled_ = make_shared<bool>(false);
237✔
302

303
        auto &cancelled = cancelled_;
304

305
        resolver_.async_resolve(
474✔
306
                request_->address_.host,
307
                to_string(request_->address_.port),
474✔
308
                [this, cancelled](
474✔
309
                        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
236✔
310
                        if (!*cancelled) {
237✔
311
                                ResolveHandler(ec, results);
236✔
312
                        }
313
                });
237✔
314

315
        return error::NoError;
237✔
316
}
317

318
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
163✔
319
        if (status_ != TransactionStatus::HeaderHandlerCalled) {
163✔
320
                return expected::unexpected(error::Error(
2✔
321
                        make_error_condition(errc::operation_in_progress),
4✔
322
                        "MakeBodyAsyncReader called while reading is in progress"));
6✔
323
        }
324

325
        if (response_body_length_ == 0) {
161✔
326
                return expected::unexpected(
16✔
327
                        MakeError(BodyMissingError, "Response does not contain a body"));
48✔
328
        }
329

330
        status_ = TransactionStatus::ReaderCreated;
145✔
331
        return make_shared<BodyAsyncReader<Client>>(resp->client_.GetHttpClient(), resp->cancelled_);
290✔
332
}
333

334
io::ExpectedAsyncReadWriterPtr Client::SwitchProtocol(IncomingResponsePtr req) {
7✔
335
        if (*cancelled_) {
7✔
336
                return expected::unexpected(error::Error(
×
337
                        make_error_condition(errc::not_connected),
×
338
                        "Cannot switch protocols if endpoint is not connected"));
×
339
        }
340

341
        // Rest of the connection is done directly on the socket, we are done here.
342
        status_ = TransactionStatus::Done;
7✔
343
        *cancelled_ = true;
7✔
344
        cancelled_ = make_shared<bool>(false);
14✔
345

346
        auto stream = stream_;
347
        // This no longer belongs to us.
348
        stream_.reset();
7✔
349

350
        if (is_https_) {
7✔
351
                return make_shared<RawSocket<ssl::stream<tcp::socket>>>(
×
352
                        stream, response_data_.response_buffer_);
×
353
        } else {
354
                return make_shared<RawSocket<tcp::socket>>(
7✔
355
                        make_shared<tcp::socket>(std::move(stream->next_layer())),
14✔
356
                        response_data_.response_buffer_);
7✔
357
        }
358
}
359

360
void Client::CallHandler(ResponseHandler handler) {
286✔
361
        // This function exists to make sure we have a copy of the handler we're calling (in the
362
        // argument list). This is important in case the handler owns the client instance through a
363
        // capture, and it replaces the handler with a different one (using `AsyncCall`). If it
364
        // does, then it destroys the final copy of the handler, and therefore also the client,
365
        // which is why we need to make a copy here, before calling it.
366
        handler(response_);
286✔
367
}
286✔
368

369
void Client::CallErrorHandler(
24✔
370
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
371
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
72✔
372
}
24✔
373

374
void Client::CallErrorHandler(
151✔
375
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
376
        *cancelled_ = true;
151✔
377
        cancelled_ = make_shared<bool>(true);
151✔
378
        stream_.reset();
151✔
379
        status_ = TransactionStatus::Done;
151✔
380
        handler(expected::unexpected(
302✔
381
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
604✔
382
}
151✔
383

384
void Client::ResolveHandler(
236✔
385
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
386
        if (ec) {
236✔
387
                CallErrorHandler(ec, request_, header_handler_);
×
388
                return;
×
389
        }
390

391
        if (logger_.Level() >= log::LogLevel::Debug) {
236✔
392
                string ips = "[";
233✔
393
                string sep;
394
                for (auto r : results) {
960✔
395
                        ips += sep;
247✔
396
                        ips += r.endpoint().address().to_string();
247✔
397
                        sep = ", ";
247✔
398
                }
399
                ips += "]";
233✔
400
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
466✔
401
        }
402

403
        resolver_results_ = results;
404

405
        stream_ = make_shared<ssl::stream<tcp::socket>>(GetAsioIoContext(event_loop_), ssl_ctx_);
472✔
406

407
        if (!response_data_.response_buffer_) {
236✔
408
                // We can reuse this if preexisting.
409
                response_data_.response_buffer_ = make_shared<beast::flat_buffer>();
340✔
410

411
                // This is equivalent to:
412
                //   response_data_.response_buffer_.reserve(body_buffer_.size());
413
                // but compatible with Boost 1.67.
414
                response_data_.response_buffer_->prepare(
415
                        body_buffer_.size() - response_data_.response_buffer_->size());
170✔
416
        }
417

418
        response_data_.http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();
472✔
419

420
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
421
        // if they do, they should be handled higher up in the application logic.
422
        //
423
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
424
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
425
        // `has_value()` in their code, causing their subsequent comparison operation to
426
        // misbehave. So pass highest possible value instead.
427
        response_data_.http_response_parser_->body_limit(numeric_limits<uint64_t>::max());
428

429
        auto &cancelled = cancelled_;
430

431
        asio::async_connect(
236✔
432
                stream_->next_layer(),
433
                resolver_results_,
236✔
434
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
472✔
435
                        if (!*cancelled) {
236✔
436
                                if (is_https_) {
236✔
437
                                        return HandshakeHandler(ec, endpoint);
6✔
438
                                }
439
                                return ConnectHandler(ec, endpoint);
230✔
440
                        }
441
                });
442
}
443

444
void Client::HandshakeHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
6✔
445
        if (ec) {
6✔
446
                CallErrorHandler(ec, request_, header_handler_);
×
447
                return;
×
448
        }
449

450
        // Set SNI Hostname (many hosts need this to handshake successfully)
451
        if (!SSL_set_tlsext_host_name(stream_->native_handle(), request_->address_.host.c_str())) {
6✔
452
                beast::error_code ec2 {
453
                        static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
×
454
                logger_.Error("Failed to set SNI host name: " + ec2.message());
×
455
        }
456

457
        auto &cancelled = cancelled_;
458

459
        stream_->async_handshake(
12✔
460
                ssl::stream_base::client, [this, cancelled, endpoint](const error_code &ec) {
6✔
461
                        if (*cancelled) {
6✔
462
                                return;
463
                        }
464
                        if (ec) {
6✔
465
                                logger_.Error("https: Failed to perform the SSL handshake: " + ec.message());
2✔
466
                                CallErrorHandler(ec, request_, header_handler_);
1✔
467
                                return;
1✔
468
                        }
469
                        logger_.Debug("https: Successful SSL handshake");
10✔
470
                        ConnectHandler(ec, endpoint);
5✔
471
                });
472
}
473

474

475
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
235✔
476
        if (ec) {
235✔
477
                CallErrorHandler(ec, request_, header_handler_);
12✔
478
                return;
12✔
479
        }
480

481
        logger_.Debug("Connected to " + endpoint.address().to_string());
446✔
482

483
        request_data_.http_request_ = make_shared<http::request<http::buffer_body>>(
223✔
484
                MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);
446✔
485

486
        for (const auto &header : request_->headers_) {
636✔
487
                request_data_.http_request_->set(header.first, header.second);
413✔
488
        }
489

490
        request_data_.http_request_serializer_ =
491
                make_shared<http::request_serializer<http::buffer_body>>(*request_data_.http_request_);
223✔
492

493
        auto &cancelled = cancelled_;
494
        auto &request_data = request_data_;
223✔
495

496
        if (is_https_) {
223✔
497
                http::async_write_header(
5✔
498
                        *stream_,
499
                        *request_data_.http_request_serializer_,
500
                        [this, cancelled, request_data](const error_code &ec, size_t num_written) {
5✔
501
                                if (!*cancelled) {
5✔
502
                                        WriteHeaderHandler(ec, num_written);
5✔
503
                                }
504
                        });
5✔
505
        } else {
506
                http::async_write_header(
218✔
507
                        stream_->next_layer(),
508
                        *request_data_.http_request_serializer_,
509
                        [this, cancelled, request_data](const error_code &ec, size_t num_written) {
218✔
510
                                if (!*cancelled) {
218✔
511
                                        WriteHeaderHandler(ec, num_written);
218✔
512
                                }
513
                        });
218✔
514
        }
515
}
516

517
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
223✔
518
        if (num_written > 0) {
223✔
519
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
446✔
520
        }
521

522
        if (ec) {
223✔
523
                CallErrorHandler(ec, request_, header_handler_);
×
524
                return;
178✔
525
        }
526

527
        auto header = request_->GetHeader("Content-Length");
446✔
528
        if (!header || header.value() == "0") {
223✔
529
                ReadHeader();
177✔
530
                return;
531
        }
532

533
        auto length = common::StringToLongLong(header.value());
46✔
534
        if (!length || length.value() < 0) {
46✔
535
                auto err = error::Error(
536
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
537
                CallErrorHandler(err, request_, header_handler_);
×
538
                return;
539
        }
540
        request_body_length_ = length.value();
46✔
541

542
        if (!request_->body_gen_ && !request_->async_body_gen_) {
46✔
543
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
544
                CallErrorHandler(err, request_, header_handler_);
2✔
545
                return;
546
        }
547

548
        assert(!(request_->body_gen_ && request_->async_body_gen_));
549

550
        if (request_->body_gen_) {
45✔
551
                auto body_reader = request_->body_gen_();
39✔
552
                if (!body_reader) {
39✔
553
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
554
                        return;
555
                }
556
                request_->body_reader_ = body_reader.value();
39✔
557
        } else {
558
                auto body_reader = request_->async_body_gen_();
6✔
559
                if (!body_reader) {
6✔
560
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
561
                        return;
562
                }
563
                request_->async_body_reader_ = body_reader.value();
6✔
564
        }
565

566
        PrepareAndWriteNewBodyBuffer();
45✔
567
}
568

569
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,142✔
570
        if (num_written > 0) {
2,142✔
571
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
2,100✔
572
        }
573

574
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,142✔
575
                // Write next block of the body.
576
                PrepareAndWriteNewBodyBuffer();
1,050✔
577
        } else if (ec) {
1,092✔
578
                CallErrorHandler(ec, request_, header_handler_);
8✔
579
        } else if (num_written > 0) {
1,088✔
580
                // We are still writing the body.
581
                WriteBody();
1,050✔
582
        } else {
583
                // We are ready to receive the response.
584
                ReadHeader();
38✔
585
        }
586
}
2,142✔
587

588
void Client::PrepareAndWriteNewBodyBuffer() {
1,095✔
589
        // request_->body_reader_ XOR request_->async_body_reader_
590
        assert(
591
                (request_->body_reader_ || request_->async_body_reader_)
592
                && !(request_->body_reader_ && request_->async_body_reader_));
593

594
        auto cancelled = cancelled_;
595
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
3,462✔
596
                if (!*cancelled) {
1,095✔
597
                        if (!read) {
1,094✔
598
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
599
                                return;
2✔
600
                        }
601
                        WriteNewBodyBuffer(read.value());
1,092✔
602
                }
603
        };
1,095✔
604

605

606
        if (request_->body_reader_) {
1,095✔
607
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
1,340✔
608
        } else {
609
                auto err = request_->async_body_reader_->AsyncRead(
610
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
850✔
611
                if (err != error::NoError) {
425✔
612
                        CallErrorHandler(err, request_, header_handler_);
×
613
                }
614
        }
615
}
1,095✔
616

617
void Client::WriteNewBodyBuffer(size_t size) {
1,092✔
618
        request_data_.http_request_->body().data = body_buffer_.data();
1,092✔
619
        request_data_.http_request_->body().size = size;
1,092✔
620

621
        if (size > 0) {
1,092✔
622
                request_data_.http_request_->body().more = true;
1,054✔
623
        } else {
624
                // Release ownership of Body reader.
625
                request_->body_reader_.reset();
38✔
626
                request_->async_body_reader_.reset();
38✔
627
                request_data_.http_request_->body().more = false;
38✔
628
        }
629

630
        WriteBody();
1,092✔
631
}
1,092✔
632

633
void Client::WriteBody() {
2,142✔
634
        auto &cancelled = cancelled_;
635
        auto &request_data = request_data_;
2,142✔
636

637
        if (is_https_) {
2,142✔
638
                http::async_write_some(
×
639
                        *stream_,
640
                        *request_data_.http_request_serializer_,
641
                        [this, cancelled, request_data](const error_code &ec, size_t num_written) {
×
642
                                if (!*cancelled) {
×
643
                                        WriteBodyHandler(ec, num_written);
×
644
                                }
645
                        });
×
646
        } else {
647
                http::async_write_some(
4,284✔
648
                        stream_->next_layer(),
649
                        *request_data_.http_request_serializer_,
650
                        [this, cancelled, request_data](const error_code &ec, size_t num_written) {
2,142✔
651
                                if (!*cancelled) {
2,142✔
652
                                        WriteBodyHandler(ec, num_written);
2,142✔
653
                                }
654
                        });
2,142✔
655
        }
656
}
2,142✔
657

658
void Client::ReadHeader() {
215✔
659
        auto &cancelled = cancelled_;
660
        auto &response_data = response_data_;
215✔
661

662
        if (is_https_) {
215✔
663
                http::async_read_some(
5✔
664
                        *stream_,
665
                        *response_data_.response_buffer_,
666
                        *response_data_.http_response_parser_,
667
                        [this, cancelled, response_data](const error_code &ec, size_t num_read) {
5✔
668
                                if (!*cancelled) {
5✔
669
                                        ReadHeaderHandler(ec, num_read);
5✔
670
                                }
671
                        });
5✔
672
        } else {
673
                http::async_read_some(
210✔
674
                        stream_->next_layer(),
675
                        *response_data_.response_buffer_,
676
                        *response_data_.http_response_parser_,
677
                        [this, cancelled, response_data](const error_code &ec, size_t num_read) {
209✔
678
                                if (!*cancelled) {
209✔
679
                                        ReadHeaderHandler(ec, num_read);
209✔
680
                                }
681
                        });
209✔
682
        }
683
}
215✔
684

685
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
214✔
686
        if (num_read > 0) {
214✔
687
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
418✔
688
        }
689

690
        if (ec) {
214✔
691
                CallErrorHandler(ec, request_, header_handler_);
5✔
692
                return;
68✔
693
        }
694

695
        if (!response_data_.http_response_parser_->is_header_done()) {
209✔
696
                ReadHeader();
×
697
                return;
×
698
        }
699

700
        response_.reset(new IncomingResponse(*this, cancelled_));
418✔
701
        response_->status_code_ = response_data_.http_response_parser_->get().result_int();
209✔
702
        response_->status_message_ = string {response_data_.http_response_parser_->get().reason()};
209✔
703

704
        logger_.Debug(
418✔
705
                "Received response: " + to_string(response_->status_code_) + " "
418✔
706
                + response_->status_message_);
627✔
707

708
        string debug_str;
709
        for (auto header = response_data_.http_response_parser_->get().cbegin();
239✔
710
                 header != response_data_.http_response_parser_->get().cend();
448✔
711
                 header++) {
712
                response_->headers_[string {header->name_string()}] = string {header->value()};
717✔
713
                if (logger_.Level() >= log::LogLevel::Debug) {
239✔
714
                        debug_str += string {header->name_string()};
236✔
715
                        debug_str += ": ";
236✔
716
                        debug_str += string {header->value()};
236✔
717
                        debug_str += "\n";
236✔
718
                }
719
        }
720

721
        logger_.Debug("Received headers:\n" + debug_str);
418✔
722
        debug_str.clear();
723

724
        if (response_data_.http_response_parser_->chunked()) {
209✔
725
                auto cancelled = cancelled_;
726
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
727
                CallHandler(header_handler_);
2✔
728
                if (!*cancelled) {
1✔
729
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
730
                        CallErrorHandler(err, request_, body_handler_);
2✔
731
                }
732
                return;
733
        }
734

735
        auto content_length = response_data_.http_response_parser_->content_length();
208✔
736
        if (content_length) {
208✔
737
                response_body_length_ = content_length.value();
179✔
738
        } else {
739
                response_body_length_ = 0;
29✔
740
        }
741
        response_body_read_ = 0;
208✔
742

743
        if (response_body_read_ >= response_body_length_) {
208✔
744
                auto cancelled = cancelled_;
745
                status_ = TransactionStatus::HeaderHandlerCalled;
43✔
746
                CallHandler(header_handler_);
86✔
747
                if (!*cancelled) {
43✔
748
                        status_ = TransactionStatus::Done;
37✔
749
                        CallHandler(body_handler_);
74✔
750

751
                        // After body handler has run, set the request to cancelled. The body
752
                        // handler may have made a new request, so this is not necessarily the same
753
                        // request as is currently active (note use of shared_ptr copy, not
754
                        // `cancelled_`).
755
                        *cancelled = true;
37✔
756
                }
757
                return;
758
        }
759

760
        auto cancelled = cancelled_;
761
        status_ = TransactionStatus::HeaderHandlerCalled;
165✔
762
        CallHandler(header_handler_);
330✔
763
        if (*cancelled) {
165✔
764
                return;
765
        }
766

767
        // We know that a body reader is required here, because of the `response_body_read_ >=
768
        // response_body_length_` check above.
769
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
146✔
770
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
4✔
771
        }
772
}
773

774
void Client::AsyncReadNextBodyPart(
3,807✔
775
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
776
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
777

778
        if (status_ == TransactionStatus::ReaderCreated) {
3,807✔
779
                status_ = TransactionStatus::BodyReadingInProgress;
144✔
780
        }
781

782
        if (AtLeast(status_, TransactionStatus::BodyReadingFinished)) {
3,807✔
783
                auto cancelled = cancelled_;
784
                handler(0);
80✔
785
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
40✔
786
                        status_ = TransactionStatus::Done;
40✔
787
                        CallHandler(body_handler_);
80✔
788

789
                        // After body handler has run, set the request to cancelled. The body
790
                        // handler may have made a new request, so this is not necessarily the same
791
                        // request as is currently active (note use of shared_ptr copy, not
792
                        // `cancelled_`).
793
                        *cancelled = true;
40✔
794
                }
795
                return;
796
        }
797

798
        reader_buf_start_ = start;
3,767✔
799
        reader_buf_end_ = end;
3,767✔
800
        reader_handler_ = handler;
3,767✔
801
        size_t read_size = end - start;
3,767✔
802
        size_t smallest = min(body_buffer_.size(), read_size);
5,880✔
803

804
        response_data_.http_response_parser_->get().body().data = body_buffer_.data();
3,767✔
805
        response_data_.http_response_parser_->get().body().size = smallest;
3,767✔
806

807
        auto &cancelled = cancelled_;
808
        auto &response_data = response_data_;
3,767✔
809

810
        if (is_https_) {
3,767✔
811
                http::async_read_some(
×
812
                        *stream_,
813
                        *response_data_.response_buffer_,
814
                        *response_data_.http_response_parser_,
815
                        [this, cancelled, response_data](const error_code &ec, size_t num_read) {
×
816
                                if (!*cancelled) {
×
817
                                        ReadBodyHandler(ec, num_read);
×
818
                                }
819
                        });
×
820
        } else {
821
                http::async_read_some(
3,767✔
822
                        stream_->next_layer(),
823
                        *response_data_.response_buffer_,
824
                        *response_data_.http_response_parser_,
825
                        [this, cancelled, response_data](const error_code &ec, size_t num_read) {
3,766✔
826
                                if (!*cancelled) {
3,766✔
827
                                        ReadBodyHandler(ec, num_read);
3,766✔
828
                                }
829
                        });
3,766✔
830
        }
831
}
832

833
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
3,766✔
834
        if (num_read > 0) {
3,766✔
835
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
7,434✔
836
                response_body_read_ += num_read;
3,717✔
837
        }
838

839
        if (ec == http::make_error_code(http::error::need_buffer)) {
3,766✔
840
                // This can be ignored. We always reset the buffer between reads anyway.
841
                ec = error_code();
1,958✔
842
        }
843

844
        assert(reader_handler_);
845

846
        if (response_body_read_ >= response_body_length_) {
3,766✔
847
                status_ = TransactionStatus::BodyReadingFinished;
92✔
848
        }
849

850
        auto cancelled = cancelled_;
851

852
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
3,766✔
853
        size_t smallest = min(num_read, buf_size);
3,766✔
854
        copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
3,766✔
855
        if (ec) {
3,766✔
856
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
98✔
857
                reader_handler_(expected::unexpected(err));
147✔
858
        } else {
859
                reader_handler_(smallest);
7,434✔
860
        }
861

862
        if (!*cancelled && ec) {
3,766✔
863
                CallErrorHandler(ec, request_, body_handler_);
4✔
864
                return;
865
        }
866
}
867

868
void Client::Cancel() {
195✔
869
        auto cancelled = cancelled_;
870

871
        if (!*cancelled) {
195✔
872
                auto err =
873
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
246✔
874
                switch (status_) {
123✔
875
                case TransactionStatus::None:
1✔
876
                        CallErrorHandler(err, request_, header_handler_);
1✔
877
                        break;
1✔
878
                case TransactionStatus::HeaderHandlerCalled:
120✔
879
                case TransactionStatus::ReaderCreated:
880
                case TransactionStatus::BodyReadingInProgress:
881
                case TransactionStatus::BodyReadingFinished:
882
                        CallErrorHandler(err, request_, body_handler_);
120✔
883
                        break;
120✔
884
                case TransactionStatus::Replying:
885
                case TransactionStatus::SwitchingProtocol:
886
                        // Not used by client.
887
                        assert(false);
888
                        break;
889
                case TransactionStatus::BodyHandlerCalled:
890
                case TransactionStatus::Done:
891
                        break;
892
                }
893
        }
894

895
        if (!*cancelled) {
195✔
896
                DoCancel();
2✔
897
        }
898
}
195✔
899

900
void Client::DoCancel() {
408✔
901
        resolver_.cancel();
408✔
902
        if (stream_) {
408✔
903
                stream_->next_layer().cancel();
66✔
904
                stream_->next_layer().close();
66✔
905
                stream_.reset();
66✔
906
        }
907

908
        request_.reset();
408✔
909
        response_.reset();
408✔
910

911
        // Reset logger to no connection.
912
        logger_ = log::Logger(logger_name_);
408✔
913

914
        // Set cancel state and then make a new one. Those who are interested should have their own
915
        // pointer to the old one.
916
        *cancelled_ = true;
408✔
917
        cancelled_ = make_shared<bool>(true);
408✔
918
}
408✔
919

920
Stream::Stream(Server &server) :
416✔
921
        server_ {server},
922
        logger_ {"http"},
923
        cancelled_(make_shared<bool>(true)),
416✔
924
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
416✔
925
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,248✔
926
        request_data_.request_buffer_ = make_shared<beast::flat_buffer>();
832✔
927

928
        // This is equivalent to:
929
        //   request_data_.request_buffer_.reserve(body_buffer_.size());
930
        // but compatible with Boost 1.67.
931
        request_data_.request_buffer_->prepare(
932
                body_buffer_.size() - request_data_.request_buffer_->size());
416✔
933

934
        request_data_.http_request_parser_ = make_shared<http::request_parser<http::buffer_body>>();
832✔
935

936
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
937
        // they do, they should be handled higher up in the application logic.
938
        //
939
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
940
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
941
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
942
        // possible value instead.
943
        request_data_.http_request_parser_->body_limit(numeric_limits<uint64_t>::max());
944
}
416✔
945

946
Stream::~Stream() {
1,248✔
947
        DoCancel();
416✔
948
}
416✔
949

950
void Stream::Cancel() {
7✔
951
        auto cancelled = cancelled_;
952

953
        if (!*cancelled) {
7✔
954
                auto err =
955
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
956
                switch (status_) {
7✔
957
                case TransactionStatus::None:
×
958
                        CallErrorHandler(err, request_, server_.header_handler_);
×
959
                        break;
×
960
                case TransactionStatus::HeaderHandlerCalled:
5✔
961
                case TransactionStatus::ReaderCreated:
962
                case TransactionStatus::BodyReadingInProgress:
963
                case TransactionStatus::BodyReadingFinished:
964
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
965
                        break;
5✔
966
                case TransactionStatus::BodyHandlerCalled:
×
967
                        // In between body handler and reply finished. No one to handle the status
968
                        // here.
969
                        server_.RemoveStream(shared_from_this());
×
970
                        break;
×
971
                case TransactionStatus::Replying:
1✔
972
                        CallErrorHandler(err, request_, reply_finished_handler_);
3✔
973
                        break;
1✔
974
                case TransactionStatus::SwitchingProtocol:
1✔
975
                        CallErrorHandler(err, request_, switch_protocol_handler_);
3✔
976
                        break;
1✔
977
                case TransactionStatus::Done:
978
                        break;
979
                }
980
        }
981

982
        if (!*cancelled) {
7✔
983
                DoCancel();
×
984
        }
985
}
7✔
986

987
void Stream::DoCancel() {
599✔
988
        if (socket_.is_open()) {
599✔
989
                socket_.cancel();
210✔
990
                socket_.close();
210✔
991
        }
992

993
        // Set cancel state and then make a new one. Those who are interested should have their own
994
        // pointer to the old one.
995
        *cancelled_ = true;
599✔
996
        cancelled_ = make_shared<bool>(true);
599✔
997
}
599✔
998

999
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
1000
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1001
}
×
1002

1003
void Stream::CallErrorHandler(
×
1004
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
1005
        *cancelled_ = true;
×
1006
        cancelled_ = make_shared<bool>(true);
×
1007
        status_ = TransactionStatus::Done;
×
1008
        handler(expected::unexpected(err.WithContext(
×
1009
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
1010

1011
        server_.RemoveStream(shared_from_this());
×
1012
}
×
1013

1014
void Stream::CallErrorHandler(
2✔
1015
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1016
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
6✔
1017
}
2✔
1018

1019
void Stream::CallErrorHandler(
9✔
1020
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1021
        *cancelled_ = true;
9✔
1022
        cancelled_ = make_shared<bool>(true);
9✔
1023
        status_ = TransactionStatus::Done;
9✔
1024
        handler(
9✔
1025
                req,
1026
                err.WithContext(
9✔
1027
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
27✔
1028

1029
        server_.RemoveStream(shared_from_this());
9✔
1030
}
9✔
1031

1032
void Stream::CallErrorHandler(
4✔
1033
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
1034
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
1035
}
4✔
1036

1037
void Stream::CallErrorHandler(
7✔
1038
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
1039
        *cancelled_ = true;
7✔
1040
        cancelled_ = make_shared<bool>(true);
7✔
1041
        status_ = TransactionStatus::Done;
7✔
1042
        handler(err.WithContext(
14✔
1043
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
14✔
1044

1045
        server_.RemoveStream(shared_from_this());
7✔
1046
}
7✔
1047

1048
void Stream::CallErrorHandler(
×
1049
        const error_code &ec, const RequestPtr &req, SwitchProtocolHandler handler) {
1050
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1051
}
×
1052

1053
void Stream::CallErrorHandler(
1✔
1054
        const error::Error &err, const RequestPtr &req, SwitchProtocolHandler handler) {
1055
        *cancelled_ = true;
1✔
1056
        cancelled_ = make_shared<bool>(true);
1✔
1057
        status_ = TransactionStatus::Done;
1✔
1058
        handler(expected::unexpected(err.WithContext(
2✔
1059
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
4✔
1060

1061
        server_.RemoveStream(shared_from_this());
1✔
1062
}
1✔
1063

1064
void Stream::AcceptHandler(const error_code &ec) {
218✔
1065
        if (ec) {
218✔
1066
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
1067
                return;
×
1068
        }
1069

1070
        auto ip = socket_.remote_endpoint().address().to_string();
436✔
1071

1072
        // Use IP as context for logging.
1073
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
218✔
1074

1075
        logger_.Debug("Accepted connection.");
436✔
1076

1077
        request_.reset(new IncomingRequest(*this, cancelled_));
436✔
1078

1079
        request_->address_.host = ip;
218✔
1080

1081
        *cancelled_ = false;
218✔
1082

1083
        ReadHeader();
218✔
1084
}
1085

1086
void Stream::ReadHeader() {
218✔
1087
        auto &cancelled = cancelled_;
1088
        auto &request_data = request_data_;
218✔
1089

1090
        http::async_read_some(
436✔
1091
                socket_,
218✔
1092
                *request_data_.request_buffer_,
1093
                *request_data_.http_request_parser_,
1094
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
218✔
1095
                        if (!*cancelled) {
218✔
1096
                                ReadHeaderHandler(ec, num_read);
218✔
1097
                        }
1098
                });
218✔
1099
}
218✔
1100

1101
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
218✔
1102
        if (num_read > 0) {
218✔
1103
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
436✔
1104
        }
1105

1106
        if (ec) {
218✔
1107
                CallErrorHandler(ec, request_, server_.header_handler_);
×
1108
                return;
173✔
1109
        }
1110

1111
        if (!request_data_.http_request_parser_->is_header_done()) {
218✔
1112
                ReadHeader();
×
1113
                return;
×
1114
        }
1115

1116
        auto method_result = BeastVerbToMethod(
1117
                request_data_.http_request_parser_->get().base().method(),
1118
                string {request_data_.http_request_parser_->get().base().method_string()});
436✔
1119
        if (!method_result) {
218✔
1120
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
1121
                return;
×
1122
        }
1123
        request_->method_ = method_result.value();
218✔
1124
        request_->address_.path = string(request_data_.http_request_parser_->get().base().target());
218✔
1125

1126
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
218✔
1127

1128
        string debug_str;
1129
        for (auto header = request_data_.http_request_parser_->get().cbegin();
408✔
1130
                 header != request_data_.http_request_parser_->get().cend();
626✔
1131
                 header++) {
1132
                request_->headers_[string {header->name_string()}] = string {header->value()};
1,224✔
1133
                if (logger_.Level() >= log::LogLevel::Debug) {
408✔
1134
                        debug_str += string {header->name_string()};
395✔
1135
                        debug_str += ": ";
395✔
1136
                        debug_str += string {header->value()};
395✔
1137
                        debug_str += "\n";
395✔
1138
                }
1139
        }
1140

1141
        logger_.Debug("Received headers:\n" + debug_str);
436✔
1142
        debug_str.clear();
1143

1144
        if (request_data_.http_request_parser_->chunked()) {
218✔
1145
                auto cancelled = cancelled_;
1146
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
1147
                server_.header_handler_(request_);
2✔
1148
                if (!*cancelled) {
1✔
1149
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
1150
                        CallErrorHandler(err, request_, server_.body_handler_);
2✔
1151
                }
1152
                return;
1153
        }
1154

1155
        auto content_length = request_data_.http_request_parser_->content_length();
217✔
1156
        if (content_length) {
217✔
1157
                request_body_length_ = content_length.value();
46✔
1158
        } else {
1159
                request_body_length_ = 0;
171✔
1160
        }
1161
        request_body_read_ = 0;
217✔
1162

1163
        if (request_body_read_ >= request_body_length_) {
217✔
1164
                auto cancelled = cancelled_;
1165
                status_ = TransactionStatus::HeaderHandlerCalled;
171✔
1166
                server_.header_handler_(request_);
342✔
1167
                if (!*cancelled) {
171✔
1168
                        status_ = TransactionStatus::BodyHandlerCalled;
171✔
1169
                        CallBodyHandler();
171✔
1170
                }
1171
                return;
1172
        }
1173

1174
        auto cancelled = cancelled_;
1175
        status_ = TransactionStatus::HeaderHandlerCalled;
46✔
1176
        server_.header_handler_(request_);
92✔
1177
        if (*cancelled) {
46✔
1178
                return;
1179
        }
1180

1181
        // We know that a body reader is required here, because of the `request_body_read_ >=
1182
        // request_body_length_` check above.
1183
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
45✔
1184
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
2✔
1185
        }
1186
}
1187

1188
void Stream::AsyncReadNextBodyPart(
2,046✔
1189
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
1190
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
1191

1192
        if (status_ == TransactionStatus::ReaderCreated) {
2,046✔
1193
                status_ = TransactionStatus::BodyReadingInProgress;
44✔
1194
        }
1195

1196
        if (status_ != TransactionStatus::BodyReadingInProgress) {
2,046✔
1197
                auto cancelled = cancelled_;
1198
                handler(0);
76✔
1199
                if (!*cancelled && status_ == TransactionStatus::BodyReadingFinished) {
38✔
1200
                        status_ = TransactionStatus::BodyHandlerCalled;
38✔
1201
                        CallBodyHandler();
38✔
1202
                }
1203
                return;
1204
        }
1205

1206
        reader_buf_start_ = start;
2,008✔
1207
        reader_buf_end_ = end;
2,008✔
1208
        reader_handler_ = handler;
2,008✔
1209
        size_t read_size = end - start;
2,008✔
1210
        size_t smallest = min(body_buffer_.size(), read_size);
3,064✔
1211

1212
        request_data_.http_request_parser_->get().body().data = body_buffer_.data();
2,008✔
1213
        request_data_.http_request_parser_->get().body().size = smallest;
2,008✔
1214

1215
        auto &cancelled = cancelled_;
1216
        auto &request_data = request_data_;
2,008✔
1217

1218
        http::async_read_some(
4,016✔
1219
                socket_,
2,008✔
1220
                *request_data_.request_buffer_,
1221
                *request_data_.http_request_parser_,
1222
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
2,008✔
1223
                        if (!*cancelled) {
2,008✔
1224
                                ReadBodyHandler(ec, num_read);
2,008✔
1225
                        }
1226
                });
2,008✔
1227
}
1228

1229
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
2,008✔
1230
        if (num_read > 0) {
2,008✔
1231
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
4,008✔
1232
                request_body_read_ += num_read;
2,004✔
1233
        }
1234

1235
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,008✔
1236
                // This can be ignored. We always reset the buffer between reads anyway.
1237
                ec = error_code();
979✔
1238
        }
1239

1240
        assert(reader_handler_);
1241

1242
        if (request_body_read_ >= request_body_length_) {
2,008✔
1243
                status_ = TransactionStatus::BodyReadingFinished;
38✔
1244
        }
1245

1246
        auto cancelled = cancelled_;
1247

1248
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
2,008✔
1249
        size_t smallest = min(num_read, buf_size);
2,008✔
1250
        copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
2,008✔
1251
        if (ec) {
2,008✔
1252
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
8✔
1253
                reader_handler_(expected::unexpected(err));
12✔
1254
        } else {
1255
                reader_handler_(smallest);
4,008✔
1256
        }
1257

1258
        if (!*cancelled && ec) {
2,008✔
1259
                CallErrorHandler(ec, request_, server_.body_handler_);
4✔
1260
                return;
1261
        }
1262
}
1263

1264
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
196✔
1265
        SetupResponse();
196✔
1266

1267
        reply_finished_handler_ = reply_finished_handler;
196✔
1268

1269
        auto &cancelled = cancelled_;
1270
        auto &response_data = response_data_;
196✔
1271

1272
        http::async_write_header(
392✔
1273
                socket_,
196✔
1274
                *response_data_.http_response_serializer_,
1275
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
196✔
1276
                        if (!*cancelled) {
196✔
1277
                                WriteHeaderHandler(ec, num_written);
195✔
1278
                        }
1279
                });
196✔
1280
}
196✔
1281

1282
void Stream::SetupResponse() {
205✔
1283
        auto response = maybe_response_.lock();
205✔
1284
        // Only called from existing responses, so this should always be true.
1285
        assert(response);
1286

1287
        assert(status_ == TransactionStatus::BodyHandlerCalled);
1288
        status_ = TransactionStatus::Replying;
205✔
1289

1290
        // From here on we take shared ownership.
1291
        response_ = response;
1292

1293
        response_data_.http_response_ = make_shared<http::response<http::buffer_body>>();
410✔
1294

1295
        for (const auto &header : response->headers_) {
429✔
1296
                response_data_.http_response_->base().set(header.first, header.second);
224✔
1297
        }
1298

1299
        response_data_.http_response_->result(response->GetStatusCode());
205✔
1300
        response_data_.http_response_->reason(response->GetStatusMessage());
410✔
1301

1302
        response_data_.http_response_serializer_ =
1303
                make_shared<http::response_serializer<http::buffer_body>>(*response_data_.http_response_);
410✔
1304
}
205✔
1305

1306
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
195✔
1307
        if (num_written > 0) {
195✔
1308
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
390✔
1309
        }
1310

1311
        if (ec) {
195✔
1312
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1313
                return;
34✔
1314
        }
1315

1316
        auto header = response_->GetHeader("Content-Length");
390✔
1317
        if (!header || header.value() == "0") {
195✔
1318
                FinishReply();
33✔
1319
                return;
1320
        }
1321

1322
        auto length = common::StringToLongLong(header.value());
162✔
1323
        if (!length || length.value() < 0) {
162✔
1324
                auto err = error::Error(
1325
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
1326
                CallErrorHandler(err, request_, reply_finished_handler_);
×
1327
                return;
1328
        }
1329

1330
        if (!response_->body_reader_ && !response_->async_body_reader_) {
162✔
1331
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
1332
                CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1333
                return;
1334
        }
1335

1336
        PrepareAndWriteNewBodyBuffer();
161✔
1337
}
1338

1339
void Stream::PrepareAndWriteNewBodyBuffer() {
1,883✔
1340
        // response_->body_reader_ XOR response_->async_body_reader_
1341
        assert(
1342
                (response_->body_reader_ || response_->async_body_reader_)
1343
                && !(response_->body_reader_ && response_->async_body_reader_));
1344

1345
        auto read_handler = [this](io::ExpectedSize read) {
1,884✔
1346
                if (!read) {
1,883✔
1347
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
3✔
1348
                        return;
1✔
1349
                }
1350
                WriteNewBodyBuffer(read.value());
1,882✔
1351
        };
1,883✔
1352

1353
        if (response_->body_reader_) {
1,883✔
1354
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
3,218✔
1355
        } else {
1356
                auto err = response_->async_body_reader_->AsyncRead(
1357
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
274✔
1358
                if (err != error::NoError) {
274✔
1359
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1360
                }
1361
        }
1362
}
1,883✔
1363

1364
void Stream::WriteNewBodyBuffer(size_t size) {
1,882✔
1365
        response_data_.http_response_->body().data = body_buffer_.data();
1,882✔
1366
        response_data_.http_response_->body().size = size;
1,882✔
1367

1368
        if (size > 0) {
1,882✔
1369
                response_data_.http_response_->body().more = true;
1,755✔
1370
        } else {
1371
                response_data_.http_response_->body().more = false;
127✔
1372
        }
1373

1374
        WriteBody();
1,882✔
1375
}
1,882✔
1376

1377
void Stream::WriteBody() {
3,616✔
1378
        auto &cancelled = cancelled_;
1379
        auto &response_data = response_data_;
3,616✔
1380

1381
        http::async_write_some(
7,232✔
1382
                socket_,
3,616✔
1383
                *response_data_.http_response_serializer_,
1384
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
3,580✔
1385
                        if (!*cancelled) {
3,580✔
1386
                                WriteBodyHandler(ec, num_written);
3,580✔
1387
                        }
1388
                });
3,580✔
1389
}
3,616✔
1390

1391
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
3,580✔
1392
        if (num_written > 0) {
3,580✔
1393
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
3,468✔
1394
        }
1395

1396
        if (ec == http::make_error_code(http::error::need_buffer)) {
3,580✔
1397
                // Write next body block.
1398
                PrepareAndWriteNewBodyBuffer();
1,722✔
1399
        } else if (ec) {
1,858✔
1400
                CallErrorHandler(ec, request_, reply_finished_handler_);
12✔
1401
        } else if (num_written > 0) {
1,854✔
1402
                // We are still writing the body.
1403
                WriteBody();
1,734✔
1404
        } else {
1405
                // We are finished.
1406
                FinishReply();
120✔
1407
        }
1408
}
3,580✔
1409

1410
void Stream::FinishReply() {
153✔
1411
        // We are done.
1412
        *cancelled_ = true;
153✔
1413
        cancelled_ = make_shared<bool>(true);
153✔
1414
        status_ = TransactionStatus::Done;
153✔
1415
        // Release ownership of Body reader.
1416
        response_->body_reader_.reset();
153✔
1417
        response_->async_body_reader_.reset();
153✔
1418
        reply_finished_handler_(error::NoError);
153✔
1419
        server_.RemoveStream(shared_from_this());
153✔
1420
}
153✔
1421

1422
error::Error Stream::AsyncSwitchProtocol(SwitchProtocolHandler handler) {
9✔
1423
        SetupResponse();
9✔
1424

1425
        switch_protocol_handler_ = handler;
9✔
1426
        status_ = TransactionStatus::SwitchingProtocol;
9✔
1427

1428
        auto &cancelled = cancelled_;
1429
        auto &response_data = response_data_;
9✔
1430

1431
        http::async_write_header(
18✔
1432
                socket_,
9✔
1433
                *response_data_.http_response_serializer_,
1434
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
9✔
1435
                        if (!*cancelled) {
9✔
1436
                                SwitchingProtocolHandler(ec, num_written);
8✔
1437
                        }
1438
                });
9✔
1439

1440
        return error::NoError;
9✔
1441
}
1442

1443
void Stream::SwitchingProtocolHandler(error_code ec, size_t num_written) {
8✔
1444
        if (num_written > 0) {
8✔
1445
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
16✔
1446
        }
1447

1448
        if (ec) {
8✔
1449
                CallErrorHandler(ec, request_, switch_protocol_handler_);
×
1450
                return;
×
1451
        }
1452

1453
        auto socket = make_shared<RawSocket<tcp::socket>>(
1454
                make_shared<tcp::socket>(std::move(socket_)), request_data_.request_buffer_);
8✔
1455

1456
        auto switch_protocol_handler = switch_protocol_handler_;
8✔
1457

1458
        // Rest of the connection is done directly on the socket, we are done here.
1459
        status_ = TransactionStatus::Done;
8✔
1460
        *cancelled_ = true;
8✔
1461
        cancelled_ = make_shared<bool>(true);
8✔
1462
        server_.RemoveStream(shared_from_this());
16✔
1463

1464
        switch_protocol_handler(socket);
16✔
1465
}
1466

1467
void Stream::CallBodyHandler() {
209✔
1468
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1469
        // it immediately destroys, which would destroy this stream as well. At the end of this
1470
        // function, it's ok to destroy it.
1471
        auto stream_ref = shared_from_this();
1472

1473
        server_.body_handler_(request_, error::NoError);
627✔
1474

1475
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1476
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1477
        // request has not been handled correctly.
1478
        auto response = maybe_response_.lock();
209✔
1479
        if (!response) {
209✔
1480
                logger_.Error("Handler produced no response. Closing stream prematurely.");
4✔
1481
                *cancelled_ = true;
2✔
1482
                cancelled_ = make_shared<bool>(true);
2✔
1483
                server_.RemoveStream(shared_from_this());
6✔
1484
        }
1485
}
209✔
1486

1487
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
218✔
1488
        event_loop_ {event_loop},
1489
        acceptor_(GetAsioIoContext(event_loop_)) {
390✔
1490
}
218✔
1491

1492
Server::~Server() {
436✔
1493
        Cancel();
218✔
1494
}
218✔
1495

1496
error::Error Server::AsyncServeUrl(
186✔
1497
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
1498
        return AsyncServeUrl(
1499
                url, header_handler, [body_handler](IncomingRequestPtr req, error::Error err) {
760✔
1500
                        if (err != error::NoError) {
206✔
1501
                                body_handler(expected::unexpected(err));
14✔
1502
                        } else {
1503
                                body_handler(req);
398✔
1504
                        }
1505
                });
578✔
1506
}
1507

1508
error::Error Server::AsyncServeUrl(
199✔
1509
        const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
1510
        auto err = BreakDownUrl(url, address_);
199✔
1511
        if (error::NoError != err) {
199✔
1512
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
×
1513
        }
1514

1515
        if (address_.protocol != "http") {
199✔
1516
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
×
1517
        }
1518

1519
        if (address_.path.size() > 0 && address_.path != "/") {
199✔
1520
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
2✔
1521
        }
1522

1523
        boost::system::error_code ec;
198✔
1524
        auto address = asio::ip::make_address(address_.host, ec);
198✔
1525
        if (ec) {
198✔
1526
                return error::Error(
1527
                        ec.default_error_condition(),
×
1528
                        "Could not construct endpoint from address " + address_.host);
×
1529
        }
1530

1531
        asio::ip::tcp::endpoint endpoint(address, address_.port);
198✔
1532

1533
        ec.clear();
1534
        acceptor_.open(endpoint.protocol(), ec);
198✔
1535
        if (ec) {
198✔
1536
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
×
1537
        }
1538

1539
        // Allow address reuse, otherwise we can't re-bind later.
1540
        ec.clear();
1541
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
198✔
1542
        if (ec) {
198✔
1543
                return error::Error(ec.default_error_condition(), "Could not set socket options");
×
1544
        }
1545

1546
        ec.clear();
1547
        acceptor_.bind(endpoint, ec);
198✔
1548
        if (ec) {
198✔
1549
                return error::Error(ec.default_error_condition(), "Could not bind socket");
×
1550
        }
1551

1552
        ec.clear();
1553
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
198✔
1554
        if (ec) {
198✔
1555
                return error::Error(ec.default_error_condition(), "Could not start listening");
×
1556
        }
1557

1558
        header_handler_ = header_handler;
198✔
1559
        body_handler_ = body_handler;
198✔
1560

1561
        PrepareNewStream();
198✔
1562

1563
        return error::NoError;
198✔
1564
}
1565

1566
void Server::Cancel() {
233✔
1567
        if (acceptor_.is_open()) {
233✔
1568
                acceptor_.cancel();
198✔
1569
                acceptor_.close();
198✔
1570
        }
1571
        streams_.clear();
1572
}
233✔
1573

1574
uint16_t Server::GetPort() const {
13✔
1575
        return acceptor_.local_endpoint().port();
13✔
1576
}
1577

1578
string Server::GetUrl() const {
12✔
1579
        return "http://127.0.0.1:" + to_string(GetPort());
24✔
1580
}
1581

1582
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
208✔
1583
        if (*req->cancelled_) {
208✔
1584
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
×
1585
        }
1586
        OutgoingResponsePtr response {new OutgoingResponse(req->stream_, req->cancelled_)};
416✔
1587
        req->stream_.maybe_response_ = response;
208✔
1588
        return response;
208✔
1589
}
1590

1591
error::Error Server::AsyncReply(
196✔
1592
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
1593
        if (*resp->cancelled_) {
196✔
1594
                return MakeError(StreamCancelledError, "Cannot send response");
×
1595
        }
1596

1597
        resp->stream_.AsyncReply(reply_finished_handler);
196✔
1598
        return error::NoError;
196✔
1599
}
1600

1601
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
60✔
1602
        if (*req->cancelled_) {
60✔
1603
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
×
1604
        }
1605

1606
        auto &stream = req->stream_;
60✔
1607
        if (stream.status_ != TransactionStatus::HeaderHandlerCalled) {
60✔
1608
                return expected::unexpected(error::Error(
1✔
1609
                        make_error_condition(errc::operation_in_progress),
2✔
1610
                        "MakeBodyAsyncReader called while reading is in progress"));
3✔
1611
        }
1612

1613
        if (stream.request_body_length_ == 0) {
59✔
1614
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
45✔
1615
        }
1616

1617
        stream.status_ = TransactionStatus::ReaderCreated;
44✔
1618
        return make_shared<BodyAsyncReader<Stream>>(stream, req->cancelled_);
88✔
1619
}
1620

1621
error::Error Server::AsyncSwitchProtocol(OutgoingResponsePtr resp, SwitchProtocolHandler handler) {
9✔
1622
        return resp->stream_.AsyncSwitchProtocol(handler);
18✔
1623
}
1624

1625
void Server::PrepareNewStream() {
416✔
1626
        StreamPtr new_stream {new Stream(*this)};
416✔
1627
        streams_.insert(new_stream);
1628
        AsyncAccept(new_stream);
832✔
1629
}
416✔
1630

1631
void Server::AsyncAccept(StreamPtr stream) {
416✔
1632
        acceptor_.async_accept(stream->socket_, [this, stream](const error_code &ec) {
637✔
1633
                if (ec) {
221✔
1634
                        log::Error("Could not accept connection: " + ec.message());
6✔
1635
                        return;
3✔
1636
                }
1637

1638
                stream->AcceptHandler(ec);
218✔
1639

1640
                this->PrepareNewStream();
218✔
1641
        });
1642
}
416✔
1643

1644
void Server::RemoveStream(StreamPtr stream) {
183✔
1645
        streams_.erase(stream);
183✔
1646

1647
        stream->DoCancel();
183✔
1648
}
183✔
1649

1650
} // namespace http
1651
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc