• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 3bc340e2-329f-4ed2-8700-adaaac8d78c8

15 Dec 2023 06:50PM CUT coverage: 73.514% (-0.1%) from 73.614%
3bc340e2-329f-4ed2-8700-adaaac8d78c8

push

circleci

jwhitlock
Use branch database with production tests

Previously, migrations tests were run with production code, branch
requirements, and branch migrations. Now they run with production
requirements, so that third-party migrations are tested as well.

This uses pytest --reuse-db to create a test database with the branch's
migrations, and then a pip install with the production code. This more
closely emulates the mixed environment during a deploy.

1962 of 2913 branches covered (0.0%)

Branch coverage included in aggregate %.

6273 of 8289 relevant lines covered (75.68%)

19.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.21
/emails/views.py
1
from collections import defaultdict
1✔
2
from copy import deepcopy
1✔
3
from datetime import datetime, timezone
1✔
4
from email import message_from_bytes
1✔
5
from email.iterators import _structure
1✔
6
from email.message import EmailMessage
1✔
7
from email.utils import parseaddr
1✔
8
import html
1✔
9
from io import StringIO
1✔
10
import json
1✔
11
from json import JSONDecodeError
1✔
12
import logging
1✔
13
import re
1✔
14
import shlex
1✔
15
from textwrap import dedent
1✔
16
from typing import Any, Literal, Optional
1✔
17
from urllib.parse import urlencode
1✔
18

19
from botocore.exceptions import ClientError
1✔
20
from codetiming import Timer
1✔
21
from decouple import strtobool
1✔
22
from django.shortcuts import render
1✔
23
from sentry_sdk import capture_message
1✔
24
from markus.utils import generate_tag
1✔
25
from waffle import sample_is_active
1✔
26

27
from django.conf import settings
1✔
28
from django.contrib.auth.models import User
1✔
29
from django.core.exceptions import ObjectDoesNotExist
1✔
30
from django.db import transaction
1✔
31
from django.db.models import prefetch_related_objects
1✔
32
from django.http import HttpRequest, HttpResponse
1✔
33
from django.template.loader import render_to_string
1✔
34
from django.utils.html import escape
1✔
35
from django.views.decorators.csrf import csrf_exempt
1✔
36

37
from privaterelay.utils import get_subplat_upgrade_link_by_language
1✔
38

39

40
from .models import (
1✔
41
    CannotMakeAddressException,
42
    DeletedAddress,
43
    DomainAddress,
44
    Profile,
45
    RelayAddress,
46
    Reply,
47
    address_hash,
48
    get_domain_numerical,
49
    get_domains_from_settings,
50
)
51
from .policy import relay_policy
1✔
52
from .types import (
1✔
53
    AWS_SNSMessageJSON,
54
    OutgoingHeaders,
55
    EmailForwardingIssues,
56
    EmailHeaderIssues,
57
)
58
from .utils import (
1✔
59
    _get_bucket_and_key_from_s3_json,
60
    _store_reply_record,
61
    b64_lookup_key,
62
    count_all_trackers,
63
    decrypt_reply_metadata,
64
    derive_reply_keys,
65
    generate_from_header,
66
    get_message_content_from_s3,
67
    get_message_id_bytes,
68
    get_reply_to_address,
69
    histogram_if_enabled,
70
    incr_if_enabled,
71
    remove_message_from_s3,
72
    remove_trackers,
73
    ses_send_raw_email,
74
    urlize_and_linebreaks,
75
    InvalidFromHeader,
76
    parse_email_header,
77
)
78
from .sns import verify_from_sns, SUPPORTED_SNS_TYPES
1✔
79

80
from privaterelay.ftl_bundles import main as ftl_bundle
1✔
81
from privaterelay.utils import flag_is_active_in_task
1✔
82

83
# Named loggers used throughout this module. Their handlers and levels are
# configured outside this file (presumably via Django's LOGGING setting).
logger: logging.Logger = logging.getLogger("events")
info_logger: logging.Logger = logging.getLogger("eventsinfo")
85

86

87
class ReplyHeadersNotFound(Exception):
    """Raised when an email has neither an In-Reply-To nor a References header."""

    def __init__(self, message="No In-Reply-To or References headers."):
        # Hand the message to Exception as well, so str(exc), repr(exc) and
        # logged tracebacks show it instead of an empty string.
        super().__init__(message)
        self.message = message
90

91

92
def first_time_user_test(request):
    """
    Demonstrate rendering of the "First time Relay user" email.

    Every setting comes from the querystring; no profile is read.

    Query parameters:
    * in_bundle_country: boolean string parsed with strtobool (default "yes").
    * format: "text" renders the plain-text template; any other value
      (default "html") renders the HTML template.
    """
    in_bundle_country = strtobool(request.GET.get("in_bundle_country", "yes"))
    email_context = {
        "in_bundle_country": in_bundle_country,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    if request.GET.get("format", "html") == "text":
        return render(
            request,
            "emails/first_time_user.txt",
            email_context,
            "text/plain; charset=utf-8",
        )
    return render(request, "emails/first_time_user.html", email_context)
111

112

113
def reply_requires_premium_test(request):
    """
    Demonstrate rendering of the "Reply requires premium" email.

    Every setting comes from the querystring; no profile is read. The context
    starts with sender="test@example.com", forwarded=True and SITE_ORIGIN, and
    every query parameter is copied into it, overriding those defaults.

    Special query parameters:
    * forwarded: True only for the exact value "True", otherwise False.
    * content-type: "text/plain" renders the plain-text template; anything
      else (or no value) renders the HTML template.
    """
    email_context = {
        "sender": "test@example.com",
        "forwarded": True,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    for param in request.GET:
        value = request.GET.get(param)
        if param == "forwarded":
            # Store a real boolean. The raw string "False" is truthy, so a
            # template that presumably tests `forwarded` for truthiness would
            # otherwise render it as forwarded.
            email_context[param] = value == "True"
        else:
            email_context[param] = value

    # A single lookup replaces a second full pass over request.GET.
    if request.GET.get("content-type") == "text/plain":
        return render(
            request,
            "emails/reply_requires_premium.txt",
            email_context,
            "text/plain; charset=utf-8",
        )
    return render(request, "emails/reply_requires_premium.html", email_context)
139

140

141
def first_forwarded_email_test(request: HttpRequest) -> HttpResponse:
    """
    Demonstrate rendering of the "first forwarded email" template.

    The template output is wrapped the same way forwarded HTML email is
    wrapped. The request is not inspected: language ("en-us"), premium status
    (True), display address and tracker count (0) are fixed demo values.
    """
    # TO DO: Update with correct context when trigger is created
    inner_html = render_to_string(
        "emails/first_forwarded_email.html",
        {"SITE_ORIGIN": settings.SITE_ORIGIN},
    )
    return HttpResponse(
        wrap_html_email(
            original_html=inner_html,
            language="en-us",
            has_premium=True,
            display_email="test@example.com",
            num_level_one_email_trackers_removed=0,
        )
    )
159

160

161
def wrap_html_email(
    original_html: str,
    language: str,
    has_premium: bool,
    display_email: str,
    num_level_one_email_trackers_removed: int | None = None,
    tracker_report_link: str | None = None,
) -> str:
    """
    Add Relay banners, surveys, etc. to an HTML email.

    Renders the "emails/wrapped_email.html" template around original_html.
    The template also receives the language's SubPlat upgrade link, the
    premium flag, the display address, the tracker details and SITE_ORIGIN.

    Returns the rendered HTML with every whitespace-only line removed. The
    result always ends with exactly one newline.
    """
    template_context = dict(
        original_html=original_html,
        language=language,
        has_premium=has_premium,
        subplat_upgrade_link=get_subplat_upgrade_link_by_language(language),
        display_email=display_email,
        tracker_report_link=tracker_report_link,
        num_level_one_email_trackers_removed=num_level_one_email_trackers_removed,
        SITE_ORIGIN=settings.SITE_ORIGIN,
    )
    rendered = render_to_string("emails/wrapped_email.html", template_context)
    # str.strip is truthy only for lines containing a non-whitespace character.
    non_blank_lines = filter(str.strip, rendered.splitlines())
    return "\n".join(non_blank_lines) + "\n"
185

186

187
def wrapped_email_test(request: HttpRequest) -> HttpResponse:
    """
    Demonstrate rendering of forwarded HTML emails.

    Settings come from the querystring:
    * language: language code passed to the wrapper.
    * has_premium: boolean string parsed with strtobool.
    * num_level_one_email_trackers_removed: integer (default 0).
    * has_tracker_report_link: boolean string parsed with strtobool (default
      False); when true, a fake tracker report link is included.

    If language or has_premium is missing, it is taken from a randomly chosen
    Profile. The assert fails when no Profile exists. The demo page lists the
    active settings, with links that change one setting at a time.
    """
    params = request.GET

    user_profile = (
        None
        if "language" in params and "has_premium" in params
        else Profile.objects.order_by("?").first()
    )

    if "language" not in params:
        assert user_profile is not None
        language = user_profile.language
    else:
        language = params["language"]

    if "has_premium" not in params:
        assert user_profile is not None
        has_premium = user_profile.has_premium
    else:
        has_premium = strtobool(params["has_premium"])

    num_level_one_email_trackers_removed = (
        int(params["num_level_one_email_trackers_removed"])
        if "num_level_one_email_trackers_removed" in params
        else 0
    )
    has_tracker_report_link = (
        strtobool(params["has_tracker_report_link"])
        if "has_tracker_report_link" in params
        else False
    )

    tracker_report_link = ""
    if has_tracker_report_link:
        # One fake tracker domain carries the whole count; zero means no trackers.
        trackers = (
            {"fake-tracker.example.com": num_level_one_email_trackers_removed}
            if num_level_one_email_trackers_removed
            else {}
        )
        tracker_report_link = (
            "/tracker-report/#{"
            '"sender": "sender@example.com", '
            '"received_at": 1658434657, '
            f'"trackers": {json.dumps(trackers)}'
            "}"
        )

    path = "/emails/wrapped_email_test"
    old_query = {
        "language": language,
        "has_premium": "Yes" if has_premium else "No",
        "has_tracker_report_link": "Yes" if has_tracker_report_link else "No",
        "num_level_one_email_trackers_removed": str(
            num_level_one_email_trackers_removed
        ),
    }

    def switch_link(key, value):
        """Return plain text for the current value, else a link that switches to it."""
        if old_query[key] == value:
            return str(value)
        switched_query = {**old_query, key: value}
        return f'<a href="{path}?{urlencode(switched_query)}">{value}</a>'

    html_content = dedent(
        f"""\
    <p>
      <strong>Email rendering Test</strong>
    </p>
    <p>Settings: (<a href="{path}">clear all</a>)</p>
    <ul>
      <li>
        <strong>language</strong>:
        {escape(language)}
        (switch to
        {switch_link("language", "en-us")},
        {switch_link("language", "de")},
        {switch_link("language", "en-gb")},
        {switch_link("language", "fr")},
        {switch_link("language", "ru-ru")},
        {switch_link("language", "es-es")},
        {switch_link("language", "pt-br")},
        {switch_link("language", "it-it")},
        {switch_link("language", "en-ca")},
        {switch_link("language", "de-de")},
        {switch_link("language", "es-mx")})
      </li>
      <li>
        <strong>has_premium</strong>:
        {"Yes" if has_premium else "No"}
        (switch to
        {switch_link("has_premium", "Yes")},
        {switch_link("has_premium", "No")})
      </li>
      <li>
        <strong>has_tracker_report_link</strong>:
        {"Yes" if has_tracker_report_link else "No"}
        (switch to
        {switch_link("has_tracker_report_link", "Yes")},
        {switch_link("has_tracker_report_link", "No")})
      </li>
      <li>
        <strong>num_level_one_email_trackers_removed</strong>:
        {num_level_one_email_trackers_removed}
        (switch to
        {switch_link("num_level_one_email_trackers_removed", "0")},
        {switch_link("num_level_one_email_trackers_removed", "1")},
        {switch_link("num_level_one_email_trackers_removed", "2")})
      </li>
    </ul>
    """
    )

    return HttpResponse(
        wrap_html_email(
            original_html=html_content,
            language=language,
            has_premium=has_premium,
            display_email="test@relay.firefox.com",
            num_level_one_email_trackers_removed=num_level_one_email_trackers_removed,
            tracker_report_link=tracker_report_link,
        )
    )
315

316

317
@csrf_exempt
def sns_inbound(request):
    """
    Entry point for AWS SNS POST requests.

    Parses the JSON body, verifies it with verify_from_sns, validates its
    TopicArn and Type, and passes it to _sns_inbound_logic.

    Returns 400 if the body is not valid JSON or the ARN/type validation
    fails. Otherwise returns the response from _sns_inbound_logic.
    """
    incr_if_enabled("sns_inbound", 1)
    # First thing we do is verify the signature
    try:
        json_body = json.loads(request.body)
    except ValueError:
        # JSONDecodeError, and UnicodeDecodeError for undecodable bytes, are
        # both ValueError subclasses. The body comes from an unauthenticated
        # POST, so reject it with a 400 rather than surfacing a 500.
        logger.error("SNS inbound request has non-JSON body")
        return HttpResponse("Received SNS request with non-JSON body", status=400)
    verified_json_body = verify_from_sns(json_body)

    # Validate ARN and message type
    topic_arn = verified_json_body.get("TopicArn", None)
    message_type = verified_json_body.get("Type", None)
    error_details = validate_sns_arn_and_type(topic_arn, message_type)
    if error_details:
        logger.error("validate_sns_arn_and_type_error", extra=error_details)
        return HttpResponse(error_details["error"], status=400)

    return _sns_inbound_logic(topic_arn, message_type, verified_json_body)
333

334

335
def validate_sns_arn_and_type(
    topic_arn: Optional[str], message_type: Optional[str]
) -> Optional[dict[str, Any]]:
    """
    Validate Topic ARN and SNS Message Type.

    Either value may be None (sns_inbound reads them with dict.get); a missing
    value is reported as an error. The checks run in this order:
    1. missing ARN
    2. ARN not in settings.AWS_SNS_TOPIC
    3. missing type
    4. type not in SUPPORTED_SNS_TYPES

    If an error is detected, the return is a dictionary of error details.
    If no error is detected, the return is None.
    """
    if not topic_arn:
        error = "Received SNS request without Topic ARN."
    elif topic_arn not in settings.AWS_SNS_TOPIC:
        error = "Received SNS message for wrong topic."
    elif not message_type:
        error = "Received SNS request without Message Type."
    elif message_type not in SUPPORTED_SNS_TYPES:
        error = "Received SNS message for unsupported Type."
    else:
        error = None

    if error:
        # The received values are shlex-quoted because the caller logs this
        # dict as structured extra data.
        return {
            "error": error,
            "received_topic_arn": shlex.quote(topic_arn),
            "supported_topic_arn": sorted(settings.AWS_SNS_TOPIC),
            "received_sns_type": shlex.quote(message_type),
            "supported_sns_types": SUPPORTED_SNS_TYPES,
        }
    return None
364

365

366
def _sns_inbound_logic(topic_arn, message_type, json_body):
    """
    Dispatch a validated SNS message by its Type.

    * "SubscriptionConfirmation": log the SubscribeURL and return 200.
    * "Notification": count it and hand off to _sns_notification.
    * Anything else: log it, report it to Sentry, and return 400.

    topic_arn is accepted for interface compatibility but not used here.
    """
    if message_type == "SubscriptionConfirmation":
        subscribe_url = json_body["SubscribeURL"]
        info_logger.info(
            "SNS SubscriptionConfirmation",
            extra={"SubscribeURL": subscribe_url},
        )
        return HttpResponse("Logged SubscribeURL", status=200)

    if message_type == "Notification":
        incr_if_enabled("sns_inbound_Notification", 1)
        return _sns_notification(json_body)

    unhandled_message = "Received SNS message with type not handled in inbound log"
    logger.error(
        "SNS message type did not fall under the SNS inbound logic",
        extra={"message_type": shlex.quote(message_type)},
    )
    capture_message(unhandled_message, level="error", stack=True)
    return HttpResponse(unhandled_message, status=400)
389

390

391
def _sns_notification(json_body):
    """
    Process an SNS Notification whose "Message" field holds an SES event as JSON.

    Returns 400 if Message is not valid JSON, or if notificationType is not
    "Complaint"/"Received"/"Bounce" and eventType is not "Complaint"/"Bounce".
    Otherwise returns the response from _sns_message. Unless that response is
    a 5xx, the email's S3 object (located by _get_bucket_and_key_from_s3_json)
    is then removed.
    """
    raw_message = json_body["Message"]
    try:
        message_json = json.loads(raw_message)
    except JSONDecodeError:
        logger.error(
            "SNS notification has non-JSON message body",
            extra={"content": shlex.quote(raw_message)},
        )
        return HttpResponse("Received SNS notification with non-JSON body", status=400)

    event_type = message_json.get("eventType")
    notification_type = message_json.get("notificationType")
    is_supported = notification_type in {
        "Complaint",
        "Received",
        "Bounce",
    } or event_type in {"Complaint", "Bounce"}
    if not is_supported:
        logger.error(
            "SNS notification for unsupported type",
            extra={
                "notification_type": shlex.quote(notification_type),
                "event_type": shlex.quote(event_type),
                "keys": [shlex.quote(key) for key in message_json.keys()],
            },
        )
        return HttpResponse(
            "Received SNS notification for unsupported Type: %s"
            % html.escape(shlex.quote(notification_type)),
            status=400,
        )

    response = _sns_message(message_json)
    bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
    # A 5xx (e.g. the 503s returned so SNS can retry) keeps the stored email.
    keep_stored_email = response.status_code >= 500
    if not keep_stored_email:
        remove_message_from_s3(bucket, object_key)
    return response
427

428

429
def _get_recipient_with_relay_domain(recipients):
1✔
430
    domains_to_check = get_domains_from_settings().values()
1✔
431
    for recipient in recipients:
1✔
432
        for domain in domains_to_check:
1✔
433
            if domain in recipient:
1✔
434
                return recipient
1✔
435
    return None
1✔
436

437

438
def _get_relay_recipient_from_message_json(message_json):
    """
    Find the Relay address an SES message was sent to.

    The "to" and then "cc" common headers are checked first. For the first
    recipient there on a Relay domain, the bare address (display name
    stripped) is returned. Otherwise the SES receipt recipients, where a BCC
    recipient appears, are checked, and that match is returned as-is, or None.
    """
    common_headers = message_json["mail"]["commonHeaders"]
    for header_name in ("to", "cc"):
        if header_name not in common_headers:
            continue
        match = _get_recipient_with_relay_domain(common_headers[header_name])
        if match is not None:
            return parseaddr(match)[1]

    # SES-SNS sends bcc in a different part of the message
    return _get_recipient_with_relay_domain(message_json["receipt"]["recipients"])
454

455

456
def _sns_message(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Route a supported SES notification to its handler.

    A "Bounce" notificationType or eventType goes to _handle_bounce. Otherwise
    a "Complaint" one goes to _handle_complaint. Anything else must be a
    "Received" notification with no eventType (checked with an assert) and
    goes to _handle_received.
    """
    incr_if_enabled("sns_inbound_Notification_Received", 1)
    notification_type = message_json.get("notificationType")
    event_type = message_json.get("eventType")
    reported_types = (notification_type, event_type)

    if "Bounce" in reported_types:
        return _handle_bounce(message_json)
    if "Complaint" in reported_types:
        return _handle_complaint(message_json)

    assert notification_type == "Received" and event_type is None
    return _handle_received(message_json)
466

467

468
def _handle_received(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES received notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/receiving-email-notifications-contents.html
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html

    Returns:
    * 200 if the email was sent, the noreply address was the recipient, the
      email was suppressed for spam, the user is under a bounce pause, the
      user is flagged for abuse, the Relay address is disabled, or the list
      email was blocked.
    * 400 if the commonHeaders entry is missing, the From header yields no
      address, the Relay recipient address does not split into exactly one
      local part and one domain, or the email failed DMARC with a "reject"
      policy.
    * 403 if the email is part of a reply chain and _reply_allowed rejects it
      (the Relay user needs a premium account).
    * 404 if no Relay address was found in the "To", "CC", or "BCC" fields,
      the Relay address is not in the database (and the local part is not
      "replies"), or the S3-stored email was not found.
    * 503 if generate_from_header cannot build a From header, the S3 client
      returned an error other than "NoSuchKey", or the SES client fails.
    * Whatever _handle_reply returns, when the address lookup fails and the
      local part is "replies".

    The HTTP returns are an artifact from an earlier time when emails were
    sent to a webhook. Currently, production instead pulls events from a queue.

    TODO: Return a more appropriate status object
    TODO: Document the metrics emitted
    """
    mail = message_json["mail"]
    if "commonHeaders" not in mail:
        logger.error("SNS message without commonHeaders")
        return HttpResponse(
            "Received SNS notification without commonHeaders.", status=400
        )
    common_headers = mail["commonHeaders"]
    receipt = message_json["receipt"]

    # Verdict metrics are recorded at each stage the message reaches ("all",
    # "relay_recipient", "valid_user", ...), tagged with that state.
    _record_receipt_verdicts(receipt, "all")
    to_address = _get_relay_recipient_from_message_json(message_json)
    if to_address is None:
        incr_if_enabled("no_relay_domain_in_recipient_fields", 1)
        return HttpResponse("Address does not exist", status=404)

    _record_receipt_verdicts(receipt, "relay_recipient")
    # NOTE(review): a missing or empty "from" common header raises
    # KeyError/IndexError here; it is not caught in this function.
    from_addresses = parse_email_header(common_headers["from"][0])
    if not from_addresses:
        info_logger.error(
            "_handle_received: no from address",
            extra={
                "source": mail["source"],
                "common_headers_from": common_headers["from"],
            },
        )
        return HttpResponse("Unable to parse From address", status=400)
    # First parsed entry; [1] is the address part of the parsed pair.
    from_address = from_addresses[0][1]

    try:
        [to_local_portion, to_domain_portion] = to_address.split("@")
    except ValueError:
        # TODO: Add metric
        return HttpResponse("Malformed to field.", status=400)

    if to_local_portion.lower() == "noreply":
        incr_if_enabled("email_for_noreply_address", 1)
        return HttpResponse("noreply address is not supported.")
    try:
        # FIXME: this ambiguous return of either
        # RelayAddress or DomainAddress types makes the Rustacean in me throw
        # up a bit.
        address = _get_address(to_address)
        prefetch_related_objects([address.user], "socialaccount_set", "profile")
        user_profile = address.user.profile
    except (
        ObjectDoesNotExist,
        CannotMakeAddressException,
        DeletedAddress.MultipleObjectsReturned,
    ):
        # Mail to the "replies" local part is a Relay user replying through
        # Relay, so it is handled by the reply path rather than rejected.
        if to_local_portion.lower() == "replies":
            response = _handle_reply(from_address, message_json, to_address)
        else:
            response = HttpResponse("Address does not exist", status=404)
        return response

    _record_receipt_verdicts(receipt, "valid_user")
    # if this is spam and the user is set to auto-block spam, early return
    if user_profile.auto_block_spam and _get_verdict(receipt, "spam") == "FAIL":
        incr_if_enabled("email_auto_suppressed_for_spam", 1)
        return HttpResponse("Address rejects spam.")

    if _get_verdict(receipt, "dmarc") == "FAIL":
        policy = receipt.get("dmarcPolicy", "none")
        # TODO: determine action on dmarcPolicy "quarantine"
        if policy == "reject":
            incr_if_enabled(
                "email_suppressed_for_dmarc_failure",
                1,
                tags=["dmarcPolicy:reject", "dmarcVerdict:FAIL"],
            )
            return HttpResponse("DMARC failure, policy is reject", status=400)

    # if this user is over bounce limits, early return
    bounce_paused, bounce_type = user_profile.check_bounce_pause()
    if bounce_paused:
        _record_receipt_verdicts(receipt, "user_bounce_paused")
        incr_if_enabled("email_suppressed_for_%s_bounce" % bounce_type, 1)
        return HttpResponse("Address is temporarily disabled.")

    # check if this is a reply from an external sender to a Relay user
    try:
        (lookup_key, _) = _get_keys_from_headers(mail["headers"])
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
        # NOTE(review): this rebinds `address` to the Reply record's address,
        # so the flagged/enabled/list checks and the counters saved below apply
        # to that address rather than the one from _get_address(to_address).
        address = reply_record.address
        message_id = _get_message_id_from_headers(mail["headers"])
        # make sure the relay user is premium
        if not _reply_allowed(from_address, to_address, reply_record, message_id):
            # TODO: Add metrics
            return HttpResponse("Relay replies require a premium account", status=403)
    except (ReplyHeadersNotFound, Reply.DoesNotExist):
        # if there's no In-Reply-To header, or the In-Reply-To value doesn't
        # match a Reply record, continue to treat this as a regular email from
        # an external sender to a relay user
        pass

    # if account flagged for abuse, early return
    if user_profile.is_flagged:
        return HttpResponse("Address is temporarily disabled.")

    # if address is set to block, early return
    if not address.enabled:
        incr_if_enabled("email_for_disabled_address", 1)
        address.num_blocked += 1
        address.save(update_fields=["num_blocked"])
        _record_receipt_verdicts(receipt, "disabled_alias")
        # TODO: Add metrics
        return HttpResponse("Address is temporarily disabled.")

    _record_receipt_verdicts(receipt, "active_alias")
    incr_if_enabled("email_for_active_address", 1)

    # if address is blocking list emails, and email is from list, early return
    # NOTE(review): `address` was already dereferenced above (address.enabled),
    # so the leading truthiness test looks redundant.
    if (
        address
        and address.block_list_emails
        and user_profile.has_premium
        and _check_email_from_list(mail["headers"])
    ):
        incr_if_enabled("list_email_for_address_blocking_lists", 1)
        address.num_blocked += 1
        address.save(update_fields=["num_blocked"])
        return HttpResponse("Address is not accepting list emails.")

    # Collect new headers
    subject = common_headers.get("subject", "")
    destination_address = user_profile.user.email
    reply_address = get_reply_to_address()
    try:
        from_header = generate_from_header(from_address, to_address)
    except InvalidFromHeader:
        # TODO: MPP-3407, MPP-3417 - Determine how to handle these
        # Collect the raw From header(s) for the diagnostic log entry.
        header_from = []
        for header in mail["headers"]:
            if header["name"].lower() == "from":
                header_from.append(header)
        info_logger.error(
            "generate_from_header",
            extra={
                "from_address": from_address,
                "source": mail["source"],
                "common_headers_from": common_headers["from"],
                "headers_from": header_from,
            },
        )
        return HttpResponse("Cannot parse the From address", status=503)

    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": from_header,
        "To": destination_address,
        "Reply-To": reply_address,
        "Resent-From": from_address,
    }

    # Get incoming email
    # NOTE(review): `transport` and `load_time_s` are unpacked but not used below.
    try:
        (incoming_email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
    except ClientError as e:
        if e.response["Error"].get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
        # we are returning a 503 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    # Convert to new email
    sample_trackers = bool(sample_is_active("tracker_sample"))
    tracker_removal_flag = flag_is_active_in_task("tracker_removal", address.user)
    # Level-one trackers are removed only when the feature flag is on AND the
    # user opted in.
    remove_level_one_trackers = bool(
        tracker_removal_flag and user_profile.remove_level_one_email_trackers
    )
    (
        forwarded_email,
        issues,
        level_one_trackers_removed,
        has_html,
        has_text,
    ) = _convert_to_forwarded_email(
        incoming_email_bytes=incoming_email_bytes,
        headers=headers,
        to_address=to_address,
        from_address=from_address,
        language=user_profile.language,
        has_premium=user_profile.has_premium,
        sample_trackers=sample_trackers,
        remove_level_one_trackers=remove_level_one_trackers,
    )
    if has_html:
        incr_if_enabled("email_with_html_content", 1)
    if has_text:
        incr_if_enabled("email_with_text_content", 1)
    if issues:
        info_logger.warning(
            "_handle_received: forwarding issues", extra={"issues": issues}
        )

    # Send new email
    try:
        ses_response = ses_send_raw_email(
            source_address=reply_address,
            destination_address=destination_address,
            message=forwarded_email,
        )
    except ClientError:
        # 503 service unavailable response to SNS so it can retry
        return HttpResponse("SES client error on Raw Email", status=503)

    # Record the outgoing SES message ID so a later reply can be matched back
    # to this address.
    message_id = ses_response["MessageId"]
    _store_reply_record(mail, message_id, address)

    user_profile.update_abuse_metric(
        email_forwarded=True, forwarded_email_size=len(incoming_email_bytes)
    )
    address.num_forwarded += 1
    address.last_used_at = datetime.now(timezone.utc)
    if level_one_trackers_removed:
        # The stored counter may be None; treat that as 0.
        address.num_level_one_trackers_blocked = (
            address.num_level_one_trackers_blocked or 0
        ) + level_one_trackers_removed
    # NOTE(review): "block_list_emails" is not modified in this function;
    # including it in update_fields rewrites the in-memory value, which may
    # clobber a concurrent change — confirm whether it is needed.
    address.save(
        update_fields=[
            "num_forwarded",
            "last_used_at",
            "block_list_emails",
            "num_level_one_trackers_blocked",
        ]
    )
    return HttpResponse("Sent email to final recipient.", status=200)
723

724

725
def _get_verdict(receipt, verdict_type):
1✔
726
    return receipt["%sVerdict" % verdict_type]["status"]
1✔
727

728

729
def _check_email_from_list(headers):
1✔
730
    for header in headers:
1!
731
        if header["name"].lower().startswith("list-"):
1!
732
            return True
1✔
733
    return False
×
734

735

736
def _record_receipt_verdicts(receipt, state):
    """
    Emit metrics describing an SES receipt's verdicts at a processing stage.

    For each receipt key ending in "Verdict" (in sorted key order), this
    increments "relay.emails.verdicts.<key>" tagged "state:<state>". It then
    increments "relay.emails.state.<state>" once, tagged with "<key>:<status>"
    for every verdict and "dmarcPolicy:<policy>" when that key is present.
    """
    summary_tags = []
    for receipt_key in sorted(receipt):
        if receipt_key.endswith("Verdict"):
            status = receipt[receipt_key]["status"]
            summary_tags.append(f"{receipt_key}:{status}")
            incr_if_enabled(
                f"relay.emails.verdicts.{receipt_key}", 1, [f"state:{state}"]
            )
        elif receipt_key == "dmarcPolicy":
            summary_tags.append(f"{receipt_key}:{receipt[receipt_key]}")
    incr_if_enabled(f"relay.emails.state.{state}", 1, summary_tags)
747

748

749
def _get_message_id_from_headers(headers):
1✔
750
    message_id = None
1✔
751
    for header in headers:
1✔
752
        if header["name"].lower() == "message-id":
1✔
753
            message_id = header["value"]
1✔
754
    return message_id
1✔
755

756

757
def _get_keys_from_headers(headers):
    """
    Derive the reply lookup and encryption keys from threading headers.

    Headers are scanned in order, and the first relevant header decides:
    * In-Reply-To: keys are derived from its message ID and returned at once,
      without checking the database.
    * References: each whitespace-separated message ID is tried in order. The
      keys of the first ID that has a stored Reply record are returned.

    Returns (lookup_key, encryption_key).
    Raises Reply.DoesNotExist if a References header is reached and none of its
    message IDs has a Reply record.
    Raises ReplyHeadersNotFound (after incrementing
    "mail_to_replies_without_reply_headers") if neither header is present.
    """
    for header in headers:
        header_name = header["name"].lower()
        if header_name == "in-reply-to":
            message_id_bytes = get_message_id_bytes(header["value"])
            return derive_reply_keys(message_id_bytes)

        if header_name == "references":
            # split() rather than split(" "): message IDs may be separated by
            # runs of spaces, tabs or folding whitespace. split(" ") would
            # produce empty IDs (wasted lookups) or IDs containing "\r\n".
            for message_id in header["value"].split():
                message_id_bytes = get_message_id_bytes(message_id)
                lookup_key, encryption_key = derive_reply_keys(message_id_bytes)
                try:
                    # FIXME: calling code is likely to duplicate this query
                    _get_reply_record_from_lookup_key(lookup_key)
                except Reply.DoesNotExist:
                    continue
                return lookup_key, encryption_key
            raise Reply.DoesNotExist
    incr_if_enabled("mail_to_replies_without_reply_headers", 1)
    raise ReplyHeadersNotFound
779

780

781
def _get_reply_record_from_lookup_key(lookup_key):
    """
    Fetch the Reply record whose stored lookup matches lookup_key.

    The raw key is converted with b64_lookup_key() before querying.
    Raises Reply.DoesNotExist if no record matches.
    """
    encoded_lookup = b64_lookup_key(lookup_key)
    return Reply.objects.get(lookup=encoded_lookup)
784

785

786
def _strip_localpart_tag(address):
1✔
787
    [localpart, domain] = address.split("@")
1✔
788
    subaddress_parts = localpart.split("+")
1✔
789
    return f"{subaddress_parts[0]}@{domain}"
1✔
790

791

792
# Where the raw email bytes were loaded from: "sns" when the content is
# embedded in the SNS notification, "s3" when it is fetched from an S3 object.
_TransportType = Literal["sns", "s3"]
1✔
793

794

795
def _get_email_bytes(
    message_json: AWS_SNSMessageJSON,
) -> tuple[bytes, _TransportType, float]:
    """
    Load the raw bytes of an incoming email.

    If the SNS message has a "content" key, the email is embedded in the
    notification and is returned UTF-8 encoded. Otherwise the S3 bucket and
    object key are read from the message and the content is fetched from S3.
    Errors from the S3 helper propagate to the caller.

    The size of the content is recorded in the "relayed_email.size" histogram.

    Returns (content, transport, load_time_s):
    * transport is "sns" or "s3"
    * load_time_s is the time spent loading, in seconds, rounded to 3 places
    """
    transport: _TransportType
    with Timer(logger=None) as load_timer:
        if "content" in message_json:
            # email content in sns message
            message_content = message_json["content"].encode("utf-8")
            transport = "sns"
        else:
            # assume email content in S3
            bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
            message_content = get_message_content_from_s3(bucket, object_key)
            transport = "s3"
        histogram_if_enabled("relayed_email.size", len(message_content))
    load_time_s = round(load_timer.last, 3)
    return (message_content, transport, load_time_s)
811

812

813
def _convert_to_forwarded_email(
    incoming_email_bytes: bytes,
    headers: OutgoingHeaders,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[EmailMessage, EmailForwardingIssues, int, bool, bool]:
    """
    Convert an email (as bytes) to a forwarded email.

    The headers are replaced with ``headers`` (see _replace_headers).
    * A plain text part, if any, gets the Relay notice prepended.
    * An HTML part, if any, is wrapped by _convert_html_content.
    * If there is no HTML part but there is text, an HTML alternative is
      generated from the text and wrapped the same way.

    ``now`` is passed through to _convert_html_content, which uses it as the
    timestamp for tracker removal and the tracker report. It defaults to the
    current UTC time.

    Return is a tuple:
    - email - The forwarded email
    - issues - Any detected issues in conversion
    - level_one_trackers_removed (int) - Number of trackers removed
    - has_html - True if the incoming email had an HTML part
    - has_text - True if the incoming email had a plain text part
    """
    email = message_from_bytes(incoming_email_bytes, policy=relay_policy)
    # python/typeshed issue 2418
    # The Python 3.2 default was Message, 3.6 uses policy.message_factory, and
    # policy.default.message_factory is EmailMessage
    assert isinstance(email, EmailMessage)

    # Replace headers in the original email
    header_issues = _replace_headers(email, headers)

    # Find and replace text content
    text_body = email.get_body("plain")
    text_content = None
    has_text = False
    if text_body:
        has_text = True
        assert isinstance(text_body, EmailMessage)
        text_content = text_body.get_content()
        new_text_content = _convert_text_content(text_content, to_address)
        text_body.set_content(new_text_content)

    # Find and replace HTML content
    html_body = email.get_body("html")
    level_one_trackers_removed = 0
    has_html = False
    if html_body:
        has_html = True
        assert isinstance(html_body, EmailMessage)
        html_content = html_body.get_content()
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
            now=now,
        )
        html_body.set_content(new_content, subtype="html")
    elif text_content:
        # Try to use the (original, un-prefixed) text content to generate HTML
        html_content = urlize_and_linebreaks(text_content)
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
            now=now,
        )
        assert isinstance(text_body, EmailMessage)
        try:
            text_body.add_alternative(new_content, subtype="html")
        except TypeError as e:
            # Forward the email without the HTML alternative, and log the MIME
            # structure to help diagnose why it could not be added.
            out = StringIO()
            _structure(email, fp=out)
            info_logger.error(
                "Adding HTML alternate failed",
                extra={"exception": str(e), "structure": out.getvalue()},
            )

    issues: EmailForwardingIssues = {}
    if header_issues:
        issues["headers"] = header_issues
    return (email, issues, level_one_trackers_removed, has_html, has_text)
899

900

901
def _replace_headers(
    email: EmailMessage, headers: OutgoingHeaders
) -> EmailHeaderIssues:
    """
    Replace the headers in email with new headers.

    This replaces headers in the passed email object, rather than returns an altered
    copy. The primary reason is that the Python email package can read an email with
    non-compliant headers or content, but can't write it. A read/write is required to
    create a copy that we then alter. This code instead alters the passed EmailMessage
    object, making header-specific changes in try / except statements.

    The other reason is the object size. An Email can be up to 10 MB, and we hope to
    support 40 MB emails someday. Modern servers may be OK with this, but it would be
    nice to handle the non-compliant headers without crashing before we add a source of
    memory-related crashes.

    Headers are handled as follows:
    * Names in ``headers`` are removed and replaced (case-insensitive match).
    * MIME-Version and Content-* headers are kept.
    * All other headers are dropped.

    Returns a dict with optional "incoming" and "outgoing" keys. Each holds a list
    of (header name, details) tuples for headers that had parse defects or that
    raised on read/write.
    """
    issues: EmailHeaderIssues = defaultdict(list)
    replacements: set[str] = {name.lower() for name in headers.keys()}

    # Detect non-compliant headers in incoming emails.
    # A header name can occur several times (Received, for example), but
    # email[name] only returns the first occurrence. Check every occurrence,
    # once per distinct case-insensitive name, so each defective value is
    # reported once and later occurrences are not skipped.
    checked: set[str] = set()
    for header in email.keys():
        header_lower = header.lower()
        if header_lower in checked:
            continue
        checked.add(header_lower)
        try:
            values = email.get_all(header, [])
        except Exception as e:
            issues["incoming"].append((header, {"exception_on_read": repr(e)}))
            continue
        for value in values:
            if getattr(value, "defects", None):
                issues["incoming"].append(
                    (
                        header,
                        {
                            "defect_count": len(value.defects),
                            "parsed_value": str(value),
                            "unstructured_value": str(value.as_unstructured),
                        },
                    )
                )

    # Collect headers that will not be forwarded
    to_drop = [
        header
        for header in email.keys()
        if header.lower() not in replacements
        and header.lower() != "mime-version"
        and not header.lower().startswith("content-")
    ]

    # Drop headers that should be dropped
    for header in to_drop:
        del email[header]

    # Replace the requested headers
    for header, value in headers.items():
        del email[header]
        try:
            email[header] = value
        except Exception as e:
            issues["outgoing"].append(
                (header, {"exception_on_write": repr(e), "value": value})
            )
            continue
        try:
            parsed_value = email[header]
        except Exception as e:
            issues["outgoing"].append((header, {"exception_on_read": repr(e)}))
            continue
        if getattr(parsed_value, "defects", None):
            issues["outgoing"].append(
                (
                    header,
                    {
                        "defect_count": len(parsed_value.defects),
                        "parsed_value": str(parsed_value),
                        "unstructured_value": str(parsed_value.as_unstructured),
                    },
                )
            )

    return dict(issues)
984

985

986
def _convert_html_content(
    html_content: str,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[str, int]:
    """
    Wrap an email's HTML in the Relay forwarding template.

    Args:
        html_content: The email's HTML.
        to_address: The Relay alias, shown (scrambled) in the wrapper.
        from_address: The sender. It is recorded in the tracker report.
        language, has_premium: Passed to wrap_html_email.
        sample_trackers: If True, count_all_trackers() is run on the HTML.
        remove_level_one_trackers: If True, trackers are removed and a tracker
            report link is built.
        now: Timestamp for tracker removal and the report. Defaults to now (UTC).

    Returns (wrapped_html, removed_count). removed_count is the number of
    level-one trackers removed, or 0 when removal is disabled.
    """
    # frontend expects a timestamp in milliseconds
    now = now or datetime.now(timezone.utc)
    datetime_now_ms = int(now.timestamp() * 1000)

    # Scramble the alias so that clients don't recognize it and apply default
    # link styles. The result contains <span> markup, so it is presumably
    # rendered unescaped by wrap_html_email. Escape the alias first so any
    # markup characters in it are shown as text. This is a no-op for ordinary
    # addresses, and the escaped entities contain none of "@", "." or ":".
    display_email = re.sub(r"([@.:])", r"<span>\1</span>", html.escape(to_address))

    # sample tracker numbers
    if sample_trackers:
        count_all_trackers(html_content)

    tracker_report_link = ""
    removed_count = 0
    if remove_level_one_trackers:
        html_content, tracker_details = remove_trackers(
            html_content, from_address, datetime_now_ms
        )
        removed_count = tracker_details["tracker_removed"]
        tracker_report_details = {
            "sender": from_address,
            "received_at": datetime_now_ms,
            "trackers": tracker_details["level_one"]["trackers"],
        }
        tracker_report_link = f"{settings.SITE_ORIGIN}/tracker-report/#" + json.dumps(
            tracker_report_details
        )

    wrapped_html = wrap_html_email(
        original_html=html_content,
        language=language,
        has_premium=has_premium,
        display_email=display_email,
        tracker_report_link=tracker_report_link,
        num_level_one_email_trackers_removed=removed_count,
    )
    return wrapped_html, removed_count
1033

1034

1035
def _convert_text_content(text_content: str, to_address: str) -> str:
1✔
1036
    relay_header_text = (
1✔
1037
        "This email was sent to your alias "
1038
        f"{to_address}. To stop receiving emails sent to this alias, "
1039
        "update the forwarding settings in your dashboard.\n"
1040
        "---Begin Email---\n"
1041
    )
1042
    wrapped_text = relay_header_text + text_content
1✔
1043
    return wrapped_text
1✔
1044

1045

1046
def _build_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> EmailMessage:
    """
    Build the notice telling a free user that replies need a premium account.

    The notice is addressed to from_address. It is rendered from the
    emails/reply_requires_premium .txt and .html templates as text and HTML
    alternatives. When message_id is set, the notice is threaded to it with
    In-Reply-To and References headers.

    Template context:
    * sender: the original sender's "reply-to" (or "from") from the decrypted
      metadata, or "" when unavailable.
    * forwarded: True when the user has not had a first reply forwarded yet.
      _reply_allowed lets that first reply through, so the notice says so.
    * SITE_ORIGIN: from settings.
    """
    profile = reply_record.address.user.profile
    sender = ""
    if decrypted_metadata is not None:
        sender = (
            decrypted_metadata.get("reply-to") or decrypted_metadata.get("from") or ""
        )
    context = {
        "sender": sender,
        "forwarded": not profile.forwarded_first_reply,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    text_part = render_to_string("emails/reply_requires_premium.txt", context)
    html_part = render_to_string("emails/reply_requires_premium.html", context)

    notice = EmailMessage()
    notice["Subject"] = ftl_bundle.format("replies-not-included-in-free-account-header")
    notice["From"] = get_reply_to_address()
    notice["To"] = from_address
    if message_id:
        for thread_header in ("In-Reply-To", "References"):
            notice[thread_header] = message_id
    notice.set_content(text_part)
    notice.add_alternative(html_part, subtype="html")
    return notice
1077

1078

1079
def _set_forwarded_first_reply(profile):
1✔
1080
    profile.forwarded_first_reply = True
1✔
1081
    profile.save()
1✔
1082

1083

1084
def _send_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> None:
    """
    Send a free user the "replies require a premium account" notice.

    The notice goes from the non-premium reply address to from_address. After
    a successful send, the user's profile is marked as having had its first
    reply forwarded. SES client errors are logged, not raised. The
    "free_user_reply_attempt" counter is incremented either way.
    """
    notice = _build_reply_requires_premium_email(
        from_address, reply_record, message_id, decrypted_metadata
    )
    try:
        ses_send_raw_email(
            source_address=get_reply_to_address(premium=False),
            destination_address=from_address,
            message=notice,
        )
        # _reply_allowed lets a free user's first reply through, so record in
        # the database that it has now been used.
        _set_forwarded_first_reply(reply_record.address.user.profile)
    except ClientError as error:
        logger.error(
            "reply_not_allowed_ses_client_error", extra=error.response["Error"]
        )
    incr_if_enabled("free_user_reply_attempt", 1)
1105

1106

1107
def _reply_allowed(
    from_address, to_address, reply_record, message_id=None, decrypted_metadata=None
):
    """
    Decide whether a reply through Relay may be sent.

    from_address counts as the Relay user if it equals the reply record
    owner's email, either exactly or after removing any "+tag" from both.

    * Relay user replying to an external sender:
      - refused if reply_record.profile is flagged;
      - allowed if the owner has premium;
      - otherwise the "replies require premium" notice is sent, and the reply
        is allowed only if no first reply had been forwarded yet.
    * Anyone else: allowed only if to_address resolves (via _get_address) to an
      address whose owner has premium. An unresolvable address is refused.
      Refusal for a non-premium owner increments "free_user_reply_attempt".
    """
    stripped_sender = _strip_localpart_tag(from_address)
    owner_email = reply_record.address.user.email
    stripped_owner = _strip_localpart_tag(owner_email)

    if from_address != owner_email and stripped_sender != stripped_owner:
        # The From: is not the Relay user, so this must be a reply *TO* a
        # premium Relay user.
        try:
            address = _get_address(to_address)
            if address.user.profile.has_premium:
                return True
        except ObjectDoesNotExist:
            return False
        incr_if_enabled("free_user_reply_attempt", 1)
        return False

    # The Relay user is replying to an external sender.
    if reply_record.profile.is_flagged:
        return False
    if reply_record.owner_has_premium:
        return True

    # Read the flag before sending the notice, because sending marks the first
    # reply as used.
    allow_first_reply = not reply_record.address.user.profile.forwarded_first_reply
    _send_reply_requires_premium_email(
        from_address, reply_record, message_id, decrypted_metadata
    )
    return allow_first_reply
1142

1143

1144
def _handle_reply(
    from_address: str, message_json: AWS_SNSMessageJSON, to_address: str
) -> HttpResponse:
    """
    Handle a reply from a Relay user to an external email.

    from_address and to_address are the sender and recipient of the incoming
    reply; _reply_allowed uses them to decide whether the reply may be sent.
    The outgoing recipient comes from the decrypted Reply metadata
    ("reply-to", falling back to "from").

    Returns (may be incomplete):
    * 200 if the reply was sent
    * 400 if the In-Reply-To and References headers are missing, none of the References
      headers are a reply record, or the SES client raises an error
    * 403 if the Relay user is not allowed to reply
    * 404 if the S3-stored email is not found, or there is no Reply record matching
      the In-Reply-To header
    * 503 if the S3 client returns an error other than not found

    Emits counters:
    * reply_email_header_error, tagged detail:no-header or detail:no-reply-record
    * reply_email, once the reply is allowed

    TODO: Return a more appropriate status object (see _handle_received)
    """
    mail = message_json["mail"]
    try:
        (lookup_key, encryption_key) = _get_keys_from_headers(mail["headers"])
    except ReplyHeadersNotFound:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-header"])
        return HttpResponse("No In-Reply-To header", status=400)
    except Reply.DoesNotExist:
        # A References header was present, but none of its message IDs
        # matched a stored Reply record.
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-reply-record"])
        return HttpResponse("Unknown or stale References header", status=400)

    try:
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
    except Reply.DoesNotExist:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-reply-record"])
        return HttpResponse("Unknown or stale In-Reply-To header", status=404)

    address = reply_record.address
    message_id = _get_message_id_from_headers(mail["headers"])
    decrypted_metadata = json.loads(
        decrypt_reply_metadata(encryption_key, reply_record.encrypted_metadata)
    )
    if not _reply_allowed(
        from_address, to_address, reply_record, message_id, decrypted_metadata
    ):
        # TODO: should we return a 200 OK here?
        return HttpResponse("Relay replies require a premium account", status=403)

    outbound_from_address = address.full_address
    incr_if_enabled("reply_email", 1)
    subject = mail["commonHeaders"].get("subject", "")
    # The external party the Relay user is replying to. This is distinct from
    # the to_address parameter, which is the address the reply arrived at.
    reply_recipient = decrypted_metadata.get("reply-to") or decrypted_metadata.get(
        "from"
    )
    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": outbound_from_address,
        "To": reply_recipient,
        "Reply-To": outbound_from_address,
    }

    try:
        (email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
    except ClientError as e:
        if e.response["Error"].get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
        # we are returning a 500 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    email = message_from_bytes(email_bytes, policy=relay_policy)
    assert isinstance(email, EmailMessage)

    # Convert to a reply email
    # TODO: Issue #1747 - Remove wrapper / prefix in replies
    _replace_headers(email, headers)

    try:
        ses_send_raw_email(
            source_address=outbound_from_address,
            destination_address=reply_recipient,
            message=email,
        )
    except ClientError as e:
        logger.error("ses_client_error", extra=e.response["Error"])
        return HttpResponse("SES client error", status=400)

    reply_record.increment_num_replied()
    profile = address.user.profile
    profile.update_abuse_metric(replied=True)
    return HttpResponse("Sent email to final recipient.", status=200)
1229

1230

1231
def _get_domain_address(local_portion: str, domain_portion: str) -> DomainAddress:
    """
    Find or create the DomainAddress for the parts of an email address.

    If the domain_portion is for a valid subdomain, a new DomainAddress
    will be created and returned. In all successful cases, last_used_at is set
    to the current UTC time and the address is saved.

    If the domain_portion is for an unknown domain, including one with no "."
    at all, ObjectDoesNotExist is raised.

    If the domain_portion is for an unclaimed subdomain, Profile.DoesNotExist is raised.
    """
    # partition() rather than split(".", 1): a domain without a "." yields an
    # empty parent domain. That fails the check below and is reported as an
    # unsupported domain, instead of crashing with ValueError on unpacking.
    address_subdomain, _, address_domain = domain_portion.partition(".")
    if address_domain != get_domains_from_settings()["MOZMAIL_DOMAIN"]:
        incr_if_enabled("email_for_not_supported_domain", 1)
        raise ObjectDoesNotExist("Address does not exist")
    try:
        with transaction.atomic():
            # Lock the profile row for the rest of the transaction, presumably
            # so the find-or-create below is not raced by a concurrent email.
            locked_profile = Profile.objects.select_for_update().get(
                subdomain=address_subdomain
            )
            domain_numerical = get_domain_numerical(address_domain)
            # filter DomainAddress because it may not exist
            # which will throw an error with get()
            domain_address = DomainAddress.objects.filter(
                user=locked_profile.user, address=local_portion, domain=domain_numerical
            ).first()
            if domain_address is None:
                # TODO: Consider flows when a user generating alias on a fly
                # was unable to receive an email due to user no longer being a
                # premium user as seen in exception thrown on make_domain_address
                domain_address = DomainAddress.make_domain_address(
                    locked_profile, local_portion, True
                )
            domain_address.last_used_at = datetime.now(timezone.utc)
            domain_address.save()
            return domain_address
    except Profile.DoesNotExist:
        incr_if_enabled("email_for_dne_subdomain", 1)
        raise
1271

1272

1273
def _get_address(address: str) -> RelayAddress | DomainAddress:
    """
    Find or create the RelayAddress or DomainAddress for an email address.

    Both parts of the address are lower-cased before lookup. An address on a
    domain other than the site's own relay domains is treated as a possible
    user subdomain, and its DomainAddress may be created on the fly.

    On failure, raises exception based on Django's ObjectDoesNotExist:
    * RelayAddress.DoesNotExist - looks like RelayAddress, deleted or does not exist
    * Profile.DoesNotExist - looks like DomainAddress, no subdomain match
    * ObjectDoesNotExist - Unknown domain

    Raises ValueError if address does not contain exactly one "@".
    """
    local_portion, domain_portion = address.split("@")
    local_address = local_portion.lower()
    domain = domain_portion.lower()

    # Any domain that is not one of the site's relay domains may be a user's
    # subdomain.
    if domain not in get_domains_from_settings().values():
        return _get_domain_address(local_address, domain)

    # One of the site's relay domains: look up the RelayAddress.
    try:
        return RelayAddress.objects.get(
            address=local_address, domain=get_domain_numerical(domain)
        )
    except RelayAddress.DoesNotExist as missing:
        # Record why the lookup missed, then re-raise the original error.
        try:
            DeletedAddress.objects.get(
                address_hash=address_hash(local_address, domain=domain)
            )
            incr_if_enabled("email_for_deleted_address", 1)
            # TODO: create a hard bounce receipt rule in SES
        except DeletedAddress.DoesNotExist:
            incr_if_enabled("email_for_unknown_address", 1)
        except DeletedAddress.MultipleObjectsReturned:
            # not sure why this happens on stage but let's handle it
            incr_if_enabled("email_for_deleted_address_multiple", 1)
        raise missing
1316

1317

1318
def _handle_bounce(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES bounce notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#bounce-object

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_bounce" with these tags:
    * bounce_type: 'Permanent', 'Transient', 'Undetermined', 'none' if omitted
    * bounce_subtype: 'Undetermined', 'General', etc., 'none' if omitted
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam', 'hard_bounce', 'soft_bounce'

    Emits an info log "bounce_notification", same data as metric, plus:
    * bounce_action: 'action' from bounced recipient data, or ""
    * bounce_status: 'status' from bounced recipient data, or ""
    * bounce_diagnostic: 'diagnosticCode' from bounced recipient data, or ""
    * bounce_extra: Extra data from bounce_recipient data, if any
    * domain: User's real email address domain, if an address was given

    Emits a legacy log "bounced recipient domain: {domain}", with data from
    bounced recipient data, without the email address.

    message_json is not modified; recipient data is read from a copy.
    """
    # Work on a copy: recipient dicts are consumed with pop() below, and the
    # caller's message_json should keep its fields.
    bounce = deepcopy(message_json.get("bounce", {}))
    bounce_type = bounce.get("bounceType", "none")
    bounce_subtype = bounce.get("bounceSubType", "none")
    bounced_recipients = bounce.get("bouncedRecipients", [])

    now = datetime.now(timezone.utc)
    bounce_data = []
    for recipient in bounced_recipients:
        recipient_address = recipient.pop("emailAddress", None)
        data = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "bounce_action": recipient.pop("action", ""),
            "bounce_status": recipient.pop("status", ""),
            "bounce_diagnostic": recipient.pop("diagnosticCode", ""),
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        # Whatever recipient fields remain are passed through as extra log data
        if recipient:
            data["bounce_extra"] = recipient.copy()
        bounce_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        # partition() rather than split("@")[1]: an unparseable address with
        # no "@" gives an empty domain instead of raising IndexError.
        recipient_domain = recipient_address.partition("@")[2]
        data["domain"] = recipient_domain

        try:
            user = User.objects.get(email=recipient_address)
            profile = user.profile
            data["user_match"] = "found"
        except User.DoesNotExist:
            # TODO: handle bounce for a user who no longer exists
            # add to SES account-wide suppression list?
            data["user_match"] = "missing"
            continue

        action = None
        if "spam" in data["bounce_diagnostic"].lower():
            # if an email bounced as spam, set to auto block spam for this user
            # and DON'T set them into bounce pause state
            action = "auto_block_spam"
            profile.auto_block_spam = True
        elif bounce_type == "Permanent":
            # TODO: handle sub-types: 'General', 'NoEmail', etc.
            action = "hard_bounce"
            profile.last_hard_bounce = now
        elif bounce_type == "Transient":
            # TODO: handle sub-types: 'MessageTooLarge', 'AttachmentRejected', etc.
            action = "soft_bounce"
            profile.last_soft_bounce = now
        if action:
            data["relay_action"] = action
            profile.save()

    if not bounce_data:
        # Data when there are no identified recipients
        bounce_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    for data in bounce_data:
        tags = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_bounce",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("bounce_notification", extra=data)

        # Legacy log, can be removed Q4 2023
        recipient_domain = data.get("domain")
        if recipient_domain:
            legacy_extra = {
                "action": data.get("bounce_action"),
                "status": data.get("bounce_status"),
                "diagnosticCode": data.get("bounce_diagnostic"),
            }
            legacy_extra.update(data.get("bounce_extra", {}))
            info_logger.info(
                f"bounced recipient domain: {recipient_domain}", extra=legacy_extra
            )

    if any(data["user_match"] == "missing" for data in bounce_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)
1436

1437

1438
def _handle_complaint(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES complaint notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object

    For every complained recipient whose address matches a User, that user's
    profile is updated with auto_block_spam = True.

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_complaint" with these tags:
    * complaint_subtype: 'onaccountsuppressionlist', or 'none' if omitted
    * complaint_feedback - feedback enumeration from ISP or 'none'
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam'

    Emits an info log "complaint_notification", same data as metric (but with
    the raw subtype / feedback values, which are None when omitted), plus:
    * complaint_user_agent - identifies the client used to file the complaint
    * complaint_extra - Extra data from complainedRecipients data, if any
    * domain - User's domain, if a parseable address was given

    Emits a legacy log "complaint_received", with data:
    * recipient_domains: sorted list of extracted user domains
    * subtype: the raw complaintSubType ('OnAccountSuppressionList'), or None
    * feedback: feedback from ISP, or None
    """
    # Deep copy so that popping keys below (including from the nested
    # recipient dicts) does not mutate the caller's message_json.
    complaint = deepcopy(message_json.get("complaint", {}))
    # Tolerate an explicit JSON null as well as a missing key.
    complained_recipients = complaint.pop("complainedRecipients", None) or []
    subtype = complaint.pop("complaintSubType", None)
    user_agent = complaint.pop("userAgent", None)
    feedback = complaint.pop("complaintFeedbackType", None)

    complaint_data = []
    for recipient in complained_recipients:
        recipient_address = recipient.pop("emailAddress", None)
        data = {
            "complaint_subtype": subtype,
            "complaint_user_agent": user_agent,
            "complaint_feedback": feedback,
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        if recipient:
            # Whatever keys remain after removing emailAddress are logged.
            data["complaint_extra"] = recipient.copy()
        # Appended now; later mutations of `data` are still reflected.
        complaint_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        # parseaddr yields "" or a bare token for malformed input. Without an
        # "@" there is no domain or user to match, so keep "no_address"
        # instead of raising IndexError. Split on the LAST "@", since a
        # quoted local part may itself contain "@".
        _, at_sign, recipient_domain = recipient_address.rpartition("@")
        if not at_sign:
            continue
        data["domain"] = recipient_domain

        try:
            user = User.objects.get(email=recipient_address)
            profile = user.profile
            data["user_match"] = "found"
        except User.DoesNotExist:
            data["user_match"] = "missing"
            continue

        data["relay_action"] = "auto_block_spam"
        profile.auto_block_spam = True
        profile.save()

    if not complaint_data:
        # Data when there are no identified recipients
        complaint_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    for data in complaint_data:
        tags = {
            "complaint_subtype": subtype or "none",
            "complaint_feedback": feedback or "none",
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_complaint",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("complaint_notification", extra=data)

    # Legacy log, can be removed Q4 2023
    domains = [data["domain"] for data in complaint_data if "domain" in data]
    info_logger.info(
        "complaint_received",
        extra={
            "recipient_domains": sorted(domains),
            "subtype": subtype,
            "feedback": feedback,
        },
    )

    if any(data["user_match"] == "missing" for data in complaint_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc