• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / pshtt / 4126312479

pending completion
4126312479

push

github

jmorrowomni
Merge https://github.com/cisagov/skeleton-python-library into lineage/skeleton

58 of 287 branches covered (20.21%)

Branch coverage included in aggregate %.

358 of 865 relevant lines covered (41.39%)

0.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

29.62
/src/pshtt/pshtt.py
1
"""Provide the core functionality of the pshtt library."""
2

3
# Standard Python Libraries
4
import base64
2✔
5
import codecs
2✔
6
import json
2✔
7
import logging
2✔
8
import os
2✔
9
import re
2✔
10
import sys
2✔
11
from urllib import parse as urlparse
2✔
12

13
# Third-Party Libraries
14
import OpenSSL
2✔
15
from publicsuffixlist.compat import PublicSuffixList  # type: ignore
2✔
16
from publicsuffixlist.update import updatePSL  # type: ignore
2✔
17
import requests
2✔
18

19
# Unable to find type stubs for the sslyze package.
20
import sslyze  # type: ignore
2✔
21
from sslyze.server_connectivity_tester import (  # type: ignore
2✔
22
    ServerConnectivityError,
23
    ServerConnectivityTester,
24
)
25
import sslyze.synchronous_scanner  # type: ignore
2✔
26
import urllib3
2✔
27

28
from . import utils
2✔
29
from .models import Domain, Endpoint
2✔
30

31
# We're going to be making requests with certificate validation
32
# disabled.  Commented next line due to pylint warning that urllib3 is
33
# not in requests.packages
34
# requests.packages.urllib3.disable_warnings()
35
urllib3.disable_warnings()
2✔
36

37
# Default, overrideable via --user-agent
38
USER_AGENT = "pshtt, https scanning"
2✔
39

40
# Defaults to 5 seconds, overrideable via --timeout
41
TIMEOUT = 5
2✔
42

43
# The fields we're collecting, will be keys in JSON and
44
# column headers in CSV.
45
HEADERS = [
2✔
46
    "Domain",
47
    "Base Domain",
48
    "Canonical URL",
49
    "Live",
50
    "HTTPS Live",
51
    "HTTPS Full Connection",
52
    "HTTPS Client Auth Required",
53
    "Redirect",
54
    "Redirect To",
55
    "Valid HTTPS",
56
    "HTTPS Publicly Trusted",
57
    "HTTPS Custom Truststore Trusted",
58
    "Defaults to HTTPS",
59
    "Downgrades HTTPS",
60
    "Strictly Forces HTTPS",
61
    "HTTPS Bad Chain",
62
    "HTTPS Bad Hostname",
63
    "HTTPS Expired Cert",
64
    "HTTPS Self Signed Cert",
65
    "HSTS",
66
    "HSTS Header",
67
    "HSTS Max Age",
68
    "HSTS Entire Domain",
69
    "HSTS Preload Ready",
70
    "HSTS Preload Pending",
71
    "HSTS Preloaded",
72
    "Base Domain HSTS Preloaded",
73
    "Domain Supports HTTPS",
74
    "Domain Enforces HTTPS",
75
    "Domain Uses Strong HSTS",
76
    "IP",
77
    "Server Header",
78
    "Server Version",
79
    "HTTPS Cert Chain Length",
80
    "HTTPS Probably Missing Intermediate Cert",
81
    "Notes",
82
    "Unknown Error",
83
]
84

85
# Used for caching the HSTS preload list from Chromium's source.
86
CACHE_PRELOAD_LIST_DEFAULT = "preloaded.json"
2✔
87
PRELOAD_LIST = None
2✔
88

89
# Used for caching the HSTS pending preload list from hstspreload.org.
90
CACHE_PRELOAD_PENDING_DEFAULT = "preload-pending.json"
2✔
91
PRELOAD_PENDING = None
2✔
92

93
# Used for determining base domain via Mozilla's public suffix list.
94
CACHE_SUFFIX_LIST_DEFAULT = "public-suffix-list.txt"
2✔
95
SUFFIX_LIST = None
2✔
96

97
# Directory to cache all third party responses, if set by user.
98
THIRD_PARTIES_CACHE = None
2✔
99

100
# Set if user wants to use a custom CA bundle
101
CA_FILE = None
2✔
102
STORE = "Mozilla"
2✔
103
PT_INT_CA_FILE = None
2✔
104

105

106
def inspect(base_domain):
    """Inspect the provided domain.

    Builds a ``Domain`` with its four endpoints (http/https crossed with
    root/www), runs the basic connectivity check on each of them, runs the
    HSTS check on the two HTTPS endpoints, and returns the summarized
    result produced by ``result_for``.
    """
    domain = Domain(base_domain)

    # (attribute on the Domain, protocol, host prefix) for every endpoint,
    # listed in the order they are created and then probed.
    endpoint_specs = (
        ("http", "http", "root"),
        ("httpwww", "http", "www"),
        ("https", "https", "root"),
        ("httpswww", "https", "www"),
    )
    for attribute, protocol, host in endpoint_specs:
        setattr(domain, attribute, Endpoint(protocol, host, base_domain))

    # Analyze HTTP endpoint responsiveness and behavior.
    for attribute, _, _ in endpoint_specs:
        basic_check(getattr(domain, attribute))

    # Analyze HSTS header, if present, on each HTTPS endpoint.
    for https_endpoint in (domain.https, domain.httpswww):
        hsts_check(https_endpoint)

    return result_for(domain)
125

126

127
def result_for(domain):
    """Get the results for the provided domain.

    Selects the canonical endpoint for the domain, evaluates every
    reported field, attaches the raw per-endpoint data under the
    ``"endpoints"`` key, and finally normalizes ``None`` values to
    ``False`` for the fields where that is appropriate.

    Returns the resulting dictionary, keyed by the names in ``HEADERS``
    plus ``"endpoints"``.
    """
    # Many of the judgments below depend on it, so pick an acceptable
    # "canonical" URL for the domain before anything else.
    domain.canonical = canonical_endpoint(
        domain.http, domain.httpwww, domain.https, domain.httpswww
    )

    # The basic fields the CSV will use.
    result = {
        "Domain": domain.domain,
        "Base Domain": parent_domain_for(domain.domain),
        "Canonical URL": domain.canonical.url,
        "Live": is_live(domain),
        "Redirect": is_redirect_domain(domain),
        "Redirect To": redirects_to(domain),
        "HTTPS Live": is_https_live(domain),
        "HTTPS Full Connection": is_full_connection(domain),
        "HTTPS Client Auth Required": is_client_auth_required(domain),
        "Valid HTTPS": is_valid_https(domain),
        "HTTPS Publicly Trusted": is_publicly_trusted(domain),
        "HTTPS Custom Truststore Trusted": is_custom_trusted(domain),
        "Defaults to HTTPS": is_defaults_to_https(domain),
        "Downgrades HTTPS": is_downgrades_https(domain),
        "Strictly Forces HTTPS": is_strictly_forces_https(domain),
        "HTTPS Bad Chain": is_bad_chain(domain),
        "HTTPS Bad Hostname": is_bad_hostname(domain),
        "HTTPS Expired Cert": is_expired_cert(domain),
        "HTTPS Self Signed Cert": is_self_signed_cert(domain),
        "HTTPS Cert Chain Length": cert_chain_length(domain),
        "HTTPS Probably Missing Intermediate Cert": is_missing_intermediate_cert(
            domain
        ),
        "HSTS": is_hsts(domain),
        "HSTS Header": hsts_header(domain),
        "HSTS Max Age": hsts_max_age(domain),
        "HSTS Entire Domain": is_hsts_entire_domain(domain),
        "HSTS Preload Ready": is_hsts_preload_ready(domain),
        "HSTS Preload Pending": is_hsts_preload_pending(domain),
        "HSTS Preloaded": is_hsts_preloaded(domain),
        "Base Domain HSTS Preloaded": is_parent_hsts_preloaded(domain),
        "Domain Supports HTTPS": is_domain_supports_https(domain),
        "Domain Enforces HTTPS": is_domain_enforces_https(domain),
        "Domain Uses Strong HSTS": is_domain_strong_hsts(domain),
        "IP": get_domain_ip(domain),
        "Server Header": get_domain_server_header(domain),
        "Server Version": get_domain_server_version(domain),
        "Notes": get_domain_notes(domain),
        "Unknown Error": did_domain_error(domain),
    }

    # But also capture the extended data for those who want it.
    result["endpoints"] = domain.to_object()

    # Normalize None to False, field by field.  The groups below are the
    # exceptions to that rule; a field matching a group is handled by that
    # group alone and never reaches the generic None -> False conversion.

    # Never touched: None is the meaningful "absent" value for these.
    untouched_fields = ("HSTS Header", "HSTS Max Age", "Redirect To")
    # Left as-is (possibly None) when no full HTTPS connection was made.
    full_connection_fields = (
        "HSTS",
        "HSTS Entire Domain",
        "HSTS Preload Ready",
        "Domain Uses Strong HSTS",
    )
    # Informational values: kept whether they are None or populated.
    informational_fields = (
        "IP",
        "Server Header",
        "Server Version",
        "HTTPS Cert Chain Length",
    )
    # Forced to False when HTTPS is not live, otherwise left as computed
    # (which means a None survives when HTTPS is live).
    trust_fields = (
        "Valid HTTPS",
        "HTTPS Publicly Trusted",
        "HTTPS Custom Truststore Trusted",
    )

    for field in HEADERS:
        if field in untouched_fields:
            continue

        if field in full_connection_fields and not result["HTTPS Full Connection"]:
            continue

        if field in informational_fields:
            continue

        if field in trust_fields:
            if not result["HTTPS Live"]:
                result[field] = False
            continue

        if result[field] is None:
            result[field] = False

    return result
237

238

239
def ping(url, allow_redirects=False, verify=True):
    """Attempt to reach the given URL and return the response.

    When a custom CA file is configured (``CA_FILE``) and verification is
    requested, the CA file path is passed as ``verify`` instead of
    ``True``; the requests module then validates HTTPS connections
    against that .pem bundle.

    The request is made in streaming mode and the body is never read
    here.  Consequently the caller MUST call ``close()`` on the returned
    response; that is the only way the connection is closed and released
    back into the pool.  Using the response in a ``with`` statement is
    the simplest way to guarantee that.

    If response bodies are ever needed, they must be read explicitly via
    ``Response.content``, only where a body exists and is useful, and
    with care for Content-Type values such as
    ``multipart/x-mixed-replace;boundary=ffserver`` that indicate a body
    which streams indefinitely.
    """
    # Swap the boolean for the CA bundle path when one is configured.
    effective_verify = CA_FILE if (CA_FILE and verify) else verify

    request_options = {
        "allow_redirects": allow_redirects,
        # Validate certificates.
        "verify": effective_verify,
        # Streaming delays retrieval of the content until
        # Response.content is accessed.  We are not interested in the
        # content, so this saves time and bandwidth, and it also stops
        # pshtt from hanging on URLs that stream neverending data, like
        # webcams.  See issue #138:
        # https://github.com/dhs-ncats/pshtt/issues/138
        "stream": True,
        # set by --user_agent
        "headers": {"User-Agent": USER_AGENT},
        # set by --timeout
        "timeout": TIMEOUT,
    }
    return requests.get(url, **request_options)
285

286

287
def basic_check(endpoint):
    """Test the endpoint and record what was observed on it.

    At first:
    * Don't follow redirects. (Will only follow if necessary.)
      If it's a 3XX, we'll ping again to follow redirects. This is
      necessary to reliably scope any errors (e.g. TLS errors) to
      the original endpoint.

    * Validate certificates. (Will figure out error if necessary.)

    The function mutates ``endpoint`` in place (liveness, HTTPS
    connection flags, IP, headers, status, server header and redirect
    details) and returns ``None``.  Unexpected exceptions are logged and
    flagged through ``endpoint.unknown_error`` rather than propagated.
    """
    utils.debug("Pinging %s...", endpoint.url, divider=True)

    # Stays None unless some request actually produced a response.
    req = None

    try:
        with ping(endpoint.url) as req:
            endpoint.live = True
            if endpoint.protocol == "https":
                endpoint.https_full_connection = True
                endpoint.https_valid = True

    except requests.exceptions.SSLError as err:
        message = str(err)
        if "bad handshake" in message and (
            "sslv3 alert handshake failure" in message or "Unexpected EOF" in message
        ):
            logging.exception(
                "%s: Error completing TLS handshake usually due to required client authentication.",
                endpoint.url,
            )
            utils.debug("%s: %s", endpoint.url, err)
            endpoint.live = True
            if endpoint.protocol == "https":
                # The https can still be valid with a handshake error,
                # sslyze will run later and check if it is not valid
                endpoint.https_valid = True
                endpoint.https_full_connection = False

        else:
            logging.exception(
                "%s: Error connecting over SSL/TLS or validating certificate.",
                endpoint.url,
            )
            utils.debug("%s: %s", endpoint.url, err)
            # Retry with certificate validation disabled.
            try:
                with ping(endpoint.url, verify=False) as req:
                    endpoint.live = True
                    if endpoint.protocol == "https":
                        endpoint.https_full_connection = True
                        # sslyze later will actually check if the cert is valid
                        endpoint.https_valid = True
            except requests.exceptions.SSLError as retry_err:
                # If it's a protocol error or other, it's not a full connection,
                # but it is live.
                endpoint.live = True
                if endpoint.protocol == "https":
                    endpoint.https_full_connection = False
                    # HTTPS may still be valid, sslyze will double-check later
                    endpoint.https_valid = True
                logging.exception(
                    "%s: Unexpected SSL protocol (or other) error during retry.",
                    endpoint.url,
                )
                utils.debug("%s: %s", endpoint.url, retry_err)
                # continue on to SSLyze to check the connection
            except requests.exceptions.RequestException as retry_err:
                endpoint.live = False
                logging.exception(
                    "%s: Unexpected requests exception during retry.", endpoint.url
                )
                utils.debug("%s: %s", endpoint.url, retry_err)
                return
            except OpenSSL.SSL.Error as retry_err:
                endpoint.live = False
                logging.exception(
                    "%s: Unexpected OpenSSL exception during retry.", endpoint.url
                )
                utils.debug("%s: %s", endpoint.url, retry_err)
                return
            except Exception as retry_err:
                endpoint.unknown_error = True
                logging.exception(
                    "%s: Unexpected other unknown exception during requests retry.",
                    endpoint.url,
                )
                utils.debug("%s: %s", endpoint.url, retry_err)
                return

        # If it was a certificate error of any kind, it's live,
        # unless SSLyze encounters a connection error later
        endpoint.live = True

    except requests.exceptions.ConnectionError as err:
        # We can get this for some endpoints that are actually live,
        # so if it's https let's try sslyze to be sure
        if endpoint.protocol == "https":
            # https check later will set whether the endpoint is live and valid
            endpoint.https_full_connection = False
            endpoint.https_valid = True
        else:
            endpoint.live = False
        logging.exception("%s: Error connecting.", endpoint.url)
        utils.debug("%s: %s", endpoint.url, err)

    # And this is the parent of ConnectionError and other things.
    # For example, "too many redirects".
    # See https://github.com/kennethreitz/requests/blob/master/requests/exceptions.py
    except requests.exceptions.RequestException as err:
        endpoint.live = False
        logging.exception("%s: Unexpected other requests exception.", endpoint.url)
        utils.debug("%s: %s", endpoint.url, err)
        return

    except Exception as err:
        endpoint.unknown_error = True
        logging.exception(
            "%s: Unexpected other unknown exception during initial request.",
            endpoint.url,
        )
        utils.debug("%s: %s", endpoint.url, err)
        return

    # Run SSLyze to see if there are any errors
    if endpoint.protocol == "https":
        https_check(endpoint)
        # Double-check in case sslyze failed the first time, but the regular
        # connection succeeded
        if endpoint.live is False and req is not None:
            logging.warning(
                "%s: Trying sslyze again since it connected once already.", endpoint.url
            )
            endpoint.live = True
            endpoint.https_valid = True
            https_check(endpoint)
            if endpoint.live is False:
                # sslyze failed so back everything out and don't continue
                # analyzing the existing response
                req = None
                endpoint.https_valid = False
                endpoint.https_full_connection = False

    if req is None:
        # Ensure that full_connection is set to False if we didn't get a response
        if endpoint.protocol == "https":
            endpoint.https_full_connection = False
        return

    # try to get IP address if we can
    _record_peer_ip(endpoint, req)

    # Endpoint is live, analyze the response.
    endpoint.headers = req.headers

    endpoint.status = req.status_code

    server_header = req.headers.get("Server")
    if server_header is not None:
        endpoint.server_header = server_header
        # *** in the future add logic to convert header to server version if known

    if (req.headers.get("Location") is not None) and str(endpoint.status).startswith(
        "3"
    ):
        endpoint.redirect = True
        logging.warning("%s: Found redirect.", endpoint.url)

    if endpoint.redirect:
        _analyze_redirect(endpoint, req)


def _record_peer_ip(endpoint, req):
    """Best-effort capture of the peer IP address behind ``req``.

    Sets ``endpoint.ip`` when it is still unset; when it is already set
    to a different address the mismatch is only reported via
    ``utils.debug``.  Any failure while reaching into the underlying
    socket is logged and otherwise ignored.
    """
    try:
        if req.raw.closed is False:
            ip = req.raw._connection.sock.socket.getpeername()[0]
            if endpoint.ip is None:
                endpoint.ip = ip
            elif endpoint.ip != ip:
                utils.debug(
                    "%s: Endpoint IP is already %s, but requests IP is %s.",
                    endpoint.url,
                    endpoint.ip,
                    ip,
                )
    except Exception:
        # Reaching into the socket can fail (e.g. it is already gone);
        # this is just best effort, so log it and carry on.
        logging.exception(
            "%s: Unable to determine the peer IP address.", endpoint.url
        )


def _analyze_redirect(endpoint, req):
    """Record where a redirecting endpoint sends clients.

    ``req`` is the non-followed response for ``endpoint``.  The immediate
    target is derived from its ``Location`` header; the eventual target
    is found by re-requesting the endpoint with redirects followed and
    certificate validation disabled.  Results are stored in the
    ``redirect_immediately_to*`` and ``redirect_eventually_to*``
    attributes of ``endpoint``.
    """
    try:
        location_header = req.headers.get("Location")
        # Absolute redirects (e.g. "https://example.com/Index.aspx")
        if location_header.startswith(("http:", "https:")):
            immediate = location_header

        # Relative redirects (e.g. "Location: /Index.aspx").
        # Construct absolute URI, relative to original request.
        else:
            immediate = urlparse.urljoin(endpoint.url, location_header)
    except Exception as err:
        endpoint.unknown_error = True
        logging.exception(
            "%s: Unexpected other unknown exception when handling Requests Header.",
            endpoint.url,
        )
        utils.debug("%s %s", endpoint.url, err)
        # Nothing below can be established without the immediate target,
        # so stop here instead of issuing another request and then
        # failing on the undefined value.
        return

    # Chase down the ultimate destination, ignoring any certificate warnings.
    ultimate_req = None
    try:
        with ping(endpoint.url, allow_redirects=True, verify=False) as ultimate_req:
            pass
    except (requests.exceptions.RequestException, OpenSSL.SSL.Error):
        # Swallow connection errors, but we won't be saving redirect info.
        logging.exception("Connection error")
    except Exception as err:
        endpoint.unknown_error = True
        logging.exception(
            "%s: Unexpected other unknown exception when handling redirect.",
            endpoint.url,
        )
        utils.debug("%s: %s", endpoint.url, err)
        return

    try:
        # Now establish whether the redirects were:
        # * internal (same exact hostname),
        # * within the zone (any subdomain within the parent domain)
        # * external (on some other parent domain)

        # The hostname of the endpoint (e.g. "www.agency.gov")
        subdomain_original = urlparse.urlparse(endpoint.url).hostname
        # The parent domain of the endpoint (e.g. "agency.gov")
        base_original = parent_domain_for(subdomain_original)

        # The hostname of the immediate redirect.
        # The parent domain of the immediate redirect.
        subdomain_immediate = urlparse.urlparse(immediate).hostname
        base_immediate = parent_domain_for(subdomain_immediate)

        endpoint.redirect_immediately_to = immediate
        endpoint.redirect_immediately_to_https = immediate.startswith("https://")
        endpoint.redirect_immediately_to_http = immediate.startswith("http://")
        endpoint.redirect_immediately_to_external = base_original != base_immediate
        endpoint.redirect_immediately_to_subdomain = (
            base_original == base_immediate
        ) and (subdomain_original != subdomain_immediate)

        # We're interested in whether an endpoint redirects to the www version
        # of itself (not whether it redirects to www prepended to any other
        # hostname, even within the same parent domain).
        endpoint.redirect_immediately_to_www = subdomain_immediate == (
            f"www.{subdomain_original}"
        )

        if ultimate_req is not None:
            # For ultimate destination, use the URL we arrived at,
            # not Location header. Auto-resolves relative redirects.
            eventual = ultimate_req.url

            # The hostname of the eventual destination.
            # The parent domain of the eventual destination.
            subdomain_eventual = urlparse.urlparse(eventual).hostname
            base_eventual = parent_domain_for(subdomain_eventual)

            endpoint.redirect_eventually_to = eventual
            endpoint.redirect_eventually_to_https = eventual.startswith("https://")
            endpoint.redirect_eventually_to_http = eventual.startswith("http://")
            endpoint.redirect_eventually_to_external = (
                base_original != base_eventual
            )
            endpoint.redirect_eventually_to_subdomain = (
                base_original == base_eventual
            ) and (subdomain_original != subdomain_eventual)

        # If we were able to make the first redirect, but not the ultimate redirect,
        # and if the immediate redirect is external, then it's accurate enough to
        # say that the eventual redirect is the immediate redirect, since you're capturing
        # the domain it's going to.
        # This also avoids "punishing" the domain for configuration issues of the site
        # it redirects to.
        elif endpoint.redirect_immediately_to_external:
            endpoint.redirect_eventually_to = endpoint.redirect_immediately_to
            endpoint.redirect_eventually_to_https = (
                endpoint.redirect_immediately_to_https
            )
            endpoint.redirect_eventually_to_http = (
                endpoint.redirect_immediately_to_http
            )
            endpoint.redirect_eventually_to_external = (
                endpoint.redirect_immediately_to_external
            )
            endpoint.redirect_eventually_to_subdomain = (
                endpoint.redirect_immediately_to_subdomain
            )
    except Exception as err:
        endpoint.unknown_error = True
        logging.exception(
            "%s: Unexpected other unknown exception when establishing redirects.",
            endpoint.url,
        )
        utils.debug("%s: %s", endpoint.url, err)
583

584

585
def hsts_check(endpoint):
    """Perform an HSTS check of the given endpoint.

    Given an endpoint and its detected headers, extract and parse
    any present HSTS header, decide what HSTS properties are there.

    Disqualify domains with a bad host, they won't work as valid HSTS.

    The endpoint is updated in place:

    * ``hsts`` - True only when a Strict-Transport-Security header with
      a positive, parseable ``max-age`` is present.
    * ``hsts_header`` - the raw header value, when present.
    * ``hsts_max_age`` - the parsed ``max-age`` as an int, when parseable.
    * ``hsts_all_subdomains`` / ``hsts_preload`` - set to True when the
      corresponding flag appears in the header.

    Unexpected exceptions are logged and flagged through
    ``endpoint.unknown_error``; nothing is raised to the caller.
    """
    try:
        if endpoint.https_bad_hostname:
            endpoint.hsts = False
            return

        header = endpoint.headers.get("Strict-Transport-Security")

        if header is None:
            endpoint.hsts = False
            return

        endpoint.hsts = True
        endpoint.hsts_header = header

        # handle multiple HSTS headers, requests comma-separates them;
        # only the first one is considered.
        first_header = re.split(r",\s?", header)[0]
        # Some servers wrap the value (or parts of it) in single quotes.
        first_header = first_header.replace("'", "")

        # Directives may come in any order and their names are
        # case-insensitive, so look the max-age directive up by name
        # instead of assuming it is the first one.
        for directive in first_header.split(";"):
            name, _, value = directive.partition("=")
            if name.strip().lower() != "max-age":
                continue
            try:
                # The value may legitimately be a quoted string.
                endpoint.hsts_max_age = int(value.strip().strip('"'))
            except ValueError:
                logging.warning(
                    "%s: Unable to parse max-age from HSTS header: %s",
                    endpoint.url,
                    header,
                )
                # A header without a usable max-age is not valid HSTS.
                endpoint.hsts = False
                return
            break

        if endpoint.hsts_max_age is None or endpoint.hsts_max_age <= 0:
            endpoint.hsts = False
            return

        lowered_header = header.lower()

        # check if hsts includes sub domains
        if "includesubdomains" in lowered_header:
            endpoint.hsts_all_subdomains = True

        # Check if hsts has the preload flag
        if "preload" in lowered_header:
            endpoint.hsts_preload = True
    except Exception as err:
        endpoint.unknown_error = True
        logging.exception(
            "%s: Unknown exception when handling HSTS check.", endpoint.url
        )
        utils.debug("%s: %s", endpoint.url, err)
        return
637

638

639
def _flag_cert_scan_failure(endpoint, err):
    """Log and record an unexplained failure of the sslyze certificate scan."""
    # Called from inside an ``except`` block, so logging.exception() still
    # reports the exception that is currently being handled.
    logging.exception(
        "%s: Unknown exception in sslyze scanner certificate plugin.",
        endpoint.url,
    )
    utils.debug("%s: %s", endpoint.url, err)
    endpoint.unknown_error = True
    # We could make this False, but there was an error so we don't know.
    endpoint.https_valid = None


def https_check(endpoint):
    """Use sslyze to figure out the reason an endpoint failed to verify.

    The check proceeds in stages, each of which records its findings as
    attributes on ``endpoint``:

    1. Connectivity test against port 443 (sets ``live``, ``ip`` and
       ``https_client_auth_required``).
    2. Certificate-info scan using ``CA_FILE``; a failure whose message
       contains "timed out" is retried once.
    3. Trust evaluation from the path validation results (sets
       ``https_public_trusted`` and ``https_custom_trusted``).
    4. Inspection of the textual scan report (sets ``https_expired_cert``,
       ``https_self_signed_cert``, ``https_bad_chain`` and
       ``https_bad_hostname``).
    5. Certificate chain length / missing-intermediate heuristics, with an
       optional public-trust recheck against ``PT_INT_CA_FILE``.

    ``CA_FILE``, ``STORE`` and ``PT_INT_CA_FILE`` are module-level names
    defined outside this function.

    Nothing is returned and no exception is propagated from the sslyze
    calls: failures are logged and flagged on the endpoint (for example
    via ``unknown_error``), after which the function returns early.
    """
    utils.debug("sslyzing %s...", endpoint.url)

    # Remove the https:// prefix (8 characters) for sslyze.
    try:
        hostname = endpoint.url[8:]
        server_tester = ServerConnectivityTester(hostname=hostname, port=443)
        server_info = server_tester.perform()
        endpoint.live = True
        ip = server_info.ip_address
        if endpoint.ip is None:
            endpoint.ip = ip
        elif endpoint.ip != ip:
            utils.debug(
                "%s: Endpoint IP is already %s, but requests IP is %s.",
                endpoint.url,
                endpoint.ip,
                ip,
            )
        if server_info.client_auth_requirement.name == "REQUIRED":
            endpoint.https_client_auth_required = True
            logging.warning("%s: Client Authentication REQUIRED", endpoint.url)
    except ServerConnectivityError as err:
        endpoint.live = False
        endpoint.https_valid = False
        logging.exception(
            "%s: Error in sslyze server connectivity check when connecting to %s",
            endpoint.url,
            err.server_info.hostname,
        )
        utils.debug("%s: %s", endpoint.url, err)
        return
    except Exception as err:
        endpoint.unknown_error = True
        logging.exception(
            "%s: Unknown exception in sslyze server connectivity check.", endpoint.url
        )
        utils.debug("%s: %s", endpoint.url, err)
        return

    try:
        cert_plugin_result = None
        command = sslyze.plugins.certificate_info_plugin.CertificateInfoScanCommand(
            ca_file=CA_FILE
        )
        scanner = sslyze.synchronous_scanner.SynchronousScanner()
        cert_plugin_result = scanner.run_scan_command(server_info, command)
    except Exception as err:
        # Only a timeout is worth a second attempt; anything else is an
        # unknown failure.
        if "timed out" not in str(err):
            _flag_cert_scan_failure(endpoint, err)
            return
        try:
            logging.exception(
                "%s: Retrying sslyze scanner certificate plugin.", endpoint.url
            )
            cert_plugin_result = scanner.run_scan_command(server_info, command)
        except Exception:
            # The retry failed too (or the scanner was never created).
            _flag_cert_scan_failure(endpoint, err)
            return

    try:
        # We're assuming that the certificate is trusted to start with;
        # every failed validation clears the matching flag.
        public_trust = True
        custom_trust = True
        public_not_trusted_names = []
        validation_results = cert_plugin_result.path_validation_result_list
        for result in validation_results:
            if result.was_validation_successful:
                continue
            if "Custom" in result.trust_store.name:
                custom_trust = False
            else:
                public_trust = False
                public_not_trusted_names.append(result.trust_store.name)
        if public_trust:
            logging.warning(
                "%s: Publicly trusted by common trust stores.", endpoint.url
            )
        else:
            logging.warning(
                "%s: Not publicly trusted - not trusted by %s.",
                endpoint.url,
                ", ".join(public_not_trusted_names),
            )
        if CA_FILE is not None:
            if custom_trust:
                logging.warning("%s: Trusted by custom trust store.", endpoint.url)
            else:
                logging.warning("%s: Not trusted by custom trust store.", endpoint.url)
        else:
            # Without a custom CA file, custom trust is undetermined.
            custom_trust = None
        endpoint.https_public_trusted = public_trust
        endpoint.https_custom_trusted = custom_trust
    except Exception as err:
        # Ignore exception: trust results are simply left unset.
        logging.exception("%s: Unknown exception examining trust.", endpoint.url)
        utils.debug("%s: Unknown exception examining trust: %s", endpoint.url, err)

    try:
        cert_response = cert_plugin_result.as_text()
    except AttributeError:
        logging.exception(
            "%s: Known error in sslyze 1.X with EC public keys. See https://github.com/nabla-c0d3/sslyze/issues/215",
            endpoint.url,
        )
        return
    except Exception as err:
        endpoint.unknown_error = True
        logging.exception("%s: Unknown exception in cert plugin.", endpoint.url)
        utils.debug("%s: %s", endpoint.url, err)
        return

    # Default endpoint assessments to False until proven True.
    endpoint.https_expired_cert = False
    endpoint.https_self_signed_cert = False
    endpoint.https_bad_chain = False
    endpoint.https_bad_hostname = False

    # STORE will be either "Mozilla" or "Custom"
    # depending on what the user chose.

    # A certificate can have multiple issues.
    for msg in cert_response:
        # Shared precondition of the three trust-store checks below.
        store_failed = STORE in msg and "FAILED" in msg
        self_signed = "self signed certificate" in msg

        # Check for missing SAN.
        if "DNS Subject Alternative Names" in msg and "[]" in msg:
            endpoint.https_bad_hostname = True

        # Check for certificate expiration.
        if store_failed and "certificate has expired" in msg:
            endpoint.https_expired_cert = True

        # Check to see if the cert is self-signed.
        if store_failed and self_signed:
            endpoint.https_self_signed_cert = True

        # Check to see if there is a bad chain.
        #
        # NOTE: If this is the only flag that's set, it's probably
        # an incomplete chain.
        # If this isn't the only flag that is set, it might be
        # because there is another error. More debugging would
        # need to be done at this point, but not through sslyze
        # because sslyze doesn't have enough granularity.
        if store_failed and (
            "unable to get local issuer certificate" in msg or self_signed
        ):
            endpoint.https_bad_chain = True

        # Check for whether the hostname validates.
        if (
            "Hostname Validation" in msg
            and "FAILED" in msg
            and "Certificate does NOT match" in msg
        ):
            endpoint.https_bad_hostname = True

    try:
        endpoint.https_cert_chain_len = len(
            cert_plugin_result.received_certificate_chain
        )
        if endpoint.https_self_signed_cert is False and (
            endpoint.https_cert_chain_len < 2
        ):
            # *** TODO check that it is not a bad hostname and that the root cert is trusted before suggesting that it is an intermediate cert issue.
            endpoint.https_missing_intermediate_cert = True
            if cert_plugin_result.verified_certificate_chain is None:
                logging.warning(
                    "%s: Untrusted certificate chain, probably due to missing intermediate certificate.",
                    endpoint.url,
                )
                utils.debug(
                    "%s: Only %d certificates in certificate chain received.",
                    endpoint.url,
                    len(cert_plugin_result.received_certificate_chain),
                )
            elif (
                custom_trust is True
                and public_trust is False
                and PT_INT_CA_FILE is not None
            ):
                # Recheck public trust using the custom public trust store
                # with manually added intermediate certificates.
                try:
                    command = sslyze.plugins.certificate_info_plugin.CertificateInfoScanCommand(
                        ca_file=PT_INT_CA_FILE
                    )
                    recheck_result = scanner.run_scan_command(server_info, command)
                    if recheck_result.verified_certificate_chain is not None:
                        public_trust = True
                        endpoint.https_public_trusted = public_trust
                        logging.warning(
                            "%s: Trusted by special public trust store with intermediate certificates.",
                            endpoint.url,
                        )
                except Exception:
                    logging.exception("Error while rechecking public trust")
        else:
            endpoint.https_missing_intermediate_cert = False
    except Exception:
        logging.exception("Error while determining length of certificate chain")

    # If anything is wrong then https is not valid.
    if (
        endpoint.https_expired_cert
        or endpoint.https_self_signed_cert
        or endpoint.https_bad_chain
        or endpoint.https_bad_hostname
    ):
        endpoint.https_valid = False
886

887

888
def canonical_endpoint(http, httpwww, https, httpswww):
    """Make a best guess for the "canonical" endpoint of a domain.

    Given the observed behavior of the four endpoints, decide which one is
    the "canonical" site for the domain and return that endpoint object.
    Most of the domain-level decisions rely on this guess in some way.

    A domain is "canonically" at www if:
     * at least one of its www endpoints responds
     * both root endpoints are either down or redirect *somewhere*
     * either both root endpoints are down, *or* at least one
       root endpoint redirect should immediately go to
       an *internal* www endpoint
    This is meant to affirm situations like:
      http:// -> https:// -> https://www
      https:// -> http:// -> https://www
    and meant to avoid affirming situations like:
      http:// -> http://non-www,
      http://www -> http://non-www
    or like:
      https:// -> 200, http:// -> http://www

    A domain is "canonically" at https if:
     * at least one of its https endpoints is live and
       doesn't have an invalid hostname
     * both http endpoints are either down or redirect *somewhere*
     * both http endpoints are down, *or* at least one http endpoint
       redirects immediately to an *internal* https endpoint
    This is meant to affirm situations like:
      http:// -> http://www -> https://
      https:// -> http:// -> https://www
    and meant to avoid affirming situations like:
      http:// -> http://non-www
      http://www -> http://non-www
    or:
      http:// -> 200, http://www -> https://www

    It allows a site to be canonically HTTPS if the cert has
    a valid hostname but invalid chain issues.
    """

    def not_serving(endpoint):
        # Redirecting, not live, or not answering with a 2xx status.
        return (
            endpoint.redirect
            or not endpoint.live
            or not str(endpoint.status).startswith("2")
        )

    def root_is_unused(endpoint):
        # The bad-hostname test is harmless for http endpoints.
        return not_serving(endpoint) or endpoint.https_bad_hostname

    def root_is_down(endpoint):
        # Down means: not live, bad hostname, or neither a 2xx nor a 3xx.
        return (
            not endpoint.live
            or endpoint.https_bad_hostname
            or not str(endpoint.status).startswith(("2", "3"))
        )

    def serves_https(endpoint):
        return endpoint.live and not endpoint.https_bad_hostname

    def upgrades_internally(endpoint):
        return (
            endpoint.redirect_immediately_to_https
            and not endpoint.redirect_immediately_to_external
        )

    roots = (https, http)
    plain = (http, httpwww)

    # --- www or root? ---
    www_in_use = httpswww.live or httpwww.live
    roots_unused = all(root_is_unused(root) for root in roots)
    roots_down = all(root_is_down(root) for root in roots)
    root_points_at_www = (
        https.redirect_immediately_to_www or http.redirect_immediately_to_www
    )
    prefers_www = www_in_use and roots_unused and (roots_down or root_points_at_www)

    # --- https or http? ---
    https_in_use = serves_https(https) or serves_https(httpswww)
    plain_unused = all(not_serving(endpoint) for endpoint in plain)
    plain_down = not (http.live or httpwww.live)
    plain_upgrades = any(upgrades_internally(endpoint) for endpoint in plain)
    prefers_https = https_in_use and plain_unused and (plain_down or plain_upgrades)

    if prefers_www:
        return httpswww if prefers_https else httpwww
    return https if prefers_https else http
997

998

999
##
1000
# Judgment calls based on observed endpoint data.
1001
##
1002

1003

1004
def is_live(domain):
    """Report whether at least one of a domain's four endpoints is live.

    The endpoints are consulted in the order http, httpwww, https, httpswww.
    The first truthy ``live`` value is returned; when none is truthy the
    ``live`` value of the last endpoint is returned.
    """
    return (
        domain.http.live
        or domain.httpwww.live
        or domain.https.live
        or domain.httpswww.live
    )
1014

1015

1016
def is_https_live(domain):
    """Report whether either HTTPS endpoint (root or www) of a domain is live."""
    root_live = domain.https.live
    if root_live:
        return root_live
    return domain.httpswww.live
1021

1022

1023
def is_full_connection(domain):
    """Report whether a domain is "fully connected".

    That is the case when either HTTPS endpoint (root first, then www)
    has a truthy ``https_full_connection``.
    """
    root_full = domain.https.https_full_connection
    if root_full:
        return root_full
    return domain.httpswww.https_full_connection
1031

1032

1033
def is_client_auth_required(domain):
    """Report whether a domain requires client authentication.

    The domain requires it as soon as *any* of its HTTPS endpoints (root
    first, then www) has a truthy ``https_client_auth_required``.
    """
    root_required = domain.https.https_client_auth_required
    if root_required:
        return root_required
    return domain.httpswww.https_client_auth_required
1042

1043

1044
def is_redirect_or_down(endpoint):
    """Report whether an endpoint redirects externally or is down.

    An endpoint qualifies when it eventually redirects to an external
    site, or when it is down in any of three ways: it is not live, it is
    an HTTPS endpoint whose certificate has a bad hostname, or it responds
    with a status code of 400 or above.
    """
    goes_external = endpoint.redirect_eventually_to_external
    if goes_external:
        return goes_external

    if not endpoint.live:
        return True

    if endpoint.protocol == "https" and endpoint.https_bad_hostname:
        return endpoint.https_bad_hostname

    code = endpoint.status
    return code is not None and code >= 400
1057

1058

1059
def is_redirect(endpoint):
    """Return the endpoint's ``redirect_eventually_to_external`` flag.

    In other words: does this endpoint end up redirecting to an external
    site?
    """
    goes_external = endpoint.redirect_eventually_to_external
    return goes_external
1062

1063

1064
def is_redirect_domain(domain):
    """Report whether a domain merely redirects HTTP and HTTPS traffic.

    A domain is "a redirect domain" if it is live, at least one of its
    endpoints is a redirect, and every endpoint is either a redirect or
    down.
    """
    live = is_live(domain)
    if not live:
        return live

    http = domain.http
    httpwww = domain.httpwww
    https = domain.https
    httpswww = domain.httpswww

    some_redirect = (
        is_redirect(http)
        or is_redirect(httpwww)
        or is_redirect(https)
        or is_redirect(httpswww)
    )
    if not some_redirect:
        return some_redirect

    return (
        is_redirect_or_down(https)
        and is_redirect_or_down(httpswww)
        and is_redirect_or_down(httpwww)
        and is_redirect_or_down(http)
    )
1089

1090

1091
def is_http_redirect_domain(domain):
    """Report whether a domain merely redirects its HTTP traffic.

    A domain is "an http redirect domain" if it is live, at least one of
    its HTTP endpoints is a redirect, and both HTTP endpoints are either
    redirects or down.
    """
    live = is_live(domain)
    if not live:
        return live

    http = domain.http
    httpwww = domain.httpwww

    some_redirect = is_redirect(http) or is_redirect(httpwww)
    if not some_redirect:
        return some_redirect

    return is_redirect_or_down(httpwww) and is_redirect_or_down(http)
1108

1109

1110
def redirects_to(domain):
    """Return where a redirect domain ends up, or None.

    For a "redirect domain" this is the canonical endpoint's
    ``redirect_eventually_to``; any other domain yields None.
    """
    if not is_redirect_domain(domain):
        return None
    return domain.canonical.redirect_eventually_to
1120

1121

1122
def is_valid_https(domain):
    """Report whether a domain has a valid HTTPS server.

    The HTTPS endpoint of the canonical hostname is evaluated: the root
    HTTPS endpoint when the canonical host is "root", the www HTTPS
    endpoint otherwise.  The domain has "valid HTTPS" if that endpoint is
    live and its ``https_valid`` is truthy.
    """
    if domain.canonical.host == "root":
        target = domain.https
    else:
        target = domain.httpswww

    live = target.live
    if not live:
        return live
    return target.https_valid
1134

1135

1136
def is_defaults_to_https(domain):
    """Report whether a domain defaults to HTTPS.

    That is the case exactly when the protocol of the domain's canonical
    endpoint is "https".
    """
    protocol = domain.canonical.protocol
    return protocol == "https"
1144

1145

1146
def is_downgrades_https(domain):
    """Report whether a domain allows downgrading HTTPS.

    A domain downgrades if HTTPS is supported in some way, but its
    canonical HTTPS endpoint immediately redirects internally to HTTP.
    The result is always a real bool, never None.
    """
    root = domain.https
    www = domain.httpswww

    # The domain "supports" HTTPS if any HTTPS endpoint responds with
    # a certificate valid for its hostname.
    root_supported = root.live and not root.https_bad_hostname
    www_supported = www.live and not www.https_bad_hostname
    if not (root_supported or www_supported):
        return False

    target = www if domain.canonical.host == "www" else root

    if not target.redirect_immediately_to_http:
        return False
    # An immediate redirect to an external site is not a downgrade.
    return not target.redirect_immediately_to_external
1169

1170

1171
def is_strictly_forces_https(domain):
    """Report whether a domain strictly forces HTTPS.

    A domain "Strictly Forces HTTPS" if one of the HTTPS endpoints is
    "live", and if both *HTTP* endpoints are either:

     * down, or
     * redirect immediately to an HTTPS URI.

    This is different than whether a domain "Defaults" to HTTPS.

    * An HTTP redirect can go to HTTPS on another domain, as long
      as it's immediate.
    * A domain with an invalid cert can still be enforcing HTTPS.
    """

    def forced(http_endpoint):
        # Either nothing answers, or the answer is an immediate HTTPS redirect.
        return not http_endpoint.live or http_endpoint.redirect_immediately_to_https

    https_available = domain.https.live or domain.httpswww.live
    if not https_available:
        return https_available

    return forced(domain.http) and forced(domain.httpwww)
1200

1201

1202
def is_publicly_trusted(domain):
    """Report whether a domain has a publicly trusted certificate.

    The HTTPS endpoint of the canonical hostname is evaluated: the root
    HTTPS endpoint when the canonical host is "root", the www HTTPS
    endpoint otherwise.  The certificate counts as "Publicly Trusted" if
    that endpoint is live and its ``https_public_trusted`` is truthy.
    """
    if domain.canonical.host == "root":
        target = domain.https
    else:
        target = domain.httpswww

    live = target.live
    if not live:
        return live
    return target.https_public_trusted
1214

1215

1216
def is_custom_trusted(domain):
    """Report whether a domain has a custom trusted certificate.

    The HTTPS endpoint of the canonical hostname is evaluated: the root
    HTTPS endpoint when the canonical host is "root", the www HTTPS
    endpoint otherwise.  The certificate counts as "Custom Trusted" if
    that endpoint is live and its ``https_custom_trusted`` (trust by the
    custom truststore) is truthy.
    """
    if domain.canonical.host == "root":
        target = domain.https
    else:
        target = domain.httpswww

    live = target.live
    if not live:
        return live
    return target.https_custom_trusted
1229

1230

1231
def is_bad_chain(domain):
    """Return the bad-chain flag of a domain's canonical HTTPS endpoint.

    The www HTTPS endpoint is used when the canonical host is "www",
    the root HTTPS endpoint otherwise.
    """
    if domain.canonical.host == "www":
        return domain.httpswww.https_bad_chain
    return domain.https.https_bad_chain
1242

1243

1244
def is_bad_hostname(domain):
    """Return the bad-hostname flag of a domain's canonical HTTPS endpoint.

    The www HTTPS endpoint is used when the canonical host is "www",
    the root HTTPS endpoint otherwise.
    """
    if domain.canonical.host == "www":
        return domain.httpswww.https_bad_hostname
    return domain.https.https_bad_hostname
1255

1256

1257
def is_expired_cert(domain):
    """Return the expired-certificate flag of a domain's canonical HTTPS endpoint.

    The www HTTPS endpoint is used when the canonical host is "www",
    the root HTTPS endpoint otherwise.
    """
    if domain.canonical.host == "www":
        return domain.httpswww.https_expired_cert
    return domain.https.https_expired_cert
1264

1265

1266
def is_self_signed_cert(domain):
    """Return the self-signed flag of a domain's canonical HTTPS endpoint.

    The www HTTPS endpoint is used when the canonical host is "www",
    the root HTTPS endpoint otherwise.
    """
    if domain.canonical.host == "www":
        return domain.httpswww.https_self_signed_cert
    return domain.https.https_self_signed_cert
1273

1274

1275
def cert_chain_length(domain):
    """Return the certificate chain length of a domain's canonical HTTPS endpoint.

    The www HTTPS endpoint is used when the canonical host is "www",
    the root HTTPS endpoint otherwise.
    """
    if domain.canonical.host == "www":
        return domain.httpswww.https_cert_chain_len
    return domain.https.https_cert_chain_len
1282

1283

1284
def is_missing_intermediate_cert(domain):
    """Return the missing-intermediate flag of a domain's canonical HTTPS endpoint.

    The flag tells whether the served certificate chain is probably
    missing the needed intermediate certificate.  The www HTTPS endpoint
    is used when the canonical host is "www", the root HTTPS endpoint
    otherwise.
    """
    if domain.canonical.host == "www":
        return domain.httpswww.https_missing_intermediate_cert
    return domain.https.https_missing_intermediate_cert
1295

1296

1297
def is_hsts(domain):
    """Return the HSTS flag of a domain's canonical HTTPS endpoint.

    The www HTTPS endpoint is used when the canonical host is "www",
    the root HTTPS endpoint otherwise.
    """
    if domain.canonical.host == "www":
        return domain.httpswww.hsts
    return domain.https.hsts
1307

1308

1309
def hsts_header(domain):
    """Return the HSTS header of a domain's canonical HTTPS endpoint.

    The www HTTPS endpoint is used when the canonical host is "www",
    the root HTTPS endpoint otherwise.
    """
    if domain.canonical.host == "www":
        return domain.httpswww.hsts_header
    return domain.https.hsts_header
1316

1317

1318
def hsts_max_age(domain):
    """Return the HSTS max-age of a domain's canonical HTTPS endpoint.

    The www HTTPS endpoint is used when the canonical host is "www",
    the root HTTPS endpoint otherwise.
    """
    if domain.canonical.host == "www":
        return domain.httpswww.hsts_max_age
    return domain.https.hsts_max_age
1325

1326

1327
def is_hsts_entire_domain(domain):
    """Return whether the ROOT HTTPS endpoint's HSTS covers all subdomains.

    Only ``domain.https`` is consulted; the canonical endpoint plays no
    role here.
    """
    root_https = domain.https
    return root_https.hsts_all_subdomains
1332

1333

1334
def is_hsts_preload_ready(domain):
    """Report whether a domain's ROOT HTTPS endpoint is HSTS preload-ready.

    The root endpoint is preload-ready when its HSTS max-age is at least
    eighteen weeks (10886400 seconds), its HSTS policy includes all
    subdomains, and the preload flag is set.
    """
    root_https = domain.https
    max_age = root_https.hsts_max_age

    # No max-age at all, or one shorter than eighteen weeks, disqualifies.
    if max_age is None:
        return False
    long_enough = max_age >= 10886400
    if not long_enough:
        return long_enough

    return root_https.hsts_all_subdomains and root_https.hsts_preload
1344

1345

1346
def is_hsts_preload_pending(domain):
    """Check if a domain is pending inclusion in Chrome's HSTS preload list.

    The lookup is a membership test of ``domain.domain`` in the
    module-level ``PRELOAD_PENDING`` cache.

    Raises RuntimeError (after logging an error) if ``PRELOAD_PENDING`` is
    still None, i.e. the cache has not been initialized yet.
    """
    if PRELOAD_PENDING is not None:
        return domain.domain in PRELOAD_PENDING

    logging.error("`PRELOAD_PENDING` has not yet been initialized!")
    raise RuntimeError(
        "`initialize_external_data()` must be called explicitly before "
        "using this function"
    )
1360

1361

1362
def is_hsts_preloaded(domain):
    """Check if a domain is contained in Chrome's HSTS preload list.

    The lookup is a membership test of ``domain.domain`` in the
    module-level ``PRELOAD_LIST`` cache.

    Raises RuntimeError (after logging an error) if ``PRELOAD_LIST`` is
    still None, i.e. the cache has not been initialized yet.
    """
    if PRELOAD_LIST is not None:
        return domain.domain in PRELOAD_LIST

    logging.error("`PRELOAD_LIST` has not yet been initialized!")
    raise RuntimeError(
        "`initialize_external_data()` must be called explicitly before "
        "using this function"
    )
1375

1376

1377
def is_parent_hsts_preloaded(domain):
    """Check if a domain's parent domain is in Chrome's HSTS preload list.

    The parent domain name is wrapped in a fresh ``Domain`` object and
    handed to ``is_hsts_preloaded``.
    """
    parent_name = parent_domain_for(domain.domain)
    parent = Domain(parent_name)
    return is_hsts_preloaded(parent)
1380

1381

1382
def parent_domain_for(hostname):
    """Get the parent domain for a given domain name.

    For "x.y.domain.gov", return "domain.gov".  The answer comes from
    ``SUFFIX_LIST.get_public_suffix``.

    Raises RuntimeError (after logging an error) if the module-level
    ``SUFFIX_LIST`` is still None, i.e. the cache has not been initialized
    yet.
    """
    if SUFFIX_LIST is not None:
        return SUFFIX_LIST.get_public_suffix(hostname)

    logging.error("`SUFFIX_LIST` has not yet been initialized!")
    raise RuntimeError(
        "`initialize_external_data()` must be called explicitly before "
        "using this function"
    )
1397

1398

1399
def is_domain_supports_https(domain):
    """Check if a domain supports HTTPS.

    A domain 'Supports HTTPS' when it doesn't downgrade and has valid HTTPS,
    or when it doesn't downgrade and has a bad chain but not a bad hostname.
    Domains with a bad chain "support" HTTPS but user-side errors should be
    expected.
    """
    # A downgrading domain never counts, whatever its certificate looks like.
    if is_downgrades_https(domain):
        return False

    return is_valid_https(domain) or (
        is_bad_chain(domain) and not is_bad_hostname(domain)
    )
1411

1412

1413
def is_domain_enforces_https(domain):
    """Check if a domain enforces HTTPS.

    A domain that 'Enforces HTTPS' must 'Support HTTPS' and default to
    HTTPS.  For websites (where Redirect is false) they are allowed to
    eventually redirect to an https:// URI. For "redirect domains"
    (domains where the Redirect value is true) they must immediately
    redirect clients to an https:// URI (even if that URI is on
    another domain) in order to be said to enforce HTTPS.
    """
    supports = is_domain_supports_https(domain)
    if not supports:
        return supports

    forces = is_strictly_forces_https(domain)
    if not forces:
        return forces

    return is_defaults_to_https(domain) or is_http_redirect_domain(domain)
1428

1429

1430
def is_domain_strong_hsts(domain):
    """Check if a domain is using strong HSTS.

    Returns None when the canonical HTTPS endpoint has no HSTS or no
    (non-zero) max-age; otherwise returns whether that max-age is at
    least 31536000 seconds (one 365-day year).
    """
    if not is_hsts(domain):
        return None

    max_age = hsts_max_age(domain)
    if not max_age:
        return None

    return max_age >= 31536000
1435

1436

1437
def get_domain_ip(domain):
    """Get the IP for the domain.

    The endpoints are tried in the following priority and the first IP
    that is not None is returned:
    Canonical -> HTTPS -> www HTTPS -> www HTTP -> HTTP

    Returns None when no endpoint has an IP.
    """
    for endpoint_name in ("canonical", "https", "httpswww", "httpwww", "http"):
        ip = getattr(domain, endpoint_name).ip
        if ip is not None:
            return ip
    return None
1454

1455

1456
def get_domain_server_header(domain):
    """Return the Server response header recorded for the domain.

    Endpoints are consulted in this priority order and the first header
    that is not None wins:
    Canonical -> HTTPS -> www HTTPS -> www HTTP -> HTTP

    Commas in the header are replaced with semicolons.  None is
    returned when no endpoint has a Server header.
    """
    for endpoint_name in ("canonical", "https", "httpswww", "httpwww", "http"):
        header = getattr(domain, endpoint_name).server_header
        if header is not None:
            return header.replace(",", ";")
    return None
2✔
1473

1474

1475
def get_domain_server_version(domain):
    """Return the server version recorded for the remote web server.

    Endpoints are consulted in this priority order and the first
    version that is not None wins:
    Canonical -> HTTPS -> www HTTPS -> www HTTP -> HTTP

    The server version is based on the returned Server header.  None is
    returned when no endpoint has a server version.
    """
    for endpoint_name in ("canonical", "https", "httpswww", "httpwww", "http"):
        version = getattr(domain, endpoint_name).server_version
        if version is not None:
            return version
    return None
2✔
1493

1494

1495
def get_domain_notes(domain):
    """Merge the notes of every endpoint of a domain into one string.

    The notes are concatenated without a separator in the order
    HTTP, www HTTP, HTTPS, www HTTPS, and any commas in the result are
    replaced with semicolons.
    """
    endpoints = (domain.http, domain.httpwww, domain.https, domain.httpswww)
    combined = "".join(endpoint.notes for endpoint in endpoints)
    return combined.replace(",", ";")
2✔
1505

1506

1507
def did_domain_error(domain):
    """Report whether any endpoint of a domain hit an unknown error.

    This exists mainly to flag odd websites so they can be debugged
    further with other tools.

    The endpoints are examined in the order HTTP, www HTTP, HTTPS,
    www HTTPS.  The first truthy unknown_error value is returned; if
    none is truthy, the www HTTPS endpoint's value is returned.
    """
    endpoints = (domain.http, domain.httpwww, domain.https, domain.httpswww)

    flag = None
    for endpoint in endpoints:
        flag = endpoint.unknown_error
        if flag:
            break
    return flag
1526

1527

1528
def load_preload_pending():
    """Fetch the Chrome preload pending list.

    Downloads the pending list from hstspreload.org and returns the
    names of the entries whose include_subdomains value is true.  If
    the request fails (SSL error, connection error, or timeout) the
    failure is logged and an empty list is returned.
    """
    utils.debug("Fetching hstspreload.org pending list...", divider=True)
    pending_url = "https://hstspreload.org/api/v2/pending"

    try:
        # Without a timeout requests waits indefinitely on a server that
        # stops responding; reuse the module-wide TIMEOUT setting.
        request = requests.get(pending_url, timeout=TIMEOUT)
    except (
        requests.exceptions.SSLError,
        requests.exceptions.ConnectionError,
        requests.exceptions.Timeout,
    ) as err:
        logging.exception("Failed to fetch pending preload list: %s", pending_url)
        logging.debug(err)
        return []

    pending_json = json.loads(request.content.decode("utf-8"))

    # For our purposes, we only care about entries that include subdomains.
    return [
        entry["name"]
        for entry in pending_json
        if entry.get("include_subdomains", False) is True
    ]
×
1554

1555

1556
def load_preload_list():
    """Download and load the Chromium preload list.

    Returns the names of the preload entries whose include_subdomains
    value is true.  If the request fails (SSL error, connection error,
    or timeout) the failure is logged and an empty list is returned.
    """
    utils.debug("Fetching Chrome preload list from source...", divider=True)

    # Location of the Chromium preloaded domain list.
    file_url = "https://chromium.googlesource.com/chromium/src/+/main/net/http/transport_security_state_static.json?format=TEXT"

    try:
        # Without a timeout requests waits indefinitely on a server that
        # stops responding; reuse the module-wide TIMEOUT setting.
        request = requests.get(file_url, timeout=TIMEOUT)
    except (
        requests.exceptions.SSLError,
        requests.exceptions.ConnectionError,
        requests.exceptions.Timeout,
    ) as err:
        logging.exception("Failed to fetch preload list: %s", file_url)
        logging.debug(err)
        return []

    # To avoid parsing the contents of the file out of the source tree viewer's
    # HTML, we download it as a raw file. googlesource.com Base64-encodes the
    # file to avoid potential content injection issues, so we need to decode it
    # before using it. https://code.google.com/p/gitiles/issues/detail?id=7
    raw = base64.b64decode(request.content).decode("utf-8")

    # The .json file contains '//' comments, which are not actually valid JSON,
    # and confuse Python's JSON decoder, so whole-line comments are blanked out
    # before parsing.
    raw = "".join(re.sub(r"^\s*//.*$", "", line) for line in raw.splitlines())

    preload_json = json.loads(raw)

    # For our purposes, we only care about entries that includeSubDomains
    return [
        entry["name"]
        for entry in preload_json["entries"]
        if entry.get("include_subdomains", False) is True
    ]
×
1593

1594

1595
# Returns an instantiated PublicSuffixList object, or None if a requested
# update of the list fails.
def load_suffix_list(cache_suffix_list=None, update_list=False):
    """Load the public suffix list, downloading it first when asked to.

    When update_list is true the list is refreshed through updatePSL():
    the file at cache_suffix_list if a path was given, otherwise the
    built-in copy.  Any exception raised by the update is logged and
    None is returned.

    Afterwards the list is read from cache_suffix_list when a path was
    given, or from the built-in copy otherwise.
    """
    if update_list:
        utils.debug("Downloading the Public Suffix List...", divider=True)
        # A cache path targets the local copy; no arguments targets the
        # built-in copy.
        update_args = (cache_suffix_list,) if cache_suffix_list else ()
        try:
            updatePSL(*update_args)
        except Exception as err:
            logging.exception("Unable to download the Public Suffix List...")
            utils.debug(err)
            return None

    # Use the built-in copy
    if not cache_suffix_list:
        return PublicSuffixList()

    # Use the local copy
    utils.debug("Using cached Public Suffix List.", divider=True)
    with codecs.open(cache_suffix_list, encoding="utf-8") as cache_file:
        return PublicSuffixList(cache_file)
×
1622

1623

1624
def _load_json_list(cache_path, fetch, using_cache_message, caching_message):
    """Return a list read from the JSON cache file, else fetch and cache it.

    cache_path may be None, in which case the cache is neither read nor
    written.  fetch is a zero-argument callable that downloads the list.
    """
    if cache_path and os.path.exists(cache_path):
        utils.debug(using_cache_message, divider=True)
        with open(cache_path, encoding="utf-8") as cache_file:
            return json.load(cache_file)

    fetched = fetch()

    # The preload loaders return an empty list when the download fails.
    # Writing that to the cache would make every later run trust an empty
    # list instead of retrying, so only non-empty results are cached.
    if cache_path and fetched:
        utils.debug(caching_message, cache_path, divider=True)
        utils.write(utils.json_for(fetched), cache_path)

    return fetched


def initialize_external_data(
    init_preload_list=None, init_preload_pending=None, init_suffix_list=None
):
    """Load any third party external data.

    This can be called explicitly by a library, as part of the setup needed
    before calling other library functions, or called as part of running
    inspect_domains() or CLI operation.

    If values are passed in to this function, they will be assigned to
    be the cached values. This allows a caller of the Python API to manage
    cached data in a customized way.

    It also potentially allows clients to pass in subsets of these lists,
    for testing or novel performance reasons.

    Otherwise, if the --cache-third-parties=[DIR] flag specifies a directory,
    all downloaded third party data will be cached in a directory, and
    used from cache on the next pshtt run instead of hitting the network.
    A preload list that comes back empty (a failed download) is not
    written to the cache.

    If no values are passed in, and no --cache-third-parties flag is used,
    then no cached third party data will be created or used, and pshtt will
    download the latest data from those third party sources.
    """
    global PRELOAD_LIST, PRELOAD_PENDING, SUFFIX_LIST

    # The preload list should be sent in as a list of domains.
    if init_preload_list is not None:
        PRELOAD_LIST = init_preload_list

    # The PRELOAD_PENDING list should be sent in as a list of domains.
    if init_preload_pending is not None:
        PRELOAD_PENDING = init_preload_pending

    # The public suffix list should be sent in as a list of file lines.
    if init_suffix_list is not None:
        SUFFIX_LIST = PublicSuffixList(init_suffix_list)

    # If there's a specified cache dir, prepare paths.
    # Only used when no data has been set yet for a source.
    if THIRD_PARTIES_CACHE:
        cache_preload_list = os.path.join(
            THIRD_PARTIES_CACHE, CACHE_PRELOAD_LIST_DEFAULT
        )
        cache_preload_pending = os.path.join(
            THIRD_PARTIES_CACHE, CACHE_PRELOAD_PENDING_DEFAULT
        )
        cache_suffix_list = os.path.join(THIRD_PARTIES_CACHE, CACHE_SUFFIX_LIST_DEFAULT)
    else:
        cache_preload_list, cache_preload_pending, cache_suffix_list = None, None, None

    # Load Chrome's latest versioned HSTS preload list.
    if PRELOAD_LIST is None:
        PRELOAD_LIST = _load_json_list(
            cache_preload_list,
            load_preload_list,
            "Using cached Chrome preload list.",
            "Caching preload list at %s",
        )

    # Load Chrome's current HSTS pending preload list.
    if PRELOAD_PENDING is None:
        PRELOAD_PENDING = _load_json_list(
            cache_preload_pending,
            load_preload_pending,
            "Using cached hstspreload.org pending list.",
            "Caching preload pending list at %s",
        )

    # Load Mozilla's current Public Suffix list.
    if SUFFIX_LIST is None:
        if cache_suffix_list:
            # Retrieve the list if the path does not exist otherwise use the cached copy
            SUFFIX_LIST = load_suffix_list(
                cache_suffix_list, not os.path.exists(cache_suffix_list)
            )
        else:
            # Load the built-in PSL
            SUFFIX_LIST = load_suffix_list()
×
1717

1718

1719
def inspect_domains(domains, options):
    """Yield the result of inspect() for each of the given domains.

    Truthy entries in options override the module-level settings for the
    timeout, user agent, third party cache directory, and CA bundles
    before any domain is inspected.  Because this is a generator, those
    overrides are applied when iteration starts, not when the function
    is called.
    """
    global TIMEOUT, USER_AGENT, THIRD_PARTIES_CACHE, CA_FILE, PT_INT_CA_FILE, STORE

    timeout = options.get("timeout")
    if timeout:
        TIMEOUT = int(timeout)

    user_agent = options.get("user_agent")
    if user_agent:
        USER_AGENT = user_agent

    # Supported cache flag, a directory to store all third party requests.
    cache_dir = options.get("cache-third-parties")
    if cache_dir:
        THIRD_PARTIES_CACHE = cache_dir

    ca_file = options.get("ca_file")
    if ca_file:
        CA_FILE = ca_file
        # The Mozilla store is checked by default.  When the user brings
        # their own CA bundle, check the "Custom" option from the sslyze
        # output instead.
        STORE = "Custom"

    pt_int_ca_file = options.get("pt_int_ca_file")
    if pt_int_ca_file:
        PT_INT_CA_FILE = pt_int_ca_file

    # If a Python API client already ran this once, running it again is
    # safe: it does not hit the network or disk again and does not
    # override the data the client set.
    initialize_external_data()

    # For every given domain, get inspect data.
    for domain in domains:
        yield inspect(domain)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc