• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 11e2fb5d-e61e-4d95-95a8-c8254f107594

24 Jun 2024 02:48PM CUT coverage: 85.351% (-0.009%) from 85.36%
11e2fb5d-e61e-4d95-95a8-c8254f107594

push

circleci

web-flow
Merge pull request #4817 from mozilla/dependabot/npm_and_yarn/stylelint-scss-6.3.2

Bump stylelint-scss from 6.3.1 to 6.3.2

3980 of 5113 branches covered (77.84%)

Branch coverage included in aggregate %.

15737 of 17988 relevant lines covered (87.49%)

10.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.04
/emails/tests/models_tests.py
1
import random
1✔
2
from datetime import UTC, datetime, timedelta
1✔
3
from hashlib import sha256
1✔
4
from unittest import skip
1✔
5
from unittest.mock import Mock, patch
1✔
6
from uuid import uuid4
1✔
7

8
from django.conf import settings
1✔
9
from django.contrib.auth.models import User
1✔
10
from django.test import TestCase, override_settings
1✔
11

12
import pytest
1✔
13
from allauth.socialaccount.models import SocialAccount
1✔
14
from model_bakery import baker
1✔
15
from waffle.testutils import override_flag
1✔
16

17
from ..exceptions import (
1✔
18
    CannotMakeAddressException,
19
    CannotMakeSubdomainException,
20
    DomainAddrDuplicateException,
21
    DomainAddrUnavailableException,
22
)
23
from ..models import (
1✔
24
    AbuseMetrics,
25
    DeletedAddress,
26
    DomainAddress,
27
    Profile,
28
    RelayAddress,
29
    address_hash,
30
    get_domain_numerical,
31
)
32
from ..utils import get_domains_from_settings
1✔
33

34
if settings.PHONES_ENABLED:
1!
35
    from phones.models import RealPhone, RelayNumber
1✔
36

37

38
def make_free_test_user(email: str = "") -> User:
    """
    Create a free-tier test user with server storage and a Mozilla account.

    When email is non-empty it is used as the user's email address; otherwise
    no email is passed to model_bakery. The returned user has a linked "fxa"
    SocialAccount whose extra_data carries no subscriptions.
    """
    user_fields = {"email": email} if email else {}
    free_user = baker.make(User, **user_fields)

    profile = free_user.profile
    profile.server_storage = True
    profile.save()

    baker.make(
        SocialAccount,
        user=free_user,
        uid=str(uuid4()),
        provider="fxa",
        extra_data={"avatar": "avatar.png"},
    )
    return free_user
1✔
53

54

55
def make_premium_test_user() -> User:
    """
    Create a test user with server storage and a premium subscription.

    The user's email is "premium@email.com" and the profile's date_subscribed
    is set to the current UTC time before the account is upgraded.
    """
    user = baker.make(User, email="premium@email.com")

    profile = user.profile
    profile.server_storage = True
    profile.date_subscribed = datetime.now(tz=UTC)
    profile.save()

    # upgrade_test_user_to_premium returns the same user it was given
    return upgrade_test_user_to_premium(user)
1✔
62

63

64
def make_storageless_test_user() -> User:
    """
    Create a premium test user that has turned off server storage.

    The profile also gets the subdomain "mydomain" and a date_subscribed of
    the current UTC time before the account is upgraded.
    """
    user = baker.make(User)

    profile = user.profile
    profile.server_storage = False
    profile.subdomain = "mydomain"
    profile.date_subscribed = datetime.now(tz=UTC)
    profile.save()

    # upgrade_test_user_to_premium returns the same user it was given
    return upgrade_test_user_to_premium(user)
1✔
73

74

75
def premium_subscription() -> str:
    """
    Return a Mozilla account subscription that provides unlimited emails

    Only subscriptions that are not also phone or VPN subscriptions are
    candidates; one of them is picked at random.
    """
    unlimited_plans = settings.SUBSCRIPTIONS_WITH_UNLIMITED
    assert unlimited_plans

    premium_only_plans = list(
        set(unlimited_plans).difference(
            settings.SUBSCRIPTIONS_WITH_PHONE,
            settings.SUBSCRIPTIONS_WITH_VPN,
        )
    )
    assert premium_only_plans
    return random.choice(premium_only_plans)
1✔
85

86

87
def upgrade_test_user_to_premium(user: User) -> User:
    """
    Give the user a Mozilla account with a premium-only subscription.

    A new "fxa" SocialAccount is created for the user, with a single
    subscription picked by premium_subscription(). The same user object is
    returned so the call can be chained.
    """
    random_sub = premium_subscription()
    baker.make(
        SocialAccount,
        user=user,
        uid=str(uuid4()),
        provider="fxa",
        extra_data={"avatar": "avatar.png", "subscriptions": [random_sub]},
    )
    return user
1✔
97

98

99
def phone_subscription() -> str:
    """
    Return a Mozilla account subscription that provides a phone mask

    Only subscriptions that are not also VPN or unlimited-email subscriptions
    are candidates; one of them is picked at random.
    """
    phone_plans = settings.SUBSCRIPTIONS_WITH_PHONE
    assert phone_plans

    phones_only_plans = list(
        set(phone_plans).difference(
            settings.SUBSCRIPTIONS_WITH_VPN,
            settings.SUBSCRIPTIONS_WITH_UNLIMITED,
        )
    )
    assert phones_only_plans
    return random.choice(phones_only_plans)
1✔
109

110

111
def vpn_subscription() -> str:
    """
    Return a Mozilla account subscription that provides the VPN

    Only subscriptions that are not also phone or unlimited-email
    subscriptions are candidates; one of them is picked at random.
    """
    vpn_plans = settings.SUBSCRIPTIONS_WITH_VPN
    assert vpn_plans

    vpn_only_plans = list(
        set(vpn_plans).difference(
            settings.SUBSCRIPTIONS_WITH_PHONE,
            settings.SUBSCRIPTIONS_WITH_UNLIMITED,
        )
    )
    assert vpn_only_plans
    return random.choice(vpn_only_plans)
1✔
121

122

123
class AddressHashTest(TestCase):
    """Tests for address_hash()"""

    # Local part shared by every test in this class
    LOCAL_PART = "aaaaaaaaa"

    @staticmethod
    def _sha256_hex(text: str) -> str:
        """Return the hex SHA-256 digest of the encoded text."""
        return sha256(text.encode()).hexdigest()

    @override_settings(RELAY_FIREFOX_DOMAIN="firefox.com")
    def test_address_hash_without_subdomain_domain_firefox(self):
        """For the RELAY_FIREFOX_DOMAIN, only the local part is hashed."""
        expected_hash = self._sha256_hex(f"{self.LOCAL_PART}")
        actual_hash = address_hash(self.LOCAL_PART, domain="firefox.com")
        assert actual_hash == expected_hash

    @override_settings(RELAY_FIREFOX_DOMAIN="firefox.com")
    def test_address_hash_without_subdomain_domain_not_firefoxz(self):
        """For a domain other than RELAY_FIREFOX_DOMAIN, local@domain is hashed."""
        non_default = "test.com"
        expected_hash = self._sha256_hex(f"{self.LOCAL_PART}@{non_default}")
        actual_hash = address_hash(self.LOCAL_PART, domain=non_default)
        assert actual_hash == expected_hash

    def test_address_hash_with_subdomain(self):
        """With a subdomain, local@subdomain.domain is hashed."""
        subdomain = "test"
        domain = get_domains_from_settings().get("MOZMAIL_DOMAIN")
        expected_hash = self._sha256_hex(f"{self.LOCAL_PART}@{subdomain}.{domain}")
        actual_hash = address_hash(self.LOCAL_PART, subdomain, domain)
        assert actual_hash == expected_hash

    def test_address_hash_with_additional_domain(self):
        """With only a domain keyword, local@domain is hashed."""
        test_domain = "test.com"
        expected_hash = self._sha256_hex(f"{self.LOCAL_PART}@{test_domain}")
        actual_hash = address_hash(self.LOCAL_PART, domain=test_domain)
        assert actual_hash == expected_hash
1✔
149

150

151
class GetDomainNumericalTest(TestCase):
    """Tests for get_domain_numerical()"""

    def test_get_domain_numerical(self):
        """Each domain used by the test settings maps to its expected number."""
        expected_numbers = (("default.com", 1), ("test.com", 2))
        for domain, number in expected_numbers:
            assert get_domain_numerical(domain) == number
1✔
155

156

157
class RelayAddressTest(TestCase):
    """Tests for the RelayAddress model"""

    def setUp(self):
        """Create a free user, a premium user, and a storageless premium user."""
        self.user = make_free_test_user()
        self.user_profile = self.user.profile
        self.premium_user = make_premium_test_user()
        self.premium_user_profile = self.premium_user.profile
        self.storageless_user = make_storageless_test_user()

    @staticmethod
    def _assert_no_stored_labels(mask) -> None:
        """Assert that the description, generated_for and used_on are empty."""
        assert mask.description == ""
        assert mask.generated_for == ""
        assert mask.used_on in (None, "")

    def test_create_assigns_to_user(self):
        """A created mask belongs to the user it was created for."""
        owner = self.user_profile.user
        mask = RelayAddress.objects.create(user=owner)
        assert mask.user == owner

    @override_settings(MAX_NUM_FREE_ALIASES=5, MAX_ADDRESS_CREATION_PER_DAY=10)
    def test_create_has_limit(self) -> None:
        """Creating a mask past the daily limit raises "account_is_paused"."""
        daily_limit = settings.MAX_ADDRESS_CREATION_PER_DAY
        baker.make(RelayAddress, user=self.premium_user, _quantity=daily_limit)

        with pytest.raises(CannotMakeAddressException) as exc_info:
            RelayAddress.objects.create(user=self.premium_user)
        assert exc_info.value.get_codes() == "account_is_paused"

        # The rejected mask was not stored
        owner = self.premium_user_profile.user
        assert RelayAddress.objects.filter(user=owner).count() == 10

    def test_create_premium_user_can_exceed_free_limit(self):
        """A premium user can hold more masks than the free-tier limit."""
        over_free_limit = settings.MAX_NUM_FREE_ALIASES + 1
        baker.make(RelayAddress, user=self.premium_user, _quantity=over_free_limit)

        addresses = RelayAddress.objects.filter(user=self.premium_user).values_list(
            "address", flat=True
        )
        assert len(addresses) == over_free_limit

    def test_create_non_premium_user_cannot_pass_free_limit(self) -> None:
        """A free user at the free-tier limit gets "free_tier_limit"."""
        free_limit = settings.MAX_NUM_FREE_ALIASES
        baker.make(RelayAddress, user=self.user, _quantity=free_limit)

        owner = self.user_profile.user
        with pytest.raises(CannotMakeAddressException) as exc_info:
            RelayAddress.objects.create(user=owner)
        assert exc_info.value.get_codes() == "free_tier_limit"

        # The rejected mask was not stored
        addresses = RelayAddress.objects.filter(user=owner).values_list(
            "address", flat=True
        )
        assert len(addresses) == free_limit

    @skip(reason="ignore test for code path that we don't actually use")
    def test_create_with_specified_domain(self):
        """Skipped: create a mask with an explicit numerical domain."""
        mask = RelayAddress.objects.create(user=self.user_profile.user, domain=2)
        assert mask.domain == 2
        assert mask.get_domain_display() == "MOZMAIL_DOMAIN"
        assert mask.domain_value == "test.com"

    def test_create_updates_profile_last_engagement(self) -> None:
        """Creating another mask moves the profile's last_engagement forward."""
        first_mask = baker.make(RelayAddress, user=self.user, enabled=True)
        profile = first_mask.user.profile
        profile.refresh_from_db()
        assert profile.last_engagement
        engagement_before = profile.last_engagement

        baker.make(RelayAddress, user=self.user, enabled=True)

        profile.refresh_from_db()
        assert profile.last_engagement > engagement_before

    def test_save_does_not_update_profile_last_engagement(self) -> None:
        """Saving an existing mask leaves the profile's last_engagement alone."""
        mask = baker.make(RelayAddress, user=self.user, enabled=True)
        profile = mask.user.profile
        profile.refresh_from_db()
        assert profile.last_engagement
        engagement_before = profile.last_engagement

        mask.enabled = False
        mask.save()

        profile.refresh_from_db()
        assert profile.last_engagement == engagement_before

    def test_delete_updates_profile_last_engagement(self) -> None:
        """Deleting a mask moves the profile's last_engagement forward."""
        mask = baker.make(RelayAddress, user=self.user)
        profile = mask.user.profile
        profile.refresh_from_db()
        assert profile.last_engagement
        engagement_before = profile.last_engagement

        mask.delete()

        profile.refresh_from_db()
        assert profile.last_engagement > engagement_before

    def test_delete_adds_deleted_address_object(self):
        """Deleting a mask records one DeletedAddress with the full address hash."""
        mask = baker.make(RelayAddress, user=self.user)
        expected_hash = sha256(mask.full_address.encode("utf-8")).hexdigest()

        mask.delete()

        matches = DeletedAddress.objects.filter(address_hash=expected_hash)
        assert matches.count() == 1

    def test_delete_mozmail_deleted_address_object(self):
        """Deleting a domain=2 mask records the hash of address@domain_value."""
        mask = baker.make(RelayAddress, domain=2, user=self.user)
        hashed_text = f"{mask.address}@{mask.domain_value}"
        expected_hash = sha256(hashed_text.encode()).hexdigest()

        mask.delete()

        matches = DeletedAddress.objects.filter(address_hash=expected_hash)
        assert matches.count() == 1

    def test_delete_increments_values_on_profile(self):
        """Deleting a mask adds its counters to the profile's deleted totals."""
        profile = self.premium_user_profile
        expected_after_delete = {
            "num_address_deleted": 1,
            "num_email_forwarded_in_deleted_address": 2,
            "num_email_blocked_in_deleted_address": 3,
            "num_level_one_trackers_blocked_in_deleted_address": 4,
            "num_email_replied_in_deleted_address": 5,
            "num_email_spam_in_deleted_address": 6,
            "num_deleted_relay_addresses": 1,
            "num_deleted_domain_addresses": 0,
        }

        # Every counter starts at zero
        for counter_name in expected_after_delete:
            assert getattr(profile, counter_name) == 0, counter_name

        mask = baker.make(
            RelayAddress,
            user=self.premium_user,
            num_forwarded=2,
            num_blocked=3,
            num_level_one_trackers_blocked=4,
            num_replied=5,
            num_spam=6,
        )
        mask.delete()

        profile.refresh_from_db()
        for counter_name, expected_value in expected_after_delete.items():
            assert getattr(profile, counter_name) == expected_value, counter_name

    def test_relay_address_create_repeats_deleted_address_invalid(self):
        """Re-creating a deleted address yields a mask with a different address."""
        owner = baker.make(User)
        requested_address = "random-address"
        RelayAddress.objects.create(user=owner, address=requested_address).delete()

        repeat_mask = RelayAddress.objects.create(
            user=owner, address=requested_address
        )
        assert repeat_mask.address != requested_address

    @patch("emails.validators.badwords", return_value=[])
    @patch("emails.validators.blocklist", return_value=["blocked-word"])
    def test_address_contains_blocklist_invalid(
        self, mock_blocklist: Mock, mock_badwords: Mock
    ) -> None:
        """Requesting a blocklisted address yields a mask with another address."""
        blocked_word = "blocked-word"
        mask = RelayAddress.objects.create(user=baker.make(User), address=blocked_word)
        assert mask.address != blocked_word

    def test_free_user_cant_set_block_list_emails(self):
        """A free user's block_list_emails flag is not stored as True."""
        mask = RelayAddress.objects.create(user=self.user)
        mask.block_list_emails = True
        mask.save()

        mask.refresh_from_db()
        assert mask.block_list_emails is False

    def test_premium_user_can_set_block_list_emails(self):
        """A premium user's block_list_emails flag is stored as True."""
        mask = RelayAddress.objects.create(user=self.premium_user)
        assert mask.block_list_emails is False

        mask.block_list_emails = True
        mask.save()

        mask.refresh_from_db()
        assert mask.block_list_emails is True

    def test_formerly_premium_user_clears_block_list_emails(self):
        """Saving a mask after premium is removed clears block_list_emails."""
        mask = RelayAddress.objects.create(
            user=self.premium_user, block_list_emails=True
        )
        mask.refresh_from_db()
        assert mask.block_list_emails is True

        # Remove premium from user
        fxa_account = self.premium_user.profile.fxa
        assert fxa_account is not None
        fxa_account.extra_data["subscriptions"] = []
        fxa_account.save()
        assert not self.premium_user.profile.has_premium

        mask.save()
        assert mask.block_list_emails is False

    def test_storageless_user_cant_set_label(self):
        """Labels set by a storageless user are not stored."""
        mask = RelayAddress.objects.create(user=self.storageless_user)
        self._assert_no_stored_labels(mask)

        mask.description = "Arbitrary description"
        mask.generated_for = "https://example.com"
        mask.used_on = "https://example.com"
        mask.save()

        mask.refresh_from_db()
        self._assert_no_stored_labels(mask)

    def test_clear_storage_with_update_fields(self) -> None:
        """
        With update_fields, the stored data is still cleared for storageless users.
        """
        mask = RelayAddress.objects.create(user=self.storageless_user)
        assert mask.description == ""

        # Use QuerySet.update to avoid model save method
        RelayAddress.objects.filter(id=mask.id).update(
            description="the description",
            generated_for="https://example.com",
            used_on="https://example.com",
        )
        mask.refresh_from_db()
        assert mask.description == "the description"
        assert mask.generated_for == "https://example.com"
        assert mask.used_on == "https://example.com"

        # Update a different field with update_fields to avoid full model save
        new_last_used_at = datetime(2024, 1, 11, tzinfo=UTC)
        mask.last_used_at = new_last_used_at
        mask.save(update_fields={"last_used_at"})

        # Since .save() added to update_fields, the storage fields are cleared
        mask.refresh_from_db()
        assert mask.last_used_at == new_last_used_at
        self._assert_no_stored_labels(mask)

    def test_clear_block_list_emails_with_update_fields(self) -> None:
        """
        With update_fields, the block_list_emails flag is still cleared for free users.
        """
        mask = RelayAddress.objects.create(user=self.user)
        assert not mask.block_list_emails

        # Use QuerySet.update to avoid model save method
        RelayAddress.objects.filter(id=mask.id).update(block_list_emails=True)
        mask.refresh_from_db()
        assert mask.block_list_emails

        # Update a different field with update_fields to avoid full model save
        new_last_used_at = datetime(2024, 1, 12, tzinfo=UTC)
        mask.last_used_at = new_last_used_at
        mask.save(update_fields={"last_used_at"})

        # Since .save() added to update_fields, block_list_emails flag is cleared
        mask.refresh_from_db()
        assert mask.last_used_at == new_last_used_at
        assert not mask.block_list_emails

    def test_metrics_id(self):
        """The metrics_id is "R" followed by the mask's database ID."""
        mask = RelayAddress.objects.create(user=self.user)
        assert mask.metrics_id == f"R{mask.id}"
1✔
427

428

429
class ProfileTestCase(TestCase):
    """Base class for Profile tests."""

    def setUp(self) -> None:
        """Create a user and keep its profile, which must have server storage."""
        user = baker.make(User)
        self.profile = user.profile
        assert self.profile.server_storage is True

    def get_or_create_social_account(self) -> SocialAccount:
        """Get the test user's social account, creating if needed."""
        social_account, _ = SocialAccount.objects.get_or_create(
            user=self.profile.user,
            provider="fxa",
            defaults={
                "uid": str(uuid4()),
                "extra_data": {"avatar": "image.png", "subscriptions": []},
            },
        )
        return social_account

    def upgrade_to_premium(self) -> None:
        """Add an unlimited emails subscription to the user."""
        social_account = self.get_or_create_social_account()
        social_account.extra_data["subscriptions"].append(premium_subscription())
        social_account.save()

    def upgrade_to_phone(self) -> None:
        """
        Add a phone plan to the user.

        An unlimited emails subscription is added as well when the profile
        does not already report has_premium.
        """
        social_account = self.get_or_create_social_account()
        social_account.extra_data["subscriptions"].append(phone_subscription())
        if not self.profile.has_premium:
            social_account.extra_data["subscriptions"].append(premium_subscription())
        social_account.save()

    def upgrade_to_vpn_bundle(self) -> None:
        """
        Add a VPN bundle plan to the user.

        An unlimited emails subscription and a phone subscription are added as
        well when the profile does not already report has_premium / has_phone.
        """
        social_account = self.get_or_create_social_account()
        social_account.extra_data["subscriptions"].append(vpn_subscription())
        if not self.profile.has_premium:
            social_account.extra_data["subscriptions"].append(premium_subscription())
        if not self.profile.has_phone:
            social_account.extra_data["subscriptions"].append(phone_subscription())
        social_account.save()
1✔
472

473

474
class ProfileBounceTestCase(ProfileTestCase):
    """Base class for Profile tests that check for bounces."""

    def set_hard_bounce(self) -> datetime:
        """
        Set a hard bounce pause for the profile, return the bounce time.

        This happens when the user's email server reports a hard bounce, such as
        saying the email does not exist. The bounce is dated one day short of
        HARD_BOUNCE_ALLOWED_DAYS in the past.
        """
        days_since_bounce = settings.HARD_BOUNCE_ALLOWED_DAYS - 1
        bounced_at = datetime.now(UTC) - timedelta(days=days_since_bounce)
        self.profile.last_hard_bounce = bounced_at
        self.profile.save()
        return self.profile.last_hard_bounce

    def set_soft_bounce(self) -> datetime:
        """
        Set a soft bounce for the profile, return the bounce time.

        This happens when the user's email server reports a soft bounce, such as
        saying the user's mailbox is full. The bounce is dated one day short of
        SOFT_BOUNCE_ALLOWED_DAYS in the past.
        """
        days_since_bounce = settings.SOFT_BOUNCE_ALLOWED_DAYS - 1
        bounced_at = datetime.now(UTC) - timedelta(days=days_since_bounce)
        self.profile.last_soft_bounce = bounced_at
        self.profile.save()
        return self.profile.last_soft_bounce
1✔
502

503

504
class ProfileCheckBouncePause(ProfileBounceTestCase):
    """Tests for Profile.check_bounce_pause()"""

    def test_no_bounces(self) -> None:
        """Without any bounce, the profile is not paused."""
        paused, kind = self.profile.check_bounce_pause()

        assert paused is False
        assert kind == ""

    def test_hard_bounce_pending(self) -> None:
        """A recent hard bounce pauses the profile with type "hard"."""
        self.set_hard_bounce()

        paused, kind = self.profile.check_bounce_pause()

        assert paused is True
        assert kind == "hard"

    def test_soft_bounce_pending(self) -> None:
        """A recent soft bounce pauses the profile with type "soft"."""
        self.set_soft_bounce()

        paused, kind = self.profile.check_bounce_pause()

        assert paused is True
        assert kind == "soft"

    def test_hard_and_soft_bounce_pending_shows_hard(self) -> None:
        """With both bounces recent, the hard bounce is the one reported."""
        self.set_hard_bounce()
        self.set_soft_bounce()

        paused, kind = self.profile.check_bounce_pause()

        assert paused is True
        assert kind == "hard"

    def test_hard_bounce_over_resets_timer(self) -> None:
        """A hard bounce older than the allowed days is cleared by the check."""
        days_since_bounce = settings.HARD_BOUNCE_ALLOWED_DAYS + 1
        expired_bounce = datetime.now(UTC) - timedelta(days=days_since_bounce)
        self.profile.last_hard_bounce = expired_bounce
        self.profile.save()
        assert self.profile.last_hard_bounce is not None

        paused, kind = self.profile.check_bounce_pause()

        assert paused is False
        assert kind == ""
        assert self.profile.last_hard_bounce is None

    def test_soft_bounce_over_resets_timer(self) -> None:
        """A soft bounce older than the allowed days is cleared by the check."""
        days_since_bounce = settings.SOFT_BOUNCE_ALLOWED_DAYS + 1
        expired_bounce = datetime.now(UTC) - timedelta(days=days_since_bounce)
        self.profile.last_soft_bounce = expired_bounce
        self.profile.save()
        assert self.profile.last_soft_bounce is not None

        paused, kind = self.profile.check_bounce_pause()

        assert paused is False
        assert kind == ""
        assert self.profile.last_soft_bounce is None
1✔
557

558

559
class ProfileNextEmailTryDateTest(ProfileBounceTestCase):
    """Tests for Profile.next_email_try"""

    def test_no_bounces_returns_today(self) -> None:
        """Without any bounce, the next try date is today's UTC date."""
        next_try_date = self.profile.next_email_try.date()
        assert next_try_date == datetime.now(UTC).date()

    def test_hard_bounce_returns_proper_datemath(self) -> None:
        """After a hard bounce, the next try is HARD_BOUNCE_ALLOWED_DAYS later."""
        bounced_at = self.set_hard_bounce()
        allowed_days = timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS)
        expected_next_try = bounced_at + allowed_days
        assert self.profile.next_email_try.date() == expected_next_try.date()

    def test_soft_bounce_returns_proper_datemath(self) -> None:
        """After a soft bounce, the next try is SOFT_BOUNCE_ALLOWED_DAYS later."""
        bounced_at = self.set_soft_bounce()
        allowed_days = timedelta(days=settings.SOFT_BOUNCE_ALLOWED_DAYS)
        expected_next_try = bounced_at + allowed_days
        assert self.profile.next_email_try.date() == expected_next_try.date()

    def test_hard_and_soft_bounce_returns_hard_datemath(self) -> None:
        """With both bounces set, the next try is based on the hard bounce."""
        soft_bounced_at = self.set_soft_bounce()
        hard_bounced_at = self.set_hard_bounce()
        assert soft_bounced_at != hard_bounced_at

        allowed_days = timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS)
        expected_next_try = hard_bounced_at + allowed_days
        assert self.profile.next_email_try.date() == expected_next_try.date()
1✔
587

588

589
class ProfileLastBounceDateTest(ProfileBounceTestCase):
    """Tests for Profile.last_bounce_date"""

    def test_no_bounces_returns_None(self) -> None:
        """Without any bounce, there is no last bounce date."""
        last_bounce = self.profile.last_bounce_date
        assert last_bounce is None

    def test_soft_bounce_returns_its_date(self) -> None:
        """With only a soft bounce, its date is the last bounce date."""
        self.set_soft_bounce()
        last_bounce = self.profile.last_bounce_date
        assert last_bounce == self.profile.last_soft_bounce

    def test_hard_bounce_returns_its_date(self) -> None:
        """With only a hard bounce, its date is the last bounce date."""
        self.set_hard_bounce()
        last_bounce = self.profile.last_bounce_date
        assert last_bounce == self.profile.last_hard_bounce

    def test_hard_and_soft_bounces_returns_hard_date(self) -> None:
        """With both bounces set, the hard bounce date is reported."""
        self.set_soft_bounce()
        self.set_hard_bounce()
        last_bounce = self.profile.last_bounce_date
        assert last_bounce == self.profile.last_hard_bounce
1✔
607

608

609
class ProfileHasPremiumTest(ProfileTestCase):
    """Tests for Profile.has_premium"""

    def test_default_False(self) -> None:
        """A new profile without subscriptions is not premium."""
        is_premium = self.profile.has_premium
        assert is_premium is False

    def test_premium_subscription_returns_True(self) -> None:
        """An unlimited emails subscription makes the profile premium."""
        self.upgrade_to_premium()
        is_premium = self.profile.has_premium
        assert is_premium is True

    def test_phone_returns_True(self) -> None:
        """A profile upgraded to a phone plan is premium."""
        self.upgrade_to_phone()
        is_premium = self.profile.has_premium
        assert is_premium is True

    def test_vpn_bundle_returns_True(self) -> None:
        """A profile upgraded to the VPN bundle is premium."""
        self.upgrade_to_vpn_bundle()
        is_premium = self.profile.has_premium
        assert is_premium is True
1✔
626

627

628
class ProfileHasPhoneTest(ProfileTestCase):
    """Tests for Profile.has_phone"""

    def test_default_False(self) -> None:
        """A new profile without subscriptions has no phone plan."""
        has_phone_plan = self.profile.has_phone
        assert has_phone_plan is False

    def test_premium_subscription_returns_False(self) -> None:
        """An unlimited emails subscription alone does not include phone."""
        self.upgrade_to_premium()
        has_phone_plan = self.profile.has_phone
        assert has_phone_plan is False

    def test_phone_returns_True(self) -> None:
        """A profile upgraded to a phone plan has phone."""
        self.upgrade_to_phone()
        has_phone_plan = self.profile.has_phone
        assert has_phone_plan is True

    def test_vpn_bundle_returns_True(self) -> None:
        """A profile upgraded to the VPN bundle has phone."""
        self.upgrade_to_vpn_bundle()
        has_phone_plan = self.profile.has_phone
        assert has_phone_plan is True
1✔
645

646

647
@pytest.mark.skipif(not settings.PHONES_ENABLED, reason="PHONES_ENABLED is False")
@override_settings(PHONES_NO_CLIENT_CALLS_IN_TEST=True)
class ProfileDatePhoneRegisteredTest(ProfileTestCase):
    """Tests for Profile.date_phone_registered"""

    def _create_verified_real_phone(self, verified_date: datetime):
        """Create and return a verified RealPhone for the test user."""
        return RealPhone.objects.create(
            user=self.profile.user,
            number="+12223334444",
            verified=True,
            verified_date=verified_date,
        )

    def test_default_None(self) -> None:
        """Without any phone, there is no registration date."""
        assert self.profile.date_phone_registered is None

    def test_real_phone_no_relay_number_returns_verified_date(self) -> None:
        """With only a verified real phone, its verified date is returned."""
        self.upgrade_to_phone()
        verified_at = datetime.now(UTC)
        self._create_verified_real_phone(verified_at)

        assert self.profile.date_phone_registered == verified_at

    def test_real_phone_and_relay_number_w_created_at_returns_created_at_date(
        self,
    ) -> None:
        """With a relay number, its created_at date is returned."""
        self.upgrade_to_phone()
        verified_at = datetime.now(UTC)
        self._create_verified_real_phone(verified_at)
        relay_number = RelayNumber.objects.create(user=self.profile.user)

        assert self.profile.date_phone_registered == relay_number.created_at

    def test_real_phone_and_relay_number_wo_created_at_returns_verified_date(
        self,
    ) -> None:
        """With a relay number lacking created_at, the verified date is returned."""
        self.upgrade_to_phone()
        verified_at = datetime.now(UTC)
        real_phone = self._create_verified_real_phone(verified_at)
        relay_number = RelayNumber.objects.create(user=self.profile.user)

        # since created_at is auto set, update to None
        relay_number.created_at = None
        relay_number.save()

        assert self.profile.date_phone_registered == real_phone.verified_date
1✔
698

699

700
class ProfileTotalMasksTest(ProfileTestCase):
    """Tests for Profile.total_masks"""

    def test_total_masks(self) -> None:
        """total_masks counts the user's random masks plus custom domain masks."""
        self.upgrade_to_premium()
        self.profile.add_subdomain("totalmasks")
        assert self.profile.total_masks == 0

        # Fixed, non-zero counts keep the test deterministic and make sure
        # both kinds of masks are always created and counted.
        num_relay_addresses = 2
        num_domain_addresses = 2

        for _ in range(num_relay_addresses):
            baker.make(RelayAddress, user=self.profile.user)
        for i in range(num_domain_addresses):
            baker.make(DomainAddress, user=self.profile.user, address=f"mask{i}")

        assert self.profile.total_masks == num_relay_addresses + num_domain_addresses
1✔
714

715

716
class ProfileAtMaskLimitTest(ProfileTestCase):
    """Tests for Profile.at_mask_limit"""

    def _make_max_free_masks(self) -> None:
        """Create settings.MAX_NUM_FREE_ALIASES RelayAddress rows for the user."""
        baker.make(
            RelayAddress,
            user=self.profile.user,
            _quantity=settings.MAX_NUM_FREE_ALIASES,
        )

    def test_premium_user_returns_False(self) -> None:
        """A premium user is not at the limit, before or after creating masks."""
        self.upgrade_to_premium()
        assert self.profile.at_mask_limit is False
        self._make_max_free_masks()
        assert self.profile.at_mask_limit is False

    def test_free_user(self) -> None:
        """A free user reaches the limit after MAX_NUM_FREE_ALIASES masks."""
        assert self.profile.at_mask_limit is False
        self._make_max_free_masks()
        assert self.profile.at_mask_limit is True
1✔
737

738

739
class ProfileAddSubdomainTest(ProfileTestCase):
    """Tests for Profile.add_subdomain()"""

    def _assert_add_subdomain_raises(
        self, subdomain: str | None, expected_msg: str
    ) -> None:
        """Check add_subdomain() raises CannotMakeSubdomainException with the message."""
        with self.assertRaisesMessage(CannotMakeSubdomainException, expected_msg):
            self.profile.add_subdomain(subdomain)

    def test_new_unlimited_profile(self) -> None:
        self.upgrade_to_premium()
        registered = self.profile.add_subdomain("newpremium")
        assert registered == "newpremium"

    def test_lowercases_subdomain_value(self) -> None:
        self.upgrade_to_premium()
        registered = self.profile.add_subdomain("mIxEdcAsE")
        assert registered == "mixedcase"

    def test_non_premium_user_raises_exception(self) -> None:
        self._assert_add_subdomain_raises("test", "error-premium-set-subdomain")

    def test_calling_again_raises_exception(self) -> None:
        self.upgrade_to_premium()
        existing = "test"
        self.profile.subdomain = existing
        self.profile.save()

        self._assert_add_subdomain_raises(
            existing, "error-premium-cannot-change-subdomain"
        )

    def test_badword_subdomain_raises_exception(self) -> None:
        self.upgrade_to_premium()
        self._assert_add_subdomain_raises("angry", "error-subdomain-not-available")

    def test_blocked_word_subdomain_raises_exception(self) -> None:
        self.upgrade_to_premium()
        self._assert_add_subdomain_raises("mozilla", "error-subdomain-not-available")

    def test_empty_subdomain_raises(self) -> None:
        self.upgrade_to_premium()
        self._assert_add_subdomain_raises(
            "", "error-subdomain-cannot-be-empty-or-null"
        )

    def test_null_subdomain_raises(self) -> None:
        self.upgrade_to_premium()
        self._assert_add_subdomain_raises(
            None, "error-subdomain-cannot-be-empty-or-null"
        )

    def test_subdomain_with_space_at_end_raises(self) -> None:
        self.upgrade_to_premium()
        self._assert_add_subdomain_raises("mydomain ", "error-subdomain-not-available")
1✔
797

798

799
class ProfileSaveTest(ProfileTestCase):
    """Tests for Profile.save()"""

    # Label values written to the test masks, so tests can tell whether
    # Profile.save() kept or cleared them.
    TEST_DESCRIPTION = "test description"
    TEST_USED_ON = TEST_GENERATED_FOR = "secret.com"

    def test_lowercases_subdomain_value(self) -> None:
        self.upgrade_to_premium()
        self.profile.subdomain = "mIxEdcAsE"
        self.profile.save()
        assert self.profile.subdomain == "mixedcase"

    def test_lowercases_subdomain_value_with_update_fields(self) -> None:
        """With update_fields, the subdomain is still lowercased."""
        self.upgrade_to_premium()
        assert self.profile.subdomain is None

        # Use QuerySet.update to avoid model .save()
        Profile.objects.filter(id=self.profile.id).update(subdomain="mIxEdcAsE")
        self.profile.refresh_from_db()
        assert self.profile.subdomain == "mIxEdcAsE"

        # Update a different field with update_fields to avoid a full model save
        new_date_subscribed = datetime(2023, 3, 3, tzinfo=UTC)
        self.profile.date_subscribed = new_date_subscribed
        self.profile.save(update_fields={"date_subscribed"})

        # Since .save() added to update_fields, subdomain is now lowercase
        self.profile.refresh_from_db()
        assert self.profile.date_subscribed == new_date_subscribed
        assert self.profile.subdomain == "mixedcase"

    def add_relay_address(self) -> RelayAddress:
        """Create one RelayAddress for the profile's user with all labels set."""
        return baker.make(
            RelayAddress,
            user=self.profile.user,
            description=self.TEST_DESCRIPTION,
            generated_for=self.TEST_GENERATED_FOR,
            used_on=self.TEST_USED_ON,
        )

    def add_domain_address(self) -> DomainAddress:
        """Upgrade to premium, set a subdomain, and create a labeled DomainAddress."""
        self.upgrade_to_premium()
        self.profile.subdomain = "somesubdomain"
        self.profile.save()
        return baker.make(
            DomainAddress,
            user=self.profile.user,
            address="localpart",
            description=self.TEST_DESCRIPTION,
            used_on=self.TEST_USED_ON,
        )

    def test_save_server_storage_true_doesnt_delete_data(self) -> None:
        relay_address = self.add_relay_address()
        self.profile.server_storage = True
        self.profile.save()

        relay_address.refresh_from_db()
        assert relay_address.description == self.TEST_DESCRIPTION
        assert relay_address.generated_for == self.TEST_GENERATED_FOR
        assert relay_address.used_on == self.TEST_USED_ON

    def test_save_server_storage_false_deletes_data(self) -> None:
        relay_address = self.add_relay_address()
        domain_address = self.add_domain_address()
        self.profile.server_storage = False
        self.profile.save()

        relay_address.refresh_from_db()
        domain_address.refresh_from_db()
        assert relay_address.description == ""
        assert relay_address.generated_for == ""
        assert relay_address.used_on == ""
        assert domain_address.description == ""
        assert domain_address.used_on == ""

    def add_four_relay_addresses(self, user: User | None = None) -> list[RelayAddress]:
        """Create four labeled RelayAddress rows for ``user``.

        When ``user`` is None, the addresses belong to the profile's own user.
        """
        if user is None:
            user = self.profile.user
        return baker.make(
            RelayAddress,
            user=user,
            description=self.TEST_DESCRIPTION,
            generated_for=self.TEST_GENERATED_FOR,
            used_on=self.TEST_USED_ON,
            _quantity=4,
        )

    def test_save_server_storage_false_deletes_ALL_data(self) -> None:
        self.add_four_relay_addresses()
        self.profile.server_storage = False
        self.profile.save()

        for relay_address in RelayAddress.objects.filter(user=self.profile.user):
            assert relay_address.description == ""
            assert relay_address.generated_for == ""
            # used_on is set by add_four_relay_addresses too, so check it as well
            assert relay_address.used_on == ""

    def test_save_server_storage_false_only_deletes_that_profiles_data(self) -> None:
        other_user = make_free_test_user()
        assert other_user.profile.server_storage is True
        self.add_four_relay_addresses()
        self.add_four_relay_addresses(user=other_user)
        self.profile.server_storage = False
        self.profile.save()

        for relay_address in RelayAddress.objects.filter(user=self.profile.user):
            assert relay_address.description == ""
            assert relay_address.generated_for == ""
            assert relay_address.used_on == ""

        # The other user's labels are untouched
        for relay_address in RelayAddress.objects.filter(user=other_user):
            assert relay_address.description == self.TEST_DESCRIPTION
            assert relay_address.generated_for == self.TEST_GENERATED_FOR
            assert relay_address.used_on == self.TEST_USED_ON
1✔
914

915

916
class ProfileDisplayNameTest(ProfileTestCase):
    """Tests for Profile.display_name"""

    def test_exists(self) -> None:
        """display_name returns the social account's "displayName" value."""
        expected = "Display Name"
        fxa = self.get_or_create_social_account()
        fxa.extra_data["displayName"] = expected
        fxa.save()
        assert self.profile.display_name == expected

    def test_display_name_does_not_exist(self) -> None:
        """display_name is None when "displayName" was never set."""
        self.get_or_create_social_account()
        assert self.profile.display_name is None
1✔
929

930

931
class ProfileLanguageTest(ProfileTestCase):
    """Test Profile.language"""

    def test_no_fxa_extra_data_locale_returns_default_en(self) -> None:
        """A social account without a "locale" entry yields "en"."""
        fxa = self.get_or_create_social_account()
        assert "locale" not in fxa.extra_data
        assert self.profile.language == "en"

    def test_no_fxa_locale_returns_default_en(self) -> None:
        """Without creating a social account in the test, language is "en"."""
        assert self.profile.language == "en"

    def test_fxa_locale_de_returns_de(self) -> None:
        """The first language of the stored locale string is returned."""
        fxa = self.get_or_create_social_account()
        fxa.extra_data["locale"] = "de,en-US;q=0.9,en;q=0.8"
        fxa.save()
        assert self.profile.language == "de"
1✔
947

948

949
class ProfileFxaLocaleInPremiumCountryTest(ProfileTestCase):
    """Tests for Profile.fxa_locale_in_premium_country"""

    def set_fxa_locale(self, locale: str) -> None:
        """Store ``locale`` under "locale" in the social account's extra_data."""
        fxa = self.get_or_create_social_account()
        fxa.extra_data["locale"] = locale
        fxa.save()

    def _in_premium_country(self, locale: str) -> bool:
        """Set the FxA locale, then read Profile.fxa_locale_in_premium_country."""
        self.set_fxa_locale(locale)
        return self.profile.fxa_locale_in_premium_country

    def test_when_premium_available_returns_True(self) -> None:
        assert self._in_premium_country("de-DE,en-xx;q=0.9,en;q=0.8") is True

    def test_en_implies_premium_available(self) -> None:
        assert self._in_premium_country("en;q=0.8") is True

    def test_when_premium_unavailable_returns_False(self) -> None:
        assert self._in_premium_country("en-IN, en;q=0.8") is False

    def test_when_premium_available_by_language_code_returns_True(self) -> None:
        assert self._in_premium_country("de;q=0.8") is True

    def test_invalid_language_code_returns_False(self) -> None:
        assert self._in_premium_country("xx;q=0.8") is False

    def test_when_premium_unavailable_by_language_code_returns_False(self) -> None:
        assert self._in_premium_country("zh;q=0.8") is False

    def test_no_fxa_account_returns_False(self) -> None:
        # No locale is set here, so the property is read directly
        assert self.profile.fxa_locale_in_premium_country is False

    def test_in_estonia(self):
        """Estonia (EE) was added in August 2023."""
        assert self._in_premium_country("et-ee,et;q=0.8") is True
1✔
988

989

990
class ProfileJoinedBeforePremiumReleaseTest(ProfileTestCase):
    """Tests for Profile.joined_before_premium_release"""

    def test_returns_True(self) -> None:
        """A join date of 2021-10-18 gives a truthy result."""
        self.profile.user.date_joined = datetime.fromisoformat(
            "2021-10-18 17:00:00+00:00"
        )
        assert self.profile.joined_before_premium_release

    def test_returns_False(self) -> None:
        """A join date of 2021-10-28 gives False."""
        self.profile.user.date_joined = datetime.fromisoformat(
            "2021-10-28 17:00:00+00:00"
        )
        assert self.profile.joined_before_premium_release is False
1✔
1002

1003

1004
class ProfileDefaultsTest(ProfileTestCase):
    """Tests for default Profile values"""

    def test_user_created_after_premium_release_server_storage_True(self) -> None:
        """The fixture profile has a truthy server_storage."""
        profile = self.profile
        assert profile.server_storage

    def test_emails_replied_new_user_aggregates_sum_of_replies_to_zero(self) -> None:
        """With no masks created, emails_replied is 0."""
        profile = self.profile
        assert profile.emails_replied == 0
1✔
1012

1013

1014
class ProfileEmailsRepliedTest(ProfileTestCase):
    """Tests for Profile.emails_replied"""

    def test_premium_user_aggregates_replies_from_all_addresses(self) -> None:
        self.upgrade_to_premium()
        self.profile.subdomain = "test"
        self.profile.num_email_replied_in_deleted_address = 1
        self.profile.save()
        owner = self.profile.user
        baker.make(RelayAddress, user=owner, num_replied=3)
        baker.make(DomainAddress, user=owner, address="lower-case", num_replied=5)

        # 1 (deleted-address counter) + 3 (RelayAddress) + 5 (DomainAddress)
        assert self.profile.emails_replied == 9

    def test_free_user_aggregates_replies_from_relay_addresses(self) -> None:
        owner = self.profile.user
        for reply_count in (3, 5):
            baker.make(RelayAddress, user=owner, num_replied=reply_count)

        assert self.profile.emails_replied == 8
1✔
1034

1035

1036
class ProfileUpdateAbuseMetricTest(ProfileTestCase):
    """Tests for Profile.update_abuse_metric()"""

    def setUp(self) -> None:
        """Create an AbuseMetrics row and patch the abuse logger and datetime."""
        super().setUp()
        self.get_or_create_social_account()
        self.abuse_metric = baker.make(AbuseMetrics, user=self.profile.user)

        patcher_logger = patch("emails.models.abuse_logger.info")
        self.mocked_abuse_info = patcher_logger.start()
        self.addCleanup(patcher_logger.stop)

        # Selectively patch datetime.now() for emails models
        # https://docs.python.org/3/library/unittest.mock-examples.html#partial-mocking
        patcher = patch("emails.models.datetime")
        mocked_datetime = patcher.start()
        self.addCleanup(patcher.stop)

        self.expected_now = datetime.now(UTC)
        # Build the mocked combine() result from the same instant as the mocked
        # now(). A second datetime.now() call could land on the next day if the
        # clock crosses midnight between the two calls.
        mocked_datetime.combine.return_value = datetime.combine(
            self.expected_now.date(), datetime.min.time()
        )
        mocked_datetime.now.return_value = self.expected_now
        # Calling the mocked class still constructs real datetime objects
        mocked_datetime.side_effect = lambda *args, **kw: datetime(*args, **kw)

    def _assert_abuse_flagged_once(
        self, *, forwarded: int, forwarded_size_in_bytes: int
    ) -> None:
        """Check the abuse logger was called exactly once with the given totals."""
        assert self.profile.fxa
        self.mocked_abuse_info.assert_called_once_with(
            "Abuse flagged",
            extra={
                "uid": self.profile.fxa.uid,
                "flagged": self.expected_now.timestamp(),
                "replies": 0,
                "addresses": 0,
                "forwarded": forwarded,
                "forwarded_size_in_bytes": forwarded_size_in_bytes,
            },
        )

    @override_settings(MAX_FORWARDED_PER_DAY=5)
    def test_flags_profile_when_emails_forwarded_abuse_threshold_met(self) -> None:
        self.abuse_metric.num_email_forwarded_per_day = 4
        self.abuse_metric.save()
        assert self.profile.last_account_flagged is None

        self.profile.update_abuse_metric(email_forwarded=True)
        self.abuse_metric.refresh_from_db()

        self._assert_abuse_flagged_once(forwarded=5, forwarded_size_in_bytes=0)
        assert self.abuse_metric.num_email_forwarded_per_day == 5
        assert self.profile.last_account_flagged == self.expected_now

    @override_settings(MAX_FORWARDED_EMAIL_SIZE_PER_DAY=100)
    def test_flags_profile_when_forwarded_email_size_abuse_threshold_met(self) -> None:
        self.abuse_metric.forwarded_email_size_per_day = 50
        self.abuse_metric.save()
        assert self.profile.last_account_flagged is None

        self.profile.update_abuse_metric(forwarded_email_size=50)
        self.abuse_metric.refresh_from_db()

        self._assert_abuse_flagged_once(forwarded=0, forwarded_size_in_bytes=100)
        assert self.abuse_metric.forwarded_email_size_per_day == 100
        assert self.profile.last_account_flagged == self.expected_now
1✔
1108

1109

1110
class ProfileMetricsEnabledTest(ProfileTestCase):
    """Tests for Profile.metrics_enabled"""

    def _set_fxa_metrics_enabled(self, value: bool) -> None:
        """Store ``value`` under "metricsEnabled" in the social account's extra_data."""
        fxa = self.get_or_create_social_account()
        fxa.extra_data["metricsEnabled"] = value
        fxa.save()

    def test_no_fxa_means_metrics_enabled(self) -> None:
        assert not self.profile.fxa
        assert self.profile.metrics_enabled

    def test_fxa_legacy_means_metrics_enabled(self) -> None:
        self.get_or_create_social_account()
        fxa = self.profile.fxa
        assert fxa
        assert "metricsEnabled" not in self.profile.fxa.extra_data
        assert self.profile.metrics_enabled

    def test_fxa_opt_in_means_metrics_enabled(self) -> None:
        self._set_fxa_metrics_enabled(True)
        assert self.profile.fxa
        assert self.profile.metrics_enabled

    def test_fxa_opt_out_means_metrics_disabled(self) -> None:
        self._set_fxa_metrics_enabled(False)
        assert self.profile.fxa
        assert not self.profile.metrics_enabled
1✔
1134

1135

1136
class ProfilePlanTest(ProfileTestCase):
    """Tests for Profile.plan"""

    def test_free_user(self) -> None:
        """Without any upgrade, the plan is "free"."""
        plan = self.profile.plan
        assert plan == "free"

    def test_premium_user(self) -> None:
        """After upgrade_to_premium(), the plan is "email"."""
        self.upgrade_to_premium()
        plan = self.profile.plan
        assert plan == "email"

    def test_phone_user(self) -> None:
        """After upgrade_to_phone(), the plan is "phone"."""
        self.upgrade_to_phone()
        plan = self.profile.plan
        assert plan == "phone"

    def test_vpn_bundle_user(self) -> None:
        """After upgrade_to_vpn_bundle(), the plan is "bundle"."""
        self.upgrade_to_vpn_bundle()
        plan = self.profile.plan
        assert plan == "bundle"
1✔
1151

1152

1153
class ProfilePlanTermTest(ProfileTestCase):
    """Tests for Profile.plan_term"""

    def _upgrade_to_phone_with_dates(self, start: datetime, end: datetime) -> None:
        """Upgrade to phone and set the subscription start/end on the profile."""
        self.upgrade_to_phone()
        self.profile.date_phone_subscription_start = start
        self.profile.date_phone_subscription_end = end

    def test_free_user(self) -> None:
        assert self.profile.plan_term is None

    def test_premium_user(self) -> None:
        self.upgrade_to_premium()
        assert self.profile.plan_term == "unknown"

    def test_phone_user(self) -> None:
        self.upgrade_to_phone()
        assert self.profile.plan_term == "unknown"

    def test_phone_user_1_month(self) -> None:
        self._upgrade_to_phone_with_dates(
            start=datetime(2024, 1, 1, tzinfo=UTC),
            end=datetime(2024, 2, 1, tzinfo=UTC),
        )
        assert self.profile.plan_term == "1_month"

    def test_phone_user_1_year(self) -> None:
        self._upgrade_to_phone_with_dates(
            start=datetime(2024, 1, 1, tzinfo=UTC),
            end=datetime(2025, 1, 1, tzinfo=UTC),
        )
        assert self.profile.plan_term == "1_year"

    def test_vpn_bundle_user(self) -> None:
        self.upgrade_to_vpn_bundle()
        assert self.profile.plan_term == "unknown"
1✔
1182

1183

1184
class ProfileMetricsPremiumStatus(ProfileTestCase):
    """Tests for Profile.metrics_premium_status"""

    def _upgrade_to_phone_with_dates(self, start: datetime, end: datetime) -> None:
        """Upgrade to phone and set the subscription start/end on the profile."""
        self.upgrade_to_phone()
        self.profile.date_phone_subscription_start = start
        self.profile.date_phone_subscription_end = end

    def test_free_user(self):
        assert self.profile.metrics_premium_status == "free"

    def test_premium_user(self) -> None:
        self.upgrade_to_premium()
        assert self.profile.metrics_premium_status == "email_unknown"

    def test_phone_user(self) -> None:
        self.upgrade_to_phone()
        assert self.profile.metrics_premium_status == "phone_unknown"

    def test_phone_user_1_month(self) -> None:
        self._upgrade_to_phone_with_dates(
            start=datetime(2024, 1, 1, tzinfo=UTC),
            end=datetime(2024, 2, 1, tzinfo=UTC),
        )
        assert self.profile.metrics_premium_status == "phone_1_month"

    def test_phone_user_1_year(self) -> None:
        self._upgrade_to_phone_with_dates(
            start=datetime(2024, 1, 1, tzinfo=UTC),
            end=datetime(2025, 1, 1, tzinfo=UTC),
        )
        assert self.profile.metrics_premium_status == "phone_1_year"

    def test_vpn_bundle_user(self) -> None:
        self.upgrade_to_vpn_bundle()
        assert self.profile.metrics_premium_status == "bundle_unknown"
1✔
1213

1214

1215
class DomainAddressTest(TestCase):
1✔
1216
    def setUp(self):
        """Build the fixture users and give the premium user's profile a subdomain."""
        self.subdomain = "test"
        self.user = make_premium_test_user()
        self.storageless_user = make_storageless_test_user()
        premium_profile = self.user.profile
        premium_profile.subdomain = self.subdomain
        premium_profile.save()
1✔
1222

1223
    def test_make_domain_address_assigns_to_user(self):
        """The created DomainAddress belongs to the user passed in."""
        created = DomainAddress.make_domain_address(self.user, "test-assigns")
        assert created.user == self.user
1✔
1226

1227
    @skip(reason="test not reliable, look at FIXME comment")
    def test_make_domain_address_makes_different_addresses(self):
        # FIXME: sometimes this test will fail because it randomly generates
        # alias with bad words. See make_domain_address for why this has
        # not been fixed yet
        expected_count = 5
        for index in range(expected_count):
            created = DomainAddress.make_domain_address(
                self.user, f"test-different-{index}"
            )
            assert created.first_emailed_at is None
        stored_addresses = DomainAddress.objects.filter(user=self.user).values_list(
            "address", flat=True
        )
        # A set drops duplicates, so its size is the number of unique addresses
        unique_addresses = set(stored_addresses)
        assert len(unique_addresses) == expected_count
×
1242

1243
    def test_make_domain_address_makes_requested_address(self):
        """The requested local part is used and first_emailed_at stays None."""
        requested = "foobar"
        created = DomainAddress.make_domain_address(self.user, requested)
        assert created.address == requested
        assert created.first_emailed_at is None
1✔
1247

1248
    @override_settings(MAX_ADDRESS_CREATION_PER_DAY=10)
    def test_make_domain_address_has_limit(self) -> None:
        """The 11th address in a day is refused with "account_is_paused"."""
        daily_limit = 10  # matches MAX_ADDRESS_CREATION_PER_DAY above
        for suffix in range(daily_limit):
            DomainAddress.make_domain_address(self.user, f"foobar{suffix}")
        with pytest.raises(CannotMakeAddressException) as exc_info:
            DomainAddress.make_domain_address(self.user, "one-too-many")
        assert exc_info.value.get_codes() == "account_is_paused"
        assert DomainAddress.objects.filter(user=self.user).count() == daily_limit
1✔
1257

1258
    def test_make_domain_address_makes_requested_address_via_email(self):
        """Passing True as the third argument sets first_emailed_at."""
        requested = "foobar"
        created = DomainAddress.make_domain_address(self.user, requested, True)
        assert created.address == requested
        assert created.first_emailed_at is not None
1✔
1262

1263
    def test_make_domain_address_non_premium_user(self) -> None:
        """A plain baker-made User is refused with "free_tier_no_subdomain_masks"."""
        plain_user = baker.make(User)
        with pytest.raises(CannotMakeAddressException) as exc_info:
            DomainAddress.make_domain_address(plain_user, "test-non-premium")
        error_code = exc_info.value.get_codes()
        assert error_code == "free_tier_no_subdomain_masks"
1✔
1268

1269
    def test_make_domain_address_can_make_blocklisted_address(self):
        """An explicitly requested address of "testing" is created as-is."""
        requested = "testing"
        created = DomainAddress.make_domain_address(self.user, requested)
        assert created.address == requested
1✔
1272

1273
    def test_make_domain_address_valid_premium_user_with_no_subdomain(self) -> None:
        """A subscribed user with no subdomain set is refused with "need_subdomain"."""
        subscriber = baker.make(User)
        fxa_extra_data = {"subscriptions": [premium_subscription()]}
        baker.make(
            SocialAccount,
            user=subscriber,
            provider="fxa",
            extra_data=fxa_extra_data,
        )
        with pytest.raises(CannotMakeAddressException) as exc_info:
            DomainAddress.make_domain_address(subscriber, "test-nosubdomain")
        assert exc_info.value.get_codes() == "need_subdomain"
1✔
1284

1285
    def test_make_domain_address_dupe_of_existing_raises(self):
        """Creating the same address twice raises DomainAddrDuplicateException."""
        duplicated = "same-address"
        DomainAddress.make_domain_address(self.user, address=duplicated)
        with pytest.raises(DomainAddrDuplicateException) as exc_info:
            DomainAddress.make_domain_address(self.user, address=duplicated)
        assert exc_info.value.get_codes() == "duplicate_address"
1✔
1291

1292
    @override_flag("custom_domain_management_redesign", active=False)
    def test_make_domain_address_can_make_dupe_of_deleted(self):
        """With the redesign flag off, a deleted address can be created again."""
        address = "same-address"
        original = DomainAddress.make_domain_address(self.user, address=address)
        original_hash = address_hash(
            original.address,
            original.user_profile.subdomain,
            original.domain_value,
        )
        original.delete()
        recreated = DomainAddress.make_domain_address(self.user, address=address)
        deleted_matches = DeletedAddress.objects.filter(address_hash=original_hash)
        assert deleted_matches.count() == 1
        assert recreated.full_address == original.full_address
1✔
1309

1310
    @override_flag("custom_domain_management_redesign", active=True)
    def test_make_domain_address_cannot_make_dupe_of_deleted(self):
        """With the redesign flag on, re-creating a deleted address is refused."""
        address = "same-address"
        original = DomainAddress.make_domain_address(self.user, address=address)
        original_hash = address_hash(
            original.address,
            original.user_profile.subdomain,
            original.domain_value,
        )
        original.delete()
        with pytest.raises(DomainAddrUnavailableException) as exc_info:
            DomainAddress.make_domain_address(self.user, address=address)
        assert exc_info.value.get_codes() == "address_unavailable"
        deleted_matches = DeletedAddress.objects.filter(address_hash=original_hash)
        assert deleted_matches.count() == 1
1326

1327
    @patch("emails.models.address_default")
    def test_make_domain_address_doesnt_randomly_generate_bad_word(
        self, address_default_mocked: Mock
    ) -> None:
        """With address_default patched to "angry0123", creation is refused."""
        address_default_mocked.return_value = "angry0123"
        with pytest.raises(CannotMakeAddressException) as exc_info:
            DomainAddress.make_domain_address(self.user)
        error_code = exc_info.value.get_codes()
        assert error_code == "address_unavailable"
1✔
1335

1336
    def test_delete_adds_deleted_address_object(self):
        """Deleting a DomainAddress leaves one DeletedAddress with its hash."""
        doomed = baker.make(DomainAddress, address="lower-case", user=self.user)
        # Expected hash: SHA-256 hex digest of the UTF-8 encoded full address
        expected_hash = sha256(doomed.full_address.encode("utf-8")).hexdigest()
        doomed.delete()
        matches = DeletedAddress.objects.filter(address_hash=expected_hash)
        assert matches.count() == 1
        assert matches.get().address_hash == expected_hash
1✔
1347

1348
    def test_premium_user_can_set_block_list_emails(self):
        """block_list_emails starts False and stays True after save and reload."""
        mask = DomainAddress.objects.create(user=self.user, address="lower-case")
        assert mask.block_list_emails is False
        mask.block_list_emails = True
        mask.save()
        # Reload to check the stored value, not just the in-memory attribute
        mask.refresh_from_db()
        assert mask.block_list_emails is True
1✔
1357

1358
    def test_delete_increments_values_on_profile(self):
        """Deleting a DomainAddress adds its counters to the profile's totals."""
        profile = self.user.profile
        # Profile counter name -> expected value after deleting the address
        expected_after_delete = {
            "num_address_deleted": 1,
            "num_email_forwarded_in_deleted_address": 2,
            "num_email_blocked_in_deleted_address": 3,
            "num_level_one_trackers_blocked_in_deleted_address": 4,
            "num_email_replied_in_deleted_address": 5,
            "num_email_spam_in_deleted_address": 6,
            "num_deleted_relay_addresses": 0,
            "num_deleted_domain_addresses": 1,
        }
        for counter in expected_after_delete:
            assert getattr(profile, counter) == 0

        doomed = DomainAddress.objects.create(
            user=self.user,
            address="lower-case",
            num_forwarded=2,
            num_blocked=3,
            num_level_one_trackers_blocked=4,
            num_replied=5,
            num_spam=6,
        )
        doomed.delete()

        profile.refresh_from_db()
        for counter, expected in expected_after_delete.items():
            assert getattr(profile, counter) == expected
1✔
1389

1390
    def test_formerly_premium_user_clears_block_list_emails(self):
        """Saving a mask after the subscription is removed clears block_list_emails."""
        mask = DomainAddress.objects.create(
            user=self.user, address="coupons", block_list_emails=True
        )
        mask.refresh_from_db()
        assert mask.block_list_emails is True

        # Remove premium from user
        fxa_account = self.user.profile.fxa
        assert fxa_account is not None
        fxa_account.extra_data["subscriptions"] = []
        fxa_account.save()
        assert not self.user.profile.has_premium

        mask.save()
        assert mask.block_list_emails is False
1✔
1405

1406
    def test_storageless_user_cant_set_labels(self):
        """A description saved for a storageless user is stored as empty."""
        creation_kwargs = {"user": self.storageless_user, "address": "lower-case"}
        address = DomainAddress.objects.create(**creation_kwargs)
        assert address.description == ""

        address.description = "Arbitrary description"
        address.save()

        # Reload so the check reflects the persisted row, not the local value.
        address.refresh_from_db()
        assert address.description == ""
1415

1416
    def test_clear_storage_with_update_fields(self) -> None:
        """A save limited by update_fields still clears storageless users' data."""
        address = DomainAddress.objects.create(
            user=self.storageless_user, address="no-storage"
        )
        assert address.used_on is None
        assert address.description == ""

        # Write through the QuerySet so the model's save() is bypassed.
        stored_values = {
            "description": "the description",
            "used_on": "https://example.com",
        }
        DomainAddress.objects.filter(id=address.id).update(**stored_values)
        address.refresh_from_db()
        for field_name, stored in stored_values.items():
            assert getattr(address, field_name) == stored

        # Change an unrelated field and restrict the save to just that field.
        touched_at = datetime(2024, 1, 11, tzinfo=UTC)
        address.last_used_at = touched_at
        address.save(update_fields={"last_used_at"})

        # .save() extends update_fields, so the storage fields get blanked too.
        address.refresh_from_db()
        assert address.last_used_at == touched_at
        assert address.description == ""
        assert address.used_on == ""
1443

1444
    def test_clear_block_list_emails_with_update_fields(self) -> None:
        """
        A save limited by update_fields still turns off block_list_emails once
        the owner is a free user.
        """
        address = DomainAddress.objects.create(
            user=self.user, address="block-list-emails", block_list_emails=True
        )

        # Empty the subscription list so the owner no longer has premium.
        fxa = self.user.profile.fxa
        assert fxa is not None
        fxa.extra_data["subscriptions"] = []
        fxa.save()
        assert not self.user.profile.has_premium
        # The in-memory address still carries the flag at this point.
        assert address.block_list_emails

        # Change an unrelated field and restrict the save to just that field.
        touched_at = datetime(2024, 1, 12, tzinfo=UTC)
        assert address.last_used_at != touched_at
        address.last_used_at = touched_at
        address.save(update_fields={"last_used_at"})

        # .save() extends update_fields, so the flag is reset in the database.
        address.refresh_from_db()
        assert address.last_used_at == touched_at
        assert not address.block_list_emails
1469

1470
    def test_create_updates_profile_last_engagement(self) -> None:
        """Making a second domain address advances profile.last_engagement."""
        # The first address gives the profile a last_engagement to compare with.
        DomainAddress.make_domain_address(self.user, address="create")
        engagement_before = self.user.profile.last_engagement
        assert engagement_before

        DomainAddress.make_domain_address(self.user, address="create2")

        self.user.profile.refresh_from_db()
        engagement_after = self.user.profile.last_engagement
        assert engagement_after > engagement_before
1479

1480
    def test_save_does_not_update_profile_last_engagement(self) -> None:
        """Saving an existing domain address leaves last_engagement unchanged."""
        address = DomainAddress.make_domain_address(self.user, address="save")
        engagement_before = self.user.profile.last_engagement
        assert engagement_before

        # Any ordinary edit will do; disabling the address is the one used here.
        address.enabled = False
        address.save()

        self.user.profile.refresh_from_db()
        engagement_after = self.user.profile.last_engagement
        assert engagement_after == engagement_before
1490

1491
    def test_delete_updates_profile_last_engagement(self) -> None:
        """Deleting a domain address advances profile.last_engagement."""
        address = DomainAddress.make_domain_address(self.user, address="delete")
        engagement_before = self.user.profile.last_engagement
        assert engagement_before

        address.delete()

        # Reload the profile so the comparison uses the stored timestamp.
        self.user.profile.refresh_from_db()
        engagement_after = self.user.profile.last_engagement
        assert engagement_after > engagement_before
1500

1501
    def test_metrics_id(self):
        """metrics_id is the letter D followed by the address's database id."""
        domain_address = DomainAddress.objects.create(
            user=self.user, address="metrics"
        )
        expected_metrics_id = "D" + str(domain_address.id)
        assert domain_address.metrics_id == expected_metrics_id
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc