• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / manage-cyhy-ops / 5535175130

pending completion
5535175130

Pull #28

github

web-flow
Merge ddfd20d37 into eee405190
Pull Request #28: ⚠️ CONFLICT! Lineage pull request for: skeleton

4 of 45 branches covered (8.89%)

Branch coverage included in aggregate %.

1 of 1 new or added line in 1 file covered. (100.0%)

17 existing lines in 1 file now uncovered.

32 of 153 relevant lines covered (20.92%)

1.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

20.24
/src/manage_cyhy_ops/cli.py
1
"""Manage CyHy operator data SSM parameters.
2

3
Usage:
4
  manage-cyhy-ops [--debug] [--regions=REGIONS] [--ssm-cyhy-ops=CYHY_OPS]
5
    [--ssm-ssh-prefix=SSH_PREFIX] add [--overwrite] [--username=USERNAME] SSH_KEY
6
  manage-cyhy-ops [--debug] [--regions=REGIONS] [--ssm-cyhy-ops=CYHY_OPS]
7
    [--ssm-ssh-prefix=SSH_PREFIX] remove [--full] USERNAME
8
  manage-cyhy-ops [--debug] [--regions=REGIONS] [--ssm-cyhy-ops=CYHY_OPS]
9
    [--ssm-ssh-prefix=SSH_PREFIX] list USERNAME
10
  manage-cyhy-ops (-h | --help)
11
  manage-cyhy-ops --version
12

13
Arguments:
14
  SSH_KEY   An SSH key in the format "ssh-ed25519 <SSH key> <comment>".
15
            Note: If a username is not provided with --username, then the
16
            SSH key comment should be a username in the format "firstname.lastname".
17
  USERNAME  A username in the format "firstname.lastname".
18

19
Options:
20
  -h --help                    Show this message.
21
  --version                    Show version.
22
  --debug                      Enable debug messages.
23
  --regions=REGIONS            Comma delimited list of AWS regions to use.
24
                               [default: us-east-1,us-east-2,us-west-1,us-west-2]
25
  --ssm-cyhy-ops=CYHY_OPS      The key that is used in the Parameter Store for
26
                               the list of active CyHy Operators.
27
                               [default: /cyhy/ops/users]
28
  --ssm-ssh-prefix=SSH_PREFIX  The prefix to use when working with SSH keys
29
                               stored in the Parameter Store.
30
                               [default: /ssh/public_keys]
31
  --overwrite                  Overwrite the user's SSH key if one already exists.
32
  --full                       Also remove the user's SSH key from SSM.
33
"""
34

35
# Standard Python Libraries
36
import logging
6✔
37
import sys
6✔
38
from typing import Any, Dict, List
6✔
39

40
# Third-Party Libraries
41
import docopt
6✔
42

43
# No readily available stubs/typing package appears to exist.
44
from schema import And, Or, Regex, Schema, SchemaError, Use  # type: ignore
6✔
45

46
from ._version import __version__
6✔
47
from .manageoperators import ManageOperators
6✔
48

49
ALLOWED_REGIONS = ["us-east-1", "us-east-2", "us-west-1", "us-west-2"]
6✔
50
# AWS only allows these characters in Parameter Store keys.
51
VALID_SSM_CHARS = r"^[a-zA-Z0-9.\-_/]*$"
6✔
52
# Subset of valid characters we'll allow for usernames.
53
VALID_USERNAME = r"^[a-zA-Z0-9.\-_]*$"
6✔
54
USERNAME_ERROR_MSG = (
6✔
55
    'Username must be in the format "firstname.lastname", and can only consist '
56
    'of letters, numbers, and the characters ".-_".'
57
)
58
# The Schema for an SSM key is used more than once so we define it here.
59
SSM_KEY_VALIDATE = And(
6!
60
    str,
61
    Use(str.lower),
62
    Regex(VALID_SSM_CHARS),
63
    lambda s: s[0] == "/" if "/" in s else True,
64
    error="Invalid SSM key provided.",
65
)
66
# The Schema for a username is used in multiple places so we define it here.
67
USERNAME_VALIDATE = Or(
6!
68
    None,
69
    And(
70
        str,
71
        Use(str.lower),
72
        Regex(VALID_USERNAME),
73
        lambda s: len(s.split(".")) >= 2,
74
        error=f"USER {USERNAME_ERROR_MSG}",
75
    ),
76
)
77

78

79
def main() -> None:
6✔
80
    """Provide an interface to manage CyHy operators."""
81
    args: Dict[str, str] = docopt.docopt(__doc__, version=__version__)
6✔
82

UNCOV
83
    schema: Schema = Schema(
×
84
        {
85
            "--regions": And(
86
                str,
87
                lambda s: False
88
                not in map(lambda r: r in ALLOWED_REGIONS, s.split(",")),
89
                error=f"Invalid region(s) provided. Valid regions are: {ALLOWED_REGIONS}",
90
            ),
91
            "--ssm-ssh-prefix": SSM_KEY_VALIDATE,
92
            "--ssm-cyhy-ops": SSM_KEY_VALIDATE,
93
            "--username": USERNAME_VALIDATE,
94
            "SSH_KEY": Or(
95
                None,
96
                And(
97
                    str,
98
                    lambda s: len(s.split(" ")) == 3,
99
                    error="Invalid SSH key format.",
100
                ),
101
            ),
102
            "USERNAME": USERNAME_VALIDATE,
103
            str: object,  # No other validation to perform.
104
        }
105
    )
106

107
    try:
×
UNCOV
108
        validated_args: Dict[str, Any] = schema.validate(args)
×
109
    except SchemaError as err:
×
110
        # Exit because one or more of the arguments were invalid.
UNCOV
111
        print(err, file=sys.stderr)
×
UNCOV
112
        sys.exit(1)
×
113

114
    # Set up logging.
UNCOV
115
    log_level: str = "DEBUG" if validated_args["--debug"] else "INFO"
×
UNCOV
116
    logging.basicConfig(
×
117
        format="%(asctime)-15s %(levelname)s %(message)s", level=log_level
118
    )
119

120
    logging.debug(validated_args)
×
121

122
    try:
×
123
        regions: List = validated_args["--regions"].split(",")
×
124
        cyhy_ops: str = validated_args["--ssm-cyhy-ops"]
×
125
        ssh_prefix: str = validated_args["--ssm-ssh-prefix"]
×
126
        managers: List[ManageOperators] = []
×
127
        for region in regions:
×
128
            managers.append(ManageOperators(region, cyhy_ops, ssh_prefix))
×
129
    except Exception as err:
×
UNCOV
130
        logging.error(err)
×
131
        sys.exit(1)
×
132

133
    username = validated_args["USERNAME"]
×
UNCOV
134
    overwrite_ssh_key = validated_args["--overwrite"]
×
135
    delete_ssh_key = validated_args["--full"]
×
136

137
    results: List[int] = []
×
UNCOV
138
    if validated_args["add"]:
×
139
        ssh_key: str = validated_args["SSH_KEY"]
×
140

UNCOV
141
        if validated_args["--username"]:
×
142
            username = validated_args["--username"]
×
143
        else:
144
            logging.debug("Using SSH key comment as username.")
×
145
            try:
×
146
                username = Schema(USERNAME_VALIDATE).validate(ssh_key.split(" ")[2])
×
147
            except SchemaError as err:
×
UNCOV
148
                logging.error(err)
×
149
                sys.exit(1)
×
150

UNCOV
151
        for manager in managers:
×
UNCOV
152
            results.append(
×
153
                manager.add_user(username, ssh_key, overwrite=overwrite_ssh_key)
154
            )
155
    elif validated_args["remove"]:
×
156
        for manager in managers:
×
157
            results.append(manager.remove_user(username, delete_ssh_key))
×
158
    elif validated_args["list"]:
×
UNCOV
159
        for manager in managers:
×
160
            results.append(manager.check_user(username))
×
161
    else:
UNCOV
162
        logging.info("Feature not implemented yet.")
×
163

164
    # Right now all return statuses from the Manager are 1, but that is not
165
    # guaranteed in the future. This handles any non-successful error code.
UNCOV
166
    if True in map(lambda e: e != 0, results):
×
UNCOV
167
        sys.exit(1)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc