• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Products.ExternalEditor / 4274894119

pending completion
4274894119

push

github

GitHub
Config with pure python template (#21)

53 of 73 branches covered (72.6%)

Branch coverage included in aggregate %.

5 of 5 new or added lines in 2 files covered. (100.0%)

176 of 205 relevant lines covered (85.85%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.93
/src/Products/ExternalEditor/ExternalEditor.py
1
##############################################################################
2
#
3
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
4
# All Rights Reserved.
5
#
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
"""Zope External Editor Product by Casey Duncan."""
1✔
15

16
import urllib
1✔
17

18
from AccessControl.class_init import InitializeClass
1✔
19
from AccessControl.SecurityInfo import ClassSecurityInfo
1✔
20
from AccessControl.SecurityManagement import getSecurityManager
1✔
21
from Acquisition import Implicit
1✔
22
from Acquisition import aq_base
1✔
23
from Acquisition import aq_inner
1✔
24
from Acquisition import aq_parent
1✔
25
from OFS import Image
1✔
26
from OFS.Lockable import wl_isLocked
1✔
27
from zExceptions import BadRequest
1✔
28
from zope.datetime import rfc1123_date
1✔
29
from zope.interface import implementer
1✔
30
from ZPublisher.Iterators import IStreamIterator
1✔
31

32

33
# Name of the permission used to protect the ExternalEditor class and
# checked by EditLink() before an edit link is rendered.
ExternalEditorPermission = 'Use external editor'
1✔
34

35
# Module-level registry of callables added through registerCallback() and
# invoked, in registration order, by applyCallbacks().
_callbacks = []
1✔
36

37

38
@implementer(IStreamIterator)
class PDataStreamIterator:
    """Iterate over the payloads of a chain of ``Pdata`` chunks.

    The iterator is handed the first chunk of the chain.  Every call to
    ``__next__`` returns the ``data`` payload of the current chunk and
    then moves on to the following chunk, so the whole chain never has
    to be joined into one string.
    """

    def __init__(self, data):
        # ``data`` is the first chunk of the chain (or None for an
        # empty chain).
        self.data = data

    def __iter__(self):
        return self

    def __next__(self):
        """Return the payload of the current chunk and advance.

        Raises ``StopIteration`` once the end of the chain is reached.
        """
        if self.data is None:
            raise StopIteration
        data = self.data.data
        # Chunks are linked through their ``next`` attribute (see
        # OFS.Image.Pdata); they are not iterators themselves, so the
        # builtin next() cannot be used to advance along the chain.
        self.data = self.data.next
        return data
×
53

54

55
def registerCallback(cb):
    """Add ``cb`` to the list of External Editor callbacks.

    Registered callbacks are invoked when the External Editor is about
    to finish collecting the metadata of the file to be edited.  This
    gives them the chance to act on it, e.g. by inserting additional
    metadata headers or by enabling response compression.
    """
    _callbacks.append(cb)
1✔
63

64

65
def applyCallbacks(ob, metadata, request, response):
    """Invoke every registered callback, oldest registration first.

    Each callback receives ``(ob, metadata, request, response)`` and
    may do whatever it needs with them, such as appending further
    metadata attributes or setting response headers.
    """
    for callback in _callbacks:
        callback(ob, metadata, request, response)
1✔
73

74

75
class ExternalEditor(Implicit):
    """Create a response that encapsulates the data needed by the
       ZopeEdit helper application

    The response consists of ``name:value`` metadata lines, an empty
    line and, unless the request asks for ``skip_data``, the editable
    source of the target object.
    """

    security = ClassSecurityInfo()
    security.declareObjectProtected(ExternalEditorPermission)

    def __before_publishing_traverse__(self, self2, request):
        """Record the traversal target in the request.

        The last name on ``TraversalRequestNameStack`` is stored in the
        request as ``target`` (without a trailing ``.zem``) and the
        stack is then emptied in place.  When the stack is empty,
        ``target`` is set to ``None``.
        """
        path = request['TraversalRequestNameStack']
        if not path:
            request.set('target', None)
            return

        target = path[-1]
        if target.endswith('.zem'):
            # Remove extension added by EditLink()
            # so we can traverse to the target in Zope
            target = target[:-4]
        request.set('target', target)
        path[:] = []

    def index_html(self, REQUEST, RESPONSE, path=None):
        """Publish the object to the external editor helper app

        The object is looked up in the acquisition parent under the
        name stored in ``REQUEST['target']``, or traversed to via
        ``path`` when one is given.  Raises ``BadRequest`` when the
        object offers no way to obtain an editable body.
        """

        security = getSecurityManager()
        if path is None:
            parent = self.aq_parent
            target = REQUEST['target']
            try:
                ob = parent[target]  # Try getitem
            except (KeyError, AttributeError):
                # The fallbacks are nested on purpose: an AttributeError
                # raised by the getattr attempt must still reach the
                # ZClasses lookup, which a sibling ``except`` clause of
                # the same ``try`` statement can never do.
                try:
                    ob = getattr(parent, target)  # Try getattr
                except AttributeError:
                    # Handle objects that are methods in ZClasses
                    ob = parent.propertysheets.methods[target]
        else:
            ob = self.restrictedTraverse(path)

        # Collect the metadata lines; the order is kept stable.
        r = []
        r.append('url:%s' % ob.absolute_url())
        r.append('meta_type:%s' % ob.meta_type)

        title = getattr(aq_base(ob), 'title', None)
        if title is not None:
            if callable(title):
                title = title()
            r.append('title:%s' % title)

        if hasattr(aq_base(ob), 'content_type'):
            content_type = ob.content_type
            if callable(content_type):
                content_type = content_type()
            r.append('content_type:%s' % content_type)

        auth = REQUEST._auth
        if auth:
            # Drop a single trailing newline, if there is one.
            if auth[-1] == '\n':
                auth = auth[:-1]
            r.append('auth:%s' % auth)

        r.append('cookie:%s' % REQUEST.environ.get('HTTP_COOKIE', ''))

        if wl_isLocked(ob):
            # Object is locked, send down the lock token
            # owned by this user (if any)
            user_id = security.getUser().getId()
            for lock in ob.wl_lockValues():
                if not lock.isValid():
                    continue  # Skip invalid/expired locks
                creator = lock.getCreator()
                if creator and creator[1] == user_id:
                    # Found a lock for this user, so send it
                    r.append('lock-token:%s' % lock.getLockToken())
                    if REQUEST.get('borrow_lock'):
                        r.append('borrow_lock:1')
                    break

        # Apply any extra callbacks that might have been registered.
        applyCallbacks(ob, r, REQUEST, RESPONSE)

        # Finish metadata with an empty line.
        r.append('')
        metadata = '\n'.join(r).encode()
        metadata_len = len(metadata)

        # Check if we should send the file's data down the response.
        if REQUEST.get('skip_data'):
            # We've been requested to send only the metadata. The
            # client will presumably fetch the data itself.
            self._write_metadata(RESPONSE, metadata, metadata_len)
            return ''

        ob_data = getattr(aq_base(ob), 'data', None)
        if isinstance(ob_data, Image.Pdata):
            # We have a File instance with chunked data, lets stream it.
            #
            # Note we are setting the content-length header here. This
            # is a simplification. Read comment below.
            #
            # We assume that ob.get_size() will return the exact size
            # of the PData chain. If that assumption is broken we
            # might have problems. This is mainly an optimization. If
            # we read the whole PData chain just to compute the
            # correct size that could cause the whole file to be read
            # into memory.
            RESPONSE.setHeader('Content-Length', ob.get_size())
            # It is safe to use this PDataStreamIterator here because
            # it is consumed right below. This is only used to
            # simplify the code below so it only has to deal with
            # stream iterators or plain strings.
            body = PDataStreamIterator(ob.data)
        elif hasattr(ob, 'manage_FTPget'):
            # Calling manage_FTPget *might* have side-effects. For
            # example, in Archetypes it does set the 'content-type'
            # response header, which would end up overriding our own
            # content-type header because we've set it 'too
            # early'. We've moved setting the content-type header to
            # the '_write_metadata' method since, and any manipulation
            # of response headers should happen there, if possible.
            try:
                body = ob.manage_FTPget()
            except TypeError:  # some need the R/R pair!
                body = ob.manage_FTPget(REQUEST, RESPONSE)
        elif hasattr(ob, 'EditableBody'):
            body = ob.EditableBody()
        elif hasattr(ob, 'document_src'):
            body = ob.document_src(REQUEST, RESPONSE)
        elif hasattr(ob, 'read'):
            body = ob.read()
        elif ob_data is not None:
            body = ob_data
        else:
            # can't read it!
            raise BadRequest('Object does not support external editing')

        if IStreamIterator.providedBy(body):
            # We need to manage our content-length because we're streaming.
            # The content-length should have been set in the response by
            # the method that returns the iterator, but we need to fix it up
            # here because we insert metadata before the body.
            clen = RESPONSE.headers.get('content-length', None)
            if clen is None:
                # Raised explicitly (rather than via ``assert``) so the
                # check is not stripped when Python runs with -O; the
                # exception type is the one the assert used to raise.
                raise AssertionError(
                    'content-length must be set for a streamed body')
            self._write_metadata(RESPONSE, metadata, metadata_len + int(clen))
            for data in body:
                RESPONSE.write(data)
            return ''

        # If we reached this point, body must be a string. We *must*
        # set the headers ourselves since _write_metadata won't get
        # called.
        self._set_headers(RESPONSE)
        if isinstance(body, str):
            # The metadata has already been encoded to bytes; a text
            # body is encoded the same way so both can be joined.
            body = body.encode()
        return b'\n'.join((metadata, body))

    def _set_headers(self, RESPONSE):
        """Set the content type and the caching headers on RESPONSE."""
        # Using RESPONSE.setHeader('Pragma', 'no-cache') for every client
        # would be better, but this chokes most MSIE versions when
        # downloads happen on SSL.
        # cf. http://support.microsoft.com/support/kb/articles/q316/4/31.asp
        RESPONSE.setHeader('Content-Type', 'application/x-zope-edit')

        # We have to test the msie behaviour
        agent = self.REQUEST.get_header('User-Agent', '').lower()
        if "msie" in agent or "microsoft internet explorer" in agent:
            RESPONSE.setHeader('Cache-Control',
                               'must-revalidate, post-check=0, pre-check=0')
            RESPONSE.setHeader('Pragma', 'public')
        else:
            RESPONSE.setHeader('Pragma', 'no-cache')
        now = rfc1123_date()
        RESPONSE.setHeader('Last-Modified', now)
        RESPONSE.setHeader('Expires', now)

    def _write_metadata(self, RESPONSE, metadata, length):
        """Set the headers and write the metadata block to RESPONSE.

        ``metadata`` is the encoded metadata; ``length`` is the size of
        everything that is sent, not counting the newline written after
        the metadata.
        """
        # Set response content-type so that the browser gets hinted
        # about what application should handle this.
        self._set_headers(RESPONSE)

        # Set response length and write our metadata. The '+1' on the
        # content-length is the '\n' after the metadata.
        RESPONSE.setHeader('Content-Length', length + 1)
        RESPONSE.write(metadata)
        RESPONSE.write(b'\n')
1✔
260

261

262
# NOTE(review): presumably this activates the ``security`` declarations
# made in the ExternalEditor class body -- confirm against
# AccessControl.class_init.
InitializeClass(ExternalEditor)
1✔
263

264

265
def EditLink(self, object, borrow_lock=0, skip_data=0):
    """Insert the external editor link to an object if appropriate

    Returns the HTML for an icon link pointing at the ``externalEdit_``
    URL of ``object``, or an empty string when the object offers none
    of the supported source accessors or the current user lacks the
    external editor permission on it.  A true ``borrow_lock`` or
    ``skip_data`` adds the flag of the same name to the query string.
    """
    # Imported explicitly: the module-level ``import urllib`` alone does
    # not guarantee that the ``urllib.parse`` submodule has been loaded.
    from urllib.parse import quote

    base = aq_base(object)
    user = getSecurityManager().getUser()
    editable = (hasattr(base, 'manage_FTPget') or
                hasattr(base, 'EditableBody') or
                hasattr(base, 'document_src') or
                hasattr(base, 'read'))
    if not (editable and
            user.has_permission(ExternalEditorPermission, object)):
        return ''

    query = {}
    if borrow_lock:
        query['borrow_lock'] = 1
    if skip_data:
        query['skip_data'] = 1

    # Add an extension to the URL so that the client launches the
    # ZopeEditManager helper app; this is a workaround for limited
    # MIME type support.
    ext = '.zem'
    url = "{}/externalEdit_/{}{}{}".format(
        aq_parent(aq_inner(object)).absolute_url(),
        quote(object.getId()),
        ext, querystr(query))
    return ('<a href="%s" '
            'title="Edit using external editor">'
            '<img src="%s/misc_/ExternalEditor/edit_icon" '
            'align="middle" hspace="2" border="0" alt="External Editor" />'
            '</a>' % (url, object.REQUEST.BASEPATH1)
            )
1✔
295

296

297
def querystr(d):
    """Create a query string from a dict

    Returns ``'?name=value&...'`` with names and values URL-encoded,
    or an empty string when ``d`` is empty (or otherwise false).
    """
    # Function-scope import keeps this helper self-contained.
    from urllib.parse import urlencode

    if not d:
        return ''
    # urlencode() escapes reserved characters such as '&', '=' and
    # spaces, which plain string formatting would let through and
    # thereby corrupt the query string.
    return '?' + urlencode(d)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc