• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / transaction / 16399678488

18 Sep 2024 07:25AM UTC coverage: 99.793% (+0.1%) from 99.696%
16399678488

push

github

dataflake
- vb [ci skip]

299 of 306 branches covered (97.71%)

Branch coverage included in aggregate %.

3083 of 3083 relevant lines covered (100.0%)

1.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

100.0
/src/transaction/_transaction.py
1
############################################################################
2
#
3
# Copyright (c) 2004 Zope Foundation and Contributors.
4
# All Rights Reserved.
5
#
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
############################################################################
14
import logging
1✔
15
import sys
1✔
16
import threading
1✔
17
import traceback
1✔
18
import warnings
1✔
19
import weakref
1✔
20
from io import StringIO
1✔
21

22
from zope.interface import implementer
1✔
23

24
from transaction import interfaces
1✔
25
from transaction.interfaces import TransactionFailedError
1✔
26
from transaction.weakset import WeakSet
1✔
27

28

29
_marker = object()
1✔
30

31
_TB_BUFFER = None  # unittests may hook


def _makeTracebackBuffer():  # pragma NO COVER
    """Return the buffer that failure tracebacks are written to.

    The module-level ``_TB_BUFFER`` is returned when it has been set to
    something other than None; otherwise a new ``StringIO`` is created.
    """
    return StringIO() if _TB_BUFFER is None else _TB_BUFFER
1✔
38

39

40
_LOGGER = None  # unittests may hook


def _makeLogger():  # pragma NO COVER
    """Return the logger a new transaction should use.

    The module-level ``_LOGGER`` is returned when it has been set to
    something other than None; otherwise the logger named ``txn.<ident>``
    is looked up, where ``<ident>`` is ``threading.get_ident()`` of the
    calling thread.
    """
    if _LOGGER is None:
        return logging.getLogger("txn.%d" % threading.get_ident())
    return _LOGGER
1✔
47

48

49
class Status:
    # Names of the states a transaction moves through.  The values are
    # plain strings; the Transaction code in this module compares them
    # with ``is``, so assign these attributes rather than equal literals.

    # ACTIVE is the initial state.
    ACTIVE = "Active"

    # Set by commit() just before, and right after, the resources are
    # committed.
    COMMITTING = "Committing"
    COMMITTED = "Committed"

    # Set by doom(); commit() refuses to run in this state.
    DOOMED = "Doomed"

    # commit() or commit(True) raised an exception.  All further attempts
    # to commit or join this transaction will raise TransactionFailedError.
    COMMITFAILED = "Commit failed"
1✔
61

62

63
class _NoSynchronizers:
    """Stand-in for a synchronizer collection whose ``map`` is a no-op.

    The class itself (not an instance) is assigned to
    ``Transaction._synchronizers`` once a transaction has been freed.
    """

    @staticmethod
    def map(_f):
        """Do nothing."""
        return None
68

69

70
@implementer(interfaces.ITransaction)
1✔
71
class Transaction:
1✔
72
    """Default implementation of `~transaction.interfaces.ITransaction`."""
73

74
    # Assign an index to each savepoint so we can invalidate later savepoints
75
    # on rollback.  The first index assigned is 1, and it goes up by 1 each
76
    # time.
77
    _savepoint_index = 0
1✔
78

79
    # If savepoints are used, keep a weak key dict of them.  This maps a
80
    # savepoint to its index (see above).
81
    _savepoint2index = None
1✔
82

83
    # Meta data. extended_info is also metadata, but is initialized to an
84
    # empty dict in __init__.
85
    _user = ""
1✔
86
    _description = ""
1✔
87

88
    def __init__(self, synchronizers=None, manager=None):
        """Create a new transaction in the ``Status.ACTIVE`` state.

        *synchronizers* is the collection of synchronizer objects to call;
        a fresh ``WeakSet`` is used when it is None.  *manager* is stored
        as ``_manager``.
        """
        self.status = Status.ACTIVE
        self._manager = manager

        # Weak set of synchronizer objects to call.
        self._synchronizers = (
            WeakSet() if synchronizers is None else synchronizers)

        # List of resource managers, e.g. MultiObjectResourceAdapters.
        self._resources = []

        # _adapters: Connection/_p_jar -> MultiObjectResourceAdapter[Sub]
        self._adapters = {}
        # _voted: id(Connection) -> boolean, True if voted.  This and other
        # dictionaries use the id() of the resource manager as a key,
        # because we can't guess whether the actual resource managers will
        # be safe to use as dict keys.
        self._voted = {}

        # The user, description, and extension attributes are accessed
        # directly by storages, leading underscore notwithstanding.
        self.extension = {}

        logger = _makeLogger()
        self.log = logger
        logger.debug("new transaction")

        # If a commit fails, the traceback is saved in _failure_traceback.
        # If another attempt is made to commit, TransactionFailedError is
        # raised, incorporating this traceback.
        self._failure_traceback = None

        # Lists of (hook, args, kws) tuples, filled by addBeforeCommitHook(),
        # addAfterCommitHook(), addBeforeAbortHook() and addAfterAbortHook()
        # respectively.
        self._before_commit = []
        self._after_commit = []
        self._before_abort = []
        self._after_abort = []
1✔
130

131
    @property
    def _extension(self):
        # for backward compatibility, since most clients used this
        # absent any formal API.  Reads are forwarded to ``extension``.
        return self.extension

    @_extension.setter
    def _extension(self, v):
        # Writes are forwarded to ``extension`` as well, so both names
        # always refer to the same object.
        self.extension = v
1✔
140

141
    @property
    def user(self):
        # The user metadata string, stored in ``_user``.
        return self._user

    @user.setter
    def user(self, v):
        # None is rejected outright; any other value is passed through
        # text_or_warn() before being stored.
        if v is None:
            raise ValueError("user must not be None")
        self._user = text_or_warn(v)
1✔
150

151
    @property
    def description(self):
        # The description metadata string, stored in ``_description``.
        return self._description

    @description.setter
    def description(self, v):
        # Unlike the ``user`` setter, assigning None is silently ignored
        # and leaves the current description unchanged.
        if v is not None:
            self._description = text_or_warn(v)
1✔
159

160
    def isDoomed(self):
        """See `~transaction.interfaces.ITransaction`."""
        # Identity comparison: true only when status is the Status.DOOMED
        # object itself.
        return self.status is Status.DOOMED
1✔
163

164
    def doom(self):
        """See `~transaction.interfaces.ITransaction`.

        Dooming an already doomed transaction is a no-op.  Raises
        ``ValueError`` when the transaction is neither doomed nor active.
        """
        current = self.status
        if current is Status.DOOMED:
            return
        if current is not Status.ACTIVE:
            # should not doom transactions in the middle,
            # or after, a commit
            raise ValueError('non-doomable')
        self.status = Status.DOOMED
1✔
172

173
    # Raise TransactionFailedError, due to commit()/join()/register()
174
    # getting called when the current transaction has already suffered
175
    # a commit/savepoint failure.
176
    def _prior_operation_failed(self):
        # Always raises TransactionFailedError; the message embeds the text
        # saved in _failure_traceback by an earlier failed operation.
        saved = self._failure_traceback
        assert saved is not None
        raise TransactionFailedError(
            "An operation previously failed, with traceback:\n\n%s" %
            saved.getvalue())
181

182
    def join(self, resource):
        """See `~transaction.interfaces.ITransaction`.

        Raises ``TransactionFailedError`` after a failed commit and
        ``ValueError`` when the transaction is neither active nor doomed.
        """
        if self.status is Status.COMMITFAILED:
            self._prior_operation_failed()  # doesn't return

        joinable = (self.status is Status.ACTIVE or
                    self.status is Status.DOOMED)
        if not joinable:
            # TODO: Should it be possible to join a committing transaction?
            # I think some users want it.
            raise ValueError(
                f"expected txn status {Status.ACTIVE!r} or {Status.DOOMED!r},"
                f" but it's {self.status!r}")
        self._resources.append(resource)

        if not self._savepoint2index:
            return

        # A data manager has joined a transaction *after* a savepoint
        # was created.  A couple of things are different in this case:
        #
        # 1. We need to add its savepoint to all previous savepoints,
        # so that if they are rolled back, we roll this one back too.
        #
        # 2. We don't actually need to ask the data manager for a
        # savepoint:  because it's just joining, we can just abort it to
        # roll back to the current state, so we simply use an
        # AbortSavepoint.
        late_joiner = AbortSavepoint(resource, self)
        for earlier in self._savepoint2index.keys():
            earlier._savepoints.append(late_joiner)
211

212
    def _unjoin(self, resource):
1✔
213
        # Leave a transaction because a savepoint was rolled back on a resource
214
        # that joined later.
215

216
        # Don't use remove.  We don't want to assume anything about __eq__.
217
        self._resources = [r for r in self._resources if r is not resource]
1✔
218

219
    def savepoint(self, optimistic=False):
        """See `~transaction.interfaces.ITransaction`.

        If creating the savepoint fails, the joined resources are cleaned
        up, the transaction is marked as failed and the error is re-raised.
        """
        if self.status is Status.COMMITFAILED:
            self._prior_operation_failed()  # doesn't return, it raises

        try:
            created = Savepoint(self, optimistic, *self._resources)
        except:  # noqa: E722 do not use bare 'except'
            self._cleanup(self._resources)
            self._saveAndRaiseCommitishError()  # reraises!

        # The weak mapping is only created once a savepoint is really used.
        index_map = self._savepoint2index
        if index_map is None:
            index_map = self._savepoint2index = weakref.WeakKeyDictionary()
        next_index = self._savepoint_index + 1
        self._savepoint_index = next_index
        index_map[created] = next_index

        return created
1✔
236

237
    # Remove and invalidate all savepoints we know about with an index
238
    # larger than `savepoint`'s.  This is what's needed when a rollback
239
    # _to_ `savepoint` is done.
240
    def _remove_and_invalidate_after(self, savepoint):
1✔
241
        savepoint2index = self._savepoint2index
1✔
242
        index = savepoint2index[savepoint]
1✔
243
        # use list(items()) to make copy to avoid mutating while iterating
244
        for savepoint, i in list(savepoint2index.items()):
1✔
245
            if i > index:
1✔
246
                savepoint.transaction = None  # invalidate
1✔
247
                del savepoint2index[savepoint]
1✔
248

249
    # Invalidate and forget about all savepoints.
250
    def _invalidate_all_savepoints(self):
1✔
251
        for savepoint in self._savepoint2index.keys():
1✔
252
            savepoint.transaction = None  # invalidate
1✔
253
        self._savepoint2index.clear()
1✔
254

255
    def commit(self):
        """See `~transaction.interfaces.ITransaction`.

        Raises ``interfaces.DoomedTransaction`` when the transaction is
        doomed, and ``TransactionFailedError`` (through
        ``_prior_operation_failed``) when an earlier operation already
        failed.  An exception raised while committing the resources is
        re-raised after the failure has been recorded and the after-commit
        hooks have been called with ``status=False``.
        """
        if self.status is Status.DOOMED:
            raise interfaces.DoomedTransaction(
                'transaction doomed, cannot commit')

        # Note the order: savepoints are invalidated before the
        # COMMITFAILED check below.
        if self._savepoint2index:
            self._invalidate_all_savepoints()

        if self.status is Status.COMMITFAILED:
            self._prior_operation_failed()  # doesn't return

        # An exception from a before-commit hook propagates from here; the
        # status has not been changed at that point.
        self._callBeforeCommitHooks()

        self._synchronizers.map(lambda s: s.beforeCompletion(self))
        self.status = Status.COMMITTING

        try:
            self._commitResources()
            self.status = Status.COMMITTED
        except:  # noqa: E722 do not use bare 'except'
            # On this path afterCompletion() has already been called on the
            # synchronizers by _commitResources().
            t = None
            v = None
            tb = None
            try:
                t, v, tb = self._saveAndGetCommitishError()
                self._callAfterCommitHooks(status=False)
                raise v.with_traceback(tb)
            finally:
                # Drop the local references to the exception and traceback.
                del t, v, tb
        else:
            self._synchronizers.map(lambda s: s.afterCompletion(self))
            self._callAfterCommitHooks(status=True)
            self._free()
        # Only reached after a successful commit.
        self.log.debug("commit")
1✔
290

291
    def _saveAndGetCommitishError(self):
        # Mark the transaction as failed, write a description of the
        # exception currently being handled into _failure_traceback, and
        # return the (type, value, traceback) triple from sys.exc_info().
        # Meant to be called from inside an ``except`` block.
        self.status = Status.COMMITFAILED
        # Save the traceback for TransactionFailedError.
        ft = self._failure_traceback = _makeTracebackBuffer()
        t = None
        v = None
        tb = None
        try:
            t, v, tb = sys.exc_info()
            # Record how we got into commit().
            # sys._getframe(1) is the frame of this method's caller, so the
            # recorded stack depends on where this method is called from.
            traceback.print_stack(sys._getframe(1), None, ft)
            # Append the stack entries from here down to the exception.
            traceback.print_tb(tb, None, ft)
            # Append the exception type and value.
            ft.writelines(traceback.format_exception_only(t, v))
            return t, v, tb
        finally:
            # Drop the local references to the exception and traceback.
            del t, v, tb
1✔
309

310
    def _saveAndRaiseCommitishError(self):
        # Record the exception currently being handled through
        # _saveAndGetCommitishError() (which also marks the transaction as
        # failed) and re-raise it with its original traceback.  This method
        # never returns normally.
        t = None
        v = None
        tb = None
        try:
            t, v, tb = self._saveAndGetCommitishError()
            raise v.with_traceback(tb)
        finally:
            # Drop the local references to the exception and traceback.
            del t, v, tb
1✔
319

320
    def getBeforeCommitHooks(self):
        """See `~transaction.interfaces.ITransaction`."""
        # Hands out an iterator over the registered (hook, args, kws)
        # tuples rather than the list itself.
        return iter(self._before_commit)
1✔
323

324
    def addBeforeCommitHook(self, hook, args=(), kws=None):
1✔
325
        """See `~transaction.interfaces.ITransaction`."""
326
        if kws is None:
1✔
327
            kws = {}
1✔
328
        self._before_commit.append((hook, tuple(args), kws))
1✔
329

330
    def _callBeforeCommitHooks(self):
        # Call all hooks registered, allowing further registrations
        # during processing.  _call_hooks() runs with its defaults here, so
        # the first failing hook propagates its exception and no resources
        # are aborted afterwards.
        self._call_hooks(self._before_commit)
1✔
334

335
    def getAfterCommitHooks(self):
        """See `~transaction.interfaces.ITransaction`."""
        # Hands out an iterator over the registered (hook, args, kws)
        # tuples rather than the list itself.
        return iter(self._after_commit)
1✔
338

339
    def addAfterCommitHook(self, hook, args=(), kws=None):
1✔
340
        """See `~transaction.interfaces.ITransaction`."""
341
        if kws is None:
1✔
342
            kws = {}
1✔
343
        self._after_commit.append((hook, tuple(args), kws))
1✔
344

345
    def _callAfterCommitHooks(self, status=True):
        # Each hook receives *status* as its first positional argument.
        # Exceptions raised by hooks are logged instead of propagated
        # (exc=False) and, when at least one hook is registered, every
        # joined resource is aborted afterwards (clean=True).
        self._call_hooks(self._after_commit,
                         exc=False, clean=True, prefix_args=(status,))
348

349
    def _call_hooks(self, hooks, exc=True, clean=False, prefix_args=()):
        """Call *hooks*.

        If *exc* is true, fail on the first exception; otherwise
        log the exception and continue.

        If *clean* is true, abort all resources afterwards. This ensures
        a clean state in case an (after) hook has affected one
        of the resources.

        *prefix_args* defines additional arguments prefixed
        to the arguments provided by the hook definition.

        ``_call_hooks`` supports hooks that add new hooks while it runs.
        The *hooks* list is emptied in place before returning, whether or
        not a hook raised.
        """
        # Avoid aborting anything at the end if no hooks are registered.
        if not hooks:
            return
        try:
            # Call all hooks registered, allowing further registrations
            # during processing: the list is iterated directly, so entries
            # appended by a hook are picked up by this same loop.
            for hook, args, kws in hooks:
                try:
                    hook(*(prefix_args + args), **kws)
                except:  # noqa: E722 do not use bare 'except'
                    if exc:
                        raise
                    # We should not fail
                    self.log.error("Error in hook exec in %s ",
                                   hook, exc_info=sys.exc_info())
        finally:
            del hooks[:]  # clear hooks
            if clean:
                # The primary operation has already been performed.
                # But the hooks execution might have left the resources
                # in an unclean state. Clean up
                for rm in self._resources:
                    try:
                        rm.abort(self)
                    except:  # noqa: E722 do not use bare 'except'
                        # XXX should we take further actions here ?
                        self.log.error("Error in abort() on manager %s",
                                       rm, exc_info=sys.exc_info())
392

393
    def getBeforeAbortHooks(self):
        """See `~transaction.interfaces.ITransaction`."""
        # Hands out an iterator over the registered (hook, args, kws)
        # tuples rather than the list itself.
        return iter(self._before_abort)
1✔
396

397
    def addBeforeAbortHook(self, hook, args=(), kws=None):
1✔
398
        """See `~transaction.interfaces.ITransaction`."""
399
        if kws is None:
1✔
400
            kws = {}
1✔
401
        self._before_abort.append((hook, tuple(args), kws))
1✔
402

403
    def _callBeforeAbortHooks(self):
        # Call all hooks registered, allowing further registrations
        # during processing.  With exc=False an exception raised by a hook
        # is logged and the remaining hooks still run.
        self._call_hooks(self._before_abort, exc=False)
1✔
407

408
    def getAfterAbortHooks(self):
        """See `~transaction.interfaces.ITransaction`."""
        # Hands out an iterator over the registered (hook, args, kws)
        # tuples rather than the list itself.
        return iter(self._after_abort)
1✔
411

412
    def addAfterAbortHook(self, hook, args=(), kws=None):
1✔
413
        """See `~transaction.interfaces.ITransaction`."""
414
        if kws is None:
1✔
415
            kws = {}
1✔
416
        self._after_abort.append((hook, tuple(args), kws))
1✔
417

418
    def _callAfterAbortHooks(self):
        # clean=True: when at least one hook is registered, every joined
        # resource is aborted again after the hooks ran.  exc keeps its
        # default (True), so the first failing hook propagates its
        # exception once that cleanup is done.
        self._call_hooks(self._after_abort, clean=True)
1✔
420

421
    def _commitResources(self):
        # Execute the two-phase commit protocol.
        #
        # Each phase (tpc_begin, commit, tpc_vote, tpc_finish) is run on all
        # resource managers before the next phase starts.  On any failure
        # the managers are cleaned up through _cleanup(), the synchronizers
        # get afterCompletion(), and the original exception is re-raised.

        # Work on a copy of the joined managers, ordered by rm_key().
        L = list(self._resources)
        L.sort(key=rm_key)
        try:
            for rm in L:
                rm.tpc_begin(self)
            for rm in L:
                rm.commit(self)
                self.log.debug("commit %r", rm)
            for rm in L:
                rm.tpc_vote(self)
                # Remember which managers voted; _cleanup() skips abort()
                # for them.
                self._voted[id(rm)] = True

            try:
                for rm in L:
                    rm.tpc_finish(self)
            except:  # noqa: E722 do not use bare 'except'
                # TODO: do we need to make this warning stronger?
                # TODO: It would be nice if the system could be configured
                # to stop committing transactions at this point.
                self.log.critical("A storage error occurred during the second "
                                  "phase of the two-phase commit.  Resources "
                                  "may be in an inconsistent state.")
                raise
        except:  # noqa: E722 do not use bare 'except'
            # If an error occurs committing a transaction, we try
            # to revert the changes in each of the resource managers.
            t, v, tb = sys.exc_info()
            try:
                try:
                    self._cleanup(L)
                finally:
                    # Synchronizers are notified even if the cleanup itself
                    # raised.
                    self._synchronizers.map(lambda s: s.afterCompletion(self))
                raise v.with_traceback(tb)
            finally:
                # Drop the local references to the exception and traceback.
                del t, v, tb
1✔
459

460
    def _cleanup(self, L):
1✔
461
        # Called when an exception occurs during tpc_vote or tpc_finish.
462
        for rm in L:
1✔
463
            if id(rm) not in self._voted:
1✔
464
                try:
1✔
465
                    rm.abort(self)
1✔
466
                except Exception:
1✔
467
                    self.log.error("Error in abort() on manager %s",
1✔
468
                                   rm, exc_info=sys.exc_info())
469
        for rm in L:
1✔
470
            try:
1✔
471
                rm.tpc_abort(self)
1✔
472
            except Exception:
1✔
473
                self.log.error("Error in tpc_abort() on manager %s",
1✔
474
                               rm, exc_info=sys.exc_info())
475

476
    def _free_manager(self):
1✔
477
        try:
1✔
478
            if self._manager:
1✔
479
                self._manager.free(self)
1✔
480
        finally:
481
            # If we try to abort a transaction and fail, the manager
482
            # may have begun a new transaction, and will raise a
483
            # ValueError from free(); we don't want that to happen
484
            # again in _free(), which abort() always calls, so be sure
485
            # to clear out the manager.
486
            self._manager = None
1✔
487

488
    def _free(self):
        # Called when the transaction has been committed or aborted
        # to break references---this transaction object will not be returned
        # as the current transaction from its manager after this, and all
        # IDatamanager objects joined to it will be forgotten.
        # All hooks and data are forgotten.
        self._free_manager()

        if hasattr(self, '_data'):
            del self._data

        # Empty the resource and hook lists in place.
        for entries in (self._resources,
                        self._before_commit, self._after_commit,
                        self._before_abort, self._after_abort):
            del entries[:]

        # self._synchronizers might be shared, we can't mutate it
        self._synchronizers = _NoSynchronizers
        self._adapters = self._voted = self.extension = None
1✔
511

512
    def data(self, ob):
1✔
513
        try:
1✔
514
            data = self._data
1✔
515
        except AttributeError:
1✔
516
            raise KeyError(ob)
1✔
517

518
        try:
1✔
519
            return data[id(ob)]
1✔
520
        except KeyError:
1✔
521
            raise KeyError(ob)
1✔
522

523
    def set_data(self, ob, ob_data):
1✔
524
        try:
1✔
525
            data = self._data
1✔
526
        except AttributeError:
1✔
527
            data = self._data = {}
1✔
528

529
        data[id(ob)] = ob_data
1✔
530

531
    def abort(self):
        """See `~transaction.interfaces.ITransaction`.

        Every joined resource manager is asked to abort even if an earlier
        step failed.  The first exception seen (from the synchronizers'
        ``beforeCompletion`` or from a manager's ``abort``) is re-raised at
        the end; later ones are only logged.  The transaction is freed in
        all cases.
        """
        try:
            # (t, v, tb) holds the first exception encountered, if any.
            t = None
            v = None
            tb = None

            self._callBeforeAbortHooks()
            if self._savepoint2index:
                self._invalidate_all_savepoints()

            try:
                self._synchronizers.map(lambda s: s.beforeCompletion(self))
            except:  # noqa: E722 do not use bare 'except'
                t, v, tb = sys.exc_info()
                self.log.error(
                    "Failed to call synchronizers", exc_info=sys.exc_info())

            for rm in self._resources:
                try:
                    rm.abort(self)
                except:  # noqa: E722 do not use bare 'except'
                    # Keep only the first failure for re-raising.
                    if tb is None:
                        t, v, tb = sys.exc_info()
                    self.log.error("Failed to abort resource manager: %s",
                                   rm, exc_info=sys.exc_info())

            self._callAfterAbortHooks()
            # Unlike in commit(), we are no longer the current transaction
            # when we call afterCompletion(). But we can't be completely
            # _free(): the synchronizer might want to access some data it set
            # before.
            self._free_manager()

            self._synchronizers.map(lambda s: s.afterCompletion(self))

            self.log.debug("abort")

            if tb is not None:
                raise v.with_traceback(tb)
        finally:
            self._free()
            # Drop the local references to the exception and traceback.
            del t, v, tb
1✔
574

575
    def note(self, text):
        """See `~transaction.interfaces.ITransaction`.

        *text* is stripped and appended to the description on a new line,
        or becomes the description when that is still empty.  None is
        ignored.
        """
        if text is None:
            return
        text = text_or_warn(text).strip()
        if not self.description:
            self.description = text
        else:
            self.description = self.description + "\n" + text
1✔
583

584
    def setUser(self, user_name, path="/"):
        """See `~transaction.interfaces.ITransaction`.

        The stored user is *path* and *user_name* joined by one space.
        """
        path_text = text_or_warn(path)
        name_text = text_or_warn(user_name)
        self.user = path_text + " " + name_text
1✔
587

588
    def setExtendedInfo(self, name, value):
        """See `~transaction.interfaces.ITransaction`."""
        # Stored in the ``extension`` dict; an existing entry for *name* is
        # overwritten.
        self.extension[name] = value
1✔
591

592
    def isRetryableError(self, error):
        # Delegates to the manager's _retryable(), passing the error's type
        # and the error itself.
        # NOTE(review): _manager is None when no manager was supplied and
        # is reset to None by _free_manager(); calling this method then
        # raises AttributeError -- confirm that callers only use it on a
        # transaction that still has its manager.
        return self._manager._retryable(type(error), error)
1✔
594

595

596
# TODO: We need a better name for the adapters.
597

598

599
def rm_key(rm):
    """Return the sort key of resource manager *rm*.

    The key is the result of calling ``rm.sortKey()``.  None is returned
    when *rm* has no ``sortKey`` attribute or when that attribute is None.
    """
    sort_key = getattr(rm, 'sortKey', None)
    return None if sort_key is None else sort_key()
1✔
603

604

605
@implementer(interfaces.ISavepoint)
class Savepoint:
    """Implementation of `~transaction.interfaces.ISavepoint`, a transaction
    savepoint.

    A transaction savepoint holds one savepoint per data manager that
    takes part in the transaction and rolls them back together.
    """

    def __init__(self, transaction, optimistic, *resources):
        """Take a savepoint of every data manager in *resources*.

        A data manager without a ``savepoint`` attribute gets a
        `NoRollbackSavepoint` when *optimistic* is true; otherwise
        ``TypeError`` is raised.
        """
        self.transaction = transaction
        self._savepoints = collected = []

        for datamanager in resources:
            try:
                make_savepoint = datamanager.savepoint
            except AttributeError:
                if optimistic:
                    collected.append(NoRollbackSavepoint(datamanager))
                    continue
                raise TypeError("Savepoints unsupported", datamanager)
            collected.append(make_savepoint())

    @property
    def valid(self):
        """Whether this savepoint is still attached to a transaction."""
        return self.transaction is not None

    def rollback(self):
        """See `~transaction.interfaces.ISavepoint`."""
        owner = self.transaction
        if owner is None:
            raise interfaces.InvalidSavepointRollbackError(
                'invalidated by a later savepoint')
        owner._remove_and_invalidate_after(self)

        try:
            for datamanager_savepoint in self._savepoints:
                datamanager_savepoint.rollback()
        except:  # noqa: E722 do not use bare 'except'
            # Mark the transaction as failed.
            owner._saveAndRaiseCommitishError()  # reraises!
1✔
648

649

650
class AbortSavepoint:
    """Savepoint for a data manager that is rolled back by aborting it.

    `rollback` aborts the data manager and then removes it from the
    transaction through ``transaction._unjoin``.
    """

    def __init__(self, datamanager, transaction):
        self.transaction = transaction
        self.datamanager = datamanager

    def rollback(self):
        """Abort the data manager, then unjoin it from the transaction."""
        manager = self.datamanager
        manager.abort(self.transaction)
        self.transaction._unjoin(manager)
1✔
659

660

661
class NoRollbackSavepoint:
    """Savepoint placeholder whose `rollback` always fails.

    It only remembers the data manager so the error can name it.
    """

    def __init__(self, datamanager):
        self.datamanager = datamanager

    def rollback(self):
        """Raise ``TypeError``; the data manager cannot be rolled back."""
        unsupported = self.datamanager
        raise TypeError("Savepoints unsupported", unsupported)
1✔
668

669

670
def text_or_warn(s):
    """Return *s* as text, warning when it is not already a ``str``.

    A ``str`` is returned unchanged.  Anything else triggers a
    ``DeprecationWarning``; ``bytes`` are then decoded as UTF-8 with
    undecodable bytes replaced, and other objects are converted with
    ``str()``.
    """
    if isinstance(s, str):
        return s

    # stacklevel=3 attributes the warning to the caller of our caller.
    warnings.warn("Expected text", DeprecationWarning, stacklevel=3)
    return s.decode('utf-8', 'replace') if isinstance(s, bytes) else str(s)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc