• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Products.ExternalEditor / 10886790212

16 Sep 2024 03:15PM UTC coverage: 81.72% (-0.3%) from 81.979%
10886790212

push

github

openlegis-br
pdata_fix

51 of 72 branches covered (70.83%)

Branch coverage included in aggregate %.

0 of 1 new or added line in 1 file covered. (0.0%)

177 of 207 relevant lines covered (85.51%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.43
/src/Products/ExternalEditor/ExternalEditor.py
1
##############################################################################
2
#
3
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
4
# All Rights Reserved.
5
#
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
"""Zope External Editor Product by Casey Duncan."""
15

16
import urllib
1✔
17

18
from AccessControl.class_init import InitializeClass
1✔
19
from AccessControl.SecurityInfo import ClassSecurityInfo
1✔
20
from AccessControl.SecurityManagement import getSecurityManager
1✔
21
from Acquisition import Implicit
1✔
22
from Acquisition import aq_base
1✔
23
from Acquisition import aq_inner
1✔
24
from Acquisition import aq_parent
1✔
25
from OFS import Image
1✔
26
from OFS.Lockable import wl_isLocked
1✔
27
from zExceptions import BadRequest
1✔
28
from zope.datetime import rfc1123_date
1✔
29
from zope.interface import implementer
1✔
30
from ZPublisher.HTTPRequest import default_encoding
1✔
31
from ZPublisher.Iterators import IStreamIterator
1✔
32

33

34
ExternalEditorPermission = 'Use external editor'
1✔
35

36
_callbacks = []
1✔
37

38

39
@implementer(IStreamIterator)
1✔
40
class PDataStreamIterator:
1✔
41

42
    def __init__(self, data):
1✔
43
        self.data = data
×
44

45
    def __iter__(self):
1✔
46
        return self
×
47

48
    def __next__(self):
1✔
49
        if self.data is None:
×
50
            raise StopIteration
×
51
        data = self.data.data
×
NEW
52
        self.data = self.data.next
×
53
        return data
×
54

55

56
def registerCallback(cb):
1✔
57
    """Register a callback to be called by the External Editor when
58
    it's about to be finished with collecting metadata for the
59
    to-be-edited file to allow actions to be taken, like for example
60
    inserting more metadata headers or enabling response compression
61
    or anything.
62
    """
63
    _callbacks.append(cb)
1✔
64

65

66
def applyCallbacks(ob, metadata, request, response):
1✔
67
    """Apply the registered callbacks in the order they were
68
    registered. The callbacks are free to perform any operation,
69
    including appending new metadata attributes and setting response
70
    headers.
71
    """
72
    for cb in _callbacks:
1✔
73
        cb(ob, metadata, request, response)
1✔
74

75

76
class ExternalEditor(Implicit):
1✔
77
    """Create a response that encapsulates the data needed by the
78
       ZopeEdit helper application
79
    """
80

81
    security = ClassSecurityInfo()
1✔
82
    security.declareObjectProtected(ExternalEditorPermission)
1✔
83

84
    def __before_publishing_traverse__(self, self2, request):
1✔
85
        path = request['TraversalRequestNameStack']
1✔
86
        if path:
1!
87
            target = path[-1]
1✔
88
            if target.endswith('.zem'):
1✔
89
                # Remove extension added by EditLink()
90
                # so we can traverse to the target in Zope
91
                target = target[:-4]
1✔
92
            request.set('target', target)
1✔
93
            path[:] = []
1✔
94
        else:
95
            request.set('target', None)
×
96

97
    def index_html(self, REQUEST, RESPONSE, path=None):
1✔
98
        """Publish the object to the external editor helper app"""
99

100
        security = getSecurityManager()
1✔
101
        if path is None:
1!
102
            parent = self.aq_parent
1✔
103
            try:
1✔
104
                ob = parent[REQUEST['target']]  # Try getitem
1✔
105
            except KeyError:
×
106
                ob = getattr(parent, REQUEST['target'])  # Try getattr
×
107
            except AttributeError:
×
108
                # Handle objects that are methods in ZClasses
109
                ob = parent.propertysheets.methods[REQUEST['target']]
×
110
        else:
111
            ob = self.restrictedTraverse(path)
×
112

113
        r = []
1✔
114
        r.append('url:%s' % ob.absolute_url())
1✔
115
        r.append('meta_type:%s' % ob.meta_type)
1✔
116

117
        title = getattr(aq_base(ob), 'title', None)
1✔
118
        if title is not None:
1!
119
            if callable(title):
1!
120
                title = title()
×
121
            r.append('title:%s' % title)
1✔
122

123
        if hasattr(aq_base(ob), 'content_type'):
1✔
124
            if callable(ob.content_type):
1!
125
                r.append('content_type:%s' % ob.content_type())
×
126
            else:
127
                r.append('content_type:%s' % ob.content_type)
1✔
128

129
        if REQUEST._auth:
1!
130
            if REQUEST._auth[-1] == '\n':
1!
131
                auth = REQUEST._auth[:-1]
×
132
            else:
133
                auth = REQUEST._auth
1✔
134

135
            r.append('auth:%s' % auth)
1✔
136

137
        r.append('cookie:%s' % REQUEST.environ.get('HTTP_COOKIE', ''))
1✔
138

139
        if wl_isLocked(ob):
1✔
140
            # Object is locked, send down the lock token
141
            # owned by this user (if any)
142
            user_id = security.getUser().getId()
1✔
143
            for lock in ob.wl_lockValues():
1✔
144
                if not lock.isValid():
1!
145
                    continue  # Skip invalid/expired locks
×
146
                creator = lock.getCreator()
1✔
147
                if creator and creator[1] == user_id:
1✔
148
                    # Found a lock for this user, so send it
149
                    r.append('lock-token:%s' % lock.getLockToken())
1✔
150
                    if REQUEST.get('borrow_lock'):
1✔
151
                        r.append('borrow_lock:1')
1✔
152
                    break
1✔
153

154
        # Apply any extra callbacks that might have been registered.
155
        applyCallbacks(ob, r, REQUEST, RESPONSE)
1✔
156

157
        # Finish metadata with an empty line.
158
        r.append('')
1✔
159
        metadata = '\n'.join(r)
1✔
160
        metadata = metadata.encode()
1✔
161
        metadata_len = len(metadata)
1✔
162

163
        # Check if we should send the file's data down the response.
164
        if REQUEST.get('skip_data'):
1✔
165
            # We've been requested to send only the metadata. The
166
            # client will presumably fetch the data itself.
167
            self._write_metadata(RESPONSE, metadata, metadata_len)
1✔
168
            return ''
1✔
169

170
        ob_data = getattr(aq_base(ob), 'data', None)
1✔
171
        if (ob_data is not None and isinstance(ob_data, Image.Pdata)):
1!
172
            # We have a File instance with chunked data, lets stream it.
173
            #
174
            # Note we are setting the content-length header here. This
175
            # is a simplification. Read comment below.
176
            #
177
            # We assume that ob.get_size() will return the exact size
178
            # of the PData chain. If that assumption is broken we
179
            # might have problems. This is mainly an optimization. If
180
            # we read the whole PData chain just to compute the
181
            # correct size that could cause the whole file to be read
182
            # into memory.
183
            RESPONSE.setHeader('Content-Length', ob.get_size())
×
184
            # It is safe to use this PDataStreamIterator here because
185
            # it is consumed right below. This is only used to
186
            # simplify the code below so it only has to deal with
187
            # stream iterators or plain strings.
188
            body = PDataStreamIterator(ob.data)
×
189
        elif hasattr(ob, 'manage_FTPget'):
1✔
190
            # Calling manage_FTPget *might* have side-effects. For
191
            # example, in Archetypes it does set the 'content-type'
192
            # response header, which would end up overriding our own
193
            # content-type header because we've set it 'too
194
            # early'. We've moved setting the content-type header to
195
            # the '_write_metadata' method since, and any manipulation
196
            # of response headers should happen there, if possible.
197
            try:
1✔
198
                body = ob.manage_FTPget()
1✔
199
            except TypeError:  # some need the R/R pair!
1✔
200
                body = ob.manage_FTPget(REQUEST, RESPONSE)
1✔
201
        elif hasattr(ob, 'EditableBody'):
1!
202
            body = ob.EditableBody()
×
203
        elif hasattr(ob, 'document_src'):
1!
204
            body = ob.document_src(REQUEST, RESPONSE)
×
205
        elif hasattr(ob, 'read'):
1!
206
            body = ob.read()
×
207
        elif ob_data is not None:
1!
208
            body = ob_data
1✔
209
        else:
210
            # can't read it!
211
            raise BadRequest('Object does not support external editing')
×
212

213
        if isinstance(body, str):
1!
214
            body = body.encode(default_encoding)
×
215

216
        if IStreamIterator.providedBy(body):
1!
217
            # We need to manage our content-length because we're streaming.
218
            # The content-length should have been set in the response by
219
            # the method that returns the iterator, but we need to fix it up
220
            # here because we insert metadata before the body.
221
            clen = RESPONSE.headers.get('content-length', None)
×
222
            assert clen is not None
×
223
            self._write_metadata(RESPONSE, metadata, metadata_len + int(clen))
×
224
            for data in body:
×
225
                RESPONSE.write(data)
×
226
            return ''
×
227

228
        # If we reached this point, body must be a string. We *must*
229
        # set the headers ourselves since _write_metadata won't get
230
        # called.
231
        self._set_headers(RESPONSE)
1✔
232
        return b'\n'.join((metadata, body))
1✔
233

234
    def _set_headers(self, RESPONSE):
1✔
235
        # Using RESPONSE.setHeader('Pragma', 'no-cache') would be better, but
236
        # this chokes crappy most MSIE versions when downloads happen on SSL.
237
        # cf. http://support.microsoft.com/support/kb/articles/q316/4/31.asp
238
        # RESPONSE.setHeader('Last-Modified', rfc1123_date())
239
        RESPONSE.setHeader('Content-Type', 'application/x-zope-edit')
1✔
240

241
        # We have to test the msie behaviour
242
        agent = self.REQUEST.get_header('User-Agent', '').lower()
1✔
243
        if agent and \
1✔
244
           ("msie" in agent or "microsoft internet explorer" in agent):
245
            RESPONSE.setHeader('Cache-Control',
1✔
246
                               'must-revalidate, post-check=0, pre-check=0')
247
            RESPONSE.setHeader('Pragma', 'public')
1✔
248
        else:
249
            RESPONSE.setHeader('Pragma', 'no-cache')
1✔
250
        now = rfc1123_date()
1✔
251
        RESPONSE.setHeader('Last-Modified', now)
1✔
252
        RESPONSE.setHeader('Expires', now)
1✔
253

254
    def _write_metadata(self, RESPONSE, metadata, length):
1✔
255
        # Set response content-type so that the browser gets hinted
256
        # about what application should handle this.
257
        self._set_headers(RESPONSE)
1✔
258

259
        # Set response length and write our metadata. The '+1' on the
260
        # content-length is the '\n' after the metadata.
261
        RESPONSE.setHeader('Content-Length', length + 1)
1✔
262
        RESPONSE.write(metadata)
1✔
263
        RESPONSE.write(b'\n')
1✔
264

265

266
InitializeClass(ExternalEditor)
1✔
267

268

269
def EditLink(self, object, borrow_lock=0, skip_data=0):
1✔
270
    """Insert the external editor link to an object if appropriate"""
271
    base = aq_base(object)
1✔
272
    user = getSecurityManager().getUser()
1✔
273
    editable = (hasattr(base, 'manage_FTPget') or
1✔
274
                hasattr(base, 'EditableBody') or
275
                hasattr(base, 'document_src') or
276
                hasattr(base, 'read'))
277
    if editable and user.has_permission(ExternalEditorPermission, object):
1✔
278
        query = {}
1✔
279
        # Add extension to URL so that the Client
280
        # launch the ZopeEditManager helper app
281
        # this is a workaround for limited MIME type
282
        ext = '.zem'
1✔
283
        if borrow_lock:
1✔
284
            query['borrow_lock'] = 1
1✔
285
        if skip_data:
1✔
286
            query['skip_data'] = 1
1✔
287
        url = "{}/externalEdit_/{}{}{}".format(
1✔
288
            aq_parent(aq_inner(object)).absolute_url(),
289
            urllib.parse.quote(object.getId()),
290
            ext, querystr(query))
291
        return ('<a href="%s" '
1✔
292
                'title="Edit using external editor">'
293
                '<img src="%s/misc_/ExternalEditor/edit_icon" '
294
                'align="middle" hspace="2" border="0" alt="External Editor" />'
295
                '</a>' % (url, object.REQUEST.BASEPATH1)
296
                )
297
    else:
298
        return ''
1✔
299

300

301
def querystr(d):
1✔
302
    """Create a query string from a dict"""
303
    if d:
1✔
304
        return '?' + '&'.join(
1✔
305
            ['{}={}'.format(name, val) for name, val in d.items()])
306
    else:
307
        return ''
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc