• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 5a575d84-99cc-4dcf-af84-811569dbe7be

16 Jul 2024 07:07PM CUT coverage: 85.432% (+0.007%) from 85.425%
5a575d84-99cc-4dcf-af84-811569dbe7be

push

circleci

web-flow
Merge pull request #4881 from mozilla/update-header-logging-mpp4841

MPP-3839: Update email header logging

4093 of 5240 branches covered (78.11%)

Branch coverage included in aggregate %.

25 of 28 new or added lines in 4 files covered. (89.29%)

2 existing lines in 2 files now uncovered.

15934 of 18202 relevant lines covered (87.54%)

10.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.45
/emails/views.py
1
import html
1✔
2
import json
1✔
3
import logging
1✔
4
import re
1✔
5
import shlex
1✔
6
from copy import deepcopy
1✔
7
from datetime import UTC, datetime
1✔
8
from email import message_from_bytes
1✔
9
from email.iterators import _structure
1✔
10
from email.message import EmailMessage
1✔
11
from email.utils import parseaddr
1✔
12
from io import StringIO
1✔
13
from json import JSONDecodeError
1✔
14
from textwrap import dedent
1✔
15
from typing import Any, Literal
1✔
16
from urllib.parse import urlencode
1✔
17

18
from django.conf import settings
1✔
19
from django.contrib.auth.models import User
1✔
20
from django.core.exceptions import ObjectDoesNotExist
1✔
21
from django.db import transaction
1✔
22
from django.db.models import prefetch_related_objects
1✔
23
from django.http import HttpRequest, HttpResponse
1✔
24
from django.shortcuts import render
1✔
25
from django.template.loader import render_to_string
1✔
26
from django.utils.html import escape
1✔
27
from django.views.decorators.csrf import csrf_exempt
1✔
28

29
from botocore.exceptions import ClientError
1✔
30
from codetiming import Timer
1✔
31
from decouple import strtobool
1✔
32
from markus.utils import generate_tag
1✔
33
from sentry_sdk import capture_message
1✔
34
from waffle import sample_is_active
1✔
35

36
from privaterelay.ftl_bundles import main as ftl_bundle
1✔
37
from privaterelay.utils import (
1✔
38
    flag_is_active_in_task,
39
    get_subplat_upgrade_link_by_language,
40
    glean_logger,
41
)
42

43
from .exceptions import CannotMakeAddressException
1✔
44
from .models import (
1✔
45
    DeletedAddress,
46
    DomainAddress,
47
    Profile,
48
    RelayAddress,
49
    Reply,
50
    address_hash,
51
    get_domain_numerical,
52
)
53
from .policy import relay_policy
1✔
54
from .sns import SUPPORTED_SNS_TYPES, verify_from_sns
1✔
55
from .types import (
1✔
56
    AWS_MailJSON,
57
    AWS_SNSMessageJSON,
58
    EmailForwardingIssues,
59
    EmailHeaderIssues,
60
    OutgoingHeaders,
61
)
62
from .utils import (
1✔
63
    InvalidFromHeader,
64
    _get_bucket_and_key_from_s3_json,
65
    b64_lookup_key,
66
    count_all_trackers,
67
    decrypt_reply_metadata,
68
    derive_reply_keys,
69
    encrypt_reply_metadata,
70
    generate_from_header,
71
    get_domains_from_settings,
72
    get_message_content_from_s3,
73
    get_message_id_bytes,
74
    get_reply_to_address,
75
    histogram_if_enabled,
76
    incr_if_enabled,
77
    parse_email_header,
78
    remove_message_from_s3,
79
    remove_trackers,
80
    ses_send_raw_email,
81
    urlize_and_linebreaks,
82
)
83

84
logger = logging.getLogger("events")
1✔
85
info_logger = logging.getLogger("eventsinfo")
1✔
86

87

88
class ReplyHeadersNotFound(Exception):
1✔
89
    def __init__(self, message="No In-Reply-To or References headers."):
1✔
90
        self.message = message
1✔
91

92

93
def first_time_user_test(request):
1✔
94
    """
95
    Demonstrate rendering of the "First time Relay user" email.
96
    Settings like language can be given in the querystring, otherwise settings
97
    come from a random free profile.
98
    """
99
    in_bundle_country = strtobool(request.GET.get("in_bundle_country", "yes"))
×
100
    email_context = {
×
101
        "in_bundle_country": in_bundle_country,
102
        "SITE_ORIGIN": settings.SITE_ORIGIN,
103
    }
104
    if request.GET.get("format", "html") == "text":
×
105
        return render(
×
106
            request,
107
            "emails/first_time_user.txt",
108
            email_context,
109
            "text/plain; charset=utf-8",
110
        )
111
    return render(request, "emails/first_time_user.html", email_context)
×
112

113

114
def reply_requires_premium_test(request):
1✔
115
    """
116
    Demonstrate rendering of the "Reply requires premium" email.
117

118
    Settings like language can be given in the querystring, otherwise settings
119
    come from a random free profile.
120
    """
121
    email_context = {
1✔
122
        "sender": "test@example.com",
123
        "forwarded": True,
124
        "SITE_ORIGIN": settings.SITE_ORIGIN,
125
    }
126
    for param in request.GET:
1✔
127
        email_context[param] = request.GET.get(param)
1✔
128
        if param == "forwarded" and request.GET[param] == "True":
1✔
129
            email_context[param] = True
1✔
130

131
    for param in request.GET:
1✔
132
        if param == "content-type" and request.GET[param] == "text/plain":
1✔
133
            return render(
1✔
134
                request,
135
                "emails/reply_requires_premium.txt",
136
                email_context,
137
                "text/plain; charset=utf-8",
138
            )
139
    return render(request, "emails/reply_requires_premium.html", email_context)
1✔
140

141

142
def first_forwarded_email_test(request: HttpRequest) -> HttpResponse:
1✔
143
    # TO DO: Update with correct context when trigger is created
144
    first_forwarded_email_html = render_to_string(
×
145
        "emails/first_forwarded_email.html",
146
        {
147
            "SITE_ORIGIN": settings.SITE_ORIGIN,
148
        },
149
    )
150

151
    wrapped_email = wrap_html_email(
×
152
        first_forwarded_email_html,
153
        "en-us",
154
        True,
155
        "test@example.com",
156
        0,
157
    )
158

159
    return HttpResponse(wrapped_email)
×
160

161

162
def wrap_html_email(
1✔
163
    original_html: str,
164
    language: str,
165
    has_premium: bool,
166
    display_email: str,
167
    num_level_one_email_trackers_removed: int | None = None,
168
    tracker_report_link: str | None = None,
169
) -> str:
170
    """Add Relay banners, surveys, etc. to an HTML email"""
171
    subplat_upgrade_link = get_subplat_upgrade_link_by_language(language)
1✔
172
    email_context = {
1✔
173
        "original_html": original_html,
174
        "language": language,
175
        "has_premium": has_premium,
176
        "subplat_upgrade_link": subplat_upgrade_link,
177
        "display_email": display_email,
178
        "tracker_report_link": tracker_report_link,
179
        "num_level_one_email_trackers_removed": num_level_one_email_trackers_removed,
180
        "SITE_ORIGIN": settings.SITE_ORIGIN,
181
    }
182
    content = render_to_string("emails/wrapped_email.html", email_context)
1✔
183
    # Remove empty lines
184
    content_lines = [line for line in content.splitlines() if line.strip()]
1✔
185
    return "\n".join(content_lines) + "\n"
1✔
186

187

188
def wrapped_email_test(request: HttpRequest) -> HttpResponse:
1✔
189
    """
190
    Demonstrate rendering of forwarded HTML emails.
191

192
    Settings like language can be given in the querystring, otherwise settings
193
    come from a randomly chosen profile.
194
    """
195

196
    if all(key in request.GET for key in ("language", "has_premium")):
1✔
197
        user_profile = None
1✔
198
    else:
199
        user_profile = Profile.objects.order_by("?").first()
1✔
200

201
    if "language" in request.GET:
1✔
202
        language = request.GET["language"]
1✔
203
    else:
204
        if user_profile is None:
1!
205
            raise ValueError("user_profile must not be None")
×
206
        language = user_profile.language
1✔
207

208
    if "has_premium" in request.GET:
1✔
209
        has_premium = strtobool(request.GET["has_premium"])
1✔
210
    else:
211
        if user_profile is None:
1!
212
            raise ValueError("user_profile must not be None")
×
213
        has_premium = user_profile.has_premium
1✔
214

215
    if "num_level_one_email_trackers_removed" in request.GET:
1✔
216
        num_level_one_email_trackers_removed = int(
1✔
217
            request.GET["num_level_one_email_trackers_removed"]
218
        )
219
    else:
220
        num_level_one_email_trackers_removed = 0
1✔
221

222
    if "has_tracker_report_link" in request.GET:
1✔
223
        has_tracker_report_link = strtobool(request.GET["has_tracker_report_link"])
1✔
224
    else:
225
        has_tracker_report_link = False
1✔
226
    if has_tracker_report_link:
1✔
227
        if num_level_one_email_trackers_removed:
1✔
228
            trackers = {
1✔
229
                "fake-tracker.example.com": num_level_one_email_trackers_removed
230
            }
231
        else:
232
            trackers = {}
1✔
233
        tracker_report_link = (
1✔
234
            "/tracker-report/#{"
235
            '"sender": "sender@example.com", '
236
            '"received_at": 1658434657, '
237
            f'"trackers": { json.dumps(trackers) }'
238
            "}"
239
        )
240
    else:
241
        tracker_report_link = ""
1✔
242

243
    path = "/emails/wrapped_email_test"
1✔
244
    old_query = {
1✔
245
        "language": language,
246
        "has_premium": "Yes" if has_premium else "No",
247
        "has_tracker_report_link": "Yes" if has_tracker_report_link else "No",
248
        "num_level_one_email_trackers_removed": str(
249
            num_level_one_email_trackers_removed
250
        ),
251
    }
252

253
    def switch_link(key, value):
1✔
254
        if old_query[key] == value:
1✔
255
            return str(value)
1✔
256
        new_query = old_query.copy()
1✔
257
        new_query[key] = value
1✔
258
        return f'<a href="{path}?{urlencode(new_query)}">{value}</a>'
1✔
259

260
    html_content = dedent(
1✔
261
        f"""\
262
    <p>
263
      <strong>Email rendering Test</strong>
264
    </p>
265
    <p>Settings: (<a href="{path}">clear all</a>)</p>
266
    <ul>
267
      <li>
268
        <strong>language</strong>:
269
        {escape(language)}
270
        (switch to
271
        {switch_link("language", "en-us")},
272
        {switch_link("language", "de")},
273
        {switch_link("language", "en-gb")},
274
        {switch_link("language", "fr")},
275
        {switch_link("language", "ru-ru")},
276
        {switch_link("language", "es-es")},
277
        {switch_link("language", "pt-br")},
278
        {switch_link("language", "it-it")},
279
        {switch_link("language", "en-ca")},
280
        {switch_link("language", "de-de")},
281
        {switch_link("language", "es-mx")})
282
      </li>
283
      <li>
284
        <strong>has_premium</strong>:
285
        {"Yes" if has_premium else "No"}
286
        (switch to
287
        {switch_link("has_premium", "Yes")},
288
        {switch_link("has_premium", "No")})
289
      </li>
290
      <li>
291
        <strong>has_tracker_report_link</strong>:
292
        {"Yes" if has_tracker_report_link else "No"}
293
        (switch to
294
        {switch_link("has_tracker_report_link", "Yes")},
295
        {switch_link("has_tracker_report_link", "No")})
296
      </li>
297
      <li>
298
        <strong>num_level_one_email_trackers_removed</strong>:
299
        {num_level_one_email_trackers_removed}
300
        (switch to
301
        {switch_link("num_level_one_email_trackers_removed", "0")},
302
        {switch_link("num_level_one_email_trackers_removed", "1")},
303
        {switch_link("num_level_one_email_trackers_removed", "2")})
304
      </li>
305
    </ul>
306
    """
307
    )
308

309
    wrapped_email = wrap_html_email(
1✔
310
        original_html=html_content,
311
        language=language,
312
        has_premium=has_premium,
313
        tracker_report_link=tracker_report_link,
314
        display_email="test@relay.firefox.com",
315
        num_level_one_email_trackers_removed=num_level_one_email_trackers_removed,
316
    )
317
    return HttpResponse(wrapped_email)
1✔
318

319

320
def _store_reply_record(
1✔
321
    mail: AWS_MailJSON, message_id: str, address: RelayAddress | DomainAddress
322
) -> AWS_MailJSON:
323
    # After relaying email, store a Reply record for it
324
    reply_metadata = {}
1✔
325
    for header in mail["headers"]:
1✔
326
        if header["name"].lower() in ["message-id", "from", "reply-to"]:
1✔
327
            reply_metadata[header["name"].lower()] = header["value"]
1✔
328
    message_id_bytes = get_message_id_bytes(message_id)
1✔
329
    (lookup_key, encryption_key) = derive_reply_keys(message_id_bytes)
1✔
330
    lookup = b64_lookup_key(lookup_key)
1✔
331
    encrypted_metadata = encrypt_reply_metadata(encryption_key, reply_metadata)
1✔
332
    reply_create_args: dict[str, Any] = {
1✔
333
        "lookup": lookup,
334
        "encrypted_metadata": encrypted_metadata,
335
    }
336
    if isinstance(address, DomainAddress):
1✔
337
        reply_create_args["domain_address"] = address
1✔
338
    else:
339
        if not isinstance(address, RelayAddress):
1!
340
            raise TypeError("address must be type RelayAddress")
×
341
        reply_create_args["relay_address"] = address
1✔
342
    Reply.objects.create(**reply_create_args)
1✔
343
    return mail
1✔
344

345

346
@csrf_exempt
1✔
347
def sns_inbound(request):
1✔
348
    incr_if_enabled("sns_inbound", 1)
1✔
349
    # First thing we do is verify the signature
350
    json_body = json.loads(request.body)
1✔
351
    verified_json_body = verify_from_sns(json_body)
1✔
352

353
    # Validate ARN and message type
354
    topic_arn = verified_json_body.get("TopicArn", None)
1✔
355
    message_type = verified_json_body.get("Type", None)
1✔
356
    error_details = validate_sns_arn_and_type(topic_arn, message_type)
1✔
357
    if error_details:
1✔
358
        logger.error("validate_sns_arn_and_type_error", extra=error_details)
1✔
359
        return HttpResponse(error_details["error"], status=400)
1✔
360

361
    return _sns_inbound_logic(topic_arn, message_type, verified_json_body)
1✔
362

363

364
def validate_sns_arn_and_type(
1✔
365
    topic_arn: str | None, message_type: str | None
366
) -> dict[str, Any] | None:
367
    """
368
    Validate Topic ARN and SNS Message Type.
369

370
    If an error is detected, the return is a dictionary of error details.
371
    If no error is detected, the return is None.
372
    """
373
    if not topic_arn:
1✔
374
        error = "Received SNS request without Topic ARN."
1✔
375
    elif topic_arn not in settings.AWS_SNS_TOPIC:
1✔
376
        error = "Received SNS message for wrong topic."
1✔
377
    elif not message_type:
1✔
378
        error = "Received SNS request without Message Type."
1✔
379
    elif message_type not in SUPPORTED_SNS_TYPES:
1✔
380
        error = "Received SNS message for unsupported Type."
1✔
381
    else:
382
        error = None
1✔
383

384
    if error:
1✔
385
        return {
1✔
386
            "error": error,
387
            "received_topic_arn": shlex.quote(topic_arn) if topic_arn else topic_arn,
388
            "supported_topic_arn": sorted(settings.AWS_SNS_TOPIC),
389
            "received_sns_type": (
390
                shlex.quote(message_type) if message_type else message_type
391
            ),
392
            "supported_sns_types": SUPPORTED_SNS_TYPES,
393
        }
394
    return None
1✔
395

396

397
def _sns_inbound_logic(topic_arn, message_type, json_body):
1✔
398
    if message_type == "SubscriptionConfirmation":
×
399
        info_logger.info(
×
400
            "SNS SubscriptionConfirmation",
401
            extra={"SubscribeURL": json_body["SubscribeURL"]},
402
        )
403
        return HttpResponse("Logged SubscribeURL", status=200)
×
404
    if message_type == "Notification":
×
405
        incr_if_enabled("sns_inbound_Notification", 1)
×
406
        return _sns_notification(json_body)
×
407

408
    logger.error(
×
409
        "SNS message type did not fall under the SNS inbound logic",
410
        extra={"message_type": shlex.quote(message_type)},
411
    )
412
    capture_message(
×
413
        "Received SNS message with type not handled in inbound log",
414
        level="error",
415
        stack=True,
416
    )
417
    return HttpResponse(
×
418
        "Received SNS message with type not handled in inbound log", status=400
419
    )
420

421

422
def _sns_notification(json_body):
1✔
423
    try:
1✔
424
        message_json = json.loads(json_body["Message"])
1✔
425
    except JSONDecodeError:
1✔
426
        logger.error(
1✔
427
            "SNS notification has non-JSON message body",
428
            extra={"content": shlex.quote(json_body["Message"])},
429
        )
430
        return HttpResponse("Received SNS notification with non-JSON body", status=400)
1✔
431

432
    event_type = message_json.get("eventType")
1✔
433
    notification_type = message_json.get("notificationType")
1✔
434
    if notification_type not in {
1✔
435
        "Complaint",
436
        "Received",
437
        "Bounce",
438
    } and event_type not in {"Complaint", "Bounce"}:
439
        logger.error(
1✔
440
            "SNS notification for unsupported type",
441
            extra={
442
                "notification_type": shlex.quote(notification_type),
443
                "event_type": shlex.quote(event_type),
444
                "keys": [shlex.quote(key) for key in message_json.keys()],
445
            },
446
        )
447
        return HttpResponse(
1✔
448
            (
449
                "Received SNS notification for unsupported Type: "
450
                f"{html.escape(shlex.quote(notification_type))}"
451
            ),
452
            status=400,
453
        )
454
    response = _sns_message(message_json)
1✔
455
    bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
1✔
456
    if response.status_code < 500:
1✔
457
        remove_message_from_s3(bucket, object_key)
1✔
458

459
    return response
1✔
460

461

462
def _get_recipient_with_relay_domain(recipients):
1✔
463
    domains_to_check = get_domains_from_settings().values()
1✔
464
    for recipient in recipients:
1✔
465
        for domain in domains_to_check:
1✔
466
            if domain in recipient:
1✔
467
                return recipient
1✔
468
    return None
1✔
469

470

471
def _get_relay_recipient_from_message_json(message_json):
1✔
472
    # Go thru all To, Cc, and Bcc fields and
473
    # return the one that has a Relay domain
474

475
    # First check commmon headers for to or cc match
476
    headers_to_check = "to", "cc"
1✔
477
    common_headers = message_json["mail"]["commonHeaders"]
1✔
478
    for header in headers_to_check:
1✔
479
        if header in common_headers:
1✔
480
            recipient = _get_recipient_with_relay_domain(common_headers[header])
1✔
481
            if recipient is not None:
1✔
482
                return parseaddr(recipient)[1]
1✔
483

484
    # SES-SNS sends bcc in a different part of the message
485
    recipients = message_json["receipt"]["recipients"]
1✔
486
    return _get_recipient_with_relay_domain(recipients)
1✔
487

488

489
def _sns_message(message_json: AWS_SNSMessageJSON) -> HttpResponse:
1✔
490
    incr_if_enabled("sns_inbound_Notification_Received", 1)
1✔
491
    notification_type = message_json.get("notificationType")
1✔
492
    event_type = message_json.get("eventType")
1✔
493
    if notification_type == "Bounce" or event_type == "Bounce":
1✔
494
        return _handle_bounce(message_json)
1✔
495
    if notification_type == "Complaint" or event_type == "Complaint":
1✔
496
        return _handle_complaint(message_json)
1✔
497
    if notification_type != "Received":
1!
498
        raise ValueError('notification_type must be "Received"')
×
499
    if event_type is not None:
1!
500
        raise ValueError("event_type must be None")
×
501
    return _handle_received(message_json)
1✔
502

503

504
# Enumerate the reasons that an email was not forwarded.
505
# This excludes emails dropped due to mask forwarding settings,
506
# such as "block all" and "block promotional". Those are logged
507
# as Glean email_blocked events.
508
EmailDroppedReason = Literal[
1✔
509
    "auto_block_spam",  # Email identified as spam, user has the auto_block_spam flag
510
    "dmarc_reject_failed",  # Email failed DMARC check with a reject policy
511
    "hard_bounce_pause",  # The user recently had a hard bounce
512
    "soft_bounce_pause",  # The user recently has a soft bounce
513
    "abuse_flag",  # The user exceeded an abuse limit, like mails forwarded
514
    "user_deactivated",  # The user account is deactivated
515
    "reply_requires_premium",  # The email is a reply from a free user
516
    "content_missing",  # Could not load the email from storage
517
    "error_from_header",  # Error generating the From: header, retryable
518
    "error_storage",  # Error fetching the email contents from storage (S3), retryable
519
    "error_sending",  # Error sending the forwarded email (SES), retryable
520
]
521

522

523
def log_email_dropped(
1✔
524
    reason: EmailDroppedReason,
525
    mask: RelayAddress | DomainAddress,
526
    is_reply: bool = False,
527
    can_retry: bool = False,
528
) -> None:
529
    """
530
    Log that an email was dropped for a reason other than a mask blocking setting.
531

532
    This mirrors the interface of glean_logger().log_email_blocked(), which
533
    records emails dropped due to the mask's blocking setting.
534
    """
535
    extra: dict[str, str | int | bool] = {"reason": reason}
1✔
536
    if mask.user.profile.metrics_enabled:
1✔
537
        if mask.user.profile.fxa is not None:
1✔
538
            extra["fxa_id"] = mask.user.profile.fxa.uid
1✔
539
        extra["mask_id"] = mask.metrics_id
1✔
540
    extra |= {
1✔
541
        "is_random_mask": isinstance(mask, RelayAddress),
542
        "is_reply": is_reply,
543
        "can_retry": can_retry,
544
    }
545
    info_logger.info("email_dropped", extra=extra)
1✔
546

547

548
def _handle_received(message_json: AWS_SNSMessageJSON) -> HttpResponse:
1✔
549
    """
550
    Handle an AWS SES received notification.
551

552
    For more information, see:
553
    https://docs.aws.amazon.com/ses/latest/dg/receiving-email-notifications-contents.html
554
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html
555

556
    Returns (may be incomplete):
557
    * 200 if the email was sent, the Relay address is disabled, the Relay user is
558
      flagged for abuse, the email is under a bounce pause, the email was suppressed
559
      for spam, the list email was blocked, or the noreply address was the recipient.
560
    * 400 if commonHeaders entry is missing, the Relay recipient address is malformed,
561
      the email failed DMARC with reject policy, or the email is a reply chain to a
562
      non-premium user.
563
    * 404 if an S3-stored email was not found, no Relay address was found in the "To",
564
      "CC", or "BCC" fields, or the Relay address is not in the database.
565
    * 503 if the "From" address is malformed, the S3 client returned an error different
566
      from "not found", or the SES client fails
567

568
    And many other returns conditions if the email is a reply. The HTTP returns are an
569
    artifact from an earlier time when emails were sent to a webhook. Currently,
570
    production instead pulls events from a queue.
571

572
    TODO: Return a more appropriate status object
573
    TODO: Document the metrics emitted
574
    """
575
    mail = message_json["mail"]
1✔
576
    if "commonHeaders" not in mail:
1✔
577
        logger.error("SNS message without commonHeaders")
1✔
578
        return HttpResponse(
1✔
579
            "Received SNS notification without commonHeaders.", status=400
580
        )
581
    common_headers = mail["commonHeaders"]
1✔
582
    receipt = message_json["receipt"]
1✔
583

584
    _record_receipt_verdicts(receipt, "all")
1✔
585
    to_address = _get_relay_recipient_from_message_json(message_json)
1✔
586
    if to_address is None:
1✔
587
        incr_if_enabled("no_relay_domain_in_recipient_fields", 1)
1✔
588
        return HttpResponse("Address does not exist", status=404)
1✔
589

590
    _record_receipt_verdicts(receipt, "relay_recipient")
1✔
591
    from_addresses = parse_email_header(common_headers["from"][0])
1✔
592
    if not from_addresses:
1✔
593
        info_logger.error(
1✔
594
            "_handle_received: no from address",
595
            extra={
596
                "source": mail["source"],
597
                "common_headers_from": common_headers["from"],
598
            },
599
        )
600
        return HttpResponse("Unable to parse From address", status=400)
1✔
601
    from_address = from_addresses[0][1]
1✔
602

603
    try:
1✔
604
        [to_local_portion, to_domain_portion] = to_address.split("@")
1✔
605
    except ValueError:
1✔
606
        # TODO: Add metric
607
        return HttpResponse("Malformed to field.", status=400)
1✔
608

609
    if to_local_portion.lower() == "noreply":
1✔
610
        incr_if_enabled("email_for_noreply_address", 1)
1✔
611
        return HttpResponse("noreply address is not supported.")
1✔
612
    try:
1✔
613
        # FIXME: this ambiguous return of either
614
        # RelayAddress or DomainAddress types makes the Rustacean in me throw
615
        # up a bit.
616
        address = _get_address(to_address)
1✔
617
        prefetch_related_objects([address.user], "socialaccount_set", "profile")
1✔
618
        user_profile = address.user.profile
1✔
619
    except (
1✔
620
        ObjectDoesNotExist,
621
        CannotMakeAddressException,
622
        DeletedAddress.MultipleObjectsReturned,
623
    ):
624
        if to_local_portion.lower() == "replies":
1✔
625
            response = _handle_reply(from_address, message_json, to_address)
1✔
626
        else:
627
            response = HttpResponse("Address does not exist", status=404)
1✔
628
        return response
1✔
629

630
    _record_receipt_verdicts(receipt, "valid_user")
1✔
631
    # if this is spam and the user is set to auto-block spam, early return
632
    if user_profile.auto_block_spam and _get_verdict(receipt, "spam") == "FAIL":
1✔
633
        incr_if_enabled("email_auto_suppressed_for_spam", 1)
1✔
634
        log_email_dropped(reason="auto_block_spam", mask=address)
1✔
635
        return HttpResponse("Address rejects spam.")
1✔
636

637
    if _get_verdict(receipt, "dmarc") == "FAIL":
1✔
638
        policy = receipt.get("dmarcPolicy", "none")
1✔
639
        # TODO: determine action on dmarcPolicy "quarantine"
640
        if policy == "reject":
1!
641
            log_email_dropped(reason="dmarc_reject_failed", mask=address)
1✔
642
            incr_if_enabled(
1✔
643
                "email_suppressed_for_dmarc_failure",
644
                tags=["dmarcPolicy:reject", "dmarcVerdict:FAIL"],
645
            )
646
            return HttpResponse("DMARC failure, policy is reject", status=400)
1✔
647

648
    # if this user is over bounce limits, early return
649
    bounce_paused, bounce_type = user_profile.check_bounce_pause()
1✔
650
    if bounce_paused:
1✔
651
        _record_receipt_verdicts(receipt, "user_bounce_paused")
1✔
652
        incr_if_enabled(f"email_suppressed_for_{bounce_type}_bounce", 1)
1✔
653
        reason: Literal["soft_bounce_pause", "hard_bounce_pause"] = (
1✔
654
            "soft_bounce_pause" if bounce_type == "soft" else "hard_bounce_pause"
655
        )
656
        log_email_dropped(reason=reason, mask=address)
1✔
657
        return HttpResponse("Address is temporarily disabled.")
1✔
658

659
    # check if this is a reply from an external sender to a Relay user
660
    try:
1✔
661
        (lookup_key, _) = _get_keys_from_headers(mail["headers"])
1✔
662
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
1✔
663
        user_address = address
1✔
664
        address = reply_record.address
1✔
665
        message_id = _get_message_id_from_headers(mail["headers"])
1✔
666
        # make sure the relay user is premium
667
        if not _reply_allowed(from_address, to_address, reply_record, message_id):
1!
668
            log_email_dropped(reason="reply_requires_premium", mask=user_address)
1✔
669
            return HttpResponse("Relay replies require a premium account", status=403)
1✔
670
    except (ReplyHeadersNotFound, Reply.DoesNotExist):
1✔
671
        # if there's no In-Reply-To header, or the In-Reply-To value doesn't
672
        # match a Reply record, continue to treat this as a regular email from
673
        # an external sender to a relay user
674
        pass
1✔
675

676
    # if account flagged for abuse, early return
677
    if user_profile.is_flagged:
1✔
678
        log_email_dropped(reason="abuse_flag", mask=address)
1✔
679
        return HttpResponse("Address is temporarily disabled.")
1✔
680

681
    if not user_profile.user.is_active:
1✔
682
        log_email_dropped(reason="user_deactivated", mask=address)
1✔
683
        return HttpResponse("Account is deactivated.")
1✔
684

685
    # if address is set to block, early return
686
    if not address.enabled:
1✔
687
        incr_if_enabled("email_for_disabled_address", 1)
1✔
688
        address.num_blocked += 1
1✔
689
        address.save(update_fields=["num_blocked"])
1✔
690
        _record_receipt_verdicts(receipt, "disabled_alias")
1✔
691
        user_profile.last_engagement = datetime.now(UTC)
1✔
692
        user_profile.save()
1✔
693
        glean_logger().log_email_blocked(mask=address, reason="block_all")
1✔
694
        return HttpResponse("Address is temporarily disabled.")
1✔
695

696
    _record_receipt_verdicts(receipt, "active_alias")
1✔
697
    incr_if_enabled("email_for_active_address", 1)
1✔
698

699
    # if address is blocking list emails, and email is from list, early return
700
    if (
1✔
701
        address
702
        and address.block_list_emails
703
        and user_profile.has_premium
704
        and _check_email_from_list(mail["headers"])
705
    ):
706
        incr_if_enabled("list_email_for_address_blocking_lists", 1)
1✔
707
        address.num_blocked += 1
1✔
708
        address.save(update_fields=["num_blocked"])
1✔
709
        user_profile.last_engagement = datetime.now(UTC)
1✔
710
        user_profile.save()
1✔
711
        glean_logger().log_email_blocked(mask=address, reason="block_promotional")
1✔
712
        return HttpResponse("Address is not accepting list emails.")
1✔
713

714
    # Collect new headers
715
    subject = common_headers.get("subject", "")
1✔
716
    destination_address = user_profile.user.email
1✔
717
    reply_address = get_reply_to_address()
1✔
718
    try:
1✔
719
        from_header = generate_from_header(from_address, to_address)
1✔
720
    except InvalidFromHeader:
1✔
721
        # TODO: MPP-3407, MPP-3417 - Determine how to handle these
722
        header_from = []
1✔
723
        for header in mail["headers"]:
1✔
724
            if header["name"].lower() == "from":
1✔
725
                header_from.append(header)
1✔
726
        info_logger.error(
1✔
727
            "generate_from_header",
728
            extra={
729
                "from_address": from_address,
730
                "source": mail["source"],
731
                "common_headers_from": common_headers["from"],
732
                "headers_from": header_from,
733
            },
734
        )
735
        log_email_dropped(reason="error_from_header", mask=address, can_retry=True)
1✔
736
        return HttpResponse("Cannot parse the From address", status=503)
1✔
737

738
    headers: OutgoingHeaders = {
1✔
739
        "Subject": subject,
740
        "From": from_header,
741
        "To": destination_address,
742
        "Reply-To": reply_address,
743
        "Resent-From": from_address,
744
    }
745

746
    # Get incoming email
747
    try:
1✔
748
        (incoming_email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
1✔
749
    except ClientError as e:
1✔
750
        if e.response["Error"].get("Code", "") == "NoSuchKey":
1✔
751
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
1✔
752
            log_email_dropped(reason="content_missing", mask=address)
1✔
753
            return HttpResponse("Email not in S3", status=404)
1✔
754
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
1✔
755
        log_email_dropped(reason="error_storage", mask=address, can_retry=True)
1✔
756
        # we are returning a 503 so that SNS can retry the email processing
757
        return HttpResponse("Cannot fetch the message content from S3", status=503)
1✔
758

759
    # Convert to new email
760
    sample_trackers = bool(sample_is_active("tracker_sample"))
1✔
761
    tracker_removal_flag = flag_is_active_in_task("tracker_removal", address.user)
1✔
762
    remove_level_one_trackers = bool(
1✔
763
        tracker_removal_flag and user_profile.remove_level_one_email_trackers
764
    )
765
    (
1✔
766
        forwarded_email,
767
        issues,
768
        level_one_trackers_removed,
769
        has_html,
770
        has_text,
771
    ) = _convert_to_forwarded_email(
772
        incoming_email_bytes=incoming_email_bytes,
773
        headers=headers,
774
        to_address=to_address,
775
        from_address=from_address,
776
        language=user_profile.language,
777
        has_premium=user_profile.has_premium,
778
        sample_trackers=sample_trackers,
779
        remove_level_one_trackers=remove_level_one_trackers,
780
    )
781
    if has_html:
1✔
782
        incr_if_enabled("email_with_html_content", 1)
1✔
783
    if has_text:
1✔
784
        incr_if_enabled("email_with_text_content", 1)
1✔
785
    if issues:
1✔
786
        info_logger.info(
1✔
787
            "_handle_received: forwarding issues", extra={"issues": issues}
788
        )
789

790
    # Send new email
791
    try:
1✔
792
        ses_response = ses_send_raw_email(
1✔
793
            source_address=reply_address,
794
            destination_address=destination_address,
795
            message=forwarded_email,
796
        )
797
    except ClientError:
1✔
798
        # 503 service unavailable reponse to SNS so it can retry
799
        log_email_dropped(reason="error_sending", mask=address, can_retry=True)
1✔
800
        return HttpResponse("SES client error on Raw Email", status=503)
1✔
801

802
    message_id = ses_response["MessageId"]
1✔
803
    _store_reply_record(mail, message_id, address)
1✔
804

805
    user_profile.update_abuse_metric(
1✔
806
        email_forwarded=True, forwarded_email_size=len(incoming_email_bytes)
807
    )
808
    user_profile.last_engagement = datetime.now(UTC)
1✔
809
    user_profile.save()
1✔
810
    address.num_forwarded += 1
1✔
811
    address.last_used_at = datetime.now(UTC)
1✔
812
    if level_one_trackers_removed:
1!
813
        address.num_level_one_trackers_blocked = (
×
814
            address.num_level_one_trackers_blocked or 0
815
        ) + level_one_trackers_removed
816
    address.save(
1✔
817
        update_fields=[
818
            "num_forwarded",
819
            "last_used_at",
820
            "block_list_emails",
821
            "num_level_one_trackers_blocked",
822
        ]
823
    )
824
    glean_logger().log_email_forwarded(mask=address, is_reply=False)
1✔
825
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
826

827

828
def _get_verdict(receipt, verdict_type):
1✔
829
    return receipt[f"{verdict_type}Verdict"]["status"]
1✔
830

831

832
def _check_email_from_list(headers):
1✔
833
    for header in headers:
1!
834
        if header["name"].lower().startswith("list-"):
1!
835
            return True
1✔
836
    return False
×
837

838

839
def _record_receipt_verdicts(receipt, state):
1✔
840
    verdict_tags = []
1✔
841
    for key in sorted(receipt.keys()):
1✔
842
        if key.endswith("Verdict"):
1✔
843
            value = receipt[key]["status"]
1✔
844
            verdict_tags.append(f"{key}:{value}")
1✔
845
            incr_if_enabled(f"relay.emails.verdicts.{key}", 1, [f"state:{state}"])
1✔
846
        elif key == "dmarcPolicy":
1✔
847
            value = receipt[key]
1✔
848
            verdict_tags.append(f"{key}:{value}")
1✔
849
    incr_if_enabled(f"relay.emails.state.{state}", 1, verdict_tags)
1✔
850

851

852
def _get_message_id_from_headers(headers):
1✔
853
    message_id = None
1✔
854
    for header in headers:
1✔
855
        if header["name"].lower() == "message-id":
1✔
856
            message_id = header["value"]
1✔
857
    return message_id
1✔
858

859

860
def _get_keys_from_headers(headers):
1✔
861
    in_reply_to = None
1✔
862
    for header in headers:
1✔
863
        if header["name"].lower() == "in-reply-to":
1✔
864
            in_reply_to = header["value"]
1✔
865
            message_id_bytes = get_message_id_bytes(in_reply_to)
1✔
866
            return derive_reply_keys(message_id_bytes)
1✔
867

868
        if header["name"].lower() == "references":
1✔
869
            message_ids = header["value"]
1✔
870
            for message_id in message_ids.split(" "):
1✔
871
                message_id_bytes = get_message_id_bytes(message_id)
1✔
872
                lookup_key, encryption_key = derive_reply_keys(message_id_bytes)
1✔
873
                try:
1✔
874
                    # FIXME: calling code is likely to duplicate this query
875
                    _get_reply_record_from_lookup_key(lookup_key)
1✔
876
                    return lookup_key, encryption_key
1✔
877
                except Reply.DoesNotExist:
1✔
878
                    pass
1✔
879
            raise Reply.DoesNotExist
1✔
880
    incr_if_enabled("mail_to_replies_without_reply_headers", 1)
1✔
881
    raise ReplyHeadersNotFound
1✔
882

883

884
def _get_reply_record_from_lookup_key(lookup_key):
1✔
885
    lookup = b64_lookup_key(lookup_key)
1✔
886
    return Reply.objects.get(lookup=lookup)
1✔
887

888

889
def _strip_localpart_tag(address):
1✔
890
    [localpart, domain] = address.split("@")
1✔
891
    subaddress_parts = localpart.split("+")
1✔
892
    return f"{subaddress_parts[0]}@{domain}"
1✔
893

894

895
_TransportType = Literal["sns", "s3"]
1✔
896

897

898
def _get_email_bytes(
1✔
899
    message_json: AWS_SNSMessageJSON,
900
) -> tuple[bytes, _TransportType, float]:
901
    with Timer(logger=None) as load_timer:
1✔
902
        if "content" in message_json:
1✔
903
            # email content in sns message
904
            message_content = message_json["content"].encode("utf-8")
1✔
905
            transport: Literal["sns", "s3"] = "sns"
1✔
906
        else:
907
            # assume email content in S3
908
            bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
1✔
909
            message_content = get_message_content_from_s3(bucket, object_key)
1✔
910
            transport = "s3"
1✔
911
        histogram_if_enabled("relayed_email.size", len(message_content))
1✔
912
    load_time_s = round(load_timer.last, 3)
1✔
913
    return (message_content, transport, load_time_s)
1✔
914

915

916
def _convert_to_forwarded_email(
1✔
917
    incoming_email_bytes: bytes,
918
    headers: OutgoingHeaders,
919
    to_address: str,
920
    from_address: str,
921
    language: str,
922
    has_premium: bool,
923
    sample_trackers: bool,
924
    remove_level_one_trackers: bool,
925
    now: datetime | None = None,
926
) -> tuple[EmailMessage, EmailForwardingIssues, int, bool, bool]:
927
    """
928
    Convert an email (as bytes) to a forwarded email.
929

930
    Return is a tuple:
931
    - email - The forwarded email
932
    - issues - Any detected issues in conversion
933
    - level_one_trackers_removed (int) - Number of trackers removed
934
    - has_html - True if the email has an HTML representation
935
    - has_text - True if the email has a plain text representation
936
    """
937
    email = message_from_bytes(incoming_email_bytes, policy=relay_policy)
1✔
938
    # python/typeshed issue 2418
939
    # The Python 3.2 default was Message, 3.6 uses policy.message_factory, and
940
    # policy.default.message_factory is EmailMessage
941
    if not isinstance(email, EmailMessage):
1!
942
        raise TypeError("email must be type EmailMessage")
×
943

944
    # Replace headers in the original email
945
    header_issues = _replace_headers(email, headers)
1✔
946

947
    # Find and replace text content
948
    text_body = email.get_body("plain")
1✔
949
    text_content = None
1✔
950
    has_text = False
1✔
951
    if text_body:
1✔
952
        has_text = True
1✔
953
        if not isinstance(text_body, EmailMessage):
1!
954
            raise TypeError("text_body must be type EmailMessage")
×
955
        text_content = text_body.get_content()
1✔
956
        new_text_content = _convert_text_content(text_content, to_address)
1✔
957
        text_body.set_content(new_text_content)
1✔
958

959
    # Find and replace HTML content
960
    html_body = email.get_body("html")
1✔
961
    level_one_trackers_removed = 0
1✔
962
    has_html = False
1✔
963
    if html_body:
1✔
964
        has_html = True
1✔
965
        if not isinstance(html_body, EmailMessage):
1!
966
            raise TypeError("html_body must be type EmailMessage")
×
967
        html_content = html_body.get_content()
1✔
968
        new_content, level_one_trackers_removed = _convert_html_content(
1✔
969
            html_content,
970
            to_address,
971
            from_address,
972
            language,
973
            has_premium,
974
            sample_trackers,
975
            remove_level_one_trackers,
976
        )
977
        html_body.set_content(new_content, subtype="html")
1✔
978
    elif text_content:
1!
979
        # Try to use the text content to generate HTML content
980
        html_content = urlize_and_linebreaks(text_content)
1✔
981
        new_content, level_one_trackers_removed = _convert_html_content(
1✔
982
            html_content,
983
            to_address,
984
            from_address,
985
            language,
986
            has_premium,
987
            sample_trackers,
988
            remove_level_one_trackers,
989
        )
990
        if not isinstance(text_body, EmailMessage):
1!
991
            raise TypeError("text_body must be type EmailMessage")
×
992
        try:
1✔
993
            text_body.add_alternative(new_content, subtype="html")
1✔
994
        except TypeError as e:
×
995
            out = StringIO()
×
996
            _structure(email, fp=out)
×
997
            info_logger.error(
×
998
                "Adding HTML alternate failed",
999
                extra={"exception": str(e), "structure": out.getvalue()},
1000
            )
1001

1002
    issues: EmailForwardingIssues = {}
1✔
1003
    if header_issues:
1✔
1004
        issues["headers"] = header_issues
1✔
1005
    return (email, issues, level_one_trackers_removed, has_html, has_text)
1✔
1006

1007

1008
def _replace_headers(
1✔
1009
    email: EmailMessage, headers: OutgoingHeaders
1010
) -> EmailHeaderIssues:
1011
    """
1012
    Replace the headers in email with new headers.
1013

1014
    This replaces headers in the passed email object, rather than returns an altered
1015
    copy. The primary reason is that the Python email package can read an email with
1016
    non-compliant headers or content, but can't write it. A read/write is required to
1017
    create a copy that we then alter. This code instead alters the passed EmailMessage
1018
    object, making header-specific changes in try / except statements.
1019

1020
    The other reason is the object size. An Email can be up to 10 MB, and we hope to
1021
    support 40 MB emails someday. Modern servers may be OK with this, but it would be
1022
    nice to handle the non-compliant headers without crashing before we add a source of
1023
    memory-related crashes.
1024
    """
1025
    # Look for headers to drop
1026
    to_drop: list[str] = []
1✔
1027
    replacements: set[str] = {_k.lower() for _k in headers.keys()}
1✔
1028
    issues: EmailHeaderIssues = []
1✔
1029

1030
    # Detect non-compliant headers in incoming emails
1031
    for header in email.keys():
1✔
1032
        try:
1✔
1033
            value = email[header]
1✔
1034
        except Exception as e:
1✔
1035
            issues.append(
1✔
1036
                {"header": header, "direction": "in", "exception_on_read": repr(e)}
1037
            )
1038
            value = None
1✔
1039
        if getattr(value, "defects", None):
1✔
1040
            issues.append(
1✔
1041
                {
1042
                    "header": header,
1043
                    "direction": "in",
1044
                    "defect_count": len(value.defects),
1045
                    "parsed_value": str(value),
1046
                    "raw_value": str(value.as_raw),
1047
                }
1048
            )
1049
        elif getattr(getattr(value, "_parse_tree", None), "all_defects", []):
1✔
1050
            issues.append(
1✔
1051
                {
1052
                    "header": header,
1053
                    "direction": "in",
1054
                    "defect_count": len(value._parse_tree.all_defects),
1055
                    "parsed_value": str(value),
1056
                    "raw_value": str(value.as_raw),
1057
                }
1058
            )
1059

1060
    # Collect headers that will not be forwarded
1061
    for header in email.keys():
1✔
1062
        header_lower = header.lower()
1✔
1063
        if (
1✔
1064
            header_lower not in replacements
1065
            and header_lower != "mime-version"
1066
            and not header_lower.startswith("content-")
1067
        ):
1068
            to_drop.append(header)
1✔
1069

1070
    # Drop headers that should be dropped
1071
    for header in to_drop:
1✔
1072
        del email[header]
1✔
1073

1074
    # Replace the requested headers
1075
    for header, value in headers.items():
1✔
1076
        del email[header]
1✔
1077
        try:
1✔
1078
            email[header] = value.rstrip("\r\n")
1✔
1079
        except Exception as e:
×
NEW
1080
            issues.append(
×
1081
                {
1082
                    "header": header,
1083
                    "direction": "out",
1084
                    "exception_on_write": repr(e),
1085
                    "value": value,
1086
                }
1087
            )
1088
            continue
×
1089
        try:
1✔
1090
            parsed_value = email[header]
1✔
1091
        except Exception as e:
×
NEW
1092
            issues.append(
×
1093
                {
1094
                    "header": header,
1095
                    "direction": "out",
1096
                    "exception_on_write": repr(e),
1097
                    "value": value,
1098
                }
1099
            )
UNCOV
1100
            continue
×
1101
        if parsed_value.defects:
1!
NEW
1102
            issues.append(
×
1103
                {
1104
                    "header": header,
1105
                    "direction": "out",
1106
                    "defect_count": len(parsed_value.defects),
1107
                    "parsed_value": str(parsed_value),
1108
                    "raw_value": str(parsed_value.as_raw),
1109
                },
1110
            )
1111

1112
    return issues
1✔
1113

1114

1115
def _convert_html_content(
1✔
1116
    html_content: str,
1117
    to_address: str,
1118
    from_address: str,
1119
    language: str,
1120
    has_premium: bool,
1121
    sample_trackers: bool,
1122
    remove_level_one_trackers: bool,
1123
    now: datetime | None = None,
1124
) -> tuple[str, int]:
1125
    # frontend expects a timestamp in milliseconds
1126
    now = now or datetime.now(UTC)
1✔
1127
    datetime_now_ms = int(now.timestamp() * 1000)
1✔
1128

1129
    # scramble alias so that clients don't recognize it
1130
    # and apply default link styles
1131
    display_email = re.sub("([@.:])", r"<span>\1</span>", to_address)
1✔
1132

1133
    # sample tracker numbers
1134
    if sample_trackers:
1!
1135
        count_all_trackers(html_content)
×
1136

1137
    tracker_report_link = ""
1✔
1138
    removed_count = 0
1✔
1139
    if remove_level_one_trackers:
1!
1140
        html_content, tracker_details = remove_trackers(
×
1141
            html_content, from_address, datetime_now_ms
1142
        )
1143
        removed_count = tracker_details["tracker_removed"]
×
1144
        tracker_report_details = {
×
1145
            "sender": from_address,
1146
            "received_at": datetime_now_ms,
1147
            "trackers": tracker_details["level_one"]["trackers"],
1148
        }
1149
        tracker_report_link = f"{settings.SITE_ORIGIN}/tracker-report/#" + json.dumps(
×
1150
            tracker_report_details
1151
        )
1152

1153
    wrapped_html = wrap_html_email(
1✔
1154
        original_html=html_content,
1155
        language=language,
1156
        has_premium=has_premium,
1157
        display_email=display_email,
1158
        tracker_report_link=tracker_report_link,
1159
        num_level_one_email_trackers_removed=removed_count,
1160
    )
1161
    return wrapped_html, removed_count
1✔
1162

1163

1164
def _convert_text_content(text_content: str, to_address: str) -> str:
1✔
1165
    relay_header_text = (
1✔
1166
        "This email was sent to your alias "
1167
        f"{to_address}. To stop receiving emails sent to this alias, "
1168
        "update the forwarding settings in your dashboard.\n"
1169
        "---Begin Email---\n"
1170
    )
1171
    wrapped_text = relay_header_text + text_content
1✔
1172
    return wrapped_text
1✔
1173

1174

1175
def _build_reply_requires_premium_email(
1✔
1176
    from_address: str,
1177
    reply_record: Reply,
1178
    message_id: str | None,
1179
    decrypted_metadata: dict[str, Any] | None,
1180
) -> EmailMessage:
1181
    # If we haven't forwarded a first reply for this user yet, _reply_allowed
1182
    # will forward.  So, tell the user we forwarded it.
1183
    forwarded = not reply_record.address.user.profile.forwarded_first_reply
1✔
1184
    sender: str | None = ""
1✔
1185
    if decrypted_metadata is not None:
1!
1186
        sender = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
1✔
1187
    ctx = {
1✔
1188
        "sender": sender or "",
1189
        "forwarded": forwarded,
1190
        "SITE_ORIGIN": settings.SITE_ORIGIN,
1191
    }
1192
    html_body = render_to_string("emails/reply_requires_premium.html", ctx)
1✔
1193
    text_body = render_to_string("emails/reply_requires_premium.txt", ctx)
1✔
1194

1195
    # Create the message
1196
    msg = EmailMessage()
1✔
1197
    msg["Subject"] = ftl_bundle.format("replies-not-included-in-free-account-header")
1✔
1198
    msg["From"] = get_reply_to_address()
1✔
1199
    msg["To"] = from_address
1✔
1200
    if message_id:
1!
1201
        msg["In-Reply-To"] = message_id
1✔
1202
        msg["References"] = message_id
1✔
1203
    msg.set_content(text_body)
1✔
1204
    msg.add_alternative(html_body, subtype="html")
1✔
1205
    return msg
1✔
1206

1207

1208
def _set_forwarded_first_reply(profile):
1✔
1209
    profile.forwarded_first_reply = True
1✔
1210
    profile.save()
1✔
1211

1212

1213
def _send_reply_requires_premium_email(
1✔
1214
    from_address: str,
1215
    reply_record: Reply,
1216
    message_id: str | None,
1217
    decrypted_metadata: dict[str, Any] | None,
1218
) -> None:
1219
    msg = _build_reply_requires_premium_email(
×
1220
        from_address, reply_record, message_id, decrypted_metadata
1221
    )
1222
    try:
×
1223
        ses_send_raw_email(
×
1224
            source_address=get_reply_to_address(premium=False),
1225
            destination_address=from_address,
1226
            message=msg,
1227
        )
1228
        # If we haven't forwarded a first reply for this user yet, _reply_allowed will.
1229
        # So, updated the DB.
1230
        _set_forwarded_first_reply(reply_record.address.user.profile)
×
1231
    except ClientError as e:
×
1232
        logger.error("reply_not_allowed_ses_client_error", extra=e.response["Error"])
×
1233
    incr_if_enabled("free_user_reply_attempt", 1)
×
1234

1235

1236
def _reply_allowed(
1✔
1237
    from_address, to_address, reply_record, message_id=None, decrypted_metadata=None
1238
):
1239
    stripped_from_address = _strip_localpart_tag(from_address)
1✔
1240
    reply_record_email = reply_record.address.user.email
1✔
1241
    stripped_reply_record_address = _strip_localpart_tag(reply_record_email)
1✔
1242
    if (from_address == reply_record_email) or (
1!
1243
        stripped_from_address == stripped_reply_record_address
1244
    ):
1245
        # This is a Relay user replying to an external sender;
1246

1247
        if not reply_record.profile.user.is_active:
1!
1248
            return False
×
1249

1250
        if reply_record.profile.is_flagged:
1!
1251
            return False
×
1252

1253
        if reply_record.owner_has_premium:
1!
1254
            return True
1✔
1255

1256
        # if we haven't forwarded a first reply for this user, return True to allow
1257
        # this first reply
1258
        allow_first_reply = not reply_record.address.user.profile.forwarded_first_reply
×
1259
        _send_reply_requires_premium_email(
×
1260
            from_address, reply_record, message_id, decrypted_metadata
1261
        )
1262
        return allow_first_reply
×
1263
    else:
1264
        # The From: is not a Relay user, so make sure this is a reply *TO* a
1265
        # premium Relay user
1266
        try:
×
1267
            address = _get_address(to_address)
×
1268
            if address.user.profile.has_premium:
×
1269
                return True
×
1270
        except ObjectDoesNotExist:
×
1271
            return False
×
1272
    incr_if_enabled("free_user_reply_attempt", 1)
×
1273
    return False
×
1274

1275

1276
def _handle_reply(
1✔
1277
    from_address: str, message_json: AWS_SNSMessageJSON, to_address: str
1278
) -> HttpResponse:
1279
    """
1280
    Handle a reply from a Relay user to an external email.
1281

1282
    Returns (may be incomplete):
1283
    * 200 if the reply was sent
1284
    * 400 if the In-Reply-To and References headers are missing, none of the References
1285
      headers are a reply record, or the SES client raises an error
1286
    * 403 if the Relay user is not allowed to reply
1287
    * 404 if the S3-stored email is not found, or there is no matching Reply record in
1288
      the database
1289
    * 503 if the S3 client returns an error (other than not found), or the SES client
1290
      returns an error
1291

1292
    TODO: Return a more appropriate status object (see _handle_received)
1293
    TODO: Document metrics emitted
1294
    """
1295
    mail = message_json["mail"]
1✔
1296
    try:
1✔
1297
        (lookup_key, encryption_key) = _get_keys_from_headers(mail["headers"])
1✔
1298
    except ReplyHeadersNotFound:
1✔
1299
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-header"])
1✔
1300
        return HttpResponse("No In-Reply-To header", status=400)
1✔
1301

1302
    try:
1✔
1303
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
1✔
1304
    except Reply.DoesNotExist:
1✔
1305
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-reply-record"])
1✔
1306
        return HttpResponse("Unknown or stale In-Reply-To header", status=404)
1✔
1307

1308
    address = reply_record.address
1✔
1309
    message_id = _get_message_id_from_headers(mail["headers"])
1✔
1310
    decrypted_metadata = json.loads(
1✔
1311
        decrypt_reply_metadata(encryption_key, reply_record.encrypted_metadata)
1312
    )
1313
    if not _reply_allowed(
1✔
1314
        from_address, to_address, reply_record, message_id, decrypted_metadata
1315
    ):
1316
        log_email_dropped(reason="reply_requires_premium", mask=address, is_reply=True)
1✔
1317
        return HttpResponse("Relay replies require a premium account", status=403)
1✔
1318

1319
    outbound_from_address = address.full_address
1✔
1320
    incr_if_enabled("reply_email", 1)
1✔
1321
    subject = mail["commonHeaders"].get("subject", "")
1✔
1322
    to_address = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
1✔
1323
    headers: OutgoingHeaders = {
1✔
1324
        "Subject": subject,
1325
        "From": outbound_from_address,
1326
        "To": to_address,
1327
        "Reply-To": outbound_from_address,
1328
    }
1329

1330
    try:
1✔
1331
        (email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
1✔
1332
    except ClientError as e:
1✔
1333
        if e.response["Error"].get("Code", "") == "NoSuchKey":
1✔
1334
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
1✔
1335
            log_email_dropped(reason="content_missing", mask=address, is_reply=True)
1✔
1336
            return HttpResponse("Email not in S3", status=404)
1✔
1337
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
1✔
1338
        log_email_dropped(
1✔
1339
            reason="error_storage", mask=address, is_reply=True, can_retry=True
1340
        )
1341
        # we are returning a 500 so that SNS can retry the email processing
1342
        return HttpResponse("Cannot fetch the message content from S3", status=503)
1✔
1343

1344
    email = message_from_bytes(email_bytes, policy=relay_policy)
1✔
1345
    if not isinstance(email, EmailMessage):
1!
1346
        raise TypeError("email must be type EmailMessage")
×
1347

1348
    # Convert to a reply email
1349
    # TODO: Issue #1747 - Remove wrapper / prefix in replies
1350
    _replace_headers(email, headers)
1✔
1351

1352
    try:
1✔
1353
        ses_send_raw_email(
1✔
1354
            source_address=outbound_from_address,
1355
            destination_address=to_address,
1356
            message=email,
1357
        )
1358
    except ClientError:
1✔
1359
        log_email_dropped(reason="error_sending", mask=address, is_reply=True)
1✔
1360
        return HttpResponse("SES client error", status=400)
1✔
1361

1362
    reply_record.increment_num_replied()
1✔
1363
    profile = address.user.profile
1✔
1364
    profile.update_abuse_metric(replied=True)
1✔
1365
    profile.last_engagement = datetime.now(UTC)
1✔
1366
    profile.save()
1✔
1367
    glean_logger().log_email_forwarded(mask=address, is_reply=True)
1✔
1368
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
1369

1370

1371
def _get_domain_address(local_portion: str, domain_portion: str) -> DomainAddress:
1✔
1372
    """
1373
    Find or create the DomainAddress for the parts of an email address.
1374

1375
    If the domain_portion is for a valid subdomain, a new DomainAddress
1376
    will be created and returned.
1377

1378
    If the domain_portion is for an unknown domain, ObjectDoesNotExist is raised.
1379

1380
    If the domain_portion is for an unclaimed subdomain, Profile.DoesNotExist is raised.
1381
    """
1382

1383
    [address_subdomain, address_domain] = domain_portion.split(".", 1)
1✔
1384
    if address_domain != get_domains_from_settings()["MOZMAIL_DOMAIN"]:
1✔
1385
        incr_if_enabled("email_for_not_supported_domain", 1)
1✔
1386
        raise ObjectDoesNotExist("Address does not exist")
1✔
1387
    try:
1✔
1388
        with transaction.atomic():
1✔
1389
            locked_profile = Profile.objects.select_for_update().get(
1✔
1390
                subdomain=address_subdomain
1391
            )
1392
            domain_numerical = get_domain_numerical(address_domain)
1✔
1393
            # filter DomainAddress because it may not exist
1394
            # which will throw an error with get()
1395
            domain_address = DomainAddress.objects.filter(
1✔
1396
                user=locked_profile.user, address=local_portion, domain=domain_numerical
1397
            ).first()
1398
            if domain_address is None:
1✔
1399
                # TODO: Consider flows when a user generating alias on a fly
1400
                # was unable to receive an email due to user no longer being a
1401
                # premium user as seen in exception thrown on make_domain_address
1402
                domain_address = DomainAddress.make_domain_address(
1✔
1403
                    locked_profile.user, local_portion, True
1404
                )
1405
                glean_logger().log_email_mask_created(
1✔
1406
                    mask=domain_address,
1407
                    created_by_api=False,
1408
                )
1409
            domain_address.last_used_at = datetime.now(UTC)
1✔
1410
            domain_address.save()
1✔
1411
            return domain_address
1✔
1412
    except Profile.DoesNotExist as e:
1✔
1413
        incr_if_enabled("email_for_dne_subdomain", 1)
1✔
1414
        raise e
1✔
1415

1416

1417
def _get_address(address: str) -> RelayAddress | DomainAddress:
1✔
1418
    """
1419
    Find or create the RelayAddress or DomainAddress for an email address.
1420

1421
    If an unknown email address is for a valid subdomain, a new DomainAddress
1422
    will be created.
1423

1424
    On failure, raises exception based on Django's ObjectDoesNotExist:
1425
    * RelayAddress.DoesNotExist - looks like RelayAddress, deleted or does not exist
1426
    * Profile.DoesNotExist - looks like DomainAddress, no subdomain match
1427
    * ObjectDoesNotExist - Unknown domain
1428
    """
1429

1430
    local_portion, domain_portion = address.split("@")
1✔
1431
    local_address = local_portion.lower()
1✔
1432
    domain = domain_portion.lower()
1✔
1433

1434
    # if the domain is not the site's 'top' relay domain,
1435
    # it may be for a user's subdomain
1436
    email_domains = get_domains_from_settings().values()
1✔
1437
    if domain not in email_domains:
1✔
1438
        return _get_domain_address(local_address, domain)
1✔
1439

1440
    # the domain is the site's 'top' relay domain, so look up the RelayAddress
1441
    try:
1✔
1442
        domain_numerical = get_domain_numerical(domain)
1✔
1443
        relay_address = RelayAddress.objects.get(
1✔
1444
            address=local_address, domain=domain_numerical
1445
        )
1446
        return relay_address
1✔
1447
    except RelayAddress.DoesNotExist as e:
1✔
1448
        try:
1✔
1449
            DeletedAddress.objects.get(
1✔
1450
                address_hash=address_hash(local_address, domain=domain)
1451
            )
1452
            incr_if_enabled("email_for_deleted_address", 1)
1✔
1453
            # TODO: create a hard bounce receipt rule in SES
1454
        except DeletedAddress.DoesNotExist:
1✔
1455
            incr_if_enabled("email_for_unknown_address", 1)
1✔
1456
        except DeletedAddress.MultipleObjectsReturned:
1✔
1457
            # not sure why this happens on stage but let's handle it
1458
            incr_if_enabled("email_for_deleted_address_multiple", 1)
1✔
1459
        raise e
1✔
1460

1461

1462
def _handle_bounce(message_json: AWS_SNSMessageJSON) -> HttpResponse:
1✔
1463
    """
1464
    Handle an AWS SES bounce notification.
1465

1466
    For more information, see:
1467
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#bounce-object
1468

1469
    Returns:
1470
    * 404 response if any email address does not match a user,
1471
    * 200 response if all match or none are given
1472

1473
    Emits a counter metric "email_bounce" with these tags:
1474
    * bounce_type: 'permanent', 'transient', 'undetermined', 'none' if omitted
1475
    * bounce_subtype: 'undetermined', 'general', etc., 'none' if omitted
1476
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
1477
    * relay_action: 'no_action', 'auto_block_spam', 'hard_bounce', 'soft_bounce'
1478

1479
    Emits an info log "bounce_notification", same data as metric, plus:
1480
    * bounce_action: 'action' from bounced recipient data, or None
1481
    * bounce_status: 'status' from bounced recipient data, or None
1482
    * bounce_diagnostic: 'diagnosticCode' from bounced recipient data, or None
1483
    * bounce_extra: Extra data from bounce_recipient data, if any
1484
    * domain: User's real email address domain, if an address was given
1485
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the user
1486
    """
1487
    bounce = message_json.get("bounce", {})
1✔
1488
    bounce_type = bounce.get("bounceType", "none")
1✔
1489
    bounce_subtype = bounce.get("bounceSubType", "none")
1✔
1490
    bounced_recipients = bounce.get("bouncedRecipients", [])
1✔
1491

1492
    now = datetime.now(UTC)
1✔
1493
    bounce_data = []
1✔
1494
    for recipient in bounced_recipients:
1✔
1495
        recipient_address = recipient.pop("emailAddress", None)
1✔
1496
        data = {
1✔
1497
            "bounce_type": bounce_type,
1498
            "bounce_subtype": bounce_subtype,
1499
            "bounce_action": recipient.pop("action", ""),
1500
            "bounce_status": recipient.pop("status", ""),
1501
            "bounce_diagnostic": recipient.pop("diagnosticCode", ""),
1502
            "user_match": "no_address",
1503
            "relay_action": "no_action",
1504
        }
1505
        if recipient:
1!
1506
            data["bounce_extra"] = recipient.copy()
×
1507
        bounce_data.append(data)
1✔
1508

1509
        if recipient_address is None:
1!
1510
            continue
×
1511

1512
        recipient_address = parseaddr(recipient_address)[1]
1✔
1513
        recipient_domain = recipient_address.split("@")[1]
1✔
1514
        data["domain"] = recipient_domain
1✔
1515

1516
        try:
1✔
1517
            user = User.objects.get(email=recipient_address)
1✔
1518
            profile = user.profile
1✔
1519
            data["user_match"] = "found"
1✔
1520
            if (fxa := profile.fxa) and profile.metrics_enabled:
1✔
1521
                data["fxa_id"] = fxa.uid
1✔
1522
            else:
1523
                data["fxa_id"] = ""
1✔
1524
        except User.DoesNotExist:
1✔
1525
            # TODO: handle bounce for a user who no longer exists
1526
            # add to SES account-wide suppression list?
1527
            data["user_match"] = "missing"
1✔
1528
            continue
1✔
1529

1530
        action = None
1✔
1531
        if "spam" in data["bounce_diagnostic"].lower():
1✔
1532
            # if an email bounced as spam, set to auto block spam for this user
1533
            # and DON'T set them into bounce pause state
1534
            action = "auto_block_spam"
1✔
1535
            profile.auto_block_spam = True
1✔
1536
        elif bounce_type == "Permanent":
1✔
1537
            # TODO: handle sub-types: 'General', 'NoEmail', etc.
1538
            action = "hard_bounce"
1✔
1539
            profile.last_hard_bounce = now
1✔
1540
        elif bounce_type == "Transient":
1!
1541
            # TODO: handle sub-types: 'MessageTooLarge', 'AttachmentRejected', etc.
1542
            action = "soft_bounce"
1✔
1543
            profile.last_soft_bounce = now
1✔
1544
        if action:
1!
1545
            data["relay_action"] = action
1✔
1546
            profile.save()
1✔
1547

1548
    if not bounce_data:
1!
1549
        # Data when there are no identified recipients
1550
        bounce_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]
×
1551

1552
    for data in bounce_data:
1✔
1553
        tags = {
1✔
1554
            "bounce_type": bounce_type,
1555
            "bounce_subtype": bounce_subtype,
1556
            "user_match": data["user_match"],
1557
            "relay_action": data["relay_action"],
1558
        }
1559
        incr_if_enabled(
1✔
1560
            "email_bounce",
1561
            1,
1562
            tags=[generate_tag(key, val) for key, val in tags.items()],
1563
        )
1564
        info_logger.info("bounce_notification", extra=data)
1✔
1565

1566
    if any(data["user_match"] == "missing" for data in bounce_data):
1✔
1567
        return HttpResponse("Address does not exist", status=404)
1✔
1568
    return HttpResponse("OK", status=200)
1✔
1569

1570

1571
def _handle_complaint(message_json: AWS_SNSMessageJSON) -> HttpResponse:
1✔
1572
    """
1573
    Handle an AWS SES complaint notification.
1574

1575
    For more information, see:
1576
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object
1577

1578
    Returns:
1579
    * 404 response if any email address does not match a user,
1580
    * 200 response if all match or none are given
1581

1582
    Emits a counter metric "email_complaint" with these tags:
1583
    * complaint_subtype: 'onaccounsuppressionlist', or 'none' if omitted
1584
    * complaint_feedback - feedback enumeration from ISP or 'none'
1585
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
1586
    * relay_action: 'no_action', 'auto_block_spam'
1587

1588
    Emits an info log "complaint_notification", same data as metric, plus:
1589
    * complaint_user_agent - identifies the client used to file the complaint
1590
    * complaint_extra - Extra data from complainedRecipients data, if any
1591
    * domain - User's domain, if an address was given
1592
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the user
1593
    """
1594
    complaint = deepcopy(message_json.get("complaint", {}))
1✔
1595
    complained_recipients = complaint.pop("complainedRecipients", [])
1✔
1596
    subtype = complaint.pop("complaintSubType", None)
1✔
1597
    user_agent = complaint.pop("userAgent", None)
1✔
1598
    feedback = complaint.pop("complaintFeedbackType", None)
1✔
1599

1600
    complaint_data = []
1✔
1601
    for recipient in complained_recipients:
1✔
1602
        recipient_address = recipient.pop("emailAddress", None)
1✔
1603
        data = {
1✔
1604
            "complaint_subtype": subtype,
1605
            "complaint_user_agent": user_agent,
1606
            "complaint_feedback": feedback,
1607
            "user_match": "no_address",
1608
            "relay_action": "no_action",
1609
        }
1610
        if recipient:
1!
1611
            data["complaint_extra"] = recipient.copy()
×
1612
        complaint_data.append(data)
1✔
1613

1614
        if recipient_address is None:
1!
1615
            continue
×
1616

1617
        recipient_address = parseaddr(recipient_address)[1]
1✔
1618
        recipient_domain = recipient_address.split("@")[1]
1✔
1619
        data["domain"] = recipient_domain
1✔
1620

1621
        try:
1✔
1622
            user = User.objects.get(email=recipient_address)
1✔
1623
            profile = user.profile
1✔
1624
            data["user_match"] = "found"
1✔
1625
            if (fxa := profile.fxa) and profile.metrics_enabled:
1✔
1626
                data["fxa_id"] = fxa.uid
1✔
1627
            else:
1628
                data["fxa_id"] = ""
1✔
1629
        except User.DoesNotExist:
×
1630
            data["user_match"] = "missing"
×
1631
            continue
×
1632

1633
        data["relay_action"] = "auto_block_spam"
1✔
1634
        profile.auto_block_spam = True
1✔
1635
        profile.save()
1✔
1636

1637
    if not complaint_data:
1!
1638
        # Data when there are no identified recipients
1639
        complaint_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]
×
1640

1641
    for data in complaint_data:
1✔
1642
        tags = {
1✔
1643
            "complaint_subtype": subtype or "none",
1644
            "complaint_feedback": feedback or "none",
1645
            "user_match": data["user_match"],
1646
            "relay_action": data["relay_action"],
1647
        }
1648
        incr_if_enabled(
1✔
1649
            "email_complaint",
1650
            1,
1651
            tags=[generate_tag(key, val) for key, val in tags.items()],
1652
        )
1653
        info_logger.info("complaint_notification", extra=data)
1✔
1654

1655
    if any(data["user_match"] == "missing" for data in complaint_data):
1!
1656
        return HttpResponse("Address does not exist", status=404)
×
1657
    return HttpResponse("OK", status=200)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc