• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Zope / 3956162881

pending completion
3956162881

push

github

Michael Howitz
Update to deprecation warning free releases.

4401 of 7036 branches covered (62.55%)

Branch coverage included in aggregate %.

27161 of 31488 relevant lines covered (86.26%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.33
/src/OFS/tests/testRanges.py
1
##############################################################################
2
#
3
# Copyright (c) 2002-2009 Zope Foundation and Contributors.
4
# All Rights Reserved.
5
#
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE
12
#
13
##############################################################################
14
import unittest
1✔
15

16

17
def makeConnection():
    """Open and return a connection to a new DB backed by a DemoStorage."""
    from ZODB import DB
    from ZODB.DemoStorage import DemoStorage

    database = DB(DemoStorage())
    return database.open()
23

24

25
def createBigFile():
    """Return an in-memory file of random ASCII letters spanning many blocks.

    The payload is five 1<<16 byte blocks plus 12345 extra bytes, which is
    meant to force the use of chained Pdata objects once it is stored.  The
    length is deliberately not a multiple of 1<<16 (Coll #671).

    Returns:
        io.BytesIO: the buffer holding the data; the stream position is
        left at the end of the data, as it is after any write.
    """
    import io
    import random
    import string
    size = (1 << 16) * 5 + 12345

    # Draw the whole payload in one call and write it once, instead of
    # encoding and writing a single byte per loop iteration.
    alphabet = string.ascii_letters.encode('ascii')
    file = io.BytesIO()
    file.write(bytes(random.choices(alphabet, k=size)))

    return file
40

41

42
# Id of the scratch folder that setUp() adds to the application and that
# tearDown() deletes again.
TESTFOLDER_NAME = 'RangesTestSuite_testFolder'
1✔
43
# Large multi-block payload, generated once at import time and shared by
# every test that calls uploadBigFile().
BIGFILE = createBigFile()
1✔
44

45

46
class TestRequestRange(unittest.TestCase):
    """Exercise Range and If-Range request handling of an OFS File object.

    Every test works on a File with the id ``file`` inside a scratch folder
    of a fresh Application.  ``self.data`` always holds the bytes the File
    is expected to contain, so the helpers can compare slices against it.
    """

    # NOTE: the test methods below are described with comments instead of
    # docstrings, because unittest uses a test method's docstring as its
    # description in the runner output.

    # Test case setup and teardown
    def setUp(self):
        """Create the application, the test folder and a small File."""
        import io
        import string

        import transaction
        from OFS.Application import Application
        from OFS.Folder import manage_addFolder
        from OFS.Image import manage_addFile
        from Testing.makerequest import makerequest
        self.responseOut = io.BytesIO()
        self.connection = makeConnection()
        try:
            r = self.connection.root()
            a = Application()
            r['Application'] = a
            self.root = a
            self.app = makerequest(self.root, stdout=self.responseOut)
            try:
                # Remove a stale test folder; a missing one is not an error.
                self.app._delObject(TESTFOLDER_NAME)
            except AttributeError:
                pass
            manage_addFolder(self.app, TESTFOLDER_NAME)
            folder = getattr(self.app, TESTFOLDER_NAME)

            data = string.ascii_letters.encode('ascii')
            manage_addFile(
                folder, 'file', file=data, content_type='text/plain')

            self.file = folder.file
            self.data = data

            # Hack, we need a _p_mtime for the file, so we make sure that it
            # has one by committing the transaction. tearDown() removes the
            # test folder again afterwards.
            transaction.commit()
        except Exception:
            # Do not leak the connection when the fixture cannot be built.
            self.connection.close()
            raise

    def tearDown(self):
        """Remove the test folder, abort pending changes, close the DB."""
        import transaction
        try:
            self.app._delObject(TESTFOLDER_NAME)
        except AttributeError:
            pass
        transaction.abort()
        self.app._p_jar.sync()
        self.connection.close()
        del self.app

    # Utility methods
    def uploadBigFile(self):
        """Replace the File's content with BIGFILE and track it in data."""
        self.file.manage_upload(BIGFILE)
        self.data = BIGFILE.getvalue()

    def doGET(self, request, response):
        """Render the File through index_html and return the body bytes.

        Args:
            request: the request object passed on to ``index_html``.
            response: the response object passed on to ``index_html``.

        Returns:
            The bytes written to ``self.responseOut`` (with everything up
            to the first blank line removed) followed by the value returned
            by ``index_html``.
        """
        rv = self.file.index_html(request, response)

        # Large files are written to responseOut directly, small ones are
        # returned from the index_html method.
        body = self.responseOut.getvalue()

        # Chop off any printed headers (only when response.write was used)
        if body:
            body = body.split(b'\r\n\r\n', 1)[1]

        return body + rv

    def createLastModifiedDate(self, offset=0):
        """Return the File's ``_p_mtime`` plus *offset* as an RFC 1123 date.

        Args:
            offset: number added to ``_p_mtime`` before formatting
                (presumably seconds -- confirm against zope.datetime).
        """
        from zope.datetime import rfc1123_date
        return rfc1123_date(self.file._p_mtime + offset)

    def expectUnsatisfiable(self, range):
        """Request ``bytes=<range>`` and assert an empty 416 response.

        Args:
            range: the byte-range set, without the ``bytes=`` prefix.
        """
        req = self.app.REQUEST
        rsp = req.RESPONSE

        # Add the Range header
        req.environ['HTTP_RANGE'] = 'bytes=%s' % range

        body = self.doGET(req, rsp)

        self.assertEqual(rsp.getStatus(), 416)

        expect_content_range = 'bytes */%d' % len(self.data)
        content_range = rsp.getHeader('content-range')
        self.assertIsNotNone(content_range)
        self.assertEqual(content_range, expect_content_range)
        self.assertEqual(body, b'')

    def expectOK(self, rangeHeader, if_range=None):
        """Send *rangeHeader* verbatim and assert a plain 200 response.

        Args:
            rangeHeader: complete value for the Range header.
            if_range: optional value for the If-Range header.
        """
        req = self.app.REQUEST
        rsp = req.RESPONSE

        # Add headers
        req.environ['HTTP_RANGE'] = rangeHeader
        if if_range is not None:
            req.environ['HTTP_IF_RANGE'] = if_range

        self.doGET(req, rsp)
        self.assertEqual(rsp.getStatus(), 200)

    def expectSingleRange(self, range, start, end, if_range=None):
        """Request ``bytes=<range>`` and assert a 206 with one range.

        Args:
            range: the byte-range set, without the ``bytes=`` prefix.
            start: index of the first expected byte in ``self.data``.
            end: index one past the last expected byte (slice semantics).
            if_range: optional value for the If-Range header.
        """
        req = self.app.REQUEST
        rsp = req.RESPONSE

        # Add headers
        req.environ['HTTP_RANGE'] = 'bytes=%s' % range
        if if_range is not None:
            req.environ['HTTP_IF_RANGE'] = if_range

        body = self.doGET(req, rsp)
        self.assertEqual(rsp.getStatus(), 206)

        # Content-Range carries an inclusive last-byte position.
        expect_content_range = 'bytes %d-%d/%d' % (
            start, end - 1, len(self.data))
        content_range = rsp.getHeader('content-range')
        self.assertIsNotNone(content_range)
        self.assertEqual(content_range, expect_content_range)
        self.assertEqual(rsp.getHeader('content-length'), str(len(body)))
        self.assertEqual(body, self.data[start:end])

    def expectMultipleRanges(self, range, sets, draft=0):
        """Request ``bytes=<range>`` and assert a multipart 206 response.

        Args:
            range: the byte-range set, without the ``bytes=`` prefix.
            sets: expected ``(start, end)`` pairs in response order, with
                *end* one past the last byte (slice semantics).
            draft: when true, the Request-Range header is sent as well
                and a ``multipart/x-byteranges`` response is expected.
        """
        import email
        import io
        import re
        rangeParse = re.compile(r'bytes\s*(\d+)-(\d+)/(\d+)')
        req = self.app.REQUEST
        rsp = req.RESPONSE

        # Add headers
        req.environ['HTTP_RANGE'] = 'bytes=%s' % range

        if draft:
            req.environ['HTTP_REQUEST_RANGE'] = 'bytes=%s' % range

        body = self.doGET(req, rsp)

        self.assertEqual(rsp.getStatus(), 206)
        self.assertFalse(rsp.getHeader('content-range'))

        ct = rsp.getHeader('content-type').split(';')[0]
        draftprefix = 'x-' if draft else ''
        self.assertEqual(ct, 'multipart/%sbyteranges' % draftprefix)
        if rsp.getHeader('content-length'):
            self.assertEqual(rsp.getHeader('content-length'), str(len(body)))

        # Decode the multipart message and force a latin-1 encoding,
        # revert that later after the email was parsed
        bodyfile = io.StringIO(
            'Content-Type: '
            + rsp.getHeader('content-type')
            + '\n\n' + body.decode('latin-1')
        )

        # This needs text, hence the forced latin-1 decoding.
        msg = email.message_from_file(bodyfile)

        # Check the different parts
        returnedRanges = []
        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue
            part_range = part.get('content-range')
            # Fail with an assertion, not a TypeError/AttributeError, when
            # a part has no usable Content-Range header.
            self.assertIsNotNone(part_range)
            match = rangeParse.search(part_range)
            self.assertIsNotNone(match)
            start, end, size = (int(group) for group in match.groups())
            # Convert the inclusive last-byte position to a slice end.
            end = end + 1

            self.assertEqual(size, len(self.data))
            # revert earlier fake latin-1 encoding
            payload = part.get_payload().encode('latin-1')

            self.assertEqual(len(payload), end - start)
            self.assertEqual(payload, self.data[start:end])

            returnedRanges.append((start, end))

        # Compare the ranges used with the expected range sets.
        self.assertEqual(returnedRanges, sets)

    # Unsatisfiable requests
    def testNegativeZero(self):
        self.expectUnsatisfiable('-0')

    def testStartBeyondLength(self):
        self.expectUnsatisfiable('1000-')

    def testMultipleUnsatisfiable(self):
        self.expectUnsatisfiable('1000-1001,2000-,-0')

    # Malformed Range header
    def testGarbage(self):
        self.expectOK('kjhdkjhd = ew;jkj h eewh ew')

    def testIllegalSpec(self):
        self.expectOK('notbytes=0-1000')

    # Single ranges
    def testSimpleRange(self):
        self.expectSingleRange('3-7', 3, 8)

    def testOpenEndedRange(self):
        self.expectSingleRange('3-', 3, len(self.data))

    def testSuffixRange(self):
        length = len(self.data)
        self.expectSingleRange('-3', length - 3, length)

    def testWithNegativeZero(self):
        # A satisfiable and an unsatisfiable range
        self.expectSingleRange('-0,3-23', 3, 24)

    def testEndOverflow(self):
        # The requested end lies beyond the data; the response is expected
        # to stop at the last byte.
        length = len(self.data)
        start, end = length - 10, length + 10
        spec = '%d-%d' % (start, end)
        self.expectSingleRange(spec, start, len(self.data))

    def testBigFile(self):
        # Files of size 1<<16 are stored in linked Pdata objects. They are
        # treated separately in the range code.
        self.uploadBigFile()
        join = 3 * (1 << 16)  # A join between two linked objects
        start = join - 1000
        end = join + 1000
        spec = '%d-%d' % (start, end - 1)
        self.expectSingleRange(spec, start, end)

    def testBigFileEndOverflow(self):
        self.uploadBigFile()
        length = len(self.data)
        start, end = length - 100, length + 100
        spec = '%d-%d' % (start, end)
        self.expectSingleRange(spec, start, len(self.data))

    # Multiple ranges
    def testAdjacentRanges(self):
        self.expectMultipleRanges('21-25,10-20', [(21, 26), (10, 21)])

    def testMultipleRanges(self):
        self.expectMultipleRanges('3-7,10-15', [(3, 8), (10, 16)])

    def testMultipleRangesDraft(self):
        self.expectMultipleRanges('3-7,10-15', [(3, 8), (10, 16)], draft=1)

    def testMultipleRangesBigFile(self):
        self.uploadBigFile()
        self.expectMultipleRanges(
            '3-700,10-15,-10000',
            [(3, 701), (10, 16), (len(self.data) - 10000, len(self.data))])

    def testMultipleRangesBigFileOutOfOrder(self):
        self.uploadBigFile()
        self.expectMultipleRanges(
            '10-15,-10000,70000-80000',
            [(10, 16), (len(self.data) - 10000, len(self.data)),
             (70000, 80001)])

    def testMultipleRangesBigFileEndOverflow(self):
        self.uploadBigFile()
        length = len(self.data)
        start, end = length - 100, length + 100
        self.expectMultipleRanges(
            f'3-700,{start}-{end}',
            [(3, 701), (len(self.data) - 100, len(self.data))])

    # If-Range headers
    def testIllegalIfRange(self):
        # We assume that an illegal if-range is to be ignored, just like an
        # illegal if-modified since.
        self.expectSingleRange('10-25', 10, 26, if_range='garbage')

    def testEqualIfRangeDate(self):
        self.expectSingleRange(
            '10-25', 10, 26,
            if_range=self.createLastModifiedDate()
        )

    def testIsModifiedIfRangeDate(self):
        # An If-Range date older than the file yields the full response.
        self.expectOK(
            '21-25,10-20',
            if_range=self.createLastModifiedDate(offset=-100)
        )

    def testIsNotModifiedIfRangeDate(self):
        self.expectSingleRange(
            '10-25', 10, 26,
            if_range=self.createLastModifiedDate(offset=100)
        )

    def testEqualIfRangeEtag(self):
        self.expectSingleRange(
            '10-25', 10, 26,
            if_range=self.file.http__etag()
        )

    def testNotEqualIfRangeEtag(self):
        # A non-matching entity tag yields the full response.
        self.expectOK(
            '10-25',
            if_range=self.file.http__etag() + 'bar'
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc