• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / ZODB / 18153960591

01 Oct 2025 06:50AM UTC coverage: 83.781% (-0.03%) from 83.811%
18153960591

Pull #415

github

web-flow
Update docs/articles/old-guide/convert_zodb_guide.py

Co-authored-by: Michael Howitz <icemac@gmx.net>
Pull Request #415: Apply the latest zope.meta templates

2441 of 3542 branches covered (68.92%)

193 of 257 new or added lines in 48 files covered. (75.1%)

12 existing lines in 6 files now uncovered.

13353 of 15938 relevant lines covered (83.78%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.09
/src/ZODB/tests/StorageTestBase.py
1
##############################################################################
2
#
3
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
4
# All Rights Reserved.
5
#
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
"""Provide a mixin base class for storage tests.
15

16
The StorageTestBase class provides basic setUp() and tearDown()
17
semantics (which you can override), and it also provides a helper
18
method _dostore() which performs a complete store transaction for a
19
single object revision.
20
"""
21

22
import sys
1✔
23
import time
1✔
24
from io import BytesIO
1✔
25

26
import ZODB.tests.util
1✔
27
from ZODB._compat import PersistentPickler
1✔
28
from ZODB._compat import Unpickler
1✔
29
from ZODB._compat import _protocol
1✔
30
from ZODB.Connection import TransactionMetaData
1✔
31
from ZODB.tests.MinPO import MinPO
1✔
32
from ZODB.utils import u64
1✔
33
from ZODB.utils import z64
1✔
34

35

36
# Alias for ZODB.utils.z64; StorageTestBase._dostore() uses it as the
# default ``revid`` when the caller does not supply one.
ZERO = z64
1✔
37

38

39
def snooze():
    """Wait until ``time.time()`` reports a value different from the one
    observed on entry.

    On Windows two successive ``time.time()`` calls may return the same
    value (time never runs backwards, though).  Call this before packing
    a storage, or wherever strictly increasing timestamps are required.
    """
    started = time.time()
    while True:
        if time.time() != started:
            return
        # Clock has not ticked yet; back off briefly and look again.
        time.sleep(0.1)
×
47

48

49
def _persistent_id(obj):
    """Return the value to use as *obj*'s persistent id, or None.

    The id is ``obj._p_oid`` when that attribute exists.  If the attribute
    value itself exposes a non-None ``__get__`` (i.e. it looks like a
    descriptor rather than an actual oid), None is returned instead.
    """
    oid = getattr(obj, "_p_oid", None)
    looks_like_descriptor = getattr(oid, "__get__", None) is not None
    return None if looks_like_descriptor else oid
1✔
55

56

57
def zodb_pickle(obj):
    """Create a pickle in the format expected by ZODB.

    The returned bytes contain two consecutive pickles written by a single
    ``PersistentPickler``: first the pair ``(class_info, None)`` and then
    the value of ``obj.__getstate__()``.  ``class_info`` is the tuple
    ``(module_name, class_name)`` when the class has a ``__module__``,
    otherwise the class object itself.

    Objects that define ``__getinitargs__`` are not supported.
    """
    buffer = BytesIO()
    pickler = PersistentPickler(_persistent_id, buffer, _protocol)
    cls = obj.__class__
    assert not hasattr(obj, '__getinitargs__'), "not ready for constructors"

    module_name = getattr(cls, '__module__', None)
    class_info = cls if module_name is None else (module_name, cls.__name__)

    state = obj.__getstate__()

    # Constructor arguments are never recorded, hence the constant None.
    pickler.dump((class_info, None))
    pickler.dump(state)
    return buffer.getvalue()
1✔
74

75

76
def persistent_load(pid):
    """Describe a persistent reference as a string (zodb_unpickle helper).

    *pid* is indexed as ``pid[0]`` (converted with ``u64``) and
    ``pid[1][0]`` / ``pid[1][1]`` (rendered as ``first.second``).
    """
    oid, class_info = pid[0], pid[1]
    return f"ref to {class_info[0]}.{class_info[1]} oid={u64(oid)}"
×
79

80

81
def zodb_unpickle(data):
    """Unpickle an object stored using the format expected by ZODB.

    *data* must hold two consecutive pickles: class information followed
    by the object's state.  The class information must be a tuple in one
    of these shapes:

     - ``(class, x)`` where ``class`` is a type and ``x`` is falsy
     - ``((module_name, class_name), ...)``
     - ``(module_name, class_name)``

    A module name of ``"__main__"`` is resolved against this module's
    globals; any other module is imported.  The class is instantiated
    without arguments and the state is applied with ``__setstate__()``.
    Persistent references are replaced by the strings produced by
    ``persistent_load()``.

    Raises ValueError if the first pickle is not a tuple, and KeyError
    (after writing a diagnostic to stderr) if the named class cannot be
    found in its module.

    Only use this on trusted data: unpickling can execute arbitrary code.
    """
    unpickler = Unpickler(BytesIO(data))
    unpickler.persistent_load = persistent_load
    klass_info = unpickler.load()
    if not isinstance(klass_info, tuple):
        raise ValueError("expected class info: %s" % repr(klass_info))

    if isinstance(klass_info[0], type):
        # Unclear:  what is the second part of klass_info?
        klass, xxx = klass_info
        assert not xxx
    else:
        if isinstance(klass_info[0], tuple):
            modname, klassname = klass_info[0]
        else:
            modname, klassname = klass_info
        if modname == "__main__":
            ns = globals()
        else:
            ns = import_helper(modname).__dict__
        try:
            klass = ns[klassname]
        except KeyError:
            print(f"can't find {klassname} in {ns!r}", file=sys.stderr)
            # Propagate the lookup failure.  Falling through would only
            # trade it for an UnboundLocalError on ``klass`` below.
            raise

    inst = klass()
    state = unpickler.load()
    inst.__setstate__(state)
    return inst
1✔
112

113

114
def import_helper(name):
    """Import the module called *name* and return the module object.

    For a dotted name such as ``"a.b.c"`` the module ``a.b.c`` itself is
    returned, not the top-level package ``a``.

    Raises ImportError if the module cannot be imported.
    """
    # importlib.import_module() is the documented replacement for calling
    # __import__() directly and then fetching the result from sys.modules.
    import importlib

    return importlib.import_module(name)
1✔
117

118

119
class StorageTestBase(ZODB.tests.util.TestCase):
    """Base class for storage tests.

    Provides tearDown() handling that closes ``self._storage`` and
    helpers that run complete store / undo transactions against it.
    """

    # It would be simpler if concrete tests didn't need to extend
    # setUp() and tearDown().

    # The storage under test.  Stays None unless something assigns one;
    # nothing in this class does.
    _storage = None

    def _close(self):
        """Close the storage under test, if one has been set."""
        # You should override this if closing your storage requires additional
        # shutdown operations.
        if self._storage is not None:
            self._storage.close()

    def tearDown(self):
        """Close the storage, then run the base class tearDown()."""
        self._close()
        ZODB.tests.util.TestCase.tearDown(self)

    def _dostore(self, oid=None, revid=None, data=None,
                 already_pickled=0, user=None, description=None,
                 extension=None):
        """Do a complete storage transaction.  The defaults are:

         - oid=None, ask the storage for a new oid
         - revid=None, use a revid of ZERO
         - data=None, store ``MinPO(7)``

        An ``int`` passed as *data* is wrapped in ``MinPO``.  Unless
        *already_pickled* is true, *data* is serialized with
        ``zodb_pickle()`` before being stored.  *user*, *description* and
        *extension* are applied to the transaction metadata when given.

        If any step of the two-phase commit raises, the transaction is
        aborted and the exception is re-raised.

        Returns the object's new revision id.
        """
        if oid is None:
            oid = self._storage.new_oid()
        if revid is None:
            revid = ZERO
        if data is None:
            data = MinPO(7)
        if isinstance(data, int):
            data = MinPO(data)
        if not already_pickled:
            data = zodb_pickle(data)
        # Begin the transaction
        t = TransactionMetaData(extension=extension)
        if user is not None:
            t.user = user
        if description is not None:
            t.description = description
        try:
            self._storage.tpc_begin(t)
            # Store an object
            self._storage.store(oid, revid, data, '', t)
            # Finish the transaction
            self._storage.tpc_vote(t)
            revid = self._storage.tpc_finish(t)
        except:  # noqa: E722 do not use bare 'except'
            # Abort on any failure so the transaction is not left open.
            self._storage.tpc_abort(t)
            raise
        return revid

    def _dostoreNP(self, oid=None, revid=None, data=None,
                   user=None, description=None):
        """Like _dostore(), but *data* is stored as-is (already pickled)."""
        return self._dostore(oid, revid, data, 1, user, description)

    # The following methods depend on optional storage features.

    def _undo(self, tid, expected_oids=None, note=None):
        """Undo transaction *tid* in a transaction of its own.

        The transaction note is *note*, or ``"undo"`` when *note* is
        empty.  When *expected_oids* is given, the oids reported by the
        storage (the second item of the ``undo()`` result plus whatever
        ``tpc_vote()`` returns) must equal it.

        If any step raises, including the oid check, the transaction is
        aborted and the exception is re-raised, as _dostore() does.

        Returns the result of ``tpc_finish()``.
        """
        # Undo a tid that affects a single object (oid).
        # This is very specialized.
        t = TransactionMetaData()
        t.note(note or "undo")
        try:
            self._storage.tpc_begin(t)
            undo_result = self._storage.undo(tid, t)
            vote_result = self._storage.tpc_vote(t)
            if expected_oids is not None:
                oids = set(undo_result[1]) if undo_result else set()
                if vote_result:
                    oids.update(vote_result)
                self.assertEqual(oids, set(expected_oids))
            return self._storage.tpc_finish(t)
        except:  # noqa: E722 do not use bare 'except'
            # Do not leave the two-phase commit open when a step fails.
            self._storage.tpc_abort(t)
            raise
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc