• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 3bc340e2-329f-4ed2-8700-adaaac8d78c8

15 Dec 2023 06:50PM UTC coverage: 73.514% (-0.1%) from 73.614%
3bc340e2-329f-4ed2-8700-adaaac8d78c8

push

circleci

jwhitlock
Use branch database with production tests

Previously, migration tests were run with production code, branch
requirements, and branch migrations. Now they run with production
requirements, so that third-party migrations are tested as well.

This uses pytest --reuse-db to create a test database with the branch's
migrations, and then a pip install with the production code. This more
closely emulates the mixed environment during a deploy.

1962 of 2913 branches covered (67.35%)

Branch coverage included in aggregate %.

6273 of 8289 relevant lines covered (75.68%)

19.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.9
/privaterelay/utils.py
1
from decimal import Decimal
1✔
2
from functools import wraps
1✔
3
from string import ascii_uppercase
1✔
4
from typing import Callable, TypedDict, cast
1✔
5
import logging
1✔
6
import random
1✔
7

8
from django.conf import settings
1✔
9
from django.contrib.auth.models import AbstractBaseUser
1✔
10
from django.http import Http404, HttpRequest
1✔
11
from django.utils.translation.trans_real import parse_accept_lang_header
1✔
12

13
from waffle import get_waffle_flag_model
1✔
14
from waffle.models import logger as waffle_logger
1✔
15
from waffle.utils import (
1✔
16
    get_cache as get_waffle_cache,
17
    get_setting as get_waffle_setting,
18
)
19

20
from .plans import (
1✔
21
    LanguageStr,
22
    PeriodStr,
23
    PlanCountryLangMapping,
24
    CountryStr,
25
    get_premium_country_language_mapping,
26
)
27

28
info_logger = logging.getLogger("eventsinfo")
1✔
29

30

31
class CountryInfo(TypedDict):
    """Country details, as returned by the get_countries_info_* functions."""

    # The detected country code ("" when it could not be guessed from a language)
    country_code: str
    # The sorted country codes that are keys of the plan mapping
    countries: list[CountryStr]
    # True when country_code is one of the countries
    available_in_country: bool
    # The plan mapping that was used, passed through unchanged
    plan_country_lang_mapping: PlanCountryLangMapping
36

37

38
def get_countries_info_from_request_and_mapping(
    request: HttpRequest, mapping: PlanCountryLangMapping
) -> CountryInfo:
    """
    Build the CountryInfo for a web request.

    The country code is determined by _get_cc_from_request, and is available
    if it is one of the keys of the mapping.
    """
    detected_cc = _get_cc_from_request(request)
    mapped_countries = sorted(mapping.keys())
    return CountryInfo(
        country_code=detected_cc,
        countries=mapped_countries,
        available_in_country=detected_cc in mapped_countries,
        plan_country_lang_mapping=mapping,
    )
50

51

52
def get_countries_info_from_lang_and_mapping(
    accept_lang: str, mapping: PlanCountryLangMapping
) -> CountryInfo:
    """
    Build the CountryInfo for an Accept-Language value.

    The country code is determined by _get_cc_from_lang, and is available
    if it is one of the keys of the mapping.
    """
    detected_cc = _get_cc_from_lang(accept_lang)
    mapped_countries = sorted(mapping.keys())
    return CountryInfo(
        country_code=detected_cc,
        countries=mapped_countries,
        available_in_country=detected_cc in mapped_countries,
        plan_country_lang_mapping=mapping,
    )
64

65

66
def get_subplat_upgrade_link_by_language(
    accept_language: str, period: PeriodStr = "yearly"
) -> str:
    """
    Return the subscription platform upgrade URL for an Accept-Language value.

    The country is guessed from accept_language, and selects the plans in the
    premium country / language mapping. The "US" plans are used if the country
    can not be guessed, or if the country is not in the mapping. Within the
    country, the plan for the first listed language is used if there is one,
    otherwise the plan for the first language of that country.

    accept_language - An Accept-Language header, or a single language tag
    period - The billing period of the plan, defaults to "yearly"
    """
    try:
        country_str = guess_country_from_accept_lang(accept_language)
    except AcceptLanguageError:
        # An unusable header (empty, wildcard, unknown language, ...) gets the
        # same default as an unmapped country, rather than failing the caller.
        country_str = "US"
    country = cast(CountryStr, country_str)

    # Use the primary language of the first listed tag only, so that a full
    # header like "fr,en-US;q=0.5" gives "fr" instead of "fr,en".
    first_tag = accept_language.split(",")[0].split(";")[0]
    language_str = first_tag.split("-")[0].strip().lower()
    language = cast(LanguageStr, language_str)

    country_lang_mapping = get_premium_country_language_mapping()
    country_details = country_lang_mapping.get(country, country_lang_mapping["US"])
    if language in country_details:
        plan = country_details[language][period]
    else:
        # No plan in the requested language, use the country's first language
        first_key = list(country_details.keys())[0]
        plan = country_details[first_key][period]
    return (
        f"{settings.FXA_BASE_ORIGIN}/subscriptions/products/"
        f"{settings.PERIODICAL_PREMIUM_PROD_ID}?plan={plan['id']}"
    )
81

82

83
def _get_cc_from_request(request: HttpRequest) -> str:
    """
    Determine the user's region / country code.

    The upper-cased X-Client-Region header is used if present, then the country
    guessed from the Accept-Language header, and "US" if neither header is set.
    The selection details are logged as "region_details", once per request.
    """
    headers = request.headers
    details: dict[str, str] = {}
    region = None

    if "X-Client-Region" in headers:
        region = headers["X-Client-Region"].upper()
        details["cdn_region"] = region
        details["region_method"] = "cdn"

    if "Accept-Language" in headers:
        accept_lang = headers["Accept-Language"]
        details["accept_lang"] = accept_lang
        lang_region = _get_cc_from_lang(accept_lang)
        details["accept_lang_region"] = lang_region
        if region is None:
            # No CDN region, so use the guess. This includes the "" that
            # _get_cc_from_lang returns when it can not guess a country.
            region = lang_region
            details["region_method"] = "accept_lang"

    if region is None:
        region = "US"
        details["region_method"] = "fallback"
    details["region"] = region

    # MPP-3284: Log details of region selection. Only log once per request, since some
    # endpoints, like /api/v1/runtime_data, call this multiple times.
    already_logged = getattr(request, "_logged_region_details", False)
    if not already_logged:
        setattr(request, "_logged_region_details", True)
        info_logger.info("region_details", extra=details)

    return region
115

116

117
def _get_cc_from_lang(accept_lang: str) -> str:
    """Guess the country code from an Accept-Language value, or "" on failure."""
    try:
        country_code = guess_country_from_accept_lang(accept_lang)
    except AcceptLanguageError:
        # Callers get an empty country code instead of an exception
        country_code = ""
    return country_code
122

123

124
# Map a primary language to the most probable country
125
# Top country derived from CLDR42 Supplemental Data, Language-Territory Information
126
# with the exception of Spanish (es), which is mapped to Spain (ES) instead of
127
# Mexico (MX), which has the most speakers, but usually specifies es-MX.
128
_PRIMARY_LANGUAGE_TO_COUNTRY = {
1✔
129
    "ace": "ID",  # Acehnese -> Indonesia
130
    "ach": "UG",  # Acholi -> Uganda
131
    "af": "ZA",  # Afrikaans -> South Africa
132
    "an": "ES",  # Aragonese -> Spain
133
    "ar": "EG",  # Arabic -> Egypt
134
    "arn": "CL",  # Mapudungun -> Chile
135
    "as": "IN",  # Assamese -> India
136
    "ast": "ES",  # Asturian -> Spain
137
    "az": "AZ",  # Azerbaijani -> Azerbaijan
138
    "be": "BY",  # Belarusian -> Belarus
139
    "bg": "BG",  # Bulgarian -> Bulgaria
140
    "bn": "BD",  # Bengali -> Bangladesh
141
    "bo": "CN",  # Tibetan -> China
142
    "br": "FR",  # Breton -> France
143
    "brx": "IN",  # Bodo -> India
144
    "bs": "BA",  # Bosnian -> Bosnia and Herzegovina
145
    "ca": "FR",  # Catalan -> France
146
    "cak": "MX",  # Kaqchikel -> Mexico
147
    "ckb": "IQ",  # Central Kurdish -> Iraq
148
    "cs": "CZ",  # Czech -> Czech Republic
149
    "cv": "RU",  # Chuvash -> Russia
150
    "cy": "GB",  # Welsh -> United Kingdom
151
    "da": "DK",  # Danish -> Denmark
152
    "de": "DE",  # German -> Germany
153
    "dsb": "DE",  # Lower Sorbian -> Germany
154
    "el": "GR",  # Greek -> Greece
155
    "en": "US",  # English -> United States
156
    "eo": "SM",  # Esperanto -> San Marino
157
    "es": "ES",  # Spanish -> Spain (instead of Mexico, top by population)
158
    "et": "EE",  # Estonian -> Estonia
159
    "eu": "ES",  # Basque -> Spain
160
    "fa": "IR",  # Persian -> Iran
161
    "ff": "SN",  # Fulah -> Senegal
162
    "fi": "FI",  # Finnish -> Finland
163
    "fr": "FR",  # French -> France
164
    "frp": "FR",  # Arpitan -> France
165
    "fur": "IT",  # Friulian -> Italy
166
    "fy": "NL",  # Frisian -> Netherlands
167
    "ga": "IE",  # Irish -> Ireland
168
    "gd": "GB",  # Scottish Gaelic -> United Kingdom
169
    "gl": "ES",  # Galician -> Spain
170
    "gn": "PY",  # Guarani -> Paraguay
171
    "gu": "IN",  # Gujarati -> India
172
    "gv": "IM",  # Manx -> Isle of Man
173
    "he": "IL",  # Hebrew -> Israel
174
    "hi": "IN",  # Hindi -> India
175
    "hr": "HR",  # Croatian -> Croatia
176
    "hsb": "DE",  # Upper Sorbian -> Germany
177
    "hu": "HU",  # Hungarian -> Hungary
178
    "hy": "AM",  # Armenian -> Armenia
179
    "hye": "AM",  # Armenian Eastern Classic Orthography -> Armenia
180
    "ia": "FR",  # Interlingua -> France
181
    "id": "ID",  # Indonesian -> Indonesia
182
    "ilo": "PH",  # Iloko -> Philippines
183
    "is": "IS",  # Icelandic -> Iceland
184
    "it": "IT",  # Italian -> Italy
185
    "ixl": "MX",  # Ixil -> Mexico
186
    "ja": "JP",  # Japanese -> Japan
187
    "jiv": "MX",  # Shuar -> Mexico
188
    "ka": "GE",  # Georgian -> Georgia
189
    "kab": "DZ",  # Kabyle -> Algeria
190
    "kk": "KZ",  # Kazakh -> Kazakhstan
191
    "km": "KH",  # Khmer -> Cambodia
192
    "kn": "IN",  # Kannada -> India
193
    "ko": "KR",  # Korean -> South Korea
194
    "ks": "IN",  # Kashmiri -> India
195
    "lb": "LU",  # Luxembourgish -> Luxembourg
196
    "lg": "UG",  # Luganda -> Uganda
197
    "lij": "IT",  # Ligurian -> Italy
198
    "lo": "LA",  # Lao -> Laos
199
    "lt": "LT",  # Lithuanian -> Lithuania
200
    "ltg": "LV",  # Latgalian -> Latvia
201
    "lus": "US",  # Mizo -> United States
202
    "lv": "LV",  # Latvian -> Latvia
203
    "mai": "IN",  # Maithili -> India
204
    "meh": "MX",  # Mixteco Yucuhiti -> Mexico
205
    "mix": "MX",  # Mixtepec Mixtec -> Mexico
206
    "mk": "MK",  # Macedonian -> North Macedonia
207
    "ml": "IN",  # Malayalam -> India
208
    "mr": "IN",  # Marathi -> India
209
    "ms": "MY",  # Malay -> Malaysia
210
    "my": "MM",  # Burmese -> Myanmar
211
    "nb": "NO",  # Norwegian Bokmål -> Norway
212
    "ne": "NP",  # Nepali -> Nepal
213
    "nl": "NL",  # Dutch -> Netherlands
214
    "nn": "NO",  # Norwegian Nynorsk -> Norway
215
    "oc": "FR",  # Occitan -> France
216
    "or": "IN",  # Odia -> India
217
    "pa": "IN",  # Punjabi -> India
218
    "pl": "PL",  # Polish -> Poland
219
    "ppl": "MX",  # Náhuat Pipil -> Mexico
220
    "pt": "BR",  # Portuguese -> Brazil
221
    "quc": "GT",  # K'iche' -> Guatemala
222
    "rm": "CH",  # Romansh -> Switzerland
223
    "ro": "RO",  # Romanian -> Romania
224
    "ru": "RU",  # Russian -> Russia
225
    "sat": "IN",  # Santali (Ol Chiki) -> India
226
    "sc": "IT",  # Sardinian -> Italy
227
    "scn": "IT",  # Sicilian -> Italy
228
    "sco": "GB",  # Scots -> United Kingdom
229
    "si": "LK",  # Sinhala -> Sri Lanka
230
    "sk": "SK",  # Slovak -> Slovakia
231
    "skr": "PK",  # Saraiki -> Pakistan
232
    "sl": "SI",  # Slovenian -> Slovenia
233
    "son": "ML",  # Songhay -> Mali
234
    "sq": "AL",  # Albanian -> Albania
235
    "sr": "RS",  # Serbian -> Serbia
236
    "sv": "SE",  # Swedish -> Sweden
237
    "sw": "TZ",  # Swahili -> Tanzania
238
    "szl": "PL",  # Silesian -> Poland
239
    "ta": "IN",  # Tamil -> India
240
    "te": "IN",  # Telugu -> India
241
    "tg": "TJ",  # Tajik -> Tajikistan
242
    "th": "TH",  # Thai -> Thailand
243
    "tl": "PH",  # Tagalog -> Philippines
244
    "tr": "TR",  # Turkish or Crimean Tatar -> Turkey
245
    "trs": "MX",  # Triqui -> Mexico
246
    "uk": "UA",  # Ukrainian -> Ukraine
247
    "ur": "PK",  # Urdu -> Pakistan
248
    "uz": "UZ",  # Uzbek -> Uzbekistan
249
    "vi": "VN",  # Vietnamese -> Vietnam
250
    "wo": "SN",  # Wolof -> Senegal
251
    "xcl": "AM",  # Armenian Classic -> Armenia
252
    "xh": "ZA",  # Xhosa -> South Africa
253
    "zam": "MX",  # Miahuatlán Zapotec -> Mexico
254
    "zh": "CN",  # Chinese -> China
255
}
256

257
# Special cases for language tags
258
_LANGUAGE_TAG_TO_COUNTRY_OVERRIDE = {
1✔
259
    # Would be Catalan in Valencian script -> France
260
    # Change to Valencian -> Spain
261
    ("ca", "VALENCIA"): "ES",
262
    # Spanish in UN region 419 (Latin America and Caribbean)
263
    # Pick Mexico, which has highest number of Spanish speakers
264
    ("es", "419"): "MX",
265
    # Would be Galician (Greenland) -> Greenland
266
    # Change to Galician (Galicia region of Spain) -> Spain
267
    ("gl", "GL"): "ES",
268
}
269

270

271
class AcceptLanguageError(ValueError):
    """
    An Accept-Language header could not be processed.

    The header being processed is available as the accept_lang attribute.
    """

    def __init__(self, message, accept_lang):
        # Only the message is passed on to ValueError, so it alone is used for
        # str() and args. The header is kept as an attribute.
        self.accept_lang = accept_lang
        super().__init__(message)
277

278

279
def guess_country_from_accept_lang(accept_lang: str) -> str:
    """
    Guess the user's country from the Accept-Language header

    Return is a 2-letter ISO 3166 country code

    If an issue is detected, an AcceptLanguageError is raised. This happens
    when the header can not be parsed, when the primary language of the top
    tag is irregular ("i"), private-use ("x", or three characters starting
    with "q" and a second character up to "t"), a wildcard ("*"), or a single
    character, and when no country is found in the tag and the primary
    language is not in _PRIMARY_LANGUAGE_TO_COUNTRY.

    The header may come directly from a web request, or may be the header
    captured by Mozilla Accounts (FxA) at signup.

    Even with all this logic and special casing, it is still more accurate to
    use a GeoIP lookup or a country code provided by the infrastructure.

    See RFC 9110, "HTTP Semantics", section 12.5.4, "Accept-Language"
    See RFC 5646, "Tags for Identifying Languages", and examples in Appendix A
    """
    lang_q_pairs = parse_accept_lang_header(accept_lang.strip())
    if not lang_q_pairs:
        raise AcceptLanguageError("Invalid Accept-Language string", accept_lang)
    # Only the language tag of the first parsed pair is examined
    top_lang_tag = lang_q_pairs[0][0]

    subtags = top_lang_tag.split("-")
    lang = subtags[0].lower()
    if lang == "i":
        raise AcceptLanguageError("Irregular language tag", accept_lang)
    if lang == "x":
        raise AcceptLanguageError("Private-use language tag", accept_lang)
    if lang == "*":
        raise AcceptLanguageError("Wildcard language tag", accept_lang)
    if len(lang) < 2:
        raise AcceptLanguageError("Invalid one-character primary language", accept_lang)
    if len(lang) == 3 and lang[0] == "q" and lang[1] <= "t":
        raise AcceptLanguageError(
            "Private-use language tag (RFC 5646 2.2.1)", accept_lang
        )

    for maybe_region_raw in subtags[1:]:
        maybe_region = maybe_region_raw.upper()

        # Look for a special case
        if override := _LANGUAGE_TAG_TO_COUNTRY_OVERRIDE.get((lang, maybe_region)):
            return override

        if len(maybe_region) <= 1:
            # One-character extension or empty, stop processing
            break
        if (
            len(maybe_region) == 2
            and all(c in ascii_uppercase for c in maybe_region)
            and
            # RFC 5646 2.2.4 "Region Subtag" point 6, reserved subtags
            maybe_region != "AA"
            and maybe_region != "ZZ"
            and maybe_region[0] != "X"
            and (maybe_region[0] != "Q" or maybe_region[1] < "M")
        ):
            # Subtag is a non-private ISO 3166 country code
            return maybe_region

        # Subtag is probably a script, like "Hans" in "zh-Hans-CN"
        # Loop to the next subtag, which might be an ISO 3166 country code

    # Guess the country from a simple language tag. A plain lookup is used
    # instead of catching KeyError, so that the AcceptLanguageError is not
    # chained to an internal KeyError in tracebacks.
    country = _PRIMARY_LANGUAGE_TO_COUNTRY.get(lang)
    if country is None:
        # NOTE(review): the message spelling ("langauge") is kept as-is, in
        # case callers or tests match on it.
        raise AcceptLanguageError("Unknown langauge", accept_lang)
    return country
347

348

349
def enable_or_404(
    check_function: Callable[[], bool],
    message: str = "This conditional view is disabled.",
):
    """
    Returns decorator that enables a view if a check function passes,
    otherwise raises Http404 with the message.

    The check function is called without arguments each time the decorated
    view is called.

    Usage:

        def percent_1():
           import random
           return random.randint(1, 100) == 1

        @enable_or_404(percent_1)
        def lucky_view(request):
            #  1 in 100 chance of getting here
            # 99 in 100 chance of 404
    """

    def decorator(func):
        @wraps(func)
        def guarded_view(*args, **kwargs):
            if not check_function():
                raise Http404(message)  # Display a message with DEBUG=True
            return func(*args, **kwargs)

        return guarded_view

    return decorator
380

381

382
def enable_if_setting(
    setting_name: str,
    message_fmt: str = "This view is disabled because {setting_name} is False",
):
    """
    Returns decorator that enables a view if a setting is truthy, otherwise
    returns a 404.

    The setting is read each time the decorated view is called. The 404
    message is message_fmt, formatted with the setting_name.

    Usage:

        @enable_if_setting("DEBUG")
        def debug_only_view(request):
            # DEBUG == True

    Or in URLS:

        path(
            "developer_info",
            enable_if_setting("DEBUG")(debug_only_view),
            name="developer-info",
        ),

    """
    disabled_message = message_fmt.format(setting_name=setting_name)

    def setting_is_truthy() -> bool:
        return bool(getattr(settings, setting_name))

    return enable_or_404(setting_is_truthy, disabled_message)
413

414

415
def flag_is_active_in_task(flag_name: str, user: AbstractBaseUser | None) -> bool:
    """
    Test if a flag is active in a task (not in a web request).

    This mirrors AbstractBaseFlag.is_active, replicating these checks:
    * Logs missing flags, if configured
    * Creates missing flags, if configured
    * Returns default for missing flags
    * Checks flag.everyone
    * Checks flag.users and flag.groups, if a user is passed
    * Returns random results for flag.percent

    It does not include:
    * Overriding a flag with a query parameter
    * Persisting a flag in a cookie (includes percent flags)
    * Language-specific overrides (could be added later)
    * Read-only mode for percent flags

    When using this function, use the @override_flag decorator in tests, rather
    than manually creating flags in the database.
    """
    flag = get_waffle_flag_model().get(flag_name)
    if not flag.pk:
        # The flag is missing: log and create it as configured, then return
        # the configured default.
        missing_log_level = get_waffle_setting("LOG_MISSING_FLAGS")
        if missing_log_level:
            waffle_logger.log(missing_log_level, "Flag %s not found", flag_name)
        if get_waffle_setting("CREATE_MISSING_FLAGS"):
            flag, _created = get_waffle_flag_model().objects.get_or_create(
                name=flag_name,
                defaults={"everyone": get_waffle_setting("FLAG_DEFAULT")},
            )
            get_waffle_cache().set(flag._cache_key(flag.name), flag)

        return bool(get_waffle_setting("FLAG_DEFAULT"))

    # Removed - check for override as request query parameter

    # flag.everyone decides when it is truthy or exactly False. Other falsy
    # values, such as None, continue to the remaining checks.
    if flag.everyone:
        return True
    if flag.everyone is False:
        return False

    # Removed - check for testing override in request query or cookie
    # Removed - check for language-specific override

    if user is not None:
        user_result = flag.is_active_for_user(user)
        if user_result is not None:
            return bool(user_result)

    if not (flag.percent and flag.percent > 0):
        return False

    # Removed - check for waffles attribute of request
    # Removed - check for cookie setting for flag
    # Removed - check for read-only mode
    # Removed - setting the flag for future checks

    # Active when a random draw from 0 to 100 is at or below flag.percent
    roll = Decimal(str(random.uniform(0, 100)))
    return bool(roll <= flag.percent)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc