• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Products.PluggableAuthService / 5235276870

pending completion
5235276870

push

github

web-flow
Drop support for Python 2.7, 3.5, 3.6. (#116)

* Drop zserver extra in setup.py. Thus dropping FTP support.
* Drop support for Zope < 5.
Co-authored-by: Jens Vagelpohl <jens@plyp.com>

1288 of 1745 branches covered (73.81%)

Branch coverage included in aggregate %.

127 of 127 new or added lines in 30 files covered. (100.0%)

9619 of 10349 relevant lines covered (92.95%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.0
/src/Products/PluggableAuthService/plugins/ZODBGroupManager.py
1
##############################################################################
2
#
3
# Copyright (c) 2001 Zope Foundation and Contributors
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
7
# distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
""" Classes: ZODBGroupManager
1✔
15
"""
16

17

18
from AccessControl import ClassSecurityInfo
1✔
19
from AccessControl.class_init import InitializeClass
1✔
20
from AccessControl.requestmethod import postonly
1✔
21
from Acquisition import aq_parent
1✔
22
from BTrees.OOBTree import OOBTree
1✔
23
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
1✔
24
from zope.event import notify
1✔
25
from zope.interface import Interface
1✔
26

27
from ..events import GroupCreated
1✔
28
from ..events import PrincipalAddedToGroup
1✔
29
from ..events import PrincipalRemovedFromGroup
1✔
30
from ..interfaces.plugins import IGroupEnumerationPlugin
1✔
31
from ..interfaces.plugins import IGroupsPlugin
1✔
32
from ..permissions import ManageGroups
1✔
33
from ..plugins.BasePlugin import BasePlugin
1✔
34
from ..utils import classImplements
1✔
35
from ..utils import csrf_only
1✔
36

37

38
class IZODBGroupManager(Interface):
1✔
39
    """ Marker interface.
40
    """
41

42

43
manage_addZODBGroupManagerForm = PageTemplateFile(
1✔
44
    'www/zgAdd', globals(), __name__='manage_addZODBGroupManagerForm')
45

46

47
def addZODBGroupManager(dispatcher, id, title=None, REQUEST=None):
1✔
48
    """ Add a ZODBGroupManager to a Pluggable Auth Service. """
49

50
    zgm = ZODBGroupManager(id, title)
1✔
51
    dispatcher._setObject(zgm.getId(), zgm)
1✔
52

53
    if REQUEST is not None:
1!
54
        REQUEST['RESPONSE'].redirect('%s/manage_workspace'
×
55
                                     '?manage_tabs_message='
56
                                     'ZODBGroupManager+added.' %
57
                                     dispatcher.absolute_url())
58

59

60
class ZODBGroupManager(BasePlugin):
1✔
61

62
    """ PAS plugin for managing groups, and groups of groups in the ZODB
63
    """
64
    meta_type = 'ZODB Group Manager'
1✔
65
    zmi_icon = 'fas fa-users'
1✔
66

67
    security = ClassSecurityInfo()
1✔
68

69
    def __init__(self, id, title=None):
1✔
70

71
        self._id = self.id = id
1✔
72
        self.title = title
1✔
73
        self._groups = OOBTree()
1✔
74
        self._principal_groups = OOBTree()
1✔
75

76
    #
77
    #   IGroupEnumerationPlugin implementation
78
    #
79
    @security.private
1✔
80
    def enumerateGroups(self, id=None, title=None, exact_match=False,
1✔
81
                        sort_by=None, max_results=None, **kw):
82
        """ See IGroupEnumerationPlugin.
83
        """
84
        group_info = []
1✔
85
        group_ids = []
1✔
86
        plugin_id = self.getId()
1✔
87

88
        if isinstance(id, str):
1✔
89
            id = [id]
1✔
90

91
        if isinstance(title, str):
1!
92
            title = [title]
×
93

94
        if exact_match and (id or title):
1✔
95

96
            if id:
1!
97
                group_ids.extend(id)
1✔
98
            elif title:
×
99
                group_ids.extend(title)
×
100

101
        if group_ids:
1✔
102
            group_filter = None
1✔
103

104
        else:   # Searching
105
            group_ids = self.listGroupIds()
1✔
106
            group_filter = _ZODBGroupFilter(id, title, **kw)
1✔
107

108
        for group_id in group_ids:
1✔
109

110
            if self._groups.get(group_id, None):
1✔
111
                e_url = '%s/manage_groups' % self.getId()
1✔
112
                p_qs = 'group_id=%s' % group_id
1✔
113
                m_qs = 'group_id=%s&assign=1' % group_id
1✔
114

115
                info = {}
1✔
116
                info.update(self._groups[group_id])
1✔
117

118
                info['pluginid'] = plugin_id
1✔
119
                info['properties_url'] = f'{e_url}?{p_qs}'
1✔
120
                info['members_url'] = f'{e_url}?{m_qs}'
1✔
121

122
                info['id'] = '{}{}'.format(self.prefix, info['id'])
1✔
123

124
                if not group_filter or group_filter(info):
1✔
125
                    group_info.append(info)
1✔
126

127
        return tuple(group_info)
1✔
128

129
    #
130
    #   IGroupsPlugin implementation
131
    #
132
    @security.private
1✔
133
    def getGroupsForPrincipal(self, principal, request=None):
1✔
134
        """ See IGroupsPlugin.
135
        """
136
        unadorned = self._principal_groups.get(principal.getId(), ())
1✔
137
        return tuple(['{}{}'.format(self.prefix, x) for x in unadorned])
1✔
138

139
    #
140
    #   (notional)IZODBGroupManager interface
141
    #
142
    @security.protected(ManageGroups)
1✔
143
    def listGroupIds(self):
1✔
144
        """ -> (group_id_1, ... group_id_n)
145
        """
146
        return self._groups.keys()
1✔
147

148
    @security.protected(ManageGroups)
1✔
149
    def listGroupInfo(self):
1✔
150
        """ -> (dict, ...dict)
151

152
        o Return one mapping per group, with the following keys:
153

154
          - 'id'
155
        """
156
        return self._groups.values()
1✔
157

158
    @security.protected(ManageGroups)
1✔
159
    def getGroupInfo(self, group_id):
1✔
160
        """ group_id -> dict
161
        """
162
        return self._groups[group_id]
1✔
163

164
    @security.private
1✔
165
    def addGroup(self, group_id, title=None, description=None):
1✔
166
        """ Add 'group_id' to the list of groups managed by this object.
167

168
        o Raise KeyError on duplicate.
169
        """
170
        if self._groups.get(group_id) is not None:
1✔
171
            raise KeyError('Duplicate group ID: %s' % group_id)
1✔
172

173
        self._groups[group_id] = {'id': group_id, 'title': title,
1✔
174
                                  'description': description}
175
        notify(GroupCreated(group_id, self))
1✔
176

177
    @security.private
1✔
178
    def updateGroup(self, group_id, title=None, description=None):
1✔
179
        """ Update properties for 'group_id'
180

181
        o Raise KeyError if group_id doesn't already exist.
182
        """
183
        if title is not None:
1✔
184
            self._groups[group_id]['title'] = title
1✔
185
        if description is not None:
1✔
186
            self._groups[group_id]['description'] = description
1✔
187
        self._groups[group_id] = self._groups[group_id]
1✔
188

189
    @security.private
1✔
190
    def removeGroup(self, group_id):
1✔
191
        """ Remove 'role_id' from the list of roles managed by this
192
            object, removing assigned members from it before doing so.
193

194
        o Raise KeyError if 'group_id' doesn't already exist.
195
        """
196
        for principal_id in self._principal_groups.keys():
1✔
197
            self.removePrincipalFromGroup(principal_id, group_id)
1✔
198
        del self._groups[group_id]
1✔
199

200
    #
201
    #   Group assignment API
202
    #
203
    @security.protected(ManageGroups)
1✔
204
    def listAvailablePrincipals(self, group_id, search_id):
1✔
205
        """ Return a list of principal IDs to that can belong to the group.
206

207
        o If supplied, 'search_id' constrains the principal IDs;  if not,
208
          return empty list.
209

210
        o Omit principals with existing assignments.
211
        """
212
        result = []
×
213

214
        if search_id:  # don't bother searching if no criteria
×
215

216
            parent = aq_parent(self)
×
217

218
            for info in parent.searchPrincipals(max_results=20, sort_by='id',
×
219
                                                id=search_id,
220
                                                exact_match=False):
221
                id = info['id']
×
222
                title = info.get('title', id)
×
223
                if group_id not in self._principal_groups.get(id, ()) \
×
224
                        and group_id != id:
225
                    result.append((id, title))
×
226

227
        return result
×
228

229
    @security.protected(ManageGroups)
1✔
230
    def listAssignedPrincipals(self, group_id):
1✔
231
        """ Return a list of principal IDs belonging to a group.
232
        """
233
        result = []
1✔
234

235
        for k, v in self._principal_groups.items():
1✔
236
            if group_id in v:
1!
237
                parent = aq_parent(self)
1✔
238
                info = parent.searchPrincipals(id=k, exact_match=True)
1✔
239
                if len(info) == 0:
1✔
240
                    title = '<%s: not found>' % k
1✔
241
                else:
242
                    # always use the title of the first principal found
243
                    title = info[0].get('title', k)
1✔
244
                result.append((k, title))
1✔
245

246
        return result
1✔
247

248
    @security.private
1✔
249
    def addPrincipalToGroup(self, principal_id, group_id):
1✔
250
        """ Add a principal to a group.
251

252
        o Return a boolean indicating whether a new assignment was created.
253

254
        o Raise KeyError if 'group_id' is unknown.
255
        """
256
        # raise KeyError if unknown!
257
        group_info = self._groups[group_id]  # noqa
1✔
258

259
        current = self._principal_groups.get(principal_id, ())
1✔
260
        already = group_id in current
1✔
261

262
        if not already:
1!
263
            new = current + (group_id,)
1✔
264
            self._principal_groups[principal_id] = new
1✔
265
            self._invalidatePrincipalCache(principal_id)
1✔
266
            notify(PrincipalAddedToGroup(principal_id, group_id))
1✔
267

268
        return not already
1✔
269

270
    @security.private
1✔
271
    def removePrincipalFromGroup(self, principal_id, group_id):
1✔
272
        """ Remove a prinicpal from from a group.
273

274
        o Return a boolean indicating whether the principal was already
275
          a member of the group.
276

277
        o Raise KeyError if 'group_id' is unknown.
278

279
        o Ignore requests to remove a principal if not already a member
280
          of the group.
281
        """
282
        # raise KeyError if unknown!
283
        group_info = self._groups[group_id]  # noqa
1✔
284

285
        current = self._principal_groups.get(principal_id, ())
1✔
286
        new = tuple([x for x in current if x != group_id])
1✔
287
        already = current != new
1✔
288

289
        if already:
1✔
290
            self._principal_groups[principal_id] = new
1✔
291
            self._invalidatePrincipalCache(principal_id)
1✔
292
            notify(PrincipalRemovedFromGroup(principal_id, group_id))
1✔
293

294
        return already
1✔
295

296
    #
297
    #   ZMI
298
    #
299
    manage_options = (({'label': 'Groups', 'action': 'manage_groups'},)
1✔
300
                      + BasePlugin.manage_options)
301

302
    security.declarePublic('manage_widgets')  # NOQA: D001
1✔
303
    manage_widgets = PageTemplateFile('www/zuWidgets', globals(),
1✔
304
                                      __name__='manage_widgets')
305

306
    security.declareProtected(ManageGroups, 'manage_groups')  # NOQA: D001
1✔
307
    manage_groups = PageTemplateFile('www/zgGroups', globals(),
1✔
308
                                     __name__='manage_groups')
309

310
    security.declareProtected(ManageGroups, 'manage_twoLists')  # NOQA: D001
1✔
311
    manage_twoLists = PageTemplateFile('../www/two_lists', globals(),
1✔
312
                                       __name__='manage_twoLists')
313

314
    @security.protected(ManageGroups)
1✔
315
    def manage_addGroup(self, group_id, title=None, description=None,
1✔
316
                        RESPONSE=None):
317
        """ Add a group via the ZMI.
318
        """
319
        if not group_id:
×
320
            message = 'Please+provide+a+Group+ID'
×
321
        else:
322
            self.addGroup(group_id, title, description)
×
323
            message = 'Group+added'
×
324

325
        if RESPONSE is not None:
×
326
            RESPONSE.redirect('%s/manage_groups?manage_tabs_message=%s' %
×
327
                              (self.absolute_url(), message))
328

329
    @security.protected(ManageGroups)
1✔
330
    def manage_updateGroup(self, group_id, title, description, RESPONSE=None):
1✔
331
        """ Update a group via the ZMI.
332
        """
333
        self.updateGroup(group_id, title, description)
×
334

335
        message = 'Group+updated'
×
336

337
        if RESPONSE is not None:
×
338
            RESPONSE.redirect('%s/manage_groups?manage_tabs_message=%s' %
×
339
                              (self.absolute_url(), message))
340

341
    @security.protected(ManageGroups)
1✔
342
    @csrf_only
1✔
343
    @postonly
1✔
344
    def manage_removeGroups(self, group_ids, RESPONSE=None, REQUEST=None):
1✔
345
        """ Remove one or more groups via the ZMI.
346
        """
347
        group_ids = [_f for _f in group_ids if _f]
1✔
348

349
        if not group_ids:
1!
350
            message = 'no+groups+selected'
×
351

352
        else:
353

354
            for group_id in group_ids:
1✔
355
                self.removeGroup(group_id)
1✔
356

357
            message = 'Groups+removed'
1✔
358

359
        if RESPONSE is not None:
1!
360
            RESPONSE.redirect('%s/manage_groups?manage_tabs_message=%s' %
×
361
                              (self.absolute_url(), message))
362

363
    @security.protected(ManageGroups)
1✔
364
    @csrf_only
1✔
365
    @postonly
1✔
366
    def manage_addPrincipalsToGroup(self, group_id, principal_ids,
1✔
367
                                    RESPONSE=None, REQUEST=None):
368
        """ Add one or more principals to a group via the ZMI.
369
        """
370
        assigned = []
1✔
371

372
        for principal_id in principal_ids:
1✔
373
            if self.addPrincipalToGroup(principal_id, group_id):
1!
374
                assigned.append(principal_id)
1✔
375

376
        if not assigned:
1!
377
            message = 'Principals+already+members+of+%s' % group_id
×
378
        else:
379
            message = '{}+added+to+{}'.format('+'.join(assigned), group_id)
1✔
380

381
        if RESPONSE is not None:
1!
382
            RESPONSE.redirect('%s/manage_groups?group_id=%s&assign=1'
×
383
                              '&manage_tabs_message=%s' %
384
                              (self.absolute_url(), group_id, message))
385

386
    @security.protected(ManageGroups)
1✔
387
    @csrf_only
1✔
388
    @postonly
1✔
389
    def manage_removePrincipalsFromGroup(self, group_id, principal_ids,
1✔
390
                                         RESPONSE=None, REQUEST=None):
391
        """ Remove one or more principals from a group via the ZMI.
392
        """
393
        removed = []
1✔
394

395
        for principal_id in principal_ids:
1✔
396
            if self.removePrincipalFromGroup(principal_id, group_id):
1!
397
                removed.append(principal_id)
×
398

399
        if not removed:
1!
400
            message = 'Principals+not+in+group+%s' % group_id
1✔
401
        else:
402
            message = 'Principals+{}+removed+from+{}'.format(
×
403
                '+'.join(removed), group_id)
404

405
        if RESPONSE is not None:
1!
406
            RESPONSE.redirect('%s/manage_groups?group_id=%s&assign=1'
×
407
                              '&manage_tabs_message=%s' %
408
                              (self.absolute_url(), group_id, message))
409

410

411
classImplements(ZODBGroupManager, IZODBGroupManager, IGroupEnumerationPlugin,
1✔
412
                IGroupsPlugin)
413

414

415
InitializeClass(ZODBGroupManager)
1✔
416

417

418
class _ZODBGroupFilter:
1✔
419

420
    def __init__(self, id=None, title=None, **kw):
1✔
421

422
        self._filter_ids = id
1✔
423
        self._filter_titles = title
1✔
424

425
    def __call__(self, group_info):
1✔
426

427
        if self._filter_ids:
1✔
428

429
            key = 'id'
1✔
430
            to_test = self._filter_ids
1✔
431

432
        elif self._filter_titles:
1!
433

434
            key = 'title'
×
435
            to_test = self._filter_titles
×
436

437
        else:
438
            return 1  # ???:  try using 'kw'
1✔
439

440
        value = group_info.get(key)
1✔
441

442
        if not value:
1!
443
            return 0
×
444

445
        for contained in to_test:
1✔
446
            if value.lower().find(contained.lower()) >= 0:
1✔
447
                return 1
1✔
448

449
        return 0
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc