• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / b2e067fe-ce4e-4099-9bef-07b368e99782

15 Apr 2024 04:18PM CUT coverage: 75.544% (+0.002%) from 75.542%
b2e067fe-ce4e-4099-9bef-07b368e99782

push

circleci

jwhitlock
Enable pyupgrade, fix issues

2443 of 3405 branches covered (71.75%)

Branch coverage included in aggregate %.

56 of 59 new or added lines in 14 files covered. (94.92%)

234 existing lines in 24 files now uncovered.

6793 of 8821 relevant lines covered (77.01%)

20.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.42
/emails/models.py
1
from __future__ import annotations
1✔
2

3
import logging
1✔
4
import random
1✔
5
import re
1✔
6
import string
1✔
7
import uuid
1✔
8
from collections import namedtuple
1✔
9
from collections.abc import Iterable
1✔
10
from datetime import UTC, datetime, timedelta
1✔
11
from hashlib import sha256
1✔
12
from typing import Literal
1✔
13

14
from django.conf import settings
1✔
15
from django.contrib.auth.models import User
1✔
16
from django.core.exceptions import BadRequest
1✔
17
from django.core.validators import MinLengthValidator
1✔
18
from django.db import models, transaction
1✔
19
from django.dispatch import receiver
1✔
20
from django.utils.translation.trans_real import (
1✔
21
    get_supported_language_variant,
22
    parse_accept_lang_header,
23
)
24

25
from allauth.socialaccount.models import SocialAccount
1✔
26
from rest_framework.authtoken.models import Token
1✔
27

28
from api.exceptions import ErrorContextType, RelayAPIException
1✔
29
from privaterelay.plans import get_premium_countries
1✔
30
from privaterelay.utils import (
1✔
31
    AcceptLanguageError,
32
    flag_is_active_in_task,
33
    guess_country_from_accept_lang,
34
)
35

36
from .apps import emails_config
1✔
37
from .utils import get_domains_from_settings, incr_if_enabled
1✔
38

39
if settings.PHONES_ENABLED:
1!
40
    from phones.models import RealPhone, RelayNumber
1✔
41

42

43
# General application event log.
logger = logging.getLogger("events")
# Receives the "Abuse flagged" records emitted by Profile.update_abuse_metric.
abuse_logger = logging.getLogger("abusemetrics")

# Result of Profile.check_bounce_pause: whether sending is paused, and the
# bounce type ("hard", "soft", or "" when not paused).
BounceStatus = namedtuple("BounceStatus", ["paused", "type"])

# (numerical value, settings key) pairs used as choices for address domains.
DOMAIN_CHOICES = [
    (1, "RELAY_FIREFOX_DOMAIN"),
    (2, "MOZMAIL_DOMAIN"),
]
# Users whose email address is at one of these domains are treated as premium
# (see Profile.has_premium).
PREMIUM_DOMAINS = [
    "mozilla.com",
    "getpocket.com",
    "mozillafoundation.org",
]
1✔
50

51

52
def valid_available_subdomain(subdomain, *args, **kwargs):
    """Validate that ``subdomain`` is well-formed and still available.

    Used as the field validator for ``Profile.subdomain`` and called directly
    by ``Profile.add_subdomain``. Extra positional and keyword arguments are
    accepted and ignored.

    Returns:
        True when the subdomain passes every check.

    Raises:
        CannotMakeSubdomainException: with
            "error-subdomain-cannot-be-empty-or-null" when ``subdomain`` is
            empty or None, or "error-subdomain-not-available" when it is
            malformed, contains a bad word, is blocklisted, or is already
            registered.
    """
    if not subdomain:
        raise CannotMakeSubdomainException("error-subdomain-cannot-be-empty-or-null")
    # valid subdomains:
    #   can't start or end with a hyphen
    #   must be 1-63 alphanumeric characters and/or hyphens
    subdomain = subdomain.lower()
    # fullmatch() is used instead of match() with a "$" anchor because "$"
    # also matches just before a trailing newline, which would let a value
    # such as "name\n" pass validation.
    valid = re.fullmatch(r"(?!-)[a-z0-9-]{1,63}(?<!-)", subdomain) is not None
    #   can't have "bad" words in them
    bad_word = has_bad_words(subdomain)
    #   can't have "blocked" words in them
    blocked_word = is_blocklisted(subdomain)
    #   can't be taken by someone else
    # exists() asks the database only whether a row is present, not how many.
    taken = RegisteredSubdomain.objects.filter(
        subdomain_hash=hash_subdomain(subdomain)
    ).exists()
    if not valid or bad_word or blocked_word or taken:
        raise CannotMakeSubdomainException("error-subdomain-not-available")
    return True
1✔
75

76

77
# This historical function is referenced in migration
78
# 0029_profile_add_deleted_metric_and_changeserver_storage_default
79
def default_server_storage():
    """Return the historical default for server storage, which is always True.

    Kept only because an existing migration references this function by name.
    """
    return True
×
81

82

83
def default_domain_numerical():
    """Return the DOMAIN_CHOICES number for the configured MOZMAIL_DOMAIN.

    Used as the default for ``RelayAddress.domain``.
    """
    mozmail_domain = get_domains_from_settings()["MOZMAIL_DOMAIN"]
    return get_domain_numerical(mozmail_domain)
1✔
87

88

89
class Profile(models.Model):
    """Relay-specific data attached one-to-one to a Django ``User``.

    Holds the user's settings (server storage, tracker removal, onboarding
    state), counters carried over from deleted masks, bounce and abuse
    timestamps, and the optional custom subdomain. Subscription-derived state
    (``has_premium``, ``has_phone``, ``has_vpn``, ``plan``) is computed from
    the user's "fxa" social account ``extra_data``.
    """

    user = models.OneToOneField(User, on_delete=models.CASCADE)
    # Copied to a rest_framework Token by the copy_auth_token signal receiver.
    api_token = models.UUIDField(default=uuid.uuid4)
    num_address_deleted = models.PositiveIntegerField(default=0)
    date_subscribed = models.DateTimeField(blank=True, null=True)
    date_subscribed_phone = models.DateTimeField(blank=True, null=True)
    # TODO MPP-2972: delete date_phone_subscription_checked in favor of
    # date_phone_subscription_next_reset
    date_phone_subscription_checked = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_start = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_reset = models.DateTimeField(blank=True, null=True)
    date_phone_subscription_end = models.DateTimeField(blank=True, null=True)
    address_last_deleted = models.DateTimeField(blank=True, null=True, db_index=True)
    # Bounce timestamps; cleared by check_bounce_pause once they expire.
    last_soft_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
    last_hard_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
    # Set by update_abuse_metric when a daily limit is reached.
    last_account_flagged = models.DateTimeField(blank=True, null=True, db_index=True)
    num_deleted_relay_addresses = models.PositiveIntegerField(default=0)
    num_deleted_domain_addresses = models.PositiveIntegerField(default=0)
    # Totals carried over from deleted masks; added to the live mask totals in
    # the emails_* and level_one_trackers_blocked properties.
    num_email_forwarded_in_deleted_address = models.PositiveIntegerField(default=0)
    num_email_blocked_in_deleted_address = models.PositiveIntegerField(default=0)
    num_level_one_trackers_blocked_in_deleted_address = models.PositiveIntegerField(
        default=0, null=True
    )
    num_email_replied_in_deleted_address = models.PositiveIntegerField(default=0)
    num_email_spam_in_deleted_address = models.PositiveIntegerField(default=0)
    subdomain = models.CharField(
        blank=True,
        null=True,
        unique=True,
        max_length=63,
        db_index=True,
        validators=[valid_available_subdomain],
    )
    # Whether we store the user's alias labels in the server
    server_storage = models.BooleanField(default=True)
    # Whether we store the caller/sender log for the user's relay number
    store_phone_log = models.BooleanField(default=True)
    # TODO: Data migration to set null to false
    # TODO: Schema migration to remove null=True
    remove_level_one_email_trackers = models.BooleanField(null=True, default=False)
    onboarding_state = models.PositiveIntegerField(default=0)
    onboarding_free_state = models.PositiveIntegerField(default=0)
    auto_block_spam = models.BooleanField(default=False)
    forwarded_first_reply = models.BooleanField(default=False)
    # Empty string means the profile was created through relying party flow
    created_by = models.CharField(blank=True, null=True, max_length=63)
    sent_welcome_email = models.BooleanField(default=False)
    last_engagement = models.DateTimeField(blank=True, null=True, db_index=True)

    def __str__(self):
        """Return "<user> Profile"."""
        return f"{self.user} Profile"

    def save(
        self,
        force_insert: bool = False,
        force_update: bool = False,
        using: str | None = None,
        update_fields: Iterable[str] | None = None,
    ) -> None:
        """Save the profile, then purge data the user has opted out of storing.

        The subdomain is lower-cased before saving; when ``update_fields`` is
        given, "subdomain" is added to it so the lower-cased value is written.
        After saving, mask labels are blanked when ``server_storage`` is
        False, and (if phones are enabled) inbound contacts are deleted when
        ``store_phone_log`` is False.
        """
        # always lower-case the subdomain before saving it
        # TODO: change subdomain field as a custom field inheriting from
        # CharField to validate constraints on the field update too
        if self.subdomain and not self.subdomain.islower():
            self.subdomain = self.subdomain.lower()
            if update_fields is not None:
                update_fields = {"subdomain"}.union(update_fields)
        super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )
        # any time a profile is saved with server_storage False, delete the
        # appropriate server-stored Relay address data.
        if not self.server_storage:
            relay_addresses = RelayAddress.objects.filter(user=self.user)
            relay_addresses.update(description="", generated_for="", used_on="")
            domain_addresses = DomainAddress.objects.filter(user=self.user)
            domain_addresses.update(description="", used_on="")
        if settings.PHONES_ENABLED:
            # any time a profile is saved with store_phone_log False, delete the
            # appropriate server-stored InboundContact records
            from phones.models import InboundContact, RelayNumber

            if not self.store_phone_log:
                try:
                    relay_number = RelayNumber.objects.get(user=self.user)
                    InboundContact.objects.filter(relay_number=relay_number).delete()
                except RelayNumber.DoesNotExist:
                    # No relay number means there are no contacts to delete.
                    pass

    @property
    def language(self):
        """Return the first supported language from the FxA locale, else "en"."""
        if self.fxa and self.fxa.extra_data.get("locale"):
            for accept_lang, _ in parse_accept_lang_header(
                self.fxa.extra_data.get("locale")
            ):
                try:
                    return get_supported_language_variant(accept_lang)
                except LookupError:
                    # This language is not supported; try the next one.
                    continue
        return "en"

    # This method returns whether the locale associated with the user's Mozilla account
    # includes a country code from a Premium country. This is less accurate than using
    # get_countries_info_from_request_and_mapping(), which can use a GeoIP lookup, so
    # prefer using that if a request context is available. In other contexts, for
    # example when sending an email, this method can be useful.
    @property
    def fxa_locale_in_premium_country(self) -> bool:
        """Return True if the FxA locale's guessed country is a Premium country.

        Returns False when there is no FxA account or locale, or when the
        country cannot be guessed from the locale.
        """
        if self.fxa and self.fxa.extra_data.get("locale"):
            try:
                country = guess_country_from_accept_lang(self.fxa.extra_data["locale"])
            except AcceptLanguageError:
                return False
            premium_countries = get_premium_countries()
            if country in premium_countries:
                return True
        return False

    @property
    def avatar(self) -> str | None:
        """Return the FxA "avatar" value as a string, or None without FxA."""
        if fxa := self.fxa:
            # NOTE(review): when "avatar" is missing from extra_data this
            # returns the string "None" rather than None, unlike display_name.
            # Left unchanged because API consumers may depend on it; confirm
            # before changing.
            return str(fxa.extra_data.get("avatar"))
        return None

    @property
    def relay_addresses(self):
        """Return a queryset of this user's RelayAddress records."""
        return RelayAddress.objects.filter(user=self.user)

    @property
    def domain_addresses(self):
        """Return a queryset of this user's DomainAddress records."""
        return DomainAddress.objects.filter(user=self.user)

    @property
    def total_masks(self) -> int:
        """Return the number of relay addresses plus domain addresses."""
        ra_count: int = self.relay_addresses.count()
        da_count: int = self.domain_addresses.count()
        return ra_count + da_count

    @property
    def at_mask_limit(self) -> bool:
        """Return True if a non-premium user has reached the free mask limit.

        Premium users are never at the limit.
        """
        if self.has_premium:
            return False
        ra_count: int = self.relay_addresses.count()
        return ra_count >= settings.MAX_NUM_FREE_ALIASES

    def check_bounce_pause(self):
        """Return the bounce pause state, clearing expired bounce timestamps.

        A hard bounce is checked before a soft bounce. A bounce older than its
        allowed window (HARD_BOUNCE_ALLOWED_DAYS / SOFT_BOUNCE_ALLOWED_DAYS) is
        cleared and the profile is saved.

        Returns:
            BounceStatus(True, "hard"), BounceStatus(True, "soft"), or
            BounceStatus(False, "") when sending is not paused.
        """
        if self.last_hard_bounce:
            last_hard_bounce_allowed = datetime.now(UTC) - timedelta(
                days=settings.HARD_BOUNCE_ALLOWED_DAYS
            )
            if self.last_hard_bounce > last_hard_bounce_allowed:
                return BounceStatus(True, "hard")
            self.last_hard_bounce = None
            self.save()
        if self.last_soft_bounce:
            last_soft_bounce_allowed = datetime.now(UTC) - timedelta(
                days=settings.SOFT_BOUNCE_ALLOWED_DAYS
            )
            if self.last_soft_bounce > last_soft_bounce_allowed:
                return BounceStatus(True, "soft")
            self.last_soft_bounce = None
            self.save()
        return BounceStatus(False, "")

    @property
    def bounce_status(self):
        """Return check_bounce_pause(); note that it may save the profile."""
        return self.check_bounce_pause()

    @property
    def next_email_try(self):
        """Return when email sending may resume.

        This is the current time when not paused, otherwise the bounce
        timestamp plus the allowed days for that bounce type.
        """
        bounce_pause, bounce_type = self.check_bounce_pause()

        if not bounce_pause:
            return datetime.now(UTC)

        if bounce_type == "soft":
            assert self.last_soft_bounce
            return self.last_soft_bounce + timedelta(
                days=settings.SOFT_BOUNCE_ALLOWED_DAYS
            )

        assert bounce_type == "hard"
        assert self.last_hard_bounce
        return self.last_hard_bounce + timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS)

    @property
    def last_bounce_date(self):
        """Return the hard bounce date if set, else the soft one, else None."""
        if self.last_hard_bounce:
            return self.last_hard_bounce
        if self.last_soft_bounce:
            return self.last_soft_bounce
        return None

    @property
    def at_max_free_aliases(self) -> bool:
        """Return True if the relay address count has reached the free limit.

        Unlike at_mask_limit, this does not consider premium status.
        """
        relay_addresses_count: int = self.relay_addresses.count()
        return relay_addresses_count >= settings.MAX_NUM_FREE_ALIASES

    @property
    def fxa(self) -> SocialAccount | None:
        """Return the user's social account with provider "fxa", or None."""
        # Note: we are NOT using .filter() here because it invalidates
        # any profile instances that were queried with prefetch_related, which
        # we use in at least the profile view to minimize queries
        assert hasattr(self.user, "socialaccount_set")
        for sa in self.user.socialaccount_set.all():
            if sa.provider == "fxa":
                return sa
        return None

    @property
    def display_name(self) -> str | None:
        """Return the FxA "displayName" as a string, or None if unavailable."""
        # if display name is not set on FxA the
        # displayName key will not exist on the extra_data
        if fxa := self.fxa:
            name = fxa.extra_data.get("displayName")
            return name if name is None else str(name)
        return None

    @property
    def custom_domain(self) -> str:
        """Return "@<subdomain>.<MOZMAIL_DOMAIN>"; requires a subdomain."""
        assert self.subdomain
        return f"@{self.subdomain}.{settings.MOZMAIL_DOMAIN}"

    @property
    def has_premium(self) -> bool:
        """Return True if the user counts as a premium user.

        Requires an FxA account, plus either an email address at one of
        PREMIUM_DOMAINS or a subscription listed in
        settings.SUBSCRIPTIONS_WITH_UNLIMITED.
        """
        # FIXME: as we don't have all the tiers defined we are over-defining
        # this to mark the user as a premium user as well
        if not self.fxa:
            return False
        for premium_domain in PREMIUM_DOMAINS:
            if self.user.email.endswith(f"@{premium_domain}"):
                return True
        user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
        for sub in settings.SUBSCRIPTIONS_WITH_UNLIMITED:
            if sub in user_subscriptions:
                return True
        return False

    @property
    def has_phone(self) -> bool:
        """Return True if the user has access to the phone feature.

        Requires an FxA account. Outside "prod" (and outside pytest) the
        "phones" flag must be active. Access is then granted by the
        "free_phones" flag or by a subscription listed in
        settings.SUBSCRIPTIONS_WITH_PHONE.
        """
        if not self.fxa:
            return False
        if settings.RELAY_CHANNEL != "prod" and not settings.IN_PYTEST:
            if not flag_is_active_in_task("phones", self.user):
                return False
        if flag_is_active_in_task("free_phones", self.user):
            return True
        user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
        for sub in settings.SUBSCRIPTIONS_WITH_PHONE:
            if sub in user_subscriptions:
                return True
        return False

    @property
    def has_vpn(self):
        """Return True if the user has a subscription in SUBSCRIPTIONS_WITH_VPN."""
        if not self.fxa:
            return False
        user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
        for sub in settings.SUBSCRIPTIONS_WITH_VPN:
            if sub in user_subscriptions:
                return True
        return False

    @property
    def emails_forwarded(self):
        """Return emails forwarded across live masks plus deleted masks."""
        return (
            sum(ra.num_forwarded for ra in self.relay_addresses)
            + sum(da.num_forwarded for da in self.domain_addresses)
            + self.num_email_forwarded_in_deleted_address
        )

    @property
    def emails_blocked(self):
        """Return emails blocked across live masks plus deleted masks."""
        return (
            sum(ra.num_blocked for ra in self.relay_addresses)
            + sum(da.num_blocked for da in self.domain_addresses)
            + self.num_email_blocked_in_deleted_address
        )

    @property
    def emails_replied(self):
        """Return emails replied across live masks plus deleted masks."""
        # Once Django is on version 4.0 and above, we can set the default=0
        # and return a int instead of None
        # https://docs.djangoproject.com/en/4.0/ref/models/querysets/#default
        totals = [self.relay_addresses.aggregate(models.Sum("num_replied"))]
        totals.append(self.domain_addresses.aggregate(models.Sum("num_replied")))
        total_num_replied = 0
        for num in totals:
            # The sum is None when there are no rows to aggregate.
            total_num_replied += num.get("num_replied__sum") or 0
        return total_num_replied + self.num_email_replied_in_deleted_address

    @property
    def level_one_trackers_blocked(self):
        """Return trackers blocked across live and deleted masks.

        The counters are nullable, so None is counted as 0.
        """
        return (
            sum(ra.num_level_one_trackers_blocked or 0 for ra in self.relay_addresses)
            + sum(
                da.num_level_one_trackers_blocked or 0 for da in self.domain_addresses
            )
            + (self.num_level_one_trackers_blocked_in_deleted_address or 0)
        )

    @property
    def joined_before_premium_release(self):
        """Return True if the user joined before 2021-10-22 17:00 UTC."""
        date_created = self.user.date_joined
        return date_created < datetime.fromisoformat("2021-10-22 17:00:00+00:00")

    @property
    def date_phone_registered(self) -> datetime | None:
        """Return when the user registered for phone masking, if known.

        Returns None when phones are disabled or the user has no RealPhone.
        Otherwise returns the RelayNumber creation date, falling back to the
        RealPhone verification date.
        """
        if not settings.PHONES_ENABLED:
            return None

        try:
            real_phone = RealPhone.objects.get(user=self.user)
            relay_number = RelayNumber.objects.get(user=self.user)
        except RealPhone.DoesNotExist:
            return None
        except RelayNumber.DoesNotExist:
            # real_phone was found before the RelayNumber lookup failed.
            return real_phone.verified_date
        return relay_number.created_at or real_phone.verified_date

    def add_subdomain(self, subdomain):
        """Validate, set, and register the user's custom subdomain.

        Returns:
            The lower-cased subdomain that was saved.

        Raises:
            CannotMakeSubdomainException: if the subdomain is empty, the user
                is not premium, a subdomain is already set, or the subdomain
                fails validation.
        """
        # Handles if the subdomain is "" or None
        if not subdomain:
            raise CannotMakeSubdomainException(
                "error-subdomain-cannot-be-empty-or-null"
            )

        # subdomain must be all lowercase
        subdomain = subdomain.lower()

        if not self.has_premium:
            raise CannotMakeSubdomainException("error-premium-set-subdomain")
        if self.subdomain is not None:
            raise CannotMakeSubdomainException("error-premium-cannot-change-subdomain")
        self.subdomain = subdomain
        # The validator defined in the subdomain field does not get run in full_clean()
        # when self.subdomain is "" or None, so we need to run the validator again to
        # catch these cases.
        valid_available_subdomain(subdomain)
        self.full_clean()
        self.save()

        RegisteredSubdomain.objects.create(subdomain_hash=hash_subdomain(subdomain))
        return subdomain

    def update_abuse_metric(
        self,
        address_created=False,
        replied=False,
        email_forwarded=False,
        forwarded_email_size=0,
    ) -> datetime | None:
        """Increment today's abuse counters and flag the account at a limit.

        Finds or creates the user's AbuseMetrics row for the current UTC date.
        Creating a new row also deletes every AbuseMetrics row recorded before
        today. When any daily limit from settings is reached, the profile's
        ``last_account_flagged`` is set to now, the profile is saved, and an
        "Abuse flagged" record is logged.

        Args:
            address_created: count one created address.
            replied: count one reply.
            email_forwarded: count one forwarded email.
            forwarded_email_size: size to add to the forwarded total when > 0.

        Returns:
            ``last_account_flagged``, which may have been set by an earlier
            call, or None if the account has never been flagged.
        """
        # TODO MPP-3720: This should be wrapped in atomic or select_for_update to ensure
        # race conditions are properly handled.

        # look for abuse metrics created on the same UTC date, regardless of time.
        # tzinfo is passed to combine() so the result is midnight in UTC.
        # Calling astimezone() on a naive datetime would instead interpret it
        # in the system's local timezone.
        midnight_utc_today = datetime.combine(
            datetime.now(UTC).date(), datetime.min.time(), tzinfo=UTC
        )
        midnight_utc_tomorrow = midnight_utc_today + timedelta(days=1)
        abuse_metric = self.user.abusemetrics_set.filter(
            first_recorded__gte=midnight_utc_today,
            first_recorded__lt=midnight_utc_tomorrow,
        ).first()
        if not abuse_metric:
            abuse_metric = AbuseMetrics.objects.create(user=self.user)
            AbuseMetrics.objects.filter(first_recorded__lt=midnight_utc_today).delete()

        # increment the abuse metric
        if address_created:
            abuse_metric.num_address_created_per_day += 1
        if replied:
            abuse_metric.num_replies_per_day += 1
        if email_forwarded:
            abuse_metric.num_email_forwarded_per_day += 1
        if forwarded_email_size > 0:
            abuse_metric.forwarded_email_size_per_day += forwarded_email_size
        abuse_metric.last_recorded = datetime.now(UTC)
        abuse_metric.save()

        # check user should be flagged for abuse
        hit_max_create = (
            abuse_metric.num_address_created_per_day
            >= settings.MAX_ADDRESS_CREATION_PER_DAY
        )
        hit_max_replies = (
            abuse_metric.num_replies_per_day >= settings.MAX_REPLIES_PER_DAY
        )
        hit_max_forwarded = (
            abuse_metric.num_email_forwarded_per_day >= settings.MAX_FORWARDED_PER_DAY
        )
        hit_max_forwarded_email_size = (
            abuse_metric.forwarded_email_size_per_day
            >= settings.MAX_FORWARDED_EMAIL_SIZE_PER_DAY
        )
        if (
            hit_max_create
            or hit_max_replies
            or hit_max_forwarded
            or hit_max_forwarded_email_size
        ):
            self.last_account_flagged = datetime.now(UTC)
            self.save()
            data = {
                "uid": self.fxa.uid if self.fxa else None,
                "flagged": self.last_account_flagged.timestamp(),
                "replies": abuse_metric.num_replies_per_day,
                "addresses": abuse_metric.num_address_created_per_day,
                "forwarded": abuse_metric.num_email_forwarded_per_day,
                "forwarded_size_in_bytes": abuse_metric.forwarded_email_size_per_day,
            }
            # log for further secops review
            abuse_logger.info("Abuse flagged", extra=data)
        return self.last_account_flagged

    @property
    def is_flagged(self):
        """Return True while the account is within its post-flag pause period.

        The pause lasts settings.PREMIUM_FEATURE_PAUSED_DAYS days from
        ``last_account_flagged``.
        """
        if not self.last_account_flagged:
            return False
        account_premium_feature_resumed = self.last_account_flagged + timedelta(
            days=settings.PREMIUM_FEATURE_PAUSED_DAYS
        )
        if datetime.now(UTC) > account_premium_feature_resumed:
            # premium feature has been resumed
            return False
        # user was flagged and the premium feature pause period is not yet over
        return True

    @property
    def metrics_enabled(self) -> bool:
        """
        Does the user allow us to record technical and interaction data?

        This is based on the Mozilla accounts opt-out option, added around 2022. A user
        can go to their Mozilla account profile settings, Data Collection and Use, and
        deselect "Help improve Mozilla Account". This setting defaults to On, and is
        sent as "metricsEnabled". Some older Relay accounts do not have
        "metricsEnabled", and we default to On.
        """
        if self.fxa:
            return bool(self.fxa.extra_data.get("metricsEnabled", True))
        return True

    @property
    def plan(self) -> Literal["free", "email", "phone", "bundle"]:
        """The user's Relay plan as a string."""
        if self.has_premium:
            if self.has_phone:
                return "bundle" if self.has_vpn else "phone"
            else:
                return "email"
        else:
            return "free"

    @property
    def plan_term(self) -> Literal[None, "unknown", "1_month", "1_year"]:
        """The user's Relay plan term as a string.

        None for the free plan. For the phone plan with both subscription
        dates set, "1_year" when the span exceeds 32 days, else "1_month".
        "unknown" in every other case.
        """
        plan = self.plan
        if plan == "free":
            return None
        if plan == "phone":
            start_date = self.date_phone_subscription_start
            end_date = self.date_phone_subscription_end
            if start_date and end_date:
                span = end_date - start_date
                return "1_year" if span.days > 32 else "1_month"
        return "unknown"

    @property
    def metrics_premium_status(self) -> str:
        """Return "free", or "<plan>_<plan_term>" for paid plans."""
        plan = self.plan
        if plan == "free":
            return "free"
        return f"{plan}_{self.plan_term}"
1✔
572

573

574
@receiver(models.signals.post_save, sender=Profile)
def copy_auth_token(sender, instance=None, created=False, **kwargs):
    """Create an API Token from a new profile's ``api_token``.

    Runs after every Profile save but acts only when the profile was just
    created and the user does not already have a Token.
    """
    if not created:
        return
    # baker triggers created during tests
    # so first check the user doesn't already have a Token
    try:
        Token.objects.get(user=instance.user)
    except Token.DoesNotExist:
        Token.objects.create(user=instance.user, key=instance.api_token)
1✔
584

585

586
def address_hash(address, subdomain=None, domain=None):
    """Return the SHA-256 hex digest identifying a mask address.

    The hashed text is "address@subdomain.domain" when a subdomain is given,
    the bare address when the domain is settings.RELAY_FIREFOX_DOMAIN, and
    "address@domain" otherwise. ``domain`` defaults to the configured
    MOZMAIL_DOMAIN.
    """
    domain = domain or get_domains_from_settings()["MOZMAIL_DOMAIN"]
    if subdomain:
        to_hash = f"{address}@{subdomain}.{domain}"
    elif domain == settings.RELAY_FIREFOX_DOMAIN:
        to_hash = f"{address}"
    else:
        to_hash = f"{address}@{domain}"
    return sha256(to_hash.encode()).hexdigest()
1✔
594

595

596
def address_default():
    """Return a random 9-character string of lowercase letters and digits."""
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=9)
    return "".join(picked)
1✔
598

599

600
def has_bad_words(value) -> bool:
    """Return True if ``value`` matches an entry in the configured bad words.

    Entries of four characters or fewer must equal ``value`` exactly; longer
    entries match when they appear anywhere inside ``value``.
    """
    for entry in emails_config().badwords:
        word = entry.strip()
        if len(word) > 4:
            if word in value:
                return True
        elif word == value:
            return True
    return False
1✔
608

609

610
def is_blocklisted(value: str) -> bool:
    """Return True if ``value`` exactly equals an entry in the blocklist."""
    for blocked in emails_config().blocklist:
        if blocked == value:
            return True
    return False
1✔
612

613

614
def get_domain_numerical(domain_address):
    """Return the DOMAIN_CHOICES number for a configured domain.

    ``domain_address`` is first mapped to its settings key via
    get_domains_from_settings(), and that key is then looked up in
    DOMAIN_CHOICES.

    Raises:
        ValueError: if the domain or its settings key is not found.
    """
    # get domain name from the address
    domains = get_domains_from_settings()
    position = list(domains.values()).index(domain_address)
    domain_name = list(domains.keys())[position]
    # get domain numerical value from domain name
    choices = dict(DOMAIN_CHOICES)
    position = list(choices.values()).index(domain_name)
    return list(choices.keys())[position]
1✔
625

626

627
def hash_subdomain(subdomain, domain=settings.MOZMAIL_DOMAIN):
    """Return the SHA-256 hex digest of "<subdomain>.<domain>".

    ``domain`` defaults to settings.MOZMAIL_DOMAIN as read when this module
    is imported.
    """
    full_domain = f"{subdomain}.{domain}"
    return sha256(full_domain.encode()).hexdigest()
1✔
629

630

631
class RegisteredSubdomain(models.Model):
    """Record of a claimed subdomain, stored only as a hash.

    Rows are created by Profile.add_subdomain and checked by
    valid_available_subdomain to decide whether a subdomain is taken.
    """

    # Hex digest produced by hash_subdomain().
    subdomain_hash = models.CharField(max_length=64, db_index=True, unique=True)
    # Set automatically when the row is created.
    registered_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        """Return the stored subdomain hash."""
        return self.subdomain_hash
×
637

638

639
class CannotMakeSubdomainException(BadRequest):
    """Raised when a subdomain cannot be created or validated.

    Raised by Profile.add_subdomain and valid_available_subdomain.

    Attributes:
        message -- optional explanation of the error; in this module it is
            always an error identifier such as "error-subdomain-not-available"
    """

    def __init__(self, message=None):
        # Kept as an attribute so callers can read the error identifier.
        self.message = message
1✔
648

649

650
class CannotMakeAddressException(RelayAPIException):
    """Common parent for errors raised when a mask cannot be created.

    Covers both RelayAddress and DomainAddress creation failures.
    """
652

653

654
class AccountIsPausedException(CannotMakeAddressException):
    """The account is paused, so a new mask cannot be created (HTTP 403)."""

    status_code = 403
    default_code = "account_is_paused"
    default_detail = "Your account is on pause."
1✔
658

659

660
class RelayAddrFreeTierLimitException(CannotMakeAddressException):
    """A free-tier user has used all of their included masks (HTTP 403)."""

    status_code = 403
    default_code = "free_tier_limit"
    # The {free_tier_limit} placeholder matches the key from error_context();
    # presumably RelayAPIException formats it — confirm in api.exceptions.
    default_detail_template = (
        "You’ve used all {free_tier_limit} email masks included with your free account."
        " You can reuse an existing mask, but using a unique mask for each account is"
        " the most secure option."
    )

    def __init__(self, free_tier_limit: int | None = None, *args, **kwargs):
        """Store the limit, defaulting to settings.MAX_NUM_FREE_ALIASES."""
        if free_tier_limit:
            self.free_tier_limit = free_tier_limit
        else:
            self.free_tier_limit = settings.MAX_NUM_FREE_ALIASES
        super().__init__(*args, **kwargs)

    def error_context(self) -> ErrorContextType:
        """Return the values for the placeholders in the detail template."""
        return {"free_tier_limit": self.free_tier_limit}
1✔
675

676

677
class DomainAddrFreeTierException(CannotMakeAddressException):
    """A free account tried to create a custom subdomain mask (HTTP 403)."""

    status_code = 403
    default_code = "free_tier_no_subdomain_masks"
    default_detail = (
        "Your free account does not include custom subdomains for masks."
        " To create custom masks, upgrade to Relay Premium."
    )
1✔
684

685

686
class DomainAddrNeedSubdomainException(CannotMakeAddressException):
    """A custom mask was requested before a subdomain was chosen (HTTP 400)."""

    status_code = 400
    default_code = "need_subdomain"
    default_detail = "Please select a subdomain before creating a custom email address."
1✔
690

691

692
class DomainAddrUpdateException(CannotMakeAddressException):
    """An existing domain address field cannot be edited (HTTP 400)."""

    status_code = 400
    default_code = "address_not_editable"
    default_detail = "You cannot edit an existing domain address field."
1✔
698

699

700
class DomainAddrUnavailableException(CannotMakeAddressException):
    """The requested domain address could not be created (HTTP 400)."""

    status_code = 400
    default_code = "address_unavailable"
    # The {unavailable_address} placeholder matches the key from
    # error_context(); presumably RelayAPIException formats it — confirm in
    # api.exceptions.
    default_detail_template = (
        "“{unavailable_address}” could not be created."
        " Please try again with a different mask name."
    )

    def __init__(self, unavailable_address: str, *args, **kwargs):
        """Store the rejected address for use in the error context."""
        self.unavailable_address = unavailable_address
        super().__init__(*args, **kwargs)

    def error_context(self) -> ErrorContextType:
        """Return the values for the placeholders in the detail template."""
        context = {"unavailable_address": self.unavailable_address}
        return context
1✔
714

715

716
class DomainAddrDuplicateException(CannotMakeAddressException):
    """Raised when the user already has a domain address with that name."""

    status_code = 409
    default_code = "duplicate_address"
    default_detail_template = (
        "“{duplicate_address}” already exists. "
        "Please try again with a different mask name."
    )

    def __init__(self, duplicate_address: str, *args, **kwargs):
        """Remember the duplicated address, then initialize the parent."""
        # Set the attribute first: the parent initializer may read the error
        # context while it runs.
        self.duplicate_address = duplicate_address
        super().__init__(*args, **kwargs)

    def error_context(self) -> ErrorContextType:
        """Return the duplicated address under ``duplicate_address``."""
        context: ErrorContextType = {
            "duplicate_address": self.duplicate_address,
        }
        return context
730

731

732
class RelayAddress(models.Model):
    """A Relay mask owned by a user, addressed as ``<address>@<domain_value>``.

    Creating a mask enforces the owner's creation limits and replaces unusable
    addresses. Deleting one records its hash in ``DeletedAddress`` and rolls
    its counters into the owner's ``Profile``.
    """

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    address = models.CharField(max_length=64, default=address_default, unique=True)
    domain = models.PositiveSmallIntegerField(
        choices=DOMAIN_CHOICES, default=default_domain_numerical
    )
    enabled = models.BooleanField(default=True)
    description = models.CharField(max_length=64, blank=True)
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    last_modified_at = models.DateTimeField(auto_now=True, db_index=True)
    last_used_at = models.DateTimeField(blank=True, null=True)
    num_forwarded = models.PositiveIntegerField(default=0)
    num_blocked = models.PositiveIntegerField(default=0)
    num_level_one_trackers_blocked = models.PositiveIntegerField(default=0, null=True)
    num_replied = models.PositiveIntegerField(default=0)
    num_spam = models.PositiveIntegerField(default=0)
    generated_for = models.CharField(max_length=255, blank=True)
    block_list_emails = models.BooleanField(default=False)
    used_on = models.TextField(default=None, blank=True, null=True)

    class Meta:
        indexes = [
            # Find when a user first used the add-on
            models.Index(
                name="idx_ra_created_by_addon",
                fields=["user"],
                condition=~models.Q(generated_for__exact=""),
                include=["created_at"],
            ),
        ]

    def __str__(self) -> str:
        return self.address

    def delete(self, *args, **kwargs):
        """Delete the mask after archiving its hash and usage counters.

        A ``DeletedAddress`` row is created with the address hash and the
        forwarded/blocked/replied/spam counters, and the owner's ``Profile``
        totals for deleted addresses are increased before the row is removed.

        Returns:
            Whatever ``models.Model.delete`` returns.
        """
        # TODO: create hard bounce receipt rule in AWS for the address
        # objects.create() already saves the new row, so no extra save() is
        # needed afterwards.
        DeletedAddress.objects.create(
            address_hash=address_hash(self.address, domain=self.domain_value),
            num_forwarded=self.num_forwarded,
            num_blocked=self.num_blocked,
            num_replied=self.num_replied,
            num_spam=self.num_spam,
        )
        profile = Profile.objects.get(user=self.user)
        # One timestamp for the whole deletion keeps both fields identical.
        now = datetime.now(UTC)
        profile.address_last_deleted = now
        profile.num_address_deleted += 1
        profile.num_email_forwarded_in_deleted_address += self.num_forwarded
        profile.num_email_blocked_in_deleted_address += self.num_blocked
        # Both tracker counters are nullable, so treat None as 0.
        profile.num_level_one_trackers_blocked_in_deleted_address = (
            profile.num_level_one_trackers_blocked_in_deleted_address or 0
        ) + (self.num_level_one_trackers_blocked or 0)
        profile.num_email_replied_in_deleted_address += self.num_replied
        profile.num_email_spam_in_deleted_address += self.num_spam
        profile.num_deleted_relay_addresses += 1
        profile.last_engagement = now
        profile.save()
        return super().delete(*args, **kwargs)

    def save(
        self,
        force_insert: bool = False,
        force_update: bool = False,
        using: str | None = None,
        update_fields: Iterable[str] | None = None,
    ) -> None:
        """Save the mask, enforcing creation limits and per-plan field rules.

        When the instance is being added, the owner's profile is locked inside
        a transaction, ``check_user_can_make_another_address`` is applied, and
        the address is regenerated until it is valid and not blocklisted. The
        abuse metric and ``last_engagement`` are then updated.

        On every save, ``description``, ``generated_for`` and ``used_on`` are
        cleared when the owner's profile has ``server_storage`` off, and
        ``block_list_emails`` is turned off when the owner lacks premium. If
        ``update_fields`` was given, the changed fields are added to it so the
        resets are written.

        Raises:
            AccountIsPausedException: the owner's profile is flagged.
            RelayAddrFreeTierLimitException: a non-premium owner is at the
                free mask limit.
        """
        if self._state.adding:
            with transaction.atomic():
                locked_profile = Profile.objects.select_for_update().get(user=self.user)
                check_user_can_make_another_address(locked_profile)
                # Keep drawing new default addresses until one is usable.
                while True:
                    address_is_allowed = not is_blocklisted(self.address)
                    address_is_valid = valid_address(self.address, self.domain_value)
                    if address_is_valid and address_is_allowed:
                        break
                    self.address = address_default()
                locked_profile.update_abuse_metric(address_created=True)
                locked_profile.last_engagement = datetime.now(UTC)
                locked_profile.save()
        if (not self.user.profile.server_storage) and any(
            (self.description, self.generated_for, self.used_on)
        ):
            self.description = ""
            self.generated_for = ""
            self.used_on = ""
            if update_fields is not None:
                update_fields = {"description", "generated_for", "used_on"}.union(
                    update_fields
                )
        if not self.user.profile.has_premium and self.block_list_emails:
            self.block_list_emails = False
            if update_fields is not None:
                update_fields = {"block_list_emails"}.union(update_fields)
        super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    @property
    def domain_value(self):
        """The settings entry for this mask's ``domain`` choice label."""
        return get_domains_from_settings().get(self.get_domain_display())

    @property
    def full_address(self) -> str:
        """The complete email address, ``<address>@<domain_value>``."""
        return f"{self.address}@{self.domain_value}"

    @property
    def metrics_id(self) -> str:
        """The row ID prefixed with ``R``; requires a saved instance."""
        assert self.id
        # Prefix with 'R' for RelayAddress, since there may be a DomainAddress with the
        # same row ID
        return f"R{self.id}"
846

847

848
def check_user_can_make_another_address(profile: Profile) -> None:
    """Raise if ``profile`` is not allowed to create another relay address.

    Raises:
        AccountIsPausedException: the profile is flagged.
        RelayAddrFreeTierLimitException: the profile has no premium and is at
            the free mask limit.
    """
    if profile.is_flagged:
        raise AccountIsPausedException()
    # MPP-3021: test has_premium first so premium users short-circuit and
    # never trigger the at_max_free_aliases DB query
    if not profile.has_premium and profile.at_max_free_aliases:
        raise RelayAddrFreeTierLimitException()
856

857

858
# Local part of a mask: 1-63 characters drawn from lowercase letters, digits,
# hyphens and dots, neither starting nor ending with a hyphen or a dot.
_VALID_ADDRESS_RE = re.compile(r"(?![-.])[a-z0-9.-]{1,63}(?<![-.])")


def valid_address_pattern(address: str) -> bool:
    """Return True when ``address`` is a well-formed mask local part.

    A valid address:
      * is 1-63 characters long,
      * contains only lowercase ASCII letters, digits, hyphens and dots,
      * does not start or end with a hyphen or a dot.

    The whole string must match. ``fullmatch`` is used instead of a
    ``$``-anchored ``match`` because ``$`` also matches just before a trailing
    newline, which would accept values such as ``"abc\\n"``.
    """
    return _VALID_ADDRESS_RE.fullmatch(address) is not None
863

864

865
def valid_address(address: str, domain: str, subdomain: str | None = None) -> bool:
    """Return whether ``address`` may be used for a new mask.

    An address is rejected when it fails ``valid_address_pattern``, when
    ``has_bad_words`` flags it, or when its hash is already recorded in
    ``DeletedAddress``. The deleted-address lookup runs for addresses without
    a subdomain, and for subdomain addresses only while the
    ``custom_domain_management_redesign`` flag is active.

    Args:
        address: The local part of the mask.
        domain: The domain the mask lives on.
        subdomain: The owner's subdomain for domain addresses, else ``None``.
    """
    address_pattern_valid = valid_address_pattern(address)
    address_contains_badword = has_bad_words(address)
    # The outcome is already decided, so skip the flag check and the
    # DeletedAddress query.
    if address_contains_badword or not address_pattern_valid:
        return False

    check_deleted = not subdomain or flag_is_active_in_task(
        "custom_domain_management_redesign", None
    )
    if not check_deleted:
        return True

    hashed = address_hash(address, domain=domain, subdomain=subdomain)
    # exists() lets the database stop at the first match instead of counting.
    return not DeletedAddress.objects.filter(address_hash=hashed).exists()
882

883

884
class DeletedAddress(models.Model):
    """Hash and final usage counters of a mask that has been deleted.

    Only the hash of the address is kept (indexed for lookup), together with
    the forwarded/blocked/replied/spam counters the mask had when deleted.
    """

    address_hash = models.CharField(max_length=64, db_index=True)
    num_forwarded = models.PositiveIntegerField(default=0)
    num_blocked = models.PositiveIntegerField(default=0)
    num_replied = models.PositiveIntegerField(default=0)
    num_spam = models.PositiveIntegerField(default=0)

    def __str__(self) -> str:
        # The hash is the only identifying value this model stores.
        return self.address_hash
893

894

895
def check_user_can_make_domain_address(user_profile: Profile) -> None:
    """Raise if ``user_profile`` is not allowed to create a domain address.

    The checks run in this order, and the first failure wins.

    Raises:
        DomainAddrFreeTierException: the profile does not have premium.
        DomainAddrNeedSubdomainException: the profile has no subdomain.
        AccountIsPausedException: the profile is flagged.
    """
    # 1. Domain addresses are a premium feature.
    if not user_profile.has_premium:
        raise DomainAddrFreeTierException()
    # 2. A subdomain must have been chosen.
    if not user_profile.subdomain:
        raise DomainAddrNeedSubdomainException()
    # 3. Flagged accounts cannot create addresses.
    if user_profile.is_flagged:
        raise AccountIsPausedException()
904

905

906
class DomainAddress(models.Model):
    """A custom mask on its owner's subdomain.

    The full address is ``<address>@<profile subdomain>.<domain_value>``, and
    an address is unique per user. Creation requires a profile that passes
    ``check_user_can_make_domain_address``; the ``address`` of a saved
    instance cannot be changed.
    """

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    address = models.CharField(
        max_length=64, validators=[MinLengthValidator(limit_value=1)]
    )
    enabled = models.BooleanField(default=True)
    description = models.CharField(max_length=64, blank=True)
    domain = models.PositiveSmallIntegerField(choices=DOMAIN_CHOICES, default=2)
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    first_emailed_at = models.DateTimeField(null=True, db_index=True)
    last_modified_at = models.DateTimeField(auto_now=True, db_index=True)
    last_used_at = models.DateTimeField(blank=True, null=True)
    num_forwarded = models.PositiveIntegerField(default=0)
    num_blocked = models.PositiveIntegerField(default=0)
    num_level_one_trackers_blocked = models.PositiveIntegerField(default=0, null=True)
    num_replied = models.PositiveIntegerField(default=0)
    num_spam = models.PositiveIntegerField(default=0)
    block_list_emails = models.BooleanField(default=False)
    used_on = models.TextField(default=None, blank=True, null=True)

    class Meta:
        unique_together = ["user", "address"]

    def __str__(self) -> str:
        return self.address

    def save(
        self,
        force_insert: bool = False,
        force_update: bool = False,
        using: str | None = None,
        update_fields: Iterable[str] | None = None,
    ) -> None:
        """Save the address, validating new ones and freezing ``address``.

        When the instance is being added, the owner must pass
        ``check_user_can_make_domain_address``, the address must pass
        ``valid_address``, and the user must not already have it. The abuse
        metric, ``last_engagement`` and creation counters are then updated.
        When updating, a changed ``address`` is rejected.

        On every save, ``block_list_emails`` is turned off when the owner
        lacks premium, and ``description`` and ``used_on`` are cleared when
        the owner's profile has ``server_storage`` off. The changed fields are
        added to a non-empty ``update_fields``.

        Raises:
            DomainAddrFreeTierException, DomainAddrNeedSubdomainException,
            AccountIsPausedException: the owner may not create the address.
            DomainAddrUnavailableException: the new address is not valid.
            DomainAddrDuplicateException: the user already has this address.
            DomainAddrUpdateException: ``address`` changed on an existing row.
        """
        user_profile = self.user.profile
        if self._state.adding:
            check_user_can_make_domain_address(user_profile)
            domain_address_valid = valid_address(
                self.address, self.domain_value, user_profile.subdomain
            )
            if not domain_address_valid:
                # first_emailed_at is set when the address was made via email.
                if self.first_emailed_at:
                    incr_if_enabled("domainaddress.create_via_email_fail")
                raise DomainAddrUnavailableException(unavailable_address=self.address)

            if DomainAddress.objects.filter(
                user=self.user, address=self.address
            ).exists():
                raise DomainAddrDuplicateException(duplicate_address=self.address)

            user_profile.update_abuse_metric(address_created=True)
            user_profile.last_engagement = datetime.now(UTC)
            user_profile.save(update_fields=["last_engagement"])
            incr_if_enabled("domainaddress.create")
            if self.first_emailed_at:
                incr_if_enabled("domainaddress.create_via_email")
        else:
            # The model is in an update state, do not allow 'address' field updates
            existing_instance = DomainAddress.objects.get(id=self.id)
            if existing_instance.address != self.address:
                raise DomainAddrUpdateException()

        if not user_profile.has_premium and self.block_list_emails:
            self.block_list_emails = False
            if update_fields:
                update_fields = {"block_list_emails"}.union(update_fields)
        if (not user_profile.server_storage) and (self.description or self.used_on):
            self.description = ""
            self.used_on = ""
            if update_fields:
                update_fields = {"description", "used_on"}.union(update_fields)
        super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    @property
    def user_profile(self):
        """The owner's ``Profile``, fetched from the database on every access."""
        return Profile.objects.get(user=self.user)

    @staticmethod
    def make_domain_address(
        user_profile: Profile, address: str | None = None, made_via_email: bool = False
    ) -> DomainAddress:
        """Create and return a domain address for ``user_profile``.

        Args:
            user_profile: The owner's profile; it must pass
                ``check_user_can_make_domain_address``.
            address: The requested local part. When empty or ``None``, one is
                generated with ``address_default()``.
            made_via_email: When true, ``first_emailed_at`` is set to now.

        Raises:
            The exceptions of ``check_user_can_make_domain_address`` and of
            ``save`` for a new address.
        """
        check_user_can_make_domain_address(user_profile)

        if not address:
            # FIXME: if the alias is randomly generated and has bad words
            # we should retry like make_relay_address does
            # not fixing this now because not sure randomly generated
            # DomainAlias will be a feature
            address = address_default()
        assert isinstance(address, str)

        first_emailed_at = datetime.now(UTC) if made_via_email else None
        return DomainAddress.objects.create(
            user=user_profile.user, address=address, first_emailed_at=first_emailed_at
        )

    def delete(self, *args, **kwargs):
        """Delete the address after archiving its hash and usage counters.

        A ``DeletedAddress`` row is created with the address hash and the
        forwarded/blocked/replied/spam counters, and the owner's ``Profile``
        totals for deleted addresses are increased before the row is removed.

        Returns:
            Whatever ``models.Model.delete`` returns.
        """
        # TODO: create hard bounce receipt rule in AWS for the address
        # self.user_profile returns a new Profile object on each access, so it
        # cannot be used to update the profile. Load the profile once and use
        # it for both the subdomain and the counter updates.
        profile = Profile.objects.get(user=self.user)
        # objects.create() already saves the new row, so no extra save() is
        # needed afterwards.
        DeletedAddress.objects.create(
            address_hash=address_hash(
                self.address, profile.subdomain, self.domain_value
            ),
            num_forwarded=self.num_forwarded,
            num_blocked=self.num_blocked,
            num_replied=self.num_replied,
            num_spam=self.num_spam,
        )
        # One timestamp for the whole deletion keeps both fields identical.
        now = datetime.now(UTC)
        profile.address_last_deleted = now
        profile.num_address_deleted += 1
        profile.num_email_forwarded_in_deleted_address += self.num_forwarded
        profile.num_email_blocked_in_deleted_address += self.num_blocked
        # Both tracker counters are nullable, so treat None as 0.
        profile.num_level_one_trackers_blocked_in_deleted_address = (
            profile.num_level_one_trackers_blocked_in_deleted_address or 0
        ) + (self.num_level_one_trackers_blocked or 0)
        profile.num_email_replied_in_deleted_address += self.num_replied
        profile.num_email_spam_in_deleted_address += self.num_spam
        profile.num_deleted_domain_addresses += 1
        profile.last_engagement = now
        profile.save()
        return super().delete(*args, **kwargs)

    @property
    def domain_value(self):
        """The settings entry for this address's ``domain`` choice label."""
        return get_domains_from_settings().get(self.get_domain_display())

    @property
    def full_address(self) -> str:
        """The complete email address, including the owner's subdomain."""
        return f"{self.address}@{self.user_profile.subdomain}.{self.domain_value}"

    @property
    def metrics_id(self) -> str:
        """The row ID prefixed with ``D``; requires a saved instance."""
        assert self.id
        # Prefix with 'D' for DomainAddress, since there may be a RelayAddress with the
        # same row ID
        return f"D{self.id}"
1051

1052

1053
class Reply(models.Model):
    """A reply record attached to a ``RelayAddress`` or a ``DomainAddress``.

    Both foreign keys are nullable; ``address`` resolves to whichever one is
    set, preferring the relay address.
    """

    relay_address = models.ForeignKey(
        RelayAddress, on_delete=models.CASCADE, blank=True, null=True
    )
    domain_address = models.ForeignKey(
        DomainAddress, on_delete=models.CASCADE, blank=True, null=True
    )
    lookup = models.CharField(max_length=255, blank=False, db_index=True)
    encrypted_metadata = models.TextField(blank=False)
    created_at = models.DateField(auto_now_add=True, null=False, db_index=True)

    @property
    def address(self):
        """The relay address if set, otherwise the domain address."""
        return self.relay_address or self.domain_address

    @property
    def profile(self):
        """The profile of the user who owns ``address``."""
        owner = self.address.user
        return owner.profile

    @property
    def owner_has_premium(self):
        """Whether the owning profile has premium."""
        return self.profile.has_premium

    def increment_num_replied(self):
        """Add one reply to the linked address and return its new count.

        Also stamps ``last_used_at`` with the current UTC time; only these two
        fields are passed to ``save``.
        """
        target = self.relay_address or self.domain_address
        assert target
        target.num_replied += 1
        target.last_used_at = datetime.now(UTC)
        target.save(update_fields=["num_replied", "last_used_at"])
        return target.num_replied
1083

1084

1085
class AbuseMetrics(models.Model):
    """Usage counters recorded for a user.

    A row is unique per ``(user, first_recorded)``.
    NOTE(review): the ``*_per_day`` names suggest one row per user per day;
    confirm against the code that writes these rows.
    """

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    first_recorded = models.DateTimeField(auto_now_add=True, db_index=True)
    last_recorded = models.DateTimeField(auto_now_add=True, db_index=True)
    num_address_created_per_day = models.PositiveSmallIntegerField(default=0)
    num_replies_per_day = models.PositiveSmallIntegerField(default=0)
    # PositiveSmallIntegerField: values from 0 to 32767 are safe in all
    # databases supported by Django.
    num_email_forwarded_per_day = models.PositiveSmallIntegerField(default=0)
    # PositiveBigIntegerField: values from 0 to 9.2 exabytes are safe in all
    # databases supported by Django.
    forwarded_email_size_per_day = models.PositiveBigIntegerField(default=0)

    class Meta:
        unique_together = ["user", "first_recorded"]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc