• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 23182427-2247-476e-8849-1b3b4acceb2c

15 Oct 2024 04:08PM CUT coverage: 84.491% (+0.09%) from 84.402%
23182427-2247-476e-8849-1b3b4acceb2c

push

circleci

web-flow
Merge pull request #5090 from mozilla/add-developer-mode-flag-mpp-3932

MPP-3932: Add flag 'developer_mode', use to simulate complaint and log notifications

2372 of 3515 branches covered (67.48%)

Branch coverage included in aggregate %.

143 of 145 new or added lines in 4 files covered. (98.62%)

1 existing line in 1 file now uncovered.

16456 of 18769 relevant lines covered (87.68%)

10.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.55
/emails/views.py
1
import html
1✔
2
import json
1✔
3
import logging
1✔
4
import re
1✔
5
import shlex
1✔
6
from copy import deepcopy
1✔
7
from datetime import UTC, datetime
1✔
8
from email import message_from_bytes
1✔
9
from email.iterators import _structure
1✔
10
from email.message import EmailMessage
1✔
11
from email.utils import parseaddr
1✔
12
from io import StringIO
1✔
13
from json import JSONDecodeError
1✔
14
from textwrap import dedent
1✔
15
from typing import Any, Literal, NamedTuple
1✔
16
from urllib.parse import urlencode
1✔
17

18
from django.conf import settings
1✔
19
from django.contrib.auth.models import User
1✔
20
from django.core.exceptions import ObjectDoesNotExist
1✔
21
from django.db import transaction
1✔
22
from django.db.models import prefetch_related_objects
1✔
23
from django.http import HttpRequest, HttpResponse
1✔
24
from django.shortcuts import render
1✔
25
from django.template.loader import render_to_string
1✔
26
from django.utils.html import escape
1✔
27
from django.views.decorators.csrf import csrf_exempt
1✔
28

29
from botocore.exceptions import ClientError
1✔
30
from codetiming import Timer
1✔
31
from decouple import strtobool
1✔
32
from markus.utils import generate_tag
1✔
33
from sentry_sdk import capture_message
1✔
34
from waffle import get_waffle_flag_model, sample_is_active
1✔
35

36
from privaterelay.ftl_bundles import main as ftl_bundle
1✔
37
from privaterelay.models import Profile
1✔
38
from privaterelay.utils import (
1✔
39
    flag_is_active_in_task,
40
    get_subplat_upgrade_link_by_language,
41
    glean_logger,
42
)
43

44
from .exceptions import CannotMakeAddressException
1✔
45
from .models import (
1✔
46
    DeletedAddress,
47
    DomainAddress,
48
    RelayAddress,
49
    Reply,
50
    address_hash,
51
    get_domain_numerical,
52
)
53
from .policy import relay_policy
1✔
54
from .sns import SUPPORTED_SNS_TYPES, verify_from_sns
1✔
55
from .types import (
1✔
56
    AWS_MailJSON,
57
    AWS_SNSMessageJSON,
58
    EmailForwardingIssues,
59
    EmailHeaderIssues,
60
    OutgoingHeaders,
61
)
62
from .utils import (
1✔
63
    InvalidFromHeader,
64
    _get_bucket_and_key_from_s3_json,
65
    b64_lookup_key,
66
    count_all_trackers,
67
    decrypt_reply_metadata,
68
    derive_reply_keys,
69
    encode_dict_gza85,
70
    encrypt_reply_metadata,
71
    generate_from_header,
72
    get_domains_from_settings,
73
    get_message_content_from_s3,
74
    get_message_id_bytes,
75
    get_reply_to_address,
76
    histogram_if_enabled,
77
    incr_if_enabled,
78
    parse_email_header,
79
    remove_message_from_s3,
80
    remove_trackers,
81
    ses_send_raw_email,
82
    urlize_and_linebreaks,
83
)
84

85
# Module-level loggers: `logger` is the "events" logger and `info_logger` is the
# "eventsinfo" logger. Both are used by the view and SNS-handling code below.
logger = logging.getLogger("events")
info_logger = logging.getLogger("eventsinfo")
1✔
87

88

89
class ReplyHeadersNotFound(Exception):
    """Signal that no In-Reply-To or References header was found."""

    def __init__(self, message="No In-Reply-To or References headers."):
        # Hand the message to Exception so that str(exc), exc.args, and
        # tracebacks include it, even when the default message is used.
        super().__init__(message)
        self.message = message
1✔
92

93

94
def first_time_user_test(request):
    """
    Preview the "First time Relay user" email.

    Querystring parameters:
    * in_bundle_country - parsed with strtobool, defaults to "yes"
    * format - "text" renders the plain-text template; any other value
      (default "html") renders the HTML template
    """
    context = {
        "in_bundle_country": strtobool(request.GET.get("in_bundle_country", "yes")),
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    wants_text = request.GET.get("format", "html") == "text"
    if not wants_text:
        return render(request, "emails/first_time_user.html", context)
    return render(
        request,
        "emails/first_time_user.txt",
        context,
        content_type="text/plain; charset=utf-8",
    )
×
113

114

115
def reply_requires_premium_test(request):
    """
    Preview the "Reply requires premium" email.

    Every querystring parameter is copied into the template context, overriding
    the defaults set here. "forwarded=True" becomes the boolean True, and
    "content-type=text/plain" selects the plain-text template instead of HTML.
    """
    query = request.GET
    context = {
        "sender": "test@example.com",
        "forwarded": True,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    for name in query:
        value = query.get(name)
        if name == "forwarded" and query[name] == "True":
            # Convert the string flag back to a real boolean for the template
            value = True
        context[name] = value

    if "content-type" in query and query["content-type"] == "text/plain":
        return render(
            request,
            "emails/reply_requires_premium.txt",
            context,
            content_type="text/plain; charset=utf-8",
        )
    return render(request, "emails/reply_requires_premium.html", context)
1✔
141

142

143
def disabled_mask_for_spam_test(request):
    """
    Preview the "Disabled mask for spam" email.

    Every querystring parameter is copied into the template context, overriding
    the defaults set here. "content-type=text/plain" selects the plain-text
    template instead of HTML.
    """
    query = request.GET
    context = {
        "mask": "abc123456@mozmail.com",
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    context.update((name, query.get(name)) for name in query)

    if "content-type" in query and query["content-type"] == "text/plain":
        return render(
            request,
            "emails/disabled_mask_for_spam.txt",
            context,
            content_type="text/plain; charset=utf-8",
        )
    return render(request, "emails/disabled_mask_for_spam.html", context)
×
167

168

169
def first_forwarded_email_test(request: HttpRequest) -> HttpResponse:
    """Preview the "first forwarded email" template inside the Relay wrapper."""
    # TO DO: Update with correct context when trigger is created
    inner_html = render_to_string(
        "emails/first_forwarded_email.html",
        {"SITE_ORIGIN": settings.SITE_ORIGIN},
    )
    return HttpResponse(
        wrap_html_email(
            original_html=inner_html,
            language="en-us",
            has_premium=True,
            display_email="test@example.com",
            num_level_one_email_trackers_removed=0,
        )
    )
×
187

188

189
def wrap_html_email(
    original_html: str,
    language: str,
    has_premium: bool,
    display_email: str,
    num_level_one_email_trackers_removed: int | None = None,
    tracker_report_link: str | None = None,
) -> str:
    """
    Render an HTML email inside the "emails/wrapped_email.html" template.

    The arguments, the upgrade link for ``language``, and settings.SITE_ORIGIN are
    passed as the template context. Whitespace-only lines are dropped from the
    rendered output, and the result always ends with a newline.
    """
    upgrade_link = get_subplat_upgrade_link_by_language(language)
    rendered = render_to_string(
        "emails/wrapped_email.html",
        {
            "original_html": original_html,
            "language": language,
            "has_premium": has_premium,
            "subplat_upgrade_link": upgrade_link,
            "display_email": display_email,
            "tracker_report_link": tracker_report_link,
            "num_level_one_email_trackers_removed": (
                num_level_one_email_trackers_removed
            ),
            "SITE_ORIGIN": settings.SITE_ORIGIN,
        },
    )
    # Remove empty lines
    kept_lines = []
    for line in rendered.splitlines():
        if line.strip():
            kept_lines.append(line)
    return "\n".join(kept_lines) + "\n"
1✔
213

214

215
def wrapped_email_test(request: HttpRequest) -> HttpResponse:
    """
    Preview how a forwarded HTML email is wrapped by Relay.

    Querystring parameters:
    * language, has_premium - when either is missing, a randomly chosen profile
      supplies the missing value
    * num_level_one_email_trackers_removed - integer, defaults to 0
    * has_tracker_report_link - parsed with strtobool, defaults to False

    Raises ValueError if a profile is needed but none was found.
    """
    params = request.GET

    # A profile is only looked up when the querystring leaves a setting unset
    if "language" in params and "has_premium" in params:
        user_profile = None
    else:
        user_profile = Profile.objects.order_by("?").first()

    if "language" in params:
        language = params["language"]
    elif user_profile is None:
        raise ValueError("user_profile must not be None")
    else:
        language = user_profile.language

    if "has_premium" in params:
        has_premium = strtobool(params["has_premium"])
    elif user_profile is None:
        raise ValueError("user_profile must not be None")
    else:
        has_premium = user_profile.has_premium

    num_level_one_email_trackers_removed = (
        int(params["num_level_one_email_trackers_removed"])
        if "num_level_one_email_trackers_removed" in params
        else 0
    )
    has_tracker_report_link = (
        strtobool(params["has_tracker_report_link"])
        if "has_tracker_report_link" in params
        else False
    )

    if has_tracker_report_link:
        trackers = (
            {"fake-tracker.example.com": num_level_one_email_trackers_removed}
            if num_level_one_email_trackers_removed
            else {}
        )
        # Hand-built JSON fragment for the tracker report page
        tracker_report_link = "".join(
            (
                "/tracker-report/#{",
                '"sender": "sender@example.com", ',
                '"received_at": 1658434657, ',
                '"trackers": ',
                json.dumps(trackers),
                "}",
            )
        )
    else:
        tracker_report_link = ""

    path = "/emails/wrapped_email_test"
    premium_label = "Yes" if has_premium else "No"
    report_link_label = "Yes" if has_tracker_report_link else "No"
    current_query = {
        "language": language,
        "has_premium": premium_label,
        "has_tracker_report_link": report_link_label,
        "num_level_one_email_trackers_removed": str(
            num_level_one_email_trackers_removed
        ),
    }

    def option_link(key, value):
        """Return plain text for the active setting, else a link that selects it."""
        if current_query[key] != value:
            changed_query = {**current_query, key: value}
            return f'<a href="{path}?{urlencode(changed_query)}">{value}</a>'
        return str(value)

    html_content = dedent(
        f"""\
    <p>
      <strong>Email rendering Test</strong>
    </p>
    <p>Settings: (<a href="{path}">clear all</a>)</p>
    <ul>
      <li>
        <strong>language</strong>:
        {escape(language)}
        (switch to
        {option_link("language", "en-us")},
        {option_link("language", "de")},
        {option_link("language", "en-gb")},
        {option_link("language", "fr")},
        {option_link("language", "ru-ru")},
        {option_link("language", "es-es")},
        {option_link("language", "pt-br")},
        {option_link("language", "it-it")},
        {option_link("language", "en-ca")},
        {option_link("language", "de-de")},
        {option_link("language", "es-mx")})
      </li>
      <li>
        <strong>has_premium</strong>:
        {premium_label}
        (switch to
        {option_link("has_premium", "Yes")},
        {option_link("has_premium", "No")})
      </li>
      <li>
        <strong>has_tracker_report_link</strong>:
        {report_link_label}
        (switch to
        {option_link("has_tracker_report_link", "Yes")},
        {option_link("has_tracker_report_link", "No")})
      </li>
      <li>
        <strong>num_level_one_email_trackers_removed</strong>:
        {num_level_one_email_trackers_removed}
        (switch to
        {option_link("num_level_one_email_trackers_removed", "0")},
        {option_link("num_level_one_email_trackers_removed", "1")},
        {option_link("num_level_one_email_trackers_removed", "2")})
      </li>
    </ul>
    """
    )

    return HttpResponse(
        wrap_html_email(
            original_html=html_content,
            language=language,
            has_premium=has_premium,
            tracker_report_link=tracker_report_link,
            display_email="test@relay.firefox.com",
            num_level_one_email_trackers_removed=num_level_one_email_trackers_removed,
        )
    )
1✔
345

346

347
def _store_reply_record(
    mail: AWS_MailJSON, message_id: str, address: RelayAddress | DomainAddress
) -> AWS_MailJSON:
    """
    Create a Reply record for a relayed email and return ``mail`` unchanged.

    The Message-ID, From, and Reply-To headers of ``mail`` are encrypted with a key
    derived from ``message_id`` and stored under a lookup key derived from the
    same value.

    Raises TypeError if ``address`` is neither a DomainAddress nor a RelayAddress.
    """
    wanted_headers = ["message-id", "from", "reply-to"]
    reply_metadata = {
        header["name"].lower(): header["value"]
        for header in mail["headers"]
        if header["name"].lower() in wanted_headers
    }
    lookup_key, encryption_key = derive_reply_keys(get_message_id_bytes(message_id))
    lookup = b64_lookup_key(lookup_key)
    encrypted_metadata = encrypt_reply_metadata(encryption_key, reply_metadata)

    # Pick the Reply field that matches the kind of mask
    if isinstance(address, DomainAddress):
        owner_field = "domain_address"
    elif isinstance(address, RelayAddress):
        owner_field = "relay_address"
    else:
        raise TypeError("address must be type RelayAddress")

    Reply.objects.create(
        lookup=lookup,
        encrypted_metadata=encrypted_metadata,
        **{owner_field: address},
    )
    return mail
1✔
371

372

373
@csrf_exempt
def sns_inbound(request):
    """
    Handle an HTTP request carrying an AWS SNS message.

    The JSON body is passed through verify_from_sns(), then its topic ARN and
    message type are validated. Returns a 400 response with the error text when
    validation fails, otherwise the response from _sns_inbound_logic().
    """
    incr_if_enabled("sns_inbound", 1)
    # First thing we do is verify the signature
    verified_json_body = verify_from_sns(json.loads(request.body))

    # Validate ARN and message type
    topic_arn = verified_json_body.get("TopicArn")
    message_type = verified_json_body.get("Type")
    error_details = validate_sns_arn_and_type(topic_arn, message_type)
    if not error_details:
        return _sns_inbound_logic(topic_arn, message_type, verified_json_body)

    logger.error("validate_sns_arn_and_type_error", extra=error_details)
    return HttpResponse(error_details["error"], status=400)
1✔
389

390

391
def validate_sns_arn_and_type(
    topic_arn: str | None, message_type: str | None
) -> dict[str, Any] | None:
    """
    Check the Topic ARN and the SNS message Type.

    The checks run in order (missing ARN, ARN not in settings.AWS_SNS_TOPIC,
    missing Type, Type not in SUPPORTED_SNS_TYPES) and the first failure wins.

    Returns None when both values pass. Otherwise returns a dictionary holding
    the error text, the received values (shell-quoted when present), and the
    supported values.
    """
    if not topic_arn:
        error = "Received SNS request without Topic ARN."
    elif topic_arn not in settings.AWS_SNS_TOPIC:
        error = "Received SNS message for wrong topic."
    elif not message_type:
        error = "Received SNS request without Message Type."
    elif message_type not in SUPPORTED_SNS_TYPES:
        error = "Received SNS message for unsupported Type."
    else:
        return None

    quoted_arn = shlex.quote(topic_arn) if topic_arn else topic_arn
    quoted_type = shlex.quote(message_type) if message_type else message_type
    return {
        "error": error,
        "received_topic_arn": quoted_arn,
        "supported_topic_arn": sorted(settings.AWS_SNS_TOPIC),
        "received_sns_type": quoted_type,
        "supported_sns_types": SUPPORTED_SNS_TYPES,
    }
1✔
422

423

424
def _sns_inbound_logic(topic_arn, message_type, json_body):
    """
    Dispatch an SNS message according to its type.

    * "SubscriptionConfirmation": log the SubscribeURL and return 200
    * "Notification": return the response from _sns_notification()
    * anything else: log an error, report it with capture_message(), return 400

    ``topic_arn`` is accepted but not used here.
    """
    if message_type == "Notification":
        incr_if_enabled("sns_inbound_Notification", 1)
        return _sns_notification(json_body)

    if message_type == "SubscriptionConfirmation":
        subscribe_url = json_body["SubscribeURL"]
        info_logger.info(
            "SNS SubscriptionConfirmation", extra={"SubscribeURL": subscribe_url}
        )
        return HttpResponse("Logged SubscribeURL", status=200)

    unhandled = "Received SNS message with type not handled in inbound log"
    logger.error(
        "SNS message type did not fall under the SNS inbound logic",
        extra={"message_type": shlex.quote(message_type)},
    )
    capture_message(unhandled, level="error", stack=True)
    return HttpResponse(unhandled, status=400)
447

448

449
def _sns_notification(json_body):
    """
    Process the "Message" payload of an SNS notification.

    Returns 400 if the payload is not JSON, or if neither its notificationType
    nor its eventType is supported. Otherwise returns the response from
    _sns_message(); when that response's status is below 500, the message is
    also removed from S3.
    """
    raw_message = json_body["Message"]
    try:
        message_json = json.loads(raw_message)
    except JSONDecodeError:
        logger.error(
            "SNS notification has non-JSON message body",
            extra={"content": shlex.quote(raw_message)},
        )
        return HttpResponse("Received SNS notification with non-JSON body", status=400)

    event_type = message_json.get("eventType")
    notification_type = message_json.get("notificationType")
    is_supported = notification_type in {
        "Complaint",
        "Received",
        "Bounce",
    } or event_type in {"Complaint", "Bounce"}
    if not is_supported:
        quoted_notification_type = shlex.quote(notification_type)
        logger.error(
            "SNS notification for unsupported type",
            extra={
                "notification_type": quoted_notification_type,
                "event_type": shlex.quote(event_type),
                "keys": [shlex.quote(key) for key in message_json.keys()],
            },
        )
        body = (
            "Received SNS notification for unsupported Type: "
            f"{html.escape(shlex.quote(notification_type))}"
        )
        return HttpResponse(body, status=400)

    response = _sns_message(message_json)
    bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
    # The stored message is kept when the status is 500 or above
    if response.status_code < 500:
        remove_message_from_s3(bucket, object_key)
    return response
1✔
487

488

489
def _get_recipient_with_relay_domain(recipients):
    """
    Return the first recipient that contains a configured Relay domain, or None.

    NOTE(review): this is a substring test (``domain in recipient``), not an
    exact match on the address's domain part - confirm that is intended.
    """
    relay_domains = get_domains_from_settings().values()
    return next(
        (
            recipient
            for recipient in recipients
            if any(domain in recipient for domain in relay_domains)
        ),
        None,
    )
1✔
496

497

498
def _get_relay_recipient_from_message_json(message_json):
    """
    Find the recipient with a Relay domain in the To, Cc, or Bcc fields.

    A match in the "to" or "cc" common headers is returned as the bare email
    address. Otherwise the receipt's recipients are searched and the matching
    entry (or None) is returned as-is.
    """
    # First check common headers for to or cc match
    common_headers = message_json["mail"]["commonHeaders"]
    for header_name in ("to", "cc"):
        if header_name not in common_headers:
            continue
        match = _get_recipient_with_relay_domain(common_headers[header_name])
        if match is not None:
            return parseaddr(match)[1]

    # SES-SNS sends bcc in a different part of the message
    return _get_recipient_with_relay_domain(message_json["receipt"]["recipients"])
1✔
514

515

516
def _sns_message(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Route an SNS message to the bounce, complaint, or received handler.

    Bounce is checked before Complaint; either may be signalled by the
    notificationType or the eventType. Anything else must be a "Received"
    notification with no eventType.

    Raises ValueError if the message is none of the above.
    """
    incr_if_enabled("sns_inbound_Notification_Received", 1)
    notification_type = message_json.get("notificationType")
    event_type = message_json.get("eventType")

    is_bounce = notification_type == "Bounce" or event_type == "Bounce"
    if is_bounce:
        return _handle_bounce(message_json)

    is_complaint = notification_type == "Complaint" or event_type == "Complaint"
    if is_complaint:
        return _handle_complaint(message_json)

    if not notification_type == "Received":
        raise ValueError('notification_type must be "Received"')
    if not event_type is None:  # noqa: E714
        raise ValueError("event_type must be None")
    return _handle_received(message_json)
1✔
529

530

531
# Enumerate the reasons that an email was not forwarded.
532
# This excludes emails dropped due to mask forwarding settings,
533
# such as "block all" and "block promotional". Those are logged
534
# as Glean email_blocked events.
535
EmailDroppedReason = Literal[
    "auto_block_spam",  # Email identified as spam, user has the auto_block_spam flag
    "dmarc_reject_failed",  # Email failed DMARC check with a reject policy
    "hard_bounce_pause",  # The user recently had a hard bounce
    "soft_bounce_pause",  # The user recently had a soft bounce
    "abuse_flag",  # The user exceeded an abuse limit, like mails forwarded
    "user_deactivated",  # The user account is deactivated
    "reply_requires_premium",  # The email is a reply from a free user
    "content_missing",  # Could not load the email from storage
    "error_from_header",  # Error generating the From: header, retryable
    "error_storage",  # Error fetching the email contents from storage (S3), retryable
    "error_sending",  # Error sending the forwarded email (SES), retryable
]
548

549

550
def log_email_dropped(
    reason: EmailDroppedReason,
    mask: RelayAddress | DomainAddress,
    is_reply: bool = False,
    can_retry: bool = False,
) -> None:
    """
    Emit an "email_dropped" info log for an email that was not forwarded.

    This covers drops for reasons other than a mask blocking setting; those are
    recorded by glean_logger().log_email_blocked(), whose interface this mirrors.

    The FxA ID (when the profile has one) and the mask's metrics ID are included
    only if the owning profile has metrics enabled.
    """
    profile = mask.user.profile
    extra: dict[str, str | int | bool] = {"reason": reason}
    if profile.metrics_enabled:
        if profile.fxa is not None:
            extra["fxa_id"] = profile.fxa.uid
        extra["mask_id"] = mask.metrics_id
    extra["is_random_mask"] = isinstance(mask, RelayAddress)
    extra["is_reply"] = is_reply
    extra["can_retry"] = can_retry
    info_logger.info("email_dropped", extra=extra)
1✔
573

574

575
def _handle_received(message_json: AWS_SNSMessageJSON) -> HttpResponse:
1✔
576
    """
577
    Handle an AWS SES received notification.
578

579
    For more information, see:
580
    https://docs.aws.amazon.com/ses/latest/dg/receiving-email-notifications-contents.html
581
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html
582

583
    Returns (may be incomplete):
584
    * 200 if the email was sent, the Relay address is disabled, the Relay user is
585
      flagged for abuse, the email is under a bounce pause, the email was suppressed
586
      for spam, the list email was blocked, or the noreply address was the recipient.
587
    * 400 if commonHeaders entry is missing, the Relay recipient address is malformed,
588
      the email failed DMARC with reject policy, or the email is a reply chain to a
589
      non-premium user.
590
    * 404 if an S3-stored email was not found, no Relay address was found in the "To",
591
      "CC", or "BCC" fields, or the Relay address is not in the database.
592
    * 503 if the "From" address is malformed, the S3 client returned an error different
593
      from "not found", or the SES client fails
594

595
    And many other returns conditions if the email is a reply. The HTTP returns are an
596
    artifact from an earlier time when emails were sent to a webhook. Currently,
597
    production instead pulls events from a queue.
598

599
    TODO: Return a more appropriate status object
600
    TODO: Document the metrics emitted
601
    """
602
    mail = message_json["mail"]
1✔
603
    if "commonHeaders" not in mail:
1✔
604
        logger.error("SNS message without commonHeaders")
1✔
605
        return HttpResponse(
1✔
606
            "Received SNS notification without commonHeaders.", status=400
607
        )
608
    common_headers = mail["commonHeaders"]
1✔
609
    receipt = message_json["receipt"]
1✔
610

611
    _record_receipt_verdicts(receipt, "all")
1✔
612
    to_address = _get_relay_recipient_from_message_json(message_json)
1✔
613
    if to_address is None:
1✔
614
        incr_if_enabled("no_relay_domain_in_recipient_fields", 1)
1✔
615
        return HttpResponse("Address does not exist", status=404)
1✔
616

617
    _record_receipt_verdicts(receipt, "relay_recipient")
1✔
618
    from_addresses = parse_email_header(common_headers["from"][0])
1✔
619
    if not from_addresses:
1✔
620
        info_logger.error(
1✔
621
            "_handle_received: no from address",
622
            extra={
623
                "source": mail["source"],
624
                "common_headers_from": common_headers["from"],
625
            },
626
        )
627
        return HttpResponse("Unable to parse From address", status=400)
1✔
628
    from_address = from_addresses[0][1]
1✔
629

630
    try:
1✔
631
        [to_local_portion, to_domain_portion] = to_address.split("@")
1✔
632
    except ValueError:
1✔
633
        # TODO: Add metric
634
        return HttpResponse("Malformed to field.", status=400)
1✔
635

636
    if to_local_portion.lower() == "noreply":
1✔
637
        incr_if_enabled("email_for_noreply_address", 1)
1✔
638
        return HttpResponse("noreply address is not supported.")
1✔
639
    try:
1✔
640
        # FIXME: this ambiguous return of either
641
        # RelayAddress or DomainAddress types makes the Rustacean in me throw
642
        # up a bit.
643
        address = _get_address(to_address)
1✔
644
        prefetch_related_objects([address.user], "socialaccount_set", "profile")
1✔
645
        user_profile = address.user.profile
1✔
646
    except (
1✔
647
        ObjectDoesNotExist,
648
        CannotMakeAddressException,
649
        DeletedAddress.MultipleObjectsReturned,
650
    ):
651
        if to_local_portion.lower() == "replies":
1✔
652
            response = _handle_reply(from_address, message_json, to_address)
1✔
653
        else:
654
            response = HttpResponse("Address does not exist", status=404)
1✔
655
        return response
1✔
656

657
    _record_receipt_verdicts(receipt, "valid_user")
1✔
658
    # if this is spam and the user is set to auto-block spam, early return
659
    if user_profile.auto_block_spam and _get_verdict(receipt, "spam") == "FAIL":
1✔
660
        incr_if_enabled("email_auto_suppressed_for_spam", 1)
1✔
661
        log_email_dropped(reason="auto_block_spam", mask=address)
1✔
662
        return HttpResponse("Address rejects spam.")
1✔
663

664
    if _get_verdict(receipt, "dmarc") == "FAIL":
1✔
665
        policy = receipt.get("dmarcPolicy", "none")
1✔
666
        # TODO: determine action on dmarcPolicy "quarantine"
667
        if policy == "reject":
1!
668
            log_email_dropped(reason="dmarc_reject_failed", mask=address)
1✔
669
            incr_if_enabled(
1✔
670
                "email_suppressed_for_dmarc_failure",
671
                tags=["dmarcPolicy:reject", "dmarcVerdict:FAIL"],
672
            )
673
            return HttpResponse("DMARC failure, policy is reject", status=400)
1✔
674

675
    # if this user is over bounce limits, early return
676
    bounce_paused, bounce_type = user_profile.check_bounce_pause()
1✔
677
    if bounce_paused:
1✔
678
        _record_receipt_verdicts(receipt, "user_bounce_paused")
1✔
679
        incr_if_enabled(f"email_suppressed_for_{bounce_type}_bounce", 1)
1✔
680
        reason: Literal["soft_bounce_pause", "hard_bounce_pause"] = (
1✔
681
            "soft_bounce_pause" if bounce_type == "soft" else "hard_bounce_pause"
682
        )
683
        log_email_dropped(reason=reason, mask=address)
1✔
684
        return HttpResponse("Address is temporarily disabled.")
1✔
685

686
    # check if this is a reply from an external sender to a Relay user
687
    try:
1✔
688
        (lookup_key, _) = _get_keys_from_headers(mail["headers"])
1✔
689
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
1✔
690
        user_address = address
1✔
691
        address = reply_record.address
1✔
692
        message_id = _get_message_id_from_headers(mail["headers"])
1✔
693
        # make sure the relay user is premium
694
        if not _reply_allowed(from_address, to_address, reply_record, message_id):
1!
695
            log_email_dropped(reason="reply_requires_premium", mask=user_address)
1✔
696
            return HttpResponse("Relay replies require a premium account", status=403)
1✔
697
    except (ReplyHeadersNotFound, Reply.DoesNotExist):
1✔
698
        # if there's no In-Reply-To header, or the In-Reply-To value doesn't
699
        # match a Reply record, continue to treat this as a regular email from
700
        # an external sender to a relay user
701
        pass
1✔
702

703
    # if account flagged for abuse, early return
704
    if user_profile.is_flagged:
1✔
705
        log_email_dropped(reason="abuse_flag", mask=address)
1✔
706
        return HttpResponse("Address is temporarily disabled.")
1✔
707

708
    if not user_profile.user.is_active:
1✔
709
        log_email_dropped(reason="user_deactivated", mask=address)
1✔
710
        return HttpResponse("Account is deactivated.")
1✔
711

712
    # if address is set to block, early return
713
    if not address.enabled:
1✔
714
        incr_if_enabled("email_for_disabled_address", 1)
1✔
715
        address.num_blocked += 1
1✔
716
        address.save(update_fields=["num_blocked"])
1✔
717
        _record_receipt_verdicts(receipt, "disabled_alias")
1✔
718
        user_profile.last_engagement = datetime.now(UTC)
1✔
719
        user_profile.save()
1✔
720
        glean_logger().log_email_blocked(mask=address, reason="block_all")
1✔
721
        return HttpResponse("Address is temporarily disabled.")
1✔
722

723
    _record_receipt_verdicts(receipt, "active_alias")
1✔
724
    incr_if_enabled("email_for_active_address", 1)
1✔
725

726
    # if address is blocking list emails, and email is from list, early return
727
    if (
1✔
728
        address
729
        and address.block_list_emails
730
        and user_profile.has_premium
731
        and _check_email_from_list(mail["headers"])
732
    ):
733
        incr_if_enabled("list_email_for_address_blocking_lists", 1)
1✔
734
        address.num_blocked += 1
1✔
735
        address.save(update_fields=["num_blocked"])
1✔
736
        user_profile.last_engagement = datetime.now(UTC)
1✔
737
        user_profile.save()
1✔
738
        glean_logger().log_email_blocked(mask=address, reason="block_promotional")
1✔
739
        return HttpResponse("Address is not accepting list emails.")
1✔
740

741
    # Collect new headers
742
    subject = common_headers.get("subject", "")
1✔
743
    destination_address = user_profile.user.email
1✔
744
    reply_address = get_reply_to_address()
1✔
745
    try:
1✔
746
        from_header = generate_from_header(from_address, to_address)
1✔
747
    except InvalidFromHeader:
1✔
748
        # TODO: MPP-3407, MPP-3417 - Determine how to handle these
749
        header_from = []
1✔
750
        for header in mail["headers"]:
1✔
751
            if header["name"].lower() == "from":
1✔
752
                header_from.append(header)
1✔
753
        info_logger.error(
1✔
754
            "generate_from_header",
755
            extra={
756
                "from_address": from_address,
757
                "source": mail["source"],
758
                "common_headers_from": common_headers["from"],
759
                "headers_from": header_from,
760
            },
761
        )
762
        log_email_dropped(reason="error_from_header", mask=address, can_retry=True)
1✔
763
        return HttpResponse("Cannot parse the From address", status=503)
1✔
764

765
    # Get incoming email
766
    try:
1✔
767
        (incoming_email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
1✔
768
    except ClientError as e:
1✔
769
        if e.response["Error"].get("Code", "") == "NoSuchKey":
1✔
770
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
1✔
771
            log_email_dropped(reason="content_missing", mask=address)
1✔
772
            return HttpResponse("Email not in S3", status=404)
1✔
773
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
1✔
774
        log_email_dropped(reason="error_storage", mask=address, can_retry=True)
1✔
775
        # we are returning a 503 so that SNS can retry the email processing
776
        return HttpResponse("Cannot fetch the message content from S3", status=503)
1✔
777

778
    # Handle developer overrides, logging
779
    dev_action = _get_developer_mode_action(address)
1✔
780
    if dev_action:
1✔
781
        if dev_action.new_destination_address:
1!
782
            destination_address = dev_action.new_destination_address
1✔
783
        _log_dev_notification(
1✔
784
            "_handle_received: developer_mode", dev_action, message_json
785
        )
786

787
    # Convert to new email
788
    headers: OutgoingHeaders = {
1✔
789
        "Subject": subject,
790
        "From": from_header,
791
        "To": destination_address,
792
        "Reply-To": reply_address,
793
        "Resent-From": from_address,
794
    }
795
    sample_trackers = bool(sample_is_active("tracker_sample"))
1✔
796
    tracker_removal_flag = flag_is_active_in_task("tracker_removal", address.user)
1✔
797
    remove_level_one_trackers = bool(
1✔
798
        tracker_removal_flag and user_profile.remove_level_one_email_trackers
799
    )
800
    (
1✔
801
        forwarded_email,
802
        issues,
803
        level_one_trackers_removed,
804
        has_html,
805
        has_text,
806
    ) = _convert_to_forwarded_email(
807
        incoming_email_bytes=incoming_email_bytes,
808
        headers=headers,
809
        to_address=to_address,
810
        from_address=from_address,
811
        language=user_profile.language,
812
        has_premium=user_profile.has_premium,
813
        sample_trackers=sample_trackers,
814
        remove_level_one_trackers=remove_level_one_trackers,
815
    )
816
    if has_html:
1✔
817
        incr_if_enabled("email_with_html_content", 1)
1✔
818
    if has_text:
1✔
819
        incr_if_enabled("email_with_text_content", 1)
1✔
820
    if issues:
1✔
821
        info_logger.info(
1✔
822
            "_handle_received: forwarding issues", extra={"issues": issues}
823
        )
824

825
    # Send new email
826
    try:
1✔
827
        ses_response = ses_send_raw_email(
1✔
828
            source_address=reply_address,
829
            destination_address=destination_address,
830
            message=forwarded_email,
831
        )
832
    except ClientError:
1✔
833
        # 503 service unavailable response to SNS so it can retry
834
        log_email_dropped(reason="error_sending", mask=address, can_retry=True)
1✔
835
        return HttpResponse("SES client error on Raw Email", status=503)
1✔
836

837
    message_id = ses_response["MessageId"]
1✔
838
    _store_reply_record(mail, message_id, address)
1✔
839

840
    user_profile.update_abuse_metric(
1✔
841
        email_forwarded=True, forwarded_email_size=len(incoming_email_bytes)
842
    )
843
    user_profile.last_engagement = datetime.now(UTC)
1✔
844
    user_profile.save()
1✔
845
    address.num_forwarded += 1
1✔
846
    address.last_used_at = datetime.now(UTC)
1✔
847
    if level_one_trackers_removed:
1!
848
        address.num_level_one_trackers_blocked = (
×
849
            address.num_level_one_trackers_blocked or 0
850
        ) + level_one_trackers_removed
851
    address.save(
1✔
852
        update_fields=[
853
            "num_forwarded",
854
            "last_used_at",
855
            "block_list_emails",
856
            "num_level_one_trackers_blocked",
857
        ]
858
    )
859
    glean_logger().log_email_forwarded(mask=address, is_reply=False)
1✔
860
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
861

862

863
class DeveloperModeAction(NamedTuple):
    """Developer-mode instructions selected for a single mask."""

    # Identifier of the mask; _get_developer_mode_action fills it from
    # mask.metrics_id
    mask_id: str
    # "log" is the default; "simulate_complaint" is paired with a replacement
    # destination address by _get_developer_mode_action
    action: Literal["log", "simulate_complaint"] = "log"
    # When set, the forwarded email goes to this address instead of the user's
    new_destination_address: str | None = None
1✔
867

868

869
def _get_verdict(receipt, verdict_type):
1✔
870
    return receipt[f"{verdict_type}Verdict"]["status"]
1✔
871

872

873
def _check_email_from_list(headers):
1✔
874
    for header in headers:
1!
875
        if header["name"].lower().startswith("list-"):
1!
876
            return True
1✔
877
    return False
×
878

879

880
def _record_receipt_verdicts(receipt, state):
    """
    Emit counters for the verdicts found in a receipt.

    Keys are visited in sorted order. Each "*Verdict" key increments its own
    counter tagged with the state, and contributes a "key:status" tag. The
    "dmarcPolicy" key only contributes a "key:value" tag. A final per-state
    counter is incremented with all collected tags.
    """
    tags = []
    for name in sorted(receipt):
        if name == "dmarcPolicy":
            tags.append(f"{name}:{receipt[name]}")
            continue
        if not name.endswith("Verdict"):
            continue
        status = receipt[name]["status"]
        tags.append(f"{name}:{status}")
        incr_if_enabled(f"relay.emails.verdicts.{name}", 1, [f"state:{state}"])
    incr_if_enabled(f"relay.emails.state.{state}", 1, tags)
1✔
891

892

893
def _get_message_id_from_headers(headers):
1✔
894
    message_id = None
1✔
895
    for header in headers:
1✔
896
        if header["name"].lower() == "message-id":
1✔
897
            message_id = header["value"]
1✔
898
    return message_id
1✔
899

900

901
def _get_keys_from_headers(headers):
    """
    Derive the reply keys from the first In-Reply-To or References header.

    Headers are scanned in order:
    * In-Reply-To: the keys derived from its value are returned without a
      database lookup.
    * References: each whitespace-separated message ID is tried in turn, and
      the (lookup_key, encryption_key) of the first one with a Reply record is
      returned.

    Raises Reply.DoesNotExist if a References header is reached and none of its
    message IDs has a Reply record.
    Raises ReplyHeadersNotFound (after incrementing a counter) if neither
    header is present.
    """
    for header in headers:
        name = header["name"].lower()
        if name == "in-reply-to":
            message_id_bytes = get_message_id_bytes(header["value"])
            return derive_reply_keys(message_id_bytes)

        if name == "references":
            # split() with no separator copes with folded values (tabs, newlines)
            # and runs of spaces, and never yields empty message IDs
            for message_id in header["value"].split():
                message_id_bytes = get_message_id_bytes(message_id)
                lookup_key, encryption_key = derive_reply_keys(message_id_bytes)
                try:
                    # FIXME: calling code is likely to duplicate this query
                    _get_reply_record_from_lookup_key(lookup_key)
                    return lookup_key, encryption_key
                except Reply.DoesNotExist:
                    pass
            raise Reply.DoesNotExist
    incr_if_enabled("mail_to_replies_without_reply_headers", 1)
    raise ReplyHeadersNotFound
1✔
923

924

925
def _get_reply_record_from_lookup_key(lookup_key):
    """Fetch the Reply whose ``lookup`` equals ``b64_lookup_key(lookup_key)``."""
    return Reply.objects.get(lookup=b64_lookup_key(lookup_key))
1✔
928

929

930
def _strip_localpart_tag(address):
1✔
931
    [localpart, domain] = address.split("@")
1✔
932
    subaddress_parts = localpart.split("+")
1✔
933
    return f"{subaddress_parts[0]}@{domain}"
1✔
934

935

936
# Where _get_email_bytes found the email content: inline in the SNS message
# ("sns") or fetched from S3 ("s3")
_TransportType = Literal["sns", "s3"]
1✔
937

938

939
def _get_email_bytes(
    message_json: AWS_SNSMessageJSON,
) -> tuple[bytes, _TransportType, float]:
    """
    Load the raw email content for a notification, timing the load.

    Returns (content, transport, load_time_s). The transport is "sns" when the
    notification carries a "content" key and "s3" otherwise. load_time_s is the
    timer reading rounded to 3 decimal places.
    """
    transport: _TransportType
    with Timer(logger=None) as load_timer:
        if "content" not in message_json:
            # no inline content, so assume the email content is in S3
            bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
            message_content = get_message_content_from_s3(bucket, object_key)
            transport = "s3"
        else:
            # email content travelled inside the SNS message
            message_content = message_json["content"].encode("utf-8")
            transport = "sns"
        histogram_if_enabled("relayed_email.size", len(message_content))
    return (message_content, transport, round(load_timer.last, 3))
1✔
955

956

957
def _get_developer_mode_action(
    mask: RelayAddress | DomainAddress,
) -> DeveloperModeAction | None:
    """
    Pick the developer-mode action for a mask.

    Returns None unless the "developer_mode" flag is active for the mask's user
    and the mask description contains "DEV:". A description containing
    "DEV:simulate_complaint" selects the complaint simulation, with a new
    destination address built from the mask's address. Any other "DEV:"
    description selects plain logging.
    """
    flag_name = "developer_mode"
    # Create the flag record on first use so it exists to be configured
    get_waffle_flag_model().objects.get_or_create(
        name=flag_name,
        defaults={
            "note": "MPP-3932: Enable logging and overrides for Relay developers."
        },
    )

    if not flag_is_active_in_task(flag_name, mask.user):
        return None
    if "DEV:" not in mask.description:
        return None

    if "DEV:simulate_complaint" not in mask.description:
        return DeveloperModeAction(mask_id=mask.metrics_id, action="log")
    return DeveloperModeAction(
        mask_id=mask.metrics_id,
        action="simulate_complaint",
        new_destination_address=f"complaint+{mask.address}@simulator.amazonses.com",
    )
1✔
984

985

986
def _log_dev_notification(
    log_message: str, dev_action: DeveloperModeAction, notification: dict[str, Any]
) -> None:
    """
    Log notification JSON

    This will log information beyond our privacy policy, so it should only be used on
    Relay staff accounts with prior permission.

    The notification JSON will be compressed, Ascii85-encoded with padding, and broken
    into 1024-byte chunks. This will ensure it fits into GCP's log entry, which has a
    64KB limit per label value.

    One log entry is written per chunk. "part" is the zero-based chunk index and
    "parts" is the number of chunks written.
    """

    # Split once, so the reported total always matches the entries emitted, even
    # if the encoded text is empty or ends with a newline
    chunks = encode_dict_gza85(notification).splitlines()
    total_parts = len(chunks)
    for partnum, part in enumerate(chunks):
        info_logger.info(
            log_message,
            extra={
                "mask_id": dev_action.mask_id,
                "dev_action": dev_action.action,
                "part": partnum,
                "parts": total_parts,
                "notification_gza85": part,
            },
        )
1013

1014

1015
def _convert_to_forwarded_email(
    incoming_email_bytes: bytes,
    headers: OutgoingHeaders,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[EmailMessage, EmailForwardingIssues, int, bool, bool]:
    """
    Convert an email (as bytes) to a forwarded email.

    The optional ``now`` is passed on to _convert_html_content, which derives
    its millisecond timestamp from it; None lets that function use the current
    time.

    Return is a tuple:
    - email - The forwarded email
    - issues - Any detected issues in conversion
    - level_one_trackers_removed (int) - Number of trackers removed
    - has_html - True if the email has an HTML representation
    - has_text - True if the email has a plain text representation

    Raises TypeError if the parsed email, or a body found in it, is not an
    EmailMessage.
    """
    email = message_from_bytes(incoming_email_bytes, policy=relay_policy)
    # python/typeshed issue 2418
    # The Python 3.2 default was Message, 3.6 uses policy.message_factory, and
    # policy.default.message_factory is EmailMessage
    if not isinstance(email, EmailMessage):
        raise TypeError("email must be type EmailMessage")

    # Replace headers in the original email
    header_issues = _replace_headers(email, headers)

    # Find and replace text content
    text_body = email.get_body("plain")
    text_content = None
    has_text = False
    if text_body:
        has_text = True
        if not isinstance(text_body, EmailMessage):
            raise TypeError("text_body must be type EmailMessage")
        text_content = text_body.get_content()
        new_text_content = _convert_text_content(text_content, to_address)
        text_body.set_content(new_text_content)

    # Find and replace HTML content
    html_body = email.get_body("html")
    level_one_trackers_removed = 0
    has_html = False
    if html_body:
        has_html = True
        if not isinstance(html_body, EmailMessage):
            raise TypeError("html_body must be type EmailMessage")
        html_content = html_body.get_content()
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
            now=now,
        )
        html_body.set_content(new_content, subtype="html")
    elif text_content:
        # Try to use the text content to generate HTML content.
        # has_html stays False: the incoming email had no HTML part.
        html_content = urlize_and_linebreaks(text_content)
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
            now=now,
        )
        if not isinstance(text_body, EmailMessage):
            raise TypeError("text_body must be type EmailMessage")
        try:
            text_body.add_alternative(new_content, subtype="html")
        except TypeError as e:
            # Best effort: log the message structure and forward the text-only email
            out = StringIO()
            _structure(email, fp=out)
            info_logger.error(
                "Adding HTML alternate failed",
                extra={"exception": str(e), "structure": out.getvalue()},
            )

    issues: EmailForwardingIssues = {}
    if header_issues:
        issues["headers"] = header_issues
    return (email, issues, level_one_trackers_removed, has_html, has_text)
1✔
1105

1106

1107
def _replace_headers(
1✔
1108
    email: EmailMessage, headers: OutgoingHeaders
1109
) -> EmailHeaderIssues:
1110
    """
1111
    Replace the headers in email with new headers.
1112

1113
    This replaces headers in the passed email object, rather than returns an altered
1114
    copy. The primary reason is that the Python email package can read an email with
1115
    non-compliant headers or content, but can't write it. A read/write is required to
1116
    create a copy that we then alter. This code instead alters the passed EmailMessage
1117
    object, making header-specific changes in try / except statements.
1118

1119
    The other reason is the object size. An Email can be up to 10 MB, and we hope to
1120
    support 40 MB emails someday. Modern servers may be OK with this, but it would be
1121
    nice to handle the non-compliant headers without crashing before we add a source of
1122
    memory-related crashes.
1123
    """
1124
    # Look for headers to drop
1125
    to_drop: list[str] = []
1✔
1126
    replacements: set[str] = {_k.lower() for _k in headers.keys()}
1✔
1127
    issues: EmailHeaderIssues = []
1✔
1128

1129
    # Detect non-compliant headers in incoming emails
1130
    for header in email.keys():
1✔
1131
        try:
1✔
1132
            value = email[header]
1✔
1133
        except Exception as e:
1✔
1134
            issues.append(
1✔
1135
                {"header": header, "direction": "in", "exception_on_read": repr(e)}
1136
            )
1137
            value = None
1✔
1138
        if getattr(value, "defects", None):
1✔
1139
            issues.append(
1✔
1140
                {
1141
                    "header": header,
1142
                    "direction": "in",
1143
                    "defect_count": len(value.defects),
1144
                    "parsed_value": str(value),
1145
                    "raw_value": str(value.as_raw),
1146
                }
1147
            )
1148
        elif getattr(getattr(value, "_parse_tree", None), "all_defects", []):
1✔
1149
            issues.append(
1✔
1150
                {
1151
                    "header": header,
1152
                    "direction": "in",
1153
                    "defect_count": len(value._parse_tree.all_defects),
1154
                    "parsed_value": str(value),
1155
                    "raw_value": str(value.as_raw),
1156
                }
1157
            )
1158

1159
    # Collect headers that will not be forwarded
1160
    for header in email.keys():
1✔
1161
        header_lower = header.lower()
1✔
1162
        if (
1✔
1163
            header_lower not in replacements
1164
            and header_lower != "mime-version"
1165
            and not header_lower.startswith("content-")
1166
        ):
1167
            to_drop.append(header)
1✔
1168

1169
    # Drop headers that should be dropped
1170
    for header in to_drop:
1✔
1171
        del email[header]
1✔
1172

1173
    # Replace the requested headers
1174
    for header, value in headers.items():
1✔
1175
        del email[header]
1✔
1176
        try:
1✔
1177
            email[header] = value.rstrip("\r\n")
1✔
1178
        except Exception as e:
×
1179
            issues.append(
×
1180
                {
1181
                    "header": header,
1182
                    "direction": "out",
1183
                    "exception_on_write": repr(e),
1184
                    "value": value,
1185
                }
1186
            )
1187
            continue
×
1188
        try:
1✔
1189
            parsed_value = email[header]
1✔
1190
        except Exception as e:
×
1191
            issues.append(
×
1192
                {
1193
                    "header": header,
1194
                    "direction": "out",
1195
                    "exception_on_write": repr(e),
1196
                    "value": value,
1197
                }
1198
            )
1199
            continue
×
1200
        if parsed_value.defects:
1!
1201
            issues.append(
×
1202
                {
1203
                    "header": header,
1204
                    "direction": "out",
1205
                    "defect_count": len(parsed_value.defects),
1206
                    "parsed_value": str(parsed_value),
1207
                    "raw_value": str(parsed_value.as_raw),
1208
                },
1209
            )
1210

1211
    return issues
1✔
1212

1213

1214
def _convert_html_content(
    html_content: str,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[str, int]:
    """
    Wrap HTML content for forwarding, optionally counting and removing trackers.

    ``now`` defaults to the current UTC time and is converted to milliseconds
    for tracker removal and the tracker report.

    Returns (wrapped_html, removed_count). removed_count is 0 unless
    remove_level_one_trackers is set.
    """
    # frontend expects a timestamp in milliseconds
    received_at_ms = int((now or datetime.now(UTC)).timestamp() * 1000)

    # scramble alias so that clients don't recognize it
    # and apply default link styles
    display_email = re.sub("([@.:])", r"<span>\1</span>", to_address)

    # sample tracker numbers
    if sample_trackers:
        count_all_trackers(html_content)

    tracker_report_link = ""
    removed_count = 0
    if remove_level_one_trackers:
        html_content, tracker_details = remove_trackers(
            html_content, from_address, received_at_ms
        )
        removed_count = tracker_details["tracker_removed"]
        report = {
            "sender": from_address,
            "received_at": received_at_ms,
            "trackers": tracker_details["level_one"]["trackers"],
        }
        # The report details travel in the URL fragment as JSON
        report_json = json.dumps(report)
        tracker_report_link = f"{settings.SITE_ORIGIN}/tracker-report/#{report_json}"

    return (
        wrap_html_email(
            original_html=html_content,
            language=language,
            has_premium=has_premium,
            display_email=display_email,
            tracker_report_link=tracker_report_link,
            num_level_one_email_trackers_removed=removed_count,
        ),
        removed_count,
    )
1✔
1261

1262

1263
def _convert_text_content(text_content: str, to_address: str) -> str:
1✔
1264
    relay_header_text = (
1✔
1265
        "This email was sent to your alias "
1266
        f"{to_address}. To stop receiving emails sent to this alias, "
1267
        "update the forwarding settings in your dashboard.\n"
1268
        "---Begin Email---\n"
1269
    )
1270
    wrapped_text = relay_header_text + text_content
1✔
1271
    return wrapped_text
1✔
1272

1273

1274
def _build_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> EmailMessage:
    """
    Build the "replies require premium" notice addressed to ``from_address``.

    The templates are given the original sender (the metadata's "reply-to",
    else "from", else an empty string) and whether this reply is being
    forwarded. When ``message_id`` is given, In-Reply-To and References are set
    to it. The message has a text body and an HTML alternative.
    """
    # If we haven't forwarded a first reply for this user yet, _reply_allowed
    # will forward.  So, tell the user we forwarded it.
    already_forwarded = reply_record.address.user.profile.forwarded_first_reply
    sender: str | None = ""
    if decrypted_metadata is not None:
        sender = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
    context = {
        "sender": sender or "",
        "forwarded": not already_forwarded,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    html_body = render_to_string("emails/reply_requires_premium.html", context)
    text_body = render_to_string("emails/reply_requires_premium.txt", context)

    # Create the message
    notice = EmailMessage()
    notice["Subject"] = ftl_bundle.format(
        "replies-not-included-in-free-account-header"
    )
    notice["From"] = get_reply_to_address()
    notice["To"] = from_address
    if message_id:
        for threading_header in ("In-Reply-To", "References"):
            notice[threading_header] = message_id
    notice.set_content(text_body)
    notice.add_alternative(html_body, subtype="html")
    return notice
1✔
1305

1306

1307
def _set_forwarded_first_reply(profile):
    """Set ``forwarded_first_reply`` to True on the profile and save it."""
    profile.forwarded_first_reply = True
    profile.save()
1✔
1310

1311

1312
def _send_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> None:
    """
    Send the "replies require premium" notice to ``from_address``.

    After a successful send, the owner's profile is marked as having had its
    first reply forwarded. An SES ClientError is logged, not raised. The
    "free_user_reply_attempt" counter is incremented either way.
    """
    notice = _build_reply_requires_premium_email(
        from_address=from_address,
        reply_record=reply_record,
        message_id=message_id,
        decrypted_metadata=decrypted_metadata,
    )
    try:
        ses_send_raw_email(
            source_address=get_reply_to_address(premium=False),
            destination_address=from_address,
            message=notice,
        )
        # If we haven't forwarded a first reply for this user yet, _reply_allowed
        # will. So, update the DB.
        _set_forwarded_first_reply(reply_record.address.user.profile)
    except ClientError as err:
        logger.error("reply_not_allowed_ses_client_error", extra=err.response["Error"])
    incr_if_enabled("free_user_reply_attempt", 1)
×
1333

1334

1335
def _reply_allowed(
    from_address, to_address, reply_record, message_id=None, decrypted_metadata=None
):
    """
    Decide whether a reply may be relayed.

    When the sender is the reply record's owner (compared with and without
    "+tag" subaddresses):
    * False if the owner's account is inactive or the profile is flagged
    * True if the owner has premium
    * otherwise the "requires premium" notice is sent, and the result is True
      only if no first reply had been forwarded for this user before

    When the sender is anyone else, the result is True only if ``to_address``
    resolves to an address whose user has premium. A failed lookup returns
    False. A non-premium user increments "free_user_reply_attempt" first.
    """
    stripped_from_address = _strip_localpart_tag(from_address)
    owner_email = reply_record.address.user.email
    stripped_owner_email = _strip_localpart_tag(owner_email)
    sender_is_owner = (
        from_address == owner_email or stripped_from_address == stripped_owner_email
    )

    if sender_is_owner:
        # This is a Relay user replying to an external sender
        if not reply_record.profile.user.is_active:
            return False
        if reply_record.profile.is_flagged:
            return False
        if reply_record.owner_has_premium:
            return True

        # Read the flag before sending the notice, because a successful send
        # flips it
        first_reply = not reply_record.address.user.profile.forwarded_first_reply
        _send_reply_requires_premium_email(
            from_address, reply_record, message_id, decrypted_metadata
        )
        return first_reply

    # The From: is not a Relay user, so make sure this is a reply *TO* a
    # premium Relay user
    try:
        address = _get_address(to_address)
        if address.user.profile.has_premium:
            return True
    except ObjectDoesNotExist:
        return False
    incr_if_enabled("free_user_reply_attempt", 1)
    return False
×
1373

1374

1375
def _handle_reply(
    from_address: str, message_json: AWS_SNSMessageJSON, to_address: str
) -> HttpResponse:
    """
    Handle a reply from a Relay user to an external email.

    Returns (may be incomplete):
    * 200 if the reply was sent
    * 400 if the In-Reply-To and References headers are missing, or the SES client
      raises an error
    * 403 if the Relay user is not allowed to reply
    * 404 if the S3-stored email is not found, or there is no matching Reply record in
      the database
    * 503 if the S3 client returns an error (other than not found)

    NOTE(review): when a References header is present but none of its message IDs
    match a reply record, _get_keys_from_headers raises Reply.DoesNotExist, which is
    not caught here - confirm how the caller handles it.

    TODO: Return a more appropriate status object (see _handle_received)
    TODO: Document metrics emitted
    """
    mail = message_json["mail"]
    try:
        (lookup_key, encryption_key) = _get_keys_from_headers(mail["headers"])
    except ReplyHeadersNotFound:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-header"])
        return HttpResponse("No In-Reply-To header", status=400)

    try:
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
    except Reply.DoesNotExist:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-reply-record"])
        return HttpResponse("Unknown or stale In-Reply-To header", status=404)

    address = reply_record.address
    message_id = _get_message_id_from_headers(mail["headers"])
    # The metadata holds the original sender's "reply-to" / "from" values
    decrypted_metadata = json.loads(
        decrypt_reply_metadata(encryption_key, reply_record.encrypted_metadata)
    )
    if not _reply_allowed(
        from_address, to_address, reply_record, message_id, decrypted_metadata
    ):
        log_email_dropped(reason="reply_requires_premium", mask=address, is_reply=True)
        return HttpResponse("Relay replies require a premium account", status=403)

    # The reply goes out from the mask, so the user's real address is not exposed
    outbound_from_address = address.full_address
    incr_if_enabled("reply_email", 1)
    subject = mail["commonHeaders"].get("subject", "")
    # From here on, to_address is the external recipient, not the replies address
    to_address = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": outbound_from_address,
        "To": to_address,
        "Reply-To": outbound_from_address,
    }

    try:
        (email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
    except ClientError as e:
        if e.response["Error"].get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
            log_email_dropped(reason="content_missing", mask=address, is_reply=True)
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
        log_email_dropped(
            reason="error_storage", mask=address, is_reply=True, can_retry=True
        )
        # we are returning a 503 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    email = message_from_bytes(email_bytes, policy=relay_policy)
    if not isinstance(email, EmailMessage):
        raise TypeError("email must be type EmailMessage")

    # Convert to a reply email
    # TODO: Issue #1747 - Remove wrapper / prefix in replies
    # Header issues returned by _replace_headers are not used for replies
    _replace_headers(email, headers)

    try:
        ses_send_raw_email(
            source_address=outbound_from_address,
            destination_address=to_address,
            message=email,
        )
    except ClientError:
        # Unlike the S3 failure above, this is logged without can_retry
        log_email_dropped(reason="error_sending", mask=address, is_reply=True)
        return HttpResponse("SES client error", status=400)

    reply_record.increment_num_replied()
    profile = address.user.profile
    profile.update_abuse_metric(replied=True)
    profile.last_engagement = datetime.now(UTC)
    profile.save()
    glean_logger().log_email_forwarded(mask=address, is_reply=True)
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
1468

1469

1470
def _get_domain_address(
    local_portion: str, domain_portion: str, create: bool = True
) -> DomainAddress:
    """
    Find or create the DomainAddress for the parts of an email address.

    If the domain_portion is for a valid subdomain, a new DomainAddress
    will be created and returned, unless create is False, in which case
    ObjectDoesNotExist is raised for a missing address.

    If the domain_portion is for an unknown domain, including one with no "."
    separator, ObjectDoesNotExist is raised.

    If the domain_portion is for an unclaimed subdomain, Profile.DoesNotExist is raised.
    """

    # partition() never raises. A domain without a "." gives an empty
    # address_domain, which is rejected below as an unknown domain instead of
    # failing with ValueError on unpacking.
    address_subdomain, _, address_domain = domain_portion.partition(".")
    if address_domain != get_domains_from_settings()["MOZMAIL_DOMAIN"]:
        incr_if_enabled("email_for_not_supported_domain", 1)
        raise ObjectDoesNotExist("Address does not exist")
    try:
        with transaction.atomic():
            # Lock the profile row while the address is looked up or created
            locked_profile = Profile.objects.select_for_update().get(
                subdomain=address_subdomain
            )
            domain_numerical = get_domain_numerical(address_domain)
            # filter DomainAddress because it may not exist
            # which will throw an error with get()
            domain_address = DomainAddress.objects.filter(
                user=locked_profile.user, address=local_portion, domain=domain_numerical
            ).first()
            if domain_address is None:
                if not create:
                    raise ObjectDoesNotExist("Address does not exist")
                # TODO: Consider flows when a user generating alias on a fly
                # was unable to receive an email due to user no longer being a
                # premium user as seen in exception thrown on make_domain_address
                domain_address = DomainAddress.make_domain_address(
                    locked_profile.user, local_portion, True
                )
                glean_logger().log_email_mask_created(
                    mask=domain_address,
                    created_by_api=False,
                )
            domain_address.last_used_at = datetime.now(UTC)
            domain_address.save()
            return domain_address
    except Profile.DoesNotExist:
        incr_if_enabled("email_for_dne_subdomain", 1)
        # Bare raise re-raises the same exception with its traceback intact
        raise
1✔
1518

1519

1520
def _get_address(address: str, create: bool = True) -> RelayAddress | DomainAddress:
    """
    Find or create the RelayAddress or DomainAddress for an email address.

    The local part and domain are lower-cased before any lookup.

    If the domain is not one of the site's relay domains, the address is
    treated as belonging to a user's subdomain and the lookup is delegated to
    _get_domain_address. If an unknown email address is for a valid subdomain,
    and create is True, a new DomainAddress will be created. When create is
    False, no DomainAddress is created.

    For an address on a relay domain that is not found, create=False re-raises
    immediately. Otherwise a counter is emitted recording whether the address
    was deleted, unknown, or matched multiple deleted records, and then the
    original exception is re-raised.

    On failure, raises exception based on Django's ObjectDoesNotExist:
    * RelayAddress.DoesNotExist - looks like RelayAddress, deleted or does not exist
    * Profile.DoesNotExist - looks like DomainAddress, no subdomain match
    * ObjectDoesNotExist - Unknown domain, or unknown subdomain address when
      create is False

    Raises ValueError if address does not contain exactly one "@".
    """

    local_portion, domain_portion = address.split("@")
    local_address = local_portion.lower()
    domain = domain_portion.lower()

    # if the domain is not the site's 'top' relay domain,
    # it may be for a user's subdomain
    email_domains = get_domains_from_settings().values()
    if domain not in email_domains:
        # Forward the caller's create flag; without it a lookup-only call
        # (create=False) would still create a new DomainAddress.
        return _get_domain_address(local_address, domain, create=create)

    # the domain is the site's 'top' relay domain, so look up the RelayAddress
    try:
        domain_numerical = get_domain_numerical(domain)
        relay_address = RelayAddress.objects.get(
            address=local_address, domain=domain_numerical
        )
        return relay_address
    except RelayAddress.DoesNotExist as e:
        if not create:
            # Lookup-only callers skip the deleted/unknown address metrics
            raise e
        try:
            DeletedAddress.objects.get(
                address_hash=address_hash(local_address, domain=domain)
            )
            incr_if_enabled("email_for_deleted_address", 1)
            # TODO: create a hard bounce receipt rule in SES
        except DeletedAddress.DoesNotExist:
            incr_if_enabled("email_for_unknown_address", 1)
        except DeletedAddress.MultipleObjectsReturned:
            # not sure why this happens on stage but let's handle it
            incr_if_enabled("email_for_deleted_address_multiple", 1)
        raise e
1✔
1565

1566

1567
def _handle_bounce(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES bounce notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#bounce-object

    The notification passed in is not modified; recipient data is read from a
    copy.

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_bounce" with these tags:
    * bounce_type: 'permanent', 'transient', 'undetermined', 'none' if omitted
    * bounce_subtype: 'undetermined', 'general', etc., 'none' if omitted
    * user_match: 'found', 'missing', error states 'no_address' (address
      omitted or without a domain) and 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam', 'hard_bounce', 'soft_bounce'

    Emits an info log "bounce_notification", same data as metric, plus:
    * bounce_action: 'action' from bounced recipient data, or '' if omitted
    * bounce_status: 'status' from bounced recipient data, or '' if omitted
    * bounce_diagnostic: 'diagnosticCode' from bounced recipient data, or ''
      if omitted
    * bounce_extra: Extra data from bounce_recipient data, if any
    * domain: User's real email address domain, if an address was given
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the user
      ('' when the user has no Mozilla account or has metrics disabled)
    """
    # Work on a copy: each recipient dict is consumed with pop() below, and the
    # caller's notification must not be modified as a side effect.
    bounce = deepcopy(message_json.get("bounce", {}))
    bounce_type = bounce.get("bounceType", "none")
    bounce_subtype = bounce.get("bounceSubType", "none")
    bounced_recipients = bounce.get("bouncedRecipients", [])

    now = datetime.now(UTC)
    bounce_data = []
    for recipient in bounced_recipients:
        recipient_address = recipient.pop("emailAddress", None)
        data = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "bounce_action": recipient.pop("action", ""),
            "bounce_status": recipient.pop("status", ""),
            "bounce_diagnostic": recipient.pop("diagnosticCode", ""),
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        if recipient:
            # Whatever is left after the pops above is unrecognized extra data
            data["bounce_extra"] = recipient.copy()
        # Appended before the lookups so every recipient is reported, even
        # when one of the early `continue` exits below is taken.
        bounce_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        # The domain is everything after the last "@". An address without an
        # "@" (including one parseaddr could not parse, which yields "") is
        # reported as "no_address" instead of raising IndexError.
        _, at_sign, recipient_domain = recipient_address.rpartition("@")
        if not at_sign:
            continue
        data["domain"] = recipient_domain

        try:
            user = User.objects.get(email=recipient_address)
            profile = user.profile
            data["user_match"] = "found"
            if (fxa := profile.fxa) and profile.metrics_enabled:
                data["fxa_id"] = fxa.uid
            else:
                data["fxa_id"] = ""
        except User.DoesNotExist:
            # TODO: handle bounce for a user who no longer exists
            # add to SES account-wide suppression list?
            data["user_match"] = "missing"
            continue

        action = None
        if "spam" in data["bounce_diagnostic"].lower():
            # if an email bounced as spam, set to auto block spam for this user
            # and DON'T set them into bounce pause state
            action = "auto_block_spam"
            profile.auto_block_spam = True
        elif bounce_type == "Permanent":
            # TODO: handle sub-types: 'General', 'NoEmail', etc.
            action = "hard_bounce"
            profile.last_hard_bounce = now
        elif bounce_type == "Transient":
            # TODO: handle sub-types: 'MessageTooLarge', 'AttachmentRejected', etc.
            action = "soft_bounce"
            profile.last_soft_bounce = now
        if action:
            data["relay_action"] = action
            profile.save()

    if not bounce_data:
        # Data when there are no identified recipients
        bounce_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    for data in bounce_data:
        tags = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_bounce",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("bounce_notification", extra=data)

    if any(data["user_match"] == "missing" for data in bounce_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)
1✔
1674

1675

1676
def _build_disabled_mask_for_spam_email(
    mask: RelayAddress | DomainAddress, original_spam_email: dict
) -> EmailMessage:
    """
    Build the email that tells a user their mask was disabled.

    The HTML and plain-text bodies are rendered from the
    "emails/disabled_mask_for_spam" templates with the mask's full address,
    the original spam email data, and the site origin as context.

    The message is addressed from settings.RELAY_FROM_ADDRESS to the email of
    the mask's user. The plain-text body is the main content and the HTML
    body is added as an alternative.
    """
    template_context = {
        "mask": mask.full_address,
        "spam_email": original_spam_email,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    rendered_html = render_to_string(
        "emails/disabled_mask_for_spam.html", template_context
    )
    rendered_text = render_to_string(
        "emails/disabled_mask_for_spam.txt", template_context
    )

    # Assemble headers first, then the text part with its HTML alternative
    notification = EmailMessage()
    notification["Subject"] = ftl_bundle.format("relay-disabled-your-mask")
    notification["From"] = settings.RELAY_FROM_ADDRESS
    notification["To"] = mask.user.email
    notification.set_content(rendered_text)
    notification.add_alternative(rendered_html, subtype="html")
    return notification
1✔
1695

1696

1697
def _send_disabled_mask_for_spam_email(
    mask: RelayAddress | DomainAddress, original_spam_email: dict
) -> None:
    """
    Email the mask's user to say the mask was disabled after a spam complaint.

    The message is built by _build_disabled_mask_for_spam_email and sent via
    SES from settings.RELAY_FROM_ADDRESS to the email of the mask's user.

    An SES ClientError is logged and not re-raised. The counter is
    incremented whether or not the send succeeded.

    Raises ValueError if settings.RELAY_FROM_ADDRESS is not set.
    """
    # Check the precondition before building the message, which itself uses
    # RELAY_FROM_ADDRESS as the From header and renders two templates.
    if not settings.RELAY_FROM_ADDRESS:
        raise ValueError(
            "Must set settings.RELAY_FROM_ADDRESS to send disabled_mask_for_spam email."
        )
    msg = _build_disabled_mask_for_spam_email(mask, original_spam_email)
    try:
        ses_send_raw_email(
            source_address=settings.RELAY_FROM_ADDRESS,
            destination_address=mask.user.email,
            message=msg,
        )
    except ClientError as e:
        # NOTE(review): this event name refers to the "reply not allowed"
        # flow, not to disabled-mask emails - looks copied; confirm before
        # renaming, since log queries may depend on it.
        logger.error("reply_not_allowed_ses_client_error", extra=e.response["Error"])
    # NOTE(review): this metric name also refers to the reply flow, so
    # disabled-mask emails are counted as free user reply attempts - confirm
    # whether a dedicated metric is wanted.
    incr_if_enabled("free_user_reply_attempt", 1)
1✔
1714

1715

1716
def _disable_masks_for_complaint(message_json: dict, user: User) -> None:
    """
    Disable the user's masks that appear in a complaint's mail source.

    See https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#mail-object

    message_json.mail.source contains the envelope MAIL FROM address from which
    the original message was sent. Relay sends emails
    From: "original-From" <relay-mask@mozmail.com>, so the mask that sent the
    spam email can be found by searching the source value for addresses on
    any of the relay domains.

    Each matching address is looked up without creating it. A mask is
    disabled, and the user is emailed about it, only when it belongs to the
    given user, is not already disabled, and the "disable_mask_on_complaint"
    flag is active for that user. The flag is created if it does not exist.

    Addresses that do not resolve to a mask are logged as errors and skipped.
    """
    flag_name = "disable_mask_on_complaint"
    get_waffle_flag_model().objects.get_or_create(
        name=flag_name,
        defaults={
            "note": (
                "MPP-3119: When a Relay user marks an email as spam, disable the mask."
            )
        },
    )

    mail_data = message_json.get("mail", {})
    source = mail_data.get("source", "")

    # parseaddr is confused by 2 email addresses in the value, so search with
    # a regular expression that accepts only the relay domains
    domain_pattern = "|".join(
        re.escape(domain) for domain in get_domains_from_settings().values()
    )
    email_regex = rf"[\w\.-]+@(?:{domain_pattern})"

    # No matches means the loop body never runs
    for mask_address in re.findall(email_regex, source):
        try:
            address = _get_address(mask_address, False)
            # Only act on masks owned by the user the complaint is for
            if address.user != user:
                continue
            # Leave alone masks the user has already disabled themselves
            if address.enabled is False:
                continue
            if flag_is_active_in_task(flag_name, address.user):
                address.enabled = False
                address.save()
                _send_disabled_mask_for_spam_email(address, mail_data)
        except ObjectDoesNotExist:
            # Also covers RelayAddress.DoesNotExist and
            # DomainAddress.DoesNotExist, which are subclasses
            logger.error(
                "Received a complaint from a destination address that does not match "
                "a Relay address.",
            )
1767

1768

1769
def _handle_complaint(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES complaint notification.

    Sets the user's auto_block_spam flag to True.

    Disables the mask thru which the spam mail was forwarded, and sends an email to the
    user to notify them the mask is disabled and can be re-enabled on their dashboard.

    If the "developer_mode" flag is active for the last recipient that matched
    a user, the notification is also logged for debugging (MPP-3932). Nothing
    is logged for developer mode when no recipient matched a user.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_complaint" with these tags:
    * complaint_subtype: 'onaccountsuppressionlist', or 'none' if omitted
    * complaint_feedback - feedback enumeration from ISP or 'none'
    * user_match: 'found', 'missing', error states 'no_address' (address
      omitted or without a domain) and 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam'

    Emits an info log "complaint_notification", same data as metric, plus:
    * complaint_user_agent - identifies the client used to file the complaint
    * complaint_extra - Extra data from complainedRecipients data, if any
    * domain - User's domain, if an address was given
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the user
    """
    # Work on a copy so the pop() calls below do not modify message_json,
    # which is passed on unchanged to the helpers further down.
    complaint = deepcopy(message_json.get("complaint", {}))
    complained_recipients = complaint.pop("complainedRecipients", [])
    subtype = complaint.pop("complaintSubType", None)
    user_agent = complaint.pop("userAgent", None)
    feedback = complaint.pop("complaintFeedbackType", None)

    # Last recipient that matched a User, for the developer_mode check after
    # the loop. Stays None when there are no recipients or none matched.
    last_matched_user: User | None = None

    complaint_data = []
    for recipient in complained_recipients:
        recipient_address = recipient.pop("emailAddress", None)
        data = {
            "complaint_subtype": subtype,
            "complaint_user_agent": user_agent,
            "complaint_feedback": feedback,
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        if recipient:
            # Whatever is left after the pop above is unrecognized extra data
            data["complaint_extra"] = recipient.copy()
        # Appended before the lookups so every recipient is reported, even
        # when one of the early `continue` exits below is taken.
        complaint_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        # The domain is everything after the last "@". An address without an
        # "@" (including one parseaddr could not parse, which yields "") is
        # reported as "no_address" instead of raising IndexError.
        _, at_sign, recipient_domain = recipient_address.rpartition("@")
        if not at_sign:
            continue
        data["domain"] = recipient_domain

        try:
            user = User.objects.get(email=recipient_address)
            profile = user.profile
            data["user_match"] = "found"
            if (fxa := profile.fxa) and profile.metrics_enabled:
                data["fxa_id"] = fxa.uid
            else:
                data["fxa_id"] = ""
        except User.DoesNotExist:
            data["user_match"] = "missing"
            continue

        last_matched_user = user
        data["relay_action"] = "auto_block_spam"
        profile.auto_block_spam = True
        profile.save()

        _disable_masks_for_complaint(message_json, user)

    if not complaint_data:
        # Data when there are no identified recipients
        complaint_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    # Only check the flag when a user was found; reading the loop variable
    # here raised UnboundLocalError when no recipient matched a user.
    if last_matched_user is not None and flag_is_active_in_task(
        "developer_mode", last_matched_user
    ):
        # MPP-3932: We need more information to match complaints to masks
        dev_action = DeveloperModeAction(mask_id="unknown", action="log")
        _log_dev_notification("_handle_complaint: MPP-3932", dev_action, message_json)

    for data in complaint_data:
        tags = {
            "complaint_subtype": subtype or "none",
            "complaint_feedback": feedback or "none",
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_complaint",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("complaint_notification", extra=data)

    if any(data["user_match"] == "missing" for data in complaint_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc