• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / b2e067fe-ce4e-4099-9bef-07b368e99782

15 Apr 2024 04:18PM CUT coverage: 75.544% (+0.002%) from 75.542%
b2e067fe-ce4e-4099-9bef-07b368e99782

push

circleci

jwhitlock
Enable pyupgrade, fix issues

2443 of 3405 branches covered (71.75%)

Branch coverage included in aggregate %.

56 of 59 new or added lines in 14 files covered. (94.92%)

234 existing lines in 24 files now uncovered.

6793 of 8821 relevant lines covered (77.01%)

20.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.4
/api/views/phones.py
1
import hashlib
1✔
2
import logging
1✔
3
import re
1✔
4
import string
1✔
5
from dataclasses import asdict, dataclass, field
1✔
6
from datetime import UTC, datetime
1✔
7
from typing import Any, Literal
1✔
8

9
from django.conf import settings
1✔
10
from django.contrib.auth.models import User
1✔
11
from django.core.exceptions import ObjectDoesNotExist
1✔
12
from django.forms import model_to_dict
1✔
13

14
import django_ftl
1✔
15
import phonenumbers
1✔
16
from drf_spectacular.utils import OpenApiParameter, extend_schema
1✔
17
from rest_framework import (
1✔
18
    decorators,
19
    exceptions,
20
    permissions,
21
    response,
22
    throttling,
23
    viewsets,
24
)
25
from rest_framework.generics import get_object_or_404
1✔
26
from rest_framework.request import Request
1✔
27
from twilio.base.exceptions import TwilioRestException
1✔
28
from waffle import flag_is_active, get_waffle_flag_model
1✔
29

30
from api.views import SaveToRequestUser
1✔
31
from emails.utils import incr_if_enabled
1✔
32
from phones.apps import phones_config, twilio_client
1✔
33
from phones.iq_utils import send_iq_sms
1✔
34
from phones.models import (
1✔
35
    InboundContact,
36
    RealPhone,
37
    RelayNumber,
38
    area_code_numbers,
39
    get_last_text_sender,
40
    get_pending_unverified_realphone_records,
41
    get_valid_realphone_verification_record,
42
    get_verified_realphone_record,
43
    get_verified_realphone_records,
44
    location_numbers,
45
    send_welcome_message,
46
    suggested_numbers,
47
)
48
from privaterelay.ftl_bundles import main as ftl_bundle
1✔
49

50
from ..exceptions import ConflictError, ErrorContextType
1✔
51
from ..permissions import HasPhoneService
1✔
52
from ..renderers import TemplateTwiMLRenderer, vCardRenderer
1✔
53
from ..serializers.phones import (
1✔
54
    InboundContactSerializer,
55
    RealPhoneSerializer,
56
    RelayNumberSerializer,
57
)
58

59
logger = logging.getLogger("events")
1✔
60
info_logger = logging.getLogger("eventsinfo")
1✔
61

62

63
def twilio_validator():
    """Return the `twilio_validator` attribute of the phones app config."""
    config = phones_config()
    return config.twilio_validator
65

66

67
def twiml_app():
    """Return the `twiml_app` attribute of the phones app config."""
    config = phones_config()
    return config.twiml_app
69

70

71
class RealPhoneRateThrottle(throttling.UserRateThrottle):
    """User-rate throttle whose rate is read from settings.PHONE_RATE_LIMIT."""

    rate = settings.PHONE_RATE_LIMIT
73

74

75
class RealPhoneViewSet(SaveToRequestUser, viewsets.ModelViewSet):
    """
    Get real phone number records for the authenticated user.

    The authenticated user must have a subscription that grants one of the
    `SUBSCRIPTIONS_WITH_PHONE` capabilities.

    Client must be authenticated, and these endpoints only return data that is
    "owned" by the authenticated user.

    All endpoints are rate-limited to settings.PHONE_RATE_LIMIT
    """

    http_method_names = ["get", "post", "patch", "delete"]
    permission_classes = [permissions.IsAuthenticated, HasPhoneService]
    serializer_class = RealPhoneSerializer
    # TODO: this doesn't seem to be working?
    throttle_classes = [RealPhoneRateThrottle]

    def get_queryset(self):
        # Only expose RealPhone rows that belong to the requesting user.
        assert isinstance(self.request.user, User)
        return RealPhone.objects.filter(user=self.request.user)

    def create(self, request):
        """
        Add real phone number to the authenticated user.

        The "flow" to verify a real phone number is:
        1. POST a number (Will text a verification code to the number)
        2a. PATCH the verification code to the realphone/{id} endpoint
        2b. POST the number and verification code together

        The authenticated user must have a subscription that grants one of the
        `SUBSCRIPTIONS_WITH_PHONE` capabilities.

        The `number` field should be in [E.164][e164] format which includes a country
        code. If the number is not in E.164 format, this endpoint will try to
        create an E.164 number by prepending the country code of the client
        making the request (i.e., from the `X-Client-Region` HTTP header).

        If the `POST` does NOT include a `verification_code` and the number is
        a valid (currently, US-based) number, this endpoint will text a
        verification code to the number.

        If the `POST` DOES include a `verification_code`, and the code matches
        a code already sent to the number, this endpoint will set `verified` to
        `True` for this number.

        [e164]: https://en.wikipedia.org/wiki/E.164
        """
        incr_if_enabled("phones_RealPhoneViewSet.create")
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        # Check if the request includes a valid verification_code
        # value, look for any un-expired record that matches both the phone
        # number and verification code and mark it verified.
        verification_code = serializer.validated_data.get("verification_code")
        if verification_code:
            valid_record = get_valid_realphone_verification_record(
                request.user, serializer.validated_data["number"], verification_code
            )
            if not valid_record:
                incr_if_enabled("phones_RealPhoneViewSet.create.invalid_verification")
                raise exceptions.ValidationError(
                    "Could not find that verification_code for user and number."
                    " It may have expired."
                )

            headers = self.get_success_headers(serializer.validated_data)
            verified_valid_record = valid_record.mark_verified()
            incr_if_enabled("phones_RealPhoneViewSet.create.mark_verified")
            response_data = model_to_dict(
                verified_valid_record,
                fields=[
                    "id",
                    "number",
                    "verification_sent_date",
                    "verified",
                    "verified_date",
                ],
            )
            return response.Response(response_data, status=201, headers=headers)

        # to prevent sending verification codes to verified numbers,
        # check if the number is already a verified number.
        is_verified = get_verified_realphone_record(serializer.validated_data["number"])
        if is_verified:
            raise ConflictError("A verified record already exists for this number.")

        # to prevent abusive sending of verification messages,
        # check if there is an un-expired verification code for the user
        pending_unverified_records = get_pending_unverified_realphone_records(
            serializer.validated_data["number"]
        )
        if pending_unverified_records:
            raise ConflictError(
                "An unverified record already exists for this number.",
            )

        # We call an additional _validate_number function with the request
        # to try to parse the number as a local national number in the
        # request.country attribute
        valid_number = _validate_number(request)
        serializer.validated_data["number"] = valid_number.phone_number
        serializer.validated_data["country_code"] = valid_number.country_code.upper()

        self.perform_create(serializer)
        incr_if_enabled("phones_RealPhoneViewSet.perform_create")
        headers = self.get_success_headers(serializer.validated_data)
        response_data = serializer.data
        response_data["message"] = (
            "Sent verification code to "
            f"{valid_number.phone_number} "
            f"(country: {valid_number.country_code} "
            f"carrier: {valid_number.carrier})"
        )
        return response.Response(response_data, status=201, headers=headers)

    # check verification_code during partial_update to compare
    # the value sent in the request against the value already on the instance
    # TODO: this logic might be able to move "up" into the model, but it will
    # need some more serious refactoring of the RealPhone.save() method
    def partial_update(self, request, *args, **kwargs):
        """
        Update the authenticated user's real phone number.

        The authenticated user must have a subscription that grants one of the
        `SUBSCRIPTIONS_WITH_PHONE` capabilities.

        The `{id}` should match a previously-`POST`ed resource that belongs to the user.

        The `number` field should be in [E.164][e164] format which includes a country
        code.

        The `verification_code` should be the code that was texted to the
        number during the `POST`. If it matches, this endpoint will set
        `verified` to `True` for this number.

        [e164]: https://en.wikipedia.org/wiki/E.164
        """
        incr_if_enabled("phones_RealPhoneViewSet.partial_update")
        instance = self.get_object()
        # A PATCH body without "number" must be rejected as a validation
        # error; indexing request.data directly would raise KeyError (HTTP 500).
        if (
            "number" not in request.data
            or request.data["number"] != instance.number
        ):
            raise exceptions.ValidationError("Invalid number for ID.")
        # TODO: check verification_sent_date is not "expired"?
        # Note: the RealPhone.save() logic should prevent expired verifications
        if (
            "verification_code" not in request.data
            or not request.data["verification_code"] == instance.verification_code
        ):
            raise exceptions.ValidationError(
                "Invalid verification_code for ID. It may have expired."
            )

        instance.mark_verified()
        incr_if_enabled("phones_RealPhoneViewSet.partial_update.mark_verified")
        return super().partial_update(request, *args, **kwargs)

    def destroy(self, request, *args, **kwargs):
        """
        Delete a real phone resource.

        Only **un-verified** real phone resources can be deleted.
        """
        incr_if_enabled("phones_RealPhoneViewSet.destroy")
        instance = self.get_object()
        if instance.verified:
            raise exceptions.ValidationError(
                "Only un-verified real phone resources can be deleted."
            )

        return super().destroy(request, *args, **kwargs)
248

249

250
class RelayNumberViewSet(SaveToRequestUser, viewsets.ModelViewSet):
    # Endpoints for the authenticated user's relay number: list/retrieve,
    # create, partial update, plus the `suggestions` and `search` list actions.
    http_method_names = ["get", "post", "patch"]
    permission_classes = [permissions.IsAuthenticated, HasPhoneService]
    serializer_class = RelayNumberSerializer

    def get_queryset(self):
        # Only expose RelayNumber rows that belong to the requesting user.
        assert isinstance(self.request.user, User)
        return RelayNumber.objects.filter(user=self.request.user)

    def create(self, request, *args, **kwargs):
        """
        Provision a phone number with Twilio and assign to the authenticated user.

        ⚠️ **THIS WILL BUY A PHONE NUMBER** ⚠️
        If you have real account credentials in your `TWILIO_*` env vars, this
        will really provision a Twilio number to your account. You can use
        [Test Credentials][test-creds] to call this endpoint without making a
        real phone number purchase. If you do, you need to pass one of the
        [test phone numbers][test-numbers].

        The `number` should be in [E.164][e164] format.

        Every call or text to the relay number will be sent as a webhook to the
        URL configured for your `TWILIO_SMS_APPLICATION_SID`.

        [test-creds]: https://www.twilio.com/docs/iam/test-credentials
        [test-numbers]: https://www.twilio.com/docs/iam/test-credentials#test-incoming-phone-numbers-parameters-PhoneNumber
        [e164]: https://en.wikipedia.org/wiki/E.164
        """  # noqa: E501  # ignore long line for URL
        incr_if_enabled("phones_RelayNumberViewSet.create")
        # .exists() asks the database for a yes/no answer instead of loading
        # every matching row just to test the queryset for truthiness.
        if RelayNumber.objects.filter(user=request.user).exists():
            raise exceptions.ValidationError("User already has a RelayNumber.")
        return super().create(request, *args, **kwargs)

    def partial_update(self, request, *args, **kwargs):
        """
        Update the authenticated user's relay number.

        The authenticated user must have a subscription that grants one of the
        `SUBSCRIPTIONS_WITH_PHONE` capabilities.

        The `{id}` should match a previously-`POST`ed resource that belongs to
        the authenticated user.

        This is primarily used to toggle the `enabled` field.
        """
        incr_if_enabled("phones_RelayNumberViewSet.partial_update")
        return super().partial_update(request, *args, **kwargs)

    @decorators.action(detail=False)
    def suggestions(self, request):
        """
        Returns suggested relay numbers for the authenticated user.

        Based on the user's real number, returns available relay numbers:
          * `same_prefix_options`: Numbers that match as much of the user's
            real number as possible.
          * `other_areas_options`: Numbers that exactly match the user's real
            number, in a different area code.
          * `same_area_options`: Other numbers in the same area code as the user.
          * `random_options`: Available numbers in the user's country
        """
        incr_if_enabled("phones_RelayNumberViewSet.suggestions")
        numbers = suggested_numbers(request.user)
        return response.Response(numbers)

    @decorators.action(detail=False)
    def search(self, request):
        """
        Search for available numbers.

        This endpoints uses the underlying [AvailablePhoneNumbers][apn] API.

        Accepted query params:
          * ?location=
            * Will be passed to `AvailablePhoneNumbers` `in_locality` param
          * ?area_code=
            * Will be passed to `AvailablePhoneNumbers` `area_code` param

        [apn]: https://www.twilio.com/docs/phone-numbers/api/availablephonenumberlocal-resource#read-multiple-availablephonenumberlocal-resources
        """  # noqa: E501  # ignore long line for URL
        incr_if_enabled("phones_RelayNumberViewSet.search")
        # Search in the country of the user's first verified real phone,
        # falling back to "US" when the user has none.
        real_phone = get_verified_realphone_records(request.user).first()
        country_code = real_phone.country_code if real_phone else "US"

        # `location` takes precedence over `area_code` when both are given.
        location = request.query_params.get("location")
        if location is not None:
            numbers = location_numbers(location, country_code)
            return response.Response(numbers)

        area_code = request.query_params.get("area_code")
        if area_code is not None:
            numbers = area_code_numbers(area_code, country_code)
            return response.Response(numbers)

        # Neither query parameter was supplied.
        return response.Response({}, 404)
349

350

351
class InboundContactViewSet(viewsets.ModelViewSet):
    # Read and partial-update access to the inbound contacts recorded for the
    # authenticated user's relay number.
    http_method_names = ["get", "patch"]
    permission_classes = [permissions.IsAuthenticated, HasPhoneService]
    serializer_class = InboundContactSerializer

    def get_queryset(self):
        # Responds with 404 when the requesting user has no RelayNumber.
        owner = self.request.user
        relay_number = get_object_or_404(RelayNumber, user=owner)
        return InboundContact.objects.filter(relay_number=relay_number)
359

360

361
def _validate_number(request, number_field="number"):
    """
    Validate the phone number found in `request.data[number_field]`.

    The number is parsed (optionally using `request.country` as the region),
    looked up through `_get_number_details`, and checked against
    `settings.TWILIO_ALLOWED_COUNTRY_CODES`.

    Returns the number details object from `_get_number_details`.
    Raises `exceptions.ValidationError` when the field is missing, the number
    cannot be parsed, no details can be fetched, or the country is not allowed.
    """
    if number_field not in request.data:
        raise exceptions.ValidationError({number_field: "A number is required."})

    # `request.country` is optional; None means "no region hint".
    country = getattr(request, "country", None)
    parsed_number = _parse_number(request.data[number_field], country)
    if not parsed_number:
        raise exceptions.ValidationError(
            "number must be in E.164 format, or in local national format of the"
            f" country detected: {country}"
        )

    e164_number = f"+{parsed_number.country_code}{parsed_number.national_number}"
    number_details = _get_number_details(e164_number)
    if not number_details:
        raise exceptions.ValidationError(
            f"Could not get number details for {e164_number}"
        )

    detected_country = number_details.country_code.upper()
    if detected_country in settings.TWILIO_ALLOWED_COUNTRY_CODES:
        return number_details

    incr_if_enabled("phones_validate_number_unsupported_country")
    raise exceptions.ValidationError(
        "Relay Phone is currently only available for these country codes: "
        f"{sorted(settings.TWILIO_ALLOWED_COUNTRY_CODES)!r}. "
        "Your phone number country code is: "
        f"'{detected_country}'."
    )
395

396

397
def _parse_number(number, country=None):
    """
    Parse `number` with `phonenumbers`, returning the parsed number or None.

    The number is first parsed as-is (E.164 with a country prefix). If that
    fails with an invalid-country-code error and `country` is given, it is
    parsed again as a national number in that country. Any other failure
    yields None.
    """
    try:
        return phonenumbers.parse(number)
    except phonenumbers.phonenumberutil.NumberParseException as parse_error:
        retry_with_country = (
            parse_error.error_type == parse_error.INVALID_COUNTRY_CODE
            and country is not None
        )

    if not retry_with_country:
        return None

    try:
        # Assume local national format in the detected request country.
        return phonenumbers.parse(number, country)
    except Exception:
        return None
410

411

412
def _get_number_details(e164_number):
    """
    Fetch carrier details for `e164_number` via the Twilio lookups API.

    Returns the fetched lookup result, or None (after logging the exception)
    when anything goes wrong.
    """
    incr_if_enabled("phones_get_number_details")
    try:
        lookup = twilio_client().lookups.v1.phone_numbers(e164_number)
        return lookup.fetch(type=["carrier"])
    except Exception:
        logger.exception(f"Could not get number details for {e164_number}")
        return None
420

421

422
@decorators.api_view()
@decorators.permission_classes([permissions.AllowAny])
@decorators.renderer_classes([vCardRenderer])
def vCard(request, lookup_key):
    """
    Get a Relay vCard. `lookup_key` should be passed in url path.

    We use this to return a vCard for a number. When we create a RelayNumber,
    we create a secret lookup_key and text it to the user.
    """
    incr_if_enabled("phones_vcard")
    if lookup_key is None:
        return response.Response(status=404)

    try:
        relay_number = RelayNumber.objects.get(vcard_lookup_key=lookup_key)
    except RelayNumber.DoesNotExist:
        raise exceptions.NotFound()

    number = relay_number.number
    # Served as a downloadable .vcf attachment named after the relay number.
    return response.Response(
        {"number": number},
        headers={"Content-Disposition": f"attachment; filename={number}.vcf"},
    )
445

446

447
@decorators.api_view(["POST"])
@decorators.permission_classes([permissions.IsAuthenticated, HasPhoneService])
def resend_welcome_sms(request):
    """
    Resend the "Welcome" SMS, including vCard.

    Requires the user to be signed in and to have phone service.
    """
    incr_if_enabled("phones_resend_welcome_sms")
    user = request.user
    try:
        relay_number = RelayNumber.objects.get(user=user)
    except RelayNumber.DoesNotExist:
        # A user without a relay number has nothing to be welcomed to.
        raise exceptions.NotFound()

    send_welcome_message(user, relay_number)
    return response.Response(status=201, data={"msg": "sent"})
464

465

466
def _try_delete_from_twilio(message):
1✔
467
    try:
1✔
468
        message.delete()
1✔
UNCOV
469
    except TwilioRestException as e:
×
470
        # Raise the exception unless it's a 404 indicating the message is already gone
UNCOV
471
        if e.status != 404:
×
UNCOV
472
            raise e
×
473

474

475
def message_body(from_num, body):
    """Return `body` prefixed with a "[Relay 📲 <from_num>]" sender tag."""
    sender_tag = f"[Relay 📲 {from_num}]"
    return f"{sender_tag} {body}"
477

478

479
def _get_user_error_message(real_phone: RealPhone, sms_exception) -> Any:
    """
    Log `sms_exception` in English and return its message for the user.

    The Fluent message ID is "sms-error-" plus the exception's code with
    underscores replaced by dashes. The message is logged under the "en"
    locale, then formatted again under the language of the real phone owner's
    profile and returned.
    """
    ftl_id = "sms-error-" + sms_exception.get_codes().replace("_", "-")

    # log the error in English
    with django_ftl.override("en"):
        english_message = ftl_bundle.format(ftl_id, sms_exception.error_context())
        logger.exception(english_message)

    # Send a translated message to the user
    with django_ftl.override(real_phone.user.profile.language):
        return ftl_bundle.format(ftl_id, sms_exception.error_context())
489

490

491
@decorators.api_view(["POST"])
@decorators.permission_classes([permissions.AllowAny])
@decorators.renderer_classes([TemplateTwiMLRenderer])
def inbound_sms(request):
    # Twilio webhook for a text sent to a relay number.
    #
    # A text from the owner's real phone is treated as a reply and sent on from
    # the relay number; a text from anyone else is forwarded to the real phone.
    # Every outcome renders the empty TwiML template.
    incr_if_enabled("phones_inbound_sms")
    _validate_twilio_request(request)

    # TODO: delete the message from Twilio; how to do this AFTER this request? queue?
    # E.g., with a django-celery task in phones.tasks:
    #
    # inbound_msg_sid = request.data.get("MessageSid", None)
    # if inbound_msg_sid is None:
    #     raise exceptions.ValidationError("Request missing MessageSid")
    # tasks._try_delete_from_twilio.delay(args=message, countdown=10)

    def empty_twiml(status):
        # All responses from this view share the same empty TwiML template.
        return response.Response(
            status=status,
            template_name="twiml_empty_response.xml",
        )

    inbound_body = request.data.get("Body", None)
    inbound_from = request.data.get("From", None)
    inbound_to = request.data.get("To", None)
    if any(value is None for value in (inbound_body, inbound_from, inbound_to)):
        raise exceptions.ValidationError("Request missing From, To, Or Body.")

    relay_number, real_phone = _get_phone_objects(inbound_to)
    _check_remaining(relay_number, "texts")

    if inbound_from == real_phone.number:
        # The owner is texting their own relay number: handle it as a reply.
        try:
            relay_number, destination_number, body = _prepare_sms_reply(
                relay_number, inbound_body
            )
            client = twilio_client()
            incr_if_enabled("phones_send_sms_reply")
            client.messages.create(
                from_=relay_number.number, body=body, to=destination_number
            )
            relay_number.remaining_texts -= 1
            relay_number.texts_forwarded += 1
            relay_number.save()
        except RelaySMSException as sms_exception:
            # Tell the owner, in their language, why the reply failed.
            user_error_message = _get_user_error_message(real_phone, sms_exception)
            twilio_client().messages.create(
                from_=relay_number.number, body=user_error_message, to=real_phone.number
            )

            # Return 400 on critical exceptions
            if sms_exception.critical:
                raise exceptions.ValidationError(
                    sms_exception.detail
                ) from sms_exception
        return empty_twiml(200)

    if _check_disabled(relay_number, "texts"):
        return empty_twiml(200)

    inbound_contact = _get_inbound_contact(relay_number, inbound_from)
    if inbound_contact:
        _check_and_update_contact(inbound_contact, "texts", relay_number)

    client = twilio_client()
    app = twiml_app()
    incr_if_enabled("phones_outbound_sms")
    forwarded_text = message_body(inbound_from, inbound_body)
    client.messages.create(
        from_=relay_number.number,
        body=forwarded_text,
        status_callback=app.sms_status_callback,
        to=real_phone.number,
    )
    relay_number.remaining_texts -= 1
    relay_number.texts_forwarded += 1
    relay_number.save()
    return empty_twiml(201)
573

574

575
@decorators.api_view(["POST"])
@decorators.permission_classes([permissions.AllowAny])
def inbound_sms_iq(request: Request) -> response.Response:
    # iQ webhook for a text sent to a relay number.
    #
    # Reads "text", "from" and "to" from the request body, normalizes the
    # numbers to E.164 (default region "US"), then either relays a reply from
    # the owner's real phone or forwards the text to the real phone.
    #
    # Raises ValidationError (HTTP 400) when a field is missing, "to" is empty,
    # or a number cannot be parsed.
    incr_if_enabled("phones_inbound_sms_iq")
    _validate_iq_request(request)

    inbound_body = request.data.get("text", None)
    inbound_from = request.data.get("from", None)
    inbound_to = request.data.get("to", None)
    # `not inbound_to` also rejects an empty "to" value, which would otherwise
    # raise IndexError on `inbound_to[0]` below.
    if inbound_body is None or inbound_from is None or not inbound_to:
        raise exceptions.ValidationError("Request missing from, to, or text.")

    try:
        from_num = phonenumbers.format_number(
            phonenumbers.parse(inbound_from, "US"),
            phonenumbers.PhoneNumberFormat.E164,
        )
        # Only the first entry of "to" is used.
        single_num = inbound_to[0]
        relay_num = phonenumbers.format_number(
            phonenumbers.parse(single_num, "US"), phonenumbers.PhoneNumberFormat.E164
        )
    except phonenumbers.phonenumberutil.NumberParseException as parse_error:
        # An unparsable number is a bad request, not a server error.
        raise exceptions.ValidationError(
            "Request has an invalid from or to number."
        ) from parse_error

    relay_number, real_phone = _get_phone_objects(relay_num)
    _check_remaining(relay_number, "texts")

    if from_num == real_phone.number:
        # The owner is texting their own relay number: handle it as a reply.
        try:
            relay_number, destination_number, body = _prepare_sms_reply(
                relay_number, inbound_body
            )
            send_iq_sms(destination_number, relay_number.number, body)
            relay_number.remaining_texts -= 1
            relay_number.texts_forwarded += 1
            relay_number.save()
            incr_if_enabled("phones_send_sms_reply_iq")
        except RelaySMSException as sms_exception:
            user_error_message = _get_user_error_message(real_phone, sms_exception)
            send_iq_sms(real_phone.number, relay_number.number, user_error_message)

            # Return 400 on critical exceptions
            if sms_exception.critical:
                raise exceptions.ValidationError(
                    sms_exception.detail
                ) from sms_exception
        return response.Response(
            status=200,
            template_name="twiml_empty_response.xml",
        )

    number_disabled = _check_disabled(relay_number, "texts")
    if number_disabled:
        return response.Response(status=200)

    inbound_contact = _get_inbound_contact(relay_number, from_num)
    if inbound_contact:
        _check_and_update_contact(inbound_contact, "texts", relay_number)

    # NOTE(review): the forwarded prefix uses the raw `inbound_from`, not the
    # E.164-normalized `from_num` used for the contact lookup — confirm intended.
    text = message_body(inbound_from, inbound_body)
    send_iq_sms(real_phone.number, relay_number.number, text)

    relay_number.remaining_texts -= 1
    relay_number.texts_forwarded += 1
    relay_number.save()
    return response.Response(status=200)
638

639

640
@decorators.api_view(["POST"])
@decorators.permission_classes([permissions.AllowAny])
@decorators.renderer_classes([TemplateTwiMLRenderer])
def inbound_call(request):
    # Twilio webhook for a call placed to a relay number.
    #
    # Renders the "blocked" TwiML when calls are disabled for the number,
    # otherwise counts the call and renders the "dial" TwiML.
    incr_if_enabled("phones_inbound_call")
    _validate_twilio_request(request)

    inbound_from = request.data.get("Caller", None)
    inbound_to = request.data.get("Called", None)
    if None in (inbound_from, inbound_to):
        raise exceptions.ValidationError("Call data missing Caller or Called.")

    relay_number, real_phone = _get_phone_objects(inbound_to)

    if _check_disabled(relay_number, "calls"):
        return response.Response(
            {"say": "Sorry, that number is not available."},
            status=200,
            template_name="twiml_blocked.xml",
        )

    _check_remaining(relay_number, "seconds")

    inbound_contact = _get_inbound_contact(relay_number, inbound_from)
    if inbound_contact:
        _check_and_update_contact(inbound_contact, "calls", relay_number)

    relay_number.calls_forwarded += 1
    relay_number.save()

    # Note: TemplateTwiMLRenderer will render this as TwiML
    incr_if_enabled("phones_outbound_call")
    dial_context = {"inbound_from": inbound_from, "real_number": real_phone.number}
    return response.Response(
        dial_context,
        status=201,
        template_name="twiml_dial.xml",
    )
676

677

678
@decorators.api_view(["POST"])
@decorators.permission_classes([permissions.AllowAny])
def voice_status(request):
    # Twilio webhook for call status updates.
    #
    # Only "completed" calls are processed: the call duration is subtracted
    # from the relay number's remaining seconds, an overrun is logged, and the
    # call record is deleted from Twilio.
    incr_if_enabled("phones_voice_status")
    _validate_twilio_request(request)

    call_sid = request.data.get("CallSid", None)
    called = request.data.get("Called", None)
    call_status = request.data.get("CallStatus", None)
    if any(value is None for value in (call_sid, called, call_status)):
        raise exceptions.ValidationError("Call data missing Called, CallStatus")
    if call_status != "completed":
        return response.Response(status=200)

    call_duration = request.data.get("CallDuration", None)
    if call_duration is None:
        raise exceptions.ValidationError("completed call data missing CallDuration")

    relay_number, _ = _get_phone_objects(called)
    duration_seconds = int(call_duration)
    relay_number.remaining_seconds -= duration_seconds
    relay_number.save()

    # A negative balance means this call went past the remaining allowance.
    if relay_number.remaining_seconds < 0:
        info_logger.info(
            "phone_limit_exceeded",
            extra={
                "fxa_uid": relay_number.user.profile.fxa.uid,
                "call_duration_in_seconds": duration_seconds,
                "relay_number_enabled": relay_number.enabled,
                "remaining_seconds": relay_number.remaining_seconds,
                "remaining_minutes": relay_number.remaining_minutes,
            },
        )

    twilio_client().calls(call_sid).delete()
    return response.Response(status=200)
710

711

712
@decorators.api_view(["POST"])
@decorators.permission_classes([permissions.AllowAny])
def sms_status(request):
    """
    Handle a Twilio SMS status callback.

    After validating the Twilio signature, a "delivered" status triggers an
    attempt to delete the message from Twilio. Every valid callback is
    answered with a 200.

    Raises ValidationError when SmsStatus or MessageSid is missing.
    """
    _validate_twilio_request(request)
    payload = request.data
    delivery_status = payload.get("SmsStatus", None)
    message_sid = payload.get("MessageSid", None)
    if delivery_status is None or message_sid is None:
        raise exceptions.ValidationError(
            "Text status data missing SmsStatus or MessageSid"
        )
    if delivery_status == "delivered":
        _try_delete_from_twilio(twilio_client().messages(message_sid))
    return response.Response(status=200)
1✔
728

729

730
# NOTE: @permission_classes must sit below @api_view. @api_view reads the
# permission_classes attribute from the function it wraps, so a decorator
# applied above it only tags the already-built view and is never enforced.
@extend_schema(
    parameters=[OpenApiParameter(name="to", required=True, type=str)],
    methods=["POST"],
    responses={200: None},
)
@decorators.api_view(["POST"])
@decorators.permission_classes([permissions.IsAuthenticated, HasPhoneService])
def outbound_call(request):
    """
    Make a call from the authenticated user's relay number.

    POST params:
        to: the number to dial, validated by _validate_number

    Twilio first calls the user's verified real phone from the relay number,
    then announces and dials the requested number.

    Returns 403 without the outbound_phone waffle flag, 400 when the user has
    no verified real phone or no relay number, and 200 once the call has been
    requested. Raises ValidationError on an invalid number.
    """
    # TODO: Create or update an OutboundContact (new model) on send, or limit
    # to InboundContacts.
    if not flag_is_active(request, "outbound_phone"):
        # Return Permission Denied error
        return response.Response(
            {"detail": "Requires outbound_phone waffle flag."}, status=403
        )
    try:
        real_phone = RealPhone.objects.get(user=request.user, verified=True)
    except RealPhone.DoesNotExist:
        return response.Response(
            {"detail": "Requires a verified real phone and phone mask."}, status=400
        )
    try:
        relay_number = RelayNumber.objects.get(user=request.user)
    except RelayNumber.DoesNotExist:
        return response.Response({"detail": "Requires a phone mask."}, status=400)

    client = twilio_client()

    to = _validate_number(request, "to")  # Raises ValidationError on invalid number
    client.calls.create(
        twiml=(
            f"<Response><Say>Dialing {to.national_format} ...</Say>"
            f"<Dial>{to.phone_number}</Dial></Response>"
        ),
        to=real_phone.number,
        from_=relay_number.number,
    )
    return response.Response(status=200)
1✔
769

770

771
# NOTE: @permission_classes must sit below @api_view. @api_view reads the
# permission_classes attribute from the function it wraps, so a decorator
# applied above it only tags the already-built view and is never enforced.
@extend_schema(
    parameters=[
        OpenApiParameter(name="body", required=True, type=str),
        OpenApiParameter(name="destination", required=True, type=str),
    ],
    methods=["POST"],
    responses={200: None},
)
@decorators.api_view(["POST"])
@decorators.permission_classes([permissions.IsAuthenticated, HasPhoneService])
def outbound_sms(request):
    """
    Send a message from the user's relay number.

    POST params:
        body: the body of the message
        destination: E.164-formatted phone number

    Returns 403 without the outbound_phone waffle flag, 400 when the user has
    no relay number or a required parameter is missing, and 200 once the
    message has been handed to Twilio. Raises ValidationError on an invalid
    destination number.
    """
    # TODO: Create or update an OutboundContact (new model) on send, or limit
    # to InboundContacts.
    # TODO: Reduce user's SMS messages for the month by one
    if not flag_is_active(request, "outbound_phone"):
        return response.Response(
            {"detail": "Requires outbound_phone waffle flag."}, status=403
        )
    try:
        relay_number = RelayNumber.objects.get(user=request.user)
    except RelayNumber.DoesNotExist:
        return response.Response({"detail": "Requires a phone mask."}, status=400)

    # Collect all missing-parameter errors so the client sees them together
    errors = {}
    body = request.data.get("body")
    if not body:
        errors["body"] = "A message body is required."
    destination_number = request.data.get("destination")
    if not destination_number:
        errors["destination"] = "A destination number is required."
    if errors:
        return response.Response(errors, status=400)

    # Raises ValidationError on invalid number
    to = _validate_number(request, "destination")

    client = twilio_client()
    client.messages.create(from_=relay_number.number, body=body, to=to.phone_number)
    return response.Response(status=200)
1✔
818

819

820
# NOTE: @permission_classes must sit below @api_view. @api_view reads the
# permission_classes attribute from the function it wraps, so a decorator
# applied above it only tags the already-built view and is never enforced.
@extend_schema(
    parameters=[
        OpenApiParameter(
            name="with",
            type=str,
            required=False,
            description="filter to messages with the given E.164 number",
        ),
        OpenApiParameter(
            name="direction",
            type=str,
            required=False,
            description="filter to inbound or outbound messages",
        ),
    ],
    methods=["GET"],
)
@decorators.api_view(["GET"])
@decorators.permission_classes([permissions.IsAuthenticated, HasPhoneService])
def list_messages(request):
    """
    Get the user's SMS messages sent to or from the phone mask

    Pass ?with=<E.164> parameter to filter the messages to only the ones sent between
    the phone mask and the <E.164> number.

    Pass ?direction=inbound|outbound to filter the messages to only the inbound or
    outbound messages. If omitted, return both.

    Returns 403 without the outbound_phone waffle flag, and 400 when the user
    has no relay number, the direction is invalid, or ?with= does not match a
    stored inbound contact.
    """
    # TODO: Support filtering to messages for outbound-only phones.
    # TODO: Show data from our own (encrypted) store, rather than from Twilio's

    if not flag_is_active(request, "outbound_phone"):
        return response.Response(
            {"detail": "Requires outbound_phone waffle flag."}, status=403
        )
    try:
        relay_number = RelayNumber.objects.get(user=request.user)
    except RelayNumber.DoesNotExist:
        return response.Response({"detail": "Requires a phone mask."}, status=400)

    _with = request.query_params.get("with", None)
    _direction = request.query_params.get("direction", None)
    if _direction and _direction not in ("inbound", "outbound"):
        return response.Response(
            {"direction": "Invalid value, valid values are 'inbound' or 'outbound'"},
            status=400,
        )

    contact = None
    if _with:
        try:
            contact = InboundContact.objects.get(
                relay_number=relay_number, inbound_number=_with
            )
        except InboundContact.DoesNotExist:
            return response.Response(
                {"with": "No inbound contacts matching the number"}, status=400
            )

    data = {}
    client = twilio_client()
    if not _direction or _direction == "inbound":
        # Query Twilio for SMS messages to the user's phone mask
        params = {"to": relay_number.number}
        if contact:
            # Filter query to SMS from this contact to the phone mask
            params["from_"] = contact.inbound_number
        data["inbound_messages"] = convert_twilio_messages_to_dict(
            client.messages.list(**params)
        )
    if not _direction or _direction == "outbound":
        # Query Twilio for SMS messages from the user's phone mask
        params = {"from_": relay_number.number}
        if contact:
            # Filter query to SMS from the phone mask to this contact
            params["to"] = contact.inbound_number
        data["outbound_messages"] = convert_twilio_messages_to_dict(
            client.messages.list(**params)
        )
    return response.Response(data, status=200)
1✔
901

902

903
def _get_phone_objects(inbound_to):
    """
    Look up the RelayNumber for a number and its owner's verified RealPhone.

    Returns a (relay_number, real_phone) tuple. Raises ValidationError when
    either record does not exist.
    """
    try:
        mask = RelayNumber.objects.get(number=inbound_to)
        owner_phone = RealPhone.objects.get(user=mask.user, verified=True)
    except ObjectDoesNotExist:
        raise exceptions.ValidationError("Could not find relay number.")
    else:
        return mask, owner_phone
1✔
912

913

914
class RelaySMSException(Exception):
    """
    Base class for exceptions when handling SMS messages.

    Modeled after restframework.APIException, but without a status_code.

    Subclasses set default_code and exactly one of default_detail (a fixed
    message) or default_detail_template (a str.format template filled from
    error_context()).

    TODO MPP-3722: Refactor to a common base class with api.exceptions.RelayAPIException
    """

    critical: bool
    default_code: str
    default_detail: str | None = None
    default_detail_template: str | None = None

    def __init__(self, critical=False, *args, **kwargs):
        self.critical = critical
        # Exactly one of the two detail sources must be provided
        has_fixed_detail = self.default_detail is not None
        has_template = self.default_detail_template is not None
        assert has_fixed_detail != has_template
        super().__init__(*args, **kwargs)

    @property
    def detail(self):
        """Return the fixed detail, or the template filled with the context."""
        if self.default_detail:
            return self.default_detail
        assert self.default_detail_template is not None
        context = self.error_context()
        return self.default_detail_template.format(**context)

    def get_codes(self):
        """Return the error code for this exception."""
        return self.default_code

    def error_context(self) -> ErrorContextType:
        """Return context variables for client-side translation."""
        return {}
1✔
949

950

951
class NoPhoneLog(RelaySMSException):
    """The user tried to reply, but is not storing a caller and text log."""

    default_code = "no_phone_log"
    default_detail_template = (
        "To reply, you must allow Firefox Relay to keep a log of your callers"
        " and text senders. You can update this under “Caller and texts log” here:"
        "{account_settings_url}."
    )

    def error_context(self) -> ErrorContextType:
        """Provide the account settings URL for the detail template."""
        site_origin = settings.SITE_ORIGIN or ""
        return {"account_settings_url": f"{site_origin}/accounts/settings/"}
963

964

965
class NoPreviousSender(RelaySMSException):
    """There is no destination number to send the reply to."""

    default_code = "no_previous_sender"
    default_detail = (
        "Message failed to send. You can only reply to phone numbers that have sent"
        " you a text message."
    )
971

972

973
class ShortPrefixException(RelaySMSException):
    """Base exception for short prefix exceptions"""

    def __init__(self, short_prefix: str, *args, **kwargs):
        # Stored before the base initializer runs; used by error_context()
        self.short_prefix = short_prefix
        super().__init__(*args, **kwargs)

    def error_context(self) -> ErrorContextType:
        """Expose the detected short prefix to the detail template."""
        context = {"short_prefix": self.short_prefix}
        return context
1✔
982

983

984
class FullNumberException(RelaySMSException):
    """Base exception for full number exceptions"""

    def __init__(self, full_number: str, *args, **kwargs):
        # Stored before the base initializer runs; used by error_context()
        self.full_number = full_number
        super().__init__(*args, **kwargs)

    def error_context(self) -> ErrorContextType:
        """Expose the detected full number to the detail template."""
        context = {"full_number": self.full_number}
        return context
1✔
993

994

995
class ShortPrefixMatchesNoSenders(ShortPrefixException):
    """A short prefix was detected, but no contact's number ends with it."""

    default_code = "short_prefix_matches_no_senders"
    default_detail_template = (
        "Message failed to send. There is no phone number in this thread ending"
        " in {short_prefix}. Please check the number and try again."
    )
1001

1002

1003
class FullNumberMatchesNoSenders(FullNumberException):
    """A full number was detected, but it matches no previous sender."""

    default_code = "full_number_matches_no_senders"
    default_detail_template = (
        "Message failed to send. There is no previous sender with the phone"
        " number {full_number}. Please check the number and try again."
    )
1009

1010

1011
class MultipleNumberMatches(ShortPrefixException):
    """A short prefix was detected, and more than one contact matches it."""

    default_code = "multiple_number_matches"
    default_detail_template = (
        "Message failed to send. There is more than one phone number in this"
        " thread ending in {short_prefix}. To retry, start your message with"
        " the complete number."
    )
1018

1019

1020
class NoBodyAfterShortPrefix(ShortPrefixException):
    """A short prefix matched a sender, but no message text followed it."""

    default_code = "no_body_after_short_prefix"
    default_detail_template = (
        "Message failed to send. Please include a message after the sender identifier"
        " {short_prefix}."
    )
1026

1027

1028
class NoBodyAfterFullNumber(FullNumberException):
    """A full number matched a sender, but no message text followed it."""

    default_code = "no_body_after_full_number"
    default_detail_template = (
        "Message failed to send. Please include a message after the phone number"
        " {full_number}."
    )
1034

1035

1036
def _prepare_sms_reply(
    relay_number: RelayNumber, inbound_body: str
) -> tuple[RelayNumber, str, str]:
    """
    Work out the destination number and body for a user's SMS reply.

    If the text starts with a prefix identifying a previous sender, that
    sender is the destination and the prefix is stripped from the body.
    Otherwise the reply goes to the last text sender with the body unchanged.

    Returns (relay_number, destination_number, body). Raises a
    RelaySMSException subclass when the reply cannot be routed.
    """
    incr_if_enabled("phones_handle_sms_reply")
    if not relay_number.storing_phone_log:
        # We do not store user's contacts in our database
        raise NoPhoneLog(critical=True)

    match = _match_senders_by_prefix(relay_number, inbound_body)
    destination_number: str | None

    if not match:
        # No prefix, default to last sender if any
        last_sender = get_last_text_sender(relay_number)
        destination_number = getattr(last_sender, "inbound_number", None)
        if destination_number is None:
            raise NoPreviousSender(critical=True)
        return (relay_number, destination_number, inbound_body)

    # A prefix was detected: fail unless it identifies exactly one sender
    if not match.contacts:
        if match.match_type == "short":
            raise ShortPrefixMatchesNoSenders(short_prefix=match.detected)
        if match.match_type == "full":
            raise FullNumberMatchesNoSenders(full_number=match.detected)
    if len(match.contacts) > 1:
        assert match.match_type == "short"
        raise MultipleNumberMatches(short_prefix=match.detected)

    # Use the sender matched by the prefix
    assert len(match.contacts) == 1
    destination_number = match.contacts[0].inbound_number
    if destination_number is None:
        raise NoPreviousSender(critical=True)

    # Fail if the prefix matches a sender, but there is no body to send
    body = inbound_body.removeprefix(match.prefix)
    if not body:
        if match.match_type == "short":
            raise NoBodyAfterShortPrefix(short_prefix=match.detected)
        if match.match_type == "full":
            raise NoBodyAfterFullNumber(full_number=match.detected)

    return (relay_number, destination_number, body)
1✔
1083

1084

1085
@dataclass
class MatchByPrefix:
    """
    Details of parsing a text message for a prefix.

    Fields:
    * match_type - Was it matched by short code or full number?
    * prefix - The prefix portion of the text message
    * detected - The detected short code or full number
    * numbers - The matching numbers, as e.164 strings, empty if none
    """

    match_type: Literal["short", "full"]
    prefix: str
    detected: str
    numbers: list[str] = field(default_factory=list)
1✔
1097

1098

1099
@dataclass
class MatchData(MatchByPrefix):
    """
    Details of expanding a MatchByPrefix with InboundContacts.

    Adds one field:
    * contacts - The matching InboundContacts
    """

    contacts: list[InboundContact] = field(default_factory=list)
1✔
1105

1106

1107
def _match_senders_by_prefix(relay_number: RelayNumber, text: str) -> MatchData | None:
    """
    Match a prefix to previous InboundContact(s).

    Prefix matching only runs when the multi_replies waffle flag (created on
    first use) is active for the relay number's user or for everyone.

    If no prefix was found, returns None
    If a prefix was found, a MatchData object has details and matching InboundContacts
    """
    flag_note = (
        "MPP-2252: Use prefix on SMS text to specify the recipient,"
        " rather than default of last contact."
    )
    multi_replies_flag, _ = get_waffle_flag_model().objects.get_or_create(
        name="multi_replies", defaults={"note": flag_note}
    )
    flag_enabled = (
        multi_replies_flag.is_active_for_user(relay_number.user)
        or multi_replies_flag.everyone
    )
    if not flag_enabled:
        return None

    # Index the previous contacts by E.164 number; the first contact seen for
    # a number wins.
    contacts_by_number: dict[str, InboundContact] = {}
    for contact in InboundContact.objects.filter(relay_number=relay_number).all():
        # TODO: don't default to US when we support other regions
        parsed = phonenumbers.parse(contact.inbound_number, "US")
        e164 = phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)
        contacts_by_number.setdefault(e164, contact)

    match = _match_by_prefix(text, set(contacts_by_number.keys()))
    if not match:
        return None
    matched_contacts = [contacts_by_number[num] for num in match.numbers]
    return MatchData(contacts=matched_contacts, **asdict(match))
1✔
1145

1146

1147
_SMS_SHORT_PREFIX_RE = re.compile(
1✔
1148
    r"""
1149
^               # Start of string
1150
\s*             # One or more spaces
1151
\d{4}           # 4 digits
1152
\s*             # Optional whitespace
1153
[:]?     # At most one separator, sync with SMS_SEPARATORS below
1154
\s*             # Trailing whitespace
1155
""",
1156
    re.VERBOSE | re.ASCII,
1157
)
1158
_SMS_SEPARATORS = set(":")  # Sync with SMS_SHORT_PREFIX_RE above
1✔
1159

1160

1161
def _match_by_prefix(text: str, candidate_numbers: set[str]) -> MatchByPrefix | None:
    """
    Look for a prefix in a text message matching a set of candidate numbers.

    Arguments:
    * A SMS text message
    * A set of phone numbers in E.164 format

    A full phone number at the start of the message is tried first, then a
    4-digit short prefix compared against the last 4 digits of each candidate.

    Return None if no prefix was found, or MatchByPrefix with likely match(es)
    """
    # Gather potential region codes, needed by PhoneNumberMatcher
    region_codes = set()
    for candidate_number in candidate_numbers:
        country_code = phonenumbers.parse(candidate_number).country_code
        if country_code:
            region_codes |= set(
                phonenumbers.region_codes_for_country_code(country_code)
            )

    # Determine where the message may start
    #  PhoneNumberMatcher doesn't work well with a number directly followed by text,
    #  so just feed it the start of the message that _may_ be a number.
    phone_characters = set(string.digits + string.punctuation + string.whitespace)
    msg_start = len(text)
    for position, character in enumerate(text):
        if character not in phone_characters:
            msg_start = position
            break
    text_to_match = text[:msg_start]

    # Does PhoneNumberMatcher detect a full number at start of message?
    for region_code in region_codes:
        for match in phonenumbers.PhoneNumberMatcher(text_to_match, region_code):
            e164 = phonenumbers.format_number(
                match.number, phonenumbers.PhoneNumberFormat.E164
            )

            # Look for end of prefix: skip whitespace and at most one separator
            end = match.start + len(match.raw_string)
            found_one_sep = False
            while end < len(text):
                next_char = text[end]
                if next_char.isspace():
                    end += 1
                elif next_char in _SMS_SEPARATORS and not found_one_sep:
                    found_one_sep = True
                    end += 1
                else:
                    break

            # Only the first detected number is considered
            return MatchByPrefix(
                match_type="full",
                prefix=text[:end],
                detected=e164,
                numbers=[e164] if e164 in candidate_numbers else [],
            )

    # Is there a short prefix? Return all contacts whose last 4 digits match.
    text_prefix_match = _SMS_SHORT_PREFIX_RE.match(text)
    if not text_prefix_match:
        # No prefix detected
        return None

    text_prefix = text_prefix_match.group(0)
    digits = set(string.digits)
    digit_suffix = "".join(char for char in text_prefix if char in digits)
    return MatchByPrefix(
        match_type="short",
        prefix=text_prefix,
        detected=digit_suffix,
        numbers=sorted(
            e164 for e164 in candidate_numbers if e164[-4:] == digit_suffix
        ),
    )
1✔
1235

1236

1237
def _check_disabled(relay_number, contact_type):
    """
    Record a blocked contact when the RelayNumber is disabled.

    Returns True (after bumping the "<contact_type>_blocked" counter and
    saving) when the number is disabled, otherwise None.
    """
    if relay_number.enabled:
        return None
    counter_name = f"{contact_type}_blocked"
    incr_if_enabled(f"phones_{contact_type}_global_blocked")
    blocked_so_far = getattr(relay_number, counter_name)
    setattr(relay_number, counter_name, blocked_so_far + 1)
    relay_number.save()
    return True
1✔
1245

1246

1247
def _check_remaining(relay_number, resource_type):
    """
    Confirm the relay number can still use a resource.

    Returns True when the owner has phone service and the number's
    "remaining_<resource_type>" value is above zero. Raises ValidationError
    otherwise.
    """
    # Check the owner of the relay number (still) has phone service
    owner_profile = relay_number.user.profile
    if not owner_profile.has_phone:
        raise exceptions.ValidationError("Number owner does not have phone service")
    remaining = getattr(relay_number, f"remaining_{resource_type}")
    if remaining > 0:
        return True
    incr_if_enabled(f"phones_out_of_{resource_type}")
    raise exceptions.ValidationError(f"Number is out of {resource_type}.")
1✔
1256

1257

1258
def _get_inbound_contact(relay_number, inbound_from):
    """
    Get or create the InboundContact for a sender of this relay number.

    Returns None when the relay number is not storing a phone log.
    """
    if relay_number.storing_phone_log:
        # Fetch the existing contact record, creating one for a new sender
        contact, _created = InboundContact.objects.get_or_create(
            relay_number=relay_number, inbound_number=inbound_from
        )
        return contact
    return None
1✔
1268

1269

1270
def _check_and_update_contact(inbound_contact, contact_type, relay_number):
    """
    Reject contact from a blocked sender, otherwise record the contact.

    contact_type is a plural such as "calls" or "texts"; it is used to build
    the counter attribute names, and its singular form (trailing character
    removed) is stored as the last inbound type.

    Raises ValidationError, after bumping the blocked counters on both the
    contact and the relay number, when the contact is blocked.
    """
    if inbound_contact.blocked:
        incr_if_enabled(f"phones_{contact_type}_specific_blocked")
        # Bump and save the contact's counter first, then the relay number's
        for record, counter_name in (
            (inbound_contact, f"num_{contact_type}_blocked"),
            (relay_number, f"{contact_type}_blocked"),
        ):
            setattr(record, counter_name, getattr(record, counter_name) + 1)
            record.save()
        raise exceptions.ValidationError(f"Number is not accepting {contact_type}.")

    now = datetime.now(UTC)
    singular_contact_type = contact_type[:-1]  # strip trailing "s"
    inbound_contact.last_inbound_date = now
    inbound_contact.last_inbound_type = singular_contact_type
    count_attr = f"num_{contact_type}"
    setattr(inbound_contact, count_attr, getattr(inbound_contact, count_attr) + 1)
    setattr(inbound_contact, f"last_{singular_contact_type}_date", now)
    inbound_contact.save()
1✔
1291

1292

1293
def _validate_twilio_request(request):
    """
    Check that a request carries a valid Twilio signature.

    The signature is validated against the absolute request URL and the
    request data, with keys in sorted order. Raises ValidationError when the
    X-Twilio-Signature header is missing or the signature does not validate.
    """
    headers = request._request.headers
    if "X-Twilio-Signature" not in headers:
        raise exceptions.ValidationError(
            "Invalid request: missing X-Twilio-Signature header."
        )

    url = request._request.build_absolute_uri()
    sorted_params = {key: request.data.get(key) for key in sorted(request.data)}
    request_signature = headers["X-Twilio-Signature"]
    validator = twilio_validator()
    if validator.validate(url, sorted_params, request_signature):
        return
    incr_if_enabled("phones_invalid_twilio_signature")
    raise exceptions.ValidationError("Invalid request: invalid signature")
1✔
1308

1309

1310
def compute_iq_mac(message_id: str) -> str:
    """
    Return the verification token expected for an iQ message ID.

    The token is the hex SHA-256 digest of the iQ inbound API key
    concatenated with the message ID.
    """
    # FIXME: switch to proper hmac when iQ is ready
    # mac = hmac.new(
    #     iq_api_key.encode(), msg=message_id.encode(), digestmod=hashlib.sha256
    # )
    keyed_message = settings.IQ_INBOUND_API_KEY + message_id
    digest = hashlib.sha256(keyed_message.encode())
    return digest.hexdigest()
×
1318

1319

1320
def _validate_iq_request(request: Request) -> None:
    """
    Check that an iQ request carries a valid verification token.

    The Verificationtoken header must equal compute_iq_mac() of the MessageId
    header. Raises AuthenticationFailed when either header is missing or the
    token does not match.
    """
    import hmac

    headers = request._request.headers
    if "Verificationtoken" not in headers:
        raise exceptions.AuthenticationFailed("missing Verificationtoken header.")

    if "MessageId" not in headers:
        raise exceptions.AuthenticationFailed("missing MessageId header.")

    message_id = headers["MessageId"]
    mac = compute_iq_mac(message_id)

    token = headers["Verificationtoken"]

    # Constant-time comparison, so response timing does not reveal how much
    # of a guessed token is correct. Compare bytes, because compare_digest
    # rejects non-ASCII str arguments.
    if not hmac.compare_digest(mac.encode(), token.encode()):
        raise exceptions.AuthenticationFailed("verificationToken != computed sha256")
×
1334

1335

1336
def convert_twilio_messages_to_dict(twilio_messages):
    """
    Convert Twilio messages into a list of JSON-serializable dictionaries.

    Each dictionary holds the message's "from", "to", "date_sent" and "body",
    in the same order as the input.
    """
    return [
        {
            "from": twilio_message.from_,
            "to": twilio_message.to,
            "date_sent": twilio_message.date_sent,
            "body": twilio_message.body,
        }
        for twilio_message in twilio_messages
    ]
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc