• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / eb069295-bd0a-4be9-a474-ff0fcd23e8d0

19 Feb 2024 08:30AM CUT coverage: 73.976% (+0.02%) from 73.959%
eb069295-bd0a-4be9-a474-ff0fcd23e8d0

push

circleci

web-flow
Merge pull request #4422 from mozilla/dependabot/npm_and_yarn/types/react-18.2.56

Bump @types/react from 18.2.55 to 18.2.56

2043 of 3011 branches covered (67.85%)

Branch coverage included in aggregate %.

6502 of 8540 relevant lines covered (76.14%)

20.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/emails/management/commands/process_delayed_emails_from_sqs.py
1
import json
×
2
import logging
×
3
import shlex
×
4
import sys
×
5
import time
×
6

7
import boto3
×
8
from botocore.exceptions import ClientError
×
9

10
from django.conf import settings
×
11
from django.core.management.base import BaseCommand
×
12

13
from emails.sns import verify_from_sns
×
14
from emails.views import _sns_inbound_logic, validate_sns_arn_and_type
×
15
from emails.utils import incr_if_enabled
×
16

17

18
# Logger named "events"; used in this module for error and exception reports.
logger = logging.getLogger("events")
×
19
# Logger named "eventsinfo"; used in this module for informational messages.
info_logger = logging.getLogger("eventsinfo")
×
20

21

22
def _verify_and_run_sns_inbound_on_message(message):
    """Verify one queued SNS notification and run the inbound logic on it.

    The message body is parsed as JSON, passed through ``verify_from_sns``,
    and its ``TopicArn`` / ``Type`` fields are checked with
    ``validate_sns_arn_and_type`` before ``_sns_inbound_logic`` is called.

    A ``ClientError`` raised by the inbound logic is counted and logged
    rather than propagated. When its error code contains "throttling" or
    "pause" (case-insensitive), the function sleeps for one second and
    tries exactly once more; a ``ClientError`` from that second attempt is
    also counted and logged. Any other exception propagates to the caller.

    Args:
        message: object exposing ``body`` (a JSON string) and ``message_id``.
    """
    incr_if_enabled("rerun_message_from_sqs", 1)
    verified_json_body = verify_from_sns(json.loads(message.body))
    topic_arn = verified_json_body["TopicArn"]
    message_type = verified_json_body["Type"]
    validate_sns_arn_and_type(topic_arn, message_type)

    try:
        _sns_inbound_logic(topic_arn, message_type, verified_json_body)
        info_logger.info(f"processed sqs message ID: {message.message_id}")
    except ClientError as first_error:
        incr_if_enabled("rerun_message_from_sqs_error", 1)
        error_details = first_error.response["Error"]
        logger.error("sqs_client_error: ", extra=error_details)

        # Only throttling/pause style errors are treated as retryable.
        error_code = error_details["Code"].lower()
        if not ("throttling" in error_code or "pause" in error_code):
            return

        incr_if_enabled("rerun_message_from_sqs_temp_error", 1)
        logger.error('"temporary" error, sleeping for 1s: ', extra=error_details)
        time.sleep(1)

        # Single retry; a second client error is recorded and dropped.
        try:
            _sns_inbound_logic(topic_arn, message_type, verified_json_body)
            info_logger.info(f"processed sqs message ID: {message.message_id}")
        except ClientError as retry_error:
            incr_if_enabled("rerun_message_from_sqs_error", 1)
            logger.error("sqs_client_error: ", extra=retry_error.response["Error"])
×
49

50

51
class Command(BaseCommand):
    """Management command that drains the queue at ``settings.AWS_SQS_QUEUE_URL``.

    Messages are fetched in batches and each one is handed to
    ``_verify_and_run_sns_inbound_on_message``. Every received message is
    deleted from the queue afterwards, whether or not processing raised.
    """

    help = "Fetches messages from SQS dead-letter queue and processes them."

    def handle(self, *args, **options):
        """Process queued messages until a receive call returns none.

        Exits the process via ``sys.exit``: with status 1 when building the
        queue handle raises ``ClientError``, otherwise with
        ``self.exit_code`` (initialised to 0) once the queue is empty.
        """
        self.exit_code = 0
        try:
            sqs_resource = boto3.resource("sqs", region_name=settings.AWS_REGION)
            dl_queue = sqs_resource.Queue(settings.AWS_SQS_QUEUE_URL)
        except ClientError as client_error:
            logger.error("sqs_client_error: ", extra=client_error.response["Error"])
            self.exit_code = 1
            sys.exit(self.exit_code)

        receive_options = {"MaxNumberOfMessages": 10, "WaitTimeSeconds": 1}
        while True:
            batch = dl_queue.receive_messages(**receive_options)
            if not batch:
                break
            for message in batch:
                try:
                    _verify_and_run_sns_inbound_on_message(message)
                except Exception as error:
                    # Log and continue so one bad message does not stop the run.
                    logger.exception(
                        f"dlq_processing_error_{type(error)}",
                        extra={"exception": shlex.quote(str(error))},
                    )
                finally:
                    # Removed from the queue regardless of the outcome above.
                    message.delete()
        sys.exit(self.exit_code)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc