• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Products.CMFCore / 5850510583

pending completion
5850510583

Pull #130

github

drfho
fix lint test, whitespace

https://github.com/zopefoundation/Products.CMFCore/actions/runs/5850301279/job/15859553938?pr=130
Pull Request #130: ZMI: limit number of resulted folderish objects

2465 of 3663 branches covered (67.29%)

Branch coverage included in aggregate %.

8 of 8 new or added lines in 1 file covered. (100.0%)

17349 of 19306 relevant lines covered (89.86%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.32
/src/Products/CMFCore/DirectoryView.py
1
##############################################################################
2
#
3
# Copyright (c) 2001 Zope Foundation and Contributors.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE.
11
#
12
##############################################################################
13
""" Views of filesystem directories as folders.
1✔
14
"""
15

16
import logging
1✔
17
import os
1✔
18
import re
1✔
19
from sys import platform
1✔
20
from warnings import warn
1✔
21

22
from AccessControl.class_init import InitializeClass
1✔
23
from AccessControl.SecurityInfo import ClassSecurityInfo
1✔
24
from Acquisition import aq_inner
1✔
25
from Acquisition import aq_parent
1✔
26
from App.config import getConfiguration
1✔
27
from App.special_dtml import DTMLFile
1✔
28
from App.special_dtml import HTMLFile
1✔
29
from OFS.Folder import Folder
1✔
30
from OFS.ObjectManager import bad_id
1✔
31
from Persistence import Persistent
1✔
32
from zope.interface import implementer
1✔
33

34
from .FSMetadata import FSMetadata
1✔
35
from .FSObject import BadFile
1✔
36
from .interfaces import IDirectoryView
1✔
37
from .permissions import AccessContentsInformation as ACI
1✔
38
from .permissions import ManagePortal
1✔
39
from .utils import ProductsPath
1✔
40
from .utils import _dtmldir
1✔
41
from .utils import getPackageLocation
1✔
42
from .utils import getPackageName
1✔
43

44

45
logger = logging.getLogger('CMFCore.DirectoryView')
1✔
46

47
__reload_module__ = 0
1✔
48

49
# Ignore filesystem artifacts
50
base_ignore = ('.', '..')
1✔
51
# Ignore version control subdirectories
52
ignore = ('CVS', '.svn')
1✔
53
# Ignore suspected backups and hidden files
54
ignore_re = re.compile(r'\.|(.*~$)|#')
1✔
55

56

57
# and special names.
58
def _filtered_listdir(path, ignore):
1✔
59
    return [name for name in os.listdir(path)
1✔
60
            if name not in ignore and not ignore_re.match(name)]
61

62

63
class _walker:
1✔
64
    def __init__(self, ignore):
1✔
65
        # make a dict for faster lookup
66
        self.ignore = {x: None for x in ignore}
×
67

68
    def __call__(self, dirlist, dirname, names):
1✔
69
        # filter names inplace, so filtered directories don't get visited
70
        names[:] = [name for name in names
×
71
                    if name not in self.ignore and not ignore_re.match(name)]
72
        # append with stat info
73
        results = [(name, os.stat(os.path.join(dirname, name)).st_mtime)
×
74
                   for name in names]
75
        dirlist.extend(results)
×
76

77

78
def _generateKey(package, subdir):
1✔
79
    """Generate a key for a path inside a package.
80

81
    The key has the quality that keys for subdirectories can be derived by
82
    simply appending to the key.
83
    """
84
    return ':'.join((package, subdir.replace('\\', '/')))
1✔
85

86

87
def _findProductForPath(path, subdir=None):
1✔
88
    # like minimalpath, but raises an error if path is not inside a product
89
    p = os.path.abspath(path)
1✔
90
    for ppath in ProductsPath:
1!
91
        if p.startswith(ppath):
1!
92
            dirpath = p[len(ppath)+1:]
1✔
93
            parts = dirpath.replace('\\', '/').split('/', 1)
1✔
94
            parts.append('')
1✔
95
            if subdir:
1!
96
                subdir = '/'.join((parts[1], subdir))
×
97
                if subdir.startswith('/'):
×
98
                    subdir = subdir[1:]
×
99
            else:
100
                subdir = parts[1]
1✔
101
            return ('Products.' + parts[0], subdir)
1✔
102

103
    raise ValueError('Path is not inside a product')
×
104

105

106
class DirectoryInformation:
1✔
107
    data = None
1✔
108
    use_dir_mtime = True
1✔
109
    _v_last_read = 0
1✔
110
    _v_last_filelist = []  # Only used on Win32
1✔
111

112
    def __init__(self, filepath, reg_key, ignore=ignore):
1✔
113
        self._filepath = filepath
1✔
114
        self._reg_key = reg_key
1✔
115
        self.ignore = base_ignore + tuple(ignore)
1✔
116
        if platform == 'win32':
1!
117
            try:
×
118
                ntfs_detected = bool(os.stat(self._filepath).st_mtime % 1)
×
119
            except OSError:
×
120
                ntfs_detected = False
×
121
            if not ntfs_detected:
×
122
                self.use_dir_mtime = False
×
123
                self._walker = _walker(self.ignore)
×
124
        subdirs = []
1✔
125
        for entry in _filtered_listdir(self._filepath, ignore=self.ignore):
1✔
126
            entry_filepath = os.path.join(self._filepath, entry)
1✔
127
            if os.path.isdir(entry_filepath):
1✔
128
                subdirs.append(entry)
1✔
129
        self.subdirs = tuple(subdirs)
1✔
130

131
    def getSubdirs(self):
1✔
132
        return self.subdirs
1✔
133

134
    def _isAllowableFilename(self, entry):
1✔
135
        if entry[-1:] == '~':
1!
136
            return 0
×
137
        if entry[:1] in ('_', '#'):
1!
138
            return 0
×
139
        return 1
1✔
140

141
    def reload(self):
1✔
142
        self.data = None
1✔
143

144
    def _readTypesFile(self):
1✔
145
        """ Read the .objects file produced by FSDump.
146
        """
147
        types = {}
1✔
148
        try:
1✔
149
            f = open(os.path.join(self._filepath, '.objects'))
1✔
150
        except OSError:
1✔
151
            pass
1✔
152
        else:
153
            lines = f.readlines()
×
154
            f.close()
×
155
            for line in lines:
×
156
                try:
×
157
                    obname, meta_type = line.split(':')
×
158
                except ValueError:
×
159
                    pass
×
160
                else:
161
                    types[obname.strip()] = meta_type.strip()
×
162
        return types
1✔
163

164
    def _changed(self):
1✔
165
        if not getConfiguration().debug_mode:
1✔
166
            return 0
1✔
167
        mtime = 0.0
1✔
168
        filelist = []
1✔
169
        try:
1✔
170
            mtime = os.stat(self._filepath).st_mtime
1✔
171
            if not self.use_dir_mtime:
1!
172
                # some Windows directories don't change mtime
173
                # when a file is added to or deleted from them :-(
174
                # So keep a list of files as well, and see if that
175
                # changes
176
                os.path.walk(self._filepath, self._walker, filelist)
×
177
                filelist.sort()
×
178
        except Exception:
×
179
            logger.exception('Error checking for directory modification')
×
180

181
        if mtime != self._v_last_read or filelist != self._v_last_filelist:
1✔
182
            self._v_last_read = mtime
1✔
183
            self._v_last_filelist = filelist
1✔
184

185
            return 1
1✔
186

187
        return 0
1✔
188

189
    def getContents(self, registry):
1✔
190
        changed = self._changed()
1✔
191
        if self.data is None or changed:
1✔
192
            try:
1✔
193
                self.data, self.objects = self.prepareContents(registry)
1✔
194
            except Exception:
×
195
                logger.exception('Error during prepareContents')
×
196
                self.data = {}
×
197
                self.objects = ()
×
198

199
        return self.data, self.objects
1✔
200

201
    def prepareContents(self, registry):
1✔
202
        # Creates objects for each file.
203
        data = {}
1✔
204
        objects = []
1✔
205
        types = self._readTypesFile()
1✔
206
        for entry in _filtered_listdir(self._filepath, ignore=self.ignore):
1✔
207
            if not self._isAllowableFilename(entry):
1!
208
                continue
×
209
            entry_filepath = os.path.join(self._filepath, entry)
1✔
210
            if os.path.isdir(entry_filepath):
1✔
211
                # Add a subdirectory only if it was previously registered.
212
                entry_reg_key = '/'.join((self._reg_key, entry))
1✔
213
                info = registry.getDirectoryInfo(entry_reg_key)
1✔
214
                if info is not None:
1✔
215
                    # Folders on the file system have no extension or
216
                    # meta_type, as a crutch to enable customizing what gets
217
                    # created to represent a filesystem folder in a
218
                    # DirectoryView we use a fake type "FOLDER". That way
219
                    # other implementations can register for that type and
220
                    # circumvent the hardcoded assumption that all filesystem
221
                    # directories will turn into DirectoryViews.
222
                    mt = types.get(entry) or 'FOLDER'
1✔
223
                    t = registry.getTypeByMetaType(mt)
1✔
224
                    if t is None:
1✔
225
                        t = DirectoryView
1✔
226
                    metadata = FSMetadata(entry_filepath)
1✔
227
                    metadata.read()
1✔
228
                    ob = t(entry, entry_reg_key,
1✔
229
                           properties=metadata.getProperties())
230
                    ob_id = ob.getId()
1✔
231
                    data[ob_id] = ob
1✔
232
                    objects.append({'id': ob_id, 'meta_type': ob.meta_type})
1✔
233
            else:
234
                pos = entry.rfind('.')
1✔
235
                if pos >= 0:
1!
236
                    name = entry[:pos]
1✔
237
                    ext = os.path.normcase(entry[pos + 1:])
1✔
238
                else:
239
                    name = entry
×
240
                    ext = ''
×
241
                if not name or name == 'REQUEST':
1!
242
                    # Not an allowable id.
243
                    continue
×
244
                mo = bad_id(name)
1✔
245
                if mo is not None and mo != -1:  # Both re and regex formats
1!
246
                    # Not an allowable id.
247
                    continue
×
248
                t = None
1✔
249
                mt = types.get(entry, None)
1✔
250
                if mt is None:
1!
251
                    mt = types.get(name, None)
1✔
252
                if mt is not None:
1!
253
                    t = registry.getTypeByMetaType(mt)
×
254
                if t is None:
1!
255
                    t = registry.getTypeByExtension(ext)
1✔
256

257
                if t is not None:
1✔
258
                    metadata = FSMetadata(entry_filepath)
1✔
259
                    metadata.read()
1✔
260
                    try:
1✔
261
                        ob = t(name, entry_filepath, fullname=entry,
1✔
262
                               properties=metadata.getProperties())
263
                    except Exception:
×
264
                        import sys
×
265
                        import traceback
×
266
                        typ, val, tb = sys.exc_info()
×
267
                        try:
×
268
                            logger.exception('prepareContents')
×
269

270
                            exc_lines = traceback.format_exception(typ, val,
×
271
                                                                   tb)
272
                            ob = BadFile(name,
×
273
                                         entry_filepath,
274
                                         exc_str='\r\n'.join(exc_lines),
275
                                         fullname=entry)
276
                        finally:
277
                            tb = None   # Avoid leaking frame!
×
278

279
                    # FS-based security
280
                    permissions = metadata.getSecurity()
1✔
281
                    if permissions is not None:
1✔
282
                        for name in permissions:
1✔
283
                            acquire, roles = permissions[name]
1✔
284
                            try:
1✔
285
                                ob.manage_permission(name, roles, acquire)
1✔
286
                            except ValueError:
1✔
287
                                logger.exception('Error setting permissions')
1✔
288

289
                    # only DTML Methods and Python Scripts can have proxy roles
290
                    if hasattr(ob, '_proxy_roles'):
1✔
291
                        try:
1✔
292
                            ob._proxy_roles = tuple(metadata.getProxyRoles())
1✔
293
                        except Exception:
×
294
                            logger.exception('Error setting proxy role')
×
295

296
                    ob_id = ob.getId()
1✔
297
                    data[ob_id] = ob
1✔
298
                    objects.append({'id': ob_id, 'meta_type': ob.meta_type})
1✔
299

300
        return data, tuple(objects)
1✔
301

302

303
class DirectoryRegistry:
1✔
304

305
    def __init__(self):
1✔
306
        self._meta_types = {}
1✔
307
        self._object_types = {}
1✔
308
        self._directories = {}
1✔
309

310
    def registerFileExtension(self, ext, klass):
1✔
311
        self._object_types[ext] = klass
1✔
312

313
    def registerMetaType(self, mt, klass):
1✔
314
        self._meta_types[mt] = klass
1✔
315

316
    def getTypeByExtension(self, ext):
1✔
317
        return self._object_types.get(ext, None)
1✔
318

319
    def getTypeByMetaType(self, mt):
1✔
320
        return self._meta_types.get(mt, None)
1✔
321

322
    def registerDirectory(self, name, _prefix, subdirs=1, ignore=ignore):
1✔
323
        # This what is actually called to register a
324
        # file system directory to become a FSDV.
325
        package = getPackageName(_prefix)
1✔
326
        filepath = os.path.join(getPackageLocation(package), name)
1✔
327
        reg_key = _generateKey(package, name)
1✔
328
        self.registerDirectoryByKey(filepath, reg_key, subdirs, ignore)
1✔
329

330
    def registerDirectoryByKey(self, filepath, reg_key, subdirs=1,
1✔
331
                               ignore=ignore):
332
        info = DirectoryInformation(filepath, reg_key, ignore)
1✔
333
        self._directories[reg_key] = info
1✔
334
        if subdirs:
1!
335
            for entry in info.getSubdirs():
1✔
336
                entry_filepath = os.path.join(filepath, entry)
1✔
337
                entry_reg_key = '/'.join((reg_key, entry))
1✔
338
                self.registerDirectoryByKey(entry_filepath, entry_reg_key,
1✔
339
                                            subdirs, ignore)
340

341
    def reloadDirectory(self, reg_key):
1✔
342
        info = self.getDirectoryInfo(reg_key)
1✔
343
        if info is not None:
1!
344
            info.reload()
1✔
345

346
    def getDirectoryInfo(self, reg_key):
1✔
347
        # This is called when we need to get hold of the information
348
        # for a minimal path. Can return None.
349
        return self._directories.get(reg_key, None)
1✔
350

351
    def listDirectories(self):
1✔
352
        dirs = sorted(self._directories)
×
353
        return dirs
×
354

355

356
_dirreg = DirectoryRegistry()
1✔
357
registerDirectory = _dirreg.registerDirectory
1✔
358
registerFileExtension = _dirreg.registerFileExtension
1✔
359
registerMetaType = _dirreg.registerMetaType
1✔
360

361

362
def listFolderHierarchy(ob, path, rval, adding_meta_type=None, max=0, c=0):
1✔
363
    if not hasattr(ob, 'objectValues'):
×
364
        return
×
365
    for subob in ob.objectValues():
×
366
        c += 1
×
367
        base = getattr(subob, 'aq_base', subob)
×
368
        if getattr(base, 'isPrincipiaFolderish', 0):
×
369

370
            if adding_meta_type is not None and \
×
371
               hasattr(base, 'filtered_meta_types'):
372
                # Include only if the user is allowed to
373
                # add the given meta type in this location.
374
                meta_types = subob.filtered_meta_types()
×
375
                found = 0
×
376
                for mt in meta_types:
×
377
                    if mt['name'] == adding_meta_type:
×
378
                        found = 1
×
379
                        break
×
380
                if not found:
×
381
                    continue
×
382

383
            if path:
×
384
                subpath = path + '/' + subob.getId()
×
385
            else:
386
                subpath = subob.getId()
×
387
            title = getattr(subob, 'title', None)
×
388
            if title:
×
389
                title = re.sub(r'^(.{48}).*$', '\\g<1>...', str(title))
×
390
                name = f'{subpath} ({title})'
×
391
            else:
392
                name = subpath
×
393
            if max == 0 or c <= max:
×
394
                rval.append((subpath, name))
×
395
                listFolderHierarchy(subob, subpath, rval, adding_meta_type, max, c)
×
396

397

398
@implementer(IDirectoryView)
1✔
399
class DirectoryView(Persistent):
1✔
400

401
    """ Directory views mount filesystem directories.
402
    """
403

404
    meta_type = 'Filesystem Directory View'
1✔
405
    _dirpath = None
1✔
406
    _objects = ()
1✔
407

408
    def __init__(self, id, reg_key='', fullname=None, properties=None):
1✔
409
        if properties:
1✔
410
            # Since props come from the filesystem, this should be
411
            # safe.
412
            self.__dict__.update(properties)
1✔
413

414
        self.id = id
1✔
415
        self._dirpath = reg_key
1✔
416

417
    def __of__(self, parent):
1✔
418
        reg_key = self._dirpath
1✔
419
        info = _dirreg.getDirectoryInfo(reg_key)
1✔
420
        if info is None:
1✔
421
            # During GenericSetup a view will be created with an empty
422
            # reg_key. This is expected behaviour, so do not warn about it.
423
            if reg_key:
1✔
424
                warn('DirectoryView %s refers to a non-existing path %r' %
1✔
425
                     (self.id, reg_key), UserWarning)
426
            data = {}
1✔
427
            objects = ()
1✔
428
        else:
429
            data, objects = info.getContents(_dirreg)
1✔
430
        s = DirectoryViewSurrogate(self, data, objects)
1✔
431
        res = s.__of__(parent)
1✔
432
        return res
1✔
433

434
    def getId(self):
1✔
435
        return self.id
1✔
436

437

438
InitializeClass(DirectoryView)
1✔
439

440

441
@implementer(IDirectoryView)
1✔
442
class DirectoryViewSurrogate(Folder):
1✔
443

444
    """ Folderish DirectoryView.
445
    """
446

447
    meta_type = 'Filesystem Directory View'
1✔
448
    zmi_icon = 'far fa-folder-open'
1✔
449
    all_meta_types = ()
1✔
450

451
    security = ClassSecurityInfo()
1✔
452

453
    def __init__(self, real, data, objects):
1✔
454
        d = self.__dict__
1✔
455
        d.update(data)
1✔
456
        d.update(real.__dict__)
1✔
457
        d['_real'] = real
1✔
458
        d['_objects'] = objects
1✔
459

460
    def __setattr__(self, name, value):
1✔
461
        d = self.__dict__
1✔
462
        d[name] = value
1✔
463
        setattr(d['_real'], name, value)
1✔
464

465
    def __delattr__(self, name):
1✔
466
        d = self.__dict__
1✔
467
        del d[name]
1✔
468
        delattr(d['_real'], name)
1✔
469

470
    security.declareProtected(ManagePortal, 'manage_propertiesForm')
1✔
471
    manage_propertiesForm = DTMLFile('dirview_properties', _dtmldir)
1✔
472

473
    @security.protected(ManagePortal)
1✔
474
    def manage_properties(self, reg_key, REQUEST=None):
1✔
475
        """ Update the directory path of the DirectoryView.
476
        """
477
        self.__dict__['_real']._dirpath = reg_key
1✔
478
        if REQUEST is not None:
1!
479
            REQUEST['RESPONSE'].redirect('%s/manage_propertiesForm' %
×
480
                                         self.absolute_url())
481

482
    @security.protected(ACI)
1✔
483
    def getCustomizableObject(self):
1✔
484
        ob = aq_parent(aq_inner(self))
×
485
        while ob:
×
486
            if IDirectoryView.providedBy(ob):
×
487
                ob = aq_parent(ob)
×
488
            else:
489
                break
×
490
        return ob
×
491

492
    @security.protected(ACI)
1✔
493
    def listCustFolderPaths(self, adding_meta_type=None, max=0):
1✔
494
        """ List possible customization folders as key, value pairs.
495
        """
496
        rval = []
×
497
        ob = self.getCustomizableObject()
×
498
        listFolderHierarchy(ob, '', rval, adding_meta_type, max)
×
499
        rval.sort()
×
500
        return rval
×
501

502
    @security.protected(ACI)
1✔
503
    def getDirPath(self):
1✔
504
        return self.__dict__['_real']._dirpath
1✔
505

506
    @security.public
1✔
507
    def getId(self):
1✔
508
        return self.id
1✔
509

510

511
InitializeClass(DirectoryViewSurrogate)
1✔
512

513

514
manage_addDirectoryViewForm = HTMLFile('dtml/addFSDirView', globals())
1✔
515

516

517
def createDirectoryView(parent, reg_key, id=None):
1✔
518
    """ Add either a DirectoryView or a derivative object.
519
    """
520
    if not id:
1!
521
        id = reg_key.split('/')[-1]
×
522
    else:
523
        id = str(id)
1✔
524
    ob = DirectoryView(id, reg_key)
1✔
525
    parent._setObject(id, ob)
1✔
526

527

528
def addDirectoryViews(ob, name, _prefix):
1✔
529
    """ Add a directory view for every subdirectory of the given directory.
530

531
    Meant to be called by filesystem-based code. Note that registerDirectory()
532
    still needs to be called by product initialization code to satisfy
533
    persistence demands.
534
    """
535
    package = getPackageName(_prefix)
1✔
536
    reg_key = _generateKey(package, name)
1✔
537
    info = _dirreg.getDirectoryInfo(reg_key)
1✔
538
    if info is None:
1!
539
        raise ValueError('Not a registered directory: %s' % reg_key)
×
540
    for entry in info.getSubdirs():
1✔
541
        entry_reg_key = '/'.join((reg_key, entry))
1✔
542
        createDirectoryView(ob, entry_reg_key, entry)
1✔
543

544

545
def manage_addDirectoryView(self, reg_key, id=None, REQUEST=None):
1✔
546
    """ Add either a DirectoryView or a derivative object.
547
    """
548
    createDirectoryView(self, reg_key, id)
×
549
    if REQUEST is not None:
×
550
        return self.manage_main(self, REQUEST)
×
551

552

553
def manage_listAvailableDirectories(*args):
1✔
554
    """ List registered directories.
555
    """
556
    return list(_dirreg.listDirectories())
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc