• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OCHA-DAP / hdx-ckan / #5649

26 Aug 2024 11:34AM UTC coverage: 72.882% (-0.1%) from 72.998%
#5649

Pull #6412

coveralls-python

web-flow
Merge branch 'dev' into feature/HDX-9990-implement-new-contact-contributor-form
Pull Request #6412: HDX-9960 & HDX-9987 contact the contributor & HDX Connect new pages

146 of 321 new or added lines in 10 files covered. (45.48%)

1 existing line in 1 file now uncovered.

11750 of 16122 relevant lines covered (72.88%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.49
/ckanext-hdx_package/ckanext/hdx_package/controller_logic/dataset_request_access.py
1
import json
1✔
2
import logging
1✔
3

4
import ckan.lib.navl.dictization_functions as dictization_functions
1✔
5
import ckan.logic as logic
1✔
6
import ckan.model as model
1✔
7
import ckan.plugins.toolkit as tk
1✔
8
import ckanext.hdx_users.helpers.mailer as hdx_mailer
1✔
9
from ckan.types import Context, DataDict, Request
1✔
10
from ckan.lib.navl.dictization_functions import validate
1✔
11
from ckanext.requestdata.logic.schema import request_create_schema
1✔
12
from ckanext.requestdata.view_helper import process_extras_fields
1✔
13

14
# Short module-level aliases for members of the CKAN plugins toolkit (``tk``),
# so the rest of the module can call them without the ``tk.`` prefix.
get_action = tk.get_action
1✔
15
check_access = tk.check_access
1✔
16
config = tk.config
1✔
17
h = tk.h
1✔
18
g = tk.g
1✔
19
NotAuthorized = tk.NotAuthorized
1✔
20
# ``NotFound`` is the local name for the toolkit's ``ObjectNotFound`` exception.
NotFound = tk.ObjectNotFound
1✔
21
# Validator looked up by name through the toolkit.
unicode_safe = tk.get_validator('unicode_safe')
1✔
22
# Module-level logger named after this module.
log = logging.getLogger(__name__)
1✔
23

24

25
class DatasetRequestAccessLogic(object):
    """Process a submitted "request access" form for a dataset.

    The object wraps the incoming request and the action context. It can
    parse the form (``read``), validate it against the requestdata creation
    schema (``validate``) and carry out the request (``send_request``).
    """

    def __init__(self, context: Context, request: Request):
        """Keep the context/request and build the validation schema.

        :param context: context handed to every action called by this object
        :param request: incoming request; its ``form`` holds the submitted data
        """
        self.request = request
        self.context = context
        self.form = request.form
        self.schema = request_create_schema()

    def read(self) -> DataDict:
        """Return the submitted form parsed into a cleaned, unflattened dict."""
        params = logic.parse_params(self.form)
        return logic.clean_dict(dictization_functions.unflatten(logic.tuplize_dict(params)))

    def validate(self, data_dict: DataDict):
        """Validate ``data_dict`` against ``self.schema``.

        :param data_dict: data to validate (typically the result of ``read``)
        :returns: the result of ``validate(data_dict, self.schema, self.context)``
            (presumably CKAN's ``(data, errors)`` pair -- confirm against callers)
        :raises Exception: any error raised during validation is logged with its
            traceback and re-raised unchanged
        """
        try:
            return validate(data_dict, self.schema, self.context)
        except Exception:
            # Previously the error was only logged and execution fell through
            # to a ``return`` of a never-assigned variable, which surfaced as
            # an unrelated UnboundLocalError. Propagate the real cause instead.
            log.exception('Validation of the dataset access request failed')
            raise

    def send_request(self) -> tuple[bool, str]:
        """Create the data request and notify everyone involved.

        Steps, in order: store the request, resolve the recipients (the dataset
        maintainer, or the owning organization's admins when the maintainer
        user cannot be found), email the recipients and the requester, create
        the notification and increment the request counters.

        :returns: ``(success, message)``; ``success`` is ``False`` only when the
            dataset has no maintainer set
        """
        data = self.request.form.to_dict()
        get_action('requestdata_request_create')(self.context, data)

        pkg_dict = get_action('package_show')(self.context, {'id': data['package_id']})

        maintainer_id = pkg_dict['maintainer']
        if maintainer_id is None:
            return False, 'Dataset maintainer email not found.'

        user_obj = self.context['auth_user_obj']

        # Dedicated context for user_show: 'keep_email' is set so the returned
        # user dict still carries the email address needed for the mail.
        context_user_show = {
            'model': model,
            'session': model.Session,
            'user': g.user,
            'auth_user_obj': g.userobj,
            'keep_email': True,
        }

        data_dict = {
            'users': [],
        }
        recipients = []
        maintainer_dict = {}
        try:
            maintainer_dict = get_action('user_show')(context_user_show, {'id': maintainer_id})
        except NotFound:
            # Deliberate best-effort: a missing maintainer user is not fatal,
            # the organization admins are contacted instead (see below).
            log.warning('Maintainer user %s not found, falling back to organization admins', maintainer_id)
        else:
            data_dict['users'].append(maintainer_dict)
            recipients.append({
                'display_name': maintainer_dict.get('fullname'),
                'email': maintainer_dict.get('email'),
            })

        if not recipients:
            recipients.extend(
                {'display_name': admin.get('fullname'), 'email': admin.get('email')}
                for admin in _org_admins_for_dataset(self.context, pkg_dict['name'])
            )

        sender_name = data.get('sender_name', '')
        sender_email = data.get('email_address', '')
        user_email = user_obj.email
        message = data['message_content']

        try:
            sender_org = get_action('organization_show')(self.context, {'id': data.get('sender_organization_id')})
        except NotFound:
            sender_org = None

        organizations = get_action('organization_list_for_user')(self.context, {
            'id': user_obj.id,
            'permission': 'read',
        })
        extras = json.loads(process_extras_fields(data, organizations, sender_org))

        _send_email_to_maintainer(sender_name, message, user_email, extras, recipients, maintainer_dict, pkg_dict)
        _send_email_to_requester(sender_name, sender_email, message, user_email, pkg_dict)

        # notify package creator that new data request was made
        get_action('requestdata_notification_create')(self.context, data_dict)

        counters_dict = {
            'package_id': data['package_id'],
            'flag': 'request',
        }
        get_action('requestdata_increment_request_data_counters')(self.context, counters_dict)

        return True, 'Email message was successfully sent.'
1✔
112

113

114
def _org_admins_for_dataset(context: Context, dataset_name: str):
    """Return contact details of the admins of the dataset's owning organization.

    :param context: context passed to the ``package_show`` and
        ``organization_show`` actions
    :param dataset_name: value used as the ``id`` for ``package_show``
    :returns: list of dicts with the keys ``email`` and ``fullname``; the
        ``fullname`` falls back to the user name when no full name is set
    """
    pkg_dict = get_action('package_show')(context, {'id': dataset_name})
    org = get_action('organization_show')(context, {'id': pkg_dict['owner_org']})

    admins = []
    for user in org['users']:
        if user['capacity'] != 'admin':
            continue
        db_user = model.User.get(user['id'])
        if db_user is None:
            # The membership references a user that can no longer be loaded;
            # skip it instead of failing on the attribute access below.
            continue
        admins.append({
            'email': db_user.email,
            'fullname': db_user.fullname or db_user.name,
        })

    return admins
×
131

132

133
def _send_email_to_requester(sender_name: str, sender_email: str, message: str, user_email: str,
                             pkg_dict: DataDict) -> None:
    """Email the requester a copy of their access request.

    :param sender_name: name entered by the requester; used as the recipient's
        display name and in the email body
    :param sender_email: address the email is sent to
    :param message: text of the request
    :param user_email: passed to the mailer as the ``footer`` argument
    :param pkg_dict: dataset dict; ``name``, ``title`` and ``organization`` are read
    """
    subject = u'Request for access to metadata-only dataset'
    # 'organization' may be missing or None on the package dict; fall back to
    # an empty dict so the title lookup yields None instead of raising.
    organization = pkg_dict.get('organization') or {}
    email_data = {
        'user_fullname': sender_name,
        'msg': message,
        'org_name': organization.get('title'),
        'dataset_link': h.url_for('dataset_read', id=pkg_dict['name'], qualified=True),
        'dataset_title': pkg_dict['title'],
    }
    senders_email = [{'display_name': sender_name, 'email': sender_email}]
    hdx_mailer.mail_recipient(senders_email, subject, email_data, footer=user_email,
                              snippet='email/content/request_data_to_user.html')
146

147

148
def _send_email_to_maintainer(sender_name: str, message: str, user_email: str, extras, recipients,
                              maintainer_dict: DataDict, pkg_dict: DataDict):
    """Email the dataset contacts about a new access request.

    :param sender_name: name of the requester; used in the subject and body
    :param message: text of the request
    :param user_email: email of the requesting user, included in the body
    :param extras: additional request fields forwarded to the email template
    :param recipients: list of ``{'display_name': ..., 'email': ...}`` dicts
    :param maintainer_dict: maintainer user dict; may be empty when the
        maintainer could not be resolved
    :param pkg_dict: dataset dict; ``name``, ``title``, ``owner_org`` and
        ``organization`` are read
    """
    subject = sender_name + u' has requested access to one of your datasets: ' + pkg_dict['title']
    # 'organization' may be missing or None on the package dict; fall back to
    # an empty dict so the title lookup yields None instead of raising.
    organization = pkg_dict.get('organization') or {}
    # Also covers a maintainer dict that has neither name field filled in,
    # which previously produced a None greeting name.
    maintainer = maintainer_dict or {}
    maintainer_fullname = maintainer.get('display_name') or maintainer.get('fullname') or 'HDX user'
    email_data = {
        'user_fullname': sender_name,
        'user_email': user_email,
        'msg': message,
        'extras': extras,
        'org_name': organization.get('title'),
        'dataset_link': h.url_for('dataset_read', id=pkg_dict['name'], qualified=True),
        'dataset_title': pkg_dict['title'],
        'maintainer_fullname': maintainer_fullname,
        'requestdata_org_url': h.url_for('requestdata_organization_requests.requested_data',
                                         id=pkg_dict.get('owner_org'), qualified=True),
    }
    hdx_mailer.mail_recipient(recipients, subject, email_data, footer='hdx@un.org',
                              snippet='email/content/request_data_to_admins.html')
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc