• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 63ad679c-7c70-4935-8fd6-bc0176e80b72

15 Dec 2023 07:08PM CUT coverage: 73.514%. Remained the same
63ad679c-7c70-4935-8fd6-bc0176e80b72

push

circleci

jwhitlock
Use branch database with production tests

Previously, migration tests were run with production code, branch
requirements, and branch migrations. Now they run with production
requirements, so that third-party migrations are tested as well.

This uses pytest --reuse-db to create a test database with the branch's
migrations, and then a pip install with the production code. This more
closely emulates the mixed environment during a deploy.

1962 of 2913 branches covered (0.0%)

Branch coverage included in aggregate %.

6273 of 8289 relevant lines covered (75.68%)

19.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/emails/management/commands/process_delayed_emails_from_sqs.py
1
import json
×
2
import logging
×
3
import sys
×
4
import time
×
5

6
import boto3
×
7
from botocore.exceptions import ClientError
×
8

9
from django.conf import settings
×
10
from django.core.management.base import BaseCommand
×
11

12
from emails.views import _sns_inbound_logic, validate_sns_arn_and_type, verify_from_sns
×
13
from emails.utils import incr_if_enabled
×
14

15

16
logger = logging.getLogger("events")
×
17
info_logger = logging.getLogger("eventsinfo")
×
18

19

20
def _verify_and_run_sns_inbound_on_message(message):
    """Verify one SQS message as an SNS notification and process it.

    The message body is parsed as JSON, passed through ``verify_from_sns``
    and ``validate_sns_arn_and_type``, and then handed to
    ``_sns_inbound_logic``. A ``ClientError`` raised while processing is
    logged and counted rather than propagated. When its error code contains
    "throttling" or "pause" (case-insensitive), processing is attempted
    exactly one more time after a one-second sleep.

    Any exception other than ``ClientError`` (including errors from JSON
    parsing or verification) propagates to the caller.

    Args:
        message: object exposing ``body`` (a JSON string) and ``message_id``.
    """
    incr_if_enabled("rerun_message_from_sqs", 1)

    # Parse and verify before any processing is attempted.
    verified_json_body = verify_from_sns(json.loads(message.body))
    topic_arn = verified_json_body["TopicArn"]
    message_type = verified_json_body["Type"]
    validate_sns_arn_and_type(topic_arn, message_type)

    try:
        _sns_inbound_logic(topic_arn, message_type, verified_json_body)
        info_logger.info(f"processed sqs message ID: {message.message_id}")
    except ClientError as first_error:
        incr_if_enabled("rerun_message_from_sqs_error", 1)
        error_details = first_error.response["Error"]
        logger.error("sqs_client_error: ", extra=error_details)

        # Only errors whose code looks transient are worth a second attempt.
        error_code = error_details["Code"].lower()
        is_temporary = "throttling" in error_code or "pause" in error_code
        if not is_temporary:
            return

        incr_if_enabled("rerun_message_from_sqs_temp_error", 1)
        logger.error('"temporary" error, sleeping for 1s: ', extra=error_details)
        time.sleep(1)

        # Single retry; a second ClientError is logged and given up on.
        try:
            _sns_inbound_logic(topic_arn, message_type, verified_json_body)
            info_logger.info(f"processed sqs message ID: {message.message_id}")
        except ClientError as retry_error:
            incr_if_enabled("rerun_message_from_sqs_error", 1)
            logger.error("sqs_client_error: ", extra=retry_error.response["Error"])
×
47

48

49
class Command(BaseCommand):
    """Management command that drains an SQS queue and re-processes messages.

    The queue is the one at ``settings.AWS_SQS_QUEUE_URL``; per the help
    text it is expected to be the dead-letter queue.
    """

    help = "Fetches messages from SQS dead-letter queue and processes them."

    def handle(self, *args, **options):
        """Receive messages in batches until the queue returns none.

        Every received message is deleted after one processing attempt,
        whether or not that attempt succeeded, so a failing message cannot
        keep the loop running forever. Processing failures are logged with
        their traceback and do not change the exit code.

        Always terminates the process via ``sys.exit``: with status 1 if a
        ``ClientError`` is raised while setting up the queue handle,
        otherwise with status 0 once the queue is drained.
        """
        self.exit_code = 0
        try:
            sqs_resource = boto3.resource("sqs", region_name=settings.AWS_REGION)
            dl_queue = sqs_resource.Queue(settings.AWS_SQS_QUEUE_URL)
        except ClientError as e:
            logger.error("sqs_client_error: ", extra=e.response["Error"])
            self.exit_code = 1
            sys.exit(self.exit_code)

        messages = dl_queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1)
        while messages:
            for message in messages:
                try:
                    _verify_and_run_sns_inbound_on_message(message)
                except Exception as error:
                    # Catch Exception rather than using a bare ``except`` so
                    # KeyboardInterrupt/SystemExit still stop the command.
                    logger.exception(f"dlq_processing_error_{type(error)}")
                finally:
                    message.delete()
            messages = dl_queue.receive_messages(
                MaxNumberOfMessages=10, WaitTimeSeconds=1
            )
        sys.exit(self.exit_code)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc