• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 0a454dad-3898-4e02-a8a1-d37613c7be87

12 Jul 2024 04:19PM CUT coverage: 85.402% (-0.02%) from 85.419%
0a454dad-3898-4e02-a8a1-d37613c7be87

push

circleci

web-flow
Merge pull request #4856 from mozilla/MPP-3807-Remove-glean-checks-for-add-on-data

MPP-3807 Remove extension related queries on Glean metrics

4083 of 5232 branches covered (78.04%)

Branch coverage included in aggregate %.

11 of 11 new or added lines in 2 files covered. (100.0%)

3 existing lines in 2 files now uncovered.

15908 of 18176 relevant lines covered (87.52%)

10.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.16
/emails/tests/models_tests.py
1
import random
1✔
2
from datetime import UTC, datetime, timedelta
1✔
3
from hashlib import sha256
1✔
4
from unittest import skip
1✔
5
from unittest.mock import Mock, patch
1✔
6
from uuid import uuid4
1✔
7

8
from django.conf import settings
1✔
9
from django.contrib.auth.models import User
1✔
10
from django.test import TestCase, override_settings
1✔
11

12
import pytest
1✔
13
from allauth.socialaccount.models import SocialAccount
1✔
14
from model_bakery import baker
1✔
15
from waffle.testutils import override_flag
1✔
16

17
from privaterelay.tests.utils import (
1✔
18
    make_free_test_user,
19
    make_premium_test_user,
20
    make_storageless_test_user,
21
    phone_subscription,
22
    premium_subscription,
23
    vpn_subscription,
24
)
25

26
from ..apps import BadWords
1✔
27
from ..exceptions import (
1✔
28
    CannotMakeAddressException,
29
    CannotMakeSubdomainException,
30
    DomainAddrDuplicateException,
31
    DomainAddrUnavailableException,
32
)
33
from ..models import (
1✔
34
    AbuseMetrics,
35
    DeletedAddress,
36
    DomainAddress,
37
    Profile,
38
    RelayAddress,
39
    address_hash,
40
    get_domain_numerical,
41
)
42
from ..utils import get_domains_from_settings
1✔
43

44
if settings.PHONES_ENABLED:
1!
45
    from phones.models import RealPhone, RelayNumber
1✔
46

47

48
class AddressHashTest(TestCase):
    """Compare address_hash() against SHA-256 digests computed by hand."""

    @override_settings(RELAY_FIREFOX_DOMAIN="firefox.com")
    def test_address_hash_without_subdomain_domain_firefox(self):
        """For the RELAY_FIREFOX_DOMAIN domain, only the local part is hashed."""
        local_part = "aaaaaaaaa"
        digest = sha256(local_part.encode()).hexdigest()
        assert address_hash(local_part, domain="firefox.com") == digest

    @override_settings(RELAY_FIREFOX_DOMAIN="firefox.com")
    def test_address_hash_without_subdomain_domain_not_firefoxz(self):
        """For a different domain, "local@domain" is hashed."""
        # NOTE(review): "firefoxz" in the test name looks like a typo; the name
        # is kept as-is so existing test IDs do not change.
        local_part = "aaaaaaaaa"
        other_domain = "test.com"
        hashed_text = f"{local_part}@{other_domain}"
        digest = sha256(hashed_text.encode()).hexdigest()
        assert address_hash(local_part, domain=other_domain) == digest

    def test_address_hash_with_subdomain(self):
        """With a subdomain, "local@subdomain.domain" is hashed."""
        local_part = "aaaaaaaaa"
        subdomain = "test"
        mozmail_domain = get_domains_from_settings().get("MOZMAIL_DOMAIN")
        hashed_text = f"{local_part}@{subdomain}.{mozmail_domain}"
        digest = sha256(hashed_text.encode()).hexdigest()
        assert address_hash(local_part, subdomain, mozmail_domain) == digest

    def test_address_hash_with_additional_domain(self):
        """Hash "local@domain" for "test.com" without overriding any settings."""
        local_part = "aaaaaaaaa"
        extra_domain = "test.com"
        hashed_text = f"{local_part}@{extra_domain}"
        digest = sha256(hashed_text.encode()).hexdigest()
        assert address_hash(local_part, domain=extra_domain) == digest
74

75

76
class GetDomainNumericalTest(TestCase):
    """Check the numeric codes returned by get_domain_numerical()."""

    def test_get_domain_numerical(self):
        """Expect "default.com" to map to 1 and "test.com" to map to 2."""
        expected_codes = {"default.com": 1, "test.com": 2}
        for domain_name, code in expected_codes.items():
            assert get_domain_numerical(domain_name) == code
80

81

82
class RelayAddressTest(TestCase):
    """Tests for RelayAddress creation limits, deletion effects, and save rules."""

    def setUp(self):
        """Create a free user, a premium user, and a storageless user."""
        self.user = make_free_test_user()
        self.user_profile = self.user.profile
        self.premium_user = make_premium_test_user()
        self.premium_user_profile = self.premium_user.profile
        self.storageless_user = make_storageless_test_user()

    def test_create_assigns_to_user(self):
        """A created RelayAddress keeps the user it was created for."""
        relay_address = RelayAddress.objects.create(user=self.user_profile.user)
        assert relay_address.user == self.user_profile.user

    @override_settings(MAX_NUM_FREE_ALIASES=5, MAX_ADDRESS_CREATION_PER_DAY=10)
    def test_create_has_limit(self) -> None:
        """Creating past MAX_ADDRESS_CREATION_PER_DAY raises "account_is_paused"."""
        baker.make(
            RelayAddress,
            user=self.premium_user,
            _quantity=settings.MAX_ADDRESS_CREATION_PER_DAY,
        )
        with pytest.raises(CannotMakeAddressException) as exc_info:
            RelayAddress.objects.create(user=self.premium_user)
        assert exc_info.value.get_codes() == "account_is_paused"
        relay_address_count = RelayAddress.objects.filter(
            user=self.premium_user_profile.user
        ).count()
        # Compare against the overridden setting rather than a literal so the
        # assertion follows the decorator if the limit is ever changed.
        assert relay_address_count == settings.MAX_ADDRESS_CREATION_PER_DAY

    def test_create_premium_user_can_exceed_free_limit(self):
        """A premium user can hold MAX_NUM_FREE_ALIASES + 1 addresses."""
        baker.make(
            RelayAddress,
            user=self.premium_user,
            _quantity=settings.MAX_NUM_FREE_ALIASES + 1,
        )
        relay_addresses = RelayAddress.objects.filter(
            user=self.premium_user
        ).values_list("address", flat=True)
        assert len(relay_addresses) == settings.MAX_NUM_FREE_ALIASES + 1

    def test_create_non_premium_user_cannot_pass_free_limit(self) -> None:
        """A free user at MAX_NUM_FREE_ALIASES gets "free_tier_limit" on create."""
        baker.make(
            RelayAddress, user=self.user, _quantity=settings.MAX_NUM_FREE_ALIASES
        )
        with pytest.raises(CannotMakeAddressException) as exc_info:
            RelayAddress.objects.create(user=self.user_profile.user)
        assert exc_info.value.get_codes() == "free_tier_limit"
        relay_addresses = RelayAddress.objects.filter(
            user=self.user_profile.user
        ).values_list("address", flat=True)
        assert len(relay_addresses) == settings.MAX_NUM_FREE_ALIASES

    @skip(reason="ignore test for code path that we don't actually use")
    def test_create_with_specified_domain(self):
        """Skipped: create with domain=2 and check the domain fields."""
        relay_address = RelayAddress.objects.create(
            user=self.user_profile.user, domain=2
        )
        assert relay_address.domain == 2
        assert relay_address.get_domain_display() == "MOZMAIL_DOMAIN"
        assert relay_address.domain_value == "test.com"

    def test_create_updates_profile_last_engagement(self) -> None:
        """Creating another address moves profile.last_engagement forward."""
        relay_address = baker.make(RelayAddress, user=self.user, enabled=True)
        profile = relay_address.user.profile
        profile.refresh_from_db()
        assert profile.last_engagement
        pre_create_last_engagement = profile.last_engagement

        baker.make(RelayAddress, user=self.user, enabled=True)

        profile.refresh_from_db()
        assert profile.last_engagement > pre_create_last_engagement

    def test_save_does_not_update_profile_last_engagement(self) -> None:
        """Saving an existing address leaves profile.last_engagement unchanged."""
        relay_address = baker.make(RelayAddress, user=self.user, enabled=True)
        profile = relay_address.user.profile
        profile.refresh_from_db()
        assert profile.last_engagement
        pre_save_last_engagement = profile.last_engagement

        relay_address.enabled = False
        relay_address.save()

        profile.refresh_from_db()
        assert profile.last_engagement == pre_save_last_engagement

    def test_delete_updates_profile_last_engagement(self) -> None:
        """Deleting an address moves profile.last_engagement forward."""
        relay_address = baker.make(RelayAddress, user=self.user)
        profile = relay_address.user.profile
        profile.refresh_from_db()
        assert profile.last_engagement
        pre_delete_last_engagement = profile.last_engagement

        relay_address.delete()

        profile.refresh_from_db()
        assert profile.last_engagement > pre_delete_last_engagement

    def test_delete_adds_deleted_address_object(self):
        """Deleting leaves one DeletedAddress with the full address's SHA-256."""
        relay_address = baker.make(RelayAddress, user=self.user)
        # Named "digest" so the imported address_hash() function is not shadowed.
        digest = sha256(relay_address.full_address.encode("utf-8")).hexdigest()
        relay_address.delete()
        deleted_count = DeletedAddress.objects.filter(address_hash=digest).count()
        assert deleted_count == 1

    def test_delete_mozmail_deleted_address_object(self):
        """Deleting a domain=2 address records the "address@domain_value" hash."""
        relay_address = baker.make(RelayAddress, domain=2, user=self.user)
        # Named "digest" so the imported address_hash() function is not shadowed.
        digest = sha256(
            f"{relay_address.address}@{relay_address.domain_value}".encode()
        ).hexdigest()
        relay_address.delete()
        deleted_count = DeletedAddress.objects.filter(address_hash=digest).count()
        assert deleted_count == 1

    def test_delete_increments_values_on_profile(self):
        """Deleting an address adds its counters to the profile's deleted totals."""
        assert self.premium_user_profile.num_address_deleted == 0
        assert self.premium_user_profile.num_email_forwarded_in_deleted_address == 0
        assert self.premium_user_profile.num_email_blocked_in_deleted_address == 0
        assert (
            self.premium_user_profile.num_level_one_trackers_blocked_in_deleted_address
            == 0
        )
        assert self.premium_user_profile.num_email_replied_in_deleted_address == 0
        assert self.premium_user_profile.num_email_spam_in_deleted_address == 0
        assert self.premium_user_profile.num_deleted_relay_addresses == 0
        assert self.premium_user_profile.num_deleted_domain_addresses == 0

        relay_address = baker.make(
            RelayAddress,
            user=self.premium_user,
            num_forwarded=2,
            num_blocked=3,
            num_level_one_trackers_blocked=4,
            num_replied=5,
            num_spam=6,
        )
        relay_address.delete()

        self.premium_user_profile.refresh_from_db()
        assert self.premium_user_profile.num_address_deleted == 1
        assert self.premium_user_profile.num_email_forwarded_in_deleted_address == 2
        assert self.premium_user_profile.num_email_blocked_in_deleted_address == 3
        assert (
            self.premium_user_profile.num_level_one_trackers_blocked_in_deleted_address
            == 4
        )
        assert self.premium_user_profile.num_email_replied_in_deleted_address == 5
        assert self.premium_user_profile.num_email_spam_in_deleted_address == 6
        assert self.premium_user_profile.num_deleted_relay_addresses == 1
        assert self.premium_user_profile.num_deleted_domain_addresses == 0

    def test_relay_address_create_repeats_deleted_address_invalid(self):
        """Re-requesting a deleted address yields a different address value."""
        user = baker.make(User)
        address = "random-address"
        relay_address = RelayAddress.objects.create(user=user, address=address)
        relay_address.delete()
        repeat_deleted_relay_address = RelayAddress.objects.create(
            user=user, address=address
        )
        assert repeat_deleted_relay_address.address != address

    @patch("emails.validators.badwords", return_value=BadWords(short=set(), long=[]))
    @patch("emails.validators.blocklist", return_value={"blocked-word"})
    def test_address_contains_blocklist_invalid(
        self, mock_blocklist: Mock, mock_badwords: Mock
    ) -> None:
        """Requesting a blocklisted address yields a different address value."""
        blocked_word = "blocked-word"
        relay_address = RelayAddress.objects.create(
            user=baker.make(User), address=blocked_word
        )
        assert relay_address.address != blocked_word

    def test_free_user_cant_set_block_list_emails(self):
        """For a free user, block_list_emails is False again after save."""
        relay_address = RelayAddress.objects.create(user=self.user)
        relay_address.block_list_emails = True
        relay_address.save()
        relay_address.refresh_from_db()
        assert relay_address.block_list_emails is False

    def test_premium_user_can_set_block_list_emails(self):
        """For a premium user, block_list_emails=True survives a save."""
        relay_address = RelayAddress.objects.create(user=self.premium_user)
        assert relay_address.block_list_emails is False
        relay_address.block_list_emails = True
        relay_address.save()
        relay_address.refresh_from_db()
        assert relay_address.block_list_emails is True

    def test_formerly_premium_user_clears_block_list_emails(self):
        """After the user loses premium, save() resets block_list_emails."""
        relay_address = RelayAddress.objects.create(
            user=self.premium_user, block_list_emails=True
        )
        relay_address.refresh_from_db()
        assert relay_address.block_list_emails is True

        # Remove premium from user
        assert (fxa_account := self.premium_user.profile.fxa) is not None
        fxa_account.extra_data["subscriptions"] = []
        fxa_account.save()
        assert not self.premium_user.profile.has_premium

        relay_address.save()
        assert relay_address.block_list_emails is False

    def test_storageless_user_cant_set_label(self):
        """For a storageless user, label fields are empty again after save."""
        relay_address = RelayAddress.objects.create(user=self.storageless_user)
        assert relay_address.description == ""
        assert relay_address.generated_for == ""
        assert relay_address.used_on in (None, "")
        relay_address.description = "Arbitrary description"
        relay_address.generated_for = "https://example.com"
        relay_address.used_on = "https://example.com"
        relay_address.save()
        relay_address.refresh_from_db()
        assert relay_address.description == ""
        assert relay_address.generated_for == ""
        assert relay_address.used_on in (None, "")

    def test_clear_storage_with_update_fields(self) -> None:
        """
        With update_fields, the stored data is still cleared for storageless users.
        """
        relay_address = RelayAddress.objects.create(user=self.storageless_user)
        assert relay_address.description == ""

        # Use QuerySet.update to avoid model save method
        RelayAddress.objects.filter(id=relay_address.id).update(
            description="the description",
            generated_for="https://example.com",
            used_on="https://example.com",
        )
        relay_address.refresh_from_db()
        assert relay_address.description == "the description"
        assert relay_address.generated_for == "https://example.com"
        assert relay_address.used_on == "https://example.com"

        # Update a different field with update_fields to avoid full model save
        new_last_used_at = datetime(2024, 1, 11, tzinfo=UTC)
        relay_address.last_used_at = new_last_used_at
        relay_address.save(update_fields={"last_used_at"})

        # Since .save() added to update_fields, the storage fields are cleared
        relay_address.refresh_from_db()
        assert relay_address.last_used_at == new_last_used_at
        assert relay_address.description == ""
        assert relay_address.generated_for == ""
        assert relay_address.used_on in ("", None)

    def test_clear_block_list_emails_with_update_fields(self) -> None:
        """
        With update_fields, the block_list_emails flag is still cleared for free users.
        """
        relay_address = RelayAddress.objects.create(user=self.user)
        assert not relay_address.block_list_emails

        # Use QuerySet.update to avoid model save method
        RelayAddress.objects.filter(id=relay_address.id).update(block_list_emails=True)
        relay_address.refresh_from_db()
        assert relay_address.block_list_emails

        # Update a different field with update_fields to avoid full model save
        new_last_used_at = datetime(2024, 1, 12, tzinfo=UTC)
        relay_address.last_used_at = new_last_used_at
        relay_address.save(update_fields={"last_used_at"})

        # Since .save() added to update_fields, block_list_emails flag is cleared
        relay_address.refresh_from_db()
        assert relay_address.last_used_at == new_last_used_at
        assert not relay_address.block_list_emails

    def test_metrics_id(self):
        """metrics_id is "R" followed by the address's database id."""
        relay_address = RelayAddress.objects.create(user=self.user)
        assert relay_address.metrics_id == f"R{relay_address.id}"
352

353

354
class ProfileTestCase(TestCase):
    """Shared fixture and subscription helpers for Profile tests."""

    def setUp(self) -> None:
        """Create a user and keep its profile, which must have server storage on."""
        self.profile = baker.make(User).profile
        assert self.profile.server_storage is True

    def get_or_create_social_account(self) -> SocialAccount:
        """Return the test user's "fxa" social account, creating it if absent."""
        new_account_defaults = {
            "uid": str(uuid4()),
            "extra_data": {"avatar": "image.png", "subscriptions": []},
        }
        account, _created = SocialAccount.objects.get_or_create(
            user=self.profile.user,
            provider="fxa",
            defaults=new_account_defaults,
        )
        return account

    def upgrade_to_premium(self) -> None:
        """Append a premium subscription to the user's social account."""
        account = self.get_or_create_social_account()
        account.extra_data["subscriptions"].append(premium_subscription())
        account.save()

    def upgrade_to_phone(self) -> None:
        """Append a phone subscription, plus premium if the profile lacks it."""
        account = self.get_or_create_social_account()
        subscriptions = account.extra_data["subscriptions"]
        subscriptions.append(phone_subscription())
        if not self.profile.has_premium:
            subscriptions.append(premium_subscription())
        account.save()

    def upgrade_to_vpn_bundle(self) -> None:
        """Append a VPN subscription, plus premium and phone if the profile lacks them."""
        account = self.get_or_create_social_account()
        subscriptions = account.extra_data["subscriptions"]
        subscriptions.append(vpn_subscription())
        if not self.profile.has_premium:
            subscriptions.append(premium_subscription())
        if not self.profile.has_phone:
            subscriptions.append(phone_subscription())
        account.save()
397

398

399
class ProfileBounceTestCase(ProfileTestCase):
    """Shared bounce-setting helpers for Profile tests that involve bounces."""

    def set_hard_bounce(self) -> datetime:
        """
        Record a hard bounce on the profile and return the stored bounce time.

        The bounce is dated (HARD_BOUNCE_ALLOWED_DAYS - 1) days in the past. A hard
        bounce is what the user's email server reports when, for example, the
        email address does not exist.
        """
        days_ago = settings.HARD_BOUNCE_ALLOWED_DAYS - 1
        self.profile.last_hard_bounce = datetime.now(UTC) - timedelta(days=days_ago)
        self.profile.save()
        return self.profile.last_hard_bounce

    def set_soft_bounce(self) -> datetime:
        """
        Record a soft bounce on the profile and return the stored bounce time.

        The bounce is dated (SOFT_BOUNCE_ALLOWED_DAYS - 1) days in the past. A soft
        bounce is what the user's email server reports when, for example, the
        user's mailbox is full.
        """
        days_ago = settings.SOFT_BOUNCE_ALLOWED_DAYS - 1
        self.profile.last_soft_bounce = datetime.now(UTC) - timedelta(days=days_ago)
        self.profile.save()
        return self.profile.last_soft_bounce
427

428

429
class ProfileCheckBouncePause(ProfileBounceTestCase):
    """Tests for Profile.check_bounce_pause()"""

    def test_no_bounces(self) -> None:
        """Without any bounce, the result is (False, "")."""
        paused, kind = self.profile.check_bounce_pause()

        assert paused is False
        assert kind == ""

    def test_hard_bounce_pending(self) -> None:
        """A recent hard bounce reports a "hard" pause."""
        self.set_hard_bounce()
        paused, kind = self.profile.check_bounce_pause()
        assert paused is True
        assert kind == "hard"

    def test_soft_bounce_pending(self) -> None:
        """A recent soft bounce reports a "soft" pause."""
        self.set_soft_bounce()
        paused, kind = self.profile.check_bounce_pause()
        assert paused is True
        assert kind == "soft"

    def test_hard_and_soft_bounce_pending_shows_hard(self) -> None:
        """With both bounces recent, the pause is reported as "hard"."""
        self.set_hard_bounce()
        self.set_soft_bounce()
        paused, kind = self.profile.check_bounce_pause()
        assert paused is True
        assert kind == "hard"

    def test_hard_bounce_over_resets_timer(self) -> None:
        """A hard bounce older than the allowed days is cleared by the check."""
        days_ago = settings.HARD_BOUNCE_ALLOWED_DAYS + 1
        self.profile.last_hard_bounce = datetime.now(UTC) - timedelta(days=days_ago)
        self.profile.save()
        assert self.profile.last_hard_bounce is not None

        paused, kind = self.profile.check_bounce_pause()

        assert paused is False
        assert kind == ""
        assert self.profile.last_hard_bounce is None

    def test_soft_bounce_over_resets_timer(self) -> None:
        """A soft bounce older than the allowed days is cleared by the check."""
        days_ago = settings.SOFT_BOUNCE_ALLOWED_DAYS + 1
        self.profile.last_soft_bounce = datetime.now(UTC) - timedelta(days=days_ago)
        self.profile.save()
        assert self.profile.last_soft_bounce is not None

        paused, kind = self.profile.check_bounce_pause()

        assert paused is False
        assert kind == ""
        assert self.profile.last_soft_bounce is None
482

483

484
class ProfileNextEmailTryDateTest(ProfileBounceTestCase):
    """Tests for Profile.next_email_try"""

    def test_no_bounces_returns_today(self) -> None:
        """Without bounces, next_email_try falls on today's UTC date."""
        next_try_date = self.profile.next_email_try.date()
        assert next_try_date == datetime.now(UTC).date()

    def test_hard_bounce_returns_proper_datemath(self) -> None:
        """After a hard bounce, the date is the bounce plus the hard window."""
        bounce_time = self.set_hard_bounce()
        hard_window = timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS)
        expected_date = (bounce_time + hard_window).date()
        assert self.profile.next_email_try.date() == expected_date

    def test_soft_bounce_returns_proper_datemath(self) -> None:
        """After a soft bounce, the date is the bounce plus the soft window."""
        bounce_time = self.set_soft_bounce()
        soft_window = timedelta(days=settings.SOFT_BOUNCE_ALLOWED_DAYS)
        expected_date = (bounce_time + soft_window).date()
        assert self.profile.next_email_try.date() == expected_date

    def test_hard_and_soft_bounce_returns_hard_datemath(self) -> None:
        """With both bounces set, the hard bounce determines the date."""
        soft_bounce_time = self.set_soft_bounce()
        hard_bounce_time = self.set_hard_bounce()
        assert soft_bounce_time != hard_bounce_time
        hard_window = timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS)
        expected_date = (hard_bounce_time + hard_window).date()
        assert self.profile.next_email_try.date() == expected_date
512

513

514
class ProfileLastBounceDateTest(ProfileBounceTestCase):
    """Tests for Profile.last_bounce_date"""

    def test_no_bounces_returns_None(self) -> None:
        """Without bounces, last_bounce_date is None."""
        last_bounce = self.profile.last_bounce_date
        assert last_bounce is None

    def test_soft_bounce_returns_its_date(self) -> None:
        """With only a soft bounce, last_bounce_date equals last_soft_bounce."""
        self.set_soft_bounce()
        last_bounce = self.profile.last_bounce_date
        assert last_bounce == self.profile.last_soft_bounce

    def test_hard_bounce_returns_its_date(self) -> None:
        """With only a hard bounce, last_bounce_date equals last_hard_bounce."""
        self.set_hard_bounce()
        last_bounce = self.profile.last_bounce_date
        assert last_bounce == self.profile.last_hard_bounce

    def test_hard_and_soft_bounces_returns_hard_date(self) -> None:
        """With both bounces set, last_bounce_date equals last_hard_bounce."""
        self.set_soft_bounce()
        self.set_hard_bounce()
        last_bounce = self.profile.last_bounce_date
        assert last_bounce == self.profile.last_hard_bounce
532

533

534
class ProfileHasPremiumTest(ProfileTestCase):
    """Tests for Profile.has_premium"""

    def test_default_False(self) -> None:
        """A fresh profile does not have premium."""
        has_premium = self.profile.has_premium
        assert has_premium is False

    def test_premium_subscription_returns_True(self) -> None:
        """A premium subscription gives has_premium."""
        self.upgrade_to_premium()
        has_premium = self.profile.has_premium
        assert has_premium is True

    def test_phone_returns_True(self) -> None:
        """The phone upgrade gives has_premium."""
        self.upgrade_to_phone()
        has_premium = self.profile.has_premium
        assert has_premium is True

    def test_vpn_bundle_returns_True(self) -> None:
        """The VPN bundle upgrade gives has_premium."""
        self.upgrade_to_vpn_bundle()
        has_premium = self.profile.has_premium
        assert has_premium is True
551

552

553
class ProfileHasPhoneTest(ProfileTestCase):
    """Tests for Profile.has_phone"""

    def test_default_False(self) -> None:
        """A fresh profile does not have phone."""
        has_phone = self.profile.has_phone
        assert has_phone is False

    def test_premium_subscription_returns_False(self) -> None:
        """A premium-only subscription does not give has_phone."""
        self.upgrade_to_premium()
        has_phone = self.profile.has_phone
        assert has_phone is False

    def test_phone_returns_True(self) -> None:
        """The phone upgrade gives has_phone."""
        self.upgrade_to_phone()
        has_phone = self.profile.has_phone
        assert has_phone is True

    def test_vpn_bundle_returns_True(self) -> None:
        """The VPN bundle upgrade gives has_phone."""
        self.upgrade_to_vpn_bundle()
        has_phone = self.profile.has_phone
        assert has_phone is True
570

571

572
@pytest.mark.skipif(not settings.PHONES_ENABLED, reason="PHONES_ENABLED is False")
@override_settings(PHONES_NO_CLIENT_CALLS_IN_TEST=True)
class ProfileDatePhoneRegisteredTest(ProfileTestCase):
    """Tests for Profile.date_phone_registered"""

    def _create_verified_real_phone(self, user: User, verified_date: datetime):
        """Create and return a verified RealPhone for the user with the given date."""
        return RealPhone.objects.create(
            user=user,
            number="+12223334444",
            verified=True,
            verified_date=verified_date,
        )

    def test_default_None(self) -> None:
        """Without any phone records, date_phone_registered is None."""
        assert self.profile.date_phone_registered is None

    def test_real_phone_no_relay_number_returns_verified_date(self) -> None:
        """With only a verified RealPhone, its verified_date is returned."""
        self.upgrade_to_phone()
        verified_at = datetime.now(UTC)
        self._create_verified_real_phone(self.profile.user, verified_at)
        assert self.profile.date_phone_registered == verified_at

    def test_real_phone_and_relay_number_w_created_at_returns_created_at_date(
        self,
    ) -> None:
        """With a RelayNumber that has created_at, created_at is returned."""
        self.upgrade_to_phone()
        verified_at = datetime.now(UTC)
        phone_user = self.profile.user
        self._create_verified_real_phone(phone_user, verified_at)
        relay_number = RelayNumber.objects.create(user=phone_user)
        assert self.profile.date_phone_registered == relay_number.created_at

    def test_real_phone_and_relay_number_wo_created_at_returns_verified_date(
        self,
    ) -> None:
        """With a RelayNumber lacking created_at, the verified_date is returned."""
        self.upgrade_to_phone()
        verified_at = datetime.now(UTC)
        phone_user = self.profile.user
        real_phone = self._create_verified_real_phone(phone_user, verified_at)
        relay_number = RelayNumber.objects.create(user=phone_user)
        # since created_at is auto set, update to None
        relay_number.created_at = None
        relay_number.save()
        assert self.profile.date_phone_registered == real_phone.verified_date
623

624

625
class ProfileTotalMasksTest(ProfileTestCase):
    """Tests for Profile.total_masks"""

    def test_total_masks(self) -> None:
        """total_masks counts the user's relay addresses plus domain addresses."""
        self.upgrade_to_premium()
        self.profile.add_subdomain("totalmasks")
        assert self.profile.total_masks == 0

        # Fixed, non-zero counts: with random counts either loop could run zero
        # times, so the same lines were covered on some runs and not on others.
        num_relay_addresses = 2
        baker.make(RelayAddress, user=self.profile.user, _quantity=num_relay_addresses)
        assert self.profile.total_masks == num_relay_addresses

        num_domain_addresses = 2
        for i in range(num_domain_addresses):
            # Each domain address gets its own explicit address value.
            baker.make(DomainAddress, user=self.profile.user, address=f"mask{i}")
        assert self.profile.total_masks == num_relay_addresses + num_domain_addresses
639

640

641
class ProfileAtMaskLimitTest(ProfileTestCase):
    """Tests for Profile.at_mask_limit"""

    def _make_free_limit_of_masks(self) -> None:
        """Create MAX_NUM_FREE_ALIASES relay addresses for the test user."""
        baker.make(
            RelayAddress,
            user=self.profile.user,
            _quantity=settings.MAX_NUM_FREE_ALIASES,
        )

    def test_premium_user_returns_False(self) -> None:
        """A premium user is not at the limit, even with the free maximum."""
        self.upgrade_to_premium()
        assert self.profile.at_mask_limit is False
        self._make_free_limit_of_masks()
        assert self.profile.at_mask_limit is False

    def test_free_user(self) -> None:
        """A free user reaches the limit at MAX_NUM_FREE_ALIASES addresses."""
        assert self.profile.at_mask_limit is False
        self._make_free_limit_of_masks()
        assert self.profile.at_mask_limit is True
662

663

664
class ProfileAddSubdomainTest(ProfileTestCase):
    """Tests for Profile.add_subdomain()"""

    def test_new_unlimited_profile(self) -> None:
        """A premium user can register a new subdomain."""
        self.upgrade_to_premium()
        registered = self.profile.add_subdomain("newpremium")
        assert registered == "newpremium"

    def test_lowercases_subdomain_value(self) -> None:
        """The returned subdomain is the lowercased form of the request."""
        self.upgrade_to_premium()
        registered = self.profile.add_subdomain("mIxEdcAsE")
        assert registered == "mixedcase"

    def test_non_premium_user_raises_exception(self) -> None:
        """Without a premium upgrade, add_subdomain is refused."""
        with self.assertRaisesMessage(
            CannotMakeSubdomainException, "error-premium-set-subdomain"
        ):
            self.profile.add_subdomain("test")

    def test_calling_again_raises_exception(self) -> None:
        """A profile that already has a subdomain cannot add one again."""
        self.upgrade_to_premium()
        existing = "test"
        self.profile.subdomain = existing
        self.profile.save()

        with self.assertRaisesMessage(
            CannotMakeSubdomainException, "error-premium-cannot-change-subdomain"
        ):
            self.profile.add_subdomain(existing)

    def test_badword_subdomain_raises_exception(self) -> None:
        """The subdomain "angry" is reported as not available."""
        self.upgrade_to_premium()
        with self.assertRaisesMessage(
            CannotMakeSubdomainException, "error-subdomain-not-available"
        ):
            self.profile.add_subdomain("angry")

    def test_blocked_word_subdomain_raises_exception(self) -> None:
        """The subdomain "mozilla" is reported as not available."""
        self.upgrade_to_premium()
        with self.assertRaisesMessage(
            CannotMakeSubdomainException, "error-subdomain-not-available"
        ):
            self.profile.add_subdomain("mozilla")

    def test_empty_subdomain_raises(self) -> None:
        """An empty string is rejected."""
        self.upgrade_to_premium()
        with self.assertRaisesMessage(
            CannotMakeSubdomainException, "error-subdomain-cannot-be-empty-or-null"
        ):
            self.profile.add_subdomain("")

    def test_null_subdomain_raises(self) -> None:
        """None is rejected with the same message as an empty string."""
        self.upgrade_to_premium()
        with self.assertRaisesMessage(
            CannotMakeSubdomainException, "error-subdomain-cannot-be-empty-or-null"
        ):
            self.profile.add_subdomain(None)

    def test_subdomain_with_space_at_end_raises(self) -> None:
        """A subdomain with a trailing space is reported as not available."""
        self.upgrade_to_premium()
        with self.assertRaisesMessage(
            CannotMakeSubdomainException, "error-subdomain-not-available"
        ):
            self.profile.add_subdomain("mydomain ")
1✔
722

723

724
class ProfileSaveTest(ProfileTestCase):
    """Tests for Profile.save()"""

    # Label values written to test addresses, so tests can check whether
    # Profile.save() kept or cleared them.
    TEST_DESCRIPTION = "test description"
    TEST_USED_ON = TEST_GENERATED_FOR = "secret.com"

    def test_lowercases_subdomain_value(self) -> None:
        """A mixed-case subdomain is lowercased by save()."""
        self.upgrade_to_premium()
        self.profile.subdomain = "mIxEdcAsE"
        self.profile.save()
        assert self.profile.subdomain == "mixedcase"

    def test_lowercases_subdomain_value_with_update_fields(self) -> None:
        """With update_fields, the subdomain is still lowercased."""
        self.upgrade_to_premium()
        assert self.profile.subdomain is None

        # Use QuerySet.update to avoid model .save()
        Profile.objects.filter(id=self.profile.id).update(subdomain="mIxEdcAsE")
        self.profile.refresh_from_db()
        assert self.profile.subdomain == "mIxEdcAsE"

        # Update a different field with update_fields to avoid a full model save
        new_date_subscribed = datetime(2023, 3, 3, tzinfo=UTC)
        self.profile.date_subscribed = new_date_subscribed
        self.profile.save(update_fields={"date_subscribed"})

        # Since .save() added to update_fields, subdomain is now lowercase
        self.profile.refresh_from_db()
        assert self.profile.date_subscribed == new_date_subscribed
        assert self.profile.subdomain == "mixedcase"

    def add_relay_address(self) -> RelayAddress:
        """Create one labelled RelayAddress for the test profile's user."""
        return baker.make(
            RelayAddress,
            user=self.profile.user,
            description=self.TEST_DESCRIPTION,
            generated_for=self.TEST_GENERATED_FOR,
            used_on=self.TEST_USED_ON,
        )

    def add_domain_address(self) -> DomainAddress:
        """Upgrade to premium, set a subdomain, and create a labelled DomainAddress."""
        self.upgrade_to_premium()
        self.profile.subdomain = "somesubdomain"
        self.profile.save()
        return baker.make(
            DomainAddress,
            user=self.profile.user,
            address="localpart",
            description=self.TEST_DESCRIPTION,
            used_on=self.TEST_USED_ON,
        )

    def test_save_server_storage_true_doesnt_delete_data(self) -> None:
        """Saving with server_storage=True keeps the address labels."""
        relay_address = self.add_relay_address()
        self.profile.server_storage = True
        self.profile.save()

        relay_address.refresh_from_db()
        assert relay_address.description == self.TEST_DESCRIPTION
        assert relay_address.generated_for == self.TEST_GENERATED_FOR
        assert relay_address.used_on == self.TEST_USED_ON

    def test_save_server_storage_false_deletes_data(self) -> None:
        """Saving with server_storage=False blanks relay and domain labels."""
        relay_address = self.add_relay_address()
        domain_address = self.add_domain_address()
        self.profile.server_storage = False
        self.profile.save()

        relay_address.refresh_from_db()
        domain_address.refresh_from_db()
        assert relay_address.description == ""
        assert relay_address.generated_for == ""
        assert relay_address.used_on == ""
        assert domain_address.description == ""
        assert domain_address.used_on == ""

    def add_four_relay_addresses(self, user: User | None = None) -> list[RelayAddress]:
        """Create four labelled RelayAddresses for user (default: the test user)."""
        if user is None:
            user = self.profile.user
        return baker.make(
            RelayAddress,
            user=user,
            description=self.TEST_DESCRIPTION,
            generated_for=self.TEST_GENERATED_FOR,
            used_on=self.TEST_USED_ON,
            _quantity=4,
        )

    def test_save_server_storage_false_deletes_ALL_data(self) -> None:
        """Saving with server_storage=False blanks labels on every relay address."""
        self.add_four_relay_addresses()
        self.profile.server_storage = False
        self.profile.save()

        relay_addresses = list(RelayAddress.objects.filter(user=self.profile.user))
        # Without this check, the loop below would pass on an empty result
        assert len(relay_addresses) == 4
        for relay_address in relay_addresses:
            assert relay_address.description == ""
            assert relay_address.generated_for == ""
            assert relay_address.used_on == ""

    def test_save_server_storage_false_only_deletes_that_profiles_data(self) -> None:
        """Blanking one profile's labels leaves another user's labels intact."""
        other_user = make_free_test_user()
        assert other_user.profile.server_storage is True
        self.add_four_relay_addresses()
        self.add_four_relay_addresses(user=other_user)
        self.profile.server_storage = False
        self.profile.save()

        own_addresses = list(RelayAddress.objects.filter(user=self.profile.user))
        # Without these length checks, the loops would pass on empty results
        assert len(own_addresses) == 4
        for relay_address in own_addresses:
            assert relay_address.description == ""
            assert relay_address.generated_for == ""
            assert relay_address.used_on == ""

        other_addresses = list(RelayAddress.objects.filter(user=other_user))
        assert len(other_addresses) == 4
        for relay_address in other_addresses:
            assert relay_address.description == self.TEST_DESCRIPTION
            assert relay_address.generated_for == self.TEST_GENERATED_FOR
            assert relay_address.used_on == self.TEST_USED_ON
1✔
839

840

841
class ProfileDisplayNameTest(ProfileTestCase):
    """Tests for Profile.display_name"""

    def test_exists(self) -> None:
        """display_name returns the displayName saved in the social account."""
        expected = "Display Name"
        account = self.get_or_create_social_account()
        account.extra_data["displayName"] = expected
        account.save()
        assert self.profile.display_name == expected

    def test_display_name_does_not_exist(self) -> None:
        """display_name is None when the social account has no displayName set."""
        self.get_or_create_social_account()
        assert self.profile.display_name is None
1✔
854

855

856
class ProfileLanguageTest(ProfileTestCase):
    """Test Profile.language"""

    def test_no_fxa_extra_data_locale_returns_default_en(self) -> None:
        """A social account without a locale entry yields "en"."""
        account = self.get_or_create_social_account()
        assert "locale" not in account.extra_data
        assert self.profile.language == "en"

    def test_no_fxa_locale_returns_default_en(self) -> None:
        """With no social account created by the test, the language is "en"."""
        assert self.profile.language == "en"

    def test_fxa_locale_de_returns_de(self) -> None:
        """A locale list that starts with "de" yields "de"."""
        account = self.get_or_create_social_account()
        account.extra_data["locale"] = "de,en-US;q=0.9,en;q=0.8"
        account.save()
        assert self.profile.language == "de"
1✔
872

873

874
class ProfileFxaLocaleInPremiumCountryTest(ProfileTestCase):
    """Tests for Profile.fxa_locale_in_premium_country"""

    def set_fxa_locale(self, locale: str) -> None:
        """Store locale in the social account's extra_data and save it."""
        social_account = self.get_or_create_social_account()
        social_account.extra_data["locale"] = locale
        social_account.save()

    def test_when_premium_available_returns_True(self) -> None:
        """A de-DE locale is in a premium country."""
        self.set_fxa_locale("de-DE,en-xx;q=0.9,en;q=0.8")
        assert self.profile.fxa_locale_in_premium_country is True

    def test_en_implies_premium_available(self) -> None:
        """A bare "en" locale counts as a premium country."""
        self.set_fxa_locale("en;q=0.8")
        assert self.profile.fxa_locale_in_premium_country is True

    def test_when_premium_unavailable_returns_False(self) -> None:
        """An en-IN locale is not in a premium country."""
        self.set_fxa_locale("en-IN, en;q=0.8")
        assert self.profile.fxa_locale_in_premium_country is False

    def test_when_premium_available_by_language_code_returns_True(self) -> None:
        """A bare "de" language code counts as a premium country."""
        self.set_fxa_locale("de;q=0.8")
        assert self.profile.fxa_locale_in_premium_country is True

    def test_invalid_language_code_returns_False(self) -> None:
        """The made-up language code "xx" is not in a premium country."""
        self.set_fxa_locale("xx;q=0.8")
        assert self.profile.fxa_locale_in_premium_country is False

    def test_when_premium_unavailable_by_language_code_returns_False(self) -> None:
        """A bare "zh" language code is not in a premium country."""
        self.set_fxa_locale("zh;q=0.8")
        assert self.profile.fxa_locale_in_premium_country is False

    def test_no_fxa_account_returns_False(self) -> None:
        """With no social account created by the test, the result is False."""
        assert self.profile.fxa_locale_in_premium_country is False

    # Annotated with -> None like every other test in this class, so static
    # type checkers treat it the same way.
    def test_in_estonia(self) -> None:
        """Estonia (EE) was added in August 2023."""
        self.set_fxa_locale("et-ee,et;q=0.8")
        assert self.profile.fxa_locale_in_premium_country is True
1✔
913

914

915
class ProfileJoinedBeforePremiumReleaseTest(ProfileTestCase):
    """Tests for Profile.joined_before_premium_release"""

    def test_returns_True(self) -> None:
        """A user who joined on 2021-10-18 joined before the premium release."""
        joined = datetime.fromisoformat("2021-10-18 17:00:00+00:00")
        self.profile.user.date_joined = joined
        assert self.profile.joined_before_premium_release

    def test_returns_False(self) -> None:
        """A user who joined on 2021-10-28 did not join before the release."""
        joined = datetime.fromisoformat("2021-10-28 17:00:00+00:00")
        self.profile.user.date_joined = joined
        assert self.profile.joined_before_premium_release is False
1✔
927

928

929
class ProfileDefaultsTest(ProfileTestCase):
    """Tests for default Profile values"""

    def test_user_created_after_premium_release_server_storage_True(self) -> None:
        """A freshly created profile has server_storage switched on."""
        fresh_profile = self.profile
        assert fresh_profile.server_storage

    def test_emails_replied_new_user_aggregates_sum_of_replies_to_zero(self) -> None:
        """A freshly created profile reports zero replied emails."""
        fresh_profile = self.profile
        assert fresh_profile.emails_replied == 0
1✔
937

938

939
class ProfileEmailsRepliedTest(ProfileTestCase):
    """Tests for Profile.emails_replied"""

    def test_premium_user_aggregates_replies_from_all_addresses(self) -> None:
        """Replies from deleted, relay and domain addresses are all summed."""
        self.upgrade_to_premium()
        self.profile.subdomain = "test"
        self.profile.num_email_replied_in_deleted_address = 1
        self.profile.save()

        owner = self.profile.user
        baker.make(RelayAddress, user=owner, num_replied=3)
        baker.make(DomainAddress, user=owner, address="lower-case", num_replied=5)

        # 1 (deleted addresses) + 3 (relay address) + 5 (domain address)
        assert self.profile.emails_replied == 9

    def test_free_user_aggregates_replies_from_relay_addresses(self) -> None:
        """Replies from a free user's relay addresses are summed."""
        owner = self.profile.user
        for replies in (3, 5):
            baker.make(RelayAddress, user=owner, num_replied=replies)

        assert self.profile.emails_replied == 8
1✔
959

960

961
class ProfileUpdateAbuseMetricTest(ProfileTestCase):
    """Tests for Profile.update_abuse_metric()"""

    def setUp(self) -> None:
        """Create an AbuseMetrics row and patch the logger and clock in emails.models."""
        super().setUp()
        self.get_or_create_social_account()
        self.abuse_metric = baker.make(AbuseMetrics, user=self.profile.user)

        patcher_logger = patch("emails.models.abuse_logger.info")
        self.mocked_abuse_info = patcher_logger.start()
        self.addCleanup(patcher_logger.stop)

        # Selectively patch datetime.now() for emails models
        # https://docs.python.org/3/library/unittest.mock-examples.html#partial-mocking
        patcher = patch("emails.models.datetime")
        mocked_datetime = patcher.start()
        self.addCleanup(patcher.stop)

        # Take one instant and derive the mocked combine() date from it, so the
        # mocked now() and the mocked combine() cannot land on different days
        # when the test runs across midnight UTC.
        self.expected_now = datetime.now(UTC)
        mocked_datetime.combine.return_value = datetime.combine(
            self.expected_now.date(), datetime.min.time()
        )
        mocked_datetime.now.return_value = self.expected_now
        # Calling the patched name still constructs real datetime objects
        mocked_datetime.side_effect = lambda *args, **kw: datetime(*args, **kw)

    @override_settings(MAX_FORWARDED_PER_DAY=5)
    def test_flags_profile_when_emails_forwarded_abuse_threshold_met(self) -> None:
        """The forward that reaches MAX_FORWARDED_PER_DAY flags the profile."""
        self.abuse_metric.num_email_forwarded_per_day = 4
        self.abuse_metric.save()
        assert self.profile.last_account_flagged is None

        self.profile.update_abuse_metric(email_forwarded=True)
        self.abuse_metric.refresh_from_db()

        assert self.profile.fxa
        self.mocked_abuse_info.assert_called_once_with(
            "Abuse flagged",
            extra={
                "uid": self.profile.fxa.uid,
                "flagged": self.expected_now.timestamp(),
                "replies": 0,
                "addresses": 0,
                "forwarded": 5,
                "forwarded_size_in_bytes": 0,
            },
        )
        assert self.abuse_metric.num_email_forwarded_per_day == 5
        assert self.profile.last_account_flagged == self.expected_now

    @override_settings(MAX_FORWARDED_EMAIL_SIZE_PER_DAY=100)
    def test_flags_profile_when_forwarded_email_size_abuse_threshold_met(self) -> None:
        """Reaching MAX_FORWARDED_EMAIL_SIZE_PER_DAY bytes flags the profile."""
        self.abuse_metric.forwarded_email_size_per_day = 50
        self.abuse_metric.save()
        assert self.profile.last_account_flagged is None

        self.profile.update_abuse_metric(forwarded_email_size=50)
        self.abuse_metric.refresh_from_db()

        assert self.profile.fxa
        self.mocked_abuse_info.assert_called_once_with(
            "Abuse flagged",
            extra={
                "uid": self.profile.fxa.uid,
                "flagged": self.expected_now.timestamp(),
                "replies": 0,
                "addresses": 0,
                "forwarded": 0,
                "forwarded_size_in_bytes": 100,
            },
        )
        assert self.abuse_metric.forwarded_email_size_per_day == 100
        assert self.profile.last_account_flagged == self.expected_now
1✔
1033

1034

1035
class ProfileMetricsEnabledTest(ProfileTestCase):
    """Tests for Profile.metrics_enabled"""

    def test_no_fxa_means_metrics_enabled(self) -> None:
        """A profile with no fxa account has metrics enabled."""
        assert not self.profile.fxa
        assert self.profile.metrics_enabled

    def test_fxa_legacy_means_metrics_enabled(self) -> None:
        """An account without a metricsEnabled entry has metrics enabled."""
        self.get_or_create_social_account()
        fxa = self.profile.fxa
        assert fxa
        assert "metricsEnabled" not in fxa.extra_data
        assert self.profile.metrics_enabled

    def test_fxa_opt_in_means_metrics_enabled(self) -> None:
        """metricsEnabled=True in extra_data keeps metrics enabled."""
        account = self.get_or_create_social_account()
        account.extra_data["metricsEnabled"] = True
        account.save()
        assert self.profile.fxa
        assert self.profile.metrics_enabled

    def test_fxa_opt_out_means_metrics_disabled(self) -> None:
        """metricsEnabled=False in extra_data disables metrics."""
        account = self.get_or_create_social_account()
        account.extra_data["metricsEnabled"] = False
        account.save()
        assert self.profile.fxa
        assert not self.profile.metrics_enabled
1✔
1059

1060

1061
class ProfilePlanTest(ProfileTestCase):
    """Tests for Profile.plan"""

    def test_free_user(self) -> None:
        """A profile with no upgrade is on the "free" plan."""
        plan = self.profile.plan
        assert plan == "free"

    def test_premium_user(self) -> None:
        """After upgrade_to_premium() the plan is "email"."""
        self.upgrade_to_premium()
        plan = self.profile.plan
        assert plan == "email"

    def test_phone_user(self) -> None:
        """After upgrade_to_phone() the plan is "phone"."""
        self.upgrade_to_phone()
        plan = self.profile.plan
        assert plan == "phone"

    def test_vpn_bundle_user(self) -> None:
        """After upgrade_to_vpn_bundle() the plan is "bundle"."""
        self.upgrade_to_vpn_bundle()
        plan = self.profile.plan
        assert plan == "bundle"
1✔
1076

1077

1078
class ProfilePlanTermTest(ProfileTestCase):
    """Tests for Profile.plan_term"""

    def test_free_user(self) -> None:
        """A profile with no upgrade has no plan term."""
        assert self.profile.plan_term is None

    def test_premium_user(self) -> None:
        """After upgrade_to_premium() the term is "unknown"."""
        self.upgrade_to_premium()
        assert self.profile.plan_term == "unknown"

    def test_phone_user(self) -> None:
        """After upgrade_to_phone() with no dates set by the test, the term is "unknown"."""
        self.upgrade_to_phone()
        assert self.profile.plan_term == "unknown"

    def test_phone_user_1_month(self) -> None:
        """A phone subscription from 2024-01-01 to 2024-02-01 is "1_month"."""
        self.upgrade_to_phone()
        starts = datetime(2024, 1, 1, tzinfo=UTC)
        ends = datetime(2024, 2, 1, tzinfo=UTC)
        self.profile.date_phone_subscription_start = starts
        self.profile.date_phone_subscription_end = ends
        assert self.profile.plan_term == "1_month"

    def test_phone_user_1_year(self) -> None:
        """A phone subscription from 2024-01-01 to 2025-01-01 is "1_year"."""
        self.upgrade_to_phone()
        starts = datetime(2024, 1, 1, tzinfo=UTC)
        ends = datetime(2025, 1, 1, tzinfo=UTC)
        self.profile.date_phone_subscription_start = starts
        self.profile.date_phone_subscription_end = ends
        assert self.profile.plan_term == "1_year"

    def test_vpn_bundle_user(self) -> None:
        """After upgrade_to_vpn_bundle() the term is "unknown"."""
        self.upgrade_to_vpn_bundle()
        assert self.profile.plan_term == "unknown"
1✔
1107

1108

1109
class ProfileMetricsPremiumStatus(ProfileTestCase):
    """Tests for Profile.metrics_premium_status"""

    # Annotated with -> None like every other test in this class, so static
    # type checkers treat it the same way.
    def test_free_user(self) -> None:
        """A profile with no upgrade reports "free"."""
        assert self.profile.metrics_premium_status == "free"

    def test_premium_user(self) -> None:
        """After upgrade_to_premium() the status is "email_unknown"."""
        self.upgrade_to_premium()
        assert self.profile.metrics_premium_status == "email_unknown"

    def test_phone_user(self) -> None:
        """After upgrade_to_phone() with no dates set by the test, the status is "phone_unknown"."""
        self.upgrade_to_phone()
        assert self.profile.metrics_premium_status == "phone_unknown"

    def test_phone_user_1_month(self) -> None:
        """A phone subscription from 2024-01-01 to 2024-02-01 is "phone_1_month"."""
        self.upgrade_to_phone()
        self.profile.date_phone_subscription_start = datetime(2024, 1, 1, tzinfo=UTC)
        self.profile.date_phone_subscription_end = datetime(2024, 2, 1, tzinfo=UTC)
        assert self.profile.metrics_premium_status == "phone_1_month"

    def test_phone_user_1_year(self) -> None:
        """A phone subscription from 2024-01-01 to 2025-01-01 is "phone_1_year"."""
        self.upgrade_to_phone()
        self.profile.date_phone_subscription_start = datetime(2024, 1, 1, tzinfo=UTC)
        self.profile.date_phone_subscription_end = datetime(2025, 1, 1, tzinfo=UTC)
        assert self.profile.metrics_premium_status == "phone_1_year"

    def test_vpn_bundle_user(self) -> None:
        """After upgrade_to_vpn_bundle() the status is "bundle_unknown"."""
        self.upgrade_to_vpn_bundle()
        assert self.profile.metrics_premium_status == "bundle_unknown"
1✔
1138

1139

1140
class DomainAddressTest(TestCase):
1✔
1141
    def setUp(self):
1✔
1142
        self.subdomain = "test"
1✔
1143
        self.user = make_premium_test_user()
1✔
1144
        self.storageless_user = make_storageless_test_user()
1✔
1145
        self.user.profile.subdomain = self.subdomain
1✔
1146
        self.user.profile.save()
1✔
1147

1148
    def test_make_domain_address_assigns_to_user(self):
1✔
1149
        domain_address = DomainAddress.make_domain_address(self.user, "test-assigns")
1✔
1150
        assert domain_address.user == self.user
1✔
1151

1152
    @skip(reason="test not reliable, look at FIXME comment")
1✔
1153
    def test_make_domain_address_makes_different_addresses(self):
1✔
1154
        # FIXME: sometimes this test will fail because it randomly generates
1155
        # alias with bad words. See make_domain_address for why this has
1156
        # not been fixed yet
1157
        for i in range(5):
×
1158
            domain_address = DomainAddress.make_domain_address(
×
1159
                self.user, f"test-different-{i}"
1160
            )
1161
            assert domain_address.first_emailed_at is None
×
1162
        domain_addresses = DomainAddress.objects.filter(user=self.user).values_list(
×
1163
            "address", flat=True
1164
        )
1165
        # checks that there are 5 unique DomainAddress
1166
        assert len(set(domain_addresses)) == 5
×
1167

1168
    def test_make_domain_address_makes_requested_address(self):
1✔
1169
        domain_address = DomainAddress.make_domain_address(self.user, "foobar")
1✔
1170
        assert domain_address.address == "foobar"
1✔
1171
        assert domain_address.first_emailed_at is None
1✔
1172

1173
    @override_settings(MAX_ADDRESS_CREATION_PER_DAY=10)
1✔
1174
    def test_make_domain_address_has_limit(self) -> None:
1✔
1175
        for i in range(10):
1✔
1176
            DomainAddress.make_domain_address(self.user, "foobar" + str(i))
1✔
1177
        with pytest.raises(CannotMakeAddressException) as exc_info:
1✔
1178
            DomainAddress.make_domain_address(self.user, "one-too-many")
1✔
1179
        assert exc_info.value.get_codes() == "account_is_paused"
1✔
1180
        domain_address_count = DomainAddress.objects.filter(user=self.user).count()
1✔
1181
        assert domain_address_count == 10
1✔
1182

1183
    def test_make_domain_address_makes_requested_address_via_email(self):
1✔
1184
        domain_address = DomainAddress.make_domain_address(self.user, "foobar", True)
1✔
1185
        assert domain_address.address == "foobar"
1✔
1186
        assert domain_address.first_emailed_at is not None
1✔
1187

1188
    def test_make_domain_address_non_premium_user(self) -> None:
1✔
1189
        non_premium_user = baker.make(User)
1✔
1190
        with pytest.raises(CannotMakeAddressException) as exc_info:
1✔
1191
            DomainAddress.make_domain_address(non_premium_user, "test-non-premium")
1✔
1192
        assert exc_info.value.get_codes() == "free_tier_no_subdomain_masks"
1✔
1193

1194
    def test_make_domain_address_can_make_blocklisted_address(self):
1✔
1195
        domain_address = DomainAddress.make_domain_address(self.user, "testing")
1✔
1196
        assert domain_address.address == "testing"
1✔
1197

1198
    def test_make_domain_address_valid_premium_user_with_no_subdomain(self) -> None:
1✔
1199
        user = baker.make(User)
1✔
1200
        baker.make(
1✔
1201
            SocialAccount,
1202
            user=user,
1203
            provider="fxa",
1204
            extra_data={"subscriptions": [premium_subscription()]},
1205
        )
1206
        with pytest.raises(CannotMakeAddressException) as exc_info:
1✔
1207
            DomainAddress.make_domain_address(user, "test-nosubdomain")
1✔
1208
        assert exc_info.value.get_codes() == "need_subdomain"
1✔
1209

1210
    def test_make_domain_address_dupe_of_existing_raises(self):
1✔
1211
        address = "same-address"
1✔
1212
        DomainAddress.make_domain_address(self.user, address=address)
1✔
1213
        with pytest.raises(DomainAddrDuplicateException) as exc_info:
1✔
1214
            DomainAddress.make_domain_address(self.user, address=address)
1✔
1215
        assert exc_info.value.get_codes() == "duplicate_address"
1✔
1216

1217
    @override_flag("custom_domain_management_redesign", active=False)
1✔
1218
    def test_make_domain_address_can_make_dupe_of_deleted(self) -> None:
1✔
1219
        address = "same-address"
1✔
1220
        domain_address = DomainAddress.make_domain_address(self.user, address=address)
1✔
1221
        domain_address_hash = address_hash(
1✔
1222
            domain_address.address,
1223
            domain_address.user.profile.subdomain,
1224
            domain_address.domain_value,
1225
        )
1226
        domain_address.delete()
1✔
1227
        dupe_domain_address = DomainAddress.make_domain_address(
1✔
1228
            self.user, address=address
1229
        )
1230
        assert (
1✔
1231
            DeletedAddress.objects.filter(address_hash=domain_address_hash).count() == 1
1232
        )
1233
        assert dupe_domain_address.full_address == domain_address.full_address
1✔
1234

1235
    @override_flag("custom_domain_management_redesign", active=True)
1✔
1236
    def test_make_domain_address_cannot_make_dupe_of_deleted(self) -> None:
1✔
1237
        address = "same-address"
1✔
1238
        domain_address = DomainAddress.make_domain_address(self.user, address=address)
1✔
1239
        domain_address_hash = address_hash(
1✔
1240
            domain_address.address,
1241
            domain_address.user.profile.subdomain,
1242
            domain_address.domain_value,
1243
        )
1244
        domain_address.delete()
1✔
1245
        with pytest.raises(DomainAddrUnavailableException) as exc_info:
1✔
1246
            DomainAddress.make_domain_address(self.user, address=address)
1✔
1247
        assert exc_info.value.get_codes() == "address_unavailable"
1✔
1248
        assert (
1✔
1249
            DeletedAddress.objects.filter(address_hash=domain_address_hash).count() == 1
1250
        )
1251

1252
    @patch("emails.models.address_default")
1✔
1253
    def test_make_domain_address_doesnt_randomly_generate_bad_word(
1✔
1254
        self, address_default_mocked: Mock
1255
    ) -> None:
1256
        address_default_mocked.return_value = "angry0123"
1✔
1257
        with pytest.raises(CannotMakeAddressException) as exc_info:
1✔
1258
            DomainAddress.make_domain_address(self.user)
1✔
1259
        assert exc_info.value.get_codes() == "address_unavailable"
1✔
1260

1261
    def test_delete_adds_deleted_address_object(self):
1✔
1262
        domain_address = baker.make(DomainAddress, address="lower-case", user=self.user)
1✔
1263
        domain_address_hash = sha256(
1✔
1264
            domain_address.full_address.encode("utf-8")
1265
        ).hexdigest()
1266
        domain_address.delete()
1✔
1267
        deleted_address_qs = DeletedAddress.objects.filter(
1✔
1268
            address_hash=domain_address_hash
1269
        )
1270
        assert deleted_address_qs.count() == 1
1✔
1271
        assert deleted_address_qs.get().address_hash == domain_address_hash
1✔
1272

1273
    def test_premium_user_can_set_block_list_emails(self):
1✔
1274
        domain_address = DomainAddress.objects.create(
1✔
1275
            user=self.user, address="lower-case"
1276
        )
1277
        assert domain_address.block_list_emails is False
1✔
1278
        domain_address.block_list_emails = True
1✔
1279
        domain_address.save()
1✔
1280
        domain_address.refresh_from_db()
1✔
1281
        assert domain_address.block_list_emails is True
1✔
1282

1283
    def test_delete_increments_values_on_profile(self):
1✔
1284
        profile = self.user.profile
1✔
1285
        assert profile.num_address_deleted == 0
1✔
1286
        assert profile.num_email_forwarded_in_deleted_address == 0
1✔
1287
        assert profile.num_email_blocked_in_deleted_address == 0
1✔
1288
        assert profile.num_level_one_trackers_blocked_in_deleted_address == 0
1✔
1289
        assert profile.num_email_replied_in_deleted_address == 0
1✔
1290
        assert profile.num_email_spam_in_deleted_address == 0
1✔
1291
        assert profile.num_deleted_relay_addresses == 0
1✔
1292
        assert profile.num_deleted_domain_addresses == 0
1✔
1293

1294
        domain_address = DomainAddress.objects.create(
1✔
1295
            user=self.user,
1296
            address="lower-case",
1297
            num_forwarded=2,
1298
            num_blocked=3,
1299
            num_level_one_trackers_blocked=4,
1300
            num_replied=5,
1301
            num_spam=6,
1302
        )
1303
        domain_address.delete()
1✔
1304

1305
        profile.refresh_from_db()
1✔
1306
        assert profile.num_address_deleted == 1
1✔
1307
        assert profile.num_email_forwarded_in_deleted_address == 2
1✔
1308
        assert profile.num_email_blocked_in_deleted_address == 3
1✔
1309
        assert profile.num_level_one_trackers_blocked_in_deleted_address == 4
1✔
1310
        assert profile.num_email_replied_in_deleted_address == 5
1✔
1311
        assert profile.num_email_spam_in_deleted_address == 6
1✔
1312
        assert profile.num_deleted_relay_addresses == 0
1✔
1313
        assert profile.num_deleted_domain_addresses == 1
1✔
1314

1315
    def test_formerly_premium_user_clears_block_list_emails(self):
1✔
1316
        domain_address = DomainAddress.objects.create(
1✔
1317
            user=self.user, address="coupons", block_list_emails=True
1318
        )
1319
        domain_address.refresh_from_db()
1✔
1320
        assert domain_address.block_list_emails is True
1✔
1321

1322
        # Remove premium from user
1323
        assert (fxa_account := self.user.profile.fxa) is not None
1✔
1324
        fxa_account.extra_data["subscriptions"] = []
1✔
1325
        fxa_account.save()
1✔
1326
        assert not self.user.profile.has_premium
1✔
1327

1328
        domain_address.save()
1✔
1329
        assert domain_address.block_list_emails is False
1✔
1330

1331
    def test_storageless_user_cant_set_labels(self):
        """A description saved for a storageless user comes back empty."""
        label_free_address = DomainAddress.objects.create(
            user=self.storageless_user, address="lower-case"
        )
        assert label_free_address.description == ""

        # Attempt to store a label through the regular model save path
        label_free_address.description = "Arbitrary description"
        label_free_address.save()

        label_free_address.refresh_from_db()
        assert label_free_address.description == ""
1✔
1340

1341
    def test_clear_storage_with_update_fields(self) -> None:
        """Saving with update_fields still wipes stored data for storageless users."""
        address = DomainAddress.objects.create(
            user=self.storageless_user, address="no-storage"
        )
        assert address.description == ""
        assert address.used_on is None

        # Write the storage fields with QuerySet.update, which skips Model.save()
        stored_values = {
            "description": "the description",
            "used_on": "https://example.com",
        }
        DomainAddress.objects.filter(id=address.id).update(**stored_values)
        address.refresh_from_db()
        for field_name, stored_value in stored_values.items():
            assert getattr(address, field_name) == stored_value

        # Save only an unrelated field, so that a full model save is avoided
        last_used = datetime(2024, 1, 11, tzinfo=UTC)
        address.last_used_at = last_used
        address.save(update_fields={"last_used_at"})

        # .save() extends update_fields, so the storage fields are cleared as well
        address.refresh_from_db()
        assert address.last_used_at == last_used
        assert (address.description, address.used_on) == ("", "")
1✔
1368

1369
    def test_clear_block_list_emails_with_update_fields(self) -> None:
        """
        A save limited by update_fields still resets block_list_emails for free users.
        """
        address = DomainAddress.objects.create(
            user=self.user, address="block-list-emails", block_list_emails=True
        )

        # Drop every subscription so the user no longer has premium
        fxa = self.user.profile.fxa
        assert fxa is not None
        fxa.extra_data["subscriptions"] = []
        fxa.save()
        assert not self.user.profile.has_premium
        assert address.block_list_emails

        # Save only an unrelated field, so that a full model save is avoided
        last_used = datetime(2024, 1, 12, tzinfo=UTC)
        assert address.last_used_at != last_used
        address.last_used_at = last_used
        address.save(update_fields={"last_used_at"})

        # .save() extends update_fields, so the cleared flag is written too
        address.refresh_from_db()
        assert address.last_used_at == last_used
        assert not address.block_list_emails
1✔
1394

1395
    def test_create_updates_profile_last_engagement(self) -> None:
        """Creating a second domain address advances Profile.last_engagement."""
        DomainAddress.make_domain_address(self.user, address="create")
        profile = self.user.profile
        assert profile.last_engagement
        engagement_before = profile.last_engagement

        DomainAddress.make_domain_address(self.user, address="create2")

        # Reload so the comparison uses the value stored in the database
        profile.refresh_from_db()
        assert engagement_before < profile.last_engagement
1✔
1404

1405
    def test_save_does_not_update_profile_last_engagement(self) -> None:
        """Saving an existing domain address leaves Profile.last_engagement as is."""
        address = DomainAddress.make_domain_address(self.user, address="save")
        profile = self.user.profile
        assert profile.last_engagement
        engagement_before = profile.last_engagement

        # Change a field on the existing address and save it
        address.enabled = False
        address.save()

        # Reload so the comparison uses the value stored in the database
        profile.refresh_from_db()
        assert engagement_before == profile.last_engagement
1✔
1415

1416
    def test_delete_updates_profile_last_engagement(self) -> None:
        """Deleting a domain address advances Profile.last_engagement."""
        address = DomainAddress.make_domain_address(self.user, address="delete")
        profile = self.user.profile
        assert profile.last_engagement
        engagement_before = profile.last_engagement

        address.delete()

        # Reload so the comparison uses the value stored in the database
        profile.refresh_from_db()
        assert engagement_before < profile.last_engagement
1✔
1425

1426
    def test_metrics_id(self):
        """The metrics_id of a domain address is "D" followed by its database id."""
        mask = DomainAddress.objects.create(user=self.user, address="metrics")
        expected_id = f"D{mask.id}"
        assert mask.metrics_id == expected_id
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc