• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / zdaemon / 8926552871

02 May 2024 03:35PM UTC coverage: 80.031% (-0.3%) from 80.372%
8926552871

Pull #35

github

tseaver
Add Python 3.12 support; drop Python 3.7 support
Pull Request #35: Add Python 3.12 support; drop Python 3.7 support

403 of 565 branches covered (71.33%)

Branch coverage included in aggregate %.

1677 of 2034 relevant lines covered (82.45%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.4
/src/zdaemon/tests/testzdrun.py
1
"""Test suite for zdrun.py."""
2

3
import os
1✔
4
import shutil
1✔
5
import signal
1✔
6
import socket
1✔
7
import sys
1✔
8
import tempfile
1✔
9
import time
1✔
10
import unittest
1✔
11
from io import StringIO
1✔
12

13
import ZConfig
1✔
14

15
from zdaemon import zdctl
1✔
16
from zdaemon import zdrun
1✔
17

18

19
class ConfiguredOptions:
    """Mixin for options classes whose configuration comes from a string.

    The configuration text handed to ``set_configuration`` is what
    ``load_configfile`` parses; ``load_configfile`` never opens a file,
    so a -C option given on the command line does not change what is
    loaded.
    """

    def set_configuration(self, configuration):
        """Remember *configuration* (a string) as the text to parse."""
        # A placeholder name, since there is no real file behind it.
        self.configfile = "<preloaded string>"
        self.__configuration = configuration

    def load_configfile(self):
        """Parse the stored text and set ``configroot``/``confighandlers``."""
        stream = StringIO(self.__configuration)
        self.configroot, self.confighandlers = ZConfig.loadConfigFile(
            self.schema, stream, self.zconfig_options)
34

35

36
class ConfiguredZDRunOptions(ConfiguredOptions, zdrun.ZDRunOptions):
    """``zdrun.ZDRunOptions`` that reads its configuration from a string."""

    def __init__(self, configuration):
        """Run the ZDRunOptions initializer, then store *configuration*."""
        # ConfiguredOptions defines no __init__, so the cooperative call
        # resolves to the initializer reached through zdrun.ZDRunOptions.
        super().__init__()
        self.set_configuration(configuration)
41

42

43
class ZDaemonTests(unittest.TestCase):
    """Tests for zdrun/zdctl: option parsing, subprocesses and daemons.

    Every test runs with ``sys.stdout`` replaced by a ``StringIO``;
    ``tearDown`` compares what was printed against ``self.expect``, so a
    test declares its expected output by assigning that attribute.
    """

    # Absolute locations of the interpreter and of the scripts the tests
    # launch.  The asserts run at class-definition time and fail fast
    # when the expected files are missing.
    python = os.path.abspath(sys.executable)
    assert os.path.exists(python)
    here = os.path.abspath(os.path.dirname(__file__))
    assert os.path.isdir(here)
    nokill = os.path.join(here, "nokill.py")
    assert os.path.exists(nokill)
    parent = os.path.dirname(here)
    zdrun = os.path.join(parent, "zdrun.py")
    assert os.path.exists(zdrun)

    # PYTHONPATH value handed to every child interpreter.
    ppath = os.pathsep.join(sys.path)

    def setUp(self):
        """Choose a control-socket path and start capturing stdout."""
        # Only a path *name* is needed here; tearDown unlinks it if it
        # exists.
        # NOTE(review): tempfile.mktemp() is deprecated and racy.  It is
        # kept because a daemon started with -d may still be using this
        # path after the test returns -- confirm before moving the
        # socket into a directory that tearDown would delete.
        self.zdsock = tempfile.mktemp()
        self.new_stdout = StringIO()
        self.save_stdout = sys.stdout
        sys.stdout = self.new_stdout
        self.expect = ""

    def tearDown(self):
        """Restore stdout and signal handlers, then check the output."""
        sys.stdout = self.save_stdout
        for sig in (signal.SIGTERM,
                    signal.SIGHUP,
                    signal.SIGINT,
                    signal.SIGCHLD):
            signal.signal(sig, signal.SIG_DFL)
        try:
            os.unlink(self.zdsock)
        except OSError:
            # The socket file may never have been created, or may
            # already be gone.
            pass
        output = self.new_stdout.getvalue()
        self.assertEqual(self.expect, output)

    def quoteargs(self, args):
        """Return *args* joined by spaces, double-quoting any with a space.

        The caller's list is left untouched.
        """
        return " ".join(
            '"%s"' % arg if " " in arg else arg for arg in args)

    def rundaemon(self, args):
        """Run the zdrun script as a daemon (-d) with *args* via the shell."""
        # Add quotes, in case some pathname contains spaces (e.g. Mac OS X)
        args = self.quoteargs(args)
        cmd = ('PYTHONPATH="%s" "%s" "%s" -d -s "%s" %s' %
               (self.ppath, self.python, self.zdrun, self.zdsock, args))
        os.system(cmd)
        # When the daemon crashes, the following may help debug it:
        #   os.system("PYTHONPATH=%s %s %s -s %s %s &" %
        #       (self.ppath, self.python, self.zdrun, self.zdsock, args))

    def _run(self, args, cmdclass=None, module=zdctl):
        """Call ``module.main`` in-process with ``-s <socket>`` plus *args*.

        *args* may be a list or a whitespace-separated string.  A
        ``SystemExit`` raised by ``main`` is swallowed.
        """
        if isinstance(args, str):
            args = args.split()
        kw = {}
        if cmdclass:
            kw['cmdclass'] = cmdclass
        try:
            module.main(["-s", self.zdsock] + args, **kw)
        except SystemExit:
            pass

    def testCmdclassOverride(self):
        class MyCmd(zdctl.ZDCmd):
            def do_sproing(self, rest):
                print(rest)
        self._run("-p echo sproing expected", cmdclass=MyCmd)
        self.expect = "expected\n"

    def testSystem(self):
        self.rundaemon(["echo", "-n"])
        self.expect = ""

    def test_help_zdrun(self):
        self._run("-h", module=zdrun)
        self.expect = zdrun.__doc__

    def test_help_zdctl(self):
        self._run("-h")
        self.expect = zdctl.__doc__

    def testOptionsSysArgv(self):
        # Check that options are parsed from sys.argv by default
        options = zdrun.ZDRunOptions()
        save_sys_argv = sys.argv
        try:
            sys.argv = ["A", "B", "C"]
            options.realize()
        finally:
            sys.argv = save_sys_argv
        self.assertEqual(options.options, [])
        self.assertEqual(options.args, ["B", "C"])

    def testOptionsBasic(self):
        # Check basic option parsing
        options = zdrun.ZDRunOptions()
        options.realize(["B", "C"], "foo")
        self.assertEqual(options.options, [])
        self.assertEqual(options.args, ["B", "C"])
        self.assertEqual(options.progname, "foo")

    def testOptionsHelp(self):
        # Check that -h behaves properly
        options = zdrun.ZDRunOptions()
        try:
            options.realize(["-h"], doc=zdrun.__doc__)
        except SystemExit as err:
            self.assertEqual(err.code, 0)
        else:
            self.fail("SystemExit expected")
        self.expect = zdrun.__doc__

    def testSubprocessBasic(self):
        # Check basic subprocess management: spawn, kill, wait
        options = zdrun.ZDRunOptions()
        options.realize(["sleep", "100"])
        proc = zdrun.Subprocess(options)
        self.assertEqual(proc.pid, 0)
        pid = proc.spawn()
        self.assertEqual(proc.pid, pid)
        msg = proc.kill(signal.SIGTERM)
        self.assertEqual(msg, None)
        wpid, wsts = os.waitpid(pid, 0)
        self.assertEqual(wpid, pid)
        self.assertEqual(os.WIFSIGNALED(wsts), 1)
        self.assertEqual(os.WTERMSIG(wsts), signal.SIGTERM)
        proc.setstatus(wsts)
        self.assertEqual(proc.pid, 0)

    def testEventlogOverride(self):
        # Make sure runner.eventlog is used if it exists
        options = ConfiguredZDRunOptions("""\
            <runner>
              program /bin/true
              <eventlog>
                level 42
              </eventlog>
            </runner>

            <eventlog>
              level 35
            </eventlog>
            """)
        options.realize(["/bin/true"])
        self.assertEqual(options.config_logger.level, 42)

    def testEventlogWithoutOverride(self):
        # Make sure eventlog is used if runner.eventlog doesn't exist
        options = ConfiguredZDRunOptions("""\
            <runner>
              program /bin/true
            </runner>

            <eventlog>
              level 35
            </eventlog>
            """)
        options.realize(["/bin/true"])
        self.assertEqual(options.config_logger.level, 35)

    def testRunIgnoresParentSignals(self):
        # Spawn a process which will in turn spawn a zdrun process.
        # We make sure that the zdrun process is still running even if
        # its parent process receives an interrupt signal (it should
        # not be passed to zdrun).
        tmp = tempfile.mkdtemp()
        zdrun_socket = os.path.join(tmp, 'testsock')
        zdctlpid = None
        try:
            zdctlpid = os.spawnvpe(
                os.P_NOWAIT,
                sys.executable,
                [sys.executable, os.path.join(self.here, 'parent.py'), tmp],
                # Same os.pathsep-joined path that rundaemon() passes.
                dict(os.environ, PYTHONPATH=self.ppath),
            )
            # Wait for it to start, but no longer than a minute.
            deadline = time.time() + 60
            is_started = False
            while time.time() < deadline:
                response = send_action('status\n', zdrun_socket)
                if response is None:
                    time.sleep(0.05)
                else:
                    is_started = True
                    break
            self.assertTrue(is_started,
                            "spawned process failed to start in a minute")
            # Kill it, and wait a little to ensure it's dead.
            os.kill(zdctlpid, signal.SIGINT)
            time.sleep(0.25)
            # Make sure the child is still responsive.
            response = send_action('status\n', zdrun_socket,
                                   raise_on_error=True)
            self.assertTrue(b'\n' in response,
                            'no newline in response: ' + repr(response))
            # Kill the process.
            send_action('stop\n', zdrun_socket)
        finally:
            # Reap the spawned process if it has already exited, so it
            # does not linger as a zombie.  WNOHANG keeps this from
            # blocking when it is still running.
            if zdctlpid is not None:
                try:
                    os.waitpid(zdctlpid, os.WNOHANG)
                except OSError:
                    pass
            # Remove the tmp directory.
            # Caution:  this is delicate.  The code here used to do
            # shutil.rmtree(tmp), but that suffers a sometimes-fatal
            # race with zdrun.py.  The 'testsock' socket is created
            # by zdrun in the tmp directory, and zdrun tries to
            # unlink it.  If shutil.rmtree sees 'testsock' too, it
            # will also try to unlink it, but zdrun may complete
            # unlinking it before shutil gets to it (there's more
            # than one process here).  So, in effect, we code a
            # 1-level rmtree inline here, suppressing errors.
            for fname in os.listdir(tmp):
                try:
                    os.unlink(os.path.join(tmp, fname))
                except OSError:
                    pass
            os.rmdir(tmp)

    def testUmask(self):
        # people have a strange tendency to run the tests as root
        if os.getuid() == 0:
            self.fail("""
I am root!
Do not run the tests as root.
Testing proper umask handling cannot be done as root.
Furthermore, it is not a good idea and strongly discouraged to run zope, the
build system (configure, make) or the tests as root.
In general do not run anything as root unless absolutely necessary.
""")

        # Create the probe file inside a private directory instead of
        # using a guessable tempfile.mktemp() name in the shared tmp dir.
        tmpdir = tempfile.mkdtemp()
        path = os.path.join(tmpdir, "umask-probe")
        # With umask 666, we should create a file that we aren't able
        # to write.  If access says no, assume that umask works.
        try:
            touch_cmd = "/bin/touch"
            if not os.path.exists(touch_cmd):
                touch_cmd = "/usr/bin/touch"  # Mac OS X
            self.rundaemon(["-m", "666", touch_cmd, path])
            # Poll for up to half a second for the daemon to run touch.
            for _ in range(5):
                if os.path.exists(path):
                    break
                time.sleep(0.1)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(not os.access(path, os.W_OK))
        finally:
            shutil.rmtree(tmpdir, ignore_errors=True)
287

288

289
class TestRunnerDirectory(unittest.TestCase):
    """Tests for how zdctl creates run and socket directories.

    Each test works inside a fresh temporary directory (``self.root``)
    with ``sys.stdout`` and ``sys.stderr`` replaced by ``StringIO``
    objects.  ``tearDown`` compares captured stdout with ``self.expect``
    and forwards any captured stderr text to the real stderr.
    """

    def setUp(self):
        """Create the scratch directory and capture both output streams."""
        super().setUp()
        self.root = tempfile.mkdtemp()
        self.save_stdout = sys.stdout
        # Save the real stderr; saving sys.stdout here would make
        # tearDown rebind sys.stderr to the stdout stream.
        self.save_stderr = sys.stderr
        sys.stdout = StringIO()
        sys.stderr = StringIO()
        self.expect = ''
        self.cmd = "/bin/true"
        if not os.path.exists(self.cmd):
            self.cmd = "/usr/bin/true"  # Mac OS X

    def tearDown(self):
        """Restore the streams, remove the scratch dir, check stdout."""
        got = sys.stdout.getvalue()
        err = sys.stderr.getvalue()
        # Put the real streams back before anything that can raise, so a
        # cleanup failure cannot leave them replaced for later tests.
        sys.stdout = self.save_stdout
        sys.stderr = self.save_stderr
        shutil.rmtree(self.root)
        if err:
            print(err, end='', file=sys.stderr)
        self.assertEqual(self.expect, got)
        super().tearDown()

    def run_ctl(self, opts):
        """Realize zdctl options *opts* plus ``fg`` and run that command.

        Sets ``self.expect`` to ``self.cmd`` followed by a newline before
        the command is executed.
        """
        options = zdctl.ZDCtlOptions()
        options.realize(opts + ['fg'])
        self.expect = self.cmd + '\n'
        proc = zdctl.ZDCmd(options)
        proc.onecmd(" ".join(options.args))

    def testCtlRunDirectoryCreation(self):
        path = os.path.join(self.root, 'rundir')
        self.run_ctl(['-z', path, '-p', self.cmd])
        self.assertTrue(os.path.exists(path))

    def testCtlRunDirectoryCreationFromConfigFile(self):
        path = os.path.join(self.root, 'rundir')
        options = ['directory ' + path,
                   'program ' + self.cmd]
        config = self.writeConfig(
            '<runner>\n%s\n</runner>' % '\n'.join(options))
        self.run_ctl(['-C', config])
        self.assertTrue(os.path.exists(path))

    def testCtlRunDirectoryCreationOnlyOne(self):
        path = os.path.join(self.root, 'rundir', 'not-created')
        self.assertRaises(SystemExit,
                          self.run_ctl, ['-z', path, '-p', self.cmd])
        self.assertFalse(os.path.exists(path))
        got = sys.stderr.getvalue().strip()
        # Swap in a fresh capture so tearDown does not echo this
        # expected error text.
        sys.stderr = StringIO()
        self.assertTrue(got.startswith('Error: invalid value for -z'))

    def testCtlSocketDirectoryCreation(self):
        path = os.path.join(self.root, 'rundir', 'sock')
        self.run_ctl(['-s', path, '-p', self.cmd])
        self.assertTrue(os.path.exists(os.path.dirname(path)))

    def testCtlSocketDirectoryCreationRelativePath(self):
        path = os.path.join('rundir', 'sock')
        self.run_ctl(['-s', path, '-p', self.cmd])
        self.assertTrue(
            os.path.exists(os.path.dirname(os.path.join(os.getcwd(), path))))

    def testCtlSocketDirectoryCreationOnlyOne(self):
        path = os.path.join(self.root, 'rundir', 'not-created', 'sock')
        self.assertRaises(SystemExit,
                          self.run_ctl, ['-s', path, '-p', self.cmd])
        self.assertFalse(os.path.exists(path))
        got = sys.stderr.getvalue().strip()
        # Swap in a fresh capture so tearDown does not echo this
        # expected error text.
        sys.stderr = StringIO()
        self.assertTrue(got.startswith('Error: invalid value for -s'))

    def testCtlSocketDirectoryCreationFromConfigFile(self):
        path = os.path.join(self.root, 'rundir')
        options = ['socket-name %s/sock' % path,
                   'program ' + self.cmd]
        config = self.writeConfig(
            '<runner>\n%s\n</runner>' % '\n'.join(options))
        self.run_ctl(['-C', config])
        self.assertTrue(os.path.exists(path))

    def testCtlSocketDirectoryCreationFromConfigFileRelativePath(self):
        path = 'rel-rundir'
        options = ['socket-name %s/sock' % path,
                   'program ' + self.cmd]
        config = self.writeConfig(
            '<runner>\n%s\n</runner>' % '\n'.join(options))
        self.run_ctl(['-C', config])
        self.assertTrue(os.path.exists(os.path.join(os.getcwd(), path)))

    def writeConfig(self, config):
        """Write *config* to ``<root>/config`` and return that path."""
        config_file = os.path.join(self.root, 'config')
        with open(config_file, 'w') as f:
            f.write(config)
        return config_file

    def testDirectoryChown(self):
        path = os.path.join(self.root, 'foodir')
        options = zdctl.ZDCtlOptions()
        options.realize(['-p', self.cmd, 'status'])
        cmd = zdctl.ZDCmd(options)
        options.uid = 27
        options.gid = 28
        # Patch chown and geteuid, because we're not root
        chown = os.chown
        geteuid = os.geteuid
        calls = []

        def my_chown(*args):
            calls.append(('chown',) + args)

        def my_geteuid():
            return 0

        try:
            os.chown = my_chown
            os.geteuid = my_geteuid
            cmd.create_directory(path)
        finally:
            os.chown = chown
            os.geteuid = geteuid
        self.assertEqual([('chown', path, 27, 28)], calls)
414

415

416
def send_action(action, sockname, raise_on_error=False):
    """Send an action to the zdrun server and return the response.

    *action* is a string; it is encoded and sent followed by a newline
    over a UNIX stream socket connected to *sockname*.  The write side
    is then shut down and the reply is read until the peer closes.

    Return the reply as bytes, or None if the server is not up or any
    other OSError happened.  With *raise_on_error* true the OSError is
    re-raised instead.  An 'AF_UNIX path too long' error is always
    re-raised.
    """
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    # The context manager closes the socket exactly once on every path.
    with sock:
        try:
            sock.connect(sockname)
            # sendall() keeps writing until the whole payload is out;
            # send() may stop after a partial write.
            sock.sendall(action.encode() + b"\n")
            sock.shutdown(socket.SHUT_WR)  # We're not writing any more
            chunks = []
            while True:
                data = sock.recv(1000)
                if not data:
                    break
                chunks.append(data)
            return b"".join(chunks)
        except OSError as msg:
            if str(msg) == 'AF_UNIX path too long':
                # MacOS has apparent small limits on the length of a UNIX
                # domain socket filename, we want to make MacOS users aware
                # of the actual problem
                raise
            if raise_on_error:
                raise
            return None
445

446

447
def test_suite():
    """Return the suite for this module; it is empty unless on POSIX."""
    suite = unittest.TestSuite()
    if os.name != "posix":
        return suite
    load_case = unittest.defaultTestLoader.loadTestsFromTestCase
    for case in (ZDaemonTests, TestRunnerDirectory):
        suite.addTest(load_case(case))
    return suite
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc