• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / pe-reports / 5268386652

pending completion
5268386652

Pull #565

github

web-flow
Merge 40eba2026 into 14755187f
Pull Request #565: Update report generator to use reportlab

79 of 415 branches covered (19.04%)

Branch coverage included in aggregate %.

404 of 676 new or added lines in 7 files covered. (59.76%)

16 existing lines in 5 files now uncovered.

748 of 1804 relevant lines covered (41.46%)

2.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.6
/src/pe_reports/report_generator.py
1
"""cisagov/pe-reports: A tool for creating Posture & Exposure reports.
2

3
Usage:
4
  pe-reports REPORT_DATE OUTPUT_DIRECTORY [--log-level=LEVEL]
5

6
Options:
7
  -h --help                         Show this message.
8
  REPORT_DATE                       Date of the report, format YYYY-MM-DD
9
  OUTPUT_DIRECTORY                  The directory where the final PDF
10
                                    reports should be saved.
11
  -l --log-level=LEVEL              If specified, then the log level will be set to
12
                                    the specified value.  Valid values are "debug", "info",
13
                                    "warning", "error", and "critical". [default: info]
14
  -sc --soc_med_included            Include social media posts from Cybersixgill in the report.
15
"""
16

17
# Standard Python Libraries
18
import logging
5✔
19
import os
5✔
20
import sys
5✔
21
from typing import Any, Dict
5✔
22

23
# Third-Party Libraries
24
import docopt
5✔
25
import fitz
5✔
26
from schema import And, Schema, SchemaError, Use
5✔
27

28
# cisagov Libraries
29
import pe_reports
5✔
30

31
from ._version import __version__
5✔
32
from .data.db_query import connect, get_orgs
5✔
33
from .pages import init
5✔
34
from .reportlab_generator import report_gen
5✔
35

36
LOGGER = logging.getLogger(__name__)
5✔
37

38

39
def _read_bytes(path):
    """Return the full binary contents of the file at *path*, closing it afterwards."""
    with open(path, "rb") as handle:
        return handle.read()


def embed(
    output_directory,
    org_code,
    datestring,
    file,
    cred_json,
    da_json,
    vuln_json,
    mi_json,
    cred_xlsx,
    da_xlsx,
    vuln_xlsx,
    mi_xlsx,
):
    """Embed the raw JSON and Excel data into the report PDF and save a copy.

    The PDF at ``file`` is opened, eight file annotations (four JSON, four
    Excel) are attached to the page at index 4, and the result is written to
    ``{output_directory}/{org_code}/Posture_and_Exposure_Report-{org_code}-{datestring}.pdf``.

    Args:
        output_directory: Base directory; the ``org_code`` sub-folder is not
            created here, so it must already exist.
        org_code: Organization code used in the output folder and file name.
        datestring: Report date string used in the output file name.
        file: Path of the generated PDF to open.
        cred_json, da_json, vuln_json, mi_json: Paths of the JSON files to embed.
        cred_xlsx, da_xlsx, vuln_xlsx, mi_xlsx: Paths of the Excel files to embed.

    Returns:
        A ``(filesize, tooLarge, output)`` tuple: the size in bytes of the
        saved PDF, whether that size is at least 20,000,000 bytes, and the
        path the PDF was saved to.
    """
    # Size (in bytes) at or above which the report is flagged as too large.
    size_limit = 20000000

    output = f"{output_directory}/{org_code}/Posture_and_Exposure_Report-{org_code}-{datestring}.pdf"

    # (JSON path, Excel path, attachment base name, vertical position) for
    # each data set; the icons are stacked in rows near the bottom of the page.
    data_sets = (
        (cred_json, cred_xlsx, "compromised_credentials", 607),
        (da_json, da_xlsx, "domain_alerts", 635),
        (vuln_json, vuln_xlsx, "vuln_alerts", 663),
        (mi_json, mi_xlsx, "mention_incidents", 691),
    )

    doc = fitz.open(file)
    try:
        # Index 4 is the fifth page of the document (indexing is zero-based).
        # NOTE(review): the original comment called this "the summary page
        # ... on page 4" - confirm which page is intended.
        page = doc[4]

        # Read every attachment up front (JSON first, then Excel) so a missing
        # file fails before any annotation is added.
        json_blobs = [_read_bytes(json_path) for json_path, _, _, _ in data_sets]
        xlsx_blobs = [_read_bytes(xlsx_path) for _, xlsx_path, _, _ in data_sets]

        # Embed the JSON files with a paperclip icon in the left column.
        for blob, (_, _, stem, y_pos) in zip(json_blobs, data_sets):
            page.add_file_annot(
                fitz.Point(300, y_pos),
                blob,
                f"{stem}.json",
                desc="Open JSON",
                icon="Paperclip",
            )

        # Embed the Excel files with a graph icon in the right column.
        for blob, (_, _, stem, y_pos) in zip(xlsx_blobs, data_sets):
            page.add_file_annot(
                fitz.Point(340, y_pos),
                blob,
                f"{stem}.xlsx",
                desc="Open Excel",
                icon="Graph",
            )

        # Save doc and set garbage=4 to reduce PDF size using all 4 methods:
        # Remove unused objects, compact xref table, merge duplicate objects,
        # and check stream objects for duplication
        doc.save(
            output,
            garbage=4,
            deflate=True,
        )
    finally:
        # Release the underlying PDF whether or not embedding succeeded.
        doc.close()

    # Flag (do not raise) when the saved file reaches the size limit; the
    # caller decides what to do with the flag.
    filesize = os.path.getsize(output)
    tooLarge = filesize >= size_limit

    return filesize, tooLarge, output
×
119

120

121
def generate_reports(datestring, output_directory, soc_med_included=False):
    """Build one report PDF per organization and return how many were made.

    Organizations are fetched with ``get_orgs`` over the connection returned
    by ``connect``. For each one the page data is prepared by ``init``, the
    PDF is rendered by ``report_gen`` and the raw data files are attached by
    ``embed``.

    Args:
        datestring: Report date string, used in file names and passed to ``init``.
        output_directory: Directory in which folders and PDFs are created.
        soc_med_included: Forwarded to ``init`` and ``report_gen``.

    Returns:
        ``1`` when ``connect`` returns a falsy value; otherwise the number of
        reports generated (``0`` when no organizations were returned).
        NOTE(review): the failure value ``1`` cannot be told apart from
        "one report generated" - confirm that callers do not rely on it.
    """
    conn = connect()
    if not conn:
        return 1

    pe_orgs = get_orgs(conn)
    generated_reports = 0

    if not pe_orgs:
        LOGGER.error(
            "Connection to pe database failed and/or there are 0 organizations stored."
        )
    else:
        LOGGER.info("PE orgs count: %d", len(pe_orgs))

        for org in pe_orgs:
            # Each org record carries its uid, name and code in positions 0-2.
            org_uid, org_name, org_code = org[0], org[1], org[2]

            LOGGER.info("Running on %s", org_code)

            # Make sure the shared "ppt" folder and this org's folder exist.
            for dir_name in ("ppt", org_code):
                target_dir = f"{output_directory}/{dir_name}"
                if os.path.exists(target_dir):
                    continue
                os.mkdir(target_dir)

            # Gather the report content plus the paths of the raw data files.
            (
                report_dict,
                cred_json,
                da_json,
                vuln_json,
                mi_json,
                cred_xlsx,
                da_xlsx,
                vuln_xlsx,
                mi_xlsx,
            ) = init(
                datestring,
                org_name,
                org_code,
                org_uid,
                output_directory,
                soc_med_included,
            )

            # Render the PDF straight into the output directory; the same
            # path is then handed to embed() as its input file.
            # TODO (carried over from the original): possibly generate the
            # report here.
            report_path = f"{output_directory}/Posture_and_Exposure_Report-{org_code}-{datestring}.pdf"
            report_dict["filename"] = report_path
            report_gen(report_dict, soc_med_included)

            # Attach the Excel and JSON files to the rendered PDF.
            filesize, too_large, _ = embed(
                output_directory,
                org_code,
                datestring,
                report_path,
                cred_json,
                da_json,
                vuln_json,
                mi_json,
                cred_xlsx,
                da_xlsx,
                vuln_xlsx,
                mi_xlsx,
            )

            # Log a message if the report is too large.  Our current mailer
            # cannot send files larger than 20MB.
            if too_large:
                LOGGER.info(
                    "%s is too large. File size: %s Limit: 20MB", org_code, filesize
                )

            generated_reports += 1

    LOGGER.info("%s reports generated", generated_reports)
    return generated_reports
5✔
207

208

209
def main():
    """Generate PDF reports.

    Parses the command line against the module docstring, validates the
    ``--log-level`` value, configures logging to the central log file,
    creates the output directory if it does not exist, and runs
    ``generate_reports``. Exits with status 1 when argument validation fails.
    """
    args: Dict[str, str] = docopt.docopt(__doc__, version=__version__)

    # Validate and convert arguments as needed
    schema: Schema = Schema(
        {
            "--log-level": And(
                str,
                Use(str.lower),
                lambda n: n in ("debug", "info", "warning", "error", "critical"),
                error="Possible values for --log-level are "
                + "debug, info, warning, error, and critical.",
            ),
            str: object,  # Don't care about other keys, if any
        }
    )

    try:
        validated_args: Dict[str, Any] = schema.validate(args)
    except SchemaError as err:
        # Exit because one or more of the arguments were invalid
        print(err, file=sys.stderr)
        sys.exit(1)

    # Assign validated arguments to variables
    log_level: str = validated_args["--log-level"]

    # Setup logging to central file
    logging.basicConfig(
        filename=pe_reports.CENTRAL_LOGGING_FILE,
        filemode="a",
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%m/%d/%Y %I:%M:%S",
        level=log_level.upper(),
    )

    LOGGER.info("Loading Posture & Exposure Report, Version : %s", __version__)

    # Create output directory
    if not os.path.exists(validated_args["OUTPUT_DIRECTORY"]):
        os.mkdir(validated_args["OUTPUT_DIRECTORY"])

    # Fall back to excluding social media when the flag is absent from the
    # parsed arguments. Only a missing key is expected here, so catch
    # KeyError rather than every exception.
    # NOTE(review): the Usage pattern in the module docstring does not list
    # --soc_med_included, so this key may never be present - confirm.
    try:
        soc_med = validated_args["--soc_med_included"]
    except KeyError as e:
        LOGGER.info("Social media should not included: %s", e)
        soc_med = False

    # Generate reports
    generate_reports(
        validated_args["REPORT_DATE"], validated_args["OUTPUT_DIRECTORY"], soc_med
    )

    # Stop logging and clean up
    logging.shutdown()
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc