• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / gophish-tools / 3989899550

pending completion
3989899550

Pull #123

github

GitHub
Merge 51f38b5e8 into ac0990ba0
Pull Request #123: ⚠️ CONFLICT! Lineage pull request for: skeleton

141 of 473 branches covered (29.81%)

Branch coverage included in aggregate %.

1 of 8 new or added lines in 4 files covered. (12.5%)

222 existing lines in 5 files now uncovered.

297 of 1269 relevant lines covered (23.4%)

1.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

22.56
/src/tools/gophish_export.py
1
"""Export all the data from an assessment within Gophish into a single JSON file.
2

3
Usage:
4
  gophish-export [--log-level=LEVEL] ASSESSMENT_ID SERVER API_KEY
5
  gophish-export (-h | --help)
6
  gophish-export --version
7

8
Options:
9
  API_KEY                   Gophish API key.
10
  ASSESSMENT_ID             ID of the assessment to export data from.
11
  SERVER                    Full URL to Gophish server.
12
  -h --help                 Show this screen.
13
  --version                 Show version.
14
  -l --log-level=LEVEL      If specified, then the log level will be set to
15
                            the specified value.  Valid values are "debug", "info",
16
                            "warning", "error", and "critical". [default: info]
17
"""
18

19
# Standard Python Libraries
20
from datetime import datetime
6✔
21
import hashlib
6✔
22
import json
6✔
23
import logging
6✔
24
import re
6✔
25
import sys
6✔
26
from typing import Dict
6✔
27

28
# Third-Party Libraries
29
from docopt import docopt
6✔
30
import httpagentparser
6✔
31
import requests.packages.urllib3
6✔
32

33
# cisagov Libraries
34
from tools.connect import connect_api
6✔
35
from util.validate import validate_assessment_id
6✔
36

37
from ._version import __version__
6✔
38

39
# Disable "Insecure Request" warning: Gophish uses a self-signed certificate
40
# by default for HTTPS connections, which cannot be verified by a third
41
# party; thus, an SSL insecure request warning is produced.
42
requests.packages.urllib3.disable_warnings()
6✔
43

44

45
def assessment_exists(api, assessment_id):
    """Check if Gophish has at least one campaign for the designated assessment.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        assessment_id (string): Assessment identifier to get campaigns from.

    Returns:
        boolean: True if any campaign name starts with assessment_id,
        otherwise False (including when Gophish returns no campaigns).
    """
    # any() stops at the first campaign whose name matches.
    return any(
        campaign.name.startswith(assessment_id) for campaign in api.campaigns.get()
    )
6✔
61

62

63
def export_targets(api, assessment_id):
    """Collect every target belonging to an assessment's groups.

    The IDs of all groups whose names start with the assessment id are
    looked up, and each group's targets are converted into target dicts.
    A target dict holds the sha256 hash of the target's email as its "id"
    and a "customer_defined_labels" dict that, when the target has a
    position, maps the assessment id to a one-element list with that
    position.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        assessment_id (string): Assessment identifier to get campaigns from.

    Returns:
        List of targets from the assessment's group(s).
    """
    targets = []

    for group_id in get_group_ids(api, assessment_id):
        # Pull the group's raw target list for parsing.
        group_members = api.groups.get(group_id).as_dict()["targets"]

        for member in group_members:
            email_digest = hashlib.sha256(member["email"].encode("utf-8")).hexdigest()

            labels = {}
            if "position" in member:
                labels[assessment_id] = [member["position"]]

            targets.append({"id": email_digest, "customer_defined_labels": labels})

    logging.info(
        "%d email targets found for assessment %s.", len(targets), assessment_id
    )

    return targets
6✔
107

108

109
def get_group_ids(api, assessment_id):
    """Return a list of group IDs for all groups starting with specified assessment_id.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        assessment_id (string): Assessment identifier to match group names against.

    Returns:
        List of the "id" values of every group whose name starts with
        assessment_id, in the order Gophish returned the groups.
    """
    # Convert each group to a dict lazily so it is only done once per group.
    group_dicts = (group.as_dict() for group in api.groups.get())

    return [
        group["id"] for group in group_dicts if group["name"].startswith(assessment_id)
    ]
×
120

121

122
def export_campaigns(api, assessment_id):
    """Gather the exported data of every campaign in an assessment.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        assessment_id (string): Assessment identifier to get campaigns from.

    Returns:
        List of the assessment's campaigns with data.
    """
    campaigns = [
        get_campaign_data(api, campaign_id)
        for campaign_id in get_campaign_ids(api, assessment_id)
    ]

    logging.info("%d campaigns found for assessment %s.", len(campaigns), assessment_id)

    return campaigns
×
141

142

143
def get_campaign_ids(api, assessment_id):
    """Return a list of campaign IDs for all campaigns starting with specified assessment_id.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        assessment_id (string): Assessment identifier to match campaign names against.

    Returns:
        List of the "id" values of every campaign whose name starts with
        assessment_id, in the order Gophish returned the campaigns.
    """
    # Convert each campaign to a dict lazily so it is only done once per campaign.
    campaign_dicts = (campaign.as_dict() for campaign in api.campaigns.get())

    return [
        campaign["id"]
        for campaign in campaign_dicts
        if campaign["name"].startswith(assessment_id)
    ]
×
154

155

156
def get_campaign_data(api, campaign_id):
    """Build the export record for a single Gophish campaign.

    The record carries the campaign name (as "id"), its launch and
    completion dates, URL, template subject, a template identifier, the
    click list, and the e-mail send statuses.
    """
    # Campaign data as a plain dict, straight from Gophish.
    raw: dict = api.campaigns.get(campaign_id).as_dict()

    return {
        "id": raw["name"],
        "start_time": raw["launch_date"],
        "end_time": raw["completed_date"],
        "url": raw["url"],
        "subject": raw["template"]["subject"],
        # The template ID is the third dash-separated field of the Gophish
        # template name.
        "template": (
            api.templates.get(raw["template"]["id"]).as_dict()["name"].split("-")[2]
        ),
        "clicks": get_click_data(api, campaign_id),
        # E-mail send status as recorded by Gophish.
        "status": get_email_status(api, campaign_id),
    }
×
182

183

184
def get_click_data(api, campaign_id):
    """Collect one click document per "Clicked Link" event of a campaign.

    Each click document holds the sha256 hash of the clicking user's email
    ("user"), the browser address ("source_ip"), the event time ("time"),
    and the application details produced by get_application().
    """
    timeline = api.campaigns.get(campaign_id).as_dict()["timeline"]

    return [
        {
            "user": hashlib.sha256(event["email"].encode("utf-8")).hexdigest(),
            "source_ip": event["details"]["browser"]["address"],
            "time": event["time"],
            "application": get_application(event),
        }
        for event in timeline
        if event["message"] == "Clicked Link"
    ]
×
206

207

208
def get_email_status(api, campaign_id):
    """Return the email send status and time for each send event of a campaign.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        campaign_id: Gophish ID of the campaign whose timeline is inspected.

    Returns:
        List of dicts, one per "Email Sent" or "Error Sending Email" timeline
        event (all other events are ignored). Each dict holds the sha256 hash
        of the event's email ("user"), the event time as a string ("time"),
        and "SUCCESS" or "Failed" ("status"). For failed sends the time has
        its fractional seconds trimmed.

    Raises:
        ValueError: If the time of a failed send, once trimmed at the first
            ".", does not match the format %Y-%m-%dT%H:%M:%S.
    """
    time_format = "%Y-%m-%dT%H:%M:%S"
    timeline = api.campaigns.get(campaign_id).as_dict()["timeline"]
    status = []

    for event in timeline:
        message = event["message"]
        if message not in ("Email Sent", "Error Sending Email"):
            continue

        email = {"user": hashlib.sha256(event["email"].encode("utf-8")).hexdigest()}

        if message == "Email Sent":
            email["time"] = event["time"]
            email["status"] = "SUCCESS"
        else:
            # Trim fractional seconds and validate the timestamp, but keep the
            # result a string: the status list ends up in a json.dump() call
            # and datetime objects are not JSON serializable. The timeline
            # event itself is left unmodified.
            failure_time = datetime.strptime(event["time"].split(".")[0], time_format)
            email["time"] = failure_time.strftime(time_format)
            email["status"] = "Failed"

        status.append(email)

    return status
×
242

243

244
def get_application(rawEvent):
    """Return application details for a timeline event.

    Args:
        rawEvent (dict): Timeline event whose ["details"]["browser"] entry
            provides the "address" and "user-agent" values.

    Returns:
        Dict with the browser address ("external_ip") and the platform
        "name" and "version" that httpagentparser detects from the user
        agent string.
    """
    browser = rawEvent["details"]["browser"]

    application = {"external_ip": browser["address"]}

    # Parse the user agent string once and reuse the detected platform for
    # both the name and the version.
    platform = httpagentparser.detect(browser["user-agent"])["platform"]
    application["name"] = platform["name"]
    application["version"] = platform["version"]

    return application
×
256

257

258
def find_unique_target_clicks_count(clicks):
    """Return the number of unique clicks in a click set.

    Args:
        clicks: Iterable of click dicts, each with a "user" key.

    Returns:
        Number of distinct "user" values found across all clicks.
    """
    return len({click["user"] for click in clicks})
6✔
264

265

266
def write_campaign_summary(api, assessment_id):
    """Output a campaign summary report to JSON, console, and a text file.

    The "campaign_data.json" template is loaded from the current working
    directory and its per-level entries are filled in for each campaign of
    the assessment. Campaigns whose names do not end in "_level-N" (N from
    1 to 6) are skipped with a warning.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        assessment_id (string): Assessment identifier to summarize campaigns for.

    Side effects:
        Writes "<assessment_id>_summary_<timestamp>.txt" and
        "<assessment_id>_campaign_data.json" to the current working directory.
    """
    campaign_ids = get_campaign_ids(api, assessment_id)
    campaign_data_template = "campaign_data.json"
    campaign_summary_json = f"{assessment_id}_campaign_data.json"
    campaign_summary_textfile = f"{assessment_id}_summary_{datetime.strftime(datetime.now(), '%Y-%m-%dT%H:%M:%S')}.txt"

    with open(campaign_data_template) as template:
        campaign_data = json.load(template)

    # The campaign level is taken from the suffix of the campaign name.
    regex = re.compile(r"^.*_(?P<level>level-[1-6])$")

    logging.info("Writing campaign summary report to %s", campaign_summary_textfile)
    # The context manager closes the report even if a campaign fails to
    # process part-way through the loop.
    with open(campaign_summary_textfile, "w+") as file_out:
        file_out.write("Campaigns for Assessment: " + assessment_id)

        for campaign_id in campaign_ids:
            campaign = api.campaigns.get(campaign_id)
            match = regex.fullmatch(campaign.name)
            if not match:
                logging.warning(
                    "Encountered campaign (%s) that is unable to be processed for campaign summary export. \n"
                    "Campaign name is not properly suffixed with the campaign level number (e.g. '_level-1')\n"
                    "Skipping campaign",
                    campaign.name,
                )
                continue

            level = match.group("level")
            logging.info(level)
            clicks = get_click_data(api, campaign_id)

            total_clicks = api.campaigns.summary(campaign_id=campaign_id).stats.clicked
            unique_clicks = find_unique_target_clicks_count(clicks)
            # Guard against dividing by zero when nothing was clicked.
            if total_clicks > 0:
                percent_clicks = unique_clicks / float(total_clicks)
            else:
                percent_clicks = 0.0

            level_data = campaign_data[level]
            level_data["subject"] = campaign.template.subject
            level_data["sender"] = campaign.smtp.from_address
            level_data["start_date"] = campaign.launch_date
            level_data["end_date"] = campaign.completed_date
            level_data["redirect"] = campaign.url
            level_data["clicks"] = total_clicks
            level_data["unique_clicks"] = unique_clicks
            level_data["percent_clicks"] = percent_clicks

            file_out.write("\n")
            file_out.write("-" * 50)
            file_out.write("\nCampaign: %s" % campaign.name)
            file_out.write("\nSubject: %s" % level_data["subject"])
            file_out.write("\nSender: %s" % level_data["sender"])
            file_out.write("\nStart Date: %s" % level_data["start_date"])
            file_out.write("\nEnd Date: %s" % level_data["end_date"])
            file_out.write("\nRedirect: %s" % level_data["redirect"])
            file_out.write("\nClicks: %d" % level_data["clicks"])
            file_out.write("\nUnique Clicks: %d" % level_data["unique_clicks"])
            file_out.write("\nPercentage Clicks: %f" % level_data["percent_clicks"])

    logging.info("Writing out summary JSON to %s", campaign_summary_json)
    with open(campaign_summary_json, "w") as fp:
        json.dump(campaign_data, fp, indent=4)
×
331

332

333
def export_user_reports(api, assessment_id):
    """Write one user_report JSON file per campaign of an assessment.

    Each report records the assessment, the campaign ID, the time of the
    earliest click (or a placeholder when there were none), and the number
    of clicks reported by the campaign summary.
    """
    time_format = "%Y-%m-%dT%H:%M:%S"

    for campaign_id in get_campaign_ids(api, assessment_id):
        campaign = get_campaign_data(api, campaign_id)

        # Parse every click time (fractional seconds trimmed) so the earliest
        # one can be picked.
        click_times = [
            datetime.strptime(click["time"].split(".")[0], time_format)
            for click in campaign["clicks"]
        ]
        if click_times:
            first_report = min(click_times).strftime(time_format)
        else:
            first_report = "No clicks reported"

        user_report_doc = {
            # The "customer" field is a placeholder added for operator
            # convenience when working with the JSON file created.
            "customer": "",
            "assessment": assessment_id,
            # get_campaign_ids() returns integers, but the "campaign" field
            # expects a string.
            "campaign": str(campaign_id),
            "first_report": first_report,
            "total_num_reports": api.campaigns.summary(
                campaign_id=campaign_id
            ).stats.clicked,
        }

        logging.info(
            "Writing out user report for campaign %s in assessment %s",
            campaign["id"],
            assessment_id,
        )

        report_path = f"{assessment_id}_{campaign_id}_user_report_doc.json"
        with open(report_path, "w") as fp:
            json.dump(user_report_doc, fp, indent=4)
×
376

377

378
def main() -> None:
    """Configure logging, connect to Gophish, and export an assessment's data.

    Exits with status 1 when the log level is invalid, the API connection
    fails, the assessment ID is malformed, or the assessment has no
    campaigns in Gophish.
    """
    args: Dict[str, str] = docopt(__doc__, version=__version__)
    requested_level = args["--log-level"]
    assessment_id = args["ASSESSMENT_ID"]

    # Configure logging; basicConfig() raises ValueError for unknown levels.
    try:
        logging.basicConfig(
            format="\n%(levelname)s: %(message)s", level=requested_level.upper()
        )
    except ValueError:
        logging.critical(
            '"%s" is not a valid logging level. Possible values are debug, info, warning, and error.',
            requested_level,
        )
        sys.exit(1)

    else:
        # Logging is ready, so open the API connection.
        try:
            api = connect_api(args["API_KEY"], args["SERVER"])
            logging.debug("Connected to: %s", args["SERVER"])
        except Exception as e:
            logging.critical(e.args[0])
            sys.exit(1)

    if not validate_assessment_id(assessment_id):
        logging.critical(
            '"%s" is an invalid assessment_id format. Assessment identifiers begin with RV and are followed by '
            " a 4 or 5 digit numerical sequence. Examples: RV1234, RV12345",
            assessment_id,
        )
        sys.exit(1)

    if not assessment_exists(api, assessment_id):
        logging.error('Assessment "%s" does not exist in Gophish.', assessment_id)
        sys.exit(1)
    else:
        # Targets are gathered first, then the campaigns.
        assessment_dict: Dict = {
            "targets": export_targets(api, assessment_id),
            "campaigns": export_campaigns(api, assessment_id),
        }

        with open(f"data_{assessment_id}.json", "w") as fp:
            json.dump(assessment_dict, fp, indent=4)

        logging.info("Data written to data_%s.json", assessment_id)

        export_user_reports(api, assessment_id)
        write_campaign_summary(api, assessment_id)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc