• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / ZODB / 5907636075

18 Aug 2023 10:27PM UTC coverage: 83.719% (+0.001%) from 83.718%
5907636075

Pull #390

github

web-flow
Merge a17af3ce3 into 0632974df
Pull Request #390: Update blob.py

2877 of 4050 branches covered (71.04%)

2 of 2 new or added lines in 1 file covered. (100.0%)

13323 of 15914 relevant lines covered (83.72%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.0
/src/ZODB/blob.py
1
##############################################################################
2
#
3
# Copyright (c) 2005-2006 Zope Foundation and Contributors.
4
# All Rights Reserved.
5
#
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE
12
#
13
##############################################################################
14
"""Blobs
1✔
15
"""
16

17
import binascii
1✔
18
import logging
1✔
19
import os
1✔
20
import re
1✔
21
import shutil
1✔
22
import stat
1✔
23
import sys
1✔
24
import tempfile
1✔
25
import weakref
1✔
26
from base64 import decodebytes
1✔
27
from io import BytesIO
1✔
28
from io import FileIO
1✔
29

30
import persistent
1✔
31
import zope.interface
1✔
32

33
import ZODB.interfaces
1✔
34
from ZODB import utils
1✔
35
from ZODB._compat import PersistentUnpickler
1✔
36
from ZODB._compat import ascii_bytes
1✔
37
from ZODB.interfaces import BlobError
1✔
38
from ZODB.POSException import POSKeyError
1✔
39

40

41
logger = logging.getLogger('ZODB.blob')

# Filename suffixes for committed blob data files and savepoint blob files.
BLOB_SUFFIX = ".blob"
SAVEPOINT_SUFFIX = ".spb"

# Marker file name recording which directory layout a blob directory uses,
# and the registry mapping layout names to layout strategy objects.
LAYOUT_MARKER = '.layout'
LAYOUTS = {}

# Modes accepted by Blob.open().
valid_modes = 'r', 'w', 'r+', 'a', 'c'

# Threading issues:
# We want to support closing blob files when they are destroyed.
# This introduces a threading issue, since a blob file may be destroyed
# via GC in any thread.

# PyPy 2.5 doesn't properly call the cleanup function
# of a weakref when the weakref object dies at the same time
# as the object it refers to. In other words, this doesn't work:
#    self._ref = weakref.ref(self, lambda ref: ...)
# because the function never gets called
# (https://bitbucket.org/pypy/pypy/issue/2030).
# The Blob class used to use that pattern to clean up uncommitted
# files; now we use this module-level global (but still keep a
# reference in the Blob in case we need premature cleanup).
_blob_close_refs = []
@zope.interface.implementer(ZODB.interfaces.IBlob)
class Blob(persistent.Persistent):
    """A BLOB supports efficient handling of large data within ZODB."""

    _p_blob_uncommitted = None  # Filename of the uncommitted (dirty) data
    _p_blob_committed = None  # Filename of the committed data
    _p_blob_ref = None  # weakreference to self; also in _blob_close_refs

    # Lists of weakrefs to currently open reader/writer files; None while
    # the object is a ghost (see __setstate__).
    readers = writers = None

    def __init__(self, data=None):
        """Create a blob, optionally writing *data* to it immediately."""
        # Raise exception if Blobs are getting subclassed
        # refer to ZODB-Bug No.127182 by Jim Fulton on 2007-07-20
        if (self.__class__ is not Blob):
            raise TypeError('Blobs do not support subclassing.')
        self.__setstate__()
        if data is not None:
            with self.open('w') as f:
                f.write(data)

    def __setstate__(self, state=None):
        """Reset open-file bookkeeping; blob data lives in files, not state."""
        # we use lists here because it will allow us to add and remove
        # atomically
        self.readers = []
        self.writers = []

    def __getstate__(self):
        # The object record carries no state; the data is kept in blob
        # files managed by the storage.
        return None

    def _p_deactivate(self):
        # Only ghostify if we are unopened.
        if self.readers or self.writers:
            return
        super()._p_deactivate()

    def _p_invalidate(self):
        # Force-close any open readers or writers,
        # XXX should we warn of this? Maybe?
        if self._p_changed is None:
            return
        for ref in (self.readers or [])+(self.writers or []):
            f = ref()
            if f is not None:
                f.close()

        if (self._p_blob_uncommitted):
            os.remove(self._p_blob_uncommitted)

        super()._p_invalidate()

    def opened(self):
        """Return True if any reader or writer file is currently open."""
        return bool(self.readers or self.writers)

    def closed(self, f):
        """Drop the bookkeeping weakref for *f* (called by BlobFile.close)."""
        # We use try/except below because another thread might remove
        # the ref after we check it if the file is GCed.
        for file_refs in (self.readers, self.writers):
            for ref in file_refs:
                if ref() is f:
                    try:
                        file_refs.remove(ref)
                    except ValueError:
                        pass
                    return

    def open(self, mode="r"):
        """Open the blob and return a file object for its data.

        Modes: 'r' read, 'w' write, 'r+'/'a' modify, 'c' committed data
        only (a read-only file served directly by the storage).

        Raises ValueError for an unknown mode, and BlobError when the
        requested mode conflicts with files already open on this blob or,
        for 'c', when there are uncommitted changes.
        """
        if mode not in valid_modes:
            raise ValueError("invalid mode", mode)

        if mode == 'c':
            if (self._p_blob_uncommitted
                    or
                    not self._p_blob_committed
                    or
                    self._p_blob_committed.endswith(SAVEPOINT_SUFFIX)):
                raise BlobError('Uncommitted changes')
            return self._p_jar._storage.openCommittedBlobFile(
                self._p_oid, self._p_serial)

        if self.writers:
            raise BlobError("Already opened for writing.")

        if self.readers is None:
            self.readers = []

        if mode == 'r':
            result = None
            to_open = self._p_blob_uncommitted
            if not to_open:
                to_open = self._p_blob_committed
                if to_open:
                    result = self._p_jar._storage.openCommittedBlobFile(
                        self._p_oid, self._p_serial, self)
                else:
                    self._create_uncommitted_file()
                    to_open = self._p_blob_uncommitted
                    assert to_open

            if result is None:
                result = BlobFile(to_open, mode, self)

            def destroyed(ref, readers=self.readers):
                # Weakref callback: forget the reader when its file is GCed.
                try:
                    readers.remove(ref)
                except ValueError:
                    pass

            self.readers.append(weakref.ref(result, destroyed))
        else:
            if self.readers:
                raise BlobError("Already opened for reading.")

            if mode == 'w':
                if self._p_blob_uncommitted is None:
                    self._create_uncommitted_file()
                result = BlobFile(self._p_blob_uncommitted, mode, self)
            else:  # 'r+' and 'a'
                if self._p_blob_uncommitted is None:
                    # Create a new working copy
                    self._create_uncommitted_file()
                    result = BlobFile(self._p_blob_uncommitted, mode, self)
                    if self._p_blob_committed:
                        with open(self._p_blob_committed, 'rb') as fp:
                            utils.cp(fp, result)
                        if mode == 'r+':
                            result.seek(0)
                else:
                    # Re-use existing working copy
                    result = BlobFile(self._p_blob_uncommitted, mode, self)

            def destroyed(ref, writers=self.writers):
                # Weakref callback: forget the writer when its file is GCed.
                try:
                    writers.remove(ref)
                except ValueError:
                    pass

            self.writers.append(weakref.ref(result, destroyed))

            self._p_changed = True

        return result

    def committed(self):
        """Return the filename of the committed blob data.

        Raises BlobError when there are uncommitted changes (including
        savepoint data, recognized via SAVEPOINT_SUFFIX).
        """
        if (self._p_blob_uncommitted
                or
                not self._p_blob_committed
                or
                self._p_blob_committed.endswith(SAVEPOINT_SUFFIX)):
            raise BlobError('Uncommitted changes')

        result = self._p_blob_committed

        # We do this to make sure we have the file and to let the
        # storage know we're accessing the file.
        n = self._p_jar._storage.loadBlob(self._p_oid, self._p_serial)
        assert result == n, (result, n)

        return result

    def consumeFile(self, filename):
        """Will replace the current data of the blob with the file given under
        filename.
        """
        if self.writers:
            raise BlobError("Already opened for writing.")
        if self.readers:
            raise BlobError("Already opened for reading.")

        previous_uncommitted = bool(self._p_blob_uncommitted)
        if previous_uncommitted:
            # If we have uncommitted data, we move it aside for now
            # in case the consumption doesn't work.
            target = self._p_blob_uncommitted
            target_aside = target+".aside"
            os.rename(target, target_aside)
        else:
            target = self._create_uncommitted_file()
            # We need to unlink the freshly created target again
            # to allow link() to do its job
            os.remove(target)

        try:
            rename_or_copy_blob(filename, target, chmod=False)
        except:  # noqa: E722 do not use bare 'except'
            # Recover from the failed consumption: First remove the file, it
            # might exist and mark the pointer to the uncommitted file.
            self._p_blob_uncommitted = None
            if os.path.exists(target):
                os.remove(target)

            # If there was a file moved aside, bring it back including the
            # pointer to the uncommitted file.
            if previous_uncommitted:
                os.rename(target_aside, target)
                self._p_blob_uncommitted = target

            # Re-raise the exception to make the application aware of it.
            raise
        else:
            if previous_uncommitted:
                # The relinking worked so we can remove the data that we had
                # set aside.
                os.remove(target_aside)

            # We changed the blob state and have to make sure we join the
            # transaction.
            self._p_changed = True

    # utility methods

    def _create_uncommitted_file(self):
        """Create a fresh uncommitted data file and register GC cleanup.

        Records the filename in _p_blob_uncommitted and returns it.  A
        weakref to self with a cleanup callback is kept both on the blob
        and in the module-level _blob_close_refs list (see the threading
        note near that global).
        """
        assert self._p_blob_uncommitted is None, (
            "Uncommitted file already exists.")
        if self._p_jar:
            tempdir = self._p_jar.db()._storage.temporaryDirectory()
        else:
            tempdir = tempfile.gettempdir()

        filename = utils.mktemp(dir=tempdir, prefix="BUC")
        self._p_blob_uncommitted = filename

        def cleanup(ref):
            # Weakref callback: remove the temp file when the blob is GCed.
            if os.path.exists(filename):
                os.remove(filename)
            try:
                _blob_close_refs.remove(ref)
            except ValueError:
                pass
        self._p_blob_ref = weakref.ref(self, cleanup)
        _blob_close_refs.append(self._p_blob_ref)

        return filename

    def _uncommitted(self):
        # hand uncommitted data to connection, relinquishing responsibility
        # for it.
        filename = self._p_blob_uncommitted
        if filename is None and self._p_blob_committed is None:
            filename = self._create_uncommitted_file()
        try:
            _blob_close_refs.remove(self._p_blob_ref)
        except ValueError:
            pass
        self._p_blob_uncommitted = self._p_blob_ref = None
        return filename
315
class BlobFile(FileIO):
    """A BlobFile that holds a file handle to actual blob data.

    It is a file that can be used within a transaction boundary; a BlobFile is
    just a Python file object, we only override methods which cause a change to
    blob data in order to call methods on our 'parent' persistent blob object
    signifying that the change happened.

    """

    # XXX these files should be created in the same partition as
    # the storage later puts them to avoid copying them ...

    def __init__(self, name, mode, blob):
        # FileIO only deals in binary data, so force the binary flavor
        # of whatever mode the caller asked for.
        binary_mode = mode + 'b'
        super().__init__(name, binary_mode)
        self.blob = blob

    def close(self):
        # Tell the owning blob to drop its bookkeeping for this file
        # before actually releasing the OS-level handle.
        self.blob.closed(self)
        super().close()

    def __reduce__(self):
        # Python cannot pickle an open file with any pickle protocol
        # because of the underlying _io.BufferedReader/Writer object.
        # It's pointless to do that with a blob, so we make sure to
        # prohibit it.
        raise TypeError("Pickling a BlobFile is not allowed")
344
# Process id used to tag log messages emitted by this module.
_pid = str(os.getpid())


def log(msg, level=logging.INFO, subsys=_pid, exc_info=False):
    """Emit *msg* on the module logger, prefixed with the subsystem id.

    *subsys* defaults to this process's pid; *level* and *exc_info* are
    passed through to ``logging``.
    """
    logger.log(level, "(%s) %s" % (subsys, msg), exc_info=exc_info)
352
class FilesystemHelper:
    # Storages that implement IBlobStorage can choose to use this
    # helper class to generate and parse blob filenames.  This is not
    # a set-in-stone interface for all filesystem operations dealing
    # with blobs and storages needn't indirect through this if they
    # want to perform blob storage differently.

    def __init__(self, base_dir, layout_name='automatic'):
        """Remember *base_dir* and select the directory layout.

        'automatic' chooses a layout via auto_layout_select(); selecting
        the deprecated 'lawn' layout logs a warning.
        """
        self.base_dir = os.path.abspath(base_dir) + os.path.sep
        self.temp_dir = os.path.join(base_dir, 'tmp')

        if layout_name == 'automatic':
            layout_name = auto_layout_select(base_dir)
        if layout_name == 'lawn':
            log('The `lawn` blob directory layout is deprecated due to '
                'scalability issues on some file systems, please consider '
                'migrating to the `bushy` layout.', level=logging.WARN)
        self.layout_name = layout_name
        self.layout = LAYOUTS[layout_name]

    def create(self):
        """Create the base and temp directories and the layout marker.

        If a marker file already exists and disagrees with the selected
        layout, raise ValueError.
        """
        if not os.path.exists(self.base_dir):
            os.makedirs(self.base_dir)
            log("Blob directory '%s' does not exist. "
                "Created new directory." % self.base_dir)
        if not os.path.exists(self.temp_dir):
            os.makedirs(self.temp_dir)
            log("Blob temporary directory '%s' does not exist. "
                "Created new directory." % self.temp_dir)

        layout_marker_path = os.path.join(self.base_dir, LAYOUT_MARKER)
        if not os.path.exists(layout_marker_path):
            with open(layout_marker_path, 'w') as layout_marker:
                layout_marker.write(self.layout_name)
        else:
            with open(layout_marker_path) as layout_marker:
                layout = layout_marker.read().strip()
            if layout != self.layout_name:
                raise ValueError(
                    "Directory layout `%s` selected for blob directory %s, but"
                    " marker found for layout `%s`" %
                    (self.layout_name, self.base_dir, layout))

    def isSecure(self, path):
        """Deprecated; emits a DeprecationWarning and does nothing else."""
        import warnings
        warnings.warn(
            "isSecure is deprecated. Permissions are no longer set by ZODB",
            DeprecationWarning, stacklevel=2)

    def checkSecure(self):
        """Deprecated; emits a DeprecationWarning and does nothing else."""
        import warnings
        warnings.warn(
            "checkSecure is deprecated. Permissions are no longer set by ZODB",
            DeprecationWarning, stacklevel=2)

    def getPathForOID(self, oid, create=False):
        """Given an OID, return the path on the filesystem where
        the blob data relating to that OID is stored.

        If the create flag is given, the path is also created if it didn't
        exist already.

        """
        # OIDs are numbers and sometimes passed around as integers. For our
        # computations we rely on the 64-bit packed string representation.
        if isinstance(oid, int):
            oid = utils.p64(oid)

        path = self.layout.oid_to_path(oid)
        path = os.path.join(self.base_dir, path)

        if create and not os.path.exists(path):
            try:
                os.makedirs(path)
            except OSError:
                # We might have lost a race.  If so, the directory
                # must exist now
                assert os.path.exists(path)
        return path

    def getOIDForPath(self, path):
        """Given a path, return an OID, if the path is a valid path for an
        OID. The inverse function to `getPathForOID`.

        Raises ValueError if the path is not valid for an OID.

        """
        # Strip the base directory prefix; the layout parses the remainder.
        path = path[len(self.base_dir):]
        return self.layout.path_to_oid(path)

    def createPathForOID(self, oid):
        """Given an OID, creates a directory on the filesystem where
        the blob data relating to that OID is stored, if it doesn't exist.
        """
        return self.getPathForOID(oid, create=True)

    def getBlobFilename(self, oid, tid):
        """Given an oid and a tid, return the full filename of the
        'committed' blob file related to that oid and tid.

        """
        # TIDs are numbers and sometimes passed around as integers. For our
        # computations we rely on the 64-bit packed string representation
        if isinstance(oid, int):
            oid = utils.p64(oid)
        if isinstance(tid, int):
            tid = utils.p64(tid)
        return os.path.join(self.base_dir,
                            self.layout.getBlobFilePath(oid, tid),
                            )

    def blob_mkstemp(self, oid, tid):
        """Given an oid and a tid, return a temporary file descriptor
        and a related filename.

        The file is guaranteed to exist on the same partition as committed
        data, which is important for being able to rename the file without a
        copy operation.  The directory in which the file will be placed, which
        is the return value of self.getPathForOID(oid), must exist before this
        method may be called successfully.

        """
        oidpath = self.getPathForOID(oid)
        fd, name = tempfile.mkstemp(suffix='.tmp',
                                    prefix=utils.tid_repr(tid),
                                    dir=oidpath)
        return fd, name

    def splitBlobFilename(self, filename):
        """Returns the oid and tid for a given blob filename.

        If the filename cannot be recognized as a blob filename, (None, None)
        is returned.

        """
        if not filename.endswith(BLOB_SUFFIX):
            return None, None
        path, filename = os.path.split(filename)
        oid = self.getOIDForPath(path)

        # The basename minus the suffix is the tid's repr form.
        serial = filename[:-len(BLOB_SUFFIX)]
        serial = utils.repr_to_oid(serial)
        return oid, serial

    def getOIDsForSerial(self, search_serial):
        """Return all oids related to a particular tid that exist in
        blob data.

        """
        oids = []
        for oid, oidpath in self.listOIDs():
            for filename in os.listdir(oidpath):
                blob_path = os.path.join(oidpath, filename)
                oid, serial = self.splitBlobFilename(blob_path)
                if search_serial == serial:
                    oids.append(oid)
        return oids

    def listOIDs(self):
        """Iterates over all paths under the base directory that contain blob
        files.
        """
        for path, dirs, files in os.walk(self.base_dir):
            # Make sure we traverse in a stable order. This is mainly to make
            # testing predictable.
            dirs.sort()
            files.sort()
            try:
                oid = self.getOIDForPath(path)
            except ValueError:
                # Not an OID directory (e.g. an intermediate level); skip.
                continue
            yield oid, path
526
class NoBlobsFileSystemHelper:
    """Stand-in filesystem helper for storages without blob support.

    Accessing any of its path-related attributes raises TypeError.
    """

    @property
    def temp_dir(self):
        raise TypeError("Blobs are not supported")

    # Reusing the property object makes these attributes raise TypeError
    # on access, exactly like temp_dir.
    getPathForOID = getBlobFilename = temp_dir
535
class BlobStorageError(Exception):
    """The blob storage encountered an invalid state."""
539
def auto_layout_select(path):
    """Return the layout name ('bushy' or 'lawn') to use for *path*.

    Heuristic: honor an existing layout marker file; otherwise pick
    'bushy' for a missing or unused directory, and 'lawn' for a directory
    that already contains (non-hidden) files, since it was presumably
    populated by an older, lawn-layout ZODB.
    """
    layout_marker = os.path.join(path, LAYOUT_MARKER)
    if os.path.exists(layout_marker):
        with open(layout_marker) as fp:
            layout = fp.read().strip()
        log('Blob directory `%s` has layout marker set. '
            'Selected `%s` layout. ' % (path, layout), level=logging.DEBUG)
    elif not os.path.exists(path):
        log('Blob directory %s does not exist. '
            'Selected `bushy` layout. ' % path)
        layout = 'bushy'
    else:
        # look for a non-hidden file in the directory
        has_files = any(not name.startswith('.')
                        for name in os.listdir(path))
        if not has_files:
            log('Blob directory `%s` is unused and has no layout marker set. '
                'Selected `bushy` layout. ' % path)
            layout = 'bushy'
        else:
            log('Blob directory `%s` is used but has no layout marker set. '
                'Selected `lawn` layout. ' % path)
            layout = 'lawn'
    return layout
570
class BushyLayout:
    """A bushy directory layout for blob directories.

    Creates an 8-level directory structure (one level per byte) in
    big-endian order from the OID of an object.

    """

    # Matches exactly eight '0xNN' path segments, e.g. '0x00/0x00/.../0x01'.
    blob_path_pattern = re.compile(
        r'(0x[0-9a-f]{1,2}\%s){7,7}0x[0-9a-f]{1,2}$' % os.path.sep)

    def oid_to_path(self, oid):
        """Return the relative directory path for the 8-byte packed *oid*."""
        # Create the bushy directory structure with the least significant byte
        # first
        oid_bytes = ascii_bytes(oid)
        hex_bytes = binascii.hexlify(oid_bytes)
        assert len(hex_bytes) == 16

        directories = [b'0x' + hex_bytes[x:x+2]
                       for x in range(0, 16, 2)]

        # This module is Python-3 only (no-arg super(), base64.decodebytes),
        # so the former "bytes is str" Python-2 branch was dead code and has
        # been removed: join as bytes and decode to a str path.
        sep_bytes = os.path.sep.encode('ascii')
        path_bytes = sep_bytes.join(directories)
        return path_bytes.decode('ascii')

    def path_to_oid(self, path):
        """Inverse of oid_to_path; raises ValueError for a malformed path."""
        if self.blob_path_pattern.match(path) is None:
            raise ValueError("Not a valid OID path: `%s`" % path)
        path = [ascii_bytes(x) for x in path.split(os.path.sep)]
        # Each path segment stores a byte in hex representation. Turn it into
        # an int and then get the character for our byte string.
        oid = b''.join(binascii.unhexlify(byte[2:]) for byte in path)
        return oid

    def getBlobFilePath(self, oid, tid):
        """Given an oid and a tid, return the full filename of the
        'committed' blob file related to that oid and tid.

        """
        oid_path = self.oid_to_path(oid)
        filename = "{}{}".format(utils.tid_repr(tid), BLOB_SUFFIX)
        return os.path.join(oid_path, filename)


LAYOUTS['bushy'] = BushyLayout()
620
class LawnLayout(BushyLayout):
    """A shallow directory layout for blob directories.

    Creates a single level of directories (one for each oid).
    """

    def oid_to_path(self, oid):
        # One flat directory per OID, named after its repr form.
        return utils.oid_repr(oid)

    def path_to_oid(self, path):
        """Inverse of oid_to_path; raises ValueError for a malformed path."""
        if path == '':
            # repr_to_oid would map '' to the root OID z64; an empty
            # segment must be rejected as invalid instead.
            raise ValueError('Not a valid OID path: `%s`' % path)
        try:
            return utils.repr_to_oid(path)
        except (TypeError, binascii.Error):
            raise ValueError('Not a valid OID path: `%s`' % path)


LAYOUTS['lawn'] = LawnLayout()
644
class BlobStorageMixin:
    """A mix-in to help storages support blobs."""

    def _blob_init(self, blob_dir, layout='automatic'):
        """Set up blob support backed by *blob_dir* with the given layout."""
        # XXX Log warning if storage is ClientStorage
        self.fshelper = FilesystemHelper(blob_dir, layout)
        self.fshelper.create()
        self.dirty_oids = []

    def _blob_init_no_blobs(self):
        """Set up the mix-in for a storage without blob support."""
        self.fshelper = NoBlobsFileSystemHelper()
        self.dirty_oids = []

    def _blob_tpc_abort(self):
        """Blob cleanup to be called from subclass tpc_abort
        """
        while self.dirty_oids:
            oid, serial = self.dirty_oids.pop()
            clean = self.fshelper.getBlobFilename(oid, serial)
            if os.path.exists(clean):
                remove_committed(clean)

    def _blob_tpc_finish(self):
        """Blob cleanup to be called from subclass tpc_finish
        """
        self.dirty_oids = []

    def registerDB(self, db):
        """Remember the database's record untransform and chain to any
        base-class registerDB."""
        self.__untransform_record_data = db.untransform_record_data
        try:
            m = super().registerDB
        except AttributeError:
            # No cooperating base class; nothing further to do.
            pass
        else:
            m(db)

    def __untransform_record_data(self, record):
        # Default: identity transform; replaced in registerDB().
        return record

    def is_blob_record(self, record):
        """Return whether the (untransformed) *record* is a blob record."""
        if record:
            return is_blob_record(self.__untransform_record_data(record))

    def copyTransactionsFrom(self, other):
        """Copy all transactions (including blobs) from *other* into self."""
        copyTransactionsFromTo(other, self)

    def loadBlob(self, oid, serial):
        """Return the filename where the blob file can be found.
        """
        filename = self.fshelper.getBlobFilename(oid, serial)
        if not os.path.exists(filename):
            raise POSKeyError("No blob file at %s" % filename, oid, serial)
        return filename

    def openCommittedBlobFile(self, oid, serial, blob=None):
        """Open committed blob data for reading.

        Returns a plain binary file, or a BlobFile bound to *blob* when a
        blob object is given.
        """
        blob_filename = self.loadBlob(oid, serial)
        if blob is None:
            return open(blob_filename, 'rb')
        else:
            return BlobFile(blob_filename, 'r', blob)

    def restoreBlob(self, oid, serial, data, blobfilename, prev_txn,
                    transaction):
        """Write blob data already committed in a separate database
        """
        self.restore(oid, serial, data, '', prev_txn, transaction)
        self._blob_storeblob(oid, serial, blobfilename)

        return self._tid

    def _blob_storeblob(self, oid, serial, blobfilename):
        # Move the supplied blob file into its committed location and mark
        # the (oid, serial) pair dirty for this transaction.
        with self._lock:
            self.fshelper.getPathForOID(oid, create=True)
            targetname = self.fshelper.getBlobFilename(oid, serial)
            rename_or_copy_blob(blobfilename, targetname)

            # if oid already in there, something is really hosed.
            # The underlying storage should have complained anyway
            self.dirty_oids.append((oid, serial))

    def storeBlob(self, oid, oldserial, data, blobfilename, version,
                  transaction):
        """Stores data that has a BLOB attached."""
        assert not version, "Versions aren't supported."
        self.store(oid, oldserial, data, '', transaction)
        self._blob_storeblob(oid, self._tid, blobfilename)

    def temporaryDirectory(self):
        """Return the directory used for uncommitted blob temp files."""
        return self.fshelper.temp_dir
735
@zope.interface.implementer(ZODB.interfaces.IBlobStorage)
class BlobStorage(BlobStorageMixin):
    """A wrapper/proxy storage to support blobs.
    """

    def __init__(self, base_directory, storage, layout='automatic'):
        # The wrapped storage must not itself provide blob support,
        # otherwise the proxying would be ambiguous.
        assert not ZODB.interfaces.IBlobStorage.providedBy(storage)
        self.__storage = storage

        self._blob_init(base_directory, layout)
        try:
            supportsUndo = storage.supportsUndo
        except AttributeError:
            # Storage does not expose supportsUndo at all.
            supportsUndo = False
        else:
            supportsUndo = supportsUndo()
        self.__supportsUndo = supportsUndo
        self._blobs_pack_is_in_progress = False

        if ZODB.interfaces.IStorageRestoreable.providedBy(storage):
            # Mirror the wrapped storage's interfaces and add the
            # blob-aware restore interface on top.
            zope.interface.directlyProvides(
                self,
                ZODB.interfaces.IBlobStorageRestoreable,
                zope.interface.providedBy(storage))

    def __getattr__(self, name):
        # Delegate anything not defined here to the wrapped storage.
        return getattr(self.__storage, name)

    def __len__(self):
        """Number of objects, as reported by the wrapped storage."""
        return len(self.__storage)

    def __repr__(self):
        normal_storage = self.__storage
        return '<BlobStorage proxy for {!r} at {}>'.format(normal_storage,
                                                           hex(id(self)))

    def tpc_finish(self, *arg, **kw):
        """Finish two-phase commit, then move dirty blobs into place."""
        # We need to override the base storage's tpc_finish instead of
        # providing a _finish method because methods found on the proxied
        # object aren't rebound to the proxy
        tid = self.__storage.tpc_finish(*arg, **kw)
        self._blob_tpc_finish()
        return tid

    def tpc_abort(self, *arg, **kw):
        """Abort two-phase commit, discarding uncommitted blob files."""
        # We need to override the base storage's abort instead of
        # providing an _abort method because methods found on the proxied
        # object aren't rebound to the proxy
        self.__storage.tpc_abort(*arg, **kw)
        self._blob_tpc_abort()

    def _packUndoing(self, packtime, referencesf):
        """Pack blob files for an undo-supporting base storage."""
        # Walk over all existing revisions of all blob files and check
        # if they are still needed by attempting to load the revision
        # of that object from the database.  This is maybe the slowest
        # possible way to do this, but it's safe.
        for oid, oid_path in self.fshelper.listOIDs():
            files = os.listdir(oid_path)
            for filename in files:
                filepath = os.path.join(oid_path, filename)
                whatever, serial = self.fshelper.splitBlobFilename(filepath)
                try:
                    self.loadSerial(oid, serial)
                except POSKeyError:
                    # The base storage no longer has this revision;
                    # the blob file for it can go too.
                    remove_committed(filepath)

            if not os.listdir(oid_path):
                # No revisions left for this OID; drop its directory.
                shutil.rmtree(oid_path)

    def _packNonUndoing(self, packtime, referencesf):
        """Pack blob files for a base storage without undo support.

        Only the most recent revision per OID can survive a pack.
        """
        for oid, oid_path in self.fshelper.listOIDs():
            exists = True
            try:
                utils.load_current(self, oid)
            except (POSKeyError, KeyError):
                # Object is gone from the base storage entirely.
                exists = False

            if exists:
                files = os.listdir(oid_path)
                files.sort()
                latest = files[-1]  # depends on ever-increasing tids
                files.remove(latest)
                for f in files:
                    remove_committed(os.path.join(oid_path, f))
            else:
                remove_committed_dir(oid_path)
                continue

            if not os.listdir(oid_path):
                shutil.rmtree(oid_path)

    def pack(self, packtime, referencesf):
        """Remove all unused OID/TID combinations."""
        with self._lock:
            # Only one blob pack may run at a time.
            if self._blobs_pack_is_in_progress:
                raise BlobStorageError('Already packing')
            self._blobs_pack_is_in_progress = True

        try:
            # Pack the underlying storage, which will allow us to determine
            # which serials are current.
            unproxied = self.__storage
            result = unproxied.pack(packtime, referencesf)

            # Perform a pack on the blob data.
            if self.__supportsUndo:
                self._packUndoing(packtime, referencesf)
            else:
                self._packNonUndoing(packtime, referencesf)
        finally:
            with self._lock:
                self._blobs_pack_is_in_progress = False

        return result

    def undo(self, serial_id, transaction):
        """Undo the transaction *serial_id*, preserving blob revisions."""
        undo_serial, keys = self.__storage.undo(serial_id, transaction)
        # serial_id is the transaction id of the txn that we wish to undo.
        # "undo_serial" is the transaction id of txn in which the undo is
        # performed.  "keys" is the list of oids that are involved in the
        # undo transaction.

        # The serial_id is assumed to be given to us base-64 encoded
        # (belying the web UI legacy of the ZODB code :-()
        serial_id = decodebytes(serial_id + b'\n')

        with self._lock:
            # we get all the blob oids on the filesystem related to the
            # transaction we want to undo.
            for oid in self.fshelper.getOIDsForSerial(serial_id):
                # we want to find the serial id of the previous revision
                # of this blob object.
                load_result = self.loadBefore(oid, serial_id)

                if load_result is None:

                    # There was no previous revision of this blob
                    # object.  The blob was created in the transaction
                    # represented by serial_id.  We copy the blob data
                    # to a new file that references the undo
                    # transaction in case a user wishes to undo this
                    # undo. It would be nice if we had some way to
                    # link to old blobs.
                    orig_fn = self.fshelper.getBlobFilename(oid, serial_id)
                    new_fn = self.fshelper.getBlobFilename(oid, undo_serial)
                else:
                    # A previous revision of this blob existed before the
                    # transaction implied by "serial_id".  We copy the blob
                    # data to a new file that references the undo transaction
                    # in case a user wishes to undo this undo.
                    data, serial_before, serial_after = load_result
                    orig_fn = self.fshelper.getBlobFilename(oid, serial_before)
                    new_fn = self.fshelper.getBlobFilename(oid, undo_serial)
                with open(orig_fn, "rb") as orig:
                    with open(new_fn, "wb") as new:
                        utils.cp(orig, new)
                self.dirty_oids.append((oid, undo_serial))

        return undo_serial, keys

    def new_instance(self):
        """Implementation of IMVCCStorage.new_instance.

        This method causes all storage instances to be wrapped with
        a blob storage wrapper.
        """
        base_dir = self.fshelper.base_dir
        s = self.__storage.new_instance()
        res = BlobStorage(base_dir, s)
        return res
905

906

907
# Debug-level log function used to record when a blob file had to be
# copied (rather than renamed) between locations.
copied = logging.getLogger('ZODB.blob.copied').debug
908

909

910
def rename_or_copy_blob(f1, f2, chmod=True):
    """Try to rename f1 to f2, fallback to copy.

    Under certain conditions a rename might not work, e.g. because the target
    directory is on a different partition. In this case we try to copy the
    data and remove the old file afterwards.

    """
    try:
        os.rename(f1, f2)
    except OSError:
        # Rename failed (e.g. cross-device move): copy, then delete the
        # source so the net effect matches a rename.
        copied("Copied blob file %r to %r.", f1, f2)
        with open(f1, 'rb') as src, open(f2, 'wb') as dst:
            utils.cp(src, dst)
        remove_committed(f1)

    if chmod:
        set_not_writable(f2)
929

930

931
if sys.platform == 'win32':
    # On Windows, you can't remove read-only files, so make the
    # file writable first.

    def remove_committed(filename):
        """Remove a committed blob file, clearing its read-only bit first."""
        os.chmod(filename, stat.S_IWRITE)
        os.remove(filename)

    def remove_committed_dir(path):
        """Recursively remove a directory tree of committed blob files."""
        for dirpath, dirnames, filenames in os.walk(path):
            for name in filenames:
                remove_committed(os.path.join(dirpath, name))
        shutil.rmtree(path)

    # Hard links are not usable here; fall back to copying.
    link_or_copy = shutil.copy
else:
    # On POSIX, removal works regardless of the write bit.
    remove_committed = os.remove
    remove_committed_dir = shutil.rmtree

    # os.link may be missing, FBO termux on Android.
    # See https://github.com/zopefoundation/ZODB/issues/257
    link_or_copy = getattr(os, 'link', shutil.copy)
956

957

958
def find_global_Blob(module, class_):
    """``find_global`` hook that resolves only ``ZODB.blob.Blob``.

    Every other global reference is left unresolved (returns None
    implicitly).
    """
    if (module, class_) == ('ZODB.blob', 'Blob'):
        return Blob
961

962

963
def is_blob_record(record):
    """Check whether a database record is a blob record.

    This is primarily intended to be used when copying data from one
    storage to another.

    """
    # Cheap pre-filter: a Blob pickle must mention its module name.
    if not record or b'ZODB.blob' not in record:
        return False

    unpickler = PersistentUnpickler(
        find_global_Blob, None, BytesIO(record))
    try:
        return unpickler.load() is Blob
    except (MemoryError, KeyboardInterrupt, SystemExit):
        # Never swallow critical exceptions.
        raise
    except Exception:
        # Anything that fails to unpickle with our restricted global
        # resolver is not a Blob record.
        return False
982

983

984
def copyTransactionsFromTo(source, destination):
    """Replay every transaction from *source* into *destination*.

    Blob data records are detected and their blob files are copied
    through a temporary file into the destination's blob area via
    ``restoreBlob``; all other records go through ``restore``.
    """
    for trans in source.iterator():
        destination.tpc_begin(trans, trans.tid, trans.status)
        for record in trans:
            blobfilename = None
            if is_blob_record(record.data):
                try:
                    blobfilename = source.loadBlob(record.oid, record.tid)
                except POSKeyError:
                    # Looked like a blob record, but no blob file exists;
                    # fall back to a plain restore below.
                    pass
            if blobfilename is None:
                destination.restore(record.oid, record.tid, record.data,
                                    '', record.data_txn, trans)
            else:
                # Copy the blob into the destination's temporary area so
                # restoreBlob() can move it into place.
                fd, name = tempfile.mkstemp(
                    prefix='CTFT',
                    suffix='.tmp', dir=destination.fshelper.temp_dir)
                os.close(fd)
                with open(blobfilename, 'rb') as sf, open(name, 'wb') as df:
                    utils.cp(sf, df)
                destination.restoreBlob(record.oid, record.tid, record.data,
                                        name, record.data_txn, trans)

        destination.tpc_vote(trans)
        destination.tpc_finish(trans)
1010

1011

1012
# Mask that clears all write bits (user/group/other).
NO_WRITE = ~ (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
# All read bits; used to inherit readability from the parent directory.
READ_PERMS = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH


def set_not_writable(path):
    """Strip all write permission bits from *path*.

    Read bits are taken over from the containing directory, so the file
    ends up readable by exactly those who can read its directory.
    """
    dir_perms = stat.S_IMODE(os.lstat(os.path.dirname(path)).st_mode)
    file_perms = stat.S_IMODE(os.lstat(path).st_mode)

    new_perms = (file_perms & NO_WRITE) | (dir_perms & READ_PERMS)
    os.chmod(path, new_perms)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc