• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / a640ebd2-7460-4851-923d-ccaf50f88ede

06 May 2024 07:21AM CUT coverage: 84.562% (-0.02%) from 84.581%
a640ebd2-7460-4851-923d-ccaf50f88ede

push

circleci

web-flow
Merge pull request #4671 from mozilla/dependabot/npm_and_yarn/react-aria-3.33.0

Bump react-aria from 3.32.1 to 3.33.0

3512 of 4565 branches covered (76.93%)

Branch coverage included in aggregate %.

14619 of 16876 relevant lines covered (86.63%)

10.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/emails/management/commands/process_delayed_emails_from_sqs.py
1
import json
×
2
import logging
×
3
import shlex
×
4
import sys
×
5
import time
×
6

7
from django.conf import settings
×
8
from django.core.management.base import BaseCommand
×
9

10
import boto3
×
11
from botocore.exceptions import ClientError
×
12

13
from emails.sns import verify_from_sns
×
14
from emails.utils import incr_if_enabled
×
15
from emails.views import _sns_inbound_logic, validate_sns_arn_and_type
×
16

17
logger = logging.getLogger("events")
×
18
info_logger = logging.getLogger("eventsinfo")
×
19

20

21
def _verify_and_run_sns_inbound_on_message(message):
×
22
    incr_if_enabled("rerun_message_from_sqs", 1)
×
23
    json_body = json.loads(message.body)
×
24
    verified_json_body = verify_from_sns(json_body)
×
25
    topic_arn = verified_json_body["TopicArn"]
×
26
    message_type = verified_json_body["Type"]
×
27
    validate_sns_arn_and_type(topic_arn, message_type)
×
28
    try:
×
29
        _sns_inbound_logic(topic_arn, message_type, verified_json_body)
×
30
        info_logger.info(f"processed sqs message ID: {message.message_id}")
×
31
    except ClientError as e:
×
32
        incr_if_enabled("rerun_message_from_sqs_error", 1)
×
33
        logger.error("sqs_client_error: ", extra=e.response["Error"])
×
34
        temp_errors = ["throttling", "pause"]
×
35
        lower_error_code = e.response["Error"]["Code"].lower()
×
36
        if any(temp_error in lower_error_code for temp_error in temp_errors):
×
37
            incr_if_enabled("rerun_message_from_sqs_temp_error", 1)
×
38
            logger.error(
×
39
                '"temporary" error, sleeping for 1s: ', extra=e.response["Error"]
40
            )
41
            time.sleep(1)
×
42
            try:
×
43
                _sns_inbound_logic(topic_arn, message_type, verified_json_body)
×
44
                info_logger.info(f"processed sqs message ID: {message.message_id}")
×
45
            except ClientError as e:
×
46
                incr_if_enabled("rerun_message_from_sqs_error", 1)
×
47
                logger.error("sqs_client_error: ", extra=e.response["Error"])
×
48

49

50
class Command(BaseCommand):
×
51
    help = "Fetches messages from SQS dead-letter queue and processes them."
×
52

53
    def handle(self, *args, **options):
×
54
        self.exit_code = 0
×
55
        try:
×
56
            sqs_client = boto3.resource("sqs", region_name=settings.AWS_REGION)
×
57
            dl_queue = sqs_client.Queue(settings.AWS_SQS_QUEUE_URL)
×
58
        except ClientError as e:
×
59
            logger.error("sqs_client_error: ", extra=e.response["Error"])
×
60
            self.exit_code = 1
×
61
            sys.exit(self.exit_code)
×
62

63
        messages = dl_queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1)
×
64
        while len(messages) > 0:
×
65
            for message in messages:
×
66
                try:
×
67
                    _verify_and_run_sns_inbound_on_message(message)
×
68
                except Exception as e:
×
69
                    logger.exception(
×
70
                        f"dlq_processing_error_{type(e)}",
71
                        extra={"exception": shlex.quote(str(e))},
72
                    )
73
                finally:
74
                    message.delete()
×
75
            messages = dl_queue.receive_messages(
×
76
                MaxNumberOfMessages=10, WaitTimeSeconds=1
77
            )
78
        sys.exit(self.exit_code)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc