• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 6254eb69-bdb9-4bd1-8b91-b230e80eaca9

15 Sep 2023 05:21PM CUT coverage: 74.526% (-0.07%) from 74.591%
6254eb69-bdb9-4bd1-8b91-b230e80eaca9

push

circleci

web-flow
Merge pull request #3891 from mozilla/retire-resender-flag-3404

MPP-3404 - Retire resender flag

1895 of 2759 branches covered (0.0%)

Branch coverage included in aggregate %.

7 of 7 new or added lines in 2 files covered. (100.0%)

5966 of 7789 relevant lines covered (76.6%)

18.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.94
/emails/utils.py
1
import base64
1✔
2
import contextlib
1✔
3
from email.errors import InvalidHeaderDefect
1✔
4
from email.headerregistry import Address
1✔
5
from email.mime.multipart import MIMEMultipart
1✔
6
from email.mime.text import MIMEText
1✔
7
from email.mime.application import MIMEApplication
1✔
8
from email.utils import formataddr, parseaddr
1✔
9
from functools import cache
1✔
10
from typing import cast, Any, Callable, TypeVar
1✔
11
import json
1✔
12
import pathlib
1✔
13
import re
1✔
14
from django.template.loader import render_to_string
1✔
15
from django.utils.text import Truncator
1✔
16
import requests
1✔
17

18
from botocore.exceptions import ClientError
1✔
19
from cryptography.hazmat.primitives import hashes
1✔
20
from cryptography.hazmat.primitives.kdf.hkdf import HKDFExpand
1✔
21
from mypy_boto3_ses.type_defs import SendRawEmailResponseTypeDef
1✔
22
import jwcrypto.jwe
1✔
23
import jwcrypto.jwk
1✔
24
import markus
1✔
25
import logging
1✔
26
from urllib.parse import quote_plus, urlparse
1✔
27

28
from django.apps import apps
1✔
29
from django.conf import settings
1✔
30
from django.contrib.auth.models import Group, User
1✔
31
from django.http import HttpResponse
1✔
32
from django.template.defaultfilters import linebreaksbr, urlize
1✔
33

34
from allauth.socialaccount.models import SocialAccount
1✔
35

36
from privaterelay.plans import get_bundle_country_language_mapping
1✔
37
from privaterelay.utils import get_countries_info_from_lang_and_mapping
1✔
38

39
from .apps import EmailsConfig
1✔
40
from .models import (
1✔
41
    DomainAddress,
42
    RelayAddress,
43
    Reply,
44
    get_domains_from_settings,
45
)
46
from .types import AttachmentPair, AWS_MailJSON, MessageBody, OutgoingHeaders
1✔
47

48

49
# Named loggers used by the helpers in this module.
logger = logging.getLogger("events")
info_logger = logging.getLogger("eventsinfo")
study_logger = logging.getLogger("studymetrics")
# Metrics client used by the *_if_enabled helpers and time_if_enabled.
metrics = markus.get_metrics("fx-private-relay")
1✔
53

54
# URL of the tracker list that get_trackers() passes to download_trackers().
shavar_prod_lists_url = (
    "https://raw.githubusercontent.com/mozilla-services/shavar-prod-lists/"
    "master/disconnect-blacklist.json"
)
# Folder containing this module, and the folder holding the tracker list files.
EMAILS_FOLDER_PATH = pathlib.Path(__file__).parent
TRACKER_FOLDER_PATH = EMAILS_FOLDER_PATH.joinpath("tracker_lists")
1✔
60

61

62
def get_trackers(level):
    """
    Return the tracker list for the given level.

    Level 2 selects the "EmailAggressive" category; any other value selects
    the "Email" category. The list is loaded from the level's JSON file in
    TRACKER_FOLDER_PATH. If that file does not exist, the list is downloaded
    and then written to that file.
    """
    if level == 2:
        category, tracker_list_name = "EmailAggressive", "level-two-trackers"
    else:
        category, tracker_list_name = "Email", "level-one-trackers"

    file_name = f"{tracker_list_name}.json"
    try:
        with open(TRACKER_FOLDER_PATH / file_name, "r") as tracker_file:
            return json.load(tracker_file)
    except FileNotFoundError:
        downloaded = download_trackers(shavar_prod_lists_url, category)
        store_trackers(downloaded, TRACKER_FOLDER_PATH, file_name)
        return downloaded
×
78

79

80
def download_trackers(repo_url, category="Email", timeout=30):
    """
    Download the tracker list and return the domains of one category.

    repo_url: URL of the JSON tracker list.
    category: key to read under "categories" in the downloaded JSON.
    timeout: seconds to wait for the server. Without a timeout, requests
        can wait indefinitely for a response.

    Returns a flat list with every domain found in the category.
    Raises requests.RequestException on a timeout, a connection failure, or
    an HTTP error status.
    """
    # email tracker lists from shavar-prod-list as per agreed use under license:
    resp = requests.get(repo_url, timeout=timeout)
    # Fail on an HTTP error status instead of parsing an error body as JSON
    resp.raise_for_status()
    formatted_trackers = resp.json()["categories"][category]
    trackers = []
    # Each entity maps names to resources, and each resource maps keys to
    # lists of domains; only the domain lists are kept.
    for entity in formatted_trackers:
        for resources in entity.values():
            for domains in resources.values():
                trackers.extend(domains)
    return trackers
×
91

92

93
def store_trackers(trackers, path, file_name):
    """Write the tracker list to path / file_name as JSON indented by 4 spaces."""
    destination = path / file_name
    with open(destination, "w+") as tracker_file:
        json.dump(trackers, tracker_file, indent=4)
×
96

97

98
@cache
def general_trackers():
    """Return the level 1 tracker list; the result is cached after the first call."""
    level_one_trackers = get_trackers(level=1)
    return level_one_trackers
×
101

102

103
@cache
def strict_trackers():
    """Return the level 2 tracker list; the result is cached after the first call."""
    level_two_trackers = get_trackers(level=2)
    return level_two_trackers
×
106

107

108
_TimedFunction = TypeVar("_TimedFunction", bound=Callable[..., Any])


def time_if_enabled(name: str) -> Callable[[_TimedFunction], _TimedFunction]:
    """
    Return a decorator that times calls when settings.STATSD_ENABLED is true.

    name: the timer name passed to metrics.timer().

    The setting is read on every call of the decorated function. When it is
    false, the function runs inside a no-op context manager instead.
    """
    # Imported here because the module-level import only brings in `cache`
    from functools import wraps

    def timing_decorator(func: _TimedFunction) -> _TimedFunction:
        # Preserve the decorated function's __name__, __doc__ and __wrapped__
        @wraps(func)
        def func_wrapper(*args, **kwargs):
            ctx_manager = (
                metrics.timer(name)
                if settings.STATSD_ENABLED
                else contextlib.nullcontext()
            )
            with ctx_manager:
                return func(*args, **kwargs)

        return cast(_TimedFunction, func_wrapper)

    return timing_decorator
1✔
125

126

127
def incr_if_enabled(name, value=1, tags=None):
    """Increment the counter `name` by `value` when settings.STATSD_ENABLED is true."""
    if not settings.STATSD_ENABLED:
        return
    metrics.incr(name, value, tags)
1✔
130

131

132
def histogram_if_enabled(name, value, tags=None):
    """Record `value` in the histogram `name` when settings.STATSD_ENABLED is true."""
    if not settings.STATSD_ENABLED:
        return
    metrics.histogram(name, value=value, tags=tags)
×
135

136

137
def gauge_if_enabled(name, value, tags=None):
    """Set the gauge `name` to `value` when settings.STATSD_ENABLED is true."""
    if not settings.STATSD_ENABLED:
        return
    metrics.gauge(name, value, tags)
1✔
140

141

142
def get_email_domain_from_settings():
    """Return the email domain, taken from the host part of settings.SITE_ORIGIN."""
    site_netloc = urlparse(settings.SITE_ORIGIN).netloc
    if settings.RELAY_CHANNEL != "dev":
        return site_netloc
    # on dev server we need to add "mail" prefix
    # because we can't publish MX records on Heroku
    return f"mail.{site_netloc}"
1✔
149

150

151
def _get_hero_img_src(lang_code):
    """
    Return the URL of the first-time-user hero image for a language code.

    Only the part of lang_code before the first "-" is used. A language
    without a localized image gets the "en" image.
    """
    avail_l10n_image_codes = (
        "cs",
        "de",
        "en",
        "es",
        "fi",
        "fr",
        "hu",
        "id",
        "it",
        "ja",
        "nl",
        "pt",
        "ru",
        "sv",
        "zh",
    )
    major_lang = lang_code.split("-")[0]
    img_locale = major_lang if major_lang in avail_l10n_image_codes else "en"
    return (
        settings.SITE_ORIGIN
        + f"/static/images/email-images/first-time-user/hero-image-{img_locale}.png"
    )
178

179

180
def get_welcome_email(user: User, format: str) -> str:
    """
    Render the first-time-user welcome email for a user.

    format: the template extension appended to "emails/first_time_user.".

    Bundle availability is looked up from the "locale" value of the user's
    social account (default "en"); the language and hero image come from
    the user's profile language.
    """
    social_account = SocialAccount.objects.get(user=user)
    locale = social_account.extra_data.get("locale", "en")
    bundle_plans = get_countries_info_from_lang_and_mapping(
        locale, get_bundle_country_language_mapping()
    )
    lang_code = user.profile.language
    hero_img_src = _get_hero_img_src(lang_code)
    context = {
        "in_bundle_country": bundle_plans["available_in_country"],
        "SITE_ORIGIN": settings.SITE_ORIGIN,
        "hero_img_src": hero_img_src,
        "language": lang_code,
    }
    return render_to_string(f"emails/first_time_user.{format}", context)
196

197

198
@time_if_enabled("ses_send_raw_email")
def ses_send_raw_email(
    source_address: str,
    destination_address: str,
    message: MIMEMultipart,
) -> SendRawEmailResponseTypeDef:
    """
    Send a message with the SES client's send_raw_email and return its response.

    The client comes from the "emails" app config, and the configuration set
    from settings.AWS_SES_CONFIGSET. A botocore ClientError is logged and
    re-raised.
    """
    app_config = apps.get_app_config("emails")
    assert isinstance(app_config, EmailsConfig)
    client = app_config.ses_client
    assert client
    assert settings.AWS_SES_CONFIGSET
    try:
        response = client.send_raw_email(
            Source=source_address,
            Destinations=[destination_address],
            RawMessage={"Data": message.as_string()},
            ConfigurationSetName=settings.AWS_SES_CONFIGSET,
        )
        incr_if_enabled("ses_send_raw_email", 1)
    except ClientError as e:
        logger.error("ses_client_error_raw_email", extra=e.response["Error"])
        raise
    return response
1✔
221

222

223
def create_message(
    headers: OutgoingHeaders,
    message_body: MessageBody,
    attachments: list[AttachmentPair] | None = None,
) -> MIMEMultipart:
    """
    Build a multipart message from headers, a body, and optional attachments.

    Attachments are added only when the list is non-empty.
    """
    message = _add_body_to_message(_start_message_with_headers(headers), message_body)
    if attachments:
        return _add_attachments_to_message(message, attachments)
    return message
×
234

235

236
def _start_message_with_headers(headers: OutgoingHeaders) -> MIMEMultipart:
    """Return a new multipart/mixed container with each given header set on it."""
    container = MIMEMultipart("mixed")
    for header_name, header_value in headers.items():
        container[header_name] = header_value
    return container
1✔
243

244

245
def _add_body_to_message(
    msg: MIMEMultipart, message_body: MessageBody
) -> MIMEMultipart:
    """
    Attach the body to msg as a multipart/alternative part and return msg.

    The "Text" entry, if present, is attached before the "Html" entry.
    """
    alternatives = MIMEMultipart("alternative")

    if "Text" in message_body:
        # Let MIMEText determine if us-ascii encoding will work
        alternatives.attach(MIMEText(message_body["Text"]["Data"], "plain"))
    if "Html" in message_body:
        # Our translated strings contain U+2068 (First Strong Isolate) and
        # U+2069 (Pop Directional Isolate), us-ascii will not work
        # so save time by suggesting utf-8 encoding
        alternatives.attach(MIMEText(message_body["Html"]["Data"], "html", "utf-8"))

    # The alternative container becomes a child of the multipart/mixed parent.
    msg.attach(alternatives)
    return msg
1✔
268

269

270
def _add_attachments_to_message(
    msg: MIMEMultipart, attachments: list[AttachmentPair]
) -> MIMEMultipart:
    """
    Attach each (name, file object) pair to msg and return msg.

    Each file object is rewound, read in full, and closed once attached.
    """
    for filename, stream in attachments:
        stream.seek(0)
        part = MIMEApplication(stream.read())
        # Mark the part as an attachment and give it its file name.
        part.add_header("Content-Disposition", "attachment", filename=filename)
        msg.attach(part)
        stream.close()
    return msg
×
286

287

288
def _store_reply_record(
    mail: AWS_MailJSON, message_id: str, address: RelayAddress | DomainAddress
) -> AWS_MailJSON:
    """
    Store a Reply record for a relayed email and return mail unchanged.

    mail: the mail JSON; its "headers" list supplies the Message-ID, From and
        Reply-To values that are encrypted into the record.
    message_id: the message ID from which the lookup and encryption keys
        are derived.
    address: the mask that the Reply record is linked to.
    """
    # After relaying email, store a Reply record for it
    kept_headers = ("message-id", "from", "reply-to")
    reply_metadata = {}
    for header in mail["headers"]:
        header_name = header["name"].lower()
        if header_name in kept_headers:
            reply_metadata[header_name] = header["value"]
    message_id_bytes = get_message_id_bytes(message_id)
    (lookup_key, encryption_key) = derive_reply_keys(message_id_bytes)
    lookup = b64_lookup_key(lookup_key)
    encrypted_metadata = encrypt_reply_metadata(encryption_key, reply_metadata)
    reply_create_args: dict[str, Any] = {
        "lookup": lookup,
        "encrypted_metadata": encrypted_metadata,
    }
    # isinstance rather than an exact type() comparison
    if isinstance(address, DomainAddress):
        reply_create_args["domain_address"] = address
    elif isinstance(address, RelayAddress):
        reply_create_args["relay_address"] = address
    Reply.objects.create(**reply_create_args)
    return mail
1✔
310

311

312
def ses_relay_email(
    source_address: str,
    destination_address: str,
    headers: OutgoingHeaders,
    message_body: MessageBody,
    attachments: list[AttachmentPair],
    mail: AWS_MailJSON,
    address: RelayAddress | DomainAddress,
) -> HttpResponse:
    """
    Send the relayed email through SES, then store a Reply record for it.

    Returns a 503 response, without storing a Reply record, when sending
    raises a ClientError. Otherwise returns a 200 response.
    """
    outgoing = create_message(headers, message_body, attachments)
    try:
        sent = ses_send_raw_email(source_address, destination_address, outgoing)
    except ClientError:
        # 503 service unavailable response to SNS so it can retry
        return HttpResponse("SES client error on Raw Email", status=503)

    _store_reply_record(mail, sent["MessageId"], address)
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
331

332

333
def urlize_and_linebreaks(text, autoescape=True):
    """Apply Django's urlize filter, then linebreaksbr, to text."""
    linked_text = urlize(text, autoescape=autoescape)
    return linebreaksbr(linked_text, autoescape=autoescape)
×
335

336

337
def get_reply_to_address(premium: bool = True) -> str:
    """
    Return the address that relays replies.

    With premium set, this is "replies@" plus the RELAY_FIREFOX_DOMAIN
    domain; otherwise it is the address part of settings.RELAY_FROM_ADDRESS.
    """
    if premium:
        relay_domain = get_domains_from_settings().get("RELAY_FIREFOX_DOMAIN")
        raw_address = "replies@%s" % relay_domain
    else:
        raw_address = settings.RELAY_FROM_ADDRESS
    return parseaddr(raw_address)[1]
1✔
346

347

348
def truncate(max_length: int, value: str) -> str:
    """
    Shorten a string to at most max_length characters.

    A value that already fits is returned unchanged. Otherwise the suffix is
    "..." for an all-ASCII value and "…" (Unicode ellipsis) for any other.
    """
    if len(value) <= max_length:
        return value
    suffix = "..." if value.isascii() else "…"
    return Truncator(value).chars(max_length, truncate=suffix)
1✔
363

364

365
def generate_from_header(original_from_address: str, relay_mask: str) -> str:
    """
    Build the From: header value for a relayed email.

    The display name names the original sender followed by " [via Relay]",
    and the address is the relay mask. Logs and re-raises InvalidHeaderDefect
    or IndexError when the original address cannot be parsed.

    This format was introduced in June 2023 with MPP-2117.
    """
    # Drop line separators so the address parses as a single line
    line_breaks = str.maketrans("", "", "\u2028\r\n")
    oneline_from_address = original_from_address.translate(line_breaks)
    display_name, original_address = parseaddr(oneline_from_address)
    try:
        parsed_address = Address(addr_spec=original_address)
    except (InvalidHeaderDefect, IndexError) as e:
        # TODO: MPP-3407, MPP-3417 - Determine how to handle these
        info_logger.error(
            "generate_from_header",
            extra={
                "exception_type": type(e).__name__,
                "original_from_address": original_from_address,
            },
        )
        raise

    # Truncate the display name to 71 characters, so the sender portion fits on the
    # first line of a multi-line "From:" header, if it is ASCII. A utf-8 encoded header
    # will be 226 chars, still below the 998 limit of RFC 5322 2.1.1.
    max_length = 71
    addr_spec = parsed_address.addr_spec

    if display_name:
        short_name = truncate(max_length, display_name)
        short_address = truncate(max_length, addr_spec)
        sender = f"{short_name} <{short_address}>"
    else:
        # Use the email address if the display name was not originally set
        sender = truncate(max_length, addr_spec)
    return formataddr((f"{sender} [via Relay]", relay_mask))
1✔
403

404

405
def get_message_id_bytes(message_id_str: str) -> bytes:
    """
    Return the local part of a message ID as bytes.

    Takes the text before the first "@", drops everything up to and
    including the last "<" in it, and strips surrounding whitespace.
    """
    before_at = message_id_str.partition("@")[0]
    local_part = before_at.rpartition("<")[2]
    return local_part.strip().encode()
1✔
408

409

410
def b64_lookup_key(lookup_key: bytes) -> str:
    """Return the lookup key as URL-safe base64 text, padding included."""
    encoded = base64.urlsafe_b64encode(lookup_key)
    return str(encoded, "ascii")
1✔
412

413

414
def derive_reply_keys(message_id: bytes) -> tuple[bytes, bytes]:
    """
    Derive the lookup key and encryption key from an aliased message id.

    Both keys come from HKDF-Expand with SHA-256 over the message id: a
    16-byte lookup key and a 32-byte encryption key, each with its own info.
    """

    def _expand(length: int, info: bytes) -> bytes:
        # An HKDFExpand instance can derive only once, so build one per key
        hkdf = HKDFExpand(algorithm=hashes.SHA256(), length=length, info=info)
        return hkdf.derive(message_id)

    lookup_key = _expand(16, b"replay replies lookup key")
    encryption_key = _expand(32, b"replay replies encryption key")
    return (lookup_key, encryption_key)
1✔
424

425

426
def encrypt_reply_metadata(key: bytes, payload: dict[str, str]) -> str:
    """Encrypt the given payload into a compact JWE, using the given key."""
    # The JWK is loaded from the key's unpadded URL-safe base64 text
    encoded_key = base64.urlsafe_b64encode(key).rstrip(b"=").decode("ascii")
    jwk = jwcrypto.jwk.JWK(kty="oct", k=encoded_key)
    plaintext = json.dumps(payload)
    protected_header = json.dumps({"alg": "dir", "enc": "A256GCM"})
    token = jwcrypto.jwe.JWE(plaintext, protected_header, recipient=jwk)
    return cast(str, token.serialize(compact=True))
1✔
436

437

438
def decrypt_reply_metadata(key, jwe):
    """Decrypt the given JWE, using the given key, and return its plaintext."""
    # The JWK is loaded from the key's unpadded URL-safe base64 text
    encoded_key = base64.urlsafe_b64encode(key).rstrip(b"=").decode("ascii")
    jwk = jwcrypto.jwk.JWK(kty="oct", k=encoded_key)
    token = jwcrypto.jwe.JWE()
    token.deserialize(jwe)
    token.decrypt(jwk)
    return token.plaintext
1✔
448

449

450
def _get_bucket_and_key_from_s3_json(message_json):
    """
    Return the (bucket, object key) pair named in a notification.

    Returns (None, None) for a notification that is not "Received", has no
    receipt action, or whose action type does not contain "S3". A missing
    or malformed action entry is logged, and values not yet read stay None.
    """
    # Only Received notifications have S3-stored data
    if message_json.get("notificationType") != "Received":
        return None, None

    if "receipt" not in message_json or "action" not in message_json["receipt"]:
        logger.error(
            "sns_inbound_message_without_receipt",
            extra={"message_json_keys": message_json.keys()},
        )
        return None, None

    receipt = message_json["receipt"]
    bucket = object_key = None
    try:
        if "S3" in receipt["action"]["type"]:
            bucket = receipt["action"]["bucketName"]
            object_key = receipt["action"]["objectKey"]
    except (KeyError, TypeError):
        logger.error(
            "sns_inbound_message_receipt_malformed",
            extra={"receipt_action": receipt["action"]},
        )
    return bucket, object_key
1✔
477

478

479
@time_if_enabled("s3_get_message_content")
def get_message_content_from_s3(bucket, object_key):
    """
    Read and return the body of an S3 object.

    Returns None when bucket or object_key is falsy.
    """
    if not (bucket and object_key):
        return None
    s3_client = apps.get_app_config("emails").s3_client
    s3_object = s3_client.get_object(Bucket=bucket, Key=object_key)
    return s3_object.get("Body").read()
×
487

488

489
@time_if_enabled("s3_remove_message_from")
def remove_message_from_s3(bucket, object_key):
    """
    Delete an S3 object and return the response's "DeleteMarker" value.

    Returns False when bucket or object_key is None. A ClientError is
    logged and counted, and False is returned.
    """
    if bucket is None or object_key is None:
        return False
    try:
        s3_client = apps.get_app_config("emails").s3_client
        response = s3_client.delete_object(Bucket=bucket, Key=object_key)
        return response.get("DeleteMarker")
    except ClientError as e:
        error = e.response["Error"]
        if error.get("Code", "") == "NoSuchKey":
            event = "s3_delete_object_does_not_exist"
        else:
            event = "s3_client_error_delete_email"
        logger.error(event, extra=error)
        incr_if_enabled("message_not_removed_from_s3", 1)
        return False
×
504

505

506
def set_user_group(user):
    """
    Add the user to the group mapped from their email domain, if any.

    Does nothing when the email has no "@", the domain has no mapped group,
    or no Group with the mapped name exists. Always returns None.
    """
    if "@" not in user.email:
        return None
    email_domain = user.email.split("@")[1]
    group_name = {
        "mozilla.com": "mozilla_corporation",
        "mozillafoundation.org": "mozilla_foundation",
        "getpocket.com": "pocket",
    }.get(email_domain)
    if not group_name:
        return None
    internal_group = Group.objects.filter(name=group_name).first()
    if internal_group is not None:
        internal_group.user_set.add(user)
    return None
×
523

524

525
def convert_domains_to_regex_patterns(domain_pattern):
    """
    Return a regex matching a quoted URL on the given domain or a subdomain.

    Group 1 is the opening quote, which the closing quote must repeat.
    Group 2 is the URL between the quotes. The domain is escaped, so it is
    matched literally.
    """
    prefix = r"""(["'])(\S*://(\S*\.)*"""
    suffix = r"\S*)\1"
    return f"{prefix}{re.escape(domain_pattern)}{suffix}"
1✔
527

528

529
def count_tracker(html_content, trackers):
    """
    Count quoted tracker URLs in html_content.

    Returns {"count": total matches, "trackers": {tracker: matches}}, where
    "trackers" lists only the trackers that matched at least once.
    """
    total = 0
    per_tracker = {}
    remaining = html_content
    for tracker in trackers:
        # Matches are removed, so later trackers search the remaining text
        remaining, hits = re.subn(
            convert_domains_to_regex_patterns(tracker), "", remaining
        )
        if hits:
            total += hits
            per_tracker[tracker] = hits
    return {"count": total, "trackers": per_tracker}
1✔
540

541

542
def count_all_trackers(html_content):
    """Count general and strict trackers, then record both as metrics and a log."""
    level_one = count_tracker(html_content, general_trackers())
    level_two = count_tracker(html_content, strict_trackers())

    incr_if_enabled("tracker.general_count", level_one["count"])
    incr_if_enabled("tracker.strict_count", level_two["count"])
    summary = {"level_one": level_one, "level_two": level_two}
    study_logger.info("email_tracker_summary", extra=summary)
552

553

554
def remove_trackers(html_content, from_address, datetime_now, level="general"):
    """
    Replace quoted tracker URLs in html_content with tracker-warning links.

    html_content: the HTML text to rewrite.
    from_address: stored as "sender" in each warning link.
    datetime_now: stored as "received_at" in each warning link; it is passed
        to json.dumps, so presumably a string - confirm against callers.
    level: "general" uses the general tracker list; any other value uses
        the strict list.

    Returns (changed_content, tracker_details). tracker_details holds the
    number of replaced links and the level-one counts for the original
    content. A summary that adds the level-two counts is logged.
    """
    trackers = general_trackers() if level == "general" else strict_trackers()

    # Defined once, outside the loop: it does not depend on the tracker
    def convert_to_tracker_warning_link(matchobj):
        quote, original_link, _ = matchobj.groups()
        tracker_link_details = {
            "sender": from_address,
            "received_at": datetime_now,
            "original_link": original_link,
        }
        anchor = quote_plus(json.dumps(tracker_link_details, separators=(",", ":")))
        url = f"{settings.SITE_ORIGIN}/contains-tracker-warning/#{anchor}"
        return f"{quote}{url}{quote}"

    tracker_removed = 0
    changed_content = html_content
    for tracker in trackers:
        pattern = convert_domains_to_regex_patterns(tracker)
        changed_content, matched = re.subn(
            pattern, convert_to_tracker_warning_link, changed_content
        )
        tracker_removed += matched

    # Both counts are taken on the original content, not the rewritten one
    level_one_detail = count_tracker(html_content, general_trackers())
    level_two_detail = count_tracker(html_content, strict_trackers())

    tracker_details = {
        "tracker_removed": tracker_removed,
        "level_one": level_one_detail,
    }
    logger_details = {"level": level, "level_two": level_two_detail}
    logger_details.update(tracker_details)
    info_logger.info(
        "email_tracker_summary",
        extra=logger_details,
    )
    return changed_content, tracker_details
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc