• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / pe-reports / 5125761814

pending completion
5125761814

Pull #565

github

web-flow
Merge 39dc8f912 into 79ce2c18b
Pull Request #565: Update report generator to use reportlab

79 of 415 branches covered (19.04%)

Branch coverage included in aggregate %.

404 of 676 new or added lines in 7 files covered. (59.76%)

16 existing lines in 5 files now uncovered.

748 of 1804 relevant lines covered (41.46%)

2.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

14.35
/src/pe_reports/data/db_query.py
1
"""Query the PE PostgreSQL database."""
2

3
# Standard Python Libraries
4
import logging
5✔
5
import sys
5✔
6

7
# Third-Party Libraries
8
import numpy as np
5✔
9
import pandas as pd
5✔
10
import psycopg2
5✔
11
from psycopg2 import OperationalError
5✔
12
from psycopg2.extensions import AsIs
5✔
13

14
from .config import config
5✔
15

16
# Setup logging to central file
17
# To avoid a circular reference error which occurs when calling app.config["LOGGER"]
18
# we are directly calling the logger here
19
LOGGER = logging.getLogger(__name__)
5✔
20

21
CONN_PARAMS_DIC = config()
5✔
22

23

24
def show_psycopg2_exception(err):
5✔
25
    """Handle errors for PostgreSQL issues."""
26
    err_type, err_obj, traceback = sys.exc_info()
5✔
27
    LOGGER.error(
5✔
28
        "Database connection error: %s on line number: %s", err, traceback.tb_lineno
29
    )
30

31

32
def connect():
5✔
33
    """Connect to PostgreSQL database."""
34
    conn = None
5✔
35
    try:
5✔
36
        conn = psycopg2.connect(**CONN_PARAMS_DIC)
5✔
37
    except OperationalError as err:
5✔
38
        show_psycopg2_exception(err)
5✔
39
        conn = None
5✔
40
    return conn
5✔
41

42

43
def close(conn):
5✔
44
    """Close connection to PostgreSQL."""
45
    conn.close()
×
46
    return
×
47

48

49
def get_orgs(conn):
5✔
50
    """Query organizations table for orgs we report on."""
51
    try:
×
52
        cur = conn.cursor()
×
53
        sql = """SELECT * FROM organizations WHERE report_on"""
×
54
        cur.execute(sql)
×
55
        pe_orgs = cur.fetchall()
×
56
        cur.close()
×
57
        return pe_orgs
×
58
    except (Exception, psycopg2.DatabaseError) as error:
×
59
        LOGGER.error("There was a problem with your database query %s", error)
×
60
    finally:
61
        if conn is not None:
×
62
            close(conn)
×
63

64

65
def get_orgs_df():
5✔
66
    """Query organizations table into a dataframe."""
67
    conn = connect()
×
68
    try:
×
69
        sql = """SELECT * FROM organizations"""
×
70
        pe_orgs_df = pd.read_sql(sql, conn)
×
71
        return pe_orgs_df
×
72
    except (Exception, psycopg2.DatabaseError) as error:
×
73
        LOGGER.error("There was a problem with your database query %s", error)
×
74
    finally:
75
        if conn is not None:
×
76
            close(conn)
×
77

78

79
def query_creds_view(org_uid, start_date, end_date):
5✔
80
    """Query credentials view."""
81
    conn = connect()
×
82
    try:
×
83
        sql = """SELECT * FROM vw_breachcomp
×
84
        WHERE organizations_uid = %(org_uid)s
85
        AND modified_date BETWEEN %(start_date)s AND %(end_date)s"""
86
        df = pd.read_sql(
×
87
            sql,
88
            conn,
89
            params={"org_uid": org_uid, "start_date": start_date, "end_date": end_date},
90
        )
91
        return df
×
92
    except (Exception, psycopg2.DatabaseError) as error:
×
93
        LOGGER.error("There was a problem with your database query %s", error)
×
94
    finally:
95
        if conn is not None:
×
96
            close(conn)
×
97

98

99
def query_credsbyday_view(org_uid, start_date, end_date):
5✔
100
    """Query the credential exposures per day view."""
101
    conn = connect()
×
102
    try:
×
103
        sql = """SELECT mod_date, no_password, password_included FROM vw_breachcomp_credsbydate
×
104
        WHERE organizations_uid = %(org_uid)s
105
        AND mod_date BETWEEN %(start_date)s AND %(end_date)s"""
106
        df = pd.read_sql(
×
107
            sql,
108
            conn,
109
            params={"org_uid": org_uid, "start_date": start_date, "end_date": end_date},
110
        )
111
        return df
×
112
    except (Exception, psycopg2.DatabaseError) as error:
×
113
        LOGGER.error("There was a problem with your database query %s", error)
×
114
    finally:
115
        if conn is not None:
×
116
            close(conn)
×
117

118

119
def query_breachdetails_view(org_uid, start_date, end_date):
5✔
120
    """Query the breach details view."""
121
    conn = connect()
×
122
    try:
×
123
        sql = """SELECT breach_name, mod_date modified_date, breach_date, password_included, number_of_creds
×
124
        FROM vw_breachcomp_breachdetails
125
        WHERE organizations_uid = %(org_uid)s
126
        AND mod_date BETWEEN %(start_date)s AND %(end_date)s"""
127
        df = pd.read_sql(
×
128
            sql,
129
            conn,
130
            params={"org_uid": org_uid, "start_date": start_date, "end_date": end_date},
131
        )
132
        return df
×
133
    except (Exception, psycopg2.DatabaseError) as error:
×
134
        LOGGER.error("There was a problem with your database query %s", error)
×
135
    finally:
136
        if conn is not None:
×
137
            close(conn)
×
138

139

140
def query_domMasq(org_uid, start_date, end_date):
5✔
141
    """Query domain_permuations associated with a given org."""
142
    conn = connect()
×
143
    try:
×
NEW
144
        sql = """SELECT * FROM domain_permutations
×
145
        WHERE organizations_uid = %(org_uid)s
146
        AND date_active BETWEEN %(start_date)s AND %(end_date)s"""
NEW
147
        df = pd.read_sql(
×
148
            sql,
149
            conn,
150
            params={
151
                "org_uid": org_uid,
152
                "start_date": start_date,
153
                "end_date": end_date,
154
            },
155
        )
NEW
156
        return df
×
NEW
157
    except (Exception, psycopg2.DatabaseError) as error:
×
NEW
158
        LOGGER.error("There was a problem with your database query %s", error)
×
159
    finally:
NEW
160
        if conn is not None:
×
NEW
161
            close(conn)
×
162

163

164
def query_domMasq_alerts(org_uid, start_date, end_date):
5✔
165
    """Query domain alerts table."""
NEW
166
    conn = connect()
×
NEW
167
    try:
×
NEW
168
        sql = """SELECT * FROM domain_alerts
×
169
        WHERE organizations_uid = %(org_uid)s
170
        AND date BETWEEN %(start_date)s AND %(end_date)s"""
UNCOV
171
        df = pd.read_sql(
×
172
            sql,
173
            conn,
174
            params={
175
                "org_uid": org_uid,
176
                "start_date": start_date,
177
                "end_date": end_date,
178
            },
179
        )
180
        return df
×
181
    except (Exception, psycopg2.DatabaseError) as error:
×
182
        LOGGER.error("There was a problem with your database query %s", error)
×
183
    finally:
184
        if conn is not None:
×
185
            close(conn)
×
186

187

188
# The 'table' parameter is used in query_shodan, query_darkweb and
189
# query_darkweb_cves functions to call specific tables that relate to the
190
# function name.  The result of this implementation reduces the code base,
191
# the code reduction leads to an increase in efficiency by reusing the
192
# function by passing only a parameter to get the required information from
193
# the database.
194

195

196
def query_shodan(org_uid, start_date, end_date, table):
5✔
197
    """Query Shodan table."""
198
    conn = connect()
×
199
    try:
×
NEW
200
        df = pd.DataFrame()
×
NEW
201
        df_list = []
×
NEW
202
        chunk_size = 1000
×
UNCOV
203
        sql = """SELECT * FROM %(table)s
×
204
        WHERE organizations_uid = %(org_uid)s
205
        AND timestamp BETWEEN %(start_date)s AND %(end_date)s"""
NEW
206
        count = 0
×
207
        # Batch SQL call to reduce memory (https://pythonspeed.com/articles/pandas-sql-chunking/)
NEW
208
        for chunk_df in pd.read_sql(
×
209
            sql,
210
            conn,
211
            params={
212
                "table": AsIs(table),
213
                "org_uid": org_uid,
214
                "start_date": start_date,
215
                "end_date": end_date,
216
            },
217
            chunksize=chunk_size,
218
        ):
NEW
219
            count += 1
×
NEW
220
            df_list.append(chunk_df)
×
221

NEW
222
        if len(df_list) == 0:
×
NEW
223
            df = pd.read_sql(
×
224
                sql,
225
                conn,
226
                params={
227
                    "table": AsIs(table),
228
                    "org_uid": org_uid,
229
                    "start_date": start_date,
230
                    "end_date": end_date,
231
                },
232
            )
233
        else:
NEW
234
            df = pd.concat(df_list, ignore_index=True)
×
235
        return df
×
236
    except (Exception, psycopg2.DatabaseError) as error:
×
237
        LOGGER.error("There was a problem with your database query %s", error)
×
238
    finally:
239
        if conn is not None:
×
240
            close(conn)
×
241

242

243
def query_darkweb(org_uid, start_date, end_date, table):
5✔
244
    """Query Dark Web table."""
245
    conn = connect()
×
246
    try:
×
247
        sql = """SELECT * FROM %(table)s
×
248
        WHERE organizations_uid = %(org_uid)s
249
        AND date BETWEEN %(start_date)s AND %(end_date)s"""
250
        df = pd.read_sql(
×
251
            sql,
252
            conn,
253
            params={
254
                "table": AsIs(table),
255
                "org_uid": org_uid,
256
                "start_date": start_date,
257
                "end_date": end_date,
258
            },
259
        )
260
        return df
×
261
    except (Exception, psycopg2.DatabaseError) as error:
×
262
        LOGGER.error("There was a problem with your database query %s", error)
×
263
    finally:
264
        if conn is not None:
×
265
            close(conn)
×
266

267

268
def query_darkweb_cves(table):
5✔
269
    """Query Dark Web CVE table."""
270
    conn = connect()
×
271
    try:
×
272
        sql = """SELECT * FROM %(table)s"""
×
273
        df = pd.read_sql(
×
274
            sql,
275
            conn,
276
            params={"table": AsIs(table)},
277
        )
278
        return df
×
279
    except (Exception, psycopg2.DatabaseError) as error:
×
280
        LOGGER.error("There was a problem with your database query %s", error)
×
281
    finally:
282
        if conn is not None:
×
283
            close(conn)
×
284

285

286
def query_cyberSix_creds(org_uid, start_date, end_date):
5✔
287
    """Query cybersix_exposed_credentials table."""
288
    conn = connect()
×
289
    try:
×
290
        sql = """SELECT * FROM public.cybersix_exposed_credentials as creds
×
291
        WHERE organizations_uid = %(org_uid)s
292
        AND breach_date BETWEEN %(start)s AND %(end)s"""
293
        df = pd.read_sql(
×
294
            sql,
295
            conn,
296
            params={"org_uid": org_uid, "start": start_date, "end": end_date},
297
        )
298
        df["breach_date_str"] = pd.to_datetime(df["breach_date"]).dt.strftime(
×
299
            "%m/%d/%Y"
300
        )
301
        df.loc[df["breach_name"] == "", "breach_name"] = (
×
302
            "Cyber_six_" + df["breach_date_str"]
303
        )
304
        df["description"] = (
×
305
            df["description"].str.split("Query to find the related").str[0]
306
        )
307
        df["password_included"] = np.where(df["password"] != "", True, False)
×
308
        return df
×
309
    except (Exception, psycopg2.DatabaseError) as error:
×
310
        LOGGER.error("There was a problem with your database query %s", error)
×
311
    finally:
312
        if conn is not None:
×
313
            close(conn)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc