• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Zope / 9488668646

12 Jun 2024 07:36PM UTC coverage: 82.268% (+0.01%) from 82.257%
9488668646

push

github

web-flow
Return 404 instead of IndexError for traversal past the root (#1219)

4264 of 6886 branches covered (61.92%)

Branch coverage included in aggregate %.

12 of 12 new or added lines in 2 files covered. (100.0%)

1 existing line in 1 file now uncovered.

28013 of 32348 relevant lines covered (86.6%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.46
/src/ZPublisher/BaseRequest.py
1
##############################################################################
2
#
3
# Copyright (c) 2002-2024 Zope Foundation and Contributors.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE.
11
#
12
##############################################################################
13
""" Basic ZPublisher request management.
14
"""
15

16
import types
1✔
17
import warnings
1✔
18
from os import environ
1✔
19
from urllib.parse import quote as urllib_quote
1✔
20

21
from AccessControl.ZopeSecurityPolicy import getRoles
1✔
22
from Acquisition import aq_base
1✔
23
from Acquisition import aq_inner
1✔
24
from Acquisition.interfaces import IAcquirer
1✔
25
from ExtensionClass import Base
1✔
26
from zExceptions import Forbidden
1✔
27
from zExceptions import NotFound
1✔
28
from zope.component import queryMultiAdapter
1✔
29
from zope.event import notify
1✔
30
from zope.interface import Interface
1✔
31
from zope.interface import implementer
1✔
32
from zope.location.interfaces import LocationError
1✔
33
from zope.publisher.defaultview import queryDefaultViewName
1✔
34
from zope.publisher.interfaces import EndRequestEvent
1✔
35
from zope.publisher.interfaces import IPublishTraverse
1✔
36
from zope.publisher.interfaces import NotFound as ztkNotFound
1✔
37
from zope.publisher.interfaces.browser import IBrowserPublisher
1✔
38
from zope.traversing.namespace import namespaceLookup
1✔
39
from zope.traversing.namespace import nsParse
1✔
40
from ZPublisher import zpublish_mark
1✔
41
from ZPublisher.Converters import type_converters
1✔
42
from ZPublisher.interfaces import UseTraversalDefault
1✔
43
from ZPublisher.xmlrpc import is_xmlrpc_response
1✔
44

45

46
_marker = []
1✔
47
UNSPECIFIED_ROLES = ''
1✔
48

49

50
def quote(text):
1✔
51
    # quote url path segments, but leave + and @ intact
52
    return urllib_quote(text, '/+@')
1✔
53

54

55
class RequestContainer(Base):
1✔
56
    __roles__ = None
1✔
57

58
    def __init__(self, **kw):
1✔
59
        for k, v in kw.items():
1✔
60
            self.__dict__[k] = v
1✔
61

62
    def manage_property_types(self):
1✔
63
        return list(type_converters.keys())
×
64

65

66
@implementer(IBrowserPublisher)
1✔
67
class DefaultPublishTraverse:
1✔
68

69
    def __init__(self, context, request):
1✔
70
        self.context = context
1✔
71
        self.request = request
1✔
72

73
    def publishTraverse(self, request, name):
1✔
74
        object = self.context
1✔
75
        URL = request['URL']
1✔
76

77
        if name[:1] == '_':
1!
78
            raise Forbidden(
×
79
                "Object name begins with an underscore at: %s" % URL)
80

81
        subobject = UseTraversalDefault  # indicator
1✔
82
        try:
1✔
83
            if hasattr(object, '__bobo_traverse__'):
1✔
84
                try:
1✔
85
                    subobject = object.__bobo_traverse__(request, name)
1✔
86
                    if isinstance(subobject, tuple) and len(subobject) > 1:
1!
87
                        # Add additional parents into the path
88
                        # XXX There are no tests for this:
89
                        request['PARENTS'][-1:] = list(subobject[:-1])
×
90
                        object, subobject = subobject[-2:]
×
91
                except (AttributeError, KeyError, NotFound) as e:
1✔
92
                    # Try to find a view
93
                    subobject = queryMultiAdapter(
1✔
94
                        (object, request), Interface, name)
95
                    if subobject is not None:
1✔
96
                        # OFS.Application.__bobo_traverse__ calls
97
                        # REQUEST.RESPONSE.notFoundError which sets the HTTP
98
                        # status code to 404
99
                        request.response.setStatus(200)
1✔
100
                        # We don't need to do the docstring security check
101
                        # for views, so let's skip it and
102
                        # return the object here.
103
                        if IAcquirer.providedBy(subobject):
1!
104
                            subobject = subobject.__of__(object)
×
105
                        return subobject
1✔
106
                    # No view found. Reraise the error
107
                    # raised by __bobo_traverse__
108
                    raise e
1✔
109
        except UseTraversalDefault:
1✔
110
            pass
1✔
111
        if subobject is UseTraversalDefault:
1✔
112
            # No __bobo_traverse__ or default traversal requested
113
            # Try with an unacquired attribute:
114
            if hasattr(aq_base(object), name):
1✔
115
                subobject = getattr(object, name)
1✔
116
            else:
117
                # We try to fall back to a view:
118
                subobject = queryMultiAdapter((object, request), Interface,
1✔
119
                                              name)
120
                if subobject is not None:
1✔
121
                    if IAcquirer.providedBy(subobject):
1✔
122
                        subobject = subobject.__of__(object)
1✔
123
                    return subobject
1✔
124

125
                # And lastly, if there is no view, try acquired attributes, but
126
                # only if there is no __bobo_traverse__:
127
                try:
1✔
128
                    subobject = getattr(object, name)
1✔
129
                    # Again, clear any error status created by
130
                    # __bobo_traverse__ because we actually found something:
131
                    request.response.setStatus(200)
1✔
132
                except AttributeError:
1✔
133
                    pass
1✔
134

135
                # Lastly we try with key access:
136
                if subobject is None:
1✔
137
                    try:
1✔
138
                        subobject = object[name]
1✔
139
                    except TypeError:  # unsubscriptable
1✔
140
                        raise KeyError(name)
1✔
141

142
        self.request.ensure_publishable(subobject)
1✔
143
        return subobject
1✔
144

145
    def browserDefault(self, request):
1✔
146
        if hasattr(self.context, '__browser_default__'):
1✔
147
            return self.context.__browser_default__(request)
1✔
148
        # Zope 3.2 still uses IDefaultView name when it
149
        # registers default views, even though it's
150
        # deprecated. So we handle that here:
151
        default_name = queryDefaultViewName(self.context, request)
1✔
152
        if default_name is not None:
1✔
153
            # Adding '@@' here forces this to be a view.
154
            # A neater solution might be desirable.
155
            return self.context, ('@@' + default_name,)
1✔
156
        return self.context, ()
1✔
157

158

159
class BaseRequest:
1✔
160
    """Provide basic ZPublisher request management
161

162
    This object provides access to request data. Request data may
163
    vary depending on the protocol used.
164

165
    Request objects are created by the object publisher and will be
166
    passed to published objects through the argument name, REQUEST.
167

168
    The request object is a mapping object that represents a
169
    collection of variable to value mappings.
170
    """
171

172
    maybe_webdav_client = 1
1✔
173

174
    # While the following assignment is not strictly necessary, it
175
    # prevents a lot of unnecessary searches because, without it,
176
    # acquisition of REQUEST is disallowed, which penalizes access
177
    # in DTML with tags.
178
    __roles__ = None
1✔
179
    _file = None
1✔
180
    common = {}  # Common request data
1✔
181
    _auth = None
1✔
182
    _held = ()
1✔
183

184
    # Allow (reluctantly) access to unprotected attributes
185
    __allow_access_to_unprotected_subobjects__ = 1
1✔
186

187
    def __init__(self, other=None, **kw):
1✔
188
        """The constructor is not allowed to raise errors
189
        """
190
        self.__doc__ = None  # Make BaseRequest objects unpublishable
1✔
191
        if other is None:
1✔
192
            other = kw
1✔
193
        else:
194
            other.update(kw)
1✔
195
        self.other = other
1✔
196

197
    def clear(self):
1✔
198
        self.other.clear()
1✔
199
        self._held = None
1✔
200

201
    def close(self):
1✔
202
        try:
1✔
203
            notify(EndRequestEvent(None, self))
1✔
204
        finally:
205
            # subscribers might need the zodb, so `clear` must come afterwards
206
            # (since `self._held=None` might close the connection, see above)
207
            self.clear()
1✔
208

209
    def processInputs(self):
1✔
210
        """Do any input processing that could raise errors
211
        """
212

213
    def __len__(self):
1✔
214
        return 1
1✔
215

216
    def __setitem__(self, key, value):
1✔
217
        """Set application variables
218

219
        This method is used to set a variable in the request's "other"
220
        category.
221
        """
222
        self.other[key] = value
1✔
223

224
    set = __setitem__
1✔
225

226
    def get(self, key, default=None):
1✔
227
        """Get a variable value
228

229
        Return a value for the required variable name.
230
        The value will be looked up from one of the request data
231
        categories. The search order is environment variables,
232
        other variables, form data, and then cookies.
233

234
        """
235
        if key == 'REQUEST':
1!
236
            return self
×
237

238
        v = self.other.get(key, _marker)
1✔
239
        if v is not _marker:
1✔
240
            return v
1✔
241
        v = self.common.get(key, default)
1✔
242
        if v is not _marker:
1✔
243
            return v
1✔
244

245
        if key == 'BODY' and self._file is not None:
1!
246
            p = self._file.tell()
×
247
            self._file.seek(0)
×
248
            v = self._file.read()
×
249
            self._file.seek(p)
×
250
            self.other[key] = v
×
251
            return v
×
252

253
        if key == 'BODYFILE' and self._file is not None:
1!
254
            v = self._file
×
255
            self.other[key] = v
×
256
            return v
×
257

258
        return default
1✔
259

260
    def __getitem__(self, key, default=_marker):
1✔
261
        v = self.get(key, default)
1✔
262
        if v is _marker:
1!
263
            raise KeyError(key)
×
264
        return v
1✔
265

266
    def __bobo_traverse__(self, name):
1✔
267
        raise KeyError(name)
1✔
268

269
    def __getattr__(self, key, default=_marker):
1✔
270
        v = self.get(key, default)
1✔
271
        if v is _marker:
1✔
272
            raise AttributeError(key)
1✔
273
        return v
1✔
274

275
    def set_lazy(self, key, callable):
1✔
276
        pass            # MAYBE, we could do more, but let HTTPRequest do it
×
277

278
    def has_key(self, key):
1✔
279
        return key in self
×
280

281
    def __contains__(self, key):
1✔
282
        return self.get(key, _marker) is not _marker
×
283

284
    def keys(self):
1✔
285
        keys = {}
1✔
286
        keys.update(self.common)
1✔
287
        keys.update(self.other)
1✔
288
        return list(keys.keys())
1✔
289

290
    def items(self):
1✔
291
        result = []
1✔
292
        for k in self.keys():
1✔
293
            result.append((k, self.get(k)))
1✔
294
        return result
1✔
295

296
    def values(self):
1✔
297
        result = []
×
298
        for k in self.keys():
×
299
            result.append(self.get(k))
×
300
        return result
×
301

302
    def __str__(self):
1✔
303
        L1 = list(self.items())
1✔
304
        L1.sort()
1✔
305
        return '\n'.join("%s:\t%s" % item for item in L1)
1✔
306

307
    __repr__ = __str__
1✔
308

309
    # Original version: see zope.traversing.publicationtraverse
310
    def traverseName(self, ob, name):
1✔
311
        if name and name[:1] in '@+':
1✔
312
            # Process URI segment parameters.
313
            ns, nm = nsParse(name)
1✔
314
            if ns:
1!
315
                try:
1✔
316
                    ob2 = namespaceLookup(ns, nm, ob, self)
1✔
317
                except LocationError:
1✔
318
                    raise ztkNotFound(ob, name)
1✔
319

320
                if IAcquirer.providedBy(ob2):
1✔
321
                    ob2 = ob2.__of__(ob)
1✔
322
                return ob2
1✔
323

324
        if name == '.':
1!
325
            return ob
×
326

327
        if IPublishTraverse.providedBy(ob):
1✔
328
            ob2 = ob.publishTraverse(self, name)
1✔
329
        else:
330
            adapter = queryMultiAdapter((ob, self), IPublishTraverse)
1✔
331
            if adapter is None:
1!
332
                # Zope2 doesn't set up its own adapters in a lot of cases
333
                # so we will just use a default adapter.
334
                adapter = DefaultPublishTraverse(ob, self)
1✔
335

336
            ob2 = adapter.publishTraverse(self, name)
1✔
337

338
        return ob2
1✔
339
    traverseName__roles__ = ()
1✔
340

341
    def traverse(self, path, response=None, validated_hook=None):
1✔
342
        """Traverse the object space
343

344
        The REQUEST must already have a PARENTS item with at least one
345
        object in it.  This is typically the root object.
346
        """
347
        request = self
1✔
348
        request_get = request.get
1✔
349
        if response is None:
1!
350
            response = self.response
1✔
351

352
        # remember path for later use
353
        browser_path = path
1✔
354

355
        # Cleanup the path list
356
        if path[:1] == '/':
1✔
357
            path = path[1:]
1✔
358
        if path[-1:] == '/':
1✔
359
            path = path[:-1]
1✔
360
        clean = []
1✔
361
        for item in path.split('/'):
1✔
362
            # Make sure that certain things that don't make sense
363
            # cannot be traversed.
364
            if item in ('REQUEST', 'aq_self', 'aq_base'):
1✔
365
                return response.notFoundError(path)
1✔
366
            if not item or item == '.':
1!
367
                continue
×
368
            elif item == '..':
1✔
369
                if not len(clean):
1!
370
                    return response.notFoundError(path)
1✔
UNCOV
371
                del clean[-1]
×
372
            else:
373
                clean.append(item)
1✔
374
        path = clean
1✔
375

376
        # How did this request come in? (HTTP GET, PUT, POST, etc.)
377
        method = request_get('REQUEST_METHOD', 'GET').upper()
1✔
378

379
        # Probably a browser
380
        no_acquire_flag = 0
1✔
381
        if method in ('GET', 'POST', 'PURGE') and \
1✔
382
           not is_xmlrpc_response(response):
383
            # index_html is still the default method, only any object can
384
            # override it by implementing its own __browser_default__ method
385
            method = 'index_html'
1✔
386
        elif method != 'HEAD' and self.maybe_webdav_client:
1✔
387
            # Probably a WebDAV client.
388
            no_acquire_flag = 1
1✔
389

390
        URL = request['URL']
1✔
391
        parents = request['PARENTS']
1✔
392
        object = parents[-1]
1✔
393
        del parents[:]
1✔
394

395
        self.roles = getRoles(None, None, object, UNSPECIFIED_ROLES)
1✔
396

397
        # if the top object has a __bobo_traverse__ method, then use it
398
        # to possibly traverse to an alternate top-level object.
399
        if hasattr(object, '__bobo_traverse__'):
1✔
400
            try:
1✔
401
                new_object = object.__bobo_traverse__(request)
1✔
402
                if new_object is not None:
1!
403
                    object = new_object
×
404
                    self.roles = getRoles(None, None, object,
×
405
                                          UNSPECIFIED_ROLES)
406
            except Exception:
×
407
                pass
×
408

409
        if not path and not method:
1!
410
            return response.forbiddenError(self['URL'])
×
411

412
        # Traverse the URL to find the object:
413
        if hasattr(object, '__of__'):
1✔
414
            # Try to bind the top-level object to the request
415
            # This is how you get 'self.REQUEST'
416
            object = object.__of__(RequestContainer(REQUEST=request))
1✔
417
        parents.append(object)
1✔
418

419
        steps = self.steps
1✔
420
        self._steps = _steps = list(map(quote, steps))
1✔
421
        path.reverse()
1✔
422

423
        request['TraversalRequestNameStack'] = request.path = path
1✔
424
        request['ACTUAL_URL'] = request['URL'] + quote(browser_path)
1✔
425

426
        # Set the posttraverse for duration of the traversal here
427
        self._post_traverse = post_traverse = []
1✔
428

429
        entry_name = ''
1✔
430
        try:
1✔
431
            # We build parents in the wrong order, so we
432
            # need to make sure we reverse it when we're done.
433
            while 1:
434
                bpth = getattr(object, '__before_publishing_traverse__', None)
1✔
435
                if bpth is not None:
1✔
436
                    bpth(object, self)
1✔
437

438
                path = request.path = request['TraversalRequestNameStack']
1✔
439
                # Check for method:
440
                if path:
1✔
441
                    entry_name = path.pop()
1✔
442
                else:
443
                    # If we have reached the end of the path, we look to see
444
                    # if we can find IBrowserPublisher.browserDefault. If so,
445
                    # we call it to let the object tell us how to publish it.
446
                    # BrowserDefault returns the object to be published
447
                    # (usually self) and a sequence of names to traverse to
448
                    # find the method to be published.
449

450
                    # This is webdav support. The last object in the path
451
                    # should not be acquired. Instead, a NullResource should
452
                    # be given if it doesn't exist:
453
                    if no_acquire_flag and \
1✔
454
                       hasattr(object, 'aq_base') and \
455
                       not hasattr(object, '__bobo_traverse__'):
456

457
                        if (object.__parent__ is not
1✔
458
                                aq_inner(object).__parent__):
459
                            from webdav.NullResource import NullResource
1✔
460
                            object = NullResource(parents[-2], object.getId(),
1✔
461
                                                  self).__of__(parents[-2])
462

463
                    if IBrowserPublisher.providedBy(object):
1✔
464
                        adapter = object
1✔
465
                    else:
466
                        adapter = queryMultiAdapter((object, self),
1✔
467
                                                    IBrowserPublisher)
468
                        if adapter is None:
1!
469
                            # Zope2 doesn't set up its own adapters in a lot
470
                            # of cases so we will just use a default adapter.
471
                            adapter = DefaultPublishTraverse(object, self)
1✔
472

473
                    object, default_path = adapter.browserDefault(self)
1✔
474
                    if default_path:
1✔
475
                        request._hacked_path = 1
1✔
476
                        if len(default_path) > 1:
1!
477
                            path = list(default_path)
×
478
                            method = path.pop()
×
479
                            request['TraversalRequestNameStack'] = path
×
480
                            continue
×
481
                        else:
482
                            entry_name = default_path[0]
1✔
483
                    elif (method
1✔
484
                          and hasattr(object, method)
485
                          and entry_name != method
486
                          and getattr(object, method) is not None):
487
                        request._hacked_path = 1
1✔
488
                        entry_name = method
1✔
489
                        method = 'index_html'
1✔
490
                    else:
491
                        if hasattr(object, '__call__'):
1✔
492
                            self.roles = getRoles(
1✔
493
                                object, '__call__',
494
                                object.__call__, self.roles)
495
                            self.ensure_publishable(object.__call__, True)
1✔
496
                        if request._hacked_path:
1!
497
                            i = URL.rfind('/')
1✔
498
                            if i > 0:
1!
499
                                response.setBase(URL[:i])
1✔
500
                        break
1✔
501
                step = quote(entry_name)
1✔
502
                _steps.append(step)
1✔
503
                request['URL'] = URL = f'{request["URL"]}/{step}'
1✔
504

505
                try:
1✔
506
                    subobject = self.traverseName(object, entry_name)
1✔
507
                    if hasattr(object, '__bobo_traverse__') or \
1✔
508
                       hasattr(object, entry_name):
509
                        check_name = entry_name
1✔
510
                    else:
511
                        check_name = None
1✔
512

513
                    self.roles = getRoles(
1✔
514
                        object, check_name, subobject,
515
                        self.roles)
516
                    object = subobject
1✔
517
                # traverseName() might raise ZTK's NotFound
518
                except (KeyError, AttributeError, ztkNotFound):
1✔
519
                    if response.debug_mode:
1!
520
                        return response.debugError(
×
521
                            "Cannot locate object at: %s" % URL)
522
                    else:
523
                        return response.notFoundError(URL)
1!
524
                except Forbidden as e:
1✔
525
                    if self.response.debug_mode:
1!
526
                        return response.debugError(e.args)
×
527
                    else:
528
                        return response.forbiddenError(entry_name)
1!
529

530
                parents.append(object)
1✔
531

532
                steps.append(entry_name)
1✔
533
        finally:
534
            parents.reverse()
1!
535

536
        # Note - no_acquire_flag is necessary to support
537
        # things like DAV.  We have to make sure
538
        # that the target object is not acquired
539
        # if the request_method is other than GET
540
        # or POST. Otherwise, you could never use
541
        # PUT to add a new object named 'test' if
542
        # an object 'test' existed above it in the
543
        # hierarchy -- you'd always get the
544
        # existing object :(
545
        if no_acquire_flag and \
1✔
546
           hasattr(parents[1], 'aq_base') and \
547
           not hasattr(parents[1], '__bobo_traverse__'):
548
            base = aq_base(parents[1])
1✔
549
            if not hasattr(base, entry_name):
1!
550
                try:
×
551
                    if entry_name not in base:
×
552
                        raise AttributeError(entry_name)
×
553
                except TypeError:
×
554
                    raise AttributeError(entry_name)
×
555

556
        # After traversal post traversal hooks aren't available anymore
557
        del self._post_traverse
1✔
558

559
        request['PUBLISHED'] = parents.pop(0)
1✔
560

561
        # Do authorization checks
562
        user = groups = None
1✔
563
        i = 0
1✔
564

565
        if 1:  # Always perform authentication.
566

567
            last_parent_index = len(parents)
1✔
568
            if hasattr(object, '__allow_groups__'):
1!
569
                groups = object.__allow_groups__
×
570
                inext = 0
×
571
            else:
572
                inext = None
1✔
573
                for i in range(last_parent_index):
1✔
574
                    if hasattr(parents[i], '__allow_groups__'):
1✔
575
                        groups = parents[i].__allow_groups__
1✔
576
                        inext = i + 1
1✔
577
                        break
1✔
578

579
            if inext is not None:
1✔
580
                i = inext
1✔
581
                v = getattr(groups, 'validate', old_validation)
1✔
582

583
                auth = request._auth
1✔
584

585
                if v is old_validation and self.roles is UNSPECIFIED_ROLES:
1!
586
                    # No roles, so if we have a named group, get roles from
587
                    # group keys
588
                    if hasattr(groups, 'keys'):
×
589
                        self.roles = list(groups.keys())
×
590
                    else:
591
                        try:
×
592
                            groups = groups()
×
593
                        except Exception:
×
594
                            pass
×
595
                        try:
×
596
                            self.roles = list(groups.keys())
×
597
                        except Exception:
×
598
                            pass
×
599

600
                    if groups is None:
×
601
                        # Public group, hack structures to get it to validate
602
                        self.roles = None
×
603
                        auth = ''
×
604

605
                if v is old_validation:
1!
606
                    user = old_validation(groups, request, auth, self.roles)
×
607
                elif self.roles is UNSPECIFIED_ROLES:
1✔
608
                    user = v(request, auth)
1✔
609
                else:
610
                    user = v(request, auth, self.roles)
1✔
611

612
                while user is None and i < last_parent_index:
1✔
613
                    parent = parents[i]
1✔
614
                    i = i + 1
1✔
615
                    if hasattr(parent, '__allow_groups__'):
1!
616
                        groups = parent.__allow_groups__
1✔
617
                    else:
618
                        continue
×
619
                    if hasattr(groups, 'validate'):
1!
620
                        v = groups.validate
1✔
621
                    else:
622
                        v = old_validation
×
623
                    if v is old_validation:
1!
624
                        user = old_validation(
×
625
                            groups, request, auth, self.roles)
626
                    elif self.roles is UNSPECIFIED_ROLES:
1!
627
                        user = v(request, auth)
×
628
                    else:
629
                        user = v(request, auth, self.roles)
1✔
630

631
            if user is None and self.roles != UNSPECIFIED_ROLES:
1✔
632
                response.unauthorized()
1✔
633

634
        if user is not None:
1✔
635
            if validated_hook is not None:
1✔
636
                validated_hook(self, user)
1✔
637
            request['AUTHENTICATED_USER'] = user
1✔
638
            request['AUTHENTICATION_PATH'] = '/'.join(steps[:-i])
1✔
639

640
        # Remove http request method from the URL.
641
        request['URL'] = URL
1✔
642

643
        # Run post traversal hooks
644
        if post_traverse:
1✔
645
            result = exec_callables(post_traverse)
1✔
646
            if result is not None:
1✔
647
                object = result
1✔
648

649
        return object
1✔
650

651
    def post_traverse(self, f, args=()):
1✔
652
        """Add a callable object and argument tuple to be post-traversed.
653

654
        If traversal and authentication succeed, each post-traversal
655
        pair is processed in the order in which they were added.
656
        Each argument tuple is passed to its callable.  If a callable
657
        returns a value other than None, no more pairs are processed,
658
        and the return value replaces the traversal result.
659
        """
660
        try:
1✔
661
            pairs = self._post_traverse
1✔
662
        except AttributeError:
×
663
            raise RuntimeError('post_traverse() may only be called '
×
664
                               'during publishing traversal.')
665
        else:
666
            pairs.append((f, tuple(args)))
1✔
667

668
    retry_count = 0
1✔
669

670
    def supports_retry(self):
1✔
671
        return 0
1✔
672

673
    def _hold(self, object):
1✔
674
        """Hold a reference to an object to delay it's destruction until mine
675
        """
676
        if self._held is not None:
1✔
677
            self._held = self._held + (object, )
1✔
678

679
    def ensure_publishable(self, obj, for_call=False):
1✔
680
        """raise ``Forbidden`` unless *obj* is publishable.
681

682
        *for_call* tells us whether we are called for the ``__call__``
683
        method. In general, its publishability is determined by
684
        its ``__self__`` but it might have more restrictive prescriptions.
685
        """
686
        url, default = self["URL"], None
1✔
687
        if for_call:
1✔
688
            # We are called to check the publication
689
            # of the ``__call__`` method.
690
            # Usually, its publication indication comes from its
691
            # ``__self__`` and this has already been checked.
692
            # It can however carry a stricter publication indication
693
            # which we want to check here.
694
            # We achieve this by changing *default* from
695
            # ``None`` to ``True``. In this way, we get the publication
696
            # indication of ``__call__`` if it carries one
697
            # or ``True`` otherwise which in this case
698
            # indicates "already checked".
699
            url += "[__call__]"
1✔
700
            default = True
1✔
701
        publishable = zpublish_mark(obj, default)
1✔
702
        # ``publishable`` is either ``None``, ``True``, ``False`` or
703
        # a tuple of allowed request methods.
704
        if publishable is True:  # explicitely marked as publishable
1✔
705
            return
1✔
706
        elif publishable is False:  # explicitely marked as not publishable
1✔
707
            raise Forbidden(
1✔
708
                f"The object at {url} is marked as not publishable")
709
        elif publishable is not None:
1✔
710
            # a tuple of allowed request methods
711
            request_method = (getattr(self, "environ", None)
1✔
712
                              and self.environ.get("REQUEST_METHOD"))
713
            if  (request_method is None  # noqa: E271
1✔
714
                 or request_method.upper() not in publishable):
715
                raise Forbidden(
1✔
716
                    f"The object at {url} does not support "
717
                    f"{request_method} requests")
718
            return
1✔
719
        # ``publishable`` is ``None``
720

721
        # Check that built-in types aren't publishable.
722
        if not typeCheck(obj):
1✔
723
            raise Forbidden(
1✔
724
                "The object at %s is not publishable." % url)
725
        # Ensure that the object has a docstring
726
        doc = getattr(obj, '__doc__', None)
1✔
727
        if not doc:
1✔
728
            raise Forbidden(
1✔
729
                f"The object at {url} has an empty or missing "
730
                "docstring. Objects must either be marked via "
731
                "to `ZPublisher.zpublish` decorator or have a docstring to be "
732
                "published.")
733
        if deprecate_docstrings:
1✔
734
            warnings.warn(DocstringWarning(obj, url))
1✔
735

736

737
def exec_callables(callables):
    """Call each ``(function, args)`` pair in order.

    *callables* is an iterable of 2-item pairs: a callable and the
    positional arguments to call it with.  The first result that is
    not ``None`` is returned immediately and the remaining pairs are
    not called.  ``None`` is returned when every call returns ``None``
    or *callables* is empty.

    Exceptions raised by a callable are deliberately neither caught
    nor hidden; they propagate to the caller.
    """
    for func, arguments in callables:
        outcome = func(*arguments)
        if outcome is not None:
            return outcome
    return None
744

745

746
def old_validation(groups, request, auth,
                   roles=UNSPECIFIED_ROLES):
    """Validate the request's user against *groups*.

    The user name and password come from ``request._authUserPW()``
    when *auth* is true, otherwise from ``REMOTE_USER`` in
    ``request.environ`` (with no password to compare).

    *groups* is either a mapping of role name to user dictionaries
    (a "named group"), a callable returning such a mapping, or an
    iterable of user dictionaries mapping names to passwords.

    Returns the user name on success.  When no credentials are
    available, returns ``''`` if *roles* is ``None`` and ``None``
    otherwise.  Returns ``None`` when a named group does not contain
    the user; raises ``Forbidden`` when *groups* is not a named group
    and the user could not be matched.
    """
    # result used whenever no credentials can be determined
    anonymous = '' if roles is None else None

    if auth:
        credentials = request._authUserPW()
        if not credentials:
            return anonymous
        name, password = credentials
    elif 'REMOTE_USER' in request.environ:
        name, password = request.environ['REMOTE_USER'], None
    else:
        return anonymous

    if roles is None:
        # no role restriction: knowing the user is sufficient
        return name

    try:
        keys = groups.keys
    except Exception:
        keys = None
        try:
            groups = groups()  # Maybe it was a method defining a group
            keys = groups.keys
        except Exception:
            # neither a mapping nor a callable returning one:
            # treat ``groups`` as a plain iterable of user dictionaries
            pass

    if keys is not None:
        # OK, we have a named group, so apply the roles to the named
        # group.
        if roles is UNSPECIFIED_ROLES:
            roles = keys()
        groups = [groups[role] for role in roles if role in groups]

    for users in groups:
        # a ``None`` password (``REMOTE_USER`` case) matches any entry
        if name in users and (users[name] == password or password is None):
            return name

    if keys is None:
        # Not a named group, so don't go further
        raise Forbidden(
            """<strong>You are not authorized to access this resource""")

    return None
799

800

801
# Types whose instances must never be published directly.
# ``typeCheck`` looks the exact type of an object up in this mapping;
# a value of ``0`` means "not publishable".
itypes = dict.fromkeys(
    (
        bool,
        types.CodeType,
        complex,
        dict,
        float,
        types.FrameType,
        frozenset,
        int,
        list,
        type(None),
        set,
        slice,
        str,
        types.TracebackType,
        tuple,
    ),
    0,
)
# Also deny those of the following ``types`` attributes which
# the running interpreter actually provides.
for name in ('BufferType', 'DictProxyType', 'EllipsisType',
             'LongType', 'UnicodeType', 'XRangeType'):
    if not hasattr(types, name):
        continue
    itypes[getattr(types, name)] = 0
822

823

824
def typeCheck(obj, deny=itypes):
    # Return true if it's ok to publish the type, false otherwise.
    #
    # The exact type of *obj* is looked up in *deny*: the registered
    # value is returned (``0`` for the types listed in ``itypes``),
    # and ``1`` is returned for any type which is not listed.
    obj_type = type(obj)
    return deny.get(obj_type, 1)
827

828

829
# Value of the ``ZPUBLISHER_DEPRECATE_DOCSTRINGS`` environment variable
# (``None`` when unset).  A true value makes this module issue a
# ``DocstringWarning`` for objects published only because of their
# docstring and install a warning filter for that category.
deprecate_docstrings = environ.get("ZPUBLISHER_DEPRECATE_DOCSTRINGS")
1✔
830

831

832
class DocstringWarning(DeprecationWarning):
    """Warning about an object published only due to its docstring.

    Instances are created with two arguments: the published object
    and its URL.
    """

    def tag(self):
        """Return a short description identifying the published object.

        Methods and functions are described by their qualified name,
        other objects by their class, unless the object carries its
        own ``__doc__``, in which case the URL is used.  Source
        location information is appended when it can be determined.
        """
        import inspect

        def location(target, with_module=False):
            """try to determine where *target* has been defined.

            *target* is either a function or a class.
            Returns ``""`` if the source cannot be located.
            """
            try:
                line = inspect.getsourcelines(target)[1]
            except (OSError, TypeError):
                return ""
            if with_module:
                return f"[{target.__module__}:{line}]"
            return f" at line {line}"

        obj, url = self.args

        if inspect.ismethod(obj):
            func = inspect.unwrap(obj.__func__)
            owner = obj.__self__.__class__
            return (f"'{owner.__module__}.{owner.__qualname__}' "
                    f"method '{obj.__qualname__}'{location(func, 1)}")

        if inspect.isfunction(obj):
            func = inspect.unwrap(obj)
            return (f"function '{func.__module__}.{func.__qualname__}'"
                    f"{location(func)}")

        # Any other object: find out whether the docstring is
        # provided by the class rather than by the object itself.
        try:
            doc_from_class = "__doc__" not in obj.__dict__
        except AttributeError:
            doc_from_class = True
        if doc_from_class:
            klass = obj.__class__
            return (f"'{klass.__module__}.{klass.__qualname__}'"
                    f"{location(klass)}")

        # the object has its own docstring: identify it by its URL
        return f"object at '{url}'"

    def __str__(self):
        """Return the warning message, starting with ``self.tag()``."""
        subject = self.tag()
        return (f"{subject} uses deprecated docstring "
                "publication control. Use the `ZPublisher.zpublish` decorator "
                "instead")
874

875

876
if deprecate_docstrings:
    # Install a warning filter for ``DocstringWarning`` unless
    # one has already been configured.
    for f in warnings.filters:
        # the third item of a filter entry is the warning category
        if f[2] is DocstringWarning:
            break
    else:
        # No ``DocstringWarning`` filter yet: provide one.
        # If ``deprecate_docstrings`` specifies a sensible action
        # use it, otherwise ``"default"``.
        if deprecate_docstrings in ("default", "error", "ignore", "always"):
            warn_action = deprecate_docstrings
        else:
            warn_action = "default"
        warnings.filterwarnings(warn_action, category=DocstringWarning)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc