• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / pe-reports / 5603459872

pending completion
5603459872

Pull #640

github

web-flow
Merge 7f0e5d976 into 818ecce10
Pull Request #640: Replace Get_orgs

49 of 337 branches covered (14.54%)

Branch coverage included in aggregate %.

5 of 18 new or added lines in 1 file covered. (27.78%)

41 existing lines in 1 file now uncovered.

409 of 1376 relevant lines covered (29.72%)

1.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.31
/src/pe_reports/data/db_query.py
1
"""Query the PE PostgreSQL database."""
2

3
# Standard Python Libraries
4
import logging
5✔
5
import re
5✔
6
import sys
5✔
7

8
# Third-Party Libraries
9
import requests
5✔
10
import json
5✔
11
import numpy as np
5✔
12
import pandas as pd
5✔
13
import psycopg2
5✔
14
from psycopg2 import OperationalError
5✔
15
from psycopg2.extensions import AsIs
5✔
16

17
from .config import config
5✔
18

19
# Set up logging to the central file.
# The logger is obtained directly here rather than through
# app.config["LOGGER"], because that lookup causes a circular reference error.
LOGGER = logging.getLogger(__name__)

# Connection parameters returned by config() when called with its defaults.
CONN_PARAMS_DIC = config()
# API key and URL taken from the "peapi" section of the configuration.
PE_API_KEY = config(section="peapi").get("api_key")
PE_API_URL = config(section="peapi").get("api_url")
5✔
27

28
def sanitize_string(string):
    """Strip every character that is not a letter, digit or whitespace.

    Args:
        string: Text to clean.

    Returns:
        The input with all characters outside ``a-z``, ``A-Z``, ``0-9`` and
        whitespace removed.
    """
    disallowed = re.compile(r"[^a-zA-Z0-9\s]")
    return disallowed.sub("", string)
×
31

32

33
def sanitize_uid(string):
    """Strip every character that cannot appear in a uid.

    Args:
        string: Uid text to clean.

    Returns:
        The input with all characters other than letters, digits, hyphens
        and whitespace removed.
    """
    disallowed = re.compile(r"[^a-zA-Z0-9\-\s]")
    return disallowed.sub("", string)
×
36

37

38

39

40

41
def show_psycopg2_exception(err):
    """Log a PostgreSQL error together with the line number it came from.

    The line number is read from ``sys.exc_info()``, so it is only available
    while an exception is being handled (i.e. when this is called from
    inside an ``except`` block).

    Args:
        err: The exception (or message) to include in the log entry.
    """
    # Only the traceback element is needed.  It is named "tb" so it does not
    # shadow the standard-library "traceback" module name.
    tb = sys.exc_info()[2]
    # sys.exc_info() returns (None, None, None) when no exception is being
    # handled; use a placeholder instead of failing with AttributeError.
    line_number = tb.tb_lineno if tb is not None else "unknown"
    LOGGER.error(
        "Database connection error: %s on line number: %s", err, line_number
    )
47

48

49
def connect():
    """Open a PostgreSQL connection using ``CONN_PARAMS_DIC``.

    Returns:
        The psycopg2 connection, or None when psycopg2 raises
        ``OperationalError`` (the error is passed to
        ``show_psycopg2_exception`` for logging).
    """
    try:
        return psycopg2.connect(**CONN_PARAMS_DIC)
    except OperationalError as connection_error:
        show_psycopg2_exception(connection_error)
        return None
5✔
58

59

60
def close(conn):
    """Close the given PostgreSQL connection.

    Args:
        conn: Connection object whose ``close()`` method is called.
    """
    conn.close()
×
64

65
def get_orgs(timeout=30):
    """Request the organizations from the PE API.

    Sends a POST request to ``PE_API_URL`` with ``PE_API_KEY`` in the
    ``access_token`` header and decodes the JSON reply.

    Args:
        timeout: Seconds to wait for the API before giving up.  Without a
            timeout ``requests`` may block indefinitely on an unresponsive
            server.

    Returns:
        The decoded JSON body, or None when the request fails, the API
        answers with an HTTP error status, or the body is not valid JSON
        (the failure is logged).
    """
    headers = {
        "Content-Type": "application/json",
        "access_token": f"{PE_API_KEY}",
    }
    try:
        response = requests.post(PE_API_URL, headers=headers, timeout=timeout)
        # requests only raises HTTPError when asked to; without this call a
        # 4xx/5xx error body would be handed to the caller as if it were data.
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as err:
        # RequestException is the base class of HTTPError, ConnectionError
        # and Timeout, so one handler covers all of them.
        LOGGER.error("There was a problem with your PE API request %s", err)
    except ValueError as err:
        # JSON decoding errors are ValueError subclasses.
        LOGGER.error("The PE API response was not valid JSON %s", err)
    return None
×
84

85

86
def get_orgs_df():
    """Load every row of the organizations table into a dataframe.

    Returns:
        A pandas DataFrame holding the query result, or None when an
        exception occurred (the exception is logged).
    """
    conn = connect()
    query = """SELECT * FROM organizations"""
    try:
        return pd.read_sql(query, conn)
    except (Exception, psycopg2.DatabaseError) as db_error:
        LOGGER.error("There was a problem with your database query %s", db_error)
    finally:
        # Release the connection whether or not the query succeeded.
        if conn is not None:
            close(conn)
×
98

99

100
def query_creds_view(org_uid, start_date, end_date):
    """Read the vw_breachcomp view for one organization and date range.

    Args:
        org_uid: Organization uid; passed through ``sanitize_uid`` first.
        start_date: Lower bound for ``modified_date`` (inclusive, BETWEEN).
        end_date: Upper bound for ``modified_date`` (inclusive, BETWEEN).

    Returns:
        A pandas DataFrame with the matching rows, or None when an
        exception occurred (the exception is logged).
    """
    conn = connect()
    try:
        query = """SELECT * FROM vw_breachcomp
        WHERE organizations_uid = %(org_uid)s
        AND modified_date BETWEEN %(start_date)s AND %(end_date)s"""
        bind_values = {
            "org_uid": sanitize_uid(org_uid),
            "start_date": start_date,
            "end_date": end_date,
        }
        return pd.read_sql(query, conn, params=bind_values)
    except (Exception, psycopg2.DatabaseError) as db_error:
        LOGGER.error("There was a problem with your database query %s", db_error)
    finally:
        # Release the connection whether or not the query succeeded.
        if conn is not None:
            close(conn)
×
122

123

124
def query_credsbyday_view(org_uid, start_date, end_date):
    """Read the vw_breachcomp_credsbydate view for one organization.

    Args:
        org_uid: Organization uid; passed through ``sanitize_uid`` first.
        start_date: Lower bound for ``mod_date`` (inclusive, BETWEEN).
        end_date: Upper bound for ``mod_date`` (inclusive, BETWEEN).

    Returns:
        A pandas DataFrame with the ``mod_date``, ``no_password`` and
        ``password_included`` columns, or None when an exception occurred
        (the exception is logged).
    """
    conn = connect()
    try:
        query = """SELECT mod_date, no_password, password_included FROM vw_breachcomp_credsbydate
        WHERE organizations_uid = %(org_uid)s
        AND mod_date BETWEEN %(start_date)s AND %(end_date)s"""
        bind_values = {
            "org_uid": sanitize_uid(org_uid),
            "start_date": start_date,
            "end_date": end_date,
        }
        return pd.read_sql(query, conn, params=bind_values)
    except (Exception, psycopg2.DatabaseError) as db_error:
        LOGGER.error("There was a problem with your database query %s", db_error)
    finally:
        # Release the connection whether or not the query succeeded.
        if conn is not None:
            close(conn)
×
146

147

148
def query_breachdetails_view(org_uid, start_date, end_date):
    """Read the vw_breachcomp_breachdetails view for one organization.

    Args:
        org_uid: Organization uid; passed through ``sanitize_uid`` first.
        start_date: Lower bound for ``mod_date`` (inclusive, BETWEEN).
        end_date: Upper bound for ``mod_date`` (inclusive, BETWEEN).

    Returns:
        A pandas DataFrame with the selected breach columns (``mod_date``
        is returned under the alias ``modified_date``), or None when an
        exception occurred (the exception is logged).
    """
    conn = connect()
    try:
        query = """SELECT breach_name, mod_date modified_date, breach_date, password_included, number_of_creds
        FROM vw_breachcomp_breachdetails
        WHERE organizations_uid = %(org_uid)s
        AND mod_date BETWEEN %(start_date)s AND %(end_date)s"""
        bind_values = {
            "org_uid": sanitize_uid(org_uid),
            "start_date": start_date,
            "end_date": end_date,
        }
        return pd.read_sql(query, conn, params=bind_values)
    except (Exception, psycopg2.DatabaseError) as db_error:
        LOGGER.error("There was a problem with your database query %s", db_error)
    finally:
        # Release the connection whether or not the query succeeded.
        if conn is not None:
            close(conn)
×
171

172

173
def query_domMasq(org_uid, start_date, end_date):
    """Read the dnstwist_domain_masq table for one organization.

    Args:
        org_uid: Organization uid; passed through ``sanitize_uid`` first.
        start_date: Lower bound for ``date_observed`` (inclusive, BETWEEN).
        end_date: Upper bound for ``date_observed`` (inclusive, BETWEEN).

    Returns:
        A pandas DataFrame with the matching rows, or None when an
        exception occurred (the exception is logged).
    """
    conn = connect()
    try:
        query = """SELECT * FROM dnstwist_domain_masq
        WHERE organizations_uid = %(org_uid)s
        AND date_observed BETWEEN %(start_date)s AND %(end_date)s"""
        bind_values = {
            "org_uid": sanitize_uid(org_uid),
            "start_date": start_date,
            "end_date": end_date,
        }
        return pd.read_sql(query, conn, params=bind_values)
    except (Exception, psycopg2.DatabaseError) as db_error:
        LOGGER.error("There was a problem with your database query %s", db_error)
    finally:
        # Release the connection whether or not the query succeeded.
        if conn is not None:
            close(conn)
×
195

196

197
# The 'table' parameter is used by the query_shodan, query_darkweb and
# query_darkweb_cves functions to select the specific table that relates to
# the function name.  Passing the table as a parameter lets one function be
# reused for several tables, which reduces the size of the code base.
203

204

205
def query_shodan(org_uid, start_date, end_date, table):
    """Read one of the Shodan tables for one organization and date range.

    Args:
        org_uid: Organization uid; passed through ``sanitize_uid`` first.
        start_date: Lower bound for ``timestamp`` (inclusive, BETWEEN).
        end_date: Upper bound for ``timestamp`` (inclusive, BETWEEN).
        table: Name of the table to read.  It is wrapped in ``AsIs`` and
            therefore inserted into the statement without quoting.

    Returns:
        A pandas DataFrame with the matching rows, or None when an
        exception occurred (the exception is logged).
    """
    conn = connect()
    try:
        query = """SELECT * FROM %(table)s
        WHERE organizations_uid = %(org_uid)s
        AND timestamp BETWEEN %(start_date)s AND %(end_date)s"""
        bind_values = {
            "table": AsIs(table),
            "org_uid": sanitize_uid(org_uid),
            "start_date": start_date,
            "end_date": end_date,
        }
        return pd.read_sql(query, conn, params=bind_values)
    except (Exception, psycopg2.DatabaseError) as db_error:
        LOGGER.error("There was a problem with your database query %s", db_error)
    finally:
        # Release the connection whether or not the query succeeded.
        if conn is not None:
            close(conn)
×
228

229

230
def query_darkweb(org_uid, start_date, end_date, table):
    """Read one of the Dark Web tables for one organization and date range.

    Args:
        org_uid: Organization uid; passed through ``sanitize_uid`` first.
        start_date: Lower bound for ``date`` (inclusive, BETWEEN).
        end_date: Upper bound for ``date`` (inclusive, BETWEEN).
        table: Name of the table to read, optionally schema-qualified
            (``schema.table``).  Only letters, digits and underscores are
            accepted in each part.

    Returns:
        A pandas DataFrame with the matching rows, or None when the table
        name is rejected or an exception occurred (both are logged).
    """
    # A table name is an identifier, so it cannot be bound like a value: the
    # driver would render it as a quoted string literal and the statement
    # would be invalid.  It is validated here and then inserted verbatim
    # with AsIs, as query_shodan and query_darkweb_cves do.
    identifier = r"[A-Za-z_][A-Za-z0-9_]*"
    if not isinstance(table, str) or not re.fullmatch(
        rf"{identifier}(\.{identifier})?", table
    ):
        LOGGER.error(
            "There was a problem with your database query %s",
            f"invalid table name: {table!r}",
        )
        return None

    conn = connect()
    try:
        sql = """SELECT * FROM %(table)s
        WHERE organizations_uid = %(org_uid)s
        AND date BETWEEN %(start_date)s AND %(end_date)s"""

        df = pd.read_sql(
            sql,
            conn,
            params={
                "table": AsIs(table),
                "org_uid": sanitize_uid(org_uid),
                "start_date": start_date,
                "end_date": end_date,
            },
        )
        return df
    except (Exception, psycopg2.DatabaseError) as error:
        LOGGER.error("There was a problem with your database query %s", error)
    finally:
        # Release the connection whether or not the query succeeded.
        if conn is not None:
            close(conn)
×
254

255

256
def query_darkweb_cves(table):
    """Read every row of a Dark Web CVE table.

    Args:
        table: Name of the table to read.  It is wrapped in ``AsIs`` and
            therefore inserted into the statement without quoting.

    Returns:
        A pandas DataFrame with the table contents, or None when an
        exception occurred (the exception is logged).
    """
    conn = connect()
    try:
        query = """SELECT * FROM %(table)s"""
        bind_values = {"table": AsIs(table)}
        return pd.read_sql(query, conn, params=bind_values)
    except (Exception, psycopg2.DatabaseError) as db_error:
        LOGGER.error("There was a problem with your database query %s", db_error)
    finally:
        # Release the connection whether or not the query succeeded.
        if conn is not None:
            close(conn)
×
272

273

274
def query_cyberSix_creds(org_uid, start_date, end_date):
    """Read cybersix_exposed_credentials rows and derive reporting columns.

    Args:
        org_uid: Organization uid; passed through ``sanitize_uid`` first.
        start_date: Lower bound for ``breach_date`` (inclusive, BETWEEN).
        end_date: Upper bound for ``breach_date`` (inclusive, BETWEEN).

    Returns:
        A pandas DataFrame with the matching rows plus the derived
        ``breach_date_str`` and ``password_included`` columns, or None when
        an exception occurred (the exception is logged).
    """
    conn = connect()
    try:
        query = """SELECT * FROM public.cybersix_exposed_credentials as creds
        WHERE organizations_uid = %(org_uid)s
        AND breach_date BETWEEN %(start)s AND %(end)s"""
        bind_values = {
            "org_uid": sanitize_uid(org_uid),
            "start": start_date,
            "end": end_date,
        }
        creds_df = pd.read_sql(query, conn, params=bind_values)

        # Breach date rendered as MM/DD/YYYY text.
        breach_dates = pd.to_datetime(creds_df["breach_date"])
        creds_df["breach_date_str"] = breach_dates.dt.strftime("%m/%d/%Y")

        # Rows with an empty breach name get one built from the breach date.
        unnamed = creds_df["breach_name"] == ""
        creds_df.loc[unnamed, "breach_name"] = (
            "Cyber_six_" + creds_df["breach_date_str"]
        )

        # Keep only the description text before the "Query to find the
        # related" marker.
        description_parts = creds_df["description"].str.split(
            "Query to find the related"
        )
        creds_df["description"] = description_parts.str[0]

        # Flag rows whose password field is not the empty string.
        creds_df["password_included"] = np.where(
            creds_df["password"] != "", True, False
        )
        return creds_df
    except (Exception, psycopg2.DatabaseError) as db_error:
        LOGGER.error("There was a problem with your database query %s", db_error)
    finally:
        # Release the connection whether or not the query succeeded.
        if conn is not None:
            close(conn)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc