• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / gophish-tools / 4759283117

pending completion
4759283117

push

github

GitHub
Merge pull request #123 from cisagov/lineage/skeleton

141 of 473 branches covered (29.81%)

Branch coverage included in aggregate %.

9 of 24 new or added lines in 10 files covered. (37.5%)

223 existing lines in 5 files now uncovered.

298 of 1270 relevant lines covered (23.46%)

1.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

22.56
/src/tools/gophish_export.py
1
"""Export all the data from an assessment within Gophish into a single JSON file.
2

3
Usage:
4
  gophish-export [--log-level=LEVEL] ASSESSMENT_ID SERVER API_KEY
5
  gophish-export (-h | --help)
6
  gophish-export --version
7

8
Options:
9
  API_KEY                   Gophish API key.
10
  ASSESSMENT_ID             ID of the assessment to export data from.
11
  SERVER                    Full URL to Gophish server.
12
  -h --help                 Show this screen.
13
  --version                 Show version.
14
  -l --log-level=LEVEL      If specified, then the log level will be set to
15
                            the specified value.  Valid values are "debug", "info",
16
                            "warning", "error", and "critical". [default: info]
17
"""
18

19
# Standard Python Libraries
20
from datetime import datetime
6✔
21
import hashlib
6✔
22
import json
6✔
23
import logging
6✔
24
import re
6✔
25
import sys
6✔
26
from typing import Dict
6✔
27

28
# Third-Party Libraries
29
from docopt import docopt
6✔
30

31
# No type stubs exist for httpagentparser, so we add "type: ignore" to tell
32
# mypy to ignore this library
33
import httpagentparser  # type: ignore
6✔
34
import urllib3
6✔
35

36
# cisagov Libraries
37
from tools.connect import connect_api
6✔
38
from util.validate import validate_assessment_id
6✔
39

40
from ._version import __version__
6✔
41

42
# Disable "Insecure Request" warning: Gophish uses a self-signed certificate
43
# as default for https connections, which cannot be verified by a third
44
# party; thus, an SSL insecure request warning is produced.
45
# NOTE(review): this runs at import time with no category argument, so it
# presumably silences urllib3 warnings for the whole process, not only for
# Gophish requests -- confirm against the urllib3 documentation.
urllib3.disable_warnings()
6✔
46

47

48
def assessment_exists(api, assessment_id):
    """Report whether Gophish holds any campaign for the given assessment.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        assessment_id (string): Assessment identifier used as the name prefix.

    Returns:
        boolean: True if at least one campaign name starts with assessment_id.
    """
    return any(
        campaign.name.startswith(assessment_id) for campaign in api.campaigns.get()
    )
64

65

66
def export_targets(api, assessment_id):
    """Collect the targets of every group that belongs to an assessment.

    Group IDs are looked up with get_group_ids(). Each raw target in those
    groups is reduced to a dict holding a sha256 hex digest of the target's
    email address ("id") and a "customer_defined_labels" dict. When the raw
    target has a "position" entry, that value is listed under the assessment
    id in the labels dict.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        assessment_id (string): Assessment identifier to get groups from.

    Returns:
        List of target dicts from the assessment's group(s).
    """
    targets = []

    for group_id in get_group_ids(api, assessment_id):
        # Raw target records of this group, as returned by Gophish.
        group_targets = api.groups.get(group_id).as_dict()["targets"]

        for raw_target in group_targets:
            email_digest = hashlib.sha256(
                raw_target["email"].encode("utf-8")
            ).hexdigest()

            labels = {}
            if "position" in raw_target:
                labels[assessment_id] = [raw_target["position"]]

            targets.append({"id": email_digest, "customer_defined_labels": labels})

    logging.info(
        "%d email targets found for assessment %s.", len(targets), assessment_id
    )

    return targets
110

111

112
def get_group_ids(api, assessment_id):
    """Return a list of group IDs for all groups starting with specified assessment_id."""
    # Every group returned by Gophish is converted to a dict before its name
    # is compared against the assessment prefix.
    group_records = (group.as_dict() for group in api.groups.get())

    return [
        record["id"]
        for record in group_records
        if record["name"].startswith(assessment_id)
    ]
123

124

125
def export_campaigns(api, assessment_id):
    """Gather the data of every campaign that belongs to an assessment.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        assessment_id (string): Assessment identifier to get campaigns from.

    Returns:
        List with one get_campaign_data() result per matching campaign.
    """
    campaigns = [
        get_campaign_data(api, campaign_id)
        for campaign_id in get_campaign_ids(api, assessment_id)
    ]

    logging.info("%d campaigns found for assessment %s.", len(campaigns), assessment_id)

    return campaigns
144

145

146
def get_campaign_ids(api, assessment_id):
    """Return a list of campaign IDs for all campaigns starting with specified assessment_id."""
    # Every campaign returned by Gophish is converted to a dict before its
    # name is compared against the assessment prefix.
    campaign_records = (campaign.as_dict() for campaign in api.campaigns.get())

    return [
        record["id"]
        for record in campaign_records
        if record["name"].startswith(assessment_id)
    ]
157

158

159
def get_campaign_data(api, campaign_id):
    """Return campaign metadata for the given campaign ID.

    The returned dict uses the Gophish campaign name as "id" and carries the
    launch/completion dates, URL, template subject, click list and e-mail
    send statuses of the campaign.
    """
    # Pulls the campaign data as dict from Gophish.
    raw: dict = api.campaigns.get(campaign_id).as_dict()

    return {
        "id": raw["name"],
        "start_time": raw["launch_date"],
        "end_time": raw["completed_date"],
        "url": raw["url"],
        "subject": raw["template"]["subject"],
        # The template ID is the third hyphen-separated field of the Gophish
        # template name.
        "template": api.templates.get(raw["template"]["id"])
        .as_dict()["name"]
        .split("-")[2],
        "clicks": get_click_data(api, campaign_id),
        # E-mail send status as reported by Gophish.
        "status": get_email_status(api, campaign_id),
    }
185

186

187
def get_click_data(api, campaign_id):
    """Return a list of all clicks for a given campaign.

    Only timeline events whose message is "Clicked Link" are reported. Each
    click dict holds a sha256 hex digest of the event's email ("user"), the
    browser address ("source_ip"), the event time and the application details
    produced by get_application().
    """
    timeline = api.campaigns.get(campaign_id).as_dict()["timeline"]
    clicks = []

    for event in timeline:
        if event["message"] != "Clicked Link":
            continue

        clicks.append(
            {
                "user": hashlib.sha256(event["email"].encode("utf-8")).hexdigest(),
                "source_ip": event["details"]["browser"]["address"],
                "time": event["time"],
                "application": get_application(event),
            }
        )

    return clicks
209

210

211
def get_email_status(api, campaign_id):
    """Return the email send status and time for a campaign.

    Only "Email Sent" and "Error Sending Email" timeline events are reported;
    every other event is ignored.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        campaign_id: ID of the campaign whose timeline is inspected.

    Returns:
        List of dicts, one per reported event, with the keys "user" (sha256
        hex digest of the event's email), "time" (string) and "status"
        ("SUCCESS" or "Failed").

    Raises:
        ValueError: If the time of a failed send does not match
            "%Y-%m-%dT%H:%M:%S" once its fractional seconds are trimmed.
    """
    timestamp_format = "%Y-%m-%dT%H:%M:%S"
    outcomes = {"Email Sent": "SUCCESS", "Error Sending Email": "Failed"}

    timeline = api.campaigns.get(campaign_id).as_dict()["timeline"]
    status = []

    for event in timeline:
        outcome = outcomes.get(event["message"])
        if outcome is None:
            continue

        user = hashlib.sha256(event["email"].encode("utf-8")).hexdigest()

        event_time = event["time"]
        if outcome == "Failed":
            # Trim fractional seconds, then round-trip through datetime so a
            # malformed time is still rejected. The result is kept as a
            # string: a datetime object cannot be serialized by json.dump,
            # and the timeline event itself is left unmodified.
            event_time = datetime.strptime(
                event_time.split(".")[0], timestamp_format
            ).strftime(timestamp_format)

        status.append({"user": user, "time": event_time, "status": outcome})

    return status
245

246

247
def get_application(rawEvent):
    """Return application details for a timeline event.

    Args:
        rawEvent (dict): Event whose ["details"]["browser"] dict provides the
            "address" and "user-agent" entries.

    Returns:
        dict: "external_ip" (the browser address) plus "name" and "version",
        both taken from the "platform" entry of httpagentparser's detection
        result for the user agent string.
    """
    browser = rawEvent["details"]["browser"]
    application = {"external_ip": browser["address"]}

    # Process the user agent string once and reuse the result for both fields.
    platform = httpagentparser.detect(browser["user-agent"])["platform"]
    application["name"] = platform["name"]
    application["version"] = platform["version"]

    return application
259

260

261
def find_unique_target_clicks_count(clicks):
    """Return the number of distinct users found in a click set."""
    return len({click["user"] for click in clicks})
267

268

269
def write_campaign_summary(api, assessment_id):
    """Output a campaign summary report to a text file and a JSON file.

    The summary structure is loaded from "campaign_data.json" (opened relative
    to the current working directory). For every campaign of the assessment
    whose name ends in "_level-N" (N from 1 to 6), the entry for that level is
    filled in and a block is appended to the text report. Campaigns whose name
    lacks that suffix are skipped with a warning.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        assessment_id (string): Assessment identifier to summarize.
    """
    campaign_ids = get_campaign_ids(api, assessment_id)
    campaign_data_template = "campaign_data.json"
    campaign_summary_json = f"{assessment_id}_campaign_data.json"
    campaign_summary_textfile = f"{assessment_id}_summary_{datetime.strftime(datetime.now(), '%Y-%m-%dT%H:%M:%S')}.txt"

    with open(campaign_data_template) as template:
        campaign_data = json.load(template)

    logging.info("Writing campaign summary report to %s", campaign_summary_textfile)

    regex = re.compile(r"^.*_(?P<level>level-[1-6])$")

    # The context manager closes the report even when an API call or a
    # lookup in campaign_data raises part-way through.
    with open(campaign_summary_textfile, "w+") as file_out:
        file_out.write("Campaigns for Assessment: " + assessment_id)

        for campaign_id in campaign_ids:
            campaign = api.campaigns.get(campaign_id)
            match = regex.fullmatch(campaign.name)
            if not match:
                logging.warning(
                    "Encountered campaign (%s) that is unable to be processed for campaign summary export. \n"
                    "Campaign name is not properly suffixed with the campaign level number (e.g. '_level-1')\n"
                    "Skipping campaign",
                    campaign.name,
                )
                continue

            level = match.group("level")
            logging.info(level)
            clicks = get_click_data(api, campaign_id)

            total_clicks = api.campaigns.summary(campaign_id=campaign_id).stats.clicked
            unique_clicks = find_unique_target_clicks_count(clicks)
            # Guard against dividing by zero when nothing was clicked.
            if total_clicks > 0:
                percent_clicks = unique_clicks / float(total_clicks)
            else:
                percent_clicks = 0.0

            level_data = campaign_data[level]
            level_data["subject"] = campaign.template.subject
            level_data["sender"] = campaign.smtp.from_address
            level_data["start_date"] = campaign.launch_date
            level_data["end_date"] = campaign.completed_date
            level_data["redirect"] = campaign.url
            level_data["clicks"] = total_clicks
            level_data["unique_clicks"] = unique_clicks
            level_data["percent_clicks"] = percent_clicks

            file_out.write("\n")
            file_out.write("-" * 50)
            file_out.write("\nCampaign: %s" % campaign.name)
            file_out.write("\nSubject: %s" % level_data["subject"])
            file_out.write("\nSender: %s" % level_data["sender"])
            file_out.write("\nStart Date: %s" % level_data["start_date"])
            file_out.write("\nEnd Date: %s" % level_data["end_date"])
            file_out.write("\nRedirect: %s" % level_data["redirect"])
            file_out.write("\nClicks: %d" % level_data["clicks"])
            file_out.write("\nUnique Clicks: %d" % level_data["unique_clicks"])
            file_out.write("\nPercentage Clicks: %f" % level_data["percent_clicks"])

    logging.info("Writing out summary JSON to %s", campaign_summary_json)
    with open(campaign_summary_json, "w") as fp:
        json.dump(campaign_data, fp, indent=4)
334

335

336
def export_user_reports(api, assessment_id):
    """Build and export a user_report JSON file for each campaign in an assessment.

    One "<assessment_id>_<campaign_id>_user_report_doc.json" file is written
    per campaign. It records the earliest click time of the campaign (or
    "No clicks reported") and the campaign's clicked count from Gophish.
    """
    timestamp_format = "%Y-%m-%dT%H:%M:%S"

    for campaign_id in get_campaign_ids(api, assessment_id):
        campaign = get_campaign_data(api, campaign_id)

        # Parse every click time (fractional seconds trimmed) and keep the
        # earliest one; None when the campaign has no clicks.
        click_times = [
            datetime.strptime(click["time"].split(".")[0], timestamp_format)
            for click in campaign["clicks"]
        ]
        first_report = min(click_times, default=None)

        if first_report is None:
            first_report_text = "No clicks reported"
        else:
            first_report_text = first_report.strftime(timestamp_format)

        user_report_doc = {
            # The "customer" field is a placeholder added for operator
            # convenience when working with the JSON file created.
            "customer": "",
            "assessment": assessment_id,
            # get_campaign_ids() returns integers, but the "campaign" field
            # expects a string.
            "campaign": str(campaign_id),
            "first_report": first_report_text,
            "total_num_reports": api.campaigns.summary(
                campaign_id=campaign_id
            ).stats.clicked,
        }

        logging.info(
            "Writing out user report for campaign %s in assessment %s",
            campaign["id"],
            assessment_id,
        )

        with open(f"{assessment_id}_{campaign_id}_user_report_doc.json", "w") as fp:
            json.dump(user_report_doc, fp, indent=4)
379

380

381
def main() -> None:
    """Set up logging, connect to API, export all assessment data.

    Exits with status 1 when the log level is invalid, the API connection
    fails, the assessment id is malformed, or the assessment has no campaign
    in Gophish. Otherwise writes "data_<ASSESSMENT_ID>.json" and then the
    user reports and the campaign summary.
    """
    args: Dict[str, str] = docopt(__doc__, version=__version__)

    # Set up logging
    log_level = args["--log-level"]
    try:
        logging.basicConfig(
            format="\n%(levelname)s: %(message)s", level=log_level.upper()
        )
    except ValueError:
        logging.critical(
            '"%s" is not a valid logging level. Possible values are debug, info, warning, error, and critical.',
            log_level,
        )
        sys.exit(1)

    # Connect to API
    try:
        api = connect_api(args["API_KEY"], args["SERVER"])
        logging.debug("Connected to: %s", args["SERVER"])
    except Exception as e:
        # An exception raised without arguments has an empty args tuple;
        # fall back to its repr so the handler itself cannot fail.
        logging.critical(e.args[0] if e.args else repr(e))
        sys.exit(1)

    assessment_id = args["ASSESSMENT_ID"]

    if not validate_assessment_id(assessment_id):
        logging.critical(
            '"%s" is an invalid assessment_id format. Assessment identifiers begin with RV and are followed by '
            " a 4 or 5 digit numerical sequence. Examples: RV1234, RV12345",
            assessment_id,
        )
        sys.exit(1)

    if not assessment_exists(api, assessment_id):
        logging.error('Assessment "%s" does not exist in Gophish.', assessment_id)
        sys.exit(1)

    assessment_dict: Dict = dict()

    # Add targets list to assessment dict.
    assessment_dict["targets"] = export_targets(api, assessment_id)

    # Add campaigns list to the assessment dict.
    assessment_dict["campaigns"] = export_campaigns(api, assessment_id)

    with open(f"data_{assessment_id}.json", "w") as fp:
        json.dump(assessment_dict, fp, indent=4)

    logging.info("Data written to data_%s.json", assessment_id)

    export_user_reports(api, assessment_id)
    write_campaign_summary(api, assessment_id)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc