• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / pe-reports / 5892227966

17 Aug 2023 02:27PM UTC coverage: 33.736% (+7.0%) from 26.737%
5892227966

Pull #565

github

web-flow
Merge 9adf19bbe into 998fa208f
Pull Request #565: Update report generator to use reportlab

93 of 477 branches covered (19.5%)

Branch coverage included in aggregate %.

443 of 1022 new or added lines in 8 files covered. (43.35%)

18 existing lines in 5 files now uncovered.

801 of 2173 relevant lines covered (36.86%)

1.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.14
/src/pe_reports/report_generator.py
1
"""cisagov/pe-reports: A tool for creating Posture & Exposure reports.
2

3
Usage:
4
  pe-reports REPORT_DATE OUTPUT_DIRECTORY [--log-level=LEVEL] [--soc_med_included]
5

6
Options:
7
  -h --help                         Show this message.
8
  REPORT_DATE                       Date of the report, format YYYY-MM-DD
9
  OUTPUT_DIRECTORY                  The directory where the final PDF
10
                                    reports should be saved.
11
  -l --log-level=LEVEL              If specified, then the log level will be set to
12
                                    the specified value.  Valid values are "debug", "info",
13
                                    "warning", "error", and "critical". [default: info]
14
  -s --soc_med_included             Include social media posts from Cybersixgill in the report.
15
"""
16

17
# Standard Python Libraries
18
import logging
5✔
19
import os
5✔
20
import sys
5✔
21
from typing import Any, Dict
5✔
22

23
# Third-Party Libraries
24
import docopt
5✔
25
import fitz
5✔
26
from schema import And, Schema, SchemaError, Use
5✔
27

28
# cisagov Libraries
29
import pe_reports
5✔
30

31
from ._version import __version__
5✔
32
from .data.db_query import connect, get_orgs
5✔
33
from .pages import init
5✔
34
from .reportlab_core_generator import core_report_gen
5✔
35
from .reportlab_generator import report_gen
5✔
36

37
LOGGER = logging.getLogger(__name__)
5✔
38

39

40
# Reports at or above this many bytes are flagged as too large by embed().
_MAX_REPORT_BYTES = 20000000

# Vertical positions of the attachment icons on the summary page, one per row.
_ATTACHMENT_ROWS_Y = (607, 635, 663, 691)

# Horizontal positions of the JSON and Excel icon columns.
_JSON_COLUMN_X = 300
_XLSX_COLUMN_X = 340


def _read_binary(path):
    """Return the full contents of ``path`` as bytes, closing the file afterwards."""
    with open(path, "rb") as handle:
        return handle.read()


def embed(
    output_directory,
    org_code,
    datestring,
    file,
    cred_json,
    da_json,
    vuln_json,
    mi_json,
    cred_xlsx,
    da_xlsx,
    vuln_xlsx,
    mi_xlsx,
):
    """Attach the raw JSON/Excel data files to the report PDF and save it.

    The PDF at ``file`` is opened, the data files are added as file
    annotations on one page, and the result is saved as
    ``{output_directory}/{org_code}/Posture_and_Exposure_Report-{org_code}-{datestring}.pdf``.
    No encryption is applied here; the document is only saved with
    ``garbage=4`` and ``deflate=True``.

    Args:
        output_directory: Base directory; the ``org_code`` subfolder must
            already exist, it is not created here.
        org_code: Organization code used in the output folder and file name.
        datestring: Report date string used in the output file name.
        file: Path of the generated PDF to attach the data to.
        cred_json, da_json, vuln_json: Paths of the JSON files to attach.
        mi_json: Path of the mention-incidents JSON file; skipped when falsy.
        cred_xlsx, da_xlsx, vuln_xlsx: Paths of the Excel files to attach.
        mi_xlsx: Path of the mention-incidents Excel file; skipped when falsy.

    Returns:
        A ``(filesize, too_large, output)`` tuple: the size in bytes of the
        saved PDF, whether that size is at least 20,000,000 bytes, and the
        path the PDF was saved to.
    """
    output = f"{output_directory}/{org_code}/Posture_and_Exposure_Report-{org_code}-{datestring}.pdf"

    # (source path, attachment name) pairs in the order they are stacked
    # top-to-bottom on the page. Mention-incident files are optional.
    json_sources = [
        (cred_json, "compromised_credentials.json"),
        (da_json, "domain_alerts.json"),
        (vuln_json, "vuln_alerts.json"),
    ]
    if mi_json:
        json_sources.append((mi_json, "mention_incidents.json"))

    xlsx_sources = [
        (cred_xlsx, "compromised_credentials.xlsx"),
        (da_xlsx, "domain_alerts.xlsx"),
        (vuln_xlsx, "vuln_alerts.xlsx"),
    ]
    if mi_xlsx:
        xlsx_sources.append((mi_xlsx, "mention_incidents.xlsx"))

    doc = fitz.open(file)
    try:
        # Index 4 is the fifth page (indices are zero-based).
        # NOTE(review): the previous comment called this "page 4"; confirm
        # that the summary page really is the fifth page of the report.
        page = doc[4]

        # Read every file up front (each handle is closed immediately) so a
        # missing data file fails before any annotation is added.
        json_payloads = [(_read_binary(path), name) for path, name in json_sources]
        xlsx_payloads = [(_read_binary(path), name) for path, name in xlsx_sources]

        # Embed the JSON files down the left-hand column as paperclip icons.
        for y, (data, name) in zip(_ATTACHMENT_ROWS_Y, json_payloads):
            page.add_file_annot(
                fitz.Point(_JSON_COLUMN_X, y),
                data,
                name,
                desc="Open JSON",
                icon="Paperclip",
            )

        # Embed the Excel files down the right-hand column as graph icons.
        for y, (data, name) in zip(_ATTACHMENT_ROWS_Y, xlsx_payloads):
            page.add_file_annot(
                fitz.Point(_XLSX_COLUMN_X, y),
                data,
                name,
                desc="Open Excel",
                icon="Graph",
            )

        # garbage=4 reduces PDF size using all 4 methods: remove unused
        # objects, compact the xref table, merge duplicate objects, and
        # check stream objects for duplication.
        doc.save(
            output,
            garbage=4,
            deflate=True,
        )
    finally:
        # Release the document even when reading, annotating or saving fails.
        doc.close()

    filesize = os.path.getsize(output)
    too_large = filesize >= _MAX_REPORT_BYTES

    return filesize, too_large, output
124

125

126
def generate_reports(datestring, output_directory, soc_med_included=False):
    """Generate one report PDF per organization returned by the PE database.

    For each organization this creates the ``ppt`` and ``<org_code>`` folders
    under ``output_directory``, builds the report data with ``init``, renders
    the PDF with ``report_gen`` (premium organizations) or ``core_report_gen``
    (all others), and attaches the raw data files with ``embed``.

    Args:
        datestring: Report date string, used in the output file names and
            passed through to ``init``.
        output_directory: Directory the PDFs and per-organization folders
            are written under.
        soc_med_included: Passed through to ``init`` and ``report_gen``.

    Returns:
        The number of reports generated, or ``1`` when no database
        connection could be obtained.
    """
    conn = connect()
    if not conn:
        # NOTE(review): this value is indistinguishable from "one report
        # generated"; kept as-is because callers may depend on it.
        return 1

    pe_orgs = get_orgs(conn)
    generated_reports = 0

    if pe_orgs:
        LOGGER.info("PE orgs count: %d", len(pe_orgs))
        for org in pe_orgs:
            # Positional fields of the organization row.
            org_uid = org[0]
            org_name = org[1]
            org_code = org[2]
            premium = org[8]

            LOGGER.info("Running on %s", org_code)

            # Create the output folders; exist_ok avoids the race between
            # checking for a folder and creating it.
            for dir_name in ("ppt", org_code):
                os.makedirs(f"{output_directory}/{dir_name}", exist_ok=True)

            # Build the report content and the raw data files to embed.
            (
                report_dict,
                cred_json,
                da_json,
                vuln_json,
                mi_json,
                cred_xlsx,
                da_xlsx,
                vuln_xlsx,
                mi_xlsx,
            ) = init(
                datestring,
                org_name,
                org_code,
                org_uid,
                premium,
                output_directory,
                soc_med_included,
            )

            # Render the PDF; the same path is handed to embed() below.
            output_filename = f"{output_directory}/Posture_and_Exposure_Report-{org_code}-{datestring}.pdf"
            report_dict["filename"] = output_filename
            if premium:
                report_gen(report_dict, soc_med_included)
            else:
                core_report_gen(report_dict)

            # Embed Excel and JSON files
            (filesize, too_large, _) = embed(
                output_directory,
                org_code,
                datestring,
                output_filename,
                cred_json,
                da_json,
                vuln_json,
                mi_json,
                cred_xlsx,
                da_xlsx,
                vuln_xlsx,
                mi_xlsx,
            )

            # Log a message if the report is too large.  Our current mailer
            # cannot send files larger than 20MB.
            if too_large:
                LOGGER.info(
                    "%s is too large. File size: %s Limit: 20MB", org_code, filesize
                )

            generated_reports += 1
    else:
        LOGGER.error(
            "Connection to pe database failed and/or there are 0 organizations stored."
        )

    LOGGER.info("%s reports generated", generated_reports)
    return generated_reports
217

218

219
def main():
    """Parse the command line, configure logging and generate the PDF reports.

    Exits with status 1 when the command-line arguments fail schema
    validation; the validation error is printed to standard error first.
    """
    # docopt yields strings for positional arguments and valued options;
    # --soc_med_included is passed on as a flag, so values are typed as Any.
    args: Dict[str, Any] = docopt.docopt(__doc__, version=__version__)

    # Validate and convert arguments as needed
    schema: Schema = Schema(
        {
            "--log-level": And(
                str,
                Use(str.lower),
                lambda n: n in ("debug", "info", "warning", "error", "critical"),
                error="Possible values for --log-level are "
                + "debug, info, warning, error, and critical.",
            ),
            str: object,  # Don't care about other keys, if any
        }
    )

    try:
        validated_args: Dict[str, Any] = schema.validate(args)
    except SchemaError as err:
        # Exit because one or more of the arguments were invalid
        print(err, file=sys.stderr)
        sys.exit(1)

    # Assign validated arguments to variables
    log_level: str = validated_args["--log-level"]
    output_directory = validated_args["OUTPUT_DIRECTORY"]

    # Setup logging to central file
    # NOTE(review): datefmt uses the 12-hour "%I" without "%p", so morning
    # and afternoon timestamps look identical; left unchanged in case the
    # log format is consumed elsewhere.
    logging.basicConfig(
        filename=pe_reports.CENTRAL_LOGGING_FILE,
        filemode="a",
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%m/%d/%Y %I:%M:%S",
        level=log_level.upper(),
    )

    LOGGER.info("Loading Posture & Exposure Report, Version : %s", __version__)

    # Create output directory; exist_ok avoids the race between checking
    # for the directory and creating it.
    os.makedirs(output_directory, exist_ok=True)

    # Generate reports
    generate_reports(
        validated_args["REPORT_DATE"],
        output_directory,
        validated_args["--soc_med_included"],
    )

    # Stop logging and clean up
    logging.shutdown()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc