• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 0dc159f6-4e4a-4180-94a8-c657384efbeb

04 Oct 2023 08:37PM CUT coverage: 74.774% (+0.06%) from 74.712%
0dc159f6-4e4a-4180-94a8-c657384efbeb

push

circleci

web-flow
Merge pull request #3955 from mozilla/fix-MPP-3420-noreversematch

fix MPP-3420: handle and log NoReverseMatch error

1910 of 2769 branches covered (0.0%)

Branch coverage included in aggregate %.

24 of 24 new or added lines in 2 files covered. (100.0%)

6016 of 7831 relevant lines covered (76.82%)

18.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.68
/emails/models.py
1
from collections import namedtuple
1✔
2
from datetime import datetime, timedelta, timezone
1✔
3
from hashlib import sha256
1✔
4
from typing import Optional
1✔
5
import logging
1✔
6
import random
1✔
7
import re
1✔
8
import string
1✔
9
import uuid
1✔
10
from django.apps import apps
1✔
11
from django.conf import settings
1✔
12
from django.contrib.auth.models import User
1✔
13
from django.core.exceptions import BadRequest
1✔
14
from django.core.validators import MinLengthValidator
1✔
15
from django.db import models, transaction
1✔
16
from django.dispatch import receiver
1✔
17
from django.utils.translation.trans_real import (
1✔
18
    parse_accept_lang_header,
19
    get_supported_language_variant,
20
)
21

22
from rest_framework.authtoken.models import Token
1✔
23

24
from api.exceptions import ErrorContextType, RelayAPIException
1✔
25
from privaterelay.plans import get_premium_countries
1✔
26
from privaterelay.utils import (
1✔
27
    AcceptLanguageError,
28
    flag_is_active_in_task,
29
    guess_country_from_accept_lang,
30
)
31

32

33
# App config of the "emails" app; has_bad_words() and is_blocklisted() read
# their word lists from it.
emails_config = apps.get_app_config("emails")

# "events" is the general log, "abusemetrics" receives the abuse-flag records
# written by Profile.update_abuse_metric().
logger = logging.getLogger("events")
abuse_logger = logging.getLogger("abusemetrics")

# Result of Profile.check_bounce_pause(): (paused, type) where type is
# "hard", "soft" or "".
BounceStatus = namedtuple("BounceStatus", ["paused", "type"])
1✔
38

39

40
def get_domains_from_settings():
    """Return the mapping of domain setting names to domain names.

    The keys are the labels used in DOMAIN_CHOICES. When "testserver" is in
    settings.ALLOWED_HOSTS, fixed placeholder domains are returned instead of
    the configured ones.
    """
    # HACK: detect if code is running in django tests
    running_in_tests = "testserver" in settings.ALLOWED_HOSTS
    if running_in_tests:
        return dict(RELAY_FIREFOX_DOMAIN="default.com", MOZMAIL_DOMAIN="test.com")
    return dict(
        RELAY_FIREFOX_DOMAIN=settings.RELAY_FIREFOX_DOMAIN,
        MOZMAIL_DOMAIN=settings.MOZMAIL_DOMAIN,
    )
48

49

50
# Numeric value stored in RelayAddress.domain -> name of the setting that
# holds the actual domain (see get_domains_from_settings()).
DOMAIN_CHOICES = [
    (1, "RELAY_FIREFOX_DOMAIN"),
    (2, "MOZMAIL_DOMAIN"),
]
# Users whose email address is at one of these domains are treated as premium
# by Profile.has_premium.
PREMIUM_DOMAINS = [
    "mozilla.com",
    "getpocket.com",
    "mozillafoundation.org",
]
1✔
52

53

54
def valid_available_subdomain(subdomain, *args, **kwargs):
    """Validate that ``subdomain`` is well-formed and not already taken.

    Used as the field validator of ``Profile.subdomain`` and called directly
    by ``Profile.add_subdomain``. Extra positional/keyword arguments are
    accepted and ignored.

    Returns:
        True when the subdomain can be registered.

    Raises:
        CannotMakeSubdomainException: "error-subdomain-cannot-be-empty-or-null"
            for an empty/None value, "error-subdomain-not-available" for any
            other rejection.
    """
    if not subdomain:
        raise CannotMakeSubdomainException("error-subdomain-cannot-be-empty-or-null")
    subdomain = subdomain.lower()
    # valid subdomains:
    #   can't start or end with a hyphen
    #   must be 1-63 alphanumeric characters and/or hyphens
    # fullmatch() is used instead of "^...$" because "$" also matches just
    # before a trailing newline, which would let "name\n" through.
    well_formed = re.fullmatch(r"(?!-)[a-z0-9-]{1,63}(?<!-)", subdomain) is not None
    # The checks are ordered cheapest first so the database is only queried
    # for candidates that pass the pattern and word lists:
    #   can't have "bad" words in them
    #   can't have "blocked" words in them
    #   can't be taken by someone else
    if (
        not well_formed
        or has_bad_words(subdomain)
        or is_blocklisted(subdomain)
        or RegisteredSubdomain.objects.filter(
            subdomain_hash=hash_subdomain(subdomain)
        ).exists()
    ):
        raise CannotMakeSubdomainException("error-subdomain-not-available")
    return True
1✔
77

78

79
# This historical function is referenced in migration 0029_profile_add_deleted_metric_and_changeserver_storage_default
80
def default_server_storage():
    """Return the historical default for ``server_storage`` (always True).

    Kept only because an old migration imports it by name.
    """
    enabled_by_default = True
    return enabled_by_default
×
82

83

84
def default_domain_numerical():
    """Return the DOMAIN_CHOICES number of the MOZMAIL_DOMAIN domain.

    Used as the default of ``RelayAddress.domain``.
    """
    mozmail_domain = get_domains_from_settings()["MOZMAIL_DOMAIN"]
    return get_domain_numerical(mozmail_domain)
1✔
88

89

90
class Profile(models.Model):
    """Per-user Relay profile: preferences, lifetime counters and plan helpers.

    The plan-related properties (``has_premium``, ``has_phone``, ``has_vpn``)
    read the ``subscriptions`` entry in ``extra_data`` of the user's "fxa"
    social account (see ``fxa``).
    """

    user = models.OneToOneField(User, on_delete=models.CASCADE)
    api_token = models.UUIDField(default=uuid.uuid4)
    num_address_deleted = models.PositiveIntegerField(default=0)
    date_subscribed = models.DateTimeField(blank=True, null=True)
    date_subscribed_phone = models.DateTimeField(blank=True, null=True)
    # TODO: delete date_phone_subscription_checked in favor of date_phone_subscription_next_reset
    date_phone_subscription_checked = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_start = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_reset = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_end = models.DateTimeField(blank=True, null=True)
    address_last_deleted = models.DateTimeField(blank=True, null=True, db_index=True)
    last_soft_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
    last_hard_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
    last_account_flagged = models.DateTimeField(blank=True, null=True, db_index=True)
    # Counters carried over from masks that have been deleted
    # (updated by RelayAddress.delete()).
    num_email_forwarded_in_deleted_address = models.PositiveIntegerField(default=0)
    num_email_blocked_in_deleted_address = models.PositiveIntegerField(default=0)
    num_level_one_trackers_blocked_in_deleted_address = models.PositiveIntegerField(
        default=0, null=True
    )
    num_email_replied_in_deleted_address = models.PositiveIntegerField(default=0)
    num_email_spam_in_deleted_address = models.PositiveIntegerField(default=0)
    subdomain = models.CharField(
        blank=True,
        null=True,
        unique=True,
        max_length=63,
        db_index=True,
        validators=[valid_available_subdomain],
    )
    # Whether we store the user's alias labels in the server
    server_storage = models.BooleanField(default=True)
    # Whether we store the caller/sender log for the user's relay number
    store_phone_log = models.BooleanField(default=True)
    # TODO: Data migration to set null to false
    # TODO: Schema migration to remove null=True
    remove_level_one_email_trackers = models.BooleanField(null=True, default=False)
    onboarding_state = models.PositiveIntegerField(default=0)
    auto_block_spam = models.BooleanField(default=False)
    forwarded_first_reply = models.BooleanField(default=False)
    # Empty string means the profile was created through relying party flow
    created_by = models.CharField(blank=True, null=True, max_length=63)
    sent_welcome_email = models.BooleanField(default=False)

    def __str__(self):
        return f"{self.user} Profile"

    def save(self, *args, **kwargs):
        """Save the profile, then purge data the user opted out of storing.

        Returns whatever ``models.Model.save`` returns.
        """
        # always lower-case the subdomain before saving it
        # TODO: change subdomain field as a custom field inheriting from
        # CharField to validate constraints on the field update too
        if self.subdomain and not self.subdomain.islower():
            self.subdomain = self.subdomain.lower()
        ret = super().save(*args, **kwargs)
        # any time a profile is saved with server_storage False, delete the
        # appropriate server-stored Relay address data.
        if not self.server_storage:
            relay_addresses = RelayAddress.objects.filter(user=self.user)
            relay_addresses.update(description="", generated_for="", used_on="")
        if settings.PHONES_ENABLED:
            # any time a profile is saved with store_phone_log False, delete the
            # appropriate server-stored InboundContact records
            from phones.models import InboundContact, RelayNumber

            if not self.store_phone_log:
                try:
                    relay_number = RelayNumber.objects.get(user=self.user)
                    InboundContact.objects.filter(relay_number=relay_number).delete()
                except RelayNumber.DoesNotExist:
                    # The user has no relay number, so there is nothing to purge.
                    pass
        return ret

    @property
    def language(self):
        """First supported language variant from the FxA locale, else "en"."""
        fxa = self.fxa
        locale = fxa.extra_data.get("locale") if fxa else None
        if locale:
            for accept_lang, _ in parse_accept_lang_header(locale):
                try:
                    return get_supported_language_variant(accept_lang)
                except LookupError:
                    continue
        return "en"

    # This method returns whether the locale associated with the user's Mozilla account
    # includes a country code from a Premium country. This is less accurate than using
    # get_countries_info_from_request_and_mapping(), which can use a GeoIP lookup, so
    # prefer using that if a request context is available. In other contexts, for
    # example when sending an email, this method can be useful.
    @property
    def fxa_locale_in_premium_country(self) -> bool:
        fxa = self.fxa
        locale = fxa.extra_data.get("locale") if fxa else None
        if not locale:
            return False
        try:
            country = guess_country_from_accept_lang(locale)
        except AcceptLanguageError:
            return False
        return country in get_premium_countries()

    @property
    def avatar(self):
        """Avatar from the FxA account data, or None when absent or no FxA account."""
        fxa = self.fxa
        return fxa.extra_data.get("avatar") if fxa else None

    @property
    def relay_addresses(self):
        """Queryset of this user's RelayAddress rows."""
        return RelayAddress.objects.filter(user=self.user)

    @property
    def domain_addresses(self):
        """Queryset of this user's DomainAddress rows."""
        return DomainAddress.objects.filter(user=self.user)

    @property
    def total_masks(self) -> int:
        """Number of relay addresses plus domain addresses (two COUNT queries)."""
        ra_count: int = self.relay_addresses.count()
        da_count: int = self.domain_addresses.count()
        return ra_count + da_count

    @property
    def at_mask_limit(self) -> bool:
        """True when a non-premium user has used all free relay addresses."""
        if self.has_premium:
            return False
        return self.at_max_free_aliases

    def check_bounce_pause(self):
        """Return the current BounceStatus, clearing expired bounce timestamps.

        A hard bounce takes precedence over a soft bounce. When a recorded
        bounce is older than its allowed window, the timestamp is reset to
        None and the profile is saved.
        """
        if self.last_hard_bounce:
            last_hard_bounce_allowed = datetime.now(timezone.utc) - timedelta(
                days=settings.HARD_BOUNCE_ALLOWED_DAYS
            )
            if self.last_hard_bounce > last_hard_bounce_allowed:
                return BounceStatus(True, "hard")
            self.last_hard_bounce = None
            self.save()
        if self.last_soft_bounce:
            last_soft_bounce_allowed = datetime.now(timezone.utc) - timedelta(
                days=settings.SOFT_BOUNCE_ALLOWED_DAYS
            )
            if self.last_soft_bounce > last_soft_bounce_allowed:
                return BounceStatus(True, "soft")
            self.last_soft_bounce = None
            self.save()
        return BounceStatus(False, "")

    @property
    def bounce_status(self):
        """Property form of check_bounce_pause() (may save the profile)."""
        return self.check_bounce_pause()

    @property
    def next_email_try(self):
        """Datetime at which forwarding resumes; now if the account is not paused."""
        bounce_pause, bounce_type = self.check_bounce_pause()

        if not bounce_pause:
            return datetime.now(timezone.utc)

        if bounce_type == "soft":
            return self.last_soft_bounce + timedelta(
                days=settings.SOFT_BOUNCE_ALLOWED_DAYS
            )

        return self.last_hard_bounce + timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS)

    @property
    def last_bounce_date(self):
        """Last hard bounce if set, else last soft bounce, else None."""
        if self.last_hard_bounce:
            return self.last_hard_bounce
        if self.last_soft_bounce:
            return self.last_soft_bounce
        return None

    @property
    def at_max_free_aliases(self) -> bool:
        """True when the relay address count has reached MAX_NUM_FREE_ALIASES."""
        relay_addresses_count: int = self.relay_addresses.count()
        return relay_addresses_count >= settings.MAX_NUM_FREE_ALIASES

    @property
    def fxa(self):
        """The user's "fxa" social account, or None if there is none."""
        # Note: we are NOT using .filter() here because it invalidates
        # any profile instances that were queried with prefetch_related, which
        # we use in at least the profile view to minimize queries
        for sa in self.user.socialaccount_set.all():
            if sa.provider == "fxa":
                return sa
        return None

    @property
    def display_name(self):
        """FxA display name, or None when unset or no FxA account."""
        # if display name is not set on FxA the
        # displayName key will not exist on the extra_data
        fxa = self.fxa
        return fxa.extra_data.get("displayName") if fxa else None

    @property
    def custom_domain(self):
        return f"@{self.subdomain}.{settings.MOZMAIL_DOMAIN}"

    @property
    def has_premium(self):
        """True for PREMIUM_DOMAINS email addresses or an unlimited subscription."""
        # FIXME: as we don't have all the tiers defined we are over-defining
        # this to mark the user as a premium user as well
        fxa = self.fxa
        if not fxa:
            return False
        premium_suffixes = tuple(f"@{domain}" for domain in PREMIUM_DOMAINS)
        if self.user.email.endswith(premium_suffixes):
            return True
        user_subscriptions = fxa.extra_data.get("subscriptions", [])
        return any(
            sub in user_subscriptions for sub in settings.SUBSCRIPTIONS_WITH_UNLIMITED
        )

    @property
    def has_phone(self):
        """True when the user may use phone masking (flag or subscription)."""
        fxa = self.fxa
        if not fxa:
            return False
        # Outside prod (and outside pytest) the "phones" flag gates the feature.
        if settings.RELAY_CHANNEL != "prod" and not settings.IN_PYTEST:
            if not flag_is_active_in_task("phones", self.user):
                return False
        if flag_is_active_in_task("free_phones", self.user):
            return True
        user_subscriptions = fxa.extra_data.get("subscriptions", [])
        return any(
            sub in user_subscriptions for sub in settings.SUBSCRIPTIONS_WITH_PHONE
        )

    @property
    def has_vpn(self):
        """True when the user has one of SUBSCRIPTIONS_WITH_VPN."""
        fxa = self.fxa
        if not fxa:
            return False
        user_subscriptions = fxa.extra_data.get("subscriptions", [])
        return any(sub in user_subscriptions for sub in settings.SUBSCRIPTIONS_WITH_VPN)

    @property
    def emails_forwarded(self):
        return (
            sum(ra.num_forwarded for ra in self.relay_addresses)
            + sum(da.num_forwarded for da in self.domain_addresses)
            + self.num_email_forwarded_in_deleted_address
        )

    @property
    def emails_blocked(self):
        return (
            sum(ra.num_blocked for ra in self.relay_addresses)
            + sum(da.num_blocked for da in self.domain_addresses)
            + self.num_email_blocked_in_deleted_address
        )

    @property
    def emails_replied(self):
        # Once Django is on version 4.0 and above, we can set the default=0
        # and return a int instead of None
        # https://docs.djangoproject.com/en/4.0/ref/models/querysets/#default
        total_num_replied = 0
        for masks in (self.relay_addresses, self.domain_addresses):
            replied = masks.aggregate(models.Sum("num_replied"))["num_replied__sum"]
            # The aggregate is None when the queryset is empty.
            total_num_replied += replied or 0
        return total_num_replied + self.num_email_replied_in_deleted_address

    @property
    def level_one_trackers_blocked(self):
        # The tracker counters are nullable, hence the "or 0" on every term.
        return (
            sum(ra.num_level_one_trackers_blocked or 0 for ra in self.relay_addresses)
            + sum(
                da.num_level_one_trackers_blocked or 0 for da in self.domain_addresses
            )
            + (self.num_level_one_trackers_blocked_in_deleted_address or 0)
        )

    @property
    def joined_before_premium_release(self):
        date_created = self.user.date_joined
        return date_created < datetime.fromisoformat("2021-10-22 17:00:00+00:00")

    def add_subdomain(self, subdomain):
        """Validate and register ``subdomain`` for this (premium) user.

        Returns the lower-cased subdomain.

        Raises:
            CannotMakeSubdomainException: empty value, non-premium user, a
                subdomain is already set, or the value is invalid/unavailable.
        """
        # Handles if the subdomain is "" or None
        if not subdomain:
            raise CannotMakeSubdomainException(
                "error-subdomain-cannot-be-empty-or-null"
            )

        # subdomain must be all lowercase
        subdomain = subdomain.lower()

        if not self.has_premium:
            raise CannotMakeSubdomainException("error-premium-set-subdomain")
        if self.subdomain is not None:
            raise CannotMakeSubdomainException("error-premium-cannot-change-subdomain")
        self.subdomain = subdomain
        # The validator defined in the subdomain field does not get run in full_clean() when self.subdomain is "" or None
        # So we need to run the validator again to catch these cases.
        valid_available_subdomain(subdomain)
        self.full_clean()
        self.save()

        RegisteredSubdomain.objects.create(subdomain_hash=hash_subdomain(subdomain))
        return subdomain

    def update_abuse_metric(
        self,
        address_created=False,
        replied=False,
        email_forwarded=False,
        forwarded_email_size=0,
    ):
        """Increment today's abuse counters and flag the account at a limit.

        Args:
            address_created: count one created address.
            replied: count one reply.
            email_forwarded: count one forwarded email.
            forwarded_email_size: size to add to the forwarded-size counter;
                ignored unless greater than 0.

        Returns:
            ``self.last_account_flagged`` (None if the account was never
            flagged).
        """
        #  TODO: this should be wrapped in atomic to ensure race conditions are properly handled
        # look for abuse metrics created on the same UTC date, regardless of time.
        # The midnight value is built timezone-aware: combining a naive
        # datetime and calling .astimezone() would interpret it as local time.
        midnight_utc_today = datetime.combine(
            datetime.now(timezone.utc).date(),
            datetime.min.time(),
            tzinfo=timezone.utc,
        )
        midnight_utc_tomorrow = midnight_utc_today + timedelta(days=1)
        abuse_metric = self.user.abusemetrics_set.filter(
            first_recorded__gte=midnight_utc_today,
            first_recorded__lt=midnight_utc_tomorrow,
        ).first()
        if not abuse_metric:
            abuse_metric = AbuseMetrics.objects.create(user=self.user)
            # Creating today's first record also purges records from previous
            # days (for all users).
            AbuseMetrics.objects.filter(first_recorded__lt=midnight_utc_today).delete()

        # increment the abuse metric
        if address_created:
            abuse_metric.num_address_created_per_day += 1
        if replied:
            abuse_metric.num_replies_per_day += 1
        if email_forwarded:
            abuse_metric.num_email_forwarded_per_day += 1
        if forwarded_email_size > 0:
            abuse_metric.forwarded_email_size_per_day += forwarded_email_size
        abuse_metric.last_recorded = datetime.now(timezone.utc)
        abuse_metric.save()

        # check user should be flagged for abuse
        hit_max_create = (
            abuse_metric.num_address_created_per_day
            >= settings.MAX_ADDRESS_CREATION_PER_DAY
        )
        hit_max_replies = (
            abuse_metric.num_replies_per_day >= settings.MAX_REPLIES_PER_DAY
        )
        hit_max_forwarded = (
            abuse_metric.num_email_forwarded_per_day >= settings.MAX_FORWARDED_PER_DAY
        )
        hit_max_forwarded_email_size = (
            abuse_metric.forwarded_email_size_per_day
            >= settings.MAX_FORWARDED_EMAIL_SIZE_PER_DAY
        )
        if (
            hit_max_create
            or hit_max_replies
            or hit_max_forwarded
            or hit_max_forwarded_email_size
        ):
            self.last_account_flagged = datetime.now(timezone.utc)
            self.save()
            # A user without an FxA account must not make the logging fail
            # after the flag has already been persisted.
            fxa = self.fxa
            data = {
                "uid": fxa.uid if fxa else None,
                "flagged": self.last_account_flagged.timestamp(),
                "replies": abuse_metric.num_replies_per_day,
                "addresses": abuse_metric.num_address_created_per_day,
                "forwarded": abuse_metric.num_email_forwarded_per_day,
                "forwarded_size_in_bytes": abuse_metric.forwarded_email_size_per_day,
            }
            # log for further secops review
            abuse_logger.info("Abuse flagged", extra=data)
        return self.last_account_flagged

    @property
    def is_flagged(self):
        """True while the pause period after the last abuse flag is running."""
        if not self.last_account_flagged:
            return False
        account_premium_feature_resumed = self.last_account_flagged + timedelta(
            days=settings.PREMIUM_FEATURE_PAUSED_DAYS
        )
        if datetime.now(timezone.utc) > account_premium_feature_resumed:
            # premium feature has been resumed
            return False
        # user was flagged and the premium feature pause period is not yet over
        return True
1✔
480

481

482
@receiver(models.signals.post_save, sender=Profile)
def copy_auth_token(sender, instance=None, created=False, **kwargs):
    """Create a DRF Token keyed by ``api_token`` when a Profile is created.

    Nothing happens for updates, or when the user already has a Token.
    """
    if not created:
        return
    # baker triggers created during tests
    # so first check the user doesn't already have a Token
    try:
        Token.objects.get(user=instance.user)
    except Token.DoesNotExist:
        Token.objects.create(user=instance.user, key=instance.api_token)
1✔
492

493

494
def address_hash(address, subdomain=None, domain=None):
    """Return the SHA-256 hex digest identifying an address.

    The hashed text is ``address@subdomain.domain`` when a subdomain is
    given, the bare ``address`` when ``domain`` equals
    settings.RELAY_FIREFOX_DOMAIN, and ``address@domain`` otherwise.
    A falsy ``domain`` defaults to the MOZMAIL_DOMAIN entry of
    get_domains_from_settings().
    """
    if not domain:
        domain = get_domains_from_settings()["MOZMAIL_DOMAIN"]

    if subdomain:
        hashed_text = f"{address}@{subdomain}.{domain}"
    elif domain == settings.RELAY_FIREFOX_DOMAIN:
        hashed_text = f"{address}"
    else:
        hashed_text = f"{address}@{domain}"
    return sha256(hashed_text.encode("utf-8")).hexdigest()
1✔
502

503

504
def address_default():
    """Return a random 9-character address of lowercase letters and digits."""
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=9)
    return "".join(picked)
1✔
506

507

508
def has_bad_words(value):
    """Return True if ``value`` hits an entry of ``emails_config.badwords``.

    Entries are stripped first. Entries of up to 4 characters must equal
    ``value`` exactly; longer entries match as a substring.
    """
    for raw_word in emails_config.badwords:
        word = raw_word.strip()
        if len(word) <= 4:
            if word == value:
                return True
        elif word in value:
            return True
    return False
1✔
516

517

518
def is_blocklisted(value):
    """Return True if ``value`` equals an entry of ``emails_config.blocklist``."""
    for blocked_word in emails_config.blocklist:
        if blocked_word == value:
            return True
    return False
1✔
520

521

522
def get_domain_numerical(domain_address):
    """Return the DOMAIN_CHOICES number for a domain name.

    ``domain_address`` is looked up among the values of
    get_domains_from_settings() to find its setting name, which is then
    looked up among the DOMAIN_CHOICES labels.

    Raises:
        ValueError: if the domain (or its setting name) is not found.
    """
    # get domain name from the address
    setting_names, configured_domains = zip(*get_domains_from_settings().items())
    setting_name = setting_names[configured_domains.index(domain_address)]
    # get domain numerical value from domain name
    numbers, labels = zip(*dict(DOMAIN_CHOICES).items())
    return numbers[labels.index(setting_name)]
1✔
533

534

535
def hash_subdomain(subdomain, domain=settings.MOZMAIL_DOMAIN):
    """Return the SHA-256 hex digest of ``subdomain.domain``.

    Note that the default ``domain`` is read from settings once, when this
    function is defined.
    """
    full_domain = f"{subdomain}.{domain}"
    digest = sha256(full_domain.encode("utf-8"))
    return digest.hexdigest()
1✔
537

538

539
class RegisteredSubdomain(models.Model):
    """A claimed subdomain, stored only as a hash.

    Rows are created by Profile.add_subdomain() and consulted by
    valid_available_subdomain() to reject subdomains that are already taken.
    """

    # Hex digest produced by hash_subdomain() (64 characters for SHA-256)
    subdomain_hash = models.CharField(max_length=64, db_index=True, unique=True)
    registered_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return self.subdomain_hash
×
545

546

547
class CannotMakeSubdomainException(BadRequest):
    """Exception raised by Profile due to error on subdomain creation.

    Attributes:
        message -- optional explanation of the error
    """

    def __init__(self, message=None):
        # Forward the message to the base class so that str(exc), exc.args,
        # logging and pickling carry it too; omit it entirely when None so
        # str(exc) stays empty rather than becoming "None".
        if message is None:
            super().__init__()
        else:
            super().__init__(message)
        self.message = message
1✔
556

557

558
class CannotMakeAddressException(RelayAPIException):
    """Common parent of the errors raised when a mask cannot be created.

    Subclasses cover both RelayAddress and DomainAddress creation failures.
    """
560

561

562
class AccountIsPausedException(CannotMakeAddressException):
    """Raised when a flagged (paused) profile tries to create a mask."""

    status_code = 403
    default_code = "account_is_paused"
    default_detail = "Your account is on pause."
1✔
566

567

568
class RelayAddrFreeTierLimitException(CannotMakeAddressException):
    """Raised when a free user has used up all included email masks.

    Attributes:
        free_tier_limit -- the limit reported in the message and error context
    """

    default_code = "free_tier_limit"
    # The two literals are joined by implicit concatenation, so the second
    # one must start with a space to separate the sentences.
    default_detail_template = (
        "You’ve used all {free_tier_limit} email masks included with your free account."
        " You can reuse an existing mask, but using a unique mask for each account is the most secure option."
    )
    status_code = 403

    def __init__(self, free_tier_limit: Optional[int] = None, *args, **kwargs):
        # Fall back to the configured limit when none (or 0) is passed.
        self.free_tier_limit = free_tier_limit or settings.MAX_NUM_FREE_ALIASES
        self.default_detail = self.default_detail_template.format(
            free_tier_limit=self.free_tier_limit
        )
        super().__init__(*args, **kwargs)

    def error_context(self) -> ErrorContextType:
        """Return the values needed to render a localized error message."""
        return {"free_tier_limit": self.free_tier_limit}
1✔
585

586

587
class DomainAddrFreeTierException(CannotMakeAddressException):
    """Raised when a non-premium user tries to create a custom-subdomain mask."""

    status_code = 403
    default_code = "free_tier_no_subdomain_masks"
    default_detail = "Your free account does not include custom subdomains for masks. To create custom masks, upgrade to Relay Premium."
1✔
591

592

593
class DomainAddrNeedSubdomainException(CannotMakeAddressException):
    """Raised when a user without a subdomain tries to create a custom mask."""

    status_code = 400
    default_code = "need_subdomain"
    default_detail = "Please select a subdomain before creating a custom email address."
1✔
597

598

599
class DomainAddrUnavailableException(CannotMakeAddressException):
    """Error for a requested custom mask name that cannot be created.

    Attributes:
        unavailable_address -- the rejected mask name, echoed in the message
    """

    status_code = 400
    default_code = "address_unavailable"
    default_detail_template = "“{unavailable_address}” could not be created. Please try again with a different mask name."

    def __init__(self, unavailable_address: str, *args, **kwargs):
        self.unavailable_address = unavailable_address
        rendered_detail = self.default_detail_template.format(
            unavailable_address=unavailable_address
        )
        self.default_detail = rendered_detail
        super().__init__(*args, **kwargs)

    def error_context(self) -> ErrorContextType:
        """Return the values needed to render a localized error message."""
        context = {"unavailable_address": self.unavailable_address}
        return context
1✔
613

614

615
class RelayAddress(models.Model):
    """An email mask with a generated local part, owned by a user.

    ``domain`` stores a DOMAIN_CHOICES number; ``domain_value`` resolves it to
    the actual domain name.
    """

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    address = models.CharField(max_length=64, default=address_default, unique=True)
    domain = models.PositiveSmallIntegerField(
        choices=DOMAIN_CHOICES, default=default_domain_numerical
    )
    enabled = models.BooleanField(default=True)
    description = models.CharField(max_length=64, blank=True)
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    last_modified_at = models.DateTimeField(auto_now=True, db_index=True)
    last_used_at = models.DateTimeField(blank=True, null=True)
    num_forwarded = models.PositiveIntegerField(default=0)
    num_blocked = models.PositiveIntegerField(default=0)
    num_level_one_trackers_blocked = models.PositiveIntegerField(default=0, null=True)
    num_replied = models.PositiveIntegerField(default=0)
    num_spam = models.PositiveIntegerField(default=0)
    generated_for = models.CharField(max_length=255, blank=True)
    block_list_emails = models.BooleanField(default=False)
    used_on = models.TextField(default=None, blank=True, null=True)

    def __str__(self):
        return self.address

    def delete(self, *args, **kwargs):
        """Delete the mask, keeping its hash and folding its counters into the profile.

        A DeletedAddress row is recorded so valid_address() can refuse to
        hand out the same address again.
        """
        # TODO: create hard bounce receipt rule in AWS for the address
        # objects.create() already saves the row; no extra save() is needed.
        DeletedAddress.objects.create(
            address_hash=address_hash(self.address, domain=self.domain_value),
            num_forwarded=self.num_forwarded,
            num_blocked=self.num_blocked,
            num_replied=self.num_replied,
            num_spam=self.num_spam,
        )
        profile = Profile.objects.get(user=self.user)
        profile.address_last_deleted = datetime.now(timezone.utc)
        profile.num_address_deleted += 1
        profile.num_email_forwarded_in_deleted_address += self.num_forwarded
        profile.num_email_blocked_in_deleted_address += self.num_blocked
        # Both tracker counters are nullable, hence the "or 0".
        profile.num_level_one_trackers_blocked_in_deleted_address = (
            profile.num_level_one_trackers_blocked_in_deleted_address or 0
        ) + (self.num_level_one_trackers_blocked or 0)
        profile.num_email_replied_in_deleted_address += self.num_replied
        profile.num_email_spam_in_deleted_address += self.num_spam
        profile.save()
        return super().delete(*args, **kwargs)

    def save(self, *args, **kwargs):
        """Save the mask, enforcing creation limits and storage preferences.

        On creation the owner's profile row is locked, the user's right to
        create another address is checked, and the address is regenerated
        until valid_address() accepts it.

        Raises:
            AccountIsPausedException, RelayAddrFreeTierLimitException: from
                check_user_can_make_another_address() on creation.
        """
        if self._state.adding:
            with transaction.atomic():
                locked_profile = Profile.objects.select_for_update().get(user=self.user)
                check_user_can_make_another_address(locked_profile)
                # Pass the domain *name* (not the numeric choice) so the hash
                # matches the one delete() stored in DeletedAddress.
                while not valid_address(self.address, self.domain_value):
                    self.address = address_default()
                locked_profile.update_abuse_metric(address_created=True)
        profile = self.user.profile
        if not profile.server_storage:
            self.description = ""
            self.generated_for = ""
            self.used_on = ""
        if not profile.has_premium:
            self.block_list_emails = False
        return super().save(*args, **kwargs)

    @property
    def domain_value(self):
        """Domain name for this mask's ``domain`` choice (None if unknown)."""
        return get_domains_from_settings().get(self.get_domain_display())

    @property
    def full_address(self):
        return "%s@%s" % (self.address, self.domain_value)
1✔
686

687

688
def check_user_can_make_another_address(profile: Profile) -> None:
    """Raise if ``profile`` is not allowed to create another relay address.

    Raises:
        AccountIsPausedException: the profile is currently flagged.
        RelayAddrFreeTierLimitException: a non-premium profile is at the
            free mask limit.
    """
    if profile.is_flagged:
        raise AccountIsPausedException()
    # MPP-3021: has_premium is tested first so premium users never trigger
    # the at_max_free_aliases DB query
    if not profile.has_premium and profile.at_max_free_aliases:
        raise RelayAddrFreeTierLimitException()
1✔
696

697

698
# Compiled once at import time instead of on every call.
#   - 1-63 characters drawn from lowercase letters, digits, hyphens and dots
#   - the first and last character must not be a hyphen or a dot
_VALID_ADDRESS_REGEX = re.compile(r"(?![-.])[a-z0-9.-]{1,63}(?<![-.])")


def valid_address_pattern(address):
    """Return True if ``address`` is a syntactically valid local part.

    A valid address is 1-63 characters long, consists only of lowercase
    ASCII letters, digits, hyphens and dots, and neither starts nor ends
    with a hyphen or a dot.

    ``fullmatch`` is used rather than ``match`` with a ``$`` anchor because
    ``$`` also matches just before a trailing newline, which would let a
    value such as ``"abc\\n"`` pass validation.
    """
    return _VALID_ADDRESS_REGEX.fullmatch(address) is not None
1✔
703

704

705
def valid_address(address, domain):
    """Return True if ``address`` may be used as a new address on ``domain``.

    An address is rejected when any of the following holds:

    * it fails ``valid_address_pattern``;
    * ``has_bad_words(address)`` is truthy;
    * ``is_blocklisted(address)`` is truthy;
    * a ``DeletedAddress`` row already exists for its hash on ``domain``.

    The in-memory checks run first so the database is only queried for
    addresses that pass them.
    """
    if not valid_address_pattern(address):
        return False
    if has_bad_words(address) or is_blocklisted(address):
        return False
    # exists() asks the database only whether a matching row is present,
    # instead of counting every matching row.
    previously_deleted = DeletedAddress.objects.filter(
        address_hash=address_hash(address, domain=domain)
    ).exists()
    return not previously_deleted
1✔
720

721

722
class DeletedAddress(models.Model):
    """Record kept for an address that has been deleted.

    Only a hash of the address is stored, together with the usage counters
    the address had when the row was created.
    """

    # Indexed so lookups by hash (e.g. when validating a new address) are cheap.
    address_hash = models.CharField(max_length=64, db_index=True)

    # Usage counters copied from the deleted address.
    num_forwarded = models.PositiveIntegerField(default=0)
    num_blocked = models.PositiveIntegerField(default=0)
    num_replied = models.PositiveIntegerField(default=0)
    num_spam = models.PositiveIntegerField(default=0)

    def __str__(self):
        """Use the stored hash as the human-readable representation."""
        return self.address_hash
×
731

732

733
def check_user_can_make_domain_address(user_profile: Profile) -> None:
    """Raise if ``user_profile`` may not create a domain address.

    The checks run in this order, and the first failing one wins:

    Raises:
        DomainAddrFreeTierException: the profile has no premium.
        DomainAddrNeedSubdomainException: the profile has no subdomain set.
        AccountIsPausedException: the profile is flagged.
    """
    if not user_profile.has_premium:
        raise DomainAddrFreeTierException()
    elif not user_profile.subdomain:
        raise DomainAddrNeedSubdomainException()
    elif user_profile.is_flagged:
        raise AccountIsPausedException()
1✔
742

743

744
class DomainAddress(models.Model):
    """An address on a user's own subdomain.

    The complete email address is ``<address>@<subdomain>.<domain_value>``,
    where the subdomain comes from the owner's Profile. Creating one
    requires a premium profile with a subdomain set (see
    ``check_user_can_make_domain_address``).
    """

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    address = models.CharField(
        max_length=64, validators=[MinLengthValidator(limit_value=1)]
    )
    enabled = models.BooleanField(default=True)
    description = models.CharField(max_length=64, blank=True)
    domain = models.PositiveSmallIntegerField(choices=DOMAIN_CHOICES, default=2)
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    first_emailed_at = models.DateTimeField(null=True, db_index=True)
    last_modified_at = models.DateTimeField(auto_now=True, db_index=True)
    last_used_at = models.DateTimeField(blank=True, null=True)
    num_forwarded = models.PositiveIntegerField(default=0)
    num_blocked = models.PositiveIntegerField(default=0)
    num_level_one_trackers_blocked = models.PositiveIntegerField(default=0, null=True)
    num_replied = models.PositiveIntegerField(default=0)
    num_spam = models.PositiveIntegerField(default=0)
    block_list_emails = models.BooleanField(default=False)
    used_on = models.TextField(default=None, blank=True, null=True)

    class Meta:
        unique_together = ["user", "address"]

    def __str__(self):
        return self.address

    def save(self, *args, **kwargs) -> None:
        """Persist the address, enforcing creation rules and profile prefs.

        For a brand-new row (``self._state.adding``) the owner must pass
        ``check_user_can_make_domain_address``, the address must match
        ``valid_address_pattern`` and must not be flagged by
        ``has_bad_words``; the owner's abuse metric for address creation is
        then bumped.

        On every save, ``block_list_emails`` is forced to False for owners
        without premium, and ``description`` is blanked when the owner's
        profile has ``server_storage`` off.

        Raises:
            DomainAddrUnavailableException: a new address fails the pattern
                or bad-word check.
            Any exception raised by ``check_user_can_make_domain_address``.
        """
        user_profile = self.user.profile
        if self._state.adding:
            check_user_can_make_domain_address(user_profile)
            pattern_valid = valid_address_pattern(self.address)
            address_contains_badword = has_bad_words(self.address)
            if not pattern_valid or address_contains_badword:
                raise DomainAddrUnavailableException(unavailable_address=self.address)
            user_profile.update_abuse_metric(address_created=True)
        if not user_profile.has_premium:
            self.block_list_emails = False
        if not user_profile.server_storage:
            self.description = ""
        return super().save(*args, **kwargs)

    @property
    def user_profile(self):
        """Fetch the owner's Profile from the database.

        A new Profile instance is queried on every access, so changes made
        to the returned object are not visible through a later access.
        """
        return Profile.objects.get(user=self.user)

    @staticmethod
    def make_domain_address(user_profile, address=None, made_via_email=False):
        """Create and return a DomainAddress owned by ``user_profile.user``.

        Args:
            user_profile: Profile of the owner; must pass
                ``check_user_can_make_domain_address``.
            address: Local part to use. When falsy, ``address_default()``
                supplies one.
            made_via_email: When True, ``first_emailed_at`` is set to the
                current UTC time to mark that the address was created on
                the fly by an incoming email.

        Raises:
            Any exception raised by ``check_user_can_make_domain_address``
            or by ``DomainAddress.save`` (e.g. DomainAddrUnavailableException).
        """
        check_user_can_make_domain_address(user_profile)

        if not address:
            # FIXME: if the alias is randomly generated and has bad words
            # we should retry like make_relay_address does
            # not fixing this now because not sure randomly generated
            # DomainAlias will be a feature
            address = address_default()

        domain_address = DomainAddress.objects.create(
            user=user_profile.user,
            address=address,
        )
        if made_via_email:
            # update first_emailed_at indicating alias generation impromptu.
            domain_address.first_emailed_at = datetime.now(timezone.utc)
            domain_address.save()
        return domain_address

    def delete(self, *args, **kwargs):
        """Delete the address, keeping a hashed record and profile totals.

        A ``DeletedAddress`` row is created from the address hash and the
        current counters, the owner's Profile deletion statistics are
        updated and saved, and then the row itself is deleted.
        """
        # TODO: create hard bounce receipt rule in AWS for the address

        # self.user_profile is a property that re-queries on every access and
        # should not be used to update values on the user's profile, so fetch
        # the Profile once here and use that instance for both the subdomain
        # lookup and the counter updates.
        profile = Profile.objects.get(user=self.user)

        # objects.create() already saves the new row; no extra save() needed.
        DeletedAddress.objects.create(
            address_hash=address_hash(
                self.address, profile.subdomain, self.domain_value
            ),
            num_forwarded=self.num_forwarded,
            num_blocked=self.num_blocked,
            num_replied=self.num_replied,
            num_spam=self.num_spam,
        )

        profile.address_last_deleted = datetime.now(timezone.utc)
        profile.num_address_deleted += 1
        profile.num_email_forwarded_in_deleted_address += self.num_forwarded
        profile.num_email_blocked_in_deleted_address += self.num_blocked
        # Both tracker counters are nullable, so treat None as 0.
        profile.num_level_one_trackers_blocked_in_deleted_address = (
            profile.num_level_one_trackers_blocked_in_deleted_address or 0
        ) + (self.num_level_one_trackers_blocked or 0)
        profile.num_email_replied_in_deleted_address += self.num_replied
        profile.num_email_spam_in_deleted_address += self.num_spam
        profile.save()
        return super().delete(*args, **kwargs)

    @property
    def domain_value(self):
        """Look up this address's domain in ``get_domains_from_settings()``.

        The lookup key is the display label of the ``domain`` choice field.
        """
        return get_domains_from_settings().get(self.get_domain_display())

    @property
    def full_address(self):
        """Return ``<address>@<owner subdomain>.<domain_value>``."""
        return f"{self.address}@{self.user_profile.subdomain}.{self.domain_value}"
850

851

852
class Reply(models.Model):
    """Stored reply metadata, linked to a RelayAddress or a DomainAddress.

    Both foreign keys are nullable; the ``address`` property returns the
    relay address when it is set and the domain address otherwise.
    """

    relay_address = models.ForeignKey(
        RelayAddress, on_delete=models.CASCADE, blank=True, null=True
    )
    domain_address = models.ForeignKey(
        DomainAddress, on_delete=models.CASCADE, blank=True, null=True
    )
    lookup = models.CharField(max_length=255, blank=False, db_index=True)
    encrypted_metadata = models.TextField(blank=False)
    created_at = models.DateField(auto_now_add=True, null=False, db_index=True)

    @property
    def address(self):
        """Return the linked relay address, else the linked domain address."""
        linked = self.relay_address
        return linked if linked else self.domain_address

    @property
    def profile(self):
        """Return the Profile of the user who owns the linked address."""
        owner = self.address.user
        return owner.profile

    @property
    def owner_has_premium(self):
        """Return ``has_premium`` of the owning profile."""
        return self.profile.has_premium

    def increment_num_replied(self):
        """Count one more reply on the linked address and return the total.

        ``last_used_at`` is set to the current UTC time, and only the two
        touched fields are written back.
        """
        target = self.address
        target.num_replied += 1
        target.last_used_at = datetime.now(timezone.utc)
        target.save(update_fields=["num_replied", "last_used_at"])
        return target.num_replied
1✔
881

882

883
class AbuseMetrics(models.Model):
    """Per-user activity counters, unique per (user, first_recorded)."""

    user = models.ForeignKey(User, on_delete=models.CASCADE)

    # Both timestamps are filled in automatically when the row is created.
    first_recorded = models.DateTimeField(auto_now_add=True, db_index=True)
    last_recorded = models.DateTimeField(auto_now_add=True, db_index=True)

    # PositiveSmallIntegerField: values from 0 to 32767 are safe in all
    # databases supported by Django.
    num_address_created_per_day = models.PositiveSmallIntegerField(default=0)
    num_replies_per_day = models.PositiveSmallIntegerField(default=0)
    num_email_forwarded_per_day = models.PositiveSmallIntegerField(default=0)

    # PositiveBigIntegerField: values from 0 to 9223372036854775807 are safe
    # in all databases supported by Django.
    forwarded_email_size_per_day = models.PositiveBigIntegerField(default=0)

    class Meta:
        unique_together = ["user", "first_recorded"]
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc