• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / pe-reports / 5268386652

pending completion
5268386652

Pull #565

github

web-flow
Merge 40eba2026 into 14755187f
Pull Request #565: Update report generator to use reportlab

79 of 415 branches covered (19.04%)

Branch coverage included in aggregate %.

404 of 676 new or added lines in 7 files covered. (59.76%)

16 existing lines in 5 files now uncovered.

748 of 1804 relevant lines covered (41.46%)

2.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.7
/src/pe_reports/metrics.py
1
"""Class methods for report metrics."""
2

3
# Import query functions
4
# Standard Python Libraries
5
import datetime
5✔
6

7
# Third-Party Libraries
8
import pandas as pd
5✔
9

10
from .data.db_query import (
5✔
11
    query_breachdetails_view,
12
    query_creds_view,
13
    query_credsbyday_view,
14
    query_darkweb,
15
    query_darkweb_cves,
16
    query_domMasq,
17
    query_domMasq_alerts,
18
    query_shodan,
19
)
20

21

22
class Credentials:
    """Credential-exposure metrics for a single organization.

    Runs the credential queries for the reporting window on construction
    and exposes helpers that shape the results for the report.
    """

    def __init__(self, trending_start_date, start_date, end_date, org_uid):
        """Store the reporting window and run the credential queries."""
        self.trending_start_date = trending_start_date
        self.start_date = start_date
        self.end_date = end_date
        self.org_uid = org_uid
        # The trending view covers a longer window than the report proper.
        self.trending_creds_view = query_creds_view(
            org_uid, trending_start_date, end_date
        )
        self.creds_view = query_creds_view(org_uid, start_date, end_date)
        self.creds_by_day = query_credsbyday_view(
            org_uid, trending_start_date, end_date
        )
        self.breach_details_view = query_breachdetails_view(
            org_uid, start_date, end_date
        )

    def by_week(self):
        """Return credential counts aggregated into 7-day buckets."""
        full_range = pd.date_range(self.trending_start_date, self.end_date)
        # Reindex over every day in the window so weeks with no data
        # still contribute zero rows to their bucket.
        daily = (
            self.creds_by_day.set_index("mod_date")
            .reindex(full_range)
            .fillna(0.0)
            .rename_axis("added_date")
        )
        # Anchor the 7-day buckets to the day after the report end date so
        # the final bucket ends exactly on end_date.
        anchor = self.end_date + datetime.timedelta(1)
        weekly = daily.groupby(
            pd.Grouper(level="added_date", freq="7d", origin=anchor)
        ).sum()
        weekly["modified_date"] = weekly.index
        weekly["modified_date"] = weekly["modified_date"].dt.strftime("%b %d")
        weekly = weekly.set_index("modified_date").rename(
            columns={
                "password_included": "Passwords Included",
                "no_password": "No Password",
            }
        )
        # Guarantee at least one column so downstream charting has data.
        if weekly.columns.empty:
            weekly["Passwords Included"] = 0
        return weekly

    def breaches(self):
        """Return the number of distinct breaches observed."""
        return self.creds_view["breach_name"].nunique()

    def breach_appendix(self):
        """Return breach names and descriptions for the report appendix."""
        appendix = self.creds_view[["breach_name", "description"]].drop_duplicates()
        appendix = appendix.sort_values("breach_name")
        return appendix[["breach_name", "description"]]

    def breach_details(self):
        """Return per-breach details with report-friendly column names."""
        details = self.breach_details_view.rename(
            columns={"modified_date": "update_date"}
        )
        details["update_date"] = pd.to_datetime(details["update_date"])
        if not details.empty:
            # Only format when rows exist; keeps dtype handling simple for
            # the empty-frame case.
            details["update_date"] = details["update_date"].dt.strftime("%m/%d/%y")
            details["breach_date"] = pd.to_datetime(
                details["breach_date"]
            ).dt.strftime("%m/%d/%y")

        return details.rename(
            columns={
                "breach_name": "Breach Name",
                "breach_date": "Breach Date",
                "update_date": "Date Reported",
                "password_included": "Password Included",
                "number_of_creds": "Number of Creds",
            }
        )

    def password(self):
        """Return the number of credentials that included a password."""
        return len(self.creds_view[self.creds_view["password_included"]])

    def total(self):
        """Return the total number of credentials found in breaches."""
        return len(self.creds_view.index)
109

110

111
class Domains_Masqs:
    """Domain-masquerading metrics for a single organization.

    Queries suspected domain permutations and their alerts for the
    reporting window.
    """

    def __init__(self, start_date, end_date, org_uid):
        """Store the reporting window and run the domain-masquerading queries."""
        self.start_date = start_date
        self.end_date = end_date
        self.org_uid = org_uid
        df = query_domMasq(org_uid, start_date, end_date)
        # Copy the malicious subset so later column edits operate on an
        # owned frame, not a view of the query result.
        self.df_mal = df[df["malicious"]].copy()
        self.dom_alerts_df = query_domMasq_alerts(org_uid, start_date, end_date)

    def count(self):
        """Return total count of malicious domains."""
        return len(self.df_mal.index)

    def summary(self):
        """Return domain masquerading summary information."""
        if len(self.df_mal) > 0:
            # BUG FIX: the original assigned through .loc into a chained
            # column slice of df_mal (SettingWithCopyWarning; silently
            # ineffective under pandas copy-on-write). Copy first.
            domain_sum = self.df_mal[
                [
                    "domain_permutation",
                    "ipv4",
                    "ipv6",
                    "mail_server",
                    "name_server",
                ]
            ].copy()
            # Blank IPv6 values render poorly in the report table.
            domain_sum.loc[domain_sum["ipv6"] == "", "ipv6"] = "NA"
            domain_sum = domain_sum.rename(
                columns={
                    "domain_permutation": "Domain",
                    "ipv4": "IPv4",
                    "ipv6": "IPv6",
                    "mail_server": "Mail Server",
                    "name_server": "Name Server",
                }
            )
        else:
            # No malicious domains: return an empty frame with the
            # columns the report template expects.
            domain_sum = pd.DataFrame(
                columns=[
                    "Domain",
                    "IPv4",
                    "IPv6",
                    "Mail Server",
                    "Name Server",
                ]
            )
        return domain_sum

    def alert_count(self):
        """Return number of domain-masquerading alerts."""
        return len(self.dom_alerts_df)

    def alerts(self):
        """Return domain alerts with report-friendly column names."""
        dom_alerts_df = self.dom_alerts_df[["message", "date"]]
        dom_alerts_df = dom_alerts_df.rename(
            columns={"message": "Alert", "date": "Date"}
        )
        return dom_alerts_df

    def alerts_sum(self):
        """Return domain alerts with previous/new values for the summary."""
        dom_alerts_sum = self.dom_alerts_df[
            ["message", "date", "previous_value", "new_value"]
        ]
        return dom_alerts_sum
181

182

183
class Malware_Vulns:
    """Shodan-derived malware and vulnerability metrics for one organization."""

    def __init__(self, start_date, end_date, org_uid):
        """Store the reporting window and run the Shodan queries."""
        self.start_date = start_date
        self.end_date = end_date
        self.org_uid = org_uid
        # Suspected (unverified) vulnerabilities and insecure protocols.
        insecure_df = query_shodan(
            org_uid,
            start_date,
            end_date,
            "vw_shodanvulns_suspected",
        )
        self.insecure_df = insecure_df

        # Verified vulnerabilities; ports normalized to strings so they
        # can be joined into display text later.
        vulns_df = query_shodan(
            org_uid, start_date, end_date, "vw_shodanvulns_verified"
        )
        vulns_df["port"] = vulns_df["port"].astype(str)
        self.vulns_df = vulns_df

        assets_df = query_shodan(org_uid, start_date, end_date, "shodan_assets")
        self.assets_df = assets_df

    @staticmethod
    def isolate_risky_assets(df):
        """Return deduplicated (protocol, ip, port) rows for risky insecure protocols.

        HTTP and SMTP are excluded: they are expected to run in plaintext.
        """
        insecure = df[df["type"] == "Insecure Protocol"]
        # Copy so the astype assignment below does not mutate a view of df.
        insecure = insecure[
            (insecure["protocol"] != "http") & (insecure["protocol"] != "smtp")
        ].copy()
        insecure["port"] = insecure["port"].astype(str)
        return insecure[["protocol", "ip", "port"]].drop_duplicates(keep="first")

    def insecure_protocols(self):
        """Get risky assets grouped by protocol."""
        risky_assets = self.isolate_risky_assets(self.insecure_df)
        risky_assets = (
            risky_assets.groupby("protocol")
            .agg(lambda x: "  ".join(set(x)))
            .reset_index()
        )
        # Limit the IP column to 32 characters so the table isn't too big.
        # 30 characters is the max length of 2 IPs, plus the 2 spaces.
        if len(risky_assets.index) > 0:
            risky_assets["ip"] = risky_assets["ip"].str[:32]
            risky_assets.loc[risky_assets["ip"].str.len() == 32, "ip"] = (
                risky_assets["ip"] + "  ..."
            )

        return risky_assets

    def protocol_count(self):
        """Return a count of assets for each insecure protocol."""
        risky_assets = self.isolate_risky_assets(self.insecure_df)
        # Horizontal bar: insecure protocol count.
        # BUG FIX: agg({"id_count": "count"}) used the nested-renamer form
        # removed in pandas 1.0; size() yields the same table.
        pro_count = (
            risky_assets.groupby("protocol").size().reset_index(name="id_count")
        )
        return pro_count

    def risky_ports_count(self):
        """Return the total count of open ports running insecure protocols."""
        risky_assets = self.isolate_risky_assets(self.insecure_df)
        # Each deduplicated row is one open (protocol, ip, port), so the
        # total is simply the row count (sum of per-protocol counts).
        return len(risky_assets.index)

    def total_verif_vulns(self):
        """Return the total count of verified vulnerability instances."""
        vulns_df = self.vulns_df
        verif_vulns = (
            vulns_df[["cve", "ip", "port"]]
            .groupby("cve")
            .agg(lambda x: "  ".join(set(x)))
            .reset_index()
        )

        if len(verif_vulns) > 0:
            # Each CVE row holds a double-space-separated set of IPs;
            # count IPs per CVE and total them.
            verif_vulns["count"] = verif_vulns["ip"].str.split("  ").str.len()
            verifVulns = verif_vulns["count"].sum()
        else:
            verifVulns = 0

        return verifVulns

    def ip_count(self):
        """Return the number of unique IPs with suspected or verified vulns."""
        # BUG FIX: Series.append was removed in pandas 2.0; use concat.
        combined_ips = pd.concat(
            [self.vulns_df["ip"], self.insecure_df["ip"]], ignore_index=True
        )
        return len(pd.unique(combined_ips))

    @staticmethod
    def unverified_cve(df):
        """Subset the suspected-vulns frame to potential vulnerabilities.

        Returns one row per unique (potential_vulns, ip) pair, with the
        split vulnerability list and its length.
        """
        unverif_df = df[df["type"] != "Insecure Protocol"].copy()
        # Sort each row's vulnerability list so identical sets compare
        # equal after the string conversion below. (The original also
        # called .sort_values() on the series, a no-op under the
        # index-aligned assignment.)
        unverif_df["potential_vulns"] = unverif_df["potential_vulns"].apply(
            lambda x: sorted(x)
        )
        unverif_df["potential_vulns"] = unverif_df["potential_vulns"].astype("str")
        unverif_df = (
            unverif_df[["potential_vulns", "ip"]]
            .drop_duplicates(keep="first")
            .reset_index(drop=True)
        )
        unverif_df["potential_vulns_list"] = unverif_df["potential_vulns"].str.split(
            ","
        )
        unverif_df["count"] = unverif_df["potential_vulns_list"].str.len()
        return unverif_df

    def unverified_cve_count(self):
        """Return the top 15 IPs by number of potential vulnerabilities."""
        unverif_df = self.unverified_cve(self.insecure_df)
        unverif_df = unverif_df[["ip", "count"]]
        unverif_df = unverif_df.sort_values(by=["count"], ascending=False)
        unverif_df = unverif_df[:15].reset_index(drop=True)
        return unverif_df

    def all_cves(self):
        """Get all verified and unverified CVE ids as a deduplicated list."""
        unverif_df = self.unverified_cve(self.insecure_df)
        verified_cves = self.vulns_df["cve"].tolist()
        all_cves = []
        for _, unverif_row in unverif_df.iterrows():
            for cve in unverif_row["potential_vulns_list"]:
                # Entries come from a stringified Python list; strip the
                # leftover brackets, quotes and spaces.
                all_cves.append(cve.strip("[]' "))
        all_cves += verified_cves
        # Deduplicate; resulting order is unspecified.
        return list(set(all_cves))

    def unverified_vuln_count(self):
        """Return the count of IP addresses with unverified vulnerabilities."""
        # The original duplicated unverified_cve()'s subsetting inline;
        # reuse the helper so both stay consistent.
        return len(self.unverified_cve(self.insecure_df).index)

    def verif_vulns(self):
        """Return each CVE with its associated IPs and affected ports."""
        vulns_df = self.vulns_df
        verif_vulns = (
            vulns_df[["cve", "ip", "port"]]
            .groupby("cve")
            .agg(lambda x: "  ".join(set(x)))
            .reset_index()
        )
        return verif_vulns

    def verif_vulns_summary(self):
        """Return summary dataframe for verified vulns."""
        vulns_df = self.vulns_df
        verif_vulns_summary = (
            vulns_df[["cve", "ip", "port", "summary"]]
            .groupby("cve")
            .agg(lambda x: "  ".join(set(x)))
            .reset_index()
        )

        verif_vulns_summary = verif_vulns_summary.rename(
            columns={
                "cve": "CVE",
                "ip": "IP",
                "port": "Port",
                "summary": "Summary",
            }
        )
        return verif_vulns_summary
373

374

375
class Cyber_Six:
    """Dark web and Cybersixgill data class.

    Runs the dark-web mention/alert queries for the reporting window on
    construction; each method returns one report-ready table.
    """

    def __init__(
        self,
        trending_start_date,
        start_date,
        end_date,
        org_uid,
        all_cves_df,
        soc_med_included,
    ):
        """Initialize Cybersixgill vulns and malware class.

        ``all_cves_df`` is the CVE collection used by top_cve_table to tag
        CVEs also seen in Shodan (membership is tested with ``in``).
        ``soc_med_included`` toggles whether social-media sites are kept.
        """
        self.trending_start_date = trending_start_date
        self.start_date = start_date
        self.end_date = end_date
        self.org_uid = org_uid
        self.all_cves_df = all_cves_df
        self.soc_med_included = soc_med_included
        # Site names (both capitalizations as stored upstream) filtered
        # out when social media is excluded from the report.
        self.soc_med_platforms = [
            "twitter",
            "Twitter",
            "reddit",
            "Reddit",
            "Parler",
            "parler",
            "linkedin",
            "Linkedin",
            "discord",
            "forum_discord",
            "raddle",
            "telegram",
            "jabber",
            "ICQ",
            "icq",
            "mastodon",
        ]
        dark_web_mentions = query_darkweb(
            org_uid,
            start_date,
            end_date,
            "mentions",
        )
        # Internal UID columns are not report content; drop if present.
        dark_web_mentions = dark_web_mentions.drop(
            columns=["organizations_uid", "mentions_uid"],
            errors="ignore",
        )
        if not soc_med_included:
            dark_web_mentions = dark_web_mentions[
                ~dark_web_mentions["site"].isin(self.soc_med_platforms)
            ]
        self.dark_web_mentions = dark_web_mentions

        alerts = query_darkweb(
            org_uid,
            start_date,
            end_date,
            "alerts",
        )
        alerts = alerts.drop(
            columns=["organizations_uid", "alerts_uid"],
            errors="ignore",
        )
        if not soc_med_included:
            alerts = alerts[~alerts["site"].isin(self.soc_med_platforms)]
        self.alerts = alerts

        top_cves = query_darkweb_cves(
            "top_cves",
        )
        # Keep only the most recent snapshot of the top-CVE table.
        top_cves = top_cves[top_cves["date"] == top_cves["date"].max()]
        self.top_cves = top_cves

    def dark_web_count(self):
        """Get total number of dark web alerts."""
        return len(self.alerts.index)

    def alerts_exec(self):
        """Get top executive alerts."""
        alerts_exec = query_darkweb(
            self.org_uid,
            self.start_date,
            self.end_date,
            "vw_darkweb_execalerts",
        )

        alerts_exec = alerts_exec.drop(
            columns=["organizations_uid", "date"],
            errors="ignore",
        )
        if not self.soc_med_included:
            alerts_exec = alerts_exec[~alerts_exec["Site"].isin(self.soc_med_platforms)]
        # Truncate titles so the report table doesn't overflow.
        alerts_exec["Title"] = alerts_exec["Title"].str[:200]
        return alerts_exec

    def alerts_threats(self):
        """Get threat alerts (top 10 site/threat pairs by event count)."""
        alerts_threats = query_darkweb(
            self.org_uid,
            self.start_date,
            self.end_date,
            "vw_darkweb_potentialthreats",
        )
        alerts_threats = alerts_threats.drop(
            columns=["organizations_uid", "date"],
            errors="ignore",
        )
        if not self.soc_med_included:
            alerts_threats = alerts_threats[
                ~alerts_threats["Site"].isin(self.soc_med_platforms)
            ]
        alerts_threats = (
            alerts_threats.groupby(["Site", "Threats"])["Threats"]
            .count()
            .nlargest(10)
            .reset_index(name="Events")
        )
        # Truncate threat text so the report table doesn't overflow.
        alerts_threats["Threats"] = alerts_threats["Threats"].str[:200]
        return alerts_threats

    def dark_web_mentions_count(self):
        """Get total number of dark web mentions."""
        return len(self.dark_web_mentions)

    def dark_web_content(self):
        """Get dark web mention categories (top 10 by count)."""
        dark_web_mentions = self.dark_web_mentions
        dark_web_content = dark_web_mentions[["category"]]
        dark_web_content = (
            dark_web_content.groupby(["category"])["category"]
            .count()
            .nlargest(10)
            .reset_index(name="count")
        )
        return dark_web_content

    def dark_web_date(self):
        """Get dark web mentions bucketed into 7-day periods."""
        trending_dark_web_mentions = query_darkweb(
            self.org_uid,
            self.trending_start_date,
            self.end_date,
            "vw_darkweb_mentionsbydate",
        )
        dark_web_date = trending_dark_web_mentions.drop(
            columns=["organizations_uid"],
            errors="ignore",
        )
        # Reindex over every day so empty days count as zero.
        idx = pd.date_range(self.trending_start_date, self.end_date)
        dark_web_date = (
            dark_web_date.set_index("date").reindex(idx).fillna(0.0).rename_axis("date")
        )
        # Anchor the weekly buckets to the day after the report end date,
        # mirroring Credentials.by_week().
        group_limit = self.end_date + datetime.timedelta(1)
        dark_web_date = dark_web_date.groupby(
            pd.Grouper(level="date", freq="7d", origin=group_limit)
        ).sum()
        dark_web_date["date"] = dark_web_date.index
        dark_web_date["date"] = dark_web_date["date"].dt.strftime("%m/%d")
        dark_web_date = dark_web_date.set_index("date")
        dark_web_date = dark_web_date[["Count"]]
        return dark_web_date

    def social_media_most_act(self):
        """Get the 10 most active social media posts."""
        soc_med_most_act = query_darkweb(
            self.org_uid,
            self.start_date,
            self.end_date,
            "vw_darkweb_socmedia_mostactposts",
        )
        soc_med_most_act = soc_med_most_act.drop(
            columns=["organizations_uid", "date"],
            errors="ignore",
        )
        # Top 10 rows; assumes the view is already ordered by activity —
        # TODO confirm against the view definition.
        soc_med_most_act = soc_med_most_act[:10]
        soc_med_most_act["Title"] = soc_med_most_act["Title"].str[:200]
        # Blank/whitespace-only cells render as "Untitled".
        soc_med_most_act = soc_med_most_act.replace(r"^\s*$", "Untitled", regex=True)
        return soc_med_most_act

    def dark_web_most_act(self):
        """Get the 10 most active dark web posts."""
        dark_web_most_act = query_darkweb(
            self.org_uid,
            self.start_date,
            self.end_date,
            "vw_darkweb_mostactposts",
        )
        dark_web_most_act = dark_web_most_act.drop(
            columns=["organizations_uid", "date"],
            errors="ignore",
        )
        # Top 10 rows; assumes the view is already ordered by activity —
        # TODO confirm against the view definition.
        dark_web_most_act = dark_web_most_act[:10]
        dark_web_most_act["Title"] = dark_web_most_act["Title"].str[:200]
        # Blank/whitespace-only cells render as "Untitled".
        dark_web_most_act = dark_web_most_act.replace(r"^\s*$", "Untitled", regex=True)
        return dark_web_most_act

    def asset_alerts(self):
        """Get top asset alerts (from vw_darkweb_assetalerts)."""
        asset_alerts = query_darkweb(
            self.org_uid,
            self.start_date,
            self.end_date,
            "vw_darkweb_assetalerts",
        )
        asset_alerts = asset_alerts.drop(
            columns=["organizations_uid", "date"],
            errors="ignore",
        )
        if not self.soc_med_included:
            asset_alerts = asset_alerts[
                ~asset_alerts["Site"].isin(self.soc_med_platforms)
            ]
        # Truncate titles so the report table doesn't overflow.
        asset_alerts["Title"] = asset_alerts["Title"].str[:200]
        return asset_alerts

    def dark_web_bad_actors(self):
        """Get the top 10 dark web bad actors by grade."""
        dark_web_bad_actors = query_darkweb(
            self.org_uid,
            self.start_date,
            self.end_date,
            "vw_darkweb_threatactors",
        )
        dark_web_bad_actors = dark_web_bad_actors.drop(
            columns=["organizations_uid", "date"],
            errors="ignore",
        )
        # One row per creator, keeping each column's max value.
        dark_web_bad_actors = dark_web_bad_actors.groupby(
            "Creator", as_index=False
        ).max()
        dark_web_bad_actors = dark_web_bad_actors.sort_values(
            by=["Grade"], ascending=False
        )
        dark_web_bad_actors = dark_web_bad_actors[:10]

        return dark_web_bad_actors

    def dark_web_sites(self):
        """Get mentions by dark web sites (top 10)."""
        dark_web_sites = query_darkweb(
            self.org_uid,
            self.start_date,
            self.end_date,
            "vw_darkweb_sites",
        )
        dark_web_sites = dark_web_sites.drop(
            columns=["organizations_uid", "date"],
            errors="ignore",
        )
        if not self.soc_med_included:
            dark_web_sites = dark_web_sites[
                ~dark_web_sites["Site"].isin(self.soc_med_platforms)
            ]
        dark_web_sites = (
            dark_web_sites.groupby(["Site"])["Site"]
            .count()
            .nlargest(10)
            .reset_index(name="count")
        )
        return dark_web_sites

    def invite_only_markets(self):
        """Get alerts in invite-only markets (top 10 sites by alert count)."""
        markets = query_darkweb(
            self.org_uid,
            self.start_date,
            self.end_date,
            "vw_darkweb_inviteonlymarkets",
        )
        markets = markets.drop(
            columns=["organizations_uid", "date"],
            errors="ignore",
        )
        markets = (
            markets.groupby(["Site"])["Site"]
            .count()
            .nlargest(10)
            .reset_index(name="Alerts")
        )
        return markets

    def top_cve_table(self):
        """Get top CVEs with the source(s) that identified each."""
        top_cves = self.top_cves
        # NOTE(review): this adds summary_short to self.top_cves in place
        # (and may warn, since top_cves was sliced from the query result);
        # harmless if the table is built once per run — confirm.
        top_cves["summary_short"] = top_cves["summary"].str[:500]
        top_cve_table = top_cves[["cve_id", "summary_short"]]
        top_cve_table = top_cve_table.rename(
            columns={"cve_id": "CVE", "summary_short": "Description"}
        )
        top_cve_table["Identified By"] = "Cybersixgill"

        # Get all CVEs found in shodan
        shodan_cves = self.all_cves_df
        for cve_index, cve_row in top_cve_table.iterrows():
            # Tag CVEs that Shodan also identified.
            if cve_row["CVE"] in shodan_cves:
                top_cve_table.at[cve_index, "Identified By"] += ",   Shodan"

        return top_cve_table
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc