• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / cyhy-feeds / 4898762099

pending completion
4898762099

Pull #43

github

GitHub
Merge 0ea5ebb45 into 2806ef9f6
Pull Request #43: Enforce that all scripts use Python 3 rather than Python 2

0 of 10 new or added lines in 3 files covered. (0.0%)

0 of 315 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/aws_jobs/cyhy-data-extract.py
1
#!/usr/bin/env python3
2
"""Create compressed, encrypted, signed extract file with Federal CyHy data for integration with the Weathermap project.
×
3

4
Usage:
5
  COMMAND_NAME --config CONFIG_FILE [--cyhy-config CYHY_CONFIG] [--scan-config SCAN_CONFIG] [--assessment-config ASSESSMENT_CONFIG] [-v | --verbose] [-a | --aws ] [--cleanup-aws] [--date DATE] [--debug]
6
  COMMAND_NAME (-h | --help)
7
  COMMAND_NAME --version
8

9
Options:
10
  -h --help                                                         Show this screen
11
  --version                                                         Show version
12
  -x CYHY_CONFIG --cyhy-config=CYHY_CONFIG                          CyHy MongoDB configuration to use
13
  -y SCAN_CONFIG --scan-config=SCAN_CONFIG                          Scan MongoDB configuration to use
14
  -z ASSESSMENT_CONFIG --assessment-config=ASSESSMENT_CONFIG        Assessment MongoDB configuration to use
15
  -v --verbose                                                      Show verbose output
16
  -a --aws                                                          Output results to S3 bucket
17
  --cleanup-aws                                                     Delete old files from the S3 bucket
18
  -c CONFIG_FILE --config=CONFIG_FILE                               Configuration file for this script
19
  -d DATE --date=DATE                                               Specific date to export data from in form: %Y-%m-%d (eg. 2018-12-31) NOTE that this date is in UTC
20
  --debug                                                           Enable debug logging
21

22
"""
23

24
# Standard Python Libraries
NEW
25
from configparser import ConfigParser
×
26
from datetime import datetime
×
27
import json
×
28
import logging
×
29
from logging.handlers import RotatingFileHandler
×
30
import os
×
31
import sys
×
32
import tarfile
×
33
import time
×
34

35
# Third-Party Libraries
36
import boto3
×
37
import bson
×
38
from dateutil.relativedelta import relativedelta
×
39
import dateutil.tz as tz
×
40
from docopt import docopt
×
41
import gnupg  # pip install python-gnupg
×
42
import netaddr
×
43
from pytz import timezone
×
44

45
# cisagov Libraries
46
from dmarc import get_dmarc_data
×
47
from mongo_db_from_config import db_from_config
×
48

49
# Logging core variables
50
logger = logging.getLogger("cyhy-feeds")
×
51
LOG_FILE_NAME = "/var/log/cyhy/feeds.log"
×
52
LOG_FILE_MAX_SIZE = pow(1024, 2) * 128
×
53
LOG_FILE_BACKUP_COUNT = 9
×
54
DEFAULT_LOGGER_LEVEL = logging.INFO
×
55

56
BUCKET_NAME = "ncats-moe-data"
×
57
DOMAIN = "ncats-moe-data"
×
58
HEADER = ""
×
59
DEFAULT_ES_RETRIEVE_SIZE = 10000
×
60
DAYS_OF_DMARC_REPORTS = 1
×
61
PAGE_SIZE = 100000  # Number of documents per query
×
62
SAVEFILE_PREFIX = "cyhy_extract_"
×
63

64

65
def custom_json_handler(obj):
    """Return a JSON-serializable representation of obj.

    Used as the ``default`` hook for json.dumps. Supports datetime-like
    objects (anything exposing ``isoformat``), BSON ObjectIds, and the
    netaddr IP types used elsewhere in this script.

    Raises:
        TypeError: If obj is not one of the supported types.
    """
    if hasattr(obj, "isoformat"):
        # datetime/date/time objects serialize as ISO-8601 strings.
        return obj.isoformat()
    # isinstance (rather than type() ==) also accepts subclasses and is
    # the idiomatic type check.
    if isinstance(obj, bson.objectid.ObjectId):
        return repr(obj)
    if isinstance(obj, (netaddr.IPAddress, netaddr.IPNetwork)):
        return str(obj)
    if isinstance(obj, netaddr.IPSet):
        # An IPSet is represented by its list of CIDR blocks.
        return obj.iter_cidrs()
    raise TypeError(
        "Object of type {} with value of {} is not JSON serializable".format(
            type(obj), repr(obj)
        )
    )
83

84

85
def to_json(obj):
    """Serialize obj to a pretty-printed JSON string with sorted keys."""
    return json.dumps(
        obj,
        default=custom_json_handler,
        indent=4,
        sort_keys=True,
    )
88

89

90
def flatten_datetime(in_datetime):
    """Truncate a datetime to midnight, keeping only year, month, and day."""
    midnight = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}
    return in_datetime.replace(**midnight)
93

94

95
# All logging code is pulled from cyhy-core and tweaked down to this single use-case.
96
# This predates the move to Python 3, so it does not yet leverage some of
# the improvements made to the logging library in later Python versions.
98
def setup_logging(debug_logging):
    """Configure root logging with a UTC-stamped rotating file handler.

    debug_logging selects DEBUG level; otherwise DEFAULT_LOGGER_LEVEL is
    used. Returns the configured root logger.
    """
    log_format = "%(asctime)-15s %(levelname)s %(name)s - %(message)s"
    formatter = logging.Formatter(log_format)
    # Record all log timestamps in UTC.
    formatter.converter = time.gmtime
    root = logging.getLogger()
    root.setLevel(logging.DEBUG if debug_logging else DEFAULT_LOGGER_LEVEL)
    handler = RotatingFileHandler(
        LOG_FILE_NAME,
        maxBytes=LOG_FILE_MAX_SIZE,
        backupCount=LOG_FILE_BACKUP_COUNT,
    )
    handler.setFormatter(formatter)
    root.addHandler(handler)
    logger.debug("Debug mode enabled.")
    return root
×
115

116

117
def update_bucket(bucket_name, local_file, remote_file_name):
    """Upload a local file to the named S3 bucket under the given key."""
    client = boto3.client("s3")
    client.upload_file(local_file, bucket_name, remote_file_name)
×
121

122

123
def create_dummy_files(output_dir):
    """Generate 20 empty *.gpg files with staggered ages for testing cleanup_old_files."""
    for day_offset in range(1, 21):
        path = os.path.join(output_dir, "dummy_file_{!s}.gpg".format(day_offset))
        # Opening for write is enough to create an empty file.
        with open(path, "w"):
            pass
        stats = os.stat(path)
        # Age the file by day_offset days (86400 seconds per day).
        os.utime(path, (stats.st_atime, stats.st_mtime - 86400 * day_offset))
135

136

137
def cleanup_old_files(output_dir, file_retention_num_days):
    """Delete any *.gpg files older than file_retention_num_days in the specified output_dir."""
    # Files last modified before this Unix timestamp get removed.
    # Note that there are 86400 seconds per day.
    cutoff = time.time() - file_retention_num_days * 86400
    for entry in os.listdir(output_dir):
        # Only encrypted extract files are eligible for deletion.
        if not entry.endswith(".gpg"):
            continue
        full_path = os.path.join(output_dir, entry)
        if os.stat(full_path).st_mtime < cutoff:
            # Delete file locally
            os.remove(full_path)
×
151

152

153
def cleanup_bucket_files(object_retention_days):
    """Delete extract files in the S3 bucket older than the retention time.

    Pages through all objects in BUCKET_NAME whose keys start with
    SAVEFILE_PREFIX and deletes those last modified more than
    object_retention_days days ago. Deletion failures are logged, not raised.
    """
    retention_time = flatten_datetime(
        datetime.now(tz.tzlocal()) - relativedelta(days=object_retention_days)
    )
    s3 = boto3.client("s3")
    response = None

    while True:
        if response is None:
            # First page of matching objects.
            response = s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=SAVEFILE_PREFIX)
        elif response["IsTruncated"] is True:
            # Fetch the next page of results.
            response = s3.list_objects_v2(
                Bucket=BUCKET_NAME,
                Prefix=SAVEFILE_PREFIX,
                ContinuationToken=response["NextContinuationToken"],
            )
        else:
            break

        del_list = [
            {"Key": o["Key"]}
            for o in response.get("Contents", [])
            if flatten_datetime(o["LastModified"]) < retention_time
        ]
        # AWS requires a list of objects and an empty list is seen as malformed.
        if len(del_list) > 0:
            del_resp = s3.delete_objects(
                Bucket=BUCKET_NAME, Delete={"Objects": del_list}
            )
            for err in del_resp.get("Errors", []):
                # Bug fix: the S3 delete_objects response uses the
                # capitalized "Key" field in each error entry; the previous
                # err["key"] lookup raised KeyError while reporting errors.
                logger.error(
                    "Failed to delete '{}' :: {} - {}\n".format(
                        err["Key"], err["Code"], err["Message"]
                    )
                )
189

190

191
def generate_cursor(collection, parameters):
    """Query collection and return a cursor to be used for data retrieval.

    parameters must contain "query" and "projection" entries.
    """
    # no_cursor_timeout keeps long-lived cursors from expiring on the
    # MongoDB server, so we can generate every cursor up front and drain
    # them later without the server reaping them mid-retrieval.
    query = parameters["query"]
    projection = parameters["projection"]
    return collection.find(query, projection, no_cursor_timeout=True)
200

201

202
def query_data(collection, cursor, tbz_file, tbz_filename, end_of_data_collection):
    """Query collection for data matching query and add it to tbz_file.

    Drains cursor into a JSON array file named after the collection and the
    collection end time, adds that file to the open tar archive, then
    deletes the intermediate file.
    """
    logger.info("Fetching from {} collection...".format(collection))

    json_filename = "{}_{!s}.json".format(
        collection,
        end_of_data_collection.isoformat().replace(":", "").split(".")[0],
    )

    # Documents are streamed to disk one at a time. Serializing the whole
    # result set in memory at once previously consumed enough memory on
    # large queries to crash the AWS instance running this script.
    with open(json_filename, "w") as collection_file:
        collection_file.write("[")
        first_doc = True
        for doc in cursor:
            if not first_doc:
                # Separate documents with commas to keep the array valid.
                collection_file.write(",")
            first_doc = False
            # Serialize the document as a one-element list, then strip the
            # enclosing "[" and "\n]" so the fragment embeds in our array.
            collection_file.write(to_json([doc])[1:-2])
        collection_file.write("\n]")

    logger.info("Finished writing {} to file.".format(collection))
    tbz_file.add(json_filename)
    logger.info("Added {} to {}".format(json_filename, tbz_filename))
    # Delete file once added to tar
    if os.path.exists(json_filename):
        os.remove(json_filename)
        logger.info("Deleted {} as part of cleanup.".format(json_filename))
×
239

240

241
def main():
    """Retrieve data, aggregate into a compressed archive, and encrypt it to store or upload to S3.

    Reads connection and GPG settings from the --config file, pulls data
    from whichever of the cyhy/scan/assessment databases were configured,
    adds DMARC data from Elasticsearch, then encrypts and signs the
    resulting tar.bz2 archive. Exits non-zero on configuration or GPG
    errors.
    """
    global __doc__
    # Substitute the actual script name into the docopt usage text.
    __doc__ = __doc__.replace("COMMAND_NAME", __file__)
    args = docopt(__doc__, version="0.0.5-rc.1")

    setup_logging(args["--debug"])

    logger.info("Beginning data extraction process.")

    # At least one database must be configured or there is nothing to do.
    if not (
        args["--cyhy-config"] or args["--scan-config"] or args["--assessment-config"]
    ):
        logger.error("At least one database configuration must be supplied.")
        sys.exit(1)

    if args["--cyhy-config"]:
        logger.debug("Creating connection to cyhy database.")
        cyhy_db = db_from_config(args["--cyhy-config"])
    if args["--scan-config"]:
        logger.debug("Creating connection to scan database.")
        scan_db = db_from_config(args["--scan-config"])
    if args["--assessment-config"]:
        logger.debug("Creating connection to assessment database.")
        assessment_db = db_from_config(args["--assessment-config"])
    now = datetime.now(tz.tzutc())

    # Read parameters in from config file
    config = ConfigParser()
    config.read([args["--config"]])
    ORGS_EXCLUDED = set(config.get("DEFAULT", "FED_ORGS_EXCLUDED").split(","))
    # An empty config value splits to {""}; normalize that to "no exclusions".
    if ORGS_EXCLUDED == {""}:
        ORGS_EXCLUDED = set()
    GNUPG_HOME = config.get("DEFAULT", "GNUPG_HOME")
    RECIPIENTS = config.get("DEFAULT", "RECIPIENTS").split(",")
    SIGNER = config.get("DEFAULT", "SIGNER")
    SIGNER_PASSPHRASE = config.get("DEFAULT", "SIGNER_PASSPHRASE")
    OUTPUT_DIR = config.get("DEFAULT", "OUTPUT_DIR")
    # Files older than this are deleted by cleanup_old_files()
    FILE_RETENTION_NUM_DAYS = int(config.get("DEFAULT", "FILE_RETENTION_NUM_DAYS"))
    ES_REGION = config.get("DMARC", "ES_REGION")
    ES_URL = config.get("DMARC", "ES_URL")
    ES_RETRIEVE_SIZE = int(config.get("DMARC", "ES_RETRIEVE_SIZE"))
    ES_AWS_CONFIG_SECTION_NAME = config.get("DMARC", "ES_AWS_CONFIG_SECTION_NAME")

    # Check if OUTPUT_DIR exists; if not, bail out
    if not os.path.exists(OUTPUT_DIR):
        logger.error("Output directory '{}' does not exist.".format(OUTPUT_DIR))
        sys.exit(1)

    # Set up GPG (used for encrypting and signing)
    gpg = gnupg.GPG(
        gpgbinary="gpg2",
        gnupghome=GNUPG_HOME,
        verbose=args["--verbose"],
        options=["--pinentry-mode", "loopback", "-u", SIGNER],
    )
    gpg.encoding = "utf-8"

    if args["--date"]:
        # Note this date is in UTC timezone
        date_of_data = datetime.strptime(args["--date"], "%Y-%m-%d")
        end_of_data_collection = flatten_datetime(
            timezone("UTC").localize(date_of_data)
        )
    else:
        end_of_data_collection = flatten_datetime(now)

    # Each extract covers exactly one day of data.
    start_of_data_collection = end_of_data_collection + relativedelta(days=-1)

    logger.debug(
        "Extracting data from {} to {}.".format(
            start_of_data_collection, end_of_data_collection
        )
    )

    # Create tar/bzip2 file for writing
    tbz_filename = "{}{!s}.tbz".format(
        SAVEFILE_PREFIX,
        end_of_data_collection.isoformat().replace(":", "").split(".")[0],
    )
    tbz_file = tarfile.open(tbz_filename, mode="w:bz2")

    if args["--cyhy-config"]:
        # Get a list of all non-retired orgs
        all_orgs = (
            cyhy_db["requests"]
            .find({"retired": {"$ne": True}}, {"_id": 1})
            .distinct("_id")
        )
        orgs = list(set(all_orgs) - ORGS_EXCLUDED)
    else:
        orgs = []

    # Strip the per-document encryption key from everything we export.
    default_projection = {"key": False}

    cyhy_collection = {
        "host_scans": {
            "query": {
                "owner": {"$in": orgs},
                "time": {
                    "$gte": start_of_data_collection,
                    "$lt": end_of_data_collection,
                },
            },
            "projection": default_projection,
        },
        "hosts": {
            "query": {
                "owner": {"$in": orgs},
                "last_change": {
                    "$gte": start_of_data_collection,
                    "$lt": end_of_data_collection,
                },
            },
            "projection": default_projection,
        },
        # The kevs collection does not have a field to indicate either
        # initial creation time or time of last modification. As a result we can
        # only pull the entire collection every time an extract is run.
        "kevs": {
            "query": {},
            "projection": default_projection,
        },
        "port_scans": {
            "query": {
                "owner": {"$in": orgs},
                "time": {
                    "$gte": start_of_data_collection,
                    "$lt": end_of_data_collection,
                },
            },
            "projection": default_projection,
        },
        # The requests collection does not have a field to indicate either
        # initial creation time or time of last modification. As a result we can
        # only pull the entire collection every time an extract is run.
        "requests": {
            "query": {},
            "projection": {
                "agency.acronym": True,
                "agency.location": True,
                "agency.name": True,
                "agency.type": True,
                "children": True,
                "networks": True,
                "period_start": True,
                "report_types": True,
                "retired": True,
                "scan_types": True,
                "stakeholder": True,
            },
        },
        "tickets": {
            "query": {
                "owner": {"$in": orgs},
                "last_change": {
                    "$gte": start_of_data_collection,
                    "$lt": end_of_data_collection,
                },
            },
            "projection": default_projection,
        },
        "vuln_scans": {
            "query": {
                "owner": {"$in": orgs},
                "time": {
                    "$gte": start_of_data_collection,
                    "$lt": end_of_data_collection,
                },
            },
            "projection": default_projection,
        },
    }

    scan_collection = {
        "certs": {
            "query": {
                "sct_or_not_before": {
                    "$gte": start_of_data_collection,
                    "$lt": end_of_data_collection,
                }
            },
            "projection": default_projection,
        },
        "https_scan": {
            "query": {
                "scan_date": {
                    "$gte": start_of_data_collection,
                    "$lt": end_of_data_collection,
                }
            },
            "projection": default_projection,
        },
        "sslyze_scan": {
            "query": {
                "scan_date": {
                    "$gte": start_of_data_collection,
                    "$lt": end_of_data_collection,
                }
            },
            "projection": default_projection,
        },
        "trustymail": {
            "query": {
                "scan_date": {
                    "$gte": start_of_data_collection,
                    "$lt": end_of_data_collection,
                }
            },
            "projection": default_projection,
        },
    }

    # Neither collection in the assessment database have fields that indicate an
    # initial creation time or time of last modification. As a result we can only
    # pull the entire collection every time an extract is run.
    assessment_collection = {
        "assessments": {"query": {}, "projection": default_projection},
        "findings": {"query": {}, "projection": default_projection},
    }

    # Get cursors for the results of our queries. Create a tuple of the collection
    # name and the generated cursor to later iterate over for data retrieval. We
    # create cursors all at once to "lock in" the query results to reduce timing
    # issues for data retrieval.
    logger.info("Creating cursors for query results.")
    cursor_list = []
    if args["--cyhy-config"]:
        for collection in cyhy_collection:
            logger.debug("Generating cursor for {}.{}".format(cyhy_db.name, collection))
            cursor_list.append(
                (
                    cyhy_db[collection].name,
                    generate_cursor(cyhy_db[collection], cyhy_collection[collection]),
                )
            )
    if args["--scan-config"]:
        for collection in scan_collection:
            logger.debug("Generating cursor for {}.{}".format(scan_db.name, collection))
            cursor_list.append(
                (
                    scan_db[collection].name,
                    generate_cursor(scan_db[collection], scan_collection[collection]),
                )
            )
    if args["--assessment-config"]:
        for collection in assessment_collection:
            logger.debug(
                "Generating cursor for {}.{}".format(assessment_db.name, collection)
            )
            cursor_list.append(
                (
                    assessment_db[collection].name,
                    generate_cursor(
                        assessment_db[collection], assessment_collection[collection]
                    ),
                )
            )

    # Use our generated cursors to pull data now.
    logger.info("Extracting data from database(s).")
    for collection, cursor in cursor_list:
        query_data(
            collection,
            cursor,
            tbz_file,
            tbz_filename,
            end_of_data_collection,
        )
        # Just to be safe we manually close the cursor.
        cursor.close()

    # Note that we use the elasticsearch AWS profile here
    json_data = to_json(
        get_dmarc_data(
            ES_REGION,
            ES_URL,
            DAYS_OF_DMARC_REPORTS,
            ES_RETRIEVE_SIZE,
            ES_AWS_CONFIG_SECTION_NAME,
        )
    )
    json_filename = "DMARC_{!s}.json".format(
        end_of_data_collection.isoformat().replace(":", "").split(".")[0]
    )
    # Use a context manager so the file handle is closed even if the write
    # fails (the previous open/write/close sequence could leak the handle).
    with open(json_filename, "w") as dmarc_file:
        dmarc_file.write(json_data)
    tbz_file.add(json_filename)
    tbz_file.close()
    if os.path.exists(json_filename):
        os.remove(json_filename)
        logger.info("Deleted {} as part of cleanup.".format(json_filename))

    gpg_file_name = tbz_filename + ".gpg"
    gpg_full_path_filename = os.path.join(OUTPUT_DIR, gpg_file_name)
    # Encrypt (with public keys for all RECIPIENTS) and sign (with
    # SIGNER's private key)
    with open(tbz_filename, "rb") as f:
        status = gpg.encrypt_file(
            f,
            RECIPIENTS,
            armor=False,
            sign=SIGNER,
            passphrase=SIGNER_PASSPHRASE,
            output=gpg_full_path_filename,
        )

    if not status.ok:
        logger.error("GPG Error {} :: {}".format(status.status, status.stderr))
        sys.exit(1)

    logger.info(
        "Encrypted, signed, and compressed JSON data written to file: {}".format(
            gpg_full_path_filename
        )
    )

    if args["--aws"]:
        # send the contents to the s3 bucket
        update_bucket(BUCKET_NAME, gpg_full_path_filename, gpg_file_name)
        logger.info("Upload to AWS bucket complete")

    # The unencrypted archive is no longer needed once the .gpg exists.
    if os.path.exists(tbz_filename):
        os.remove(tbz_filename)
        logger.info("Deleted {} as part of cleanup.".format(tbz_filename))

    cleanup_old_files(OUTPUT_DIR, FILE_RETENTION_NUM_DAYS)

    if args["--cleanup-aws"]:
        cleanup_bucket_files(FILE_RETENTION_NUM_DAYS)

    logger.info("Finished data extraction process.")
575

576

577
# Invoke the extract only when run as a script, not when imported.
if __name__ == "__main__":
    main()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc