• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / AccessControl / 4274890848

pending completion
4274890848

push

github

Jens Vagelpohl
- vb [ci skip]

934 of 1458 branches covered (64.06%)

Branch coverage included in aggregate %.

4982 of 5839 relevant lines covered (85.32%)

4.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.12
/src/AccessControl/userfolder.py
1
##############################################################################
2
#
3
# Copyright (c) 2002 Zope Foundation and Contributors.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE.
11
#
12
##############################################################################
13
"""User folders.
5✔
14
"""
15

16
from base64 import decodebytes
5✔
17

18
from Acquisition import Implicit
5✔
19
from Acquisition import aq_base
5✔
20
from Acquisition import aq_parent
5✔
21
from AuthEncoding import AuthEncoding
5✔
22
from Persistence import Persistent
5✔
23
from Persistence import PersistentMapping
5✔
24
from zExceptions import BadRequest
5✔
25
from zExceptions import Unauthorized
5✔
26
from zope.interface import implementer
5✔
27

28
from AccessControl import ClassSecurityInfo
5✔
29
from AccessControl.class_init import InitializeClass
5✔
30
from AccessControl.interfaces import IStandardUserFolder
5✔
31
from AccessControl.Permissions import manage_users as ManageUsers
5✔
32
from AccessControl.rolemanager import DEFAULTMAXLISTUSERS
5✔
33
from AccessControl.rolemanager import RoleManager
5✔
34
from AccessControl.SecurityManagement import getSecurityManager
5✔
35
from AccessControl.SecurityManagement import newSecurityManager
5✔
36
from AccessControl.SecurityManagement import noSecurityManager
5✔
37
from AccessControl.users import User
5✔
38
from AccessControl.users import _remote_user_mode
5✔
39
from AccessControl.users import addr_match
5✔
40
from AccessControl.users import emergency_user
5✔
41
from AccessControl.users import host_match
5✔
42
from AccessControl.users import nobody
5✔
43
from AccessControl.ZopeSecurityPolicy import _noroles
5✔
44

45

46
class BasicUserFolder(Implicit, Persistent, RoleManager):
    """Base class for UserFolder-like objects

    This class implements identification, authentication and authorization
    of a request (see ``validate``) on top of a small storage API that
    subclasses must provide: ``getUserNames``, ``getUsers``, ``getUser``,
    ``_doAddUser``, ``_doChangeUser`` and ``_doDelUsers`` all raise
    ``NotImplementedError`` here.
    """

    # NOTE(review): several methods below are documented with ``#`` comments
    # instead of docstrings on purpose.  The Zope publisher has historically
    # treated the presence of a docstring as a sign that an object may be
    # published, so adding docstrings could change what is reachable through
    # the web -- confirm against the ZPublisher version in use before
    # converting any of these comments into docstrings.

    meta_type = 'User Folder'
    id = 'acl_users'
    title = 'User Folder'
    zmi_icon = 'fa fa-user-friends'

    isPrincipiaFolderish = 1
    isAUserFolder = 1
    maxlistusers = DEFAULTMAXLISTUSERS
    encrypt_passwords = 1

    # Class-level copies of module-level settings and special users so that
    # instances (and subclasses) can override them individually.
    _remote_user_mode = _remote_user_mode
    _domain_auth_mode = 0
    _emergency_user = emergency_user
    _nobody = nobody

    security = ClassSecurityInfo()

    @security.protected(ManageUsers)
    def getUserNames(self):
        """Return a list of usernames"""
        raise NotImplementedError

    @security.protected(ManageUsers)
    def getUsers(self):
        """Return a list of user objects"""
        raise NotImplementedError

    @security.protected(ManageUsers)
    def getUser(self, name):
        """Return the named user object or None"""
        raise NotImplementedError

    @security.protected(ManageUsers)
    def getUserById(self, id, default=None):
        """Return the user corresponding to the given id.

        Falls back to ``default`` when ``getUser`` returns None.
        """
        # The connection between getting by ID and by name is not a strong
        # one
        user = self.getUser(id)
        if user is None:
            return default
        return user

    def _doAddUser(self, name, password, roles, domains, **kw):
        """Create a new user. This should be implemented by subclasses to
           do the actual adding of a user. The 'password' will be the
           original input password, unencrypted. The implementation of this
           method is responsible for performing any needed encryption.
           The implementation should return the created user or None."""
        raise NotImplementedError

    def _doChangeUser(self, name, password, roles, domains, **kw):
        """Modify an existing user. This should be implemented by subclasses
           to make the actual changes to a user. The 'password' will be the
           original input password, unencrypted. The implementation of this
           method is responsible for performing any needed encryption."""
        raise NotImplementedError

    def _doDelUsers(self, names):
        """Delete one or more users. This should be implemented by subclasses
           to do the actual deleting of users."""
        raise NotImplementedError

    def identify(self, auth):
        # Split an HTTP Basic credential string into ``(name, password)``.
        #
        # ``auth`` may be str or bytes.  Returns ``(None, None)`` when it is
        # empty or does not start with ``basic `` (case-insensitive).
        # Raises BadRequest when the token after the scheme cannot be
        # base64-decoded, decoded to text, or split on ':'.
        if isinstance(auth, str):
            auth = auth.encode('UTF-8')

        if auth and auth.lower().startswith(b'basic '):
            try:
                name, password = decodebytes(auth.split(b' ')[-1]) \
                    .decode().split(':', 1)
            except Exception as exc:
                # Only ordinary errors are reported as a bad token;
                # KeyboardInterrupt/SystemExit must keep propagating.
                raise BadRequest('Invalid authentication token') from exc
            return name, password
        else:
            return None, None

    def authenticate(self, name, password, request):
        # Return the user object for ``name`` if ``password`` checks out
        # against it, otherwise None.  The emergency user is matched by
        # name before the regular ``getUser`` lookup is tried.
        emergency = self._emergency_user
        if name is None:
            return None
        if emergency and name == emergency.getUserName():
            user = emergency
        else:
            user = self.getUser(name)
        if user is not None and user.authenticate(password, request):
            return user
        else:
            return None

    def authorize(self, user, accessed, container, name, value, roles):
        # Return 1 if ``user`` may access ``value``, else 0.
        #
        # The user is re-wrapped in this folder's acquisition context and
        # installed as the current security manager before validation.
        # If validation raises, the security manager is cleared; an
        # Unauthorized error is then swallowed (result 0) while any other
        # exception propagates to the caller.
        user = getattr(user, 'aq_base', user).__of__(self)
        newSecurityManager(None, user)
        security = getSecurityManager()
        try:
            try:
                # This is evil: we cannot pass _noroles directly because
                # it is a special marker, and that special marker is not
                # the same between the C and Python policy implementations.
                # We __really__ need to stop using this marker pattern!
                if roles is _noroles:
                    if security.validate(accessed, container, name, value):
                        return 1
                else:
                    if security.validate(accessed, container, name, value,
                                         roles):
                        return 1
            except BaseException:
                # Cleanup-and-re-raise: nothing is swallowed here.
                noSecurityManager()
                raise
        except Unauthorized:
            pass
        return 0

    def validate(self, request, auth='', roles=_noroles):
        """
        This method performs identification, authentication, and
        authorization.

        v is the object (value) we're validating access to
        n is the name used to access the object
        a is the object the object was accessed through
        c is the physical container of the object

        We allow the publishing machinery to defer to higher-level user
        folders or to raise an unauthorized by returning None from this
        method.
        """
        v = request['PUBLISHED']  # the published object
        a, c, n, v = self._getobcontext(v, request)

        # we need to continue to support this silly mode
        # where if there is no auth info, but if a user in our
        # database has no password and he has domain restrictions,
        # return him as the authorized user.
        if not auth:
            if self._domain_auth_mode:
                for user in self.getUsers():
                    if user.getDomains():
                        if self.authenticate(user.getUserName(), '', request):
                            if self.authorize(user, a, c, n, v, roles):
                                return user.__of__(self)

        name, password = self.identify(auth)
        user = self.authenticate(name, password, request)
        # user will be None if we can't authenticate him or if we can't find
        # his username in this user database.
        emergency = self._emergency_user
        if emergency and user is emergency:
            if self._isTop():
                # we do not need to authorize the emergency user against the
                # published object.
                return emergency.__of__(self)
            else:
                # we're not the top-level user folder
                return None
        elif user is None:
            # either we didn't find the username, or the user's password
            # was incorrect.  try to authorize and return the anonymous user.
            if self._isTop() and \
               self.authorize(self._nobody, a, c, n, v, roles):
                return self._nobody.__of__(self)
            else:
                # anonymous can't authorize or we're not top-level user folder
                return None
        else:
            # We found a user, his password was correct, and the user
            # wasn't the emergency user.  We need to authorize the user
            # against the published object.
            if self.authorize(user, a, c, n, v, roles):
                return user.__of__(self)
            # That didn't work.  Try to authorize the anonymous user.
            elif self._isTop() and \
                    self.authorize(self._nobody, a, c, n, v, roles):
                return self._nobody.__of__(self)
            else:
                # we can't authorize the user, and we either can't authorize
                # nobody against the published object or we're not top-level
                return None

    if _remote_user_mode:

        # Replacement used when the module-level ``_remote_user_mode`` flag
        # is set: the user name is taken from the request environment's
        # REMOTE_USER entry and no password check is performed here.
        def validate(self, request, auth='', roles=_noroles):  # NOQA: F811
            v = request['PUBLISHED']
            a, c, n, v = self._getobcontext(v, request)
            name = request.environ.get('REMOTE_USER', None)
            if name is None:
                if self._domain_auth_mode:
                    for user in self.getUsers():
                        if user.getDomains():
                            if self.authenticate(user.getUserName(),
                                                 '', request):
                                if self.authorize(user, a, c, n, v, roles):
                                    return user.__of__(self)

            user = self.getUser(name)
            # user will be None if we can't find his username in this user
            # database.
            emergency = self._emergency_user
            if emergency and name == emergency.getUserName():
                if self._isTop():
                    # we do not need to authorize the emergency user against
                    # the published object.
                    return emergency.__of__(self)
                else:
                    # we're not the top-level user folder
                    return None
            elif user is None:
                # we didn't find the username in this database
                # try to authorize and return the anonymous user.
                if self._isTop() and self.authorize(self._nobody,
                                                    a, c, n, v, roles):
                    return self._nobody.__of__(self)
                else:
                    # anonymous can't authorize or we're not top-level user
                    # folder
                    return None
            else:
                # We found a user and the user wasn't the emergency user.
                # We need to authorize the user against the published object.
                if self.authorize(user, a, c, n, v, roles):
                    return user.__of__(self)
                # That didn't work.  Try to authorize the anonymous user.
                elif self._isTop() and self.authorize(
                        self._nobody, a, c, n, v, roles):
                    return self._nobody.__of__(self)
                else:
                    # we can't authorize the user, and we either can't
                    # authorize nobody against the published object or
                    # we're not top-level
                    return None

    def _getobcontext(self, v, request):
        """
        Return the tuple ``(a, c, n, v)`` for the published object:

        v is the object (value) we're validating access to
        n is the name used to access the object
        a is the object the object was accessed through
        c is the physical container of the object
        """
        if not request.steps:  # someone deleted root index_html
            request.RESPONSE.notFoundError('no default view (root default view'
                                           ' was probably deleted)')
        n = request.steps[-1]
        # default to accessed and container as v.__parent__
        a = c = request['PARENTS'][0]
        # try to find actual container
        inner = getattr(v, 'aq_inner', v)
        innerparent = getattr(inner, '__parent__', None)
        if innerparent is not None:
            # this is not a method, we needn't treat it specially
            c = innerparent
        elif getattr(v, '__self__', None) is not None:
            # this is a method, we need to treat it specially
            # NOTE(review): the previous code first assigned
            # ``c = v.__self__`` and then immediately overwrote it with the
            # expression below, so the first assignment had no effect and
            # has been dropped.  The intent may have been
            # ``getattr(c, 'aq_inner', c)``; behaviour is deliberately left
            # unchanged here because this feeds security validation.
            c = getattr(v, 'aq_inner', v)
        request_container = getattr(request['PARENTS'][-1], '__parent__', [])
        # if pub's __parent__ or container is the request container, it
        # means pub was accessed from the root
        if a is request_container:
            a = request['PARENTS'][-1]
        if c is request_container:
            c = request['PARENTS'][-1]

        return a, c, n, v

    def _isTop(self):
        # Report whether this folder's acquisition parent is flagged as the
        # top-level application object.  Any ordinary failure while looking
        # that up (e.g. no parent, missing attribute) counts as "not top".
        try:
            parent = aq_base(aq_parent(self))
            return parent.isTopLevelPrincipiaApplicationObject
        except Exception:
            # Deliberately best-effort, but interpreter-exit signals
            # (KeyboardInterrupt, SystemExit) are no longer swallowed.
            return 0

    def __len__(self):
        # Always 1, regardless of how many users are stored.
        # NOTE(review): presumably keeps the folder truthy in boolean
        # contexts -- confirm before changing.
        return 1

    def _isPasswordEncrypted(self, pw):
        # Delegate to AuthEncoding to detect an already-encoded password.
        return AuthEncoding.is_encrypted(pw)

    def _encryptPassword(self, pw):
        # Encode ``pw`` with AuthEncoding's 'SSHA' scheme.
        return AuthEncoding.pw_encrypt(pw, 'SSHA')

    def domainSpecValidate(self, spec):
        # Return 1 if every entry in ``spec`` is accepted by ``addr_match``
        # or ``host_match``; return 0 at the first entry matching neither.
        for ob in spec:
            am = addr_match(ob)
            hm = host_match(ob)
            if am is None and hm is None:
                return 0
        return 1

    @security.protected(ManageUsers)
    def user_names(self):
        # Alias for getUserNames().
        return self.getUserNames()

    def __creatable_by_emergency_user__(self):
        return 1

    # Domain authentication support. This is a good candidate to
    # become deprecated in future Zope versions.

    @security.protected(ManageUsers)
    def setDomainAuthenticationMode(self, domain_auth_mode):
        """Set the domain-based authentication mode. By default, this
           mode is off due to the high overhead of the operation that
           is incurred for all anonymous accesses. If you have the
           'Manage Users' permission, you can call this method via
           the web, passing a boolean value for domain_auth_mode to
           turn this behavior on or off."""
        # Normalise any truthy/falsy input to the integers 1 / 0.
        v = self._domain_auth_mode = 1 if domain_auth_mode else 0
        return 'Domain authentication mode set to %d' % v

    def domainAuthModeEnabled(self):
        """ returns true if domain auth mode is set to true"""
        return getattr(self, '_domain_auth_mode', None)


InitializeClass(BasicUserFolder)
5✔
364

365

366
@implementer(IStandardUserFolder)
class UserFolder(BasicUserFolder):
    """Standard UserFolder object

    A UserFolder stores User objects in a persistent mapping keyed by
    user name.  Each User carries a name, password, domains and roles;
    the folder's main job is to authenticate users and bind them to a
    collection of roles.
    """

    meta_type = 'User Folder'
    zmi_show_add_dialog = False
    id = 'acl_users'
    title = 'User Folder'

    def __init__(self):
        # user name -> User object
        self.data = PersistentMapping()

    def getUserNames(self):
        """Return the stored user names as a sorted list"""
        return sorted(self.data.keys())

    def getUsers(self):
        """Return the stored user objects, ordered by user name"""
        users = self.data
        ordered_names = sorted(users.keys())
        return [users[user_name] for user_name in ordered_names]

    def getUser(self, name):
        """Return the user object stored under ``name``, or None"""
        return self.data.get(name, None)

    def hasUsers(self):
        """ This is not a formal API method: it only exists so that the
        quickstart page can tell whether the default user folder holds
        any users at all, in order to show newcomers how to add one.
        Using getUserNames or getUsers for that would have posed a
        denial of service risk."""
        return len(self.data) > 0

    def _doAddUser(self, name, password, roles, domains, **kw):
        """Create a new user and return it

        An existing user with the same name is silently replaced."""
        must_encrypt = (
            password is not None
            and self.encrypt_passwords
            and not self._isPasswordEncrypted(password)
        )
        if must_encrypt:
            password = self._encryptPassword(password)
        self.data[name] = User(name, password, roles, domains)
        return self.data[name]

    def _doChangeUser(self, name, password, roles, domains, **kw):
        # Update the stored user in place.  A password of None leaves the
        # current password untouched; roles and domains are always replaced.
        record = self.data[name]
        if password is not None:
            already_encrypted = self._isPasswordEncrypted(password) \
                if self.encrypt_passwords else True
            if not already_encrypted:
                password = self._encryptPassword(password)
            record.__ = password
        record.roles = roles
        record.domains = domains

    def _doDelUsers(self, names):
        # Remove every listed user name from the mapping.
        for user_name in names:
            del self.data[user_name]


InitializeClass(UserFolder)
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc