• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / pe-reports / 5892235336

17 Aug 2023 02:27PM UTC coverage: 33.736% (+7.0%) from 26.737%
5892235336

Pull #565

github

web-flow
Merge f450f30ab into 998fa208f
Pull Request #565: Update report generator to use reportlab

93 of 477 branches covered (19.5%)

Branch coverage included in aggregate %.

443 of 1022 new or added lines in 8 files covered. (43.35%)

18 existing lines in 5 files now uncovered.

801 of 2173 relevant lines covered (36.86%)

1.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

16.46
/src/pe_reports/data/db_query.py
1
"""Query the PE PostgreSQL database."""
2

3
# Standard Python Libraries
4
import json
5✔
5
import logging
5✔
6
import re
5✔
7
import sys
5✔
8

9
# Third-Party Libraries
10
import numpy as np
5✔
11
import pandas as pd
5✔
12
import psycopg2
5✔
13
from psycopg2 import OperationalError
5✔
14
from psycopg2.extensions import AsIs
5✔
15
import requests
5✔
16

17
from .config import config
5✔
18

19
# Setup logging to central file
20
# To avoid a circular reference error which occurs when calling app.config["LOGGER"]
21
# we are directly calling the logger here
22
LOGGER = logging.getLogger(__name__)
5✔
23

24
CONN_PARAMS_DIC = config()
5✔
25
PE_API_KEY = config(section="peapi").get("api_key")
5✔
26
PE_API_URL = config(section="peapi").get("api_url")
5✔
27

28

29
def sanitize_string(string):
5✔
30
    """Remove special characters from string."""
31
    return re.sub(r"[^a-zA-Z0-9\s]", "", string)
×
32

33

34
def sanitize_uid(string):
5✔
35
    """Remove special characters from uids."""
36
    return re.sub(r"[^a-zA-Z0-9\-\s]", "", string)
×
37

38

39
def show_psycopg2_exception(err):
5✔
40
    """Handle errors for PostgreSQL issues."""
41
    err_type, err_obj, traceback = sys.exc_info()
5✔
42
    LOGGER.error(
5✔
43
        "Database connection error: %s on line number: %s", err, traceback.tb_lineno
44
    )
45

46

47
def connect():
5✔
48
    """Connect to PostgreSQL database."""
49
    conn = None
5✔
50
    try:
5✔
51
        conn = psycopg2.connect(**CONN_PARAMS_DIC)
5✔
52
    except OperationalError as err:
5✔
53
        show_psycopg2_exception(err)
5✔
54
        conn = None
5✔
55
    return conn
5✔
56

57

58
def close(conn):
5✔
59
    """Close connection to PostgreSQL."""
60
    conn.close()
×
61
    return
×
62

63

64
def get_orgs():
5✔
65
    """Query organizations table."""
66
    headers = {
×
67
        "Content-Type": "application/json",
68
        "access_token": f"{PE_API_KEY}",
69
    }
70
    try:
×
71
        response = requests.post(PE_API_URL, headers=headers).json()
×
72
        return response
×
73
    except requests.exceptions.HTTPError as errh:
×
74
        print(errh)
×
75
    except requests.exceptions.ConnectionError as errc:
×
76
        print(errc)
×
77
    except requests.exceptions.Timeout as errt:
×
78
        print(errt)
×
79
    except requests.exceptions.RequestException as err:
×
80
        print(err)
×
81
    except json.decoder.JSONDecodeError as err:
×
82
        print(err)
×
83

84

85
def get_orgs_df():
5✔
86
    """Query organizations table into a dataframe."""
87
    conn = connect()
×
88
    try:
×
89
        sql = """SELECT * FROM organizations"""
×
90
        pe_orgs_df = pd.read_sql(sql, conn)
×
91
        return pe_orgs_df
×
92
    except (Exception, psycopg2.DatabaseError) as error:
×
93
        LOGGER.error("There was a problem with your database query %s", error)
×
94
    finally:
95
        if conn is not None:
×
96
            close(conn)
×
97

98

99
def query_creds_view(org_uid, start_date, end_date):
5✔
100
    """Query credentials view."""
101
    conn = connect()
×
102
    try:
×
103
        sql = """SELECT * FROM vw_breachcomp
×
104
        WHERE organizations_uid = %(org_uid)s
105
        AND modified_date BETWEEN %(start_date)s AND %(end_date)s"""
106
        df = pd.read_sql(
×
107
            sql,
108
            conn,
109
            params={
110
                "org_uid": sanitize_uid(org_uid),
111
                "start_date": start_date,
112
                "end_date": end_date,
113
            },
114
        )
115
        return df
×
116
    except (Exception, psycopg2.DatabaseError) as error:
×
117
        LOGGER.error("There was a problem with your database query %s", error)
×
118
    finally:
119
        if conn is not None:
×
120
            close(conn)
×
121

122

123
def query_credsbyday_view(org_uid, start_date, end_date):
5✔
124
    """Query the credential exposures per day view."""
125
    conn = connect()
×
126
    try:
×
127
        sql = """SELECT mod_date, no_password, password_included FROM vw_breachcomp_credsbydate
×
128
        WHERE organizations_uid = %(org_uid)s
129
        AND mod_date BETWEEN %(start_date)s AND %(end_date)s"""
130
        df = pd.read_sql(
×
131
            sql,
132
            conn,
133
            params={
134
                "org_uid": sanitize_uid(org_uid),
135
                "start_date": start_date,
136
                "end_date": end_date,
137
            },
138
        )
139
        return df
×
140
    except (Exception, psycopg2.DatabaseError) as error:
×
141
        LOGGER.error("There was a problem with your database query %s", error)
×
142
    finally:
143
        if conn is not None:
×
144
            close(conn)
×
145

146

147
def query_breachdetails_view(org_uid, start_date, end_date):
5✔
148
    """Query the breach details view."""
149
    conn = connect()
×
150
    try:
×
151
        sql = """SELECT breach_name, mod_date modified_date, breach_date, password_included, number_of_creds
×
152
        FROM vw_breachcomp_breachdetails
153
        WHERE organizations_uid = %(org_uid)s
154
        AND mod_date BETWEEN %(start_date)s AND %(end_date)s"""
155
        df = pd.read_sql(
×
156
            sql,
157
            conn,
158
            params={
159
                "org_uid": sanitize_uid(org_uid),
160
                "start_date": start_date,
161
                "end_date": end_date,
162
            },
163
        )
164
        return df
×
165
    except (Exception, psycopg2.DatabaseError) as error:
×
166
        LOGGER.error("There was a problem with your database query %s", error)
×
167
    finally:
168
        if conn is not None:
×
169
            close(conn)
×
170

171

172
def query_domMasq(org_uid, start_date, end_date):
5✔
173
    """Query domain_permuations associated with a given org."""
174
    conn = connect()
×
175
    try:
×
NEW
176
        sql = """SELECT * FROM domain_permutations
×
177
        WHERE organizations_uid = %(org_uid)s
178
        AND date_active BETWEEN %(start_date)s AND %(end_date)s"""
NEW
179
        df = pd.read_sql(
×
180
            sql,
181
            conn,
182
            params={
183
                "org_uid": org_uid,
184
                "start_date": start_date,
185
                "end_date": end_date,
186
            },
187
        )
NEW
188
        return df
×
NEW
189
    except (Exception, psycopg2.DatabaseError) as error:
×
NEW
190
        LOGGER.error("There was a problem with your database query %s", error)
×
191
    finally:
NEW
192
        if conn is not None:
×
NEW
193
            close(conn)
×
194

195

196
def query_domMasq_alerts(org_uid, start_date, end_date):
5✔
197
    """Query domain alerts table."""
NEW
198
    conn = connect()
×
NEW
199
    try:
×
NEW
200
        sql = """SELECT * FROM domain_alerts
×
201
        WHERE organizations_uid = %(org_uid)s
202
        AND date BETWEEN %(start_date)s AND %(end_date)s"""
UNCOV
203
        df = pd.read_sql(
×
204
            sql,
205
            conn,
206
            params={
207
                "org_uid": sanitize_uid(org_uid),
208
                "start_date": start_date,
209
                "end_date": end_date,
210
            },
211
        )
212
        return df
×
213
    except (Exception, psycopg2.DatabaseError) as error:
×
214
        LOGGER.error("There was a problem with your database query %s", error)
×
215
    finally:
216
        if conn is not None:
×
217
            close(conn)
×
218

219

220
# The 'table' parameter is used in query_shodan, query_darkweb and
221
# query_darkweb_cves functions to call specific tables that relate to the
222
# function name.  The result of this implementation reduces the code base,
223
# the code reduction leads to an increase in efficiency by reusing the
224
# function by passing only a parameter to get the required information from
225
# the database.
226

227

228
def query_shodan(org_uid, start_date, end_date, table):
5✔
229
    """Query Shodan table."""
230
    conn = connect()
×
231
    try:
×
NEW
232
        df = pd.DataFrame()
×
NEW
233
        df_list = []
×
NEW
234
        chunk_size = 1000
×
UNCOV
235
        sql = """SELECT * FROM %(table)s
×
236
        WHERE organizations_uid = %(org_uid)s
237
        AND timestamp BETWEEN %(start_date)s AND %(end_date)s"""
NEW
238
        count = 0
×
239
        # Batch SQL call to reduce memory (https://pythonspeed.com/articles/pandas-sql-chunking/)
NEW
240
        for chunk_df in pd.read_sql(
×
241
            sql,
242
            conn,
243
            params={
244
                "table": AsIs(table),
245
                "org_uid": sanitize_uid(org_uid),
246
                "start_date": start_date,
247
                "end_date": end_date,
248
            },
249
            chunksize=chunk_size,
250
        ):
NEW
251
            count += 1
×
NEW
252
            df_list.append(chunk_df)
×
253

NEW
254
        if len(df_list) == 0:
×
NEW
255
            df = pd.read_sql(
×
256
                sql,
257
                conn,
258
                params={
259
                    "table": AsIs(table),
260
                    "org_uid": org_uid,
261
                    "start_date": start_date,
262
                    "end_date": end_date,
263
                },
264
            )
265
        else:
NEW
266
            df = pd.concat(df_list, ignore_index=True)
×
267
        return df
×
268
    except (Exception, psycopg2.DatabaseError) as error:
×
269
        LOGGER.error("There was a problem with your database query %s", error)
×
270
    finally:
271
        if conn is not None:
×
272
            close(conn)
×
273

274

275
def query_darkweb(org_uid, start_date, end_date, table):
5✔
276
    """Query Dark Web table."""
277
    conn = connect()
×
278
    try:
×
279
        sql = """SELECT * FROM %(table)s
×
280
        WHERE organizations_uid = %(org_uid)s
281
        AND date BETWEEN %(start_date)s AND %(end_date)s"""
282

283
        df = pd.read_sql(
×
284
            sql,
285
            conn,
286
            params={
287
                "table": sanitize_string(table),
288
                "org_uid": sanitize_uid(org_uid),
289
                "start_date": start_date,
290
                "end_date": end_date,
291
            },
292
        )
293
        return df
×
294
    except (Exception, psycopg2.DatabaseError) as error:
×
295
        LOGGER.error("There was a problem with your database query %s", error)
×
296
    finally:
297
        if conn is not None:
×
298
            close(conn)
×
299

300

301
def query_darkweb_cves(table):
5✔
302
    """Query Dark Web CVE table."""
303
    conn = connect()
×
304
    try:
×
305
        sql = """SELECT * FROM %(table)s"""
×
306
        df = pd.read_sql(
×
307
            sql,
308
            conn,
309
            params={"table": AsIs(table)},
310
        )
311
        return df
×
312
    except (Exception, psycopg2.DatabaseError) as error:
×
313
        LOGGER.error("There was a problem with your database query %s", error)
×
314
    finally:
315
        if conn is not None:
×
316
            close(conn)
×
317

318

319
def query_cyberSix_creds(org_uid, start_date, end_date):
5✔
320
    """Query cybersix_exposed_credentials table."""
321
    conn = connect()
×
322
    try:
×
323
        sql = """SELECT * FROM public.cybersix_exposed_credentials as creds
×
324
        WHERE organizations_uid = %(org_uid)s
325
        AND breach_date BETWEEN %(start)s AND %(end)s"""
326
        df = pd.read_sql(
×
327
            sql,
328
            conn,
329
            params={
330
                "org_uid": sanitize_uid(org_uid),
331
                "start": start_date,
332
                "end": end_date,
333
            },
334
        )
335
        df["breach_date_str"] = pd.to_datetime(df["breach_date"]).dt.strftime(
×
336
            "%m/%d/%Y"
337
        )
338
        df.loc[df["breach_name"] == "", "breach_name"] = (
×
339
            "Cyber_six_" + df["breach_date_str"]
340
        )
341
        df["description"] = (
×
342
            df["description"].str.split("Query to find the related").str[0]
343
        )
344
        df["password_included"] = np.where(df["password"] != "", True, False)
×
345
        return df
×
346
    except (Exception, psycopg2.DatabaseError) as error:
×
347
        LOGGER.error("There was a problem with your database query %s", error)
×
348
    finally:
349
        if conn is not None:
×
350
            close(conn)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc