• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cisagov / trustymail / 5072383466

pending completion
5072383466

Pull #135

github

GitHub
Merge 5b2beb1f8 into 585c9a8f8
Pull Request #135: 134 fix psl args

0 of 22 new or added lines in 3 files covered. (0.0%)

1 existing line in 1 file now uncovered.

0 of 642 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/trustymail/domain.py
1
"""Provide a data model for domains and some utility functions."""
2

3
# Standard Python Libraries
4
from collections import OrderedDict
×
5
from datetime import datetime, timedelta
×
NEW
6
import json
×
NEW
7
from os import path, stat, utime
×
UNCOV
8
from typing import Dict
×
9

10
# Third-Party Libraries
11
from publicsuffixlist.compat import PublicSuffixList
×
12
from publicsuffixlist.update import updatePSL
×
13

NEW
14
from . import  trustymail
×
15

16
def get_psl():
    """Get the Public Suffix List - either new, or cached on disk for 24 hours.

    The PSL file name and the read-only flag are read from the
    ``env.json`` file in the current working directory, under the keys
    ``--psl-filename`` and ``--psl-read-only``.  Unless read-only mode is
    requested, the PSL file is downloaded when it does not exist yet or
    when its modification time is more than 24 hours old.

    Returns
    -------
    PublicSuffixList: An instance of PublicSuffixList loaded with a cached or updated list

    Raises
    ------
    OSError: If ``env.json`` or the PSL file cannot be opened.
    KeyError: If ``env.json`` does not contain one of the expected keys.
    """
    # Read environment variables written by the CLI.  The context manager
    # guarantees the handle is closed even when json.load() raises on
    # malformed content.
    with open("env.json", encoding="utf-8") as env_file:
        env = json.load(env_file)
    filename = env["--psl-filename"]
    readonly = env["--psl-read-only"]

    # Download the PSL if necessary
    if not readonly:
        needs_update = not path.exists(filename)
        if not needs_update:
            # The file exists; refresh it only when it is older than a day.
            psl_age = datetime.now() - datetime.fromtimestamp(
                stat(filename).st_mtime
            )
            needs_update = psl_age > timedelta(hours=24)
        if needs_update:
            updatePSL(filename)
            utime(filename, None)  # Set mtime to now

    with open(filename, encoding="utf-8") as psl_file:
        psl = PublicSuffixList(psl_file)

    return psl
×
46

47

48
def get_public_suffix(domain):
    """Look up the public suffix of ``domain`` in the list returned by get_psl()."""
    return get_psl().get_public_suffix(domain)
×
53

54

55
def format_list(record_list):
    """Join the entries of a list with ", " so they read well in CSV output.

    Returns None instead of a string when ``record_list`` is empty.
    """
    # Callers are expected to pass a list only (never an integer or
    # anything else), so the falsy case below is the empty list.  Mapping
    # it to None makes a "null" show up in the JSON output, as expected.
    return ", ".join(record_list) if record_list else None
×
65

66

67
class Domain:
    """Store information about a domain."""

    # Cache of already-scanned base domains, keyed by base domain name and
    # shared by every Domain instance.
    base_domains: Dict[str, "Domain"] = {}

    def __init__(
        self,
        domain_name,
        timeout,
        smtp_timeout,
        smtp_localhost,
        smtp_ports,
        smtp_cache,
        dns_hostnames,
    ):
        """Retrieve information about a given domain name.

        The domain name is stored lowercased.  When the domain is not its
        own base domain and that base domain has not been scanned yet, the
        remaining arguments are forwarded unchanged to trustymail.scan() to
        run a DMARC-only scan of the base domain, whose result is cached in
        Domain.base_domains.
        """
        self.domain_name = domain_name.lower()

        self.base_domain_name = get_public_suffix(self.domain_name)

        self.is_base_domain = True
        self.base_domain = None
        if self.base_domain_name != self.domain_name:
            self.is_base_domain = False
            if self.base_domain_name not in Domain.base_domains:
                # Populate DMARC for parent.
                domain = trustymail.scan(
                    self.base_domain_name,
                    timeout,
                    smtp_timeout,
                    smtp_localhost,
                    smtp_ports,
                    smtp_cache,
                    {"mx": False, "starttls": False, "spf": False, "dmarc": True},
                    dns_hostnames,
                )
                Domain.base_domains[self.base_domain_name] = domain
            self.base_domain = Domain.base_domains[self.base_domain_name]

        # Start off assuming the host is live unless an error tells us otherwise.
        self.is_live = True

        # Keep entire record for potential future use.
        self.mx_records = None
        self.mx_records_dnssec = None
        self.spf = None
        self.spf_dnssec = None
        self.dmarc = None
        self.dmarc_dnssec = False
        self.dmarc_policy = None
        self.dmarc_subdomain_policy = None
        self.dmarc_pct = None
        self.dmarc_aggregate_uris = []
        self.dmarc_forensic_uris = []
        self.dmarc_has_aggregate_uri = False
        self.dmarc_has_forensic_uri = False
        self.dmarc_reports_address_error = False

        # Syntax validity - default spf to false as the lack of an SPF is a bad thing.
        self.valid_spf = False
        self.valid_dmarc = True
        self.syntax_errors = []

        # Mail Info
        self.mail_servers = None

        # A dictionary for each port for each entry in mail_servers.
        # The dictionary's values indicate:
        # 1. Whether or not the server is listening on the port
        # 2. Whether or not the server supports SMTP
        # 3. Whether or not the server supports STARTTLS
        self.starttls_results = {}

        # A list of any debugging information collected while scanning records.
        self.debug_info = []

        # A list of the ports tested for SMTP
        self.ports_tested = set()

    def has_mail(self):
        """Check if there are any mail servers associated with this domain.

        Returns None when no mail server information has been recorded.
        """
        if self.mail_servers is not None:
            return len(self.mail_servers) > 0
        return None

    def has_supports_smtp(self):
        """Check if any of the mail servers associated with this domain are listening and support SMTP.

        Returns None when there are no STARTTLS results to inspect.
        """
        if not self.starttls_results:
            return None
        # any() is used instead of len(filter(...)): in Python 3 filter()
        # returns an iterator, which has no len() and raised TypeError.
        return any(
            result["supports_smtp"] for result in self.starttls_results.values()
        )

    def has_starttls(self):
        """Check if any of the mail servers associated with this domain are listening and support STARTTLS.

        Returns None when there are no STARTTLS results to inspect.
        """
        if not self.starttls_results:
            return None
        # any() is used instead of len(filter(...)): in Python 3 filter()
        # returns an iterator, which has no len() and raised TypeError.
        return any(result["starttls"] for result in self.starttls_results.values())

    def has_spf(self):
        """Check if this domain has any Sender Policy Framework records.

        Returns None when no SPF information has been recorded.
        """
        if self.spf is not None:
            return len(self.spf) > 0
        return None

    def has_dmarc(self):
        """Check if this domain has a Domain-based Message Authentication, Reporting, and Conformance record.

        Returns None when no DMARC information has been recorded.
        """
        if self.dmarc is not None:
            return len(self.dmarc) > 0
        return None

    def add_mx_record(self, record):
        """Add a mail server record for this domain."""
        if self.mx_records is None:
            self.mx_records = []
        self.mx_records.append(record)
        # The rstrip is because dnspython's string representation of
        # the record will contain a trailing period if it is a FQDN.
        if self.mail_servers is None:
            self.mail_servers = []
        self.mail_servers.append(record.exchange.to_text().rstrip(".").lower())

    def parent_has_dmarc(self):
        """Check if a domain or its parent has a Domain-based Message Authentication, Reporting, and Conformance record."""
        # When a base domain is attached, its answer takes precedence.
        if self.base_domain:
            return self.base_domain.has_dmarc()
        return self.has_dmarc()

    def parent_dmarc_dnssec(self):
        """Get this domain or its parent's DMARC DNSSEC information."""
        if self.base_domain:
            return self.base_domain.dmarc_dnssec
        return self.dmarc_dnssec

    def parent_valid_dmarc(self):
        """Check if this domain or its parent have a valid DMARC record."""
        if self.base_domain:
            return self.base_domain.valid_dmarc
        return self.valid_dmarc

    def parent_dmarc_results(self):
        """Get this domain or its parent's DMARC information."""
        if self.base_domain:
            return format_list(self.base_domain.dmarc)
        return format_list(self.dmarc)

    def get_dmarc_policy(self):
        """Get this domain or its parent's DMARC policy."""
        ans = self.dmarc_policy
        # If the policy was never set, or isn't in the list of valid
        # policies, check the parents.
        if ans is None or ans.lower() not in ["quarantine", "reject", "none"]:
            if self.base_domain:
                # We check the *subdomain* policy in case one was
                # explicitly set.  If one was not explicitly set then
                # the subdomain policy is populated with the value for
                # the domain policy by trustymail.py anyway, in
                # accordance with the RFC
                # (https://tools.ietf.org/html/rfc7489#section-6.3).
                ans = self.base_domain.get_dmarc_subdomain_policy()
            else:
                ans = None
        return ans

    def get_dmarc_subdomain_policy(self):
        """Get this domain or its parent's DMARC subdomain policy."""
        ans = self.dmarc_subdomain_policy
        # If the policy was never set, or isn't in the list of valid
        # policies, check the parents.
        if ans is None or ans.lower() not in ["quarantine", "reject", "none"]:
            if self.base_domain:
                ans = self.base_domain.get_dmarc_subdomain_policy()
            else:
                ans = None
        return ans

    def get_dmarc_pct(self):
        """Get this domain or its parent's DMARC percentage information."""
        ans = self.dmarc_pct
        if not ans and self.base_domain:
            # Check the parents
            ans = self.base_domain.get_dmarc_pct()
        return ans

    def get_dmarc_has_aggregate_uri(self):
        """Check if this domain or its parent have a DMARC aggregate URI."""
        ans = self.dmarc_has_aggregate_uri
        # If there are no aggregate URIs then check the parents.
        if not ans and self.base_domain:
            ans = self.base_domain.get_dmarc_has_aggregate_uri()
        return ans

    def get_dmarc_has_forensic_uri(self):
        """Check if this domain or its parent have a DMARC forensic URI."""
        ans = self.dmarc_has_forensic_uri
        # If there are no forensic URIs then check the parents.
        if not ans and self.base_domain:
            ans = self.base_domain.get_dmarc_has_forensic_uri()
        return ans

    def get_dmarc_aggregate_uris(self):
        """Get this domain or its parent's DMARC aggregate URIs."""
        ans = self.dmarc_aggregate_uris
        # If there are no aggregate URIs then check the parents.
        if not ans and self.base_domain:
            ans = self.base_domain.get_dmarc_aggregate_uris()
        return ans

    def get_dmarc_forensic_uris(self):
        """Get this domain or its parent's DMARC forensic URIs."""
        ans = self.dmarc_forensic_uris
        # If there are no forensic URIs then check the parents.
        if not ans and self.base_domain:
            ans = self.base_domain.get_dmarc_forensic_uris()
        return ans

    def generate_results(self):
        """Generate the results for this domain.

        Returns an OrderedDict mapping each report column name to its value.
        """
        if not self.starttls_results:
            domain_supports_smtp = None
            domain_supports_starttls = None
            mail_servers_that_support_smtp = None
            mail_servers_that_support_starttls = None
        else:
            mail_servers_that_support_smtp = [
                server
                for server, result in self.starttls_results.items()
                if result["supports_smtp"]
            ]
            mail_servers_that_support_starttls = [
                server
                for server, result in self.starttls_results.items()
                if result["starttls"]
            ]
            domain_supports_smtp = bool(mail_servers_that_support_smtp)
            domain_supports_starttls = domain_supports_smtp and all(
                self.starttls_results[server]["starttls"]
                for server in mail_servers_that_support_smtp
            )

        results = OrderedDict(
            [
                ("Domain", self.domain_name),
                ("Base Domain", self.base_domain_name),
                ("Live", self.is_live),
                ("MX Record", self.has_mail()),
                ("MX Record DNSSEC", self.mx_records_dnssec),
                ("Mail Servers", format_list(self.mail_servers)),
                (
                    "Mail Server Ports Tested",
                    format_list([str(port) for port in self.ports_tested]),
                ),
                (
                    "Domain Supports SMTP Results",
                    format_list(mail_servers_that_support_smtp),
                ),
                # True if and only if at least one mail server speaks SMTP
                ("Domain Supports SMTP", domain_supports_smtp),
                (
                    "Domain Supports STARTTLS Results",
                    format_list(mail_servers_that_support_starttls),
                ),
                # True if and only if all mail servers that speak SMTP
                # also support STARTTLS
                ("Domain Supports STARTTLS", domain_supports_starttls),
                ("SPF Record", self.has_spf()),
                ("SPF Record DNSSEC", self.spf_dnssec),
                ("Valid SPF", self.valid_spf),
                ("SPF Results", format_list(self.spf)),
                ("DMARC Record", self.has_dmarc()),
                ("DMARC Record DNSSEC", self.dmarc_dnssec),
                ("Valid DMARC", self.has_dmarc() and self.valid_dmarc),
                ("DMARC Results", format_list(self.dmarc)),
                ("DMARC Record on Base Domain", self.parent_has_dmarc()),
                ("DMARC Record on Base Domain DNSSEC", self.parent_dmarc_dnssec()),
                (
                    "Valid DMARC Record on Base Domain",
                    self.parent_has_dmarc() and self.parent_valid_dmarc(),
                ),
                ("DMARC Results on Base Domain", self.parent_dmarc_results()),
                ("DMARC Policy", self.get_dmarc_policy()),
                ("DMARC Subdomain Policy", self.get_dmarc_subdomain_policy()),
                ("DMARC Policy Percentage", self.get_dmarc_pct()),
                (
                    "DMARC Aggregate Report URIs",
                    format_list(self.get_dmarc_aggregate_uris()),
                ),
                (
                    "DMARC Forensic Report URIs",
                    format_list(self.get_dmarc_forensic_uris()),
                ),
                ("DMARC Has Aggregate Report URI", self.get_dmarc_has_aggregate_uri()),
                ("DMARC Has Forensic Report URI", self.get_dmarc_has_forensic_uri()),
                (
                    "DMARC Reporting Address Acceptance Error",
                    self.dmarc_reports_address_error,
                ),
                ("Syntax Errors", format_list(self.syntax_errors)),
                ("Debug Info", format_list(self.debug_info)),
            ]
        )

        return results
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc