• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OCHA-DAP / hdx-ckan / #6920

22 Nov 2025 06:13PM UTC coverage: 78.218% (+0.007%) from 78.211%
#6920

push

coveralls-python

teodorescuserban
HDX-10015 Add production config and development tools

Production configuration:
- Add boto3 to requirements.txt for SES API support
- Configure plugin in prod.ini.tpl and common-config-ini.txt
- Add HDX_SMTP_DOMAIN to hdx_theme config declaration
- Update setup_py_helper.sh for plugin packaging

Development tools:
- run-plugin-tests.sh: Quick test runner with docker compose
  - Auto-initializes test environment if needed
  - Reuses running stack for fast iteration
  - Shows coverage for specified plugin only
- TESTING.md: Comprehensive testing documentation

13915 of 17790 relevant lines covered (78.22%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.79
/ckanext-hdx_smtp_assumerole/ckanext/hdx_smtp_assumerole/helpers/credentials_manager.py
1
# encoding: utf-8
2

3
import logging
1✔
4
import threading
1✔
5
from datetime import datetime, timedelta, timezone
1✔
6
from typing import Dict, Any, Optional
1✔
7

8
from ckanext.hdx_smtp_assumerole.helpers.smtp_assume_role import (
1✔
9
    assume_role_for_smtp,
10
    SMTPAssumeRoleException
11
)
12

13
log = logging.getLogger(__name__)
1✔
14

15

16
class SMTPCredentialsManager:
1✔
17
    """
18
    Singleton credentials manager for SMTP AssumeRole.
19

20
    Manages temporary AWS SES SMTP credentials with automatic refresh.
21
    Each container/process has its own independent instance (no shared state).
22
    Thread-safe within the same container using local RLock.
23

24
    Features:
25
    - Lazy refresh: only refreshes when credentials are about to expire
26
    - Thread-safe: uses RLock for atomic operations within container
27
    - Independent: no coordination needed between containers
28
    - Automatic: transparent refresh before email sending
29
    """
30

31
    _instance = None
1✔
32
    _lock = threading.RLock()
1✔
33

34
    def __init__(self) -> None:
1✔
35
        """Initialize credentials manager with empty state."""
36
        self.credentials: Optional[Dict[str, Any]] = None
1✔
37
        self.expiration_time: Optional[datetime] = None
1✔
38
        self.config: Optional[Dict[str, Any]] = None
1✔
39
        self.role_name_or_arn: Optional[str] = None
1✔
40
        self.region: Optional[str] = None
1✔
41
        self.session_name: Optional[str] = None
1✔
42
        self._initialized: bool = False
1✔
43

44
    @classmethod
1✔
45
    def get_instance(cls) -> 'SMTPCredentialsManager':
1✔
46
        """
47
        Get or create singleton instance.
48
        Thread-safe singleton creation.
49

50
        :return: SMTPCredentialsManager instance
51
        :rtype: SMTPCredentialsManager
52
        """
53
        if cls._instance is None:
1✔
54
            with cls._lock:
1✔
55
                if cls._instance is None:
1✔
56
                    cls._instance = cls()
1✔
57
        return cls._instance
1✔
58

59
    def initialize(self, config: Dict[str, Any]) -> None:
1✔
60
        """
61
        Initialize manager with configuration and load initial credentials.
62
        Should be called once at application startup.
63

64
        :param config: CKAN config dict
65
        :type config: dict
66
        """
67
        with self._lock:
1✔
68
            if self._initialized:
1✔
69
                log.debug('SMTPCredentialsManager already initialized, skipping')
1✔
70
                return
1✔
71

72
            log.debug('Initializing SES credentials manager')
1✔
73

74
            self.config = config
1✔
75
            self.role_name_or_arn = config.get('ckanext.hdx_smtp_assumerole.role_arn')
1✔
76
            self.region = config.get('ckanext.hdx_smtp_assumerole.region')
1✔
77
            self.session_name = config.get('ckanext.hdx_smtp_assumerole.session_name', 'ckan-ses-session')
1✔
78

79
            # Load initial credentials
80
            self._load_credentials()
1✔
81
            self._initialized = True
1✔
82

83
            log.debug('SES credentials manager initialized successfully')
1✔
84

85
    def ensure_fresh_credentials(self) -> None:
1✔
86
        """
87
        Ensure credentials are fresh and valid.
88
        Refreshes credentials if they expire within 5 minutes.
89
        Thread-safe - uses lock to prevent concurrent refreshes.
90

91
        This is the main entry point called before sending emails.
92
        """
93
        if not self._initialized:
1✔
94
            log.warning('SMTPCredentialsManager not initialized, skipping credential check')
×
95
            return
×
96

97
        if self._needs_refresh():
1✔
98
            with self._lock:
1✔
99
                # Double-check after acquiring lock (another thread might have refreshed)
100
                if self._needs_refresh():
1✔
101
                    log.info('Credentials expiring soon, refreshing...')
1✔
102
                    self._load_credentials()
1✔
103

104
    def _needs_refresh(self) -> bool:
1✔
105
        """
106
        Check if credentials need to be refreshed.
107
        Returns True if credentials don't exist or expire within 5 minutes.
108
        Thread-safe - acquires lock internally for consistent reads.
109

110
        :return: True if refresh is needed
111
        :rtype: bool
112
        """
113
        with self._lock:
1✔
114
            if self.credentials is None or self.expiration_time is None:
1✔
115
                log.debug('Credentials not loaded, refresh needed')
1✔
116
                return True
1✔
117

118
            # Use UTC timezone explicitly to avoid None tzinfo issues
119
            now = datetime.now(timezone.utc)
1✔
120
            # Ensure expiration_time is timezone-aware
121
            expiration = self.expiration_time
1✔
122
            if expiration.tzinfo is None:
1✔
123
                expiration = expiration.replace(tzinfo=timezone.utc)
1✔
124

125
            time_until_expiry = expiration - now
1✔
126

127
            # Refresh if less than 5 minutes until expiry
128
            if time_until_expiry < timedelta(minutes=5):
1✔
129
                log.debug(f'Credentials expire in {time_until_expiry}, refresh needed')
1✔
130
                return True
1✔
131

132
            log.debug(f'Credentials still valid for {time_until_expiry}, no refresh needed')
1✔
133
            return False
1✔
134

135
    def _load_credentials(self) -> None:
1✔
136
        """
137
        Load fresh credentials via AssumeRole.
138
        Updates credentials cache for SES API usage.
139
        Should only be called within a lock.
140

141
        :raises SMTPAssumeRoleException: If credential loading fails
142
        """
143
        try:
1✔
144
            log.debug(f'Loading fresh SES credentials via AssumeRole for role: {self.role_name_or_arn}')
1✔
145

146
            # Assume role and get temporary credentials
147
            credentials = assume_role_for_smtp(
1✔
148
                role_name_or_arn=self.role_name_or_arn,
149
                region=self.region,
150
                session_name=self.session_name
151
            )
152

153
            # Update cache
154
            self.credentials = credentials
1✔
155
            self.expiration_time = credentials['expiration']
1✔
156

157
            # Update CKAN config (currently no-op for SES API)
158
            self._update_ckan_config(credentials)
1✔
159

160
            log.debug(f'Successfully loaded fresh SES credentials, expire at: {self.expiration_time}')
1✔
161

162
        except SMTPAssumeRoleException as e:
1✔
163
            log.error(f'Failed to load SES credentials: {str(e)}')
1✔
164
            raise
1✔
165
        except Exception as e:
×
166
            log.error(f'Unexpected error loading SES credentials: {str(e)}')
×
167
            raise SMTPAssumeRoleException(f'Unexpected error: {str(e)}')
×
168

169
    def _update_ckan_config(self, credentials: Dict[str, Any]) -> None:
1✔
170
        """
171
        Update CKAN configuration (placeholder for future use).
172

173
        Note: With SES API, we don't need to update SMTP config settings
174
        since emails are sent via boto3 API calls, not SMTP protocol.
175

176
        :param credentials: Credentials dict from assume_role_for_smtp
177
        :type credentials: dict
178
        """
179
        # No config updates needed for SES API
180
        # Credentials are passed directly to boto3.client('ses')
181
        pass
1✔
182

183
    def get_ses_credentials(self) -> Optional[Dict[str, Any]]:
1✔
184
        """
185
        Get credentials for SES API usage.
186

187
        :return: Dict with AWS credentials for boto3
188
        :rtype: dict
189
        """
190
        with self._lock:
1✔
191
            if not self._initialized or self.credentials is None:
1✔
192
                return None
1✔
193

194
            return {
1✔
195
                'access_key': self.credentials.get('access_key'),
196
                'secret_key': self.credentials.get('secret_key'),
197
                'session_token': self.credentials.get('session_token'),
198
                'region': self.region
199
            }
200

201
    def get_credentials_info(self) -> Dict[str, Any]:
1✔
202
        """
203
        Get information about current credentials (for debugging/monitoring).
204

205
        :return: Dict with credentials info
206
        :rtype: dict
207
        """
208
        with self._lock:
1✔
209
            if not self._initialized or self.credentials is None:
1✔
210
                return {
1✔
211
                    'initialized': self._initialized,
212
                    'has_credentials': False
213
                }
214

215
            # Use UTC timezone explicitly to avoid None tzinfo issues
216
            now = datetime.now(timezone.utc)
1✔
217
            # Ensure expiration_time is timezone-aware
218
            expiration = self.expiration_time
1✔
219
            if expiration.tzinfo is None:
1✔
220
                expiration = expiration.replace(tzinfo=timezone.utc)
1✔
221

222
            time_until_expiry = expiration - now
1✔
223

224
            # Mask access key for security (show first 4 and last 4 chars)
225
            access_key = self.credentials.get('access_key', 'N/A')
1✔
226
            if len(access_key) > 8:
1✔
227
                masked_access_key = access_key[:4] + '***' + access_key[-4:]
1✔
228
            else:
229
                masked_access_key = access_key
1✔
230

231
            return {
1✔
232
                'initialized': True,
233
                'has_credentials': True,
234
                'expiration_time': str(self.expiration_time),
235
                'time_until_expiry': str(time_until_expiry),
236
                'needs_refresh': self._needs_refresh(),
237
                'region': self.region,
238
                'access_key': masked_access_key
239
            }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc