• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1001210836

12 Sep 2023 01:20PM UTC coverage: 79.201% (+0.09%) from 79.112%
1001210836

push

gitlab-ci

vpodzime
feat: Add support for signals delivering two strings

Ticket: MEN-6651
Changelog: none
Signed-off-by: Vratislav Podzimek <v.podzimek@mykolab.com>

5731 of 7236 relevant lines covered (79.2%)

256.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.93
/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio/ip/tcp.hpp>
20
#include <boost/asio/ssl/verify_mode.hpp>
21
#include <boost/asio.hpp>
22

23
#include <common/common.hpp>
24

25
namespace mender {
26
namespace http {
27

28
namespace common = mender::common;
29

30
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
31
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
32
const unsigned int BeastHttpVersion = 11;
33

34
namespace asio = boost::asio;
35
namespace http = boost::beast::http;
36

37
// Number of elements each `body_buffer_` in this file is constructed with. The same size is
// also used to pre-size the Beast request/response buffers in the constructors.
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
38

39
// Translates our own `Method` enum into the corresponding Beast verb.
//
// The switch intentionally has no "default" label: that way the compiler can warn when a new
// `Method` enumerator is added without being handled here. `Method::Invalid` (or any value not
// listed) trips the assert in debug builds; with asserts disabled GET is returned instead.
static http::verb MethodToBeastVerb(Method method) {
        switch (method) {
        case Method::GET:
                return http::verb::get;
        case Method::HEAD:
                return http::verb::head;
        case Method::POST:
                return http::verb::post;
        case Method::PUT:
                return http::verb::put;
        case Method::PATCH:
                return http::verb::patch;
        case Method::CONNECT:
                return http::verb::connect;
        case Method::Invalid:
                // Not a real method; handled by the assert below.
                break;
        }

        assert(false);
        return http::verb::get;
}
62

63
// Translates a Beast verb into our own `Method` enum.
//
// `verb_string` is only used for the error message when the verb is not one of the methods we
// support, in which case an `UnsupportedMethodError` is returned.
static expected::expected<Method, error::Error> BeastVerbToMethod(
        http::verb verb, const string &verb_string) {
        switch (verb) {
        case http::verb::get:
                return Method::GET;
        case http::verb::head:
                return Method::HEAD;
        case http::verb::post:
                return Method::POST;
        case http::verb::put:
                return Method::PUT;
        case http::verb::patch:
                return Method::PATCH;
        case http::verb::connect:
                return Method::CONNECT;
        default:
                break;
        }

        // Everything else Beast knows about is unsupported by us.
        return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
}
82

83
template <typename StreamType>
84
class BodyAsyncReader : virtual public io::AsyncReader {
85
public:
86
        BodyAsyncReader(weak_ptr<StreamType> stream) :
62✔
87
                stream_ {stream} {
62✔
88
        }
62✔
89
        ~BodyAsyncReader() {
33✔
90
                Cancel();
33✔
91
        }
33✔
92

93
        error::Error AsyncRead(
1,350✔
94
                vector<uint8_t>::iterator start,
95
                vector<uint8_t>::iterator end,
96
                io::AsyncIoHandler handler) override {
97
                auto stream = stream_.lock();
2,700✔
98
                if (!stream) {
1,350✔
99
                        return error::MakeError(
×
100
                                error::ProgrammingError,
101
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
102
                }
103
                stream->AsyncReadNextBodyPart(start, end, handler);
1,350✔
104
                return error::NoError;
1,350✔
105
        }
106

107
        void Cancel() override {
33✔
108
                auto stream = stream_.lock();
66✔
109
                if (stream) {
33✔
110
                        stream->Cancel();
×
111
                }
112
        }
33✔
113

114
private:
115
        weak_ptr<StreamType> stream_;
116

117
        friend class Client;
118
        friend class Server;
119
};
120

121
Client::Client(
213✔
122
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
213✔
123
        event_loop_ {event_loop},
124
        logger_name_ {logger_name},
125
        cancelled_ {make_shared<bool>(false)},
×
126
        resolver_(GetAsioIoContext(event_loop)),
127
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
426✔
128
        // This is equivalent to:
129
        //   response_buffer_.reserve(body_buffer_.size());
130
        // but compatible with Boost 1.67.
131
        response_buffer_.prepare(body_buffer_.size() - response_buffer_.size());
213✔
132

133
        ssl_ctx_.set_verify_mode(ssl::verify_peer);
213✔
134

135
        beast::error_code ec {};
213✔
136
        ssl_ctx_.set_default_verify_paths(ec); // Load the default CAs
213✔
137
        if (ec) {
213✔
138
                log::Error("Failed to load the SSL default directory");
×
139
        }
140
        if (client.server_cert_path != "") {
213✔
141
                ssl_ctx_.load_verify_file(client.server_cert_path, ec);
2✔
142
                if (ec) {
2✔
143
                        log::Error("Failed to load the server certificate!");
×
144
                }
145
        }
146
}
213✔
147

148
// Cancels whatever the client is doing. Destroying a client in the middle of a request is
// allowed, but a warning is logged since it is probably unintended.
Client::~Client() {
        const bool request_ongoing = static_cast<bool>(client_active_);
        if (request_ongoing) {
                logger_.Warning("Client destroyed while request is still active!");
        }

        Cancel();
}
213✔
154

155
// Starts an HTTP(S) request.
//
// `req` must be non-null and carry a protocol ("http" or "https"), a host and a non-negative
// port. `header_handler` is called when the response headers have arrived (or on errors before
// that point), `body_handler` when the body is done (or on errors after the headers). Both
// handlers must be set.
//
// Returns `error::NoError` if the request was started (name resolution is then in flight and
// all further results arrive through the handlers), otherwise an error describing why the
// request could not be started. Only one call can be active at a time.
error::Error Client::AsyncCall(
        OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
        if (client_active_) {
                return error::Error(
                        make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
        }

        // A null request is as "not ready" as one without an address; don't dereference it.
        if (!req || req->address_.protocol == "" || req->address_.host == ""
                || req->address_.port < 0) {
                return error::MakeError(error::ProgrammingError, "Request is not ready");
        }

        if (!header_handler || !body_handler) {
                return error::MakeError(
                        error::ProgrammingError, "header_handler and body_handler can not be nullptr");
        }

        if (req->address_.protocol != "http" && req->address_.protocol != "https") {
                return error::Error(
                        make_error_condition(errc::protocol_not_supported), req->address_.protocol);
        }

        // Always assign, never just set: the client can be reused, and a plain "http" request
        // following an "https" one must not keep going down the TLS code paths.
        is_https_ = (req->address_.protocol == "https");

        logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));

        // NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
        // request to route to our k8s cluster. Set this in all cases.
        req->SetHeader("HOST", req->address_.host);

        request_ = req;
        header_handler_ = header_handler;
        body_handler_ = body_handler;
        body_status_ = BodyReadingStatus::None;

        // See comment in header. `client_active_` does not own the client (no-op deleter); it
        // only exists so that pending asynchronous handlers can detect, through a weak pointer,
        // that the request they belong to is no longer active.
        client_active_.reset(this, [](Client *) {});

        weak_ptr<Client> weak_client(client_active_);

        resolver_.async_resolve(
                request_->address_.host,
                to_string(request_->address_.port),
                [weak_client](const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
                        auto client = weak_client.lock();
                        if (client) {
                                client->ResolveHandler(ec, results);
                        }
                });

        return error::NoError;
}
208

209
// Creates the reader through which the response body can be read.
//
// Fails with `operation_in_progress` if a reader has already been created for the current
// response, and with `BodyMissingError` if the response has no body. On success the body
// status moves to `ReaderCreated`.
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
        if (body_status_ != BodyReadingStatus::None) {
                auto already_reading = error::Error(
                        make_error_condition(errc::operation_in_progress),
                        "MakeBodyAsyncReader called while reading is in progress");
                return expected::unexpected(already_reading);
        }

        if (response_body_length_ == 0) {
                auto no_body = MakeError(BodyMissingError, "Response does not contain a body");
                return expected::unexpected(no_body);
        }

        body_status_ = BodyReadingStatus::ReaderCreated;
        return make_shared<BodyAsyncReader<Client>>(resp->client_);
}
224

225
// Calls `handler` with the current response.
//
// The handler is deliberately taken by value: the copy in the argument list keeps it alive for
// the duration of the call. This matters when the handler owns the client instance through a
// capture and replaces itself with a different handler (using `AsyncCall`). Doing so destroys
// the stored copy of the handler, and without our own copy that would also destroy the client
// while the handler is still running.
void Client::CallHandler(ResponseHandler handler) {
        handler(response_);
}
152✔
233

234
void Client::CallErrorHandler(
9✔
235
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
236
        client_active_.reset();
9✔
237
        stream_.reset();
9✔
238
        handler(expected::unexpected(error::Error(
18✔
239
                ec.default_error_condition(), MethodToString(req->method_) + " " + req->orig_address_)));
27✔
240
}
9✔
241

242
void Client::CallErrorHandler(
5✔
243
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
244
        client_active_.reset();
5✔
245
        stream_.reset();
5✔
246
        handler(expected::unexpected(error::Error(
10✔
247
                err.code, err.message + ": " + MethodToString(req->method_) + " " + req->orig_address_)));
15✔
248
}
5✔
249

250
void Client::ResolveHandler(
105✔
251
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
252
        if (ec) {
105✔
253
                CallErrorHandler(ec, request_, header_handler_);
×
254
                return;
×
255
        }
256

257
        if (logger_.Level() >= log::LogLevel::Debug) {
105✔
258
                string ips = "[";
208✔
259
                string sep;
104✔
260
                for (auto r : results) {
219✔
261
                        ips += sep;
115✔
262
                        ips += r.endpoint().address().to_string();
115✔
263
                        sep = ", ";
115✔
264
                }
265
                ips += "]";
104✔
266
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
104✔
267
        }
268

269
        resolver_results_ = results;
105✔
270

271
        stream_ = make_shared<ssl::stream<tcp::socket>>(GetAsioIoContext(event_loop_), ssl_ctx_);
105✔
272

273
        http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();
105✔
274

275
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
276
        // if they do, they should be handled higher up in the application logic.
277
        //
278
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
279
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
280
        // `has_value()` in their code, causing their subsequent comparison operation to
281
        // misbehave. So pass highest possible value instead.
282
        http_response_parser_->body_limit(numeric_limits<uint64_t>::max());
105✔
283

284
        weak_ptr<Client> weak_client(client_active_);
105✔
285

286
        asio::async_connect(
210✔
287
                stream_->next_layer(),
105✔
288
                resolver_results_,
105✔
289
                [weak_client](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
105✔
290
                        auto client = weak_client.lock();
105✔
291
                        if (client) {
105✔
292
                                if (client->is_https_) {
105✔
293
                                        return client->HandshakeHandler(ec, endpoint);
3✔
294
                                }
295
                                return client->ConnectHandler(ec, endpoint);
102✔
296
                        }
297
                });
298
}
299

300
// Called when the TCP connection for an https request has been attempted. Sets the SNI host
// name and starts the TLS handshake; once that succeeds, processing continues in
// `ConnectHandler`. Errors are reported to the header handler.
void Client::HandshakeHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
        if (ec) {
                CallErrorHandler(ec, request_, header_handler_);
                return;
        }

        // Set SNI Hostname (many hosts need this to handshake successfully). A failure here is
        // only logged; the handshake is attempted regardless.
        const char *sni_host = request_->address_.host.c_str();
        if (!SSL_set_tlsext_host_name(stream_->native_handle(), sni_host)) {
                const auto openssl_code = static_cast<int>(::ERR_get_error());
                beast::error_code sni_error {openssl_code, asio::error::get_ssl_category()};
                logger_.Error("Failed to set SNI host name: " + sni_error.message());
        }

        weak_ptr<Client> weak_client(client_active_);

        auto on_handshake = [weak_client, endpoint](const error_code &handshake_ec) {
                auto client = weak_client.lock();
                if (!client) {
                        return;
                }

                if (!handshake_ec) {
                        client->logger_.Debug("https: Successful SSL handshake");
                        client->ConnectHandler(handshake_ec, endpoint);
                        return;
                }

                client->logger_.Error(
                        "https: Failed to perform the SSL handshake: " + handshake_ec.message());
                client->CallErrorHandler(handshake_ec, client->request_, client->header_handler_);
        };

        stream_->async_handshake(ssl::stream_base::client, on_handshake);
}
331

332

333
// Called when the connection (and, for https, the TLS handshake) is established. Builds the
// Beast request from our own request object and starts writing its header.
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
        if (ec) {
                CallErrorHandler(ec, request_, header_handler_);
                return;
        }

        logger_.Debug("Connected to " + endpoint.address().to_string());

        http_request_ = make_shared<http::request<http::buffer_body>>(
                MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);

        for (const auto &name_and_value : request_->headers_) {
                http_request_->set(name_and_value.first, name_and_value.second);
        }

        http_request_serializer_ =
                make_shared<http::request_serializer<http::buffer_body>>(*http_request_);

        weak_ptr<Client> weak_client(client_active_);

        // Same completion handler for both transports; it does nothing if the request has been
        // deactivated in the meantime.
        auto on_header_written = [weak_client](const error_code &write_ec, size_t num_written) {
                if (auto client = weak_client.lock()) {
                        client->WriteHeaderHandler(write_ec, num_written);
                }
        };

        if (is_https_) {
                http::async_write_header(*stream_, *http_request_serializer_, on_header_written);
        } else {
                http::async_write_header(
                        stream_->next_layer(), *http_request_serializer_, on_header_written);
        }
}
375

376
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
104✔
377
        if (num_written > 0) {
104✔
378
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
104✔
379
        }
380

381
        if (ec) {
104✔
382
                CallErrorHandler(ec, request_, header_handler_);
×
383
                return;
70✔
384
        }
385

386
        auto header = request_->GetHeader("Content-Length");
208✔
387
        if (!header || header.value() == "0") {
104✔
388
                ReadHeader();
69✔
389
                return;
69✔
390
        }
391

392
        auto length = common::StringToLongLong(header.value());
35✔
393
        if (!length || length.value() < 0) {
35✔
394
                auto err = error::Error(
395
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
396
                CallErrorHandler(err, request_, header_handler_);
×
397
                return;
×
398
        }
399
        request_body_length_ = length.value();
35✔
400

401
        if (!request_->body_gen_ && !request_->async_body_gen_) {
35✔
402
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
403
                CallErrorHandler(err, request_, header_handler_);
1✔
404
                return;
1✔
405
        }
406

407
        assert(!(request_->body_gen_ && request_->async_body_gen_));
34✔
408

409
        if (request_->body_gen_) {
34✔
410
                auto body_reader = request_->body_gen_();
33✔
411
                if (!body_reader) {
33✔
412
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
413
                        return;
×
414
                }
415
                request_->body_reader_ = body_reader.value();
33✔
416
        } else {
417
                auto body_reader = request_->async_body_gen_();
1✔
418
                if (!body_reader) {
1✔
419
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
420
                        return;
×
421
                }
422
                request_->async_body_reader_ = body_reader.value();
1✔
423
        }
424

425
        PrepareAndWriteNewBodyBuffer();
34✔
426
}
427

428
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
710✔
429
        if (num_written > 0) {
710✔
430
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
339✔
431
        }
432

433
        if (ec == http::make_error_code(http::error::need_buffer)) {
710✔
434
                // Write next block of the body.
435
                PrepareAndWriteNewBodyBuffer();
339✔
436
        } else if (ec) {
371✔
437
                CallErrorHandler(ec, request_, header_handler_);
1✔
438
        } else if (num_written > 0) {
370✔
439
                // We are still writing the body.
440
                WriteBody();
339✔
441
        } else {
442
                // We are ready to receive the response.
443
                ReadHeader();
31✔
444
        }
445
}
710✔
446

447
void Client::PrepareAndWriteNewBodyBuffer() {
373✔
448
        // request_->body_reader_ XOR request_->async_body_reader_
449
        assert(
373✔
450
                (request_->body_reader_ || request_->async_body_reader_)
451
                && !(request_->body_reader_ && request_->async_body_reader_));
452

453
        auto cancelled = cancelled_;
746✔
454
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
747✔
455
                if (!*cancelled) {
373✔
456
                        if (!read) {
373✔
457
                                CallErrorHandler(read.error(), request_, header_handler_);
1✔
458
                                return;
1✔
459
                        }
460
                        WriteNewBodyBuffer(read.value());
372✔
461
                }
462
        };
746✔
463

464

465
        if (request_->body_reader_) {
373✔
466
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
296✔
467
        } else {
468
                auto err = request_->async_body_reader_->AsyncRead(
77✔
469
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
231✔
470
                if (err != error::NoError) {
77✔
471
                        CallErrorHandler(err, request_, header_handler_);
×
472
                }
473
        }
474
}
373✔
475

476
void Client::WriteNewBodyBuffer(size_t size) {
372✔
477
        http_request_->body().data = body_buffer_.data();
372✔
478
        http_request_->body().size = size;
372✔
479

480
        if (size > 0) {
372✔
481
                http_request_->body().more = true;
341✔
482
        } else {
483
                // Release ownership of Body reader.
484
                request_->body_reader_.reset();
31✔
485
                http_request_->body().more = false;
31✔
486
        }
487

488
        WriteBody();
372✔
489
}
372✔
490

491
void Client::WriteBody() {
711✔
492
        weak_ptr<Client> weak_client(client_active_);
1,422✔
493

494
        if (is_https_) {
711✔
495
                http::async_write_some(
×
496
                        *stream_,
×
497
                        *http_request_serializer_,
×
498
                        [weak_client](const error_code &ec, size_t num_written) {
×
499
                                auto client = weak_client.lock();
×
500
                                if (client) {
×
501
                                        client->WriteBodyHandler(ec, num_written);
×
502
                                }
503
                        });
×
504
        } else {
505
                http::async_write_some(
1,422✔
506
                        stream_->next_layer(),
711✔
507
                        *http_request_serializer_,
711✔
508
                        [weak_client](const error_code &ec, size_t num_written) {
710✔
509
                                auto client = weak_client.lock();
1,420✔
510
                                if (client) {
710✔
511
                                        client->WriteBodyHandler(ec, num_written);
710✔
512
                                }
513
                        });
710✔
514
        }
515
}
711✔
516

517
void Client::ReadHeader() {
100✔
518
        http_response_parser_->get().body().data = body_buffer_.data();
100✔
519
        http_response_parser_->get().body().size = body_buffer_.size();
100✔
520

521
        weak_ptr<Client> weak_client(client_active_);
200✔
522

523
        if (is_https_) {
100✔
524
                http::async_read_some(
4✔
525
                        *stream_,
2✔
526
                        response_buffer_,
2✔
527
                        *http_response_parser_,
2✔
528
                        [weak_client](const error_code &ec, size_t num_read) {
2✔
529
                                auto client = weak_client.lock();
4✔
530
                                if (client) {
2✔
531
                                        client->ReadHeaderHandler(ec, num_read);
2✔
532
                                }
533
                        });
2✔
534
        } else {
535
                http::async_read_some(
196✔
536
                        stream_->next_layer(),
98✔
537
                        response_buffer_,
98✔
538
                        *http_response_parser_,
98✔
539
                        [weak_client](const error_code &ec, size_t num_read) {
97✔
540
                                auto client = weak_client.lock();
194✔
541
                                if (client) {
97✔
542
                                        client->ReadHeaderHandler(ec, num_read);
97✔
543
                                }
544
                        });
97✔
545
        }
546
}
100✔
547

548
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
99✔
549
        if (num_read > 0) {
99✔
550
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
96✔
551
        }
552

553
        if (ec) {
99✔
554
                CallErrorHandler(ec, request_, header_handler_);
3✔
555
                return;
35✔
556
        }
557

558
        if (!http_response_parser_->is_header_done()) {
96✔
559
                ReadHeader();
×
560
                return;
×
561
        }
562

563
        response_.reset(new IncomingResponse(client_active_));
96✔
564
        response_->status_code_ = http_response_parser_->get().result_int();
96✔
565
        response_->status_message_ = string {http_response_parser_->get().reason()};
96✔
566

567
        string debug_str;
96✔
568
        for (auto header = http_response_parser_->get().cbegin();
192✔
569
                 header != http_response_parser_->get().cend();
384✔
570
                 header++) {
571
                response_->headers_[string {header->name_string()}] = string {header->value()};
192✔
572
                if (logger_.Level() >= log::LogLevel::Debug) {
96✔
573
                        debug_str += string {header->name_string()};
95✔
574
                        debug_str += ": ";
95✔
575
                        debug_str += string {header->value()};
95✔
576
                        debug_str += "\n";
95✔
577
                }
578
        }
579

580
        logger_.Debug("Received headers:\n" + debug_str);
96✔
581
        debug_str.clear();
96✔
582

583
        if (http_response_parser_->chunked()) {
96✔
584
                auto cancelled = cancelled_;
1✔
585
                CallHandler(header_handler_);
1✔
586
                if (!*cancelled) {
1✔
587
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
588
                        CallErrorHandler(err, request_, body_handler_);
1✔
589
                }
590
                return;
1✔
591
        }
592

593
        auto content_length = http_response_parser_->content_length();
95✔
594
        if (content_length) {
95✔
595
                response_body_length_ = content_length.value();
77✔
596
        } else {
597
                response_body_length_ = 0;
18✔
598
        }
599
        response_body_read_ = 0;
95✔
600

601
        if (response_body_read_ >= response_body_length_) {
95✔
602
                auto cancelled = cancelled_;
30✔
603
                CallHandler(header_handler_);
30✔
604
                if (!*cancelled) {
30✔
605
                        client_active_.reset();
28✔
606
                        stream_.reset();
28✔
607
                        CallHandler(body_handler_);
28✔
608
                }
609
                return;
30✔
610
        }
611

612
        auto cancelled = cancelled_;
65✔
613
        CallHandler(header_handler_);
65✔
614
        if (*cancelled) {
65✔
615
                return;
1✔
616
        }
617

618
        if (body_status_ == BodyReadingStatus::None) {
64✔
619
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
2✔
620
        }
621
}
622

623
// Reads the next part of the response body into [start, end) and calls `handler` with the
// number of elements read. This is what `BodyAsyncReader` forwards its reads to.
//
// Once the whole body has been read, the next call reports zero to `handler`, finishes the
// request and calls the body handler. Calls after that only report zero.
void Client::AsyncReadNextBodyPart(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        assert(body_status_ != BodyReadingStatus::None);

        // The first read is what takes us from "reader created" to "in progress".
        if (body_status_ == BodyReadingStatus::ReaderCreated) {
                body_status_ = BodyReadingStatus::InProgress;
        }

        if (body_status_ != BodyReadingStatus::InProgress) {
                // Nothing left to read: signal end of data.
                handler(0);
                if (body_status_ == BodyReadingStatus::ReachedEnd) {
                        body_status_ = BodyReadingStatus::Done;
                        client_active_.reset();
                        stream_.reset();
                        CallHandler(body_handler_);
                }
                return;
        }

        // Remember where the data should go; `ReadBodyHandler` copies it there when it arrives.
        reader_buf_start_ = start;
        reader_buf_end_ = end;
        reader_handler_ = handler;

        // Never let the parser deliver more than fits in both our buffer and the caller's.
        size_t caller_capacity = end - start;
        size_t read_limit = min(body_buffer_.size(), caller_capacity);

        auto &body = http_response_parser_->get().body();
        body.data = body_buffer_.data();
        body.size = read_limit;

        weak_ptr<Client> weak_client(client_active_);

        auto on_body_read = [weak_client](const error_code &ec, size_t num_read) {
                if (auto client = weak_client.lock()) {
                        client->ReadBodyHandler(ec, num_read);
                }
        };

        // https reads go through the SSL stream, plain http directly from the socket below it.
        if (is_https_) {
                http::async_read_some(
                        *stream_, response_buffer_, *http_response_parser_, on_body_read);
        } else {
                http::async_read_some(
                        stream_->next_layer(), response_buffer_, *http_response_parser_, on_body_read);
        }
}
677

678
// Called after each attempt to read body data. Copies what was read into the buffer given to
// `AsyncReadNextBodyPart` and reports the result to the handler given there. On errors the
// body handler is notified as well, unless the reader's handler cancelled the client.
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
        if (num_read > 0) {
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
                response_body_read_ += num_read;
        }

        if (ec == http::make_error_code(http::error::need_buffer)) {
                // This can be ignored. We always reset the buffer between reads anyway.
                ec = error_code();
        }

        assert(reader_handler_);

        if (response_body_read_ >= response_body_length_) {
                body_status_ = BodyReadingStatus::ReachedEnd;
        }

        // The handler below is user code and may cancel or even destroy the client. Keep our
        // own pointer to the cancel flag so that we can find out afterwards without touching
        // any members.
        auto cancelled = cancelled_;

        // Call through a copy: if the handler starts the next read, `AsyncReadNextBodyPart`
        // overwrites `reader_handler_` while the handler is still running.
        auto handler = reader_handler_;

        size_t buf_size = reader_buf_end_ - reader_buf_start_;
        size_t smallest = min(num_read, buf_size);
        copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
        if (ec) {
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
                handler(expected::unexpected(err));
        } else {
                handler(smallest);
        }

        // After a cancel `request_` has been reset, so the error handler must not be called.
        if (ec && !*cancelled) {
                CallErrorHandler(ec, request_, body_handler_);
        }
}
710

711
void Client::Cancel() {
246✔
712
        resolver_.cancel();
246✔
713
        if (stream_) {
246✔
714
                stream_->next_layer().cancel();
35✔
715
                stream_->next_layer().close();
35✔
716
                stream_.reset();
35✔
717
        }
718
        client_active_.reset();
246✔
719

720
        request_.reset();
246✔
721
        response_.reset();
246✔
722

723
        // Reset logger to no connection.
724
        logger_ = log::Logger(logger_name_);
246✔
725

726
        // Set cancel state and then make a new one. Those who are interested should have their own
727
        // pointer to the old one.
728
        *cancelled_ = true;
246✔
729
        cancelled_ = make_shared<bool>(false);
246✔
730
}
246✔
731

732
// Default configuration: delegates to the string constructor with an empty certificate path.
ClientConfig::ClientConfig() : ClientConfig("") {
}

// Configuration which stores `server_cert_path` in the member of the same name.
ClientConfig::ClientConfig(string server_cert_path) : server_cert_path(server_cert_path) {
}

// Nothing to release explicitly.
ClientConfig::~ClientConfig() = default;
206✔
742

743
// The server configuration needs no explicit set-up.
ServerConfig::ServerConfig() = default;

// Nothing to release explicitly.
ServerConfig::~ServerConfig() = default;
101✔
748

749
Stream::Stream(Server &server) :
202✔
750
        server_ {server},
751
        logger_ {"http"},
752
        cancelled_(make_shared<bool>(false)),
×
753
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
202✔
754
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
404✔
755
        // This is equivalent to:
756
        //   request_buffer_.reserve(body_buffer_.size());
757
        // but compatible with Boost 1.67.
758
        request_buffer_.prepare(body_buffer_.size() - request_buffer_.size());
202✔
759

760
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
761
        // they do, they should be handled higher up in the application logic.
762
        //
763
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
764
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
765
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
766
        // possible value instead.
767
        http_request_parser_.body_limit(numeric_limits<uint64_t>::max());
202✔
768
}
202✔
769

770
// Destroying a stream cancels it first, see `Stream::Cancel()`.
Stream::~Stream() {
        this->Cancel();
}
202✔
773

774
void Stream::Cancel() {
294✔
775
        if (socket_.is_open()) {
294✔
776
                socket_.cancel();
102✔
777
                socket_.close();
102✔
778
        }
779
        stream_active_.reset();
294✔
780

781
        // Set cancel state and then make a new one. Those who are interested should have their own
782
        // pointer to the old one.
783
        *cancelled_ = true;
294✔
784
        cancelled_ = make_shared<bool>(false);
294✔
785
}
294✔
786

787
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
2✔
788
        stream_active_.reset();
2✔
789
        handler(expected::unexpected(error::Error(
4✔
790
                ec.default_error_condition(),
2✔
791
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
8✔
792

793
        server_.RemoveStream(shared_from_this());
2✔
794
}
2✔
795

796
void Stream::CallErrorHandler(
2✔
797
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
798
        stream_active_.reset();
2✔
799
        handler(expected::unexpected(error::Error(
4✔
800
                err.code,
2✔
801
                err.message + ": " + req->address_.host + ": " + MethodToString(req->method_) + " "
4✔
802
                        + request_->GetPath())));
10✔
803

804
        server_.RemoveStream(shared_from_this());
2✔
805
}
2✔
806

807
void Stream::CallErrorHandler(
2✔
808
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
809
        stream_active_.reset();
2✔
810
        handler(error::Error(
2✔
811
                ec.default_error_condition(),
2✔
812
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
6✔
813

814
        server_.RemoveStream(shared_from_this());
2✔
815
}
2✔
816

817
void Stream::CallErrorHandler(
1✔
818
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
819
        stream_active_.reset();
1✔
820
        handler(error::Error(
1✔
821
                err.code,
1✔
822
                err.message + ": " + req->address_.host + ": " + MethodToString(req->method_) + " "
2✔
823
                        + request_->GetPath()));
4✔
824

825
        server_.RemoveStream(shared_from_this());
1✔
826
}
1✔
827

828
void Stream::AcceptHandler(const error_code &ec) {
102✔
829
        if (ec) {
102✔
830
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
831
                return;
×
832
        }
833

834
        auto ip = socket_.remote_endpoint().address().to_string();
204✔
835

836
        // Use IP as context for logging.
837
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
102✔
838

839
        logger_.Debug("Accepted connection.");
102✔
840

841
        request_.reset(new IncomingRequest(shared_from_this()));
102✔
842

843
        request_->address_.host = ip;
102✔
844

845
        stream_active_.reset(this, [](Stream *) {});
102✔
846

847
        ReadHeader();
102✔
848
}
849

850
void Stream::ReadHeader() {
102✔
851
        http_request_parser_.get().body().data = body_buffer_.data();
102✔
852
        http_request_parser_.get().body().size = body_buffer_.size();
102✔
853

854
        weak_ptr<Stream> weak_stream(stream_active_);
102✔
855

856
        http::async_read_some(
204✔
857
                socket_,
102✔
858
                request_buffer_,
102✔
859
                http_request_parser_,
860
                [weak_stream](const error_code &ec, size_t num_read) {
102✔
861
                        auto stream = weak_stream.lock();
204✔
862
                        if (stream) {
102✔
863
                                stream->ReadHeaderHandler(ec, num_read);
102✔
864
                        }
865
                });
102✔
866
}
102✔
867

868
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
102✔
869
        if (num_read > 0) {
102✔
870
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
102✔
871
        }
872

873
        if (ec) {
102✔
874
                CallErrorHandler(ec, request_, server_.header_handler_);
×
875
                return;
68✔
876
        }
877

878
        if (!http_request_parser_.is_header_done()) {
102✔
879
                ReadHeader();
×
880
                return;
×
881
        }
882

883
        auto method_result = BeastVerbToMethod(
884
                http_request_parser_.get().base().method(),
102✔
885
                string {http_request_parser_.get().base().method_string()});
204✔
886
        if (!method_result) {
102✔
887
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
888
                return;
×
889
        }
890
        request_->method_ = method_result.value();
102✔
891
        request_->address_.path = string(http_request_parser_.get().base().target());
102✔
892

893
        string debug_str;
102✔
894
        for (auto header = http_request_parser_.get().cbegin();
337✔
895
                 header != http_request_parser_.get().cend();
674✔
896
                 header++) {
897
                request_->headers_[string {header->name_string()}] = string {header->value()};
470✔
898
                if (logger_.Level() >= log::LogLevel::Debug) {
235✔
899
                        debug_str += string {header->name_string()};
234✔
900
                        debug_str += ": ";
234✔
901
                        debug_str += string {header->value()};
234✔
902
                        debug_str += "\n";
234✔
903
                }
904
        }
905

906
        logger_.Debug("Received headers:\n" + debug_str);
102✔
907
        debug_str.clear();
102✔
908

909
        if (http_request_parser_.chunked()) {
102✔
910
                auto cancelled = cancelled_;
1✔
911
                server_.header_handler_(request_);
1✔
912
                if (!*cancelled) {
1✔
913
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
914
                        CallErrorHandler(err, request_, server_.body_handler_);
1✔
915
                }
916
                return;
1✔
917
        }
918

919
        auto content_length = http_request_parser_.content_length();
101✔
920
        if (content_length) {
101✔
921
                request_body_length_ = content_length.value();
35✔
922
        } else {
923
                request_body_length_ = 0;
66✔
924
        }
925
        request_body_read_ = 0;
101✔
926

927
        if (request_body_read_ >= request_body_length_) {
101✔
928
                auto cancelled = cancelled_;
66✔
929
                server_.header_handler_(request_);
66✔
930
                if (!*cancelled) {
66✔
931
                        CallBodyHandler();
66✔
932
                }
933
                return;
66✔
934
        }
935

936
        auto cancelled = cancelled_;
35✔
937
        server_.header_handler_(request_);
35✔
938
        if (*cancelled) {
35✔
939
                return;
1✔
940
        }
941

942
        if (body_status_ == BodyReadingStatus::None) {
34✔
943
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
1✔
944
        }
945
}
946

947
// Read the next part of the request body into `[start, end)` and report the outcome through
// `handler`.
//
// If reading is no longer in progress, `handler` is called immediately with 0; the first such
// call after the end of the body was reached also triggers the body handler. Otherwise an
// asynchronous read of at most `min(body_buffer_.size(), end - start)` bytes is started, and
// `ReadBodyHandler` completes it.
void Stream::AsyncReadNextBodyPart(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        assert(body_status_ != BodyReadingStatus::None);

        if (body_status_ == BodyReadingStatus::ReaderCreated) {
                body_status_ = BodyReadingStatus::InProgress;
        }

        if (body_status_ != BodyReadingStatus::InProgress) {
                // Nothing more to read: signal end of data.
                handler(0);
                if (body_status_ == BodyReadingStatus::ReachedEnd) {
                        body_status_ = BodyReadingStatus::Done;
                        CallBodyHandler();
                }
                return;
        }

        // Remember where the result goes; `ReadBodyHandler` picks these up.
        reader_buf_start_ = start;
        reader_buf_end_ = end;
        reader_handler_ = handler;

        const size_t requested = end - start;
        const size_t chunk_size = min(body_buffer_.size(), requested);

        auto &parser_body = http_request_parser_.get().body();
        parser_body.data = body_buffer_.data();
        parser_body.size = chunk_size;

        weak_ptr<Stream> weak_stream(stream_active_);

        http::async_read_some(
                socket_,
                request_buffer_,
                http_request_parser_,
                [weak_stream](const error_code &ec, size_t num_read) {
                        // Only deliver the result if the stream is still active.
                        if (auto stream = weak_stream.lock()) {
                                stream->ReadBodyHandler(ec, num_read);
                        }
                });
}
986

987
// Completion handler for an asynchronous read of request body data.
//
// `ec` is the result of the read and `num_read` the byte count it reported. The data is copied
// from `body_buffer_` into the reader's buffer and the pending `reader_handler_` is invoked with
// either the number of bytes copied or an error. On error, the server's body handler is
// notified through `CallErrorHandler` afterwards as well.
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
        if (num_read > 0) {
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
                request_body_read_ += num_read;
        }

        // `need_buffer` is not a failure for us: the buffer is reset before every read, so
        // downgrade it to success.
        if (ec == http::make_error_code(http::error::need_buffer)) {
                ec = error_code();
        }

        assert(reader_handler_);

        if (request_body_read_ >= request_body_length_) {
                body_status_ = BodyReadingStatus::ReachedEnd;
        }

        // Never copy more than the reader's buffer can hold.
        const size_t reader_capacity = reader_buf_end_ - reader_buf_start_;
        const size_t to_copy = min(num_read, reader_capacity);
        copy_n(body_buffer_.begin(), to_copy, reader_buf_start_);

        if (!ec) {
                reader_handler_(to_copy);
                return;
        }

        // Failure: tell the reader first, then run the stream's error handling.
        auto err = error::Error(ec.default_error_condition(), "Could not read body");
        reader_handler_(expected::unexpected(err));
        CallErrorHandler(ec, request_, server_.body_handler_);
}
1019

1020
// Start sending the response previously created for this stream. Headers, status code and
// status message are copied into a Beast response, whose header is then written asynchronously;
// `WriteHeaderHandler` continues from there. `reply_finished_handler` is stored for reporting
// the final outcome.
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
        // Only called from existing responses, so the lock should always succeed.
        auto response = maybe_response_.lock();
        assert(response);

        // From here on the stream shares ownership of the response.
        response_ = response;

        reply_finished_handler_ = reply_finished_handler;

        http_response_ = make_shared<http::response<http::buffer_body>>();

        auto &beast_header = http_response_->base();
        for (const auto &entry : response->headers_) {
                beast_header.set(entry.first, entry.second);
        }

        http_response_->result(response->GetStatusCode());
        http_response_->reason(response->GetStatusMessage());

        http_response_serializer_ =
                make_shared<http::response_serializer<http::buffer_body>>(*http_response_);

        weak_ptr<Stream> weak_stream(stream_active_);

        http::async_write_header(
                socket_,
                *http_response_serializer_,
                [weak_stream](const error_code &ec, size_t num_written) {
                        // Only deliver the result if the stream is still active.
                        if (auto stream = weak_stream.lock()) {
                                stream->WriteHeaderHandler(ec, num_written);
                        }
                });
}
94✔
1054

1055
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
93✔
1056
        if (num_written > 0) {
93✔
1057
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
93✔
1058
        }
1059

1060
        if (ec) {
93✔
1061
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1062
                return;
31✔
1063
        }
1064

1065
        auto header = response_->GetHeader("Content-Length");
186✔
1066
        if (!header || header.value() == "0") {
93✔
1067
                FinishReply();
30✔
1068
                return;
30✔
1069
        }
1070

1071
        auto length = common::StringToLongLong(header.value());
63✔
1072
        if (!length || length.value() < 0) {
63✔
1073
                auto err = error::Error(
1074
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
1075
                CallErrorHandler(err, request_, reply_finished_handler_);
×
1076
                return;
×
1077
        }
1078

1079
        if (!response_->body_reader_ && !response_->async_body_reader_) {
63✔
1080
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
1081
                CallErrorHandler(err, request_, reply_finished_handler_);
1✔
1082
                return;
1✔
1083
        }
1084

1085
        PrepareAndWriteNewBodyBuffer();
62✔
1086
}
1087

1088
void Stream::PrepareAndWriteNewBodyBuffer() {
420✔
1089
        // response_->body_reader_ XOR response_->async_body_reader_
1090
        assert(
420✔
1091
                (response_->body_reader_ || response_->async_body_reader_)
1092
                && !(response_->body_reader_ && response_->async_body_reader_));
1093

1094
        auto read_handler = [this](io::ExpectedSize read) {
840✔
1095
                if (!read) {
420✔
1096
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
×
1097
                        return;
×
1098
                }
1099
                WriteNewBodyBuffer(read.value());
420✔
1100
        };
420✔
1101

1102
        if (response_->body_reader_) {
420✔
1103
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
343✔
1104
        } else {
1105
                auto err = response_->async_body_reader_->AsyncRead(
77✔
1106
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
154✔
1107
                if (err != error::NoError) {
77✔
1108
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1109
                }
1110
        }
1111
}
420✔
1112

1113
void Stream::WriteNewBodyBuffer(size_t size) {
420✔
1114
        http_response_->body().data = body_buffer_.data();
420✔
1115
        http_response_->body().size = size;
420✔
1116

1117
        if (size > 0) {
420✔
1118
                http_response_->body().more = true;
364✔
1119
        } else {
1120
                // Release ownership of Body reader.
1121
                response_->body_reader_.reset();
56✔
1122
                http_response_->body().more = false;
56✔
1123
        }
1124

1125
        WriteBody();
420✔
1126
}
420✔
1127

1128
void Stream::WriteBody() {
782✔
1129
        weak_ptr<Stream> weak_stream(stream_active_);
782✔
1130

1131
        http::async_write_some(
1,564✔
1132
                socket_,
782✔
1133
                *http_response_serializer_,
782✔
1134
                [weak_stream](const error_code &ec, size_t num_written) {
769✔
1135
                        auto stream = weak_stream.lock();
1,538✔
1136
                        if (stream) {
769✔
1137
                                stream->WriteBodyHandler(ec, num_written);
769✔
1138
                        }
1139
                });
769✔
1140
}
782✔
1141

1142
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
769✔
1143
        if (num_written > 0) {
769✔
1144
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
362✔
1145
        }
1146

1147
        if (ec == http::make_error_code(http::error::need_buffer)) {
769✔
1148
                // Write next body block.
1149
                PrepareAndWriteNewBodyBuffer();
358✔
1150
        } else if (ec) {
411✔
1151
                CallErrorHandler(ec, request_, reply_finished_handler_);
2✔
1152
        } else if (num_written > 0) {
409✔
1153
                // We are still writing the body.
1154
                WriteBody();
362✔
1155
        } else {
1156
                // We are finished.
1157
                FinishReply();
47✔
1158
        }
1159
}
769✔
1160

1161
void Stream::FinishReply() {
77✔
1162
        // We are done.
1163
        stream_active_.reset();
77✔
1164
        reply_finished_handler_(error::NoError);
77✔
1165
        server_.RemoveStream(shared_from_this());
77✔
1166
}
77✔
1167

1168
void Stream::CallBodyHandler() {
97✔
1169
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1170
        // it immediately destroys, which would destroy this stream as well. At the end of this
1171
        // function, it's ok to destroy it.
1172
        auto stream_ref = shared_from_this();
194✔
1173

1174
        server_.body_handler_(request_);
97✔
1175

1176
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1177
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1178
        // request has not been handled correctly.
1179
        auto response = maybe_response_.lock();
194✔
1180
        if (!response) {
97✔
1181
                logger_.Error("Handler produced no response. Closing stream prematurely.");
2✔
1182
                server_.RemoveStream(shared_from_this());
2✔
1183
        }
1184
}
97✔
1185

1186
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
101✔
1187
        event_loop_ {event_loop},
1188
        acceptor_(GetAsioIoContext(event_loop_)) {
101✔
1189
}
101✔
1190

1191
// Destroying a server cancels it first, see `Server::Cancel()`.
Server::~Server() {
        this->Cancel();
}
101✔
1194

1195
// Start listening on `url` and serve incoming requests through the two handlers.
//
// `url` must use the `http` protocol, must have a host that is a literal IP address, and must
// not contain a path other than "/". `header_handler` and `body_handler` are stored and used
// for all streams of this server.
//
// Returns `error::NoError` on success. On failure an error describing the failing step is
// returned, and the acceptor is not left open by this call.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
        auto err = BreakDownUrl(url, address_);
        if (error::NoError != err) {
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
        }

        if (address_.protocol != "http") {
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
        }

        if (address_.path.size() > 0 && address_.path != "/") {
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
        }

        boost::system::error_code ec;
        auto address = asio::ip::make_address(address_.host, ec);
        if (ec) {
                return error::Error(
                        ec.default_error_condition(),
                        "Could not construct endpoint from address " + address_.host);
        }

        asio::ip::tcp::endpoint endpoint(address, address_.port);

        // If opening fails, there is nothing to clean up. In particular, when the acceptor is
        // already open from an earlier call, that earlier acceptor must not be closed here.
        acceptor_.open(endpoint.protocol(), ec);
        if (ec) {
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
        }

        // From here on the acceptor is open. If a later step fails, close it again, so that it is
        // not left half configured and a later call can open it anew.
        auto close_and_fail = [this](const boost::system::error_code &cause, const string &msg) {
                auto failure = error::Error(cause.default_error_condition(), msg);
                boost::system::error_code ignored;
                acceptor_.close(ignored);
                return failure;
        };

        // Allow address reuse, otherwise we can't re-bind later.
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
        if (ec) {
                return close_and_fail(ec, "Could not set socket options");
        }

        acceptor_.bind(endpoint, ec);
        if (ec) {
                return close_and_fail(ec, "Could not bind socket");
        }

        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
        if (ec) {
                return close_and_fail(ec, "Could not start listening");
        }

        header_handler_ = header_handler;
        body_handler_ = body_handler;

        PrepareNewStream();

        return error::NoError;
}
1252

1253
void Server::Cancel() {
101✔
1254
        if (acceptor_.is_open()) {
101✔
1255
                acceptor_.cancel();
100✔
1256
                acceptor_.close();
100✔
1257
        }
1258
        streams_.clear();
101✔
1259
}
101✔
1260

1261
// Create the response object for `req` and link it with the request's stream in both
// directions. Fails with `StreamCancelledError` if the stream no longer exists.
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
        auto stream = req->stream_.lock();
        if (!stream) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
        }

        OutgoingResponsePtr new_response {new OutgoingResponse};
        new_response->stream_ = stream;
        stream->maybe_response_ = new_response;

        return new_response;
}
1271

1272
// Send `resp` on the stream it belongs to; `reply_finished_handler` is passed on to the stream.
// Returns `StreamCancelledError` if the stream no longer exists, otherwise `error::NoError`.
error::Error Server::AsyncReply(
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
        if (auto stream = resp->stream_.lock()) {
                stream->AsyncReply(reply_finished_handler);
                return error::NoError;
        }

        return MakeError(StreamCancelledError, "Cannot send response");
}
1282

1283
// Create an asynchronous reader for the body of `req`.
//
// Fails if the stream no longer exists (`StreamCancelledError`), if body reading has already
// been set up for this stream (`operation_in_progress`), or if the request has no body
// (`BodyMissingError`). On success the stream's body status becomes `ReaderCreated`.
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
        auto stream = req->stream_.lock();
        if (!stream) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
        }

        const bool reading_already_set_up = stream->body_status_ != BodyReadingStatus::None;
        if (reading_already_set_up) {
                return expected::unexpected(error::Error(
                        make_error_condition(errc::operation_in_progress),
                        "MakeBodyAsyncReader called while reading is in progress"));
        }

        const bool has_body = stream->request_body_length_ != 0;
        if (!has_body) {
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
        }

        stream->body_status_ = BodyReadingStatus::ReaderCreated;
        return make_shared<BodyAsyncReader<Stream>>(stream);
}
1302

1303
void Server::PrepareNewStream() {
202✔
1304
        StreamPtr new_stream {new Stream(*this)};
202✔
1305
        streams_.insert(new_stream);
202✔
1306
        AsyncAccept(new_stream);
202✔
1307
}
202✔
1308

1309
// Wait asynchronously for an incoming connection on `stream`'s socket. Once a connection has
// been accepted, the stream takes over and a new stream is prepared for the next connection.
// If accepting fails, the error is logged and no new stream is prepared.
void Server::AsyncAccept(StreamPtr stream) {
        acceptor_.async_accept(stream->socket_, [this, stream](const error_code &ec) {
                if (!ec) {
                        stream->AcceptHandler(ec);
                        this->PrepareNewStream();
                        return;
                }

                log::Error("Could not accept connection: " + ec.message());
        });
}
202✔
1321

1322
void Server::RemoveStream(const StreamPtr &stream) {
89✔
1323
        streams_.erase(stream);
89✔
1324

1325
        // Work around bug in Boost ASIO: When the handler for `async_read_some` is called with `ec
1326
        // == operation_aborted`, the handler should not access any supplied buffers, because it may
1327
        // be aborted due to object destruction. However, it does access buffers. This means it does
1328
        // not help to call `Cancel()` prior to destruction. We need to call `Cancel()` first, and
1329
        // then wait until the handler which receives `operation_aborted` has run. So do a
1330
        // `Cancel()` followed by `Post()` for this, which should queue us in the correct order:
1331
        // `operation_aborted` -> `Post` handler.
1332
        stream->Cancel();
89✔
1333
        event_loop_.Post([stream]() {
89✔
1334
                // No-op, just keep `stream` alive until we get back to this handler.
1335
        });
178✔
1336
}
89✔
1337

1338
} // namespace http
1339
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc