• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / gophish-tools / 4812977103

pending completion
4812977103

Pull #134

github

GitHub
Merge be94355f4 into d859c2be0
Pull Request #134: Lineage pull request for: skeleton

141 of 473 branches covered (29.81%)

Branch coverage included in aggregate %.

298 of 1270 relevant lines covered (23.46%)

1.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

22.56
/src/tools/gophish_export.py
1
"""Export all the data from an assessment within Gophish into a single JSON file.
2

3
Usage:
4
  gophish-export [--log-level=LEVEL] ASSESSMENT_ID SERVER API_KEY
5
  gophish-export (-h | --help)
6
  gophish-export --version
7

8
Options:
9
  API_KEY                   Gophish API key.
10
  ASSESSMENT_ID             ID of the assessment to export data from.
11
  SERVER                    Full URL to Gophish server.
12
  -h --help                 Show this screen.
13
  --version                 Show version.
14
  -l --log-level=LEVEL      If specified, then the log level will be set to
15
                            the specified value.  Valid values are "debug", "info",
16
                            "warning", "error", and "critical". [default: info]
17
"""
18

19
# Standard Python Libraries
20
from datetime import datetime
6✔
21
import hashlib
6✔
22
import json
6✔
23
import logging
6✔
24
import re
6✔
25
import sys
6✔
26
from typing import Dict
6✔
27

28
# Third-Party Libraries
29
from docopt import docopt
6✔
30

31
# No type stubs exist for httpagentparser, so we add "type: ignore" to tell
32
# mypy to ignore this library
33
import httpagentparser  # type: ignore
6✔
34
import urllib3
6✔
35

36
# cisagov Libraries
37
from tools.connect import connect_api
6✔
38
from util.validate import validate_assessment_id
6✔
39

40
from ._version import __version__
6✔
41

42
# Disable "Insecure Request" warning: Gophish uses a self-signed certificate
43
# as default for https connections, which cannot be verified by a third
44
# party; thus, an SSL insecure request warning is produced.
45
# NOTE(review): no warning category is passed here, so this presumably
# silences every urllib3 warning rather than only the insecure-request one
# - confirm that is intended.
urllib3.disable_warnings()
6✔
46

47

48
def assessment_exists(api, assessment_id):
    """Check if Gophish has at least one campaign for designated assessment.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        assessment_id (string): Assessment identifier to get campaigns from.

    Returns:
        boolean: Indicates if a campaign is found starting with assessment_id.
    """
    # any() short-circuits on the first campaign whose name matches.
    return any(
        campaign.name.startswith(assessment_id) for campaign in api.campaigns.get()
    )
64

65

66
def export_targets(api, assessment_id):
    """Collect the targets of every group that belongs to an assessment.

    The IDs of all groups whose name starts with the assessment id are looked
    up, and each of those groups is fetched to read its "targets" list. Every
    raw target becomes a dict holding the sha256 hex digest of the target's
    email as "id" and a "customer_defined_labels" dict that maps the
    assessment id to the target's position when a position is present.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        assessment_id (string): Assessment identifier to get campaigns from.

    Returns:
        List of targets from the assessment's group(s).
    """
    collected = []

    for gid in get_group_ids(api, assessment_id):
        # Each group is fetched individually to obtain its member list.
        members = api.groups.get(gid).as_dict()["targets"]

        for member in members:
            # Only the digest of the email address is exported.
            digest = hashlib.sha256(member["email"].encode("utf-8")).hexdigest()

            labels = {}
            if "position" in member:
                labels[assessment_id] = [member["position"]]

            collected.append({"id": digest, "customer_defined_labels": labels})

    logging.info(
        "%d email targets found for assessment %s.", len(collected), assessment_id
    )

    return collected
109

110

111
def get_group_ids(api, assessment_id):
    """Return a list of group IDs for all groups starting with specified assessment_id.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        assessment_id (string): Assessment identifier to match group names against.

    Returns:
        List of the "id" values of every group whose name starts with
        assessment_id, in the order the API returned the groups.
    """
    # Convert each group to a dict once, then keep the IDs of matching groups.
    groups = (group.as_dict() for group in api.groups.get())
    return [
        group["id"] for group in groups if group["name"].startswith(assessment_id)
    ]
122

123

124
def export_campaigns(api, assessment_id):
    """Gather the exported data of every campaign in an assessment.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        assessment_id (string): Assessment identifier to get campaigns from.

    Returns:
        List of the assessment's campaigns with data.
    """
    exported = [
        get_campaign_data(api, campaign_id)
        for campaign_id in get_campaign_ids(api, assessment_id)
    ]

    logging.info("%d campaigns found for assessment %s.", len(exported), assessment_id)

    return exported
143

144

145
def get_campaign_ids(api, assessment_id):
    """Return a list of campaign IDs for all campaigns starting with specified assessment_id.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        assessment_id (string): Assessment identifier to match campaign names against.

    Returns:
        List of the "id" values of every campaign whose name starts with
        assessment_id, in the order the API returned the campaigns.
    """
    # Convert each campaign to a dict once, then keep the IDs of matching ones.
    campaigns = (campaign.as_dict() for campaign in api.campaigns.get())
    return [
        campaign["id"]
        for campaign in campaigns
        if campaign["name"].startswith(assessment_id)
    ]
156

157

158
def get_campaign_data(api, campaign_id):
    """Build the export record for a single campaign.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        campaign_id: Identifier of the campaign to fetch from Gophish.

    Returns:
        Dict with the campaign's name (as "id"), start and end times, URL,
        template subject, template identifier, click list, and e-mail send
        status list.
    """
    # Pulls the campaign data as dict from Gophish.
    raw: dict = api.campaigns.get(campaign_id).as_dict()

    return {
        "id": raw["name"],
        "start_time": raw["launch_date"],
        "end_time": raw["completed_date"],
        "url": raw["url"],
        "subject": raw["template"]["subject"],
        # The template ID is the third dash-separated field of the Gophish
        # template name.
        "template": api.templates.get(raw["template"]["id"])
        .as_dict()["name"]
        .split("-")[2],
        "clicks": get_click_data(api, campaign_id),
        # E-mail send status as reported by Gophish.
        "status": get_email_status(api, campaign_id),
    }
184

185

186
def get_click_data(api, campaign_id):
    """Return a list of all clicks for a given campaign.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        campaign_id: Identifier of the campaign whose timeline is read.

    Returns:
        List with one dict per "Clicked Link" timeline event, holding the
        sha256 hex digest of the clicking user's email, the source IP, the
        event time, and the application details.
    """
    timeline = api.campaigns.get(campaign_id).as_dict()["timeline"]

    # One click document per "Clicked Link" event; other events are ignored.
    return [
        {
            "user": hashlib.sha256(event["email"].encode("utf-8")).hexdigest(),
            "source_ip": event["details"]["browser"]["address"],
            "time": event["time"],
            "application": get_application(event),
        }
        for event in timeline
        if event["message"] == "Clicked Link"
    ]
208

209

210
def get_email_status(api, campaign_id):
    """Return the email send status and time for each send event of a campaign.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        campaign_id: Identifier of the campaign whose timeline is read.

    Returns:
        List with one dict per "Email Sent" or "Error Sending Email" timeline
        event. Each dict holds "user" (sha256 hex digest of the recipient's
        email), "time" (timestamp string), and "status" ("SUCCESS" or
        "Failed"). All values are strings so the result can be written out
        with json.dump.
    """
    timestamp_format = "%Y-%m-%dT%H:%M:%S"
    timeline = api.campaigns.get(campaign_id).as_dict()["timeline"]
    status = []

    for event in timeline:
        message = event["message"]
        if message not in ("Email Sent", "Error Sending Email"):
            continue

        email = {"user": hashlib.sha256(event["email"].encode("utf-8")).hexdigest()}

        if message == "Email Sent":
            email["time"] = event["time"]
            email["status"] = "SUCCESS"
        else:
            # Trim fractional seconds by parsing the leading part of the
            # timestamp, then format it back to a string: a datetime object
            # here would make the JSON export fail. The timeline event itself
            # is left unmodified.
            email["time"] = datetime.strptime(
                event["time"].split(".")[0], timestamp_format
            ).strftime(timestamp_format)
            email["status"] = "Failed"

        status.append(email)

    return status
243

244

245
def get_application(rawEvent):
    """Return application details for a timeline event.

    Args:
        rawEvent (dict): Timeline event whose ["details"]["browser"] entry
            carries the "address" and "user-agent" values.

    Returns:
        Dict with "external_ip" (the browser address) plus the "name" and
        "version" of the platform detected from the user agent string.
    """
    browser = rawEvent["details"]["browser"]
    external_ip = browser["address"]

    # Parse the user agent string once and reuse the result for both fields.
    platform = httpagentparser.detect(browser["user-agent"])["platform"]

    return {
        "external_ip": external_ip,
        "name": platform["name"],
        "version": platform["version"],
    }
257

258

259
def find_unique_target_clicks_count(clicks):
    """Return the number of unique clicks in a click set.

    Args:
        clicks: Iterable of click dicts, each with a "user" key.

    Returns:
        Number of distinct "user" values among the clicks.
    """
    return len({click["user"] for click in clicks})
265

266

267
def write_campaign_summary(api, assessment_id):
    """Write a campaign summary report to a text file and a JSON file.

    The JSON skeleton is loaded from "campaign_data.json" in the current
    working directory and filled in per campaign level. Campaigns whose name
    does not end in "_level-N" (N from 1 to 6) are logged and skipped.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        assessment_id (string): Assessment identifier to summarize campaigns for.
    """
    campaign_ids = get_campaign_ids(api, assessment_id)
    campaign_data_template = "campaign_data.json"
    campaign_summary_json = f"{assessment_id}_campaign_data.json"
    campaign_summary_textfile = f"{assessment_id}_summary_{datetime.strftime(datetime.now(), '%Y-%m-%dT%H:%M:%S')}.txt"

    with open(campaign_data_template) as template:
        campaign_data = json.load(template)

    logging.info("Writing campaign summary report to %s", campaign_summary_textfile)
    regex = re.compile(r"^.*_(?P<level>level-[1-6])$")

    # The context manager closes the report even if a campaign fails mid-loop.
    with open(campaign_summary_textfile, "w+") as file_out:
        file_out.write("Campaigns for Assessment: " + assessment_id)

        for campaign_id in campaign_ids:
            campaign = api.campaigns.get(campaign_id)
            match = regex.fullmatch(campaign.name)
            if not match:
                logging.warning(
                    "Encountered campaign (%s) that is unable to be processed for campaign summary export. \n"
                    "Campaign name is not properly suffixed with the campaign level number (e.g. '_level-1')\n"
                    "Skipping campaign",
                    campaign.name,
                )
                continue

            level = match.group("level")
            logging.info(level)
            clicks = get_click_data(api, campaign_id)

            total_clicks = api.campaigns.summary(campaign_id=campaign_id).stats.clicked
            unique_clicks = find_unique_target_clicks_count(clicks)
            # NOTE(review): this is unique clicking users divided by the
            # clicked count from the campaign summary - confirm that this is
            # the intended meaning of "percent_clicks".
            if total_clicks > 0:
                percent_clicks = unique_clicks / float(total_clicks)
            else:
                percent_clicks = 0.0

            # The template is expected to already contain an entry per level.
            summary = campaign_data[level]
            summary["subject"] = campaign.template.subject
            summary["sender"] = campaign.smtp.from_address
            summary["start_date"] = campaign.launch_date
            summary["end_date"] = campaign.completed_date
            summary["redirect"] = campaign.url
            summary["clicks"] = total_clicks
            summary["unique_clicks"] = unique_clicks
            summary["percent_clicks"] = percent_clicks

            file_out.write("\n")
            file_out.write("-" * 50)
            file_out.write("\nCampaign: %s" % campaign.name)
            file_out.write("\nSubject: %s" % summary["subject"])
            file_out.write("\nSender: %s" % summary["sender"])
            file_out.write("\nStart Date: %s" % summary["start_date"])
            file_out.write("\nEnd Date: %s" % summary["end_date"])
            file_out.write("\nRedirect: %s" % summary["redirect"])
            file_out.write("\nClicks: %d" % summary["clicks"])
            file_out.write("\nUnique Clicks: %d" % summary["unique_clicks"])
            file_out.write("\nPercentage Clicks: %f" % summary["percent_clicks"])

    logging.info("Writing out summary JSON to %s", campaign_summary_json)
    with open(campaign_summary_json, "w") as fp:
        json.dump(campaign_data, fp, indent=4)
332

333

334
def export_user_reports(api, assessment_id):
    """Build and export a user_report JSON file for each campaign in an assessment.

    Args:
        api (Gophish API): Connection to Gophish server via the API.
        assessment_id (string): Assessment identifier to export reports for.
    """
    timestamp_format = "%Y-%m-%dT%H:%M:%S"

    for campaign_id in get_campaign_ids(api, assessment_id):
        campaign = get_campaign_data(api, campaign_id)

        # Earliest click time, with fractional seconds trimmed before parsing;
        # None when the campaign has no clicks.
        earliest_click = min(
            (
                datetime.strptime(click["time"].split(".")[0], timestamp_format)
                for click in campaign["clicks"]
            ),
            default=None,
        )

        if earliest_click is None:
            first_report = "No clicks reported"
        else:
            first_report = earliest_click.strftime(timestamp_format)

        report = {
            # The "customer" field is a placeholder added for operator
            # convenience when working with the JSON file created.
            "customer": "",
            "assessment": assessment_id,
            # get_campaign_ids() returns integers, but the "campaign" field
            # expects a string.
            "campaign": str(campaign_id),
            "first_report": first_report,
            "total_num_reports": api.campaigns.summary(
                campaign_id=campaign_id
            ).stats.clicked,
        }

        logging.info(
            "Writing out user report for campaign %s in assessment %s",
            campaign["id"],
            assessment_id,
        )

        with open(f"{assessment_id}_{campaign_id}_user_report_doc.json", "w") as fp:
            json.dump(report, fp, indent=4)
377

378

379
def main() -> None:
    """Set up logging, connect to API, export all assessment data.

    Exits with status 1 when the log level is invalid, the API connection
    fails, the assessment ID is malformed, or no campaign exists for the
    assessment.
    """
    args: Dict[str, str] = docopt(__doc__, version=__version__)

    # Set up logging
    log_level = args["--log-level"]
    try:
        logging.basicConfig(
            format="\n%(levelname)s: %(message)s", level=log_level.upper()
        )
    except ValueError:
        logging.critical(
            '"%s" is not a valid logging level. Possible values are debug, info, warning, error, and critical.',
            log_level,
        )
        sys.exit(1)

    # Connect to API
    try:
        api = connect_api(args["API_KEY"], args["SERVER"])
        logging.debug("Connected to: %s", args["SERVER"])
    except Exception as e:
        # Fall back to the exception itself when it carries no arguments, so
        # reporting the failure cannot raise IndexError.
        logging.critical(e.args[0] if e.args else e)
        sys.exit(1)

    assessment_id = args["ASSESSMENT_ID"]

    if not validate_assessment_id(assessment_id):
        logging.critical(
            '"%s" is an invalid assessment_id format. Assessment identifiers begin with RV and are followed by '
            " a 4 or 5 digit numerical sequence. Examples: RV1234, RV12345",
            assessment_id,
        )
        sys.exit(1)

    if not assessment_exists(api, assessment_id):
        logging.error('Assessment "%s" does not exist in Gophish.', assessment_id)
        sys.exit(1)

    assessment_dict: Dict = dict()

    # Add targets list to assessment dict.
    assessment_dict["targets"] = export_targets(api, assessment_id)

    # Add campaigns list to the assessment dict.
    assessment_dict["campaigns"] = export_campaigns(api, assessment_id)

    with open(f"data_{assessment_id}.json", "w") as fp:
        json.dump(assessment_dict, fp, indent=4)

    logging.info("Data written to data_%s.json", assessment_id)

    export_user_reports(api, assessment_id)
    write_campaign_summary(api, assessment_id)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc