• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Zope / 12779696007

15 Jan 2025 01:31AM UTC coverage: 82.319% (+0.08%) from 82.243%
12779696007

push

github

web-flow
Enable ZMI History tab for ``OFS.Image.File`` (#1245)

Modernize a bit the condition to display online text editor: support
editing json inline and tolerate larger file content.

3529 of 5999 branches covered (58.83%)

Branch coverage included in aggregate %.

20 of 21 new or added lines in 2 files covered. (95.24%)

1 existing line in 1 file now uncovered.

28065 of 32381 relevant lines covered (86.67%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.38
/src/OFS/Image.py
1
##############################################################################
2
#
3
# Copyright (c) 2002 Zope Foundation and Contributors.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE
11
#
12
##############################################################################
13
"""Image object
14
"""
15

16
import html
1✔
17
import os
1✔
18
import struct
1✔
19
from email.generator import _make_boundary
1✔
20
from io import BytesIO
1✔
21
from mimetypes import guess_extension
1✔
22
from urllib.parse import quote
1✔
23
from xml.dom import minidom
1✔
24

25
import ZPublisher.HTTPRequest
1✔
26
from AccessControl.class_init import InitializeClass
1✔
27
from AccessControl.Permissions import change_images_and_files  # NOQA
1✔
28
from AccessControl.Permissions import view as View
1✔
29
from AccessControl.Permissions import view_management_screens
1✔
30
from AccessControl.Permissions import webdav_access
1✔
31
from AccessControl.SecurityInfo import ClassSecurityInfo
1✔
32
from Acquisition import Implicit
1✔
33
from App.special_dtml import DTMLFile
1✔
34
from DateTime.DateTime import DateTime
1✔
35
from OFS.Cache import Cacheable
1✔
36
from OFS.History import Historical
1✔
37
from OFS.History import html_diff
1✔
38
from OFS.interfaces import IWriteLock
1✔
39
from OFS.PropertyManager import PropertyManager
1✔
40
from OFS.role import RoleManager
1✔
41
from OFS.SimpleItem import Item_w__name__
1✔
42
from OFS.SimpleItem import PathReprProvider
1✔
43
from Persistence import Persistent
1✔
44
from zExceptions import Redirect
1✔
45
from zExceptions import ResourceLockedError
1✔
46
from zope.contenttype import guess_content_type
1✔
47
from zope.datetime import rfc1123_date
1✔
48
from zope.event import notify
1✔
49
from zope.interface import implementer
1✔
50
from zope.lifecycleevent import ObjectCreatedEvent
1✔
51
from zope.lifecycleevent import ObjectModifiedEvent
1✔
52
from ZPublisher import HTTPRangeSupport
1✔
53
from ZPublisher import zpublish
1✔
54
from ZPublisher.HTTPRequest import FileUpload
1✔
55

56

57
def _get_list_from_env(name, default=None):
1✔
58
    """Get list from environment variable.
59

60
    Supports splitting on comma or white space.
61
    Use the default as fallback only when the variable is not set.
62
    So if the env variable is set to an empty string, this will ignore the
63
    default and return an empty list.
64
    """
65
    value = os.environ.get(name)
1✔
66
    if value is None:
1!
67
        return default or []
1✔
68
    value = value.strip()
×
69
    if "," in value:
×
70
        return value.split(",")
×
71
    return value.split()
×
72

73

74
# We have one list for allowed, and one for disallowed inline mimetypes.
75
# This is for security purposes.
76
# By default we use the allowlist.  We give integrators the option to choose
77
# the denylist via an environment variable.
78
ALLOWED_INLINE_MIMETYPES = _get_list_from_env(
1✔
79
    "ALLOWED_INLINE_MIMETYPES",
80
    default=[
81
        "image/gif",
82
        # The mimetypes registry lists several for jpeg 2000:
83
        "image/jp2",
84
        "image/jpeg",
85
        "image/jpeg2000-image",
86
        "image/jpeg2000",
87
        "image/jpx",
88
        "image/png",
89
        "image/webp",
90
        "image/x-icon",
91
        "image/x-jpeg2000-image",
92
        "text/plain",
93
        # By popular request we allow PDF:
94
        "application/pdf",
95
    ]
96
)
97
DISALLOWED_INLINE_MIMETYPES = _get_list_from_env(
1✔
98
    "DISALLOWED_INLINE_MIMETYPES",
99
    default=[
100
        "application/javascript",
101
        "application/x-javascript",
102
        "text/javascript",
103
        "text/html",
104
        "image/svg+xml",
105
        "image/svg+xml-compressed",
106
    ]
107
)
108
try:
1✔
109
    USE_DENYLIST = os.environ.get("OFS_IMAGE_USE_DENYLIST")
1✔
110
    USE_DENYLIST = bool(int(USE_DENYLIST))
1✔
111
except (ValueError, TypeError, AttributeError):
1✔
112
    USE_DENYLIST = False
1✔
113

114

115
manage_addFileForm = DTMLFile(
1✔
116
    'dtml/imageAdd',
117
    globals(),
118
    Kind='File',
119
    kind='file',
120
)
121

122

123
def manage_addFile(
1✔
124
    self,
125
    id,
126
    file=b'',
127
    title='',
128
    precondition='',
129
    content_type='',
130
    REQUEST=None
131
):
132
    """Add a new File object.
133

134
    Creates a new File object 'id' with the contents of 'file'"""
135

136
    id = str(id)
1✔
137
    title = str(title)
1✔
138
    content_type = str(content_type)
1✔
139
    precondition = str(precondition)
1✔
140

141
    id, title = cookId(id, title, file)
1✔
142

143
    self = self.this()
1✔
144

145
    # First, we create the file without data:
146
    self._setObject(id, File(id, title, b'', content_type, precondition))
1✔
147

148
    newFile = self._getOb(id)
1✔
149

150
    # Now we "upload" the data.  By doing this in two steps, we
151
    # can use a database trick to make the upload more efficient.
152
    if file:
1✔
153
        newFile.manage_upload(file)
1✔
154
    if content_type:
1✔
155
        newFile.content_type = content_type
1✔
156

157
    notify(ObjectCreatedEvent(newFile))
1✔
158

159
    if REQUEST is not None:
1!
160
        REQUEST.RESPONSE.redirect(self.absolute_url() + '/manage_main')
×
161

162

163
@zpublish
1✔
164
@implementer(IWriteLock, HTTPRangeSupport.HTTPRangeInterface)
1✔
165
class File(
1✔
166
    PathReprProvider,
167
    Persistent,
168
    Implicit,
169
    PropertyManager,
170
    RoleManager,
171
    Item_w__name__,
172
    Historical,
173
    Cacheable
174
):
175
    """A File object is a content object for arbitrary files."""
176
    # You can control which mimetypes may be shown inline
177
    # and which must always be downloaded, for security reasons.
178
    # Make the configuration available on the class.
179
    # Then subclasses can override this.
180
    allowed_inline_mimetypes = ALLOWED_INLINE_MIMETYPES
1✔
181
    disallowed_inline_mimetypes = DISALLOWED_INLINE_MIMETYPES
1✔
182
    use_denylist = USE_DENYLIST
1✔
183

184
    meta_type = 'File'
1✔
185
    zmi_icon = 'far fa-file-archive'
1✔
186

187
    security = ClassSecurityInfo()
1✔
188
    security.declareObjectProtected(View)
1✔
189

190
    precondition = ''
1✔
191
    size = None
1✔
192

193
    manage_editForm = DTMLFile('dtml/fileEdit', globals(),
1✔
194
                               Kind='File', kind='file')
195
    manage_editForm._setName('manage_editForm')
1✔
196

197
    security.declareProtected(view_management_screens, 'manage')  # NOQA: D001
1✔
198
    security.declareProtected(view_management_screens, 'manage_main')  # NOQA: D001,E501
1✔
199
    manage = manage_main = manage_editForm
1✔
200
    manage_uploadForm = manage_editForm
1✔
201

202
    manage_options = (({'label': 'Edit', 'action': 'manage_main'},
1✔
203
                       {'label': 'View', 'action': ''})
204
                      + PropertyManager.manage_options
205
                      + RoleManager.manage_options
206
                      + Item_w__name__.manage_options
207
                      + Cacheable.manage_options
208
                      + Historical.manage_options
209
                      )
210

211
    _properties = (
1✔
212
        {'id': 'title', 'type': 'string'},
213
        {'id': 'content_type', 'type': 'string'},
214
    )
215

216
    def __init__(self, id, title, file, content_type='', precondition=''):
1✔
217
        self.__name__ = id
1✔
218
        self.title = title
1✔
219
        self.precondition = precondition
1✔
220

221
        data, size = self._read_data(file)
1✔
222
        content_type = self._get_content_type(file, data, id, content_type)
1✔
223
        self.update_data(data, content_type, size)
1✔
224

225
    def _if_modified_since_request_handler(self, REQUEST, RESPONSE):
1✔
226
        # HTTP If-Modified-Since header handling: return True if
227
        # we can handle this request by returning a 304 response
228
        header = REQUEST.get_header('If-Modified-Since', None)
1✔
229
        if header is not None:
1✔
230
            header = header.split(';')[0]
1✔
231
            # Some proxies seem to send invalid date strings for this
232
            # header. If the date string is not valid, we ignore it
233
            # rather than raise an error to be generally consistent
234
            # with common servers such as Apache (which can usually
235
            # understand the screwy date string as a lucky side effect
236
            # of the way they parse it).
237
            # This happens to be what RFC2616 tells us to do in the face of an
238
            # invalid date.
239
            try:
1✔
240
                mod_since = int(DateTime(header).timeTime())
1✔
241
            except Exception:
×
242
                mod_since = None
×
243
            if mod_since is not None:
1!
244
                if self._p_mtime:
1!
245
                    last_mod = int(self._p_mtime)
1✔
246
                else:
247
                    last_mod = 0
×
248
                if last_mod > 0 and last_mod <= mod_since:
1✔
249
                    RESPONSE.setHeader(
1✔
250
                        'Last-Modified', rfc1123_date(self._p_mtime)
251
                    )
252
                    RESPONSE.setHeader('Content-Type', self.content_type)
1✔
253
                    RESPONSE.setHeader('Accept-Ranges', 'bytes')
1✔
254
                    RESPONSE.setStatus(304)
1✔
255
                    return True
1✔
256

257
    def _range_request_handler(self, REQUEST, RESPONSE):
1✔
258
        # HTTP Range header handling: return True if we've served a range
259
        # chunk out of our data.
260
        range = REQUEST.get_header('Range', None)
1✔
261
        request_range = REQUEST.get_header('Request-Range', None)
1✔
262
        if request_range is not None:
1✔
263
            # Netscape 2 through 4 and MSIE 3 implement a draft version
264
            # Later on, we need to serve a different mime-type as well.
265
            range = request_range
1✔
266
        if_range = REQUEST.get_header('If-Range', None)
1✔
267
        if range is not None:
1✔
268
            ranges = HTTPRangeSupport.parseRange(range)
1✔
269

270
            if if_range is not None:
1✔
271
                # Only send ranges if the data isn't modified, otherwise send
272
                # the whole object. Support both ETags and Last-Modified dates!
273
                if len(if_range) > 1 and if_range[:2] == 'ts':
1✔
274
                    # ETag:
275
                    if if_range != self.http__etag():
1✔
276
                        # Modified, so send a normal response. We delete
277
                        # the ranges, which causes us to skip to the 200
278
                        # response.
279
                        ranges = None
1✔
280
                else:
281
                    # Date
282
                    date = if_range.split(';')[0]
1✔
283
                    try:
1✔
284
                        mod_since = int(DateTime(date).timeTime())
1✔
285
                    except Exception:
1✔
286
                        mod_since = None
1✔
287
                    if mod_since is not None:
1✔
288
                        if self._p_mtime:
1!
289
                            last_mod = int(self._p_mtime)
1✔
290
                        else:
291
                            last_mod = 0
×
292
                        if last_mod > mod_since:
1✔
293
                            # Modified, so send a normal response. We delete
294
                            # the ranges, which causes us to skip to the 200
295
                            # response.
296
                            ranges = None
1✔
297

298
            if ranges:
1✔
299
                # Search for satisfiable ranges.
300
                satisfiable = 0
1✔
301
                for start, end in ranges:
1✔
302
                    if start < self.size:
1✔
303
                        satisfiable = 1
1✔
304
                        break
1✔
305

306
                if not satisfiable:
1✔
307
                    RESPONSE.setHeader(
1✔
308
                        'Content-Range', 'bytes */%d' % self.size
309
                    )
310
                    RESPONSE.setHeader('Accept-Ranges', 'bytes')
1✔
311
                    RESPONSE.setHeader(
1✔
312
                        'Last-Modified', rfc1123_date(self._p_mtime)
313
                    )
314
                    RESPONSE.setHeader('Content-Type', self.content_type)
1✔
315
                    RESPONSE.setHeader('Content-Length', self.size)
1✔
316
                    RESPONSE.setStatus(416)
1✔
317
                    return True
1✔
318

319
                ranges = HTTPRangeSupport.expandRanges(ranges, self.size)
1✔
320

321
                if len(ranges) == 1:
1✔
322
                    # Easy case, set extra header and return partial set.
323
                    start, end = ranges[0]
1✔
324
                    size = end - start
1✔
325

326
                    RESPONSE.setHeader(
1✔
327
                        'Last-Modified', rfc1123_date(self._p_mtime)
328
                    )
329
                    RESPONSE.setHeader('Content-Type', self.content_type)
1✔
330
                    RESPONSE.setHeader('Content-Length', size)
1✔
331
                    RESPONSE.setHeader('Accept-Ranges', 'bytes')
1✔
332
                    RESPONSE.setHeader(
1✔
333
                        'Content-Range',
334
                        'bytes %d-%d/%d' % (start, end - 1, self.size)
335
                    )
336
                    RESPONSE.setStatus(206)  # Partial content
1✔
337

338
                    data = self.data
1✔
339
                    if isinstance(data, bytes):
1✔
340
                        RESPONSE.write(data[start:end])
1✔
341
                        return True
1✔
342

343
                    # Linked Pdata objects. Urgh.
344
                    pos = 0
1✔
345
                    while data is not None:
1!
346
                        length = len(data.data)
1✔
347
                        pos = pos + length
1✔
348
                        if pos > start:
1✔
349
                            # We are within the range
350
                            lstart = length - (pos - start)
1✔
351

352
                            if lstart < 0:
1!
353
                                lstart = 0
×
354

355
                            # find the endpoint
356
                            if end <= pos:
1!
357
                                lend = length - (pos - end)
1✔
358

359
                                # Send and end transmission
360
                                RESPONSE.write(data[lstart:lend])
1✔
361
                                break
1✔
362

363
                            # Not yet at the end, transmit what we have.
364
                            RESPONSE.write(data[lstart:])
×
365

366
                        data = data.next
1✔
367

368
                    return True
1✔
369

370
                else:
371
                    boundary = _make_boundary()
1✔
372

373
                    # Calculate the content length
374
                    size = (8 + len(boundary)  # End marker length
1✔
375
                            + len(ranges) * (  # Constant lenght per set
376
                                49 + len(boundary)
377
                                + len(self.content_type)
378
                                + len('%d' % self.size)))
379
                    for start, end in ranges:
1✔
380
                        # Variable length per set
381
                        size = (size + len('%d%d' % (start, end - 1))
1✔
382
                                + end - start)
383

384
                    # Some clients implement an earlier draft of the spec, they
385
                    # will only accept x-byteranges.
386
                    draftprefix = (request_range is not None) and 'x-' or ''
1✔
387

388
                    RESPONSE.setHeader('Content-Length', size)
1✔
389
                    RESPONSE.setHeader('Accept-Ranges', 'bytes')
1✔
390
                    RESPONSE.setHeader(
1✔
391
                        'Last-Modified', rfc1123_date(self._p_mtime)
392
                    )
393
                    RESPONSE.setHeader(
1✔
394
                        'Content-Type',
395
                        f'multipart/{draftprefix}byteranges;'
396
                        f' boundary={boundary}'
397
                    )
398
                    RESPONSE.setStatus(206)  # Partial content
1✔
399

400
                    data = self.data
1✔
401
                    # The Pdata map allows us to jump into the Pdata chain
402
                    # arbitrarily during out-of-order range searching.
403
                    pdata_map = {}
1✔
404
                    pdata_map[0] = data
1✔
405

406
                    for start, end in ranges:
1✔
407
                        RESPONSE.write(
1✔
408
                            b'\r\n--'
409
                            + boundary.encode('ascii')
410
                            + b'\r\n'
411
                        )
412
                        RESPONSE.write(
1✔
413
                            b'Content-Type: '
414
                            + self.content_type.encode('ascii')
415
                            + b'\r\n'
416
                        )
417
                        RESPONSE.write(
1✔
418
                            b'Content-Range: bytes '
419
                            + str(start).encode('ascii')
420
                            + b'-'
421
                            + str(end - 1).encode('ascii')
422
                            + b'/'
423
                            + str(self.size).encode('ascii')
424
                            + b'\r\n\r\n'
425
                        )
426

427
                        if isinstance(data, bytes):
1✔
428
                            RESPONSE.write(data[start:end])
1✔
429

430
                        else:
431
                            # Yippee. Linked Pdata objects. The following
432
                            # calculations allow us to fast-forward through the
433
                            # Pdata chain without a lot of dereferencing if we
434
                            # did the work already.
435
                            first_size = len(pdata_map[0].data)
1✔
436
                            if start < first_size:
1✔
437
                                closest_pos = 0
1✔
438
                            else:
439
                                closest_pos = (
1✔
440
                                    ((start - first_size) >> 16 << 16)
441
                                    + first_size
442
                                )
443
                            pos = min(closest_pos, max(pdata_map.keys()))
1✔
444
                            data = pdata_map[pos]
1✔
445

446
                            while data is not None:
1!
447
                                length = len(data.data)
1✔
448
                                pos = pos + length
1✔
449
                                if pos > start:
1✔
450
                                    # We are within the range
451
                                    lstart = length - (pos - start)
1✔
452

453
                                    if lstart < 0:
1✔
454
                                        lstart = 0
1✔
455

456
                                    # find the endpoint
457
                                    if end <= pos:
1✔
458
                                        lend = length - (pos - end)
1✔
459

460
                                        # Send and loop to next range
461
                                        RESPONSE.write(data[lstart:lend])
1✔
462
                                        break
1✔
463

464
                                    # Not yet at the end,
465
                                    # transmit what we have.
466
                                    RESPONSE.write(data[lstart:])
1✔
467

468
                                data = data.next
1✔
469
                                # Store a reference to a Pdata chain link
470
                                # so we don't have to deref during
471
                                # this request again.
472
                                pdata_map[pos] = data
1✔
473

474
                    # Do not keep the link references around.
475
                    del pdata_map
1✔
476

477
                    RESPONSE.write(
1✔
478
                        b'\r\n--' + boundary.encode('ascii') + b'--\r\n')
479
                    return True
1✔
480

481
    def _should_force_download(self):
1✔
482
        # If this returns True, the caller should set a
483
        # Content-Disposition header with filename.
484
        mimetype = extract_media_type(self.content_type)
1✔
485
        if not mimetype:
1!
486
            return False
×
487
        if self.use_denylist:
1✔
488
            # We explicitly deny a few mimetypes, and allow the rest.
489
            return mimetype in self.disallowed_inline_mimetypes
1✔
490
        # Use the allowlist.
491
        # We only explicitly allow a few mimetypes, and deny the rest.
492
        return mimetype not in self.allowed_inline_mimetypes
1✔
493

494
    @zpublish
1✔
495
    @security.protected(View)
1✔
496
    def index_html(self, REQUEST, RESPONSE):
1✔
497
        """
498
        The default view of the contents of a File or Image.
499

500
        Returns the contents of the file or image.  Also, sets the
501
        Content-Type HTTP header to the objects content type.
502
        """
503

504
        if self._if_modified_since_request_handler(REQUEST, RESPONSE):
1✔
505
            # we were able to handle this by returning a 304
506
            # unfortunately, because the HTTP cache manager uses the cache
507
            # API, and because 304 responses are required to carry the Expires
508
            # header for HTTP/1.1, we need to call ZCacheable_set here.
509
            # This is nonsensical for caches other than the HTTP cache manager
510
            # unfortunately.
511
            self.ZCacheable_set(None)
1✔
512
            return b''
1✔
513

514
        if self.precondition and hasattr(self, str(self.precondition)):
1!
515
            # Grab whatever precondition was defined and then
516
            # execute it.  The precondition will raise an exception
517
            # if something violates its terms.
518
            c = getattr(self, str(self.precondition))
×
519
            if hasattr(c, 'isDocTemp') and c.isDocTemp:
×
520
                c(REQUEST['PARENTS'][1], REQUEST)
×
521
            else:
522
                c()
×
523

524
        if self._range_request_handler(REQUEST, RESPONSE):
1✔
525
            # we served a chunk of content in response to a range request.
526
            return b''
1✔
527

528
        RESPONSE.setHeader('Last-Modified', rfc1123_date(self._p_mtime))
1✔
529
        RESPONSE.setHeader('Content-Type', self.content_type)
1✔
530
        RESPONSE.setHeader('Content-Length', self.size)
1✔
531
        RESPONSE.setHeader('Accept-Ranges', 'bytes')
1✔
532

533
        if self._should_force_download():
1✔
534
            # We need a filename, even a dummy one if needed.
535
            filename = self.getId()
1✔
536
            if "." not in filename:
1!
537
                # This either returns None or ".some_extension"
538
                ext = guess_extension(self.content_type, strict=False)
1✔
539
                if not ext:
1✔
540
                    # image/svg+xml -> svg
541
                    ext = "." + self.content_type.split("/")[-1].split("+")[0]
1✔
542
                filename += f"{ext}"
1✔
543
            filename = quote(filename.encode("utf8"))
1✔
544
            RESPONSE.setHeader(
1✔
545
                "Content-Disposition",
546
                f"attachment; filename*=UTF-8''{filename}",
547
            )
548

549
        if self.ZCacheable_isCachingEnabled():
1✔
550
            result = self.ZCacheable_get(default=None)
1✔
551
            if result is not None:
1!
552
                # We will always get None from RAMCacheManager and HTTP
553
                # Accelerated Cache Manager but we will get
554
                # something implementing the IStreamIterator interface
555
                # from a "FileCacheManager"
556
                return result
×
557

558
        self.ZCacheable_set(None)
1✔
559

560
        data = self.data
1✔
561
        if isinstance(data, bytes):
1✔
562
            RESPONSE.setBase(None)
1✔
563
            return data
1✔
564

565
        while data is not None:
1✔
566
            RESPONSE.write(data.data)
1✔
567
            data = data.next
1✔
568

569
        return b''
1✔
570

571
    @zpublish
1✔
572
    @security.protected(View)
1✔
573
    def view_image_or_file(self, URL1):
1✔
574
        """The default view of the contents of the File or Image."""
575
        raise Redirect(URL1)
1✔
576

577
    @security.protected(View)
1✔
578
    def PrincipiaSearchSource(self):
1✔
579
        """Allow file objects to be searched."""
580
        if self.content_type.startswith('text/'):
1✔
581
            return bytes(self.data)
1✔
582
        return b''
1✔
583

584
    @security.private
1✔
585
    def update_data(self, data, content_type=None, size=None):
1✔
586
        if isinstance(data, str):
1✔
587
            raise TypeError('Data can only be bytes or file-like. '
1✔
588
                            'Unicode objects are expressly forbidden.')
589

590
        if content_type is not None:
1✔
591
            self.content_type = content_type
1✔
592
        if size is None:
1✔
593
            size = len(data)
1✔
594
        self.size = size
1✔
595
        self.data = data
1✔
596
        self.ZCacheable_invalidate()
1✔
597
        self.ZCacheable_set(None)
1✔
598
        self.http__refreshEtag()
1✔
599

600
    def _get_encoding(self):
1✔
601
        """Get the canonical encoding for ZMI."""
602
        return ZPublisher.HTTPRequest.default_encoding
1✔
603

604
    @zpublish
1✔
605
    @security.protected(change_images_and_files)
1✔
606
    def manage_edit(
1✔
607
        self,
608
        title,
609
        content_type,
610
        precondition='',
611
        filedata=None,
612
        REQUEST=None
613
    ):
614
        """
615
        Changes the title and content type attributes of the File or Image.
616
        """
617
        if self.wl_isLocked():
1!
618
            raise ResourceLockedError("File is locked.")
×
619

620
        self.title = str(title)
1✔
621
        self.content_type = str(content_type)
1✔
622
        if precondition:
1!
623
            self.precondition = str(precondition)
×
624
        elif self.precondition:
1!
625
            del self.precondition
×
626
        if filedata is not None:
1✔
627
            if isinstance(filedata, str):
1✔
628
                filedata = filedata.encode(self._get_encoding())
1✔
629
            self.update_data(filedata, content_type, len(filedata))
1✔
630
        else:
631
            self.ZCacheable_invalidate()
1✔
632

633
        notify(ObjectModifiedEvent(self))
1✔
634

635
        if REQUEST:
1✔
636
            message = "Saved changes."
1✔
637
            return self.manage_main(
1✔
638
                self, REQUEST, manage_tabs_message=message)
639

640
    @zpublish
1✔
641
    @security.protected(change_images_and_files)
1✔
642
    def manage_upload(self, file='', REQUEST=None):
1✔
643
        """
644
        Replaces the current contents of the File or Image object with file.
645

646
        The file or images contents are replaced with the contents of 'file'.
647
        """
648
        if self.wl_isLocked():
1!
649
            raise ResourceLockedError("File is locked.")
×
650

651
        if file:
1✔
652
            data, size = self._read_data(file)
1✔
653
            content_type = self._get_content_type(file, data, self.__name__,
1✔
654
                                                  'application/octet-stream')
655
            self.update_data(data, content_type, size)
1✔
656
            notify(ObjectModifiedEvent(self))
1✔
657
            msg = 'Saved changes.'
1✔
658
        else:
659
            msg = 'Please select a file to upload.'
1✔
660

661
        if REQUEST:
1✔
662
            return self.manage_main(
1✔
663
                self, REQUEST, manage_tabs_message=msg)
664

665
    @security.protected(change_images_and_files)
1✔
666
    def manage_is_editable_inline(self):
1✔
667
        return (
1✔
668
            self.content_type
669
            and (
670
                self.content_type.startswith('text')
671
                or self.content_type.endswith('javascript')
672
                or self.content_type == 'application/json'
673
            )
674
            and self.get_size() < 2**17
675
        )
676

677
    def manage_historyCompare(self, rev1, rev2, REQUEST,
1✔
678
                              historyComparisonResults=''):
679
        if self.manage_is_editable_inline():
1!
680
            return File.inheritedAttribute('manage_historyCompare')(
1✔
681
                self, rev1, rev2, REQUEST,
682
                historyComparisonResults=html_diff(
683
                    str(rev1), str(rev2)))
NEW
684
        return File.inheritedAttribute('manage_historyCompare')(
×
685
            self, rev1, rev2, REQUEST,
686
            historyComparisonResults=historyComparisonResults
687
        )
688

689
    def _get_content_type(self, file, body, id, content_type=None):
1✔
690
        headers = getattr(file, 'headers', None)
1✔
691
        if headers and 'content-type' in headers:
1✔
692
            content_type = headers['content-type']
1✔
693
        else:
694
            if not isinstance(body, bytes):
1✔
695
                body = body.data
1✔
696
            content_type, enc = guess_content_type(
1✔
697
                getattr(file, 'filename', id), body, content_type)
698
        return content_type
1✔
699

700
    def _read_data(self, file):
1✔
701
        import transaction
1✔
702

703
        n = 1 << 16
1✔
704

705
        if isinstance(file, str):
1!
706
            raise ValueError("Must be bytes")
×
707

708
        if isinstance(file, bytes):
1✔
709
            size = len(file)
1✔
710
            if size < n:
1✔
711
                return (file, size)
1✔
712
            # Big string: cut it into smaller chunks
713
            file = BytesIO(file)
1✔
714

715
        if isinstance(file, FileUpload) and not file:
1!
716
            raise ValueError('File not specified')
×
717

718
        if hasattr(file, '__class__') and file.__class__ is Pdata:
1!
719
            size = len(file)
×
720
            return (file, size)
×
721

722
        seek = file.seek
1✔
723
        read = file.read
1✔
724

725
        seek(0, 2)
1✔
726
        size = end = file.tell()
1✔
727

728
        if size <= 2 * n:
1✔
729
            seek(0)
1✔
730
            if size < n:
1✔
731
                return read(size), size
1✔
732
            return Pdata(read(size)), size
1✔
733

734
        # Make sure we have an _p_jar, even if we are a new object, by
735
        # doing a sub-transaction commit.
736
        transaction.savepoint(optimistic=True)
1✔
737

738
        if self._p_jar is None:
1!
739
            # Ugh
740
            seek(0)
×
741
            return Pdata(read(size)), size
×
742

743
        # Now we're going to build a linked list from back
744
        # to front to minimize the number of database updates
745
        # and to allow us to get things out of memory as soon as
746
        # possible.
747
        _next = None
1✔
748
        while end > 0:
1✔
749
            pos = end - n
1✔
750
            if pos < n:
1✔
751
                pos = 0  # we always want at least n bytes
1✔
752
            seek(pos)
1✔
753

754
            # Create the object and assign it a next pointer
755
            # in the same transaction, so that there is only
756
            # a single database update for it.
757
            data = Pdata(read(end - pos))
1✔
758
            self._p_jar.add(data)
1✔
759
            data.next = _next
1✔
760

761
            # Save the object so that we can release its memory.
762
            transaction.savepoint(optimistic=True)
1✔
763
            data._p_deactivate()
1✔
764
            # The object should be assigned an oid and be a ghost.
765
            assert data._p_oid is not None
1✔
766
            assert data._p_state == -1
1✔
767

768
            _next = data
1✔
769
            end = pos
1✔
770

771
        return (_next, size)
1✔
772

773
    @zpublish
1✔
774
    @security.protected(change_images_and_files)
1✔
775
    def PUT(self, REQUEST, RESPONSE):
1✔
776
        """Handle HTTP PUT requests"""
777
        self.dav__init(REQUEST, RESPONSE)
1✔
778
        self.dav__simpleifhandler(REQUEST, RESPONSE, refresh=1)
1✔
779
        type = REQUEST.get_header('content-type', None)
1✔
780

781
        file = REQUEST['BODYFILE']
1✔
782

783
        data, size = self._read_data(file)
1✔
784
        if isinstance(data, str):
1!
785
            data = data.encode('UTF-8')
×
786
        content_type = self._get_content_type(file, data, self.__name__,
1✔
787
                                              type or self.content_type)
788
        self.update_data(data, content_type, size)
1✔
789

790
        RESPONSE.setStatus(204)
1✔
791
        return RESPONSE
1✔
792

793
    @security.protected(View)
1✔
794
    def get_size(self):
1✔
795
        # Get the size of a file or image.
796
        # Returns the size of the file or image.
797
        size = self.size
1✔
798
        if size is None:
1!
799
            size = len(self.data)
×
800
        return size
1✔
801

802
    # deprecated; use get_size!
803
    getSize = get_size
1✔
804

805
    @security.protected(View)
1✔
806
    def getContentType(self):
1✔
807
        # Get the content type of a file or image.
808
        # Returns the content type (MIME type) of a file or image.
UNCOV
809
        return self.content_type
×
810

811
    def __bytes__(self):
1✔
812
        return bytes(self.data)
×
813

814
    def __str__(self):
1✔
815
        """In most cases, this is probably not what you want. Use ``bytes``."""
816
        if isinstance(self.data, Pdata):
1✔
817
            return bytes(self.data).decode(self._get_encoding())
1✔
818
        else:
819
            return self.data.decode(self._get_encoding())
1✔
820

821
    def __bool__(self):
1✔
822
        return True
1✔
823

824
    __nonzero__ = __bool__
1✔
825

826
    def __len__(self):
1✔
827
        data = bytes(self.data)
×
828
        return len(data)
×
829

830
    @zpublish
1✔
831
    @security.protected(webdav_access)
1✔
832
    def manage_DAVget(self):
1✔
833
        """Return body for WebDAV."""
834
        RESPONSE = self.REQUEST.RESPONSE
1✔
835

836
        if self.ZCacheable_isCachingEnabled():
1!
837
            result = self.ZCacheable_get(default=None)
1✔
838
            if result is not None:
1!
839
                # We will always get None from RAMCacheManager but we will
840
                # get something implementing the IStreamIterator interface
841
                # from FileCacheManager.
842
                # the content-length is required here by HTTPResponse.
843
                RESPONSE.setHeader('Content-Length', self.size)
×
844
                return result
×
845

846
        data = self.data
1✔
847
        if isinstance(data, bytes):
1!
848
            RESPONSE.setBase(None)
1✔
849
            return data
1✔
850

851
        while data is not None:
×
852
            RESPONSE.write(data.data)
×
853
            data = data.next
×
854

855
        return b''
×
856

857

858
InitializeClass(File)
1✔
859

860

861
manage_addImageForm = DTMLFile(
1✔
862
    'dtml/imageAdd',
863
    globals(),
864
    Kind='Image',
865
    kind='image',
866
)
867

868

869
def manage_addImage(
1✔
870
    self,
871
    id,
872
    file,
873
    title='',
874
    precondition='',
875
    content_type='',
876
    REQUEST=None
877
):
878
    """
879
    Add a new Image object.
880

881
    Creates a new Image object 'id' with the contents of 'file'.
882
    """
883
    id = str(id)
1✔
884
    title = str(title)
1✔
885
    content_type = str(content_type)
1✔
886
    precondition = str(precondition)
1✔
887

888
    id, title = cookId(id, title, file)
1✔
889

890
    self = self.this()
1✔
891

892
    # First, we create the image without data:
893
    self._setObject(id, Image(id, title, b'', content_type, precondition))
1✔
894

895
    newFile = self._getOb(id)
1✔
896

897
    # Now we "upload" the data.  By doing this in two steps, we
898
    # can use a database trick to make the upload more efficient.
899
    if file:
1!
900
        newFile.manage_upload(file)
1✔
901
    if content_type:
1!
902
        newFile.content_type = content_type
1✔
903

904
    notify(ObjectCreatedEvent(newFile))
1✔
905

906
    if REQUEST is not None:
1!
907
        try:
×
908
            url = self.DestinationURL()
×
909
        except Exception:
×
910
            url = REQUEST['URL1']
×
911
        REQUEST.RESPONSE.redirect('%s/manage_main' % url)
×
912
    return id
1✔
913

914

915
def getImageInfo(data):
1✔
916
    data = bytes(data)
1✔
917
    size = len(data)
1✔
918
    height = -1
1✔
919
    width = -1
1✔
920
    content_type = ''
1✔
921

922
    # handle GIFs
923
    if (size >= 10) and data[:6] in (b'GIF87a', b'GIF89a'):
1✔
924
        # Check to see if content_type is correct
925
        content_type = 'image/gif'
1✔
926
        w, h = struct.unpack("<HH", data[6:10])
1✔
927
        width = int(w)
1✔
928
        height = int(h)
1✔
929

930
    # See PNG v1.2 spec (http://www.cdrom.com/pub/png/spec/)
931
    # Bytes 0-7 are below, 4-byte chunk length, then 'IHDR'
932
    # and finally the 4-byte width, height
933
    elif (size >= 24
1!
934
          and data[:8] == b'\211PNG\r\n\032\n'
935
          and data[12:16] == b'IHDR'):
936
        content_type = 'image/png'
×
937
        w, h = struct.unpack(">LL", data[16:24])
×
938
        width = int(w)
×
939
        height = int(h)
×
940

941
    # Maybe this is for an older PNG version.
942
    elif (size >= 16) and (data[:8] == b'\211PNG\r\n\032\n'):
1!
943
        # Check to see if we have the right content type
944
        content_type = 'image/png'
×
945
        w, h = struct.unpack(">LL", data[8:16])
×
946
        width = int(w)
×
947
        height = int(h)
×
948

949
    # handle JPEGs
950
    elif (size >= 2) and (data[:2] == b'\377\330'):
1!
951
        content_type = 'image/jpeg'
×
952
        jpeg = BytesIO(data)
×
953
        jpeg.read(2)
×
954
        b = jpeg.read(1)
×
955
        try:
×
956
            while (b and ord(b) != 0xDA):
×
957
                while (ord(b) != 0xFF):
×
958
                    b = jpeg.read(1)
×
959
                while (ord(b) == 0xFF):
×
960
                    b = jpeg.read(1)
×
961
                if (ord(b) >= 0xC0 and ord(b) <= 0xC3):
×
962
                    jpeg.read(3)
×
963
                    h, w = struct.unpack(">HH", jpeg.read(4))
×
964
                    break
×
965
                else:
966
                    jpeg.read(int(struct.unpack(">H", jpeg.read(2))[0]) - 2)
×
967
                b = jpeg.read(1)
×
968
            width = int(w)
×
969
            height = int(h)
×
970
        except Exception:
×
971
            pass
×
972

973
    # handle SVGs
974
    elif (size >= 16) and ((b'<?xml' in data[:16]) or (b'<svg' in data[:16])):
1!
975
        try:
×
976
            xmldoc = minidom.parseString(data)
×
977
        except Exception:
×
978
            return content_type, width, height
×
979
        w = width
×
980
        h = height
×
981
        for svg in xmldoc.getElementsByTagName('svg'):
×
982
            content_type = 'image/svg+xml'
×
983
            if 'height' in svg.attributes and 'width' in svg.attributes:
×
984
                w = svg.attributes['width'].value
×
985
                h = svg.attributes['height'].value
×
986
                try:
×
987
                    w = int(float(w))
×
988
                    h = int(float(h))
×
989
                except Exception:
×
990
                    if str(w).endswith('px'):
×
991
                        w = int(float(w[:-2]))
×
992
                        h = int(float(h[:-2]))
×
993
                    elif str(w).endswith('mm'):
×
994
                        w = int(float(w[:-2]) * 3.7795)
×
995
                        h = int(float(h[:-2]) * 3.7795)
×
996
                    elif str(w).endswith('cm'):
×
997
                        w = int(float(w[:-2]) * 37.795)
×
998
                        h = int(float(h[:-2]) * 37.795)
×
999
                break
×
1000
            elif 'viewBox' in svg.attributes:
×
1001
                viewBox = svg.attributes['viewBox'].value
×
1002
                viewBox = [int(float(x)) for x in viewBox.split(' ')]
×
1003
                w = viewBox[2] - viewBox[0]
×
1004
                h = viewBox[3] - viewBox[1]
×
1005
        width = int(w)
×
1006
        height = int(h)
×
1007

1008
    return content_type, width, height
1✔
1009

1010

1011
class Image(File):
1✔
1012
    """Image objects can be GIF, PNG or JPEG and have the same methods
1013
    as File objects.  Images also have a string representation that
1014
    renders an HTML 'IMG' tag.
1015
    """
1016

1017
    meta_type = 'Image'
1✔
1018
    zmi_icon = 'far fa-file-image'
1✔
1019

1020
    security = ClassSecurityInfo()
1✔
1021
    security.declareObjectProtected(View)
1✔
1022

1023
    alt = ''
1✔
1024
    height = ''
1✔
1025
    width = ''
1✔
1026

1027
    # FIXME: Redundant, already in base class
1028
    security.declareProtected(change_images_and_files, 'manage_edit')  # NOQA: D001,E501
1✔
1029
    security.declareProtected(change_images_and_files, 'manage_upload')  # NOQA: D001,E501
1✔
1030
    security.declareProtected(View, 'index_html')  # NOQA: D001
1✔
1031
    security.declareProtected(View, 'get_size')  # NOQA: D001
1✔
1032
    security.declareProtected(View, 'getContentType')  # NOQA: D001
1✔
1033

1034
    _properties = (
1✔
1035
        {'id': 'title', 'type': 'string'},
1036
        {'id': 'alt', 'type': 'string'},
1037
        {'id': 'content_type', 'type': 'string', 'mode': 'w'},
1038
        {'id': 'height', 'type': 'string'},
1039
        {'id': 'width', 'type': 'string'},
1040
    )
1041

1042
    manage_options = (
1✔
1043
        ({'label': 'Edit', 'action': 'manage_main'},
1044
         {'label': 'View', 'action': 'view_image_or_file'})
1045
        + PropertyManager.manage_options
1046
        + RoleManager.manage_options
1047
        + Item_w__name__.manage_options
1048
        + Cacheable.manage_options
1049
    )
1050

1051
    manage_editForm = DTMLFile(
1✔
1052
        'dtml/imageEdit',
1053
        globals(),
1054
        Kind='Image',
1055
        kind='image',
1056
    )
1057
    manage_editForm._setName('manage_editForm')
1✔
1058

1059
    security.declareProtected(View, 'view_image_or_file')  # NOQA: D001
1✔
1060
    view_image_or_file = DTMLFile('dtml/imageView', globals())
1✔
1061

1062
    security.declareProtected(view_management_screens, 'manage')  # NOQA: D001
1✔
1063
    security.declareProtected(view_management_screens, 'manage_main')  # NOQA: D001,E501
1✔
1064
    manage = manage_main = manage_editForm
1✔
1065
    manage_uploadForm = manage_editForm
1✔
1066

1067
    @security.private
1✔
1068
    def update_data(self, data, content_type=None, size=None):
1✔
1069
        if isinstance(data, str):
1✔
1070
            raise TypeError('Data can only be bytes or file-like.  '
1✔
1071
                            'Unicode objects are expressly forbidden.')
1072

1073
        if size is None:
1✔
1074
            size = len(data)
1✔
1075

1076
        self.size = size
1✔
1077
        self.data = data
1✔
1078

1079
        ct, width, height = getImageInfo(data)
1✔
1080
        if ct:
1✔
1081
            content_type = ct
1✔
1082
        if width >= 0 and height >= 0:
1✔
1083
            self.width = width
1✔
1084
            self.height = height
1✔
1085

1086
        # Now we should have the correct content type, or still None
1087
        if content_type is not None:
1!
1088
            self.content_type = content_type
1✔
1089

1090
        self.ZCacheable_invalidate()
1✔
1091
        self.ZCacheable_set(None)
1✔
1092
        self.http__refreshEtag()
1✔
1093

1094
    def __bytes__(self):
1✔
1095
        return self.tag().encode('utf-8')
×
1096

1097
    def __str__(self):
1✔
1098
        return self.tag()
1✔
1099

1100
    @security.protected(View)
1✔
1101
    def tag(
1✔
1102
        self,
1103
        height=None,
1104
        width=None,
1105
        alt=None,
1106
        scale=0,
1107
        xscale=0,
1108
        yscale=0,
1109
        css_class=None,
1110
        title=None,
1111
        **args
1112
    ):
1113
        """Generate an HTML IMG tag for this image, with customization.
1114

1115
        Arguments to self.tag() can be any valid attributes of an IMG tag.
1116
        'src' will always be an absolute pathname, to prevent redundant
1117
        downloading of images. Defaults are applied intelligently for
1118
        'height', 'width', and 'alt'. If specified, the 'scale', 'xscale',
1119
        and 'yscale' keyword arguments will be used to automatically adjust
1120
        the output height and width values of the image tag.
1121
        #
1122
        Since 'class' is a Python reserved word, it cannot be passed in
1123
        directly in keyword arguments which is a problem if you are
1124
        trying to use 'tag()' to include a CSS class. The tag() method
1125
        will accept a 'css_class' argument that will be converted to
1126
        'class' in the output tag to work around this.
1127
        """
1128
        if height is None:
1!
1129
            height = self.height
1✔
1130
        if width is None:
1!
1131
            width = self.width
1✔
1132

1133
        # Auto-scaling support
1134
        xdelta = xscale or scale
1✔
1135
        ydelta = yscale or scale
1✔
1136

1137
        if xdelta and width:
1!
1138
            width = str(int(round(int(width) * xdelta)))
×
1139
        if ydelta and height:
1!
1140
            height = str(int(round(int(height) * ydelta)))
×
1141

1142
        result = '<img src="%s"' % (self.absolute_url())
1✔
1143

1144
        if alt is None:
1!
1145
            alt = getattr(self, 'alt', '')
1✔
1146
        result = f'{result} alt="{html.escape(alt, True)}"'
1✔
1147

1148
        if title is None:
1!
1149
            title = getattr(self, 'title', '')
1✔
1150
        result = f'{result} title="{html.escape(title, True)}"'
1✔
1151

1152
        if height:
1!
1153
            result = f'{result} height="{height}"'
1✔
1154

1155
        if width:
1!
1156
            result = f'{result} width="{width}"'
1✔
1157

1158
        if css_class is not None:
1!
1159
            result = f'{result} class="{css_class}"'
×
1160

1161
        for key in list(args.keys()):
1!
1162
            value = args.get(key)
×
1163
            if value:
×
1164
                result = f'{result} {key}="{value}"'
×
1165

1166
        return '%s />' % result
1✔
1167

1168

1169
InitializeClass(Image)
1✔
1170

1171

1172
def cookId(id, title, file):
1✔
1173
    if not id and hasattr(file, 'filename'):
1!
1174
        filename = file.filename
×
1175
        title = title or filename
×
1176
        id = filename[max(filename.rfind('/'),
×
1177
                          filename.rfind('\\'),
1178
                          filename.rfind(':'),
1179
                          ) + 1:]
1180
    return id, title
1✔
1181

1182

1183
class Pdata(Persistent, Implicit):
1✔
1184
    # Wrapper for possibly large data
1185

1186
    next = None
1✔
1187

1188
    def __init__(self, data):
1✔
1189
        self.data = data
1✔
1190

1191
    def __getitem__(self, key):
1✔
1192
        return self.data[key]
1✔
1193

1194
    def __len__(self):
1✔
1195
        data = bytes(self)
×
1196
        return len(data)
×
1197

1198
    def __bytes__(self):
1✔
1199
        _next = self.next
1✔
1200
        if _next is None:
1!
1201
            return self.data
1✔
1202

1203
        r = [self.data]
×
1204
        while _next is not None:
×
1205
            self = _next
×
1206
            r.append(self.data)
×
1207
            _next = self.next
×
1208

1209
        return b''.join(r)
×
1210

1211

1212
def extract_media_type(content_type):
1✔
1213
    """extract the proper media type from *content_type*.
1214

1215
    Ignore parameters and whitespace and normalize to lower case.
1216
    """
1217
    if not content_type:
1✔
1218
        return content_type
1✔
1219
    # ignore parameters
1220
    content_type = content_type.split(";", 1)[0]
1✔
1221
    # ignore whitespace
1222
    content_type = "".join(content_type.split())
1✔
1223
    # normalize to lowercase
1224
    return content_type.lower()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc