• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Zope / 6263821954

21 Sep 2023 03:28PM UTC coverage: 82.146% (-0.01%) from 82.159%
6263821954

Pull #1164

github

web-flow
[pre-commit.ci lite] apply automatic fixes
Pull Request #1164: Move all linters to pre-commit.

4353 of 6963 branches covered (0.0%)

Branch coverage included in aggregate %.

493 of 493 new or added lines in 187 files covered. (100.0%)

27394 of 31684 relevant lines covered (86.46%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.35
/src/OFS/ObjectManager.py
1
##############################################################################
2
#
3
# Copyright (c) 2002 Zope Foundation and Contributors.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE.
11
#
12
##############################################################################
13
"""Object Manager."""
1✔
14

15
import html
1✔
16
import os
1✔
17
import re
1✔
18
from io import BytesIO
1✔
19
from logging import getLogger
1✔
20
from operator import itemgetter
1✔
21
from urllib.parse import quote
1✔
22

23
import zope.sequencesort
1✔
24
from AccessControl import ClassSecurityInfo
1✔
25
from AccessControl import getSecurityManager
1✔
26
from AccessControl.class_init import InitializeClass
1✔
27
from AccessControl.Permission import getPermissions
1✔
28
from AccessControl.Permissions import access_contents_information
1✔
29
from AccessControl.Permissions import delete_objects
1✔
30
from AccessControl.Permissions import ftp_access
1✔
31
from AccessControl.Permissions import import_export_objects
1✔
32
from AccessControl.Permissions import view_management_screens
1✔
33
from Acquisition import Implicit
1✔
34
from Acquisition import aq_acquire
1✔
35
from Acquisition import aq_base
1✔
36
from Acquisition import aq_parent
1✔
37
from App.config import getConfiguration
1✔
38
from App.FactoryDispatcher import ProductDispatcher
1✔
39
from App.Management import Navigation
1✔
40
from App.Management import Tabs
1✔
41
from App.special_dtml import DTMLFile
1✔
42
from DateTime import DateTime
1✔
43
from DateTime.interfaces import DateTimeError
1✔
44
from OFS.CopySupport import CopyContainer
1✔
45
from OFS.event import ObjectWillBeAddedEvent
1✔
46
from OFS.event import ObjectWillBeRemovedEvent
1✔
47
from OFS.interfaces import IObjectManager
1✔
48
from OFS.Lockable import LockableItem
1✔
49
from OFS.subscribers import compatibilityCall
1✔
50
from OFS.Traversable import Traversable
1✔
51
from Persistence import Persistent
1✔
52
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
1✔
53
from webdav.Collection import Collection
1✔
54
from webdav.NullResource import NullResource
1✔
55
from zExceptions import BadRequest
1✔
56
from zExceptions import ResourceLockedError
1✔
57
from zope.container.contained import notifyContainerModified
1✔
58
from zope.event import notify
1✔
59
from zope.interface import implementer
1✔
60
from zope.interface.interfaces import ComponentLookupError
1✔
61
from zope.lifecycleevent import ObjectAddedEvent
1✔
62
from zope.lifecycleevent import ObjectRemovedEvent
1✔
63
from ZPublisher.HTTPResponse import make_content_disposition
1✔
64

65

66
# Constants: __replaceable__ flags.  checkValidId() reads them as a bit mask
# from the ``__replaceable__`` attribute of an object that already answers
# to a requested id.
# Default: the existing object blocks re-use of its id.
NOT_REPLACEABLE = 0
# An object stored in the container itself may be replaced by a new one.
REPLACEABLE = 1
# An object that is only acquired (not stored locally) reserves its id.
UNIQUE = 2

LOG = getLogger('ObjectManager')
CONFIG = getConfiguration()

# the name BadRequestException is relied upon by 3rd-party code
BadRequestException = BadRequest

# We want a regex to define the lower ascii control bytes and DEL as bad.
# Additionally we want to exclude angle brackets and ampersand as they mess
# with the url-quoting.
# ``bad_id`` is the bound ``search`` method: it returns a match object when
# the id contains at least one forbidden character, otherwise None.
bad_id = re.compile(r'[\x00-\x1F\x7F<>&]').search
1✔
81

82

83
def checkValidId(self, id, allow_dup=0):
    # Validate ``id`` as the id of a sub-object of ``self``.
    #
    # Returns None when the id is acceptable and raises BadRequest
    # otherwise.
    #
    # If allow_dup is false, an error will be raised if an object
    # with the given id already exists. If allow_dup is true,
    # only check that the id string contains no illegal chars;
    # checkValidId() will be called again later with allow_dup
    # set to false before the object is added.
    if not (id and isinstance(id, str)):
        # Only strings can be escaped; anything else is reported as is.
        shown = html.escape(id, True) if isinstance(id, str) else id
        raise BadRequest('Empty or invalid id specified', shown)

    if bad_id(id) is not None:
        raise BadRequest(
            'The id "%s" contains characters illegal in URLs.'
            % html.escape(id, True))

    if id in ('.', '..'):
        raise BadRequest(
            'The id "%s" is invalid because it is not traversable.' % id)

    if id.startswith('_'):
        raise BadRequest(
            'The id "%s" is invalid because it begins with an underscore.'
            % id)

    if id.startswith('aq_'):
        raise BadRequest(
            'The id "%s" is invalid because it begins with "aq_".' % id)

    if id.endswith('__'):
        raise BadRequest(
            'The id "%s" is invalid because it ends with two underscores.'
            % id)

    if id.startswith(('@@', '++')):
        raise BadRequest(
            'The id "%s" is invalid because it starts with characters '
            'reserved for Zope views lookup.' % id)

    if not allow_dup:
        existing = getattr(self, id, None)
        if existing is not None:
            # An object by the given id exists either in this
            # ObjectManager or in the acquisition path.
            flags = getattr(existing, '__replaceable__', NOT_REPLACEABLE)
            stored_here = hasattr(aq_base(self), id)
            if stored_here and not flags & REPLACEABLE:
                # A local object may only be replaced when it says so;
                # the UNIQUE flag is irrelevant for local objects.
                raise BadRequest(
                    'The id "%s" is invalid - it is already in use.' % id)
            if not stored_here and flags & UNIQUE:
                raise BadRequest('The id "%s" is reserved.' % id)

    if id == 'REQUEST':
        raise BadRequest('REQUEST is a reserved name.')

    if '/' in id:
        raise BadRequest(
            'The id "%s" contains characters illegal in URLs.' % id)
135

136

137
class BeforeDeleteException(Exception):
    """Raise to veto deletion."""
1✔
139

140

141
class BreakoutException(Exception):
    """Raised to break out of loops."""
1✔
143

144

145
_marker = []
1✔
146

147

148
@implementer(IObjectManager)
1✔
149
class ObjectManager(
1✔
150
    CopyContainer,
151
    Navigation,
152
    Tabs,
153
    Implicit,
154
    Persistent,
155
    Collection,
156
    LockableItem,
157
    Traversable
158
):
159
    """Generic object manager.
160

161
    This class provides core behavior for collections of heterogeneous
162
    objects.
163
    """
164

165
    security = ClassSecurityInfo()
1✔
166
    security.declareObjectProtected(access_contents_information)
1✔
167
    security.setPermissionDefault(access_contents_information,
1✔
168
                                  ('Anonymous', 'Manager'))
169

170
    meta_type = 'Object Manager'
1✔
171

172
    meta_types = ()  # Sub-object types that are specific to this object
1✔
173

174
    _objects = ()
1✔
175

176
    security.declareProtected(view_management_screens, 'manage_main')  # NOQA: D001,E501
1✔
177
    manage_main = PageTemplateFile('zpt/main', globals())
1✔
178

179
    manage_index_main = DTMLFile('dtml/index_main', globals())
1✔
180

181
    manage_options = (
1✔
182
        {
183
            'label': 'Contents',
184
            'action': 'manage_main',
185
        },
186
    )
187

188
    isAnObjectManager = 1
1✔
189
    isPrincipiaFolderish = 1
1✔
190
    has_order_support = 0  # See OrderSupport.py
1✔
191

192
    # IPossibleSite API
193
    _components = None
1✔
194

195
    @security.public
    def getSiteManager(self):
        # Return the component registry stored in ``_components``; raise
        # ComponentLookupError while none has been set.
        components = self._components
        if components is None:
            raise ComponentLookupError('No component registry defined.')
        return components
1✔
200

201
    @security.protected('Manage Site')
    def setSiteManager(self, components):
        # Store ``components`` as the registry handed out by
        # getSiteManager().
        self._components = components
1✔
204

205
    def __class_init__(self):
        # ``self`` is the class being initialised here (note ``__bases__``).
        # Merge its own meta_types with those of its direct bases, drop
        # duplicates, sort by 'name' and store the result as a tuple.
        try:
            merged = list(self.meta_types)
        except Exception:
            merged = []
        for base in self.__bases__:
            # Bases without usable meta_types are skipped silently.
            try:
                for entry in base.meta_types:
                    if entry not in merged:
                        merged.append(entry)
            except Exception:
                pass
        merged.sort(key=itemgetter('name'))
        self.meta_types = tuple(merged)

        InitializeClass(self)
1✔
220

221
    def all_meta_types(self, interfaces=None):
        # Return the meta type entries that may be offered for adding to
        # this container.  ``interfaces`` optionally restricts the globally
        # registered entries to those providing one of the given interfaces.

        # A list of products registered elsewhere
        import Products

        # Look at all globally visible meta types.  With an interface
        # constraint every registered entry is a candidate, otherwise only
        # the ones flagged as globally visible.
        candidates = [
            entry for entry in getattr(Products, 'meta_types', ())
            if interfaces is not None
            or entry.get("visibility", None) == "Global"
        ]

        # Filter the list of external candidates based on the
        # specified interface constraint
        if interfaces is None:
            constrained = candidates
        else:
            constrained = []
            for entry in candidates:
                provided = entry.get('interfaces', None)
                if provided is None:
                    continue
                # any() stops at the first hit, so each entry is appended
                # at most once.
                if any(ei is i or ei.extends(i)
                       for ei in provided
                       for i in interfaces):
                    constrained.append(entry)

        # Meta types specified by this instance are not checked against the
        # interface constraint. This is as it always has been, but I'm not
        # sure it is correct.
        constrained.extend(list(self.meta_types))

        # Filter the list based on each meta-type's container_filter
        accepted = []
        for entry in constrained:
            container_filter = entry.get('container_filter', None)
            if container_filter is None or container_filter(self):
                accepted.append(entry)

        # Synthesize a new key into each item that decides whether to show
        # the modal add dialog in the Zope 4 ZMI
        for entry in accepted:
            instance = entry.get('instance', None)
            want_modal = getattr(instance, 'zmi_show_add_dialog', True)
            entry['zmi_show_add_dialog'] = 'modal' if want_modal else ''

        return accepted
1✔
273

274
    def _subobject_permissions(self):
        # Delegates to AccessControl's getPermissions().
        return getPermissions()
1✔
276

277
    def filtered_meta_types(self, user=None):
        # Return a list of the types for which the user has
        # adequate permission to add that type of object.
        # ``user`` is not consulted; the check always goes through the
        # current security manager.
        sm = getSecurityManager()
        candidates = self.all_meta_types
        if callable(candidates):
            candidates = candidates()
        # Entries without a 'permission' key are always included.
        return [
            meta_type for meta_type in candidates
            if 'permission' not in meta_type
            or sm.checkPermission(meta_type['permission'], self)
        ]
1✔
293

294
    _checkId = checkValidId
1✔
295

296
    def _setOb(self, id, object):
        # Low-level storage: the sub-object becomes an attribute named
        # ``id``.  No validation, bookkeeping or events happen here.
        setattr(self, id, object)
1✔
298

299
    def _delOb(self, id):
        # Low-level removal: drop the attribute named ``id``.  No
        # bookkeeping or events happen here.
        delattr(self, id)
1✔
301

302
    def _getOb(self, id, default=_marker):
        # Return the attribute ``id`` of this (unwrapped) object, looked up
        # through ``self``.  Without a ``default`` a miss raises
        # AttributeError.
        #
        # FIXME: what we really need to do here is ensure that only
        # sub-items are returned. That could have a measurable hit
        # on performance as things are currently implemented, so for
        # the moment we just make sure not to expose private attrs.
        is_private = id[:1] == '_'
        if not is_private and hasattr(aq_base(self), id):
            return getattr(self, id)
        if default is not _marker:
            return default
        raise AttributeError(id)
1✔
312

313
    @security.protected(access_contents_information)
    def hasObject(self, id):
        # Indicate whether the folder has an item by ID.
        #
        # This doesn't try to be more intelligent than _getOb, and doesn't
        # consult _objects (for performance reasons). The common use case
        # is to check that an object does *not* exist.
        if id in ('.', '..'):
            return False
        if id.startswith(('_', 'aq_')) or id.endswith('__'):
            return False
        return getattr(aq_base(self), id, None) is not None
1✔
326

327
    def _setObject(self, id, object, roles=None, user=None, set_owner=1,
                   suppress_events=False):
        """Set an object into this container.

        Also sends IObjectWillBeAddedEvent and IObjectAddedEvent, unless
        ``suppress_events`` is true. Returns the id the object was stored
        under.
        """
        ob = object  # better name, keep original function signature
        checked_id = self._checkId(id)
        if checked_id is not None:
            # _checkId may hand back a replacement id.
            id = checked_id
        meta_type = getattr(ob, 'meta_type', None)

        # If an object by the given id already exists, remove it.
        if any(info['id'] == id for info in self._objects):
            self._delObject(id)

        if not suppress_events:
            notify(ObjectWillBeAddedEvent(ob, self, id))

        new_entry = {'id': id, 'meta_type': meta_type}
        self._objects = self._objects + (new_entry,)
        self._setOb(id, ob)
        # Fetch the object again through the container.
        ob = self._getOb(id)

        if set_owner:
            # TODO: eventify manage_fixupOwnershipAfterAdd
            # This will be called for a copy/clone, or a normal _setObject.
            ob.manage_fixupOwnershipAfterAdd()

            # Try to give user the local role "Owner", but only if
            # no local roles have been set on the object yet.
            if getattr(ob, '__ac_local_roles__', _marker) is None:
                user = getSecurityManager().getUser()
                userid = user.getId() if user is not None else None
                if userid is not None:
                    ob.manage_setLocalRoles(userid, ['Owner'])

        if not suppress_events:
            notify(ObjectAddedEvent(ob, self, id))
            notifyContainerModified(self)

        compatibilityCall('manage_afterAdd', ob, ob, self)

        return id
1✔
373

374
    def manage_afterAdd(self, item, container):
        # Deliberately a no-op hook.
        # Don't do recursion anymore, a subscriber does that.
        pass
    # NOTE(review): marker attribute, presumably read by the code that
    # dispatches the legacy hooks - confirm.
    manage_afterAdd.__five_method__ = True
1✔
378

379
    def manage_afterClone(self, item):
        # Deliberately a no-op hook.
        # Don't do recursion anymore, a subscriber does that.
        pass
    # NOTE(review): marker attribute, presumably read by the code that
    # dispatches the legacy hooks - confirm.
    manage_afterClone.__five_method__ = True
1✔
383

384
    def manage_beforeDelete(self, item, container):
        # Deliberately a no-op hook.
        # Don't do recursion anymore, a subscriber does that.
        pass
    # NOTE(review): marker attribute, presumably read by the code that
    # dispatches the legacy hooks - confirm.
    manage_beforeDelete.__five_method__ = True
1✔
388

389
    def _delObject(self, id, dp=1, suppress_events=False):
        """Delete an object from this container.

        Also sends IObjectWillBeRemovedEvent and IObjectRemovedEvent,
        unless ``suppress_events`` is true. ``dp`` is accepted but not
        used.
        """
        ob = self._getOb(id)

        compatibilityCall('manage_beforeDelete', ob, ob, self)

        if not suppress_events:
            notify(ObjectWillBeRemovedEvent(ob, self, id))

        # Drop the bookkeeping entry first, then the attribute itself.
        self._objects = tuple(
            info for info in self._objects if info['id'] != id)
        self._delOb(id)

        # Indicate to the object that it has been deleted. This is
        # necessary for object DB mount points. Note that we have to
        # tolerate failure here because the object being deleted could
        # be a Broken object, and it is not possible to set attributes
        # on Broken objects.
        try:
            ob._v__object_deleted__ = 1
        except Exception:
            pass

        if not suppress_events:
            notify(ObjectRemovedEvent(ob, self, id))
            notifyContainerModified(self)
1✔
418

419
    @security.protected(access_contents_information)
    def objectIds(self, spec=None):
        # Returns a list of subobject ids of the current object.
        # If 'spec' is specified, returns objects whose meta_type
        # matches 'spec'.  'spec' is a single meta_type name or a
        # container of names.
        if spec is None:
            return [info['id'] for info in self._objects]
        if isinstance(spec, str):
            spec = [spec]
        return [
            info['id'] for info in self._objects
            if info['meta_type'] in spec
        ]
1✔
433

434
    @security.protected(access_contents_information)
    def objectValues(self, spec=None):
        # Returns a list of actual subobjects of the current object.
        # If 'spec' is specified, returns only objects whose meta_type
        # match 'spec'.
        return list(map(self._getOb, self.objectIds(spec)))
1✔
440

441
    @security.protected(access_contents_information)
    def objectItems(self, spec=None):
        # Returns a list of (id, subobject) tuples of the current object.
        # If 'spec' is specified, returns only objects whose meta_type match
        # 'spec'
        pairs = []
        for oid in self.objectIds(spec):
            pairs.append((oid, self._getOb(oid)))
        return pairs
1✔
447

448
    def objectMap(self):
        # Return a tuple of mappings containing subobject meta-data
        # The entries are copies, so changing them does not touch
        # ``_objects``.
        return tuple(d.copy() for d in self._objects)
×
451

452
    @security.protected(access_contents_information)
    def objectIds_d(self, t=None):
        # Like objectIds(t), but without any id listed in the optional
        # ``_reserved_names`` attribute.
        reserved = getattr(self, '_reserved_names', ())
        ids = self.objectIds(t)
        if not reserved:
            return ids
        return [oid for oid in ids if oid not in reserved]
×
465

466
    @security.protected(access_contents_information)
    def objectValues_d(self, t=None):
        # The subobjects behind objectIds_d(t), in the same order.
        return [self._getOb(oid) for oid in self.objectIds_d(t)]
×
469

470
    @security.protected(access_contents_information)
    def objectItems_d(self, t=None):
        # (id, subobject) pairs for the ids returned by objectIds_d(t).
        return [(oid, self._getOb(oid)) for oid in self.objectIds_d(t)]
×
476

477
    @security.protected(access_contents_information)
    def objectMap_d(self, t=None):
        # Return copies of the subobject meta-data mappings, leaving out
        # every id listed in the optional ``_reserved_names`` attribute.
        # ``t`` is accepted but not used.
        #
        # Both branches hand out copies, as objectMap() does, so that
        # callers cannot modify the container's internal ``_objects``
        # entries by accident.  The container types of the two branches
        # (tuple without reserved names, list with) are kept as they were.
        reserved = getattr(self, '_reserved_names', ())
        if not reserved:
            return tuple(d.copy() for d in self._objects)
        return [d.copy() for d in self._objects if d['id'] not in reserved]
×
490

491
    @security.protected(access_contents_information)
    def superValues(self, t):
        # Return all of the objects of a given type located in
        # this object and containing objects.
        # 't' is one meta_type name or a container of names.  At most
        # 100 levels of containers are visited.
        if isinstance(t, str):
            t = (t,)
        obj = self
        seen = set()
        vals = []
        relative_path = ()
        for _level in range(100):
            if not hasattr(obj, '_getOb'):
                break
            get = obj._getOb
            if hasattr(obj, '_objects'):
                for info in obj._objects:
                    # Entries that cannot be read or fetched are skipped.
                    try:
                        id = info['id']
                        path = relative_path + (id,)
                        if path not in seen and info['meta_type'] in t:
                            vals.append(get(id))
                            seen.add(path)
                    except Exception:
                        pass

            if not hasattr(obj, '__parent__'):
                break
            # Continue with the containing object.
            obj = aq_parent(obj)
            relative_path = ('..',) + relative_path
        return vals
×
524

525
    manage_addProduct = ProductDispatcher()
1✔
526

527
    @security.protected(delete_objects)
    def manage_delObjects(self, ids=(), REQUEST=None):
        """Delete a subordinate object.

        The objects specified in 'ids' get deleted. 'ids' is a single id
        or a sequence of ids; the sequence passed in is left untouched.

        Raises BadRequest when no id is given, when an id is listed in
        '_reserved_names' or when an id does not exist, and
        ResourceLockedError when an object to delete is locked.
        """
        if isinstance(ids, str):
            ids = [ids]
        if not ids:
            raise BadRequest('No items specified')
        # Work on a private copy so the caller's sequence is not emptied
        # as a side effect (and so tuples are accepted as well).
        ids = list(ids)

        reserved = getattr(self, '_reserved_names', ())
        for name in ids:
            if name in reserved:
                raise BadRequest('Not Deletable')

        # Objects are deleted last-to-first, as before.
        for id in reversed(ids):
            # ``self`` serves as the "not found" marker.
            ob = self._getOb(id, self)
            # Check existence before looking at locks: otherwise a missing
            # id would query the lock state of the container itself and
            # could report the container as locked.
            if ob is self:
                raise BadRequest('%s does not exist' %
                                 html.escape(id, True))

            # Objects without lock support count as unlocked.
            try:
                locked = ob.wl_isLocked()
            except AttributeError:
                locked = False
            if locked:
                raise ResourceLockedError(
                    'Object "%s" is locked.' % ob.getId())

            self._delObject(id)

        if REQUEST is not None:
            return self.manage_main(self, REQUEST)
×
562

563
    @security.protected(access_contents_information)
    def tpValues(self):
        # Return a list of subobjects, used by tree tag.
        result = []
        if hasattr(aq_base(self), 'tree_ids'):
            # An explicit 'tree_ids' attribute decides what is shown.
            tree_ids = self.tree_ids
            try:
                tree_ids = list(tree_ids)
            except TypeError:
                pass
            if hasattr(tree_ids, 'sort'):
                tree_ids.sort()
            for id in tree_ids:
                if hasattr(self, id):
                    result.append(self._getOb(id))
            return result

        # Otherwise: every folderish subobject, ordered by id.
        for id in sorted(self.objectIds()):
            child = self._getOb(id)
            if hasattr(aq_base(child), 'isPrincipiaFolderish') and \
               child.isPrincipiaFolderish:
                result.append(child)
        return result
1✔
586

587
    @security.protected(import_export_objects)
    def manage_exportObject(
        self,
        id='',
        download=None,
        RESPONSE=None,
        REQUEST=None
    ):
        """Exports an object to a file and returns that file."""
        if id:
            ob = self._getOb(id)
        else:
            # No id given: export this object itself.
            # can't use getId() here (breaks on "old" exported objects)
            id = self.id
            if getattr(id, '__func__', None) is not None:
                id = id()
            ob = self

        filename = f'{id}.zexp'

        if download:
            # Export into memory and return the bytes.
            with BytesIO() as buffer:
                ob._p_jar.exportFile(ob._p_oid, buffer)
                result = buffer.getvalue()

            if RESPONSE is not None:
                RESPONSE.setHeader('Content-type', 'application/data')
                disposition = make_content_disposition('inline', filename)
                RESPONSE.setHeader('Content-Disposition', disposition)
            return result

        # Otherwise write the export file into CONFIG.clienthome.
        path = os.path.join(CONFIG.clienthome, filename)
        with open(path, 'w+b') as fd:
            ob._p_jar.exportFile(ob._p_oid, fd)

        if REQUEST is not None:
            message = f'"{id}" successfully exported to "{path}"'
            return self.manage_main(
                self, REQUEST,
                manage_tabs_message=message,
                title='Object exported'
            )
630

631
    security.declareProtected(import_export_objects, 'manage_importExportForm')  # NOQA: D001,E501
1✔
632
    manage_importExportForm = DTMLFile('dtml/importExport', globals())
1✔
633

634
    @security.protected(import_export_objects)
    def manage_importObject(self, file, REQUEST=None, set_owner=1,
                            suppress_events=False):
        """Import an object from a file."""
        # Only bare file names are accepted, never paths.
        dirname, file = os.path.split(file)
        if dirname:
            raise BadRequest('Invalid file name %s' % html.escape(file, True))

        # The first import directory containing the file wins.
        filepath = None
        for impath in self._getImportPaths():
            candidate = os.path.join(impath, 'import', file)
            if os.path.exists(candidate):
                filepath = candidate
                break
        if filepath is None:
            raise BadRequest(
                'File does not exist: %s' % html.escape(file, True))

        imported = self._importObjectFromFile(
            filepath,
            verify=bool(REQUEST),
            set_owner=set_owner,
            suppress_events=suppress_events,
        )
        getId = getattr(aq_base(imported), "getId", None)  # aq wrapped
        if getId is not None:
            id = imported.getId()
        else:
            id = imported.id
        if getattr(id, '__func__', None) is not None:
            id = id()

        if REQUEST is not None:
            return self.manage_main(
                self,
                REQUEST,
                manage_tabs_message='"%s" successfully imported' % id,
                title='Object imported',
                update_menu=1
            )
668

669
    def _importObjectFromFile(self, filepath, verify=1, set_owner=1,
                              suppress_events=False):
        # Import the object stored in ``filepath``, add it to this
        # container under its own id and return it.

        # locate a valid connection, walking up the parents if needed
        holder = self
        connection = holder._p_jar
        while connection is None:
            holder = aq_parent(holder)
            connection = holder._p_jar

        ob = connection.importFile(filepath)
        if verify:
            self._verifyObjectPaste(ob, validate_src=0)

        getId = getattr(ob, "getId", None)  # not acquisition wrapped
        if getId is not None:
            id = getId()
        else:
            id = ob.id
        if getattr(id, '__func__', None) is not None:
            id = id()
        self._setObject(id, ob, set_owner=set_owner,
                        suppress_events=suppress_events)

        # try to make ownership implicit if possible in the context
        # that the object was imported into.
        ob = self._getOb(id)
        ob.manage_changeOwnershipType(explicit=0)
        return ob
1✔
693

694
    def _getImportPaths(self):
        # Base directories searched for an 'import' folder: instancehome
        # first, then clienthome unless it is the same path.
        paths = [CONFIG.instancehome]
        if CONFIG.clienthome not in paths:
            paths.append(CONFIG.clienthome)
        return paths
1✔
701

702
    def list_imports(self):
        # Sorted names of all '.zexp' and '.xml' files found in the
        # 'import' directories below the import paths.
        listing = []
        for impath in self._getImportPaths():
            directory = os.path.join(impath, 'import')
            if os.path.isdir(directory):
                listing.extend(
                    name for name in os.listdir(directory)
                    if name.endswith(('.zexp', '.xml')))
        return sorted(listing)
1✔
712

713
    @security.protected(ftp_access)
    def manage_hasId(self, REQUEST):
        """Check if the folder has an object with REQUEST['id'].

        Deprecated. Returns None when the id exists and raises KeyError
        otherwise.
        """
        # When this method will be removed, please also remove
        # `ftp_access` both from `AppendixA` from the Zope Developers
        # Book and from `AccessControl.Permissions`
        import warnings

        # stacklevel=2 attributes the warning to the caller, which is the
        # code that needs to change.
        warnings.warn(
            "`ObjectManager.manage_hasId` is deprecated "
            "and will be removed in future.",
            DeprecationWarning,
            stacklevel=2)
        if REQUEST['id'] not in self.objectIds():
            raise KeyError(REQUEST['id'])
×
726

727
    def __delitem__(self, name):
        # ``del container[name]`` goes through manage_delObjects and so
        # applies the same checks.
        return self.manage_delObjects(ids=[name])
1✔
729

730
    def __getitem__(self, key):
        # Return the subobject registered under ``key``.
        if key in self:
            return self._getOb(key, None)

        # Unknown key: WebDAV clients using a method other than GET or
        # POST get a NullResource instead of an error.
        request = getattr(self, 'REQUEST', None)
        if request is not None and not isinstance(request, str):
            method = request.get('REQUEST_METHOD', 'GET')
            if request.maybe_webdav_client and method not in ('GET', 'POST'):
                return NullResource(self, key, request).__of__(self)

        raise KeyError(key)
1✔
741

742
    def __setitem__(self, key, value):
        # ``container[key] = value`` adds the object through _setObject,
        # using that method's default arguments.
        return self._setObject(key, value)
1✔
744

745
    def __contains__(self, name):
        # True when ``name`` is the id of an entry in ``_objects``.
        return any(name == info['id'] for info in self._objects)
1✔
751

752
    def __iter__(self):
        # Iterating a container yields the ids of its subobjects.
        return iter(self.objectIds())
1✔
754

755
    def __len__(self):
        # Number of subobject ids.
        return len(self.objectIds())
1✔
757

758
    def __bool__(self):
        # Always true: without this, truth testing would fall back to
        # __len__ and an empty container would count as false.
        return True
1✔
760

761
    @security.protected(access_contents_information)
    def get(self, key, default=None):
        # Mapping-style lookup: the subobject for ``key`` or ``default``.
        return self._getOb(key, default) if key in self else default
1✔
766

767
    @security.protected(access_contents_information)
    def keys(self):
        # Mapping-style alias for objectIds().
        return self.objectIds()
1✔
770

771
    @security.protected(access_contents_information)
    def items(self):
        # Mapping-style alias for objectItems().
        return self.objectItems()
1✔
774

775
    @security.protected(access_contents_information)
    def values(self):
        # Mapping-style alias for objectValues().
        return self.objectValues()
1✔
778

779
    @security.protected(access_contents_information)
    def compute_size(self, ob):
        # Return the size of ``ob`` as display text, based on its
        # get_size() result: '1 KiB' for anything below 1024 bytes, whole
        # KiB below one MiB, and MiB with two decimals from one MiB on.
        # Objects without get_size() yield None.
        if not hasattr(aq_base(ob), 'get_size'):
            return None
        ob_size = ob.get_size()
        if ob_size < 1024:
            return '1 KiB'
        # ``>=`` so that exactly one MiB reads '1.00 MiB', not '1024 KiB'.
        if ob_size >= 1048576:
            return f"{ob_size / 1048576.0:0.02f} MiB"
        return f"{ob_size / 1024.0:0.0f} KiB"
×
789

790
    @security.protected(access_contents_information)
    def last_modified(self, ob):
        # Format the ``_p_mtime`` of ``ob`` as 'YYYY-MM-DD HH:MM'; objects
        # without a usable modification time yield an empty string.
        try:
            modified = DateTime(ob._p_mtime)
            return modified.strftime("%Y-%m-%d %H:%M")
        except (DateTimeError, AttributeError):
            return ''
×
796

797
    @security.protected(view_management_screens)
    def manage_get_sortedObjects(self, sortkey, revkey):
        """Return dictionaries used for the management page, sorted by sortkey
        (which is 'id' or an attribute of the objects).

        The direction is determined by revkey, which can be 'asc' for
        ascending or 'desc' for descending. It returns a list of
        dictionaries, with keys 'id', 'quoted_id' and 'obj', where 'id'
        is the ID of the object as known by the parent, 'quoted_id' is
        that ID URL-quoted and 'obj' is the child object.
        """
        # Unknown sort keys fall back to sorting by id.
        if sortkey not in ('position', 'title', 'meta_type', 'get_size',
                           '_p_mtime'):
            sortkey = 'id'

        needs_attribute = sortkey not in ('id', 'position')
        items = []
        for id, obj in self.objectItems():
            item = {'id': id, 'quoted_id': quote(id), 'obj': obj}
            if needs_attribute and hasattr(obj, sortkey):
                # add the attribute by which we need to sort
                item[sortkey] = getattr(obj, sortkey)
            items.append(item)

        if sortkey == 'position':
            # native ordering of Ordered Folders
            return items[::-1] if revkey == 'desc' else items

        if sortkey in ('id', 'title', 'meta_type'):
            sort_func = 'strcoll'
        else:
            sort_func = 'cmp'

        sorted_items = zope.sequencesort.sort(
            items,
            ((sortkey, sort_func, revkey), ),
            mapping=1
        )

        # remove the additional attribute
        public_keys = ('id', 'quoted_id', 'obj')
        return [
            {key: item[key] for key in public_keys}
            for item in sorted_items
        ]
846

847
    @security.protected(view_management_screens)
    def getBookmarkableURLs(self):
        """Helper method to expose a configuration flag."""
        # The flag defaults to True when the configuration has no
        # 'zmi_bookmarkable_urls' setting.
        return getattr(CONFIG, 'zmi_bookmarkable_urls', True)
1✔
851

852
# Don't InitializeClass, there is a specific __class_init__ on ObjectManager
853
# InitializeClass(ObjectManager)
854

855

856
def findChildren(obj, dirname=''):
    """Recursive walk through the object hierarchy to find all children of an
    object (ajung)

    Returns a list of (path, child) tuples for the non-folderish
    descendants of ``obj``; each path is ``dirname`` followed by the
    '/'-joined ids leading to the child.
    """
    found = []
    for name, child in obj.objectItems():
        prefix = dirname + obj.getId() + '/'
        is_folderish = (
            hasattr(aq_base(child), 'isPrincipiaFolderish')
            and child.isPrincipiaFolderish
        )
        if is_folderish:
            # Folders are not listed themselves, only their contents.
            found.extend(findChildren(child, prefix))
        else:
            found.append((prefix + name, child))

    return found
1✔
869

870

871
class IFAwareObjectManager:
    # Mixin whose all_meta_types() applies a default interface constraint
    # taken from a ``_product_interfaces`` attribute.

    def all_meta_types(self, interfaces=None):
        # Without an explicit constraint, use ``_product_interfaces``:
        # first as a plain attribute, then through acquisition.  If neither
        # is available the call is made without a constraint.

        if interfaces is None:
            if hasattr(self, '_product_interfaces'):
                interfaces = self._product_interfaces
            elif hasattr(self, 'aq_acquire'):
                try:
                    interfaces = aq_acquire(self, '_product_interfaces')
                except Exception:
                    # Best effort: nothing to acquire means no constraint.
                    pass

        return ObjectManager.all_meta_types(self, interfaces)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc