• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Zope / 6263629025

21 Sep 2023 03:12PM UTC coverage: 82.146% (-0.01%) from 82.159%
6263629025

Pull #1164

github

web-flow
[pre-commit.ci lite] apply automatic fixes
Pull Request #1164: Move all linters to pre-commit.

4353 of 6963 branches covered (0.0%)

Branch coverage included in aggregate %.

487 of 487 new or added lines in 186 files covered. (100.0%)

27394 of 31684 relevant lines covered (86.46%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.17
/src/ZPublisher/tests/testHTTPRequest.py
1
##############################################################################
2
#
3
# Copyright (c) 2002 Zope Foundation and Contributors.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE.
11
#
12
##############################################################################
13

14
import sys
1✔
15
import unittest
1✔
16
import warnings
1✔
17
from contextlib import contextmanager
1✔
18
from io import BytesIO
1✔
19
from unittest.mock import patch
1✔
20
from urllib.parse import quote_plus
1✔
21

22
from AccessControl.tainted import TaintedString
1✔
23
from AccessControl.tainted import should_be_tainted
1✔
24
from zExceptions import NotFound
1✔
25
from zope.component import getGlobalSiteManager
1✔
26
from zope.component import provideAdapter
1✔
27
from zope.i18n.interfaces import IUserPreferredLanguages
1✔
28
from zope.i18n.interfaces.locales import ILocale
1✔
29
from zope.publisher.browser import BrowserLanguages
1✔
30
from zope.publisher.interfaces.http import IHTTPRequest
1✔
31
from zope.testing.cleanup import cleanUp
1✔
32
from ZPublisher.HTTPRequest import BadRequest
1✔
33
from ZPublisher.HTTPRequest import FileUpload
1✔
34
from ZPublisher.HTTPRequest import search_type
1✔
35
from ZPublisher.interfaces import IXmlrpcChecker
1✔
36
from ZPublisher.tests.testBaseRequest import TestRequestViewsBase
1✔
37
from ZPublisher.utils import basic_auth_encode
1✔
38
from ZPublisher.xmlrpc import is_xmlrpc_response
1✔
39

40

41
class RecordTests(unittest.TestCase):
1✔
42

43
    def _makeOne(self):
1✔
44
        from ZPublisher.HTTPRequest import record
1✔
45
        return record()
1✔
46

47
    def test_dict_methods(self):
1✔
48
        rec = self._makeOne()
1✔
49
        rec.a = 1
1✔
50
        self.assertEqual(rec['a'], 1)
1✔
51
        self.assertEqual(rec.get('a'), 1)
1✔
52
        self.assertEqual(list(rec.keys()), ['a'])
1✔
53
        self.assertEqual(list(rec.values()), [1])
1✔
54
        self.assertEqual(list(rec.items()), [('a', 1)])
1✔
55

56
    def test_dict_special_methods(self):
1✔
57
        rec = self._makeOne()
1✔
58
        rec.a = 1
1✔
59
        self.assertIn('a', rec)
1✔
60
        self.assertNotIn('b', rec)
1✔
61
        self.assertEqual(len(rec), 1)
1✔
62
        self.assertEqual(list(iter(rec)), ['a'])
1✔
63

64
    def test_copy(self):
1✔
65
        rec = self._makeOne()
1✔
66
        rec.a = 1
1✔
67
        rec.b = 'foo'
1✔
68
        new_rec = rec.copy()
1✔
69
        self.assertIsInstance(new_rec, dict)
1✔
70
        self.assertEqual(new_rec, {'a': 1, 'b': 'foo'})
1✔
71

72
    def test_eq(self):
1✔
73
        rec1 = self._makeOne()
1✔
74
        self.assertFalse(rec1, {})
1✔
75
        rec2 = self._makeOne()
1✔
76
        self.assertEqual(rec1, rec2)
1✔
77
        rec1.a = 1
1✔
78
        self.assertNotEqual(rec1, rec2)
1✔
79
        rec2.a = 1
1✔
80
        self.assertEqual(rec1, rec2)
1✔
81
        rec2.b = 'foo'
1✔
82
        self.assertNotEqual(rec1, rec2)
1✔
83

84
    def test__str__returns_native_string(self):
1✔
85
        rec = self._makeOne()
1✔
86
        rec.a = b'foo'
1✔
87
        rec.b = 8
1✔
88
        rec.c = 'bar'
1✔
89
        self.assertIsInstance(str(rec), str)
1✔
90

91
    def test_str(self):
1✔
92
        rec = self._makeOne()
1✔
93
        rec.a = 1
1✔
94
        self.assertEqual(str(rec), 'a: 1')
1✔
95

96
    def test_repr(self):
1✔
97
        rec = self._makeOne()
1✔
98
        rec.a = 1
1✔
99
        rec.b = 'foo'
1✔
100
        r = repr(rec)
1✔
101
        d = eval(r)
1✔
102
        self.assertEqual(d, rec.__dict__)
1✔
103

104

105
class HTTPRequestFactoryMixin:
    # Shared helpers for building ``HTTPRequest`` objects in tests.

    def tearDown(self):
        # Run the cleanup hooks of zope.testing.cleanup after a test.
        # NOTE(review): a class that lists ``unittest.TestCase`` *before*
        # this mixin (as HTTPRequestTests does) resolves ``tearDown`` to
        # TestCase.tearDown, so this method is not reached for it; put the
        # mixin first in the bases if the cleanup is wanted there.
        cleanUp()

    def _getTargetClass(self):
        # The class under test.
        from ZPublisher.HTTPRequest import HTTPRequest
        return HTTPRequest

    def _makePostEnviron(self, body=b'', multipart=True):
        # Return a copy of the module-level TEST_POST_ENVIRON with
        # CONTENT_TYPE chosen by ``multipart`` and CONTENT_LENGTH set to
        # the length of ``body``.
        environ = TEST_POST_ENVIRON.copy()
        environ["CONTENT_TYPE"] = (
            'multipart/form-data; boundary=12345' if multipart
            else 'application/x-www-form-urlencoded')
        environ['CONTENT_LENGTH'] = str(len(body))
        return environ

    def _makeOne(self, stdin=None, environ=None, response=None, clean=1):
        # Build a request, supplying defaults for everything not given.
        # A caller-supplied ``environ`` is completed in place: missing
        # REQUEST_METHOD / SERVER_NAME / SERVER_PORT entries are added to
        # the very dict that was passed in.
        from ZPublisher.HTTPResponse import HTTPResponse
        if stdin is None:
            stdin = BytesIO()

        if environ is None:
            environ = {}

        environ.setdefault('REQUEST_METHOD', 'GET')
        environ.setdefault('SERVER_NAME', 'localhost')
        environ.setdefault('SERVER_PORT', '8080')

        if response is None:
            response = HTTPResponse(stdout=BytesIO())

        return self._getTargetClass()(stdin, environ, response, clean)
1✔
143

144

145
class HTTPRequestTests(unittest.TestCase, HTTPRequestFactoryMixin):
1✔
146

147
    def _processInputs(self, inputs):
        # Build a request whose query string is made from ``inputs``, run
        # processInputs() on it and return the resulting HTTPRequest.
        # ``inputs`` is an iterable of (key, value) string pairs; they are
        # URL-quoted here, so callers pass them unencoded.
        encoded_pairs = [
            f"{quote_plus(key)}={quote_plus(val)}" for key, val in inputs
        ]

        env = {
            'SERVER_NAME': 'testingharnas',
            'SERVER_PORT': '80',
            'QUERY_STRING': '&'.join(encoded_pairs),
        }
        req = self._makeOne(environ=env)
        req.processInputs()
        self._noFormValuesInOther(req)
        return req
1✔
165

166
    def _noTaintedValues(self, req):
1✔
167
        self.assertFalse(list(req.taintedform.keys()))
1✔
168

169
    def _valueIsOrHoldsTainted(self, val):
        # Walk ``val`` -- recursing into records, lists and tuples -- and
        # return 1 if a TaintedString was found anywhere, otherwise 0.
        # Fails the test when a plain string should have been tainted, or
        # when a TaintedString wraps a value that is not dangerous.
        from AccessControl.tainted import TaintedString
        from ZPublisher.HTTPRequest import record

        if isinstance(val, TaintedString):
            self.assertTrue(
                should_be_tainted(val._value),
                "%r is not dangerous, no taint required." % val)
            return 1

        if isinstance(val, record):
            # Attribute names and values are both inspected, in the order
            # name, value, name, value, ...
            members = [
                member
                for pair in list(val.__dict__.items())
                for member in pair
            ]
        elif type(val) in (list, tuple):
            members = val
        else:
            if isinstance(val, str):
                self.assertFalse(
                    should_be_tainted(val),
                    "'%s' is dangerous and should have been tainted." % val)
            return 0

        # Deliberately no short-circuit: every member is visited so that
        # the assertions above run for each of them.
        findings = [self._valueIsOrHoldsTainted(member) for member in members]
        return 1 if any(findings) else 0
1✔
206

207
    def _noFormValuesInOther(self, req):
1✔
208
        for key in list(req.taintedform.keys()):
1✔
209
            self.assertNotIn(
1✔
210
                key,
211
                req.other,
212
                'REQUEST.other should not hold tainted values at first!'
213
            )
214

215
        for key in list(req.form.keys()):
1✔
216
            self.assertNotIn(
1✔
217
                key,
218
                req.other,
219
                'REQUEST.other should not hold form values at first!'
220
            )
221

222
    def _onlyTaintedformHoldsTaintedStrings(self, req):
1✔
223
        for key, val in list(req.taintedform.items()):
1✔
224
            self.assertTrue(
1✔
225
                self._valueIsOrHoldsTainted(key)
226
                or self._valueIsOrHoldsTainted(val),
227
                'Tainted form holds item %s that is not tainted' % key)
228

229
        for key, val in list(req.form.items()):
1✔
230
            if key in req.taintedform:
1✔
231
                continue
1✔
232
            self.assertFalse(
1✔
233
                self._valueIsOrHoldsTainted(key)
234
                or self._valueIsOrHoldsTainted(val),
235
                'Normal form holds item %s that is tainted' % key)
236

237
    def _taintedKeysAlsoInForm(self, req):
1✔
238
        for key in list(req.taintedform.keys()):
1✔
239
            self.assertIn(
1✔
240
                key,
241
                req.form,
242
                "Found tainted %s not in form" % key
243
            )
244
            self.assertEqual(
1✔
245
                req.form[key], req.taintedform[key],
246
                "Key %s not correctly reproduced in tainted; expected %r, "
247
                "got %r" % (key, req.form[key], req.taintedform[key]))
248

249
    def test_webdav_source_port_available(self):
        # WEBDAV_SOURCE_PORT is falsy unless given in the environment.
        plain_req = self._makeOne()
        self.assertFalse(plain_req.get('WEBDAV_SOURCE_PORT'))

        webdav_req = self._makeOne(environ={'WEBDAV_SOURCE_PORT': 1})
        self.assertTrue(webdav_req.get('WEBDAV_SOURCE_PORT'))
1✔
255

256
    def test_no_docstring_on_instance(self):
        # A request instance must not expose a docstring.
        req = self._makeOne(
            environ={'SERVER_NAME': 'testingharnas', 'SERVER_PORT': '80'})
        self.assertIsNone(req.__doc__)
1✔
260

261
    def test___bobo_traverse___raises(self):
        # None of these names may be reached through __bobo_traverse__.
        req = self._makeOne(
            environ={'SERVER_NAME': 'testingharnas', 'SERVER_PORT': '80'})
        for name in ('REQUEST', 'BODY', 'BODYFILE', 'RESPONSE'):
            self.assertRaises(KeyError, req.__bobo_traverse__, name)
1✔
268

269
    def test_processInputs_wo_query_string(self):
        # Without a query string the form stays empty.
        req = self._makeOne(
            environ={'SERVER_NAME': 'testingharnas', 'SERVER_PORT': '80'})
        req.processInputs()
        self._noFormValuesInOther(req)
        self.assertEqual(req.form, {})
1✔
275

276
    def test_processInputs_wo_marshalling(self):
        # Plain fields stay strings; the repeated key 'multi' is expected
        # to be collected into a list.
        fields = (
            ('foo', 'bar'),
            ('spam', 'eggs'),
            ('number', '1'),
            ('spacey key', 'val'),
            ('key', 'spacey val'),
            ('multi', '1'),
            ('multi', '2'),
        )
        req = self._processInputs(fields)

        self.assertEqual(
            sorted(req.form.keys()),
            ['foo', 'key', 'multi', 'number', 'spacey key', 'spam'])
        expected = {
            'number': '1',
            'multi': ['1', '2'],
            'spacey key': 'val',
            'key': 'spacey val',
        }
        for name, value in expected.items():
            self.assertEqual(req[name], value)

        self._noTaintedValues(req)
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
295

296
    def test_processInputs_w_simple_marshalling(self):
        # One field per simple ``:type`` suffix; the suffix is stripped
        # from the form key and the value is converted accordingly.
        from DateTime.DateTime import DateTime
        fields = (
            ('num:int', '42'),
            ('fract:float', '4.2'),
            ('bign:long', '45'),
            ('words:string', 'Some words'),
            ('2tokens:tokens', 'one two'),
            ('aday:date', '2002/07/23'),
            ('accountedfor:required', 'yes'),
            ('multiline:lines', 'one\ntwo'),
            ('morewords:text', 'one\ntwo\n'),
        )
        req = self._processInputs(fields)

        self.assertEqual(
            sorted(req.form.keys()),
            ['2tokens', 'accountedfor', 'aday', 'bign',
             'fract', 'morewords', 'multiline', 'num', 'words'])

        expected = {
            '2tokens': ['one', 'two'],
            'accountedfor': 'yes',
            'aday': DateTime('2002/07/23'),
            'bign': 45,
            'fract': 4.2,
            'morewords': 'one\ntwo\n',
            'multiline': ['one', 'two'],
            'num': 42,
            'words': 'Some words',
        }
        for name, value in expected.items():
            self.assertEqual(req[name], value)

        self._noTaintedValues(req)
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
325

326
    def test_processInputs_w_unicode_conversions(self):
        # Native strings holding a non-ASCII character, sent through the
        # ``u*`` converters and through plain ``:string``.
        reg_char = '\xae'
        word = 'test' + reg_char
        fields = (
            ('ustring:ustring:utf8', word),
            ('utext:utext:utf8', f'{word}\n{word}\n'),
            ('utokens:utokens:utf8', f'{word} {word}'),
            ('ulines:ulines:utf8', f'{word}\n{word}'),
            ('nouconverter:string:utf8', word),
        )
        # The unicode converters are slated for removal in Zope 6; silence
        # their deprecation warnings for the duration of the call.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            req = self._processInputs(fields)

        self.assertEqual(
            sorted(req.form.keys()),
            ['nouconverter', 'ulines', 'ustring', 'utext', 'utokens'])

        expected = {
            'ustring': 'test\u00AE',
            'utext': 'test\u00AE\ntest\u00AE\n',
            'utokens': ['test\u00AE', 'test\u00AE'],
            'ulines': ['test\u00AE', 'test\u00AE'],
        }
        for name, value in expected.items():
            self.assertEqual(req[name], value)

        # The plain string converter must hand the text back unchanged.
        self.assertEqual(req['nouconverter'], 'test' + reg_char)

        self._noTaintedValues(req)
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
358

359
    def test_processInputs_w_simple_containers(self):
        # :list, :tuple, :record and :records, each with one or two items.
        fields = (
            ('oneitem:list', 'one'),
            ('alist:list', 'one'),
            ('alist:list', 'two'),
            ('oneitemtuple:tuple', 'one'),
            ('atuple:tuple', 'one'),
            ('atuple:tuple', 'two'),
            ('onerec.foo:record', 'foo'),
            ('onerec.bar:record', 'bar'),
            ('setrec.foo:records', 'foo'),
            ('setrec.bar:records', 'bar'),
            ('setrec.foo:records', 'spam'),
            ('setrec.bar:records', 'eggs'),
        )
        req = self._processInputs(fields)

        self.assertEqual(
            sorted(req.form.keys()),
            ['alist', 'atuple', 'oneitem', 'oneitemtuple', 'onerec', 'setrec'])

        sequences = {
            'oneitem': ['one'],
            'oneitemtuple': ('one',),
            'alist': ['one', 'two'],
            'atuple': ('one', 'two'),
        }
        for name, value in sequences.items():
            self.assertEqual(req[name], value)

        self.assertEqual(req['onerec'].foo, 'foo')
        self.assertEqual(req['onerec'].bar, 'bar')

        self.assertEqual(len(req['setrec']), 2)
        expected_records = (('foo', 'bar'), ('spam', 'eggs'))
        for index, (foo, bar) in enumerate(expected_records):
            self.assertEqual(req['setrec'][index].foo, foo)
            self.assertEqual(req['setrec'][index].bar, bar)

        self._noTaintedValues(req)
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
389

390
    def test_processInputs_w_marshalling_into_sequences(self):
        # A type converter combined with :list / :tuple, with the two
        # suffixes given in either order.
        fields = (
            ('ilist:int:list', '1'),
            ('ilist:int:list', '2'),
            ('ilist:list:int', '3'),
            ('ftuple:float:tuple', '1.0'),
            ('ftuple:float:tuple', '1.1'),
            ('ftuple:tuple:float', '1.2'),
            ('tlist:tokens:list', 'one two'),
            ('tlist:list:tokens', '3 4'),
        )
        req = self._processInputs(fields)

        self.assertEqual(
            sorted(req.form.keys()), ['ftuple', 'ilist', 'tlist'])

        expected = {
            'ilist': [1, 2, 3],
            'ftuple': (1.0, 1.1, 1.2),
            'tlist': [['one', 'two'], ['3', '4']],
        }
        for name, value in expected.items():
            self.assertEqual(req[name], value)

        self._noTaintedValues(req)
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
408

409
    def test_processInputs_w_records_w_sequences(self):
        # Records whose attributes are themselves converted / sequences.
        fields = (
            ('onerec.name:record', 'foo'),
            ('onerec.tokens:tokens:record', 'one two'),
            ('onerec.ints:int:record', '1'),
            ('onerec.ints:int:record', '2'),

            ('setrec.name:records', 'first'),
            ('setrec.ilist:list:int:records', '1'),
            ('setrec.ilist:list:int:records', '2'),
            ('setrec.ituple:tuple:int:records', '1'),
            ('setrec.ituple:tuple:int:records', '2'),
            ('setrec.name:records', 'second'),
            ('setrec.ilist:list:int:records', '1'),
            ('setrec.ilist:list:int:records', '2'),
            ('setrec.ituple:tuple:int:records', '1'),
            ('setrec.ituple:tuple:int:records', '2'),
        )
        req = self._processInputs(fields)

        self.assertEqual(sorted(req.form.keys()), ['onerec', 'setrec'])

        self.assertEqual(req['onerec'].name, 'foo')
        self.assertEqual(req['onerec'].tokens, ['one', 'two'])
        # Implicit sequences and records don't mix: the later value is the
        # one expected on the record.
        self.assertEqual(req['onerec'].ints, 2)

        self.assertEqual(len(req['setrec']), 2)
        for index, name in enumerate(('first', 'second')):
            self.assertEqual(req['setrec'][index].name, name)

        for index in (0, 1):
            self.assertEqual(req['setrec'][index].ilist, [1, 2])
            self.assertEqual(req['setrec'][index].ituple, (1, 2))

        self._noTaintedValues(req)
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
446

447
    def test_processInputs_w_defaults(self):
        # ``:default`` fields combined with explicit values for plain
        # fields, lists, a record and a sequence of records.
        fields = (
            ('foo:default:int', '5'),

            ('alist:int:default', '3'),
            ('alist:int:default', '4'),
            ('alist:int:default', '5'),
            ('alist:int', '1'),
            ('alist:int', '2'),

            ('explicitlist:int:list:default', '3'),
            ('explicitlist:int:list:default', '4'),
            ('explicitlist:int:list:default', '5'),
            ('explicitlist:int:list', '1'),
            ('explicitlist:int:list', '2'),

            ('bar.spam:record:default', 'eggs'),
            ('bar.foo:record:default', 'foo'),
            ('bar.foo:record', 'baz'),

            ('setrec.spam:records:default', 'eggs'),
            ('setrec.foo:records:default', 'foo'),
            ('setrec.foo:records', 'baz'),
            ('setrec.foo:records', 'ham'),
        )
        req = self._processInputs(fields)

        self.assertEqual(
            sorted(req.form.keys()),
            ['alist', 'bar', 'explicitlist', 'foo', 'setrec'])

        # Expected: the explicit values followed by the defaults.
        for name in ('alist', 'explicitlist'):
            self.assertEqual(req[name], [1, 2, 3, 4, 5])

        self.assertEqual(req['foo'], 5)
        self.assertEqual(req['bar'].spam, 'eggs')
        self.assertEqual(req['bar'].foo, 'baz')

        self.assertEqual(len(req['setrec']), 2)
        expected_records = (('eggs', 'baz'), ('eggs', 'ham'))
        for index, (spam, foo) in enumerate(expected_records):
            self.assertEqual(req['setrec'][index].spam, spam)
            self.assertEqual(req['setrec'][index].foo, foo)

        self._noTaintedValues(req)
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
493

494
    def test_processInputs_wo_marshalling_w_Taints(self):
        # Unmarshalled fields, some with '<' in the key or in a value.
        fields = (
            ('foo', 'bar'),
            ('spam', 'eggs'),
            ('number', '1'),
            ('tainted', '<tainted value>'),
            ('<tainted key>', 'value'),
            ('spacey key', 'val'),
            ('key', 'spacey val'),
            ('tinitmulti', '<1>'),
            ('tinitmulti', '2'),
            ('tdefermulti', '1'),
            ('tdefermulti', '<2>'),
            ('tallmulti', '<1>'),
            ('tallmulti', '<2>'),
        )
        req = self._processInputs(fields)

        self.assertEqual(
            sorted(req.taintedform.keys()),
            ['<tainted key>', 'tainted',
             'tallmulti', 'tdefermulti', 'tinitmulti'])

        self._taintedKeysAlsoInForm(req)
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
514

515
    def test_processInputs_w_simple_marshalling_w_taints(self):
        # Simple converters with '<' either in the key or in the value.
        fields = (
            ('<tnum>:int', '42'),
            ('<tfract>:float', '4.2'),
            ('<tbign>:long', '45'),
            ('twords:string', 'Some <words>'),
            ('t2tokens:tokens', 'one <two>'),
            ('<taday>:date', '2002/07/23'),
            ('taccountedfor:required', '<yes>'),
            ('tmultiline:lines', '<one\ntwo>'),
            ('tmorewords:text', '<one\ntwo>\n'),
        )
        req = self._processInputs(fields)

        self.assertEqual(
            sorted(req.taintedform.keys()),
            ['<taday>', '<tbign>', '<tfract>',
             '<tnum>', 't2tokens', 'taccountedfor', 'tmorewords', 'tmultiline',
             'twords'])

        self._taintedKeysAlsoInForm(req)
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
536

537
    def test_processInputs_w_unicode_w_taints(self):
        # Unicode converters fed values that contain '<'.
        fields = (
            ('tustring:ustring:utf8', '<test\xc2\xae>'),
            ('tutext:utext:utf8', '<test\xc2\xae>\n<test\xc2\xae\n>'),

            ('tinitutokens:utokens:utf8', '<test\xc2\xae> test\xc2\xae'),
            ('tinitulines:ulines:utf8', '<test\xc2\xae>\ntest\xc2\xae'),

            ('tdeferutokens:utokens:utf8', 'test\xc2\xae <test\xc2\xae>'),
            ('tdeferulines:ulines:utf8', 'test\xc2\xae\n<test\xc2\xae>'),

            ('tnouconverter:string:utf8', '<test\xc2\xae>'),
        )

        # The unicode converters are slated for removal in Zope 6; silence
        # their deprecation warnings for the duration of the call.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            req = self._processInputs(fields)

        self.assertEqual(
            sorted(req.taintedform.keys()),
            ['tdeferulines', 'tdeferutokens',
             'tinitulines', 'tinitutokens', 'tnouconverter', 'tustring',
             'tutext'])

        self._taintedKeysAlsoInForm(req)
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
566

567
    def test_processInputs_w_simple_containers_w_taints(self):
        # Lists, tuples, records and record sequences where the '<' shows
        # up in the first item, in a later item, or in the key.
        fields = (
            ('toneitem:list', '<one>'),
            ('<tkeyoneitem>:list', 'one'),
            ('tinitalist:list', '<one>'),
            ('tinitalist:list', 'two'),
            ('tdeferalist:list', 'one'),
            ('tdeferalist:list', '<two>'),

            ('toneitemtuple:tuple', '<one>'),
            ('tinitatuple:tuple', '<one>'),
            ('tinitatuple:tuple', 'two'),
            ('tdeferatuple:tuple', 'one'),
            ('tdeferatuple:tuple', '<two>'),

            ('tinitonerec.foo:record', '<foo>'),
            ('tinitonerec.bar:record', 'bar'),
            ('tdeferonerec.foo:record', 'foo'),
            ('tdeferonerec.bar:record', '<bar>'),

            ('tinitinitsetrec.foo:records', '<foo>'),
            ('tinitinitsetrec.bar:records', 'bar'),
            ('tinitinitsetrec.foo:records', 'spam'),
            ('tinitinitsetrec.bar:records', 'eggs'),

            ('tinitdefersetrec.foo:records', 'foo'),
            ('tinitdefersetrec.bar:records', '<bar>'),
            ('tinitdefersetrec.foo:records', 'spam'),
            ('tinitdefersetrec.bar:records', 'eggs'),

            ('tdeferinitsetrec.foo:records', 'foo'),
            ('tdeferinitsetrec.bar:records', 'bar'),
            ('tdeferinitsetrec.foo:records', '<spam>'),
            ('tdeferinitsetrec.bar:records', 'eggs'),

            ('tdeferdefersetrec.foo:records', 'foo'),
            ('tdeferdefersetrec.bar:records', 'bar'),
            ('tdeferdefersetrec.foo:records', 'spam'),
            ('tdeferdefersetrec.bar:records', '<eggs>'),
        )
        req = self._processInputs(fields)

        self.assertEqual(
            sorted(req.taintedform.keys()),
            ['<tkeyoneitem>', 'tdeferalist',
             'tdeferatuple', 'tdeferdefersetrec', 'tdeferinitsetrec',
             'tdeferonerec', 'tinitalist', 'tinitatuple', 'tinitdefersetrec',
             'tinitinitsetrec', 'tinitonerec', 'toneitem', 'toneitemtuple'])

        self._taintedKeysAlsoInForm(req)
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
614

615
    def test_processInputs_w_records_w_sequences_tainted(self):
        # Records holding sequences, with a single '<' value placed in a
        # different position for each record name.
        fields = (
            ('tinitonerec.tokens:tokens:record', '<one> two'),
            ('tdeferonerec.tokens:tokens:record', 'one <two>'),

            ('tinitsetrec.name:records', 'first'),
            ('tinitsetrec.ilist:list:records', '<1>'),
            ('tinitsetrec.ilist:list:records', '2'),
            ('tinitsetrec.ituple:tuple:int:records', '1'),
            ('tinitsetrec.ituple:tuple:int:records', '2'),
            ('tinitsetrec.name:records', 'second'),
            ('tinitsetrec.ilist:list:records', '1'),
            ('tinitsetrec.ilist:list:records', '2'),
            ('tinitsetrec.ituple:tuple:int:records', '1'),
            ('tinitsetrec.ituple:tuple:int:records', '2'),

            ('tdeferfirstsetrec.name:records', 'first'),
            ('tdeferfirstsetrec.ilist:list:records', '1'),
            ('tdeferfirstsetrec.ilist:list:records', '<2>'),
            ('tdeferfirstsetrec.ituple:tuple:int:records', '1'),
            ('tdeferfirstsetrec.ituple:tuple:int:records', '2'),
            ('tdeferfirstsetrec.name:records', 'second'),
            ('tdeferfirstsetrec.ilist:list:records', '1'),
            ('tdeferfirstsetrec.ilist:list:records', '2'),
            ('tdeferfirstsetrec.ituple:tuple:int:records', '1'),
            ('tdeferfirstsetrec.ituple:tuple:int:records', '2'),

            ('tdefersecondsetrec.name:records', 'first'),
            ('tdefersecondsetrec.ilist:list:records', '1'),
            ('tdefersecondsetrec.ilist:list:records', '2'),
            ('tdefersecondsetrec.ituple:tuple:int:records', '1'),
            ('tdefersecondsetrec.ituple:tuple:int:records', '2'),
            ('tdefersecondsetrec.name:records', 'second'),
            ('tdefersecondsetrec.ilist:list:records', '1'),
            ('tdefersecondsetrec.ilist:list:records', '<2>'),
            ('tdefersecondsetrec.ituple:tuple:int:records', '1'),
            ('tdefersecondsetrec.ituple:tuple:int:records', '2'),
        )
        req = self._processInputs(fields)

        self.assertEqual(
            sorted(req.taintedform.keys()),
            ['tdeferfirstsetrec', 'tdeferonerec',
             'tdefersecondsetrec', 'tinitonerec', 'tinitsetrec'])

        self._taintedKeysAlsoInForm(req)
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
663

664
    def test_processInputs_w_defaults_w_taints(self):
        # ``:default`` fields where the '<' sits either in a default or in
        # an explicit value.  The '*doesnotapply' names are expected to be
        # absent from taintedform (see the key list asserted below).
        fields = (
            ('tfoo:default', '<5>'),

            ('doesnnotapply:default', '<4>'),
            ('doesnnotapply', '4'),

            ('tinitlist:default', '3'),
            ('tinitlist:default', '4'),
            ('tinitlist:default', '5'),
            ('tinitlist', '<1>'),
            ('tinitlist', '2'),

            ('tdeferlist:default', '3'),
            ('tdeferlist:default', '<4>'),
            ('tdeferlist:default', '5'),
            ('tdeferlist', '1'),
            ('tdeferlist', '2'),

            ('tinitbar.spam:record:default', 'eggs'),
            ('tinitbar.foo:record:default', 'foo'),
            ('tinitbar.foo:record', '<baz>'),
            ('tdeferbar.spam:record:default', '<eggs>'),
            ('tdeferbar.foo:record:default', 'foo'),
            ('tdeferbar.foo:record', 'baz'),

            ('rdoesnotapply.spam:record:default', '<eggs>'),
            ('rdoesnotapply.spam:record', 'eggs'),

            ('tinitsetrec.spam:records:default', 'eggs'),
            ('tinitsetrec.foo:records:default', 'foo'),
            ('tinitsetrec.foo:records', '<baz>'),
            ('tinitsetrec.foo:records', 'ham'),

            ('tdefersetrec.spam:records:default', '<eggs>'),
            ('tdefersetrec.foo:records:default', 'foo'),
            ('tdefersetrec.foo:records', 'baz'),
            ('tdefersetrec.foo:records', 'ham'),

            ('srdoesnotapply.foo:records:default', '<eggs>'),
            ('srdoesnotapply.foo:records', 'baz'),
            ('srdoesnotapply.foo:records', 'ham'),
        )
        req = self._processInputs(fields)

        self.assertEqual(
            sorted(req.taintedform.keys()),
            ['tdeferbar', 'tdeferlist',
             'tdefersetrec', 'tfoo', 'tinitbar', 'tinitlist', 'tinitsetrec'])

        self._taintedKeysAlsoInForm(req)
        self._onlyTaintedformHoldsTaintedStrings(req)
1✔
716

717
    def test_processInputs_w_tainted_attribute_raises(self):
        # A record attribute name containing '<' is expected to make input
        # processing raise ValueError.
        # ``inputs`` must be a sequence of (key, value) pairs.  Handing
        # _processInputs a single bare pair makes its ``for key, val in
        # inputs`` loop fail while unpacking the key string -- that also
        # raises ValueError, so the test would pass without the request
        # code ever being exercised.
        inputs = (('taintedattr.here<be<taint:record', 'value'),)

        self.assertRaises(ValueError, self._processInputs, inputs)
1✔
721

722
    def test_processInputs_w_tainted_values_cleans_exceptions(self):
        # Feed tainted garbage to the conversion methods, and any exception
        # returned should be HTML safe
        from DateTime.interfaces import SyntaxError
        from ZPublisher.Converters import type_converters
        for name, convert in type_converters.items():
            try:
                # unicode converters will go away with Zope 6
                # ignore deprecation warning for test run
                with warnings.catch_warnings():
                    warnings.simplefilter('ignore')
                    convert('<html garbage>')
            except (Exception, SyntaxError) as e:
                # A single handler covers both exception types: a
                # ``SyntaxError`` clause placed after ``except Exception``
                # was never reached.
                # Inspect the rendered message.  ``'<' in e.args`` only
                # tests whether one argument *equals* '<', which says
                # nothing about an unescaped '<' inside the message.
                self.assertNotIn(
                    '<',
                    str(e),
                    '%s converter does not quote unsafe value!' % name
                )
746

747
    def test_processInputs_w_dotted_name_as_tuple(self):
        # Collector #500
        # The trailing dot stays part of the form key and the value is
        # delivered as a one-element tuple.
        req = self._processInputs(
            (('name.:tuple', 'name with dot as tuple'),))

        self.assertEqual(sorted(req.form.keys()), ['name.'])
        self.assertEqual(req['name.'], ('name with dot as tuple',))

        self._noTaintedValues(req)
        self._onlyTaintedformHoldsTaintedStrings(req)
760

761
    def test_processInputs_w_cookie_parsing(self):
        # Each case is a raw ``HTTP_COOKIE`` header together with the
        # cookies (name -> value) the request must expose for it.
        cases = (
            ('foo=bar; baz=gee',
             {'foo': 'bar', 'baz': 'gee'}),
            ('foo=bar; baz="gee, like, e=mc^2"',
             {'foo': 'bar', 'baz': 'gee, like, e=mc^2'}),
            # Collector #1498: empty cookies
            ('foo=bar; hmm; baz=gee',
             {'foo': 'bar', 'hmm': '', 'baz': 'gee'}),
            # Unquoted multi-space cookies
            ('single=cookie data; '
             'quoted="cookie data with unquoted spaces"; '
             'multi=cookie data with unquoted spaces; '
             'multi2=cookie data with unquoted spaces',
             {'single': 'cookie data',
              'quoted': 'cookie data with unquoted spaces',
              'multi': 'cookie data with unquoted spaces',
              'multi2': 'cookie data with unquoted spaces'}),
        )

        # The same environment mapping is reused for every request; only
        # the cookie header changes between cases.
        env = {'SERVER_NAME': 'testingharnas', 'SERVER_PORT': '80'}
        for header, expected in cases:
            env['HTTP_COOKIE'] = header
            req = self._makeOne(environ=env)
            for cookie_name, cookie_value in expected.items():
                self.assertEqual(req.cookies[cookie_name], cookie_value)
794

795
    def test_processInputs_xmlrpc(self):
        # A ``text/xml`` POST carrying a method call without parameters
        # yields ``/test`` as PATH_INFO and empty positional arguments.
        payload = (
            b'<?xml version="1.0"?>'
            b'<methodCall><methodName>test</methodName></methodCall>'
        )
        xml_env = self._makePostEnviron(body=payload)
        xml_env['CONTENT_TYPE'] = 'text/xml'
        request = self._makeOne(stdin=BytesIO(payload), environ=xml_env)
        request.processInputs()
        self.assertEqual(request.PATH_INFO, '/test')
        self.assertEqual(request.args, ())
806

807
    def test_processInputs_xmlrpc_query_string(self):
        # Same as the plain XML-RPC case, but the query string must also
        # be parsed into the form.
        payload = (
            b'<?xml version="1.0"?>'
            b'<methodCall><methodName>test</methodName></methodCall>'
        )
        xml_env = self._makePostEnviron(body=payload)
        xml_env.update({
            'CONTENT_TYPE': 'text/xml',
            'QUERY_STRING': 'x=1',
        })
        request = self._makeOne(stdin=BytesIO(payload), environ=xml_env)
        request.processInputs()
        self.assertEqual(request.PATH_INFO, '/test')
        self.assertEqual(request.args, ())
        self.assertEqual(request.form["x"], '1')
820

821
    def test_processInputs_xmlrpc_method(self):
        # An XML-RPC body combined with a ``:method`` query variable is
        # answered with ``BadRequest``.
        payload = (
            b'<?xml version="1.0"?>'
            b'<methodCall><methodName>test</methodName></methodCall>'
        )
        xml_env = self._makePostEnviron(body=payload)
        xml_env.update({
            'CONTENT_TYPE': 'text/xml',
            'QUERY_STRING': ':method=method',
        })
        request = self._makeOne(stdin=BytesIO(payload), environ=xml_env)
        self.assertRaises(BadRequest, request.processInputs)
832

833
    def test_processInputs_SOAP(self):
        # ZPublisher does not really have SOAP support
        # all it does is put the body into ``SOAPXML``
        payload = b'soap'
        soap_env = TEST_POST_ENVIRON.copy()
        soap_env.update({'HTTP_SOAPACTION': "soapaction"})
        request = self._makeOne(stdin=BytesIO(payload), environ=soap_env)
        request.processInputs()
        self.assertEqual(request.SOAPXML, payload)
842

843
    def test_processInputs_SOAP_query_string(self):
        # ZPublisher does not really have SOAP support
        # all it does is put the body into ``SOAPXML``;
        # the query string must still reach the form.
        payload = b'soap'
        soap_env = TEST_POST_ENVIRON.copy()
        soap_env.update({
            'QUERY_STRING': 'x=1',
            'HTTP_SOAPACTION': "soapaction",
        })
        request = self._makeOne(stdin=BytesIO(payload), environ=soap_env)
        request.processInputs()
        self.assertEqual(request.SOAPXML, payload)
        self.assertEqual(request.form["x"], '1')
854

855
    def test_processInputs_w_urlencoded_and_qs(self):
        # Variables from a urlencoded POST body and from the query string
        # must both end up in the form.
        payload = b'foo=1'
        post_env = dict(
            CONTENT_TYPE='application/x-www-form-urlencoded',
            CONTENT_LENGTH=len(payload),
            QUERY_STRING='bar=2',
            REQUEST_METHOD='POST',
        )
        request = self._makeOne(stdin=BytesIO(payload), environ=post_env)
        request.processInputs()
        self.assertEqual(request.form['foo'], '1')
        self.assertEqual(request.form['bar'], '2')
867

868
    def test_close_removes_stdin_references(self):
        # Verifies that all references to the input stream go away on
        # request.close().  Otherwise a tempfile may stick around.
        #
        # The check compares reference counts of the stream object, so no
        # additional name may be bound to ``s`` inside this test.
        s = BytesIO(TEST_FILE_DATA)
        # Baseline: reference count before any request has seen the stream.
        start_count = sys.getrefcount(s)

        environ = self._makePostEnviron(body=TEST_FILE_DATA)
        req = self._makeOne(stdin=s, environ=environ)
        req.processInputs()
        # Precondition: the request now holds at least one extra reference.
        self.assertNotEqual(start_count, sys.getrefcount(s))  # Precondition
        req.close()
        # The test: after ``close()`` the count is back at the baseline.
        self.assertEqual(start_count, sys.getrefcount(s))  # The test
880

881
    def test_processInputs_w_large_input_gets_tempfile(self):
        # checks fileupload object supports the filename
        stream = BytesIO(TEST_LARGEFILE_DATA)

        request = self._makeOne(
            stdin=stream,
            environ=self._makePostEnviron(body=TEST_LARGEFILE_DATA))
        request.processInputs()
        upload = request.form.get('largefile')
        self.assertTrue(upload.name)
        self.assertEqual(4006, len(upload.file.read()))
        upload.file.close()
892

893
    def test_processInputs_with_file_upload_gets_iterator(self):
        # checks fileupload object supports the iterator protocol
        # collector entry 1837
        stream = BytesIO(TEST_FILE_DATA)

        request = self._makeOne(
            stdin=stream,
            environ=self._makePostEnviron(body=TEST_FILE_DATA))
        request.processInputs()
        upload = request.form.get('smallfile')
        # Exhaust the upload by iteration, then rewind and step manually.
        self.assertEqual(list(upload), [b'test\n'])
        upload.seek(0)
        self.assertEqual(next(upload), b'test\n')
905

906
    def test_processInputs_BODY(self):
        # A ``text/plain`` body is available as ``BODY`` and the very
        # stream object passed in is exposed as ``BODYFILE``.
        stream = BytesIO(b"body")
        plain_env = TEST_POST_ENVIRON.copy()
        plain_env.update({"CONTENT_TYPE": "text/plain"})
        request = self._makeOne(stdin=stream, environ=plain_env)
        request.processInputs()
        self.assertEqual(request["BODY"], b"body")
        self.assertIs(request["BODYFILE"], stream)
914

915
    def test_processInputs_BODY_unseekable(self):
        # Same expectations as for a plain stream, with the input wrapped
        # in ``_Unseekable``.
        stream = _Unseekable(BytesIO(b"body"))
        plain_env = TEST_POST_ENVIRON.copy()
        plain_env.update({"CONTENT_TYPE": "text/plain"})
        request = self._makeOne(stdin=stream, environ=plain_env)
        request.processInputs()
        self.assertEqual(request["BODY"], b"body")
        self.assertIs(request["BODYFILE"], stream)
923

924
    def test_processInputs_seekable_form_data(self):
        # With a plain ``BytesIO`` input the form is parsed and the raw
        # body is still readable through ``BODY`` and ``BODYFILE``.
        stream = BytesIO(TEST_FILE_DATA)
        request = self._makeOne(
            stdin=stream,
            environ=self._makePostEnviron(body=TEST_FILE_DATA))
        request.processInputs()
        upload = request.form.get('smallfile')
        self.assertEqual(list(upload), [b'test\n'])
        self.assertEqual(request["BODY"], TEST_FILE_DATA)
        self.assertEqual(request["BODYFILE"].read(), TEST_FILE_DATA)
933

934
    def test_processInputs_unseekable_form_data(self):
        stream = _Unseekable(BytesIO(TEST_FILE_DATA))
        request = self._makeOne(
            stdin=stream,
            environ=self._makePostEnviron(body=TEST_FILE_DATA))
        request.processInputs()
        upload = request.form.get('smallfile')
        self.assertEqual(list(upload), [b'test\n'])
        # we cannot access ``BODY`` in this case
        # as the underlying file has been read
        with self.assertRaises(KeyError):
            request["BODY"]
945

946
    def test_processInputs_unspecified_file(self):
        # A file field submitted without a file gives an upload object
        # with an empty filename and no content.
        stream = BytesIO(TEST_FILE_DATA_UNSPECIFIED)
        request = self._makeOne(
            stdin=stream,
            environ=self._makePostEnviron(body=TEST_FILE_DATA_UNSPECIFIED))
        request.processInputs()
        upload = request.form.get('smallfile')
        self.assertEqual(upload.filename, "")
        self.assertEqual(list(upload), [])
954

955
    def test__authUserPW_simple(self):
        # Credentials encoded into the authorization header come back
        # unchanged from ``_authUserPW``.
        user_id, password = 'user', 'password'
        request = self._makeOne(
            environ={
                'HTTP_AUTHORIZATION': basic_auth_encode(user_id, password),
            })

        found_user, found_password = request._authUserPW()

        self.assertEqual(found_user, user_id)
        self.assertEqual(found_password, password)
967

968
    def test__authUserPW_with_embedded_colon(self):
        # A colon inside the password must stay part of the password.
        user_id, password = 'user', 'embedded:colon'
        request = self._makeOne(
            environ={
                'HTTP_AUTHORIZATION': basic_auth_encode(user_id, password),
            })

        found_user, found_password = request._authUserPW()

        self.assertEqual(found_user, user_id)
        self.assertEqual(found_password, password)
980

981
    def test__authUserPW_non_ascii(self):
        # Non-ASCII characters in user id and password must round-trip.
        user_id, password = 'usèr', 'pàssword'
        request = self._makeOne(
            environ={
                'HTTP_AUTHORIZATION': basic_auth_encode(user_id, password),
            })

        found_user, found_password = request._authUserPW()

        self.assertEqual(found_user, user_id)
        self.assertEqual(found_password, password)
993

994
    def test_debug_not_in_qs_still_gets_attr(self):
        from zope.publisher.base import DebugFlags

        req = self._makeOne()
        # when accessing request.debug we will see the DebugFlags instance
        self.assertIsInstance(req.debug, DebugFlags)
        # It won't be available through dictionary lookup, though
        self.assertIsNone(req.get('debug'))
1002

1003
    def test_debug_in_qs_gets_form_var(self):
        # request.debug will actually yield a 'debug' form variable
        # if it exists
        req = self._makeOne(environ={'QUERY_STRING': 'debug=1'})
        req.processInputs()

        # Attribute access, ``get`` and item access must all agree.
        self.assertEqual(req.debug, '1')
        self.assertEqual(req.get('debug'), '1')
        self.assertEqual(req['debug'], '1')
1013

1014
        # we can still override request.debug with a form variable or directly
1015

1016
    def test_debug_override_via_form_other(self):
        req = self._makeOne()
        req.processInputs()

        # A value placed into the form is visible as ``debug`` ...
        req.form['debug'] = '1'
        self.assertEqual(req.debug, '1')

        # ... and a value set directly on the request replaces it.
        req['debug'] = '2'
        self.assertEqual(req.debug, '2')
1023

1024
    def test_locale_property_accessor(self):
        from ZPublisher.HTTPRequest import _marker

        provideAdapter(
            BrowserLanguages, [IHTTPRequest], IUserPreferredLanguages)

        req = self._makeOne(environ={'HTTP_ACCEPT_LANGUAGE': 'en'})

        # before accessing request.locale for the first time, request._locale
        # is still a marker
        self.assertIs(req._locale, _marker)

        # when accessing request.locale we will see an ILocale
        self.assertTrue(ILocale.providedBy(req.locale))

        # and request._locale has been set
        self.assertIs(req._locale, req.locale)

        # It won't be available through dictionary lookup, though
        self.assertIsNone(req.get('locale'))
1045

1046
    def test_locale_in_qs(self):
        provideAdapter(
            BrowserLanguages, [IHTTPRequest], IUserPreferredLanguages)

        # request.locale will actually yield a 'locale' form variable
        # if it exists
        req = self._makeOne(
            environ={'HTTP_ACCEPT_LANGUAGE': 'en',
                     'QUERY_STRING': 'locale=1'})
        req.processInputs()

        self.assertEqual(req.locale, '1')
        self.assertEqual(req.get('locale'), '1')
        self.assertEqual(req['locale'], '1')
1059

1060
    def test_locale_property_override_via_form_other(self):
        provideAdapter(
            BrowserLanguages, [IHTTPRequest], IUserPreferredLanguages)

        # we can still override request.locale with a form variable
        req = self._makeOne(environ={'HTTP_ACCEPT_LANGUAGE': 'en'})
        req.processInputs()

        # Without any override the property yields an ILocale.
        self.assertTrue(ILocale.providedBy(req.locale))

        req.form['locale'] = '1'
        self.assertEqual(req.locale, '1')

        req['locale'] = '2'
        self.assertEqual(req.locale, '2')
1076

1077
    def test_locale_semantics(self):
        provideAdapter(
            BrowserLanguages, [IHTTPRequest], IUserPreferredLanguages)

        # we should also test the correct semantics of the locale
        for httplang in ('it', 'it-ch', 'it-CH', 'IT', 'IT-CH', 'IT-ch'):
            req = self._makeOne(environ={'HTTP_ACCEPT_LANGUAGE': httplang})
            locale = req.locale
            self.assertTrue(ILocale.providedBy(locale))

            # Expected id: lower-cased language, upper-cased territory and
            # variant; parts missing from the header are ``None``.
            pieces = httplang.split('-')
            lang = pieces[0].lower()
            territory = pieces[1].upper() if len(pieces) > 1 else None
            variant = pieces[2].upper() if len(pieces) > 2 else None

            self.assertEqual(locale.id.language, lang)
            self.assertEqual(locale.id.territory, territory)
            self.assertEqual(locale.id.variant, variant)
1099

1100
    def test_locale_fallback(self):
        provideAdapter(
            BrowserLanguages, [IHTTPRequest], IUserPreferredLanguages)

        # Now test for non-existent locale fallback
        req = self._makeOne(environ={'HTTP_ACCEPT_LANGUAGE': 'xx'})
        fallback = req.locale

        self.assertTrue(ILocale.providedBy(fallback))
        # None of the three id components is set on the fallback.
        self.assertIsNone(fallback.id.language)
        self.assertIsNone(fallback.id.territory)
        self.assertIsNone(fallback.id.variant)
1114

1115
    def test_method_GET(self):
        # ``method`` reflects REQUEST_METHOD from the environment.
        req = self._makeOne(environ={'REQUEST_METHOD': 'GET'})
        self.assertEqual(req.method, 'GET')
1119

1120
    def test_method_POST(self):
        # ``method`` reflects REQUEST_METHOD from the environment.
        req = self._makeOne(environ={'REQUEST_METHOD': 'POST'})
        self.assertEqual(req.method, 'POST')
1124

1125
    def test_getClientAddr_wo_trusted_proxy(self):
        # Without touching the trusted proxies, REMOTE_ADDR is reported
        # although an X-Forwarded-For header is present.
        req = self._makeOne(environ={
            'REMOTE_ADDR': '127.0.0.1',
            'HTTP_X_FORWARDED_FOR': '10.1.20.30, 192.168.1.100',
        })
        self.assertEqual(req.getClientAddr(), '127.0.0.1')
1130

1131
    def test_getClientAddr_one_trusted_proxy(self):
        from ZPublisher.HTTPRequest import trusted_proxies
        proxied_env = {
            'REMOTE_ADDR': '127.0.0.1',
            'HTTP_X_FORWARDED_FOR': '10.1.20.30, 192.168.1.100',
        }

        # The proxy list is modified in place and restored afterwards.
        saved = list(trusted_proxies)
        try:
            trusted_proxies.append('127.0.0.1')
            req = self._makeOne(environ=proxied_env)
            self.assertEqual(req.getClientAddr(), '192.168.1.100')
        finally:
            trusted_proxies[:] = saved
1143

1144
    def test_getClientAddr_trusted_proxy_last(self):
        from ZPublisher.HTTPRequest import trusted_proxies
        proxied_env = {
            'REMOTE_ADDR': '192.168.1.100',
            'HTTP_X_FORWARDED_FOR': '10.1.20.30, 192.168.1.100',
        }

        # The proxy list is modified in place and restored afterwards.
        saved = list(trusted_proxies)
        try:
            trusted_proxies.append('192.168.1.100')
            req = self._makeOne(environ=proxied_env)
            self.assertEqual(req.getClientAddr(), '10.1.20.30')
        finally:
            trusted_proxies[:] = saved
1156

1157
    def test_getClientAddr_trusted_proxy_no_REMOTE_ADDR(self):
        from ZPublisher.HTTPRequest import trusted_proxies
        proxied_env = {'HTTP_X_FORWARDED_FOR': '10.1.20.30, 192.168.1.100'}

        # The proxy list is modified in place and restored afterwards.
        saved = list(trusted_proxies)
        try:
            trusted_proxies.append('192.168.1.100')
            req = self._makeOne(environ=proxied_env)
            # No REMOTE_ADDR at all: an empty address is reported.
            self.assertEqual(req.getClientAddr(), '')
        finally:
            trusted_proxies[:] = saved
1168

1169
    def test_getHeader_exact(self):
        # Lookup with the lower-case, dash-separated header name.
        req = self._makeOne(environ=self._makePostEnviron())
        self.assertEqual(
            req.getHeader('content-type'),
            'multipart/form-data; boundary=12345')
1174

1175
    def test_getHeader_case_insensitive(self):
        # Mixed-case header names resolve to the same value.
        req = self._makeOne(environ=self._makePostEnviron())
        self.assertEqual(
            req.getHeader('Content-Type'),
            'multipart/form-data; boundary=12345')
1180

1181
    def test_getHeader_underscore_is_dash(self):
        # An underscore in the name is treated like a dash.
        req = self._makeOne(environ=self._makePostEnviron())
        self.assertEqual(
            req.getHeader('content_type'),
            'multipart/form-data; boundary=12345')
1186

1187
    def test_getHeader_literal_turns_off_case_normalization(self):
        # With ``literal=True`` the mixed-case name is not found.
        req = self._makeOne(environ=self._makePostEnviron())
        found = req.getHeader('Content-Type', literal=True)
        self.assertEqual(found, None)
1191

1192
    def test_getHeader_nonesuch(self):
        # An unknown header gives ``None`` when no default is passed.
        req = self._makeOne(environ=self._makePostEnviron())
        found = req.getHeader('none-such')
        self.assertEqual(found, None)
1196

1197
    def test_getHeader_nonesuch_with_default(self):
        # An unknown header gives back the supplied default.
        req = self._makeOne(environ=self._makePostEnviron())
        found = req.getHeader('Not-existant', default='Whatever')
        self.assertEqual(found, 'Whatever')
1202

1203
    def test_clone_updates_method_to_GET(self):
        # The clone of a POST request reports GET as its method.
        original = self._makeOne(environ={'REQUEST_METHOD': 'POST'})
        original['PARENTS'] = [object()]
        self.assertEqual(original.clone().method, 'GET')
1208

1209
    def test_clone_keeps_preserves__auth(self):
        # ``_auth`` set on the original is carried over to the clone.
        original = self._makeOne()
        original['PARENTS'] = [object()]
        original._auth = 'foobar'
        self.assertEqual(original.clone()._auth, 'foobar')
1215

1216
    def test_clone_doesnt_re_clean_environ(self):
        # A key added to the environment after construction must still be
        # present in the clone's environment.
        original = self._makeOne()
        original.environ['HTTP_CGI_AUTHORIZATION'] = 'lalalala'
        original['PARENTS'] = [object()]
        copied = original.clone()
        self.assertEqual(copied.environ['HTTP_CGI_AUTHORIZATION'], 'lalalala')
1222

1223
    def test_clone_keeps_only_last_PARENT(self):
        # Of two parents only the last one survives cloning.
        parents = [object(), object()]
        original = self._makeOne()
        original['PARENTS'] = parents
        copied = original.clone()
        self.assertEqual(copied['PARENTS'], parents[1:])
1229

1230
    def test_clone_preserves_response_class(self):
        # The clone's response is an instance of the original response's
        # class.
        class DummyResponse:
            pass

        original = self._makeOne(
            None, self._makePostEnviron(), DummyResponse())
        original['PARENTS'] = [object()]
        copied = original.clone()
        self.assertIsInstance(copied.response, DummyResponse)
1238

1239
    def test_clone_preserves_request_subclass(self):
        # Cloning an instance of a request subclass gives an instance of
        # that subclass.
        class SubRequest(self._getTargetClass()):
            pass

        original = SubRequest(None, self._makePostEnviron(), None)
        original['PARENTS'] = [object()]
        copied = original.clone()
        self.assertIsInstance(copied, SubRequest)
1247

1248
    def test_clone_preserves_direct_interfaces(self):
        # An interface directly provided by the original is also provided
        # by the clone.
        from zope.interface import Interface
        from zope.interface import directlyProvides

        class IFoo(Interface):
            pass

        original = self._makeOne()
        original['PARENTS'] = [object()]
        directlyProvides(original, IFoo)
        copied = original.clone()
        self.assertTrue(IFoo.providedBy(copied))
1259

1260
    def test_resolve_url_doesnt_send_endrequestevent(self):
        # The following imports are necessary:
        #  They happen implicitly in `request.resolve_url`
        #  They create `zope.schema` events
        # Doing them here avoids those unrelated events
        import OFS.PropertyManager  # noqa: F401
        import OFS.SimpleItem  # noqa: F401
        #
        import zope.event
        events = []
        zope.event.subscribers.append(events.append)
        # Everything that runs while the subscriber is registered sits
        # inside the ``try`` so the subscriber is removed again even when
        # building the request fails.
        try:
            request = self._makeOne()
            request['PARENTS'] = [object()]
            request.resolve_url(request.script + '/')
        finally:
            zope.event.subscribers.remove(events.append)
        self.assertFalse(
            len(events),
            "HTTPRequest.resolve_url should not emit events")
1280

1281
    def test_resolve_url_errorhandling(self):
        # Check that resolve_url really raises the same error
        # it received from ZPublisher.BaseRequest.traverse
        req = self._makeOne()
        req['PARENTS'] = [object()]
        missing_url = req.script + '/does_not_exist'
        self.assertRaises(NotFound, req.resolve_url, missing_url)
1288

1289
    def test_parses_json_cookies(self):
        # https://bugs.launchpad.net/zope2/+bug/563229
        # reports cookies in the wild with embedded double quotes (e.g.,
        # JSON-encoded data structures).
        cookie_header = (
            'json={"intkey":123,"stringkey":"blah"}; '
            'anothercookie=boring; baz'
        )
        req = self._makeOne(environ={
            'SERVER_NAME': 'testingharnas',
            'SERVER_PORT': '80',
            'HTTP_COOKIE': cookie_header,
        })
        self.assertEqual(
            req.cookies['json'], '{"intkey":123,"stringkey":"blah"}')
        self.assertEqual(req.cookies['anothercookie'], 'boring')
1303

1304
    def test_getVirtualRoot(self):
        # https://bugs.launchpad.net/zope2/+bug/193122
        req = self._makeOne()

        # Each case: the ``_script`` list and the virtual root it gives.
        for script, expected in (
                ([], ''),
                (['foo', 'bar'], '/foo/bar')):
            req._script = script
            self.assertEqual(req.getVirtualRoot(), expected)
1313

1314
    def test__str__returns_native_string(self):
        # ``str()`` of a request gives a ``str`` instance.
        rendered = str(self._makeOne())
        self.assertIsInstance(rendered, str)
1317

1318
    def test___str____password_field(self):
        # It obscures password fields.
        request = self._makeOne()
        request.form.update({'passwd': 'secret'})

        self.assertNotIn('secret', str(request))
        self.assertIn('password obscured', str(request))
1325

1326
    def test_text__password_field(self):
        # It obscures password fields.
        request = self._makeOne()
        request.form.update({'passwd': 'secret'})

        self.assertNotIn('secret', request.text())
        self.assertIn('password obscured', request.text())
1333

1334
    # XML-RPC request body shared by the XML-RPC tests below: a call to
    # ``examples.getStateName`` with the single ``i4`` parameter 41.
    _xmlrpc_call = b"""<?xml version="1.0"?>
    <methodCall>
      <methodName>examples.getStateName</methodName>
      <params>
         <param>
            <value><i4>41</i4></value>
            </param>
         </params>
      </methodCall>
    """
1344

1345
    def test_processInputs_xmlrpc_with_args(self):
        # The call's parameter ends up in ``args`` and the dotted method
        # name becomes a slash-separated PATH_INFO.
        xml_env = {"REQUEST_METHOD": "POST", "CONTENT_TYPE": "text/xml"}
        request = self._makeOne(
            stdin=BytesIO(self._xmlrpc_call), environ=xml_env)
        request.processInputs()
        self.assertTrue(is_xmlrpc_response(request.response))
        self.assertEqual(request.args, (41,))
        self.assertEqual(
            request.other["PATH_INFO"], "/examples/getStateName")
1353

1354
    def test_processInputs_xmlrpc_controlled_allowed(self):
        # With a checker that answers True the response is an XML-RPC
        # response.
        xml_env = {"REQUEST_METHOD": "POST", "CONTENT_TYPE": "text/xml"}
        request = self._makeOne(
            stdin=BytesIO(self._xmlrpc_call), environ=xml_env)
        with self._xmlrpc_control(lambda request: True):
            request.processInputs()
        self.assertTrue(is_xmlrpc_response(request.response))
1361

1362
    def test_processInputs_xmlrpc_controlled_disallowed(self):
        # With a checker that answers False the response is not an
        # XML-RPC response.
        xml_env = {"REQUEST_METHOD": "POST", "CONTENT_TYPE": "text/xml"}
        request = self._makeOne(environ=xml_env)
        with self._xmlrpc_control(lambda request: False):
            request.processInputs()
        self.assertFalse(is_xmlrpc_response(request.response))
1368

1369
    @contextmanager
    def _xmlrpc_control(self, allow):
        # Register ``allow`` as the ``IXmlrpcChecker`` utility on the
        # global site manager for the duration of the ``with`` block.
        gsm = getGlobalSiteManager()
        gsm.registerUtility(allow, IXmlrpcChecker)
        try:
            yield
        finally:
            # Unregister even when the block raises; otherwise the checker
            # would stay registered after this context has ended.
            gsm.unregisterUtility(allow, IXmlrpcChecker)
1375

1376
    def test_url_scheme(self):
        # Each case pairs an environment with the SERVER_URL the request
        # must report for it.
        cases = [
            # The default is http
            ({'SERVER_NAME': 'myhost', 'SERVER_PORT': 80},
             'http://myhost'),
            # If we bang a SERVER_URL into the environment it is retained
            ({'SERVER_URL': 'https://anotherserver:8443'},
             'https://anotherserver:8443'),
        ]

        # Now go through the various environment values that signal
        # a request uses the https URL scheme
        for val in ('on', 'ON', '1'):
            cases.append((
                {'SERVER_NAME': 'myhost', 'SERVER_PORT': 443, 'HTTPS': val},
                'https://myhost'))
        for key, val in (
                ('SERVER_PORT_SECURE', 1),
                ('REQUEST_SCHEME', 'HTTPS'),
                ('wsgi.url_scheme', 'https')):
            cases.append((
                {'SERVER_NAME': 'myhost', 'SERVER_PORT': 443, key: val},
                'https://myhost'))

        for env, expected in cases:
            req = self._makeOne(environ=env)
            self.assertEqual(req['SERVER_URL'], expected)
1408

1409
    def test_form_urlencoded(self):
        payload = b"a=1"
        env = self._makePostEnviron(payload, False)

        # With the regular limit the body is parsed into the form.
        request = self._makeOne(stdin=BytesIO(payload), environ=env)
        request.processInputs()
        self.assertEqual(request.form["a"], "1")

        # With the memory limit patched down to 1, processing the same
        # body raises ``BadRequest``.
        request = self._makeOne(stdin=BytesIO(payload), environ=env)
        with patch("ZPublisher.HTTPRequest.FORM_MEMORY_LIMIT", 1), \
                self.assertRaises(BadRequest):
            request.processInputs()
1419

1420
    def test_bytes_converter(self):
        # A ``:bytes:latin-1`` field gives back the latin-1 encoded bytes.
        encoded = "äöü".encode("latin-1")
        payload = b"a:bytes:latin-1=" + encoded
        request = self._makeOne(
            stdin=BytesIO(payload),
            environ=self._makePostEnviron(payload, False))
        request.processInputs()
        self.assertEqual(request.form["a"], encoded)
1427

1428
    def test_get_with_body_and_query_string_ignores_body(self):
        # For GET only the query string feeds the form; the body
        # contributes nothing.
        get_env = {
            "SERVER_NAME": "localhost",
            "SERVER_PORT": "8080",
            "REQUEST_METHOD": "GET",
            "QUERY_STRING": "bar"
        }
        request = self._getTargetClass()(BytesIO(b"foo"), get_env, None)
        request.processInputs()
        self.assertDictEqual(request.form, {"bar": ""})
1442

1443
    def test_put_with_body_and_query_string(self):
        # For PUT the body is available as ``BODY`` and the query string
        # is parsed into the form.
        put_env = {
            "SERVER_NAME": "localhost",
            "SERVER_PORT": "8080",
            "REQUEST_METHOD": "PUT",
            "QUERY_STRING": "bar=bar"
        }
        request = self._getTargetClass()(BytesIO(b"foo"), put_env, None)
        request.processInputs()
        self.assertEqual(request.BODY, b"foo")
        self.assertEqual(request.form["bar"], "bar")
1458

1459
    def test_put_with_body_limit(self):
        # With the memory limit lowered to 1, reading ``BODY`` of a PUT
        # request with a three-byte body raises ``BadRequest``.
        req_factory = self._getTargetClass()
        req = req_factory(
            BytesIO(b"foo"),
            {
                "SERVER_NAME": "localhost",
                "SERVER_PORT": "8080",
                "REQUEST_METHOD": "PUT",
            },
            None,
        )
        # ``patch`` restores the module attribute on exit, the same way
        # ``test_form_urlencoded`` lowers the limit.
        with patch("ZPublisher.HTTPRequest.FORM_MEMORY_LIMIT", 1):
            req.processInputs()
            with self.assertRaises(BadRequest):
                req.BODY
1479

1480
    def test_issue_1095(self):
        payload = TEST_ISSUE_1095_DATA
        request = self._makeOne(
            BytesIO(payload), self._makePostEnviron(payload))
        request.processInputs()

        # Two records: the first holds a file upload, the second a string.
        records = request["r"]
        self.assertEqual(len(records), 2)
        self.assertIsInstance(records[0].x, FileUpload)
        self.assertIsInstance(records[1].x, str)

        # The tainted form mirrors this, with the string being tainted.
        tainted_records = request.taintedform["r"]
        self.assertIsInstance(tainted_records[0].x, FileUpload)
        self.assertIsInstance(tainted_records[1].x, TaintedString)
1492

1493
    def test_field_charset(self):
        # ``x`` comes from the request body, ``y`` from the query string;
        # both must decode to the same text.
        payload = TEST_FIELD_CHARSET_DATA
        env = self._makePostEnviron(payload)
        env.update({"QUERY_STRING": "y=" + quote_plus("äöü")})
        request = self._makeOne(BytesIO(payload), env)
        request.processInputs()
        self.assertEqual(request["x"], "äöü")
        self.assertEqual(request["y"], "äöü")
1501

1502
    def test_form_charset(self):
        # The body is latin-1 encoded and declared as such in the content
        # type; the query string uses the ``quote_plus`` default encoding.
        quoted_x = quote_plus("äöü", encoding="latin-1")
        payload = ("x=" + quoted_x).encode("ASCII")
        env = self._makePostEnviron(payload)
        env.update({
            "CONTENT_TYPE":
                "application/x-www-form-urlencoded; charset=latin-1",
            "QUERY_STRING": "y=" + quote_plus("äöü"),
        })
        request = self._makeOne(BytesIO(payload), env)
        request.processInputs()
        self.assertEqual(request["x"], "äöü")
        self.assertEqual(request["y"], "äöü")
1512

1513

1514
class TestHTTPRequestZope3Views(TestRequestViewsBase):
    """Traversal of Zope 3 style views through an ``HTTPRequest``."""

    def _makeOne(self, root):
        """Build a request for *root* marked with ``IDefaultBrowserLayer``."""
        from zope.interface import directlyProvides
        from zope.publisher.browser import IDefaultBrowserLayer
        request = HTTPRequestFactoryMixin()._makeOne()
        request['PARENTS'] = [root]
        # The request needs to implement the proper interface
        directlyProvides(request, IDefaultBrowserLayer)
        return request

    def test_no_traversal_of_view_request_attribute(self):
        # make sure views don't accidentally publish the 'request' attribute
        root, _ = self._makeRootAndFolder()

        # make sure the view itself is traversable:
        view = self._makeOne(root).traverse('folder/@@meth')
        from ZPublisher.HTTPRequest import HTTPRequest
        self.assertEqual(view.request.__class__, HTTPRequest)

        # but not the request (a fresh request is used for this traversal):
        request = self._makeOne(root)
        with self.assertRaises(NotFound):
            request.traverse('folder/@@meth/request')
1539

1540

1541
class TestSearchType(unittest.TestCase):
    """Test `ZPublisher.HTTPRequest.search_type`

    see "https://github.com/zopefoundation/Zope/pull/512"
    """

    def check(self, val, expect):
        """Assert what ``search_type`` reports for *val*.

        *expect* is either ``None`` (no match is expected) or the exact
        text the returned match object must cover.
        """
        mo = search_type(val)
        if expect is None:
            self.assertIsNone(mo)
        else:
            self.assertIsNotNone(mo)
            self.assertEqual(mo.group(), expect)

    def test_image_control(self):
        # ``.x`` / ``.y`` suffixes are recognized, longer ones are not
        self.check("abc.x", ".x")
        self.check("abc.y", ".y")
        self.check("abc.xy", None)

    def test_type(self):
        self.check("abc:int", ":int")

    def test_leftmost(self):
        # with several suffixes only the final one is reported
        self.check("abc:int:record", ":record")

    def test_special(self):
        # hyphen, underscore and digits are accepted inside a suffix
        self.check("abc:a-_0b", ":a-_0b")
1568

1569

1570
class _Unseekable:
1✔
1571
    """Auxiliary class emulating an unseekable file like object."""
1572

1573
    def __init__(self, file):
1✔
1574
        for m in ("read", "readline", "close", "__del__"):
1✔
1575
            setattr(self, m, getattr(file, m))
1✔
1576

1577

1578
# Environment template for POST requests.  ``CONTENT_LENGTH`` is only a
# ``None`` placeholder here -- presumably filled in per request by the test
# helpers (verify against ``_makePostEnviron``).
TEST_POST_ENVIRON = {
    'CONTENT_LENGTH': None,
    'REQUEST_METHOD': 'POST',
    'SERVER_NAME': 'localhost',
    'SERVER_PORT': '80',
}
1584

1585
# Multipart body (boundary ``12345``) with a single small file upload named
# ``smallfile`` whose content is ``test``.
TEST_FILE_DATA = b'''
--12345
Content-Disposition: form-data; name="smallfile"; filename="smallfile"
Content-Type: application/octet-stream

test

--12345--
'''
1594

1595
# Multipart body (boundary ``12345``) for a file field that was submitted
# without a file: the filename is empty and the part has no content.
TEST_FILE_DATA_UNSPECIFIED = b'''
--12345
Content-Disposition: form-data; name="smallfile"; filename=""
Content-Type: application/octet-stream

--12345--
'''
1602

1603
# Multipart body (boundary ``12345``) with one file upload named
# ``largefile``; its content is ``test `` followed by ``test`` repeated
# 1000 times.
TEST_LARGEFILE_DATA = b'''
--12345
Content-Disposition: form-data; name="largefile"; filename="largefile"
Content-Type: application/octet-stream

test %s

--12345--
''' % (b'test' * 1000)
1612

1613
# Multipart body (boundary ``12345``) with two parts that share the name
# ``r.x:records``: the first is a file upload (``filename="fn"``), the
# second a plain ``text/html`` field.
TEST_ISSUE_1095_DATA = b'''
--12345
Content-Disposition: form-data; name="r.x:records"; filename="fn"
Content-Type: application/octet-stream

test

--12345
Content-Disposition: form-data; name="r.x:records"
Content-Type: text/html

<body>abc</body>

--12345--
'''
1628

1629
# Multipart body (boundary ``12345``) with a single text field ``x`` whose
# part declares ``charset=latin-1``; the value is ``äöü`` encoded as latin-1.
TEST_FIELD_CHARSET_DATA = b'''
--12345
Content-Disposition: form-data; name="x"
Content-Type: text/plain; charset=latin-1

%s
--12345--
''' % 'äöü'.encode("latin-1")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc