• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / transaction / 16399678488

18 Sep 2024 07:25AM UTC coverage: 99.793% (+0.1%) from 99.696%
16399678488

push

github

dataflake
- vb [ci skip]

299 of 306 branches covered (97.71%)

Branch coverage included in aggregate %.

3083 of 3083 relevant lines covered (100.0%)

1.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.56
/src/transaction/_manager.py
1
############################################################################
2
#
3
# Copyright (c) 2004 Zope Foundation and Contributors.
4
# All Rights Reserved.
5
#
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
############################################################################
14
"""A TransactionManager controls transaction boundaries.
15

16
It coordinates application code and resource managers, so that they
17
are associated with the right transaction.
18
"""
19
import itertools
1✔
20
import sys
1✔
21
import threading
1✔
22

23
from zope.interface import implementer
1✔
24

25
from transaction._transaction import Transaction
1✔
26
from transaction.interfaces import AlreadyInTransaction
1✔
27
from transaction.interfaces import ITransactionManager
1✔
28
from transaction.interfaces import NoTransaction
1✔
29
from transaction.interfaces import TransientError
1✔
30
from transaction.weakset import WeakSet
1✔
31

32

33
# We have to remember sets of synch objects, especially Connections.
34
# But we don't want mere registration with a transaction manager to
35
# keep a synch object alive forever; in particular, it's common
36
# practice not to explicitly close Connection objects, and keeping
37
# a Connection alive keeps a potentially huge number of other objects
38
# alive (e.g., the cache, and everything reachable from it too).
39
# Therefore we use "weak sets" internally.
40

41
# Call the ISynchronizer newTransaction() method on every element of
42
# WeakSet synchs.
43
# A transaction manager needs to do this whenever begin() is called.
44
# Since it would be good if tm.get() returned the new transaction while
45
# newTransaction() is running, calling this has to be delayed until after
46
# the transaction manager has done whatever it needs to do to make its
47
# get() return the new txn.
48
def _new_transaction(txn, synchs):
1✔
49
    if synchs:
1✔
50
        synchs.map(lambda s: s.newTransaction(txn))
1✔
51

52
# Important:  we must always pass a WeakSet (even if empty) to the Transaction
53
# constructor:  synchronizers are registered with the TM, but the
54
# ISynchronizer xyzCompletion() methods are called by Transactions without
55
# consulting the TM, so we need to pass a mutable collection of synchronizers
56
# so that Transactions "see" synchronizers that get registered after the
57
# Transaction object is constructed.
58

59

60
@implementer(ITransactionManager)
1✔
61
class TransactionManager:
1✔
62
    """Single-thread implementation of
63
    `~transaction.interfaces.ITransactionManager`.
64
    """
65

66
    def __init__(self, explicit=False):
1✔
67
        self.explicit = explicit
1✔
68
        self._txn = None
1✔
69
        self._synchs = WeakSet()
1✔
70

71
    def begin(self):
1✔
72
        """See `~transaction.interfaces.ITransactionManager`."""
73
        if self._txn is not None:
1✔
74
            if self.explicit:
1✔
75
                raise AlreadyInTransaction()
1✔
76
            self._txn.abort()
1✔
77
        txn = self._txn = Transaction(self._synchs, self)
1✔
78
        _new_transaction(txn, self._synchs)
1✔
79
        return txn
1✔
80

81
    def __enter__(self):
1✔
82
        return self.begin()
1✔
83

84
    def get(self):
1✔
85
        """See `~transaction.interfaces.ITransactionManager`."""
86
        if self._txn is None:
1✔
87
            if self.explicit:
1✔
88
                raise NoTransaction()
1✔
89
            self._txn = Transaction(self._synchs, self)
1✔
90
        return self._txn
1✔
91

92
    def free(self, txn):
1✔
93
        if txn is not self._txn:
1✔
94
            raise ValueError("Foreign transaction")
1✔
95
        self._txn = None
1✔
96

97
    def registerSynch(self, synch):
1✔
98
        """ See `~transaction.interfaces.ITransactionManager`.
99
        """
100
        self._synchs.add(synch)
1✔
101
        if self._txn is not None:
1✔
102
            synch.newTransaction(self._txn)
1✔
103

104
    def unregisterSynch(self, synch):
1✔
105
        """ See `~transaction.interfaces.ITransactionManager`.
106
        """
107
        self._synchs.remove(synch)
1✔
108

109
    def clearSynchs(self):
1✔
110
        """ See `~transaction.interfaces.ITransactionManager`.
111
        """
112
        self._synchs.clear()
1✔
113

114
    def registeredSynchs(self):
1✔
115
        """ See `~transaction.interfaces.ITransactionManager`.
116
        """
117
        return bool(self._synchs)
1✔
118

119
    def isDoomed(self):
1✔
120
        """ See `~transaction.interfaces.ITransactionManager`.
121
        """
122
        return self.get().isDoomed()
1✔
123

124
    def doom(self):
1✔
125
        """ See `~transaction.interfaces.ITransactionManager`.
126
        """
127
        return self.get().doom()
1✔
128

129
    def commit(self):
1✔
130
        """ See `~transaction.interfaces.ITransactionManager`.
131
        """
132
        return self.get().commit()
1✔
133

134
    def abort(self):
1✔
135
        """ See `~transaction.interfaces.ITransactionManager`.
136
        """
137
        return self.get().abort()
1✔
138

139
    def __exit__(self, t, v, tb):
1✔
140
        if v is None:
1✔
141
            self.commit()
1✔
142
        else:
143
            self.abort()
1✔
144

145
    def savepoint(self, optimistic=False):
1✔
146
        """ See `~transaction.interfaces.ITransactionManager`.
147
        """
148
        return self.get().savepoint(optimistic)
1✔
149

150
    def attempts(self, number=3):
1✔
151
        if number <= 0:
1✔
152
            raise ValueError("number must be positive")
1✔
153
        while number:
1✔
154
            number -= 1
1✔
155
            if number:
1✔
156
                attempt = Attempt(self)
1✔
157
                yield attempt
1✔
158
                if attempt.success:
1✔
159
                    break
1✔
160
            else:
161
                yield self
1✔
162

163
    def _retryable(self, error_type, error):
1✔
164
        if issubclass(error_type, TransientError):
1✔
165
            return True
1✔
166

167
        for dm in self.get()._resources:
1✔
168
            should_retry = getattr(dm, 'should_retry', None)
1✔
169
            if (should_retry is not None) and should_retry(error):
1✔
170
                return True
1✔
171
        return False
1✔
172

173
    run_no_func_types = int, type(None)
1✔
174

175
    def run(self, func=None, tries=3):
1✔
176
        if isinstance(func, self.run_no_func_types):
1✔
177
            if func is not None:
1✔
178
                tries = func
1✔
179
            return lambda func: self.run(func, tries)
1✔
180

181
        if tries <= 0:
1✔
182
            raise ValueError("tries must be > 0")
1✔
183

184
        # These are ordinarily strings, but that's
185
        # not required. A callable class could override them
186
        # to anything.
187
        name = func.__name__ or ''
1✔
188
        doc = func.__doc__ or ''
1✔
189

190
        if isinstance(name, bytes):
1✔
191
            name = name.decode('UTF-8')
1✔
192
        if isinstance(doc, bytes):
1✔
193
            doc = doc.decode('UTF-8')
1✔
194

195
        if name and name != '_':
1✔
196
            if doc:
1✔
197
                doc = name + '\n\n' + doc
1✔
198
            else:
199
                doc = name
1✔
200

201
        for try_no in itertools.count(1):
1!
202
            txn = self.begin()
1✔
203
            if doc:
1✔
204
                txn.note(doc)
1✔
205
            try:
1✔
206
                result = func()
1✔
207
                self.commit()
1✔
208
                return result
1✔
209
            except BaseException as exc:
1✔
210
                # Note: `abort` must not be called before `_retryable`, because `_retryable` inspects the resources of the current transaction
211
                retry = (isinstance(exc, Exception)
1✔
212
                         and try_no < tries
213
                         and self._retryable(exc.__class__, exc))
214
                self.abort()
1✔
215
                if retry:
1✔
216
                    continue
1✔
217
                else:
218
                    raise
1✔
219

220

221
@implementer(ITransactionManager)
1✔
222
class ThreadTransactionManager(threading.local):
1✔
223
    """Thread-local
224
    `transaction manager <transaction.interfaces.ITransactionManager>`.
225

226
    A thread-local transaction manager can be used as a global
227
    variable, but has a separate copy for each thread.
228

229
    Advanced applications can use the `manager` attribute to get a
230
    wrapped `TransactionManager` to allow cross-thread calls for
231
    graceful shutdown of data managers.
232
    """
233

234
    def __init__(self):
1✔
235
        self.manager = TransactionManager()
1✔
236

237
    @property
1✔
238
    def explicit(self):
1✔
239
        return self.manager.explicit
1✔
240

241
    @explicit.setter
1✔
242
    def explicit(self, v):
1✔
243
        self.manager.explicit = v
1✔
244

245
    def begin(self):
1✔
246
        return self.manager.begin()
1✔
247

248
    def get(self):
1✔
249
        return self.manager.get()
1✔
250

251
    def __enter__(self):
1✔
252
        return self.manager.__enter__()
1✔
253

254
    def commit(self):
1✔
255
        return self.manager.commit()
1✔
256

257
    def abort(self):
1✔
258
        return self.manager.abort()
1✔
259

260
    def __exit__(self, t, v, tb):
1✔
261
        return self.manager.__exit__(t, v, tb)
1✔
262

263
    def doom(self):
1✔
264
        return self.manager.doom()
1✔
265

266
    def isDoomed(self):
1✔
267
        return self.manager.isDoomed()
1✔
268

269
    def savepoint(self, optimistic=False):
1✔
270
        return self.manager.savepoint(optimistic)
1✔
271

272
    def registerSynch(self, synch):
1✔
273
        return self.manager.registerSynch(synch)
1✔
274

275
    def unregisterSynch(self, synch):
1✔
276
        return self.manager.unregisterSynch(synch)
1✔
277

278
    def clearSynchs(self):
1✔
279
        return self.manager.clearSynchs()
1✔
280

281
    def registeredSynchs(self):
1✔
282
        return self.manager.registeredSynchs()
1✔
283

284
    def attempts(self, number=3):
1✔
285
        return self.manager.attempts(number)
1✔
286

287
    def run(self, func=None, tries=3):
1✔
288
        return self.manager.run(func, tries)
1✔
289

290

291
class Attempt:
1✔
292

293
    success = False
1✔
294

295
    def __init__(self, manager):
1✔
296
        self.manager = manager
1✔
297

298
    def _retry_or_raise(self, t, v, tb):
1✔
299
        retry = self.manager._retryable(t, v)
1✔
300
        self.manager.abort()
1✔
301
        if retry:
1✔
302
            return retry  # suppress the exception if necessary
1✔
303
        raise v.with_traceback(tb)  # otherwise reraise the exception
1✔
304

305
    def __enter__(self):
1✔
306
        return self.manager.__enter__()
1✔
307

308
    def __exit__(self, t, v, tb):
1✔
309
        if v is None:
1✔
310
            try:
1✔
311
                self.manager.commit()
1✔
312
            except:  # noqa: E722 do not use bare 'except'
1✔
313
                return self._retry_or_raise(*sys.exc_info())
1✔
314
            else:
315
                self.success = True
1✔
316
        else:
317
            return self._retry_or_raise(t, v, tb)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc