• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / cyhy-feeds / 7350888636

16 Nov 2023 03:30PM UTC coverage: 0.0%. Remained the same
7350888636

push

github

web-flow
Merge pull request #49 from cisagov/improvement/always-capture-new-tickets

Change tickets query to include newly-created tickets

0 of 5 new or added lines in 2 files covered. (0.0%)

1 existing line in 1 file now uncovered.

0 of 315 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/aws_jobs/cyhy-data-retriever.py
1
#!/usr/bin/env python3
2
"""Retrieve a compressed, encrypted, signed extract file and verify/decrypt/uncompress it.
×
3

4
NOTES:
5
 * The python modules below must be installed for the script to work
6
 * This script expects to operate on a GPG-encrypted bzip2 tar file: e.g. filename.tbz.gpg
7

8
Usage:
9
  COMMAND_NAME [-v | --verbose] [--filename EXTRACT_FILENAME] [-a | --aws] --config CONFIG_FILE
10
  COMMAND_NAME (-h | --help)
11
  COMMAND_NAME --version
12

13
Options:
14
  -h --help                                         Show this screen
15
  --version                                         Show version
16
  -f EXTRACT_FILENAME --filename=EXTRACT_FILENAME   Name of extract file to retrieve
17
  -v --verbose                                      Show verbose output
18
  -c CONFIG_FILE --config=CONFIG_FILE               Configuration file for this script
19
  -a --aws                                          Output results to s3 bucket
20

21
"""
22

23
# Standard Python Libraries
24
from configparser import ConfigParser
×
25
from datetime import datetime
×
26
import re
×
27
import sys
×
28
import tarfile
×
29

30
# Third-Party Libraries
31
import boto3  # pip install boto3
×
32
import botocore  # pip install botocore
×
33
from dateutil.relativedelta import relativedelta
×
34
import dateutil.tz as tz
×
35
from docopt import docopt
×
36
import gnupg
×
37

38
BUCKET_NAME = "ncats-moe-data"
×
39
DOMAIN = "ncats-moe-data"
×
40

41

42
def main():
×
43
    """Process a provided file to decrypt and extract the contents of a cyhy-data-extract run."""
44
    global __doc__
45
    __doc__ = re.sub("COMMAND_NAME", __file__, __doc__)
×
46
    args = docopt(__doc__, version="0.0.5-rc.1")
×
47
    now = datetime.now(tz.tzutc())
×
48

49
    # Read parameters in from config file
50
    config = ConfigParser()
×
51
    config.read([args["--config"]])
×
52
    GNUPG_HOME = config.get("DEFAULT", "GNUPG_HOME")
×
53
    GPG_DECRYPTION_PASSPHRASE = config.get("DEFAULT", "GPG_DECRYPTION_PASSPHRASE")
×
54
    AWS_ACCESS_KEY_ID = config.get("DEFAULT", "AWS_ACCESS_KEY_ID")
×
55
    AWS_SECRET_ACCESS_KEY = config.get("DEFAULT", "AWS_SECRET_ACCESS_KEY")
×
56

57
    # Set up name of file to retrieve
58
    if args["--filename"]:  # If extract filename is provided, use that
×
59
        extract_filename = args["--filename"]
×
60
    else:
61
        # Otherwise, look for the most-recent daily file; this must
62
        # change if we start generating more than one file per day
63
        today = now + relativedelta(hour=0, minute=0, second=0, microsecond=0)
×
64
        extract_filename = "cyhy_extract_{!s}.tbz.gpg".format(
×
65
            today.isoformat().replace(":", "").split(".")[0]
66
        )
67

68
    # Download extract file from s3
69
    if args["--aws"]:
×
70
        s3 = boto3.client(
×
71
            "s3",
72
            aws_access_key_id=AWS_ACCESS_KEY_ID,
73
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
74
        )
75

76
        try:
×
77
            s3.download_file(BUCKET_NAME, extract_filename, extract_filename)
×
78
        except botocore.exceptions.ClientError as e:
×
79
            print("An Error occured accessing the AWS bucket: {!s}".format(e))
×
80

81
    # Set filename for decrypted output
82
    if extract_filename[-4:] == ".gpg":
×
83
        decrypted_filename = extract_filename.split(".gpg")[0]
×
84
    else:  # If extract_filename doesn't end in '.gpg'
85
        decrypted_filename = extract_filename + "_decrypted"
×
86

87
    # Use GPG to verify & decrypt extract file
88
    #
89
    # IMPORTANT: To pass in the passphrase for decryption,
90
    # gpg-agent.conf in GNUPG_HOME must have: allow-loopback-pinentry
91
    gpg = gnupg.GPG(
×
92
        gpgbinary="gpg2",
93
        gnupghome=GNUPG_HOME,
94
        verbose=args["--verbose"],
95
        options=["--pinentry-mode", "loopback"],
96
    )
97
    extract_stream = open(extract_filename, "rb")
×
98
    decrypted_data = gpg.decrypt_file(
×
99
        extract_stream, passphrase=GPG_DECRYPTION_PASSPHRASE, output=decrypted_filename
100
    )
101
    extract_stream.close()
×
102

103
    if not decrypted_data.ok:
×
104
        print(
×
105
            "\nFAILURE - GPG DECRYPTION ERROR!\n GPG status: {} \n GPG stderr:\n{}".format(
106
                decrypted_data.status, decrypted_data.stderr
107
            )
108
        )
109
        sys.exit(-1)
×
110

111
    print(
×
112
        "Encrypted file {} successfully decrypted to file: {}".format(
113
            extract_filename, decrypted_filename
114
        )
115
    )
116

117
    # Uncompress the tar/bzip2 (.tbz) file (decrypted_filename)
118
    tar = tarfile.open(decrypted_filename)
×
119
    tar_membernames = tar.getnames()
×
120
    if tar_membernames:
×
121
        print("Extracting files:")
×
122
        for f in tar_membernames:
×
123
            print(" {}".format(f))
×
124
        # B202: https://github.com/PyCQA/bandit/blob/main/bandit/plugins/tarfile_unsafe_members.py
NEW
125
        tar.extractall()  # nosec B202 - this archive is trusted
×
UNCOV
126
        print("Decrypted file {} successfully uncompressed".format(decrypted_filename))
×
127
    else:
128
        print(
×
129
            "\nFAILURE - ERROR UNCOMPRESSING!\n Expected .tbz file; Invalid tar data found in {}".format(
130
                decrypted_filename
131
            )
132
        )
133
        sys.exit(-1)
×
134
    tar.close()
×
135

136
    # Shell equivalent for decrypt/uncompress:
137
    # gpg -d extract_filename | tar xj
138

139
    # For debugging:
140
    # import IPython; IPython.embed()
141
    # sys.exit(0)
142

143
    print("\nSUCCESS!")
×
144
    sys.exit(1)
×
145

146

147
if __name__ == "__main__":
148
    main()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc