• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / zope.app.publisher / 16399746590

25 Jun 2025 07:15AM UTC coverage: 98.992%. Remained the same
16399746590

push

github

web-flow
Update Python version support. (#20)

* Drop support for Python 3.8.

39 of 44 branches covered (88.64%)

Branch coverage included in aggregate %.

452 of 452 relevant lines covered (100.0%)

1.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.38
/src/zope/app/publisher/xmlrpc/testing.py
1
import http.client as http_client
1✔
2
import xmlrpc.client
1✔
3
from io import BytesIO
1✔
4

5
from zope.app.wsgi.testlayer import http as _http
1✔
6

7

8
class FakeSocket:
1✔
9

10
    def __init__(self, data):
1✔
11
        self.data = data
1✔
12

13
    def makefile(self, mode, bufsize=None):
1✔
14
        assert 'b' in mode
1✔
15
        data = self.data
1✔
16
        assert isinstance(data, bytes)
1✔
17
        return BytesIO(data)
1✔
18

19

20
def http(wsgi_app, query_str, *args, **kwargs):
1✔
21
    # Strip leading \n
22
    query_str = query_str.lstrip()
1✔
23
    kwargs.setdefault('handle_errors', True)
1✔
24
    if not isinstance(query_str, bytes):
1✔
25
        query_str = query_str.encode("utf-8")
1✔
26
    return _http(wsgi_app, query_str, *args, **kwargs)
1✔
27

28

29
class ZopeTestTransport(xmlrpc.client.Transport):
1✔
30
    """xmlrpc.client transport that delegates to
31
    zope.app.wsgi.testlayer.http
32
    It can be used like a normal transport, including support for basic
33
    authentication.
34
    """
35

36
    verbose = False
1✔
37
    handleErrors = True
1✔
38

39
    def request(self, host, handler, request_body, verbose=0):
1✔
40
        request = f"POST {handler} HTTP/1.0\n"
1✔
41
        request += "Content-Length: %i\n" % len(request_body)
1✔
42
        request += "Content-Type: text/xml\n"
1✔
43

44
        host, extra_headers, _x509 = self.get_host_info(host)
1✔
45
        request += "Host: %s\n" % host
1✔
46
        if extra_headers:
1✔
47
            request += "Authorization: {}\n".format(
1✔
48
                dict(extra_headers)["Authorization"])
49

50
        request += "\n"
1✔
51
        if isinstance(request_body, bytes):
1!
52
            request = request.encode("ascii")
1✔
53
        request += request_body
1✔
54
        response = http(
1✔
55
            self.wsgi_app, request, handle_errors=self.handleErrors)
56

57
        errcode = response.getStatus()
1✔
58
        errmsg = response.getStatusString()
1✔
59
        # This is not the same way that the normal transport deals with the
60
        # headers.
61
        headers = response.getHeaders()
1✔
62

63
        if errcode != 200:
1✔
64
            raise xmlrpc.client.ProtocolError(
1✔
65
                host + handler,
66
                errcode, errmsg,
67
                headers)
68

69
        body = response.getBody()
1✔
70
        body = body if isinstance(body, bytes) else body.encode('latin-1')
1✔
71
        errmsg = (errmsg
1✔
72
                  if isinstance(errmsg, bytes)
73
                  else errmsg.encode('ascii'))  # HTTP response lines are ASCII
74
        content = b'HTTP/1.0 ' + errmsg + b'\n\n' + body
1✔
75

76
        res = http_client.HTTPResponse(FakeSocket(content))
1✔
77
        res.begin()
1✔
78
        return self.parse_response(res)
1✔
79

80

81
def ServerProxy(wsgi_app, uri, transport=None, encoding=None,
1✔
82
                verbose=0, allow_none=0, handleErrors=True,
83
                **transport_options):
84
    """A factory that creates a server proxy.
85

86
    If ``transport`` is ``None`` use the ``ZopeTestTransport``, it gets
87
    initialized with ``**transport_options``. Which options are supported
88
    depends on the used Python version, see
89
    ``xmlrpc.client.Transport.__init__`` resp.
90
    ``xmlrpc.client.Transport.__init__`` for details.
91
    """
92
    if transport is None:
1!
93
        transport = ZopeTestTransport(**transport_options)
1✔
94
        transport.wsgi_app = wsgi_app
1✔
95
    if isinstance(transport, ZopeTestTransport):
1!
96
        transport.handleErrors = handleErrors
1✔
97
    return xmlrpc.client.ServerProxy(
1✔
98
        uri, transport, encoding, verbose, allow_none)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc