• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1024599080

03 Oct 2023 06:47PM UTC coverage: 80.244% (+0.1%) from 80.141%
1024599080

push

gitlab-ci

lluiscampos
feat: Implement state script retry-later functionality

Ticket: MEN-6677
Changelog: None

Signed-off-by: Lluis Campos <lluis.campos@northern.tech>

33 of 33 new or added lines in 2 files covered. (100.0%)

6499 of 8099 relevant lines covered (80.24%)

10689.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.27
/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio/ip/tcp.hpp>
20
#include <boost/asio/ssl/verify_mode.hpp>
21
#include <boost/asio.hpp>
22

23
#include <common/common.hpp>
24

25
namespace mender {
26
namespace http {
27

28
namespace common = mender::common;
29

30
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
31
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
32
const unsigned int BeastHttpVersion = 11;
33

34
namespace asio = boost::asio;
35
namespace http = boost::beast::http;
36

37
// Element count used to construct the Client's `body_buffer_`; taken from MENDER_BUFSIZE.
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
38

39
// Translates our own `Method` enumeration into the corresponding Beast verb.
//
// `Method::Invalid` has no Beast counterpart: it trips the assert below and, when asserts are
// compiled out, falls back to `http::verb::get`.
static http::verb MethodToBeastVerb(Method method) {
	// Deliberately no "default" label, so that the compiler can warn about unhandled
	// enumerators if `Method` ever grows.
	switch (method) {
	case Method::Invalid:
		// Handled after the switch.
		break;
	case Method::GET:
		return http::verb::get;
	case Method::HEAD:
		return http::verb::head;
	case Method::POST:
		return http::verb::post;
	case Method::PUT:
		return http::verb::put;
	case Method::PATCH:
		return http::verb::patch;
	case Method::CONNECT:
		return http::verb::connect;
	}

	// Reaching this point means the caller passed an unusable method.
	assert(false);
	return http::verb::get;
}
62

63
// Translates a Beast verb into our own `Method` enumeration.
//
// Verbs without a `Method` counterpart produce an `UnsupportedMethodError` carrying
// `verb_string` as its message.
static expected::expected<Method, error::Error> BeastVerbToMethod(
	http::verb verb, const string &verb_string) {
	switch (verb) {
	case http::verb::get:
		return Method::GET;
	case http::verb::head:
		return Method::HEAD;
	case http::verb::post:
		return Method::POST;
	case http::verb::put:
		return Method::PUT;
	case http::verb::patch:
		return Method::PATCH;
	case http::verb::connect:
		return Method::CONNECT;
	default:
		// Everything else is reported as unsupported below.
		break;
	}

	return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
}
82

83
template <typename StreamType>
84
class BodyAsyncReader : virtual public io::AsyncReader {
85
public:
86
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
145✔
87
                stream_ {stream},
88
                cancelled_ {cancelled} {
290✔
89
        }
145✔
90
        ~BodyAsyncReader() {
44✔
91
                Cancel();
44✔
92
        }
88✔
93

94
        error::Error AsyncRead(
2,046✔
95
                vector<uint8_t>::iterator start,
96
                vector<uint8_t>::iterator end,
97
                io::AsyncIoHandler handler) override {
98
                if (*cancelled_) {
2,046✔
99
                        return error::MakeError(
×
100
                                error::ProgrammingError,
101
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
102
                }
103
                stream_.AsyncReadNextBodyPart(start, end, handler);
2,046✔
104
                return error::NoError;
2,046✔
105
        }
106

107
        void Cancel() override {
46✔
108
                if (!*cancelled_) {
46✔
109
                        stream_.Cancel();
4✔
110
                }
111
        }
46✔
112

113
private:
114
        StreamType &stream_;
115
        shared_ptr<bool> cancelled_;
116

117
        friend class Client;
118
        friend class Server;
119
};
120

121
template <typename StreamType>
122
class RawSocket : virtual public io::AsyncReadWriter {
123
public:
124
        RawSocket(shared_ptr<StreamType> stream, shared_ptr<beast::flat_buffer> buffered) :
×
125
                destroying_ {make_shared<bool>(false)},
×
126
                stream_ {stream},
127
                buffered_ {buffered} {
×
128
                // If there are no buffered bytes, then we don't need it.
129
                if (buffered_ && buffered_->size() == 0) {
×
130
                        buffered_.reset();
×
131
                }
132
        }
×
133

134
        ~RawSocket() {
15✔
135
                *destroying_ = true;
15✔
136
                Cancel();
15✔
137
        }
30✔
138

139
        error::Error AsyncRead(
320✔
140
                vector<uint8_t>::iterator start,
141
                vector<uint8_t>::iterator end,
142
                io::AsyncIoHandler handler) override {
143
                // If we have prebuffered bytes, which can happen if the HTTP parser read the
144
                // header and parts of the body in one block, return those first.
145
                if (buffered_) {
320✔
146
                        return DrainPrebufferedData(start, end, handler);
8✔
147
                }
148

149
                read_buffer_ = asio::buffer(&*start, end - start);
316✔
150
                auto &destroying = destroying_;
151
                stream_->async_read_some(
632✔
152
                        read_buffer_,
316✔
153
                        [destroying, handler](const boost::system::error_code &ec, size_t num_read) {
313✔
154
                                if (*destroying) {
313✔
155
                                        return;
156
                                }
157

158
                                if (ec == asio::error::operation_aborted) {
313✔
159
                                        handler(expected::unexpected(error::Error(
12✔
160
                                                make_error_condition(errc::operation_canceled),
6✔
161
                                                "Could not read from socket")));
162
                                } else if (ec) {
310✔
163
                                        handler(expected::unexpected(
12✔
164
                                                error::Error(ec.default_error_condition(), "Could not read from socket")));
12✔
165
                                } else {
166
                                        handler(num_read);
608✔
167
                                }
168
                        });
169
                return error::NoError;
316✔
170
        }
171

172
        error::Error AsyncWrite(
309✔
173
                vector<uint8_t>::const_iterator start,
174
                vector<uint8_t>::const_iterator end,
175
                io::AsyncIoHandler handler) override {
176
                write_buffer_ = asio::buffer(&*start, end - start);
309✔
177
                auto &destroying = destroying_;
178
                stream_->async_write_some(
618✔
179
                        write_buffer_,
309✔
180
                        [destroying, handler](const boost::system::error_code &ec, size_t num_written) {
306✔
181
                                if (*destroying) {
306✔
182
                                        return;
183
                                }
184

185
                                if (ec == asio::error::operation_aborted) {
306✔
186
                                        handler(expected::unexpected(error::Error(
×
187
                                                make_error_condition(errc::operation_canceled),
×
188
                                                "Could not write to socket")));
189
                                } else if (ec) {
306✔
190
                                        handler(expected::unexpected(
×
191
                                                error::Error(ec.default_error_condition(), "Could not write to socket")));
×
192
                                } else {
193
                                        handler(num_written);
612✔
194
                                }
195
                        });
196
                return error::NoError;
309✔
197
        }
198

199
        void Cancel() override {
28✔
200
                if (stream_->lowest_layer().is_open()) {
28✔
201
                        stream_->lowest_layer().cancel();
15✔
202
                        stream_->lowest_layer().close();
15✔
203
                }
204
        }
28✔
205

206
private:
207
        error::Error DrainPrebufferedData(
4✔
208
                vector<uint8_t>::iterator start,
209
                vector<uint8_t>::iterator end,
210
                io::AsyncIoHandler handler) {
211
                size_t to_copy = min(static_cast<size_t>(end - start), buffered_->size());
8✔
212
                copy_n(static_cast<const uint8_t *>(buffered_->cdata().data()), to_copy, start);
4✔
213
                buffered_->consume(to_copy);
4✔
214
                if (buffered_->size() == 0) {
4✔
215
                        // We don't need it anymore.
216
                        buffered_.reset();
4✔
217
                }
218
                handler(to_copy);
4✔
219
                return error::NoError;
4✔
220
        }
221

222
        shared_ptr<bool> destroying_;
223
        shared_ptr<StreamType> stream_;
224
        shared_ptr<beast::flat_buffer> buffered_;
225
        asio::mutable_buffer read_buffer_;
226
        asio::const_buffer write_buffer_;
227
};
228

229
Client::Client(
396✔
230
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
231
        event_loop_ {event_loop},
232
        logger_name_ {logger_name},
233
        cancelled_ {make_shared<bool>(true)},
396✔
234
        resolver_(GetAsioIoContext(event_loop)),
235
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,392✔
236
        ssl_ctx_.set_verify_mode(ssl::verify_peer);
396✔
237

238
        if (client.client_cert_path != "" and client.client_cert_key_path != "") {
396✔
239
                ssl_ctx_.set_options(boost::asio::ssl::context::default_workarounds);
1✔
240
                ssl_ctx_.use_certificate_file(client.client_cert_path, boost::asio::ssl::context_base::pem);
1✔
241
                ssl_ctx_.use_private_key_file(
1✔
242
                        client.client_cert_key_path, boost::asio::ssl::context_base::pem);
243
        }
244

245
        beast::error_code ec {};
396✔
246
        ssl_ctx_.set_default_verify_paths(ec); // Load the default CAs
396✔
247
        if (ec) {
396✔
248
                log::Error("Failed to load the SSL default directory");
×
249
        }
250
        if (client.server_cert_path != "") {
396✔
251
                ssl_ctx_.load_verify_file(client.server_cert_path, ec);
4✔
252
                if (ec) {
4✔
253
                        log::Error("Failed to load the server certificate!");
×
254
                }
255
        }
256
}
396✔
257

258
// Warns if a request is still in flight, then cancels whatever is going on.
Client::~Client() {
	const bool request_active = !*cancelled_;
	if (request_active) {
		logger_.Warning("Client destroyed while request is still active!");
	}

	DoCancel();
}
396✔
264

265
// Starts an asynchronous HTTP or HTTPS request.
//
// Parameters:
//   req            - the request to send; protocol, host and port must be filled in.
//   header_handler - must not be empty. Errors that occur after this function has returned
//                    are delivered to it (see the `CallErrorHandler` calls in the handlers).
//   body_handler   - must not be empty.
//
// Returns `error::NoError` when name resolution has been started, otherwise:
//   - `errc::operation_in_progress` if a previous call has not finished yet,
//   - `ProgrammingError` if the request is incomplete or a handler is empty,
//   - `errc::protocol_not_supported` for anything but "http" and "https".
//
// As a side effect the "HOST" header of `req` is set to the host of its address.
error::Error Client::AsyncCall(
	OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
	if (!*cancelled_ && status_ != TransactionStatus::Done) {
		return error::Error(
			make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
	}

	if (req->address_.protocol == "" || req->address_.host == "" || req->address_.port < 0) {
		return error::MakeError(error::ProgrammingError, "Request is not ready");
	}

	if (!header_handler || !body_handler) {
		return error::MakeError(
			error::ProgrammingError, "header_handler and body_handler can not be nullptr");
	}

	if (req->address_.protocol != "http" && req->address_.protocol != "https") {
		return error::Error(
			make_error_condition(errc::protocol_not_supported), req->address_.protocol);
	}

	// Assign unconditionally: the client can be reused for several calls, and a plain "http"
	// call following an "https" one must not keep using the TLS code paths.
	is_https_ = (req->address_.protocol == "https");

	logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));

	// NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
	// request to route to our k8s cluster. Set this in all cases.
	req->SetHeader("HOST", req->address_.host);

	request_ = req;
	header_handler_ = header_handler;
	body_handler_ = body_handler;
	status_ = TransactionStatus::None;

	// Fresh flag for this call; handlers of earlier calls hold on to their own (old) flag.
	cancelled_ = make_shared<bool>(false);

	auto &cancelled = cancelled_;

	resolver_.async_resolve(
		request_->address_.host,
		to_string(request_->address_.port),
		[this, cancelled](
			const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
			// `this` may only be touched if the call was not cancelled in the meantime.
			if (!*cancelled) {
				ResolveHandler(ec, results);
			}
		});

	return error::NoError;
}
317

318
// Creates the reader through which the response body can be fetched.
//
// Only valid while the transaction is in the `HeaderHandlerCalled` state; any other state
// yields `errc::operation_in_progress`. A response with a body length of zero yields
// `BodyMissingError`. On success the state moves to `ReaderCreated`.
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
	const bool in_header_stage = (status_ == TransactionStatus::HeaderHandlerCalled);
	if (!in_header_stage) {
		return expected::unexpected(error::Error(
			make_error_condition(errc::operation_in_progress),
			"MakeBodyAsyncReader called while reading is in progress"));
	}

	const bool has_body = (response_body_length_ != 0);
	if (!has_body) {
		return expected::unexpected(
			MakeError(BodyMissingError, "Response does not contain a body"));
	}

	status_ = TransactionStatus::ReaderCreated;

	auto &http_client = resp->client_.GetHttpClient();
	return make_shared<BodyAsyncReader<Client>>(http_client, resp->cancelled_);
}
333

334
// Hands the underlying connection over to the caller as a raw socket.
//
// Fails with `errc::not_connected` if no transaction is active. Otherwise the current
// transaction is marked done and its cancellation flag is raised, the client gives up its
// stream, and a `RawSocket` wrapping that stream is returned together with the response
// buffer.
io::ExpectedAsyncReadWriterPtr Client::SwitchProtocol(IncomingResponsePtr req) {
	if (*cancelled_) {
		return expected::unexpected(error::Error(
			make_error_condition(errc::not_connected),
			"Cannot switch protocols if endpoint is not connected"));
	}

	// Rest of the connection is done directly on the socket, we are done here.
	status_ = TransactionStatus::Done;
	*cancelled_ = true;
	cancelled_ = make_shared<bool>(false);

	// Take the stream out of the client, it no longer belongs to us.
	auto detached_stream = std::move(stream_);
	stream_.reset();

	if (is_https_) {
		return make_shared<RawSocket<ssl::stream<tcp::socket>>>(detached_stream, response_buffer_);
	}

	// Plain HTTP: only the TCP layer below the TLS stream is in use, so move that out.
	auto plain_socket = make_shared<tcp::socket>(std::move(detached_stream->next_layer()));
	return make_shared<RawSocket<tcp::socket>>(plain_socket, response_buffer_);
}
357

358
// Invokes `handler` with the current response.
//
// The handler is taken by value on purpose, so that the copy in the argument list stays alive
// for the duration of the call. A handler may own the client instance through a capture and
// replace itself (using `AsyncCall`) while it runs. Without this extra copy, that would
// destroy the last copy of the handler, and with it the client, in the middle of the call.
void Client::CallHandler(ResponseHandler handler) {
	handler(response_);
}
283✔
366

367
void Client::CallErrorHandler(
24✔
368
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
369
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
72✔
370
}
24✔
371

372
void Client::CallErrorHandler(
151✔
373
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
374
        *cancelled_ = true;
151✔
375
        cancelled_ = make_shared<bool>(true);
151✔
376
        stream_.reset();
151✔
377
        status_ = TransactionStatus::Done;
151✔
378
        handler(expected::unexpected(
302✔
379
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
604✔
380
}
151✔
381

382
void Client::ResolveHandler(
235✔
383
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
384
        if (ec) {
235✔
385
                CallErrorHandler(ec, request_, header_handler_);
×
386
                return;
×
387
        }
388

389
        if (logger_.Level() >= log::LogLevel::Debug) {
235✔
390
                string ips = "[";
232✔
391
                string sep;
392
                for (auto r : results) {
954✔
393
                        ips += sep;
245✔
394
                        ips += r.endpoint().address().to_string();
245✔
395
                        sep = ", ";
245✔
396
                }
397
                ips += "]";
232✔
398
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
464✔
399
        }
400

401
        resolver_results_ = results;
402

403
        stream_ = make_shared<ssl::stream<tcp::socket>>(GetAsioIoContext(event_loop_), ssl_ctx_);
470✔
404

405
        if (!response_buffer_) {
235✔
406
                // We can reuse this if preexisting.
407
                response_buffer_ = make_shared<beast::flat_buffer>();
338✔
408

409
                // This is equivalent to:
410
                //   response_buffer_.reserve(body_buffer_.size());
411
                // but compatible with Boost 1.67.
412
                response_buffer_->prepare(body_buffer_.size() - response_buffer_->size());
169✔
413
        }
414

415
        http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();
470✔
416

417
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
418
        // if they do, they should be handled higher up in the application logic.
419
        //
420
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
421
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
422
        // `has_value()` in their code, causing their subsequent comparison operation to
423
        // misbehave. So pass highest possible value instead.
424
        http_response_parser_->body_limit(numeric_limits<uint64_t>::max());
425

426
        auto &cancelled = cancelled_;
427

428
        asio::async_connect(
235✔
429
                stream_->next_layer(),
430
                resolver_results_,
235✔
431
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
470✔
432
                        if (!*cancelled) {
235✔
433
                                if (is_https_) {
235✔
434
                                        return HandshakeHandler(ec, endpoint);
5✔
435
                                }
436
                                return ConnectHandler(ec, endpoint);
230✔
437
                        }
438
                });
439
}
440

441
// Connect completion handler for HTTPS: performs the TLS handshake before continuing with
// `ConnectHandler`.
//
// A connect error, as well as a failed handshake, is reported to the header handler. Failing
// to set the SNI host name is logged, but does not stop the handshake attempt.
void Client::HandshakeHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
	if (ec) {
		CallErrorHandler(ec, request_, header_handler_);
		return;
	}

	// Set SNI Hostname (many hosts need this to handshake successfully)
	const auto sni_ok =
		SSL_set_tlsext_host_name(stream_->native_handle(), request_->address_.host.c_str());
	if (!sni_ok) {
		beast::error_code sni_error {
			static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
		logger_.Error("Failed to set SNI host name: " + sni_error.message());
	}

	auto &cancelled = cancelled_;

	stream_->async_handshake(
		ssl::stream_base::client,
		[this, cancelled, endpoint](const error_code &handshake_ec) {
			if (*cancelled) {
				return;
			}

			if (!handshake_ec) {
				logger_.Debug("https: Successful SSL handshake");
				ConnectHandler(handshake_ec, endpoint);
				return;
			}

			logger_.Error(
				"https: Failed to perform the SSL handshake: " + handshake_ec.message());
			CallErrorHandler(handshake_ec, request_, header_handler_);
		});
}
470

471

472
// Called once the connection (and, for HTTPS, the handshake) is established.
//
// Builds the Beast request from the stored request (method, path and headers), creates its
// serializer, and starts writing the header. The write completion goes to
// `WriteHeaderHandler`, unless the transaction was cancelled in the meantime. Errors are
// reported to the header handler.
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
	if (ec) {
		CallErrorHandler(ec, request_, header_handler_);
		return;
	}

	logger_.Debug("Connected to " + endpoint.address().to_string());

	http_request_ = make_shared<http::request<http::buffer_body>>(
		MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);

	for (const auto &name_and_value : request_->headers_) {
		http_request_->set(name_and_value.first, name_and_value.second);
	}

	http_request_serializer_ =
		make_shared<http::request_serializer<http::buffer_body>>(*http_request_);

	auto &cancelled = cancelled_;

	// The same completion handler serves both transports.
	auto on_header_written = [this, cancelled](const error_code &ec, size_t num_written) {
		if (*cancelled) {
			return;
		}
		WriteHeaderHandler(ec, num_written);
	};

	if (is_https_) {
		http::async_write_header(
			*stream_, *http_request_serializer_, std::move(on_header_written));
	} else {
		// Plain HTTP bypasses the TLS layer and talks to the TCP socket underneath.
		http::async_write_header(
			stream_->next_layer(), *http_request_serializer_, std::move(on_header_written));
	}
}
512

513
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
222✔
514
        if (num_written > 0) {
222✔
515
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
444✔
516
        }
517

518
        if (ec) {
222✔
519
                CallErrorHandler(ec, request_, header_handler_);
×
520
                return;
177✔
521
        }
522

523
        auto header = request_->GetHeader("Content-Length");
444✔
524
        if (!header || header.value() == "0") {
222✔
525
                ReadHeader();
176✔
526
                return;
527
        }
528

529
        auto length = common::StringToLongLong(header.value());
46✔
530
        if (!length || length.value() < 0) {
46✔
531
                auto err = error::Error(
532
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
533
                CallErrorHandler(err, request_, header_handler_);
×
534
                return;
535
        }
536
        request_body_length_ = length.value();
46✔
537

538
        if (!request_->body_gen_ && !request_->async_body_gen_) {
46✔
539
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
540
                CallErrorHandler(err, request_, header_handler_);
2✔
541
                return;
542
        }
543

544
        assert(!(request_->body_gen_ && request_->async_body_gen_));
545

546
        if (request_->body_gen_) {
45✔
547
                auto body_reader = request_->body_gen_();
39✔
548
                if (!body_reader) {
39✔
549
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
550
                        return;
551
                }
552
                request_->body_reader_ = body_reader.value();
39✔
553
        } else {
554
                auto body_reader = request_->async_body_gen_();
6✔
555
                if (!body_reader) {
6✔
556
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
557
                        return;
558
                }
559
                request_->async_body_reader_ = body_reader.value();
6✔
560
        }
561

562
        PrepareAndWriteNewBodyBuffer();
45✔
563
}
564

565
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,142✔
566
        if (num_written > 0) {
2,142✔
567
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
2,100✔
568
        }
569

570
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,142✔
571
                // Write next block of the body.
572
                PrepareAndWriteNewBodyBuffer();
1,050✔
573
        } else if (ec) {
1,092✔
574
                CallErrorHandler(ec, request_, header_handler_);
8✔
575
        } else if (num_written > 0) {
1,088✔
576
                // We are still writing the body.
577
                WriteBody();
1,050✔
578
        } else {
579
                // We are ready to receive the response.
580
                ReadHeader();
38✔
581
        }
582
}
2,142✔
583

584
void Client::PrepareAndWriteNewBodyBuffer() {
1,095✔
585
        // request_->body_reader_ XOR request_->async_body_reader_
586
        assert(
587
                (request_->body_reader_ || request_->async_body_reader_)
588
                && !(request_->body_reader_ && request_->async_body_reader_));
589

590
        auto cancelled = cancelled_;
591
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
3,462✔
592
                if (!*cancelled) {
1,095✔
593
                        if (!read) {
1,094✔
594
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
595
                                return;
2✔
596
                        }
597
                        WriteNewBodyBuffer(read.value());
1,092✔
598
                }
599
        };
1,095✔
600

601

602
        if (request_->body_reader_) {
1,095✔
603
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
1,340✔
604
        } else {
605
                auto err = request_->async_body_reader_->AsyncRead(
606
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
850✔
607
                if (err != error::NoError) {
425✔
608
                        CallErrorHandler(err, request_, header_handler_);
×
609
                }
610
        }
611
}
1,095✔
612

613
void Client::WriteNewBodyBuffer(size_t size) {
1,092✔
614
        http_request_->body().data = body_buffer_.data();
1,092✔
615
        http_request_->body().size = size;
1,092✔
616

617
        if (size > 0) {
1,092✔
618
                http_request_->body().more = true;
1,054✔
619
        } else {
620
                // Release ownership of Body reader.
621
                request_->body_reader_.reset();
38✔
622
                request_->async_body_reader_.reset();
38✔
623
                http_request_->body().more = false;
38✔
624
        }
625

626
        WriteBody();
1,092✔
627
}
1,092✔
628

629
void Client::WriteBody() {
2,142✔
630
        auto &cancelled = cancelled_;
631

632
        if (is_https_) {
2,142✔
633
                http::async_write_some(
×
634
                        *stream_,
635
                        *http_request_serializer_,
636
                        [this, cancelled](const error_code &ec, size_t num_written) {
×
637
                                if (!*cancelled) {
×
638
                                        WriteBodyHandler(ec, num_written);
×
639
                                }
640
                        });
×
641
        } else {
642
                http::async_write_some(
2,142✔
643
                        stream_->next_layer(),
644
                        *http_request_serializer_,
645
                        [this, cancelled](const error_code &ec, size_t num_written) {
4,284✔
646
                                if (!*cancelled) {
2,142✔
647
                                        WriteBodyHandler(ec, num_written);
2,142✔
648
                                }
649
                        });
2,142✔
650
        }
651
}
2,142✔
652

653
void Client::ReadHeader() {
214✔
654
        auto &cancelled = cancelled_;
655

656
        if (is_https_) {
214✔
657
                http::async_read_some(
4✔
658
                        *stream_,
659
                        *response_buffer_,
660
                        *http_response_parser_,
661
                        [this, cancelled](const error_code &ec, size_t num_read) {
8✔
662
                                if (!*cancelled) {
4✔
663
                                        ReadHeaderHandler(ec, num_read);
4✔
664
                                }
665
                        });
4✔
666
        } else {
667
                http::async_read_some(
210✔
668
                        stream_->next_layer(),
669
                        *response_buffer_,
670
                        *http_response_parser_,
671
                        [this, cancelled](const error_code &ec, size_t num_read) {
419✔
672
                                if (!*cancelled) {
209✔
673
                                        ReadHeaderHandler(ec, num_read);
209✔
674
                                }
675
                        });
209✔
676
        }
677
}
214✔
678

679
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
213✔
680
        if (num_read > 0) {
213✔
681
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
416✔
682
        }
683

684
        if (ec) {
213✔
685
                CallErrorHandler(ec, request_, header_handler_);
5✔
686
                return;
67✔
687
        }
688

689
        if (!http_response_parser_->is_header_done()) {
208✔
690
                ReadHeader();
×
691
                return;
×
692
        }
693

694
        response_.reset(new IncomingResponse(*this, cancelled_));
416✔
695
        response_->status_code_ = http_response_parser_->get().result_int();
208✔
696
        response_->status_message_ = string {http_response_parser_->get().reason()};
208✔
697

698
        logger_.Debug(
416✔
699
                "Received response: " + to_string(response_->status_code_) + " "
416✔
700
                + response_->status_message_);
624✔
701

702
        string debug_str;
703
        for (auto header = http_response_parser_->get().cbegin();
238✔
704
                 header != http_response_parser_->get().cend();
446✔
705
                 header++) {
706
                response_->headers_[string {header->name_string()}] = string {header->value()};
714✔
707
                if (logger_.Level() >= log::LogLevel::Debug) {
238✔
708
                        debug_str += string {header->name_string()};
235✔
709
                        debug_str += ": ";
235✔
710
                        debug_str += string {header->value()};
235✔
711
                        debug_str += "\n";
235✔
712
                }
713
        }
714

715
        logger_.Debug("Received headers:\n" + debug_str);
416✔
716
        debug_str.clear();
717

718
        if (http_response_parser_->chunked()) {
208✔
719
                auto cancelled = cancelled_;
720
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
721
                CallHandler(header_handler_);
2✔
722
                if (!*cancelled) {
1✔
723
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
724
                        CallErrorHandler(err, request_, body_handler_);
2✔
725
                }
726
                return;
727
        }
728

729
        auto content_length = http_response_parser_->content_length();
207✔
730
        if (content_length) {
207✔
731
                response_body_length_ = content_length.value();
179✔
732
        } else {
733
                response_body_length_ = 0;
28✔
734
        }
735
        response_body_read_ = 0;
207✔
736

737
        if (response_body_read_ >= response_body_length_) {
207✔
738
                auto cancelled = cancelled_;
739
                status_ = TransactionStatus::HeaderHandlerCalled;
42✔
740
                CallHandler(header_handler_);
84✔
741
                if (!*cancelled) {
42✔
742
                        status_ = TransactionStatus::Done;
36✔
743
                        CallHandler(body_handler_);
72✔
744

745
                        // After body handler has run, set the request to cancelled. The body
746
                        // handler may have made a new request, so this is not necessarily the same
747
                        // request as is currently active (note use of shared_ptr copy, not
748
                        // `cancelled_`).
749
                        *cancelled = true;
36✔
750
                }
751
                return;
752
        }
753

754
        auto cancelled = cancelled_;
755
        status_ = TransactionStatus::HeaderHandlerCalled;
165✔
756
        CallHandler(header_handler_);
330✔
757
        if (*cancelled) {
165✔
758
                return;
759
        }
760

761
        // We know that a body reader is required here, because of the `response_body_read_ >=
762
        // response_body_length_` check above.
763
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
146✔
764
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
4✔
765
        }
766
}
767

768
// Reads the next part of the response body into [start, end) and reports the
// outcome through `handler`.
//
// Once the body has been read completely, `handler` is called with 0, followed by
// the body handler (unless the request was cancelled). Otherwise an asynchronous
// read is started whose completion is processed by ReadBodyHandler().
void Client::AsyncReadNextBodyPart(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));

        if (status_ == TransactionStatus::ReaderCreated) {
                status_ = TransactionStatus::BodyReadingInProgress;
        }

        if (AtLeast(status_, TransactionStatus::BodyReadingFinished)) {
                auto cancelled = cancelled_;
                handler(0);

                const bool finish_now =
                        !*cancelled && status_ == TransactionStatus::BodyReadingFinished;
                if (!finish_now) {
                        return;
                }

                status_ = TransactionStatus::Done;
                CallHandler(body_handler_);

                // The body handler may have made a new request, so mark the finished
                // request as cancelled through our own copy, not through `cancelled_`.
                *cancelled = true;
                return;
        }

        reader_buf_start_ = start;
        reader_buf_end_ = end;
        reader_handler_ = handler;

        // Let the parser fill the internal body buffer, limited to what the caller's
        // buffer has room for.
        size_t caller_capacity = end - start;
        auto &body = http_response_parser_->get().body();
        body.data = body_buffer_.data();
        body.size = min(body_buffer_.size(), caller_capacity);

        auto request_cancelled = cancelled_;
        auto on_read = [this, request_cancelled](const error_code &ec, size_t num_read) {
                if (*request_cancelled) {
                        return;
                }
                ReadBodyHandler(ec, num_read);
        };

        if (!is_https_) {
                // Not HTTPS: read from the layer beneath `stream_`.
                http::async_read_some(
                        stream_->next_layer(), *response_buffer_, *http_response_parser_, on_read);
                return;
        }

        http::async_read_some(*stream_, *response_buffer_, *http_response_parser_, on_read);
}
825

826
// Completion handler for the read started by AsyncReadNextBodyPart().
//
// Updates the count of body bytes read, switches the status to
// BodyReadingFinished once the expected length has been reached, copies the
// received data into the caller's buffer and invokes the reader handler with
// either the number of bytes copied or an error. On error, and if the request
// has not been cancelled by the reader handler, the body handler is notified too.
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
        if (num_read > 0) {
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
                response_body_read_ += num_read;
        }

        if (ec == http::make_error_code(http::error::need_buffer)) {
                // This can be ignored. We always reset the buffer between reads anyway.
                ec = error_code();
        }

        assert(reader_handler_);

        if (response_body_read_ >= response_body_length_) {
                status_ = TransactionStatus::BodyReadingFinished;
        }

        auto cancelled = cancelled_;

        // Invoke a copy of the handler, not the member itself. The handler is expected
        // to call AsyncReadNextBodyPart() to continue reading, and that assigns a new
        // value to `reader_handler_`. Assigning to the member while it is executing
        // would destroy the callable (and its captures) that is still running.
        auto handler = reader_handler_;

        size_t buf_size = reader_buf_end_ - reader_buf_start_;
        size_t smallest = min(num_read, buf_size);
        copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
        if (ec) {
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
                handler(expected::unexpected(err));
        } else {
                handler(smallest);
        }

        // Only touch members again if the handler did not cancel the request.
        if (!*cancelled && ec) {
                CallErrorHandler(ec, request_, body_handler_);
                return;
        }
}
860

861
void Client::Cancel() {
192✔
862
        auto cancelled = cancelled_;
863

864
        if (!*cancelled) {
192✔
865
                auto err =
866
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
246✔
867
                switch (status_) {
123✔
868
                case TransactionStatus::None:
1✔
869
                        CallErrorHandler(err, request_, header_handler_);
1✔
870
                        break;
1✔
871
                case TransactionStatus::HeaderHandlerCalled:
120✔
872
                case TransactionStatus::ReaderCreated:
873
                case TransactionStatus::BodyReadingInProgress:
874
                case TransactionStatus::BodyReadingFinished:
875
                        CallErrorHandler(err, request_, body_handler_);
120✔
876
                        break;
120✔
877
                case TransactionStatus::Replying:
878
                case TransactionStatus::SwitchingProtocol:
879
                        // Not used by client.
880
                        assert(false);
881
                        break;
882
                case TransactionStatus::BodyHandlerCalled:
883
                case TransactionStatus::Done:
884
                        break;
885
                }
886
        }
887

888
        if (!*cancelled) {
192✔
889
                DoCancel();
2✔
890
        }
891
}
192✔
892

893
void Client::DoCancel() {
398✔
894
        resolver_.cancel();
398✔
895
        if (stream_) {
398✔
896
                stream_->next_layer().cancel();
65✔
897
                stream_->next_layer().close();
65✔
898
                stream_.reset();
65✔
899
        }
900

901
        request_.reset();
398✔
902
        response_.reset();
398✔
903

904
        // Reset logger to no connection.
905
        logger_ = log::Logger(logger_name_);
398✔
906

907
        // Set cancel state and then make a new one. Those who are interested should have their own
908
        // pointer to the old one.
909
        *cancelled_ = true;
398✔
910
        cancelled_ = make_shared<bool>(true);
398✔
911
}
398✔
912

913
ClientConfig::ClientConfig() :
91✔
914
        ClientConfig("") {
182✔
915
}
91✔
916

917
ClientConfig::ClientConfig(
377✔
918
        const string &server_cert_path,
919
        const string &client_cert_path,
920
        const string &client_cert_key_path) :
377✔
921
        server_cert_path {server_cert_path},
922
        client_cert_path {client_cert_path},
923
        client_cert_key_path {client_cert_key_path} {};
377✔
924

925
ClientConfig::~ClientConfig() {
377✔
926
}
377✔
927

928
// Nothing to set up or tear down explicitly.
ServerConfig::ServerConfig() = default;

ServerConfig::~ServerConfig() = default;
184✔
933

934
Stream::Stream(Server &server) :
414✔
935
        server_ {server},
936
        logger_ {"http"},
937
        cancelled_(make_shared<bool>(true)),
414✔
938
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
414✔
939
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
1,242✔
940
        request_buffer_ = make_shared<beast::flat_buffer>();
828✔
941

942
        // This is equivalent to:
943
        //   request_buffer_.reserve(body_buffer_.size());
944
        // but compatible with Boost 1.67.
945
        request_buffer_->prepare(body_buffer_.size() - request_buffer_->size());
414✔
946

947
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
948
        // they do, they should be handled higher up in the application logic.
949
        //
950
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
951
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
952
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
953
        // possible value instead.
954
        http_request_parser_.body_limit(numeric_limits<uint64_t>::max());
955
}
414✔
956

957
Stream::~Stream() {
828✔
958
        DoCancel();
414✔
959
}
414✔
960

961
void Stream::Cancel() {
7✔
962
        auto cancelled = cancelled_;
963

964
        if (!*cancelled) {
7✔
965
                auto err =
966
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
967
                switch (status_) {
7✔
968
                case TransactionStatus::None:
×
969
                        CallErrorHandler(err, request_, server_.header_handler_);
×
970
                        break;
×
971
                case TransactionStatus::HeaderHandlerCalled:
5✔
972
                case TransactionStatus::ReaderCreated:
973
                case TransactionStatus::BodyReadingInProgress:
974
                case TransactionStatus::BodyReadingFinished:
975
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
976
                        break;
5✔
977
                case TransactionStatus::BodyHandlerCalled:
978
                        // In between body handler and reply finished. No one to handle the status
979
                        // here.
980
                        break;
981
                case TransactionStatus::Replying:
1✔
982
                        CallErrorHandler(err, request_, reply_finished_handler_);
3✔
983
                        break;
1✔
984
                case TransactionStatus::SwitchingProtocol:
1✔
985
                        CallErrorHandler(err, request_, switch_protocol_handler_);
3✔
986
                        break;
1✔
987
                case TransactionStatus::Done:
988
                        break;
989
                }
990
        }
991

992
        if (!*cancelled) {
7✔
993
                DoCancel();
×
994
        }
995
}
7✔
996

997
void Stream::DoCancel() {
599✔
998
        if (socket_.is_open()) {
599✔
999
                socket_.cancel();
210✔
1000
                socket_.close();
210✔
1001
        }
1002

1003
        // Set cancel state and then make a new one. Those who are interested should have their own
1004
        // pointer to the old one.
1005
        *cancelled_ = true;
599✔
1006
        cancelled_ = make_shared<bool>(true);
599✔
1007
}
599✔
1008

1009
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
1010
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1011
}
×
1012

1013
void Stream::CallErrorHandler(
×
1014
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
1015
        *cancelled_ = true;
×
1016
        cancelled_ = make_shared<bool>(true);
×
1017
        status_ = TransactionStatus::Done;
×
1018
        handler(expected::unexpected(err.WithContext(
×
1019
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
1020

1021
        server_.RemoveStream(shared_from_this());
×
1022
}
×
1023

1024
void Stream::CallErrorHandler(
2✔
1025
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1026
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
6✔
1027
}
2✔
1028

1029
void Stream::CallErrorHandler(
9✔
1030
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
1031
        *cancelled_ = true;
9✔
1032
        cancelled_ = make_shared<bool>(true);
9✔
1033
        status_ = TransactionStatus::Done;
9✔
1034
        handler(
9✔
1035
                req,
1036
                err.WithContext(
9✔
1037
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
27✔
1038

1039
        server_.RemoveStream(shared_from_this());
9✔
1040
}
9✔
1041

1042
void Stream::CallErrorHandler(
4✔
1043
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
1044
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
12✔
1045
}
4✔
1046

1047
void Stream::CallErrorHandler(
7✔
1048
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
1049
        *cancelled_ = true;
7✔
1050
        cancelled_ = make_shared<bool>(true);
7✔
1051
        status_ = TransactionStatus::Done;
7✔
1052
        handler(err.WithContext(
14✔
1053
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
14✔
1054

1055
        server_.RemoveStream(shared_from_this());
7✔
1056
}
7✔
1057

1058
void Stream::CallErrorHandler(
×
1059
        const error_code &ec, const RequestPtr &req, SwitchProtocolHandler handler) {
1060
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
1061
}
×
1062

1063
void Stream::CallErrorHandler(
1✔
1064
        const error::Error &err, const RequestPtr &req, SwitchProtocolHandler handler) {
1065
        *cancelled_ = true;
1✔
1066
        cancelled_ = make_shared<bool>(true);
1✔
1067
        status_ = TransactionStatus::Done;
1✔
1068
        handler(expected::unexpected(err.WithContext(
2✔
1069
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
4✔
1070

1071
        server_.RemoveStream(shared_from_this());
1✔
1072
}
1✔
1073

1074
void Stream::AcceptHandler(const error_code &ec) {
218✔
1075
        if (ec) {
218✔
1076
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
1077
                return;
×
1078
        }
1079

1080
        auto ip = socket_.remote_endpoint().address().to_string();
436✔
1081

1082
        // Use IP as context for logging.
1083
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
218✔
1084

1085
        logger_.Debug("Accepted connection.");
436✔
1086

1087
        request_.reset(new IncomingRequest(*this, cancelled_));
436✔
1088

1089
        request_->address_.host = ip;
218✔
1090

1091
        *cancelled_ = false;
218✔
1092

1093
        ReadHeader();
218✔
1094
}
1095

1096
void Stream::ReadHeader() {
218✔
1097
        auto &cancelled = cancelled_;
1098

1099
        http::async_read_some(
218✔
1100
                socket_,
218✔
1101
                *request_buffer_,
1102
                http_request_parser_,
1103
                [this, cancelled](const error_code &ec, size_t num_read) {
218✔
1104
                        if (!*cancelled) {
218✔
1105
                                ReadHeaderHandler(ec, num_read);
218✔
1106
                        }
1107
                });
218✔
1108
}
218✔
1109

1110
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
218✔
1111
        if (num_read > 0) {
218✔
1112
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
436✔
1113
        }
1114

1115
        if (ec) {
218✔
1116
                CallErrorHandler(ec, request_, server_.header_handler_);
×
1117
                return;
173✔
1118
        }
1119

1120
        if (!http_request_parser_.is_header_done()) {
218✔
1121
                ReadHeader();
×
1122
                return;
×
1123
        }
1124

1125
        auto method_result = BeastVerbToMethod(
1126
                http_request_parser_.get().base().method(),
1127
                string {http_request_parser_.get().base().method_string()});
436✔
1128
        if (!method_result) {
218✔
1129
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
1130
                return;
×
1131
        }
1132
        request_->method_ = method_result.value();
218✔
1133
        request_->address_.path = string(http_request_parser_.get().base().target());
218✔
1134

1135
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
218✔
1136

1137
        string debug_str;
1138
        for (auto header = http_request_parser_.get().cbegin();
408✔
1139
                 header != http_request_parser_.get().cend();
626✔
1140
                 header++) {
1141
                request_->headers_[string {header->name_string()}] = string {header->value()};
1,224✔
1142
                if (logger_.Level() >= log::LogLevel::Debug) {
408✔
1143
                        debug_str += string {header->name_string()};
395✔
1144
                        debug_str += ": ";
395✔
1145
                        debug_str += string {header->value()};
395✔
1146
                        debug_str += "\n";
395✔
1147
                }
1148
        }
1149

1150
        logger_.Debug("Received headers:\n" + debug_str);
436✔
1151
        debug_str.clear();
1152

1153
        if (http_request_parser_.chunked()) {
218✔
1154
                auto cancelled = cancelled_;
1155
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
1156
                server_.header_handler_(request_);
2✔
1157
                if (!*cancelled) {
1✔
1158
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
1159
                        CallErrorHandler(err, request_, server_.body_handler_);
2✔
1160
                }
1161
                return;
1162
        }
1163

1164
        auto content_length = http_request_parser_.content_length();
217✔
1165
        if (content_length) {
217✔
1166
                request_body_length_ = content_length.value();
46✔
1167
        } else {
1168
                request_body_length_ = 0;
171✔
1169
        }
1170
        request_body_read_ = 0;
217✔
1171

1172
        if (request_body_read_ >= request_body_length_) {
217✔
1173
                auto cancelled = cancelled_;
1174
                status_ = TransactionStatus::HeaderHandlerCalled;
171✔
1175
                server_.header_handler_(request_);
342✔
1176
                if (!*cancelled) {
171✔
1177
                        status_ = TransactionStatus::BodyHandlerCalled;
171✔
1178
                        CallBodyHandler();
171✔
1179
                }
1180
                return;
1181
        }
1182

1183
        auto cancelled = cancelled_;
1184
        status_ = TransactionStatus::HeaderHandlerCalled;
46✔
1185
        server_.header_handler_(request_);
92✔
1186
        if (*cancelled) {
46✔
1187
                return;
1188
        }
1189

1190
        // We know that a body reader is required here, because of the `request_body_read_ >=
1191
        // request_body_length_` check above.
1192
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
45✔
1193
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
2✔
1194
        }
1195
}
1196

1197
// Reads the next part of the request body into [start, end) and reports the
// outcome through `handler`.
//
// If body reading is not (or no longer) in progress, `handler` is called with 0,
// and if the body has just been read completely and the stream was not cancelled,
// the body handler runs. Otherwise an asynchronous read is started whose
// completion is processed by ReadBodyHandler().
void Stream::AsyncReadNextBodyPart(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));

        if (status_ == TransactionStatus::ReaderCreated) {
                status_ = TransactionStatus::BodyReadingInProgress;
        }

        if (status_ != TransactionStatus::BodyReadingInProgress) {
                auto cancelled = cancelled_;
                handler(0);

                const bool finish_now =
                        !*cancelled && status_ == TransactionStatus::BodyReadingFinished;
                if (finish_now) {
                        status_ = TransactionStatus::BodyHandlerCalled;
                        CallBodyHandler();
                }
                return;
        }

        reader_buf_start_ = start;
        reader_buf_end_ = end;
        reader_handler_ = handler;

        // Let the parser fill the internal body buffer, limited to what the caller's
        // buffer has room for.
        size_t caller_capacity = end - start;
        auto &body = http_request_parser_.get().body();
        body.data = body_buffer_.data();
        body.size = min(body_buffer_.size(), caller_capacity);

        auto stream_cancelled = cancelled_;
        auto on_read = [this, stream_cancelled](const error_code &ec, size_t num_read) {
                if (*stream_cancelled) {
                        return;
                }
                ReadBodyHandler(ec, num_read);
        };

        http::async_read_some(socket_, *request_buffer_, http_request_parser_, on_read);
}
1236

1237
// Completion handler for the read started by AsyncReadNextBodyPart().
//
// Updates the count of body bytes read, switches the status to
// BodyReadingFinished once the expected length has been reached, copies the
// received data into the caller's buffer and invokes the reader handler with
// either the number of bytes copied or an error. On error, and if the stream has
// not been cancelled by the reader handler, the server's body handler is notified
// too.
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
        if (num_read > 0) {
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
                request_body_read_ += num_read;
        }

        if (ec == http::make_error_code(http::error::need_buffer)) {
                // This can be ignored. We always reset the buffer between reads anyway.
                ec = error_code();
        }

        assert(reader_handler_);

        if (request_body_read_ >= request_body_length_) {
                status_ = TransactionStatus::BodyReadingFinished;
        }

        auto cancelled = cancelled_;

        // Invoke a copy of the handler, not the member itself. The handler is expected
        // to call AsyncReadNextBodyPart() to continue reading, and that assigns a new
        // value to `reader_handler_`. Assigning to the member while it is executing
        // would destroy the callable (and its captures) that is still running.
        auto handler = reader_handler_;

        size_t buf_size = reader_buf_end_ - reader_buf_start_;
        size_t smallest = min(num_read, buf_size);
        copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
        if (ec) {
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
                handler(expected::unexpected(err));
        } else {
                handler(smallest);
        }

        // Only touch members again if the handler did not cancel the stream.
        if (!*cancelled && ec) {
                CallErrorHandler(ec, request_, server_.body_handler_);
                return;
        }
}
1271

1272
// Starts sending the response: prepares the Beast response objects, remembers the
// handler to call when the reply is finished and writes the header
// asynchronously. WriteHeaderHandler() continues from there, unless the stream
// has been cancelled in the meantime.
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
        SetupResponse();

        reply_finished_handler_ = reply_finished_handler;

        auto stream_cancelled = cancelled_;
        auto on_written = [this, stream_cancelled](const error_code &ec, size_t num_written) {
                if (*stream_cancelled) {
                        return;
                }
                WriteHeaderHandler(ec, num_written);
        };

        http::async_write_header(socket_, *http_response_serializer_, on_written);
}
196✔
1288

1289
void Stream::SetupResponse() {
205✔
1290
        auto response = maybe_response_.lock();
205✔
1291
        // Only called from existing responses, so this should always be true.
1292
        assert(response);
1293

1294
        assert(status_ == TransactionStatus::BodyHandlerCalled);
1295
        status_ = TransactionStatus::Replying;
205✔
1296

1297
        // From here on we take shared ownership.
1298
        response_ = response;
1299

1300
        http_response_ = make_shared<http::response<http::buffer_body>>();
410✔
1301

1302
        for (const auto &header : response->headers_) {
429✔
1303
                http_response_->base().set(header.first, header.second);
224✔
1304
        }
1305

1306
        http_response_->result(response->GetStatusCode());
205✔
1307
        http_response_->reason(response->GetStatusMessage());
410✔
1308

1309
        http_response_serializer_ =
1310
                make_shared<http::response_serializer<http::buffer_body>>(*http_response_);
410✔
1311
}
205✔
1312

1313
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
195✔
1314
        if (num_written > 0) {
195✔
1315
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
390✔
1316
        }
1317

1318
        if (ec) {
195✔
1319
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1320
                return;
34✔
1321
        }
1322

1323
        auto header = response_->GetHeader("Content-Length");
390✔
1324
        if (!header || header.value() == "0") {
195✔
1325
                FinishReply();
33✔
1326
                return;
1327
        }
1328

1329
        auto length = common::StringToLongLong(header.value());
162✔
1330
        if (!length || length.value() < 0) {
162✔
1331
                auto err = error::Error(
1332
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
1333
                CallErrorHandler(err, request_, reply_finished_handler_);
×
1334
                return;
1335
        }
1336

1337
        if (!response_->body_reader_ && !response_->async_body_reader_) {
162✔
1338
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
1339
                CallErrorHandler(err, request_, reply_finished_handler_);
3✔
1340
                return;
1341
        }
1342

1343
        PrepareAndWriteNewBodyBuffer();
161✔
1344
}
1345

1346
void Stream::PrepareAndWriteNewBodyBuffer() {
1,883✔
1347
        // response_->body_reader_ XOR response_->async_body_reader_
1348
        assert(
1349
                (response_->body_reader_ || response_->async_body_reader_)
1350
                && !(response_->body_reader_ && response_->async_body_reader_));
1351

1352
        auto read_handler = [this](io::ExpectedSize read) {
1,884✔
1353
                if (!read) {
1,883✔
1354
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
3✔
1355
                        return;
1✔
1356
                }
1357
                WriteNewBodyBuffer(read.value());
1,882✔
1358
        };
1,883✔
1359

1360
        if (response_->body_reader_) {
1,883✔
1361
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
3,218✔
1362
        } else {
1363
                auto err = response_->async_body_reader_->AsyncRead(
1364
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
274✔
1365
                if (err != error::NoError) {
274✔
1366
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1367
                }
1368
        }
1369
}
1,883✔
1370

1371
void Stream::WriteNewBodyBuffer(size_t size) {
1,882✔
1372
        http_response_->body().data = body_buffer_.data();
1,882✔
1373
        http_response_->body().size = size;
1,882✔
1374

1375
        if (size > 0) {
1,882✔
1376
                http_response_->body().more = true;
1,755✔
1377
        } else {
1378
                http_response_->body().more = false;
127✔
1379
        }
1380

1381
        WriteBody();
1,882✔
1382
}
1,882✔
1383

1384
void Stream::WriteBody() {
3,616✔
1385
        auto &cancelled = cancelled_;
1386

1387
        http::async_write_some(
3,616✔
1388
                socket_,
3,616✔
1389
                *http_response_serializer_,
1390
                [this, cancelled](const error_code &ec, size_t num_written) {
3,580✔
1391
                        if (!*cancelled) {
3,580✔
1392
                                WriteBodyHandler(ec, num_written);
3,580✔
1393
                        }
1394
                });
3,580✔
1395
}
3,616✔
1396

1397
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
3,580✔
1398
        if (num_written > 0) {
3,580✔
1399
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
3,468✔
1400
        }
1401

1402
        if (ec == http::make_error_code(http::error::need_buffer)) {
3,580✔
1403
                // Write next body block.
1404
                PrepareAndWriteNewBodyBuffer();
1,722✔
1405
        } else if (ec) {
1,858✔
1406
                CallErrorHandler(ec, request_, reply_finished_handler_);
12✔
1407
        } else if (num_written > 0) {
1,854✔
1408
                // We are still writing the body.
1409
                WriteBody();
1,734✔
1410
        } else {
1411
                // We are finished.
1412
                FinishReply();
120✔
1413
        }
1414
}
3,580✔
1415

1416
void Stream::FinishReply() {
153✔
1417
        // We are done.
1418
        *cancelled_ = true;
153✔
1419
        cancelled_ = make_shared<bool>(true);
153✔
1420
        status_ = TransactionStatus::Done;
153✔
1421
        // Release ownership of Body reader.
1422
        response_->body_reader_.reset();
153✔
1423
        response_->async_body_reader_.reset();
153✔
1424
        reply_finished_handler_(error::NoError);
153✔
1425
        server_.RemoveStream(shared_from_this());
153✔
1426
}
153✔
1427

1428
// Sends only the response header and then hands over to `SwitchingProtocolHandler`, which
// passes the connection to `handler`. Always returns `error::NoError`; failures while
// writing the header are reported asynchronously.
error::Error Stream::AsyncSwitchProtocol(SwitchProtocolHandler handler) {
        SetupResponse();

        switch_protocol_handler_ = handler;
        status_ = TransactionStatus::SwitchingProtocol;

        // The completion handler keeps its own copy of the shared cancellation flag.
        auto cancelled = cancelled_;
        auto on_header_written = [this, cancelled](const error_code &ec, size_t num_written) {
                if (*cancelled) {
                        return;
                }
                SwitchingProtocolHandler(ec, num_written);
        };

        http::async_write_header(
                socket_, *http_response_serializer_, std::move(on_header_written));

        return error::NoError;
}
1447

1448
// Completion handler for the header write started by `AsyncSwitchProtocol`. On success the
// TCP socket is moved out of this stream, wrapped in a `RawSocket` and given to the
// switch-protocol handler; on failure the error is reported through the same handler.
void Stream::SwitchingProtocolHandler(error_code ec, size_t num_written) {
        if (num_written != 0) {
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
        }

        if (ec) {
                CallErrorHandler(ec, request_, switch_protocol_handler_);
                return;
        }

        // Move the connection out of the stream; `request_buffer_` is passed along with it.
        auto detached_tcp_socket = make_shared<tcp::socket>(std::move(socket_));
        auto socket =
                make_shared<RawSocket<tcp::socket>>(std::move(detached_tcp_socket), request_buffer_);

        // From here on the connection is driven directly through the socket, so this stream
        // has nothing left to do: mark it done, cancel outstanding handlers and unregister.
        status_ = TransactionStatus::Done;
        *cancelled_ = true;
        cancelled_ = make_shared<bool>(true);
        server_.RemoveStream(shared_from_this());

        switch_protocol_handler_(socket);
}
1469

1470
void Stream::CallBodyHandler() {
209✔
1471
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1472
        // it immediately destroys, which would destroy this stream as well. At the end of this
1473
        // function, it's ok to destroy it.
1474
        auto stream_ref = shared_from_this();
1475

1476
        server_.body_handler_(request_, error::NoError);
627✔
1477

1478
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1479
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1480
        // request has not been handled correctly.
1481
        auto response = maybe_response_.lock();
209✔
1482
        if (!response) {
209✔
1483
                logger_.Error("Handler produced no response. Closing stream prematurely.");
4✔
1484
                *cancelled_ = true;
2✔
1485
                cancelled_ = make_shared<bool>(true);
2✔
1486
                server_.RemoveStream(shared_from_this());
6✔
1487
        }
1488
}
209✔
1489

1490
// Binds the server to `event_loop` and creates the (still closed) acceptor on the loop's
// ASIO I/O context. The `server` configuration is not referenced by this constructor.
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
        event_loop_(event_loop),
        acceptor_(GetAsioIoContext(event_loop_)) {
}
216✔
1494

1495
// Stops accepting connections and drops all streams before the server goes away.
Server::~Server() {
        this->Cancel();
}
216✔
1498

1499
// Convenience overload taking a plain `RequestHandler` as body handler. It wraps
// `body_handler` so that an error is delivered to it as an unexpected value and a
// successful request as the request itself, then defers to the other overload.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
        IdentifiedRequestHandler adapter = [body_handler](
                                                   IncomingRequestPtr req, error::Error err) {
                if (err != error::NoError) {
                        body_handler(expected::unexpected(err));
                        return;
                }
                body_handler(req);
        };

        return AsyncServeUrl(url, header_handler, adapter);
}
1510

1511
// Starts listening for plain HTTP connections on the host and port given in `url`.
//
// `url` must use the "http" protocol and must not contain a path other than "/". The host
// is converted with `asio::ip::make_address`, so it has to be something that function
// accepts. On success both handlers are stored and the first accept is queued.
//
// Returns `error::NoError` on success, `InvalidUrlError` for an unparsable URL or one with
// a path, `protocol_not_supported` for any other scheme, or the error condition reported by
// ASIO for the step that failed. If a step fails after the acceptor was opened by this
// call, the acceptor is closed again so that a later call can start from a clean state.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
        auto err = BreakDownUrl(url, address_);
        if (error::NoError != err) {
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
        }

        if (address_.protocol != "http") {
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
        }

        if (address_.path.size() > 0 && address_.path != "/") {
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
        }

        boost::system::error_code ec;
        auto address = asio::ip::make_address(address_.host, ec);
        if (ec) {
                return error::Error(
                        ec.default_error_condition(),
                        "Could not construct endpoint from address " + address_.host);
        }

        asio::ip::tcp::endpoint endpoint(address, address_.port);

        ec.clear();
        acceptor_.open(endpoint.protocol(), ec);
        if (ec) {
                // Deliberately not closing here: opening also fails when the acceptor is
                // already open, and in that case it may be serving an earlier, successful call.
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
        }

        // From this point on the acceptor was opened by this call. If a later step fails,
        // close it again; otherwise it would stay open and every subsequent attempt to serve
        // a URL would fail in `open()` above.
        auto close_and_fail = [this](const boost::system::error_code &cause, const string &message) {
                auto condition = cause.default_error_condition();
                boost::system::error_code close_ec;
                acceptor_.close(close_ec);
                return error::Error(condition, message);
        };

        // Allow address reuse, otherwise we can't re-bind later.
        ec.clear();
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
        if (ec) {
                return close_and_fail(ec, "Could not set socket options");
        }

        ec.clear();
        acceptor_.bind(endpoint, ec);
        if (ec) {
                return close_and_fail(ec, "Could not bind socket");
        }

        ec.clear();
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
        if (ec) {
                return close_and_fail(ec, "Could not start listening");
        }

        header_handler_ = header_handler;
        body_handler_ = body_handler;

        PrepareNewStream();

        return error::NoError;
}
1568

1569
void Server::Cancel() {
231✔
1570
        if (acceptor_.is_open()) {
231✔
1571
                acceptor_.cancel();
196✔
1572
                acceptor_.close();
196✔
1573
        }
1574
        streams_.clear();
1575
}
231✔
1576

1577
uint16_t Server::GetPort() const {
13✔
1578
        return acceptor_.local_endpoint().port();
13✔
1579
}
1580

1581
// Returns an "http://127.0.0.1:<port>" URL using the port from `GetPort()`. The host part is
// fixed and does not depend on the address the server was asked to listen on.
string Server::GetUrl() const {
        string url {"http://127.0.0.1:"};
        url += to_string(GetPort());
        return url;
}
1584

1585
// Creates the response object for `req` and registers it (as `maybe_response_`) with the
// request's stream. Returns `StreamCancelledError` if the request was already cancelled.
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
        const bool already_cancelled = *req->cancelled_;
        if (already_cancelled) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
        }

        auto &stream = req->stream_;
        // The response shares the request's cancellation flag.
        OutgoingResponsePtr response {new OutgoingResponse(stream, req->cancelled_)};
        stream.maybe_response_ = response;
        return response;
}
1593

1594
// Starts sending `resp` on its stream; `reply_finished_handler` is handed to the stream to
// be called when the reply is finished. Returns `StreamCancelledError` if the response was
// already cancelled, otherwise `error::NoError`.
error::Error Server::AsyncReply(
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
        const bool still_active = !*resp->cancelled_;
        if (still_active) {
                resp->stream_.AsyncReply(reply_finished_handler);
                return error::NoError;
        }

        return MakeError(StreamCancelledError, "Cannot send response");
}
1603

1604
// Creates an asynchronous reader for the body of `req`.
//
// Fails with `StreamCancelledError` if the request was cancelled, with
// `operation_in_progress` unless the stream is in the `HeaderHandlerCalled` state, and with
// `BodyMissingError` if the request body length is zero. On success the stream moves to the
// `ReaderCreated` state.
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
        if (*req->cancelled_) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
        }

        auto &stream = req->stream_;

        // A reader may only be requested right after the header handler has been called.
        const bool header_stage = (stream.status_ == TransactionStatus::HeaderHandlerCalled);
        if (!header_stage) {
                auto in_progress = error::Error(
                        make_error_condition(errc::operation_in_progress),
                        "MakeBodyAsyncReader called while reading is in progress");
                return expected::unexpected(in_progress);
        }

        const bool has_body = (stream.request_body_length_ != 0);
        if (!has_body) {
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
        }

        stream.status_ = TransactionStatus::ReaderCreated;
        return make_shared<BodyAsyncReader<Stream>>(stream, req->cancelled_);
}
1623

1624
// Sends the header of `resp` and then hands the underlying connection to `handler`.
//
// Returns `StreamCancelledError` if the response was already cancelled. This is the same
// guard `AsyncReply()` and `MakeResponse()` apply before touching the stream, since a
// cancelled response may refer to a stream that has already been removed from the server.
// Otherwise returns whatever the stream's `AsyncSwitchProtocol()` returns.
error::Error Server::AsyncSwitchProtocol(OutgoingResponsePtr resp, SwitchProtocolHandler handler) {
        if (*resp->cancelled_) {
                return MakeError(StreamCancelledError, "Cannot switch protocol");
        }

        return resp->stream_.AsyncSwitchProtocol(handler);
}
1627

1628
void Server::PrepareNewStream() {
414✔
1629
        StreamPtr new_stream {new Stream(*this)};
414✔
1630
        streams_.insert(new_stream);
1631
        AsyncAccept(new_stream);
828✔
1632
}
414✔
1633

1634
// Waits asynchronously for an incoming connection on `stream`'s socket. When a connection
// arrives, the stream's `AcceptHandler` is called and a new stream is prepared for the next
// connection. On failure nothing further is queued.
void Server::AsyncAccept(StreamPtr stream) {
        acceptor_.async_accept(stream->socket_, [this, stream](const error_code &ec) {
                if (ec == asio::error::operation_aborted) {
                        // The pending accept was cancelled, which is what `Cancel()` does on
                        // purpose. This is not a failure, so do not report it as one. `this`
                        // is deliberately not used on this path.
                        return;
                }

                if (ec) {
                        log::Error("Could not accept connection: " + ec.message());
                        return;
                }

                stream->AcceptHandler(ec);

                this->PrepareNewStream();
        });
}
414✔
1646

1647
void Server::RemoveStream(const StreamPtr &stream) {
185✔
1648
        streams_.erase(stream);
185✔
1649

1650
        // Work around bug in Boost ASIO: When the handler for `async_read_some` is called with `ec
1651
        // == operation_aborted`, the handler should not access any supplied buffers, because it may
1652
        // be aborted due to object destruction. However, it does access buffers. This means it does
1653
        // not help to call `Cancel()` prior to destruction. We need to call `Cancel()` first, and
1654
        // then wait until the handler which receives `operation_aborted` has run. So do a
1655
        // `Cancel()` followed by `Post()` for this, which should queue us in the correct order:
1656
        // `operation_aborted` -> `Post` handler.
1657
        stream->DoCancel();
185✔
1658
        event_loop_.Post([stream]() {
740✔
1659
                // No-op, just keep `stream` alive until we get back to this handler.
1660
        });
370✔
1661
}
185✔
1662

1663
} // namespace http
1664
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc