• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / ZODB / 18153960591

01 Oct 2025 06:50AM UTC coverage: 83.781% (-0.03%) from 83.811%
18153960591

Pull #415

github

web-flow
Update docs/articles/old-guide/convert_zodb_guide.py

Co-authored-by: Michael Howitz <icemac@gmx.net>
Pull Request #415: Apply the latest zope.meta templates

2441 of 3542 branches covered (68.92%)

193 of 257 new or added lines in 48 files covered. (75.1%)

12 existing lines in 6 files now uncovered.

13353 of 15938 relevant lines covered (83.78%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.87
/src/ZODB/tests/racetest.py
1
##############################################################################
2
#
3
# Copyright (c) 2019 - 2022 Zope Foundation and Contributors.
4
# All Rights Reserved.
5
#
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
"""Module racetest provides infrastructure and tests to verify storages against
15
data corruptions caused by race conditions in storage implementations.
16

17
It works by combining
18

19
  1) testing models for application behaviour, with
20
  2) model checkers, that drive provided application model through particular
21
     scenarios that are likely to hit specific race conditions in storage
22
     implementation.
23

24
If a race condition is hit, it is detected as a breakage of invariant defined
25
in the model specification.
26

27
A model defines application behaviour by specifying initial database state, a
28
"next" step representing database modification, and an invariant, that should
29
always be true, no matter how and in which order, simultaneously or serially,
30
the next steps are applied by database clients. A model specification is
31
represented by IModelSpec interface.
32

33
A checker drives the model through particular usage scenario where probability
34
of specific race condition is likely to be high. For example
35
_check_race_loadopen_vs_local_invalidate runs two client threads, that use
36
shared storage connection, where one thread repeatedly modifies the database,
37
and the other thread repeatedly checks the database for breakage of the model
38
invariant. This checker verifies storages and ZODB.Connection for races in
39
between load/open and local invalidations to catch bugs similar to
40
https://github.com/zopefoundation/ZODB/issues/290 and
41
https://github.com/zopefoundation/ZEO/issues/166.
42
"""
43

44
import threading
1✔
45
from random import randint
1✔
46

47
import transaction
1✔
48
from zope.interface import Interface
1✔
49
from zope.interface import implementer
1✔
50

51
from ZODB import DB
1✔
52
from ZODB import POSException
1✔
53
from ZODB.tests.MinPO import MinPO
1✔
54
from ZODB.tests.util import long_test
1✔
55
from ZODB.tests.util import with_high_concurrency
1✔
56
from ZODB.utils import at2before
1✔
57
from ZODB.utils import tid_repr
1✔
58

59

60
class IModelSpec(Interface):
    """Specification of an application model driven by the race checkers.

    The ``_check_race_*`` checkers call the three steps declared below,
    passing the root object of an open connection.
    """

    def init(root):
        """Populate ``root`` with the initial database state."""

    def next(root):
        """Apply one modification step to the database state in ``root``."""

    def assertStateOK(root):
        """Check that the state in ``root`` satisfies the model invariant.

        Raise AssertionError with details when the invariant is broken.
        """
76

77

78
@implementer(IModelSpec)
class T2ObjectsInc:
    """Model in which two counters, obj1 and obj2, always advance together.

    It is meant for tests where a bug becomes visible right after the race
    happened.

    invariant:  obj1 == obj2
    """

    def init(_, root):
        """Create both counters, each starting at 0."""
        for name in ('obj1', 'obj2'):
            root[name] = MinPO(0)

    def next(_, root):
        """Advance both counters by one."""
        for name in ('obj1', 'obj2'):
            root[name].value += 1

    def assertStateOK(_, root):
        """Raise AssertionError unless both counters hold the same value."""
        v1, v2 = root['obj1'].value, root['obj2'].value
        if v1 != v2:
            raise AssertionError("obj1 (%d)  !=  obj2 (%d)" % (v1, v2))
102

103

104
@implementer(IModelSpec)
class T2ObjectsInc2Phase:
    """Model in which obj1 and obj2 are incremented alternately.

    ``phase`` records whose turn it is: in phase 0 obj1 is incremented, in
    phase 1 obj2 is incremented, and the phase flips after every step.

    It is meant for tests where a bug becomes visible only on the
    transaction that follows the race.

    invariant:  obj1 - obj2 == phase
    """

    def init(_, root):
        """Create obj1, obj2 and phase, each starting at 0."""
        for name in ('obj1', 'obj2', 'phase'):
            root[name] = MinPO(0)

    def next(_, root):
        """Increment the object whose turn it is and flip the phase."""
        phase = root['phase']
        name = 'obj1' if phase.value == 0 else 'obj2'
        root[name].value += 1
        phase.value = (phase.value + 1) % 2

    def assertStateOK(_, root):
        """Raise AssertionError unless obj1 - obj2 equals phase."""
        v1 = root['obj1'].value
        v2 = root['obj2'].value
        ph = root['phase'].value
        if v1 - v2 != ph:
            raise AssertionError("obj1 (%d) - obj2(%d) != phase (%d)" %
                                 (v1, v2, ph))
136

137

138
class RaceTests:
    """Race-condition tests for a storage.

    NOTE(review): presumably mixed into a storage test case -- the methods
    below rely on ``self._storage`` and ``self.skipTest``, neither of which
    is defined in this class.
    """

    # verify storage/Connection for race in between load/open and local
    # invalidations.
    # https://github.com/zopefoundation/ZEO/issues/166
    # https://github.com/zopefoundation/ZODB/issues/290
    def test_race_loadopen_vs_local_invalidate(self):
        return self._check_race_loadopen_vs_local_invalidate(T2ObjectsInc())

    @with_high_concurrency
    def _check_race_loadopen_vs_local_invalidate(self, spec):
        assert IModelSpec.providedBy(spec)
        db = DB(self._storage)

        # `init` initializes the database according to the spec.
        def init():
            _state_init(db, spec)

        # `verify` accesses objects in the database and verifies spec
        # invariant.
        #
        # Access to half of the objects is organized to always trigger loading
        # from zstor. Access to the other half goes through zconn cache and so
        # verifies whether the cache is not stale.
        def verify(tg):
            transaction.begin()
            zconn = db.open()
            root = zconn.root()

            # reload some objects from zstor, while getting others from
            # zconn cache
            _state_invalidate_half1(root)

            try:
                spec.assertStateOK(root)
            except AssertionError as e:
                msg = "verify: %s\n" % e
                msg += _state_details(root)
                tg.fail(msg)

            # we did not change anything, so abort (per the original note
            # the check also fails when commit is used here instead).
            transaction.abort()
            zconn.close()

        # `modify` changes objects in the database by executing "next" step.
        #
        # Spec invariant should be preserved.
        def modify(tg):
            transaction.begin()
            zconn = db.open()

            root = zconn.root()
            spec.next(root)
            spec.assertStateOK(root)

            transaction.commit()
            zconn.close()

        # `xrun` runs f in a loop until either N iterations, or until failed is
        # set.
        def xrun(tg, tx, f, N):
            for i in range(N):
                # print('%s.%d' % (f.__name__, i))
                f(tg)
                if tg.failed():
                    break

        # loop verify and modify concurrently.
        init()

        N = 500
        tg = TestWorkGroup(self)
        tg.go(xrun, verify, N, name='Tverify')
        tg.go(xrun, modify, N, name='Tmodify')
        tg.wait(120)

    # client-server storages like ZEO, NEO and RelStorage allow several storage
    # clients to be connected to single storage server.
    #
    # For client-server storages test subclasses should implement
    # _new_storage_client to return new storage client that is connected to the
    # same storage server self._storage is connected to.

    def _new_storage_client(self):
        raise NotImplementedError

    # `dbopen` creates new client storage connection and wraps it with DB.
    def dbopen(self):
        try:
            zstor = self._new_storage_client()
        except NotImplementedError:
            # the test will be skipped from main thread because dbopen is
            # first used in init on the main thread before any other thread
            # is spawned.
            self.skipTest(
                "%s does not implement _new_storage_client" % type(self))
        return DB(zstor)

    # verify storage for race in between load and external invalidations.
    # https://github.com/zopefoundation/ZEO/issues/155
    #
    # This test is similar to _check_race_loadopen_vs_local_invalidate but
    # does not reuse its code because the probability to reproduce external
    # invalidation bug with only 1 mutator + 1 verifier is low.
    def test_race_load_vs_external_invalidate(self):
        return self._check_race_load_vs_external_invalidate(T2ObjectsInc())

    @with_high_concurrency
    def _check_race_load_vs_external_invalidate(self, spec):
        assert IModelSpec.providedBy(spec)

        # `init` initializes the database according to the spec.
        def init():
            db = self.dbopen()
            _state_init(db, spec)
            db.close()

        # we'll run 8 T workers concurrently. As of 20210416, due to race
        # conditions in ZEO, it triggers the bug where T sees stale obj2 with
        # obj1.value != obj2.value
        #
        # The probability to reproduce the bug is significantly reduced with
        # decreasing n(workers): almost never with nwork=2 and sometimes with
        # nwork=4.
        nwork = 8

        # `T` is a worker that accesses database in a loop and verifies
        # spec invariant.
        #
        # Access to half of the objects is organized to always trigger loading
        # from zstor. Access to the other half goes through zconn cache and so
        # verifies whether the cache is not stale.
        #
        # Once in a while T tries to modify the database executing spec "next"
        # as test source of changes for other workers.
        def T(tg, tx, N):
            db = self.dbopen()

            def t_():
                transaction.begin()
                zconn = db.open()
                root = zconn.root()

                # reload some objects from zstor, while getting others from
                # zconn cache
                _state_invalidate_half1(root)

                try:
                    spec.assertStateOK(root)
                except AssertionError as e:
                    msg = f"T{tx}: {e}\n"
                    msg += _state_details(root)
                    tg.fail(msg)

                    # The invariant is already broken and reported: do not
                    # run "next" or commit on top of the broken state (the
                    # second assertStateOK would raise again, unhandled, and
                    # leave the connection open). Release the connection and
                    # stop, the same way the worker in
                    # _check_race_external_invalidate_vs_disconnect does.
                    transaction.abort()
                    zconn.close()
                    return

                # change objects once in a while
                if randint(0, 4) == 0:
                    # print("T%s: modify" % tx)
                    spec.next(root)
                    spec.assertStateOK(root)

                try:
                    transaction.commit()
                except POSException.ConflictError:
                    # print('conflict -> ignore')
                    transaction.abort()

                zconn.close()

            try:
                for i in range(N):
                    # print('T%s.%d' % (tx, i))
                    t_()
                    if tg.failed():
                        break
            finally:
                db.close()

        # run the workers concurrently.
        init()

        N = 100
        tg = TestWorkGroup(self)
        for _ in range(nwork):
            tg.go(T, N)
        tg.wait(120)

    # verify storage for race in between client disconnect and external
    # invalidations. https://github.com/zopefoundation/ZEO/issues/209
    #
    # This test is similar to _check_race_load_vs_external_invalidate, but
    # increases the number of workers and also makes every worker to repeatedly
    # reconnect to the storage, so that the probability of disconnection is
    # high. It also uses T2ObjectsInc2Phase instead of T2ObjectsInc because if
    # an invalidation is skipped due to the disconnect/invalidation race,
    # T2ObjectsInc won't catch the bug as both objects will be either in old
    # state, or in new state after the next transaction. Contrary to that, with
    # T2ObjectsInc2Phase the invariant will be detected to be broken on the
    # next transaction.
    @long_test
    def test_race_external_invalidate_vs_disconnect(self):
        return self._check_race_external_invalidate_vs_disconnect(
            T2ObjectsInc2Phase())

    @with_high_concurrency
    def _check_race_external_invalidate_vs_disconnect(self, spec):
        assert IModelSpec.providedBy(spec)

        # `init` initializes the database according to the spec.
        def init():
            db = self.dbopen()
            _state_init(db, spec)
            db.close()

        nwork = 8 * 8   # nwork^2 from _check_race_load_vs_external_invalidate

        # `T` is similar to the T from _check_race_load_vs_external_invalidate
        # but reconnects to the database often.
        def T(tg, tx, N):
            def t_():
                def work1(db):
                    transaction.begin()
                    zconn = db.open()
                    root = zconn.root()

                    # reload some objects from zstor, while getting others from
                    # zconn cache
                    _state_invalidate_half1(root)

                    try:
                        spec.assertStateOK(root)
                    except AssertionError as e:
                        msg = f"T{tx}: {e}\n"
                        msg += _state_details(root)
                        tg.fail(msg)

                        zconn.close()
                        transaction.abort()
                        return

                    # change objects once in a while
                    if randint(0, 4) == 0:
                        # print("T%s: modify" % tx)
                        spec.next(root)
                        spec.assertStateOK(root)

                    try:
                        transaction.commit()
                    except POSException.ConflictError:
                        # print('conflict -> ignore')
                        transaction.abort()

                    zconn.close()

                # every t_ round uses a fresh storage client, so that the
                # worker keeps disconnecting and reconnecting.
                db = self.dbopen()
                try:
                    for i in range(4):
                        if tg.failed():
                            break
                        work1(db)
                finally:
                    db.close()

            for i in range(N):
                # print('T%s.%d' % (tx, i))
                if tg.failed():
                    break
                t_()

        # run the workers concurrently.
        init()

        N = 100 // (2 * 4)  # N reduced to save time
        tg = TestWorkGroup(self)
        for _ in range(nwork):
            tg.go(T, N)
        tg.wait(120)
414

415

416
# `_state_init` initializes the database according to the spec.
417
def _state_init(db, spec):
    """Initialize the database behind ``db`` according to ``spec``.

    Opens a connection, lets the spec populate the root object, checks the
    spec invariant on the fresh state, commits and closes the connection.
    """
    transaction.begin()
    conn = db.open()
    dbroot = conn.root()

    # populate the initial state and check it before it gets committed
    spec.init(dbroot)
    spec.assertStateOK(dbroot)

    transaction.commit()
    conn.close()
425

426

427
# `_state_invalidate_half1` invalidates first 50% of database objects, so
428
# that the next time they are accessed, they are reloaded from the storage.
429
def _state_invalidate_half1(root):
    """Invalidate the first half of the objects referenced by ``root``.

    Keys are taken in sorted order and the first ``len(keys) // 2`` of the
    corresponding objects get ``_p_invalidate()`` called on them. With an
    odd number of keys the middle object is left untouched; with fewer than
    two keys nothing is invalidated.
    """
    # sorted() already returns a new list, no extra list() copy is needed
    keys = sorted(root.keys())
    for key in keys[:len(keys) // 2]:
        root[key]._p_invalidate()
434

435

436
# `_state_details` returns text details about ZODB objects directly referenced
437
# by root.
438
def _state_details(root):  # -> txt
    """Return text describing the objects directly referenced by ``root``.

    The report lists the ``_p_serial`` of every object, an approximation of
    the connection's view (``zconn_at``) and, for every object, what the
    storage's ``loadBefore`` returns at that point. If the storage has a
    ``_cache`` attribute, the cache is cleared (a side effect) and the loads
    are repeated, which helps to see whether a failure came from a cache
    that went out of sync.
    """
    keys = sorted(root.keys())
    parts = []

    # serial for all objects
    parts.append('  '.join(
        f'{k}._p_serial: {tid_repr(root[k]._p_serial)}' for k in keys))
    parts.append('\n')

    # zconn.at approximated as max(serials)
    # XXX better retrieve real zconn.at, but currently there is no way to
    # retrieve it for all kinds of storages.
    zconn = root._p_jar
    zconn_at = max(root[k]._p_serial for k in keys)
    parts.append('zconn_at: %s  # approximated as max(serials)\n'
                 % tid_repr(zconn_at))

    # zstor.loadBefore(obj, @zconn_at)
    zstor = zconn.db().storage

    def load(key):
        # one report line with what the storage returns for root[key]
        prefix = 'zstor.loadBefore(%s, @zconn.at)\t->  ' % key
        obj = root[key]
        loaded = zstor.loadBefore(obj._p_oid, at2before(zconn_at))
        if loaded is None:
            result = 'None'
        else:
            _, serial, next_serial = loaded
            result = 'serial: {}  next_serial: {}'.format(
                tid_repr(serial), tid_repr(next_serial))
        return prefix + result + '\n'

    parts.extend(load(k) for k in keys)

    # try to reset storage cache and retry loading
    # it helps to see if an error was due to the cache getting out of sync
    zcache = getattr(zstor, '_cache', None)  # works for ZEO and NEO
    if zcache is not None:
        zcache.clear()
        parts.append('zstor._cache.clear()\n')
        parts.extend(load(k) for k in keys)

    return ''.join(parts)
483

484

485
class TestWorkGroup:
    """TestWorkGroup represents group of threads that run together to verify
       something.

       - .go() adds test thread to the group.
       - .wait() waits for all spawned threads to finish and reports all
         collected failures to containing testcase.
       - a test should indicate failure by call to .fail(), it
         can check for a failure with .failed()
    """

    def __init__(self, testcase):
        self.testcase = testcase
        self.failed_event = threading.Event()
        self.fail_mu = threading.Lock()   # protects failv
        self.failv = []           # failures registered by .fail
        self.threadv = []         # spawned threads
        self.waitg = WaitGroup()  # to wait for spawned threads

    def fail(self, msg):
        """fail adds failure to test result."""
        with self.fail_mu:
            self.failv.append(msg)
        self.failed_event.set()

    def failed(self):
        """did the test already fail."""
        return self.failed_event.is_set()

    def go(self, f, *argv, **kw):
        """go spawns f(self, #thread, *argv, **kw) in new test thread.

        The optional ``name`` keyword gives the thread name (default
        ``T<#thread>``); it is not passed on to f.
        """
        self.waitg.add(1)
        tx = len(self.threadv)
        tname = kw.pop('name', 'T%d' % tx)
        t = Daemon(name=tname, target=self._run, args=(f, tx, argv, kw))
        self.threadv.append(t)
        t.start()

    def _run(self, f, tx, argv, kw):
        """_run is the thread body: it calls f, registers an unhandled
        exception as failure (and re-raises it), and always marks the
        worker as done."""
        tname = self.threadv[tx].name
        try:
            f(self, tx, *argv, **kw)
        except Exception as e:
            self.fail("Unhandled exception %r in thread %s"
                      % (e, tname))
            raise
        finally:
            self.waitg.done()

    def wait(self, timeout):
        """wait waits for all test threads to complete and reports all
           collected failures to containing testcase."""
        if not self.waitg.wait(timeout):
            self.fail("test did not finish within %s seconds" % timeout)

        failed_to_finish = []
        for t in self.threadv:
            try:
                t.join(1)
            except AssertionError:
                # the thread is still alive after join; it is reported via
                # .fail below, which also sets failed_event.
                failed_to_finish.append(t.name)
        if failed_to_finish:
            self.fail("threads did not finish: %s" % failed_to_finish)
        del self.threadv  # avoid cyclic garbage

        if self.failed():
            # threads that did not finish may still call .fail concurrently:
            # take the snapshot of failures under the lock.
            with self.fail_mu:
                failv = list(self.failv)
            self.testcase.fail('\n\n'.join(failv))
553

554

555
class Daemon(threading.Thread):
    """Daemon thread whose ``join`` fails if the thread did not stop.

    Reports for uncaught exceptions are written while holding ``exc_lock``,
    so that reports from concurrently failing threads do not get intermixed,
    which makes the exceptions easier to analyse.
    """

    def __init__(self, **kw):
        super().__init__(**kw)
        self.daemon = True
        if not hasattr(self, "_invoke_excepthook"):
            # old Python
            self._install_locked_run()
        else:
            # Python 3.8+
            self._install_locked_excepthook()

    def _install_locked_excepthook(self):
        """Wrap the thread's ``_invoke_excepthook`` so it runs under
        ``exc_lock``."""
        original_hook = self._invoke_excepthook

        def invoke_excepthook(*args, **kw):
            with exc_lock:
                return original_hook(*args, **kw)

        self._invoke_excepthook = invoke_excepthook

    def _install_locked_run(self):
        """Wrap ``run`` so that an uncaught exception is printed under
        ``exc_lock``."""
        original_run = self.run

        def run():
            from threading import _format_exc
            from threading import _sys
            try:
                original_run()
            except SystemExit:
                pass
            except BaseException:
                if _sys and _sys.stderr is not None:
                    with exc_lock:
                        print("Exception in thread %s:\n%s" %
                              (self.name, _format_exc()),
                              file=_sys.stderr)
                else:
                    raise
            finally:
                # drop the instance-level wrapper once the thread has run
                del self.run

        self.run = run

    def join(self, *args, **kw):
        """Join the thread; raise AssertionError if it is still alive
        afterwards (e.g. because the join timed out)."""
        super().join(*args, **kw)
        if not self.is_alive():
            return
        raise AssertionError("Thread %s did not stop" % self.name)
603

604

605
# lock to ensure that Daemon exception reports are output atomically
606
exc_lock = threading.Lock()
1✔
607

608

609
class WaitGroup:
    """WaitGroup provides service to wait for spawned workers to be done.

       - .add() adds workers
       - .done() indicates that one worker is done
       - .wait() waits until all workers are done
    """

    def __init__(self):
        self.n = 0                                # number of pending workers
        self.condition = threading.Condition()    # protects n

    def add(self, delta):
        """add changes the number of pending workers by delta.

        Waiters are woken up when the number drops to zero. AssertionError
        is raised, and the counter is left unchanged, if the number would
        become negative.
        """
        with self.condition:
            n = self.n + delta
            if n < 0:
                # validate before storing so that a bogus call does not
                # leave the group with a corrupted counter.
                raise AssertionError("#workers is negative")
            self.n = n
            if n == 0:
                self.condition.notify_all()

    def done(self):
        """done indicates that one worker is done."""
        self.add(-1)

    def wait(self, timeout):  # -> ok
        """wait waits until all workers are done.

        It returns True if there are no pending workers, and False if
        timeout (in seconds, None means no limit) expired first.
        """
        with self.condition:
            # wait_for re-checks the predicate after every wakeup, so the
            # result always reflects the real state of the counter.
            return self.condition.wait_for(lambda: self.n == 0, timeout)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc