• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 39793797-bd9d-4c05-8d96-3802176e9b77

22 Apr 2024 08:02PM CUT coverage: 75.618% (+0.007%) from 75.611%
39793797-bd9d-4c05-8d96-3802176e9b77

push

circleci

web-flow
Merge pull request #4607 from mozilla/add-ruff-again-mpp-79

MPP-79: Add `ruff` Python linter

2463 of 3426 branches covered (71.89%)

Branch coverage included in aggregate %.

184 of 194 new or added lines in 36 files covered. (94.85%)

1 existing line in 1 file now uncovered.

6813 of 8841 relevant lines covered (77.06%)

19.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.62
/emails/sns.py
1
# Inspired by django-bouncy utils:
2
# https://github.com/organizerconnect/django-bouncy/blob/master/django_bouncy/utils.py
3

4
import base64
1✔
5
import logging
1✔
6
from urllib.request import urlopen
1✔
7

8
from django.conf import settings
1✔
9
from django.core.cache import caches
1✔
10
from django.core.exceptions import SuspiciousOperation
1✔
11
from django.utils.encoding import smart_bytes
1✔
12

13
import pem
1✔
14
from OpenSSL import crypto
1✔
15

16
logger = logging.getLogger("events")
1✔
17

18
NOTIFICATION_HASH_FORMAT = """Message
1✔
19
{Message}
20
MessageId
21
{MessageId}
22
Subject
23
{Subject}
24
Timestamp
25
{Timestamp}
26
TopicArn
27
{TopicArn}
28
Type
29
{Type}
30
"""
31

32
NOTIFICATION_WITHOUT_SUBJECT_HASH_FORMAT = """Message
1✔
33
{Message}
34
MessageId
35
{MessageId}
36
Timestamp
37
{Timestamp}
38
TopicArn
39
{TopicArn}
40
Type
41
{Type}
42
"""
43

44
SUBSCRIPTION_HASH_FORMAT = """Message
1✔
45
{Message}
46
MessageId
47
{MessageId}
48
SubscribeURL
49
{SubscribeURL}
50
Timestamp
51
{Timestamp}
52
Token
53
{Token}
54
TopicArn
55
{TopicArn}
56
Type
57
{Type}
58
"""
59

60
SUPPORTED_SNS_TYPES = [
1✔
61
    "SubscriptionConfirmation",
62
    "Notification",
63
]
64

65

66
def verify_from_sns(json_body):
1✔
67
    pemfile = _grab_keyfile(json_body["SigningCertURL"])
×
68
    cert = crypto.load_certificate(crypto.FILETYPE_PEM, pemfile)
×
69
    signature = base64.decodebytes(json_body["Signature"].encode("utf-8"))
×
70

71
    hash_format = _get_hash_format(json_body)
×
72

73
    crypto.verify(
×
74
        cert, signature, hash_format.format(**json_body).encode("utf-8"), "sha1"
75
    )
76
    return json_body
×
77

78

79
def _get_hash_format(json_body):
1✔
80
    message_type = json_body["Type"]
×
81
    if message_type == "Notification":
×
82
        if "Subject" in json_body.keys():
×
83
            return NOTIFICATION_HASH_FORMAT
×
84
        return NOTIFICATION_WITHOUT_SUBJECT_HASH_FORMAT
×
85

86
    return SUBSCRIPTION_HASH_FORMAT
×
87

88

89
def _grab_keyfile(cert_url):
1✔
90
    cert_url_origin = f"https://sns.{settings.AWS_REGION}.amazonaws.com/"
1✔
91
    if not (cert_url.startswith(cert_url_origin)):
1!
92
        raise SuspiciousOperation(
1✔
93
            f'SNS SigningCertURL "{cert_url}" did not start with "{cert_url_origin}"'
94
        )
95

96
    key_cache = caches[getattr(settings, "AWS_SNS_KEY_CACHE", "default")]
×
97

98
    pemfile = key_cache.get(cert_url)
×
99
    if not pemfile:
×
NEW
100
        response = urlopen(cert_url)  # noqa: S310 (check for custom scheme)
×
101
        pemfile = response.read()
×
102
        # Extract the first certificate in the file and confirm it's a valid
103
        # PEM certificate
104
        certificates = pem.parse(smart_bytes(pemfile))
×
105

106
        # A proper certificate file will contain 1 certificate
107
        if len(certificates) != 1:
×
108
            logger.error("Invalid Certificate File: URL %s", cert_url)
×
109
            raise ValueError("Invalid Certificate File")
×
110

111
        key_cache.set(cert_url, pemfile)
×
112
    return pemfile
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc