• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / ZODB / 11588847965

30 Oct 2024 07:35AM UTC coverage: 83.932% (+0.2%) from 83.766%
11588847965

Pull #403

github

web-flow
Merge branch 'master' into repozo-incremental-recover
Pull Request #403: Repozo incremental recover

2451 of 3556 branches covered (68.93%)

219 of 222 new or added lines in 2 files covered. (98.65%)

37 existing lines in 2 files now uncovered.

13482 of 16063 relevant lines covered (83.93%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.27
/src/ZODB/scripts/tests/test_repozo.py
1
##############################################################################
2
#
3
# Copyright (c) 2004-2009 Zope Foundation and Contributors.
4
# All Rights Reserved.
5
#
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14

15
import os
1✔
16
import sys
1✔
17
import unittest
1✔
18
from hashlib import md5
1✔
19
from io import BytesIO
1✔
20
from io import StringIO
1✔
21

22
import ZODB.tests.util  # layer used at class scope
1✔
23

24

25
_NOISY = os.environ.get('NOISY_REPOZO_TEST_OUTPUT')
1✔
26

27

28
def _write_file(name, bits, mode='wb'):
1✔
29
    with open(name, mode) as f:
1✔
30
        f.write(bits)
1✔
31
        f.flush()
1✔
32

33

34
def _read_file(name, mode='rb'):
1✔
35
    with open(name, mode) as f:
1✔
36
        return f.read()
1✔
37

38

39
class OurDB:
1✔
40

41
    _file_name = None
1✔
42

43
    def __init__(self, dir):
1✔
44
        import transaction
1✔
45
        from BTrees.OOBTree import OOBTree
1✔
46
        self.dir = dir
1✔
47
        self.getdb()
1✔
48
        conn = self.db.open()
1✔
49
        conn.root()['tree'] = OOBTree()
1✔
50
        transaction.commit()
1✔
51
        self.pos = self.db.storage._pos
1✔
52
        self.close()
1✔
53

54
    def getdb(self):
1✔
55
        from ZODB import DB
1✔
56
        from ZODB.FileStorage import FileStorage
1✔
57
        self._file_name = storage_filename = os.path.join(self.dir, 'Data.fs')
1✔
58
        storage = FileStorage(storage_filename)
1✔
59
        self.db = DB(storage)
1✔
60

61
    def gettree(self):
1✔
62
        self.getdb()
1✔
63
        conn = self.db.open()
1✔
64
        return conn.root()['tree']
1✔
65

66
    def pack(self):
1✔
67
        self.getdb()
1✔
68
        self.db.pack()
1✔
69

70
    def close(self):
1✔
71
        if self.db is not None:
1!
72
            self.db.close()
1✔
73
            self.db = None
1✔
74

75
    def mutate(self):
1✔
76
        # Make random mutations to the btree in the database.
77
        import random
1✔
78

79
        import transaction
1✔
80
        tree = self.gettree()
1✔
81
        for dummy in range(100):
1✔
82
            if random.random() < 0.6:
1✔
83
                tree[random.randrange(100000)] = random.randrange(100000)
1✔
84
            else:
85
                keys = tree.keys()
1✔
86
                if keys:
1✔
87
                    del tree[keys[0]]
1✔
88
        transaction.commit()
1✔
89
        self.pos = self.db.storage._pos
1✔
90
        self.maxkey = self.db.storage._oid
1✔
91
        self.close()
1✔
92

93

94
class Test_parseargs(unittest.TestCase):
1✔
95

96
    def setUp(self):
1✔
97
        from ZODB.scripts import repozo
1✔
98
        self._old_verbosity = repozo.VERBOSE
1✔
99
        self._old_stderr = sys.stderr
1✔
100
        repozo.VERBOSE = False
1✔
101
        sys.stderr = StringIO()
1✔
102

103
    def tearDown(self):
1✔
104
        from ZODB.scripts import repozo
1✔
105
        sys.stderr = self._old_stderr
1✔
106
        repozo.VERBOSE = self._old_verbosity
1✔
107

108
    def test_short(self):
1✔
109
        from ZODB.scripts import repozo
1✔
110
        options = repozo.parseargs(['-v', '-V', '-r', '/tmp/nosuchdir'])
1✔
111
        self.assertTrue(repozo.VERBOSE)
1✔
112
        self.assertEqual(options.mode, repozo.VERIFY)
1✔
113
        self.assertEqual(options.repository, '/tmp/nosuchdir')
1✔
114

115
    def test_long(self):
1✔
116
        from ZODB.scripts import repozo
1✔
117
        options = repozo.parseargs(['--verbose', '--verify',
1✔
118
                                    '--repository=/tmp/nosuchdir'])
119
        self.assertTrue(repozo.VERBOSE)
1✔
120
        self.assertEqual(options.mode, repozo.VERIFY)
1✔
121
        self.assertEqual(options.repository, '/tmp/nosuchdir')
1✔
122

123
    def test_help(self):
1✔
124
        from ZODB.scripts import repozo
1✔
125

126
        # Note: can't mock sys.stdout in our setUp: if a test fails,
127
        # zope.testrunner will happily print the traceback and failure message
128
        # into our StringIO before running our tearDown.
129
        old_stdout = sys.stdout
1✔
130
        sys.stdout = StringIO()
1✔
131
        try:
1✔
132
            self.assertRaises(SystemExit, repozo.parseargs, ['--help'])
1✔
133
            self.assertIn('Usage:', sys.stdout.getvalue())
1✔
134
        finally:
135
            sys.stdout = old_stdout
1✔
136

137
    def test_bad_option(self):
1✔
138
        from ZODB.scripts import repozo
1✔
139
        self.assertRaises(SystemExit, repozo.parseargs,
1✔
140
                          ['--crash-please'])
141
        self.assertIn('option --crash-please not recognized',
1✔
142
                      sys.stderr.getvalue())
143

144
    def test_bad_argument(self):
1✔
145
        from ZODB.scripts import repozo
1✔
146
        self.assertRaises(SystemExit, repozo.parseargs,
1✔
147
                          ['crash', 'please'])
148
        self.assertIn('Invalid arguments: crash, please',
1✔
149
                      sys.stderr.getvalue())
150

151
    def test_mode_selection(self):
1✔
152
        from ZODB.scripts import repozo
1✔
153
        options = repozo.parseargs([
1✔
154
            '-B', '-f', '/tmp/Data.fs', '-r', '/tmp/nosuchdir'])
155
        self.assertEqual(options.mode, repozo.BACKUP)
1✔
156
        options = repozo.parseargs(['-R', '-r', '/tmp/nosuchdir'])
1✔
157
        self.assertEqual(options.mode, repozo.RECOVER)
1✔
158
        options = repozo.parseargs(['-V', '-r', '/tmp/nosuchdir'])
1✔
159
        self.assertEqual(options.mode, repozo.VERIFY)
1✔
160

161
    def test_mode_selection_is_mutually_exclusive(self):
1✔
162
        from ZODB.scripts import repozo
1✔
163
        self.assertRaises(SystemExit, repozo.parseargs, ['-B', '-R'])
1✔
164
        self.assertIn('-B, -R, and -V are mutually exclusive',
1✔
165
                      sys.stderr.getvalue())
166
        self.assertRaises(SystemExit, repozo.parseargs, ['-R', '-V'])
1✔
167
        self.assertRaises(SystemExit, repozo.parseargs, ['-V', '-B'])
1✔
168

169
    def test_mode_selection_required(self):
1✔
170
        from ZODB.scripts import repozo
1✔
171
        self.assertRaises(SystemExit, repozo.parseargs, [])
1✔
172
        self.assertIn('Either --backup, --recover or --verify is required',
1✔
173
                      sys.stderr.getvalue())
174

175
    def test_misc_flags(self):
1✔
176
        from ZODB.scripts import repozo
1✔
177
        options = repozo.parseargs([
1✔
178
            '-B', '-f', '/tmp/Data.fs', '-r', '/tmp/nosuchdir', '-F'])
179
        self.assertTrue(options.full)
1✔
180
        options = repozo.parseargs([
1✔
181
            '-B', '-f', '/tmp/Data.fs', '-r', '/tmp/nosuchdir', '-k'])
182
        self.assertTrue(options.killold)
1✔
183

184
    def test_repo_is_required(self):
1✔
185
        from ZODB.scripts import repozo
1✔
186
        self.assertRaises(SystemExit, repozo.parseargs, ['-B'])
1✔
187
        self.assertIn('--repository is required', sys.stderr.getvalue())
1✔
188

189
    def test_backup_ignored_args(self):
1✔
190
        from ZODB.scripts import repozo
1✔
191
        options = repozo.parseargs(['-B', '-r', '/tmp/nosuchdir', '-v',
1✔
192
                                    '--with-verify',
193
                                    '-f', '/tmp/Data.fs',
194
                                    '-o', '/tmp/ignored.fs',
195
                                    '-D', '2011-12-13'])
196
        self.assertEqual(options.date, None)
1✔
197
        self.assertIn('--date option is ignored in backup mode',
1✔
198
                      sys.stderr.getvalue())
199
        self.assertEqual(options.output, None)
1✔
200
        self.assertIn('--output option is ignored in backup mode',
1✔
201
                      sys.stderr.getvalue())
202
        self.assertEqual(options.withverify, False)
1✔
203
        self.assertIn(
1✔
204
            '--with-verify option is ignored in backup mode',
205
            sys.stderr.getvalue())
206

207
    def test_backup_required_args(self):
1✔
208
        from ZODB.scripts import repozo
1✔
209
        self.assertRaises(SystemExit, repozo.parseargs,
1✔
210
                          ['-B', '-r', '/tmp/nosuchdir'])
211
        self.assertIn('--file is required', sys.stderr.getvalue())
1✔
212

213
    def test_backup_no_ignored_args_warning(self):
1✔
214
        from ZODB.scripts import repozo
1✔
215
        repozo.parseargs(['-B', '-r', '/tmp/nosuchdir', '-v',
1✔
216
                          '-f', '/tmp/Data.fs',])
217
        self.assertNotIn('option is ignored', sys.stderr.getvalue())
1✔
218

219
    def test_recover_ignored_args(self):
1✔
220
        from ZODB.scripts import repozo
1✔
221
        options = repozo.parseargs(['-R', '-r', '/tmp/nosuchdir', '-v',
1✔
222
                                    '-f', '/tmp/ignored.fs',
223
                                    '-k'])
224
        self.assertEqual(options.file, None)
1✔
225
        self.assertIn('--file option is ignored in recover mode',
1✔
226
                      sys.stderr.getvalue())
227
        self.assertEqual(options.killold, False)
1✔
228
        self.assertIn('--kill-old-on-full option is ignored in recover mode',
1✔
229
                      sys.stderr.getvalue())
230

231
    def test_verify_ignored_args(self):
1✔
232
        from ZODB.scripts import repozo
1✔
233
        options = repozo.parseargs(['-V', '-r', '/tmp/nosuchdir', '-v',
1✔
234
                                    '--with-verify',
235
                                    '-o', '/tmp/ignored.fs',
236
                                    '-D', '2011-12-13',
237
                                    '-f', '/tmp/ignored.fs',
238
                                    '-z', '-k', '-F'])
239
        self.assertEqual(options.date, None)
1✔
240
        self.assertIn('--date option is ignored in verify mode',
1✔
241
                      sys.stderr.getvalue())
242
        self.assertEqual(options.output, None)
1✔
243
        self.assertIn('--output option is ignored in verify mode',
1✔
244
                      sys.stderr.getvalue())
245
        self.assertEqual(options.full, False)
1✔
246
        self.assertIn('--full option is ignored in verify mode',
1✔
247
                      sys.stderr.getvalue())
248
        self.assertEqual(options.gzip, False)
1✔
249
        self.assertIn('--gzip option is ignored in verify mode',
1✔
250
                      sys.stderr.getvalue())
251
        self.assertEqual(options.file, None)
1✔
252
        self.assertIn('--file option is ignored in verify mode',
1✔
253
                      sys.stderr.getvalue())
254
        self.assertEqual(options.killold, False)
1✔
255
        self.assertIn('--kill-old-on-full option is ignored in verify mode',
1✔
256
                      sys.stderr.getvalue())
257
        self.assertEqual(options.withverify, False)
1✔
258
        self.assertIn('--with-verify option is ignored in verify mode',
1✔
259
                      sys.stderr.getvalue())
260

261

262
class FileopsBase:
1✔
263

264
    def _makeChunks(self):
1✔
265
        from ZODB.scripts.repozo import READCHUNK
1✔
266
        return [b'x' * READCHUNK, b'y' * READCHUNK, b'z']
1✔
267

268
    def _makeFile(self, text=None):
1✔
269
        if text is None:
1✔
270
            text = b''.join(self._makeChunks())
1✔
271
        return BytesIO(text)
1✔
272

273

274
class Test_dofile(unittest.TestCase, FileopsBase):
1✔
275

276
    def _callFUT(self, func, fp, n):
1✔
277
        from ZODB.scripts.repozo import dofile
1✔
278
        return dofile(func, fp, n)
1✔
279

280
    def test_empty_read_all(self):
1✔
281
        chunks = []
1✔
282
        file = self._makeFile(b'')
1✔
283
        bytes = self._callFUT(chunks.append, file, None)
1✔
284
        self.assertEqual(bytes, 0)
1✔
285
        self.assertEqual(chunks, [])
1✔
286

287
    def test_empty_read_count(self):
1✔
288
        chunks = []
1✔
289
        file = self._makeFile(b'')
1✔
290
        bytes = self._callFUT(chunks.append, file, 42)
1✔
291
        self.assertEqual(bytes, 0)
1✔
292
        self.assertEqual(chunks, [])
1✔
293

294
    def test_nonempty_read_all(self):
1✔
295
        chunks = []
1✔
296
        file = self._makeFile()
1✔
297
        bytes = self._callFUT(chunks.append, file, None)
1✔
298
        self.assertEqual(bytes, file.tell())
1✔
299
        self.assertEqual(chunks, self._makeChunks())
1✔
300

301
    def test_nonempty_read_count(self):
1✔
302
        chunks = []
1✔
303
        file = self._makeFile()
1✔
304
        bytes = self._callFUT(chunks.append, file, 42)
1✔
305
        self.assertEqual(bytes, 42)
1✔
306
        self.assertEqual(chunks, [b'x' * 42])
1✔
307

308

309
class Test_checksum(unittest.TestCase, FileopsBase):
1✔
310

311
    def _callFUT(self, fp, n):
1✔
312
        from ZODB.scripts.repozo import checksum
1✔
313
        return checksum(fp, n)
1✔
314

315
    def test_empty_read_all(self):
1✔
316
        file = self._makeFile(b'')
1✔
317
        sum = self._callFUT(file, None)
1✔
318
        self.assertEqual(sum, md5(b'').hexdigest())
1✔
319

320
    def test_empty_read_count(self):
1✔
321
        file = self._makeFile(b'')
1✔
322
        sum = self._callFUT(file, 42)
1✔
323
        self.assertEqual(sum, md5(b'').hexdigest())
1✔
324

325
    def test_nonempty_read_all(self):
1✔
326
        file = self._makeFile()
1✔
327
        sum = self._callFUT(file, None)
1✔
328
        self.assertEqual(sum, md5(b''.join(self._makeChunks())).hexdigest())
1✔
329

330
    def test_nonempty_read_count(self):
1✔
331
        file = self._makeFile()
1✔
332
        sum = self._callFUT(file, 42)
1✔
333
        self.assertEqual(sum, md5(b'x' * 42).hexdigest())
1✔
334

335

336
class OptionsTestBase:
1✔
337

338
    _repository_directory = None
1✔
339
    _data_directory = None
1✔
340

341
    def tearDown(self):
1✔
342
        if self._repository_directory is not None:
1✔
343
            from shutil import rmtree
1✔
344
            rmtree(self._repository_directory)
1✔
345
        if self._data_directory is not None:
1✔
346
            from shutil import rmtree
1✔
347
            rmtree(self._data_directory)
1✔
348

349
    def _makeOptions(self, **kw):
1✔
350
        import tempfile
1✔
351
        self._repository_directory = tempfile.mkdtemp(prefix='test-repozo-')
1✔
352

353
        class Options:
1✔
354
            repository = self._repository_directory
1✔
355
            date = None
1✔
356

357
            def __init__(self, **kw):
1✔
358
                self.__dict__.update(kw)
1✔
359
        return Options(**kw)
1✔
360

361

362
class Test_copyfile(OptionsTestBase, unittest.TestCase):
1✔
363

364
    def _callFUT(self, options, dest, start, n):
1✔
365
        from ZODB.scripts.repozo import copyfile
1✔
366
        return copyfile(options, dest, start, n)
1✔
367

368
    def test_no_gzip(self):
1✔
369
        options = self._makeOptions(gzip=False)
1✔
370
        source = options.file = os.path.join(self._repository_directory,
1✔
371
                                             'source.txt')
372
        _write_file(source, b'x' * 1000)
1✔
373
        target = os.path.join(self._repository_directory, 'target.txt')
1✔
374
        sum = self._callFUT(options, target, 0, 100)
1✔
375
        self.assertEqual(sum, md5(b'x' * 100).hexdigest())
1✔
376
        self.assertEqual(_read_file(target), b'x' * 100)
1✔
377

378
    def test_w_gzip(self):
1✔
379
        from ZODB.scripts.repozo import _GzipCloser
1✔
380
        options = self._makeOptions(gzip=True)
1✔
381
        source = options.file = os.path.join(self._repository_directory,
1✔
382
                                             'source.txt')
383
        _write_file(source, b'x' * 1000)
1✔
384
        target = os.path.join(self._repository_directory, 'target.txt')
1✔
385
        sum = self._callFUT(options, target, 0, 100)
1✔
386
        self.assertEqual(sum, md5(b'x' * 100).hexdigest())
1✔
387
        with _GzipCloser(target, 'rb') as f:
1✔
388
            self.assertEqual(f.read(), b'x' * 100)
1✔
389

390

391
class Test_concat(OptionsTestBase, unittest.TestCase):
1✔
392

393
    def _callFUT(self, files, ofp):
1✔
394
        from ZODB.scripts.repozo import concat
1✔
395
        return concat(files, ofp)
1✔
396

397
    def _makeFile(self, name, text, gzip_file=False):
1✔
398
        import tempfile
1✔
399

400
        from ZODB.scripts.repozo import _GzipCloser
1✔
401
        if self._repository_directory is None:
1✔
402
            self._repository_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
403
        fqn = os.path.join(self._repository_directory, name)
1✔
404
        if gzip_file:
1✔
405
            _opener = _GzipCloser
1✔
406
        else:
407
            _opener = open
1✔
408
        with _opener(fqn, 'wb') as f:
1✔
409
            f.write(text)
1✔
410
            f.flush()
1✔
411
        return fqn
1✔
412

413
    def test_empty_list_no_ofp(self):
1✔
414
        bytes, sum = self._callFUT([], None)
1✔
415
        self.assertEqual(bytes, 0)
1✔
416
        self.assertEqual(sum, md5(b'').hexdigest())
1✔
417

418
    def test_w_plain_files_no_ofp(self):
1✔
419
        files = [self._makeFile(x, x.encode(), False) for x in 'ABC']
1✔
420
        bytes, sum = self._callFUT(files, None)
1✔
421
        self.assertEqual(bytes, 3)
1✔
422
        self.assertEqual(sum, md5(b'ABC').hexdigest())
1✔
423

424
    def test_w_gzipped_files_no_ofp(self):
1✔
425
        files = [self._makeFile('%s.fsz' % x, x.encode(), True) for x in 'ABC']
1✔
426
        bytes, sum = self._callFUT(files, None)
1✔
427
        self.assertEqual(bytes, 3)
1✔
428
        self.assertEqual(sum, md5(b'ABC').hexdigest())
1✔
429

430
    def test_w_ofp(self):
1✔
431

432
        class Faux:
1✔
433
            _closed = False
1✔
434

435
            def __init__(self):
1✔
436
                self._written = []
1✔
437

438
            def write(self, data):
1✔
439
                self._written.append(data)
1✔
440

441
            def close(self):
1✔
UNCOV
442
                self._closed = True
×
443

444
        files = [self._makeFile(x, x.encode(), False) for x in 'ABC']
1✔
445
        ofp = Faux()
1✔
446
        bytes, sum = self._callFUT(files, ofp)
1✔
447
        self.assertEqual(ofp._written, [x.encode() for x in 'ABC'])
1✔
448
        self.assertFalse(ofp._closed)
1✔
449

450

451
_marker = object()
1✔
452

453

454
class Test_gen_filename(OptionsTestBase, unittest.TestCase):
1✔
455

456
    def _callFUT(self, options, ext=_marker):
1✔
457
        from ZODB.scripts.repozo import gen_filename
1✔
458
        if ext is _marker:
1✔
459
            return gen_filename(options)
1✔
460
        return gen_filename(options, ext)
1✔
461

462
    def test_explicit_ext(self):
1✔
463
        options = self._makeOptions(test_now=(2010, 5, 14, 12, 52, 31))
1✔
464
        fn = self._callFUT(options, '.txt')
1✔
465
        self.assertEqual(fn, '2010-05-14-12-52-31.txt')
1✔
466

467
    def test_full_no_gzip(self):
1✔
468
        options = self._makeOptions(test_now=(2010, 5, 14, 12, 52, 31),
1✔
469
                                    full=True,
470
                                    gzip=False,
471
                                    )
472
        fn = self._callFUT(options)
1✔
473
        self.assertEqual(fn, '2010-05-14-12-52-31.fs')
1✔
474

475
    def test_full_w_gzip(self):
1✔
476
        options = self._makeOptions(test_now=(2010, 5, 14, 12, 52, 31),
1✔
477
                                    full=True,
478
                                    gzip=True,
479
                                    )
480
        fn = self._callFUT(options)
1✔
481
        self.assertEqual(fn, '2010-05-14-12-52-31.fsz')
1✔
482

483
    def test_incr_no_gzip(self):
1✔
484
        options = self._makeOptions(test_now=(2010, 5, 14, 12, 52, 31),
1✔
485
                                    full=False,
486
                                    gzip=False,
487
                                    )
488
        fn = self._callFUT(options)
1✔
489
        self.assertEqual(fn, '2010-05-14-12-52-31.deltafs')
1✔
490

491
    def test_incr_w_gzip(self):
1✔
492
        options = self._makeOptions(test_now=(2010, 5, 14, 12, 52, 31),
1✔
493
                                    full=False,
494
                                    gzip=True,
495
                                    )
496
        fn = self._callFUT(options)
1✔
497
        self.assertEqual(fn, '2010-05-14-12-52-31.deltafsz')
1✔
498

499

500
class Test_find_files(OptionsTestBase, unittest.TestCase):
1✔
501

502
    def _callFUT(self, options):
1✔
503
        from ZODB.scripts.repozo import find_files
1✔
504
        return find_files(options)
1✔
505

506
    def _makeFile(self, hour, min, sec, ext):
1✔
507
        # call _makeOptions first!
508
        name = '2010-05-14-%02d-%02d-%02d%s' % (hour, min, sec, ext)
1✔
509
        fqn = os.path.join(self._repository_directory, name)
1✔
510
        _write_file(fqn, name.encode())
1✔
511
        return fqn
1✔
512

513
    def test_no_files(self):
1✔
514
        options = self._makeOptions(date='2010-05-14-13-30-57')
1✔
515
        found = self._callFUT(options)
1✔
516
        self.assertEqual(found, [])
1✔
517

518
    def test_explicit_date(self):
1✔
519
        options = self._makeOptions(date='2010-05-14-13-30-57')
1✔
520
        files = []
1✔
521
        for h, m, s, e in [(2, 13, 14, '.fs'),
1✔
522
                           (2, 13, 14, '.dat'),
523
                           (3, 14, 15, '.deltafs'),
524
                           (4, 14, 15, '.deltafs'),
525
                           (5, 14, 15, '.deltafs'),
526
                           (12, 13, 14, '.fs'),
527
                           (12, 13, 14, '.dat'),
528
                           (13, 14, 15, '.deltafs'),
529
                           (14, 15, 16, '.deltafs'),
530
                           ]:
531
            files.append(self._makeFile(h, m, s, e))
1✔
532
        found = self._callFUT(options)
1✔
533
        # Older files, .dat file not included
534
        self.assertEqual(found, [files[5], files[7]])
1✔
535

536
    def test_using_gen_filename(self):
1✔
537
        options = self._makeOptions(date=None,
1✔
538
                                    test_now=(2010, 5, 14, 13, 30, 57))
539
        files = []
1✔
540
        for h, m, s, e in [(2, 13, 14, '.fs'),
1✔
541
                           (2, 13, 14, '.dat'),
542
                           (3, 14, 15, '.deltafs'),
543
                           (4, 14, 15, '.deltafs'),
544
                           (5, 14, 15, '.deltafs'),
545
                           (12, 13, 14, '.fs'),
546
                           (12, 13, 14, '.dat'),
547
                           (13, 14, 15, '.deltafs'),
548
                           (14, 15, 16, '.deltafs'),
549
                           ]:
550
            files.append(self._makeFile(h, m, s, e))
1✔
551
        found = self._callFUT(options)
1✔
552
        # Older files, .dat file not included
553
        self.assertEqual(found, [files[5], files[7]])
1✔
554

555

556
class Test_scandat(OptionsTestBase, unittest.TestCase):
1✔
557

558
    def _callFUT(self, repofiles):
1✔
559
        from ZODB.scripts.repozo import scandat
1✔
560
        return scandat(repofiles)
1✔
561

562
    def test_no_dat_file(self):
1✔
563
        self._makeOptions()
1✔
564
        fsfile = os.path.join(self._repository_directory, 'foo.fs')
1✔
565
        fn, startpos, endpos, sum = self._callFUT([fsfile])
1✔
566
        self.assertEqual(fn, None)
1✔
567
        self.assertEqual(startpos, None)
1✔
568
        self.assertEqual(endpos, None)
1✔
569
        self.assertEqual(sum, None)
1✔
570

571
    def test_empty_dat_file(self):
1✔
572
        self._makeOptions()
1✔
573
        fsfile = os.path.join(self._repository_directory, 'foo.fs')
1✔
574
        datfile = os.path.join(self._repository_directory, 'foo.dat')
1✔
575
        _write_file(datfile, b'')
1✔
576
        fn, startpos, endpos, sum = self._callFUT([fsfile])
1✔
577
        self.assertEqual(fn, None)
1✔
578
        self.assertEqual(startpos, None)
1✔
579
        self.assertEqual(endpos, None)
1✔
580
        self.assertEqual(sum, None)
1✔
581

582
    def test_single_line(self):
1✔
583
        self._makeOptions()
1✔
584
        fsfile = os.path.join(self._repository_directory, 'foo.fs')
1✔
585
        datfile = os.path.join(self._repository_directory, 'foo.dat')
1✔
586
        _write_file(datfile, b'foo.fs 0 123 ABC\n')
1✔
587
        fn, startpos, endpos, sum = self._callFUT([fsfile])
1✔
588
        self.assertEqual(fn, 'foo.fs')
1✔
589
        self.assertEqual(startpos, 0)
1✔
590
        self.assertEqual(endpos, 123)
1✔
591
        self.assertEqual(sum, 'ABC')
1✔
592

593
    def test_multiple_lines(self):
1✔
594
        self._makeOptions()
1✔
595
        fsfile = os.path.join(self._repository_directory, 'foo.fs')
1✔
596
        datfile = os.path.join(self._repository_directory, 'foo.dat')
1✔
597
        _write_file(datfile, b'foo.fs 0 123 ABC\n'
1✔
598
                             b'bar.deltafs 123 456 DEF\n')
599
        fn, startpos, endpos, sum = self._callFUT([fsfile])
1✔
600
        self.assertEqual(fn, 'bar.deltafs')
1✔
601
        self.assertEqual(startpos, 123)
1✔
602
        self.assertEqual(endpos, 456)
1✔
603
        self.assertEqual(sum, 'DEF')
1✔
604

605

606
class Test_delete_old_backups(OptionsTestBase, unittest.TestCase):
1✔
607

608
    def _makeOptions(self, filenames=()):
1✔
609
        options = super()._makeOptions()
1✔
610
        for filename in filenames:
1✔
611
            fqn = os.path.join(options.repository, filename)
1✔
612
            _write_file(fqn, b'testing delete_old_backups')
1✔
613
        return options
1✔
614

615
    def _callFUT(self, options=None, filenames=()):
1✔
616
        from ZODB.scripts.repozo import delete_old_backups
1✔
617
        if options is None:
1!
618
            options = self._makeOptions(filenames)
1✔
619
        return delete_old_backups(options)
1✔
620

621
    def test_empty_dir_doesnt_raise(self):
1✔
622
        self._callFUT()
1✔
623
        self.assertEqual(len(os.listdir(self._repository_directory)), 0)
1✔
624

625
    def test_no_repozo_files_doesnt_raise(self):
1✔
626
        FILENAMES = ['bogus.txt', 'not_a_repozo_file']
1✔
627
        self._callFUT(filenames=FILENAMES)
1✔
628
        remaining = os.listdir(self._repository_directory)
1✔
629
        self.assertEqual(len(remaining), len(FILENAMES))
1✔
630
        for name in FILENAMES:
1✔
631
            fqn = os.path.join(self._repository_directory, name)
1✔
632
            self.assertTrue(os.path.isfile(fqn))
1✔
633

634
    def test_doesnt_remove_current_repozo_files(self):
1✔
635
        FILENAMES = ['2009-12-20-10-08-03.fs',
1✔
636
                     '2009-12-20-10-08-03.dat',
637
                     '2009-12-20-10-08-03.index',
638
                     ]
639
        self._callFUT(filenames=FILENAMES)
1✔
640
        remaining = os.listdir(self._repository_directory)
1✔
641
        self.assertEqual(len(remaining), len(FILENAMES))
1✔
642
        for name in FILENAMES:
1✔
643
            fqn = os.path.join(self._repository_directory, name)
1✔
644
            self.assertTrue(os.path.isfile(fqn))
1✔
645

646
    def test_removes_older_repozo_files(self):
1✔
647
        OLDER_FULL = ['2009-12-20-00-01-03.fs',
1✔
648
                      '2009-12-20-00-01-03.dat',
649
                      '2009-12-20-00-01-03.index',
650
                      ]
651
        DELTAS = ['2009-12-21-00-00-01.deltafs',
1✔
652
                  '2009-12-21-00-00-01.index',
653
                  '2009-12-22-00-00-01.deltafs',
654
                  '2009-12-22-00-00-01.index',
655
                  ]
656
        CURRENT_FULL = ['2009-12-23-00-00-01.fs',
1✔
657
                        '2009-12-23-00-00-01.dat',
658
                        '2009-12-23-00-00-01.index',
659
                        ]
660
        FILENAMES = OLDER_FULL + DELTAS + CURRENT_FULL
1✔
661
        self._callFUT(filenames=FILENAMES)
1✔
662
        remaining = os.listdir(self._repository_directory)
1✔
663
        self.assertEqual(len(remaining), len(CURRENT_FULL))
1✔
664
        for name in OLDER_FULL:
1✔
665
            fqn = os.path.join(self._repository_directory, name)
1✔
666
            self.assertFalse(os.path.isfile(fqn))
1✔
667
        for name in DELTAS:
1✔
668
            fqn = os.path.join(self._repository_directory, name)
1✔
669
            self.assertFalse(os.path.isfile(fqn))
1✔
670
        for name in CURRENT_FULL:
1✔
671
            fqn = os.path.join(self._repository_directory, name)
1✔
672
            self.assertTrue(os.path.isfile(fqn))
1✔
673

674
    def test_removes_older_repozo_files_zipped(self):
1✔
675
        OLDER_FULL = ['2009-12-20-00-01-03.fsz',
1✔
676
                      '2009-12-20-00-01-03.dat',
677
                      '2009-12-20-00-01-03.index',
678
                      ]
679
        DELTAS = ['2009-12-21-00-00-01.deltafsz',
1✔
680
                  '2009-12-21-00-00-01.index',
681
                  '2009-12-22-00-00-01.deltafsz',
682
                  '2009-12-22-00-00-01.index',
683
                  ]
684
        CURRENT_FULL = ['2009-12-23-00-00-01.fsz',
1✔
685
                        '2009-12-23-00-00-01.dat',
686
                        '2009-12-23-00-00-01.index',
687
                        ]
688
        FILENAMES = OLDER_FULL + DELTAS + CURRENT_FULL
1✔
689
        self._callFUT(filenames=FILENAMES)
1✔
690
        remaining = os.listdir(self._repository_directory)
1✔
691
        self.assertEqual(len(remaining), len(CURRENT_FULL))
1✔
692
        for name in OLDER_FULL:
1✔
693
            fqn = os.path.join(self._repository_directory, name)
1✔
694
            self.assertFalse(os.path.isfile(fqn))
1✔
695
        for name in DELTAS:
1✔
696
            fqn = os.path.join(self._repository_directory, name)
1✔
697
            self.assertFalse(os.path.isfile(fqn))
1✔
698
        for name in CURRENT_FULL:
1✔
699
            fqn = os.path.join(self._repository_directory, name)
1✔
700
            self.assertTrue(os.path.isfile(fqn))
1✔
701

702

703
class Test_do_full_backup(OptionsTestBase, unittest.TestCase):
1✔
704

705
    def _callFUT(self, options):
1✔
706
        from ZODB.scripts.repozo import do_full_backup
1✔
707
        return do_full_backup(options)
1✔
708

709
    def _makeDB(self):
1✔
710
        import tempfile
1✔
711
        self._data_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
712
        return OurDB(self._data_directory)
1✔
713

714
    def test_dont_overwrite_existing_file(self):
1✔
715
        from ZODB.scripts.repozo import WouldOverwriteFiles
1✔
716
        from ZODB.scripts.repozo import gen_filename
1✔
717
        db = self._makeDB()
1✔
718
        options = self._makeOptions(full=True,
1✔
719
                                    file=db._file_name,
720
                                    gzip=False,
721
                                    test_now=(2010, 5, 14, 10, 51, 22),
722
                                    )
723
        fqn = os.path.join(self._repository_directory, gen_filename(options))
1✔
724
        _write_file(fqn, b'TESTING')
1✔
725
        self.assertRaises(WouldOverwriteFiles, self._callFUT, options)
1✔
726

727
    def test_empty(self):
1✔
728
        import struct
1✔
729

730
        from ZODB.fsIndex import fsIndex
1✔
731
        from ZODB.scripts.repozo import gen_filename
1✔
732
        db = self._makeDB()
1✔
733
        options = self._makeOptions(file=db._file_name,
1✔
734
                                    gzip=False,
735
                                    killold=False,
736
                                    test_now=(2010, 5, 14, 10, 51, 22),
737
                                    )
738
        self._callFUT(options)
1✔
739
        target = os.path.join(self._repository_directory,
1✔
740
                              gen_filename(options))
741
        original = _read_file(db._file_name)
1✔
742
        self.assertEqual(_read_file(target), original)
1✔
743
        datfile = os.path.join(self._repository_directory,
1✔
744
                               gen_filename(options, '.dat'))
745
        self.assertEqual(_read_file(datfile, mode='r'),  # XXX 'rb'?
1✔
746
                         '%s 0 %d %s\n' %
747
                         (target, len(original), md5(original).hexdigest()))
748
        ndxfile = os.path.join(self._repository_directory,
1✔
749
                               gen_filename(options, '.index'))
750
        ndx_info = fsIndex.load(ndxfile)
1✔
751
        self.assertEqual(ndx_info['pos'], len(original))
1✔
752
        index = ndx_info['index']
1✔
753
        pZero = struct.pack(">Q", 0)
1✔
754
        pOne = struct.pack(">Q", 1)
1✔
755
        self.assertEqual(index.minKey(), pZero)
1✔
756
        self.assertEqual(index.maxKey(), pOne)
1✔
757

758

759
class Test_do_incremental_backup(OptionsTestBase, unittest.TestCase):
1✔
760

761
    def _callFUT(self, options, reposz, repofiles):
1✔
762
        from ZODB.scripts.repozo import do_incremental_backup
1✔
763
        return do_incremental_backup(options, reposz, repofiles)
1✔
764

765
    def _makeDB(self):
1✔
766
        import tempfile
1✔
767
        self._data_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
768
        return OurDB(self._data_directory)
1✔
769

770
    def test_dont_overwrite_existing_file(self):
1✔
771
        from ZODB.scripts.repozo import WouldOverwriteFiles
1✔
772
        from ZODB.scripts.repozo import find_files
1✔
773
        from ZODB.scripts.repozo import gen_filename
1✔
774
        db = self._makeDB()
1✔
775
        options = self._makeOptions(full=False,
1✔
776
                                    file=db._file_name,
777
                                    gzip=False,
778
                                    test_now=(2010, 5, 14, 10, 51, 22),
779
                                    date=None,
780
                                    )
781
        fqn = os.path.join(self._repository_directory, gen_filename(options))
1✔
782
        _write_file(fqn, b'TESTING')
1✔
783
        repofiles = find_files(options)
1✔
784
        self.assertRaises(WouldOverwriteFiles,
1✔
785
                          self._callFUT, options, 0, repofiles)
786

787
    def test_no_changes(self):
1✔
788
        import struct
1✔
789

790
        from ZODB.fsIndex import fsIndex
1✔
791
        from ZODB.scripts.repozo import gen_filename
1✔
792
        db = self._makeDB()
1✔
793
        oldpos = db.pos
1✔
794
        options = self._makeOptions(file=db._file_name,
1✔
795
                                    gzip=False,
796
                                    killold=False,
797
                                    test_now=(2010, 5, 14, 10, 51, 22),
798
                                    date=None,
799
                                    )
800
        fullfile = os.path.join(self._repository_directory,
1✔
801
                                '2010-05-14-00-00-00.fs')
802
        original = _read_file(db._file_name)
1✔
803
        _write_file(fullfile, original)
1✔
804
        datfile = os.path.join(self._repository_directory,
1✔
805
                               '2010-05-14-00-00-00.dat')
806
        repofiles = [fullfile, datfile]
1✔
807
        self._callFUT(options, oldpos, repofiles)
1✔
808
        target = os.path.join(self._repository_directory,
1✔
809
                              gen_filename(options))
810
        self.assertEqual(_read_file(target), b'')
1✔
811
        self.assertEqual(_read_file(datfile, mode='r'),  # XXX mode='rb'?
1✔
812
                         '%s %d %d %s\n' %
813
                         (target, oldpos, oldpos, md5(b'').hexdigest()))
814
        ndxfile = os.path.join(self._repository_directory,
1✔
815
                               gen_filename(options, '.index'))
816
        ndx_info = fsIndex.load(ndxfile)
1✔
817
        self.assertEqual(ndx_info['pos'], oldpos)
1✔
818
        index = ndx_info['index']
1✔
819
        pZero = struct.pack(">Q", 0)
1✔
820
        pOne = struct.pack(">Q", 1)
1✔
821
        self.assertEqual(index.minKey(), pZero)
1✔
822
        self.assertEqual(index.maxKey(), pOne)
1✔
823

824
    def test_w_changes(self):
1✔
825
        import struct
1✔
826

827
        from ZODB.fsIndex import fsIndex
1✔
828
        from ZODB.scripts.repozo import gen_filename
1✔
829
        db = self._makeDB()
1✔
830
        oldpos = db.pos
1✔
831
        options = self._makeOptions(file=db._file_name,
1✔
832
                                    gzip=False,
833
                                    killold=False,
834
                                    test_now=(2010, 5, 14, 10, 51, 22),
835
                                    date=None,
836
                                    )
837
        fullfile = os.path.join(self._repository_directory,
1✔
838
                                '2010-05-14-00-00-00.fs')
839
        original = _read_file(db._file_name)
1✔
840
        f = _write_file(fullfile, original)
1✔
841
        datfile = os.path.join(self._repository_directory,
1✔
842
                               '2010-05-14-00-00-00.dat')
843
        repofiles = [fullfile, datfile]
1✔
844
        db.mutate()
1✔
845
        newpos = db.pos
1✔
846
        self._callFUT(options, oldpos, repofiles)
1✔
847
        target = os.path.join(self._repository_directory,
1✔
848
                              gen_filename(options))
849
        with open(db._file_name, 'rb') as f:
1✔
850
            f.seek(oldpos)
1✔
851
            increment = f.read()
1✔
852
        self.assertEqual(_read_file(target), increment)
1✔
853
        self.assertEqual(_read_file(datfile, mode='r'),  # XXX mode='rb'?
1✔
854
                         '%s %d %d %s\n' %
855
                         (target, oldpos, newpos,
856
                             md5(increment).hexdigest()))
857
        ndxfile = os.path.join(self._repository_directory,
1✔
858
                               gen_filename(options, '.index'))
859
        ndx_info = fsIndex.load(ndxfile)
1✔
860
        self.assertEqual(ndx_info['pos'], newpos)
1✔
861
        index = ndx_info['index']
1✔
862
        pZero = struct.pack(">Q", 0)
1✔
863
        self.assertEqual(index.minKey(), pZero)
1✔
864
        self.assertEqual(index.maxKey(), db.maxkey)
1✔
865

866

867
class Mixin_do_recover:
1✔
868
    def _callFUT(self, options):
1✔
869
        from ZODB.scripts.repozo import do_recover
1✔
870
        return do_recover(options)
1✔
871

872
    def _makeFile(self, hour, min, sec, ext, text=None):
1✔
873
        # call _makeOptions first!
874
        name = '2010-05-14-%02d-%02d-%02d%s' % (hour, min, sec, ext)
1✔
875
        if text is None:
1✔
876
            text = name
1✔
877
        fqn = os.path.join(self._repository_directory, name)
1✔
878
        _write_file(fqn, text.encode())
1✔
879
        return fqn
1✔
880

881
    def test_no_files(self):
1✔
882
        from ZODB.scripts.repozo import NoFiles
1✔
883
        options = self._makeOptions(date=None,
1✔
884
                                    test_now=(2010, 5, 15, 13, 30, 57))
885
        self.assertRaises(NoFiles, self._callFUT, options)
1✔
886

887
    def test_no_files_before_explicit_date(self):
1✔
888
        from ZODB.scripts.repozo import NoFiles
1✔
889
        options = self._makeOptions(date='2010-05-13-13-30-57')
1✔
890
        files = []
1✔
891
        for h, m, s, e in [(2, 13, 14, '.fs'),
1✔
892
                           (2, 13, 14, '.dat'),
893
                           (3, 14, 15, '.deltafs'),
894
                           (4, 14, 15, '.deltafs'),
895
                           (5, 14, 15, '.deltafs'),
896
                           (12, 13, 14, '.fs'),
897
                           (12, 13, 14, '.dat'),
898
                           (13, 14, 15, '.deltafs'),
899
                           (14, 15, 16, '.deltafs'),
900
                           ]:
901
            files.append(self._makeFile(h, m, s, e))
1✔
902
        self.assertRaises(NoFiles, self._callFUT, options)
1✔
903

904

905
class Test_do_full_recover(
1✔
906
        Mixin_do_recover,
907
        OptionsTestBase,
908
        unittest.TestCase
909
):
910
    def _makeOptions(self, **kw):
1✔
911
        options = super()._makeOptions(**kw)
1✔
912
        options.full = True
1✔
913
        return options
1✔
914

915
    def test_w_full_backup_latest_no_index(self):
1✔
916
        import tempfile
1✔
917
        dd = self._data_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
918
        output = os.path.join(dd, 'Data.fs')
1✔
919
        options = self._makeOptions(date='2010-05-15-13-30-57',
1✔
920
                                    output=output,
921
                                    withverify=False)
922
        self._makeFile(2, 3, 4, '.fs', 'AAA')
1✔
923
        self._makeFile(4, 5, 6, '.fs', 'BBB')
1✔
924
        self._callFUT(options)
1✔
925
        self.assertEqual(_read_file(output), b'BBB')
1✔
926

927
    def test_w_full_backup_latest_index(self):
1✔
928
        import tempfile
1✔
929
        dd = self._data_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
930
        output = os.path.join(dd, 'Data.fs')
1✔
931
        index = os.path.join(dd, 'Data.fs.index')
1✔
932
        options = self._makeOptions(date='2010-05-15-13-30-57',
1✔
933
                                    output=output,
934
                                    withverify=False)
935
        self._makeFile(2, 3, 4, '.fs', 'AAA')
1✔
936
        self._makeFile(4, 5, 6, '.fs', 'BBB')
1✔
937
        self._makeFile(4, 5, 6, '.index', 'CCC')
1✔
938
        self._callFUT(options)
1✔
939
        self.assertEqual(_read_file(output), b'BBB')
1✔
940
        self.assertEqual(_read_file(index), b'CCC')
1✔
941

942
    def test_w_incr_backup_latest_no_index(self):
1✔
943
        import tempfile
1✔
944
        dd = self._data_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
945
        output = os.path.join(dd, 'Data.fs')
1✔
946
        options = self._makeOptions(date='2010-05-15-13-30-57',
1✔
947
                                    output=output,
948
                                    withverify=False)
949
        self._makeFile(2, 3, 4, '.fs', 'AAA')
1✔
950
        self._makeFile(4, 5, 6, '.deltafs', 'BBB')
1✔
951
        self._callFUT(options)
1✔
952
        self.assertEqual(_read_file(output), b'AAABBB')
1✔
953

954
    def test_w_incr_backup_latest_index(self):
1✔
955
        import tempfile
1✔
956
        dd = self._data_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
957
        output = os.path.join(dd, 'Data.fs')
1✔
958
        index = os.path.join(dd, 'Data.fs.index')
1✔
959
        options = self._makeOptions(date='2010-05-15-13-30-57',
1✔
960
                                    output=output,
961
                                    withverify=False)
962
        self._makeFile(2, 3, 4, '.fs', 'AAA')
1✔
963
        self._makeFile(4, 5, 6, '.deltafs', 'BBB')
1✔
964
        self._makeFile(4, 5, 6, '.index', 'CCC')
1✔
965
        self._callFUT(options)
1✔
966
        self.assertEqual(_read_file(output), b'AAABBB')
1✔
967
        self.assertEqual(_read_file(index), b'CCC')
1✔
968

969
    def test_w_incr_backup_with_verify_all_is_fine(self):
1✔
970
        import tempfile
1✔
971
        dd = self._data_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
972
        output = os.path.join(dd, 'Data.fs')
1✔
973
        options = self._makeOptions(date='2010-05-15-13-30-57',
1✔
974
                                    output=output,
975
                                    withverify=True)
976
        self._makeFile(2, 3, 4, '.fs', 'AAA')
1✔
977
        self._makeFile(4, 5, 6, '.deltafs', 'BBBB')
1✔
978
        self._makeFile(
1✔
979
            2, 3, 4, '.dat',
980
            '/backup/2010-05-14-02-03-04.fs 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
981
            '/backup/2010-05-14-04-05-06.deltafs 3 7 f50881ced34c7d9e6bce100bf33dec60\n')  # noqa: E501 line too long
982
        self._callFUT(options)
1✔
983
        self.assertFalse(os.path.exists(output + '.part'))
1✔
984
        self.assertEqual(_read_file(output), b'AAABBBB')
1✔
985

986
    def test_w_incr_backup_with_verify_sum_inconsistent(self):
1✔
987
        import tempfile
1✔
988

989
        from ZODB.scripts.repozo import VerificationFail
1✔
990
        dd = self._data_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
991
        output = os.path.join(dd, 'Data.fs')
1✔
992
        options = self._makeOptions(date='2010-05-15-13-30-57',
1✔
993
                                    output=output,
994
                                    withverify=True)
995
        self._makeFile(2, 3, 4, '.fs', 'AAA')
1✔
996
        self._makeFile(4, 5, 6, '.deltafs', 'BBBB')
1✔
997
        self._makeFile(
1✔
998
            2, 3, 4, '.dat',
999
            '/backup/2010-05-14-02-03-04.fs 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
1000
            '/backup/2010-05-14-04-05-06.deltafs 3 7 f50881ced34c7d9e6bce100bf33dec61\n')  # noqa: E501 line too long
1001
        self.assertRaises(VerificationFail, self._callFUT, options)
1✔
1002
        self.assertTrue(os.path.exists(output + '.part'))
1✔
1003

1004
    def test_w_incr_backup_with_verify_size_inconsistent(self):
1✔
1005
        import tempfile
1✔
1006

1007
        from ZODB.scripts.repozo import VerificationFail
1✔
1008
        dd = self._data_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
1009
        output = os.path.join(dd, 'Data.fs')
1✔
1010
        options = self._makeOptions(date='2010-05-15-13-30-57',
1✔
1011
                                    output=output,
1012
                                    withverify=True)
1013
        self._makeFile(2, 3, 4, '.fs', 'AAA')
1✔
1014
        self._makeFile(4, 5, 6, '.deltafs', 'BBBB')
1✔
1015
        self._makeFile(
1✔
1016
            2, 3, 4, '.dat',
1017
            '/backup/2010-05-14-02-03-04.fs 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
1018
            '/backup/2010-05-14-04-05-06.deltafs 3 8 f50881ced34c7d9e6bce100bf33dec60\n')  # noqa: E501 line too long
1019
        self.assertRaises(VerificationFail, self._callFUT, options)
1✔
1020
        self.assertTrue(os.path.exists(output + '.part'))
1✔
1021

1022

1023
class Test_do_incremental_recover(
1✔
1024
        Mixin_do_recover,
1025
        OptionsTestBase,
1026
        unittest.TestCase
1027
):
1028
    def setUp(self):
1✔
1029
        from ZODB.scripts import repozo
1✔
1030
        self._old_verbosity = repozo.VERBOSE
1✔
1031
        self._old_stderr = sys.stderr
1✔
1032
        repozo.VERBOSE = True
1✔
1033
        sys.stderr = StringIO()
1✔
1034

1035
    def tearDown(self):
1✔
1036
        from ZODB.scripts import repozo
1✔
1037
        sys.stderr = self._old_stderr
1✔
1038
        repozo.VERBOSE = self._old_verbosity
1✔
1039

1040
    def _makeOptions(self, **kw):
1✔
1041
        options = super()._makeOptions(**kw)
1✔
1042
        options.full = False
1✔
1043
        return options
1✔
1044

1045
    def _createRecoveredDataFS(self, output, options):
1✔
1046
        self._makeFile(2, 3, 4, '.fs', 'AAA')
1✔
1047
        self._makeFile(4, 5, 6, '.deltafs', 'BBB')
1✔
1048
        self._makeFile(
1✔
1049
            2, 3, 4, '.dat',
1050
            '/backup/2010-05-14-02-03-04.fs 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
1051
            '/backup/2010-05-14-04-05-06.deltafs 3 6 2bb225f0ba9a58930757a868ed57d9a3\n')  # noqa: E501 line too long
1052
        self._callFUT(options)
1✔
1053
        self.assertEqual(_read_file(output), b'AAABBB')
1✔
1054
        self.assertFalse(os.path.exists(output + '.part'))
1✔
1055
        return output
1✔
1056

1057
    def test_do_nothing(self):
1✔
1058
        import tempfile
1✔
1059
        dd = self._data_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
1060
        output = os.path.join(dd, 'Data.fs')
1✔
1061
        options = self._makeOptions(date='2010-05-15-13-30-57',
1✔
1062
                                    output=output,
1063
                                    withverify=False)
1064
        self._createRecoveredDataFS(output, options)
1✔
1065
        self._callFUT(options)
1✔
1066
        self.assertIn(
1✔
1067
            "doing nothing", sys.stderr.getvalue())
1068

1069
    def test_w_incr_recover_from_incr_backup(self):
1✔
1070
        import tempfile
1✔
1071
        dd = self._data_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
1072
        output = os.path.join(dd, 'Data.fs')
1✔
1073
        options = self._makeOptions(date='2010-05-15-13-30-57',
1✔
1074
                                    output=output,
1075
                                    withverify=False)
1076
        self._createRecoveredDataFS(output, options)
1✔
1077
        # Create 2 more .deltafs, to prove the code knows where to pick up
1078
        self._makeFile(6, 7, 8, '.deltafs', 'CCC')
1✔
1079
        self._makeFile(8, 9, 10, '.deltafs', 'DDD')
1✔
1080
        self._makeFile(
1✔
1081
            2, 3, 4, '.dat',
1082
            '/backup/2010-05-14-02-03-04.fs 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
1083
            '/backup/2010-05-14-04-05-06.deltafs 3 6 2bb225f0ba9a58930757a868ed57d9a3\n'  # noqa: E501 line too long
1084
            '/backup/2010-05-14-06-07-08.deltafs 6 9 defb99e69a9f1f6e06f15006b1f166ae\n'  # noqa: E501 line too long
1085
            '/backup/2010-05-14-08-09-10.deltafs 9 12 45054f47ac3305a2a33e9bcceadff712\n')  # noqa: E501 line too long
1086
        os.unlink(
1✔
1087
            os.path.join(self._repository_directory,
1088
                         '2010-05-14-04-05-06.deltafs'))
1089
        self._callFUT(options)
1✔
1090
        self.assertNotIn('falling back to a full recover.',
1✔
1091
                         sys.stderr.getvalue())
1092
        self.assertEqual(_read_file(output), b'AAABBBCCCDDD')
1✔
1093
        self.assertFalse(os.path.exists(output + '.part'))
1✔
1094

1095
    def test_w_incr_backup_with_verify_sum_inconsistent(self):
1✔
1096
        import tempfile
1✔
1097
        dd = self._data_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
1098
        output = os.path.join(dd, 'Data.fs')
1✔
1099
        options = self._makeOptions(date='2010-05-15-13-30-57',
1✔
1100
                                    output=output,
1101
                                    withverify=True)
1102
        self._createRecoveredDataFS(output, options)
1✔
1103
        self._makeFile(6, 7, 8, '.deltafs', 'CCC')
1✔
1104
        self._makeFile(8, 9, 10, '.deltafs', 'DDD')
1✔
1105
        self._makeFile(
1✔
1106
            2, 3, 4, '.dat',
1107
            '/backup/2010-05-14-02-03-04.fs 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
1108
            '/backup/2010-05-14-04-05-06.deltafs 3 6 2bb225f0ba9a58930757a868ed57d9a3\n'  # noqa: E501 line too long
1109
            '/backup/2010-05-14-06-07-08.deltafs 6 9 defb99e69a9f1f6e06f15006b1f166af\n'  # noqa: E501 line too long
1110
            '/backup/2010-05-14-08-09-10.deltafs 9 12 45054f47ac3305a2a33e9bcceadff712\n')  # noqa: E501 line too long
1111
        from ZODB.scripts.repozo import VerificationFail
1✔
1112
        self.assertRaises(VerificationFail, self._callFUT, options)
1✔
1113
        self.assertNotIn('falling back to a full recover.',
1✔
1114
                         sys.stderr.getvalue())
1115
        self.assertTrue(os.path.exists(output + '.part'))
1✔
1116

1117
    def test_w_incr_backup_with_verify_size_inconsistent_too_small(self):
1✔
1118
        import tempfile
1✔
1119
        dd = self._data_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
1120
        output = os.path.join(dd, 'Data.fs')
1✔
1121
        options = self._makeOptions(date='2010-05-15-13-30-57',
1✔
1122
                                    output=output,
1123
                                    withverify=True)
1124
        self._createRecoveredDataFS(output, options)
1✔
1125
        self._makeFile(6, 7, 8, '.deltafs', 'CCC')
1✔
1126
        self._makeFile(8, 9, 10, '.deltafs', 'DDD')
1✔
1127
        self._makeFile(
1✔
1128
            2, 3, 4, '.dat',
1129
            '/backup/2010-05-14-02-03-04.fs 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
1130
            '/backup/2010-05-14-04-05-06.deltafs 3 6 2bb225f0ba9a58930757a868ed57d9a3\n'  # noqa: E501 line too long
1131
            '/backup/2010-05-14-06-07-08.deltafs 6 8 defb99e69a9f1f6e06f15006b1f166ae\n'  # noqa: E501 line too long
1132
            '/backup/2010-05-14-08-09-10.deltafs 9 12 45054f47ac3305a2a33e9bcceadff712\n')  # noqa: E501 line too long
1133
        from ZODB.scripts.repozo import VerificationFail
1✔
1134
        self.assertRaises(VerificationFail, self._callFUT, options)
1✔
1135
        self.assertNotIn('falling back to a full recover.',
1✔
1136
                         sys.stderr.getvalue())
1137
        self.assertTrue(os.path.exists(output + '.part'))
1✔
1138

1139
    def test_w_incr_backup_with_verify_size_inconsistent_too_big(self):
1✔
1140
        import tempfile
1✔
1141
        dd = self._data_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
1142
        output = os.path.join(dd, 'Data.fs')
1✔
1143
        options = self._makeOptions(date='2010-05-15-13-30-57',
1✔
1144
                                    output=output,
1145
                                    withverify=True)
1146
        self._createRecoveredDataFS(output, options)
1✔
1147
        self._makeFile(6, 7, 8, '.deltafs', 'CCC')
1✔
1148
        self._makeFile(8, 9, 10, '.deltafs', 'DDD')
1✔
1149
        self._makeFile(
1✔
1150
            2, 3, 4, '.dat',
1151
            '/backup/2010-05-14-02-03-04.fs 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
1152
            '/backup/2010-05-14-04-05-06.deltafs 3 6 2bb225f0ba9a58930757a868ed57d9a3\n'  # noqa: E501 line too long
1153
            '/backup/2010-05-14-06-07-08.deltafs 6 10 defb99e69a9f1f6e06f15006b1f166ae\n'  # noqa: E501 line too long
1154
            '/backup/2010-05-14-08-09-10.deltafs 9 12 45054f47ac3305a2a33e9bcceadff712\n')  # noqa: E501 line too long
1155
        from ZODB.scripts.repozo import VerificationFail
1✔
1156
        self.assertRaises(VerificationFail, self._callFUT, options)
1✔
1157
        self.assertNotIn('falling back to a full recover.',
1✔
1158
                         sys.stderr.getvalue())
1159
        self.assertTrue(os.path.exists(output + '.part'))
1✔
1160

1161
    def test_w_inc_backup_switch_auto_to_full_recover_if_output_larger_than_dat(self):  # noqa: E501 line too long
1✔
1162
        import tempfile
1✔
1163
        dd = self._data_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
1164
        output = os.path.join(dd, 'Data.fs')
1✔
1165
        options = self._makeOptions(date='2010-05-15-13-30-57',
1✔
1166
                                    output=output,
1167
                                    withverify=False)
1168
        self._createRecoveredDataFS(output, options)
1✔
1169
        self._makeFile(6, 7, 8, '.deltafs', 'CCC')
1✔
1170
        self._makeFile(
1✔
1171
            2, 3, 4, '.dat',
1172
            '/backup/2010-05-14-02-03-04.fs 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
1173
            '/backup/2010-05-14-04-05-06.deltafs 3 6 2bb225f0ba9a58930757a868ed57d9a3\n'  # noqa: E501 line too long
1174
            '/backup/2010-05-14-06-07-08.deltafs 6 9 defb99e69a9f1f6e06f15006b1f166ae\n')  # noqa: E501 line too long
1175
        # The ZODB is longer than announced in the .dat file
1176
        with open(output, 'r+b') as f:
1✔
1177
            f.write(b'AAABBBCCCDDD')
1✔
1178
        self._callFUT(options)
1✔
1179
        self.assertEqual(_read_file(output), b'AAABBBCCC')
1✔
1180
        self.assertFalse(os.path.exists(output + '.part'))
1✔
1181
        self.assertIn(
1✔
1182
            "falling back to a full recover", sys.stderr.getvalue())
1183

1184
    def test_w_inc_backup_switch_auto_to_full_recover_if_last_chunk_is_wrong(self):  # noqa: E501 line too long
1✔
1185
        import tempfile
1✔
1186
        dd = self._data_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
1187
        output = os.path.join(dd, 'Data.fs')
1✔
1188
        options = self._makeOptions(date='2010-05-15-13-30-57',
1✔
1189
                                    output=output,
1190
                                    withverify=False)
1191
        self._createRecoveredDataFS(output, options)
1✔
1192
        self._makeFile(6, 7, 8, '.deltafs', 'CCC')
1✔
1193
        self._makeFile(
1✔
1194
            2, 3, 4, '.dat',
1195
            '/backup/2010-05-14-02-03-04.fs 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
1196
            '/backup/2010-05-14-04-05-06.deltafs 3 6 2bb225f0ba9a58930757a868ed57d9a4\n'  # noqa: E501 line too long
1197
            '/backup/2010-05-14-06-07-08.deltafs 6 9 defb99e69a9f1f6e06f15006b1f166ae\n')  # noqa: E501 line too long
1198
        self._callFUT(options)
1✔
1199
        self.assertEqual(_read_file(output), b'AAABBBCCC')
1✔
1200
        self.assertFalse(os.path.exists(output + '.part'))
1✔
1201
        self.assertIn(
1✔
1202
            "Last whole common chunk checksum did not match with backup, falling back to a full recover.",  # noqa: E501 line too long
1203
            sys.stderr.getvalue())
1204

1205
    def test_w_inc_backup_switch_auto_to_full_recover_after_pack(self):
1✔
1206
        import tempfile
1✔
1207
        dd = self._data_directory = tempfile.mkdtemp(prefix='zodb-test-')
1✔
1208
        output = os.path.join(dd, 'Data.fs')
1✔
1209
        options = self._makeOptions(date='2010-05-15-13-30-57',
1✔
1210
                                    output=output,
1211
                                    withverify=False)
1212
        self._makeFile(2, 3, 4, '.fs', 'AAA')
1✔
1213
        self._makeFile(4, 5, 6, '.deltafs', 'BBB')
1✔
1214
        self._makeFile(
1✔
1215
            2, 3, 4, '.dat',
1216
            '/backup/2010-05-14-02-03-04.fs 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
1217
            '/backup/2010-05-14-04-05-06.deltafs 3 6 2bb225f0ba9a58930757a868ed57d9a3\n')  # noqa: E501 line too long
1218
        self._callFUT(options)
1✔
1219
        self.assertEqual(_read_file(output), b'AAABBB')
1✔
1220

1221
        self._makeFile(6, 7, 8, '.fs', 'CCDD')
1✔
1222
        self._makeFile(
1✔
1223
            6, 7, 8, '.dat',
1224
            '/backup/2010-05-14-06-07-08.fs 0 4 dc0ee37408176d839c13f291a4d588de\n')  # noqa: E501 line too long
1225
        self._callFUT(options)
1✔
1226
        self.assertEqual(_read_file(output), b'CCDD')
1✔
1227
        self.assertFalse(os.path.exists(output + '.part'))
1✔
1228
        self.assertIn(
1✔
1229
            'Target file is larger than latest backup, falling back to a full recover.',  # noqa: E501 line too long
1230
            sys.stderr.getvalue())
1231

1232

1233
class Test_do_verify(OptionsTestBase, unittest.TestCase):
1✔
1234

1235
    def _callFUT(self, options):
1✔
1236
        from ZODB.scripts.repozo import do_verify
1✔
1237
        do_verify(options)
1✔
1238

1239
    def _makeFile(self, hour, min, sec, ext, text=None):
1✔
1240
        from ZODB.scripts.repozo import _GzipCloser
1✔
1241
        assert self._repository_directory, 'call _makeOptions first!'
1✔
1242
        name = '2010-05-14-%02d-%02d-%02d%s' % (hour, min, sec, ext)
1✔
1243
        if text is None:
1!
1244
            text = name
×
1245
        fqn = os.path.join(self._repository_directory, name)
1✔
1246
        if ext.endswith('fsz'):
1✔
1247
            _opener = _GzipCloser
1✔
1248
        else:
1249
            _opener = open
1✔
1250
        with _opener(fqn, 'wb') as f:
1✔
1251
            f.write(text.encode())
1✔
1252
            f.flush()
1✔
1253
        return fqn
1✔
1254

1255
    def test_no_files(self):
1✔
1256
        from ZODB.scripts.repozo import NoFiles
1✔
1257
        options = self._makeOptions()
1✔
1258
        self.assertRaises(NoFiles, self._callFUT, options)
1✔
1259

1260
    def test_all_is_fine(self):
1✔
1261
        options = self._makeOptions(quick=False)
1✔
1262
        self._makeFile(2, 3, 4, '.fs', 'AAA')
1✔
1263
        self._makeFile(4, 5, 6, '.deltafs', 'BBBB')
1✔
1264
        self._makeFile(
1✔
1265
            2, 3, 4, '.dat',
1266
            '/backup/2010-05-14-02-03-04.fs 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
1267
            '/backup/2010-05-14-04-05-06.deltafs 3 7 f50881ced34c7d9e6bce100bf33dec60\n')  # noqa: E501 line too long
1268
        self._callFUT(options)
1✔
1269

1270
    def test_all_is_fine_gzip(self):
1✔
1271
        options = self._makeOptions(quick=False)
1✔
1272
        self._makeFile(2, 3, 4, '.fsz', 'AAA')
1✔
1273
        self._makeFile(4, 5, 6, '.deltafsz', 'BBBB')
1✔
1274
        self._makeFile(
1✔
1275
            2, 3, 4, '.dat',
1276
            '/backup/2010-05-14-02-03-04.fsz 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
1277
            '/backup/2010-05-14-04-05-06.deltafsz 3 7 f50881ced34c7d9e6bce100bf33dec60\n')  # noqa: E501 line too long
1278
        self._callFUT(options)
1✔
1279

1280
    def test_missing_file(self):
1✔
1281
        from ZODB.scripts.repozo import VerificationFail
1✔
1282
        options = self._makeOptions(quick=True)
1✔
1283
        self._makeFile(2, 3, 4, '.fs', 'AAA')
1✔
1284
        self._makeFile(
1✔
1285
            2, 3, 4, '.dat',
1286
            '/backup/2010-05-14-02-03-04.fs 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
1287
            '/backup/2010-05-14-04-05-06.deltafs 3 7 f50881ced34c7d9e6bce100bf33dec60\n')  # noqa: E501 line too long
1288
        self.assertRaisesRegex(
1✔
1289
            VerificationFail,
1290
            '2010-05-14-04-05-06.deltafs is missing',
1291
            self._callFUT,
1292
            options,
1293
        )
1294

1295
    def test_missing_file_gzip(self):
1✔
1296
        from ZODB.scripts.repozo import VerificationFail
1✔
1297
        options = self._makeOptions(quick=True)
1✔
1298
        self._makeFile(2, 3, 4, '.fsz', 'AAA')
1✔
1299
        self._makeFile(
1✔
1300
            2, 3, 4, '.dat',
1301
            '/backup/2010-05-14-02-03-04.fsz 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
1302
            '/backup/2010-05-14-04-05-06.deltafsz 3 7 f50881ced34c7d9e6bce100bf33dec60\n')  # noqa: E501 line too long
1303
        self.assertRaisesRegex(
1✔
1304
            VerificationFail,
1305
            '2010-05-14-04-05-06.deltafsz is missing',
1306
            self._callFUT,
1307
            options,
1308
        )
1309

1310
    def test_bad_size(self):
1✔
1311
        from ZODB.scripts.repozo import VerificationFail
1✔
1312
        options = self._makeOptions(quick=False)
1✔
1313
        self._makeFile(2, 3, 4, '.fs', 'AAA')
1✔
1314
        self._makeFile(4, 5, 6, '.deltafs', 'BBB')
1✔
1315
        self._makeFile(
1✔
1316
            2, 3, 4, '.dat',
1317
            '/backup/2010-05-14-02-03-04.fs 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
1318
            '/backup/2010-05-14-04-05-06.deltafs 3 7 f50881ced34c7d9e6bce100bf33dec60\n')  # noqa: E501 line too long
1319
        self.assertRaisesRegex(
1✔
1320
            VerificationFail,
1321
            '2010-05-14-04-05-06.deltafs is 3 bytes, should be 4 bytes',
1322
            self._callFUT,
1323
            options,
1324
        )
1325

1326
    def test_bad_size_gzip(self):
1✔
1327
        from ZODB.scripts.repozo import VerificationFail
1✔
1328
        options = self._makeOptions(quick=False)
1✔
1329
        self._makeFile(2, 3, 4, '.fsz', 'AAA')
1✔
1330
        self._makeFile(4, 5, 6, '.deltafsz', 'BBB')
1✔
1331
        self._makeFile(
1✔
1332
            2, 3, 4, '.dat',
1333
            '/backup/2010-05-14-02-03-04.fsz 0 3 e1faffb3e614e6c2fba74296962386b7\n'   # noqa: E501 line too long
1334
            '/backup/2010-05-14-04-05-06.deltafsz 3 7 f50881ced34c7d9e6bce100bf33dec60\n')  # noqa: E501 line too long
1335
        self.assertRaisesRegex(
1✔
1336
            VerificationFail,
1337
            (
1338
                '2010-05-14-04-05-06.deltafsz is 3 bytes'
1339
                r' \(when uncompressed\), should be 4 bytes'
1340
            ),
1341
            self._callFUT,
1342
            options,
1343
        )
1344

1345
    def test_bad_checksum(self):
1✔
1346
        from ZODB.scripts.repozo import VerificationFail
1✔
1347
        options = self._makeOptions(quick=False)
1✔
1348
        self._makeFile(2, 3, 4, '.fs', 'AAA')
1✔
1349
        self._makeFile(4, 5, 6, '.deltafs', 'BbBB')
1✔
1350
        self._makeFile(
1✔
1351
            2, 3, 4, '.dat',
1352
            '/backup/2010-05-14-02-03-04.fs 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
1353
            '/backup/2010-05-14-04-05-06.deltafs 3 7 f50881ced34c7d9e6bce100bf33dec60\n')  # noqa: E501 line too long
1354
        self.assertRaisesRegex(
1✔
1355
            VerificationFail,
1356
            (
1357
                '2010-05-14-04-05-06.deltafs has checksum'
1358
                + ' 36486440db255f0ee6ab109d5d231406 instead of'
1359
                + ' f50881ced34c7d9e6bce100bf33dec60'
1360
            ),
1361
            self._callFUT,
1362
            options,
1363
        )
1364

1365
    def test_bad_checksum_gzip(self):
1✔
1366
        from ZODB.scripts.repozo import VerificationFail
1✔
1367
        options = self._makeOptions(quick=False)
1✔
1368
        self._makeFile(2, 3, 4, '.fsz', 'AAA')
1✔
1369
        self._makeFile(4, 5, 6, '.deltafsz', 'BbBB')
1✔
1370
        self._makeFile(
1✔
1371
            2, 3, 4, '.dat',
1372
            '/backup/2010-05-14-02-03-04.fsz 0 3 e1faffb3e614e6c2fba74296962386b7\n'  # noqa: E501 line too long
1373
            '/backup/2010-05-14-04-05-06.deltafsz 3 7 f50881ced34c7d9e6bce100bf33dec60\n')  # noqa: E501 line too long
1374
        self.assertRaisesRegex(
1✔
1375
            VerificationFail,
1376
            (
1377
                '2010-05-14-04-05-06.deltafsz has checksum'
1378
                + r' 36486440db255f0ee6ab109d5d231406 \(when uncompressed\)'
1379
                + ' instead of f50881ced34c7d9e6bce100bf33dec60'),
1380
            self._callFUT,
1381
            options,
1382
        )
1383

1384
    def test_quick_ignores_checksums(self):
1✔
1385
        options = self._makeOptions(quick=True)
1✔
1386
        self._makeFile(2, 3, 4, '.fs', 'AAA')
1✔
1387
        self._makeFile(4, 5, 6, '.deltafs', 'BBBB')
1✔
1388
        self._makeFile(
1✔
1389
                2, 3, 4, '.dat',
1390
                '/backup/2010-05-14-02-03-04.fs 0 3 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\n'  # noqa: E501 line too long
1391
                '/backup/2010-05-14-04-05-06.deltafs 3 7 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb\n')  # noqa: E501 line too long
1392
        self._callFUT(options)
1✔
1393

1394
    def test_quick_ignores_checksums_gzip(self):
1✔
1395
        options = self._makeOptions(quick=True)
1✔
1396
        self._makeFile(2, 3, 4, '.fsz', 'AAA')
1✔
1397
        self._makeFile(4, 5, 6, '.deltafsz', 'BBBB')
1✔
1398
        self._makeFile(
1✔
1399
            2, 3, 4, '.dat',
1400
            '/backup/2010-05-14-02-03-04.fsz 0 3 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\n'  # noqa: E501 line too long
1401
            '/backup/2010-05-14-04-05-06.deltafsz 3 7 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb\n')  # noqa: E501 line too long
1402
        self._callFUT(options)
1✔
1403

1404

1405
class MonteCarloTests(unittest.TestCase):
1✔
1406

1407
    layer = ZODB.tests.util.MininalTestLayer('repozo')
1✔
1408

1409
    def setUp(self):
1✔
1410
        # compute directory names
1411
        import tempfile
1✔
1412
        self.basedir = tempfile.mkdtemp(prefix='zodb-test-')
1✔
1413
        self.backupdir = os.path.join(self.basedir, 'backup')
1✔
1414
        self.datadir = os.path.join(self.basedir, 'data')
1✔
1415
        self.restoredir = os.path.join(self.basedir, 'restore')
1✔
1416
        self.copydir = os.path.join(self.basedir, 'copy')
1✔
1417
        self.currdir = os.getcwd()
1✔
1418
        # create empty directories
1419
        os.mkdir(self.backupdir)
1✔
1420
        os.mkdir(self.datadir)
1✔
1421
        os.mkdir(self.restoredir)
1✔
1422
        os.mkdir(self.copydir)
1✔
1423
        os.chdir(self.datadir)
1✔
1424
        self.db = OurDB(self.datadir)
1✔
1425

1426
    def tearDown(self):
1✔
1427
        os.chdir(self.currdir)
1✔
1428
        import shutil
1✔
1429
        shutil.rmtree(self.basedir)
1✔
1430

1431
    def _callRepozoMain(self, argv):
1✔
1432
        from ZODB.scripts.repozo import main
1✔
1433
        main(argv)
1✔
1434

1435
    @ZODB.tests.util.time_monotonically_increases
1✔
1436
    def test_via_monte_carlo(self):
1✔
1437
        self.saved_snapshots = []  # list of (name, time) pairs for copies.
1✔
1438

1439
        for i in range(100):
1✔
1440
            self.mutate_pack_backup(i)
1✔
1441

1442
        # Verify snapshots can be reproduced exactly.
1443
        for copyname, copytime in self.saved_snapshots:
1✔
1444
            if _NOISY:
1!
UNCOV
1445
                print("Checking that", copyname, end=' ')
×
UNCOV
1446
                print("at", copytime, "is reproducible.")
×
1447
            self.assertRestored(copyname, copytime)
1✔
1448

1449
    def mutate_pack_backup(self, i):
1✔
1450
        import random
1✔
1451
        from shutil import copyfile
1✔
1452
        from time import gmtime
1✔
1453
        self.db.mutate()
1✔
1454

1455
        # Pack about each tenth time.
1456
        if random.random() < 0.1:
1✔
1457
            if _NOISY:
1!
UNCOV
1458
                print("packing")
×
1459
            self.db.pack()
1✔
1460
            self.db.close()
1✔
1461

1462
        # Make an incremental backup, half the time with gzip (-z).
1463
        argv = ['-BQr', self.backupdir, '-f', 'Data.fs']
1✔
1464
        if _NOISY:
1!
UNCOV
1465
            argv.insert(0, '-v')
×
1466
        if random.random() < 0.5:
1✔
1467
            argv.insert(0, '-z')
1✔
1468
        self._callRepozoMain(argv)
1✔
1469

1470
        # Save snapshots to assert that dated restores are possible
1471
        if i % 9 == 0:
1✔
1472
            srcname = os.path.join(self.datadir, 'Data.fs')
1✔
1473
            copytime = '%04d-%02d-%02d-%02d-%02d-%02d' % (gmtime()[:6])
1✔
1474
            copyname = os.path.join(self.copydir, "Data%d.fs" % i)
1✔
1475
            copyfile(srcname, copyname)
1✔
1476
            self.saved_snapshots.append((copyname, copytime))
1✔
1477

1478
        # The clock moves forward automatically on calls to time.time()
1479

1480
        # Verify current Data.fs can be reproduced exactly.
1481
        self.assertRestored()
1✔
1482

1483
    def assertRestored(self, correctpath='Data.fs', when=None):
1✔
1484
        # Do recovery to time 'when', and check that it's identical to
1485
        # correctpath.
1486
        # restore to Restored.fs
1487
        restoredfile = os.path.join(self.restoredir, 'Restored.fs')
1✔
1488
        argv = ['-Rr', self.backupdir, '-o', restoredfile]
1✔
1489
        if _NOISY:
1!
UNCOV
1490
            argv.insert(0, '-v')
×
1491
        if when is not None:
1✔
1492
            argv.append('-D')
1✔
1493
            argv.append(when)
1✔
1494
        self._callRepozoMain(argv)
1✔
1495

1496
        # check restored file content is equal to file that was backed up
1497
        fguts = _read_file(correctpath)
1✔
1498
        gguts = _read_file(restoredfile)
1✔
1499
        msg = ("guts don't match\ncorrectpath=%r when=%r\n cmd=%r" %
1✔
1500
               (correctpath, when, ' '.join(argv)))
1501
        self.assertEqual(fguts, gguts, msg)
1✔
1502

1503

1504
def test_suite():
1✔
1505
    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
1✔
1506
    return unittest.TestSuite([
1✔
1507
        loadTestsFromTestCase(Test_parseargs),
1508
        loadTestsFromTestCase(Test_dofile),
1509
        loadTestsFromTestCase(Test_checksum),
1510
        loadTestsFromTestCase(Test_copyfile),
1511
        loadTestsFromTestCase(Test_concat),
1512
        loadTestsFromTestCase(Test_gen_filename),
1513
        loadTestsFromTestCase(Test_find_files),
1514
        loadTestsFromTestCase(Test_scandat),
1515
        loadTestsFromTestCase(Test_delete_old_backups),
1516
        loadTestsFromTestCase(Test_do_full_backup),
1517
        loadTestsFromTestCase(Test_do_incremental_backup),
1518
        # unittest.makeSuite(Test_do_backup),  #TODO
1519
        loadTestsFromTestCase(Test_do_full_recover),
1520
        loadTestsFromTestCase(Test_do_incremental_recover),
1521
        loadTestsFromTestCase(Test_do_verify),
1522
        # N.B.:  this test take forever to run (~40sec on a fast laptop),
1523
        # *and* it is non-deterministic.
1524
        loadTestsFromTestCase(MonteCarloTests),
1525
    ])
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc