• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 4276dcd9-8c30-4786-8a41-f9c1cdae7f05

05 Mar 2024 07:15PM CUT coverage: 74.734% (+0.6%) from 74.139%
4276dcd9-8c30-4786-8a41-f9c1cdae7f05

Pull #4452

circleci

jwhitlock
Pass user to create_expected_glean_event

Pass the related user to the test helper create_expected_glean_event, so
that the user-specific values such as fxa_id and date_joined_relay can
be extracted in the helper rather than each test function.
Pull Request #4452: MPP-3352: Add first Glean metrics to measure email mask usage

2084 of 3047 branches covered (68.4%)

Branch coverage included in aggregate %.

251 of 256 new or added lines in 7 files covered. (98.05%)

79 existing lines in 3 files now uncovered.

6763 of 8791 relevant lines covered (76.93%)

20.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.67
/emails/views.py
1
from collections import defaultdict
1✔
2
from copy import deepcopy
1✔
3
from datetime import datetime, timezone
1✔
4
from email import message_from_bytes
1✔
5
from email.iterators import _structure
1✔
6
from email.message import EmailMessage
1✔
7
from email.utils import parseaddr
1✔
8
import html
1✔
9
from io import StringIO
1✔
10
import json
1✔
11
from json import JSONDecodeError
1✔
12
import logging
1✔
13
import re
1✔
14
import shlex
1✔
15
from textwrap import dedent
1✔
16
from typing import Any, Literal
1✔
17
from urllib.parse import urlencode
1✔
18

19
from botocore.exceptions import ClientError
1✔
20
from codetiming import Timer
1✔
21
from decouple import strtobool
1✔
22
from django.shortcuts import render
1✔
23
from sentry_sdk import capture_message
1✔
24
from markus.utils import generate_tag
1✔
25
from waffle import sample_is_active
1✔
26

27
from django.conf import settings
1✔
28
from django.contrib.auth.models import User
1✔
29
from django.core.exceptions import ObjectDoesNotExist
1✔
30
from django.db import transaction
1✔
31
from django.db.models import prefetch_related_objects
1✔
32
from django.http import HttpRequest, HttpResponse
1✔
33
from django.template.loader import render_to_string
1✔
34
from django.utils.html import escape
1✔
35
from django.views.decorators.csrf import csrf_exempt
1✔
36

37
from privaterelay.utils import get_subplat_upgrade_link_by_language, glean_logger
1✔
38

39

40
from .models import (
1✔
41
    CannotMakeAddressException,
42
    DeletedAddress,
43
    DomainAddress,
44
    Profile,
45
    RelayAddress,
46
    Reply,
47
    address_hash,
48
    get_domain_numerical,
49
)
50
from .policy import relay_policy
1✔
51
from .types import (
1✔
52
    AWS_MailJSON,
53
    AWS_SNSMessageJSON,
54
    OutgoingHeaders,
55
    EmailForwardingIssues,
56
    EmailHeaderIssues,
57
)
58
from .utils import (
1✔
59
    _get_bucket_and_key_from_s3_json,
60
    b64_lookup_key,
61
    count_all_trackers,
62
    decrypt_reply_metadata,
63
    derive_reply_keys,
64
    encrypt_reply_metadata,
65
    generate_from_header,
66
    get_domains_from_settings,
67
    get_message_content_from_s3,
68
    get_message_id_bytes,
69
    get_reply_to_address,
70
    histogram_if_enabled,
71
    incr_if_enabled,
72
    remove_message_from_s3,
73
    remove_trackers,
74
    ses_send_raw_email,
75
    urlize_and_linebreaks,
76
    InvalidFromHeader,
77
    parse_email_header,
78
)
79
from .sns import verify_from_sns, SUPPORTED_SNS_TYPES
1✔
80

81
from privaterelay.ftl_bundles import main as ftl_bundle
1✔
82
from privaterelay.utils import flag_is_active_in_task
1✔
83

84
# Logger named "events"; this module uses it for error reports.
logger = logging.getLogger("events")
# Logger named "eventsinfo"; this module uses it for informational and
# diagnostic records (info, warning and error level).
info_logger = logging.getLogger("eventsinfo")
86

87

88
class ReplyHeadersNotFound(Exception):
    """Signal that no In-Reply-To or References header was found."""

    def __init__(self, message="No In-Reply-To or References headers."):
        # Hand the message to Exception so that str(exc), repr(exc) and
        # exc.args carry it even when the default message is used.
        super().__init__(message)
        self.message = message
91

92

93
def first_time_user_test(request):
    """
    Render a preview of the "First time Relay user" email.

    Querystring settings:
    * in_bundle_country - boolean-like string parsed by strtobool (default "yes")
    * format - "text" picks the plain-text template; any other value picks HTML
    """
    query = request.GET
    email_context = {
        "in_bundle_country": strtobool(query.get("in_bundle_country", "yes")),
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    if query.get("format", "html") != "text":
        return render(request, "emails/first_time_user.html", email_context)
    return render(
        request,
        "emails/first_time_user.txt",
        email_context,
        "text/plain; charset=utf-8",
    )
112

113

114
def reply_requires_premium_test(request):
    """
    Render a preview of the "Reply requires premium" email.

    Every querystring parameter is copied into the template context, overriding
    the defaults below. "forwarded=True" is converted to the boolean True, and
    "content-type=text/plain" selects the plain-text template instead of HTML.
    """
    query = request.GET
    email_context = {
        "sender": "test@example.com",
        "forwarded": True,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    # Querystring values win over the defaults
    email_context.update({name: query.get(name) for name in query})
    if query.get("forwarded") == "True":
        email_context["forwarded"] = True

    if query.get("content-type") == "text/plain":
        return render(
            request,
            "emails/reply_requires_premium.txt",
            email_context,
            "text/plain; charset=utf-8",
        )
    return render(request, "emails/reply_requires_premium.html", email_context)
140

141

142
def first_forwarded_email_test(request: HttpRequest) -> HttpResponse:
    """Render a preview of the "first forwarded email" inside the Relay wrapper."""
    # TO DO: Update with correct context when trigger is created
    template_context = {"SITE_ORIGIN": settings.SITE_ORIGIN}
    inner_html = render_to_string(
        "emails/first_forwarded_email.html", template_context
    )

    # Fixed demo values: US English, premium user, no trackers removed
    return HttpResponse(
        wrap_html_email(
            original_html=inner_html,
            language="en-us",
            has_premium=True,
            display_email="test@example.com",
            num_level_one_email_trackers_removed=0,
        )
    )
160

161

162
def wrap_html_email(
    original_html: str,
    language: str,
    has_premium: bool,
    display_email: str,
    num_level_one_email_trackers_removed: int | None = None,
    tracker_report_link: str | None = None,
) -> str:
    """
    Render "emails/wrapped_email.html" around an HTML email.

    The arguments are passed to the template together with the upgrade link
    for the language and the site origin. Lines that are empty or contain only
    whitespace are dropped from the rendered output, which always ends with a
    single newline.
    """
    rendered = render_to_string(
        "emails/wrapped_email.html",
        {
            "original_html": original_html,
            "language": language,
            "has_premium": has_premium,
            "subplat_upgrade_link": get_subplat_upgrade_link_by_language(language),
            "display_email": display_email,
            "tracker_report_link": tracker_report_link,
            "num_level_one_email_trackers_removed": (
                num_level_one_email_trackers_removed
            ),
            "SITE_ORIGIN": settings.SITE_ORIGIN,
        },
    )
    # Keep only the lines with visible content
    kept = []
    for line in rendered.splitlines():
        if line.strip():
            kept.append(line)
    return "\n".join(kept) + "\n"
186

187

188
def wrapped_email_test(request: HttpRequest) -> HttpResponse:
    """
    Demonstrate rendering of forwarded HTML emails.

    The querystring may set language, has_premium, has_tracker_report_link and
    num_level_one_email_trackers_removed. When language or has_premium is
    missing, the value is read from a randomly chosen profile. The rendered
    page lists the active settings with links to switch each one.
    """
    query = request.GET

    # A profile is only fetched when the querystring leaves a gap it must fill
    if "language" in query and "has_premium" in query:
        user_profile = None
    else:
        user_profile = Profile.objects.order_by("?").first()

    if "language" not in query:
        assert user_profile is not None
        language = user_profile.language
    else:
        language = query["language"]

    if "has_premium" not in query:
        assert user_profile is not None
        has_premium = user_profile.has_premium
    else:
        has_premium = strtobool(query["has_premium"])

    trackers_param = "num_level_one_email_trackers_removed"
    num_level_one_email_trackers_removed = (
        int(query[trackers_param]) if trackers_param in query else 0
    )

    has_tracker_report_link = (
        strtobool(query["has_tracker_report_link"])
        if "has_tracker_report_link" in query
        else False
    )

    # The fake report link embeds a JSON document after the "#"
    tracker_report_link = ""
    if has_tracker_report_link:
        trackers = (
            {"fake-tracker.example.com": num_level_one_email_trackers_removed}
            if num_level_one_email_trackers_removed
            else {}
        )
        tracker_report_link = (
            "/tracker-report/#{"
            '"sender": "sender@example.com", '
            '"received_at": 1658434657, '
            f'"trackers": {json.dumps(trackers)}'
            "}"
        )

    path = "/emails/wrapped_email_test"
    old_query = {
        "language": language,
        "has_premium": "Yes" if has_premium else "No",
        "has_tracker_report_link": "Yes" if has_tracker_report_link else "No",
        "num_level_one_email_trackers_removed": str(
            num_level_one_email_trackers_removed
        ),
    }

    def switch_link(key, value):
        """Return plain text for the active value, else a link that selects it."""
        if old_query[key] != value:
            new_query = {**old_query, key: value}
            return f'<a href="{path}?{urlencode(new_query)}">{value}</a>'
        return str(value)

    html_content = dedent(
        f"""\
    <p>
      <strong>Email rendering Test</strong>
    </p>
    <p>Settings: (<a href="{path}">clear all</a>)</p>
    <ul>
      <li>
        <strong>language</strong>:
        {escape(language)}
        (switch to
        {switch_link("language", "en-us")},
        {switch_link("language", "de")},
        {switch_link("language", "en-gb")},
        {switch_link("language", "fr")},
        {switch_link("language", "ru-ru")},
        {switch_link("language", "es-es")},
        {switch_link("language", "pt-br")},
        {switch_link("language", "it-it")},
        {switch_link("language", "en-ca")},
        {switch_link("language", "de-de")},
        {switch_link("language", "es-mx")})
      </li>
      <li>
        <strong>has_premium</strong>:
        {"Yes" if has_premium else "No"}
        (switch to
        {switch_link("has_premium", "Yes")},
        {switch_link("has_premium", "No")})
      </li>
      <li>
        <strong>has_tracker_report_link</strong>:
        {"Yes" if has_tracker_report_link else "No"}
        (switch to
        {switch_link("has_tracker_report_link", "Yes")},
        {switch_link("has_tracker_report_link", "No")})
      </li>
      <li>
        <strong>num_level_one_email_trackers_removed</strong>:
        {num_level_one_email_trackers_removed}
        (switch to
        {switch_link("num_level_one_email_trackers_removed", "0")},
        {switch_link("num_level_one_email_trackers_removed", "1")},
        {switch_link("num_level_one_email_trackers_removed", "2")})
      </li>
    </ul>
    """
    )

    return HttpResponse(
        wrap_html_email(
            original_html=html_content,
            language=language,
            has_premium=has_premium,
            tracker_report_link=tracker_report_link,
            display_email="test@relay.firefox.com",
            num_level_one_email_trackers_removed=num_level_one_email_trackers_removed,
        )
    )
316

317

318
def _store_reply_record(
    mail: AWS_MailJSON, message_id: str, address: RelayAddress | DomainAddress
) -> AWS_MailJSON:
    """
    Store a Reply record for an email that was just relayed.

    The Message-ID, From and Reply-To headers of ``mail`` are encrypted with a
    key derived from ``message_id`` and saved with the derived lookup key. The
    record is linked to ``address`` through ``domain_address`` or
    ``relay_address``, depending on its type.

    Returns ``mail`` unchanged.
    """
    stored_header_names = ("message-id", "from", "reply-to")
    reply_metadata = {}
    for header in mail["headers"]:
        name = header["name"].lower()
        if name in stored_header_names:
            reply_metadata[name] = header["value"]

    message_id_bytes = get_message_id_bytes(message_id)
    (lookup_key, encryption_key) = derive_reply_keys(message_id_bytes)
    lookup = b64_lookup_key(lookup_key)
    encrypted_metadata = encrypt_reply_metadata(encryption_key, reply_metadata)
    reply_create_args: dict[str, Any] = {
        "lookup": lookup,
        "encrypted_metadata": encrypted_metadata,
    }
    # isinstance() rather than an exact type() comparison, so subclasses of the
    # address models are linked as well
    if isinstance(address, DomainAddress):
        reply_create_args["domain_address"] = address
    elif isinstance(address, RelayAddress):
        reply_create_args["relay_address"] = address
    Reply.objects.create(**reply_create_args)
    return mail
340

341

342
@csrf_exempt
def sns_inbound(request):
    """
    Entry point for an inbound SNS request.

    The JSON body is passed through verify_from_sns, then its TopicArn and Type
    are validated. A validation failure is logged and answered with a 400
    carrying the error text; otherwise the message is handed to
    _sns_inbound_logic.
    """
    incr_if_enabled("sns_inbound", 1)
    # First thing we do is verify the signature
    verified_json_body = verify_from_sns(json.loads(request.body))

    # Validate ARN and message type
    topic_arn = verified_json_body.get("TopicArn")
    message_type = verified_json_body.get("Type")
    error_details = validate_sns_arn_and_type(topic_arn, message_type)
    if not error_details:
        return _sns_inbound_logic(topic_arn, message_type, verified_json_body)

    logger.error("validate_sns_arn_and_type_error", extra=error_details)
    return HttpResponse(error_details["error"], status=400)
358

359

360
def validate_sns_arn_and_type(
    topic_arn: str | None, message_type: str | None
) -> dict[str, Any] | None:
    """
    Check the Topic ARN and SNS Message Type of an inbound request.

    Returns None when both are acceptable. Otherwise returns a dictionary with
    the error message, the shell-quoted received values and the supported
    values.
    """
    # The checks run in order; the first failing one names the problem
    if not topic_arn:
        problem = "Received SNS request without Topic ARN."
    elif topic_arn not in settings.AWS_SNS_TOPIC:
        problem = "Received SNS message for wrong topic."
    elif not message_type:
        problem = "Received SNS request without Message Type."
    elif message_type not in SUPPORTED_SNS_TYPES:
        problem = "Received SNS message for unsupported Type."
    else:
        return None

    # Missing values are reported as received; present ones are quoted
    quoted_topic_arn = shlex.quote(topic_arn) if topic_arn else topic_arn
    quoted_message_type = shlex.quote(message_type) if message_type else message_type
    return {
        "error": problem,
        "received_topic_arn": quoted_topic_arn,
        "supported_topic_arn": sorted(settings.AWS_SNS_TOPIC),
        "received_sns_type": quoted_message_type,
        "supported_sns_types": SUPPORTED_SNS_TYPES,
    }
391

392

393
def _sns_inbound_logic(topic_arn, message_type, json_body):
    """
    Dispatch a validated SNS message by its type.

    * "Notification" is counted and passed to _sns_notification.
    * "SubscriptionConfirmation" logs the SubscribeURL and returns 200.
    * Any other type is logged, reported with capture_message, and answered
      with a 400.

    topic_arn is accepted but not used here.
    """
    if message_type == "Notification":
        incr_if_enabled("sns_inbound_Notification", 1)
        return _sns_notification(json_body)

    if message_type == "SubscriptionConfirmation":
        subscribe_details = {"SubscribeURL": json_body["SubscribeURL"]}
        info_logger.info("SNS SubscriptionConfirmation", extra=subscribe_details)
        return HttpResponse("Logged SubscribeURL", status=200)

    unhandled = "Received SNS message with type not handled in inbound log"
    logger.error(
        "SNS message type did not fall under the SNS inbound logic",
        extra={"message_type": shlex.quote(message_type)},
    )
    capture_message(unhandled, level="error", stack=True)
    return HttpResponse(unhandled, status=400)
416

417

418
def _sns_notification(json_body):
    """
    Process the "Message" payload of an SNS notification.

    Returns a 400 when the payload is not JSON, or when neither its
    notificationType nor its eventType is supported. Otherwise the payload is
    handled by _sns_message, and the stored message is removed from S3 unless
    the response status is 500 or higher.
    """
    raw_message = json_body["Message"]
    try:
        message_json = json.loads(raw_message)
    except JSONDecodeError:
        logger.error(
            "SNS notification has non-JSON message body",
            extra={"content": shlex.quote(raw_message)},
        )
        return HttpResponse("Received SNS notification with non-JSON body", status=400)

    event_type = message_json.get("eventType")
    notification_type = message_json.get("notificationType")
    is_supported = notification_type in {
        "Complaint",
        "Received",
        "Bounce",
    } or event_type in {"Complaint", "Bounce"}
    if not is_supported:
        quoted_notification_type = shlex.quote(notification_type)
        logger.error(
            "SNS notification for unsupported type",
            extra={
                "notification_type": quoted_notification_type,
                "event_type": shlex.quote(event_type),
                "keys": [shlex.quote(key) for key in message_json.keys()],
            },
        )
        escaped_type = html.escape(quoted_notification_type)
        return HttpResponse(
            f"Received SNS notification for unsupported Type: {escaped_type}",
            status=400,
        )

    response = _sns_message(message_json)
    bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
    # Keep the stored message when the response is a 5xx
    if response.status_code < 500:
        remove_message_from_s3(bucket, object_key)
    return response
454

455

456
def _get_recipient_with_relay_domain(recipients):
    """
    Return the first recipient that contains one of the configured domains.

    The domains come from get_domains_from_settings(), and the match is a
    substring test against each recipient. Returns None when nothing matches.
    """
    relay_domains = get_domains_from_settings().values()
    return next(
        (
            candidate
            for candidate in recipients
            if any(domain in candidate for domain in relay_domains)
        ),
        None,
    )
463

464

465
def _get_relay_recipient_from_message_json(message_json):
    """
    Find the Relay recipient among the To, Cc and Bcc fields.

    The "to" and then "cc" common headers are searched first; a match there is
    returned as the bare address extracted by parseaddr. Otherwise the receipt
    recipients are searched and the match, or None, is returned as found.
    """
    common_headers = message_json["mail"]["commonHeaders"]
    for header_name in ("to", "cc"):
        if header_name not in common_headers:
            continue
        match = _get_recipient_with_relay_domain(common_headers[header_name])
        if match is not None:
            return parseaddr(match)[1]

    # SES-SNS sends bcc in a different part of the message
    receipt_recipients = message_json["receipt"]["recipients"]
    return _get_recipient_with_relay_domain(receipt_recipients)
481

482

483
def _sns_message(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Route an SNS message to the bounce, complaint or received handler.

    A message is a bounce or complaint when either its notificationType or its
    eventType says so. Anything else must be a "Received" notification without
    an eventType.
    """
    incr_if_enabled("sns_inbound_Notification_Received", 1)
    notification_type = message_json.get("notificationType")
    event_type = message_json.get("eventType")
    declared_types = (notification_type, event_type)

    if "Bounce" in declared_types:
        return _handle_bounce(message_json)
    if "Complaint" in declared_types:
        return _handle_complaint(message_json)

    assert notification_type == "Received" and event_type is None
    return _handle_received(message_json)
493

494

495
def _handle_received(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES received notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/receiving-email-notifications-contents.html
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html

    Returns (may be incomplete):
    * 200 if the email was sent, the Relay address is disabled, the Relay user is
      flagged for abuse, the email is under a bounce pause, the email was suppressed
      for spam, the list email was blocked, or the noreply address was the recipient.
    * 400 if commonHeaders entry is missing, no address could be parsed from the
      "From" header, the Relay recipient address is malformed, or the email failed
      DMARC with reject policy.
    * 403 if the email is a reply to a stored Reply record and _reply_allowed()
      rejects it ("Relay replies require a premium account").
    * 404 if an S3-stored email was not found, no Relay address was found in the "To",
      "CC", or "BCC" fields, or the Relay address is not in the database.
    * 503 if generate_from_header() raises InvalidFromHeader, the S3 client returned
      an error different from "not found", or the SES client fails

    When the address lookup fails and the recipient's local part is "replies", the
    response comes from _handle_reply() instead. The HTTP returns are an
    artifact from an earlier time when emails were sent to a webhook. Currently,
    production instead pulls events from a queue.

    TODO: Return a more appropriate status object
    TODO: Document the metrics emitted
    """
    mail = message_json["mail"]
    if "commonHeaders" not in mail:
        logger.error("SNS message without commonHeaders")
        return HttpResponse(
            "Received SNS notification without commonHeaders.", status=400
        )
    common_headers = mail["commonHeaders"]
    receipt = message_json["receipt"]

    # Receipt verdicts are recorded at each stage the email reaches
    _record_receipt_verdicts(receipt, "all")
    to_address = _get_relay_recipient_from_message_json(message_json)
    if to_address is None:
        incr_if_enabled("no_relay_domain_in_recipient_fields", 1)
        return HttpResponse("Address does not exist", status=404)

    _record_receipt_verdicts(receipt, "relay_recipient")
    # Only the first "from" common header is parsed
    from_addresses = parse_email_header(common_headers["from"][0])
    if not from_addresses:
        info_logger.error(
            "_handle_received: no from address",
            extra={
                "source": mail["source"],
                "common_headers_from": common_headers["from"],
            },
        )
        return HttpResponse("Unable to parse From address", status=400)
    # Element 1 of the first parsed entry is used as the sender's address
    from_address = from_addresses[0][1]

    try:
        # Unpacking fails with ValueError unless there is exactly one "@"
        [to_local_portion, to_domain_portion] = to_address.split("@")
    except ValueError:
        # TODO: Add metric
        return HttpResponse("Malformed to field.", status=400)

    if to_local_portion.lower() == "noreply":
        incr_if_enabled("email_for_noreply_address", 1)
        return HttpResponse("noreply address is not supported.")
    try:
        # FIXME: this ambiguous return of either
        # RelayAddress or DomainAddress types makes the Rustacean in me throw
        # up a bit.
        address = _get_address(to_address)
        prefetch_related_objects([address.user], "socialaccount_set", "profile")
        user_profile = address.user.profile
    except (
        ObjectDoesNotExist,
        CannotMakeAddressException,
        DeletedAddress.MultipleObjectsReturned,
    ):
        # No usable address: mail sent to "replies@..." is handled as a reply
        # from a Relay user, anything else is an unknown address
        if to_local_portion.lower() == "replies":
            response = _handle_reply(from_address, message_json, to_address)
        else:
            response = HttpResponse("Address does not exist", status=404)
        return response

    _record_receipt_verdicts(receipt, "valid_user")
    # if this is spam and the user is set to auto-block spam, early return
    if user_profile.auto_block_spam and _get_verdict(receipt, "spam") == "FAIL":
        incr_if_enabled("email_auto_suppressed_for_spam", 1)
        glean_logger().log_email_blocked(mask=address, reason="auto_block_spam")
        return HttpResponse("Address rejects spam.")

    if _get_verdict(receipt, "dmarc") == "FAIL":
        # A missing dmarcPolicy is treated as "none"
        policy = receipt.get("dmarcPolicy", "none")
        # TODO: determine action on dmarcPolicy "quarantine"
        if policy == "reject":
            glean_logger().log_email_blocked(mask=address, reason="dmarc_reject_failed")
            incr_if_enabled(
                "email_suppressed_for_dmarc_failure",
                tags=["dmarcPolicy:reject", "dmarcVerdict:FAIL"],
            )
            return HttpResponse("DMARC failure, policy is reject", status=400)

    # if this user is over bounce limits, early return
    bounce_paused, bounce_type = user_profile.check_bounce_pause()
    if bounce_paused:
        _record_receipt_verdicts(receipt, "user_bounce_paused")
        incr_if_enabled("email_suppressed_for_%s_bounce" % bounce_type, 1)
        reason: Literal["soft_bounce_pause", "hard_bounce_pause"] = (
            "soft_bounce_pause" if bounce_type == "soft" else "hard_bounce_pause"
        )
        glean_logger().log_email_blocked(mask=address, reason=reason)
        return HttpResponse("Address is temporarily disabled.")

    # check if this is a reply from an external sender to a Relay user
    try:
        (lookup_key, _) = _get_keys_from_headers(mail["headers"])
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
        # From here on `address` is the one attached to the Reply record; the
        # address resolved from the recipient is kept as `user_address`
        user_address = address
        address = reply_record.address
        message_id = _get_message_id_from_headers(mail["headers"])
        # make sure the relay user is premium
        if not _reply_allowed(from_address, to_address, reply_record, message_id):
            glean_logger().log_email_blocked(
                mask=user_address, reason="reply_requires_premium"
            )
            return HttpResponse("Relay replies require a premium account", status=403)
    except (ReplyHeadersNotFound, Reply.DoesNotExist):
        # if there's no In-Reply-To header, or the In-Reply-To value doesn't
        # match a Reply record, continue to treat this as a regular email from
        # an external sender to a relay user
        pass

    # if account flagged for abuse, early return
    if user_profile.is_flagged:
        glean_logger().log_email_blocked(mask=address, reason="abuse_flag")
        return HttpResponse("Address is temporarily disabled.")

    # if address is set to block, early return
    if not address.enabled:
        incr_if_enabled("email_for_disabled_address", 1)
        address.num_blocked += 1
        address.save(update_fields=["num_blocked"])
        _record_receipt_verdicts(receipt, "disabled_alias")
        user_profile.last_engagement = datetime.now(timezone.utc)
        user_profile.save()
        glean_logger().log_email_blocked(mask=address, reason="block_all")
        return HttpResponse("Address is temporarily disabled.")

    _record_receipt_verdicts(receipt, "active_alias")
    incr_if_enabled("email_for_active_address", 1)

    # if address is blocking list emails, and email is from list, early return
    if (
        address
        and address.block_list_emails
        and user_profile.has_premium
        and _check_email_from_list(mail["headers"])
    ):
        incr_if_enabled("list_email_for_address_blocking_lists", 1)
        address.num_blocked += 1
        address.save(update_fields=["num_blocked"])
        user_profile.last_engagement = datetime.now(timezone.utc)
        user_profile.save()
        glean_logger().log_email_blocked(mask=address, reason="block_promotional")
        return HttpResponse("Address is not accepting list emails.")

    # Collect new headers
    subject = common_headers.get("subject", "")
    destination_address = user_profile.user.email
    reply_address = get_reply_to_address()
    try:
        from_header = generate_from_header(from_address, to_address)
    except InvalidFromHeader:
        # TODO: MPP-3407, MPP-3417 - Determine how to handle these
        # Gather the raw From headers so the failure can be diagnosed from logs
        header_from = []
        for header in mail["headers"]:
            if header["name"].lower() == "from":
                header_from.append(header)
        info_logger.error(
            "generate_from_header",
            extra={
                "from_address": from_address,
                "source": mail["source"],
                "common_headers_from": common_headers["from"],
                "headers_from": header_from,
            },
        )
        glean_logger().log_email_blocked(
            mask=address, reason="error_from_header", can_retry=True
        )
        return HttpResponse("Cannot parse the From address", status=503)

    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": from_header,
        "To": destination_address,
        "Reply-To": reply_address,
        "Resent-From": from_address,
    }

    # Get incoming email
    try:
        (incoming_email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
    except ClientError as e:
        # A missing S3 object is a 404; any other client error is a 503
        if e.response["Error"].get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
            glean_logger().log_email_blocked(mask=address, reason="content_missing")
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
        glean_logger().log_email_blocked(
            mask=address, reason="error_storage", can_retry=True
        )
        # we are returning a 503 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    # Convert to new email
    sample_trackers = bool(sample_is_active("tracker_sample"))
    tracker_removal_flag = flag_is_active_in_task("tracker_removal", address.user)
    # Trackers are removed only when the flag is active AND the user opted in
    remove_level_one_trackers = bool(
        tracker_removal_flag and user_profile.remove_level_one_email_trackers
    )
    (
        forwarded_email,
        issues,
        level_one_trackers_removed,
        has_html,
        has_text,
    ) = _convert_to_forwarded_email(
        incoming_email_bytes=incoming_email_bytes,
        headers=headers,
        to_address=to_address,
        from_address=from_address,
        language=user_profile.language,
        has_premium=user_profile.has_premium,
        sample_trackers=sample_trackers,
        remove_level_one_trackers=remove_level_one_trackers,
    )
    if has_html:
        incr_if_enabled("email_with_html_content", 1)
    if has_text:
        incr_if_enabled("email_with_text_content", 1)
    if issues:
        info_logger.warning(
            "_handle_received: forwarding issues", extra={"issues": issues}
        )

    # Send new email
    try:
        ses_response = ses_send_raw_email(
            source_address=reply_address,
            destination_address=destination_address,
            message=forwarded_email,
        )
    except ClientError:
        # 503 service unavailable response to SNS so it can retry
        glean_logger().log_email_blocked(
            mask=address, reason="error_sending", can_retry=True
        )
        return HttpResponse("SES client error on Raw Email", status=503)

    # The SES MessageId of the forwarded email keys the stored Reply record
    message_id = ses_response["MessageId"]
    _store_reply_record(mail, message_id, address)

    user_profile.update_abuse_metric(
        email_forwarded=True, forwarded_email_size=len(incoming_email_bytes)
    )
    user_profile.last_engagement = datetime.now(timezone.utc)
    user_profile.save()
    address.num_forwarded += 1
    address.last_used_at = datetime.now(timezone.utc)
    if level_one_trackers_removed:
        # The stored counter may be empty, so start from 0 in that case
        address.num_level_one_trackers_blocked = (
            address.num_level_one_trackers_blocked or 0
        ) + level_one_trackers_removed
    # NOTE(review): "block_list_emails" is saved here although this function
    # does not appear to modify it - confirm whether it is still needed
    address.save(
        update_fields=[
            "num_forwarded",
            "last_used_at",
            "block_list_emails",
            "num_level_one_trackers_blocked",
        ]
    )
    glean_logger().log_email_forwarded(mask=address, is_reply=False)
    return HttpResponse("Sent email to final recipient.", status=200)
777

778

779
def _get_verdict(receipt, verdict_type):
1✔
780
    return receipt["%sVerdict" % verdict_type]["status"]
1✔
781

782

783
def _check_email_from_list(headers):
1✔
784
    for header in headers:
1!
785
        if header["name"].lower().startswith("list-"):
1!
786
            return True
1✔
UNCOV
787
    return False
×
788

789

790
def _record_receipt_verdicts(receipt, state):
    """
    Emit counter metrics for the verdicts found in a receipt.

    Keys are visited in sorted order:
    * For each key ending in "Verdict", the counter "relay.emails.verdicts.<key>"
      is incremented with the tag "state:<state>".
    * The counter "relay.emails.state.<state>" is incremented once, tagged with
      "<key>:<status>" for each verdict and "dmarcPolicy:<value>" if present.
    """
    verdict_tags = []
    for key in sorted(receipt):
        if key.endswith("Verdict"):
            value = receipt[key]["status"]
            verdict_tags.append(f"{key}:{value}")
            incr_if_enabled(f"relay.emails.verdicts.{key}", 1, [f"state:{state}"])
        elif key == "dmarcPolicy":
            # dmarcPolicy is a plain value, not a dict with a "status"
            value = receipt[key]
            verdict_tags.append(f"{key}:{value}")
    incr_if_enabled(f"relay.emails.state.{state}", 1, verdict_tags)
1✔
801

802

803
def _get_message_id_from_headers(headers):
1✔
804
    message_id = None
1✔
805
    for header in headers:
1✔
806
        if header["name"].lower() == "message-id":
1✔
807
            message_id = header["value"]
1✔
808
    return message_id
1✔
809

810

811
def _get_keys_from_headers(headers):
    """
    Derive the reply lookup and encryption keys from an email's reply headers.

    Headers are checked in order, and the first In-Reply-To or References header
    decides the result:
    * In-Reply-To - keys are derived from its value, without checking that a Reply
      record exists for them.
    * References - each whitespace-separated message ID is tried in order, and the
      keys of the first one that matches a Reply record are returned.

    Returns the (lookup_key, encryption_key) tuple from derive_reply_keys.

    Raises:
    * Reply.DoesNotExist - a References header was reached first, and none of its
      message IDs match a Reply record
    * ReplyHeadersNotFound - there is no In-Reply-To or References header
    """
    for header in headers:
        header_name = header["name"].lower()

        if header_name == "in-reply-to":
            message_id_bytes = get_message_id_bytes(header["value"])
            return derive_reply_keys(message_id_bytes)

        if header_name == "references":
            # split() instead of split(" "): IDs may be separated by runs of
            # spaces, tabs, or folded newlines, and empty IDs are skipped.
            for message_id in header["value"].split():
                message_id_bytes = get_message_id_bytes(message_id)
                lookup_key, encryption_key = derive_reply_keys(message_id_bytes)
                try:
                    # FIXME: calling code is likely to duplicate this query
                    _get_reply_record_from_lookup_key(lookup_key)
                except Reply.DoesNotExist:
                    continue
                return lookup_key, encryption_key
            raise Reply.DoesNotExist
    incr_if_enabled("mail_to_replies_without_reply_headers", 1)
    raise ReplyHeadersNotFound
1✔
833

834

835
def _get_reply_record_from_lookup_key(lookup_key):
    """
    Return the Reply record whose lookup matches b64_lookup_key(lookup_key).

    Raises Reply.DoesNotExist if there is no matching record.
    """
    lookup = b64_lookup_key(lookup_key)
    return Reply.objects.get(lookup=lookup)
1✔
838

839

840
def _strip_localpart_tag(address):
1✔
841
    [localpart, domain] = address.split("@")
1✔
842
    subaddress_parts = localpart.split("+")
1✔
843
    return f"{subaddress_parts[0]}@{domain}"
1✔
844

845

846
# How the raw email content was delivered: "sns" when it is embedded in the
# notification's "content" key, "s3" when it is fetched from an S3 bucket.
_TransportType = Literal["sns", "s3"]
1✔
847

848

849
def _get_email_bytes(
    message_json: AWS_SNSMessageJSON,
) -> tuple[bytes, _TransportType, float]:
    """
    Load the raw email content for a notification, and time the load.

    If message_json has a "content" key, it is encoded as UTF-8 and used as the
    email. Otherwise, the email is assumed to be in S3 and is fetched from the
    bucket and key found in message_json.

    Emits the histogram metric "relayed_email.size" with the content length.

    Return is a tuple:
    - message_content - The raw email
    - transport - "sns" or "s3", depending on where the content was found
    - load_time_s - Seconds spent loading the content, rounded to milliseconds
    """
    with Timer(logger=None) as load_timer:
        transport: _TransportType
        if "content" in message_json:
            # email content in sns message
            message_content = message_json["content"].encode("utf-8")
            transport = "sns"
        else:
            # assume email content in S3
            bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
            message_content = get_message_content_from_s3(bucket, object_key)
            transport = "s3"
        histogram_if_enabled("relayed_email.size", len(message_content))
    load_time_s = round(load_timer.last, 3)
    return (message_content, transport, load_time_s)
1✔
865

866

867
def _convert_to_forwarded_email(
    incoming_email_bytes: bytes,
    headers: OutgoingHeaders,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[EmailMessage, EmailForwardingIssues, int, bool, bool]:
    """
    Convert an email (as bytes) to a forwarded email.

    The headers are replaced, the plain text body (if any) gets a Relay prefix,
    and the HTML body (if any) is wrapped. If there is text content but no HTML
    body, an HTML alternative is generated from the text.

    The optional now is passed on to _convert_html_content, which uses it as the
    received time of the email.

    Return is a tuple:
    - email - The forwarded email
    - issues - Any detected issues in conversion
    - level_one_trackers_removed (int) - Number of trackers removed
    - has_html - True if the email has an HTML representation
    - has_text - True if the email has a plain text representation
    """
    email = message_from_bytes(incoming_email_bytes, policy=relay_policy)
    # python/typeshed issue 2418
    # The Python 3.2 default was Message, 3.6 uses policy.message_factory, and
    # policy.default.message_factory is EmailMessage
    assert isinstance(email, EmailMessage)

    # Replace headers in the original email
    header_issues = _replace_headers(email, headers)

    # Find and replace text content
    text_body = email.get_body("plain")
    text_content = None
    has_text = False
    if text_body:
        has_text = True
        assert isinstance(text_body, EmailMessage)
        text_content = text_body.get_content()
        new_text_content = _convert_text_content(text_content, to_address)
        text_body.set_content(new_text_content)

    # Find and replace HTML content
    html_body = email.get_body("html")
    level_one_trackers_removed = 0
    has_html = False
    if html_body:
        has_html = True
        assert isinstance(html_body, EmailMessage)
        html_content = html_body.get_content()
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
            now=now,
        )
        html_body.set_content(new_content, subtype="html")
    elif text_content:
        # Try to use the text content to generate HTML content
        html_content = urlize_and_linebreaks(text_content)
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
            now=now,
        )
        assert isinstance(text_body, EmailMessage)
        try:
            text_body.add_alternative(new_content, subtype="html")
        except TypeError as e:
            # Best effort: forward without the generated HTML, and log the
            # structure of the email to help diagnose the failure.
            out = StringIO()
            _structure(email, fp=out)
            info_logger.error(
                "Adding HTML alternate failed",
                extra={"exception": str(e), "structure": out.getvalue()},
            )

    issues: EmailForwardingIssues = {}
    if header_issues:
        issues["headers"] = header_issues
    return (email, issues, level_one_trackers_removed, has_html, has_text)
1✔
953

954

955
def _replace_headers(
    email: EmailMessage, headers: OutgoingHeaders
) -> EmailHeaderIssues:
    """
    Replace the headers in email with new headers.

    This replaces headers in the passed email object, rather than returns an altered
    copy. The primary reason is that the Python email package can read an email with
    non-compliant headers or content, but can't write it. A read/write is required to
    create a copy that we then alter. This code instead alters the passed EmailMessage
    object, making header-specific changes in try / except statements.

    The other reason is the object size. An Email can be up to 10 MB, and we hope to
    support 40 MB emails someday. Modern servers may be OK with this, but it would be
    nice to handle the non-compliant headers without crashing before we add a source of
    memory-related crashes.

    All incoming headers are dropped, except for MIME-Version, Content-* headers,
    and those that are replaced by an entry in headers.

    Return is a dictionary of detected issues, with optional keys:
    - "incoming" - list of (header, details) for incoming headers that raised an
      exception when read or that were parsed with defects
    - "outgoing" - list of (header, details) for new headers that raised an
      exception when written or read back, or that were parsed with defects
    """
    # Look for headers to drop
    to_drop: list[str] = []
    replacements: set[str] = {_k.lower() for _k in headers}
    issues: EmailHeaderIssues = defaultdict(list)

    # Detect non-compliant headers in incoming emails
    for header in email.keys():
        try:
            value = email[header]
        except Exception as e:
            issues["incoming"].append((header, {"exception_on_read": repr(e)}))
            value = None
        if getattr(value, "defects", None):
            issues["incoming"].append(
                (
                    header,
                    {
                        "defect_count": len(value.defects),
                        "parsed_value": str(value),
                        "unstructured_value": str(value.as_unstructured),
                    },
                )
            )

    # Collect headers that will not be forwarded
    for header in email.keys():
        header_lower = header.lower()
        if (
            header_lower not in replacements
            and header_lower != "mime-version"
            and not header_lower.startswith("content-")
        ):
            to_drop.append(header)

    # Drop headers that should be dropped
    for header in to_drop:
        del email[header]

    # Replace the requested headers
    for header, value in headers.items():
        # Delete first, since assignment adds a header rather than replacing it
        del email[header]
        try:
            email[header] = value
        except Exception as e:
            issues["outgoing"].append(
                (header, {"exception_on_write": repr(e), "value": value})
            )
            continue
        # Read the header back to detect defects in the value as parsed
        try:
            parsed_value = email[header]
        except Exception as e:
            issues["outgoing"].append((header, {"exception_on_read": repr(e)}))
            continue
        if parsed_value.defects:
            issues["outgoing"].append(
                (
                    header,
                    {
                        "defect_count": len(parsed_value.defects),
                        "parsed_value": str(parsed_value),
                        "unstructured_value": str(parsed_value.as_unstructured),
                    },
                )
            )

    return dict(issues)
1✔
1038

1039

1040
def _convert_html_content(
    html_content: str,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[str, int]:
    """
    Wrap HTML email content in the Relay template, optionally removing trackers.

    If sample_trackers is set, the trackers in the content are counted. If
    remove_level_one_trackers is set, trackers are removed from the content and a
    tracker report link is built for the wrapper.

    The optional now is the time the email was received, defaulting to the current
    UTC time.

    Return is a tuple:
    - wrapped_html - The wrapped HTML content
    - removed_count - Number of trackers removed, 0 if removal is not enabled
    """
    # frontend expects a timestamp in milliseconds
    now = now or datetime.now(timezone.utc)
    datetime_now_ms = int(now.timestamp() * 1000)

    # scramble alias so that clients don't recognize it
    # and apply default link styles
    display_email = re.sub(r"([@.:])", r"<span>\1</span>", to_address)

    # sample tracker numbers
    if sample_trackers:
        count_all_trackers(html_content)

    tracker_report_link = ""
    removed_count = 0
    if remove_level_one_trackers:
        html_content, tracker_details = remove_trackers(
            html_content, from_address, datetime_now_ms
        )
        removed_count = tracker_details["tracker_removed"]
        tracker_report_details = {
            "sender": from_address,
            "received_at": datetime_now_ms,
            "trackers": tracker_details["level_one"]["trackers"],
        }
        # The report details are passed as JSON in the URL fragment
        tracker_report_link = f"{settings.SITE_ORIGIN}/tracker-report/#" + json.dumps(
            tracker_report_details
        )

    wrapped_html = wrap_html_email(
        original_html=html_content,
        language=language,
        has_premium=has_premium,
        display_email=display_email,
        tracker_report_link=tracker_report_link,
        num_level_one_email_trackers_removed=removed_count,
    )
    return wrapped_html, removed_count
1✔
1087

1088

1089
def _convert_text_content(text_content: str, to_address: str) -> str:
1✔
1090
    relay_header_text = (
1✔
1091
        "This email was sent to your alias "
1092
        f"{to_address}. To stop receiving emails sent to this alias, "
1093
        "update the forwarding settings in your dashboard.\n"
1094
        "---Begin Email---\n"
1095
    )
1096
    wrapped_text = relay_header_text + text_content
1✔
1097
    return wrapped_text
1✔
1098

1099

1100
def _build_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> EmailMessage:
    """
    Build the email telling a user that replies require a premium account.

    The email is addressed to from_address, and has a plain text body with an HTML
    alternative. If message_id is given, the In-Reply-To and References headers are
    set to it. The sender shown in the body is the "reply-to" or "from" value of
    decrypted_metadata, or empty if neither is available.
    """
    # If we haven't forwarded a first reply for this user yet, _reply_allowed
    # will forward.  So, tell the user we forwarded it.
    forwarded = not reply_record.address.user.profile.forwarded_first_reply
    sender: str | None = ""
    if decrypted_metadata is not None:
        sender = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
    ctx = {
        "sender": sender or "",
        "forwarded": forwarded,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    html_body = render_to_string("emails/reply_requires_premium.html", ctx)
    text_body = render_to_string("emails/reply_requires_premium.txt", ctx)

    # Create the message
    msg = EmailMessage()
    msg["Subject"] = ftl_bundle.format("replies-not-included-in-free-account-header")
    msg["From"] = get_reply_to_address()
    msg["To"] = from_address
    if message_id:
        msg["In-Reply-To"] = message_id
        msg["References"] = message_id
    msg.set_content(text_body)
    msg.add_alternative(html_body, subtype="html")
    return msg
1✔
1131

1132

1133
def _set_forwarded_first_reply(profile):
1✔
1134
    profile.forwarded_first_reply = True
1✔
1135
    profile.save()
1✔
1136

1137

1138
def _send_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> None:
    """
    Email from_address that replies require a premium account.

    If the email is sent, the user's profile is marked as having had a first reply
    forwarded. If the SES client raises an error, it is logged and the profile is
    not changed.

    Emits the counter metric "free_user_reply_attempt" in both cases.
    """
    msg = _build_reply_requires_premium_email(
        from_address, reply_record, message_id, decrypted_metadata
    )
    try:
        ses_send_raw_email(
            source_address=get_reply_to_address(premium=False),
            destination_address=from_address,
            message=msg,
        )
    except ClientError as e:
        logger.error("reply_not_allowed_ses_client_error", extra=e.response["Error"])
    else:
        # If we haven't forwarded a first reply for this user yet, _reply_allowed will.
        # So, update the DB.
        _set_forwarded_first_reply(reply_record.address.user.profile)
    incr_if_enabled("free_user_reply_attempt", 1)
×
1159

1160

1161
def _reply_allowed(
    from_address, to_address, reply_record, message_id=None, decrypted_metadata=None
):
    """
    Return True if the reply should be sent.

    If from_address matches the email of the reply record's user (ignoring any
    "+tag" in the local parts), this is a Relay user replying to an external sender:
    * A flagged profile is not allowed to reply.
    * A premium user is allowed to reply.
    * Other users are emailed that replies require premium, and are allowed
      this reply only if a first reply has not been forwarded for them yet.

    Otherwise, the reply is allowed only if to_address is the mask of a premium
    user. If it is the mask of a non-premium user, the counter metric
    "free_user_reply_attempt" is emitted.
    """
    stripped_from_address = _strip_localpart_tag(from_address)
    reply_record_email = reply_record.address.user.email
    stripped_reply_record_address = _strip_localpart_tag(reply_record_email)
    # Identical addresses also have identical stripped addresses, so comparing
    # the stripped addresses covers the exact match as well.
    if stripped_from_address == stripped_reply_record_address:
        # This is a Relay user replying to an external sender;

        if reply_record.profile.is_flagged:
            return False

        if reply_record.owner_has_premium:
            return True

        # if we haven't forwarded a first reply for this user, return True to allow
        # this first reply. Read the flag before sending the email, which sets it.
        allow_first_reply = not reply_record.address.user.profile.forwarded_first_reply
        _send_reply_requires_premium_email(
            from_address, reply_record, message_id, decrypted_metadata
        )
        return allow_first_reply

    # The From: is not a Relay user, so make sure this is a reply *TO* a
    # premium Relay user
    try:
        address = _get_address(to_address)
        if address.user.profile.has_premium:
            return True
    except ObjectDoesNotExist:
        return False
    incr_if_enabled("free_user_reply_attempt", 1)
    return False
×
1196

1197

1198
def _handle_reply(
    from_address: str, message_json: AWS_SNSMessageJSON, to_address: str
) -> HttpResponse:
    """
    Handle a reply from a Relay user to an external email.

    Returns (may be incomplete):
    * 200 if the reply was sent
    * 400 if the In-Reply-To and References headers are missing, or the SES client
      raises an error when sending the reply
    * 403 if the Relay user is not allowed to reply
    * 404 if the S3-stored email is not found, or there is no matching Reply record in
      the database
    * 503 if the S3 client returns an error (other than not found)

    TODO: Return a more appropriate status object (see _handle_received)
    TODO: Document metrics emitted
    """
    mail = message_json["mail"]
    # NOTE(review): _get_keys_from_headers can also raise Reply.DoesNotExist, when
    # none of the References message IDs match a Reply record. That is not caught
    # here - confirm it is handled by the caller.
    try:
        (lookup_key, encryption_key) = _get_keys_from_headers(mail["headers"])
    except ReplyHeadersNotFound:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-header"])
        return HttpResponse("No In-Reply-To header", status=400)

    try:
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
    except Reply.DoesNotExist:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-reply-record"])
        return HttpResponse("Unknown or stale In-Reply-To header", status=404)

    address = reply_record.address
    message_id = _get_message_id_from_headers(mail["headers"])
    decrypted_metadata = json.loads(
        decrypt_reply_metadata(encryption_key, reply_record.encrypted_metadata)
    )
    if not _reply_allowed(
        from_address, to_address, reply_record, message_id, decrypted_metadata
    ):
        glean_logger().log_email_blocked(
            mask=address, is_reply=True, reason="reply_requires_premium"
        )
        return HttpResponse("Relay replies require a premium account", status=403)

    # The reply is sent from the mask, to the sender of the original email
    outbound_from_address = address.full_address
    incr_if_enabled("reply_email", 1)
    subject = mail["commonHeaders"].get("subject", "")
    recipient_address = decrypted_metadata.get("reply-to") or decrypted_metadata.get(
        "from"
    )
    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": outbound_from_address,
        "To": recipient_address,
        "Reply-To": outbound_from_address,
    }

    try:
        (email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
    except ClientError as e:
        if e.response["Error"].get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
            glean_logger().log_email_blocked(
                mask=address, reason="content_missing", is_reply=True
            )
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
        glean_logger().log_email_blocked(
            mask=address, reason="error_storage", is_reply=True, can_retry=True
        )
        # we are returning a 503 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    email = message_from_bytes(email_bytes, policy=relay_policy)
    assert isinstance(email, EmailMessage)

    # Convert to a reply email
    # TODO: Issue #1747 - Remove wrapper / prefix in replies
    _replace_headers(email, headers)

    try:
        ses_send_raw_email(
            source_address=outbound_from_address,
            destination_address=recipient_address,
            message=email,
        )
    except ClientError:
        glean_logger().log_email_blocked(
            mask=address, reason="error_sending", is_reply=True
        )
        return HttpResponse("SES client error", status=400)

    reply_record.increment_num_replied()
    profile = address.user.profile
    profile.update_abuse_metric(replied=True)
    profile.last_engagement = datetime.now(timezone.utc)
    profile.save()
    glean_logger().log_email_forwarded(mask=address, is_reply=True)
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
1296

1297

1298
def _get_domain_address(local_portion: str, domain_portion: str) -> DomainAddress:
    """
    Find or create the DomainAddress for the parts of an email address.

    If the domain_portion is for a valid subdomain, a new DomainAddress
    will be created and returned.

    If the domain_portion is for an unknown domain, ObjectDoesNotExist is raised.
    This includes a domain_portion without a subdomain.

    If the domain_portion is for an unclaimed subdomain, Profile.DoesNotExist is raised.

    Emits a counter metric for each failure, "email_for_not_supported_domain" or
    "email_for_dne_subdomain".
    """

    # partition() rather than split(".", 1), so that a domain without a "." is
    # rejected below as an unknown domain, instead of raising a ValueError here.
    address_subdomain, _, address_domain = domain_portion.partition(".")
    if address_domain != get_domains_from_settings()["MOZMAIL_DOMAIN"]:
        incr_if_enabled("email_for_not_supported_domain", 1)
        raise ObjectDoesNotExist("Address does not exist")
    try:
        with transaction.atomic():
            # Lock the profile row for the rest of the transaction
            locked_profile = Profile.objects.select_for_update().get(
                subdomain=address_subdomain
            )
            domain_numerical = get_domain_numerical(address_domain)
            # filter DomainAddress because it may not exist
            # which will throw an error with get()
            domain_address = DomainAddress.objects.filter(
                user=locked_profile.user, address=local_portion, domain=domain_numerical
            ).first()
            if domain_address is None:
                # TODO: Consider flows when a user generating alias on a fly
                # was unable to receive an email due to user no longer being a
                # premium user as seen in exception thrown on make_domain_address
                domain_address = DomainAddress.make_domain_address(
                    locked_profile, local_portion, True
                )
                glean_logger().log_email_mask_created(
                    mask=domain_address,
                    created_by_api=False,
                )
            domain_address.last_used_at = datetime.now(timezone.utc)
            domain_address.save()
            return domain_address
    except Profile.DoesNotExist:
        incr_if_enabled("email_for_dne_subdomain", 1)
        raise
1✔
1342

1343

1344
def _get_address(address: str) -> RelayAddress | DomainAddress:
    """
    Find or create the RelayAddress or DomainAddress for an email address.

    If an unknown email address is for a valid subdomain, a new DomainAddress
    will be created.

    The local and domain parts are lowercased before the lookup. A ValueError is
    raised if the address does not contain exactly one "@".

    On failure, raises exception based on Django's ObjectDoesNotExist:
    * RelayAddress.DoesNotExist - looks like RelayAddress, deleted or does not exist
    * Profile.DoesNotExist - looks like DomainAddress, no subdomain match
    * ObjectDoesNotExist - Unknown domain
    """

    local_portion, domain_portion = address.split("@")
    local_address = local_portion.lower()
    domain = domain_portion.lower()

    # if the domain is not the site's 'top' relay domain,
    # it may be for a user's subdomain
    email_domains = get_domains_from_settings().values()
    if domain not in email_domains:
        return _get_domain_address(local_address, domain)

    # the domain is the site's 'top' relay domain, so look up the RelayAddress
    try:
        domain_numerical = get_domain_numerical(domain)
        relay_address = RelayAddress.objects.get(
            address=local_address, domain=domain_numerical
        )
        return relay_address
    except RelayAddress.DoesNotExist:
        # Emit a metric for why the address is missing, then re-raise
        try:
            DeletedAddress.objects.get(
                address_hash=address_hash(local_address, domain=domain)
            )
            incr_if_enabled("email_for_deleted_address", 1)
            # TODO: create a hard bounce receipt rule in SES
        except DeletedAddress.DoesNotExist:
            incr_if_enabled("email_for_unknown_address", 1)
        except DeletedAddress.MultipleObjectsReturned:
            # not sure why this happens on stage but let's handle it
            incr_if_enabled("email_for_deleted_address_multiple", 1)
        raise
1✔
1387

1388

1389
def _handle_bounce(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES bounce notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#bounce-object

    The passed message_json is not modified.

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_bounce" with these tags:
    * bounce_type: 'permanent', 'transient', 'undetermined', 'none' if omitted
    * bounce_subtype: 'undetermined', 'general', etc., 'none' if omitted
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam', 'hard_bounce', 'soft_bounce'

    Emits an info log "bounce_notification", same data as metric, plus:
    * bounce_action: 'action' from bounced recipient data, or '' if omitted
    * bounce_status: 'status' from bounced recipient data, or '' if omitted
    * bounce_diagnostic: 'diagnosticCode' from bounced recipient data, or '' if
      omitted
    * bounce_extra: Extra data from bounce_recipient data, if any
    * domain: User's real email address domain, if an address was given

    Emits a legacy log "bounced recipient domain: {domain}", with data from
    bounced recipient data, without the email address.
    """
    bounce = message_json.get("bounce", {})
    bounce_type = bounce.get("bounceType", "none")
    bounce_subtype = bounce.get("bounceSubType", "none")
    bounced_recipients = bounce.get("bouncedRecipients", [])

    now = datetime.now(timezone.utc)
    bounce_data = []
    for bounced_recipient in bounced_recipients:
        # Work on a copy, so that popping the known keys leaves only the extra
        # data without modifying the caller's message_json
        recipient = dict(bounced_recipient)
        recipient_address = recipient.pop("emailAddress", None)
        data = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "bounce_action": recipient.pop("action", ""),
            "bounce_status": recipient.pop("status", ""),
            "bounce_diagnostic": recipient.pop("diagnosticCode", ""),
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        if recipient:
            data["bounce_extra"] = recipient
        bounce_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        _, at_sign, recipient_domain = recipient_address.rpartition("@")
        if not at_sign:
            # parseaddr returns an empty address when parsing fails. Without a
            # domain there is no user to look up, so leave it as "no_address".
            continue
        data["domain"] = recipient_domain

        try:
            user = User.objects.get(email=recipient_address)
            profile = user.profile
            data["user_match"] = "found"
        except User.DoesNotExist:
            # TODO: handle bounce for a user who no longer exists
            # add to SES account-wide suppression list?
            data["user_match"] = "missing"
            continue

        action = None
        if "spam" in data["bounce_diagnostic"].lower():
            # if an email bounced as spam, set to auto block spam for this user
            # and DON'T set them into bounce pause state
            action = "auto_block_spam"
            profile.auto_block_spam = True
        elif bounce_type == "Permanent":
            # TODO: handle sub-types: 'General', 'NoEmail', etc.
            action = "hard_bounce"
            profile.last_hard_bounce = now
        elif bounce_type == "Transient":
            # TODO: handle sub-types: 'MessageTooLarge', 'AttachmentRejected', etc.
            action = "soft_bounce"
            profile.last_soft_bounce = now
        if action:
            data["relay_action"] = action
            profile.save()

    if not bounce_data:
        # Data when there are no identified recipients
        bounce_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    for data in bounce_data:
        tags = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_bounce",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("bounce_notification", extra=data)

        # Legacy log, can be removed Q4 2023
        recipient_domain = data.get("domain")
        if recipient_domain:
            legacy_extra = {
                "action": data.get("bounce_action"),
                "status": data.get("bounce_status"),
                "diagnosticCode": data.get("bounce_diagnostic"),
            }
            legacy_extra.update(data.get("bounce_extra", {}))
            info_logger.info(
                f"bounced recipient domain: {recipient_domain}", extra=legacy_extra
            )

    if any(data["user_match"] == "missing" for data in bounce_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)
1✔
1507

1508

1509
def _handle_complaint(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES complaint notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object

    For each complained recipient whose email address matches a user, the
    user's profile is updated to set auto_block_spam.

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_complaint" with these tags:
    * complaint_subtype: 'onaccountsuppressionlist', or 'none' if omitted
    * complaint_feedback - feedback enumeration from ISP or 'none'
    * user_match: 'found', 'missing', error states 'no_address' (address
      omitted or not parseable as user@domain) and 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam'

    Emits an info log "complaint_notification", same data as metric, plus:
    * complaint_user_agent - identifies the client used to file the complaint
    * complaint_extra - Extra data from complainedRecipients data, if any
    * domain - User's domain, if an address was given

    Emits a legacy log "complaint_received", with data:
    * recipient_domains: sorted list of extracted user domains
    * subtype: complaintSubType as given in the notification, or None
    * feedback: complaintFeedbackType as given in the notification, or None
    """
    # Work on a copy so that popping keys does not mutate the caller's message
    complaint = deepcopy(message_json.get("complaint", {}))
    complained_recipients = complaint.pop("complainedRecipients", [])
    subtype = complaint.pop("complaintSubType", None)
    user_agent = complaint.pop("userAgent", None)
    feedback = complaint.pop("complaintFeedbackType", None)

    complaint_data = []
    for recipient in complained_recipients:
        recipient_address = recipient.pop("emailAddress", None)
        # Start from the "nothing usable" state; upgraded below as we learn more
        data = {
            "complaint_subtype": subtype,
            "complaint_user_agent": user_agent,
            "complaint_feedback": feedback,
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        if recipient:
            # Anything left after popping emailAddress is unexpected extra data
            data["complaint_extra"] = recipient.copy()
        complaint_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        # parseaddr returns "" when parsing fails, and may return text without
        # an "@". Splitting on the last "@" avoids an IndexError for those
        # inputs and handles quoted local parts that themselves contain "@".
        _, at_sign, recipient_domain = recipient_address.rpartition("@")
        if not at_sign:
            # Not a user@domain address; leave it reported as "no_address"
            continue
        data["domain"] = recipient_domain

        try:
            user = User.objects.get(email=recipient_address)
        except User.DoesNotExist:
            data["user_match"] = "missing"
            continue

        profile = user.profile
        data["user_match"] = "found"
        data["relay_action"] = "auto_block_spam"
        profile.auto_block_spam = True
        profile.save()

    if not complaint_data:
        # Data when there are no identified recipients
        complaint_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    for data in complaint_data:
        tags = {
            "complaint_subtype": subtype or "none",
            "complaint_feedback": feedback or "none",
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_complaint",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("complaint_notification", extra=data)

    # Legacy log, can be removed Q4 2023
    domains = [data["domain"] for data in complaint_data if "domain" in data]
    info_logger.info(
        "complaint_received",
        extra={
            "recipient_domains": sorted(domains),
            "subtype": subtype,
            "feedback": feedback,
        },
    )

    if any(data["user_match"] == "missing" for data in complaint_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc