• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 7cf72e3a-ff1a-4365-9f40-8ea40594b90b

04 Dec 2024 09:35PM CUT coverage: 85.057% (+0.005%) from 85.052%
7cf72e3a-ff1a-4365-9f40-8ea40594b90b

push

circleci

web-flow
Merge pull request #5235 from mozilla/remove-pyopenssl-mpp-3852

MPP-3852: Use `cryptography` for SNS signature validation, remove `pyopenssl` and `pem`

2432 of 3561 branches covered (68.3%)

Branch coverage included in aggregate %.

62 of 63 new or added lines in 4 files covered. (98.41%)

1 existing line in 1 file now uncovered.

16990 of 19273 relevant lines covered (88.15%)

9.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.22
/emails/sns.py
1
# Inspired by django-bouncy utils:
2
# https://github.com/organizerconnect/django-bouncy/blob/master/django_bouncy/utils.py
3

4
import base64
1✔
5
import logging
1✔
6
from typing import Any
1✔
7
from urllib.request import urlopen
1✔
8

9
from django.conf import settings
1✔
10
from django.core.cache import caches
1✔
11
from django.core.exceptions import SuspiciousOperation
1✔
12

13
from cryptography import x509
1✔
14
from cryptography.exceptions import InvalidSignature
1✔
15
from cryptography.hazmat.primitives import hashes, serialization
1✔
16
from cryptography.hazmat.primitives.asymmetric import padding, rsa
1✔
17

18
# Module-level logger, obtained under the name "events".
logger = logging.getLogger("events")
1✔
19

20
# String-to-sign template for a "Notification" message that carries a Subject.
# Each field name is followed by its value, one per line, in this fixed order;
# verify_from_sns() fills the placeholders from the message body.
NOTIFICATION_HASH_FORMAT = (
    "Message\n{Message}\n"
    "MessageId\n{MessageId}\n"
    "Subject\n{Subject}\n"
    "Timestamp\n{Timestamp}\n"
    "TopicArn\n{TopicArn}\n"
    "Type\n{Type}\n"
)
34

35
# String-to-sign template for a "Notification" message without a Subject.
# Same layout as the template with a Subject, minus the Subject pair.
NOTIFICATION_WITHOUT_SUBJECT_HASH_FORMAT = (
    "Message\n{Message}\n"
    "MessageId\n{MessageId}\n"
    "Timestamp\n{Timestamp}\n"
    "TopicArn\n{TopicArn}\n"
    "Type\n{Type}\n"
)
47

48
# String-to-sign template used for every message whose Type is not
# "Notification" (see _get_hash_format); it adds the SubscribeURL and Token
# pairs that those messages carry.
SUBSCRIPTION_HASH_FORMAT = (
    "Message\n{Message}\n"
    "MessageId\n{MessageId}\n"
    "SubscribeURL\n{SubscribeURL}\n"
    "Timestamp\n{Timestamp}\n"
    "Token\n{Token}\n"
    "TopicArn\n{TopicArn}\n"
    "Type\n{Type}\n"
)
64

65
# SNS message "Type" values. Not referenced elsewhere in this module;
# presumably read by callers that import it (confirm before changing).
SUPPORTED_SNS_TYPES = ["SubscriptionConfirmation", "Notification"]
69

70

71
class VerificationFailed(ValueError):
    """Raised when an SNS message's signature or signing certificate is rejected."""
1✔
73

74

75
def verify_from_sns(json_body: dict[str, Any]) -> dict[str, Any]:
    """
    Raise an exception if SNS signature verification fails.

    https://docs.aws.amazon.com/sns/latest/dg/sns-verify-signature-of-message.html

    Supports SignatureVersion 1 (SHA1) and SignatureVersion 2 (SHA256, added in
    September 2022, and requires opt-in). A message with no SignatureVersion,
    or with any other value, is verified as version 1.

    json_body is the decoded SNS message. It must contain "SigningCertURL",
    "Signature", "Type", and every field named by the string-to-sign template
    for its type; a missing field raises KeyError.

    Returns json_body unchanged when the signature is valid.

    Raises VerificationFailed when the signature does not match. Errors from
    _get_signing_public_key (SuspiciousOperation, VerificationFailed) propagate
    unchanged.
    """
    signing_cert_url = json_body["SigningCertURL"]
    cert_pubkey = _get_signing_public_key(signing_cert_url)
    signature = base64.decodebytes(json_body["Signature"].encode())
    hash_format = _get_hash_format(json_body)

    # The signed text is the same for both versions; only the digest differs.
    # str() tolerates a SignatureVersion that was decoded as a number.
    if str(json_body.get("SignatureVersion", "1")) == "2":
        hash_algorithm: hashes.HashAlgorithm = hashes.SHA256()
    else:
        hash_algorithm = hashes.SHA1()  # noqa: S303  # Use of insecure hash SHA1

    try:
        cert_pubkey.verify(
            signature,
            hash_format.format(**json_body).encode(),
            padding.PKCS1v15(),
            hash_algorithm,
        )
    except InvalidSignature as e:
        raise VerificationFailed(
            f"Invalid signature with SigningCertURL {signing_cert_url}"
        ) from e

    return json_body
1✔
102

103

104
def _get_hash_format(json_body: dict[str, Any]) -> str:
1✔
105
    message_type = json_body["Type"]
1✔
106
    if message_type == "Notification":
1✔
107
        if "Subject" in json_body.keys():
1✔
108
            return NOTIFICATION_HASH_FORMAT
1✔
109
        return NOTIFICATION_WITHOUT_SUBJECT_HASH_FORMAT
1✔
110

111
    return SUBSCRIPTION_HASH_FORMAT
1✔
112

113

114
def _get_signing_public_key(cert_url: str) -> rsa.RSAPublicKey:
    """
    Download the signing certificate and return the public key.

    Or, return the cached public key from a previous call.

    The key is cached as PEM-encoded SubjectPublicKeyInfo under the key
    "<cert_url>:public_key", in the cache named by settings.AWS_SNS_KEY_CACHE
    ("default" when that setting is absent). Only RSA keys are written to the
    cache.

    Raises SuspiciousOperation when cert_url does not start with the SNS origin
    for settings.AWS_REGION. Raises VerificationFailed when the downloaded file
    does not hold exactly one certificate, or when the key is not an RSA key.
    """
    cert_url_origin = f"https://sns.{settings.AWS_REGION}.amazonaws.com/"
    if not (cert_url.startswith(cert_url_origin)):
        raise SuspiciousOperation(
            f'SNS SigningCertURL "{cert_url}" did not start with "{cert_url_origin}"'
        )

    key_cache = caches[getattr(settings, "AWS_SNS_KEY_CACHE", "default")]
    cache_key = f"{cert_url}:public_key"
    public_pem = key_cache.get(cache_key)

    set_cache = False
    if public_pem:
        cert_pubkey = serialization.load_pem_public_key(public_pem)
    else:
        set_cache = True
        response = urlopen(cert_url)  # noqa: S310 (check for custom scheme)
        try:
            cert_pem = response.read()
        finally:
            # Release the connection even when the read fails part-way
            response.close()

        # Extract the first certificate in the file and confirm it's a valid
        # PEM certificate
        certs = x509.load_pem_x509_certificates(cert_pem)

        # A proper certificate file will contain 1 certificate
        if len(certs) != 1:
            raise VerificationFailed(
                f"SigningCertURL {cert_url} has {len(certs)} certificates."
            )
        cert_pubkey = certs[0].public_key()
        public_pem = cert_pubkey.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

    # Checked for cached keys too, and before caching a freshly downloaded one
    if not isinstance(cert_pubkey, rsa.RSAPublicKey):
        raise VerificationFailed(f"SigningCertURL {cert_url} is not an RSA key")

    if set_cache:
        key_cache.set(cache_key, public_pem)
    return cert_pubkey
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc