• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Products.PluggableAuthService / 5303493172

pending completion
5303493172

push

github

web-flow
Drop support for Python 2.7, 3.5, 3.6. (#116)

* Drop zserver extra in setup.py. Thus dropping FTP support.
* Drop support for Zope < 5.
Co-authored-by: Jens Vagelpohl <jens@plyp.com>

1288 of 1745 branches covered (73.81%)

Branch coverage included in aggregate %.

127 of 127 new or added lines in 30 files covered. (100.0%)

9619 of 10349 relevant lines covered (92.95%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.32
/src/Products/PluggableAuthService/PluggableAuthService.py
1
##############################################################################
2
#
3
# Copyright (c) 2001 Zope Foundation and Contributors
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
7
# distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
""" Classes: PluggableAuthService
1✔
15
"""
16
import logging
1✔
17

18
from AccessControl import ClassSecurityInfo
1✔
19
from AccessControl import ModuleSecurityInfo
1✔
20
from AccessControl.class_init import InitializeClass
1✔
21
from AccessControl.Permissions import manage_users as ManageUsers
1✔
22
from AccessControl.SecurityManagement import getSecurityManager
1✔
23
from AccessControl.SecurityManagement import newSecurityManager
1✔
24
from AccessControl.SecurityManagement import noSecurityManager
1✔
25
from AccessControl.users import emergency_user
1✔
26
from AccessControl.users import nobody
1✔
27
from Acquisition import Implicit
1✔
28
from Acquisition import aq_base
1✔
29
from Acquisition import aq_inner
1✔
30
from Acquisition import aq_parent
1✔
31
from OFS.Cache import Cacheable
1✔
32
from OFS.Folder import Folder
1✔
33
from OFS.interfaces import IObjectManager
1✔
34
from OFS.interfaces import IPropertyManager
1✔
35
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
1✔
36
from zExceptions import Unauthorized
1✔
37
from zope.event import notify
1✔
38
from ZPublisher import BeforeTraverse
1✔
39
from ZTUtils import Batch
1✔
40

41
from Products.PluginRegistry.PluginRegistry import PluginRegistry
1✔
42
from Products.StandardCacheManagers.RAMCacheManager import RAMCacheManager
1✔
43

44
from .events import PrincipalCreated
1✔
45
from .interfaces.authservice import IPluggableAuthService
1✔
46
from .interfaces.authservice import _noroles
1✔
47
from .interfaces.plugins import IAnonymousUserFactoryPlugin
1✔
48
from .interfaces.plugins import IAuthenticationPlugin
1✔
49
from .interfaces.plugins import IChallengePlugin
1✔
50
from .interfaces.plugins import IChallengeProtocolChooser
1✔
51
from .interfaces.plugins import ICredentialsResetPlugin
1✔
52
from .interfaces.plugins import ICredentialsUpdatePlugin
1✔
53
from .interfaces.plugins import IExtractionPlugin
1✔
54
from .interfaces.plugins import IGroupEnumerationPlugin
1✔
55
from .interfaces.plugins import IGroupsPlugin
1✔
56
from .interfaces.plugins import ILoginPasswordHostExtractionPlugin
1✔
57
from .interfaces.plugins import INotCompetentPlugin
1✔
58
from .interfaces.plugins import IPropertiesPlugin
1✔
59
from .interfaces.plugins import IRequestTypeSniffer
1✔
60
from .interfaces.plugins import IRoleAssignerPlugin
1✔
61
from .interfaces.plugins import IRoleEnumerationPlugin
1✔
62
from .interfaces.plugins import IRolesPlugin
1✔
63
from .interfaces.plugins import IUpdatePlugin
1✔
64
from .interfaces.plugins import IUserAdderPlugin
1✔
65
from .interfaces.plugins import IUserEnumerationPlugin
1✔
66
from .interfaces.plugins import IUserFactoryPlugin
1✔
67
from .interfaces.plugins import IValidationPlugin
1✔
68
from .permissions import SearchPrincipals
1✔
69
from .PropertiedUser import PropertiedUser
1✔
70
from .utils import _wwwdir
1✔
71
from .utils import classImplements
1✔
72
from .utils import createKeywords
1✔
73
from .utils import createViewName
1✔
74
from .utils import url_local
1✔
75

76

77
security = ModuleSecurityInfo(
1✔
78
    'Products.PluggableAuthService.PluggableAuthService')
79

80
logger = logging.getLogger('PluggableAuthService')
1✔
81

82
#   Errors which plugins may raise, and which we suppress:
83
_SWALLOWABLE_PLUGIN_EXCEPTIONS = (NameError, AttributeError, KeyError,
1✔
84
                                  TypeError, ValueError)
85

86

87
# except if they tell us not to do so
88
def reraise(plugin):
    """ Re-raise the exception being handled if 'plugin' asks for it.

    o Meant to be called from inside an 'except' clause.

    o A plugin opts in through a truthy '_dont_swallow_my_exceptions'
      attribute;  if the attribute is missing or falsy, just return.
    """
    if getattr(plugin, '_dont_swallow_my_exceptions', False):
        # A bare 'raise' propagates the exception active in the caller.
        raise
×
95

96

97
# Meta-types of the 'multi-plugins' exposed to the Add List.
MultiPlugins = []


def registerMultiPlugin(meta_type):
    """ Register a 'multi-plugin' in order to expose it to the Add List

    o Raise RuntimeError if 'meta_type' has been registered before.
    """
    if meta_type not in MultiPlugins:
        MultiPlugins.append(meta_type)
        return

    raise RuntimeError('Meta-type (%s) already available to Add List'
                       % meta_type)
1✔
107

108

109
class DumbHTTPExtractor(Implicit):
    # Minimal extractor:  takes login and password from
    # 'request._authUserPW()' and adds the client's host and address.

    security = ClassSecurityInfo()

    @security.private
    def extractCredentials(self, request):
        """ Pull HTTP credentials out of the request.

        o Return an empty mapping if 'request._authUserPW()' yields None.

        o Otherwise return a mapping with the keys 'login', 'password',
          'remote_host' and 'remote_address'.
        """
        login_pw = request._authUserPW()
        if login_pw is None:
            return {}

        name, password = login_pw
        found = {
            'login': name,
            'password': password,
            'remote_host': request.get('REMOTE_HOST', ''),
        }

        # Requests without 'getClientAddr' fall back to REMOTE_ADDR.
        try:
            found['remote_address'] = request.getClientAddr()
        except AttributeError:
            found['remote_address'] = request.get('REMOTE_ADDR', '')

        return found


classImplements(DumbHTTPExtractor, ILoginPasswordHostExtractionPlugin)


InitializeClass(DumbHTTPExtractor)
1✔
139

140

141
class EmergencyUserAuthenticator(Implicit):
    # Authenticator that only knows the module-level 'emergency_user'.

    security = ClassSecurityInfo()

    @security.private
    def authenticateCredentials(self, credentials):
        """ Check credentials against the emergency user.

        o Return (emergency_user_name, None) if 'credentials' is a dict
          whose 'login' equals the emergency user's name and whose
          'password' the emergency user accepts.

        o Return (None, None) in every other case.
        """
        if isinstance(credentials, dict):
            known_name = emergency_user.getUserName()
            wanted = credentials.get('login')

            # The password is only looked at when the login matches.
            if wanted == known_name and emergency_user.authenticate(
                    credentials.get('password'), {}):
                return (known_name, None)

        return (None, None)


classImplements(EmergencyUserAuthenticator, IAuthenticationPlugin)


InitializeClass(EmergencyUserAuthenticator)
1✔
168

169

170
class PluggableAuthService(Folder, Cacheable):
1✔
171

172
    """ All-singing, all-dancing user folder.
173
    """
174
    # Security declarations for this class;  the method decorators
    # below ('security.private', 'security.protected', ...) use it.
    security = ClassSecurityInfo()

    meta_type = 'Pluggable Auth Service'
    zmi_icon = 'fa fa-users-cog'
    zmi_show_add_dialog = False

    # Fixed id;  'getId' returns '_id'.
    _id = id = 'acl_users'

    # Class-level references to the special users imported from
    # AccessControl.users.
    _emergency_user = emergency_user
    _nobody = nobody

    maxlistusers = -1   # Don't allow local role form to try to list us!

    # Method for transforming a login name.  This needs to be the name
    # of a method on this plugin.  See the applyTransform method.
    login_transform = ''

    # Property definitions:  both 'title' and 'login_transform' are
    # declared as writable string properties.
    _properties = (
        dict(id='title', type='string', mode='w', label='Title'),
        dict(id='login_transform', type='string', mode='w',
             label='Transform to apply to login name'),
    )
196

197
    def getId(self):
        # Return the id kept in '_id' (set to 'acl_users' on the class).

        return self._id
1✔
200

201
    #
202
    #   IUserFolder implementation
203
    #
204
    @security.protected(ManageUsers)
    def getUser(self, name):
        """ See IUserFolder.

        o 'name' is a login name;  the login transform is applied
          before the user is looked up.

        o Return None if no enumerator verifies the login.
        """
        registry = self._getOb('plugins')
        verified = self._verifyUser(registry, login=self.applyTransform(name))

        if verified:
            return self._findUser(registry, verified['id'],
                                  verified['login'])

        return None
1✔
217

218
    @security.protected(ManageUsers)
    def getUserById(self, id, default=None):
        """ See IUserFolder.

        o Return 'default' if no enumerator verifies the user id.
        """
        registry = self._getOb('plugins')
        verified = self._verifyUser(registry, user_id=id)

        if verified:
            return self._findUser(registry, verified['id'],
                                  verified['login'])

        return default
1✔
230

231
    @security.public
    def validate(self, request, auth='', roles=_noroles):
        """ See IUserFolder.

        o Return the first user extracted from 'request' that is
          authorized for the published object, or None.

        o The emergency user is returned only by the top-level user
          folder;  a nested folder returns None for it.

        o Only the top-level user folder falls back to an anonymous
          user, and only if that user is authorized.

        o 'auth' is accepted but not used by this implementation.
        """
        plugins = self._getOb('plugins')
        is_top = self._isTop()

        if not is_top and self._isNotCompetent(request, plugins):
            # this user folder should not try to authenticate this request
            return None

        user_ids = self._extractUserIds(request, plugins)
        # Context of the published object, needed for authorization.
        (accessed, container, name, value) = \
            self._getObjectContext(request['PUBLISHED'], request)

        for user_id, login in user_ids:

            user = self._findUser(plugins, user_id, login, request=request)

            if aq_base(user) is emergency_user:

                # The emergency user skips the authorization check, but
                # is honoured only at the top level.
                if is_top:
                    return user
                else:
                    return None

            if self._authorizeUser(user, accessed, container, name, value,
                                   roles):
                return user

        if not is_top:
            return None

        #
        #   No other user folder above us can satisfy, and we have no user;
        #   return a constructed anonymous only if anonymous is authorized.
        #
        anonymous = self._createAnonymousUser(plugins)
        if self._authorizeUser(anonymous, accessed, container, name,
                               value, roles):
            return anonymous

        return None
1✔
274

275
    @security.protected(SearchPrincipals)
    def searchUsers(self, **kw):
        """ Search for users

        o Pass the criteria to every IUserEnumerationPlugin and return
          the merged results as a tuple of mappings.  Each mapping is a
          copy of the plugin's info with 'userid' and 'principal_type'
          added, and 'title' defaulting to the login.

        o 'sort_by' and 'max_results' are applied here and are not
          passed on to the plugins.

        o A truthy 'name' criterion removes any 'id' criterion and is
          searched as (transformed) 'login'.
        """
        limit = kw.get('max_results', '')
        order_key = kw.get('sort_by', '')

        # We apply sorting and slicing here across all sets, so don't
        # make the plugin do it
        if order_key:
            del kw['sort_by']
        if limit:
            del kw['max_results']

        if kw.get('name', None):
            if kw.get('id') is not None:
                del kw['id']  # don't even bother searching by id
            # Note: name can be a sequence.
            kw['login'] = self.applyTransform(kw['name'])
        if kw.get('login', None):
            kw['login'] = self.applyTransform(kw['login'])

        found = []
        registry = self._getOb('plugins')

        for plugin_id, plugin in registry.listPlugins(IUserEnumerationPlugin):
            try:
                for raw in plugin.enumerateUsers(**kw):
                    entry = dict(raw)
                    entry['userid'] = entry['id']
                    entry['principal_type'] = 'user'
                    if 'title' not in entry:
                        entry['title'] = entry['login']
                    found.append(entry)

            except _SWALLOWABLE_PLUGIN_EXCEPTIONS:
                reraise(plugin)
                logger.debug('UserEnumerationPlugin %s error' % plugin_id,
                             exc_info=True)

        if order_key:
            found.sort(key=lambda item: item.get(order_key, '').lower())

        if limit:
            # A limit that is not an integer is ignored.
            try:
                found = found[:int(limit)]
            except ValueError:
                pass

        return tuple(found)
1✔
330

331
    @security.protected(SearchPrincipals)
    def searchGroups(self, **kw):
        """ Search for groups

        o Pass the criteria to every IGroupEnumerationPlugin and return
          the merged results as a tuple of mappings.  Each mapping is a
          copy of the plugin's info with 'groupid' and 'principal_type'
          added, and 'title' defaulting to '(Group) <groupid>'.

        o 'sort_by' and 'max_results' are applied here and are not
          passed on to the plugins.  At most 'max_results' mappings are
          returned;  a value that is not an integer is ignored.

        o A truthy 'name' criterion removes any 'id' criterion and is
          used as 'title' unless a title was passed in.
        """
        search_name = kw.get('name', None)

        result = []
        max_results = kw.get('max_results', '')
        sort_by = kw.get('sort_by', '')

        # We apply sorting and slicing here across all sets, so don't
        # make the plugin do it
        if sort_by:
            del kw['sort_by']
        if max_results:
            del kw['max_results']
        if search_name:
            if kw.get('id') is not None:
                del kw['id']
            if 'title' not in kw:
                kw['title'] = kw['name']

        plugins = self._getOb('plugins')
        enumerators = plugins.listPlugins(IGroupEnumerationPlugin)

        for enumerator_id, enum in enumerators:
            try:
                group_list = enum.enumerateGroups(**kw)
                for group_info in group_list:
                    info = {}
                    info.update(group_info)
                    info['groupid'] = info['id']
                    info['principal_type'] = 'group'
                    if 'title' not in info:
                        info['title'] = '(Group) %s' % info['groupid']
                    result.append(info)
            except _SWALLOWABLE_PLUGIN_EXCEPTIONS:
                reraise(enum)
                logger.debug('GroupEnumerationPlugin %s error' % enumerator_id,
                             exc_info=True)

        if sort_by:
            result = sorted(result, key=lambda x: x.get(sort_by, '').lower())

        if max_results:
            try:
                max_results = int(max_results)
                # Keep exactly 'max_results' entries, as searchUsers does
                # (the former '+ 1' returned one entry too many).
                result = result[:max_results]
            except ValueError:
                pass

        return tuple(result)
1✔
383

384
    @security.protected(SearchPrincipals)
    def searchPrincipals(self, groups_first=False, **kw):
        """ Search for principals (users, groups, or both)

        o Run 'searchGroups' and 'searchUsers' with the same criteria
          and return copies of their results as one tuple, users first
          unless 'groups_first' is true.

        o A truthy 'name' criterion is also used as 'login', and as
          'title' unless a title was passed in.

        o At most 'max_results' mappings are returned;  a value that is
          not an integer is ignored.
        """
        max_results = kw.get('max_results', '')

        search_name = kw.get('name', None)
        if search_name:
            if 'title' not in kw:
                kw['title'] = search_name
            kw['login'] = search_name

        # For groups we search the original name
        # (e.g. Administrators), for users we apply the transform,
        # which could lowercase the name.
        groups = [d.copy() for d in self.searchGroups(**kw)]
        if kw.get('login', None):
            kw['login'] = self.applyTransform(kw['login'])
        users = [d.copy() for d in self.searchUsers(**kw)]

        if groups_first:
            result = groups + users
        else:
            result = users + groups

        if max_results:
            try:
                max_results = int(max_results)
                # Cap the combined list at exactly 'max_results' entries
                # (the former '+ 1' returned one entry too many).
                result = result[:max_results]
            except ValueError:
                pass

        return tuple(result)
1✔
417

418
    @security.private
    def __creatable_by_emergency_user__(self):
        # Always answers with the true value 1.
        return 1
1✔
421

422
    @security.private
    def _setObject(self, id, object, roles=None, user=None, set_owner=0,
                   suppress_events=False):
        #
        #   Override ObjectManager's version to change the default for
        #   'set_owner' (we don't want to enforce ownership on contained
        #   objects).
        #
        #   'suppress_events' is forwarded instead of being dropped, and
        #   the base implementation's result is handed back to the
        #   caller.
        #
        return Folder._setObject(self, id, object, roles, user, set_owner,
                                 suppress_events)
1✔
430

431
    @security.private
    def _delOb(self, id):
        #
        #   Extends ObjectManager's version:  before the object goes
        #   away, drop whatever plugin registrations exist for its id.
        #   A missing 'plugins' registry is tolerated.
        #
        registry = self._getOb('plugins', None)
        has_registry = registry is not None

        if has_registry:
            registry.removePluginById(id)

        Folder._delOb(self, id)
1✔
443

444
    #
445
    # ZMI stuff
446
    #
447
    # The search page is a page template protected by ManageUsers.
    security.declareProtected(ManageUsers, 'manage_search')  # NOQA: D001
    manage_search = PageTemplateFile('www/pasSearch', globals())

    # Keep Folder's first tab, put 'Search' in place of its second tab
    # (index 1 is left out), then append the rest plus Cacheable's tabs.
    manage_options = (Folder.manage_options[:1]
                      + ({'label': 'Search', 'action': 'manage_search'},)
                      + Folder.manage_options[2:]
                      + Cacheable.manage_options)
454

455
    @security.protected(ManageUsers)
    def resultsBatch(self, results, REQUEST, size=20, orphan=2, overlap=0):
        """ ZMI helper for getting batching for displaying search results

        o 'batch_start' and 'batch_size' are read from REQUEST;  if
          either cannot be converted to an integer the batch starts at 0.

        o Sets 'next_batch_url' and / or 'previous_batch_url' on REQUEST
          when there are results after / before the returned batch.

        o Return the Batch object.
        """
        try:
            start_val = REQUEST.get('batch_start', '0')
            start = int(start_val)
            size = int(REQUEST.get('batch_size', size))
        except ValueError:
            # 'size' keeps its previous value if conversion failed.
            start = 0

        batch = Batch(results, size, start, 0, orphan, overlap)

        if batch.end < len(results):
            # More results follow:  the next batch starts at 'batch.end'.
            qs = self._getBatchLink(REQUEST.get('QUERY_STRING', ''),
                                    start, batch.end)
            REQUEST.set('next_batch_url', '{}?{}'.format(
                REQUEST.get('URL'), qs))

        if start > 0:
            # NOTE(review): the previous start is 'start - size - 1';
            # confirm that the extra '- 1' is intended.
            new_start = start - size - 1

            if new_start < 0:
                new_start = 0

            qs = self._getBatchLink(REQUEST.get('QUERY_STRING', ''),
                                    start, new_start)
            REQUEST.set('previous_batch_url',
                        '{}?{}'.format(REQUEST.get('URL'), qs))

        return batch
×
486

487
    @security.private
    def _getBatchLink(self, qs, old_start, new_start):
        """ Internal helper to generate correct query strings

        o Return 'qs' with its 'batch_start' value changed from
          'old_start' to 'new_start', or with 'batch_start' appended if
          the parameter is absent.

        o Return 'qs' untouched if 'new_start' is None.
        """
        if new_start is None:
            return qs

        if not qs:
            return 'batch_start=%d' % new_start

        if qs.startswith('batch_start='):
            # The parameter is the first one in the query string.
            return qs.replace('batch_start=%d' % old_start,
                              'batch_start=%d' % new_start)

        if '&batch_start=' in qs:
            # The parameter follows other parameters.
            return qs.replace('&batch_start=%d' % old_start,
                              '&batch_start=%d' % new_start)

        return '%s&batch_start=%d' % (qs, new_start)
×
504

505
    #
506
    #   Helper methods
507
    #
508
    @security.private
    def _isNotCompetent(self, request, plugins):
        """ return true when this user folder should not try authentication.

        Never called for top level user folder.

        o True as soon as one INotCompetentPlugin declares itself not
          competent for 'request';  False if none does.
        """
        try:
            checkers = plugins.listPlugins(INotCompetentPlugin)
        except _SWALLOWABLE_PLUGIN_EXCEPTIONS:
            logger.debug('NotCompetent plugin listing error', exc_info=True)
            checkers = ()

        for checker_id, checker in checkers:
            try:
                if checker.isNotCompetentToAuthenticate(request):
                    return True
            except _SWALLOWABLE_PLUGIN_EXCEPTIONS:
                # A failing plugin is logged and skipped, unless it asks
                # for its exceptions to propagate.
                reraise(checker)
                logger.debug('NotCompetentPlugin %s error' % checker_id,
                             exc_info=True)

        return False
1✔
530

531
    @security.private
    def _extractUserIds(self, request, plugins):
        """ request -> [(validated_user_id, login)]

        o For each set of extracted credentials, try to authenticate
          a user;  accumulate a list of pairs (ID, login) of such users
          over all our authentication and extraction plugins.

        o If any credentials authenticate the emergency user, return a
          one-element list for that user at once.

        o Plugin failures in _SWALLOWABLE_PLUGIN_EXCEPTIONS are logged
          and skipped, unless the plugin asks for them to propagate.
        """
        try:
            extractors = plugins.listPlugins(IExtractionPlugin)
        except _SWALLOWABLE_PLUGIN_EXCEPTIONS:
            logger.debug('Extractor plugin listing error', exc_info=True)
            extractors = ()

        # Without any extractor, fall back to the built-in HTTP one.
        if not extractors:
            extractors = (('default', DumbHTTPExtractor()),)

        try:
            authenticators = plugins.listPlugins(IAuthenticationPlugin)
        except _SWALLOWABLE_PLUGIN_EXCEPTIONS:
            logger.debug('Authenticator plugin listing error', exc_info=True)
            authenticators = ()

        result = []

        for extractor_id, extractor in extractors:

            try:
                credentials = extractor.extractCredentials(request)
            except _SWALLOWABLE_PLUGIN_EXCEPTIONS:
                reraise(extractor)
                logger.debug('ExtractionPlugin %s error' % extractor_id,
                             exc_info=True)
                continue

            if credentials:

                try:
                    credentials['extractor'] = extractor_id  # ??? in key?
                    # Test if ObjectCacheEntries.aggregateIndex would work
                    items = sorted(credentials.items())  # noqa
                except _SWALLOWABLE_PLUGIN_EXCEPTIONS:
                    # ???: would reraise be good here, and which plugin to ask
                    # whether not to swallow the exception - the extractor?
                    logger.debug('Credentials error: %s' % credentials,
                                 exc_info=True)
                    continue

                # First try to authenticate against the emergency
                # user and return immediately if authenticated
                user_id, name = self._tryEmergencyUserAuthentication(
                    credentials)

                if user_id is not None:
                    return [(user_id, name)]

                # Now see if the user ids can be retrieved from the cache
                credentials['login'] = self.applyTransform(
                    credentials.get('login'))
                view_name = createViewName('_extractUserIds',
                                           credentials.get('login'))
                keywords = createKeywords(**credentials)
                user_ids = self.ZCacheable_get(view_name=view_name,
                                               keywords=keywords,
                                               default=None)
                if user_ids is None:
                    # Cache miss:  ask every authenticator in turn.
                    user_ids = []

                    for authenticator_id, auth in authenticators:

                        try:
                            uid_and_info = auth.authenticateCredentials(
                                credentials)

                            if uid_and_info is None:
                                continue

                            user_id, info = uid_and_info

                        except _SWALLOWABLE_PLUGIN_EXCEPTIONS:
                            reraise(auth)
                            logger.debug(
                                f'AuthenticationPlugin {authenticator_id}'
                                ' error', exc_info=True)
                            continue

                        if user_id is not None:
                            user_ids.append((user_id, info))

                    # Only non-empty answers are stored in the cache.
                    if user_ids:
                        self.ZCacheable_set(user_ids, view_name=view_name,
                                            keywords=keywords)

                result.extend(user_ids)

        # Emergency user via HTTP basic auth always wins
        user_id, name = self._tryEmergencyUserAuthentication(
            DumbHTTPExtractor().extractCredentials(request))

        if user_id is not None:
            return [(user_id, name)]

        return result
1✔
634

635
    @security.private
    def _tryEmergencyUserAuthentication(self, credentials):
        """ credentials -> emergency_user or None

        o Return the (user_id, name) pair given by a fresh
          EmergencyUserAuthenticator.

        o Return (None, None) if the check raises one of the
          _SWALLOWABLE_PLUGIN_EXCEPTIONS;  the failure is logged.
        """
        try:
            check = EmergencyUserAuthenticator().authenticateCredentials
            user_id, name = check(credentials)
        except _SWALLOWABLE_PLUGIN_EXCEPTIONS:
            logger.debug('Credentials error: %s' % credentials, exc_info=True)
            return (None, None)

        return (user_id, name)
1✔
647

648
    @security.private
    def _getGroupsForPrincipal(self, principal, request=None, plugins=None,
                               ignore_plugins=None):
        #
        #   Ask every IGroupsPlugin (except those whose id is listed in
        #   'ignore_plugins') for the groups of 'principal'.  Each group
        #   is added to the principal as it is found;  the distinct
        #   groups are returned as the keys of a dict.
        #
        skipped = () if ignore_plugins is None else ignore_plugins

        if plugins is None:
            plugins = self._getOb('plugins')

        collected = {}

        for maker_id, maker in plugins.listPlugins(IGroupsPlugin):

            if maker_id in skipped:
                continue

            for group in maker.getGroupsForPrincipal(principal, request):
                principal._addGroups([group])
                collected[group] = 1

        return collected.keys()
1✔
670

671
    @security.private
    def _createAnonymousUser(self, plugins):
        """ Allow IAnonymousUserFactoryPlugins to create or fall back.

        o The first factory that returns something other than None wins;
          if none does, 'nobody' is used.

        o The result is wrapped in the context of this user folder.
        """
        makers = plugins.listPlugins(IAnonymousUserFactoryPlugin)

        for _maker_id, maker in makers:
            made = maker.createAnonymousUser()
            if made is not None:
                return made.__of__(self)

        return nobody.__of__(self)
1✔
685

686
    @security.private
    def _createUser(self, plugins, user_id, name):
        """ Allow IUserFactoryPlugins to create, or fall back to default.

        o The login transform is applied to 'name' first.

        o The first factory that returns something other than None wins;
          if none does, a PropertiedUser is created.

        o The result is wrapped in the context of this user folder.
        """
        login = self.applyTransform(name)
        makers = plugins.listPlugins(IUserFactoryPlugin)

        for _maker_id, maker in makers:
            made = maker.createUser(user_id, login)
            if made is not None:
                return made.__of__(self)

        return PropertiedUser(user_id, login).__of__(self)
1✔
701

702
    @security.private
    def _findUser(self, plugins, user_id, name=None, request=None):
        """ user_id -> decorated_user

        o Return the emergency user as-is if 'user_id' is its name.

        o Otherwise take the user from the cache or build it:  create
          the user object, then add property sheets, groups and roles
          from the matching plugins, plus the 'Authenticated' role.

        o The result is wrapped in the context of this user folder.
        """
        if user_id == self._emergency_user.getUserName():
            return self._emergency_user

        # See if the user can be retrieved from the cache
        view_name = createViewName('_findUser', user_id)
        name = self.applyTransform(name)
        keywords = createKeywords(user_id=user_id, name=name)
        user = self.ZCacheable_get(view_name=view_name, keywords=keywords,
                                   default=None)

        if user is None:

            user = self._createUser(plugins, user_id, name)
            propfinders = plugins.listPlugins(IPropertiesPlugin)

            for propfinder_id, propfinder in propfinders:

                data = propfinder.getPropertiesForUser(user, request)
                if data:
                    user.addPropertysheet(propfinder_id, data)

            groups = self._getGroupsForPrincipal(user, request,
                                                 plugins=plugins)
            user._addGroups(groups)

            rolemakers = plugins.listPlugins(IRolesPlugin)

            for rolemaker_id, rolemaker in rolemakers:
                try:
                    roles = rolemaker.getRolesForPrincipal(user, request)
                except _SWALLOWABLE_PLUGIN_EXCEPTIONS:
                    # Logged and skipped;  'reraise' is not consulted here.
                    logger.debug('IRolesPlugin %s error' % rolemaker_id,
                                 exc_info=True)
                else:
                    if roles:
                        user._addRoles(roles)

            user._addRoles(['Authenticated'])

            # Cache the user if caching is enabled
            base_user = aq_base(user)
            # Users that have a '_p_jar' are not put into the cache.
            if getattr(base_user, '_p_jar', None) is None:
                self.ZCacheable_set(base_user, view_name=view_name,
                                    keywords=keywords)

        return user.__of__(self)
1✔
752

753
    @security.private
    def _verifyUser(self, plugins, user_id=None, login=None):
        """ user_id -> info_dict or None

        o Look the user up by exact match on 'user_id' and / or the
          transformed 'login', using the cache first and then each
          IUserEnumerationPlugin in turn.

        o Return the first info found, or None if both criteria are
          None or no enumerator knows the user.
        """
        if user_id is None and login is None:
            # Avoid possible hugely expensive and/or wrong behavior of
            # plugin enumerators.
            return None

        criteria = {'exact_match': True}

        if user_id is not None:
            criteria['id'] = user_id

        if login is not None:
            criteria['login'] = self.applyTransform(login)

        view_name = createViewName('_verifyUser', user_id or login)
        keywords = createKeywords(**criteria)
        cached_info = self.ZCacheable_get(view_name=view_name,
                                          keywords=keywords, default=None)

        if cached_info is not None:
            return cached_info

        enumerators = plugins.listPlugins(IUserEnumerationPlugin)

        for enumerator_id, enumerator in enumerators:
            try:
                info = enumerator.enumerateUsers(**criteria)

                if info:
                    # Put the computed value into the cache
                    self.ZCacheable_set(info[0], view_name=view_name,
                                        keywords=keywords)
                    return info[0]

            except _SWALLOWABLE_PLUGIN_EXCEPTIONS:
                # Logged and skipped, unless the plugin asks for its
                # exceptions to propagate.
                reraise(enumerator)
                msg = 'UserEnumerationPlugin %s error' % enumerator_id
                logger.debug(msg, exc_info=True)

        return None
1✔
796

797
    @security.private
1✔
798
    def _authorizeUser(self, user, accessed, container, name, value,
                       roles=_noroles):
        """ -> 1 if the security manager validates the access, else 0.

        o The user is re-wrapped in this user folder and installed as
          the current security manager's user before validating.

        o If validation raises, the security manager is cleared again;
          Unauthorized is turned into a 0 result, any other exception
          propagates to the caller.
        """
        user = aq_base(user).__of__(self)
        newSecurityManager(None, user)
        manager = getSecurityManager()

        # Only forward 'roles' when the caller actually supplied them.
        extra = () if roles is _noroles else (roles,)

        try:
            granted = manager.validate(accessed, container, name, value,
                                       *extra)
        except Unauthorized:
            noSecurityManager()
            return 0
        except:  # noqa
            noSecurityManager()
            raise

        return 1 if granted else 0
1✔
826

827
    @security.private
1✔
828
    def _isTop(self):
        """ Are we the user folder in the root object?

        o Returns the parent's 'isTopLevelPrincipiaApplicationObject'
          attribute, or 0 when there is no parent or the parent lacks
          that attribute.
        """
        try:
            holder = aq_base(aq_parent(self))
            if holder is not None:
                answer = holder.isTopLevelPrincipiaApplicationObject
            else:
                answer = 0
        except AttributeError:
            answer = 0
        return answer
1✔
838

839
    @security.private
1✔
840
    def _getObjectContext(self, v, request):
        """ request -> (a, c, n, v)

        o 'a' is the object the object was accessed through

        o 'c' is the physical container of the object

        o 'n' is the name used to access the object

        o 'v' is the object (value) we're validating access to

        o ???:  Lifted from AccessControl.User.BasicUserFolder._getobcontext
        """
        steps = request.steps
        if not steps:  # someone deleted root index_html
            request['RESPONSE'].notFoundError(
                'no default view (root default view was probably deleted)')

        parents = request['PARENTS']
        root = parents[-1]
        request_container = aq_parent(root)

        n = steps[-1]

        # default to accessed and container as v.aq_parent
        a = c = parents[0]

        # try to find actual container
        physical_parent = aq_parent(aq_inner(v))

        if physical_parent is not None:
            # this is not a method, we needn't treat it specially
            c = physical_parent
        elif hasattr(v, '__self__'):
            # this is a method, we need to treat it specially
            # NOTE(review): the container becomes aq_inner(v), not the
            # bound instance v.__self__ -- confirm that this is intended.
            c = aq_inner(v)

        # if pub's aq_parent or container is the request container, it
        # means pub was accessed from the root
        if a is request_container:
            a = root
        if c is request_container:
            c = root

        return a, c, n, v
1✔
890

891
    @security.private
1✔
892
    def _getEmergencyUser(self):
        """ Return the emergency user, wrapped in this user folder.
        """
        wrapped = emergency_user.__of__(self)
        return wrapped
1✔
895

896
    @security.private
1✔
897
    def _doAddUser(self, login, password, roles, domains, **kw):
        """ Create a user with login, password and roles.

        o Requires at least one IUserAdderPlugin and at least one
          IRoleAssignerPlugin; raises NotImplementedError otherwise.

        o The (transformed) login is offered to each user adder in turn
          until one reports success; the user is then looked up with
          getUser().

        o Returns the user, or None if no adder produced one.  'domains'
          and extra keyword arguments are accepted but not used here.
        """
        registry = self._getOb('plugins')
        adders = registry.listPlugins(IUserAdderPlugin)
        assigners = registry.listPlugins(IRoleAssignerPlugin)

        login = self.applyTransform(login)

        if not adders or not assigners:
            raise NotImplementedError(
                'There are no plugins that can create'
                ' users and assign roles to them.')

        created = None
        for _adder_id, adder in adders:
            if adder.doAddUser(login, password):
                # ???: Adds user to cache, but without roles...
                created = self.getUser(login)
                break

        # No useradder was successful.
        if created is None:
            return

        for assigner_id, assigner in assigners:
            for role in roles:
                try:
                    assigner.doAssignRoleToPrincipal(created.getId(), role)
                except _SWALLOWABLE_PLUGIN_EXCEPTIONS:
                    # reraise() gets the first say; otherwise the
                    # failure is logged and the next role is tried.
                    reraise(assigner)
                    logger.debug('RoleAssigner %s error' % assigner_id,
                                 exc_info=True)

        notify(PrincipalCreated(created))
        return created
1✔
937

938
    @security.public
1✔
939
    def all_meta_types(self):
        """ What objects can be put in here?

        o Only the registered multi-plugins and the RAMCacheManager
          are allowed.

        o Each returned entry gets a 'zmi_show_add_dialog' key set to
          'modal' or '', taken from the same-named attribute of the
          entry's 'instance' (default: modal).
        """
        import Products

        flag = 'zmi_show_add_dialog'
        addable = tuple(MultiPlugins) + (RAMCacheManager.meta_type,)
        selected = [info for info in Products.meta_types
                    if info['name'] in addable]

        for info in selected:
            instance = info.get('instance', None)
            if getattr(instance, flag, True):
                info[flag] = 'modal'
            else:
                info[flag] = ''

        return selected
×
954

955
    @security.private
1✔
956
    def manage_beforeDelete(self, item, container):
        """ Undo what manage_afterAdd set up when we ourselves are deleted.

        o Removes the container's '__allow_groups__' attribute (best
          effort) and unregisters our before-traverse hook.
        """
        if item is not self:
            return

        try:
            del container.__allow_groups__
        except:  # noqa
            # Best effort only: the attribute may already be gone.
            pass

        hook_name = self.meta_type + '/' + self.getId()
        BeforeTraverse.unregisterBeforeTraverse(container, hook_name)
1✔
965

966
    @security.private
1✔
967
    def manage_afterAdd(self, item, container):
        """ Hook ourselves into the container after being added to it.

        o Sets the container's '__allow_groups__' to our unwrapped
          self and registers a before-traverse hook that calls us by id.
        """
        if item is not self:
            return

        container.__allow_groups__ = aq_base(self)

        own_id = self.getId()
        hook_name = self.meta_type + '/' + own_id
        target = container.this()
        caller = BeforeTraverse.NameCaller(own_id)
        BeforeTraverse.registerBeforeTraverse(target, caller, hook_name)
1✔
975

976
    def __call__(self, container, req):
        """ The __before_publishing_traverse__ hook.

        o Pushes the response's current '_unauthorized' handler onto
          its '_unauthorized_stack', installs our own handler instead
          and resets the '_has_challenged' flag.

        o A ResponseCleanup object is held by the request so the
          response is restored when that object is finalized.
        """
        response = req['RESPONSE']
        req._hold(ResponseCleanup(response))

        previous = getattr(response, '_unauthorized_stack', [])
        previous.append(response._unauthorized)
        response._unauthorized_stack = previous

        response._unauthorized = self._unauthorized
        response._has_challenged = False
1✔
986

987
    @security.private
1✔
988
    def applyTransform(self, value):
        """ Transform for login name.

        Apply the configured login transform (if any) to 'value', for
        example to make it lower case.

        'value' is either a single string or a sequence (list, tuple)
        of strings.  A tuple yields a tuple; any other sequence yields
        a list.  Empty values, and any value when no transform is
        configured, are returned unchanged.
        """
        if not value:
            return value

        transform = self._get_login_transform_method()
        if not transform:
            return value

        if isinstance(value, str):
            return transform(value)

        converted = [transform(item) for item in value]
        return tuple(converted) if isinstance(value, tuple) else converted
1✔
1010

1011
    @security.private
1✔
1012
    def _get_login_transform_method(self):
1✔
1013
        """ Get the transform method for the login name or None.
1014
        """
1015
        login_transform = getattr(self, 'login_transform', None)
1✔
1016
        if not login_transform:
1✔
1017
            return
1✔
1018
        transform = getattr(self, login_transform.strip(), None)
1✔
1019
        if transform is None:
1✔
1020
            logger.debug('Transform method %r not found in plugin %r.',
1✔
1021
                         self.login_transform, self)
1022
            return
1✔
1023
        return transform
1✔
1024

1025
    @security.private
1✔
1026
    def _setPropValue(self, id, value):
        """ Set a property; re-canonicalize logins if the transform changed.

        When 'login_transform' is set to a new, non-empty value, all
        existing login names are updated via updateAllLoginNames().
        """
        is_transform = (id == 'login_transform')
        if is_transform:
            # Remember the old value so a real change can be detected.
            orig_value = getattr(self, id)

        super()._setPropValue(id, value)

        if is_transform and value and value != orig_value:
            logger.debug('login_transform changed from %r to %r. '
                         'Updating existing login names.', orig_value, value)
            self.updateAllLoginNames()
×
1034

1035
    @security.private
1✔
1036
    def lower(self, value):
        """ Transform for login name.

        Strip surrounding whitespace from the value and lowercase it.

        To use this, set login_transform to 'lower'.
        """
        stripped = value.strip()
        return stripped.lower()
1✔
1044

1045
    @security.private
1✔
1046
    def upper(self, value):
        """ Transform for login name.

        Strip surrounding whitespace from the value and uppercase it.

        To use this, set login_transform to 'upper'.
        """
        stripped = value.strip()
        return stripped.upper()
1✔
1054

1055
    #
1056
    # Response override
1057
    #
1058
    def _unauthorized(self):
1✔
1059
        req = self.REQUEST
1✔
1060
        resp = req['RESPONSE']
1✔
1061
        if resp._has_challenged:  # Been here already
1!
1062
            return
×
1063
        if not self.challenge(req, resp):
1✔
1064
            # Need to fall back here
1065
            resp = self._cleanupResponse()
1✔
1066
            resp._unauthorized()
1✔
1067
        else:
1068
            resp._has_challenged = True
1✔
1069

1070
    def challenge(self, request, response):
        """ Let the challenge plugins challenge the user.

        o Protocol chooser plugins may restrict which protocols are
          valid for this request; an empty list means no restriction.

        o Once one challenger fires, only further challengers with the
          same protocol are called.

        o Returns True if any challenger fired, else False.
        """
        plugins = self._getOb('plugins')

        try:
            choosers = plugins.listPlugins(IChallengeProtocolChooser)
        except KeyError:
            # Work around the fact that old instances might not have
            # IChallengeProtocolChooser registered with the
            # PluginRegistry.
            choosers = []

        # Find valid protocols for this request type
        valid_protocols = []
        for _chooser_id, chooser in choosers:
            chosen = chooser.chooseProtocols(request)
            if chosen is not None:
                valid_protocols.extend(chosen)

        # Go through all challenge plugins
        fired_protocol = None
        for challenger_id, challenger in plugins.listPlugins(
                IChallengePlugin):
            candidate = getattr(challenger, 'protocol', challenger_id)

            if valid_protocols and candidate not in valid_protocols:
                # Skip invalid protocol for this request type.
                continue

            if fired_protocol is not None and fired_protocol != candidate:
                # A different protocol already fired.
                continue

            if challenger.challenge(request, response):
                fired_protocol = candidate

        # True: something fired, so it was a successful PAS challenge.
        # False: nothing fired, so the caller triggers the fallback.
        return fired_protocol is not None
1✔
1111

1112
    def _cleanupResponse(self):
1✔
1113
        resp = self.REQUEST['RESPONSE']
1✔
1114
        # No errors of any sort may propagate, and we don't care *what*
1115
        # they are, even to log them.
1116
        stack = getattr(resp, '_unauthorized_stack', [])
1✔
1117

1118
        if stack:
1!
1119
            resp._unauthorized = stack.pop()
1✔
1120
        else:
1121
            try:
×
1122
                del resp._unauthorized
×
1123
            except:  # noqa
×
1124
                pass
×
1125

1126
        return resp
1✔
1127

1128
    @security.public
1✔
1129
    def hasUsers(self):
        """Zope quick start sacrifice.

        The quick start page expects a hasUsers() method; this one
        always answers True.
        """
        answer = True
        return answer
×
1135

1136
    @security.public
1✔
1137
    def updateCredentials(self, request, response, login, new_password):
        """Central updateCredentials method

        This method is needed for cases where the credentials storage and
        the credentials extraction is handled by different plugins. Example
        case would be if the CookieAuthHelper is used as a Challenge and
        Extraction plugin only to take advantage of the login page feature
        but the credentials are not stored in the CookieAuthHelper cookie
        but somewhere else, like in a Session.

        The login is passed through applyTransform() and then handed to
        every active ICredentialsUpdatePlugin.
        """
        login = self.applyTransform(login)
        registry = self._getOb('plugins')

        for _plugin_id, plugin in registry.listPlugins(
                ICredentialsUpdatePlugin):
            plugin.updateCredentials(request, response, login, new_password)
×
1153

1154
    @security.public
1✔
1155
    def logout(self, REQUEST):
        """Publicly accessible method to log out a user

        Resets the credentials and, when the request carries a referrer,
        redirects to the local form of that URL.
        """
        self.resetCredentials(REQUEST, REQUEST['RESPONSE'])

        # Little bit of a hack: Issuing a redirect to the same place
        # where the user was so that in the second request the now-destroyed
        # credentials can be acted upon to e.g. go back to the login page
        came_from = REQUEST.get('HTTP_REFERER')  # optional header
        if not came_from:
            return
        REQUEST['RESPONSE'].redirect(url_local(came_from))
×
1166

1167
    @security.public
1✔
1168
    def resetCredentials(self, request, response):
        """Reset credentials by informing all active resetCredentials plugins

        Nothing is done when the current user is the 'nobody' user.
        """
        current = getSecurityManager().getUser()
        if aq_base(current) is nobody:
            return

        registry = self._getOb('plugins')
        for _plugin_id, plugin in registry.listPlugins(
                ICredentialsResetPlugin):
            plugin.resetCredentials(request, response)
1✔
1178

1179
    @security.protected(ManageUsers)
1✔
1180
    def updateLoginName(self, user_id, login_name):
        """Update login name of user.

        The login name is passed through applyTransform() before it is
        handed to the plugins.
        """
        logger.debug('Called updateLoginName, user_id=%r, login_name=%r',
                     user_id, login_name)
        transformed = self.applyTransform(login_name)
        # Note: we do not call self.getUserById(user_id) here to see
        # if this user exists, or call getUserName() on the answer to
        # compare it with the transformed login name, because the user
        # may be reported with a login name that is transformed on the
        # fly, for example in the _verifyUser call, even though the
        # plugin has not actually transformed the login name yet in
        # the backend.
        self._updateLoginName(user_id, transformed)
1✔
1194

1195
    @security.public
1✔
1196
    def updateOwnLoginName(self, login_name):
        """Update own login name of authenticated user.

        Does nothing when the current user is the 'nobody' user.
        """
        logger.debug('Called updateOwnLoginName, login_name=%r', login_name)
        transformed = self.applyTransform(login_name)

        current = getSecurityManager().getUser()
        if aq_base(current) is not nobody:
            # Note: we do not compare the login name here, because the
            # user may be reported with a login name that is transformed
            # on the fly even though the plugin has not stored it yet.
            self._updateLoginName(current.getId(), transformed)
1✔
1208

1209
    def _updateLoginName(self, user_id, login_name):
        """ Ask the user enumeration plugins to store a new login name.

        o 'login_name' is used as given; callers are expected to have
          applied the login transform already.

        o Plugins without an 'updateUser' method are skipped with a
          warning.

        o Raises ValueError when no plugin reports success, so that an
          already-taken login name does not fail silently.
        """
        # Note: we do not compare the login name with the user's current
        # one here, because the user may be reported with a login name
        # that is transformed on the fly even though the plugin has not
        # stored the transformed name in its backend yet.
        plugins = self._getOb('plugins')
        updaters = plugins.listPlugins(IUserEnumerationPlugin)

        # Call the updaters.  One of them *must* succeed without an
        # exception, even if it does not change anything.  When a
        # login name is already taken, we do not want to fail
        # silently.
        success = False
        for updater_id, updater in updaters:
            if not hasattr(updater, 'updateUser'):
                # This was a later addition to the interface, so we
                # are forgiving.
                logger.warning('%s plugin lacks updateUser method of '
                               'IUserEnumerationPlugin.', updater_id)
                continue
            try:
                result = updater.updateUser(user_id, login_name)
            except _SWALLOWABLE_PLUGIN_EXCEPTIONS:
                # reraise() gets the first say; otherwise the failure
                # is logged and the next plugin is tried.
                reraise(updater)
                logger.debug('UpdateLoginNamePlugin %s error', updater_id,
                             exc_info=True)
            else:
                if result:
                    success = True
                    logger.debug('login name changed to: %r', login_name)

        if not success:
            raise ValueError('Cannot update login name of user %r to %r. '
                             'Possibly duplicate.' % (user_id, login_name))
1241

1242
    @security.protected(ManageUsers)
1✔
1243
    def updateAllLoginNames(self, quit_on_first_error=True):
        """Update login names of all users to their canonical value.

        This should be done after changing the login_transform
        property of PAS.

        You can set quit_on_first_error to False to report all errors
        before quitting with an error.  This can be useful if you want
        to know how many problems there are, if any.

        Plugins without an 'updateEveryLoginName' method are skipped
        with a warning; exceptions raised by a plugin propagate.
        """
        plugins = self._getOb('plugins')
        updaters = plugins.listPlugins(IUserEnumerationPlugin)
        for updater_id, updater in updaters:
            if not hasattr(updater, 'updateEveryLoginName'):
                # This was a later addition to the interface, so we
                # are forgiving.
                logger.warning(
                    '%s plugin lacks updateEveryLoginName method of '
                    'IUserEnumerationPlugin.', updater_id)
                continue
            # Note: do not swallow any exceptions here.
            updater.updateEveryLoginName(
                quit_on_first_error=quit_on_first_error)
1265

1266

1267
# Declare the interfaces implemented by PluggableAuthService.
classImplements(
    PluggableAuthService,
    IPluggableAuthService,
    IObjectManager,
    IPropertyManager,
)


# presumably applies the class's security declarations -- confirm
InitializeClass(PluggableAuthService)
1✔
1272

1273

1274
class ResponseCleanup:
    """ Restore a response's '_unauthorized' handler on finalization.

    o The oldest handler on the response's '_unauthorized_stack' is put
      back (emptying the stack); without one, the '_unauthorized'
      attribute is deleted instead.
    """

    def __init__(self, resp):
        self.resp = resp

    def __del__(self):
        # Free the references.
        #
        # No errors of any sort may propagate, and we don't care *what*
        # they are, even to log them.
        response = self.resp
        pending = getattr(response, '_unauthorized_stack', [])

        # Drain the stack; the last handler popped is the oldest one.
        oldest = None
        while pending:
            oldest = pending.pop()

        if oldest is None:
            try:
                del response._unauthorized
            except:  # noqa
                pass
        else:
            response._unauthorized = oldest

        try:
            del self.resp
        except:  # noqa
            pass
×
1301

1302

1303
# Plugin types handed to the PluginRegistry of a new PAS.  Each entry is
# (interface, interface name, short id, human-readable description).
_PLUGIN_TYPE_INFO = (
    (IExtractionPlugin, 'IExtractionPlugin', 'extraction',
     'Extraction plugins are responsible for extracting credentials '
     'from the request.'),
    (IAuthenticationPlugin, 'IAuthenticationPlugin', 'authentication',
     'Authentication plugins are responsible for validating credentials '
     'generated by the Extraction Plugin.'),
    (IChallengePlugin, 'IChallengePlugin', 'challenge',
     'Challenge plugins initiate a challenge to the user to provide '
     'credentials.'),
    (ICredentialsUpdatePlugin, 'ICredentialsUpdatePlugin',
     'update credentials',
     'Credential update plugins respond to the user changing '
     'credentials.'),
    (ICredentialsResetPlugin, 'ICredentialsResetPlugin', 'reset credentials',
     'Credential clear plugins respond to a user logging out.'),
    (IUserFactoryPlugin, 'IUserFactoryPlugin', 'userfactory', 'Create users.'),
    (IAnonymousUserFactoryPlugin, 'IAnonymousUserFactoryPlugin',
     'anonymoususerfactory', 'Create anonymous users.'),
    (IPropertiesPlugin, 'IPropertiesPlugin', 'properties',
     'Properties plugins generate property sheets for users.'),
    (IGroupsPlugin, 'IGroupsPlugin', 'groups',
     'Groups plugins determine the groups to which a user belongs.'),
    (IRolesPlugin, 'IRolesPlugin', 'roles',
     'Roles plugins determine the global roles which a user has.'),
    (IUpdatePlugin, 'IUpdatePlugin', 'update',
     'Update plugins allow the user or the application to update '
     "the user's properties."),  # noqa: Q000
    (IValidationPlugin, 'IValidationPlugin', 'validation',
     'Validation plugins specify allowable values for user properties '
     '(e.g., minimum password length, allowed characters, etc.)'),
    (IUserEnumerationPlugin, 'IUserEnumerationPlugin', 'user_enumeration',
     'Enumeration plugins allow querying users by ID, and searching for '
     'users who match particular criteria.'),
    (IUserAdderPlugin, 'IUserAdderPlugin', 'user_adder',
     'User Adder plugins allow the Pluggable Auth Service to create users.'),
    (IGroupEnumerationPlugin, 'IGroupEnumerationPlugin', 'group_enumeration',
     'Enumeration plugins allow querying groups by ID.'),
    (IRoleEnumerationPlugin, 'IRoleEnumerationPlugin', 'role_enumeration',
     'Enumeration plugins allow querying roles by ID.'),
    (IRoleAssignerPlugin, 'IRoleAssignerPlugin', 'role_assigner',
     'Role Assigner plugins allow the Pluggable Auth Service to assign'
     ' roles to principals.'),
    (IChallengeProtocolChooser, 'IChallengeProtocolChooser',
     'challenge_protocol_chooser',
     # The trailing space keeps the two fragments from running together.
     'Challenge Protocol Chooser plugins decide what authorization '
     'protocol to use for a given request type.'),
    (IRequestTypeSniffer, 'IRequestTypeSniffer', 'request_type_sniffer',
     'Request Type Sniffer plugins detect the type of an incoming request.'),
    (INotCompetentPlugin, 'INotCompetentPlugin', 'notcompetent',
     'Not-Competent plugins check whether this user folder should not '
     'authenticate the current request. '
     'These plugins are not used for a top level user folder. '
     'They are typically used to prevent shadowing of authentications by '
     'higher level user folders.'))
1358

1359

1360
def addPluggableAuthService(dispatcher, base_profile=None,
                            extension_profiles=(), create_snapshot=True,
                            setup_tool_id='setup_tool', REQUEST=None):
    """ Add a PluggableAuthService to 'dispatcher'.

    o BBB for non-GenericSetup use: the profile, snapshot and setup
      tool arguments are accepted but not used.

    o When a REQUEST is given, redirects to the dispatcher's
      manage_workspace with a status message.
    """
    service = PluggableAuthService()

    registry = PluginRegistry(_PLUGIN_TYPE_INFO)
    registry._setId('plugins')
    service._setObject('plugins', registry)

    dispatcher._setObject(service.getId(), service)

    if REQUEST is None:
        return

    target = ('%s/manage_workspace'
              '?manage_tabs_message='
              'PluggableAuthService+added.' % dispatcher.absolute_url())
    REQUEST['RESPONSE'].redirect(target)
1378

1379

1380
def addConfiguredPASForm(dispatcher):
    """ Wrap the PTF in 'dispatcher', including 'profile_registry' in options.

    o The profiles registered for IPluggableAuthService are split into
      extension profiles and all others, and both groups are passed to
      the 'pasAddForm' template as tuples.
    """
    from Products.GenericSetup import EXTENSION
    from Products.GenericSetup import profile_registry

    form = PageTemplateFile('pasAddForm', _wwwdir).__of__(dispatcher)

    base_infos = []
    extension_infos = []

    for info in profile_registry.listProfileInfo(for_=IPluggableAuthService):
        if info.get('type') == EXTENSION:
            bucket = extension_infos
        else:
            bucket = base_infos
        bucket.append(info)

    return form(base_profiles=tuple(base_infos),
                extension_profiles=tuple(extension_infos))
1399

1400

1401
def addConfiguredPAS(dispatcher, base_profile, extension_profiles=(),
                     create_snapshot=True, setup_tool_id='setup_tool',
                     REQUEST=None):
    """ Add a PluggableAuthService to 'dispatcher' and configure it.

    o A SetupTool is added inside the new PAS and used to run all
      import steps of the base profile, then of each extension profile.

    o Optionally creates an 'initial_configuration' snapshot and, when
      a REQUEST is given, redirects to the dispatcher's
      manage_workspace with a status message.
    """
    from Products.GenericSetup.tool import SetupTool

    service = PluggableAuthService()
    registry = PluginRegistry(_PLUGIN_TYPE_INFO)
    registry._setId('plugins')
    service._setObject('plugins', registry)
    dispatcher._setObject(service.getId(), service)

    # Re-fetch so that we work with the acquisition-wrapped objects.
    service = dispatcher._getOb(service.getId())
    setup_tool = SetupTool(setup_tool_id)
    service._setObject(setup_tool.getId(), setup_tool)
    setup_tool = service._getOb(setup_tool.getId())

    setup_tool.runAllImportStepsFromProfile('profile-%s' % base_profile)
    for profile_id in extension_profiles:
        setup_tool.runAllImportStepsFromProfile('profile-%s' % profile_id)

    if create_snapshot:
        setup_tool.createSnapshot('initial_configuration')

    if REQUEST is None:
        return

    target = ('%s/manage_workspace'
              '?manage_tabs_message='
              'PluggableAuthService+added.' % dispatcher.absolute_url())
    REQUEST['RESPONSE'].redirect(target)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc