• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / f6cbc6bd-9e3e-4239-9493-f713ff70bac9

22 Dec 2023 09:15PM CUT coverage: 73.652% (+0.1%) from 73.532%
f6cbc6bd-9e3e-4239-9493-f713ff70bac9

push

circleci

rafeerahman
Detection for OTP within emails and an api for retrieval

1989 of 2943 branches covered (0.0%)

Branch coverage included in aggregate %.

68 of 78 new or added lines in 2 files covered. (87.18%)

1 existing line in 1 file now uncovered.

6341 of 8367 relevant lines covered (75.79%)

19.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.02
/emails/views.py
1
from bs4 import BeautifulSoup
1✔
2
from collections import defaultdict
1✔
3
from copy import deepcopy
1✔
4
from datetime import datetime, timezone
1✔
5
from email import message_from_bytes
1✔
6
from email.iterators import _structure
1✔
7
from email.message import EmailMessage
1✔
8
from email.utils import parseaddr
1✔
9
import html
1✔
10
from io import StringIO
1✔
11
import json
1✔
12
from json import JSONDecodeError
1✔
13
import logging
1✔
14
import re
1✔
15
import shlex
1✔
16
from textwrap import dedent
1✔
17
from typing import Any, Literal, Optional
1✔
18
from urllib.parse import urlencode
1✔
19

20
from botocore.exceptions import ClientError
1✔
21
from codetiming import Timer
1✔
22
from decouple import strtobool
1✔
23
from django.shortcuts import render
1✔
24
from sentry_sdk import capture_message
1✔
25
from markus.utils import generate_tag
1✔
26
from waffle import sample_is_active
1✔
27

28
from django.conf import settings
1✔
29
from django.contrib.auth.models import User
1✔
30
from django.core.exceptions import ObjectDoesNotExist
1✔
31
from django.core.cache import cache
1✔
32
from django.db import transaction
1✔
33
from django.db.models import prefetch_related_objects
1✔
34
from django.http import HttpRequest, HttpResponse
1✔
35
from django.template.loader import render_to_string
1✔
36
from django.utils.html import escape
1✔
37
from django.views.decorators.csrf import csrf_exempt
1✔
38

39
from privaterelay.utils import get_subplat_upgrade_link_by_language
1✔
40

41

42
from .models import (
1✔
43
    CannotMakeAddressException,
44
    DeletedAddress,
45
    DomainAddress,
46
    Profile,
47
    RelayAddress,
48
    Reply,
49
    address_hash,
50
    get_domain_numerical,
51
    get_domains_from_settings,
52
)
53
from .policy import relay_policy
1✔
54
from .types import (
1✔
55
    AWS_SNSMessageJSON,
56
    OutgoingHeaders,
57
    EmailForwardingIssues,
58
    EmailHeaderIssues,
59
)
60
from .utils import (
1✔
61
    _get_bucket_and_key_from_s3_json,
62
    _store_reply_record,
63
    b64_lookup_key,
64
    count_all_trackers,
65
    decrypt_reply_metadata,
66
    derive_reply_keys,
67
    generate_from_header,
68
    get_message_content_from_s3,
69
    get_message_id_bytes,
70
    get_reply_to_address,
71
    histogram_if_enabled,
72
    incr_if_enabled,
73
    remove_message_from_s3,
74
    remove_trackers,
75
    ses_send_raw_email,
76
    urlize_and_linebreaks,
77
    InvalidFromHeader,
78
    parse_email_header,
79
)
80
from .sns import verify_from_sns, SUPPORTED_SNS_TYPES
1✔
81

82
from privaterelay.ftl_bundles import main as ftl_bundle
1✔
83
from privaterelay.utils import flag_is_active_in_task
1✔
84

85
logger = logging.getLogger("events")
1✔
86
info_logger = logging.getLogger("eventsinfo")
1✔
87

88

89
class ReplyHeadersNotFound(Exception):
1✔
90
    def __init__(self, message="No In-Reply-To or References headers."):
1✔
91
        self.message = message
1✔
92

93

94
def first_time_user_test(request):
1✔
95
    """
96
    Demonstrate rendering of the "First time Relay user" email.
97
    Settings like language can be given in the querystring, otherwise settings
98
    come from a random free profile.
99
    """
100
    in_bundle_country = strtobool(request.GET.get("in_bundle_country", "yes"))
×
101
    email_context = {
×
102
        "in_bundle_country": in_bundle_country,
103
        "SITE_ORIGIN": settings.SITE_ORIGIN,
104
    }
105
    if request.GET.get("format", "html") == "text":
×
106
        return render(
×
107
            request,
108
            "emails/first_time_user.txt",
109
            email_context,
110
            "text/plain; charset=utf-8",
111
        )
112
    return render(request, "emails/first_time_user.html", email_context)
×
113

114

115
def reply_requires_premium_test(request):
1✔
116
    """
117
    Demonstrate rendering of the "Reply requires premium" email.
118

119
    Settings like language can be given in the querystring, otherwise settings
120
    come from a random free profile.
121
    """
122
    email_context = {
1✔
123
        "sender": "test@example.com",
124
        "forwarded": True,
125
        "SITE_ORIGIN": settings.SITE_ORIGIN,
126
    }
127
    for param in request.GET:
1✔
128
        email_context[param] = request.GET.get(param)
1✔
129
        if param == "forwarded" and request.GET[param] == "True":
1✔
130
            email_context[param] = True
1✔
131

132
    for param in request.GET:
1✔
133
        if param == "content-type" and request.GET[param] == "text/plain":
1✔
134
            return render(
1✔
135
                request,
136
                "emails/reply_requires_premium.txt",
137
                email_context,
138
                "text/plain; charset=utf-8",
139
            )
140
    return render(request, "emails/reply_requires_premium.html", email_context)
1✔
141

142

143
def first_forwarded_email_test(request: HttpRequest) -> HttpResponse:
1✔
144
    # TO DO: Update with correct context when trigger is created
145
    first_forwarded_email_html = render_to_string(
×
146
        "emails/first_forwarded_email.html",
147
        {
148
            "SITE_ORIGIN": settings.SITE_ORIGIN,
149
        },
150
    )
151

152
    wrapped_email = wrap_html_email(
×
153
        first_forwarded_email_html,
154
        "en-us",
155
        True,
156
        "test@example.com",
157
        0,
158
    )
159

160
    return HttpResponse(wrapped_email)
×
161

162

163
def wrap_html_email(
1✔
164
    original_html: str,
165
    language: str,
166
    has_premium: bool,
167
    display_email: str,
168
    num_level_one_email_trackers_removed: int | None = None,
169
    tracker_report_link: str | None = None,
170
) -> str:
171
    """Add Relay banners, surveys, etc. to an HTML email"""
172
    subplat_upgrade_link = get_subplat_upgrade_link_by_language(language)
1✔
173
    email_context = {
1✔
174
        "original_html": original_html,
175
        "language": language,
176
        "has_premium": has_premium,
177
        "subplat_upgrade_link": subplat_upgrade_link,
178
        "display_email": display_email,
179
        "tracker_report_link": tracker_report_link,
180
        "num_level_one_email_trackers_removed": num_level_one_email_trackers_removed,
181
        "SITE_ORIGIN": settings.SITE_ORIGIN,
182
    }
183
    content = render_to_string("emails/wrapped_email.html", email_context)
1✔
184
    # Remove empty lines
185
    content_lines = [line for line in content.splitlines() if line.strip()]
1✔
186
    return "\n".join(content_lines) + "\n"
1✔
187

188

189
def wrapped_email_test(request: HttpRequest) -> HttpResponse:
1✔
190
    """
191
    Demonstrate rendering of forwarded HTML emails.
192

193
    Settings like language can be given in the querystring, otherwise settings
194
    come from a randomly chosen profile.
195
    """
196

197
    if all(key in request.GET for key in ("language", "has_premium")):
1✔
198
        user_profile = None
1✔
199
    else:
200
        user_profile = Profile.objects.order_by("?").first()
1✔
201

202
    if "language" in request.GET:
1✔
203
        language = request.GET["language"]
1✔
204
    else:
205
        assert user_profile is not None
1✔
206
        language = user_profile.language
1✔
207

208
    if "has_premium" in request.GET:
1✔
209
        has_premium = strtobool(request.GET["has_premium"])
1✔
210
    else:
211
        assert user_profile is not None
1✔
212
        has_premium = user_profile.has_premium
1✔
213

214
    if "num_level_one_email_trackers_removed" in request.GET:
1✔
215
        num_level_one_email_trackers_removed = int(
1✔
216
            request.GET["num_level_one_email_trackers_removed"]
217
        )
218
    else:
219
        num_level_one_email_trackers_removed = 0
1✔
220

221
    if "has_tracker_report_link" in request.GET:
1✔
222
        has_tracker_report_link = strtobool(request.GET["has_tracker_report_link"])
1✔
223
    else:
224
        has_tracker_report_link = False
1✔
225
    if has_tracker_report_link:
1✔
226
        if num_level_one_email_trackers_removed:
1✔
227
            trackers = {
1✔
228
                "fake-tracker.example.com": num_level_one_email_trackers_removed
229
            }
230
        else:
231
            trackers = {}
1✔
232
        tracker_report_link = (
1✔
233
            "/tracker-report/#{"
234
            '"sender": "sender@example.com", '
235
            '"received_at": 1658434657, '
236
            f'"trackers": { json.dumps(trackers) }'
237
            "}"
238
        )
239
    else:
240
        tracker_report_link = ""
1✔
241

242
    path = "/emails/wrapped_email_test"
1✔
243
    old_query = {
1✔
244
        "language": language,
245
        "has_premium": "Yes" if has_premium else "No",
246
        "has_tracker_report_link": "Yes" if has_tracker_report_link else "No",
247
        "num_level_one_email_trackers_removed": str(
248
            num_level_one_email_trackers_removed
249
        ),
250
    }
251

252
    def switch_link(key, value):
1✔
253
        if old_query[key] == value:
1✔
254
            return str(value)
1✔
255
        new_query = old_query.copy()
1✔
256
        new_query[key] = value
1✔
257
        return f'<a href="{path}?{urlencode(new_query)}">{value}</a>'
1✔
258

259
    html_content = dedent(
1✔
260
        f"""\
261
    <p>
262
      <strong>Email rendering Test</strong>
263
    </p>
264
    <p>Settings: (<a href="{path}">clear all</a>)</p>
265
    <ul>
266
      <li>
267
        <strong>language</strong>:
268
        {escape(language)}
269
        (switch to
270
        {switch_link("language", "en-us")},
271
        {switch_link("language", "de")},
272
        {switch_link("language", "en-gb")},
273
        {switch_link("language", "fr")},
274
        {switch_link("language", "ru-ru")},
275
        {switch_link("language", "es-es")},
276
        {switch_link("language", "pt-br")},
277
        {switch_link("language", "it-it")},
278
        {switch_link("language", "en-ca")},
279
        {switch_link("language", "de-de")},
280
        {switch_link("language", "es-mx")})
281
      </li>
282
      <li>
283
        <strong>has_premium</strong>:
284
        {"Yes" if has_premium else "No"}
285
        (switch to
286
        {switch_link("has_premium", "Yes")},
287
        {switch_link("has_premium", "No")})
288
      </li>
289
      <li>
290
        <strong>has_tracker_report_link</strong>:
291
        {"Yes" if has_tracker_report_link else "No"}
292
        (switch to
293
        {switch_link("has_tracker_report_link", "Yes")},
294
        {switch_link("has_tracker_report_link", "No")})
295
      </li>
296
      <li>
297
        <strong>num_level_one_email_trackers_removed</strong>:
298
        {num_level_one_email_trackers_removed}
299
        (switch to
300
        {switch_link("num_level_one_email_trackers_removed", "0")},
301
        {switch_link("num_level_one_email_trackers_removed", "1")},
302
        {switch_link("num_level_one_email_trackers_removed", "2")})
303
      </li>
304
    </ul>
305
    """
306
    )
307

308
    wrapped_email = wrap_html_email(
1✔
309
        original_html=html_content,
310
        language=language,
311
        has_premium=has_premium,
312
        tracker_report_link=tracker_report_link,
313
        display_email="test@relay.firefox.com",
314
        num_level_one_email_trackers_removed=num_level_one_email_trackers_removed,
315
    )
316
    return HttpResponse(wrapped_email)
1✔
317

318

319
@csrf_exempt
1✔
320
def sns_inbound(request):
1✔
321
    incr_if_enabled("sns_inbound", 1)
1✔
322
    # First thing we do is verify the signature
323
    json_body = json.loads(request.body)
1✔
324
    verified_json_body = verify_from_sns(json_body)
1✔
325

326
    # Validate ARN and message type
327
    topic_arn = verified_json_body.get("TopicArn", None)
1✔
328
    message_type = verified_json_body.get("Type", None)
1✔
329
    error_details = validate_sns_arn_and_type(topic_arn, message_type)
1✔
330
    if error_details:
1✔
331
        logger.error("validate_sns_arn_and_type_error", extra=error_details)
1✔
332
        return HttpResponse(error_details["error"], status=400)
1✔
333

334
    return _sns_inbound_logic(topic_arn, message_type, verified_json_body)
1✔
335

336

337
def validate_sns_arn_and_type(
1✔
338
    topic_arn: str, message_type: str
339
) -> Optional[dict[str, Any]]:
340
    """
341
    Validate Topic ARN and SNS Message Type.
342

343
    If an error is detected, the return is a dictionary of error details.
344
    If no error is detected, the return is None.
345
    """
346
    if not topic_arn:
1✔
347
        error = "Received SNS request without Topic ARN."
1✔
348
    elif topic_arn not in settings.AWS_SNS_TOPIC:
1✔
349
        error = "Received SNS message for wrong topic."
1✔
350
    elif not message_type:
1✔
351
        error = "Received SNS request without Message Type."
1✔
352
    elif message_type not in SUPPORTED_SNS_TYPES:
1✔
353
        error = "Received SNS message for unsupported Type."
1✔
354
    else:
355
        error = None
1✔
356

357
    if error:
1✔
358
        return {
1✔
359
            "error": error,
360
            "received_topic_arn": shlex.quote(topic_arn),
361
            "supported_topic_arn": sorted(settings.AWS_SNS_TOPIC),
362
            "received_sns_type": shlex.quote(message_type),
363
            "supported_sns_types": SUPPORTED_SNS_TYPES,
364
        }
365
    return None
1✔
366

367

368
def _sns_inbound_logic(topic_arn, message_type, json_body):
1✔
369
    if message_type == "SubscriptionConfirmation":
×
370
        info_logger.info(
×
371
            "SNS SubscriptionConfirmation",
372
            extra={"SubscribeURL": json_body["SubscribeURL"]},
373
        )
374
        return HttpResponse("Logged SubscribeURL", status=200)
×
375
    if message_type == "Notification":
×
376
        incr_if_enabled("sns_inbound_Notification", 1)
×
377
        return _sns_notification(json_body)
×
378

379
    logger.error(
×
380
        "SNS message type did not fall under the SNS inbound logic",
381
        extra={"message_type": shlex.quote(message_type)},
382
    )
383
    capture_message(
×
384
        "Received SNS message with type not handled in inbound log",
385
        level="error",
386
        stack=True,
387
    )
388
    return HttpResponse(
×
389
        "Received SNS message with type not handled in inbound log", status=400
390
    )
391

392

393
def _sns_notification(json_body):
1✔
394
    try:
1✔
395
        message_json = json.loads(json_body["Message"])
1✔
396
    except JSONDecodeError:
1✔
397
        logger.error(
1✔
398
            "SNS notification has non-JSON message body",
399
            extra={"content": shlex.quote(json_body["Message"])},
400
        )
401
        return HttpResponse("Received SNS notification with non-JSON body", status=400)
1✔
402

403
    event_type = message_json.get("eventType")
1✔
404
    notification_type = message_json.get("notificationType")
1✔
405
    if notification_type not in {
1✔
406
        "Complaint",
407
        "Received",
408
        "Bounce",
409
    } and event_type not in {"Complaint", "Bounce"}:
410
        logger.error(
1✔
411
            "SNS notification for unsupported type",
412
            extra={
413
                "notification_type": shlex.quote(notification_type),
414
                "event_type": shlex.quote(event_type),
415
                "keys": [shlex.quote(key) for key in message_json.keys()],
416
            },
417
        )
418
        return HttpResponse(
1✔
419
            "Received SNS notification for unsupported Type: %s"
420
            % html.escape(shlex.quote(notification_type)),
421
            status=400,
422
        )
423
    response = _sns_message(message_json)
1✔
424
    bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
1✔
425
    if response.status_code < 500:
1✔
426
        remove_message_from_s3(bucket, object_key)
1✔
427

428
    return response
1✔
429

430

431
def _get_recipient_with_relay_domain(recipients):
1✔
432
    domains_to_check = get_domains_from_settings().values()
1✔
433
    for recipient in recipients:
1✔
434
        for domain in domains_to_check:
1✔
435
            if domain in recipient:
1✔
436
                return recipient
1✔
437
    return None
1✔
438

439

440
def _get_relay_recipient_from_message_json(message_json):
1✔
441
    # Go thru all To, Cc, and Bcc fields and
442
    # return the one that has a Relay domain
443

444
    # First check commmon headers for to or cc match
445
    headers_to_check = "to", "cc"
1✔
446
    common_headers = message_json["mail"]["commonHeaders"]
1✔
447
    for header in headers_to_check:
1✔
448
        if header in common_headers:
1✔
449
            recipient = _get_recipient_with_relay_domain(common_headers[header])
1✔
450
            if recipient is not None:
1✔
451
                return parseaddr(recipient)[1]
1✔
452

453
    # SES-SNS sends bcc in a different part of the message
454
    recipients = message_json["receipt"]["recipients"]
1✔
455
    return _get_recipient_with_relay_domain(recipients)
1✔
456

457

458
def _sns_message(message_json: AWS_SNSMessageJSON) -> HttpResponse:
1✔
459
    incr_if_enabled("sns_inbound_Notification_Received", 1)
1✔
460
    notification_type = message_json.get("notificationType")
1✔
461
    event_type = message_json.get("eventType")
1✔
462
    if notification_type == "Bounce" or event_type == "Bounce":
1✔
463
        return _handle_bounce(message_json)
1✔
464
    if notification_type == "Complaint" or event_type == "Complaint":
1✔
465
        return _handle_complaint(message_json)
1✔
466
    assert notification_type == "Received" and event_type is None
1✔
467
    return _handle_received(message_json)
1✔
468

469

470
def _handle_received(message_json: AWS_SNSMessageJSON) -> HttpResponse:
1✔
471
    """
472
    Handle an AWS SES received notification.
473

474
    For more information, see:
475
    https://docs.aws.amazon.com/ses/latest/dg/receiving-email-notifications-contents.html
476
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html
477

478
    Returns (may be incomplete):
479
    * 200 if the email was sent, the Relay address is disabled, the Relay user is
480
      flagged for abuse, the email is under a bounce pause, the email was suppressed
481
      for spam, the list email was blocked, or the noreply address was the recipient.
482
    * 400 if commonHeaders entry is missing, the Relay recipient address is malformed,
483
      the email failed DMARC with reject policy, or the email is a reply chain to a
484
      non-premium user.
485
    * 404 if an S3-stored email was not found, no Relay address was found in the "To",
486
      "CC", or "BCC" fields, or the Relay address is not in the database.
487
    * 503 if the "From" address is malformed, the S3 client returned an error different
488
      from "not found", or the SES client fails
489

490
    And many other returns conditions if the email is a reply. The HTTP returns are an
491
    artifact from an earlier time when emails were sent to a webhook. Currently,
492
    production instead pulls events from a queue.
493

494
    TODO: Return a more appropriate status object
495
    TODO: Document the metrics emitted
496
    """
497
    mail = message_json["mail"]
1✔
498
    if "commonHeaders" not in mail:
1✔
499
        logger.error("SNS message without commonHeaders")
1✔
500
        return HttpResponse(
1✔
501
            "Received SNS notification without commonHeaders.", status=400
502
        )
503
    common_headers = mail["commonHeaders"]
1✔
504
    receipt = message_json["receipt"]
1✔
505

506
    _record_receipt_verdicts(receipt, "all")
1✔
507
    to_address = _get_relay_recipient_from_message_json(message_json)
1✔
508
    if to_address is None:
1✔
509
        incr_if_enabled("no_relay_domain_in_recipient_fields", 1)
1✔
510
        return HttpResponse("Address does not exist", status=404)
1✔
511

512
    _record_receipt_verdicts(receipt, "relay_recipient")
1✔
513
    from_addresses = parse_email_header(common_headers["from"][0])
1✔
514
    if not from_addresses:
1✔
515
        info_logger.error(
1✔
516
            "_handle_received: no from address",
517
            extra={
518
                "source": mail["source"],
519
                "common_headers_from": common_headers["from"],
520
            },
521
        )
522
        return HttpResponse("Unable to parse From address", status=400)
1✔
523
    from_address = from_addresses[0][1]
1✔
524

525
    try:
1✔
526
        [to_local_portion, to_domain_portion] = to_address.split("@")
1✔
527
    except ValueError:
1✔
528
        # TODO: Add metric
529
        return HttpResponse("Malformed to field.", status=400)
1✔
530

531
    if to_local_portion.lower() == "noreply":
1✔
532
        incr_if_enabled("email_for_noreply_address", 1)
1✔
533
        return HttpResponse("noreply address is not supported.")
1✔
534
    try:
1✔
535
        # FIXME: this ambiguous return of either
536
        # RelayAddress or DomainAddress types makes the Rustacean in me throw
537
        # up a bit.
538
        address = _get_address(to_address)
1✔
539
        prefetch_related_objects([address.user], "socialaccount_set", "profile")
1✔
540
        user_profile = address.user.profile
1✔
541
    except (
1✔
542
        ObjectDoesNotExist,
543
        CannotMakeAddressException,
544
        DeletedAddress.MultipleObjectsReturned,
545
    ):
546
        if to_local_portion.lower() == "replies":
1✔
547
            response = _handle_reply(from_address, message_json, to_address)
1✔
548
        else:
549
            response = HttpResponse("Address does not exist", status=404)
1✔
550
        return response
1✔
551

552
    _record_receipt_verdicts(receipt, "valid_user")
1✔
553
    # if this is spam and the user is set to auto-block spam, early return
554
    if user_profile.auto_block_spam and _get_verdict(receipt, "spam") == "FAIL":
1✔
555
        incr_if_enabled("email_auto_suppressed_for_spam", 1)
1✔
556
        return HttpResponse("Address rejects spam.")
1✔
557

558
    if _get_verdict(receipt, "dmarc") == "FAIL":
1✔
559
        policy = receipt.get("dmarcPolicy", "none")
1✔
560
        # TODO: determine action on dmarcPolicy "quarantine"
561
        if policy == "reject":
1!
562
            incr_if_enabled(
1✔
563
                "email_suppressed_for_dmarc_failure",
564
                1,
565
                tags=["dmarcPolicy:reject", "dmarcVerdict:FAIL"],
566
            )
567
            return HttpResponse("DMARC failure, policy is reject", status=400)
1✔
568

569
    # if this user is over bounce limits, early return
570
    bounce_paused, bounce_type = user_profile.check_bounce_pause()
1✔
571
    if bounce_paused:
1✔
572
        _record_receipt_verdicts(receipt, "user_bounce_paused")
1✔
573
        incr_if_enabled("email_suppressed_for_%s_bounce" % bounce_type, 1)
1✔
574
        return HttpResponse("Address is temporarily disabled.")
1✔
575

576
    # check if this is a reply from an external sender to a Relay user
577
    try:
1✔
578
        (lookup_key, _) = _get_keys_from_headers(mail["headers"])
1✔
579
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
1✔
580
        address = reply_record.address
1✔
581
        message_id = _get_message_id_from_headers(mail["headers"])
1✔
582
        # make sure the relay user is premium
583
        if not _reply_allowed(from_address, to_address, reply_record, message_id):
1!
584
            # TODO: Add metrics
585
            return HttpResponse("Relay replies require a premium account", status=403)
1✔
586
    except (ReplyHeadersNotFound, Reply.DoesNotExist):
1✔
587
        # if there's no In-Reply-To header, or the In-Reply-To value doesn't
588
        # match a Reply record, continue to treat this as a regular email from
589
        # an external sender to a relay user
590
        pass
1✔
591

592
    # if account flagged for abuse, early return
593
    if user_profile.is_flagged:
1!
594
        return HttpResponse("Address is temporarily disabled.")
×
595

596
    # if address is set to block, early return
597
    if not address.enabled:
1✔
598
        incr_if_enabled("email_for_disabled_address", 1)
1✔
599
        address.num_blocked += 1
1✔
600
        address.save(update_fields=["num_blocked"])
1✔
601
        _record_receipt_verdicts(receipt, "disabled_alias")
1✔
602
        # TODO: Add metrics
603
        return HttpResponse("Address is temporarily disabled.")
1✔
604

605
    _record_receipt_verdicts(receipt, "active_alias")
1✔
606
    incr_if_enabled("email_for_active_address", 1)
1✔
607

608
    # if address is blocking list emails, and email is from list, early return
609
    if (
1✔
610
        address
611
        and address.block_list_emails
612
        and user_profile.has_premium
613
        and _check_email_from_list(mail["headers"])
614
    ):
615
        incr_if_enabled("list_email_for_address_blocking_lists", 1)
1✔
616
        address.num_blocked += 1
1✔
617
        address.save(update_fields=["num_blocked"])
1✔
618
        return HttpResponse("Address is not accepting list emails.")
1✔
619

620
    # Collect new headers
621
    subject = common_headers.get("subject", "")
1✔
622
    destination_address = user_profile.user.email
1✔
623
    reply_address = get_reply_to_address()
1✔
624
    try:
1✔
625
        from_header = generate_from_header(from_address, to_address)
1✔
626
    except InvalidFromHeader:
1✔
627
        # TODO: MPP-3407, MPP-3417 - Determine how to handle these
628
        header_from = []
1✔
629
        for header in mail["headers"]:
1✔
630
            if header["name"].lower() == "from":
1✔
631
                header_from.append(header)
1✔
632
        info_logger.error(
1✔
633
            "generate_from_header",
634
            extra={
635
                "from_address": from_address,
636
                "source": mail["source"],
637
                "common_headers_from": common_headers["from"],
638
                "headers_from": header_from,
639
            },
640
        )
641
        return HttpResponse("Cannot parse the From address", status=503)
1✔
642

643
    headers: OutgoingHeaders = {
1✔
644
        "Subject": subject,
645
        "From": from_header,
646
        "To": destination_address,
647
        "Reply-To": reply_address,
648
        "Resent-From": from_address,
649
    }
650

651
    # Get incoming email
652
    try:
1✔
653
        (incoming_email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
1✔
654
    except ClientError as e:
1✔
655
        if e.response["Error"].get("Code", "") == "NoSuchKey":
1!
656
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
×
657
            return HttpResponse("Email not in S3", status=404)
×
658
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
1✔
659
        # we are returning a 503 so that SNS can retry the email processing
660
        return HttpResponse("Cannot fetch the message content from S3", status=503)
1✔
661

662
    # Convert to new email
663
    sample_trackers = bool(sample_is_active("tracker_sample"))
1✔
664
    tracker_removal_flag = flag_is_active_in_task("tracker_removal", address.user)
1✔
665
    remove_level_one_trackers = bool(
1✔
666
        tracker_removal_flag and user_profile.remove_level_one_email_trackers
667
    )
668
    (
1✔
669
        forwarded_email,
670
        issues,
671
        level_one_trackers_removed,
672
        has_html,
673
        has_text,
674
    ) = _convert_to_forwarded_email(
675
        incoming_email_bytes=incoming_email_bytes,
676
        headers=headers,
677
        to_address=to_address,
678
        from_address=from_address,
679
        language=user_profile.language,
680
        has_premium=user_profile.has_premium,
681
        sample_trackers=sample_trackers,
682
        remove_level_one_trackers=remove_level_one_trackers,
683
    )
684

685
    if has_html:
1✔
686
        incr_if_enabled("email_with_html_content", 1)
1✔
687
    if has_text:
1✔
688
        incr_if_enabled("email_with_text_content", 1)
1✔
689
    if issues:
1✔
690
        info_logger.warning(
1✔
691
            "_handle_received: forwarding issues", extra={"issues": issues}
692
        )
693

694
    # Send new email
695
    try:
1✔
696
        ses_response = ses_send_raw_email(
1✔
697
            source_address=reply_address,
698
            destination_address=destination_address,
699
            message=forwarded_email,
700
        )
701
    except ClientError:
1✔
702
        # 503 service unavailable reponse to SNS so it can retry
703
        return HttpResponse("SES client error on Raw Email", status=503)
1✔
704

705
    message_id = ses_response["MessageId"]
1✔
706
    _store_reply_record(mail, message_id, address)
1✔
707

708
    user_profile.update_abuse_metric(
1✔
709
        email_forwarded=True, forwarded_email_size=len(incoming_email_bytes)
710
    )
711
    address.num_forwarded += 1
1✔
712
    address.last_used_at = datetime.now(timezone.utc)
1✔
713
    if level_one_trackers_removed:
1!
714
        address.num_level_one_trackers_blocked = (
×
715
            address.num_level_one_trackers_blocked or 0
716
        ) + level_one_trackers_removed
717
    address.save(
1✔
718
        update_fields=[
719
            "num_forwarded",
720
            "last_used_at",
721
            "block_list_emails",
722
            "num_level_one_trackers_blocked",
723
        ]
724
    )
725

726
    # TODO?: Place the following code at the beginning, so the notification of the OTP can come
727
    # earlier then when the user receives the email. For now though, I will leave it here
728
    # because we want to atleast send the email, incase this new feature causes an unhandled error.
729

730
    # Only used within the add-on if the user has otp notifications toggled.
731
    otp_code = _naive_check_and_get_otp_code(
1✔
732
        incoming_email_bytes=incoming_email_bytes, headers=headers
733
    )
734
    if otp_code:
1✔
735
        # We set a timeout here so that when users open the add-on after using their masks
736
        # in a different context, they won't be notified of an old potential OTP,
737
        # since it will have been cleared.
738
        cache.set(
1✔
739
            key=f"{user_profile.user.id}_otp_code",
740
            value={"otp_code": otp_code, "mask": to_address},
741
            timeout=120,
742
        )
743

744
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
745

746

747
def _naive_check_and_get_otp_code(
1✔
748
    incoming_email_bytes: bytes, headers: OutgoingHeaders
749
) -> str:
750
    # Some of these keywords could potentially too naive, instead we could choose to focus on word combinations instead to mitigate false positives
751
    otp_keywords = set(
1✔
752
        {
753
            "authorization",
754
            "authorize",
755
            "security",
756
            "secure",
757
            "authenticate",
758
            "authentication",
759
            "verify",
760
            "confirmation",
761
            "verification",
762
            "recover",
763
            "recovery",
764
            "otp",
765
            "passcode",
766
            "password",
767
            "pin",
768
        }
769
    )
770
    otp_multi_keywords = set(
1✔
771
        {
772
            "secure password",
773
            "secure code",
774
            "one time",
775
            "one time code",
776
            "one time passcode",
777
            "confirmation code",
778
            "authentication code",
779
            "verification code",
780
            "one time password",
781
        }
782
    )
783

784
    email = message_from_bytes(incoming_email_bytes, policy=relay_policy)
1✔
785
    assert isinstance(email, EmailMessage)
1✔
786
    subject_text_cleaned = _clean_text_for_otp_analysis(headers["Subject"])
1✔
787
    subject_as_set = set(subject_text_cleaned.split(" "))
1✔
788

789
    subject_keywords = subject_as_set.intersection(otp_keywords)
1✔
790
    _find_and_add_key_phrases(
1✔
791
        subject_text_cleaned, otp_multi_keywords, subject_keywords
792
    )
793

794
    # TODO?: We could do the following in one pass within _convert_to_forwarded_email,
795
    # instead of duplicating some the work being done there here. Would require some clean up, and potentially
796
    # adding some more helper functions.
797

798
    # If the set is not empty
799
    if subject_keywords:
1!
NEW
800
        otp_code = _naive_detect_potential_otp(
×
801
            subject_text_cleaned, subject_keywords.pop()
802
        )
NEW
803
        if otp_code:
×
NEW
804
            return otp_code
×
805

806
    body = email.get_body("plain")
1✔
807
    if body:
1✔
808
        assert isinstance(body, EmailMessage)
1✔
809
        body_text = body.get_content()
1✔
810
        # Cleaned text is lowered and only contains alphanumeric characters
811
        body_text_cleaned = _clean_text_for_otp_analysis(body_text)
1✔
812
        body_as_set = set(body_text_cleaned.split(" "))
1✔
813
        body_keywords = body_as_set.intersection(otp_keywords)
1✔
814
        _find_and_add_key_phrases(body_text_cleaned, otp_multi_keywords, body_keywords)
1✔
815

816
        if body_keywords:
1✔
817
            otp_code = _naive_detect_potential_otp(
1✔
818
                body_text_cleaned, body_keywords.pop()
819
            )
820
            if otp_code:
1✔
821
                return otp_code
1✔
822

823
    html_body = email.get_body("html")
1✔
824
    if html_body:
1✔
825
        assert isinstance(html_body, EmailMessage)
1✔
826
        html_content = html_body.get_content()
1✔
827
        # Removing tags and newlines from html content, and seperates the words with a space.
828
        html_as_text = BeautifulSoup(html_content, "html.parser").get_text(
1✔
829
            " ", strip=True
830
        )
831
        html_text_cleaned = _clean_text_for_otp_analysis(html_as_text)
1✔
832
        html_text_as_set = set(html_text_cleaned.split(" "))
1✔
833
        html_text_keywords = html_text_as_set.intersection(otp_keywords)
1✔
834
        _find_and_add_key_phrases(
1✔
835
            html_text_cleaned, otp_multi_keywords, html_text_keywords
836
        )
837
        if html_text_keywords:
1✔
838
            otp_code = _naive_detect_potential_otp(
1✔
839
                html_text_cleaned, html_text_keywords.pop()
840
            )
841
            return otp_code
1✔
842

843
    return ""
1✔
844

845

846
def _find_and_add_key_phrases(
1✔
847
    text: str, keywords_to_check: set, all_keywords: set
848
) -> None:
849
    """
850
    Looks for words in 'keywords_to_check' that are in 'text'.
851
    If found, we add it to all_keywords.
852

853
    Args:
854
        text (str): Text
855
        keywords_to_check (set): Keywords being looked for within the text
856
        all_keywords (set): If keywords are found, add it to the all_keywords set
857
    """
858
    for keyword in keywords_to_check:
1✔
859
        if text.find(keyword) != -1:
1✔
860
            all_keywords.add(keyword)
1✔
861

862

863
def _clean_text_for_otp_analysis(text: str) -> str:
1✔
864
    return re.sub(r"[^A-Za-z0-9 ]", "", text.lower().replace("\n", " "))
1✔
865

866

867
def _naive_detect_potential_otp(text: str, keyword: str) -> str:
1✔
868
    """
869
    Looks for a 6 or 4 digit code that is closest to the a random keyword that was contained in the text.
870

871
    Closest is defined as the absolute value of the index in 'text' of the last letter of the keyword subtracted by
872
    the index in 'text' of the last digit of the 4 or 6 digit code we found.
873

874
    Args:
875
        text (str): Cleaned text with no whitespace or newline characters
876

877
    Returns:
878
        str: A string containing the OTP code, or an empty string if nothing was found
879
    """
880
    potential_otp = ""
1✔
881
    minimum_distance = float("inf")
1✔
882
    minimum_distance_otp = ""
1✔
883
    keyword_end_index = text.find(keyword) + len(keyword) - 1
1✔
884

885
    for i in range(0, len(text)):
1✔
886
        if text[i].isdigit():
1✔
887
            potential_otp += text[i]
1✔
888
        else:
889
            potential_otp = ""
1✔
890
            continue
1✔
891

892
        # We are checking whether the code is the correct length, and that it seperated by spaces on atleast one end
893
        # i.e. " 9999 " or "9999 " or " 9999" is valid.
894
        six_digits_valid = (
1✔
895
            len(potential_otp) == 6
896
            and (i == len(text) - 1 or text[i + 1] == " ")
897
            and (i == 5 or (i > 5 and text[i - 6] == " "))
898
        )
899
        four_digits_valid = (
1✔
900
            len(potential_otp) == 4
901
            and (i == len(text) - 1 or text[i + 1] == " ")
902
            and (i == 3 or (i > 3 and text[i - 4] == " "))
903
        )
904

905
        if six_digits_valid or four_digits_valid:
1✔
906
            distance_to_keyword = abs(i - keyword_end_index)
1✔
907

908
            if distance_to_keyword < minimum_distance:
1✔
909
                minimum_distance_otp = potential_otp
1✔
910
                minimum_distance = distance_to_keyword
1✔
911

912
    return minimum_distance_otp
1✔
913

914

915
def _get_verdict(receipt, verdict_type):
1✔
916
    return receipt["%sVerdict" % verdict_type]["status"]
1✔
917

918

919
def _check_email_from_list(headers):
1✔
920
    for header in headers:
1!
921
        if header["name"].lower().startswith("list-"):
1!
922
            return True
1✔
923
    return False
×
924

925

926
def _record_receipt_verdicts(receipt, state):
1✔
927
    verdict_tags = []
1✔
928
    for key in sorted(receipt.keys()):
1✔
929
        if key.endswith("Verdict"):
1✔
930
            value = receipt[key]["status"]
1✔
931
            verdict_tags.append(f"{key}:{value}")
1✔
932
            incr_if_enabled(f"relay.emails.verdicts.{key}", 1, [f"state:{state}"])
1✔
933
        elif key == "dmarcPolicy":
1✔
934
            value = receipt[key]
1✔
935
            verdict_tags.append(f"{key}:{value}")
1✔
936
    incr_if_enabled(f"relay.emails.state.{state}", 1, verdict_tags)
1✔
937

938

939
def _get_message_id_from_headers(headers):
1✔
940
    message_id = None
1✔
941
    for header in headers:
1✔
942
        if header["name"].lower() == "message-id":
1✔
943
            message_id = header["value"]
1✔
944
    return message_id
1✔
945

946

947
def _get_keys_from_headers(headers):
1✔
948
    in_reply_to = None
1✔
949
    for header in headers:
1✔
950
        if header["name"].lower() == "in-reply-to":
1✔
951
            in_reply_to = header["value"]
1✔
952
            message_id_bytes = get_message_id_bytes(in_reply_to)
1✔
953
            return derive_reply_keys(message_id_bytes)
1✔
954

955
        if header["name"].lower() == "references":
1✔
956
            message_ids = header["value"]
1✔
957
            for message_id in message_ids.split(" "):
1✔
958
                message_id_bytes = get_message_id_bytes(message_id)
1✔
959
                lookup_key, encryption_key = derive_reply_keys(message_id_bytes)
1✔
960
                try:
1✔
961
                    # FIXME: calling code is likely to duplicate this query
962
                    _get_reply_record_from_lookup_key(lookup_key)
1✔
963
                    return lookup_key, encryption_key
1✔
964
                except Reply.DoesNotExist:
1✔
965
                    pass
1✔
966
            raise Reply.DoesNotExist
1✔
967
    incr_if_enabled("mail_to_replies_without_reply_headers", 1)
1✔
968
    raise ReplyHeadersNotFound
1✔
969

970

971
def _get_reply_record_from_lookup_key(lookup_key):
1✔
972
    lookup = b64_lookup_key(lookup_key)
1✔
973
    return Reply.objects.get(lookup=lookup)
1✔
974

975

976
def _strip_localpart_tag(address):
1✔
977
    [localpart, domain] = address.split("@")
1✔
978
    subaddress_parts = localpart.split("+")
1✔
979
    return f"{subaddress_parts[0]}@{domain}"
1✔
980

981

982
_TransportType = Literal["sns", "s3"]
1✔
983

984

985
def _get_email_bytes(
1✔
986
    message_json: AWS_SNSMessageJSON,
987
) -> tuple[bytes, _TransportType, float]:
988
    with Timer(logger=None) as load_timer:
1✔
989
        if "content" in message_json:
1✔
990
            # email content in sns message
991
            message_content = message_json["content"].encode("utf-8")
1✔
992
            transport: Literal["sns", "s3"] = "sns"
1✔
993
        else:
994
            # assume email content in S3
995
            bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
1✔
996
            message_content = get_message_content_from_s3(bucket, object_key)
1✔
997
            transport = "s3"
1✔
998
        histogram_if_enabled("relayed_email.size", len(message_content))
1✔
999
    load_time_s = round(load_timer.last, 3)
1✔
1000
    return (message_content, transport, load_time_s)
1✔
1001

1002

1003
def _convert_to_forwarded_email(
1✔
1004
    incoming_email_bytes: bytes,
1005
    headers: OutgoingHeaders,
1006
    to_address: str,
1007
    from_address: str,
1008
    language: str,
1009
    has_premium: bool,
1010
    sample_trackers: bool,
1011
    remove_level_one_trackers: bool,
1012
    now: datetime | None = None,
1013
) -> tuple[EmailMessage, EmailForwardingIssues, int, bool, bool]:
1014
    """
1015
    Convert an email (as bytes) to a forwarded email.
1016

1017
    Return is a tuple:
1018
    - email - The forwarded email
1019
    - issues - Any detected issues in conversion
1020
    - level_one_trackers_removed (int) - Number of trackers removed
1021
    - has_html - True if the email has an HTML representation
1022
    - has_text - True if the email has a plain text representation
1023
    """
1024
    email = message_from_bytes(incoming_email_bytes, policy=relay_policy)
1✔
1025
    # python/typeshed issue 2418
1026
    # The Python 3.2 default was Message, 3.6 uses policy.message_factory, and
1027
    # policy.default.message_factory is EmailMessage
1028
    assert isinstance(email, EmailMessage)
1✔
1029

1030
    # Replace headers in the original email
1031
    header_issues = _replace_headers(email, headers)
1✔
1032

1033
    # Find and replace text content
1034
    text_body = email.get_body("plain")
1✔
1035
    text_content = None
1✔
1036
    has_text = False
1✔
1037
    if text_body:
1✔
1038
        has_text = True
1✔
1039
        assert isinstance(text_body, EmailMessage)
1✔
1040
        text_content = text_body.get_content()
1✔
1041
        new_text_content = _convert_text_content(text_content, to_address)
1✔
1042
        text_body.set_content(new_text_content)
1✔
1043

1044
    # Find and replace HTML content
1045
    html_body = email.get_body("html")
1✔
1046
    level_one_trackers_removed = 0
1✔
1047
    has_html = False
1✔
1048
    if html_body:
1✔
1049
        has_html = True
1✔
1050
        assert isinstance(html_body, EmailMessage)
1✔
1051
        html_content = html_body.get_content()
1✔
1052
        new_content, level_one_trackers_removed = _convert_html_content(
1✔
1053
            html_content,
1054
            to_address,
1055
            from_address,
1056
            language,
1057
            has_premium,
1058
            sample_trackers,
1059
            remove_level_one_trackers,
1060
        )
1061
        html_body.set_content(new_content, subtype="html")
1✔
1062
    elif text_content:
1!
1063
        # Try to use the text content to generate HTML content
1064
        html_content = urlize_and_linebreaks(text_content)
1✔
1065
        new_content, level_one_trackers_removed = _convert_html_content(
1✔
1066
            html_content,
1067
            to_address,
1068
            from_address,
1069
            language,
1070
            has_premium,
1071
            sample_trackers,
1072
            remove_level_one_trackers,
1073
        )
1074
        assert isinstance(text_body, EmailMessage)
1✔
1075
        try:
1✔
1076
            text_body.add_alternative(new_content, subtype="html")
1✔
1077
        except TypeError as e:
×
1078
            out = StringIO()
×
1079
            _structure(email, fp=out)
×
1080
            info_logger.error(
×
1081
                "Adding HTML alternate failed",
1082
                extra={"exception": str(e), "structure": out.getvalue()},
1083
            )
1084

1085
    issues: EmailForwardingIssues = {}
1✔
1086
    if header_issues:
1✔
1087
        issues["headers"] = header_issues
1✔
1088
    return (email, issues, level_one_trackers_removed, has_html, has_text)
1✔
1089

1090

1091
def _replace_headers(
1✔
1092
    email: EmailMessage, headers: OutgoingHeaders
1093
) -> EmailHeaderIssues:
1094
    """
1095
    Replace the headers in email with new headers.
1096

1097
    This replaces headers in the passed email object, rather than returns an altered
1098
    copy. The primary reason is that the Python email package can read an email with
1099
    non-compliant headers or content, but can't write it. A read/write is required to
1100
    create a copy that we then alter. This code instead alters the passed EmailMessage
1101
    object, making header-specific changes in try / except statements.
1102

1103
    The other reason is the object size. An Email can be up to 10 MB, and we hope to
1104
    support 40 MB emails someday. Modern servers may be OK with this, but it would be
1105
    nice to handle the non-compliant headers without crashing before we add a source of
1106
    memory-related crashes.
1107
    """
1108
    # Look for headers to drop
1109
    to_drop: list[str] = []
1✔
1110
    replacements: set[str] = set(_k.lower() for _k in headers.keys())
1✔
1111
    issues: EmailHeaderIssues = defaultdict(list)
1✔
1112

1113
    # Detect non-compliant headers in incoming emails
1114
    for header in email.keys():
1✔
1115
        try:
1✔
1116
            value = email[header]
1✔
1117
        except Exception as e:
1✔
1118
            issues["incoming"].append((header, {"exception_on_read": repr(e)}))
1✔
1119
            value = None
1✔
1120
        if getattr(value, "defects", None):
1✔
1121
            issues["incoming"].append(
1✔
1122
                (
1123
                    header,
1124
                    {
1125
                        "defect_count": len(value.defects),
1126
                        "parsed_value": str(value),
1127
                        "unstructured_value": str(value.as_unstructured),
1128
                    },
1129
                )
1130
            )
1131

1132
    # Collect headers that will not be forwarded
1133
    for header in email.keys():
1✔
1134
        header_lower = header.lower()
1✔
1135
        if (
1✔
1136
            header_lower not in replacements
1137
            and header_lower != "mime-version"
1138
            and not header_lower.startswith("content-")
1139
        ):
1140
            to_drop.append(header)
1✔
1141

1142
    # Drop headers that should be dropped
1143
    for header in to_drop:
1✔
1144
        del email[header]
1✔
1145

1146
    # Replace the requested headers
1147
    for header, value in headers.items():
1✔
1148
        del email[header]
1✔
1149
        try:
1✔
1150
            email[header] = value
1✔
1151
        except Exception as e:
×
1152
            issues["outgoing"].append(
×
1153
                (header, {"exception_on_write": repr(e), "value": value})
1154
            )
1155
            continue
×
1156
        try:
1✔
1157
            parsed_value = email[header]
1✔
1158
        except Exception as e:
×
1159
            issues["outgoing"].append((header, {"exception_on_read": repr(e)}))
×
1160
            continue
×
1161
        if parsed_value.defects:
1!
1162
            issues["outgoing"].append(
×
1163
                (
1164
                    header,
1165
                    {
1166
                        "defect_count": len(parsed_value.defects),
1167
                        "parsed_value": str(parsed_value),
1168
                        "unstructured_value": str(parsed_value.as_unstructured),
1169
                    },
1170
                )
1171
            )
1172

1173
    return dict(issues)
1✔
1174

1175

1176
def _convert_html_content(
1✔
1177
    html_content: str,
1178
    to_address: str,
1179
    from_address: str,
1180
    language: str,
1181
    has_premium: bool,
1182
    sample_trackers: bool,
1183
    remove_level_one_trackers: bool,
1184
    now: datetime | None = None,
1185
) -> tuple[str, int]:
1186
    # frontend expects a timestamp in milliseconds
1187
    now = now or datetime.now(timezone.utc)
1✔
1188
    datetime_now_ms = int(now.timestamp() * 1000)
1✔
1189

1190
    # scramble alias so that clients don't recognize it
1191
    # and apply default link styles
1192
    display_email = re.sub("([@.:])", r"<span>\1</span>", to_address)
1✔
1193

1194
    # sample tracker numbers
1195
    if sample_trackers:
1!
1196
        count_all_trackers(html_content)
×
1197

1198
    tracker_report_link = ""
1✔
1199
    removed_count = 0
1✔
1200
    if remove_level_one_trackers:
1!
1201
        html_content, tracker_details = remove_trackers(
×
1202
            html_content, from_address, datetime_now_ms
1203
        )
1204
        removed_count = tracker_details["tracker_removed"]
×
1205
        tracker_report_details = {
×
1206
            "sender": from_address,
1207
            "received_at": datetime_now_ms,
1208
            "trackers": tracker_details["level_one"]["trackers"],
1209
        }
1210
        tracker_report_link = f"{settings.SITE_ORIGIN}/tracker-report/#" + json.dumps(
×
1211
            tracker_report_details
1212
        )
1213

1214
    wrapped_html = wrap_html_email(
1✔
1215
        original_html=html_content,
1216
        language=language,
1217
        has_premium=has_premium,
1218
        display_email=display_email,
1219
        tracker_report_link=tracker_report_link,
1220
        num_level_one_email_trackers_removed=removed_count,
1221
    )
1222
    return wrapped_html, removed_count
1✔
1223

1224

1225
def _convert_text_content(text_content: str, to_address: str) -> str:
1✔
1226
    relay_header_text = (
1✔
1227
        "This email was sent to your alias "
1228
        f"{to_address}. To stop receiving emails sent to this alias, "
1229
        "update the forwarding settings in your dashboard.\n"
1230
        "---Begin Email---\n"
1231
    )
1232
    wrapped_text = relay_header_text + text_content
1✔
1233
    return wrapped_text
1✔
1234

1235

1236
def _build_reply_requires_premium_email(
1✔
1237
    from_address: str,
1238
    reply_record: Reply,
1239
    message_id: str | None,
1240
    decrypted_metadata: dict[str, Any] | None,
1241
) -> EmailMessage:
1242
    # If we haven't forwarded a first reply for this user yet, _reply_allowed
1243
    # will forward.  So, tell the user we forwarded it.
1244
    forwarded = not reply_record.address.user.profile.forwarded_first_reply
1✔
1245
    sender: str | None = ""
1✔
1246
    if decrypted_metadata is not None:
1!
1247
        sender = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
1✔
1248
    ctx = {
1✔
1249
        "sender": sender or "",
1250
        "forwarded": forwarded,
1251
        "SITE_ORIGIN": settings.SITE_ORIGIN,
1252
    }
1253
    html_body = render_to_string("emails/reply_requires_premium.html", ctx)
1✔
1254
    text_body = render_to_string("emails/reply_requires_premium.txt", ctx)
1✔
1255

1256
    # Create the message
1257
    msg = EmailMessage()
1✔
1258
    msg["Subject"] = ftl_bundle.format("replies-not-included-in-free-account-header")
1✔
1259
    msg["From"] = get_reply_to_address()
1✔
1260
    msg["To"] = from_address
1✔
1261
    if message_id:
1!
1262
        msg["In-Reply-To"] = message_id
1✔
1263
        msg["References"] = message_id
1✔
1264
    msg.set_content(text_body)
1✔
1265
    msg.add_alternative(html_body, subtype="html")
1✔
1266
    return msg
1✔
1267

1268

1269
def _set_forwarded_first_reply(profile):
1✔
1270
    profile.forwarded_first_reply = True
1✔
1271
    profile.save()
1✔
1272

1273

1274
def _send_reply_requires_premium_email(
1✔
1275
    from_address: str,
1276
    reply_record: Reply,
1277
    message_id: str | None,
1278
    decrypted_metadata: dict[str, Any] | None,
1279
) -> None:
1280
    msg = _build_reply_requires_premium_email(
×
1281
        from_address, reply_record, message_id, decrypted_metadata
1282
    )
1283
    try:
×
1284
        ses_send_raw_email(
×
1285
            source_address=get_reply_to_address(premium=False),
1286
            destination_address=from_address,
1287
            message=msg,
1288
        )
1289
        # If we haven't forwarded a first reply for this user yet, _reply_allowed will.
1290
        # So, updated the DB.
1291
        _set_forwarded_first_reply(reply_record.address.user.profile)
×
1292
    except ClientError as e:
×
1293
        logger.error("reply_not_allowed_ses_client_error", extra=e.response["Error"])
×
1294
    incr_if_enabled("free_user_reply_attempt", 1)
×
1295

1296

1297
def _reply_allowed(
1✔
1298
    from_address, to_address, reply_record, message_id=None, decrypted_metadata=None
1299
):
1300
    stripped_from_address = _strip_localpart_tag(from_address)
1✔
1301
    reply_record_email = reply_record.address.user.email
1✔
1302
    stripped_reply_record_address = _strip_localpart_tag(reply_record_email)
1✔
1303
    if (from_address == reply_record_email) or (
1!
1304
        stripped_from_address == stripped_reply_record_address
1305
    ):
1306
        # This is a Relay user replying to an external sender;
1307

1308
        if reply_record.profile.is_flagged:
1!
1309
            return False
×
1310

1311
        if reply_record.owner_has_premium:
1!
1312
            return True
1✔
1313

1314
        # if we haven't forwarded a first reply for this user, return True to allow
1315
        # this first reply
1316
        allow_first_reply = not reply_record.address.user.profile.forwarded_first_reply
×
1317
        _send_reply_requires_premium_email(
×
1318
            from_address, reply_record, message_id, decrypted_metadata
1319
        )
1320
        return allow_first_reply
×
1321
    else:
1322
        # The From: is not a Relay user, so make sure this is a reply *TO* a
1323
        # premium Relay user
1324
        try:
×
1325
            address = _get_address(to_address)
×
1326
            if address.user.profile.has_premium:
×
1327
                return True
×
1328
        except ObjectDoesNotExist:
×
1329
            return False
×
1330
    incr_if_enabled("free_user_reply_attempt", 1)
×
1331
    return False
×
1332

1333

1334
def _handle_reply(
1✔
1335
    from_address: str, message_json: AWS_SNSMessageJSON, to_address: str
1336
) -> HttpResponse:
1337
    """
1338
    Handle a reply from a Relay user to an external email.
1339

1340
    Returns (may be incomplete):
1341
    * 200 if the reply was sent
1342
    * 400 if the In-Reply-To and References headers are missing, none of the References
1343
      headers are a reply record, or the SES client raises an error
1344
    * 403 if the Relay user is not allowed to reply
1345
    * 404 if the S3-stored email is not found, or there is no matching Reply record in
1346
      the database
1347
    * 503 if the S3 client returns an error (other than not found), or the SES client
1348
      returns an error
1349

1350
    TODO: Return a more appropriate status object (see _handle_received)
1351
    TODO: Document metrics emitted
1352
    """
1353
    mail = message_json["mail"]
1✔
1354
    try:
1✔
1355
        (lookup_key, encryption_key) = _get_keys_from_headers(mail["headers"])
1✔
1356
    except ReplyHeadersNotFound:
1✔
1357
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-header"])
1✔
1358
        return HttpResponse("No In-Reply-To header", status=400)
1✔
1359

1360
    try:
1✔
1361
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
1✔
1362
    except Reply.DoesNotExist:
1✔
1363
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-reply-record"])
1✔
1364
        return HttpResponse("Unknown or stale In-Reply-To header", status=404)
1✔
1365

1366
    address = reply_record.address
1✔
1367
    message_id = _get_message_id_from_headers(mail["headers"])
1✔
1368
    decrypted_metadata = json.loads(
1✔
1369
        decrypt_reply_metadata(encryption_key, reply_record.encrypted_metadata)
1370
    )
1371
    if not _reply_allowed(
1!
1372
        from_address, to_address, reply_record, message_id, decrypted_metadata
1373
    ):
1374
        # TODO: should we return a 200 OK here?
1375
        return HttpResponse("Relay replies require a premium account", status=403)
×
1376

1377
    outbound_from_address = address.full_address
1✔
1378
    incr_if_enabled("reply_email", 1)
1✔
1379
    subject = mail["commonHeaders"].get("subject", "")
1✔
1380
    to_address = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
1✔
1381
    headers: OutgoingHeaders = {
1✔
1382
        "Subject": subject,
1383
        "From": outbound_from_address,
1384
        "To": to_address,
1385
        "Reply-To": outbound_from_address,
1386
    }
1387

1388
    try:
1✔
1389
        (email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
1✔
1390
    except ClientError as e:
×
1391
        if e.response["Error"].get("Code", "") == "NoSuchKey":
×
1392
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
×
1393
            return HttpResponse("Email not in S3", status=404)
×
1394
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
×
1395
        # we are returning a 500 so that SNS can retry the email processing
1396
        return HttpResponse("Cannot fetch the message content from S3", status=503)
×
1397

1398
    email = message_from_bytes(email_bytes, policy=relay_policy)
1✔
1399
    assert isinstance(email, EmailMessage)
1✔
1400

1401
    # Convert to a reply email
1402
    # TODO: Issue #1747 - Remove wrapper / prefix in replies
1403
    _replace_headers(email, headers)
1✔
1404

1405
    try:
1✔
1406
        ses_send_raw_email(
1✔
1407
            source_address=outbound_from_address,
1408
            destination_address=to_address,
1409
            message=email,
1410
        )
1411
    except ClientError as e:
×
1412
        logger.error("ses_client_error", extra=e.response["Error"])
×
1413
        return HttpResponse("SES client error", status=400)
×
1414

1415
    reply_record.increment_num_replied()
1✔
1416
    profile = address.user.profile
1✔
1417
    profile.update_abuse_metric(replied=True)
1✔
1418
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
1419

1420

1421
def _get_domain_address(local_portion: str, domain_portion: str) -> DomainAddress:
1✔
1422
    """
1423
    Find or create the DomainAddress for the parts of an email address.
1424

1425
    If the domain_portion is for a valid subdomain, a new DomainAddress
1426
    will be created and returned.
1427

1428
    If the domain_portion is for an unknown domain, ObjectDoesNotExist is raised.
1429

1430
    If the domain_portion is for an unclaimed subdomain, Profile.DoesNotExist is raised.
1431
    """
1432

1433
    [address_subdomain, address_domain] = domain_portion.split(".", 1)
1✔
1434
    if address_domain != get_domains_from_settings()["MOZMAIL_DOMAIN"]:
1✔
1435
        incr_if_enabled("email_for_not_supported_domain", 1)
1✔
1436
        raise ObjectDoesNotExist("Address does not exist")
1✔
1437
    try:
1✔
1438
        with transaction.atomic():
1✔
1439
            locked_profile = Profile.objects.select_for_update().get(
1✔
1440
                subdomain=address_subdomain
1441
            )
1442
            domain_numerical = get_domain_numerical(address_domain)
1✔
1443
            # filter DomainAddress because it may not exist
1444
            # which will throw an error with get()
1445
            domain_address = DomainAddress.objects.filter(
1✔
1446
                user=locked_profile.user, address=local_portion, domain=domain_numerical
1447
            ).first()
1448
            if domain_address is None:
1✔
1449
                # TODO: Consider flows when a user generating alias on a fly
1450
                # was unable to receive an email due to user no longer being a
1451
                # premium user as seen in exception thrown on make_domain_address
1452
                domain_address = DomainAddress.make_domain_address(
1✔
1453
                    locked_profile, local_portion, True
1454
                )
1455
            domain_address.last_used_at = datetime.now(timezone.utc)
1✔
1456
            domain_address.save()
1✔
1457
            return domain_address
1✔
1458
    except Profile.DoesNotExist as e:
1✔
1459
        incr_if_enabled("email_for_dne_subdomain", 1)
1✔
1460
        raise e
1✔
1461

1462

1463
def _get_address(address: str) -> RelayAddress | DomainAddress:
1✔
1464
    """
1465
    Find or create the RelayAddress or DomainAddress for an email address.
1466

1467
    If an unknown email address is for a valid subdomain, a new DomainAddress
1468
    will be created.
1469

1470
    On failure, raises exception based on Django's ObjectDoesNotExist:
1471
    * RelayAddress.DoesNotExist - looks like RelayAddress, deleted or does not exist
1472
    * Profile.DoesNotExist - looks like DomainAddress, no subdomain match
1473
    * ObjectDoesNotExist - Unknown domain
1474
    """
1475

1476
    local_portion, domain_portion = address.split("@")
1✔
1477
    local_address = local_portion.lower()
1✔
1478
    domain = domain_portion.lower()
1✔
1479

1480
    # if the domain is not the site's 'top' relay domain,
1481
    # it may be for a user's subdomain
1482
    email_domains = get_domains_from_settings().values()
1✔
1483
    if domain not in email_domains:
1✔
1484
        return _get_domain_address(local_address, domain)
1✔
1485

1486
    # the domain is the site's 'top' relay domain, so look up the RelayAddress
1487
    try:
1✔
1488
        domain_numerical = get_domain_numerical(domain)
1✔
1489
        relay_address = RelayAddress.objects.get(
1✔
1490
            address=local_address, domain=domain_numerical
1491
        )
1492
        return relay_address
1✔
1493
    except RelayAddress.DoesNotExist as e:
1✔
1494
        try:
1✔
1495
            DeletedAddress.objects.get(
1✔
1496
                address_hash=address_hash(local_address, domain=domain)
1497
            )
1498
            incr_if_enabled("email_for_deleted_address", 1)
1✔
1499
            # TODO: create a hard bounce receipt rule in SES
1500
        except DeletedAddress.DoesNotExist:
1✔
1501
            incr_if_enabled("email_for_unknown_address", 1)
1✔
1502
        except DeletedAddress.MultipleObjectsReturned:
1✔
1503
            # not sure why this happens on stage but let's handle it
1504
            incr_if_enabled("email_for_deleted_address_multiple", 1)
1✔
1505
        raise e
1✔
1506

1507

1508
def _handle_bounce(message_json: AWS_SNSMessageJSON) -> HttpResponse:
1✔
1509
    """
1510
    Handle an AWS SES bounce notification.
1511

1512
    For more information, see:
1513
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#bounce-object
1514

1515
    Returns:
1516
    * 404 response if any email address does not match a user,
1517
    * 200 response if all match or none are given
1518

1519
    Emits a counter metric "email_bounce" with these tags:
1520
    * bounce_type: 'permanent', 'transient', 'undetermined', 'none' if omitted
1521
    * bounce_subtype: 'undetermined', 'general', etc., 'none' if omitted
1522
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
1523
    * relay_action: 'no_action', 'auto_block_spam', 'hard_bounce', 'soft_bounce'
1524

1525
    Emits an info log "bounce_notification", same data as metric, plus:
1526
    * bounce_action: 'action' from bounced recipient data, or None
1527
    * bounce_status: 'status' from bounced recipient data, or None
1528
    * bounce_diagnostic: 'diagnosticCode' from bounced recipient data, or None
1529
    * bounce_extra: Extra data from bounce_recipient data, if any
1530
    * domain: User's real email address domain, if an address was given
1531

1532
    Emits a legacy log "bounced recipient domain: {domain}", with data from
1533
    bounced recipient data, without the email address.
1534
    """
1535
    bounce = message_json.get("bounce", {})
1✔
1536
    bounce_type = bounce.get("bounceType", "none")
1✔
1537
    bounce_subtype = bounce.get("bounceSubType", "none")
1✔
1538
    bounced_recipients = bounce.get("bouncedRecipients", [])
1✔
1539

1540
    now = datetime.now(timezone.utc)
1✔
1541
    bounce_data = []
1✔
1542
    for recipient in bounced_recipients:
1✔
1543
        recipient_address = recipient.pop("emailAddress", None)
1✔
1544
        data = {
1✔
1545
            "bounce_type": bounce_type,
1546
            "bounce_subtype": bounce_subtype,
1547
            "bounce_action": recipient.pop("action", ""),
1548
            "bounce_status": recipient.pop("status", ""),
1549
            "bounce_diagnostic": recipient.pop("diagnosticCode", ""),
1550
            "user_match": "no_address",
1551
            "relay_action": "no_action",
1552
        }
1553
        if recipient:
1!
1554
            data["bounce_extra"] = recipient.copy()
×
1555
        bounce_data.append(data)
1✔
1556

1557
        if recipient_address is None:
1!
1558
            continue
×
1559

1560
        recipient_address = parseaddr(recipient_address)[1]
1✔
1561
        recipient_domain = recipient_address.split("@")[1]
1✔
1562
        data["domain"] = recipient_domain
1✔
1563

1564
        try:
1✔
1565
            user = User.objects.get(email=recipient_address)
1✔
1566
            profile = user.profile
1✔
1567
            data["user_match"] = "found"
1✔
1568
        except User.DoesNotExist:
1✔
1569
            # TODO: handle bounce for a user who no longer exists
1570
            # add to SES account-wide suppression list?
1571
            data["user_match"] = "missing"
1✔
1572
            continue
1✔
1573

1574
        action = None
1✔
1575
        if "spam" in data["bounce_diagnostic"].lower():
1✔
1576
            # if an email bounced as spam, set to auto block spam for this user
1577
            # and DON'T set them into bounce pause state
1578
            action = "auto_block_spam"
1✔
1579
            profile.auto_block_spam = True
1✔
1580
        elif bounce_type == "Permanent":
1✔
1581
            # TODO: handle sub-types: 'General', 'NoEmail', etc.
1582
            action = "hard_bounce"
1✔
1583
            profile.last_hard_bounce = now
1✔
1584
        elif bounce_type == "Transient":
1!
1585
            # TODO: handle sub-types: 'MessageTooLarge', 'AttachmentRejected', etc.
1586
            action = "soft_bounce"
1✔
1587
            profile.last_soft_bounce = now
1✔
1588
        if action:
1!
1589
            data["relay_action"] = action
1✔
1590
            profile.save()
1✔
1591

1592
    if not bounce_data:
1!
1593
        # Data when there are no identified recipients
1594
        bounce_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]
×
1595

1596
    for data in bounce_data:
1✔
1597
        tags = {
1✔
1598
            "bounce_type": bounce_type,
1599
            "bounce_subtype": bounce_subtype,
1600
            "user_match": data["user_match"],
1601
            "relay_action": data["relay_action"],
1602
        }
1603
        incr_if_enabled(
1✔
1604
            "email_bounce",
1605
            1,
1606
            tags=[generate_tag(key, val) for key, val in tags.items()],
1607
        )
1608
        info_logger.info("bounce_notification", extra=data)
1✔
1609

1610
        # Legacy log, can be removed Q4 2023
1611
        recipient_domain = data.get("domain")
1✔
1612
        if recipient_domain:
1!
1613
            legacy_extra = {
1✔
1614
                "action": data.get("bounce_action"),
1615
                "status": data.get("bounce_status"),
1616
                "diagnosticCode": data.get("bounce_diagnostic"),
1617
            }
1618
            legacy_extra.update(data.get("bounce_extra", {}))
1✔
1619
            info_logger.info(
1✔
1620
                f"bounced recipient domain: {recipient_domain}", extra=legacy_extra
1621
            )
1622

1623
    if any(data["user_match"] == "missing" for data in bounce_data):
1✔
1624
        return HttpResponse("Address does not exist", status=404)
1✔
1625
    return HttpResponse("OK", status=200)
1✔
1626

1627

1628
def _handle_complaint(message_json: AWS_SNSMessageJSON) -> HttpResponse:
1✔
1629
    """
1630
    Handle an AWS SES complaint notification.
1631

1632
    For more information, see:
1633
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object
1634

1635
    Returns:
1636
    * 404 response if any email address does not match a user,
1637
    * 200 response if all match or none are given
1638

1639
    Emits a counter metric "email_complaint" with these tags:
1640
    * complaint_subtype: 'onaccounsuppressionlist', or 'none' if omitted
1641
    * complaint_feedback - feedback enumeration from ISP or 'none'
1642
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
1643
    * relay_action: 'no_action', 'auto_block_spam'
1644

1645
    Emits an info log "complaint_notification", same data as metric, plus:
1646
    * complaint_user_agent - identifies the client used to file the complaint
1647
    * complaint_extra - Extra data from complainedRecipients data, if any
1648
    * domain - User's domain, if an address was given
1649

1650
    Emits a legacy log "complaint_received", with data:
1651
    * recipient_domains: list of extracted user domains
1652
    * subtype: 'onaccounsuppressionlist', or 'none'
1653
    * feedback: feedback from ISP or 'none'
1654
    """
1655
    complaint = deepcopy(message_json.get("complaint", {}))
1✔
1656
    complained_recipients = complaint.pop("complainedRecipients", [])
1✔
1657
    subtype = complaint.pop("complaintSubType", None)
1✔
1658
    user_agent = complaint.pop("userAgent", None)
1✔
1659
    feedback = complaint.pop("complaintFeedbackType", None)
1✔
1660

1661
    complaint_data = []
1✔
1662
    for recipient in complained_recipients:
1✔
1663
        recipient_address = recipient.pop("emailAddress", None)
1✔
1664
        data = {
1✔
1665
            "complaint_subtype": subtype,
1666
            "complaint_user_agent": user_agent,
1667
            "complaint_feedback": feedback,
1668
            "user_match": "no_address",
1669
            "relay_action": "no_action",
1670
        }
1671
        if recipient:
1!
1672
            data["complaint_extra"] = recipient.copy()
×
1673
        complaint_data.append(data)
1✔
1674

1675
        if recipient_address is None:
1!
1676
            continue
×
1677

1678
        recipient_address = parseaddr(recipient_address)[1]
1✔
1679
        recipient_domain = recipient_address.split("@")[1]
1✔
1680
        data["domain"] = recipient_domain
1✔
1681

1682
        try:
1✔
1683
            user = User.objects.get(email=recipient_address)
1✔
1684
            profile = user.profile
1✔
1685
            data["user_match"] = "found"
1✔
1686
        except User.DoesNotExist:
×
1687
            data["user_match"] = "missing"
×
1688
            continue
×
1689

1690
        data["relay_action"] = "auto_block_spam"
1✔
1691
        profile.auto_block_spam = True
1✔
1692
        profile.save()
1✔
1693

1694
    if not complaint_data:
1!
1695
        # Data when there are no identified recipients
1696
        complaint_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]
×
1697

1698
    for data in complaint_data:
1✔
1699
        tags = {
1✔
1700
            "complaint_subtype": subtype or "none",
1701
            "complaint_feedback": feedback or "none",
1702
            "user_match": data["user_match"],
1703
            "relay_action": data["relay_action"],
1704
        }
1705
        incr_if_enabled(
1✔
1706
            "email_complaint",
1707
            1,
1708
            tags=[generate_tag(key, val) for key, val in tags.items()],
1709
        )
1710
        info_logger.info("complaint_notification", extra=data)
1✔
1711

1712
    # Legacy log, can be removed Q4 2023
1713
    domains = [data["domain"] for data in complaint_data if "domain" in data]
1✔
1714
    info_logger.info(
1✔
1715
        "complaint_received",
1716
        extra={
1717
            "recipient_domains": sorted(domains),
1718
            "subtype": subtype,
1719
            "feedback": feedback,
1720
        },
1721
    )
1722

1723
    if any(data["user_match"] == "missing" for data in complaint_data):
1!
1724
        return HttpResponse("Address does not exist", status=404)
×
1725
    return HttpResponse("OK", status=200)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc