• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 9726c3df-005d-4656-8f72-db2728947c8a

24 Jun 2024 07:38PM CUT coverage: 85.323% (-0.02%) from 85.34%
9726c3df-005d-4656-8f72-db2728947c8a

push

circleci

web-flow
Merge pull request #4816 from mozilla/dependabot/npm_and_yarn/typescript-5.5.2

Bump typescript from 5.4.5 to 5.5.2

3990 of 5128 branches covered (77.81%)

Branch coverage included in aggregate %.

15758 of 18017 relevant lines covered (87.46%)

10.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.96
/emails/tests/models_tests.py
1
import random
1✔
2
from datetime import UTC, datetime, timedelta
1✔
3
from hashlib import sha256
1✔
4
from unittest import skip
1✔
5
from unittest.mock import Mock, patch
1✔
6
from uuid import uuid4
1✔
7

8
from django.conf import settings
1✔
9
from django.contrib.auth.models import User
1✔
10
from django.test import TestCase, override_settings
1✔
11

12
import pytest
1✔
13
from allauth.socialaccount.models import SocialAccount
1✔
14
from model_bakery import baker
1✔
15
from waffle.testutils import override_flag
1✔
16

17
from privaterelay.tests.utils import (
1✔
18
    make_free_test_user,
19
    make_premium_test_user,
20
    make_storageless_test_user,
21
    phone_subscription,
22
    premium_subscription,
23
    vpn_subscription,
24
)
25

26
from ..exceptions import (
1✔
27
    CannotMakeAddressException,
28
    CannotMakeSubdomainException,
29
    DomainAddrDuplicateException,
30
    DomainAddrUnavailableException,
31
)
32
from ..models import (
1✔
33
    AbuseMetrics,
34
    DeletedAddress,
35
    DomainAddress,
36
    Profile,
37
    RelayAddress,
38
    address_hash,
39
    get_domain_numerical,
40
)
41
from ..utils import get_domains_from_settings
1✔
42

43
if settings.PHONES_ENABLED:
1!
44
    from phones.models import RealPhone, RelayNumber
1✔
45

46

47
class AddressHashTest(TestCase):
    """Tests for emails.models.address_hash()."""

    @override_settings(RELAY_FIREFOX_DOMAIN="firefox.com")
    def test_address_hash_without_subdomain_domain_firefox(self):
        """On RELAY_FIREFOX_DOMAIN, the hash covers only the address itself."""
        address = "aaaaaaaaa"
        expected_hash = sha256(address.encode()).hexdigest()
        assert address_hash(address, domain="firefox.com") == expected_hash

    @override_settings(RELAY_FIREFOX_DOMAIN="firefox.com")
    def test_address_hash_without_subdomain_domain_not_firefox(self):
        """On a domain other than RELAY_FIREFOX_DOMAIN, hash "address@domain"."""
        non_default = "test.com"
        address = "aaaaaaaaa"
        expected_hash = sha256(f"{address}@{non_default}".encode()).hexdigest()
        assert address_hash(address, domain=non_default) == expected_hash

    def test_address_hash_with_subdomain(self):
        """With a subdomain, the hash covers "address@subdomain.domain"."""
        address = "aaaaaaaaa"
        subdomain = "test"
        domain = get_domains_from_settings().get("MOZMAIL_DOMAIN")
        expected_hash = sha256(f"{address}@{subdomain}.{domain}".encode()).hexdigest()
        assert address_hash(address, subdomain, domain) == expected_hash

    def test_address_hash_with_additional_domain(self):
        """With only a domain given, the hash covers "address@domain"."""
        address = "aaaaaaaaa"
        test_domain = "test.com"
        expected_hash = sha256(f"{address}@{test_domain}".encode()).hexdigest()
        assert address_hash(address, domain=test_domain) == expected_hash
1✔
73

74

75
class GetDomainNumericalTest(TestCase):
    """Tests for emails.models.get_domain_numerical()."""

    def test_get_domain_numerical(self):
        """The two test domains map to the numeric values 1 and 2."""
        expected_numerical = {"default.com": 1, "test.com": 2}
        for domain, numerical in expected_numerical.items():
            assert get_domain_numerical(domain) == numerical
1✔
79

80

81
class RelayAddressTest(TestCase):
    """Tests for the RelayAddress model."""

    def setUp(self):
        """Create a free user, a premium user, and a storageless user."""
        self.user = make_free_test_user()
        self.user_profile = self.user.profile
        self.premium_user = make_premium_test_user()
        self.premium_user_profile = self.premium_user.profile
        self.storageless_user = make_storageless_test_user()

    def test_create_assigns_to_user(self):
        """A created RelayAddress belongs to the user passed to create()."""
        relay_address = RelayAddress.objects.create(user=self.user_profile.user)
        assert relay_address.user == self.user_profile.user

    @override_settings(MAX_NUM_FREE_ALIASES=5, MAX_ADDRESS_CREATION_PER_DAY=10)
    def test_create_has_limit(self) -> None:
        """Creating more than MAX_ADDRESS_CREATION_PER_DAY masks is rejected."""
        baker.make(
            RelayAddress,
            user=self.premium_user,
            _quantity=settings.MAX_ADDRESS_CREATION_PER_DAY,
        )
        with pytest.raises(CannotMakeAddressException) as exc_info:
            RelayAddress.objects.create(user=self.premium_user)
        assert exc_info.value.get_codes() == "account_is_paused"
        relay_address_count = RelayAddress.objects.filter(
            user=self.premium_user_profile.user
        ).count()
        # The rejected create() must not have added a row.
        assert relay_address_count == settings.MAX_ADDRESS_CREATION_PER_DAY

    def test_create_premium_user_can_exceed_free_limit(self):
        """A premium user can hold more than MAX_NUM_FREE_ALIASES masks."""
        baker.make(
            RelayAddress,
            user=self.premium_user,
            _quantity=settings.MAX_NUM_FREE_ALIASES + 1,
        )
        relay_addresses = RelayAddress.objects.filter(
            user=self.premium_user
        ).values_list("address", flat=True)
        assert len(relay_addresses) == settings.MAX_NUM_FREE_ALIASES + 1

    def test_create_non_premium_user_cannot_pass_free_limit(self) -> None:
        """A free user at MAX_NUM_FREE_ALIASES cannot create another mask."""
        baker.make(
            RelayAddress, user=self.user, _quantity=settings.MAX_NUM_FREE_ALIASES
        )
        with pytest.raises(CannotMakeAddressException) as exc_info:
            RelayAddress.objects.create(user=self.user_profile.user)
        assert exc_info.value.get_codes() == "free_tier_limit"
        relay_addresses = RelayAddress.objects.filter(
            user=self.user_profile.user
        ).values_list("address", flat=True)
        assert len(relay_addresses) == settings.MAX_NUM_FREE_ALIASES

    @skip(reason="ignore test for code path that we don't actually use")
    def test_create_with_specified_domain(self):
        """(Skipped) A mask created with domain=2 reports the MOZMAIL_DOMAIN."""
        relay_address = RelayAddress.objects.create(
            user=self.user_profile.user, domain=2
        )
        assert relay_address.domain == 2
        assert relay_address.get_domain_display() == "MOZMAIL_DOMAIN"
        assert relay_address.domain_value == "test.com"

    def test_create_updates_profile_last_engagement(self) -> None:
        """Creating a second mask moves the profile's last_engagement forward."""
        relay_address = baker.make(RelayAddress, user=self.user, enabled=True)
        profile = relay_address.user.profile
        profile.refresh_from_db()
        assert profile.last_engagement
        pre_create_last_engagement = profile.last_engagement

        baker.make(RelayAddress, user=self.user, enabled=True)

        profile.refresh_from_db()
        assert profile.last_engagement > pre_create_last_engagement

    def test_save_does_not_update_profile_last_engagement(self) -> None:
        """Saving an existing mask leaves the profile's last_engagement alone."""
        relay_address = baker.make(RelayAddress, user=self.user, enabled=True)
        profile = relay_address.user.profile
        profile.refresh_from_db()
        assert profile.last_engagement
        pre_save_last_engagement = profile.last_engagement

        relay_address.enabled = False
        relay_address.save()

        profile.refresh_from_db()
        assert profile.last_engagement == pre_save_last_engagement

    def test_delete_updates_profile_last_engagement(self) -> None:
        """Deleting a mask moves the profile's last_engagement forward."""
        relay_address = baker.make(RelayAddress, user=self.user)
        profile = relay_address.user.profile
        profile.refresh_from_db()
        assert profile.last_engagement
        pre_delete_last_engagement = profile.last_engagement

        relay_address.delete()

        profile.refresh_from_db()
        assert profile.last_engagement > pre_delete_last_engagement

    def test_delete_adds_deleted_address_object(self):
        """Deleting a mask records one DeletedAddress with the address hash."""
        relay_address = baker.make(RelayAddress, user=self.user)
        # Named expected_hash so the imported address_hash() is not shadowed.
        expected_hash = sha256(relay_address.full_address.encode("utf-8")).hexdigest()
        relay_address.delete()
        deleted_count = DeletedAddress.objects.filter(
            address_hash=expected_hash
        ).count()
        assert deleted_count == 1

    def test_delete_mozmail_deleted_address_object(self):
        """Deleting a domain=2 mask records the hash of "address@domain_value"."""
        relay_address = baker.make(RelayAddress, domain=2, user=self.user)
        # Named expected_hash so the imported address_hash() is not shadowed.
        expected_hash = sha256(
            f"{relay_address.address}@{relay_address.domain_value}".encode()
        ).hexdigest()
        relay_address.delete()
        deleted_count = DeletedAddress.objects.filter(
            address_hash=expected_hash
        ).count()
        assert deleted_count == 1

    def test_delete_increments_values_on_profile(self):
        """Deleting a mask adds its counters to the profile's deleted totals."""
        assert self.premium_user_profile.num_address_deleted == 0
        assert self.premium_user_profile.num_email_forwarded_in_deleted_address == 0
        assert self.premium_user_profile.num_email_blocked_in_deleted_address == 0
        assert (
            self.premium_user_profile.num_level_one_trackers_blocked_in_deleted_address
            == 0
        )
        assert self.premium_user_profile.num_email_replied_in_deleted_address == 0
        assert self.premium_user_profile.num_email_spam_in_deleted_address == 0
        assert self.premium_user_profile.num_deleted_relay_addresses == 0
        assert self.premium_user_profile.num_deleted_domain_addresses == 0

        # Distinct counter values show each one lands in its own profile field.
        relay_address = baker.make(
            RelayAddress,
            user=self.premium_user,
            num_forwarded=2,
            num_blocked=3,
            num_level_one_trackers_blocked=4,
            num_replied=5,
            num_spam=6,
        )
        relay_address.delete()

        self.premium_user_profile.refresh_from_db()
        assert self.premium_user_profile.num_address_deleted == 1
        assert self.premium_user_profile.num_email_forwarded_in_deleted_address == 2
        assert self.premium_user_profile.num_email_blocked_in_deleted_address == 3
        assert (
            self.premium_user_profile.num_level_one_trackers_blocked_in_deleted_address
            == 4
        )
        assert self.premium_user_profile.num_email_replied_in_deleted_address == 5
        assert self.premium_user_profile.num_email_spam_in_deleted_address == 6
        assert self.premium_user_profile.num_deleted_relay_addresses == 1
        assert self.premium_user_profile.num_deleted_domain_addresses == 0

    def test_relay_address_create_repeats_deleted_address_invalid(self):
        """Re-requesting a deleted address yields a different address."""
        user = baker.make(User)
        address = "random-address"
        relay_address = RelayAddress.objects.create(user=user, address=address)
        relay_address.delete()
        repeat_deleted_relay_address = RelayAddress.objects.create(
            user=user, address=address
        )
        assert repeat_deleted_relay_address.address != address

    @patch("emails.validators.badwords", return_value=[])
    @patch("emails.validators.blocklist", return_value=["blocked-word"])
    def test_address_contains_blocklist_invalid(
        self, mock_blocklist: Mock, mock_badwords: Mock
    ) -> None:
        """Requesting a blocklisted address yields a different address."""
        blocked_word = "blocked-word"
        relay_address = RelayAddress.objects.create(
            user=baker.make(User), address=blocked_word
        )
        assert relay_address.address != blocked_word

    def test_free_user_cant_set_block_list_emails(self):
        """For a free user, block_list_emails is stored as False after save()."""
        relay_address = RelayAddress.objects.create(user=self.user)
        relay_address.block_list_emails = True
        relay_address.save()
        relay_address.refresh_from_db()
        assert relay_address.block_list_emails is False

    def test_premium_user_can_set_block_list_emails(self):
        """For a premium user, block_list_emails=True survives save()."""
        relay_address = RelayAddress.objects.create(user=self.premium_user)
        assert relay_address.block_list_emails is False
        relay_address.block_list_emails = True
        relay_address.save()
        relay_address.refresh_from_db()
        assert relay_address.block_list_emails is True

    def test_formerly_premium_user_clears_block_list_emails(self):
        """After premium is removed, the next save() clears block_list_emails."""
        relay_address = RelayAddress.objects.create(
            user=self.premium_user, block_list_emails=True
        )
        relay_address.refresh_from_db()
        assert relay_address.block_list_emails is True

        # Remove premium from user
        assert (fxa_account := self.premium_user.profile.fxa) is not None
        fxa_account.extra_data["subscriptions"] = []
        fxa_account.save()
        assert not self.premium_user.profile.has_premium

        relay_address.save()
        assert relay_address.block_list_emails is False

    def test_storageless_user_cant_set_label(self):
        """For a storageless user, label fields are stored empty after save()."""
        relay_address = RelayAddress.objects.create(user=self.storageless_user)
        assert relay_address.description == ""
        assert relay_address.generated_for == ""
        assert relay_address.used_on in (None, "")
        relay_address.description = "Arbitrary description"
        relay_address.generated_for = "https://example.com"
        relay_address.used_on = "https://example.com"
        relay_address.save()
        relay_address.refresh_from_db()
        assert relay_address.description == ""
        assert relay_address.generated_for == ""
        assert relay_address.used_on in (None, "")

    def test_clear_storage_with_update_fields(self) -> None:
        """
        With update_fields, the stored data is still cleared for storageless users.
        """
        relay_address = RelayAddress.objects.create(user=self.storageless_user)
        assert relay_address.description == ""

        # Use QuerySet.update to avoid model save method
        RelayAddress.objects.filter(id=relay_address.id).update(
            description="the description",
            generated_for="https://example.com",
            used_on="https://example.com",
        )
        relay_address.refresh_from_db()
        assert relay_address.description == "the description"
        assert relay_address.generated_for == "https://example.com"
        assert relay_address.used_on == "https://example.com"

        # Update a different field with update_fields to avoid full model save
        new_last_used_at = datetime(2024, 1, 11, tzinfo=UTC)
        relay_address.last_used_at = new_last_used_at
        relay_address.save(update_fields={"last_used_at"})

        # Since .save() added to update_fields, the storage fields are cleared
        relay_address.refresh_from_db()
        assert relay_address.last_used_at == new_last_used_at
        assert relay_address.description == ""
        assert relay_address.generated_for == ""
        assert relay_address.used_on in ("", None)

    def test_clear_block_list_emails_with_update_fields(self) -> None:
        """
        With update_fields, the block_list_emails flag is still cleared for free users.
        """
        relay_address = RelayAddress.objects.create(user=self.user)
        assert not relay_address.block_list_emails

        # Use QuerySet.update to avoid model save method
        RelayAddress.objects.filter(id=relay_address.id).update(block_list_emails=True)
        relay_address.refresh_from_db()
        assert relay_address.block_list_emails

        # Update a different field with update_fields to avoid full model save
        new_last_used_at = datetime(2024, 1, 12, tzinfo=UTC)
        relay_address.last_used_at = new_last_used_at
        relay_address.save(update_fields={"last_used_at"})

        # Since .save() added to update_fields, block_list_emails flag is cleared
        relay_address.refresh_from_db()
        assert relay_address.last_used_at == new_last_used_at
        assert not relay_address.block_list_emails

    def test_metrics_id(self):
        """metrics_id is "R" followed by the mask's database ID."""
        relay_address = RelayAddress.objects.create(user=self.user)
        assert relay_address.metrics_id == f"R{relay_address.id}"
1✔
351

352

353
class ProfileTestCase(TestCase):
    """Base class for Profile tests."""

    def setUp(self) -> None:
        """Create a fresh user and keep its profile as self.profile."""
        user = baker.make(User)
        self.profile = user.profile
        assert self.profile.server_storage is True

    def get_or_create_social_account(self) -> SocialAccount:
        """Get the test user's social account, creating if needed."""
        social_account, _ = SocialAccount.objects.get_or_create(
            user=self.profile.user,
            provider="fxa",
            defaults={
                "uid": str(uuid4()),
                "extra_data": {"avatar": "image.png", "subscriptions": []},
            },
        )
        return social_account

    def upgrade_to_premium(self) -> None:
        """Add an unlimited emails subscription to the user."""
        social_account = self.get_or_create_social_account()
        social_account.extra_data["subscriptions"].append(premium_subscription())
        social_account.save()

    def upgrade_to_phone(self) -> None:
        """
        Add a phone plan to the user.

        The premium subscription is added as well if the profile does not
        already have premium.
        """
        social_account = self.get_or_create_social_account()
        social_account.extra_data["subscriptions"].append(phone_subscription())
        if not self.profile.has_premium:
            social_account.extra_data["subscriptions"].append(premium_subscription())
        social_account.save()

    def upgrade_to_vpn_bundle(self) -> None:
        """
        Add a VPN bundle plan to the user.

        The premium and phone subscriptions are added as well if the profile
        does not already have them.
        """
        social_account = self.get_or_create_social_account()
        social_account.extra_data["subscriptions"].append(vpn_subscription())
        if not self.profile.has_premium:
            social_account.extra_data["subscriptions"].append(premium_subscription())
        if not self.profile.has_phone:
            social_account.extra_data["subscriptions"].append(phone_subscription())
        social_account.save()
1✔
396

397

398
class ProfileBounceTestCase(ProfileTestCase):
    """Shared bounce-setting helpers for Profile tests that check for bounces."""

    def set_hard_bounce(self) -> datetime:
        """
        Record a hard bounce on the profile and return its timestamp.

        The bounce is dated one day less than HARD_BOUNCE_ALLOWED_DAYS ago and
        saved to the profile. A hard bounce is what the user's email server
        reports when, for example, the email address does not exist.
        """
        bounce_age = timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS - 1)
        self.profile.last_hard_bounce = datetime.now(UTC) - bounce_age
        self.profile.save()
        return self.profile.last_hard_bounce

    def set_soft_bounce(self) -> datetime:
        """
        Record a soft bounce on the profile and return its timestamp.

        The bounce is dated one day less than SOFT_BOUNCE_ALLOWED_DAYS ago and
        saved to the profile. A soft bounce is what the user's email server
        reports when, for example, the user's mailbox is full.
        """
        bounce_age = timedelta(days=settings.SOFT_BOUNCE_ALLOWED_DAYS - 1)
        self.profile.last_soft_bounce = datetime.now(UTC) - bounce_age
        self.profile.save()
        return self.profile.last_soft_bounce
1✔
426

427

428
class ProfileCheckBouncePause(ProfileBounceTestCase):
    """Tests for Profile.check_bounce_pause()"""

    def test_no_bounces(self) -> None:
        """Without any bounce, the profile is not paused."""
        paused, kind = self.profile.check_bounce_pause()

        assert paused is False
        assert kind == ""

    def test_hard_bounce_pending(self) -> None:
        """A recent hard bounce pauses the profile with type "hard"."""
        self.set_hard_bounce()
        paused, kind = self.profile.check_bounce_pause()
        assert paused is True
        assert kind == "hard"

    def test_soft_bounce_pending(self) -> None:
        """A recent soft bounce pauses the profile with type "soft"."""
        self.set_soft_bounce()
        paused, kind = self.profile.check_bounce_pause()
        assert paused is True
        assert kind == "soft"

    def test_hard_and_soft_bounce_pending_shows_hard(self) -> None:
        """With both bounces recent, the reported type is "hard"."""
        self.set_hard_bounce()
        self.set_soft_bounce()
        paused, kind = self.profile.check_bounce_pause()
        assert paused is True
        assert kind == "hard"

    def test_hard_bounce_over_resets_timer(self) -> None:
        """An expired hard bounce does not pause and is cleared by the check."""
        expired_age = timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS + 1)
        self.profile.last_hard_bounce = datetime.now(UTC) - expired_age
        self.profile.save()
        assert self.profile.last_hard_bounce is not None

        paused, kind = self.profile.check_bounce_pause()

        assert paused is False
        assert kind == ""
        assert self.profile.last_hard_bounce is None

    def test_soft_bounce_over_resets_timer(self) -> None:
        """An expired soft bounce does not pause and is cleared by the check."""
        expired_age = timedelta(days=settings.SOFT_BOUNCE_ALLOWED_DAYS + 1)
        self.profile.last_soft_bounce = datetime.now(UTC) - expired_age
        self.profile.save()
        assert self.profile.last_soft_bounce is not None

        paused, kind = self.profile.check_bounce_pause()

        assert paused is False
        assert kind == ""
        assert self.profile.last_soft_bounce is None
1✔
481

482

483
class ProfileNextEmailTryDateTest(ProfileBounceTestCase):
    """Tests for Profile.next_email_try"""

    def test_no_bounces_returns_today(self) -> None:
        """Without any bounce, the next try date is today's UTC date."""
        next_try = self.profile.next_email_try
        assert next_try.date() == datetime.now(UTC).date()

    def test_hard_bounce_returns_proper_datemath(self) -> None:
        """After a hard bounce, it is the bounce plus HARD_BOUNCE_ALLOWED_DAYS."""
        bounced_at = self.set_hard_bounce()
        hard_wait = timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS)
        expected = bounced_at + hard_wait
        assert self.profile.next_email_try.date() == expected.date()

    def test_soft_bounce_returns_proper_datemath(self) -> None:
        """After a soft bounce, it is the bounce plus SOFT_BOUNCE_ALLOWED_DAYS."""
        bounced_at = self.set_soft_bounce()
        soft_wait = timedelta(days=settings.SOFT_BOUNCE_ALLOWED_DAYS)
        expected = bounced_at + soft_wait
        assert self.profile.next_email_try.date() == expected.date()

    def test_hard_and_soft_bounce_returns_hard_datemath(self) -> None:
        """With both bounces set, the hard bounce decides the next try date."""
        soft_bounced_at = self.set_soft_bounce()
        hard_bounced_at = self.set_hard_bounce()
        assert soft_bounced_at != hard_bounced_at
        hard_wait = timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS)
        expected = hard_bounced_at + hard_wait
        assert self.profile.next_email_try.date() == expected.date()
1✔
511

512

513
class ProfileLastBounceDateTest(ProfileBounceTestCase):
    """Tests for Profile.last_bounce_date"""

    def test_no_bounces_returns_None(self) -> None:
        """Without any bounce, last_bounce_date is None."""
        last_bounce = self.profile.last_bounce_date
        assert last_bounce is None

    def test_soft_bounce_returns_its_date(self) -> None:
        """With only a soft bounce, last_bounce_date is the soft bounce time."""
        self.set_soft_bounce()
        last_bounce = self.profile.last_bounce_date
        assert last_bounce == self.profile.last_soft_bounce

    def test_hard_bounce_returns_its_date(self) -> None:
        """With only a hard bounce, last_bounce_date is the hard bounce time."""
        self.set_hard_bounce()
        last_bounce = self.profile.last_bounce_date
        assert last_bounce == self.profile.last_hard_bounce

    def test_hard_and_soft_bounces_returns_hard_date(self) -> None:
        """With both bounces set, last_bounce_date is the hard bounce time."""
        self.set_soft_bounce()
        self.set_hard_bounce()
        last_bounce = self.profile.last_bounce_date
        assert last_bounce == self.profile.last_hard_bounce
1✔
531

532

533
class ProfileHasPremiumTest(ProfileTestCase):
    """Tests for Profile.has_premium"""

    def test_default_False(self) -> None:
        """A new profile without subscriptions does not have premium."""
        has_premium = self.profile.has_premium
        assert has_premium is False

    def test_premium_subscription_returns_True(self) -> None:
        """After upgrade_to_premium(), has_premium is True."""
        self.upgrade_to_premium()
        has_premium = self.profile.has_premium
        assert has_premium is True

    def test_phone_returns_True(self) -> None:
        """After upgrade_to_phone(), has_premium is True."""
        self.upgrade_to_phone()
        has_premium = self.profile.has_premium
        assert has_premium is True

    def test_vpn_bundle_returns_True(self) -> None:
        """After upgrade_to_vpn_bundle(), has_premium is True."""
        self.upgrade_to_vpn_bundle()
        has_premium = self.profile.has_premium
        assert has_premium is True
1✔
550

551

552
class ProfileHasPhoneTest(ProfileTestCase):
    """Tests for Profile.has_phone"""

    def test_default_False(self) -> None:
        """A new profile without subscriptions does not have phone."""
        has_phone = self.profile.has_phone
        assert has_phone is False

    def test_premium_subscription_returns_False(self) -> None:
        """After upgrade_to_premium() alone, has_phone is still False."""
        self.upgrade_to_premium()
        has_phone = self.profile.has_phone
        assert has_phone is False

    def test_phone_returns_True(self) -> None:
        """After upgrade_to_phone(), has_phone is True."""
        self.upgrade_to_phone()
        has_phone = self.profile.has_phone
        assert has_phone is True

    def test_vpn_bundle_returns_True(self) -> None:
        """After upgrade_to_vpn_bundle(), has_phone is True."""
        self.upgrade_to_vpn_bundle()
        has_phone = self.profile.has_phone
        assert has_phone is True
1✔
569

570

571
@pytest.mark.skipif(not settings.PHONES_ENABLED, reason="PHONES_ENABLED is False")
@override_settings(PHONES_NO_CLIENT_CALLS_IN_TEST=True)
class ProfileDatePhoneRegisteredTest(ProfileTestCase):
    """Tests for Profile.date_phone_registered"""

    def add_verified_real_phone(self, verified_date: datetime) -> "RealPhone":
        """Create a verified RealPhone for the test user with the given date."""
        return RealPhone.objects.create(
            user=self.profile.user,
            number="+12223334444",
            verified=True,
            verified_date=verified_date,
        )

    def test_default_None(self) -> None:
        """Without any phone records, date_phone_registered is None."""
        assert self.profile.date_phone_registered is None

    def test_real_phone_no_relay_number_returns_verified_date(self) -> None:
        """With only a RealPhone, the date is the phone's verified_date."""
        self.upgrade_to_phone()
        verified_at = datetime.now(UTC)
        self.add_verified_real_phone(verified_at)
        assert self.profile.date_phone_registered == verified_at

    def test_real_phone_and_relay_number_w_created_at_returns_created_at_date(
        self,
    ) -> None:
        """With a RelayNumber that has created_at, the date is that created_at."""
        self.upgrade_to_phone()
        verified_at = datetime.now(UTC)
        self.add_verified_real_phone(verified_at)
        relay_number = RelayNumber.objects.create(user=self.profile.user)
        assert self.profile.date_phone_registered == relay_number.created_at

    def test_real_phone_and_relay_number_wo_created_at_returns_verified_date(
        self,
    ) -> None:
        """With a RelayNumber lacking created_at, fall back to verified_date."""
        self.upgrade_to_phone()
        verified_at = datetime.now(UTC)
        real_phone = self.add_verified_real_phone(verified_at)
        relay_number = RelayNumber.objects.create(user=self.profile.user)
        # since created_at is auto set, update to None
        relay_number.created_at = None
        relay_number.save()
        assert self.profile.date_phone_registered == real_phone.verified_date
1✔
622

623

624
class ProfileTotalMasksTest(ProfileTestCase):
    """Tests for Profile.total_masks"""

    def test_total_masks(self) -> None:
        """total_masks is the number of relay addresses plus domain addresses."""
        self.upgrade_to_premium()
        self.profile.add_subdomain("totalmasks")
        assert self.profile.total_masks == 0

        # Fixed, distinct, non-zero counts instead of unseeded random ones:
        # every run creates both mask types, and a total that counted one type
        # twice could not equal the expected sum.
        num_relay_addresses = 1
        num_domain_addresses = 2
        for _ in range(num_relay_addresses):
            baker.make(RelayAddress, user=self.profile.user)
        for i in range(num_domain_addresses):
            baker.make(DomainAddress, user=self.profile.user, address=f"mask{i}")
        assert self.profile.total_masks == num_relay_addresses + num_domain_addresses
1✔
638

639

640
class ProfileAtMaskLimitTest(ProfileTestCase):
    """Tests for Profile.at_mask_limit"""

    def make_max_free_masks(self) -> None:
        """Give the test user MAX_NUM_FREE_ALIASES relay addresses."""
        baker.make(
            RelayAddress,
            user=self.profile.user,
            _quantity=settings.MAX_NUM_FREE_ALIASES,
        )

    def test_premium_user_returns_False(self) -> None:
        """A premium user is not at the limit, even with the free maximum."""
        self.upgrade_to_premium()
        assert self.profile.at_mask_limit is False
        self.make_max_free_masks()
        assert self.profile.at_mask_limit is False

    def test_free_user(self) -> None:
        """A free user reaches the limit at MAX_NUM_FREE_ALIASES masks."""
        assert self.profile.at_mask_limit is False
        self.make_max_free_masks()
        assert self.profile.at_mask_limit is True
1✔
661

662

663
class ProfileAddSubdomainTest(ProfileTestCase):
    """Tests for Profile.add_subdomain()"""

    def assert_add_subdomain_raises(self, subdomain, expected_msg: str) -> None:
        """Check that add_subdomain(subdomain) fails with the expected message."""
        with self.assertRaisesMessage(CannotMakeSubdomainException, expected_msg):
            self.profile.add_subdomain(subdomain)

    def test_new_unlimited_profile(self) -> None:
        """A premium user can register a subdomain, which is returned."""
        self.upgrade_to_premium()
        registered = self.profile.add_subdomain("newpremium")
        assert registered == "newpremium"

    def test_lowercases_subdomain_value(self) -> None:
        """A mixed-case subdomain is returned in lowercase."""
        self.upgrade_to_premium()
        registered = self.profile.add_subdomain("mIxEdcAsE")
        assert registered == "mixedcase"

    def test_non_premium_user_raises_exception(self) -> None:
        """A user without premium cannot register a subdomain."""
        self.assert_add_subdomain_raises("test", "error-premium-set-subdomain")

    def test_calling_again_raises_exception(self) -> None:
        """A profile that already has a subdomain cannot register one again."""
        self.upgrade_to_premium()
        subdomain = "test"
        self.profile.subdomain = subdomain
        self.profile.save()

        self.assert_add_subdomain_raises(
            subdomain, "error-premium-cannot-change-subdomain"
        )

    def test_badword_subdomain_raises_exception(self) -> None:
        """The subdomain "angry" is rejected as not available."""
        self.upgrade_to_premium()
        self.assert_add_subdomain_raises("angry", "error-subdomain-not-available")

    def test_blocked_word_subdomain_raises_exception(self) -> None:
        """The subdomain "mozilla" is rejected as not available."""
        self.upgrade_to_premium()
        self.assert_add_subdomain_raises("mozilla", "error-subdomain-not-available")

    def test_empty_subdomain_raises(self) -> None:
        """An empty string is rejected."""
        self.upgrade_to_premium()
        self.assert_add_subdomain_raises("", "error-subdomain-cannot-be-empty-or-null")

    def test_null_subdomain_raises(self) -> None:
        """None is rejected."""
        self.upgrade_to_premium()
        self.assert_add_subdomain_raises(
            None, "error-subdomain-cannot-be-empty-or-null"
        )

    def test_subdomain_with_space_at_end_raises(self) -> None:
        """A subdomain with a trailing space is rejected as not available."""
        self.upgrade_to_premium()
        self.assert_add_subdomain_raises("mydomain ", "error-subdomain-not-available")
1✔
721

722

723
class ProfileSaveTest(ProfileTestCase):
    """Tests for Profile.save()"""

    # Label values written to every address created by the helpers below.
    TEST_DESCRIPTION = "test description"
    TEST_USED_ON = TEST_GENERATED_FOR = "secret.com"

    def add_relay_address(self) -> RelayAddress:
        """Create one RelayAddress for the test user with all label fields set."""
        return baker.make(
            RelayAddress,
            user=self.profile.user,
            description=self.TEST_DESCRIPTION,
            generated_for=self.TEST_GENERATED_FOR,
            used_on=self.TEST_USED_ON,
        )

    def add_domain_address(self) -> DomainAddress:
        """Upgrade the test user, set a subdomain, and create one DomainAddress."""
        self.upgrade_to_premium()
        self.profile.subdomain = "somesubdomain"
        self.profile.save()
        return baker.make(
            DomainAddress,
            user=self.profile.user,
            address="localpart",
            description=self.TEST_DESCRIPTION,
            used_on=self.TEST_USED_ON,
        )

    def add_four_relay_addresses(self, user: User | None = None) -> list[RelayAddress]:
        """Create four labelled RelayAddresses for ``user`` (default: test user)."""
        if user is None:
            user = self.profile.user
        return baker.make(
            RelayAddress,
            user=user,
            description=self.TEST_DESCRIPTION,
            generated_for=self.TEST_GENERATED_FOR,
            used_on=self.TEST_USED_ON,
            _quantity=4,
        )

    def test_lowercases_subdomain_value(self) -> None:
        self.upgrade_to_premium()
        self.profile.subdomain = "mIxEdcAsE"
        self.profile.save()
        assert self.profile.subdomain == "mixedcase"

    def test_lowercases_subdomain_value_with_update_fields(self) -> None:
        """With update_fields, the subdomain is still lowercased."""
        self.upgrade_to_premium()
        assert self.profile.subdomain is None

        # Use QuerySet.update to avoid model .save()
        Profile.objects.filter(id=self.profile.id).update(subdomain="mIxEdcAsE")
        self.profile.refresh_from_db()
        assert self.profile.subdomain == "mIxEdcAsE"

        # Update a different field with update_fields to avoid a full model save
        new_date_subscribed = datetime(2023, 3, 3, tzinfo=UTC)
        self.profile.date_subscribed = new_date_subscribed
        self.profile.save(update_fields={"date_subscribed"})

        # Since .save() added to update_fields, subdomain is now lowercase
        self.profile.refresh_from_db()
        assert self.profile.date_subscribed == new_date_subscribed
        assert self.profile.subdomain == "mixedcase"

    def test_save_server_storage_true_doesnt_delete_data(self) -> None:
        relay_address = self.add_relay_address()
        self.profile.server_storage = True
        self.profile.save()

        relay_address.refresh_from_db()
        assert relay_address.description == self.TEST_DESCRIPTION
        assert relay_address.generated_for == self.TEST_GENERATED_FOR
        assert relay_address.used_on == self.TEST_USED_ON

    def test_save_server_storage_false_deletes_data(self) -> None:
        relay_address = self.add_relay_address()
        domain_address = self.add_domain_address()
        self.profile.server_storage = False
        self.profile.save()

        relay_address.refresh_from_db()
        domain_address.refresh_from_db()
        assert relay_address.description == ""
        assert relay_address.generated_for == ""
        assert relay_address.used_on == ""
        assert domain_address.description == ""
        assert domain_address.used_on == ""

    def test_save_server_storage_false_deletes_ALL_data(self) -> None:
        self.add_four_relay_addresses()
        self.profile.server_storage = False
        self.profile.save()

        for relay_address in RelayAddress.objects.filter(user=self.profile.user):
            assert relay_address.description == ""
            assert relay_address.generated_for == ""
            # used_on is a stored label too; check it like the sibling tests do.
            assert relay_address.used_on == ""

    def test_save_server_storage_false_only_deletes_that_profiles_data(self) -> None:
        other_user = make_free_test_user()
        assert other_user.profile.server_storage is True
        self.add_four_relay_addresses()
        self.add_four_relay_addresses(user=other_user)
        self.profile.server_storage = False
        self.profile.save()

        for relay_address in RelayAddress.objects.filter(user=self.profile.user):
            assert relay_address.description == ""
            assert relay_address.generated_for == ""
            assert relay_address.used_on == ""

        # The other user's labels must be untouched.
        for relay_address in RelayAddress.objects.filter(user=other_user):
            assert relay_address.description == self.TEST_DESCRIPTION
            assert relay_address.generated_for == self.TEST_GENERATED_FOR
            assert relay_address.used_on == self.TEST_USED_ON
838

839

840
class ProfileDisplayNameTest(ProfileTestCase):
    """Tests for Profile.display_name"""

    def test_exists(self) -> None:
        """The "displayName" entry of the social account's extra_data is returned."""
        expected = "Display Name"
        account = self.get_or_create_social_account()
        account.extra_data["displayName"] = expected
        account.save()
        assert self.profile.display_name == expected

    def test_display_name_does_not_exist(self) -> None:
        """With a social account but no "displayName" set, the value is None."""
        self.get_or_create_social_account()
        assert self.profile.display_name is None
853

854

855
class ProfileLanguageTest(ProfileTestCase):
    """Test Profile.language"""

    def test_no_fxa_extra_data_locale_returns_default_en(self) -> None:
        """A social account without a "locale" entry yields "en"."""
        account = self.get_or_create_social_account()
        assert "locale" not in account.extra_data
        assert self.profile.language == "en"

    def test_no_fxa_locale_returns_default_en(self) -> None:
        """Without creating a social account in the test, the language is "en"."""
        assert self.profile.language == "en"

    def test_fxa_locale_de_returns_de(self) -> None:
        """The first language of the stored locale string is returned."""
        account = self.get_or_create_social_account()
        account.extra_data["locale"] = "de,en-US;q=0.9,en;q=0.8"
        account.save()
        assert self.profile.language == "de"
871

872

873
class ProfileFxaLocaleInPremiumCountryTest(ProfileTestCase):
    """Tests for Profile.fxa_locale_in_premium_country"""

    def set_fxa_locale(self, locale: str) -> None:
        """Store ``locale`` in the social account's extra_data and save it."""
        social_account = self.get_or_create_social_account()
        social_account.extra_data["locale"] = locale
        social_account.save()

    def test_when_premium_available_returns_True(self) -> None:
        self.set_fxa_locale("de-DE,en-xx;q=0.9,en;q=0.8")
        assert self.profile.fxa_locale_in_premium_country is True

    def test_en_implies_premium_available(self) -> None:
        self.set_fxa_locale("en;q=0.8")
        assert self.profile.fxa_locale_in_premium_country is True

    def test_when_premium_unavailable_returns_False(self) -> None:
        self.set_fxa_locale("en-IN, en;q=0.8")
        assert self.profile.fxa_locale_in_premium_country is False

    def test_when_premium_available_by_language_code_returns_True(self) -> None:
        self.set_fxa_locale("de;q=0.8")
        assert self.profile.fxa_locale_in_premium_country is True

    def test_invalid_language_code_returns_False(self) -> None:
        self.set_fxa_locale("xx;q=0.8")
        assert self.profile.fxa_locale_in_premium_country is False

    def test_when_premium_unavailable_by_language_code_returns_False(self) -> None:
        self.set_fxa_locale("zh;q=0.8")
        assert self.profile.fxa_locale_in_premium_country is False

    def test_no_fxa_account_returns_False(self) -> None:
        # No locale is set here, unlike every other test in this class.
        assert self.profile.fxa_locale_in_premium_country is False

    def test_in_estonia(self) -> None:
        """Estonia (EE) was added in August 2023."""
        self.set_fxa_locale("et-ee,et;q=0.8")
        assert self.profile.fxa_locale_in_premium_country is True
912

913

914
class ProfileJoinedBeforePremiumReleaseTest(ProfileTestCase):
    """Tests for Profile.joined_before_premium_release"""

    def test_returns_True(self) -> None:
        """A user who joined on 2021-10-18 counts as joined before the release."""
        joined = datetime.fromisoformat("2021-10-18 17:00:00+00:00")
        self.profile.user.date_joined = joined
        assert self.profile.joined_before_premium_release

    def test_returns_False(self) -> None:
        """A user who joined on 2021-10-28 does not."""
        joined = datetime.fromisoformat("2021-10-28 17:00:00+00:00")
        self.profile.user.date_joined = joined
        assert self.profile.joined_before_premium_release is False
926

927

928
class ProfileDefaultsTest(ProfileTestCase):
    """Tests for default Profile values"""

    def test_user_created_after_premium_release_server_storage_True(self) -> None:
        """The untouched test profile has a truthy server_storage."""
        storage_enabled = self.profile.server_storage
        assert storage_enabled

    def test_emails_replied_new_user_aggregates_sum_of_replies_to_zero(self) -> None:
        """The untouched test profile reports zero replied emails."""
        replied = self.profile.emails_replied
        assert replied == 0
936

937

938
class ProfileEmailsRepliedTest(ProfileTestCase):
    """Tests for Profile.emails_replied"""

    def test_premium_user_aggregates_replies_from_all_addresses(self) -> None:
        """Deleted-address (1), relay (3) and domain (5) replies total 9."""
        self.upgrade_to_premium()
        self.profile.subdomain = "test"
        self.profile.num_email_replied_in_deleted_address = 1
        self.profile.save()

        owner = self.profile.user
        baker.make(RelayAddress, user=owner, num_replied=3)
        baker.make(DomainAddress, user=owner, address="lower-case", num_replied=5)

        assert self.profile.emails_replied == 9

    def test_free_user_aggregates_replies_from_relay_addresses(self) -> None:
        """Two relay addresses with 3 and 5 replies total 8."""
        for replies in (3, 5):
            baker.make(RelayAddress, user=self.profile.user, num_replied=replies)

        assert self.profile.emails_replied == 8
958

959

960
class ProfileUpdateAbuseMetricTest(ProfileTestCase):
    """Tests for Profile.update_abuse_metric()"""

    def setUp(self) -> None:
        """Create the abuse metric row and patch the abuse logger and datetime."""
        super().setUp()
        self.get_or_create_social_account()
        self.abuse_metric = baker.make(AbuseMetrics, user=self.profile.user)

        patcher_logger = patch("emails.models.abuse_logger.info")
        self.mocked_abuse_info = patcher_logger.start()
        self.addCleanup(patcher_logger.stop)

        # Selectively patch datetime.now() for emails models
        # https://docs.python.org/3/library/unittest.mock-examples.html#partial-mocking
        patcher = patch("emails.models.datetime")
        mocked_datetime = patcher.start()
        self.addCleanup(patcher.stop)

        # Capture "now" once and derive the start of day from it, so the two
        # mocked values cannot land on different days around midnight UTC.
        self.expected_now = datetime.now(UTC)
        mocked_datetime.combine.return_value = datetime.combine(
            self.expected_now.date(), datetime.min.time()
        )
        mocked_datetime.now.return_value = self.expected_now
        # Calling the mocked class still builds real datetime objects.
        mocked_datetime.side_effect = lambda *args, **kw: datetime(*args, **kw)

    def _assert_abuse_flagged_once(
        self, *, forwarded: int, forwarded_size_in_bytes: int
    ) -> None:
        """Assert exactly one "Abuse flagged" log call with the given counters."""
        assert self.profile.fxa
        self.mocked_abuse_info.assert_called_once_with(
            "Abuse flagged",
            extra={
                "uid": self.profile.fxa.uid,
                "flagged": self.expected_now.timestamp(),
                "replies": 0,
                "addresses": 0,
                "forwarded": forwarded,
                "forwarded_size_in_bytes": forwarded_size_in_bytes,
            },
        )

    @override_settings(MAX_FORWARDED_PER_DAY=5)
    def test_flags_profile_when_emails_forwarded_abuse_threshold_met(self) -> None:
        # One forward below the limit, so the next forward reaches it.
        self.abuse_metric.num_email_forwarded_per_day = 4
        self.abuse_metric.save()
        assert self.profile.last_account_flagged is None

        self.profile.update_abuse_metric(email_forwarded=True)
        self.abuse_metric.refresh_from_db()

        self._assert_abuse_flagged_once(forwarded=5, forwarded_size_in_bytes=0)
        assert self.abuse_metric.num_email_forwarded_per_day == 5
        assert self.profile.last_account_flagged == self.expected_now

    @override_settings(MAX_FORWARDED_EMAIL_SIZE_PER_DAY=100)
    def test_flags_profile_when_forwarded_email_size_abuse_threshold_met(self) -> None:
        # 50 bytes already recorded; another 50 reaches the limit of 100.
        self.abuse_metric.forwarded_email_size_per_day = 50
        self.abuse_metric.save()
        assert self.profile.last_account_flagged is None

        self.profile.update_abuse_metric(forwarded_email_size=50)
        self.abuse_metric.refresh_from_db()

        self._assert_abuse_flagged_once(forwarded=0, forwarded_size_in_bytes=100)
        assert self.abuse_metric.forwarded_email_size_per_day == 100
        assert self.profile.last_account_flagged == self.expected_now
1032

1033

1034
class ProfileMetricsEnabledTest(ProfileTestCase):
    """Tests for Profile.metrics_enabled"""

    def test_no_fxa_means_metrics_enabled(self) -> None:
        """Without an FxA account, metrics are enabled."""
        assert not self.profile.fxa
        assert self.profile.metrics_enabled

    def test_fxa_legacy_means_metrics_enabled(self) -> None:
        """An FxA account with no "metricsEnabled" entry leaves metrics enabled."""
        self.get_or_create_social_account()
        assert self.profile.fxa
        assert "metricsEnabled" not in self.profile.fxa.extra_data
        assert self.profile.metrics_enabled

    def test_fxa_opt_in_means_metrics_enabled(self) -> None:
        """``metricsEnabled=True`` in extra_data leaves metrics enabled."""
        account = self.get_or_create_social_account()
        account.extra_data["metricsEnabled"] = True
        account.save()
        assert self.profile.fxa
        assert self.profile.metrics_enabled

    def test_fxa_opt_out_means_metrics_disabled(self) -> None:
        """``metricsEnabled=False`` in extra_data disables metrics."""
        account = self.get_or_create_social_account()
        account.extra_data["metricsEnabled"] = False
        account.save()
        assert self.profile.fxa
        assert not self.profile.metrics_enabled
1058

1059

1060
class ProfilePlanTest(ProfileTestCase):
    """Tests for Profile.plan"""

    def test_free_user(self) -> None:
        plan = self.profile.plan
        assert plan == "free"

    def test_premium_user(self) -> None:
        self.upgrade_to_premium()
        plan = self.profile.plan
        assert plan == "email"

    def test_phone_user(self) -> None:
        self.upgrade_to_phone()
        plan = self.profile.plan
        assert plan == "phone"

    def test_vpn_bundle_user(self) -> None:
        self.upgrade_to_vpn_bundle()
        plan = self.profile.plan
        assert plan == "bundle"
1075

1076

1077
class ProfilePlanTermTest(ProfileTestCase):
    """Tests for Profile.plan_term"""

    def test_free_user(self) -> None:
        assert self.profile.plan_term is None

    def test_premium_user(self) -> None:
        self.upgrade_to_premium()
        assert self.profile.plan_term == "unknown"

    def test_phone_user(self) -> None:
        """A phone user with no subscription dates set has an unknown term."""
        self.upgrade_to_phone()
        assert self.profile.plan_term == "unknown"

    def test_phone_user_1_month(self) -> None:
        """A subscription from 2024-01-01 to 2024-02-01 is a one-month term."""
        self.upgrade_to_phone()
        starts = datetime(2024, 1, 1, tzinfo=UTC)
        ends = datetime(2024, 2, 1, tzinfo=UTC)
        self.profile.date_phone_subscription_start = starts
        self.profile.date_phone_subscription_end = ends
        assert self.profile.plan_term == "1_month"

    def test_phone_user_1_year(self) -> None:
        """A subscription from 2024-01-01 to 2025-01-01 is a one-year term."""
        self.upgrade_to_phone()
        starts = datetime(2024, 1, 1, tzinfo=UTC)
        ends = datetime(2025, 1, 1, tzinfo=UTC)
        self.profile.date_phone_subscription_start = starts
        self.profile.date_phone_subscription_end = ends
        assert self.profile.plan_term == "1_year"

    def test_vpn_bundle_user(self) -> None:
        self.upgrade_to_vpn_bundle()
        assert self.profile.plan_term == "unknown"
1106

1107

1108
class ProfileMetricsPremiumStatus(ProfileTestCase):
    """Tests for Profile.metrics_premium_status"""

    def test_free_user(self) -> None:
        assert self.profile.metrics_premium_status == "free"

    def test_premium_user(self) -> None:
        self.upgrade_to_premium()
        assert self.profile.metrics_premium_status == "email_unknown"

    def test_phone_user(self) -> None:
        """A phone user with no subscription dates set reports an unknown term."""
        self.upgrade_to_phone()
        assert self.profile.metrics_premium_status == "phone_unknown"

    def test_phone_user_1_month(self) -> None:
        """A subscription from 2024-01-01 to 2024-02-01 reports a one-month term."""
        self.upgrade_to_phone()
        self.profile.date_phone_subscription_start = datetime(2024, 1, 1, tzinfo=UTC)
        self.profile.date_phone_subscription_end = datetime(2024, 2, 1, tzinfo=UTC)
        assert self.profile.metrics_premium_status == "phone_1_month"

    def test_phone_user_1_year(self) -> None:
        """A subscription from 2024-01-01 to 2025-01-01 reports a one-year term."""
        self.upgrade_to_phone()
        self.profile.date_phone_subscription_start = datetime(2024, 1, 1, tzinfo=UTC)
        self.profile.date_phone_subscription_end = datetime(2025, 1, 1, tzinfo=UTC)
        assert self.profile.metrics_premium_status == "phone_1_year"

    def test_vpn_bundle_user(self) -> None:
        self.upgrade_to_vpn_bundle()
        assert self.profile.metrics_premium_status == "bundle_unknown"
1137

1138

1139
class DomainAddressTest(TestCase):
1✔
1140
    def setUp(self):
1✔
1141
        self.subdomain = "test"
1✔
1142
        self.user = make_premium_test_user()
1✔
1143
        self.storageless_user = make_storageless_test_user()
1✔
1144
        self.user.profile.subdomain = self.subdomain
1✔
1145
        self.user.profile.save()
1✔
1146

1147
    def test_make_domain_address_assigns_to_user(self):
1✔
1148
        domain_address = DomainAddress.make_domain_address(self.user, "test-assigns")
1✔
1149
        assert domain_address.user == self.user
1✔
1150

1151
    @skip(reason="test not reliable, look at FIXME comment")
1✔
1152
    def test_make_domain_address_makes_different_addresses(self):
1✔
1153
        # FIXME: sometimes this test will fail because it randomly generates
1154
        # alias with bad words. See make_domain_address for why this has
1155
        # not been fixed yet
1156
        for i in range(5):
×
1157
            domain_address = DomainAddress.make_domain_address(
×
1158
                self.user, f"test-different-{i}"
1159
            )
1160
            assert domain_address.first_emailed_at is None
×
1161
        domain_addresses = DomainAddress.objects.filter(user=self.user).values_list(
×
1162
            "address", flat=True
1163
        )
1164
        # checks that there are 5 unique DomainAddress
1165
        assert len(set(domain_addresses)) == 5
×
1166

1167
    def test_make_domain_address_makes_requested_address(self):
1✔
1168
        domain_address = DomainAddress.make_domain_address(self.user, "foobar")
1✔
1169
        assert domain_address.address == "foobar"
1✔
1170
        assert domain_address.first_emailed_at is None
1✔
1171

1172
    @override_settings(MAX_ADDRESS_CREATION_PER_DAY=10)
1✔
1173
    def test_make_domain_address_has_limit(self) -> None:
1✔
1174
        for i in range(10):
1✔
1175
            DomainAddress.make_domain_address(self.user, "foobar" + str(i))
1✔
1176
        with pytest.raises(CannotMakeAddressException) as exc_info:
1✔
1177
            DomainAddress.make_domain_address(self.user, "one-too-many")
1✔
1178
        assert exc_info.value.get_codes() == "account_is_paused"
1✔
1179
        domain_address_count = DomainAddress.objects.filter(user=self.user).count()
1✔
1180
        assert domain_address_count == 10
1✔
1181

1182
    def test_make_domain_address_makes_requested_address_via_email(self):
1✔
1183
        domain_address = DomainAddress.make_domain_address(self.user, "foobar", True)
1✔
1184
        assert domain_address.address == "foobar"
1✔
1185
        assert domain_address.first_emailed_at is not None
1✔
1186

1187
    def test_make_domain_address_non_premium_user(self) -> None:
1✔
1188
        non_premium_user = baker.make(User)
1✔
1189
        with pytest.raises(CannotMakeAddressException) as exc_info:
1✔
1190
            DomainAddress.make_domain_address(non_premium_user, "test-non-premium")
1✔
1191
        assert exc_info.value.get_codes() == "free_tier_no_subdomain_masks"
1✔
1192

1193
    def test_make_domain_address_can_make_blocklisted_address(self):
1✔
1194
        domain_address = DomainAddress.make_domain_address(self.user, "testing")
1✔
1195
        assert domain_address.address == "testing"
1✔
1196

1197
    def test_make_domain_address_valid_premium_user_with_no_subdomain(self) -> None:
1✔
1198
        user = baker.make(User)
1✔
1199
        baker.make(
1✔
1200
            SocialAccount,
1201
            user=user,
1202
            provider="fxa",
1203
            extra_data={"subscriptions": [premium_subscription()]},
1204
        )
1205
        with pytest.raises(CannotMakeAddressException) as exc_info:
1✔
1206
            DomainAddress.make_domain_address(user, "test-nosubdomain")
1✔
1207
        assert exc_info.value.get_codes() == "need_subdomain"
1✔
1208

1209
    def test_make_domain_address_dupe_of_existing_raises(self):
1✔
1210
        address = "same-address"
1✔
1211
        DomainAddress.make_domain_address(self.user, address=address)
1✔
1212
        with pytest.raises(DomainAddrDuplicateException) as exc_info:
1✔
1213
            DomainAddress.make_domain_address(self.user, address=address)
1✔
1214
        assert exc_info.value.get_codes() == "duplicate_address"
1✔
1215

1216
    @override_flag("custom_domain_management_redesign", active=False)
1✔
1217
    def test_make_domain_address_can_make_dupe_of_deleted(self):
1✔
1218
        address = "same-address"
1✔
1219
        domain_address = DomainAddress.make_domain_address(self.user, address=address)
1✔
1220
        domain_address_hash = address_hash(
1✔
1221
            domain_address.address,
1222
            domain_address.user_profile.subdomain,
1223
            domain_address.domain_value,
1224
        )
1225
        domain_address.delete()
1✔
1226
        dupe_domain_address = DomainAddress.make_domain_address(
1✔
1227
            self.user, address=address
1228
        )
1229
        assert (
1✔
1230
            DeletedAddress.objects.filter(address_hash=domain_address_hash).count() == 1
1231
        )
1232
        assert dupe_domain_address.full_address == domain_address.full_address
1✔
1233

1234
    @override_flag("custom_domain_management_redesign", active=True)
1✔
1235
    def test_make_domain_address_cannot_make_dupe_of_deleted(self):
1✔
1236
        address = "same-address"
1✔
1237
        domain_address = DomainAddress.make_domain_address(self.user, address=address)
1✔
1238
        domain_address_hash = address_hash(
1✔
1239
            domain_address.address,
1240
            domain_address.user_profile.subdomain,
1241
            domain_address.domain_value,
1242
        )
1243
        domain_address.delete()
1✔
1244
        with pytest.raises(DomainAddrUnavailableException) as exc_info:
1✔
1245
            DomainAddress.make_domain_address(self.user, address=address)
1✔
1246
        assert exc_info.value.get_codes() == "address_unavailable"
1✔
1247
        assert (
1✔
1248
            DeletedAddress.objects.filter(address_hash=domain_address_hash).count() == 1
1249
        )
1250

1251
    @patch("emails.models.address_default")
1✔
1252
    def test_make_domain_address_doesnt_randomly_generate_bad_word(
1✔
1253
        self, address_default_mocked: Mock
1254
    ) -> None:
1255
        address_default_mocked.return_value = "angry0123"
1✔
1256
        with pytest.raises(CannotMakeAddressException) as exc_info:
1✔
1257
            DomainAddress.make_domain_address(self.user)
1✔
1258
        assert exc_info.value.get_codes() == "address_unavailable"
1✔
1259

1260
    def test_delete_adds_deleted_address_object(self):
1✔
1261
        domain_address = baker.make(DomainAddress, address="lower-case", user=self.user)
1✔
1262
        domain_address_hash = sha256(
1✔
1263
            domain_address.full_address.encode("utf-8")
1264
        ).hexdigest()
1265
        domain_address.delete()
1✔
1266
        deleted_address_qs = DeletedAddress.objects.filter(
1✔
1267
            address_hash=domain_address_hash
1268
        )
1269
        assert deleted_address_qs.count() == 1
1✔
1270
        assert deleted_address_qs.get().address_hash == domain_address_hash
1✔
1271

1272
    def test_premium_user_can_set_block_list_emails(self):
1✔
1273
        domain_address = DomainAddress.objects.create(
1✔
1274
            user=self.user, address="lower-case"
1275
        )
1276
        assert domain_address.block_list_emails is False
1✔
1277
        domain_address.block_list_emails = True
1✔
1278
        domain_address.save()
1✔
1279
        domain_address.refresh_from_db()
1✔
1280
        assert domain_address.block_list_emails is True
1✔
1281

1282
    def test_delete_increments_values_on_profile(self):
1✔
1283
        profile = self.user.profile
1✔
1284
        assert profile.num_address_deleted == 0
1✔
1285
        assert profile.num_email_forwarded_in_deleted_address == 0
1✔
1286
        assert profile.num_email_blocked_in_deleted_address == 0
1✔
1287
        assert profile.num_level_one_trackers_blocked_in_deleted_address == 0
1✔
1288
        assert profile.num_email_replied_in_deleted_address == 0
1✔
1289
        assert profile.num_email_spam_in_deleted_address == 0
1✔
1290
        assert profile.num_deleted_relay_addresses == 0
1✔
1291
        assert profile.num_deleted_domain_addresses == 0
1✔
1292

1293
        domain_address = DomainAddress.objects.create(
1✔
1294
            user=self.user,
1295
            address="lower-case",
1296
            num_forwarded=2,
1297
            num_blocked=3,
1298
            num_level_one_trackers_blocked=4,
1299
            num_replied=5,
1300
            num_spam=6,
1301
        )
1302
        domain_address.delete()
1✔
1303

1304
        profile.refresh_from_db()
1✔
1305
        assert profile.num_address_deleted == 1
1✔
1306
        assert profile.num_email_forwarded_in_deleted_address == 2
1✔
1307
        assert profile.num_email_blocked_in_deleted_address == 3
1✔
1308
        assert profile.num_level_one_trackers_blocked_in_deleted_address == 4
1✔
1309
        assert profile.num_email_replied_in_deleted_address == 5
1✔
1310
        assert profile.num_email_spam_in_deleted_address == 6
1✔
1311
        assert profile.num_deleted_relay_addresses == 0
1✔
1312
        assert profile.num_deleted_domain_addresses == 1
1✔
1313

1314
    def test_formerly_premium_user_clears_block_list_emails(self):
1✔
1315
        domain_address = DomainAddress.objects.create(
1✔
1316
            user=self.user, address="coupons", block_list_emails=True
1317
        )
1318
        domain_address.refresh_from_db()
1✔
1319
        assert domain_address.block_list_emails is True
1✔
1320

1321
        # Remove premium from user
1322
        assert (fxa_account := self.user.profile.fxa) is not None
1✔
1323
        fxa_account.extra_data["subscriptions"] = []
1✔
1324
        fxa_account.save()
1✔
1325
        assert not self.user.profile.has_premium
1✔
1326

1327
        domain_address.save()
1✔
1328
        assert domain_address.block_list_emails is False
1✔
1329

1330
    def test_storageless_user_cant_set_labels(self):
1✔
1331
        domain_address = DomainAddress.objects.create(
1✔
1332
            user=self.storageless_user, address="lower-case"
1333
        )
1334
        assert domain_address.description == ""
1✔
1335
        domain_address.description = "Arbitrary description"
1✔
1336
        domain_address.save()
1✔
1337
        domain_address.refresh_from_db()
1✔
1338
        assert domain_address.description == ""
1✔
1339

1340
    def test_clear_storage_with_update_fields(self) -> None:
1✔
1341
        """With update_fields, the stored data is cleared for storageless users."""
1342
        domain_address = DomainAddress.objects.create(
1✔
1343
            user=self.storageless_user, address="no-storage"
1344
        )
1345
        assert domain_address.used_on is None
1✔
1346
        assert domain_address.description == ""
1✔
1347

1348
        # Use QuerySet.update to avoid model save method
1349
        DomainAddress.objects.filter(id=domain_address.id).update(
1✔
1350
            description="the description",
1351
            used_on="https://example.com",
1352
        )
1353
        domain_address.refresh_from_db()
1✔
1354
        assert domain_address.description == "the description"
1✔
1355
        assert domain_address.used_on == "https://example.com"
1✔
1356

1357
        # Update a different field with update_fields to avoid full model save
1358
        new_last_used_at = datetime(2024, 1, 11, tzinfo=UTC)
1✔
1359
        domain_address.last_used_at = new_last_used_at
1✔
1360
        domain_address.save(update_fields={"last_used_at"})
1✔
1361

1362
        # Since .save() added to update_fields, the storage fields are cleared
1363
        domain_address.refresh_from_db()
1✔
1364
        assert domain_address.last_used_at == new_last_used_at
1✔
1365
        assert domain_address.description == ""
1✔
1366
        assert domain_address.used_on == ""
1✔
1367

1368
    def test_clear_block_list_emails_with_update_fields(self) -> None:
1✔
1369
        """
1370
        With update_fields, the block_list_emails flag is still cleared for free users.
1371
        """
1372
        domain_address = DomainAddress.objects.create(
1✔
1373
            user=self.user, address="block-list-emails", block_list_emails=True
1374
        )
1375

1376
        # Remove premium from user
1377
        assert (fxa_account := self.user.profile.fxa) is not None
1✔
1378
        fxa_account.extra_data["subscriptions"] = []
1✔
1379
        fxa_account.save()
1✔
1380
        assert not self.user.profile.has_premium
1✔
1381
        assert domain_address.block_list_emails
1✔
1382

1383
        # Update a different field with update_fields to avoid full model save
1384
        new_last_used_at = datetime(2024, 1, 12, tzinfo=UTC)
1✔
1385
        assert domain_address.last_used_at != new_last_used_at
1✔
1386
        domain_address.last_used_at = new_last_used_at
1✔
1387
        domain_address.save(update_fields={"last_used_at"})
1✔
1388

1389
        # Since .save() added to update_fields, block_list_emails flag is cleared
1390
        domain_address.refresh_from_db()
1✔
1391
        assert domain_address.last_used_at == new_last_used_at
1✔
1392
        assert not domain_address.block_list_emails
1✔
1393

1394
    def test_create_updates_profile_last_engagement(self) -> None:
        """Creating a second domain address advances the profile's last_engagement."""
        # The first creation provides a non-empty baseline timestamp
        DomainAddress.make_domain_address(self.user, address="create")
        engagement_before = self.user.profile.last_engagement
        assert engagement_before

        DomainAddress.make_domain_address(self.user, address="create2")

        # Reload the profile so the comparison uses the stored value
        profile = self.user.profile
        profile.refresh_from_db()
        assert profile.last_engagement > engagement_before
1✔
1403

1404
    def test_save_does_not_update_profile_last_engagement(self) -> None:
        """Saving a domain address leaves the profile's last_engagement unchanged."""
        mask = DomainAddress.make_domain_address(self.user, address="save")
        engagement_before = self.user.profile.last_engagement
        assert engagement_before

        # Modify the existing address and persist it with a plain save()
        mask.enabled = False
        mask.save()

        # Reload the profile so the comparison uses the stored value
        profile = self.user.profile
        profile.refresh_from_db()
        assert profile.last_engagement == engagement_before
1✔
1414

1415
    def test_delete_updates_profile_last_engagement(self) -> None:
        """Deleting a domain address advances the profile's last_engagement."""
        mask = DomainAddress.make_domain_address(self.user, address="delete")
        engagement_before = self.user.profile.last_engagement
        assert engagement_before

        mask.delete()

        # Reload the profile so the comparison uses the stored value
        profile = self.user.profile
        profile.refresh_from_db()
        assert profile.last_engagement > engagement_before
1✔
1424

1425
    def test_metrics_id(self) -> None:
        """A domain address's metrics_id is "D" followed by its database id."""
        address = DomainAddress.objects.create(user=self.user, address="metrics")
        assert address.metrics_id == f"D{address.id}"
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc