• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / d3128616-238d-446e-82c5-ab66cd38ceaf

09 May 2024 06:22PM CUT coverage: 84.07% (-0.6%) from 84.64%
d3128616-238d-446e-82c5-ab66cd38ceaf

push

circleci

web-flow
Merge pull request #4684 from mozilla/enable-flak8-bandit-checks-mpp-3802

fix MPP-3802: stop ignoring bandit security checks

3601 of 4734 branches covered (76.07%)

Branch coverage included in aggregate %.

74 of 158 new or added lines in 24 files covered. (46.84%)

5 existing lines in 5 files now uncovered.

14686 of 17018 relevant lines covered (86.3%)

10.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.67
/emails/models.py
1
from __future__ import annotations
1✔
2

3
import logging
1✔
4
import random
1✔
5
import re
1✔
6
import string
1✔
7
import uuid
1✔
8
from collections import namedtuple
1✔
9
from collections.abc import Iterable
1✔
10
from datetime import UTC, datetime, timedelta
1✔
11
from hashlib import sha256
1✔
12
from typing import Literal, cast
1✔
13

14
from django.conf import settings
1✔
15
from django.contrib.auth.models import User
1✔
16
from django.core.exceptions import BadRequest
1✔
17
from django.core.validators import MinLengthValidator
1✔
18
from django.db import models, transaction
1✔
19
from django.db.models.base import ModelBase
1✔
20
from django.db.models.query import QuerySet
1✔
21
from django.dispatch import receiver
1✔
22
from django.utils.translation.trans_real import (
1✔
23
    get_supported_language_variant,
24
    parse_accept_lang_header,
25
)
26

27
from allauth.socialaccount.models import SocialAccount
1✔
28
from rest_framework.authtoken.models import Token
1✔
29

30
from api.exceptions import ErrorContextType, RelayAPIException
1✔
31
from privaterelay.plans import get_premium_countries
1✔
32
from privaterelay.utils import (
1✔
33
    AcceptLanguageError,
34
    flag_is_active_in_task,
35
    guess_country_from_accept_lang,
36
)
37

38
from .apps import emails_config
1✔
39
from .utils import get_domains_from_settings, incr_if_enabled
1✔
40

41
if settings.PHONES_ENABLED:
1!
42
    from phones.models import RealPhone, RelayNumber
1✔
43

44

45
logger = logging.getLogger("events")
1✔
46
abuse_logger = logging.getLogger("abusemetrics")
1✔
47

48
BounceStatus = namedtuple("BounceStatus", "paused type")
1✔
49

50
DOMAIN_CHOICES = [(1, "RELAY_FIREFOX_DOMAIN"), (2, "MOZMAIL_DOMAIN")]
1✔
51
PREMIUM_DOMAINS = ["mozilla.com", "getpocket.com", "mozillafoundation.org"]
1✔
52

53

54
def valid_available_subdomain(subdomain, *args, **kwargs):
1✔
55
    if not subdomain:
1✔
56
        raise CannotMakeSubdomainException("error-subdomain-cannot-be-empty-or-null")
1✔
57
    # valid subdomains:
58
    #   can't start or end with a hyphen
59
    #   must be 1-63 alphanumeric characters and/or hyphens
60
    subdomain = subdomain.lower()
1✔
61
    valid_subdomain_pattern = re.compile("^(?!-)[a-z0-9-]{1,63}(?<!-)$")
1✔
62
    valid = valid_subdomain_pattern.match(subdomain) is not None
1✔
63
    #   can't have "bad" words in them
64
    bad_word = has_bad_words(subdomain)
1✔
65
    #   can't have "blocked" words in them
66
    blocked_word = is_blocklisted(subdomain)
1✔
67
    #   can't be taken by someone else
68
    taken = (
1✔
69
        RegisteredSubdomain.objects.filter(
70
            subdomain_hash=hash_subdomain(subdomain)
71
        ).count()
72
        > 0
73
    )
74
    if not valid or bad_word or blocked_word or taken:
1✔
75
        raise CannotMakeSubdomainException("error-subdomain-not-available")
1✔
76
    return True
1✔
77

78

79
# This historical function is referenced in migration
80
# 0029_profile_add_deleted_metric_and_changeserver_storage_default
81
def default_server_storage():
1✔
82
    return True
×
83

84

85
def default_domain_numerical():
1✔
86
    domains = get_domains_from_settings()
1✔
87
    domain = domains["MOZMAIL_DOMAIN"]
1✔
88
    return get_domain_numerical(domain)
1✔
89

90

91
class Profile(models.Model):
1✔
92
    user = models.OneToOneField(User, on_delete=models.CASCADE)
1✔
93
    api_token = models.UUIDField(default=uuid.uuid4)
1✔
94
    num_address_deleted = models.PositiveIntegerField(default=0)
1✔
95
    date_subscribed = models.DateTimeField(blank=True, null=True)
1✔
96
    date_subscribed_phone = models.DateTimeField(blank=True, null=True)
1✔
97
    # TODO MPP-2972: delete date_phone_subscription_checked in favor of
98
    # date_phone_subscription_next_reset
99
    date_phone_subscription_checked = models.DateTimeField(blank=True, null=True)
1✔
100
    date_phone_subscription_start = models.DateTimeField(blank=True, null=True)
1✔
101
    date_phone_subscription_reset = models.DateTimeField(blank=True, null=True)
1✔
102
    date_phone_subscription_end = models.DateTimeField(blank=True, null=True)
1✔
103
    address_last_deleted = models.DateTimeField(blank=True, null=True, db_index=True)
1✔
104
    last_soft_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
1✔
105
    last_hard_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
1✔
106
    last_account_flagged = models.DateTimeField(blank=True, null=True, db_index=True)
1✔
107
    num_deleted_relay_addresses = models.PositiveIntegerField(default=0)
1✔
108
    num_deleted_domain_addresses = models.PositiveIntegerField(default=0)
1✔
109
    num_email_forwarded_in_deleted_address = models.PositiveIntegerField(default=0)
1✔
110
    num_email_blocked_in_deleted_address = models.PositiveIntegerField(default=0)
1✔
111
    num_level_one_trackers_blocked_in_deleted_address = models.PositiveIntegerField(
1✔
112
        default=0, null=True
113
    )
114
    num_email_replied_in_deleted_address = models.PositiveIntegerField(default=0)
1✔
115
    num_email_spam_in_deleted_address = models.PositiveIntegerField(default=0)
1✔
116
    subdomain = models.CharField(
1✔
117
        blank=True,
118
        null=True,
119
        unique=True,
120
        max_length=63,
121
        db_index=True,
122
        validators=[valid_available_subdomain],
123
    )
124
    # Whether we store the user's alias labels in the server
125
    server_storage = models.BooleanField(default=True)
1✔
126
    # Whether we store the caller/sender log for the user's relay number
127
    store_phone_log = models.BooleanField(default=True)
1✔
128
    # TODO: Data migration to set null to false
129
    # TODO: Schema migration to remove null=True
130
    remove_level_one_email_trackers = models.BooleanField(null=True, default=False)
1✔
131
    onboarding_state = models.PositiveIntegerField(default=0)
1✔
132
    onboarding_free_state = models.PositiveIntegerField(default=0)
1✔
133
    auto_block_spam = models.BooleanField(default=False)
1✔
134
    forwarded_first_reply = models.BooleanField(default=False)
1✔
135
    # Empty string means the profile was created through relying party flow
136
    created_by = models.CharField(blank=True, null=True, max_length=63)
1✔
137
    sent_welcome_email = models.BooleanField(default=False)
1✔
138
    last_engagement = models.DateTimeField(blank=True, null=True, db_index=True)
1✔
139

140
    def __str__(self):
1✔
141
        return f"{self.user} Profile"
1✔
142

143
    def save(
1✔
144
        self,
145
        force_insert: bool | tuple[ModelBase, ...] = False,
146
        force_update: bool = False,
147
        using: str | None = None,
148
        update_fields: Iterable[str] | None = None,
149
    ) -> None:
150
        # always lower-case the subdomain before saving it
151
        # TODO: change subdomain field as a custom field inheriting from
152
        # CharField to validate constraints on the field update too
153
        if self.subdomain and not self.subdomain.islower():
1✔
154
            self.subdomain = self.subdomain.lower()
1✔
155
            if update_fields is not None:
1✔
156
                update_fields = {"subdomain"}.union(update_fields)
1✔
157
        super().save(
1✔
158
            force_insert=force_insert,
159
            force_update=force_update,
160
            using=using,
161
            update_fields=update_fields,
162
        )
163
        # any time a profile is saved with server_storage False, delete the
164
        # appropriate server-stored Relay address data.
165
        if not self.server_storage:
1✔
166
            relay_addresses = RelayAddress.objects.filter(user=self.user)
1✔
167
            relay_addresses.update(description="", generated_for="", used_on="")
1✔
168
            domain_addresses = DomainAddress.objects.filter(user=self.user)
1✔
169
            domain_addresses.update(description="", used_on="")
1✔
170
        if settings.PHONES_ENABLED:
1!
171
            # any time a profile is saved with store_phone_log False, delete the
172
            # appropriate server-stored InboundContact records
173
            from phones.models import InboundContact, RelayNumber
1✔
174

175
            if not self.store_phone_log:
1✔
176
                try:
1✔
177
                    relay_number = RelayNumber.objects.get(user=self.user)
1✔
178
                    InboundContact.objects.filter(relay_number=relay_number).delete()
1✔
179
                except RelayNumber.DoesNotExist:
1✔
180
                    pass
1✔
181

182
    @property
1✔
183
    def language(self):
1✔
184
        if self.fxa and self.fxa.extra_data.get("locale"):
1✔
185
            for accept_lang, _ in parse_accept_lang_header(
1!
186
                self.fxa.extra_data.get("locale")
187
            ):
188
                try:
1✔
189
                    return get_supported_language_variant(accept_lang)
1✔
190
                except LookupError:
×
191
                    continue
×
192
        return "en"
1✔
193

194
    # This method returns whether the locale associated with the user's Mozilla account
195
    # includes a country code from a Premium country. This is less accurate than using
196
    # get_countries_info_from_request_and_mapping(), which can use a GeoIP lookup, so
197
    # prefer using that if a request context is available. In other contexts, for
198
    # example when sending an email, this method can be useful.
199
    @property
1✔
200
    def fxa_locale_in_premium_country(self) -> bool:
1✔
201
        if self.fxa and self.fxa.extra_data.get("locale"):
1✔
202
            try:
1✔
203
                country = guess_country_from_accept_lang(self.fxa.extra_data["locale"])
1✔
204
            except AcceptLanguageError:
1✔
205
                return False
1✔
206
            premium_countries = get_premium_countries()
1✔
207
            if country in premium_countries:
1✔
208
                return True
1✔
209
        return False
1✔
210

211
    @property
1✔
212
    def avatar(self) -> str | None:
1✔
213
        if fxa := self.fxa:
1!
214
            return str(fxa.extra_data.get("avatar"))
1✔
215
        return None
×
216

217
    @property
1✔
218
    def relay_addresses(self) -> QuerySet[RelayAddress]:
1✔
219
        return RelayAddress.objects.filter(user=self.user)
1✔
220

221
    @property
1✔
222
    def domain_addresses(self) -> QuerySet[DomainAddress]:
1✔
223
        return DomainAddress.objects.filter(user=self.user)
1✔
224

225
    @property
1✔
226
    def total_masks(self) -> int:
1✔
227
        ra_count: int = self.relay_addresses.count()
1✔
228
        da_count: int = self.domain_addresses.count()
1✔
229
        return ra_count + da_count
1✔
230

231
    @property
1✔
232
    def at_mask_limit(self) -> bool:
1✔
233
        if self.has_premium:
1✔
234
            return False
1✔
235
        ra_count: int = self.relay_addresses.count()
1✔
236
        return ra_count >= settings.MAX_NUM_FREE_ALIASES
1✔
237

238
    def check_bounce_pause(self) -> BounceStatus:
1✔
239
        if self.last_hard_bounce:
1✔
240
            last_hard_bounce_allowed = datetime.now(UTC) - timedelta(
1✔
241
                days=settings.HARD_BOUNCE_ALLOWED_DAYS
242
            )
243
            if self.last_hard_bounce > last_hard_bounce_allowed:
1✔
244
                return BounceStatus(True, "hard")
1✔
245
            self.last_hard_bounce = None
1✔
246
            self.save()
1✔
247
        if self.last_soft_bounce:
1✔
248
            last_soft_bounce_allowed = datetime.now(UTC) - timedelta(
1✔
249
                days=settings.SOFT_BOUNCE_ALLOWED_DAYS
250
            )
251
            if self.last_soft_bounce > last_soft_bounce_allowed:
1✔
252
                return BounceStatus(True, "soft")
1✔
253
            self.last_soft_bounce = None
1✔
254
            self.save()
1✔
255
        return BounceStatus(False, "")
1✔
256

257
    @property
1✔
258
    def bounce_status(self) -> BounceStatus:
1✔
259
        return self.check_bounce_pause()
1✔
260

261
    @property
1✔
262
    def next_email_try(self) -> datetime:
1✔
263
        bounce_pause, bounce_type = self.check_bounce_pause()
1✔
264

265
        if not bounce_pause:
1✔
266
            return datetime.now(UTC)
1✔
267

268
        if bounce_type == "soft":
1✔
269
            if not self.last_soft_bounce:
1!
NEW
270
                raise ValueError("self.last_soft_bounce must be truthy value.")
×
271
            return self.last_soft_bounce + timedelta(
1✔
272
                days=settings.SOFT_BOUNCE_ALLOWED_DAYS
273
            )
274

275
        if bounce_type != "hard":
1!
NEW
276
            raise ValueError("bounce_type must be either 'soft' or 'hard'")
×
277
        if not self.last_hard_bounce:
1!
NEW
278
            raise ValueError("self.last_hard_bounce must be truthy value.")
×
279
        return self.last_hard_bounce + timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS)
1✔
280

281
    @property
1✔
282
    def last_bounce_date(self):
1✔
283
        if self.last_hard_bounce:
1✔
284
            return self.last_hard_bounce
1✔
285
        if self.last_soft_bounce:
1✔
286
            return self.last_soft_bounce
1✔
287
        return None
1✔
288

289
    @property
1✔
290
    def at_max_free_aliases(self) -> bool:
1✔
291
        relay_addresses_count: int = self.relay_addresses.count()
1✔
292
        return relay_addresses_count >= settings.MAX_NUM_FREE_ALIASES
1✔
293

294
    @property
1✔
295
    def fxa(self) -> SocialAccount | None:
1✔
296
        # Note: we are NOT using .filter() here because it invalidates
297
        # any profile instances that were queried with prefetch_related, which
298
        # we use in at least the profile view to minimize queries
299
        if not hasattr(self.user, "socialaccount_set"):
1!
NEW
300
            raise AttributeError("self.user must have socialaccount_set attribute")
×
301
        for sa in self.user.socialaccount_set.all():
1✔
302
            if sa.provider == "fxa":
1!
303
                return sa
1✔
304
        return None
1✔
305

306
    @property
1✔
307
    def display_name(self) -> str | None:
1✔
308
        # if display name is not set on FxA the
309
        # displayName key will not exist on the extra_data
310
        if fxa := self.fxa:
1!
311
            name = fxa.extra_data.get("displayName")
1✔
312
            return name if name is None else str(name)
1✔
313
        return None
×
314

315
    @property
1✔
316
    def custom_domain(self) -> str:
1✔
NEW
317
        if not self.subdomain:
×
NEW
318
            raise ValueError("self.subdomain must be truthy value.")
×
UNCOV
319
        return f"@{self.subdomain}.{settings.MOZMAIL_DOMAIN}"
×
320

321
    @property
1✔
322
    def has_premium(self) -> bool:
1✔
323
        # FIXME: as we don't have all the tiers defined we are over-defining
324
        # this to mark the user as a premium user as well
325
        if not self.fxa:
1✔
326
            return False
1✔
327
        for premium_domain in PREMIUM_DOMAINS:
1✔
328
            if self.user.email.endswith(f"@{premium_domain}"):
1!
329
                return True
×
330
        user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
1✔
331
        for sub in settings.SUBSCRIPTIONS_WITH_UNLIMITED:
1✔
332
            if sub in user_subscriptions:
1✔
333
                return True
1✔
334
        return False
1✔
335

336
    @property
1✔
337
    def has_phone(self) -> bool:
1✔
338
        if not self.fxa:
1✔
339
            return False
1✔
340
        if settings.RELAY_CHANNEL != "prod" and not settings.IN_PYTEST:
1!
341
            if not flag_is_active_in_task("phones", self.user):
×
342
                return False
×
343
        if flag_is_active_in_task("free_phones", self.user):
1!
344
            return True
×
345
        user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
1✔
346
        for sub in settings.SUBSCRIPTIONS_WITH_PHONE:
1✔
347
            if sub in user_subscriptions:
1✔
348
                return True
1✔
349
        return False
1✔
350

351
    @property
1✔
352
    def has_vpn(self) -> bool:
1✔
353
        if not self.fxa:
1!
354
            return False
×
355
        user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
1✔
356
        for sub in settings.SUBSCRIPTIONS_WITH_VPN:
1✔
357
            if sub in user_subscriptions:
1✔
358
                return True
1✔
359
        return False
1✔
360

361
    @property
1✔
362
    def emails_forwarded(self) -> int:
1✔
363
        return (
1✔
364
            sum(ra.num_forwarded for ra in self.relay_addresses)
365
            + sum(da.num_forwarded for da in self.domain_addresses)
366
            + self.num_email_forwarded_in_deleted_address
367
        )
368

369
    @property
1✔
370
    def emails_blocked(self) -> int:
1✔
371
        return (
1✔
372
            sum(ra.num_blocked for ra in self.relay_addresses)
373
            + sum(da.num_blocked for da in self.domain_addresses)
374
            + self.num_email_blocked_in_deleted_address
375
        )
376

377
    @property
1✔
378
    def emails_replied(self) -> int:
1✔
379
        ra_sum = self.relay_addresses.aggregate(models.Sum("num_replied", default=0))
1✔
380
        da_sum = self.domain_addresses.aggregate(models.Sum("num_replied", default=0))
1✔
381
        return (
1✔
382
            int(ra_sum["num_replied__sum"])
383
            + int(da_sum["num_replied__sum"])
384
            + self.num_email_replied_in_deleted_address
385
        )
386

387
    @property
1✔
388
    def level_one_trackers_blocked(self) -> int:
1✔
389
        return (
1✔
390
            sum(ra.num_level_one_trackers_blocked or 0 for ra in self.relay_addresses)
391
            + sum(
392
                da.num_level_one_trackers_blocked or 0 for da in self.domain_addresses
393
            )
394
            + (self.num_level_one_trackers_blocked_in_deleted_address or 0)
395
        )
396

397
    @property
1✔
398
    def joined_before_premium_release(self):
1✔
399
        date_created = self.user.date_joined
1✔
400
        return date_created < datetime.fromisoformat("2021-10-22 17:00:00+00:00")
1✔
401

402
    @property
1✔
403
    def date_phone_registered(self) -> datetime | None:
1✔
404
        if not settings.PHONES_ENABLED:
1!
405
            return None
×
406

407
        try:
1✔
408
            real_phone = RealPhone.objects.get(user=self.user)
1✔
409
            relay_number = RelayNumber.objects.get(user=self.user)
1✔
410
        except RealPhone.DoesNotExist:
1✔
411
            return None
1✔
412
        except RelayNumber.DoesNotExist:
1✔
413
            return real_phone.verified_date
1✔
414
        return relay_number.created_at or real_phone.verified_date
1✔
415

416
    def add_subdomain(self, subdomain):
1✔
417
        # Handles if the subdomain is "" or None
418
        if not subdomain:
1✔
419
            raise CannotMakeSubdomainException(
1✔
420
                "error-subdomain-cannot-be-empty-or-null"
421
            )
422

423
        # subdomain must be all lowercase
424
        subdomain = subdomain.lower()
1✔
425

426
        if not self.has_premium:
1✔
427
            raise CannotMakeSubdomainException("error-premium-set-subdomain")
1✔
428
        if self.subdomain is not None:
1✔
429
            raise CannotMakeSubdomainException("error-premium-cannot-change-subdomain")
1✔
430
        self.subdomain = subdomain
1✔
431
        # The validator defined in the subdomain field does not get run in full_clean()
432
        # when self.subdomain is "" or None, so we need to run the validator again to
433
        # catch these cases.
434
        valid_available_subdomain(subdomain)
1✔
435
        self.full_clean()
1✔
436
        self.save()
1✔
437

438
        RegisteredSubdomain.objects.create(subdomain_hash=hash_subdomain(subdomain))
1✔
439
        return subdomain
1✔
440

441
    def update_abuse_metric(
1✔
442
        self,
443
        address_created: bool = False,
444
        replied: bool = False,
445
        email_forwarded: bool = False,
446
        forwarded_email_size: int = 0,
447
    ) -> datetime | None:
448
        # TODO MPP-3720: This should be wrapped in atomic or select_for_update to ensure
449
        # race conditions are properly handled.
450

451
        # look for abuse metrics created on the same UTC date, regardless of time.
452
        midnight_utc_today = datetime.combine(
1✔
453
            datetime.now(UTC).date(), datetime.min.time()
454
        ).astimezone(UTC)
455
        midnight_utc_tomorow = midnight_utc_today + timedelta(days=1)
1✔
456
        abuse_metric = self.user.abusemetrics_set.filter(
1✔
457
            first_recorded__gte=midnight_utc_today,
458
            first_recorded__lt=midnight_utc_tomorow,
459
        ).first()
460
        if not abuse_metric:
1✔
461
            abuse_metric = AbuseMetrics.objects.create(user=self.user)
1✔
462
            AbuseMetrics.objects.filter(first_recorded__lt=midnight_utc_today).delete()
1✔
463

464
        # increment the abuse metric
465
        if address_created:
1✔
466
            abuse_metric.num_address_created_per_day += 1
1✔
467
        if replied:
1✔
468
            abuse_metric.num_replies_per_day += 1
1✔
469
        if email_forwarded:
1✔
470
            abuse_metric.num_email_forwarded_per_day += 1
1✔
471
        if forwarded_email_size > 0:
1✔
472
            abuse_metric.forwarded_email_size_per_day += forwarded_email_size
1✔
473
        abuse_metric.last_recorded = datetime.now(UTC)
1✔
474
        abuse_metric.save()
1✔
475

476
        # check user should be flagged for abuse
477
        hit_max_create = False
1✔
478
        hit_max_replies = False
1✔
479
        hit_max_forwarded = False
1✔
480
        hit_max_forwarded_email_size = False
1✔
481

482
        hit_max_create = (
1✔
483
            abuse_metric.num_address_created_per_day
484
            >= settings.MAX_ADDRESS_CREATION_PER_DAY
485
        )
486
        hit_max_replies = (
1✔
487
            abuse_metric.num_replies_per_day >= settings.MAX_REPLIES_PER_DAY
488
        )
489
        hit_max_forwarded = (
1✔
490
            abuse_metric.num_email_forwarded_per_day >= settings.MAX_FORWARDED_PER_DAY
491
        )
492
        hit_max_forwarded_email_size = (
1✔
493
            abuse_metric.forwarded_email_size_per_day
494
            >= settings.MAX_FORWARDED_EMAIL_SIZE_PER_DAY
495
        )
496
        if (
1✔
497
            hit_max_create
498
            or hit_max_replies
499
            or hit_max_forwarded
500
            or hit_max_forwarded_email_size
501
        ):
502
            self.last_account_flagged = datetime.now(UTC)
1✔
503
            self.save()
1✔
504
            data = {
1✔
505
                "uid": self.fxa.uid if self.fxa else None,
506
                "flagged": self.last_account_flagged.timestamp(),
507
                "replies": abuse_metric.num_replies_per_day,
508
                "addresses": abuse_metric.num_address_created_per_day,
509
                "forwarded": abuse_metric.num_email_forwarded_per_day,
510
                "forwarded_size_in_bytes": abuse_metric.forwarded_email_size_per_day,
511
            }
512
            # log for further secops review
513
            abuse_logger.info("Abuse flagged", extra=data)
1✔
514
        return self.last_account_flagged
1✔
515

516
    @property
1✔
517
    def is_flagged(self):
1✔
518
        if not self.last_account_flagged:
1✔
519
            return False
1✔
520
        account_premium_feature_resumed = self.last_account_flagged + timedelta(
1✔
521
            days=settings.PREMIUM_FEATURE_PAUSED_DAYS
522
        )
523
        if datetime.now(UTC) > account_premium_feature_resumed:
1!
524
            # premium feature has been resumed
525
            return False
×
526
        # user was flagged and the premium feature pause period is not yet over
527
        return True
1✔
528

529
    @property
1✔
530
    def metrics_enabled(self) -> bool:
1✔
531
        """
532
        Does the user allow us to record technical and interaction data?
533

534
        This is based on the Mozilla accounts opt-out option, added around 2022. A user
535
        can go to their Mozilla account profile settings, Data Collection and Use, and
536
        deselect "Help improve Mozilla Account". This setting defaults to On, and is
537
        sent as "metricsEnabled". Some older Relay accounts do not have
538
        "metricsEnabled", and we default to On.
539
        """
540
        if self.fxa:
1✔
541
            return bool(self.fxa.extra_data.get("metricsEnabled", True))
1✔
542
        return True
1✔
543

544
    @property
1✔
545
    def plan(self) -> Literal["free", "email", "phone", "bundle"]:
1✔
546
        """The user's Relay plan as a string."""
547
        if self.has_premium:
1✔
548
            if self.has_phone:
1✔
549
                return "bundle" if self.has_vpn else "phone"
1✔
550
            else:
551
                return "email"
1✔
552
        else:
553
            return "free"
1✔
554

555
    @property
1✔
556
    def plan_term(self) -> Literal[None, "unknown", "1_month", "1_year"]:
1✔
557
        """The user's Relay plan term as a string."""
558
        plan = self.plan
1✔
559
        if plan == "free":
1✔
560
            return None
1✔
561
        if plan == "phone":
1✔
562
            start_date = self.date_phone_subscription_start
1✔
563
            end_date = self.date_phone_subscription_end
1✔
564
            if start_date and end_date:
1✔
565
                span = end_date - start_date
1✔
566
                return "1_year" if span.days > 32 else "1_month"
1✔
567
        return "unknown"
1✔
568

569
    @property
1✔
570
    def metrics_premium_status(self) -> str:
1✔
571
        plan = self.plan
1✔
572
        if plan == "free":
1✔
573
            return "free"
1✔
574
        return f"{plan}_{self.plan_term}"
1✔
575

576

577
@receiver(models.signals.post_save, sender=Profile)
1✔
578
def copy_auth_token(sender, instance=None, created=False, **kwargs):
1✔
579
    if created:
1✔
580
        # baker triggers created during tests
581
        # so first check the user doesn't already have a Token
582
        try:
1✔
583
            Token.objects.get(user=instance.user)
1✔
584
            return
1✔
585
        except Token.DoesNotExist:
1✔
586
            Token.objects.create(user=instance.user, key=instance.api_token)
1✔
587

588

589
def address_hash(address, subdomain=None, domain=None):
1✔
590
    if not domain:
1✔
591
        domain = get_domains_from_settings()["MOZMAIL_DOMAIN"]
1✔
592
    if subdomain:
1✔
593
        return sha256(f"{address}@{subdomain}.{domain}".encode()).hexdigest()
1✔
594
    if domain == settings.RELAY_FIREFOX_DOMAIN:
1✔
595
        return sha256(f"{address}".encode()).hexdigest()
1✔
596
    return sha256(f"{address}@{domain}".encode()).hexdigest()
1✔
597

598

599
def address_default():
1✔
600
    return "".join(
1✔
601
        random.choices(  # noqa: S311 (standard pseudo-random generator used)
602
            string.ascii_lowercase + string.digits, k=9
603
        )
604
    )
605

606

607
def has_bad_words(value: str) -> bool:
1✔
608
    for badword in emails_config().badwords:
1✔
609
        badword = badword.strip()
1✔
610
        if len(badword) <= 4 and badword == value:
1✔
611
            return True
1✔
612
        if len(badword) > 4 and badword in value:
1✔
613
            return True
1✔
614
    return False
1✔
615

616

617
def is_blocklisted(value: str) -> bool:
1✔
618
    return any(blockedword == value for blockedword in emails_config().blocklist)
1✔
619

620

621
def get_domain_numerical(domain_address):
1✔
622
    # get domain name from the address
623
    domains = get_domains_from_settings()
1✔
624
    domains_keys = list(domains.keys())
1✔
625
    domains_values = list(domains.values())
1✔
626
    domain_name = domains_keys[domains_values.index(domain_address)]
1✔
627
    # get domain numerical value from domain name
628
    choices = dict(DOMAIN_CHOICES)
1✔
629
    choices_keys = list(choices.keys())
1✔
630
    choices_values = list(choices.values())
1✔
631
    return choices_keys[choices_values.index(domain_name)]
1✔
632

633

634
def hash_subdomain(subdomain, domain=settings.MOZMAIL_DOMAIN):
1✔
635
    return sha256(f"{subdomain}.{domain}".encode()).hexdigest()
1✔
636

637

638
class RegisteredSubdomain(models.Model):
1✔
639
    subdomain_hash = models.CharField(max_length=64, db_index=True, unique=True)
1✔
640
    registered_at = models.DateTimeField(auto_now_add=True)
1✔
641

642
    def __str__(self):
1✔
643
        return self.subdomain_hash
×
644

645

646
class CannotMakeSubdomainException(BadRequest):
1✔
647
    """Exception raised by Profile due to error on subdomain creation.
648

649
    Attributes:
650
        message -- optional explanation of the error
651
    """
652

653
    def __init__(self, message=None):
1✔
654
        self.message = message
1✔
655

656

657
class CannotMakeAddressException(RelayAPIException):
1✔
658
    """Base exception for RelayAddress or DomainAddress creation failure."""
659

660

661
class AccountIsPausedException(CannotMakeAddressException):
1✔
662
    default_code = "account_is_paused"
1✔
663
    default_detail = "Your account is on pause."
1✔
664
    status_code = 403
1✔
665

666

667
class RelayAddrFreeTierLimitException(CannotMakeAddressException):
1✔
668
    default_code = "free_tier_limit"
1✔
669
    default_detail_template = (
1✔
670
        "You’ve used all {free_tier_limit} email masks included with your free account."
671
        " You can reuse an existing mask, but using a unique mask for each account is"
672
        " the most secure option."
673
    )
674
    status_code = 403
1✔
675

676
    def __init__(self, free_tier_limit: int | None = None):
1✔
677
        self.free_tier_limit = free_tier_limit or settings.MAX_NUM_FREE_ALIASES
1✔
678
        super().__init__()
1✔
679

680
    def error_context(self) -> ErrorContextType:
1✔
681
        return {"free_tier_limit": self.free_tier_limit}
1✔
682

683

684
class DomainAddrFreeTierException(CannotMakeAddressException):
1✔
685
    default_code = "free_tier_no_subdomain_masks"
1✔
686
    default_detail = (
1✔
687
        "Your free account does not include custom subdomains for masks."
688
        " To create custom masks, upgrade to Relay Premium."
689
    )
690
    status_code = 403
1✔
691

692

693
class DomainAddrNeedSubdomainException(CannotMakeAddressException):
1✔
694
    default_code = "need_subdomain"
1✔
695
    default_detail = "Please select a subdomain before creating a custom email address."
1✔
696
    status_code = 400
1✔
697

698

699
class DomainAddrUpdateException(CannotMakeAddressException):
1✔
700
    """Exception raised when attempting to edit an existing domain address field."""
701

702
    default_code = "address_not_editable"
1✔
703
    default_detail = "You cannot edit an existing domain address field."
1✔
704
    status_code = 400
1✔
705

706

707
class DomainAddrUnavailableException(CannotMakeAddressException):
1✔
708
    default_code = "address_unavailable"
1✔
709
    default_detail_template = (
1✔
710
        "“{unavailable_address}” could not be created."
711
        " Please try again with a different mask name."
712
    )
713
    status_code = 400
1✔
714

715
    def __init__(self, unavailable_address: str):
1✔
716
        self.unavailable_address = unavailable_address
1✔
717
        super().__init__()
1✔
718

719
    def error_context(self) -> ErrorContextType:
1✔
720
        return {"unavailable_address": self.unavailable_address}
1✔
721

722

723
class DomainAddrDuplicateException(CannotMakeAddressException):
1✔
724
    default_code = "duplicate_address"
1✔
725
    default_detail_template = (
1✔
726
        "“{duplicate_address}” already exists."
727
        " Please try again with a different mask name."
728
    )
729
    status_code = 409
1✔
730

731
    def __init__(self, duplicate_address: str):
1✔
732
        self.duplicate_address = duplicate_address
1✔
733
        super().__init__()
1✔
734

735
    def error_context(self) -> ErrorContextType:
1✔
736
        return {"duplicate_address": self.duplicate_address}
1✔
737

738

739
class RelayAddress(models.Model):
1✔
740
    user = models.ForeignKey(User, on_delete=models.CASCADE)
1✔
741
    address = models.CharField(max_length=64, default=address_default, unique=True)
1✔
742
    domain = models.PositiveSmallIntegerField(
1✔
743
        choices=DOMAIN_CHOICES, default=default_domain_numerical
744
    )
745
    enabled = models.BooleanField(default=True)
1✔
746
    description = models.CharField(max_length=64, blank=True)
1✔
747
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
1✔
748
    last_modified_at = models.DateTimeField(auto_now=True, db_index=True)
1✔
749
    last_used_at = models.DateTimeField(blank=True, null=True)
1✔
750
    num_forwarded = models.PositiveIntegerField(default=0)
1✔
751
    num_blocked = models.PositiveIntegerField(default=0)
1✔
752
    num_level_one_trackers_blocked = models.PositiveIntegerField(default=0, null=True)
1✔
753
    num_replied = models.PositiveIntegerField(default=0)
1✔
754
    num_spam = models.PositiveIntegerField(default=0)
1✔
755
    generated_for = models.CharField(max_length=255, blank=True)
1✔
756
    block_list_emails = models.BooleanField(default=False)
1✔
757
    used_on = models.TextField(default=None, blank=True, null=True)
1✔
758

759
    class Meta:
1✔
760
        indexes = [
1✔
761
            # Find when a user first used the add-on
762
            models.Index(
763
                name="idx_ra_created_by_addon",
764
                fields=["user"],
765
                condition=~models.Q(generated_for__exact=""),
766
                include=["created_at"],
767
            ),
768
        ]
769

770
    def __str__(self):
1✔
771
        return self.address
1✔
772

773
    def delete(self, *args, **kwargs):
1✔
774
        # TODO: create hard bounce receipt rule in AWS for the address
775
        deleted_address = DeletedAddress.objects.create(
1✔
776
            address_hash=address_hash(self.address, domain=self.domain_value),
777
            num_forwarded=self.num_forwarded,
778
            num_blocked=self.num_blocked,
779
            num_replied=self.num_replied,
780
            num_spam=self.num_spam,
781
        )
782
        deleted_address.save()
1✔
783
        profile = Profile.objects.get(user=self.user)
1✔
784
        profile.address_last_deleted = datetime.now(UTC)
1✔
785
        profile.num_address_deleted += 1
1✔
786
        profile.num_email_forwarded_in_deleted_address += self.num_forwarded
1✔
787
        profile.num_email_blocked_in_deleted_address += self.num_blocked
1✔
788
        profile.num_level_one_trackers_blocked_in_deleted_address = (
1✔
789
            profile.num_level_one_trackers_blocked_in_deleted_address or 0
790
        ) + (self.num_level_one_trackers_blocked or 0)
791
        profile.num_email_replied_in_deleted_address += self.num_replied
1✔
792
        profile.num_email_spam_in_deleted_address += self.num_spam
1✔
793
        profile.num_deleted_relay_addresses += 1
1✔
794
        profile.last_engagement = datetime.now(UTC)
1✔
795
        profile.save()
1✔
796
        return super().delete(*args, **kwargs)
1✔
797

798
    def save(
1✔
799
        self,
800
        force_insert: bool | tuple[ModelBase, ...] = False,
801
        force_update: bool = False,
802
        using: str | None = None,
803
        update_fields: Iterable[str] | None = None,
804
    ) -> None:
805
        if self._state.adding:
1✔
806
            with transaction.atomic():
1✔
807
                locked_profile = Profile.objects.select_for_update().get(user=self.user)
1✔
808
                check_user_can_make_another_address(locked_profile)
1✔
809
                while True:
1✔
810
                    address_is_allowed = not is_blocklisted(self.address)
1✔
811
                    address_is_valid = valid_address(self.address, self.domain_value)
1✔
812
                    if address_is_valid and address_is_allowed:
1✔
813
                        break
1✔
814
                    self.address = address_default()
1✔
815
                locked_profile.update_abuse_metric(address_created=True)
1✔
816
                locked_profile.last_engagement = datetime.now(UTC)
1✔
817
                locked_profile.save()
1✔
818
        if (not self.user.profile.server_storage) and any(
1✔
819
            (self.description, self.generated_for, self.used_on)
820
        ):
821
            self.description = ""
1✔
822
            self.generated_for = ""
1✔
823
            self.used_on = ""
1✔
824
            if update_fields is not None:
1✔
825
                update_fields = {"description", "generated_for", "used_on"}.union(
1✔
826
                    update_fields
827
                )
828
        if not self.user.profile.has_premium and self.block_list_emails:
1✔
829
            self.block_list_emails = False
1✔
830
            if update_fields is not None:
1✔
831
                update_fields = {"block_list_emails"}.union(update_fields)
1✔
832
        super().save(
1✔
833
            force_insert=force_insert,
834
            force_update=force_update,
835
            using=using,
836
            update_fields=update_fields,
837
        )
838

839
    @property
1✔
840
    def domain_value(self) -> str:
1✔
841
        domain = cast(
1✔
842
            Literal["RELAY_FIREFOX_DOMAIN", "MOZMAIL_DOMAIN"], self.get_domain_display()
843
        )
844
        return get_domains_from_settings()[domain]
1✔
845

846
    @property
1✔
847
    def full_address(self) -> str:
1✔
848
        return f"{self.address}@{self.domain_value}"
1✔
849

850
    @property
1✔
851
    def metrics_id(self) -> str:
1✔
852
        if not self.id:
1!
NEW
853
            raise ValueError("self.id must be truthy value.")
×
854
        # Prefix with 'R' for RelayAddress, since there may be a DomainAddress with the
855
        # same row ID
856
        return f"R{self.id}"
1✔
857

858

859
def check_user_can_make_another_address(profile: Profile) -> None:
1✔
860
    if profile.is_flagged:
1✔
861
        raise AccountIsPausedException()
1✔
862
    # MPP-3021: return early for premium users to avoid at_max_free_aliases DB query
863
    if profile.has_premium:
1✔
864
        return
1✔
865
    if profile.at_max_free_aliases:
1✔
866
        raise RelayAddrFreeTierLimitException()
1✔
867

868

869
def valid_address_pattern(address):
1✔
870
    #   can't start or end with a hyphen
871
    #   must be 1-63 lowercase alphanumeric characters and/or hyphens
872
    valid_address_pattern = re.compile("^(?![-.])[a-z0-9-.]{1,63}(?<![-.])$")
1✔
873
    return valid_address_pattern.match(address) is not None
1✔
874

875

876
def valid_address(address: str, domain: str, subdomain: str | None = None) -> bool:
1✔
877
    address_pattern_valid = valid_address_pattern(address)
1✔
878
    address_contains_badword = has_bad_words(address)
1✔
879
    address_already_deleted = 0
1✔
880
    if not subdomain or flag_is_active_in_task(
1✔
881
        "custom_domain_management_redesign", None
882
    ):
883
        address_already_deleted = DeletedAddress.objects.filter(
1✔
884
            address_hash=address_hash(address, domain=domain, subdomain=subdomain)
885
        ).count()
886
    if (
1✔
887
        address_already_deleted > 0
888
        or address_contains_badword
889
        or not address_pattern_valid
890
    ):
891
        return False
1✔
892
    return True
1✔
893

894

895
class DeletedAddress(models.Model):
1✔
896
    address_hash = models.CharField(max_length=64, db_index=True)
1✔
897
    num_forwarded = models.PositiveIntegerField(default=0)
1✔
898
    num_blocked = models.PositiveIntegerField(default=0)
1✔
899
    num_replied = models.PositiveIntegerField(default=0)
1✔
900
    num_spam = models.PositiveIntegerField(default=0)
1✔
901

902
    def __str__(self):
1✔
903
        return self.address_hash
×
904

905

906
def check_user_can_make_domain_address(user_profile: Profile) -> None:
1✔
907
    if not user_profile.has_premium:
1✔
908
        raise DomainAddrFreeTierException()
1✔
909

910
    if not user_profile.subdomain:
1✔
911
        raise DomainAddrNeedSubdomainException()
1✔
912

913
    if user_profile.is_flagged:
1✔
914
        raise AccountIsPausedException()
1✔
915

916

917
class DomainAddress(models.Model):
1✔
918
    user = models.ForeignKey(User, on_delete=models.CASCADE)
1✔
919
    address = models.CharField(
1✔
920
        max_length=64, validators=[MinLengthValidator(limit_value=1)]
921
    )
922
    enabled = models.BooleanField(default=True)
1✔
923
    description = models.CharField(max_length=64, blank=True)
1✔
924
    domain = models.PositiveSmallIntegerField(choices=DOMAIN_CHOICES, default=2)
1✔
925
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
1✔
926
    first_emailed_at = models.DateTimeField(null=True, db_index=True)
1✔
927
    last_modified_at = models.DateTimeField(auto_now=True, db_index=True)
1✔
928
    last_used_at = models.DateTimeField(blank=True, null=True)
1✔
929
    num_forwarded = models.PositiveIntegerField(default=0)
1✔
930
    num_blocked = models.PositiveIntegerField(default=0)
1✔
931
    num_level_one_trackers_blocked = models.PositiveIntegerField(default=0, null=True)
1✔
932
    num_replied = models.PositiveIntegerField(default=0)
1✔
933
    num_spam = models.PositiveIntegerField(default=0)
1✔
934
    block_list_emails = models.BooleanField(default=False)
1✔
935
    used_on = models.TextField(default=None, blank=True, null=True)
1✔
936

937
    class Meta:
1✔
938
        unique_together = ["user", "address"]
1✔
939

940
    def __str__(self):
1✔
941
        return self.address
×
942

943
    def save(
1✔
944
        self,
945
        force_insert: bool | tuple[ModelBase, ...] = False,
946
        force_update: bool = False,
947
        using: str | None = None,
948
        update_fields: Iterable[str] | None = None,
949
    ) -> None:
950
        user_profile = self.user.profile
1✔
951
        if self._state.adding:
1✔
952
            check_user_can_make_domain_address(user_profile)
1✔
953
            domain_address_valid = valid_address(
1✔
954
                self.address, self.domain_value, user_profile.subdomain
955
            )
956
            if not domain_address_valid:
1✔
957
                if self.first_emailed_at:
1!
958
                    incr_if_enabled("domainaddress.create_via_email_fail")
×
959
                raise DomainAddrUnavailableException(unavailable_address=self.address)
1✔
960

961
            if DomainAddress.objects.filter(
1✔
962
                user=self.user, address=self.address
963
            ).exists():
964
                raise DomainAddrDuplicateException(duplicate_address=self.address)
1✔
965

966
            user_profile.update_abuse_metric(address_created=True)
1✔
967
            user_profile.last_engagement = datetime.now(UTC)
1✔
968
            user_profile.save(update_fields=["last_engagement"])
1✔
969
            incr_if_enabled("domainaddress.create")
1✔
970
            if self.first_emailed_at:
1✔
971
                incr_if_enabled("domainaddress.create_via_email")
1✔
972
        else:
973
            # The model is in an update state, do not allow 'address' field updates
974
            existing_instance = DomainAddress.objects.get(id=self.id)
1✔
975
            if existing_instance.address != self.address:
1✔
976
                raise DomainAddrUpdateException()
1✔
977

978
        if not user_profile.has_premium and self.block_list_emails:
1✔
979
            self.block_list_emails = False
1✔
980
            if update_fields:
1✔
981
                update_fields = {"block_list_emails"}.union(update_fields)
1✔
982
        if (not user_profile.server_storage) and (self.description or self.used_on):
1✔
983
            self.description = ""
1✔
984
            self.used_on = ""
1✔
985
            if update_fields:
1✔
986
                update_fields = {"description", "used_on"}.union(update_fields)
1✔
987
        super().save(
1✔
988
            force_insert=force_insert,
989
            force_update=force_update,
990
            using=using,
991
            update_fields=update_fields,
992
        )
993

994
    @property
1✔
995
    def user_profile(self):
1✔
996
        return Profile.objects.get(user=self.user)
1✔
997

998
    @staticmethod
1✔
999
    def make_domain_address(
1✔
1000
        user_profile: Profile, address: str | None = None, made_via_email: bool = False
1001
    ) -> DomainAddress:
1002
        check_user_can_make_domain_address(user_profile)
1✔
1003

1004
        if not address:
1✔
1005
            # FIXME: if the alias is randomly generated and has bad words
1006
            # we should retry like make_relay_address does
1007
            # not fixing this now because not sure randomly generated
1008
            # DomainAlias will be a feature
1009
            address = address_default()
1✔
1010
            # Only check for bad words if randomly generated
1011
        if not isinstance(address, str):
1!
NEW
1012
            raise TypeError("address must be type str")
×
1013

1014
        first_emailed_at = datetime.now(UTC) if made_via_email else None
1✔
1015
        domain_address = DomainAddress.objects.create(
1✔
1016
            user=user_profile.user, address=address, first_emailed_at=first_emailed_at
1017
        )
1018
        return domain_address
1✔
1019

1020
    def delete(self, *args, **kwargs):
1✔
1021
        # TODO: create hard bounce receipt rule in AWS for the address
1022
        deleted_address = DeletedAddress.objects.create(
1✔
1023
            address_hash=address_hash(
1024
                self.address, self.user_profile.subdomain, self.domain_value
1025
            ),
1026
            num_forwarded=self.num_forwarded,
1027
            num_blocked=self.num_blocked,
1028
            num_replied=self.num_replied,
1029
            num_spam=self.num_spam,
1030
        )
1031
        deleted_address.save()
1✔
1032
        # self.user_profile is a property and should not be used to
1033
        # update values on the user's profile
1034
        profile = Profile.objects.get(user=self.user)
1✔
1035
        profile.address_last_deleted = datetime.now(UTC)
1✔
1036
        profile.num_address_deleted += 1
1✔
1037
        profile.num_email_forwarded_in_deleted_address += self.num_forwarded
1✔
1038
        profile.num_email_blocked_in_deleted_address += self.num_blocked
1✔
1039
        profile.num_level_one_trackers_blocked_in_deleted_address = (
1✔
1040
            profile.num_level_one_trackers_blocked_in_deleted_address or 0
1041
        ) + (self.num_level_one_trackers_blocked or 0)
1042
        profile.num_email_replied_in_deleted_address += self.num_replied
1✔
1043
        profile.num_email_spam_in_deleted_address += self.num_spam
1✔
1044
        profile.num_deleted_domain_addresses += 1
1✔
1045
        profile.last_engagement = datetime.now(UTC)
1✔
1046
        profile.save()
1✔
1047
        return super().delete(*args, **kwargs)
1✔
1048

1049
    @property
1✔
1050
    def domain_value(self) -> str:
1✔
1051
        domain = cast(
1✔
1052
            Literal["RELAY_FIREFOX_DOMAIN", "MOZMAIL_DOMAIN"], self.get_domain_display()
1053
        )
1054
        return get_domains_from_settings()[domain]
1✔
1055

1056
    @property
1✔
1057
    def full_address(self) -> str:
1✔
1058
        return f"{self.address}@{self.user_profile.subdomain}.{self.domain_value}"
1✔
1059

1060
    @property
1✔
1061
    def metrics_id(self) -> str:
1✔
1062
        if not self.id:
1!
NEW
1063
            raise ValueError("self.id must be truthy value.")
×
1064
        # Prefix with 'D' for DomainAddress, since there may be a RelayAddress with the
1065
        # same row ID
1066
        return f"D{self.id}"
1✔
1067

1068

1069
class Reply(models.Model):
1✔
1070
    relay_address = models.ForeignKey(
1✔
1071
        RelayAddress, on_delete=models.CASCADE, blank=True, null=True
1072
    )
1073
    domain_address = models.ForeignKey(
1✔
1074
        DomainAddress, on_delete=models.CASCADE, blank=True, null=True
1075
    )
1076
    lookup = models.CharField(max_length=255, blank=False, db_index=True)
1✔
1077
    encrypted_metadata = models.TextField(blank=False)
1✔
1078
    created_at = models.DateField(auto_now_add=True, null=False, db_index=True)
1✔
1079

1080
    @property
1✔
1081
    def address(self):
1✔
1082
        return self.relay_address or self.domain_address
1✔
1083

1084
    @property
1✔
1085
    def profile(self):
1✔
1086
        return self.address.user.profile
1✔
1087

1088
    @property
1✔
1089
    def owner_has_premium(self):
1✔
1090
        return self.profile.has_premium
1✔
1091

1092
    def increment_num_replied(self):
1✔
1093
        address = self.relay_address or self.domain_address
1✔
1094
        if not address:
1!
NEW
1095
            raise ValueError("address must be truthy value")
×
1096
        address.num_replied += 1
1✔
1097
        address.last_used_at = datetime.now(UTC)
1✔
1098
        address.save(update_fields=["num_replied", "last_used_at"])
1✔
1099
        return address.num_replied
1✔
1100

1101

1102
class AbuseMetrics(models.Model):
1✔
1103
    user = models.ForeignKey(User, on_delete=models.CASCADE)
1✔
1104
    first_recorded = models.DateTimeField(auto_now_add=True, db_index=True)
1✔
1105
    last_recorded = models.DateTimeField(auto_now_add=True, db_index=True)
1✔
1106
    num_address_created_per_day = models.PositiveSmallIntegerField(default=0)
1✔
1107
    num_replies_per_day = models.PositiveSmallIntegerField(default=0)
1✔
1108
    # Values from 0 to 32767 are safe in all databases supported by Django.
1109
    num_email_forwarded_per_day = models.PositiveSmallIntegerField(default=0)
1✔
1110
    # Values from 0 to 9.2 exabytes are safe in all databases supported by Django.
1111
    forwarded_email_size_per_day = models.PositiveBigIntegerField(default=0)
1✔
1112

1113
    class Meta:
1✔
1114
        unique_together = ["user", "first_recorded"]
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc