• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / pshtt / 4930561294

pending completion
4930561294

push

github

Jeremy Frasier
Minor whitespace change to make the Black linter happy

58 of 287 branches covered (20.21%)

Branch coverage included in aggregate %.

0 of 1 new or added line in 1 file covered. (0.0%)

1 existing line in 1 file now uncovered.

358 of 865 relevant lines covered (41.39%)

0.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

29.62
/src/pshtt/pshtt.py
1
"""Provide the core functionality of the pshtt library."""
2

3
# Standard Python Libraries
4
import base64
2✔
5
import codecs
2✔
6
import json
2✔
7
import logging
2✔
8
import os
2✔
9
import re
2✔
10
import sys
2✔
11
from urllib import parse as urlparse
2✔
12

13
# Third-Party Libraries
14
import OpenSSL
2✔
15
from publicsuffixlist.compat import PublicSuffixList  # type: ignore
2✔
16
from publicsuffixlist.update import updatePSL  # type: ignore
2✔
17
import requests
2✔
18

19
# Unable to find type stubs for the sslyze package.
20
import sslyze  # type: ignore
2✔
21
from sslyze.server_connectivity_tester import (  # type: ignore
2✔
22
    ServerConnectivityError,
23
    ServerConnectivityTester,
24
)
25
import sslyze.synchronous_scanner  # type: ignore
2✔
26
import urllib3
2✔
27

28
from . import utils
2✔
29
from .models import Domain, Endpoint
2✔
30

31
# We're going to be making requests with certificate validation
32
# disabled.  Commented next line due to pylint warning that urllib3 is
33
# not in requests.packages
34
# requests.packages.urllib3.disable_warnings()
35
urllib3.disable_warnings()
2✔
36

37
# Default, overrideable via --user-agent
38
USER_AGENT = "pshtt, https scanning"
2✔
39

40
# Defaults to 5 seconds, overridable via --timeout
41
TIMEOUT = 5
2✔
42

43
# The fields we're collecting, will be keys in JSON and
44
# column headers in CSV.
45
HEADERS = [
2✔
46
    "Domain",
47
    "Base Domain",
48
    "Canonical URL",
49
    "Live",
50
    "HTTPS Live",
51
    "HTTPS Full Connection",
52
    "HTTPS Client Auth Required",
53
    "Redirect",
54
    "Redirect To",
55
    "Valid HTTPS",
56
    "HTTPS Publicly Trusted",
57
    "HTTPS Custom Truststore Trusted",
58
    "Defaults to HTTPS",
59
    "Downgrades HTTPS",
60
    "Strictly Forces HTTPS",
61
    "HTTPS Bad Chain",
62
    "HTTPS Bad Hostname",
63
    "HTTPS Expired Cert",
64
    "HTTPS Self Signed Cert",
65
    "HSTS",
66
    "HSTS Header",
67
    "HSTS Max Age",
68
    "HSTS Entire Domain",
69
    "HSTS Preload Ready",
70
    "HSTS Preload Pending",
71
    "HSTS Preloaded",
72
    "Base Domain HSTS Preloaded",
73
    "Domain Supports HTTPS",
74
    "Domain Enforces HTTPS",
75
    "Domain Uses Strong HSTS",
76
    "IP",
77
    "Server Header",
78
    "Server Version",
79
    "HTTPS Cert Chain Length",
80
    "HTTPS Probably Missing Intermediate Cert",
81
    "Notes",
82
    "Unknown Error",
83
]
84

85
# Used for caching the HSTS preload list from Chromium's source.
86
CACHE_PRELOAD_LIST_DEFAULT = "preloaded.json"
2✔
87
PRELOAD_LIST = None
2✔
88

89
# Used for caching the HSTS pending preload list from hstspreload.org.
90
CACHE_PRELOAD_PENDING_DEFAULT = "preload-pending.json"
2✔
91
PRELOAD_PENDING = None
2✔
92

93
# Used for determining base domain via Mozilla's public suffix list.
94
CACHE_SUFFIX_LIST_DEFAULT = "public-suffix-list.txt"
2✔
95
SUFFIX_LIST = None
2✔
96

97
# Directory to cache all third party responses, if set by user.
98
THIRD_PARTIES_CACHE = None
2✔
99

100
# Set if user wants to use a custom CA bundle
101
CA_FILE = None
2✔
102
STORE = "Mozilla"
2✔
103
PT_INT_CA_FILE = None
2✔
104

105

106
def inspect(base_domain):
    """Inspect the provided domain.

    Creates a Domain for ``base_domain`` with four endpoints (http and
    https, each at the root and www hostnames), runs basic_check() on
    all four, runs hsts_check() on the two HTTPS endpoints, and returns
    whatever result_for() produces for the populated domain.
    """
    domain = Domain(base_domain)

    # (attribute name on the Domain, protocol, host variant)
    endpoint_specs = (
        ("http", "http", "root"),
        ("httpwww", "http", "www"),
        ("https", "https", "root"),
        ("httpswww", "https", "www"),
    )
    for attribute, protocol, host in endpoint_specs:
        setattr(domain, attribute, Endpoint(protocol, host, base_domain))

    # Analyze HTTP endpoint responsiveness and behavior.
    for attribute, _, _ in endpoint_specs:
        basic_check(getattr(domain, attribute))

    # Analyze HSTS header, if present, on each HTTPS endpoint.
    for attribute in ("https", "httpswww"):
        hsts_check(getattr(domain, attribute))

    return result_for(domain)
×
125

126

127
def result_for(domain):
    """Get the results for the provided domain.

    Picks the canonical endpoint for the domain, evaluates every
    per-domain judgment into a dictionary keyed by the names in HEADERS,
    attaches the raw endpoint data under "endpoints", and finally
    normalises selected None values to False.
    """
    # Because it will inform many other judgments, first identify
    # an acceptable "canonical" URL for the domain.
    domain.canonical = canonical_endpoint(
        domain.http, domain.httpwww, domain.https, domain.httpswww
    )

    # First, the basic fields the CSV will use.
    result = {
        "Domain": domain.domain,
        "Base Domain": parent_domain_for(domain.domain),
        "Canonical URL": domain.canonical.url,
        "Live": is_live(domain),
        "Redirect": is_redirect_domain(domain),
        "Redirect To": redirects_to(domain),
        "HTTPS Live": is_https_live(domain),
        "HTTPS Full Connection": is_full_connection(domain),
        "HTTPS Client Auth Required": is_client_auth_required(domain),
        "Valid HTTPS": is_valid_https(domain),
        "HTTPS Publicly Trusted": is_publicly_trusted(domain),
        "HTTPS Custom Truststore Trusted": is_custom_trusted(domain),
        "Defaults to HTTPS": is_defaults_to_https(domain),
        "Downgrades HTTPS": is_downgrades_https(domain),
        "Strictly Forces HTTPS": is_strictly_forces_https(domain),
        "HTTPS Bad Chain": is_bad_chain(domain),
        "HTTPS Bad Hostname": is_bad_hostname(domain),
        "HTTPS Expired Cert": is_expired_cert(domain),
        "HTTPS Self Signed Cert": is_self_signed_cert(domain),
        "HTTPS Cert Chain Length": cert_chain_length(domain),
        "HTTPS Probably Missing Intermediate Cert": is_missing_intermediate_cert(
            domain
        ),
        "HSTS": is_hsts(domain),
        "HSTS Header": hsts_header(domain),
        "HSTS Max Age": hsts_max_age(domain),
        "HSTS Entire Domain": is_hsts_entire_domain(domain),
        "HSTS Preload Ready": is_hsts_preload_ready(domain),
        "HSTS Preload Pending": is_hsts_preload_pending(domain),
        "HSTS Preloaded": is_hsts_preloaded(domain),
        "Base Domain HSTS Preloaded": is_parent_hsts_preloaded(domain),
        "Domain Supports HTTPS": is_domain_supports_https(domain),
        "Domain Enforces HTTPS": is_domain_enforces_https(domain),
        "Domain Uses Strong HSTS": is_domain_strong_hsts(domain),
        "IP": get_domain_ip(domain),
        "Server Header": get_domain_server_header(domain),
        "Server Version": get_domain_server_version(domain),
        "Notes": get_domain_notes(domain),
        "Unknown Error": did_domain_error(domain),
    }

    # But also capture the extended data for those who want it.
    result["endpoints"] = domain.to_object()

    # Normalise None to False, field by field.  The rules below are
    # checked in order and the first one that matches decides what
    # happens to the field.

    # Rule 1: these fields are never modified.
    never_modified = ("HSTS Header", "HSTS Max Age", "Redirect To")

    # Rule 2: when there was no full HTTPS connection these fields are
    # left exactly as computed, so a None stays None.  ("HSTS Header"
    # and "HSTS Max Age" would belong here as well, but rule 1 already
    # covers them.)
    kept_without_full_connection = (
        "HSTS",
        "HSTS Entire Domain",
        "HSTS Preload Ready",
        "Domain Uses Strong HSTS",
    )

    # Rule 3: a None in these fields is preserved.
    none_preserved = (
        "IP",
        "Server Header",
        "Server Version",
        "HTTPS Cert Chain Length",
    )

    # Rule 4: these fields are forced to False when HTTPS is not live,
    # and otherwise left untouched (a None is NOT converted).
    false_unless_https_live = (
        "Valid HTTPS",
        "HTTPS Publicly Trusted",
        "HTTPS Custom Truststore Trusted",
    )

    for header in HEADERS:
        if header in never_modified:
            continue

        if header in kept_without_full_connection and not (
            result["HTTPS Full Connection"]
        ):
            continue

        if header in none_preserved and result[header] is None:
            continue

        if header in false_unless_https_live:
            if not result["HTTPS Live"]:
                result[header] = False
            continue

        # Rule 5: everything else has None replaced by False.
        if result[header] is None:
            result[header] = False

    return result
2✔
237

238

239
def ping(url, allow_redirects=False, verify=True):
    """Attempt to reach the given URL with a streaming GET request.

    When a custom CA file is configured (CA_FILE) and verification is
    requested, the CA file path is passed as ``verify`` in place of the
    boolean, so that the requests module validates HTTPS connections
    against that .pem file.

    The request is made with ``stream=True`` and the response body is
    never read here.  As a result, close() MUST be called on the
    Response object returned by this function; that is the ONLY way
    the connection can be closed and released back into the pool.  The
    simplest way to guarantee this is to use the response in a "with"
    statement.

    If we ever begin reading response bodies, they will need to be
    explicitly read from Response.content, and we will also want to
    use conditional logic to read from response bodies where they
    exist and are useful.  We'll also need to watch for Content-Type
    values like multipart/x-mixed-replace;boundary=ffserver that
    indicate that the response body will stream indefinitely.
    """
    # A truthy CA_FILE replaces a truthy ``verify``; a falsy ``verify``
    # (validation disabled) is always passed through unchanged.
    certificate_check = CA_FILE if (CA_FILE and verify) else verify

    request_options = {
        "allow_redirects": allow_redirects,
        # Validate certificates.
        "verify": certificate_check,
        # Delays the retrieval of the content until Response.content
        # is accessed.  Since we aren't interested in the actual
        # content, this saves time and bandwidth.
        #
        # This also stops pshtt from hanging on URLs that stream
        # neverending data, like webcams.  See issue #138:
        # https://github.com/dhs-ncats/pshtt/issues/138
        "stream": True,
        # set by --user_agent
        "headers": {"User-Agent": USER_AGENT},
        # set by --timeout
        "timeout": TIMEOUT,
    }
    return requests.get(url, **request_options)
285

286

287
def basic_check(endpoint):
    """Test the endpoint.

    At first:
    * Don't follow redirects. (Will only follow if necessary.)
      If it's a 3XX, we'll ping again to follow redirects. This is
      necessary to reliably scope any errors (e.g. TLS errors) to
      the original endpoint.

    * Validate certificates. (Will figure out error if necessary.)

    The function records its findings as attributes on ``endpoint``
    (live, https_valid, https_full_connection, ip, headers, status,
    server_header, the redirect_* fields and unknown_error) and returns
    None.  Expected failures are logged and reflected in those
    attributes rather than raised.
    """
    utils.debug("Pinging %s...", endpoint.url, divider=True)

    # Stays None unless one of the ping() calls below yields a response.
    req = None

    try:
        with ping(endpoint.url) as req:
            endpoint.live = True
            if endpoint.protocol == "https":
                endpoint.https_full_connection = True
                endpoint.https_valid = True

    # NOTE: SSLError must be handled before ConnectionError, because in
    # requests SSLError is a subclass of ConnectionError.
    except requests.exceptions.SSLError as err:
        message = str(err)
        if "bad handshake" in message and (
            "sslv3 alert handshake failure" in message or "Unexpected EOF" in message
        ):
            logging.exception(
                "%s: Error completing TLS handshake usually due to required client authentication.",
                endpoint.url,
            )
            utils.debug("%s: %s", endpoint.url, err)
            endpoint.live = True
            if endpoint.protocol == "https":
                # The https can still be valid with a handshake error,
                # sslyze will run later and check if it is not valid
                endpoint.https_valid = True
                endpoint.https_full_connection = False

        else:
            logging.exception(
                "%s: Error connecting over SSL/TLS or validating certificate.",
                endpoint.url,
            )
            utils.debug("%s: %s", endpoint.url, err)
            # Retry with certificate validation disabled.
            try:
                with ping(endpoint.url, verify=False) as req:
                    endpoint.live = True
                    if endpoint.protocol == "https":
                        endpoint.https_full_connection = True
                        # sslyze later will actually check if the cert is valid
                        endpoint.https_valid = True
            except requests.exceptions.SSLError as err:
                # If it's a protocol error or other, it's not a full connection,
                # but it is live.
                endpoint.live = True
                if endpoint.protocol == "https":
                    endpoint.https_full_connection = False
                    # HTTPS may still be valid, sslyze will double-check later
                    endpoint.https_valid = True
                logging.exception(
                    "%s: Unexpected SSL protocol (or other) error during retry.",
                    endpoint.url,
                )
                utils.debug("%s: %s", endpoint.url, err)
                # continue on to SSLyze to check the connection
            except requests.exceptions.RequestException as err:
                endpoint.live = False
                logging.exception(
                    "%s: Unexpected requests exception during retry.", endpoint.url
                )
                utils.debug("%s: %s", endpoint.url, err)
                return
            except OpenSSL.SSL.Error as err:
                endpoint.live = False
                logging.exception(
                    "%s: Unexpected OpenSSL exception during retry.", endpoint.url
                )
                utils.debug("%s: %s", endpoint.url, err)
                return
            except Exception as err:
                endpoint.unknown_error = True
                logging.exception(
                    "%s: Unexpected other unknown exception during requests retry.",
                    endpoint.url,
                )
                utils.debug("%s: %s", endpoint.url, err)
                return

        # If it was a certificate error of any kind, it's live,
        # unless SSLyze encounters a connection error later
        endpoint.live = True

    except requests.exceptions.ConnectionError as err:
        # We can get this for some endpoints that are actually live,
        # so if it's https let's try sslyze to be sure
        if endpoint.protocol == "https":
            # https check later will set whether the endpoint is live and valid
            endpoint.https_full_connection = False
            endpoint.https_valid = True
        else:
            endpoint.live = False
        logging.exception("%s: Error connecting.", endpoint.url)
        utils.debug("%s: %s", endpoint.url, err)

    # And this is the parent of ConnectionError and other things.
    # For example, "too many redirects".
    # See https://github.com/kennethreitz/requests/blob/master/requests/exceptions.py
    except requests.exceptions.RequestException as err:
        endpoint.live = False
        logging.exception("%s: Unexpected other requests exception.", endpoint.url)
        utils.debug("%s: %s", endpoint.url, err)
        return

    except Exception as err:
        endpoint.unknown_error = True
        logging.exception(
            "%s: Unexpected other unknown exception during initial request.",
            endpoint.url,
        )
        utils.debug("%s: %s", endpoint.url, err)
        return

    # Run SSLyze to see if there are any errors
    if endpoint.protocol == "https":
        https_check(endpoint)
        # Double-check in case sslyze failed the first time, but the regular connection succeeded
        if endpoint.live is False and req is not None:
            logging.warning(
                "%s: Trying sslyze again since it connected once already.", endpoint.url
            )
            endpoint.live = True
            endpoint.https_valid = True
            https_check(endpoint)
            if endpoint.live is False:
                # sslyze failed so back everything out and don't continue analyzing the existing response
                req = None
                endpoint.https_valid = False
                endpoint.https_full_connection = False

    if req is None:
        # Ensure that full_connection is set to False if we didn't get a response
        if endpoint.protocol == "https":
            endpoint.https_full_connection = False
        return

    # try to get IP address if we can
    try:
        if req.raw.closed is False:
            ip = req.raw._connection.sock.socket.getpeername()[0]
            if endpoint.ip is None:
                endpoint.ip = ip
            else:
                if endpoint.ip != ip:
                    utils.debug(
                        "%s: Endpoint IP is already %s, but requests IP is %s.",
                        endpoint.url,
                        endpoint.ip,
                        ip,
                    )
    except Exception:
        # If the socket has already closed, the lookup throws an
        # exception, but this is just best effort, so log and move on.
        logging.exception("%s: Error retrieving peer IP address.", endpoint.url)

    # Endpoint is live, analyze the response.
    endpoint.headers = req.headers

    endpoint.status = req.status_code

    if req.headers.get("Server") is not None:
        endpoint.server_header = req.headers.get("Server")
        # *** in the future add logic to convert header to server version if known

    if (req.headers.get("Location") is not None) and str(endpoint.status).startswith(
        "3"
    ):
        endpoint.redirect = True
        logging.warning("%s: Found redirect.", endpoint.url)

    if endpoint.redirect:
        # Response of the redirect-following request; stays None if
        # that request fails with a connection error.
        ultimate_req = None

        try:
            location_header = req.headers.get("Location")
            # Absolute redirects (e.g. "https://example.com/Index.aspx")
            if location_header.startswith(("http:", "https:")):
                immediate = location_header

            # Relative redirects (e.g. "Location: /Index.aspx").
            # Construct absolute URI, relative to original request.
            else:
                immediate = urlparse.urljoin(endpoint.url, location_header)
        except Exception as err:
            endpoint.unknown_error = True
            logging.exception(
                "%s: Unexpected other unknown exception when handling Requests Header.",
                endpoint.url,
            )
            utils.debug("%s: %s", endpoint.url, err)
            # Without the immediate redirect target none of the redirect
            # fields below can be filled in, so stop here instead of
            # issuing another request whose result could not be used.
            return

        # Chase down the ultimate destination, ignoring any certificate warnings.
        try:
            with ping(endpoint.url, allow_redirects=True, verify=False) as ultimate_req:
                pass
        except (requests.exceptions.RequestException, OpenSSL.SSL.Error):
            # Swallow connection errors, but we won't be saving redirect info.
            logging.exception("Connection error")
        except Exception as err:
            endpoint.unknown_error = True
            logging.exception(
                "%s: Unexpected other unknown exception when handling redirect.",
                endpoint.url,
            )
            utils.debug("%s: %s", endpoint.url, err)
            return

        try:
            # Now establish whether the redirects were:
            # * internal (same exact hostname),
            # * within the zone (any subdomain within the parent domain)
            # * external (on some other parent domain)

            # The hostname of the endpoint (e.g. "www.agency.gov")
            subdomain_original = urlparse.urlparse(endpoint.url).hostname
            # The parent domain of the endpoint (e.g. "agency.gov")
            base_original = parent_domain_for(subdomain_original)

            # The hostname of the immediate redirect.
            # The parent domain of the immediate redirect.
            subdomain_immediate = urlparse.urlparse(immediate).hostname
            base_immediate = parent_domain_for(subdomain_immediate)

            endpoint.redirect_immediately_to = immediate
            endpoint.redirect_immediately_to_https = immediate.startswith("https://")
            endpoint.redirect_immediately_to_http = immediate.startswith("http://")
            endpoint.redirect_immediately_to_external = base_original != base_immediate
            endpoint.redirect_immediately_to_subdomain = (
                base_original == base_immediate
            ) and (subdomain_original != subdomain_immediate)

            # We're interested in whether an endpoint redirects to the www version
            # of itself (not whether it redirects to www prepended to any other
            # hostname, even within the same parent domain).
            endpoint.redirect_immediately_to_www = subdomain_immediate == (
                f"www.{subdomain_original}"
            )

            if ultimate_req is not None:
                # For ultimate destination, use the URL we arrived at,
                # not Location header. Auto-resolves relative redirects.
                eventual = ultimate_req.url

                # The hostname of the eventual destination.
                # The parent domain of the eventual destination.
                subdomain_eventual = urlparse.urlparse(eventual).hostname
                base_eventual = parent_domain_for(subdomain_eventual)

                endpoint.redirect_eventually_to = eventual
                endpoint.redirect_eventually_to_https = eventual.startswith("https://")
                endpoint.redirect_eventually_to_http = eventual.startswith("http://")
                endpoint.redirect_eventually_to_external = (
                    base_original != base_eventual
                )
                endpoint.redirect_eventually_to_subdomain = (
                    base_original == base_eventual
                ) and (subdomain_original != subdomain_eventual)

            # If we were able to make the first redirect, but not the ultimate redirect,
            # and if the immediate redirect is external, then it's accurate enough to
            # say that the eventual redirect is the immediate redirect, since you're capturing
            # the domain it's going to.
            # This also avoids "punishing" the domain for configuration issues of the site
            # it redirects to.
            elif endpoint.redirect_immediately_to_external:
                endpoint.redirect_eventually_to = endpoint.redirect_immediately_to
                endpoint.redirect_eventually_to_https = (
                    endpoint.redirect_immediately_to_https
                )
                endpoint.redirect_eventually_to_http = (
                    endpoint.redirect_immediately_to_http
                )
                endpoint.redirect_eventually_to_external = (
                    endpoint.redirect_immediately_to_external
                )
                endpoint.redirect_eventually_to_subdomain = (
                    endpoint.redirect_immediately_to_subdomain
                )
        except Exception as err:
            endpoint.unknown_error = True
            logging.exception(
                "%s: Unexpected other unknown exception when establishing redirects.",
                endpoint.url,
            )
            utils.debug("%s: %s", endpoint.url, err)
×
583

584

585
def hsts_check(endpoint):
    """Perform an HSTS check of the given endpoint.

    Given an endpoint and its detected headers, extract and parse
    any present HSTS header, decide what HSTS properties are there.

    Disqualify domains with a bad host, they won't work as valid HSTS.

    The results are written to ``endpoint``: hsts, hsts_header,
    hsts_max_age, hsts_all_subdomains and hsts_preload.  HSTS is only
    reported (``endpoint.hsts`` left True) when a max-age directive
    with a value greater than zero is found.  Any unexpected exception,
    including a max-age value that is not an integer, is logged and
    recorded as ``endpoint.unknown_error = True``.
    """
    try:
        if endpoint.https_bad_hostname:
            endpoint.hsts = False
            return

        header = endpoint.headers.get("Strict-Transport-Security")

        if header is None:
            endpoint.hsts = False
            return

        endpoint.hsts = True
        endpoint.hsts_header = header
        lowered_header = header.lower()

        # handle multiple HSTS headers, requests comma-separates them;
        # only the first one is parsed for max-age
        first_pass = re.split(r",\s?", header)[0]
        second_pass = re.sub(r"\'", "", first_pass)

        # Look the max-age directive up by name rather than assuming it
        # is the first directive: directives may appear in any order,
        # names are matched case-insensitively here, and the value may
        # be wrapped in double quotes.
        for directive in second_pass.split(";"):
            name, separator, value = directive.partition("=")
            if separator and name.strip().lower() == "max-age":
                # int() raises ValueError on a non-numeric value; that
                # is deliberately left to the handler below.
                endpoint.hsts_max_age = int(value.strip().strip('"'))
                break

        if endpoint.hsts_max_age is None or endpoint.hsts_max_age <= 0:
            endpoint.hsts = False
            return

        # check if hsts includes sub domains
        if "includesubdomains" in lowered_header:
            endpoint.hsts_all_subdomains = True

        # Check if hsts has the preload flag
        if "preload" in lowered_header:
            endpoint.hsts_preload = True
    except Exception as err:
        endpoint.unknown_error = True
        logging.exception(
            "%s: Unknown exception when handling HSTS check.", endpoint.url
        )
        utils.debug("%s: %s", endpoint.url, err)
        return
×
637

638

639
def https_check(endpoint):
    """Use sslyze to figure out the reason an endpoint failed to verify.

    Connects to the endpoint's host on port 443, runs sslyze's
    certificate info scan against ``CA_FILE`` and records the findings
    as attributes of ``endpoint``: liveness and IP, whether client
    authentication is required, public/custom trust, the expired /
    self-signed / bad-chain / bad-hostname flags, the received chain
    length and whether an intermediate certificate appears to be missing.
    ``endpoint.https_valid`` is set to ``False`` when any of the four
    certificate problem flags is set.

    The function always returns ``None``.  It returns early, after
    logging, when the connectivity test fails, when the certificate scan
    raises, or when the scan result cannot be rendered as text.
    """
    utils.debug("sslyzing %s...", endpoint.url)

    # remove the https:// from prefix for sslyze
    try:
        # 8 == len("https://")
        hostname = endpoint.url[8:]
        server_tester = ServerConnectivityTester(hostname=hostname, port=443)
        server_info = server_tester.perform()
        endpoint.live = True
        ip = server_info.ip_address
        # Keep an IP that was already recorded; only report a mismatch.
        if endpoint.ip is None:
            endpoint.ip = ip
        else:
            if endpoint.ip != ip:
                utils.debug(
                    "%s: Endpoint IP is already %s, but requests IP is %s.",
                    endpoint.url,
                    endpoint.ip,
                    ip,
                )
        if server_info.client_auth_requirement.name == "REQUIRED":
            endpoint.https_client_auth_required = True
            logging.warning("%s: Client Authentication REQUIRED", endpoint.url)
    except ServerConnectivityError as err:
        # sslyze could not connect: the endpoint is down for HTTPS purposes.
        endpoint.live = False
        endpoint.https_valid = False
        logging.exception(
            "%s: Error in sslyze server connectivity check when connecting to %s",
            endpoint.url,
            err.server_info.hostname,
        )
        utils.debug("%s: %s", endpoint.url, err)
        return
    except Exception as err:
        endpoint.unknown_error = True
        logging.exception(
            "%s: Unknown exception in sslyze server connectivity check.", endpoint.url
        )
        utils.debug("%s: %s", endpoint.url, err)
        return

    # Run the certificate info scan, retrying once if the first attempt
    # timed out.
    try:
        cert_plugin_result = None
        command = sslyze.plugins.certificate_info_plugin.CertificateInfoScanCommand(
            ca_file=CA_FILE
        )
        scanner = sslyze.synchronous_scanner.SynchronousScanner()
        cert_plugin_result = scanner.run_scan_command(server_info, command)
    except Exception as err:
        try:
            if "timed out" in str(err):
                logging.exception(
                    "%s: Retrying sslyze scanner certificate plugin.", endpoint.url
                )
                # NOTE(review): if the failure happened before `scanner`
                # was assigned, this retry raises NameError, which the
                # enclosing `except Exception` below handles.
                cert_plugin_result = scanner.run_scan_command(server_info, command)
            else:
                logging.exception(
                    "%s: Unknown exception in sslyze scanner certificate plugin.",
                    endpoint.url,
                )
                utils.debug("%s: %s", endpoint.url, err)
                endpoint.unknown_error = True
                # We could make this False, but there was an error so
                # we don't know
                endpoint.https_valid = None
                return
        except Exception:
            # The retry itself failed.
            logging.exception(
                "%s: Unknown exception in sslyze scanner certificate plugin.",
                endpoint.url,
            )
            utils.debug("%s: %s", endpoint.url, err)
            endpoint.unknown_error = True
            # We could make this False, but there was an error so we
            # don't know
            endpoint.https_valid = None
            return

    # Work out public and custom trust from the per-trust-store path
    # validation results.
    try:
        public_trust = True
        custom_trust = True
        public_not_trusted_names = []
        validation_results = cert_plugin_result.path_validation_result_list
        for result in validation_results:
            if result.was_validation_successful:
                # We're assuming that it is trusted to start with
                pass
            else:
                # A trust store whose name contains "Custom" counts as the
                # custom store; every other store counts as public.
                if "Custom" in result.trust_store.name:
                    custom_trust = False
                else:
                    public_trust = False
                    public_not_trusted_names.append(result.trust_store.name)
        if public_trust:
            logging.warning(
                "%s: Publicly trusted by common trust stores.", endpoint.url
            )
        else:
            logging.warning(
                "%s: Not publicly trusted - not trusted by %s.",
                endpoint.url,
                ", ".join(public_not_trusted_names),
            )
        if CA_FILE is not None:
            if custom_trust:
                logging.warning("%s: Trusted by custom trust store.", endpoint.url)
            else:
                logging.warning("%s: Not trusted by custom trust store.", endpoint.url)
        else:
            # No custom trust store was supplied, so custom trust is unknown.
            custom_trust = None
        endpoint.https_public_trusted = public_trust
        endpoint.https_custom_trusted = custom_trust
    except Exception as err:
        # Ignore exception
        logging.exception("%s: Unknown exception examining trust.", endpoint.url)
        utils.debug("%s: Unknown exception examining trust: %s", endpoint.url, err)

    # Render the scan result as text; the checks below match on that text.
    try:
        cert_response = cert_plugin_result.as_text()
    except AttributeError:
        logging.exception(
            "%s: Known error in sslyze 1.X with EC public keys. See https://github.com/nabla-c0d3/sslyze/issues/215",
            endpoint.url,
        )
        return
    except Exception as err:
        endpoint.unknown_error = True
        logging.exception("%s: Unknown exception in cert plugin.", endpoint.url)
        utils.debug("%s: %s", endpoint.url, err)
        return

    # Debugging
    # for msg in cert_response:
    #     print(msg)

    # Default endpoint assessments to False until proven True.
    endpoint.https_expired_cert = False
    endpoint.https_self_signed_cert = False
    endpoint.https_bad_chain = False
    endpoint.https_bad_hostname = False

    # STORE will be either "Mozilla" or "Custom"
    # depending on what the user chose.

    # A certificate can have multiple issues.
    for msg in cert_response:
        # Check for missing SAN.
        if (("DNS Subject Alternative Names") in msg) and (("[]") in msg):
            endpoint.https_bad_hostname = True

        # Check for certificate expiration.
        if (
            (STORE in msg)
            and (("FAILED") in msg)
            and (("certificate has expired") in msg)
        ):
            endpoint.https_expired_cert = True

        # Check to see if the cert is self-signed
        if (
            (STORE in msg)
            and (("FAILED") in msg)
            and (("self signed certificate") in msg)
        ):
            endpoint.https_self_signed_cert = True

        # Check to see if there is a bad chain

        # NOTE: If this is the only flag that's set, it's probably
        # an incomplete chain.
        # If this isn't the only flag that is set, it might be
        # because there is another error. More debugging would
        # need to be done at this point, but not through sslyze
        # because sslyze doesn't have enough granularity

        if (
            (STORE in msg)
            and (("FAILED") in msg)
            and (
                (("unable to get local issuer certificate") in msg)
                or (("self signed certificate") in msg)
            )
        ):
            endpoint.https_bad_chain = True

        # Check for whether the hostname validates.
        if (
            (("Hostname Validation") in msg)
            and (("FAILED") in msg)
            and (("Certificate does NOT match") in msg)
        ):
            endpoint.https_bad_hostname = True

    # Record the received chain length and decide whether an intermediate
    # certificate is probably missing.
    try:
        endpoint.https_cert_chain_len = len(
            cert_plugin_result.received_certificate_chain
        )
        # A chain of fewer than two certificates that is not self-signed
        # is treated as missing its intermediate certificate.
        if endpoint.https_self_signed_cert is False and (
            endpoint.https_cert_chain_len < 2
        ):
            # *** TODO check that it is not a bad hostname and that the root cert is trusted before suggesting that it is an intermediate cert issue.
            endpoint.https_missing_intermediate_cert = True
            if cert_plugin_result.verified_certificate_chain is None:
                logging.warning(
                    "%s: Untrusted certificate chain, probably due to missing intermediate certificate.",
                    endpoint.url,
                )
                utils.debug(
                    "%s: Only %d certificates in certificate chain received.",
                    endpoint.url,
                    cert_plugin_result.received_certificate_chain.__len__(),
                )
            elif custom_trust is True and public_trust is False:
                # recheck public trust using custom public trust store with manually added intermediate certificates
                if PT_INT_CA_FILE is not None:
                    try:
                        # This rebinds cert_plugin_result and command for
                        # the rest of the function.
                        cert_plugin_result = None
                        command = sslyze.plugins.certificate_info_plugin.CertificateInfoScanCommand(
                            ca_file=PT_INT_CA_FILE
                        )
                        cert_plugin_result = scanner.run_scan_command(
                            server_info, command
                        )
                        if cert_plugin_result.verified_certificate_chain is not None:
                            public_trust = True
                            endpoint.https_public_trusted = public_trust
                            logging.warning(
                                "%s: Trusted by special public trust store with intermediate certificates.",
                                endpoint.url,
                            )
                    except Exception:
                        logging.exception("Error while rechecking public trust")
        else:
            endpoint.https_missing_intermediate_cert = False
    except Exception:
        logging.exception("Error while determining length of certificate chain")

    # If anything is wrong then https is not valid
    if (
        endpoint.https_expired_cert
        or endpoint.https_self_signed_cert
        or endpoint.https_bad_chain
        or endpoint.https_bad_hostname
    ):
        endpoint.https_valid = False
885

886

887
def canonical_endpoint(http, httpwww, https, httpswww):
    """Pick the endpoint that best represents a domain's "canonical" site.

    Most domain-level decisions depend on this guess, which is made in
    two independent halves: www vs. root, and https vs. http.

    The domain is canonically at **www** when all of these hold:
     * at least one www endpoint is live
     * both root endpoints are unused (redirecting, not live, bad
       hostname, or not answering 2xx)
     * either both root endpoints are down, or at least one root
       endpoint redirects immediately to www

    That accepts chains such as:
      http:// -> https:// -> https://www
      https:// -> http:// -> https://www
    and rejects:
      http:// -> http://non-www,
      http://www -> http://non-www
    as well as:
      https:// -> 200, http:// -> http://www

    The domain is canonically at **https** when all of these hold:
     * at least one https endpoint is live without a bad hostname
     * both http endpoints are unused (redirecting, not live, or not
       answering 2xx)
     * either both http endpoints are not live, or at least one of
       them redirects immediately to https without going external

    That accepts chains such as:
      http:// -> http://www -> https://
      https:// -> http:// -> https://www
    and rejects:
      http:// -> http://non-www
      http://www -> http://non-www
    as well as:
      http:// -> 200, http://www -> https://www

    A certificate with a valid hostname but chain problems does not
    prevent a site from being canonically HTTPS.

    Returns:
        One of the four endpoint objects passed in.
    """

    def answers(endpoint, *leading_digits):
        # True when the status code starts with one of the given digits.
        return str(endpoint.status).startswith(leading_digits)

    def root_unused(endpoint):
        return (
            endpoint.redirect
            or not endpoint.live
            or endpoint.https_bad_hostname  # harmless for http endpoints
            or not answers(endpoint, "2")
        )

    def root_down(endpoint):
        return (
            not endpoint.live
            or endpoint.https_bad_hostname
            or not answers(endpoint, "2", "3")
        )

    def https_used(endpoint):
        return endpoint.live and not endpoint.https_bad_hostname

    def http_unused(endpoint):
        return endpoint.redirect or not endpoint.live or not answers(endpoint, "2")

    def http_upgrades(endpoint):
        return (
            endpoint.redirect_immediately_to_https
            and not endpoint.redirect_immediately_to_external
        )

    # --- www or root? ---
    www_responds = httpswww.live or httpwww.live
    roots_unused = root_unused(https) and root_unused(http)
    roots_down = root_down(https) and root_down(http)

    is_www = (
        www_responds
        and roots_unused
        and (
            roots_down
            or https.redirect_immediately_to_www
            or http.redirect_immediately_to_www
        )
    )

    # --- https or http? ---
    some_https_used = https_used(https) or https_used(httpswww)
    http_all_unused = http_unused(http) and http_unused(httpwww)
    http_all_down = not http.live and not httpwww.live
    some_http_upgrades = http_upgrades(http) or http_upgrades(httpwww)

    is_https = (
        some_https_used
        and http_all_unused
        and (http_all_down or some_http_upgrades)
    )

    if is_www:
        return httpswww if is_https else httpwww
    return https if is_https else http
996

997

998
##
999
# Judgment calls based on observed endpoint data.
1000
##
1001

1002

1003
def is_live(domain):
    """Report whether any of a domain's four endpoints is live.

    Returns the first truthy ``live`` value found among the http,
    www http, https and www https endpoints (in that order), or the
    last endpoint's ``live`` value when none is truthy.
    """
    endpoints = (domain.http, domain.httpwww, domain.https, domain.httpswww)

    liveness = None
    for endpoint in endpoints:
        liveness = endpoint.live
        if liveness:
            break
    return liveness
1013

1014

1015
def is_https_live(domain):
    """Report whether either HTTPS endpoint of a domain is live.

    Returns the root HTTPS endpoint's ``live`` value when it is truthy,
    otherwise the www HTTPS endpoint's ``live`` value.
    """
    root_https, www_https = domain.https, domain.httpswww

    root_live = root_https.live
    if root_live:
        return root_live
    return www_https.live
1020

1021

1022
def is_full_connection(domain):
    """Report whether a domain is "fully connected".

    That is the case when either HTTPS endpoint is fully connected.
    The root HTTPS endpoint's ``https_full_connection`` value is
    returned when truthy, otherwise the www HTTPS endpoint's value.
    """
    root_https, www_https = domain.https, domain.httpswww

    connected = root_https.https_full_connection
    if connected:
        return connected
    return www_https.https_full_connection
1030

1031

1032
def is_client_auth_required(domain):
    """Report whether a domain requires client authentication.

    That is the case when *any* HTTPS endpoint requires it for a full
    TLS connection.  The root HTTPS endpoint's
    ``https_client_auth_required`` value is returned when truthy,
    otherwise the www HTTPS endpoint's value.
    """
    required = None
    for endpoint in (domain.https, domain.httpswww):
        required = endpoint.https_client_auth_required
        if required:
            break
    return required
1041

1042

1043
def is_redirect_or_down(endpoint):
    """Report whether an endpoint redirects externally or is down.

    An endpoint qualifies when it eventually redirects to an external
    site, or when it is down in any of three ways: it is not live, it
    is HTTPS with a bad hostname in its certificate, or it answers
    with a status code of 400 or above.
    """
    goes_external = endpoint.redirect_eventually_to_external
    if goes_external:
        return goes_external

    if not endpoint.live:
        return True

    bad_https_hostname = endpoint.protocol == "https" and endpoint.https_bad_hostname
    if bad_https_hostname:
        return bad_https_hostname

    return endpoint.status is not None and endpoint.status >= 400
1056

1057

1058
def is_redirect(endpoint):
    """Report whether an endpoint eventually redirects to an external site.

    Returns the endpoint's ``redirect_eventually_to_external`` value
    unchanged.
    """
    goes_external = endpoint.redirect_eventually_to_external
    return goes_external
1061

1062

1063
def is_redirect_domain(domain):
    """Report whether a domain redirects HTTP or HTTPS traffic.

    A live domain is "a redirect domain" when at least one of its four
    endpoints is a redirect and every endpoint is either a redirect
    or down.
    """
    http, httpwww, https, httpswww = (
        domain.http,
        domain.httpwww,
        domain.https,
        domain.httpswww,
    )

    alive = is_live(domain)
    if not alive:
        return alive

    # At least one endpoint has to redirect...
    some_redirect = (
        is_redirect(http)
        or is_redirect(httpwww)
        or is_redirect(https)
        or is_redirect(httpswww)
    )
    if not some_redirect:
        return some_redirect

    # ...and none may serve content itself.
    return (
        is_redirect_or_down(https)
        and is_redirect_or_down(httpswww)
        and is_redirect_or_down(httpwww)
        and is_redirect_or_down(http)
    )
1088

1089

1090
def is_http_redirect_domain(domain):
    """Report whether a domain redirects HTTP traffic.

    A live domain is "an http redirect domain" when at least one of its
    HTTP endpoints is a redirect and both HTTP endpoints are either
    redirects or down.
    """
    http, httpwww = domain.http, domain.httpwww

    alive = is_live(domain)
    if not alive:
        return alive

    some_http_redirect = is_redirect(http) or is_redirect(httpwww)
    if not some_http_redirect:
        return some_http_redirect

    return is_redirect_or_down(httpwww) and is_redirect_or_down(http)
1110

1111

1112
def redirects_to(domain):
    """Return where a "redirect domain" redirects to.

    Returns the canonical endpoint's ``redirect_eventually_to`` value
    when the domain is a redirect domain, otherwise ``None``.
    """
    canonical_endpoint_ = domain.canonical

    if not is_redirect_domain(domain):
        return None
    return canonical_endpoint_.redirect_eventually_to
1122

1123

1124
def is_valid_https(domain):
    """Report whether a domain has a valid HTTPS server.

    A domain has "valid HTTPS" when it responds on port 443 at its
    canonical hostname with an unexpired valid certificate for that
    hostname.
    """
    # Judge the HTTPS flavour of the canonical hostname.
    if domain.canonical.host == "root":
        target = domain.https
    else:
        target = domain.httpswww

    if not target.live:
        return target.live
    return target.https_valid
1136

1137

1138
def is_defaults_to_https(domain):
    """Report whether a domain defaults to HTTPS.

    That is the case when the canonical endpoint's protocol is
    ``"https"``.
    """
    canonical_protocol = domain.canonical.protocol
    return canonical_protocol == "https"
1146

1147

1148
def is_downgrades_https(domain):
    """Report whether a domain allows downgrading HTTPS.

    A domain downgrades when HTTPS is supported in some way but its
    canonical HTTPS endpoint immediately redirects internally to HTTP.

    Always returns a ``bool``.
    """

    def valid_for_hostname(endpoint):
        return endpoint.live and not endpoint.https_bad_hostname

    # HTTPS counts as "supported" when any HTTPS endpoint responds with
    # a certificate valid for its hostname.
    if not (valid_for_hostname(domain.https) or valid_for_hostname(domain.httpswww)):
        return False

    if domain.canonical.host == "www":
        canonical_https = domain.httpswww
    else:
        canonical_https = domain.https

    # The redirect flag may be None when the site doesn't redirect, so
    # answer with an explicit boolean.
    if not canonical_https.redirect_immediately_to_http:
        return False
    return not canonical_https.redirect_immediately_to_external
1171

1172

1173
def is_strictly_forces_https(domain):
    """Report whether a domain strictly forces HTTPS.

    A domain "Strictly Forces HTTPS" when one of its HTTPS endpoints
    is live and both of its *HTTP* endpoints are either:

     * down, or
     * redirecting immediately to an HTTPS URI.

    This is not the same as "Defaults" to HTTPS:

    * An HTTP redirect may go to HTTPS on another domain, as long
      as it's immediate.
    * A domain with an invalid cert can still be enforcing HTTPS.
    """
    http, httpwww, https, httpswww = (
        domain.http,
        domain.httpwww,
        domain.https,
        domain.httpswww,
    )

    https_somewhere = https.live or httpswww.live
    if not https_somewhere:
        return https_somewhere

    # Every HTTP endpoint must be down or hand off straight to HTTPS.
    verdict = None
    for endpoint in (http, httpwww):
        verdict = not endpoint.live or endpoint.redirect_immediately_to_https
        if not verdict:
            break
    return verdict
1202

1203

1204
def is_publicly_trusted(domain):
    """Report whether a domain has a publicly trusted certificate.

    A domain has a "Publicly Trusted" certificate when the HTTPS
    endpoint for its canonical hostname is live and publicly trusted.
    """
    # Judge the HTTPS flavour of the canonical hostname.
    if domain.canonical.host == "root":
        target = domain.https
    else:
        target = domain.httpswww

    if not target.live:
        return target.live
    return target.https_public_trusted
1216

1217

1218
def is_custom_trusted(domain):
    """Report whether a domain has a custom trusted certificate.

    A domain has a "Custom Trusted" certificate when the HTTPS endpoint
    for its canonical hostname is live and its certificate is trusted
    by the custom truststore.
    """
    # Judge the HTTPS flavour of the canonical hostname.
    use_root = domain.canonical.host == "root"
    target = domain.https if use_root else domain.httpswww

    if not target.live:
        return target.live
    return target.https_custom_trusted
1231

1232

1233
def is_bad_chain(domain):
    """Report whether a domain has a bad certificate chain.

    The answer is the ``https_bad_chain`` value of the domain's
    canonical HTTPS endpoint (www when the canonical host is "www",
    root otherwise).
    """
    if domain.canonical.host == "www":
        return domain.httpswww.https_bad_chain
    return domain.https.https_bad_chain
1244

1245

1246
def is_bad_hostname(domain):
    """Report whether a domain has a bad hostname.

    The answer is the ``https_bad_hostname`` value of the domain's
    canonical HTTPS endpoint (www when the canonical host is "www",
    root otherwise), i.e. whether it fails hostname validation.
    """
    wants_www = domain.canonical.host == "www"
    chosen = domain.httpswww if wants_www else domain.https
    return chosen.https_bad_hostname
1257

1258

1259
def is_expired_cert(domain):
    """Report whether the canonical HTTPS endpoint's certificate has expired.

    Uses the www HTTPS endpoint when the canonical host is "www" and
    the root HTTPS endpoint otherwise.
    """
    if domain.canonical.host == "www":
        return domain.httpswww.https_expired_cert
    return domain.https.https_expired_cert
1266

1267

1268
def is_self_signed_cert(domain):
    """Report whether the canonical HTTPS endpoint's certificate is self-signed.

    Uses the www HTTPS endpoint when the canonical host is "www" and
    the root HTTPS endpoint otherwise.
    """
    wants_www = domain.canonical.host == "www"
    chosen = domain.httpswww if wants_www else domain.https
    return chosen.https_self_signed_cert
1275

1276

1277
def cert_chain_length(domain):
    """Return the certificate chain length of the canonical HTTPS endpoint.

    Uses the www HTTPS endpoint when the canonical host is "www" and
    the root HTTPS endpoint otherwise.
    """
    if domain.canonical.host == "www":
        return domain.httpswww.https_cert_chain_len
    return domain.https.https_cert_chain_len
1284

1285

1286
def is_missing_intermediate_cert(domain):
    """Report whether the served chain is probably missing an intermediate cert.

    The answer is the ``https_missing_intermediate_cert`` value of the
    domain's canonical HTTPS endpoint (www when the canonical host is
    "www", root otherwise).
    """
    wants_www = domain.canonical.host == "www"
    chosen = domain.httpswww if wants_www else domain.https
    return chosen.https_missing_intermediate_cert
1297

1298

1299
def is_hsts(domain):
    """Report whether a domain has HSTS.

    The answer is the ``hsts`` value of the domain's canonical HTTPS
    endpoint (www when the canonical host is "www", root otherwise).
    """
    if domain.canonical.host == "www":
        return domain.httpswww.hsts
    return domain.https.hsts
1309

1310

1311
def hsts_header(domain):
    """Return the HSTS header recorded for the canonical HTTPS endpoint.

    Uses the www HTTPS endpoint when the canonical host is "www" and
    the root HTTPS endpoint otherwise.
    """
    wants_www = domain.canonical.host == "www"
    chosen = domain.httpswww if wants_www else domain.https
    return chosen.hsts_header
1318

1319

1320
def hsts_max_age(domain):
    """Return the HSTS max-age recorded for the canonical HTTPS endpoint.

    Uses the www HTTPS endpoint when the canonical host is "www" and
    the root HTTPS endpoint otherwise.
    """
    if domain.canonical.host == "www":
        return domain.httpswww.hsts_max_age
    return domain.https.hsts_max_age
1327

1328

1329
def is_hsts_entire_domain(domain):
    """Report whether the ROOT endpoint's HSTS policy covers all subdomains.

    Returns the root HTTPS endpoint's ``hsts_all_subdomains`` value
    unchanged.
    """
    root_https = domain.https
    covers_subdomains = root_https.hsts_all_subdomains
    return covers_subdomains
1334

1335

1336
def is_hsts_preload_ready(domain):
    """Report whether a domain's ROOT endpoint is HSTS preload-ready.

    The root HTTPS endpoint is preload-ready when its max-age is at
    least eighteen weeks and its HSTS policy both includes all
    subdomains and carries the preload flag.
    """
    root_https = domain.https

    # 18 weeks expressed in seconds.
    minimum_max_age = 10886400

    max_age = root_https.hsts_max_age
    long_enough = (max_age is not None) and (max_age >= minimum_max_age)
    if not long_enough:
        return long_enough

    if not root_https.hsts_all_subdomains:
        return root_https.hsts_all_subdomains
    return root_https.hsts_preload
1346

1347

1348
def is_hsts_preload_pending(domain):
    """Report whether a domain is pending inclusion in Chrome's HSTS preload list.

    Raises:
        RuntimeError: If ``PRELOAD_PENDING`` is still ``None``, i.e. the
            external data caches have not been initialized.  The data
            is not loaded on demand here.
    """
    if PRELOAD_PENDING is not None:
        return domain.domain in PRELOAD_PENDING

    logging.error("`PRELOAD_PENDING` has not yet been initialized!")
    raise RuntimeError(
        "`initialize_external_data()` must be called explicitly before "
        "using this function"
    )
1362

1363

1364
def is_hsts_preloaded(domain):
    """Report whether a domain is contained in Chrome's HSTS preload list.

    Raises:
        RuntimeError: If ``PRELOAD_LIST`` is still ``None``, i.e. the
            external data caches have not been initialized.  The data
            is not loaded on demand here.
    """
    if PRELOAD_LIST is not None:
        return domain.domain in PRELOAD_LIST

    logging.error("`PRELOAD_LIST` has not yet been initialized!")
    raise RuntimeError(
        "`initialize_external_data()` must be called explicitly before "
        "using this function"
    )
1377

1378

1379
def is_parent_hsts_preloaded(domain):
    """Report whether a domain's parent domain is in Chrome's HSTS preload list.

    The parent domain name is wrapped in a fresh ``Domain`` and handed
    to ``is_hsts_preloaded``.
    """
    parent_name = parent_domain_for(domain.domain)
    parent = Domain(parent_name)
    return is_hsts_preloaded(parent)
1382

1383

1384
def parent_domain_for(hostname):
    """Get the parent domain for a given domain name.

    For "x.y.domain.gov", return "domain.gov".

    Raises:
        RuntimeError: If ``SUFFIX_LIST`` is still ``None``, i.e. the
            external data caches have not been initialized.  The data
            is not loaded on demand here.
    """
    if SUFFIX_LIST is not None:
        return SUFFIX_LIST.get_public_suffix(hostname)

    logging.error("`SUFFIX_LIST` has not yet been initialized!")
    raise RuntimeError(
        "`initialize_external_data()` must be called explicitly before "
        "using this function"
    )
1399

1400

1401
def is_domain_supports_https(domain):
    """Report whether a domain supports HTTPS.

    A domain 'Supports HTTPS' when it doesn't downgrade and either has
    valid HTTPS, or has a bad chain but not a bad hostname.  Domains
    with a bad chain "support" HTTPS but user-side errors should be
    expected.
    """
    # A downgrading domain never qualifies, whatever its certificate.
    if is_downgrades_https(domain):
        return False

    return is_valid_https(domain) or (
        is_bad_chain(domain) and not is_bad_hostname(domain)
    )
1413

1414

1415
def is_domain_enforces_https(domain):
    """Report whether a domain enforces HTTPS.

    A domain that 'Enforces HTTPS' must 'Support HTTPS' and default to
    HTTPS.  Websites (where Redirect is false) may eventually redirect
    to an https:// URI.  "Redirect domains" (where the Redirect value
    is true) must immediately redirect clients to an https:// URI,
    even if that URI is on another domain, to be said to enforce HTTPS.
    """
    supports = is_domain_supports_https(domain)
    if not supports:
        return supports

    forces = is_strictly_forces_https(domain)
    if not forces:
        return forces

    return is_defaults_to_https(domain) or is_http_redirect_domain(domain)
1430

1431

1432
def is_domain_strong_hsts(domain):
    """Report whether a domain is using strong HSTS.

    Returns ``None`` when the domain has no HSTS or no (non-zero)
    max-age; otherwise returns whether the max-age is at least
    31536000 seconds (365 days).
    """
    if not is_hsts(domain):
        return None

    max_age = hsts_max_age(domain)
    if not max_age:
        return None

    return max_age >= 31536000
1437

1438

1439
def get_domain_ip(domain):
    """Get the IP for the domain.

    The endpoints are consulted in this priority order and the first
    IP that is not None wins:
    Canonical -> HTTPS -> www HTTPS -> www HTTP -> HTTP

    Returns ``None`` when no endpoint has an IP.
    """
    for endpoint_name in ("canonical", "https", "httpswww", "httpwww", "http"):
        ip = getattr(domain, endpoint_name).ip
        if ip is not None:
            return ip
    return None
1456

1457

1458
def get_domain_server_header(domain):
    """Return the first known Server header among the domain's endpoints.

    Endpoints are consulted in the following priority, and the first
    header that is not None is used:
    Canonical -> HTTPS -> www HTTPS -> www HTTP -> HTTP

    Commas in the header are replaced with semicolons.  Returns None
    when no endpoint has a Server header.
    """
    # Attribute names on the domain object, listed in priority order.
    for endpoint_name in ("canonical", "https", "httpswww", "httpwww", "http"):
        header = getattr(domain, endpoint_name).server_header
        if header is not None:
            return header.replace(",", ";")
    return None
1475

1476

1477
def get_domain_server_version(domain):
    """Return the first known server version among the domain's endpoints.

    Endpoints are consulted in the following priority, and the first
    version that is not None is returned:
    Canonical -> HTTPS -> www HTTPS -> www HTTP -> HTTP
    The server version is based on the returned Server header.

    Returns None when no endpoint has a server version.
    """
    # Attribute names on the domain object, listed in priority order.
    for endpoint_name in ("canonical", "https", "httpswww", "httpwww", "http"):
        version = getattr(domain, endpoint_name).server_version
        if version is not None:
            return version
    return None
1495

1496

1497
def get_domain_notes(domain):
    """Combine the notes of all four endpoints of a domain.

    The notes are concatenated in the order HTTP, www HTTP, HTTPS,
    www HTTPS, and every comma in the result is replaced by a semicolon.
    """
    plain_http_notes = domain.http.notes + domain.httpwww.notes
    secure_http_notes = domain.https.notes + domain.httpswww.notes
    return (plain_http_notes + secure_http_notes).replace(",", ";")
1507

1508

1509
def did_domain_error(domain):
    """Report whether any endpoint of a domain hit an unknown error.

    This exists mainly to flag odd websites so they can be debugged
    further with other tools.

    The endpoints are checked in the order HTTP, www HTTP, HTTPS,
    www HTTPS.  The first truthy unknown_error value is returned; if
    none is truthy, the www HTTPS endpoint's value is returned.
    """
    endpoints = (domain.http, domain.httpwww, domain.https, domain.httpswww)

    outcome = None
    for endpoint in endpoints:
        outcome = endpoint.unknown_error
        if outcome:
            break
    return outcome
1528

1529

1530
def load_preload_pending():
    """Fetch the Chrome preload pending list.

    Returns:
        A list with the "name" of every pending entry whose
        "include_subdomains" flag is true.  An empty list is returned
        (after logging the failure) when the list cannot be downloaded
        or the response body is not valid UTF-8 encoded JSON.
    """
    utils.debug("Fetching hstspreload.org pending list...", divider=True)
    pending_url = "https://hstspreload.org/api/v2/pending"

    try:
        response = requests.get(pending_url)
        # Treat an HTTP error status like any other failed download rather
        # than trying to parse an error page as the pending list.
        response.raise_for_status()
    except (
        requests.exceptions.SSLError,
        requests.exceptions.ConnectionError,
        requests.exceptions.HTTPError,
    ):
        # logging.exception() already records the exception and traceback.
        logging.exception("Failed to fetch pending preload list: %s", pending_url)
        return []

    try:
        pending_json = json.loads(response.content.decode("utf-8"))
    except ValueError:
        # UnicodeDecodeError and json.JSONDecodeError both derive from
        # ValueError.
        logging.exception("Failed to parse pending preload list: %s", pending_url)
        return []

    # For our purposes, we only care about entries that include subdomains.
    return [
        entry["name"]
        for entry in pending_json
        if entry.get("include_subdomains", False) is True
    ]
1556

1557

1558
def load_preload_list():
    """Download and load the Chromium preload list.

    Returns:
        A list with the "name" of every preload entry whose
        "include_subdomains" flag is true.  An empty list is returned
        (after logging the failure) when the list cannot be downloaded
        or its contents cannot be decoded and parsed.
    """
    utils.debug("Fetching Chrome preload list from source...", divider=True)

    # Location of the Chromium preloaded domain list.
    file_url = "https://chromium.googlesource.com/chromium/src/+/main/net/http/transport_security_state_static.json?format=TEXT"

    try:
        response = requests.get(file_url)
        # Treat an HTTP error status like any other failed download rather
        # than trying to decode an error page as the preload list.
        response.raise_for_status()
    except (
        requests.exceptions.SSLError,
        requests.exceptions.ConnectionError,
        requests.exceptions.HTTPError,
    ):
        # logging.exception() already records the exception and traceback.
        logging.exception("Failed to fetch preload list: %s", file_url)
        return []

    # The .json file contains '//' comments, which are not actually valid JSON,
    # and confuse Python's JSON decoder.
    comment_line = re.compile(r"^\s*//.*$")

    try:
        # To avoid parsing the contents of the file out of the source tree
        # viewer's HTML, we download it as a raw file. googlesource.com
        # Base64-encodes the file to avoid potential content injection issues,
        # so we need to decode it before using it.
        # https://code.google.com/p/gitiles/issues/detail?id=7
        decoded = base64.b64decode(response.content).decode("utf-8")
        without_comments = "".join(
            comment_line.sub("", line) for line in decoded.splitlines()
        )
        preload_json = json.loads(without_comments)
    except ValueError:
        # binascii.Error, UnicodeDecodeError and json.JSONDecodeError all
        # derive from ValueError.
        logging.exception("Failed to parse preload list: %s", file_url)
        return []

    # For our purposes, we only care about entries that includeSubDomains
    return [
        entry["name"]
        for entry in preload_json["entries"]
        if entry.get("include_subdomains", False) is True
    ]
1595

1596

1597
# Returns an instantiated PublicSuffixList object.
1598
def load_suffix_list(cache_suffix_list=None, update_list=False):
    """Download and load the public suffix list.

    Args:
        cache_suffix_list: Optional path of a local copy of the Public
            Suffix List.  When given, that file is the one updated and
            read; otherwise the copy built into publicsuffixlist is used.
        update_list: When true, refresh the selected copy via updatePSL()
            before loading it.

    Returns:
        An instantiated PublicSuffixList object, or None if an update was
        requested and it failed.
    """
    if update_list:
        utils.debug("Downloading the Public Suffix List...", divider=True)
        try:
            # Update the local copy
            if cache_suffix_list:
                updatePSL(cache_suffix_list)
            # Update the built-in copy
            else:
                updatePSL()
        except Exception as err:
            # Any failure here (network, filesystem, ...) is logged and
            # reported to the caller as None instead of being raised.
            logging.exception("Unable to download the Public Suffix List...")
            utils.debug(err)
            return None

    # Use the built-in copy
    if not cache_suffix_list:
        return PublicSuffixList()

    # Use the local copy.  The builtin open() with an explicit encoding is
    # what the rest of this module uses for its cache files.
    utils.debug("Using cached Public Suffix List.", divider=True)
    with open(cache_suffix_list, encoding="utf-8") as cache_file:
        return PublicSuffixList(cache_file)
1624

1625

1626
def initialize_external_data(
    init_preload_list=None, init_preload_pending=None, init_suffix_list=None
):
    """Load any third party external data.

    A library user may call this explicitly as setup before using other
    library functions; it is also called as part of inspect_domains()
    and therefore of CLI operation.

    Any value passed in becomes the cached value for that source.  This
    lets a caller of the Python API manage cached data in its own way,
    and also makes it possible to pass in subsets of these lists for
    testing or novel performance reasons.

    Otherwise, if the --cache-third-parties=[DIR] flag specifies a
    directory, downloaded third party data is cached in that directory
    and read back from there on the next pshtt run instead of hitting
    the network.

    If no values are passed in and no --cache-third-parties flag is
    used, no cached third party data is created or used, and pshtt
    downloads the latest data from the third party sources.
    """
    global PRELOAD_LIST, PRELOAD_PENDING, SUFFIX_LIST

    # Values supplied by the caller win over anything loaded below.
    # The preload list should be sent in as a list of domains.
    if init_preload_list is not None:
        PRELOAD_LIST = init_preload_list

    # The PRELOAD_PENDING list should be sent in as a list of domains.
    if init_preload_pending is not None:
        PRELOAD_PENDING = init_preload_pending

    # The public suffix list should be sent in as a list of file lines.
    if init_suffix_list is not None:
        SUFFIX_LIST = PublicSuffixList(init_suffix_list)

    # Cache file paths stay None unless a cache directory was specified.
    # They only matter for sources that have no data set yet.
    cache_preload_list = cache_preload_pending = cache_suffix_list = None
    if THIRD_PARTIES_CACHE:
        cache_preload_list = os.path.join(
            THIRD_PARTIES_CACHE, CACHE_PRELOAD_LIST_DEFAULT
        )
        cache_preload_pending = os.path.join(
            THIRD_PARTIES_CACHE, CACHE_PRELOAD_PENDING_DEFAULT
        )
        cache_suffix_list = os.path.join(THIRD_PARTIES_CACHE, CACHE_SUFFIX_LIST_DEFAULT)

    # Chrome's latest versioned HSTS preload list.
    if PRELOAD_LIST is None:
        if cache_preload_list and os.path.exists(cache_preload_list):
            utils.debug("Using cached Chrome preload list.", divider=True)
            with open(cache_preload_list, encoding="utf-8") as cached:
                PRELOAD_LIST = json.load(cached)
        else:
            PRELOAD_LIST = load_preload_list()

            # Save what was just loaded when a cache directory is in use.
            if cache_preload_list:
                utils.debug(
                    "Caching preload list at %s", cache_preload_list, divider=True
                )
                serialized = utils.json_for(PRELOAD_LIST)
                utils.write(serialized, cache_preload_list)

    # Chrome's current HSTS pending preload list.
    if PRELOAD_PENDING is None:
        if cache_preload_pending and os.path.exists(cache_preload_pending):
            utils.debug("Using cached hstspreload.org pending list.", divider=True)
            with open(cache_preload_pending, encoding="utf-8") as cached:
                PRELOAD_PENDING = json.load(cached)
        else:
            PRELOAD_PENDING = load_preload_pending()

            # Save what was just loaded when a cache directory is in use.
            if cache_preload_pending:
                utils.debug(
                    "Caching preload pending list at %s",
                    cache_preload_pending,
                    divider=True,
                )
                serialized = utils.json_for(PRELOAD_PENDING)
                utils.write(serialized, cache_preload_pending)

    # Mozilla's current Public Suffix list.
    if SUFFIX_LIST is None:
        if not cache_suffix_list:
            # Load the built-in PSL
            SUFFIX_LIST = load_suffix_list()
        else:
            # Retrieve the list if the path does not exist, otherwise use
            # the cached copy.
            needs_download = not os.path.exists(cache_suffix_list)
            SUFFIX_LIST = load_suffix_list(cache_suffix_list, needs_download)
1719

1720

1721
def inspect_domains(domains, options):
    """Run inspect() against each of the given domains with the given options.

    This is a generator: the option overrides and the loading of
    external data happen when iteration starts, and each domain is
    inspected only as its result is requested.
    """
    # Module-level settings that the options may override: timeout, user
    # agent, preload cache, default CA bundle.
    global TIMEOUT, USER_AGENT, THIRD_PARTIES_CACHE, CA_FILE, PT_INT_CA_FILE, STORE

    timeout = options.get("timeout")
    if timeout:
        TIMEOUT = int(timeout)

    user_agent = options.get("user_agent")
    if user_agent:
        USER_AGENT = user_agent

    # Supported cache flag, a directory to store all third party requests.
    cache_dir = options.get("cache-third-parties")
    if cache_dir:
        THIRD_PARTIES_CACHE = cache_dir

    ca_file = options.get("ca_file")
    if ca_file:
        CA_FILE = ca_file
        # By default, the store that we want to check is the Mozilla store
        # However, if a user wants to use their own CA bundle, check the
        # "Custom" Option from the sslyze output.
        STORE = "Custom"

    pt_int_ca_file = options.get("pt_int_ca_file")
    if pt_int_ca_file:
        PT_INT_CA_FILE = pt_int_ca_file

    # If this has been run once already by a Python API client, it
    # can be safely run without hitting the network or disk again,
    # and without overriding the data the Python user set for them.
    initialize_external_data()

    # Produce the inspection data one domain at a time.
    for domain in domains:
        yield inspect(domain)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc