/src/ZODB/FileStorage/FileStorage.py
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Storage implementation using a log written to a single file.
"""

import binascii
import contextlib
import errno
import logging
import os
import time
from base64 import decodebytes
from base64 import encodebytes
from struct import pack
from struct import unpack

from persistent.TimeStamp import TimeStamp
from zc.lockfile import LockFile
from zope.interface import alsoProvides
from zope.interface import implementer

from ZODB._compat import FILESTORAGE_MAGIC
from ZODB._compat import Pickler
from ZODB._compat import _protocol
from ZODB._compat import loads
from ZODB.BaseStorage import BaseStorage
from ZODB.BaseStorage import DataRecord as _DataRecord
from ZODB.BaseStorage import TransactionRecord as _TransactionRecord
from ZODB.blob import BlobStorageMixin
from ZODB.blob import link_or_copy
from ZODB.blob import remove_committed
from ZODB.blob import remove_committed_dir
from ZODB.ConflictResolution import ConflictResolvingStorage
from ZODB.FileStorage.format import DATA_HDR
from ZODB.FileStorage.format import DATA_HDR_LEN
from ZODB.FileStorage.format import TRANS_HDR
from ZODB.FileStorage.format import TRANS_HDR_LEN
from ZODB.FileStorage.format import CorruptedDataError
from ZODB.FileStorage.format import CorruptedError
from ZODB.FileStorage.format import DataHeader
from ZODB.FileStorage.format import FileStorageFormatter
from ZODB.FileStorage.format import TxnHeader
from ZODB.FileStorage.fspack import FileStoragePacker
from ZODB.fsIndex import fsIndex
from ZODB.interfaces import IBlobStorageRestoreable
from ZODB.interfaces import IExternalGC
from ZODB.interfaces import IStorage
from ZODB.interfaces import IStorageCurrentRecordIteration
from ZODB.interfaces import IStorageIteration
from ZODB.interfaces import IStorageRestoreable
from ZODB.interfaces import IStorageUndoable
from ZODB.POSException import ConflictError
from ZODB.POSException import MultipleUndoErrors
from ZODB.POSException import POSKeyError
from ZODB.POSException import ReadOnlyError
from ZODB.POSException import StorageError
from ZODB.POSException import StorageSystemError
from ZODB.POSException import StorageTransactionError
from ZODB.POSException import UndoError
from ZODB.utils import as_bytes
from ZODB.utils import as_text
from ZODB.utils import cp
from ZODB.utils import load_current
from ZODB.utils import mktemp
from ZODB.utils import p64
from ZODB.utils import u64
from ZODB.utils import z64

from .. import utils


# Not all platforms have fsync
fsync = getattr(os, "fsync", None)

packed_version = FILESTORAGE_MAGIC

logger = logging.getLogger('ZODB.FileStorage')


def panic(message, *data):
    logger.critical(message, *data)
    raise CorruptedTransactionError(message % data)


class FileStorageError(StorageError):
    pass


class PackError(FileStorageError):
    pass


class FileStorageFormatError(FileStorageError):
    """Invalid file format

    The format of the given file is not valid.
    """


class CorruptedFileStorageError(FileStorageError,
                                StorageSystemError):
    """Corrupted file storage."""


class CorruptedTransactionError(CorruptedFileStorageError):
    pass


class FileStorageQuotaError(FileStorageError,
                            StorageSystemError):
    """File storage quota exceeded."""

# Intended to be raised only in fspack.py, and ignored here.


class RedundantPackWarning(FileStorageError):
    pass


class TempFormatter(FileStorageFormatter):
    """Helper class used to read formatted FileStorage data."""

    def __init__(self, afile):
        self._file = afile


@implementer(
    IStorageRestoreable,
    IStorageIteration,
    IStorageUndoable,
    IStorageCurrentRecordIteration,
    IExternalGC,
    IStorage,
)
class FileStorage(
    FileStorageFormatter,
    BlobStorageMixin,
    ConflictResolvingStorage,
    BaseStorage,
):
    """Storage that saves data in a file
    """

    # Set True while a pack is in progress; undo is blocked for the duration.
    _pack_is_in_progress = False

    def __init__(self, file_name, create=False, read_only=False, stop=None,
                 quota=None, pack_gc=True, pack_keep_old=True, packer=None,
                 blob_dir=None):
        """Create a file storage

        :param str file_name: Path to store data file
        :param bool create: Flag indicating whether a file should be
            created even if it already exists.
        :param bool read_only: Flag indicating whether the file is
            read only. Only one process is able to open the file
            non-read-only.
        :param bytes stop: Time-travel transaction id
            When the file is opened, data will be read up to the given
            transaction id.  Transaction ids correspond to times and
            you can compute transaction ids for a given time using
            :class:`~ZODB.TimeStamp.TimeStamp`.
        :param int quota: File-size quota
        :param bool pack_gc: Flag indicating whether garbage
            collection should be performed when packing.
        :param bool pack_keep_old: Flag indicating whether the old data
            file should be retained after packing as a ``.old`` file.
        :param callable packer: An alternative
           :interface:`packer <ZODB.FileStorage.interfaces.IFileStoragePacker>`.
        :param str blob_dir: A blob-directory path name.
           Blobs will be supported if this option is provided.

        A file storage stores data in a single file that behaves like
        a traditional transaction log. New data records are appended
        to the end of the file.  Periodically, the file is packed to
        free up space.  When this is done, current records as of the
        pack time or later are copied to a new file, which replaces
        the old file.

        FileStorages keep in-memory indexes mapping object oids to the
        location of their current records in the file. Back pointers to
        previous records allow access to non-current records from the
        current records.

        In addition to the data file, some ancillary files are
        created. These can be lost without affecting data
        integrity; however, losing the index file may cause extremely
        slow startup. Each has a name that's a concatenation of the
        original file and a suffix. The files are listed below by
        suffix:

        .index
           Snapshot of the in-memory index.  These are created on
           shutdown, packing, and after rebuilding an index when one
           was not found.  For large databases, creating a
           file-storage object without an index file can take a very
           long time because it's necessary to scan the data file to
           build the index.

        .lock
           A lock file preventing multiple processes from opening a
           file storage in non-read-only mode.

        .tmp
          A file used to store data being committed in the first phase
          of 2-phase commit.

        .index_tmp
          A temporary file used when saving the in-memory index to
          avoid overwriting an existing index until a new index has
          been fully saved.

        .pack
          A temporary file written while packing containing current
          records as of and after the pack time.

        .old
          The previous database file after a pack.

        When the database is packed, current records as of the pack
        time and later are written to the ``.pack`` file. At the end
        of packing, the ``.old`` file is removed, if it exists, and
        the data file is renamed to the ``.old`` file and finally the
        ``.pack`` file is renamed to the data file.
        """  # noqa: E501 line too long

        if read_only:
            self._is_read_only = True
            if create:
                raise ValueError("can't create a read-only file")
        elif stop is not None:
            raise ValueError("time-travel only supported in read-only mode")

        if stop is None:
            stop = b'\377' * 8

        # Lock the database and set up the temp file.
        if not read_only:
            # Create the lock file
            self._lock_file = LockFile(file_name + '.lock')
            self._tfile = open(file_name + '.tmp', 'w+b')
            self._tfmt = TempFormatter(self._tfile)
        else:
            self._tfile = None

        self._file_name = os.path.abspath(file_name)

        self._pack_gc = pack_gc
        self.pack_keep_old = pack_keep_old
        if packer is not None:
            self.packer = packer

        BaseStorage.__init__(self, file_name)

        index, tindex = self._newIndexes()
        self._initIndex(index, tindex)

        # Now open the file

        self._file = None
        if not create:
            try:
                self._file = open(file_name, read_only and 'rb' or 'r+b')
            except OSError as exc:
                if exc.errno == errno.EFBIG:
                    # The file is too big to open.  Fail visibly.
                    raise
                if read_only:
                    # When the open request is read-only, we do not want
                    # to create the file
                    raise
                if exc.errno == errno.ENOENT:
                    # The file doesn't exist.  Create it.
                    create = 1
                # If something else went wrong, it's hard to guess
                # what the problem was.  If the file does not exist,
                # create it.  Otherwise, fail.
                if os.path.exists(file_name):
                    raise
                else:
                    create = 1

        if self._file is None and create:
            if os.path.exists(file_name):
                os.remove(file_name)
            self._file = open(file_name, 'w+b')
            self._file.write(packed_version)

        self._files = FilePool(self._file_name)
        r = self._restore_index()
        if r is not None:
            self._used_index = 1  # Marker for testing
            index, start, ltid = r

            self._initIndex(index, tindex)
            self._pos, self._oid, tid = read_index(
                self._file, file_name, index, tindex, stop,
                ltid=ltid, start=start, read_only=read_only,
            )
        else:
            self._used_index = 0  # Marker for testing
            self._pos, self._oid, tid = read_index(
                self._file, file_name, index, tindex, stop,
                read_only=read_only,
            )
            self._save_index()

        self._ltid = tid

        # self._pos should always point just past the last
        # transaction.  During 2PC, data is written after _pos.
        # The invariant is restored at tpc_abort() or tpc_finish().

        self._ts = tid = TimeStamp(tid)
        t = time.time()
        t = TimeStamp(*time.gmtime(t)[:5] + (t % 60,))
        if tid > t:
            seconds = tid.timeTime() - t.timeTime()
            complainer = logger.warning
            if seconds > 30 * 60:   # 30 minutes -- way screwed up
                complainer = logger.critical
            complainer("%s Database records %d seconds in the future",
                       file_name, seconds)

        self._quota = quota

        if blob_dir:
            self.blob_dir = os.path.abspath(blob_dir)
            if create and os.path.exists(self.blob_dir):
                remove_committed_dir(self.blob_dir)

            self._blob_init(blob_dir)
            alsoProvides(self, IBlobStorageRestoreable)
        else:
            self.blob_dir = None
            self._blob_init_no_blobs()
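
    # A minimal usage sketch (illustrative only; ``Data.fs`` is a
    # hypothetical path, and DB/transaction are the standard ZODB APIs
    # defined elsewhere):
    #
    #   from ZODB import DB
    #   import transaction
    #
    #   storage = FileStorage('Data.fs')   # created on demand if missing
    #   db = DB(storage)
    #   conn = db.open()
    #   conn.root()['answer'] = 42
    #   transaction.commit()
    #   db.close()                         # also writes Data.fs.index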

    def copyTransactionsFrom(self, other):
        if self.blob_dir:
            return BlobStorageMixin.copyTransactionsFrom(self, other)
        else:
            return BaseStorage.copyTransactionsFrom(self, other)

    def _initIndex(self, index, tindex):
        self._index = index
        self._tindex = tindex
        self._index_get = index.get

    def __len__(self):
        return len(self._index)

    def _newIndexes(self):
        # hook to use something other than builtin dict
        return fsIndex(), {}

    _saved = 0

    def _save_index(self):
        """Write the database index to a file to support quick startup."""

        if self._is_read_only:
            return

        index_name = self.__name__ + '.index'
        tmp_name = index_name + '.index_tmp'

        self._index.save(self._pos, tmp_name)

        try:
            try:
                os.remove(index_name)
            except OSError:
                pass
            os.rename(tmp_name, index_name)
        except:  # noqa: E722 do not use bare 'except'
            pass

        self._saved += 1

    def _clear_index(self):
        index_name = self.__name__ + '.index'
        if os.path.exists(index_name):
            try:
                os.remove(index_name)
            except OSError:
                pass

    def _sane(self, index, pos):
        """Sanity check saved index data by reading the last undone trans

        Basically, we read the last not undone transaction and
        check to see that the included records are consistent
        with the index.  Any invalid records or inconsistent
        object positions cause zero to be returned.
        """
        r = self._check_sanity(index, pos)
        if not r:
            logger.warning("Ignoring index for %s", self._file_name)
        return r
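
    # _check_sanity() walks backwards from ``pos`` using the redundant
    # 8-byte transaction length stored after each transaction record,
    # skipping undone transactions, and verifies up to five data records
    # against the saved index.  It returns the id of the most recent
    # transaction on success and 0 on any inconsistency.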

    def _check_sanity(self, index, pos):

        if pos < 100:
            return 0  # insane
        self._file.seek(0, 2)
        if self._file.tell() < pos:
            return 0  # insane
        ltid = None

        max_checked = 5
        checked = 0

        while checked < max_checked:
            self._file.seek(pos - 8)
            rstl = self._file.read(8)
            tl = u64(rstl)
            pos = pos - tl - 8
            if pos < 4:
                return 0  # insane
            h = self._read_txn_header(pos)
            if not ltid:
                ltid = h.tid
            if h.tlen != tl:
                return 0  # inconsistent lengths
            if h.status == 'u':
                continue  # undone trans, search back
            if h.status not in ' p':
                return 0  # insane
            if tl < h.headerlen():
                return 0  # insane
            tend = pos + tl
            opos = pos + h.headerlen()
            if opos == tend:
                continue  # empty trans

            while opos < tend and checked < max_checked:
                # Read the data records for this transaction
                h = self._read_data_header(opos)

                if opos + h.recordlen() > tend or h.tloc != pos:
                    return 0

                if index.get(h.oid, 0) != opos:
                    return 0  # insane

                checked += 1

                opos = opos + h.recordlen()

            return ltid

    def _restore_index(self):
        """Load database index to support quick startup."""
        # Returns (index, pos, tid), or None in case of error.
        # The index returned is always an instance of fsIndex.  If the
        # index cached in the file is a Python dict, it's converted to
        # fsIndex here, and, if we're not in read-only mode, the .index
        # file is rewritten with the converted fsIndex so we don't need to
        # convert it again the next time.
        file_name = self.__name__
        index_name = file_name + '.index'

        if os.path.exists(index_name):
            try:
                info = fsIndex.load(index_name)
            except:  # noqa: E722 do not use bare 'except'
                logger.exception('loading index')
                return None
        else:
            return None

        index = info.get('index')
        pos = info.get('pos')
        if index is None or pos is None:
            return None
        pos = int(pos)

        if (isinstance(index, dict) or
                (isinstance(index, fsIndex) and
                 isinstance(index._data, dict))):
            # Convert dictionary indexes to fsIndexes *or* convert fsIndexes
            # which have a dict `_data` attribute to a new fsIndex (newer
            # fsIndexes have an OOBTree as `_data`).
            newindex = fsIndex()
            newindex.update(index)
            index = newindex
            if not self._is_read_only:
                # Save the converted index.
                f = open(index_name, 'wb')
                p = Pickler(f, _protocol)
                info['index'] = index
                p.dump(info)
                f.close()
                # Now call this method again to get the new data.
                return self._restore_index()

        tid = self._sane(index, pos)
        if not tid:
            return None

        return index, pos, tid

    def close(self):
        self._file.close()
        self._files.close()
        if hasattr(self, '_lock_file'):
            self._lock_file.close()
        if self._tfile:
            self._tfile.close()
        try:
            self._save_index()
        except:  # noqa: E722 do not use bare 'except'
            # Log the error and continue
            logger.exception("Error saving index on close()")

    def getSize(self):
        return self._pos

    def _lookup_pos(self, oid):
        try:
            return self._index[oid]
        except KeyError:
            raise POSKeyError(oid)
        except TypeError:
            raise TypeError(f"invalid oid {oid!r}")

    def load(self, oid, version=''):
        """Return pickle data and serial number."""
        assert not version

        with self._files.get() as _file:
            pos = self._lookup_pos(oid)
            h = self._read_data_header(pos, oid, _file)
            if h.plen:
                data = _file.read(h.plen)
                return data, h.tid
            elif h.back:
                # Get the data from the backpointer, but tid from
                # current txn.
                data = self._loadBack_impl(oid, h.back, _file=_file)[0]
                return data, h.tid
            else:
                raise POSKeyError(oid)

    def loadSerial(self, oid, serial):
        with self._lock:
            pos = self._lookup_pos(oid)
            while 1:
                h = self._read_data_header(pos, oid)
                if h.tid == serial:
                    break
                pos = h.prev
                if h.tid < serial or not pos:
                    raise POSKeyError(oid)
            if h.plen:
                return self._file.read(h.plen)
            else:
                return self._loadBack_impl(oid, h.back)[0]
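
    # loadBefore() supports MVCC reads: it walks the revision chain for
    # oid and returns (data, tid, end_tid) for the revision that was
    # current just before ``tid``; end_tid is the id of the transaction
    # that superseded it, or None if the revision is still current.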

    def loadBefore(self, oid, tid):
        with self._files.get() as _file:
            pos = self._lookup_pos(oid)
            end_tid = None
            while True:
                h = self._read_data_header(pos, oid, _file)
                if h.tid < tid:
                    break

                pos = h.prev
                end_tid = h.tid
                if not pos:
                    return None

            if h.plen:
                return _file.read(h.plen), h.tid, end_tid
            elif h.back:
                data, _, _, _ = self._loadBack_impl(oid, h.back, _file=_file)
                return data, h.tid, end_tid
            else:
                raise POSKeyError(oid)
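
    # store() appends the new data record to the temporary commit file
    # (``_tfile``); nothing touches the main data file until tpc_vote().
    # When the caller's serial doesn't match the committed tid, conflict
    # resolution is attempted and resolved oids are collected in
    # ``_resolved``, which tpc_vote() returns to the caller.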

    def store(self, oid, oldserial, data, version, transaction):
        if self._is_read_only:
            raise ReadOnlyError()
        if transaction is not self._transaction:
            raise StorageTransactionError(self, transaction)
        assert not version

        with self._lock:
            if oid > self._oid:
                self.set_max_oid(oid)
            old = self._index_get(oid, 0)
            committed_tid = None
            if old:
                h = self._read_data_header(old, oid)
                committed_tid = h.tid

                if oldserial != committed_tid:
                    data = self.tryToResolveConflict(oid, committed_tid,
                                                     oldserial, data)
                    self._resolved.append(oid)

            pos = self._pos
            here = pos + self._tfile.tell() + self._thl
            self._tindex[oid] = here
            new = DataHeader(oid, self._tid, old, pos, 0, len(data))

            self._tfile.write(new.asString())
            self._tfile.write(data)

            # Check quota
            if self._quota is not None and here > self._quota:
                raise FileStorageQuotaError(
                    "The storage quota has been exceeded.")

    def deleteObject(self, oid, oldserial, transaction):
        if self._is_read_only:
            raise ReadOnlyError()
        if transaction is not self._transaction:
            raise StorageTransactionError(self, transaction)

        with self._lock:
            old = self._index_get(oid, 0)
            if not old:
                raise POSKeyError(oid)
            h = self._read_data_header(old, oid)
            committed_tid = h.tid

            if oldserial != committed_tid:
                raise ConflictError(
                    oid=oid, serials=(committed_tid, oldserial))

            pos = self._pos
            here = pos + self._tfile.tell() + self._thl
            self._tindex[oid] = here
            new = DataHeader(oid, self._tid, old, pos, 0, 0)
            self._tfile.write(new.asString())
            self._tfile.write(z64)

            # Check quota
            if self._quota is not None and here > self._quota:
                raise FileStorageQuotaError(
                    "The storage quota has been exceeded.")

    def _data_find(self, tpos, oid, data):
        # Return backpointer for oid.  Must call with the lock held.
        # This is a file offset to oid's data record if found, else 0.
        # The data records in the transaction at tpos are searched for oid.
        # If a data record for oid isn't found, returns 0.
        # Else if oid's data record contains a backpointer, that
        # backpointer is returned.
        # Else oid's data record contains the data, and the file offset of
        # oid's data record is returned.  This data record should contain
        # a pickle identical to the 'data' argument.
        # When looking for oid's data record we scan all data records in
        # the transaction till the end looking for the latest record with oid,
        # even if there are multiple such records. This matches load behaviour
        # which uses the data record created by the last store.

        # Unclear:  If the length of the stored data doesn't match len(data),
        # an exception is raised.  If the lengths match but the data isn't
        # the same, 0 is returned.  Why the discrepancy?
        self._file.seek(tpos)
        h = self._file.read(TRANS_HDR_LEN)
        tid, tl, status, ul, dl, el = unpack(TRANS_HDR, h)
        status = as_text(status)
        self._file.read(ul + dl + el)
        tend = tpos + tl
        pos = self._file.tell()
        data_hdr = None
        data_pos = 0
        # scan all data records in this transaction looking for the latest
        # record with our oid
        while pos < tend:
            h = self._read_data_header(pos)
            if h.oid == oid:
                data_hdr = h
                data_pos = pos
            pos += h.recordlen()
            self._file.seek(pos)
        if data_hdr is None:
            return 0

        # return position of found data record, but make sure this looks like
        # the right data record to return.
        if data_hdr.plen == 0:
            # This is also a backpointer.  Gotta trust it.
            return data_pos
        else:
            if data_hdr.plen != len(data):
                # The expected data doesn't match what's in the
                # backpointer.  Something is wrong.
                logger.error("Mismatch between data and"
                             " backpointer at %d", pos)
                return 0
            self._file.seek(data_pos + DATA_HDR_LEN)
            _data = self._file.read(data_hdr.plen)
            if data != _data:
                return 0
            return data_pos

    def restore(self, oid, serial, data, version, prev_txn, transaction):
        # A lot like store() but without all the consistency checks.  This
        # should only be used when we /know/ the data is good, hence the
        # method name.  While the signature looks like store() there are some
        # differences:
        #
        # - serial is the serial number of /this/ revision, not of the
        #   previous revision.  It is used instead of self._tid, which is
        #   ignored.
        #
        # - Nothing is returned
        #
        # - data can be None, which indicates a George Bailey object
        #   (i.e. one whose creation has been transactionally undone).
        #
        # prev_txn is a backpointer.  In the original database, it's possible
        # that the data was actually living in a previous transaction.  This
        # can happen for transactional undo and other operations, and is used
        # as a space saving optimization.  Under some circumstances the
        # prev_txn may not actually exist in the target database (i.e. self),
        # for example, if it's been packed away.  In that case, the prev_txn
        # should be considered just a hint, and is ignored if the transaction
        # doesn't exist.
        if self._is_read_only:
            raise ReadOnlyError()
        if transaction is not self._transaction:
            raise StorageTransactionError(self, transaction)
        if version:
            raise TypeError("Versions are no longer supported")

        with self._lock:
            if oid > self._oid:
                self.set_max_oid(oid)
            prev_pos = 0
            if prev_txn is not None:
                prev_txn_pos = self._txn_find(prev_txn, 0)
                if prev_txn_pos:
                    prev_pos = self._data_find(prev_txn_pos, oid, data)
            old = self._index_get(oid, 0)
            # Calculate the file position in the temporary file
            here = self._pos + self._tfile.tell() + self._thl
            # And update the temp file index
            self._tindex[oid] = here
            if prev_pos:
                # If there is a valid prev_pos, don't write data.
                data = None
            if data is None:
                dlen = 0
            else:
                dlen = len(data)

            # Write the recovery data record
            new = DataHeader(oid, serial, old, self._pos, 0, dlen)

            self._tfile.write(new.asString())

            # Finally, write the data or a backpointer.
            if data is None:
                if prev_pos:
                    self._tfile.write(p64(prev_pos))
                else:
                    # Write a zero backpointer, which indicates an
                    # un-creation transaction.
                    self._tfile.write(z64)
            else:
                self._tfile.write(data)

    def supportsUndo(self):
        return 1

    def _clear_temp(self):
        self._tindex.clear()
        if self._tfile is not None:
            self._tfile.seek(0)
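
    # _begin() runs at the start of two-phase commit.  ``_thl`` is the
    # full transaction header length (fixed header plus user name,
    # description, and extension data); store() and restore() add it to
    # the temp-file offset to compute where each record will land in
    # the data file.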

    def _begin(self, tid, u, d, e):
        self._nextpos = 0
        self._thl = TRANS_HDR_LEN + len(u) + len(d) + len(e)
        if self._thl > 65535:
            # one of u, d, or e may be > 65535
            # We have to check lengths here because struct.pack
            # doesn't raise an exception on overflow!
            if len(u) > 65535:
                raise FileStorageError('user name too long')
            if len(d) > 65535:
                raise FileStorageError('description too long')
            if len(e) > 65535:
                raise FileStorageError('too much extension data')
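
    # tpc_vote() writes the entire transaction -- header, buffered data
    # records, and a redundant 8-byte length copy -- to the main file
    # with provisional status 'c' (checkpoint).  The record only takes
    # effect when _finish() overwrites the status byte; an abort simply
    # truncates the file back to ``_pos`` (see _abort()).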

    def tpc_vote(self, transaction):
        with self._lock:
            if transaction is not self._transaction:
                raise StorageTransactionError(
                    "tpc_vote called with wrong transaction")
            dlen = self._tfile.tell()
            self._tfile.seek(0)
            user, descr, ext = self._ude

            self._file.seek(self._pos)
            tl = self._thl + dlen

            try:
                h = TxnHeader(self._tid, tl, "c", len(user),
                              len(descr), len(ext))
                h.user = user
                h.descr = descr
                h.ext = ext
                self._file.write(h.asString())
                cp(self._tfile, self._file, dlen)
                self._file.write(p64(tl))
                self._file.flush()
            except:  # noqa: E722 do not use bare 'except'
                # Hm, an error occurred writing out the data. Maybe the
                # disk is full. We don't want any turd at the end.
                self._file.truncate(self._pos)
                self._files.flush()
                raise
            self._nextpos = self._pos + (tl + 8)
            return self._resolved

    def tpc_finish(self, transaction, f=None):
        with self._files.write_lock():
            with self._lock:
                if transaction is not self._transaction:
                    raise StorageTransactionError(
                        "tpc_finish called with wrong transaction")
                try:
                    tid = self._tid
                    if f is not None:
                        f(tid)
                    self._finish(tid, *self._ude)
                    self._clear_temp()
                finally:
                    self._ude = None
                    self._transaction = None
                    self._commit_lock.release()
        return tid
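
    # _finish() flips the provisional status byte written by tpc_vote()
    # (the status field sits 16 bytes into the transaction header), then
    # _finish_finish() flushes and fsyncs (where available) before
    # publishing the transaction by updating ``_pos``, the in-memory
    # index, and ``_ltid``.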

    def _finish(self, tid, u, d, e):
        # Clear the checkpoint flag
        self._file.seek(self._pos + 16)
        self._file.write(as_bytes(self._tstatus))
        try:
            # At this point, we may have committed the data to disk.
            # If we fail from here, we're in bad shape.
            self._finish_finish(tid)
        except:  # noqa: E722 do not use bare 'except'
            # Ouch.  This is bad.  Let's try to get back to where we were
            # and then roll over and die
            logger.critical("Failure in _finish. Closing.", exc_info=True)
            self.close()
            raise

    def _finish_finish(self, tid):
        # This is a separate method to allow tests to replace it with
        # something broken. :)

        self._file.flush()
        if fsync is not None:
            fsync(self._file.fileno())

        self._pos = self._nextpos
        self._index.update(self._tindex)
        self._ltid = tid
        self._blob_tpc_finish()

    def _abort(self):
        if self._nextpos:
            self._file.truncate(self._pos)
            self._files.flush()
            self._nextpos = 0
            self._blob_tpc_abort()

    def _undoDataInfo(self, oid, pos, tpos):
        """Return the tid, data pointer, and data for the oid record at pos
        """
        if tpos:
            itpos = tpos - self._pos - self._thl
            pos = tpos
            tpos = self._tfile.tell()
            h = self._tfmt._read_data_header(itpos, oid)
            afile = self._tfile
        else:
            h = self._read_data_header(pos, oid)
            afile = self._file

        if h.oid != oid:
            raise UndoError("Invalid undo transaction id", oid)

        if h.plen:
            data = afile.read(h.plen)
        else:
            data = ''
            pos = h.back

        if tpos:
            self._tfile.seek(tpos)  # Restore temp file to end

        return h.tid, pos, data

    def getTid(self, oid):
        with self._lock:
            pos = self._lookup_pos(oid)
            h = self._read_data_header(pos, oid)
            if h.plen == 0 and h.back == 0:
                # Undone creation
                raise POSKeyError(oid)
            return h.tid

    def _transactionalUndoRecord(self, oid, pos, tid, pre):
        """Get the undo information for a data record

        'pos' points to the data header for 'oid' in the transaction
        being undone.  'tid' refers to the transaction being undone.
        'pre' is the 'prev' field of the same data header.

        Return a 3-tuple consisting of a pickle, data pointer, and
        current position.  If the pickle is true, then the data
        pointer must be 0, but the pickle can be empty *and* the
        pointer 0.
        """

        copy = True  # Can we just copy a data pointer

        # First check if it is possible to undo this record.
        tpos = self._tindex.get(oid, 0)
        ipos = self._index.get(oid, 0)
        tipos = tpos or ipos

        if tipos != pos:
            # The transaction being undone isn't current because:
            # a) A later transaction was committed ipos != pos, or
            # b) A change was made in the current transaction. This
            #    could only be a previous undo in a multi-undo.
            #    (We don't allow multiple data managers with the same
            #    storage to participate in the same transaction.)
            assert tipos > pos

            # Get current data, as identified by tipos.  We'll use
            # it to decide if and how we can undo in this case.
            ctid, cdataptr, current_data = self._undoDataInfo(oid, ipos, tpos)

            if cdataptr != pos:

                # if cdataptr was == pos, then we'd be cool, because
                # we're dealing with the same data.

                # Because they aren't equal, we have to dig deeper

                # Let's see if data to be undone and current data
                # are the same. If not, we'll have to decide whether
                # we should try conflict resolution.

                try:
                    data_to_be_undone = self._loadBack_impl(oid, pos)[0]
                    if not current_data:
                        current_data = self._loadBack_impl(oid, cdataptr)[0]

                    if data_to_be_undone != current_data:
                        # OK, so the current data is different from
                        # the data being undone.  We can't just copy:
                        copy = False

                        if not pre:
                            # The transaction we're undoing has no
                            # previous state to merge with, so we
                            # can't resolve a conflict.
                            raise UndoError(
                                "Can't undo an add transaction followed by"
                                " conflicting transactions.", oid)
                except KeyError:
                    # LoadBack gave us a key error. Bail.
                    raise UndoError("_loadBack() failed", oid)

        # Return the data that should be written in the undo record.
        if not pre:
            # We're undoing object addition.  We're doing this because
            # subsequent transactions have no net effect on the state
            # (possibly because some of them were undos).
            return "", 0, ipos

        if copy:
            # we can just copy our previous-record pointer forward
            return "", pre, ipos

        try:
            pre_data = self._loadBack_impl(oid, pre)[0]
        except KeyError:
            # couldn't find oid; what's the real explanation for this?
            raise UndoError("_loadBack() failed for %s", oid)

        try:
            data = self.tryToResolveConflict(
                oid, ctid, tid, pre_data, current_data)
            return data, 0, ipos
        except ConflictError:
            pass

        raise UndoError("Some data were modified by a later transaction", oid)

    # undoLog() returns a description dict that includes an id entry.
    # The id is opaque to the client, but contains the transaction id.
    # The transactionalUndo() implementation does a simple linear
    # search through the file (from the end) to find the transaction.

    def undoLog(self, first=0, last=-20, filter=None):
        if last < 0:
            # -last is supposed to be the max # of transactions.  Convert to
            # a positive index.  Should have x - first = -last, which
            # means x = first - last.  This is spelled out here because
            # the normalization code was incorrect for years (used +1
            # instead -- off by 1), until ZODB 3.4.
            last = first - last
        with self._lock:
            if self._pack_is_in_progress:
                raise UndoError(
                    'Undo is currently disabled for database maintenance.<p>')
            us = UndoSearch(self._file, self._pos, first, last, filter)
            while not us.finished():
                # Hold lock for batches of 20 searches, so default search
                # parameters will finish without letting another thread run.
                for i in range(20):
                    if us.finished():
                        break
                    us.search()
                # Give another thread a chance, so that a long undoLog()
                # operation doesn't block all other activity.
                self._lock.release()
                self._lock.acquire()
            return us.results

    def undo(self, transaction_id, transaction):
        """Undo a transaction, given by transaction_id.

        Do so by writing new data that reverses the action taken by
        the transaction.

        Usually, we can get by with just copying a data pointer, by
        writing a file position rather than a pickle. Sometimes, we
        may do conflict resolution, in which case we actually copy
        new data that results from resolution.
        """

        if self._is_read_only:
            raise ReadOnlyError()
        if transaction is not self._transaction:
            raise StorageTransactionError(self, transaction)

        with self._lock:
            # Find the right transaction to undo and call _txn_undo_write().
            tid = decodebytes(transaction_id + b'\n')
            assert len(tid) == 8
            tpos = self._txn_find(tid, 1)
            tindex = self._txn_undo_write(tpos)
            self._tindex.update(tindex)
            return self._tid, tindex.keys()
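
    # _txn_find() scans backwards from the end of the file: every
    # transaction record ends with a redundant 8-byte copy of its
    # length, so each step back is a single seek and read.  With
    # stop_at_pack set, the search gives up at the first packed ('p')
    # transaction.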

    def _txn_find(self, tid, stop_at_pack):
        pos = self._pos
        while pos > 39:
            self._file.seek(pos - 8)
            pos = pos - u64(self._file.read(8)) - 8
            self._file.seek(pos)
            h = self._file.read(TRANS_HDR_LEN)
            _tid = h[:8]
            if _tid == tid:
                return pos
            if stop_at_pack:
                # check the status field of the transaction header
                # (slice rather than index so bytes compare to bytes)
                if h[16:17] == b'p':
                    break
        raise UndoError("Invalid transaction id")

    def _txn_undo_write(self, tpos):
        # a helper function to write the data records for transactional undo

        otloc = self._pos
        here = self._pos + self._tfile.tell() + self._thl
        base = here - self._tfile.tell()
        # Let's move the file pointer back to the start of the txn record.
        th = self._read_txn_header(tpos)
        if th.status != " ":
            raise UndoError('non-undoable transaction')
        tend = tpos + th.tlen
        pos = tpos + th.headerlen()
        tindex = {}

        # keep track of failures, because we may succeed later
        failures = {}
        # Read the data records for this transaction
        while pos < tend:
            h = self._read_data_header(pos)
            if h.oid in failures:
                del failures[h.oid]  # second chance!

            assert base + self._tfile.tell() == here, (here, base,
                                                       self._tfile.tell())
            try:
                p, prev, ipos = self._transactionalUndoRecord(
                    h.oid, pos, h.tid, h.prev)
            except UndoError as v:
                # Don't fail right away. We may be redeemed later!
                failures[h.oid] = v
            else:

                if self.blob_dir and not p and prev:
                    try:
                        up, userial = self._loadBackTxn(h.oid, prev)
                    except POSKeyError:
                        pass  # It was removed, so no need to copy data
                    else:
                        if self.is_blob_record(up):
                            # We're undoing a blob modification operation.
                            # We have to copy the blob data
                            tmp = mktemp(dir=self.fshelper.temp_dir)
                            with self.openCommittedBlobFile(
                                    h.oid, userial) as sfp:
                                with open(tmp, 'wb') as dfp:
                                    cp(sfp, dfp)
                            self._blob_storeblob(h.oid, self._tid, tmp)

                new = DataHeader(h.oid, self._tid, ipos, otloc, 0, len(p))

                # TODO:  This seek shouldn't be necessary, but some other
                # bit of code is messing with the file pointer.
                assert self._tfile.tell() == here - base, (here, base,
                                                           self._tfile.tell())
                self._tfile.write(new.asString())
                if p:
                    self._tfile.write(p)
                else:
                    self._tfile.write(p64(prev))
                tindex[h.oid] = here
                here += new.recordlen()

            pos += h.recordlen()
            if pos > tend:
                raise UndoError("non-undoable transaction")

        if failures:
            raise MultipleUndoErrors(list(failures.items()))

        return tindex

    def history(self, oid, size=1, filter=None):
        with self._lock:
            r = []
            pos = self._lookup_pos(oid)

            while 1:
                if len(r) >= size:
                    return r
                h = self._read_data_header(pos)

                th = self._read_txn_header(h.tloc)
                if th.ext:
                    d = loads(th.ext)
                else:
                    d = {}

                d.update({"time": TimeStamp(h.tid).timeTime(),
                          "user_name": th.user,
                          "description": th.descr,
                          "tid": h.tid,
                          "size": h.plen,
                          })

                if filter is None or filter(d):
                    r.append(d)

                if h.prev:
                    pos = h.prev
                else:
                    return r

    def _redundant_pack(self, file, pos):
        assert pos > 8, pos
        file.seek(pos - 8)
        p = u64(file.read(8))
        file.seek(pos - p + 8)
        return file.read(1) not in b' u'  # binary read; compare as bytes

    @staticmethod
    def packer(storage, referencesf, stop, gc):
        # Our default packer is built around the original packer.  We
        # simply adapt the old interface to the new.  We don't really
        # want to invest much in the old packer, at least for now.
        assert referencesf is not None
        p = FileStoragePacker(storage, referencesf, stop, gc)
        try:
            opos = p.pack()
            if opos is None:
                return None
            return opos, p.index
        finally:
            p.close()
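
    # pack() drives the whole operation: it runs the packer (which
    # writes the ``.pack`` file), then, under the file-pool write lock,
    # renames the live file to ``.old`` and the ``.pack`` file into
    # place before adopting the packer's index.  The
    # _pack_is_in_progress flag keeps undo and concurrent packs out.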

1210
    def pack(self, t, referencesf, gc=None):
1✔
1211
        """Copy data from the current database file to a packed file
1212

1213
        Non-current records from transactions with time-stamp strings less
1214
        than packtss are ommitted. As are all undone records.
1215

1216
        Also, data back pointers that point before packtss are resolved and
1217
        the associated data are copied, since the old records are not copied.
1218
        """
1219
        if self._is_read_only:
1!
1220
            raise ReadOnlyError()
×
1221

1222
        stop = TimeStamp(*time.gmtime(t)[:5] + (t % 60,)).raw()
1✔
1223
        if stop == z64:
1!
1224
            raise FileStorageError('Invalid pack time')
×
1225

1226
        # If the storage is empty, there's nothing to do.
1227
        if not self._index:
1✔
1228
            return
1✔
1229

1230
        with self._lock:
1✔
1231
            if self._pack_is_in_progress:
1!
1232
                raise FileStorageError('Already packing')
×
1233
            self._pack_is_in_progress = True
1✔
1234

1235
        if gc is None:
1✔
1236
            gc = self._pack_gc
1✔
1237

1238
        oldpath = self._file_name + ".old"
1✔
1239
        if os.path.exists(oldpath):
1✔
1240
            os.remove(oldpath)
1✔
1241
        if self.blob_dir and os.path.exists(self.blob_dir + ".old"):
1✔
1242
            remove_committed_dir(self.blob_dir + ".old")
1✔
1243

1244
        have_commit_lock = False
1✔
1245
        try:
1✔
1246
            pack_result = None
1✔
1247
            try:
1✔
1248
                pack_result = self.packer(self, referencesf, stop, gc)
1✔
1249
            except RedundantPackWarning as detail:
1✔
1250
                logger.info(str(detail))
1✔
1251
            if pack_result is None:
1✔
1252
                return
1✔
1253
            have_commit_lock = True
1✔
1254
            opos, index = pack_result
1✔
1255
            with self._files.write_lock():
1✔
1256
                with self._lock:
1✔
1257
                    self._files.empty()
1✔
1258
                    self._file.close()
1✔
1259
                    try:
1✔
1260
                        os.rename(self._file_name, oldpath)
1✔
1261
                    except Exception:
×
1262
                        self._file = open(self._file_name, 'r+b')
×
1263
                        raise
×
1264

1265
                    # OK, we're beyond the point of no return
1266
                    os.rename(self._file_name + '.pack', self._file_name)
1✔
1267
                    self._file = open(self._file_name, 'r+b')
1✔
1268
                    self._initIndex(index, self._tindex)
1✔
1269
                    self._pos = opos
1✔
1270

1271
            # We're basically done.  Now we need to deal with removed
1272
            # blobs and removing the .old file (see further down).
1273

1274
            if self.blob_dir:
1✔
1275
                self._commit_lock.release()
1✔
1276
                have_commit_lock = False
1✔
1277
                self._remove_blob_files_tagged_for_removal_during_pack()
1✔
1278

1279
        finally:
1280
            if have_commit_lock:
1✔
1281
                self._commit_lock.release()
1✔
1282
            with self._lock:
1✔
1283
                self._pack_is_in_progress = False
1✔
1284

1285
        if not self.pack_keep_old:
1✔
1286
            os.remove(oldpath)
1✔
1287

1288
        with self._lock:
1✔
1289
            self._save_index()
1✔
1290

1291
    def _remove_blob_files_tagged_for_removal_during_pack(self):
        lblob_dir = len(self.blob_dir)
        fshelper = self.fshelper
        old = self.blob_dir + '.old'

        # Helper to clean up dirs left empty after moving things to old
        def maybe_remove_empty_dir_containing(path, level=0):
            path = os.path.dirname(path)
            if len(path) <= lblob_dir or os.listdir(path):
                return

            # Path points to an empty dir.  There may be a race.  We
            # might have just removed the dir for an oid (or a parent
            # dir) and while we're cleaning up its parent, another
            # thread is adding a new entry to it.

            # We don't have to worry about level 0, as this is just a
            # directory containing an object's revisions. If it is
            # empty, the object must have been garbage.

            # If the level is 1 or higher, we need to be more
            # careful.  We'll get the storage lock and double check
            # that the dir is still empty before removing it.

            removed = False
            if level:
                self._lock.acquire()
            try:
                if not os.listdir(path):
                    os.rmdir(path)
                    removed = True
            finally:
                if level:
                    self._lock.release()

            if removed:
                maybe_remove_empty_dir_containing(path, level + 1)

        if self.pack_keep_old:
            # Helpers that move an oid dir or revision file to the old dir.
            os.mkdir(old)
            link_or_copy(os.path.join(self.blob_dir, '.layout'),
                         os.path.join(old, '.layout'))

            def handle_file(path):
                newpath = old + path[lblob_dir:]
                dest = os.path.dirname(newpath)
                if not os.path.exists(dest):
                    os.makedirs(dest)
                os.rename(path, newpath)
            handle_dir = handle_file
        else:
            # Helpers that remove an oid dir or revision file.
            handle_file = remove_committed
            handle_dir = remove_committed_dir

        # First step: move or remove oids or revisions
        with open(os.path.join(self.blob_dir, '.removed'), 'rb') as fp:
            for line in fp:
                line = binascii.unhexlify(line.strip())

                if len(line) == 8:
                    # oid is garbage, re/move dir
                    path = fshelper.getPathForOID(line)
                    if not os.path.exists(path):
                        # Hm, already gone. Odd.
                        continue
                    handle_dir(path)
                    maybe_remove_empty_dir_containing(path, 1)
                    continue

                if len(line) != 16:
                    raise ValueError(
                        "Bad record in %s.removed" % self.blob_dir)

                oid, tid = line[:8], line[8:]
                path = fshelper.getBlobFilename(oid, tid)
                if not os.path.exists(path):
                    # Hm, already gone. Odd.
                    continue
                handle_file(path)
                assert not os.path.exists(path)
                maybe_remove_empty_dir_containing(path)

        os.remove(os.path.join(self.blob_dir, '.removed'))

        if not self.pack_keep_old:
            return

        # Second step: copy the remaining blob files.
        for path, dir_names, file_names in os.walk(self.blob_dir):
            for file_name in file_names:
                if not file_name.endswith('.blob'):
                    continue
                file_path = os.path.join(path, file_name)
                dest = os.path.dirname(old + file_path[lblob_dir:])
                if not os.path.exists(dest):
                    os.makedirs(dest)
                link_or_copy(file_path, old + file_path[lblob_dir:])
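
    # Format of the `.removed` file consumed above, as implied by the
    # parsing code: one hex-encoded record per line.  A record that decodes
    # to 8 bytes names a garbage oid (its whole directory is re/moved); one
    # that decodes to 16 bytes names a single revision as oid + tid.  A
    # writer would emit records roughly like:
    #
    #     fp.write(binascii.hexlify(oid) + b'\n')        # whole object
    #     fp.write(binascii.hexlify(oid + tid) + b'\n')  # single revision
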
    def iterator(self, start=None, stop=None):
        return FileIterator(self._file_name, start, stop)

    def lastInvalidations(self, count):
        file = self._file
        seek = file.seek
        read = file.read
        with self._lock:
            pos = self._pos
            while count > 0 and pos > 4:
                count -= 1
                seek(pos - 8)
                pos = pos - 8 - u64(read(8))

            seek(0)
            return [(trans.tid, [r.oid for r in trans])
                    for trans in FileIterator(self._file_name, pos=pos)]

    def lastTid(self, oid):
        """Return last serialno committed for object oid.

        If there is no serialno for this oid -- which can only occur
        if it is a new object -- return None.
        """
        try:
            return self.getTid(oid)
        except KeyError:
            return None

    def cleanup(self):
        """Remove all files created by this storage."""
        for ext in '', '.old', '.tmp', '.lock', '.index', '.pack':
            try:
                os.remove(self._file_name + ext)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    raise

    def record_iternext(self, next=None):
        index = self._index
        oid = index.minKey(next)

        oid_as_long, = unpack(">Q", oid)
        next_oid = pack(">Q", oid_as_long + 1)
        try:
            next_oid = index.minKey(next_oid)
        except ValueError:  # "empty tree" error
            next_oid = None

        data, tid = load_current(self, oid)

        return oid, tid, data, next_oid
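
    # Illustrative driver for record_iternext() (a sketch, not part of the
    # module): walk all current records in oid order by feeding each
    # returned next_oid back in until it is None.
    #
    #     next_ = None
    #     while True:
    #         oid, tid, data, next_ = storage.record_iternext(next_)
    #         ...  # process one current record
    #         if next_ is None:
    #             break
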
    ######################################################################
    # The following 2 methods are for testing a ZEO extension mechanism
    def getExtensionMethods(self):
        return dict(answer_to_the_ultimate_question=None)

    def answer_to_the_ultimate_question(self):
        return 42
    #
    ######################################################################


def shift_transactions_forward(index, tindex, file, pos, opos):
    """Copy transactions forward in the data file.

    This might be done as part of a recovery effort.
    """

    # Cache a bunch of methods
    seek = file.seek
    read = file.read
    write = file.write

    index_get = index.get

    # Initialize
    p1 = opos
    p2 = pos
    offset = p2 - p1

    # Copy the data in two stages.  In the packing stage,
    # we skip records that are non-current or that are for
    # unreferenced objects. We also skip undone transactions.
    #
    # After the packing stage, we copy everything but undone
    # transactions, however, we have to update various back pointers.
    # We have to have the storage lock in the second phase to keep
    # data from being changed while we're copying.
    while 1:

        # Read the transaction record
        seek(pos)
        h = read(TRANS_HDR_LEN)
        if len(h) < TRANS_HDR_LEN:
            break
        tid, stl, status, ul, dl, el = unpack(TRANS_HDR, h)
        status = as_text(status)
        if status == 'c':
            break  # Oops, we found a checkpoint flag.
        tl = u64(stl)
        tpos = pos
        tend = tpos + tl

        otpos = opos  # start pos of output trans

        thl = ul + dl + el
        h2 = read(thl)
        if len(h2) != thl:
            raise PackError(opos)

        # write out the transaction record
        seek(opos)
        write(h)
        write(h2)

        thl = TRANS_HDR_LEN + thl
        pos = tpos + thl
        opos = otpos + thl

        while pos < tend:
            # Read the data records for this transaction
            seek(pos)
            h = read(DATA_HDR_LEN)
            oid, serial, sprev, stloc, vlen, splen = unpack(DATA_HDR, h)
            assert not vlen
            plen = u64(splen)
            dlen = DATA_HDR_LEN + (plen or 8)

            tindex[oid] = opos

            if plen:
                p = read(plen)
            else:
                p = read(8)
                p = u64(p)
                if p >= p2:
                    p = p - offset
                elif p >= p1:
                    # Ick, we're in trouble. Let's bail
                    # to the index and hope for the best
                    p = index_get(oid, 0)
                p = p64(p)

            # WRITE
            seek(opos)
            sprev = p64(index_get(oid, 0))
            write(pack(DATA_HDR,
                       oid, serial, sprev, p64(otpos), 0, splen))

            write(p)

            opos = opos + dlen
            pos = pos + dlen

        # skip the (intentionally redundant) transaction length
        pos = pos + 8

        if status != 'u':
            index.update(tindex)  # Record the position

        tindex.clear()

        write(stl)
        opos = opos + 8

    return opos
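

# Worked example of the shift arithmetic above (illustrative only): if the
# surviving transactions currently start at pos=1000 but should start at
# opos=600, then offset = 400, and every backpointer p at or beyond p2
# (1000) is rewritten as p - 400 so it still addresses the same record
# after the copy.  A backpointer falling between p1 (600) and p2 (1000)
# would land in the region being overwritten, which is why the code falls
# back to the index in that case.

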
def search_back(file, pos):
    seek = file.seek
    read = file.read
    seek(0, 2)
    s = p = file.tell()
    while p > pos:
        seek(p - 8)
        l_ = u64(read(8))
        if l_ <= 0:
            break
        p = p - l_ - 8

    return p, s
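

# Note: search_back() walks backward from the end of the file using the
# redundant 8-byte transaction length written after each transaction,
# returning the position of the earliest transaction reachable that way
# together with the file size.

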
def recover(file_name):
    file = open(file_name, 'r+b')
    index = {}
    tindex = {}

    pos, oid, tid = read_index(file, file_name, index, tindex, recover=1)
    if oid is not None:
        print("Nothing to recover")
        return

    opos = pos
    pos, sz = search_back(file, pos)
    npos = opos  # if there is nothing to shift, truncate at the scan point
    if pos < sz:
        npos = shift_transactions_forward(index, tindex, file, pos, opos)

    file.truncate(npos)

    print("Recovered file, lost {}, ended up with {} bytes".format(
        pos - opos, npos))
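

# Minimal usage sketch for the recovery path above (illustrative; work on a
# backup copy of a damaged Data.fs, since the file is modified in place):
#
#     from ZODB.FileStorage.FileStorage import recover
#     recover('Data.fs')

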
def read_index(file, name, index, tindex, stop=b'\377' * 8,
               ltid=z64, start=4, maxoid=z64, recover=0, read_only=0):
    """Scan the file storage and update the index.

    Returns file position, max oid, and last transaction id.  It also
    stores index information in the two dictionary arguments.

    Arguments:
    file -- a file object (the Data.fs)
    name -- the name of the file (presumably file.name)
    index -- fsIndex, oid -> data record file offset
    tindex -- dictionary, oid -> data record offset
              tindex is cleared before return

    There are several default arguments that affect the scan or the
    return values.  TODO:  document them.

    start -- the file position at which to start scanning for oids added
             beyond the ones the passed-in indices know about.  The .index
             file caches the highest ._pos FileStorage knew about when
             the .index file was last saved, and that's the intended value
             to pass in for start; accept the default (and pass empty
             indices) to recreate the index from scratch
    maxoid -- ignored (it meant something prior to ZODB 3.2.6; the argument
              still exists just so the signature of read_index() stayed the
              same)

    The file position returned is the position just after the last
    valid transaction record.  The oid returned is the maximum object
    id in `index`, or z64 if the index is empty.  The transaction id is the
    tid of the last transaction, or ltid if the index is empty.
    """

    read = file.read
    seek = file.seek
    seek(0, 2)
    file_size = file.tell()
    fmt = TempFormatter(file)

    if file_size:
        if file_size < start:
            raise FileStorageFormatError(file.name)
        seek(0)
        if read(4) != packed_version:
            raise FileStorageFormatError(name)
    else:
        if not read_only:
            file.write(packed_version)
        return 4, z64, ltid

    index_get = index.get

    pos = start
    seek(start)
    tid = b'\0' * 7 + b'\1'

    while 1:
        # Read the transaction record
        h = read(TRANS_HDR_LEN)
        if not h:
            break
        if len(h) != TRANS_HDR_LEN:
            if not read_only:
                logger.warning('%s truncated at %s', name, pos)
                seek(pos)
                file.truncate()
            break

        tid, tl, status, ul, dl, el = unpack(TRANS_HDR, h)
        status = as_text(status)

        if tid <= ltid:
            logger.warning("%s time-stamp reduction at %s", name, pos)
        ltid = tid

        if pos + (tl + 8) > file_size or status == 'c':
            # Hm, the data were truncated or the checkpoint flag wasn't
            # cleared.  They may also be corrupted,
            # in which case, we don't want to totally lose the data.
            if not read_only:
                logger.warning("%s truncated, possibly due to damaged"
                               " records at %s", name, pos)
                _truncate(file, name, pos)
            break

        if status not in ' up':
            logger.warning('%s has invalid status, %s, at %s',
                           name, status, pos)

        if tl < TRANS_HDR_LEN + ul + dl + el:
            # We're in trouble. Find out if this is bad data in the
            # middle of the file, or just a turd that Win 9x dropped
            # at the end when the system crashed.
            # Skip to the end and read what should be the transaction length
            # of the last transaction.
            seek(-8, 2)
            rtl = u64(read(8))
            # Now check to see if the redundant transaction length is
            # reasonable:
            if file_size - rtl < pos or rtl < TRANS_HDR_LEN:
                logger.critical('%s has invalid transaction header at %s',
                                name, pos)
                if not read_only:
                    logger.warning(
                        "It appears that there is invalid data at the end of "
                        "the file, possibly due to a system crash.  %s "
                        "truncated to recover from bad data at end." % name)
                    _truncate(file, name, pos)
                break
            else:
                if recover:
                    return pos, None, None
                panic('%s has invalid transaction header at %s', name, pos)

        if tid >= stop:
            break

        tpos = pos
        tend = tpos + tl

        if status == 'u':
            # Undone transaction, skip it
            seek(tend)
            h = u64(read(8))
            if h != tl:
                if recover:
                    return tpos, None, None
                panic('%s has inconsistent transaction length at %s',
                      name, pos)
            pos = tend + 8
            continue

        pos = tpos + TRANS_HDR_LEN + ul + dl + el
        while pos < tend:
            # Read the data records for this transaction
            h = fmt._read_data_header(pos)
            dlen = h.recordlen()
            tindex[h.oid] = pos

            if pos + dlen > tend or h.tloc != tpos:
                if recover:
                    return tpos, None, None
                panic("%s data record exceeds transaction record at %s",
                      name, pos)

            if index_get(h.oid, 0) != h.prev:
                if h.prev:
                    if recover:
                        return tpos, None, None
                    logger.error("%s incorrect previous pointer at %s",
                                 name, pos)
                else:
                    logger.warning("%s incorrect previous pointer at %s",
                                   name, pos)

            pos += dlen

        if pos != tend:
            if recover:
                return tpos, None, None
            panic("%s data records don't add up at %s", name, tpos)

        # Read the (intentionally redundant) transaction length
        seek(pos)
        h = u64(read(8))
        if h != tl:
            if recover:
                return tpos, None, None
            panic("%s redundant transaction length check failed at %s",
                  name, pos)
        pos += 8

        index.update(tindex)
        tindex.clear()

    # Caution:  fsIndex doesn't have an efficient __nonzero__ or __len__.
    # That's why we do try/except instead.  fsIndex.maxKey() is fast.
    try:
        maxoid = index.maxKey()
    except ValueError:
        # The index is empty.
        pass  # maxoid is already equal to z64

    return pos, maxoid, ltid
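

# Usage sketch for read_index() (illustrative, not part of the module):
# rebuild an in-memory index from scratch for an existing Data.fs.
#
#     index = fsIndex()
#     tindex = {}
#     with open('Data.fs', 'rb') as f:
#         pos, maxoid, ltid = read_index(f, 'Data.fs', index, tindex,
#                                        read_only=1)

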
def _truncate(file, name, pos):
    file.seek(0, 2)
    file_size = file.tell()
    try:
        i = 0
        while 1:
            oname = f'{name}.tr{i}'
            if os.path.exists(oname):
                i += 1
            else:
                logger.warning("Writing truncated data from %s to %s",
                               name, oname)
                o = open(oname, 'wb')
                file.seek(pos)
                cp(file, o, file_size - pos)
                o.close()
                break
    except:  # noqa: E722 do not use bare 'except'
        logger.exception("couldn't write truncated data for %s", name)
        raise StorageSystemError("Couldn't save truncated data")

    file.seek(pos)
    file.truncate()


class FileIterator(FileStorageFormatter):
    """Iterate over the transactions in a FileStorage file.
    """
    _ltid = z64
    _file = None

    def __init__(self, filename, start=None, stop=None, pos=4):
        assert isinstance(filename, str)
        file = open(filename, 'rb')
        self._file = file
        self._file_name = filename
        if file.read(4) != packed_version:
            raise FileStorageFormatError(file.name)
        file.seek(0, 2)
        self._file_size = file.tell()
        if (pos < 4) or pos > self._file_size:
            raise ValueError("Given position is out of range",
                             pos, self._file_size)
        self._pos = pos
        assert start is None or isinstance(start, bytes)
        assert stop is None or isinstance(stop, bytes)
        self._start = start
        self._stop = stop
        if start:
            if self._file_size <= 4:
                return
            self._skip_to_start(start)

    def __len__(self):
        # Define a bogus __len__() to make the iterator work
        # with code like builtin list() and tuple().
        # There's a lot of C code that expects a sequence to have
        # an __len__() but can cope with any sort of mistake in its
        # implementation.  So just return 0.
        return 0

    # This allows us to pass an iterator as the `other` argument to
    # copyTransactionsFrom() in BaseStorage.  The advantage here is that we
    # can create the iterator manually, e.g. setting start and stop, and then
    # just let copyTransactionsFrom() do its thing.
    def iterator(self):
        return self

    def close(self):
        file = self._file
        if file is not None:
            self._file = None
            file.close()

    def _skip_to_start(self, start):
        file = self._file
        pos1 = self._pos
        file.seek(pos1)
        tid1 = file.read(8)  # XXX bytes
        if len(tid1) < 8:
            raise CorruptedError("Couldn't read tid.")
        if start < tid1:
            pos2 = pos1
            tid2 = tid1
            file.seek(4)
            tid1 = file.read(8)
            if start <= tid1:
                self._pos = 4
                return
            pos1 = 4
        else:
            if start == tid1:
                return

            # Try to read the last transaction. We could be unlucky and
            # have opened the file while committing a transaction.  In
            # that case, we'll just scan from the beginning if the file
            # is small enough, otherwise we'll fail.
            file.seek(self._file_size - 8)
            l_ = u64(file.read(8))
            if not (l_ + 12 <= self._file_size and
                    self._read_num(self._file_size - l_) == l_):
                if self._file_size < (1 << 20):
                    return self._scan_forward(pos1, start)
                raise ValueError("Can't find last transaction in large file")
            pos2 = self._file_size - l_ - 8
            file.seek(pos2)
            tid2 = file.read(8)
            if tid2 < tid1:
                raise CorruptedError("Tids out of order.")
            if tid2 <= start:
                if tid2 == start:
                    self._pos = pos2
                else:
                    self._pos = self._file_size
                return

        t1 = TimeStamp(tid1).timeTime()
        t2 = TimeStamp(tid2).timeTime()
        ts = TimeStamp(start).timeTime()
        if (ts - t1) < (t2 - ts):
            return self._scan_forward(pos1, start)
        else:
            return self._scan_backward(pos2, start)

    def _scan_forward(self, pos, start):
        logger.debug("Scan forward %s:%s looking for %r",
                     self._file_name, pos, start)
        while 1:
            # Read the transaction record
            h = self._read_txn_header(pos)
            if h.tid >= start:
                self._pos = pos
                return

            pos += h.tlen + 8

    def _scan_backward(self, pos, start):
        logger.debug("Scan backward %s:%s looking for %r",
                     self._file_name, pos, start)
        file = self._file
        seek = file.seek
        read = file.read
        while 1:
            pos -= 8
            seek(pos)
            tlen = u64(read(8))
            pos -= tlen
            h = self._read_txn_header(pos)
            if h.tid <= start:
                if h.tid == start:
                    self._pos = pos
                else:
                    self._pos = pos + tlen + 8
                return

    # Iterator protocol
    def __iter__(self):
        return self

    def __next__(self):
        if self._file is None:
            raise StopIteration()

        pos = self._pos
        while True:

            # Read the transaction record
            try:
                h = self._read_txn_header(pos)
            except CorruptedDataError as err:
                # If buf is empty, we've reached EOF.
                if not err.buf:
                    break
                raise

            if h.tid <= self._ltid:
                logger.warning("%s time-stamp reduction at %s",
                               self._file.name, pos)
            self._ltid = h.tid

            if self._stop is not None and h.tid > self._stop:
                break

            if h.status == "c":
                # Assume we've hit the last, in-progress transaction
                break

            if pos + h.tlen + 8 > self._file_size:
                # Hm, the data were truncated or the checkpoint flag wasn't
                # cleared.  They may also be corrupted,
                # in which case, we don't want to totally lose the data.
                logger.warning("%s truncated, possibly due to"
                               " damaged records at %s", self._file.name, pos)
                break

            if h.status not in " up":
                logger.warning('%s has invalid status,'
                               ' %s, at %s', self._file.name, h.status, pos)

            if h.tlen < h.headerlen():
                # We're in trouble. Find out if this is bad data in
                # the middle of the file, or just a turd that Win 9x
                # dropped at the end when the system crashed.  Skip to
                # the end and read what should be the transaction
                # length of the last transaction.
                self._file.seek(-8, 2)
                rtl = u64(self._file.read(8))
                # Now check to see if the redundant transaction length is
                # reasonable:
                if self._file_size - rtl < pos or rtl < TRANS_HDR_LEN:
                    logger.critical("%s has invalid transaction header at %s",
                                    self._file.name, pos)
                    logger.warning(
                        "It appears that there is invalid data at the end of "
                        "the file, possibly due to a system crash.  %s "
                        "truncated to recover from bad data at end."
                        % self._file.name)
                    break
                else:
                    logger.warning("%s has invalid transaction header at %s",
                                   self._file.name, pos)
                    break

            tpos = pos
            tend = tpos + h.tlen

            # Read the (intentionally redundant) transaction length
            self._file.seek(tend)
            rtl = u64(self._file.read(8))
            if rtl != h.tlen:
                logger.warning("%s redundant transaction length check"
                               " failed at %s", self._file.name, tend)
                break
            self._pos = tend + 8

            if h.status == "u":
                # Undone transaction: nothing to yield, keep scanning.
                pos = self._pos
                continue

            pos = tpos + h.headerlen()
            return TransactionRecord(h.tid, h.status, h.user, h.descr,
                                     h.ext, pos, tend, self._file, tpos)

        self.close()
        raise StopIteration()

    next = __next__
class TransactionRecord(_TransactionRecord):

    def __init__(self, tid, status, user, desc, ext, pos, tend, file, tpos):
        _TransactionRecord.__init__(
            self, tid, status, user, desc, ext)
        self._pos = pos
        self._tend = tend
        self._file = file
        self._tpos = tpos

    def __iter__(self):
        return TransactionRecordIterator(self)


class TransactionRecordIterator(FileStorageFormatter):
    """Iterate over the data records in a transaction."""

    def __init__(self, record):
        self._file = record._file
        self._pos = record._pos
        self._tpos = record._tpos
        self._tend = record._tend

    def __iter__(self):
        return self

    def __next__(self):
        pos = self._pos
        while pos < self._tend:
            # Read the data records for this transaction
            h = self._read_data_header(pos)
            dlen = h.recordlen()

            if pos + dlen > self._tend or h.tloc != self._tpos:
                logger.warning("%s data record exceeds transaction"
                               " record at %s", self._file.name, pos)
                break

            self._pos = pos + dlen
            prev_txn = None
            if h.plen:
                data = self._file.read(h.plen)
            else:
                if h.back == 0:
                    # If the backpointer is 0, then this transaction
                    # undoes the object creation.  It undid the
                    # transaction that created it.  Return None
                    # instead of a pickle to indicate this.
                    data = None
                else:
                    data, tid = self._loadBackTxn(h.oid, h.back, False)
                    # Caution:  looks like this only goes one link back.
                    # Should it go to the original data like BDBFullStorage?
                    prev_txn = self.getTxnFromData(h.oid, h.back)

            return Record(h.oid, h.tid, data, prev_txn, pos)

        raise StopIteration()

    next = __next__


class Record(_DataRecord):

    def __init__(self, oid, tid, data, prev, pos):
        super().__init__(oid, tid, data, prev)
        self.pos = pos


class UndoSearch:

    def __init__(self, file, pos, first, last, filter=None):
        self.file = file
        self.pos = pos
        self.first = first
        self.last = last
        self.filter = filter
        # self.i is the index of the transaction we're _going_ to find
        # next.  When it reaches self.first, we should start appending
        # to self.results.  When it reaches self.last, we're done
        # (although we may finish earlier).
        self.i = 0
        self.results = []
        self.stop = False

    def finished(self):
        """Return True if UndoSearch has found enough records."""
        # BAW: Why 39 please?  This makes no sense (see also below).
        return self.i >= self.last or self.pos < 39 or self.stop

    def search(self):
        """Search for another record."""
        dict = self._readnext()
        if dict is not None and (self.filter is None or self.filter(dict)):
            if self.i >= self.first:
                self.results.append(dict)
            self.i += 1

    def _readnext(self):
        """Read the next record from the storage."""
        self.file.seek(self.pos - 8)
        self.pos -= u64(self.file.read(8)) + 8
        self.file.seek(self.pos)
        h = self.file.read(TRANS_HDR_LEN)
        tid, tl, status, ul, dl, el = unpack(TRANS_HDR, h)
        status = as_text(status)
        if status == 'p':
            self.stop = True
            return None
        if status != ' ':
            return None
        d = u = b''
        if ul:
            u = self.file.read(ul)
        if dl:
            d = self.file.read(dl)
        e = {}
        if el:
            try:
                e = loads(self.file.read(el))
            except:  # noqa: E722 do not use bare 'except'
                pass
        d = {'id': encodebytes(tid).rstrip(),
             'time': TimeStamp(tid).timeTime(),
             'user_name': u,
             'size': tl,
             'description': d}
        d.update(e)
        return d

2163
class FilePool:
1✔
2164

2165
    closed = False
1✔
2166
    writing = False
1✔
2167
    writers = 0
1✔
2168

2169
    def __init__(self, file_name):
1✔
2170
        self.name = file_name
1✔
2171
        self._files = []
1✔
2172
        self._out = []
1✔
2173
        self._cond = utils.Condition()
1✔
2174

2175
    @contextlib.contextmanager
1✔
2176
    def write_lock(self):
1✔
2177
        with self._cond:
1✔
2178
            self.writers += 1
1✔
2179
            while self.writing or self._out:
1✔
2180
                self._cond.wait()
1✔
2181
            if self.closed:
1!
2182
                raise ValueError('closed')
×
2183
            self.writing = True
1✔
2184

2185
        try:
1✔
2186
            yield None
1✔
2187
        finally:
2188
            with self._cond:
1✔
2189
                self.writing = False
1✔
2190
                if self.writers > 0:
1✔
2191
                    self.writers -= 1
1✔
2192
                self._cond.notify_all()
1✔
2193

2194
    @contextlib.contextmanager
1✔
2195
    def get(self):
1✔
2196
        with self._cond:
1✔
2197
            while self.writers:
1✔
2198
                self._cond.wait()
1✔
2199
            assert not self.writing
1✔
2200
            if self.closed:
1✔
2201
                raise ValueError('closed')
1✔
2202

2203
            try:
1✔
2204
                f = self._files.pop()
1✔
2205
            except IndexError:
1✔
2206
                f = open(self.name, 'rb')
1✔
2207
            self._out.append(f)
1✔
2208

2209
        try:
1✔
2210
            yield f
1✔
2211
        finally:
2212
            self._out.remove(f)
1✔
2213
            self._files.append(f)
1✔
2214
            if not self._out:
1✔
2215
                with self._cond:
1✔
2216
                    if self.writers and not self._out:
1✔
2217
                        self._cond.notify_all()
1✔
2218

2219
    def empty(self):
1✔
2220
        while self._files:
1✔
2221
            self._files.pop().close()
1✔
2222

2223
    def flush(self):
1✔
2224
        """Empty read buffers.
2225

2226
        This is required if they contain data of rolled back transactions.
2227
        """
2228
        # Unfortunately, Python has no API to flush read buffers.
2229
        with self.write_lock():
1✔
2230
            self.empty()
1✔
2231

2232
    def close(self):
1✔
2233
        with self._cond:
1✔
2234
            self.closed = True
1✔
2235
            while self._out:
1!
2236
                self._out.pop().close()
×
2237
            self.empty()
1✔
2238
            self.writing = self.writers = 0
1✔
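

# Usage sketch for FilePool (illustrative, not part of the module): readers
# borrow a shared read-only handle with get(); writers exclude all readers
# with write_lock().
#
#     pool = FilePool('Data.fs')
#     with pool.get() as f:       # shared read handle
#         f.seek(0)
#         magic = f.read(4)
#     with pool.write_lock():     # exclusive; safe to empty read buffers
#         pool.empty()
#     pool.close()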