• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / d3128616-238d-446e-82c5-ab66cd38ceaf

09 May 2024 06:22PM CUT coverage: 84.07% (-0.6%) from 84.64%
d3128616-238d-446e-82c5-ab66cd38ceaf

push

circleci

web-flow
Merge pull request #4684 from mozilla/enable-flak8-bandit-checks-mpp-3802

fix MPP-3802: stop ignoring bandit security checks

3601 of 4734 branches covered (76.07%)

Branch coverage included in aggregate %.

74 of 158 new or added lines in 24 files covered. (46.84%)

5 existing lines in 5 files now uncovered.

14686 of 17018 relevant lines covered (86.3%)

10.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.28
/phones/models.py
1
from __future__ import annotations
1✔
2

3
import logging
1✔
4
import secrets
1✔
5
import string
1✔
6
from collections.abc import Iterator
1✔
7
from datetime import UTC, datetime, timedelta
1✔
8
from math import floor
1✔
9

10
from django.conf import settings
1✔
11
from django.contrib.auth.models import User
1✔
12
from django.core.cache import cache
1✔
13
from django.core.exceptions import BadRequest, ValidationError
1✔
14
from django.db import models
1✔
15
from django.db.migrations.recorder import MigrationRecorder
1✔
16
from django.db.models.signals import post_save
1✔
17
from django.dispatch.dispatcher import receiver
1✔
18
from django.urls import reverse
1✔
19

20
import phonenumbers
1✔
21
from twilio.base.exceptions import TwilioRestException
1✔
22
from twilio.rest import Client
1✔
23

24
from emails.utils import incr_if_enabled
1✔
25

26
from .apps import phones_config, twilio_client
1✔
27
from .iq_utils import send_iq_sms
1✔
28

29
logger = logging.getLogger("eventsinfo")
1✔
30

31

32
MAX_MINUTES_TO_VERIFY_REAL_PHONE = 5
1✔
33
LAST_CONTACT_TYPE_CHOICES = [
1✔
34
    ("call", "call"),
35
    ("text", "text"),
36
]
37

38

39
def verification_code_default():
1✔
40
    return str(secrets.randbelow(1000000)).zfill(6)
1✔
41

42

43
def verification_sent_date_default():
1✔
44
    return datetime.now(UTC)
1✔
45

46

47
def get_expired_unverified_realphone_records(number):
1✔
48
    return RealPhone.objects.filter(
1✔
49
        number=number,
50
        verified=False,
51
        verification_sent_date__lt=(
52
            datetime.now(UTC)
53
            - timedelta(0, 60 * settings.MAX_MINUTES_TO_VERIFY_REAL_PHONE)
54
        ),
55
    )
56

57

58
def get_pending_unverified_realphone_records(number):
1✔
59
    return RealPhone.objects.filter(
1✔
60
        number=number,
61
        verified=False,
62
        verification_sent_date__gt=(
63
            datetime.now(UTC)
64
            - timedelta(0, 60 * settings.MAX_MINUTES_TO_VERIFY_REAL_PHONE)
65
        ),
66
    )
67

68

69
def get_verified_realphone_records(user):
1✔
70
    return RealPhone.objects.filter(user=user, verified=True)
1✔
71

72

73
def get_verified_realphone_record(number):
1✔
74
    return RealPhone.objects.filter(number=number, verified=True).first()
1✔
75

76

77
def get_valid_realphone_verification_record(user, number, verification_code):
1✔
78
    return RealPhone.objects.filter(
1✔
79
        user=user,
80
        number=number,
81
        verification_code=verification_code,
82
        verification_sent_date__gt=(
83
            datetime.now(UTC)
84
            - timedelta(0, 60 * settings.MAX_MINUTES_TO_VERIFY_REAL_PHONE)
85
        ),
86
    ).first()
87

88

89
def get_last_text_sender(relay_number: RelayNumber) -> InboundContact | None:
1✔
90
    """
91
    Get the last text sender.
92

93
    MPP-2581 introduces a last_text_date column for determining the last sender.
94
    Before MPP-2581, the last_inbound_date with last_inbound_type=text was used.
95
    During the transition, look at both methods.
96
    """
97
    try:
1✔
98
        latest = InboundContact.objects.filter(
1✔
99
            relay_number=relay_number, last_text_date__isnull=False
100
        ).latest("last_text_date")
101
    except InboundContact.DoesNotExist:
1✔
102
        latest = None
1✔
103

104
    try:
1✔
105
        latest_by_old_method = InboundContact.objects.filter(
1✔
106
            relay_number=relay_number, last_inbound_type="text"
107
        ).latest("last_inbound_date")
108
    except InboundContact.DoesNotExist:
1✔
109
        latest_by_old_method = None
1✔
110

111
    if (latest is None and latest_by_old_method is not None) or (
1✔
112
        latest
113
        and latest_by_old_method
114
        and latest != latest_by_old_method
115
        and latest.last_text_date
116
        and latest_by_old_method.last_inbound_date > latest.last_text_date
117
    ):
118
        # Pre-MPP-2581 server handled the latest text message
119
        return latest_by_old_method
1✔
120

121
    return latest
1✔
122

123

124
def iq_fmt(e164_number: str) -> str:
1✔
125
    return "1" + str(phonenumbers.parse(e164_number, "E164").national_number)
×
126

127

128
class RealPhone(models.Model):
1✔
129
    user = models.ForeignKey(User, on_delete=models.CASCADE)
1✔
130
    number = models.CharField(max_length=15)
1✔
131
    verification_code = models.CharField(
1✔
132
        max_length=8, default=verification_code_default
133
    )
134
    verification_sent_date = models.DateTimeField(
1✔
135
        blank=True, null=True, default=verification_sent_date_default
136
    )
137
    verified = models.BooleanField(default=False)
1✔
138
    verified_date = models.DateTimeField(blank=True, null=True)
1✔
139
    country_code = models.CharField(max_length=2, default="US")
1✔
140

141
    class Meta:
1✔
142
        constraints = [
1✔
143
            models.UniqueConstraint(
144
                fields=["number", "verified"],
145
                condition=models.Q(verified=True),
146
                name="unique_verified_number",
147
            )
148
        ]
149

150
    def save(self, *args, **kwargs):
1✔
151
        # delete any expired unverified RealPhone records for this number
152
        # note: it doesn't matter which user is trying to create a new
153
        # RealPhone record - any expired unverified record for the number
154
        # should be deleted
155
        expired_verification_records = get_expired_unverified_realphone_records(
1✔
156
            self.number
157
        )
158
        expired_verification_records.delete()
1✔
159

160
        # We are not ready to support multiple real phone numbers per user,
161
        # so raise an exception if this save() would create a second
162
        # RealPhone record for the user
163
        user_verified_number_records = get_verified_realphone_records(self.user)
1✔
164
        for verified_number in user_verified_number_records:
1✔
165
            if (
1✔
166
                verified_number.number == self.number
167
                and verified_number.verification_code == self.verification_code
168
            ):
169
                # User is verifying the same number twice
170
                return super().save(*args, **kwargs)
1✔
171
            else:
172
                raise BadRequest("User already has a verified number.")
1✔
173

174
        # call super save to save into the DB
175
        # See also: realphone_post_save receiver below
176
        return super().save(*args, **kwargs)
1✔
177

178
    def mark_verified(self):
1✔
179
        incr_if_enabled("phones_RealPhone.mark_verified")
1✔
180
        self.verified = True
1✔
181
        self.verified_date = datetime.now(UTC)
1✔
182
        self.save(force_update=True)
1✔
183
        return self
1✔
184

185

186
@receiver(post_save, sender=RealPhone, dispatch_uid="realphone_post_save")
1✔
187
def realphone_post_save(sender, instance, created, **kwargs):
1✔
188
    # don't do anything if running migrations
189
    if type(instance) == MigrationRecorder.Migration:
1!
190
        return
×
191

192
    if created:
1✔
193
        # only send verification_code when creating new record
194
        incr_if_enabled("phones_RealPhone.post_save_created_send_verification")
1✔
195
        text_body = (
1✔
196
            f"Your Firefox Relay verification code is {instance.verification_code}"
197
        )
198
        if settings.PHONES_NO_CLIENT_CALLS_IN_TEST:
1✔
199
            return
1✔
200
        if settings.IQ_FOR_VERIFICATION:
1!
201
            send_iq_sms(instance.number, settings.IQ_MAIN_NUMBER, text_body)
×
202
            return
×
203
        client = twilio_client()
1✔
204
        client.messages.create(
1✔
205
            body=text_body,
206
            from_=settings.TWILIO_MAIN_NUMBER,
207
            to=instance.number,
208
        )
209

210

211
def vcard_lookup_key_default():
1✔
212
    return "".join(
1✔
213
        secrets.choice(string.ascii_letters + string.digits) for i in range(6)
214
    )
215

216

217
class RelayNumber(models.Model):
1✔
218
    user = models.ForeignKey(User, on_delete=models.CASCADE)
1✔
219
    number = models.CharField(max_length=15, db_index=True, unique=True)
1✔
220
    vendor = models.CharField(max_length=15, default="twilio")
1✔
221
    location = models.CharField(max_length=255)
1✔
222
    country_code = models.CharField(max_length=2, default="US")
1✔
223
    vcard_lookup_key = models.CharField(
1✔
224
        max_length=6, default=vcard_lookup_key_default, unique=True
225
    )
226
    enabled = models.BooleanField(default=True)
1✔
227
    remaining_seconds = models.IntegerField(
1✔
228
        default=settings.MAX_MINUTES_PER_BILLING_CYCLE * 60
229
    )
230
    remaining_texts = models.IntegerField(default=settings.MAX_TEXTS_PER_BILLING_CYCLE)
1✔
231
    calls_forwarded = models.IntegerField(default=0)
1✔
232
    calls_blocked = models.IntegerField(default=0)
1✔
233
    texts_forwarded = models.IntegerField(default=0)
1✔
234
    texts_blocked = models.IntegerField(default=0)
1✔
235
    created_at = models.DateTimeField(null=True, auto_now_add=True)
1✔
236

237
    @property
1✔
238
    def remaining_minutes(self) -> int:
1✔
239
        # return a 0 or positive int for remaining minutes
240
        return floor(max(self.remaining_seconds, 0) / 60)
1✔
241

242
    @property
1✔
243
    def calls_and_texts_forwarded(self) -> int:
1✔
244
        return self.calls_forwarded + self.texts_forwarded
1✔
245

246
    @property
1✔
247
    def calls_and_texts_blocked(self) -> int:
1✔
248
        return self.calls_blocked + self.texts_blocked
1✔
249

250
    @property
1✔
251
    def storing_phone_log(self) -> bool:
1✔
252
        return bool(self.user.profile.store_phone_log)
1✔
253

254
    def save(self, *args, **kwargs):
1✔
255
        realphone = get_verified_realphone_records(self.user).first()
1✔
256
        if not realphone:
1✔
257
            raise ValidationError("User does not have a verified real phone.")
1✔
258

259
        # if this number exists for this user, this is an update call
260
        existing_numbers = RelayNumber.objects.filter(user=self.user)
1✔
261
        this_number = existing_numbers.filter(number=self.number).first()
1✔
262
        if this_number and this_number.id == self.id:
1✔
263
            return super().save(*args, **kwargs)
1✔
264
        elif existing_numbers.exists():
1✔
265
            raise ValidationError("User can have only one relay number.")
1✔
266

267
        if RelayNumber.objects.filter(number=self.number).exists():
1✔
268
            raise ValidationError("This number is already claimed.")
1✔
269

270
        use_twilio = (
1✔
271
            self.vendor == "twilio" and not settings.PHONES_NO_CLIENT_CALLS_IN_TEST
272
        )
273

274
        if use_twilio:
1✔
275
            # Before saving into DB provision the number in Twilio
276
            client = twilio_client()
1✔
277

278
            # Since this will charge the Twilio account, first see if this
279
            # is running with TEST creds to avoid charges.
280
            if settings.TWILIO_TEST_ACCOUNT_SID:
1!
281
                client = phones_config().twilio_test_client
×
282

283
            twilio_incoming_number = client.incoming_phone_numbers.create(
1✔
284
                phone_number=self.number,
285
                sms_application_sid=settings.TWILIO_SMS_APPLICATION_SID,
286
                voice_application_sid=settings.TWILIO_SMS_APPLICATION_SID,
287
            )
288

289
        # Assume number was selected through suggested_numbers, so same country
290
        # as realphone
291
        self.country_code = realphone.country_code.upper()
1✔
292

293
        # Add US numbers to the Relay messaging service, so it goes into our
294
        # US A2P 10DLC campaign
295
        if use_twilio and self.country_code == "US":
1✔
296
            if settings.TWILIO_MESSAGING_SERVICE_SID:
1✔
297
                register_with_messaging_service(client, twilio_incoming_number.sid)
1✔
298
            else:
299
                logger.warning(
1✔
300
                    "Skipping Twilio Messaging Service registration, since"
301
                    " TWILIO_MESSAGING_SERVICE_SID is empty.",
302
                    extra={"number_sid": twilio_incoming_number.sid},
303
                )
304

305
        return super().save(*args, **kwargs)
1✔
306

307

308
class CachedList:
1✔
309
    """A list that is stored in a cache."""
310

311
    def __init__(self, cache_key: str) -> None:
1✔
312
        self.cache_key = cache_key
1✔
313
        cache_value = cache.get(self.cache_key, "")
1✔
314
        if cache_value:
1✔
315
            self.data = cache_value.split(",")
1✔
316
        else:
317
            self.data = []
1✔
318

319
    def __iter__(self) -> Iterator[str]:
1✔
320
        return (item for item in self.data)
1✔
321

322
    def append(self, item: str) -> None:
1✔
323
        self.data.append(item)
1✔
324
        self.data.sort()
1✔
325
        cache.set(self.cache_key, ",".join(self.data))
1✔
326

327

328
def register_with_messaging_service(client: Client, number_sid: str) -> None:
1✔
329
    """Register a Twilio US phone number with a Messaging Service."""
330

331
    if not settings.TWILIO_MESSAGING_SERVICE_SID:
1!
NEW
332
        raise ValueError(
×
333
            "settings.TWILIO_MESSAGING_SERVICE_SID must contain a value when calling "
334
            "register_with_messaging_service"
335
        )
336

337
    closed_sids = CachedList("twilio_messaging_service_closed")
1✔
338

339
    for service_sid in settings.TWILIO_MESSAGING_SERVICE_SID:
1✔
340
        if service_sid in closed_sids:
1✔
341
            continue
1✔
342
        try:
1✔
343
            client.messaging.v1.services(service_sid).phone_numbers.create(
1✔
344
                phone_number_sid=number_sid
345
            )
346
        except TwilioRestException as err:
1✔
347
            log_extra = {
1✔
348
                "err_msg": err.msg,
349
                "status": err.status,
350
                "code": err.code,
351
                "service_sid": service_sid,
352
                "number_sid": number_sid,
353
            }
354
            if err.status == 409 and err.code == 21710:
1✔
355
                # Log "Phone Number is already in the Messaging Service"
356
                # https://www.twilio.com/docs/api/errors/21710
357
                logger.warning("twilio_messaging_service", extra=log_extra)
1✔
358
                return
1✔
359
            elif err.status == 412 and err.code == 21714:
1✔
360
                # Log "Number Pool size limit reached", continue to next service
361
                # https://www.twilio.com/docs/api/errors/21714
362
                closed_sids.append(service_sid)
1✔
363
                logger.warning("twilio_messaging_service", extra=log_extra)
1✔
364
            else:
365
                # Log and re-raise other Twilio errors
366
                logger.error("twilio_messaging_service", extra=log_extra)
1✔
367
                raise
1✔
368
        else:
369
            return  # Successfully registered with service
1✔
370

371
    raise Exception("All services in TWILIO_MESSAGING_SERVICE_SID are full")
1✔
372

373

374
@receiver(post_save, sender=RelayNumber)
1✔
375
def relaynumber_post_save(sender, instance, created, **kwargs):
1✔
376
    # don't do anything if running migrations
377
    if type(instance) == MigrationRecorder.Migration:
1!
378
        return
×
379

380
    # TODO: if IQ_FOR_NEW_NUMBERS, send welcome message via IQ
381
    if not instance.vendor == "twilio":
1!
382
        return
×
383

384
    if created:
1✔
385
        incr_if_enabled("phones_RelayNumber.post_save_created_send_welcome")
1✔
386
        if not settings.PHONES_NO_CLIENT_CALLS_IN_TEST:
1✔
387
            # only send welcome vCard when creating new record
388
            send_welcome_message(instance.user, instance)
1✔
389

390

391
def send_welcome_message(user, relay_number):
1✔
392
    real_phone = RealPhone.objects.get(user=user)
1✔
393
    if not settings.SITE_ORIGIN:
1!
NEW
394
        raise ValueError(
×
395
            "settings.SITE_ORIGIN must contain a value when calling "
396
            "send_welcome_message"
397
        )
398
    media_url = settings.SITE_ORIGIN + reverse(
1✔
399
        "vCard", kwargs={"lookup_key": relay_number.vcard_lookup_key}
400
    )
401
    client = twilio_client()
1✔
402
    client.messages.create(
1✔
403
        body=(
404
            "Welcome to Relay phone masking!"
405
            " 🎉 Please add your number to your contacts."
406
            " This will help you identify your Relay messages and calls."
407
        ),
408
        from_=settings.TWILIO_MAIN_NUMBER,
409
        to=real_phone.number,
410
        media_url=[media_url],
411
    )
412

413

414
def last_inbound_date_default():
1✔
415
    return datetime.now(UTC)
1✔
416

417

418
class InboundContact(models.Model):
1✔
419
    relay_number = models.ForeignKey(RelayNumber, on_delete=models.CASCADE)
1✔
420
    inbound_number = models.CharField(max_length=15)
1✔
421
    last_inbound_date = models.DateTimeField(default=last_inbound_date_default)
1✔
422
    last_inbound_type = models.CharField(
1✔
423
        max_length=4, choices=LAST_CONTACT_TYPE_CHOICES, default="text"
424
    )
425

426
    num_calls = models.PositiveIntegerField(default=0)
1✔
427
    num_calls_blocked = models.PositiveIntegerField(default=0)
1✔
428
    last_call_date = models.DateTimeField(null=True)
1✔
429

430
    num_texts = models.PositiveIntegerField(default=0)
1✔
431
    num_texts_blocked = models.PositiveIntegerField(default=0)
1✔
432
    last_text_date = models.DateTimeField(null=True)
1✔
433

434
    blocked = models.BooleanField(default=False)
1✔
435

436
    class Meta:
1✔
437
        indexes = [models.Index(fields=["relay_number", "inbound_number"])]
1✔
438

439

440
def suggested_numbers(user):
1✔
441
    real_phone = get_verified_realphone_records(user).first()
1✔
442
    if real_phone is None:
1✔
443
        raise BadRequest(
1✔
444
            "available_numbers: This user hasn't verified a RealPhone yet."
445
        )
446

447
    existing_number = RelayNumber.objects.filter(user=user)
1✔
448
    if existing_number:
1✔
449
        raise BadRequest(
1✔
450
            "available_numbers: Another RelayNumber already exists for this user."
451
        )
452

453
    real_num = real_phone.number
1✔
454
    client = twilio_client()
1✔
455
    avail_nums = client.available_phone_numbers(real_phone.country_code)
1✔
456

457
    # TODO: can we make multiple pattern searches in a single Twilio API request
458
    same_prefix_options = []
1✔
459
    # look for numbers with same area code and 3-number prefix
460
    contains = f"{real_num[:8]}****" if real_num else ""
1✔
461
    twilio_nums = avail_nums.local.list(contains=contains, limit=10)
1✔
462
    same_prefix_options.extend(convert_twilio_numbers_to_dict(twilio_nums))
1✔
463

464
    # look for numbers with same area code, 2-number prefix and suffix
465
    contains = f"{real_num[:7]}***{real_num[10:]}" if real_num else ""
1✔
466
    twilio_nums = avail_nums.local.list(contains=contains, limit=10)
1✔
467
    same_prefix_options.extend(convert_twilio_numbers_to_dict(twilio_nums))
1✔
468

469
    # look for numbers with same area code and 1-number prefix
470
    contains = f"{real_num[:6]}******" if real_num else ""
1✔
471
    twilio_nums = avail_nums.local.list(contains=contains, limit=10)
1✔
472
    same_prefix_options.extend(convert_twilio_numbers_to_dict(twilio_nums))
1✔
473

474
    # look for same number in other area codes
475
    contains = f"+1***{real_num[5:]}" if real_num else ""
1✔
476
    twilio_nums = avail_nums.local.list(contains=contains, limit=10)
1✔
477
    other_areas_options = convert_twilio_numbers_to_dict(twilio_nums)
1✔
478

479
    # look for any numbers in the area code
480
    contains = f"{real_num[:5]}*******" if real_num else ""
1✔
481
    twilio_nums = avail_nums.local.list(contains=contains, limit=10)
1✔
482
    same_area_options = convert_twilio_numbers_to_dict(twilio_nums)
1✔
483

484
    # look for any available numbers
485
    twilio_nums = avail_nums.local.list(limit=10)
1✔
486
    random_options = convert_twilio_numbers_to_dict(twilio_nums)
1✔
487

488
    return {
1✔
489
        "real_num": real_num,
490
        "same_prefix_options": same_prefix_options,
491
        "other_areas_options": other_areas_options,
492
        "same_area_options": same_area_options,
493
        "random_options": random_options,
494
    }
495

496

497
def location_numbers(location, country_code="US"):
1✔
498
    client = twilio_client()
1✔
499
    avail_nums = client.available_phone_numbers(country_code)
1✔
500
    twilio_nums = avail_nums.local.list(in_locality=location, limit=10)
1✔
501
    return convert_twilio_numbers_to_dict(twilio_nums)
1✔
502

503

504
def area_code_numbers(area_code, country_code="US"):
1✔
505
    client = twilio_client()
1✔
506
    avail_nums = client.available_phone_numbers(country_code)
1✔
507
    twilio_nums = avail_nums.local.list(area_code=area_code, limit=10)
1✔
508
    return convert_twilio_numbers_to_dict(twilio_nums)
1✔
509

510

511
def convert_twilio_numbers_to_dict(twilio_numbers):
1✔
512
    """
513
    To serialize twilio numbers to JSON for the API,
514
    we need to convert them into dictionaries.
515
    """
516
    numbers_as_dicts = []
1✔
517
    for twilio_number in twilio_numbers:
1✔
518
        number = {}
1✔
519
        number["friendly_name"] = twilio_number.friendly_name
1✔
520
        number["iso_country"] = twilio_number.iso_country
1✔
521
        number["locality"] = twilio_number.locality
1✔
522
        number["phone_number"] = twilio_number.phone_number
1✔
523
        number["postal_code"] = twilio_number.postal_code
1✔
524
        number["region"] = twilio_number.region
1✔
525
        numbers_as_dicts.append(number)
1✔
526
    return numbers_as_dicts
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc