• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender / 1928812091

14 Jul 2025 06:22AM UTC coverage: 75.909% (-0.01%) from 75.919%
1928812091

push

gitlab-ci

web-flow
Merge pull request #1802 from mendersoftware/dependabot/submodules/src/common/vendor/json-d33ecd3

chore: bump src/common/vendor/json from `b451735` to `d33ecd3`

7373 of 9713 relevant lines covered (75.91%)

11152.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.77
/src/mender-update/update_module/v3/update_module_download.cpp
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
#include <mender-update/update_module/v3/update_module.hpp>
16

17
#include <mender-update/progress_reader/progress_reader.hpp>
18

19
#include <common/events.hpp>
20
#include <common/events_io.hpp>
21
#include <common/log.hpp>
22
#include <common/path.hpp>
23
#include <common/processes.hpp>
24

25
namespace mender {
26
namespace update {
27
namespace update_module {
28
namespace v3 {
29

30
namespace log = mender::common::log;
31
namespace path = mender::common::path;
32
namespace processes = mender::common::processes;
33
namespace progress = mender::update::progress;
34

35

36
void UpdateModule::StartDownloadProcess() {
116✔
37
        string download_command = "Download";
116✔
38
        if (download_->downloading_with_sizes_) {
116✔
39
                download_command = "DownloadWithFileSizes";
3✔
40
        }
41
        log::Debug(
116✔
42
                "Calling Update Module with command `" + update_module_path_ + " " + download_command + " "
232✔
43
                + update_module_workdir_ + "`.");
348✔
44
        download_->proc_ = make_shared<procs::Process>(
116✔
45
                vector<string> {update_module_path_, download_command, update_module_workdir_});
580✔
46

47
        download_->proc_->SetWorkDir(update_module_workdir_);
48

49
        auto err = PrepareStreamNextPipe();
116✔
50
        if (err != error::NoError) {
116✔
51
                DownloadErrorHandler(err);
×
52
                return;
53
        }
54

55
        processes::OutputHandler stdout_handler {"Update Module output (stdout): "};
116✔
56
        processes::OutputHandler stderr_handler {"Update Module output (stderr): "};
116✔
57

58
        err = download_->proc_->Start(stdout_handler, stderr_handler);
348✔
59
        if (err != error::NoError) {
116✔
60
                DownloadErrorHandler(GetProcessError(err));
×
61
                return;
×
62
        }
63

64
        err = download_->proc_->AsyncWait(
116✔
65
                download_->event_loop_,
66
                [this](error::Error err) {
230✔
67
                        if (err.code == make_error_condition(errc::timed_out)) {
115✔
68
                                DownloadTimeoutHandler();
2✔
69
                        } else {
70
                                ProcessEndedHandler(err);
226✔
71
                        }
72
                },
115✔
73
                chrono::seconds(ctx_.GetConfig().module_timeout_seconds));
116✔
74
        if (err != error::NoError) {
116✔
75
                DownloadErrorHandler(err);
×
76
                return;
77
        }
78

79
        DownloadErrorHandler(OpenStreamNextPipe(
232✔
80
                [this](io::ExpectedAsyncWriterPtr writer) { StreamNextOpenHandler(writer); }));
250✔
81
}
82

83
// Called when the attempt to open the stream-next pipe has completed.
//
// On success the writer is stored, the module is flagged as having started
// downloading, and the next payload file is fetched:
//   * no more payload files -> flag the download as finished and send the
//     terminating empty write via EndStreamNext();
//   * another payload file  -> set up its reader, prepare and open its stream
//     pipe under `streams/`, and write the entry line
//     (`streams/<name>` or `streams/<name> <size>`, newline terminated)
//     to stream-next.
// Any error is forwarded to DownloadErrorHandler.
void UpdateModule::StreamNextOpenHandler(io::ExpectedAsyncWriterPtr writer) {
        if (!writer) {
                DownloadErrorHandler(writer.error());
                return;
        }
        download_->stream_next_writer_ = writer.value();

        // ProcessEndedHandler consults this flag to tell apart a module that
        // never streamed from one that stopped half-way.
        download_->module_has_started_download_ = true;

        auto next_payload = download_->payload_.Next();
        if (!next_payload) {
                const auto no_more_files_code =
                        artifact::parser_error::MakeError(
                                artifact::parser_error::NoMorePayloadFilesError, "")
                                .code;
                const bool payload_exhausted = (next_payload.error().code == no_more_files_code);
                if (payload_exhausted) {
                        download_->module_has_finished_download_ = true;
                        log::Debug("Update Module finished all downloads");
                        EndStreamNext();
                } else {
                        DownloadErrorHandler(next_payload.error());
                }
                return;
        }

        auto artifact_reader = make_shared<artifact::Reader>(std::move(next_payload.value()));

        // Wrap the payload in a progress reader sized to the payload.
        auto tracked_reader =
                make_shared<progress::Reader>(artifact_reader, artifact_reader->Size());

        download_->current_payload_reader_ =
                make_shared<events::io::AsyncReaderFromReader>(download_->event_loop_, tracked_reader);
        download_->current_payload_name_ = artifact_reader->Name();
        download_->current_payload_size_ = artifact_reader->Size();

        auto pipe_path =
                path::Join(update_module_workdir_, string("streams"), download_->current_payload_name_);
        auto pipe_err = PrepareAndOpenStreamPipe(
                pipe_path, [this](io::ExpectedAsyncWriterPtr stream_writer) {
                        StreamOpenHandler(stream_writer);
                });
        if (pipe_err != error::NoError) {
                DownloadErrorHandler(pipe_err);
                return;
        }

        // Entry line announced on stream-next: the relative stream path,
        // followed by the payload size when the module asked for sizes.
        string announcement = path::Join("streams", download_->current_payload_name_);
        if (download_->downloading_with_sizes_) {
                announcement += " " + to_string(download_->current_payload_size_);
        }

        // One extra byte for the trailing newline.
        const size_t entry_size = announcement.size() + 1;
        if (entry_size > download_->buffer_.size()) {
                DownloadErrorHandler(error::Error(
                        make_error_condition(errc::no_buffer_space), "Payload name is too large for buffer"));
                return;
        }
        copy(announcement.begin(), announcement.end(), download_->buffer_.begin());
        download_->buffer_[entry_size - 1] = '\n';

        DownloadErrorHandler(download_->stream_next_writer_->AsyncWrite(
                download_->buffer_.begin(),
                download_->buffer_.begin() + entry_size,
                [this, entry_size](io::ExpectedSize write_result) {
                        StreamNextWriteHandler(entry_size, write_result);
                }));
}
146

147
// Called when the attempt to open the current payload's stream pipe has
// completed. On success the writer is stored and the first asynchronous read
// of the payload into `buffer_` is issued; PayloadReadHandler receives the
// result. A failed open, or a failure to schedule the read, goes to
// DownloadErrorHandler.
void UpdateModule::StreamOpenHandler(io::ExpectedAsyncWriterPtr writer) {
        if (!writer) {
                DownloadErrorHandler(writer.error());
                return;
        }
        download_->current_stream_writer_ = writer.value();

        auto on_payload_read = [this](io::ExpectedSize read_result) {
                PayloadReadHandler(read_result);
        };
        DownloadErrorHandler(download_->current_payload_reader_->AsyncRead(
                download_->buffer_.begin(), download_->buffer_.end(), on_payload_read));
}
159

160
void UpdateModule::StreamNextWriteHandler(size_t expected_n, io::ExpectedSize result) {
9✔
161
        // Close stream-next writer.
162
        download_->stream_next_writer_.reset();
9✔
163
        if (!result) {
9✔
164
                DownloadErrorHandler(result.error());
×
165
        } else if (expected_n != result.value()) {
9✔
166
                DownloadErrorHandler(error::Error(
×
167
                        make_error_condition(errc::io_error),
×
168
                        "Unexpected number of written bytes to stream-next"));
×
169
        }
170
}
9✔
171

172
// Completion callback for an asynchronous read from the current payload.
//
//   * bytes were read -> write exactly that many bytes from `buffer_` to the
//     current stream writer; StreamWriteHandler continues the loop.
//   * read failed     -> release the stream writer and payload reader, then
//     report the error.
//   * zero bytes read -> release the stream writer and payload reader, then
//     move on to the next payload: another file when `downloading_to_files_`
//     is set, otherwise by reopening the stream-next pipe.
void UpdateModule::PayloadReadHandler(io::ExpectedSize result) {
        if (result && result.value() > 0) {
                const auto bytes_read = result.value();
                DownloadErrorHandler(download_->current_stream_writer_->AsyncWrite(
                        download_->buffer_.begin(),
                        download_->buffer_.begin() + bytes_read,
                        [this, bytes_read](io::ExpectedSize write_result) {
                                StreamWriteHandler(0, bytes_read, write_result);
                        }));
                return;
        }

        // Both remaining outcomes (failure and zero-length read) close the streams first.
        download_->current_stream_writer_.reset();
        download_->current_payload_reader_.reset();

        if (!result) {
                DownloadErrorHandler(result.error());
                return;
        }

        if (download_->downloading_to_files_) {
                StartDownloadToFile();
                return;
        }

        DownloadErrorHandler(OpenStreamNextPipe([this](io::ExpectedAsyncWriterPtr next_writer) {
                StreamNextOpenHandler(next_writer);
        }));
}
635✔
198

199
void UpdateModule::StreamWriteHandler(size_t offset, size_t expected_n, io::ExpectedSize result) {
574✔
200
        if (!result) {
574✔
201
                DownloadErrorHandler(result.error());
1✔
202
        } else if (result.value() == 0 || result.value() > expected_n) {
573✔
203
                DownloadErrorHandler(error::Error(
×
204
                        make_error_condition(errc::io_error),
×
205
                        "Unexpected number of written bytes to download stream"));
×
206
        } else if (result.value() < expected_n) {
573✔
207
                auto new_offset = offset + result.value();
×
208
                auto new_expected = expected_n - result.value();
×
209
                DownloadErrorHandler(download_->current_stream_writer_->AsyncWrite(
×
210
                        download_->buffer_.begin() + new_offset,
211
                        download_->buffer_.begin() + new_offset + new_expected,
212
                        [this, new_offset, new_expected](io::ExpectedSize write_result) {
×
213
                                StreamWriteHandler(new_offset, new_expected, write_result);
×
214
                        }));
×
215
        } else {
216
                download_->written_ += result.value();
573✔
217
                log::Trace("Wrote " + to_string(download_->written_) + " bytes to Update Module");
1,146✔
218
                DownloadErrorHandler(download_->current_payload_reader_->AsyncRead(
1,146✔
219
                        download_->buffer_.begin(), download_->buffer_.end(), [this](io::ExpectedSize result) {
573✔
220
                                PayloadReadHandler(result);
573✔
221
                        }));
1,719✔
222
        }
223
}
574✔
224

225
void UpdateModule::EndStreamNext() {
3✔
226
        // Empty write.
227
        DownloadErrorHandler(download_->stream_next_writer_->AsyncWrite(
3✔
228
                download_->buffer_.begin(), download_->buffer_.begin(), [this](io::ExpectedSize result) {
6✔
229
                        if (!result) {
3✔
230
                                DownloadErrorHandler(result.error());
×
231
                        } else {
232
                                DownloadErrorHandler(error::NoError);
3✔
233
                        }
234
                        // Close writer.
235
                        download_->stream_next_writer_.reset();
3✔
236
                        // No further action necessary. Now we just need to wait for the process to finish.
237
                }));
6✔
238
}
3✔
239

240
// Funnel for the result of every download step: `error::NoError` is ignored,
// anything else ends the download loop with that error.
void UpdateModule::DownloadErrorHandler(const error::Error &err) {
        const bool failed = (err != error::NoError);
        if (failed) {
                EndDownloadLoop(err);
        }
}
1,356✔
245

246
// Finishes the download by invoking the stored `download_finished_handler_`
// with the final status (`error::NoError` on success).
void UpdateModule::EndDownloadLoop(const error::Error &err) {
        auto &notify_finished = download_->download_finished_handler_;
        notify_finished(err);
}
116✔
249

250
void UpdateModule::DownloadTimeoutHandler() {
2✔
251
        download_->proc_->EnsureTerminated();
2✔
252
        EndDownloadLoop(error::Error(
2✔
253
                make_error_condition(errc::timed_out), "Update Module Download process timed out"));
4✔
254
}
2✔
255

256
// Invoked when the Update Module process has ended for a reason other than
// the wait timing out. Decides how the download continues:
//   * process reported an error         -> fail, prefixing the process error
//                                          message;
//   * module finished streaming         -> download completed successfully;
//   * module started but did not finish -> fail with `broken_pipe`;
//   * module never started streaming    -> switch to file mode: drop the pipe
//                                          openers, delete the streams files
//                                          and download payloads as files.
void UpdateModule::ProcessEndedHandler(error::Error err) {
        if (err != error::NoError) {
                const error::Error process_err = GetProcessError(err);
                DownloadErrorHandler(error::Error(
                        process_err.code,
                        "Download: Update Module returned non-zero status: " + process_err.message));
                return;
        }

        if (download_->module_has_finished_download_) {
                EndDownloadLoop(error::NoError);
                return;
        }

        if (download_->module_has_started_download_) {
                DownloadErrorHandler(error::Error(
                        make_error_condition(errc::broken_pipe),
                        "Update Module started downloading, but did not finish"));
                return;
        }

        // The module exited cleanly without the stream-next pipe ever having
        // been opened (see StreamNextOpenHandler, which sets the "started" flag).
        download_->downloading_to_files_ = true;
        download_->stream_next_opener_.reset();
        download_->current_stream_opener_.reset();

        const error::Error cleanup_err = DeleteStreamsFiles();
        if (cleanup_err != error::NoError) {
                DownloadErrorHandler(cleanup_err);
                return;
        }
        StartDownloadToFile();
}
113✔
279

280
void UpdateModule::StartDownloadToFile() {
158✔
281
        auto reader = download_->payload_.Next();
158✔
282
        if (!reader) {
158✔
283
                if (reader.error().code
101✔
284
                        == artifact::parser_error::MakeError(
101✔
285
                                   artifact::parser_error::NoMorePayloadFilesError, "")
202✔
286
                                   .code) {
287
                        log::Debug("Downloaded all files to `files` directory.");
202✔
288
                        EndDownloadLoop(error::NoError);
101✔
289
                } else {
290
                        DownloadErrorHandler(reader.error());
×
291
                }
292
                return;
101✔
293
        }
294
        auto payload_reader = make_shared<artifact::Reader>(std::move(reader.value()));
57✔
295
        download_->current_payload_reader_ =
296
                make_shared<events::io::AsyncReaderFromReader>(download_->event_loop_, payload_reader);
114✔
297
        download_->current_payload_name_ = payload_reader->Name();
57✔
298

299
        auto stream_path = path::Join(update_module_workdir_, string("files"));
114✔
300
        auto err = PrepareDownloadDirectory(stream_path);
57✔
301
        if (err != error::NoError) {
57✔
302
                DownloadErrorHandler(err);
×
303
                return;
304
        }
305

306
        stream_path = path::Join(stream_path, download_->current_payload_name_);
114✔
307

308
        auto current_stream_writer =
309
                make_shared<events::io::AsyncFileDescriptorWriter>(download_->event_loop_);
57✔
310
        err = current_stream_writer->Open(stream_path);
57✔
311
        if (err != error::NoError) {
57✔
312
                DownloadErrorHandler(err);
1✔
313
                return;
314
        }
315
        download_->current_stream_writer_ = current_stream_writer;
316

317
        DownloadErrorHandler(download_->current_payload_reader_->AsyncRead(
112✔
318
                download_->buffer_.begin(), download_->buffer_.end(), [this](io::ExpectedSize result) {
56✔
319
                        PayloadReadHandler(result);
56✔
320
                }));
168✔
321
}
322

323
} // namespace v3
324
} // namespace update_module
325
} // namespace update
326
} // namespace mender
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc