• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Zope / 5286698593

pending completion
5286698593

push

github

icemac
- fix pip version incompatibility under Python 3.12

4283 of 6889 branches covered (62.17%)

Branch coverage included in aggregate %.

27156 of 31453 relevant lines covered (86.34%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.17
/src/ZPublisher/tests/testHTTPRequest.py
1
##############################################################################
2
#
3
# Copyright (c) 2002 Zope Foundation and Contributors.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE.
11
#
12
##############################################################################
13

14
import sys
1✔
15
import unittest
1✔
16
import warnings
1✔
17
from contextlib import contextmanager
1✔
18
from io import BytesIO
1✔
19
from unittest.mock import patch
1✔
20
from urllib.parse import quote_plus
1✔
21

22
from AccessControl.tainted import TaintedString
1✔
23
from AccessControl.tainted import should_be_tainted
1✔
24
from zExceptions import NotFound
1✔
25
from zope.component import getGlobalSiteManager
1✔
26
from zope.component import provideAdapter
1✔
27
from zope.i18n.interfaces import IUserPreferredLanguages
1✔
28
from zope.i18n.interfaces.locales import ILocale
1✔
29
from zope.publisher.browser import BrowserLanguages
1✔
30
from zope.publisher.interfaces.http import IHTTPRequest
1✔
31
from zope.testing.cleanup import cleanUp
1✔
32
from ZPublisher.HTTPRequest import BadRequest
1✔
33
from ZPublisher.HTTPRequest import FileUpload
1✔
34
from ZPublisher.HTTPRequest import search_type
1✔
35
from ZPublisher.interfaces import IXmlrpcChecker
1✔
36
from ZPublisher.tests.testBaseRequest import TestRequestViewsBase
1✔
37
from ZPublisher.utils import basic_auth_encode
1✔
38
from ZPublisher.xmlrpc import is_xmlrpc_response
1✔
39

40

41
class RecordTests(unittest.TestCase):
1✔
42

43
    def _makeOne(self):
1✔
44
        from ZPublisher.HTTPRequest import record
1✔
45
        return record()
1✔
46

47
    def test_dict_methods(self):
1✔
48
        rec = self._makeOne()
1✔
49
        rec.a = 1
1✔
50
        self.assertEqual(rec['a'], 1)
1✔
51
        self.assertEqual(rec.get('a'), 1)
1✔
52
        self.assertEqual(list(rec.keys()), ['a'])
1✔
53
        self.assertEqual(list(rec.values()), [1])
1✔
54
        self.assertEqual(list(rec.items()), [('a', 1)])
1✔
55

56
    def test_dict_special_methods(self):
1✔
57
        rec = self._makeOne()
1✔
58
        rec.a = 1
1✔
59
        self.assertTrue('a' in rec)
1✔
60
        self.assertFalse('b' in rec)
1✔
61
        self.assertEqual(len(rec), 1)
1✔
62
        self.assertEqual(list(iter(rec)), ['a'])
1✔
63

64
    def test_copy(self):
1✔
65
        rec = self._makeOne()
1✔
66
        rec.a = 1
1✔
67
        rec.b = 'foo'
1✔
68
        new_rec = rec.copy()
1✔
69
        self.assertIsInstance(new_rec, dict)
1✔
70
        self.assertEqual(new_rec, {'a': 1, 'b': 'foo'})
1✔
71

72
    def test_eq(self):
1✔
73
        rec1 = self._makeOne()
1✔
74
        self.assertFalse(rec1, {})
1✔
75
        rec2 = self._makeOne()
1✔
76
        self.assertEqual(rec1, rec2)
1✔
77
        rec1.a = 1
1✔
78
        self.assertNotEqual(rec1, rec2)
1✔
79
        rec2.a = 1
1✔
80
        self.assertEqual(rec1, rec2)
1✔
81
        rec2.b = 'foo'
1✔
82
        self.assertNotEqual(rec1, rec2)
1✔
83

84
    def test__str__returns_native_string(self):
1✔
85
        rec = self._makeOne()
1✔
86
        rec.a = b'foo'
1✔
87
        rec.b = 8
1✔
88
        rec.c = 'bar'
1✔
89
        self.assertIsInstance(str(rec), str)
1✔
90

91
    def test_str(self):
1✔
92
        rec = self._makeOne()
1✔
93
        rec.a = 1
1✔
94
        self.assertEqual(str(rec), 'a: 1')
1✔
95

96
    def test_repr(self):
1✔
97
        rec = self._makeOne()
1✔
98
        rec.a = 1
1✔
99
        rec.b = 'foo'
1✔
100
        r = repr(rec)
1✔
101
        d = eval(r)
1✔
102
        self.assertEqual(d, rec.__dict__)
1✔
103

104

105
class HTTPRequestFactoryMixin:
1✔
106

107
    def tearDown(self):
1✔
108
        cleanUp()
×
109

110
    def _getTargetClass(self):
1✔
111
        from ZPublisher.HTTPRequest import HTTPRequest
1✔
112
        return HTTPRequest
1✔
113

114
    def _makePostEnviron(self, body=b'', multipart=True):
1✔
115
        environ = TEST_POST_ENVIRON.copy()
1✔
116
        environ["CONTENT_TYPE"] = \
1✔
117
            multipart and 'multipart/form-data; boundary=12345' \
118
            or 'application/x-www-form-urlencoded'
119
        environ['CONTENT_LENGTH'] = str(len(body))
1✔
120
        return environ
1✔
121

122
    def _makeOne(self, stdin=None, environ=None, response=None, clean=1):
1✔
123
        from ZPublisher.HTTPResponse import HTTPResponse
1✔
124
        if stdin is None:
1✔
125
            stdin = BytesIO()
1✔
126

127
        if environ is None:
1✔
128
            environ = {}
1✔
129

130
        if 'REQUEST_METHOD' not in environ:
1✔
131
            environ['REQUEST_METHOD'] = 'GET'
1✔
132

133
        if 'SERVER_NAME' not in environ:
1✔
134
            environ['SERVER_NAME'] = 'localhost'
1✔
135

136
        if 'SERVER_PORT' not in environ:
1✔
137
            environ['SERVER_PORT'] = '8080'
1✔
138

139
        if response is None:
1✔
140
            response = HTTPResponse(stdout=BytesIO())
1✔
141

142
        return self._getTargetClass()(stdin, environ, response, clean)
1✔
143

144

145
class HTTPRequestTests(unittest.TestCase, HTTPRequestFactoryMixin):
1✔
146

147
    def _processInputs(self, inputs):
1✔
148
        # Have the inputs processed, and return a HTTPRequest object
149
        # holding the result.
150
        # inputs is expected to be a list of (key, value) tuples, no CGI
151
        # encoding is required.
152

153
        query_string = []
1✔
154
        add = query_string.append
1✔
155
        for key, val in inputs:
1✔
156
            add(f"{quote_plus(key)}={quote_plus(val)}")
1✔
157
        query_string = '&'.join(query_string)
1✔
158

159
        env = {'SERVER_NAME': 'testingharnas', 'SERVER_PORT': '80'}
1✔
160
        env['QUERY_STRING'] = query_string
1✔
161
        req = self._makeOne(environ=env)
1✔
162
        req.processInputs()
1✔
163
        self._noFormValuesInOther(req)
1✔
164
        return req
1✔
165

166
    def _noTaintedValues(self, req):
1✔
167
        self.assertFalse(list(req.taintedform.keys()))
1✔
168

169
    def _valueIsOrHoldsTainted(self, val):
1✔
170
        # Recursively searches a structure for a TaintedString and returns 1
171
        # when one is found.
172
        # Also raises an Assertion if a string which *should* have been
173
        # tainted is found, or when a tainted string is not deemed dangerous.
174
        from AccessControl.tainted import TaintedString
1✔
175
        from ZPublisher.HTTPRequest import record
1✔
176

177
        retval = 0
1✔
178

179
        if isinstance(val, TaintedString):
1✔
180
            self.assertTrue(
1✔
181
                should_be_tainted(val._value),
182
                "%r is not dangerous, no taint required." % val)
183
            retval = 1
1✔
184

185
        elif isinstance(val, record):
1✔
186
            for attr, value in list(val.__dict__.items()):
1✔
187
                rval = self._valueIsOrHoldsTainted(attr)
1✔
188
                if rval:
1!
189
                    retval = 1
×
190
                rval = self._valueIsOrHoldsTainted(value)
1✔
191
                if rval:
1✔
192
                    retval = 1
1✔
193

194
        elif type(val) in (list, tuple):
1✔
195
            for entry in val:
1✔
196
                rval = self._valueIsOrHoldsTainted(entry)
1✔
197
                if rval:
1✔
198
                    retval = 1
1✔
199

200
        elif isinstance(val, str):
1✔
201
            self.assertFalse(
1✔
202
                should_be_tainted(val),
203
                "'%s' is dangerous and should have been tainted." % val)
204

205
        return retval
1✔
206

207
    def _noFormValuesInOther(self, req):
1✔
208
        for key in list(req.taintedform.keys()):
1✔
209
            self.assertFalse(
1✔
210
                key in req.other,
211
                'REQUEST.other should not hold tainted values at first!')
212

213
        for key in list(req.form.keys()):
1✔
214
            self.assertFalse(
1✔
215
                key in req.other,
216
                'REQUEST.other should not hold form values at first!')
217

218
    def _onlyTaintedformHoldsTaintedStrings(self, req):
1✔
219
        for key, val in list(req.taintedform.items()):
1✔
220
            self.assertTrue(
1✔
221
                self._valueIsOrHoldsTainted(key)
222
                or self._valueIsOrHoldsTainted(val),
223
                'Tainted form holds item %s that is not tainted' % key)
224

225
        for key, val in list(req.form.items()):
1✔
226
            if key in req.taintedform:
1✔
227
                continue
1✔
228
            self.assertFalse(
1✔
229
                self._valueIsOrHoldsTainted(key)
230
                or self._valueIsOrHoldsTainted(val),
231
                'Normal form holds item %s that is tainted' % key)
232

233
    def _taintedKeysAlsoInForm(self, req):
1✔
234
        for key in list(req.taintedform.keys()):
1✔
235
            self.assertTrue(
1✔
236
                key in req.form,
237
                "Found tainted %s not in form" % key)
238
            self.assertEqual(
1✔
239
                req.form[key], req.taintedform[key],
240
                "Key %s not correctly reproduced in tainted; expected %r, "
241
                "got %r" % (key, req.form[key], req.taintedform[key]))
242

243
    def test_webdav_source_port_available(self):
1✔
244
        req = self._makeOne()
1✔
245
        self.assertFalse(req.get('WEBDAV_SOURCE_PORT'))
1✔
246

247
        req = self._makeOne(environ={'WEBDAV_SOURCE_PORT': 1})
1✔
248
        self.assertTrue(req.get('WEBDAV_SOURCE_PORT'))
1✔
249

250
    def test_no_docstring_on_instance(self):
1✔
251
        env = {'SERVER_NAME': 'testingharnas', 'SERVER_PORT': '80'}
1✔
252
        req = self._makeOne(environ=env)
1✔
253
        self.assertTrue(req.__doc__ is None)
1✔
254

255
    def test___bobo_traverse___raises(self):
1✔
256
        env = {'SERVER_NAME': 'testingharnas', 'SERVER_PORT': '80'}
1✔
257
        req = self._makeOne(environ=env)
1✔
258
        self.assertRaises(KeyError, req.__bobo_traverse__, 'REQUEST')
1✔
259
        self.assertRaises(KeyError, req.__bobo_traverse__, 'BODY')
1✔
260
        self.assertRaises(KeyError, req.__bobo_traverse__, 'BODYFILE')
1✔
261
        self.assertRaises(KeyError, req.__bobo_traverse__, 'RESPONSE')
1✔
262

263
    def test_processInputs_wo_query_string(self):
1✔
264
        env = {'SERVER_NAME': 'testingharnas', 'SERVER_PORT': '80'}
1✔
265
        req = self._makeOne(environ=env)
1✔
266
        req.processInputs()
1✔
267
        self._noFormValuesInOther(req)
1✔
268
        self.assertEqual(req.form, {})
1✔
269

270
    def test_processInputs_wo_marshalling(self):
1✔
271
        inputs = (
1✔
272
            ('foo', 'bar'), ('spam', 'eggs'),
273
            ('number', '1'),
274
            ('spacey key', 'val'), ('key', 'spacey val'),
275
            ('multi', '1'), ('multi', '2'))
276
        req = self._processInputs(inputs)
1✔
277

278
        formkeys = list(req.form.keys())
1✔
279
        formkeys.sort()
1✔
280
        self.assertEqual(
1✔
281
            formkeys,
282
            ['foo', 'key', 'multi', 'number', 'spacey key', 'spam'])
283
        self.assertEqual(req['number'], '1')
1✔
284
        self.assertEqual(req['multi'], ['1', '2'])
1✔
285
        self.assertEqual(req['spacey key'], 'val')
1✔
286
        self.assertEqual(req['key'], 'spacey val')
1✔
287

288
        self._noTaintedValues(req)
1✔
289
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
290

291
    def test_processInputs_w_simple_marshalling(self):
1✔
292
        from DateTime.DateTime import DateTime
1✔
293
        inputs = (
1✔
294
            ('num:int', '42'), ('fract:float', '4.2'), ('bign:long', '45'),
295
            ('words:string', 'Some words'), ('2tokens:tokens', 'one two'),
296
            ('aday:date', '2002/07/23'),
297
            ('accountedfor:required', 'yes'),
298
            ('multiline:lines', 'one\ntwo'),
299
            ('morewords:text', 'one\ntwo\n'))
300
        req = self._processInputs(inputs)
1✔
301

302
        formkeys = list(req.form.keys())
1✔
303
        formkeys.sort()
1✔
304
        self.assertEqual(
1✔
305
            formkeys,
306
            ['2tokens', 'accountedfor', 'aday', 'bign',
307
             'fract', 'morewords', 'multiline', 'num', 'words'])
308

309
        self.assertEqual(req['2tokens'], ['one', 'two'])
1✔
310
        self.assertEqual(req['accountedfor'], 'yes')
1✔
311
        self.assertEqual(req['aday'], DateTime('2002/07/23'))
1✔
312
        self.assertEqual(req['bign'], 45)
1✔
313
        self.assertEqual(req['fract'], 4.2)
1✔
314
        self.assertEqual(req['morewords'], 'one\ntwo\n')
1✔
315
        self.assertEqual(req['multiline'], ['one', 'two'])
1✔
316
        self.assertEqual(req['num'], 42)
1✔
317
        self.assertEqual(req['words'], 'Some words')
1✔
318

319
        self._noTaintedValues(req)
1✔
320
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
321

322
    def test_processInputs_w_unicode_conversions(self):
1✔
323
        # This tests native strings.
324
        reg_char = '\xae'
1✔
325
        inputs = (('ustring:ustring:utf8', 'test' + reg_char),
1✔
326
                  ('utext:utext:utf8',
327
                   'test' + reg_char + '\ntest' + reg_char + '\n'),
328
                  ('utokens:utokens:utf8',
329
                   'test' + reg_char + ' test' + reg_char),
330
                  ('ulines:ulines:utf8',
331
                   'test' + reg_char + '\ntest' + reg_char),
332
                  ('nouconverter:string:utf8', 'test' + reg_char))
333
        # unicode converters will go away with Zope 6
334
        # ignore deprecation warning for test run
335
        with warnings.catch_warnings():
1✔
336
            warnings.simplefilter('ignore')
1✔
337
            req = self._processInputs(inputs)
1✔
338

339
        formkeys = list(req.form.keys())
1✔
340
        formkeys.sort()
1✔
341
        self.assertEqual(
1✔
342
            formkeys,
343
            ['nouconverter', 'ulines', 'ustring', 'utext', 'utokens'])
344

345
        self.assertEqual(req['ustring'], 'test\u00AE')
1✔
346
        self.assertEqual(req['utext'], 'test\u00AE\ntest\u00AE\n')
1✔
347
        self.assertEqual(req['utokens'], ['test\u00AE', 'test\u00AE'])
1✔
348
        self.assertEqual(req['ulines'], ['test\u00AE', 'test\u00AE'])
1✔
349

350
        # expect a utf-8 encoded version
351
        self.assertEqual(req['nouconverter'], 'test' + reg_char)
1✔
352

353
        self._noTaintedValues(req)
1✔
354
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
355

356
    def test_processInputs_w_simple_containers(self):
1✔
357
        inputs = (
1✔
358
            ('oneitem:list', 'one'),
359
            ('alist:list', 'one'), ('alist:list', 'two'),
360
            ('oneitemtuple:tuple', 'one'),
361
            ('atuple:tuple', 'one'), ('atuple:tuple', 'two'),
362
            ('onerec.foo:record', 'foo'), ('onerec.bar:record', 'bar'),
363
            ('setrec.foo:records', 'foo'), ('setrec.bar:records', 'bar'),
364
            ('setrec.foo:records', 'spam'), ('setrec.bar:records', 'eggs'))
365
        req = self._processInputs(inputs)
1✔
366

367
        formkeys = list(req.form.keys())
1✔
368
        formkeys.sort()
1✔
369
        self.assertEqual(
1✔
370
            formkeys,
371
            ['alist', 'atuple', 'oneitem', 'oneitemtuple', 'onerec', 'setrec'])
372

373
        self.assertEqual(req['oneitem'], ['one'])
1✔
374
        self.assertEqual(req['oneitemtuple'], ('one',))
1✔
375
        self.assertEqual(req['alist'], ['one', 'two'])
1✔
376
        self.assertEqual(req['atuple'], ('one', 'two'))
1✔
377
        self.assertEqual(req['onerec'].foo, 'foo')
1✔
378
        self.assertEqual(req['onerec'].bar, 'bar')
1✔
379
        self.assertEqual(len(req['setrec']), 2)
1✔
380
        self.assertEqual(req['setrec'][0].foo, 'foo')
1✔
381
        self.assertEqual(req['setrec'][0].bar, 'bar')
1✔
382
        self.assertEqual(req['setrec'][1].foo, 'spam')
1✔
383
        self.assertEqual(req['setrec'][1].bar, 'eggs')
1✔
384

385
        self._noTaintedValues(req)
1✔
386
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
387

388
    def test_processInputs_w_marshalling_into_sequences(self):
1✔
389
        inputs = (
1✔
390
            ('ilist:int:list', '1'), ('ilist:int:list', '2'),
391
            ('ilist:list:int', '3'),
392
            ('ftuple:float:tuple', '1.0'), ('ftuple:float:tuple', '1.1'),
393
            ('ftuple:tuple:float', '1.2'),
394
            ('tlist:tokens:list', 'one two'), ('tlist:list:tokens', '3 4'))
395
        req = self._processInputs(inputs)
1✔
396

397
        formkeys = list(req.form.keys())
1✔
398
        formkeys.sort()
1✔
399
        self.assertEqual(formkeys, ['ftuple', 'ilist', 'tlist'])
1✔
400

401
        self.assertEqual(req['ilist'], [1, 2, 3])
1✔
402
        self.assertEqual(req['ftuple'], (1.0, 1.1, 1.2))
1✔
403
        self.assertEqual(req['tlist'], [['one', 'two'], ['3', '4']])
1✔
404

405
        self._noTaintedValues(req)
1✔
406
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
407

408
    def test_processInputs_w_records_w_sequences(self):
1✔
409
        inputs = (
1✔
410
            ('onerec.name:record', 'foo'),
411
            ('onerec.tokens:tokens:record', 'one two'),
412
            ('onerec.ints:int:record', '1'),
413
            ('onerec.ints:int:record', '2'),
414

415
            ('setrec.name:records', 'first'),
416
            ('setrec.ilist:list:int:records', '1'),
417
            ('setrec.ilist:list:int:records', '2'),
418
            ('setrec.ituple:tuple:int:records', '1'),
419
            ('setrec.ituple:tuple:int:records', '2'),
420
            ('setrec.name:records', 'second'),
421
            ('setrec.ilist:list:int:records', '1'),
422
            ('setrec.ilist:list:int:records', '2'),
423
            ('setrec.ituple:tuple:int:records', '1'),
424
            ('setrec.ituple:tuple:int:records', '2'))
425
        req = self._processInputs(inputs)
1✔
426

427
        formkeys = list(req.form.keys())
1✔
428
        formkeys.sort()
1✔
429
        self.assertEqual(formkeys, ['onerec', 'setrec'])
1✔
430

431
        self.assertEqual(req['onerec'].name, 'foo')
1✔
432
        self.assertEqual(req['onerec'].tokens, ['one', 'two'])
1✔
433
        # Implicit sequences and records don't mix.
434
        self.assertEqual(req['onerec'].ints, 2)
1✔
435

436
        self.assertEqual(len(req['setrec']), 2)
1✔
437
        self.assertEqual(req['setrec'][0].name, 'first')
1✔
438
        self.assertEqual(req['setrec'][1].name, 'second')
1✔
439

440
        for i in range(2):
1✔
441
            self.assertEqual(req['setrec'][i].ilist, [1, 2])
1✔
442
            self.assertEqual(req['setrec'][i].ituple, (1, 2))
1✔
443

444
        self._noTaintedValues(req)
1✔
445
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
446

447
    def test_processInputs_w_defaults(self):
1✔
448
        inputs = (
1✔
449
            ('foo:default:int', '5'),
450

451
            ('alist:int:default', '3'),
452
            ('alist:int:default', '4'),
453
            ('alist:int:default', '5'),
454
            ('alist:int', '1'),
455
            ('alist:int', '2'),
456

457
            ('explicitlist:int:list:default', '3'),
458
            ('explicitlist:int:list:default', '4'),
459
            ('explicitlist:int:list:default', '5'),
460
            ('explicitlist:int:list', '1'),
461
            ('explicitlist:int:list', '2'),
462

463
            ('bar.spam:record:default', 'eggs'),
464
            ('bar.foo:record:default', 'foo'),
465
            ('bar.foo:record', 'baz'),
466

467
            ('setrec.spam:records:default', 'eggs'),
468
            ('setrec.foo:records:default', 'foo'),
469
            ('setrec.foo:records', 'baz'),
470
            ('setrec.foo:records', 'ham'),
471
        )
472
        req = self._processInputs(inputs)
1✔
473

474
        formkeys = list(req.form.keys())
1✔
475
        formkeys.sort()
1✔
476
        self.assertEqual(
1✔
477
            formkeys, ['alist', 'bar', 'explicitlist', 'foo', 'setrec'])
478

479
        self.assertEqual(req['alist'], [1, 2, 3, 4, 5])
1✔
480
        self.assertEqual(req['explicitlist'], [1, 2, 3, 4, 5])
1✔
481

482
        self.assertEqual(req['foo'], 5)
1✔
483
        self.assertEqual(req['bar'].spam, 'eggs')
1✔
484
        self.assertEqual(req['bar'].foo, 'baz')
1✔
485

486
        self.assertEqual(len(req['setrec']), 2)
1✔
487
        self.assertEqual(req['setrec'][0].spam, 'eggs')
1✔
488
        self.assertEqual(req['setrec'][0].foo, 'baz')
1✔
489
        self.assertEqual(req['setrec'][1].spam, 'eggs')
1✔
490
        self.assertEqual(req['setrec'][1].foo, 'ham')
1✔
491

492
        self._noTaintedValues(req)
1✔
493
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
494

495
    def test_processInputs_wo_marshalling_w_Taints(self):
1✔
496
        inputs = (
1✔
497
            ('foo', 'bar'), ('spam', 'eggs'),
498
            ('number', '1'),
499
            ('tainted', '<tainted value>'),
500
            ('<tainted key>', 'value'),
501
            ('spacey key', 'val'), ('key', 'spacey val'),
502
            ('tinitmulti', '<1>'), ('tinitmulti', '2'),
503
            ('tdefermulti', '1'), ('tdefermulti', '<2>'),
504
            ('tallmulti', '<1>'), ('tallmulti', '<2>'))
505
        req = self._processInputs(inputs)
1✔
506

507
        taintedformkeys = list(req.taintedform.keys())
1✔
508
        taintedformkeys.sort()
1✔
509
        self.assertEqual(
1✔
510
            taintedformkeys,
511
            ['<tainted key>', 'tainted',
512
             'tallmulti', 'tdefermulti', 'tinitmulti'])
513

514
        self._taintedKeysAlsoInForm(req)
1✔
515
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
516

517
    def test_processInputs_w_simple_marshalling_w_taints(self):
1✔
518
        inputs = (
1✔
519
            ('<tnum>:int', '42'), ('<tfract>:float', '4.2'),
520
            ('<tbign>:long', '45'),
521
            ('twords:string', 'Some <words>'),
522
            ('t2tokens:tokens', 'one <two>'),
523
            ('<taday>:date', '2002/07/23'),
524
            ('taccountedfor:required', '<yes>'),
525
            ('tmultiline:lines', '<one\ntwo>'),
526
            ('tmorewords:text', '<one\ntwo>\n'))
527
        req = self._processInputs(inputs)
1✔
528

529
        taintedformkeys = list(req.taintedform.keys())
1✔
530
        taintedformkeys.sort()
1✔
531
        self.assertEqual(
1✔
532
            taintedformkeys,
533
            ['<taday>', '<tbign>', '<tfract>',
534
             '<tnum>', 't2tokens', 'taccountedfor', 'tmorewords', 'tmultiline',
535
             'twords'])
536

537
        self._taintedKeysAlsoInForm(req)
1✔
538
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
539

540
    def test_processInputs_w_unicode_w_taints(self):
1✔
541
        inputs = (
1✔
542
            ('tustring:ustring:utf8', '<test\xc2\xae>'),
543
            ('tutext:utext:utf8', '<test\xc2\xae>\n<test\xc2\xae\n>'),
544

545
            ('tinitutokens:utokens:utf8', '<test\xc2\xae> test\xc2\xae'),
546
            ('tinitulines:ulines:utf8', '<test\xc2\xae>\ntest\xc2\xae'),
547

548
            ('tdeferutokens:utokens:utf8', 'test\xc2\xae <test\xc2\xae>'),
549
            ('tdeferulines:ulines:utf8', 'test\xc2\xae\n<test\xc2\xae>'),
550

551
            ('tnouconverter:string:utf8', '<test\xc2\xae>'),
552
        )
553

554
        # unicode converters will go away with Zope 6
555
        # ignore deprecation warning for test run
556
        with warnings.catch_warnings():
1✔
557
            warnings.simplefilter('ignore')
1✔
558
            req = self._processInputs(inputs)
1✔
559

560
        taintedformkeys = list(req.taintedform.keys())
1✔
561
        taintedformkeys.sort()
1✔
562
        self.assertEqual(
1✔
563
            taintedformkeys,
564
            ['tdeferulines', 'tdeferutokens',
565
             'tinitulines', 'tinitutokens', 'tnouconverter', 'tustring',
566
             'tutext'])
567

568
        self._taintedKeysAlsoInForm(req)
1✔
569
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
570

571
    def test_processInputs_w_simple_containers_w_taints(self):
1✔
572
        inputs = (
1✔
573
            ('toneitem:list', '<one>'),
574
            ('<tkeyoneitem>:list', 'one'),
575
            ('tinitalist:list', '<one>'), ('tinitalist:list', 'two'),
576
            ('tdeferalist:list', 'one'), ('tdeferalist:list', '<two>'),
577

578
            ('toneitemtuple:tuple', '<one>'),
579
            ('tinitatuple:tuple', '<one>'), ('tinitatuple:tuple', 'two'),
580
            ('tdeferatuple:tuple', 'one'), ('tdeferatuple:tuple', '<two>'),
581

582
            ('tinitonerec.foo:record', '<foo>'),
583
            ('tinitonerec.bar:record', 'bar'),
584
            ('tdeferonerec.foo:record', 'foo'),
585
            ('tdeferonerec.bar:record', '<bar>'),
586

587
            ('tinitinitsetrec.foo:records', '<foo>'),
588
            ('tinitinitsetrec.bar:records', 'bar'),
589
            ('tinitinitsetrec.foo:records', 'spam'),
590
            ('tinitinitsetrec.bar:records', 'eggs'),
591

592
            ('tinitdefersetrec.foo:records', 'foo'),
593
            ('tinitdefersetrec.bar:records', '<bar>'),
594
            ('tinitdefersetrec.foo:records', 'spam'),
595
            ('tinitdefersetrec.bar:records', 'eggs'),
596

597
            ('tdeferinitsetrec.foo:records', 'foo'),
598
            ('tdeferinitsetrec.bar:records', 'bar'),
599
            ('tdeferinitsetrec.foo:records', '<spam>'),
600
            ('tdeferinitsetrec.bar:records', 'eggs'),
601

602
            ('tdeferdefersetrec.foo:records', 'foo'),
603
            ('tdeferdefersetrec.bar:records', 'bar'),
604
            ('tdeferdefersetrec.foo:records', 'spam'),
605
            ('tdeferdefersetrec.bar:records', '<eggs>'))
606
        req = self._processInputs(inputs)
1✔
607

608
        taintedformkeys = list(req.taintedform.keys())
1✔
609
        taintedformkeys.sort()
1✔
610
        self.assertEqual(
1✔
611
            taintedformkeys,
612
            ['<tkeyoneitem>', 'tdeferalist',
613
             'tdeferatuple', 'tdeferdefersetrec', 'tdeferinitsetrec',
614
             'tdeferonerec', 'tinitalist', 'tinitatuple', 'tinitdefersetrec',
615
             'tinitinitsetrec', 'tinitonerec', 'toneitem', 'toneitemtuple'])
616

617
        self._taintedKeysAlsoInForm(req)
1✔
618
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
619

620
    def test_processInputs_w_records_w_sequences_tainted(self):
1✔
621
        inputs = (
1✔
622
            ('tinitonerec.tokens:tokens:record', '<one> two'),
623
            ('tdeferonerec.tokens:tokens:record', 'one <two>'),
624

625
            ('tinitsetrec.name:records', 'first'),
626
            ('tinitsetrec.ilist:list:records', '<1>'),
627
            ('tinitsetrec.ilist:list:records', '2'),
628
            ('tinitsetrec.ituple:tuple:int:records', '1'),
629
            ('tinitsetrec.ituple:tuple:int:records', '2'),
630
            ('tinitsetrec.name:records', 'second'),
631
            ('tinitsetrec.ilist:list:records', '1'),
632
            ('tinitsetrec.ilist:list:records', '2'),
633
            ('tinitsetrec.ituple:tuple:int:records', '1'),
634
            ('tinitsetrec.ituple:tuple:int:records', '2'),
635

636
            ('tdeferfirstsetrec.name:records', 'first'),
637
            ('tdeferfirstsetrec.ilist:list:records', '1'),
638
            ('tdeferfirstsetrec.ilist:list:records', '<2>'),
639
            ('tdeferfirstsetrec.ituple:tuple:int:records', '1'),
640
            ('tdeferfirstsetrec.ituple:tuple:int:records', '2'),
641
            ('tdeferfirstsetrec.name:records', 'second'),
642
            ('tdeferfirstsetrec.ilist:list:records', '1'),
643
            ('tdeferfirstsetrec.ilist:list:records', '2'),
644
            ('tdeferfirstsetrec.ituple:tuple:int:records', '1'),
645
            ('tdeferfirstsetrec.ituple:tuple:int:records', '2'),
646

647
            ('tdefersecondsetrec.name:records', 'first'),
648
            ('tdefersecondsetrec.ilist:list:records', '1'),
649
            ('tdefersecondsetrec.ilist:list:records', '2'),
650
            ('tdefersecondsetrec.ituple:tuple:int:records', '1'),
651
            ('tdefersecondsetrec.ituple:tuple:int:records', '2'),
652
            ('tdefersecondsetrec.name:records', 'second'),
653
            ('tdefersecondsetrec.ilist:list:records', '1'),
654
            ('tdefersecondsetrec.ilist:list:records', '<2>'),
655
            ('tdefersecondsetrec.ituple:tuple:int:records', '1'),
656
            ('tdefersecondsetrec.ituple:tuple:int:records', '2'),
657
        )
658
        req = self._processInputs(inputs)
1✔
659

660
        taintedformkeys = list(req.taintedform.keys())
1✔
661
        taintedformkeys.sort()
1✔
662
        self.assertEqual(
1✔
663
            taintedformkeys,
664
            ['tdeferfirstsetrec', 'tdeferonerec',
665
             'tdefersecondsetrec', 'tinitonerec', 'tinitsetrec'])
666

667
        self._taintedKeysAlsoInForm(req)
1✔
668
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
669

670
    def test_processInputs_w_defaults_w_taints(self):
1✔
671
        inputs = (
1✔
672
            ('tfoo:default', '<5>'),
673

674
            ('doesnnotapply:default', '<4>'),
675
            ('doesnnotapply', '4'),
676

677
            ('tinitlist:default', '3'),
678
            ('tinitlist:default', '4'),
679
            ('tinitlist:default', '5'),
680
            ('tinitlist', '<1>'),
681
            ('tinitlist', '2'),
682

683
            ('tdeferlist:default', '3'),
684
            ('tdeferlist:default', '<4>'),
685
            ('tdeferlist:default', '5'),
686
            ('tdeferlist', '1'),
687
            ('tdeferlist', '2'),
688

689
            ('tinitbar.spam:record:default', 'eggs'),
690
            ('tinitbar.foo:record:default', 'foo'),
691
            ('tinitbar.foo:record', '<baz>'),
692
            ('tdeferbar.spam:record:default', '<eggs>'),
693
            ('tdeferbar.foo:record:default', 'foo'),
694
            ('tdeferbar.foo:record', 'baz'),
695

696
            ('rdoesnotapply.spam:record:default', '<eggs>'),
697
            ('rdoesnotapply.spam:record', 'eggs'),
698

699
            ('tinitsetrec.spam:records:default', 'eggs'),
700
            ('tinitsetrec.foo:records:default', 'foo'),
701
            ('tinitsetrec.foo:records', '<baz>'),
702
            ('tinitsetrec.foo:records', 'ham'),
703

704
            ('tdefersetrec.spam:records:default', '<eggs>'),
705
            ('tdefersetrec.foo:records:default', 'foo'),
706
            ('tdefersetrec.foo:records', 'baz'),
707
            ('tdefersetrec.foo:records', 'ham'),
708

709
            ('srdoesnotapply.foo:records:default', '<eggs>'),
710
            ('srdoesnotapply.foo:records', 'baz'),
711
            ('srdoesnotapply.foo:records', 'ham'))
712
        req = self._processInputs(inputs)
1✔
713

714
        taintedformkeys = list(req.taintedform.keys())
1✔
715
        taintedformkeys.sort()
1✔
716
        self.assertEqual(
1✔
717
            taintedformkeys,
718
            ['tdeferbar', 'tdeferlist',
719
             'tdefersetrec', 'tfoo', 'tinitbar', 'tinitlist', 'tinitsetrec'])
720

721
        self._taintedKeysAlsoInForm(req)
1✔
722
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
723

724
    def test_processInputs_w_tainted_attribute_raises(self):
1✔
725
        input = ('taintedattr.here<be<taint:record', 'value',)
1✔
726

727
        self.assertRaises(ValueError, self._processInputs, input)
1✔
728

729
    def test_processInputs_w_tainted_values_cleans_exceptions(self):
1✔
730
        # Feed tainted garbage to the conversion methods, and any exception
731
        # returned should be HTML safe
732
        from DateTime.interfaces import SyntaxError
1✔
733
        from ZPublisher.Converters import type_converters
1✔
734
        for type, convert in list(type_converters.items()):
1✔
735
            try:
1✔
736
                # unicode converters will go away with Zope 6
737
                # ignore deprecation warning for test run
738
                with warnings.catch_warnings():
1✔
739
                    warnings.simplefilter('ignore')
1✔
740
                    convert('<html garbage>')
1✔
741
            except Exception as e:
1!
742
                self.assertFalse(
1✔
743
                    '<' in e.args,
744
                    '%s converter does not quote unsafe value!' % type)
745
            except SyntaxError as e:
×
746
                self.assertFalse(
×
747
                    '<' in e,
748
                    '%s converter does not quote unsafe value!' % type)
749

750
    def test_processInputs_w_dotted_name_as_tuple(self):
1✔
751
        # Collector #500
752
        inputs = (
1✔
753
            ('name.:tuple', 'name with dot as tuple'),)
754
        req = self._processInputs(inputs)
1✔
755

756
        formkeys = list(req.form.keys())
1✔
757
        formkeys.sort()
1✔
758
        self.assertEqual(formkeys, ['name.'])
1✔
759

760
        self.assertEqual(req['name.'], ('name with dot as tuple',))
1✔
761

762
        self._noTaintedValues(req)
1✔
763
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
764

765
    def test_processInputs_w_cookie_parsing(self):
1✔
766
        env = {'SERVER_NAME': 'testingharnas', 'SERVER_PORT': '80'}
1✔
767

768
        env['HTTP_COOKIE'] = 'foo=bar; baz=gee'
1✔
769
        req = self._makeOne(environ=env)
1✔
770
        self.assertEqual(req.cookies['foo'], 'bar')
1✔
771
        self.assertEqual(req.cookies['baz'], 'gee')
1✔
772

773
        env['HTTP_COOKIE'] = 'foo=bar; baz="gee, like, e=mc^2"'
1✔
774
        req = self._makeOne(environ=env)
1✔
775
        self.assertEqual(req.cookies['foo'], 'bar')
1✔
776
        self.assertEqual(req.cookies['baz'], 'gee, like, e=mc^2')
1✔
777

778
        # Collector #1498: empty cookies
779
        env['HTTP_COOKIE'] = 'foo=bar; hmm; baz=gee'
1✔
780
        req = self._makeOne(environ=env)
1✔
781
        self.assertEqual(req.cookies['foo'], 'bar')
1✔
782
        self.assertEqual(req.cookies['hmm'], '')
1✔
783
        self.assertEqual(req.cookies['baz'], 'gee')
1✔
784

785
        # Unquoted multi-space cookies
786
        env['HTTP_COOKIE'] = 'single=cookie data; ' \
1✔
787
                             'quoted="cookie data with unquoted spaces"; ' \
788
                             'multi=cookie data with unquoted spaces; ' \
789
                             'multi2=cookie data with unquoted spaces'
790
        req = self._makeOne(environ=env)
1✔
791
        self.assertEqual(req.cookies['single'], 'cookie data')
1✔
792
        self.assertEqual(req.cookies['quoted'],
1✔
793
                         'cookie data with unquoted spaces')
794
        self.assertEqual(req.cookies['multi'],
1✔
795
                         'cookie data with unquoted spaces')
796
        self.assertEqual(req.cookies['multi2'],
1✔
797
                         'cookie data with unquoted spaces')
798

799
    def test_processInputs_xmlrpc(self):
1✔
800
        TEST_METHOD_CALL = (
1✔
801
            b'<?xml version="1.0"?>'
802
            b'<methodCall><methodName>test</methodName></methodCall>'
803
        )
804
        environ = self._makePostEnviron(body=TEST_METHOD_CALL)
1✔
805
        environ['CONTENT_TYPE'] = 'text/xml'
1✔
806
        req = self._makeOne(stdin=BytesIO(TEST_METHOD_CALL), environ=environ)
1✔
807
        req.processInputs()
1✔
808
        self.assertEqual(req.PATH_INFO, '/test')
1✔
809
        self.assertEqual(req.args, ())
1✔
810

811
    def test_processInputs_xmlrpc_query_string(self):
1✔
812
        TEST_METHOD_CALL = (
1✔
813
            b'<?xml version="1.0"?>'
814
            b'<methodCall><methodName>test</methodName></methodCall>'
815
        )
816
        environ = self._makePostEnviron(body=TEST_METHOD_CALL)
1✔
817
        environ['CONTENT_TYPE'] = 'text/xml'
1✔
818
        environ['QUERY_STRING'] = 'x=1'
1✔
819
        req = self._makeOne(stdin=BytesIO(TEST_METHOD_CALL), environ=environ)
1✔
820
        req.processInputs()
1✔
821
        self.assertEqual(req.PATH_INFO, '/test')
1✔
822
        self.assertEqual(req.args, ())
1✔
823
        self.assertEqual(req.form["x"], '1')
1✔
824

825
    def test_processInputs_xmlrpc_method(self):
1✔
826
        TEST_METHOD_CALL = (
1✔
827
            b'<?xml version="1.0"?>'
828
            b'<methodCall><methodName>test</methodName></methodCall>'
829
        )
830
        environ = self._makePostEnviron(body=TEST_METHOD_CALL)
1✔
831
        environ['CONTENT_TYPE'] = 'text/xml'
1✔
832
        environ['QUERY_STRING'] = ':method=method'
1✔
833
        req = self._makeOne(stdin=BytesIO(TEST_METHOD_CALL), environ=environ)
1✔
834
        with self.assertRaises(BadRequest):
1✔
835
            req.processInputs()
1✔
836

837
    def test_processInputs_SOAP(self):
1✔
838
        # ZPublisher does not really have SOAP support
839
        # all it does is put the body into ``SOAPXML``
840
        body = b'soap'
1✔
841
        environ = TEST_POST_ENVIRON.copy()
1✔
842
        environ['HTTP_SOAPACTION'] = "soapaction"
1✔
843
        req = self._makeOne(stdin=BytesIO(body), environ=environ)
1✔
844
        req.processInputs()
1✔
845
        self.assertEqual(req.SOAPXML, body)
1✔
846

847
    def test_processInputs_SOAP_query_string(self):
1✔
848
        # ZPublisher does not really have SOAP support
849
        # all it does is put the body into ``SOAPXML``
850
        body = b'soap'
1✔
851
        environ = TEST_POST_ENVIRON.copy()
1✔
852
        environ['QUERY_STRING'] = 'x=1'
1✔
853
        environ['HTTP_SOAPACTION'] = "soapaction"
1✔
854
        req = self._makeOne(stdin=BytesIO(body), environ=environ)
1✔
855
        req.processInputs()
1✔
856
        self.assertEqual(req.SOAPXML, body)
1✔
857
        self.assertEqual(req.form["x"], '1')
1✔
858

859
    def test_processInputs_w_urlencoded_and_qs(self):
1✔
860
        body = b'foo=1'
1✔
861
        environ = {
1✔
862
            'CONTENT_TYPE': 'application/x-www-form-urlencoded',
863
            'CONTENT_LENGTH': len(body),
864
            'QUERY_STRING': 'bar=2',
865
            'REQUEST_METHOD': 'POST',
866
        }
867
        req = self._makeOne(stdin=BytesIO(body), environ=environ)
1✔
868
        req.processInputs()
1✔
869
        self.assertEqual(req.form['foo'], '1')
1✔
870
        self.assertEqual(req.form['bar'], '2')
1✔
871

872
    def test_close_removes_stdin_references(self):
1✔
873
        # Verifies that all references to the input stream go away on
874
        # request.close().  Otherwise a tempfile may stick around.
875
        s = BytesIO(TEST_FILE_DATA)
1✔
876
        start_count = sys.getrefcount(s)
1✔
877

878
        environ = self._makePostEnviron(body=TEST_FILE_DATA)
1✔
879
        req = self._makeOne(stdin=s, environ=environ)
1✔
880
        req.processInputs()
1✔
881
        self.assertNotEqual(start_count, sys.getrefcount(s))  # Precondition
1✔
882
        req.close()
1✔
883
        self.assertEqual(start_count, sys.getrefcount(s))  # The test
1✔
884

885
    def test_processInputs_w_large_input_gets_tempfile(self):
1✔
886
        # checks fileupload object supports the filename
887
        s = BytesIO(TEST_LARGEFILE_DATA)
1✔
888

889
        environ = self._makePostEnviron(body=TEST_LARGEFILE_DATA)
1✔
890
        req = self._makeOne(stdin=s, environ=environ)
1✔
891
        req.processInputs()
1✔
892
        f = req.form.get('largefile')
1✔
893
        self.assertTrue(f.name)
1✔
894
        self.assertEqual(4006, len(f.file.read()))
1✔
895
        f.file.close()
1✔
896

897
    def test_processInputs_with_file_upload_gets_iterator(self):
1✔
898
        # checks fileupload object supports the iterator protocol
899
        # collector entry 1837
900
        s = BytesIO(TEST_FILE_DATA)
1✔
901

902
        environ = self._makePostEnviron(body=TEST_FILE_DATA)
1✔
903
        req = self._makeOne(stdin=s, environ=environ)
1✔
904
        req.processInputs()
1✔
905
        f = req.form.get('smallfile')
1✔
906
        self.assertEqual(list(f), [b'test\n'])
1✔
907
        f.seek(0)
1✔
908
        self.assertEqual(next(f), b'test\n')
1✔
909

910
    def test_processInputs_BODY(self):
1✔
911
        s = BytesIO(b"body")
1✔
912
        environ = TEST_POST_ENVIRON.copy()
1✔
913
        environ["CONTENT_TYPE"] = "text/plain"
1✔
914
        req = self._makeOne(stdin=s, environ=environ)
1✔
915
        req.processInputs()
1✔
916
        self.assertEqual(req["BODY"], b"body")
1✔
917
        self.assertIs(req["BODYFILE"], s)
1✔
918

919
    def test_processInputs_BODY_unseekable(self):
1✔
920
        s = _Unseekable(BytesIO(b"body"))
1✔
921
        environ = TEST_POST_ENVIRON.copy()
1✔
922
        environ["CONTENT_TYPE"] = "text/plain"
1✔
923
        req = self._makeOne(stdin=s, environ=environ)
1✔
924
        req.processInputs()
1✔
925
        self.assertEqual(req["BODY"], b"body")
1✔
926
        self.assertIs(req["BODYFILE"], s)
1✔
927

928
    def test_processInputs_seekable_form_data(self):
1✔
929
        s = BytesIO(TEST_FILE_DATA)
1✔
930
        environ = self._makePostEnviron(body=TEST_FILE_DATA)
1✔
931
        req = self._makeOne(stdin=s, environ=environ)
1✔
932
        req.processInputs()
1✔
933
        f = req.form.get('smallfile')
1✔
934
        self.assertEqual(list(f), [b'test\n'])
1✔
935
        self.assertEqual(req["BODY"], TEST_FILE_DATA)
1✔
936
        self.assertEqual(req["BODYFILE"].read(), TEST_FILE_DATA)
1✔
937

938
    def test_processInputs_unseekable_form_data(self):
1✔
939
        s = _Unseekable(BytesIO(TEST_FILE_DATA))
1✔
940
        environ = self._makePostEnviron(body=TEST_FILE_DATA)
1✔
941
        req = self._makeOne(stdin=s, environ=environ)
1✔
942
        req.processInputs()
1✔
943
        f = req.form.get('smallfile')
1✔
944
        self.assertEqual(list(f), [b'test\n'])
1✔
945
        # we cannot access ``BODY`` in this case
946
        # as the underlying file has been read
947
        with self.assertRaises(KeyError):
1✔
948
            req["BODY"]
1✔
949

950
    def test_processInputs_unspecified_file(self):
1✔
951
        s = BytesIO(TEST_FILE_DATA_UNSPECIFIED)
1✔
952
        environ = self._makePostEnviron(body=TEST_FILE_DATA_UNSPECIFIED)
1✔
953
        req = self._makeOne(stdin=s, environ=environ)
1✔
954
        req.processInputs()
1✔
955
        f = req.form.get('smallfile')
1✔
956
        self.assertEqual(f.filename, "")
1✔
957
        self.assertEqual(list(f), [])
1✔
958

959
    def test__authUserPW_simple(self):
1✔
960
        user_id = 'user'
1✔
961
        password = 'password'
1✔
962
        auth_header = basic_auth_encode(user_id, password)
1✔
963

964
        environ = {'HTTP_AUTHORIZATION': auth_header}
1✔
965
        request = self._makeOne(environ=environ)
1✔
966

967
        user_id_x, password_x = request._authUserPW()
1✔
968

969
        self.assertEqual(user_id_x, user_id)
1✔
970
        self.assertEqual(password_x, password)
1✔
971

972
    def test__authUserPW_with_embedded_colon(self):
1✔
973
        user_id = 'user'
1✔
974
        password = 'embedded:colon'
1✔
975
        auth_header = basic_auth_encode(user_id, password)
1✔
976

977
        environ = {'HTTP_AUTHORIZATION': auth_header}
1✔
978
        request = self._makeOne(environ=environ)
1✔
979

980
        user_id_x, password_x = request._authUserPW()
1✔
981

982
        self.assertEqual(user_id_x, user_id)
1✔
983
        self.assertEqual(password_x, password)
1✔
984

985
    def test__authUserPW_non_ascii(self):
1✔
986
        user_id = 'usèr'
1✔
987
        password = 'pàssword'
1✔
988
        auth_header = basic_auth_encode(user_id, password)
1✔
989

990
        environ = {'HTTP_AUTHORIZATION': auth_header}
1✔
991
        request = self._makeOne(environ=environ)
1✔
992

993
        user_id_x, password_x = request._authUserPW()
1✔
994

995
        self.assertEqual(user_id_x, user_id)
1✔
996
        self.assertEqual(password_x, password)
1✔
997

998
    def test_debug_not_in_qs_still_gets_attr(self):
1✔
999
        from zope.publisher.base import DebugFlags
1✔
1000

1001
        # when accessing request.debug we will see the DebugFlags instance
1002
        request = self._makeOne()
1✔
1003
        self.assertIsInstance(request.debug, DebugFlags)
1✔
1004
        # It won't be available through dictonary lookup, though
1005
        self.assertTrue(request.get('debug') is None)
1✔
1006

1007
    def test_debug_in_qs_gets_form_var(self):
1✔
1008
        env = {'QUERY_STRING': 'debug=1'}
1✔
1009

1010
        # request.debug will actually yield a 'debug' form variable
1011
        # if it exists
1012
        request = self._makeOne(environ=env)
1✔
1013
        request.processInputs()
1✔
1014
        self.assertEqual(request.debug, '1')
1✔
1015
        self.assertEqual(request.get('debug'), '1')
1✔
1016
        self.assertEqual(request['debug'], '1')
1✔
1017

1018
        # we can still override request.debug with a form variable or directly
1019

1020
    def test_debug_override_via_form_other(self):
1✔
1021
        request = self._makeOne()
1✔
1022
        request.processInputs()
1✔
1023
        request.form['debug'] = '1'
1✔
1024
        self.assertEqual(request.debug, '1')
1✔
1025
        request['debug'] = '2'
1✔
1026
        self.assertEqual(request.debug, '2')
1✔
1027

1028
    def test_locale_property_accessor(self):
1✔
1029
        from ZPublisher.HTTPRequest import _marker
1✔
1030

1031
        provideAdapter(BrowserLanguages, [IHTTPRequest],
1✔
1032
                       IUserPreferredLanguages)
1033

1034
        env = {'HTTP_ACCEPT_LANGUAGE': 'en'}
1✔
1035
        request = self._makeOne(environ=env)
1✔
1036

1037
        # before accessing request.locale for the first time, request._locale
1038
        # is still a marker
1039
        self.assertTrue(request._locale is _marker)
1✔
1040

1041
        # when accessing request.locale we will see an ILocale
1042
        self.assertTrue(ILocale.providedBy(request.locale))
1✔
1043

1044
        # and request._locale has been set
1045
        self.assertTrue(request._locale is request.locale)
1✔
1046

1047
        # It won't be available through dictonary lookup, though
1048
        self.assertTrue(request.get('locale') is None)
1✔
1049

1050
    def test_locale_in_qs(self):
1✔
1051
        provideAdapter(BrowserLanguages, [IHTTPRequest],
1✔
1052
                       IUserPreferredLanguages)
1053

1054
        # request.locale will actually yield a 'locale' form variable
1055
        # if it exists
1056
        env = {'HTTP_ACCEPT_LANGUAGE': 'en', 'QUERY_STRING': 'locale=1'}
1✔
1057
        request = self._makeOne(environ=env)
1✔
1058
        request.processInputs()
1✔
1059

1060
        self.assertEqual(request.locale, '1')
1✔
1061
        self.assertEqual(request.get('locale'), '1')
1✔
1062
        self.assertEqual(request['locale'], '1')
1✔
1063

1064
    def test_locale_property_override_via_form_other(self):
1✔
1065
        provideAdapter(BrowserLanguages, [IHTTPRequest],
1✔
1066
                       IUserPreferredLanguages)
1067
        env = {'HTTP_ACCEPT_LANGUAGE': 'en'}
1✔
1068

1069
        # we can still override request.locale with a form variable
1070
        request = self._makeOne(environ=env)
1✔
1071
        request.processInputs()
1✔
1072

1073
        self.assertTrue(ILocale.providedBy(request.locale))
1✔
1074

1075
        request.form['locale'] = '1'
1✔
1076
        self.assertEqual(request.locale, '1')
1✔
1077

1078
        request['locale'] = '2'
1✔
1079
        self.assertEqual(request.locale, '2')
1✔
1080

1081
    def test_locale_semantics(self):
1✔
1082
        provideAdapter(BrowserLanguages, [IHTTPRequest],
1✔
1083
                       IUserPreferredLanguages)
1084
        env_ = {'HTTP_ACCEPT_LANGUAGE': 'en'}
1✔
1085

1086
        # we should also test the correct semantics of the locale
1087
        for httplang in ('it', 'it-ch', 'it-CH', 'IT', 'IT-CH', 'IT-ch'):
1✔
1088
            env = env_.copy()
1✔
1089
            env['HTTP_ACCEPT_LANGUAGE'] = httplang
1✔
1090
            request = self._makeOne(environ=env)
1✔
1091
            locale = request.locale
1✔
1092
            self.assertTrue(ILocale.providedBy(locale))
1✔
1093
            parts = httplang.split('-')
1✔
1094
            lang = parts.pop(0).lower()
1✔
1095
            territory = variant = None
1✔
1096
            if parts:
1✔
1097
                territory = parts.pop(0).upper()
1✔
1098
            if parts:
1!
1099
                variant = parts.pop(0).upper()
×
1100
            self.assertEqual(locale.id.language, lang)
1✔
1101
            self.assertEqual(locale.id.territory, territory)
1✔
1102
            self.assertEqual(locale.id.variant, variant)
1✔
1103

1104
    def test_locale_fallback(self):
1✔
1105
        provideAdapter(BrowserLanguages, [IHTTPRequest],
1✔
1106
                       IUserPreferredLanguages)
1107

1108
        env = {'HTTP_ACCEPT_LANGUAGE': 'xx'}
1✔
1109

1110
        # Now test for non-existant locale fallback
1111
        request = self._makeOne(environ=env)
1✔
1112
        locale = request.locale
1✔
1113

1114
        self.assertTrue(ILocale.providedBy(locale))
1✔
1115
        self.assertTrue(locale.id.language is None)
1✔
1116
        self.assertTrue(locale.id.territory is None)
1✔
1117
        self.assertTrue(locale.id.variant is None)
1✔
1118

1119
    def test_method_GET(self):
1✔
1120
        env = {'REQUEST_METHOD': 'GET'}
1✔
1121
        request = self._makeOne(environ=env)
1✔
1122
        self.assertEqual(request.method, 'GET')
1✔
1123

1124
    def test_method_POST(self):
1✔
1125
        env = {'REQUEST_METHOD': 'POST'}
1✔
1126
        request = self._makeOne(environ=env)
1✔
1127
        self.assertEqual(request.method, 'POST')
1✔
1128

1129
    def test_getClientAddr_wo_trusted_proxy(self):
1✔
1130
        env = {'REMOTE_ADDR': '127.0.0.1',
1✔
1131
               'HTTP_X_FORWARDED_FOR': '10.1.20.30, 192.168.1.100'}
1132
        request = self._makeOne(environ=env)
1✔
1133
        self.assertEqual(request.getClientAddr(), '127.0.0.1')
1✔
1134

1135
    def test_getClientAddr_one_trusted_proxy(self):
1✔
1136
        from ZPublisher.HTTPRequest import trusted_proxies
1✔
1137
        env = {'REMOTE_ADDR': '127.0.0.1',
1✔
1138
               'HTTP_X_FORWARDED_FOR': '10.1.20.30, 192.168.1.100'}
1139

1140
        orig = trusted_proxies[:]
1✔
1141
        try:
1✔
1142
            trusted_proxies.append('127.0.0.1')
1✔
1143
            request = self._makeOne(environ=env)
1✔
1144
            self.assertEqual(request.getClientAddr(), '192.168.1.100')
1✔
1145
        finally:
1146
            trusted_proxies[:] = orig
1✔
1147

1148
    def test_getClientAddr_trusted_proxy_last(self):
1✔
1149
        from ZPublisher.HTTPRequest import trusted_proxies
1✔
1150
        env = {'REMOTE_ADDR': '192.168.1.100',
1✔
1151
               'HTTP_X_FORWARDED_FOR': '10.1.20.30, 192.168.1.100'}
1152

1153
        orig = trusted_proxies[:]
1✔
1154
        try:
1✔
1155
            trusted_proxies.append('192.168.1.100')
1✔
1156
            request = self._makeOne(environ=env)
1✔
1157
            self.assertEqual(request.getClientAddr(), '10.1.20.30')
1✔
1158
        finally:
1159
            trusted_proxies[:] = orig
1✔
1160

1161
    def test_getClientAddr_trusted_proxy_no_REMOTE_ADDR(self):
1✔
1162
        from ZPublisher.HTTPRequest import trusted_proxies
1✔
1163
        env = {'HTTP_X_FORWARDED_FOR': '10.1.20.30, 192.168.1.100'}
1✔
1164

1165
        orig = trusted_proxies[:]
1✔
1166
        try:
1✔
1167
            trusted_proxies.append('192.168.1.100')
1✔
1168
            request = self._makeOne(environ=env)
1✔
1169
            self.assertEqual(request.getClientAddr(), '')
1✔
1170
        finally:
1171
            trusted_proxies[:] = orig
1✔
1172

1173
    def test_getHeader_exact(self):
1✔
1174
        environ = self._makePostEnviron()
1✔
1175
        request = self._makeOne(environ=environ)
1✔
1176
        self.assertEqual(request.getHeader('content-type'),
1✔
1177
                         'multipart/form-data; boundary=12345')
1178

1179
    def test_getHeader_case_insensitive(self):
1✔
1180
        environ = self._makePostEnviron()
1✔
1181
        request = self._makeOne(environ=environ)
1✔
1182
        self.assertEqual(request.getHeader('Content-Type'),
1✔
1183
                         'multipart/form-data; boundary=12345')
1184

1185
    def test_getHeader_underscore_is_dash(self):
1✔
1186
        environ = self._makePostEnviron()
1✔
1187
        request = self._makeOne(environ=environ)
1✔
1188
        self.assertEqual(request.getHeader('content_type'),
1✔
1189
                         'multipart/form-data; boundary=12345')
1190

1191
    def test_getHeader_literal_turns_off_case_normalization(self):
1✔
1192
        environ = self._makePostEnviron()
1✔
1193
        request = self._makeOne(environ=environ)
1✔
1194
        self.assertEqual(request.getHeader('Content-Type', literal=True), None)
1✔
1195

1196
    def test_getHeader_nonesuch(self):
1✔
1197
        environ = self._makePostEnviron()
1✔
1198
        request = self._makeOne(environ=environ)
1✔
1199
        self.assertEqual(request.getHeader('none-such'), None)
1✔
1200

1201
    def test_getHeader_nonesuch_with_default(self):
1✔
1202
        environ = self._makePostEnviron()
1✔
1203
        request = self._makeOne(environ=environ)
1✔
1204
        self.assertEqual(request.getHeader('Not-existant', default='Whatever'),
1✔
1205
                         'Whatever')
1206

1207
    def test_clone_updates_method_to_GET(self):
1✔
1208
        request = self._makeOne(environ={'REQUEST_METHOD': 'POST'})
1✔
1209
        request['PARENTS'] = [object()]
1✔
1210
        clone = request.clone()
1✔
1211
        self.assertEqual(clone.method, 'GET')
1✔
1212

1213
    def test_clone_keeps_preserves__auth(self):
1✔
1214
        request = self._makeOne()
1✔
1215
        request['PARENTS'] = [object()]
1✔
1216
        request._auth = 'foobar'
1✔
1217
        clone = request.clone()
1✔
1218
        self.assertEqual(clone._auth, 'foobar')
1✔
1219

1220
    def test_clone_doesnt_re_clean_environ(self):
1✔
1221
        request = self._makeOne()
1✔
1222
        request.environ['HTTP_CGI_AUTHORIZATION'] = 'lalalala'
1✔
1223
        request['PARENTS'] = [object()]
1✔
1224
        clone = request.clone()
1✔
1225
        self.assertEqual(clone.environ['HTTP_CGI_AUTHORIZATION'], 'lalalala')
1✔
1226

1227
    def test_clone_keeps_only_last_PARENT(self):
1✔
1228
        PARENTS = [object(), object()]
1✔
1229
        request = self._makeOne()
1✔
1230
        request['PARENTS'] = PARENTS
1✔
1231
        clone = request.clone()
1✔
1232
        self.assertEqual(clone['PARENTS'], PARENTS[1:])
1✔
1233

1234
    def test_clone_preserves_response_class(self):
1✔
1235
        class DummyResponse:
1✔
1236
            pass
1✔
1237
        environ = self._makePostEnviron()
1✔
1238
        request = self._makeOne(None, environ, DummyResponse())
1✔
1239
        request['PARENTS'] = [object()]
1✔
1240
        clone = request.clone()
1✔
1241
        self.assertIsInstance(clone.response, DummyResponse)
1✔
1242

1243
    def test_clone_preserves_request_subclass(self):
1✔
1244
        class SubRequest(self._getTargetClass()):
1✔
1245
            pass
1✔
1246
        environ = self._makePostEnviron()
1✔
1247
        request = SubRequest(None, environ, None)
1✔
1248
        request['PARENTS'] = [object()]
1✔
1249
        clone = request.clone()
1✔
1250
        self.assertIsInstance(clone, SubRequest)
1✔
1251

1252
    def test_clone_preserves_direct_interfaces(self):
1✔
1253
        from zope.interface import Interface
1✔
1254
        from zope.interface import directlyProvides
1✔
1255

1256
        class IFoo(Interface):
1✔
1257
            pass
1✔
1258
        request = self._makeOne()
1✔
1259
        request['PARENTS'] = [object()]
1✔
1260
        directlyProvides(request, IFoo)
1✔
1261
        clone = request.clone()
1✔
1262
        self.assertTrue(IFoo.providedBy(clone))
1✔
1263

1264
    def test_resolve_url_doesnt_send_endrequestevent(self):
1✔
1265
        # The following imports are necessary:
1266
        #  They happen implicitely in `request.resolve_url`
1267
        #  They creates `zope.schema` events
1268
        # Doing them here avoids those unrelated events
1269
        import OFS.PropertyManager  # noqa: F401
1✔
1270
        import OFS.SimpleItem  # noqa: F401
1✔
1271
        #
1272
        import zope.event
1✔
1273
        events = []
1✔
1274
        zope.event.subscribers.append(events.append)
1✔
1275
        request = self._makeOne()
1✔
1276
        request['PARENTS'] = [object()]
1✔
1277
        try:
1✔
1278
            request.resolve_url(request.script + '/')
1✔
1279
        finally:
1280
            zope.event.subscribers.remove(events.append)
1✔
1281
        self.assertFalse(
1✔
1282
            len(events),
1283
            "HTTPRequest.resolve_url should not emit events")
1284

1285
    def test_resolve_url_errorhandling(self):
1✔
1286
        # Check that resolve_url really raises the same error
1287
        # it received from ZPublisher.BaseRequest.traverse
1288
        request = self._makeOne()
1✔
1289
        request['PARENTS'] = [object()]
1✔
1290
        self.assertRaises(
1✔
1291
            NotFound, request.resolve_url, request.script + '/does_not_exist')
1292

1293
    def test_parses_json_cookies(self):
1✔
1294
        # https://bugs.launchpad.net/zope2/+bug/563229
1295
        # reports cookies in the wild with embedded double quotes (e.g,
1296
        # JSON-encoded data structures.
1297
        env = {
1✔
1298
            'SERVER_NAME': 'testingharnas',
1299
            'SERVER_PORT': '80',
1300
            'HTTP_COOKIE': 'json={"intkey":123,"stringkey":"blah"}; '
1301
                           'anothercookie=boring; baz'
1302
        }
1303
        req = self._makeOne(environ=env)
1✔
1304
        self.assertEqual(req.cookies['json'],
1✔
1305
                         '{"intkey":123,"stringkey":"blah"}')
1306
        self.assertEqual(req.cookies['anothercookie'], 'boring')
1✔
1307

1308
    def test_getVirtualRoot(self):
1✔
1309
        # https://bugs.launchpad.net/zope2/+bug/193122
1310
        req = self._makeOne()
1✔
1311

1312
        req._script = []
1✔
1313
        self.assertEqual(req.getVirtualRoot(), '')
1✔
1314

1315
        req._script = ['foo', 'bar']
1✔
1316
        self.assertEqual(req.getVirtualRoot(), '/foo/bar')
1✔
1317

1318
    def test__str__returns_native_string(self):
1✔
1319
        r = self._makeOne()
1✔
1320
        self.assertIsInstance(str(r), str)
1✔
1321

1322
    def test___str____password_field(self):
1✔
1323
        # It obscures password fields.
1324
        req = self._makeOne()
1✔
1325
        req.form['passwd'] = 'secret'
1✔
1326

1327
        self.assertNotIn('secret', str(req))
1✔
1328
        self.assertIn('password obscured', str(req))
1✔
1329

1330
    def test_text__password_field(self):
1✔
1331
        # It obscures password fields.
1332
        req = self._makeOne()
1✔
1333
        req.form['passwd'] = 'secret'
1✔
1334

1335
        self.assertNotIn('secret', req.text())
1✔
1336
        self.assertIn('password obscured', req.text())
1✔
1337

1338
    _xmlrpc_call = b"""<?xml version="1.0"?>
1✔
1339
    <methodCall>
1340
      <methodName>examples.getStateName</methodName>
1341
      <params>
1342
         <param>
1343
            <value><i4>41</i4></value>
1344
            </param>
1345
         </params>
1346
      </methodCall>
1347
    """
1348

1349
    def test_processInputs_xmlrpc_with_args(self):
1✔
1350
        req = self._makeOne(
1✔
1351
            stdin=BytesIO(self._xmlrpc_call),
1352
            environ=dict(REQUEST_METHOD="POST", CONTENT_TYPE="text/xml"))
1353
        req.processInputs()
1✔
1354
        self.assertTrue(is_xmlrpc_response(req.response))
1✔
1355
        self.assertEqual(req.args, (41,))
1✔
1356
        self.assertEqual(req.other["PATH_INFO"], "/examples/getStateName")
1✔
1357

1358
    def test_processInputs_xmlrpc_controlled_allowed(self):
1✔
1359
        req = self._makeOne(
1✔
1360
            stdin=BytesIO(self._xmlrpc_call),
1361
            environ=dict(REQUEST_METHOD="POST", CONTENT_TYPE="text/xml"))
1362
        with self._xmlrpc_control(lambda request: True):
1✔
1363
            req.processInputs()
1✔
1364
        self.assertTrue(is_xmlrpc_response(req.response))
1✔
1365

1366
    def test_processInputs_xmlrpc_controlled_disallowed(self):
1✔
1367
        req = self._makeOne(
1✔
1368
            environ=dict(REQUEST_METHOD="POST", CONTENT_TYPE="text/xml"))
1369
        with self._xmlrpc_control(lambda request: False):
1✔
1370
            req.processInputs()
1✔
1371
        self.assertFalse(is_xmlrpc_response(req.response))
1✔
1372

1373
    @contextmanager
1✔
1374
    def _xmlrpc_control(self, allow):
1✔
1375
        gsm = getGlobalSiteManager()
1✔
1376
        gsm.registerUtility(allow, IXmlrpcChecker)
1✔
1377
        yield
1✔
1378
        gsm.unregisterUtility(allow, IXmlrpcChecker)
1✔
1379

1380
    def test_url_scheme(self):
1✔
1381
        # The default is http
1382
        env = {'SERVER_NAME': 'myhost', 'SERVER_PORT': 80}
1✔
1383
        req = self._makeOne(environ=env)
1✔
1384
        self.assertEqual(req['SERVER_URL'], 'http://myhost')
1✔
1385

1386
        # If we bang a SERVER_URL into the environment it is retained
1387
        env = {'SERVER_URL': 'https://anotherserver:8443'}
1✔
1388
        req = self._makeOne(environ=env)
1✔
1389
        self.assertEqual(req['SERVER_URL'], 'https://anotherserver:8443')
1✔
1390

1391
        # Now go through the various environment values that signal
1392
        # a request uses the https URL scheme
1393
        for val in ('on', 'ON', '1'):
1✔
1394
            env = {'SERVER_NAME': 'myhost', 'SERVER_PORT': 443, 'HTTPS': val}
1✔
1395
            req = self._makeOne(environ=env)
1✔
1396
            self.assertEqual(req['SERVER_URL'], 'https://myhost')
1✔
1397

1398
        env = {'SERVER_NAME': 'myhost', 'SERVER_PORT': 443,
1✔
1399
               'SERVER_PORT_SECURE': 1}
1400
        req = self._makeOne(environ=env)
1✔
1401
        self.assertEqual(req['SERVER_URL'], 'https://myhost')
1✔
1402

1403
        env = {'SERVER_NAME': 'myhost', 'SERVER_PORT': 443,
1✔
1404
               'REQUEST_SCHEME': 'HTTPS'}
1405
        req = self._makeOne(environ=env)
1✔
1406
        self.assertEqual(req['SERVER_URL'], 'https://myhost')
1✔
1407

1408
        env = {'SERVER_NAME': 'myhost', 'SERVER_PORT': 443,
1✔
1409
               'wsgi.url_scheme': 'https'}
1410
        req = self._makeOne(environ=env)
1✔
1411
        self.assertEqual(req['SERVER_URL'], 'https://myhost')
1✔
1412

1413
    def test_form_urlencoded(self):
1✔
1414
        body = b"a=1"
1✔
1415
        env = self._makePostEnviron(body, False)
1✔
1416
        req = self._makeOne(stdin=BytesIO(body), environ=env)
1✔
1417
        req.processInputs()
1✔
1418
        self.assertEqual(req.form["a"], "1")
1✔
1419
        req = self._makeOne(stdin=BytesIO(body), environ=env)
1✔
1420
        with patch("ZPublisher.HTTPRequest.FORM_MEMORY_LIMIT", 1):
1✔
1421
            with self.assertRaises(BadRequest):
1✔
1422
                req.processInputs()
1✔
1423

1424
    def test_bytes_converter(self):
1✔
1425
        val = "äöü".encode("latin-1")
1✔
1426
        body = b"a:bytes:latin-1=" + val
1✔
1427
        env = self._makePostEnviron(body, False)
1✔
1428
        req = self._makeOne(stdin=BytesIO(body), environ=env)
1✔
1429
        req.processInputs()
1✔
1430
        self.assertEqual(req.form["a"], val)
1✔
1431

1432
    def test_get_with_body_and_query_string_ignores_body(self):
1✔
1433
        req_factory = self._getTargetClass()
1✔
1434
        req = req_factory(
1✔
1435
            BytesIO(b"foo"),
1436
            {
1437
                "SERVER_NAME": "localhost",
1438
                "SERVER_PORT": "8080",
1439
                "REQUEST_METHOD": "GET",
1440
                "QUERY_STRING": "bar"
1441
            },
1442
            None,
1443
        )
1444
        req.processInputs()
1✔
1445
        self.assertDictEqual(req.form, {"bar": ""})
1✔
1446

1447
    def test_put_with_body_and_query_string(self):
1✔
1448
        req_factory = self._getTargetClass()
1✔
1449
        req = req_factory(
1✔
1450
            BytesIO(b"foo"),
1451
            {
1452
                "SERVER_NAME": "localhost",
1453
                "SERVER_PORT": "8080",
1454
                "REQUEST_METHOD": "PUT",
1455
                "QUERY_STRING": "bar=bar"
1456
            },
1457
            None,
1458
        )
1459
        req.processInputs()
1✔
1460
        self.assertEqual(req.BODY, b"foo")
1✔
1461
        self.assertEqual(req.form["bar"], "bar")
1✔
1462

1463
    def test_issue_1095(self):
1✔
1464
        body = TEST_ISSUE_1095_DATA
1✔
1465
        env = self._makePostEnviron(body)
1✔
1466
        req = self._makeOne(BytesIO(body), env)
1✔
1467
        req.processInputs()
1✔
1468
        r = req["r"]
1✔
1469
        self.assertEqual(len(r), 2)
1✔
1470
        self.assertIsInstance(r[0].x, FileUpload)
1✔
1471
        self.assertIsInstance(r[1].x, str)
1✔
1472
        r = req.taintedform["r"]
1✔
1473
        self.assertIsInstance(r[0].x, FileUpload)
1✔
1474
        self.assertIsInstance(r[1].x, TaintedString)
1✔
1475

1476
    def test_field_charset(self):
1✔
1477
        body = TEST_FIELD_CHARSET_DATA
1✔
1478
        env = self._makePostEnviron(body)
1✔
1479
        env["QUERY_STRING"] = "y=" + quote_plus("äöü")
1✔
1480
        req = self._makeOne(BytesIO(body), env)
1✔
1481
        req.processInputs()
1✔
1482
        self.assertEqual(req["x"], "äöü")
1✔
1483
        self.assertEqual(req["y"], "äöü")
1✔
1484

1485
    def test_form_charset(self):
1✔
1486
        body = ("x=" + quote_plus("äöü", encoding="latin-1")).encode("ASCII")
1✔
1487
        env = self._makePostEnviron(body)
1✔
1488
        env["CONTENT_TYPE"] = \
1✔
1489
            "application/x-www-form-urlencoded; charset=latin-1"
1490
        env["QUERY_STRING"] = "y=" + quote_plus("äöü")
1✔
1491
        req = self._makeOne(BytesIO(body), env)
1✔
1492
        req.processInputs()
1✔
1493
        self.assertEqual(req["x"], "äöü")
1✔
1494
        self.assertEqual(req["y"], "äöü")
1✔
1495

1496

1497
class TestHTTPRequestZope3Views(TestRequestViewsBase):
1✔
1498

1499
    def _makeOne(self, root):
1✔
1500
        from zope.interface import directlyProvides
1✔
1501
        from zope.publisher.browser import IDefaultBrowserLayer
1✔
1502
        request = HTTPRequestFactoryMixin()._makeOne()
1✔
1503
        request['PARENTS'] = [root]
1✔
1504
        # The request needs to implement the proper interface
1505
        directlyProvides(request, IDefaultBrowserLayer)
1✔
1506
        return request
1✔
1507

1508
    def test_no_traversal_of_view_request_attribute(self):
1✔
1509
        # make sure views don't accidentally publish the 'request' attribute
1510
        root, _ = self._makeRootAndFolder()
1✔
1511

1512
        # make sure the view itself is traversable:
1513
        view = self._makeOne(root).traverse('folder/@@meth')
1✔
1514
        from ZPublisher.HTTPRequest import HTTPRequest
1✔
1515
        self.assertEqual(view.request.__class__, HTTPRequest,)
1✔
1516

1517
        # but not the request:
1518
        self.assertRaises(
1✔
1519
            NotFound,
1520
            self._makeOne(root).traverse, 'folder/@@meth/request'
1521
        )
1522

1523

1524
class TestSearchType(unittest.TestCase):
1✔
1525
    """Test `ZPublisher.HTTPRequest.search_type`
1526

1527
    see "https://github.com/zopefoundation/Zope/pull/512"
1528
    """
1529
    def check(self, val, expect):
1✔
1530
        mo = search_type(val)
1✔
1531
        if expect is None:
1✔
1532
            self.assertIsNone(mo)
1✔
1533
        else:
1534
            self.assertIsNotNone(mo)
1✔
1535
            self.assertEqual(mo.group(), expect)
1✔
1536

1537
    def test_image_control(self):
1✔
1538
        self.check("abc.x", ".x")
1✔
1539
        self.check("abc.y", ".y")
1✔
1540
        self.check("abc.xy", None)
1✔
1541

1542
    def test_type(self):
1✔
1543
        self.check("abc:int", ":int")
1✔
1544

1545
    def test_leftmost(self):
1✔
1546
        self.check("abc:int:record", ":record")
1✔
1547

1548
    def test_special(self):
1✔
1549
        self.check("abc:a-_0b", ":a-_0b")
1✔
1550

1551

1552
class _Unseekable:
1✔
1553
    """Auxiliary class emulating an unseekable file like object"""
1554
    def __init__(self, file):
1✔
1555
        for m in ("read", "readline", "close", "__del__"):
1✔
1556
            setattr(self, m, getattr(file, m))
1✔
1557

1558

1559
TEST_POST_ENVIRON = {
1✔
1560
    'CONTENT_LENGTH': None,
1561
    'REQUEST_METHOD': 'POST',
1562
    'SERVER_NAME': 'localhost',
1563
    'SERVER_PORT': '80',
1564
}
1565

1566
TEST_FILE_DATA = b'''
1✔
1567
--12345
1568
Content-Disposition: form-data; name="smallfile"; filename="smallfile"
1569
Content-Type: application/octet-stream
1570

1571
test
1572

1573
--12345--
1574
'''
1575

1576
TEST_FILE_DATA_UNSPECIFIED = b'''
1✔
1577
--12345
1578
Content-Disposition: form-data; name="smallfile"; filename=""
1579
Content-Type: application/octet-stream
1580

1581
--12345--
1582
'''
1583

1584
TEST_LARGEFILE_DATA = b'''
1✔
1585
--12345
1586
Content-Disposition: form-data; name="largefile"; filename="largefile"
1587
Content-Type: application/octet-stream
1588

1589
test %s
1590

1591
--12345--
1592
''' % (b'test' * 1000)
1593

1594
TEST_ISSUE_1095_DATA = b'''
1✔
1595
--12345
1596
Content-Disposition: form-data; name="r.x:records"; filename="fn"
1597
Content-Type: application/octet-stream
1598

1599
test
1600

1601
--12345
1602
Content-Disposition: form-data; name="r.x:records"
1603
Content-Type: text/html
1604

1605
<body>abc</body>
1606

1607
--12345--
1608
'''
1609

1610
TEST_FIELD_CHARSET_DATA = b'''
1✔
1611
--12345
1612
Content-Disposition: form-data; name="x"
1613
Content-Type: text/plain; charset=latin-1
1614

1615
%s
1616
--12345--
1617
''' % 'äöü'.encode("latin-1")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc