• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Products.PluggableAuthService / 5235276870

pending completion
5235276870

push

github

web-flow
Drop support for Python 2.7, 3.5, 3.6. (#116)

* Drop zserver extra in setup.py. Thus dropping FTP support.
* Drop support for Zope < 5.
Co-authored-by: Jens Vagelpohl <jens@plyp.com>

1288 of 1745 branches covered (73.81%)

Branch coverage included in aggregate %.

127 of 127 new or added lines in 30 files covered. (100.0%)

9619 of 10349 relevant lines covered (92.95%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.15
/src/Products/PluggableAuthService/plugins/ZODBUserManager.py
1
##############################################################################
2
#
3
# Copyright (c) 2001 Zope Foundation and Contributors
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
7
# distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
""" Classes: ZODBUserManager
1✔
15
"""
16
import copy
1✔
17
import logging
1✔
18
from hashlib import sha1
1✔
19

20
from AccessControl import ClassSecurityInfo
1✔
21
from AccessControl.class_init import InitializeClass
1✔
22
from AccessControl.requestmethod import postonly
1✔
23
from AccessControl.SecurityManagement import getSecurityManager
1✔
24
from AuthEncoding import AuthEncoding
1✔
25
from BTrees.OOBTree import OOBTree
1✔
26
from OFS.Cache import Cacheable
1✔
27
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
1✔
28
from zope.interface import Interface
1✔
29

30
from ..interfaces.plugins import IAuthenticationPlugin
1✔
31
from ..interfaces.plugins import IUserAdderPlugin
1✔
32
from ..interfaces.plugins import IUserEnumerationPlugin
1✔
33
from ..permissions import ManageUsers
1✔
34
from ..permissions import SetOwnPassword
1✔
35
from ..plugins.BasePlugin import BasePlugin
1✔
36
from ..utils import classImplements
1✔
37
from ..utils import createViewName
1✔
38
from ..utils import csrf_only
1✔
39

40

41
logger = logging.getLogger('PluggableAuthService')
1✔
42

43

44
class IZODBUserManager(Interface):
1✔
45
    """ Marker interface.
46
    """
47

48

49
manage_addZODBUserManagerForm = PageTemplateFile(
1✔
50
    'www/zuAdd', globals(), __name__='manage_addZODBUserManagerForm')
51

52

53
def addZODBUserManager(dispatcher, id, title=None, REQUEST=None):
1✔
54
    """ Add a ZODBUserManager to a Pluggable Auth Service. """
55

56
    zum = ZODBUserManager(id, title)
1✔
57
    dispatcher._setObject(zum.getId(), zum)
1✔
58

59
    if REQUEST is not None:
1!
60
        REQUEST['RESPONSE'].redirect('%s/manage_workspace'
×
61
                                     '?manage_tabs_message='
62
                                     'ZODBUserManager+added.' %
63
                                     dispatcher.absolute_url())
64

65

66
class ZODBUserManager(BasePlugin, Cacheable):
1✔
67

68
    """ PAS plugin for managing users in the ZODB.
69
    """
70

71
    meta_type = 'ZODB User Manager'
1✔
72
    zmi_icon = 'fas fa-user'
1✔
73

74
    security = ClassSecurityInfo()
1✔
75

76
    def __init__(self, id, title=None):
1✔
77

78
        self._id = self.id = id
1✔
79
        self.title = title
1✔
80

81
        self._user_passwords = OOBTree()
1✔
82
        self._login_to_userid = OOBTree()
1✔
83
        self._userid_to_login = OOBTree()
1✔
84

85
    #
86
    #   IAuthenticationPlugin implementation
87
    #
88
    @security.private
1✔
89
    def authenticateCredentials(self, credentials):
1✔
90
        """ See IAuthenticationPlugin.
91

92
        o We expect the credentials to be those returned by
93
          ILoginPasswordExtractionPlugin.
94
        """
95
        login = credentials.get('login')
1✔
96
        password = credentials.get('password')
1✔
97

98
        if login is None or password is None:
1✔
99
            return None
1✔
100

101
        # Do we have a link between login and userid?  Do NOT fall
102
        # back to using the login as userid when there is no match, as
103
        # that gives a high chance of seeming to log in successfully,
104
        # but in reality failing.
105
        userid = self._login_to_userid.get(login)
1✔
106
        if userid is None:
1✔
107
            # Someone may be logging in with a userid instead of a
108
            # login name and the two are not the same.  We could try
109
            # turning those around, but really we should just fail.
110
            #
111
            # userid = login
112
            # login = self._userid_to_login.get(userid)
113
            # if login is None:
114
            #     return None
115
            return None
1✔
116

117
        reference = self._user_passwords.get(userid)
1✔
118

119
        if reference is None:
1!
120
            return None
×
121

122
        if AuthEncoding.is_encrypted(reference):
1✔
123
            if AuthEncoding.pw_validate(reference, password):
1✔
124
                return userid, login
1✔
125

126
        # Support previous naive behavior
127
        if isinstance(password, str):
1!
128
            password = password.encode('utf8')
1✔
129
        digested = sha1(password).hexdigest()
1✔
130

131
        if reference == digested:
1✔
132
            return userid, login
1✔
133

134
        return None
1✔
135

136
    #
137
    #   IUserEnumerationPlugin implementation
138
    #
139
    @security.private
1✔
140
    def enumerateUsers(self, id=None, login=None, exact_match=False,
1✔
141
                       sort_by=None, max_results=None, **kw):
142
        """ See IUserEnumerationPlugin.
143
        """
144
        user_info = []
1✔
145
        user_ids = []
1✔
146
        plugin_id = self.getId()
1✔
147
        view_name = createViewName('enumerateUsers', id or login)
1✔
148

149
        if isinstance(id, str):
1✔
150
            id = [id]
1✔
151

152
        if isinstance(login, str):
1✔
153
            login = [login]
1✔
154

155
        # Look in the cache first...
156
        keywords = copy.deepcopy(kw)
1✔
157
        keywords.update({'id': id, 'login': login,
1✔
158
                         'exact_match': exact_match, 'sort_by': sort_by,
159
                         'max_results': max_results})
160
        cached_info = self.ZCacheable_get(view_name=view_name,
1✔
161
                                          keywords=keywords, default=None)
162
        if cached_info is not None:
1!
163
            return tuple(cached_info)
×
164

165
        terms = id or login
1✔
166

167
        if exact_match:
1✔
168
            if terms:
1!
169

170
                if id:
1✔
171
                    # if we're doing an exact match based on id, it
172
                    # absolutely will have been qualified (if we have a
173
                    # prefix), so we can ignore any that don't begin with
174
                    # our prefix
175
                    id = [x for x in id if x.startswith(self.prefix)]
1✔
176
                    user_ids.extend([x[len(self.prefix):] for x in id])
1✔
177
                elif login:
1!
178
                    user_ids.extend([self._login_to_userid.get(x)
1✔
179
                                     for x in login])
180

181
                # we're claiming an exact match search, if we still don't
182
                # have anything, better bail.
183
                if not user_ids:
1✔
184
                    return ()
1✔
185
            else:
186
                # insane - exact match with neither login nor id
187
                return ()
×
188

189
        if user_ids:
1✔
190
            user_filter = None
1✔
191

192
        else:   # Searching
193
            user_ids = self.listUserIds()
1✔
194
            user_filter = _ZODBUserFilter(id, login, **kw)
1✔
195

196
        for user_id in user_ids:
1✔
197

198
            if self._userid_to_login.get(user_id):
1✔
199
                e_url = '%s/manage_users' % self.getId()
1✔
200
                qs = 'user_id=%s' % user_id
1✔
201

202
                info = {'id': self.prefix + user_id,
1✔
203
                        'login': self._userid_to_login[user_id],
204
                        'pluginid': plugin_id,
205
                        'editurl': f'{e_url}?{qs}'}
206

207
                if not user_filter or user_filter(info):
1✔
208
                    user_info.append(info)
1✔
209

210
        # Put the computed value into the cache
211
        self.ZCacheable_set(user_info, view_name=view_name, keywords=keywords)
1✔
212

213
        return tuple(user_info)
1✔
214

215
    #
216
    #   IUserAdderPlugin implementation
217
    #
218
    @security.private
1✔
219
    def doAddUser(self, login, password):
1✔
220
        try:
1✔
221
            self.addUser(login, login, password)
1✔
222
        except KeyError:
×
223
            return False
×
224
        return True
1✔
225

226
    #
227
    #   (notional)IZODBUserManager interface
228
    #
229
    @security.protected(ManageUsers)
1✔
230
    def listUserIds(self):
1✔
231
        """ -> (user_id_1, ... user_id_n)
232
        """
233
        return self._user_passwords.keys()
1✔
234

235
    @security.protected(ManageUsers)
1✔
236
    def getUserInfo(self, user_id):
1✔
237
        """ user_id -> dict
238
        """
239
        return {'user_id': user_id,
1✔
240
                'login_name': self._userid_to_login[user_id],
241
                'pluginid': self.getId()}
242

243
    @security.protected(ManageUsers)
1✔
244
    def listUserInfo(self):
1✔
245
        """ -> (dict, ...dict)
246

247
        o Return one mapping per user, with the following keys:
248

249
          - 'user_id'
250
          - 'login_name'
251
        """
252
        return [self.getUserInfo(x) for x in self._user_passwords.keys()]
1✔
253

254
    @security.protected(ManageUsers)
1✔
255
    def getUserIdForLogin(self, login_name):
1✔
256
        """ login_name -> user_id
257

258
        o Raise KeyError if no user exists for the login name.
259
        """
260
        return self._login_to_userid[login_name]
1✔
261

262
    @security.protected(ManageUsers)
1✔
263
    def getLoginForUserId(self, user_id):
1✔
264
        """ user_id -> login_name
265

266
        o Raise KeyError if no user exists for that ID.
267
        """
268
        return self._userid_to_login[user_id]
1✔
269

270
    @security.private
1✔
271
    def addUser(self, user_id, login_name, password):
1✔
272

273
        if self._user_passwords.get(user_id) is not None:
1✔
274
            raise KeyError('Duplicate user ID: %s' % user_id)
1✔
275

276
        if self._login_to_userid.get(login_name) is not None:
1✔
277
            raise KeyError('Duplicate login name: %s' % login_name)
1✔
278

279
        self._user_passwords[user_id] = self._pw_encrypt(password)
1✔
280
        self._login_to_userid[login_name] = user_id
1✔
281
        self._userid_to_login[user_id] = login_name
1✔
282

283
        # enumerateUsers return value has changed
284
        view_name = createViewName('enumerateUsers')
1✔
285
        self.ZCacheable_invalidate(view_name=view_name)
1✔
286

287
    @security.private
1✔
288
    def updateUser(self, user_id, login_name):
1✔
289

290
        # The following raises a KeyError if the user_id is invalid
291
        old_login = self.getLoginForUserId(user_id)
1✔
292

293
        if old_login != login_name:
1✔
294

295
            if self._login_to_userid.get(login_name) is not None:
1✔
296
                raise ValueError('Login name not available: %s' % login_name)
1✔
297

298
            del self._login_to_userid[old_login]
1✔
299
            self._login_to_userid[login_name] = user_id
1✔
300
            self._userid_to_login[user_id] = login_name
1✔
301
        # Signal success.
302
        return True
1✔
303

304
    @security.private
1✔
305
    def updateEveryLoginName(self, quit_on_first_error=True):
1✔
306
        # Update all login names to their canonical value.  This
307
        # should be done after changing the login_transform property
308
        # of pas.  You can set quit_on_first_error to False to report
309
        # all errors before quitting with an error.  This can be
310
        # useful if you want to know how many problems there are, if
311
        # any.
312
        pas = self._getPAS()
1✔
313
        transform = pas._get_login_transform_method()
1✔
314
        if not transform:
1✔
315
            logger.warning('PAS has a non-existing, empty or wrong '
1✔
316
                           'login_transform property.')
317
            return
1✔
318

319
        # Make a fresh mapping, as we do not want to add or remove
320
        # items to the original mapping while we are iterating over
321
        # it.
322
        new_login_to_userid = OOBTree()
1✔
323
        errors = []
1✔
324
        for old_login_name, user_id in self._login_to_userid.items():
1✔
325
            new_login_name = transform(old_login_name)
1✔
326
            if new_login_name in new_login_to_userid:
1!
327
                logger.error('User id %s: login name %r already taken.',
×
328
                             user_id, new_login_name)
329
                errors.append(new_login_name)
×
330
                if quit_on_first_error:
×
331
                    break
×
332
            new_login_to_userid[new_login_name] = user_id
1✔
333
            if new_login_name != old_login_name:
1!
334
                self._userid_to_login[user_id] = new_login_name
1✔
335
                # Also, remove from the cache
336
                view_name = createViewName('enumerateUsers', user_id)
1✔
337
                self.ZCacheable_invalidate(view_name=view_name)
1✔
338
                logger.debug('User id %s: changed login name from %r to %r.',
1✔
339
                             user_id, old_login_name, new_login_name)
340

341
        # If there were errors, we do not want to save any changes.
342
        if errors:
1!
343
            logger.error('There were %d errors when updating login names. '
×
344
                         'quit_on_first_error was %r', len(errors),
345
                         quit_on_first_error)
346
            # Make sure the exception we raise is not swallowed.
347
            self._dont_swallow_my_exceptions = True
×
348
            raise ValueError('Transformed login names are not unique: %s.' %
×
349
                             ', '.join(errors))
350

351
        # Make sure we did not lose any users.
352
        assert (len(self._login_to_userid.keys())
1✔
353
                == len(new_login_to_userid.keys()))
354
        # Empty the main cache.
355
        view_name = createViewName('enumerateUsers')
1✔
356
        self.ZCacheable_invalidate(view_name=view_name)
1✔
357
        # Store the new login mapping.
358
        self._login_to_userid = new_login_to_userid
1✔
359

360
    @security.private
1✔
361
    def removeUser(self, user_id):
1✔
362

363
        if self._user_passwords.get(user_id) is None:
1✔
364
            raise KeyError('Invalid user ID: %s' % user_id)
1✔
365

366
        login_name = self._userid_to_login[user_id]
1✔
367

368
        del self._user_passwords[user_id]
1✔
369
        del self._login_to_userid[login_name]
1✔
370
        del self._userid_to_login[user_id]
1✔
371

372
        # Also, remove from the cache
373
        view_name = createViewName('enumerateUsers')
1✔
374
        self.ZCacheable_invalidate(view_name=view_name)
1✔
375
        view_name = createViewName('enumerateUsers', user_id)
1✔
376
        self.ZCacheable_invalidate(view_name=view_name)
1✔
377

378
    @security.private
1✔
379
    def updateUserPassword(self, user_id, password):
1✔
380

381
        if self._user_passwords.get(user_id) is None:
1!
382
            raise KeyError('Invalid user ID: %s' % user_id)
×
383

384
        if password:
1!
385
            self._user_passwords[user_id] = self._pw_encrypt(password)
1✔
386

387
    @security.private
1✔
388
    def _pw_encrypt(self, password):
1✔
389
        """Returns the AuthEncoding encrypted password
390

391
        If 'password' is already encrypted, it is returned
392
        as is and not encrypted again.
393
        """
394
        if AuthEncoding.is_encrypted(password):
1✔
395
            return password
1✔
396
        return AuthEncoding.pw_encrypt(password)
1✔
397

398
    #
399
    #   ZMI
400
    #
401
    manage_options = (({'label': 'Users', 'action': 'manage_users'},)
1✔
402
                      + BasePlugin.manage_options
403
                      + Cacheable.manage_options)
404

405
    security.declarePublic('manage_widgets')  # NOQA: D001
1✔
406
    manage_widgets = PageTemplateFile('www/zuWidgets', globals(),
1✔
407
                                      __name__='manage_widgets')
408

409
    security.declareProtected(ManageUsers, 'manage_users')  # NOQA: D001
1✔
410
    manage_users = PageTemplateFile('www/zuUsers', globals(),
1✔
411
                                    __name__='manage_users')
412

413
    @security.protected(ManageUsers)
1✔
414
    @csrf_only
1✔
415
    @postonly
1✔
416
    def manage_addUser(self, user_id, login_name, password, confirm,
1✔
417
                       RESPONSE=None, REQUEST=None):
418
        """ Add a user via the ZMI.
419
        """
420
        if password != confirm:
×
421
            message = 'password+and+confirm+do+not+match'
×
422

423
        elif not user_id:
×
424
            message = 'Please+provide+a+User+ID'
×
425

426
        else:
427

428
            if not login_name:
×
429
                login_name = user_id
×
430

431
            # ???:  validate 'user_id', 'login_name' against policies?
432

433
            self.addUser(user_id, login_name, password)
×
434

435
            message = 'User+added'
×
436

437
        if RESPONSE is not None:
×
438
            RESPONSE.redirect('%s/manage_users?manage_tabs_message=%s' %
×
439
                              (self.absolute_url(), message))
440

441
    @security.protected(ManageUsers)
1✔
442
    @csrf_only
1✔
443
    @postonly
1✔
444
    def manage_updateUserPassword(self, user_id, password, confirm,
1✔
445
                                  RESPONSE=None, REQUEST=None):
446
        """ Update a user's login name / password via the ZMI.
447
        """
448
        if password and password != confirm:
1!
449
            message = 'password+and+confirm+do+not+match'
×
450

451
        else:
452

453
            self.updateUserPassword(user_id, password)
1✔
454

455
            message = 'password+updated'
1✔
456

457
        if RESPONSE is not None:
1!
458
            RESPONSE.redirect('%s/manage_users?manage_tabs_message=%s' %
×
459
                              (self.absolute_url(), message))
460

461
    @security.protected(ManageUsers)
1✔
462
    @csrf_only
1✔
463
    @postonly
1✔
464
    def manage_updateUser(self, user_id, login_name, RESPONSE=None,
1✔
465
                          REQUEST=None):
466
        """ Update a user's login name via the ZMI.
467
        """
468
        if not login_name:
×
469
            login_name = user_id
×
470

471
        # ???:  validate 'user_id', 'login_name' against policies?
472

473
        self.updateUser(user_id, login_name)
×
474

475
        message = 'Login+name+updated'
×
476

477
        if RESPONSE is not None:
×
478
            RESPONSE.redirect('%s/manage_users?manage_tabs_message=%s' %
×
479
                              (self.absolute_url(), message))
480

481
    @security.protected(ManageUsers)
1✔
482
    @csrf_only
1✔
483
    @postonly
1✔
484
    def manage_removeUsers(self, user_ids, RESPONSE=None, REQUEST=None):
1✔
485
        """ Remove one or more users via the ZMI.
486
        """
487
        user_ids = [_f for _f in user_ids if _f]
1✔
488

489
        if not user_ids:
1!
490
            message = 'no+users+selected'
×
491

492
        else:
493

494
            for user_id in user_ids:
1✔
495
                self.removeUser(user_id)
1✔
496

497
            message = 'Users+removed'
1✔
498

499
        if RESPONSE is not None:
1!
500
            RESPONSE.redirect('%s/manage_users?manage_tabs_message=%s' %
×
501
                              (self.absolute_url(), message))
502

503
    #
504
    #   Allow users to change their own login name and password.
505
    #
506
    @security.protected(SetOwnPassword)
1✔
507
    def getOwnUserInfo(self):
1✔
508
        """ Return current user's info.
509
        """
510
        user_id = getSecurityManager().getUser().getId()
×
511

512
        return self.getUserInfo(user_id)
×
513

514
    security.declareProtected(SetOwnPassword,  # NOQA: D001
1✔
515
                              'manage_updatePasswordForm')
516
    manage_updatePasswordForm = PageTemplateFile(
1✔
517
        'www/zuPasswd',
518
        globals(),
519
        __name__='manage_updatePasswordForm')
520

521
    @security.protected(SetOwnPassword)
1✔
522
    @csrf_only
1✔
523
    @postonly
1✔
524
    def manage_updatePassword(self, login_name, password, confirm,
1✔
525
                              RESPONSE=None, REQUEST=None):
526
        """ Update the current user's password and login name.
527
        """
528
        user_id = getSecurityManager().getUser().getId()
1✔
529
        if password != confirm:
1!
530
            message = 'password+and+confirm+do+not+match'
×
531

532
        else:
533

534
            if not login_name:
1!
535
                login_name = user_id
×
536

537
            # ???:  validate 'user_id', 'login_name' against policies?
538
            self.updateUser(user_id, login_name)
1✔
539
            self.updateUserPassword(user_id, password)
1✔
540

541
            message = 'password+updated'
1✔
542

543
        if RESPONSE is not None:
1!
544
            RESPONSE.redirect('%s/manage_updatePasswordForm'
×
545
                              '?manage_tabs_message=%s' %
546
                              (self.absolute_url(), message))
547

548

549
classImplements(ZODBUserManager, IZODBUserManager, IAuthenticationPlugin,
1✔
550
                IUserEnumerationPlugin, IUserAdderPlugin)
551

552
InitializeClass(ZODBUserManager)
1✔
553

554

555
class _ZODBUserFilter:
1✔
556

557
    def __init__(self, id=None, login=None, **kw):
1✔
558

559
        self._filter_ids = id
1✔
560
        self._filter_logins = login
1✔
561
        self._filter_keywords = kw
1✔
562

563
    def __call__(self, user_info):
1✔
564

565
        if self._filter_ids:
1✔
566

567
            key = 'id'
1✔
568
            to_test = self._filter_ids
1✔
569

570
        elif self._filter_logins:
1✔
571

572
            key = 'login'
1✔
573
            to_test = self._filter_logins
1✔
574

575
        elif self._filter_keywords:
1✔
576
            return 0    # ???:  try using 'kw'
1✔
577

578
        else:
579
            return 1    # the search is done without any criteria
1✔
580

581
        value = user_info.get(key)
1✔
582

583
        if not value:
1!
584
            return 0
×
585

586
        for contained in to_test:
1✔
587
            if value.lower().find(contained.lower()) >= 0:
1✔
588
                return 1
1✔
589

590
        return 0
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc