• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OCHA-DAP / hdx-ckan / #5649

26 Aug 2024 11:34AM UTC coverage: 72.882% (-0.1%) from 72.998%
#5649

Pull #6412

coveralls-python

web-flow
Merge branch 'dev' into feature/HDX-9990-implement-new-contact-contributor-form
Pull Request #6412: HDX-9960 & HDX-9987 contact the contributor & HDX Connect new pages

146 of 321 new or added lines in 10 files covered. (45.48%)

1 existing line in 1 file now uncovered.

11750 of 16122 relevant lines covered (72.88%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.25
/ckanext-hdx_theme/ckanext/hdx_theme/helpers/helpers.py
1
import json
1✔
2
import datetime
1✔
3
import logging
1✔
4
import re
1✔
5
import six
1✔
6
import six.moves.urllib.parse as urlparse
1✔
7

8
import ckan.authz as new_authz
1✔
9
import ckan.lib.base as base
1✔
10
import ckan.lib.helpers as h
1✔
11
import ckan.logic as logic
1✔
12
import ckan.model as model
1✔
13
import ckan.plugins.toolkit as tk
1✔
14
import ckanext.activity.model.activity as activity_model
1✔
15
import ckanext.requestdata.model as requestdata_model
1✔
16
import ckanext.hdx_theme.version as version
1✔
17

18
from six import text_type
1✔
19

20
from collections import OrderedDict
1✔
21
from ckan.lib import munge
1✔
22
from ckan.plugins import toolkit
1✔
23
from ckanext.hdx_package.helpers.freshness_calculator import UPDATE_FREQ_INFO
1✔
24
from ckanext.hdx_package.helpers.p_code_filters_helper import are_new_p_code_filters_enabled
1✔
25
from ckanext.hdx_theme.util.light_redirect import switch_url_path
1✔
26

27
_ = toolkit._
1✔
28
request = toolkit.request
1✔
29
c = toolkit.c
1✔
30
config = toolkit.config
1✔
31
ungettext = toolkit.ungettext
1✔
32

33
log = logging.getLogger(__name__)
1✔
34

35
downloadable_formats = {
1✔
36
    'csv', 'xls', 'xlsx', 'txt', 'jpg', 'jpeg', 'png', 'gif', 'zip', 'xml'
37
}
38

39

40
def is_downloadable(resource):
1✔
41
    format = resource.get('format', 'data').lower()
1✔
42
    if format in downloadable_formats:
1✔
43
        return True
1✔
44
    return False
1✔
45

46

47
def is_not_zipped(res):
1✔
48
    url = res.get('url', 'zip').strip().lower()  # Default to zip so preview doesn't show if there is no url
1✔
49
    if re.search(r'zip$', url) or re.search(r'rar$', url):
1✔
50
        return False
×
51
    return True
1✔
52

53

54
NOT_HXL_FORMAT_LIST = frozenset({'zipped shapefile', 'zip', 'geojson', 'json', 'kml', 'kmz', 'rar', 'pdf', 'excel',
1✔
55
                       'zipped', 'docx', 'doc', '7z'})
56

57

58
def _any(item, ext_list=NOT_HXL_FORMAT_LIST):
1✔
59
    return any(i in item for i in ext_list)
×
60

61

62
def is_not_hxl_format(res_format):
1✔
63
    if not res_format:
×
64
        return False
×
65
    return _any([res_format.lower()], NOT_HXL_FORMAT_LIST)
×
66

67

68
def get_facet_items_dict(facet, limit=1000, exclude_active=False):
1✔
69
    facets = h.get_facet_items_dict(
×
70
        facet, limit=limit, exclude_active=exclude_active)
71
    filtered_no_items = c.search_facets.get(facet)['items'].__len__()
×
72
    # total_no_items = json.loads(
73
    #     count.CountController.list[facet](count.CountController()))['count']
74
    # if filtered_no_items <= 50 and filtered_no_items < total_no_items:
75
    #     no_items = filtered_no_items
76
    # else:
77
    #     no_items = total_no_items
78
    no_items = filtered_no_items
×
79

80
    if c.search_facets_limits:
×
81
        limit = c.search_facets_limits.get(facet)
×
82
    if limit:
×
83
        return (facets[:limit], no_items)
×
84
    else:
85
        return (facets, no_items)
×
86

87

88
def get_last_modifier_user(dataset_id=None, group_id=None):
1✔
89
    activity_objects = None
×
90
    if group_id is not None:
×
91
        activity_objects = activity_model.group_activity_list(group_id, limit=1, offset=0)
×
92
    if dataset_id:
×
93
        activity_objects = activity_model.package_activity_list(dataset_id, limit=1, offset=0)
×
94
    if activity_objects:
×
95
        user = activity_objects[0].user_id
×
96
        t_stamp = activity_objects[0].timestamp
×
97
        user_obj = model.User.get(user)
×
98
        return {"user_obj": user_obj, "last_modified": t_stamp.isoformat()}
×
99

100
    # in case there is no update date it will be displayed the current date
101
    return {"user_obj": None, "last_modified": None}
×
102

103

104
def get_filtered_params_list(params):
1✔
105
    result = []
×
106
    for (key, value) in params.items():
×
107
        if key not in {'q', 'sort'}:
×
108
            result.append((key, value))
×
109
    return result
×
110

111

112
# def get_last_revision_package(package_id):
113
#     #     pkg_list  = model.Session.query(model.Package).filter(model.Package.id == package_id).all()
114
#     #     pkg = pkg_list[0]
115
#     #     return pkg.latest_related_revision.id
116
#     activity_objects = model.activity.package_activity_list(
117
#         package_id, limit=1, offset=0)
118
#     if len(activity_objects) > 0:
119
#         activity = activity_objects[0]
120
#         return activity.revision_id
121
#     return None
122

123

124
# def get_last_revision_group(group_id):
125
#     #     grp_list  = model.Session.query(model.Group).filter(model.Group.id == group_id).all()
126
#     #     grp = grp_list[0]
127
#     #     last_rev = grp.all_related_revisions[0][0]
128
#     activity_objects = model.activity.group_activity_list(
129
#         group_id, limit=1, offset=0)
130
#     if len(activity_objects) > 0:
131
#         activity = activity_objects[0]
132
#         return activity.revision_id
133
#     return None
134

135

136
def get_last_revision_timestamp_group(group_id):
1✔
137
    activity_objects = activity_model.group_activity_list(
×
138
        group_id, limit=1, offset=0)
139
    if len(activity_objects) > 0:
×
140
        activity = activity_objects[0]
×
141
        return h.render_datetime(activity.timestamp)
×
142
    return None
×
143

144

145
def get_dataset_date_format(date):
1✔
146
    # Is this a range?
147
    drange = date.split('-')
×
148
    if len(drange) != 2:
×
149
        drange = [date]
×
150
    dates = []
×
151
    for d in drange:
×
152
        try:
×
153
            dt = datetime.datetime.strptime(d, '%m/%d/%Y')
×
154
            dates.append(dt.strftime('%b %-d, %Y'))
×
155
        except:
×
156
            return False
×
157
    return '-'.join(dates)
×
158

159

160
def get_group_followers(grp_id):
1✔
161
    result = logic.get_action('group_follower_count')(
1✔
162
        {'model': model, 'session': model.Session},
163
        {'id': grp_id})
164
    return result
1✔
165

166

167
def hdx_dataset_follower_count(pkg_id):
1✔
168
    result = logic.get_action('dataset_follower_count')(
×
169
        {'model': model, 'session': model.Session},
170
        {'id': pkg_id})
171
    return result
×
172

173

174
def get_group_members(grp_id):
1✔
175
    try:
1✔
176
        member_list = logic.get_action('member_list')(
1✔
177
            {'model': model, 'session': model.Session},
178
            {'id': grp_id, 'object_type': 'user'})
179
    except logic.NotAuthorized:
×
180
        member_list = logic.get_action('member_list')(
×
181
            {'model': model, 'session': model.Session},
182
            {'id': grp_id, 'include_users': False})
183
    return len(member_list)
1✔
184

185

186
def hdx_get_user_info(user_id):
1✔
187
    context = {'model': model, 'session': model.Session,
1✔
188
               'user': c.user or c.author}
189
    try:
1✔
190
        user = tk.get_action('hdx_basic_user_info')(context, {'id': user_id})
1✔
191
    except logic.NotAuthorized:
×
192
        base.abort(403, _('Unauthorized to see organization member list'))
×
193
    return user
1✔
194

195

196
def hdx_get_org_member_info(user_id, org_name=None):
1✔
197
    context = {'model': model, 'session': model.Session,
1✔
198
               'user': c.user or c.author}
199

200
    if org_name:
1✔
201
        org_list = [{'name': org_name}]
1✔
202
    else:
203
        org_list = tk.get_action('hdx_organization_list_for_user')(context, {'id': user_id})
×
204

205
    user = tk.get_action('hdx_basic_user_info')(context, {'id': user_id})
1✔
206
    orgs_data = []
1✔
207

208
    for org in org_list:
1✔
209
        try:
1✔
210
            maint_pkgs = _get_packages_for_maintainer(context, user_id, org['name'])
1✔
211

212
            if maint_pkgs:
1✔
213
                org['pkgs'] = maint_pkgs
×
214
                orgs_data.append(org)
×
215
                user['maint_orgs_pkgs'] = orgs_data
×
216
        except logic.NotAuthorized:
×
217
            base.abort(403, _('Unauthorized to see organization member list'))
×
218
    return user
1✔
219

220

221
# HDX-8554 - org has to be organization name
222
def _get_packages_for_maintainer(context, id, org_name):
1✔
223
    result = logic.get_action('package_search')(context, {
1✔
224
        'q': '*:*',
225
        'fq': 'maintainer:{0}, organization:{1}'.format(id, org_name),
226
        'rows': 100,
227
    })
228
    return result['results']
1✔
229

230

231
def markdown_extract_strip(text, extract_length=190):
1✔
232
    ''' return the plain text representation of markdown encoded text.  That
233
    is the texted without any html tags.  If extract_length is 0 then it
234
    will not be truncated.'''
235
    result_text = h.markdown_extract(text, extract_length)
1✔
236
    result = result_text.rstrip('\n').replace(
1✔
237
        '\n', ' ').replace('\r', '').replace('"', "&quot;")
238
    return result
1✔
239

240

241
def render_markdown_strip(text, extract_length=190):
1✔
242
    ''' return the plain text representation of markdown encoded text.  That
243
    is the texted without any html tags.  If extract_length is 0 then it
244
    will not be truncated.'''
245
    result_text = h.render_markdown(text, extract_length)
×
246
    result = result_text.rstrip('\n').replace(
×
247
        '\n', ' ').replace('\r', '').replace('"', "&quot;")
248
    return result
×
249

250

251
def methodology_bk_compat(meth, other, render=True):
1✔
252
    if not meth and not other:
1✔
253
        return (None, None)
×
254
    standard_meths = ["Census", "Sample Survey",
1✔
255
                      "Direct Observational Data/Anecdotal Data", "Registry", "Other"]
256
    if meth in standard_meths and meth != "Other":
1✔
257
        if render:
×
258
            return (meth, None)
×
259
        else:
260
            return (meth, None)
×
261
    elif other:
1✔
262
        if render:
×
263
            return ("Other", h.render_markdown(other))
×
264
        else:
265
            return ("Other", other)
×
266
    else:
267
        meth = meth.split('Other - ')
1✔
268
        if render:
1✔
269
            return ("Other", h.render_markdown(meth[0]))
1✔
270
        else:
271
            return ("Other", meth[0])
×
272

273

274
def _hdx_strftime(_date):
1✔
275
    result = None
1✔
276
    try:
1✔
277
        result = _date.strftime('%B %d, %Y')
1✔
278
    except ValueError as e:
×
279
        month = datetime.date(1900, _date.month, 1).strftime('%B')
×
280
        result = month + " " + str(_date.day) + ", " + str(_date.year)
×
281
    return result
1✔
282

283

284
def render_date_from_concat_str(_str, separator='-'):
1✔
285
    result = ''
1✔
286
    if _str:
1✔
287
        if 'TO' in _str:
1✔
288
            res_list = []
1✔
289
            dates_list = str(_str).replace('[', '').replace(']', '').replace(' ', '').split('TO')
1✔
290
            for date in dates_list:
1✔
291
                if '*' not in date:
1✔
292
                    _date = datetime.datetime.strptime(date.split('T')[0], "%Y-%m-%d")
1✔
293
                    res_list.append(_hdx_strftime(_date))
1✔
294
                    # _date.strftime('%B %d, %Y'))
295
                else:
296
                    res_list.append(datetime.datetime.today().strftime('%B %d, %Y'))
×
297
            result = '-'.join(res_list)
1✔
298
        else:
299
            strdate_list = _str.split(separator)
×
300
            for index, strdate in enumerate(strdate_list):
×
301
                try:
×
302
                    date = datetime.datetime.strptime(strdate.strip(), '%m/%d/%Y')
×
303
                    render_strdate = date.strftime('%B %d, %Y')
×
304
                    result += render_strdate
×
305
                    if index < len(strdate_list) - 1:
×
306
                        result += ' - '
×
307
                except ValueError as e:
×
308
                    log.warning(e)
×
309

310
    return result
1✔
311

312

313
def hdx_build_nav_icon_with_message(menu_item, title, **kw):
1✔
314
    htmlResult = h.build_nav_icon(menu_item, title, **kw)
×
315
    if 'message' not in kw or not kw['message']:
×
316
        return htmlResult
×
317
    else:
318
        newResult = str(htmlResult).replace('</a>',
×
319
                                            ' <span class="nav-short-message">{message}</span></a>'.format(
320
                                                message=kw['message']))
321
        return h.literal(newResult)
×
322

323

324
def hdx_build_nav_no_icon(menu_item, title, **kw):
1✔
325
    html_result = str(h.build_nav_icon(menu_item, title, **kw))
×
326
    # print html_result
327
    start = html_result.find('<i ') - 1
×
328
    end = html_result.find('</i>') + 4
×
329
    if start > 0:
×
330
        new_result = html_result[0:start] + ' class="no-icon">' + html_result[end:]
×
331
    else:
332
        new_result = html_result
×
333
    # print new_result
334
    return h.literal(new_result)
×
335

336

337
def hdx_linked_user(user, maxlength=0):
1✔
338
    response = h.linked_user(user, maxlength)
1✔
339
    changed_response = re.sub(r"<img[^>]+/>", "", response)
1✔
340
    return h.literal(changed_response)
1✔
341

342

343
def hdx_linked_username(user, userobj, maxlength=0, avatar=20):
1✔
344
    if not isinstance(user, model.User):
1✔
345
        user_name = text_type(user)
1✔
346
        user = model.User.get(user_name)
1✔
347
        if not user:
1✔
348
            return user_name
×
349
    if user:
1✔
350
        name = user.name if model.User.VALID_NAME.match(user.name) else user.id
1✔
351
        display_name = user.display_name if userobj else user.name
1✔
352

353
        if maxlength and len(display_name) > maxlength:
1✔
354
            display_name = display_name[:maxlength] + '...'
×
355

356
        if userobj:
1✔
357
            link = h.link_to(display_name, url_for('user.read', id=name))
1✔
358
        else:
359
            link = '''<a onclick="showOnboardingWidget('#loginPopup');" href="#" aria-label="login">%s</a>''' % display_name
1✔
360

361
        return h.literal(u'{icon} {link}'.format(
1✔
362
            icon=h.user_image(
363
                user.id,
364
                size=avatar
365
            ),
366
            link=link
367
        ))
368

369

370
def hdx_show_singular_plural(num, singular_word, plural_word, show_number=True):
1✔
371
    response = None
1✔
372
    if num == 1:
1✔
373
        response = singular_word
1✔
374
    else:
375
        response = plural_word
1✔
376

377
    if show_number:
1✔
378
        return str(num) + ' ' + response
1✔
379
    else:
380
        return response
×
381

382

383
def hdx_num_of_new_related_items():
1✔
384
    max_days = 30
×
385
    count = 0
×
386
    now = datetime.datetime.now()
×
387
    for related in c.pkg.related:
×
388
        if (related.created):
×
389
            difference = now - related.created
×
390
            days = difference.days
×
391
            if days < max_days:
×
392
                count += 1
×
393
    return count
×
394

395

396
def hdx_user_count(only_sysadmin=False, include_site_user=False):
1✔
397
    site_id = config.get('ckan.site_id')
1✔
398
    q = model.Session.query(model.User).filter(model.User.state != 'deleted')
1✔
399
    if only_sysadmin:
1✔
400
        q = q.filter(model.User.sysadmin.is_(True))
1✔
401
    if not include_site_user:
1✔
402
        q = q.filter(model.User.name != site_id)
1✔
403
    return q.count()
1✔
404

405

406
def hdx_member_roles_list():
1✔
407
    context = {'model': model, 'session': model.Session,
1✔
408
               'user': c.user or c.author}
409
    return tk.get_action('member_roles_list')(context, {})
1✔
410

411

412
def hdx_version():
1✔
413
    return version.hdx_version
1✔
414

415

416
def hdx_get_extras_element(data_dict, key='key', value_key='org_url', ret_key='value'):
1✔
417
    res = ''
1✔
418

419
    if value_key in data_dict:
1✔
420
        res = data_dict[value_key]
1✔
421
    else:
422
        extras = data_dict.get('extras', [])
1✔
423
        for ex in extras:
1✔
424
            if ex[key] == value_key:
×
425
                res = ex[ret_key]
×
426
    return res
1✔
427

428

429
def load_json(obj, **kw):
1✔
430
    return json.loads(obj, **kw)
1✔
431

432

433
def escaped_dump_json(obj, **kw):
1✔
434
    # escapes </ to prevent script tag hacking.
435
    return json.dumps(obj, **kw).replace('</', '<\\/')
×
436

437

438
def hdx_group_followee_list():
1✔
439
    context = {'model': model, 'session': model.Session,
1✔
440
               'user': c.user or c.author, 'auth_user_obj': c.userobj,
441
               'for_view': True}
442

443
    list = logic.get_action('group_followee_list')(
1✔
444
        context, {'id': c.userobj.id})
445
    # filter out the orgs
446
    groups = [group for group in list if not group['is_organization']]
1✔
447

448
    return groups
1✔
449

450

451
def hdx_organizations_available_with_roles():
1✔
452
    """
453
    Gets roles of organizations the current user belongs to
454
    """
455
    import ckanext.hdx_org_group.helpers.organization_helper as hdx_helper
1✔
456
    organizations_available = h.organizations_available('read', include_dataset_count=True)
1✔
457
    # if organizations_available and len(organizations_available) > 0:
458
    orgs_where_editor = []
1✔
459
    orgs_where_admin = []
1✔
460
    am_sysadmin = new_authz.is_sysadmin(c.user)
1✔
461
    if not am_sysadmin:
1✔
462
        orgs_where_editor = set([org['id'] for org in h.organizations_available('create_dataset')])
1✔
463
        orgs_where_admin = set([org['id'] for org in h.organizations_available('admin')])
1✔
464

465
    for org in organizations_available:
1✔
466
        org['has_add_dataset_rights'] = True
×
467
        if am_sysadmin:
×
468
            org['role'] = 'sysadmin'
×
469
        elif org['id'] in orgs_where_admin:
×
470
            org['role'] = 'admin'
×
471
        elif org['id'] in orgs_where_editor:
×
472
            org['role'] = 'editor'
×
473
        else:
474
            org['role'] = 'member'
×
475
            org['has_add_dataset_rights'] = False
×
476

477
    organizations_available.sort(key=lambda y: y['display_name'].lower())
1✔
478
    hdx_helper.org_add_last_updated_field(organizations_available)
1✔
479
    return organizations_available
1✔
480

481

482
def hdx_remove_schema_and_domain_from_url(url):
1✔
483
    urlTuple = urlparse.urlparse(url)
×
484
    if url.endswith('/preview'):
×
485
        # this is the case when the file needs to be transformed
486
        # before it can be previewed
487

488
        modifiedTuple = (('', '') + urlTuple[2:6])
×
489
    else:
490
        # this is for txt files
491
        # we force https since otherwise the browser will
492
        # anyway block loading mixed active content
493
        modifiedTuple = (('',) + urlTuple[1:6])
×
494
    modifiedUrl = urlparse.urlunparse(modifiedTuple)
×
495
    return modifiedUrl
×
496

497

498
def hdx_get_ckan_config(config_name):
1✔
499
    return config.get(config_name)
1✔
500

501

502
def get_group_name_from_list(glist, gid):
1✔
503
    for group in glist:
×
504
        if group['id'] == gid:
×
505
            return group['title']
×
506
    return ""
×
507

508

509
def hdx_follow_link(obj_type, obj_id, extra_text, cls=None, confirm_text=None):
1✔
510
    obj_type = obj_type.lower()
1✔
511
    assert obj_type in h._follow_objects
1✔
512
    # If the user is logged in show the follow/unfollow button
513
    if c.user:
1✔
514
        context = {'model': model, 'session': model.Session, 'user': c.user}
1✔
515
        action = 'am_following_%s' % obj_type
1✔
516
        following = logic.get_action(action)(context, {'id': obj_id})
1✔
517
        return h.snippet('search/snippets/follow_link.html',
1✔
518
                         following=following,
519
                         obj_id=obj_id,
520
                         obj_type=obj_type,
521
                         extra_text=extra_text,
522
                         confirm_text=confirm_text,
523
                         cls=cls)
524
    return ''
×
525

526

527
def follow_status(obj_type, obj_id):
1✔
528
    obj_type = obj_type.lower()
1✔
529
    assert obj_type in h._follow_objects
1✔
530
    # If the user is logged in show the follow/unfollow button
531
    if c.user:
1✔
532
        context = {'model': model, 'session': model.Session, 'user': c.user}
1✔
533
        action = 'am_following_%s' % obj_type
1✔
534
        following = logic.get_action(action)(context, {'id': obj_id})
1✔
535
        return following
1✔
536
    return False
1✔
537

538

539
def one_active_item(items):
1✔
540
    for i in items:
×
541
        if i['active']:
×
542
            return True
×
543
    return False
×
544

545

546
def feature_count(features):
1✔
547
    count = 0
×
548
    for name in features:
×
549
        facet = h.get_facet_items_dict(name)
×
550
        for f in facet:
×
551
            count += f['count']
×
552
    return count
×
553

554

555
def hdx_follow_button(obj_type, obj_id, **kw):
1✔
556
    ''' This is a modified version of the ckan core follow_button() helper
557
    It returns a simple link for a bootstrap dropdown menu
558

559
    Return a follow button for the given object type and id.
560

561
    If the user is not logged in return an empty string instead.
562

563
    :param obj_type: the type of the object to be followed when the follow
564
        button is clicked, e.g. 'user' or 'dataset'
565
    :type obj_type: string
566
    :param obj_id: the id of the object to be followed when the follow button
567
        is clicked
568
    :type obj_id: string
569

570
    :returns: a follow button as an HTML snippet
571
    :rtype: string
572

573
    '''
574
    obj_type = obj_type.lower()
×
575
    assert obj_type in h._follow_objects
×
576
    # If the user is logged in show the follow/unfollow button
577
    if c.user:
×
578
        context = {'model': model, 'session': model.Session, 'user': c.user}
×
579
        action = 'am_following_%s' % obj_type
×
580
        following = logic.get_action(action)(context, {'id': obj_id})
×
581
        follow_extra_text = _('This Data')
×
582
        if kw and 'follow_extra_text' in kw:
×
583
            follow_extra_text = kw.pop('follow_extra_text')
×
584
        return h.snippet('snippets/hdx_follow_button.html',
×
585
                         following=following,
586
                         obj_id=obj_id,
587
                         obj_type=obj_type,
588
                         follow_extra_text=follow_extra_text,
589
                         params=kw)
590
    return ''
×
591

592

593
def hdx_add_url_param(alternative_url=None, controller=None, action=None,
1✔
594
                      extras=None, new_params=None, unwanted_keys=[]):
595
    '''
596
    MODIFIED CKAN HELPER THAT ALLOWS REMOVING SOME PARAMS
597

598
    Adds extra parameters to existing ones
599

600
    controller action & extras (dict) are used to create the base url
601
    via url_for(controller=controller, action=action, **extras)
602
    controller & action default to the current ones
603

604
    This can be overriden providing an alternative_url, which will be used
605
    instead.
606
    '''
607

608
    params_nopage = [(k, v) for k, v in request.params.items()
×
609
                     if k != 'page' and k not in unwanted_keys]
610
    params = set(params_nopage)
×
611
    if new_params:
×
612
        params |= set(new_params.items())
×
613
    if alternative_url:
×
614
        return h._url_with_params(alternative_url, params)
×
615
    return h._create_url_with_params(params=params, controller=controller,
×
616
                                     action=action, extras=extras)
617

618

619
def https_load(url):
1✔
620
    return re.sub(r'^http://', '//', url)
×
621

622

623
def count_public_datasets_for_group(datasets_list):
1✔
624
    a = len([i for i in datasets_list if i['private'] == False])
×
625
    return a
×
626

627

628
def check_all_str_fields_not_empty(dictionary, warning_template, skipped_keys=[], errors=None):
1✔
629
    for key, value in dictionary.items():
1✔
630
        if key not in skipped_keys:
1✔
631
            value = value.strip() if value else value
1✔
632
            if not value:
1✔
633
                message = warning_template.format(key)
×
634
                log.warning(message)
×
635
                add_error('Empty field', message, errors)
×
636
                return False
×
637
    return True
1✔
638

639

640
def add_error(type, message, errors):
1✔
641
    if isinstance(errors, list):
×
642
        errors.append({'_type': type, 'message': message})
×
643

644

645
def hdx_popular(type_, number, min=1, title=None):
1✔
646
    ''' display a popular icon. '''
647
    from ckan.lib.helpers import snippet as snippet
×
648
    if type_ == 'views':
×
649
        title = ungettext('{number} view', '{number} views', number)
×
650
    elif type_ == 'recent views':
×
651
        title = ungettext('{number} recent view', '{number} recent views', number)
×
652
    elif type_ == 'downloads':
×
653
        title = ungettext('{number} download', '{number} downloads', number)
×
654
    elif not title:
×
655
        raise Exception('popular() did not recieve a valid type_ or title')
×
656
    return snippet('snippets/popular.html', title=title, number=number, min=min)
×
657

658

659
def hdx_license_list():
1✔
660
    license_touple_list = model.Package.get_license_options()
1✔
661
    license_dict_list = [{'value': _id, 'text': _title} for _title, _id in license_touple_list]
1✔
662
    return license_dict_list
1✔
663

664

665
def hdx_methodology_list():
1✔
666
    result = [{'value': '-1', 'text': '-- Please select --'}, {'value': 'Census', 'text': 'Census'},
1✔
667
              {'value': 'Sample Survey', 'text': 'Sample Survey'},
668
              {'value': 'Direct Observational Data/Anecdotal Data', 'text': 'Direct Observational Data/Anecdotal Data'},
669
              {'value': 'Registry', 'text': 'Registry'}, {'value': 'Other', 'text': 'Other'}]
670
    return result
1✔
671

672

673
def hdx_location_list(include_world=True):
1✔
674
    top_values = []
1✔
675
    if include_world:
1✔
676
        top_values.append('world')
1✔
677

678
    top_locations = []
1✔
679
    bottom_locations = []
1✔
680

681
    locations = logic.get_action('cached_group_list')({}, {})
1✔
682
    for loc in locations:
1✔
683
        location_data = {'value': loc.get('name'), 'text': loc.get('title')}
1✔
684
        if loc.get('name') in top_values:
1✔
685
            top_locations.append(location_data)
1✔
686
        else:
687
            bottom_locations.append(location_data)
1✔
688

689
    return top_locations + bottom_locations
1✔
690

691

692
def hdx_location_dict(include_world=True):
1✔
NEW
693
    world_location_name = 'world'
×
NEW
694
    top_values = [world_location_name] if include_world else []
×
695

NEW
696
    locations = logic.get_action('cached_group_list')({}, {})
×
697

NEW
698
    top_locations = OrderedDict()
×
NEW
699
    bottom_locations = OrderedDict()
×
700

NEW
701
    for loc in locations:
×
NEW
702
        key = loc.get('title')
×
NEW
703
        value = loc.get('title')
×
NEW
704
        if loc.get('name') in top_values:
×
NEW
705
            top_locations[key] = value
×
706
        else:
NEW
707
            if loc.get('name') == world_location_name and include_world is False:
×
NEW
708
                continue
×
NEW
709
            bottom_locations[key] = value
×
710

NEW
711
    return OrderedDict(list(top_locations.items()) + list(bottom_locations.items()))
×
712

713

714
def hdx_user_orgs_dict(user_id, include_org_type=False):
1✔
NEW
715
    try:
×
NEW
716
        orgs = _get_action('organization_list_for_user', {'id': user_id})
×
717

NEW
718
        if include_org_type:
×
NEW
719
            query = model.Session.query(model.GroupExtra).filter_by(key='hdx_org_type', state='active')
×
NEW
720
            org_extras = query.all()
×
721

NEW
722
            extras = {org_extra.group_id: org_extra.value for org_extra in org_extras}
×
723

NEW
724
            for org in orgs:
×
NEW
725
                org_id = org.get('id')
×
NEW
726
                if org_id in extras:
×
NEW
727
                    org['org_type'] = extras[org_id]
×
728

NEW
729
        result = OrderedDict()
×
NEW
730
        for org in orgs:
×
NEW
731
            org_data = {'name': org.get('display_name')}
×
NEW
732
            if include_org_type and 'org_type' in org:
×
NEW
733
                org_data['org_type'] = org['org_type']
×
NEW
734
            result[org.get('display_name')] = org_data
×
735

NEW
736
        return result
×
737

NEW
738
    except Exception:
×
NEW
739
        return OrderedDict()
×
740

741

742
def hdx_organisation_list():
1✔
743
    orgs = h.organizations_available('create_dataset')
1✔
744
    orgs_dict_list = [{'value': org.get('name'), 'text': org.get('title')} for org in orgs]
1✔
745
    return orgs_dict_list
1✔
746

747

748
def hdx_tag_list():
1✔
749
    if c.user:
×
750
        context = {'model': model, 'session': model.Session, 'user': c.user}
×
751
        tags = logic.get_action('tag_list')(context, {})
×
752
        tags_dict_list = [{'value': tag, 'text': tag} for tag in tags]
×
753
        return tags_dict_list
×
754
    return []
×
755

756

757
def hdx_dataset_preview_values_list():
1✔
758
    import ckanext.hdx_package.helpers.custom_validator as vd
1✔
759
    result = [{'value': vd._DATASET_PREVIEW_FIRST_RESOURCE, 'text': 'Default (first resource with preview)'}]
1✔
760
    return result
1✔
761

762

763
def hdx_frequency_list(for_sysadmin=False, include_value=None):
1✔
764
    result = [
1✔
765
        {'value': key, 'text': val['title'], 'onlySysadmin': False}
766
        for key, val in UPDATE_FREQ_INFO.items()
767
    ]
768
    result.insert(0, {'value': '-999', 'text': '-- Please select --', 'onlySysadmin': False})
1✔
769

770
    filtered_result = result
1✔
771
    if not for_sysadmin:
1✔
772
        filtered_result = [r for r in result if not r.get('onlySysadmin') or include_value == r.get('value')]
×
773

774
    return filtered_result
1✔
775

776

777
def hdx_get_frequency_by_value(value):
1✔
778
    return UPDATE_FREQ_INFO.get(value, {}).get('title', '')
1✔
779

780

781
# def hdx_get_layer_info(id=None):
782
#     layer = explorer.explorer_data.get(id)
783
#     return layer
784

785

786
def hdx_get_carousel_list():
1✔
787
    return logic.get_action('hdx_carousel_settings_show')({'max_items': 3}, {})
1✔
788

789

790
def hdx_get_quick_links_list(archived=None):
1✔
791
    result = logic.get_action('hdx_quick_links_settings_show')({}, {})
1✔
792
    if archived in (True, False):
1✔
793
        result = [item for item in result if item.get('archived',False) == archived]
1✔
794
    return result
1✔
795

796

797
def _get_context():
1✔
798
    return {
1✔
799
        'model': model,
800
        'session': model.Session,
801
        'user': c.user or c.author,
802
        'auth_user_obj': c.userobj
803
    }
804

805

806
def _get_action(action, data_dict):
1✔
807
    return toolkit.get_action(action)(_get_context(), data_dict)
1✔
808

809

810
def hdx_is_current_user_a_maintainer(maintainers, pkg):
1✔
811
    if c.user:
1✔
812
        current_user = _get_action('user_show', {'id': c.user})
1✔
813
        user_id = current_user.get('id')
1✔
814
        user_name = current_user.get('name')
1✔
815

816
        if maintainers and (user_id in maintainers or user_name in maintainers):
1✔
817
            return True
1✔
818

819
    return False
1✔
820

821

822
def hdx_is_sysadmin(user_id):
1✔
823
    import ckan.authz as authz
×
824
    user_obj = model.User.get(user_id)
×
825
    if not user_obj:
×
826
        return False
×
827
        # raise logic.NotFound
828
    user = user_obj.name
×
829
    return authz.is_sysadmin(user)
×
830

831

832
def hdx_organization_list_for_user(user_id):
1✔
833
    orgs = []
×
834
    if user_id:
×
835
        # context = {
836
        #     'model': model,
837
        #     'session': model.Session,
838
        # }
839
        # user = model.User.get(user_id)
840
        # if user:
841
        #     context['auth_user_obj'] = user
842
        #     context['user'] = user.name
843
        # return tk.get_action('organization_list_for_user')(context, {'id': user_id})
844
        return tk.get_action('hdx_organization_list_for_user')(_get_context(), {'id': user_id})
×
845
    return orgs
×
846

847

848
def hdx_dataset_is_hxl(tag_list):
1✔
849
    for tag in tag_list:
1✔
850
        if tag.get('name') == 'hxl' and tag.get('display_name') == 'hxl':
1✔
851
            return True
×
852
    return False
1✔
853

854

855
def hdx_dataset_has_sadd(tag_list):
1✔
856
    for tag in tag_list:
1✔
857
        if tag.get('name') == 'sex and age disaggregated data - sadd' and tag.get(
×
858
            'display_name') == 'sex and age disaggregated data - sadd':
859
            return True
×
860
    return False
1✔
861

862

863
def hdx_switch_url_path():
1✔
864
    return switch_url_path()
×
865

866

867
def hdx_munge_title(title):
1✔
868
    return munge.munge_title_to_name(title)
1✔
869

870

871
def hdx_check_http_response(response_code, comparison_http_code):
1✔
872
    '''
873
    :param response_code: code generated by the python APP (can come from flask or pylons in different types)
874
    :type response_code: Union[int, str, list]
875
    :param comparison_http_code: what we're comparing against
876
    :type comparison_http_code: int
877

878
    :returns: whether the response_code matches the comparison_http_code
879
    :rtype: bool
880
    '''
881
    try:
1✔
882
        if response_code == comparison_http_code:
1✔
883
            return True
1✔
884
        elif response_code == str(comparison_http_code):
1✔
885
            return True
×
886
        elif (hasattr(response_code, "__len__")) and (response_code[0] == comparison_http_code):
1✔
887
            return True
1✔
888
    except TypeError as e:
×
889
        log.info('Following error was generated from hdx_check_http_response():' + text_type(e))
×
890
    return False
1✔
891

892

893
def hdx_get_request_param(param_name, default_value):
1✔
894
    '''
895
    This should get the request param value whether we're in pylons or flask
896
    '''
897
    try:
1✔
898
        value = request.args.get(param_name)
1✔
899
    except Exception as e:
×
900
        log.warning('Error when looking into "args" of request. This could be normal in a pylons request: '
×
901
                    + text_type(e))
902
        value = request.params.get(param_name)
×
903

904
    value = default_value if value is None else value
1✔
905
    return value
1✔
906

907

908
def hdx_url_for(*args, **kw):
1✔
909
    '''
910
    Removes the '/' at the end of an URL returned by the core CKAN url_for() helper.
911
    It appears when url_for() thinks it can return a flask route. But if it's a pylons
912
    controller that needs to render the page the '/' gets in the way.
913
    '''
914

915
    # If the html template calls this with both named route and controller + action, just use named route
916
    if len(args) > 0 and args[0]:
1✔
917
        kw.pop('controller', None)
1✔
918
        kw.pop('action', None)
1✔
919
    else:
920
        args = []
1✔
921

922
    url = tk.url_for(*args, **kw)
1✔
923
    if url and len(url) > 1:
1✔
924
        if url.endswith('/'):
1✔
925
            url = url[:-1]
1✔
926
        elif '/?' in url:
1✔
927
            url = url.replace('/?', '?')
1✔
928
    return url
1✔
929

930

931
url_for = hdx_url_for
1✔
932

933

934
def hdx_pending_request_data(user_id, package_id):
1✔
935
    return requestdata_model.ckanextRequestdata.get_pending_requests(package_id, user_id)
1✔
936

937

938
def hdx_dataset_is_p_coded(resource_list):
1✔
939
    for resource in resource_list:
1✔
940
        if resource.get('p_coded'):
1✔
941
            return True
×
942
    return False
1✔
943

944

945
def hdx_get_approved_tags_list():
1✔
946
    context = {'model': model, 'session': model.Session, 'user': c.user}
1✔
947

948
    approved_tags = logic.get_action('cached_approved_tags_list')(context, {})
1✔
949
    approved_tags_dict_list = [{'value': tag, 'text': tag} for tag in approved_tags]
1✔
950

951
    return approved_tags_dict_list
1✔
952

953

954
def bs5_build_nav_icon(menu_item, title, **kw):
1✔
955
    '''Build a navigation item used for example in ``user/read_base.html``.
956

957
    Outputs ``<li class="nav-item"><a href="..." class=""nav-link"><i class="icon.."></i> title</a></li>``.
958

959
    :param menu_item: the name of the defined menu item defined in
960
      config/routing as the named route of the same name
961
    :type menu_item: string
962
    :param title: text used for the link
963
    :type title: string
964
    :param kw: additional keywords needed for creating url eg ``id=...``
965

966
    :rtype: HTML literal
967

968
    '''
969
    return _bs5_make_menu_item(menu_item, title, **kw)
1✔
970

971

972
def _bs5_make_menu_item(menu_item, title, **kw):
1✔
973
    ''' build a navigation item used for example breadcrumbs
974

975
    outputs <li class="nav-item"><a href="..." class="nav-link"></i> title</a></li>
976

977
    :param menu_item: the name of the defined menu item defined in
978
    config/routing as the named route of the same name
979
    :type menu_item: string
980
    :param title: text used for the link
981
    :type title: string
982
    :param **kw: additional keywords needed for creating url eg id=...
983

984
    :rtype: HTML literal
985

986
    This function is called by wrapper functions.
987
    '''
988
    controller, action = menu_item.split('.')
1✔
989
    item = {
1✔
990
        'action': action,
991
        'controller': controller
992
    }
993
    item.update(kw)
1✔
994
    # Remove highlight controllers so that they won't appear in generated urls.
995
    item.pop('highlight_controllers', False)
1✔
996
    link = h._link_to(title, menu_item, suppress_active_class=False, **item)
1✔
997
    return h.literal('<li class="nav-item">') + link + h.literal('</li>')
1✔
998

999

1000
def hdx_decode_markup(value):
1✔
1001
    try:
×
1002
        unescaped_value = value.unescape()
×
1003
        decoded_value = json.loads(unescaped_value.replace("'", '"'))
×
1004

1005
        if isinstance(decoded_value, dict):
×
1006
            messages = []
×
1007
            for error_field, error_messages in decoded_value.items():
×
1008
                for error_message in error_messages:
×
1009
                    messages.append(error_message.strip().rstrip('.'))
×
1010
            return '. '.join(messages) + '.'
×
1011
        else:
1012
            return decoded_value
×
1013

1014
    except Exception as e:
×
1015
        return value
×
1016

1017
def hdx_generate_basemap_config_string() -> str:
1✔
1018
    conf_dict = {
×
1019
        'baseMapUrl': config.get('hdx.mapbox.baselayer.url'),
1020
        'token': config.get('hdx.mapbox.baselayer.token'),
1021
    }
1022
    return json.dumps(conf_dict)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc