• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Zope / 3956162881

pending completion
3956162881

push

github

Michael Howitz
Update to deprecation warning free releases.

4401 of 7036 branches covered (62.55%)

Branch coverage included in aggregate %.

27161 of 31488 relevant lines covered (86.26%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.9
/src/Testing/ZopeTestCase/zopedoctest/functional.py
1
##############################################################################
2
#
3
# Copyright (c) 2005 Zope Foundation and Contributors.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE.
11
#
12
##############################################################################
13
"""Support for (functional) doc tests
1✔
14
"""
15

16
import doctest
1✔
17
import email.parser
1✔
18
import re
1✔
19
import sys
1✔
20
import warnings
1✔
21
from doctest import ELLIPSIS
1✔
22
from doctest import NORMALIZE_WHITESPACE
1✔
23
from doctest import REPORT_NDIFF
1✔
24
from functools import partial
1✔
25
from io import BytesIO
1✔
26

27
import transaction
1✔
28
from Testing.ZopeTestCase import Functional
1✔
29
from Testing.ZopeTestCase import FunctionalTestCase
1✔
30
from Testing.ZopeTestCase import ZopeTestCase
1✔
31
from Testing.ZopeTestCase import folder_name
1✔
32
from Testing.ZopeTestCase import standard_permissions
1✔
33
from Testing.ZopeTestCase import user_name
1✔
34
from Testing.ZopeTestCase import user_password
1✔
35
from Testing.ZopeTestCase import user_role
1✔
36
from Testing.ZopeTestCase.functional import ResponseWrapper
1✔
37
from Testing.ZopeTestCase.functional import savestate
1✔
38
from Testing.ZopeTestCase.sandbox import AppZapper
1✔
39
from ZPublisher.httpexceptions import HTTPExceptionHandler
1✔
40
from ZPublisher.utils import basic_auth_encode
1✔
41

42

43
class HTTPHeaderOutput:
1✔
44

45
    status = '200'
1✔
46
    reason = 'OK'
1✔
47

48
    def __init__(self, protocol, omit):
1✔
49
        self.headers = {}
1✔
50
        self.headersl = []
1✔
51
        self.protocol = protocol
1✔
52
        self.omit = omit
1✔
53

54
    def setResponseStatus(self, status, reason):
1✔
55
        self.status, self.reason = status, reason
1✔
56

57
    def setResponseHeaders(self, mapping):
1✔
58
        self.headers.update({
1✔
59
            '-'.join([s.capitalize() for s in name.split('-')]): v
60
            for name, v in mapping.items()
61
            if name.lower() not in self.omit
62
        })
63

64
    def appendResponseHeaders(self, lst):
1✔
65
        if lst and isinstance(lst[0], str):
1✔
66
            headers = [split_header(header) for header in lst]
1✔
67
        else:
68
            headers = lst
1✔
69
        self.headersl.extend(
1✔
70
            [('-'.join([s.capitalize() for s in name.split('-')]), v)
71
             for name, v in headers
72
             if name.lower() not in self.omit]
73
        )
74

75
    def __str__(self):
1✔
76
        out = ["%s: %s" % header for header in self.headers.items()]
1✔
77
        out.extend(["%s: %s" % header for header in self.headersl])
1✔
78
        out.sort()
1✔
79
        out.insert(0, f"{self.protocol} {self.status} {self.reason}")
1✔
80
        return '\n'.join(out)
1✔
81

82

83
class DocResponseWrapper(ResponseWrapper):
1✔
84
    """Response Wrapper for use in doctests
85
    """
86

87
    def __init__(self, response, outstream, path, header_output,
1✔
88
                 wsgi_result=(), wsgi_headers=''):
89
        ResponseWrapper.__init__(self, response, outstream, path,
1✔
90
                                 wsgi_result, wsgi_headers)
91
        self.header_output = header_output
1✔
92

93
    def __str__(self):
1✔
94
        body = self._decode(self.getBody())
1✔
95
        if body:
1✔
96
            return f"{self.header_output}\n\n{body}"
1✔
97
        return "%s\n" % (self.header_output)
1✔
98

99

100
basicre = re.compile(r'Basic (.+)?:(.+)?$')
1✔
101
headerre = re.compile(r'(\S+): (.+)$')
1✔
102

103

104
def split_header(header):
1✔
105
    return headerre.match(header).group(1, 2)
1✔
106

107

108
def auth_header(header):
1✔
109
    match = basicre.match(header)
1✔
110
    if match:
1✔
111
        u, p = match.group(1, 2)
1✔
112
        if u is None:
1✔
113
            u = ''
1✔
114
        if p is None:
1✔
115
            p = ''
1✔
116
        return basic_auth_encode(u, p)
1✔
117
    return header
1✔
118

119

120
def getRootFolder():
1✔
121
    return AppZapper().app()
1✔
122

123

124
def sync():
1✔
125
    getRootFolder()._p_jar.sync()
1✔
126

127

128
@savestate
1✔
129
def http(request_string, handle_errors=True):
1✔
130
    """Execute an HTTP request string via the publisher
131

132
    This is used for HTTP doc tests.
133
    """
134
    from urllib.parse import unquote_to_bytes
1✔
135

136
    from ZPublisher.HTTPRequest import WSGIRequest as Request
1✔
137
    from ZPublisher.HTTPResponse import WSGIResponse
1✔
138
    from ZPublisher.WSGIPublisher import publish_module
1✔
139

140
    # Commit work done by previous python code.
141
    transaction.commit()
1✔
142

143
    # Discard leading white space to make call layout simpler
144
    request_string = request_string.lstrip()
1✔
145

146
    # Split off and parse the command line
147
    newline = request_string.find('\n')
1✔
148
    command_line = request_string[:newline].rstrip()
1✔
149
    request_string = request_string[newline + 1:]
1✔
150
    method, path, protocol = command_line.split()
1✔
151

152
    env = {
1✔
153
        'HTTP_HOST': 'localhost',
154
        'HTTP_REFERER': 'localhost',
155
        'REQUEST_METHOD': method,
156
        'SERVER_PROTOCOL': protocol,
157
    }
158

159
    query = ''
1✔
160
    if '?' in path:
1✔
161
        path, query = path.split("?", 1)
1✔
162
    env['PATH_INFO'] = unquote_to_bytes(path).decode('latin-1')
1✔
163
    env['QUERY_STRING'] = query
1✔
164

165
    header_output = HTTPHeaderOutput(
1✔
166
        protocol, ('x-content-type-warning', 'x-powered-by'))
167

168
    # With a HeaderParser the payload is always a string, Parser would create a
169
    # list of messages for multipart messages.
170
    parser = email.parser.HeaderParser()
1✔
171
    msg = parser.parsestr(request_string)
1✔
172
    headers = msg.items()
1✔
173
    body = msg.get_payload()
1✔
174

175
    if isinstance(body, str):
1!
176
        body = body.encode('utf-8')
1✔
177

178
    # Store request body without headers
179
    instream = BytesIO(body)
1✔
180

181
    for name, value in headers:
1✔
182
        name = ('_'.join(name.upper().split('-')))
1✔
183
        if name not in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
1✔
184
            name = 'HTTP_' + name
1✔
185
        env[name] = value.rstrip()
1✔
186

187
    if 'HTTP_AUTHORIZATION' in env:
1✔
188
        env['HTTP_AUTHORIZATION'] = auth_header(env['HTTP_AUTHORIZATION'])
1✔
189

190
    if not handle_errors:
1✔
191
        # Tell the publisher to skip exception views
192
        env['x-wsgiorg.throw_errors'] = True
1✔
193

194
    outstream = BytesIO()
1✔
195
    response = WSGIResponse(stdout=outstream, stderr=sys.stderr)
1✔
196

197
    env['wsgi.input'] = instream
1✔
198
    wsgi_headers = BytesIO()
1✔
199

200
    def start_response(status, headers):
1✔
201
        wsgi_headers.write(
1✔
202
            b'HTTP/1.1 ' + status.encode('ascii') + b'\r\n')
203
        headers = b'\r\n'.join([
1✔
204
            (k + ': ' + v).encode('ascii') for k, v in headers])
205
        wsgi_headers.write(headers)
1✔
206
        wsgi_headers.write(b'\r\n\r\n')
1✔
207

208
    publish = partial(
1✔
209
        publish_module,
210
        _request_factory=Request,
211
        _response=response)
212
    if handle_errors:
1✔
213
        publish = HTTPExceptionHandler(publish)
1✔
214

215
    wsgi_result = publish(env, start_response)
1✔
216

217
    header_output.setResponseStatus(response.getStatus(), response.errmsg)
1✔
218
    header_output.setResponseHeaders(response.headers)
1✔
219
    header_output.headersl.extend(response._cookie_list())
1✔
220
    header_output.appendResponseHeaders(response.accumulated_headers)
1✔
221

222
    sync()
1✔
223

224
    return DocResponseWrapper(
1✔
225
        response, outstream, path, header_output, wsgi_result, wsgi_headers)
226

227

228
class ZopeSuiteFactory:
1✔
229

230
    def __init__(self, *args, **kw):
1✔
231
        self._args = args
1✔
232
        self._kw = kw
1✔
233
        self._layer = None
1✔
234
        self.setup_globs()
1✔
235
        self.setup_test_class()
1✔
236
        self.setup_optionflags()
1✔
237

238
    def doctestsuite(self):
1✔
239
        suite = doctest.DocTestSuite(*self._args, **self._kw)
1✔
240
        if self._layer is not None:
1!
241
            suite.layer = self._layer
1✔
242
        return suite
1✔
243

244
    def docfilesuite(self):
1✔
245
        suite = doctest.DocFileSuite(*self._args, **self._kw)
1✔
246
        if self._layer is not None:
1!
247
            suite.layer = self._layer
1✔
248
        return suite
1✔
249

250
    def setup_globs(self):
1✔
251
        globs = self._kw.setdefault('globs', {})
1✔
252
        globs['folder_name'] = folder_name
1✔
253
        globs['user_name'] = user_name
1✔
254
        globs['user_password'] = user_password
1✔
255
        globs['user_role'] = user_role
1✔
256
        globs['standard_permissions'] = standard_permissions
1✔
257

258
    def setup_test_class(self):
1✔
259
        test_class = self._kw.get('test_class', ZopeTestCase)
1✔
260

261
        if 'test_class' in self._kw:
1✔
262
            del self._kw['test_class']
1✔
263

264
        if hasattr(test_class, 'layer'):
1!
265
            self._layer = test_class.layer
1✔
266

267
        # If the test_class does not have a runTest method, we add
268
        # a dummy attribute so that TestCase construction works.
269
        # runTest may get called, so set it to something that works.
270
        if not hasattr(test_class, 'runTest'):
1✔
271
            setattr(test_class, 'runTest', lambda x: None)
1✔
272

273
        # Create a TestCase instance which will be used to execute
274
        # the setUp and tearDown methods, as well as be passed into
275
        # the test globals as 'self'.
276
        test_instance = test_class()
1✔
277

278
        kwsetUp = self._kw.get('setUp')
1✔
279

280
        def setUp(test):
1✔
281
            test_instance.setUp()
1✔
282
            test.globs['test'] = test
1✔
283
            test.globs['self'] = test_instance
1✔
284
            if hasattr(test_instance, 'app'):
1!
285
                test.globs['app'] = test_instance.app
1✔
286
            if hasattr(test_instance, 'folder'):
1!
287
                test.globs['folder'] = test_instance.folder
1✔
288
            if hasattr(test_instance, 'portal'):
1!
289
                test.globs['portal'] = test_instance.portal
×
290
                test.globs['portal_name'] = test_instance.portal.getId()
×
291
            test_instance.globs = test.globs
1✔
292
            if kwsetUp is not None:
1✔
293
                kwsetUp(test_instance)
1✔
294

295
        self._kw['setUp'] = setUp
1✔
296

297
        kwtearDown = self._kw.get('tearDown')
1✔
298

299
        def tearDown(test):
1✔
300
            if kwtearDown is not None:
1!
301
                kwtearDown(test_instance)
×
302
            test_instance.tearDown()
1✔
303

304
        self._kw['tearDown'] = tearDown
1✔
305

306
    def setup_optionflags(self):
1✔
307
        if 'optionflags' not in self._kw:
1!
308
            self._kw['optionflags'] = (ELLIPSIS | NORMALIZE_WHITESPACE)
1✔
309

310

311
class FunctionalSuiteFactory(ZopeSuiteFactory):
1✔
312

313
    def setup_globs(self):
1✔
314
        ZopeSuiteFactory.setup_globs(self)
1✔
315
        globs = self._kw.setdefault('globs', {})
1✔
316
        globs['http'] = http
1✔
317
        globs['getRootFolder'] = getRootFolder
1✔
318
        globs['sync'] = sync
1✔
319
        globs['user_auth'] = basic_auth_encode(user_name, user_password)
1✔
320

321
    def setup_test_class(self):
1✔
322
        test_class = self._kw.get('test_class', FunctionalTestCase)
1✔
323

324
        # If the passed-in test_class doesn't subclass Functional,
325
        # we mix it in for you, but we will issue a warning.
326
        if not issubclass(test_class, Functional):
1✔
327
            name = test_class.__name__
1✔
328
            warnings.warn(("The test_class you are using doesn't "
1✔
329
                           "subclass from ZopeTestCase.Functional. "
330
                           "Please fix that."), UserWarning, 4)
331
            if 'Functional' not in name:
1!
332
                name = 'Functional%s' % name
1✔
333
            test_class = type(name, (Functional, test_class), {})
1✔
334

335
        self._kw['test_class'] = test_class
1✔
336
        ZopeSuiteFactory.setup_test_class(self)
1✔
337

338
    def setup_optionflags(self):
1✔
339
        if 'optionflags' not in self._kw:
1✔
340
            self._kw['optionflags'] = (
1✔
341
                ELLIPSIS | REPORT_NDIFF | NORMALIZE_WHITESPACE)
342

343

344
def ZopeDocTestSuite(module=None, **kw):
1✔
345
    module = doctest._normalize_module(module)
1✔
346
    return ZopeSuiteFactory(module, **kw).doctestsuite()
1✔
347

348

349
def ZopeDocFileSuite(*paths, **kw):
1✔
350
    if kw.get('module_relative', True):
1!
351
        kw['package'] = doctest._normalize_module(kw.get('package'))
1✔
352
    return ZopeSuiteFactory(*paths, **kw).docfilesuite()
1✔
353

354

355
def FunctionalDocTestSuite(module=None, **kw):
1✔
356
    module = doctest._normalize_module(module)
1✔
357
    return FunctionalSuiteFactory(module, **kw).doctestsuite()
1✔
358

359

360
def FunctionalDocFileSuite(*paths, **kw):
1✔
361
    if kw.get('module_relative', True):
1!
362
        kw['package'] = doctest._normalize_module(kw.get('package'))
1✔
363
    return FunctionalSuiteFactory(*paths, **kw).docfilesuite()
1✔
364

365

366
__all__ = [
1✔
367
    'ZopeDocTestSuite',
368
    'ZopeDocFileSuite',
369
    'FunctionalDocTestSuite',
370
    'FunctionalDocFileSuite',
371
]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc