• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / gophish-tools / 4759283117

pending completion
4759283117

push

github

GitHub
Merge pull request #123 from cisagov/lineage/skeleton

141 of 473 branches covered (29.81%)

Branch coverage included in aggregate %.

9 of 24 new or added lines in 10 files covered. (37.5%)

223 existing lines in 5 files now uncovered.

298 of 1270 relevant lines covered (23.46%)

1.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/tools/gophish_import.py
1
"""Import an assessment JSON file into Gophish.
2

3
Usage:
4
  gophish-import [--log-level=LEVEL] [--reschedule] ASSESSMENT_FILE SERVER API_KEY
5
  gophish-import (-h | --help)
6
  gophish-import --version
7

8
Options:
9
  API_KEY                   Gophish API key.
10
  ASSESSMENT_FILE           Name of the JSON file containing assessment data.
11
  SERVER                    Full URL to Gophish server.
12
  -r --reschedule           Adjust the current schedule of an assessment with the new schedule in the ASSESSMENT_FILE.
13
  -h --help                 Show this screen.
14
  --version                 Show version.
15
  -l --log-level=LEVEL      If specified, then the log level will be set to
16
                            the specified value.  Valid values are "debug", "info",
17
                            "warning", "error", and "critical". [default: info]
18
"""
19

20
# Standard Python Libraries
21
import json
×
22
import logging
×
23
import sys
×
24
from typing import Dict
×
25

26
# Third-Party Libraries
27
from docopt import docopt
×
28

29
# No type stubs exist for gophish, so we add "type: ignore" to tell mypy to
30
# ignore this library
NEW
31
from gophish.models import (  # type: ignore
×
32
    SMTP,
33
    Campaign,
34
    Error,
35
    Group,
36
    Page,
37
    Template,
38
    User,
39
)
NEW
40
import urllib3
×
41

42
# cisagov Libraries
UNCOV
43
from tools.connect import connect_api
×
44

UNCOV
45
from ._version import __version__
×
46

47
# Disable "Insecure Request" warning: Gophish uses a self-signed certificate
48
# as default for https connections, which cannot be verified by a third
49
# party; thus, an SSL insecure request warning is produced.
NEW
50
urllib3.disable_warnings()
×
51

52

UNCOV
53
def load_landings(api, assessment):
    """Load the landing pages of an assessment into Gophish.

    Every entry of ``assessment["pages"]`` is posted to Gophish as a new
    landing page. When Gophish answers that the page name is already in
    use, the existing page(s) with that name are deleted and the post is
    retried.

    Args:
        api: Gophish API client; ``api.pages`` must offer ``post``, ``get``
            and ``delete``.
        assessment: Assessment data holding a ``"pages"`` list of page
            dictionaries.

    Returns:
        The ``assessment["pages"]`` list, with each entry's ``"id"`` set to
        the id of the page object returned by the post.

    Raises:
        Error: If Gophish rejects a page for a reason other than a name
            collision, or reports a collision although no existing page
            with that name could be found to delete.
    """
    pages = assessment["pages"]

    for page in pages:
        new_page = Page()
        new_page.name = page["name"]
        new_page.capture_credentials = page["capture_credentials"]
        new_page.capture_passwords = page["capture_passwords"]
        new_page.html = page["html"]
        # The redirect URL is optional: skip it when missing or empty.
        redirect_url = page.get("redirect_url")
        if redirect_url:
            new_page.redirect_url = redirect_url

        # Debug page information
        logging.debug("Page Name: %s", new_page.name)
        logging.debug("Redirect URL: %s", new_page.redirect_url)

        # Catches when a page has already been loaded into Gophish.
        # Finds the current Gophish page ID so it can be deleted
        # prior to re-loading the new page.
        while True:
            try:
                new_page = api.pages.post(new_page)
                break
            except Error as e:
                if e.message != "Page name already in use":
                    logging.error("%s\n", e)
                    raise

                logging.warning("%s. Finding previously loaded page to delete.", e)
                deleted = False
                for old_page in api.pages.get():
                    if old_page.name == new_page.name:
                        logging.debug("Deleting Page with ID %d", old_page.id)
                        api.pages.delete(old_page.id)
                        logging.info("Re-Loading new page.")
                        deleted = True

                # Without a deletion the retry would fail the same way
                # forever, so give up instead of looping endlessly.
                if not deleted:
                    logging.error(
                        "No existing page named %s was found to delete.",
                        new_page.name,
                    )
                    raise

        # Record the landing page ID on the assessment entry
        logging.info("Landing Page %s loaded.", new_page.name)
        page["id"] = new_page.id

    return pages
×
98

99

UNCOV
100
def load_groups(api, assessment):
    """Load the target groups of an assessment into Gophish.

    Every entry of ``assessment["groups"]`` is converted into a Gophish
    group with one user per target and posted. When Gophish answers that
    the group name is already in use, the existing group(s) with that name
    are deleted and the post is retried.

    Args:
        api: Gophish API client; ``api.groups`` must offer ``post``, ``get``
            and ``delete``.
        assessment: Assessment data holding a ``"groups"`` list of group
            dictionaries, each with a ``"name"`` and a ``"targets"`` list.

    Returns:
        The ``assessment["groups"]`` list, with each entry's ``"id"`` set to
        the id of the group object returned by the post.

    Raises:
        Error: If Gophish rejects a group for a reason other than a name
            collision, or reports a collision although no existing group
            with that name could be found to delete.
    """
    groups = assessment["groups"]

    for group in groups:
        logging.info("Loading Group %s", group["name"])

        new_group = Group()
        new_group.name = group["name"]

        for tgt in group["targets"]:
            target = User()
            target.first_name = tgt["first_name"]
            target.last_name = tgt["last_name"]
            target.email = tgt["email"]
            # The position is optional: skip it when missing or empty.
            position = tgt.get("position")
            if position:
                target.position = position
            new_group.targets.append(target)

        # Catches when a Group has already been loaded into Gophish.
        # Finds the current Gophish group ID so it can be deleted
        # prior to re-loading the new group.
        while True:
            try:
                new_group = api.groups.post(new_group)
                break
            except Error as e:
                if e.message != "Group name already in use":
                    logging.exception(
                        "Exception encountered when loading group in Gophish (%s)", e
                    )
                    raise

                logging.warning("%s. Finding previously loaded group to delete.", e)
                # Kept in its own name so the assessment's ``groups`` list,
                # which is this function's return value, is not overwritten.
                existing_groups = api.groups.get()
                logging.debug(
                    "Checking %d for previously imported group to get ID",
                    len(existing_groups),
                )
                deleted = False
                for old_group in existing_groups:
                    if old_group.name == new_group.name:
                        logging.debug("Deleting Group with ID %d", old_group.id)
                        api.groups.delete(old_group.id)
                        logging.info("Re-Loading new group.")
                        deleted = True

                # Without a deletion the retry would fail the same way
                # forever, so give up instead of looping endlessly.
                if not deleted:
                    logging.error(
                        "No existing group named %s was found to delete.",
                        new_group.name,
                    )
                    raise

        group["id"] = new_group.id

        logging.info("Group Ready: %s", new_group.name)

    return groups
×
152

153

UNCOV
154
def build_campaigns(api, assessment):
    """Build every campaign of an assessment in Gophish.

    For each entry of ``assessment["campaigns"]`` this posts the email
    template and the SMTP sending profile, deletes any existing campaign
    with the same name, and then posts the campaign itself. When Gophish
    answers that a template or SMTP name is already in use, the existing
    object(s) with that name are deleted and the post is retried.

    Args:
        api: Gophish API client; ``api.templates``, ``api.smtp`` and
            ``api.campaigns`` must offer ``post``, ``get`` and ``delete``.
        assessment: Assessment data holding a ``"campaigns"`` list of
            campaign dictionaries.

    Returns:
        None.

    Raises:
        Error: If Gophish rejects a template or SMTP profile for a reason
            other than a name collision, or reports a collision although no
            existing object with that name could be found to delete.
        Exception: Any exception raised while posting the campaign is
            logged and re-raised unchanged.
    """
    logging.info("Building Campaigns.")
    for campaign in assessment["campaigns"]:
        logging.info("Building Campaign: %s", campaign["name"])

        # Build Template object
        template_data = campaign["template"]
        new_template = Template(
            name=template_data["name"],
            subject=template_data["subject"],
            html=template_data["html"],
            text=template_data["text"],
        )

        # Catches when Template has already been loaded into Gophish.
        # Finds the current Gophish template ID so it can be deleted
        # prior to re-loading the new template.
        while True:
            try:
                new_template = api.templates.post(new_template)
                break
            except Error as e:
                if e.message != "Template name already in use":
                    logging.exception(
                        "Exception encountered when loading template in Gophish (%s)",
                        e.message,
                    )
                    raise

                logging.warning(
                    "%s. Finding previously loaded template to delete.", e.message
                )
                templates = api.templates.get()
                logging.debug(
                    "Checking %d for previously imported template to get ID",
                    len(templates),
                )
                deleted = False
                for old_template in templates:
                    if old_template.name == new_template.name:
                        logging.debug(
                            "Deleting Template with ID %d", old_template.id
                        )
                        api.templates.delete(old_template.id)
                        logging.info("Re-Loading new template.")
                        deleted = True

                # Without a deletion the retry would fail the same way
                # forever, so give up instead of looping endlessly.
                if not deleted:
                    logging.error(
                        "No existing template named %s was found to delete.",
                        new_template.name,
                    )
                    raise

        # Build SMTP Object
        smtp_data = campaign["smtp"]
        new_smtp = SMTP(
            name=smtp_data["name"],
            host=smtp_data["host"],
            from_address=smtp_data["from_address"],
            interface_type="SMTP",
            ignore_cert_errors=True,
        )
        # Credentials are only applied when both parts are present.
        if "username" in smtp_data and "password" in smtp_data:
            new_smtp.username = smtp_data["username"]
            new_smtp.password = smtp_data["password"]

        # Same delete-and-retry handling as for the template above.
        while True:
            try:
                new_smtp = api.smtp.post(new_smtp)
                break
            except Error as e:
                if e.message != "SMTP name already in use":
                    logging.exception(
                        "Exception encountered when loading SMTP in Gophish (%s)",
                        e.message,
                    )
                    raise

                logging.warning("%s. Finding previously loaded smtp to delete.", e)
                smtps = api.smtp.get()
                logging.debug(
                    "Checking %d for previously imported smtp profiles to get ID",
                    len(smtps),
                )
                deleted = False
                for old_smtp in smtps:
                    if old_smtp.name == new_smtp.name:
                        logging.debug("Deleting SMTP with ID %d", old_smtp.id)
                        api.smtp.delete(old_smtp.id)
                        logging.info("Re-Loading new SMTP.")
                        deleted = True

                # Without a deletion the retry would fail the same way
                # forever, so give up instead of looping endlessly.
                if not deleted:
                    logging.error(
                        "No existing SMTP profile named %s was found to delete.",
                        new_smtp.name,
                    )
                    raise

        # Check to remove any campaigns with the same name
        for old_campaign in api.campaigns.get():
            if old_campaign.name == campaign["name"]:
                logging.warning(
                    "Previous Campaign found with name %s.", campaign["name"]
                )
                logging.warning(
                    "Previous Campaign with id %d being deleted.", old_campaign.id
                )
                api.campaigns.delete(old_campaign.id)

        # Loads the campaign
        try:
            api.campaigns.post(
                Campaign(
                    name=campaign["name"],
                    groups=[Group(name=campaign["group_name"])],
                    page=Page(name=campaign["page_name"]),
                    template=new_template,
                    smtp=new_smtp,
                    url=campaign["url"],
                    launch_date=campaign["launch_date"],
                    completed_date=campaign["complete_date"],
                )
            )
        except Exception as e:
            # Log the exception itself: arbitrary exceptions have no
            # ``message`` attribute, and reading it here would raise
            # AttributeError and hide the original failure.
            logging.exception(
                "Exception encountered when loading campaign in Gophish (%s)", e
            )
            raise

        logging.info("Campaign %s successfully loaded.", campaign["name"])
×
273

274

UNCOV
275
def main() -> None:
    """Set up logging, connect to API, import all assessment data.

    Parses the command line with docopt, configures logging at the
    requested level, connects to the Gophish server, reads the assessment
    JSON file, and then loads its landing pages, groups and campaigns.

    Exits with status 1 when the log level is invalid, the connection
    fails, the assessment file cannot be found, opened or parsed, or any
    step of the import raises an exception.
    """
    # NOTE(review): the --reschedule option listed in the usage text is not
    # read anywhere in this function -- confirm whether that is intended.
    args: Dict[str, str] = docopt(__doc__, version=__version__)

    # Set up logging
    log_level = args["--log-level"]
    try:
        logging.basicConfig(
            format="%(asctime)-15s %(levelname)s: %(message)s", level=log_level.upper()
        )
    except ValueError:
        logging.critical(
            '"%s" is not a valid logging level.  Possible values are debug, info, warning, error, and critical.',
            log_level,
        )
        sys.exit(1)

    try:
        api = connect_api(args["API_KEY"], args["SERVER"])
        logging.debug("Connected to: %s", args["SERVER"])
    except Exception as e:
        # Log the exception itself rather than e.args[0], which raises
        # IndexError for exceptions created without arguments.
        logging.critical("%s", e)
        # Stop logging and clean up
        logging.shutdown()
        sys.exit(1)

    # Load assessment JSON from file
    try:
        with open(args["ASSESSMENT_FILE"]) as json_file:
            assessment = json.load(json_file)
    except FileNotFoundError as e:
        logging.exception("Unable to locate Assessment file (%s)", e)
        # Stop logging and clean up
        logging.shutdown()
        sys.exit(1)
    except PermissionError as e:
        logging.exception("Permission denied for opening Assessment file (%s)", e)
        # Stop logging and clean up
        logging.shutdown()
        sys.exit(1)
    except json.JSONDecodeError as e:
        # Malformed JSON is reported like the other file problems instead
        # of escaping as an unhandled traceback.
        logging.exception("Assessment file does not contain valid JSON (%s)", e)
        # Stop logging and clean up
        logging.shutdown()
        sys.exit(1)

    try:
        # Load Landing page
        assessment["pages"] = load_landings(api, assessment)

        # Load Groups into Gophish; each group entry gets its Gophish id
        assessment["groups"] = load_groups(api, assessment)

        # Load Campaigns
        build_campaigns(api, assessment)

        # Stop logging and clean up
        logging.shutdown()

    except Exception as e:
        logging.exception(
            "Exception encountered while loading data from Gophish (%s: %s)", type(e), e
        )
        logging.critical("Closing with an error. Assessment not successfully loaded.")
        # Stop logging and clean up
        logging.shutdown()
        sys.exit(1)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc