• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OCHA-DAP / hdx-ckan / #5187

07 Apr 2024 07:31AM UTC coverage: 70.796% (-0.003%) from 70.799%
#5187

Pull #6242

coveralls-python

aalecs
HDX-9478

Merge autocomplete functionality in single endpoint
Member search no longer searching by email
Pull Request #6242: HDX-9478 - Inviting user to an org doesn't autocomplete when using email address

1 of 3 new or added lines in 1 file covered. (33.33%)

1 existing line in 1 file now uncovered.

11081 of 15652 relevant lines covered (70.8%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.4
/ckanext-hdx_users/ckanext/hdx_users/actions/get.py
1
import logging
1✔
2

3
import ckan.logic as logic
1✔
4
import ckan.logic.action.get as user_get
1✔
5
import ckan.plugins.toolkit as tk
1✔
6
import ckanext.hdx_users.model as user_model
1✔
7
from ckan.types import ActionResult, Context, DataDict
1✔
8

9
# Short aliases for the CKAN toolkit members used throughout this module.
config = tk.config
get_action = tk.get_action
_check_access = tk.check_access
NotFound = tk.ObjectNotFound

log = logging.getLogger(__name__)

# Caps applied by onboarding_followee_list to the non-active locations and
# the non-custom organizations it returns.
NoOfLocs = 5
NoOfOrgs = 5

# Configured value that onboarding_followee_list compares with a location's
# name to tag it as a 'crisis' entry.
c_nepal_earthquake = config.get('hdx.crisis.nepal_earthquake')
1✔
17

18

19
@tk.side_effect_free
def onboarding_followee_list(context, data_dict):
    '''Return the locations and organizations offered as followees.

    Active locations and custom organizations come first. They are followed
    by at most ``NoOfLocs`` other locations and at most ``NoOfOrgs`` other
    organizations. Within each group, entries are ordered by descending
    ``package_count``.

    :param context: action context, passed on to ``cached_group_list``
    :param data_dict: not read by this action

    :rtype: a list of dictionaries built by ``create_item``, all with
        ``'follow'`` set to ``False``

    '''
    # TODO check if user is following org&locs

    def _package_count(entry):
        return entry.get('package_count', 0)

    featured = []
    remaining = []

    locations = sorted(get_action('cached_group_list')(context, {}),
                       key=_package_count, reverse=True)
    other_locs = 0
    for loc in locations:
        if loc.get('activity_level') == 'active':
            # Only the configured crisis location gets the 'crisis' tag.
            kind = 'crisis' if loc['name'] == c_nepal_earthquake else 'location'
            featured.append(create_item(loc, kind, False))
        elif other_locs < NoOfLocs:
            remaining.append(create_item(loc, 'location', False))
            other_locs += 1

    # The organization list is requested with an empty context.
    organizations = sorted(get_action('cached_organization_list')({}, {}),
                           key=_package_count, reverse=True)
    other_orgs = 0
    for organization in organizations:
        if organization.get('custom_org', '0') == '1':
            featured.append(create_item(organization, 'organization', False))
        elif other_orgs < NoOfOrgs:
            remaining.append(create_item(organization, 'organization', False))
            other_orgs += 1

    return featured + remaining
×
57

58

59
def create_item(item, type, follow=False):
    '''Build one followee entry from a group or organization dictionary.

    :param item: dictionary providing ``'id'``, ``'name'`` and
        ``'display_name'``
    :param type: label stored under ``'type'``
    :param follow: value stored under ``'follow'`` (default: ``False``)

    :rtype: a dictionary with keys ``'id'``, ``'name'``, ``'display_name'``,
        ``'type'`` and ``'follow'``

    '''
    entry = {key: item[key] for key in ('id', 'name', 'display_name')}
    entry['type'] = type
    entry['follow'] = follow
    return entry
62

63

64
@logic.validate(logic.schema.default_autocomplete_schema)
def hdx_user_autocomplete(context, data_dict):
    '''Return a list of active users that match a string.

    :param q: the string to search for
    :type q: string
    :param limit: the maximum number of users to return (optional,
        default: 20)
    :type limit: int
    :param ignore_self: if true, leave the requesting user out of the
        results (optional, default: ``False``)
    :type ignore_self: bool
    :param org: name or id of an organization, read from the ``__extras``
        entry of the validated data_dict (optional). When given, the results
        are the matching active members of that organization plus the
        matching sysadmins.
    :type org: string

    :raises: whatever ``tk.check_access`` raises for ``user_autocomplete``

    :rtype: a list of user dictionaries each with keys ``'id'``,
        ``'name'`` and ``'fullname'``

    '''
    model = context['model']
    requesting_user = context['user']

    _check_access('user_autocomplete', context, data_dict)

    q = data_dict['q']
    limit = data_dict.get('limit', 20)
    ignore_self = data_dict.get('ignore_self', False)
    # '__extras' (and 'org' inside it) may be missing; treat both cases as
    # "no organization filter" instead of failing with KeyError or an
    # unbound local.
    org = (data_dict.get('__extras') or {}).get('org')

    query = model.User.search(q).order_by(None)
    query = query.filter(model.User.state == model.State.ACTIVE)
    if ignore_self:
        query = query.filter(model.User.name != requesting_user)

    if org:
        members = query.filter(model.User.id == model.Member.table_id) \
            .filter(model.Member.table_name == "user") \
            .filter(model.Member.group_id == model.Group.id) \
            .filter((model.Group.name == org) | (model.Group.id == org)) \
            .filter(model.Member.state == model.State.ACTIVE)

        # needed for maintainer to display the sysadmins too (#HDX-5554)
        sysadmins = query.filter((model.User.sysadmin == True))  # noqa: E712
        query = sysadmins.union(members)

    # Apply the limit on every path, so a search without an organization is
    # bounded as the docstring promises.
    query = query.limit(limit)

    return [
        {key: getattr(found, key) for key in ('id', 'name', 'fullname')}
        for found in query.all()
    ]
×
118

119
@tk.side_effect_free
def hdx_user_fullname_show(context, data_dict):
    '''Return the first and last name stored for a user.

    :param id: the id of the user, passed on as ``user_id`` when the names
        are looked up
    :type id: string

    :raises: ``NotFound`` when ``id`` is not in the data_dict; whatever
        ``tk.check_access`` raises for ``user_show``

    :rtype: the dictionary ``{'id': <id>}`` after ``_set_user_names`` has
        filled in the name fields

    '''
    if 'id' in data_dict:
        _check_access('user_show', context, data_dict)

        names = {'id': data_dict.get('id')}
        _set_user_names(context, names)
        return names

    raise NotFound("Id not provided")
×
130

131

132
def _set_user_names(context, user_dict):
1✔
133
    if user_dict and 'id' in user_dict:
1✔
134
        try:
1✔
135
            first_name = get_action('user_extra_value_by_key_show')(context,
1✔
136
                                                                    {'user_id': user_dict.get('id'),
137
                                                                     'key': user_model.HDX_FIRST_NAME})
138
            user_dict['firstname'] = first_name.get(user_model.HDX_FIRST_NAME)
1✔
139
        except Exception as ex:
1✔
140
            user_dict['firstname'] = None
1✔
141
        try:
1✔
142
            last_name = get_action('user_extra_value_by_key_show')(context,
1✔
143
                                                                   {'user_id': user_dict.get('id'),
144
                                                                    'key': user_model.HDX_LAST_NAME})
145
            user_dict['lastname'] = last_name.get(user_model.HDX_LAST_NAME)
1✔
146
        except Exception as ex:
1✔
147
            user_dict['lastname'] = None
1✔
148
    return user_dict
1✔
149

150

151
@tk.side_effect_free
def user_show(context, data_dict):
    '''Return the core ``user_show`` result extended with the user's names.

    Delegates to ``ckan.logic.action.get.user_show`` and, when that returns
    a non-empty result, lets ``_set_user_names`` add the name fields to it.

    :param context: action context passed on to both calls
    :param data_dict: passed unchanged to the core ``user_show`` action

    :rtype: the dictionary returned by the core action

    '''
    shown = user_get.user_show(context, data_dict)
    if not shown:
        return shown
    _set_user_names(context, shown)
    return shown
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc