• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Products.PluggableAuthService / 5303493172

pending completion
5303493172

push

github

web-flow
Drop support for Python 2.7, 3.5, 3.6. (#116)

* Drop zserver extra in setup.py. Thus dropping FTP support.
* Drop support for Zope < 5.
Co-authored-by: Jens Vagelpohl <jens@plyp.com>

1288 of 1745 branches covered (73.81%)

Branch coverage included in aggregate %.

127 of 127 new or added lines in 30 files covered. (100.0%)

9619 of 10349 relevant lines covered (92.95%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.06
/src/Products/PluggableAuthService/plugins/ZODBRoleManager.py
1
##############################################################################
2
#
3
# Copyright (c) 2001 Zope Foundation and Contributors
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
7
# distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
""" Classes: ZODBRoleManager
1✔
15
"""
16

17
import logging
1✔
18

19
from AccessControl import ClassSecurityInfo
1✔
20
from AccessControl.class_init import InitializeClass
1✔
21
from AccessControl.requestmethod import postonly
1✔
22
from Acquisition import aq_inner
1✔
23
from Acquisition import aq_parent
1✔
24
from BTrees.OOBTree import OOBTree
1✔
25
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
1✔
26
from zope.interface import Interface
1✔
27

28
from ..interfaces.plugins import IRoleAssignerPlugin
1✔
29
from ..interfaces.plugins import IRoleEnumerationPlugin
1✔
30
from ..interfaces.plugins import IRolesPlugin
1✔
31
from ..permissions import ManageUsers
1✔
32
from ..plugins.BasePlugin import BasePlugin
1✔
33
from ..utils import classImplements
1✔
34
from ..utils import csrf_only
1✔
35

36

37
LOG = logging.getLogger('PluggableAuthService')
1✔
38

39

40
class MultiplePrincipalError(Exception):
1✔
41
    pass
1✔
42

43

44
class IZODBRoleManager(Interface):
1✔
45
    """ Marker interface.
46
    """
47

48

49
manage_addZODBRoleManagerForm = PageTemplateFile(
1✔
50
    'www/zrAdd', globals(), __name__='manage_addZODBRoleManagerForm')
51

52

53
def addZODBRoleManager(dispatcher, id, title=None, REQUEST=None):
1✔
54
    """ Add a ZODBRoleManager to a Pluggable Auth Service. """
55

56
    zum = ZODBRoleManager(id, title)
1✔
57
    dispatcher._setObject(zum.getId(), zum)
1✔
58

59
    if REQUEST is not None:
1!
60
        REQUEST['RESPONSE'].redirect('%s/manage_workspace'
×
61
                                     '?manage_tabs_message='
62
                                     'ZODBRoleManager+added.' %
63
                                     dispatcher.absolute_url())
64

65

66
class ZODBRoleManager(BasePlugin):
1✔
67

68
    """ PAS plugin for managing roles in the ZODB.
69
    """
70
    meta_type = 'ZODB Role Manager'
1✔
71
    zmi_icon = 'fas fa-user-tag'
1✔
72

73
    security = ClassSecurityInfo()
1✔
74

75
    def __init__(self, id, title=None):
1✔
76

77
        self._id = self.id = id
1✔
78
        self.title = title
1✔
79

80
        self._roles = OOBTree()
1✔
81
        self._principal_roles = OOBTree()
1✔
82

83
    def manage_afterAdd(self, item, container):
1✔
84

85
        if item is self:
1!
86
            role_holder = aq_parent(aq_inner(container))
1✔
87
            for role in getattr(role_holder, '__ac_roles__', ()):
1✔
88
                try:
1✔
89
                    if role not in ('Anonymous', 'Authenticated'):
1✔
90
                        self.addRole(role)
1✔
91
                except KeyError:
×
92
                    pass
×
93

94
        if 'Manager' not in self._roles:
1✔
95
            self.addRole('Manager')
1✔
96

97
    #
98
    #   IRolesPlugin implementation
99
    #
100
    @security.private
1✔
101
    def getRolesForPrincipal(self, principal, request=None):
1✔
102
        """ See IRolesPlugin.
103
        """
104
        result = list(self._principal_roles.get(principal.getId(), ()))
1✔
105

106
        getGroups = getattr(principal, 'getGroups', lambda: ())
1✔
107
        for group_id in getGroups():
1✔
108
            result.extend(self._principal_roles.get(group_id, ()))
1✔
109

110
        return tuple(result)
1✔
111

112
    #
113
    #   IRoleEnumerationPlugin implementation
114
    #
115
    @security.private
1✔
116
    def enumerateRoles(self, id=None, exact_match=False, sort_by=None,
1✔
117
                       max_results=None, **kw):
118
        """ See IRoleEnumerationPlugin.
119
        """
120
        role_info = []
1✔
121
        role_ids = []
1✔
122
        plugin_id = self.getId()
1✔
123

124
        if isinstance(id, str):
1✔
125
            id = [id]
1✔
126

127
        if exact_match and (id):
1✔
128
            role_ids.extend(id)
1✔
129

130
        if role_ids:
1✔
131
            role_filter = None
1✔
132

133
        else:   # Searching
134
            role_ids = self.listRoleIds()
1✔
135
            role_filter = _ZODBRoleFilter(id, **kw)
1✔
136

137
        for role_id in role_ids:
1✔
138

139
            if self._roles.get(role_id):
1✔
140
                e_url = '%s/manage_roles' % self.getId()
1✔
141
                p_qs = 'role_id=%s' % role_id
1✔
142
                m_qs = 'role_id=%s&assign=1' % role_id
1✔
143

144
                info = {}
1✔
145
                info.update(self._roles[role_id])
1✔
146

147
                info['pluginid'] = plugin_id
1✔
148
                info['properties_url'] = f'{e_url}?{p_qs}'
1✔
149
                info['members_url'] = f'{e_url}?{m_qs}'
1✔
150

151
                if not role_filter or role_filter(info):
1✔
152
                    role_info.append(info)
1✔
153

154
        return tuple(role_info)
1✔
155

156
    #
157
    #   IRoleAssignerPlugin implementation
158
    #
159
    @security.private
1✔
160
    def doAssignRoleToPrincipal(self, principal_id, role):
1✔
161
        return self.assignRoleToPrincipal(role, principal_id)
1✔
162

163
    @security.private
1✔
164
    def doRemoveRoleFromPrincipal(self, principal_id, role):
1✔
165
        return self.removeRoleFromPrincipal(role, principal_id)
×
166

167
    #
168
    #   Role management API
169
    #
170
    @security.protected(ManageUsers)
1✔
171
    def listRoleIds(self):
1✔
172
        """ Return a list of the role IDs managed by this object.
173
        """
174
        return self._roles.keys()
1✔
175

176
    @security.protected(ManageUsers)
1✔
177
    def listRoleInfo(self):
1✔
178
        """ Return a list of the role mappings.
179
        """
180
        return self._roles.values()
1✔
181

182
    @security.protected(ManageUsers)
1✔
183
    def getRoleInfo(self, role_id):
1✔
184
        """ Return a role mapping.
185
        """
186
        return self._roles[role_id]
1✔
187

188
    @security.private
1✔
189
    def addRole(self, role_id, title='', description=''):
1✔
190
        """ Add 'role_id' to the list of roles managed by this object.
191

192
        o Raise KeyError on duplicate.
193
        """
194
        if self._roles.get(role_id) is not None:
1✔
195
            raise KeyError('Duplicate role: %s' % role_id)
1✔
196

197
        self._roles[role_id] = {'id': role_id, 'title': title,
1✔
198
                                'description': description}
199

200
    @security.private
1✔
201
    def updateRole(self, role_id, title, description):
1✔
202
        """ Update title and description for the role.
203

204
        o Raise KeyError if not found.
205
        """
206
        self._roles[role_id].update({'title': title,
1✔
207
                                     'description': description})
208

209
    @security.private
1✔
210
    def removeRole(self, role_id, REQUEST=None):
1✔
211
        """ Remove 'role_id' from the list of roles managed by this object.
212

213
        o Raise KeyError if not found.
214

215
        Note that if you really want to remove a role you should first
216
        remove it from the roles in the root of the site (at the
217
        bottom of the Security tab at manage_access).
218
        """
219
        for principal_id in self._principal_roles.keys():
1✔
220
            self.removeRoleFromPrincipal(role_id, principal_id)
1✔
221

222
        del self._roles[role_id]
1✔
223

224
    #
225
    #   Role assignment API
226
    #
227
    @security.protected(ManageUsers)
1✔
228
    def listAvailablePrincipals(self, role_id, search_id):
1✔
229
        """ Return a list of principal IDs to whom a role can be assigned.
230

231
        o If supplied, 'search_id' constrains the principal IDs;  if not,
232
          return empty list.
233

234
        o Omit principals with existing assignments.
235
        """
236
        result = []
×
237

238
        if search_id:  # don't bother searching if no criteria
×
239

240
            parent = aq_parent(self)
×
241

242
            for info in parent.searchPrincipals(max_results=20,
×
243
                                                sort_by='id',
244
                                                id=search_id,
245
                                                exact_match=False):
246
                id = info['id']
×
247
                title = info.get('title', id)
×
248
                if role_id not in self._principal_roles.get(id, ()) and \
×
249
                        role_id != id:
250
                    result.append((id, title))
×
251

252
        return result
×
253

254
    @security.protected(ManageUsers)
1✔
255
    def listAssignedPrincipals(self, role_id):
1✔
256
        """ Return a list of principal IDs to whom a role is assigned.
257
        """
258
        result = []
1✔
259

260
        for k, v in self._principal_roles.items():
1✔
261
            if role_id in v:
1✔
262
                # should be at most one and only one mapping to 'k'
263

264
                parent = aq_parent(self)
1✔
265
                info = parent.searchPrincipals(id=k, exact_match=True)
1✔
266

267
                if len(info) > 1:
1✔
268
                    message = ('Multiple groups or users exist with the '
1✔
269
                               'name "%s". Remove one of the duplicate groups '
270
                               'or users.' % (k))
271
                    LOG.error(message)
1✔
272
                    raise MultiplePrincipalError(message)
1✔
273

274
                if len(info) == 0:
1✔
275
                    title = '<%s: not found>' % k
1✔
276
                else:
277
                    title = info[0].get('title', k)
1✔
278
                result.append((k, title))
1✔
279

280
        return result
1✔
281

282
    @security.private
1✔
283
    def assignRoleToPrincipal(self, role_id, principal_id):
1✔
284
        """ Assign a role to a principal (user or group).
285

286
        o Return a boolean indicating whether a new assignment was created.
287

288
        o Raise KeyError if 'role_id' is unknown.
289
        """
290
        # raise KeyError if unknown!
291
        role_info = self._roles[role_id]  # noqa
1✔
292

293
        current = self._principal_roles.get(principal_id, ())
1✔
294
        already = role_id in current
1✔
295

296
        if not already:
1✔
297
            new = current + (role_id,)
1✔
298
            self._principal_roles[principal_id] = new
1✔
299
            self._invalidatePrincipalCache(principal_id)
1✔
300

301
        return not already
1✔
302

303
    @security.private
1✔
304
    def removeRoleFromPrincipal(self, role_id, principal_id):
1✔
305
        """ Remove a role from a principal (user or group).
306

307
        o Return a boolean indicating whether the role was already present.
308

309
        o Raise KeyError if 'role_id' is unknown.
310

311
        o Ignore requests to remove a role not already assigned to the
312
          principal.
313
        """
314
        # raise KeyError if unknown!
315
        role_info = self._roles[role_id]  # noqa
1✔
316

317
        current = self._principal_roles.get(principal_id, ())
1✔
318
        new = tuple([x for x in current if x != role_id])
1✔
319
        already = current != new
1✔
320

321
        if already:
1✔
322
            self._principal_roles[principal_id] = new
1✔
323
            self._invalidatePrincipalCache(principal_id)
1✔
324

325
        return already
1✔
326

327
    #
328
    #   ZMI
329
    #
330
    manage_options = (({'label': 'Roles', 'action': 'manage_roles'},)
1✔
331
                      + BasePlugin.manage_options)
332

333
    security.declareProtected(ManageUsers, 'manage_roles')  # NOQA: D001
1✔
334
    manage_roles = PageTemplateFile('www/zrRoles', globals(),
1✔
335
                                    __name__='manage_roles')
336

337
    security.declareProtected(ManageUsers, 'manage_twoLists')  # NOQA: D001
1✔
338
    manage_twoLists = PageTemplateFile('../www/two_lists', globals(),
1✔
339
                                       __name__='manage_twoLists')
340

341
    @security.protected(ManageUsers)
1✔
342
    @csrf_only
1✔
343
    @postonly
1✔
344
    def manage_addRole(self, role_id, title, description, RESPONSE=None,
1✔
345
                       REQUEST=None):
346
        """ Add a role via the ZMI.
347
        """
348
        if not role_id:
×
349
            message = 'Please+provide+a+Role+ID'
×
350
        else:
351
            self.addRole(role_id, title, description)
×
352
            message = 'Role+added'
×
353

354
        if RESPONSE is not None:
×
355
            RESPONSE.redirect('%s/manage_roles?manage_tabs_message=%s' %
×
356
                              (self.absolute_url(), message))
357

358
    @security.protected(ManageUsers)
1✔
359
    @csrf_only
1✔
360
    @postonly
1✔
361
    def manage_updateRole(self, role_id, title, description, RESPONSE=None,
1✔
362
                          REQUEST=None):
363
        """ Update a role via the ZMI.
364
        """
365
        self.updateRole(role_id, title, description)
×
366

367
        message = 'Role+updated'
×
368

369
        if RESPONSE is not None:
×
370
            RESPONSE.redirect('%s/manage_roles?role_id=%s&'
×
371
                              'manage_tabs_message=%s' %
372
                              (self.absolute_url(), role_id, message))
373

374
    @security.protected(ManageUsers)
1✔
375
    @csrf_only
1✔
376
    @postonly
1✔
377
    def manage_removeRoles(self, role_ids, RESPONSE=None, REQUEST=None):
1✔
378
        """ Remove one or more role assignments via the ZMI.
379

380
        Note that if you really want to remove a role you should first
381
        remove it from the roles in the root of the site (at the
382
        bottom of the Security tab at manage_access).
383
        """
384
        role_ids = [_f for _f in role_ids if _f]
1✔
385

386
        if not role_ids:
1!
387
            message = 'no+roles+selected'
×
388

389
        else:
390

391
            for role_id in role_ids:
1✔
392
                self.removeRole(role_id)
1✔
393

394
            message = 'Role+assignments+removed'
1✔
395

396
        if RESPONSE is not None:
1!
397
            RESPONSE.redirect('%s/manage_roles?manage_tabs_message=%s' %
1✔
398
                              (self.absolute_url(), message))
399

400
    @security.protected(ManageUsers)
1✔
401
    @csrf_only
1✔
402
    @postonly
1✔
403
    def manage_assignRoleToPrincipals(self, role_id, principal_ids,
1✔
404
                                      RESPONSE, REQUEST=None):
405
        """ Assign a role to one or more principals via the ZMI.
406
        """
407
        assigned = []
1✔
408

409
        for principal_id in principal_ids:
1✔
410
            if self.assignRoleToPrincipal(role_id, principal_id):
1!
411
                assigned.append(principal_id)
1✔
412

413
        if not assigned:
1!
414
            message = 'Role+%s+already+assigned+to+all+principals' % role_id
×
415
        else:
416
            message = 'Role+{}+assigned+to+{}'.format(
1✔
417
                role_id, '+'.join(assigned))
418

419
        if RESPONSE is not None:
1!
420
            RESPONSE.redirect('%s/manage_roles?role_id=%s&assign=1'
1✔
421
                              '&manage_tabs_message=%s' %
422
                              (self.absolute_url(), role_id, message))
423

424
    @security.protected(ManageUsers)
1✔
425
    @csrf_only
1✔
426
    @postonly
1✔
427
    def manage_removeRoleFromPrincipals(self, role_id, principal_ids,
1✔
428
                                        RESPONSE=None, REQUEST=None):
429
        """ Remove a role from one or more principals via the ZMI.
430
        """
431
        removed = []
1✔
432

433
        for principal_id in principal_ids:
1✔
434
            if self.removeRoleFromPrincipal(role_id, principal_id):
1!
435
                removed.append(principal_id)
×
436

437
        if not removed:
1!
438
            message = 'Role+%s+alread+removed+from+all+principals' % role_id
1✔
439
        else:
440
            message = 'Role+{}+removed+from+{}'.format(
×
441
                role_id, '+'.join(removed))
442

443
        if RESPONSE is not None:
1!
444
            RESPONSE.redirect('%s/manage_roles?role_id=%s&assign=1'
1✔
445
                              '&manage_tabs_message=%s' %
446
                              (self.absolute_url(), role_id, message))
447

448

449
classImplements(ZODBRoleManager, IZODBRoleManager, IRolesPlugin,
1✔
450
                IRoleEnumerationPlugin, IRoleAssignerPlugin)
451

452

453
InitializeClass(ZODBRoleManager)
1✔
454

455

456
class _ZODBRoleFilter:
1✔
457

458
    def __init__(self, id=None, **kw):
1✔
459

460
        self._filter_ids = id
1✔
461

462
    def __call__(self, role_info):
1✔
463

464
        if self._filter_ids:
1✔
465

466
            key = 'id'
1✔
467

468
        else:
469
            return 1  # ???:  try using 'kw'
1✔
470

471
        value = role_info.get(key)
1✔
472

473
        if not value:
1!
474
            return False
×
475

476
        for id in self._filter_ids:
1✔
477
            if value.find(id) >= 0:
1✔
478
                return 1
1✔
479

480
        return False
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc