• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Products.Sessions / 16399689495

05 Apr 2025 07:12AM UTC coverage: 94.731%. Remained the same
16399689495

push

github

dataflake
- vb [ci skip]

146 of 180 branches covered (81.11%)

Branch coverage included in aggregate %.

1526 of 1585 relevant lines covered (96.28%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.41
/src/Products/Sessions/SessionDataManager.py
1
############################################################################
2
#
3
# Copyright (c) 2002 Zope Foundation and Contributors.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE
11
#
12
############################################################################
13

14
import re
1✔
15
import sys
1✔
16
from logging import getLogger
1✔
17

18
from AccessControl.class_init import InitializeClass
1✔
19
from AccessControl.Permissions import access_contents_information
1✔
20
from AccessControl.Permissions import view_management_screens
1✔
21
from AccessControl.SecurityInfo import ClassSecurityInfo
1✔
22
from Acquisition import Implicit
1✔
23
from Acquisition import aq_inner
1✔
24
from Acquisition import aq_parent
1✔
25
from App.Management import Tabs
1✔
26
from App.special_dtml import DTMLFile
1✔
27
from OFS.owner import Owned
1✔
28
from OFS.role import RoleManager
1✔
29
from OFS.SimpleItem import Item
1✔
30
from Persistence import Persistent
1✔
31
from ZODB.POSException import ConflictError
1✔
32
from zope.interface import implementer
1✔
33
from ZPublisher.BeforeTraverse import registerBeforeTraverse
1✔
34
from ZPublisher.BeforeTraverse import unregisterBeforeTraverse
1✔
35

36
from .BrowserIdManager import BROWSERID_MANAGER_NAME
1✔
37
from .common import DEBUG
1✔
38
from .interfaces import IMutableSessionDataManager
1✔
39
from .interfaces import SessionDataManagerErr
1✔
40
from .permissions import access_session_data
1✔
41
from .permissions import access_user_session_data
1✔
42
from .permissions import change_session_data_managers
1✔
43
from Products.Transience.Transience import TransientObjectContainer
1✔
44

45

46
bad_path_chars_in = re.compile(r'[^a-zA-Z0-9-_~\,\. \/]').search
1✔
47
LOG = getLogger('SessionDataManager')
1✔
48
# Default settings for the standard temporary session data container
49
default_sdc_settings = {
1✔
50
    'title': '',
51
    'timeout_mins': 20,
52
    'addNotification': '',
53
    'delNotification': '',
54
    'limit': 0,
55
    'period_secs': 20,
56
}
57

58

59
constructSessionDataManagerForm = DTMLFile(
1✔
60
    'dtml/addDataManager',
61
    globals()
62
)
63

64

65
def constructSessionDataManager(
1✔
66
    self,
67
    id,
68
    title='',
69
    path=None,
70
    requestName=None,
71
    REQUEST=None
72
):
73
    """ """
74
    ob = SessionDataManager(id, path, title, requestName)
×
75
    self._setObject(id, ob)
×
76
    if REQUEST is not None:
×
77
        return self.manage_main(self, REQUEST, update_menu=1)
×
78

79

80
class SessionIdManagerErr(Exception):
1✔
81
    pass
1✔
82

83

84
@implementer(IMutableSessionDataManager)
1✔
85
class SessionDataManager(Item, Implicit, Persistent, RoleManager, Owned, Tabs):
1✔
86
    """The Zope default session data manager implementation."""
87

88
    meta_type = 'Session Data Manager'
1✔
89
    zmi_icon = 'far fa-clock'
1✔
90

91
    manage_options = (
1✔
92
        {
93
            'label': 'Settings',
94
            'action': 'manage_sessiondatamgr',
95
        },
96
        {
97
            'label': 'Security',
98
            'action': 'manage_access',
99
        },
100
        {
101
            'label': 'Ownership',
102
            'action': 'manage_owner',
103
        },
104
    )
105

106
    security = ClassSecurityInfo()
1✔
107
    security.declareObjectPublic()
1✔
108

109
    ok = {
1✔
110
        'meta_type': 1,
111
        'id': 1,
112
        'title': 1,
113
        'zmi_icon': 1,
114
        'title_or_id': 1,
115
    }
116
    security.setDefaultAccess(ok)
1✔
117
    security.setPermissionDefault(change_session_data_managers, ['Manager'])
1✔
118
    security.setPermissionDefault(view_management_screens, ['Manager'])
1✔
119
    security.setPermissionDefault(
1✔
120
        access_contents_information,
121
        ['Manager', 'Anonymous'],
122
    )
123
    security.setPermissionDefault(access_user_session_data, ['Manager'])
1✔
124
    security.setPermissionDefault(
1✔
125
        access_session_data,
126
        ['Manager', 'Anonymous', ],
127
    )
128

129
    manage_sessiondatamgr = DTMLFile(
1✔
130
        'dtml/manageDataManager',
131
        globals()
132
    )
133

134
    # INTERFACE METHODS FOLLOW
135

136
    @security.protected(access_session_data)
1✔
137
    def getSessionData(self, create=1):
1✔
138
        """ """
139
        key = self.getBrowserIdManager().getBrowserId(create=create)
1✔
140
        if key is not None:
1✔
141
            return self._getSessionDataObject(key)
1✔
142

143
    @security.protected(access_session_data)
1✔
144
    def hasSessionData(self):
1✔
145
        """ """
146
        key = self.getBrowserIdManager().getBrowserId(create=0)
1✔
147
        if key is None:
1✔
148
            return 0
1✔
149
        return self._hasSessionDataObject(key)
1✔
150

151
    @security.protected(access_user_session_data)
1✔
152
    def clearSessionData(self):
1✔
153
        key = self.getBrowserIdManager().getBrowserId(create=0)
1✔
154
        if key is None:
1!
155
            return
×
156
        return self._clearSessionDataObject(key)
1✔
157

158
    @security.protected(access_user_session_data)
1✔
159
    def getSessionDataByKey(self, key):
1✔
160
        return self._getSessionDataObjectByKey(key)
1✔
161

162
    @security.protected(access_contents_information)
1✔
163
    def getBrowserIdManager(self):
1✔
164
        """ """
165
        mgr = getattr(self, BROWSERID_MANAGER_NAME, None)
1✔
166
        if mgr is None:
1!
167
            raise SessionDataManagerErr(
×
168
                'No browser id manager named %s could be found.' %
169
                BROWSERID_MANAGER_NAME
170
            )
171
        return mgr
1✔
172

173
    # END INTERFACE METHODS
174

175
    def __init__(self, id, path=None, title='', requestName=None):
1✔
176
        self.id = id
1✔
177
        self.setContainerPath(path)
1✔
178
        self.setTitle(title)
1✔
179
        self._requestSessionName = requestName
1✔
180

181
    @security.protected(change_session_data_managers)
1✔
182
    def manage_changeSDM(
1✔
183
        self,
184
        title,
185
        path=None,
186
        requestName=None,
187
        REQUEST=None
188
    ):
189
        """ """
190
        self.setContainerPath(path)
1✔
191
        self.setTitle(title)
1✔
192
        if requestName:
1!
193
            if requestName != self._requestSessionName:
1!
194
                self.updateTraversalData(requestName)
1✔
195
        else:
196
            self.updateTraversalData(None)
×
197
        if REQUEST is not None:
1!
198
            return self.manage_sessiondatamgr(
×
199
                self,
200
                REQUEST,
201
                manage_tabs_message='Changes saved.'
202
            )
203

204
    @security.protected(change_session_data_managers)
1✔
205
    def setTitle(self, title):
1✔
206
        """ """
207
        if not title:
1✔
208
            self.title = ''
1✔
209
        else:
210
            self.title = str(title)
1✔
211

212
    @security.protected(change_session_data_managers)
1✔
213
    def setContainerPath(self, path=None):
1✔
214
        """ """
215
        if not path:
1✔
216
            self.obpath = None  # undefined state
1✔
217
        elif isinstance(path, str):
1✔
218
            if bad_path_chars_in(path):
1✔
219
                raise SessionDataManagerErr(
1✔
220
                    'Container path contains characters invalid in a Zope '
221
                    'object path'
222
                )
223
            self.obpath = path.split('/')
1✔
224
        elif isinstance(path, (list, tuple)):
1✔
225
            self.obpath = list(path)  # sequence
1✔
226
        else:
227
            raise SessionDataManagerErr('Bad path value %s' % path)
1✔
228

229
    @security.protected(view_management_screens)
1✔
230
    def getContainerPath(self):
1✔
231
        """ """
232
        if self.obpath is not None:
1✔
233
            return '/'.join(self.obpath)
1✔
234
        return ''  # blank string represents undefined state
1✔
235

236
    @security.protected(view_management_screens)
1✔
237
    def hasSessionDataContainer(self):
1✔
238
        """ ZMI helper: do we have a valid session data container? """
239
        container = self._getSessionDataContainer()
1✔
240
        if container is not None and \
1!
241
           getattr(container, 'new_or_existing', None) is not None:
242
            return True
×
243

244
    @security.protected(view_management_screens)
1✔
245
    def usesDefaultSessionDataContainer(self):
1✔
246
        """ ZMI helper: is the default temporary folder session container used?
247
        """
248
        return self.obpath == ['', 'temp_folder', 'session_data']
1✔
249

250
    def _hasSessionDataObject(self, key):
1✔
251
        """ """
252
        c = self._getSessionDataContainer()
1✔
253
        return key in c
1✔
254

255
    def _getSessionDataObject(self, key):
1✔
256
        """ returns new or existing session data object """
257
        container = self._getSessionDataContainer()
1✔
258
        if container is None:
1✔
259
            return None
1✔
260

261
        ob = container.new_or_existing(key)
1✔
262
        # hasattr hides conflicts; be explicit by comparing to None
263
        # because otherwise __len__ of the requested object might be called!
264
        if getattr(ob, '__of__', None) is not None and \
1!
265
           getattr(ob, 'aq_parent', None) is not None:
266
            # splice ourselves into the acquisition chain
267
            return ob.__of__(self.__of__(ob.aq_parent))
1✔
268
        return ob.__of__(self)
×
269

270
    def _clearSessionDataObject(self, key):
1✔
271
        """ clear session data object for ``key`` """
272
        container = self._getSessionDataContainer()
1✔
273
        if container is None:
1!
274
            return None
×
275

276
        ob = container.get(key)
1✔
277
        if ob is not None:
1!
278
            ob.clear()
1✔
279

280
    def _getSessionDataObjectByKey(self, key):
1✔
281
        """ returns new or existing session data object """
282
        container = self._getSessionDataContainer()
1✔
283
        if container is None:
1!
284
            return None
×
285

286
        ob = container.get(key)
1✔
287
        if ob is not None:
1!
288
            # hasattr hides conflicts; be explicit by comparing to None
289
            # because otherwise __len__ of the requested object might be
290
            # called!
291
            if getattr(ob, '__of__', None) is not None and \
1!
292
               getattr(ob, 'aq_parent', None) is not None:
293
                # splice ourselves into the acquisition chain
294
                return ob.__of__(self.__of__(ob.aq_parent))
1✔
295
            return ob.__of__(self)
×
296

297
    def _getSessionDataContainer(self):
1✔
298
        """ Do not cache the results of this call.  Doing so breaks the
299
        transactions for mounted storages. """
300
        if self.obpath is None:
1!
301
            err = 'Session data container is unspecified in %s' % self.getId()
×
302
            LOG.warning(err)
×
303
            return None
×
304

305
        try:
1✔
306
            # This should arguably use restrictedTraverse, but it
307
            # currently fails for mounted storages.  This might
308
            # be construed as a security hole, albeit a minor one.
309
            # unrestrictedTraverse is also much faster.
310
            # hasattr hides conflicts
311
            if DEBUG and not getattr(self, '_v_wrote_dc_type', None):
1!
312
                args = '/'.join(self.obpath)
×
313
                LOG.debug('External data container at %s in use' % args)
×
314
                self._v_wrote_dc_type = 1
×
315
            return self.unrestrictedTraverse(self.obpath)
1✔
316
        except ConflictError:
1✔
317
            raise
×
318
        except Exception:
1✔
319
            # If this is a default configuration then the session data
320
            # container is inside a memory-based temporary folder, which
321
            # is wiped after each restart. Try to create it.
322
            if self.usesDefaultSessionDataContainer():
1✔
323
                try:
1✔
324
                    return self._setDefaultSessionDataContainer()
1✔
325
                except (AttributeError, KeyError):
×
326
                    # Temporary folder doesn't exist, give up
327
                    pass
×
328

329
            err = "External session data container '%s' not found."
1✔
330
            LOG.warning(err % '/'.join(self.obpath))
1✔
331
            return None
1✔
332

333
    def _setDefaultSessionDataContainer(self):
1✔
334
        tf = self.unrestrictedTraverse('/temp_folder')
1✔
335
        settings = self.getDefaultSessionDataContainerSettings()
1✔
336
        sdc = TransientObjectContainer('session_data', **settings)
1✔
337
        tf._setObject('session_data', sdc)
1✔
338
        LOG.info('Added session data container at /temp_folder/session_data')
1✔
339

340
        # Prevent accidental deletion by adding it to the reserved names
341
        tf_reserved = getattr(tf, '_reserved_names', ())
1✔
342
        if 'session_data' not in tf_reserved:
1!
343
            tf._reserved_names = tf_reserved + ('session_data',)
1✔
344

345
        return self.unrestrictedTraverse(self.obpath)
1✔
346

347
    @security.protected(view_management_screens)
1✔
348
    def getDefaultSessionDataContainerSettings(self):
1✔
349
        """ ZMI helper: Return the session data container default settings """
350
        return getattr(self, '_sdc_settings', default_sdc_settings)
1✔
351

352
    @security.protected(change_session_data_managers)
1✔
353
    def manage_changeSDCDefaults(self,
1✔
354
                                 title='',
355
                                 timeout_mins=20,
356
                                 addNotification='',
357
                                 delNotification='',
358
                                 limit=0,
359
                                 period_secs=20,
360
                                 REQUEST=None):
361
        """ Collect settings for the default session data container """
362
        self._sdc_settings = {
1✔
363
            'title': title,
364
            'timeout_mins': timeout_mins,
365
            'addNotification': addNotification,
366
            'delNotification': delNotification,
367
            'limit': limit,
368
            'period_secs': period_secs,
369
        }
370

371
        if REQUEST is not None:
1!
372
            msg = ('Session data container changes saved. They will be applied'
×
373
                   ' when you restart the Zope process.')
374
            return self.manage_sessiondatamgr(manage_tabs_message=msg)
×
375

376
    @security.protected(view_management_screens)
1✔
377
    def getRequestName(self):
1✔
378
        """ """
379
        return self._requestSessionName or ''
1✔
380

381
    def manage_afterAdd(self, item, container):
1✔
382
        """ Add our traversal hook """
383
        self.updateTraversalData(self._requestSessionName)
1✔
384

385
    def manage_beforeDelete(self, item, container):
1✔
386
        """ Clean up on delete """
387
        self.updateTraversalData(None)
×
388

389
    def updateTraversalData(self, requestSessionName=None):
1✔
390
        # Note this cant be called directly at add -- manage_afterAdd will
391
        # work though.
392
        parent = aq_parent(aq_inner(self))
1✔
393

394
        if getattr(self, '_hasTraversalHook', None):
1✔
395
            unregisterBeforeTraverse(parent, 'SessionDataManager')
1✔
396
            del self._hasTraversalHook
1✔
397
            self._requestSessionName = None
1✔
398

399
        if requestSessionName:
1!
400
            hook = SessionDataManagerTraverser(requestSessionName, self.id)
1✔
401
            registerBeforeTraverse(parent, hook, 'SessionDataManager', 50)
1✔
402
            self._hasTraversalHook = 1
1✔
403
            self._requestSessionName = requestSessionName
1✔
404

405

406
InitializeClass(SessionDataManager)
1✔
407

408

409
class SessionDataManagerTraverser(Persistent):
1✔
410

411
    def __init__(self, requestSessionName, sessionDataManagerName):
1✔
412
        self._requestSessionName = requestSessionName
1✔
413
        self._sessionDataManager = sessionDataManagerName
1✔
414

415
    def __call__(self, container, request):
1✔
416
        """
417
        This method places a session data object reference in
418
        the request.  It is called on each and every request to Zope in
419
        Zopes after 2.5.0 when there is a session data manager installed
420
        in the root.
421
        """
422
        try:
1✔
423
            sdmName = self._sessionDataManager
1✔
424
            if not isinstance(sdmName, str):
1!
425
                # Zopes v2.5.0 - 2.5.1b1 stuck the actual session data
426
                # manager object in _sessionDataManager in order to use
427
                # its getSessionData method.  We don't actually want to
428
                # do this, because it's safer to use getattr to get the
429
                # data manager object by name.  Using getattr also puts
430
                # the sdm in the right context automatically.  Here we
431
                # pay the penance for backwards compatibility:
432
                sdmName = sdmName.id
×
433
            sdm = getattr(container, sdmName)
1✔
434
            getSessionData = sdm.getSessionData
1✔
435
        except Exception:
×
436
            msg = 'Session automatic traversal failed to get session data'
×
437
            LOG.warning(msg, exc_info=sys.exc_info())
×
438
            return
×
439

440
        # set the getSessionData method in the "lazy" namespace
441
        if self._requestSessionName is not None:
1!
442
            request.set_lazy(self._requestSessionName, getSessionData)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc