• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Zope / 6263629025

21 Sep 2023 03:12PM UTC coverage: 82.146% (-0.01%) from 82.159%
6263629025

Pull #1164

github

web-flow
[pre-commit.ci lite] apply automatic fixes
Pull Request #1164: Move all linters to pre-commit.

4353 of 6963 branches covered (62.52%)

Branch coverage included in aggregate %.

487 of 487 new or added lines in 186 files covered. (100.0%)

27394 of 31684 relevant lines covered (86.46%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.47
/src/OFS/Cache.py
1
##############################################################################
2
#
3
# Copyright (c) 2002 Zope Foundation and Contributors.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE
11
#
12
##############################################################################
13
"""Cacheable object and cache management base classes."""
1✔
14

15
import sys
1✔
16
import time
1✔
17
from logging import getLogger
1✔
18

19
from AccessControl.class_init import InitializeClass
1✔
20
from AccessControl.Permissions import view_management_screens
1✔
21
from AccessControl.SecurityInfo import ClassSecurityInfo
1✔
22
from AccessControl.SecurityManagement import getSecurityManager
1✔
23
from AccessControl.unauthorized import Unauthorized
1✔
24
from Acquisition import aq_acquire
1✔
25
from Acquisition import aq_base
1✔
26
from Acquisition import aq_get
1✔
27
from Acquisition import aq_inner
1✔
28
from Acquisition import aq_parent
1✔
29
from App.special_dtml import DTMLFile
1✔
30

31

32
ZCM_MANAGERS = '__ZCacheManager_ids__'
1✔
33

34
ViewManagementScreensPermission = view_management_screens
1✔
35
ChangeCacheSettingsPermission = 'Change cache settings'
1✔
36
LOG = getLogger('Cache')
1✔
37

38

39
def isCacheable(ob):
    """Return the ``_isCacheable`` flag of *ob*, or 0 when it has none.

    The flag is read from ``aq_base(ob)`` rather than from *ob* itself.
    """
    unwrapped = aq_base(ob)
    return getattr(unwrapped, '_isCacheable', 0)
×
41

42

43
def managersExist(ob):
    """Return 1 if any CacheManagers exist in the context of ob, else 0."""
    # The lookup result is only tested for truth, so an empty id list
    # counts the same as a missing one.
    return 1 if aq_get(ob, ZCM_MANAGERS, None, 1) else 0
1✔
48

49

50
def filterCacheTab(ob):
    """Filter for the 'Cache' management tab.

    Returns whatever ``managersExist(ob)`` returns (1 or 0).
    """
    has_managers = managersExist(ob)
    return has_managers
1✔
52

53

54
def filterCacheManagers(orig, container, name, value, extra):
    """This is a filter method for aq_acquire.

    It causes objects to be found only if they are in the list of cache
    managers: returns 1 when ``container`` itself carries the manager id
    list and ``name`` is one of the listed ids, and 0 otherwise.  The
    ``orig``, ``value`` and ``extra`` arguments are not used.
    """
    # The id list must live on the container itself, hence aq_base().
    if not hasattr(aq_base(container), ZCM_MANAGERS):
        return 0
    if name not in getattr(container, ZCM_MANAGERS):
        return 0
    return 1
×
64

65

66
def getVerifiedManagerIds(container):
    """Gets the list of cache managers in a container, verifying each one.

    Reads the registered ids from ``container`` (an empty sequence when
    the attribute is missing) and keeps only those ids whose attribute on
    ``container`` has a true ``_isCacheManager`` flag.  Always returns a
    tuple, in registration order.
    """
    registered = getattr(container, ZCM_MANAGERS, ())
    return tuple(
        manager_id
        for manager_id in registered
        if getattr(
            getattr(container, manager_id, None), '_isCacheManager', 0)
    )
1✔
74

75

76
# Anytime a CacheManager is added or removed, all _v_ZCacheable_cache
77
# attributes must be invalidated.  manager_timestamp is a way to do
78
# that.
79
manager_timestamp = 0
1✔
80

81

82
class Cacheable:
    """Mix-in for cacheable objects."""

    manage_options = (
        {
            'label': 'Cache',
            'action': 'ZCacheable_manage',
            'filter': filterCacheTab,
        },
    )

    security = ClassSecurityInfo()
    security.setPermissionDefault(ChangeCacheSettingsPermission, ('Manager',))

    security.declareProtected(ViewManagementScreensPermission, 'ZCacheable_manage')  # NOQA: D001,E501
    ZCacheable_manage = DTMLFile('dtml/cacheable', globals())

    # Volatile reference to the cache object and the value of the
    # module-level manager_timestamp at the moment it was stored.
    _v_ZCacheable_cache = None
    _v_ZCacheable_manager_timestamp = 0
    # Id of the associated cache manager (None: not associated) and the
    # per-object "caching enabled" flag.  Both names are class-private.
    __manager_id = None
    __enabled = True
    _isCacheable = True

    @security.private
    def ZCacheable_getManager(self):
        """Returns the currently associated cache manager.

        Returns None when no manager id is set or when the manager
        cannot be acquired.
        """
        manager_id = self.__manager_id
        if manager_id is None:
            return None
        try:
            return aq_acquire(
                self,
                manager_id,
                containment=1,
                filter=filterCacheManagers,
                extra=None,
                default=None
            )
        except AttributeError:
            return None

    @security.private
    def ZCacheable_getCache(self):
        """Gets the cache associated with this object.

        Returns None when the object is not associated with a manager or
        the manager cannot be found.
        """
        if self.__manager_id is None:
            return None
        cache = self._v_ZCacheable_cache
        if cache is not None and \
                self._v_ZCacheable_manager_timestamp == manager_timestamp:
            # We have a volatile reference to the cache and no manager
            # was added or removed since it was stored.
            return aq_base(cache)
        manager = self.ZCacheable_getManager()
        if manager is None:
            return None
        cache = aq_base(manager.ZCacheManager_getCache())
        # Set a volatile reference to the cache then return it.
        self._v_ZCacheable_cache = cache
        self._v_ZCacheable_manager_timestamp = manager_timestamp
        return cache

    @security.private
    def ZCacheable_isCachingEnabled(self):
        """Returns true only if associated with a cache manager and caching of
        this method is enabled.

        The true value is the cache object itself; the false value is
        either the disabled flag or None.
        """
        return self.__enabled and self.ZCacheable_getCache()

    @security.private
    def ZCacheable_getObAndView(self, view_name):
        # Returns self and view_name unchanged.
        return self, view_name

    @security.private
    def ZCacheable_get(
        self,
        view_name='',
        keywords=None,
        mtime_func=None,
        default=None
    ):
        """Retrieves the cached view for the object under the conditions
        specified by keywords.

        If the value is not yet cached, returns the default.  The default
        is also returned when there is no cache, when caching is disabled
        or when the cache raises an exception (which is logged).
        """
        cache = self.ZCacheable_getCache()
        if cache is None or not self.__enabled:
            return default
        ob, view_name = self.ZCacheable_getObAndView(view_name)
        try:
            return cache.ZCache_get(
                ob, view_name, keywords, mtime_func, default)
        except Exception:
            # Best effort: a failing cache must not break the view, but
            # keep the traceback so the failure can be diagnosed.
            LOG.warning('ZCache_get() exception', exc_info=True)
            return default

    @security.private
    def ZCacheable_set(
        self,
        data,
        view_name='',
        keywords=None,
        mtime_func=None
    ):
        """Cacheable views should call this method after generating cacheable
        results.

        The data argument can be of any Python type.  Nothing is stored
        when there is no cache or caching is disabled; exceptions raised
        by the cache are logged and swallowed.
        """
        cache = self.ZCacheable_getCache()
        if cache is None or not self.__enabled:
            return
        ob, view_name = self.ZCacheable_getObAndView(view_name)
        try:
            cache.ZCache_set(ob, data, view_name, keywords, mtime_func)
        except Exception:
            # Best effort, see ZCacheable_get().
            LOG.warning('ZCache_set() exception', exc_info=True)

    @security.protected(ViewManagementScreensPermission)
    def ZCacheable_invalidate(self, view_name='', REQUEST=None):
        """Called after a cacheable object is edited.

        Causes all cache entries that apply to the view_name to be
        removed. Returns a status message, or the rendered management
        page carrying that message when REQUEST is given.
        """
        cache = self.ZCacheable_getCache()
        if cache is None:
            message = 'This object is not associated with a cache manager.'
        else:
            ob, view_name = self.ZCacheable_getObAndView(view_name)
            try:
                message = cache.ZCache_invalidate(ob) or 'Invalidated.'
            except Exception as exc:
                LOG.warning('ZCache_invalidate() exception', exc_info=True)
                message = 'An exception occurred: %s: %s' % (type(exc), exc)

        if REQUEST is not None:
            return self.ZCacheable_manage(
                self, REQUEST, management_view='Cache',
                manage_tabs_message=message)
        return message

    @security.private
    def ZCacheable_getModTime(self, mtime_func=None):
        """Returns the highest of the last mod times."""
        # Based on:
        #   mtime_func
        #   self.mtime
        #   self.__class__.mtime
        mtime = 0
        if mtime_func:
            # Allow mtime_func to influence the mod time.
            mtime = mtime_func()
        base = aq_base(self)
        mtime = max(getattr(base, '_p_mtime', mtime) or 0, mtime)
        klass = getattr(base, '__class__', None)
        if klass:
            klass_mtime = getattr(klass, '_p_mtime', mtime)
            # Only an int class-level value is taken into account.
            if isinstance(klass_mtime, int):
                mtime = max(klass_mtime, mtime)
        return mtime

    @security.protected(ViewManagementScreensPermission)
    def ZCacheable_getManagerId(self):
        """Returns the id of the current ZCacheManager."""
        return self.__manager_id

    @security.protected(ViewManagementScreensPermission)
    def ZCacheable_getManagerURL(self):
        """Returns the URL of the current ZCacheManager."""
        manager = self.ZCacheable_getManager()
        if manager is not None:
            return manager.absolute_url()
        return None

    @security.protected(ViewManagementScreensPermission)
    def ZCacheable_getManagerIds(self):
        """Returns a tuple of mappings containing the id and title of the
        available ZCacheManagers."""
        found = []
        seen_ids = set()
        ob = self
        # Walk up the containment chain; the first manager found for a
        # given id wins.
        while ob is not None:
            if hasattr(aq_base(ob), ZCM_MANAGERS):
                for registered_id in getattr(ob, ZCM_MANAGERS):
                    manager = getattr(ob, registered_id, None)
                    if manager is None:
                        continue
                    manager_id = manager.getId()
                    if manager_id in seen_ids:
                        continue
                    title = getattr(aq_base(manager), 'title', '')
                    found.append({'id': manager_id, 'title': title})
                    seen_ids.add(manager_id)
            ob = aq_parent(aq_inner(ob))
        return tuple(found)

    @security.protected(ChangeCacheSettingsPermission)
    def ZCacheable_setManagerId(self, manager_id, REQUEST=None):
        """Changes the manager_id for this object."""
        # Drop the entries held by the current cache before switching.
        self.ZCacheable_invalidate()
        # A false manager_id means the user requested disassociation
        # from the cache manager.
        self.__manager_id = str(manager_id) if manager_id else None
        self._v_ZCacheable_cache = None

        if REQUEST is not None:
            return self.ZCacheable_manage(
                self,
                REQUEST,
                management_view='Cache',
                manage_tabs_message='Cache settings changed.'
            )

    @security.protected(ViewManagementScreensPermission)
    def ZCacheable_enabled(self):
        """Returns true if caching is enabled for this object or method."""
        return self.__enabled

    @security.protected(ChangeCacheSettingsPermission)
    def ZCacheable_setEnabled(self, enabled=0, REQUEST=None):
        """Changes the enabled flag."""
        # Stored as 1/0 regardless of the type passed in.
        self.__enabled = 1 if enabled else 0

        if REQUEST is not None:
            return self.ZCacheable_manage(
                self, REQUEST, management_view='Cache',
                manage_tabs_message='Cache settings changed.')

    @security.protected(ViewManagementScreensPermission)
    def ZCacheable_configHTML(self):
        """Override to provide configuration of caching behavior that can only
        be specific to the cacheable object."""
        return ''


InitializeClass(Cacheable)
1✔
329

330

331
def findCacheables(
    ob,
    manager_id,
    require_assoc,
    subfolders,
    meta_types,
    rval,
    path
):
    """Used by the CacheManager UI.

    Recursive.  Similar to the Zope "Find" function.  Finds all
    Cacheable objects in a hierarchy.

    ob -- container whose ``objectValues()`` are inspected.
    manager_id -- id compared against each object's current manager id.
    require_assoc -- if true, only report objects associated with
        manager_id.
    subfolders -- if true, descend into sub-objects that have
        ``objectValues``.
    meta_types -- if non-empty, restrict the reported objects to these
        meta types (sub-folders are still visited regardless of type).
    rval -- list that receives one info mapping per object found.
    path -- tuple of ids leading from the start container to ob.

    Returns None; results are appended to rval.  Any exception is logged
    and stops the scan of the current container only.
    """
    try:
        if meta_types:
            subobs = ob.objectValues(meta_types)
        else:
            subobs = ob.objectValues()
        sm = getSecurityManager()

        # Add to the list of cacheable objects.
        for subob in subobs:
            if not isCacheable(subob):
                continue
            associated = (subob.ZCacheable_getManagerId() == manager_id)
            if require_assoc and not associated:
                continue
            if not sm.checkPermission(ChangeCacheSettingsPermission, subob):
                continue
            subpath = path + (subob.getId(),)
            rval.append({
                'sortkey': subpath,
                'path': '/'.join(subpath),
                'title': getattr(aq_base(subob), 'title', ''),
                'icon': None,
                'associated': associated,
            })

        # Visit subfolders.
        if subfolders:
            if meta_types:
                # The first listing was filtered by meta type; folders of
                # any type must be visited, so list everything again.
                subobs = ob.objectValues()
            for subob in subobs:
                subpath = path + (subob.getId(),)
                if not hasattr(aq_base(subob), 'objectValues'):
                    continue
                if sm.checkPermission('Access contents information', subob):
                    findCacheables(
                        subob, manager_id, require_assoc,
                        subfolders, meta_types, rval, subpath)
    except Exception:
        # Deliberately best effort: report through the module logger
        # (with traceback) and carry on.
        LOG.warning('findCacheables() exception', exc_info=True)
×
387

388

389
class Cache:
    """A base class (and interface description) for caches.

    Every method raises NotImplementedError and is meant to be
    overridden.

    Note that Cache objects are not intended to be visible by restricted
    code.
    """

    def ZCache_invalidate(self, ob):
        # Cacheable.ZCacheable_invalidate() uses the return value as the
        # status message and substitutes 'Invalidated.' when it is false.
        raise NotImplementedError

    def ZCache_get(self, ob, view_name, keywords, mtime_func, default):
        # view_name: If an object provides different views that would
        #   benefit from caching, it will set view_name.
        #   Otherwise view_name will be an empty string.
        #
        # keywords: Either None or a mapping containing keys that
        #   distinguish this cache entry from others even though
        #   ob and view_name are the same.  DTMLMethods use keywords
        #   derived from the DTML namespace.
        #
        # mtime_func: When the Cache calls ZCacheable_getModTime(),
        #   it should pass this as an argument.  It is provided to
        #   allow cacheable objects to provide their own computation
        #   of the object's modification time.
        #
        # default: If no entry is found, ZCache_get() should return
        #   default.
        raise NotImplementedError

    def ZCache_set(self, ob, data, view_name, keywords, mtime_func):
        # data is the value to store; see ZCache_get() for the other
        # parameter descriptions.
        raise NotImplementedError
421

422

423
class CacheManager:
    """A base class for cache managers.

    Implement ZCacheManager_getCache().
    """

    security = ClassSecurityInfo()
    security.setPermissionDefault(ChangeCacheSettingsPermission, ('Manager',))

    @security.private
    def ZCacheManager_getCache(self):
        raise NotImplementedError

    # Marker checked by getVerifiedManagerIds().
    _isCacheManager = 1

    manage_options = (
        {
            'label': 'Associate',
            'action': 'ZCacheManager_associate',
        },
    )

    def manage_afterAdd(self, item, container):
        # Adds self to the list of cache managers in the container.
        global manager_timestamp
        if aq_base(self) is not aq_base(item):
            return
        ids = getVerifiedManagerIds(container)
        own_id = self.getId()
        if own_id not in ids:
            setattr(container, ZCM_MANAGERS, ids + (own_id,))
            # Invalidates every volatile cache reference held by
            # Cacheable objects.
            manager_timestamp = time.time()

    def manage_beforeDelete(self, item, container):
        # Removes self from the list of cache managers.
        global manager_timestamp
        if aq_base(self) is not aq_base(item):
            return
        ids = getVerifiedManagerIds(container)
        own_id = self.getId()
        if own_id not in ids:
            return
        remaining = tuple(s for s in ids if s != own_id)
        if remaining:
            # Stored as a tuple, like manage_afterAdd() does.
            setattr(container, ZCM_MANAGERS, remaining)
        elif getattr(aq_base(container), ZCM_MANAGERS, None) is not None:
            # Last manager gone: drop the list from the *container* (the
            # manager itself never carries it) so no stale id is left.
            delattr(container, ZCM_MANAGERS)
        manager_timestamp = time.time()

    security.declareProtected(ChangeCacheSettingsPermission, 'ZCacheManager_associate')  # NOQA: D001,E501
    ZCacheManager_associate = DTMLFile('dtml/cmassoc', globals())

    @security.protected(ChangeCacheSettingsPermission)
    def ZCacheManager_locate(
        self,
        require_assoc,
        subfolders,
        meta_types=None,
        REQUEST=None
    ):
        """Locates cacheable objects.

        Searches the manager's parent via findCacheables().  Returns the
        list of result mappings, or the rendered 'Associate' page showing
        them when REQUEST is given.
        """
        ob = aq_parent(aq_inner(self))
        rval = []
        manager_id = self.getId()
        if not meta_types or '' in meta_types:
            # Nothing selected, or the user selected "All".
            meta_types = []
        findCacheables(
            ob,
            manager_id,
            require_assoc,
            subfolders,
            meta_types,
            rval,
            ()
        )

        if REQUEST is not None:
            return self.ZCacheManager_associate(
                self,
                REQUEST,
                show_results=1,
                results=rval,
                management_view="Associate"
            )
        return rval

    @security.protected(ChangeCacheSettingsPermission)
    def ZCacheManager_setAssociations(self, props=None, REQUEST=None):
        """Associates and un-associates cacheable objects with this cache
        manager.

        props maps 'associate_<path>' keys to a true (associate) or false
        (un-associate) value; it defaults to REQUEST.form.  Raises
        Unauthorized when the user may not change the cache settings of
        an addressed object.
        """
        addcount = 0
        remcount = 0
        parent = aq_parent(aq_inner(self))
        sm = getSecurityManager()
        my_id = str(self.getId())
        if props is None:
            # Without a request there is nothing to read the form from.
            props = REQUEST.form if REQUEST is not None else {}
        prefix = 'associate_'
        for key, do_associate in props.items():
            if not key.startswith(prefix):
                continue
            path = key[len(prefix):]
            ob = parent.restrictedTraverse(path)
            if not sm.checkPermission(ChangeCacheSettingsPermission, ob):
                raise Unauthorized
            if not isCacheable(ob):
                # Not a cacheable object.
                continue
            manager_id = str(ob.ZCacheable_getManagerId())
            if do_associate:
                if manager_id != my_id:
                    ob.ZCacheable_setManagerId(my_id)
                    addcount += 1
            elif manager_id == my_id:
                ob.ZCacheable_setManagerId(None)
                remcount += 1

        if REQUEST is not None:
            return self.ZCacheManager_associate(
                self, REQUEST, management_view="Associate",
                manage_tabs_message='%d association(s) made, %d removed.' %
                (addcount, remcount)
            )


InitializeClass(CacheManager)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc