• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Zope / 3956162881

pending completion
3956162881

push

github

Michael Howitz
Update to deprecation warning free releases.

4401 of 7036 branches covered (62.55%)

Branch coverage included in aggregate %.

27161 of 31488 relevant lines covered (86.26%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.78
/src/Zope2/Startup/datatypes.py
1
##############################################################################
2
#
3
# Copyright (c) 2003 Zope Foundation and Contributors.
4
# All Rights Reserved.
5
#
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
"""Datatypes for the Zope schema for use with ZConfig."""
1✔
15

16
import io
1✔
17
import os
1✔
18
import traceback
1✔
19
from collections import UserDict
1✔
20

21
from ZODB.config import ZODBDatabase
1✔
22

23

24
def security_policy_implementation(value):
    """Validate the ``security-policy-implementation`` option.

    The value is upper-cased before it is checked, so the comparison is
    case-insensitive.

    Returns the upper-cased value (``'PYTHON'`` or ``'C'``).

    Raises ``ValueError`` if the upper-cased value is not an allowed name.
    """
    value = value.upper()
    ok = ('PYTHON', 'C')
    if value not in ok:
        # ``ok`` must be wrapped in a 1-tuple: ``"%r" % ok`` would spread the
        # two-element tuple over a single ``%r`` and fail with TypeError
        # instead of raising the intended ValueError.
        raise ValueError(
            "security-policy-implementation must be one of %r" % (ok,))
    return value
1✔
31

32

33
def datetime_format(value):
    """Validate the ``datetime-format`` option.

    The value is lower-cased before it is checked, so the comparison is
    case-insensitive.

    Returns the lower-cased value (``'us'`` or ``'international'``).

    Raises ``ValueError`` if the lower-cased value is not an allowed name.
    """
    value = value.lower()
    ok = ('us', 'international')
    if value not in ok:
        # ``ok`` must be wrapped in a 1-tuple: ``"%r" % ok`` would spread the
        # two-element tuple over a single ``%r`` and fail with TypeError
        # instead of raising the intended ValueError.
        raise ValueError("datetime-format must be one of %r" % (ok,))
    return value
1✔
39

40

41
def environment(section):
    """Return the ``environ`` attribute of the given *section*."""
    environ = section.environ
    return environ
1✔
43

44

45
def mount_point(value):
    """Validate a mount-point definition and return it unchanged.

    Raises ``ValueError`` when the value is empty or does not begin with
    a slash.
    """
    if value and value.startswith('/'):
        return value
    if not value:
        raise ValueError('mount-point must not be empty')
    raise ValueError("mount-point '%s' is invalid: mount points must "
                     "begin with a slash" % value)
1✔
53

54

55
def importable_name(name):
    """Convert a Python dotted-path-name to the object it names.

    The first component is imported; every following component is looked
    up as an attribute and, when that lookup fails, imported as a
    submodule instead.

    Raises ``ValueError`` (with the formatted traceback appended to the
    message) when an ``ImportError`` occurs along the way.
    """
    try:
        head, *tail = name.split('.')
        scope = globals()
        target = __import__(head, scope, scope)
        dotted = head
        for attr in tail:
            dotted = f'{dotted}.{attr}'
            try:
                target = getattr(target, attr)
            except AttributeError:
                # Not an attribute (yet): try it as a submodule.
                target = __import__(dotted, scope, scope, attr)
        return target
    except ImportError:
        raise ValueError(
            f'The object named by {name!r} could not be imported\n'
            f'{traceback.format_exc()}')
77

78

79
class ZDaemonEnvironDict(UserDict):
    """``UserDict`` whose underlying ``data`` is also available as ``mapping``.

    zdaemon 2 expects to use a 'mapping' attribute of the environ object.
    """

    # Read-only alias for the dict that UserDict keeps in ``data``.
    mapping = property(lambda self: self.data)
×
85

86

87
def root_wsgi_config(section):
    """Fill in defaults and derived attributes on the root config section.

    Sets missing ``environment``, ``clienthome``, ``pid_filename`` and
    ``databases`` values, builds ``section.dbtab`` from the configured
    databases and replaces ``section.product_config`` with a dict keyed by
    section name.  The same (mutated) *section* is returned.

    Raises ``ConfigurationError`` when two databases claim the same
    virtual mount point.
    """
    from ZConfig import ConfigurationError
    from ZConfig.matcher import SectionValue

    # Defaults for values the configuration left unset.
    if section.environment is None:
        section.environment = ZDaemonEnvironDict()
    if section.clienthome is None:
        section.clienthome = os.path.join(section.instancehome, "var")
    if getattr(section, 'pid_filename', None) is None:
        section.pid_filename = os.path.join(section.clienthome, 'Z4.pid')
    if not section.databases:
        section.databases = []

    factories_by_name = {}  # { name -> factory }
    names_by_path = {}  # { virtual path -> name }
    for db_factory in section.databases:
        virtual_paths = db_factory.getVirtualMountPaths()
        db_name = db_factory.config.getSectionName()
        factories_by_name[db_name] = db_factory
        for path in virtual_paths:
            if path in names_by_path:
                raise ConfigurationError(
                    'Invalid configuration: ZODB databases named "%s" and '
                    '"%s" are both configured to use the same mount point, '
                    'named "%s"' % (names_by_path[path], db_name, path))
            names_by_path[path] = db_name
    section.dbtab = DBTab(factories_by_name, names_by_path)

    product_configs = {}
    for entry in section.product_config:
        key = entry.getSectionName()
        # Only genuine 'product-config' sections are reduced to their
        # mapping; every other entry is stored as-is.
        use_mapping = (isinstance(entry, SectionValue)
                       and entry.getSectionType() == 'product-config')
        product_configs[key] = entry.mapping if use_mapping else entry
    section.product_config = product_configs

    return section
1✔
130

131

132
class ZopeDatabase(ZODBDatabase):
    """ZODB database datatype that handles the extended set of attributes
    used by DBTab."""

    def createDB(self, database_name, databases):
        """Store *database_name* on the config and open via ZODBDatabase."""
        config = self.config
        config.database_name = database_name
        return ZODBDatabase.open(self, databases)

    def open(self, database_name, databases):
        """Create the database and apply the configured class overrides."""
        db = self.createDB(database_name, databases)
        connection_class = self.config.connection_class
        if connection_class:
            # set the connection class
            db.klass = connection_class
        class_factory = self.config.class_factory
        if class_factory is not None:
            db.classFactory = class_factory
        return db

    def getName(self):
        """Return the ``name`` attribute of this database."""
        return self.name

    def computeMountPaths(self):
        """Return a list of ``(virtual_path, real_root, real_path)`` tuples,
        one per entry in ``config.mount_points``.
        """
        result = []
        for spec in self.config.mount_points:
            root = None
            if ':' not in spec:
                # Virtual path is the same as the real path.
                virtual = real = spec
            else:
                # 'virtual_path:real_path'
                virtual, real = spec.split(':', 1)
                if real.startswith('~'):
                    # Use a special root.
                    # 'virtual_path:~real_root/real_path'
                    root, real = real[1:].split('/', 1)
            result.append((virtual, root, real))
        return result

    def getVirtualMountPaths(self):
        """Return only the virtual path of every computed mount path."""
        return [virtual for virtual, _root, _real in self.computeMountPaths()]

    def getMountParams(self, mount_path):
        """Returns (real_root, real_path, container_class) for a virtual
        mount path.

        Raises ``LookupError`` when no mount point matches *mount_path*.
        """
        for virtual, root, real in self.computeMountPaths():
            if virtual != mount_path:
                continue
            container_class = self.config.container_class
            if not container_class and virtual != '/':
                # default to OFS.Folder.Folder for nonroot mounts
                # if one isn't specified in the config
                container_class = 'OFS.Folder.Folder'
            return (root, real, container_class)
        raise LookupError('Nothing known about mount path %s' % mount_path)
×
185

186

187
def default_zpublisher_encoding(value):
    """Push *value* into the ZPublisher modules as default encoding.

    Returns *value* unchanged.
    """
    # Clunky but necessary: these modules are imported during the
    # configuration process, so a module-level call to getConfiguration in
    # any of them yields a config data structure that does not yet hold
    # this value.  Hence the value is written into them directly here.
    from ZPublisher import Converters
    from ZPublisher import HTTPRequest
    from ZPublisher import HTTPResponse

    for module in (Converters, HTTPRequest, HTTPResponse):
        module.default_encoding = value
    for klass in (HTTPRequest.HTTPRequest, HTTPResponse.HTTPBaseResponse):
        klass.charset = value
    return value
1✔
202

203

204
class DBTab:
    """A Zope database configuration, similar in purpose to /etc/fstab.

    Holds the mapping from virtual mount paths to database names and from
    database names to the factories that open them.
    """

    def __init__(self, db_factories, mount_paths):
        self.databases = {}               # { name -> DB instance }
        self.mount_paths = mount_paths    # { virtual path -> name }
        self.db_factories = db_factories  # { name -> DatabaseFactory }

    def listMountPaths(self):
        """Returns a sequence of (virtual_mount_path, database_name)."""
        return [(path, name) for path, name in self.mount_paths.items()]

    def listDatabaseNames(self):
        """Returns a sequence of names."""
        return list(self.db_factories)

    def hasDatabase(self, name):
        """Returns true if name is the name of a configured database."""
        return name in self.db_factories

    def _mountPathError(self, mount_path):
        """Raise the ConfigurationError for an unconfigured *mount_path*."""
        from ZConfig import ConfigurationError
        if mount_path == '/':
            message = "No root database configured"
        else:
            message = ("No database configured for mount point at %s"
                       % mount_path)
        raise ConfigurationError(message)

    def getDatabase(self, mount_path=None, name=None, is_root=0):
        """Returns an opened database.  Requires either mount_path or name.

        A database already present in ``self.databases`` is returned as is;
        otherwise its factory is asked to open it.
        """
        if name is None:
            name = self.getName(mount_path)
        opened = self.databases.get(name)
        if opened is not None:
            return opened
        return self.getDatabaseFactory(name=name).open(name, self.databases)

    def getDatabaseFactory(self, mount_path=None, name=None):
        """Return the factory for *name* (or for the name at *mount_path*).

        Raises ``KeyError`` when the name is not a configured database.
        """
        if name is None:
            name = self.getName(mount_path)
        if name in self.db_factories:
            return self.db_factories[name]
        raise KeyError('%s is not a configured database' % repr(name))

    def getName(self, mount_path):
        """Return the database name configured for *mount_path*."""
        found = self.mount_paths.get(mount_path)
        if found is None:
            self._mountPathError(mount_path)
        return found
1✔
260

261

262
def simpleClassFactory(jar, module, name, _silly=('__doc__',), _globals={}):
    """Class factory: import *module* and return its attribute *name*.

    *jar* is accepted but not used.  ``_silly`` is passed as a non-empty
    ``fromlist`` to ``__import__`` and ``_globals`` as its globals/locals.
    """
    return getattr(__import__(module, _globals, _globals, _silly), name)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc