• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / zdaemon / 12825577578

17 Jan 2025 08:41AM UTC coverage: 79.914% (-0.08%) from 79.992%
12825577578

Pull #37

github

web-flow
Trigger CI
Pull Request #37: Update Python version support.

361 of 512 branches covered (70.51%)

Branch coverage included in aggregate %.

5 of 15 new or added lines in 5 files covered. (33.33%)

5 existing lines in 4 files now uncovered.

1676 of 2037 relevant lines covered (82.28%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.58
/src/zdaemon/tests/testzdrun.py
1
"""Test suite for zdrun.py."""
2

3
import os
1✔
4
import shutil
1✔
5
import signal
1✔
6
import socket
1✔
7
import sys
1✔
8
import tempfile
1✔
9
import time
1✔
10
import unittest
1✔
11
from io import StringIO
1✔
12

13
import ZConfig
1✔
14

15
from zdaemon import zdctl
1✔
16
from zdaemon import zdrun
1✔
17

18

19
class ConfiguredOptions:
1✔
20
    """Options class that loads configuration from a specified string.
21

22
    This always loads from the string, regardless of any -C option
23
    that may be given.
24
    """
25

26
    def set_configuration(self, configuration):
1✔
27
        self.__configuration = configuration
1✔
28
        self.configfile = "<preloaded string>"
1✔
29

30
    def load_configfile(self):
1✔
31
        sio = StringIO(self.__configuration)
1✔
32
        cfg = ZConfig.loadConfigFile(self.schema, sio, self.zconfig_options)
1✔
33
        self.configroot, self.confighandlers = cfg
1✔
34

35

36
class ConfiguredZDRunOptions(ConfiguredOptions, zdrun.ZDRunOptions):
1✔
37

38
    def __init__(self, configuration):
1✔
39
        zdrun.ZDRunOptions.__init__(self)
1✔
40
        self.set_configuration(configuration)
1✔
41

42

43
class ZDaemonTests(unittest.TestCase):
1✔
44

45
    python = os.path.abspath(sys.executable)
1✔
46
    assert os.path.exists(python)
1✔
47
    here = os.path.abspath(os.path.dirname(__file__))
1✔
48
    assert os.path.isdir(here)
1✔
49
    nokill = os.path.join(here, "nokill.py")
1✔
50
    assert os.path.exists(nokill)
1✔
51
    parent = os.path.dirname(here)
1✔
52
    zdrun = os.path.join(parent, "zdrun.py")
1✔
53
    assert os.path.exists(zdrun)
1✔
54

55
    ppath = os.pathsep.join(sys.path)
1✔
56

57
    def setUp(self):
1✔
58
        self.zdsock = tempfile.mktemp()
1✔
59
        self.new_stdout = StringIO()
1✔
60
        self.save_stdout = sys.stdout
1✔
61
        sys.stdout = self.new_stdout
1✔
62
        self.expect = ""
1✔
63

64
    def tearDown(self):
1✔
65
        sys.stdout = self.save_stdout
1✔
66
        for sig in (signal.SIGTERM,
1✔
67
                    signal.SIGHUP,
68
                    signal.SIGINT,
69
                    signal.SIGCHLD):
70
            signal.signal(sig, signal.SIG_DFL)
1✔
71
        try:
1✔
72
            os.unlink(self.zdsock)
1✔
73
        except OSError:
1✔
74
            pass
1✔
75
        output = self.new_stdout.getvalue()
1✔
76
        self.assertEqual(self.expect, output)
1✔
77

78
    def quoteargs(self, args):
1✔
79
        for i in range(len(args)):
1✔
80
            if " " in args[i]:
1!
81
                args[i] = '"%s"' % args[i]
×
82
        return " ".join(args)
1✔
83

84
    def rundaemon(self, args):
1✔
85
        # Add quotes, in case some pathname contains spaces (e.g. Mac OS X)
86
        args = self.quoteargs(args)
1✔
87
        cmd = ('PYTHONPATH="%s" "%s" "%s" -d -s "%s" %s' %
1✔
88
               (self.ppath, self.python, self.zdrun, self.zdsock, args))
89
        os.system(cmd)
1✔
90
        # When the daemon crashes, the following may help debug it:
91
        #   os.system("PYTHONPATH=%s %s %s -s %s %s &" %
92
        #       (self.ppath, self.python, self.zdrun, self.zdsock, args))
93

94
    def _run(self, args, cmdclass=None, module=zdctl):
1✔
95
        if isinstance(args, str):
1!
96
            args = args.split()
1✔
97
        kw = {}
1✔
98
        if cmdclass:
1✔
99
            kw['cmdclass'] = cmdclass
1✔
100
        try:
1✔
101
            module.main(["-s", self.zdsock] + args, **kw)
1✔
102
        except SystemExit:
1✔
103
            pass
1✔
104

105
    def testCmdclassOverride(self):
1✔
106
        class MyCmd(zdctl.ZDCmd):
1✔
107
            def do_sproing(self, rest):
1✔
108
                print(rest)
1✔
109
        self._run("-p echo sproing expected", cmdclass=MyCmd)
1✔
110
        self.expect = "expected\n"
1✔
111

112
    def testSystem(self):
1✔
113
        self.rundaemon(["echo", "-n"])
1✔
114
        self.expect = ""
1✔
115

116
    def test_help_zdrun(self):
1✔
117
        self._run("-h", module=zdrun)
1✔
118
        self.expect = zdrun.__doc__
1✔
119

120
    def test_help_zdctl(self):
1✔
121
        self._run("-h")
1✔
122
        self.expect = zdctl.__doc__
1✔
123

124
    def testOptionsSysArgv(self):
1✔
125
        # Check that options are parsed from sys.argv by default
126
        options = zdrun.ZDRunOptions()
1✔
127
        save_sys_argv = sys.argv
1✔
128
        try:
1✔
129
            sys.argv = ["A", "B", "C"]
1✔
130
            options.realize()
1✔
131
        finally:
132
            sys.argv = save_sys_argv
1✔
133
        self.assertEqual(options.options, [])
1✔
134
        self.assertEqual(options.args, ["B", "C"])
1✔
135

136
    def testOptionsBasic(self):
1✔
137
        # Check basic option parsing
138
        options = zdrun.ZDRunOptions()
1✔
139
        options.realize(["B", "C"], "foo")
1✔
140
        self.assertEqual(options.options, [])
1✔
141
        self.assertEqual(options.args, ["B", "C"])
1✔
142
        self.assertEqual(options.progname, "foo")
1✔
143

144
    def testOptionsHelp(self):
1✔
145
        # Check that -h behaves properly
146
        options = zdrun.ZDRunOptions()
1✔
147
        try:
1✔
148
            options.realize(["-h"], doc=zdrun.__doc__)
1✔
149
        except SystemExit as err:
1✔
150
            self.assertEqual(err.code, 0)
1✔
151
        else:
152
            self.fail("SystemExit expected")
153
        self.expect = zdrun.__doc__
1✔
154

155
    def testSubprocessBasic(self):
1✔
156
        # Check basic subprocess management: spawn, kill, wait
157
        options = zdrun.ZDRunOptions()
1✔
158
        options.realize(["sleep", "100"])
1✔
159
        proc = zdrun.Subprocess(options)
1✔
160
        self.assertEqual(proc.pid, 0)
1✔
161
        pid = proc.spawn()
1✔
162
        self.assertEqual(proc.pid, pid)
1✔
163
        msg = proc.kill(signal.SIGTERM)
1✔
164
        self.assertEqual(msg, None)
1✔
165
        wpid, wsts = os.waitpid(pid, 0)
1✔
166
        self.assertEqual(wpid, pid)
1✔
167
        self.assertEqual(os.WIFSIGNALED(wsts), 1)
1✔
168
        self.assertEqual(os.WTERMSIG(wsts), signal.SIGTERM)
1✔
169
        proc.setstatus(wsts)
1✔
170
        self.assertEqual(proc.pid, 0)
1✔
171

172
    def testEventlogOverride(self):
1✔
173
        # Make sure runner.eventlog is used if it exists
174
        options = ConfiguredZDRunOptions("""\
1✔
175
            <runner>
176
              program /bin/true
177
              <eventlog>
178
                level 42
179
              </eventlog>
180
            </runner>
181

182
            <eventlog>
183
              level 35
184
            </eventlog>
185
            """)
186
        options.realize(["/bin/true"])
1✔
187
        self.assertEqual(options.config_logger.level, 42)
1✔
188

189
    def testEventlogWithoutOverride(self):
1✔
190
        # Make sure eventlog is used if runner.eventlog doesn't exist
191
        options = ConfiguredZDRunOptions("""\
1✔
192
            <runner>
193
              program /bin/true
194
            </runner>
195

196
            <eventlog>
197
              level 35
198
            </eventlog>
199
            """)
200
        options.realize(["/bin/true"])
1✔
201
        self.assertEqual(options.config_logger.level, 35)
1✔
202

203
    def testRunIgnoresParentSignals(self):
1✔
204
        # Spawn a process which will in turn spawn a zdrun process.
205
        # We make sure that the zdrun process is still running even if
206
        # its parent process receives an interrupt signal (it should
207
        # not be passed to zdrun).
208
        tmp = tempfile.mkdtemp()
1✔
209
        zdrun_socket = os.path.join(tmp, 'testsock')
1✔
210
        try:
1✔
211
            zdctlpid = os.spawnvpe(
1✔
212
                os.P_NOWAIT,
213
                sys.executable,
214
                [sys.executable, os.path.join(self.here, 'parent.py'), tmp],
215
                dict(os.environ,
216
                     PYTHONPATH=":".join(sys.path),
217
                     )
218
            )
219
            # Wait for it to start, but no longer than a minute.
220
            deadline = time.time() + 60
1✔
221
            is_started = False
1✔
222
            while time.time() < deadline:
1!
223
                response = send_action('status\n', zdrun_socket)
1✔
224
                if response is None:
1✔
225
                    time.sleep(0.05)
1✔
226
                else:
227
                    is_started = True
1✔
228
                    break
1✔
229
            self.assertTrue(is_started,
1✔
230
                            "spawned process failed to start in a minute")
231
            # Kill it, and wait a little to ensure it's dead.
232
            os.kill(zdctlpid, signal.SIGINT)
1✔
233
            time.sleep(0.25)
1✔
234
            # Make sure the child is still responsive.
235
            response = send_action('status\n', zdrun_socket,
1✔
236
                                   raise_on_error=True)
237
            self.assertIn(
1✔
238
                b'\n',
239
                response,
240
                'no newline in response: ' + repr(response)
241
            )
242
            # Kill the process.
243
            send_action('stop\n', zdrun_socket)
1✔
244
        finally:
245
            # Remove the tmp directory.
246
            # Caution:  this is delicate.  The code here used to do
247
            # shutil.rmtree(tmp), but that suffers a sometimes-fatal
248
            # race with zdrun.py.  The 'testsock' socket is created
249
            # by zdrun in the tmp directory, and zdrun tries to
250
            # unlink it.  If shutil.rmtree sees 'testsock' too, it
251
            # will also try to unlink it, but zdrun may complete
252
            # unlinking it before shutil gets to it (there's more
253
            # than one process here).  So, in effect, we code a
254
            # 1-level rmtree inline here, suppressing errors.
255
            for fname in os.listdir(tmp):
1✔
256
                try:
1✔
257
                    os.unlink(os.path.join(tmp, fname))
1✔
NEW
258
                except OSError:
×
259
                    pass
×
260
            os.rmdir(tmp)
1✔
261

262
    def testUmask(self):
1✔
263
        # people have a strange tendency to run the tests as root
264
        if os.getuid() == 0:
1✔
265
            self.fail("""
266
I am root!
267
Do not run the tests as root.
268
Testing proper umask handling cannot be done as root.
269
Furthermore, it is not a good idea and strongly discouraged to run zope, the
270
build system (configure, make) or the tests as root.
271
In general do not run anything as root unless absolutely necessary.
272
""")
273

274
        path = tempfile.mktemp()
1✔
275
        # With umask 666, we should create a file that we aren't able
276
        # to write.  If access says no, assume that umask works.
277
        try:
1✔
278
            touch_cmd = "/bin/touch"
1✔
279
            if not os.path.exists(touch_cmd):
1!
280
                touch_cmd = "/usr/bin/touch"  # Mac OS X
×
281
            self.rundaemon(["-m", "666", touch_cmd, path])
1✔
282
            for i in range(5):
1✔
283
                if not os.path.exists(path):
1✔
284
                    time.sleep(0.1)
1✔
285
            self.assertTrue(os.path.exists(path))
1✔
286
            self.assertTrue(not os.access(path, os.W_OK))
1✔
287
        finally:
288
            if os.path.exists(path):
1!
289
                os.remove(path)
1✔
290

291

292
class TestRunnerDirectory(unittest.TestCase):
1✔
293

294
    def setUp(self):
1✔
295
        super().setUp()
1✔
296
        self.root = tempfile.mkdtemp()
1✔
297
        self.save_stdout = sys.stdout
1✔
298
        self.save_stderr = sys.stdout
1✔
299
        sys.stdout = StringIO()
1✔
300
        sys.stderr = StringIO()
1✔
301
        self.expect = ''
1✔
302
        self.cmd = "/bin/true"
1✔
303
        if not os.path.exists(self.cmd):
1!
304
            self.cmd = "/usr/bin/true"  # Mac OS X
×
305

306
    def tearDown(self):
1✔
307
        shutil.rmtree(self.root)
1✔
308
        got = sys.stdout.getvalue()
1✔
309
        err = sys.stderr.getvalue()
1✔
310
        sys.stdout = self.save_stdout
1✔
311
        sys.stderr = self.save_stderr
1✔
312
        if err:
1!
313
            print(err, end='', file=sys.stderr)
×
314
        self.assertEqual(self.expect, got)
1✔
315
        super().tearDown()
1✔
316

317
    def run_ctl(self, opts):
1✔
318
        options = zdctl.ZDCtlOptions()
1✔
319
        options.realize(opts + ['fg'])
1✔
320
        self.expect = self.cmd + '\n'
1✔
321
        proc = zdctl.ZDCmd(options)
1✔
322
        proc.onecmd(" ".join(options.args))
1✔
323

324
    def testCtlRunDirectoryCreation(self):
1✔
325
        path = os.path.join(self.root, 'rundir')
1✔
326
        self.run_ctl(['-z', path, '-p', self.cmd])
1✔
327
        self.assertTrue(os.path.exists(path))
1✔
328

329
    def testCtlRunDirectoryCreationFromConfigFile(self):
1✔
330
        path = os.path.join(self.root, 'rundir')
1✔
331
        options = ['directory ' + path,
1✔
332
                   'program ' + self.cmd]
333
        config = self.writeConfig(
1✔
334
            '<runner>\n%s\n</runner>' % '\n'.join(options))
335
        self.run_ctl(['-C', config])
1✔
336
        self.assertTrue(os.path.exists(path))
1✔
337

338
    def testCtlRunDirectoryCreationOnlyOne(self):
1✔
339
        path = os.path.join(self.root, 'rundir', 'not-created')
1✔
340
        self.assertRaises(SystemExit,
1✔
341
                          self.run_ctl, ['-z', path, '-p', self.cmd])
342
        self.assertFalse(os.path.exists(path))
1✔
343
        got = sys.stderr.getvalue().strip()
1✔
344
        sys.stderr = StringIO()
1✔
345
        self.assertTrue(got.startswith('Error: invalid value for -z'))
1✔
346

347
    def testCtlSocketDirectoryCreation(self):
1✔
348
        path = os.path.join(self.root, 'rundir', 'sock')
1✔
349
        self.run_ctl(['-s', path, '-p', self.cmd])
1✔
350
        self.assertTrue(os.path.exists(os.path.dirname(path)))
1✔
351

352
    def testCtlSocketDirectoryCreationRelativePath(self):
1✔
353
        path = os.path.join('rundir', 'sock')
1✔
354
        self.run_ctl(['-s', path, '-p', self.cmd])
1✔
355
        self.assertTrue(
1✔
356
            os.path.exists(os.path.dirname(os.path.join(os.getcwd(), path))))
357

358
    def testCtlSocketDirectoryCreationOnlyOne(self):
1✔
359
        path = os.path.join(self.root, 'rundir', 'not-created', 'sock')
1✔
360
        self.assertRaises(SystemExit,
1✔
361
                          self.run_ctl, ['-s', path, '-p', self.cmd])
362
        self.assertFalse(os.path.exists(path))
1✔
363
        got = sys.stderr.getvalue().strip()
1✔
364
        sys.stderr = StringIO()
1✔
365
        self.assertTrue(got.startswith('Error: invalid value for -s'))
1✔
366

367
    def testCtlSocketDirectoryCreationFromConfigFile(self):
1✔
368
        path = os.path.join(self.root, 'rundir')
1✔
369
        options = ['socket-name %s/sock' % path,
1✔
370
                   'program ' + self.cmd]
371
        config = self.writeConfig(
1✔
372
            '<runner>\n%s\n</runner>' % '\n'.join(options))
373
        self.run_ctl(['-C', config])
1✔
374
        self.assertTrue(os.path.exists(path))
1✔
375

376
    def testCtlSocketDirectoryCreationFromConfigFileRelativePath(self):
1✔
377
        path = 'rel-rundir'
1✔
378
        options = ['socket-name %s/sock' % path,
1✔
379
                   'program ' + self.cmd]
380
        config = self.writeConfig(
1✔
381
            '<runner>\n%s\n</runner>' % '\n'.join(options))
382
        self.run_ctl(['-C', config])
1✔
383
        self.assertTrue(os.path.exists(os.path.join(os.getcwd(), path)))
1✔
384

385
    def writeConfig(self, config):
1✔
386
        config_file = os.path.join(self.root, 'config')
1✔
387
        with open(config_file, 'w') as f:
1✔
388
            f.write(config)
1✔
389
        return config_file
1✔
390

391
    def testDirectoryChown(self):
1✔
392
        path = os.path.join(self.root, 'foodir')
1✔
393
        options = zdctl.ZDCtlOptions()
1✔
394
        options.realize(['-p', self.cmd, 'status'])
1✔
395
        cmd = zdctl.ZDCmd(options)
1✔
396
        options.uid = 27
1✔
397
        options.gid = 28
1✔
398
        # Patch chown and geteuid, because we're not root
399
        chown = os.chown
1✔
400
        geteuid = os.geteuid
1✔
401
        calls = []
1✔
402

403
        def my_chown(*args):
1✔
404
            calls.append(('chown',) + args)
1✔
405

406
        def my_geteuid():
1✔
407
            return 0
1✔
408

409
        try:
1✔
410
            os.chown = my_chown
1✔
411
            os.geteuid = my_geteuid
1✔
412
            cmd.create_directory(path)
1✔
413
        finally:
414
            os.chown = chown
1✔
415
            os.geteuid = geteuid
1✔
416
        self.assertEqual([('chown', path, 27, 28)], calls)
1✔
417

418

419
def send_action(action, sockname, raise_on_error=False):
1✔
420
    """Send an action to the zdrun server and return the response.
421

422
    Return None if the server is not up or any other error happened.
423
    """
424
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1✔
425
    try:
1✔
426
        sock.connect(sockname)
1✔
427
        sock.send(action.encode() + b"\n")
1✔
428
        sock.shutdown(1)  # We're not writing any more
1✔
429
        response = b""
1✔
430
        while True:
1✔
431
            data = sock.recv(1000)
1✔
432
            if not data:
1✔
433
                break
1✔
434
            response += data
1✔
435
        sock.close()
1✔
436
        return response
1✔
437
    except OSError as msg:
1✔
438
        if str(msg) == 'AF_UNIX path too long':
1!
439
            # MacOS has apparent small limits on the length of a UNIX
440
            # domain socket filename, we want to make MacOS users aware
441
            # of the actual problem
442
            raise
×
443
        if raise_on_error:
1!
444
            raise
×
445
        return None
1✔
446
    finally:
447
        sock.close()
1✔
448

449

450
def test_suite():
1✔
451
    suite = unittest.TestSuite()
1✔
452
    if os.name == "posix":
1!
453
        loadTestsFromTestCase = (
1✔
454
            unittest.defaultTestLoader.loadTestsFromTestCase)
455
        suite.addTest(loadTestsFromTestCase(ZDaemonTests))
1✔
456
        suite.addTest(loadTestsFromTestCase(TestRunnerDirectory))
1✔
457
    return suite
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc