• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1007911266

18 Sep 2023 05:57AM UTC coverage: 78.924% (+0.3%) from 78.604%
1007911266

push

gitlab-ci

kacf
chore: Make condition clearer, no need for a comment then.

Signed-off-by: Kristian Amlie <kristian.amlie@northern.tech>

1 of 1 new or added line in 1 file covered. (100.0%)

6044 of 7658 relevant lines covered (78.92%)

11002.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.38
/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio/ip/tcp.hpp>
20
#include <boost/asio/ssl/verify_mode.hpp>
21
#include <boost/asio.hpp>
22

23
#include <common/common.hpp>
24

25
namespace mender {
26
namespace http {
27

28
namespace common = mender::common;
29

30
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
31
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
32
const unsigned int BeastHttpVersion = 11;
33

34
namespace asio = boost::asio;
35
namespace http = boost::beast::http;
36

37
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
38

39
static http::verb MethodToBeastVerb(Method method) {
117✔
40
        switch (method) {
117✔
41
        case Method::GET:
76✔
42
                return http::verb::get;
76✔
43
        case Method::HEAD:
×
44
                return http::verb::head;
×
45
        case Method::POST:
19✔
46
                return http::verb::post;
19✔
47
        case Method::PUT:
22✔
48
                return http::verb::put;
22✔
49
        case Method::PATCH:
×
50
                return http::verb::patch;
×
51
        case Method::CONNECT:
×
52
                return http::verb::connect;
×
53
        case Method::Invalid:
×
54
                // Fallthrough to end (no-op).
55
                break;
×
56
        }
57
        // Don't use "default" case. This should generate a warning if we ever add any methods. But
58
        // still assert here for safety.
59
        assert(false);
×
60
        return http::verb::get;
61
}
62

63
static expected::expected<Method, error::Error> BeastVerbToMethod(
115✔
64
        http::verb verb, const string &verb_string) {
65
        switch (verb) {
115✔
66
        case http::verb::get:
74✔
67
                return Method::GET;
74✔
68
        case http::verb::head:
×
69
                return Method::HEAD;
×
70
        case http::verb::post:
19✔
71
                return Method::POST;
19✔
72
        case http::verb::put:
22✔
73
                return Method::PUT;
22✔
74
        case http::verb::patch:
×
75
                return Method::PATCH;
×
76
        case http::verb::connect:
×
77
                return Method::CONNECT;
×
78
        default:
×
79
                return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
×
80
        }
81
}
82

83
template <typename StreamType>
84
class BodyAsyncReader : virtual public io::AsyncReader {
85
public:
86
        BodyAsyncReader(StreamType &stream, shared_ptr<bool> cancelled) :
68✔
87
                stream_ {stream},
88
                cancelled_ {cancelled} {
68✔
89
        }
68✔
90
        ~BodyAsyncReader() {
43✔
91
                Cancel();
43✔
92
        }
43✔
93

94
        error::Error AsyncRead(
2,044✔
95
                vector<uint8_t>::iterator start,
96
                vector<uint8_t>::iterator end,
97
                io::AsyncIoHandler handler) override {
98
                if (*cancelled_) {
2,044✔
99
                        return error::MakeError(
×
100
                                error::ProgrammingError,
101
                                "BodyAsyncReader::AsyncRead called after stream is destroyed");
×
102
                }
103
                stream_.AsyncReadNextBodyPart(start, end, handler);
2,044✔
104
                return error::NoError;
2,044✔
105
        }
106

107
        void Cancel() override {
45✔
108
                if (!*cancelled_) {
45✔
109
                        stream_.Cancel();
5✔
110
                }
111
        }
45✔
112

113
private:
114
        StreamType &stream_;
115
        shared_ptr<bool> cancelled_;
116

117
        friend class Client;
118
        friend class Server;
119
};
120

121
Client::Client(
53✔
122
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
174✔
123
        event_loop_ {event_loop},
124
        logger_name_ {logger_name},
125
        cancelled_ {make_shared<bool>(true)},
×
126
        resolver_(GetAsioIoContext(event_loop)),
127
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
106✔
128
        // This is equivalent to:
129
        //   response_buffer_.reserve(body_buffer_.size());
130
        // but compatible with Boost 1.67.
131
        response_buffer_.prepare(body_buffer_.size() - response_buffer_.size());
53✔
132

133
        ssl_ctx_.set_verify_mode(ssl::verify_peer);
53✔
134

135
        beast::error_code ec {};
53✔
136
        ssl_ctx_.set_default_verify_paths(ec); // Load the default CAs
53✔
137
        if (ec) {
53✔
138
                log::Error("Failed to load the SSL default directory");
×
139
        }
140
        if (client.server_cert_path != "") {
53✔
141
                ssl_ctx_.load_verify_file(client.server_cert_path, ec);
×
142
                if (ec) {
×
143
                        log::Error("Failed to load the server certificate!");
×
144
                }
145
        }
146
}
53✔
147

148
Client::~Client() {
53✔
149
        if (!*cancelled_) {
53✔
150
                logger_.Warning("Client destroyed while request is still active!");
×
151
        }
152
        DoCancel();
53✔
153
}
53✔
154

155
// Starts an asynchronous HTTP or HTTPS request.
//
// `req` must have protocol, host and port filled in. `header_handler` is
// called once the response headers are available (or on an early error),
// `body_handler` once the transaction has finished. Both must be non-null.
//
// Returns `error::NoError` if the request was started. Returns an error
// without starting anything if a call is already in progress, the request is
// missing or incomplete, a handler is missing, or the protocol is neither
// "http" nor "https".
error::Error Client::AsyncCall(
        OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
        if (!*cancelled_) {
                return error::Error(
                        make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
        }

        // Guard against a null request before dereferencing it below.
        if (!req || req->address_.protocol == "" || req->address_.host == ""
                || req->address_.port < 0) {
                return error::MakeError(error::ProgrammingError, "Request is not ready");
        }

        if (!header_handler || !body_handler) {
                return error::MakeError(
                        error::ProgrammingError, "header_handler and body_handler can not be nullptr");
        }

        if (req->address_.protocol != "http" && req->address_.protocol != "https") {
                return error::Error(
                        make_error_condition(errc::protocol_not_supported), req->address_.protocol);
        }

        // Assign unconditionally: the client can be reused, and a previous "https" request must
        // not leave a later plain "http" request in TLS mode.
        is_https_ = (req->address_.protocol == "https");

        logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));

        // NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
        // request to route to our k8s cluster. Set this in all cases.
        req->SetHeader("HOST", req->address_.host);

        request_ = req;
        header_handler_ = header_handler;
        body_handler_ = body_handler;
        status_ = TransactionStatus::None;

        // From here on a request is considered active.
        *cancelled_ = false;

        // The lambda captures its own copy of the shared flag so that it can detect cancellation
        // even after `cancelled_` has been replaced with a fresh flag.
        auto &cancelled = cancelled_;

        resolver_.async_resolve(
                request_->address_.host,
                to_string(request_->address_.port),
                [this, cancelled](
                        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
                        if (!*cancelled) {
                                ResolveHandler(ec, results);
                        }
                });

        return error::NoError;
}
207

208
io::ExpectedAsyncReaderPtr Client::MakeBodyAsyncReader(IncomingResponsePtr resp) {
81✔
209
        if (status_ != TransactionStatus::HeaderHandlerCalled) {
81✔
210
                return expected::unexpected(error::Error(
1✔
211
                        make_error_condition(errc::operation_in_progress),
1✔
212
                        "MakeBodyAsyncReader called while reading is in progress"));
3✔
213
        }
214

215
        if (response_body_length_ == 0) {
80✔
216
                return expected::unexpected(
12✔
217
                        MakeError(BodyMissingError, "Response does not contain a body"));
24✔
218
        }
219

220
        status_ = TransactionStatus::ReaderCreated;
68✔
221
        return make_shared<BodyAsyncReader<Client>>(resp->client_, resp->cancelled_);
136✔
222
}
223

224
void Client::CallHandler(ResponseHandler handler) {
163✔
225
        // This function exists to make sure we have a copy of the handler we're calling (in the
226
        // argument list). This is important in case the handler owns the client instance through a
227
        // capture, and it replaces the handler with a different one (using `AsyncCall`). If it
228
        // does, then it destroys the final copy of the handler, and therefore also the client,
229
        // which is why we need to make a copy here, before calling it.
230
        handler(response_);
163✔
231
}
163✔
232

233
void Client::CallErrorHandler(
11✔
234
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
235
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
11✔
236
}
11✔
237

238
void Client::CallErrorHandler(
55✔
239
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
240
        *cancelled_ = true;
55✔
241
        cancelled_ = make_shared<bool>(true);
55✔
242
        stream_.reset();
55✔
243
        status_ = TransactionStatus::Done;
55✔
244
        handler(expected::unexpected(
55✔
245
                err.WithContext(MethodToString(req->method_) + " " + req->orig_address_)));
110✔
246
}
55✔
247

248
void Client::ResolveHandler(
119✔
249
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
250
        if (ec) {
119✔
251
                CallErrorHandler(ec, request_, header_handler_);
×
252
                return;
×
253
        }
254

255
        if (logger_.Level() >= log::LogLevel::Debug) {
119✔
256
                string ips = "[";
236✔
257
                string sep;
118✔
258
                for (auto r : results) {
247✔
259
                        ips += sep;
129✔
260
                        ips += r.endpoint().address().to_string();
129✔
261
                        sep = ", ";
129✔
262
                }
263
                ips += "]";
118✔
264
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
118✔
265
        }
266

267
        resolver_results_ = results;
119✔
268

269
        stream_ = make_shared<ssl::stream<tcp::socket>>(GetAsioIoContext(event_loop_), ssl_ctx_);
119✔
270

271
        http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();
119✔
272

273
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
274
        // if they do, they should be handled higher up in the application logic.
275
        //
276
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
277
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
278
        // `has_value()` in their code, causing their subsequent comparison operation to
279
        // misbehave. So pass highest possible value instead.
280
        http_response_parser_->body_limit(numeric_limits<uint64_t>::max());
119✔
281

282
        auto &cancelled = cancelled_;
119✔
283

284
        asio::async_connect(
238✔
285
                stream_->next_layer(),
119✔
286
                resolver_results_,
119✔
287
                [this, cancelled](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
357✔
288
                        if (!*cancelled) {
119✔
289
                                if (is_https_) {
119✔
290
                                        return HandshakeHandler(ec, endpoint);
3✔
291
                                }
292
                                return ConnectHandler(ec, endpoint);
116✔
293
                        }
294
                });
295
}
296

297
void Client::HandshakeHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
3✔
298
        if (ec) {
3✔
299
                CallErrorHandler(ec, request_, header_handler_);
×
300
                return;
×
301
        }
302

303
        // Set SNI Hostname (many hosts need this to handshake successfully)
304
        if (!SSL_set_tlsext_host_name(stream_->native_handle(), request_->address_.host.c_str())) {
3✔
305
                beast::error_code ec2 {
306
                        static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
×
307
                logger_.Error("Failed to set SNI host name: " + ec2.message());
×
308
        }
309

310
        auto &cancelled = cancelled_;
3✔
311

312
        stream_->async_handshake(
3✔
313
                ssl::stream_base::client, [this, cancelled, endpoint](const error_code &ec) {
10✔
314
                        if (*cancelled) {
3✔
315
                                return;
×
316
                        }
317
                        if (ec) {
3✔
318
                                logger_.Error("https: Failed to perform the SSL handshake: " + ec.message());
1✔
319
                                CallErrorHandler(ec, request_, header_handler_);
1✔
320
                                return;
1✔
321
                        }
322
                        logger_.Debug("https: Successful SSL handshake");
2✔
323
                        ConnectHandler(ec, endpoint);
2✔
324
                });
325
}
326

327

328
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
118✔
329
        if (ec) {
118✔
330
                CallErrorHandler(ec, request_, header_handler_);
1✔
331
                return;
1✔
332
        }
333

334
        logger_.Debug("Connected to " + endpoint.address().to_string());
117✔
335

336
        http_request_ = make_shared<http::request<http::buffer_body>>(
117✔
337
                MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);
117✔
338

339
        for (const auto &header : request_->headers_) {
377✔
340
                http_request_->set(header.first, header.second);
260✔
341
        }
342

343
        http_request_serializer_ =
344
                make_shared<http::request_serializer<http::buffer_body>>(*http_request_);
117✔
345

346
        auto &cancelled = cancelled_;
117✔
347

348
        if (is_https_) {
117✔
349
                http::async_write_header(
4✔
350
                        *stream_,
2✔
351
                        *http_request_serializer_,
2✔
352
                        [this, cancelled](const error_code &ec, size_t num_written) {
4✔
353
                                if (!*cancelled) {
2✔
354
                                        WriteHeaderHandler(ec, num_written);
2✔
355
                                }
356
                        });
2✔
357
        } else {
358
                http::async_write_header(
230✔
359
                        stream_->next_layer(),
115✔
360
                        *http_request_serializer_,
115✔
361
                        [this, cancelled](const error_code &ec, size_t num_written) {
230✔
362
                                if (!*cancelled) {
115✔
363
                                        WriteHeaderHandler(ec, num_written);
115✔
364
                                }
365
                        });
115✔
366
        }
367
}
368

369
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
117✔
370
        if (num_written > 0) {
117✔
371
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
117✔
372
        }
373

374
        if (ec) {
117✔
375
                CallErrorHandler(ec, request_, header_handler_);
×
376
                return;
73✔
377
        }
378

379
        auto header = request_->GetHeader("Content-Length");
234✔
380
        if (!header || header.value() == "0") {
117✔
381
                ReadHeader();
72✔
382
                return;
72✔
383
        }
384

385
        auto length = common::StringToLongLong(header.value());
45✔
386
        if (!length || length.value() < 0) {
45✔
387
                auto err = error::Error(
388
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
389
                CallErrorHandler(err, request_, header_handler_);
×
390
                return;
×
391
        }
392
        request_body_length_ = length.value();
45✔
393

394
        if (!request_->body_gen_ && !request_->async_body_gen_) {
45✔
395
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
396
                CallErrorHandler(err, request_, header_handler_);
1✔
397
                return;
1✔
398
        }
399

400
        assert(!(request_->body_gen_ && request_->async_body_gen_));
44✔
401

402
        if (request_->body_gen_) {
44✔
403
                auto body_reader = request_->body_gen_();
38✔
404
                if (!body_reader) {
38✔
405
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
406
                        return;
×
407
                }
408
                request_->body_reader_ = body_reader.value();
38✔
409
        } else {
410
                auto body_reader = request_->async_body_gen_();
6✔
411
                if (!body_reader) {
6✔
412
                        CallErrorHandler(body_reader.error(), request_, header_handler_);
×
413
                        return;
×
414
                }
415
                request_->async_body_reader_ = body_reader.value();
6✔
416
        }
417

418
        PrepareAndWriteNewBodyBuffer();
44✔
419
}
420

421
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,136✔
422
        if (num_written > 0) {
2,136✔
423
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
1,048✔
424
        }
425

426
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,136✔
427
                // Write next block of the body.
428
                PrepareAndWriteNewBodyBuffer();
1,048✔
429
        } else if (ec) {
1,088✔
430
                CallErrorHandler(ec, request_, header_handler_);
3✔
431
        } else if (num_written > 0) {
1,085✔
432
                // We are still writing the body.
433
                WriteBody();
1,048✔
434
        } else {
435
                // We are ready to receive the response.
436
                ReadHeader();
37✔
437
        }
438
}
2,136✔
439

440
void Client::PrepareAndWriteNewBodyBuffer() {
1,092✔
441
        // request_->body_reader_ XOR request_->async_body_reader_
442
        assert(
1,092✔
443
                (request_->body_reader_ || request_->async_body_reader_)
444
                && !(request_->body_reader_ && request_->async_body_reader_));
445

446
        auto cancelled = cancelled_;
2,184✔
447
        auto read_handler = [this, cancelled](io::ExpectedSize read) {
2,185✔
448
                if (!*cancelled) {
1,092✔
449
                        if (!read) {
1,091✔
450
                                CallErrorHandler(read.error(), request_, header_handler_);
2✔
451
                                return;
2✔
452
                        }
453
                        WriteNewBodyBuffer(read.value());
1,089✔
454
                }
455
        };
2,184✔
456

457

458
        if (request_->body_reader_) {
1,092✔
459
                read_handler(request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
667✔
460
        } else {
461
                auto err = request_->async_body_reader_->AsyncRead(
425✔
462
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
1,275✔
463
                if (err != error::NoError) {
425✔
464
                        CallErrorHandler(err, request_, header_handler_);
×
465
                }
466
        }
467
}
1,092✔
468

469
void Client::WriteNewBodyBuffer(size_t size) {
1,089✔
470
        http_request_->body().data = body_buffer_.data();
1,089✔
471
        http_request_->body().size = size;
1,089✔
472

473
        if (size > 0) {
1,089✔
474
                http_request_->body().more = true;
1,052✔
475
        } else {
476
                // Release ownership of Body reader.
477
                request_->body_reader_.reset();
37✔
478
                request_->async_body_reader_.reset();
37✔
479
                http_request_->body().more = false;
37✔
480
        }
481

482
        WriteBody();
1,089✔
483
}
1,089✔
484

485
void Client::WriteBody() {
2,137✔
486
        auto &cancelled = cancelled_;
2,137✔
487

488
        if (is_https_) {
2,137✔
489
                http::async_write_some(
×
490
                        *stream_,
×
491
                        *http_request_serializer_,
×
492
                        [this, cancelled](const error_code &ec, size_t num_written) {
×
493
                                if (!*cancelled) {
×
494
                                        WriteBodyHandler(ec, num_written);
×
495
                                }
496
                        });
×
497
        } else {
498
                http::async_write_some(
4,274✔
499
                        stream_->next_layer(),
2,137✔
500
                        *http_request_serializer_,
2,137✔
501
                        [this, cancelled](const error_code &ec, size_t num_written) {
4,272✔
502
                                if (!*cancelled) {
2,136✔
503
                                        WriteBodyHandler(ec, num_written);
2,136✔
504
                                }
505
                        });
2,136✔
506
        }
507
}
2,137✔
508

509
void Client::ReadHeader() {
109✔
510
        http_response_parser_->get().body().data = body_buffer_.data();
109✔
511
        http_response_parser_->get().body().size = body_buffer_.size();
109✔
512

513
        auto &cancelled = cancelled_;
109✔
514

515
        if (is_https_) {
109✔
516
                http::async_read_some(
4✔
517
                        *stream_,
2✔
518
                        response_buffer_,
2✔
519
                        *http_response_parser_,
2✔
520
                        [this, cancelled](const error_code &ec, size_t num_read) {
4✔
521
                                if (!*cancelled) {
2✔
522
                                        ReadHeaderHandler(ec, num_read);
2✔
523
                                }
524
                        });
2✔
525
        } else {
526
                http::async_read_some(
214✔
527
                        stream_->next_layer(),
107✔
528
                        response_buffer_,
107✔
529
                        *http_response_parser_,
107✔
530
                        [this, cancelled](const error_code &ec, size_t num_read) {
212✔
531
                                if (!*cancelled) {
106✔
532
                                        ReadHeaderHandler(ec, num_read);
106✔
533
                                }
534
                        });
106✔
535
        }
536
}
109✔
537

538
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
108✔
539
        if (num_read > 0) {
108✔
540
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
104✔
541
        }
542

543
        if (ec) {
108✔
544
                CallErrorHandler(ec, request_, header_handler_);
4✔
545
                return;
38✔
546
        }
547

548
        if (!http_response_parser_->is_header_done()) {
104✔
549
                ReadHeader();
×
550
                return;
×
551
        }
552

553
        response_.reset(new IncomingResponse(*this, cancelled_));
104✔
554
        response_->status_code_ = http_response_parser_->get().result_int();
104✔
555
        response_->status_message_ = string {http_response_parser_->get().reason()};
104✔
556

557
        string debug_str;
104✔
558
        for (auto header = http_response_parser_->get().cbegin();
206✔
559
                 header != http_response_parser_->get().cend();
412✔
560
                 header++) {
561
                response_->headers_[string {header->name_string()}] = string {header->value()};
204✔
562
                if (logger_.Level() >= log::LogLevel::Debug) {
102✔
563
                        debug_str += string {header->name_string()};
101✔
564
                        debug_str += ": ";
101✔
565
                        debug_str += string {header->value()};
101✔
566
                        debug_str += "\n";
101✔
567
                }
568
        }
569

570
        logger_.Debug("Received headers:\n" + debug_str);
104✔
571
        debug_str.clear();
104✔
572

573
        if (http_response_parser_->chunked()) {
104✔
574
                auto cancelled = cancelled_;
1✔
575
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
576
                CallHandler(header_handler_);
1✔
577
                if (!*cancelled) {
1✔
578
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
579
                        CallErrorHandler(err, request_, body_handler_);
1✔
580
                }
581
                return;
1✔
582
        }
583

584
        auto content_length = http_response_parser_->content_length();
103✔
585
        if (content_length) {
103✔
586
                response_body_length_ = content_length.value();
83✔
587
        } else {
588
                response_body_length_ = 0;
20✔
589
        }
590
        response_body_read_ = 0;
103✔
591

592
        if (response_body_read_ >= response_body_length_) {
103✔
593
                auto cancelled = cancelled_;
32✔
594
                status_ = TransactionStatus::HeaderHandlerCalled;
32✔
595
                CallHandler(header_handler_);
32✔
596
                if (!*cancelled) {
32✔
597
                        *cancelled_ = true;
30✔
598
                        cancelled_ = make_shared<bool>(true);
30✔
599
                        stream_.reset();
30✔
600
                        CallHandler(body_handler_);
30✔
601
                }
602
                return;
32✔
603
        }
604

605
        auto cancelled = cancelled_;
71✔
606
        status_ = TransactionStatus::HeaderHandlerCalled;
71✔
607
        CallHandler(header_handler_);
71✔
608
        if (*cancelled) {
71✔
609
                return;
1✔
610
        }
611

612
        // We know that a body reader is required here, because of the `response_body_read_ >=
613
        // response_body_length_` check above.
614
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
70✔
615
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, body_handler_);
2✔
616
        }
617
}
618

619
void Client::AsyncReadNextBodyPart(
1,767✔
620
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
621
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));
1,767✔
622

623
        if (status_ == TransactionStatus::ReaderCreated) {
1,767✔
624
                status_ = TransactionStatus::BodyReadingInProgress;
68✔
625
        }
626

627
        if (status_ != TransactionStatus::BodyReadingInProgress) {
1,767✔
628
                auto cancelled = cancelled_;
30✔
629
                handler(0);
30✔
630
                if (!*cancelled && status_ == TransactionStatus::ReachedEnd) {
30✔
631
                        status_ = TransactionStatus::Done;
29✔
632
                        *cancelled_ = true;
29✔
633
                        cancelled_ = make_shared<bool>(true);
29✔
634
                        stream_.reset();
29✔
635
                        CallHandler(body_handler_);
29✔
636
                }
637
                return;
30✔
638
        }
639

640
        reader_buf_start_ = start;
1,737✔
641
        reader_buf_end_ = end;
1,737✔
642
        reader_handler_ = handler;
1,737✔
643
        size_t read_size = end - start;
1,737✔
644
        size_t smallest = min(body_buffer_.size(), read_size);
1,737✔
645

646
        http_response_parser_->get().body().data = body_buffer_.data();
1,737✔
647
        http_response_parser_->get().body().size = smallest;
1,737✔
648

649
        auto &cancelled = cancelled_;
1,737✔
650

651
        if (is_https_) {
1,737✔
652
                http::async_read_some(
×
653
                        *stream_,
×
654
                        response_buffer_,
×
655
                        *http_response_parser_,
×
656
                        [this, cancelled](const error_code &ec, size_t num_read) {
×
657
                                if (!*cancelled) {
×
658
                                        ReadBodyHandler(ec, num_read);
×
659
                                }
660
                        });
×
661
        } else {
662
                http::async_read_some(
3,474✔
663
                        stream_->next_layer(),
1,737✔
664
                        response_buffer_,
1,737✔
665
                        *http_response_parser_,
1,737✔
666
                        [this, cancelled](const error_code &ec, size_t num_read) {
3,474✔
667
                                if (!*cancelled) {
1,737✔
668
                                        ReadBodyHandler(ec, num_read);
1,737✔
669
                                }
670
                        });
1,737✔
671
        }
672
}
673

674
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
1,737✔
675
        if (num_read > 0) {
1,737✔
676
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
1,731✔
677
                response_body_read_ += num_read;
1,731✔
678
        }
679

680
        if (ec == http::make_error_code(http::error::need_buffer)) {
1,737✔
681
                // This can be ignored. We always reset the buffer between reads anyway.
682
                ec = error_code();
979✔
683
        }
684

685
        assert(reader_handler_);
1,737✔
686

687
        if (response_body_read_ >= response_body_length_) {
1,737✔
688
                status_ = TransactionStatus::ReachedEnd;
60✔
689
        }
690

691
        auto cancelled = cancelled_;
1,737✔
692

693
        size_t buf_size = reader_buf_end_ - reader_buf_start_;
1,737✔
694
        size_t smallest = min(num_read, buf_size);
1,737✔
695
        copy_n(body_buffer_.begin(), smallest, reader_buf_start_);
1,737✔
696
        if (ec) {
1,737✔
697
                auto err = error::Error(ec.default_error_condition(), "Could not read body");
6✔
698
                reader_handler_(expected::unexpected(err));
6✔
699
        } else {
700
                reader_handler_(smallest);
1,731✔
701
        }
702

703
        if (!*cancelled && ec) {
1,737✔
704
                CallErrorHandler(ec, request_, body_handler_);
2✔
705
                return;
2✔
706
        }
707
}
708

709
void Client::Cancel() {
39✔
710
        auto cancelled = cancelled_;
78✔
711

712
        if (!*cancelled) {
39✔
713
                auto err =
714
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP request cancelled");
76✔
715
                if (status_ == TransactionStatus::None) {
38✔
716
                        CallErrorHandler(err, request_, header_handler_);
1✔
717
                } else if (status_ != TransactionStatus::Done) {
37✔
718
                        CallErrorHandler(err, request_, body_handler_);
37✔
719
                }
720
        }
721

722
        if (!*cancelled) {
39✔
723
                DoCancel();
×
724
        }
725
}
39✔
726

727
void Client::DoCancel() {
227✔
728
        resolver_.cancel();
227✔
729
        if (stream_) {
227✔
730
                stream_->next_layer().cancel();
5✔
731
                stream_->next_layer().close();
5✔
732
                stream_.reset();
5✔
733
        }
734

735
        request_.reset();
227✔
736
        response_.reset();
227✔
737

738
        // Reset logger to no connection.
739
        logger_ = log::Logger(logger_name_);
227✔
740

741
        // Set cancel state and then make a new one. Those who are interested should have their own
742
        // pointer to the old one.
743
        *cancelled_ = true;
227✔
744
        cancelled_ = make_shared<bool>(true);
227✔
745
}
227✔
746

747
// Default configuration: delegates to the path-taking constructor with an empty certificate path.
ClientConfig::ClientConfig() : ClientConfig("") {
}
59✔
750

751
// Creates a configuration that stores `server_cert_path` in the member of the same name.
ClientConfig::ClientConfig(string server_cert_path) : server_cert_path {server_cert_path} {
}
213✔
754

755
// Nothing to release explicitly; defined out of line in this translation unit.
ClientConfig::~ClientConfig() {
}
213✔
757

758
// No explicit initialization is performed here.
ServerConfig::ServerConfig() {
}
108✔
760

761
// Nothing to release explicitly; defined out of line in this translation unit.
ServerConfig::~ServerConfig() {
}
108✔
763

764
Stream::Stream(Server &server) :
229✔
765
        server_ {server},
766
        logger_ {"http"},
767
        cancelled_(make_shared<bool>(true)),
×
768
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
229✔
769
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
458✔
770
        // This is equivalent to:
771
        //   request_buffer_.reserve(body_buffer_.size());
772
        // but compatible with Boost 1.67.
773
        request_buffer_.prepare(body_buffer_.size() - request_buffer_.size());
229✔
774

775
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
776
        // they do, they should be handled higher up in the application logic.
777
        //
778
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
779
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
780
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
781
        // possible value instead.
782
        http_request_parser_.body_limit(numeric_limits<uint64_t>::max());
229✔
783
}
229✔
784

785
// Closes the socket if it is still open and flags outstanding handlers as cancelled.
Stream::~Stream() {
        DoCancel();
}
229✔
788

789
void Stream::Cancel() {
7✔
790
        auto cancelled = cancelled_;
14✔
791

792
        if (!*cancelled) {
7✔
793
                auto err =
794
                        error::Error(make_error_condition(errc::operation_canceled), "HTTP response cancelled");
14✔
795
                if (status_ == TransactionStatus::None) {
7✔
796
                        CallErrorHandler(err, request_, server_.header_handler_);
×
797
                } else if (status_ != TransactionStatus::Done) {
7✔
798
                        CallErrorHandler(err, request_, server_.body_handler_);
5✔
799
                }
800
        }
801

802
        if (!*cancelled) {
7✔
803
                DoCancel();
2✔
804
        }
805
}
7✔
806

807
void Stream::DoCancel() {
333✔
808
        if (socket_.is_open()) {
333✔
809
                socket_.cancel();
115✔
810
                socket_.close();
115✔
811
        }
812

813
        // Set cancel state and then make a new one. Those who are interested should have their own
814
        // pointer to the old one.
815
        *cancelled_ = true;
333✔
816
        cancelled_ = make_shared<bool>(true);
333✔
817
}
333✔
818

819
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
820
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
×
821
}
×
822

823
void Stream::CallErrorHandler(
×
824
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
825
        *cancelled_ = true;
×
826
        cancelled_ = make_shared<bool>(true);
×
827
        status_ = TransactionStatus::Done;
×
828
        handler(expected::unexpected(err.WithContext(
×
829
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
830

831
        server_.RemoveStream(shared_from_this());
×
832
}
×
833

834
void Stream::CallErrorHandler(
2✔
835
        const error_code &ec, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
836
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
2✔
837
}
2✔
838

839
void Stream::CallErrorHandler(
9✔
840
        const error::Error &err, const IncomingRequestPtr &req, IdentifiedRequestHandler handler) {
841
        *cancelled_ = true;
9✔
842
        cancelled_ = make_shared<bool>(true);
9✔
843
        status_ = TransactionStatus::Done;
9✔
844
        handler(
9✔
845
                req,
846
                err.WithContext(
×
847
                        req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
18✔
848

849
        server_.RemoveStream(shared_from_this());
9✔
850
}
9✔
851

852
void Stream::CallErrorHandler(
4✔
853
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
854
        CallErrorHandler(error::Error(ec.default_error_condition(), ""), req, handler);
4✔
855
}
4✔
856

857
void Stream::CallErrorHandler(
6✔
858
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
859
        *cancelled_ = true;
6✔
860
        cancelled_ = make_shared<bool>(true);
6✔
861
        status_ = TransactionStatus::Done;
6✔
862
        handler(err.WithContext(
6✔
863
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
12✔
864

865
        server_.RemoveStream(shared_from_this());
6✔
866
}
6✔
867

868
void Stream::AcceptHandler(const error_code &ec) {
115✔
869
        if (ec) {
115✔
870
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
871
                return;
×
872
        }
873

874
        auto ip = socket_.remote_endpoint().address().to_string();
230✔
875

876
        // Use IP as context for logging.
877
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
115✔
878

879
        logger_.Debug("Accepted connection.");
115✔
880

881
        request_.reset(new IncomingRequest(*this, cancelled_));
115✔
882

883
        request_->address_.host = ip;
115✔
884

885
        *cancelled_ = false;
115✔
886

887
        ReadHeader();
115✔
888
}
889

890
void Stream::ReadHeader() {
115✔
891
        http_request_parser_.get().body().data = body_buffer_.data();
115✔
892
        http_request_parser_.get().body().size = body_buffer_.size();
115✔
893

894
        auto &cancelled = cancelled_;
115✔
895

896
        http::async_read_some(
230✔
897
                socket_,
115✔
898
                request_buffer_,
115✔
899
                http_request_parser_,
900
                [this, cancelled](const error_code &ec, size_t num_read) {
230✔
901
                        if (!*cancelled) {
115✔
902
                                ReadHeaderHandler(ec, num_read);
115✔
903
                        }
904
                });
115✔
905
}
115✔
906

907
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
115✔
908
        if (num_read > 0) {
115✔
909
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
115✔
910
        }
911

912
        if (ec) {
115✔
913
                CallErrorHandler(ec, request_, server_.header_handler_);
×
914
                return;
71✔
915
        }
916

917
        if (!http_request_parser_.is_header_done()) {
115✔
918
                ReadHeader();
×
919
                return;
×
920
        }
921

922
        auto method_result = BeastVerbToMethod(
923
                http_request_parser_.get().base().method(),
115✔
924
                string {http_request_parser_.get().base().method_string()});
230✔
925
        if (!method_result) {
115✔
926
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
927
                return;
×
928
        }
929
        request_->method_ = method_result.value();
115✔
930
        request_->address_.path = string(http_request_parser_.get().base().target());
115✔
931

932
        logger_ = logger_.WithFields(log::LogField("path", request_->address_.path));
115✔
933

934
        string debug_str;
115✔
935
        for (auto header = http_request_parser_.get().cbegin();
373✔
936
                 header != http_request_parser_.get().cend();
746✔
937
                 header++) {
938
                request_->headers_[string {header->name_string()}] = string {header->value()};
516✔
939
                if (logger_.Level() >= log::LogLevel::Debug) {
258✔
940
                        debug_str += string {header->name_string()};
257✔
941
                        debug_str += ": ";
257✔
942
                        debug_str += string {header->value()};
257✔
943
                        debug_str += "\n";
257✔
944
                }
945
        }
946

947
        logger_.Debug("Received headers:\n" + debug_str);
115✔
948
        debug_str.clear();
115✔
949

950
        if (http_request_parser_.chunked()) {
115✔
951
                auto cancelled = cancelled_;
1✔
952
                status_ = TransactionStatus::HeaderHandlerCalled;
1✔
953
                server_.header_handler_(request_);
1✔
954
                if (!*cancelled) {
1✔
955
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
956
                        CallErrorHandler(err, request_, server_.body_handler_);
1✔
957
                }
958
                return;
1✔
959
        }
960

961
        auto content_length = http_request_parser_.content_length();
114✔
962
        if (content_length) {
114✔
963
                request_body_length_ = content_length.value();
45✔
964
        } else {
965
                request_body_length_ = 0;
69✔
966
        }
967
        request_body_read_ = 0;
114✔
968

969
        if (request_body_read_ >= request_body_length_) {
114✔
970
                auto cancelled = cancelled_;
69✔
971
                status_ = TransactionStatus::HeaderHandlerCalled;
69✔
972
                server_.header_handler_(request_);
69✔
973
                if (!*cancelled) {
69✔
974
                        CallBodyHandler();
69✔
975
                }
976
                return;
69✔
977
        }
978

979
        auto cancelled = cancelled_;
45✔
980
        status_ = TransactionStatus::HeaderHandlerCalled;
45✔
981
        server_.header_handler_(request_);
45✔
982
        if (*cancelled) {
45✔
983
                return;
1✔
984
        }
985

986
        // We know that a body reader is required here, because of the `request_body_read_ >=
987
        // request_body_length_` check above.
988
        if (status_ == TransactionStatus::HeaderHandlerCalled) {
44✔
989
                CallErrorHandler(MakeError(BodyIgnoredError, ""), request_, server_.body_handler_);
1✔
990
        }
991
}
992

993
// Reads the next part of the request body into [`start`, `end`) and reports the result through
// `handler`. When reading is no longer in progress, `handler` is called with 0 instead, and the
// body handler is invoked if the end of the body had been reached.
void Stream::AsyncReadNextBodyPart(
        vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
        assert(AtLeast(status_, TransactionStatus::ReaderCreated));

        // The first read moves the transaction into the "reading" state.
        if (status_ == TransactionStatus::ReaderCreated) {
                status_ = TransactionStatus::BodyReadingInProgress;
        }

        if (status_ != TransactionStatus::BodyReadingInProgress) {
                auto cancel_flag = cancelled_;
                handler(0);
                const bool finished = !*cancel_flag && status_ == TransactionStatus::ReachedEnd;
                if (finished) {
                        status_ = TransactionStatus::Done;
                        CallBodyHandler();
                }
                return;
        }

        // Remember where to deliver the data once the read completes.
        reader_buf_start_ = start;
        reader_buf_end_ = end;
        reader_handler_ = handler;

        // Read at most as much as fits in both our buffer and the caller's buffer.
        size_t requested = end - start;
        size_t chunk_size = min(body_buffer_.size(), requested);

        auto &body = http_request_parser_.get().body();
        body.data = body_buffer_.data();
        body.size = chunk_size;

        auto cancel_flag = cancelled_;

        http::async_read_some(
                socket_,
                request_buffer_,
                http_request_parser_,
                [this, cancel_flag](const error_code &ec, size_t num_read) {
                        if (*cancel_flag) {
                                return;
                        }
                        ReadBodyHandler(ec, num_read);
                });
}
1032

1033
// Completion handler for one body read on the server side. Accounts for the bytes that were read,
// copies them into the buffer supplied by the reader, and reports the outcome to the pending
// reader handler.
void Stream::ReadBodyHandler(error_code ec, size_t num_read) {
        if (num_read > 0) {
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
                request_body_read_ += num_read;
        }

        // `need_buffer` is not treated as a failure: the buffer is reset between reads anyway.
        if (ec == http::make_error_code(http::error::need_buffer)) {
                ec.clear();
        }

        assert(reader_handler_);

        if (request_body_read_ >= request_body_length_) {
                status_ = TransactionStatus::ReachedEnd;
        }

        // Hold our own reference to the flag, so it can still be inspected after the reader
        // handler has run.
        auto cancel_flag = cancelled_;

        // Never copy more than the reader's buffer can hold.
        size_t capacity = reader_buf_end_ - reader_buf_start_;
        size_t to_copy = min(num_read, capacity);
        copy_n(body_buffer_.begin(), to_copy, reader_buf_start_);

        if (!ec) {
                reader_handler_(to_copy);
                return;
        }

        auto err = error::Error(ec.default_error_condition(), "Could not read body");
        reader_handler_(expected::unexpected(err));

        if (!*cancel_flag) {
                CallErrorHandler(ec, request_, server_.body_handler_);
        }
}
1067

1068
// Starts sending the response that was previously created for this stream. Builds the Beast
// response from the stored headers and status, then writes the header asynchronously;
// `WriteHeaderHandler` continues from there.
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
        // Only existing responses call this, so locking must always succeed.
        auto locked_response = maybe_response_.lock();
        assert(locked_response);

        // The stream shares ownership of the response from now on.
        response_ = locked_response;
        reply_finished_handler_ = reply_finished_handler;

        http_response_ = make_shared<http::response<http::buffer_body>>();

        auto &beast_header = http_response_->base();
        for (const auto &name_and_value : locked_response->headers_) {
                beast_header.set(name_and_value.first, name_and_value.second);
        }

        http_response_->result(locked_response->GetStatusCode());
        http_response_->reason(locked_response->GetStatusMessage());

        http_response_serializer_ =
                make_shared<http::response_serializer<http::buffer_body>>(*http_response_);

        auto cancel_flag = cancelled_;

        http::async_write_header(
                socket_,
                *http_response_serializer_,
                [this, cancel_flag](const error_code &ec, size_t num_written) {
                        if (*cancel_flag) {
                                return;
                        }
                        WriteHeaderHandler(ec, num_written);
                });
}
102✔
1101

1102
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
101✔
1103
        if (num_written > 0) {
101✔
1104
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
101✔
1105
        }
1106

1107
        if (ec) {
101✔
1108
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
1109
                return;
33✔
1110
        }
1111

1112
        auto header = response_->GetHeader("Content-Length");
202✔
1113
        if (!header || header.value() == "0") {
101✔
1114
                FinishReply();
32✔
1115
                return;
32✔
1116
        }
1117

1118
        auto length = common::StringToLongLong(header.value());
69✔
1119
        if (!length || length.value() < 0) {
69✔
1120
                auto err = error::Error(
1121
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
1122
                CallErrorHandler(err, request_, reply_finished_handler_);
×
1123
                return;
×
1124
        }
1125

1126
        if (!response_->body_reader_ && !response_->async_body_reader_) {
69✔
1127
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
1128
                CallErrorHandler(err, request_, reply_finished_handler_);
1✔
1129
                return;
1✔
1130
        }
1131

1132
        PrepareAndWriteNewBodyBuffer();
68✔
1133
}
1134

1135
void Stream::PrepareAndWriteNewBodyBuffer() {
835✔
1136
        // response_->body_reader_ XOR response_->async_body_reader_
1137
        assert(
835✔
1138
                (response_->body_reader_ || response_->async_body_reader_)
1139
                && !(response_->body_reader_ && response_->async_body_reader_));
1140

1141
        auto read_handler = [this](io::ExpectedSize read) {
1,671✔
1142
                if (!read) {
835✔
1143
                        CallErrorHandler(read.error(), request_, reply_finished_handler_);
1✔
1144
                        return;
1✔
1145
                }
1146
                WriteNewBodyBuffer(read.value());
834✔
1147
        };
835✔
1148

1149
        if (response_->body_reader_) {
835✔
1150
                read_handler(response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end()));
561✔
1151
        } else {
1152
                auto err = response_->async_body_reader_->AsyncRead(
274✔
1153
                        body_buffer_.begin(), body_buffer_.end(), read_handler);
548✔
1154
                if (err != error::NoError) {
274✔
1155
                        CallErrorHandler(err, request_, reply_finished_handler_);
×
1156
                }
1157
        }
1158
}
835✔
1159

1160
void Stream::WriteNewBodyBuffer(size_t size) {
834✔
1161
        http_response_->body().data = body_buffer_.data();
834✔
1162
        http_response_->body().size = size;
834✔
1163

1164
        if (size > 0) {
834✔
1165
                http_response_->body().more = true;
775✔
1166
        } else {
1167
                // Release ownership of Body reader.
1168
                response_->body_reader_.reset();
59✔
1169
                response_->async_body_reader_.reset();
59✔
1170
                http_response_->body().more = false;
59✔
1171
        }
1172

1173
        WriteBody();
834✔
1174
}
834✔
1175

1176
void Stream::WriteBody() {
1,605✔
1177
        auto &cancelled = cancelled_;
1,605✔
1178

1179
        http::async_write_some(
3,210✔
1180
                socket_,
1,605✔
1181
                *http_response_serializer_,
1,605✔
1182
                [this, cancelled](const error_code &ec, size_t num_written) {
3,183✔
1183
                        if (!*cancelled) {
1,592✔
1184
                                WriteBodyHandler(ec, num_written);
1,591✔
1185
                        }
1186
                });
1,592✔
1187
}
1,605✔
1188

1189
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
1,591✔
1190
        if (num_written > 0) {
1,591✔
1191
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
771✔
1192
        }
1193

1194
        if (ec == http::make_error_code(http::error::need_buffer)) {
1,591✔
1195
                // Write next body block.
1196
                PrepareAndWriteNewBodyBuffer();
767✔
1197
        } else if (ec) {
824✔
1198
                CallErrorHandler(ec, request_, reply_finished_handler_);
4✔
1199
        } else if (num_written > 0) {
820✔
1200
                // We are still writing the body.
1201
                WriteBody();
771✔
1202
        } else {
1203
                // We are finished.
1204
                FinishReply();
49✔
1205
        }
1206
}
1,591✔
1207

1208
void Stream::FinishReply() {
81✔
1209
        // We are done.
1210
        *cancelled_ = true;
81✔
1211
        cancelled_ = make_shared<bool>(true);
81✔
1212
        reply_finished_handler_(error::NoError);
81✔
1213
        server_.RemoveStream(shared_from_this());
81✔
1214
}
81✔
1215

1216
void Stream::CallBodyHandler() {
106✔
1217
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1218
        // it immediately destroys, which would destroy this stream as well. At the end of this
1219
        // function, it's ok to destroy it.
1220
        auto stream_ref = shared_from_this();
212✔
1221

1222
        server_.body_handler_(request_, error::NoError);
106✔
1223

1224
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1225
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1226
        // request has not been handled correctly.
1227
        auto response = maybe_response_.lock();
212✔
1228
        if (!response) {
106✔
1229
                logger_.Error("Handler produced no response. Closing stream prematurely.");
2✔
1230
                *cancelled_ = true;
2✔
1231
                cancelled_ = make_shared<bool>(true);
2✔
1232
                server_.RemoveStream(shared_from_this());
2✔
1233
        }
1234
}
106✔
1235

1236
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
25✔
1237
        event_loop_ {event_loop},
1238
        acceptor_(GetAsioIoContext(event_loop_)) {
25✔
1239
}
25✔
1240

1241
// Stops listening and drops all streams.
Server::~Server() {
        Cancel();
}
25✔
1244

1245
// Variant of AsyncServeUrl() taking a plain `RequestHandler` as body handler. It is adapted to
// the identified-handler form: errors reach `body_handler` as an unexpected value, successes as
// the request itself.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
        IdentifiedRequestHandler adapter = [body_handler](IncomingRequestPtr req, error::Error err) {
                if (err != error::NoError) {
                        body_handler(expected::unexpected(err));
                        return;
                }
                body_handler(req);
        };

        return AsyncServeUrl(url, header_handler, adapter);
}
1256

1257
// Starts listening on `url` and serving requests through the given handlers.
//
// Only the `http` protocol is accepted, the URL must not carry a path other than "/", and the
// host must be a literal address. Returns `error::NoError` on success, otherwise an error that
// describes which step failed. If a step fails after the acceptor was opened by this call, the
// acceptor is closed again, so that a later call can start from a clean state.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, IdentifiedRequestHandler body_handler) {
        auto err = BreakDownUrl(url, address_);
        if (error::NoError != err) {
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + err.String());
        }

        if (address_.protocol != "http") {
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
        }

        if (address_.path.size() > 0 && address_.path != "/") {
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
        }

        boost::system::error_code ec;
        auto address = asio::ip::make_address(address_.host, ec);
        if (ec) {
                return error::Error(
                        ec.default_error_condition(),
                        "Could not construct endpoint from address " + address_.host);
        }

        asio::ip::tcp::endpoint endpoint(address, address_.port);

        ec.clear();
        acceptor_.open(endpoint.protocol(), ec);
        if (ec) {
                // Nothing to clean up: either the acceptor could not be opened, or it was already
                // open from an earlier call and must be left alone.
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
        }

        // From here on the acceptor was opened by this call. On failure it is closed again;
        // otherwise it would stay open and make every retry fail at `open()`.
        auto fail = [this](const boost::system::error_code &failure, const string &msg) -> error::Error {
                boost::system::error_code ignored;
                acceptor_.close(ignored);
                return error::Error(failure.default_error_condition(), msg);
        };

        // Allow address reuse, otherwise we can't re-bind later.
        ec.clear();
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
        if (ec) {
                return fail(ec, "Could not set socket options");
        }

        ec.clear();
        acceptor_.bind(endpoint, ec);
        if (ec) {
                return fail(ec, "Could not bind socket");
        }

        ec.clear();
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
        if (ec) {
                return fail(ec, "Could not start listening");
        }

        header_handler_ = header_handler;
        body_handler_ = body_handler;

        PrepareNewStream();

        return error::NoError;
}
1314

1315
void Server::Cancel() {
122✔
1316
        if (acceptor_.is_open()) {
122✔
1317
                acceptor_.cancel();
114✔
1318
                acceptor_.close();
114✔
1319
        }
1320
        streams_.clear();
122✔
1321
}
122✔
1322

1323
uint16_t Server::GetPort() const {
7✔
1324
        return acceptor_.local_endpoint().port();
7✔
1325
}
1326

1327
// Returns an `http://127.0.0.1:<port>` URL built from the port reported by GetPort().
string Server::GetUrl() const {
        const auto port = to_string(GetPort());
        return "http://127.0.0.1:" + port;
}
1330

1331
// Creates the response object for `req` and registers it (weakly) with the request's stream.
// Fails with `StreamCancelledError` if the request's stream has been cancelled.
ExpectedOutgoingResponsePtr Server::MakeResponse(IncomingRequestPtr req) {
        const bool stream_gone = *req->cancelled_;
        if (stream_gone) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make response"));
        }

        // The response shares the cancel flag with the request.
        OutgoingResponsePtr new_response(new OutgoingResponse(req->stream_, req->cancelled_));
        req->stream_.maybe_response_ = new_response;
        return new_response;
}
1339

1340
// Starts sending `resp` on its stream; `reply_finished_handler` is passed on to the stream.
// Fails with `StreamCancelledError` if the stream has been cancelled.
error::Error Server::AsyncReply(
        OutgoingResponsePtr resp, ReplyFinishedHandler reply_finished_handler) {
        const bool stream_gone = *resp->cancelled_;
        if (stream_gone) {
                return MakeError(StreamCancelledError, "Cannot send response");
        }

        auto &stream = resp->stream_;
        stream.AsyncReply(reply_finished_handler);
        return error::NoError;
}
1349

1350
// Creates an asynchronous reader for the body of `req`. This is only possible while the stream is
// in the `HeaderHandlerCalled` state and the request announces a non-empty body; otherwise an
// error is returned.
io::ExpectedAsyncReaderPtr Server::MakeBodyAsyncReader(IncomingRequestPtr req) {
        const bool stream_gone = *req->cancelled_;
        if (stream_gone) {
                return expected::unexpected(MakeError(StreamCancelledError, "Cannot make body reader"));
        }

        auto &stream = req->stream_;

        const bool right_state = stream.status_ == TransactionStatus::HeaderHandlerCalled;
        if (!right_state) {
                return expected::unexpected(error::Error(
                        make_error_condition(errc::operation_in_progress),
                        "MakeBodyAsyncReader called while reading is in progress"));
        }

        const bool has_body = stream.request_body_length_ != 0;
        if (!has_body) {
                return expected::unexpected(MakeError(BodyMissingError, "Request does not contain a body"));
        }

        stream.status_ = TransactionStatus::ReaderCreated;
        return make_shared<BodyAsyncReader<Stream>>(stream, req->cancelled_);
}
1369

1370
void Server::PrepareNewStream() {
229✔
1371
        StreamPtr new_stream {new Stream(*this)};
229✔
1372
        streams_.insert(new_stream);
229✔
1373
        AsyncAccept(new_stream);
229✔
1374
}
229✔
1375

1376
// Waits asynchronously for an incoming connection on `stream`'s socket. On success the stream
// takes over the connection and a new stream is prepared for the next one; on failure the error
// is logged.
void Server::AsyncAccept(StreamPtr stream) {
        auto on_accept = [this, stream](const error_code &ec) {
                if (!ec) {
                        stream->AcceptHandler(ec);
                        this->PrepareNewStream();
                        return;
                }

                log::Error("Could not accept connection: " + ec.message());
        };

        acceptor_.async_accept(stream->socket_, on_accept);
}
229✔
1388

1389
void Server::RemoveStream(const StreamPtr &stream) {
102✔
1390
        streams_.erase(stream);
102✔
1391

1392
        // Work around bug in Boost ASIO: When the handler for `async_read_some` is called with `ec
1393
        // == operation_aborted`, the handler should not access any supplied buffers, because it may
1394
        // be aborted due to object destruction. However, it does access buffers. This means it does
1395
        // not help to call `Cancel()` prior to destruction. We need to call `Cancel()` first, and
1396
        // then wait until the handler which receives `operation_aborted` has run. So do a
1397
        // `Cancel()` followed by `Post()` for this, which should queue us in the correct order:
1398
        // `operation_aborted` -> `Post` handler.
1399
        stream->DoCancel();
102✔
1400
        event_loop_.Post([stream]() {
102✔
1401
                // No-op, just keep `stream` alive until we get back to this handler.
1402
        });
204✔
1403
}
102✔
1404

1405
} // namespace http
1406
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc