• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / ZODB / 18153960591

01 Oct 2025 06:50AM UTC coverage: 83.781% (-0.03%) from 83.811%
18153960591

Pull #415

github

web-flow
Update docs/articles/old-guide/convert_zodb_guide.py

Co-authored-by: Michael Howitz <icemac@gmx.net>
Pull Request #415: Apply the latest zope.meta templates

2441 of 3542 branches covered (68.92%)

193 of 257 new or added lines in 48 files covered. (75.1%)

12 existing lines in 6 files now uncovered.

13353 of 15938 relevant lines covered (83.78%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.56
/src/ZODB/tests/BasicStorage.py
1
##############################################################################
2
#
3
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
4
# All Rights Reserved.
5
#
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
"""Run the basic tests for a storage as described in the official storage API
15

16
The most complete and most out-of-date description of the interface is:
17
http://www.zope.org/Documentation/Developer/Models/ZODB/ZODB_Architecture_Storage_Interface_Info.html
18

19
All storages should be able to pass these tests.
20
"""
21
import threading
1✔
22
import time
1✔
23

24
import zope.interface
1✔
25
import zope.interface.verify
1✔
26

27
from ZODB import POSException
1✔
28
from ZODB.Connection import TransactionMetaData
1✔
29
from ZODB.tests.MinPO import MinPO
1✔
30
from ZODB.tests.racetest import RaceTests
1✔
31
from ZODB.tests.StorageTestBase import ZERO
1✔
32
from ZODB.tests.StorageTestBase import zodb_pickle
1✔
33
from ZODB.tests.StorageTestBase import zodb_unpickle
1✔
34

35
from .. import utils
1✔
36

37

38
class BasicStorage(RaceTests):
1✔
39
    def testBasics(self):
1✔
40
        self.assertEqual(self._storage.lastTransaction(), ZERO)
1✔
41

42
        t = TransactionMetaData()
1✔
43
        self._storage.tpc_begin(t)
1✔
44
        self.assertRaises(POSException.StorageTransactionError,
1✔
45
                          self._storage.tpc_begin, t)
46
        # Aborting is easy
47
        self._storage.tpc_abort(t)
1✔
48
        # Test a few expected exceptions when we're doing operations giving a
49
        # different Transaction object than the one we've begun on.
50
        self._storage.tpc_begin(t)
1✔
51
        self.assertRaises(
1✔
52
            POSException.StorageTransactionError,
53
            self._storage.store,
54
            ZERO, ZERO, b'', '', TransactionMetaData())
55

56
        self.assertRaises(
1✔
57
            POSException.StorageTransactionError,
58
            self._storage.store,
59
            ZERO, 1, b'2', '', TransactionMetaData())
60

61
        self.assertRaises(
1✔
62
            POSException.StorageTransactionError,
63
            self._storage.tpc_vote, TransactionMetaData())
64
        self._storage.tpc_abort(t)
1✔
65

66
    def testSerialIsNoneForInitialRevision(self):
1✔
67
        eq = self.assertEqual
1✔
68
        oid = self._storage.new_oid()
1✔
69
        txn = TransactionMetaData()
1✔
70
        self._storage.tpc_begin(txn)
1✔
71
        # Use None for serial.  Don't use _dostore() here because that coerces
72
        # serial=None to serial=ZERO.
73
        self._storage.store(oid, None, zodb_pickle(MinPO(11)),
1✔
74
                            '', txn)
75
        self._storage.tpc_vote(txn)
1✔
76
        newrevid = self._storage.tpc_finish(txn)
1✔
77
        data, revid = utils.load_current(self._storage, oid)
1✔
78
        value = zodb_unpickle(data)
1✔
79
        eq(value, MinPO(11))
1✔
80
        eq(revid, newrevid)
1✔
81

82
    def testStore(self):
1✔
83
        revid = ZERO
1✔
84
        newrevid = self._dostore(revid=None)
1✔
85
        # Finish the transaction.
86
        self.assertNotEqual(newrevid, revid)
1✔
87

88
    def testStoreAndLoad(self):
1✔
89
        eq = self.assertEqual
1✔
90
        oid = self._storage.new_oid()
1✔
91
        self._dostore(oid=oid, data=MinPO(7))
1✔
92
        data, revid = utils.load_current(self._storage, oid)
1✔
93
        value = zodb_unpickle(data)
1✔
94
        eq(value, MinPO(7))
1✔
95
        # Now do a bunch of updates to an object
96
        for i in range(13, 22):
1✔
97
            revid = self._dostore(oid, revid=revid, data=MinPO(i))
1✔
98
        # Now get the latest revision of the object
99
        data, revid = utils.load_current(self._storage, oid)
1✔
100
        eq(zodb_unpickle(data), MinPO(21))
1✔
101

102
    def testConflicts(self):
1✔
103
        oid = self._storage.new_oid()
1✔
104
        revid1 = self._dostore(oid, data=MinPO(11))
1✔
105
        self._dostore(oid, revid=revid1, data=MinPO(12))
1✔
106
        self.assertRaises(POSException.ConflictError,
1✔
107
                          self._dostore,
108
                          oid, revid=revid1, data=MinPO(13))
109

110
    def testWriteAfterAbort(self):
1✔
111
        oid = self._storage.new_oid()
1✔
112
        t = TransactionMetaData()
1✔
113
        self._storage.tpc_begin(t)
1✔
114
        self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
1✔
115
        # Now abort this transaction
116
        self._storage.tpc_abort(t)
1✔
117
        # Now start all over again
118
        oid = self._storage.new_oid()
1✔
119
        self._dostore(oid=oid, data=MinPO(6))
1✔
120

121
    def testAbortAfterVote(self):
1✔
122
        oid1 = self._storage.new_oid()
1✔
123
        revid1 = self._dostore(oid=oid1, data=MinPO(-2))
1✔
124
        oid = self._storage.new_oid()
1✔
125
        t = TransactionMetaData()
1✔
126
        self._storage.tpc_begin(t)
1✔
127
        self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
1✔
128
        # Now abort this transaction
129
        self._storage.tpc_vote(t)
1✔
130
        self._storage.tpc_abort(t)
1✔
131
        # Now start all over again
132
        oid = self._storage.new_oid()
1✔
133
        revid = self._dostore(oid=oid, data=MinPO(6))
1✔
134

135
        for oid, revid in [(oid1, revid1), (oid, revid)]:
1✔
136
            data, _revid = utils.load_current(self._storage, oid)
1✔
137
            self.assertEqual(revid, _revid)
1✔
138

139
    def testStoreTwoObjects(self):
1✔
140
        noteq = self.assertNotEqual
1✔
141
        p31, p32, p51, p52 = map(MinPO, (31, 32, 51, 52))
1✔
142
        oid1 = self._storage.new_oid()
1✔
143
        oid2 = self._storage.new_oid()
1✔
144
        noteq(oid1, oid2)
1✔
145
        revid1 = self._dostore(oid1, data=p31)
1✔
146
        revid2 = self._dostore(oid2, data=p51)
1✔
147
        noteq(revid1, revid2)
1✔
148
        revid3 = self._dostore(oid1, revid=revid1, data=p32)
1✔
149
        revid4 = self._dostore(oid2, revid=revid2, data=p52)
1✔
150
        noteq(revid3, revid4)
1✔
151

152
    def testGetTid(self):
1✔
153
        if not hasattr(self._storage, 'getTid'):
1!
154
            return
×
155
        eq = self.assertEqual
1✔
156
        p41, p42 = map(MinPO, (41, 42))
1✔
157
        oid = self._storage.new_oid()
1✔
158
        self.assertRaises(KeyError, self._storage.getTid, oid)
1✔
159
        # Now store a revision
160
        revid1 = self._dostore(oid, data=p41)
1✔
161
        eq(revid1, self._storage.getTid(oid))
1✔
162
        # And another one
163
        revid2 = self._dostore(oid, revid=revid1, data=p42)
1✔
164
        eq(revid2, self._storage.getTid(oid))
1✔
165

166
    def testLen(self):
1✔
167
        # len(storage) reports the number of objects.
168
        # check it is zero when empty
169
        self.assertEqual(len(self._storage), 0)
1✔
170
        # check it is correct when the storage contains two object.
171
        # len may also be zero, for storages that do not keep track
172
        # of this number
173
        self._dostore(data=MinPO(22))
1✔
174
        self._dostore(data=MinPO(23))
1✔
175
        self.assertIn(len(self._storage), [0, 2])
1✔
176

177
    def testGetSize(self):
1✔
178
        self._dostore(data=MinPO(25))
1✔
179
        size = self._storage.getSize()
1✔
180
        # The storage API doesn't make any claims about what size
181
        # means except that it ought to be printable.
182
        str(size)
1✔
183

184
    def testNote(self):
1✔
185
        oid = self._storage.new_oid()
1✔
186
        t = TransactionMetaData()
1✔
187
        self._storage.tpc_begin(t)
1✔
188
        t.note('this is a test')
1✔
189
        self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
1✔
190
        self._storage.tpc_vote(t)
1✔
191
        self._storage.tpc_finish(t)
1✔
192

193
    def testInterfaces(self):
1✔
194
        for iface in zope.interface.providedBy(self._storage):
1✔
195
            zope.interface.verify.verifyObject(iface, self._storage)
1✔
196

197
    def testMultipleEmptyTransactions(self):
1✔
198
        # There was a bug in handling empty transactions in mapping
199
        # storage that caused the commit lock not to be released. :(
200
        t = TransactionMetaData()
1✔
201
        self._storage.tpc_begin(t)
1✔
202
        self._storage.tpc_vote(t)
1✔
203
        self._storage.tpc_finish(t)
1✔
204
        t = TransactionMetaData()
1✔
205
        self._storage.tpc_begin(t)      # Hung here before
1✔
206
        self._storage.tpc_vote(t)
1✔
207
        self._storage.tpc_finish(t)
1✔
208

209
    def _do_store_in_separate_thread(self, oid, revid, voted):
1✔
210
        # We'll run the competing trans in a separate thread:
211
        thread = threading.Thread(name='T2',
1✔
212
                                  target=self._dostore, args=(oid,),
213
                                  kwargs=dict(revid=revid))
214
        thread.daemon = True
1✔
215
        thread.start()
1✔
216
        thread.join(.1)
1✔
217
        return thread
1✔
218

219
    def test_checkCurrentSerialInTransaction(self):
1✔
220
        oid = b'\0\0\0\0\0\0\0\xf0'
1✔
221
        tid = self._dostore(oid)
1✔
222
        tid2 = self._dostore(oid, revid=tid)
1✔
223
        data = b'cpersistent\nPersistent\nq\x01.N.'  # a simple persistent obj
1✔
224

225
        # ---------------------------------------------------------------------
226
        # stale read
227
        t = TransactionMetaData()
1✔
228
        self._storage.tpc_begin(t)
1✔
229
        try:
1✔
230
            self._storage.store(b'\0\0\0\0\0\0\0\xf1',
1✔
231
                                b'\0\0\0\0\0\0\0\0', data, '', t)
232
            self._storage.checkCurrentSerialInTransaction(oid, tid, t)
1✔
233
            self._storage.tpc_vote(t)
×
234
        except POSException.ReadConflictError as v:
1✔
235
            self.assertEqual(v.oid, oid)
1✔
236
            self.assertEqual(v.serials, (tid2, tid))
1✔
237
        else:
UNCOV
238
            if 0:
×
239
                self.assertTrue(False, "No conflict error")
240

241
        self._storage.tpc_abort(t)
1✔
242

243
        # ---------------------------------------------------------------------
244
        # non-stale read, no stress. :)
245
        t = TransactionMetaData()
1✔
246
        self._storage.tpc_begin(t)
1✔
247
        self._storage.store(b'\0\0\0\0\0\0\0\xf2',
1✔
248
                            b'\0\0\0\0\0\0\0\0', data, '', t)
249
        self._storage.checkCurrentSerialInTransaction(oid, tid2, t)
1✔
250
        self._storage.tpc_vote(t)
1✔
251
        self._storage.tpc_finish(t)
1✔
252

253
        # ---------------------------------------------------------------------
254
        # non-stale read, competition after vote.  The competing
255
        # transaction must produce a tid > this transaction's tid
256
        t = TransactionMetaData()
1✔
257
        self._storage.tpc_begin(t)
1✔
258
        self._storage.store(b'\0\0\0\0\0\0\0\xf3',
1✔
259
                            b'\0\0\0\0\0\0\0\0', data, '', t)
260
        self._storage.checkCurrentSerialInTransaction(oid, tid2, t)
1✔
261
        self._storage.tpc_vote(t)
1✔
262

263
        # We'll run the competing trans in a separate thread:
264
        thread = self._do_store_in_separate_thread(oid, tid2, True)
1✔
265
        self._storage.tpc_finish(t)
1✔
266
        thread.join(33)
1✔
267

268
        tid3 = utils.load_current(self._storage, oid)[1]
1✔
269
        self.assertGreater(
1✔
270
            tid3,
271
            utils.load_current(
272
                self._storage, b'\0\0\0\0\0\0\0\xf3')[1]
273
        )
274

275
        # ---------------------------------------------------------------------
276
        # non-stale competing trans after checkCurrentSerialInTransaction
277
        t = TransactionMetaData()
1✔
278
        self._storage.tpc_begin(t)
1✔
279
        self._storage.store(b'\0\0\0\0\0\0\0\xf4',
1✔
280
                            b'\0\0\0\0\0\0\0\0', data, '', t)
281
        self._storage.checkCurrentSerialInTransaction(oid, tid3, t)
1✔
282

283
        thread = self._do_store_in_separate_thread(oid, tid3, False)
1✔
284

285
        # There are 2 possibilities:
286
        # 1. The store happens before this transaction completes,
287
        #    in which case, the vote below fails.
288
        # 2. The store happens after this trans, in which case, the
289
        #    tid of the object is greater than this transaction's tid.
290
        try:
1✔
291
            self._storage.tpc_vote(t)
1✔
292
        except POSException.ReadConflictError:
×
293
            thread.join()  # OK :)
×
294
        else:
295
            self._storage.tpc_finish(t)
1✔
296
            thread.join()
1✔
297
            tid4 = utils.load_current(self._storage, oid)[1]
1✔
298
            self.assertGreater(
1✔
299
                tid4,
300
                utils.load_current(self._storage, b'\0\0\0\0\0\0\0\xf4')[1]
301
            )
302

303
    def test_tid_ordering_w_commit(self):
1✔
304

305
        # It's important that storages always give a consistent
306
        # ordering for revisions, tids.  This is most likely to fail
307
        # around commit.  Here we'll do some basic tests to check this.
308

309
        # We'll use threads to arrange for ordering to go wrong and
310
        # verify that a storage gets it right.
311

312
        # First, some initial data.
313
        t = TransactionMetaData()
1✔
314
        self._storage.tpc_begin(t)
1✔
315
        self._storage.store(ZERO, ZERO, b'x', '', t)
1✔
316
        self._storage.tpc_vote(t)
1✔
317
        tids = []
1✔
318
        self._storage.tpc_finish(t, lambda tid: tids.append(tid))
1✔
319

320
        # OK, now we'll start a new transaction, take it to finish,
321
        # and then block finish while we do some other operations.
322

323
        t = TransactionMetaData()
1✔
324
        self._storage.tpc_begin(t)
1✔
325
        self._storage.store(ZERO, tids[0], b'y', '', t)
1✔
326
        self._storage.tpc_vote(t)
1✔
327

328
        to_join = []
1✔
329

330
        def run_in_thread(func):
1✔
331
            t = threading.Thread(target=func)
1✔
332
            t.daemon = True
1✔
333
            t.start()
1✔
334
            to_join.append(t)
1✔
335

336
        started = threading.Event()
1✔
337
        finish = threading.Event()
1✔
338

339
        @run_in_thread
1✔
340
        def commit():
1✔
341
            def callback(tid):
1✔
342
                started.set()
1✔
343
                tids.append(tid)
1✔
344
                finish.wait()
1✔
345

346
            self._storage.tpc_finish(t, callback)
1✔
347

348
        results = {}
1✔
349
        started.wait()
1✔
350
        attempts = []
1✔
351
        attempts_cond = utils.Condition()
1✔
352

353
        def update_attempts():
1✔
354
            with attempts_cond:
1✔
355
                attempts.append(1)
1✔
356
                attempts_cond.notify_all()
1✔
357

358
        @run_in_thread
1✔
359
        def load():
1✔
360
            update_attempts()
1✔
361
            results['load'] = utils.load_current(self._storage, ZERO)[1]
1✔
362
            results['lastTransaction'] = self._storage.lastTransaction()
1✔
363

364
        expected_attempts = 1
1✔
365

366
        if hasattr(self._storage, 'getTid'):
1!
367
            expected_attempts += 1
1✔
368

369
            @run_in_thread
1✔
370
            def getTid():
1✔
371
                update_attempts()
1✔
372
                results['getTid'] = self._storage.getTid(ZERO)
1✔
373

374
        if hasattr(self._storage, 'lastInvalidations'):
1✔
375
            expected_attempts += 1
1✔
376

377
            @run_in_thread
1✔
378
            def lastInvalidations():
1✔
379
                update_attempts()
1✔
380
                invals = self._storage.lastInvalidations(1)
1✔
381
                if invals:
1!
382
                    results['lastInvalidations'] = invals[0][0]
1✔
383

384
        with attempts_cond:
1✔
385
            while len(attempts) < expected_attempts:
1!
386
                attempts_cond.wait()
×
387

388
        time.sleep(.01)  # for good measure :)
1✔
389
        finish.set()
1✔
390

391
        for t in to_join:
1✔
392
            t.join(1)
1✔
393

394
        self.assertEqual(results.pop('load'), tids[1])
1✔
395
        self.assertEqual(results.pop('lastTransaction'), tids[1])
1✔
396
        for m, tid in results.items():
1✔
397
            self.assertEqual(tid, tids[1])
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc