• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OCHA-DAP / hdx-ckan / #6168

12 Mar 2025 07:32AM UTC coverage: 74.93% (-0.06%) from 74.987%
#6168

push

coveralls-python

danmihaila
HDX-10466 fix fresh flag test

12598 of 16813 relevant lines covered (74.93%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.76
/ckanext-hdx_package/ckanext/hdx_package/helpers/helpers.py
1
import datetime
1✔
2
import json
1✔
3
import logging
1✔
4
import requests
1✔
5
import six.moves.urllib.parse as urlparse
1✔
6
import dateutil.parser
1✔
7

8
import ckanext.hdx_package.helpers.custom_validator as vd
1✔
9
import ckanext.hdx_package.helpers.analytics as analytics
1✔
10

11
from ckanext.hdx_package.exceptions import NoOrganization
1✔
12
from ckanext.hdx_package.helpers.caching import cached_group_iso_to_title
1✔
13
from ckanext.hdx_package.helpers.constants import UPDATE_FREQ_LIVE
1✔
14
from ckanext.hdx_package.helpers.freshness_calculator import FreshnessCalculator
1✔
15

16

17
import ckan.authz as new_authz
1✔
18
import ckan.lib.base as base
1✔
19
import ckan.lib.helpers as h
1✔
20
import ckan.model as model
1✔
21
import ckan.model.misc as misc
1✔
22
import ckan.model.package as package
1✔
23
import ckan.plugins.toolkit as tk
1✔
24
from ckan.common import _, c, request
1✔
25
from ckan.types import Context, DataDict
1✔
26

27

28
log = logging.getLogger(__name__)

# Short aliases for frequently used CKAN toolkit members, matching the
# convention used across the HDX extensions.
g = tk.g
config = tk.config
get_action = tk.get_action
_check_access = tk.check_access
_get_or_bust = tk.get_or_bust
# Exception aliases so the rest of the module can catch/raise CKAN's
# standard errors without spelling out the toolkit path each time.
NotFound = tk.ObjectNotFound
ValidationError = tk.ValidationError
NotAuthorized = tk.NotAuthorized

39
def build_additions(groups):
    """
    Build the JSON 'additions' payload for solr searches.

    Maps each group's ISO code (its 'name') to the full country title via
    the cached ISO->title lookup; groups whose code is missing from the
    lookup are logged and skipped.

    :param groups: list of group dicts, each expected to carry a 'name' key
    :type groups: list
    :return: JSON string of the form {"countries": [...]}
    :rtype: str
    """
    countries = []
    # FIX: the loop variable used to be named `g`, shadowing the
    # module-level alias `g = tk.g`; renamed to `grp` to remove the shadowing.
    for grp in groups:
        try:
            if 'name' in grp:
                countries.append(cached_group_iso_to_title()[grp.get('name')])
        except Exception as e:
            # `.message` is a Python 2 relic kept for custom exceptions that
            # still define it; everything else falls back to str(e).
            ex_msg = e.message if hasattr(e, 'message') else str(e)
            log.error(ex_msg)
    return json.dumps({'countries': countries})
54

55

56
def hdx_user_org_num(user_id):
    """
    Return the organizations in which the given user can create datasets.

    :param user_id: id of the user to look up
    :type user_id: str
    :return: list of organization dicts
    """
    context = {
        'model': model,
        'session': model.Session,
        'user': c.user or c.author,
    }
    data_dict = {'id': user_id, 'permission': 'create_dataset'}
    try:
        org_list_for_user = get_action('organization_list_for_user')
        result = org_list_for_user(context, data_dict)
    except NotAuthorized:
        base.abort(403, _('Unauthorized to see organization member list'))

    return result
69

70

71
# def hdx_organizations_available_with_roles():
72
#     """
73
#     Gets roles of organizations the current user belongs to
74
#     """
75
#     organizations_available = h.organizations_available('read')
76
#     if organizations_available and len(organizations_available) > 0:
77
#         orgs_where_editor = []
78
#         orgs_where_admin = []
79
#     am_sysadmin = new_authz.is_sysadmin(c.user)
80
#     if not am_sysadmin:
81
#         orgs_where_editor = set(
82
#             [org['id'] for org in h.organizations_available('create_dataset')])
83
#         orgs_where_admin = set([org['id']
84
#                                 for org in h.organizations_available('admin')])
85
#
86
#     for org in organizations_available:
87
#         org['has_add_dataset_rights'] = True
88
#         if am_sysadmin:
89
#             org['role'] = 'sysadmin'
90
#         elif org['id'] in orgs_where_admin:
91
#             org['role'] = 'admin'
92
#         elif org['id'] in orgs_where_editor:
93
#             org['role'] = 'editor'
94
#         else:
95
#             org['role'] = 'member'
96
#             org['has_add_dataset_rights'] = False
97
#
98
#     organizations_available.sort(key=lambda y:
99
#                                  y['display_name'].lower())
100
#     return organizations_available
101

102

103
# def hdx_get_activity_list(context, data_dict):
104
#     """
105
#     Get activity list for a given package
106
#
107
#     """
108
#     try:
109
#         activity_stream = get_action('package_activity_list')(context, data_dict)
110
#     except Exception as ex:
111
#         log.exception(ex)
112
#         activity_stream = []
113
#     #activity_stream = package_activity_list(context, data_dict)
114
#     offset = int(data_dict.get('offset', 0))
115
#     extra_vars = {
116
#         'controller': 'package',
117
#         'action': 'activity',
118
#         'id': data_dict['id'],
119
#         'offset': offset,
120
#     }
121
#     return _activity_list(context, activity_stream, extra_vars)
122

123

124
def hdx_find_license_name(license_id, license_name):
    """
    Look up the license title for a license id.

    Only resolves when the provided name is missing, empty, or merely echoes
    the id; otherwise the existing name is trusted and returned unchanged.

    :param license_id: id of the license (e.g. 'cc-by')
    :param license_name: current license name, possibly empty or equal to the id
    :return: the resolved license title, or the original ``license_name``
    """
    # FIX: compare to None with `is`, not `==` (PEP 8; `==` can be hijacked
    # by custom __eq__ implementations).
    if license_name is None or len(license_name) == 0 or license_name == license_id:
        license_dict = {l.id: l.title
                        for l in package.Package._license_register.licenses}
        if license_id in license_dict:
            return license_dict[license_id]
    return license_name
134

135

136
# code copied from activity_streams.activity_list_to_html and modified to
137
# return only the activity list
138
# def _activity_list(context, activity_stream, extra_vars):
139
#     '''Return the given activity stream
140
#
141
#     :param activity_stream: the activity stream to render
142
#     :type activity_stream: list of activity dictionaries
143
#     :param extra_vars: extra variables to pass to the activity stream items
144
#         template when rendering it
145
#     :type extra_vars: dictionary
146
#
147
#
148
#     '''
149
#     activity_list = []  # These are the activity stream messages.
150
#     for activity in activity_stream:
151
#         detail = None
152
#         activity_type = activity['activity_type']
153
#         # Some activity types may have details.
154
#         if activity_type in activity_streams.activity_stream_actions_with_detail:
155
#             details = get_action('activity_detail_list')(context=context,
156
#                                                                data_dict={'id': activity['id']})
157
#             # If an activity has just one activity detail then render the
158
#             # detail instead of the activity.
159
#             if len(details) == 1:
160
#                 detail = details[0]
161
#                 object_type = detail['object_type']
162
#
163
#                 if object_type == 'PackageExtra':
164
#                     object_type = 'package_extra'
165
#
166
#                 new_activity_type = '%s %s' % (detail['activity_type'],
167
#                                                object_type.lower())
168
#                 if new_activity_type in activity_streams.activity_stream_string_functions:
169
#                     activity_type = new_activity_type
170
#
171
#         if not activity_type in activity_streams.activity_stream_string_functions:
172
#             raise NotImplementedError("No activity renderer for activity "
173
#                                       "type '%s'" % activity_type)
174
#
175
#         if activity_type in activity_streams.activity_stream_string_icons:
176
#             activity_icon = activity_streams.activity_stream_string_icons[
177
#                 activity_type]
178
#         else:
179
#             activity_icon = activity_streams.activity_stream_string_icons[
180
#                 'undefined']
181
#
182
#         activity_msg = activity_streams.activity_stream_string_functions[activity_type](context,
183
#                                                                                         activity)
184
#
185
#         # Get the data needed to render the message.
186
#         matches = re.findall('\{([^}]*)\}', activity_msg)
187
#         data = {}
188
#         for match in matches:
189
#             snippet = activity_streams.activity_snippet_functions[
190
#                 match](activity, detail)
191
#             data[str(match)] = snippet
192
#
193
#         activity_list.append({'msg': activity_msg,
194
#                               'type': activity_type.replace(' ', '-').lower(),
195
#                               'icon': activity_icon,
196
#                               'data': data,
197
#                               'timestamp': activity['timestamp'],
198
#                               'is_new': activity.get('is_new', False)})
199
#     extra_vars['activities'] = activity_list
200
#     return extra_vars
201

202

203
def hdx_tag_autocomplete_list(context: Context, data_dict: DataDict):
    """Return approved tag names that contain a given string.

    Unlike core CKAN's tag autocomplete (which this replaces), the search is
    performed against the cached list of approved tags. Tags prefixed with
    'crisis-' are reserved for sysadmins and are filtered out for everyone
    else.

    :param query: the string to search for (passed in ``data_dict`` as 'q')
    :type query: string

    :rtype: list of strings
    """
    _check_access('tag_autocomplete', context, data_dict)

    approved_tags = get_action('cached_approved_tags_list')(context, {})
    query = data_dict.get('q', '').lower()
    is_sysadmin = new_authz.is_sysadmin(c.user)

    # Case-insensitive substring match; non-sysadmins never see 'crisis-*'.
    return [
        tag for tag in approved_tags
        if query in tag.lower()
        and (is_sysadmin or not tag.startswith('crisis-'))
    ]
245

246

247
def hdx_retrieve_approved_tags(context, data_dict):
    """
    Get approved tag names from a Google Spreadsheet and return a list.

    The spreadsheet is fetched through the HXL proxy's data-preview endpoint.
    The first row (header) is skipped; tag names are lowercased and sorted.

    :return: sorted list of lowercase tag names, or [] on any failure
    :rtype: list
    """
    proxy_data_preview_url = config.get('hdx.hxlproxy.url') + '/api/data-preview.json'
    params = {
        'url': 'https://docs.google.com/spreadsheets/d/1fTO8T8ZVXU9eoh3EIrw490Z2pX7E59MhHmCvT_cXmNs/edit#gid=1261258630'
    }

    try:
        # FIX: a timeout is essential here — requests.get without one can
        # block the worker indefinitely if the proxy hangs. A timeout error
        # is caught below and reported like any other failure.
        response = requests.get(proxy_data_preview_url, params=params, timeout=30)
        if response.status_code == 200:
            items = json.loads(response.content)[1:]  # skip the header row
            return sorted(item[0].lower() for item in items)
        log.error('Failed to fetch approved tags. Status code: %s', response.status_code)
        return []
    except Exception as e:
        log.error('Failed to fetch approved tags. Exception: %s', e)
        return []
268

269

270
def _tag_search(context, data_dict):
    """
    Search tags for autocomplete, making sure only active tags are returned.

    :param context: must contain 'model'
    :param data_dict: supports 'query'/'q' (str or list of str),
        'vocabulary_id', deprecated 'fields', 'offset' and 'limit'
    :return: tuple of (list of Tag model objects, total match count)
    :rtype: tuple
    """
    model = context['model']

    # Accept the search terms under either 'query' or 'q'; normalize a
    # single string into a one-element list and drop blank terms.
    terms = data_dict.get('query') or data_dict.get('q') or []
    if isinstance(terms, str):
        terms = [terms]
    terms = [t.strip() for t in terms if t.strip()]

    if 'fields' in data_dict:
        log.warning('"fields" parameter is deprecated.  '
                    'Use the "query" parameter instead')

    fields = data_dict.get('fields', {})
    offset = data_dict.get('offset')
    limit = data_dict.get('limit')

    # TODO: should we check for user authentication first?
    q = model.Session.query(model.Tag)

    if 'vocabulary_id' in data_dict:
        # Filter by vocabulary.
        vocab = model.Vocabulary.get(_get_or_bust(data_dict, 'vocabulary_id'))
        if not vocab:
            raise NotFound
        q = q.filter(model.Tag.vocabulary_id == vocab.id)

# CHANGES to initial version
#     else:
# If no vocabulary_name in data dict then show free tags only.
#         q = q.filter(model.Tag.vocabulary_id == None)
# If we're searching free tags, limit results to tags that are
# currently applied to a package.
#         q = q.distinct().join(model.Tag.package_tags)

    # Legacy 'fields' support: treat 'tag'/'tags' values as extra terms.
    for field, value in fields.items():
        if field in ('tag', 'tags'):
            terms.append(value)

    if not len(terms):
        return [], 0

    # Each term narrows the query with a case-insensitive substring match;
    # LIKE wildcards inside the term are escaped first.
    for term in terms:
        escaped_term = misc.escape_sql_like_special_characters(
            term, escape='\\')
        q = q.filter(model.Tag.name.ilike('%' + escaped_term + '%'))

    # q = q.join('package_tags').filter(model.PackageTag.state == 'active')
    # Count before pagination so the caller gets the total match count.
    count = q.count()
    q = q.offset(offset)
    q = q.limit(limit)
    tags = q.all()
    return tags, count
325

326

327
def pkg_topics_list(data_dict):
    """
    Return the 'Topics' vocabulary tags attached to a package.

    :param data_dict: must contain the package 'id'
    :return: list of topic tags (empty when the vocabulary does not exist)
    """
    pkg = model.Package.get(data_dict['id'])
    vocabulary = model.Vocabulary.get('Topics')
    if not vocabulary:
        return []
    return pkg.get_tags(vocab=vocabulary)
337

338

339
def get_tag_vocabulary(tags):
    """
    Resolve tag ids (and vocabulary ids) for a list of tag dicts, in place.

    Each tag is first looked up in the 'Topics' vocabulary, then among free
    tags; when a match is found its 'id' is filled in. Names are lowercased.

    :param tags: list of tag dicts, each with a 'name' key
    :return: the same list, mutated
    """
    for item in tags:
        tag_name = item['name'].lower()
        vocabulary = model.Vocabulary.get('Topics')
        topic = None
        if vocabulary:
            item['vocabulary_id'] = vocabulary.id
            topic = model.Tag.by_name(name=tag_name, vocab=vocabulary)
        # Fall back to a free-tag lookup when not found in the vocabulary.
        topic = topic or model.Tag.by_name(name=tag_name)
        if topic:
            item['id'] = topic.as_dict().get('id')
        item['name'] = tag_name
    return tags
358

359

360
def filesize_format(size_in_bytes):
    """
    Format a byte count as a human-readable string (e.g. '2.0K').

    :param size_in_bytes: a value convertible to int
    :return: the formatted string, or the original value unchanged when it
        cannot be parsed as a number
    """
    try:
        step = 1024.0
        size = int(size_in_bytes)

        for unit in ['B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
            if size < step:
                return '%3.1f%s' % (size, unit)
            size /= step
        # Anything beyond zettabytes falls through to the 'Yi' suffix.
        return '%.1f%s' % (size, 'Yi')
    except Exception as e:
        # FIX: log.warn is deprecated in favor of log.warning; also corrected
        # the typos 'numner'/'occured' in the message.
        log.warning('Error occurred when formatting the number {}. Error {}'.format(size_in_bytes, str(e)))
        return size_in_bytes
376

377

378
def hdx_get_proxified_resource_url(data_dict, proxy_schemes=('http', 'https')):
    """
    This function replaces the one with the similar name from ckanext.resourceproxy.plugin .
    Changes:
    1) Don't look at the protocol when checking if it is the same domain
    2) Return a domain relative url (without schema, domain or port) for local resources.

    FIX: the default for ``proxy_schemes`` was a mutable list; changed to a
    tuple (membership tests behave identically, and callers may still pass a
    list of their own).

    :param data_dict: contains a resource and package dict
    :type data_dict: dict
    :param proxy_schemes: url schemes to proxy for
    :type proxy_schemes: tuple or list
    """

    same_domain = is_ckan_domain(data_dict['resource']['url'])
    parsed_url = urlparse.urlparse(data_dict['resource']['url'])
    scheme = parsed_url.scheme

    if not same_domain and scheme in proxy_schemes:
        url = h.url_for(
            'resource_proxy.proxy_view',
            id=data_dict['package']['name'],
            resource_id=data_dict['resource']['id'])
        log.info('Proxified url is {0}'.format(url))
    else:
        # Local (or non-proxyable) resources get a domain-relative url.
        url = urlparse.urlunparse(('', '') + parsed_url[2:])
    return url
404

405

406
def is_ckan_domain(url):
    """
    Check whether a url is on the same domain as this CKAN instance.

    A url without a hostname (i.e. a relative url) counts as same-domain.

    :param url: url to check whether it's on the same domain as ckan
    :type url: str
    :return: True if it's the same domain. False otherwise
    :rtype: bool
    """
    ckan_url = config.get('ckan.site_url', '//localhost:5000')
    parsed_url = urlparse.urlparse(url)
    ckan_parsed_url = urlparse.urlparse(ckan_url)
    # FIX: dropped the redundant `True if <bool-expr> else False` wrapper;
    # the expression below already yields a bool.
    return not parsed_url.hostname or parsed_url.hostname == ckan_parsed_url.hostname
418

419
def make_url_relative(url):
    """
    Strip the scheme and network location from a url.

    Transforms something like http://testdomain.com/test to /test

    :param url: the url to make relative
    :type url: str
    :return: the new url as a string
    :rtype: str
    """
    parts = urlparse.urlparse(url)
    # Keep path/params/query/fragment; blank out scheme and netloc.
    relative_parts = ('', '') + parts[2:]
    return urlparse.urlunparse(relative_parts)
429

430
def generate_mandatory_fields():
    """
    Build a dataset dict pre-populated with all mandatory fields.

    The first organization in which the current user can create datasets is
    used as the owner org and dataset source.

    :return: dataset dict with mandatory fields filled
    :rtype: dict
    :raises NoOrganization: when the user belongs to no organization
    """

    user = c.user or c.author

    orgs = h.organizations_available('create_dataset')
    if not orgs:
        raise NoOrganization(_('The user needs to belong to at least 1 organisation'))
    selected_org = orgs[0]

    return {
        'private': True,
        'license_id': 'cc-by',
        'owner_org': selected_org.get('id'),
        'dataset_source': selected_org.get('title'),
        'maintainer': user,
        'subnational': 1,
        'data_update_frequency': config.get('hdx.default_frequency'),
        'dataset_preview_check': '1',
        'dataset_preview': vd._DATASET_PREVIEW_FIRST_RESOURCE,
        'dataset_preview_value': vd._DATASET_PREVIEW_FIRST_RESOURCE
    }
465

466

467
def hdx_check_add_data():
    """
    Compute the attributes for the header 'Add Data' link.

    When the current user may create packages the link opens the contribute
    modal; logged-in users without that permission are sent to the org-join
    page, and anonymous users to the login page.

    :return: dict with 'href', 'onclick' and data-module attributes
    """
    link_attrs = {
        'href': '#',
        'onclick': 'contributeAddDetails(null, \'header\')',
        'data_module': 'hdx_click_stopper',
        'data_module_link_type': 'header add data',
    }

    context = {'model': model, 'session': model.Session,
                   'user': g.user, 'auth_user_obj': g.userobj,
                   'save': 'save' in request.params}
    try:
        _check_access('package_create', context, None)
    except NotAuthorized:
        if g.userobj or g.user:
            link_attrs['href'] = h.url_for('hdx_org_join.find_organisation')  # '/dashboard/organizations'
        else:
            link_attrs['href'] = h.url_for('hdx_signin.login', info_message_type='add-data')
        link_attrs['onclick'] = ''

    return link_attrs
490

491

492
# def hdx_get_last_modification_date(dataset_dict):
493
#     return FreshnessCalculator.dataset_last_change_date(dataset_dict)
494

495

496
def hdx_get_due_overdue_date(dataset_dict, format='%b %-d %Y'):
    """
    Format the due date of a dataset as computed by the freshness logic.

    :param dataset_dict: the dataset to inspect
    :param format: strftime format string
        (NOTE(review): '%-d' is a glibc extension — not portable to Windows)
    :return: the formatted date, or None when no due date exists
    """
    due_date = FreshnessCalculator(dataset_dict).read_due_overdue_dates()

    if not due_date:
        return None
    return due_date.strftime(format)
507

508

509
def hdx_render_resource_updated_date(resource_dict, package_dict):
    """
    Render a resource's last-modified date for display.

    Datasets with the 'live' update frequency always display 'Live'.

    :return: 'Live' or the rendered datetime string
    """
    if package_dict.get('data_update_frequency') != UPDATE_FREQ_LIVE:
        return h.render_datetime(resource_dict.get('last_modified'))
    return 'Live'
514

515

516
def hdx_compute_analytics(package_dict):
    """
    Gather the analytics-related flags and location info for a dataset.

    :param package_dict: the dataset dict
    :return: dict of analytics fields ready to merge into a template context
    """
    group_names, group_ids = analytics.extract_locations_in_json(package_dict)
    return {
        'analytics_is_cod': analytics.is_cod(package_dict),
        'analytics_is_indicator': analytics.is_indicator(package_dict),
        'analytics_is_archived': analytics.is_archived(package_dict),
        'analytics_dataset_availability': analytics.dataset_availability(package_dict),
        'analytics_group_names': group_names,
        'analytics_group_ids': group_ids,
    }
526

527

528
def fetch_previous_resource_dict_with_context(context, package_id, resource_id):
    """
    Return the pre-update resource dict for ``resource_id``, or None.

    Relies on the package dict cached on the context by
    fetch_previous_package_dict_with_context().
    """
    dataset_dict = fetch_previous_package_dict_with_context(context, package_id)
    for resource in dataset_dict.get('resources', []):
        if resource['id'] == resource_id:
            return resource
    return None
531

532

533
def _create_prev_package_context_key(id):
1✔
534
    context_key = 'hdx_prev_package_dict_' + id
1✔
535
    return context_key
1✔
536

537

538
def fetch_previous_package_dict_with_context(context, id):
    """
    Fetch (and cache on the context) the package dict for ``id``.

    Repeated calls with the same context and id reuse the cached dict so
    'package_show' is only invoked once.

    :return: the package dict, or {} when id is falsy / nothing was found
    """
    if not id:
        return {}

    context_key = _create_prev_package_context_key(id)
    pkg_dict = context.get(context_key)
    if not pkg_dict:
        pkg_dict = get_action('package_show')(context, {'id': id})
        context[context_key] = pkg_dict

    return pkg_dict or {}
549

550

551
def remove_previous_package_dict_from_context(context, id):
    """Drop the cached previous package dict for ``id`` from the context, if any."""
    if not id:
        return
    context.pop(_create_prev_package_context_key(id), None)
555

556
def get_utc_end_of_today():
    """
    Return 23:59:59 of the current UTC day.

    NOTE(review): the result is a naive datetime (no tzinfo), consistent
    with how the rest of this module handles dates.
    """
    today = datetime.datetime.utcnow().date()
    return datetime.datetime.combine(today, datetime.time(23, 59, 59))
560

561
def end_of_dataset_date(dataset_date):
    """
    Extract the end date from a dataset_date value.

    ``dataset_date`` looks like '[2023-01-01 TO 2023-06-30]' or
    '[2023-01-01 TO *]', where '*' means the dataset is ongoing.

    :param dataset_date: dataset_date metadata.
    :type dataset_date: str
    :return: tuple of (end date as a datetime object, or None when
        ``dataset_date`` is empty; whether the end date was the open-ended
        '*' marker). NOTE(review): the old docstring claimed a single
        timezone-aware datetime — the datetime from get_utc_end_of_today()
        is actually naive, and dateutil only attaches a timezone when the
        input string carries one.
    :rtype: tuple
    """
    is_dataset_date_star = False
    if dataset_date:
        # Take the part after ' TO ' (or the whole value when there is no
        # range) and strip the surrounding brackets.
        dataset_end_date = dataset_date.split(' TO ')[-1].strip('[]')
        if dataset_end_date == '*':
            # '*' means ongoing: substitute the end of today (UTC).
            dataset_end_date = get_utc_end_of_today()
            is_dataset_date_star = True
        else:
            dataset_end_date = dateutil.parser.parse(dataset_end_date)
    else:
        dataset_end_date = None
    return dataset_end_date, is_dataset_date_star
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc