• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / mender-auth-azure-iot / 8278283814

17 Nov 2021 07:12PM UTC coverage: 34.239%. Remained the same
8278283814

push

coveralls-python

web-flow
Merge pull request #7 from oleorhagen/MEN-5141-bak

MEN-5141: Implement the Device Twin sync with IoT Hub

63 of 181 new or added lines in 5 files covered. (34.81%)

63 of 184 relevant lines covered (34.24%)

0.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.88
/src/daemon/scripts/aggregator.py
1
# Copyright 2021 Northern.tech AS
2
#
3
#    Licensed under the Apache License, Version 2.0 (the "License");
4
#    you may not use this file except in compliance with the License.
5
#    You may obtain a copy of the License at
6
#
7
#        http://www.apache.org/licenses/LICENSE-2.0
8
#
9
#    Unless required by applicable law or agreed to in writing, software
10
#    distributed under the License is distributed on an "AS IS" BASIS,
11
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
#    See the License for the specific language governing permissions and
13
#    limitations under the License.
14
import logging
1✔
15
import subprocess
1✔
16
from typing import Dict
1✔
17

18
log = logging.getLogger(__name__)
1✔
19

20

21
class ScriptKeyValueAggregator:
1✔
22
    """Handles the parsing of the output from any Mender key-value scripts.
23

24
    These scripts support key=value pairs, with one output per line maximum.
25
    Multiple lines with a matching key are aggregated into an array."""
26

27
    def __init__(self, script_path: str):
1✔
28
        self.script_path = script_path
1✔
29
        self.vals: Dict[str, str] = {}
1✔
30

31
    def run(self) -> dict:
1✔
32
        try:
1✔
33
            output = subprocess.run(
1✔
34
                self.script_path,
35
                stdout=subprocess.PIPE,
36
                stderr=subprocess.PIPE,
37
                timeout=100,
38
                check=True,
39
            )
40
            data = output.stdout.decode()
1✔
41
            return self.parse(data)
1✔
NEW
42
        except subprocess.CalledProcessError as e:
×
NEW
43
            errout = ", stderr: " + e.stderr.decode() if e.stderr else ""
×
NEW
44
            log.error(
×
45
                f"Failed to aggregate key-value pairs from {self.script_path}.\
46
                Script returned: {e.returncode}{errout}"
47
            )
NEW
48
            return {}
×
49

50
    def collect(self) -> Dict[str, str]:
1✔
NEW
51
        with open(self.script_path) as fh:
×
NEW
52
            data = fh.read()
×
NEW
53
            return self.parse(data)
×
54

55
    def parse(self, data: str) -> Dict[str, str]:
1✔
56
        for line in data.split("\n"):
1✔
57
            if line == "":
1✔
58
                continue
1✔
59
            arr = line.strip().split("=", 1)
1✔
60
            if len(arr) < 2:
1✔
NEW
61
                log.debug("Skipping line without output")
×
NEW
62
                continue
×
63
            key, val = arr[0], arr[1]
1✔
64
            self.vals[key] = val
1✔
65
        return self.vals
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc