• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / d3128616-238d-446e-82c5-ab66cd38ceaf

09 May 2024 06:22PM CUT coverage: 84.07% (-0.6%) from 84.64%
d3128616-238d-446e-82c5-ab66cd38ceaf

push

circleci

web-flow
Merge pull request #4684 from mozilla/enable-flak8-bandit-checks-mpp-3802

fix MPP-3802: stop ignoring bandit security checks

3601 of 4734 branches covered (76.07%)

Branch coverage included in aggregate %.

74 of 158 new or added lines in 24 files covered. (46.84%)

5 existing lines in 5 files now uncovered.

14686 of 17018 relevant lines covered (86.3%)

10.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.3
/emails/views.py
1
import html
1✔
2
import json
1✔
3
import logging
1✔
4
import re
1✔
5
import shlex
1✔
6
from collections import defaultdict
1✔
7
from copy import deepcopy
1✔
8
from datetime import UTC, datetime
1✔
9
from email import message_from_bytes
1✔
10
from email.iterators import _structure
1✔
11
from email.message import EmailMessage
1✔
12
from email.utils import parseaddr
1✔
13
from io import StringIO
1✔
14
from json import JSONDecodeError
1✔
15
from textwrap import dedent
1✔
16
from typing import Any, Literal
1✔
17
from urllib.parse import urlencode
1✔
18

19
from django.conf import settings
1✔
20
from django.contrib.auth.models import User
1✔
21
from django.core.exceptions import ObjectDoesNotExist
1✔
22
from django.db import transaction
1✔
23
from django.db.models import prefetch_related_objects
1✔
24
from django.http import HttpRequest, HttpResponse
1✔
25
from django.shortcuts import render
1✔
26
from django.template.loader import render_to_string
1✔
27
from django.utils.html import escape
1✔
28
from django.views.decorators.csrf import csrf_exempt
1✔
29

30
from botocore.exceptions import ClientError
1✔
31
from codetiming import Timer
1✔
32
from decouple import strtobool
1✔
33
from markus.utils import generate_tag
1✔
34
from sentry_sdk import capture_message
1✔
35
from waffle import sample_is_active
1✔
36

37
from privaterelay.ftl_bundles import main as ftl_bundle
1✔
38
from privaterelay.utils import (
1✔
39
    flag_is_active_in_task,
40
    get_subplat_upgrade_link_by_language,
41
    glean_logger,
42
)
43

44
from .models import (
1✔
45
    CannotMakeAddressException,
46
    DeletedAddress,
47
    DomainAddress,
48
    Profile,
49
    RelayAddress,
50
    Reply,
51
    address_hash,
52
    get_domain_numerical,
53
)
54
from .policy import relay_policy
1✔
55
from .sns import SUPPORTED_SNS_TYPES, verify_from_sns
1✔
56
from .types import (
1✔
57
    AWS_MailJSON,
58
    AWS_SNSMessageJSON,
59
    EmailForwardingIssues,
60
    EmailHeaderIssues,
61
    OutgoingHeaders,
62
)
63
from .utils import (
1✔
64
    InvalidFromHeader,
65
    _get_bucket_and_key_from_s3_json,
66
    b64_lookup_key,
67
    count_all_trackers,
68
    decrypt_reply_metadata,
69
    derive_reply_keys,
70
    encrypt_reply_metadata,
71
    generate_from_header,
72
    get_domains_from_settings,
73
    get_message_content_from_s3,
74
    get_message_id_bytes,
75
    get_reply_to_address,
76
    histogram_if_enabled,
77
    incr_if_enabled,
78
    parse_email_header,
79
    remove_message_from_s3,
80
    remove_trackers,
81
    ses_send_raw_email,
82
    urlize_and_linebreaks,
83
)
84

85
# Two named loggers are used by this module: "events" (used below for
# logger.error calls) and "eventsinfo" (used below for info, warning, and
# some error records such as "email_dropped").
logger = logging.getLogger("events")
info_logger = logging.getLogger("eventsinfo")
1✔
87

88

89
class ReplyHeadersNotFound(Exception):
    """Signal that an email has no In-Reply-To or References headers."""

    def __init__(self, message="No In-Reply-To or References headers."):
        # Hand the message to Exception so that str(exc), exc.args, and
        # tracebacks include it even when the default message is used.
        super().__init__(message)
        # Keep the .message attribute for existing callers.
        self.message = message
1✔
92

93

94
def first_time_user_test(request):
    """
    Render a preview of the "First time Relay user" email.

    Querystring parameters:
    * in_bundle_country - parsed with strtobool, defaults to "yes"
    * format - "text" renders the plain-text template; any other value (or
      no value) renders the HTML template
    """
    query = request.GET
    context = {
        "in_bundle_country": strtobool(query.get("in_bundle_country", "yes")),
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    wants_text = query.get("format", "html") == "text"
    if not wants_text:
        return render(request, "emails/first_time_user.html", context)
    return render(
        request,
        "emails/first_time_user.txt",
        context,
        content_type="text/plain; charset=utf-8",
    )
×
113

114

115
def reply_requires_premium_test(request):
    """
    Render a preview of the "Reply requires premium" email.

    The template context starts with a fixed sender, forwarded=True, and the
    site origin. Every querystring parameter is then copied into the context
    (overriding the defaults); forwarded=True in the querystring is stored as
    the boolean True rather than the string.

    When the querystring has content-type=text/plain, the plain-text template
    is rendered; otherwise the HTML template is rendered.
    """
    query = request.GET
    context = {
        "sender": "test@example.com",
        "forwarded": True,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    for name in query:
        value = query.get(name)
        is_true_flag = name == "forwarded" and value == "True"
        context[name] = True if is_true_flag else value

    if query.get("content-type") == "text/plain":
        return render(
            request,
            "emails/reply_requires_premium.txt",
            context,
            content_type="text/plain; charset=utf-8",
        )
    return render(request, "emails/reply_requires_premium.html", context)
1✔
141

142

143
def first_forwarded_email_test(request: HttpRequest) -> HttpResponse:
    """
    Render a preview of the "first forwarded email" template.

    The template is rendered with only the site origin as context, then
    passed through wrap_html_email with fixed demo settings (en-us, premium,
    test@example.com, zero trackers removed).
    """
    # TO DO: Update with correct context when trigger is created
    inner_html = render_to_string(
        "emails/first_forwarded_email.html",
        {"SITE_ORIGIN": settings.SITE_ORIGIN},
    )
    return HttpResponse(
        wrap_html_email(
            original_html=inner_html,
            language="en-us",
            has_premium=True,
            display_email="test@example.com",
            num_level_one_email_trackers_removed=0,
        )
    )
×
161

162

163
def wrap_html_email(
    original_html: str,
    language: str,
    has_premium: bool,
    display_email: str,
    num_level_one_email_trackers_removed: int | None = None,
    tracker_report_link: str | None = None,
) -> str:
    """
    Render the "emails/wrapped_email.html" template around an HTML email.

    The arguments, the upgrade link for the language, and the site origin are
    passed to the template as context. Lines of the rendered output that are
    empty or whitespace-only are dropped, and the result ends with exactly one
    newline.
    """
    context = {
        "original_html": original_html,
        "language": language,
        "has_premium": has_premium,
        "subplat_upgrade_link": get_subplat_upgrade_link_by_language(language),
        "display_email": display_email,
        "tracker_report_link": tracker_report_link,
        "num_level_one_email_trackers_removed": num_level_one_email_trackers_removed,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    rendered = render_to_string("emails/wrapped_email.html", context)
    # Keep only the lines that have visible content
    non_blank_lines = (line for line in rendered.splitlines() if line.strip())
    return "\n".join(non_blank_lines) + "\n"
1✔
187

188

189
def wrapped_email_test(request: HttpRequest) -> HttpResponse:
    """
    Render a preview of a forwarded HTML email wrapped by wrap_html_email.

    Querystring parameters (all optional):
    * language - passed to the wrapper as-is
    * has_premium - parsed with strtobool
    * num_level_one_email_trackers_removed - parsed with int, defaults to 0
    * has_tracker_report_link - parsed with strtobool, defaults to False

    When language or has_premium is missing, the missing value is read from
    the first Profile of a randomly ordered query. A ValueError is raised if
    that query returns no profile.
    """
    params = request.GET

    # Only look up a profile when one of its values is actually needed
    needs_profile = "language" not in params or "has_premium" not in params
    user_profile = Profile.objects.order_by("?").first() if needs_profile else None

    if "language" in params:
        language = params["language"]
    elif user_profile is None:
        raise ValueError("user_profile must not be None")
    else:
        language = user_profile.language

    if "has_premium" in params:
        has_premium = strtobool(params["has_premium"])
    elif user_profile is None:
        raise ValueError("user_profile must not be None")
    else:
        has_premium = user_profile.has_premium

    num_level_one_email_trackers_removed = (
        int(params["num_level_one_email_trackers_removed"])
        if "num_level_one_email_trackers_removed" in params
        else 0
    )
    has_tracker_report_link = (
        strtobool(params["has_tracker_report_link"])
        if "has_tracker_report_link" in params
        else False
    )

    tracker_report_link = ""
    if has_tracker_report_link:
        # The fake tracker entry is only included for a non-zero count
        trackers = (
            {"fake-tracker.example.com": num_level_one_email_trackers_removed}
            if num_level_one_email_trackers_removed
            else {}
        )
        tracker_report_link = (
            "/tracker-report/#{"
            '"sender": "sender@example.com", '
            '"received_at": 1658434657, '
            f'"trackers": { json.dumps(trackers) }'
            "}"
        )

    premium_label = "Yes" if has_premium else "No"
    report_label = "Yes" if has_tracker_report_link else "No"
    tracker_count_label = str(num_level_one_email_trackers_removed)

    page_path = "/emails/wrapped_email_test"
    current_query = {
        "language": language,
        "has_premium": premium_label,
        "has_tracker_report_link": report_label,
        "num_level_one_email_trackers_removed": tracker_count_label,
    }

    def option(key, value):
        """Return plain text for the active value, or a link that selects it."""
        if current_query[key] == value:
            return str(value)
        linked_query = {**current_query, key: value}
        return f'<a href="{page_path}?{urlencode(linked_query)}">{value}</a>'

    html_content = dedent(
        f"""\
    <p>
      <strong>Email rendering Test</strong>
    </p>
    <p>Settings: (<a href="{page_path}">clear all</a>)</p>
    <ul>
      <li>
        <strong>language</strong>:
        {escape(language)}
        (switch to
        {option("language", "en-us")},
        {option("language", "de")},
        {option("language", "en-gb")},
        {option("language", "fr")},
        {option("language", "ru-ru")},
        {option("language", "es-es")},
        {option("language", "pt-br")},
        {option("language", "it-it")},
        {option("language", "en-ca")},
        {option("language", "de-de")},
        {option("language", "es-mx")})
      </li>
      <li>
        <strong>has_premium</strong>:
        {premium_label}
        (switch to
        {option("has_premium", "Yes")},
        {option("has_premium", "No")})
      </li>
      <li>
        <strong>has_tracker_report_link</strong>:
        {report_label}
        (switch to
        {option("has_tracker_report_link", "Yes")},
        {option("has_tracker_report_link", "No")})
      </li>
      <li>
        <strong>num_level_one_email_trackers_removed</strong>:
        {tracker_count_label}
        (switch to
        {option("num_level_one_email_trackers_removed", "0")},
        {option("num_level_one_email_trackers_removed", "1")},
        {option("num_level_one_email_trackers_removed", "2")})
      </li>
    </ul>
    """
    )

    return HttpResponse(
        wrap_html_email(
            original_html=html_content,
            language=language,
            has_premium=has_premium,
            tracker_report_link=tracker_report_link,
            display_email="test@relay.firefox.com",
            num_level_one_email_trackers_removed=num_level_one_email_trackers_removed,
        )
    )
1✔
319

320

321
def _store_reply_record(
    mail: AWS_MailJSON, message_id: str, address: RelayAddress | DomainAddress
) -> AWS_MailJSON:
    """
    Create a Reply record for a relayed email and return `mail` unchanged.

    The Message-ID, From, and Reply-To header values of `mail` are encrypted
    with a key derived from `message_id` and stored with the derived lookup
    key. The Reply is linked to `address` through domain_address or
    relay_address, depending on its type.

    Raises TypeError if `address` is neither a DomainAddress nor a
    RelayAddress.
    """
    kept_headers = ("message-id", "from", "reply-to")
    reply_metadata = {}
    for header in mail["headers"]:
        name = header["name"].lower()
        if name in kept_headers:
            reply_metadata[name] = header["value"]

    lookup_key, encryption_key = derive_reply_keys(get_message_id_bytes(message_id))
    reply_create_args: dict[str, Any] = {
        "lookup": b64_lookup_key(lookup_key),
        "encrypted_metadata": encrypt_reply_metadata(encryption_key, reply_metadata),
    }

    if isinstance(address, DomainAddress):
        reply_create_args["domain_address"] = address
    elif isinstance(address, RelayAddress):
        reply_create_args["relay_address"] = address
    else:
        raise TypeError("address must be type RelayAddress")

    Reply.objects.create(**reply_create_args)
    return mail
1✔
345

346

347
@csrf_exempt
def sns_inbound(request):
    """
    Handle an inbound SNS HTTP request.

    The JSON body is passed through verify_from_sns, then its TopicArn and
    Type are validated. A validation failure is logged and answered with a
    400 response carrying the error text; otherwise the message is handed to
    _sns_inbound_logic and its response is returned.
    """
    incr_if_enabled("sns_inbound", 1)
    # Verification comes first, before any field of the body is used
    verified_json_body = verify_from_sns(json.loads(request.body))

    # Validate ARN and message type
    topic_arn = verified_json_body.get("TopicArn", None)
    message_type = verified_json_body.get("Type", None)
    error_details = validate_sns_arn_and_type(topic_arn, message_type)
    if not error_details:
        return _sns_inbound_logic(topic_arn, message_type, verified_json_body)

    logger.error("validate_sns_arn_and_type_error", extra=error_details)
    return HttpResponse(error_details["error"], status=400)
1✔
363

364

365
def validate_sns_arn_and_type(
    topic_arn: str | None, message_type: str | None
) -> dict[str, Any] | None:
    """
    Check the Topic ARN and Type of an SNS message.

    Returns None when both values are acceptable. Otherwise returns a
    dictionary with the error text under "error", together with the received
    (shell-quoted) and the supported topic ARNs and message types.
    """

    def first_problem() -> str | None:
        # Checks run in order; the first one that fails picks the message
        if not topic_arn:
            return "Received SNS request without Topic ARN."
        if topic_arn not in settings.AWS_SNS_TOPIC:
            return "Received SNS message for wrong topic."
        if not message_type:
            return "Received SNS request without Message Type."
        if message_type not in SUPPORTED_SNS_TYPES:
            return "Received SNS message for unsupported Type."
        return None

    error = first_problem()
    if error is None:
        return None

    return {
        "error": error,
        "received_topic_arn": shlex.quote(topic_arn) if topic_arn else topic_arn,
        "supported_topic_arn": sorted(settings.AWS_SNS_TOPIC),
        "received_sns_type": (
            shlex.quote(message_type) if message_type else message_type
        ),
        "supported_sns_types": SUPPORTED_SNS_TYPES,
    }
1✔
396

397

398
def _sns_inbound_logic(topic_arn, message_type, json_body):
1✔
399
    if message_type == "SubscriptionConfirmation":
×
400
        info_logger.info(
×
401
            "SNS SubscriptionConfirmation",
402
            extra={"SubscribeURL": json_body["SubscribeURL"]},
403
        )
404
        return HttpResponse("Logged SubscribeURL", status=200)
×
405
    if message_type == "Notification":
×
406
        incr_if_enabled("sns_inbound_Notification", 1)
×
407
        return _sns_notification(json_body)
×
408

409
    logger.error(
×
410
        "SNS message type did not fall under the SNS inbound logic",
411
        extra={"message_type": shlex.quote(message_type)},
412
    )
413
    capture_message(
×
414
        "Received SNS message with type not handled in inbound log",
415
        level="error",
416
        stack=True,
417
    )
418
    return HttpResponse(
×
419
        "Received SNS message with type not handled in inbound log", status=400
420
    )
421

422

423
def _sns_notification(json_body):
    """
    Process the "Message" of an SNS notification.

    Returns 400 if the message is not JSON, or if neither its
    notificationType (Complaint, Received, Bounce) nor its eventType
    (Complaint, Bounce) is supported. Otherwise returns the response of
    _sns_message, and removes the message from S3 unless that response has a
    5xx status.
    """
    raw_message = json_body["Message"]
    try:
        message_json = json.loads(raw_message)
    except JSONDecodeError:
        logger.error(
            "SNS notification has non-JSON message body",
            extra={"content": shlex.quote(raw_message)},
        )
        return HttpResponse("Received SNS notification with non-JSON body", status=400)

    event_type = message_json.get("eventType")
    notification_type = message_json.get("notificationType")
    is_supported = notification_type in {
        "Complaint",
        "Received",
        "Bounce",
    } or event_type in {"Complaint", "Bounce"}
    if not is_supported:
        logger.error(
            "SNS notification for unsupported type",
            extra={
                "notification_type": shlex.quote(notification_type),
                "event_type": shlex.quote(event_type),
                "keys": [shlex.quote(key) for key in message_json.keys()],
            },
        )
        quoted_type = html.escape(shlex.quote(notification_type))
        return HttpResponse(
            f"Received SNS notification for unsupported Type: {quoted_type}",
            status=400,
        )

    response = _sns_message(message_json)
    bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
    # A 5xx response keeps the stored message in place
    if response.status_code < 500:
        remove_message_from_s3(bucket, object_key)
    return response
1✔
461

462

463
def _get_recipient_with_relay_domain(recipients):
    """
    Return the first recipient that contains a configured Relay domain.

    The check is a substring test of each domain from
    get_domains_from_settings() against the recipient. Returns None when no
    recipient matches.
    """
    relay_domains = get_domains_from_settings().values()
    matches = (
        recipient
        for recipient in recipients
        if any(domain in recipient for domain in relay_domains)
    )
    return next(matches, None)
1✔
470

471

472
def _get_relay_recipient_from_message_json(message_json):
    """
    Find the recipient with a Relay domain in the To, Cc, or Bcc fields.

    The "to" and then "cc" common headers are searched first; a match there
    is returned as the bare email address parsed from the header value.
    Otherwise the receipt's recipients are searched and the match (or None)
    is returned as-is.
    """
    common_headers = message_json["mail"]["commonHeaders"]
    for name in ("to", "cc"):
        if name not in common_headers:
            continue
        found = _get_recipient_with_relay_domain(common_headers[name])
        if found is not None:
            return parseaddr(found)[1]

    # SES-SNS sends bcc in a different part of the message
    return _get_recipient_with_relay_domain(message_json["receipt"]["recipients"])
1✔
488

489

490
def _sns_message(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Route an SNS message to the bounce, complaint, or received handler.

    Bounce is checked before Complaint, and either notificationType or
    eventType can select them. Any remaining message must have
    notificationType "Received" and no eventType, otherwise a ValueError is
    raised.
    """
    incr_if_enabled("sns_inbound_Notification_Received", 1)
    notification_type = message_json.get("notificationType")
    event_type = message_json.get("eventType")
    kinds = (notification_type, event_type)

    if "Bounce" in kinds:
        return _handle_bounce(message_json)
    if "Complaint" in kinds:
        return _handle_complaint(message_json)

    if notification_type != "Received":
        raise ValueError('notification_type must be "Received"')
    if event_type is not None:
        raise ValueError("event_type must be None")
    return _handle_received(message_json)
1✔
503

504

505
# Enumerate the reasons that an email was not forwarded.
# This excludes emails dropped due to mask forwarding settings,
# such as "block all" and "block promotional". Those are logged
# as Glean email_blocked events.
EmailDroppedReason = Literal[
    "auto_block_spam",  # Email identified as spam, user has the auto_block_spam flag
    "dmarc_reject_failed",  # Email failed DMARC check with a reject policy
    "hard_bounce_pause",  # The user recently had a hard bounce
    "soft_bounce_pause",  # The user recently had a soft bounce
    "abuse_flag",  # The user exceeded an abuse limit, like mails forwarded
    "reply_requires_premium",  # The email is a reply from a free user
    "content_missing",  # Could not load the email from storage
    "error_from_header",  # Error generating the From: header, retryable
    "error_storage",  # Error fetching the email contents from storage (S3), retryable
    "error_sending",  # Error sending the forwarded email (SES), retryable
]
521

522

523
def log_email_dropped(
    reason: EmailDroppedReason,
    mask: RelayAddress | DomainAddress,
    is_reply: bool = False,
    can_retry: bool = False,
) -> None:
    """
    Emit an "email_dropped" info log for an email that was not forwarded.

    This is for drops that are not caused by a mask blocking setting; those
    are recorded with glean_logger().log_email_blocked(), whose interface this
    function mirrors.

    The log always carries reason, is_random_mask, is_reply, and can_retry.
    mask_id (and fxa_id, when the profile has an FxA account) are added only
    if the mask owner's profile has metrics enabled.
    """
    profile = mask.user.profile
    extra: dict[str, str | int | bool] = {"reason": reason}

    if profile.metrics_enabled:
        fxa = profile.fxa
        if fxa is not None:
            extra["fxa_id"] = fxa.uid
        extra["mask_id"] = mask.metrics_id

    extra["is_random_mask"] = isinstance(mask, RelayAddress)
    extra["is_reply"] = is_reply
    extra["can_retry"] = can_retry
    info_logger.info("email_dropped", extra=extra)
1✔
546

547

548
def _handle_received(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES received notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/receiving-email-notifications-contents.html
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html

    Returns (may be incomplete):
    * 200 if the email was sent, the Relay address is disabled, the Relay user is
      flagged for abuse, the email is under a bounce pause, the email was suppressed
      for spam, the list email was blocked, or the noreply address was the recipient.
    * 400 if commonHeaders entry is missing, no address could be parsed from the
      "From" header, the Relay recipient address is malformed, or the email failed
      DMARC with reject policy.
    * 403 if the email is a reply chain and the reply is not allowed ("Relay replies
      require a premium account").
    * 404 if an S3-stored email was not found, no Relay address was found in the "To",
      "CC", or "BCC" fields, or the Relay address is not in the database.
    * 503 if generating the new "From" header fails, the S3 client returned an error
      different from "not found", or the SES client fails

    If the recipient is not a known address and its local part is "replies", the
    response comes from _handle_reply instead. The HTTP returns are an
    artifact from an earlier time when emails were sent to a webhook. Currently,
    production instead pulls events from a queue.

    TODO: Return a more appropriate status object
    TODO: Document the metrics emitted
    """
    mail = message_json["mail"]
    if "commonHeaders" not in mail:
        logger.error("SNS message without commonHeaders")
        return HttpResponse(
            "Received SNS notification without commonHeaders.", status=400
        )
    common_headers = mail["commonHeaders"]
    receipt = message_json["receipt"]

    # Receipt verdicts are recorded again at each stage the email reaches,
    # tagged with the name of that stage.
    _record_receipt_verdicts(receipt, "all")
    to_address = _get_relay_recipient_from_message_json(message_json)
    if to_address is None:
        incr_if_enabled("no_relay_domain_in_recipient_fields", 1)
        return HttpResponse("Address does not exist", status=404)

    _record_receipt_verdicts(receipt, "relay_recipient")
    # NOTE(review): this indexing raises KeyError/IndexError if commonHeaders
    # has no "from" entry or it is empty - confirm SES always provides one.
    from_addresses = parse_email_header(common_headers["from"][0])
    if not from_addresses:
        info_logger.error(
            "_handle_received: no from address",
            extra={
                "source": mail["source"],
                "common_headers_from": common_headers["from"],
            },
        )
        return HttpResponse("Unable to parse From address", status=400)
    from_address = from_addresses[0][1]

    try:
        # Unpacking fails with ValueError unless there is exactly one "@"
        [to_local_portion, to_domain_portion] = to_address.split("@")
    except ValueError:
        # TODO: Add metric
        return HttpResponse("Malformed to field.", status=400)

    if to_local_portion.lower() == "noreply":
        incr_if_enabled("email_for_noreply_address", 1)
        return HttpResponse("noreply address is not supported.")
    try:
        # FIXME: this ambiguous return of either
        # RelayAddress or DomainAddress types makes the Rustacean in me throw
        # up a bit.
        address = _get_address(to_address)
        prefetch_related_objects([address.user], "socialaccount_set", "profile")
        user_profile = address.user.profile
    except (
        ObjectDoesNotExist,
        CannotMakeAddressException,
        DeletedAddress.MultipleObjectsReturned,
    ):
        # No usable mask for this recipient: only the "replies" local part
        # is handled further, everything else is a 404.
        if to_local_portion.lower() == "replies":
            response = _handle_reply(from_address, message_json, to_address)
        else:
            response = HttpResponse("Address does not exist", status=404)
        return response

    _record_receipt_verdicts(receipt, "valid_user")
    # if this is spam and the user is set to auto-block spam, early return
    if user_profile.auto_block_spam and _get_verdict(receipt, "spam") == "FAIL":
        incr_if_enabled("email_auto_suppressed_for_spam", 1)
        log_email_dropped(reason="auto_block_spam", mask=address)
        return HttpResponse("Address rejects spam.")

    if _get_verdict(receipt, "dmarc") == "FAIL":
        # A missing dmarcPolicy is treated as "none", which is not dropped
        policy = receipt.get("dmarcPolicy", "none")
        # TODO: determine action on dmarcPolicy "quarantine"
        if policy == "reject":
            log_email_dropped(reason="dmarc_reject_failed", mask=address)
            incr_if_enabled(
                "email_suppressed_for_dmarc_failure",
                tags=["dmarcPolicy:reject", "dmarcVerdict:FAIL"],
            )
            return HttpResponse("DMARC failure, policy is reject", status=400)

    # if this user is over bounce limits, early return
    bounce_paused, bounce_type = user_profile.check_bounce_pause()
    if bounce_paused:
        _record_receipt_verdicts(receipt, "user_bounce_paused")
        incr_if_enabled(f"email_suppressed_for_{bounce_type}_bounce", 1)
        # Any bounce type other than "soft" is logged as a hard bounce pause
        reason: Literal["soft_bounce_pause", "hard_bounce_pause"] = (
            "soft_bounce_pause" if bounce_type == "soft" else "hard_bounce_pause"
        )
        log_email_dropped(reason=reason, mask=address)
        return HttpResponse("Address is temporarily disabled.")

    # check if this is a reply from an external sender to a Relay user
    try:
        (lookup_key, _) = _get_keys_from_headers(mail["headers"])
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
        # From here on, `address` is the mask stored on the Reply record; the
        # mask this email was addressed to is kept in `user_address`.
        user_address = address
        address = reply_record.address
        message_id = _get_message_id_from_headers(mail["headers"])
        # make sure the relay user is premium
        if not _reply_allowed(from_address, to_address, reply_record, message_id):
            log_email_dropped(reason="reply_requires_premium", mask=user_address)
            return HttpResponse("Relay replies require a premium account", status=403)
    except (ReplyHeadersNotFound, Reply.DoesNotExist):
        # if there's no In-Reply-To header, or the In-Reply-To value doesn't
        # match a Reply record, continue to treat this as a regular email from
        # an external sender to a relay user
        pass

    # if account flagged for abuse, early return
    if user_profile.is_flagged:
        log_email_dropped(reason="abuse_flag", mask=address)
        return HttpResponse("Address is temporarily disabled.")

    # if address is set to block, early return
    if not address.enabled:
        incr_if_enabled("email_for_disabled_address", 1)
        address.num_blocked += 1
        address.save(update_fields=["num_blocked"])
        _record_receipt_verdicts(receipt, "disabled_alias")
        user_profile.last_engagement = datetime.now(UTC)
        user_profile.save()
        glean_logger().log_email_blocked(mask=address, reason="block_all")
        return HttpResponse("Address is temporarily disabled.")

    _record_receipt_verdicts(receipt, "active_alias")
    incr_if_enabled("email_for_active_address", 1)

    # if address is blocking list emails, and email is from list, early return
    # (this only applies when the profile has premium)
    if (
        address
        and address.block_list_emails
        and user_profile.has_premium
        and _check_email_from_list(mail["headers"])
    ):
        incr_if_enabled("list_email_for_address_blocking_lists", 1)
        address.num_blocked += 1
        address.save(update_fields=["num_blocked"])
        user_profile.last_engagement = datetime.now(UTC)
        user_profile.save()
        glean_logger().log_email_blocked(mask=address, reason="block_promotional")
        return HttpResponse("Address is not accepting list emails.")

    # Collect new headers
    subject = common_headers.get("subject", "")
    destination_address = user_profile.user.email
    reply_address = get_reply_to_address()
    try:
        from_header = generate_from_header(from_address, to_address)
    except InvalidFromHeader:
        # TODO: MPP-3407, MPP-3417 - Determine how to handle these
        # Gather every raw From header so the log shows what was received
        header_from = []
        for header in mail["headers"]:
            if header["name"].lower() == "from":
                header_from.append(header)
        info_logger.error(
            "generate_from_header",
            extra={
                "from_address": from_address,
                "source": mail["source"],
                "common_headers_from": common_headers["from"],
                "headers_from": header_from,
            },
        )
        log_email_dropped(reason="error_from_header", mask=address, can_retry=True)
        return HttpResponse("Cannot parse the From address", status=503)

    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": from_header,
        "To": destination_address,
        "Reply-To": reply_address,
        "Resent-From": from_address,
    }

    # Get incoming email
    try:
        (incoming_email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
    except ClientError as e:
        # A missing S3 object is a 404 and is not marked retryable; any other
        # client error is a retryable 503.
        if e.response["Error"].get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
            log_email_dropped(reason="content_missing", mask=address)
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
        log_email_dropped(reason="error_storage", mask=address, can_retry=True)
        # we are returning a 503 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    # Convert to new email
    sample_trackers = bool(sample_is_active("tracker_sample"))
    tracker_removal_flag = flag_is_active_in_task("tracker_removal", address.user)
    # Trackers are removed only when both the flag and the profile setting are on
    remove_level_one_trackers = bool(
        tracker_removal_flag and user_profile.remove_level_one_email_trackers
    )
    (
        forwarded_email,
        issues,
        level_one_trackers_removed,
        has_html,
        has_text,
    ) = _convert_to_forwarded_email(
        incoming_email_bytes=incoming_email_bytes,
        headers=headers,
        to_address=to_address,
        from_address=from_address,
        language=user_profile.language,
        has_premium=user_profile.has_premium,
        sample_trackers=sample_trackers,
        remove_level_one_trackers=remove_level_one_trackers,
    )
    if has_html:
        incr_if_enabled("email_with_html_content", 1)
    if has_text:
        incr_if_enabled("email_with_text_content", 1)
    if issues:
        info_logger.warning(
            "_handle_received: forwarding issues", extra={"issues": issues}
        )

    # Send new email
    try:
        ses_response = ses_send_raw_email(
            source_address=reply_address,
            destination_address=destination_address,
            message=forwarded_email,
        )
    except ClientError:
        # 503 service unavailable response to SNS so it can retry
        log_email_dropped(reason="error_sending", mask=address, can_retry=True)
        return HttpResponse("SES client error on Raw Email", status=503)

    # Store a Reply record keyed on the SES MessageId of the forwarded email
    message_id = ses_response["MessageId"]
    _store_reply_record(mail, message_id, address)

    user_profile.update_abuse_metric(
        email_forwarded=True, forwarded_email_size=len(incoming_email_bytes)
    )
    user_profile.last_engagement = datetime.now(UTC)
    user_profile.save()
    address.num_forwarded += 1
    address.last_used_at = datetime.now(UTC)
    if level_one_trackers_removed:
        # The stored count may be unset (falsy), so it is treated as 0
        address.num_level_one_trackers_blocked = (
            address.num_level_one_trackers_blocked or 0
        ) + level_one_trackers_removed
    address.save(
        update_fields=[
            "num_forwarded",
            "last_used_at",
            "block_list_emails",
            "num_level_one_trackers_blocked",
        ]
    )
    glean_logger().log_email_forwarded(mask=address, is_reply=False)
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
822

823

824
def _get_verdict(receipt, verdict_type):
1✔
825
    return receipt[f"{verdict_type}Verdict"]["status"]
1✔
826

827

828
def _check_email_from_list(headers):
1✔
829
    for header in headers:
1!
830
        if header["name"].lower().startswith("list-"):
1!
831
            return True
1✔
832
    return False
×
833

834

835
def _record_receipt_verdicts(receipt, state):
    """
    Emit counters for the verdicts in a receipt.

    Each key ending in "Verdict" increments "relay.emails.verdicts.<key>", tagged
    with the state. All verdict statuses, plus "dmarcPolicy" when present, become
    the tags of a single "relay.emails.state.<state>" increment.
    """
    tags = []
    # Sorted so the tag order is deterministic
    for name in sorted(receipt):
        if name == "dmarcPolicy":
            tags.append(f"{name}:{receipt[name]}")
            continue
        if not name.endswith("Verdict"):
            continue
        status = receipt[name]["status"]
        tags.append(f"{name}:{status}")
        incr_if_enabled(f"relay.emails.verdicts.{name}", 1, [f"state:{state}"])
    incr_if_enabled(f"relay.emails.state.{state}", 1, tags)
846

847

848
def _get_message_id_from_headers(headers):
1✔
849
    message_id = None
1✔
850
    for header in headers:
1✔
851
        if header["name"].lower() == "message-id":
1✔
852
            message_id = header["value"]
1✔
853
    return message_id
1✔
854

855

856
def _get_keys_from_headers(headers):
    """
    Derive the reply (lookup_key, encryption_key) pair from the email headers.

    Headers are scanned in order. The first In-Reply-To header yields keys derived
    from its value, without a database check. The first References header is split
    on spaces, and the keys of the first message ID with a stored Reply record are
    returned.

    Raises:
    * Reply.DoesNotExist - a References header has no message ID with a Reply record
    * ReplyHeadersNotFound - neither header is present
    """
    for header in headers:
        header_name = header["name"].lower()

        if header_name == "in-reply-to":
            message_id_bytes = get_message_id_bytes(header["value"])
            return derive_reply_keys(message_id_bytes)

        if header_name == "references":
            for candidate_id in header["value"].split(" "):
                candidate_bytes = get_message_id_bytes(candidate_id)
                lookup_key, encryption_key = derive_reply_keys(candidate_bytes)
                try:
                    # FIXME: calling code is likely to duplicate this query
                    _get_reply_record_from_lookup_key(lookup_key)
                except Reply.DoesNotExist:
                    continue
                else:
                    return lookup_key, encryption_key
            raise Reply.DoesNotExist

    incr_if_enabled("mail_to_replies_without_reply_headers", 1)
    raise ReplyHeadersNotFound
878

879

880
def _get_reply_record_from_lookup_key(lookup_key):
    """
    Fetch the Reply whose `lookup` equals the base64 form of lookup_key.

    Reply.objects.get raises Reply.DoesNotExist when there is no match.
    """
    return Reply.objects.get(lookup=b64_lookup_key(lookup_key))
883

884

885
def _strip_localpart_tag(address):
1✔
886
    [localpart, domain] = address.split("@")
1✔
887
    subaddress_parts = localpart.split("+")
1✔
888
    return f"{subaddress_parts[0]}@{domain}"
1✔
889

890

891
# How the raw email reached us: inline in the SNS message, or stored in S3
_TransportType = Literal["sns", "s3"]


def _get_email_bytes(
    message_json: AWS_SNSMessageJSON,
) -> tuple[bytes, _TransportType, float]:
    """
    Load the raw email for an SNS notification.

    Return is a tuple:
    - message_content - The email, as bytes
    - transport - "sns" if the email was in the notification, "s3" if it was fetched
    - load_time_s - Seconds spent loading, rounded to 3 decimal places

    Also records the email size in the "relayed_email.size" histogram.
    """
    transport: _TransportType
    with Timer(logger=None) as load_timer:
        if "content" not in message_json:
            # assume email content in S3
            bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
            message_content = get_message_content_from_s3(bucket, object_key)
            transport = "s3"
        else:
            # email content in sns message
            message_content = message_json["content"].encode("utf-8")
            transport = "sns"
        histogram_if_enabled("relayed_email.size", len(message_content))
    return (message_content, transport, round(load_timer.last, 3))
910

911

912
def _convert_to_forwarded_email(
    incoming_email_bytes: bytes,
    headers: OutgoingHeaders,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[EmailMessage, EmailForwardingIssues, int, bool, bool]:
    """
    Convert an email (as bytes) to a forwarded email.

    The headers are replaced, the plain text body gets a Relay prefix, and the HTML
    body is wrapped. If there is no HTML body, one is generated from the text body
    and added as an alternative; has_html stays False in that case.

    `now` is handed to _convert_html_content, which falls back to the current time
    when it is None.

    Return is a tuple:
    - email - The forwarded email
    - issues - Any detected issues in conversion
    - level_one_trackers_removed (int) - Number of trackers removed
    - has_html - True if the email has an HTML representation
    - has_text - True if the email has a plain text representation

    Raises TypeError if the parsed email or a body part is not an EmailMessage.
    """
    email = message_from_bytes(incoming_email_bytes, policy=relay_policy)
    # python/typeshed issue 2418
    # The Python 3.2 default was Message, 3.6 uses policy.message_factory, and
    # policy.default.message_factory is EmailMessage
    if not isinstance(email, EmailMessage):
        raise TypeError("email must be type EmailMessage")

    # Replace headers in the original email
    header_issues = _replace_headers(email, headers)

    # Find and replace text content
    text_body = email.get_body("plain")
    text_content = None
    has_text = False
    # Compare to None: a message's truth value is its header count, so a text part
    # without headers would otherwise be treated as missing.
    if text_body is not None:
        has_text = True
        if not isinstance(text_body, EmailMessage):
            raise TypeError("text_body must be type EmailMessage")
        text_content = text_body.get_content()
        new_text_content = _convert_text_content(text_content, to_address)
        text_body.set_content(new_text_content)

    # Find and replace HTML content
    html_body = email.get_body("html")
    level_one_trackers_removed = 0
    has_html = False
    if html_body is not None:
        has_html = True
        if not isinstance(html_body, EmailMessage):
            raise TypeError("html_body must be type EmailMessage")
        html_content = html_body.get_content()
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
            now=now,
        )
        html_body.set_content(new_content, subtype="html")
    elif text_content:
        # Try to use the text content to generate HTML content
        html_content = urlize_and_linebreaks(text_content)
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
            now=now,
        )
        if not isinstance(text_body, EmailMessage):
            raise TypeError("text_body must be type EmailMessage")
        try:
            text_body.add_alternative(new_content, subtype="html")
        except TypeError as e:
            # Best effort: forward without the HTML alternative, but log the
            # email's MIME structure to help diagnose the failure.
            out = StringIO()
            _structure(email, fp=out)
            info_logger.error(
                "Adding HTML alternate failed",
                extra={"exception": str(e), "structure": out.getvalue()},
            )

    issues: EmailForwardingIssues = {}
    if header_issues:
        issues["headers"] = header_issues
    return (email, issues, level_one_trackers_removed, has_html, has_text)
1002

1003

1004
def _replace_headers(
    email: EmailMessage, headers: OutgoingHeaders
) -> EmailHeaderIssues:
    """
    Replace the headers in email with new headers.

    The passed email object is changed in place instead of being copied, for two
    reasons. First, the Python email package can read an email with non-compliant
    headers or content, but can't write it, and a copy requires a read/write. Making
    header-specific changes inside try / except statements avoids that. Second, an
    email can be up to 10 MB (and we hope to support 40 MB someday), so avoiding a
    copy avoids adding a source of memory-related crashes.

    Headers that are not being replaced are dropped, except for MIME-Version and
    the Content-* headers.

    Returns the issues found, grouped under "incoming" (original headers that fail
    to read or have defects) and "outgoing" (new headers that fail to write, fail
    to read back, or have defects).
    """
    issues: EmailHeaderIssues = defaultdict(list)
    replacements: set[str] = {name.lower() for name in headers}

    # Detect non-compliant headers in incoming emails
    for name in email.keys():
        try:
            incoming_value = email[name]
        except Exception as e:
            issues["incoming"].append((name, {"exception_on_read": repr(e)}))
            continue
        if getattr(incoming_value, "defects", None):
            defect_details = {
                "defect_count": len(incoming_value.defects),
                "parsed_value": str(incoming_value),
                "unstructured_value": str(incoming_value.as_unstructured),
            }
            issues["incoming"].append((name, defect_details))

    # Collect headers that will not be forwarded
    to_drop: list[str] = [
        name
        for name in email.keys()
        if not (
            name.lower() in replacements
            or name.lower() == "mime-version"
            or name.lower().startswith("content-")
        )
    ]

    # Drop headers that should be dropped
    for name in to_drop:
        del email[name]

    # Replace the requested headers
    for name, new_value in headers.items():
        del email[name]
        try:
            email[name] = new_value
        except Exception as e:
            write_details = {"exception_on_write": repr(e), "value": new_value}
            issues["outgoing"].append((name, write_details))
            continue

        # Read the header back to find problems with the value we just wrote
        try:
            parsed_value = email[name]
        except Exception as e:
            issues["outgoing"].append((name, {"exception_on_read": repr(e)}))
            continue

        if parsed_value.defects:
            defect_details = {
                "defect_count": len(parsed_value.defects),
                "parsed_value": str(parsed_value),
                "unstructured_value": str(parsed_value.as_unstructured),
            }
            issues["outgoing"].append((name, defect_details))

    return dict(issues)
1087

1088

1089
def _convert_html_content(
    html_content: str,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[str, int]:
    """
    Wrap HTML email content in the Relay email template.

    When sample_trackers is set, trackers in the content are counted. When
    remove_level_one_trackers is set, trackers are removed from the content and a
    tracker report link is built for the template.

    Return is a tuple:
    - wrapped_html - The wrapped HTML content
    - removed_count - Number of trackers removed, 0 if removal was not requested
    """
    if now is None:
        now = datetime.now(UTC)
    # frontend expects a timestamp in milliseconds
    received_at_ms = int(now.timestamp() * 1000)

    # scramble alias so that clients don't recognize it
    # and apply default link styles
    display_email = re.sub("([@.:])", r"<span>\1</span>", to_address)

    # sample tracker numbers
    if sample_trackers:
        count_all_trackers(html_content)

    removed_count = 0
    tracker_report_link = ""
    if remove_level_one_trackers:
        html_content, tracker_details = remove_trackers(
            html_content, from_address, received_at_ms
        )
        removed_count = tracker_details["tracker_removed"]
        # The report details travel in the URL fragment as JSON
        report_json = json.dumps(
            {
                "sender": from_address,
                "received_at": received_at_ms,
                "trackers": tracker_details["level_one"]["trackers"],
            }
        )
        tracker_report_link = f"{settings.SITE_ORIGIN}/tracker-report/#" + report_json

    wrapped_html = wrap_html_email(
        original_html=html_content,
        language=language,
        has_premium=has_premium,
        display_email=display_email,
        tracker_report_link=tracker_report_link,
        num_level_one_email_trackers_removed=removed_count,
    )
    return wrapped_html, removed_count
1136

1137

1138
def _convert_text_content(text_content: str, to_address: str) -> str:
1✔
1139
    relay_header_text = (
1✔
1140
        "This email was sent to your alias "
1141
        f"{to_address}. To stop receiving emails sent to this alias, "
1142
        "update the forwarding settings in your dashboard.\n"
1143
        "---Begin Email---\n"
1144
    )
1145
    wrapped_text = relay_header_text + text_content
1✔
1146
    return wrapped_text
1✔
1147

1148

1149
def _build_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> EmailMessage:
    """
    Build the "replies require premium" notice, addressed to from_address.

    The message has a text body and an HTML alternative. When message_id is given,
    the notice is threaded under it with In-Reply-To and References headers.
    """
    # If we haven't forwarded a first reply for this user yet, _reply_allowed
    # will forward.  So, tell the user we forwarded it.
    owner_profile = reply_record.address.user.profile
    forwarded = not owner_profile.forwarded_first_reply

    # Prefer the Reply-To of the original email, falling back to From
    original_sender: str | None = None
    if decrypted_metadata is not None:
        original_sender = decrypted_metadata.get("reply-to") or decrypted_metadata.get(
            "from"
        )

    template_context = {
        "sender": original_sender or "",
        "forwarded": forwarded,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    html_body = render_to_string("emails/reply_requires_premium.html", template_context)
    text_body = render_to_string("emails/reply_requires_premium.txt", template_context)

    # Create the message
    notice = EmailMessage()
    notice["Subject"] = ftl_bundle.format("replies-not-included-in-free-account-header")
    notice["From"] = get_reply_to_address()
    notice["To"] = from_address
    if message_id:
        notice["In-Reply-To"] = message_id
        notice["References"] = message_id
    notice.set_content(text_body)
    notice.add_alternative(html_body, subtype="html")
    return notice
1180

1181

1182
def _set_forwarded_first_reply(profile):
1✔
1183
    profile.forwarded_first_reply = True
1✔
1184
    profile.save()
1✔
1185

1186

1187
def _send_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> None:
    """
    Send the "replies require premium" notice to from_address.

    After a successful send, the reply owner's profile is marked as having had a
    first reply forwarded. An SES ClientError is logged and not re-raised. The
    "free_user_reply_attempt" counter is incremented either way.
    """
    notice = _build_reply_requires_premium_email(
        from_address, reply_record, message_id, decrypted_metadata
    )
    try:
        ses_send_raw_email(
            source_address=get_reply_to_address(premium=False),
            destination_address=from_address,
            message=notice,
        )
        # If we haven't forwarded a first reply for this user yet, _reply_allowed will.
        # So, update the DB.
        owner_profile = reply_record.address.user.profile
        _set_forwarded_first_reply(owner_profile)
    except ClientError as e:
        error_details = e.response["Error"]
        logger.error("reply_not_allowed_ses_client_error", extra=error_details)
    incr_if_enabled("free_user_reply_attempt", 1)
1208

1209

1210
def _reply_allowed(
    from_address, to_address, reply_record, message_id=None, decrypted_metadata=None
):
    """
    Return True if this reply should be sent.

    When the sender is the owner of the reply record (compared with and without
    "+tag" suffixes), the reply is refused for a flagged profile and allowed for a
    premium owner. Any other owner is sent the "replies require premium" notice,
    and the reply is allowed only if no first reply was forwarded before.

    When the sender is someone else, the reply is allowed only if to_address is an
    existing mask of a premium user.
    """
    stripped_from_address = _strip_localpart_tag(from_address)
    owner_email = reply_record.address.user.email
    stripped_owner_email = _strip_localpart_tag(owner_email)
    sender_is_owner = (
        from_address == owner_email or stripped_from_address == stripped_owner_email
    )

    if not sender_is_owner:
        # The From: is not a Relay user, so make sure this is a reply *TO* a
        # premium Relay user
        try:
            address = _get_address(to_address)
            if address.user.profile.has_premium:
                return True
        except ObjectDoesNotExist:
            return False
        incr_if_enabled("free_user_reply_attempt", 1)
        return False

    # This is a Relay user replying to an external sender
    if reply_record.profile.is_flagged:
        return False
    if reply_record.owner_has_premium:
        return True

    # if we haven't forwarded a first reply for this user, return True to allow
    # this first reply. Read the flag before sending the notice, because a
    # successful send sets it.
    allow_first_reply = not reply_record.address.user.profile.forwarded_first_reply
    _send_reply_requires_premium_email(
        from_address, reply_record, message_id, decrypted_metadata
    )
    return allow_first_reply
1245

1246

1247
def _handle_reply(
    from_address: str, message_json: AWS_SNSMessageJSON, to_address: str
) -> HttpResponse:
    """
    Handle a reply from a Relay user to an external email.

    Returns (may be incomplete):
    * 200 if the reply was sent
    * 400 if the In-Reply-To and References headers are missing, or the SES client
      raises an error when sending
    * 403 if the Relay user is not allowed to reply
    * 404 if the S3-stored email is not found, or there is no matching Reply record in
      the database
    * 503 if the S3 client returns an error (other than not found)

    NOTE(review): _get_keys_from_headers raises Reply.DoesNotExist when a References
    header has no matching reply record. That exception is not caught around that
    call, so it propagates to the caller - confirm this is intended.

    TODO: Return a more appropriate status object (see _handle_received)
    TODO: Document metrics emitted
    """
    mail = message_json["mail"]
    try:
        lookup_key, encryption_key = _get_keys_from_headers(mail["headers"])
    except ReplyHeadersNotFound:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-header"])
        return HttpResponse("No In-Reply-To header", status=400)

    try:
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
    except Reply.DoesNotExist:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-reply-record"])
        return HttpResponse("Unknown or stale In-Reply-To header", status=404)

    address = reply_record.address
    message_id = _get_message_id_from_headers(mail["headers"])
    metadata_json = decrypt_reply_metadata(
        encryption_key, reply_record.encrypted_metadata
    )
    decrypted_metadata = json.loads(metadata_json)
    reply_is_allowed = _reply_allowed(
        from_address, to_address, reply_record, message_id, decrypted_metadata
    )
    if not reply_is_allowed:
        log_email_dropped(reason="reply_requires_premium", mask=address, is_reply=True)
        return HttpResponse("Relay replies require a premium account", status=403)

    outbound_from_address = address.full_address
    incr_if_enabled("reply_email", 1)
    subject = mail["commonHeaders"].get("subject", "")
    # From here on, to_address is the external recipient of the reply
    to_address = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": outbound_from_address,
        "To": to_address,
        "Reply-To": outbound_from_address,
    }

    try:
        email_bytes, _transport, _load_time_s = _get_email_bytes(message_json)
    except ClientError as e:
        error_details = e.response["Error"]
        if error_details.get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=error_details)
            log_email_dropped(reason="content_missing", mask=address, is_reply=True)
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=error_details)
        log_email_dropped(
            reason="error_storage", mask=address, is_reply=True, can_retry=True
        )
        # we are returning a 503 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    reply_email = message_from_bytes(email_bytes, policy=relay_policy)
    if not isinstance(reply_email, EmailMessage):
        raise TypeError("email must be type EmailMessage")

    # Convert to a reply email
    # TODO: Issue #1747 - Remove wrapper / prefix in replies
    _replace_headers(reply_email, headers)

    try:
        ses_send_raw_email(
            source_address=outbound_from_address,
            destination_address=to_address,
            message=reply_email,
        )
    except ClientError:
        log_email_dropped(reason="error_sending", mask=address, is_reply=True)
        return HttpResponse("SES client error", status=400)

    reply_record.increment_num_replied()
    owner_profile = address.user.profile
    owner_profile.update_abuse_metric(replied=True)
    owner_profile.last_engagement = datetime.now(UTC)
    owner_profile.save()
    glean_logger().log_email_forwarded(mask=address, is_reply=True)
    return HttpResponse("Sent email to final recipient.", status=200)
1340

1341

1342
def _get_domain_address(local_portion: str, domain_portion: str) -> DomainAddress:
    """
    Find or create the DomainAddress for the parts of an email address.

    The domain_portion is split at the first "." into a subdomain and a domain.

    If the subdomain is claimed and no matching DomainAddress exists, a new
    DomainAddress is created. In both cases last_used_at is updated and saved.

    If the domain is not the configured MOZMAIL_DOMAIN, ObjectDoesNotExist is raised.

    If no profile has claimed the subdomain, Profile.DoesNotExist is raised.
    """
    address_subdomain, address_domain = domain_portion.split(".", 1)
    mozmail_domain = get_domains_from_settings()["MOZMAIL_DOMAIN"]
    if address_domain != mozmail_domain:
        incr_if_enabled("email_for_not_supported_domain", 1)
        raise ObjectDoesNotExist("Address does not exist")

    try:
        with transaction.atomic():
            # Select the profile row FOR UPDATE for the rest of the transaction
            locked_profile = Profile.objects.select_for_update().get(
                subdomain=address_subdomain
            )
            domain_numerical = get_domain_numerical(address_domain)
            # filter DomainAddress because it may not exist
            # which will throw an error with get()
            matching_addresses = DomainAddress.objects.filter(
                user=locked_profile.user, address=local_portion, domain=domain_numerical
            )
            domain_address = matching_addresses.first()
            if domain_address is None:
                # TODO: Consider flows when a user generating alias on a fly
                # was unable to receive an email due to user no longer being a
                # premium user as seen in exception thrown on make_domain_address
                domain_address = DomainAddress.make_domain_address(
                    locked_profile, local_portion, True
                )
                glean_logger().log_email_mask_created(
                    mask=domain_address,
                    created_by_api=False,
                )
            domain_address.last_used_at = datetime.now(UTC)
            domain_address.save()
            return domain_address
    except Profile.DoesNotExist:
        incr_if_enabled("email_for_dne_subdomain", 1)
        raise
1386

1387

1388
def _get_address(address: str) -> RelayAddress | DomainAddress:
    """
    Find or create the RelayAddress or DomainAddress for an email address.

    The address is lower-cased before lookup. An address on one of the configured
    email domains is looked up as a RelayAddress. Any other domain is passed to
    _get_domain_address, which may create a new DomainAddress.

    On failure, raises exception based on Django's ObjectDoesNotExist:
    * RelayAddress.DoesNotExist - looks like RelayAddress, deleted or does not exist
    * Profile.DoesNotExist - looks like DomainAddress, no subdomain match
    * ObjectDoesNotExist - Unknown domain
    """
    local_portion, domain_portion = address.split("@")
    local_address = local_portion.lower()
    domain = domain_portion.lower()

    # if the domain is not the site's 'top' relay domain,
    # it may be for a user's subdomain
    if domain not in get_domains_from_settings().values():
        return _get_domain_address(local_address, domain)

    # the domain is the site's 'top' relay domain, so look up the RelayAddress
    try:
        domain_numerical = get_domain_numerical(domain)
        return RelayAddress.objects.get(address=local_address, domain=domain_numerical)
    except RelayAddress.DoesNotExist:
        # Count why the lookup failed, then re-raise for the caller
        deleted_hash = address_hash(local_address, domain=domain)
        try:
            DeletedAddress.objects.get(address_hash=deleted_hash)
        except DeletedAddress.DoesNotExist:
            incr_if_enabled("email_for_unknown_address", 1)
        except DeletedAddress.MultipleObjectsReturned:
            # not sure why this happens on stage but let's handle it
            incr_if_enabled("email_for_deleted_address_multiple", 1)
        else:
            incr_if_enabled("email_for_deleted_address", 1)
            # TODO: create a hard bounce receipt rule in SES
        raise
1431

1432

1433
def _handle_bounce(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES bounce notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#bounce-object

    The bounce data is copied before processing, so message_json is not modified.

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_bounce" with these tags:
    * bounce_type: 'permanent', 'transient', 'undetermined', 'none' if omitted
    * bounce_subtype: 'undetermined', 'general', etc., 'none' if omitted
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam', 'hard_bounce', 'soft_bounce'

    Emits an info log "bounce_notification", same data as metric, plus:
    * bounce_action: 'action' from bounced recipient data, or '' if omitted
    * bounce_status: 'status' from bounced recipient data, or '' if omitted
    * bounce_diagnostic: 'diagnosticCode' from bounced recipient data, or '' if omitted
    * bounce_extra: Extra data from bounce_recipient data, if any
    * domain: User's real email address domain, if an address was given
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the user
    """
    # Copy before processing: keys are popped from each recipient below, which
    # would otherwise alter the caller's message_json.
    bounce = deepcopy(message_json.get("bounce", {}))
    bounce_type = bounce.get("bounceType", "none")
    bounce_subtype = bounce.get("bounceSubType", "none")
    bounced_recipients = bounce.get("bouncedRecipients", [])

    now = datetime.now(UTC)
    bounce_data = []
    for recipient in bounced_recipients:
        recipient_address = recipient.pop("emailAddress", None)
        data = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "bounce_action": recipient.pop("action", ""),
            "bounce_status": recipient.pop("status", ""),
            "bounce_diagnostic": recipient.pop("diagnosticCode", ""),
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        if recipient:
            # Whatever is left after the pops is data we don't know about
            data["bounce_extra"] = recipient.copy()
        bounce_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        # parseaddr returns "" for an unparsable address, so don't assume an "@"
        _local, at_sign, recipient_domain = recipient_address.rpartition("@")
        data["domain"] = recipient_domain if at_sign else ""

        try:
            user = User.objects.get(email=recipient_address)
            profile = user.profile
            data["user_match"] = "found"
            if (fxa := profile.fxa) and profile.metrics_enabled:
                data["fxa_id"] = fxa.uid
            else:
                data["fxa_id"] = ""
        except User.DoesNotExist:
            # TODO: handle bounce for a user who no longer exists
            # add to SES account-wide suppression list?
            data["user_match"] = "missing"
            continue

        action = None
        if "spam" in data["bounce_diagnostic"].lower():
            # if an email bounced as spam, set to auto block spam for this user
            # and DON'T set them into bounce pause state
            action = "auto_block_spam"
            profile.auto_block_spam = True
        elif bounce_type == "Permanent":
            # TODO: handle sub-types: 'General', 'NoEmail', etc.
            action = "hard_bounce"
            profile.last_hard_bounce = now
        elif bounce_type == "Transient":
            # TODO: handle sub-types: 'MessageTooLarge', 'AttachmentRejected', etc.
            action = "soft_bounce"
            profile.last_soft_bounce = now
        if action:
            data["relay_action"] = action
            profile.save()

    if not bounce_data:
        # Data when there are no identified recipients
        bounce_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    for data in bounce_data:
        tags = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_bounce",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("bounce_notification", extra=data)

    if any(data["user_match"] == "missing" for data in bounce_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)
1540

1541

1542
def _handle_complaint(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES complaint notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object

    Each complained recipient that matches a user gets auto_block_spam enabled
    on their profile.

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_complaint" with these tags:
    * complaint_subtype: 'onaccountsuppressionlist', or 'none' if omitted
    * complaint_feedback - feedback enumeration from ISP or 'none'
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
      ('no_address' is also used when the address has no domain part)
    * relay_action: 'no_action', 'auto_block_spam'

    Emits an info log "complaint_notification", same data as metric, plus:
    * complaint_user_agent - identifies the client used to file the complaint
    * complaint_extra - Extra data from complainedRecipients data, if any
    * domain - User's domain, if an address was given
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the user
    """
    # Work on a copy so popping keys does not mutate the caller's message.
    complaint = deepcopy(message_json.get("complaint", {}))
    complained_recipients = complaint.pop("complainedRecipients", [])
    subtype = complaint.pop("complaintSubType", None)
    user_agent = complaint.pop("userAgent", None)
    feedback = complaint.pop("complaintFeedbackType", None)

    complaint_data = []
    for recipient in complained_recipients:
        recipient_address = recipient.pop("emailAddress", None)
        data = {
            "complaint_subtype": subtype,
            "complaint_user_agent": user_agent,
            "complaint_feedback": feedback,
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        if recipient:
            # Anything left after removing emailAddress is unexpected extra data
            data["complaint_extra"] = recipient.copy()
        complaint_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        address_parts = recipient_address.split("@")
        if len(address_parts) < 2:
            # parseaddr returns "" for unparsable input, and an address may
            # lack a domain. Keep the "no_address" state instead of raising
            # IndexError and losing the metric and log for this notification.
            continue
        data["domain"] = address_parts[1]

        try:
            user = User.objects.get(email=recipient_address)
        except User.DoesNotExist:
            data["user_match"] = "missing"
            continue

        profile = user.profile
        data["user_match"] = "found"
        # Only log the account ID when the user allows metrics collection
        if (fxa := profile.fxa) and profile.metrics_enabled:
            data["fxa_id"] = fxa.uid
        else:
            data["fxa_id"] = ""

        data["relay_action"] = "auto_block_spam"
        profile.auto_block_spam = True
        profile.save()

    if not complaint_data:
        # Data when there are no identified recipients
        complaint_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    for data in complaint_data:
        tags = {
            "complaint_subtype": subtype or "none",
            "complaint_feedback": feedback or "none",
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_complaint",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("complaint_notification", extra=data)

    if any(data["user_match"] == "missing" for data in complaint_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc