• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 134f1667-5e36-4ce4-9c96-cad97b5267b3

17 Oct 2024 07:37PM UTC coverage: 84.874% (+0.4%) from 84.495%
134f1667-5e36-4ce4-9c96-cad97b5267b3

Pull #5115

circleci

jwhitlock
Use '_handle_complaint: developer_mode' for logs

This will match '_handle_received: developer_mode'.
Pull Request #5115: MPP-3932: Disable masks on complaints, round 2

2414 of 3549 branches covered (68.02%)

Branch coverage included in aggregate %.

480 of 481 new or added lines in 2 files covered. (99.79%)

20 existing lines in 1 file now uncovered.

16849 of 19147 relevant lines covered (88.0%)

9.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.61
/emails/views.py
1
import html
1✔
2
import json
1✔
3
import logging
1✔
4
import re
1✔
5
import shlex
1✔
6
from datetime import UTC, datetime
1✔
7
from email import message_from_bytes
1✔
8
from email.iterators import _structure
1✔
9
from email.message import EmailMessage
1✔
10
from email.utils import parseaddr
1✔
11
from io import StringIO
1✔
12
from json import JSONDecodeError
1✔
13
from textwrap import dedent
1✔
14
from typing import Any, Literal, NamedTuple, TypedDict, TypeVar
1✔
15
from urllib.parse import urlencode
1✔
16

17
from django.conf import settings
1✔
18
from django.contrib.auth.models import User
1✔
19
from django.core.exceptions import ObjectDoesNotExist
1✔
20
from django.db import transaction
1✔
21
from django.db.models import prefetch_related_objects
1✔
22
from django.http import HttpRequest, HttpResponse
1✔
23
from django.shortcuts import render
1✔
24
from django.template.loader import render_to_string
1✔
25
from django.utils.html import escape
1✔
26
from django.views.decorators.csrf import csrf_exempt
1✔
27

28
from botocore.exceptions import ClientError
1✔
29
from codetiming import Timer
1✔
30
from decouple import strtobool
1✔
31
from markus.utils import generate_tag
1✔
32
from sentry_sdk import capture_message
1✔
33
from waffle import get_waffle_flag_model, sample_is_active
1✔
34

35
from privaterelay.ftl_bundles import main as ftl_bundle
1✔
36
from privaterelay.models import Profile
1✔
37
from privaterelay.utils import (
1✔
38
    flag_is_active_in_task,
39
    get_subplat_upgrade_link_by_language,
40
    glean_logger,
41
)
42

43
from .exceptions import CannotMakeAddressException
1✔
44
from .models import (
1✔
45
    DeletedAddress,
46
    DomainAddress,
47
    RelayAddress,
48
    Reply,
49
    address_hash,
50
    get_domain_numerical,
51
)
52
from .policy import relay_policy
1✔
53
from .sns import SUPPORTED_SNS_TYPES, verify_from_sns
1✔
54
from .types import (
1✔
55
    AWS_MailJSON,
56
    AWS_SNSMessageJSON,
57
    EmailForwardingIssues,
58
    EmailHeaderIssues,
59
    OutgoingHeaders,
60
)
61
from .utils import (
1✔
62
    InvalidFromHeader,
63
    _get_bucket_and_key_from_s3_json,
64
    b64_lookup_key,
65
    count_all_trackers,
66
    decrypt_reply_metadata,
67
    derive_reply_keys,
68
    encode_dict_gza85,
69
    encrypt_reply_metadata,
70
    generate_from_header,
71
    get_domains_from_settings,
72
    get_message_content_from_s3,
73
    get_message_id_bytes,
74
    get_reply_to_address,
75
    histogram_if_enabled,
76
    incr_if_enabled,
77
    parse_email_header,
78
    remove_message_from_s3,
79
    remove_trackers,
80
    ses_send_raw_email,
81
    urlize_and_linebreaks,
82
)
83

84
# Module-level loggers, looked up by name: "events" and "eventsinfo".
logger, info_logger = map(logging.getLogger, ("events", "eventsinfo"))
86

87

88
class ReplyHeadersNotFound(Exception):
    """
    Signal that an email had no In-Reply-To or References headers to use.

    The message is kept on ``self.message`` (as before) and is also passed to
    ``Exception.__init__`` so that ``str(exc)`` and ``exc.args`` carry it.
    """

    def __init__(self, message="No In-Reply-To or References headers."):
        # Without this call, str(exc) was "" and exc.args was empty, which
        # hid the reason in logs and tracebacks.
        super().__init__(message)
        self.message = message
91

92

93
def first_time_user_test(request):
    """
    Demonstrate rendering of the "First time Relay user" email.

    Supported querystring parameters:
    * in_bundle_country: parsed with strtobool; defaults to "yes".
    * format: "text" renders the plain-text template; any other value
      (default "html") renders the HTML template.

    No user profile is consulted. The template context holds only
    in_bundle_country and SITE_ORIGIN.
    """
    in_bundle_country = strtobool(request.GET.get("in_bundle_country", "yes"))
    email_context = {
        "in_bundle_country": in_bundle_country,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    if request.GET.get("format", "html") == "text":
        return render(
            request,
            "emails/first_time_user.txt",
            email_context,
            "text/plain; charset=utf-8",
        )
    return render(request, "emails/first_time_user.html", email_context)
112

113

114
def reply_requires_premium_test(request):
    """
    Demonstrate rendering of the "Reply requires premium" email.

    Every querystring parameter is copied into the template context and
    overrides the defaults (sender="test@example.com", forwarded=True,
    SITE_ORIGIN). A "forwarded" value of exactly "True" becomes the boolean
    True. If "content-type" is "text/plain", the plain-text template is
    rendered; otherwise the HTML template is used.
    """
    email_context = {
        "sender": "test@example.com",
        "forwarded": True,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    for param in request.GET:
        email_context[param] = request.GET.get(param)
        if param == "forwarded" and request.GET[param] == "True":
            email_context[param] = True
    # NOTE(review): any other "forwarded" value stays a raw string, so e.g.
    # "False" is still truthy in a template `if` -- confirm this is intended.

    # A single lookup replaces a second full scan of the query parameters.
    if request.GET.get("content-type") == "text/plain":
        return render(
            request,
            "emails/reply_requires_premium.txt",
            email_context,
            "text/plain; charset=utf-8",
        )
    return render(request, "emails/reply_requires_premium.html", email_context)
140

141

142
def disabled_mask_for_spam_test(request):
    """
    Demonstrate rendering of the "Disabled mask for spam" email.

    Every querystring parameter is copied into the template context and
    overrides the defaults (mask="abc123456@mozmail.com", SITE_ORIGIN). If
    "content-type" is "text/plain", the plain-text template is rendered;
    otherwise the HTML template is used.
    """
    mask = "abc123456@mozmail.com"
    email_context = {
        "mask": mask,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    for param in request.GET:
        email_context[param] = request.GET.get(param)

    # A single lookup replaces a second full scan of the query parameters.
    if request.GET.get("content-type") == "text/plain":
        return render(
            request,
            "emails/disabled_mask_for_spam.txt",
            email_context,
            "text/plain; charset=utf-8",
        )
    return render(request, "emails/disabled_mask_for_spam.html", email_context)
166

167

168
def first_forwarded_email_test(request: HttpRequest) -> HttpResponse:
    """
    Demonstrate rendering of the "First forwarded email" message.

    The template is rendered with only SITE_ORIGIN in its context. It is then
    wrapped as an "en-us" premium email for test@example.com, with zero
    trackers removed.
    """
    # TO DO: Update with correct context when trigger is created
    template_context = {"SITE_ORIGIN": settings.SITE_ORIGIN}
    inner_html = render_to_string(
        "emails/first_forwarded_email.html", template_context
    )
    return HttpResponse(
        wrap_html_email(
            original_html=inner_html,
            language="en-us",
            has_premium=True,
            display_email="test@example.com",
            num_level_one_email_trackers_removed=0,
        )
    )
186

187

188
def wrap_html_email(
    original_html: str,
    language: str,
    has_premium: bool,
    display_email: str,
    num_level_one_email_trackers_removed: int | None = None,
    tracker_report_link: str | None = None,
) -> str:
    """
    Wrap an HTML email body in the Relay template (banners, surveys, etc.).

    Renders "emails/wrapped_email.html" with the arguments, the SubPlat
    upgrade link for ``language``, and SITE_ORIGIN. Whitespace-only lines are
    dropped, the rest are joined with newlines, and the result always ends
    with a newline.
    """
    context = dict(
        original_html=original_html,
        language=language,
        has_premium=has_premium,
        subplat_upgrade_link=get_subplat_upgrade_link_by_language(language),
        display_email=display_email,
        tracker_report_link=tracker_report_link,
        num_level_one_email_trackers_removed=num_level_one_email_trackers_removed,
        SITE_ORIGIN=settings.SITE_ORIGIN,
    )
    rendered = render_to_string("emails/wrapped_email.html", context)
    # filter(str.strip, ...) keeps only lines with non-whitespace content.
    return "\n".join(filter(str.strip, rendered.splitlines())) + "\n"
212

213

214
def wrapped_email_test(request: HttpRequest) -> HttpResponse:
    """
    Demonstrate rendering of forwarded HTML emails.

    Settings like language can be given in the querystring, otherwise settings
    come from a randomly chosen profile.

    Querystring parameters read here:
    * language: used as given; otherwise taken from the random profile.
    * has_premium: parsed with strtobool; otherwise taken from the profile.
    * num_level_one_email_trackers_removed: parsed with int(); default 0.
    * has_tracker_report_link: parsed with strtobool; default False.

    The response is a sample HTML body that lists the current settings, with
    links that change one setting at a time, wrapped by wrap_html_email().

    Raises ValueError when a setting must come from a profile but no profile
    exists, or when num_level_one_email_trackers_removed is not an integer.
    """

    # Only query for a random profile when some setting is missing.
    if all(key in request.GET for key in ("language", "has_premium")):
        user_profile = None
    else:
        # .first() returns None when there are no profiles at all.
        user_profile = Profile.objects.order_by("?").first()

    if "language" in request.GET:
        language = request.GET["language"]
    else:
        if user_profile is None:
            raise ValueError("user_profile must not be None")
        language = user_profile.language

    if "has_premium" in request.GET:
        has_premium = strtobool(request.GET["has_premium"])
    else:
        if user_profile is None:
            raise ValueError("user_profile must not be None")
        has_premium = user_profile.has_premium

    if "num_level_one_email_trackers_removed" in request.GET:
        num_level_one_email_trackers_removed = int(
            request.GET["num_level_one_email_trackers_removed"]
        )
    else:
        num_level_one_email_trackers_removed = 0

    if "has_tracker_report_link" in request.GET:
        has_tracker_report_link = strtobool(request.GET["has_tracker_report_link"])
    else:
        has_tracker_report_link = False
    if has_tracker_report_link:
        if num_level_one_email_trackers_removed:
            trackers = {
                "fake-tracker.example.com": num_level_one_email_trackers_removed
            }
        else:
            trackers = {}
        # Sample link whose URL fragment is a JSON object with a fake sender,
        # a fixed received_at timestamp, and the tracker counts.
        tracker_report_link = (
            "/tracker-report/#{"
            '"sender": "sender@example.com", '
            '"received_at": 1658434657, '
            f'"trackers": { json.dumps(trackers) }'
            "}"
        )
    else:
        tracker_report_link = ""

    path = "/emails/wrapped_email_test"
    # Current settings as query-string values, used to build the switch links.
    old_query = {
        "language": language,
        "has_premium": "Yes" if has_premium else "No",
        "has_tracker_report_link": "Yes" if has_tracker_report_link else "No",
        "num_level_one_email_trackers_removed": str(
            num_level_one_email_trackers_removed
        ),
    }

    def switch_link(key, value):
        """Return `value` as plain text if already selected, else a link that
        reloads this page with only `key` changed to `value`."""
        if old_query[key] == value:
            return str(value)
        new_query = old_query.copy()
        new_query[key] = value
        return f'<a href="{path}?{urlencode(new_query)}">{value}</a>'

    # Sample body listing each setting. `language` comes from the querystring
    # and is HTML-escaped; switch_link() values are fixed literals.
    html_content = dedent(
        f"""\
    <p>
      <strong>Email rendering Test</strong>
    </p>
    <p>Settings: (<a href="{path}">clear all</a>)</p>
    <ul>
      <li>
        <strong>language</strong>:
        {escape(language)}
        (switch to
        {switch_link("language", "en-us")},
        {switch_link("language", "de")},
        {switch_link("language", "en-gb")},
        {switch_link("language", "fr")},
        {switch_link("language", "ru-ru")},
        {switch_link("language", "es-es")},
        {switch_link("language", "pt-br")},
        {switch_link("language", "it-it")},
        {switch_link("language", "en-ca")},
        {switch_link("language", "de-de")},
        {switch_link("language", "es-mx")})
      </li>
      <li>
        <strong>has_premium</strong>:
        {"Yes" if has_premium else "No"}
        (switch to
        {switch_link("has_premium", "Yes")},
        {switch_link("has_premium", "No")})
      </li>
      <li>
        <strong>has_tracker_report_link</strong>:
        {"Yes" if has_tracker_report_link else "No"}
        (switch to
        {switch_link("has_tracker_report_link", "Yes")},
        {switch_link("has_tracker_report_link", "No")})
      </li>
      <li>
        <strong>num_level_one_email_trackers_removed</strong>:
        {num_level_one_email_trackers_removed}
        (switch to
        {switch_link("num_level_one_email_trackers_removed", "0")},
        {switch_link("num_level_one_email_trackers_removed", "1")},
        {switch_link("num_level_one_email_trackers_removed", "2")})
      </li>
    </ul>
    """
    )

    wrapped_email = wrap_html_email(
        original_html=html_content,
        language=language,
        has_premium=has_premium,
        tracker_report_link=tracker_report_link,
        display_email="test@relay.firefox.com",
        num_level_one_email_trackers_removed=num_level_one_email_trackers_removed,
    )
    return HttpResponse(wrapped_email)
344

345

346
def _store_reply_record(
    mail: AWS_MailJSON, message_id: str, address: RelayAddress | DomainAddress
) -> AWS_MailJSON:
    """
    After relaying an email, store a Reply record for it.

    The message-id, from, and reply-to headers (names lowercased, last one
    wins) are encrypted with a key derived from ``message_id``. They are saved
    under a lookup derived from the same id and linked to the domain or
    random mask. ``mail`` is returned unchanged.

    Raises TypeError if ``address`` is neither a DomainAddress nor a
    RelayAddress.
    """
    kept_names = ("message-id", "from", "reply-to")
    reply_metadata = {
        header["name"].lower(): header["value"]
        for header in mail["headers"]
        if header["name"].lower() in kept_names
    }
    lookup_key, encryption_key = derive_reply_keys(get_message_id_bytes(message_id))
    lookup = b64_lookup_key(lookup_key)
    encrypted_metadata = encrypt_reply_metadata(encryption_key, reply_metadata)

    if isinstance(address, DomainAddress):
        mask_kwargs = {"domain_address": address}
    elif isinstance(address, RelayAddress):
        mask_kwargs = {"relay_address": address}
    else:
        raise TypeError("address must be type RelayAddress")

    Reply.objects.create(
        lookup=lookup, encrypted_metadata=encrypted_metadata, **mask_kwargs
    )
    return mail
370

371

372
@csrf_exempt
def sns_inbound(request):
    """
    Entry point for AWS SNS requests.

    The JSON body's SNS signature is verified first. Messages with a missing
    or unsupported Topic ARN or Type are logged and answered with a 400.
    Everything else is passed to _sns_inbound_logic().
    """
    incr_if_enabled("sns_inbound", 1)
    # Verify the signature before anything else inspects the payload.
    payload = verify_from_sns(json.loads(request.body))

    topic_arn = payload.get("TopicArn", None)
    message_type = payload.get("Type", None)
    problems = validate_sns_arn_and_type(topic_arn, message_type)
    if not problems:
        return _sns_inbound_logic(topic_arn, message_type, payload)

    logger.error("validate_sns_arn_and_type_error", extra=problems)
    return HttpResponse(problems["error"], status=400)
388

389

390
def validate_sns_arn_and_type(
    topic_arn: str | None, message_type: str | None
) -> dict[str, Any] | None:
    """
    Validate Topic ARN and SNS Message Type.

    Checks run in order: ARN present, ARN in settings.AWS_SNS_TOPIC, type
    present, type in SUPPORTED_SNS_TYPES. Returns None when all pass.
    Otherwise returns a dict with the first failure under "error", plus the
    received values (shell-quoted when non-empty) and the supported values.
    """

    def details(error: str) -> dict[str, Any]:
        # Build the error payload for the first failed check.
        return {
            "error": error,
            "received_topic_arn": shlex.quote(topic_arn) if topic_arn else topic_arn,
            "supported_topic_arn": sorted(settings.AWS_SNS_TOPIC),
            "received_sns_type": (
                shlex.quote(message_type) if message_type else message_type
            ),
            "supported_sns_types": SUPPORTED_SNS_TYPES,
        }

    if not topic_arn:
        return details("Received SNS request without Topic ARN.")
    if topic_arn not in settings.AWS_SNS_TOPIC:
        return details("Received SNS message for wrong topic.")
    if not message_type:
        return details("Received SNS request without Message Type.")
    if message_type not in SUPPORTED_SNS_TYPES:
        return details("Received SNS message for unsupported Type.")
    return None
421

422

423
def _sns_inbound_logic(topic_arn, message_type, json_body):
    """
    Act on a validated SNS message according to its Type.

    * "SubscriptionConfirmation": log the SubscribeURL and return 200.
    * "Notification": increment a counter and return the response from
      _sns_notification().
    * Anything else: log an error, report it to Sentry, and return 400.

    ``topic_arn`` is not used here.
    """
    if message_type == "SubscriptionConfirmation":
        subscribe_url = json_body["SubscribeURL"]
        info_logger.info(
            "SNS SubscriptionConfirmation", extra={"SubscribeURL": subscribe_url}
        )
        return HttpResponse("Logged SubscribeURL", status=200)

    if message_type == "Notification":
        incr_if_enabled("sns_inbound_Notification", 1)
        return _sns_notification(json_body)

    unhandled = "Received SNS message with type not handled in inbound log"
    logger.error(
        "SNS message type did not fall under the SNS inbound logic",
        extra={"message_type": shlex.quote(message_type)},
    )
    capture_message(unhandled, level="error", stack=True)
    return HttpResponse(unhandled, status=400)
446

447

448
def _sns_notification(json_body):
    """
    Handle an SNS Notification whose "Message" field is an SES JSON document.

    Returns a 400 response if the message is not valid JSON, or if neither its
    notificationType (Complaint, Received, Bounce) nor its eventType
    (Complaint, Bounce) is supported. Otherwise the message is processed by
    _sns_message(), and the email's S3 object is removed unless that response
    has a 5xx status.
    """
    raw_message = json_body["Message"]
    try:
        message_json = json.loads(raw_message)
    except JSONDecodeError:
        logger.error(
            "SNS notification has non-JSON message body",
            extra={"content": shlex.quote(raw_message)},
        )
        return HttpResponse("Received SNS notification with non-JSON body", status=400)

    event_type = message_json.get("eventType")
    notification_type = message_json.get("notificationType")
    is_supported = notification_type in {
        "Complaint",
        "Received",
        "Bounce",
    } or event_type in {"Complaint", "Bounce"}
    if not is_supported:
        quoted_type = shlex.quote(notification_type)
        logger.error(
            "SNS notification for unsupported type",
            extra={
                "notification_type": quoted_type,
                "event_type": shlex.quote(event_type),
                "keys": [shlex.quote(key) for key in message_json],
            },
        )
        body = "Received SNS notification for unsupported Type: " + html.escape(
            quoted_type
        )
        return HttpResponse(body, status=400)

    response = _sns_message(message_json)
    bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
    # A 5xx leaves the stored email in place, presumably so a retried
    # delivery can process it again.
    if response.status_code >= 500:
        return response
    remove_message_from_s3(bucket, object_key)
    return response
486

487

488
def _get_recipient_with_relay_domain(recipients):
    """
    Return the first recipient containing one of the configured Relay domains.

    Recipients are checked in order against every value of
    get_domains_from_settings(). Returns None if none match.
    """
    relay_domains = get_domains_from_settings().values()
    # NOTE(review): this is a plain substring test, so a domain also matches
    # inside a longer hostname -- confirm that looseness is acceptable.
    return next(
        (
            recipient
            for recipient in recipients
            if any(domain in recipient for domain in relay_domains)
        ),
        None,
    )
495

496

497
def _get_relay_recipient_from_message_json(message_json):
    """
    Find the recipient address that belongs to a Relay domain.

    The "to" and then "cc" common headers are checked first. A match there is
    reduced to its bare address with parseaddr(). Otherwise the SES receipt's
    recipients list, where BCC recipients appear, is checked and any match is
    returned as-is. Returns None when nothing matches.
    """
    common_headers = message_json["mail"]["commonHeaders"]
    for header_name in ("to", "cc"):
        if header_name not in common_headers:
            continue
        match = _get_recipient_with_relay_domain(common_headers[header_name])
        if match is not None:
            _display_name, bare_address = parseaddr(match)
            return bare_address

    # SES-SNS sends bcc in a different part of the message
    return _get_recipient_with_relay_domain(message_json["receipt"]["recipients"])
513

514

515
def _sns_message(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Route an SES notification to the bounce, complaint, or received handler.

    Either notificationType or eventType may mark a Bounce or a Complaint.
    Anything else must be a "Received" notification with no eventType;
    otherwise ValueError is raised.
    """
    incr_if_enabled("sns_inbound_Notification_Received", 1)
    init_waffle_flags()
    notification_type = message_json.get("notificationType")
    event_type = message_json.get("eventType")
    markers = (notification_type, event_type)

    if "Bounce" in markers:
        return _handle_bounce(message_json)
    if "Complaint" in markers:
        return _handle_complaint(message_json)

    if notification_type != "Received":
        raise ValueError('notification_type must be "Received"')
    if event_type is not None:
        raise ValueError("event_type must be None")
    return _handle_received(message_json)
529

530

531
# Enumerate the reasons that an email was not forwarded.
# This excludes emails dropped due to mask forwarding settings,
# such as "block all" and "block promotional". Those are logged
# as Glean email_blocked events.
# Passed as `reason` to log_email_dropped(), which logs it under the
# "reason" key of the "email_dropped" record.
EmailDroppedReason = Literal[
    "auto_block_spam",  # Email identified as spam, user has auto_block_spam enabled
    "dmarc_reject_failed",  # Email failed DMARC check with a reject policy
    "hard_bounce_pause",  # The user recently had a hard bounce
    "soft_bounce_pause",  # The user recently had a soft bounce
    "abuse_flag",  # The user exceeded an abuse limit, like mails forwarded
    "user_deactivated",  # The user account is deactivated
    "reply_requires_premium",  # The email is a reply from a free user
    "content_missing",  # Could not load the email from storage
    "error_from_header",  # Error generating the From: header, retryable
    "error_storage",  # Error fetching the email contents from storage (S3), retryable
    "error_sending",  # Error sending the forwarded email (SES), retryable
]
548

549

550
def log_email_dropped(
    reason: EmailDroppedReason,
    mask: RelayAddress | DomainAddress,
    is_reply: bool = False,
    can_retry: bool = False,
) -> None:
    """
    Log that an email was dropped for a reason other than a mask blocking setting.

    This mirrors the interface of glean_logger().log_email_blocked(), which
    records emails dropped due to the mask's blocking setting.

    The "email_dropped" record always has reason, is_random_mask, is_reply,
    and can_retry. When the owner's profile has metrics enabled, it also has
    mask_id and, if an FxA account is linked, fxa_id.
    """
    extra: dict[str, str | int | bool] = {"reason": reason}
    profile = mask.user.profile
    if profile.metrics_enabled:
        if profile.fxa is not None:
            extra["fxa_id"] = profile.fxa.uid
        extra["mask_id"] = mask.metrics_id
    extra.update(
        is_random_mask=isinstance(mask, RelayAddress),
        is_reply=is_reply,
        can_retry=can_retry,
    )
    info_logger.info("email_dropped", extra=extra)
573

574

575
def _handle_received(message_json: AWS_SNSMessageJSON) -> HttpResponse:
1✔
576
    """
577
    Handle an AWS SES received notification.
578

579
    For more information, see:
580
    https://docs.aws.amazon.com/ses/latest/dg/receiving-email-notifications-contents.html
581
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html
582

583
    Returns (may be incomplete):
584
    * 200 if the email was sent, the Relay address is disabled, the Relay user is
585
      flagged for abuse, the email is under a bounce pause, the email was suppressed
586
      for spam, the list email was blocked, or the noreply address was the recipient.
587
    * 400 if commonHeaders entry is missing, the Relay recipient address is malformed,
588
      the email failed DMARC with reject policy, or the email is a reply chain to a
589
      non-premium user.
590
    * 404 if an S3-stored email was not found, no Relay address was found in the "To",
591
      "CC", or "BCC" fields, or the Relay address is not in the database.
592
    * 503 if the "From" address is malformed, the S3 client returned an error different
593
      from "not found", or the SES client fails
594

595
    And many other returns conditions if the email is a reply. The HTTP returns are an
596
    artifact from an earlier time when emails were sent to a webhook. Currently,
597
    production instead pulls events from a queue.
598

599
    TODO: Return a more appropriate status object
600
    TODO: Document the metrics emitted
601
    """
602
    mail = message_json["mail"]
1✔
603
    if "commonHeaders" not in mail:
1✔
604
        logger.error("SNS message without commonHeaders")
1✔
605
        return HttpResponse(
1✔
606
            "Received SNS notification without commonHeaders.", status=400
607
        )
608
    common_headers = mail["commonHeaders"]
1✔
609
    receipt = message_json["receipt"]
1✔
610

611
    _record_receipt_verdicts(receipt, "all")
1✔
612
    to_address = _get_relay_recipient_from_message_json(message_json)
1✔
613
    if to_address is None:
1✔
614
        incr_if_enabled("no_relay_domain_in_recipient_fields", 1)
1✔
615
        return HttpResponse("Address does not exist", status=404)
1✔
616

617
    _record_receipt_verdicts(receipt, "relay_recipient")
1✔
618
    from_addresses = parse_email_header(common_headers["from"][0])
1✔
619
    if not from_addresses:
1✔
620
        info_logger.error(
1✔
621
            "_handle_received: no from address",
622
            extra={
623
                "source": mail["source"],
624
                "common_headers_from": common_headers["from"],
625
            },
626
        )
627
        return HttpResponse("Unable to parse From address", status=400)
1✔
628
    from_address = from_addresses[0][1]
1✔
629

630
    try:
1✔
631
        [to_local_portion, to_domain_portion] = to_address.split("@")
1✔
632
    except ValueError:
1✔
633
        # TODO: Add metric
634
        return HttpResponse("Malformed to field.", status=400)
1✔
635

636
    if to_local_portion.lower() == "noreply":
1✔
637
        incr_if_enabled("email_for_noreply_address", 1)
1✔
638
        return HttpResponse("noreply address is not supported.")
1✔
639
    try:
1✔
640
        # FIXME: this ambiguous return of either
641
        # RelayAddress or DomainAddress types makes the Rustacean in me throw
642
        # up a bit.
643
        address = _get_address(to_address)
1✔
644
        prefetch_related_objects([address.user], "socialaccount_set", "profile")
1✔
645
        user_profile = address.user.profile
1✔
646
    except (
1✔
647
        ObjectDoesNotExist,
648
        CannotMakeAddressException,
649
        DeletedAddress.MultipleObjectsReturned,
650
    ):
651
        if to_local_portion.lower() == "replies":
1✔
652
            response = _handle_reply(from_address, message_json, to_address)
1✔
653
        else:
654
            response = HttpResponse("Address does not exist", status=404)
1✔
655
        return response
1✔
656

657
    _record_receipt_verdicts(receipt, "valid_user")
1✔
658
    # if this is spam and the user is set to auto-block spam, early return
659
    if user_profile.auto_block_spam and _get_verdict(receipt, "spam") == "FAIL":
1✔
660
        incr_if_enabled("email_auto_suppressed_for_spam", 1)
1✔
661
        log_email_dropped(reason="auto_block_spam", mask=address)
1✔
662
        return HttpResponse("Address rejects spam.")
1✔
663

664
    if _get_verdict(receipt, "dmarc") == "FAIL":
1✔
665
        policy = receipt.get("dmarcPolicy", "none")
1✔
666
        # TODO: determine action on dmarcPolicy "quarantine"
667
        if policy == "reject":
1!
668
            log_email_dropped(reason="dmarc_reject_failed", mask=address)
1✔
669
            incr_if_enabled(
1✔
670
                "email_suppressed_for_dmarc_failure",
671
                tags=["dmarcPolicy:reject", "dmarcVerdict:FAIL"],
672
            )
673
            return HttpResponse("DMARC failure, policy is reject", status=400)
1✔
674

675
    # if this user is over bounce limits, early return
676
    bounce_paused, bounce_type = user_profile.check_bounce_pause()
1✔
677
    if bounce_paused:
1✔
678
        _record_receipt_verdicts(receipt, "user_bounce_paused")
1✔
679
        incr_if_enabled(f"email_suppressed_for_{bounce_type}_bounce", 1)
1✔
680
        reason: Literal["soft_bounce_pause", "hard_bounce_pause"] = (
1✔
681
            "soft_bounce_pause" if bounce_type == "soft" else "hard_bounce_pause"
682
        )
683
        log_email_dropped(reason=reason, mask=address)
1✔
684
        return HttpResponse("Address is temporarily disabled.")
1✔
685

686
    # check if this is a reply from an external sender to a Relay user
687
    try:
1✔
688
        (lookup_key, _) = _get_keys_from_headers(mail["headers"])
1✔
689
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
1✔
690
        user_address = address
1✔
691
        address = reply_record.address
1✔
692
        message_id = _get_message_id_from_headers(mail["headers"])
1✔
693
        # make sure the relay user is premium
694
        if not _reply_allowed(from_address, to_address, reply_record, message_id):
1!
695
            log_email_dropped(reason="reply_requires_premium", mask=user_address)
1✔
696
            return HttpResponse("Relay replies require a premium account", status=403)
1✔
697
    except (ReplyHeadersNotFound, Reply.DoesNotExist):
1✔
698
        # if there's no In-Reply-To header, or the In-Reply-To value doesn't
699
        # match a Reply record, continue to treat this as a regular email from
700
        # an external sender to a relay user
701
        pass
1✔
702

703
    # if account flagged for abuse, early return
704
    if user_profile.is_flagged:
1✔
705
        log_email_dropped(reason="abuse_flag", mask=address)
1✔
706
        return HttpResponse("Address is temporarily disabled.")
1✔
707

708
    if not user_profile.user.is_active:
1✔
709
        log_email_dropped(reason="user_deactivated", mask=address)
1✔
710
        return HttpResponse("Account is deactivated.")
1✔
711

712
    # if address is set to block, early return
713
    if not address.enabled:
1✔
714
        incr_if_enabled("email_for_disabled_address", 1)
1✔
715
        address.num_blocked += 1
1✔
716
        address.save(update_fields=["num_blocked"])
1✔
717
        _record_receipt_verdicts(receipt, "disabled_alias")
1✔
718
        user_profile.last_engagement = datetime.now(UTC)
1✔
719
        user_profile.save()
1✔
720
        glean_logger().log_email_blocked(mask=address, reason="block_all")
1✔
721
        return HttpResponse("Address is temporarily disabled.")
1✔
722

723
    _record_receipt_verdicts(receipt, "active_alias")
1✔
724
    incr_if_enabled("email_for_active_address", 1)
1✔
725

726
    # if address is blocking list emails, and email is from list, early return
727
    if (
1✔
728
        address
729
        and address.block_list_emails
730
        and user_profile.has_premium
731
        and _check_email_from_list(mail["headers"])
732
    ):
733
        incr_if_enabled("list_email_for_address_blocking_lists", 1)
1✔
734
        address.num_blocked += 1
1✔
735
        address.save(update_fields=["num_blocked"])
1✔
736
        user_profile.last_engagement = datetime.now(UTC)
1✔
737
        user_profile.save()
1✔
738
        glean_logger().log_email_blocked(mask=address, reason="block_promotional")
1✔
739
        return HttpResponse("Address is not accepting list emails.")
1✔
740

741
    # Collect new headers
742
    subject = common_headers.get("subject", "")
1✔
743
    destination_address = user_profile.user.email
1✔
744
    reply_address = get_reply_to_address()
1✔
745
    try:
1✔
746
        from_header = generate_from_header(from_address, to_address)
1✔
747
    except InvalidFromHeader:
1✔
748
        # TODO: MPP-3407, MPP-3417 - Determine how to handle these
749
        header_from = []
1✔
750
        for header in mail["headers"]:
1✔
751
            if header["name"].lower() == "from":
1✔
752
                header_from.append(header)
1✔
753
        info_logger.error(
1✔
754
            "generate_from_header",
755
            extra={
756
                "from_address": from_address,
757
                "source": mail["source"],
758
                "common_headers_from": common_headers["from"],
759
                "headers_from": header_from,
760
            },
761
        )
762
        log_email_dropped(reason="error_from_header", mask=address, can_retry=True)
1✔
763
        return HttpResponse("Cannot parse the From address", status=503)
1✔
764

765
    # Get incoming email
766
    try:
1✔
767
        (incoming_email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
1✔
768
    except ClientError as e:
1✔
769
        if e.response["Error"].get("Code", "") == "NoSuchKey":
1✔
770
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
1✔
771
            log_email_dropped(reason="content_missing", mask=address)
1✔
772
            return HttpResponse("Email not in S3", status=404)
1✔
773
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
1✔
774
        log_email_dropped(reason="error_storage", mask=address, can_retry=True)
1✔
775
        # we are returning a 503 so that SNS can retry the email processing
776
        return HttpResponse("Cannot fetch the message content from S3", status=503)
1✔
777

778
    # Handle developer overrides, logging
779
    dev_action = _get_developer_mode_action(address)
1✔
780
    if dev_action:
1✔
781
        if dev_action.new_destination_address:
1!
782
            destination_address = dev_action.new_destination_address
1✔
783
        _log_dev_notification(
1✔
784
            "_handle_received: developer_mode", dev_action, message_json
785
        )
786

787
    # Convert to new email
788
    headers: OutgoingHeaders = {
1✔
789
        "Subject": subject,
790
        "From": from_header,
791
        "To": destination_address,
792
        "Reply-To": reply_address,
793
        "Resent-From": from_address,
794
    }
795
    sample_trackers = bool(sample_is_active("tracker_sample"))
1✔
796
    tracker_removal_flag = flag_is_active_in_task("tracker_removal", address.user)
1✔
797
    remove_level_one_trackers = bool(
1✔
798
        tracker_removal_flag and user_profile.remove_level_one_email_trackers
799
    )
800
    (
1✔
801
        forwarded_email,
802
        issues,
803
        level_one_trackers_removed,
804
        has_html,
805
        has_text,
806
    ) = _convert_to_forwarded_email(
807
        incoming_email_bytes=incoming_email_bytes,
808
        headers=headers,
809
        to_address=to_address,
810
        from_address=from_address,
811
        language=user_profile.language,
812
        has_premium=user_profile.has_premium,
813
        sample_trackers=sample_trackers,
814
        remove_level_one_trackers=remove_level_one_trackers,
815
    )
816
    if has_html:
1✔
817
        incr_if_enabled("email_with_html_content", 1)
1✔
818
    if has_text:
1✔
819
        incr_if_enabled("email_with_text_content", 1)
1✔
820
    if issues:
1✔
821
        info_logger.info(
1✔
822
            "_handle_received: forwarding issues", extra={"issues": issues}
823
        )
824

825
    # Send new email
826
    try:
1✔
827
        ses_response = ses_send_raw_email(
1✔
828
            source_address=reply_address,
829
            destination_address=destination_address,
830
            message=forwarded_email,
831
        )
832
    except ClientError:
1✔
833
        # 503 service unavailable response to SNS so it can retry
834
        log_email_dropped(reason="error_sending", mask=address, can_retry=True)
1✔
835
        return HttpResponse("SES client error on Raw Email", status=503)
1✔
836

837
    message_id = ses_response["MessageId"]
1✔
838
    _store_reply_record(mail, message_id, address)
1✔
839

840
    user_profile.update_abuse_metric(
1✔
841
        email_forwarded=True, forwarded_email_size=len(incoming_email_bytes)
842
    )
843
    user_profile.last_engagement = datetime.now(UTC)
1✔
844
    user_profile.save()
1✔
845
    address.num_forwarded += 1
1✔
846
    address.last_used_at = datetime.now(UTC)
1✔
847
    if level_one_trackers_removed:
1!
848
        address.num_level_one_trackers_blocked = (
×
849
            address.num_level_one_trackers_blocked or 0
850
        ) + level_one_trackers_removed
851
    address.save(
1✔
852
        update_fields=[
853
            "num_forwarded",
854
            "last_used_at",
855
            "block_list_emails",
856
            "num_level_one_trackers_blocked",
857
        ]
858
    )
859
    glean_logger().log_email_forwarded(mask=address, is_reply=False)
1✔
860
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
861

862

863
class DeveloperModeAction(NamedTuple):
1✔
864
    mask_id: str
1✔
865
    action: Literal["log", "simulate_complaint"] = "log"
1✔
866
    new_destination_address: str | None = None
1✔
867

868

869
def _get_verdict(receipt, verdict_type):
1✔
870
    return receipt[f"{verdict_type}Verdict"]["status"]
1✔
871

872

873
def _check_email_from_list(headers):
1✔
874
    for header in headers:
1!
875
        if header["name"].lower().startswith("list-"):
1!
876
            return True
1✔
877
    return False
×
878

879

880
def _record_receipt_verdicts(receipt, state):
    """
    Emit metrics describing the receipt verdicts for an email in a given state.

    Receipt keys are visited in sorted order. For every key ending in "Verdict",
    relay.emails.verdicts.<key> is incremented with a "state:<state>" tag. Then
    relay.emails.state.<state> is incremented, tagged with "<key>:<status>" for each
    verdict and "dmarcPolicy:<value>" if present.
    """
    tags = []
    for name in sorted(receipt):
        if name.endswith("Verdict"):
            status = receipt[name]["status"]
            tags.append(f"{name}:{status}")
            incr_if_enabled(f"relay.emails.verdicts.{name}", 1, [f"state:{state}"])
        elif name == "dmarcPolicy":
            tags.append(f"{name}:{receipt[name]}")
    incr_if_enabled(f"relay.emails.state.{state}", 1, tags)
891

892

893
def _get_message_id_from_headers(headers):
1✔
894
    message_id = None
1✔
895
    for header in headers:
1✔
896
        if header["name"].lower() == "message-id":
1✔
897
            message_id = header["value"]
1✔
898
    return message_id
1✔
899

900

901
def _get_keys_from_headers(headers):
    """
    Derive the reply (lookup_key, encryption_key) pair from the reply headers.

    Headers are scanned in order, and the first In-Reply-To or References header
    decides the result:

    * In-Reply-To: keys are derived from its Message-ID. No check is made that a
      Reply record exists.
    * References: each whitespace-separated Message-ID is tried in order. The
      keys for the first one that has a Reply record are returned. If none
      matches, Reply.DoesNotExist is raised.

    If neither header is present, a metric is incremented and ReplyHeadersNotFound
    is raised.
    """
    for header in headers:
        header_name = header["name"].lower()
        if header_name == "in-reply-to":
            message_id_bytes = get_message_id_bytes(header["value"])
            return derive_reply_keys(message_id_bytes)

        if header_name == "references":
            # split() with no separator also splits on tabs, folded line breaks,
            # and runs of spaces, and never yields empty message IDs
            # (split(" ") did).
            for message_id in header["value"].split():
                message_id_bytes = get_message_id_bytes(message_id)
                lookup_key, encryption_key = derive_reply_keys(message_id_bytes)
                try:
                    # FIXME: calling code is likely to duplicate this query
                    _get_reply_record_from_lookup_key(lookup_key)
                except Reply.DoesNotExist:
                    continue
                return lookup_key, encryption_key
            raise Reply.DoesNotExist
    incr_if_enabled("mail_to_replies_without_reply_headers", 1)
    raise ReplyHeadersNotFound
923

924

925
def _get_reply_record_from_lookup_key(lookup_key):
    """
    Fetch the Reply record for a lookup key.

    The key is converted with b64_lookup_key before querying. Reply.DoesNotExist
    propagates from Reply.objects.get when there is no match.
    """
    return Reply.objects.get(lookup=b64_lookup_key(lookup_key))
928

929

930
def _strip_localpart_tag(address):
1✔
931
    [localpart, domain] = address.split("@")
1✔
932
    subaddress_parts = localpart.split("+")
1✔
933
    return f"{subaddress_parts[0]}@{domain}"
1✔
934

935

936
_TransportType = Literal["sns", "s3"]
1✔
937

938

939
def _get_email_bytes(
    message_json: AWS_SNSMessageJSON,
) -> tuple[bytes, _TransportType, float]:
    """
    Load the raw email content referenced by an SNS notification.

    If the notification carries the email inline under "content", that text is
    UTF-8 encoded. Otherwise the bucket and key are read from the notification
    and the content is fetched from S3. Errors from S3 are not caught here.
    The content size is recorded in the relayed_email.size histogram.

    Returns (content bytes, "sns" or "s3", load time in seconds rounded to 3 places).
    """
    transport: _TransportType
    with Timer(logger=None) as load_timer:
        if "content" not in message_json:
            # The content is not inline, so it must be stored in S3
            bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
            message_content = get_message_content_from_s3(bucket, object_key)
            transport = "s3"
        else:
            message_content = message_json["content"].encode("utf-8")
            transport = "sns"
        histogram_if_enabled("relayed_email.size", len(message_content))
    return (message_content, transport, round(load_timer.last, 3))
955

956

957
def _get_developer_mode_action(
    mask: RelayAddress | DomainAddress,
) -> DeveloperModeAction | None:
    """
    Return the developer-mode action for a mask, or None if it does not apply.

    Developer mode applies only when the "developer_mode" flag is active for the
    mask's user and the mask description contains "DEV:". A description that
    contains "DEV:simulate_complaint" sends delivery to the SES simulator's
    complaint address for this mask. Any other "DEV:" description just logs.
    """
    developer_mode = (
        flag_is_active_in_task("developer_mode", mask.user)
        and "DEV:" in mask.description
    )
    if not developer_mode:
        return None

    if "DEV:simulate_complaint" not in mask.description:
        return DeveloperModeAction(mask_id=mask.metrics_id, action="log")

    # Domain masks include the user's subdomain to keep the subaddress unique
    subaddress = (
        mask.address
        if isinstance(mask, RelayAddress)
        else f"{mask.address}.{mask.user.profile.subdomain}"
    )
    return DeveloperModeAction(
        mask_id=mask.metrics_id,
        action="simulate_complaint",
        new_destination_address=f"complaint+{subaddress}@simulator.amazonses.com",
    )
981

982

983
def _log_dev_notification(
    log_message: str, dev_action: DeveloperModeAction, notification: dict[str, Any]
) -> None:
    """
    Log notification JSON

    This will log information beyond our privacy policy, so it should only be used on
    Relay staff accounts with prior permission.

    The notification JSON will be compressed, Ascii85-encoded with padding, and broken
    into 1024-bytes chunks. This will ensure it fits into GCP's log entry, which has a
    64KB limit per label value.

    Each chunk is logged as its own info entry. The entry carries the 0-based "part"
    index and the total number of "parts", so the chunks can be reassembled in order.
    """
    parts = encode_dict_gza85(notification).splitlines()
    # Count the parts that are actually logged. The previous count("\n") + 1
    # over-reported by one when the encoded text ended with a newline.
    total_parts = len(parts)
    for partnum, part in enumerate(parts):
        info_logger.info(
            log_message,
            extra={
                "mask_id": dev_action.mask_id,
                "dev_action": dev_action.action,
                "part": partnum,
                "parts": total_parts,
                "notification_gza85": part,
            },
        )
1010

1011

1012
def _convert_to_forwarded_email(
    incoming_email_bytes: bytes,
    headers: OutgoingHeaders,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[EmailMessage, EmailForwardingIssues, int, bool, bool]:
    """
    Convert an email (as bytes) to a forwarded email.

    ``now`` is passed through to _convert_html_content. There it becomes the
    "received at" time for tracker removal and tracker reports, and it defaults
    to the current time.

    Return is a tuple:
    - email - The forwarded email
    - issues - Any detected issues in conversion
    - level_one_trackers_removed (int) - Number of trackers removed
    - has_html - True if the email has an HTML representation
    - has_text - True if the email has a plain text representation

    Raises TypeError if the parsed message or its body parts are not EmailMessage.
    """
    email = message_from_bytes(incoming_email_bytes, policy=relay_policy)
    # python/typeshed issue 2418
    # The Python 3.2 default was Message, 3.6 uses policy.message_factory, and
    # policy.default.message_factory is EmailMessage
    if not isinstance(email, EmailMessage):
        raise TypeError("email must be type EmailMessage")

    def wrap_html(content: str) -> tuple[str, int]:
        # Shared by the HTML-body path and the text-derived-HTML path below.
        # Forwards `now`; previously it was accepted but silently dropped.
        return _convert_html_content(
            content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
            now=now,
        )

    # Replace headers in the original email
    header_issues = _replace_headers(email, headers)

    # Find and replace text content
    text_body = email.get_body("plain")
    text_content = None
    has_text = False
    if text_body:
        has_text = True
        if not isinstance(text_body, EmailMessage):
            raise TypeError("text_body must be type EmailMessage")
        text_content = text_body.get_content()
        new_text_content = _convert_text_content(text_content, to_address)
        text_body.set_content(new_text_content)

    # Find and replace HTML content
    html_body = email.get_body("html")
    level_one_trackers_removed = 0
    has_html = False
    if html_body:
        has_html = True
        if not isinstance(html_body, EmailMessage):
            raise TypeError("html_body must be type EmailMessage")
        html_content = html_body.get_content()
        new_content, level_one_trackers_removed = wrap_html(html_content)
        html_body.set_content(new_content, subtype="html")
    elif text_content:
        # No HTML part: build one from the original (pre-wrapper) text content
        html_content = urlize_and_linebreaks(text_content)
        new_content, level_one_trackers_removed = wrap_html(html_content)
        if not isinstance(text_body, EmailMessage):
            raise TypeError("text_body must be type EmailMessage")
        try:
            text_body.add_alternative(new_content, subtype="html")
        except TypeError as e:
            # Log the MIME structure to help diagnose why the alternative failed
            out = StringIO()
            _structure(email, fp=out)
            info_logger.error(
                "Adding HTML alternate failed",
                extra={"exception": str(e), "structure": out.getvalue()},
            )

    issues: EmailForwardingIssues = {}
    if header_issues:
        issues["headers"] = header_issues
    return (email, issues, level_one_trackers_removed, has_html, has_text)
1102

1103

1104
def _replace_headers(
    email: EmailMessage, headers: OutgoingHeaders
) -> EmailHeaderIssues:
    """
    Rewrite the headers of ``email`` in place and report any header problems.

    The message is mutated rather than copied. Python's email package can parse
    messages whose headers or content it cannot write back out, and making a copy
    needs a write/read round trip. Editing one header at a time inside try/except
    lets processing continue past a bad header. Mutating in place also avoids
    holding a second copy of a large message (up to 10 MB today, possibly 40 MB
    someday) in memory.

    Steps:
    1. Record read errors and parse defects in the incoming headers ("in").
    2. Delete every header that is not being replaced, keeping MIME-Version and
       Content-* headers.
    3. Set each header from ``headers`` (trailing CR/LF stripped), recording write
       errors and parse defects of the new value ("out").

    Returns the list of recorded issues, which is empty when there were none.
    """
    issues: EmailHeaderIssues = []

    def defect_report(name, direction, parsed, defect_count):
        # One shape for both incoming and outgoing header defects
        return {
            "header": name,
            "direction": direction,
            "defect_count": defect_count,
            "parsed_value": str(parsed),
            "raw_value": str(parsed.as_raw),
        }

    # 1. Inspect incoming headers for non-compliance
    for name in email.keys():
        try:
            parsed = email[name]
        except Exception as exc:
            issues.append(
                {"header": name, "direction": "in", "exception_on_read": repr(exc)}
            )
            parsed = None
        if getattr(parsed, "defects", None):
            issues.append(defect_report(name, "in", parsed, len(parsed.defects)))
        elif getattr(getattr(parsed, "_parse_tree", None), "all_defects", []):
            issues.append(
                defect_report(name, "in", parsed, len(parsed._parse_tree.all_defects))
            )

    # 2. Drop headers that will not be forwarded
    replaced = {key.lower() for key in headers.keys()}

    def should_drop(name: str) -> bool:
        lowered = name.lower()
        return not (
            lowered in replaced
            or lowered == "mime-version"
            or lowered.startswith("content-")
        )

    # Materialize the list first so deletion does not disturb the iteration
    for name in [key for key in email.keys() if should_drop(key)]:
        del email[name]

    # 3. Write the requested headers and check that they parse cleanly
    for name, new_value in headers.items():
        del email[name]
        try:
            email[name] = new_value.rstrip("\r\n")
            written = email[name]
        except Exception as exc:
            issues.append(
                {
                    "header": name,
                    "direction": "out",
                    "exception_on_write": repr(exc),
                    "value": new_value,
                }
            )
            continue
        if written.defects:
            issues.append(defect_report(name, "out", written, len(written.defects)))

    return issues
1209

1210

1211
def _convert_html_content(
    html_content: str,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[str, int]:
    """
    Wrap HTML email content in the Relay wrapper, optionally handling trackers.

    ``now`` defaults to the current UTC time. It is converted to a millisecond
    timestamp, which the frontend expects, and used as the tracker report's
    "received_at". When ``sample_trackers`` is set, trackers are counted with
    count_all_trackers. When ``remove_level_one_trackers`` is set, level-one
    trackers are removed and a tracker-report link is passed to the wrapper.

    Returns (wrapped HTML, number of level-one trackers removed).
    """
    received_at = now or datetime.now(UTC)
    received_at_ms = int(received_at.timestamp() * 1000)

    # Break up the mask address with <span>s so clients don't recognize it as an
    # address and apply their default link styles
    display_email = re.sub("([@.:])", r"<span>\1</span>", to_address)

    if sample_trackers:
        count_all_trackers(html_content)

    if remove_level_one_trackers:
        html_content, tracker_details = remove_trackers(
            html_content, from_address, received_at_ms
        )
        removed_count = tracker_details["tracker_removed"]
        report = {
            "sender": from_address,
            "received_at": received_at_ms,
            "trackers": tracker_details["level_one"]["trackers"],
        }
        tracker_report_link = (
            f"{settings.SITE_ORIGIN}/tracker-report/#" + json.dumps(report)
        )
    else:
        removed_count = 0
        tracker_report_link = ""

    wrapped = wrap_html_email(
        original_html=html_content,
        language=language,
        has_premium=has_premium,
        display_email=display_email,
        tracker_report_link=tracker_report_link,
        num_level_one_email_trackers_removed=removed_count,
    )
    return wrapped, removed_count
1258

1259

1260
def _convert_text_content(text_content: str, to_address: str) -> str:
1✔
1261
    relay_header_text = (
1✔
1262
        "This email was sent to your alias "
1263
        f"{to_address}. To stop receiving emails sent to this alias, "
1264
        "update the forwarding settings in your dashboard.\n"
1265
        "---Begin Email---\n"
1266
    )
1267
    wrapped_text = relay_header_text + text_content
1✔
1268
    return wrapped_text
1✔
1269

1270

1271
def _build_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> EmailMessage:
    """
    Build the notice telling a user that replies require a premium account.

    The notice is addressed to ``from_address`` and sent from the Relay reply
    address. It is threaded onto ``message_id`` when one is given and has both
    text and HTML bodies. The template context records whether this reply is
    still forwarded (the user's first reply has not been forwarded yet) and who
    the reply was meant for (the reply-to, else from, address in the metadata).
    """
    # _reply_allowed forwards the reply if no first reply was forwarded yet, so
    # the notice says it was forwarded.
    was_forwarded = not reply_record.address.user.profile.forwarded_first_reply

    original_sender: str | None = ""
    if decrypted_metadata is not None:
        original_sender = decrypted_metadata.get("reply-to") or decrypted_metadata.get(
            "from"
        )

    context = {
        "sender": original_sender or "",
        "forwarded": was_forwarded,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    html_part = render_to_string("emails/reply_requires_premium.html", context)
    text_part = render_to_string("emails/reply_requires_premium.txt", context)

    notice = EmailMessage()
    notice["Subject"] = ftl_bundle.format("replies-not-included-in-free-account-header")
    notice["From"] = get_reply_to_address()
    notice["To"] = from_address
    if message_id:
        # Thread the notice onto the user's reply
        notice["In-Reply-To"] = message_id
        notice["References"] = message_id
    notice.set_content(text_part)
    notice.add_alternative(html_part, subtype="html")
    return notice
1302

1303

1304
def _set_forwarded_first_reply(profile):
1✔
1305
    profile.forwarded_first_reply = True
1✔
1306
    profile.save()
1✔
1307

1308

1309
def _send_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> None:
    """
    Email the replying user that replies require premium, and count the attempt.

    After a successful send, the user's profile is marked as having had its first
    reply forwarded. SES client errors are logged and otherwise ignored. The
    free_user_reply_attempt metric is incremented in either case.
    """
    notice = _build_reply_requires_premium_email(
        from_address, reply_record, message_id, decrypted_metadata
    )
    try:
        ses_send_raw_email(
            source_address=get_reply_to_address(premium=False),
            destination_address=from_address,
            message=notice,
        )
        # _reply_allowed forwards the first reply while this flag is unset, so
        # record in the DB that the free first reply has now been used.
        _set_forwarded_first_reply(reply_record.address.user.profile)
    except ClientError as e:
        logger.error("reply_not_allowed_ses_client_error", extra=e.response["Error"])
    incr_if_enabled("free_user_reply_attempt", 1)
1330

1331

1332
def _reply_allowed(
    from_address, to_address, reply_record, message_id=None, decrypted_metadata=None
):
    """
    Decide whether a reply should be forwarded.

    The sender is treated as the Relay user who owns ``reply_record`` if the
    addresses match exactly, or match after stripping a "+tag" from both local
    parts. In that case:

    * inactive or flagged users are refused;
    * users for whom reply_record.owner_has_premium is true are allowed;
    * otherwise the "replies require premium" notice is sent, and the reply is
      allowed only if the user's free first reply has not been used yet.

    Otherwise the sender is external. The reply is allowed only if ``to_address``
    resolves to a mask whose owner has premium.

    Returns a bool.
    """
    sender_base = _strip_localpart_tag(from_address)
    owner_email = reply_record.address.user.email
    owner_base = _strip_localpart_tag(owner_email)
    sent_by_owner = from_address == owner_email or sender_base == owner_base

    if not sent_by_owner:
        # External sender: only replies *to* a premium Relay user are allowed
        try:
            recipient_has_premium = _get_address(to_address).user.profile.has_premium
        except ObjectDoesNotExist:
            return False
        if recipient_has_premium:
            return True
        incr_if_enabled("free_user_reply_attempt", 1)
        return False

    # The Relay user is replying to an external sender
    if not reply_record.profile.user.is_active or reply_record.profile.is_flagged:
        return False
    if reply_record.owner_has_premium:
        return True

    # Free users may have their first reply forwarded; either way, send the notice
    allow_first_reply = not reply_record.address.user.profile.forwarded_first_reply
    _send_reply_requires_premium_email(
        from_address, reply_record, message_id, decrypted_metadata
    )
    return allow_first_reply
1370

1371

1372
def _handle_reply(
    from_address: str, message_json: AWS_SNSMessageJSON, to_address: str
) -> HttpResponse:
    """
    Handle a reply from a Relay user to an external email.

    ``to_address`` is the address the reply was delivered to. It is only used to
    decide whether the reply is allowed. The outgoing recipient comes from the
    decrypted reply metadata ("reply-to", else "from").

    Returns (may be incomplete):
    * 200 if the reply was sent
    * 400 if the In-Reply-To and References headers are missing, or the SES client
      raises an error
    * 403 if the Relay user is not allowed to reply
    * 404 if the S3-stored email is not found, or there is no matching Reply record in
      the database
    * 503 if the S3 client returns an error other than not found, so SNS can retry

    NOTE(review): when no References entry has a Reply record,
    _get_keys_from_headers raises Reply.DoesNotExist, which is not caught here.
    Confirm how the caller maps that case.

    TODO: Return a more appropriate status object (see _handle_received)
    TODO: Document metrics emitted
    """
    mail = message_json["mail"]
    try:
        (lookup_key, encryption_key) = _get_keys_from_headers(mail["headers"])
    except ReplyHeadersNotFound:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-header"])
        return HttpResponse("No In-Reply-To header", status=400)

    try:
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
    except Reply.DoesNotExist:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-reply-record"])
        return HttpResponse("Unknown or stale In-Reply-To header", status=404)

    address = reply_record.address
    message_id = _get_message_id_from_headers(mail["headers"])
    decrypted_metadata = json.loads(
        decrypt_reply_metadata(encryption_key, reply_record.encrypted_metadata)
    )
    if not _reply_allowed(
        from_address, to_address, reply_record, message_id, decrypted_metadata
    ):
        log_email_dropped(reason="reply_requires_premium", mask=address, is_reply=True)
        return HttpResponse("Relay replies require a premium account", status=403)

    outbound_from_address = address.full_address
    incr_if_enabled("reply_email", 1)
    subject = mail["commonHeaders"].get("subject", "")
    # Separate name so the `to_address` parameter is not shadowed by a
    # different address
    recipient_address = decrypted_metadata.get("reply-to") or decrypted_metadata.get(
        "from"
    )
    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": outbound_from_address,
        "To": recipient_address,
        "Reply-To": outbound_from_address,
    }

    try:
        (email_bytes, _transport, _load_time_s) = _get_email_bytes(message_json)
    except ClientError as e:
        if e.response["Error"].get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
            log_email_dropped(reason="content_missing", mask=address, is_reply=True)
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
        log_email_dropped(
            reason="error_storage", mask=address, is_reply=True, can_retry=True
        )
        # we are returning a 503 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    email = message_from_bytes(email_bytes, policy=relay_policy)
    if not isinstance(email, EmailMessage):
        raise TypeError("email must be type EmailMessage")

    # Convert to a reply email
    # TODO: Issue #1747 - Remove wrapper / prefix in replies
    _replace_headers(email, headers)

    try:
        ses_send_raw_email(
            source_address=outbound_from_address,
            destination_address=recipient_address,
            message=email,
        )
    except ClientError:
        log_email_dropped(reason="error_sending", mask=address, is_reply=True)
        return HttpResponse("SES client error", status=400)

    reply_record.increment_num_replied()
    profile = address.user.profile
    profile.update_abuse_metric(replied=True)
    profile.last_engagement = datetime.now(UTC)
    profile.save()
    glean_logger().log_email_forwarded(mask=address, is_reply=True)
    return HttpResponse("Sent email to final recipient.", status=200)
1465

1466

1467
def _get_domain_address(
    local_portion: str, domain_portion: str, create: bool = True
) -> DomainAddress:
    """
    Find or create the DomainAddress for the parts of an email address.

    If the domain_portion is for a valid subdomain, and create=True, a new DomainAddress
    will be created and returned. If create=False, DomainAddress.DoesNotExist is raised.

    If the domain_portion is for an unknown domain, including one with no "." at all,
    ObjectDoesNotExist is raised.

    If the domain_portion is for an unclaimed subdomain, Profile.DoesNotExist is raised.

    The email_for_not_supported_domain and email_for_dne_subdomain metrics are only
    incremented when create=True. A found or created address has last_used_at set
    to now and is saved.
    """

    # partition() instead of split(".", 1) + unpack: a dotless domain yields an
    # empty address_domain and is rejected below as an unsupported domain, rather
    # than raising ValueError.
    address_subdomain, _, address_domain = domain_portion.partition(".")
    if address_domain != get_domains_from_settings()["MOZMAIL_DOMAIN"]:
        if create:
            incr_if_enabled("email_for_not_supported_domain", 1)
        raise ObjectDoesNotExist("Address does not exist")
    try:
        with transaction.atomic():
            # Row-lock the profile for the rest of the transaction, presumably
            # to serialize concurrent creation of the same address.
            locked_profile = Profile.objects.select_for_update().get(
                subdomain=address_subdomain
            )
            domain_numerical = get_domain_numerical(address_domain)
            # filter DomainAddress because it may not exist
            # which will throw an error with get()
            domain_address = DomainAddress.objects.filter(
                user=locked_profile.user, address=local_portion, domain=domain_numerical
            ).first()
            if domain_address is None:
                if not create:
                    raise DomainAddress.DoesNotExist()
                # TODO: Consider flows when a user generating alias on a fly
                # was unable to receive an email due to user no longer being a
                # premium user as seen in exception thrown on make_domain_address
                domain_address = DomainAddress.make_domain_address(
                    locked_profile.user, local_portion, True
                )
                glean_logger().log_email_mask_created(
                    mask=domain_address,
                    created_by_api=False,
                )
            domain_address.last_used_at = datetime.now(UTC)
            domain_address.save()
            return domain_address
    except Profile.DoesNotExist:
        if create:
            incr_if_enabled("email_for_dne_subdomain", 1)
        raise
1517

1518

1519
def _get_address(address: str, create: bool = True) -> RelayAddress | DomainAddress:
    """
    Look up (and possibly create) the mask that matches an email address.

    The local part and domain are lower-cased before lookup. An address on one
    of the site's own Relay domains is looked up as a RelayAddress. Any other
    domain is handed to _get_domain_address as a possible user subdomain; there,
    an unknown address on a valid subdomain becomes a new DomainAddress when
    create is True.

    On failure, raises an exception based on Django's ObjectDoesNotExist:
    * RelayAddress.DoesNotExist - looks like RelayAddress, deleted or does not exist
    * Profile.DoesNotExist - looks like DomainAddress, no subdomain match
    * DomainAddress.DoesNotExist - looks like unknown DomainAddress, create is False
    * ObjectDoesNotExist - Unknown domain
    """
    raw_local, raw_domain = address.split("@")
    local_address = raw_local.lower()
    domain = raw_domain.lower()

    # Not one of the site's 'top' Relay domains: maybe a user's subdomain
    if domain not in get_domains_from_settings().values():
        return _get_domain_address(local_address, domain, create)

    # One of the site's 'top' Relay domains: this must be a RelayAddress
    try:
        numeric_domain = get_domain_numerical(domain)
        return RelayAddress.objects.get(address=local_address, domain=numeric_domain)
    except RelayAddress.DoesNotExist as missing:
        if not create:
            raise missing

        # Record why the address is unknown before re-raising
        try:
            DeletedAddress.objects.get(
                address_hash=address_hash(local_address, domain=domain)
            )
        except DeletedAddress.DoesNotExist:
            incr_if_enabled("email_for_unknown_address", 1)
        except DeletedAddress.MultipleObjectsReturned:
            # not sure why this happens on stage but let's handle it
            incr_if_enabled("email_for_deleted_address_multiple", 1)
        else:
            incr_if_enabled("email_for_deleted_address", 1)
            # TODO: create a hard bounce receipt rule in SES
        raise missing
1565

1566

1567
def _get_address_if_exists(address: str) -> RelayAddress | DomainAddress | None:
    """
    Get the matching RelayAddress or DomainAddress, or None if it doesn't exist.

    Never creates a mask (calls _get_address with create=False).

    A malformed address also returns None. _get_address unpacks
    address.split("@") into exactly two parts, and the subdomain lookup unpacks
    domain.split(".", 1). So an address without exactly one "@" (for example ""
    from parseaddr on an unparseable header), or a non-Relay domain without a
    ".", raises ValueError. Returning None there lets callers treat it as an
    unknown address instead of failing the whole request.
    """
    try:
        return _get_address(address, create=False)
    except (ObjectDoesNotExist, ValueError):
        # RelayAddress.DoesNotExist, Profile.DoesNotExist and
        # DomainAddress.DoesNotExist are all ObjectDoesNotExist subclasses.
        return None
1573

1574

1575
def _handle_bounce(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES bounce notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#bounce-object

    Each bounced recipient that matches a Relay user updates that user's profile:
    * a diagnostic code mentioning "spam" sets auto_block_spam (no bounce pause)
    * otherwise, a 'Permanent' bounce sets last_hard_bounce
    * otherwise, a 'Transient' bounce sets last_soft_bounce

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_bounce" with these tags:
    * bounce_type: bounceType as sent by SES (e.g. 'Permanent', 'Transient',
      'Undetermined'), or 'none' if omitted
    * bounce_subtype: bounceSubType as sent by SES (e.g. 'Undetermined',
      'General'), or 'none' if omitted
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
      ('no_address' also covers an address that cannot be parsed)
    * relay_action: 'no_action', 'auto_block_spam', 'hard_bounce', 'soft_bounce'

    Emits an info log "bounce_notification", same data as metric, plus:
    * bounce_action: 'action' from bounced recipient data, or '' if omitted
    * bounce_status: 'status' from bounced recipient data, or '' if omitted
    * bounce_diagnostic: 'diagnosticCode' from bounced recipient data, or '' if omitted
    * bounce_extra: Extra data from bounce_recipient data, if any
    * domain: User's real email address domain, if an address was given
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the
      user, or '' if there is no account or metrics are disabled
    """
    bounce = message_json.get("bounce", {})
    bounce_type = bounce.get("bounceType", "none")
    bounce_subtype = bounce.get("bounceSubType", "none")
    bounced_recipients = bounce.get("bouncedRecipients", [])

    now = datetime.now(UTC)
    bounce_data: list[dict[str, Any]] = []
    for bounced_recipient in bounced_recipients:
        # Work on a copy so popping the known keys does not mutate message_json
        recipient = dict(bounced_recipient)
        recipient_address = recipient.pop("emailAddress", None)
        data: dict[str, Any] = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "bounce_action": recipient.pop("action", ""),
            "bounce_status": recipient.pop("status", ""),
            "bounce_diagnostic": recipient.pop("diagnosticCode", ""),
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        if recipient:
            # Whatever keys remain after popping the known ones
            data["bounce_extra"] = recipient
        bounce_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        if "@" not in recipient_address:
            # parseaddr could not extract a usable address (it may return "").
            # Keep user_match="no_address" rather than failing on the split below.
            continue
        data["domain"] = recipient_address.split("@")[1]

        try:
            user = User.objects.get(email=recipient_address)
        except User.DoesNotExist:
            # TODO: handle bounce for a user who no longer exists
            # add to SES account-wide suppression list?
            data["user_match"] = "missing"
            continue

        profile = user.profile
        data["user_match"] = "found"
        if (fxa := profile.fxa) and profile.metrics_enabled:
            data["fxa_id"] = fxa.uid
        else:
            data["fxa_id"] = ""

        action = None
        # Guard against an explicit null diagnosticCode
        diagnostic = data["bounce_diagnostic"] or ""
        if "spam" in diagnostic.lower():
            # if an email bounced as spam, set to auto block spam for this user
            # and DON'T set them into bounce pause state
            action = "auto_block_spam"
            profile.auto_block_spam = True
        elif bounce_type == "Permanent":
            # TODO: handle sub-types: 'General', 'NoEmail', etc.
            action = "hard_bounce"
            profile.last_hard_bounce = now
        elif bounce_type == "Transient":
            # TODO: handle sub-types: 'MessageTooLarge', 'AttachmentRejected', etc.
            action = "soft_bounce"
            profile.last_soft_bounce = now
        if action:
            data["relay_action"] = action
            profile.save()

    if not bounce_data:
        # Data when there are no identified recipients
        bounce_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    for data in bounce_data:
        tags = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_bounce",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("bounce_notification", extra=data)

    if any(data["user_match"] == "missing" for data in bounce_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)
1682

1683

1684
def _build_disabled_mask_for_spam_email(
    mask: RelayAddress | DomainAddress,
) -> EmailMessage:
    """
    Build the notice telling a user that one of their masks was disabled.

    The message goes from settings.RELAY_FROM_ADDRESS to the mask owner's real
    address. It has a plain-text body and an HTML alternative, rendered from the
    disabled_mask_for_spam templates with the mask's full address and
    SITE_ORIGIN.
    """
    template_context = {"mask": mask.full_address, "SITE_ORIGIN": settings.SITE_ORIGIN}
    html_part = render_to_string("emails/disabled_mask_for_spam.html", template_context)
    text_part = render_to_string("emails/disabled_mask_for_spam.txt", template_context)

    notice = EmailMessage()
    # Header order is preserved in the serialized message
    for header_name, header_value in (
        ("Subject", ftl_bundle.format("relay-deactivated-your-mask")),
        ("From", settings.RELAY_FROM_ADDRESS),
        ("To", mask.user.email),
    ):
        notice[header_name] = header_value
    notice.set_content(text_part)
    notice.add_alternative(html_part, subtype="html")
    return notice
1699

1700

1701
def _send_disabled_mask_for_spam_email(mask: RelayAddress | DomainAddress) -> None:
    """
    Email the mask's owner that the mask was disabled after a spam complaint.

    Raises:
    * ValueError if settings.RELAY_FROM_ADDRESS is not set. This is checked
      before the message is built, since building it uses that setting as the
      From: header.

    An SES ClientError is logged as "send_disabled_mask_ses_client_error" and
    not re-raised. The "send_disabled_mask_email" counter is incremented only
    when the send call did not raise ClientError.
    """
    if not settings.RELAY_FROM_ADDRESS:
        raise ValueError(
            "Must set settings.RELAY_FROM_ADDRESS to send disabled_mask_for_spam email."
        )
    msg = _build_disabled_mask_for_spam_email(mask)
    try:
        ses_send_raw_email(
            source_address=settings.RELAY_FROM_ADDRESS,
            destination_address=mask.user.email,
            message=msg,
        )
    except ClientError as e:
        # .get: log whatever error details are present instead of raising KeyError
        logger.error(
            "send_disabled_mask_ses_client_error", extra=e.response.get("Error", {})
        )
    else:
        incr_if_enabled("send_disabled_mask_email", 1)
1716

1717

1718
def _handle_complaint(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES complaint notification.

    Relay users are looked up two ways: by real email address in
    complainedRecipients, and by mask address in the From: header. Both are
    expected to point at the same Relay user. If either lookup finds nothing, a
    404 is returned and errors may be logged.

    On a user's first complaint, their auto_block_spam flag is set to True.
    On a later complaint, the mask that forwarded the spam is disabled, and the
    user is emailed that it was disabled and can be re-enabled from the
    dashboard.

    Complaint notification format:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_complaint" with these tags:
    * complaint_subtype: 'onaccountsuppressionlist', or 'none' if omitted
    * complaint_feedback - feedback enumeration from ISP (usually 'abuse') or 'none'
    * user_match: 'found' or 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam', or 'disable_mask'

    Emits an info log "complaint_notification", same data as metric, plus:
    * complaint_user_agent - identifies the client used to file the complaint
    * complaint_extra - Extra data from complainedRecipients data, if any
    * domain - User's domain, if an address was given
    * found_in - "complained_recipients" (real email), "from_header" (email mask),
      or "all" (matching records found in both)
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the user
    * mask_match - "found" if "From" header contains an email mask, or "not_found"
    """
    complaint_data = _get_complaint_data(message_json)
    complainers, unknown_count = _gather_complainers(complaint_data)

    # Act on each complaining Relay user to reduce future complaints
    actions: list[ComplaintAction] = []
    for complainer in complainers:
        taken = _reduce_future_complaints(complainer)
        actions.append(taken)
        dev_mode = flag_is_active_in_task("developer_mode", complainer["user"])
        if dev_mode and taken.mask_id:
            _log_dev_notification(
                "_handle_complaint: developer_mode",
                DeveloperModeAction(mask_id=taken.mask_id, action="log"),
                message_json,
            )

    # With no Relay users involved, still record the complaint once
    to_report = actions or [ComplaintAction(user_match="no_recipients")]

    # Metric tags and log fields that are the same for every action
    common_tags = {
        "complaint_subtype": complaint_data.subtype or "none",
        "complaint_feedback": complaint_data.feedback_type or "none",
    }
    common_log = {
        "complaint_subtype": complaint_data.subtype or None,
        "complaint_user_agent": complaint_data.user_agent or None,
        "complaint_feedback": complaint_data.feedback_type or None,
    }
    for taken in to_report:
        tag_values = common_tags | {
            "user_match": taken.user_match,
            "relay_action": taken.relay_action,
        }
        incr_if_enabled(
            "email_complaint",
            tags=[generate_tag(key, val) for key, val in tag_values.items()],
        )

        log_extra = dict(common_log)
        for field_name, field_value in taken._asdict().items():
            # mask_id is only used for developer-mode logging, not this log
            if field_name != "mask_id" and field_value is not None:
                log_extra[field_name] = field_value
        info_logger.info("complaint_notification", extra=log_extra)

    return (
        HttpResponse("Address does not exist", status=404)
        if unknown_count
        else HttpResponse("OK", status=200)
    )
1807

1808

1809
class RawComplaintData(NamedTuple):
    """Fields extracted from an SES complaint notification by _get_complaint_data."""

    # (parsed email address, remaining keys of that complainedRecipients entry)
    complained_recipients: list[tuple[str, dict[str, Any]]]
    # Parsed addresses from the mail's commonHeaders "from" list
    from_addresses: list[str]
    # complaintSubType, or "" when absent
    subtype: str
    # userAgent, or "" when absent
    user_agent: str
    # complaintFeedbackType, or "" when absent
    feedback_type: str
1815

1816

1817
def _get_complaint_data(message_json: AWS_SNSMessageJSON) -> RawComplaintData:
    """
    Extract the fields Relay uses from an SES complaint notification.

    These keys are expected: complainedRecipients (and emailAddress in each
    entry), mail, mail.commonHeaders, commonHeaders.from, and
    complaintFeedbackType. If one is missing, it is logged as an error and
    treated as empty. complaintSubType and userAgent are optional and default
    to "".
    """
    complaint = message_json["complaint"]

    T = TypeVar("T")

    def get_or_log(
        key: str, source: dict[str, T], data_type: type[T]
    ) -> tuple[T, bool]:
        """Return (source[key], True), or log and return (data_type(), False)."""
        if key not in source:
            logger.error(
                "_get_complaint_data: Unexpected message",
                extra={"missing_key": key, "found_keys": ",".join(sorted(source.keys()))},
            )
            return data_type(), False
        return source[key], True

    raw_recipients, has_cr = get_or_log("complainedRecipients", complaint, list)
    complained_recipients: list[tuple[str, dict[str, Any]]] = []
    saw_entry = False
    for entry in raw_recipients:
        saw_entry = True
        raw_email_address, has_email = get_or_log("emailAddress", entry, str)
        if not has_email:
            continue
        email_address = parseaddr(raw_email_address)[1]
        other_fields = {k: v for k, v in entry.items() if k != "emailAddress"}
        complained_recipients.append((email_address, other_fields))
    if has_cr and not saw_entry:
        logger.error("_get_complaint_data: Empty complainedRecipients")

    mail, has_mail = get_or_log("mail", message_json, dict)
    common_headers, has_ch = (
        get_or_log("commonHeaders", mail, dict) if has_mail else ({}, False)
    )
    raw_from_addresses = (
        get_or_log("from", common_headers, list)[0] if has_ch else []
    )
    from_addresses = [parseaddr(addr)[1] for addr in raw_from_addresses]

    feedback_type, _ = get_or_log("complaintFeedbackType", complaint, str)

    return RawComplaintData(
        complained_recipients=complained_recipients,
        from_addresses=from_addresses,
        # Only present when destination is on account suppression list
        subtype=complaint.get("complaintSubType", ""),
        # Only present for feedback reports
        user_agent=complaint.get("userAgent", ""),
        feedback_type=feedback_type,
    )
1871

1872

1873
class Complainer(TypedDict):
    """A Relay user found in a complaint, built by _gather_complainers."""

    # The matched Relay user
    user: User
    # Where the user was found: real address in complainedRecipients,
    # a mask in the From: header, or both ("all")
    found_in: Literal["complained_recipients", "from_header", "all"]
    # Domain part of the user's real email address
    domain: str
    # Extra complainedRecipients fields for this user, or None if there were none
    extra: dict[str, Any] | None
    # Masks from the From: header owned by this user
    masks: list[RelayAddress | DomainAddress]
1879

1880

1881
def _gather_complainers(
    complaint_data: RawComplaintData,
) -> tuple[list[Complainer], int]:
    """
    Fetch Relay Users and masks from the complaint data.

    complainedRecipients are matched to users by real email address. From:
    addresses are matched to masks, and each mask is attached to its owner's
    entry.

    Returns a tuple of:
    * one Complainer per matched user, in the order first matched
    * the number of complained recipients and From: addresses that matched
      nothing, including malformed addresses without an "@"
    """
    users: dict[int, Complainer] = {}
    unknown_complainer_count = 0
    for email_address, extra_data in complaint_data.complained_recipients:
        local, at_sign, domain = email_address.partition("@")
        if not at_sign:
            # parseaddr can return "" (or a bare word) for a malformed address.
            # Count it as unknown instead of failing to unpack the split.
            logger.error("_gather_complainers: unknown complainedRecipient")
            unknown_complainer_count += 1
            continue

        # For developer mode complaint simulation, swap with developer's email
        if domain == "simulator.amazonses.com" and local.startswith("complaint+"):
            mask_part = local.removeprefix("complaint+")
            mask_domain = get_domains_from_settings()["MOZMAIL_DOMAIN"]
            if "." in mask_part:
                # "complaint+<local>.<subdomain>" -> <local>@<subdomain>.<domain>
                mask_local, subdomain = mask_part.split(".", 1)
                mask_address = f"{mask_local}@{subdomain}.{mask_domain}"
            else:
                mask_address = f"{mask_part}@{mask_domain}"
            mask = _get_address_if_exists(mask_address)
            if mask:
                email_address = mask.user.email
                domain = mask.user.email.split("@")[1]

        try:
            user = User.objects.get(email=email_address)
        except User.DoesNotExist:
            logger.error("_gather_complainers: unknown complainedRecipient")
            unknown_complainer_count += 1
            continue

        if user.id in users:
            logger.error("_gather_complainers: complainer appears twice, discarded")
            continue

        users[user.id] = {
            "user": user,
            "found_in": "complained_recipients",
            "domain": domain,
            "extra": extra_data or None,
            "masks": [],
        }

    # Collect From: addresses and their users
    unknown_sender_count = 0
    for email_address in complaint_data.from_addresses:
        # A malformed address (no "@", e.g. "" from parseaddr) cannot be a mask;
        # skip the lookup, which would fail to split it.
        mask = _get_address_if_exists(email_address) if "@" in email_address else None
        if not mask:
            logger.error("_gather_complainers: unknown mask, maybe deleted?")
            unknown_sender_count += 1
            continue

        if mask.user.id not in users:
            # Add mask-only entry to users
            users[mask.user.id] = {
                "user": mask.user,
                "found_in": "from_header",
                "domain": mask.user.email.split("@")[1],
                "extra": None,
                "masks": [mask],
            }
            continue

        user_data = users[mask.user.id]
        if mask in user_data["masks"]:
            logger.error("_gather_complainers: mask appears twice")
            continue

        user_data["masks"].append(mask)
        if user_data["found_in"] in ("all", "complained_recipients"):
            user_data["found_in"] = "all"
        else:
            logger.error("_gather_complainers: no complainer, multi-mask")

    return (list(users.values()), unknown_complainer_count + unknown_sender_count)
1956

1957

1958
class ComplaintAction(NamedTuple):
1✔
1959
    user_match: Literal["found", "no_recipients"]
1✔
1960
    relay_action: Literal["no_action", "auto_block_spam", "disable_mask"] = "no_action"
1✔
1961
    mask_match: Literal["found", "not_found"] = "not_found"
1✔
1962
    mask_id: str | None = None
1✔
1963
    found_in: Literal["complained_recipients", "from_header", "all"] | None = None
1✔
1964
    fxa_id: str | None = None
1✔
1965
    domain: str | None = None
1✔
1966
    complaint_extra: str | None = None
1✔
1967

1968

1969
def _reduce_future_complaints(complainer: Complainer) -> ComplaintAction:
    """
    Act on a complaining user so they receive fewer unwanted emails.

    If the user's profile does not yet have auto_block_spam, it is turned on and
    saved. Otherwise, each still-enabled mask in complainer["masks"] is disabled
    when the "disable_mask_on_complaint" flag is active for the user, and the
    user is emailed about it.

    Returns a ComplaintAction describing the result. mask_id is the metrics_id
    of the last mask in complainer["masks"], if any.
    """
    user = complainer["user"]

    relay_action: Literal["no_action", "auto_block_spam", "disable_mask"]
    if user.profile.auto_block_spam:
        relay_action = "no_action"
    else:
        user.profile.auto_block_spam = True
        user.profile.save()
        relay_action = "auto_block_spam"

    mask_match: Literal["found", "not_found"] = "not_found"
    mask_id = None
    for mask in complainer["masks"]:
        mask_match = "found"
        mask_id = mask.metrics_id
        should_disable = (
            flag_is_active_in_task("disable_mask_on_complaint", user)
            and mask.enabled
            and relay_action != "auto_block_spam"
        )
        if not should_disable:
            continue
        relay_action = "disable_mask"
        mask.enabled = False
        mask.save()
        _send_disabled_mask_for_spam_email(mask)

    extra_data = complainer["extra"]
    return ComplaintAction(
        user_match="found",
        relay_action=relay_action,
        mask_match=mask_match,
        mask_id=mask_id,
        fxa_id=user.profile.metrics_fxa_id,
        domain=complainer["domain"],
        found_in=complainer["found_in"],
        complaint_extra=json.dumps(extra_data) if extra_data else None,
    )
2007

2008

2009
# Becomes True once init_waffle_flags() has ensured the email-task flags exist
_WAFFLE_FLAGS_INITIALIZED = False


def init_waffle_flags() -> None:
    """
    Make sure the waffle flags used by email tasks exist.

    Each flag is fetched or created through the waffle flag model's
    get_or_create. The note is passed as a default, so it only applies when the
    flag is newly created. After a full pass, a module-level marker makes later
    calls return immediately. If the pass raises, the marker stays False and the
    next call tries again.
    """
    global _WAFFLE_FLAGS_INITIALIZED
    if not _WAFFLE_FLAGS_INITIALIZED:
        flag_notes: dict[str, str] = {
            "disable_mask_on_complaint": (
                "MPP-3119: When a Relay user marks an email as spam, disable the mask."
            ),
            "developer_mode": (
                "MPP-3932: Enable logging and overrides for Relay developers."
            ),
        }
        flag_manager = get_waffle_flag_model().objects
        for flag_name, flag_note in flag_notes.items():
            flag_manager.get_or_create(name=flag_name, defaults={"note": flag_note})
        _WAFFLE_FLAGS_INITIALIZED = True
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc