• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / pshtt / 5090121082

pending completion
5090121082

Pull #214

github

GitHub
Merge edb863eae into 6680b13c3
Pull Request #214: Adds support for sslyze>=3.0.0

54 of 303 branches covered (17.82%)

Branch coverage included in aggregate %.

6 of 72 new or added lines in 1 file covered. (8.33%)

2 existing lines in 1 file now uncovered.

361 of 886 relevant lines covered (40.74%)

1.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.65
/src/pshtt/pshtt.py
1
"""Provide the core functionality of the pshtt library."""
2

3
# Standard Python Libraries
4
import base64
3✔
5
import codecs
3✔
6
import datetime
3✔
7
import json
3✔
8
import logging
3✔
9
import os
3✔
10
from pathlib import Path  # Python3
3✔
11
import re
3✔
12
import sys
3✔
13
from urllib import parse as urlparse
3✔
14

15
# Third-Party Libraries
16
import OpenSSL
3✔
17
from publicsuffixlist.compat import PublicSuffixList  # type: ignore
3✔
18
from publicsuffixlist.update import updatePSL  # type: ignore
3✔
19
import requests
3✔
20
from sslyze import (  # type: ignore
3✔
21
    Scanner,
22
    ServerConnectivityTester,
23
    ServerNetworkLocationViaDirectConnection,
24
    ServerScanRequest,
25
)
26
from sslyze.errors import ConnectionToServerFailed  # type: ignore
3✔
27
from sslyze.plugins.certificate_info.implementation import (  # type: ignore
3✔
28
    CertificateInfoExtraArguments,
29
)
30
from sslyze.plugins.scan_commands import ScanCommand  # type: ignore
3✔
31
import urllib3
3✔
32

33
from . import utils
3✔
34
from .models import Domain, Endpoint
3✔
35

36
# We're going to be making requests with certificate validation
# disabled, so silence urllib3's insecure-request warnings up front.
# The next line is commented out due to a pylint warning that urllib3
# is not in requests.packages:
# requests.packages.urllib3.disable_warnings()
urllib3.disable_warnings()

# Value sent as the User-Agent request header by ping().
# Default, overridable via --user-agent
USER_AGENT = "pshtt, https scanning"

# Timeout handed to requests.get() by ping().
# Defaults to 5 seconds, overridable via --timeout
TIMEOUT = 5

# The fields we're collecting; these will be keys in JSON and
# column headers in CSV.  result_for() also iterates this list to
# normalize unset (None) values, so every name here must be a key of
# the dictionary that result_for() builds.
HEADERS = [
    "Domain",
    "Base Domain",
    "Canonical URL",
    "Live",
    "HTTPS Live",
    "HTTPS Full Connection",
    "HTTPS Client Auth Required",
    "Redirect",
    "Redirect To",
    "Valid HTTPS",
    "HTTPS Publicly Trusted",
    "HTTPS Custom Truststore Trusted",
    "Defaults to HTTPS",
    "Downgrades HTTPS",
    "Strictly Forces HTTPS",
    "HTTPS Bad Chain",
    "HTTPS Bad Hostname",
    "HTTPS Expired Cert",
    "HTTPS Self Signed Cert",
    "HSTS",
    "HSTS Header",
    "HSTS Max Age",
    "HSTS Entire Domain",
    "HSTS Preload Ready",
    "HSTS Preload Pending",
    "HSTS Preloaded",
    "Base Domain HSTS Preloaded",
    "Domain Supports HTTPS",
    "Domain Enforces HTTPS",
    "Domain Uses Strong HSTS",
    "IP",
    "Server Header",
    "Server Version",
    "HTTPS Cert Chain Length",
    "HTTPS Probably Missing Intermediate Cert",
    "Notes",
    "Unknown Error",
]

# Used for caching the HSTS preload list from Chromium's source.
CACHE_PRELOAD_LIST_DEFAULT = "preloaded.json"
PRELOAD_LIST = None

# Used for caching the HSTS pending preload list from hstspreload.org.
CACHE_PRELOAD_PENDING_DEFAULT = "preload-pending.json"
PRELOAD_PENDING = None

# Used for determining base domain via Mozilla's public suffix list.
CACHE_SUFFIX_LIST_DEFAULT = "public-suffix-list.txt"
SUFFIX_LIST = None

# Directory to cache all third party responses, if set by user.
THIRD_PARTIES_CACHE = None

# Set if user wants to use a custom CA bundle.  When CA_FILE is set,
# ping() passes it to requests as the "verify" argument instead of True.
CA_FILE = None
# NOTE(review): STORE and PT_INT_CA_FILE are not read anywhere in the
# code visible here; presumably they name the trust store and an
# intermediate-CA bundle used by the sslyze checks -- confirm.
STORE = "Mozilla"
PT_INT_CA_FILE = None
109

110

111
def inspect(base_domain):
    """Inspect the provided domain and return its scan results.

    Four endpoints are created for the domain (http/https, each with
    the bare "root" host and the "www" host).  Every endpoint gets a
    basic connectivity check, the two HTTPS endpoints additionally get
    an HSTS check, and the findings are summarized by result_for().
    """
    domain = Domain(base_domain)

    # (attribute on the Domain, protocol, host prefix) for each endpoint.
    endpoint_specs = (
        ("http", "http", "root"),
        ("httpwww", "http", "www"),
        ("https", "https", "root"),
        ("httpswww", "https", "www"),
    )
    for attribute, protocol, host in endpoint_specs:
        setattr(domain, attribute, Endpoint(protocol, host, base_domain))

    # Analyze endpoint responsiveness and behavior.
    for endpoint in (domain.http, domain.httpwww, domain.https, domain.httpswww):
        basic_check(endpoint)

    # Analyze HSTS header, if present, on each HTTPS endpoint.
    for endpoint in (domain.https, domain.httpswww):
        hsts_check(endpoint)

    return result_for(domain)
130

131

132
def result_for(domain):
    """Build the result dictionary for an already-scanned domain.

    The canonical endpoint is chosen first and stored on
    ``domain.canonical`` because many of the individual judgments
    depend on it.  The returned dictionary has one entry per name in
    HEADERS plus an "endpoints" entry holding ``domain.to_object()``.
    Unset (None) values are then normalized to False, subject to the
    per-field exceptions spelled out before the loop below.
    """
    # Because it will inform many other judgments, first identify
    # an acceptable "canonical" URL for the domain.
    domain.canonical = canonical_endpoint(
        domain.http, domain.httpwww, domain.https, domain.httpswww
    )

    # First, the basic fields the CSV will use.
    result = {
        "Domain": domain.domain,
        "Base Domain": parent_domain_for(domain.domain),
        "Canonical URL": domain.canonical.url,
        "Live": is_live(domain),
        "Redirect": is_redirect_domain(domain),
        "Redirect To": redirects_to(domain),
        "HTTPS Live": is_https_live(domain),
        "HTTPS Full Connection": is_full_connection(domain),
        "HTTPS Client Auth Required": is_client_auth_required(domain),
        "Valid HTTPS": is_valid_https(domain),
        "HTTPS Publicly Trusted": is_publicly_trusted(domain),
        "HTTPS Custom Truststore Trusted": is_custom_trusted(domain),
        "Defaults to HTTPS": is_defaults_to_https(domain),
        "Downgrades HTTPS": is_downgrades_https(domain),
        "Strictly Forces HTTPS": is_strictly_forces_https(domain),
        "HTTPS Bad Chain": is_bad_chain(domain),
        "HTTPS Bad Hostname": is_bad_hostname(domain),
        "HTTPS Expired Cert": is_expired_cert(domain),
        "HTTPS Self Signed Cert": is_self_signed_cert(domain),
        "HTTPS Cert Chain Length": cert_chain_length(domain),
        "HTTPS Probably Missing Intermediate Cert": is_missing_intermediate_cert(
            domain
        ),
        "HSTS": is_hsts(domain),
        "HSTS Header": hsts_header(domain),
        "HSTS Max Age": hsts_max_age(domain),
        "HSTS Entire Domain": is_hsts_entire_domain(domain),
        "HSTS Preload Ready": is_hsts_preload_ready(domain),
        "HSTS Preload Pending": is_hsts_preload_pending(domain),
        "HSTS Preloaded": is_hsts_preloaded(domain),
        "Base Domain HSTS Preloaded": is_parent_hsts_preloaded(domain),
        "Domain Supports HTTPS": is_domain_supports_https(domain),
        "Domain Enforces HTTPS": is_domain_enforces_https(domain),
        "Domain Uses Strong HSTS": is_domain_strong_hsts(domain),
        "IP": get_domain_ip(domain),
        "Server Header": get_domain_server_header(domain),
        "Server Version": get_domain_server_version(domain),
        "Notes": get_domain_notes(domain),
        "Unknown Error": did_domain_error(domain),
    }

    # But also capture the extended data for those who want it.
    result["endpoints"] = domain.to_object()

    # Normalization rules.  The four groups below are disjoint; any
    # header not named in one of them simply has None turned into False.

    # Never modified, whatever their value.
    never_modified = ("HSTS Header", "HSTS Max Age", "Redirect To")

    # Left exactly as computed (None included) when there was no full
    # HTTPS connection; otherwise treated like an ordinary header.
    left_alone_without_full_connection = (
        "HSTS",
        "HSTS Entire Domain",
        "HSTS Preload Ready",
        "Domain Uses Strong HSTS",
    )

    # None is kept as-is for these fields.
    none_is_kept = (
        "IP",
        "Server Header",
        "Server Version",
        "HTTPS Cert Chain Length",
    )

    # Forced to False when HTTPS is not live; otherwise left as
    # computed, None included.
    false_when_https_not_live = (
        "Valid HTTPS",
        "HTTPS Publicly Trusted",
        "HTTPS Custom Truststore Trusted",
    )

    for field in HEADERS:
        if field in never_modified:
            continue

        if (
            field in left_alone_without_full_connection
            and not result["HTTPS Full Connection"]
        ):
            continue

        if field in false_when_https_not_live:
            if not result["HTTPS Live"]:
                result[field] = False
            continue

        if result[field] is None and field not in none_is_kept:
            result[field] = False

    return result
242

243

244
def ping(url, allow_redirects=False, verify=True):
    """Issue a streaming GET request for the given URL.

    When certificate verification is requested and a custom CA file
    has been configured (CA_FILE), the path of that file is handed to
    requests in place of the boolean; requests then validates HTTPS
    connections against that .pem bundle.

    The request is made with ``stream=True`` and the body is never
    read here.  Because of that, close() MUST be called on the
    Response object returned by this function; that is the ONLY way
    the connection is closed and released back into the pool.  Using
    the response in a "with" statement is one way to guarantee it.

    Should response bodies ever be needed, they will have to be read
    explicitly from Response.content, and only where a body exists and
    is useful.  Content-Type values such as
    multipart/x-mixed-replace;boundary=ffserver indicate a body that
    streams indefinitely and would need special care.
    """
    # Swap the boolean for the custom CA bundle only when verifying.
    trust = CA_FILE if (CA_FILE and verify) else verify

    request_options = {
        "allow_redirects": allow_redirects,
        # Validate certificates (or not, or against the custom bundle).
        "verify": trust,
        # Defer retrieval of the content until Response.content is
        # accessed.  We aren't interested in the content, so this saves
        # time and bandwidth.  It also stops pshtt from hanging on URLs
        # that stream never-ending data, like webcams.  See issue #138:
        # https://github.com/dhs-ncats/pshtt/issues/138
        "stream": True,
        # set by --user_agent
        "headers": {"User-Agent": USER_AGENT},
        # set by --timeout
        "timeout": TIMEOUT,
    }
    return requests.get(url, **request_options)
290

291

292
def basic_check(endpoint):
    """Test the endpoint and record what was learned on it.

    At first:
    * Don't follow redirects. (Will only follow if necessary.)
      If it's a 3XX, we'll ping again to follow redirects. This is
      necessary to reliably scope any errors (e.g. TLS errors) to
      the original endpoint.

    * Validate certificates. (Will figure out error if necessary.)

    The function works in stages:

    1. Ping the endpoint with certificate validation.  On an SSL error
       that is not a handshake failure, ping once more with validation
       disabled.
    2. For https endpoints, run https_check() (sslyze), possibly twice.
    3. If a response was obtained, record IP (best effort), headers,
       status and Server header.
    4. If the response is a 3XX with a Location header, record where the
       endpoint redirects to immediately and eventually.

    Nothing is returned; all findings are stored as attributes of
    ``endpoint``.  Unexpected exceptions are logged and flagged via
    ``endpoint.unknown_error`` rather than propagated.
    """
    utils.debug("Pinging %s...", endpoint.url, divider=True)

    # Stays None unless one of the pings below yields a response.
    req = None

    try:
        with ping(endpoint.url) as req:
            endpoint.live = True
            if endpoint.protocol == "https":
                endpoint.https_full_connection = True
                endpoint.https_valid = True

    except requests.exceptions.SSLError as err:
        # Distinguish a failed TLS handshake from every other SSL error
        # by looking at the error text.
        if "bad handshake" in str(err) and (
            "sslv3 alert handshake failure" in str(err) or "Unexpected EOF" in str(err)
        ):
            logging.exception(
                "%s: Error completing TLS handshake usually due to required client authentication.",
                endpoint.url,
            )
            utils.debug("%s: %s", endpoint.url, err)
            endpoint.live = True
            if endpoint.protocol == "https":
                # The https can still be valid with a handshake error,
                # sslyze will run later and check if it is not valid
                endpoint.https_valid = True
                endpoint.https_full_connection = False

        else:
            logging.exception(
                "%s: Error connecting over SSL/TLS or validating certificate.",
                endpoint.url,
            )
            utils.debug("%s: %s", endpoint.url, err)
            # Retry with certificate validation disabled.
            try:
                with ping(endpoint.url, verify=False) as req:
                    endpoint.live = True
                    if endpoint.protocol == "https":
                        endpoint.https_full_connection = True
                        # sslyze later will actually check if the cert is valid
                        endpoint.https_valid = True
            except requests.exceptions.SSLError as err:
                # If it's a protocol error or other, it's not a full connection,
                # but it is live.
                endpoint.live = True
                if endpoint.protocol == "https":
                    endpoint.https_full_connection = False
                    # HTTPS may still be valid, sslyze will double-check later
                    endpoint.https_valid = True
                logging.exception(
                    "%s: Unexpected SSL protocol (or other) error during retry.",
                    endpoint.url,
                )
                utils.debug("%s: %s", endpoint.url, err)
                # continue on to SSLyze to check the connection
            except requests.exceptions.RequestException as err:
                endpoint.live = False
                logging.exception(
                    "%s: Unexpected requests exception during retry.", endpoint.url
                )
                utils.debug("%s: %s", endpoint.url, err)
                return
            except OpenSSL.SSL.Error as err:
                endpoint.live = False
                logging.exception(
                    "%s: Unexpected OpenSSL exception during retry.", endpoint.url
                )
                utils.debug("%s: %s", endpoint.url, err)
                return
            except Exception as err:
                endpoint.unknown_error = True
                logging.exception(
                    "%s: Unexpected other unknown exception during requests retry.",
                    endpoint.url,
                )
                utils.debug("%s: %s", endpoint.url, err)
                return

        # If it was a certificate error of any kind, it's live,
        # unless SSLyze encounters a connection error later
        endpoint.live = True

    except requests.exceptions.ConnectionError as err:
        # We can get this for some endpoints that are actually live,
        # so if it's https let's try sslyze to be sure
        if endpoint.protocol == "https":
            # https check later will set whether the endpoint is live and valid
            endpoint.https_full_connection = False
            endpoint.https_valid = True
        else:
            endpoint.live = False
        logging.exception("%s: Error connecting.", endpoint.url)
        utils.debug("%s: %s", endpoint.url, err)

    # And this is the parent of ConnectionError and other things.
    # For example, "too many redirects".
    # See https://github.com/kennethreitz/requests/blob/master/requests/exceptions.py
    except requests.exceptions.RequestException as err:
        endpoint.live = False
        logging.exception("%s: Unexpected other requests exception.", endpoint.url)
        utils.debug("%s: %s", endpoint.url, err)
        return

    except Exception as err:
        endpoint.unknown_error = True
        logging.exception(
            "%s: Unexpected other unknown exception during initial request.",
            endpoint.url,
        )
        utils.debug("%s: %s", endpoint.url, err)
        return

    # Run SSLyze to see if there are any errors
    if endpoint.protocol == "https":
        https_check(endpoint)
        # Double-check in case sslyze failed the first time, but the regular connection succeeded
        if endpoint.live is False and req is not None:
            logging.warning(
                "%s: Trying sslyze again since it connected once already.", endpoint.url
            )
            # Reset the flags https_check() cleared before the second attempt.
            endpoint.live = True
            endpoint.https_valid = True
            https_check(endpoint)
            if endpoint.live is False:
                # sslyze failed so back everything out and don't continue analyzing the existing response
                req = None
                endpoint.https_valid = False
                endpoint.https_full_connection = False

    if req is None:
        # Ensure that full_connection is set to False if we didn't get a response
        if endpoint.protocol == "https":
            endpoint.https_full_connection = False
        return

    # try to get IP address if we can
    # NOTE(review): by this point the "with" block that produced req has
    # already exited, so the raw stream may well be closed -- confirm how
    # often this branch actually yields an address.
    try:
        if req.raw.closed is False:
            ip = req.raw._connection.sock.socket.getpeername()[0]
            if endpoint.ip is None:
                endpoint.ip = ip
            else:
                if endpoint.ip != ip:
                    utils.debug(
                        "%s: Endpoint IP is already %s, but requests IP is %s.",
                        endpoint.url,
                        endpoint.ip,
                        ip,
                    )
    except Exception:
        # if the socket has already closed, it will throw an exception, but this is just best effort, so ignore it
        # NOTE(review): the message below mentions closing the socket, but
        # the guarded code is the peer-address lookup above.
        logging.exception("Error closing socket")

    # Endpoint is live, analyze the response.
    endpoint.headers = req.headers

    endpoint.status = req.status_code

    if req.headers.get("Server") is not None:
        endpoint.server_header = req.headers.get("Server")
        # *** in the future add logic to convert header to server version if known

    # Only a 3XX status together with a Location header counts as a redirect.
    if (req.headers.get("Location") is not None) and str(endpoint.status).startswith(
        "3"
    ):
        endpoint.redirect = True
        logging.warning("%s: Found redirect.", endpoint.url)

    if endpoint.redirect:
        # NOTE(review): "immediate" and "ultimate_req" are first bound inside
        # this try block.  If it raises early, later references to them raise
        # NameError, which the final "except Exception" below would then
        # record as an unknown error -- confirm this is acceptable.
        try:
            location_header = req.headers.get("Location")
            # Absolute redirects (e.g. "https://example.com/Index.aspx")
            if location_header.startswith("http:") or location_header.startswith(
                "https:"
            ):
                immediate = location_header

            # Relative redirects (e.g. "Location: /Index.aspx").
            # Construct absolute URI, relative to original request.
            else:
                immediate = urlparse.urljoin(endpoint.url, location_header)

            # Chase down the ultimate destination, ignoring any certificate warnings.
            ultimate_req = None
        except Exception as err:
            endpoint.unknown_error = True
            logging.exception(
                "%s: Unexpected other unknown exception when handling Requests Header.",
                endpoint.url,
            )
            utils.debug("%s %s", endpoint.url, err)

        # Follow the whole redirect chain from the original URL; only the
        # final URL of the response is used afterwards.
        try:
            with ping(endpoint.url, allow_redirects=True, verify=False) as ultimate_req:
                pass
        except (requests.exceptions.RequestException, OpenSSL.SSL.Error):
            # Swallow connection errors, but we won't be saving redirect info.
            logging.exception("Connection error")
        except Exception as err:
            endpoint.unknown_error = True
            logging.exception(
                "%s: Unexpected other unknown exception when handling redirect.",
                endpoint.url,
            )
            utils.debug("%s: %s", endpoint.url, err)
            return

        try:
            # Now establish whether the redirects were:
            # * internal (same exact hostname),
            # * within the zone (any subdomain within the parent domain)
            # * external (on some other parent domain)

            # The hostname of the endpoint (e.g. "www.agency.gov")
            subdomain_original = urlparse.urlparse(endpoint.url).hostname
            # The parent domain of the endpoint (e.g. "agency.gov")
            base_original = parent_domain_for(subdomain_original)

            # The hostname of the immediate redirect.
            # The parent domain of the immediate redirect.
            subdomain_immediate = urlparse.urlparse(immediate).hostname
            base_immediate = parent_domain_for(subdomain_immediate)

            endpoint.redirect_immediately_to = immediate
            endpoint.redirect_immediately_to_https = immediate.startswith("https://")
            endpoint.redirect_immediately_to_http = immediate.startswith("http://")
            endpoint.redirect_immediately_to_external = base_original != base_immediate
            endpoint.redirect_immediately_to_subdomain = (
                base_original == base_immediate
            ) and (subdomain_original != subdomain_immediate)

            # We're interested in whether an endpoint redirects to the www version
            # of itself (not whether it redirects to www prepended to any other
            # hostname, even within the same parent domain).
            endpoint.redirect_immediately_to_www = subdomain_immediate == (
                f"www.{subdomain_original}"
            )

            if ultimate_req is not None:
                # For ultimate destination, use the URL we arrived at,
                # not Location header. Auto-resolves relative redirects.
                eventual = ultimate_req.url

                # The hostname of the eventual destination.
                # The parent domain of the eventual destination.
                subdomain_eventual = urlparse.urlparse(eventual).hostname
                base_eventual = parent_domain_for(subdomain_eventual)

                endpoint.redirect_eventually_to = eventual
                endpoint.redirect_eventually_to_https = eventual.startswith("https://")
                endpoint.redirect_eventually_to_http = eventual.startswith("http://")
                endpoint.redirect_eventually_to_external = (
                    base_original != base_eventual
                )
                endpoint.redirect_eventually_to_subdomain = (
                    base_original == base_eventual
                ) and (subdomain_original != subdomain_eventual)

            # If we were able to make the first redirect, but not the ultimate redirect,
            # and if the immediate redirect is external, then it's accurate enough to
            # say that the eventual redirect is the immediate redirect, since you're capturing
            # the domain it's going to.
            # This also avoids "punishing" the domain for configuration issues of the site
            # it redirects to.
            elif endpoint.redirect_immediately_to_external:
                endpoint.redirect_eventually_to = endpoint.redirect_immediately_to
                endpoint.redirect_eventually_to_https = (
                    endpoint.redirect_immediately_to_https
                )
                endpoint.redirect_eventually_to_http = (
                    endpoint.redirect_immediately_to_http
                )
                endpoint.redirect_eventually_to_external = (
                    endpoint.redirect_immediately_to_external
                )
                endpoint.redirect_eventually_to_subdomain = (
                    endpoint.redirect_immediately_to_subdomain
                )
        except Exception as err:
            endpoint.unknown_error = True
            logging.exception(
                "%s: Unexpected other unknown exception when establishing redirects.",
                endpoint.url,
            )
            utils.debug("%s: %s", endpoint.url, err)
588

589

590
def hsts_check(endpoint):
    """Perform an HSTS check of the given endpoint.

    Given an endpoint and its detected headers, extract and parse any
    Strict-Transport-Security header and record the HSTS properties on
    the endpoint:

    * ``hsts`` -- True only when a header with a positive max-age is
      present and the certificate hostname is not bad.
    * ``hsts_header`` -- the raw header value.
    * ``hsts_max_age`` -- the integer value of the max-age directive.
    * ``hsts_all_subdomains`` / ``hsts_preload`` -- set to True when the
      header mentions includeSubDomains / preload.

    Endpoints with a bad hostname are disqualified: they won't work as
    valid HSTS.

    The max-age directive is located by name, so it may appear at any
    position among the directives, may have whitespace around "=", and
    may carry a double-quoted value.  A header whose first value has no
    max-age directive at all is treated as "no HSTS".  A max-age value
    that is not an integer is still reported through
    ``endpoint.unknown_error``, like any other unexpected failure.

    Nothing is returned and no exception is propagated.
    """
    try:
        if endpoint.https_bad_hostname:
            endpoint.hsts = False
            return

        header = endpoint.headers.get("Strict-Transport-Security")

        if header is None:
            endpoint.hsts = False
            return

        endpoint.hsts = True
        endpoint.hsts_header = header

        # Handle multiple HSTS headers: requests comma-separates them,
        # and only the first one is considered.
        first_value = re.split(r",\s?", header)[0]
        # Drop stray single quotes some servers wrap around values.
        first_value = re.sub(r"\'", "", first_value)

        # Find the max-age directive wherever it sits in the list rather
        # than assuming it is the first directive.
        for directive in re.split(r";\s?", first_value):
            name, _, value = directive.partition("=")
            if name.strip().lower() == "max-age":
                # The value may be given as a quoted string.
                endpoint.hsts_max_age = int(value.strip().strip('"'))
                break

        if endpoint.hsts_max_age is None or endpoint.hsts_max_age <= 0:
            endpoint.hsts = False
            return

        # Check if HSTS includes subdomains.
        if "includesubdomains" in header.lower():
            endpoint.hsts_all_subdomains = True

        # Check if HSTS has the preload flag.
        if "preload" in header.lower():
            endpoint.hsts_preload = True
    except Exception as err:
        # Best-effort analysis: record the failure and carry on.
        endpoint.unknown_error = True
        logging.exception(
            "%s: Unknown exception when handling HSTS check.", endpoint.url
        )
        utils.debug("%s: %s", endpoint.url, err)
642

643

644
def _scan_certificate_info(server_info, ca_file=None):
    """Run one sslyze CERTIFICATE_INFO scan and return the plugin result.

    ``server_info`` is the connectivity result for the server to scan.
    When ``ca_file`` is not None it is handed to the certificate-info
    plugin as its ``custom_ca_file`` extra argument.
    """
    command = ScanCommand.CERTIFICATE_INFO
    request_kwargs = {"server_info": server_info, "scan_commands": [command]}
    if ca_file is not None:
        request_kwargs["scan_commands_extra_arguments"] = {
            command: CertificateInfoExtraArguments(custom_ca_file=Path(ca_file))
        }

    scanner = Scanner()
    scanner.queue_scan(ServerScanRequest(**request_kwargs))
    # Consume the whole generator, then take the single queued scan's result.
    scan_result = list(scanner.get_results())[0]
    return scan_result.scan_commands_results[command]


def https_check(endpoint):
    """Use sslyze to figure out the reason an endpoint failed to verify.

    The check runs in stages, each recording its findings on ``endpoint``:

    1. Connectivity: sets ``live``, ``ip`` and
       ``https_client_auth_required``.  On connection failure sets
       ``live`` and ``https_valid`` to False and returns.
    2. Certificate scan (retried once on a "timed out" error).  On failure
       sets ``unknown_error`` and ``https_valid = None`` and returns.
    3. Certificate analysis: sets ``https_expired_cert``,
       ``https_self_signed_cert``, ``https_bad_chain``,
       ``https_bad_hostname``, ``https_public_trusted`` and
       ``https_custom_trusted``.
    4. Chain length: sets ``https_cert_chain_len`` and
       ``https_missing_intermediate_cert``, optionally rechecking public
       trust against ``PT_INT_CA_FILE``.

    Finally ``https_valid`` is set to False if any certificate problem was
    found.  Nothing is returned; errors are logged, not raised.
    """
    utils.debug("sslyzing %s...", endpoint.url)

    try:
        # Remove the "https://" prefix (8 characters) for sslyze
        hostname = endpoint.url[8:]
        server_location = (
            ServerNetworkLocationViaDirectConnection.with_ip_address_lookup(
                hostname=hostname, port=443
            )
        )
        server_info = ServerConnectivityTester().perform(server_location)
        endpoint.live = True
        ip = server_location.ip_address
        if endpoint.ip is None:
            endpoint.ip = ip
        elif endpoint.ip != ip:
            utils.debug(
                "%s: Endpoint IP is already %s, but requests IP is %s.",
                endpoint.url,
                endpoint.ip,
                ip,
            )
        if server_info.tls_probing_result.client_auth_requirement.name == "REQUIRED":
            endpoint.https_client_auth_required = True
            logging.warning("%s: Client Authentication REQUIRED", endpoint.url)
    except ConnectionToServerFailed as err:
        endpoint.live = False
        endpoint.https_valid = False
        logging.exception(
            "%s: Error in sslyze server connectivity check when connecting to %s",
            endpoint.url,
            err.server_location.hostname,
        )
        utils.debug("%s: %s", endpoint.url, err)
        return
    except Exception as err:
        endpoint.unknown_error = True
        logging.exception(
            "%s: Unknown exception in sslyze server connectivity check.", endpoint.url
        )
        utils.debug("%s: %s", endpoint.url, err)
        return

    try:
        cert_plugin_result = _scan_certificate_info(server_info, CA_FILE)
    except Exception as err:
        try:
            if "timed out" in str(err):
                logging.exception(
                    "%s: Retrying sslyze scanner certificate plugin.", endpoint.url
                )
                cert_plugin_result = _scan_certificate_info(server_info, CA_FILE)
            else:
                logging.exception(
                    "%s: Unknown exception in sslyze scanner certificate plugin.",
                    endpoint.url,
                )
                utils.debug("%s: %s", endpoint.url, err)
                endpoint.unknown_error = True
                # We could make this False, but there was an error so
                # we don't know
                endpoint.https_valid = None
                return
        except Exception:
            logging.exception(
                "%s: Unknown exception in sslyze scanner certificate plugin.",
                endpoint.url,
            )
            utils.debug("%s: %s", endpoint.url, err)
            endpoint.unknown_error = True
            # We could make this False, but there was an error so we
            # don't know
            endpoint.https_valid = None
            return

    try:
        # Default endpoint assessments to False until proven True.
        endpoint.https_expired_cert = False
        endpoint.https_self_signed_cert = False
        endpoint.https_bad_chain = False
        endpoint.https_bad_hostname = False

        # Default trust to True until a trust store fails to validate the chain
        public_trust = True
        custom_trust = True
        public_not_trusted_names = []

        # Certificate validity dates are naive datetimes expressed in UTC,
        # so "now" must be a naive UTC datetime too (not local time).
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

        for certificate_deployment in cert_plugin_result.certificate_deployments:
            for result in certificate_deployment.path_validation_results:
                store_name = result.trust_store.name

                # We're assuming that it is trusted to start with
                if not result.was_validation_successful:
                    if "Custom" in store_name:
                        custom_trust = False
                    else:
                        public_trust = False
                        public_not_trusted_names.append(store_name)

                if STORE not in store_name:
                    continue

                # The verified chain is None when path validation failed;
                # fall back to the chain the server actually sent so the
                # certificate checks below (and the trust bookkeeping after
                # the loop) still run.
                cert_chain = (
                    result.verified_certificate_chain
                    or certificate_deployment.received_certificate_chain
                )
                leaf_cert = cert_chain[0]

                # Check for leaf certificate expiration/self-signature.
                if leaf_cert.not_valid_after < now_utc:
                    endpoint.https_expired_cert = True

                # Check to see if the cert is self-signed
                if leaf_cert.issuer == leaf_cert.subject:
                    endpoint.https_self_signed_cert = True

                # Check certificate chain till the second last element
                # The last cert being the root cert is self signed and
                # hence the self signed check is not valid
                # NOTE: If this is the only flag that's set, it's probably
                # an incomplete chain
                # If this isn't the only flag that is set, it might be
                # because there is another error. More debugging would
                # need to be done at this point, but not through sslyze
                # because sslyze doesn't have enough granularity
                for cert in cert_chain[:-1]:
                    # Check for certificate expiration
                    if cert.not_valid_after < now_utc:
                        endpoint.https_bad_chain = True

                    # Check to see if the cert is self-signed
                    if cert.issuer == cert.subject or not cert.issuer:
                        endpoint.https_bad_chain = True

                # If leaf certificate subject does NOT match hostname, bad hostname
                # NOTE: Since sslyze 3.0.0, ever since JSON output for certinfo,
                # SAN(s) are checked as part of _certificate_matches_hostname which
                # called as part of leaf_certificate_subject_matches_hostname
                if not certificate_deployment.leaf_certificate_subject_matches_hostname:
                    endpoint.https_bad_hostname = True

        if public_trust:
            logging.warning(
                "%s: Publicly trusted by common trust stores.", endpoint.url
            )
        else:
            logging.warning(
                "%s: Not publicly trusted - not trusted by %s.",
                endpoint.url,
                ", ".join(public_not_trusted_names),
            )
        if CA_FILE is not None:
            if custom_trust:
                logging.warning("%s: Trusted by custom trust store.", endpoint.url)
            else:
                logging.warning("%s: Not trusted by custom trust store.", endpoint.url)
        else:
            # No custom trust store was supplied, so custom trust is unknown
            custom_trust = None
        endpoint.https_public_trusted = public_trust
        endpoint.https_custom_trusted = custom_trust
    except Exception as err:
        # Ignore exception
        logging.exception(
            "%s: Unknown exception examining certificate deployment.", endpoint.url
        )
        utils.debug(
            "%s: Unknown exception examining certificate deployment: %s",
            endpoint.url,
            err,
        )

    try:
        endpoint.https_cert_chain_len = 0
        for certificate_deployment in cert_plugin_result.certificate_deployments:
            endpoint.https_cert_chain_len += len(
                certificate_deployment.received_certificate_chain
            )
        if endpoint.https_self_signed_cert is False and (
            endpoint.https_cert_chain_len < 2
        ):
            # *** TODO check that it is not a bad hostname and that the root cert is trusted before suggesting that it is an intermediate cert issue.
            endpoint.https_missing_intermediate_cert = True
            has_verified_cert_chain = all(
                deployment.verified_certificate_chain is not None
                for deployment in cert_plugin_result.certificate_deployments
            )
            if not has_verified_cert_chain:
                logging.warning(
                    "%s: Untrusted certificate chain, probably due to missing intermediate certificate.",
                    endpoint.url,
                )
                utils.debug(
                    "%s: Only %s certificates in certificate chain received.",
                    endpoint.url,
                    endpoint.https_cert_chain_len,
                )
            elif custom_trust is True and public_trust is False:
                # recheck public trust using custom public trust store with manually added intermediate certificates
                if PT_INT_CA_FILE is not None:
                    try:
                        recheck_result = _scan_certificate_info(
                            server_info, PT_INT_CA_FILE
                        )
                        recheck_verified = all(
                            deployment.verified_certificate_chain is not None
                            for deployment in recheck_result.certificate_deployments
                        )
                        if recheck_verified:
                            public_trust = True
                            endpoint.https_public_trusted = public_trust
                            logging.warning(
                                "%s: Trusted by special public trust store with intermediate certificates.",
                                endpoint.url,
                            )
                    except Exception:
                        logging.exception("Error while rechecking public trust")
        else:
            endpoint.https_missing_intermediate_cert = False
    except Exception:
        logging.exception("Error while determining length of certificate chain")

    # If anything is wrong then https is not valid
    if (
        endpoint.https_expired_cert
        or endpoint.https_self_signed_cert
        or endpoint.https_bad_chain
        or endpoint.https_bad_hostname
    ):
        endpoint.https_valid = False
921

922

923
def canonical_endpoint(http, httpwww, https, httpswww):
    """Make a best guess for the "canonical" endpoint of a domain.

    Given behavior for the four endpoints, make a best guess
    as to which is the "canonical" site for the domain, and return
    that endpoint object (one of the four arguments).

    Most of the domain-level decisions rely on this guess in some way.

    A domain is "canonically" at www if:
     * at least one of its www endpoints responds
     * both root endpoints are either down or redirect *somewhere*
     * either both root endpoints are down, *or* at least one
       root endpoint redirect should immediately go to
       an *internal* www endpoint
    This is meant to affirm situations like:
      http:// -> https:// -> https://www
      https:// -> http:// -> https://www
    and meant to avoid affirming situations like:
      http:// -> http://non-www,
      http://www -> http://non-www
    or like:
      https:// -> 200, http:// -> http://www
    """

    def status_starts_with(endpoint, prefixes):
        # The status is compared as text so a None status simply never matches.
        return str(endpoint.status).startswith(prefixes)

    def root_unused(endpoint):
        return (
            endpoint.redirect
            or not endpoint.live
            or endpoint.https_bad_hostname  # harmless for http endpoints
            or not status_starts_with(endpoint, "2")
        )

    def root_down(endpoint):
        return (
            not endpoint.live
            or endpoint.https_bad_hostname
            or not status_starts_with(endpoint, ("2", "3"))
        )

    at_least_one_www_used = httpswww.live or httpwww.live
    all_roots_unused = root_unused(https) and root_unused(http)
    all_roots_down = root_down(https) and root_down(http)

    is_www = (
        at_least_one_www_used
        and all_roots_unused
        and (
            all_roots_down
            or https.redirect_immediately_to_www
            or http.redirect_immediately_to_www
        )
    )

    # A domain is "canonically" at https if:
    #  * at least one of its https endpoints is live and
    #    doesn't have an invalid hostname
    #  * both http endpoints are either down or redirect *somewhere*
    #  * at least one http endpoint redirects immediately to
    #    an *internal* https endpoint
    # This is meant to affirm situations like:
    #   http:// -> http://www -> https://
    #   https:// -> http:// -> https://www
    # and meant to avoid affirming situations like:
    #   http:// -> http://non-www
    #   http://www -> http://non-www
    # or:
    #   http:// -> 200, http://www -> https://www
    #
    # It allows a site to be canonically HTTPS if the cert has
    # a valid hostname but invalid chain issues.

    def https_used(endpoint):
        return endpoint.live and not endpoint.https_bad_hostname

    def http_unused(endpoint):
        return (
            endpoint.redirect
            or not endpoint.live
            or not status_starts_with(endpoint, "2")
        )

    def http_upgrades(endpoint):
        return endpoint.redirect_immediately_to_https and (
            not endpoint.redirect_immediately_to_external
        )

    at_least_one_https_endpoint = https_used(https) or https_used(httpswww)
    all_http_unused = http_unused(http) and http_unused(httpwww)
    both_http_down = not http.live and not httpwww.live
    at_least_one_http_upgrades = http_upgrades(http) or http_upgrades(httpwww)

    is_https = (
        at_least_one_https_endpoint
        and all_http_unused
        and (both_http_down or at_least_one_http_upgrades)
    )

    # Pick the endpoint matching the (www?, https?) decision.
    if is_www:
        return httpswww if is_https else httpwww
    return https if is_https else http
1032

1033

1034
##
1035
# Judgment calls based on observed endpoint data.
1036
##
1037

1038

1039
def is_live(domain):
    """Check if a domain has any live endpoints.

    Returns the first truthy ``live`` value among the http, httpwww,
    https and httpswww endpoints, or the last value if none is truthy.
    """
    return (
        domain.http.live
        or domain.httpwww.live
        or domain.https.live
        or domain.httpswww.live
    )
1049

1050

1051
def is_https_live(domain):
    """Check if a domain has any live HTTPS endpoints.

    Only the https and httpswww endpoints are consulted.
    """
    return domain.https.live or domain.httpswww.live
1056

1057

1058
def is_full_connection(domain):
    """Check if a domain is fully connected.

    Domain is "fully connected" if any HTTPS endpoint (https or
    httpswww) is fully connected.
    """
    return (
        domain.https.https_full_connection or domain.httpswww.https_full_connection
    )
1066

1067

1068
def is_client_auth_required(domain):
    """Check if a domain requires client authentication.

    Domain requires client authentication if *any* HTTPS endpoint (https
    or httpswww) requires it for full TLS connection.
    """
    return (
        domain.https.https_client_auth_required
        or domain.httpswww.https_client_auth_required
    )
1077

1078

1079
def is_redirect_or_down(endpoint):
    """Check if an endpoint redirects to an external site or is down.

    Endpoint is a redirect or down if it eventually redirects to an
    external site or it is down in any of 3 ways: it is not live, it is
    HTTPS and has a bad hostname in the cert, or it responds with an
    error status code (400 or higher, so both 4xx and 5xx).
    """
    has_bad_https_hostname = (
        endpoint.protocol == "https" and endpoint.https_bad_hostname
    )
    has_error_status = endpoint.status is not None and endpoint.status >= 400

    return (
        endpoint.redirect_eventually_to_external
        or not endpoint.live
        or has_bad_https_hostname
        or has_error_status
    )
1092

1093

1094
def is_redirect(endpoint):
    """Check if an endpoint is a redirect to an external site.

    Returns the endpoint's ``redirect_eventually_to_external`` value.
    """
    return endpoint.redirect_eventually_to_external
1097

1098

1099
def is_redirect_domain(domain):
    """Check if a domain redirects HTTP or HTTPS traffic.

    Domain is "a redirect domain" if it is live, at least one endpoint is
    a redirect, and all endpoints are either redirects or down.
    """
    http = domain.http
    httpwww = domain.httpwww
    https = domain.https
    httpswww = domain.httpswww

    any_endpoint_redirects = (
        is_redirect(http)
        or is_redirect(httpwww)
        or is_redirect(https)
        or is_redirect(httpswww)
    )

    return is_live(domain) and (
        any_endpoint_redirects
        and is_redirect_or_down(https)
        and is_redirect_or_down(httpswww)
        and is_redirect_or_down(httpwww)
        and is_redirect_or_down(http)
    )
1124

1125

1126
def is_http_redirect_domain(domain):
    """Check if a domain redirects HTTP traffic.

    Domain is "an http redirect domain" if it is live, at least one HTTP
    endpoint is a redirect, and all other http endpoints are either
    redirects or down.
    """
    http = domain.http
    httpwww = domain.httpwww

    return is_live(domain) and (
        (is_redirect(http) or is_redirect(httpwww))
        and is_redirect_or_down(httpwww)
        and is_redirect_or_down(http)
    )
1146

1147

1148
def redirects_to(domain):
    """Check where a domain redirects to (if it redirects).

    If a domain is a "redirect domain", return where its canonical
    endpoint eventually redirects to; otherwise return None.
    """
    if not is_redirect_domain(domain):
        return None
    return domain.canonical.redirect_eventually_to
1158

1159

1160
def is_valid_https(domain):
    """Check if a domain has a valid HTTPS server.

    A domain has "valid HTTPS" if it responds on port 443 at its canonical
    hostname with an unexpired valid certificate for the hostname.
    """
    # Evaluate the HTTPS version of the canonical hostname
    if domain.canonical.host == "root":
        evaluate = domain.https
    else:
        evaluate = domain.httpswww

    return evaluate.live and evaluate.https_valid
1172

1173

1174
def is_defaults_to_https(domain):
    """Check if a domain defaults to HTTPS.

    A domain "defaults to HTTPS" if its canonical endpoint uses HTTPS.
    """
    return domain.canonical.protocol == "https"
1182

1183

1184
def is_downgrades_https(domain):
    """Check if a domain allows downgrading HTTPS.

    Domain downgrades if HTTPS is supported in some way, but
    its canonical HTTPS endpoint immediately redirects internally to HTTP.
    Always returns a bool.
    """
    https = domain.https
    httpswww = domain.httpswww

    # The domain "supports" HTTPS if any HTTPS endpoint responds with
    # a certificate valid for its hostname.
    supports_https = (https.live and not https.https_bad_hostname) or (
        httpswww.live and not httpswww.https_bad_hostname
    )

    canonical_https = httpswww if domain.canonical.host == "www" else https

    # Explicitly convert to bool to avoid unintentionally returning None,
    # which may happen if the site doesn't redirect.
    return bool(
        supports_https
        and canonical_https.redirect_immediately_to_http
        and not canonical_https.redirect_immediately_to_external
    )
1207

1208

1209
def is_strictly_forces_https(domain):
    """Check if a domain strictly forces HTTPS.

    A domain "Strictly Forces HTTPS" if one of the HTTPS endpoints is
    "live", and if both *HTTP* endpoints are either:

     * down, or
     * redirect immediately to an HTTPS URI.

    This is different than whether a domain "Defaults" to HTTPS.

    * An HTTP redirect can go to HTTPS on another domain, as long
      as it's immediate.
    * A domain with an invalid cert can still be enforcing HTTPS.
    """

    def down_or_redirects(endpoint):
        return not endpoint.live or endpoint.redirect_immediately_to_https

    https_somewhere = domain.https.live or domain.httpswww.live
    all_http_unused = down_or_redirects(domain.http) and down_or_redirects(
        domain.httpwww
    )

    return https_somewhere and all_http_unused
1238

1239

1240
def is_publicly_trusted(domain):
    """Check if a domain has a publicly trusted certificate.

    A domain has a "Publicly Trusted" certificate if the HTTPS endpoint
    of its canonical hostname is live and has a publicly trusted
    certificate.
    """
    # Evaluate the HTTPS version of the canonical hostname
    if domain.canonical.host == "root":
        evaluate = domain.https
    else:
        evaluate = domain.httpswww

    return evaluate.live and evaluate.https_public_trusted
1252

1253

1254
def is_custom_trusted(domain):
    """Check if a domain has a custom trusted certificate.

    A domain has a "Custom Trusted" certificate if the HTTPS endpoint of
    its canonical hostname is live and has a certificate that is trusted
    by the custom truststore.
    """
    # Evaluate the HTTPS version of the canonical hostname
    if domain.canonical.host == "root":
        evaluate = domain.https
    else:
        evaluate = domain.httpswww

    return evaluate.live and evaluate.https_custom_trusted
1267

1268

1269
def is_bad_chain(domain):
    """Check if a domain has a bad certificate chain.

    Domain has a bad chain if its canonical HTTPS endpoint (httpswww when
    the canonical host is "www", https otherwise) has a bad chain.
    """
    if domain.canonical.host == "www":
        canonical_https = domain.httpswww
    else:
        canonical_https = domain.https

    return canonical_https.https_bad_chain
1280

1281

1282
def is_bad_hostname(domain):
    """Check if a domain has a bad hostname.

    Domain has a bad hostname if its canonical HTTPS endpoint (httpswww
    when the canonical host is "www", https otherwise) fails hostname
    validation.
    """
    if domain.canonical.host == "www":
        canonical_https = domain.httpswww
    else:
        canonical_https = domain.https

    return canonical_https.https_bad_hostname
1293

1294

1295
def is_expired_cert(domain):
    """Check if a domain's canonical endpoint has an expired certificate.

    The canonical HTTPS endpoint is httpswww when the canonical host is
    "www", and https otherwise.
    """
    if domain.canonical.host == "www":
        canonical_https = domain.httpswww
    else:
        canonical_https = domain.https

    return canonical_https.https_expired_cert
1302

1303

1304
def is_self_signed_cert(domain):
    """Check if the domain's canonical endpoint has a self-signed certificate.

    The canonical HTTPS endpoint is httpswww when the canonical host is
    "www", and https otherwise.
    """
    if domain.canonical.host == "www":
        canonical_https = domain.httpswww
    else:
        canonical_https = domain.https

    return canonical_https.https_self_signed_cert
1311

1312

1313
def cert_chain_length(domain):
    """Get the certificate chain length for a domain's canonical HTTPS endpoint.

    The canonical HTTPS endpoint is httpswww when the canonical host is
    "www", and https otherwise.
    """
    if domain.canonical.host == "www":
        canonical_https = domain.httpswww
    else:
        canonical_https = domain.https

    return canonical_https.https_cert_chain_len
1320

1321

1322
def is_missing_intermediate_cert(domain):
    """Check if a domain's certificate chain is missing an intermediate certificate.

    Returns whether the served cert chain is probably missing the
    needed intermediate certificate for the canonical HTTPS endpoint
    (httpswww when the canonical host is "www", https otherwise).
    """
    if domain.canonical.host == "www":
        canonical_https = domain.httpswww
    else:
        canonical_https = domain.https

    return canonical_https.https_missing_intermediate_cert
1333

1334

1335
def is_hsts(domain):
    """Check if a domain's canonical endpoint has HSTS.

    Domain has HSTS if its canonical HTTPS endpoint (httpswww when the
    canonical host is "www", https otherwise) has HSTS.
    """
    if domain.canonical.host == "www":
        canonical_https = domain.httpswww
    else:
        canonical_https = domain.https

    return canonical_https.hsts
1345

1346

1347
def hsts_header(domain):
    """Get a domain's canonical endpoint's HSTS header.

    The canonical HTTPS endpoint is httpswww when the canonical host is
    "www", and https otherwise.
    """
    if domain.canonical.host == "www":
        canonical_https = domain.httpswww
    else:
        canonical_https = domain.https

    return canonical_https.hsts_header
1354

1355

1356
def hsts_max_age(domain):
    """Get a domain's canonical endpoint's HSTS max-age.

    The canonical HTTPS endpoint is httpswww when the canonical host is
    "www", and https otherwise.
    """
    if domain.canonical.host == "www":
        canonical_https = domain.httpswww
    else:
        canonical_https = domain.https

    return canonical_https.hsts_max_age
1363

1364

1365
def is_hsts_entire_domain(domain):
    """Check if a domain's ROOT endpoint HSTS configuration includes all subdomains.

    Only the root https endpoint is consulted, regardless of which
    endpoint is canonical.
    """
    return domain.https.hsts_all_subdomains
1370

1371

1372
def is_hsts_preload_ready(domain):
    """Check if a domain's ROOT endpoint is HSTS preload-ready.

    The root https endpoint is preload-ready when its HSTS max-age is at
    least eighteen weeks, it includes all subdomains, and it carries the
    preload flag.
    """
    https = domain.https

    # 18 weeks expressed in seconds (10886400)
    eighteen_weeks_in_seconds = 18 * 7 * 24 * 60 * 60

    eighteen_weeks = (https.hsts_max_age is not None) and (
        https.hsts_max_age >= eighteen_weeks_in_seconds
    )

    return eighteen_weeks and https.hsts_all_subdomains and https.hsts_preload
1382

1383

1384
def is_hsts_preload_pending(domain):
    """Check if a domain is pending inclusion in Chrome's HSTS preload list.

    Raises:
        RuntimeError: if PRELOAD_PENDING is None, meaning the caches have
            not been initialized; ``initialize_external_data()`` must be
            called explicitly first.
    """
    if PRELOAD_PENDING is None:
        logging.error("`PRELOAD_PENDING` has not yet been initialized!")
        raise RuntimeError(
            "`initialize_external_data()` must be called explicitly before "
            "using this function"
        )

    return domain.domain in PRELOAD_PENDING
1398

1399

1400
def is_hsts_preloaded(domain):
    """Check if a domain is contained in Chrome's HSTS preload list.

    Raises:
        RuntimeError: if PRELOAD_LIST is None, meaning the caches have not
            been initialized; ``initialize_external_data()`` must be
            called explicitly first.
    """
    if PRELOAD_LIST is None:
        logging.error("`PRELOAD_LIST` has not yet been initialized!")
        raise RuntimeError(
            "`initialize_external_data()` must be called explicitly before "
            "using this function"
        )

    return domain.domain in PRELOAD_LIST
1413

1414

1415
def is_parent_hsts_preloaded(domain):
    """Check if a domain's parent domain is in Chrome's HSTS preload list.

    The parent name comes from parent_domain_for(); it is wrapped in a new
    Domain so that it can be handed to is_hsts_preloaded().
    """
    parent_name = parent_domain_for(domain.domain)
    parent = Domain(parent_name)
    return is_hsts_preloaded(parent)
1418

1419

1420
def parent_domain_for(hostname):
    """Get the parent domain for a given domain name.

    For "x.y.domain.gov", return "domain.gov".

    The answer comes from the module-level SUFFIX_LIST.  If SUFFIX_LIST is
    still None the cache has not been initialized; this is logged and a
    RuntimeError is raised, since `initialize_external_data()` has to be
    called first.
    """
    if SUFFIX_LIST is not None:
        return SUFFIX_LIST.get_public_suffix(hostname)

    logging.error("`SUFFIX_LIST` has not yet been initialized!")
    raise RuntimeError(
        "`initialize_external_data()` must be called explicitly before "
        "using this function"
    )
1435

1436

1437
def is_domain_supports_https(domain):
    """Check if a domain supports HTTPS.

    A domain 'Supports HTTPS' when it doesn't downgrade and has valid HTTPS,
    or when it doesn't downgrade and has a bad chain but not a bad hostname.
    Domains with a bad chain "support" HTTPS but user-side errors should be expected.
    """
    # Both alternatives require that the domain does not downgrade, so that
    # check is made once up front instead of once per alternative.
    if is_downgrades_https(domain):
        return False

    return is_valid_https(domain) or (
        is_bad_chain(domain) and not is_bad_hostname(domain)
    )
1449

1450

1451
def is_domain_enforces_https(domain):
    """Check if a domain enforces HTTPS.

    To 'Enforce HTTPS' a domain must 'Support HTTPS' and default to HTTPS.
    Websites (where Redirect is false) are allowed to eventually redirect
    to an https:// URI.  "Redirect domains" (where the Redirect value is
    true) must immediately redirect clients to an https:// URI, even if
    that URI is on another domain, in order to be said to enforce HTTPS.
    """
    # Each guard hands back the failing check's own (falsy) result, exactly
    # as a chained `and` expression would.
    supports = is_domain_supports_https(domain)
    if not supports:
        return supports

    strictly_forces = is_strictly_forces_https(domain)
    if not strictly_forces:
        return strictly_forces

    return is_defaults_to_https(domain) or is_http_redirect_domain(domain)
1466

1467

1468
def is_domain_strong_hsts(domain):
    """Check if a domain is using strong HSTS.

    Returns None when is_hsts() reports no HSTS for the domain or when
    hsts_max_age() yields a falsy value (missing or zero).  Otherwise
    returns whether the max-age is at least 31536000 seconds (one year).
    """
    if not is_hsts(domain):
        return None

    # Look the max-age up once and reuse it for both the presence check and
    # the comparison.
    max_age = hsts_max_age(domain)
    if not max_age:
        return None

    return max_age >= 31536000
1473

1474

1475
def get_domain_ip(domain):
    """Get the IP for the domain.

    The endpoints are consulted in this priority order and the first IP
    that is not None wins:
    Canonical -> HTTPS -> www HTTPS -> www HTTP -> HTTP

    None is returned when no endpoint has an IP.
    """
    # Endpoints are fetched by name, one at a time, so lower-priority ones
    # are only touched when every higher-priority one had no IP.
    for endpoint_name in ("canonical", "https", "httpswww", "httpwww", "http"):
        address = getattr(domain, endpoint_name).ip
        if address is not None:
            return address
    return None
1492

1493

1494
def get_domain_server_header(domain):
    """Get the Server header from the response for the domain.

    The endpoints are consulted in this priority order and the first header
    that is not None wins:
    Canonical -> HTTPS -> www HTTPS -> www HTTP -> HTTP

    Commas in the chosen header are replaced with semicolons.  None is
    returned when no endpoint has a Server header.
    """
    # Endpoints are fetched by name, one at a time, so lower-priority ones
    # are only touched when every higher-priority one had no header.
    for endpoint_name in ("canonical", "https", "httpswww", "httpwww", "http"):
        header = getattr(domain, endpoint_name).server_header
        if header is not None:
            return header.replace(",", ";")
    return None
1511

1512

1513
def get_domain_server_version(domain):
    """Get the server version for the remote web server.

    The endpoints are consulted in this priority order and the first
    version that is not None wins:
    Canonical -> HTTPS -> www HTTPS -> www HTTP -> HTTP
    The server version is based on the returned Server header.

    None is returned when no endpoint has a server version.
    """
    # Endpoints are fetched by name, one at a time, so lower-priority ones
    # are only touched when every higher-priority one had no version.
    for endpoint_name in ("canonical", "https", "httpswww", "httpwww", "http"):
        version = getattr(domain, endpoint_name).server_version
        if version is not None:
            return version
    return None
1531

1532

1533
def get_domain_notes(domain):
    """Combine any notes for a domain.

    The notes of the HTTP, www HTTP, HTTPS and www HTTPS endpoints are
    concatenated in that order, and commas in the result are replaced with
    semicolons.
    """
    combined = "".join(
        (
            domain.http.notes,
            domain.httpwww.notes,
            domain.https.notes,
            domain.httpswww.notes,
        )
    )
    return combined.replace(",", ";")
1543

1544

1545
def did_domain_error(domain):
    """Check a domain for any unknown errors.

    This exists mainly to flag odd websites so that they can be debugged
    further with other tools.  The first truthy ``unknown_error`` among the
    HTTP, www HTTP, HTTPS and www HTTPS endpoints is returned; if none is
    truthy, the www HTTPS endpoint's value is returned.
    """
    endpoints = (domain.http, domain.httpwww, domain.https, domain.httpswww)

    flag = None
    for endpoint in endpoints:
        flag = endpoint.unknown_error
        if flag:
            break
    return flag
1564

1565

1566
def load_preload_pending():
    """Fetch the Chrome preload pending list.

    The list is requested from the hstspreload.org API and only the names of
    entries whose "include_subdomains" value is True are returned.  If the
    request fails with an SSL or connection error, or the response body is
    not valid UTF-8 encoded JSON, the failure is logged and an empty list is
    returned.
    """
    utils.debug("Fetching hstspreload.org pending list...", divider=True)
    pending_url = "https://hstspreload.org/api/v2/pending"

    try:
        request = requests.get(pending_url)
    except (requests.exceptions.SSLError, requests.exceptions.ConnectionError) as err:
        logging.exception("Failed to fetch pending preload list: %s", pending_url)
        logging.debug(err)
        return []

    try:
        pending_json = json.loads(request.content.decode("utf-8"))
    except ValueError:
        # Covers both undecodable bytes and malformed JSON (for example an
        # HTML error page); treat it like a failed fetch.
        logging.exception("Failed to parse pending preload list: %s", pending_url)
        return []

    return [
        entry["name"]
        for entry in pending_json
        if entry.get("include_subdomains", False) is True
    ]
1592

1593

1594
def load_preload_list():
    """Download and load the Chromium preload list.

    Only the names of entries whose "include_subdomains" value is True are
    returned.  If the request fails with an SSL or connection error, or the
    downloaded content cannot be decoded and parsed, the failure is logged
    and an empty list is returned.
    """
    utils.debug("Fetching Chrome preload list from source...", divider=True)

    # Location of the Chromium preloaded domain list.
    file_url = "https://chromium.googlesource.com/chromium/src/+/main/net/http/transport_security_state_static.json?format=TEXT"

    try:
        request = requests.get(file_url)
    except (requests.exceptions.SSLError, requests.exceptions.ConnectionError) as err:
        logging.exception("Failed to fetch preload list: %s", file_url)
        logging.debug(err)
        return []

    comment_line = re.compile(r"^\s*//.*$")

    try:
        # To avoid parsing the contents of the file out of the source tree
        # viewer's HTML, we download it as a raw file. googlesource.com
        # Base64-encodes the file to avoid potential content injection
        # issues, so we need to decode it before using it.
        # https://code.google.com/p/gitiles/issues/detail?id=7
        raw = base64.b64decode(request.content).decode("utf-8")

        # The .json file contains '//' comments, which are not actually
        # valid JSON, and confuse Python's JSON decoder.  Blank out every
        # line that consists solely of such a comment.
        raw = "".join(comment_line.sub("", line) for line in raw.splitlines())

        preload_json = json.loads(raw)
    except ValueError:
        # Bad Base64, bad UTF-8 and malformed JSON all surface as ValueError
        # subclasses; treat them like a failed fetch.
        logging.exception("Failed to decode preload list: %s", file_url)
        return []

    # For our purposes, we only care about entries that includeSubDomains
    return [
        entry["name"]
        for entry in preload_json["entries"]
        if entry.get("include_subdomains", False) is True
    ]
1631

1632

1633
# Returns an instantiated PublicSuffixList object.
1634
def load_suffix_list(cache_suffix_list=None, update_list=False):
    """Download and load the public suffix list.

    When update_list is true the list is refreshed first via updatePSL():
    into the file at cache_suffix_list if one is given, otherwise with no
    argument.  If that refresh raises, the error is logged and None is
    returned.

    Afterwards a PublicSuffixList is built from the cache file when
    cache_suffix_list is given, or with no argument otherwise, and returned.
    """
    if update_list:
        utils.debug("Downloading the Public Suffix List...", divider=True)
        # With a cache path the local copy is updated; without one,
        # updatePSL() is called bare.
        update_args = (cache_suffix_list,) if cache_suffix_list else ()
        try:
            updatePSL(*update_args)
        except Exception as err:
            logging.exception("Unable to download the Public Suffix List...")
            utils.debug(err)
            return None

    if not cache_suffix_list:
        # No cache file was given.
        return PublicSuffixList()

    # Use the local copy
    utils.debug("Using cached Public Suffix List.", divider=True)
    with codecs.open(cache_suffix_list, encoding="utf-8") as cache_file:
        loaded = PublicSuffixList(cache_file)
    return loaded
1660

1661

1662
def initialize_external_data(
    init_preload_list=None, init_preload_pending=None, init_suffix_list=None
):
    """Load any third party external data.

    This fills the module-level PRELOAD_LIST, PRELOAD_PENDING and
    SUFFIX_LIST caches.  A library can call it explicitly as setup before
    using other library functions; inspect_domains() also calls it.

    Values passed in are assigned as the cached values, which lets a caller
    of the Python API manage cached data itself (including passing subsets
    of these lists).  init_preload_list and init_preload_pending are stored
    as given; init_suffix_list is wrapped in a PublicSuffixList.

    Every cache that is still None afterwards is loaded.  If
    THIRD_PARTIES_CACHE names a directory, data is read from its cache file
    there when that file exists, and downloaded otherwise; downloaded
    preload lists are then written to their cache file.  Without
    THIRD_PARTIES_CACHE no cache files are read or written.

    Caches that already hold data are left untouched.
    """
    global PRELOAD_LIST, PRELOAD_PENDING, SUFFIX_LIST

    # Caller-supplied data wins.  The preload lists should be sent in as
    # lists of domains, the public suffix list as a list of file lines.
    if init_preload_list is not None:
        PRELOAD_LIST = init_preload_list
    if init_preload_pending is not None:
        PRELOAD_PENDING = init_preload_pending
    if init_suffix_list is not None:
        SUFFIX_LIST = PublicSuffixList(init_suffix_list)

    # If there's a specified cache dir, prepare paths.
    # Only used when no data has been set yet for a source.
    cache_preload_list = cache_preload_pending = cache_suffix_list = None
    if THIRD_PARTIES_CACHE:
        cache_preload_list = os.path.join(
            THIRD_PARTIES_CACHE, CACHE_PRELOAD_LIST_DEFAULT
        )
        cache_preload_pending = os.path.join(
            THIRD_PARTIES_CACHE, CACHE_PRELOAD_PENDING_DEFAULT
        )
        cache_suffix_list = os.path.join(THIRD_PARTIES_CACHE, CACHE_SUFFIX_LIST_DEFAULT)

    # Chrome's latest versioned HSTS preload list.
    if PRELOAD_LIST is None:
        if cache_preload_list and os.path.exists(cache_preload_list):
            utils.debug("Using cached Chrome preload list.", divider=True)
            with open(cache_preload_list, encoding="utf-8") as cache_file:
                PRELOAD_LIST = json.load(cache_file)
        else:
            PRELOAD_LIST = load_preload_list()
            if cache_preload_list:
                utils.debug(
                    "Caching preload list at %s", cache_preload_list, divider=True
                )
                utils.write(utils.json_for(PRELOAD_LIST), cache_preload_list)

    # Chrome's current HSTS pending preload list.
    if PRELOAD_PENDING is None:
        if cache_preload_pending and os.path.exists(cache_preload_pending):
            utils.debug("Using cached hstspreload.org pending list.", divider=True)
            with open(cache_preload_pending, encoding="utf-8") as cache_file:
                PRELOAD_PENDING = json.load(cache_file)
        else:
            PRELOAD_PENDING = load_preload_pending()
            if cache_preload_pending:
                utils.debug(
                    "Caching preload pending list at %s",
                    cache_preload_pending,
                    divider=True,
                )
                utils.write(utils.json_for(PRELOAD_PENDING), cache_preload_pending)

    # Mozilla's current Public Suffix list.
    if SUFFIX_LIST is None:
        if cache_suffix_list:
            # Retrieve the list only if the cache file does not exist yet;
            # otherwise use the cached copy.
            needs_download = not os.path.exists(cache_suffix_list)
            SUFFIX_LIST = load_suffix_list(cache_suffix_list, needs_download)
        else:
            # Load the built-in PSL
            SUFFIX_LIST = load_suffix_list()
1755

1756

1757
def inspect_domains(domains, options):
    """Run inspect() against each of the given domains with the given options.

    Truthy entries in options override module-level settings: "timeout"
    (converted to int), "user_agent", "cache-third-parties", "ca_file"
    (which also switches STORE to "Custom") and "pt_int_ca_file".

    This is a generator: the overrides and the external data
    initialization only happen once iteration starts, and one inspect()
    result is yielded per domain.
    """
    # Override timeout, user agent, preload cache, default CA bundle
    global TIMEOUT, USER_AGENT, THIRD_PARTIES_CACHE, CA_FILE, PT_INT_CA_FILE, STORE

    timeout = options.get("timeout")
    if timeout:
        TIMEOUT = int(timeout)

    user_agent = options.get("user_agent")
    if user_agent:
        USER_AGENT = user_agent

    # Supported cache flag, a directory to store all third party requests.
    cache_dir = options.get("cache-third-parties")
    if cache_dir:
        THIRD_PARTIES_CACHE = cache_dir

    ca_file = options.get("ca_file")
    if ca_file:
        CA_FILE = ca_file
        # By default, the store that we want to check is the Mozilla store
        # However, if a user wants to use their own CA bundle, check the
        # "Custom" Option from the sslyze output.
        STORE = "Custom"

    pt_int_ca_file = options.get("pt_int_ca_file")
    if pt_int_ca_file:
        PT_INT_CA_FILE = pt_int_ca_file

    # If this has been run once already by a Python API client, it
    # can be safely run without hitting the network or disk again,
    # and without overriding the data the Python user set for them.
    initialize_external_data()

    # For every given domain, get inspect data.
    for name in domains:
        yield inspect(name)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc