• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / ZODB / 4734080348

pending completion
4734080348

push

github

GitHub
Merge pull request #380 from zopefoundation/racetest

2885 of 4084 branches covered (70.64%)

172 of 172 new or added lines in 2 files covered. (100.0%)

13475 of 16144 relevant lines covered (83.47%)

0.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.27
/src/ZODB/tests/racetest.py
1
##############################################################################
2
#
3
# Copyright (c) 2019 - 2022 Zope Foundation and Contributors.
4
# All Rights Reserved.
5
#
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
"""Module racetest provides infrastructure and tests to verify storages against
1✔
15
data corruptions caused by race conditions in storages implementations.
16

17
It works by combining
18

19
  1) testing models for application behaviour, with
20
  2) model checkers, that drive provided application model through particular
21
     scenarios that are likely to hit specific race conditions in storage
22
     implementation.
23

24
If a race condition is hit, it is detected as a breakage of invariant defined
25
in the model specification.
26

27
A model defines application behaviour by specifying initial database state, a
28
"next" step representing database modification, and an invariant, that should
29
always be true, no matter how and in which order, simultaneously or serially,
30
the next steps are applied by database clients. A model specification is
31
represented by IModelSpec interface.
32

33
A checker drives the model through particular usage scenario where probability
34
of specific race condition is likely to be high. For example
35
_check_race_loadopen_vs_local_invalidate runs two client threads, that use
36
shared storage connection, where one thread repeatedly modifies the database,
37
and the other thread repeatedly checks the database for breakage of the model
38
invariant. This checker verifies storages and ZODB.Connection for races in
39
between load/open and local invalidations to catch bugs similar to
40
https://github.com/zopefoundation/ZODB/issues/290 and
41
https://github.com/zopefoundation/ZEO/issues/166.
42
"""
43
from __future__ import print_function
1✔
44

45
import threading
1✔
46
from random import randint
1✔
47

48
import transaction
1✔
49
from zope.interface import Interface
1✔
50
from zope.interface import implementer
1✔
51

52
from ZODB import DB
1✔
53
from ZODB import POSException
1✔
54
from ZODB.tests.MinPO import MinPO
1✔
55
from ZODB.tests.util import long_test
1✔
56
from ZODB.tests.util import with_high_concurrency
1✔
57
from ZODB.utils import at2before
1✔
58
from ZODB.utils import tid_repr
1✔
59

60

61
class IModelSpec(Interface):
    """Testing specification consumed by the check_race_* checkers.

    A specification says how to set the database up, how to advance it by
    one step, and which invariant must hold no matter how the steps of
    different clients interleave.
    """

    def init(root):
        """Populate the database root with the initial state."""

    def next(root):
        """Apply one modification step to the database state."""

    def assertStateOK(root):
        """Check that the database state satisfies the intended invariant.

        When the invariant is violated, raise AssertionError carrying the
        details of the violation.
        """
77

78

79
@implementer(IModelSpec)
class T2ObjectsInc:
    """Specification in which two objects, obj1 and obj2, are incremented
    together in every step.

    Suitable for tests where a bug becomes visible right after the race.

    invariant:  obj1 == obj2
    """

    def init(_, root):
        for name in ('obj1', 'obj2'):
            root[name] = MinPO(0)

    def next(_, root):
        for name in ('obj1', 'obj2'):
            root[name].value += 1

    def assertStateOK(_, root):
        # the invariant demands equal values for both objects
        first, second = root['obj1'].value, root['obj2'].value
        if first != second:
            raise AssertionError("obj1 (%d)  !=  obj2 (%d)" % (first, second))
103

104

105
@implementer(IModelSpec)
class T2ObjectsInc2Phase:
    """Specification in which obj1 and obj2 are incremented in lock-step:
    a step increments obj1 when phase is 0 and obj2 otherwise, and then
    flips phase between 0 and 1.

    Suitable for tests where a bug becomes visible only on the transaction
    that follows the race.

    invariant:  obj1 - obj2 == phase
    """

    def init(_, root):
        for name in ('obj1', 'obj2', 'phase'):
            root[name] = MinPO(0)

    def next(_, root):
        phase = root['phase']
        target = 'obj1' if phase.value == 0 else 'obj2'
        root[target].value += 1
        phase.value = (phase.value + 1) % 2

    def assertStateOK(_, root):
        first = root['obj1'].value
        second = root['obj2'].value
        phase = root['phase'].value

        if first - second != phase:
            raise AssertionError("obj1 (%d) - obj2(%d) != phase (%d)" %
                                 (first, second, phase))
137

138

139
class RaceTests(object):
    """Mixin providing race-condition checks for storage test cases.

    The test case is expected to provide ``self._storage`` and, for
    client-server storages, to override ``_new_storage_client``.  The
    ``check_race_*`` methods run the scenarios with their default model
    specification; the ``_check_race_*`` variants accept any IModelSpec.
    """

    # verify storage/Connection for race in between load/open and local
    # invalidations.
    # https://github.com/zopefoundation/ZEO/issues/166
    # https://github.com/zopefoundation/ZODB/issues/290
    def check_race_loadopen_vs_local_invalidate(self):
        return self._check_race_loadopen_vs_local_invalidate(T2ObjectsInc())

    @with_high_concurrency
    def _check_race_loadopen_vs_local_invalidate(self, spec):
        """Run one verifier and one modifier thread over DB(self._storage).

        The verifier repeatedly checks the invariant of ``spec`` while the
        modifier repeatedly applies ``spec.next``; any violation seen by the
        verifier fails the test.
        """
        assert IModelSpec.providedBy(spec)
        db = DB(self._storage)

        # `init` initializes the database according to the spec.
        def init():
            _state_init(db, spec)

        # `verify` accesses objects in the database and verifies spec
        # invariant.
        #
        # Access to half of the objects is organized to always trigger loading
        # from zstor. Access to the other half goes through zconn cache and so
        # verifies whether the cache is not stale.
        def verify(tg):
            transaction.begin()
            zconn = db.open()
            root = zconn.root()

            # reload some objects from zstor, while getting others from
            # zconn cache
            _state_invalidate_half1(root)

            try:
                spec.assertStateOK(root)
            except AssertionError as e:
                msg = "verify: %s\n" % e
                msg += _state_details(root)
                tg.fail(msg)

            # we did not change anything, so just abort (the check also
            # fails when committing here instead)
            transaction.abort()
            zconn.close()

        # `modify` changes objects in the database by executing "next" step.
        #
        # Spec invariant should be preserved.
        def modify(tg):
            transaction.begin()
            zconn = db.open()

            root = zconn.root()
            spec.next(root)
            spec.assertStateOK(root)

            transaction.commit()
            zconn.close()

        # `xrun` runs f in a loop until either N iterations, or until failed is
        # set.
        def xrun(tg, tx, f, N):
            for _ in range(N):
                f(tg)
                if tg.failed():
                    break

        # loop verify and modify concurrently.
        init()

        N = 500
        tg = TestWorkGroup(self)
        tg.go(xrun, verify, N, name='Tverify')
        tg.go(xrun, modify, N, name='Tmodify')
        tg.wait(120)

    # client-server storages like ZEO, NEO and RelStorage allow several storage
    # clients to be connected to single storage server.
    #
    # For client-server storages test subclasses should implement
    # _new_storage_client to return new storage client that is connected to the
    # same storage server self._storage is connected to.

    def _new_storage_client(self):
        raise NotImplementedError

    # `dbopen` creates new client storage connection and wraps it with DB.
    def dbopen(self):
        try:
            zstor = self._new_storage_client()
        except NotImplementedError:
            # the test will be skipped from main thread because dbopen is
            # first used in init on the main thread before any other thread
            # is spawned.
            self.skipTest(
                "%s does not implement _new_storage_client" % type(self))
        return DB(zstor)

    # verify storage for race in between load and external invalidations.
    # https://github.com/zopefoundation/ZEO/issues/155
    #
    # This test is similar to check_race_loadopen_vs_local_invalidate but does
    # not reuse its code because the probability to reproduce external
    # invalidation bug with only 1 mutator + 1 verifier is low.
    def check_race_load_vs_external_invalidate(self):
        return self._check_race_load_vs_external_invalidate(T2ObjectsInc())

    @with_high_concurrency
    def _check_race_load_vs_external_invalidate(self, spec):
        """Run several workers, each with its own storage client, that
        verify the invariant of ``spec`` and occasionally modify the data."""
        assert IModelSpec.providedBy(spec)

        # `init` initializes the database according to the spec.
        def init():
            db = self.dbopen()
            _state_init(db, spec)
            db.close()

        # we'll run 8 T workers concurrently. As of 20210416, due to race
        # conditions in ZEO, it triggers the bug where T sees stale obj2 with
        # obj1.value != obj2.value
        #
        # The probability to reproduce the bug is significantly reduced with
        # decreasing n(workers): almost never with nwork=2 and sometimes with
        # nwork=4.
        nwork = 8

        # `T` is a worker that accesses database in a loop and verifies
        # spec invariant.
        #
        # Access to half of the objects is organized to always trigger loading
        # from zstor. Access to the other half goes through zconn cache and so
        # verifies whether the cache is not stale.
        #
        # Once in a while T tries to modify the database executing spec "next"
        # as test source of changes for other workers.
        def T(tg, tx, N):
            db = self.dbopen()

            def t_():
                transaction.begin()
                zconn = db.open()
                root = zconn.root()

                # reload some objects from zstor, while getting others from
                # zconn cache
                _state_invalidate_half1(root)

                try:
                    spec.assertStateOK(root)
                except AssertionError as e:
                    msg = "T%s: %s\n" % (tx, e)
                    msg += _state_details(root)
                    tg.fail(msg)

                    # The state is already known to be broken: do not build
                    # on it with spec.next (its own assertStateOK would then
                    # raise out of the worker and leave zconn open).  Release
                    # the connection; the loop below stops on tg.failed().
                    transaction.abort()
                    zconn.close()
                    return

                # change objects once in a while
                if randint(0, 4) == 0:
                    # print("T%s: modify" % tx)
                    spec.next(root)
                    spec.assertStateOK(root)

                try:
                    transaction.commit()
                except POSException.ConflictError:
                    # print('conflict -> ignore')
                    transaction.abort()

                zconn.close()

            try:
                for _ in range(N):
                    # print('T%s.%d' % (tx, i))
                    t_()
                    if tg.failed():
                        break
            finally:
                db.close()

        # run the workers concurrently.
        init()

        N = 100
        tg = TestWorkGroup(self)
        for _ in range(nwork):
            tg.go(T, N)
        tg.wait(120)

    # verify storage for race in between client disconnect and external
    # invalidations. https://github.com/zopefoundation/ZEO/issues/209
    #
    # This test is similar to check_race_load_vs_external_invalidate, but
    # increases the number of workers and also makes every worker to repeatedly
    # reconnect to the storage, so that the probability of disconnection is
    # high. It also uses T2ObjectsInc2Phase instead of T2ObjectsInc because if
    # an invalidation is skipped due to the disconnect/invalidation race,
    # T2ObjectsInc won't catch the bug as both objects will be either in old
    # state, or in new state after the next transaction. Contrary to that, with
    # T2ObjectsInc2Phase the invariant will be detected to be broken on the
    # next transaction.
    @long_test
    def check_race_external_invalidate_vs_disconnect(self):
        return self._check_race_external_invalidate_vs_disconnect(
                                                T2ObjectsInc2Phase())

    @with_high_concurrency
    def _check_race_external_invalidate_vs_disconnect(self, spec):
        """Like _check_race_load_vs_external_invalidate, but with more
        workers that reopen their storage client every few transactions."""
        assert IModelSpec.providedBy(spec)

        # `init` initializes the database according to the spec.
        def init():
            db = self.dbopen()
            _state_init(db, spec)
            db.close()

        nwork = 8*8   # nwork^2 from _check_race_load_vs_external_invalidate

        # `T` is similar to the T from _check_race_load_vs_external_invalidate
        # but reconnects to the database often.
        def T(tg, tx, N):
            def t_():
                def work1(db):
                    transaction.begin()
                    zconn = db.open()
                    root = zconn.root()

                    # reload some objects from zstor, while getting others from
                    # zconn cache
                    _state_invalidate_half1(root)

                    try:
                        spec.assertStateOK(root)
                    except AssertionError as e:
                        msg = "T%s: %s\n" % (tx, e)
                        msg += _state_details(root)
                        tg.fail(msg)

                        # abort first, then release the connection
                        transaction.abort()
                        zconn.close()
                        return

                    # change objects once in a while
                    if randint(0, 4) == 0:
                        # print("T%s: modify" % tx)
                        spec.next(root)
                        spec.assertStateOK(root)

                    try:
                        transaction.commit()
                    except POSException.ConflictError:
                        # print('conflict -> ignore')
                        transaction.abort()

                    zconn.close()

                db = self.dbopen()
                try:
                    for _ in range(4):
                        if tg.failed():
                            break
                        work1(db)
                finally:
                    db.close()

            for _ in range(N):
                # print('T%s.%d' % (tx, i))
                if tg.failed():
                    break
                t_()

        # run the workers concurrently.
        init()

        N = 100 // (2*4)  # N reduced to save time
        tg = TestWorkGroup(self)
        for _ in range(nwork):
            tg.go(T, N)
        tg.wait(120)
415

416

417
def _state_init(db, spec):
    """Initialize the database according to ``spec`` and commit the result.

    A connection is opened from ``db``; ``spec.init`` is applied to its root,
    the invariant is checked with ``spec.assertStateOK`` and the transaction
    is committed.  Whatever happens, the connection is closed afterwards.
    If any step raises, the transaction is aborted before the connection is
    closed and the exception is propagated to the caller.
    """
    transaction.begin()
    zconn = db.open()
    try:
        root = zconn.root()
        spec.init(root)
        spec.assertStateOK(root)
        transaction.commit()
    except BaseException:
        # drop the partial changes so the connection is no longer joined to
        # the failed transaction when it gets closed below
        transaction.abort()
        raise
    finally:
        zconn.close()
426

427

428
# `_state_invalidate_half1` invalidates first 50% of database objects, so
429
# that the next time they are accessed, they are reloaded from the storage.
430
def _state_invalidate_half1(root):
1✔
431
    keys = list(sorted(root.keys()))
1✔
432
    for k in keys[:len(keys)//2]:
1✔
433
        obj = root[k]
1✔
434
        obj._p_invalidate()
1✔
435

436

437
def _state_details(root):  # -> txt
    """Return a text report about the objects directly referenced by root.

    The report contains the ``_p_serial`` of every object, an approximation
    of the connection's ``at`` (the maximum of those serials) and, for every
    object, what ``loadBefore`` of the underlying storage returns at that
    point.  If the storage has a ``_cache`` attribute, the cache is cleared
    and the loads are repeated, which helps to see whether an error was due
    to the cache getting out of sync.
    """
    names = sorted(root.keys())

    # serial for all objects
    parts = [
        '  '.join('%s._p_serial: %s' % (name, tid_repr(root[name]._p_serial))
                  for name in names),
        '\n',
    ]

    # There is currently no way to retrieve the real zconn.at for every kind
    # of storage, so approximate it as max(serials).
    at = max(root[name]._p_serial for name in names)
    parts.append('zconn_at: %s  # approximated as max(serials)\n'
                 % tid_repr(at))

    storage = root._p_jar.db().storage

    def describe_load(name):
        # zstor.loadBefore(obj, @zconn_at)
        found = storage.loadBefore(root[name]._p_oid, at2before(at))
        if found is None:
            outcome = 'None'
        else:
            _, serial, next_serial = found
            outcome = 'serial: %s  next_serial: %s' % (
                tid_repr(serial), tid_repr(next_serial))
        return 'zstor.loadBefore(%s, @zconn.at)\t->  %s\n' % (name, outcome)

    parts.extend(describe_load(name) for name in names)

    cache = getattr(storage, '_cache', None)  # works for ZEO and NEO
    if cache is not None:
        cache.clear()
        parts.append('zstor._cache.clear()\n')
        parts.extend(describe_load(name) for name in names)

    return ''.join(parts)
484

485

486
class TestWorkGroup(object):
    """TestWorkGroup represents group of threads that run together to verify
       something.

       - .go() adds test thread to the group.
       - .wait() waits for all spawned threads to finish and reports all
         collected failures to containing testcase.
       - a test should indicate failure by call to .fail(), it
         can check for a failure with .failed()
    """

    def __init__(self, testcase):
        self.testcase = testcase
        self.failed_event = threading.Event()
        self.fail_mu = threading.Lock()   # protects failv
        self.failv = []           # failures registered by .fail
        self.threadv = []         # spawned threads
        self.waitg = WaitGroup()  # to wait for spawned threads

    def fail(self, msg):
        """fail adds failure to test result."""
        with self.fail_mu:
            self.failv.append(msg)
        self.failed_event.set()

    def failed(self):
        """did the test already fail."""
        return self.failed_event.is_set()

    def go(self, f, *argv, **kw):
        """go spawns f(self, #thread, *argv, **kw) in new test thread.

        An optional ``name`` keyword sets the thread name (default
        ``T<index>``); it is not passed on to ``f``.
        """
        self.waitg.add(1)
        tx = len(self.threadv)
        tname = kw.pop('name', 'T%d' % tx)
        t = Daemon(name=tname, target=self._run, args=(f, tx, argv, kw))
        self.threadv.append(t)
        t.start()

    def _run(self, f, tx, argv, kw):
        # Body of every spawned thread: record an unhandled exception as a
        # test failure (and re-raise it for the thread's exception report),
        # and always tell the wait group that this worker is done.
        tname = self.threadv[tx].name
        try:
            f(self, tx, *argv, **kw)
        except Exception as e:
            self.fail("Unhandled exception %r in thread %s"
                      % (e, tname))
            raise
        finally:
            self.waitg.done()

    def wait(self, timeout):
        """wait waits for all test threads to complete and reports all
           collected failures to containing testcase."""
        if not self.waitg.wait(timeout):
            self.fail("test did not finish within %s seconds" % timeout)

        failed_to_finish = []
        for t in self.threadv:
            try:
                # Daemon.join raises AssertionError if t is still alive
                t.join(1)
            except AssertionError:
                failed_to_finish.append(t.name)
        if failed_to_finish:
            self.fail("threads did not finish: %s" % failed_to_finish)
        del self.threadv  # avoid cyclic garbage

        if self.failed():
            # Threads that did not finish may still be calling .fail
            # concurrently; take a consistent snapshot under the lock.
            with self.fail_mu:
                failures = list(self.failv)
            self.testcase.fail('\n\n'.join(failures))
554

555

556
class Daemon(threading.Thread):
    """Daemon thread whose join() fails when the thread did not stop.

    Reports about uncaught exceptions are emitted while holding the
    module-level ``exc_lock``, so reports from concurrently failing threads
    do not get intermixed, which eases the analysis of the exceptions.
    """

    def __init__(self, **kw):
        super(Daemon, self).__init__(**kw)
        self.daemon = True
        if not hasattr(self, "_invoke_excepthook"):
            # old Python: no excepthook machinery, so wrap run() itself
            wrapped_run = self.run

            def run():
                from threading import _format_exc
                from threading import _sys
                try:
                    wrapped_run()
                except SystemExit:
                    pass
                except BaseException:
                    if not (_sys and _sys.stderr is not None):
                        raise
                    with exc_lock:
                        print("Exception in thread %s:\n%s" %
                              (self.name, _format_exc()),
                              file=_sys.stderr)
                finally:
                    del self.run

            self.run = run
            return

        # Python 3.8+: serialize invocations of the thread excepthook
        original_hook = self._invoke_excepthook

        def locked_hook(*args, **kw):
            with exc_lock:
                return original_hook(*args, **kw)

        self._invoke_excepthook = locked_hook

    def join(self, *args, **kw):
        super(Daemon, self).join(*args, **kw)
        if not self.is_alive():
            return
        raise AssertionError("Thread %s did not stop" % self.name)
603

604

605
# exc_lock serializes the uncaught-exception reports emitted by Daemon
# threads, so that reports coming from several threads are not intermixed.
exc_lock = threading.Lock()
607

608

609
class WaitGroup(object):
    """WaitGroup provides service to wait for spawned workers to be done.

       - .add() adds workers
       - .done() indicates that one worker is done
       - .wait() waits until all workers are done
    """

    def __init__(self):
        self.n = 0                              # number of pending workers
        self.condition = threading.Condition()  # guards n, signals n == 0

    def add(self, delta):
        """Adjust the number of pending workers by ``delta``.

        All waiters are woken up once the counter drops to zero.  Raises
        AssertionError if the counter becomes negative.
        """
        with self.condition:
            self.n += delta
            if self.n < 0:
                raise AssertionError("#workers is negative")
            if self.n == 0:
                self.condition.notify_all()

    def done(self):
        """Mark one worker as done."""
        self.add(-1)

    def wait(self, timeout):  # -> ok
        """Wait until all workers are done.

        ``timeout`` is the maximum number of seconds to wait (None waits
        without limit).  Returns True if the counter reached zero and False
        if the timeout expired first.
        """
        import time
        clock = getattr(time, 'monotonic', time.time)  # py2: no monotonic
        deadline = None if timeout is None else clock() + timeout
        with self.condition:
            # Re-check the counter after every wakeup instead of trusting a
            # single Condition.wait: the counter may change again between
            # the notification and the moment this thread re-acquires the
            # lock, and py2's Condition.wait returns None instead of a flag.
            while self.n != 0:
                if deadline is None:
                    self.condition.wait()
                    continue
                remaining = deadline - clock()
                if remaining <= 0:
                    return False
                self.condition.wait(remaining)
            return True
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc