• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / pe-reports / 5487713236

pending completion
5487713236

Pull #597

github

web-flow
Merge f2aedefe6 into 14755187f
Pull Request #597: Second order sql injection fixes

49 of 336 branches covered (14.58%)

Branch coverage included in aggregate %.

5 of 10 new or added lines in 2 files covered. (50.0%)

1 existing line in 1 file now uncovered.

400 of 1361 relevant lines covered (29.39%)

1.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.07
/src/pe_reports/data/db_query.py
1
"""Query the PE PostgreSQL database."""
2

3
# Standard Python Libraries
4
import logging
5✔
5
import re
5✔
6
import sys
5✔
7

8
# Third-Party Libraries
9
import numpy as np
5✔
10
import pandas as pd
5✔
11
import psycopg2
5✔
12
from psycopg2 import OperationalError
5✔
13
from psycopg2.extensions import AsIs
5✔
14

15
from .config import config
5✔
16

17
# Setup logging to central file
18
# To avoid a circular reference error which occurs when calling app.config["LOGGER"]
19
# we are directly calling the logger here
20
# Module-level logger, named after this module via __name__.
LOGGER = logging.getLogger(__name__)
5✔
21

22
# Connection settings returned by config(); connect() unpacks them as keyword
# arguments to psycopg2.connect().
CONN_PARAMS_DIC = config()
5✔
23

24

25
def sanitize_string(string):
    """Return the string with everything but letters, digits and whitespace dropped."""
    disallowed = re.compile(r"[^a-zA-Z0-9\s]")
    return disallowed.sub("", string)
×
28

29

30
def sanitize_uid(string):
    """Return the uid with everything but letters, digits, hyphens and whitespace dropped."""
    disallowed = re.compile(r"[^a-zA-Z0-9\-\s]")
    return disallowed.sub("", string)
×
33

34

35
def show_psycopg2_exception(err):
    """Log a PostgreSQL error together with the line number it was raised on.

    Args:
        err: The exception to report (connect() passes the OperationalError
            it caught).

    The line number is taken from the exception currently being handled,
    falling back to the traceback attached to ``err``.  When neither is
    available (for example the function is called outside an ``except`` block
    with an exception that was never raised) the line number is logged as
    "unknown" instead of failing with an AttributeError.
    """
    # sys.exc_info()[2] is None when no exception is being handled.
    tb = sys.exc_info()[2] or getattr(err, "__traceback__", None)
    line_number = tb.tb_lineno if tb is not None else "unknown"
    LOGGER.error(
        "Database connection error: %s on line number: %s", err, line_number
    )
41

42

43
def connect():
    """Open a PostgreSQL connection using CONN_PARAMS_DIC.

    Returns the psycopg2 connection, or None when psycopg2 raises an
    OperationalError (the error is reported via show_psycopg2_exception).
    """
    try:
        return psycopg2.connect(**CONN_PARAMS_DIC)
    except OperationalError as exc:
        show_psycopg2_exception(exc)
    return None
5✔
52

53

54
def close(conn):
    """Close the given PostgreSQL connection; nothing is returned."""
    conn.close()
×
58

59

60
def get_orgs(conn):
    """Query the organizations table for the orgs we report on.

    Args:
        conn: An open database connection.  When it is not None it is closed
            before this function returns, so the caller must not reuse it.

    Returns:
        The rows of ``organizations`` where ``report_on`` is true, as returned
        by ``cursor.fetchall()``, or None when the query fails (the error is
        logged).
    """
    try:
        sql = """SELECT * FROM organizations WHERE report_on"""
        # The context manager closes the cursor even when execute() or
        # fetchall() raises; previously it was only closed on success.
        with conn.cursor() as cur:
            cur.execute(sql)
            return cur.fetchall()
    except (Exception, psycopg2.DatabaseError) as error:
        LOGGER.error("There was a problem with your database query %s", error)
    finally:
        if conn is not None:
            close(conn)
×
74

75

76
def get_orgs_df():
    """Load every row of the organizations table into a pandas DataFrame.

    Returns the DataFrame, or None when the query fails (the error is logged).
    The connection opened here is closed before returning.
    """
    conn = connect()
    try:
        return pd.read_sql("""SELECT * FROM organizations""", conn)
    except (Exception, psycopg2.DatabaseError) as exc:
        LOGGER.error("There was a problem with your database query %s", exc)
    finally:
        if conn is not None:
            close(conn)
×
88

89

90
def query_creds_view(org_uid, start_date, end_date):
    """Read the vw_breachcomp view for one organization and date range.

    The org uid is passed through sanitize_uid and all three values are sent
    as bound query parameters.  Returns a DataFrame of the rows whose
    modified_date lies between start_date and end_date, or None when the
    query fails (the error is logged).
    """
    sql = """SELECT * FROM vw_breachcomp
        WHERE organizations_uid = %(org_uid)s
        AND modified_date BETWEEN %(start_date)s AND %(end_date)s"""
    conn = connect()
    try:
        query_params = {
            "org_uid": sanitize_uid(org_uid),
            "start_date": start_date,
            "end_date": end_date,
        }
        return pd.read_sql(sql, conn, params=query_params)
    except (Exception, psycopg2.DatabaseError) as exc:
        LOGGER.error("There was a problem with your database query %s", exc)
    finally:
        if conn is not None:
            close(conn)
×
112

113

114
def query_credsbyday_view(org_uid, start_date, end_date):
    """Read the per-day credential exposure view for one organization.

    Selects mod_date, no_password and password_included from
    vw_breachcomp_credsbydate for rows whose mod_date lies between start_date
    and end_date.  The org uid is passed through sanitize_uid and all values
    are sent as bound query parameters.  Returns a DataFrame, or None when
    the query fails (the error is logged).
    """
    sql = """SELECT mod_date, no_password, password_included FROM vw_breachcomp_credsbydate
        WHERE organizations_uid = %(org_uid)s
        AND mod_date BETWEEN %(start_date)s AND %(end_date)s"""
    conn = connect()
    try:
        query_params = {
            "org_uid": sanitize_uid(org_uid),
            "start_date": start_date,
            "end_date": end_date,
        }
        return pd.read_sql(sql, conn, params=query_params)
    except (Exception, psycopg2.DatabaseError) as exc:
        LOGGER.error("There was a problem with your database query %s", exc)
    finally:
        if conn is not None:
            close(conn)
×
136

137

138
def query_breachdetails_view(org_uid, start_date, end_date):
    """Read the breach details view for one organization and date range.

    Selects breach_name, mod_date (aliased to modified_date), breach_date,
    password_included and number_of_creds from vw_breachcomp_breachdetails for
    rows whose mod_date lies between start_date and end_date.  The org uid is
    passed through sanitize_uid and all values are sent as bound query
    parameters.  Returns a DataFrame, or None when the query fails (the error
    is logged).
    """
    sql = """SELECT breach_name, mod_date modified_date, breach_date, password_included, number_of_creds
        FROM vw_breachcomp_breachdetails
        WHERE organizations_uid = %(org_uid)s
        AND mod_date BETWEEN %(start_date)s AND %(end_date)s"""
    conn = connect()
    try:
        query_params = {
            "org_uid": sanitize_uid(org_uid),
            "start_date": start_date,
            "end_date": end_date,
        }
        return pd.read_sql(sql, conn, params=query_params)
    except (Exception, psycopg2.DatabaseError) as exc:
        LOGGER.error("There was a problem with your database query %s", exc)
    finally:
        if conn is not None:
            close(conn)
×
161

162

163
def query_domMasq(org_uid, start_date, end_date):
    """Read the domain masquerading table for one organization and date range.

    Selects every column of dnstwist_domain_masq for rows whose date_observed
    lies between start_date and end_date.  The org uid is passed through
    sanitize_uid and all values are sent as bound query parameters.  Returns
    a DataFrame, or None when the query fails (the error is logged).
    """
    sql = """SELECT * FROM dnstwist_domain_masq
        WHERE organizations_uid = %(org_uid)s
        AND date_observed BETWEEN %(start_date)s AND %(end_date)s"""
    conn = connect()
    try:
        query_params = {
            "org_uid": sanitize_uid(org_uid),
            "start_date": start_date,
            "end_date": end_date,
        }
        return pd.read_sql(sql, conn, params=query_params)
    except (Exception, psycopg2.DatabaseError) as exc:
        LOGGER.error("There was a problem with your database query %s", exc)
    finally:
        if conn is not None:
            close(conn)
×
185

186

187
# The 'table' parameter of query_shodan, query_darkweb and query_darkweb_cves
# names the specific table to read for that data source.  Reusing one function
# for several related tables keeps the code base small: callers pass only the
# table name to get the required information from the database.
193

194

195
def query_shodan(org_uid, start_date, end_date, table):
    """Query a Shodan table for one organization and date range.

    Args:
        org_uid: Organization uid; passed through sanitize_uid and sent as a
            bound parameter.
        start_date: Lower bound for the ``timestamp`` column.
        end_date: Upper bound for the ``timestamp`` column.
        table: Name of the table to read.  Must be a plain SQL identifier
            (letters, digits, underscores), optionally schema-qualified as
            ``schema.table``; anything else is rejected.

    Returns:
        A DataFrame with the matching rows, or None when the table name is
        rejected or the query fails (the error is logged).
    """
    conn = connect()
    try:
        # A table name cannot be sent as a bound value, so it is spliced into
        # the statement with AsIs.  Only plain identifiers are accepted so
        # the name cannot smuggle SQL into the query.
        identifier = r"[A-Za-z_][A-Za-z0-9_]*"
        if not isinstance(table, str) or not re.fullmatch(
            identifier + r"(\." + identifier + r")?", table
        ):
            raise ValueError("Invalid table name: %r" % (table,))
        sql = """SELECT * FROM %(table)s
        WHERE organizations_uid = %(org_uid)s
        AND timestamp BETWEEN %(start_date)s AND %(end_date)s"""
        df = pd.read_sql(
            sql,
            conn,
            params={
                "table": AsIs(table),
                "org_uid": sanitize_uid(org_uid),
                "start_date": start_date,
                "end_date": end_date,
            },
        )
        return df
    except (Exception, psycopg2.DatabaseError) as error:
        LOGGER.error("There was a problem with your database query %s", error)
    finally:
        if conn is not None:
            close(conn)
×
218

219

220
def query_darkweb(org_uid, start_date, end_date, table):
    """Query a Dark Web table for one organization and date range.

    Args:
        org_uid: Organization uid; passed through sanitize_uid and sent as a
            bound parameter.
        start_date: Lower bound for the ``date`` column.
        end_date: Upper bound for the ``date`` column.
        table: Name of the table to read.  Must be a plain SQL identifier
            (letters, digits, underscores), optionally schema-qualified as
            ``schema.table``; anything else is rejected.

    Returns:
        A DataFrame with the matching rows, or None when the table name is
        rejected or the query fails (the error is logged).
    """
    conn = connect()
    try:
        # Binding the table name as an ordinary string makes psycopg2 render
        # it as a quoted literal (FROM 'name'), which is not valid SQL, so the
        # name is validated as an identifier and spliced in with AsIs instead.
        identifier = r"[A-Za-z_][A-Za-z0-9_]*"
        if not isinstance(table, str) or not re.fullmatch(
            identifier + r"(\." + identifier + r")?", table
        ):
            raise ValueError("Invalid table name: %r" % (table,))
        sql = """SELECT * FROM %(table)s
        WHERE organizations_uid = %(org_uid)s
        AND date BETWEEN %(start_date)s AND %(end_date)s"""

        df = pd.read_sql(
            sql,
            conn,
            params={
                "table": AsIs(table),
                "org_uid": sanitize_uid(org_uid),
                "start_date": start_date,
                "end_date": end_date,
            },
        )
        return df
    except (Exception, psycopg2.DatabaseError) as error:
        LOGGER.error("There was a problem with your database query %s", error)
    finally:
        if conn is not None:
            close(conn)
×
244

245

246
def query_darkweb_cves(table):
    """Query every row of a Dark Web CVE table.

    Args:
        table: Name of the table to read.  Must be a plain SQL identifier
            (letters, digits, underscores), optionally schema-qualified as
            ``schema.table``; anything else is rejected.

    Returns:
        A DataFrame with the table's rows, or None when the table name is
        rejected or the query fails (the error is logged).
    """
    conn = connect()
    try:
        # A table name cannot be sent as a bound value, so it is spliced into
        # the statement with AsIs.  Only plain identifiers are accepted so
        # the name cannot smuggle SQL into the query.
        identifier = r"[A-Za-z_][A-Za-z0-9_]*"
        if not isinstance(table, str) or not re.fullmatch(
            identifier + r"(\." + identifier + r")?", table
        ):
            raise ValueError("Invalid table name: %r" % (table,))
        sql = """SELECT * FROM %(table)s"""
        df = pd.read_sql(
            sql,
            conn,
            params={"table": AsIs(table)},
        )
        return df
    except (Exception, psycopg2.DatabaseError) as error:
        LOGGER.error("There was a problem with your database query %s", error)
    finally:
        if conn is not None:
            close(conn)
×
262

263

264
def query_cyberSix_creds(org_uid, start_date, end_date):
    """Query the cybersix_exposed_credentials table and tidy the result.

    Args:
        org_uid: Organization uid; passed through sanitize_uid and sent as a
            bound parameter.
        start_date: Lower bound for the ``breach_date`` column.
        end_date: Upper bound for the ``breach_date`` column.

    Returns:
        A DataFrame of the matching rows with these adjustments, or None when
        the query or the post-processing fails (the error is logged):

        * ``breach_date_str``: ``breach_date`` formatted as MM/DD/YYYY.
        * ``breach_name``: missing or empty names are replaced by
          ``"Cyber_six_" + breach_date_str``.
        * ``description``: truncated at the first occurrence of
          "Query to find the related".
        * ``password_included``: True only when ``password`` is present and
          non-empty.
    """
    conn = connect()
    try:
        sql = """SELECT * FROM public.cybersix_exposed_credentials as creds
        WHERE organizations_uid = %(org_uid)s
        AND breach_date BETWEEN %(start)s AND %(end)s"""
        df = pd.read_sql(
            sql,
            conn,
            params={
                "org_uid": sanitize_uid(org_uid),
                "start": start_date,
                "end": end_date,
            },
        )
        df["breach_date_str"] = pd.to_datetime(df["breach_date"]).dt.strftime(
            "%m/%d/%Y"
        )
        # A NULL breach name needs the fallback name just like an empty one;
        # comparing with "" alone skips NULLs.
        unnamed = df["breach_name"].isna() | (df["breach_name"] == "")
        df.loc[unnamed, "breach_name"] = "Cyber_six_" + df["breach_date_str"]
        df["description"] = (
            df["description"].str.split("Query to find the related").str[0]
        )
        # NULL != "" evaluates to True, which used to flag rows without any
        # password as password_included; require a present, non-empty value.
        passwords = df["password"]
        df["password_included"] = passwords.notna() & (passwords != "")
        return df
    except (Exception, psycopg2.DatabaseError) as error:
        LOGGER.error("There was a problem with your database query %s", error)
    finally:
        if conn is not None:
            close(conn)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc