• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 3bc340e2-329f-4ed2-8700-adaaac8d78c8

15 Dec 2023 06:50PM CUT coverage: 73.514% (-0.1%) from 73.614%
3bc340e2-329f-4ed2-8700-adaaac8d78c8

push

circleci

jwhitlock
Use branch database with production tests

Previously, migration tests were run with production code, branch
requirements, and branch migrations. Now they run with production
requirements, so that third-party migrations are tested as well.

This uses pytest --reuse-db to create a test database with the branch's
migrations, and then a pip install with the production code. This more
closely emulates the mixed environment during a deploy.

1962 of 2913 branches covered (0.0%)

Branch coverage included in aggregate %.

6273 of 8289 relevant lines covered (75.68%)

19.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.82
/emails/models.py
1
from collections import namedtuple
1✔
2
from datetime import datetime, timedelta, timezone
1✔
3
from hashlib import sha256
1✔
4
from typing import Optional
1✔
5
import logging
1✔
6
import random
1✔
7
import re
1✔
8
import string
1✔
9
import uuid
1✔
10
from django.apps import apps
1✔
11
from django.conf import settings
1✔
12
from django.contrib.auth.models import User
1✔
13
from django.core.exceptions import BadRequest
1✔
14
from django.core.validators import MinLengthValidator
1✔
15
from django.db import models, transaction
1✔
16
from django.dispatch import receiver
1✔
17
from django.utils.translation.trans_real import (
1✔
18
    parse_accept_lang_header,
19
    get_supported_language_variant,
20
)
21

22
from rest_framework.authtoken.models import Token
1✔
23

24
from api.exceptions import ErrorContextType, RelayAPIException
1✔
25
from privaterelay.plans import get_premium_countries
1✔
26
from privaterelay.utils import (
1✔
27
    AcceptLanguageError,
28
    flag_is_active_in_task,
29
    guess_country_from_accept_lang,
30
)
31

32

33
# App config for the "emails" app; has_bad_words() and is_blocklisted() read
# its ``badwords`` and ``blocklist`` attributes.
emails_config = apps.get_app_config("emails")

# Module loggers: general events, and abuse metrics flagged for review.
logger = logging.getLogger("events")
abuse_logger = logging.getLogger("abusemetrics")

# Result of Profile.check_bounce_pause(): whether sending is paused and the
# bounce type ("hard", "soft", or "" when not paused).
BounceStatus = namedtuple("BounceStatus", ["paused", "type"])
1✔
38

39

40
def get_domains_from_settings():
    """Return the mapping of domain setting names to domain values.

    When "testserver" is listed in ``settings.ALLOWED_HOSTS`` fixed
    placeholder domains are returned; otherwise the values are read from
    ``settings.RELAY_FIREFOX_DOMAIN`` and ``settings.MOZMAIL_DOMAIN``.
    """
    # HACK: detect if code is running in django tests
    running_in_tests = "testserver" in settings.ALLOWED_HOSTS
    if not running_in_tests:
        return {
            "RELAY_FIREFOX_DOMAIN": settings.RELAY_FIREFOX_DOMAIN,
            "MOZMAIL_DOMAIN": settings.MOZMAIL_DOMAIN,
        }
    return {"RELAY_FIREFOX_DOMAIN": "default.com", "MOZMAIL_DOMAIN": "test.com"}
48

49

50
# Numeric choices for the ``domain`` model field. Each label is a key of the
# mapping returned by get_domains_from_settings().
DOMAIN_CHOICES = [
    (1, "RELAY_FIREFOX_DOMAIN"),
    (2, "MOZMAIL_DOMAIN"),
]

# Profile.has_premium treats users whose email ends with "@<domain>" for one
# of these domains as premium.
PREMIUM_DOMAINS = [
    "mozilla.com",
    "getpocket.com",
    "mozillafoundation.org",
]
1✔
52

53

54
def valid_available_subdomain(subdomain, *args, **kwargs):
    """Validate that ``subdomain`` is well-formed, allowed, and not yet taken.

    The value is lower-cased and then must satisfy all of:
      * 1-63 characters, each a lowercase letter, digit, or hyphen
      * does not start or end with a hyphen
      * not flagged by has_bad_words() or is_blocklisted()
      * its hash is not already stored in RegisteredSubdomain

    Extra positional and keyword arguments are accepted and ignored.

    Returns:
        True when the subdomain passes every check.

    Raises:
        CannotMakeSubdomainException: if the subdomain is empty/None, or if
            any of the checks above fails.
    """
    if not subdomain:
        raise CannotMakeSubdomainException("error-subdomain-cannot-be-empty-or-null")
    subdomain = subdomain.lower()
    # fullmatch() must consume the whole string; match() with "$" would also
    # accept a value that ends with a newline.
    well_formed = re.fullmatch(r"(?!-)[a-z0-9-]{1,63}(?<!-)", subdomain) is not None
    if not well_formed or has_bad_words(subdomain) or is_blocklisted(subdomain):
        raise CannotMakeSubdomainException("error-subdomain-not-available")
    # Only query the database once the cheap checks pass; exists() avoids a
    # full COUNT when all we need is a yes/no answer.
    taken = RegisteredSubdomain.objects.filter(
        subdomain_hash=hash_subdomain(subdomain)
    ).exists()
    if taken:
        raise CannotMakeSubdomainException("error-subdomain-not-available")
    return True
1✔
77

78

79
# This historical function is referenced in migration 0029_profile_add_deleted_metric_and_changeserver_storage_default
80
def default_server_storage() -> bool:
    """Return True.

    Kept as a named function because a historical migration references it.
    """
    return True
×
82

83

84
def default_domain_numerical():
    """Return the numeric DOMAIN_CHOICES value of the MOZMAIL_DOMAIN domain."""
    mozmail_domain = get_domains_from_settings()["MOZMAIL_DOMAIN"]
    return get_domain_numerical(mozmail_domain)
1✔
88

89

90
class Profile(models.Model):
    """Relay profile attached one-to-one to a Django ``User``.

    Stores the user's API token, subscription dates, bounce and abuse-flag
    timestamps, counters carried over from deleted addresses, the optional
    custom subdomain, and per-user feature settings.
    """

    user = models.OneToOneField(User, on_delete=models.CASCADE)
    api_token = models.UUIDField(default=uuid.uuid4)
    num_address_deleted = models.PositiveIntegerField(default=0)
    date_subscribed = models.DateTimeField(blank=True, null=True)
    date_subscribed_phone = models.DateTimeField(blank=True, null=True)
    # TODO: delete date_phone_subscription_checked in favor of date_phone_subscription_next_reset
    date_phone_subscription_checked = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_start = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_reset = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_end = models.DateTimeField(blank=True, null=True)
    address_last_deleted = models.DateTimeField(blank=True, null=True, db_index=True)
    last_soft_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
    last_hard_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
    last_account_flagged = models.DateTimeField(blank=True, null=True, db_index=True)
    num_email_forwarded_in_deleted_address = models.PositiveIntegerField(default=0)
    num_email_blocked_in_deleted_address = models.PositiveIntegerField(default=0)
    num_level_one_trackers_blocked_in_deleted_address = models.PositiveIntegerField(
        default=0, null=True
    )
    num_email_replied_in_deleted_address = models.PositiveIntegerField(default=0)
    num_email_spam_in_deleted_address = models.PositiveIntegerField(default=0)
    subdomain = models.CharField(
        blank=True,
        null=True,
        unique=True,
        max_length=63,
        db_index=True,
        validators=[valid_available_subdomain],
    )
    # Whether we store the user's alias labels in the server
    server_storage = models.BooleanField(default=True)
    # Whether we store the caller/sender log for the user's relay number
    store_phone_log = models.BooleanField(default=True)
    # TODO: Data migration to set null to false
    # TODO: Schema migration to remove null=True
    remove_level_one_email_trackers = models.BooleanField(null=True, default=False)
    onboarding_state = models.PositiveIntegerField(default=0)
    onboarding_free_state = models.PositiveIntegerField(default=0)
    auto_block_spam = models.BooleanField(default=False)
    forwarded_first_reply = models.BooleanField(default=False)
    # Empty string means the profile was created through relying party flow
    created_by = models.CharField(blank=True, null=True, max_length=63)
    sent_welcome_email = models.BooleanField(default=False)

    def __str__(self):
        return f"{self.user} Profile"

    def save(self, *args, **kwargs):
        """Save the profile, then purge data the user opted out of storing.

        The subdomain is lower-cased before saving. After saving:
          * if ``server_storage`` is False, ``description``, ``generated_for``
            and ``used_on`` are blanked on all of the user's RelayAddress rows;
          * if ``settings.PHONES_ENABLED`` and ``store_phone_log`` is False,
            the InboundContact rows of the user's RelayNumber are deleted.

        Returns whatever ``models.Model.save`` returns.
        """
        # always lower-case the subdomain before saving it
        # TODO: change subdomain field as a custom field inheriting from
        # CharField to validate constraints on the field update too
        if self.subdomain and not self.subdomain.islower():
            self.subdomain = self.subdomain.lower()
        ret = super().save(*args, **kwargs)
        if not self.server_storage:
            RelayAddress.objects.filter(user=self.user).update(
                description="", generated_for="", used_on=""
            )
        if settings.PHONES_ENABLED:
            # Imported here rather than at module level.
            from phones.models import InboundContact, RelayNumber

            if not self.store_phone_log:
                try:
                    relay_number = RelayNumber.objects.get(user=self.user)
                except RelayNumber.DoesNotExist:
                    # No relay number, so there is no log to delete.
                    pass
                else:
                    InboundContact.objects.filter(relay_number=relay_number).delete()
        return ret

    @property
    def language(self):
        """Return the first supported language from the FxA locale, else "en"."""
        fxa = self.fxa
        locale = fxa.extra_data.get("locale") if fxa else None
        if locale:
            for accept_lang, _ in parse_accept_lang_header(locale):
                try:
                    return get_supported_language_variant(accept_lang)
                except LookupError:
                    continue
        return "en"

    @property
    def fxa_locale_in_premium_country(self) -> bool:
        """Return whether the FxA locale's country is a Premium country.

        This is less accurate than using
        get_countries_info_from_request_and_mapping(), which can use a GeoIP
        lookup, so prefer that if a request context is available. In other
        contexts, for example when sending an email, this can be useful.

        Returns False when there is no FxA account, no locale, or the locale
        cannot be parsed.
        """
        fxa = self.fxa
        locale = fxa.extra_data.get("locale") if fxa else None
        if not locale:
            return False
        try:
            country = guess_country_from_accept_lang(locale)
        except AcceptLanguageError:
            return False
        return country in get_premium_countries()

    @property
    def avatar(self):
        """Return the FxA "avatar" value, or None if unset or no FxA account."""
        fxa = self.fxa
        # Guard against users without an FxA social account.
        return fxa.extra_data.get("avatar") if fxa else None

    @property
    def relay_addresses(self):
        """Queryset of this user's RelayAddress rows."""
        return RelayAddress.objects.filter(user=self.user)

    @property
    def domain_addresses(self):
        """Queryset of this user's DomainAddress rows."""
        return DomainAddress.objects.filter(user=self.user)

    @property
    def total_masks(self) -> int:
        """Number of relay addresses plus domain addresses for this user."""
        ra_count: int = self.relay_addresses.count()
        da_count: int = self.domain_addresses.count()
        return ra_count + da_count

    @property
    def at_mask_limit(self) -> bool:
        """False for premium users; otherwise same as at_max_free_aliases."""
        if self.has_premium:
            return False
        return self.at_max_free_aliases

    def check_bounce_pause(self):
        """Return the current BounceStatus, clearing expired bounce timestamps.

        A hard bounce newer than ``settings.HARD_BOUNCE_ALLOWED_DAYS`` pauses
        with type "hard"; otherwise a soft bounce newer than
        ``settings.SOFT_BOUNCE_ALLOWED_DAYS`` pauses with type "soft". A
        bounce timestamp that is no longer within its window is reset to None
        and the profile is saved.
        """
        if self.last_hard_bounce:
            last_hard_bounce_allowed = datetime.now(timezone.utc) - timedelta(
                days=settings.HARD_BOUNCE_ALLOWED_DAYS
            )
            if self.last_hard_bounce > last_hard_bounce_allowed:
                return BounceStatus(True, "hard")
            self.last_hard_bounce = None
            self.save()
        if self.last_soft_bounce:
            last_soft_bounce_allowed = datetime.now(timezone.utc) - timedelta(
                days=settings.SOFT_BOUNCE_ALLOWED_DAYS
            )
            if self.last_soft_bounce > last_soft_bounce_allowed:
                return BounceStatus(True, "soft")
            self.last_soft_bounce = None
            self.save()
        return BounceStatus(False, "")

    @property
    def bounce_status(self):
        """Property form of check_bounce_pause() (same side effects)."""
        return self.check_bounce_pause()

    @property
    def next_email_try(self):
        """Return when email may next be sent: now, or the end of the pause."""
        bounce_pause, bounce_type = self.check_bounce_pause()

        if not bounce_pause:
            return datetime.now(timezone.utc)

        if bounce_type == "soft":
            return self.last_soft_bounce + timedelta(
                days=settings.SOFT_BOUNCE_ALLOWED_DAYS
            )

        return self.last_hard_bounce + timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS)

    @property
    def last_bounce_date(self):
        """Return the hard bounce date if set, else the soft one, else None."""
        if self.last_hard_bounce:
            return self.last_hard_bounce
        if self.last_soft_bounce:
            return self.last_soft_bounce
        return None

    @property
    def at_max_free_aliases(self) -> bool:
        """Whether the relay address count reached MAX_NUM_FREE_ALIASES."""
        relay_addresses_count: int = self.relay_addresses.count()
        return relay_addresses_count >= settings.MAX_NUM_FREE_ALIASES

    @property
    def fxa(self):
        """Return the user's social account with provider "fxa", or None."""
        # Note: we are NOT using .filter() here because it invalidates
        # any profile instances that were queried with prefetch_related, which
        # we use in at least the profile view to minimize queries
        for sa in self.user.socialaccount_set.all():
            if sa.provider == "fxa":
                return sa
        return None

    @property
    def display_name(self):
        """Return the FxA "displayName", or None if unset or no FxA account."""
        # if display name is not set on FxA the
        # displayName key will not exist on the extra_data
        fxa = self.fxa
        # Guard against users without an FxA social account.
        return fxa.extra_data.get("displayName") if fxa else None

    @property
    def custom_domain(self):
        """Return "@<subdomain>.<settings.MOZMAIL_DOMAIN>"."""
        return f"@{self.subdomain}.{settings.MOZMAIL_DOMAIN}"

    @property
    def has_premium(self):
        """Whether the user counts as premium.

        Requires an FxA account, and either an email address at one of
        PREMIUM_DOMAINS or an FxA subscription listed in
        ``settings.SUBSCRIPTIONS_WITH_UNLIMITED``.
        """
        # FIXME: as we don't have all the tiers defined we are over-defining
        # this to mark the user as a premium user as well
        fxa = self.fxa  # evaluate the property once
        if not fxa:
            return False
        premium_suffixes = tuple(f"@{domain}" for domain in PREMIUM_DOMAINS)
        if self.user.email.endswith(premium_suffixes):
            return True
        user_subscriptions = fxa.extra_data.get("subscriptions", [])
        return any(
            sub in user_subscriptions for sub in settings.SUBSCRIPTIONS_WITH_UNLIMITED
        )

    @property
    def has_phone(self):
        """Whether the user has access to the phone feature.

        Requires an FxA account. Outside the "prod" channel (and outside
        pytest) the "phones" flag must be active. The "free_phones" flag
        grants access; otherwise an FxA subscription listed in
        ``settings.SUBSCRIPTIONS_WITH_PHONE`` is required.
        """
        fxa = self.fxa  # evaluate the property once
        if not fxa:
            return False
        if settings.RELAY_CHANNEL != "prod" and not settings.IN_PYTEST:
            if not flag_is_active_in_task("phones", self.user):
                return False
        if flag_is_active_in_task("free_phones", self.user):
            return True
        user_subscriptions = fxa.extra_data.get("subscriptions", [])
        return any(
            sub in user_subscriptions for sub in settings.SUBSCRIPTIONS_WITH_PHONE
        )

    @property
    def has_vpn(self):
        """Whether an FxA subscription is in settings.SUBSCRIPTIONS_WITH_VPN."""
        fxa = self.fxa  # evaluate the property once
        if not fxa:
            return False
        user_subscriptions = fxa.extra_data.get("subscriptions", [])
        return any(sub in user_subscriptions for sub in settings.SUBSCRIPTIONS_WITH_VPN)

    @property
    def emails_forwarded(self):
        """Forwarded-email total across current and deleted addresses."""
        return (
            sum(ra.num_forwarded for ra in self.relay_addresses)
            + sum(da.num_forwarded for da in self.domain_addresses)
            + self.num_email_forwarded_in_deleted_address
        )

    @property
    def emails_blocked(self):
        """Blocked-email total across current and deleted addresses."""
        return (
            sum(ra.num_blocked for ra in self.relay_addresses)
            + sum(da.num_blocked for da in self.domain_addresses)
            + self.num_email_blocked_in_deleted_address
        )

    @property
    def emails_replied(self):
        """Replied-email total across current and deleted addresses."""
        # Once Django is on version 4.0 and above, we can set the default=0
        # and return a int instead of None
        # https://docs.djangoproject.com/en/4.0/ref/models/querysets/#default
        total_num_replied = self.num_email_replied_in_deleted_address
        for addresses in (self.relay_addresses, self.domain_addresses):
            aggregated = addresses.aggregate(models.Sum("num_replied"))
            # The sum is None when there are no rows to aggregate.
            total_num_replied += aggregated.get("num_replied__sum") or 0
        return total_num_replied

    @property
    def level_one_trackers_blocked(self):
        """Level-one-tracker total across current and deleted addresses.

        The counters are nullable, so None is counted as 0.
        """
        return (
            sum(ra.num_level_one_trackers_blocked or 0 for ra in self.relay_addresses)
            + sum(
                da.num_level_one_trackers_blocked or 0 for da in self.domain_addresses
            )
            + (self.num_level_one_trackers_blocked_in_deleted_address or 0)
        )

    @property
    def joined_before_premium_release(self):
        """Whether the user joined before 2021-10-22 17:00:00 UTC."""
        date_created = self.user.date_joined
        return date_created < datetime.fromisoformat("2021-10-22 17:00:00+00:00")

    def add_subdomain(self, subdomain):
        """Register ``subdomain`` for this profile and return it lower-cased.

        Raises:
            CannotMakeSubdomainException: if the subdomain is empty/None, the
                user is not premium, the profile already has a subdomain, or
                valid_available_subdomain() rejects it.
        """
        # Handles if the subdomain is "" or None
        if not subdomain:
            raise CannotMakeSubdomainException(
                "error-subdomain-cannot-be-empty-or-null"
            )

        # subdomain must be all lowercase
        subdomain = subdomain.lower()

        if not self.has_premium:
            raise CannotMakeSubdomainException("error-premium-set-subdomain")
        if self.subdomain is not None:
            raise CannotMakeSubdomainException("error-premium-cannot-change-subdomain")
        self.subdomain = subdomain
        # The validator defined in the subdomain field does not get run in
        # full_clean() when self.subdomain is "" or None, so we need to run
        # the validator again to catch these cases.
        valid_available_subdomain(subdomain)
        self.full_clean()
        self.save()

        RegisteredSubdomain.objects.create(subdomain_hash=hash_subdomain(subdomain))
        return subdomain

    def update_abuse_metric(
        self,
        address_created=False,
        replied=False,
        email_forwarded=False,
        forwarded_email_size=0,
    ):
        """Update today's abuse counters and flag the account if a limit is hit.

        Finds (or creates) the user's AbuseMetrics row first recorded during
        the current UTC day, increments the counters selected by the
        arguments, and saves it. When a new row is created, AbuseMetrics rows
        first recorded before today are deleted. If any counter has reached
        its ``settings.MAX_*`` limit, ``last_account_flagged`` is set to now,
        the profile is saved, and the event is logged to ``abuse_logger``.

        Args:
            address_created: count one created address.
            replied: count one reply.
            email_forwarded: count one forwarded email.
            forwarded_email_size: amount added to the forwarded size counter
                when greater than 0 (logged as bytes).

        Returns:
            ``self.last_account_flagged`` (None if never flagged).
        """
        #  TODO: this should be wrapped in atomic to ensure race conditions are properly handled
        # look for abuse metrics created on the same UTC date, regardless of time.
        # Truncate an aware UTC "now" to midnight. Combining a naive midnight
        # and calling astimezone() would interpret it in the server's local
        # time zone and shift the window on non-UTC hosts.
        midnight_utc_today = datetime.now(timezone.utc).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        midnight_utc_tomorrow = midnight_utc_today + timedelta(days=1)
        abuse_metric = self.user.abusemetrics_set.filter(
            first_recorded__gte=midnight_utc_today,
            first_recorded__lt=midnight_utc_tomorrow,
        ).first()
        if not abuse_metric:
            abuse_metric = AbuseMetrics.objects.create(user=self.user)
            AbuseMetrics.objects.filter(first_recorded__lt=midnight_utc_today).delete()

        # increment the abuse metric
        if address_created:
            abuse_metric.num_address_created_per_day += 1
        if replied:
            abuse_metric.num_replies_per_day += 1
        if email_forwarded:
            abuse_metric.num_email_forwarded_per_day += 1
        if forwarded_email_size > 0:
            abuse_metric.forwarded_email_size_per_day += forwarded_email_size
        abuse_metric.last_recorded = datetime.now(timezone.utc)
        abuse_metric.save()

        # check user should be flagged for abuse
        hit_max_create = (
            abuse_metric.num_address_created_per_day
            >= settings.MAX_ADDRESS_CREATION_PER_DAY
        )
        hit_max_replies = (
            abuse_metric.num_replies_per_day >= settings.MAX_REPLIES_PER_DAY
        )
        hit_max_forwarded = (
            abuse_metric.num_email_forwarded_per_day >= settings.MAX_FORWARDED_PER_DAY
        )
        hit_max_forwarded_email_size = (
            abuse_metric.forwarded_email_size_per_day
            >= settings.MAX_FORWARDED_EMAIL_SIZE_PER_DAY
        )
        if (
            hit_max_create
            or hit_max_replies
            or hit_max_forwarded
            or hit_max_forwarded_email_size
        ):
            self.last_account_flagged = datetime.now(timezone.utc)
            self.save()
            fxa = self.fxa
            data = {
                # Users without an FxA account are logged with uid None
                # instead of raising AttributeError.
                "uid": fxa.uid if fxa else None,
                "flagged": self.last_account_flagged.timestamp(),
                "replies": abuse_metric.num_replies_per_day,
                "addresses": abuse_metric.num_address_created_per_day,
                "forwarded": abuse_metric.num_email_forwarded_per_day,
                "forwarded_size_in_bytes": abuse_metric.forwarded_email_size_per_day,
            }
            # log for further secops review
            abuse_logger.info("Abuse flagged", extra=data)
        return self.last_account_flagged

    @property
    def is_flagged(self):
        """Whether the account was flagged and the pause period is not over.

        The pause lasts ``settings.PREMIUM_FEATURE_PAUSED_DAYS`` days from
        ``last_account_flagged``.
        """
        if not self.last_account_flagged:
            return False
        account_premium_feature_resumed = self.last_account_flagged + timedelta(
            days=settings.PREMIUM_FEATURE_PAUSED_DAYS
        )
        # Flagged only while the resume time has not yet passed.
        return datetime.now(timezone.utc) <= account_premium_feature_resumed
1✔
481

482

483
@receiver(models.signals.post_save, sender=Profile)
def copy_auth_token(sender, instance=None, created=False, **kwargs):
    """Ensure a newly created Profile's user has a REST framework Token.

    Runs on every Profile ``post_save`` but only acts when ``created`` is
    true. If the user has no Token yet, one is created with its key set to
    the profile's ``api_token``; an existing Token is left untouched.
    """
    if not created:
        return
    # baker triggers created during tests, so the user may already have a
    # Token. get_or_create does the lookup and the insert as one race-tolerant
    # operation instead of a separate get() followed by create().
    Token.objects.get_or_create(
        user=instance.user, defaults={"key": instance.api_token}
    )
1✔
493

494

495
def address_hash(address, subdomain=None, domain=None):
    """Return the SHA-256 hex digest that identifies a mask address.

    The hashed text is:
      * "<address>@<subdomain>.<domain>" when ``subdomain`` is truthy;
      * "<address>" alone when ``domain`` equals
        ``settings.RELAY_FIREFOX_DOMAIN``;
      * "<address>@<domain>" otherwise.

    A falsy ``domain`` is replaced with the "MOZMAIL_DOMAIN" entry of
    get_domains_from_settings().
    """
    domain = domain or get_domains_from_settings()["MOZMAIL_DOMAIN"]
    if subdomain:
        hashed_text = f"{address}@{subdomain}.{domain}"
    elif domain == settings.RELAY_FIREFOX_DOMAIN:
        hashed_text = f"{address}"
    else:
        hashed_text = f"{address}@{domain}"
    return sha256(hashed_text.encode("utf-8")).hexdigest()
1✔
503

504

505
def address_default():
    """Return a random 9-character string of lowercase letters and digits."""
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=9)
    return "".join(picked)
1✔
507

508

509
def has_bad_words(value):
    """Return whether ``value`` matches an entry of ``emails_config.badwords``.

    Each entry is stripped of surrounding whitespace first. Entries longer
    than 4 characters match when they occur anywhere inside ``value``;
    entries of 4 characters or fewer match only when equal to ``value``.
    """
    for raw_word in emails_config.badwords:
        word = raw_word.strip()
        if len(word) > 4:
            # long words: substring match
            if word in value:
                return True
        elif word == value:
            # short words: exact match only
            return True
    return False
1✔
517

518

519
def is_blocklisted(value):
    """Return whether ``value`` equals an entry of ``emails_config.blocklist``."""
    for blockedword in emails_config.blocklist:
        if blockedword == value:
            return True
    return False
1✔
521

522

523
def get_domain_numerical(domain_address):
    """Return the DOMAIN_CHOICES number for the domain value ``domain_address``.

    First maps the domain value back to its setting name using
    get_domains_from_settings(), then maps that name to its number in
    DOMAIN_CHOICES. ``list.index`` raises ValueError when either lookup fails.
    """
    # domain value -> setting name
    domains = get_domains_from_settings()
    name_position = list(domains.values()).index(domain_address)
    domain_name = list(domains)[name_position]
    # setting name -> numerical choice
    choices = dict(DOMAIN_CHOICES)
    number_position = list(choices.values()).index(domain_name)
    return list(choices)[number_position]
1✔
534

535

536
def hash_subdomain(subdomain, domain=settings.MOZMAIL_DOMAIN):
    """Return the SHA-256 hex digest of "<subdomain>.<domain>".

    The default ``domain`` is ``settings.MOZMAIL_DOMAIN`` as evaluated when
    this function is defined.
    """
    full_domain = f"{subdomain}.{domain}"
    digest = sha256(full_domain.encode("utf-8"))
    return digest.hexdigest()
1✔
538

539

540
class RegisteredSubdomain(models.Model):
    """A registered subdomain, stored as its hash (see hash_subdomain())."""

    # unique, indexed hash of the subdomain
    subdomain_hash = models.CharField(unique=True, db_index=True, max_length=64)
    # set automatically when the row is first created
    registered_at = models.DateTimeField(auto_now_add=True)

    def __str__(self) -> str:
        return self.subdomain_hash
×
546

547

548
class CannotMakeSubdomainException(BadRequest):
    """Exception raised by Profile due to error on subdomain creation.

    Attributes:
        message -- optional explanation of the error
    """

    def __init__(self, message=None):
        # Forward the message to the base exception so that str(exc),
        # exc.args, and logged tracebacks carry it, not just ``.message``.
        if message is None:
            super().__init__()
        else:
            super().__init__(message)
        self.message = message
1✔
557

558

559
class CannotMakeAddressException(RelayAPIException):
    """Common parent of the errors raised when a mask cannot be created.

    Covers both RelayAddress and DomainAddress creation failures.
    """
561

562

563
class AccountIsPausedException(CannotMakeAddressException):
    """Address creation refused because the account is paused (HTTP 403)."""

    status_code = 403
    default_code = "account_is_paused"
    default_detail = "Your account is on pause."
1✔
567

568

569
class RelayAddrFreeTierLimitException(CannotMakeAddressException):
    """Address creation refused: the free-tier mask limit is reached (HTTP 403).

    Attributes:
        free_tier_limit -- the limit reported in the message and error context
    """

    default_code = "free_tier_limit"
    # The second sentence starts with a space: the adjacent literals are
    # concatenated, and without it the text read "free account.You can reuse".
    default_detail_template = (
        "You’ve used all {free_tier_limit} email masks included with your free account."
        " You can reuse an existing mask, but using a unique mask for each account is the most secure option."
    )
    status_code = 403

    def __init__(self, free_tier_limit: Optional[int] = None, *args, **kwargs):
        """Build the detail message for ``free_tier_limit``.

        A falsy ``free_tier_limit`` falls back to
        ``settings.MAX_NUM_FREE_ALIASES``. Remaining arguments are passed to
        the parent exception.
        """
        self.free_tier_limit = free_tier_limit or settings.MAX_NUM_FREE_ALIASES
        self.default_detail = self.default_detail_template.format(
            free_tier_limit=self.free_tier_limit
        )
        super().__init__(*args, **kwargs)

    def error_context(self) -> ErrorContextType:
        """Return the values used to fill in the detail message."""
        return {"free_tier_limit": self.free_tier_limit}
1✔
586

587

588
class DomainAddrFreeTierException(CannotMakeAddressException):
    """Custom-subdomain mask refused for a non-premium account (HTTP 403)."""

    status_code = 403
    default_code = "free_tier_no_subdomain_masks"
    default_detail = (
        "Your free account does not include custom subdomains for masks."
        " To create custom masks, upgrade to Relay Premium."
    )
1✔
592

593

594
class DomainAddrNeedSubdomainException(CannotMakeAddressException):
    """Custom mask refused because no subdomain is selected yet (HTTP 400)."""

    status_code = 400
    default_code = "need_subdomain"
    default_detail = (
        "Please select a subdomain before creating a custom email address."
    )
1✔
598

599

600
class DomainAddrUnavailableException(CannotMakeAddressException):
    """The requested custom mask address could not be created (HTTP 400).

    Attributes:
        unavailable_address -- the address named in the message and context
    """

    status_code = 400
    default_code = "address_unavailable"
    default_detail_template = (
        "“{unavailable_address}” could not be created."
        " Please try again with a different mask name."
    )

    def __init__(self, unavailable_address: str, *args, **kwargs):
        """Build the detail message; other arguments go to the parent."""
        self.unavailable_address = unavailable_address
        detail = self.default_detail_template.format(
            unavailable_address=unavailable_address
        )
        self.default_detail = detail
        super().__init__(*args, **kwargs)

    def error_context(self) -> ErrorContextType:
        """Return the values used to fill in the detail message."""
        context = {"unavailable_address": self.unavailable_address}
        return context
1✔
614

615

616
class RelayAddress(models.Model):
    """A generated mask address owned by a user, with its usage counters."""

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    address = models.CharField(max_length=64, default=address_default, unique=True)
    domain = models.PositiveSmallIntegerField(
        choices=DOMAIN_CHOICES, default=default_domain_numerical
    )
    enabled = models.BooleanField(default=True)
    description = models.CharField(max_length=64, blank=True)
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    last_modified_at = models.DateTimeField(auto_now=True, db_index=True)
    last_used_at = models.DateTimeField(blank=True, null=True)
    num_forwarded = models.PositiveIntegerField(default=0)
    num_blocked = models.PositiveIntegerField(default=0)
    num_level_one_trackers_blocked = models.PositiveIntegerField(default=0, null=True)
    num_replied = models.PositiveIntegerField(default=0)
    num_spam = models.PositiveIntegerField(default=0)
    generated_for = models.CharField(max_length=255, blank=True)
    block_list_emails = models.BooleanField(default=False)
    used_on = models.TextField(default=None, blank=True, null=True)

    def __str__(self):
        return self.address

    def delete(self, *args, **kwargs):
        """Delete the address, keeping a hashed record and its counters.

        A DeletedAddress row is created with this address's hash and
        counters, and the owner's Profile "deleted address" totals are
        increased before the row itself is deleted.
        """
        # TODO: create hard bounce receipt rule in AWS for the address
        # objects.create() already saves the row; no extra save() is needed.
        DeletedAddress.objects.create(
            address_hash=address_hash(self.address, domain=self.domain_value),
            num_forwarded=self.num_forwarded,
            num_blocked=self.num_blocked,
            num_replied=self.num_replied,
            num_spam=self.num_spam,
        )
        profile = Profile.objects.get(user=self.user)
        profile.address_last_deleted = datetime.now(timezone.utc)
        profile.num_address_deleted += 1
        profile.num_email_forwarded_in_deleted_address += self.num_forwarded
        profile.num_email_blocked_in_deleted_address += self.num_blocked
        # Both tracker counters are nullable, so None is counted as 0.
        profile.num_level_one_trackers_blocked_in_deleted_address = (
            profile.num_level_one_trackers_blocked_in_deleted_address or 0
        ) + (self.num_level_one_trackers_blocked or 0)
        profile.num_email_replied_in_deleted_address += self.num_replied
        profile.num_email_spam_in_deleted_address += self.num_spam
        profile.save()
        return super().delete(*args, **kwargs)

    def save(self, *args, **kwargs):
        """Save the address, enforcing creation limits and storage settings.

        When the row is being added, the owner's Profile is locked inside a
        transaction, check_user_can_make_another_address() is applied, the
        address is regenerated until valid_address() accepts it, and the
        owner's abuse metric is updated. On every save the label fields are
        blanked if the owner disabled server storage, and
        ``block_list_emails`` is forced to False for non-premium owners.

        Raises:
            CannotMakeAddressException: (a subclass) if the owner may not
                create another address.
        """
        if self._state.adding:
            with transaction.atomic():
                locked_profile = Profile.objects.select_for_update().get(user=self.user)
                check_user_can_make_another_address(locked_profile)
                # Validate against the domain *string* (domain_value), the
                # same form delete() hashes into DeletedAddress. Passing the
                # numeric ``self.domain`` made the deleted-address lookup
                # hash "<address>@<number>", which never matched.
                while not valid_address(self.address, self.domain_value):
                    self.address = address_default()
                locked_profile.update_abuse_metric(address_created=True)
        if not self.user.profile.server_storage:
            self.description = ""
            self.generated_for = ""
            self.used_on = ""
        if not self.user.profile.has_premium:
            self.block_list_emails = False
        return super().save(*args, **kwargs)

    @property
    def domain_value(self):
        """The domain string for this row's numeric ``domain`` choice."""
        return get_domains_from_settings().get(self.get_domain_display())

    @property
    def full_address(self):
        """Return "<address>@<domain_value>"."""
        return "%s@%s" % (self.address, self.domain_value)
1✔
687

688

689
def check_user_can_make_another_address(profile: Profile) -> None:
    """Raise if ``profile`` is not allowed to create another relay address.

    Raises:
        AccountIsPausedException: the profile is flagged.
        RelayAddrFreeTierLimitException: the profile is not premium and is
            already at the free alias limit.
    """
    if profile.is_flagged:
        raise AccountIsPausedException()
    # MPP-3021: has_premium is tested first so that premium users
    # short-circuit and never trigger the at_max_free_aliases DB query.
    if not profile.has_premium and profile.at_max_free_aliases:
        raise RelayAddrFreeTierLimitException()
1✔
697

698

699
# Shape of an acceptable address local part:
#   - 1-63 characters, each a lowercase letter, digit, hyphen or period
#   - can't start or end with a hyphen or a period
# Compiled once at import time rather than on every call.
_VALID_ADDRESS_RE = re.compile(r"(?![-.])[a-z0-9.-]{1,63}(?<![-.])")


def valid_address_pattern(address):
    """Return True when ``address`` has the shape of a valid local part.

    The whole string must match: ``fullmatch`` is used instead of a
    ``^...$`` pattern because ``$`` also matches just before a trailing
    newline, which would let a value such as ``"abc\\n"`` through.

    Args:
        address: the candidate local part (text before the ``@``).

    Returns:
        bool: True if ``address`` is 1-63 characters of ``a-z``, ``0-9``,
        ``-`` and ``.`` and neither starts nor ends with ``-`` or ``.``.
    """
    return _VALID_ADDRESS_RE.fullmatch(address) is not None
1✔
704

705

706
def valid_address(address, domain):
    """Return True when ``address`` may be used on ``domain``.

    An address is rejected when it fails ``valid_address_pattern``, when
    ``has_bad_words`` or ``is_blocklisted`` flags it, or when a
    ``DeletedAddress`` row already exists for its hash on ``domain``.

    Args:
        address: the candidate local part.
        domain: passed through to ``address_hash`` as the ``domain`` keyword.

    Returns:
        bool: True if every check passes, otherwise False.
    """
    address_pattern_valid = valid_address_pattern(address)
    address_contains_badword = has_bad_words(address)
    address_is_blocklisted = is_blocklisted(address)
    if (
        address_contains_badword
        or address_is_blocklisted
        or not address_pattern_valid
    ):
        # Already rejected by the in-process checks; skip the database query.
        return False

    # exists() asks the database only whether a matching row is present,
    # instead of counting every matching row.
    address_already_deleted = DeletedAddress.objects.filter(
        address_hash=address_hash(address, domain=domain)
    ).exists()
    return not address_already_deleted
1✔
721

722

723
class DeletedAddress(models.Model):
    """Record of a deleted address: its hash and its usage counters."""

    # Indexed so lookups by hash can use the index.
    address_hash = models.CharField(max_length=64, db_index=True)

    # Usage counters, each starting at zero.
    num_forwarded = models.PositiveIntegerField(default=0)
    num_blocked = models.PositiveIntegerField(default=0)
    num_replied = models.PositiveIntegerField(default=0)
    num_spam = models.PositiveIntegerField(default=0)

    def __str__(self):
        """Use the stored hash as the display string."""
        return self.address_hash
×
732

733

734
def check_user_can_make_domain_address(user_profile: Profile) -> None:
    """Raise if ``user_profile`` is not allowed to create a domain address.

    The checks run in this order and the first failure wins.

    Raises:
        DomainAddrFreeTierException: the profile is not premium.
        DomainAddrNeedSubdomainException: the profile has no subdomain.
        AccountIsPausedException: the profile is flagged.
    """
    if not user_profile.has_premium:
        raise DomainAddrFreeTierException()
    elif not user_profile.subdomain:
        raise DomainAddrNeedSubdomainException()
    elif user_profile.is_flagged:
        raise AccountIsPausedException()
1✔
743

744

745
class DomainAddress(models.Model):
    """An address under a user's subdomain: ``address@subdomain.domain``.

    Each (user, address) pair is unique. Creating and deleting a row also
    updates the owning user's Profile (see ``save`` and ``delete``).
    """

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    address = models.CharField(
        max_length=64, validators=[MinLengthValidator(limit_value=1)]
    )
    enabled = models.BooleanField(default=True)
    description = models.CharField(max_length=64, blank=True)
    domain = models.PositiveSmallIntegerField(choices=DOMAIN_CHOICES, default=2)
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    first_emailed_at = models.DateTimeField(null=True, db_index=True)
    last_modified_at = models.DateTimeField(auto_now=True, db_index=True)
    last_used_at = models.DateTimeField(blank=True, null=True)
    num_forwarded = models.PositiveIntegerField(default=0)
    num_blocked = models.PositiveIntegerField(default=0)
    # Nullable, so readers must treat None as zero (see delete()).
    num_level_one_trackers_blocked = models.PositiveIntegerField(default=0, null=True)
    num_replied = models.PositiveIntegerField(default=0)
    num_spam = models.PositiveIntegerField(default=0)
    block_list_emails = models.BooleanField(default=False)
    used_on = models.TextField(default=None, blank=True, null=True)

    class Meta:
        unique_together = ["user", "address"]

    def __str__(self):
        return self.address

    def save(self, *args, **kwargs) -> None:
        """Save the address, validating new rows and applying plan rules.

        When the row is being added, the owner must pass
        ``check_user_can_make_domain_address``, the address must match
        ``valid_address_pattern`` and contain no bad words, and the profile's
        abuse metric is updated with ``address_created=True``.

        On every save, ``block_list_emails`` is forced to ``False`` for
        non-premium profiles and ``description`` is blanked when the profile
        has no server storage.

        Raises:
            DomainAddrUnavailableException: a new address fails the pattern
                or bad-word check.
            Whatever ``check_user_can_make_domain_address`` raises.
        """
        user_profile = self.user.profile
        if self._state.adding:
            check_user_can_make_domain_address(user_profile)
            pattern_valid = valid_address_pattern(self.address)
            address_contains_badword = has_bad_words(self.address)
            if not pattern_valid or address_contains_badword:
                raise DomainAddrUnavailableException(unavailable_address=self.address)
            user_profile.update_abuse_metric(address_created=True)
        if not user_profile.has_premium:
            self.block_list_emails = False
        if not user_profile.server_storage:
            # NOTE(review): RelayAddress.save also blanks used_on in this
            # case; confirm whether used_on should be cleared here as well.
            self.description = ""
        return super().save(*args, **kwargs)

    @property
    def user_profile(self):
        """Fetch the owner's Profile from the database on every access."""
        return Profile.objects.get(user=self.user)

    @staticmethod
    def make_domain_address(user_profile, address=None, made_via_email=False):
        """Create and return a DomainAddress owned by ``user_profile.user``.

        Args:
            user_profile: the owner's Profile; must pass
                ``check_user_can_make_domain_address``.
            address: the local part to use; when empty or None a default one
                is generated with ``address_default``.
            made_via_email: when True, ``first_emailed_at`` is set to the
                current UTC time after creation.

        Returns:
            The newly created DomainAddress.
        """
        check_user_can_make_domain_address(user_profile)

        if not address:
            # FIXME: if the alias is randomly generated and has bad words
            # we should retry like make_relay_address does
            # not fixing this now because not sure randomly generated
            # DomainAlias will be a feature
            address = address_default()

        # objects.create() runs save(), which performs the pattern and
        # bad-word validation for the new row.
        domain_address = DomainAddress.objects.create(
            user=user_profile.user,
            address=address,
        )
        if made_via_email:
            # update first_emailed_at indicating alias generation impromptu.
            domain_address.first_emailed_at = datetime.now(timezone.utc)
            domain_address.save()
        return domain_address

    def delete(self, *args, **kwargs):
        """Delete the address, keeping a hashed record and profile totals.

        A DeletedAddress row is created from the address hash and this
        address's counters, the counters are added to the owner's Profile
        "deleted address" totals, and then the row itself is deleted.
        """
        # TODO: create hard bounce receipt rule in AWS for the address
        # Load the Profile once and use it for both the subdomain and the
        # counter updates. self.user_profile is a property and should not be
        # used to update values on the user's profile.
        profile = Profile.objects.get(user=self.user)

        # objects.create() already saves the row; no extra save() is needed.
        DeletedAddress.objects.create(
            address_hash=address_hash(
                self.address, profile.subdomain, self.domain_value
            ),
            num_forwarded=self.num_forwarded,
            num_blocked=self.num_blocked,
            num_replied=self.num_replied,
            num_spam=self.num_spam,
        )

        profile.address_last_deleted = datetime.now(timezone.utc)
        profile.num_address_deleted += 1
        profile.num_email_forwarded_in_deleted_address += self.num_forwarded
        profile.num_email_blocked_in_deleted_address += self.num_blocked
        # Either side may be None, so treat None as zero.
        profile.num_level_one_trackers_blocked_in_deleted_address = (
            profile.num_level_one_trackers_blocked_in_deleted_address or 0
        ) + (self.num_level_one_trackers_blocked or 0)
        profile.num_email_replied_in_deleted_address += self.num_replied
        profile.num_email_spam_in_deleted_address += self.num_spam
        profile.save()
        return super().delete(*args, **kwargs)

    @property
    def domain_value(self):
        """Look up this address's domain display name in the settings domains."""
        return get_domains_from_settings().get(self.get_domain_display())

    @property
    def full_address(self):
        """Return the address in ``address@subdomain.domain_value`` form."""
        return "%s@%s.%s" % (
            self.address,
            self.user_profile.subdomain,
            self.domain_value,
        )
851

852

853
class Reply(models.Model):
    """A reply record attached to either a RelayAddress or a DomainAddress."""

    # Both links are nullable; the address property picks whichever is set.
    relay_address = models.ForeignKey(
        RelayAddress, on_delete=models.CASCADE, blank=True, null=True
    )
    domain_address = models.ForeignKey(
        DomainAddress, on_delete=models.CASCADE, blank=True, null=True
    )
    lookup = models.CharField(max_length=255, blank=False, db_index=True)
    encrypted_metadata = models.TextField(blank=False)
    created_at = models.DateField(auto_now_add=True, null=False, db_index=True)

    @property
    def address(self):
        """Return the relay address if set, otherwise the domain address."""
        return self.relay_address or self.domain_address

    @property
    def profile(self):
        """Return the profile of the user who owns the linked address."""
        return self.address.user.profile

    @property
    def owner_has_premium(self):
        """Return the owning profile's ``has_premium`` value."""
        return self.profile.has_premium

    def increment_num_replied(self):
        """Add one reply to the linked address and return the new count.

        Also sets the address's ``last_used_at`` to the current UTC time.
        Only ``num_replied`` and ``last_used_at`` are passed as
        ``update_fields`` to the address's ``save``.
        """
        target = self.address
        target.num_replied += 1
        target.last_used_at = datetime.now(timezone.utc)
        target.save(update_fields=["num_replied", "last_used_at"])
        return target.num_replied
1✔
882

883

884
class AbuseMetrics(models.Model):
    """Per-user usage counters, unique per (user, first_recorded) pair."""

    user = models.ForeignKey(User, on_delete=models.CASCADE)

    # Both timestamps use auto_now_add.
    # NOTE(review): last_recorded presumably has to be assigned explicitly
    # by whatever updates the counters later; confirm against the caller.
    first_recorded = models.DateTimeField(auto_now_add=True, db_index=True)
    last_recorded = models.DateTimeField(auto_now_add=True, db_index=True)

    num_address_created_per_day = models.PositiveSmallIntegerField(default=0)
    num_replies_per_day = models.PositiveSmallIntegerField(default=0)
    # Values from 0 to 32767 are safe in all databases supported by Django.
    num_email_forwarded_per_day = models.PositiveSmallIntegerField(default=0)
    # Values from 0 to 9223372036854775807 are safe in all databases
    # supported by Django.
    forwarded_email_size_per_day = models.PositiveBigIntegerField(default=0)

    class Meta:
        unique_together = ["user", "first_recorded"]
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc