• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / ZODB / 18153960591

01 Oct 2025 06:50AM UTC coverage: 83.781% (-0.03%) from 83.811%
18153960591

Pull #415

github

web-flow
Update docs/articles/old-guide/convert_zodb_guide.py

Co-authored-by: Michael Howitz <icemac@gmx.net>
Pull Request #415: Apply the latest zope.meta templates

2441 of 3542 branches covered (68.92%)

193 of 257 new or added lines in 48 files covered. (75.1%)

12 existing lines in 6 files now uncovered.

13353 of 15938 relevant lines covered (83.78%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.14
/src/ZODB/tests/util.py
1
##############################################################################
2
#
3
# Copyright (c) 2003 Zope Foundation and Contributors.
4
# All Rights Reserved.
5
#
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
"""Conventience function for creating test databases
15
"""
16
import atexit
1✔
17
import doctest
1✔
18
import functools
1✔
19
import os
1✔
20
import pdb  # NOQA: T100 pdb import
1✔
21
import re
1✔
22
import runpy
1✔
23
import sys
1✔
24
import tempfile
1✔
25
import time
1✔
26
import unittest
1✔
27
import warnings
1✔
28
from time import gmtime as _real_gmtime
1✔
29
from time import time as _real_time
1✔
30
from unittest import mock
1✔
31

32
import persistent
1✔
33
import transaction
1✔
34
import zope.testing.setupstack
1✔
35
from zope.testing import renormalizing
1✔
36

37
import ZODB.utils
1✔
38
from ZODB.Connection import TransactionMetaData
1✔
39
# BBB
40
from ZODB.MappingStorage import DB  # noqa: F401 import unused
1✔
41

42

43
# Last value handed out by the fake clock.  It is advanced by
# ``_TimeWrapper`` and overwritten by ``reset_monotonic_time``.
_current_time = _real_time()


# Output normalizer for doctests.
checker = renormalizing.RENormalizing(
    [
        # Replace the memory address in default object reprs with a
        # constant value.
        (
            re.compile("<(.*?) object at 0x[0-9a-f]*?>"),
            r"<\1 object at 0x000000000000>",
        ),
        # Persistent 4.4 changes the repr of persistent subclasses,
        # and it is slightly different with the C extension and
        # pure-Python module, so the module prefix is dropped.
        (
            re.compile('ZODB.tests.testcrossdatabasereferences.'),
            '',
        ),
    ]
)
55

56

57
def setUp(test, name='test'):
    """Prepare an isolated environment for *test*.

    Clears the transaction synchronizers, aborts the current
    transaction, creates a temporary directory whose name starts with
    *name*, makes it both ``tempfile.tempdir`` and the current working
    directory, and registers the matching undo actions with
    ``zope.testing.setupstack`` so that ``tearDown`` can revert them.
    """
    # Every cleanup is registered against the same test object.
    register = functools.partial(zope.testing.setupstack.register, test)

    clear_transaction_syncs()
    transaction.abort()

    workdir = tempfile.mkdtemp(prefix=name)
    register(zope.testing.setupstack.rmtree, workdir)

    # Remember the previous default temp directory before replacing it.
    register(setattr, tempfile, 'tempdir', tempfile.tempdir)
    tempfile.tempdir = workdir

    # Remember the previous working directory before changing it.
    register(os.chdir, os.getcwd())
    os.chdir(workdir)

    register(transaction.abort)
1✔
68

69

70
def tearDown(test):
    """Undo what ``setUp`` did for *test*.

    Clears the transaction synchronizers first and then lets
    ``zope.testing.setupstack`` run the cleanups registered for *test*.
    """
    clear_transaction_syncs()
    zope.testing.setupstack.tearDown(test)
1✔
73

74

75
class TestCase(unittest.TestCase):
    """``unittest.TestCase`` wired to this module's ``setUp``/``tearDown``.

    Each test gets an empty ``globs`` dict and the environment prepared
    by the module-level ``setUp`` function.  The temporary directory
    prefix is built from the class name and the test method name.
    """

    def setUp(self):
        self.globs = {}
        name = self.__class__.__name__
        # unittest stores the method name in ``_testMethodName`` (the
        # ``level`` property below relies on it as well).  The old
        # name-mangled ``_TestCase__testMethodName`` attribute is not
        # set, so the method name was never appended.
        mname = getattr(self, '_testMethodName', '')
        if mname:
            name += '-' + mname
        setUp(self, name)

    tearDown = tearDown

    # propagate .level from tested method to TestCase so that e.g. @long_test
    # works
    @property
    def level(self):
        """Level of the test method under test; 1 if it declares none."""
        f = getattr(self, self._testMethodName)
        return getattr(f, 'level', 1)
1✔
93

94

95
def long_test(f):
    """Mark *f* as a long-running test and return it.

    The marker is the attribute ``level = 2`` on the function itself.

    Use `zope-testrunner --at-level=1` to run tests without the long-ones.
    """
    setattr(f, 'level', 2)
    return f
1✔
103

104

105
def pack(db):
    """Pack *db* using a pack time one second after the current time."""
    pack_time = time.time() + 1
    db.pack(pack_time)
1✔
107

108

109
class P(persistent.Persistent):
    """Small persistent object that carries a single ``name`` attribute."""

    def __init__(self, name=None):
        self.name = name

    def __repr__(self):
        # Wrap the name in a 1-tuple: with a bare ``%`` operand a tuple
        # valued name would be unpacked as the format arguments.
        return 'P(%s)' % (self.name,)
1✔
116

117

118
class MininalTestLayer:
    """Test layer that runs its tests inside a private temporary directory.

    ``setUp`` creates the directory below the current working directory
    and changes into it; ``tearDown`` changes back and removes it.
    """

    __bases__ = ()
    __module__ = ''

    def __init__(self, name):
        self.__name__ = name

    def setUp(self):
        start_dir = os.getcwd()
        self.here = start_dir
        # The layer name is used as the *suffix* of the directory name.
        self.tmp = tempfile.mkdtemp(self.__name__, dir=start_dir)
        os.chdir(self.tmp)

        # sigh. tearDown isn't called when a layer is run in a sub-process.
        atexit.register(clean, self.tmp)

    def tearDown(self):
        os.chdir(self.here)
        zope.testing.setupstack.rmtree(self.tmp)

    # Per-test hooks do nothing in this layer.
    testSetUp = testTearDown = lambda self: None
1✔
139

140

141
def clean(tmp):
    """Remove the directory *tmp* if it still exists."""
    if not os.path.isdir(tmp):
        return
    zope.testing.setupstack.rmtree(tmp)
×
144

145

146
class AAAA_Test_Runner_Hack(unittest.TestCase):
    """Hack to work around a bug in the test runner.

    The first layer (lex sorted) is run first in the foreground, so
    this class gives the runner a layer whose name sorts early and
    which holds a single empty test.
    """

    layer = MininalTestLayer('!no tests here!')

    def testNothing(self):
        pass
×
156

157

158
def assert_warning(category, func, warning_text=''):
    """Call *func* and require that it emits a matching warning.

    A recorded warning matches when its category is exactly *category*
    (subclasses do not count) and *warning_text* occurs in its message.

    Returns the result of ``func()``.  Raises ``AssertionError`` with
    the list of recorded warnings when none of them matches.
    """
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter('default')
        result = func()
        matched = any(
            entry.category is category
            and warning_text in str(entry.message)
            for entry in caught
        )
        if not matched:
            raise AssertionError(caught)
        return result
167

168

169
def assert_deprecated(func, warning_text=''):
    """Call *func* and require a ``DeprecationWarning`` from it.

    Thin wrapper around ``assert_warning``; returns what *func* returns.
    """
    return assert_warning(
        DeprecationWarning, func, warning_text=warning_text)
1✔
171

172

173
def wait(func=None, timeout=30):
    """Poll *func* until it returns a true value.

    *func* is called about every 10 ms, at most ``int(timeout * 100)``
    times.  Returns ``None`` as soon as a call yields a true value and
    raises ``AssertionError`` when every attempt was falsy.

    Called without *func* (``wait(timeout=5)``) it returns a one
    argument callable that waits on the function passed to it, so it
    can be used as a decorator with a custom timeout.
    """
    if func is None:
        return lambda f: wait(f, timeout)
    # Attempts are counted instead of comparing clock readings, so the
    # loop does not depend on ``time.time``, which helpers in this
    # module replace with a fake.
    for _ in range(int(timeout * 100)):
        if func():
            return
        time.sleep(.01)
    raise AssertionError(
        'timed out after %s seconds waiting for %r' % (timeout, func))
181

182

183
def store(storage, oid, value='x', serial=ZODB.utils.z64):
    """Store *value* under *oid* in *storage* in a transaction of its own.

    *oid* and *serial* may be given as ``bytes`` or as anything
    ``ZODB.utils.p64`` accepts; non-bytes values are converted first.
    The record is written through a full two-phase commit
    (``tpc_begin``, ``store``, ``tpc_vote``, ``tpc_finish``).
    """
    def _as_bytes(ident):
        return ident if isinstance(ident, bytes) else ZODB.utils.p64(ident)

    oid = _as_bytes(oid)
    serial = _as_bytes(serial)

    txn = TransactionMetaData()
    storage.tpc_begin(txn)
    storage.store(oid, serial, value, '', txn)
    storage.tpc_vote(txn)
    storage.tpc_finish(txn)
1✔
193

194

195
def mess_with_time(test=None, globs=None, now=1278864701.5):
    """Replace ``time.time`` with a fake that advances 1 second per call.

    The first fake call returns ``now + 1``.  Restoring the previous
    ``time.time`` is registered with ``zope.testing.setupstack`` for
    *test*.  When only *globs* is given, the fake function itself
    (carrying a ``globs`` attribute) is used as the test object.
    """
    # One-element list so that the closure can update the value.
    clock = [now]

    def faux_time():
        clock[0] += 1
        return clock[0]

    if test is None and globs is not None:
        # sigh
        faux_time.globs = globs
        test = faux_time

    import time
    zope.testing.setupstack.register(test, setattr, time, 'time', time.time)

    # jython: ``time`` is a type there, so the fake must be a staticmethod
    time.time = staticmethod(faux_time) if isinstance(time, type) \
        else faux_time
×
214

215

216
def clear_transaction_syncs():
    """Clear the synchronizers of the global transaction manager.

    Many tests don't clean up the synchronizers they registered with
    the global transaction manager, which can wreak havoc with the
    following tests, now that connections interact with their storages
    at transaction boundaries.  Clearing them here keeps the tests
    independent of each other.

    For now this uses the transaction manager's ``clearSynchs``
    method; eventually transaction managers need to grow an API for
    this.
    """
    manager = transaction.manager
    manager.clearSynchs()
1✔
230

231

232
class _TimeWrapper:
    """Install fake ``time.time``/``time.gmtime`` that always move forward.

    An instance works as a context manager, as a decorator (calling it
    with a function returns a wrapper that runs the function with the
    fakes installed), or manually through ``install_fakes``/``close``.

    Every fake ``time.time()`` call returns at least the real time and
    at least *granularity* more than the module-level ``_current_time``,
    which is updated to the returned value.
    """

    def __init__(self, granularity=1.0):
        self._granularity = granularity
        self._lock = ZODB.utils.Lock()
        self.fake_gmtime = mock.Mock()
        self.fake_time = mock.Mock()
        self._configure_fakes()

    def _configure_fakes(self):
        """Attach the time-advancing side effects to the two mocks."""
        def incr():
            global _current_time  # pylint:disable=global-statement
            with self._lock:
                # Keep the new value in a local while the lock is held:
                # re-reading the global after releasing the lock could
                # return a value that another thread has just produced.
                _current_time = now = max(
                    _real_time(), _current_time + self._granularity)
            return now
        self.fake_time.side_effect = incr

        def incr_gmtime(seconds=None):
            # An explicit timestamp is converted as is; only the
            # argument-less form advances the fake clock.
            if seconds is not None:
                now = seconds
            else:
                now = incr()
            return _real_gmtime(now)
        self.fake_gmtime.side_effect = incr_gmtime

    def install_fakes(self):
        """Replace ``time.time`` and ``time.gmtime`` with the fakes."""
        time.time = self.fake_time
        time.gmtime = self.fake_gmtime

    __enter__ = install_fakes

    def close(self, *args):
        """Restore the real ``time.time`` and ``time.gmtime``."""
        time.time = _real_time
        time.gmtime = _real_gmtime

    __exit__ = close

    def __call__(self, func):
        """Return *func* wrapped so that it runs with the fakes installed."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with self:
                return func(*args, **kwargs)
        return wrapper
1✔
276

277

278
def time_monotonically_increases(func_or_granularity):
    """
    Decorator making :func:`time.time` and :func:`time.gmtime`
    increase monotonically while the decorated callable runs. This
    ensures things like last modified dates always increase.

    Three guarantees hold for the value of :func:`time.time` returned
    while the decorated function is running:

        1. It is always *at least* the value of the *real*
           :func:`time.time`;

        2. Each call returns a value greater than the previous call;

        3. Those two constraints hold across different invocations of
           decorated functions.

    The decorator can be applied to a method in a test case::

        class TestThing(unittest.TestCase):
            @time_monotonically_increases
            def test_method(self):
                t = time.time()
                ...

    It can also be applied to a bare function taking any number of
    arguments::

        @time_monotonically_increases
        def utility_function(a, b, c=1):
            t = time.time()
            ...

    By default, the time advances by at least 1.0 second per call.
    A different granularity can be passed as an argument; this is
    useful to keep from running too far ahead of the real clock::

        @time_monotonically_increases(0.1)
        def smaller_increment():
            t1 = time.time()
            t2 = time.time()
            assert t2 >= t1 + 0.1
    """
    if not isinstance(func_or_granularity, (int, float)):
        # Bare usage: the argument is the callable to wrap.
        return _TimeWrapper()(func_or_granularity)

    # Factory usage: the argument is the granularity and the returned
    # wrapper object is the actual decorator.
    return _TimeWrapper(func_or_granularity)
1✔
329

330

331
def reset_monotonic_time(value=0.0):
    """
    Set the value the monotonic fake clock continues from.

    With the default of 0.0 the fake clock returns the real time on
    its next call.
    """
    global _current_time  # pylint:disable=global-statement

    _current_time = value
1✔
339

340

341
class MonotonicallyIncreasingTimeMinimalTestLayer(MininalTestLayer):
    """``MininalTestLayer`` that runs every test with the fake clock.

    The fake ``time.time``/``time.gmtime`` are installed before each
    test; afterwards they are removed and the fake clock is reset.
    """

    def testSetUp(self):
        self.time_manager = manager = _TimeWrapper()
        manager.install_fakes()

    def testTearDown(self):
        self.time_manager.close()
        reset_monotonic_time()
1✔
350

351

352
def with_high_concurrency(f):
    """
    Decorate *f* so that it runs with a very short thread switch interval.

    This helps tests that try to probabilistically reproduce race
    condition scenarios. The previous switch interval is restored when
    *f* returns or raises.
    """
    @functools.wraps(f)
    def run_with_fast_switching(*argv, **kw):
        # Python3, by default, switches every 5ms, which turns threads in
        # intended "high concurrency" scenarios to execute almost serially.
        # Raise the frequency of context switches in order to increase the
        # probability to reproduce interesting/tricky overlapping of
        # threads.
        #
        # See https://github.com/zopefoundation/ZODB/pull/345#issuecomment-822188305 and  # noqa: E501 line too long
        # https://github.com/zopefoundation/ZEO/issues/168#issuecomment-821829116 for details.  # noqa: E501 line too long
        previous_interval = sys.getswitchinterval()
        # ~ 100 simple instructions on modern hardware
        sys.setswitchinterval(5e-6)
        try:
            return f(*argv, **kw)
        finally:
            sys.setswitchinterval(previous_interval)

    return run_with_fast_switching
1✔
383

384

385
def run_module_as_script(mod, args, stdout="stdout", stderr="stderr"):
    """Run module *mod* as a script with the arguments *args*.

    ``sys.argv`` is replaced by the current ``sys.argv[0]`` followed by
    *args*, and ``sys.stdout``/``sys.stderr`` are redirected to the
    files named by *stdout* and *stderr* (opened for writing) while
    the module runs.  The previous ``sys.argv``, streams and
    ``pdb.set_trace`` are restored afterwards, also when the module
    raises.

    The function is usually called in a ``setUp/tearDown`` frame
    which will remove the created files.
    """
    sargv, sout, serr = sys.argv, sys.stdout, sys.stderr
    s_set_trace = pdb.set_trace
    # Open both files before touching any global state.  If opening
    # fails nothing has been redirected yet, so the real streams are
    # never closed by the cleanup.
    with open(stdout, "w") as out, open(stderr, "w") as err:
        try:
            sys.argv = [sargv[0], *args]
            sys.stdout, sys.stderr = out, err
            # to allow debugging: send debugger output to the original
            # stdout.  ``pdb.set_trace`` must be a callable, so the
            # bound method is installed, not the debugger instance.
            pdb.set_trace = doctest._OutputRedirectingPdb(sout).set_trace
            runpy.run_module(mod, run_name="__main__", alter_sys=True)
        finally:
            pdb.set_trace = s_set_trace
            sys.argv, sys.stdout, sys.stderr = sargv, sout, serr
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc