• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OCHA-DAP / hdx-ckan / #6504

01 Aug 2025 08:53AM UTC coverage: 74.843%. Remained the same
#6504

push

coveralls-python

ccataalin
update exception messages to use `display_name` for object type

0 of 2 new or added lines in 2 files covered. (0.0%)

13093 of 17494 relevant lines covered (74.84%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.76
/ckanext-hdx_users/ckanext/hdx_users/helpers/novu_interaction.py
1
import logging
1✔
2
import requests
1✔
3
from typing import Optional, Dict, Any
1✔
4

5
import ckan.plugins.toolkit as tk
1✔
6

7
from ckanext.hdx_theme.helpers.helpers import hdx_supports_notifications
1✔
8
from ckanext.hdx_users.general_token_model import ObjectType, HDXGeneralToken
1✔
9
from ckanext.hdx_users.helpers.notification_platform import read_novu_config
1✔
10

11

12
log = logging.getLogger(__name__)
1✔
13

14

15
class NovuDAO:
1✔
16
    """Data Access Object for Novu API interactions"""
17

18
    def __init__(self) -> None:
1✔
19
        self.novu_api_key, self.novu_api_url = read_novu_config()
×
20
        self.headers = {
×
21
            'Authorization': f'ApiKey {self.novu_api_key}',
22
            'Content-Type': 'application/json'
23
        }
24

25
    def get_subscriber(self, subscriber_id: str) -> Optional[Dict[str, Any]]:
1✔
26
        """Get subscriber information from Novu API
27

28
        :param subscriber_id: The ID of the subscriber to retrieve
29
        :type subscriber_id: str
30
        :returns: Subscriber data if found, None if subscriber doesn't exist
31
        :rtype: Optional[Dict[str, Any]]
32
        :raises Exception: If there's an error checking subscriber
33
        """
34
        response = requests.get(f'{self.novu_api_url}/subscribers/{subscriber_id}', headers=self.headers)
×
35

36
        if response.status_code == 404:
×
37
            return None
×
38
        elif response.status_code == 200:
×
39
            return response.json().get('data', {})
×
40
        else:
41
            raise Exception(f'Error checking subscriber: {response.text}')
×
42

43
    def create_subscriber(self, subscriber_data: Dict[str, Any]) -> None:
1✔
44
        """Create a new subscriber in Novu API
45

46
        :param subscriber_data: Dictionary containing subscriber information
47
        :type subscriber_data: Dict[str, Any]
48
        :raises Exception: If subscriber creation fails
49
        """
50
        response = requests.post(f'{self.novu_api_url}/subscribers', json=subscriber_data, headers=self.headers)
×
51
        if response.status_code != 201:
×
52
            raise Exception(f'Failed to create subscriber: {response.text}')
×
53

54
    def update_subscriber(self, subscriber_id: str, subscriber_data: Dict[str, Any]) -> None:
1✔
55
        """Update an existing subscriber in Novu API
56

57
        :param subscriber_id: ID of the subscriber to update
58
        :type subscriber_id: str
59
        :param subscriber_data: Dictionary containing updated subscriber information
60
        :type subscriber_data: Dict[str, Any]
61
        :raises Exception: If subscriber update fails
62
        """
63
        response = requests.put(f'{self.novu_api_url}/subscribers/{subscriber_id}', json=subscriber_data, headers=self.headers)
×
64
        if response.status_code != 200:
×
65
            raise Exception(f'Failed to update subscriber: {response.text}')
×
66

67
    def delete_subscriber(self, subscriber_id: str) -> None:
1✔
68
        """Delete a subscriber from Novu API
69

70
        :param subscriber_id: ID of the subscriber to delete
71
        :type subscriber_id: str
72
        :raises Exception: If subscriber deletion fails
73
        """
74
        response = requests.delete(f'{self.novu_api_url}/subscribers/{subscriber_id}', headers=self.headers)
×
75
        if response.status_code not in [200, 204]:
×
76
            raise Exception(f'Failed to delete subscriber: {response.text}')
×
77

78
    def subscriber_exists(self, subscriber_id: str) -> bool:
1✔
79
        """Check if a subscriber exists
80

81
        :param subscriber_id: ID of the subscriber to check
82
        :type subscriber_id: str
83
        :returns: True if subscriber exists, False otherwise
84
        :rtype: bool
85
        """
86
        try:
×
87
            return self.get_subscriber(subscriber_id) is not None
×
88
        except Exception:
×
89
            return False
×
90

91

92
def add_subscription_info(
1✔
93
    subscriber_id: str,
94
    email: str,
95
    unsubscribe_token_obj: HDXGeneralToken,
96
    object_type: ObjectType,
97
    object_id: str,
98
    object_dict: Optional[dict[str, Any]] = None,
99
):
100
    novu_dao = NovuDAO()
1✔
101
    unsubscribe_token_key = _generate_unsubscribe_token_key(object_id, object_type)
1✔
102

103
    if not subscriber_id or not email or not object_id:
1✔
104
        raise tk.ValidationError('Missing required parameters: subscriber_id, email or object_id')
×
105

106
    notifications_enabled = hdx_supports_notifications(object_type, object_id, object_dict)
1✔
107
    if not notifications_enabled:
1✔
NEW
108
        raise tk.ValidationError(f'Notifications are not enabled for the {object_type.display_name}')
×
109

110
    subscriber_data = novu_dao.get_subscriber(subscriber_id)
1✔
111

112
    if subscriber_data is None:
1✔
113
        # Subscriber doesn't exist; create a new one
114
        new_subscriber_data = {
×
115
            'subscriberId': subscriber_id,
116
            'email': email,
117
            'data': {
118
                unsubscribe_token_key: unsubscribe_token_obj.token,
119
            }
120
        }
121
        novu_dao.create_subscriber(new_subscriber_data)
×
122
    else:
123
        # Subscriber exists, update their data
124
        existing_data = subscriber_data.get('data', {})
1✔
125
        existing_data[unsubscribe_token_key] = unsubscribe_token_obj.token
1✔
126
        update_data = {
1✔
127
            'data': existing_data
128
        }
129
        novu_dao.update_subscriber(subscriber_id, update_data)
1✔
130

131
    return {'message': f'You have successfully subscribed to notifications for this dataset.'}
1✔
132

133

134
def remove_subscription_info(subscriber_id: str, object_id: str, object_type: ObjectType):
1✔
135
    novu_dao = NovuDAO()
1✔
136

137
    unsubscribe_token_key = _generate_unsubscribe_token_key(object_id, object_type)
1✔
138

139
    subscriber_data = novu_dao.get_subscriber(subscriber_id)
1✔
140

141
    if subscriber_data is None:
1✔
142
        log.warning(f'Subscriber {subscriber_id} not found')
×
143
        return
×
144

145
    # If the subscriber exists, remove the unsubscribe token from its data
146
    existing_data = subscriber_data.get('data', {})
1✔
147
    if unsubscribe_token_key in existing_data:
1✔
148
        del existing_data[unsubscribe_token_key]
×
149
        if existing_data:
×
150
            # Update subscriber with remaining data
151
            update_data = {
×
152
                'data': existing_data
153
            }
154
            novu_dao.update_subscriber(subscriber_id, update_data)
×
155
        else:
156
            # If no data remains, remove the subscriber entirely
157
            novu_dao.delete_subscriber(subscriber_id)
×
158
    else:
159
        log.warning(f'Unsubscribe token key {unsubscribe_token_key} not found in subscriber data')
1✔
160

161

162
def _generate_unsubscribe_token_key(object_id: str, object_type: ObjectType) -> str:
1✔
163
    if object_type == ObjectType.DATASET:
1✔
164
        type = ''  # this is the default, for historical reasons
1✔
165
    else:
166
        type = object_type.value + '_'
1✔
167
    unsubscribe_token_key = 'unsubscribe_token_' + type + object_id.replace('-', '_')
1✔
168
    return unsubscribe_token_key
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc