• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 93411a61-2999-4895-b247-e93f842aa7c3

30 Nov 2023 06:57PM CUT coverage: 73.596% (-0.02%) from 73.614%
93411a61-2999-4895-b247-e93f842aa7c3

push

circleci

web-flow
Merge pull request #4173 from mozilla/MPP-3594-long-masks-overflow

MPP-3594: Fix long masks overflow in free onboarding

1970 of 2915 branches covered (0.0%)

Branch coverage included in aggregate %.

6261 of 8269 relevant lines covered (75.72%)

19.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.82
/emails/models.py
1
from collections import namedtuple
1✔
2
from datetime import datetime, timedelta, timezone
1✔
3
from hashlib import sha256
1✔
4
from typing import Optional
1✔
5
import logging
1✔
6
import random
1✔
7
import re
1✔
8
import string
1✔
9
import uuid
1✔
10
from django.apps import apps
1✔
11
from django.conf import settings
1✔
12
from django.contrib.auth.models import User
1✔
13
from django.core.exceptions import BadRequest
1✔
14
from django.core.validators import MinLengthValidator
1✔
15
from django.db import models, transaction
1✔
16
from django.dispatch import receiver
1✔
17
from django.utils.translation.trans_real import (
1✔
18
    parse_accept_lang_header,
19
    get_supported_language_variant,
20
)
21

22
from rest_framework.authtoken.models import Token
1✔
23

24
from api.exceptions import ErrorContextType, RelayAPIException
1✔
25
from privaterelay.plans import get_premium_countries
1✔
26
from privaterelay.utils import (
1✔
27
    AcceptLanguageError,
28
    flag_is_active_in_task,
29
    guess_country_from_accept_lang,
30
)
31

32

33
# App config of the "emails" app; its `badwords` and `blocklist` attributes are
# read by has_bad_words() and is_blocklisted() below.
emails_config = apps.get_app_config("emails")
logger = logging.getLogger("events")
# Receives the "Abuse flagged" records written by Profile.update_abuse_metric()
abuse_logger = logging.getLogger("abusemetrics")

# Return type of Profile.check_bounce_pause(): `paused` is a bool and `type` is
# "hard", "soft" or "" (empty when not paused).
BounceStatus = namedtuple("BounceStatus", "paused type")
38

39

40
def get_domains_from_settings():
    """Map each domain setting name to the email domain currently in use.

    Returns a dict with the keys "RELAY_FIREFOX_DOMAIN" and "MOZMAIL_DOMAIN".
    Fixed placeholder domains are returned when running under Django tests.
    """
    # HACK: detect if code is running in django tests
    running_in_tests = "testserver" in settings.ALLOWED_HOSTS
    if running_in_tests:
        relay_firefox_domain = "default.com"
        mozmail_domain = "test.com"
    else:
        relay_firefox_domain = settings.RELAY_FIREFOX_DOMAIN
        mozmail_domain = settings.MOZMAIL_DOMAIN
    return {
        "RELAY_FIREFOX_DOMAIN": relay_firefox_domain,
        "MOZMAIL_DOMAIN": mozmail_domain,
    }
48

49

50
# (stored integer, settings name) pairs; used as the `choices` of
# RelayAddress.domain and resolved to a real domain via get_domains_from_settings().
DOMAIN_CHOICES = [(1, "RELAY_FIREFOX_DOMAIN"), (2, "MOZMAIL_DOMAIN")]
# Users whose account email ends with "@<one of these>" are treated as premium
# by Profile.has_premium.
PREMIUM_DOMAINS = ["mozilla.com", "getpocket.com", "mozillafoundation.org"]
52

53

54
def valid_available_subdomain(subdomain, *args, **kwargs):
    """Validate that ``subdomain`` is well-formed and not already registered.

    Used both as a model-field validator and called directly. Extra positional
    and keyword arguments are accepted and ignored.

    Returns:
        True when the subdomain can be used.

    Raises:
        CannotMakeSubdomainException: "error-subdomain-cannot-be-empty-or-null"
            when ``subdomain`` is empty or None; "error-subdomain-not-available"
            when it is malformed, contains a bad word, is blocklisted, or its
            hash is already present in RegisteredSubdomain.
    """
    if not subdomain:
        raise CannotMakeSubdomainException("error-subdomain-cannot-be-empty-or-null")
    subdomain = subdomain.lower()
    # valid subdomains:
    #   can't start or end with a hyphen
    #   must be 1-63 alphanumeric characters and/or hyphens
    # fullmatch() is used instead of match() with "$" because "$" also matches
    # just before a trailing newline, which would let "name\n" through.
    well_formed = re.fullmatch(r"(?!-)[a-z0-9-]{1,63}(?<!-)", subdomain) is not None
    unavailable = (
        not well_formed
        #   can't have "bad" words in them
        or has_bad_words(subdomain)
        #   can't have "blocked" words in them
        or is_blocklisted(subdomain)
        #   can't be taken by someone else (checked last: it is the only DB query)
        or RegisteredSubdomain.objects.filter(
            subdomain_hash=hash_subdomain(subdomain)
        ).exists()
    )
    if unavailable:
        raise CannotMakeSubdomainException("error-subdomain-not-available")
    return True
77

78

79
# This historical function is referenced in migration 0029_profile_add_deleted_metric_and_changeserver_storage_default
80
def default_server_storage():
    """Return True; kept only because a historical migration imports this name."""
    return True
82

83

84
def default_domain_numerical():
    """Return the DOMAIN_CHOICES number of the configured MOZMAIL_DOMAIN.

    Used as the default for RelayAddress.domain.
    """
    mozmail_domain = get_domains_from_settings()["MOZMAIL_DOMAIN"]
    return get_domain_numerical(mozmail_domain)
88

89

90
class Profile(models.Model):
    """Relay-specific state stored one-to-one with a Django ``User``.

    Holds subscription dates, bounce and abuse-flag timestamps, counters
    carried over from deleted masks, the optional subdomain, and user
    preferences. Derived values (premium status, totals, bounce status) are
    exposed as properties; many of them query the database on each access.
    """

    user = models.OneToOneField(User, on_delete=models.CASCADE)
    api_token = models.UUIDField(default=uuid.uuid4)
    num_address_deleted = models.PositiveIntegerField(default=0)
    date_subscribed = models.DateTimeField(blank=True, null=True)
    date_subscribed_phone = models.DateTimeField(blank=True, null=True)
    # TODO: delete date_phone_subscription_checked in favor of date_phone_subscription_next_reset
    date_phone_subscription_checked = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_start = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_reset = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_end = models.DateTimeField(blank=True, null=True)
    address_last_deleted = models.DateTimeField(blank=True, null=True, db_index=True)
    last_soft_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
    last_hard_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
    last_account_flagged = models.DateTimeField(blank=True, null=True, db_index=True)
    # Counters accumulated from masks at the time they were deleted
    num_email_forwarded_in_deleted_address = models.PositiveIntegerField(default=0)
    num_email_blocked_in_deleted_address = models.PositiveIntegerField(default=0)
    num_level_one_trackers_blocked_in_deleted_address = models.PositiveIntegerField(
        default=0, null=True
    )
    num_email_replied_in_deleted_address = models.PositiveIntegerField(default=0)
    num_email_spam_in_deleted_address = models.PositiveIntegerField(default=0)
    subdomain = models.CharField(
        blank=True,
        null=True,
        unique=True,
        max_length=63,
        db_index=True,
        validators=[valid_available_subdomain],
    )
    # Whether we store the user's alias labels in the server
    server_storage = models.BooleanField(default=True)
    # Whether we store the caller/sender log for the user's relay number
    store_phone_log = models.BooleanField(default=True)
    # TODO: Data migration to set null to false
    # TODO: Schema migration to remove null=True
    remove_level_one_email_trackers = models.BooleanField(null=True, default=False)
    onboarding_state = models.PositiveIntegerField(default=0)
    onboarding_free_state = models.PositiveIntegerField(default=0)
    auto_block_spam = models.BooleanField(default=False)
    forwarded_first_reply = models.BooleanField(default=False)
    # Empty string means the profile was created through relying party flow
    created_by = models.CharField(blank=True, null=True, max_length=63)
    sent_welcome_email = models.BooleanField(default=False)

    def __str__(self):
        return f"{self.user} Profile"

    def save(self, *args, **kwargs):
        """Save the profile, then delete data the user has opted out of storing.

        Returns whatever ``models.Model.save`` returns.
        """
        # always lower-case the subdomain before saving it
        # TODO: change subdomain field as a custom field inheriting from
        # CharField to validate constraints on the field update too
        if self.subdomain and not self.subdomain.islower():
            self.subdomain = self.subdomain.lower()
        ret = super().save(*args, **kwargs)
        # any time a profile is saved with server_storage False, delete the
        # appropriate server-stored Relay address data.
        if not self.server_storage:
            RelayAddress.objects.filter(user=self.user).update(
                description="", generated_for="", used_on=""
            )
        if settings.PHONES_ENABLED:
            # any time a profile is saved with store_phone_log False, delete the
            # appropriate server-stored InboundContact records
            from phones.models import InboundContact, RelayNumber

            if not self.store_phone_log:
                try:
                    relay_number = RelayNumber.objects.get(user=self.user)
                except RelayNumber.DoesNotExist:
                    # no relay number, so there is no phone log to delete
                    pass
                else:
                    InboundContact.objects.filter(relay_number=relay_number).delete()
        return ret

    @property
    def language(self):
        """Return the first supported language from the account locale, else "en"."""
        fxa = self.fxa
        locale = fxa.extra_data.get("locale") if fxa else None
        if locale:
            for accept_lang, _ in parse_accept_lang_header(locale):
                try:
                    return get_supported_language_variant(accept_lang)
                except LookupError:
                    continue
        return "en"

    # This method returns whether the locale associated with the user's Mozilla account
    # includes a country code from a Premium country. This is less accurate than using
    # get_countries_info_from_request_and_mapping(), which can use a GeoIP lookup, so
    # prefer using that if a request context is available. In other contexts, for
    # example when sending an email, this method can be useful.
    @property
    def fxa_locale_in_premium_country(self) -> bool:
        fxa = self.fxa
        locale = fxa.extra_data.get("locale") if fxa else None
        if not locale:
            return False
        try:
            country = guess_country_from_accept_lang(locale)
        except AcceptLanguageError:
            return False
        return country in get_premium_countries()

    @property
    def avatar(self):
        """Return the account's "avatar" value, or None if unset or no account is linked."""
        fxa = self.fxa
        # Profiles without an "fxa" social account exist (see has_premium), so
        # guard instead of raising AttributeError on None.
        return fxa.extra_data.get("avatar") if fxa else None

    @property
    def relay_addresses(self):
        return RelayAddress.objects.filter(user=self.user)

    @property
    def domain_addresses(self):
        return DomainAddress.objects.filter(user=self.user)

    @property
    def total_masks(self) -> int:
        """Return the number of relay addresses plus domain addresses."""
        return self.relay_addresses.count() + self.domain_addresses.count()

    @property
    def at_mask_limit(self) -> bool:
        """Return True when a non-premium user has used all free relay addresses."""
        if self.has_premium:
            return False
        return self.at_max_free_aliases

    def check_bounce_pause(self):
        """Return the current BounceStatus, clearing bounce timestamps that expired.

        A hard bounce is checked before a soft bounce. When a recorded bounce is
        older than its allowed window, the timestamp is reset to None and the
        profile is saved.
        """
        if self.last_hard_bounce:
            last_hard_bounce_allowed = datetime.now(timezone.utc) - timedelta(
                days=settings.HARD_BOUNCE_ALLOWED_DAYS
            )
            if self.last_hard_bounce > last_hard_bounce_allowed:
                return BounceStatus(True, "hard")
            self.last_hard_bounce = None
            self.save()
        if self.last_soft_bounce:
            last_soft_bounce_allowed = datetime.now(timezone.utc) - timedelta(
                days=settings.SOFT_BOUNCE_ALLOWED_DAYS
            )
            if self.last_soft_bounce > last_soft_bounce_allowed:
                return BounceStatus(True, "soft")
            self.last_soft_bounce = None
            self.save()
        return BounceStatus(False, "")

    @property
    def bounce_status(self):
        # NOTE: reading this property can write to the database, because
        # check_bounce_pause() clears and saves expired bounce timestamps.
        return self.check_bounce_pause()

    @property
    def next_email_try(self):
        """Return when mail may next be sent: now if not paused, else pause end."""
        bounce_pause, bounce_type = self.check_bounce_pause()

        if not bounce_pause:
            return datetime.now(timezone.utc)

        if bounce_type == "soft":
            return self.last_soft_bounce + timedelta(
                days=settings.SOFT_BOUNCE_ALLOWED_DAYS
            )

        return self.last_hard_bounce + timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS)

    @property
    def last_bounce_date(self):
        """Return the hard bounce date if set, else the soft bounce date, else None."""
        if self.last_hard_bounce:
            return self.last_hard_bounce
        if self.last_soft_bounce:
            return self.last_soft_bounce
        return None

    @property
    def at_max_free_aliases(self) -> bool:
        """Return True when the relay address count reached MAX_NUM_FREE_ALIASES."""
        return self.relay_addresses.count() >= settings.MAX_NUM_FREE_ALIASES

    @property
    def fxa(self):
        """Return the user's social account with provider "fxa", or None."""
        # Note: we are NOT using .filter() here because it invalidates
        # any profile instances that were queried with prefetch_related, which
        # we use in at least the profile view to minimize queries
        for sa in self.user.socialaccount_set.all():
            if sa.provider == "fxa":
                return sa
        return None

    @property
    def display_name(self):
        """Return the account's "displayName", or None if unset or no account is linked."""
        # if display name is not set on FxA the
        # displayName key will not exist on the extra_data
        fxa = self.fxa
        return fxa.extra_data.get("displayName") if fxa else None

    @property
    def custom_domain(self):
        return f"@{self.subdomain}.{settings.MOZMAIL_DOMAIN}"

    @property
    def has_premium(self):
        """Return True for premium-domain emails or an unlimited subscription."""
        # FIXME: as we don't have all the tiers defined we are over-defining
        # this to mark the user as a premium user as well
        fxa = self.fxa
        if not fxa:
            return False
        premium_suffixes = tuple(f"@{domain}" for domain in PREMIUM_DOMAINS)
        if self.user.email.endswith(premium_suffixes):
            return True
        user_subscriptions = fxa.extra_data.get("subscriptions", [])
        return any(
            sub in user_subscriptions for sub in settings.SUBSCRIPTIONS_WITH_UNLIMITED
        )

    @property
    def has_phone(self):
        """Return True when the user may use phone masking."""
        fxa = self.fxa
        if not fxa:
            return False
        if settings.RELAY_CHANNEL != "prod" and not settings.IN_PYTEST:
            if not flag_is_active_in_task("phones", self.user):
                return False
        if flag_is_active_in_task("free_phones", self.user):
            return True
        user_subscriptions = fxa.extra_data.get("subscriptions", [])
        return any(
            sub in user_subscriptions for sub in settings.SUBSCRIPTIONS_WITH_PHONE
        )

    @property
    def has_vpn(self):
        """Return True when the account holds a subscription listed as including VPN."""
        fxa = self.fxa
        if not fxa:
            return False
        user_subscriptions = fxa.extra_data.get("subscriptions", [])
        return any(sub in user_subscriptions for sub in settings.SUBSCRIPTIONS_WITH_VPN)

    @property
    def emails_forwarded(self):
        """Total forwarded emails across current and deleted masks."""
        return (
            sum(ra.num_forwarded for ra in self.relay_addresses)
            + sum(da.num_forwarded for da in self.domain_addresses)
            + self.num_email_forwarded_in_deleted_address
        )

    @property
    def emails_blocked(self):
        """Total blocked emails across current and deleted masks."""
        return (
            sum(ra.num_blocked for ra in self.relay_addresses)
            + sum(da.num_blocked for da in self.domain_addresses)
            + self.num_email_blocked_in_deleted_address
        )

    @property
    def emails_replied(self):
        """Total replies across current and deleted masks."""
        # Once Django is on version 4.0 and above, we can set the default=0
        # and return a int instead of None
        # https://docs.djangoproject.com/en/4.0/ref/models/querysets/#default
        total_num_replied = 0
        for masks in (self.relay_addresses, self.domain_addresses):
            totals = masks.aggregate(models.Sum("num_replied"))
            # the sum is None when there are no rows to aggregate
            total_num_replied += totals.get("num_replied__sum") or 0
        return total_num_replied + self.num_email_replied_in_deleted_address

    @property
    def level_one_trackers_blocked(self):
        """Total level-one trackers blocked across current and deleted masks."""
        # the tracker counters are nullable, hence the `or 0` on each term
        return (
            sum(ra.num_level_one_trackers_blocked or 0 for ra in self.relay_addresses)
            + sum(
                da.num_level_one_trackers_blocked or 0 for da in self.domain_addresses
            )
            + (self.num_level_one_trackers_blocked_in_deleted_address or 0)
        )

    @property
    def joined_before_premium_release(self):
        date_created = self.user.date_joined
        return date_created < datetime.fromisoformat("2021-10-22 17:00:00+00:00")

    def add_subdomain(self, subdomain):
        """Register ``subdomain`` for this premium user and return it lower-cased.

        Raises:
            CannotMakeSubdomainException: if ``subdomain`` is empty/None, the
                user is not premium, a subdomain is already set, or the
                subdomain is invalid or unavailable.
        """
        # Handles if the subdomain is "" or None
        if not subdomain:
            raise CannotMakeSubdomainException(
                "error-subdomain-cannot-be-empty-or-null"
            )

        # subdomain must be all lowercase
        subdomain = subdomain.lower()

        if not self.has_premium:
            raise CannotMakeSubdomainException("error-premium-set-subdomain")
        if self.subdomain is not None:
            raise CannotMakeSubdomainException("error-premium-cannot-change-subdomain")
        # The validator defined in the subdomain field does not get run in full_clean() when self.subdomain is "" or None
        # So we need to run the validator again to catch these cases.
        # It runs before the assignment so a rejected value is never left on
        # the in-memory instance.
        valid_available_subdomain(subdomain)
        self.subdomain = subdomain
        self.full_clean()
        self.save()

        RegisteredSubdomain.objects.create(subdomain_hash=hash_subdomain(subdomain))
        return subdomain

    def update_abuse_metric(
        self,
        address_created=False,
        replied=False,
        email_forwarded=False,
        forwarded_email_size=0,
    ):
        """Record activity in today's AbuseMetrics row and flag the account if over a limit.

        Args:
            address_created: count one more address created today.
            replied: count one more reply today.
            email_forwarded: count one more forwarded email today.
            forwarded_email_size: size added to today's forwarded total when > 0.

        Returns:
            ``self.last_account_flagged`` (set to now if a daily limit was reached).
        """
        #  TODO: this should be wrapped in atomic to ensure race conditions are properly handled
        # look for abuse metrics created on the same UTC date, regardless of time.
        # Truncate an aware UTC datetime: combining a date into a naive datetime
        # and calling astimezone() would interpret it in the server's local zone.
        midnight_utc_today = datetime.now(timezone.utc).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        midnight_utc_tomorrow = midnight_utc_today + timedelta(days=1)
        abuse_metric = self.user.abusemetrics_set.filter(
            first_recorded__gte=midnight_utc_today,
            first_recorded__lt=midnight_utc_tomorrow,
        ).first()
        if not abuse_metric:
            abuse_metric = AbuseMetrics.objects.create(user=self.user)
            # drops rows from before today (this filter is not limited to this user)
            AbuseMetrics.objects.filter(first_recorded__lt=midnight_utc_today).delete()

        # increment the abuse metric
        if address_created:
            abuse_metric.num_address_created_per_day += 1
        if replied:
            abuse_metric.num_replies_per_day += 1
        if email_forwarded:
            abuse_metric.num_email_forwarded_per_day += 1
        if forwarded_email_size > 0:
            abuse_metric.forwarded_email_size_per_day += forwarded_email_size
        abuse_metric.last_recorded = datetime.now(timezone.utc)
        abuse_metric.save()

        # check user should be flagged for abuse
        hit_max_create = (
            abuse_metric.num_address_created_per_day
            >= settings.MAX_ADDRESS_CREATION_PER_DAY
        )
        hit_max_replies = (
            abuse_metric.num_replies_per_day >= settings.MAX_REPLIES_PER_DAY
        )
        hit_max_forwarded = (
            abuse_metric.num_email_forwarded_per_day >= settings.MAX_FORWARDED_PER_DAY
        )
        hit_max_forwarded_email_size = (
            abuse_metric.forwarded_email_size_per_day
            >= settings.MAX_FORWARDED_EMAIL_SIZE_PER_DAY
        )
        if (
            hit_max_create
            or hit_max_replies
            or hit_max_forwarded
            or hit_max_forwarded_email_size
        ):
            self.last_account_flagged = datetime.now(timezone.utc)
            self.save()
            fxa = self.fxa
            data = {
                # a profile may have no linked account; log None rather than crash
                "uid": fxa.uid if fxa else None,
                "flagged": self.last_account_flagged.timestamp(),
                "replies": abuse_metric.num_replies_per_day,
                "addresses": abuse_metric.num_address_created_per_day,
                "forwarded": abuse_metric.num_email_forwarded_per_day,
                "forwarded_size_in_bytes": abuse_metric.forwarded_email_size_per_day,
            }
            # log for further secops review
            abuse_logger.info("Abuse flagged", extra=data)
        return self.last_account_flagged

    @property
    def is_flagged(self):
        """Return True while the pause that follows an abuse flag is still running."""
        if not self.last_account_flagged:
            return False
        account_premium_feature_resumed = self.last_account_flagged + timedelta(
            days=settings.PREMIUM_FEATURE_PAUSED_DAYS
        )
        # flagged until the pause period is over; afterwards the feature resumes
        return datetime.now(timezone.utc) <= account_premium_feature_resumed
481

482

483
@receiver(models.signals.post_save, sender=Profile)
def copy_auth_token(sender, instance=None, created=False, **kwargs):
    """Create a DRF Token keyed by the profile's api_token when a Profile is created.

    Does nothing for updates, or when the user already has a Token.
    """
    if not created:
        return
    # baker triggers created during tests
    # so only create a Token when the user doesn't already have one
    Token.objects.get_or_create(
        user=instance.user, defaults={"key": instance.api_token}
    )
493

494

495
def address_hash(address, subdomain=None, domain=None):
    """Return the SHA-256 hex digest identifying a mask address.

    The hashed text is "address@subdomain.domain" when a subdomain is given,
    just "address" when the domain is settings.RELAY_FIREFOX_DOMAIN, and
    "address@domain" otherwise. ``domain`` defaults to the MOZMAIL_DOMAIN
    entry of get_domains_from_settings().
    """
    if not domain:
        domain = get_domains_from_settings()["MOZMAIL_DOMAIN"]

    if subdomain:
        text_to_hash = f"{address}@{subdomain}.{domain}"
    elif domain == settings.RELAY_FIREFOX_DOMAIN:
        text_to_hash = f"{address}"
    else:
        text_to_hash = f"{address}@{domain}"
    return sha256(text_to_hash.encode("utf-8")).hexdigest()
503

504

505
def address_default():
    """Return a random 9-character string of lowercase ASCII letters and digits."""
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=9)
    return "".join(picked)
507

508

509
def has_bad_words(value):
    """Return True when ``value`` hits an entry of emails_config.badwords.

    Bad words of up to 4 characters must equal ``value`` exactly; longer ones
    match anywhere inside it.
    """
    for raw_word in emails_config.badwords:
        word = raw_word.strip()
        if len(word) > 4:
            if word in value:
                return True
        elif word == value:
            return True
    return False
517

518

519
def is_blocklisted(value):
    """Return True when ``value`` equals an entry of emails_config.blocklist."""
    for blocked_word in emails_config.blocklist:
        if blocked_word == value:
            return True
    return False
521

522

523
def get_domain_numerical(domain_address):
    """Return the DOMAIN_CHOICES number for the given domain string.

    Raises:
        ValueError: if ``domain_address`` is not one of the domains returned
            by get_domains_from_settings().
    """
    # get domain name from the address
    domains = get_domains_from_settings()
    position = list(domains.values()).index(domain_address)
    domain_name = list(domains.keys())[position]
    # get domain numerical value from domain name
    numbers, names = [], []
    for number, name in dict(DOMAIN_CHOICES).items():
        numbers.append(number)
        names.append(name)
    return numbers[names.index(domain_name)]
534

535

536
def hash_subdomain(subdomain, domain=settings.MOZMAIL_DOMAIN):
    """Return the SHA-256 hex digest of "<subdomain>.<domain>".

    Note that the default ``domain`` is read from settings once, when this
    module is imported.
    """
    full_domain = f"{subdomain}.{domain}"
    digest = sha256(full_domain.encode("utf-8"))
    return digest.hexdigest()
538

539

540
class RegisteredSubdomain(models.Model):
    # Record of a claimed subdomain, stored only as a hash.
    # Rows are created by Profile.add_subdomain() and consulted by
    # valid_available_subdomain() to reject subdomains that are already taken.

    # hex digest produced by hash_subdomain()
    subdomain_hash = models.CharField(max_length=64, db_index=True, unique=True)
    registered_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return self.subdomain_hash
546

547

548
class CannotMakeSubdomainException(BadRequest):
    """Exception raised by Profile due to error on subdomain creation.

    Also raised by valid_available_subdomain(). In this file the message is
    always an error identifier string such as "error-subdomain-not-available".

    Attributes:
        message -- optional explanation of the error
    """

    def __init__(self, message=None):
        self.message = message
557

558

559
class CannotMakeAddressException(RelayAPIException):
    """Base exception for RelayAddress or DomainAddress creation failure.

    Subclasses set default_code, default_detail and status_code.
    """
561

562

563
class AccountIsPausedException(CannotMakeAddressException):
    # Raised by check_user_can_make_another_address() and
    # check_user_can_make_domain_address() when profile.is_flagged is true.
    default_code = "account_is_paused"
    default_detail = "Your account is on pause."
    status_code = 403
567

568

569
class RelayAddrFreeTierLimitException(CannotMakeAddressException):
    """Raised when a free-tier user already has the maximum number of masks.

    Attributes:
        free_tier_limit -- the limit quoted in the message; defaults to
            settings.MAX_NUM_FREE_ALIASES when not given (or given as 0/None)
    """

    default_code = "free_tier_limit"
    # Adjacent literals are concatenated as-is, so the second one starts with a
    # space to keep the two sentences apart.
    default_detail_template = (
        "You’ve used all {free_tier_limit} email masks included with your free account."
        " You can reuse an existing mask, but using a unique mask for each account is the most secure option."
    )
    status_code = 403

    def __init__(self, free_tier_limit: Optional[int] = None, *args, **kwargs):
        self.free_tier_limit = free_tier_limit or settings.MAX_NUM_FREE_ALIASES
        # default_detail is set per instance, before the base class initializer runs
        self.default_detail = self.default_detail_template.format(
            free_tier_limit=self.free_tier_limit
        )
        super().__init__(*args, **kwargs)

    def error_context(self) -> ErrorContextType:
        """Return the values interpolated into the message."""
        return {"free_tier_limit": self.free_tier_limit}
586

587

588
class DomainAddrFreeTierException(CannotMakeAddressException):
    # Raised by check_user_can_make_domain_address() when the profile does not
    # have premium.
    default_code = "free_tier_no_subdomain_masks"
    default_detail = "Your free account does not include custom subdomains for masks. To create custom masks, upgrade to Relay Premium."
    status_code = 403
592

593

594
class DomainAddrNeedSubdomainException(CannotMakeAddressException):
    # Raised by check_user_can_make_domain_address() when a premium profile has
    # no subdomain set yet.
    default_code = "need_subdomain"
    default_detail = "Please select a subdomain before creating a custom email address."
    status_code = 400
598

599

600
class DomainAddrUnavailableException(CannotMakeAddressException):
    # Error for a requested mask name that cannot be created; the name is
    # interpolated into the detail message and returned by error_context().
    # NOTE(review): not raised in the part of the file shown here — presumably
    # used by DomainAddress; confirm.
    default_code = "address_unavailable"
    default_detail_template = "“{unavailable_address}” could not be created. Please try again with a different mask name."
    status_code = 400

    def __init__(self, unavailable_address: str, *args, **kwargs):
        self.unavailable_address = unavailable_address
        # default_detail is set per instance, before the base class initializer runs
        self.default_detail = self.default_detail_template.format(
            unavailable_address=self.unavailable_address
        )
        super().__init__(*args, **kwargs)

    def error_context(self) -> ErrorContextType:
        return {"unavailable_address": self.unavailable_address}
614

615

616
class RelayAddress(models.Model):
    """An email mask owned by a user, on one of the DOMAIN_CHOICES domains.

    The address defaults to a random string from address_default(). Deleting
    a mask records its hash in DeletedAddress and folds its counters into the
    owner's Profile.
    """

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    address = models.CharField(max_length=64, default=address_default, unique=True)
    # numeric key from DOMAIN_CHOICES; see domain_value for the domain string
    domain = models.PositiveSmallIntegerField(
        choices=DOMAIN_CHOICES, default=default_domain_numerical
    )
    enabled = models.BooleanField(default=True)
    description = models.CharField(max_length=64, blank=True)
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    last_modified_at = models.DateTimeField(auto_now=True, db_index=True)
    last_used_at = models.DateTimeField(blank=True, null=True)
    num_forwarded = models.PositiveIntegerField(default=0)
    num_blocked = models.PositiveIntegerField(default=0)
    num_level_one_trackers_blocked = models.PositiveIntegerField(default=0, null=True)
    num_replied = models.PositiveIntegerField(default=0)
    num_spam = models.PositiveIntegerField(default=0)
    generated_for = models.CharField(max_length=255, blank=True)
    block_list_emails = models.BooleanField(default=False)
    used_on = models.TextField(default=None, blank=True, null=True)

    def __str__(self):
        return self.address

    def delete(self, *args, **kwargs):
        """Delete the mask after recording its hash and counters.

        Returns whatever ``models.Model.delete`` returns.
        """
        # TODO: create hard bounce receipt rule in AWS for the address
        # objects.create() already saves the row; no extra save() is needed
        DeletedAddress.objects.create(
            address_hash=address_hash(self.address, domain=self.domain_value),
            num_forwarded=self.num_forwarded,
            num_blocked=self.num_blocked,
            num_replied=self.num_replied,
            num_spam=self.num_spam,
        )
        profile = Profile.objects.get(user=self.user)
        profile.address_last_deleted = datetime.now(timezone.utc)
        profile.num_address_deleted += 1
        profile.num_email_forwarded_in_deleted_address += self.num_forwarded
        profile.num_email_blocked_in_deleted_address += self.num_blocked
        # both tracker counters are nullable
        profile.num_level_one_trackers_blocked_in_deleted_address = (
            profile.num_level_one_trackers_blocked_in_deleted_address or 0
        ) + (self.num_level_one_trackers_blocked or 0)
        profile.num_email_replied_in_deleted_address += self.num_replied
        profile.num_email_spam_in_deleted_address += self.num_spam
        profile.save()
        return super().delete(*args, **kwargs)

    def save(self, *args, **kwargs):
        """Save the mask, enforcing creation limits and storage preferences.

        Raises:
            AccountIsPausedException, RelayAddrFreeTierLimitException: on
                creation, via check_user_can_make_another_address().
        """
        if self._state.adding:
            with transaction.atomic():
                # lock the profile row while the limit is checked and the
                # abuse metric is updated
                locked_profile = Profile.objects.select_for_update().get(user=self.user)
                check_user_can_make_another_address(locked_profile)
                # Pass the domain string, not the numeric choice: delete()
                # hashes with domain_value, so the numeric value would never
                # match a DeletedAddress row.
                while not valid_address(self.address, self.domain_value):
                    self.address = address_default()
                locked_profile.update_abuse_metric(address_created=True)
        if not self.user.profile.server_storage:
            self.description = ""
            self.generated_for = ""
            self.used_on = ""
        if not self.user.profile.has_premium:
            self.block_list_emails = False
        return super().save(*args, **kwargs)

    @property
    def domain_value(self):
        """Return the domain string for this mask's numeric ``domain`` choice."""
        return get_domains_from_settings().get(self.get_domain_display())

    @property
    def full_address(self):
        return f"{self.address}@{self.domain_value}"
687

688

689
def check_user_can_make_another_address(profile: Profile) -> None:
    """Raise if ``profile`` may not create another relay address.

    Raises:
        AccountIsPausedException: the profile is currently flagged.
        RelayAddrFreeTierLimitException: a non-premium profile is at the
            free mask limit.
    """
    if profile.is_flagged:
        raise AccountIsPausedException()
    # MPP-3021: has_premium is tested first so premium users short-circuit and
    # skip the at_max_free_aliases DB query
    if not profile.has_premium and profile.at_max_free_aliases:
        raise RelayAddrFreeTierLimitException()
697

698

699
# Compiled once at import time instead of on every call.
#   (?![-.])    the first character can't be a hyphen or a dot
#   [a-z0-9-.]  only lowercase alphanumerics, hyphens and dots are allowed
#   {1,63}      the whole address is 1-63 characters long
#   (?<![-.])   the last character can't be a hyphen or a dot
_VALID_ADDRESS_RE = re.compile(r"(?![-.])[a-z0-9-.]{1,63}(?<![-.])")


def valid_address_pattern(address):
    """Return True if ``address`` is a syntactically valid local part.

    A valid address is 1-63 characters drawn from lowercase letters, digits,
    hyphens and dots, and neither starts nor ends with a hyphen or a dot.

    The whole string must match: ``fullmatch`` is used because a ``$`` anchor
    also matches just before a trailing newline, which would let an address
    such as ``"abc\\n"`` through.
    """
    return _VALID_ADDRESS_RE.fullmatch(address) is not None
1✔
704

705

706
def valid_address(address, domain):
    """Return True if ``address`` may be used on ``domain``.

    The address is rejected when any of the following holds:

    * it does not satisfy ``valid_address_pattern``;
    * ``has_bad_words`` flags it;
    * ``is_blocklisted`` flags it;
    * a ``DeletedAddress`` row exists whose ``address_hash`` equals
      ``address_hash(address, domain=domain)``.

    The in-process checks run first so the database is only queried for
    addresses that pass them, and ``exists()`` replaces a full ``count()``
    because only the presence of a row matters.
    """
    if not valid_address_pattern(address):
        return False
    if has_bad_words(address) or is_blocklisted(address):
        return False
    deleted_hash = address_hash(address, domain=domain)
    return not DeletedAddress.objects.filter(address_hash=deleted_hash).exists()
1✔
721

722

723
class DeletedAddress(models.Model):
    """Record of an address that has been deleted.

    Only a hash of the address is stored, together with usage counters.
    ``valid_address`` queries this table by ``address_hash`` to reject
    addresses that were deleted before.
    """

    # Hash of the deleted address, as produced by address_hash();
    # presumably a SHA-256 hex digest given max_length=64 -- TODO confirm.
    address_hash = models.CharField(max_length=64, db_index=True)
    # Usage counters recorded for the deleted address.
    num_forwarded = models.PositiveIntegerField(default=0)
    num_blocked = models.PositiveIntegerField(default=0)
    num_replied = models.PositiveIntegerField(default=0)
    num_spam = models.PositiveIntegerField(default=0)

    def __str__(self):
        """Use the stored hash as the text representation."""
        return self.address_hash
×
732

733

734
def check_user_can_make_domain_address(user_profile: Profile) -> None:
    """Raise if the owner of ``user_profile`` may not create a domain address.

    The conditions are checked in order and the first failure is raised.

    Raises:
        DomainAddrFreeTierException: the profile does not have premium.
        DomainAddrNeedSubdomainException: the profile has no subdomain.
        AccountIsPausedException: the profile is flagged.
    """
    failure = None
    if not user_profile.has_premium:
        failure = DomainAddrFreeTierException()
    elif not user_profile.subdomain:
        failure = DomainAddrNeedSubdomainException()
    elif user_profile.is_flagged:
        failure = AccountIsPausedException()

    if failure is not None:
        raise failure
1✔
743

744

745
class DomainAddress(models.Model):
    """An address on a user's own subdomain.

    The complete email address is ``<address>@<subdomain>.<domain_value>``,
    where the subdomain comes from the owner's profile. Creating one requires
    passing ``check_user_can_make_domain_address``.
    """

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # Local part of the email address; unique per user (see Meta).
    address = models.CharField(
        max_length=64, validators=[MinLengthValidator(limit_value=1)]
    )
    enabled = models.BooleanField(default=True)
    # Blanked on save when the owner's profile has server_storage disabled.
    description = models.CharField(max_length=64, blank=True)
    # One of DOMAIN_CHOICES; domain_value resolves it to the configured domain.
    domain = models.PositiveSmallIntegerField(choices=DOMAIN_CHOICES, default=2)
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    # Set by make_domain_address() when the address was made via email.
    first_emailed_at = models.DateTimeField(null=True, db_index=True)
    last_modified_at = models.DateTimeField(auto_now=True, db_index=True)
    last_used_at = models.DateTimeField(blank=True, null=True)
    # Usage counters; folded into the owner's profile totals on delete().
    num_forwarded = models.PositiveIntegerField(default=0)
    num_blocked = models.PositiveIntegerField(default=0)
    num_level_one_trackers_blocked = models.PositiveIntegerField(default=0, null=True)
    num_replied = models.PositiveIntegerField(default=0)
    num_spam = models.PositiveIntegerField(default=0)
    # Forced to False on save when the owner does not have premium.
    block_list_emails = models.BooleanField(default=False)
    used_on = models.TextField(default=None, blank=True, null=True)

    class Meta:
        unique_together = ["user", "address"]

    def __str__(self):
        """Use the local part of the address as the text representation."""
        return self.address

    def save(self, *args, **kwargs) -> None:
        """Save the address, validating new ones and applying profile settings.

        When the instance is being added, the owner must pass
        ``check_user_can_make_domain_address`` and the address must satisfy
        ``valid_address_pattern`` and not be flagged by ``has_bad_words``;
        the profile's abuse metric is then updated.

        On every save, ``block_list_emails`` is forced to ``False`` for
        profiles without premium and ``description`` is blanked for profiles
        with ``server_storage`` disabled. If the caller passes
        ``update_fields`` as a keyword argument, any field reset here is added
        to it so the reset is written to the database and not only applied to
        the in-memory instance.

        Raises:
            DomainAddrUnavailableException: a new address has an invalid
                pattern or contains a bad word.
            Whatever ``check_user_can_make_domain_address`` raises.
        """
        user_profile = self.user.profile
        if self._state.adding:
            check_user_can_make_domain_address(user_profile)
            if not valid_address_pattern(self.address) or has_bad_words(self.address):
                raise DomainAddrUnavailableException(unavailable_address=self.address)
            user_profile.update_abuse_metric(address_created=True)

        forced_values = {}
        if not user_profile.has_premium:
            forced_values["block_list_emails"] = False
        if not user_profile.server_storage:
            forced_values["description"] = ""

        # Apply the forced values and remember which fields actually changed.
        reset_fields = set()
        for field_name, forced in forced_values.items():
            if getattr(self, field_name) != forced:
                setattr(self, field_name, forced)
                reset_fields.add(field_name)

        # A partial save would otherwise skip the fields reset above.
        update_fields = kwargs.get("update_fields")
        if reset_fields and update_fields is not None:
            kwargs["update_fields"] = reset_fields.union(update_fields)

        return super().save(*args, **kwargs)

    @property
    def user_profile(self):
        """Fetch the owner's Profile from the database on every access."""
        return Profile.objects.get(user=self.user)

    @staticmethod
    def make_domain_address(user_profile, address=None, made_via_email=False):
        """Create and return a DomainAddress for the owner of ``user_profile``.

        Args:
            user_profile: profile of the user who will own the address.
            address: local part to use; when falsy, one is generated with
                ``address_default()``.
            made_via_email: when true, ``first_emailed_at`` is set to the
                current UTC time and the address is saved again.

        Raises whatever ``check_user_can_make_domain_address`` and
        ``DomainAddress.save`` raise.
        """
        check_user_can_make_domain_address(user_profile)

        if not address:
            # FIXME: if the alias is randomly generated and has bad words
            # we should retry like make_relay_address does
            # not fixing this now because not sure randomly generated
            # DomainAlias will be a feature
            address = address_default()

        domain_address = DomainAddress.objects.create(
            user=user_profile.user,
            address=address,
        )
        if made_via_email:
            # update first_emailed_at indicating alias generation impromptu.
            domain_address.first_emailed_at = datetime.now(timezone.utc)
            domain_address.save()
        return domain_address

    def delete(self, *args, **kwargs):
        """Delete the address, keeping a hashed record and profile totals.

        A ``DeletedAddress`` row is created from the address hash and this
        address's counters, the owner's profile deletion counters are
        increased by this address's counters, and the row is then deleted.
        """
        # TODO: create hard bounce receipt rule in AWS for the address
        # objects.create() already saves the row; no extra save() is needed.
        DeletedAddress.objects.create(
            address_hash=address_hash(
                self.address, self.user_profile.subdomain, self.domain_value
            ),
            num_forwarded=self.num_forwarded,
            num_blocked=self.num_blocked,
            num_replied=self.num_replied,
            num_spam=self.num_spam,
        )
        # self.user_profile is a property and should not be used to
        # update values on the user's profile
        profile = Profile.objects.get(user=self.user)
        profile.address_last_deleted = datetime.now(timezone.utc)
        profile.num_address_deleted += 1
        profile.num_email_forwarded_in_deleted_address += self.num_forwarded
        profile.num_email_blocked_in_deleted_address += self.num_blocked
        # Both tracker counters are nullable, so treat None as 0.
        profile.num_level_one_trackers_blocked_in_deleted_address = (
            profile.num_level_one_trackers_blocked_in_deleted_address or 0
        ) + (self.num_level_one_trackers_blocked or 0)
        profile.num_email_replied_in_deleted_address += self.num_replied
        profile.num_email_spam_in_deleted_address += self.num_spam
        profile.save()
        return super().delete(*args, **kwargs)

    @property
    def domain_value(self):
        """Return the configured domain for this address's ``domain`` choice."""
        return get_domains_from_settings().get(self.get_domain_display())

    @property
    def full_address(self):
        """Return ``<address>@<subdomain>.<domain_value>``."""
        return f"{self.address}@{self.user_profile.subdomain}.{self.domain_value}"
851

852

853
class Reply(models.Model):
    """Reply record attached to a RelayAddress or a DomainAddress.

    Both foreign keys are nullable; the ``address`` property returns the
    relay address when set and the domain address otherwise.
    """

    relay_address = models.ForeignKey(
        RelayAddress, on_delete=models.CASCADE, blank=True, null=True
    )
    domain_address = models.ForeignKey(
        DomainAddress, on_delete=models.CASCADE, blank=True, null=True
    )
    lookup = models.CharField(max_length=255, blank=False, db_index=True)
    encrypted_metadata = models.TextField(blank=False)
    created_at = models.DateField(auto_now_add=True, null=False, db_index=True)

    @property
    def address(self):
        """Return the relay address if set, else the domain address."""
        return self.relay_address or self.domain_address

    @property
    def profile(self):
        """Return the profile of the user who owns the address."""
        owner = self.address.user
        return owner.profile

    @property
    def owner_has_premium(self):
        """Return ``has_premium`` from the owning user's profile."""
        return self.profile.has_premium

    def increment_num_replied(self):
        """Record one more reply on the address and return the new count.

        ``num_replied`` is incremented and ``last_used_at`` is set to the
        current UTC time; only those two fields are passed to ``save``.
        """
        target = self.address
        target.num_replied += 1
        target.last_used_at = datetime.now(timezone.utc)
        target.save(update_fields=["num_replied", "last_used_at"])
        return target.num_replied
1✔
882

883

884
class AbuseMetrics(models.Model):
    """Per-user usage counters, unique per ``(user, first_recorded)``.

    NOTE(review): the ``*_per_day`` names suggest one row per user per day,
    presumably maintained by ``Profile.update_abuse_metric`` -- confirm.
    """

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # Both timestamps are filled in automatically when the row is created.
    first_recorded = models.DateTimeField(auto_now_add=True, db_index=True)
    last_recorded = models.DateTimeField(auto_now_add=True, db_index=True)
    num_address_created_per_day = models.PositiveSmallIntegerField(default=0)
    num_replies_per_day = models.PositiveSmallIntegerField(default=0)
    # Values from 0 to 32767 are safe in all databases supported by Django.
    num_email_forwarded_per_day = models.PositiveSmallIntegerField(default=0)
    # Values from 0 to 9223372036854775807 are safe in all databases supported by Django.
    forwarded_email_size_per_day = models.PositiveBigIntegerField(default=0)

    class Meta:
        unique_together = ["user", "first_recorded"]
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc