• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Zope / 3956162881

pending completion
3956162881

push

github

Michael Howitz
Update to deprecation warning free releases.

4401 of 7036 branches covered (62.55%)

Branch coverage included in aggregate %.

27161 of 31488 relevant lines covered (86.26%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.8
/src/Testing/ZopeTestCase/functional.py
1
##############################################################################
2
#
3
# Copyright (c) 2005 Zope Foundation and Contributors.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE.
11
#
12
##############################################################################
13
"""Support for functional unit testing in ZTC
1✔
14

15
After Marius Gedminas' functional.py module for Zope3.
16
"""
17

18
import sys
1✔
19
from functools import partial
1✔
20
from urllib.parse import unquote_to_bytes
1✔
21

22
import transaction
1✔
23
from Testing.ZopeTestCase import interfaces
1✔
24
from Testing.ZopeTestCase import sandbox
1✔
25
from zope.interface import implementer
1✔
26
from ZPublisher.httpexceptions import HTTPExceptionHandler
1✔
27
from ZPublisher.utils import basic_auth_encode
1✔
28

29

30
def savestate(func):
    '''Decorator saving thread local state before executing func
       and restoring it afterwards.

       The current security manager and the current component site
       are read before 'func' is called and are set again once it has
       finished, whether it returned normally or raised.

       The returned wrapper forwards all positional and keyword
       arguments to 'func', returns its result, and carries over its
       metadata (__name__, __doc__, __wrapped__, ...).
    '''
    from functools import wraps

    from AccessControl.SecurityManagement import getSecurityManager
    from AccessControl.SecurityManagement import setSecurityManager
    from zope.component.hooks import getSite
    from zope.component.hooks import setSite

    # wraps() keeps the decorated callable introspectable; without it
    # every decorated function would report itself as 'wrapped_func'.
    @wraps(func)
    def wrapped_func(*args, **kw):
        # Snapshot both values up front so the finally clause can put
        # them back no matter how func exits.
        sm, site = getSecurityManager(), getSite()
        try:
            return func(*args, **kw)
        finally:
            setSecurityManager(sm)
            setSite(site)
    return wrapped_func
1✔
47

48

49
@implementer(interfaces.IFunctional)
class Functional(sandbox.Sandboxed):
    '''Derive from this class and an xTestCase to get functional
       testing support::

           class MyTest(Functional, ZopeTestCase):
               ...
    '''

    @savestate
    def publish(self, path, basic=None, env=None, extra=None,
                request_method='GET', stdin=None, handle_errors=True):
        '''Publishes the object at 'path' returning a response object.

        path -- URL path to publish. Anything after the first '?' is
                passed on as QUERY_STRING; the part before it is
                unquoted and used as PATH_INFO.
        basic -- when true, handed to basic_auth_encode and sent as
                 HTTP_AUTHORIZATION.
        env -- optional dict used as the WSGI environ. NOTE: a dict
               passed in is updated in place with the server, method,
               path, query, auth and input entries set below.
        extra -- optional dict whose items are set on the request
                 object created for the publisher.
        request_method -- value for REQUEST_METHOD.
        stdin -- object used as 'wsgi.input'; an empty BytesIO when
                 None.
        handle_errors -- when true the publisher is wrapped in
                         HTTPExceptionHandler; when false the environ
                         key 'x-wsgiorg.throw_errors' is set instead.

        Returns a ResponseWrapper built from the fake response, its
        output stream, the path (without the query string), the WSGI
        result and the buffer holding the raw status line and headers.
        '''

        from io import BytesIO

        from ZPublisher.HTTPRequest import WSGIRequest as Request
        from ZPublisher.HTTPResponse import WSGIResponse
        from ZPublisher.WSGIPublisher import publish_module

        # Commit the sandbox for good measure
        transaction.commit()

        if env is None:
            env = {}
        if extra is None:
            extra = {}

        request = self.app.REQUEST

        env['SERVER_NAME'] = request['SERVER_NAME']
        env['SERVER_PORT'] = request['SERVER_PORT']
        env['SERVER_PROTOCOL'] = 'HTTP/1.1'
        env['REQUEST_METHOD'] = request_method

        query = ''
        if '?' in path:
            path, query = path.split("?", 1)
        # Percent-escapes are turned into raw bytes first and then
        # mapped one-to-one onto code points via latin-1.
        env['PATH_INFO'] = unquote_to_bytes(path).decode('latin-1')
        env['QUERY_STRING'] = query

        if basic:
            env['HTTP_AUTHORIZATION'] = basic_auth_encode(basic)

        if not handle_errors:
            # Tell the publisher to skip exception views
            env['x-wsgiorg.throw_errors'] = True

        if stdin is None:
            stdin = BytesIO()
        env['wsgi.input'] = stdin

        outstream = BytesIO()
        response = WSGIResponse(stdout=outstream, stderr=sys.stderr)

        def request_factory(*args):
            # Build the real request, then layer the caller supplied
            # 'extra' items on top of it.
            new_request = Request(*args)
            for k, v in extra.items():
                new_request[k] = v
            return new_request

        wsgi_headers = BytesIO()

        def start_response(status, headers):
            # Keep the fake response in-sync with the actual values
            # from the WSGI start_response call.
            response.setStatus(status.split()[0])
            for key, value in headers:
                response.setHeader(key, value)

            # WSGI (PEP 3333) status and header strings may carry any
            # latin-1 code point, so encode them as latin-1. This is
            # byte-identical to ASCII for ASCII-only values but does
            # not raise UnicodeEncodeError for the remaining ones.
            wsgi_headers.write(
                b'HTTP/1.1 ' + status.encode('latin-1') + b'\r\n')
            header_block = b'\r\n'.join([
                (k + ': ' + v).encode('latin-1') for k, v in headers])
            wsgi_headers.write(header_block)
            wsgi_headers.write(b'\r\n\r\n')

        publish = partial(
            publish_module,
            _request_factory=request_factory,
            _response=response)
        if handle_errors:
            publish = HTTPExceptionHandler(publish)

        wsgi_result = publish(env, start_response)

        return ResponseWrapper(response, outstream, path,
                               wsgi_result, wsgi_headers)
137

138

139
class ResponseWrapper:
    '''Decorates a response object with additional introspective methods.

    Attributes that are not found on the wrapper itself are looked up
    on the wrapped response object.
    '''

    def __init__(self, response, outstream, path,
                 wsgi_result=(), wsgi_headers=''):
        '''Stores the wrapped response and the raw WSGI data.

        response -- object that unknown attribute lookups go to.
        outstream -- stored as given; not read by this class.
        path -- value returned by getPath.
        wsgi_result -- iterable of bytes chunks forming the body.
        wsgi_headers -- object with a getvalue() method returning the
                        raw status line and headers as bytes; plain
                        bytes or str are accepted as well.
        '''
        self._response = response
        self._outstream = outstream
        self._path = path
        self._wsgi_result = wsgi_result
        self._wsgi_headers = wsgi_headers
        # Joined body, filled in by the first getBody call.
        self._wsgi_body = None

    def __getattr__(self, name):
        # This delegates introspection like getStatus to the fake
        # response class, though the actual values are part of the
        # _wsgi_headers / _wsgi_result. Might be better to ignore
        # the response and parse values out of the WSGI data instead.
        if name == '_response':
            # Only reached while _response is not set (e.g. on an
            # instance created without __init__). Looking it up again
            # below would recurse without end, so fail the normal way.
            raise AttributeError(name)
        return getattr(self._response, name)

    def __bytes__(self):
        return self.getOutput()

    def __str__(self):
        return self._decode(self.getOutput())

    def getOutput(self):
        '''Returns the complete output, headers and all.'''
        headers = self._wsgi_headers
        if hasattr(headers, 'getvalue'):
            headers = headers.getvalue()
        if isinstance(headers, str):
            # Covers the '' default of __init__, which has no
            # getvalue() and cannot be concatenated with bytes.
            headers = headers.encode('latin-1')
        return headers + self.getBody()

    def getBody(self):
        '''Returns the page body, i.e. the output sans headers.'''
        # The WSGI result may be a one-shot iterator, so the joined
        # body is kept and repeated calls return the same bytes.
        body = vars(self).get('_wsgi_body')
        if body is None:
            body = self._wsgi_body = b''.join(self._wsgi_result)
        return body

    def getPath(self):
        '''Returns the path used by the request.'''
        return self._path

    def getHeader(self, name):
        '''Returns the value of a response header.'''
        return self.headers.get(name.lower())

    def getCookie(self, name):
        '''Returns a response cookie.'''
        return self.cookies.get(name)

    def _decode(self, data):
        # This is a hack. This method is called to print a response
        # as part of a doctest. But if that response contains an
        # actual binary body, like a GIF image, there's no good
        # way to print that into the doctest output.
        try:
            return data.decode('utf-8')
        except UnicodeDecodeError:
            return data.decode('latin-1')
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc