• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 18005d26-875b-4397-9529-ebc464e07316

16 May 2025 06:29PM UTC coverage: 85.352% (+0.03%) from 85.323%
18005d26-875b-4397-9529-ebc464e07316

Pull #5572

circleci

groovecoder
for MPP-3439: add IDNAEmailCleaner to clean email address domains with non-ASCII chars
Pull Request #5572: for MPP-3439: add IDNAEmailCleaner to clean email address domains with non-ASCII chars

2471 of 3617 branches covered (68.32%)

Branch coverage included in aggregate %.

58 of 59 new or added lines in 4 files covered. (98.31%)

61 existing lines in 2 files now uncovered.

17561 of 19853 relevant lines covered (88.46%)

9.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.64
/emails/tests/views_tests.py
1
import glob
1✔
2
import json
1✔
3
import logging
1✔
4
import os
1✔
5
import re
1✔
6
from copy import deepcopy
1✔
7
from datetime import UTC, datetime, timedelta
1✔
8
from email import message_from_string
1✔
9
from email.message import EmailMessage
1✔
10
from typing import Any, cast
1✔
11
from unittest._log import _LoggingWatcher
1✔
12
from unittest.mock import ANY, Mock, patch
1✔
13
from uuid import uuid4
1✔
14

15
from django.conf import settings
1✔
16
from django.contrib.auth.models import User
1✔
17
from django.core.exceptions import ObjectDoesNotExist
1✔
18
from django.http import HttpResponse
1✔
19
from django.test import Client, SimpleTestCase, TestCase, override_settings
1✔
20

21
import pytest
1✔
22
from allauth.socialaccount.models import SocialAccount
1✔
23
from botocore.exceptions import ClientError
1✔
24
from markus.main import MetricsRecord
1✔
25
from markus.testing import MetricsMock
1✔
26
from model_bakery import baker
1✔
27
from waffle.testutils import override_flag
1✔
28

29
from emails.models import (
1✔
30
    DeletedAddress,
31
    DomainAddress,
32
    RelayAddress,
33
    Reply,
34
    address_hash,
35
)
36
from emails.policy import relay_policy
1✔
37
from emails.types import AWS_SNSMessageJSON, OutgoingHeaders
1✔
38
from emails.utils import (
1✔
39
    InvalidFromHeader,
40
    b64_lookup_key,
41
    decode_dict_gza85,
42
    decrypt_reply_metadata,
43
    derive_reply_keys,
44
    encrypt_reply_metadata,
45
    get_domains_from_settings,
46
    get_message_id_bytes,
47
)
48
from emails.views import (
1✔
49
    EmailDroppedReason,
50
    RawComplaintData,
51
    ReplyHeadersNotFound,
52
    _build_disabled_mask_for_spam_email,
53
    _build_reply_requires_premium_email,
54
    _gather_complainers,
55
    _get_address,
56
    _get_address_if_exists,
57
    _get_complaint_data,
58
    _get_keys_from_headers,
59
    _get_mask_by_metrics_id,
60
    _record_receipt_verdicts,
61
    _replace_headers,
62
    _set_forwarded_first_reply,
63
    _sns_message,
64
    _sns_notification,
65
    log_email_dropped,
66
    reply_requires_premium_test,
67
    validate_sns_arn_and_type,
68
    wrapped_email_test,
69
)
70
from privaterelay.ftl_bundles import main
1✔
71
from privaterelay.glean.server_events import GLEAN_EVENT_MOZLOG_TYPE as GLEAN_LOG
1✔
72
from privaterelay.models import Profile
1✔
73
from privaterelay.tests.utils import (
1✔
74
    create_expected_glean_event,
75
    get_glean_event,
76
    log_extra,
77
    make_free_test_user,
78
    make_premium_test_user,
79
    upgrade_test_user_to_premium,
80
)
81

82
# Load the sns json fixtures from files
83
real_abs_cwd = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
1✔
84
single_rec_file = os.path.join(
1✔
85
    real_abs_cwd, "fixtures", "single_recipient_sns_body.json"
86
)
87

88

89
# Names of logs
90
INFO_LOG = "eventsinfo"
1✔
91
ERROR_LOG = "events"
1✔
92

93

94
def load_fixtures(file_suffix: str) -> dict[str, AWS_SNSMessageJSON | str]:
1✔
95
    """Load all fixtures with a particular suffix."""
96
    path = os.path.join(real_abs_cwd, "fixtures", "*" + file_suffix)
1✔
97
    ext = os.path.splitext(file_suffix)[1]
1✔
98
    fixtures: dict[str, AWS_SNSMessageJSON | str] = {}
1✔
99
    for fixture_file in glob.glob(path):
1✔
100
        file_name = os.path.basename(fixture_file)
1✔
101
        key = file_name[: -len(file_suffix)]
1✔
102
        assert key not in fixtures
1✔
103
        with open(fixture_file) as f:
1✔
104
            if ext == ".json":
1✔
105
                fixtures[key] = json.load(f)
1✔
106
            else:
107
                assert ext == ".email"
1✔
108
                fixtures[key] = f.read()
1✔
109
    return fixtures
1✔
110

111

112
def load_sns_fixtures(file_suffix: str) -> dict[str, AWS_SNSMessageJSON]:
1✔
113
    return cast(dict[str, AWS_SNSMessageJSON], load_fixtures(file_suffix + ".json"))
1✔
114

115

116
def load_email_fixtures(file_suffix: str) -> dict[str, str]:
1✔
117
    return cast(dict[str, str], load_fixtures(file_suffix + ".email"))
1✔
118

119

120
EMAIL_SNS_BODIES = load_sns_fixtures("_email_sns_body")
1✔
121
BOUNCE_SNS_BODIES = load_sns_fixtures("_bounce_sns_body")
1✔
122
INVALID_SNS_BODIES = load_sns_fixtures("_invalid_sns_body")
1✔
123
EMAIL_INCOMING = load_email_fixtures("_incoming")
1✔
124
EMAIL_EXPECTED = load_email_fixtures("_expected")
1✔
125

126
# Set mocked_function.side_effect = FAIL_TEST_IF_CALLED to safely disable a function
127
# for test and assert it was never called.
128
FAIL_TEST_IF_CALLED = Exception("This function should not have been called.")
1✔
129

130
# Set mocked ses_client.send_raw_email.side_effect = SEND_RAW_EMAIL_FAILED to
131
# simulate an error
132
SEND_RAW_EMAIL_FAILED = ClientError(
1✔
133
    operation_name="SES.send_raw_email",
134
    error_response={"Error": {"Code": "the code", "Message": "the message"}},
135
)
136

137

138
def create_email_from_notification(
1✔
139
    notification: AWS_SNSMessageJSON, text: str
140
) -> bytes:
141
    """
142
    Create an email from an SNS notification, return the serialized bytes.
143

144
    The email will have the headers from the notification and the `text` value as the
145
    plain text body.
146
    """
147
    message_json = notification.get("Message")
1✔
148
    assert message_json
1✔
149
    message = json.loads(message_json)
1✔
150
    assert "mail" in message
1✔
151
    mail_data = message["mail"]
1✔
152
    assert "content" not in mail_data
1✔
153
    email = EmailMessage()
1✔
154
    assert "headers" in mail_data
1✔
155
    for entry in mail_data["headers"]:
1✔
156
        email[entry["name"]] = entry["value"]
1✔
157
    assert email["Content-Type"].startswith("multipart/alternative")
1✔
158
    email.add_alternative(text, subtype="plain")
1✔
159
    return email.as_bytes()
1✔
160

161

162
def create_notification_from_email(email_text: str) -> AWS_SNSMessageJSON:
1✔
163
    """
164
    Create an SNS notification from a raw serialized email.
165

166
    The SNS notification is designed to be processed by _handle_received, and can be
167
    processed by _sns_inbound_logic and _sns_notification, which will pass it to
168
    _handle_received. It will not pass the external view sns_inbound, because it will
169
    fail signature checking, since it has a fake Signature and SigningCertURL.
170

171
    The SNS notification has a passing receipt, such as a passing spamVerdict,
172
    virusVerdict, and dmarcVerdict.
173

174
    The SNS notification will have the headers from the email and other mocked items.
175
    The email will be included in the notification body, not loaded from (mock) S3.
176
    """
177
    email = message_from_string(email_text, policy=relay_policy)
1✔
178
    topic_arn = "arn:aws:sns:us-east-1:168781634622:ses-inbound-grelay"
1✔
179
    if email["Message-ID"]:
1✔
180
        message_id = getattr(email["Message-ID"], "as_unstructured")
1✔
181
    else:
182
        message_id = None
1✔
183
    # This function cannot handle malformed To: addresses
184
    assert not getattr(email["To"], "defects")
1✔
185
    email_date = (
1✔
186
        getattr(email["Date"], "datetime")
187
        if "Date" in email
188
        else (datetime.now() - timedelta(minutes=5))
189
    )
190

191
    sns_message = {
1✔
192
        "notificationType": "Received",
193
        "mail": {
194
            "timestamp": email_date.isoformat(),
195
            # To handle invalid From address, find 'first' address with what looks like
196
            # an email portion and use that email, or fallback to invalid@example.com
197
            "source": next(
198
                (
199
                    addr.addr_spec
200
                    for addr in getattr(email["From"], "addresses")
201
                    if "@" in addr.addr_spec
202
                ),
203
                "invalid@example.com",
204
            ),
205
            "messageId": message_id,
206
            "destination": [
207
                addr.addr_spec for addr in getattr(email["To"], "addresses")
208
            ],
209
            "headersTruncated": False,
210
            "headers": [
211
                {"name": _h, "value": str(getattr(_v, "as_unstructured"))}
212
                for _h, _v in email.items()
213
            ],
214
            "commonHeaders": {
215
                "from": [getattr(email["From"], "as_unstructured")],
216
                "date": email["Date"],
217
                "to": [str(addr) for addr in getattr(email["To"], "addresses")],
218
                "messageId": message_id,
219
                "subject": getattr(email["Subject"], "as_unstructured"),
220
            },
221
        },
222
        "receipt": {
223
            "timestamp": (email_date + timedelta(seconds=1)).isoformat(),
224
            "processingTimeMillis": 1001,
225
            "recipients": [
226
                addr.addr_spec for addr in getattr(email["To"], "addresses")
227
            ],
228
            "spamVerdict": {"status": "PASS"},
229
            "virusVerdict": {"status": "PASS"},
230
            "spfVerdict": {"status": "PASS"},
231
            "dkimVerdict": {"status": "PASS"},
232
            "dmarcVerdict": {"status": "PASS"},
233
            "action": {"type": "SNS", "topicArn": topic_arn, "encoding": "UTF8"},
234
        },
235
        "content": email_text,
236
    }
237
    base_url = "https://sns.us-east-1.amazonaws.example.com"
1✔
238
    sns_notification = {
1✔
239
        "Type": "Notification",
240
        "MessageId": str(uuid4()),
241
        "TopicArn": topic_arn,
242
        "Subject": str(getattr(email["Subject"], "as_unstructured")),
243
        "Message": json.dumps(sns_message),
244
        "Timestamp": (email_date + timedelta(seconds=2)).isoformat(),
245
        "SignatureVersion": "1",
246
        "Signature": "invalid-signature",
247
        "SigningCertURL": f"{base_url}/SimpleNotificationService-abcd1234.pem",
248
        "UnsubscribeURL": (
249
            f"{base_url}/?Action=Unsubscribe&SubscriptionArn={topic_arn}:{uuid4()}"
250
        ),
251
    }
252
    return sns_notification
1✔
253

254

255
def assert_email_equals_fixture(
1✔
256
    output_email: str,
257
    fixture_name: str,
258
    replace_mime_boundaries: bool = False,
259
    fixture_replace: tuple[str, str] | None = None,
260
) -> None:
261
    """
262
    Assert the output equals the expected email, after optional replacements.
263

264
    If Python generated new sections in the email, such as creating an HTML section for
265
    a text-only email, then set replace_mime_boundaries=True to replace MIME boundaries
266
    with text that does not change between runs.
267

268
    If the output does not match, write the output to the fixtures directory. This
269
    allows using other diff tools, and makes it easy to capture new outputs when the
270
    email format changes.
271
    """
272
    expected = EMAIL_EXPECTED[fixture_name]
1✔
273

274
    # If requested, convert MIME boundaries in the the output_email
275
    if replace_mime_boundaries:
1✔
276
        test_output_email = _replace_mime_boundaries(output_email)
1✔
277
    else:
278
        test_output_email = output_email
1✔
279

280
    # If requested, replace a string
281
    if fixture_replace:
1✔
282
        orig_str, new_str = fixture_replace
1✔
283
        expected = expected.replace(orig_str, new_str)
1✔
284
        fixture_name += "_MODIFIED"
1✔
285

286
    if test_output_email != expected:  # pragma: no cover
287
        # Write the actual output as an aid for debugging or fixture updates
288
        path = os.path.join(real_abs_cwd, "fixtures", fixture_name + "_actual.email")
289
        open(path, "w").write(test_output_email)
290
    assert test_output_email == expected
1✔
291

292

293
def _replace_mime_boundaries(email: str) -> str:
1✔
294
    """
295
    Replace MIME boundary strings.  The replacement is "==[BOUNDARY#]==", where "#" is
296
    the order the string appears in the email.
297

298
    Per RFC 1521, 7.2.1, MIME boundary strings must not appear in the bounded content.
299
    Most email providers use random number generators when finding a unique string.
300
    Python's email library generates a large random value for email boundary strings.
301
    If the Python email library generates boundary strings (for example, creating an
302
    HTML section for a text-only email), the boundary strings are different with each
303
    test run. The replacement strings do not vary between test runs, and are still
304
    unique for different MIME sections.
305
    """
306

307
    generic_email_lines: list[str] = []
1✔
308
    mime_boundaries: dict[str, str] = {}
1✔
309
    boundary_re = re.compile(r' boundary="(.*)"')
1✔
310
    for line in email.splitlines():
1✔
311
        if " boundary=" in line:
1✔
312
            # Capture the MIME boundary and replace with generic
313
            cap = boundary_re.search(line)
1✔
314
            assert cap
1✔
315
            boundary = cap.group(1)
1✔
316
            assert boundary not in mime_boundaries
1✔
317
            generic_boundary = f"==[BOUNDARY{len(mime_boundaries)}]=="
1✔
318
            mime_boundaries[boundary] = generic_boundary
1✔
319

320
        generic_line = line
1✔
321
        for boundary, generic_boundary in mime_boundaries.items():
1✔
322
            generic_line = generic_line.replace(boundary, generic_boundary)
1✔
323
        generic_email_lines.append(generic_line)
1✔
324

325
    generic_email = "\n".join(generic_email_lines) + "\n"
1✔
326
    return generic_email
1✔
327

328

329
def assert_log_email_dropped(
1✔
330
    caplog: _LoggingWatcher,
331
    reason: EmailDroppedReason,
332
    mask: RelayAddress | DomainAddress,
333
    is_reply: bool = False,
334
    can_retry: bool = False,
335
) -> None:
336
    """Assert that there is a log entry that an email was dropped."""
337
    drop_log = None
1✔
338
    for record in caplog.records:
1✔
339
        if record.msg == "email_dropped":
1✔
340
            assert drop_log is None, "duplicate email_dropped log entry"
1✔
341
            drop_log = record
1✔
342
    assert drop_log is not None, "email_dropped log entry not found."
1✔
343
    assert drop_log.levelno == logging.INFO
1✔
344
    expected_extra = {
1✔
345
        "reason": reason,
346
        "fxa_id": getattr(mask.user.profile.fxa, "uid", ""),
347
        "mask_id": mask.metrics_id,
348
        "is_random_mask": isinstance(mask, RelayAddress),
349
        "is_reply": is_reply,
350
        "can_retry": can_retry,
351
    }
352
    if expected_extra["fxa_id"] == "":
1✔
353
        del expected_extra["fxa_id"]
1✔
354
    assert log_extra(drop_log) == expected_extra
1✔
355

356

357
@override_settings(RELAY_FROM_ADDRESS="reply@relay.example.com")
1✔
358
class SNSNotificationTestBase(TestCase):
1✔
359
    """Base class for tests of _sns_notification"""
360

361
    def setUp(self) -> None:
1✔
362
        remove_s3_patcher = patch("emails.views.remove_message_from_s3")
1✔
363
        self.mock_remove_message_from_s3 = remove_s3_patcher.start()
1✔
364
        self.addCleanup(remove_s3_patcher.stop)
1✔
365

366
        self.mock_send_raw_email = Mock(
1✔
367
            spec_set=[], return_value={"MessageId": str(uuid4())}
368
        )
369
        send_raw_email_patcher = patch(
1✔
370
            "emails.apps.EmailsConfig.ses_client",
371
            spec_set=["send_raw_email"],
372
            send_raw_email=self.mock_send_raw_email,
373
        )
374
        send_raw_email_patcher.start()
1✔
375
        self.addCleanup(send_raw_email_patcher.stop)
1✔
376

377
    def check_sent_email_matches_fixture(
1✔
378
        self,
379
        fixture_name: str,
380
        fixture_replace: tuple[str, str] | None = None,
381
        replace_mime_boundaries: bool = False,
382
        expected_source: str | None = None,
383
        expected_destination: str | None = None,
384
    ) -> None:
385
        """
386
        Extract the email and check against the expected email.
387

388
        name: the name of the test fixture to check against the email
389
        replace_mime_boundaries: if true, replace randomized MIME boundaries with
390
          sequential test versions
391
        expected_source: if set, assert that SES source address matches this string
392
        expected_destination: if set, assert that the one SES destination email matches
393
        """
394
        self.mock_send_raw_email.assert_called_once()
1✔
395
        source = self.mock_send_raw_email.call_args[1]["Source"]
1✔
396
        destinations = self.mock_send_raw_email.call_args[1]["Destinations"]
1✔
397
        assert len(destinations) == 1
1✔
398
        raw_message = self.mock_send_raw_email.call_args[1]["RawMessage"]["Data"]
1✔
399
        assert "\n\n" in raw_message, "Never found message body!"
1✔
400
        if expected_source is not None:
1✔
401
            assert source == expected_source
1✔
402
        if expected_destination is not None:
1✔
403
            assert destinations[0] == expected_destination
1✔
404
        assert_email_equals_fixture(
1✔
405
            raw_message, fixture_name, replace_mime_boundaries, fixture_replace
406
        )
407

408

409
class SNSNotificationIncomingTest(SNSNotificationTestBase):
1✔
410
    """Tests for _sns_notification for incoming emails to Relay users"""
411

412
    def setUp(self) -> None:
1✔
413
        super().setUp()
1✔
414
        self.user = baker.make(User, email="user@example.com")
1✔
415
        self.profile = self.user.profile
1✔
416
        self.profile.last_engagement = datetime.now(UTC)
1✔
417
        self.profile.save()
1✔
418
        self.sa: SocialAccount = baker.make(
1✔
419
            SocialAccount, user=self.user, provider="fxa"
420
        )
421
        self.ra = baker.make(
1✔
422
            RelayAddress, user=self.user, address="ebsbdsan7", domain=2
423
        )
424
        self.premium_user = make_premium_test_user()
1✔
425
        self.premium_user.profile.subdomain = "subdomain"
1✔
426
        self.premium_user.profile.last_engagement = datetime.now(UTC)
1✔
427
        self.premium_user.profile.save()
1✔
428

429
    def test_single_recipient_sns_notification(self) -> None:
1✔
430
        pre_sns_notification_last_engagement = self.ra.user.profile.last_engagement
1✔
431
        assert pre_sns_notification_last_engagement is not None
1✔
432
        _sns_notification(EMAIL_SNS_BODIES["single_recipient"])
1✔
433

434
        self.check_sent_email_matches_fixture(
1✔
435
            "single_recipient",
436
            expected_source="replies@default.com",
437
            expected_destination="user@example.com",
438
        )
439
        self.ra.refresh_from_db()
1✔
440
        assert self.ra.num_forwarded == 1
1✔
441
        assert self.ra.last_used_at is not None
1✔
442
        assert (datetime.now(tz=UTC) - self.ra.last_used_at).seconds < 2.0
1✔
443
        self.ra.user.profile.refresh_from_db()
1✔
444
        assert self.ra.user.profile.last_engagement is not None
1✔
445
        assert (
1✔
446
            self.ra.user.profile.last_engagement > pre_sns_notification_last_engagement
447
        )
448

449
    def test_single_french_recipient_sns_notification(self) -> None:
1✔
450
        """
451
        The email content can contain non-ASCII characters.
452

453
        In this case, the HTML content is wrapped in the Relay header translated
454
        to French.
455
        """
456
        self.sa.extra_data = {"locale": "fr, fr-fr, en-us, en"}
1✔
457
        self.sa.save()
1✔
458
        assert self.profile.language == "fr"
1✔
459

460
        _sns_notification(EMAIL_SNS_BODIES["single_recipient"])
1✔
461

462
        self.check_sent_email_matches_fixture("single_recipient_fr")
1✔
463
        self.ra.refresh_from_db()
1✔
464
        assert self.ra.num_forwarded == 1
1✔
465
        assert self.ra.last_used_at is not None
1✔
466
        assert (datetime.now(tz=UTC) - self.ra.last_used_at).seconds < 2.0
1✔
467

468
    def test_list_email_sns_notification(self) -> None:
1✔
469
        """By default, list emails should still forward."""
470
        _sns_notification(EMAIL_SNS_BODIES["single_recipient_list"])
1✔
471

472
        self.check_sent_email_matches_fixture("single_recipient_list")
1✔
473
        self.ra.refresh_from_db()
1✔
474
        assert self.ra.num_forwarded == 1
1✔
475
        assert self.ra.last_used_at is not None
1✔
476
        assert (datetime.now(tz=UTC) - self.ra.last_used_at).seconds < 2.0
1✔
477

478
    def test_block_list_email_sns_notification(self) -> None:
1✔
479
        """When an alias is blocking list emails, list emails should not forward."""
480
        self.ra.user = self.premium_user
1✔
481
        self.ra.save()
1✔
482
        self.ra.block_list_emails = True
1✔
483
        self.ra.save()
1✔
484

485
        _sns_notification(EMAIL_SNS_BODIES["single_recipient_list"])
1✔
486

487
        self.mock_send_raw_email.assert_not_called()
1✔
488
        self.ra.refresh_from_db()
1✔
489
        assert self.ra.num_forwarded == 0
1✔
490
        assert self.ra.num_blocked == 1
1✔
491

492
    def test_block_list_email_former_premium_user(self) -> None:
1✔
493
        """List emails are forwarded for formerly premium users."""
494
        self.ra.user = self.premium_user
1✔
495
        self.ra.save()
1✔
496
        self.ra.block_list_emails = True
1✔
497
        self.ra.save()
1✔
498

499
        # Remove premium from the user
500
        assert (fxa_account := self.premium_user.profile.fxa)
1✔
501
        fxa_account.extra_data["subscriptions"] = []
1✔
502
        fxa_account.save()
1✔
503
        assert not self.premium_user.profile.has_premium
1✔
504
        self.ra.refresh_from_db()
1✔
505

506
        _sns_notification(EMAIL_SNS_BODIES["single_recipient_list"])
1✔
507

508
        self.mock_send_raw_email.assert_called_once()
1✔
509
        self.ra.refresh_from_db()
1✔
510
        assert self.ra.num_forwarded == 1
1✔
511
        assert self.ra.num_blocked == 0
1✔
512
        assert self.ra.block_list_emails is False
1✔
513

514
    def test_spamVerdict_FAIL_default_still_relays(self) -> None:
1✔
515
        """For a default user, spam email will still relay."""
516
        with self.assertLogs(GLEAN_LOG) as caplog:
1✔
517
            _sns_notification(EMAIL_SNS_BODIES["spamVerdict_FAIL"])
1✔
518

519
        self.mock_send_raw_email.assert_called_once()
1✔
520
        self.ra.refresh_from_db()
1✔
521
        assert self.ra.num_forwarded == 1
1✔
522

523
        assert (event := get_glean_event(caplog)) is not None
1✔
524
        assert event["category"] == "email"
1✔
525
        assert event["name"] == "forwarded"
1✔
526

527
    @override_settings(STATSD_ENABLED=True)
1✔
528
    def test_spamVerdict_FAIL_auto_block_doesnt_relay(self) -> None:
1✔
529
        """When a user has auto_block_spam=True, spam will not relay."""
530
        self.profile.auto_block_spam = True
1✔
531
        self.profile.save()
1✔
532

533
        with self.assertLogs(INFO_LOG) as caplog, MetricsMock() as mm:
1✔
534
            _sns_notification(EMAIL_SNS_BODIES["spamVerdict_FAIL"])
1✔
535

536
        assert_log_email_dropped(caplog, "auto_block_spam", self.ra)
1✔
537
        mm.assert_incr_once("email_auto_suppressed_for_spam")
1✔
538

539
        self.mock_send_raw_email.assert_not_called()
1✔
540
        self.ra.refresh_from_db()
1✔
541
        assert self.ra.num_forwarded == 0
1✔
542

543
    def test_domain_recipient(self) -> None:
1✔
544
        with self.assertLogs(GLEAN_LOG) as caplog:
1✔
545
            _sns_notification(EMAIL_SNS_BODIES["domain_recipient"])
1✔
546

547
        self.check_sent_email_matches_fixture(
1✔
548
            "domain_recipient", expected_destination="premium@email.com"
549
        )
550
        da = DomainAddress.objects.get(user=self.premium_user, address="wildcard")
1✔
551
        assert da.num_forwarded == 1
1✔
552
        assert da.last_used_at
1✔
553
        assert (datetime.now(tz=UTC) - da.last_used_at).seconds < 2.0
1✔
554

555
        mask_event = get_glean_event(caplog, "email_mask", "created")
1✔
556
        assert mask_event is not None
1✔
557
        shared_extra_items = {
1✔
558
            "n_domain_masks": "1",
559
            "is_random_mask": "false",
560
        }
561
        expected_mask_event = create_expected_glean_event(
1✔
562
            category="email_mask",
563
            name="created",
564
            user=self.premium_user,
565
            extra_items=shared_extra_items
566
            | {"has_website": "false", "created_by_api": "false"},
567
            event_time=mask_event["timestamp"],
568
        )
569
        assert mask_event == expected_mask_event
1✔
570

571
        email_event = get_glean_event(caplog, "email", "forwarded")
1✔
572
        assert email_event is not None
1✔
573
        expected_email_event = create_expected_glean_event(
1✔
574
            category="email",
575
            name="forwarded",
576
            user=self.premium_user,
577
            extra_items=shared_extra_items | {"is_reply": "false"},
578
            event_time=email_event["timestamp"],
579
        )
580
        assert email_event == expected_email_event
1✔
581

582
    def test_successful_email_relay_message_removed_from_s3(self) -> None:
1✔
583
        _sns_notification(EMAIL_SNS_BODIES["single_recipient"])
1✔
584

585
        self.mock_send_raw_email.assert_called_once()
1✔
586
        self.mock_remove_message_from_s3.assert_called_once()
1✔
587
        self.ra.refresh_from_db()
1✔
588
        assert self.ra.num_forwarded == 1
1✔
589
        assert self.ra.last_used_at is not None
1✔
590
        assert (datetime.now(tz=UTC) - self.ra.last_used_at).seconds < 2.0
1✔
591

592
    def test_unsuccessful_email_relay_message_not_removed_from_s3(self) -> None:
1✔
593
        self.mock_send_raw_email.side_effect = SEND_RAW_EMAIL_FAILED
1✔
594
        response = _sns_notification(EMAIL_SNS_BODIES["single_recipient"])
1✔
595
        assert response.status_code == 503
1✔
596

597
        self.mock_send_raw_email.assert_called_once()
1✔
598
        self.mock_remove_message_from_s3.assert_not_called()
1✔
599
        self.ra.refresh_from_db()
1✔
600
        assert self.ra.num_forwarded == 0
1✔
601
        assert self.ra.last_used_at is None
1✔
602

603
    @patch("emails.views.generate_from_header", side_effect=InvalidFromHeader())
1✔
604
    def test_invalid_from_header(self, mock_generate_from_header: Mock) -> None:
1✔
605
        """For MPP-3407, show logging for failed from address"""
606
        with self.assertLogs(INFO_LOG) as caplog:
1✔
607
            response = _sns_notification(EMAIL_SNS_BODIES["single_recipient"])
1✔
608
        assert response.status_code == 503
1✔
609
        self.ra.refresh_from_db()
1✔
610
        assert self.ra.num_forwarded == 0
1✔
611
        assert self.ra.last_used_at is None
1✔
612

613
        log1, log2 = caplog.records
1✔
614
        assert log1.levelno == logging.ERROR
1✔
615
        assert log_extra(log1) == {
1✔
616
            "from_address": "fxastage@protonmail.com",
617
            "source": "fxastage@protonmail.com",
618
            "common_headers_from": ["fxastage <fxastage@protonmail.com>"],
619
            "headers_from": [
620
                {"name": "From", "value": "fxastage <fxastage@protonmail.com>"}
621
            ],
622
        }
623
        assert_log_email_dropped(caplog, "error_from_header", self.ra, can_retry=True)
1✔
624

625
    def test_inline_image(self) -> None:
1✔
626
        email_text = EMAIL_INCOMING["inline_image"]
1✔
627
        test_sns_notification = create_notification_from_email(email_text)
1✔
628
        _sns_notification(test_sns_notification)
1✔
629

630
        self.check_sent_email_matches_fixture("inline_image")
1✔
631
        self.mock_remove_message_from_s3.assert_called_once()
1✔
632
        self.ra.refresh_from_db()
1✔
633
        assert self.ra.num_forwarded == 1
1✔
634
        assert self.ra.last_used_at
1✔
635
        assert (datetime.now(tz=UTC) - self.ra.last_used_at).seconds < 2.0
1✔
636

637
    def test_russian_spam(self) -> None:
1✔
638
        """
639
        Base64-encoded input content can be processed.
640

641
        Python picks the output encoding it thinks will be most compact. See:
642
        https://docs.python.org/3/library/email.contentmanager.html#email.contentmanager.set_content
643

644
        The plain text remains in base64, due to high proportion of Russian characters.
645
        The HTML version is converted to quoted-printable, due to the high proportion
646
        of ASCII characters.
647
        """
648
        email_text = EMAIL_INCOMING["russian_spam"]
1✔
649
        test_sns_notification = create_notification_from_email(email_text)
1✔
650
        _sns_notification(test_sns_notification)
1✔
651

652
        self.check_sent_email_matches_fixture("russian_spam")
1✔
653
        self.mock_remove_message_from_s3.assert_called_once()
1✔
654
        self.ra.refresh_from_db()
1✔
655
        assert self.ra.num_forwarded == 1
1✔
656
        assert self.ra.last_used_at
1✔
657
        assert (datetime.now(tz=UTC) - self.ra.last_used_at).seconds < 2.0
1✔
658

659
    @patch("emails.views.info_logger")
1✔
660
    def test_plain_text(self, mock_logger: Mock) -> None:
1✔
661
        """A plain-text only email gets an HTML part."""
662
        email_text = EMAIL_INCOMING["plain_text"]
1✔
663
        test_sns_notification = create_notification_from_email(email_text)
1✔
664
        _sns_notification(test_sns_notification)
1✔
665

666
        self.check_sent_email_matches_fixture(
1✔
667
            "plain_text", replace_mime_boundaries=True
668
        )
669
        self.mock_remove_message_from_s3.assert_called_once()
1✔
670
        self.ra.refresh_from_db()
1✔
671
        assert self.ra.num_forwarded == 1
1✔
672
        assert self.ra.last_used_at
1✔
673
        assert (datetime.now(tz=UTC) - self.ra.last_used_at).seconds < 2.0
1✔
674
        mock_logger.info.assert_not_called()
1✔
675

676
    @patch("emails.views.info_logger")
1✔
677
    def test_from_with_unquoted_commas_is_parsed(self, mock_logger: Mock) -> None:
1✔
678
        """
679
        A From: header with commas in an unquoted display is forwarded.
680

681
        AWS parses these headers as a single email, Python as a list of emails.
682
        One of the root causes of MPP-3407.
683
        """
684
        email_text = EMAIL_INCOMING["emperor_norton"]
1✔
685
        test_sns_notification = create_notification_from_email(email_text)
1✔
686
        _sns_notification(test_sns_notification)
1✔
687

688
        self.check_sent_email_matches_fixture(
1✔
689
            "emperor_norton", replace_mime_boundaries=True
690
        )
691

692
        expected_header_errors = [
1✔
693
            {
694
                "header": "From",
695
                "direction": "in",
696
                "defect_count": 4,
697
                "parsed_value": (
698
                    '"Norton I.",'
699
                    " Emperor of the United States <norton@sf.us.example.com>"
700
                ),
701
                "raw_value": (
702
                    "Norton I.,"
703
                    " Emperor of the United States <norton@sf.us.example.com>"
704
                ),
705
            }
706
        ]
707

708
        mock_logger.info.assert_called_once_with(
1✔
709
            "_handle_received: forwarding issues",
710
            extra={"issues": {"headers": expected_header_errors}},
711
        )
712

713
    @patch("emails.views.info_logger")
1✔
714
    def test_from_with_nested_brackets_is_error(self, mock_logger: Mock) -> None:
1✔
715
        email_text = EMAIL_INCOMING["nested_brackets_service"]
1✔
716
        test_sns_notification = create_notification_from_email(email_text)
1✔
717
        result = _sns_notification(test_sns_notification)
1✔
718
        assert result.status_code == 400
1✔
719
        self.mock_send_raw_email.assert_not_called()
1✔
720
        mock_logger.error.assert_called_once_with(
1✔
721
            "_handle_received: no from address",
722
            extra={
723
                "source": "invalid@example.com",
724
                "common_headers_from": [
725
                    "The Service <The Service <hello@theservice.example.com>>"
726
                ],
727
            },
728
        )
729
        mock_logger.info.assert_not_called()
1✔
730

731
    @patch("emails.views.info_logger")
1✔
732
    def test_invalid_message_id_is_forwarded(self, mock_logger: Mock) -> None:
1✔
733
        email_text = EMAIL_INCOMING["message_id_in_brackets"]
1✔
734
        test_sns_notification = create_notification_from_email(email_text)
1✔
735

736
        result = _sns_notification(test_sns_notification)
1✔
737
        assert result.status_code == 200
1✔
738
        self.check_sent_email_matches_fixture(
1✔
739
            "message_id_in_brackets", replace_mime_boundaries=True
740
        )
741
        expected_header_errors = [
1✔
742
            {
743
                "header": "Message-ID",
744
                "direction": "in",
745
                "defect_count": 1,
746
                "parsed_value": "<[d7c5838b5ab944f89e3f0c1b85674aef====@example.com]>",
747
                "raw_value": "<[d7c5838b5ab944f89e3f0c1b85674aef====@example.com]>",
748
            }
749
        ]
750
        mock_logger.info.assert_called_once_with(
1✔
751
            "_handle_received: forwarding issues",
752
            extra={"issues": {"headers": expected_header_errors}},
753
        )
754

755
    @patch("emails.views.info_logger")
1✔
756
    def test_header_with_encoded_trailing_newline_is_forwarded(
1✔
757
        self, mock_logger: Mock
758
    ) -> None:
759
        """
760
        A header with a trailing encoded newline is stripped.
761
        """
762
        email_text = EMAIL_INCOMING["encoded_trailing_newline"]
1✔
763
        test_sns_notification = create_notification_from_email(email_text)
1✔
764
        _sns_notification(test_sns_notification)
1✔
765

766
        self.check_sent_email_matches_fixture("encoded_trailing_newline")
1✔
767
        self.mock_remove_message_from_s3.assert_called_once()
1✔
768
        self.ra.refresh_from_db()
1✔
769
        assert self.ra.num_forwarded == 1
1✔
770
        assert self.ra.last_used_at
1✔
771
        assert (datetime.now(tz=UTC) - self.ra.last_used_at).seconds < 2.0
1✔
772
        expected_header_errors = [
1✔
773
            {
774
                "header": "Subject",
775
                "direction": "in",
776
                "defect_count": 1,
777
                "parsed_value": "An encoded newline\n",
778
                "raw_value": "An =?UTF-8?Q?encoded_newline=0A?=",
779
            }
780
        ]
781
        mock_logger.info.assert_called_once_with(
1✔
782
            "_handle_received: forwarding issues",
783
            extra={"issues": {"headers": expected_header_errors}},
784
        )
785

786
    @override_flag("developer_mode", active=True)
1✔
787
    @patch("emails.views.info_logger")
1✔
788
    def test_developer_mode_no_label(self, mock_logger: Mock) -> None:
1✔
789
        """Developer mode does nothing special without mask label"""
790
        _sns_notification(EMAIL_SNS_BODIES["single_recipient"])
1✔
791
        self.check_sent_email_matches_fixture(
1✔
792
            "single_recipient",
793
            expected_source="replies@default.com",
794
            expected_destination="user@example.com",
795
        )
796
        self.ra.refresh_from_db()
1✔
797
        assert self.ra.num_forwarded == 1
1✔
798
        assert self.ra.last_used_at is not None
1✔
799
        mock_logger.info.assert_not_called()
1✔
800

801
    @override_flag("developer_mode", active=True)
1✔
802
    @patch("emails.views.info_logger")
1✔
803
    def test_developer_mode_simulate_complaint(self, mock_logger: Mock) -> None:
1✔
804
        """Developer mode with 'DEV:simulate_complaint' label sends to simulator"""
805
        self.ra.description = "test123 DEV:simulate_complaint"
1✔
806
        self.ra.save()
1✔
807

808
        _sns_notification(EMAIL_SNS_BODIES["single_recipient"])
1✔
809
        expected_email = f"complaint+{self.ra.metrics_id}@simulator.amazonses.com"
1✔
810

811
        self.check_sent_email_matches_fixture(
1✔
812
            "single_recipient",
813
            expected_source="replies@default.com",
814
            expected_destination=expected_email,
815
            fixture_replace=(
816
                f"To: {self.user.email}",
817
                f"To: {expected_email}",
818
            ),
819
        )
820
        self.ra.refresh_from_db()
1✔
821
        assert self.ra.num_forwarded == 1
1✔
822
        assert self.ra.last_used_at is not None
1✔
823
        log_group_id: str | None = None
1✔
824
        parts = ["", "", "", ""]
1✔
825
        for callnum, call in enumerate(mock_logger.info.mock_calls):
1✔
826
            assert call.args == ("_handle_received: developer_mode",)
1✔
827
            extra = call.kwargs["extra"]
1✔
828
            assert extra["mask_id"] == self.ra.metrics_id
1✔
829
            assert extra["dev_action"] == "simulate_complaint"
1✔
830
            assert extra["part"] == callnum
1✔
831
            assert extra["parts"] == 4
1✔
832
            if log_group_id is None:
1✔
833
                log_group_id = extra["log_group_id"]
1✔
834
                assert log_group_id
1✔
835
                assert isinstance(log_group_id, str)
1✔
836
            else:
837
                assert extra["log_group_id"] == log_group_id
1✔
838
            parts[extra["part"]] = extra["notification_gza85"]
1✔
839
        log_notification = decode_dict_gza85("\n".join(parts))
1✔
840
        expected_log_notification = json.loads(
1✔
841
            EMAIL_SNS_BODIES["single_recipient"]["Message"]
842
        )
843
        assert log_notification == expected_log_notification
1✔
844

845
    @override_flag("developer_mode", active=True)
1✔
846
    @patch("emails.views.info_logger")
1✔
847
    def test_developer_mode_simulate_complaint_domain_address(
1✔
848
        self, mock_logger: Mock
849
    ) -> None:
850
        """Domain addresses can have 'DEV:simulate_complaint' label"""
851
        domain_address = DomainAddress.objects.create(
1✔
852
            address="wildcard",
853
            user=self.premium_user,
854
            description="DEV:simulate_complaint",
855
        )
856
        _sns_notification(EMAIL_SNS_BODIES["domain_recipient"])
1✔
857
        expected_email = (
1✔
858
            f"complaint+{domain_address.metrics_id}@simulator.amazonses.com"
859
        )
860

861
        self.check_sent_email_matches_fixture(
1✔
862
            "domain_recipient",
863
            expected_source="replies@default.com",
864
            expected_destination=expected_email,
865
            fixture_replace=(
866
                f"To: {self.premium_user.email}",
867
                f"To: {expected_email}",
868
            ),
869
        )
870
        domain_address.refresh_from_db()
1✔
871
        assert domain_address.num_forwarded == 1
1✔
872
        assert domain_address.last_used_at is not None
1✔
873

874
        mock_logger.info.assert_called_with(
1✔
875
            "_handle_received: developer_mode", extra=ANY
876
        )
877

878

879
class SNSNotificationRepliesTest(SNSNotificationTestBase):
1✔
880
    """Tests for _sns_notification for replies from Relay users"""
881

882
    def setUp(self) -> None:
1✔
883
        super().setUp()
1✔
884

885
        # Create a premium user matching the s3_stored_replies sender
886
        self.user = baker.make(User, email="source@sender.com")
1✔
887
        self.user.profile.server_storage = True
1✔
888
        self.user.profile.date_subscribed = datetime.now(tz=UTC)
1✔
889
        self.user.profile.last_engagement = datetime.now(tz=UTC)
1✔
890
        self.user.profile.save()
1✔
891
        self.pre_reply_last_engagement = self.user.profile.last_engagement
1✔
892
        upgrade_test_user_to_premium(self.user)
1✔
893

894
        # Create a Reply record matching the s3_stored_replies headers
895
        lookup_key, encryption_key = derive_reply_keys(
1✔
896
            get_message_id_bytes("CA+J4FJFw0TXCr63y9dGcauvCGaZ7pXxspzOjEDhRpg5Zh4ziWg")
897
        )
898
        metadata = {
1✔
899
            "message-id": str(uuid4()),
900
            "from": "sender@external.example.com",
901
        }
902
        encrypted_metadata = encrypt_reply_metadata(encryption_key, metadata)
1✔
903
        self.relay_address = baker.make(
1✔
904
            RelayAddress, user=self.user, address="a1b2c3d4"
905
        )
906
        Reply.objects.create(
1✔
907
            lookup=b64_lookup_key(lookup_key),
908
            encrypted_metadata=encrypted_metadata,
909
            relay_address=self.relay_address,
910
        )
911

912
        get_message_content_patcher = patch("emails.views.get_message_content_from_s3")
1✔
913
        self.mock_get_content = get_message_content_patcher.start()
1✔
914
        self.addCleanup(get_message_content_patcher.stop)
1✔
915

916
    def successful_reply_test_implementation(
1✔
917
        self, text: str, expected_fixture_name: str
918
    ) -> None:
919
        """The headers of a reply refer to the Relay mask."""
920

921
        self.mock_get_content.return_value = create_email_from_notification(
1✔
922
            EMAIL_SNS_BODIES["s3_stored_replies"], text=text
923
        )
924

925
        # Successfully reply to a previous sender
926
        with self.assertLogs(GLEAN_LOG) as caplog:
1✔
927
            response = _sns_notification(EMAIL_SNS_BODIES["s3_stored_replies"])
1✔
928
        assert response.status_code == 200
1✔
929

930
        assert (event := get_glean_event(caplog)) is not None
1✔
931
        assert event["category"] == "email"
1✔
932
        assert event["name"] == "forwarded"
1✔
933
        assert event["extra"]["is_reply"] == "true"
1✔
934

935
        self.mock_remove_message_from_s3.assert_called_once()
1✔
936
        self.mock_get_content.assert_called_once()
1✔
937
        self.check_sent_email_matches_fixture(
1✔
938
            expected_fixture_name,
939
            expected_source="a1b2c3d4@test.com",
940
            expected_destination="sender@external.example.com",
941
        )
942
        self.relay_address.refresh_from_db()
1✔
943
        assert self.relay_address.num_replied == 1
1✔
944
        last_used_at = self.relay_address.last_used_at
1✔
945
        assert last_used_at
1✔
946
        assert (datetime.now(tz=UTC) - last_used_at).seconds < 2.0
1✔
947
        assert (last_en := self.relay_address.user.profile.last_engagement) is not None
1✔
948
        assert last_en > self.pre_reply_last_engagement
1✔
949

950
    def assert_log_reply_email_dropped(
1✔
951
        self,
952
        caplog: _LoggingWatcher,
953
        reason: EmailDroppedReason,
954
        can_retry: bool = False,
955
    ) -> None:
956
        assert_log_email_dropped(
1✔
957
            caplog, reason, self.relay_address, is_reply=True, can_retry=can_retry
958
        )
959

960
    def test_reply(self) -> None:
1✔
961
        self.successful_reply_test_implementation(
1✔
962
            text="this is a text reply", expected_fixture_name="s3_stored_replies"
963
        )
964

965
    def test_reply_with_emoji_in_text(self) -> None:
1✔
966
        """An email with emoji text content is sent with UTF-8 encoding."""
967
        self.successful_reply_test_implementation(
1✔
968
            text="👍 Thanks I got it!",
969
            expected_fixture_name="s3_stored_replies_with_emoji",
970
        )
971

972
    @patch("emails.views._reply_allowed")
1✔
973
    def test_reply_not_allowed(self, mocked_reply_allowed: Mock) -> None:
1✔
974
        mocked_reply_allowed.return_value = False
1✔
975
        with self.assertLogs(INFO_LOG) as caplog:
1✔
976
            response = _sns_notification(EMAIL_SNS_BODIES["s3_stored_replies"])
1✔
977
        assert response.status_code == 403
1✔
978
        assert response.content == b"Relay replies require a premium account"
1✔
979
        self.assert_log_reply_email_dropped(caplog, "reply_requires_premium")
1✔
980

981
    def test_get_message_content_from_s3_not_found(self) -> None:
1✔
982
        self.mock_get_content.side_effect = ClientError(
1✔
983
            operation_name="S3.something",
984
            error_response={"Error": {"Code": "NoSuchKey", "Message": "the message"}},
985
        )
986
        with (
1✔
987
            self.assertLogs(INFO_LOG) as info_caplog,
988
            self.assertLogs(ERROR_LOG, "ERROR") as events_caplog,
989
        ):
990
            response = _sns_notification(EMAIL_SNS_BODIES["s3_stored_replies"])
1✔
991
        self.mock_send_raw_email.assert_not_called()
1✔
992
        assert response.status_code == 404
1✔
993
        assert response.content == b"Email not in S3"
1✔
994

995
        assert len(events_caplog.records) == 1
1✔
996
        events_log = events_caplog.records[0]
1✔
997
        assert events_log.message == "s3_object_does_not_exist"
1✔
998
        assert getattr(events_log, "Code") == "NoSuchKey"
1✔
999
        assert getattr(events_log, "Message") == "the message"
1✔
1000
        self.assert_log_reply_email_dropped(info_caplog, "content_missing")
1✔
1001

1002
    def test_get_message_content_from_s3_other_error(self) -> None:
1✔
1003
        self.mock_get_content.side_effect = ClientError(
1✔
1004
            operation_name="S3.something",
1005
            error_response={"Error": {"Code": "IsNapping", "Message": "snooze"}},
1006
        )
1007
        with (
1✔
1008
            self.assertLogs(INFO_LOG) as info_caplog,
1009
            self.assertLogs(ERROR_LOG, "ERROR") as error_caplog,
1010
        ):
1011
            response = _sns_notification(EMAIL_SNS_BODIES["s3_stored_replies"])
1✔
1012
        self.mock_send_raw_email.assert_not_called()
1✔
1013
        assert response.status_code == 503
1✔
1014
        assert response.content == b"Cannot fetch the message content from S3"
1✔
1015

1016
        self.assert_log_reply_email_dropped(
1✔
1017
            info_caplog, "error_storage", can_retry=True
1018
        )
1019
        assert len(error_caplog.records) == 1
1✔
1020
        error_log = error_caplog.records[0]
1✔
1021
        assert error_log.message == "s3_client_error_get_email"
1✔
1022
        assert getattr(error_log, "Code") == "IsNapping"
1✔
1023
        assert getattr(error_log, "Message") == "snooze"
1✔
1024

1025
    def test_ses_client_error(self) -> None:
1✔
1026
        self.mock_get_content.return_value = create_email_from_notification(
1✔
1027
            EMAIL_SNS_BODIES["s3_stored_replies"], text="text content"
1028
        )
1029
        self.mock_send_raw_email.side_effect = SEND_RAW_EMAIL_FAILED
1✔
1030
        with (
1✔
1031
            self.assertLogs(INFO_LOG) as info_caplog,
1032
            self.assertLogs(ERROR_LOG, "ERROR") as error_caplog,
1033
        ):
1034
            response = _sns_notification(EMAIL_SNS_BODIES["s3_stored_replies"])
1✔
1035
        assert response.status_code == 400
1✔
1036
        assert response.content == b"SES client error"
1✔
1037

1038
        self.assert_log_reply_email_dropped(info_caplog, "error_sending")
1✔
1039
        assert len(error_caplog.records) == 1
1✔
1040
        error_log = error_caplog.records[0]
1✔
1041
        assert error_log.message == "ses_client_error_raw_email"
1✔
1042
        assert getattr(error_log, "Code") == "the code"
1✔
1043
        assert getattr(error_log, "Message") == "the message"
1✔
1044

1045

1046
@override_settings(STATSD_ENABLED=True)
1✔
1047
class BounceHandlingTest(TestCase):
1✔
1048
    def setUp(self):
1✔
1049
        self.user = baker.make(User, email="relayuser@test.com")
1✔
1050
        self.sa: SocialAccount = baker.make(
1✔
1051
            SocialAccount, user=self.user, provider="fxa", uid=str(uuid4())
1052
        )
1053

1054
    def test_sns_message_with_hard_bounce(self) -> None:
1✔
1055
        pre_request_datetime = datetime.now(UTC)
1✔
1056

1057
        with self.assertLogs(INFO_LOG) as logs, MetricsMock() as mm:
1✔
1058
            _sns_notification(BOUNCE_SNS_BODIES["hard"])
1✔
1059

1060
        self.user.refresh_from_db()
1✔
1061
        assert self.user.profile.last_hard_bounce is not None
1✔
1062
        assert self.user.profile.last_hard_bounce >= pre_request_datetime
1✔
1063

1064
        assert len(logs.records) == 1
1✔
1065
        log_data = log_extra(logs.records[0])
1✔
1066
        assert (diagnostic := log_data["bounce_diagnostic"])
1✔
1067
        assert log_data == {
1✔
1068
            "bounce_action": "failed",
1069
            "bounce_diagnostic": diagnostic,
1070
            "bounce_status": "5.1.1",
1071
            "bounce_subtype": "OnAccountSuppressionList",
1072
            "bounce_type": "Permanent",
1073
            "domain": "test.com",
1074
            "relay_action": "hard_bounce",
1075
            "user_match": "found",
1076
            "fxa_id": self.sa.uid,
1077
        }
1078

1079
        mm.assert_incr_once(
1✔
1080
            "email_bounce",
1081
            tags=[
1082
                "bounce_type:permanent",
1083
                "bounce_subtype:onaccountsuppressionlist",
1084
                "user_match:found",
1085
                "relay_action:hard_bounce",
1086
            ],
1087
        )
1088

1089
    def test_sns_message_with_soft_bounce(self) -> None:
1✔
1090
        pre_request_datetime = datetime.now(UTC)
1✔
1091

1092
        with self.assertLogs(INFO_LOG) as logs, MetricsMock() as mm:
1✔
1093
            _sns_notification(BOUNCE_SNS_BODIES["soft"])
1✔
1094

1095
        self.user.refresh_from_db()
1✔
1096
        assert self.user.profile.last_soft_bounce is not None
1✔
1097
        assert self.user.profile.last_soft_bounce >= pre_request_datetime
1✔
1098

1099
        assert len(logs.records) == 1
1✔
1100
        log_data = log_extra(logs.records[0])
1✔
1101
        assert (diagnostic := log_data["bounce_diagnostic"])
1✔
1102
        assert log_data == {
1✔
1103
            "bounce_action": "failed",
1104
            "bounce_diagnostic": diagnostic,
1105
            "bounce_status": "5.1.1",
1106
            "bounce_subtype": "SRETeamEatenByDinosaurs",
1107
            "bounce_type": "Transient",
1108
            "domain": "test.com",
1109
            "relay_action": "soft_bounce",
1110
            "user_match": "found",
1111
            "fxa_id": self.sa.uid,
1112
        }
1113

1114
        mm.assert_incr_once(
1✔
1115
            "email_bounce",
1116
            tags=[
1117
                "bounce_type:transient",
1118
                "bounce_subtype:sreteameatenbydinosaurs",
1119
                "user_match:found",
1120
                "relay_action:soft_bounce",
1121
            ],
1122
        )
1123

1124
    def test_sns_message_with_spam_bounce_sets_auto_block_spam(self):
1✔
1125
        with self.assertLogs(INFO_LOG) as logs, MetricsMock() as mm:
1✔
1126
            _sns_notification(BOUNCE_SNS_BODIES["spam"])
1✔
1127
        self.user.refresh_from_db()
1✔
1128
        assert self.user.profile.auto_block_spam
1✔
1129

1130
        assert len(logs.records) == 1
1✔
1131
        log_data = log_extra(logs.records[0])
1✔
1132
        assert (diagnostic := log_data["bounce_diagnostic"])
1✔
1133
        assert log_data == {
1✔
1134
            "bounce_action": "failed",
1135
            "bounce_diagnostic": diagnostic,
1136
            "bounce_status": "5.1.1",
1137
            "bounce_subtype": "StopRelayingSpamForThisUser",
1138
            "bounce_type": "Transient",
1139
            "domain": "test.com",
1140
            "relay_action": "auto_block_spam",
1141
            "user_match": "found",
1142
            "fxa_id": self.sa.uid,
1143
        }
1144

1145
        mm.assert_incr_once(
1✔
1146
            "email_bounce",
1147
            tags=[
1148
                "bounce_type:transient",
1149
                "bounce_subtype:stoprelayingspamforthisuser",
1150
                "user_match:found",
1151
                "relay_action:auto_block_spam",
1152
            ],
1153
        )
1154

1155
    def test_sns_message_with_hard_bounce_and_optout(self) -> None:
1✔
1156
        self.sa.extra_data["metricsEnabled"] = False
1✔
1157
        self.sa.save()
1✔
1158

1159
        with self.assertLogs(INFO_LOG) as logs:
1✔
1160
            _sns_notification(BOUNCE_SNS_BODIES["hard"])
1✔
1161

1162
        log_data = log_extra(logs.records[0])
1✔
1163
        assert log_data["user_match"] == "found"
1✔
1164
        assert not log_data["fxa_id"]
1✔
1165

1166

1167
@override_settings(STATSD_ENABLED=True)
1✔
1168
@override_settings(RELAY_FROM_ADDRESS="reply@relay.example.com")
1✔
1169
class ComplaintHandlingTest(TestCase):
1✔
1170
    """
1171
    Test Complaint notifications and events.
1172

1173
    Example derived from:
1174
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object
1175
    """
1176

1177
    def setUp(self):
1✔
1178
        self.user = baker.make(User, email="relayuser@test.com")
1✔
1179
        self.sa: SocialAccount = baker.make(
1✔
1180
            SocialAccount, user=self.user, provider="fxa", uid=str(uuid4())
1181
        )
1182
        self.ra = baker.make(
1✔
1183
            RelayAddress, user=self.user, address="ebsbdsan7", domain=2
1184
        )
1185

1186
        russian_spam_notification = create_notification_from_email(
1✔
1187
            EMAIL_EXPECTED["russian_spam"]
1188
        )
1189
        spam_mail_content = json.loads(russian_spam_notification["Message"])["mail"]
1✔
1190
        spam_mail_content["source"] = "replies@default.com"  # Reply-To address
1✔
1191
        spam_mail_content["messageId"] = (
1✔
1192
            "0100019291f7e695-51da71c8-36cc-4cc7-82e3-23fbf48d4bb4-000000"
1193
        )
1194
        del spam_mail_content["commonHeaders"]["date"]
1✔
1195
        del spam_mail_content["commonHeaders"]["messageId"]
1✔
1196

1197
        self.complaint_msg = {
1✔
1198
            "notificationType": "Complaint",
1199
            "complaint": {
1200
                "userAgent": "ExampleCorp Feedback Loop (V0.01)",
1201
                "complainedRecipients": [{"emailAddress": self.user.email}],
1202
                "complaintFeedbackType": "abuse",
1203
                "arrivalDate": "2009-12-03T04:24:21.000-05:00",
1204
                "timestamp": "2012-05-25T14:59:38.623Z",
1205
                "feedbackId": (
1206
                    "000001378603177f-18c07c78-fa81-4a58-9dd1-fedc3cb8f49a-000000"
1207
                ),
1208
            },
1209
            "mail": spam_mail_content,
1210
        }
1211
        self.complaint_body = {"Message": json.dumps(self.complaint_msg)}
1✔
1212
        ses_client_patcher = patch(
1✔
1213
            "emails.apps.EmailsConfig.ses_client",
1214
            spec_set=["send_raw_email"],
1215
        )
1216
        self.mock_ses_client = ses_client_patcher.start()
1✔
1217
        self.addCleanup(ses_client_patcher.stop)
1✔
1218

1219
    def test_notification_type_complaint(self):
1✔
1220
        """
1221
        A notificationType of complaint:
1222
            1. increments a counter
1223
            2. logs details,
1224
            3. sets the user profile's auto_block_spam = True, and
1225
            4. returns 200.
1226
        """
1227
        assert self.user.profile.auto_block_spam is False
1✔
1228

1229
        with self.assertLogs(INFO_LOG) as logs, MetricsMock() as mm:
1✔
1230
            response = _sns_notification(self.complaint_body)
1✔
1231
        assert response.status_code == 200
1✔
1232

1233
        self.user.profile.refresh_from_db()
1✔
1234
        assert self.user.profile.auto_block_spam is True
1✔
1235

1236
        self.ra.refresh_from_db()
1✔
1237
        assert self.ra.enabled
1✔
1238
        self.mock_ses_client.send_raw_email.assert_not_called()
1✔
1239

1240
        mm.assert_incr_once(
1✔
1241
            "email_complaint",
1242
            tags=[
1243
                "complaint_subtype:none",
1244
                "complaint_feedback:abuse",
1245
                "user_match:found",
1246
                "relay_action:auto_block_spam",
1247
            ],
1248
        )
1249
        assert len(logs.records) == 1
1✔
1250
        record = logs.records[0]
1✔
1251
        assert record.msg == "complaint_notification"
1✔
1252
        log_data = log_extra(record)
1✔
1253
        assert log_data == {
1✔
1254
            "complaint_feedback": "abuse",
1255
            "complaint_subtype": None,
1256
            "complaint_user_agent": "ExampleCorp Feedback Loop (V0.01)",
1257
            "domain": "test.com",
1258
            "relay_action": "auto_block_spam",
1259
            "user_match": "found",
1260
            "mask_match": "found",
1261
            "fxa_id": self.sa.uid,
1262
            "found_in": "all",
1263
        }
1264

1265
    def test_complaint_log_with_optout(self) -> None:
1✔
1266
        self.sa.extra_data["metricsEnabled"] = False
1✔
1267
        self.sa.save()
1✔
1268

1269
        with self.assertLogs(INFO_LOG) as logs:
1✔
1270
            _sns_notification(self.complaint_body)
1✔
1271

1272
        self.user.profile.refresh_from_db()
1✔
1273
        assert self.user.profile.auto_block_spam is True
1✔
1274

1275
        self.ra.refresh_from_db()
1✔
1276
        assert self.ra.enabled
1✔
1277
        self.mock_ses_client.send_raw_email.assert_not_called()
1✔
1278

1279
        log_data = log_extra(logs.records[0])
1✔
1280
        assert log_data["user_match"] == "found"
1✔
1281
        assert not log_data["fxa_id"]
1✔
1282

1283
    @override_flag("disable_mask_on_complaint", active=True)
1✔
1284
    def test_complaint_with_auto_block_spam_disables_mask(self):
1✔
1285
        """
1286
        A notificationType of complaint:
1287
            1. sets enabled=False on the mask, and
1288
            2. returns 200.
1289
        """
1290
        self.user.profile.auto_block_spam = True
1✔
1291
        self.user.profile.save()
1✔
1292
        assert self.ra.enabled is True
1✔
1293

1294
        with self.assertLogs(INFO_LOG) as logs, MetricsMock() as mm:
1✔
1295
            response = _sns_notification(self.complaint_body)
1✔
1296
        assert response.status_code == 200
1✔
1297

1298
        self.user.profile.refresh_from_db()
1✔
1299
        assert self.user.profile.auto_block_spam is True
1✔
1300

1301
        self.ra.refresh_from_db()
1✔
1302
        assert self.ra.enabled is False
1✔
1303

1304
        self.mock_ses_client.send_raw_email.assert_called_once()
1✔
1305
        call = self.mock_ses_client.send_raw_email.call_args
1✔
1306
        assert call.kwargs["Source"] == settings.RELAY_FROM_ADDRESS
1✔
1307
        assert call.kwargs["Destinations"] == [self.user.email]
1✔
1308
        msg_without_newlines = call.kwargs["RawMessage"]["Data"].replace("\n", "")
1✔
1309
        assert "This mask has been deactivated" in msg_without_newlines
1✔
1310
        assert self.ra.full_address in msg_without_newlines
1✔
1311

1312
        mm.assert_incr_once("send_disabled_mask_email")
1✔
1313
        mm.assert_incr_once(
1✔
1314
            "email_complaint",
1315
            tags=[
1316
                "complaint_subtype:none",
1317
                "complaint_feedback:abuse",
1318
                "user_match:found",
1319
                "relay_action:disable_mask",
1320
            ],
1321
        )
1322

1323
        assert len(logs.records) == 1
1✔
1324
        record = logs.records[0]
1✔
1325
        assert record.msg == "complaint_notification"
1✔
1326
        log_data = log_extra(record)
1✔
1327
        assert log_data == {
1✔
1328
            "complaint_feedback": "abuse",
1329
            "complaint_subtype": None,
1330
            "complaint_user_agent": "ExampleCorp Feedback Loop (V0.01)",
1331
            "domain": "test.com",
1332
            "relay_action": "disable_mask",
1333
            "user_match": "found",
1334
            "mask_match": "found",
1335
            "fxa_id": self.sa.uid,
1336
            "found_in": "all",
1337
        }
1338

1339
    @override_flag("developer_mode", active=True)
1✔
1340
    def test_complaint_developer_mode(self):
1✔
1341
        """Log complaint notification for developer_mode users."""
1342

1343
        simulator_complaint_message = deepcopy(self.complaint_msg)
1✔
1344
        simulator_complaint_message["complaint"]["complainedRecipients"] = [
1✔
1345
            {"emailAddress": f"complaint+{self.ra.metrics_id}@simulator.amazonses.com"}
1346
        ]
1347
        complaint_body = {"Message": json.dumps(simulator_complaint_message)}
1✔
1348

1349
        with self.assertLogs(INFO_LOG) as logs:
1✔
1350
            response = _sns_notification(complaint_body)
1✔
1351
        assert response.status_code == 200
1✔
1352

1353
        self.user.profile.refresh_from_db()
1✔
1354
        assert self.user.profile.auto_block_spam is True
1✔
1355
        self.mock_ses_client.send_raw_email.assert_not_called()
1✔
1356

1357
        (rec1, rec2) = logs.records
1✔
1358
        assert rec1.msg == "_handle_complaint: developer_mode"
1✔
1359
        assert getattr(rec1, "mask_id") == self.ra.metrics_id
1✔
1360
        assert getattr(rec1, "dev_action") == "log"
1✔
1361
        assert getattr(rec1, "parts") == 1
1✔
1362
        assert getattr(rec1, "part") == 0
1✔
1363
        notification_gza85 = getattr(rec1, "notification_gza85")
1✔
1364
        log_complaint = decode_dict_gza85(notification_gza85)
1✔
1365
        assert log_complaint == simulator_complaint_message
1✔
1366

1367
        assert rec2.msg == "complaint_notification"
1✔
1368

1369
    def test_complaint_from_stranger_is_404(self):
1✔
1370
        """If no Relay users match, log the complaint."""
1371
        complaint_msg = deepcopy(self.complaint_msg)
1✔
1372
        complaint_msg["complaint"]["complainedRecipients"] = [
1✔
1373
            {"emailAddress": "receiver@stranger.example.com"}
1374
        ]
1375
        complaint_msg["mail"]["commonHeaders"]["from"] = ["sender@stranger.example.com"]
1✔
1376
        complaint_body = {"Message": json.dumps(complaint_msg)}
1✔
1377

1378
        with (
1✔
1379
            self.assertLogs(INFO_LOG) as info_logs,
1380
            self.assertLogs(ERROR_LOG) as error_logs,
1381
        ):
1382
            response = _sns_notification(complaint_body)
1✔
1383
        assert response.status_code == 404
1✔
1384

1385
        self.mock_ses_client.send_raw_email.assert_not_called()
1✔
1386

1387
        (info_log,) = info_logs.records
1✔
1388
        assert info_log.msg == "complaint_notification"
1✔
1389
        assert getattr(info_log, "user_match") == "no_recipients"
1✔
1390
        assert getattr(info_log, "relay_action") == "no_action"
1✔
1391

1392
        (err_log1, err_log2) = error_logs.records
1✔
1393
        assert err_log1.msg == "_gather_complainers: unknown complainedRecipient"
1✔
1394
        assert err_log2.msg == "_gather_complainers: unknown mask, maybe deleted?"
1✔
1395

1396
    def test_build_disabled_mask_for_spam_email(self):
1✔
1397
        free_user = make_free_test_user("testreal@email.com")
1✔
1398
        test_mask_address = "w41fwbt4q"
1✔
1399
        relay_address = baker.make(
1✔
1400
            RelayAddress, user=free_user, address=test_mask_address, domain=2
1401
        )
1402

1403
        msg = _build_disabled_mask_for_spam_email(relay_address)
1✔
1404

1405
        assert msg["Subject"] == main.format("relay-deactivated-mask-email-subject")
1✔
1406
        assert msg["From"] == settings.RELAY_FROM_ADDRESS
1✔
1407
        assert msg["To"] == free_user.email
1✔
1408

1409
        text_content, html_content = get_text_and_html_content(msg)
1✔
1410
        assert test_mask_address in text_content
1✔
1411
        assert test_mask_address in html_content
1✔
1412

1413
        assert_email_equals_fixture(
1✔
1414
            msg.as_string(), "disabled_mask_for_spam", replace_mime_boundaries=True
1415
        )
1416

1417

1418
class GetComplaintDataTest(TestCase):
1✔
1419
    """
1420
    Test emails.views._get_complaint_data
1421

1422
    This function takes a AWS SES Complaint Notification as input, and
1423
    outputs a RawComplaintData with the data needed for complaint processing.
1424

1425
    The 'good data' test cases are also tested in ComplaintHandlingTest. The
1426
    edge cases of missing data in the AWS complaint message are tested here.
1427
    These edge cases are not expected in production, but are handled with
1428
    logging rather than exceptions.
1429
    """
1430

1431
    def test_full_complaint(self):
1✔
1432
        """Some data from a full complaint message is extracted."""
1433
        message = {
1✔
1434
            "notificationType": "Complaint",
1435
            "complaint": {
1436
                "userAgent": "ExampleCorp Feedback Loop (V0.01)",
1437
                "complainedRecipients": [{"emailAddress": "complainer@example.com"}],
1438
                "complaintFeedbackType": "abuse",
1439
                "arrivalDate": "2009-12-03T04:24:21.000-05:00",
1440
                "timestamp": "2012-05-25T14:59:38.623Z",
1441
                "feedbackId": (
1442
                    "000001378603177f-18c07c78-fa81-4a58-9dd1-fedc3cb8f49a-000000"
1443
                ),
1444
            },
1445
            "mail": {
1446
                "commonHeaders": {
1447
                    "from": [
1448
                        '"hello@ac.spam.example.com [via Relay]" '
1449
                        "<relay-mask@test.com>"
1450
                    ],
1451
                    "subject": "A spam message",
1452
                    "to": ["complainer@example.com"],
1453
                },
1454
                "destination": ["complainer@example.com"],
1455
                "headers": [
1456
                    {"name": "MIME-Version", "value": "1.0"},
1457
                    {
1458
                        "name": "Content-Type",
1459
                        "value": "multipart/mixed; "
1460
                        'boundary="MXFqWmhZLWxxWm5TTC1OaQ=="',
1461
                    },
1462
                    {"name": "Subject", "value": "A spam message"},
1463
                    {
1464
                        "name": "From",
1465
                        "value": '"hello@ac.spam.example.com [via Relay]" '
1466
                        "<relay-mask@test.com>",
1467
                    },
1468
                    {"name": "To", "value": "complainer@example.com"},
1469
                    {"name": "Reply-To", "value": "replies@default.com"},
1470
                    {"name": "Resent-From", "value": "hello@ac.spam.example.com"},
1471
                ],
1472
                "headersTruncated": False,
1473
                "messageId": (
1474
                    "0100019291f7e695-51da71c8-36cc-4cc7-82e3-23fbf48d4bb4-000000"
1475
                ),
1476
                "source": "replies@default.com",
1477
                "timestamp": "2024-10-21T16:46:42.622234",
1478
            },
1479
        }
1480
        complaint_data = _get_complaint_data(message)
1✔
1481
        assert complaint_data == RawComplaintData(
1✔
1482
            complained_recipients=[("complainer@example.com", {})],
1483
            from_addresses=["relay-mask@test.com"],
1484
            subtype="",
1485
            user_agent="ExampleCorp Feedback Loop (V0.01)",
1486
            feedback_type="abuse",
1487
        )
1488

1489
    def test_minimal_complaint(self):
1✔
1490
        """
1491
        Data is extracted from a minimized complaint message.
1492

1493
        This minimal form will be used for missing field tests below.
1494
        """
1495
        message = {
1✔
1496
            "complaint": {
1497
                "complainedRecipients": [{"emailAddress": "complainer@example.com"}],
1498
            },
1499
            "mail": {"commonHeaders": {"from": ["relay-mask@test.com"]}},
1500
        }
1501
        with self.assertNoLogs(ERROR_LOG):
1✔
1502
            complaint_data = _get_complaint_data(message)
1✔
1503
        assert complaint_data == RawComplaintData(
1✔
1504
            complained_recipients=[("complainer@example.com", {})],
1505
            from_addresses=["relay-mask@test.com"],
1506
            subtype="",
1507
            user_agent="",
1508
            feedback_type="",
1509
        )
1510

1511
    def test_no_complained_recipients_error_logged(self):
1✔
1512
        """When complaint.complainedRecipients is missing, an error is logged."""
1513
        message = {
1✔
1514
            "complaint": {},
1515
            "mail": {"commonHeaders": {"from": ["relay-mask@test.com"]}},
1516
        }
1517
        with self.assertLogs(ERROR_LOG) as error_logs:
1✔
1518
            complaint_data = _get_complaint_data(message)
1✔
1519
        assert complaint_data.complained_recipients == []
1✔
1520

1521
        (err_log,) = error_logs.records
1✔
1522
        assert err_log.msg == "_get_complaint_data: Unexpected message format"
1✔
1523
        assert getattr(err_log, "missing_key") == "complainedRecipients"
1✔
1524
        assert getattr(err_log, "found_keys") == ""
1✔
1525

1526
    def test_empty_complained_recipients_error_logged(self):
1✔
1527
        """When complaint.complainedRecipients is empty, an error is logged."""
1528
        message = {
1✔
1529
            "complaint": {"complainedRecipients": []},
1530
            "mail": {"commonHeaders": {"from": ["relay-mask@test.com"]}},
1531
        }
1532
        with self.assertLogs(ERROR_LOG) as error_logs:
1✔
1533
            complaint_data = _get_complaint_data(message)
1✔
1534
        assert complaint_data.complained_recipients == []
1✔
1535

1536
        (err_log,) = error_logs.records
1✔
1537
        assert err_log.msg == "_get_complaint_data: Empty complainedRecipients"
1✔
1538

1539
    def test_wrong_complained_recipients_error_logged(self):
1✔
1540
        """
1541
        When complaint.complainedRecipients has an object without the
1542
        emailAddress key, an error is logged.
1543
        """
1544
        message = {
1✔
1545
            "complaint": {
1546
                "complainedRecipients": [{"foo": "bar"}],
1547
            },
1548
            "mail": {"commonHeaders": {"from": ["relay-mask@test.com"]}},
1549
        }
1550
        with self.assertLogs(ERROR_LOG) as error_logs:
1✔
1551
            complaint_data = _get_complaint_data(message)
1✔
1552
        assert complaint_data.complained_recipients == []
1✔
1553

1554
        (err_log,) = error_logs.records
1✔
1555
        assert err_log.msg == "_get_complaint_data: Unexpected message format"
1✔
1556
        assert getattr(err_log, "missing_key") == "emailAddress"
1✔
1557
        assert getattr(err_log, "found_keys") == "foo"
1✔
1558

1559
    def test_no_mail_error_logged(self):
1✔
1560
        """If the mail key is missing, an error is logged."""
1561
        message = {
1✔
1562
            "complaint": {
1563
                "complainedRecipients": [{"emailAddress": "complainer@example.com"}],
1564
            },
1565
        }
1566
        with self.assertLogs(ERROR_LOG) as error_logs:
1✔
1567
            complaint_data = _get_complaint_data(message)
1✔
1568
        assert complaint_data.from_addresses == []
1✔
1569

1570
        (err_log,) = error_logs.records
1✔
1571
        assert err_log.msg == "_get_complaint_data: Unexpected message format"
1✔
1572
        assert getattr(err_log, "missing_key") == "mail"
1✔
1573
        assert getattr(err_log, "found_keys") == "complaint"
1✔
1574

1575
    def test_no_common_headers_error_logged(self):
1✔
1576
        """If the commonHeaders key is missing, an error is logged."""
1577
        message = {
1✔
1578
            "complaint": {
1579
                "complainedRecipients": [{"emailAddress": "complainer@example.com"}],
1580
            },
1581
            "mail": {},
1582
        }
1583
        with self.assertLogs(ERROR_LOG) as error_logs:
1✔
1584
            complaint_data = _get_complaint_data(message)
1✔
1585
        assert complaint_data.from_addresses == []
1✔
1586

1587
        (err_log,) = error_logs.records
1✔
1588
        assert err_log.msg == "_get_complaint_data: Unexpected message format"
1✔
1589
        assert getattr(err_log, "missing_key") == "commonHeaders"
1✔
1590
        assert getattr(err_log, "found_keys") == ""
1✔
1591

1592
    def test_no_from_header_error_logged(self):
1✔
1593
        """If the From header entry is missing, an error is logged."""
1594
        message = {
1✔
1595
            "complaint": {
1596
                "complainedRecipients": [{"emailAddress": "complainer@example.com"}],
1597
            },
1598
            "mail": {"commonHeaders": {}},
1599
        }
1600
        with self.assertLogs(ERROR_LOG) as error_logs:
1✔
1601
            complaint_data = _get_complaint_data(message)
1✔
1602
        assert complaint_data.from_addresses == []
1✔
1603

1604
        (err_log,) = error_logs.records
1✔
1605
        assert err_log.msg == "_get_complaint_data: Unexpected message format"
1✔
1606
        assert getattr(err_log, "missing_key") == "from"
1✔
1607
        assert getattr(err_log, "found_keys") == ""
1✔
1608

1609
    def test_no_feedback_type_not_an_error(self):
1✔
1610
        """If the feedback type is missing, no error is logged"""
1611
        message = {
1✔
1612
            "complaint": {
1613
                "complainedRecipients": [{"emailAddress": "complainer@example.com"}],
1614
            },
1615
            "mail": {"commonHeaders": {"from": ["relay-mask@test.com"]}},
1616
        }
1617
        with self.assertNoLogs(ERROR_LOG):
1✔
1618
            complaint_data = _get_complaint_data(message)
1✔
1619
        assert complaint_data.feedback_type == ""
1✔
1620

1621

1622
class GatherComplainersTest(TestCase):
1✔
1623
    """
1624
    Test _gather_complainers(), merging complaint data with the Relay database.
1625

1626
    This function is also tested by ComplaintHandlingTest. This case adds
1627
    tests for corner cases of weird complaint data.
1628
    """
1629

1630
    def setUp(self) -> None:
1✔
1631
        self.user = baker.make(User, email="relayuser@test.com")
1✔
1632
        self.user_domain = self.user.email.split("@")[1]
1✔
1633
        self.relay_address = baker.make(RelayAddress, user=self.user, domain=2)
1✔
1634

1635
    def test_known_relay_user(self) -> None:
1✔
1636
        data = RawComplaintData(
1✔
1637
            complained_recipients=[(self.user.email, {})],
1638
            from_addresses=[self.relay_address.full_address],
1639
            subtype="",
1640
            user_agent="agent",
1641
            feedback_type="abuse",
1642
        )
1643
        complainers, unknown_count = _gather_complainers(data)
1✔
1644
        assert complainers == [
1✔
1645
            {
1646
                "user": self.user,
1647
                "found_in": "all",
1648
                "domain": self.user_domain,
1649
                "extra": None,
1650
                "masks": [self.relay_address],
1651
            }
1652
        ]
1653
        assert unknown_count == 0
1✔
1654

1655
    def test_unknown_address(self) -> None:
1✔
1656
        """If no Relay users match, return nothing."""
1657
        data = RawComplaintData(
1✔
1658
            complained_recipients=[("receiver@stranger.example.com", {})],
1659
            from_addresses=["sender@stranger.example.com"],
1660
            subtype="",
1661
            user_agent="agent",
1662
            feedback_type="abuse",
1663
        )
1664
        with self.assertLogs(ERROR_LOG) as error_logs:
1✔
1665
            complainers, unknown_count = _gather_complainers(data)
1✔
1666
        assert complainers == []
1✔
1667
        assert unknown_count == 2
1✔
1668
        (err_log1, err_log2) = error_logs.records
1✔
1669
        assert err_log1.msg == "_gather_complainers: unknown complainedRecipient"
1✔
1670
        assert err_log2.msg == "_gather_complainers: unknown mask, maybe deleted?"
1✔
1671

1672
    def test_complaint_simulator_developer_mode(self) -> None:
1✔
1673
        """If the complainer is the AWS complaint simulator, swap in the user"""
1674
        data = RawComplaintData(
1✔
1675
            complained_recipients=[
1676
                (
1677
                    f"complaint+{self.relay_address.metrics_id}@simulator.amazonses.com",
1678
                    {},
1679
                ),
1680
            ],
1681
            from_addresses=[self.relay_address.full_address],
1682
            subtype="",
1683
            user_agent="agent",
1684
            feedback_type="abuse",
1685
        )
1686
        complainers, unknown_count = _gather_complainers(data)
1✔
1687
        assert complainers == [
1✔
1688
            {
1689
                "user": self.user,
1690
                "found_in": "all",
1691
                "domain": self.user_domain,
1692
                "extra": None,
1693
                "masks": [self.relay_address],
1694
            }
1695
        ]
1696
        assert unknown_count == 0
1✔
1697

1698
    def test_complaint_simulator_developer_mode_domain_address(self) -> None:
1✔
1699
        """The AWS complaint simulator swap works with a domain address"""
1700
        premium_user = make_premium_test_user()
1✔
1701
        premium_user.profile.subdomain = "subdomain"
1✔
1702
        premium_user.profile.save()
1✔
1703

1704
        domain_address = DomainAddress.objects.create(
1✔
1705
            address="complainer",
1706
            user=premium_user,
1707
            description="DEV:simulate_complaint",
1708
        )
1709
        data = RawComplaintData(
1✔
1710
            complained_recipients=[
1711
                (f"complaint+{domain_address.metrics_id}@simulator.amazonses.com", {})
1712
            ],
1713
            from_addresses=[domain_address.full_address],
1714
            subtype="",
1715
            user_agent="agent",
1716
            feedback_type="abuse",
1717
        )
1718
        complainers, unknown_count = _gather_complainers(data)
1✔
1719
        assert complainers == [
1✔
1720
            {
1721
                "user": premium_user,
1722
                "found_in": "all",
1723
                "domain": premium_user.email.split("@")[1],
1724
                "extra": None,
1725
                "masks": [domain_address],
1726
            }
1727
        ]
1728
        assert unknown_count == 0
1✔
1729

1730
    def test_complaint_simulator_embedded_mask_not_found(self) -> None:
1✔
1731
        """
1732
        If the complainer is the AWS complaint simulator, but the embedded mask ID
1733
        is not found, then it is returns as an unknown user. If the mask is in the
1734
        From: header, then a user can still be returned, with
1735
        "found_in": "from_header" instead of "all".
1736
        """
1737
        assert not RelayAddress.objects.filter(id=2024).exists()
1✔
1738
        data = RawComplaintData(
1✔
1739
            complained_recipients=[("complaint+R2024@simulator.amazonses.com", {})],
1740
            from_addresses=[self.relay_address.full_address],
1741
            subtype="",
1742
            user_agent="agent",
1743
            feedback_type="abuse",
1744
        )
1745
        with self.assertLogs(ERROR_LOG) as error_logs:
1✔
1746
            complainers, unknown_count = _gather_complainers(data)
1✔
1747
        assert complainers == [
1✔
1748
            {
1749
                "user": self.user,
1750
                "found_in": "from_header",
1751
                "domain": self.user_domain,
1752
                "extra": None,
1753
                "masks": [self.relay_address],
1754
            }
1755
        ]
1756
        assert unknown_count == 1
1✔
1757
        (err_log,) = error_logs.records
1✔
1758
        assert err_log.msg == "_gather_complainers: unknown complainedRecipient"
1✔
1759

1760
    def test_unknown_complained_recipient_logs_error(self) -> None:
1✔
1761
        """
1762
        If the complainer is not a known Relay user, log an error.
1763
        The Relay user can still be returned with a From: header match.
1764
        """
1765
        data = RawComplaintData(
1✔
1766
            complained_recipients=[("unknown@somewhere.example.com", {})],
1767
            from_addresses=[self.relay_address.full_address],
1768
            subtype="",
1769
            user_agent="agent",
1770
            feedback_type="abuse",
1771
        )
1772
        with self.assertLogs(ERROR_LOG) as error_logs:
1✔
1773
            complainers, unknown_count = _gather_complainers(data)
1✔
1774
        assert complainers == [
1✔
1775
            {
1776
                "user": self.user,
1777
                "found_in": "from_header",
1778
                "domain": self.user_domain,
1779
                "extra": None,
1780
                "masks": [self.relay_address],
1781
            }
1782
        ]
1783
        assert unknown_count == 1
1✔
1784
        (err_log,) = error_logs.records
1✔
1785
        assert err_log.msg == "_gather_complainers: unknown complainedRecipient"
1✔
1786

1787
    def test_unknown_complained_recipient_two_masks_logs_errors(self) -> None:
1✔
1788
        """
1789
        If the complainer is unknown but two masks match the same user, log
1790
        the weirdness.
1791

1792
        Also, this should _never_ happen, but there's a branch instead of
1793
        raising an exception and losing data, so there's also a test.
1794
        """
1795
        second_address = RelayAddress.objects.create(user=self.user)
1✔
1796
        data = RawComplaintData(
1✔
1797
            complained_recipients=[("unknown@somewhere.example.com", {})],
1798
            from_addresses=[
1799
                self.relay_address.full_address,
1800
                second_address.full_address,
1801
            ],
1802
            subtype="",
1803
            user_agent="agent",
1804
            feedback_type="abuse",
1805
        )
1806
        with self.assertLogs(ERROR_LOG) as error_logs:
1✔
1807
            complainers, unknown_count = _gather_complainers(data)
1✔
1808
        assert complainers == [
1✔
1809
            {
1810
                "user": self.user,
1811
                "found_in": "from_header",
1812
                "domain": self.user_domain,
1813
                "extra": None,
1814
                "masks": [self.relay_address, second_address],
1815
            }
1816
        ]
1817
        assert unknown_count == 1
1✔
1818
        (err_log1, err_log2) = error_logs.records
1✔
1819
        assert err_log1.msg == "_gather_complainers: unknown complainedRecipient"
1✔
1820
        assert err_log2.msg == "_gather_complainers: no complainer, multi-mask"
1✔
1821

1822
    def test_unknown_from_header_logs_error(self) -> None:
1✔
1823
        """
1824
        If the From: header does not match a known Relay user, log an error.
1825
        The Relay user can still be returned with a complainedRecipients match.
1826
        """
1827
        data = RawComplaintData(
1✔
1828
            complained_recipients=[(self.user.email, {})],
1829
            from_addresses=["unknown@somwhere.example.com"],
1830
            subtype="",
1831
            user_agent="agent",
1832
            feedback_type="abuse",
1833
        )
1834
        with self.assertLogs(ERROR_LOG) as error_logs:
1✔
1835
            complainers, unknown_count = _gather_complainers(data)
1✔
1836
        assert complainers == [
1✔
1837
            {
1838
                "user": self.user,
1839
                "found_in": "complained_recipients",
1840
                "domain": self.user_domain,
1841
                "extra": None,
1842
                "masks": [],
1843
            }
1844
        ]
1845
        assert unknown_count == 1
1✔
1846
        (err_log,) = error_logs.records
1✔
1847
        assert err_log.msg == "_gather_complainers: unknown mask, maybe deleted?"
1✔
1848

1849
    def test_duplicate_complained_recipients_logs_error(self) -> None:
1✔
1850
        """If a complainer appears twice in complainedRecipieints, log the weirdness."""
1851
        data = RawComplaintData(
1✔
1852
            complained_recipients=[(self.user.email, {}), (self.user.email, {})],
1853
            from_addresses=[self.relay_address.full_address],
1854
            subtype="",
1855
            user_agent="agent",
1856
            feedback_type="abuse",
1857
        )
1858
        with self.assertLogs(ERROR_LOG) as error_logs:
1✔
1859
            complainers, unknown_count = _gather_complainers(data)
1✔
1860
        assert complainers == [
1✔
1861
            {
1862
                "user": self.user,
1863
                "found_in": "all",
1864
                "domain": self.user_domain,
1865
                "extra": None,
1866
                "masks": [self.relay_address],
1867
            }
1868
        ]
1869
        assert unknown_count == 0
1✔
1870
        (err_log,) = error_logs.records
1✔
1871
        assert err_log.msg == "_gather_complainers: complainer appears twice"
1✔
1872

1873
    def test_duplicate_from_header_logs_error(self) -> None:
1✔
1874
        """If a mask appears twice in commonHeaders["from"], log the weirdness."""
1875
        data = RawComplaintData(
1✔
1876
            complained_recipients=[(self.user.email, {})],
1877
            from_addresses=[
1878
                self.relay_address.full_address,
1879
                self.relay_address.full_address,
1880
            ],
1881
            subtype="",
1882
            user_agent="agent",
1883
            feedback_type="abuse",
1884
        )
1885
        with self.assertLogs(ERROR_LOG) as error_logs:
1✔
1886
            complainers, unknown_count = _gather_complainers(data)
1✔
1887
        assert complainers == [
1✔
1888
            {
1889
                "user": self.user,
1890
                "found_in": "all",
1891
                "domain": self.user_domain,
1892
                "extra": None,
1893
                "masks": [self.relay_address],
1894
            }
1895
        ]
1896
        assert unknown_count == 0
1✔
1897
        (err_log,) = error_logs.records
1✔
1898
        assert err_log.msg == "_gather_complainers: mask appears twice"
1✔
1899

1900

1901
class GetMaskByMetricsIdTest(TestCase):
1✔
1902
    """Tests for _get_mask_by_metrics_id"""
1903

1904
    def test_get_relay_address(self) -> None:
1✔
1905
        relay_address = baker.make(RelayAddress)
1✔
1906
        assert relay_address.metrics_id.startswith("R")
1✔
1907
        assert _get_mask_by_metrics_id(relay_address.metrics_id) == relay_address
1✔
1908

1909
    def test_get_domain_address(self) -> None:
1✔
1910
        premium_user = make_premium_test_user()
1✔
1911
        premium_user.profile.subdomain = "subdomain"
1✔
1912
        premium_user.profile.save()
1✔
1913
        domain_address = baker.make(DomainAddress, user=premium_user, address="baker")
1✔
1914
        assert domain_address.metrics_id.startswith("D")
1✔
1915
        assert _get_mask_by_metrics_id(domain_address.metrics_id) == domain_address
1✔
1916

1917
    def test_empty_mask_id(self) -> None:
1✔
1918
        assert _get_mask_by_metrics_id("") is None
1✔
1919

1920
    def test_not_mask_id_by_prefix(self) -> None:
1✔
1921
        assert _get_mask_by_metrics_id("ABC") is None
1✔
1922

1923
    def test_not_mask_id_by_id(self) -> None:
1✔
1924
        assert _get_mask_by_metrics_id("Dude") is None
1✔
1925

1926
    def test_relay_address_not_found(self) -> None:
1✔
1927
        assert not RelayAddress.objects.filter(id=1999).exists()
1✔
1928
        assert _get_mask_by_metrics_id("R1999") is None
1✔
1929

1930
    def test_domain_address_not_found(self) -> None:
1✔
1931
        assert not DomainAddress.objects.filter(id=1999).exists()
1✔
1932
        assert _get_mask_by_metrics_id("D1999") is None
1✔
1933

1934

1935
class SNSNotificationRemoveEmailsInS3Test(TestCase):
1✔
1936
    def setUp(self) -> None:
1✔
1937
        self.bucket = "test-bucket"
1✔
1938
        self.key = "/emails/objectkey123"
1✔
1939

1940
        self.patcher = patch(
1✔
1941
            "emails.views._get_address", side_effect=ObjectDoesNotExist()
1942
        )
1943
        self.patcher.start()
1✔
1944
        self.addCleanup(self.patcher.stop)
1✔
1945

1946
        remove_s3_patcher = patch("emails.views.remove_message_from_s3")
1✔
1947
        self.mock_remove_message_from_s3 = remove_s3_patcher.start()
1✔
1948
        self.addCleanup(remove_s3_patcher.stop)
1✔
1949

1950
    @patch("emails.views._handle_reply")
1✔
1951
    def test_reply_email_in_s3_deleted(self, mocked_handle_reply: Mock) -> None:
1✔
1952
        expected_status_code = 200
1✔
1953
        mocked_handle_reply.return_value = HttpResponse(
1✔
1954
            "Email Relayed", status=expected_status_code
1955
        )
1956

1957
        response = _sns_notification(EMAIL_SNS_BODIES["s3_stored_replies"])
1✔
1958
        mocked_handle_reply.assert_called_once()
1✔
1959
        self.mock_remove_message_from_s3.assert_called_once_with(self.bucket, self.key)
1✔
1960
        assert response.status_code == expected_status_code
1✔
1961

1962
    @patch("emails.views._handle_reply")
1✔
1963
    def test_reply_email_not_in_s3_deleted_ignored(
1✔
1964
        self, mocked_handle_reply: Mock
1965
    ) -> None:
1966
        expected_status_code = 200
1✔
1967
        mocked_handle_reply.return_value = HttpResponse(
1✔
1968
            "Email Relayed", status=expected_status_code
1969
        )
1970

1971
        response = _sns_notification(EMAIL_SNS_BODIES["replies"])
1✔
1972
        mocked_handle_reply.assert_called_once()
1✔
1973
        self.mock_remove_message_from_s3.assert_called_once_with(None, None)
1✔
1974
        assert response.status_code == expected_status_code
1✔
1975

1976
    @patch("emails.views._handle_reply")
1✔
1977
    def test_reply_email_in_s3_ses_client_error_not_deleted(
1✔
1978
        self, mocked_handle_reply: Mock
1979
    ) -> None:
1980
        # SES Client Error caught in _handle_reply responds with 503
1981
        expected_status_code = 503
1✔
1982
        mocked_handle_reply.return_value = HttpResponse(
1✔
1983
            "SES Client Error", status=expected_status_code
1984
        )
1985

1986
        response = _sns_notification(EMAIL_SNS_BODIES["s3_stored_replies"])
1✔
1987
        mocked_handle_reply.assert_called_once()
1✔
1988
        self.mock_remove_message_from_s3.assert_not_called()
1✔
1989
        assert response.status_code == expected_status_code
1✔
1990

1991
    def test_address_does_not_exist_email_not_in_s3_deleted_ignored(self) -> None:
1✔
1992
        response = _sns_notification(EMAIL_SNS_BODIES["domain_recipient"])
1✔
1993
        self.mock_remove_message_from_s3.assert_called_once_with(None, None)
1✔
1994
        assert response.status_code == 404
1✔
1995
        assert response.content == b"Address does not exist"
1✔
1996

1997
    def test_address_does_not_exist_email_in_s3_deleted(self) -> None:
1✔
1998
        response = _sns_notification(EMAIL_SNS_BODIES["s3_stored"])
1✔
1999
        self.mock_remove_message_from_s3.assert_called_once_with(self.bucket, self.key)
1✔
2000
        assert response.status_code == 404
1✔
2001
        assert response.content == b"Address does not exist"
1✔
2002

2003
    def test_bounce_notification_not_in_s3_deleted_ignored(self) -> None:
1✔
2004
        response = _sns_notification(BOUNCE_SNS_BODIES["soft"])
1✔
2005
        self.mock_remove_message_from_s3.assert_called_once_with(None, None)
1✔
2006
        assert response.status_code == 404
1✔
2007
        assert response.content == b"Address does not exist"
1✔
2008

2009
    def test_email_without_commonheaders_in_s3_deleted(self) -> None:
1✔
2010
        message_wo_commonheaders = EMAIL_SNS_BODIES["s3_stored"]["Message"].replace(
1✔
2011
            "commonHeaders", "invalidHeaders"
2012
        )
2013
        notification_wo_commonheaders = EMAIL_SNS_BODIES["s3_stored"].copy()
1✔
2014
        notification_wo_commonheaders["Message"] = message_wo_commonheaders
1✔
2015
        response = _sns_notification(notification_wo_commonheaders)
1✔
2016
        self.mock_remove_message_from_s3.assert_called_once_with(self.bucket, self.key)
1✔
2017
        assert response.status_code == 400
1✔
2018
        assert response.content == b"Received SNS notification without commonHeaders."
1✔
2019

2020
    def test_email_to_non_relay_domain_in_s3_deleted(self) -> None:
1✔
2021
        message_w_non_relay_as_recipient = EMAIL_SNS_BODIES["s3_stored"][
1✔
2022
            "Message"
2023
        ].replace("sender@test.com", "to@not-relay.com")
2024
        notification_w_non_relay_domain = EMAIL_SNS_BODIES["s3_stored"].copy()
1✔
2025
        notification_w_non_relay_domain["Message"] = message_w_non_relay_as_recipient
1✔
2026
        response = _sns_notification(notification_w_non_relay_domain)
1✔
2027
        self.mock_remove_message_from_s3.assert_called_once_with(self.bucket, self.key)
1✔
2028
        assert response.status_code == 404
1✔
2029
        assert response.content == b"Address does not exist"
1✔
2030

2031
    def test_malformed_to_field_email_in_s3_deleted(self) -> None:
1✔
2032
        message_w_malformed_to_field = EMAIL_SNS_BODIES["s3_stored"]["Message"].replace(
1✔
2033
            "sender@test.com", "not-relay-test.com"
2034
        )
2035
        notification_w_malformed_to_field = EMAIL_SNS_BODIES["s3_stored"].copy()
1✔
2036
        notification_w_malformed_to_field["Message"] = message_w_malformed_to_field
1✔
2037
        response = _sns_notification(notification_w_malformed_to_field)
1✔
2038
        self.mock_remove_message_from_s3.assert_called_once_with(self.bucket, self.key)
1✔
2039
        assert response.status_code == 400
1✔
2040
        assert response.content == b"Malformed to field."
1✔
2041

2042
    def test_noreply_email_in_s3_deleted(self) -> None:
1✔
2043
        message_w_email_to_noreply = EMAIL_SNS_BODIES["s3_stored"]["Message"].replace(
1✔
2044
            "sender@test.com", "noreply@default.com"
2045
        )
2046
        notification_w_email_to_noreply = EMAIL_SNS_BODIES["s3_stored"].copy()
1✔
2047
        notification_w_email_to_noreply["Message"] = message_w_email_to_noreply
1✔
2048
        response = _sns_notification(notification_w_email_to_noreply)
1✔
2049
        self.mock_remove_message_from_s3(self.bucket, self.key)
1✔
2050
        assert response.status_code == 200
1✔
2051
        assert response.content == b"noreply address is not supported."
1✔
2052

2053
    def test_noreply_mixed_case_email_in_s3_deleted(self) -> None:
1✔
2054
        message_w_email_to_noreply = EMAIL_SNS_BODIES["s3_stored"]["Message"].replace(
1✔
2055
            "sender@test.com", "NoReply@default.com"
2056
        )
2057
        notification_w_email_to_noreply = EMAIL_SNS_BODIES["s3_stored"].copy()
1✔
2058
        notification_w_email_to_noreply["Message"] = message_w_email_to_noreply
1✔
2059
        response = _sns_notification(notification_w_email_to_noreply)
1✔
2060
        self.mock_remove_message_from_s3(self.bucket, self.key)
1✔
2061
        assert response.status_code == 200
1✔
2062
        assert response.content == b"noreply address is not supported."
1✔
2063

2064
    @override_settings(STATSD_ENABLED=True)
1✔
2065
    @patch("emails.views._get_keys_from_headers")
1✔
2066
    def test_noreply_headers_reply_email_in_s3_deleted(
1✔
2067
        self, mocked_get_keys: Mock
2068
    ) -> None:
2069
        """
2070
        If replies@... email has no "In-Reply-To" header, delete email, return 400.
2071
        """
2072
        mocked_get_keys.side_effect = ReplyHeadersNotFound()
1✔
2073

2074
        with MetricsMock() as mm:
1✔
2075
            response = _sns_notification(EMAIL_SNS_BODIES["s3_stored_replies"])
1✔
2076
        mm.assert_incr_once("reply_email_header_error", tags=["detail:no-header"])
1✔
2077
        mocked_get_keys.assert_called_once()
1✔
2078
        self.mock_remove_message_from_s3.assert_called_once_with(self.bucket, self.key)
1✔
2079
        assert response.status_code == 400
1✔
2080

2081
    @override_settings(STATSD_ENABLED=True)
1✔
2082
    def test_no_header_reply_email_not_in_s3_deleted_ignored(self) -> None:
1✔
2083
        """If replies@... email has no "In-Reply-To" header, return 400"""
2084
        sns_msg = EMAIL_SNS_BODIES["replies"]
1✔
2085
        email_data = json.loads(sns_msg["Message"])
1✔
2086
        header_names = [
1✔
2087
            entry["name"].lower() for entry in email_data["mail"]["headers"]
2088
        ]
2089
        assert "in-reply-to" not in header_names
1✔
2090

2091
        with MetricsMock() as mm:
1✔
2092
            response = _sns_notification(sns_msg)
1✔
2093
        mm.assert_incr_once("reply_email_header_error", tags=["detail:no-header"])
1✔
2094
        self.mock_remove_message_from_s3.assert_called_once_with(None, None)
1✔
2095
        assert response.status_code == 400
1✔
2096

2097
    @override_settings(STATSD_ENABLED=True)
1✔
2098
    @patch("emails.views._get_reply_record_from_lookup_key")
1✔
2099
    def test_no_reply_record_reply_email_in_s3_deleted(
1✔
2100
        self, mocked_get_record: Mock
2101
    ) -> None:
2102
        """If no DB match for In-Reply-To header, delete email, return 404."""
2103
        mocked_get_record.side_effect = Reply.DoesNotExist()
1✔
2104

2105
        with MetricsMock() as mm:
1✔
2106
            response = _sns_notification(EMAIL_SNS_BODIES["s3_stored_replies"])
1✔
2107
        mm.assert_incr_once("reply_email_header_error", tags=["detail:no-reply-record"])
1✔
2108
        mocked_get_record.assert_called_once()
1✔
2109
        self.mock_remove_message_from_s3.assert_called_once_with(self.bucket, self.key)
1✔
2110
        assert response.status_code == 404
1✔
2111

2112
    @override_settings(STATSD_ENABLED=True)
1✔
2113
    @patch("emails.views._get_keys_from_headers")
1✔
2114
    @patch("emails.views._get_reply_record_from_lookup_key")
1✔
2115
    def test_no_reply_record_reply_email_not_in_s3_deleted_ignored(
1✔
2116
        self, mocked_get_record: Mock, mocked_get_keys: Mock
2117
    ) -> None:
2118
        """If no DB match for In-Reply-To header, return 404."""
2119
        mocked_get_keys.return_value = ("lookup", "encryption")
1✔
2120
        mocked_get_record.side_effect = Reply.DoesNotExist()
1✔
2121

2122
        with MetricsMock() as mm:
1✔
2123
            response = _sns_notification(EMAIL_SNS_BODIES["replies"])
1✔
2124
        mm.assert_incr_once("reply_email_header_error", tags=["detail:no-reply-record"])
1✔
2125
        mocked_get_record.assert_called_once()
1✔
2126
        self.mock_remove_message_from_s3.assert_called_once_with(None, None)
1✔
2127
        assert response.status_code == 404
1✔
2128

2129

2130
class SNSNotificationInvalidMessageTest(TestCase):
1✔
2131
    def test_no_message(self):
1✔
2132
        """An empty message returns a 400 error"""
2133
        json_body = {"Message": "{}"}
1✔
2134
        response = _sns_notification(json_body)
1✔
2135
        assert response.status_code == 400
1✔
2136

2137
    def test_subscription_confirmation(self):
1✔
2138
        """A subscription confirmation returns a 400 error"""
2139
        json_body = INVALID_SNS_BODIES["subscription_confirmation"]
1✔
2140
        response = _sns_notification(json_body)
1✔
2141
        assert response.status_code == 400
1✔
2142

2143
    def test_notification_type_delivery(self):
1✔
2144
        """
2145
        A notificationType of delivery returns a 400 error.
2146

2147
        Test JSON derived from:
2148
        https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html
2149
        """
2150
        notification = {
1✔
2151
            "notificationType": "Delivery",
2152
            "delivery": {
2153
                "timestamp": "2014-05-28T22:41:01.184Z",
2154
                "processingTimeMillis": 546,
2155
                "recipients": ["success@simulator.amazonses.com"],
2156
                "smtpResponse": "250 ok:  Message 64111812 accepted",
2157
                "reportingMTA": "a8-70.smtp-out.amazonses.com",
2158
                "remoteMtaIp": "127.0.2.0",
2159
            },
2160
        }
2161
        json_body = {"Message": json.dumps(notification)}
1✔
2162
        response = _sns_notification(json_body)
1✔
2163
        assert response.status_code == 400
1✔
2164

2165

2166
@override_settings(STATSD_ENABLED=True)
1✔
2167
class SNSNotificationValidUserEmailsInS3Test(TestCase):
1✔
2168
    def setUp(self) -> None:
1✔
2169
        self.bucket = "test-bucket"
1✔
2170
        self.key = "/emails/objectkey123"
1✔
2171
        self.user = baker.make(User, email="sender@test.com", make_m2m=True)
1✔
2172
        self.profile = self.user.profile
1✔
2173
        assert self.profile is not None
1✔
2174
        self.address = baker.make(
1✔
2175
            RelayAddress, user=self.user, address="sender", domain=2
2176
        )
2177

2178
        remove_s3_patcher = patch("emails.views.remove_message_from_s3")
1✔
2179
        self.mock_remove_message_from_s3 = remove_s3_patcher.start()
1✔
2180
        self.addCleanup(remove_s3_patcher.stop)
1✔
2181

2182
    def expected_glean_event(
1✔
2183
        self,
2184
        timestamp: str,
2185
        reason: str | None = None,
2186
        is_reply: bool = False,
2187
    ) -> dict[str, Any]:
2188
        extra_items = {
1✔
2189
            "n_random_masks": "1",
2190
            "is_random_mask": "true",
2191
            "is_reply": "true" if is_reply else "false",
2192
        }
2193
        if reason:
1✔
2194
            extra_items["reason"] = reason
1✔
2195
        return create_expected_glean_event(
1✔
2196
            category="email",
2197
            name="blocked" if reason else "forwarded",
2198
            user=self.user,
2199
            extra_items=extra_items,
2200
            event_time=timestamp,
2201
        )
2202

2203
    def assert_log_incoming_email_dropped(
1✔
2204
        self,
2205
        caplog: _LoggingWatcher,
2206
        reason: EmailDroppedReason,
2207
        can_retry: bool = False,
2208
    ) -> None:
2209
        assert_log_email_dropped(caplog, reason, self.address, can_retry=can_retry)
1✔
2210

2211
    def test_auto_block_spam_true_email_in_s3_deleted(self) -> None:
1✔
2212
        self.profile.auto_block_spam = True
1✔
2213
        self.profile.save()
1✔
2214
        message_spamverdict_failed = EMAIL_SNS_BODIES["s3_stored"]["Message"].replace(
1✔
2215
            '"spamVerdict":{"status":"PASS"}', '"spamVerdict":{"status":"FAIL"}'
2216
        )
2217
        notification_w_spamverdict_failed = EMAIL_SNS_BODIES["s3_stored"].copy()
1✔
2218
        notification_w_spamverdict_failed["Message"] = message_spamverdict_failed
1✔
2219

2220
        with self.assertLogs(INFO_LOG) as caplog, MetricsMock() as mm:
1✔
2221
            response = _sns_notification(notification_w_spamverdict_failed)
1✔
2222
        self.mock_remove_message_from_s3.assert_called_once_with(self.bucket, self.key)
1✔
2223
        assert response.status_code == 200
1✔
2224
        assert response.content == b"Address rejects spam."
1✔
2225
        self.assert_log_incoming_email_dropped(caplog, "auto_block_spam")
1✔
2226
        mm.assert_incr_once("email_auto_suppressed_for_spam")
1✔
2227

2228
    def test_user_bounce_soft_paused_email_in_s3_deleted(self) -> None:
1✔
2229
        self.profile.last_soft_bounce = datetime.now(UTC)
1✔
2230
        self.profile.save()
1✔
2231

2232
        with self.assertLogs(INFO_LOG) as caplog, MetricsMock() as mm:
1✔
2233
            response = _sns_notification(EMAIL_SNS_BODIES["s3_stored"])
1✔
2234
        self.mock_remove_message_from_s3.assert_called_once_with(self.bucket, self.key)
1✔
2235
        assert response.status_code == 200
1✔
2236
        assert response.content == b"Address is temporarily disabled."
1✔
2237
        self.assert_log_incoming_email_dropped(caplog, "soft_bounce_pause")
1✔
2238
        mm.assert_incr_once("email_suppressed_for_soft_bounce")
1✔
2239

2240
    def test_user_bounce_hard_paused_email_in_s3_deleted(self) -> None:
1✔
2241
        self.profile.last_hard_bounce = datetime.now(UTC)
1✔
2242
        self.profile.save()
1✔
2243

2244
        with self.assertLogs(INFO_LOG) as caplog, MetricsMock() as mm:
1✔
2245
            response = _sns_notification(EMAIL_SNS_BODIES["s3_stored"])
1✔
2246
        self.mock_remove_message_from_s3.assert_called_once_with(self.bucket, self.key)
1✔
2247
        assert response.status_code == 200
1✔
2248
        assert response.content == b"Address is temporarily disabled."
1✔
2249
        self.assert_log_incoming_email_dropped(caplog, "hard_bounce_pause")
1✔
2250
        mm.assert_incr_once("email_suppressed_for_hard_bounce")
1✔
2251

2252
    def test_user_deactivated_email_in_s3_deleted(self) -> None:
1✔
2253
        self.profile.user.is_active = False
1✔
2254
        self.profile.user.save()
1✔
2255

2256
        with self.assertLogs(INFO_LOG) as caplog:
1✔
2257
            response = _sns_notification(EMAIL_SNS_BODIES["s3_stored"])
1✔
2258
        self.mock_remove_message_from_s3.assert_called_once_with(self.bucket, self.key)
1✔
2259
        assert response.status_code == 200
1✔
2260
        assert response.content == b"Account is deactivated."
1✔
2261
        self.assert_log_incoming_email_dropped(caplog, "user_deactivated")
1✔
2262

2263
    @patch("emails.views._reply_allowed")
1✔
2264
    @patch("emails.views._get_reply_record_from_lookup_key")
1✔
2265
    def test_reply_not_allowed_email_in_s3_deleted(
1✔
2266
        self, mocked_reply_record: Mock, mocked_reply_allowed: Mock
2267
    ) -> None:
2268
        # external user sending a reply to Relay user
2269
        # where the replies were being exchanged but now the user
2270
        # no longer has the premium subscription
2271
        mocked_reply_allowed.return_value = False
1✔
2272

2273
        with self.assertLogs(INFO_LOG) as caplog:
1✔
2274
            response = _sns_notification(EMAIL_SNS_BODIES["s3_stored"])
1✔
2275
        self.mock_remove_message_from_s3.assert_called_once_with(self.bucket, self.key)
1✔
2276
        assert response.status_code == 403
1✔
2277
        assert response.content == b"Relay replies require a premium account"
1✔
2278
        self.assert_log_incoming_email_dropped(caplog, "reply_requires_premium")
1✔
2279

2280
    def test_flagged_user_email_in_s3_deleted(self) -> None:
1✔
2281
        profile = self.address.user.profile
1✔
2282
        profile.last_account_flagged = datetime.now(UTC)
1✔
2283
        profile.last_engagement = datetime.now(UTC)
1✔
2284
        profile.save()
1✔
2285
        pre_flagged_last_engagement = profile.last_engagement
1✔
2286

2287
        with self.assertLogs(INFO_LOG) as caplog:
1✔
2288
            response = _sns_notification(EMAIL_SNS_BODIES["s3_stored"])
1✔
2289
        self.mock_remove_message_from_s3.assert_called_once_with(self.bucket, self.key)
1✔
2290
        assert response.status_code == 200
1✔
2291
        assert response.content == b"Address is temporarily disabled."
1✔
2292
        profile.refresh_from_db()
1✔
2293
        assert profile.last_engagement == pre_flagged_last_engagement
1✔
2294
        self.assert_log_incoming_email_dropped(caplog, "abuse_flag")
1✔
2295

2296
    def test_relay_address_disabled_email_in_s3_deleted(self) -> None:
1✔
2297
        self.address.enabled = False
1✔
2298
        self.address.save()
1✔
2299
        profile = self.address.user.profile
1✔
2300
        profile.last_engagement = datetime.now(UTC)
1✔
2301
        profile.save()
1✔
2302
        pre_blocked_email_last_engagement = profile.last_engagement
1✔
2303

2304
        with self.assertLogs(GLEAN_LOG) as caplog, MetricsMock() as mm:
1✔
2305
            response = _sns_notification(EMAIL_SNS_BODIES["s3_stored"])
1✔
2306
        self.mock_remove_message_from_s3.assert_called_once_with(self.bucket, self.key)
1✔
2307
        assert response.status_code == 200
1✔
2308
        assert response.content == b"Address is temporarily disabled."
1✔
2309
        profile.refresh_from_db()
1✔
2310
        assert profile.last_engagement > pre_blocked_email_last_engagement
1✔
2311

2312
        assert (event := get_glean_event(caplog)) is not None
1✔
2313
        expected = self.expected_glean_event(event["timestamp"], "block_all")
1✔
2314
        assert event == expected
1✔
2315
        mm.assert_incr_once("email_for_disabled_address")
1✔
2316

2317
    @patch("emails.views._check_email_from_list")
1✔
2318
    def test_blocked_list_email_in_s3_deleted(
1✔
2319
        self, mocked_email_is_from_list: Mock
2320
    ) -> None:
2321
        upgrade_test_user_to_premium(self.user)
1✔
2322
        self.address.block_list_emails = True
1✔
2323
        self.address.save()
1✔
2324
        profile = self.address.user.profile
1✔
2325
        profile.last_engagement = datetime.now(UTC)
1✔
2326
        profile.save()
1✔
2327
        pre_blocked_email_last_engagement = profile.last_engagement
1✔
2328
        mocked_email_is_from_list.return_value = True
1✔
2329

2330
        with self.assertLogs(GLEAN_LOG) as caplog, MetricsMock() as mm:
1✔
2331
            response = _sns_notification(EMAIL_SNS_BODIES["s3_stored"])
1✔
2332
        self.mock_remove_message_from_s3.assert_called_once_with(self.bucket, self.key)
1✔
2333
        assert response.status_code == 200
1✔
2334
        assert response.content == b"Address is not accepting list emails."
1✔
2335
        profile.refresh_from_db()
1✔
2336
        assert profile.last_engagement > pre_blocked_email_last_engagement
1✔
2337

2338
        assert (event := get_glean_event(caplog)) is not None
1✔
2339
        expected = self.expected_glean_event(event["timestamp"], "block_promotional")
1✔
2340
        assert event == expected
1✔
2341
        mm.assert_incr_once("list_email_for_address_blocking_lists")
1✔
2342

2343
    @patch("emails.views.get_message_content_from_s3")
1✔
2344
    def test_get_text_html_s3_client_error_email_in_s3_not_deleted(
1✔
2345
        self, mocked_get_content: Mock
2346
    ) -> None:
2347
        mocked_get_content.side_effect = ClientError(
1✔
2348
            {"Error": {"Code": "SomeErrorCode", "Message": "Details"}}, ""
2349
        )
2350

2351
        with (
1✔
2352
            self.assertLogs(INFO_LOG) as info_caplog,
2353
            self.assertLogs(ERROR_LOG, "ERROR") as error_caplog,
2354
        ):
2355
            response = _sns_notification(EMAIL_SNS_BODIES["s3_stored"])
1✔
2356
        self.mock_remove_message_from_s3.assert_not_called()
1✔
2357
        assert response.status_code == 503
1✔
2358
        assert response.content == b"Cannot fetch the message content from S3"
1✔
2359

2360
        self.assert_log_incoming_email_dropped(
1✔
2361
            info_caplog, "error_storage", can_retry=True
2362
        )
2363
        assert len(error_caplog.records) == 1
1✔
2364
        error_log = error_caplog.records[0]
1✔
2365
        assert error_log.message == "s3_client_error_get_email"
1✔
2366
        assert log_extra(error_log) == {"Code": "SomeErrorCode", "Message": "Details"}
1✔
2367

2368
    @patch("emails.apps.EmailsConfig.ses_client", spec_set=["send_raw_email"])
1✔
2369
    @patch("emails.views.get_message_content_from_s3")
1✔
2370
    def test_ses_client_error_email_in_s3_not_deleted(
1✔
2371
        self, mocked_get_content: Mock, mocked_ses_client: Mock
2372
    ) -> None:
2373
        mocked_get_content.return_value = create_email_from_notification(
1✔
2374
            EMAIL_SNS_BODIES["s3_stored"], text="text_content"
2375
        )
2376
        mocked_ses_client.send_raw_email.side_effect = SEND_RAW_EMAIL_FAILED
1✔
2377

2378
        with self.assertLogs(INFO_LOG) as caplog:
1✔
2379
            response = _sns_notification(EMAIL_SNS_BODIES["s3_stored"])
1✔
2380
        self.mock_remove_message_from_s3.assert_not_called()
1✔
2381
        assert response.status_code == 503
1✔
2382
        assert response.content == b"SES client error on Raw Email"
1✔
2383
        self.assert_log_incoming_email_dropped(caplog, "error_sending", can_retry=True)
1✔
2384

2385
    @patch("emails.apps.EmailsConfig.ses_client", spec_set=["send_raw_email"])
1✔
2386
    @patch("emails.views.get_message_content_from_s3")
1✔
2387
    def test_successful_email_in_s3_deleted(
1✔
2388
        self, mocked_get_content: Mock, mocked_ses_client: Mock
2389
    ) -> None:
2390
        mocked_get_content.return_value = create_email_from_notification(
1✔
2391
            EMAIL_SNS_BODIES["s3_stored"], "text_content"
2392
        )
2393
        mocked_ses_client.send_raw_email.return_value = {"MessageId": "NICE"}
1✔
2394

2395
        with self.assertLogs(GLEAN_LOG) as caplog:
1✔
2396
            response = _sns_notification(EMAIL_SNS_BODIES["s3_stored"])
1✔
2397
        self.mock_remove_message_from_s3.assert_called_once_with(self.bucket, self.key)
1✔
2398
        assert response.status_code == 200
1✔
2399
        assert response.content == b"Sent email to final recipient."
1✔
2400

2401
        assert (event := get_glean_event(caplog)) is not None
1✔
2402
        expected = self.expected_glean_event(event["timestamp"])
1✔
2403
        assert event == expected
1✔
2404

2405
    @override_settings(STATSD_ENABLED=True)
1✔
2406
    @patch("emails.apps.EmailsConfig.ses_client", spec_set=["send_raw_email"])
1✔
2407
    @patch("emails.views.get_message_content_from_s3")
1✔
2408
    def test_dmarc_failure_s3_deleted(
1✔
2409
        self, mocked_get_content: Mock, mocked_ses_client: Mock
2410
    ) -> None:
2411
        """A message with a failing DMARC and a "reject" policy is rejected."""
2412
        mocked_get_content.side_effect = FAIL_TEST_IF_CALLED
1✔
2413
        mocked_ses_client.send_raw_email.side_effect = FAIL_TEST_IF_CALLED
1✔
2414

2415
        with self.assertLogs(INFO_LOG) as caplog, MetricsMock() as mm:
1✔
2416
            response = _sns_notification(EMAIL_SNS_BODIES["dmarc_failed"])
1✔
2417
        self.mock_remove_message_from_s3.assert_called_once_with(self.bucket, self.key)
1✔
2418
        assert response.status_code == 400
1✔
2419
        assert response.content == b"DMARC failure, policy is reject"
1✔
2420
        assert_log_email_dropped(caplog, "dmarc_reject_failed", self.address)
1✔
2421
        mm.assert_incr_once(
1✔
2422
            "email_suppressed_for_dmarc_failure",
2423
            tags=["dmarcPolicy:reject", "dmarcVerdict:FAIL"],
2424
        )
2425

2426

2427
class SnsMessageTest(TestCase):
1✔
2428
    def setUp(self) -> None:
1✔
2429
        self.message_json = json.loads(EMAIL_SNS_BODIES["s3_stored"]["Message"])
1✔
2430
        assert self.message_json["mail"]["destination"] == ["sender@test.com"]
1✔
2431

2432
        # Create a matching user and address for the recipients "sender@test.com"
2433
        user = baker.make(User)
1✔
2434
        baker.make(SocialAccount, user=user, provider="fxa")
1✔
2435
        # test.com is the second domain listed and has the numerical value 2
2436
        self.ra = baker.make(RelayAddress, user=user, address="sender", domain=2)
1✔
2437

2438
        get_content_patcher = patch(
1✔
2439
            "emails.views.get_message_content_from_s3",
2440
            return_value=create_email_from_notification(
2441
                EMAIL_SNS_BODIES["s3_stored"], "text"
2442
            ),
2443
        )
2444
        self.mock_get_content = get_content_patcher.start()
1✔
2445
        self.addCleanup(get_content_patcher.stop)
1✔
2446

2447
        ses_client_patcher = patch(
1✔
2448
            "emails.apps.EmailsConfig.ses_client",
2449
            spec_set=["send_raw_email"],
2450
        )
2451
        self.mock_ses_client = ses_client_patcher.start()
1✔
2452
        self.addCleanup(ses_client_patcher.stop)
1✔
2453

2454
    def test_get_message_content_from_s3_not_found(self) -> None:
1✔
2455
        self.mock_get_content.side_effect = ClientError(
1✔
2456
            operation_name="S3.something",
2457
            error_response={"Error": {"Code": "NoSuchKey", "Message": "the message"}},
2458
        )
2459
        with (
1✔
2460
            self.assertLogs(INFO_LOG) as info_caplog,
2461
            self.assertLogs(ERROR_LOG, "ERROR") as error_caplog,
2462
        ):
2463
            response = _sns_message(self.message_json)
1✔
2464
        self.mock_ses_client.send_raw_email.assert_not_called()
1✔
2465
        assert response.status_code == 404
1✔
2466
        assert response.content == b"Email not in S3"
1✔
2467

2468
        assert_log_email_dropped(info_caplog, "content_missing", self.ra)
1✔
2469
        assert len(error_caplog.records) == 1
1✔
2470
        error_log = error_caplog.records[0]
1✔
2471
        assert error_log.message == "s3_object_does_not_exist"
1✔
2472
        assert log_extra(error_log) == {"Code": "NoSuchKey", "Message": "the message"}
1✔
2473

2474
    def test_ses_send_raw_email_has_client_error_early_exits(self) -> None:
1✔
2475
        self.mock_ses_client.send_raw_email.side_effect = SEND_RAW_EMAIL_FAILED
1✔
2476
        with (
1✔
2477
            self.assertLogs(INFO_LOG) as info_caplog,
2478
            self.assertLogs(ERROR_LOG, "ERROR") as error_caplog,
2479
        ):
2480
            response = _sns_message(self.message_json)
1✔
2481
        self.mock_ses_client.send_raw_email.assert_called_once()
1✔
2482
        assert response.status_code == 503
1✔
2483

2484
        assert_log_email_dropped(info_caplog, "error_sending", self.ra, can_retry=True)
1✔
2485
        assert len(error_caplog.records) == 1
1✔
2486
        error_log = error_caplog.records[0]
1✔
2487
        assert error_log.message == "ses_client_error_raw_email"
1✔
2488
        assert log_extra(error_log) == {"Code": "the code", "Message": "the message"}
1✔
2489

2490
    def test_ses_send_raw_email_email_relayed_email_deleted_from_s3(self):
1✔
2491
        self.mock_ses_client.send_raw_email.return_value = {"MessageId": str(uuid4())}
1✔
2492
        with self.assertLogs(GLEAN_LOG) as caplog:
1✔
2493
            response = _sns_message(self.message_json)
1✔
2494
        self.mock_ses_client.send_raw_email.assert_called_once()
1✔
2495
        assert response.status_code == 200
1✔
2496

2497
        assert (event := get_glean_event(caplog)) is not None
1✔
2498
        assert event["category"] == "email"
1✔
2499
        assert event["name"] == "forwarded"
1✔
2500

2501

2502
@override_settings(SITE_ORIGIN="https://test.com", STATSD_ENABLED=True)
1✔
2503
class GetAddressTest(TestCase):
1✔
2504
    def setUp(self):
1✔
2505
        self.user = make_premium_test_user()
1✔
2506
        self.user.profile.subdomain = "subdomain"
1✔
2507
        self.user.profile.save()
1✔
2508
        self.relay_address = baker.make(
1✔
2509
            RelayAddress, user=self.user, address="relay123"
2510
        )
2511
        self.deleted_relay_address = baker.make(
1✔
2512
            DeletedAddress, address_hash=address_hash("deleted456", domain="test.com")
2513
        )
2514
        self.domain_address = baker.make(
1✔
2515
            DomainAddress, user=self.user, address="domain"
2516
        )
2517

2518
    def test_existing_relay_address(self):
1✔
2519
        assert _get_address("relay123@test.com") == self.relay_address
1✔
2520

2521
    def test_uppercase_local_part_of_existing_relay_address(self):
1✔
2522
        """Case-insensitive matching is used for the local part of relay addresses."""
2523
        assert _get_address("Relay123@test.com") == self.relay_address
1✔
2524

2525
    def test_uppercase_domain_part_of_existing_relay_address(self):
1✔
2526
        """Case-insensitive matching is used for the domain part of relay addresses."""
2527
        assert _get_address("relay123@Test.Com") == self.relay_address
1✔
2528

2529
    def test_unknown_relay_address_raises(self):
1✔
2530
        with pytest.raises(RelayAddress.DoesNotExist), MetricsMock() as mm:
1✔
2531
            _get_address("unknown@test.com")
1✔
2532
        mm.assert_incr_once("email_for_unknown_address")
1✔
2533

2534
    def test_deleted_relay_address_raises(self):
1✔
2535
        with pytest.raises(RelayAddress.DoesNotExist), MetricsMock() as mm:
1✔
2536
            _get_address("deleted456@test.com")
1✔
2537
        mm.assert_incr_once("email_for_deleted_address")
1✔
2538

2539
    def test_multiple_deleted_relay_addresses_raises_same_as_one(self):
1✔
2540
        """Multiple DeletedAddress records can have the same hash."""
2541
        baker.make(DeletedAddress, address_hash=self.deleted_relay_address.address_hash)
1✔
2542
        with pytest.raises(RelayAddress.DoesNotExist), MetricsMock() as mm:
1✔
2543
            _get_address("deleted456@test.com")
1✔
2544
        mm.assert_incr_once("email_for_deleted_address_multiple")
1✔
2545

2546
    def test_existing_domain_address(self) -> None:
1✔
2547
        with self.assertNoLogs(GLEAN_LOG, "INFO"):
1✔
2548
            assert _get_address("domain@subdomain.test.com") == self.domain_address
1✔
2549

2550
    def test_uppercase_local_part_of_existing_domain_address(self) -> None:
1✔
2551
        """Case-insensitive matching is used in the local part of a domain address."""
2552
        with self.assertNoLogs(GLEAN_LOG, "INFO"):
1✔
2553
            assert _get_address("Domain@subdomain.test.com") == self.domain_address
1✔
2554

2555
    def test_uppercase_subdomain_part_of_existing_domain_address(self) -> None:
1✔
2556
        """Case-insensitive matching is used in the subdomain of a domain address."""
2557
        with self.assertNoLogs(GLEAN_LOG, "INFO"):
1✔
2558
            assert _get_address("domain@SubDomain.test.com") == self.domain_address
1✔
2559

2560
    def test_uppercase_domain_part_of_existing_domain_address(self) -> None:
1✔
2561
        """Case-insensitive matching is used in the domain part of a domain address."""
2562
        with self.assertNoLogs(GLEAN_LOG, "INFO"):
1✔
2563
            assert _get_address("domain@subdomain.Test.Com") == self.domain_address
1✔
2564

2565
    def test_subdomain_for_wrong_domain_raises(self) -> None:
1✔
2566
        with (
1✔
2567
            pytest.raises(ObjectDoesNotExist) as exc_info,
2568
            MetricsMock() as mm,
2569
            self.assertNoLogs(GLEAN_LOG, "INFO"),
2570
        ):
2571
            _get_address("unknown@subdomain.example.com")
1✔
2572
        assert str(exc_info.value) == "Address does not exist"
1✔
2573
        mm.assert_incr_once("email_for_not_supported_domain")
1✔
2574

2575
    def test_unknown_subdomain_raises(self) -> None:
1✔
2576
        with (
1✔
2577
            pytest.raises(Profile.DoesNotExist),
2578
            MetricsMock() as mm,
2579
            self.assertNoLogs(GLEAN_LOG, "INFO"),
2580
        ):
2581
            _get_address("domain@unknown.test.com")
1✔
2582
        mm.assert_incr_once("email_for_dne_subdomain")
1✔
2583

2584
    def test_unknown_domain_address_is_created(self) -> None:
1✔
2585
        """
2586
        An unknown but valid domain address is created.
2587

2588
        This supports creating domain addresses on third-party sites, when
2589
        emailing a checkout receipt, or other situations when the email
2590
        cannot be pre-created.
2591
        """
2592
        assert DomainAddress.objects.filter(user=self.user).count() == 1
1✔
2593
        with self.assertLogs(GLEAN_LOG, "INFO") as caplog:
1✔
2594
            address = _get_address("unknown@subdomain.test.com")
1✔
2595
        assert address.user == self.user
1✔
2596
        assert address.address == "unknown"
1✔
2597
        assert DomainAddress.objects.filter(user=self.user).count() == 2
1✔
2598

2599
        assert (event := get_glean_event(caplog)) is not None
1✔
2600
        expected_event = create_expected_glean_event(
1✔
2601
            category="email_mask",
2602
            name="created",
2603
            user=self.user,
2604
            extra_items={
2605
                "n_random_masks": "1",
2606
                "n_domain_masks": "2",
2607
                "is_random_mask": "false",
2608
                "has_website": "false",
2609
                "created_by_api": "false",
2610
            },
2611
            event_time=event["timestamp"],
2612
        )
2613
        assert event == expected_event
1✔
2614

2615
    def test_unknown_domain_address_is_not_created(self) -> None:
1✔
2616
        """An unknown but valid domain address raises with create=False"""
2617
        assert DomainAddress.objects.filter(user=self.user).count() == 1
1✔
2618
        with pytest.raises(DomainAddress.DoesNotExist):
1✔
2619
            _get_address("unknown@subdomain.test.com", create=False)
1✔
2620
        assert DomainAddress.objects.filter(user=self.user).count() == 1
1✔
2621

2622
    def test_uppercase_local_part_of_unknown_domain_address(self) -> None:
1✔
2623
        """
2624
        Uppercase letters are allowed in the local part of a new domain address.
2625

2626
        This creates a new domain address with lower-cased letters. It supports
2627
        creating domain addresses by third-parties that would not be allowed
2628
        on the relay dashboard due to the upper-case characters, but are still
2629
        consistent with dashboard-created domain addresses.
2630
        """
2631
        assert DomainAddress.objects.filter(user=self.user).count() == 1
1✔
2632
        with self.assertLogs(GLEAN_LOG, "INFO") as caplog:
1✔
2633
            address = _get_address("Unknown@subdomain.test.com")
1✔
2634
        assert address.user == self.user
1✔
2635
        assert address.address == "unknown"
1✔
2636
        assert DomainAddress.objects.filter(user=self.user).count() == 2
1✔
2637

2638
        assert (event := get_glean_event(caplog)) is not None
1✔
2639
        expected_event = create_expected_glean_event(
1✔
2640
            category="email_mask",
2641
            name="created",
2642
            user=self.user,
2643
            extra_items={
2644
                "n_random_masks": "1",
2645
                "n_domain_masks": "2",
2646
                "is_random_mask": "false",
2647
                "has_website": "false",
2648
                "created_by_api": "false",
2649
            },
2650
            event_time=event["timestamp"],
2651
        )
2652
        assert event == expected_event
1✔
2653

2654
    def test_uppercase_local_part_of_unknown_domain_address_not_created(self) -> None:
1✔
2655
        """
2656
        Uppercase letters are allowed, but still do not create the domain address.
2657
        """
2658
        assert DomainAddress.objects.filter(user=self.user).count() == 1
1✔
2659
        with pytest.raises(DomainAddress.DoesNotExist):
1✔
2660
            _get_address("Unknown@subdomain.test.com", create=False)
1✔
2661
        assert DomainAddress.objects.filter(user=self.user).count() == 1
1✔
2662

2663

2664
@override_settings(SITE_ORIGIN="https://test.com", STATSD_ENABLED=True)
1✔
2665
class GetAddressIfExistsTest(TestCase):
1✔
2666
    def setUp(self):
1✔
2667
        self.user = make_premium_test_user()
1✔
2668
        self.user.profile.subdomain = "subdomain"
1✔
2669
        self.user.profile.save()
1✔
2670
        self.relay_address = baker.make(
1✔
2671
            RelayAddress, user=self.user, address="relay123"
2672
        )
2673
        self.deleted_relay_address = baker.make(
1✔
2674
            DeletedAddress, address_hash=address_hash("deleted456", domain="test.com")
2675
        )
2676
        self.domain_address = baker.make(
1✔
2677
            DomainAddress, user=self.user, address="domain"
2678
        )
2679

2680
    def test_existing_relay_address(self):
1✔
2681
        assert _get_address_if_exists("relay123@test.com") == self.relay_address
1✔
2682

2683
    def test_unknown_relay_address(self):
1✔
2684
        with MetricsMock() as mm:
1✔
2685
            assert _get_address_if_exists("unknown@test.com") is None
1✔
2686
        mm.assert_not_incr("email_for_unknown_address")
1✔
2687

2688
    def test_deleted_relay_address(self):
1✔
2689
        with MetricsMock() as mm:
1✔
2690
            assert _get_address_if_exists("deleted456@test.com") is None
1✔
2691
        mm.assert_not_incr("email_for_deleted_address")
1✔
2692

2693
    def test_multiple_deleted_relay_addresses_same_as_one(self):
1✔
2694
        """Multiple DeletedAddress records can have the same hash."""
2695
        baker.make(DeletedAddress, address_hash=self.deleted_relay_address.address_hash)
1✔
2696
        with MetricsMock() as mm:
1✔
2697
            assert _get_address_if_exists("deleted456@test.com") is None
1✔
2698
        mm.assert_not_incr("email_for_deleted_address_multiple")
1✔
2699

2700
    def test_existing_domain_address(self) -> None:
1✔
2701
        with self.assertNoLogs(GLEAN_LOG, "INFO"):
1✔
2702
            assert (
1✔
2703
                _get_address_if_exists("domain@subdomain.test.com")
2704
                == self.domain_address
2705
            )
2706

2707
    def test_subdomain_for_wrong_domain(self) -> None:
1✔
2708
        with MetricsMock() as mm, self.assertNoLogs(GLEAN_LOG, "INFO"):
1✔
2709
            assert _get_address_if_exists("unknown@subdomain.example.com") is None
1✔
2710
        mm.assert_not_incr("email_for_not_supported_domain")
1✔
2711

2712
    def test_unknown_subdomain(self) -> None:
1✔
2713
        with MetricsMock() as mm, self.assertNoLogs(GLEAN_LOG, "INFO"):
1✔
2714
            assert _get_address_if_exists("domain@unknown.test.com") is None
1✔
2715
        mm.assert_not_incr("email_for_dne_subdomain")
1✔
2716

2717
    def test_unknown_domain_address(self) -> None:
1✔
2718
        """An unknown but valid domain address raises with create=False"""
2719
        assert _get_address_if_exists("unknown@subdomain.test.com") is None
1✔
2720

2721
    def test_uppercase_local_part_of_unknown_domain_address(self) -> None:
1✔
2722
        assert _get_address_if_exists("Unknown@subdomain.test.com") is None
1✔
2723

2724

2725
TEST_AWS_SNS_TOPIC = "arn:aws:sns:us-east-1:111222333:relay"
1✔
2726
TEST_AWS_SNS_TOPIC2 = TEST_AWS_SNS_TOPIC + "-alt"
1✔
2727

2728

2729
@override_settings(AWS_SNS_TOPIC={TEST_AWS_SNS_TOPIC, TEST_AWS_SNS_TOPIC2})
1✔
2730
class ValidateSnsArnTypeTests(SimpleTestCase):
1✔
2731
    def test_valid_arn_and_type(self) -> None:
1✔
2732
        ret = validate_sns_arn_and_type(TEST_AWS_SNS_TOPIC, "SubscriptionConfirmation")
1✔
2733
        assert ret is None
1✔
2734

2735
    def test_no_topic_arn(self) -> None:
1✔
2736
        ret = validate_sns_arn_and_type(None, "Notification")
1✔
2737
        assert ret == {
1✔
2738
            "error": "Received SNS request without Topic ARN.",
2739
            "received_topic_arn": None,
2740
            "supported_topic_arn": [TEST_AWS_SNS_TOPIC, TEST_AWS_SNS_TOPIC2],
2741
            "received_sns_type": "Notification",
2742
            "supported_sns_types": ["SubscriptionConfirmation", "Notification"],
2743
        }
2744

2745
    def test_wrong_topic_arn(self) -> None:
1✔
2746
        ret = validate_sns_arn_and_type(TEST_AWS_SNS_TOPIC + "-new", "Notification")
1✔
2747
        assert ret is not None
1✔
2748
        assert ret["error"] == "Received SNS message for wrong topic."
1✔
2749

2750
    def test_no_message_type(self) -> None:
1✔
2751
        ret = validate_sns_arn_and_type(TEST_AWS_SNS_TOPIC2, None)
1✔
2752
        assert ret is not None
1✔
2753
        assert ret["error"] == "Received SNS request without Message Type."
1✔
2754

2755
    def test_unsupported_message_type(self) -> None:
1✔
2756
        ret = validate_sns_arn_and_type(TEST_AWS_SNS_TOPIC, "UnsubscribeConfirmation")
1✔
2757
        assert ret is not None
1✔
2758
        assert ret["error"] == "Received SNS message for unsupported Type."
1✔
2759

2760

2761
@override_settings(AWS_SNS_TOPIC={EMAIL_SNS_BODIES["s3_stored"]["TopicArn"]})
1✔
2762
class SnsInboundViewSimpleTests(SimpleTestCase):
1✔
2763
    """Tests for /emails/sns_inbound that do not require database access."""
2764

2765
    def setUp(self):
1✔
2766
        self.valid_message = EMAIL_SNS_BODIES["s3_stored"]
1✔
2767
        self.client = Client(
1✔
2768
            headers={
2769
                "x-amz-sns-topic-arn": self.valid_message["TopicArn"],
2770
                "x-amz-sns-message-type": self.valid_message["Type"],
2771
            }
2772
        )
2773
        self.url = "/emails/sns-inbound"
1✔
2774
        patcher1 = patch(
1✔
2775
            "emails.views.verify_from_sns", side_effect=self._verify_from_sns_pass
2776
        )
2777
        self.mock_verify_from_sns = patcher1.start()
1✔
2778
        self.addCleanup(patcher1.stop)
1✔
2779

2780
        patcher2 = patch(
1✔
2781
            "emails.views._sns_inbound_logic", side_effect=self._sns_inbound_logic_ok
2782
        )
2783
        self.mock_sns_inbound_logic = patcher2.start()
1✔
2784
        self.addCleanup(patcher2.stop)
1✔
2785

2786
    def _verify_from_sns_pass(self, json_body):
1✔
2787
        """A verify_from_sns that passes content"""
2788
        return json_body
1✔
2789

2790
    def _sns_inbound_logic_ok(self, topic_arn, message_type, verified_json_body):
1✔
2791
        """A _sns_inbound_logic that returns a 200 OK"""
2792
        return HttpResponse("Looks good to me.")
1✔
2793

2794
    def test_valid_message(self):
1✔
2795
        ret = self.client.post(
1✔
2796
            self.url,
2797
            data=self.valid_message,
2798
            content_type="application/json",
2799
        )
2800
        assert ret.status_code == 200
1✔
2801

2802
    def test_no_topic_arn(self):
1✔
2803
        invalid_message = deepcopy(self.valid_message)
1✔
2804
        invalid_message["TopicArn"] = None
1✔
2805
        ret = self.client.post(
1✔
2806
            self.url,
2807
            data=invalid_message,
2808
            content_type="application/json",
2809
            headers={"x-amz-sns-topic-arn": None},
2810
        )
2811
        assert ret.status_code == 400
1✔
2812
        assert ret.content == b"Received SNS request without Topic ARN."
1✔
2813

2814
    def test_wrong_topic_arn(self):
1✔
2815
        invalid_message = deepcopy(self.valid_message)
1✔
2816
        invalid_message["TopicArn"] = "wrong_arn"
1✔
2817
        ret = self.client.post(
1✔
2818
            self.url,
2819
            data=invalid_message,
2820
            content_type="application/json",
2821
            headers={"x-amz-sns-topic-arn": "wrong_arn"},
2822
        )
2823
        assert ret.status_code == 400
1✔
2824
        assert ret.content == b"Received SNS message for wrong topic."
1✔
2825

2826
    def test_no_message_type(self):
1✔
2827
        invalid_message = deepcopy(self.valid_message)
1✔
2828
        invalid_message["Type"] = None
1✔
2829
        ret = self.client.post(
1✔
2830
            self.url,
2831
            data=invalid_message,
2832
            content_type="application/json",
2833
            headers={"x-amz-sns-message-type": None},
2834
        )
2835
        assert ret.status_code == 400
1✔
2836
        assert ret.content == b"Received SNS request without Message Type."
1✔
2837

2838
    def test_unsupported_message_type(self):
1✔
2839
        invalid_message = deepcopy(self.valid_message)
1✔
2840
        invalid_message["Type"] = "UnsubscribeConfirmation"
1✔
2841
        ret = self.client.post(
1✔
2842
            self.url,
2843
            data=invalid_message,
2844
            content_type="application/json",
2845
            headers={"x-amz-sns-message-type": "UnsubscribeConfirmation"},
2846
        )
2847
        assert ret.status_code == 400
1✔
2848
        assert ret.content == b"Received SNS message for unsupported Type."
1✔
2849

2850

2851
@override_settings(STATSD_ENABLED=True)
1✔
2852
class RecordReceiptVerdictsTests(SimpleTestCase):
1✔
2853
    """Test the metrics emitter _record_receipt_verdicts."""
2854

2855
    def expected_records(self, state, receipt_overrides=None):
1✔
2856
        """Return the expected metrics emitted by calling _record_receipt_verdicts."""
2857
        verdicts = ["dkim", "dmarc", "spam", "spf", "virus"]
1✔
2858

2859
        # Five counters for each verdict type
2860
        verdict_metrics = [
1✔
2861
            MetricsRecord(
2862
                stat_type="incr",
2863
                key=f"relay.emails.verdicts.{verdict}Verdict",
2864
                value=1,
2865
                tags=[f"state:{state}"],
2866
            )
2867
            for verdict in verdicts
2868
        ]
2869

2870
        # One counter for this email processing state, with tags
2871
        receipt_overrides = receipt_overrides or {}
1✔
2872
        status = {
1✔
2873
            verdict: receipt_overrides.get(f"{verdict}Verdict", "PASS")
2874
            for verdict in verdicts
2875
        }
2876
        state_tags = [f"{verdict}Verdict:{status[verdict]}" for verdict in verdicts]
1✔
2877
        if "dmarcPolicy" in receipt_overrides:
1✔
2878
            state_tags.append(f"dmarcPolicy:{receipt_overrides['dmarcPolicy']}")
1✔
2879
            state_tags.sort()
1✔
2880
        state_metric = MetricsRecord(
1✔
2881
            stat_type="incr",
2882
            key=f"relay.emails.state.{state}",
2883
            value=1,
2884
            tags=state_tags,
2885
        )
2886

2887
        return verdict_metrics + [state_metric]
1✔
2888

2889
    def test_s3_stored_email(self):
1✔
2890
        """The s3_stored fixture passes all checks."""
2891
        body = json.loads(EMAIL_SNS_BODIES["s3_stored"]["Message"])
1✔
2892
        receipt = body["receipt"]
1✔
2893
        with MetricsMock() as mm:
1✔
2894
            _record_receipt_verdicts(receipt, "a_state")
1✔
2895
        assert mm.get_records() == self.expected_records("a_state")
1✔
2896

2897
    def test_dmarc_failed_email(self):
1✔
2898
        body = json.loads(EMAIL_SNS_BODIES["dmarc_failed"]["Message"])
1✔
2899
        receipt = body["receipt"]
1✔
2900
        with MetricsMock() as mm:
1✔
2901
            _record_receipt_verdicts(receipt, "a_state")
1✔
2902
        overrides = {
1✔
2903
            "spfVerdict": "FAIL",
2904
            "dmarcVerdict": "FAIL",
2905
            "dmarcPolicy": "reject",
2906
        }
2907
        assert mm.get_records() == self.expected_records("a_state", overrides)
1✔
2908

2909

2910
@pytest.mark.django_db
1✔
2911
def test_wrapped_email_test_from_profile(rf):
1✔
2912
    user = baker.make(User)
1✔
2913
    baker.make(
1✔
2914
        SocialAccount,
2915
        user=user,
2916
        provider="fxa",
2917
        extra_data={"locale": "de,en-US;q=0.9,en;q=0.8"},
2918
    )
2919
    request = rf.get("/emails/wrapped_email_test")
1✔
2920
    request.user = user
1✔
2921
    response = wrapped_email_test(request)
1✔
2922
    assert response.status_code == 200
1✔
2923
    no_space_html = re.sub(r"\s+", "", response.content.decode())
1✔
2924
    assert "<li><strong>language</strong>:de" in no_space_html
1✔
2925
    assert "<li><strong>has_premium</strong>:No" in no_space_html
1✔
2926
    assert "<li><strong>has_tracker_report_link</strong>:No" in no_space_html
1✔
2927
    assert (
1✔
2928
        "<li><strong>num_level_one_email_trackers_removed</strong>:0" in no_space_html
2929
    )
2930

2931

2932
@pytest.mark.parametrize("language", ("en", "fy-NL", "ja"))
1✔
2933
@pytest.mark.parametrize("has_premium", ("Yes", "No"))
1✔
2934
@pytest.mark.parametrize("has_tracker_report_link", ("Yes", "No"))
1✔
2935
@pytest.mark.parametrize("num_level_one_email_trackers_removed", ("0", "1", "2"))
1✔
2936
@pytest.mark.django_db
1✔
2937
def test_wrapped_email_test(
1✔
2938
    rf,
2939
    caplog,
2940
    language,
2941
    has_premium,
2942
    has_tracker_report_link,
2943
    num_level_one_email_trackers_removed,
2944
):
2945
    # Reload Fluent files to regenerate errors
2946
    if language == "en":
1✔
2947
        main.reload()
1✔
2948

2949
    data = {
1✔
2950
        "language": language,
2951
        "has_premium": has_premium,
2952
        "has_tracker_report_link": has_tracker_report_link,
2953
        "num_level_one_email_trackers_removed": num_level_one_email_trackers_removed,
2954
    }
2955
    request = rf.get("/emails/wrapped_email_test", data=data)
1✔
2956
    response = wrapped_email_test(request)
1✔
2957
    assert response.status_code == 200
1✔
2958

2959
    # Check that all Fluent IDs were in the English corpus
2960
    if language == "en" and caplog.record_tuples:  # pragma: no cover
2961
        for log_name, log_level, message in caplog.record_tuples:
2962
            if log_name == "django_ftl.message_errors":
2963
                pytest.fail(message)
2964

2965
    no_space_html = re.sub(r"\s+", "", response.content.decode())
1✔
2966
    assert f"<li><strong>language</strong>:{language}" in no_space_html
1✔
2967
    assert f"<li><strong>has_premium</strong>:{has_premium}" in no_space_html
1✔
2968
    assert (
1✔
2969
        f"<li><strong>has_tracker_report_link</strong>:{has_tracker_report_link}"
2970
    ) in no_space_html
2971
    assert (
1✔
2972
        "<li><strong>num_level_one_email_trackers_removed</strong>:"
2973
        f"{num_level_one_email_trackers_removed}"
2974
    ) in no_space_html
2975
    if has_tracker_report_link == "Yes" and num_level_one_email_trackers_removed != "0":
1✔
2976
        assert "/tracker-report/#" in no_space_html
1✔
2977
    else:
2978
        assert "/tracker-report/#" not in no_space_html
1✔
2979

2980

2981
@pytest.mark.parametrize("forwarded", ("False", "True"))
1✔
2982
@pytest.mark.parametrize("content_type", ("text/plain", "text/html"))
1✔
2983
@pytest.mark.django_db
1✔
2984
def test_reply_requires_premium_test(rf, forwarded, content_type, caplog):
1✔
2985
    main.reload()  # Reload Fluent files to regenerate errors
1✔
2986
    url = (
1✔
2987
        "/emails/reply_requires_premium_test"
2988
        f"?forwarded={forwarded}&content-type={content_type}"
2989
    )
2990
    request = rf.get(url)
1✔
2991
    response = reply_requires_premium_test(request)
1✔
2992
    assert response.status_code == 200
1✔
2993
    html = response.content.decode()
1✔
2994
    assert (
1✔
2995
        "/premium/?utm_campaign=email_replies&amp;utm_source=email&amp;utm_medium=email"
2996
        in html
2997
    )
2998
    assert "Upgrade for more protection" in html
1✔
2999
    if forwarded == "True":
1✔
3000
        assert "We’ve sent this reply" in html
1✔
3001
    else:
3002
        assert "Your reply was not sent" in html
1✔
3003

3004
    # Check that all Fluent IDs were in the English corpus
3005
    if caplog.record_tuples:  # pragma: no cover
3006
        for log_name, log_level, message in caplog.record_tuples:
3007
            if log_name == "django_ftl.message_errors":
3008
                pytest.fail(message)
3009

3010

3011
def get_text_and_html_content(msg: EmailMessage) -> tuple[str, str]:
1✔
3012
    """
3013
    Return the plain text and HTML content of an email message.
3014

3015
    This replaces the legacy function msg.get_payload(). Another
3016
    option is get_body(), which will return the first match.
3017
    """
3018
    text_content: str | None = None
1✔
3019
    html_content: str | None = None
1✔
3020
    for part in msg.walk():
1✔
3021
        content_type = part.get_content_type()
1✔
3022
        if content_type == "text/plain":
1✔
3023
            if text_content is None:
1!
3024
                text_content = part.get_content()
1✔
3025
            else:
UNCOV
3026
                raise Exception("Second plain text section found.")
×
3027
        elif content_type == "text/html":
1✔
3028
            if html_content is None:
1!
3029
                html_content = part.get_content()
1✔
3030
            else:
UNCOV
3031
                raise Exception("Second HTML section found.")
×
3032
        elif content_type.startswith("multipart/"):
1!
3033
            pass
1✔
3034
        else:
UNCOV
3035
            raise ValueError(f"Unexpected content type {content_type}")
×
3036
    assert text_content is not None, "Plain text not found"
1✔
3037
    assert html_content is not None, "HTML not found"
1✔
3038
    return text_content, html_content
1✔
3039

3040

3041
@pytest.mark.django_db
1✔
3042
def test_build_reply_requires_premium_email_first_time_includes_forward_text():
1✔
3043
    # First create a valid reply record from an external sender to a free Relay user
3044
    free_user = make_free_test_user()
1✔
3045
    relay_address = baker.make(
1✔
3046
        RelayAddress, user=free_user, address="w41fwbt4q", domain=2
3047
    )
3048

3049
    original_sender = "external_sender@test.com"
1✔
3050
    original_msg_id = "<external-msg-id-123@test.com>"
1✔
3051
    original_msg_id_bytes = get_message_id_bytes(original_msg_id)
1✔
3052
    (lookup_key, encryption_key) = derive_reply_keys(original_msg_id_bytes)
1✔
3053
    original_metadata = {
1✔
3054
        "message-id": original_msg_id,
3055
        "from": original_sender,
3056
        "reply-to": original_sender,
3057
    }
3058
    original_encrypted_metadata = encrypt_reply_metadata(
1✔
3059
        encryption_key, original_metadata
3060
    )
3061
    reply_record = Reply.objects.create(
1✔
3062
        lookup=b64_lookup_key(lookup_key),
3063
        encrypted_metadata=original_encrypted_metadata,
3064
        relay_address=relay_address,
3065
    )
3066

3067
    # Now send a reply from the free Relay user to the external sender
3068
    test_from = relay_address.full_address
1✔
3069
    test_msg_id = "<relay-user-msg-id-456@usersemail.com>"
1✔
3070
    decrypted_metadata = json.loads(
1✔
3071
        decrypt_reply_metadata(encryption_key, reply_record.encrypted_metadata)
3072
    )
3073
    msg = _build_reply_requires_premium_email(
1✔
3074
        test_from, reply_record, test_msg_id, decrypted_metadata
3075
    )
3076

3077
    domain = get_domains_from_settings().get("RELAY_FIREFOX_DOMAIN")
1✔
3078
    expected_From = f"replies@{domain}"
1✔
3079
    assert msg["Subject"] == "Replies are not included with your free account"
1✔
3080
    assert msg["From"] == expected_From
1✔
3081
    assert msg["To"] == relay_address.full_address
1✔
3082

3083
    text_content, html_content = get_text_and_html_content(msg)
1✔
3084
    assert "sent this reply" in text_content
1✔
3085
    assert "sent this reply" in html_content
1✔
3086

3087
    assert_email_equals_fixture(
1✔
3088
        msg.as_string(), "reply_requires_premium_first", replace_mime_boundaries=True
3089
    )
3090

3091

3092
@pytest.mark.django_db
1✔
3093
def test_build_reply_requires_premium_email_after_forward():
1✔
3094
    # First create a valid reply record from an external sender to a free Relay user
3095
    free_user = make_free_test_user()
1✔
3096
    relay_address = baker.make(
1✔
3097
        RelayAddress, user=free_user, address="w41fwbt4q", domain=2
3098
    )
3099
    _set_forwarded_first_reply(free_user.profile)
1✔
3100

3101
    original_sender = "external_sender@test.com"
1✔
3102
    original_msg_id = "<external-msg-id-123@test.com>"
1✔
3103
    original_msg_id_bytes = get_message_id_bytes(original_msg_id)
1✔
3104
    (lookup_key, encryption_key) = derive_reply_keys(original_msg_id_bytes)
1✔
3105
    original_metadata = {
1✔
3106
        "message-id": original_msg_id,
3107
        "from": original_sender,
3108
        "reply-to": original_sender,
3109
    }
3110
    original_encrypted_metadata = encrypt_reply_metadata(
1✔
3111
        encryption_key, original_metadata
3112
    )
3113
    reply_record = Reply.objects.create(
1✔
3114
        lookup=b64_lookup_key(lookup_key),
3115
        encrypted_metadata=original_encrypted_metadata,
3116
        relay_address=relay_address,
3117
    )
3118

3119
    # Now send a reply from the free Relay user to the external sender
3120
    test_from = relay_address.full_address
1✔
3121
    test_msg_id = "<relay-user-msg-id-456@usersemail.com>"
1✔
3122
    decrypted_metadata = json.loads(
1✔
3123
        decrypt_reply_metadata(encryption_key, reply_record.encrypted_metadata)
3124
    )
3125
    msg = _build_reply_requires_premium_email(
1✔
3126
        test_from, reply_record, test_msg_id, decrypted_metadata
3127
    )
3128

3129
    domain = get_domains_from_settings().get("RELAY_FIREFOX_DOMAIN")
1✔
3130
    expected_From = f"replies@{domain}"
1✔
3131
    assert msg["Subject"] == "Replies are not included with your free account"
1✔
3132
    assert msg["From"] == expected_From
1✔
3133
    assert msg["To"] == relay_address.full_address
1✔
3134

3135
    text_content, html_content = get_text_and_html_content(msg)
1✔
3136
    assert "Your reply was not sent" in text_content
1✔
3137
    assert "Your reply was not sent" in html_content
1✔
3138

3139
    assert_email_equals_fixture(
1✔
3140
        msg.as_string(), "reply_requires_premium_second", replace_mime_boundaries=True
3141
    )
3142

3143

3144
def test_get_keys_from_headers_no_reply_headers(settings):
1✔
3145
    """If no reply headers, raise ReplyHeadersNotFound."""
3146
    msg_id = "<msg-id-123@email.com>"
1✔
3147
    headers = [{"name": "Message-Id", "value": msg_id}]
1✔
3148
    settings.STATSD_ENABLED = True
1✔
3149
    with MetricsMock() as mm, pytest.raises(ReplyHeadersNotFound):
1✔
3150
        _get_keys_from_headers(headers)
1✔
3151
    mm.assert_incr_once("mail_to_replies_without_reply_headers")
1✔
3152

3153

3154
def test_get_keys_from_headers_in_reply_to():
1✔
3155
    """If In-Reply-To header, get keys from it."""
3156
    msg_id = "<msg-id-123@email.com>"
1✔
3157
    msg_id_bytes = get_message_id_bytes(msg_id)
1✔
3158
    lookup_key, encryption_key = derive_reply_keys(msg_id_bytes)
1✔
3159
    headers = [{"name": "In-Reply-To", "value": msg_id}]
1✔
3160
    (lookup_key_from_header, encryption_key_from_header) = _get_keys_from_headers(
1✔
3161
        headers
3162
    )
3163
    assert lookup_key == lookup_key_from_header
1✔
3164
    assert encryption_key == encryption_key_from_header
1✔
3165

3166

3167
@pytest.mark.django_db
1✔
3168
def test_get_keys_from_headers_references_reply():
1✔
3169
    """
3170
    If no In-Reply-To header, get keys from References header.
3171
    """
3172
    msg_id = "<msg-id-456@email.com"
1✔
3173
    msg_id_bytes = get_message_id_bytes(msg_id)
1✔
3174
    lookup_key, encryption_key = derive_reply_keys(msg_id_bytes)
1✔
3175
    baker.make(Reply, lookup=b64_lookup_key(lookup_key))
1✔
3176
    msg_ids = f"<msg-id-123@email.com> {msg_id} <msg-id-789@email.com>"
1✔
3177
    headers = [{"name": "References", "value": msg_ids}]
1✔
3178
    (lookup_key_from_header, encryption_key_from_header) = _get_keys_from_headers(
1✔
3179
        headers
3180
    )
3181
    assert lookup_key == lookup_key_from_header
1✔
3182
    assert encryption_key == encryption_key_from_header
1✔
3183

3184

3185
@pytest.mark.django_db
1✔
3186
def test_get_keys_from_headers_references_reply_dne():
1✔
3187
    """
3188
    If no In-Reply-To header,
3189
    and no Reply record for any values in the References header,
3190
    raise Reply.DoesNotExist.
3191
    """
3192
    msg_ids = "<msg-id-123@email.com> <msg-id-456@email.com> <msg-id-789@email.com>"
1✔
3193
    headers = [{"name": "References", "value": msg_ids}]
1✔
3194
    with pytest.raises(Reply.DoesNotExist):
1✔
3195
        _get_keys_from_headers(headers)
1✔
3196

3197

3198
def test_replace_headers_read_error_is_handled() -> None:
1✔
3199
    """
3200
    A header that errors on read is added to issues.
3201

3202
    We can't create a test fixture with a header that raises an exception, because we
3203
    don't know what that header looks like, but we know they exist, because a small
3204
    minority of emails fail when reading the header. Once the exception-catching code is
3205
    in production, we'll know what a failing header looks like, but then we'll probably
3206
    handle it like we did with RelayMessageIDHeader, and we'll be back to not knowing
3207
    what headers raise exceptions.
3208

3209
    So, instead, use the plain_text fixture, add a testing header, and then patch
3210
    EmailMessage so that reading that header (and not the other ones) raises an
3211
    exception.
3212
    """
3213

3214
    email_text = EMAIL_INCOMING["plain_text"]
1✔
3215
    email = message_from_string(email_text, policy=relay_policy)
1✔
3216
    assert isinstance(email, EmailMessage)
1✔
3217

3218
    # Verify that the next headers are different than the existing headers
3219
    new_headers: OutgoingHeaders = {
1✔
3220
        "Subject": "Error Handling Test",
3221
        "From": "from@example.com",
3222
        "To": "to@example.com",
3223
    }
3224
    for name, value in new_headers.items():
1✔
3225
        assert email[name] != value
1✔
3226

3227
    # Add a header that will raise an exception when read.
3228
    # The header itself is OK, but we mock  Message.__getitem__ (called when you use
3229
    # value = email[name]) to raise an error when the test header is accessed.
3230
    email["X-Fail"] = "I am for testing read exceptions"
1✔
3231

3232
    def getitem_raise_on_x_fail(self, name):
1✔
3233
        """
3234
        Message.__getitem__ that raises for X-Fail header
3235

3236
        https://github.com/python/cpython/blob/babb787047e0f7807c8238d3b1a3128dac30bd5c/Lib/email/message.py#L409-L418
3237
        """
3238
        if name == "X-Fail":
1✔
3239
            raise RuntimeError("I failed.")
1✔
3240
        return self.get(name)
1✔
3241

3242
    # Activate our testing mock and run _replace_headers
3243
    with patch.object(EmailMessage, "__getitem__", getitem_raise_on_x_fail):
1✔
3244
        issues = _replace_headers(email, new_headers)
1✔
3245

3246
    # The mocked exception was handled and logged
3247
    assert issues == [
1✔
3248
        {
3249
            "header": "X-Fail",
3250
            "direction": "in",
3251
            "exception_on_read": "RuntimeError('I failed.')",
3252
        }
3253
    ]
3254

3255
    # _replace_headers continued working with the remaining data, the headers are now
3256
    # set to the desired new values.
3257
    for name, value in new_headers.items():
1✔
3258
        assert email[name] == value
1✔
3259

3260

3261
@pytest.mark.django_db
1✔
3262
def test_opt_out_user_has_minimal_email_dropped_log(caplog):
1✔
3263
    user = baker.make(User, email="opt-out@example.com")
1✔
3264
    address = user.relayaddress_set.create()
1✔
3265
    SocialAccount.objects.create(
1✔
3266
        user=user,
3267
        provider="fxa",
3268
        uid=str(uuid4()),
3269
        extra_data={
3270
            "avatar": "image.png",
3271
            "subscriptions": [],
3272
            "metricsEnabled": False,
3273
        },
3274
    )
3275
    log_email_dropped("abuse_flag", address)
1✔
3276
    assert len(caplog.records) == 1
1✔
3277
    assert log_extra(caplog.records[0]) == {
1✔
3278
        "reason": "abuse_flag",
3279
        "is_random_mask": True,
3280
        "is_reply": False,
3281
        "can_retry": False,
3282
    }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc