• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / b2e067fe-ce4e-4099-9bef-07b368e99782

15 Apr 2024 04:18PM CUT coverage: 75.544% (+0.002%) from 75.542%
b2e067fe-ce4e-4099-9bef-07b368e99782

push

circleci

jwhitlock
Enable pyupgrade, fix issues

2443 of 3405 branches covered (71.75%)

Branch coverage included in aggregate %.

56 of 59 new or added lines in 14 files covered. (94.92%)

234 existing lines in 24 files now uncovered.

6793 of 8821 relevant lines covered (77.01%)

20.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.99
/emails/views.py
1
import html
1✔
2
import json
1✔
3
import logging
1✔
4
import re
1✔
5
import shlex
1✔
6
from collections import defaultdict
1✔
7
from copy import deepcopy
1✔
8
from datetime import UTC, datetime
1✔
9
from email import message_from_bytes
1✔
10
from email.iterators import _structure
1✔
11
from email.message import EmailMessage
1✔
12
from email.utils import parseaddr
1✔
13
from io import StringIO
1✔
14
from json import JSONDecodeError
1✔
15
from textwrap import dedent
1✔
16
from typing import Any, Literal
1✔
17
from urllib.parse import urlencode
1✔
18

19
from django.conf import settings
1✔
20
from django.contrib.auth.models import User
1✔
21
from django.core.exceptions import ObjectDoesNotExist
1✔
22
from django.db import transaction
1✔
23
from django.db.models import prefetch_related_objects
1✔
24
from django.http import HttpRequest, HttpResponse
1✔
25
from django.shortcuts import render
1✔
26
from django.template.loader import render_to_string
1✔
27
from django.utils.html import escape
1✔
28
from django.views.decorators.csrf import csrf_exempt
1✔
29

30
from botocore.exceptions import ClientError
1✔
31
from codetiming import Timer
1✔
32
from decouple import strtobool
1✔
33
from markus.utils import generate_tag
1✔
34
from sentry_sdk import capture_message
1✔
35
from waffle import sample_is_active
1✔
36

37
from privaterelay.ftl_bundles import main as ftl_bundle
1✔
38
from privaterelay.utils import (
1✔
39
    flag_is_active_in_task,
40
    get_subplat_upgrade_link_by_language,
41
    glean_logger,
42
)
43

44
from .models import (
1✔
45
    CannotMakeAddressException,
46
    DeletedAddress,
47
    DomainAddress,
48
    Profile,
49
    RelayAddress,
50
    Reply,
51
    address_hash,
52
    get_domain_numerical,
53
)
54
from .policy import relay_policy
1✔
55
from .sns import SUPPORTED_SNS_TYPES, verify_from_sns
1✔
56
from .types import (
1✔
57
    AWS_MailJSON,
58
    AWS_SNSMessageJSON,
59
    EmailForwardingIssues,
60
    EmailHeaderIssues,
61
    OutgoingHeaders,
62
)
63
from .utils import (
1✔
64
    InvalidFromHeader,
65
    _get_bucket_and_key_from_s3_json,
66
    b64_lookup_key,
67
    count_all_trackers,
68
    decrypt_reply_metadata,
69
    derive_reply_keys,
70
    encrypt_reply_metadata,
71
    generate_from_header,
72
    get_domains_from_settings,
73
    get_message_content_from_s3,
74
    get_message_id_bytes,
75
    get_reply_to_address,
76
    histogram_if_enabled,
77
    incr_if_enabled,
78
    parse_email_header,
79
    remove_message_from_s3,
80
    remove_trackers,
81
    ses_send_raw_email,
82
    urlize_and_linebreaks,
83
)
84

85
logger = logging.getLogger("events")
1✔
86
info_logger = logging.getLogger("eventsinfo")
1✔
87

88

89
class ReplyHeadersNotFound(Exception):
1✔
90
    def __init__(self, message="No In-Reply-To or References headers."):
1✔
91
        self.message = message
1✔
92

93

94
def first_time_user_test(request):
1✔
95
    """
96
    Demonstrate rendering of the "First time Relay user" email.
97
    Settings like language can be given in the querystring, otherwise settings
98
    come from a random free profile.
99
    """
100
    in_bundle_country = strtobool(request.GET.get("in_bundle_country", "yes"))
×
UNCOV
101
    email_context = {
×
102
        "in_bundle_country": in_bundle_country,
103
        "SITE_ORIGIN": settings.SITE_ORIGIN,
104
    }
105
    if request.GET.get("format", "html") == "text":
×
UNCOV
106
        return render(
×
107
            request,
108
            "emails/first_time_user.txt",
109
            email_context,
110
            "text/plain; charset=utf-8",
111
        )
UNCOV
112
    return render(request, "emails/first_time_user.html", email_context)
×
113

114

115
def reply_requires_premium_test(request):
1✔
116
    """
117
    Demonstrate rendering of the "Reply requires premium" email.
118

119
    Settings like language can be given in the querystring, otherwise settings
120
    come from a random free profile.
121
    """
122
    email_context = {
1✔
123
        "sender": "test@example.com",
124
        "forwarded": True,
125
        "SITE_ORIGIN": settings.SITE_ORIGIN,
126
    }
127
    for param in request.GET:
1✔
128
        email_context[param] = request.GET.get(param)
1✔
129
        if param == "forwarded" and request.GET[param] == "True":
1✔
130
            email_context[param] = True
1✔
131

132
    for param in request.GET:
1✔
133
        if param == "content-type" and request.GET[param] == "text/plain":
1✔
134
            return render(
1✔
135
                request,
136
                "emails/reply_requires_premium.txt",
137
                email_context,
138
                "text/plain; charset=utf-8",
139
            )
140
    return render(request, "emails/reply_requires_premium.html", email_context)
1✔
141

142

143
def first_forwarded_email_test(request: HttpRequest) -> HttpResponse:
1✔
144
    # TO DO: Update with correct context when trigger is created
UNCOV
145
    first_forwarded_email_html = render_to_string(
×
146
        "emails/first_forwarded_email.html",
147
        {
148
            "SITE_ORIGIN": settings.SITE_ORIGIN,
149
        },
150
    )
151

UNCOV
152
    wrapped_email = wrap_html_email(
×
153
        first_forwarded_email_html,
154
        "en-us",
155
        True,
156
        "test@example.com",
157
        0,
158
    )
159

UNCOV
160
    return HttpResponse(wrapped_email)
×
161

162

163
def wrap_html_email(
1✔
164
    original_html: str,
165
    language: str,
166
    has_premium: bool,
167
    display_email: str,
168
    num_level_one_email_trackers_removed: int | None = None,
169
    tracker_report_link: str | None = None,
170
) -> str:
171
    """Add Relay banners, surveys, etc. to an HTML email"""
172
    subplat_upgrade_link = get_subplat_upgrade_link_by_language(language)
1✔
173
    email_context = {
1✔
174
        "original_html": original_html,
175
        "language": language,
176
        "has_premium": has_premium,
177
        "subplat_upgrade_link": subplat_upgrade_link,
178
        "display_email": display_email,
179
        "tracker_report_link": tracker_report_link,
180
        "num_level_one_email_trackers_removed": num_level_one_email_trackers_removed,
181
        "SITE_ORIGIN": settings.SITE_ORIGIN,
182
    }
183
    content = render_to_string("emails/wrapped_email.html", email_context)
1✔
184
    # Remove empty lines
185
    content_lines = [line for line in content.splitlines() if line.strip()]
1✔
186
    return "\n".join(content_lines) + "\n"
1✔
187

188

189
def wrapped_email_test(request: HttpRequest) -> HttpResponse:
1✔
190
    """
191
    Demonstrate rendering of forwarded HTML emails.
192

193
    Settings like language can be given in the querystring, otherwise settings
194
    come from a randomly chosen profile.
195
    """
196

197
    if all(key in request.GET for key in ("language", "has_premium")):
1✔
198
        user_profile = None
1✔
199
    else:
200
        user_profile = Profile.objects.order_by("?").first()
1✔
201

202
    if "language" in request.GET:
1✔
203
        language = request.GET["language"]
1✔
204
    else:
205
        assert user_profile is not None
1✔
206
        language = user_profile.language
1✔
207

208
    if "has_premium" in request.GET:
1✔
209
        has_premium = strtobool(request.GET["has_premium"])
1✔
210
    else:
211
        assert user_profile is not None
1✔
212
        has_premium = user_profile.has_premium
1✔
213

214
    if "num_level_one_email_trackers_removed" in request.GET:
1✔
215
        num_level_one_email_trackers_removed = int(
1✔
216
            request.GET["num_level_one_email_trackers_removed"]
217
        )
218
    else:
219
        num_level_one_email_trackers_removed = 0
1✔
220

221
    if "has_tracker_report_link" in request.GET:
1✔
222
        has_tracker_report_link = strtobool(request.GET["has_tracker_report_link"])
1✔
223
    else:
224
        has_tracker_report_link = False
1✔
225
    if has_tracker_report_link:
1✔
226
        if num_level_one_email_trackers_removed:
1✔
227
            trackers = {
1✔
228
                "fake-tracker.example.com": num_level_one_email_trackers_removed
229
            }
230
        else:
231
            trackers = {}
1✔
232
        tracker_report_link = (
1✔
233
            "/tracker-report/#{"
234
            '"sender": "sender@example.com", '
235
            '"received_at": 1658434657, '
236
            f'"trackers": { json.dumps(trackers) }'
237
            "}"
238
        )
239
    else:
240
        tracker_report_link = ""
1✔
241

242
    path = "/emails/wrapped_email_test"
1✔
243
    old_query = {
1✔
244
        "language": language,
245
        "has_premium": "Yes" if has_premium else "No",
246
        "has_tracker_report_link": "Yes" if has_tracker_report_link else "No",
247
        "num_level_one_email_trackers_removed": str(
248
            num_level_one_email_trackers_removed
249
        ),
250
    }
251

252
    def switch_link(key, value):
1✔
253
        if old_query[key] == value:
1✔
254
            return str(value)
1✔
255
        new_query = old_query.copy()
1✔
256
        new_query[key] = value
1✔
257
        return f'<a href="{path}?{urlencode(new_query)}">{value}</a>'
1✔
258

259
    html_content = dedent(
1✔
260
        f"""\
261
    <p>
262
      <strong>Email rendering Test</strong>
263
    </p>
264
    <p>Settings: (<a href="{path}">clear all</a>)</p>
265
    <ul>
266
      <li>
267
        <strong>language</strong>:
268
        {escape(language)}
269
        (switch to
270
        {switch_link("language", "en-us")},
271
        {switch_link("language", "de")},
272
        {switch_link("language", "en-gb")},
273
        {switch_link("language", "fr")},
274
        {switch_link("language", "ru-ru")},
275
        {switch_link("language", "es-es")},
276
        {switch_link("language", "pt-br")},
277
        {switch_link("language", "it-it")},
278
        {switch_link("language", "en-ca")},
279
        {switch_link("language", "de-de")},
280
        {switch_link("language", "es-mx")})
281
      </li>
282
      <li>
283
        <strong>has_premium</strong>:
284
        {"Yes" if has_premium else "No"}
285
        (switch to
286
        {switch_link("has_premium", "Yes")},
287
        {switch_link("has_premium", "No")})
288
      </li>
289
      <li>
290
        <strong>has_tracker_report_link</strong>:
291
        {"Yes" if has_tracker_report_link else "No"}
292
        (switch to
293
        {switch_link("has_tracker_report_link", "Yes")},
294
        {switch_link("has_tracker_report_link", "No")})
295
      </li>
296
      <li>
297
        <strong>num_level_one_email_trackers_removed</strong>:
298
        {num_level_one_email_trackers_removed}
299
        (switch to
300
        {switch_link("num_level_one_email_trackers_removed", "0")},
301
        {switch_link("num_level_one_email_trackers_removed", "1")},
302
        {switch_link("num_level_one_email_trackers_removed", "2")})
303
      </li>
304
    </ul>
305
    """
306
    )
307

308
    wrapped_email = wrap_html_email(
1✔
309
        original_html=html_content,
310
        language=language,
311
        has_premium=has_premium,
312
        tracker_report_link=tracker_report_link,
313
        display_email="test@relay.firefox.com",
314
        num_level_one_email_trackers_removed=num_level_one_email_trackers_removed,
315
    )
316
    return HttpResponse(wrapped_email)
1✔
317

318

319
def _store_reply_record(
1✔
320
    mail: AWS_MailJSON, message_id: str, address: RelayAddress | DomainAddress
321
) -> AWS_MailJSON:
322
    # After relaying email, store a Reply record for it
323
    reply_metadata = {}
1✔
324
    for header in mail["headers"]:
1✔
325
        if header["name"].lower() in ["message-id", "from", "reply-to"]:
1✔
326
            reply_metadata[header["name"].lower()] = header["value"]
1✔
327
    message_id_bytes = get_message_id_bytes(message_id)
1✔
328
    (lookup_key, encryption_key) = derive_reply_keys(message_id_bytes)
1✔
329
    lookup = b64_lookup_key(lookup_key)
1✔
330
    encrypted_metadata = encrypt_reply_metadata(encryption_key, reply_metadata)
1✔
331
    reply_create_args: dict[str, Any] = {
1✔
332
        "lookup": lookup,
333
        "encrypted_metadata": encrypted_metadata,
334
    }
335
    if isinstance(address, DomainAddress):
1✔
336
        reply_create_args["domain_address"] = address
1✔
337
    else:
338
        assert isinstance(address, RelayAddress)
1✔
339
        reply_create_args["relay_address"] = address
1✔
340
    Reply.objects.create(**reply_create_args)
1✔
341
    return mail
1✔
342

343

344
@csrf_exempt
1✔
345
def sns_inbound(request):
1✔
346
    incr_if_enabled("sns_inbound", 1)
1✔
347
    # First thing we do is verify the signature
348
    json_body = json.loads(request.body)
1✔
349
    verified_json_body = verify_from_sns(json_body)
1✔
350

351
    # Validate ARN and message type
352
    topic_arn = verified_json_body.get("TopicArn", None)
1✔
353
    message_type = verified_json_body.get("Type", None)
1✔
354
    error_details = validate_sns_arn_and_type(topic_arn, message_type)
1✔
355
    if error_details:
1✔
356
        logger.error("validate_sns_arn_and_type_error", extra=error_details)
1✔
357
        return HttpResponse(error_details["error"], status=400)
1✔
358

359
    return _sns_inbound_logic(topic_arn, message_type, verified_json_body)
1✔
360

361

362
def validate_sns_arn_and_type(
1✔
363
    topic_arn: str | None, message_type: str | None
364
) -> dict[str, Any] | None:
365
    """
366
    Validate Topic ARN and SNS Message Type.
367

368
    If an error is detected, the return is a dictionary of error details.
369
    If no error is detected, the return is None.
370
    """
371
    if not topic_arn:
1✔
372
        error = "Received SNS request without Topic ARN."
1✔
373
    elif topic_arn not in settings.AWS_SNS_TOPIC:
1✔
374
        error = "Received SNS message for wrong topic."
1✔
375
    elif not message_type:
1✔
376
        error = "Received SNS request without Message Type."
1✔
377
    elif message_type not in SUPPORTED_SNS_TYPES:
1✔
378
        error = "Received SNS message for unsupported Type."
1✔
379
    else:
380
        error = None
1✔
381

382
    if error:
1✔
383
        return {
1✔
384
            "error": error,
385
            "received_topic_arn": shlex.quote(topic_arn) if topic_arn else topic_arn,
386
            "supported_topic_arn": sorted(settings.AWS_SNS_TOPIC),
387
            "received_sns_type": (
388
                shlex.quote(message_type) if message_type else message_type
389
            ),
390
            "supported_sns_types": SUPPORTED_SNS_TYPES,
391
        }
392
    return None
1✔
393

394

395
def _sns_inbound_logic(topic_arn, message_type, json_body):
1✔
396
    if message_type == "SubscriptionConfirmation":
×
UNCOV
397
        info_logger.info(
×
398
            "SNS SubscriptionConfirmation",
399
            extra={"SubscribeURL": json_body["SubscribeURL"]},
400
        )
401
        return HttpResponse("Logged SubscribeURL", status=200)
×
402
    if message_type == "Notification":
×
403
        incr_if_enabled("sns_inbound_Notification", 1)
×
UNCOV
404
        return _sns_notification(json_body)
×
405

UNCOV
406
    logger.error(
×
407
        "SNS message type did not fall under the SNS inbound logic",
408
        extra={"message_type": shlex.quote(message_type)},
409
    )
UNCOV
410
    capture_message(
×
411
        "Received SNS message with type not handled in inbound log",
412
        level="error",
413
        stack=True,
414
    )
UNCOV
415
    return HttpResponse(
×
416
        "Received SNS message with type not handled in inbound log", status=400
417
    )
418

419

420
def _sns_notification(json_body):
1✔
421
    try:
1✔
422
        message_json = json.loads(json_body["Message"])
1✔
423
    except JSONDecodeError:
1✔
424
        logger.error(
1✔
425
            "SNS notification has non-JSON message body",
426
            extra={"content": shlex.quote(json_body["Message"])},
427
        )
428
        return HttpResponse("Received SNS notification with non-JSON body", status=400)
1✔
429

430
    event_type = message_json.get("eventType")
1✔
431
    notification_type = message_json.get("notificationType")
1✔
432
    if notification_type not in {
1✔
433
        "Complaint",
434
        "Received",
435
        "Bounce",
436
    } and event_type not in {"Complaint", "Bounce"}:
437
        logger.error(
1✔
438
            "SNS notification for unsupported type",
439
            extra={
440
                "notification_type": shlex.quote(notification_type),
441
                "event_type": shlex.quote(event_type),
442
                "keys": [shlex.quote(key) for key in message_json.keys()],
443
            },
444
        )
445
        return HttpResponse(
1✔
446
            "Received SNS notification for unsupported Type: %s"
447
            % html.escape(shlex.quote(notification_type)),
448
            status=400,
449
        )
450
    response = _sns_message(message_json)
1✔
451
    bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
1✔
452
    if response.status_code < 500:
1✔
453
        remove_message_from_s3(bucket, object_key)
1✔
454

455
    return response
1✔
456

457

458
def _get_recipient_with_relay_domain(recipients):
1✔
459
    domains_to_check = get_domains_from_settings().values()
1✔
460
    for recipient in recipients:
1✔
461
        for domain in domains_to_check:
1✔
462
            if domain in recipient:
1✔
463
                return recipient
1✔
464
    return None
1✔
465

466

467
def _get_relay_recipient_from_message_json(message_json):
1✔
468
    # Go thru all To, Cc, and Bcc fields and
469
    # return the one that has a Relay domain
470

471
    # First check commmon headers for to or cc match
472
    headers_to_check = "to", "cc"
1✔
473
    common_headers = message_json["mail"]["commonHeaders"]
1✔
474
    for header in headers_to_check:
1✔
475
        if header in common_headers:
1✔
476
            recipient = _get_recipient_with_relay_domain(common_headers[header])
1✔
477
            if recipient is not None:
1✔
478
                return parseaddr(recipient)[1]
1✔
479

480
    # SES-SNS sends bcc in a different part of the message
481
    recipients = message_json["receipt"]["recipients"]
1✔
482
    return _get_recipient_with_relay_domain(recipients)
1✔
483

484

485
def _sns_message(message_json: AWS_SNSMessageJSON) -> HttpResponse:
1✔
486
    incr_if_enabled("sns_inbound_Notification_Received", 1)
1✔
487
    notification_type = message_json.get("notificationType")
1✔
488
    event_type = message_json.get("eventType")
1✔
489
    if notification_type == "Bounce" or event_type == "Bounce":
1✔
490
        return _handle_bounce(message_json)
1✔
491
    if notification_type == "Complaint" or event_type == "Complaint":
1✔
492
        return _handle_complaint(message_json)
1✔
493
    assert notification_type == "Received" and event_type is None
1✔
494
    return _handle_received(message_json)
1✔
495

496

497
# Enumerate the reasons that an email was not forwarded.
498
# This excludes emails dropped due to mask forwarding settings,
499
# such as "block all" and "block promotional". Those are logged
500
# as Glean email_blocked events.
501
EmailDroppedReason = Literal[
1✔
502
    "auto_block_spam",  # Email identified as spam, user has the auto_block_spam flag
503
    "dmarc_reject_failed",  # Email failed DMARC check with a reject policy
504
    "hard_bounce_pause",  # The user recently had a hard bounce
505
    "soft_bounce_pause",  # The user recently has a soft bounce
506
    "abuse_flag",  # The user exceeded an abuse limit, like mails forwarded
507
    "reply_requires_premium",  # The email is a reply from a free user
508
    "content_missing",  # Could not load the email from storage
509
    "error_from_header",  # Error generating the From: header, retryable
510
    "error_storage",  # Error fetching the email contents from storage (S3), retryable
511
    "error_sending",  # Error sending the forwarded email (SES), retryable
512
]
513

514

515
def log_email_dropped(
1✔
516
    reason: EmailDroppedReason,
517
    mask: RelayAddress | DomainAddress,
518
    is_reply: bool = False,
519
    can_retry: bool = False,
520
) -> None:
521
    """
522
    Log that an email was dropped for a reason other than a mask blocking setting.
523

524
    This mirrors the interface of glean_logger().log_email_blocked(), which
525
    records emails dropped due to the mask's blocking setting.
526
    """
527
    extra: dict[str, str | int | bool] = {"reason": reason}
1✔
528
    if mask.user.profile.metrics_enabled:
1✔
529
        if mask.user.profile.fxa is not None:
1✔
530
            extra["fxa_id"] = mask.user.profile.fxa.uid
1✔
531
        extra["mask_id"] = mask.metrics_id
1✔
532
    extra |= {
1✔
533
        "is_random_mask": isinstance(mask, RelayAddress),
534
        "is_reply": is_reply,
535
        "can_retry": can_retry,
536
    }
537
    info_logger.info("email_dropped", extra=extra)
1✔
538

539

540
def _handle_received(message_json: AWS_SNSMessageJSON) -> HttpResponse:
1✔
541
    """
542
    Handle an AWS SES received notification.
543

544
    For more information, see:
545
    https://docs.aws.amazon.com/ses/latest/dg/receiving-email-notifications-contents.html
546
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html
547

548
    Returns (may be incomplete):
549
    * 200 if the email was sent, the Relay address is disabled, the Relay user is
550
      flagged for abuse, the email is under a bounce pause, the email was suppressed
551
      for spam, the list email was blocked, or the noreply address was the recipient.
552
    * 400 if commonHeaders entry is missing, the Relay recipient address is malformed,
553
      the email failed DMARC with reject policy, or the email is a reply chain to a
554
      non-premium user.
555
    * 404 if an S3-stored email was not found, no Relay address was found in the "To",
556
      "CC", or "BCC" fields, or the Relay address is not in the database.
557
    * 503 if the "From" address is malformed, the S3 client returned an error different
558
      from "not found", or the SES client fails
559

560
    And many other returns conditions if the email is a reply. The HTTP returns are an
561
    artifact from an earlier time when emails were sent to a webhook. Currently,
562
    production instead pulls events from a queue.
563

564
    TODO: Return a more appropriate status object
565
    TODO: Document the metrics emitted
566
    """
567
    mail = message_json["mail"]
1✔
568
    if "commonHeaders" not in mail:
1✔
569
        logger.error("SNS message without commonHeaders")
1✔
570
        return HttpResponse(
1✔
571
            "Received SNS notification without commonHeaders.", status=400
572
        )
573
    common_headers = mail["commonHeaders"]
1✔
574
    receipt = message_json["receipt"]
1✔
575

576
    _record_receipt_verdicts(receipt, "all")
1✔
577
    to_address = _get_relay_recipient_from_message_json(message_json)
1✔
578
    if to_address is None:
1✔
579
        incr_if_enabled("no_relay_domain_in_recipient_fields", 1)
1✔
580
        return HttpResponse("Address does not exist", status=404)
1✔
581

582
    _record_receipt_verdicts(receipt, "relay_recipient")
1✔
583
    from_addresses = parse_email_header(common_headers["from"][0])
1✔
584
    if not from_addresses:
1✔
585
        info_logger.error(
1✔
586
            "_handle_received: no from address",
587
            extra={
588
                "source": mail["source"],
589
                "common_headers_from": common_headers["from"],
590
            },
591
        )
592
        return HttpResponse("Unable to parse From address", status=400)
1✔
593
    from_address = from_addresses[0][1]
1✔
594

595
    try:
1✔
596
        [to_local_portion, to_domain_portion] = to_address.split("@")
1✔
597
    except ValueError:
1✔
598
        # TODO: Add metric
599
        return HttpResponse("Malformed to field.", status=400)
1✔
600

601
    if to_local_portion.lower() == "noreply":
1✔
602
        incr_if_enabled("email_for_noreply_address", 1)
1✔
603
        return HttpResponse("noreply address is not supported.")
1✔
604
    try:
1✔
605
        # FIXME: this ambiguous return of either
606
        # RelayAddress or DomainAddress types makes the Rustacean in me throw
607
        # up a bit.
608
        address = _get_address(to_address)
1✔
609
        prefetch_related_objects([address.user], "socialaccount_set", "profile")
1✔
610
        user_profile = address.user.profile
1✔
611
    except (
1✔
612
        ObjectDoesNotExist,
613
        CannotMakeAddressException,
614
        DeletedAddress.MultipleObjectsReturned,
615
    ):
616
        if to_local_portion.lower() == "replies":
1✔
617
            response = _handle_reply(from_address, message_json, to_address)
1✔
618
        else:
619
            response = HttpResponse("Address does not exist", status=404)
1✔
620
        return response
1✔
621

622
    _record_receipt_verdicts(receipt, "valid_user")
1✔
623
    # if this is spam and the user is set to auto-block spam, early return
624
    if user_profile.auto_block_spam and _get_verdict(receipt, "spam") == "FAIL":
1✔
625
        incr_if_enabled("email_auto_suppressed_for_spam", 1)
1✔
626
        log_email_dropped(reason="auto_block_spam", mask=address)
1✔
627
        return HttpResponse("Address rejects spam.")
1✔
628

629
    if _get_verdict(receipt, "dmarc") == "FAIL":
1✔
630
        policy = receipt.get("dmarcPolicy", "none")
1✔
631
        # TODO: determine action on dmarcPolicy "quarantine"
632
        if policy == "reject":
1!
633
            log_email_dropped(reason="dmarc_reject_failed", mask=address)
1✔
634
            incr_if_enabled(
1✔
635
                "email_suppressed_for_dmarc_failure",
636
                tags=["dmarcPolicy:reject", "dmarcVerdict:FAIL"],
637
            )
638
            return HttpResponse("DMARC failure, policy is reject", status=400)
1✔
639

640
    # if this user is over bounce limits, early return
641
    bounce_paused, bounce_type = user_profile.check_bounce_pause()
1✔
642
    if bounce_paused:
1✔
643
        _record_receipt_verdicts(receipt, "user_bounce_paused")
1✔
644
        incr_if_enabled("email_suppressed_for_%s_bounce" % bounce_type, 1)
1✔
645
        reason: Literal["soft_bounce_pause", "hard_bounce_pause"] = (
1✔
646
            "soft_bounce_pause" if bounce_type == "soft" else "hard_bounce_pause"
647
        )
648
        log_email_dropped(reason=reason, mask=address)
1✔
649
        return HttpResponse("Address is temporarily disabled.")
1✔
650

651
    # check if this is a reply from an external sender to a Relay user
652
    try:
1✔
653
        (lookup_key, _) = _get_keys_from_headers(mail["headers"])
1✔
654
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
1✔
655
        user_address = address
1✔
656
        address = reply_record.address
1✔
657
        message_id = _get_message_id_from_headers(mail["headers"])
1✔
658
        # make sure the relay user is premium
659
        if not _reply_allowed(from_address, to_address, reply_record, message_id):
1!
660
            log_email_dropped(reason="reply_requires_premium", mask=user_address)
1✔
661
            return HttpResponse("Relay replies require a premium account", status=403)
1✔
662
    except (ReplyHeadersNotFound, Reply.DoesNotExist):
1✔
663
        # if there's no In-Reply-To header, or the In-Reply-To value doesn't
664
        # match a Reply record, continue to treat this as a regular email from
665
        # an external sender to a relay user
666
        pass
1✔
667

668
    # if account flagged for abuse, early return
669
    if user_profile.is_flagged:
1✔
670
        log_email_dropped(reason="abuse_flag", mask=address)
1✔
671
        return HttpResponse("Address is temporarily disabled.")
1✔
672

673
    # if address is set to block, early return
674
    if not address.enabled:
1✔
675
        incr_if_enabled("email_for_disabled_address", 1)
1✔
676
        address.num_blocked += 1
1✔
677
        address.save(update_fields=["num_blocked"])
1✔
678
        _record_receipt_verdicts(receipt, "disabled_alias")
1✔
679
        user_profile.last_engagement = datetime.now(UTC)
1✔
680
        user_profile.save()
1✔
681
        glean_logger().log_email_blocked(mask=address, reason="block_all")
1✔
682
        return HttpResponse("Address is temporarily disabled.")
1✔
683

684
    _record_receipt_verdicts(receipt, "active_alias")
1✔
685
    incr_if_enabled("email_for_active_address", 1)
1✔
686

687
    # if address is blocking list emails, and email is from list, early return
688
    if (
1✔
689
        address
690
        and address.block_list_emails
691
        and user_profile.has_premium
692
        and _check_email_from_list(mail["headers"])
693
    ):
694
        incr_if_enabled("list_email_for_address_blocking_lists", 1)
1✔
695
        address.num_blocked += 1
1✔
696
        address.save(update_fields=["num_blocked"])
1✔
697
        user_profile.last_engagement = datetime.now(UTC)
1✔
698
        user_profile.save()
1✔
699
        glean_logger().log_email_blocked(mask=address, reason="block_promotional")
1✔
700
        return HttpResponse("Address is not accepting list emails.")
1✔
701

702
    # Collect new headers
703
    subject = common_headers.get("subject", "")
1✔
704
    destination_address = user_profile.user.email
1✔
705
    reply_address = get_reply_to_address()
1✔
706
    try:
1✔
707
        from_header = generate_from_header(from_address, to_address)
1✔
708
    except InvalidFromHeader:
1✔
709
        # TODO: MPP-3407, MPP-3417 - Determine how to handle these
710
        header_from = []
1✔
711
        for header in mail["headers"]:
1✔
712
            if header["name"].lower() == "from":
1✔
713
                header_from.append(header)
1✔
714
        info_logger.error(
1✔
715
            "generate_from_header",
716
            extra={
717
                "from_address": from_address,
718
                "source": mail["source"],
719
                "common_headers_from": common_headers["from"],
720
                "headers_from": header_from,
721
            },
722
        )
723
        log_email_dropped(reason="error_from_header", mask=address, can_retry=True)
1✔
724
        return HttpResponse("Cannot parse the From address", status=503)
1✔
725

726
    headers: OutgoingHeaders = {
1✔
727
        "Subject": subject,
728
        "From": from_header,
729
        "To": destination_address,
730
        "Reply-To": reply_address,
731
        "Resent-From": from_address,
732
    }
733

734
    # Get incoming email
735
    try:
1✔
736
        (incoming_email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
1✔
737
    except ClientError as e:
1✔
738
        if e.response["Error"].get("Code", "") == "NoSuchKey":
1✔
739
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
1✔
740
            log_email_dropped(reason="content_missing", mask=address)
1✔
741
            return HttpResponse("Email not in S3", status=404)
1✔
742
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
1✔
743
        log_email_dropped(reason="error_storage", mask=address, can_retry=True)
1✔
744
        # we are returning a 503 so that SNS can retry the email processing
745
        return HttpResponse("Cannot fetch the message content from S3", status=503)
1✔
746

747
    # Convert to new email
748
    sample_trackers = bool(sample_is_active("tracker_sample"))
1✔
749
    tracker_removal_flag = flag_is_active_in_task("tracker_removal", address.user)
1✔
750
    remove_level_one_trackers = bool(
1✔
751
        tracker_removal_flag and user_profile.remove_level_one_email_trackers
752
    )
753
    (
1✔
754
        forwarded_email,
755
        issues,
756
        level_one_trackers_removed,
757
        has_html,
758
        has_text,
759
    ) = _convert_to_forwarded_email(
760
        incoming_email_bytes=incoming_email_bytes,
761
        headers=headers,
762
        to_address=to_address,
763
        from_address=from_address,
764
        language=user_profile.language,
765
        has_premium=user_profile.has_premium,
766
        sample_trackers=sample_trackers,
767
        remove_level_one_trackers=remove_level_one_trackers,
768
    )
769
    if has_html:
1✔
770
        incr_if_enabled("email_with_html_content", 1)
1✔
771
    if has_text:
1!
772
        incr_if_enabled("email_with_text_content", 1)
1✔
773
    if issues:
1✔
774
        info_logger.warning(
1✔
775
            "_handle_received: forwarding issues", extra={"issues": issues}
776
        )
777

778
    # Send new email
779
    try:
1✔
780
        ses_response = ses_send_raw_email(
1✔
781
            source_address=reply_address,
782
            destination_address=destination_address,
783
            message=forwarded_email,
784
        )
785
    except ClientError:
1✔
786
        # 503 service unavailable reponse to SNS so it can retry
787
        log_email_dropped(reason="error_sending", mask=address, can_retry=True)
1✔
788
        return HttpResponse("SES client error on Raw Email", status=503)
1✔
789

790
    message_id = ses_response["MessageId"]
1✔
791
    _store_reply_record(mail, message_id, address)
1✔
792

793
    user_profile.update_abuse_metric(
1✔
794
        email_forwarded=True, forwarded_email_size=len(incoming_email_bytes)
795
    )
796
    user_profile.last_engagement = datetime.now(UTC)
1✔
797
    user_profile.save()
1✔
798
    address.num_forwarded += 1
1✔
799
    address.last_used_at = datetime.now(UTC)
1✔
800
    if level_one_trackers_removed:
1!
UNCOV
801
        address.num_level_one_trackers_blocked = (
×
802
            address.num_level_one_trackers_blocked or 0
803
        ) + level_one_trackers_removed
804
    address.save(
1✔
805
        update_fields=[
806
            "num_forwarded",
807
            "last_used_at",
808
            "block_list_emails",
809
            "num_level_one_trackers_blocked",
810
        ]
811
    )
812
    glean_logger().log_email_forwarded(mask=address, is_reply=False)
1✔
813
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
814

815

816
def _get_verdict(receipt, verdict_type):
1✔
817
    return receipt["%sVerdict" % verdict_type]["status"]
1✔
818

819

820
def _check_email_from_list(headers):
1✔
821
    for header in headers:
1!
822
        if header["name"].lower().startswith("list-"):
1!
823
            return True
1✔
UNCOV
824
    return False
×
825

826

827
def _record_receipt_verdicts(receipt, state):
1✔
828
    verdict_tags = []
1✔
829
    for key in sorted(receipt.keys()):
1✔
830
        if key.endswith("Verdict"):
1✔
831
            value = receipt[key]["status"]
1✔
832
            verdict_tags.append(f"{key}:{value}")
1✔
833
            incr_if_enabled(f"relay.emails.verdicts.{key}", 1, [f"state:{state}"])
1✔
834
        elif key == "dmarcPolicy":
1✔
835
            value = receipt[key]
1✔
836
            verdict_tags.append(f"{key}:{value}")
1✔
837
    incr_if_enabled(f"relay.emails.state.{state}", 1, verdict_tags)
1✔
838

839

840
def _get_message_id_from_headers(headers):
1✔
841
    message_id = None
1✔
842
    for header in headers:
1✔
843
        if header["name"].lower() == "message-id":
1✔
844
            message_id = header["value"]
1✔
845
    return message_id
1✔
846

847

848
def _get_keys_from_headers(headers):
1✔
849
    in_reply_to = None
1✔
850
    for header in headers:
1✔
851
        if header["name"].lower() == "in-reply-to":
1✔
852
            in_reply_to = header["value"]
1✔
853
            message_id_bytes = get_message_id_bytes(in_reply_to)
1✔
854
            return derive_reply_keys(message_id_bytes)
1✔
855

856
        if header["name"].lower() == "references":
1✔
857
            message_ids = header["value"]
1✔
858
            for message_id in message_ids.split(" "):
1✔
859
                message_id_bytes = get_message_id_bytes(message_id)
1✔
860
                lookup_key, encryption_key = derive_reply_keys(message_id_bytes)
1✔
861
                try:
1✔
862
                    # FIXME: calling code is likely to duplicate this query
863
                    _get_reply_record_from_lookup_key(lookup_key)
1✔
864
                    return lookup_key, encryption_key
1✔
865
                except Reply.DoesNotExist:
1✔
866
                    pass
1✔
867
            raise Reply.DoesNotExist
1✔
868
    incr_if_enabled("mail_to_replies_without_reply_headers", 1)
1✔
869
    raise ReplyHeadersNotFound
1✔
870

871

872
def _get_reply_record_from_lookup_key(lookup_key):
1✔
873
    lookup = b64_lookup_key(lookup_key)
1✔
874
    return Reply.objects.get(lookup=lookup)
1✔
875

876

877
def _strip_localpart_tag(address):
1✔
878
    [localpart, domain] = address.split("@")
1✔
879
    subaddress_parts = localpart.split("+")
1✔
880
    return f"{subaddress_parts[0]}@{domain}"
1✔
881

882

883
_TransportType = Literal["sns", "s3"]
1✔
884

885

886
def _get_email_bytes(
1✔
887
    message_json: AWS_SNSMessageJSON,
888
) -> tuple[bytes, _TransportType, float]:
889
    with Timer(logger=None) as load_timer:
1✔
890
        if "content" in message_json:
1✔
891
            # email content in sns message
892
            message_content = message_json["content"].encode("utf-8")
1✔
893
            transport: Literal["sns", "s3"] = "sns"
1✔
894
        else:
895
            # assume email content in S3
896
            bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
1✔
897
            message_content = get_message_content_from_s3(bucket, object_key)
1✔
898
            transport = "s3"
1✔
899
        histogram_if_enabled("relayed_email.size", len(message_content))
1✔
900
    load_time_s = round(load_timer.last, 3)
1✔
901
    return (message_content, transport, load_time_s)
1✔
902

903

904
def _convert_to_forwarded_email(
1✔
905
    incoming_email_bytes: bytes,
906
    headers: OutgoingHeaders,
907
    to_address: str,
908
    from_address: str,
909
    language: str,
910
    has_premium: bool,
911
    sample_trackers: bool,
912
    remove_level_one_trackers: bool,
913
    now: datetime | None = None,
914
) -> tuple[EmailMessage, EmailForwardingIssues, int, bool, bool]:
915
    """
916
    Convert an email (as bytes) to a forwarded email.
917

918
    Return is a tuple:
919
    - email - The forwarded email
920
    - issues - Any detected issues in conversion
921
    - level_one_trackers_removed (int) - Number of trackers removed
922
    - has_html - True if the email has an HTML representation
923
    - has_text - True if the email has a plain text representation
924
    """
925
    email = message_from_bytes(incoming_email_bytes, policy=relay_policy)
1✔
926
    # python/typeshed issue 2418
927
    # The Python 3.2 default was Message, 3.6 uses policy.message_factory, and
928
    # policy.default.message_factory is EmailMessage
929
    assert isinstance(email, EmailMessage)
1✔
930

931
    # Replace headers in the original email
932
    header_issues = _replace_headers(email, headers)
1✔
933

934
    # Find and replace text content
935
    text_body = email.get_body("plain")
1✔
936
    text_content = None
1✔
937
    has_text = False
1✔
938
    if text_body:
1!
939
        has_text = True
1✔
940
        assert isinstance(text_body, EmailMessage)
1✔
941
        text_content = text_body.get_content()
1✔
942
        new_text_content = _convert_text_content(text_content, to_address)
1✔
943
        text_body.set_content(new_text_content)
1✔
944

945
    # Find and replace HTML content
946
    html_body = email.get_body("html")
1✔
947
    level_one_trackers_removed = 0
1✔
948
    has_html = False
1✔
949
    if html_body:
1✔
950
        has_html = True
1✔
951
        assert isinstance(html_body, EmailMessage)
1✔
952
        html_content = html_body.get_content()
1✔
953
        new_content, level_one_trackers_removed = _convert_html_content(
1✔
954
            html_content,
955
            to_address,
956
            from_address,
957
            language,
958
            has_premium,
959
            sample_trackers,
960
            remove_level_one_trackers,
961
        )
962
        html_body.set_content(new_content, subtype="html")
1✔
963
    elif text_content:
1!
964
        # Try to use the text content to generate HTML content
965
        html_content = urlize_and_linebreaks(text_content)
1✔
966
        new_content, level_one_trackers_removed = _convert_html_content(
1✔
967
            html_content,
968
            to_address,
969
            from_address,
970
            language,
971
            has_premium,
972
            sample_trackers,
973
            remove_level_one_trackers,
974
        )
975
        assert isinstance(text_body, EmailMessage)
1✔
976
        try:
1✔
977
            text_body.add_alternative(new_content, subtype="html")
1✔
978
        except TypeError as e:
×
979
            out = StringIO()
×
980
            _structure(email, fp=out)
×
UNCOV
981
            info_logger.error(
×
982
                "Adding HTML alternate failed",
983
                extra={"exception": str(e), "structure": out.getvalue()},
984
            )
985

986
    issues: EmailForwardingIssues = {}
1✔
987
    if header_issues:
1✔
988
        issues["headers"] = header_issues
1✔
989
    return (email, issues, level_one_trackers_removed, has_html, has_text)
1✔
990

991

992
def _replace_headers(
1✔
993
    email: EmailMessage, headers: OutgoingHeaders
994
) -> EmailHeaderIssues:
995
    """
996
    Replace the headers in email with new headers.
997

998
    This replaces headers in the passed email object, rather than returns an altered
999
    copy. The primary reason is that the Python email package can read an email with
1000
    non-compliant headers or content, but can't write it. A read/write is required to
1001
    create a copy that we then alter. This code instead alters the passed EmailMessage
1002
    object, making header-specific changes in try / except statements.
1003

1004
    The other reason is the object size. An Email can be up to 10 MB, and we hope to
1005
    support 40 MB emails someday. Modern servers may be OK with this, but it would be
1006
    nice to handle the non-compliant headers without crashing before we add a source of
1007
    memory-related crashes.
1008
    """
1009
    # Look for headers to drop
1010
    to_drop: list[str] = []
1✔
1011
    replacements: set[str] = {_k.lower() for _k in headers.keys()}
1✔
1012
    issues: EmailHeaderIssues = defaultdict(list)
1✔
1013

1014
    # Detect non-compliant headers in incoming emails
1015
    for header in email.keys():
1✔
1016
        try:
1✔
1017
            value = email[header]
1✔
1018
        except Exception as e:
1✔
1019
            issues["incoming"].append((header, {"exception_on_read": repr(e)}))
1✔
1020
            value = None
1✔
1021
        if getattr(value, "defects", None):
1✔
1022
            issues["incoming"].append(
1✔
1023
                (
1024
                    header,
1025
                    {
1026
                        "defect_count": len(value.defects),
1027
                        "parsed_value": str(value),
1028
                        "unstructured_value": str(value.as_unstructured),
1029
                    },
1030
                )
1031
            )
1032

1033
    # Collect headers that will not be forwarded
1034
    for header in email.keys():
1✔
1035
        header_lower = header.lower()
1✔
1036
        if (
1✔
1037
            header_lower not in replacements
1038
            and header_lower != "mime-version"
1039
            and not header_lower.startswith("content-")
1040
        ):
1041
            to_drop.append(header)
1✔
1042

1043
    # Drop headers that should be dropped
1044
    for header in to_drop:
1✔
1045
        del email[header]
1✔
1046

1047
    # Replace the requested headers
1048
    for header, value in headers.items():
1✔
1049
        del email[header]
1✔
1050
        try:
1✔
1051
            email[header] = value
1✔
1052
        except Exception as e:
×
UNCOV
1053
            issues["outgoing"].append(
×
1054
                (header, {"exception_on_write": repr(e), "value": value})
1055
            )
UNCOV
1056
            continue
×
1057
        try:
1✔
1058
            parsed_value = email[header]
1✔
1059
        except Exception as e:
×
1060
            issues["outgoing"].append((header, {"exception_on_read": repr(e)}))
×
UNCOV
1061
            continue
×
1062
        if parsed_value.defects:
1!
UNCOV
1063
            issues["outgoing"].append(
×
1064
                (
1065
                    header,
1066
                    {
1067
                        "defect_count": len(parsed_value.defects),
1068
                        "parsed_value": str(parsed_value),
1069
                        "unstructured_value": str(parsed_value.as_unstructured),
1070
                    },
1071
                )
1072
            )
1073

1074
    return dict(issues)
1✔
1075

1076

1077
def _convert_html_content(
1✔
1078
    html_content: str,
1079
    to_address: str,
1080
    from_address: str,
1081
    language: str,
1082
    has_premium: bool,
1083
    sample_trackers: bool,
1084
    remove_level_one_trackers: bool,
1085
    now: datetime | None = None,
1086
) -> tuple[str, int]:
1087
    # frontend expects a timestamp in milliseconds
1088
    now = now or datetime.now(UTC)
1✔
1089
    datetime_now_ms = int(now.timestamp() * 1000)
1✔
1090

1091
    # scramble alias so that clients don't recognize it
1092
    # and apply default link styles
1093
    display_email = re.sub("([@.:])", r"<span>\1</span>", to_address)
1✔
1094

1095
    # sample tracker numbers
1096
    if sample_trackers:
1!
UNCOV
1097
        count_all_trackers(html_content)
×
1098

1099
    tracker_report_link = ""
1✔
1100
    removed_count = 0
1✔
1101
    if remove_level_one_trackers:
1!
UNCOV
1102
        html_content, tracker_details = remove_trackers(
×
1103
            html_content, from_address, datetime_now_ms
1104
        )
1105
        removed_count = tracker_details["tracker_removed"]
×
UNCOV
1106
        tracker_report_details = {
×
1107
            "sender": from_address,
1108
            "received_at": datetime_now_ms,
1109
            "trackers": tracker_details["level_one"]["trackers"],
1110
        }
UNCOV
1111
        tracker_report_link = f"{settings.SITE_ORIGIN}/tracker-report/#" + json.dumps(
×
1112
            tracker_report_details
1113
        )
1114

1115
    wrapped_html = wrap_html_email(
1✔
1116
        original_html=html_content,
1117
        language=language,
1118
        has_premium=has_premium,
1119
        display_email=display_email,
1120
        tracker_report_link=tracker_report_link,
1121
        num_level_one_email_trackers_removed=removed_count,
1122
    )
1123
    return wrapped_html, removed_count
1✔
1124

1125

1126
def _convert_text_content(text_content: str, to_address: str) -> str:
1✔
1127
    relay_header_text = (
1✔
1128
        "This email was sent to your alias "
1129
        f"{to_address}. To stop receiving emails sent to this alias, "
1130
        "update the forwarding settings in your dashboard.\n"
1131
        "---Begin Email---\n"
1132
    )
1133
    wrapped_text = relay_header_text + text_content
1✔
1134
    return wrapped_text
1✔
1135

1136

1137
def _build_reply_requires_premium_email(
1✔
1138
    from_address: str,
1139
    reply_record: Reply,
1140
    message_id: str | None,
1141
    decrypted_metadata: dict[str, Any] | None,
1142
) -> EmailMessage:
1143
    # If we haven't forwarded a first reply for this user yet, _reply_allowed
1144
    # will forward.  So, tell the user we forwarded it.
1145
    forwarded = not reply_record.address.user.profile.forwarded_first_reply
1✔
1146
    sender: str | None = ""
1✔
1147
    if decrypted_metadata is not None:
1!
1148
        sender = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
1✔
1149
    ctx = {
1✔
1150
        "sender": sender or "",
1151
        "forwarded": forwarded,
1152
        "SITE_ORIGIN": settings.SITE_ORIGIN,
1153
    }
1154
    html_body = render_to_string("emails/reply_requires_premium.html", ctx)
1✔
1155
    text_body = render_to_string("emails/reply_requires_premium.txt", ctx)
1✔
1156

1157
    # Create the message
1158
    msg = EmailMessage()
1✔
1159
    msg["Subject"] = ftl_bundle.format("replies-not-included-in-free-account-header")
1✔
1160
    msg["From"] = get_reply_to_address()
1✔
1161
    msg["To"] = from_address
1✔
1162
    if message_id:
1!
1163
        msg["In-Reply-To"] = message_id
1✔
1164
        msg["References"] = message_id
1✔
1165
    msg.set_content(text_body)
1✔
1166
    msg.add_alternative(html_body, subtype="html")
1✔
1167
    return msg
1✔
1168

1169

1170
def _set_forwarded_first_reply(profile):
1✔
1171
    profile.forwarded_first_reply = True
1✔
1172
    profile.save()
1✔
1173

1174

1175
def _send_reply_requires_premium_email(
1✔
1176
    from_address: str,
1177
    reply_record: Reply,
1178
    message_id: str | None,
1179
    decrypted_metadata: dict[str, Any] | None,
1180
) -> None:
UNCOV
1181
    msg = _build_reply_requires_premium_email(
×
1182
        from_address, reply_record, message_id, decrypted_metadata
1183
    )
1184
    try:
×
UNCOV
1185
        ses_send_raw_email(
×
1186
            source_address=get_reply_to_address(premium=False),
1187
            destination_address=from_address,
1188
            message=msg,
1189
        )
1190
        # If we haven't forwarded a first reply for this user yet, _reply_allowed will.
1191
        # So, updated the DB.
1192
        _set_forwarded_first_reply(reply_record.address.user.profile)
×
1193
    except ClientError as e:
×
1194
        logger.error("reply_not_allowed_ses_client_error", extra=e.response["Error"])
×
UNCOV
1195
    incr_if_enabled("free_user_reply_attempt", 1)
×
1196

1197

1198
def _reply_allowed(
1✔
1199
    from_address, to_address, reply_record, message_id=None, decrypted_metadata=None
1200
):
1201
    stripped_from_address = _strip_localpart_tag(from_address)
1✔
1202
    reply_record_email = reply_record.address.user.email
1✔
1203
    stripped_reply_record_address = _strip_localpart_tag(reply_record_email)
1✔
1204
    if (from_address == reply_record_email) or (
1!
1205
        stripped_from_address == stripped_reply_record_address
1206
    ):
1207
        # This is a Relay user replying to an external sender;
1208

1209
        if reply_record.profile.is_flagged:
1!
UNCOV
1210
            return False
×
1211

1212
        if reply_record.owner_has_premium:
1!
1213
            return True
1✔
1214

1215
        # if we haven't forwarded a first reply for this user, return True to allow
1216
        # this first reply
1217
        allow_first_reply = not reply_record.address.user.profile.forwarded_first_reply
×
UNCOV
1218
        _send_reply_requires_premium_email(
×
1219
            from_address, reply_record, message_id, decrypted_metadata
1220
        )
UNCOV
1221
        return allow_first_reply
×
1222
    else:
1223
        # The From: is not a Relay user, so make sure this is a reply *TO* a
1224
        # premium Relay user
1225
        try:
×
1226
            address = _get_address(to_address)
×
1227
            if address.user.profile.has_premium:
×
1228
                return True
×
1229
        except ObjectDoesNotExist:
×
1230
            return False
×
1231
    incr_if_enabled("free_user_reply_attempt", 1)
×
UNCOV
1232
    return False
×
1233

1234

1235
def _handle_reply(
1✔
1236
    from_address: str, message_json: AWS_SNSMessageJSON, to_address: str
1237
) -> HttpResponse:
1238
    """
1239
    Handle a reply from a Relay user to an external email.
1240

1241
    Returns (may be incomplete):
1242
    * 200 if the reply was sent
1243
    * 400 if the In-Reply-To and References headers are missing, none of the References
1244
      headers are a reply record, or the SES client raises an error
1245
    * 403 if the Relay user is not allowed to reply
1246
    * 404 if the S3-stored email is not found, or there is no matching Reply record in
1247
      the database
1248
    * 503 if the S3 client returns an error (other than not found), or the SES client
1249
      returns an error
1250

1251
    TODO: Return a more appropriate status object (see _handle_received)
1252
    TODO: Document metrics emitted
1253
    """
1254
    mail = message_json["mail"]
1✔
1255
    try:
1✔
1256
        (lookup_key, encryption_key) = _get_keys_from_headers(mail["headers"])
1✔
1257
    except ReplyHeadersNotFound:
1✔
1258
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-header"])
1✔
1259
        return HttpResponse("No In-Reply-To header", status=400)
1✔
1260

1261
    try:
1✔
1262
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
1✔
1263
    except Reply.DoesNotExist:
1✔
1264
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-reply-record"])
1✔
1265
        return HttpResponse("Unknown or stale In-Reply-To header", status=404)
1✔
1266

1267
    address = reply_record.address
1✔
1268
    message_id = _get_message_id_from_headers(mail["headers"])
1✔
1269
    decrypted_metadata = json.loads(
1✔
1270
        decrypt_reply_metadata(encryption_key, reply_record.encrypted_metadata)
1271
    )
1272
    if not _reply_allowed(
1✔
1273
        from_address, to_address, reply_record, message_id, decrypted_metadata
1274
    ):
1275
        log_email_dropped(reason="reply_requires_premium", mask=address, is_reply=True)
1✔
1276
        return HttpResponse("Relay replies require a premium account", status=403)
1✔
1277

1278
    outbound_from_address = address.full_address
1✔
1279
    incr_if_enabled("reply_email", 1)
1✔
1280
    subject = mail["commonHeaders"].get("subject", "")
1✔
1281
    to_address = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
1✔
1282
    headers: OutgoingHeaders = {
1✔
1283
        "Subject": subject,
1284
        "From": outbound_from_address,
1285
        "To": to_address,
1286
        "Reply-To": outbound_from_address,
1287
    }
1288

1289
    try:
1✔
1290
        (email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
1✔
1291
    except ClientError as e:
1✔
1292
        if e.response["Error"].get("Code", "") == "NoSuchKey":
1✔
1293
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
1✔
1294
            log_email_dropped(reason="content_missing", mask=address, is_reply=True)
1✔
1295
            return HttpResponse("Email not in S3", status=404)
1✔
1296
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
1✔
1297
        log_email_dropped(
1✔
1298
            reason="error_storage", mask=address, is_reply=True, can_retry=True
1299
        )
1300
        # we are returning a 500 so that SNS can retry the email processing
1301
        return HttpResponse("Cannot fetch the message content from S3", status=503)
1✔
1302

1303
    email = message_from_bytes(email_bytes, policy=relay_policy)
1✔
1304
    assert isinstance(email, EmailMessage)
1✔
1305

1306
    # Convert to a reply email
1307
    # TODO: Issue #1747 - Remove wrapper / prefix in replies
1308
    _replace_headers(email, headers)
1✔
1309

1310
    try:
1✔
1311
        ses_send_raw_email(
1✔
1312
            source_address=outbound_from_address,
1313
            destination_address=to_address,
1314
            message=email,
1315
        )
1316
    except ClientError:
1✔
1317
        log_email_dropped(reason="error_sending", mask=address, is_reply=True)
1✔
1318
        return HttpResponse("SES client error", status=400)
1✔
1319

1320
    reply_record.increment_num_replied()
1✔
1321
    profile = address.user.profile
1✔
1322
    profile.update_abuse_metric(replied=True)
1✔
1323
    profile.last_engagement = datetime.now(UTC)
1✔
1324
    profile.save()
1✔
1325
    glean_logger().log_email_forwarded(mask=address, is_reply=True)
1✔
1326
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
1327

1328

1329
def _get_domain_address(local_portion: str, domain_portion: str) -> DomainAddress:
1✔
1330
    """
1331
    Find or create the DomainAddress for the parts of an email address.
1332

1333
    If the domain_portion is for a valid subdomain, a new DomainAddress
1334
    will be created and returned.
1335

1336
    If the domain_portion is for an unknown domain, ObjectDoesNotExist is raised.
1337

1338
    If the domain_portion is for an unclaimed subdomain, Profile.DoesNotExist is raised.
1339
    """
1340

1341
    [address_subdomain, address_domain] = domain_portion.split(".", 1)
1✔
1342
    if address_domain != get_domains_from_settings()["MOZMAIL_DOMAIN"]:
1✔
1343
        incr_if_enabled("email_for_not_supported_domain", 1)
1✔
1344
        raise ObjectDoesNotExist("Address does not exist")
1✔
1345
    try:
1✔
1346
        with transaction.atomic():
1✔
1347
            locked_profile = Profile.objects.select_for_update().get(
1✔
1348
                subdomain=address_subdomain
1349
            )
1350
            domain_numerical = get_domain_numerical(address_domain)
1✔
1351
            # filter DomainAddress because it may not exist
1352
            # which will throw an error with get()
1353
            domain_address = DomainAddress.objects.filter(
1✔
1354
                user=locked_profile.user, address=local_portion, domain=domain_numerical
1355
            ).first()
1356
            if domain_address is None:
1✔
1357
                # TODO: Consider flows when a user generating alias on a fly
1358
                # was unable to receive an email due to user no longer being a
1359
                # premium user as seen in exception thrown on make_domain_address
1360
                domain_address = DomainAddress.make_domain_address(
1✔
1361
                    locked_profile, local_portion, True
1362
                )
1363
                glean_logger().log_email_mask_created(
1✔
1364
                    mask=domain_address,
1365
                    created_by_api=False,
1366
                )
1367
            domain_address.last_used_at = datetime.now(UTC)
1✔
1368
            domain_address.save()
1✔
1369
            return domain_address
1✔
1370
    except Profile.DoesNotExist as e:
1✔
1371
        incr_if_enabled("email_for_dne_subdomain", 1)
1✔
1372
        raise e
1✔
1373

1374

1375
def _get_address(address: str) -> RelayAddress | DomainAddress:
1✔
1376
    """
1377
    Find or create the RelayAddress or DomainAddress for an email address.
1378

1379
    If an unknown email address is for a valid subdomain, a new DomainAddress
1380
    will be created.
1381

1382
    On failure, raises exception based on Django's ObjectDoesNotExist:
1383
    * RelayAddress.DoesNotExist - looks like RelayAddress, deleted or does not exist
1384
    * Profile.DoesNotExist - looks like DomainAddress, no subdomain match
1385
    * ObjectDoesNotExist - Unknown domain
1386
    """
1387

1388
    local_portion, domain_portion = address.split("@")
1✔
1389
    local_address = local_portion.lower()
1✔
1390
    domain = domain_portion.lower()
1✔
1391

1392
    # if the domain is not the site's 'top' relay domain,
1393
    # it may be for a user's subdomain
1394
    email_domains = get_domains_from_settings().values()
1✔
1395
    if domain not in email_domains:
1✔
1396
        return _get_domain_address(local_address, domain)
1✔
1397

1398
    # the domain is the site's 'top' relay domain, so look up the RelayAddress
1399
    try:
1✔
1400
        domain_numerical = get_domain_numerical(domain)
1✔
1401
        relay_address = RelayAddress.objects.get(
1✔
1402
            address=local_address, domain=domain_numerical
1403
        )
1404
        return relay_address
1✔
1405
    except RelayAddress.DoesNotExist as e:
1✔
1406
        try:
1✔
1407
            DeletedAddress.objects.get(
1✔
1408
                address_hash=address_hash(local_address, domain=domain)
1409
            )
1410
            incr_if_enabled("email_for_deleted_address", 1)
1✔
1411
            # TODO: create a hard bounce receipt rule in SES
1412
        except DeletedAddress.DoesNotExist:
1✔
1413
            incr_if_enabled("email_for_unknown_address", 1)
1✔
1414
        except DeletedAddress.MultipleObjectsReturned:
1✔
1415
            # not sure why this happens on stage but let's handle it
1416
            incr_if_enabled("email_for_deleted_address_multiple", 1)
1✔
1417
        raise e
1✔
1418

1419

1420
def _handle_bounce(message_json: AWS_SNSMessageJSON) -> HttpResponse:
1✔
1421
    """
1422
    Handle an AWS SES bounce notification.
1423

1424
    For more information, see:
1425
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#bounce-object
1426

1427
    Returns:
1428
    * 404 response if any email address does not match a user,
1429
    * 200 response if all match or none are given
1430

1431
    Emits a counter metric "email_bounce" with these tags:
1432
    * bounce_type: 'permanent', 'transient', 'undetermined', 'none' if omitted
1433
    * bounce_subtype: 'undetermined', 'general', etc., 'none' if omitted
1434
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
1435
    * relay_action: 'no_action', 'auto_block_spam', 'hard_bounce', 'soft_bounce'
1436

1437
    Emits an info log "bounce_notification", same data as metric, plus:
1438
    * bounce_action: 'action' from bounced recipient data, or None
1439
    * bounce_status: 'status' from bounced recipient data, or None
1440
    * bounce_diagnostic: 'diagnosticCode' from bounced recipient data, or None
1441
    * bounce_extra: Extra data from bounce_recipient data, if any
1442
    * domain: User's real email address domain, if an address was given
1443
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the user
1444
    """
1445
    bounce = message_json.get("bounce", {})
1✔
1446
    bounce_type = bounce.get("bounceType", "none")
1✔
1447
    bounce_subtype = bounce.get("bounceSubType", "none")
1✔
1448
    bounced_recipients = bounce.get("bouncedRecipients", [])
1✔
1449

1450
    now = datetime.now(UTC)
1✔
1451
    bounce_data = []
1✔
1452
    for recipient in bounced_recipients:
1✔
1453
        recipient_address = recipient.pop("emailAddress", None)
1✔
1454
        data = {
1✔
1455
            "bounce_type": bounce_type,
1456
            "bounce_subtype": bounce_subtype,
1457
            "bounce_action": recipient.pop("action", ""),
1458
            "bounce_status": recipient.pop("status", ""),
1459
            "bounce_diagnostic": recipient.pop("diagnosticCode", ""),
1460
            "user_match": "no_address",
1461
            "relay_action": "no_action",
1462
        }
1463
        if recipient:
1!
UNCOV
1464
            data["bounce_extra"] = recipient.copy()
×
1465
        bounce_data.append(data)
1✔
1466

1467
        if recipient_address is None:
1!
UNCOV
1468
            continue
×
1469

1470
        recipient_address = parseaddr(recipient_address)[1]
1✔
1471
        recipient_domain = recipient_address.split("@")[1]
1✔
1472
        data["domain"] = recipient_domain
1✔
1473

1474
        try:
1✔
1475
            user = User.objects.get(email=recipient_address)
1✔
1476
            profile = user.profile
1✔
1477
            data["user_match"] = "found"
1✔
1478
            if (fxa := profile.fxa) and profile.metrics_enabled:
1✔
1479
                data["fxa_id"] = fxa.uid
1✔
1480
            else:
1481
                data["fxa_id"] = ""
1✔
1482
        except User.DoesNotExist:
1✔
1483
            # TODO: handle bounce for a user who no longer exists
1484
            # add to SES account-wide suppression list?
1485
            data["user_match"] = "missing"
1✔
1486
            continue
1✔
1487

1488
        action = None
1✔
1489
        if "spam" in data["bounce_diagnostic"].lower():
1✔
1490
            # if an email bounced as spam, set to auto block spam for this user
1491
            # and DON'T set them into bounce pause state
1492
            action = "auto_block_spam"
1✔
1493
            profile.auto_block_spam = True
1✔
1494
        elif bounce_type == "Permanent":
1✔
1495
            # TODO: handle sub-types: 'General', 'NoEmail', etc.
1496
            action = "hard_bounce"
1✔
1497
            profile.last_hard_bounce = now
1✔
1498
        elif bounce_type == "Transient":
1!
1499
            # TODO: handle sub-types: 'MessageTooLarge', 'AttachmentRejected', etc.
1500
            action = "soft_bounce"
1✔
1501
            profile.last_soft_bounce = now
1✔
1502
        if action:
1!
1503
            data["relay_action"] = action
1✔
1504
            profile.save()
1✔
1505

1506
    if not bounce_data:
1!
1507
        # Data when there are no identified recipients
UNCOV
1508
        bounce_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]
×
1509

1510
    for data in bounce_data:
1✔
1511
        tags = {
1✔
1512
            "bounce_type": bounce_type,
1513
            "bounce_subtype": bounce_subtype,
1514
            "user_match": data["user_match"],
1515
            "relay_action": data["relay_action"],
1516
        }
1517
        incr_if_enabled(
1✔
1518
            "email_bounce",
1519
            1,
1520
            tags=[generate_tag(key, val) for key, val in tags.items()],
1521
        )
1522
        info_logger.info("bounce_notification", extra=data)
1✔
1523

1524
    if any(data["user_match"] == "missing" for data in bounce_data):
1✔
1525
        return HttpResponse("Address does not exist", status=404)
1✔
1526
    return HttpResponse("OK", status=200)
1✔
1527

1528

1529
def _handle_complaint(message_json: AWS_SNSMessageJSON) -> HttpResponse:
1✔
1530
    """
1531
    Handle an AWS SES complaint notification.
1532

1533
    For more information, see:
1534
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object
1535

1536
    Returns:
1537
    * 404 response if any email address does not match a user,
1538
    * 200 response if all match or none are given
1539

1540
    Emits a counter metric "email_complaint" with these tags:
1541
    * complaint_subtype: 'onaccounsuppressionlist', or 'none' if omitted
1542
    * complaint_feedback - feedback enumeration from ISP or 'none'
1543
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
1544
    * relay_action: 'no_action', 'auto_block_spam'
1545

1546
    Emits an info log "complaint_notification", same data as metric, plus:
1547
    * complaint_user_agent - identifies the client used to file the complaint
1548
    * complaint_extra - Extra data from complainedRecipients data, if any
1549
    * domain - User's domain, if an address was given
1550
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the user
1551
    """
1552
    complaint = deepcopy(message_json.get("complaint", {}))
1✔
1553
    complained_recipients = complaint.pop("complainedRecipients", [])
1✔
1554
    subtype = complaint.pop("complaintSubType", None)
1✔
1555
    user_agent = complaint.pop("userAgent", None)
1✔
1556
    feedback = complaint.pop("complaintFeedbackType", None)
1✔
1557

1558
    complaint_data = []
1✔
1559
    for recipient in complained_recipients:
1✔
1560
        recipient_address = recipient.pop("emailAddress", None)
1✔
1561
        data = {
1✔
1562
            "complaint_subtype": subtype,
1563
            "complaint_user_agent": user_agent,
1564
            "complaint_feedback": feedback,
1565
            "user_match": "no_address",
1566
            "relay_action": "no_action",
1567
        }
1568
        if recipient:
1!
UNCOV
1569
            data["complaint_extra"] = recipient.copy()
×
1570
        complaint_data.append(data)
1✔
1571

1572
        if recipient_address is None:
1!
UNCOV
1573
            continue
×
1574

1575
        recipient_address = parseaddr(recipient_address)[1]
1✔
1576
        recipient_domain = recipient_address.split("@")[1]
1✔
1577
        data["domain"] = recipient_domain
1✔
1578

1579
        try:
1✔
1580
            user = User.objects.get(email=recipient_address)
1✔
1581
            profile = user.profile
1✔
1582
            data["user_match"] = "found"
1✔
1583
            if (fxa := profile.fxa) and profile.metrics_enabled:
1✔
1584
                data["fxa_id"] = fxa.uid
1✔
1585
            else:
1586
                data["fxa_id"] = ""
1✔
1587
        except User.DoesNotExist:
×
1588
            data["user_match"] = "missing"
×
UNCOV
1589
            continue
×
1590

1591
        data["relay_action"] = "auto_block_spam"
1✔
1592
        profile.auto_block_spam = True
1✔
1593
        profile.save()
1✔
1594

1595
    if not complaint_data:
1!
1596
        # Data when there are no identified recipients
UNCOV
1597
        complaint_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]
×
1598

1599
    for data in complaint_data:
1✔
1600
        tags = {
1✔
1601
            "complaint_subtype": subtype or "none",
1602
            "complaint_feedback": feedback or "none",
1603
            "user_match": data["user_match"],
1604
            "relay_action": data["relay_action"],
1605
        }
1606
        incr_if_enabled(
1✔
1607
            "email_complaint",
1608
            1,
1609
            tags=[generate_tag(key, val) for key, val in tags.items()],
1610
        )
1611
        info_logger.info("complaint_notification", extra=data)
1✔
1612

1613
    if any(data["user_match"] == "missing" for data in complaint_data):
1!
UNCOV
1614
        return HttpResponse("Address does not exist", status=404)
×
1615
    return HttpResponse("OK", status=200)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc