• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Zope / 3956162881

pending completion
3956162881

push

github

Michael Howitz
Update to deprecation warning free releases.

4401 of 7036 branches covered (62.55%)

Branch coverage included in aggregate %.

27161 of 31488 relevant lines covered (86.26%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.56
/src/OFS/tests/testFileAndImage.py
1
import os
1✔
2
import sys
1✔
3
import time
1✔
4
import unittest
1✔
5
from io import BytesIO
1✔
6

7
import OFS.Image
1✔
8
import Testing.testbrowser
1✔
9
import Testing.ZopeTestCase
1✔
10
import transaction
1✔
11
import Zope2
1✔
12
from Acquisition import aq_base
1✔
13
from OFS.Application import Application
1✔
14
from OFS.Cache import ZCM_MANAGERS
1✔
15
from OFS.Image import Pdata
1✔
16
from OFS.SimpleItem import SimpleItem
1✔
17
from Testing.makerequest import makerequest
1✔
18
from zExceptions import Redirect
1✔
19
from zope.component import adapter
1✔
20
from zope.datetime import rfc1123_date
1✔
21
from zope.lifecycleevent.interfaces import IObjectCreatedEvent
1✔
22
from zope.lifecycleevent.interfaces import IObjectModifiedEvent
1✔
23
from ZPublisher.HTTPRequest import HTTPRequest
1✔
24
from ZPublisher.HTTPResponse import HTTPResponse
1✔
25

26

27
here = os.path.dirname(os.path.abspath(__file__))
1✔
28
filedata = os.path.join(here, 'test.gif')
1✔
29

30
Zope2.startup_wsgi()
1✔
31

32

33
def makeConnection():
1✔
34
    import ZODB
1✔
35
    from ZODB.DemoStorage import DemoStorage
1✔
36

37
    s = DemoStorage()
1✔
38
    return ZODB.DB(s).open()
1✔
39

40

41
def aputrequest(file, content_type):
1✔
42
    resp = HTTPResponse(stdout=sys.stdout)
1✔
43
    environ = {}
1✔
44
    environ['SERVER_NAME'] = 'foo'
1✔
45
    environ['SERVER_PORT'] = '80'
1✔
46
    environ['REQUEST_METHOD'] = 'PUT'
1✔
47
    environ['CONTENT_TYPE'] = content_type
1✔
48
    req = HTTPRequest(stdin=file, environ=environ, response=resp)
1✔
49
    return req
1✔
50

51

52
class DummyCache:
1✔
53

54
    def __init__(self):
1✔
55
        self.clear()
1✔
56

57
    def ZCache_set(self, ob, data, view_name='', keywords=None,
1✔
58
                   mtime_func=None):
59
        self.set = (ob, data)
1✔
60

61
    def ZCache_get(self, ob, data, view_name='', keywords=None,
1✔
62
                   mtime_func=None):
63
        self.get = ob
1✔
64
        if self.si:
1!
65
            return self.si
×
66

67
    def ZCache_invalidate(self, ob):
1✔
68
        self.invalidated = ob
1✔
69

70
    def clear(self):
1✔
71
        self.set = None
1✔
72
        self.get = None
1✔
73
        self.invalidated = None
1✔
74
        self.si = None
1✔
75

76
    def setStreamIterator(self, si):
1✔
77
        self.si = si
×
78

79

80
ADummyCache = DummyCache()
1✔
81

82

83
class DummyCacheManager(SimpleItem):
1✔
84
    def ZCacheManager_getCache(self):
1✔
85
        return ADummyCache
1✔
86

87

88
class EventCatcher:
1✔
89

90
    def __init__(self):
1✔
91
        self.created = []
1✔
92
        self.modified = []
1✔
93
        self.setUp()
1✔
94

95
    def setUp(self):
1✔
96
        from zope.component import provideHandler
1✔
97
        provideHandler(self.handleCreated)
1✔
98
        provideHandler(self.handleModified)
1✔
99

100
    def tearDown(self):
1✔
101
        from zope.component import getSiteManager
1✔
102
        getSiteManager().unregisterHandler(self.handleCreated)
1✔
103
        getSiteManager().unregisterHandler(self.handleModified)
1✔
104

105
    def reset(self):
1✔
106
        self.created = []
1✔
107
        self.modified = []
1✔
108

109
    @adapter(IObjectCreatedEvent)
1✔
110
    def handleCreated(self, event):
1✔
111
        if isinstance(event.object, OFS.Image.File):
1!
112
            self.created.append(event)
1✔
113

114
    @adapter(IObjectModifiedEvent)
1✔
115
    def handleModified(self, event):
1✔
116
        if isinstance(event.object, OFS.Image.File):
1✔
117
            self.modified.append(event)
1✔
118

119

120
class FileTests(unittest.TestCase):
1✔
121
    content_type = 'application/octet-stream'
1✔
122
    factory = 'manage_addFile'
1✔
123

124
    def setUp(self):
1✔
125
        with open(filedata, 'rb') as fd:
1✔
126
            self.data = fd.read()
1✔
127
        self.connection = makeConnection()
1✔
128
        self.eventCatcher = EventCatcher()
1✔
129
        try:
1✔
130
            r = self.connection.root()
1✔
131
            a = Application()
1✔
132
            r['Application'] = a
1✔
133
            self.root = a
1✔
134
            responseOut = self.responseOut = BytesIO()
1✔
135
            self.app = makerequest(self.root, stdout=responseOut)
1✔
136
            self.app.dcm = DummyCacheManager()
1✔
137
            factory = getattr(self.app, self.factory)
1✔
138
            factory('file',
1✔
139
                    file=self.data, content_type=self.content_type)
140
            self.app.file.ZCacheable_setManagerId('dcm')
1✔
141
            self.app.file.ZCacheable_setEnabled(enabled=1)
1✔
142
            setattr(self.app, ZCM_MANAGERS, ('dcm',))
1✔
143
            # Hack, we need a _p_mtime for the file, so we make sure that it
144
            # has one.
145
            transaction.commit()
1✔
146
        except Exception:
×
147
            transaction.abort()
×
148
            self.connection.close()
×
149
            raise
×
150
        transaction.begin()
1✔
151
        self.file = getattr(self.app, 'file')
1✔
152

153
        # Since we do the create here, let's test the events here too
154
        self.assertEqual(1, len(self.eventCatcher.created))
1✔
155
        self.assertTrue(
1✔
156
            aq_base(self.eventCatcher.created[0].object) is aq_base(self.file))
157

158
        self.assertEqual(1, len(self.eventCatcher.modified))
1✔
159
        self.assertTrue(
1✔
160
            aq_base(self.eventCatcher.created[0].object) is aq_base(self.file))
161

162
        self.eventCatcher.reset()
1✔
163

164
    def tearDown(self):
1✔
165
        del self.file
1✔
166
        transaction.abort()
1✔
167
        self.connection.close()
1✔
168
        del self.app
1✔
169
        del self.responseOut
1✔
170
        del self.root
1✔
171
        del self.connection
1✔
172
        ADummyCache.clear()
1✔
173
        self.eventCatcher.tearDown()
1✔
174

175
    def testViewImageOrFile(self):
1✔
176
        self.assertRaises(Redirect, self.file.view_image_or_file, 'foo')
1✔
177

178
    def testUpdateData(self):
1✔
179
        self.file.update_data(b'foo')
1✔
180
        self.assertEqual(self.file.size, 3)
1✔
181
        self.assertEqual(self.file.data, b'foo')
1✔
182
        self.assertTrue(ADummyCache.invalidated)
1✔
183
        self.assertTrue(ADummyCache.set)
1✔
184

185
    def testReadData(self):
1✔
186
        s = b'a' * (2 << 16)
1✔
187
        data, size = self.file._read_data(BytesIO(s))
1✔
188
        self.assertIsInstance(data, Pdata)
1✔
189
        self.assertEqual(bytes(data), s)
1✔
190
        self.assertEqual(len(s), len(bytes(data)))
1✔
191
        self.assertEqual(len(s), size)
1✔
192

193
    def testBigPdata(self):
1✔
194
        # Test that a big enough string is split into several Pdata
195
        # From a file
196
        s = b'a' * (1 << 16) * 3
1✔
197
        data, size = self.file._read_data(BytesIO(s))
1✔
198
        self.assertNotEqual(data.next, None)
1✔
199
        # From a string
200
        data, size = self.file._read_data(s)
1✔
201
        self.assertNotEqual(data.next, None)
1✔
202

203
    def testManageEditWithFileData(self):
1✔
204
        self.file.manage_edit('foobar', 'text/plain', filedata=b'ASD')
1✔
205
        self.assertEqual(self.file.title, 'foobar')
1✔
206
        self.assertEqual(self.file.content_type, 'text/plain')
1✔
207
        self.assertTrue(ADummyCache.invalidated)
1✔
208
        self.assertTrue(ADummyCache.set)
1✔
209
        self.assertEqual(1, len(self.eventCatcher.modified))
1✔
210
        self.assertTrue(self.eventCatcher.modified[0].object is self.file)
1✔
211

212
    def testManageEditWithoutFileData(self):
1✔
213
        self.file.manage_edit('foobar', 'text/plain')
1✔
214
        self.assertEqual(self.file.title, 'foobar')
1✔
215
        self.assertEqual(self.file.content_type, 'text/plain')
1✔
216
        self.assertTrue(ADummyCache.invalidated)
1✔
217
        self.assertEqual(1, len(self.eventCatcher.modified))
1✔
218
        self.assertTrue(self.eventCatcher.modified[0].object is self.file)
1✔
219

220
    def testManageUpload(self):
1✔
221
        f = BytesIO(b'jammyjohnson')
1✔
222
        self.file.manage_upload(f)
1✔
223
        self.assertEqual(self.file.data, b'jammyjohnson')
1✔
224
        self.assertEqual(self.file.content_type, 'application/octet-stream')
1✔
225
        self.assertEqual(1, len(self.eventCatcher.modified))
1✔
226
        self.assertTrue(self.eventCatcher.modified[0].object is self.file)
1✔
227

228
    def testManageUploadWithoutFileData(self):
1✔
229
        self.file.manage_upload()
1✔
230
        self.assertEqual(0, len(self.eventCatcher.modified))
1✔
231

232
    def testIfModSince(self):
1✔
233
        now = time.time()
1✔
234
        e = {'SERVER_NAME': 'foo',
1✔
235
             'SERVER_PORT': '80',
236
             'REQUEST_METHOD': 'GET'}
237

238
        # not modified since
239
        t_notmod = rfc1123_date(now)
1✔
240
        e['HTTP_IF_MODIFIED_SINCE'] = t_notmod
1✔
241
        out = BytesIO()
1✔
242
        resp = HTTPResponse(stdout=out)
1✔
243
        req = HTTPRequest(sys.stdin, e, resp)
1✔
244
        data = self.file.index_html(req, resp)
1✔
245
        self.assertEqual(resp.getStatus(), 304)
1✔
246
        self.assertEqual(data, b'')
1✔
247

248
        # modified since
249
        t_mod = rfc1123_date(now - 100)
1✔
250
        e['HTTP_IF_MODIFIED_SINCE'] = t_mod
1✔
251
        out = BytesIO()
1✔
252
        resp = HTTPResponse(stdout=out)
1✔
253
        req = HTTPRequest(sys.stdin, e, resp)
1✔
254
        data = self.file.index_html(req, resp)
1✔
255
        self.assertEqual(resp.getStatus(), 200)
1✔
256
        self.assertEqual(data, bytes(self.file.data))
1✔
257

258
    def testPUT(self):
1✔
259
        s = b'# some python\n'
1✔
260

261
        # with content type
262
        data = BytesIO(s)
1✔
263
        req = aputrequest(data, 'text/x-python')
1✔
264
        req.processInputs()
1✔
265
        self.file.PUT(req, req.RESPONSE)
1✔
266

267
        self.assertEqual(self.file.content_type, 'text/x-python')
1✔
268
        self.assertEqual(self.file.data, s)
1✔
269

270
        # without content type
271
        data.seek(0)
1✔
272
        req = aputrequest(data, '')
1✔
273
        req.processInputs()
1✔
274
        self.file.PUT(req, req.RESPONSE)
1✔
275

276
        self.assertEqual(self.file.content_type, 'text/x-python')
1✔
277
        self.assertEqual(self.file.data, s)
1✔
278

279
    def testIndexHtmlWithPdata(self):
1✔
280
        self.file.manage_upload(b'a' * (2 << 16))  # 128K
1✔
281
        self.file.index_html(self.app.REQUEST, self.app.REQUEST.RESPONSE)
1✔
282
        self.assertTrue(self.app.REQUEST.RESPONSE._wrote)
1✔
283

284
    def testIndexHtmlWithString(self):
1✔
285
        self.file.manage_upload(b'a' * 100)  # 100 bytes
1✔
286
        self.file.index_html(self.app.REQUEST, self.app.REQUEST.RESPONSE)
1✔
287
        self.assertTrue(not self.app.REQUEST.RESPONSE._wrote)
1✔
288

289
    def testPrincipiaSearchSource_not_text(self):
1✔
290
        data = ''.join([chr(x) for x in range(256)])
1✔
291
        data = data.encode('utf-8')
1✔
292
        self.file.manage_edit('foobar', 'application/octet-stream',
1✔
293
                              filedata=data)
294
        self.assertEqual(self.file.PrincipiaSearchSource(), b'')
1✔
295

296
    def testPrincipiaSearchSource_text(self):
1✔
297
        self.file.manage_edit('foobar', 'text/plain',
1✔
298
                              filedata=b'Now is the time for all good men to '
299
                                       b'come to the aid of the Party.')
300
        self.assertTrue(b'Party' in self.file.PrincipiaSearchSource())
1✔
301

302
    def test_manage_DAVget_binary(self):
1✔
303
        self.assertEqual(self.file.manage_DAVget(), self.data)
1✔
304

305
    def test_manage_DAVget_text(self):
1✔
306
        text = (b'Now is the time for all good men to '
1✔
307
                b'come to the aid of the Party.')
308
        self.file.manage_edit('foobar', 'text/plain', filedata=text)
1✔
309
        self.assertEqual(self.file.manage_DAVget(), text)
1✔
310

311
    def test_interfaces(self):
1✔
312
        from OFS.Image import File
1✔
313
        from OFS.interfaces import IWriteLock
1✔
314
        from zope.interface.verify import verifyClass
1✔
315
        from ZPublisher.HTTPRangeSupport import HTTPRangeInterface
1✔
316

317
        verifyClass(HTTPRangeInterface, File)
1✔
318
        verifyClass(IWriteLock, File)
1✔
319

320
    def testUnicode(self):
1✔
321
        val = 'some unicode string here'
1✔
322

323
        self.assertRaises(TypeError, self.file.update_data,
1✔
324
                          data=val, content_type='text/plain')
325

326
    def test__str__returns_native_string(self):
1✔
327
        small_data = b'small data'
1✔
328
        self.file.manage_upload(file=small_data)
1✔
329
        self.assertEqual(str(self.file), small_data.decode())
1✔
330

331
        # Make sure Pdata contents are handled correctly
332
        big_data = b'a' * (2 << 16)
1✔
333
        self.file.manage_upload(file=big_data)
1✔
334
        self.assertEqual(str(self.file), big_data.decode())
1✔
335

336

337
class ImageTests(FileTests):
1✔
338
    content_type = 'image/gif'
1✔
339
    factory = 'manage_addImage'
1✔
340

341
    def testUpdateData(self):
1✔
342
        self.file.update_data(self.data)
1✔
343
        self.assertEqual(self.file.size, len(self.data))
1✔
344
        self.assertEqual(self.file.data, self.data)
1✔
345
        self.assertEqual(self.file.width, 16)
1✔
346
        self.assertEqual(self.file.height, 16)
1✔
347
        self.assertTrue(ADummyCache.invalidated)
1✔
348
        self.assertTrue(ADummyCache.set)
1✔
349

350
    def testTag(self):
1✔
351
        tag_fmt = ('<img src="http://nohost/file" '
1✔
352
                   'alt="%s" title="%s" height="16" width="16" />')
353
        self.assertEqual(self.file.tag(), (tag_fmt % ('', '')))
1✔
354
        self.file.manage_changeProperties(title='foo')
1✔
355
        self.assertEqual(self.file.tag(), (tag_fmt % ('', 'foo')))
1✔
356
        self.file.manage_changeProperties(alt='bar')
1✔
357
        self.assertEqual(self.file.tag(), (tag_fmt % ('bar', 'foo')))
1✔
358

359
    testStr = testTag
1✔
360

361
    def test__str__returns_native_string(self):
1✔
362
        small_data = b'small data'
1✔
363
        self.file.manage_upload(file=small_data)
1✔
364
        self.assertIsInstance(str(self.file), str)
1✔
365

366
        # Make sure Pdata contents are handled correctly
367
        big_data = b'a' * (2 << 16)
1✔
368
        self.file.manage_upload(file=big_data)
1✔
369
        self.assertIsInstance(str(self.file), str)
1✔
370

371
    def testViewImageOrFile(self):
1✔
372
        request = self.app.REQUEST
1✔
373
        response = request.RESPONSE
1✔
374
        result = self.file.index_html(request, response)
1✔
375
        self.assertEqual(result, self.data)
1✔
376

377
    def test_interfaces(self):
1✔
378
        from OFS.Image import Image
1✔
379
        from OFS.interfaces import IWriteLock
1✔
380
        from zope.interface.verify import verifyClass
1✔
381

382
        verifyClass(IWriteLock, Image)
1✔
383

384
    def test_text_representation_is_tag(self):
1✔
385
        self.assertEqual(str(self.file),
1✔
386
                         '<img src="http://nohost/file"'
387
                         ' alt="" title="" height="16" width="16" />')
388

389

390
class FileEditTests(Testing.ZopeTestCase.FunctionalTestCase):
1✔
391
    """Browser testing ..Image.File"""
392

393
    def setUp(self):
1✔
394
        super().setUp()
1✔
395
        uf = self.app.acl_users
1✔
396
        uf.userFolderAddUser('manager', 'manager_pass', ['Manager'], [])
1✔
397
        self.app.manage_addFile('file')
1✔
398

399
        transaction.commit()
1✔
400
        self.browser = Testing.testbrowser.Browser()
1✔
401
        self.browser.login('manager', 'manager_pass')
1✔
402

403
    def test_Image__manage_main__1(self):
1✔
404
        """It shows the content of text files as text."""
405
        self.app.file.update_data('hällo'.encode())
1✔
406
        self.browser.open('http://localhost/file/manage_main')
1✔
407
        text = self.browser.getControl(name='filedata:text').value
1✔
408
        self.assertEqual(text, 'hällo')
1✔
409

410
    def test_Image__manage_main__3(self):
1✔
411
        """It shows an error message if the file content cannot be decoded."""
412
        self.app.file.update_data('hällo'.encode('latin-1'))
1✔
413
        self.browser.open('http://localhost/file/manage_main')
1✔
414
        self.assertIn(
1✔
415
            "The file could not be decoded with 'utf-8'.",
416
            self.browser.contents)
417

418
    def test_Image__manage_upload__1(self):
1✔
419
        """It uploads a file, replaces the content and sets content type."""
420
        self.browser.open('http://localhost/file/manage_main')
1✔
421
        self.browser.getControl(name='file').add_file(
1✔
422
            b'test text file', 'text/plain', 'TestFile.txt')
423
        self.browser.getControl('Upload File').click()
1✔
424
        self.assertIn('Saved changes', self.browser.contents)
1✔
425
        self.assertEqual(
1✔
426
            self.browser.getControl('Content Type').value, 'text/plain')
427
        text = self.browser.getControl(name='filedata:text').value
1✔
428
        self.assertEqual(text, 'test text file')
1✔
429

430
    def test_Image__manage_edit__1(self):
1✔
431
        """It it possible to change the file's content via browser."""
432
        self.browser.open('http://localhost/file/manage_main')
1✔
433
        text_1 = self.browser.getControl(name='filedata:text').value
1✔
434
        self.assertEqual(text_1, '')
1✔
435
        self.browser.getControl(name='filedata:text').value = 'hällo'
1✔
436
        self.browser.getControl('Save Changes').click()
1✔
437
        self.assertIn('Saved changes', self.browser.contents)
1✔
438
        text_2 = self.browser.getControl(name='filedata:text').value
1✔
439
        self.assertEqual(text_2, 'hällo')
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc