• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Products.Sessions / 16399689495

05 Apr 2025 07:12AM UTC coverage: 94.731%. Remained the same
16399689495

push

github

dataflake
- vb [ci skip]

146 of 180 branches covered (81.11%)

Branch coverage included in aggregate %.

1526 of 1585 relevant lines covered (96.28%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.65
/src/Products/Sessions/tests/testSessionDataManager.py
1
##############################################################################
2
#
3
# Copyright (c) 2002 Zope Foundation and Contributors.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE
11
#
12
##############################################################################
13
import unittest
1✔
14
import warnings
1✔
15

16

17
tf_name = 'temp_folder'
1✔
18
idmgr_name = 'browser_id_manager'
1✔
19
toc_name = 'temp_transient_container'
1✔
20
sdm_name = 'session_data_manager'
1✔
21

22
stuff = {}
1✔
23

24

25
def _getDB(use_temporary_folder=False):
1✔
26
    import transaction
1✔
27
    from OFS.Application import Application
1✔
28
    db = stuff.get('db')
1✔
29
    if not db:
1!
30
        from ZODB import DB
1✔
31
        from ZODB.DemoStorage import DemoStorage
1✔
32
        ds = DemoStorage()
1✔
33
        db = DB(ds, pool_size=60)
1✔
34
        conn = db.open()
1✔
35
        root = conn.root()
1✔
36
        app = Application()
1✔
37
        root['Application'] = app
1✔
38
        transaction.savepoint(optimistic=True)
1✔
39
        _populate(app, use_temporary_folder)
1✔
40
        stuff['db'] = db
1✔
41
        conn.close()
1✔
42
    return db
1✔
43

44

45
def _delDB():
1✔
46
    import transaction
1✔
47
    transaction.abort()
1✔
48
    del stuff['db']
1✔
49

50

51
def _populate(app, use_temporary_folder=False):
1✔
52
    import transaction
1✔
53
    from OFS.DTMLMethod import DTMLMethod
1✔
54
    from OFS.Folder import Folder
1✔
55

56
    from Products.TemporaryFolder.TemporaryFolder import MountedTemporaryFolder
1✔
57

58
    from ..BrowserIdManager import BrowserIdManager
1✔
59
    from ..SessionDataManager import SessionDataManager
1✔
60
    from Products.Transience.Transience import TransientObjectContainer
1✔
61
    bidmgr = BrowserIdManager(idmgr_name)
1✔
62
    if use_temporary_folder:
1✔
63
        tf = MountedTemporaryFolder(tf_name, title="Temporary Folder")
1✔
64
    else:
65
        tf = Folder(tf_name)
1✔
66
    toc = TransientObjectContainer(
1✔
67
        toc_name,
68
        title='Temporary '
69
        'Transient Object Container',
70
        timeout_mins=20
71
    )
72
    session_data_manager = SessionDataManager(
1✔
73
        id=sdm_name,
74
        path='/' + tf_name + '/' + toc_name,
75
        title='Session Data Manager',
76
        requestName='TESTOFSESSION'
77
    )
78

79
    try:
1✔
80
        app._delObject(idmgr_name)
1✔
81
    except (AttributeError, KeyError):
1✔
82
        pass
1✔
83

84
    try:
1✔
85
        app._delObject(tf_name)
1✔
86
    except (AttributeError, KeyError):
1✔
87
        pass
1✔
88

89
    try:
1✔
90
        app._delObject(sdm_name)
1✔
91
    except (AttributeError, KeyError):
1✔
92
        pass
1✔
93

94
    try:
1✔
95
        app._delObject('index_html')
1✔
96
    except (AttributeError, KeyError):
1✔
97
        pass
1✔
98

99
    app._setObject(idmgr_name, bidmgr)
1✔
100

101
    app._setObject(sdm_name, session_data_manager)
1✔
102

103
    with warnings.catch_warnings():
1✔
104
        warnings.simplefilter('ignore')
1✔
105
        app._setObject(tf_name, tf)
1✔
106
    transaction.commit()
1✔
107

108
    app.temp_folder._setObject(toc_name, toc)
1✔
109
    transaction.commit()
1✔
110

111
    # index_html necessary for publishing emulation for testAutoReqPopulate
112
    app._setObject('index_html', DTMLMethod('', __name__='foo'))
1✔
113
    transaction.commit()
1✔
114

115

116
class TestSessionManager(unittest.TestCase):
1✔
117

118
    def setUp(self):
1✔
119
        from Testing import makerequest
1✔
120
        db = _getDB()
1✔
121
        conn = db.open()
1✔
122
        root = conn.root()
1✔
123
        self.app = makerequest.makerequest(root['Application'])
1✔
124
        self.timeout = 1
1✔
125

126
    def tearDown(self):
1✔
127
        _delDB()
1✔
128
        self.app._p_jar.close()
1✔
129
        del self.app
1✔
130

131
    def test_initialization(self):
1✔
132
        from ..SessionDataManager import SessionDataManager
1✔
133
        sdm = SessionDataManager('testing')
1✔
134

135
        self.assertEqual(sdm.getId(), 'testing')
1✔
136
        self.assertEqual(sdm.title, '')
1✔
137
        self.assertFalse(sdm.getContainerPath())
1✔
138
        self.assertFalse(sdm.getRequestName())
1✔
139

140
        sdm2 = SessionDataManager('testing',
1✔
141
                                  title='Test Title',
142
                                  path='/foo/bar',
143
                                  requestName='SESS')
144
        self.assertEqual(sdm2.getId(), 'testing')
1✔
145
        self.assertEqual(sdm2.title, 'Test Title')
1✔
146
        self.assertEqual(sdm2.getContainerPath(), '/foo/bar')
1✔
147
        self.assertEqual(sdm2.getRequestName(), 'SESS')
1✔
148

149
    def test_manage_changeSDM(self):
1✔
150
        sdm = self.app.session_data_manager
1✔
151

152
        sdm.manage_changeSDM(title='Test Title',
1✔
153
                             path='/foo/bar',
154
                             requestName='SESS')
155
        self.assertEqual(sdm.title, 'Test Title')
1✔
156
        self.assertEqual(sdm.getContainerPath(), '/foo/bar')
1✔
157
        self.assertEqual(sdm.getRequestName(), 'SESS')
1✔
158

159
    def test_setTitle(self):
1✔
160
        sdm = self.app.session_data_manager
1✔
161

162
        sdm.setTitle(None)
1✔
163
        self.assertEqual(sdm.title, '')
1✔
164

165
        sdm.setTitle('')
1✔
166
        self.assertEqual(sdm.title, '')
1✔
167

168
        sdm.setTitle('foo')
1✔
169
        self.assertEqual(sdm.title, 'foo')
1✔
170

171
    def test_setContainerPath(self):
1✔
172
        from ..interfaces import SessionDataManagerErr
1✔
173

174
        sdm = self.app.session_data_manager
1✔
175

176
        sdm.setContainerPath()
1✔
177
        self.assertFalse(sdm.getContainerPath())
1✔
178

179
        sdm.setContainerPath('')
1✔
180
        self.assertFalse(sdm.getContainerPath())
1✔
181

182
        with self.assertRaises(SessionDataManagerErr):
1✔
183
            sdm.setContainerPath('\\_I_am_not_allowed')
1✔
184

185
        with self.assertRaises(SessionDataManagerErr):
1✔
186
            sdm.setContainerPath(99)
1✔
187

188
        sdm.setContainerPath('/foo/bar/baz')
1✔
189
        self.assertEqual(sdm.getContainerPath(), '/foo/bar/baz')
1✔
190

191
        sdm.setContainerPath(('', 'foo', 'bar', 'baz'))
1✔
192
        self.assertEqual(sdm.getContainerPath(), '/foo/bar/baz')
1✔
193

194
        sdm.setContainerPath(['', 'foo', 'bar', 'baz'])
1✔
195
        self.assertEqual(sdm.getContainerPath(), '/foo/bar/baz')
1✔
196

197
    def testHasId(self):
1✔
198
        self.assertEqual(
1✔
199
            self.app.session_data_manager.id,
200
            sdm_name
201
        )
202

203
    def testHasTitle(self):
1✔
204
        self.assertEqual(
1✔
205
            self.app.session_data_manager.title,
206
            'Session Data Manager'
207
        )
208

209
    def testGetSessionDataNoCreate(self):
1✔
210
        sd = self.app.session_data_manager.getSessionData(0)
1✔
211
        self.assertIsNone(sd)
1✔
212

213
    def testGetSessionDataCreate(self):
1✔
214
        from Products.Transience.Transience import TransientObject
1✔
215
        sd = self.app.session_data_manager.getSessionData(1)
1✔
216
        self.assertIs(sd.__class__, TransientObject)
1✔
217

218
    def testHasSessionData(self):
1✔
219
        self.app.session_data_manager.getSessionData()
1✔
220
        self.assertTrue(self.app.session_data_manager.hasSessionData())
1✔
221

222
    def testNotHasSessionData(self):
1✔
223
        self.assertTrue(not self.app.session_data_manager.hasSessionData())
1✔
224

225
    def testSessionDataWrappedInSDMandTOC(self):
1✔
226
        from Acquisition import aq_base
1✔
227
        sd = self.app.session_data_manager.getSessionData(1)
1✔
228
        sdm = aq_base(getattr(self.app, sdm_name))
1✔
229
        toc = aq_base(getattr(self.app.temp_folder, toc_name))
1✔
230

231
        self.assertIs(aq_base(sd.aq_parent), sdm)
1✔
232
        self.assertIs(aq_base(sd.aq_parent.aq_parent), toc)
1✔
233

234
    def testNewSessionDataObjectIsValid(self):
1✔
235
        from Acquisition import aq_base
1✔
236

237
        from Products.Transience.Transience import TransientObject
1✔
238

239
        sdType = type(TransientObject(1))
1✔
240
        sd = self.app.session_data_manager.getSessionData()
1✔
241
        self.assertIs(type(aq_base(sd)), sdType)
1✔
242
        self.assertTrue(not hasattr(sd, '_invalid'))
1✔
243

244
    def testBrowserIdIsSet(self):
1✔
245
        self.app.session_data_manager.getSessionData()
1✔
246
        mgr = getattr(self.app, idmgr_name)
1✔
247
        self.assertTrue(mgr.hasBrowserId())
1✔
248

249
    def testGetSessionDataByKey(self):
1✔
250
        sd = self.app.session_data_manager.getSessionData()
1✔
251
        mgr = getattr(self.app, idmgr_name)
1✔
252
        token = mgr.getBrowserId()
1✔
253
        bykeysd = self.app.session_data_manager.getSessionDataByKey(token)
1✔
254
        self.assertEqual(sd, bykeysd)
1✔
255

256
    def testClearSessionData(self):
1✔
257
        sd = self.app.session_data_manager.getSessionData()
1✔
258
        sd.set('somekey', 'somevalue')
1✔
259
        self.assertEqual(sd.get('somekey'), 'somevalue')
1✔
260

261
        self.app.session_data_manager.clearSessionData()
1✔
262
        sd = self.app.session_data_manager.getSessionData()
1✔
263
        self.assertIsNone(sd.get('somekey'))
1✔
264

265
    def testBadExternalSDCPath(self):
1✔
266
        sdm = self.app.session_data_manager
1✔
267
        # fake out webdav
268
        sdm.REQUEST['REQUEST_METHOD'] = 'GET'
1✔
269
        sdm.setContainerPath('/fudgeffoloo')
1✔
270
        self.assertFalse(sdm.hasSessionDataContainer())
1✔
271
        self.assertIsNone(self.app.session_data_manager.getSessionData())
1✔
272

273
    def testInvalidateSessionDataObject(self):
1✔
274
        sdm = self.app.session_data_manager
1✔
275
        sd = sdm.getSessionData()
1✔
276
        sd['test'] = 'Its alive!  Alive!'
1✔
277
        sd.invalidate()
1✔
278
        self.assertTrue(not sdm.getSessionData().has_key('test'))  # NOQA: W601
1✔
279
        self.assertNotIn('test', sdm.getSessionData())
1✔
280

281
    def testGhostUnghostSessionManager(self):
1✔
282
        import transaction
1✔
283
        sdm = self.app.session_data_manager
1✔
284
        transaction.commit()
1✔
285
        sd = sdm.getSessionData()
1✔
286
        sd.set('foo', 'bar')
1✔
287
        sdm._p_changed = None
1✔
288
        transaction.commit()
1✔
289
        self.assertEqual(sdm.getSessionData().get('foo'), 'bar')
1✔
290

291
    def testSubcommitAssignsPJar(self):
1✔
292
        global DummyPersistent  # so pickle can find it
293
        import transaction
1✔
294
        from Persistence import Persistent
1✔
295

296
        class DummyPersistent(Persistent):
1✔
297
            pass
1✔
298

299
        sd = self.app.session_data_manager.getSessionData()
1✔
300
        dummy = DummyPersistent()
1✔
301
        sd.set('dp', dummy)
1✔
302
        self.assertIsNone(sd['dp']._p_jar)
1✔
303
        transaction.savepoint(optimistic=True)
1✔
304
        self.assertIsNotNone(sd['dp']._p_jar)
1✔
305

306
    def testAqWrappedObjectsFail(self):
1✔
307
        import transaction
1✔
308
        from Acquisition import Implicit
1✔
309

310
        class DummyAqImplicit(Implicit):
1✔
311
            pass
1✔
312
        a = DummyAqImplicit()
1✔
313
        b = DummyAqImplicit()
1✔
314
        aq_wrapped = a.__of__(b)
1✔
315
        sd = self.app.session_data_manager.getSessionData()
1✔
316
        sd.set('foo', aq_wrapped)
1✔
317
        self.assertRaises(TypeError, transaction.commit)
1✔
318

319
    def testAutoReqPopulate(self):
1✔
320
        self.app.REQUEST['PARENTS'] = [self.app]
1✔
321
        self.app.REQUEST['URL'] = 'a'
1✔
322
        self.app.REQUEST.traverse('/')
1✔
323
        self.assertIn('TESTOFSESSION', self.app.REQUEST)
1✔
324

325
    def testUnlazifyAutoPopulated(self):
1✔
326
        from Acquisition import aq_base
1✔
327

328
        from Products.Transience.Transience import TransientObject
1✔
329

330
        self.app.REQUEST['PARENTS'] = [self.app]
1✔
331
        self.app.REQUEST['URL'] = 'a'
1✔
332
        self.app.REQUEST.traverse('/')
1✔
333
        sess = self.app.REQUEST['TESTOFSESSION']
1✔
334
        sdType = type(TransientObject(1))
1✔
335
        self.assertIs(type(aq_base(sess)), sdType)
1✔
336

337
    def testUsesDefaultSessionDataContainer(self):
1✔
338
        sdm = self.app.session_data_manager
1✔
339

340
        sdm.setContainerPath('/foo/bar')
1✔
341
        self.assertFalse(sdm.usesDefaultSessionDataContainer())
1✔
342

343
        sdm.setContainerPath('/temp_folder/session_data')
1✔
344
        self.assertTrue(sdm.usesDefaultSessionDataContainer())
1✔
345

346
    def testDefaultSessionDataContainerCreation(self):
1✔
347
        sdm = self.app.session_data_manager
1✔
348
        default_path = '/temp_folder/session_data'
1✔
349

350
        # At first the configuration does not use the defaults
351
        self.assertNotEqual(sdm.getContainerPath(), default_path)
1✔
352
        sdc = sdm._getSessionDataContainer()
1✔
353
        self.assertNotEqual(sdc.absolute_url_path(), default_path)
1✔
354
        with self.assertRaises(KeyError):
1✔
355
            self.app.unrestrictedTraverse(default_path)
1✔
356

357
        # Set the default, now a session data container will get created
358
        sdm.setContainerPath('/temp_folder/session_data')
1✔
359
        sdc = sdm._getSessionDataContainer()
1✔
360
        self.assertEqual(sdc.absolute_url_path(), default_path)
1✔
361
        self.app.unrestrictedTraverse(default_path)
1✔
362

363
    def testDefaultSessionDataContainerSettings(self):
1✔
364
        from ..SessionDataManager import default_sdc_settings
1✔
365

366
        sdm = self.app.session_data_manager
1✔
367

368
        self.assertEqual(sdm.getDefaultSessionDataContainerSettings(),
1✔
369
                         default_sdc_settings)
370

371
        new_settings = {
1✔
372
            'title': 'Foo title',
373
            'timeout_mins': 30,
374
            'addNotification': '/call/me',
375
            'delNotification': '/call/me/again',
376
            'limit': 500,
377
            'period_secs': 10,
378
        }
379
        sdm.manage_changeSDCDefaults(**new_settings)
1✔
380
        self.assertDictEqual(sdm.getDefaultSessionDataContainerSettings(),
1✔
381
                             new_settings)
382

383
        # Make sure the settings are applied
384
        sdm.setContainerPath('/temp_folder/session_data')
1✔
385
        sdc = sdm._getSessionDataContainer()
1✔
386
        self.assertEqual(sdc.getTimeoutMinutes(), 30)
1✔
387
        self.assertEqual(sdc.getPeriodSeconds(), 10)
1✔
388
        self.assertEqual(sdc.getSubobjectLimit(), 500)
1✔
389
        self.assertEqual(sdc.getAddNotificationTarget(), '/call/me')
1✔
390
        self.assertEqual(sdc.getDelNotificationTarget(), '/call/me/again')
1✔
391

392

393
class TestSessionManagerWithTemporaryFolder(unittest.TestCase):
1✔
394

395
    def setUp(self):
1✔
396
        from Testing import makerequest
1✔
397
        db = _getDB(use_temporary_folder=True)
1✔
398
        conn = db.open()
1✔
399
        root = conn.root()
1✔
400
        self.app = makerequest.makerequest(root['Application'])
1✔
401
        self.timeout = 1
1✔
402

403
    def tearDown(self):
1✔
404
        _delDB()
1✔
405
        self.app._p_jar.close()
1✔
406
        del self.app
1✔
407

408
    def testForeignObject(self):
1✔
409
        from ZODB.POSException import InvalidObjectReference
1✔
410
        self.assertRaises(InvalidObjectReference, self._foreignAdd)
1✔
411

412
    def _foreignAdd(self):
1✔
413
        import transaction
1✔
414
        ob = self.app.session_data_manager
1✔
415

416
        # we don't want to fail due to an acquisition wrapper
417
        ob = ob.aq_base
1✔
418

419
        # we want to fail for some other reason:
420
        sd = self.app.session_data_manager.getSessionData()
1✔
421
        sd.set('foo', ob)
1✔
422
        transaction.commit()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc