• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Zope / 3956162881

pending completion
3956162881

push

github

Michael Howitz
Update to deprecation warning free releases.

4401 of 7036 branches covered (62.55%)

Branch coverage included in aggregate %.

27161 of 31488 relevant lines covered (86.26%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.36
/src/ZPublisher/BaseRequest.py
1
##############################################################################
2
#
3
# Copyright (c) 2002 Zope Foundation and Contributors.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE.
11
#
12
##############################################################################
13
""" Basic ZPublisher request management.
1✔
14
"""
15

16
import types
1✔
17
from urllib.parse import quote as urllib_quote
1✔
18

19
from AccessControl.ZopeSecurityPolicy import getRoles
1✔
20
from Acquisition import aq_base
1✔
21
from Acquisition import aq_inner
1✔
22
from Acquisition.interfaces import IAcquirer
1✔
23
from ExtensionClass import Base
1✔
24
from zExceptions import Forbidden
1✔
25
from zExceptions import NotFound
1✔
26
from zope.component import queryMultiAdapter
1✔
27
from zope.event import notify
1✔
28
from zope.interface import Interface
1✔
29
from zope.interface import implementer
1✔
30
from zope.location.interfaces import LocationError
1✔
31
from zope.publisher.defaultview import queryDefaultViewName
1✔
32
from zope.publisher.interfaces import EndRequestEvent
1✔
33
from zope.publisher.interfaces import IPublishTraverse
1✔
34
from zope.publisher.interfaces import NotFound as ztkNotFound
1✔
35
from zope.publisher.interfaces.browser import IBrowserPublisher
1✔
36
from zope.traversing.namespace import namespaceLookup
1✔
37
from zope.traversing.namespace import nsParse
1✔
38
from ZPublisher.Converters import type_converters
1✔
39
from ZPublisher.interfaces import UseTraversalDefault
1✔
40
from ZPublisher.xmlrpc import is_xmlrpc_response
1✔
41

42

43
# Unique sentinel object; code in this module compares against it by
# identity (``is`` / ``is not``) to tell "no value" apart from ``None``.
_marker = []
1✔
44
# Placeholder meaning "no roles were specified"; passed to ``getRoles`` as
# the default and checked with ``is`` (and once with ``!=``) in this module.
UNSPECIFIED_ROLES = ''
1✔
45

46

47
def quote(text):
    # Percent-encode URL path segments. '/' and the characters '+' and '@'
    # are listed as safe, so they are left intact.
    keep_unescaped = '/+@'
    return urllib_quote(text, keep_unescaped)
1✔
50

51

52
class RequestContainer(Base):
    # Plain attribute holder built from keyword arguments.
    # NOTE: documented with comments on purpose. The traversal code in this
    # module refuses to publish objects whose ``__doc__`` is empty, so adding
    # docstrings here could change what is publishable.
    __roles__ = None

    def __init__(self, **kw):
        # Every keyword argument becomes an instance attribute.
        self.__dict__.update(kw)

    def manage_property_types(self):
        # Names of all entries in ``type_converters``, as a new list.
        return [*type_converters.keys()]
×
61

62

63
@implementer(IBrowserPublisher)
class DefaultPublishTraverse:
    # Default traversal adapter, used by BaseRequest when no adapter is
    # registered for an object.
    # NOTE: documented with comments on purpose. ``publishTraverse`` below
    # refuses to publish anything with an empty ``__doc__``, so adding
    # docstrings to this class could change what is publishable.

    def __init__(self, context, request):
        self.context = context
        self.request = request

    def publishTraverse(self, request, name):
        # Resolve ``name`` below ``self.context``. Lookup order:
        #   1. ``__bobo_traverse__`` of the context (falling back to a view
        #      if that raises AttributeError, KeyError or NotFound),
        #   2. an unacquired attribute,
        #   3. a view,
        #   4. an acquired attribute,
        #   5. item access (``context[name]``).
        # Views are returned directly; everything else must carry a
        # docstring and must not be an instance of a denied built-in type,
        # otherwise Forbidden is raised.
        parent = self.context
        URL = request['URL']

        if name[:1] == '_':
            raise Forbidden(
                "Object name begins with an underscore at: %s" % URL)

        found = UseTraversalDefault  # indicator
        try:
            if hasattr(parent, '__bobo_traverse__'):
                try:
                    found = parent.__bobo_traverse__(request, name)
                    if isinstance(found, tuple) and len(found) > 1:
                        # Add additional parents into the path
                        # XXX There are no tests for this:
                        request['PARENTS'][-1:] = list(found[:-1])
                        parent, found = found[-2:]
                except (AttributeError, KeyError, NotFound) as e:
                    # Try to find a view
                    found = queryMultiAdapter(
                        (parent, request), Interface, name)
                    if found is not None:
                        # OFS.Application.__bobo_traverse__ calls
                        # REQUEST.RESPONSE.notFoundError which sets the HTTP
                        # status code to 404
                        request.response.setStatus(200)
                        # We don't need to do the docstring security check
                        # for views, so let's skip it and
                        # return the object here.
                        if IAcquirer.providedBy(found):
                            found = found.__of__(parent)
                        return found
                    # No view found. Reraise the error
                    # raised by __bobo_traverse__
                    raise e
        except UseTraversalDefault:
            # __bobo_traverse__ asked for the default traversal below.
            pass

        if found is UseTraversalDefault:
            # No __bobo_traverse__ or default traversal requested
            # Try with an unacquired attribute:
            if hasattr(aq_base(parent), name):
                found = getattr(parent, name)
            else:
                # We try to fall back to a view:
                found = queryMultiAdapter((parent, request), Interface,
                                          name)
                if found is not None:
                    if IAcquirer.providedBy(found):
                        found = found.__of__(parent)
                    return found

                # And lastly, if there is no view, try acquired attributes,
                # but only if there is no __bobo_traverse__:
                try:
                    found = getattr(parent, name)
                    # Again, clear any error status created by
                    # __bobo_traverse__ because we actually found something:
                    request.response.setStatus(200)
                except AttributeError:
                    pass

                # Lastly we try with key access:
                if found is None:
                    try:
                        found = parent[name]
                    except TypeError:  # unsubscriptable
                        raise KeyError(name)

        # Ensure that the object has a docstring, or that the parent
        # object has a pseudo-docstring for the object. Objects that
        # have an empty or missing docstring are not published.
        if not getattr(found, '__doc__', None):
            raise Forbidden(
                "The object at %s has an empty or missing "
                "docstring. Objects must have a docstring to be "
                "published." % URL
            )

        # Check that built-in types aren't publishable.
        if not typeCheck(found):
            raise Forbidden(
                "The object at %s is not publishable." % URL)

        return found

    def browserDefault(self, request):
        # Return ``(object, names)``: the object to publish and the extra
        # names to traverse in order to reach its default view.
        context = self.context
        if hasattr(context, '__browser_default__'):
            return context.__browser_default__(request)
        # Zope 3.2 still uses IDefaultView name when it
        # registers default views, even though it's
        # deprecated. So we handle that here:
        default_name = queryDefaultViewName(context, request)
        if default_name is None:
            return context, ()
        # Adding '@@' here forces this to be a view.
        # A neater solution might be desirable.
        return context, ('@@' + default_name,)
1✔
169

170

171
class BaseRequest:
1✔
172
    """Provide basic ZPublisher request management
173

174
    This object provides access to request data. Request data may
175
    vary depending on the protocol used.
176

177
    Request objects are created by the object publisher and will be
178
    passed to published objects through the argument name, REQUEST.
179

180
    The request object is a mapping object that represents a
181
    collection of variable to value mappings.
182
    """
183

184
    maybe_webdav_client = 1
1✔
185

186
    # While the following assignment is not strictly necessary, it
187
    # prevents a lot of unnecessary searches because, without it,
188
    # acquisition of REQUEST is disallowed, which penalizes access
189
    # in DTML with tags.
190
    __roles__ = None
1✔
191
    _file = None
1✔
192
    common = {}  # Common request data
1✔
193
    _auth = None
1✔
194
    _held = ()
1✔
195

196
    # Allow (reluctantly) access to unprotected attributes
197
    __allow_access_to_unprotected_subobjects__ = 1
1✔
198

199
    def __init__(self, other=None, **kw):
        """The constructor is not allowed to raise errors
        """
        self.__doc__ = None  # Make BaseRequest objects unpublishable
        if other is not None:
            # The caller's mapping is updated in place and then adopted.
            other.update(kw)
        else:
            # Without an explicit mapping the keyword arguments are the data.
            other = kw
        self.other = other
1✔
208

209
    def clear(self):
        # Empty the "other" mapping, then drop the tuple of held references.
        # After this, ``_hold`` no longer records anything (it checks for
        # ``_held is None``).
        self.other.clear()
        self._held = None
1✔
212

213
    def close(self):
        # Announce the end of the request, then clear the request data even
        # if a subscriber raised.
        end_event = EndRequestEvent(None, self)
        try:
            notify(end_event)
        finally:
            # subscribers might need the zodb, so `clear` must come afterwards
            # (since `self._held=None` might close the connection)
            self.clear()
1✔
220

221
    def processInputs(self):
        """Do any input processing that could raise errors
        """
        # Intentionally empty in this base class.
224

225
    def __len__(self):
        # Always 1, regardless of how many variables are stored; as a
        # consequence a request object is always truthy.
        return 1
1✔
227

228
    def __setitem__(self, key, value):
        """Set application variables

        This method is used to set a variable in the requests "other"
        category.
        """
        variables = self.other
        variables[key] = value
1✔
235

236
    set = __setitem__
1✔
237

238
    def get(self, key, default=None):
        """Get a variable value

        Return a value for the required variable name.

        In this base class the value is looked up in the "other"
        variables first and then in the class-level ``common`` data.
        The special keys ``BODY`` and ``BODYFILE`` are computed from the
        request's input file (when there is one) and cached in "other".
        ``REQUEST`` always yields the request itself.  ``default`` is
        returned when nothing matches.
        """
        if key == 'REQUEST':
            return self

        # Probe both mappings with the private sentinel. Probing ``common``
        # with ``default`` (as was done before) made every call with an
        # ordinary default return right here, so the BODY / BODYFILE
        # handling below could only be reached through ``__getitem__``.
        for source in (self.other, self.common):
            v = source.get(key, _marker)
            if v is not _marker:
                return v

        body_file = self._file
        if body_file is not None:
            if key == 'BODY':
                # Read the whole file, then restore the previous position so
                # other readers of the file are not disturbed.
                position = body_file.tell()
                body_file.seek(0)
                v = body_file.read()
                body_file.seek(position)
                self.other[key] = v
                return v

            if key == 'BODYFILE':
                self.other[key] = body_file
                return body_file

        return default
1✔
271

272
    def __getitem__(self, key, default=_marker):
        # Mapping access: like ``get``, but a missing key raises KeyError
        # (unless a ``default`` other than the sentinel was passed).
        found = self.get(key, default)
        if found is not _marker:
            return found
        raise KeyError(key)
1✔
277

278
    def __bobo_traverse__(self, name):
        # Nothing can be reached by traversing into the request: every name
        # is reported as missing.
        raise KeyError(name)
1✔
280

281
    def __getattr__(self, key, default=_marker):
        # Attribute access falls back to the request variables: like ``get``,
        # but a missing name raises AttributeError.
        found = self.get(key, default)
        if found is not _marker:
            return found
        raise AttributeError(key)
1✔
286

287
    def set_lazy(self, key, callable):
        # No-op in the base class.
        # MAYBE, we could do more, but let HTTPRequest do it
        return None
×
289

290
    def has_key(self, key):
        # Legacy spelling of ``key in request``.
        present = key in self
        return present
×
292

293
    def __contains__(self, key):
        # A key is present when ``get`` yields anything but the sentinel.
        missing = self.get(key, _marker) is _marker
        return not missing
×
295

296
    def keys(self):
        # Union of the keys of ``common`` and ``other`` as a list; keys of
        # ``common`` come first, followed by keys only found in ``other``.
        merged = dict(self.common)
        merged.update(self.other)
        return list(merged)
1✔
301

302
    def items(self):
        # List of ``(key, value)`` pairs, values resolved through ``get``.
        return [(name, self.get(name)) for name in self.keys()]
1✔
307

308
    def values(self):
        # List of values, in the order of ``keys()``, resolved through
        # ``get``.
        return [self.get(name) for name in self.keys()]
×
313

314
    def __str__(self):
        # One "key:<TAB>value" line per variable, sorted.
        ordered = sorted(self.items())
        return '\n'.join("%s:\t%s" % pair for pair in ordered)
1✔
318

319
    __repr__ = __str__
1✔
320

321
    # Original version: see zope.traversing.publicationtraverse
322
    def traverseName(self, ob, name):
        # Traverse a single path element ``name`` starting from ``ob`` and
        # return the object found.
        if name and name[:1] in '@+':
            # Process URI segment parameters.
            ns, nm = nsParse(name)
            if ns:
                try:
                    target = namespaceLookup(ns, nm, ob, self)
                except LocationError:
                    raise ztkNotFound(ob, name)

                if IAcquirer.providedBy(target):
                    target = target.__of__(ob)
                return target

        if name == '.':
            return ob

        if IPublishTraverse.providedBy(ob):
            # The object knows how to traverse itself.
            traverser = ob
        else:
            traverser = queryMultiAdapter((ob, self), IPublishTraverse)
            if traverser is None:
                # Zope2 doesn't set up its own adapters in a lot of cases
                # so we will just use a default adapter.
                traverser = DefaultPublishTraverse(ob, self)

        return traverser.publishTraverse(self, name)
1✔
351
    traverseName__roles__ = ()
1✔
352

353
    def traverse(self, path, response=None, validated_hook=None):
        """Traverse the object space

        The REQUEST must already have a PARENTS item with at least one
        object in it.  This is typically the root object.

        ``path`` is a '/'-separated string.  ``response`` defaults to
        ``self.response``.  ``validated_hook``, if given, is called as
        ``validated_hook(self, user)`` once a user has been validated.

        Returns the object reached by traversal (or the result of a
        post-traverse hook that returned a value other than None).  When
        a path element cannot be found or is forbidden, the result of
        the corresponding error method of ``response`` is returned.
        """
        request = self
        request_get = request.get
        if response is None:
            response = self.response

        # remember path for later use
        browser_path = path

        # Cleanup the path list
        if path[:1] == '/':
            path = path[1:]
        if path[-1:] == '/':
            path = path[:-1]
        clean = []
        for item in path.split('/'):
            # Make sure that certain things that don't make sense
            # cannot be traversed.
            if item in ('REQUEST', 'aq_self', 'aq_base'):
                return response.notFoundError(path)
            if not item or item == '.':
                continue
            elif item == '..':
                # A '..' with nothing left to drop (e.g. "/../x") is ignored
                # instead of raising IndexError.
                if clean:
                    del clean[-1]
            else:
                clean.append(item)
        path = clean

        # How did this request come in? (HTTP GET, PUT, POST, etc.)
        method = request_get('REQUEST_METHOD', 'GET').upper()

        # Probably a browser
        no_acquire_flag = 0
        if method in ('GET', 'POST', 'PURGE') and \
           not is_xmlrpc_response(response):
            # index_html is still the default method, only any object can
            # override it by implementing its own __browser_default__ method
            method = 'index_html'
        elif method != 'HEAD' and self.maybe_webdav_client:
            # Probably a WebDAV client.
            no_acquire_flag = 1

        URL = request['URL']
        parents = request['PARENTS']
        object = parents[-1]
        del parents[:]

        self.roles = getRoles(None, None, object, UNSPECIFIED_ROLES)

        # if the top object has a __bobo_traverse__ method, then use it
        # to possibly traverse to an alternate top-level object.
        if hasattr(object, '__bobo_traverse__'):
            try:
                new_object = object.__bobo_traverse__(request)
                if new_object is not None:
                    object = new_object
                    self.roles = getRoles(None, None, object,
                                          UNSPECIFIED_ROLES)
            except Exception:
                # Deliberately best-effort: keep the original top object.
                pass

        if not path and not method:
            return response.forbiddenError(self['URL'])

        # Traverse the URL to find the object:
        if hasattr(object, '__of__'):
            # Try to bind the top-level object to the request
            # This is how you get 'self.REQUEST'
            object = object.__of__(RequestContainer(REQUEST=request))
        parents.append(object)

        steps = self.steps
        self._steps = _steps = list(map(quote, steps))
        path.reverse()

        request['TraversalRequestNameStack'] = request.path = path
        request['ACTUAL_URL'] = request['URL'] + quote(browser_path)

        # Set the posttraverse for duration of the traversal here
        self._post_traverse = post_traverse = []

        entry_name = ''
        try:
            # We build parents in the wrong order, so we
            # need to make sure we reverse it when we're done.
            while 1:
                bpth = getattr(object, '__before_publishing_traverse__', None)
                if bpth is not None:
                    bpth(object, self)

                path = request.path = request['TraversalRequestNameStack']
                # Check for method:
                if path:
                    entry_name = path.pop()
                else:
                    # If we have reached the end of the path, we look to see
                    # if we can find IBrowserPublisher.browserDefault. If so,
                    # we call it to let the object tell us how to publish it.
                    # BrowserDefault returns the object to be published
                    # (usually self) and a sequence of names to traverse to
                    # find the method to be published.

                    # This is webdav support. The last object in the path
                    # should not be acquired. Instead, a NullResource should
                    # be given if it doesn't exist:
                    if no_acquire_flag and \
                       hasattr(object, 'aq_base') and \
                       not hasattr(object, '__bobo_traverse__'):

                        if (object.__parent__ is not
                                aq_inner(object).__parent__):
                            from webdav.NullResource import NullResource
                            object = NullResource(parents[-2], object.getId(),
                                                  self).__of__(parents[-2])

                    if IBrowserPublisher.providedBy(object):
                        adapter = object
                    else:
                        adapter = queryMultiAdapter((object, self),
                                                    IBrowserPublisher)
                        if adapter is None:
                            # Zope2 doesn't set up its own adapters in a lot
                            # of cases so we will just use a default adapter.
                            adapter = DefaultPublishTraverse(object, self)

                    object, default_path = adapter.browserDefault(self)
                    if default_path:
                        request._hacked_path = 1
                        if len(default_path) > 1:
                            path = list(default_path)
                            method = path.pop()
                            request['TraversalRequestNameStack'] = path
                            continue
                        else:
                            entry_name = default_path[0]
                    elif (method
                          and hasattr(object, method)
                          and entry_name != method
                          and getattr(object, method) is not None):
                        request._hacked_path = 1
                        entry_name = method
                        method = 'index_html'
                    else:
                        if hasattr(object, '__call__'):
                            self.roles = getRoles(
                                object, '__call__',
                                object.__call__, self.roles)
                        if request._hacked_path:
                            i = URL.rfind('/')
                            if i > 0:
                                response.setBase(URL[:i])
                        break
                step = quote(entry_name)
                _steps.append(step)
                request['URL'] = URL = f'{request["URL"]}/{step}'

                try:
                    subobject = self.traverseName(object, entry_name)
                    if hasattr(object, '__bobo_traverse__') or \
                       hasattr(object, entry_name):
                        check_name = entry_name
                    else:
                        check_name = None

                    self.roles = getRoles(
                        object, check_name, subobject,
                        self.roles)
                    object = subobject
                # traverseName() might raise ZTK's NotFound
                except (KeyError, AttributeError, ztkNotFound):
                    if response.debug_mode:
                        return response.debugError(
                            "Cannot locate object at: %s" % URL)
                    else:
                        return response.notFoundError(URL)
                except Forbidden as e:
                    # Use the same response object as the branch above.
                    if response.debug_mode:
                        return response.debugError(e.args)
                    else:
                        return response.forbiddenError(entry_name)

                parents.append(object)

                steps.append(entry_name)
        finally:
            parents.reverse()

        # Note - no_acquire_flag is necessary to support
        # things like DAV.  We have to make sure
        # that the target object is not acquired
        # if the request_method is other than GET
        # or POST. Otherwise, you could never use
        # PUT to add a new object named 'test' if
        # an object 'test' existed above it in the
        # hierarchy -- you'd always get the
        # existing object :(
        if no_acquire_flag and \
           hasattr(parents[1], 'aq_base') and \
           not hasattr(parents[1], '__bobo_traverse__'):
            base = aq_base(parents[1])
            if not hasattr(base, entry_name):
                try:
                    if entry_name not in base:
                        raise AttributeError(entry_name)
                except TypeError:
                    raise AttributeError(entry_name)

        # After traversal post traversal hooks aren't available anymore
        del self._post_traverse

        request['PUBLISHED'] = parents.pop(0)

        # Do authorization checks (authentication is always performed)
        user = groups = None
        i = 0

        last_parent_index = len(parents)
        if hasattr(object, '__allow_groups__'):
            groups = object.__allow_groups__
            inext = 0
        else:
            inext = None
            for i in range(last_parent_index):
                if hasattr(parents[i], '__allow_groups__'):
                    groups = parents[i].__allow_groups__
                    inext = i + 1
                    break

        if inext is not None:
            i = inext
            v = getattr(groups, 'validate', old_validation)

            auth = request._auth

            if v is old_validation and self.roles is UNSPECIFIED_ROLES:
                # No roles, so if we have a named group, get roles from
                # group keys
                if hasattr(groups, 'keys'):
                    self.roles = list(groups.keys())
                else:
                    try:
                        groups = groups()
                    except Exception:
                        pass
                    try:
                        self.roles = list(groups.keys())
                    except Exception:
                        pass

                if groups is None:
                    # Public group, hack structures to get it to validate
                    self.roles = None
                    auth = ''

            if v is old_validation:
                user = old_validation(groups, request, auth, self.roles)
            elif self.roles is UNSPECIFIED_ROLES:
                user = v(request, auth)
            else:
                user = v(request, auth, self.roles)

            # Walk further up the parents until some validator accepts.
            while user is None and i < last_parent_index:
                parent = parents[i]
                i = i + 1
                if hasattr(parent, '__allow_groups__'):
                    groups = parent.__allow_groups__
                else:
                    continue
                if hasattr(groups, 'validate'):
                    v = groups.validate
                else:
                    v = old_validation
                if v is old_validation:
                    user = old_validation(
                        groups, request, auth, self.roles)
                elif self.roles is UNSPECIFIED_ROLES:
                    user = v(request, auth)
                else:
                    user = v(request, auth, self.roles)

        if user is None and self.roles != UNSPECIFIED_ROLES:
            response.unauthorized()

        if user is not None:
            if validated_hook is not None:
                validated_hook(self, user)
            request['AUTHENTICATED_USER'] = user
            request['AUTHENTICATION_PATH'] = '/'.join(steps[:-i])

        # Remove http request method from the URL.
        request['URL'] = URL

        # Run post traversal hooks
        if post_traverse:
            result = exec_callables(post_traverse)
            if result is not None:
                object = result

        return object
1✔
659

660
    def post_traverse(self, f, args=()):
        """Add a callable object and argument tuple to be post-traversed.

        If traversal and authentication succeed, each post-traversal
        pair is processed in the order in which they were added.
        Each argument tuple is passed to its callable.  If a callable
        returns a value other than None, no more pairs are processed,
        and the return value replaces the traversal result.
        """
        try:
            # Only present while ``traverse`` is running.
            pending = self._post_traverse
        except AttributeError:
            raise RuntimeError('post_traverse() may only be called '
                               'during publishing traversal.')
        pending.append((f, tuple(args)))
1✔
676

677
    retry_count = 0
1✔
678

679
    def supports_retry(self):
        # The base request never reports retry support (false value 0).
        return 0
1✔
681

682
    def _hold(self, object):
        """Hold a reference to an object to delay it's destruction until mine
        """
        held = self._held
        if held is None:
            # ``clear`` has already run; nothing is recorded any more.
            return
        self._held = held + (object, )
1✔
687

688

689
def exec_callables(callables):
    # Call each ``(function, args)`` pair in order and return the first
    # result that is not None; return None when every call returned None.
    # Don't catch exceptions here. And don't hide them anyway.
    for func, arguments in callables:
        outcome = func(*arguments)
        if outcome is not None:
            return outcome
    return None
1✔
696

697

698
def old_validation(groups, request, auth,
                   roles=UNSPECIFIED_ROLES):
    # Legacy validation against ``groups`` (a mapping of role -> user
    # mapping, a callable returning one, or an iterable of user mappings).
    # Returns the user name on success, '' when no credentials are available
    # but ``roles`` is None, and None otherwise. Raises Forbidden when
    # ``groups`` is not a named group and no entry matched.

    if auth:
        credentials = request._authUserPW()
        if not credentials:
            return '' if roles is None else None
        name, password = credentials
    elif 'REMOTE_USER' in request.environ:
        name = request.environ['REMOTE_USER']
        password = None
    else:
        return '' if roles is None else None

    if roles is None:
        return name

    keys = None
    try:
        keys = groups.keys
    except Exception:
        try:
            groups = groups()  # Maybe it was a method defining a group
            keys = groups.keys
        except Exception:
            pass

    if keys is not None:
        # OK, we have a named group, so apply the roles to the named
        # group.
        if roles is UNSPECIFIED_ROLES:
            roles = keys()
        groups = [groups[role] for role in roles if role in groups]

    for users in groups:
        if name not in users:
            continue
        if users[name] == password or password is None:
            return name

    if keys is None:
        # Not a named group, so don't go further
        raise Forbidden(
            """<strong>You are not authorized to access this resource""")

    return None
×
751

752

753
# Types whose instances must not be published; ``typeCheck`` looks the type
# of an object up in this mapping (0 means "deny").
itypes = dict.fromkeys(
    (
        bool,
        types.CodeType,
        complex,
        dict,
        float,
        types.FrameType,
        frozenset,
        int,
        list,
        type(None),
        set,
        slice,
        str,
        types.TracebackType,
        tuple,
    ),
    0,
)
# Additionally deny these ``types`` members wherever the running Python
# provides them.
for name in ('BufferType', 'DictProxyType', 'EllipsisType',
             'LongType', 'UnicodeType', 'XRangeType'):
    if hasattr(types, name):
        itypes[getattr(types, name)] = 0
×
774

775

776
def typeCheck(obj, deny=itypes):
    # Return true if it's ok to publish the type, false otherwise.
    # Only the exact type is looked up; types missing from ``deny`` are
    # allowed.
    kind = type(obj)
    return deny.get(kind, 1)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc