• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 51dd9adf-bda5-4871-bc76-101ba087980c

11 Oct 2024 04:50PM CUT coverage: 84.478% (+0.07%) from 84.411%
51dd9adf-bda5-4871-bc76-101ba087980c

Pull #5090

circleci

jwhitlock
Adjust mock date to the past
Pull Request #5090: MPP-3932: Add flag 'developer_mode', use to simulate complaint and log notifications

2369 of 3511 branches covered (67.47%)

Branch coverage included in aggregate %.

116 of 117 new or added lines in 2 files covered. (99.15%)

1 existing line in 1 file now uncovered.

16429 of 18741 relevant lines covered (87.66%)

10.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.57
/emails/views.py
1
import base64
1✔
2
import html
1✔
3
import json
1✔
4
import logging
1✔
5
import re
1✔
6
import shlex
1✔
7
import zlib
1✔
8
from copy import deepcopy
1✔
9
from datetime import UTC, datetime
1✔
10
from email import message_from_bytes
1✔
11
from email.iterators import _structure
1✔
12
from email.message import EmailMessage
1✔
13
from email.utils import parseaddr
1✔
14
from io import StringIO
1✔
15
from json import JSONDecodeError
1✔
16
from textwrap import dedent
1✔
17
from typing import Any, Literal, NamedTuple
1✔
18
from urllib.parse import urlencode
1✔
19

20
from django.conf import settings
1✔
21
from django.contrib.auth.models import User
1✔
22
from django.core.exceptions import ObjectDoesNotExist
1✔
23
from django.db import transaction
1✔
24
from django.db.models import prefetch_related_objects
1✔
25
from django.http import HttpRequest, HttpResponse
1✔
26
from django.shortcuts import render
1✔
27
from django.template.loader import render_to_string
1✔
28
from django.utils.html import escape
1✔
29
from django.views.decorators.csrf import csrf_exempt
1✔
30

31
from botocore.exceptions import ClientError
1✔
32
from codetiming import Timer
1✔
33
from decouple import strtobool
1✔
34
from markus.utils import generate_tag
1✔
35
from sentry_sdk import capture_message
1✔
36
from waffle import get_waffle_flag_model, sample_is_active
1✔
37

38
from privaterelay.ftl_bundles import main as ftl_bundle
1✔
39
from privaterelay.models import Profile
1✔
40
from privaterelay.utils import (
1✔
41
    flag_is_active_in_task,
42
    get_subplat_upgrade_link_by_language,
43
    glean_logger,
44
)
45

46
from .exceptions import CannotMakeAddressException
1✔
47
from .models import (
1✔
48
    DeletedAddress,
49
    DomainAddress,
50
    RelayAddress,
51
    Reply,
52
    address_hash,
53
    get_domain_numerical,
54
)
55
from .policy import relay_policy
1✔
56
from .sns import SUPPORTED_SNS_TYPES, verify_from_sns
1✔
57
from .types import (
1✔
58
    AWS_MailJSON,
59
    AWS_SNSMessageJSON,
60
    EmailForwardingIssues,
61
    EmailHeaderIssues,
62
    OutgoingHeaders,
63
)
64
from .utils import (
1✔
65
    InvalidFromHeader,
66
    _get_bucket_and_key_from_s3_json,
67
    b64_lookup_key,
68
    count_all_trackers,
69
    decrypt_reply_metadata,
70
    derive_reply_keys,
71
    encrypt_reply_metadata,
72
    generate_from_header,
73
    get_domains_from_settings,
74
    get_message_content_from_s3,
75
    get_message_id_bytes,
76
    get_reply_to_address,
77
    histogram_if_enabled,
78
    incr_if_enabled,
79
    parse_email_header,
80
    remove_message_from_s3,
81
    remove_trackers,
82
    ses_send_raw_email,
83
    urlize_and_linebreaks,
84
)
85

86
# Module-level loggers, distinguished only by their logger name.
# "events" is the logger used for the error reports in this module.
logger = logging.getLogger("events")
# "eventsinfo" is the logger used for records such as "email_dropped".
info_logger = logging.getLogger("eventsinfo")
1✔
88

89

90
class ReplyHeadersNotFound(Exception):
    """
    Error for an email that has no In-Reply-To or References headers.

    The text is available both as the ``message`` attribute and through the
    standard exception interface (``str(exc)`` and ``exc.args``).
    """

    def __init__(self, message="No In-Reply-To or References headers."):
        # Pass the message to Exception so that str(), repr(), logging, and
        # tracebacks show it instead of an empty string.
        super().__init__(message)
        self.message = message
1✔
93

94

95
def first_time_user_test(request):
    """
    Render a preview of the "First time Relay user" email.

    Querystring options:
    * in_bundle_country: boolean-like string parsed by strtobool, default "yes"
    * format: "text" selects the plain-text template; any other value
      (default "html") selects the HTML template
    """
    query = request.GET
    email_context = {
        "in_bundle_country": strtobool(query.get("in_bundle_country", "yes")),
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    if query.get("format", "html") != "text":
        return render(request, "emails/first_time_user.html", email_context)
    return render(
        request,
        "emails/first_time_user.txt",
        email_context,
        content_type="text/plain; charset=utf-8",
    )
×
114

115

116
def reply_requires_premium_test(request):
    """
    Render a preview of the "Reply requires premium" email.

    Every querystring parameter is copied into the template context, on top
    of the defaults for "sender", "forwarded", and "SITE_ORIGIN". Pass
    content-type=text/plain to get the plain-text template instead of HTML.
    """
    query = request.GET
    email_context = {
        "sender": "test@example.com",
        "forwarded": True,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    email_context.update({name: query.get(name) for name in query})
    # Only the exact string "True" is turned back into a boolean; any other
    # value for "forwarded" stays the raw querystring string.
    if query.get("forwarded") == "True":
        email_context["forwarded"] = True

    if query.get("content-type") == "text/plain":
        return render(
            request,
            "emails/reply_requires_premium.txt",
            email_context,
            content_type="text/plain; charset=utf-8",
        )
    return render(request, "emails/reply_requires_premium.html", email_context)
1✔
142

143

144
def disabled_mask_for_spam_test(request):
    """
    Render a preview of the "Disabled mask for spam" email.

    Every querystring parameter is copied into the template context, on top
    of the defaults for "mask" and "SITE_ORIGIN". Pass
    content-type=text/plain to get the plain-text template instead of HTML.
    """
    query = request.GET
    email_context = {
        "mask": "abc123456@mozmail.com",
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    email_context.update({name: query.get(name) for name in query})

    if query.get("content-type") == "text/plain":
        return render(
            request,
            "emails/disabled_mask_for_spam.txt",
            email_context,
            content_type="text/plain; charset=utf-8",
        )
    return render(request, "emails/disabled_mask_for_spam.html", email_context)
×
168

169

170
def first_forwarded_email_test(request: HttpRequest) -> HttpResponse:
    """Render a preview of the "first forwarded email" inside the Relay wrapper."""
    # TO DO: Update with correct context when trigger is created
    template_context = {"SITE_ORIGIN": settings.SITE_ORIGIN}
    inner_html = render_to_string(
        "emails/first_forwarded_email.html", template_context
    )
    # Fixed sample values stand in for a real user's settings.
    return HttpResponse(
        wrap_html_email(
            original_html=inner_html,
            language="en-us",
            has_premium=True,
            display_email="test@example.com",
            num_level_one_email_trackers_removed=0,
        )
    )
×
188

189

190
def wrap_html_email(
    original_html: str,
    language: str,
    has_premium: bool,
    display_email: str,
    num_level_one_email_trackers_removed: int | None = None,
    tracker_report_link: str | None = None,
) -> str:
    """
    Add Relay banners, surveys, etc. to an HTML email.

    Renders the "emails/wrapped_email.html" template with the given values
    and returns the result with blank lines removed and a single trailing
    newline.
    """
    rendered = render_to_string(
        "emails/wrapped_email.html",
        {
            "original_html": original_html,
            "language": language,
            "has_premium": has_premium,
            "subplat_upgrade_link": get_subplat_upgrade_link_by_language(language),
            "display_email": display_email,
            "tracker_report_link": tracker_report_link,
            "num_level_one_email_trackers_removed": (
                num_level_one_email_trackers_removed
            ),
            "SITE_ORIGIN": settings.SITE_ORIGIN,
        },
    )
    # str.strip returns "" (falsy) for empty and whitespace-only lines, so
    # filter() drops them.
    return "\n".join(filter(str.strip, rendered.splitlines())) + "\n"
1✔
214

215

216
def wrapped_email_test(request: HttpRequest) -> HttpResponse:
    """
    Render a preview of a forwarded HTML email inside the Relay wrapper.

    Querystring options:
    * language, has_premium: when either one is omitted, the missing value is
      read from a randomly chosen profile
    * num_level_one_email_trackers_removed: integer, defaults to 0
    * has_tracker_report_link: boolean-like string, defaults to false

    Raises ValueError if a profile is needed but none exists.
    """
    params = request.GET

    # A profile is only fetched when at least one setting must come from it.
    if "language" in params and "has_premium" in params:
        user_profile = None
    else:
        user_profile = Profile.objects.order_by("?").first()

    def profile_or_raise():
        if user_profile is None:
            raise ValueError("user_profile must not be None")
        return user_profile

    language = (
        params["language"] if "language" in params else profile_or_raise().language
    )
    has_premium = (
        strtobool(params["has_premium"])
        if "has_premium" in params
        else profile_or_raise().has_premium
    )

    tracker_count_key = "num_level_one_email_trackers_removed"
    num_level_one_email_trackers_removed = (
        int(params[tracker_count_key]) if tracker_count_key in params else 0
    )
    has_tracker_report_link = (
        strtobool(params["has_tracker_report_link"])
        if "has_tracker_report_link" in params
        else False
    )

    tracker_report_link = ""
    if has_tracker_report_link:
        trackers = (
            {"fake-tracker.example.com": num_level_one_email_trackers_removed}
            if num_level_one_email_trackers_removed
            else {}
        )
        tracker_report_link = (
            "/tracker-report/#{"
            '"sender": "sender@example.com", '
            '"received_at": 1658434657, '
            f'"trackers": {json.dumps(trackers)}'
            "}"
        )

    path = "/emails/wrapped_email_test"
    premium_label = "Yes" if has_premium else "No"
    report_label = "Yes" if has_tracker_report_link else "No"
    old_query = {
        "language": language,
        "has_premium": premium_label,
        "has_tracker_report_link": report_label,
        "num_level_one_email_trackers_removed": str(
            num_level_one_email_trackers_removed
        ),
    }

    def switch_link(key, value):
        """Return plain text for the active value, or a link that selects it."""
        if old_query[key] == value:
            return str(value)
        target = urlencode({**old_query, key: value})
        return f'<a href="{path}?{target}">{value}</a>'

    html_content = dedent(
        f"""\
    <p>
      <strong>Email rendering Test</strong>
    </p>
    <p>Settings: (<a href="{path}">clear all</a>)</p>
    <ul>
      <li>
        <strong>language</strong>:
        {escape(language)}
        (switch to
        {switch_link("language", "en-us")},
        {switch_link("language", "de")},
        {switch_link("language", "en-gb")},
        {switch_link("language", "fr")},
        {switch_link("language", "ru-ru")},
        {switch_link("language", "es-es")},
        {switch_link("language", "pt-br")},
        {switch_link("language", "it-it")},
        {switch_link("language", "en-ca")},
        {switch_link("language", "de-de")},
        {switch_link("language", "es-mx")})
      </li>
      <li>
        <strong>has_premium</strong>:
        {premium_label}
        (switch to
        {switch_link("has_premium", "Yes")},
        {switch_link("has_premium", "No")})
      </li>
      <li>
        <strong>has_tracker_report_link</strong>:
        {report_label}
        (switch to
        {switch_link("has_tracker_report_link", "Yes")},
        {switch_link("has_tracker_report_link", "No")})
      </li>
      <li>
        <strong>num_level_one_email_trackers_removed</strong>:
        {num_level_one_email_trackers_removed}
        (switch to
        {switch_link("num_level_one_email_trackers_removed", "0")},
        {switch_link("num_level_one_email_trackers_removed", "1")},
        {switch_link("num_level_one_email_trackers_removed", "2")})
      </li>
    </ul>
    """
    )

    return HttpResponse(
        wrap_html_email(
            original_html=html_content,
            language=language,
            has_premium=has_premium,
            tracker_report_link=tracker_report_link,
            display_email="test@relay.firefox.com",
            num_level_one_email_trackers_removed=(
                num_level_one_email_trackers_removed
            ),
        )
    )
1✔
346

347

348
def _store_reply_record(
    mail: AWS_MailJSON, message_id: str, address: RelayAddress | DomainAddress
) -> AWS_MailJSON:
    """
    Store a Reply record for a relayed email, then return ``mail`` unchanged.

    The Message-ID, From, and Reply-To headers are encrypted with a key
    derived from ``message_id`` and saved with a lookup key derived from the
    same value. Raises TypeError if ``address`` is neither a DomainAddress
    nor a RelayAddress.
    """
    kept_headers = ["message-id", "from", "reply-to"]
    # If a header appears more than once, the last occurrence is kept.
    reply_metadata = {
        header["name"].lower(): header["value"]
        for header in mail["headers"]
        if header["name"].lower() in kept_headers
    }

    lookup_key, encryption_key = derive_reply_keys(get_message_id_bytes(message_id))
    reply_create_args: dict[str, Any] = {
        "lookup": b64_lookup_key(lookup_key),
        "encrypted_metadata": encrypt_reply_metadata(encryption_key, reply_metadata),
    }

    if isinstance(address, DomainAddress):
        owner_field = "domain_address"
    elif isinstance(address, RelayAddress):
        owner_field = "relay_address"
    else:
        raise TypeError("address must be type RelayAddress")
    reply_create_args[owner_field] = address

    Reply.objects.create(**reply_create_args)
    return mail
1✔
372

373

374
@csrf_exempt
def sns_inbound(request):
    """
    Handle an inbound SNS request.

    The body is parsed as JSON and passed through verify_from_sns, then the
    topic ARN and message type are validated. Invalid requests are logged
    and answered with HTTP 400; valid ones go to _sns_inbound_logic.
    """
    incr_if_enabled("sns_inbound", 1)
    # Signature verification happens before anything else reads the payload
    verified_json_body = verify_from_sns(json.loads(request.body))

    # Validate ARN and message type
    topic_arn = verified_json_body.get("TopicArn")
    message_type = verified_json_body.get("Type")
    error_details = validate_sns_arn_and_type(topic_arn, message_type)
    if not error_details:
        return _sns_inbound_logic(topic_arn, message_type, verified_json_body)

    logger.error("validate_sns_arn_and_type_error", extra=error_details)
    return HttpResponse(error_details["error"], status=400)
1✔
390

391

392
def validate_sns_arn_and_type(
    topic_arn: str | None, message_type: str | None
) -> dict[str, Any] | None:
    """
    Check the Topic ARN and SNS Message Type of a request.

    Returns None when both are acceptable. Otherwise returns a dictionary
    holding the error text plus the received and supported values. Received
    values are shell-quoted.
    """

    def failure(error: str) -> dict[str, Any]:
        return {
            "error": error,
            "received_topic_arn": shlex.quote(topic_arn) if topic_arn else topic_arn,
            "supported_topic_arn": sorted(settings.AWS_SNS_TOPIC),
            "received_sns_type": (
                shlex.quote(message_type) if message_type else message_type
            ),
            "supported_sns_types": SUPPORTED_SNS_TYPES,
        }

    # Checks run in order and the first failing one decides the error text.
    if not topic_arn:
        return failure("Received SNS request without Topic ARN.")
    if topic_arn not in settings.AWS_SNS_TOPIC:
        return failure("Received SNS message for wrong topic.")
    if not message_type:
        return failure("Received SNS request without Message Type.")
    if message_type not in SUPPORTED_SNS_TYPES:
        return failure("Received SNS message for unsupported Type.")
    return None
1✔
423

424

425
def _sns_inbound_logic(topic_arn, message_type, json_body):
    """
    Dispatch a validated SNS request by message type.

    "Notification" messages go to _sns_notification. For
    "SubscriptionConfirmation" the SubscribeURL is logged and 200 returned.
    Any other type is logged, reported with capture_message, and answered
    with HTTP 400.
    """
    if message_type == "Notification":
        incr_if_enabled("sns_inbound_Notification", 1)
        return _sns_notification(json_body)

    if message_type == "SubscriptionConfirmation":
        subscribe_extra = {"SubscribeURL": json_body["SubscribeURL"]}
        info_logger.info("SNS SubscriptionConfirmation", extra=subscribe_extra)
        return HttpResponse("Logged SubscribeURL", status=200)

    unhandled = "Received SNS message with type not handled in inbound log"
    logger.error(
        "SNS message type did not fall under the SNS inbound logic",
        extra={"message_type": shlex.quote(message_type)},
    )
    capture_message(unhandled, level="error", stack=True)
    return HttpResponse(unhandled, status=400)
448

449

450
def _sns_notification(json_body):
    """
    Process the "Message" payload of an SNS notification.

    Returns HTTP 400 if the payload is not JSON or is not a supported
    notification or event type. Otherwise returns the response from
    _sns_message; unless that response is a 5xx, the stored message is
    removed from S3.
    """
    raw_message = json_body["Message"]
    try:
        message_json = json.loads(raw_message)
    except JSONDecodeError:
        logger.error(
            "SNS notification has non-JSON message body",
            extra={"content": shlex.quote(raw_message)},
        )
        return HttpResponse("Received SNS notification with non-JSON body", status=400)

    event_type = message_json.get("eventType")
    notification_type = message_json.get("notificationType")
    is_supported = notification_type in {
        "Complaint",
        "Received",
        "Bounce",
    } or event_type in {"Complaint", "Bounce"}
    if not is_supported:
        log_extra = {
            "notification_type": shlex.quote(notification_type),
            "event_type": shlex.quote(event_type),
            "keys": [shlex.quote(key) for key in message_json.keys()],
        }
        logger.error("SNS notification for unsupported type", extra=log_extra)
        quoted_type = html.escape(shlex.quote(notification_type))
        return HttpResponse(
            f"Received SNS notification for unsupported Type: {quoted_type}",
            status=400,
        )

    response = _sns_message(message_json)
    bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
    # The stored message is kept only when the response is a 5xx.
    if response.status_code < 500:
        remove_message_from_s3(bucket, object_key)
    return response
1✔
488

489

490
def _get_recipient_with_relay_domain(recipients):
    """
    Return the first recipient that contains a configured Relay domain.

    The match is a substring test of each domain against each recipient
    string. Returns None when no recipient matches.
    """
    relay_domains = get_domains_from_settings().values()
    matching = (
        candidate
        for candidate in recipients
        if any(domain in candidate for domain in relay_domains)
    )
    return next(matching, None)
1✔
497

498

499
def _get_relay_recipient_from_message_json(message_json):
    """
    Find the recipient with a Relay domain in the To, Cc, or Bcc fields.

    A match from the "to" or "cc" common headers is returned as a bare email
    address (parsed with parseaddr). A match from the receipt recipients is
    returned as-is. Returns None when nothing matches.
    """
    common_headers = message_json["mail"]["commonHeaders"]
    for field in ("to", "cc"):
        if field not in common_headers:
            continue
        match = _get_recipient_with_relay_domain(common_headers[field])
        if match is not None:
            return parseaddr(match)[1]

    # SES-SNS sends bcc in a different part of the message
    return _get_recipient_with_relay_domain(message_json["receipt"]["recipients"])
1✔
515

516

517
def _sns_message(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Route an SNS message to the bounce, complaint, or received handler.

    Bounces and complaints may be identified by either "notificationType" or
    "eventType". Anything else must be a "Received" notification without an
    "eventType", or ValueError is raised.
    """
    incr_if_enabled("sns_inbound_Notification_Received", 1)
    notification_type = message_json.get("notificationType")
    event_type = message_json.get("eventType")
    declared_types = (notification_type, event_type)

    if "Bounce" in declared_types:
        return _handle_bounce(message_json)
    if "Complaint" in declared_types:
        return _handle_complaint(message_json)

    if notification_type != "Received":
        raise ValueError('notification_type must be "Received"')
    if event_type is not None:
        raise ValueError("event_type must be None")
    return _handle_received(message_json)
1✔
530

531

532
# The reasons an email can be dropped instead of forwarded.
#
# Emails dropped because of a mask's forwarding settings, such as
# "block all" and "block promotional", are not listed here. Those are
# logged as Glean email_blocked events.
EmailDroppedReason = Literal[
    # Email identified as spam, user has the auto_block_spam flag
    "auto_block_spam",
    # Email failed DMARC check with a reject policy
    "dmarc_reject_failed",
    # The user recently had a hard bounce
    "hard_bounce_pause",
    # The user recently had a soft bounce
    "soft_bounce_pause",
    # The user exceeded an abuse limit, like mails forwarded
    "abuse_flag",
    # The user account is deactivated
    "user_deactivated",
    # The email is a reply from a free user
    "reply_requires_premium",
    # Could not load the email from storage
    "content_missing",
    # Error generating the From: header, retryable
    "error_from_header",
    # Error fetching the email contents from storage (S3), retryable
    "error_storage",
    # Error sending the forwarded email (SES), retryable
    "error_sending",
]
549

550

551
def log_email_dropped(
    reason: EmailDroppedReason,
    mask: RelayAddress | DomainAddress,
    is_reply: bool = False,
    can_retry: bool = False,
) -> None:
    """
    Log an "email_dropped" record for an email that was not forwarded.

    This covers drops for reasons other than a mask's blocking setting. Those
    are recorded by glean_logger().log_email_blocked(), whose interface this
    function mirrors.

    The FxA ID (when the profile has one) and the mask's metrics ID are only
    included when the mask owner's profile has metrics enabled.
    """
    profile = mask.user.profile
    extra: dict[str, str | int | bool] = {"reason": reason}
    if profile.metrics_enabled:
        if profile.fxa is not None:
            extra["fxa_id"] = profile.fxa.uid
        extra["mask_id"] = mask.metrics_id
    extra["is_random_mask"] = isinstance(mask, RelayAddress)
    extra["is_reply"] = is_reply
    extra["can_retry"] = can_retry
    info_logger.info("email_dropped", extra=extra)
1✔
574

575

576
def _handle_received(message_json: AWS_SNSMessageJSON) -> HttpResponse:
1✔
577
    """
578
    Handle an AWS SES received notification.
579

580
    For more information, see:
581
    https://docs.aws.amazon.com/ses/latest/dg/receiving-email-notifications-contents.html
582
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html
583

584
    Returns (may be incomplete):
585
    * 200 if the email was sent, the Relay address is disabled, the Relay user is
586
      flagged for abuse, the email is under a bounce pause, the email was suppressed
587
      for spam, the list email was blocked, or the noreply address was the recipient.
588
    * 400 if commonHeaders entry is missing, the Relay recipient address is malformed,
589
      the email failed DMARC with reject policy, or the email is a reply chain to a
590
      non-premium user.
591
    * 404 if an S3-stored email was not found, no Relay address was found in the "To",
592
      "CC", or "BCC" fields, or the Relay address is not in the database.
593
    * 503 if the "From" address is malformed, the S3 client returned an error different
594
      from "not found", or the SES client fails
595

596
    And many other returns conditions if the email is a reply. The HTTP returns are an
597
    artifact from an earlier time when emails were sent to a webhook. Currently,
598
    production instead pulls events from a queue.
599

600
    TODO: Return a more appropriate status object
601
    TODO: Document the metrics emitted
602
    """
603
    mail = message_json["mail"]
1✔
604
    if "commonHeaders" not in mail:
1✔
605
        logger.error("SNS message without commonHeaders")
1✔
606
        return HttpResponse(
1✔
607
            "Received SNS notification without commonHeaders.", status=400
608
        )
609
    common_headers = mail["commonHeaders"]
1✔
610
    receipt = message_json["receipt"]
1✔
611

612
    _record_receipt_verdicts(receipt, "all")
1✔
613
    to_address = _get_relay_recipient_from_message_json(message_json)
1✔
614
    if to_address is None:
1✔
615
        incr_if_enabled("no_relay_domain_in_recipient_fields", 1)
1✔
616
        return HttpResponse("Address does not exist", status=404)
1✔
617

618
    _record_receipt_verdicts(receipt, "relay_recipient")
1✔
619
    from_addresses = parse_email_header(common_headers["from"][0])
1✔
620
    if not from_addresses:
1✔
621
        info_logger.error(
1✔
622
            "_handle_received: no from address",
623
            extra={
624
                "source": mail["source"],
625
                "common_headers_from": common_headers["from"],
626
            },
627
        )
628
        return HttpResponse("Unable to parse From address", status=400)
1✔
629
    from_address = from_addresses[0][1]
1✔
630

631
    try:
1✔
632
        [to_local_portion, to_domain_portion] = to_address.split("@")
1✔
633
    except ValueError:
1✔
634
        # TODO: Add metric
635
        return HttpResponse("Malformed to field.", status=400)
1✔
636

637
    if to_local_portion.lower() == "noreply":
1✔
638
        incr_if_enabled("email_for_noreply_address", 1)
1✔
639
        return HttpResponse("noreply address is not supported.")
1✔
640
    try:
1✔
641
        # FIXME: this ambiguous return of either
642
        # RelayAddress or DomainAddress types makes the Rustacean in me throw
643
        # up a bit.
644
        address = _get_address(to_address)
1✔
645
        prefetch_related_objects([address.user], "socialaccount_set", "profile")
1✔
646
        user_profile = address.user.profile
1✔
647
    except (
1✔
648
        ObjectDoesNotExist,
649
        CannotMakeAddressException,
650
        DeletedAddress.MultipleObjectsReturned,
651
    ):
652
        if to_local_portion.lower() == "replies":
1✔
653
            response = _handle_reply(from_address, message_json, to_address)
1✔
654
        else:
655
            response = HttpResponse("Address does not exist", status=404)
1✔
656
        return response
1✔
657

658
    _record_receipt_verdicts(receipt, "valid_user")
1✔
659
    # if this is spam and the user is set to auto-block spam, early return
660
    if user_profile.auto_block_spam and _get_verdict(receipt, "spam") == "FAIL":
1✔
661
        incr_if_enabled("email_auto_suppressed_for_spam", 1)
1✔
662
        log_email_dropped(reason="auto_block_spam", mask=address)
1✔
663
        return HttpResponse("Address rejects spam.")
1✔
664

665
    if _get_verdict(receipt, "dmarc") == "FAIL":
1✔
666
        policy = receipt.get("dmarcPolicy", "none")
1✔
667
        # TODO: determine action on dmarcPolicy "quarantine"
668
        if policy == "reject":
1!
669
            log_email_dropped(reason="dmarc_reject_failed", mask=address)
1✔
670
            incr_if_enabled(
1✔
671
                "email_suppressed_for_dmarc_failure",
672
                tags=["dmarcPolicy:reject", "dmarcVerdict:FAIL"],
673
            )
674
            return HttpResponse("DMARC failure, policy is reject", status=400)
1✔
675

676
    # if this user is over bounce limits, early return
677
    bounce_paused, bounce_type = user_profile.check_bounce_pause()
1✔
678
    if bounce_paused:
1✔
679
        _record_receipt_verdicts(receipt, "user_bounce_paused")
1✔
680
        incr_if_enabled(f"email_suppressed_for_{bounce_type}_bounce", 1)
1✔
681
        reason: Literal["soft_bounce_pause", "hard_bounce_pause"] = (
1✔
682
            "soft_bounce_pause" if bounce_type == "soft" else "hard_bounce_pause"
683
        )
684
        log_email_dropped(reason=reason, mask=address)
1✔
685
        return HttpResponse("Address is temporarily disabled.")
1✔
686

687
    # check if this is a reply from an external sender to a Relay user
688
    try:
1✔
689
        (lookup_key, _) = _get_keys_from_headers(mail["headers"])
1✔
690
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
1✔
691
        user_address = address
1✔
692
        address = reply_record.address
1✔
693
        message_id = _get_message_id_from_headers(mail["headers"])
1✔
694
        # make sure the relay user is premium
695
        if not _reply_allowed(from_address, to_address, reply_record, message_id):
1!
696
            log_email_dropped(reason="reply_requires_premium", mask=user_address)
1✔
697
            return HttpResponse("Relay replies require a premium account", status=403)
1✔
698
    except (ReplyHeadersNotFound, Reply.DoesNotExist):
1✔
699
        # if there's no In-Reply-To header, or the In-Reply-To value doesn't
700
        # match a Reply record, continue to treat this as a regular email from
701
        # an external sender to a relay user
702
        pass
1✔
703

704
    # if account flagged for abuse, early return
705
    if user_profile.is_flagged:
1✔
706
        log_email_dropped(reason="abuse_flag", mask=address)
1✔
707
        return HttpResponse("Address is temporarily disabled.")
1✔
708

709
    if not user_profile.user.is_active:
1✔
710
        log_email_dropped(reason="user_deactivated", mask=address)
1✔
711
        return HttpResponse("Account is deactivated.")
1✔
712

713
    # if address is set to block, early return
714
    if not address.enabled:
1✔
715
        incr_if_enabled("email_for_disabled_address", 1)
1✔
716
        address.num_blocked += 1
1✔
717
        address.save(update_fields=["num_blocked"])
1✔
718
        _record_receipt_verdicts(receipt, "disabled_alias")
1✔
719
        user_profile.last_engagement = datetime.now(UTC)
1✔
720
        user_profile.save()
1✔
721
        glean_logger().log_email_blocked(mask=address, reason="block_all")
1✔
722
        return HttpResponse("Address is temporarily disabled.")
1✔
723

724
    _record_receipt_verdicts(receipt, "active_alias")
1✔
725
    incr_if_enabled("email_for_active_address", 1)
1✔
726

727
    # if address is blocking list emails, and email is from list, early return
728
    if (
1✔
729
        address
730
        and address.block_list_emails
731
        and user_profile.has_premium
732
        and _check_email_from_list(mail["headers"])
733
    ):
734
        incr_if_enabled("list_email_for_address_blocking_lists", 1)
1✔
735
        address.num_blocked += 1
1✔
736
        address.save(update_fields=["num_blocked"])
1✔
737
        user_profile.last_engagement = datetime.now(UTC)
1✔
738
        user_profile.save()
1✔
739
        glean_logger().log_email_blocked(mask=address, reason="block_promotional")
1✔
740
        return HttpResponse("Address is not accepting list emails.")
1✔
741

742
    # Collect new headers
743
    subject = common_headers.get("subject", "")
1✔
744
    destination_address = user_profile.user.email
1✔
745
    reply_address = get_reply_to_address()
1✔
746
    try:
1✔
747
        from_header = generate_from_header(from_address, to_address)
1✔
748
    except InvalidFromHeader:
1✔
749
        # TODO: MPP-3407, MPP-3417 - Determine how to handle these
750
        header_from = []
1✔
751
        for header in mail["headers"]:
1✔
752
            if header["name"].lower() == "from":
1✔
753
                header_from.append(header)
1✔
754
        info_logger.error(
1✔
755
            "generate_from_header",
756
            extra={
757
                "from_address": from_address,
758
                "source": mail["source"],
759
                "common_headers_from": common_headers["from"],
760
                "headers_from": header_from,
761
            },
762
        )
763
        log_email_dropped(reason="error_from_header", mask=address, can_retry=True)
1✔
764
        return HttpResponse("Cannot parse the From address", status=503)
1✔
765

766
    # Get incoming email
767
    try:
1✔
768
        (incoming_email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
1✔
769
    except ClientError as e:
1✔
770
        if e.response["Error"].get("Code", "") == "NoSuchKey":
1✔
771
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
1✔
772
            log_email_dropped(reason="content_missing", mask=address)
1✔
773
            return HttpResponse("Email not in S3", status=404)
1✔
774
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
1✔
775
        log_email_dropped(reason="error_storage", mask=address, can_retry=True)
1✔
776
        # we are returning a 503 so that SNS can retry the email processing
777
        return HttpResponse("Cannot fetch the message content from S3", status=503)
1✔
778

779
    # Handle developer overrides, logging
780
    dev_action = _get_developer_mode_action(address)
1✔
781
    if dev_action:
1✔
782
        if dev_action.new_destination_address:
1!
783
            destination_address = dev_action.new_destination_address
1✔
784
        _log_dev_notification(
1✔
785
            "_handle_received: developer_mode", dev_action, message_json
786
        )
787

788
    # Convert to new email
789
    headers: OutgoingHeaders = {
1✔
790
        "Subject": subject,
791
        "From": from_header,
792
        "To": destination_address,
793
        "Reply-To": reply_address,
794
        "Resent-From": from_address,
795
    }
796
    sample_trackers = bool(sample_is_active("tracker_sample"))
1✔
797
    tracker_removal_flag = flag_is_active_in_task("tracker_removal", address.user)
1✔
798
    remove_level_one_trackers = bool(
1✔
799
        tracker_removal_flag and user_profile.remove_level_one_email_trackers
800
    )
801
    (
1✔
802
        forwarded_email,
803
        issues,
804
        level_one_trackers_removed,
805
        has_html,
806
        has_text,
807
    ) = _convert_to_forwarded_email(
808
        incoming_email_bytes=incoming_email_bytes,
809
        headers=headers,
810
        to_address=to_address,
811
        from_address=from_address,
812
        language=user_profile.language,
813
        has_premium=user_profile.has_premium,
814
        sample_trackers=sample_trackers,
815
        remove_level_one_trackers=remove_level_one_trackers,
816
    )
817
    if has_html:
1✔
818
        incr_if_enabled("email_with_html_content", 1)
1✔
819
    if has_text:
1✔
820
        incr_if_enabled("email_with_text_content", 1)
1✔
821
    if issues:
1✔
822
        info_logger.info(
1✔
823
            "_handle_received: forwarding issues", extra={"issues": issues}
824
        )
825

826
    # Send new email
827
    try:
1✔
828
        ses_response = ses_send_raw_email(
1✔
829
            source_address=reply_address,
830
            destination_address=destination_address,
831
            message=forwarded_email,
832
        )
833
    except ClientError:
1✔
834
        # 503 service unavailable response to SNS so it can retry
835
        log_email_dropped(reason="error_sending", mask=address, can_retry=True)
1✔
836
        return HttpResponse("SES client error on Raw Email", status=503)
1✔
837

838
    message_id = ses_response["MessageId"]
1✔
839
    _store_reply_record(mail, message_id, address)
1✔
840

841
    user_profile.update_abuse_metric(
1✔
842
        email_forwarded=True, forwarded_email_size=len(incoming_email_bytes)
843
    )
844
    user_profile.last_engagement = datetime.now(UTC)
1✔
845
    user_profile.save()
1✔
846
    address.num_forwarded += 1
1✔
847
    address.last_used_at = datetime.now(UTC)
1✔
848
    if level_one_trackers_removed:
1!
849
        address.num_level_one_trackers_blocked = (
×
850
            address.num_level_one_trackers_blocked or 0
851
        ) + level_one_trackers_removed
852
    address.save(
1✔
853
        update_fields=[
854
            "num_forwarded",
855
            "last_used_at",
856
            "block_list_emails",
857
            "num_level_one_trackers_blocked",
858
        ]
859
    )
860
    glean_logger().log_email_forwarded(mask=address, is_reply=False)
1✔
861
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
862

863

864
class DeveloperModeAction(NamedTuple):
1✔
865
    mask_id: str
1✔
866
    action: Literal["log", "simulate_complaint"] = "log"
1✔
867
    new_destination_address: str | None = None
1✔
868

869

870
def _get_verdict(receipt, verdict_type):
1✔
871
    return receipt[f"{verdict_type}Verdict"]["status"]
1✔
872

873

874
def _check_email_from_list(headers):
1✔
875
    for header in headers:
1!
876
        if header["name"].lower().startswith("list-"):
1!
877
            return True
1✔
878
    return False
×
879

880

881
def _record_receipt_verdicts(receipt, state):
    """
    Emit counters for the verdicts in a receipt.

    Each "*Verdict" key increments "relay.emails.verdicts.<key>", tagged with the
    state. One final "relay.emails.state.<state>" increment is tagged with every
    verdict status and with the "dmarcPolicy" value, when present. Keys are
    visited in sorted order.
    """
    tags = []
    for name in sorted(receipt):
        if name.endswith("Verdict"):
            status = receipt[name]["status"]
            tags.append(f"{name}:{status}")
            incr_if_enabled(f"relay.emails.verdicts.{name}", 1, [f"state:{state}"])
        elif name == "dmarcPolicy":
            policy = receipt[name]
            tags.append(f"{name}:{policy}")
    incr_if_enabled(f"relay.emails.state.{state}", 1, tags)
1✔
892

893

894
def _get_message_id_from_headers(headers):
1✔
895
    message_id = None
1✔
896
    for header in headers:
1✔
897
        if header["name"].lower() == "message-id":
1✔
898
            message_id = header["value"]
1✔
899
    return message_id
1✔
900

901

902
def _get_keys_from_headers(headers):
    """
    Derive the reply lookup key and encryption key from an email's headers.

    Headers are scanned in order. When an In-Reply-To header is reached, the keys
    derived from its value are returned without checking the database. When a
    References header is reached, each message ID is tried in turn, and the keys
    for the first one that has a stored Reply record are returned.

    Raises:
    * Reply.DoesNotExist - a References header was reached, but none of its
      message IDs has a Reply record
    * ReplyHeadersNotFound - neither header is present
    """
    for header in headers:
        header_name = header["name"].lower()
        if header_name == "in-reply-to":
            message_id_bytes = get_message_id_bytes(header["value"])
            return derive_reply_keys(message_id_bytes)

        if header_name == "references":
            # Message IDs may be separated by any run of whitespace, such as the
            # line break and tab of a folded header. Split on all whitespace so
            # that no empty or whitespace-laden "IDs" are looked up.
            for message_id in header["value"].split():
                message_id_bytes = get_message_id_bytes(message_id)
                lookup_key, encryption_key = derive_reply_keys(message_id_bytes)
                try:
                    # FIXME: calling code is likely to duplicate this query
                    _get_reply_record_from_lookup_key(lookup_key)
                    return lookup_key, encryption_key
                except Reply.DoesNotExist:
                    pass
            raise Reply.DoesNotExist
    incr_if_enabled("mail_to_replies_without_reply_headers", 1)
    raise ReplyHeadersNotFound
1✔
924

925

926
def _get_reply_record_from_lookup_key(lookup_key):
    """
    Fetch the Reply whose lookup field equals b64_lookup_key(lookup_key).

    Callers in this module handle Reply.DoesNotExist for a missing record.
    """
    return Reply.objects.get(lookup=b64_lookup_key(lookup_key))
1✔
929

930

931
def _strip_localpart_tag(address):
1✔
932
    [localpart, domain] = address.split("@")
1✔
933
    subaddress_parts = localpart.split("+")
1✔
934
    return f"{subaddress_parts[0]}@{domain}"
1✔
935

936

937
_TransportType = Literal["sns", "s3"]


def _get_email_bytes(
    message_json: AWS_SNSMessageJSON,
) -> tuple[bytes, _TransportType, float]:
    """
    Load the raw email for a notification.

    If the notification has a "content" key, its UTF-8 encoding is the email and
    the transport is "sns". Otherwise the content is fetched from the S3 bucket
    and key named in the notification, and the transport is "s3".

    The content size is recorded in the "relayed_email.size" histogram.

    Return is a tuple:
    - message_content - The email content
    - transport - "sns" or "s3"
    - load_time_s - The time spent loading, rounded to 3 decimal places
    """
    transport: _TransportType
    with Timer(logger=None) as load_timer:
        if "content" not in message_json:
            # assume email content in S3
            bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
            message_content = get_message_content_from_s3(bucket, object_key)
            transport = "s3"
        else:
            # email content in sns message
            message_content = message_json["content"].encode("utf-8")
            transport = "sns"
        histogram_if_enabled("relayed_email.size", len(message_content))
    return (message_content, transport, round(load_timer.last, 3))
1✔
956

957

958
def _get_developer_mode_action(
    mask: RelayAddress | DomainAddress,
) -> DeveloperModeAction | None:
    """
    Get the developer mode action for this mask, if enabled.

    The "developer_mode" flag is created if it does not exist yet. An action is
    returned only when the flag is active for the mask's user and the mask's
    description contains "DEV:". A description containing
    "DEV:simulate_complaint" selects the complaint simulation, which redirects
    the email to the SES complaint simulator address; any other "DEV:"
    description selects logging only.
    """
    flag_name = "developer_mode"
    get_waffle_flag_model().objects.get_or_create(
        name=flag_name,
        defaults={
            "note": "MPP-3932: Enable logging and overrides for Relay developers."
        },
    )

    if not flag_is_active_in_task(flag_name, mask.user):
        return None
    if "DEV:" not in mask.description:
        return None

    if "DEV:simulate_complaint" in mask.description:
        return DeveloperModeAction(
            mask_id=mask.metrics_id,
            action="simulate_complaint",
            new_destination_address=f"complaint+{mask.address}@simulator.amazonses.com",
        )
    return DeveloperModeAction(mask_id=mask.metrics_id, action="log")
1✔
985

986

987
def _log_dev_notification(
    log_message: str, dev_action: DeveloperModeAction, notification: dict[str, Any]
) -> None:
    """
    Log the notification JSON, one log entry per chunk.

    This will log information beyond our privacy policy, so it should only be used on
    Relay staff accounts with prior permission.

    The notification JSON is compressed with zlib, Ascii85-encoded with padding,
    and wrapped into lines of at most 1024 characters. Each line is logged as its
    own entry, so that it fits into GCP's log entry, which has a 64KB limit per
    label value. Entries carry a zero-based "part" number and the total "parts".
    """
    compressed = zlib.compress(json.dumps(notification).encode())
    encoded = base64.a85encode(compressed, wrapcol=1024, pad=True).decode("ascii")
    total_parts = encoded.count("\n") + 1
    for partnum, part in enumerate(encoded.splitlines()):
        extra = {
            "mask_id": dev_action.mask_id,
            "dev_action": dev_action.action,
            "part": partnum,
            "parts": total_parts,
            "notification_gza85": part,
        }
        info_logger.info(log_message, extra=extra)
1016

1017

1018
def _convert_to_forwarded_email(
    incoming_email_bytes: bytes,
    headers: OutgoingHeaders,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[EmailMessage, EmailForwardingIssues, int, bool, bool]:
    """
    Convert an email (as bytes) to a forwarded email.

    The headers are replaced, the plain text body gets the Relay text header,
    and the HTML body is wrapped. When the email has plain text but no HTML
    body, an HTML alternative is generated from the text.

    The optional now is passed on to _convert_html_content as the time the email
    was received. When it is None, _convert_html_content uses the current time.

    Return is a tuple:
    - email - The forwarded email
    - issues - Any detected issues in conversion
    - level_one_trackers_removed (int) - Number of trackers removed
    - has_html - True if the email has an HTML representation
    - has_text - True if the email has a plain text representation

    Raises TypeError if the parsed email, or one of its bodies, is not an
    EmailMessage.
    """
    email = message_from_bytes(incoming_email_bytes, policy=relay_policy)
    # python/typeshed issue 2418
    # The Python 3.2 default was Message, 3.6 uses policy.message_factory, and
    # policy.default.message_factory is EmailMessage
    if not isinstance(email, EmailMessage):
        raise TypeError("email must be type EmailMessage")

    # Replace headers in the original email
    header_issues = _replace_headers(email, headers)

    # Find and replace text content
    text_body = email.get_body("plain")
    text_content = None
    has_text = False
    if text_body:
        has_text = True
        if not isinstance(text_body, EmailMessage):
            raise TypeError("text_body must be type EmailMessage")
        text_content = text_body.get_content()
        new_text_content = _convert_text_content(text_content, to_address)
        text_body.set_content(new_text_content)

    # Find and replace HTML content
    html_body = email.get_body("html")
    level_one_trackers_removed = 0
    has_html = False
    if html_body:
        has_html = True
        if not isinstance(html_body, EmailMessage):
            raise TypeError("html_body must be type EmailMessage")
        html_content = html_body.get_content()
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
            now=now,
        )
        html_body.set_content(new_content, subtype="html")
    elif text_content:
        # Try to use the text content to generate HTML content
        html_content = urlize_and_linebreaks(text_content)
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
            now=now,
        )
        if not isinstance(text_body, EmailMessage):
            raise TypeError("text_body must be type EmailMessage")
        try:
            text_body.add_alternative(new_content, subtype="html")
        except TypeError as e:
            # Forward without the generated HTML, and log the email structure
            # to help diagnose why the alternative could not be added
            out = StringIO()
            _structure(email, fp=out)
            info_logger.error(
                "Adding HTML alternate failed",
                extra={"exception": str(e), "structure": out.getvalue()},
            )

    issues: EmailForwardingIssues = {}
    if header_issues:
        issues["headers"] = header_issues
    return (email, issues, level_one_trackers_removed, has_html, has_text)
1✔
1108

1109

1110
def _replace_headers(
    email: EmailMessage, headers: OutgoingHeaders
) -> EmailHeaderIssues:
    """
    Replace the headers in email with new headers, and return detected issues.

    The passed email object is altered in place instead of being copied, for two
    reasons. First, the Python email package can read an email with non-compliant
    headers or content, but can't write it, and a copy needs a read / write
    round trip. Altering in place allows header-specific changes in try / except
    statements. Second, an email can be up to 10 MB, and we hope to support
    40 MB emails someday, so avoiding a copy avoids a source of memory-related
    crashes.

    Headers that are not being replaced are dropped, except for MIME-Version and
    the Content-* headers.

    The returned list has an entry for each incoming header that failed to read
    or has parsing defects, and for each outgoing header that failed to write,
    failed to read back, or has defects after being set.
    """
    issues: EmailHeaderIssues = []
    replacements: set[str] = {name.lower() for name in headers}

    # Detect non-compliant headers in incoming emails
    for header in email.keys():
        try:
            value = email[header]
        except Exception as e:
            issues.append(
                {"header": header, "direction": "in", "exception_on_read": repr(e)}
            )
            value = None
        if getattr(value, "defects", None):
            issues.append(
                {
                    "header": header,
                    "direction": "in",
                    "defect_count": len(value.defects),
                    "parsed_value": str(value),
                    "raw_value": str(value.as_raw),
                }
            )
        elif getattr(getattr(value, "_parse_tree", None), "all_defects", []):
            issues.append(
                {
                    "header": header,
                    "direction": "in",
                    "defect_count": len(value._parse_tree.all_defects),
                    "parsed_value": str(value),
                    "raw_value": str(value.as_raw),
                }
            )

    # Collect headers that will not be forwarded, then drop them. The names are
    # collected first so the email is not changed while iterating over its keys.
    to_drop: list[str] = [
        header
        for header in email.keys()
        if not (
            header.lower() in replacements
            or header.lower() == "mime-version"
            or header.lower().startswith("content-")
        )
    ]
    for header in to_drop:
        del email[header]

    # Replace the requested headers
    for header, value in headers.items():
        del email[header]
        try:
            email[header] = value.rstrip("\r\n")
        except Exception as e:
            issues.append(
                {
                    "header": header,
                    "direction": "out",
                    "exception_on_write": repr(e),
                    "value": value,
                }
            )
            continue
        # Read the header back to find defects in the value just written
        try:
            parsed_value = email[header]
        except Exception as e:
            issues.append(
                {
                    "header": header,
                    "direction": "out",
                    "exception_on_write": repr(e),
                    "value": value,
                }
            )
            continue
        if parsed_value.defects:
            issues.append(
                {
                    "header": header,
                    "direction": "out",
                    "defect_count": len(parsed_value.defects),
                    "parsed_value": str(parsed_value),
                    "raw_value": str(parsed_value.as_raw),
                },
            )

    return issues
1✔
1215

1216

1217
def _convert_html_content(
    html_content: str,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[str, int]:
    """
    Wrap HTML email content in the Relay HTML wrapper.

    When sample_trackers is set, the trackers in the content are counted. When
    remove_level_one_trackers is set, trackers are removed from the content, and
    a link to a tracker report is passed to the wrapper.

    now is the time the email was received, defaulting to the current UTC time.

    Return is a tuple of the wrapped HTML and the number of trackers removed.
    """
    # frontend expects a timestamp in milliseconds
    if now is None:
        now = datetime.now(UTC)
    received_at_ms = int(now.timestamp() * 1000)

    # scramble alias so that clients don't recognize it
    # and apply default link styles
    display_email = re.sub("([@.:])", r"<span>\1</span>", to_address)

    # sample tracker numbers
    if sample_trackers:
        count_all_trackers(html_content)

    removed_count = 0
    tracker_report_link = ""
    if remove_level_one_trackers:
        html_content, tracker_details = remove_trackers(
            html_content, from_address, received_at_ms
        )
        removed_count = tracker_details["tracker_removed"]
        # The report details travel in the URL fragment as JSON
        report_json = json.dumps(
            {
                "sender": from_address,
                "received_at": received_at_ms,
                "trackers": tracker_details["level_one"]["trackers"],
            }
        )
        tracker_report_link = f"{settings.SITE_ORIGIN}/tracker-report/#" + report_json

    wrapped_html = wrap_html_email(
        original_html=html_content,
        language=language,
        has_premium=has_premium,
        display_email=display_email,
        tracker_report_link=tracker_report_link,
        num_level_one_email_trackers_removed=removed_count,
    )
    return wrapped_html, removed_count
1✔
1264

1265

1266
def _convert_text_content(text_content: str, to_address: str) -> str:
1✔
1267
    relay_header_text = (
1✔
1268
        "This email was sent to your alias "
1269
        f"{to_address}. To stop receiving emails sent to this alias, "
1270
        "update the forwarding settings in your dashboard.\n"
1271
        "---Begin Email---\n"
1272
    )
1273
    wrapped_text = relay_header_text + text_content
1✔
1274
    return wrapped_text
1✔
1275

1276

1277
def _build_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> EmailMessage:
    """
    Build the email telling a user that replies require a premium account.

    The email is addressed to from_address, with text and HTML bodies rendered
    from the "emails/reply_requires_premium" templates. The templates get the
    original sender (the metadata's "reply-to", falling back to "from", or an
    empty string), and whether the reply is forwarded anyway. When message_id is
    given, it is set as the In-Reply-To and References headers.
    """
    # If we haven't forwarded a first reply for this user yet, _reply_allowed
    # will forward.  So, tell the user we forwarded it.
    forwarded = not reply_record.address.user.profile.forwarded_first_reply
    metadata = {} if decrypted_metadata is None else decrypted_metadata
    sender = metadata.get("reply-to") or metadata.get("from") or ""
    ctx = {
        "sender": sender,
        "forwarded": forwarded,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    html_body = render_to_string("emails/reply_requires_premium.html", ctx)
    text_body = render_to_string("emails/reply_requires_premium.txt", ctx)

    # Create the message
    msg = EmailMessage()
    msg["Subject"] = ftl_bundle.format("replies-not-included-in-free-account-header")
    msg["From"] = get_reply_to_address()
    msg["To"] = from_address
    if message_id:
        for threading_header in ("In-Reply-To", "References"):
            msg[threading_header] = message_id
    msg.set_content(text_body)
    msg.add_alternative(html_body, subtype="html")
    return msg
1✔
1308

1309

1310
def _set_forwarded_first_reply(profile):
1✔
1311
    profile.forwarded_first_reply = True
1✔
1312
    profile.save()
1✔
1313

1314

1315
def _send_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> None:
    """
    Email from_address that replies require a premium account.

    After a successful send, the profile of the reply record's user is marked as
    having had a first reply forwarded. An SES client error is logged rather
    than raised. The "free_user_reply_attempt" counter is incremented in both
    cases.
    """
    notice = _build_reply_requires_premium_email(
        from_address, reply_record, message_id, decrypted_metadata
    )
    try:
        ses_send_raw_email(
            source_address=get_reply_to_address(premium=False),
            destination_address=from_address,
            message=notice,
        )
        # If we haven't forwarded a first reply for this user yet, _reply_allowed
        # will. So, update the DB.
        _set_forwarded_first_reply(reply_record.address.user.profile)
    except ClientError as e:
        logger.error("reply_not_allowed_ses_client_error", extra=e.response["Error"])
    incr_if_enabled("free_user_reply_attempt", 1)
×
1336

1337

1338
def _reply_allowed(
    from_address, to_address, reply_record, message_id=None, decrypted_metadata=None
):
    """
    Return True if the reply should be sent.

    When from_address is the email of the reply record's user, ignoring any
    "+tag" in the local part, this is a Relay user replying to an external
    sender. The reply is refused for inactive or flagged users, and allowed for
    premium users. Other users are emailed that replies require premium, and the
    reply is allowed only if no first reply was forwarded for them before.

    Otherwise, the reply is allowed only if to_address is the mask of a premium
    user.
    """
    stripped_from_address = _strip_localpart_tag(from_address)
    owner_email = reply_record.address.user.email
    stripped_owner_email = _strip_localpart_tag(owner_email)
    from_relay_user = (
        from_address == owner_email or stripped_from_address == stripped_owner_email
    )

    if not from_relay_user:
        # The From: is not a Relay user, so make sure this is a reply *TO* a
        # premium Relay user
        try:
            address = _get_address(to_address)
            if address.user.profile.has_premium:
                return True
        except ObjectDoesNotExist:
            return False
        incr_if_enabled("free_user_reply_attempt", 1)
        return False

    # This is a Relay user replying to an external sender
    if not reply_record.profile.user.is_active:
        return False
    if reply_record.profile.is_flagged:
        return False
    if reply_record.owner_has_premium:
        return True

    # if we haven't forwarded a first reply for this user, return True to allow
    # this first reply. Read the flag before sending the notice, since a
    # successful send sets it.
    allow_first_reply = not reply_record.address.user.profile.forwarded_first_reply
    _send_reply_requires_premium_email(
        from_address, reply_record, message_id, decrypted_metadata
    )
    return allow_first_reply
×
1376

1377

1378
def _handle_reply(
    from_address: str, message_json: AWS_SNSMessageJSON, to_address: str
) -> HttpResponse:
    """
    Handle a reply from a Relay user to an external email.

    Returns (may be incomplete):
    * 200 if the reply was sent
    * 400 if the In-Reply-To and References headers are missing, none of the References
      headers are a reply record, or the SES client raises an error
    * 403 if the Relay user is not allowed to reply
    * 404 if the S3-stored email is not found, or there is no matching Reply record in
      the database
    * 503 if the S3 client returns an error (other than not found)

    TODO: Return a more appropriate status object (see _handle_received)
    TODO: Document metrics emitted
    """
    mail = message_json["mail"]
    try:
        (lookup_key, encryption_key) = _get_keys_from_headers(mail["headers"])
    except ReplyHeadersNotFound:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-header"])
        return HttpResponse("No In-Reply-To header", status=400)
    except Reply.DoesNotExist:
        # Raised when a References header has no message ID with a reply record.
        # Respond with the documented 400 instead of letting the exception
        # escape the handler.
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-reply-record"])
        return HttpResponse("No reply record for References header", status=400)

    try:
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
    except Reply.DoesNotExist:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-reply-record"])
        return HttpResponse("Unknown or stale In-Reply-To header", status=404)

    address = reply_record.address
    message_id = _get_message_id_from_headers(mail["headers"])
    decrypted_metadata = json.loads(
        decrypt_reply_metadata(encryption_key, reply_record.encrypted_metadata)
    )
    if not _reply_allowed(
        from_address, to_address, reply_record, message_id, decrypted_metadata
    ):
        log_email_dropped(reason="reply_requires_premium", mask=address, is_reply=True)
        return HttpResponse("Relay replies require a premium account", status=403)

    # The reply is sent from the mask to the original external sender
    outbound_from_address = address.full_address
    incr_if_enabled("reply_email", 1)
    subject = mail["commonHeaders"].get("subject", "")
    to_address = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": outbound_from_address,
        "To": to_address,
        "Reply-To": outbound_from_address,
    }

    try:
        email_bytes, _, _ = _get_email_bytes(message_json)
    except ClientError as e:
        if e.response["Error"].get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
            log_email_dropped(reason="content_missing", mask=address, is_reply=True)
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
        log_email_dropped(
            reason="error_storage", mask=address, is_reply=True, can_retry=True
        )
        # we are returning a 503 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    email = message_from_bytes(email_bytes, policy=relay_policy)
    if not isinstance(email, EmailMessage):
        raise TypeError("email must be type EmailMessage")

    # Convert to a reply email
    # TODO: Issue #1747 - Remove wrapper / prefix in replies
    _replace_headers(email, headers)

    try:
        ses_send_raw_email(
            source_address=outbound_from_address,
            destination_address=to_address,
            message=email,
        )
    except ClientError:
        log_email_dropped(reason="error_sending", mask=address, is_reply=True)
        return HttpResponse("SES client error", status=400)

    reply_record.increment_num_replied()
    profile = address.user.profile
    profile.update_abuse_metric(replied=True)
    profile.last_engagement = datetime.now(UTC)
    profile.save()
    glean_logger().log_email_forwarded(mask=address, is_reply=True)
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
1471

1472

1473
def _get_domain_address(
    local_portion: str, domain_portion: str, create: bool = True
) -> DomainAddress:
    """
    Find or create the DomainAddress for the parts of an email address.

    If the domain_portion is for a valid subdomain, a new DomainAddress
    will be created and returned, unless create is False.

    The last_used_at of the returned DomainAddress is set to the current time.

    If the domain_portion is for an unknown domain, including one without a
    subdomain part, ObjectDoesNotExist is raised.

    If the DomainAddress does not exist and create is False, ObjectDoesNotExist
    is raised.

    If the domain_portion is for an unclaimed subdomain, Profile.DoesNotExist is raised.
    """

    # Use partition rather than unpacking a split, so that a domain without a
    # "." is reported as an unknown domain instead of raising ValueError
    address_subdomain, separator, address_domain = domain_portion.partition(".")
    if (
        not separator
        or address_domain != get_domains_from_settings()["MOZMAIL_DOMAIN"]
    ):
        incr_if_enabled("email_for_not_supported_domain", 1)
        raise ObjectDoesNotExist("Address does not exist")
    try:
        with transaction.atomic():
            # Lock the profile row while the address is looked up or created
            locked_profile = Profile.objects.select_for_update().get(
                subdomain=address_subdomain
            )
            domain_numerical = get_domain_numerical(address_domain)
            # filter DomainAddress because it may not exist
            # which will throw an error with get()
            domain_address = DomainAddress.objects.filter(
                user=locked_profile.user, address=local_portion, domain=domain_numerical
            ).first()
            if domain_address is None:
                if not create:
                    raise ObjectDoesNotExist("Address does not exist")
                # TODO: Consider flows when a user generating alias on a fly
                # was unable to receive an email due to user no longer being a
                # premium user as seen in exception thrown on make_domain_address
                domain_address = DomainAddress.make_domain_address(
                    locked_profile.user, local_portion, True
                )
                glean_logger().log_email_mask_created(
                    mask=domain_address,
                    created_by_api=False,
                )
            domain_address.last_used_at = datetime.now(UTC)
            domain_address.save()
            return domain_address
    except Profile.DoesNotExist:
        incr_if_enabled("email_for_dne_subdomain", 1)
        raise
1✔
1521

1522

1523
def _get_address(address: str, create: bool = True) -> RelayAddress | DomainAddress:
    """
    Find or create the RelayAddress or DomainAddress for an email address.

    If an unknown email address is for a valid subdomain, and create is True,
    a new DomainAddress will be created. The create flag is forwarded to the
    subdomain lookup, so create=False never creates a new mask.

    The address must contain exactly one "@"; otherwise unpacking the split
    result raises ValueError.

    On failure, raises exception based on Django's ObjectDoesNotExist:
    * RelayAddress.DoesNotExist - looks like RelayAddress, deleted or does not exist
    * Profile.DoesNotExist - looks like DomainAddress, no subdomain match
    * ObjectDoesNotExist - Unknown domain
    """

    local_portion, domain_portion = address.split("@")
    local_address = local_portion.lower()
    domain = domain_portion.lower()

    # if the domain is not the site's 'top' relay domain,
    # it may be for a user's subdomain
    email_domains = get_domains_from_settings().values()
    if domain not in email_domains:
        # Forward `create`: a lookup-only caller (create=False) must not
        # create a DomainAddress as a side effect of the lookup.
        return _get_domain_address(local_address, domain, create=create)

    # the domain is the site's 'top' relay domain, so look up the RelayAddress
    try:
        domain_numerical = get_domain_numerical(domain)
        relay_address = RelayAddress.objects.get(
            address=local_address, domain=domain_numerical
        )
        return relay_address
    except RelayAddress.DoesNotExist as e:
        if not create:
            # Lookup-only callers get the error without the metrics below
            raise e
        # Emit a metric for why the address is unknown, then re-raise
        try:
            DeletedAddress.objects.get(
                address_hash=address_hash(local_address, domain=domain)
            )
            incr_if_enabled("email_for_deleted_address", 1)
            # TODO: create a hard bounce receipt rule in SES
        except DeletedAddress.DoesNotExist:
            incr_if_enabled("email_for_unknown_address", 1)
        except DeletedAddress.MultipleObjectsReturned:
            # not sure why this happens on stage but let's handle it
            incr_if_enabled("email_for_deleted_address_multiple", 1)
        raise e
1✔
1568

1569

1570
def _handle_bounce(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES bounce notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#bounce-object

    The notification passed in is not modified; recipient data is copied
    before fields are extracted from it.

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_bounce" with these tags:
    * bounce_type: 'permanent', 'transient', 'undetermined', 'none' if omitted
    * bounce_subtype: 'undetermined', 'general', etc., 'none' if omitted
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam', 'hard_bounce', 'soft_bounce'

    Emits an info log "bounce_notification", same data as metric, plus:
    * bounce_action: 'action' from bounced recipient data, or empty string
    * bounce_status: 'status' from bounced recipient data, or empty string
    * bounce_diagnostic: 'diagnosticCode' from bounced recipient data, or empty
      string
    * bounce_extra: Extra data from bounce_recipient data, if any
    * domain: User's real email address domain, if an address was given
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the user
    """
    bounce = message_json.get("bounce", {})
    bounce_type = bounce.get("bounceType", "none")
    bounce_subtype = bounce.get("bounceSubType", "none")
    # Copy before popping keys below, so the caller's notification is left
    # intact (same approach as _handle_complaint).
    bounced_recipients = deepcopy(bounce.get("bouncedRecipients", []))

    now = datetime.now(UTC)
    bounce_data = []
    for recipient in bounced_recipients:
        recipient_address = recipient.pop("emailAddress", None)
        data = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "bounce_action": recipient.pop("action", ""),
            "bounce_status": recipient.pop("status", ""),
            "bounce_diagnostic": recipient.pop("diagnosticCode", ""),
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        if recipient:
            # Whatever keys remain after the pops are unrecognized extras
            data["bounce_extra"] = recipient.copy()
        bounce_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        # Use the text after the last "@"; a malformed address without one
        # gets an empty domain instead of raising IndexError.
        _, at_sign, recipient_domain = recipient_address.rpartition("@")
        data["domain"] = recipient_domain if at_sign else ""

        try:
            user = User.objects.get(email=recipient_address)
            profile = user.profile
            data["user_match"] = "found"
            if (fxa := profile.fxa) and profile.metrics_enabled:
                data["fxa_id"] = fxa.uid
            else:
                data["fxa_id"] = ""
        except User.DoesNotExist:
            # TODO: handle bounce for a user who no longer exists
            # add to SES account-wide suppression list?
            data["user_match"] = "missing"
            continue

        action = None
        if "spam" in data["bounce_diagnostic"].lower():
            # if an email bounced as spam, set to auto block spam for this user
            # and DON'T set them into bounce pause state
            action = "auto_block_spam"
            profile.auto_block_spam = True
        elif bounce_type == "Permanent":
            # TODO: handle sub-types: 'General', 'NoEmail', etc.
            action = "hard_bounce"
            profile.last_hard_bounce = now
        elif bounce_type == "Transient":
            # TODO: handle sub-types: 'MessageTooLarge', 'AttachmentRejected', etc.
            action = "soft_bounce"
            profile.last_soft_bounce = now
        if action:
            data["relay_action"] = action
            profile.save()

    if not bounce_data:
        # Data when there are no identified recipients
        bounce_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    for data in bounce_data:
        tags = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_bounce",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("bounce_notification", extra=data)

    if any(data["user_match"] == "missing" for data in bounce_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)
1✔
1677

1678

1679
def _build_disabled_mask_for_spam_email(
    mask: RelayAddress | DomainAddress, original_spam_email: dict
) -> EmailMessage:
    """
    Build the "we disabled your mask" notice for the owner of a mask.

    The message is addressed to the mask owner's email, sent from
    settings.RELAY_FROM_ADDRESS, and has a plain-text body with an HTML
    alternative. Both bodies are rendered from the disabled_mask_for_spam
    templates with the mask's full address, the original spam email data,
    and the site origin as context.
    """
    template_context = dict(
        mask=mask.full_address,
        spam_email=original_spam_email,
        SITE_ORIGIN=settings.SITE_ORIGIN,
    )
    rendered_html = render_to_string(
        "emails/disabled_mask_for_spam.html", template_context
    )
    rendered_text = render_to_string(
        "emails/disabled_mask_for_spam.txt", template_context
    )

    # Headers first, then the text part with the HTML part as an alternative
    notice = EmailMessage()
    notice["Subject"] = ftl_bundle.format("relay-disabled-your-mask")
    notice["From"] = settings.RELAY_FROM_ADDRESS
    notice["To"] = mask.user.email
    notice.set_content(rendered_text)
    notice.add_alternative(rendered_html, subtype="html")
    return notice
1✔
1698

1699

1700
def _send_disabled_mask_for_spam_email(
    mask: RelayAddress | DomainAddress, original_spam_email: dict
) -> None:
    """
    Email the mask's owner that the mask was disabled after a spam complaint.

    The message is sent through SES from settings.RELAY_FROM_ADDRESS to the
    mask owner's email address. An SES ClientError is logged and not re-raised.

    Raises ValueError if settings.RELAY_FROM_ADDRESS is not set.
    """
    # Check configuration before building the message, so no email is rendered
    # with an unset From address.
    if not settings.RELAY_FROM_ADDRESS:
        raise ValueError(
            "Must set settings.RELAY_FROM_ADDRESS to send disabled_mask_for_spam email."
        )
    msg = _build_disabled_mask_for_spam_email(mask, original_spam_email)
    try:
        ses_send_raw_email(
            source_address=settings.RELAY_FROM_ADDRESS,
            destination_address=mask.user.email,
            message=msg,
        )
    except ClientError as e:
        # NOTE(review): this log key names the reply-not-allowed flow; it looks
        # copied from there. Kept as-is in case log queries depend on it.
        logger.error("reply_not_allowed_ses_client_error", extra=e.response["Error"])
    # NOTE(review): this metric name also looks copied from the reply flow, and
    # it is incremented whether or not the send succeeded. Confirm before renaming.
    incr_if_enabled("free_user_reply_attempt", 1)
1✔
1717

1718

1719
def _disable_masks_for_complaint(message_json: dict, user: User) -> None:
    """
    Disable the user's masks named in the complaint's mail source.

    See https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#mail-object

    message_json.mail.source contains the envelope MAIL FROM address from which
    the original message was sent. Relay sends emails as
    From: "original-From" <relay-mask@mozmail.com>, so the mask that forwarded
    the spam email can be found by parsing the source value.

    A mask is disabled, and the owner notified by email, only when it belongs
    to the given user, is still enabled, and the "disable_mask_on_complaint"
    flag is active for that user.
    """
    flag_name = "disable_mask_on_complaint"
    # Ensure the flag exists; the returned object and created status are unused
    get_waffle_flag_model().objects.get_or_create(
        name=flag_name,
        defaults={
            "note": (
                "MPP-3119: When a Relay user marks an email as spam, disable the mask."
            )
        },
    )

    mail = message_json.get("mail", {})
    # parseaddr is confused by 2 email addresses in the value, so search the
    # source for any address at one of the relay domains instead
    relay_domains = "|".join(
        map(re.escape, get_domains_from_settings().values())
    )
    mask_pattern = rf"[\w\.-]+@(?:{relay_domains})"

    for candidate in re.findall(mask_pattern, mail.get("source", "")):
        try:
            found = _get_address(candidate, False)
            # Skip masks owned by someone other than the complaining user, and
            # masks the user has already disabled themselves.
            if found.user != user or found.enabled is False:
                continue
            if not flag_is_active_in_task(flag_name, found.user):
                continue
            found.enabled = False
            found.save()
            _send_disabled_mask_for_spam_email(found, mail)
        except (
            DomainAddress.DoesNotExist,
            RelayAddress.DoesNotExist,
            ObjectDoesNotExist,
        ):
            logger.error(
                "Received a complaint from a destination address that does not match "
                "a Relay address.",
            )
1770

1771

1772
def _handle_complaint(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES complaint notification.

    Sets the user's auto_block_spam flag to True.

    Disables the mask thru which the spam mail was forwarded, and sends an email to the
    user to notify them the mask is disabled and can be re-enabled on their dashboard.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_complaint" with these tags:
    * complaint_subtype: 'onaccountsuppressionlist', or 'none' if omitted
    * complaint_feedback - feedback enumeration from ISP or 'none'
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam'

    Emits an info log "complaint_notification", same data as metric, plus:
    * complaint_user_agent - identifies the client used to file the complaint
    * complaint_extra - Extra data from complainedRecipients data, if any
    * domain - User's domain, if an address was given
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the user
    """
    # Copy before popping keys below, so the caller's notification is left intact
    complaint = deepcopy(message_json.get("complaint", {}))
    complained_recipients = complaint.pop("complainedRecipients", [])
    subtype = complaint.pop("complaintSubType", None)
    user_agent = complaint.pop("userAgent", None)
    feedback = complaint.pop("complaintFeedbackType", None)

    # The last recipient matched to a Relay user, if any. It must be bound
    # before the loop: with no recipients, or none that match a user, the
    # developer_mode check below would otherwise raise UnboundLocalError.
    user: User | None = None

    complaint_data = []
    for recipient in complained_recipients:
        recipient_address = recipient.pop("emailAddress", None)
        data = {
            "complaint_subtype": subtype,
            "complaint_user_agent": user_agent,
            "complaint_feedback": feedback,
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        if recipient:
            # Whatever keys remain after the pop are unrecognized extras
            data["complaint_extra"] = recipient.copy()
        complaint_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        # Use the text after the last "@"; a malformed address without one
        # gets an empty domain instead of raising IndexError.
        _, at_sign, recipient_domain = recipient_address.rpartition("@")
        data["domain"] = recipient_domain if at_sign else ""

        try:
            user = User.objects.get(email=recipient_address)
            profile = user.profile
            data["user_match"] = "found"
            if (fxa := profile.fxa) and profile.metrics_enabled:
                data["fxa_id"] = fxa.uid
            else:
                data["fxa_id"] = ""
        except User.DoesNotExist:
            data["user_match"] = "missing"
            continue

        data["relay_action"] = "auto_block_spam"
        profile.auto_block_spam = True
        profile.save()

        _disable_masks_for_complaint(message_json, user)

    if not complaint_data:
        # Data when there are no identified recipients
        complaint_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    if user is not None and flag_is_active_in_task("developer_mode", user):
        # MPP-3932: We need more information to match complaints to masks
        dev_action = DeveloperModeAction(mask_id="unknown", action="log")
        _log_dev_notification("_handle_complaint: MPP-3932", dev_action, message_json)

    for data in complaint_data:
        tags = {
            "complaint_subtype": subtype or "none",
            "complaint_feedback": feedback or "none",
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_complaint",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("complaint_notification", extra=data)

    if any(data["user_match"] == "missing" for data in complaint_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc