• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / b8d90ede-8701-4f68-8eed-08f4f47d78e7

11 Apr 2024 02:37PM CUT coverage: 75.54% (+0.004%) from 75.536%
b8d90ede-8701-4f68-8eed-08f4f47d78e7

push

circleci

web-flow
Merge pull request #4573 from mozilla/more-complaint-bounce-logs-3784

MPP-3784: Add FxA ID to logs for bounces and complaints

2443 of 3405 branches covered (71.75%)

Branch coverage included in aggregate %.

6 of 6 new or added lines in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

6794 of 8823 relevant lines covered (77.0%)

20.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.0
/emails/views.py
1
from collections import defaultdict
1✔
2
from copy import deepcopy
1✔
3
from datetime import datetime, timezone
1✔
4
from email import message_from_bytes
1✔
5
from email.iterators import _structure
1✔
6
from email.message import EmailMessage
1✔
7
from email.utils import parseaddr
1✔
8
import html
1✔
9
from io import StringIO
1✔
10
import json
1✔
11
from json import JSONDecodeError
1✔
12
import logging
1✔
13
import re
1✔
14
import shlex
1✔
15
from textwrap import dedent
1✔
16
from typing import Any, Literal
1✔
17
from urllib.parse import urlencode
1✔
18

19
from botocore.exceptions import ClientError
1✔
20
from codetiming import Timer
1✔
21
from decouple import strtobool
1✔
22
from django.shortcuts import render
1✔
23
from sentry_sdk import capture_message
1✔
24
from markus.utils import generate_tag
1✔
25
from waffle import sample_is_active
1✔
26

27
from django.conf import settings
1✔
28
from django.contrib.auth.models import User
1✔
29
from django.core.exceptions import ObjectDoesNotExist
1✔
30
from django.db import transaction
1✔
31
from django.db.models import prefetch_related_objects
1✔
32
from django.http import HttpRequest, HttpResponse
1✔
33
from django.template.loader import render_to_string
1✔
34
from django.utils.html import escape
1✔
35
from django.views.decorators.csrf import csrf_exempt
1✔
36

37
from privaterelay.utils import get_subplat_upgrade_link_by_language, glean_logger
1✔
38

39

40
from .models import (
1✔
41
    CannotMakeAddressException,
42
    DeletedAddress,
43
    DomainAddress,
44
    Profile,
45
    RelayAddress,
46
    Reply,
47
    address_hash,
48
    get_domain_numerical,
49
)
50
from .policy import relay_policy
1✔
51
from .types import (
1✔
52
    AWS_MailJSON,
53
    AWS_SNSMessageJSON,
54
    OutgoingHeaders,
55
    EmailForwardingIssues,
56
    EmailHeaderIssues,
57
)
58
from .utils import (
1✔
59
    _get_bucket_and_key_from_s3_json,
60
    b64_lookup_key,
61
    count_all_trackers,
62
    decrypt_reply_metadata,
63
    derive_reply_keys,
64
    encrypt_reply_metadata,
65
    generate_from_header,
66
    get_domains_from_settings,
67
    get_message_content_from_s3,
68
    get_message_id_bytes,
69
    get_reply_to_address,
70
    histogram_if_enabled,
71
    incr_if_enabled,
72
    remove_message_from_s3,
73
    remove_trackers,
74
    ses_send_raw_email,
75
    urlize_and_linebreaks,
76
    InvalidFromHeader,
77
    parse_email_header,
78
)
79
from .sns import verify_from_sns, SUPPORTED_SNS_TYPES
1✔
80

81
from privaterelay.ftl_bundles import main as ftl_bundle
1✔
82
from privaterelay.utils import flag_is_active_in_task
1✔
83

84
# Module loggers. `logger` ("events") carries most error records in this file;
# `info_logger` ("eventsinfo") carries informational event records such as
# "email_dropped" and "SNS SubscriptionConfirmation" (and a few errors too).
logger = logging.getLogger("events")
info_logger = logging.getLogger("eventsinfo")
86

87

88
class ReplyHeadersNotFound(Exception):
    """
    Signals that the In-Reply-To / References headers needed to look up a
    Reply record were not found.

    The message is available both as ``exc.message`` and through the normal
    Exception interface (``str(exc)``, ``exc.args``).
    """

    def __init__(self, message="No In-Reply-To or References headers."):
        # Forward the message to Exception so str(exc) and exc.args carry it;
        # without this call both were empty.
        super().__init__(message)
        self.message = message
91

92

93
def first_time_user_test(request):
    """
    Demonstrate rendering of the "First time Relay user" email.

    Querystring options:
    * in_bundle_country: parsed with strtobool, defaults to "yes".
    * format: "text" renders the plain-text template; any other value
      (default "html") renders the HTML template.
    """
    context = {
        "in_bundle_country": strtobool(request.GET.get("in_bundle_country", "yes")),
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    wants_text = request.GET.get("format", "html") == "text"
    if not wants_text:
        return render(request, "emails/first_time_user.html", context)
    return render(
        request,
        "emails/first_time_user.txt",
        context,
        "text/plain; charset=utf-8",
    )
112

113

114
def reply_requires_premium_test(request):
    """
    Demonstrate rendering of the "Reply requires premium" email.

    Every querystring parameter is copied into the template context, overriding
    the defaults below. forwarded=True becomes the boolean True; any other
    forwarded value is kept as its raw string. content-type=text/plain selects
    the plain-text template, otherwise the HTML template is rendered.
    """
    context = {
        "sender": "test@example.com",
        "forwarded": True,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    query = request.GET
    for key in query:
        context[key] = (
            True if (key == "forwarded" and query[key] == "True") else query.get(key)
        )

    if query.get("content-type") == "text/plain":
        return render(
            request,
            "emails/reply_requires_premium.txt",
            context,
            "text/plain; charset=utf-8",
        )
    return render(request, "emails/reply_requires_premium.html", context)
140

141

142
def first_forwarded_email_test(request: HttpRequest) -> HttpResponse:
    """
    Demonstrate rendering of the "first forwarded email" message.

    The content is wrapped with wrap_html_email using fixed values: language
    "en-us", premium user, display address test@example.com, zero trackers
    removed.
    """
    # TO DO: Update with correct context when trigger is created
    inner_html = render_to_string(
        "emails/first_forwarded_email.html",
        {"SITE_ORIGIN": settings.SITE_ORIGIN},
    )
    return HttpResponse(
        wrap_html_email(
            original_html=inner_html,
            language="en-us",
            has_premium=True,
            display_email="test@example.com",
            num_level_one_email_trackers_removed=0,
        )
    )
160

161

162
def wrap_html_email(
    original_html: str,
    language: str,
    has_premium: bool,
    display_email: str,
    num_level_one_email_trackers_removed: int | None = None,
    tracker_report_link: str | None = None,
) -> str:
    """
    Add Relay banners, surveys, etc. to an HTML email.

    Renders emails/wrapped_email.html with the given values, the SubPlat
    upgrade link for ``language`` and SITE_ORIGIN. Every blank or
    whitespace-only line is removed from the output, which always ends with a
    single newline.
    """
    context = dict(
        original_html=original_html,
        language=language,
        has_premium=has_premium,
        subplat_upgrade_link=get_subplat_upgrade_link_by_language(language),
        display_email=display_email,
        tracker_report_link=tracker_report_link,
        num_level_one_email_trackers_removed=num_level_one_email_trackers_removed,
        SITE_ORIGIN=settings.SITE_ORIGIN,
    )
    rendered = render_to_string("emails/wrapped_email.html", context)
    non_blank_lines = filter(str.strip, rendered.splitlines())
    return "\n".join(non_blank_lines) + "\n"
186

187

188
def wrapped_email_test(request: HttpRequest) -> HttpResponse:
    """
    Demonstrate rendering of forwarded HTML emails.

    Querystring settings: language, has_premium, has_tracker_report_link and
    num_level_one_email_trackers_removed. When language or has_premium is
    missing, it is taken from a randomly chosen profile. The demo body lists
    the current settings with links that switch each one.

    NOTE(review): if a setting must come from a profile and no Profile exists,
    the ``assert`` below fails (AssertionError).
    """
    params = request.GET
    trackers_key = "num_level_one_email_trackers_removed"

    # A random profile supplies whichever of language / has_premium is missing.
    if "language" in params and "has_premium" in params:
        user_profile = None
    else:
        user_profile = Profile.objects.order_by("?").first()

    if "language" not in params:
        assert user_profile is not None
        language = user_profile.language
    else:
        language = params["language"]

    if "has_premium" not in params:
        assert user_profile is not None
        has_premium = user_profile.has_premium
    else:
        has_premium = strtobool(params["has_premium"])

    tracker_count = int(params[trackers_key]) if trackers_key in params else 0

    show_report_link = (
        strtobool(params["has_tracker_report_link"])
        if "has_tracker_report_link" in params
        else False
    )
    tracker_report_link = ""
    if show_report_link:
        fake_trackers = (
            {"fake-tracker.example.com": tracker_count} if tracker_count else {}
        )
        tracker_report_link = (
            "/tracker-report/#{"
            '"sender": "sender@example.com", '
            '"received_at": 1658434657, '
            f'"trackers": {json.dumps(fake_trackers)}'
            "}"
        )

    base_path = "/emails/wrapped_email_test"
    current = {
        "language": language,
        "has_premium": "Yes" if has_premium else "No",
        "has_tracker_report_link": "Yes" if show_report_link else "No",
        trackers_key: str(tracker_count),
    }

    def switch_link(key, value):
        """Show value as text if already selected, else as a link selecting it."""
        if current[key] == value:
            return str(value)
        query = {**current, key: value}
        return f'<a href="{base_path}?{urlencode(query)}">{value}</a>'

    html_content = dedent(
        f"""\
    <p>
      <strong>Email rendering Test</strong>
    </p>
    <p>Settings: (<a href="{base_path}">clear all</a>)</p>
    <ul>
      <li>
        <strong>language</strong>:
        {escape(language)}
        (switch to
        {switch_link("language", "en-us")},
        {switch_link("language", "de")},
        {switch_link("language", "en-gb")},
        {switch_link("language", "fr")},
        {switch_link("language", "ru-ru")},
        {switch_link("language", "es-es")},
        {switch_link("language", "pt-br")},
        {switch_link("language", "it-it")},
        {switch_link("language", "en-ca")},
        {switch_link("language", "de-de")},
        {switch_link("language", "es-mx")})
      </li>
      <li>
        <strong>has_premium</strong>:
        {"Yes" if has_premium else "No"}
        (switch to
        {switch_link("has_premium", "Yes")},
        {switch_link("has_premium", "No")})
      </li>
      <li>
        <strong>has_tracker_report_link</strong>:
        {"Yes" if show_report_link else "No"}
        (switch to
        {switch_link("has_tracker_report_link", "Yes")},
        {switch_link("has_tracker_report_link", "No")})
      </li>
      <li>
        <strong>num_level_one_email_trackers_removed</strong>:
        {tracker_count}
        (switch to
        {switch_link(trackers_key, "0")},
        {switch_link(trackers_key, "1")},
        {switch_link(trackers_key, "2")})
      </li>
    </ul>
    """
    )

    return HttpResponse(
        wrap_html_email(
            original_html=html_content,
            language=language,
            has_premium=has_premium,
            tracker_report_link=tracker_report_link,
            display_email="test@relay.firefox.com",
            num_level_one_email_trackers_removed=tracker_count,
        )
    )
316

317

318
def _store_reply_record(
    mail: AWS_MailJSON, message_id: str, address: RelayAddress | DomainAddress
) -> AWS_MailJSON:
    """
    After relaying an email, store a Reply record for it.

    The incoming mail's Message-ID, From and Reply-To headers are collected,
    keyed by lowercased name; the last occurrence of a repeated header wins.
    They are encrypted with a key derived from ``message_id``, and stored with
    a lookup key derived from the same id. The record is linked to the mask
    via ``domain_address`` or ``relay_address`` depending on its type.

    Returns ``mail`` unchanged.
    """
    wanted = ("message-id", "from", "reply-to")
    reply_metadata = {
        name: header["value"]
        for header in mail["headers"]
        if (name := header["name"].lower()) in wanted
    }

    lookup_key, encryption_key = derive_reply_keys(get_message_id_bytes(message_id))
    lookup = b64_lookup_key(lookup_key)
    encrypted_metadata = encrypt_reply_metadata(encryption_key, reply_metadata)

    if isinstance(address, DomainAddress):
        mask_field = "domain_address"
    else:
        assert isinstance(address, RelayAddress)
        mask_field = "relay_address"

    Reply.objects.create(
        lookup=lookup,
        encrypted_metadata=encrypted_metadata,
        **{mask_field: address},
    )
    return mail
341

342

343
@csrf_exempt
def sns_inbound(request):
    """
    Receive an AWS SNS message over HTTP.

    The JSON body's SNS signature is verified first. Requests whose Topic ARN
    or message Type is missing or unsupported are logged and answered with
    400. Everything else is handed to _sns_inbound_logic.
    """
    incr_if_enabled("sns_inbound", 1)
    # Verify the signature before trusting any field of the body.
    body = verify_from_sns(json.loads(request.body))

    topic_arn = body.get("TopicArn", None)
    message_type = body.get("Type", None)
    problem = validate_sns_arn_and_type(topic_arn, message_type)
    if not problem:
        return _sns_inbound_logic(topic_arn, message_type, body)

    logger.error("validate_sns_arn_and_type_error", extra=problem)
    return HttpResponse(problem["error"], status=400)
359

360

361
def validate_sns_arn_and_type(
    topic_arn: str | None, message_type: str | None
) -> dict[str, Any] | None:
    """
    Validate Topic ARN and SNS Message Type.

    Checks run in order and the first failure wins: missing ARN, ARN not in
    settings.AWS_SNS_TOPIC, missing type, type not in SUPPORTED_SNS_TYPES.

    Returns None when no error is detected. Otherwise returns a dict of error
    details: the message, the shell-quoted received values, and the supported
    values.
    """

    def first_error() -> str | None:
        if not topic_arn:
            return "Received SNS request without Topic ARN."
        if topic_arn not in settings.AWS_SNS_TOPIC:
            return "Received SNS message for wrong topic."
        if not message_type:
            return "Received SNS request without Message Type."
        if message_type not in SUPPORTED_SNS_TYPES:
            return "Received SNS message for unsupported Type."
        return None

    error = first_error()
    if not error:
        return None

    # Received values are shell-quoted before being logged.
    return {
        "error": error,
        "received_topic_arn": shlex.quote(topic_arn) if topic_arn else topic_arn,
        "supported_topic_arn": sorted(settings.AWS_SNS_TOPIC),
        "received_sns_type": (
            shlex.quote(message_type) if message_type else message_type
        ),
        "supported_sns_types": SUPPORTED_SNS_TYPES,
    }
392

393

394
def _sns_inbound_logic(topic_arn, message_type, json_body):
    """
    Dispatch an already-validated SNS message by its Type.

    * Notification: counted, then handled by _sns_notification.
    * SubscriptionConfirmation: the SubscribeURL is logged; returns 200.
    * Anything else: logged, reported via sentry_sdk.capture_message, 400.

    ``topic_arn`` is accepted but not used here.
    """
    if message_type == "Notification":
        incr_if_enabled("sns_inbound_Notification", 1)
        return _sns_notification(json_body)

    if message_type == "SubscriptionConfirmation":
        info_logger.info(
            "SNS SubscriptionConfirmation",
            extra={"SubscribeURL": json_body["SubscribeURL"]},
        )
        return HttpResponse("Logged SubscribeURL", status=200)

    unhandled = "Received SNS message with type not handled in inbound log"
    logger.error(
        "SNS message type did not fall under the SNS inbound logic",
        extra={"message_type": shlex.quote(message_type)},
    )
    capture_message(unhandled, level="error", stack=True)
    return HttpResponse(unhandled, status=400)
417

418

419
def _sns_notification(json_body):
    """
    Handle an SNS Notification whose "Message" field holds SES event JSON.

    Returns 400 if Message is not valid JSON. Also returns 400 when neither its
    notificationType (Complaint, Received, Bounce) nor its eventType
    (Complaint, Bounce) is supported. Otherwise the event is processed by
    _sns_message, and the stored email is removed from S3 unless the
    response status is 5xx.
    """
    raw_message = json_body["Message"]
    try:
        message_json = json.loads(raw_message)
    except JSONDecodeError:
        logger.error(
            "SNS notification has non-JSON message body",
            extra={"content": shlex.quote(raw_message)},
        )
        return HttpResponse("Received SNS notification with non-JSON body", status=400)

    notification_type = message_json.get("notificationType")
    event_type = message_json.get("eventType")
    is_supported = notification_type in {
        "Complaint",
        "Received",
        "Bounce",
    } or event_type in {"Complaint", "Bounce"}
    if not is_supported:
        logger.error(
            "SNS notification for unsupported type",
            extra={
                "notification_type": shlex.quote(notification_type),
                "event_type": shlex.quote(event_type),
                "keys": [shlex.quote(key) for key in message_json.keys()],
            },
        )
        return HttpResponse(
            "Received SNS notification for unsupported Type: %s"
            % html.escape(shlex.quote(notification_type)),
            status=400,
        )

    response = _sns_message(message_json)
    bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
    # A 5xx lets SNS retry, so the S3 copy is kept in that case.
    if response.status_code < 500:
        remove_message_from_s3(bucket, object_key)
    return response
455

456

457
def _get_recipient_with_relay_domain(recipients):
    """
    Return the first recipient string that contains a Relay domain, or None.

    Domains are the values of get_domains_from_settings(). Matching is a plain
    substring test on the whole recipient string.
    """
    # NOTE(review): substring matching also accepts hosts that merely contain a
    # Relay domain; confirm downstream lookups reject such addresses.
    relay_domains = get_domains_from_settings().values()
    return next(
        (
            recipient
            for recipient in recipients
            if any(domain in recipient for domain in relay_domains)
        ),
        None,
    )
464

465

466
def _get_relay_recipient_from_message_json(message_json):
    """
    Find the recipient address that has a Relay domain.

    The "to" and "cc" common headers are searched first, in that order. A match
    there is reduced to its bare address with parseaddr. Bcc recipients only
    appear in receipt.recipients, which is searched last and returned as-is.
    Returns None if no recipient has a Relay domain.
    """
    common_headers = message_json["mail"]["commonHeaders"]
    for header_name in ("to", "cc"):
        if header_name not in common_headers:
            continue
        match = _get_recipient_with_relay_domain(common_headers[header_name])
        if match is not None:
            return parseaddr(match)[1]

    # SES-SNS sends bcc in a different part of the message
    return _get_recipient_with_relay_domain(message_json["receipt"]["recipients"])
482

483

484
def _sns_message(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Route an SES notification to the bounce, complaint or received handler.

    Either notificationType or eventType may name the kind; Bounce is checked
    before Complaint. Anything else must be a "Received" notification with no
    eventType, which is asserted.
    """
    incr_if_enabled("sns_inbound_Notification_Received", 1)
    notification_type = message_json.get("notificationType")
    event_type = message_json.get("eventType")
    kinds = (notification_type, event_type)

    if "Bounce" in kinds:
        return _handle_bounce(message_json)
    if "Complaint" in kinds:
        return _handle_complaint(message_json)

    assert notification_type == "Received" and event_type is None
    return _handle_received(message_json)
494

495

496
# Enumerate the reasons that an email was not forwarded.
# This excludes emails dropped due to mask forwarding settings,
# such as "block all" and "block promotional". Those are logged
# as Glean email_blocked events.
# Values marked "retryable" below are logged with can_retry=True by the
# email-forwarding code in this module.
EmailDroppedReason = Literal[
    "auto_block_spam",  # Email identified as spam, user has the auto_block_spam flag
    "dmarc_reject_failed",  # Email failed DMARC check with a reject policy
    "hard_bounce_pause",  # The user recently had a hard bounce
    "soft_bounce_pause",  # The user recently had a soft bounce
    "abuse_flag",  # The user exceeded an abuse limit, like mails forwarded
    "reply_requires_premium",  # The email is a reply from a free user
    "content_missing",  # Could not load the email from storage
    "error_from_header",  # Error generating the From: header, retryable
    "error_storage",  # Error fetching the email contents from storage (S3), retryable
    "error_sending",  # Error sending the forwarded email (SES), retryable
]
512

513

514
def log_email_dropped(
    reason: EmailDroppedReason,
    mask: RelayAddress | DomainAddress,
    is_reply: bool = False,
    can_retry: bool = False,
) -> None:
    """
    Log that an email was dropped for a reason other than a mask blocking setting.

    This mirrors the interface of glean_logger().log_email_blocked(), which
    records emails dropped due to the mask's blocking setting.

    Writes an "email_dropped" record to the "eventsinfo" logger. The FxA uid
    (when present) and the mask's metrics_id are included only when the
    owner's profile has metrics enabled.
    """
    extra: dict[str, str | int | bool] = {"reason": reason}
    profile = mask.user.profile
    if profile.metrics_enabled:
        # Identifiers are attached only for users who allow metrics.
        if profile.fxa is not None:
            extra["fxa_id"] = profile.fxa.uid
        extra["mask_id"] = mask.metrics_id
    extra.update(
        is_random_mask=isinstance(mask, RelayAddress),
        is_reply=is_reply,
        can_retry=can_retry,
    )
    info_logger.info("email_dropped", extra=extra)
537

538

539
def _handle_received(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES received notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/receiving-email-notifications-contents.html
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html

    Returns (may be incomplete):
    * 200 if the email was sent, the Relay address is disabled, the Relay user is
      flagged for abuse, the email is under a bounce pause, the email was suppressed
      for spam, the list email was blocked, or the noreply address was the recipient.
    * 400 if commonHeaders entry is missing, the From header yields no parseable
      address, the Relay recipient address is malformed, or the email failed
      DMARC with reject policy.
    * 403 if the email is a reply to a previously forwarded email and
      _reply_allowed() rejects it ("Relay replies require a premium account").
    * 404 if an S3-stored email was not found, no Relay address was found in the "To",
      "CC", or "BCC" fields, or the Relay address is not in the database.
    * 503 if the "From" address is malformed, the S3 client returned an error different
      from "not found", or the SES client fails

    And many other return conditions if the email is a reply (mail to the
    "replies" local part is handed to _handle_reply). The HTTP returns are an
    artifact from an earlier time when emails were sent to a webhook. Currently,
    production instead pulls events from a queue.

    TODO: Return a more appropriate status object
    TODO: Document the metrics emitted
    """
    mail = message_json["mail"]
    if "commonHeaders" not in mail:
        logger.error("SNS message without commonHeaders")
        return HttpResponse(
            "Received SNS notification without commonHeaders.", status=400
        )
    common_headers = mail["commonHeaders"]
    receipt = message_json["receipt"]

    # Verdict metrics are recorded at each processing stage, tagged by state.
    _record_receipt_verdicts(receipt, "all")
    to_address = _get_relay_recipient_from_message_json(message_json)
    if to_address is None:
        incr_if_enabled("no_relay_domain_in_recipient_fields", 1)
        return HttpResponse("Address does not exist", status=404)

    _record_receipt_verdicts(receipt, "relay_recipient")
    from_addresses = parse_email_header(common_headers["from"][0])
    if not from_addresses:
        info_logger.error(
            "_handle_received: no from address",
            extra={
                "source": mail["source"],
                "common_headers_from": common_headers["from"],
            },
        )
        return HttpResponse("Unable to parse From address", status=400)
    # parse_email_header appears to return (display name, address) pairs;
    # the address of the first entry is used. TODO confirm against utils.
    from_address = from_addresses[0][1]

    try:
        [to_local_portion, to_domain_portion] = to_address.split("@")
    except ValueError:
        # TODO: Add metric
        return HttpResponse("Malformed to field.", status=400)

    if to_local_portion.lower() == "noreply":
        incr_if_enabled("email_for_noreply_address", 1)
        return HttpResponse("noreply address is not supported.")
    try:
        # FIXME: this ambiguous return of either
        # RelayAddress or DomainAddress types makes the Rustacean in me throw
        # up a bit.
        address = _get_address(to_address)
        prefetch_related_objects([address.user], "socialaccount_set", "profile")
        user_profile = address.user.profile
    except (
        ObjectDoesNotExist,
        CannotMakeAddressException,
        DeletedAddress.MultipleObjectsReturned,
    ):
        # No usable mask for this address: mail to the "replies" local part is
        # handed to _handle_reply, anything else is rejected with 404.
        if to_local_portion.lower() == "replies":
            response = _handle_reply(from_address, message_json, to_address)
        else:
            response = HttpResponse("Address does not exist", status=404)
        return response

    _record_receipt_verdicts(receipt, "valid_user")
    # if this is spam and the user is set to auto-block spam, early return
    if user_profile.auto_block_spam and _get_verdict(receipt, "spam") == "FAIL":
        incr_if_enabled("email_auto_suppressed_for_spam", 1)
        log_email_dropped(reason="auto_block_spam", mask=address)
        return HttpResponse("Address rejects spam.")

    if _get_verdict(receipt, "dmarc") == "FAIL":
        policy = receipt.get("dmarcPolicy", "none")
        # TODO: determine action on dmarcPolicy "quarantine"
        # Only a "reject" policy drops the email; other policies fall through.
        if policy == "reject":
            log_email_dropped(reason="dmarc_reject_failed", mask=address)
            incr_if_enabled(
                "email_suppressed_for_dmarc_failure",
                tags=["dmarcPolicy:reject", "dmarcVerdict:FAIL"],
            )
            return HttpResponse("DMARC failure, policy is reject", status=400)

    # if this user is over bounce limits, early return
    bounce_paused, bounce_type = user_profile.check_bounce_pause()
    if bounce_paused:
        _record_receipt_verdicts(receipt, "user_bounce_paused")
        incr_if_enabled("email_suppressed_for_%s_bounce" % bounce_type, 1)
        reason: Literal["soft_bounce_pause", "hard_bounce_pause"] = (
            "soft_bounce_pause" if bounce_type == "soft" else "hard_bounce_pause"
        )
        log_email_dropped(reason=reason, mask=address)
        return HttpResponse("Address is temporarily disabled.")

    # check if this is a reply from an external sender to a Relay user
    try:
        (lookup_key, _) = _get_keys_from_headers(mail["headers"])
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
        user_address = address
        # From here on `address` is the mask stored on the Reply record, while
        # user_profile is still the profile of the mask the email was sent to.
        address = reply_record.address
        message_id = _get_message_id_from_headers(mail["headers"])
        # make sure the relay user is premium
        if not _reply_allowed(from_address, to_address, reply_record, message_id):
            log_email_dropped(reason="reply_requires_premium", mask=user_address)
            return HttpResponse("Relay replies require a premium account", status=403)
    except (ReplyHeadersNotFound, Reply.DoesNotExist):
        # if there's no In-Reply-To header, or the In-Reply-To value doesn't
        # match a Reply record, continue to treat this as a regular email from
        # an external sender to a relay user
        pass

    # if account flagged for abuse, early return
    if user_profile.is_flagged:
        log_email_dropped(reason="abuse_flag", mask=address)
        return HttpResponse("Address is temporarily disabled.")

    # if address is set to block, early return
    if not address.enabled:
        incr_if_enabled("email_for_disabled_address", 1)
        address.num_blocked += 1
        address.save(update_fields=["num_blocked"])
        _record_receipt_verdicts(receipt, "disabled_alias")
        user_profile.last_engagement = datetime.now(timezone.utc)
        user_profile.save()
        glean_logger().log_email_blocked(mask=address, reason="block_all")
        return HttpResponse("Address is temporarily disabled.")

    _record_receipt_verdicts(receipt, "active_alias")
    incr_if_enabled("email_for_active_address", 1)

    # if address is blocking list emails, and email is from list, early return
    if (
        address
        and address.block_list_emails
        and user_profile.has_premium
        and _check_email_from_list(mail["headers"])
    ):
        incr_if_enabled("list_email_for_address_blocking_lists", 1)
        address.num_blocked += 1
        address.save(update_fields=["num_blocked"])
        user_profile.last_engagement = datetime.now(timezone.utc)
        user_profile.save()
        glean_logger().log_email_blocked(mask=address, reason="block_promotional")
        return HttpResponse("Address is not accepting list emails.")

    # Collect new headers
    subject = common_headers.get("subject", "")
    destination_address = user_profile.user.email
    reply_address = get_reply_to_address()
    try:
        from_header = generate_from_header(from_address, to_address)
    except InvalidFromHeader:
        # TODO: MPP-3407, MPP-3417 - Determine how to handle these
        header_from = []
        for header in mail["headers"]:
            if header["name"].lower() == "from":
                header_from.append(header)
        info_logger.error(
            "generate_from_header",
            extra={
                "from_address": from_address,
                "source": mail["source"],
                "common_headers_from": common_headers["from"],
                "headers_from": header_from,
            },
        )
        log_email_dropped(reason="error_from_header", mask=address, can_retry=True)
        return HttpResponse("Cannot parse the From address", status=503)

    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": from_header,
        "To": destination_address,
        "Reply-To": reply_address,
        "Resent-From": from_address,
    }

    # Get incoming email
    # (transport and load_time_s are not used further in this function.)
    try:
        (incoming_email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
    except ClientError as e:
        if e.response["Error"].get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
            log_email_dropped(reason="content_missing", mask=address)
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
        log_email_dropped(reason="error_storage", mask=address, can_retry=True)
        # we are returning a 503 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    # Convert to new email
    sample_trackers = bool(sample_is_active("tracker_sample"))
    tracker_removal_flag = flag_is_active_in_task("tracker_removal", address.user)
    # Level-one trackers are removed only when both the waffle flag and the
    # user's preference are on.
    remove_level_one_trackers = bool(
        tracker_removal_flag and user_profile.remove_level_one_email_trackers
    )
    (
        forwarded_email,
        issues,
        level_one_trackers_removed,
        has_html,
        has_text,
    ) = _convert_to_forwarded_email(
        incoming_email_bytes=incoming_email_bytes,
        headers=headers,
        to_address=to_address,
        from_address=from_address,
        language=user_profile.language,
        has_premium=user_profile.has_premium,
        sample_trackers=sample_trackers,
        remove_level_one_trackers=remove_level_one_trackers,
    )
    if has_html:
        incr_if_enabled("email_with_html_content", 1)
    if has_text:
        incr_if_enabled("email_with_text_content", 1)
    if issues:
        info_logger.warning(
            "_handle_received: forwarding issues", extra={"issues": issues}
        )

    # Send new email
    try:
        ses_response = ses_send_raw_email(
            source_address=reply_address,
            destination_address=destination_address,
            message=forwarded_email,
        )
    except ClientError:
        # 503 service unavailable response to SNS so it can retry
        log_email_dropped(reason="error_sending", mask=address, can_retry=True)
        return HttpResponse("SES client error on Raw Email", status=503)

    # Reply metadata is stored under the SES message id of the forwarded email.
    message_id = ses_response["MessageId"]
    _store_reply_record(mail, message_id, address)

    user_profile.update_abuse_metric(
        email_forwarded=True, forwarded_email_size=len(incoming_email_bytes)
    )
    user_profile.last_engagement = datetime.now(timezone.utc)
    user_profile.save()
    address.num_forwarded += 1
    address.last_used_at = datetime.now(timezone.utc)
    if level_one_trackers_removed:
        address.num_level_one_trackers_blocked = (
            address.num_level_one_trackers_blocked or 0
        ) + level_one_trackers_removed
    address.save(
        update_fields=[
            "num_forwarded",
            "last_used_at",
            "block_list_emails",
            "num_level_one_trackers_blocked",
        ]
    )
    glean_logger().log_email_forwarded(mask=address, is_reply=False)
    return HttpResponse("Sent email to final recipient.", status=200)
813

814

815
def _get_verdict(receipt, verdict_type):
1✔
816
    return receipt["%sVerdict" % verdict_type]["status"]
1✔
817

818

819
def _check_email_from_list(headers):
1✔
820
    for header in headers:
1!
821
        if header["name"].lower().startswith("list-"):
1!
822
            return True
1✔
823
    return False
×
824

825

826
def _record_receipt_verdicts(receipt, state):
    """
    Emit metrics for the verdicts in an SES receipt.

    For every key ending in "Verdict" (in sorted key order), increments
    "relay.emails.verdicts.<key>" tagged with the state, and collects a
    "<key>:<status>" tag. A "dmarcPolicy" key contributes a "dmarcPolicy:<value>"
    tag. Finally increments "relay.emails.state.<state>" with all collected tags.
    """
    tags = []
    state_tag = f"state:{state}"
    for name in sorted(receipt):
        if name.endswith("Verdict"):
            status = receipt[name]["status"]
            tags.append(f"{name}:{status}")
            incr_if_enabled(f"relay.emails.verdicts.{name}", 1, [state_tag])
        elif name == "dmarcPolicy":
            tags.append(f"{name}:{receipt[name]}")
    incr_if_enabled(f"relay.emails.state.{state}", 1, tags)
837

838

839
def _get_message_id_from_headers(headers):
1✔
840
    message_id = None
1✔
841
    for header in headers:
1✔
842
        if header["name"].lower() == "message-id":
1✔
843
            message_id = header["value"]
1✔
844
    return message_id
1✔
845

846

847
def _get_keys_from_headers(headers):
    """
    Derive the reply (lookup_key, encryption_key) pair from threading headers.

    Headers are scanned in order, and the first In-Reply-To or References header
    found decides the result:
    * In-Reply-To: keys are derived from its message ID. This does not check that
      a matching Reply record exists.
    * References: each whitespace-separated message ID is tried in turn. The keys
      for the first one with a stored Reply record are returned. If none match,
      Reply.DoesNotExist is raised.

    Raises ReplyHeadersNotFound, after incrementing
    "mail_to_replies_without_reply_headers", when neither header is present.
    """
    for header in headers:
        header_name = header["name"].lower()
        if header_name == "in-reply-to":
            message_id_bytes = get_message_id_bytes(header["value"])
            return derive_reply_keys(message_id_bytes)

        if header_name == "references":
            # split() with no argument treats runs of spaces, tabs and folded line
            # breaks as one separator and never yields empty tokens. split(" ")
            # would glue tab/newline-separated IDs together and produce "" entries.
            for message_id in header["value"].split():
                message_id_bytes = get_message_id_bytes(message_id)
                lookup_key, encryption_key = derive_reply_keys(message_id_bytes)
                try:
                    # FIXME: calling code is likely to duplicate this query
                    _get_reply_record_from_lookup_key(lookup_key)
                    return lookup_key, encryption_key
                except Reply.DoesNotExist:
                    pass
            raise Reply.DoesNotExist
    incr_if_enabled("mail_to_replies_without_reply_headers", 1)
    raise ReplyHeadersNotFound
869

870

871
def _get_reply_record_from_lookup_key(lookup_key):
    """
    Fetch the Reply record stored under a derived lookup key.

    The key is encoded with b64_lookup_key to match the stored "lookup" field.
    Raises Reply.DoesNotExist when no record matches.
    """
    return Reply.objects.get(lookup=b64_lookup_key(lookup_key))
874

875

876
def _strip_localpart_tag(address):
1✔
877
    [localpart, domain] = address.split("@")
1✔
878
    subaddress_parts = localpart.split("+")
1✔
879
    return f"{subaddress_parts[0]}@{domain}"
1✔
880

881

882
# How the raw email content was delivered: inline in the SNS notification ("sns")
# or fetched from an S3 bucket ("s3"). Returned by _get_email_bytes.
_TransportType = Literal["sns", "s3"]
883

884

885
def _get_email_bytes(
    message_json: AWS_SNSMessageJSON,
) -> tuple[bytes, _TransportType, float]:
    """
    Load the raw email content referenced by an SES notification.

    If the notification has a "content" key, the email is inline in the SNS message
    and is returned UTF-8 encoded. Otherwise the email is assumed to be in S3 and is
    fetched using the bucket and key from the notification. S3 client errors (e.g.
    a botocore ClientError with code NoSuchKey) propagate to the caller.

    Also records the "relayed_email.size" histogram with the content length.

    Returns (message_content, transport, load_time_s):
    - message_content - the raw email bytes
    - transport - "sns" or "s3", see _TransportType
    - load_time_s - time spent loading, in seconds, rounded to 3 decimals
    """
    with Timer(logger=None) as load_timer:
        if "content" in message_json:
            # email content in sns message
            message_content = message_json["content"].encode("utf-8")
            transport: _TransportType = "sns"
        else:
            # assume email content in S3
            bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
            message_content = get_message_content_from_s3(bucket, object_key)
            transport = "s3"
        histogram_if_enabled("relayed_email.size", len(message_content))
    load_time_s = round(load_timer.last, 3)
    return (message_content, transport, load_time_s)
901

902

903
def _convert_to_forwarded_email(
    incoming_email_bytes: bytes,
    headers: OutgoingHeaders,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[EmailMessage, EmailForwardingIssues, int, bool, bool]:
    """
    Convert an email (as bytes) to a forwarded email.

    The headers are replaced with ``headers`` (see _replace_headers). The plain text
    body gets a Relay notice prepended, and the HTML body is wrapped in the Relay
    template. If there is no HTML body but there is text content, an HTML
    alternative is generated from the original text.

    ``now`` is passed through to _convert_html_content as the tracker report
    timestamp. When it is None, _convert_html_content uses the current UTC time.

    Return is a tuple:
    - email - The forwarded email
    - issues - Any detected issues in conversion
    - level_one_trackers_removed (int) - Number of trackers removed
    - has_html - True if the email has an HTML representation
    - has_text - True if the email has a plain text representation
    """
    email = message_from_bytes(incoming_email_bytes, policy=relay_policy)
    # python/typeshed issue 2418
    # The Python 3.2 default was Message, 3.6 uses policy.message_factory, and
    # policy.default.message_factory is EmailMessage
    assert isinstance(email, EmailMessage)

    # Replace headers in the original email
    header_issues = _replace_headers(email, headers)

    # Find and replace text content.
    # Compare with None: Message.__len__ counts headers, so a MIME part with no
    # headers (which defaults to text/plain) would be falsy and wrongly skipped.
    text_body = email.get_body("plain")
    text_content = None
    has_text = False
    if text_body is not None:
        has_text = True
        assert isinstance(text_body, EmailMessage)
        text_content = text_body.get_content()
        new_text_content = _convert_text_content(text_content, to_address)
        text_body.set_content(new_text_content)

    # Find and replace HTML content
    html_body = email.get_body("html")
    level_one_trackers_removed = 0
    has_html = False
    if html_body is not None:
        has_html = True
        assert isinstance(html_body, EmailMessage)
        html_content = html_body.get_content()
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
            now=now,
        )
        html_body.set_content(new_content, subtype="html")
    elif text_content:
        # Try to use the text content to generate HTML content. This uses the
        # original text, not the notice-prefixed text set on text_body above.
        html_content = urlize_and_linebreaks(text_content)
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
            now=now,
        )
        assert isinstance(text_body, EmailMessage)
        try:
            text_body.add_alternative(new_content, subtype="html")
        except TypeError as e:
            # Log the MIME structure to help diagnose why the alternative failed
            out = StringIO()
            _structure(email, fp=out)
            info_logger.error(
                "Adding HTML alternate failed",
                extra={"exception": str(e), "structure": out.getvalue()},
            )

    issues: EmailForwardingIssues = {}
    if header_issues:
        issues["headers"] = header_issues
    return (email, issues, level_one_trackers_removed, has_html, has_text)
989

990

991
def _replace_headers(
    email: EmailMessage, headers: OutgoingHeaders
) -> EmailHeaderIssues:
    """
    Replace the headers in email with new headers.

    This replaces headers in the passed email object, rather than returns an altered
    copy. The primary reason is that the Python email package can read an email with
    non-compliant headers or content, but can't write it. A read/write is required to
    create a copy that we then alter. This code instead alters the passed EmailMessage
    object, making header-specific changes in try / except statements.

    The other reason is the object size. An Email can be up to 10 MB, and we hope to
    support 40 MB emails someday. Modern servers may be OK with this, but it would be
    nice to handle the non-compliant headers without crashing before we add a source of
    memory-related crashes.

    Every incoming header is dropped except MIME-Version, Content-* headers, and
    headers whose (case-insensitive) name is a key of ``headers``. Then each entry
    of ``headers`` is written, replacing any existing header of that name.

    Returns a dict with up to two keys. Keys with no problems are omitted, so a
    clean email returns {}.
    - "incoming" - (header name, details) pairs for original headers that raised
      on read or were parsed with defects
    - "outgoing" - (header name, details) pairs for new headers that raised on
      write or read, or were parsed with defects
    """
    # Look for headers to drop
    to_drop: list[str] = []
    replacements: set[str] = {_k.lower() for _k in headers.keys()}
    issues: EmailHeaderIssues = defaultdict(list)

    # Detect non-compliant headers in incoming emails
    for header in email.keys():
        try:
            value = email[header]
        except Exception as e:
            # Broad catch: any failure to parse a header is recorded, not raised
            issues["incoming"].append((header, {"exception_on_read": repr(e)}))
            value = None
        if getattr(value, "defects", None):
            issues["incoming"].append(
                (
                    header,
                    {
                        "defect_count": len(value.defects),
                        "parsed_value": str(value),
                        "unstructured_value": str(value.as_unstructured),
                    },
                )
            )

    # Collect headers that will not be forwarded
    for header in email.keys():
        header_lower = header.lower()
        if (
            header_lower not in replacements
            and header_lower != "mime-version"
            and not header_lower.startswith("content-")
        ):
            to_drop.append(header)

    # Drop headers that should be dropped
    # (deleting removes every occurrence of the name, so duplicates are harmless)
    for header in to_drop:
        del email[header]

    # Replace the requested headers
    for header, value in headers.items():
        # Assigning to email[header] appends rather than replaces, so delete first
        del email[header]
        try:
            email[header] = value
        except Exception as e:
            issues["outgoing"].append(
                (header, {"exception_on_write": repr(e), "value": value})
            )
            continue
        try:
            # Re-read the header to check how the policy parsed the new value
            parsed_value = email[header]
        except Exception as e:
            issues["outgoing"].append((header, {"exception_on_read": repr(e)}))
            continue
        if parsed_value.defects:
            issues["outgoing"].append(
                (
                    header,
                    {
                        "defect_count": len(parsed_value.defects),
                        "parsed_value": str(parsed_value),
                        "unstructured_value": str(parsed_value.as_unstructured),
                    },
                )
            )

    # Convert the defaultdict to a plain dict
    return dict(issues)
1074

1075

1076
def _convert_html_content(
    html_content: str,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[str, int]:
    """
    Wrap HTML email content in the Relay template, optionally removing trackers.

    Args:
    - html_content - the original HTML
    - to_address - the Relay mask, displayed in the wrapper
    - from_address - the sender, recorded in the tracker report
    - language, has_premium - passed through to wrap_html_email
    - sample_trackers - if True, count all trackers in the original HTML
    - remove_level_one_trackers - if True, strip level-one trackers and build a
      tracker report link
    - now - time the email was received, defaults to the current UTC time

    Returns (wrapped_html, removed_count). removed_count is the number of
    level-one trackers removed, or 0 when removal is disabled.
    """
    # The tracker report (frontend) expects a timestamp in milliseconds
    received_at = now or datetime.now(timezone.utc)
    received_at_ms = int(received_at.timestamp() * 1000)

    # Put the mask's separators in <span>s so mail clients don't recognize the
    # address and apply their default link styles
    masked_display = re.sub("([@.:])", r"<span>\1</span>", to_address)

    if sample_trackers:
        count_all_trackers(html_content)

    if not remove_level_one_trackers:
        report_link = ""
        trackers_removed = 0
    else:
        html_content, details = remove_trackers(
            html_content, from_address, received_at_ms
        )
        trackers_removed = details["tracker_removed"]
        report_payload = json.dumps(
            {
                "sender": from_address,
                "received_at": received_at_ms,
                "trackers": details["level_one"]["trackers"],
            }
        )
        report_link = f"{settings.SITE_ORIGIN}/tracker-report/#{report_payload}"

    wrapped = wrap_html_email(
        original_html=html_content,
        language=language,
        has_premium=has_premium,
        display_email=masked_display,
        tracker_report_link=report_link,
        num_level_one_email_trackers_removed=trackers_removed,
    )
    return wrapped, trackers_removed
1123

1124

1125
def _convert_text_content(text_content: str, to_address: str) -> str:
1✔
1126
    relay_header_text = (
1✔
1127
        "This email was sent to your alias "
1128
        f"{to_address}. To stop receiving emails sent to this alias, "
1129
        "update the forwarding settings in your dashboard.\n"
1130
        "---Begin Email---\n"
1131
    )
1132
    wrapped_text = relay_header_text + text_content
1✔
1133
    return wrapped_text
1✔
1134

1135

1136
def _build_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> EmailMessage:
    """
    Build the notice sent to a free user who tried to reply through Relay.

    Args:
    - from_address - the Relay user's address; the notice is sent To this address
    - reply_record - the Reply being answered; its profile's forwarded_first_reply
      decides whether the template says this reply was forwarded
    - message_id - the attempted reply's Message-ID, used for In-Reply-To /
      References so the notice threads with it (omitted when falsy)
    - decrypted_metadata - the Reply's metadata; its "reply-to" (or else "from")
      value is shown as the sender, or "" if neither is present

    Returns a text/plain + text/html EmailMessage.
    """
    # _reply_allowed forwards the first reply of a user who has not had one
    # forwarded yet, so the template tells them it was forwarded
    already_forwarded = reply_record.address.user.profile.forwarded_first_reply
    metadata = decrypted_metadata if decrypted_metadata is not None else {}
    original_sender = metadata.get("reply-to") or metadata.get("from")
    context = {
        "sender": original_sender or "",
        "forwarded": not already_forwarded,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    html_part = render_to_string("emails/reply_requires_premium.html", context)
    text_part = render_to_string("emails/reply_requires_premium.txt", context)

    notice = EmailMessage()
    notice["Subject"] = ftl_bundle.format("replies-not-included-in-free-account-header")
    notice["From"] = get_reply_to_address()
    notice["To"] = from_address
    if message_id:
        notice["In-Reply-To"] = message_id
        notice["References"] = message_id
    notice.set_content(text_part)
    notice.add_alternative(html_part, subtype="html")
    return notice
1167

1168

1169
def _set_forwarded_first_reply(profile):
1✔
1170
    profile.forwarded_first_reply = True
1✔
1171
    profile.save()
1✔
1172

1173

1174
def _send_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> None:
    """
    Email a free Relay user that replies require a premium account.

    On a successful send, the user's profile is marked as having had its first
    reply forwarded (see _set_forwarded_first_reply). An SES ClientError is logged
    as "reply_not_allowed_ses_client_error" and not re-raised. In every case the
    "free_user_reply_attempt" counter is incremented.
    """
    notice = _build_reply_requires_premium_email(
        from_address, reply_record, message_id, decrypted_metadata
    )
    try:
        ses_send_raw_email(
            source_address=get_reply_to_address(premium=False),
            destination_address=from_address,
            message=notice,
        )
        # _reply_allowed forwards the first reply of a user who hasn't had one
        # forwarded yet, so record that the allowance is now used
        _set_forwarded_first_reply(reply_record.address.user.profile)
    except ClientError as err:
        error_details = err.response["Error"]
        logger.error("reply_not_allowed_ses_client_error", extra=error_details)
    incr_if_enabled("free_user_reply_attempt", 1)
1195

1196

1197
def _reply_allowed(
    from_address, to_address, reply_record, message_id=None, decrypted_metadata=None
):
    """
    Decide whether a reply email may be forwarded.

    Case 1 - the From address is the mask owner's email (compared both exactly and
    with any "+tag" removed from the local part):
    * returns False if the reply record's profile is flagged
    * returns True if the owner has premium
    * otherwise sends the "replies require premium" notice, and returns True only
      if this user has not yet had a first reply forwarded

    Case 2 - the From address is someone else:
    * returns True if to_address resolves (via _get_address) to a mask whose owner
      has premium
    * returns False if _get_address raises ObjectDoesNotExist
    * otherwise increments "free_user_reply_attempt" and returns False

    Note: _get_address can create a DomainAddress as a side effect. Exceptions
    other than ObjectDoesNotExist propagate.
    """
    stripped_from_address = _strip_localpart_tag(from_address)
    reply_record_email = reply_record.address.user.email
    stripped_reply_record_address = _strip_localpart_tag(reply_record_email)
    if (from_address == reply_record_email) or (
        stripped_from_address == stripped_reply_record_address
    ):
        # This is a Relay user replying to an external sender;

        if reply_record.profile.is_flagged:
            return False

        if reply_record.owner_has_premium:
            return True

        # if we haven't forwarded a first reply for this user, return True to allow
        # this first reply.
        # Order matters: read the flag BEFORE sending the notice, because a
        # successful send sets forwarded_first_reply on the profile.
        allow_first_reply = not reply_record.address.user.profile.forwarded_first_reply
        _send_reply_requires_premium_email(
            from_address, reply_record, message_id, decrypted_metadata
        )
        return allow_first_reply
    else:
        # The From: is not a Relay user, so make sure this is a reply *TO* a
        # premium Relay user
        try:
            address = _get_address(to_address)
            if address.user.profile.has_premium:
                return True
        except ObjectDoesNotExist:
            return False
    # Reached only from the else branch, when the mask owner is not premium
    incr_if_enabled("free_user_reply_attempt", 1)
    return False
1232

1233

1234
def _handle_reply(
    from_address: str, message_json: AWS_SNSMessageJSON, to_address: str
) -> HttpResponse:
    """
    Handle a reply from a Relay user to an external email.

    Returns (may be incomplete):
    * 200 if the reply was sent
    * 400 if the In-Reply-To and References headers are missing, or the SES client
      raises an error while sending the reply
    * 403 if the Relay user is not allowed to reply
    * 404 if the S3-stored email is not found, or there is no matching Reply record in
      the database
    * 503 if the S3 client returns an error other than not found (so SNS can retry)

    NOTE(review): when a References header is present but none of its message IDs
    match a Reply record, _get_keys_from_headers raises Reply.DoesNotExist. That
    exception does not appear to be caught here, so it propagates to the caller.
    Confirm the caller handles it.

    TODO: Return a more appropriate status object (see _handle_received)
    TODO: Document metrics emitted
    """
    mail = message_json["mail"]
    try:
        (lookup_key, encryption_key) = _get_keys_from_headers(mail["headers"])
    except ReplyHeadersNotFound:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-header"])
        return HttpResponse("No In-Reply-To header", status=400)

    try:
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
    except Reply.DoesNotExist:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-reply-record"])
        return HttpResponse("Unknown or stale In-Reply-To header", status=404)

    address = reply_record.address
    message_id = _get_message_id_from_headers(mail["headers"])
    decrypted_metadata = json.loads(
        decrypt_reply_metadata(encryption_key, reply_record.encrypted_metadata)
    )
    if not _reply_allowed(
        from_address, to_address, reply_record, message_id, decrypted_metadata
    ):
        log_email_dropped(reason="reply_requires_premium", mask=address, is_reply=True)
        return HttpResponse("Relay replies require a premium account", status=403)

    outbound_from_address = address.full_address
    incr_if_enabled("reply_email", 1)
    subject = mail["commonHeaders"].get("subject", "")
    # to_address is rebound here: from now on it is the external recipient taken
    # from the decrypted metadata, not the mask address passed in
    to_address = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": outbound_from_address,
        "To": to_address,
        "Reply-To": outbound_from_address,
    }

    try:
        (email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
    except ClientError as e:
        if e.response["Error"].get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
            log_email_dropped(reason="content_missing", mask=address, is_reply=True)
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
        log_email_dropped(
            reason="error_storage", mask=address, is_reply=True, can_retry=True
        )
        # we are returning a 503 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    email = message_from_bytes(email_bytes, policy=relay_policy)
    assert isinstance(email, EmailMessage)

    # Convert to a reply email
    # TODO: Issue #1747 - Remove wrapper / prefix in replies
    # (the header issues returned by _replace_headers are not used here)
    _replace_headers(email, headers)

    try:
        ses_send_raw_email(
            source_address=outbound_from_address,
            destination_address=to_address,
            message=email,
        )
    except ClientError:
        # Unlike the S3 failure above, this is a 400 and is not marked retryable
        log_email_dropped(reason="error_sending", mask=address, is_reply=True)
        return HttpResponse("SES client error", status=400)

    reply_record.increment_num_replied()
    profile = address.user.profile
    profile.update_abuse_metric(replied=True)
    profile.last_engagement = datetime.now(timezone.utc)
    profile.save()
    glean_logger().log_email_forwarded(mask=address, is_reply=True)
    return HttpResponse("Sent email to final recipient.", status=200)
1326

1327

1328
def _get_domain_address(local_portion: str, domain_portion: str) -> DomainAddress:
    """
    Find or create the DomainAddress for the parts of an email address.

    If the domain_portion is for a valid subdomain, a new DomainAddress
    will be created and returned.

    If the domain_portion is for an unknown domain (including a domain with no
    "."), ObjectDoesNotExist is raised and "email_for_not_supported_domain" is
    incremented.

    If the domain_portion is for an unclaimed subdomain, Profile.DoesNotExist is
    raised and "email_for_dne_subdomain" is incremented.

    The returned DomainAddress has last_used_at set to now and is saved.
    """
    # partition() instead of split(".", 1): a domain without "." would make the
    # two-name unpacking raise ValueError rather than the documented
    # ObjectDoesNotExist. With partition the domain part is "" and fails below.
    address_subdomain, _, address_domain = domain_portion.partition(".")
    if address_domain != get_domains_from_settings()["MOZMAIL_DOMAIN"]:
        incr_if_enabled("email_for_not_supported_domain", 1)
        raise ObjectDoesNotExist("Address does not exist")
    try:
        with transaction.atomic():
            # select_for_update locks the profile row for this transaction,
            # presumably so concurrent deliveries don't race to create the address
            locked_profile = Profile.objects.select_for_update().get(
                subdomain=address_subdomain
            )
            domain_numerical = get_domain_numerical(address_domain)
            # filter DomainAddress because it may not exist
            # which will throw an error with get()
            domain_address = DomainAddress.objects.filter(
                user=locked_profile.user, address=local_portion, domain=domain_numerical
            ).first()
            if domain_address is None:
                # TODO: Consider flows when a user generating alias on a fly
                # was unable to receive an email due to user no longer being a
                # premium user as seen in exception thrown on make_domain_address
                domain_address = DomainAddress.make_domain_address(
                    locked_profile, local_portion, True
                )
                glean_logger().log_email_mask_created(
                    mask=domain_address,
                    created_by_api=False,
                )
            domain_address.last_used_at = datetime.now(timezone.utc)
            domain_address.save()
            return domain_address
    except Profile.DoesNotExist:
        incr_if_enabled("email_for_dne_subdomain", 1)
        raise
1372

1373

1374
def _get_address(address: str) -> RelayAddress | DomainAddress:
    """
    Look up (or create) the mask for a full email address.

    Both parts of the address are lowercased. Addresses on a domain outside the
    site's configured email domains are handled by _get_domain_address, which can
    create a new DomainAddress for a claimed subdomain. Otherwise the matching
    RelayAddress is returned.

    Failures raise exceptions based on Django's ObjectDoesNotExist:
    * RelayAddress.DoesNotExist - looks like a RelayAddress, deleted or never existed
    * Profile.DoesNotExist - looks like a DomainAddress, but no subdomain matches
    * ObjectDoesNotExist - unknown domain
    """
    local_part, domain_part = address.split("@")
    local_address = local_part.lower()
    domain = domain_part.lower()

    # Not one of the site's top-level relay domains: maybe a user's subdomain
    if domain not in get_domains_from_settings().values():
        return _get_domain_address(local_address, domain)

    try:
        return RelayAddress.objects.get(
            address=local_address, domain=get_domain_numerical(domain)
        )
    except RelayAddress.DoesNotExist as not_found:
        # Record why the lookup failed before re-raising
        try:
            DeletedAddress.objects.get(
                address_hash=address_hash(local_address, domain=domain)
            )
        except DeletedAddress.DoesNotExist:
            incr_if_enabled("email_for_unknown_address", 1)
        except DeletedAddress.MultipleObjectsReturned:
            # not sure why this happens on stage but let's handle it
            incr_if_enabled("email_for_deleted_address_multiple", 1)
        else:
            incr_if_enabled("email_for_deleted_address", 1)
            # TODO: create a hard bounce receipt rule in SES
        raise not_found
1417

1418

1419
def _handle_bounce(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES bounce notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#bounce-object

    The bounce data is deep-copied before recipient fields are popped, so the
    caller's message_json is not modified. This matches _handle_complaint.

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_bounce" with these tags:
    * bounce_type: 'Permanent', 'Transient', 'Undetermined' (as sent by SES), or
      'none' if omitted
    * bounce_subtype: 'Undetermined', 'General', etc., 'none' if omitted
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam', 'hard_bounce', 'soft_bounce'

    Emits an info log "bounce_notification", same data as metric, plus:
    * bounce_action: 'action' from bounced recipient data, or ""
    * bounce_status: 'status' from bounced recipient data, or ""
    * bounce_diagnostic: 'diagnosticCode' from bounced recipient data, or ""
    * bounce_extra: Extra data from bounce_recipient data, if any
    * domain: User's real email address domain, if an address was given
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the
      user, or "" if the user has no account or metrics are disabled
    """
    # Copy so that popping recipient fields below does not mutate message_json
    bounce = deepcopy(message_json.get("bounce", {}))
    bounce_type = bounce.get("bounceType", "none")
    bounce_subtype = bounce.get("bounceSubType", "none")
    bounced_recipients = bounce.get("bouncedRecipients", [])

    now = datetime.now(timezone.utc)
    bounce_data = []
    for recipient in bounced_recipients:
        recipient_address = recipient.pop("emailAddress", None)
        data = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "bounce_action": recipient.pop("action", ""),
            "bounce_status": recipient.pop("status", ""),
            "bounce_diagnostic": recipient.pop("diagnosticCode", ""),
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        # Anything left after popping the known keys is logged as extra data
        if recipient:
            data["bounce_extra"] = recipient.copy()
        bounce_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        recipient_domain = recipient_address.split("@")[1]
        data["domain"] = recipient_domain

        try:
            user = User.objects.get(email=recipient_address)
            profile = user.profile
            data["user_match"] = "found"
            if (fxa := profile.fxa) and profile.metrics_enabled:
                data["fxa_id"] = fxa.uid
            else:
                data["fxa_id"] = ""
        except User.DoesNotExist:
            # TODO: handle bounce for a user who no longer exists
            # add to SES account-wide suppression list?
            data["user_match"] = "missing"
            continue

        action = None
        if "spam" in data["bounce_diagnostic"].lower():
            # if an email bounced as spam, set to auto block spam for this user
            # and DON'T set them into bounce pause state
            action = "auto_block_spam"
            profile.auto_block_spam = True
        elif bounce_type == "Permanent":
            # TODO: handle sub-types: 'General', 'NoEmail', etc.
            action = "hard_bounce"
            profile.last_hard_bounce = now
        elif bounce_type == "Transient":
            # TODO: handle sub-types: 'MessageTooLarge', 'AttachmentRejected', etc.
            action = "soft_bounce"
            profile.last_soft_bounce = now
        if action:
            data["relay_action"] = action
            profile.save()

    if not bounce_data:
        # Data when there are no identified recipients
        bounce_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    for data in bounce_data:
        tags = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_bounce",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("bounce_notification", extra=data)

    if any(data["user_match"] == "missing" for data in bounce_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)
1526

1527

1528
def _handle_complaint(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES complaint notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object

    Every complained recipient whose address matches a user's email has
    ``auto_block_spam`` turned on for that user's profile.

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_complaint" with these tags:
    * complaint_subtype: 'onaccountsuppressionlist', or 'none' if omitted
    * complaint_feedback - feedback enumeration from ISP or 'none'
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam'

    Emits an info log "complaint_notification", same data as metric, plus:
    * complaint_user_agent - identifies the client used to file the complaint
    * complaint_extra - Extra data from complainedRecipients data, if any
    * domain - User's domain, if an address with a domain was given
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of
      the user, or "" if the user has no account linked or metrics disabled.
      Only present when user_match is 'found'.
    """
    # Work on a copy so the pops below do not mutate the caller's message.
    complaint = deepcopy(message_json.get("complaint", {}))
    complained_recipients = complaint.pop("complainedRecipients", [])
    subtype = complaint.pop("complaintSubType", None)
    user_agent = complaint.pop("userAgent", None)
    feedback = complaint.pop("complaintFeedbackType", None)

    complaint_data: list[dict[str, Any]] = []
    for recipient in complained_recipients:
        recipient_address = recipient.pop("emailAddress", None)
        data: dict[str, Any] = {
            "complaint_subtype": subtype,
            "complaint_user_agent": user_agent,
            "complaint_feedback": feedback,
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        # Anything left after popping emailAddress is logged as extra data.
        if recipient:
            data["complaint_extra"] = recipient.copy()
        complaint_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        # rpartition: the domain follows the LAST "@", which stays correct
        # for quoted local parts such as '"a@b"@example.com'.
        _, at_sign, recipient_domain = recipient_address.rpartition("@")
        if not at_sign:
            # Empty or malformed address with no domain: keep "no_address"
            # rather than failing the whole notification with IndexError.
            continue
        data["domain"] = recipient_domain

        try:
            user = User.objects.get(email=recipient_address)
        except User.DoesNotExist:
            data["user_match"] = "missing"
            continue

        profile = user.profile
        data["user_match"] = "found"
        # Only log the account ID for users who allow metrics.
        if (fxa := profile.fxa) and profile.metrics_enabled:
            data["fxa_id"] = fxa.uid
        else:
            data["fxa_id"] = ""

        data["relay_action"] = "auto_block_spam"
        profile.auto_block_spam = True
        profile.save()

    if not complaint_data:
        # Data when there are no identified recipients
        complaint_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    for data in complaint_data:
        tags = {
            "complaint_subtype": subtype or "none",
            "complaint_feedback": feedback or "none",
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_complaint",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("complaint_notification", extra=data)

    if any(data["user_match"] == "missing" for data in complaint_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc