• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 98c45938-8d23-4015-90f8-7dff916ce789

21 Jun 2024 01:50PM CUT coverage: 85.357% (+0.02%) from 85.337%
98c45938-8d23-4015-90f8-7dff916ce789

push

circleci

web-flow
Merge pull request #4799 from mozilla/split-exceptions-validators-mpp-3827

MPP-3827: Move non-model code out of `emails/models.py`

4004 of 5135 branches covered (77.97%)

Branch coverage included in aggregate %.

335 of 339 new or added lines in 9 files covered. (98.82%)

1 existing line in 1 file now uncovered.

15746 of 18003 relevant lines covered (87.46%)

10.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.58
/emails/models.py
1
from __future__ import annotations
1✔
2

3
import logging
1✔
4
import random
1✔
5
import string
1✔
6
import uuid
1✔
7
from collections import namedtuple
1✔
8
from collections.abc import Iterable
1✔
9
from datetime import UTC, datetime, timedelta
1✔
10
from hashlib import sha256
1✔
11
from typing import Literal, cast
1✔
12

13
from django.conf import settings
1✔
14
from django.contrib.auth.models import User
1✔
15
from django.core.validators import MinLengthValidator
1✔
16
from django.db import models, transaction
1✔
17
from django.db.models.base import ModelBase
1✔
18
from django.db.models.query import QuerySet
1✔
19
from django.utils.translation.trans_real import (
1✔
20
    get_supported_language_variant,
21
    parse_accept_lang_header,
22
)
23

24
from allauth.socialaccount.models import SocialAccount
1✔
25

26
from privaterelay.plans import get_premium_countries
1✔
27
from privaterelay.utils import (
1✔
28
    AcceptLanguageError,
29
    flag_is_active_in_task,
30
    guess_country_from_accept_lang,
31
)
32

33
from .exceptions import (
1✔
34
    CannotMakeSubdomainException,
35
    DomainAddrDuplicateException,
36
    DomainAddrUnavailableException,
37
    DomainAddrUpdateException,
38
)
39
from .utils import get_domains_from_settings, incr_if_enabled
1✔
40
from .validators import (
1✔
41
    check_user_can_make_another_address,
42
    check_user_can_make_domain_address,
43
    is_blocklisted,
44
    valid_address,
45
    valid_available_subdomain,
46
)
47

48
if settings.PHONES_ENABLED:
1!
49
    from phones.models import RealPhone, RelayNumber
1✔
50

51

52
logger = logging.getLogger("events")
1✔
53
abuse_logger = logging.getLogger("abusemetrics")
1✔
54

55
BounceStatus = namedtuple("BounceStatus", "paused type")
1✔
56

57
DOMAIN_CHOICES = [(1, "RELAY_FIREFOX_DOMAIN"), (2, "MOZMAIL_DOMAIN")]
1✔
58
PREMIUM_DOMAINS = ["mozilla.com", "getpocket.com", "mozillafoundation.org"]
1✔
59

60

61
def default_server_storage() -> bool:
1✔
62
    """
63
    This historical function is referenced in migration
64
    0029_profile_add_deleted_metric_and_changeserver_storage_default
65
    """
UNCOV
66
    return True
×
67

68

69
def default_domain_numerical() -> int:
1✔
70
    """Return the default value for RelayAddress.domain"""
71
    domains = get_domains_from_settings()
1✔
72
    domain = domains["MOZMAIL_DOMAIN"]
1✔
73
    return get_domain_numerical(domain)
1✔
74

75

76
def get_domain_numerical(domain_address: str) -> int:
1✔
77
    """Turn a domain name into a numerical domain"""
78
    # get domain name from the address
79
    domains = get_domains_from_settings()
1✔
80
    domains_keys = list(domains.keys())
1✔
81
    domains_values = list(domains.values())
1✔
82
    domain_name = domains_keys[domains_values.index(domain_address)]
1✔
83
    # get domain numerical value from domain name
84
    choices = dict(DOMAIN_CHOICES)
1✔
85
    choices_keys = list(choices.keys())
1✔
86
    choices_values = list(choices.values())
1✔
87
    return choices_keys[choices_values.index(domain_name)]
1✔
88

89

90
def address_hash(
1✔
91
    address: str, subdomain: str | None = None, domain: str | None = None
92
) -> str:
93
    """Create a hash of a Relay address, to prevent re-use."""
94
    if not domain:
1✔
95
        domain = get_domains_from_settings()["MOZMAIL_DOMAIN"]
1✔
96
    if subdomain:
1✔
97
        return sha256(f"{address}@{subdomain}.{domain}".encode()).hexdigest()
1✔
98
    if domain == settings.RELAY_FIREFOX_DOMAIN:
1✔
99
        return sha256(f"{address}".encode()).hexdigest()
1✔
100
    return sha256(f"{address}@{domain}".encode()).hexdigest()
1✔
101

102

103
def address_default() -> str:
1✔
104
    """Return a random value for RelayAddress.address"""
105
    return "".join(
1✔
106
        random.choices(  # noqa: S311 (standard pseudo-random generator used)
107
            string.ascii_lowercase + string.digits, k=9
108
        )
109
    )
110

111

112
def hash_subdomain(subdomain: str, domain: str = settings.MOZMAIL_DOMAIN) -> str:
1✔
113
    return sha256(f"{subdomain}.{domain}".encode()).hexdigest()
1✔
114

115

116
class Profile(models.Model):
1✔
117
    user = models.OneToOneField(User, on_delete=models.CASCADE)
1✔
118
    api_token = models.UUIDField(default=uuid.uuid4)
1✔
119
    num_address_deleted = models.PositiveIntegerField(default=0)
1✔
120
    date_subscribed = models.DateTimeField(blank=True, null=True)
1✔
121
    date_subscribed_phone = models.DateTimeField(blank=True, null=True)
1✔
122
    # TODO MPP-2972: delete date_phone_subscription_checked in favor of
123
    # date_phone_subscription_next_reset
124
    date_phone_subscription_checked = models.DateTimeField(blank=True, null=True)
1✔
125
    date_phone_subscription_start = models.DateTimeField(blank=True, null=True)
1✔
126
    date_phone_subscription_reset = models.DateTimeField(blank=True, null=True)
1✔
127
    date_phone_subscription_end = models.DateTimeField(blank=True, null=True)
1✔
128
    address_last_deleted = models.DateTimeField(blank=True, null=True, db_index=True)
1✔
129
    last_soft_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
1✔
130
    last_hard_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
1✔
131
    last_account_flagged = models.DateTimeField(blank=True, null=True, db_index=True)
1✔
132
    num_deleted_relay_addresses = models.PositiveIntegerField(default=0)
1✔
133
    num_deleted_domain_addresses = models.PositiveIntegerField(default=0)
1✔
134
    num_email_forwarded_in_deleted_address = models.PositiveIntegerField(default=0)
1✔
135
    num_email_blocked_in_deleted_address = models.PositiveIntegerField(default=0)
1✔
136
    num_level_one_trackers_blocked_in_deleted_address = models.PositiveIntegerField(
1✔
137
        default=0, null=True
138
    )
139
    num_email_replied_in_deleted_address = models.PositiveIntegerField(default=0)
1✔
140
    num_email_spam_in_deleted_address = models.PositiveIntegerField(default=0)
1✔
141
    subdomain = models.CharField(
1✔
142
        blank=True,
143
        null=True,
144
        unique=True,
145
        max_length=63,
146
        db_index=True,
147
        validators=[valid_available_subdomain],
148
    )
149
    # Whether we store the user's alias labels in the server
150
    server_storage = models.BooleanField(default=True)
1✔
151
    # Whether we store the caller/sender log for the user's relay number
152
    store_phone_log = models.BooleanField(default=True)
1✔
153
    # TODO: Data migration to set null to false
154
    # TODO: Schema migration to remove null=True
155
    remove_level_one_email_trackers = models.BooleanField(null=True, default=False)
1✔
156
    onboarding_state = models.PositiveIntegerField(default=0)
1✔
157
    onboarding_free_state = models.PositiveIntegerField(default=0)
1✔
158
    auto_block_spam = models.BooleanField(default=False)
1✔
159
    forwarded_first_reply = models.BooleanField(default=False)
1✔
160
    # Empty string means the profile was created through relying party flow
161
    created_by = models.CharField(blank=True, null=True, max_length=63)
1✔
162
    sent_welcome_email = models.BooleanField(default=False)
1✔
163
    last_engagement = models.DateTimeField(blank=True, null=True, db_index=True)
1✔
164

165
    def __str__(self):
1✔
166
        return f"{self.user} Profile"
1✔
167

168
    def save(
1✔
169
        self,
170
        force_insert: bool | tuple[ModelBase, ...] = False,
171
        force_update: bool = False,
172
        using: str | None = None,
173
        update_fields: Iterable[str] | None = None,
174
    ) -> None:
175
        # always lower-case the subdomain before saving it
176
        # TODO: change subdomain field as a custom field inheriting from
177
        # CharField to validate constraints on the field update too
178
        if self.subdomain and not self.subdomain.islower():
1✔
179
            self.subdomain = self.subdomain.lower()
1✔
180
            if update_fields is not None:
1✔
181
                update_fields = {"subdomain"}.union(update_fields)
1✔
182
        super().save(
1✔
183
            force_insert=force_insert,
184
            force_update=force_update,
185
            using=using,
186
            update_fields=update_fields,
187
        )
188
        # any time a profile is saved with server_storage False, delete the
189
        # appropriate server-stored Relay address data.
190
        if not self.server_storage:
1✔
191
            relay_addresses = RelayAddress.objects.filter(user=self.user)
1✔
192
            relay_addresses.update(description="", generated_for="", used_on="")
1✔
193
            domain_addresses = DomainAddress.objects.filter(user=self.user)
1✔
194
            domain_addresses.update(description="", used_on="")
1✔
195
        if settings.PHONES_ENABLED:
1!
196
            # any time a profile is saved with store_phone_log False, delete the
197
            # appropriate server-stored InboundContact records
198
            from phones.models import InboundContact, RelayNumber
1✔
199

200
            if not self.store_phone_log:
1✔
201
                try:
1✔
202
                    relay_number = RelayNumber.objects.get(user=self.user)
1✔
203
                    InboundContact.objects.filter(relay_number=relay_number).delete()
1✔
204
                except RelayNumber.DoesNotExist:
1✔
205
                    pass
1✔
206

207
    @property
1✔
208
    def language(self):
1✔
209
        if self.fxa and self.fxa.extra_data.get("locale"):
1✔
210
            for accept_lang, _ in parse_accept_lang_header(
1!
211
                self.fxa.extra_data.get("locale")
212
            ):
213
                try:
1✔
214
                    return get_supported_language_variant(accept_lang)
1✔
215
                except LookupError:
×
216
                    continue
×
217
        return "en"
1✔
218

219
    # This method returns whether the locale associated with the user's Mozilla account
220
    # includes a country code from a Premium country. This is less accurate than using
221
    # get_countries_info_from_request_and_mapping(), which can use a GeoIP lookup, so
222
    # prefer using that if a request context is available. In other contexts, for
223
    # example when sending an email, this method can be useful.
224
    @property
1✔
225
    def fxa_locale_in_premium_country(self) -> bool:
1✔
226
        if self.fxa and self.fxa.extra_data.get("locale"):
1✔
227
            try:
1✔
228
                country = guess_country_from_accept_lang(self.fxa.extra_data["locale"])
1✔
229
            except AcceptLanguageError:
1✔
230
                return False
1✔
231
            premium_countries = get_premium_countries()
1✔
232
            if country in premium_countries:
1✔
233
                return True
1✔
234
        return False
1✔
235

236
    @property
1✔
237
    def avatar(self) -> str | None:
1✔
238
        if fxa := self.fxa:
1!
239
            return str(fxa.extra_data.get("avatar"))
1✔
240
        return None
×
241

242
    @property
1✔
243
    def relay_addresses(self) -> QuerySet[RelayAddress]:
1✔
244
        return RelayAddress.objects.filter(user=self.user)
1✔
245

246
    @property
1✔
247
    def domain_addresses(self) -> QuerySet[DomainAddress]:
1✔
248
        return DomainAddress.objects.filter(user=self.user)
1✔
249

250
    @property
1✔
251
    def total_masks(self) -> int:
1✔
252
        ra_count: int = self.relay_addresses.count()
1✔
253
        da_count: int = self.domain_addresses.count()
1✔
254
        return ra_count + da_count
1✔
255

256
    @property
1✔
257
    def at_mask_limit(self) -> bool:
1✔
258
        if self.has_premium:
1✔
259
            return False
1✔
260
        ra_count: int = self.relay_addresses.count()
1✔
261
        return ra_count >= settings.MAX_NUM_FREE_ALIASES
1✔
262

263
    def check_bounce_pause(self) -> BounceStatus:
1✔
264
        if self.last_hard_bounce:
1✔
265
            last_hard_bounce_allowed = datetime.now(UTC) - timedelta(
1✔
266
                days=settings.HARD_BOUNCE_ALLOWED_DAYS
267
            )
268
            if self.last_hard_bounce > last_hard_bounce_allowed:
1✔
269
                return BounceStatus(True, "hard")
1✔
270
            self.last_hard_bounce = None
1✔
271
            self.save()
1✔
272
        if self.last_soft_bounce:
1✔
273
            last_soft_bounce_allowed = datetime.now(UTC) - timedelta(
1✔
274
                days=settings.SOFT_BOUNCE_ALLOWED_DAYS
275
            )
276
            if self.last_soft_bounce > last_soft_bounce_allowed:
1✔
277
                return BounceStatus(True, "soft")
1✔
278
            self.last_soft_bounce = None
1✔
279
            self.save()
1✔
280
        return BounceStatus(False, "")
1✔
281

282
    @property
1✔
283
    def bounce_status(self) -> BounceStatus:
1✔
284
        return self.check_bounce_pause()
1✔
285

286
    @property
1✔
287
    def next_email_try(self) -> datetime:
1✔
288
        bounce_pause, bounce_type = self.check_bounce_pause()
1✔
289

290
        if not bounce_pause:
1✔
291
            return datetime.now(UTC)
1✔
292

293
        if bounce_type == "soft":
1✔
294
            if not self.last_soft_bounce:
1!
295
                raise ValueError("self.last_soft_bounce must be truthy value.")
×
296
            return self.last_soft_bounce + timedelta(
1✔
297
                days=settings.SOFT_BOUNCE_ALLOWED_DAYS
298
            )
299

300
        if bounce_type != "hard":
1!
301
            raise ValueError("bounce_type must be either 'soft' or 'hard'")
×
302
        if not self.last_hard_bounce:
1!
303
            raise ValueError("self.last_hard_bounce must be truthy value.")
×
304
        return self.last_hard_bounce + timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS)
1✔
305

306
    @property
1✔
307
    def last_bounce_date(self):
1✔
308
        if self.last_hard_bounce:
1✔
309
            return self.last_hard_bounce
1✔
310
        if self.last_soft_bounce:
1✔
311
            return self.last_soft_bounce
1✔
312
        return None
1✔
313

314
    @property
1✔
315
    def at_max_free_aliases(self) -> bool:
1✔
316
        relay_addresses_count: int = self.relay_addresses.count()
1✔
317
        return relay_addresses_count >= settings.MAX_NUM_FREE_ALIASES
1✔
318

319
    @property
1✔
320
    def fxa(self) -> SocialAccount | None:
1✔
321
        # Note: we are NOT using .filter() here because it invalidates
322
        # any profile instances that were queried with prefetch_related, which
323
        # we use in at least the profile view to minimize queries
324
        if not hasattr(self.user, "socialaccount_set"):
1!
325
            raise AttributeError("self.user must have socialaccount_set attribute")
×
326
        for sa in self.user.socialaccount_set.all():
1✔
327
            if sa.provider == "fxa":
1!
328
                return sa
1✔
329
        return None
1✔
330

331
    @property
1✔
332
    def display_name(self) -> str | None:
1✔
333
        # if display name is not set on FxA the
334
        # displayName key will not exist on the extra_data
335
        if fxa := self.fxa:
1!
336
            name = fxa.extra_data.get("displayName")
1✔
337
            return name if name is None else str(name)
1✔
338
        return None
×
339

340
    @property
1✔
341
    def custom_domain(self) -> str:
1✔
342
        if not self.subdomain:
×
343
            raise ValueError("self.subdomain must be truthy value.")
×
344
        return f"@{self.subdomain}.{settings.MOZMAIL_DOMAIN}"
×
345

346
    @property
1✔
347
    def has_premium(self) -> bool:
1✔
348
        if not self.user.is_active:
1!
349
            return False
×
350

351
        # FIXME: as we don't have all the tiers defined we are over-defining
352
        # this to mark the user as a premium user as well
353
        if not self.fxa:
1✔
354
            return False
1✔
355
        for premium_domain in PREMIUM_DOMAINS:
1✔
356
            if self.user.email.endswith(f"@{premium_domain}"):
1!
357
                return True
×
358
        user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
1✔
359
        for sub in settings.SUBSCRIPTIONS_WITH_UNLIMITED:
1✔
360
            if sub in user_subscriptions:
1✔
361
                return True
1✔
362
        return False
1✔
363

364
    @property
1✔
365
    def has_phone(self) -> bool:
1✔
366
        if not self.fxa:
1✔
367
            return False
1✔
368
        if settings.RELAY_CHANNEL != "prod" and not settings.IN_PYTEST:
1!
369
            if not flag_is_active_in_task("phones", self.user):
×
370
                return False
×
371
        if flag_is_active_in_task("free_phones", self.user):
1!
372
            return True
×
373
        user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
1✔
374
        for sub in settings.SUBSCRIPTIONS_WITH_PHONE:
1✔
375
            if sub in user_subscriptions:
1✔
376
                return True
1✔
377
        return False
1✔
378

379
    @property
1✔
380
    def has_vpn(self) -> bool:
1✔
381
        if not self.fxa:
1!
382
            return False
×
383
        user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
1✔
384
        for sub in settings.SUBSCRIPTIONS_WITH_VPN:
1✔
385
            if sub in user_subscriptions:
1✔
386
                return True
1✔
387
        return False
1✔
388

389
    @property
1✔
390
    def emails_forwarded(self) -> int:
1✔
391
        return (
1✔
392
            sum(ra.num_forwarded for ra in self.relay_addresses)
393
            + sum(da.num_forwarded for da in self.domain_addresses)
394
            + self.num_email_forwarded_in_deleted_address
395
        )
396

397
    @property
1✔
398
    def emails_blocked(self) -> int:
1✔
399
        return (
1✔
400
            sum(ra.num_blocked for ra in self.relay_addresses)
401
            + sum(da.num_blocked for da in self.domain_addresses)
402
            + self.num_email_blocked_in_deleted_address
403
        )
404

405
    @property
1✔
406
    def emails_replied(self) -> int:
1✔
407
        ra_sum = self.relay_addresses.aggregate(models.Sum("num_replied", default=0))
1✔
408
        da_sum = self.domain_addresses.aggregate(models.Sum("num_replied", default=0))
1✔
409
        return (
1✔
410
            int(ra_sum["num_replied__sum"])
411
            + int(da_sum["num_replied__sum"])
412
            + self.num_email_replied_in_deleted_address
413
        )
414

415
    @property
1✔
416
    def level_one_trackers_blocked(self) -> int:
1✔
417
        return (
1✔
418
            sum(ra.num_level_one_trackers_blocked or 0 for ra in self.relay_addresses)
419
            + sum(
420
                da.num_level_one_trackers_blocked or 0 for da in self.domain_addresses
421
            )
422
            + (self.num_level_one_trackers_blocked_in_deleted_address or 0)
423
        )
424

425
    @property
1✔
426
    def joined_before_premium_release(self):
1✔
427
        date_created = self.user.date_joined
1✔
428
        return date_created < datetime.fromisoformat("2021-10-22 17:00:00+00:00")
1✔
429

430
    @property
1✔
431
    def date_phone_registered(self) -> datetime | None:
1✔
432
        if not settings.PHONES_ENABLED:
1!
433
            return None
×
434

435
        try:
1✔
436
            real_phone = RealPhone.objects.get(user=self.user)
1✔
437
            relay_number = RelayNumber.objects.get(user=self.user)
1✔
438
        except RealPhone.DoesNotExist:
1✔
439
            return None
1✔
440
        except RelayNumber.DoesNotExist:
1✔
441
            return real_phone.verified_date
1✔
442
        return relay_number.created_at or real_phone.verified_date
1✔
443

444
    def add_subdomain(self, subdomain):
1✔
445
        # Handles if the subdomain is "" or None
446
        if not subdomain:
1✔
447
            raise CannotMakeSubdomainException(
1✔
448
                "error-subdomain-cannot-be-empty-or-null"
449
            )
450

451
        # subdomain must be all lowercase
452
        subdomain = subdomain.lower()
1✔
453

454
        if not self.has_premium:
1✔
455
            raise CannotMakeSubdomainException("error-premium-set-subdomain")
1✔
456
        if self.subdomain is not None:
1✔
457
            raise CannotMakeSubdomainException("error-premium-cannot-change-subdomain")
1✔
458
        self.subdomain = subdomain
1✔
459
        # The validator defined in the subdomain field does not get run in full_clean()
460
        # when self.subdomain is "" or None, so we need to run the validator again to
461
        # catch these cases.
462
        valid_available_subdomain(subdomain)
1✔
463
        self.full_clean()
1✔
464
        self.save()
1✔
465

466
        RegisteredSubdomain.objects.create(subdomain_hash=hash_subdomain(subdomain))
1✔
467
        return subdomain
1✔
468

469
    def update_abuse_metric(
1✔
470
        self,
471
        address_created: bool = False,
472
        replied: bool = False,
473
        email_forwarded: bool = False,
474
        forwarded_email_size: int = 0,
475
    ) -> datetime | None:
476
        # TODO MPP-3720: This should be wrapped in atomic or select_for_update to ensure
477
        # race conditions are properly handled.
478

479
        # look for abuse metrics created on the same UTC date, regardless of time.
480
        midnight_utc_today = datetime.combine(
1✔
481
            datetime.now(UTC).date(), datetime.min.time()
482
        ).astimezone(UTC)
483
        midnight_utc_tomorow = midnight_utc_today + timedelta(days=1)
1✔
484
        abuse_metric = self.user.abusemetrics_set.filter(
1✔
485
            first_recorded__gte=midnight_utc_today,
486
            first_recorded__lt=midnight_utc_tomorow,
487
        ).first()
488
        if not abuse_metric:
1✔
489
            abuse_metric = AbuseMetrics.objects.create(user=self.user)
1✔
490
            AbuseMetrics.objects.filter(first_recorded__lt=midnight_utc_today).delete()
1✔
491

492
        # increment the abuse metric
493
        if address_created:
1✔
494
            abuse_metric.num_address_created_per_day += 1
1✔
495
        if replied:
1✔
496
            abuse_metric.num_replies_per_day += 1
1✔
497
        if email_forwarded:
1✔
498
            abuse_metric.num_email_forwarded_per_day += 1
1✔
499
        if forwarded_email_size > 0:
1✔
500
            abuse_metric.forwarded_email_size_per_day += forwarded_email_size
1✔
501
        abuse_metric.last_recorded = datetime.now(UTC)
1✔
502
        abuse_metric.save()
1✔
503

504
        # check user should be flagged for abuse
505
        hit_max_create = False
1✔
506
        hit_max_replies = False
1✔
507
        hit_max_forwarded = False
1✔
508
        hit_max_forwarded_email_size = False
1✔
509

510
        hit_max_create = (
1✔
511
            abuse_metric.num_address_created_per_day
512
            >= settings.MAX_ADDRESS_CREATION_PER_DAY
513
        )
514
        hit_max_replies = (
1✔
515
            abuse_metric.num_replies_per_day >= settings.MAX_REPLIES_PER_DAY
516
        )
517
        hit_max_forwarded = (
1✔
518
            abuse_metric.num_email_forwarded_per_day >= settings.MAX_FORWARDED_PER_DAY
519
        )
520
        hit_max_forwarded_email_size = (
1✔
521
            abuse_metric.forwarded_email_size_per_day
522
            >= settings.MAX_FORWARDED_EMAIL_SIZE_PER_DAY
523
        )
524
        if (
1✔
525
            hit_max_create
526
            or hit_max_replies
527
            or hit_max_forwarded
528
            or hit_max_forwarded_email_size
529
        ):
530
            self.last_account_flagged = datetime.now(UTC)
1✔
531
            self.save()
1✔
532
            data = {
1✔
533
                "uid": self.fxa.uid if self.fxa else None,
534
                "flagged": self.last_account_flagged.timestamp(),
535
                "replies": abuse_metric.num_replies_per_day,
536
                "addresses": abuse_metric.num_address_created_per_day,
537
                "forwarded": abuse_metric.num_email_forwarded_per_day,
538
                "forwarded_size_in_bytes": abuse_metric.forwarded_email_size_per_day,
539
            }
540
            # log for further secops review
541
            abuse_logger.info("Abuse flagged", extra=data)
1✔
542
        return self.last_account_flagged
1✔
543

544
    @property
1✔
545
    def is_flagged(self):
1✔
546
        if not self.last_account_flagged:
1✔
547
            return False
1✔
548
        account_premium_feature_resumed = self.last_account_flagged + timedelta(
1✔
549
            days=settings.PREMIUM_FEATURE_PAUSED_DAYS
550
        )
551
        if datetime.now(UTC) > account_premium_feature_resumed:
1!
552
            # premium feature has been resumed
553
            return False
×
554
        # user was flagged and the premium feature pause period is not yet over
555
        return True
1✔
556

557
    @property
1✔
558
    def metrics_enabled(self) -> bool:
1✔
559
        """
560
        Does the user allow us to record technical and interaction data?
561

562
        This is based on the Mozilla accounts opt-out option, added around 2022. A user
563
        can go to their Mozilla account profile settings, Data Collection and Use, and
564
        deselect "Help improve Mozilla Account". This setting defaults to On, and is
565
        sent as "metricsEnabled". Some older Relay accounts do not have
566
        "metricsEnabled", and we default to On.
567
        """
568
        if self.fxa:
1✔
569
            return bool(self.fxa.extra_data.get("metricsEnabled", True))
1✔
570
        return True
1✔
571

572
    @property
1✔
573
    def plan(self) -> Literal["free", "email", "phone", "bundle"]:
1✔
574
        """The user's Relay plan as a string."""
575
        if self.has_premium:
1✔
576
            if self.has_phone:
1✔
577
                return "bundle" if self.has_vpn else "phone"
1✔
578
            else:
579
                return "email"
1✔
580
        else:
581
            return "free"
1✔
582

583
    @property
1✔
584
    def plan_term(self) -> Literal[None, "unknown", "1_month", "1_year"]:
1✔
585
        """The user's Relay plan term as a string."""
586
        plan = self.plan
1✔
587
        if plan == "free":
1✔
588
            return None
1✔
589
        if plan == "phone":
1✔
590
            start_date = self.date_phone_subscription_start
1✔
591
            end_date = self.date_phone_subscription_end
1✔
592
            if start_date and end_date:
1✔
593
                span = end_date - start_date
1✔
594
                return "1_year" if span.days > 32 else "1_month"
1✔
595
        return "unknown"
1✔
596

597
    @property
1✔
598
    def metrics_premium_status(self) -> str:
1✔
599
        plan = self.plan
1✔
600
        if plan == "free":
1✔
601
            return "free"
1✔
602
        return f"{plan}_{self.plan_term}"
1✔
603

604

605
class RegisteredSubdomain(models.Model):
1✔
606
    subdomain_hash = models.CharField(max_length=64, db_index=True, unique=True)
1✔
607
    registered_at = models.DateTimeField(auto_now_add=True)
1✔
608

609
    def __str__(self):
1✔
610
        return self.subdomain_hash
×
611

612

613
class RelayAddress(models.Model):
1✔
614
    user = models.ForeignKey(User, on_delete=models.CASCADE)
1✔
615
    address = models.CharField(max_length=64, default=address_default, unique=True)
1✔
616
    domain = models.PositiveSmallIntegerField(
1✔
617
        choices=DOMAIN_CHOICES, default=default_domain_numerical
618
    )
619
    enabled = models.BooleanField(default=True)
1✔
620
    description = models.CharField(max_length=64, blank=True)
1✔
621
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
1✔
622
    last_modified_at = models.DateTimeField(auto_now=True, db_index=True)
1✔
623
    last_used_at = models.DateTimeField(blank=True, null=True)
1✔
624
    num_forwarded = models.PositiveIntegerField(default=0)
1✔
625
    num_blocked = models.PositiveIntegerField(default=0)
1✔
626
    num_level_one_trackers_blocked = models.PositiveIntegerField(default=0, null=True)
1✔
627
    num_replied = models.PositiveIntegerField(default=0)
1✔
628
    num_spam = models.PositiveIntegerField(default=0)
1✔
629
    generated_for = models.CharField(max_length=255, blank=True)
1✔
630
    block_list_emails = models.BooleanField(default=False)
1✔
631
    used_on = models.TextField(default=None, blank=True, null=True)
1✔
632

633
    class Meta:
1✔
634
        indexes = [
1✔
635
            # Find when a user first used the add-on
636
            models.Index(
637
                name="idx_ra_created_by_addon",
638
                fields=["user"],
639
                condition=~models.Q(generated_for__exact=""),
640
                include=["created_at"],
641
            ),
642
        ]
643
        verbose_name_plural = "relay addresses"
1✔
644

645
    def __str__(self):
1✔
646
        return self.address
1✔
647

648
    def delete(self, *args, **kwargs):
1✔
649
        # TODO: create hard bounce receipt rule in AWS for the address
650
        deleted_address = DeletedAddress.objects.create(
1✔
651
            address_hash=address_hash(self.address, domain=self.domain_value),
652
            num_forwarded=self.num_forwarded,
653
            num_blocked=self.num_blocked,
654
            num_replied=self.num_replied,
655
            num_spam=self.num_spam,
656
        )
657
        deleted_address.save()
1✔
658
        profile = Profile.objects.get(user=self.user)
1✔
659
        profile.address_last_deleted = datetime.now(UTC)
1✔
660
        profile.num_address_deleted += 1
1✔
661
        profile.num_email_forwarded_in_deleted_address += self.num_forwarded
1✔
662
        profile.num_email_blocked_in_deleted_address += self.num_blocked
1✔
663
        profile.num_level_one_trackers_blocked_in_deleted_address = (
1✔
664
            profile.num_level_one_trackers_blocked_in_deleted_address or 0
665
        ) + (self.num_level_one_trackers_blocked or 0)
666
        profile.num_email_replied_in_deleted_address += self.num_replied
1✔
667
        profile.num_email_spam_in_deleted_address += self.num_spam
1✔
668
        profile.num_deleted_relay_addresses += 1
1✔
669
        profile.last_engagement = datetime.now(UTC)
1✔
670
        profile.save()
1✔
671
        return super().delete(*args, **kwargs)
1✔
672

673
    def save(
1✔
674
        self,
675
        force_insert: bool | tuple[ModelBase, ...] = False,
676
        force_update: bool = False,
677
        using: str | None = None,
678
        update_fields: Iterable[str] | None = None,
679
    ) -> None:
680
        if self._state.adding:
1✔
681
            with transaction.atomic():
1✔
682
                locked_profile = Profile.objects.select_for_update().get(user=self.user)
1✔
683
                check_user_can_make_another_address(locked_profile.user)
1✔
684
                while True:
1✔
685
                    address_is_allowed = not is_blocklisted(self.address)
1✔
686
                    address_is_valid = valid_address(self.address, self.domain_value)
1✔
687
                    if address_is_valid and address_is_allowed:
1✔
688
                        break
1✔
689
                    self.address = address_default()
1✔
690
                locked_profile.update_abuse_metric(address_created=True)
1✔
691
                locked_profile.last_engagement = datetime.now(UTC)
1✔
692
                locked_profile.save()
1✔
693
        if (not self.user.profile.server_storage) and any(
1✔
694
            (self.description, self.generated_for, self.used_on)
695
        ):
696
            self.description = ""
1✔
697
            self.generated_for = ""
1✔
698
            self.used_on = ""
1✔
699
            if update_fields is not None:
1✔
700
                update_fields = {"description", "generated_for", "used_on"}.union(
1✔
701
                    update_fields
702
                )
703
        if not self.user.profile.has_premium and self.block_list_emails:
1✔
704
            self.block_list_emails = False
1✔
705
            if update_fields is not None:
1✔
706
                update_fields = {"block_list_emails"}.union(update_fields)
1✔
707
        super().save(
1✔
708
            force_insert=force_insert,
709
            force_update=force_update,
710
            using=using,
711
            update_fields=update_fields,
712
        )
713

714
    @property
1✔
715
    def domain_value(self) -> str:
1✔
716
        domain = cast(
1✔
717
            Literal["RELAY_FIREFOX_DOMAIN", "MOZMAIL_DOMAIN"], self.get_domain_display()
718
        )
719
        return get_domains_from_settings()[domain]
1✔
720

721
    @property
1✔
722
    def full_address(self) -> str:
1✔
723
        return f"{self.address}@{self.domain_value}"
1✔
724

725
    @property
1✔
726
    def metrics_id(self) -> str:
1✔
727
        if not self.id:
1!
728
            raise ValueError("self.id must be truthy value.")
×
729
        # Prefix with 'R' for RelayAddress, since there may be a DomainAddress with the
730
        # same row ID
731
        return f"R{self.id}"
1✔
732

733

734
class DeletedAddress(models.Model):
1✔
735
    address_hash = models.CharField(max_length=64, db_index=True)
1✔
736
    num_forwarded = models.PositiveIntegerField(default=0)
1✔
737
    num_blocked = models.PositiveIntegerField(default=0)
1✔
738
    num_replied = models.PositiveIntegerField(default=0)
1✔
739
    num_spam = models.PositiveIntegerField(default=0)
1✔
740

741
    def __str__(self):
1✔
742
        return self.address_hash
×
743

744

745
class DomainAddress(models.Model):
1✔
746
    user = models.ForeignKey(User, on_delete=models.CASCADE)
1✔
747
    address = models.CharField(
1✔
748
        max_length=64, validators=[MinLengthValidator(limit_value=1)]
749
    )
750
    enabled = models.BooleanField(default=True)
1✔
751
    description = models.CharField(max_length=64, blank=True)
1✔
752
    domain = models.PositiveSmallIntegerField(choices=DOMAIN_CHOICES, default=2)
1✔
753
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
1✔
754
    first_emailed_at = models.DateTimeField(null=True, db_index=True)
1✔
755
    last_modified_at = models.DateTimeField(auto_now=True, db_index=True)
1✔
756
    last_used_at = models.DateTimeField(blank=True, null=True)
1✔
757
    num_forwarded = models.PositiveIntegerField(default=0)
1✔
758
    num_blocked = models.PositiveIntegerField(default=0)
1✔
759
    num_level_one_trackers_blocked = models.PositiveIntegerField(default=0, null=True)
1✔
760
    num_replied = models.PositiveIntegerField(default=0)
1✔
761
    num_spam = models.PositiveIntegerField(default=0)
1✔
762
    block_list_emails = models.BooleanField(default=False)
1✔
763
    used_on = models.TextField(default=None, blank=True, null=True)
1✔
764

765
    class Meta:
1✔
766
        unique_together = ["user", "address"]
1✔
767
        verbose_name_plural = "domain addresses"
1✔
768

769
    def __str__(self):
1✔
770
        return self.address
×
771

772
    def save(
1✔
773
        self,
774
        force_insert: bool | tuple[ModelBase, ...] = False,
775
        force_update: bool = False,
776
        using: str | None = None,
777
        update_fields: Iterable[str] | None = None,
778
    ) -> None:
779
        if self._state.adding:
1✔
780
            check_user_can_make_domain_address(self.user)
1✔
781
            domain_address_valid = valid_address(
1✔
782
                self.address, self.domain_value, self.user.profile.subdomain
783
            )
784
            if not domain_address_valid:
1✔
785
                if self.first_emailed_at:
1!
786
                    incr_if_enabled("domainaddress.create_via_email_fail")
×
787
                raise DomainAddrUnavailableException(unavailable_address=self.address)
1✔
788

789
            if DomainAddress.objects.filter(
1✔
790
                user=self.user, address=self.address
791
            ).exists():
792
                raise DomainAddrDuplicateException(duplicate_address=self.address)
1✔
793

794
            self.user.profile.update_abuse_metric(address_created=True)
1✔
795
            self.user.profile.last_engagement = datetime.now(UTC)
1✔
796
            self.user.profile.save(update_fields=["last_engagement"])
1✔
797
            incr_if_enabled("domainaddress.create")
1✔
798
            if self.first_emailed_at:
1✔
799
                incr_if_enabled("domainaddress.create_via_email")
1✔
800
        else:
801
            # The model is in an update state, do not allow 'address' field updates
802
            existing_instance = DomainAddress.objects.get(id=self.id)
1✔
803
            if existing_instance.address != self.address:
1✔
804
                raise DomainAddrUpdateException()
1✔
805

806
        if not self.user.profile.has_premium and self.block_list_emails:
1✔
807
            self.block_list_emails = False
1✔
808
            if update_fields:
1✔
809
                update_fields = {"block_list_emails"}.union(update_fields)
1✔
810
        if (not self.user.profile.server_storage) and (
1✔
811
            self.description or self.used_on
812
        ):
813
            self.description = ""
1✔
814
            self.used_on = ""
1✔
815
            if update_fields:
1✔
816
                update_fields = {"description", "used_on"}.union(update_fields)
1✔
817
        super().save(
1✔
818
            force_insert=force_insert,
819
            force_update=force_update,
820
            using=using,
821
            update_fields=update_fields,
822
        )
823

824
    @property
1✔
825
    def user_profile(self):
1✔
826
        return Profile.objects.get(user=self.user)
1✔
827

828
    @staticmethod
1✔
829
    def make_domain_address(
1✔
830
        user: User, address: str | None = None, made_via_email: bool = False
831
    ) -> DomainAddress:
832
        check_user_can_make_domain_address(user)
1✔
833

834
        if not address:
1✔
835
            # FIXME: if the alias is randomly generated and has bad words
836
            # we should retry like make_relay_address does
837
            # not fixing this now because not sure randomly generated
838
            # DomainAlias will be a feature
839
            address = address_default()
1✔
840
            # Only check for bad words if randomly generated
841
        if not isinstance(address, str):
1!
842
            raise TypeError("address must be type str")
×
843

844
        first_emailed_at = datetime.now(UTC) if made_via_email else None
1✔
845
        domain_address = DomainAddress.objects.create(
1✔
846
            user=user, address=address, first_emailed_at=first_emailed_at
847
        )
848
        return domain_address
1✔
849

850
    def delete(self, *args, **kwargs):
1✔
851
        # TODO: create hard bounce receipt rule in AWS for the address
852
        deleted_address = DeletedAddress.objects.create(
1✔
853
            address_hash=address_hash(
854
                self.address, self.user_profile.subdomain, self.domain_value
855
            ),
856
            num_forwarded=self.num_forwarded,
857
            num_blocked=self.num_blocked,
858
            num_replied=self.num_replied,
859
            num_spam=self.num_spam,
860
        )
861
        deleted_address.save()
1✔
862
        # self.user_profile is a property and should not be used to
863
        # update values on the user's profile
864
        profile = Profile.objects.get(user=self.user)
1✔
865
        profile.address_last_deleted = datetime.now(UTC)
1✔
866
        profile.num_address_deleted += 1
1✔
867
        profile.num_email_forwarded_in_deleted_address += self.num_forwarded
1✔
868
        profile.num_email_blocked_in_deleted_address += self.num_blocked
1✔
869
        profile.num_level_one_trackers_blocked_in_deleted_address = (
1✔
870
            profile.num_level_one_trackers_blocked_in_deleted_address or 0
871
        ) + (self.num_level_one_trackers_blocked or 0)
872
        profile.num_email_replied_in_deleted_address += self.num_replied
1✔
873
        profile.num_email_spam_in_deleted_address += self.num_spam
1✔
874
        profile.num_deleted_domain_addresses += 1
1✔
875
        profile.last_engagement = datetime.now(UTC)
1✔
876
        profile.save()
1✔
877
        return super().delete(*args, **kwargs)
1✔
878

879
    @property
1✔
880
    def domain_value(self) -> str:
1✔
881
        domain = cast(
1✔
882
            Literal["RELAY_FIREFOX_DOMAIN", "MOZMAIL_DOMAIN"], self.get_domain_display()
883
        )
884
        return get_domains_from_settings()[domain]
1✔
885

886
    @property
1✔
887
    def full_address(self) -> str:
1✔
888
        return f"{self.address}@{self.user_profile.subdomain}.{self.domain_value}"
1✔
889

890
    @property
1✔
891
    def metrics_id(self) -> str:
1✔
892
        if not self.id:
1!
893
            raise ValueError("self.id must be truthy value.")
×
894
        # Prefix with 'D' for DomainAddress, since there may be a RelayAddress with the
895
        # same row ID
896
        return f"D{self.id}"
1✔
897

898

899
class Reply(models.Model):
1✔
900
    relay_address = models.ForeignKey(
1✔
901
        RelayAddress, on_delete=models.CASCADE, blank=True, null=True
902
    )
903
    domain_address = models.ForeignKey(
1✔
904
        DomainAddress, on_delete=models.CASCADE, blank=True, null=True
905
    )
906
    lookup = models.CharField(max_length=255, blank=False, db_index=True)
1✔
907
    encrypted_metadata = models.TextField(blank=False)
1✔
908
    created_at = models.DateField(auto_now_add=True, null=False, db_index=True)
1✔
909

910
    @property
1✔
911
    def address(self):
1✔
912
        return self.relay_address or self.domain_address
1✔
913

914
    @property
1✔
915
    def profile(self):
1✔
916
        return self.address.user.profile
1✔
917

918
    @property
1✔
919
    def owner_has_premium(self):
1✔
920
        return self.profile.has_premium
1✔
921

922
    def increment_num_replied(self):
1✔
923
        address = self.relay_address or self.domain_address
1✔
924
        if not address:
1!
925
            raise ValueError("address must be truthy value")
×
926
        address.num_replied += 1
1✔
927
        address.last_used_at = datetime.now(UTC)
1✔
928
        address.save(update_fields=["num_replied", "last_used_at"])
1✔
929
        return address.num_replied
1✔
930

931

932
class AbuseMetrics(models.Model):
1✔
933
    user = models.ForeignKey(User, on_delete=models.CASCADE)
1✔
934
    first_recorded = models.DateTimeField(auto_now_add=True, db_index=True)
1✔
935
    last_recorded = models.DateTimeField(auto_now_add=True, db_index=True)
1✔
936
    num_address_created_per_day = models.PositiveSmallIntegerField(default=0)
1✔
937
    num_replies_per_day = models.PositiveSmallIntegerField(default=0)
1✔
938
    # Values from 0 to 32767 are safe in all databases supported by Django.
939
    num_email_forwarded_per_day = models.PositiveSmallIntegerField(default=0)
1✔
940
    # Values from 0 to 9.2 exabytes are safe in all databases supported by Django.
941
    forwarded_email_size_per_day = models.PositiveBigIntegerField(default=0)
1✔
942

943
    class Meta:
1✔
944
        unique_together = ["user", "first_recorded"]
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc