• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1025630817

04 Oct 2023 02:21PM UTC coverage: 80.272% (+0.2%) from 80.085%
1025630817

push

gitlab-ci

lluiscampos
feat: Implement state script retry-later functionality

Ticket: MEN-6677
Changelog: None

Signed-off-by: Lluis Campos <lluis.campos@northern.tech>

44 of 44 new or added lines in 2 files covered. (100.0%)

6482 of 8075 relevant lines covered (80.27%)

10723.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.98
/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio/ip/tcp.hpp>
20
#include <boost/asio/ssl/verify_mode.hpp>
21
#include <boost/asio.hpp>
22

23
#include <common/common.hpp>
24

25
namespace mender {
26
namespace http {
27

28
namespace common = mender::common;
29

30
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
31
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
32
const unsigned int BeastHttpVersion = 11;
33

34
namespace asio = boost::asio;
35
namespace http = boost::beast::http;
36

37
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
38

39
// Maps our own `Method` enumerator onto the corresponding Beast verb.
//
// `Method::Invalid` has no Beast counterpart: it falls out of the switch, trips the assert
// (when asserts are enabled) and otherwise yields `http::verb::get`.
static http::verb MethodToBeastVerb(Method method) {
        // Intentionally no "default" label, so that the compiler can warn about unhandled
        // enumerators if a method is ever added.
        switch (method) {
        case Method::GET: {
                return http::verb::get;
        }
        case Method::HEAD: {
                return http::verb::head;
        }
        case Method::POST: {
                return http::verb::post;
        }
        case Method::PUT: {
                return http::verb::put;
        }
        case Method::PATCH: {
                return http::verb::patch;
        }
        case Method::CONNECT: {
                return http::verb::connect;
        }
        case Method::Invalid: {
                // Nothing to map; handled after the switch.
                break;
        }
        }

        // Reaching this point means the caller passed a method we cannot translate.
        assert(false);
        return http::verb::get;
}
62

63
// Maps a Beast verb onto our own `Method` enumerator.
//
// Verbs other than GET, HEAD, POST, PUT, PATCH and CONNECT produce an
// `UnsupportedMethodError` whose message is `verb_string`.
static expected::expected<Method, error::Error> BeastVerbToMethod(
        http::verb verb, const string &verb_string) {
        switch (verb) {
        case http::verb::get: {
                return Method::GET;
        }
        case http::verb::head: {
                return Method::HEAD;
        }
        case http::verb::post: {
                return Method::POST;
        }
        case http::verb::put: {
                return Method::PUT;
        }
        case http::verb::patch: {
                return Method::PATCH;
        }
        case http::verb::connect: {
                return Method::CONNECT;
        }
        default: {
                // Beast knows many more verbs than we support.
                return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
        }
        }
}
82

83
template <typename StreamType>
84
class BodyAsyncReader : virtual public io::AsyncReader {
85
public:
86
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
145✔
87
                stream_ {stream},
88
                cancelled_ {cancelled} {
290✔
89
        }
145✔
90
        ~BodyAsyncReader() {
44✔
91
                Cancel();
44✔
92
        }
88✔
93

94
        error::Error AsyncRead(
2,046✔
95
                vector<uint8_t>::iterator start,
96
                vector<uint8_t>::iterator end,
97
                io::AsyncIoHandler handler) override {
98
                if (*cancelled_) {
2,046✔
99
                        return error::MakeError(
×
100
                                error::ProgrammingError,
101
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
102
                }
103
                stream_.AsyncReadNextBodyPart(start, end, handler);
2,046✔
104
                return error::NoError;
2,046✔
105
        }
106

107
        void Cancel() override {
46✔
108
                if (!*cancelled_) {
46✔
109
                        stream_.Cancel();
4✔
110
                }
111
        }
46✔
112

113
private:
114
        StreamType &stream_;
115
        shared_ptr<bool> cancelled_;
116

117
        friend class Client;
118
        friend class Server;
119
};
120

121
template <typename StreamType>
122
class RawSocket : virtual public io::AsyncReadWriter {
123
public:
124
        RawSocket(shared_ptr<StreamType> stream, shared_ptr<beast::flat_buffer> buffered) :
×
125
                destroying_ {make_shared<bool>(false)},
×
126
                stream_ {stream},
127
                buffered_ {buffered} {
×
128
                // If there are no buffered bytes, then we don't need it.
129
                if (buffered_ && buffered_->size() == 0) {
×
130
                        buffered_.reset();
×
131
                }
132
        }
×
133

134
        ~RawSocket() {
15✔
135
                *destroying_ = true;
15✔
136
                Cancel();
15✔
137
        }
30✔
138

139
        error::Error AsyncRead(
320✔
140
                vector<uint8_t>::iterator start,
141
                vector<uint8_t>::iterator end,
142
                io::AsyncIoHandler handler) override {
143
                // If we have prebuffered bytes, which can happen if the HTTP parser read the
144
                // header and parts of the body in one block, return those first.
145
                if (buffered_) {
320✔
146
                        return DrainPrebufferedData(start, end, handler);
8✔
147
                }
148

149
                read_buffer_ = asio::buffer(&*start, end - start);
316✔
150
                auto &destroying = destroying_;
151
                stream_->async_read_some(
632✔
152
                        read_buffer_,
316✔
153
                        [destroying, handler](const boost::system::error_code &ec, size_t num_read) {
313✔
154
                                if (*destroying) {
313✔
155
                                        return;
156
                                }
157

158
                                if (ec == asio::error::operation_aborted) {
313✔
159
                                        handler(expected::unexpected(error::Error(
12✔
160
                                                make_error_condition(errc::operation_canceled),
6✔
161
                                                "Could not read from socket")));
162
                                } else if (ec) {
310✔
163
                                        handler(expected::unexpected(
12✔
164
                                                error::Error(ec.default_error_condition(), "Could not read from socket")));
12✔
165
                                } else {
166
                                        handler(num_read);
608✔
167
                                }
168
                        });
169
                return error::NoError;
316✔
170
        }
171

172
        error::Error AsyncWrite(
309✔
173
                vector<uint8_t>::const_iterator start,
174
                vector<uint8_t>::const_iterator end,
175
                io::AsyncIoHandler handler) override {
176
                write_buffer_ = asio::buffer(&*start, end - start);
309✔
177
                auto &destroying = destroying_;
178
                stream_->async_write_some(
618✔
179
                        write_buffer_,
309✔
180
                        [destroying, handler](const boost::system::error_code &ec, size_t num_written) {
306✔
181
                                if (*destroying) {
306✔
182
                                        return;
183
                                }
184

185
                                if (ec == asio::error::operation_aborted) {
306✔
186
                                        handler(expected::unexpected(error::Error(
×
187
                                                make_error_condition(errc::operation_canceled),
×
188
                                                "Could not write to socket")));
189
                                } else if (ec) {
306✔
190
                                        handler(expected::unexpected(
×
191
                                                error::Error(ec.default_error_condition(), "Could not write to socket")));
×
192
                                } else {
193
                                        handler(num_written);
612✔
194
                                }
195
                        });
196
                return error::NoError;
309✔
197
        }
198

199
        void Cancel() override {
28✔
200
                if (stream_->lowest_layer().is_open()) {
28✔
201
                        stream_->lowest_layer().cancel();
15✔
202
                        stream_->lowest_layer().close();
15✔
203
                }
204
        }
28✔
205

206
private:
207
        error::Error DrainPrebufferedData(
4✔
208
                vector<uint8_t>::iterator start,
209
                vector<uint8_t>::iterator end,
210
                io::AsyncIoHandler handler) {
211
                size_t to_copy = min(static_cast<size_t>(end - start), buffered_->size());
8✔
212
                copy_n(static_cast<const uint8_t *>(buffered_->cdata().data()), to_copy, start);
4✔
213
                buffered_->consume(to_copy);
4✔
214
                if (buffered_->size() == 0) {
4✔
215
                        // We don't need it anymore.
216
                        buffered_.reset();
4✔
217
                }
218
                handler(to_copy);
4✔
219
                return error::NoError;
4✔
220
        }
221

222
        shared_ptr<bool> destroying_;
223
        shared_ptr<StreamType> stream_;
224
        shared_ptr<beast::flat_buffer> buffered_;
225
        asio::mutable_buffer read_buffer_;
226
        asio::const_buffer write_buffer_;
227
};
228

229
Client::Client(
396✔
230
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
231
        event_loop_ {event_loop},
232
        logger_name_ {logger_name},
233
        cancelled_ {make_shared<bool>(true)},
396✔
234
        resolver_(GetAsioIoContext(event_loop)),
235
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,392✔
236
        ssl_ctx_.set_verify_mode(ssl::verify_peer);
396✔
237

238
        if (client.client_cert_path != "" and client.client_cert_key_path != "") {
396✔
239
                ssl_ctx_.set_options(boost::asio::ssl::context::default_workarounds);
1✔
240
                ssl_ctx_.use_certificate_file(client.client_cert_path, boost::asio::ssl::context_base::pem);
1✔
241
                ssl_ctx_.use_private_key_file(
1✔
242
                        client.client_cert_key_path, boost::asio::ssl::context_base::pem);
243
        }
244

245
        beast::error_code ec {};
396✔
246
        ssl_ctx_.set_default_verify_paths(ec); // Load the default CAs
396✔
247
        if (ec) {
396✔
248
                log::Error("Failed to load the SSL default directory");
×
249
        }
250
        if (client.server_cert_path != "") {
396✔
251
                ssl_ctx_.load_verify_file(client.server_cert_path, ec);
4✔
252
                if (ec) {
4✔
253
                        log::Error("Failed to load the server certificate!");
×
254
                }
255
        }
256
}
396✔
257

258
// Cancels whatever is in flight. Destroying a client in the middle of a request is allowed,
// but it is logged as a warning.
Client::~Client() {
        const bool request_active = !*cancelled_;
        if (request_active) {
                logger_.Warning("Client destroyed while request is still active!");
        }
        DoCancel();
}
396✔
264

265
// Starts an asynchronous HTTP or HTTPS request.
//
// Validates the request and the handlers, stores them, and schedules the hostname lookup;
// the transaction then continues in `ResolveHandler`. Returns `error::NoError` once the
// lookup has been scheduled. Nothing is scheduled, and an error is returned, if:
//   - a previous call is still ongoing,
//   - the request has no protocol or host, or has a negative port,
//   - either handler is empty,
//   - the protocol is neither "http" nor "https".
error::Error Client::AsyncCall(
        OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
        if (!*cancelled_ && status_ != TransactionStatus::Done) {
                return error::Error(
                        make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
        }

        if (req->address_.protocol == "" || req->address_.host == "" || req->address_.port < 0) {
                return error::MakeError(error::ProgrammingError, "Request is not ready");
        }

        if (!header_handler || !body_handler) {
                return error::MakeError(
                        error::ProgrammingError, "header_handler and body_handler can not be nullptr");
        }

        if (req->address_.protocol != "http" && req->address_.protocol != "https") {
                return error::Error(
                        make_error_condition(errc::protocol_not_supported), req->address_.protocol);
        }

        // Decide on the transport for every call. Only ever switching the flag on would leave
        // a reused client in TLS mode for a later plain "http" request.
        is_https_ = (req->address_.protocol == "https");

        logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));

        // NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
        // request to route to our k8s cluster. Set this in all cases.
        req->SetHeader("HOST", req->address_.host);

        request_ = req;
        header_handler_ = header_handler;
        body_handler_ = body_handler;
        status_ = TransactionStatus::None;

        // Fresh flag for this transaction. Handlers of earlier transactions keep their own copy
        // of the old one.
        cancelled_ = make_shared<bool>(false);

        auto &cancelled = cancelled_;

        resolver_.async_resolve(
                request_->address_.host,
                to_string(request_->address_.port),
                [this, cancelled](
                        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
                        // `this` must not be touched once the transaction is cancelled.
                        if (!*cancelled) {
                                ResolveHandler(ec, results);
                        }
                });

        return error::NoError;
}
317

318
// Creates the reader which streams the body of `resp`.
//
// Only valid while the status is `HeaderHandlerCalled`; on success the status moves to
// `ReaderCreated`. Fails with `BodyMissingError` when the response body length is zero.
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
        const bool in_header_handler = (status_ == TransactionStatus::HeaderHandlerCalled);
        if (!in_header_handler) {
                return expected::unexpected(error::Error(
                        make_error_condition(errc::operation_in_progress),
                        "MakeBodyAsyncReader called while reading is in progress"));
        }

        const bool has_body = (response_body_length_ != 0);
        if (!has_body) {
                return expected::unexpected(
                        MakeError(BodyMissingError, "Response does not contain a body"));
        }

        status_ = TransactionStatus::ReaderCreated;

        // The reader shares the cancellation flag of the response it belongs to.
        return make_shared<BodyAsyncReader<Client>>(resp->client_.GetHttpClient(), resp->cancelled_);
}
333

334
// Hands the connection over to the caller as a raw socket.
//
// The current transaction is marked done and cancelled, and the client lets go of its
// stream. Bytes already sitting in the response buffer are passed along to the raw socket.
// Fails with `not_connected` if the cancelled flag is already set.
io::ExpectedAsyncReadWriterPtr Client::SwitchProtocol(IncomingResponsePtr req) {
        if (*cancelled_) {
                return expected::unexpected(error::Error(
                        make_error_condition(errc::not_connected),
                        "Cannot switch protocols if endpoint is not connected"));
        }

        // From here on, traffic goes directly over the socket, so the HTTP transaction is over.
        status_ = TransactionStatus::Done;
        // Silence handlers of the old transaction, then start over with a fresh flag.
        *cancelled_ = true;
        cancelled_ = make_shared<bool>(false);

        // Take the stream with us; the client no longer owns it.
        auto detached_stream = std::move(stream_);
        stream_.reset();

        if (!is_https_) {
                // Plain HTTP: only the TCP socket underneath is needed.
                return make_shared<RawSocket<tcp::socket>>(
                        make_shared<tcp::socket>(std::move(detached_stream->next_layer())),
                        response_data_.response_buffer_);
        }

        return make_shared<RawSocket<ssl::stream<tcp::socket>>>(
                detached_stream, response_data_.response_buffer_);
}
359

360
// Invokes `handler` with the current response.
//
// The handler is deliberately taken by value. A handler may own the client through a
// capture, and it may install a new handler by calling `AsyncCall`. That destroys the stored
// copy of the handler, and with it possibly the client. The copy in the argument list keeps
// the callable alive for the duration of the call.
void Client::CallHandler(ResponseHandler handler) {
        handler(response_);
}
284✔
368

369
void Client::CallErrorHandler(
24✔
370
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
371
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
72✔
372
}
24✔
373

374
void Client::CallErrorHandler(
151✔
375
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
376
        *cancelled_ = true;
151✔
377
        cancelled_ = make_shared<bool>(true);
151✔
378
        stream_.reset();
151✔
379
        status_ = TransactionStatus::Done;
151✔
380
        handler(expected::unexpected(
302✔
381
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
604✔
382
}
151✔
383

384
void Client::ResolveHandler(
235✔
385
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
386
        if (ec) {
235✔
387
                CallErrorHandler(ec, request_, header_handler_);
×
388
                return;
×
389
        }
390

391
        if (logger_.Level() >= log::LogLevel::Debug) {
235✔
392
                string ips = "[";
232✔
393
                string sep;
394
                for (auto r : results) {
954✔
395
                        ips += sep;
245✔
396
                        ips += r.endpoint().address().to_string();
245✔
397
                        sep = ", ";
245✔
398
                }
399
                ips += "]";
232✔
400
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
464✔
401
        }
402

403
        resolver_results_ = results;
404

405
        stream_ = make_shared<ssl::stream<tcp::socket>>(GetAsioIoContext(event_loop_), ssl_ctx_);
470✔
406

407
        if (!response_data_.response_buffer_) {
235✔
408
                // We can reuse this if preexisting.
409
                response_data_.response_buffer_ = make_shared<beast::flat_buffer>();
338✔
410

411
                // This is equivalent to:
412
                //   response_data_.response_buffer_.reserve(body_buffer_.size());
413
                // but compatible with Boost 1.67.
414
                response_data_.response_buffer_->prepare(
415
                        body_buffer_.size() - response_data_.response_buffer_->size());
169✔
416
        }
417

418
        response_data_.http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();
470✔
419

420
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
421
        // if they do, they should be handled higher up in the application logic.
422
        //
423
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
424
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
425
        // `has_value()` in their code, causing their subsequent comparison operation to
426
        // misbehave. So pass highest possible value instead.
427
        response_data_.http_response_parser_->body_limit(numeric_limits<uint64_t>::max());
428

429
        auto &cancelled = cancelled_;
430

431
        asio::async_connect(
235✔
432
                stream_->next_layer(),
433
                resolver_results_,
235✔
434
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
470✔
435
                        if (!*cancelled) {
235✔
436
                                if (is_https_) {
235✔
437
                                        return HandshakeHandler(ec, endpoint);
5✔
438
                                }
439
                                return ConnectHandler(ec, endpoint);
230✔
440
                        }
441
                });
442
}
443

444
// HTTPS only: called when the TCP connection attempt has completed. Starts the TLS
// handshake, and continues in `ConnectHandler` once it succeeds.
//
// Connection and handshake errors go to the header handler. A failure to set the SNI host
// name is only logged; the handshake is attempted regardless.
void Client::HandshakeHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
        if (ec) {
                CallErrorHandler(ec, request_, header_handler_);
                return;
        }

        // Many hosts will not complete the handshake unless the SNI host name is set.
        const bool sni_set =
                SSL_set_tlsext_host_name(stream_->native_handle(), request_->address_.host.c_str());
        if (!sni_set) {
                beast::error_code sni_ec {
                        static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
                logger_.Error("Failed to set SNI host name: " + sni_ec.message());
        }

        auto &cancelled = cancelled_;

        stream_->async_handshake(
                ssl::stream_base::client,
                [this, cancelled, endpoint](const error_code &handshake_ec) {
                        if (*cancelled) {
                                return;
                        }

                        if (!handshake_ec) {
                                logger_.Debug("https: Successful SSL handshake");
                                ConnectHandler(handshake_ec, endpoint);
                                return;
                        }

                        logger_.Error(
                                "https: Failed to perform the SSL handshake: " + handshake_ec.message());
                        CallErrorHandler(handshake_ec, request_, header_handler_);
                });
}
473

474

475
// Called once the connection (including the TLS handshake, for HTTPS) is established.
// Builds the Beast request from our request object and starts writing its header; the
// result is delivered to `WriteHeaderHandler`.
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
        if (ec) {
                CallErrorHandler(ec, request_, header_handler_);
                return;
        }

        logger_.Debug("Connected to " + endpoint.address().to_string());

        request_data_.http_request_ = make_shared<http::request<http::buffer_body>>(
                MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);

        for (const auto &name_and_value : request_->headers_) {
                request_data_.http_request_->set(name_and_value.first, name_and_value.second);
        }

        request_data_.http_request_serializer_ =
                make_shared<http::request_serializer<http::buffer_body>>(*request_data_.http_request_);

        auto &cancelled = cancelled_;
        auto &request_data = request_data_;

        // The copy of `request_data` in the capture list keeps the request and its serializer
        // referenced until the write has completed.
        auto on_header_written =
                [this, cancelled, request_data](const error_code &ec, size_t num_written) {
                        if (!*cancelled) {
                                WriteHeaderHandler(ec, num_written);
                        }
                };

        // HTTPS writes through the TLS stream, plain HTTP goes to the socket directly.
        if (is_https_) {
                http::async_write_header(
                        *stream_, *request_data_.http_request_serializer_, on_header_written);
        } else {
                http::async_write_header(
                        stream_->next_layer(),
                        *request_data_.http_request_serializer_,
                        on_header_written);
        }
}
516

517
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
222✔
518
        if (num_written > 0) {
222✔
519
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
444✔
520
        }
521

522
        if (ec) {
222✔
523
                CallErrorHandler(ec, request_, header_handler_);
×
524
                return;
177✔
525
        }
526

527
        auto header = request_->GetHeader("Content-Length");
444✔
528
        if (!header || header.value() == "0") {
222✔
529
                ReadHeader();
176✔
530
                return;
531
        }
532

533
        auto length = common::StringToLongLong(header.value());
46✔
534
        if (!length || length.value() < 0) {
46✔
535
                auto err = error::Error(
536
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
537
                CallErrorHandler(err, request_, header_handler_);
×
538
                return;
539
        }
540
        request_body_length_ = length.value();
46✔
541

542
        if (!request_->body_gen_ && !request_->async_body_gen_) {
46✔
543
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
544
                CallErrorHandler(err, request_, header_handler_);
2✔
545
                return;
546
        }
547

548
        assert(!(request_->body_gen_ && request_->async_body_gen_));
549

550
        if (request_->body_gen_) {
45✔
551
                auto body_reader = request_->body_gen_();
39✔
552
                if (!body_reader) {
39✔
553
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
554
                        return;
555
                }
556
                request_->body_reader_ = body_reader.value();
39✔
557
        } else {
558
                auto body_reader = request_->async_body_gen_();
6✔
559
                if (!body_reader) {
6✔
560
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
561
                        return;
562
                }
563
                request_->async_body_reader_ = body_reader.value();
6✔
564
        }
565

566
        PrepareAndWriteNewBodyBuffer();
45✔
567
}
568

569
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,142✔
570
        if (num_written > 0) {
2,142✔
571
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
2,100✔
572
        }
573

574
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,142✔
575
                // Write next block of the body.
576
                PrepareAndWriteNewBodyBuffer();
1,050✔
577
        } else if (ec) {
1,092✔
578
                CallErrorHandler(ec, request_, header_handler_);
8✔
579
        } else if (num_written > 0) {
1,088✔
580
                // We are still writing the body.
581
                WriteBody();
1,050✔
582
        } else {
583
                // We are ready to receive the response.
584
                ReadHeader();
38✔
585
        }
586
}
2,142✔
587

588
void Client::PrepareAndWriteNewBodyBuffer() {
1,095✔
589
        // request_->body_reader_ XOR request_->async_body_reader_
590
        assert(
591
                (request_->body_reader_ || request_->async_body_reader_)
592
                && !(request_->body_reader_ && request_->async_body_reader_));
593

594
        auto cancelled = cancelled_;
595
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
3,462✔
596
                if (!*cancelled) {
1,095✔
597
                        if (!read) {
1,094✔
598
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
599
                                return;
2✔
600
                        }
601
                        WriteNewBodyBuffer(read.value());
1,092✔
602
                }
603
        };
1,095✔
604

605

606
        if (request_->body_reader_) {
1,095✔
607
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
1,340✔
608
        } else {
609
                auto err = request_->async_body_reader_->AsyncRead(
610
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
850✔
611
                if (err != error::NoError) {
425✔
612
                        CallErrorHandler(err, request_, header_handler_);
×
613
                }
614
        }
615
}
1,095✔
616

617
void Client::WriteNewBodyBuffer(size_t size) {
1,092✔
618
        request_data_.http_request_->body().data = body_buffer_.data();
1,092✔
619
        request_data_.http_request_->body().size = size;
1,092✔
620

621
        if (size > 0) {
1,092✔
622
                request_data_.http_request_->body().more = true;
1,054✔
623
        } else {
624
                // Release ownership of Body reader.
625
                request_->body_reader_.reset();
38✔
626
                request_->async_body_reader_.reset();
38✔
627
                request_data_.http_request_->body().more = false;
38✔
628
        }
629

630
        WriteBody();
1,092✔
631
}
1,092✔
632

633
void Client::WriteBody() {
2,142✔
634
        auto &cancelled = cancelled_;
635
        auto &request_data = request_data_;
2,142✔
636

637
        if (is_https_) {
2,142✔
638
                http::async_write_some(
×
639
                        *stream_,
640
                        *request_data_.http_request_serializer_,
641
                        [this, cancelled, request_data](const error_code &ec, size_t num_written) {
×
642
                                if (!*cancelled) {
×
643
                                        WriteBodyHandler(ec, num_written);
×
644
                                }
645
                        });
×
646
        } else {
647
                http::async_write_some(
4,284✔
648
                        stream_->next_layer(),
649
                        *request_data_.http_request_serializer_,
650
                        [this, cancelled, request_data](const error_code &ec, size_t num_written) {
2,142✔
651
                                if (!*cancelled) {
2,142✔
652
                                        WriteBodyHandler(ec, num_written);
2,142✔
653
                                }
654
                        });
2,142✔
655
        }
656
}
2,142✔
657

658
void Client::ReadHeader() {
214✔
659
        auto &cancelled = cancelled_;
660
        auto &response_data = response_data_;
214✔
661

662
        if (is_https_) {
214✔
663
                http::async_read_some(
4✔
664
                        *stream_,
665
                        *response_data_.response_buffer_,
666
                        *response_data_.http_response_parser_,
667
                        [this, cancelled, response_data](const error_code &ec, size_t num_read) {
4✔
668
                                if (!*cancelled) {
4✔
669
                                        ReadHeaderHandler(ec, num_read);
4✔
670
                                }
671
                        });
4✔
672
        } else {
673
                http::async_read_some(
210✔
674
                        stream_->next_layer(),
675
                        *response_data_.response_buffer_,
676
                        *response_data_.http_response_parser_,
677
                        [this, cancelled, response_data](const error_code &ec, size_t num_read) {
209✔
678
                                if (!*cancelled) {
209✔
679
                                        ReadHeaderHandler(ec, num_read);
209✔
680
                                }
681
                        });
209✔
682
        }
683
}
214✔
684

685
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
213✔
686
        if (num_read > 0) {
213✔
687
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
416✔
688
        }
689

690
        if (ec) {
213✔
691
                CallErrorHandler(ec, request_, header_handler_);
5✔
692
                return;
67✔
693
        }
694

695
        if (!response_data_.http_response_parser_->is_header_done()) {
208✔
696
                ReadHeader();
×
697
                return;
×
698
        }
699

700
        response_.reset(new IncomingResponse(*this, cancelled_));
416✔
701
        response_->status_code_ = response_data_.http_response_parser_->get().result_int();
208✔
702
        response_->status_message_ = string {response_data_.http_response_parser_->get().reason()};
208✔
703

704
        logger_.Debug(
416✔
705
                "Received response: " + to_string(response_->status_code_) + " "
416✔
706
                + response_->status_message_);
624✔
707

708
        string debug_str;
709
        for (auto header = response_data_.http_response_parser_->get().cbegin();
238✔
710
                 header != response_data_.http_response_parser_->get().cend();
446✔
711
                 header++) {
712
                response_->headers_[string {header->name_string()}] = string {header->value()};
714✔
713
                if (logger_.Level() >= log::LogLevel::Debug) {
238✔
714
                        debug_str += string {header->name_string()};
235✔
715
                        debug_str += ": ";
235✔
716
                        debug_str += string {header->value()};
235✔
717
                        debug_str += "\n";
235✔
718
                }
719
        }
720

721
        logger_.Debug("Received headers:\n" + debug_str);
416✔
722
        debug_str.clear();
723

724
        if (response_data_.http_response_parser_->chunked()) {
208✔
725
                auto cancelled = cancelled_;
726
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
727
                CallHandler(header_handler_);
2✔
728
                if (!*cancelled) {
1✔
729
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
730
                        CallErrorHandler(err, request_, body_handler_);
2✔
731
                }
732
                return;
733
        }
734

735
        auto content_length = response_data_.http_response_parser_->content_length();
207✔
736
        if (content_length) {
207✔
737
                response_body_length_ = content_length.value();
179✔
738
        } else {
739
                response_body_length_ = 0;
28✔
740
        }
741
        response_body_read_ = 0;
207✔
742

743
        if (response_body_read_ >= response_body_length_) {
207✔
744
                auto cancelled = cancelled_;
745
                status_ = TransactionStatus::HeaderHandlerCalled;
42✔
746
                CallHandler(header_handler_);
84✔
747
                if (!*cancelled) {
42✔
748
                        status_ = TransactionStatus::Done;
36✔
749
                        CallHandler(body_handler_);
72✔
750

751
                        // After body handler has run, set the request to cancelled. The body
752
                        // handler may have made a new request, so this is not necessarily the same
753
                        // request as is currently active (note use of shared_ptr copy, not
754
                        // `cancelled_`).
755
                        *cancelled = true;
36✔
756
                }
757
                return;
758
        }
759

760
        auto cancelled = cancelled_;
761
        status_ = TransactionStatus::HeaderHandlerCalled;
165✔
762
        CallHandler(header_handler_);
330✔
763
        if (*cancelled) {
165✔
764
                return;
765
        }
766

767
        // We know that a body reader is required here, because of the `response_body_read_ >=
768
        // response_body_length_` check above.
769
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
146✔
770
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
4✔
771
        }
772
}
773

774
// Requests the next part of the response body to be read into `[start, end)`. `handler` is
// called with the number of bytes delivered, or with an error.
//
// If the body has already been read completely, `handler` is called immediately with zero, and
// the first such call also triggers the body handler and finishes the transaction.
void Client::AsyncReadNextBodyPart(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));

        if (status_ == TransactionStatus::ReaderCreated) {
                status_ = TransactionStatus::BodyReadingInProgress;
        }

        if (AtLeast(status_, TransactionStatus::BodyReadingFinished)) {
                auto cancelled = cancelled_;
                handler(0);
                // The flag is tested first so that `status_` is not read after a cancellation.
                if (*cancelled || status_ != TransactionStatus::BodyReadingFinished) {
                        return;
                }

                status_ = TransactionStatus::Done;
                CallHandler(body_handler_);

                // Mark the finished request as cancelled. The body handler may already have
                // started a new request, which is why the local copy is used and not
                // `cancelled_`.
                *cancelled = true;
                return;
        }

        // Remember where the data should go; `ReadBodyHandler` picks these up.
        reader_buf_start_ = start;
        reader_buf_end_ = end;
        reader_handler_ = handler;

        // Never read more than fits in both our internal buffer and the caller's range.
        const size_t wanted = end - start;
        auto &body = response_data_.http_response_parser_->get().body();
        body.data = body_buffer_.data();
        body.size = min(body_buffer_.size(), wanted);

        auto &cancel_flag = cancelled_;
        auto &pinned_data = response_data_;

        // Captured by copy, see `ReadHeader`.
        auto on_read = [this, cancel_flag, pinned_data](const error_code &ec, size_t num_read) {
                if (*cancel_flag) {
                        return;
                }
                ReadBodyHandler(ec, num_read);
        };

        auto &buffer = *response_data_.response_buffer_;
        auto &parser = *response_data_.http_response_parser_;

        if (is_https_) {
                http::async_read_some(*stream_, buffer, parser, on_read);
        } else {
                http::async_read_some(stream_->next_layer(), buffer, parser, on_read);
        }
}
832

833
// Completion handler for the read started in `AsyncReadNextBodyPart`. Copies the received data
// into the reader's range and reports the outcome to the reader's handler. On error the body
// handler is notified as well, unless the request was cancelled by the reader's handler.
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
        if (num_read > 0) {
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
                response_body_read_ += num_read;
        }

        if (ec == http::make_error_code(http::error::need_buffer)) {
                // Not a real error for us, since the buffer is set up again before every read.
                ec.clear();
        }

        assert(reader_handler_);

        if (response_body_read_ >= response_body_length_) {
                status_ = TransactionStatus::BodyReadingFinished;
        }

        // Take a copy of the flag before calling out, so it can be inspected afterwards.
        auto cancelled = cancelled_;

        const size_t room = reader_buf_end_ - reader_buf_start_;
        const size_t delivered = min(num_read, room);
        copy(body_buffer_.begin(), body_buffer_.begin() + delivered, reader_buf_start_);

        if (!ec) {
                reader_handler_(delivered);
                return;
        }

        auto err = error::Error(ec.default_error_condition(), "Could not read body");
        reader_handler_(expected::unexpected(err));

        if (!*cancelled) {
                CallErrorHandler(ec, request_, body_handler_);
        }
}
867

868
void Client::Cancel() {
192✔
869
        auto cancelled = cancelled_;
870

871
        if (!*cancelled) {
192✔
872
                auto err =
873
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
246✔
874
                switch (status_) {
123✔
875
                case TransactionStatus::None:
1✔
876
                        CallErrorHandler(err, request_, header_handler_);
1✔
877
                        break;
1✔
878
                case TransactionStatus::HeaderHandlerCalled:
120✔
879
                case TransactionStatus::ReaderCreated:
880
                case TransactionStatus::BodyReadingInProgress:
881
                case TransactionStatus::BodyReadingFinished:
882
                        CallErrorHandler(err, request_, body_handler_);
120✔
883
                        break;
120✔
884
                case TransactionStatus::Replying:
885
                case TransactionStatus::SwitchingProtocol:
886
                        // Not used by client.
887
                        assert(false);
888
                        break;
889
                case TransactionStatus::BodyHandlerCalled:
890
                case TransactionStatus::Done:
891
                        break;
892
                }
893
        }
894

895
        if (!*cancelled) {
192✔
896
                DoCancel();
2✔
897
        }
898
}
192✔
899

900
void Client::DoCancel() {
398✔
901
        resolver_.cancel();
398✔
902
        if (stream_) {
398✔
903
                stream_->next_layer().cancel();
65✔
904
                stream_->next_layer().close();
65✔
905
                stream_.reset();
65✔
906
        }
907

908
        request_.reset();
398✔
909
        response_.reset();
398✔
910

911
        // Reset logger to no connection.
912
        logger_ = log::Logger(logger_name_);
398✔
913

914
        // Set cancel state and then make a new one. Those who are interested should have their own
915
        // pointer to the old one.
916
        *cancelled_ = true;
398✔
917
        cancelled_ = make_shared<bool>(true);
398✔
918
}
398✔
919

920
// Default configuration: delegates to the path based constructor with an empty server
// certificate path.
ClientConfig::ClientConfig() : ClientConfig("") {
}
91✔
923

924
ClientConfig::ClientConfig(
377✔
925
        const string &server_cert_path,
926
        const string &client_cert_path,
927
        const string &client_cert_key_path) :
377✔
928
        server_cert_path {server_cert_path},
929
        client_cert_path {client_cert_path},
930
        client_cert_key_path {client_cert_key_path} {};
377✔
931

932
// Nothing to clean up beyond what the members do themselves.
ClientConfig::~ClientConfig() = default;
377✔
934

935
// No explicit initialization is performed.
ServerConfig::ServerConfig() = default;
184✔
937

938
// Nothing to clean up beyond what the members do themselves.
ServerConfig::~ServerConfig() = default;
184✔
940

941
Stream::Stream(Server &server) :
414✔
942
        server_ {server},
943
        logger_ {"http"},
944
        cancelled_(make_shared<bool>(true)),
414✔
945
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
414✔
946
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,242✔
947
        request_data_.request_buffer_ = make_shared<beast::flat_buffer>();
828✔
948

949
        // This is equivalent to:
950
        //   request_data_.request_buffer_.reserve(body_buffer_.size());
951
        // but compatible with Boost 1.67.
952
        request_data_.request_buffer_->prepare(
953
                body_buffer_.size() - request_data_.request_buffer_->size());
414✔
954

955
        request_data_.http_request_parser_ = make_shared<http::request_parser<http::buffer_body>>();
828✔
956

957
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
958
        // they do, they should be handled higher up in the application logic.
959
        //
960
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
961
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
962
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
963
        // possible value instead.
964
        request_data_.http_request_parser_->body_limit(numeric_limits<uint64_t>::max());
965
}
414✔
966

967
// Closes the socket and flags the stream as cancelled, without calling any handler.
Stream::~Stream() {
        DoCancel();
}
414✔
970

971
void Stream::Cancel() {
7✔
972
        auto cancelled = cancelled_;
973

974
        if (!*cancelled) {
7✔
975
                auto err =
976
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
977
                switch (status_) {
7✔
978
                case TransactionStatus::None:
×
979
                        CallErrorHandler(err, request_, server_.header_handler_);
×
980
                        break;
×
981
                case TransactionStatus::HeaderHandlerCalled:
5✔
982
                case TransactionStatus::ReaderCreated:
983
                case TransactionStatus::BodyReadingInProgress:
984
                case TransactionStatus::BodyReadingFinished:
985
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
986
                        break;
5✔
987
                case TransactionStatus::BodyHandlerCalled:
×
988
                        // In between body handler and reply finished. No one to handle the status
989
                        // here.
990
                        server_.RemoveStream(shared_from_this());
×
991
                        break;
×
992
                case TransactionStatus::Replying:
1✔
993
                        CallErrorHandler(err, request_, reply_finished_handler_);
3✔
994
                        break;
1✔
995
                case TransactionStatus::SwitchingProtocol:
1✔
996
                        CallErrorHandler(err, request_, switch_protocol_handler_);
3✔
997
                        break;
1✔
998
                case TransactionStatus::Done:
999
                        break;
1000
                }
1001
        }
1002

1003
        if (!*cancelled) {
7✔
1004
                DoCancel();
×
1005
        }
1006
}
7✔
1007

1008
void Stream::DoCancel() {
596✔
1009
        if (socket_.is_open()) {
596✔
1010
                socket_.cancel();
210✔
1011
                socket_.close();
210✔
1012
        }
1013

1014
        // Set cancel state and then make a new one. Those who are interested should have their own
1015
        // pointer to the old one.
1016
        *cancelled_ = true;
596✔
1017
        cancelled_ = make_shared<bool>(true);
596✔
1018
}
596✔
1019

1020
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
1021
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1022
}
×
1023

1024
void Stream::CallErrorHandler(
×
1025
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
1026
        *cancelled_ = true;
×
1027
        cancelled_ = make_shared<bool>(true);
×
1028
        status_ = TransactionStatus::Done;
×
1029
        handler(expected::unexpected(err.WithContext(
×
1030
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
1031

1032
        server_.RemoveStream(shared_from_this());
×
1033
}
×
1034

1035
void Stream::CallErrorHandler(
2✔
1036
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1037
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
6✔
1038
}
2✔
1039

1040
void Stream::CallErrorHandler(
9✔
1041
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1042
        *cancelled_ = true;
9✔
1043
        cancelled_ = make_shared<bool>(true);
9✔
1044
        status_ = TransactionStatus::Done;
9✔
1045
        handler(
9✔
1046
                req,
1047
                err.WithContext(
9✔
1048
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
27✔
1049

1050
        server_.RemoveStream(shared_from_this());
9✔
1051
}
9✔
1052

1053
void Stream::CallErrorHandler(
4✔
1054
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
1055
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
1056
}
4✔
1057

1058
void Stream::CallErrorHandler(
7✔
1059
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
1060
        *cancelled_ = true;
7✔
1061
        cancelled_ = make_shared<bool>(true);
7✔
1062
        status_ = TransactionStatus::Done;
7✔
1063
        handler(err.WithContext(
14✔
1064
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
14✔
1065

1066
        server_.RemoveStream(shared_from_this());
7✔
1067
}
7✔
1068

1069
void Stream::CallErrorHandler(
×
1070
        const error_code &ec, const RequestPtr &req, SwitchProtocolHandler handler) {
1071
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1072
}
×
1073

1074
void Stream::CallErrorHandler(
1✔
1075
        const error::Error &err, const RequestPtr &req, SwitchProtocolHandler handler) {
1076
        *cancelled_ = true;
1✔
1077
        cancelled_ = make_shared<bool>(true);
1✔
1078
        status_ = TransactionStatus::Done;
1✔
1079
        handler(expected::unexpected(err.WithContext(
2✔
1080
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
4✔
1081

1082
        server_.RemoveStream(shared_from_this());
1✔
1083
}
1✔
1084

1085
void Stream::AcceptHandler(const error_code &ec) {
218✔
1086
        if (ec) {
218✔
1087
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
1088
                return;
×
1089
        }
1090

1091
        auto ip = socket_.remote_endpoint().address().to_string();
436✔
1092

1093
        // Use IP as context for logging.
1094
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
218✔
1095

1096
        logger_.Debug("Accepted connection.");
436✔
1097

1098
        request_.reset(new IncomingRequest(*this, cancelled_));
436✔
1099

1100
        request_->address_.host = ip;
218✔
1101

1102
        *cancelled_ = false;
218✔
1103

1104
        ReadHeader();
218✔
1105
}
1106

1107
void Stream::ReadHeader() {
218✔
1108
        auto &cancelled = cancelled_;
1109
        auto &request_data = request_data_;
218✔
1110

1111
        http::async_read_some(
436✔
1112
                socket_,
218✔
1113
                *request_data_.request_buffer_,
1114
                *request_data_.http_request_parser_,
1115
                [this, cancelled, request_data](const error_code &ec, size_t num_read) {
218✔
1116
                        if (!*cancelled) {
218✔
1117
                                ReadHeaderHandler(ec, num_read);
218✔
1118
                        }
1119
                });
218✔
1120
}
218✔
1121

1122
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
218✔
1123
        if (num_read > 0) {
218✔
1124
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
436✔
1125
        }
1126

1127
        if (ec) {
218✔
1128
                CallErrorHandler(ec, request_, server_.header_handler_);
×
1129
                return;
173✔
1130
        }
1131

1132
        if (!request_data_.http_request_parser_->is_header_done()) {
218✔
1133
                ReadHeader();
×
1134
                return;
×
1135
        }
1136

1137
        auto method_result = BeastVerbToMethod(
1138
                request_data_.http_request_parser_->get().base().method(),
1139
                string {request_data_.http_request_parser_->get().base().method_string()});
436✔
1140
        if (!method_result) {
218✔
1141
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
1142
                return;
×
1143
        }
1144
        request_->method_ = method_result.value();
218✔
1145
        request_->address_.path = string(request_data_.http_request_parser_->get().base().target());
218✔
1146

1147
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
218✔
1148

1149
        string debug_str;
1150
        for (auto header = request_data_.http_request_parser_->get().cbegin();
408✔
1151
                 header != request_data_.http_request_parser_->get().cend();
626✔
1152
                 header++) {
1153
                request_->headers_[string {header->name_string()}] = string {header->value()};
1,224✔
1154
                if (logger_.Level() >= log::LogLevel::Debug) {
408✔
1155
                        debug_str += string {header->name_string()};
395✔
1156
                        debug_str += ": ";
395✔
1157
                        debug_str += string {header->value()};
395✔
1158
                        debug_str += "\n";
395✔
1159
                }
1160
        }
1161

1162
        logger_.Debug("Received headers:\n" + debug_str);
436✔
1163
        debug_str.clear();
1164

1165
        if (request_data_.http_request_parser_->chunked()) {
218✔
1166
                auto cancelled = cancelled_;
1167
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
1168
                server_.header_handler_(request_);
2✔
1169
                if (!*cancelled) {
1✔
1170
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
1171
                        CallErrorHandler(err, request_, server_.body_handler_);
2✔
1172
                }
1173
                return;
1174
        }
1175

1176
        auto content_length = request_data_.http_request_parser_->content_length();
217✔
1177
        if (content_length) {
217✔
1178
                request_body_length_ = content_length.value();
46✔
1179
        } else {
1180
                request_body_length_ = 0;
171✔
1181
        }
1182
        request_body_read_ = 0;
217✔
1183

1184
        if (request_body_read_ >= request_body_length_) {
217✔
1185
                auto cancelled = cancelled_;
1186
                status_ = TransactionStatus::HeaderHandlerCalled;
171✔
1187
                server_.header_handler_(request_);
342✔
1188
                if (!*cancelled) {
171✔
1189
                        status_ = TransactionStatus::BodyHandlerCalled;
171✔
1190
                        CallBodyHandler();
171✔
1191
                }
1192
                return;
1193
        }
1194

1195
        auto cancelled = cancelled_;
1196
        status_ = TransactionStatus::HeaderHandlerCalled;
46✔
1197
        server_.header_handler_(request_);
92✔
1198
        if (*cancelled) {
46✔
1199
                return;
1200
        }
1201

1202
        // We know that a body reader is required here, because of the `request_body_read_ >=
1203
        // request_body_length_` check above.
1204
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
45✔
1205
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
2✔
1206
        }
1207
}
1208

1209
// Requests the next part of the request body to be read into `[start, end)`. `handler` is
// called with the number of bytes delivered, or with an error.
//
// If body reading is not in progress, `handler` is called immediately with zero; if the body
// has just been read completely, the body handler is called as well.
void Stream::AsyncReadNextBodyPart(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));

        if (status_ == TransactionStatus::ReaderCreated) {
                status_ = TransactionStatus::BodyReadingInProgress;
        }

        if (status_ != TransactionStatus::BodyReadingInProgress) {
                auto cancelled = cancelled_;
                handler(0);
                // The flag is tested first so that `status_` is not read after a cancellation.
                if (*cancelled || status_ != TransactionStatus::BodyReadingFinished) {
                        return;
                }
                status_ = TransactionStatus::BodyHandlerCalled;
                CallBodyHandler();
                return;
        }

        // Remember where the data should go; `ReadBodyHandler` picks these up.
        reader_buf_start_ = start;
        reader_buf_end_ = end;
        reader_handler_ = handler;

        // Never read more than fits in both our internal buffer and the caller's range.
        const size_t wanted = end - start;
        auto &body = request_data_.http_request_parser_->get().body();
        body.data = body_buffer_.data();
        body.size = min(body_buffer_.size(), wanted);

        auto &cancel_flag = cancelled_;
        auto &pinned_data = request_data_;

        // Captured by copy, so that the flag can be checked without looking at `this`.
        auto on_read = [this, cancel_flag, pinned_data](const error_code &ec, size_t num_read) {
                if (*cancel_flag) {
                        return;
                }
                ReadBodyHandler(ec, num_read);
        };

        http::async_read_some(
                socket_, *request_data_.request_buffer_, *request_data_.http_request_parser_, on_read);
}
1249

1250
// Completion handler for the read started in `AsyncReadNextBodyPart`. Copies the received data
// into the reader's range and reports the outcome to the reader's handler. On error the
// server's body handler is notified as well, unless the stream was cancelled by the reader's
// handler.
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
        if (num_read > 0) {
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
                request_body_read_ += num_read;
        }

        if (ec == http::make_error_code(http::error::need_buffer)) {
                // Not a real error for us, since the buffer is set up again before every read.
                ec.clear();
        }

        assert(reader_handler_);

        if (request_body_read_ >= request_body_length_) {
                status_ = TransactionStatus::BodyReadingFinished;
        }

        // Take a copy of the flag before calling out, so it can be inspected afterwards.
        auto cancelled = cancelled_;

        const size_t room = reader_buf_end_ - reader_buf_start_;
        const size_t delivered = min(num_read, room);
        copy(body_buffer_.begin(), body_buffer_.begin() + delivered, reader_buf_start_);

        if (!ec) {
                reader_handler_(delivered);
                return;
        }

        auto err = error::Error(ec.default_error_condition(), "Could not read body");
        reader_handler_(expected::unexpected(err));

        if (!*cancelled) {
                CallErrorHandler(ec, request_, server_.body_handler_);
        }
}
1284

1285
// Starts sending the reply: prepares the Beast response, stores `reply_finished_handler` for
// later, and begins writing the response header. `WriteHeaderHandler` continues from there,
// unless the stream has been cancelled in the meantime.
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
        SetupResponse();

        reply_finished_handler_ = reply_finished_handler;

        auto &cancel_flag = cancelled_;
        auto &pinned_data = response_data_;

        // Captured by copy, so that the flag can be checked without looking at `this`, and so
        // that the handler shares ownership of the response and its serializer.
        auto on_written = [this, cancel_flag, pinned_data](const error_code &ec, size_t num_written) {
                if (*cancel_flag) {
                        return;
                }
                WriteHeaderHandler(ec, num_written);
        };

        http::async_write_header(socket_, *response_data_.http_response_serializer_, on_written);
}
196✔
1302

1303
void Stream::SetupResponse() {
205✔
1304
        auto response = maybe_response_.lock();
205✔
1305
        // Only called from existing responses, so this should always be true.
1306
        assert(response);
1307

1308
        assert(status_ == TransactionStatus::BodyHandlerCalled);
1309
        status_ = TransactionStatus::Replying;
205✔
1310

1311
        // From here on we take shared ownership.
1312
        response_ = response;
1313

1314
        response_data_.http_response_ = make_shared<http::response<http::buffer_body>>();
410✔
1315

1316
        for (const auto &header : response->headers_) {
429✔
1317
                response_data_.http_response_->base().set(header.first, header.second);
224✔
1318
        }
1319

1320
        response_data_.http_response_->result(response->GetStatusCode());
205✔
1321
        response_data_.http_response_->reason(response->GetStatusMessage());
410✔
1322

1323
        response_data_.http_response_serializer_ =
1324
                make_shared<http::response_serializer<http::buffer_body>>(*response_data_.http_response_);
410✔
1325
}
205✔
1326

1327
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
195✔
1328
        if (num_written > 0) {
195✔
1329
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
390✔
1330
        }
1331

1332
        if (ec) {
195✔
1333
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1334
                return;
34✔
1335
        }
1336

1337
        auto header = response_->GetHeader("Content-Length");
390✔
1338
        if (!header || header.value() == "0") {
195✔
1339
                FinishReply();
33✔
1340
                return;
1341
        }
1342

1343
        auto length = common::StringToLongLong(header.value());
162✔
1344
        if (!length || length.value() < 0) {
162✔
1345
                auto err = error::Error(
1346
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
1347
                CallErrorHandler(err, request_, reply_finished_handler_);
×
1348
                return;
1349
        }
1350

1351
        if (!response_->body_reader_ && !response_->async_body_reader_) {
162✔
1352
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
1353
                CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1354
                return;
1355
        }
1356

1357
        PrepareAndWriteNewBodyBuffer();
161✔
1358
}
1359

1360
void Stream::PrepareAndWriteNewBodyBuffer() {
1,883✔
1361
        // response_->body_reader_ XOR response_->async_body_reader_
1362
        assert(
1363
                (response_->body_reader_ || response_->async_body_reader_)
1364
                && !(response_->body_reader_ && response_->async_body_reader_));
1365

1366
        auto read_handler = [this](io::ExpectedSize read) {
1,884✔
1367
                if (!read) {
1,883✔
1368
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
3✔
1369
                        return;
1✔
1370
                }
1371
                WriteNewBodyBuffer(read.value());
1,882✔
1372
        };
1,883✔
1373

1374
        if (response_->body_reader_) {
1,883✔
1375
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
3,218✔
1376
        } else {
1377
                auto err = response_->async_body_reader_->AsyncRead(
1378
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
274✔
1379
                if (err != error::NoError) {
274✔
1380
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1381
                }
1382
        }
1383
}
1,883✔
1384

1385
void Stream::WriteNewBodyBuffer(size_t size) {
1,882✔
1386
        response_data_.http_response_->body().data = body_buffer_.data();
1,882✔
1387
        response_data_.http_response_->body().size = size;
1,882✔
1388

1389
        if (size > 0) {
1,882✔
1390
                response_data_.http_response_->body().more = true;
1,756✔
1391
        } else {
1392
                response_data_.http_response_->body().more = false;
126✔
1393
        }
1394

1395
        WriteBody();
1,882✔
1396
}
1,882✔
1397

1398
void Stream::WriteBody() {
3,617✔
1399
        auto &cancelled = cancelled_;
1400
        auto &response_data = response_data_;
3,617✔
1401

1402
        http::async_write_some(
7,234✔
1403
                socket_,
3,617✔
1404
                *response_data_.http_response_serializer_,
1405
                [this, cancelled, response_data](const error_code &ec, size_t num_written) {
3,580✔
1406
                        if (!*cancelled) {
3,580✔
1407
                                WriteBodyHandler(ec, num_written);
3,580✔
1408
                        }
1409
                });
3,580✔
1410
}
3,617✔
1411

1412
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
3,580✔
1413
        if (num_written > 0) {
3,580✔
1414
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
3,470✔
1415
        }
1416

1417
        if (ec == http::make_error_code(http::error::need_buffer)) {
3,580✔
1418
                // Write next body block.
1419
                PrepareAndWriteNewBodyBuffer();
1,722✔
1420
        } else if (ec) {
1,858✔
1421
                CallErrorHandler(ec, request_, reply_finished_handler_);
12✔
1422
        } else if (num_written > 0) {
1,854✔
1423
                // We are still writing the body.
1424
                WriteBody();
1,735✔
1425
        } else {
1426
                // We are finished.
1427
                FinishReply();
119✔
1428
        }
1429
}
3,580✔
1430

1431
void Stream::FinishReply() {
152✔
1432
        // We are done.
1433
        *cancelled_ = true;
152✔
1434
        cancelled_ = make_shared<bool>(true);
152✔
1435
        status_ = TransactionStatus::Done;
152✔
1436
        // Release ownership of Body reader.
1437
        response_->body_reader_.reset();
152✔
1438
        response_->async_body_reader_.reset();
152✔
1439
        reply_finished_handler_(error::NoError);
152✔
1440
        server_.RemoveStream(shared_from_this());
152✔
1441
}
152✔
1442

1443
// Sends only the response header and, once it has been written, hands control
// to SwitchingProtocolHandler(), which passes the raw socket on to `handler`.
// Always returns NoError; write failures are reported asynchronously.
error::Error Stream::AsyncSwitchProtocol(SwitchProtocolHandler handler) {
        SetupResponse();

        status_ = TransactionStatus::SwitchingProtocol;
        switch_protocol_handler_ = handler;

        // Both are captured by copy below, so the completion handler holds its own
        // copy of the cancellation flag and of the response data.
        auto &cancel_flag = cancelled_;
        auto &pending_response_data = response_data_;

        auto on_header_written = [this, cancel_flag, pending_response_data](
                                         const error_code &ec, size_t num_written) {
                if (*cancel_flag) {
                        return;
                }
                SwitchingProtocolHandler(ec, num_written);
        };

        http::async_write_header(
                socket_, *response_data_.http_response_serializer_, on_header_written);

        return error::NoError;
}
1463

1464
// Completion handler for the header write started by AsyncSwitchProtocol().
// On error the switch-protocol handler is notified through CallErrorHandler().
// On success the TCP socket is moved out of this stream, wrapped in a
// RawSocket together with the request buffer, and handed to the handler.
void Stream::SwitchingProtocolHandler(error_code ec, size_t num_written) {
        if (num_written > 0) {
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
        }

        if (ec) {
                CallErrorHandler(ec, request_, switch_protocol_handler_);
                return;
        }

        auto detached_socket = make_shared<tcp::socket>(std::move(socket_));
        auto raw_socket =
                make_shared<RawSocket<tcp::socket>>(detached_socket, request_data_.request_buffer_);

        // Copy the handler to a local before the stream is removed below, so the
        // final call does not read a member of this object.
        auto handler = switch_protocol_handler_;

        // Rest of the connection is done directly on the socket, we are done here.
        status_ = TransactionStatus::Done;
        *cancelled_ = true;
        cancelled_ = make_shared<bool>(true);
        server_.RemoveStream(shared_from_this());

        handler(raw_socket);
}
1487

1488
void Stream::CallBodyHandler() {
209✔
1489
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1490
        // it immediately destroys, which would destroy this stream as well. At the end of this
1491
        // function, it's ok to destroy it.
1492
        auto stream_ref = shared_from_this();
1493

1494
        server_.body_handler_(request_, error::NoError);
627✔
1495

1496
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1497
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1498
        // request has not been handled correctly.
1499
        auto response = maybe_response_.lock();
209✔
1500
        if (!response) {
209✔
1501
                logger_.Error("Handler produced no response. Closing stream prematurely.");
4✔
1502
                *cancelled_ = true;
2✔
1503
                cancelled_ = make_shared<bool>(true);
2✔
1504
                server_.RemoveStream(shared_from_this());
6✔
1505
        }
1506
}
209✔
1507

1508
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
216✔
1509
        event_loop_ {event_loop},
1510
        acceptor_(GetAsioIoContext(event_loop_)) {
386✔
1511
}
216✔
1512

1513
// Stops accepting connections and drops all streams on destruction.
Server::~Server() {
        this->Cancel();
}
216✔
1516

1517
// Convenience overload taking a plain RequestHandler as body handler. It wraps
// `body_handler` in an adapter with the (request, error) signature: an error is
// forwarded as an unexpected value, otherwise the request itself is passed on.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
        auto identified_body_handler = [body_handler](IncomingRequestPtr req, error::Error err) {
                if (err != error::NoError) {
                        body_handler(expected::unexpected(err));
                        return;
                }
                body_handler(req);
        };

        return AsyncServeUrl(url, header_handler, identified_body_handler);
}
1528

1529
// Starts listening on `url` and arms the first stream for an incoming
// connection.
//
// `url` must use the "http" protocol, must have an empty path or "/", and its
// host must be a literal IP address accepted by asio::ip::make_address().
// `header_handler` and `body_handler` are stored and used for all subsequent
// requests.
//
// Returns NoError on success. Otherwise returns InvalidUrlError for a malformed
// or unsupported URL, `protocol_not_supported` for a non-http protocol, or the
// error condition reported by Asio for the step that failed. If a step fails
// after the acceptor was opened, the acceptor is closed again so that the
// socket is not leaked and a later call can start from a clean state.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
        auto err = BreakDownUrl(url, address_);
        if (error::NoError != err) {
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
        }

        if (address_.protocol != "http") {
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
        }

        if (address_.path.size() > 0 && address_.path != "/") {
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
        }

        boost::system::error_code ec;
        auto address = asio::ip::make_address(address_.host, ec);
        if (ec) {
                return error::Error(
                        ec.default_error_condition(),
                        "Could not construct endpoint from address " + address_.host);
        }

        asio::ip::tcp::endpoint endpoint(address, address_.port);

        ec.clear();
        acceptor_.open(endpoint.protocol(), ec);
        if (ec) {
                // Do not close here: if open() failed because the acceptor was already
                // open, closing it would tear down an existing listener.
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
        }

        // From here on the acceptor is open and owned by this call; undo that on
        // every failure path. A failure of close() itself is deliberately ignored
        // because the original error is the one worth reporting.
        auto close_and_fail = [this](const boost::system::error_code &cause, const string &message) {
                boost::system::error_code ignored;
                acceptor_.close(ignored);
                return error::Error(cause.default_error_condition(), message);
        };

        // Allow address reuse, otherwise we can't re-bind later.
        ec.clear();
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
        if (ec) {
                return close_and_fail(ec, "Could not set socket options");
        }

        ec.clear();
        acceptor_.bind(endpoint, ec);
        if (ec) {
                return close_and_fail(ec, "Could not bind socket");
        }

        ec.clear();
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
        if (ec) {
                return close_and_fail(ec, "Could not start listening");
        }

        header_handler_ = header_handler;
        body_handler_ = body_handler;

        PrepareNewStream();

        return error::NoError;
}
1586

1587
void Server::Cancel() {
231✔
1588
        if (acceptor_.is_open()) {
231✔
1589
                acceptor_.cancel();
196✔
1590
                acceptor_.close();
196✔
1591
        }
1592
        streams_.clear();
1593
}
231✔
1594

1595
uint16_t Server::GetPort() const {
13✔
1596
        return acceptor_.local_endpoint().port();
13✔
1597
}
1598

1599
// Returns an http URL built from the fixed host 127.0.0.1 and the port
// reported by GetPort().
string Server::GetUrl() const {
        const string port = to_string(GetPort());
        return "http://127.0.0.1:" + port;
}
1602

1603
// Creates the outgoing response for `req` and registers it with the request's
// stream as a weak reference. Returns StreamCancelledError if the request's
// stream has already been cancelled.
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
        const bool stream_cancelled = *req->cancelled_;
        if (stream_cancelled) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
        }

        OutgoingResponsePtr created {new OutgoingResponse(req->stream_, req->cancelled_)};
        // The stream later uses this to check whether a response still exists.
        req->stream_.maybe_response_ = created;
        return created;
}
1611

1612
// Starts sending `resp` on its stream; `reply_finished_handler` is passed on to
// the stream. Returns StreamCancelledError, without touching the stream, if
// the stream has already been cancelled.
error::Error Server::AsyncReply(
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
        if (!*resp->cancelled_) {
                resp->stream_.AsyncReply(reply_finished_handler);
                return error::NoError;
        }

        return MakeError(StreamCancelledError, "Cannot send response");
}
1621

1622
// Creates an asynchronous reader for the body of `req`.
//
// Fails with StreamCancelledError if the stream has been cancelled, with
// `operation_in_progress` if the stream is not in the HeaderHandlerCalled
// state, and with BodyMissingError if the request body length is zero. On
// success the stream moves to the ReaderCreated state.
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
        if (*req->cancelled_) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
        }

        auto &stream = req->stream_;

        const bool in_header_handler = (stream.status_ == TransactionStatus::HeaderHandlerCalled);
        if (!in_header_handler) {
                auto in_progress = error::Error(
                        make_error_condition(errc::operation_in_progress),
                        "MakeBodyAsyncReader called while reading is in progress");
                return expected::unexpected(in_progress);
        }

        const bool has_body = (stream.request_body_length_ != 0);
        if (!has_body) {
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
        }

        stream.status_ = TransactionStatus::ReaderCreated;
        return make_shared<BodyAsyncReader<Stream>>(stream, req->cancelled_);
}
1641

1642
// Sends the response header of `resp` and then hands the raw connection to
// `handler` (see Stream::AsyncSwitchProtocol).
//
// Returns StreamCancelledError, without touching the stream, if the stream has
// already been cancelled. This mirrors the guard in Server::AsyncReply(): once
// cancelled, the stream referenced by `resp` must not be used any more.
error::Error Server::AsyncSwitchProtocol(OutgoingResponsePtr resp, SwitchProtocolHandler handler) {
        if (*resp->cancelled_) {
                return MakeError(StreamCancelledError, "Cannot switch protocol");
        }

        return resp->stream_.AsyncSwitchProtocol(handler);
}
1645

1646
void Server::PrepareNewStream() {
414✔
1647
        StreamPtr new_stream {new Stream(*this)};
414✔
1648
        streams_.insert(new_stream);
1649
        AsyncAccept(new_stream);
828✔
1650
}
414✔
1651

1652
// Waits asynchronously for a connection on `stream`'s socket. When a connection
// is accepted, the stream's AcceptHandler() is called and a new stream is
// prepared for the next connection. On failure the error is logged and no new
// stream is prepared.
void Server::AsyncAccept(StreamPtr stream) {
        acceptor_.async_accept(stream->socket_, [this, stream](const error_code &ec) {
                if (!ec) {
                        stream->AcceptHandler(ec);

                        // Keep listening: arm the next stream.
                        this->PrepareNewStream();
                        return;
                }

                log::Error("Could not accept connection: " + ec.message());
        });
}
414✔
1664

1665
// Removes `stream` from streams_ and then cancels it. The by-value `stream`
// parameter keeps the object alive until DoCancel() has returned.
void Server::RemoveStream(StreamPtr stream) {
        streams_.erase(stream);
        stream->DoCancel();
}
182✔
1670

1671
} // namespace http
1672
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc