• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / pe-reports / 5892227966

17 Aug 2023 02:27PM UTC coverage: 33.736% (+7.0%) from 26.737%
5892227966

Pull #565

github

web-flow
Merge 9adf19bbe into 998fa208f
Pull Request #565: Update report generator to use reportlab

93 of 477 branches covered (19.5%)

Branch coverage included in aggregate %.

443 of 1022 new or added lines in 8 files covered. (43.35%)

18 existing lines in 5 files now uncovered.

801 of 2173 relevant lines covered (36.86%)

1.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.65
/src/pe_reports/metrics.py
1
"""Class methods for report metrics."""
2

3
# Import query functions
4
# Standard Python Libraries
5
import datetime
5✔
6

7
# Third-Party Libraries
8
import pandas as pd
5✔
9

10
from .data.db_query import (
5✔
11
    query_breachdetails_view,
12
    query_creds_view,
13
    query_credsbyday_view,
14
    query_darkweb,
15
    query_darkweb_cves,
16
    query_domMasq,
17
    query_domMasq_alerts,
18
    query_shodan,
19
)
20

21

22
class Credentials:
    """Gather and summarize credential-exposure data for one organization.

    Pulls credential and breach views from the PE database for the given
    date windows and exposes the summary metrics used by the report
    generator.
    """

    def __init__(self, trending_start_date, start_date, end_date, org_uid):
        """Initialize credentials class.

        Args:
            trending_start_date: start of the longer trending window.
            start_date: start of the reporting period.
            end_date: end of the reporting period.
            org_uid: organization UID to query for.
        """
        self.trending_start_date = trending_start_date
        self.start_date = start_date
        self.end_date = end_date
        self.org_uid = org_uid
        # Credentials over the longer trending window (for charts).
        self.trending_creds_view = query_creds_view(
            org_uid, trending_start_date, end_date
        )
        # Credentials over the reporting period itself.
        self.creds_view = query_creds_view(org_uid, start_date, end_date)
        self.creds_by_day = query_credsbyday_view(
            org_uid, trending_start_date, end_date
        )
        self.breach_details_view = query_breachdetails_view(
            org_uid, start_date, end_date
        )

    def by_week(self):
        """Return number of credentials grouped into 7-day buckets."""
        df = self.creds_by_day
        # Reindex over every calendar day so weeks with no data count as 0.
        idx = pd.date_range(self.trending_start_date, self.end_date)
        df = df.set_index("mod_date").reindex(idx).fillna(0.0).rename_axis("added_date")
        # Anchor the 7-day buckets so the final bucket ends on end_date.
        group_limit = self.end_date + datetime.timedelta(1)
        df = df.groupby(
            pd.Grouper(level="added_date", freq="7d", origin=group_limit)
        ).sum()
        df["modified_date"] = df.index
        df["modified_date"] = df["modified_date"].dt.strftime("%b %d")
        df = df.set_index("modified_date")
        df = df.rename(
            columns={
                "password_included": "Passwords Included",
                "no_password": "No Password",
            }
        )
        # Guarantee at least one column so downstream charting doesn't break.
        if len(df.columns) == 0:
            df["Passwords Included"] = 0
        return df

    def breaches(self):
        """Return total number of distinct breaches."""
        all_breaches = self.creds_view["breach_name"]
        return all_breaches.nunique()

    def breach_appendix(self):
        """Return breach name and description to be added to the appendix."""
        view_df = self.creds_view
        view_df = view_df[["breach_name", "description"]]

        # drop_duplicates() returns a new frame, so the chained (non-inplace)
        # sort is safe; sorting the original slice in place raised a pandas
        # SettingWithCopyWarning and could silently fail to sort.
        view_df = view_df.drop_duplicates().sort_values("breach_name")
        return view_df[["breach_name", "description"]]

    def breach_details(self):
        """Return breach details with report-friendly column names and dates."""
        breach_df = self.breach_details_view
        breach_det_df = breach_df.rename(columns={"modified_date": "update_date"})
        breach_det_df["update_date"] = pd.to_datetime(breach_det_df["update_date"])
        # Only format dates when there are rows; .dt on an empty frame's
        # object column would raise.
        if len(breach_det_df) > 0:
            breach_det_df["update_date"] = breach_det_df["update_date"].dt.strftime(
                "%m/%d/%y"
            )
            breach_det_df["breach_date"] = pd.to_datetime(
                breach_det_df["breach_date"]
            ).dt.strftime("%m/%d/%y")

        breach_det_df = breach_det_df.rename(
            columns={
                "breach_name": "Breach Name",
                "breach_date": "Breach Date",
                "update_date": "Date Reported",
                "password_included": "Password Included",
                "number_of_creds": "Number of Creds",
            }
        )
        return breach_det_df

    def password(self):
        """Return total number of credentials with passwords."""
        return len(self.creds_view[self.creds_view["password_included"]])

    def total(self):
        """Return total number of credentials found in breaches."""
        return self.creds_view.shape[0]
109

110

111
class Domains_Masqs:
    """Domains Masquerading class.

    Holds malicious domain-permutation rows and the associated alerts for
    one organization over the reporting period.
    """

    def __init__(self, start_date, end_date, org_uid):
        """Initialize domains masquerading class."""
        self.start_date = start_date
        self.end_date = end_date
        self.org_uid = org_uid
        df = query_domMasq(org_uid, start_date, end_date)
        # Keep only permutations flagged as malicious.
        self.df_mal = df[df["malicious"]]
        self.dom_alerts_df = query_domMasq_alerts(org_uid, start_date, end_date)

    def count(self):
        """Return total count of malicious domains."""
        return len(self.df_mal.index)

    def summary(self):
        """Return domain masquerading summary information."""
        if len(self.df_mal) > 0:
            # Copy the column selection so the .loc assignment below does
            # not write into a slice of df_mal (SettingWithCopyWarning).
            domain_sum = self.df_mal[
                [
                    "domain_permutation",
                    "ipv4",
                    "ipv6",
                    "mail_server",
                    "name_server",
                ]
            ].copy()
            # Replace empty IPv6 values with NA
            domain_sum.loc[domain_sum["ipv6"] == "", "ipv6"] = "NA"
            domain_sum = domain_sum.rename(
                columns={
                    "domain_permutation": "Domain",
                    "ipv4": "IPv4",
                    "ipv6": "IPv6",
                    "mail_server": "Mail Server",
                    "name_server": "Name Server",
                }
            )
        else:
            # No malicious domains: return an empty frame with the same
            # report-facing columns.
            domain_sum = pd.DataFrame(
                columns=[
                    "Domain",
                    "IPv4",
                    "IPv6",
                    "Mail Server",
                    "Name Server",
                ]
            )
        return domain_sum

    def alert_count(self):
        """Return number of domain masquerading alerts."""
        return len(self.dom_alerts_df)

    def alerts(self):
        """Return domain alerts with report-facing column names."""
        dom_alerts_df = self.dom_alerts_df[["message", "date"]]
        return dom_alerts_df.rename(columns={"message": "Alert", "date": "Date"})

    def alerts_sum(self):
        """Return domain alerts summary (message, date, old and new values)."""
        return self.dom_alerts_df[
            ["message", "date", "previous_value", "new_value"]
        ]
182

183

184
class Malware_Vulns:
    """Malware and Vulnerabilities Class.

    Wraps the Shodan-derived views (suspected vulns/insecure protocols,
    verified vulns, assets) and exposes the aggregations used in reports.
    """

    def __init__(self, start_date, end_date, org_uid):
        """Initialize Shodan vulns and malware class.

        Args:
            start_date: start of the reporting period.
            end_date: end of the reporting period.
            org_uid: organization UID to query for.
        """
        self.start_date = start_date
        self.end_date = end_date
        self.org_uid = org_uid
        # Suspected vulnerabilities and insecure-protocol findings.
        insecure_df = query_shodan(
            org_uid,
            start_date,
            end_date,
            "vw_shodanvulns_suspected",
        )
        self.insecure_df = insecure_df

        # Verified vulnerabilities; ports normalized to strings for joins.
        vulns_df = query_shodan(
            org_uid, start_date, end_date, "vw_shodanvulns_verified"
        )
        vulns_df["port"] = vulns_df["port"].astype(str)
        self.vulns_df = vulns_df

        assets_df = query_shodan(org_uid, start_date, end_date, "shodan_assets")
        self.assets_df = assets_df

    @staticmethod
    def isolate_risky_assets(df):
        """Return risky assets from the insecure_df dataframe."""
        insecure = df[df["type"] == "Insecure Protocol"]
        # http and smtp are expected to be unencrypted; don't flag them.
        insecure = insecure[
            (insecure["protocol"] != "http") & (insecure["protocol"] != "smtp")
        ]
        # Copy before assigning so pandas doesn't warn about (and possibly
        # drop) a write into a filtered slice.
        insecure = insecure.copy()
        insecure["port"] = insecure["port"].astype(str)
        return insecure[["protocol", "ip", "port"]].drop_duplicates(keep="first")

    def insecure_protocols(self):
        """Get risky assets grouped by protocol."""
        risky_assets = self.isolate_risky_assets(self.insecure_df)
        risky_assets = (
            risky_assets.groupby("protocol")
            .agg(lambda x: "  ".join(set(x)))
            .reset_index()
        )
        # Limit the IP column to 32 characters so the table isn't too big.
        # 30 characters is the max length of 2 IPs, plus the 2 spaces.
        if len(risky_assets.index) > 0:
            risky_assets["ip"] = risky_assets["ip"].str[:32]
            risky_assets.loc[risky_assets["ip"].str.len() == 32, "ip"] = (
                risky_assets["ip"] + "  ..."
            )

        return risky_assets

    def _protocol_counts(self):
        """Return one row per insecure protocol with its asset count."""
        risky_assets = self.isolate_risky_assets(self.insecure_df)
        # The old ``.agg({"id_count": "count"})`` nested-renamer form was
        # removed in pandas 1.0; ``size()`` is the supported equivalent.
        return risky_assets.groupby("protocol").size().reset_index(name="id_count")

    def protocol_count(self):
        """Return a count for each insecure protocol."""
        # Horizontal bar: insecure protocol count
        return self._protocol_counts()

    def risky_ports_count(self):
        """Return total count of open ports with insecure protocols."""
        return self._protocol_counts()["id_count"].sum()

    def total_verif_vulns(self):
        """Return total count of verified vulns (CVE/IP pairs)."""
        vulns_df = self.vulns_df
        verif_vulns = (
            vulns_df[["cve", "ip", "port"]]
            .groupby("cve")
            .agg(lambda x: "  ".join(set(x)))
            .reset_index()
        )

        if len(verif_vulns) > 0:
            # Each CVE row holds a double-space separated list of IPs.
            verif_vulns["count"] = verif_vulns["ip"].str.split("  ").str.len()
            verifVulns = verif_vulns["count"].sum()
        else:
            verifVulns = 0

        return verifVulns

    def ip_count(self):
        """Return the number of total ips with suspected and confirmed vulns."""
        # Series.append was removed in pandas 2.0; concat is the replacement.
        combined_ips = pd.concat(
            [self.vulns_df["ip"], self.insecure_df["ip"]], ignore_index=True
        )
        return len(pd.unique(combined_ips))

    @staticmethod
    def unverified_cve(df):
        """Subset insecure df to only potential vulnerabilities."""
        unverif_df = df[df["type"] != "Insecure Protocol"]
        unverif_df = unverif_df.copy()
        # Sort each row's CVE list so equivalent lists compare equal when
        # de-duplicating below.
        unverif_df["potential_vulns"] = (
            unverif_df["potential_vulns"].sort_values().apply(lambda x: sorted(x))
        )
        unverif_df["potential_vulns"] = unverif_df["potential_vulns"].astype("str")
        unverif_df = (
            unverif_df[["potential_vulns", "ip"]]
            .drop_duplicates(keep="first")
            .reset_index(drop=True)
        )
        # Split the stringified list back apart to count CVEs per IP.
        unverif_df["potential_vulns_list"] = unverif_df["potential_vulns"].str.split(
            ","
        )
        unverif_df["count"] = unverif_df["potential_vulns_list"].str.len()
        return unverif_df

    def unverified_cve_count(self):
        """Return top 15 unverified CVEs and their counts."""
        unverif_df = self.unverified_cve(self.insecure_df)
        unverif_df = unverif_df[["ip", "count"]]
        unverif_df = unverif_df.sort_values(by=["count"], ascending=False)
        unverif_df = unverif_df[:15].reset_index(drop=True)
        return unverif_df

    def all_cves(self):
        """Get all verified and unverified CVEs, de-duplicated."""
        unverif_df = self.unverified_cve(self.insecure_df)
        verified_cves = self.vulns_df["cve"].tolist()
        all_cves = []
        for unverif_index, unverif_row in unverif_df.iterrows():
            for cve in unverif_row["potential_vulns_list"]:
                # Strip the list-literal punctuation left by astype(str).
                cve = cve.strip("[]' ")
                all_cves.append(cve)
        all_cves += verified_cves
        all_cves = list(set(all_cves))
        return all_cves

    def unverified_vuln_count(self):
        """Return the count of IP addresses with unverified vulnerabilities."""
        # Reuse the shared subsetting logic instead of duplicating it inline;
        # the row count after de-duplication is identical.
        return len(self.unverified_cve(self.insecure_df).index)

    def verif_vulns(self):
        """Return a dataframe with each CVE, the associated IPs and the affected ports."""
        vulns_df = self.vulns_df
        verif_vulns = (
            vulns_df[["cve", "ip", "port"]]
            .groupby("cve")
            .agg(lambda x: "  ".join(set(x)))
            .reset_index()
        )
        return verif_vulns

    def verif_vulns_summary(self):
        """Return summary dataframe for verified vulns."""
        vulns_df = self.vulns_df
        verif_vulns_summary = (
            vulns_df[["cve", "ip", "port", "summary"]]
            .groupby("cve")
            .agg(lambda x: "  ".join(set(x)))
            .reset_index()
        )

        verif_vulns_summary = verif_vulns_summary.rename(
            columns={
                "cve": "CVE",
                "ip": "IP",
                "port": "Port",
                "summary": "Summary",
            }
        )
        return verif_vulns_summary
374

375

376
class Cyber_Six:
    """Dark web and Cyber Six data class.

    Queries the Cybersixgill dark web views for one organization and
    exposes the counts and tables used by the report generator.
    """

    def __init__(
        self,
        trending_start_date,
        start_date,
        end_date,
        org_uid,
        all_cves_df,
        soc_med_included,
    ):
        """Initialize Cybersixgill vulns and malware class.

        Args:
            trending_start_date: start of the longer trending window.
            start_date: start of the reporting period.
            end_date: end of the reporting period.
            org_uid: organization UID to query for.
            all_cves_df: CVE ids found by Shodan, used to cross-mark CVEs.
            soc_med_included: if False, social media sites are filtered out.
        """
        self.trending_start_date = trending_start_date
        self.start_date = start_date
        self.end_date = end_date
        self.org_uid = org_uid
        self.all_cves_df = all_cves_df
        self.soc_med_included = soc_med_included
        # Site names treated as social media when soc_med_included is False.
        self.soc_med_platforms = [
            "twitter",
            "Twitter",
            "reddit",
            "Reddit",
            "Parler",
            "parler",
            "linkedin",
            "Linkedin",
            "discord",
            "forum_discord",
            "raddle",
            "telegram",
            "jabber",
            "ICQ",
            "icq",
            "mastodon",
        ]
        dark_web_mentions = query_darkweb(
            org_uid,
            start_date,
            end_date,
            "mentions",
        )
        dark_web_mentions = dark_web_mentions.drop(
            columns=["organizations_uid", "mentions_uid"],
            errors="ignore",
        )
        # The mentions view uses a lowercase "site" column.
        self.dark_web_mentions = self._exclude_soc_med(
            dark_web_mentions, column="site"
        )

        alerts = query_darkweb(
            org_uid,
            start_date,
            end_date,
            "alerts",
        )
        alerts = alerts.drop(
            columns=["organizations_uid", "alerts_uid"],
            errors="ignore",
        )
        self.alerts = self._exclude_soc_med(alerts, column="site")

        top_cves = query_darkweb_cves(
            "top_cves",
        )
        # Keep only the most recent snapshot; copy so later column
        # assignments never write into a slice of the queried frame.
        top_cves = top_cves[top_cves["date"] == top_cves["date"].max()].copy()
        self.top_cves = top_cves

    def _exclude_soc_med(self, df, column="Site"):
        """Drop social media rows from df unless soc media is included."""
        if not self.soc_med_included:
            df = df[~df[column].isin(self.soc_med_platforms)]
        return df

    def _query_view(self, view):
        """Query a dark web view for the report period, dropping metadata columns."""
        df = query_darkweb(
            self.org_uid,
            self.start_date,
            self.end_date,
            view,
        )
        return df.drop(
            columns=["organizations_uid", "date"],
            errors="ignore",
        )

    def dark_web_count(self):
        """Get total number of dark web alerts."""
        return len(self.alerts.index)

    def alerts_exec(self):
        """Get top executive alerts."""
        alerts_exec = self._query_view("vw_darkweb_execalerts")
        # Copy so the title truncation doesn't write into a filtered slice.
        alerts_exec = self._exclude_soc_med(alerts_exec).copy()
        alerts_exec["Title"] = alerts_exec["Title"].str[:200]
        return alerts_exec

    def alerts_threats(self):
        """Get threat alerts (top 10 site/threat pairs by event count)."""
        alerts_threats = self._query_view("vw_darkweb_potentialthreats")
        alerts_threats = self._exclude_soc_med(alerts_threats)
        alerts_threats = (
            alerts_threats.groupby(["Site", "Threats"])["Threats"]
            .count()
            .nlargest(10)
            .reset_index(name="Events")
        )
        alerts_threats["Threats"] = alerts_threats["Threats"].str[:200]
        return alerts_threats

    def dark_web_mentions_count(self):
        """Get total number of dark web mentions."""
        return len(self.dark_web_mentions)

    def dark_web_content(self):
        """Get top 10 dark web mention categories."""
        dark_web_content = self.dark_web_mentions[["category"]]
        dark_web_content = (
            dark_web_content.groupby(["category"])["category"]
            .count()
            .nlargest(10)
            .reset_index(name="count")
        )
        return dark_web_content

    def dark_web_date(self):
        """Get dark web mentions bucketed into weeks over the trending window."""
        trending_dark_web_mentions = query_darkweb(
            self.org_uid,
            self.trending_start_date,
            self.end_date,
            "vw_darkweb_mentionsbydate",
        )
        # The "date" column is needed below, so only drop the org uid here.
        dark_web_date = trending_dark_web_mentions.drop(
            columns=["organizations_uid"],
            errors="ignore",
        )
        # Reindex over every calendar day so empty days count as 0.
        idx = pd.date_range(self.trending_start_date, self.end_date)
        dark_web_date = (
            dark_web_date.set_index("date").reindex(idx).fillna(0.0).rename_axis("date")
        )
        # Anchor the 7-day buckets so the final bucket ends on end_date.
        group_limit = self.end_date + datetime.timedelta(1)
        dark_web_date = dark_web_date.groupby(
            pd.Grouper(level="date", freq="7d", origin=group_limit)
        ).sum()
        dark_web_date["date"] = dark_web_date.index
        dark_web_date["date"] = dark_web_date["date"].dt.strftime("%m/%d")
        dark_web_date = dark_web_date.set_index("date")
        return dark_web_date[["Count"]]

    def social_media_most_act(self):
        """Get most active social media posts."""
        soc_med_most_act = self._query_view("vw_darkweb_socmedia_mostactposts")
        # Copy so the assignments below don't write into a slice.
        soc_med_most_act = soc_med_most_act[:10].copy()
        soc_med_most_act["Title"] = soc_med_most_act["Title"].str[:200]
        # Blank titles render badly in the report; label them "Untitled".
        soc_med_most_act = soc_med_most_act.replace(r"^\s*$", "Untitled", regex=True)
        return soc_med_most_act

    def dark_web_most_act(self):
        """Get most active dark web posts."""
        dark_web_most_act = self._query_view("vw_darkweb_mostactposts")
        dark_web_most_act = dark_web_most_act[:10].copy()
        dark_web_most_act["Title"] = dark_web_most_act["Title"].str[:200]
        dark_web_most_act = dark_web_most_act.replace(r"^\s*$", "Untitled", regex=True)
        return dark_web_most_act

    def asset_alerts(self):
        """Get top executive mentions."""
        asset_alerts = self._query_view("vw_darkweb_assetalerts")
        asset_alerts = self._exclude_soc_med(asset_alerts).copy()
        asset_alerts["Title"] = asset_alerts["Title"].str[:200]
        return asset_alerts

    def dark_web_bad_actors(self):
        """Get dark web bad actors (top 10 by grade)."""
        dark_web_bad_actors = self._query_view("vw_darkweb_threatactors")
        dark_web_bad_actors = dark_web_bad_actors.groupby(
            "Creator", as_index=False
        ).max()
        dark_web_bad_actors = dark_web_bad_actors.sort_values(
            by=["Grade"], ascending=False
        )
        return dark_web_bad_actors[:10]

    def dark_web_sites(self):
        """Get mentions by dark web sites (top 10)."""
        dark_web_sites = self._query_view("vw_darkweb_sites")
        dark_web_sites = self._exclude_soc_med(dark_web_sites)
        dark_web_sites = (
            dark_web_sites.groupby(["Site"])["Site"]
            .count()
            .nlargest(10)
            .reset_index(name="count")
        )
        return dark_web_sites

    def invite_only_markets(self):
        """Get alerts in invite-only markets (top 10 sites)."""
        markets = self._query_view("vw_darkweb_inviteonlymarkets")
        markets = (
            markets.groupby(["Site"])["Site"]
            .count()
            .nlargest(10)
            .reset_index(name="Alerts")
        )
        return markets

    def top_cve_table(self):
        """Get top CVEs, marking those also identified by Shodan."""
        # Work on a copy so the cached self.top_cves frame is not mutated
        # as a side effect of building the table.
        top_cves = self.top_cves.copy()
        top_cves["summary_short"] = top_cves["summary"].str[:500]
        top_cve_table = top_cves[["cve_id", "summary_short"]].rename(
            columns={"cve_id": "CVE", "summary_short": "Description"}
        )
        top_cve_table["Identified By"] = "Cybersixgill"

        # Get all CVEs found in shodan
        shodan_cves = self.all_cves_df
        for cve_index, cve_row in top_cve_table.iterrows():
            if cve_row["CVE"] in shodan_cves:
                top_cve_table.at[cve_index, "Identified By"] += ",   Shodan"

        return top_cve_table
674

675

676
class Core_Cyber_Six:
    """Collect relevant CyberSix data for a given Core report."""

    def __init__(
        self,
        all_cves_df,
    ):
        """Initialize core cybersix data class.

        Args:
            all_cves_df: CVE ids found by Shodan, used to cross-mark CVEs.
        """
        self.all_cves_df = all_cves_df
        top_cves = query_darkweb_cves(
            "top_cves",
        )
        # Keep only the most recent snapshot; copy so later column
        # assignments never write into a slice of the queried frame.
        top_cves = top_cves[top_cves["date"] == top_cves["date"].max()].copy()
        self.top_cves = top_cves

    def top_cve_table(self):
        """Get top CVEs, marking those also identified by Shodan."""
        # Work on a copy so the cached self.top_cves frame is not mutated
        # as a side effect of building the table.
        top_cves = self.top_cves.copy()
        top_cves["summary_short"] = top_cves["summary"].str[:500]
        top_cve_table = top_cves[["cve_id", "summary_short"]].rename(
            columns={"cve_id": "CVE", "summary_short": "Description"}
        )
        top_cve_table["Identified By"] = "Cybersixgill"

        # Get all CVEs found in shodan
        shodan_cves = self.all_cves_df
        for cve_index, cve_row in top_cve_table.iterrows():
            if cve_row["CVE"] in shodan_cves:
                top_cve_table.at[cve_index, "Identified By"] += ",   Shodan"

        return top_cve_table
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc