• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 978727395

24 Aug 2023 11:26AM UTC coverage: 79.085% (+0.2%) from 78.84%
978727395

push

gitlab-ci

lluiscampos
feat: Implement `http::DownloadResumer`

Implement class to download the Artifact, which will react to server
disconnections or other sorts of short read by scheduling new HTTP
requests with `Range` header.

See https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests for
an introduction to the feature, and read the specification for more
details.

The user calls _once_ `AsyncCall` with the header and body handlers, and
`DownloadResumer` will call back these handlers _once_ (each). The data
is passed to the user at operation completion.

The validation of the `Content-Range` header and the cases for the unit
tests are heavily inspired by the legacy client. See:
* https://github.com/mendersoftware/mender/blob/d9010526d35d3ac861ea1e4210d36c2fef748ef8/client/update_resumer.go#L113
* https://github.com/mendersoftware/mender/blob/d9010526d35d3ac861ea1e4210d36c2fef748ef8/client/update_resumer_test.go#L197

Ticket: MEN-6498
Changelog: None

Signed-off-by: Lluis Campos <lluis.campos@northern.tech>

231 of 231 new or added lines in 3 files covered. (100.0%)

5706 of 7215 relevant lines covered (79.09%)

278.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.31
/common/http/platform/beast/http.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <common/http.hpp>
16

17
#include <algorithm>
18

19
#include <boost/asio/ip/tcp.hpp>
20
#include <boost/asio/ssl/verify_mode.hpp>
21
#include <boost/asio.hpp>
22

23
#include <common/common.hpp>
24

25
namespace mender {
26
namespace http {
27

28
namespace common = mender::common;
29

30
// At the time of writing, Beast only supports HTTP/1.1, and is unlikely to support HTTP/2
31
// according to this discussion: https://github.com/boostorg/beast/issues/1302.
32
const unsigned int BeastHttpVersion = 11;
33

34
namespace asio = boost::asio;
35
namespace http = boost::beast::http;
36

37
const int HTTP_BEAST_BUFFER_SIZE = MENDER_BUFSIZE;
38

39
// Translates a mender `Method` into the corresponding Beast verb.
//
// `Method::Invalid` has no Beast counterpart: it hits the assert below, and when asserts are
// compiled out the function falls back to returning GET.
static http::verb MethodToBeastVerb(Method method) {
	using verb = http::verb;

	// Intentionally no "default" label, so that the compiler can warn about unhandled
	// enumerators if new methods are ever added.
	switch (method) {
	case Method::GET:
		return verb::get;
	case Method::HEAD:
		return verb::head;
	case Method::POST:
		return verb::post;
	case Method::PUT:
		return verb::put;
	case Method::PATCH:
		return verb::patch;
	case Method::CONNECT:
		return verb::connect;
	case Method::Invalid:
		// Nothing to map to; handled after the switch.
		break;
	}

	// Only reached for `Invalid` (or a value outside the enum). Assert for safety.
	assert(false);
	return verb::get;
}
62

63
// Translates a Beast verb into a mender `Method`.
//
// Verbs without a `Method` counterpart yield an `UnsupportedMethodError` carrying
// `verb_string` as its message.
static expected::expected<Method, error::Error> BeastVerbToMethod(
	http::verb verb, const string &verb_string) {
	switch (verb) {
	case http::verb::get:
		return Method::GET;
	case http::verb::head:
		return Method::HEAD;
	case http::verb::post:
		return Method::POST;
	case http::verb::put:
		return Method::PUT;
	case http::verb::patch:
		return Method::PATCH;
	case http::verb::connect:
		return Method::CONNECT;
	default:
		// Every other Beast verb is unsupported; reported below.
		break;
	}

	return expected::unexpected(MakeError(UnsupportedMethodError, verb_string));
}
82

83
// Sends this response through the stream it belongs to.
//
// Returns `StreamCancelledError` if the stream no longer exists; otherwise forwards the
// handler to the stream, records that a reply has been issued and returns `NoError`.
error::Error OutgoingResponse::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
	if (auto live_stream = stream_.lock()) {
		live_stream->AsyncReply(reply_finished_handler);
		has_replied_ = true;
		return error::NoError;
	}

	return MakeError(StreamCancelledError, "Cannot send response");
}
93

94
Client::Client(
52✔
95
        const ClientConfig &client, events::EventLoop &event_loop, const string &logger_name) :
175✔
96
        event_loop_ {event_loop},
97
        logger_name_ {logger_name},
98
        cancelled_ {make_shared<bool>(false)},
×
99
        resolver_(GetAsioIoContext(event_loop)),
100
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
104✔
101
        // This is equivalent to:
102
        //   response_buffer_.reserve(body_buffer_.size());
103
        // but compatible with Boost 1.67.
104
        response_buffer_.prepare(body_buffer_.size() - response_buffer_.size());
52✔
105

106
        ssl_ctx_.set_verify_mode(ssl::verify_peer);
52✔
107

108
        beast::error_code ec {};
52✔
109
        ssl_ctx_.set_default_verify_paths(ec); // Load the default CAs
52✔
110
        if (ec) {
52✔
111
                log::Error("Failed to load the SSL default directory");
×
112
        }
113
        if (client.server_cert_path != "") {
52✔
114
                ssl_ctx_.load_verify_file(client.server_cert_path, ec);
×
115
                if (ec) {
×
116
                        log::Error("Failed to load the server certificate!");
×
117
                }
118
        }
119
}
52✔
120

121
// Tears the client down. A request that is still in flight is reported with a warning, and
// `Cancel()` is always invoked.
Client::~Client() {
	const bool request_in_flight = static_cast<bool>(client_active_);
	if (request_in_flight) {
		logger_.Warning("Client destroyed while request is still active!");
	}

	Cancel();
}
52✔
127

128
// Starts an asynchronous HTTP or HTTPS request.
//
// The request is validated first; on validation failure an error is returned and nothing is
// scheduled. Otherwise the handlers are stored, the client is marked active and host name
// resolution is started; the remaining steps run from the resolver's completion handler.
//
// Returns:
//  - `operation_in_progress` if a call is already ongoing,
//  - `ProgrammingError` if the request is null or incomplete, or if a handler is missing,
//  - `protocol_not_supported` for anything other than "http" / "https",
//  - `NoError` once resolution has been scheduled.
error::Error Client::AsyncCall(
	OutgoingRequestPtr req, ResponseHandler header_handler, ResponseHandler body_handler) {
	if (client_active_) {
		return error::Error(
			make_error_condition(errc::operation_in_progress), "HTTP call already ongoing");
	}

	// A null request is treated like an incomplete one instead of being dereferenced.
	if (!req || req->address_.protocol == "" || req->address_.host == ""
		|| req->address_.port < 0) {
		return error::MakeError(error::ProgrammingError, "Request is not ready");
	}

	if (!header_handler || !body_handler) {
		return error::MakeError(
			error::ProgrammingError, "header_handler and body_handler can not be nullptr");
	}

	if (req->address_.protocol != "http" && req->address_.protocol != "https") {
		return error::Error(
			make_error_condition(errc::protocol_not_supported), req->address_.protocol);
	}

	// Recompute on every call: the flag must not stay set from an earlier "https" request
	// when this client is reused for a plain "http" one.
	is_https_ = (req->address_.protocol == "https");

	logger_ = log::Logger(logger_name_).WithFields(log::LogField("url", req->orig_address_));

	// NOTE: The AWS loadbalancer requires that the HOST header always be set, in order for the
	// request to route to our k8s cluster. Set this in all cases.
	req->SetHeader("HOST", req->address_.host);

	request_ = req;
	header_handler_ = header_handler;
	body_handler_ = body_handler;
	ignored_body_message_issued_ = false;

	// See comment in header. The deleter is a no-op, so `client_active_` never deletes
	// `this`; the completion handlers below only hold weak references to it.
	client_active_.reset(this, [](Client *) {});

	weak_ptr<Client> weak_client(client_active_);

	resolver_.async_resolve(
		request_->address_.host,
		to_string(request_->address_.port),
		[weak_client](const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
			auto client = weak_client.lock();
			if (client) {
				client->ResolveHandler(ec, results);
			}
		});

	return error::NoError;
}
181

182
// Invokes `handler` with the current response.
//
// The handler is deliberately taken by value: the copy in the argument list keeps it alive
// for the duration of the call. This matters when the handler owns the client instance
// through a capture and replaces itself (via `AsyncCall`) while running; without the copy,
// that replacement would destroy the last copy of the handler, and with it the client.
void Client::CallHandler(ResponseHandler handler) {
	handler(response_);
}
263✔
190

191
void Client::CallErrorHandler(
86✔
192
        const error_code &ec, const OutgoingRequestPtr &req, ResponseHandler handler) {
193
        client_active_.reset();
86✔
194
        stream_.reset();
86✔
195
        handler(expected::unexpected(error::Error(
172✔
196
                ec.default_error_condition(), MethodToString(req->method_) + " " + req->orig_address_)));
258✔
197
}
86✔
198

199
void Client::CallErrorHandler(
2✔
200
        const error::Error &err, const OutgoingRequestPtr &req, ResponseHandler handler) {
201
        client_active_.reset();
2✔
202
        stream_.reset();
2✔
203
        handler(expected::unexpected(error::Error(
4✔
204
                err.code, err.message + ": " + MethodToString(req->method_) + " " + req->orig_address_)));
6✔
205
}
2✔
206

207
void Client::ResolveHandler(
186✔
208
        const error_code &ec, const asio::ip::tcp::resolver::results_type &results) {
209
        if (ec) {
186✔
210
                CallErrorHandler(ec, request_, header_handler_);
×
211
                return;
×
212
        }
213

214
        if (logger_.Level() >= log::LogLevel::Debug) {
186✔
215
                string ips = "[";
370✔
216
                string sep;
185✔
217
                for (auto r : results) {
381✔
218
                        ips += sep;
196✔
219
                        ips += r.endpoint().address().to_string();
196✔
220
                        sep = ", ";
196✔
221
                }
222
                ips += "]";
185✔
223
                logger_.Debug("Hostname " + request_->address_.host + " resolved to " + ips);
185✔
224
        }
225

226
        resolver_results_ = results;
186✔
227

228
        stream_ = make_shared<ssl::stream<tcp::socket>>(GetAsioIoContext(event_loop_), ssl_ctx_);
186✔
229

230
        http_response_parser_ = make_shared<http::response_parser<http::buffer_body>>();
186✔
231

232
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and
233
        // if they do, they should be handled higher up in the application logic.
234
        //
235
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to
236
        // pass an uninitialized `optional` to mean unlimited, but they do not check for
237
        // `has_value()` in their code, causing their subsequent comparison operation to
238
        // misbehave. So pass highest possible value instead.
239
        http_response_parser_->body_limit(numeric_limits<uint64_t>::max());
186✔
240

241
        weak_ptr<Client> weak_client(client_active_);
186✔
242

243
        asio::async_connect(
372✔
244
                stream_->next_layer(),
186✔
245
                resolver_results_,
186✔
246
                [weak_client](const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
186✔
247
                        auto client = weak_client.lock();
186✔
248
                        if (client) {
186✔
249
                                if (client->is_https_) {
186✔
250
                                        return client->HandshakeHandler(ec, endpoint);
3✔
251
                                }
252
                                return client->ConnectHandler(ec, endpoint);
183✔
253
                        }
254
                });
255
}
256

257
// Completion handler for the TCP connect when the request uses HTTPS.
//
// Reports connect errors to the header handler. Otherwise sets the SNI host name, starts
// the SSL handshake, and continues with `ConnectHandler` once the handshake has succeeded.
void Client::HandshakeHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
	if (ec) {
		CallErrorHandler(ec, request_, header_handler_);
		return;
	}

	// Set SNI Hostname (many hosts need this to handshake successfully)
	const auto sni_ok =
		SSL_set_tlsext_host_name(stream_->native_handle(), request_->address_.host.c_str());
	if (!sni_ok) {
		// Logged only; the handshake is still attempted.
		beast::error_code sni_error {
			static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()};
		logger_.Error("Failed to set SNI host name: " + sni_error.message());
	}

	weak_ptr<Client> weak_client(client_active_);

	stream_->async_handshake(
		ssl::stream_base::client, [weak_client, endpoint](const error_code &handshake_ec) {
			auto client = weak_client.lock();
			if (!client) {
				return;
			}

			if (handshake_ec) {
				client->logger_.Error(
					"https: Failed to perform the SSL handshake: " + handshake_ec.message());
				client->CallErrorHandler(handshake_ec, client->request_, client->header_handler_);
				return;
			}

			client->logger_.Debug("https: Successful SSL handshake");
			client->ConnectHandler(handshake_ec, endpoint);
		});
}
288

289

290
// Runs once the connection (and, for HTTPS, the handshake) is established.
//
// Builds the Beast request from the stored request (method, path and headers), creates its
// serializer and starts writing the request header to the stream.
void Client::ConnectHandler(const error_code &ec, const asio::ip::tcp::endpoint &endpoint) {
	if (ec) {
		CallErrorHandler(ec, request_, header_handler_);
		return;
	}

	logger_.Debug("Connected to " + endpoint.address().to_string());

	http_request_ = make_shared<http::request<http::buffer_body>>(
		MethodToBeastVerb(request_->method_), request_->address_.path, BeastHttpVersion);

	for (const auto &name_and_value : request_->headers_) {
		http_request_->set(name_and_value.first, name_and_value.second);
	}

	http_request_serializer_ =
		make_shared<http::request_serializer<http::buffer_body>>(*http_request_);

	weak_ptr<Client> weak_client(client_active_);

	// Same completion logic for both transports; it only acts if the client is still active.
	auto on_header_written = [weak_client](const error_code &ec, size_t num_written) {
		if (auto client = weak_client.lock()) {
			client->WriteHeaderHandler(ec, num_written);
		}
	};

	if (is_https_) {
		http::async_write_header(*stream_, *http_request_serializer_, on_header_written);
	} else {
		// Plain HTTP bypasses the SSL layer and writes to the underlying socket.
		http::async_write_header(
			stream_->next_layer(), *http_request_serializer_, on_header_written);
	}
}
332

333
void Client::WriteHeaderHandler(const error_code &ec, size_t num_written) {
174✔
334
        if (num_written > 0) {
174✔
335
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
174✔
336
        }
337

338
        if (ec) {
174✔
339
                CallErrorHandler(ec, request_, header_handler_);
×
340
                return;
143✔
341
        }
342

343
        auto header = request_->GetHeader("Content-Length");
348✔
344
        if (!header || header.value() == "0") {
174✔
345
                ReadHeader();
142✔
346
                return;
142✔
347
        }
348

349
        auto length = common::StringToLongLong(header.value());
32✔
350
        if (!length || length.value() < 0) {
32✔
351
                auto err = error::Error(
352
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
353
                CallErrorHandler(err, request_, header_handler_);
×
354
                return;
×
355
        }
356
        request_body_length_ = length.value();
32✔
357

358
        if (!request_->body_gen_) {
32✔
359
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
360
                CallErrorHandler(err, request_, header_handler_);
1✔
361
                return;
1✔
362
        }
363

364
        auto body_reader = request_->body_gen_();
31✔
365
        if (!body_reader) {
31✔
366
                CallErrorHandler(body_reader.error(), request_, header_handler_);
×
367
                return;
×
368
        }
369
        request_->body_reader_ = body_reader.value();
31✔
370

371
        PrepareBufferAndWriteBody();
31✔
372
}
373

374
void Client::WriteBodyHandler(const error_code &ec, size_t num_written) {
407✔
375
        if (num_written > 0) {
407✔
376
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
188✔
377
        }
378

379
        if (ec == http::make_error_code(http::error::need_buffer)) {
407✔
380
                // Write next block of the body.
381
                PrepareBufferAndWriteBody();
188✔
382
        } else if (ec) {
219✔
383
                CallErrorHandler(ec, request_, header_handler_);
1✔
384
        } else if (num_written > 0) {
218✔
385
                // We are still writing the body.
386
                WriteBody();
188✔
387
        } else {
388
                // We are ready to receive the response.
389
                ReadHeader();
30✔
390
        }
391
}
407✔
392

393
void Client::PrepareBufferAndWriteBody() {
219✔
394
        auto read = request_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end());
219✔
395
        if (!read) {
219✔
396
                CallErrorHandler(read.error(), request_, header_handler_);
×
397
                return;
×
398
        }
399

400
        http_request_->body().data = body_buffer_.data();
219✔
401
        http_request_->body().size = read.value();
219✔
402

403
        if (read.value() > 0) {
219✔
404
                http_request_->body().more = true;
189✔
405
        } else {
406
                // Release ownership of Body reader.
407
                request_->body_reader_.reset();
30✔
408
                http_request_->body().more = false;
30✔
409
        }
410

411
        WriteBody();
219✔
412
}
413

414
void Client::WriteBody() {
407✔
415
        weak_ptr<Client> weak_client(client_active_);
814✔
416

417
        if (is_https_) {
407✔
418
                http::async_write_some(
×
419
                        *stream_,
×
420
                        *http_request_serializer_,
×
421
                        [weak_client](const error_code &ec, size_t num_written) {
×
422
                                auto client = weak_client.lock();
×
423
                                if (client) {
×
424
                                        client->WriteBodyHandler(ec, num_written);
×
425
                                }
426
                        });
×
427
        } else {
428
                http::async_write_some(
814✔
429
                        stream_->next_layer(),
407✔
430
                        *http_request_serializer_,
407✔
431
                        [weak_client](const error_code &ec, size_t num_written) {
407✔
432
                                auto client = weak_client.lock();
814✔
433
                                if (client) {
407✔
434
                                        client->WriteBodyHandler(ec, num_written);
407✔
435
                                }
436
                        });
407✔
437
        }
438
}
407✔
439

440
void Client::ReadHeader() {
172✔
441
        http_response_parser_->get().body().data = body_buffer_.data();
172✔
442
        http_response_parser_->get().body().size = body_buffer_.size();
172✔
443

444
        weak_ptr<Client> weak_client(client_active_);
344✔
445

446
        if (is_https_) {
172✔
447
                http::async_read_some(
4✔
448
                        *stream_,
2✔
449
                        response_buffer_,
2✔
450
                        *http_response_parser_,
2✔
451
                        [weak_client](const error_code &ec, size_t num_read) {
2✔
452
                                auto client = weak_client.lock();
4✔
453
                                if (client) {
2✔
454
                                        client->ReadHeaderHandler(ec, num_read);
2✔
455
                                }
456
                        });
2✔
457
        } else {
458
                http::async_read_some(
340✔
459
                        stream_->next_layer(),
170✔
460
                        response_buffer_,
170✔
461
                        *http_response_parser_,
170✔
462
                        [weak_client](const error_code &ec, size_t num_read) {
169✔
463
                                auto client = weak_client.lock();
338✔
464
                                if (client) {
169✔
465
                                        client->ReadHeaderHandler(ec, num_read);
169✔
466
                                }
467
                        });
169✔
468
        }
469
}
172✔
470

471
void Client::ReadHeaderHandler(const error_code &ec, size_t num_read) {
171✔
472
        if (num_read > 0) {
171✔
473
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
168✔
474
        }
475

476
        if (ec) {
171✔
477
                CallErrorHandler(ec, request_, header_handler_);
3✔
478
                return;
35✔
479
        }
480

481
        if (!http_response_parser_->is_header_done()) {
168✔
482
                ReadHeader();
×
483
                return;
×
484
        }
485

486
        response_.reset(new IncomingResponse(client_active_));
168✔
487
        response_->status_code_ = http_response_parser_->get().result_int();
168✔
488
        response_->status_message_ = string {http_response_parser_->get().reason()};
168✔
489

490
        string debug_str;
168✔
491
        for (auto header = http_response_parser_->get().cbegin();
386✔
492
                 header != http_response_parser_->get().cend();
772✔
493
                 header++) {
494
                response_->headers_[string {header->name_string()}] = string {header->value()};
436✔
495
                if (logger_.Level() >= log::LogLevel::Debug) {
218✔
496
                        debug_str += string {header->name_string()};
217✔
497
                        debug_str += ": ";
217✔
498
                        debug_str += string {header->value()};
217✔
499
                        debug_str += "\n";
217✔
500
                }
501
        }
502

503
        logger_.Debug("Received headers:\n" + debug_str);
168✔
504
        debug_str.clear();
168✔
505

506
        if (http_response_parser_->chunked()) {
168✔
507
                auto cancelled = cancelled_;
1✔
508
                CallHandler(header_handler_);
1✔
509
                if (!*cancelled) {
1✔
510
                        auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
511
                        CallErrorHandler(err, request_, body_handler_);
1✔
512
                }
513
                return;
1✔
514
        }
515

516
        auto content_length = http_response_parser_->content_length();
167✔
517
        if (content_length) {
167✔
518
                response_body_length_ = content_length.value();
149✔
519
        } else {
520
                response_body_length_ = 0;
18✔
521
        }
522
        response_body_read_ = 0;
167✔
523

524
        if (response_body_read_ >= response_body_length_) {
167✔
525
                auto cancelled = cancelled_;
30✔
526
                CallHandler(header_handler_);
30✔
527
                if (!*cancelled) {
30✔
528
                        // Release ownership of writer, which closes it if there are no other holders.
529
                        if (response_) {
29✔
530
                                response_->body_writer_.reset();
29✔
531
                        }
532
                        client_active_.reset();
29✔
533
                        stream_.reset();
29✔
534
                        CallHandler(body_handler_);
29✔
535
                }
536
                return;
30✔
537
        }
538

539
        auto cancelled = cancelled_;
137✔
540
        CallHandler(header_handler_);
137✔
541
        if (*cancelled) {
137✔
542
                return;
1✔
543
        }
544

545
        if (response_ && !response_->body_async_reader_) {
136✔
546
                // If there is no registered reader, then we need to schedule the download
547
                // ourselves. Else the reader will do it.
548
                ReadNextBodyPart(body_buffer_.size());
104✔
549
        }
550
}
551

552
// Requests the next part of the response body on behalf of a reader.
//
// Remembers the destination range `[start, end)` and `handler` for use when the read
// completes, then schedules a read of at most the smaller of the destination size and the
// size of `body_buffer_`.
void Client::AsyncReadNextBodyPart(
	vector<uint8_t>::iterator start, vector<uint8_t>::iterator end, io::AsyncIoHandler handler) {
	reader_buf_start_ = start;
	reader_buf_end_ = end;
	reader_handler_ = handler;

	const size_t destination_size = end - start;
	ReadNextBodyPart(min(body_buffer_.size(), destination_size));
}
2,144✔
561

562
void Client::ReadNextBodyPart(size_t count) {
3,523✔
563
        http_response_parser_->get().body().data = body_buffer_.data();
3,523✔
564
        http_response_parser_->get().body().size = count;
3,523✔
565

566
        weak_ptr<Client> weak_client(client_active_);
7,046✔
567

568
        if (is_https_) {
3,523✔
569
                http::async_read_some(
2✔
570
                        *stream_,
1✔
571
                        response_buffer_,
1✔
572
                        *http_response_parser_,
1✔
573
                        [weak_client](const error_code &ec, size_t num_read) {
1✔
574
                                auto client = weak_client.lock();
2✔
575
                                if (client) {
1✔
576
                                        client->ReadBodyHandler(ec, num_read);
1✔
577
                                }
578
                        });
1✔
579
        } else {
580
                http::async_read_some(
7,044✔
581
                        stream_->next_layer(),
3,522✔
582
                        response_buffer_,
3,522✔
583
                        *http_response_parser_,
3,522✔
584
                        [weak_client](const error_code &ec, size_t num_read) {
3,521✔
585
                                auto client = weak_client.lock();
7,042✔
586
                                if (client) {
3,521✔
587
                                        client->ReadBodyHandler(ec, num_read);
3,521✔
588
                                }
589
                        });
3,521✔
590
        }
591
}
3,523✔
592

593
// Completion handler for a body read.
//
// In order: accounts for the bytes read, hands the data (or the error) to a registered
// async reader, reports errors to the body handler, forwards the data to a registered body
// writer, and finally either finishes the call (all expected bytes received) or schedules
// the next read when no async reader is registered.
void Client::ReadBodyHandler(error_code ec, size_t num_read) {
	const bool got_data = num_read > 0;
	if (got_data) {
		logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
		response_body_read_ += num_read;
	}

	if (ec == http::make_error_code(http::error::need_buffer)) {
		// This can be ignored. We always reset the buffer between reads anyway.
		ec = error_code();
	}

	if (response_->body_async_reader_) {
		assert(reader_handler_);

		// Never copy more than the reader's destination range can hold.
		const size_t destination_size = reader_buf_end_ - reader_buf_start_;
		const size_t to_copy = min(num_read, destination_size);
		copy_n(body_buffer_.begin(), to_copy, reader_buf_start_);

		if (ec) {
			auto err = error::Error(ec.default_error_condition(), "Could not read body");
			reader_handler_(expected::unexpected(err));
		} else {
			reader_handler_(to_copy);
		}

		if (!got_data) {
			response_->body_async_reader_->done_ = true;
		}
	}

	if (ec) {
		CallErrorHandler(ec, request_, body_handler_);
		return;
	}

	if (response_->body_writer_ && got_data) {
		auto written =
			response_->body_writer_->Write(body_buffer_.begin(), body_buffer_.begin() + num_read);
		if (!written) {
			CallErrorHandler(written.error(), request_, body_handler_);
			return;
		}
		if (written.value() != num_read) {
			CallErrorHandler(
				error::Error(make_error_condition(errc::io_error), "Short write when writing body"),
				request_,
				body_handler_);
			return;
		}
	}

	// Nobody consumes the body: mention it once per call.
	const bool body_is_consumed = response_->body_writer_ || response_->body_async_reader_;
	if (!body_is_consumed && got_data && !ignored_body_message_issued_) {
		logger_.Debug("Response contains a body, but we are ignoring it");
		ignored_body_message_issued_ = true;
	}

	if (response_body_read_ >= response_body_length_) {
		// Release ownership of writer, which closes it if there are no other holders.
		if (response_) {
			response_->body_writer_.reset();
		}
		client_active_.reset();
		stream_.reset();
		CallHandler(body_handler_);
		return;
	}

	// With a registered reader, the reader schedules the next read itself.
	if (response_ && !response_->body_async_reader_) {
		ReadNextBodyPart(body_buffer_.size());
	}
}
662

663
void Client::Cancel() {
230✔
664
        resolver_.cancel();
230✔
665
        if (stream_) {
230✔
666
                stream_->next_layer().cancel();
3✔
667
                stream_->next_layer().close();
3✔
668
                stream_.reset();
3✔
669
        }
670
        client_active_.reset();
230✔
671

672
        request_.reset();
230✔
673
        response_.reset();
230✔
674

675
        // Reset logger to no connection.
676
        logger_ = log::Logger(logger_name_);
230✔
677

678
        // Set cancel state and then make a new one. Those who are interested should have their own
679
        // pointer to the old one.
680
        *cancelled_ = true;
230✔
681
        cancelled_ = make_shared<bool>(false);
230✔
682
}
230✔
683

684
// Default configuration: delegates to the string constructor with an empty certificate path.
ClientConfig::ClientConfig() : ClientConfig("") {}
69✔
687

688
// Configuration that stores `server_cert_path` in the member of the same name.
ClientConfig::ClientConfig(string server_cert_path) : server_cert_path {server_cert_path} {}
220✔
691

692
// Nothing to clean up beyond what the members do themselves.
ClientConfig::~ClientConfig() = default;
220✔
694

695
// No explicit initialisation is performed here.
ServerConfig::ServerConfig() = default;
119✔
697

698
// Nothing to clean up beyond what the members do themselves.
ServerConfig::~ServerConfig() = default;
119✔
700

701
Stream::Stream(Server &server) :
291✔
702
        server_ {server},
703
        logger_ {"http"},
704
        socket_(server_.GetAsioIoContext(server_.event_loop_)),
291✔
705
        body_buffer_(HTTP_BEAST_BUFFER_SIZE) {
582✔
706
        // This is equivalent to:
707
        //   request_buffer_.reserve(body_buffer_.size());
708
        // but compatible with Boost 1.67.
709
        request_buffer_.prepare(body_buffer_.size() - request_buffer_.size());
291✔
710

711
        // Don't enforce limits. Since we stream everything, limits don't generally apply, and if
712
        // they do, they should be handled higher up in the application logic.
713
        //
714
        // Note: There is a bug in Beast here (tested on 1.74): One is supposed to be able to pass
715
        // an uninitialized `optional` to mean unlimited, but they do not check for `has_value()` in
716
        // their code, causing their subsequent comparison operation to misbehave. So pass highest
717
        // possible value instead.
718
        http_request_parser_.body_limit(numeric_limits<uint64_t>::max());
291✔
719
}
291✔
720

721
// Tearing down a stream cancels it first, see Cancel().
Stream::~Stream() {
        Cancel();
}
291✔
724

725
void Stream::Cancel() {
294✔
726
        if (socket_.is_open()) {
294✔
727
                socket_.cancel();
172✔
728
                socket_.close();
172✔
729
        }
730
        stream_active_.reset();
294✔
731
}
294✔
732

733
void Stream::CallErrorHandler(const error_code &ec, const RequestPtr &req, RequestHandler handler) {
×
734
        stream_active_.reset();
×
735
        handler(expected::unexpected(error::Error(
×
736
                ec.default_error_condition(),
×
737
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath())));
×
738

739
        server_.RemoveStream(shared_from_this());
×
740
}
×
741

742
void Stream::CallErrorHandler(
1✔
743
        const error::Error &err, const RequestPtr &req, RequestHandler handler) {
744
        stream_active_.reset();
1✔
745
        handler(expected::unexpected(error::Error(
2✔
746
                err.code,
1✔
747
                err.message + ": " + req->address_.host + ": " + MethodToString(req->method_) + " "
2✔
748
                        + request_->GetPath())));
5✔
749

750
        server_.RemoveStream(shared_from_this());
1✔
751
}
1✔
752

753
void Stream::CallErrorHandler(
1✔
754
        const error_code &ec, const RequestPtr &req, ReplyFinishedHandler handler) {
755
        stream_active_.reset();
1✔
756
        handler(error::Error(
1✔
757
                ec.default_error_condition(),
1✔
758
                req->address_.host + ": " + MethodToString(req->method_) + " " + request_->GetPath()));
3✔
759

760
        server_.RemoveStream(shared_from_this());
1✔
761
}
1✔
762

763
void Stream::CallErrorHandler(
1✔
764
        const error::Error &err, const RequestPtr &req, ReplyFinishedHandler handler) {
765
        stream_active_.reset();
1✔
766
        handler(error::Error(
1✔
767
                err.code,
1✔
768
                err.message + ": " + req->address_.host + ": " + MethodToString(req->method_) + " "
2✔
769
                        + request_->GetPath()));
4✔
770

771
        server_.RemoveStream(shared_from_this());
1✔
772
}
1✔
773

774
void Stream::AcceptHandler(const error_code &ec) {
172✔
775
        if (ec) {
172✔
776
                log::Error("Error while accepting HTTP connection: " + ec.message());
×
777
                return;
×
778
        }
779

780
        auto ip = socket_.remote_endpoint().address().to_string();
344✔
781

782
        // Use IP as context for logging.
783
        logger_ = log::Logger("http_server").WithFields(log::LogField("ip", ip));
172✔
784

785
        logger_.Debug("Accepted connection.");
172✔
786

787
        request_.reset(new IncomingRequest);
172✔
788
        request_->stream_ = shared_from_this();
172✔
789

790
        request_->address_.host = ip;
172✔
791

792
        stream_active_.reset(this, [](Stream *) {});
172✔
793

794
        ReadHeader();
172✔
795
}
796

797
void Stream::ReadHeader() {
172✔
798
        http_request_parser_.get().body().data = body_buffer_.data();
172✔
799
        http_request_parser_.get().body().size = body_buffer_.size();
172✔
800

801
        weak_ptr<Stream> weak_stream(stream_active_);
172✔
802

803
        http::async_read_some(
344✔
804
                socket_,
172✔
805
                request_buffer_,
172✔
806
                http_request_parser_,
807
                [weak_stream](const error_code &ec, size_t num_read) {
171✔
808
                        auto stream = weak_stream.lock();
342✔
809
                        if (stream) {
171✔
810
                                stream->ReadHeaderHandler(ec, num_read);
171✔
811
                        }
812
                });
171✔
813
}
172✔
814

815
void Stream::ReadHeaderHandler(const error_code &ec, size_t num_read) {
171✔
816
        if (num_read > 0) {
171✔
817
                logger_.Trace("Read " + to_string(num_read) + " bytes of header data from stream.");
171✔
818
        }
819

820
        if (ec) {
171✔
821
                CallErrorHandler(ec, request_, server_.header_handler_);
×
822
                return;
140✔
823
        }
824

825
        if (!http_request_parser_.is_header_done()) {
171✔
826
                ReadHeader();
×
827
                return;
×
828
        }
829

830
        auto method_result = BeastVerbToMethod(
831
                http_request_parser_.get().base().method(),
171✔
832
                string {http_request_parser_.get().base().method_string()});
342✔
833
        if (!method_result) {
171✔
834
                CallErrorHandler(method_result.error(), request_, server_.header_handler_);
×
835
                return;
×
836
        }
837
        request_->method_ = method_result.value();
171✔
838
        request_->address_.path = string(http_request_parser_.get().base().target());
171✔
839

840
        string debug_str;
171✔
841
        for (auto header = http_request_parser_.get().cbegin();
522✔
842
                 header != http_request_parser_.get().cend();
1,044✔
843
                 header++) {
844
                request_->headers_[string {header->name_string()}] = string {header->value()};
702✔
845
                if (logger_.Level() >= log::LogLevel::Debug) {
351✔
846
                        debug_str += string {header->name_string()};
350✔
847
                        debug_str += ": ";
350✔
848
                        debug_str += string {header->value()};
350✔
849
                        debug_str += "\n";
350✔
850
                }
851
        }
852

853
        logger_.Debug("Received headers:\n" + debug_str);
171✔
854
        debug_str.clear();
171✔
855

856
        if (http_request_parser_.chunked()) {
171✔
857
                server_.header_handler_(request_);
1✔
858
                auto err = MakeError(UnsupportedBodyType, "`Transfer-Encoding: chunked` not supported");
2✔
859
                CallErrorHandler(err, request_, server_.body_handler_);
1✔
860
                return;
1✔
861
        }
862

863
        if (http_request_parser_.is_done()) {
170✔
864
                server_.header_handler_(request_);
139✔
865
                CallBodyHandler();
139✔
866
                return;
139✔
867
        }
868

869
        http_request_parser_.get().body().data = body_buffer_.data();
31✔
870
        http_request_parser_.get().body().size = body_buffer_.size();
31✔
871

872
        weak_ptr<Stream> weak_stream(stream_active_);
31✔
873

874
        http::async_read_some(
62✔
875
                socket_,
31✔
876
                request_buffer_,
31✔
877
                http_request_parser_,
878
                [weak_stream](const error_code &ec, size_t num_read) {
31✔
879
                        auto stream = weak_stream.lock();
62✔
880
                        if (stream) {
31✔
881
                                stream->ReadBodyHandler(ec, num_read);
30✔
882
                        }
883
                });
31✔
884

885
        // Call this after scheduling the read above, so that the handler can cancel it if
886
        // necessary.
887
        server_.header_handler_(request_);
31✔
888
}
889

890
void Stream::ReadBodyHandler(const error_code &ec, size_t num_read) {
186✔
891
        if (num_read > 0) {
186✔
892
                logger_.Trace("Read " + to_string(num_read) + " bytes of body data from stream.");
186✔
893
        }
894

895
        if (ec) {
186✔
896
                CallErrorHandler(ec, request_, server_.body_handler_);
×
897
                return;
×
898
        }
899

900
        if (request_->body_writer_ && num_read > 0) {
186✔
901
                auto written =
902
                        request_->body_writer_->Write(body_buffer_.begin(), body_buffer_.begin() + num_read);
97✔
903
                if (!written) {
97✔
904
                        CallErrorHandler(written.error(), request_, server_.body_handler_);
×
905
                        return;
×
906
                } else if (written.value() != num_read) {
97✔
907
                        CallErrorHandler(
×
908
                                error::Error(make_error_condition(errc::io_error), "Short write when writing body"),
×
909
                                request_,
×
910
                                server_.body_handler_);
×
911
                        return;
×
912
                }
913
        } else if (num_read > 0 && !ignored_body_message_issued_) {
89✔
914
                logger_.Debug("Request contains a body, but we are ignoring it");
14✔
915
                ignored_body_message_issued_ = true;
14✔
916
        }
917

918
        if (!http_request_parser_.is_done()) {
186✔
919
                http_request_parser_.get().body().data = body_buffer_.data();
156✔
920
                http_request_parser_.get().body().size = body_buffer_.size();
156✔
921

922
                weak_ptr<Stream> weak_stream(stream_active_);
156✔
923

924
                http::async_read_some(
312✔
925
                        socket_,
156✔
926
                        request_buffer_,
156✔
927
                        http_request_parser_,
928
                        [weak_stream](const error_code &ec, size_t num_read) {
156✔
929
                                auto stream = weak_stream.lock();
312✔
930
                                if (stream) {
156✔
931
                                        stream->ReadBodyHandler(ec, num_read);
156✔
932
                                }
933
                        });
156✔
934
                return;
156✔
935
        }
936

937
        CallBodyHandler();
30✔
938
}
939

940
// Start sending the response associated with this stream.
//
// Takes shared ownership of the response, stores `reply_finished_handler`, builds the Beast
// response (headers, status code, reason) plus its serializer, and schedules the asynchronous
// header write. WriteHeaderHandler() is invoked on completion if the stream is still active.
void Stream::AsyncReply(ReplyFinishedHandler reply_finished_handler) {
        auto response = maybe_response_.lock();
        // This is only reached through an existing response, so the lock is expected to succeed.
        assert(response);

        // Shared ownership starts here.
        response_ = response;

        reply_finished_handler_ = reply_finished_handler;

        http_response_ = make_shared<http::response<http::buffer_body>>();

        for (const auto &name_and_value : response->headers_) {
                http_response_->base().set(name_and_value.first, name_and_value.second);
        }

        http_response_->result(response->GetStatusCode());
        http_response_->reason(response->GetStatusMessage());

        http_response_serializer_ =
                make_shared<http::response_serializer<http::buffer_body>>(*http_response_);

        weak_ptr<Stream> weak_stream(stream_active_);

        auto on_written = [weak_stream](const error_code &ec, size_t num_written) {
                if (auto stream = weak_stream.lock()) {
                        stream->WriteHeaderHandler(ec, num_written);
                }
        };

        http::async_write_header(socket_, *http_response_serializer_, on_written);
}
166✔
974

975
void Stream::WriteHeaderHandler(const error_code &ec, size_t num_written) {
165✔
976
        if (num_written > 0) {
165✔
977
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of header data to stream.");
165✔
978
        }
979

980
        if (ec) {
165✔
981
                CallErrorHandler(ec, request_, reply_finished_handler_);
×
982
                return;
31✔
983
        }
984

985
        auto header = response_->GetHeader("Content-Length");
330✔
986
        if (!header || header.value() == "0") {
165✔
987
                FinishReply();
30✔
988
                return;
30✔
989
        }
990

991
        auto length = common::StringToLongLong(header.value());
135✔
992
        if (!length || length.value() < 0) {
135✔
993
                auto err = error::Error(
994
                        length.error().code, "Content-Length contains invalid number: " + header.value());
×
995
                CallErrorHandler(err, request_, reply_finished_handler_);
×
996
                return;
×
997
        }
998

999
        if (!response_->body_reader_) {
135✔
1000
                auto err = MakeError(BodyMissingError, "Content-Length is non-zero, but body is missing");
2✔
1001
                CallErrorHandler(err, request_, reply_finished_handler_);
1✔
1002
                return;
1✔
1003
        }
1004

1005
        PrepareBufferAndWriteBody();
134✔
1006
}
1007

1008
void Stream::PrepareBufferAndWriteBody() {
1,546✔
1009
        auto read = response_->body_reader_->Read(body_buffer_.begin(), body_buffer_.end());
1,546✔
1010
        if (!read) {
1,546✔
1011
                CallErrorHandler(read.error(), request_, reply_finished_handler_);
×
1012
                return;
×
1013
        }
1014

1015
        http_response_->body().data = body_buffer_.data();
1,546✔
1016
        http_response_->body().size = read.value();
1,546✔
1017

1018
        if (read.value() > 0) {
1,546✔
1019
                http_response_->body().more = true;
1,425✔
1020
        } else {
1021
                // Release ownership of Body reader.
1022
                response_->body_reader_.reset();
121✔
1023
                http_response_->body().more = false;
121✔
1024
        }
1025

1026
        WriteBody();
1,546✔
1027
}
1028

1029
void Stream::WriteBody() {
2,970✔
1030
        weak_ptr<Stream> weak_stream(stream_active_);
2,970✔
1031

1032
        http::async_write_some(
5,940✔
1033
                socket_,
2,970✔
1034
                *http_response_serializer_,
2,970✔
1035
                [weak_stream](const error_code &ec, size_t num_written) {
2,949✔
1036
                        auto stream = weak_stream.lock();
5,898✔
1037
                        if (stream) {
2,949✔
1038
                                stream->WriteBodyHandler(ec, num_written);
2,949✔
1039
                        }
1040
                });
2,949✔
1041
}
2,970✔
1042

1043
void Stream::WriteBodyHandler(const error_code &ec, size_t num_written) {
2,949✔
1044
        if (num_written > 0) {
2,949✔
1045
                logger_.Trace("Wrote " + to_string(num_written) + " bytes of body data to stream.");
1,424✔
1046
        }
1047

1048
        if (ec == http::make_error_code(http::error::need_buffer)) {
2,949✔
1049
                // Write next body block.
1050
                PrepareBufferAndWriteBody();
1,412✔
1051
        } else if (ec) {
1,537✔
1052
                CallErrorHandler(ec, request_, reply_finished_handler_);
1✔
1053
        } else if (num_written > 0) {
1,536✔
1054
                // We are still writing the body.
1055
                WriteBody();
1,424✔
1056
        } else {
1057
                // We are finished.
1058
                FinishReply();
112✔
1059
        }
1060
}
2,949✔
1061

1062
void Stream::FinishReply() {
142✔
1063
        // We are done.
1064
        stream_active_.reset();
142✔
1065
        reply_finished_handler_(error::NoError);
142✔
1066
        server_.RemoveStream(shared_from_this());
142✔
1067
}
142✔
1068

1069
void Stream::CallBodyHandler() {
169✔
1070
        // Release ownership of writer, which closes it if there are no other holders.
1071
        request_->body_writer_.reset();
169✔
1072

1073
        // Get a pointer to ourselves. This is just in case the body handler make a response, which
1074
        // it immediately destroys, which would destroy this stream as well. At the end of this
1075
        // function, it's ok to destroy it.
1076
        auto stream_ref = shared_from_this();
338✔
1077

1078
        server_.body_handler_(request_);
169✔
1079

1080
        // MakeResponse() should have been called inside body handler. It can use this to generate a
1081
        // response, either immediately, or later. Therefore it should still exist, otherwise the
1082
        // request has not been handled correctly.
1083
        auto response = maybe_response_.lock();
338✔
1084
        if (!response) {
169✔
1085
                logger_.Error("Handler produced no response. Closing stream prematurely.");
2✔
1086
                server_.RemoveStream(shared_from_this());
2✔
1087
        }
1088
}
169✔
1089

1090
Server::Server(const ServerConfig &server, events::EventLoop &event_loop) :
141✔
1091
        event_loop_ {event_loop},
1092
        acceptor_(GetAsioIoContext(event_loop_)) {
141✔
1093
}
141✔
1094

1095
// Destroying the server cancels it first, see Cancel().
Server::~Server() {
        Cancel();
}
141✔
1098

1099
// Validate `url` and remember the two handlers, without opening any socket.
//
// Returns InvalidUrlError if the URL cannot be parsed or carries a path other than "/", a
// `protocol_not_supported` error for anything but "http", and error::NoError otherwise.
error::Error Server::Setup(
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
        auto parse_err = BreakDownUrl(url, address_);
        if (parse_err != error::NoError) {
                return MakeError(InvalidUrlError, "Could not parse URL " + url + ": " + parse_err.String());
        }

        if (address_.protocol != "http") {
                return error::Error(make_error_condition(errc::protocol_not_supported), address_.protocol);
        }

        const bool has_path = !address_.path.empty() && address_.path != "/";
        if (has_path) {
                return MakeError(InvalidUrlError, "URLs with paths are not supported when listening.");
        }

        header_handler_ = header_handler;
        body_handler_ = body_handler;

        return error::NoError;
}
1119

1120
// Open the acceptor on the address stored by Setup(), bind it, start listening and prepare the
// first stream for accepting connections.
//
// Returns error::NoError on success. On failure an error describing the failed step is
// returned, and if the acceptor was opened by this call it is closed again, so that Start() can
// be retried.
error::Error Server::Start() {
        boost::system::error_code ec;
        auto address = asio::ip::make_address(address_.host, ec);
        if (ec) {
                return error::Error(
                        ec.default_error_condition(),
                        "Could not construct endpoint from address " + address_.host);
        }

        asio::ip::tcp::endpoint endpoint(address, address_.port);

        ec.clear();
        acceptor_.open(endpoint.protocol(), ec);
        if (ec) {
                // Do not close here: a failing open() may mean the acceptor is already in use.
                return error::Error(ec.default_error_condition(), "Could not open acceptor");
        }

        // From here on the acceptor is open because of this call. Close it again on failure,
        // otherwise a later Start() would fail in open().
        auto fail = [this](const boost::system::error_code &failure, const string &what) {
                boost::system::error_code ignored;
                acceptor_.close(ignored);
                return error::Error(failure.default_error_condition(), what);
        };

        // Allow address reuse, otherwise we can't re-bind later.
        ec.clear();
        acceptor_.set_option(asio::socket_base::reuse_address(true), ec);
        if (ec) {
                return fail(ec, "Could not set socket options");
        }

        ec.clear();
        acceptor_.bind(endpoint, ec);
        if (ec) {
                return fail(ec, "Could not bind socket");
        }

        ec.clear();
        acceptor_.listen(asio::socket_base::max_listen_connections, ec);
        if (ec) {
                return fail(ec, "Could not start listening");
        }

        PrepareNewStream();

        return error::NoError;
}
1160

1161
// Convenience wrapper: Setup() followed by Start(). A Setup() error is returned as is and
// Start() is not attempted in that case.
error::Error Server::AsyncServeUrl(
        const string &url, RequestHandler header_handler, RequestHandler body_handler) {
        auto setup_err = Setup(url, header_handler, body_handler);
        return (setup_err != error::NoError) ? setup_err : Start();
}
1169

1170
void Server::Cancel() {
143✔
1171
        if (acceptor_.is_open()) {
143✔
1172
                acceptor_.cancel();
119✔
1173
                acceptor_.close();
119✔
1174
        }
1175
        streams_.clear();
143✔
1176
}
143✔
1177

1178
void Server::PrepareNewStream() {
291✔
1179
        StreamPtr new_stream {new Stream(*this)};
291✔
1180
        streams_.insert(new_stream);
291✔
1181
        AsyncAccept(new_stream);
291✔
1182
}
291✔
1183

1184
// Wait asynchronously for a connection on the socket of `stream`. When one arrives, the stream's
// accept handler runs and a new stream is prepared for the next connection. If accepting fails,
// the error is logged and no further stream is prepared.
void Server::AsyncAccept(StreamPtr stream) {
        auto on_accept = [this, stream](const error_code &ec) {
                if (ec) {
                        log::Error("Could not accept connection: " + ec.message());
                        return;
                }

                stream->AcceptHandler(ec);

                this->PrepareNewStream();
        };

        acceptor_.async_accept(stream->socket_, on_accept);
}
291✔
1196

1197
void Server::RemoveStream(const StreamPtr &stream) {
148✔
1198
        streams_.erase(stream);
148✔
1199
}
148✔
1200

1201
} // namespace http
1202
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc