• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Zope / 6263629025

21 Sep 2023 03:12PM UTC coverage: 82.146% (-0.01%) from 82.159%
6263629025

Pull #1164

github

web-flow
[pre-commit.ci lite] apply automatic fixes
Pull Request #1164: Move all linters to pre-commit.

4353 of 6963 branches covered (0.0%)

Branch coverage included in aggregate %.

487 of 487 new or added lines in 186 files covered. (100.0%)

27394 of 31684 relevant lines covered (86.46%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.78
/src/Zope2/Startup/datatypes.py
1
##############################################################################
2
#
3
# Copyright (c) 2003 Zope Foundation and Contributors.
4
# All Rights Reserved.
5
#
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
"""Datatypes for the Zope schema for use with ZConfig."""
1✔
15

16
import io
1✔
17
import os
1✔
18
import traceback
1✔
19
from collections import UserDict
1✔
20

21
from ZODB.config import ZODBDatabase
1✔
22

23

24
def security_policy_implementation(value):
1✔
25
    value = value.upper()
1✔
26
    ok = ('PYTHON', 'C')
1✔
27
    if value not in ok:
1!
28
        raise ValueError(
×
29
            "security-policy-implementation must be one of %r" % ok)
30
    return value
1✔
31

32

33
def datetime_format(value):
1✔
34
    value = value.lower()
1✔
35
    ok = ('us', 'international')
1✔
36
    if value not in ok:
1!
37
        raise ValueError("datetime-format must be one of %r" % ok)
×
38
    return value
1✔
39

40

41
def environment(section):
1✔
42
    return section.environ
1✔
43

44

45
def mount_point(value):
1✔
46
    # mount-point definition
47
    if not value:
1!
48
        raise ValueError('mount-point must not be empty')
×
49
    if not value.startswith('/'):
1!
50
        raise ValueError("mount-point '%s' is invalid: mount points must "
×
51
                         "begin with a slash" % value)
52
    return value
1✔
53

54

55
def importable_name(name):
1✔
56
    # A datatype that converts a Python dotted-path-name to an object
57
    try:
1✔
58
        components = name.split('.')
1✔
59
        start = components[0]
1✔
60
        g = globals()
1✔
61
        package = __import__(start, g, g)
1✔
62
        modulenames = [start]
1✔
63
        for component in components[1:]:
1✔
64
            modulenames.append(component)
1✔
65
            try:
1✔
66
                package = getattr(package, component)
1✔
67
            except AttributeError:
×
68
                n = '.'.join(modulenames)
×
69
                package = __import__(n, g, g, component)
×
70
        return package
1✔
71
    except ImportError:
72
        IO = io.StringIO()
73
        traceback.print_exc(file=IO)
74
        raise ValueError(
75
            f'The object named by {name!r} could not be imported\n'
76
            f'{IO.getvalue()}')
77

78

79
class ZDaemonEnvironDict(UserDict):
1✔
80
    # zdaemon 2 expects to use a 'mapping' attribute of the environ object.
81

82
    @property
1✔
83
    def mapping(self):
1✔
84
        return self.data
×
85

86

87
def root_wsgi_config(section):
1✔
88
    from ZConfig import ConfigurationError
1✔
89
    from ZConfig.matcher import SectionValue
1✔
90
    if section.environment is None:
1✔
91
        section.environment = ZDaemonEnvironDict()
1✔
92
    if section.clienthome is None:
1!
93
        section.clienthome = os.path.join(section.instancehome, "var")
1✔
94
    if getattr(section, 'pid_filename', None) is None:
1✔
95
        section.pid_filename = os.path.join(section.clienthome, 'Z4.pid')
1✔
96

97
    if not section.databases:
1✔
98
        section.databases = []
1✔
99

100
    mount_factories = {}  # { name -> factory}
1✔
101
    mount_points = {}  # { virtual path -> name }
1✔
102
    dup_err = ('Invalid configuration: ZODB databases named "%s" and "%s" are '
1✔
103
               'both configured to use the same mount point, named "%s"')
104

105
    for database in section.databases:
1✔
106
        points = database.getVirtualMountPaths()
1✔
107
        name = database.config.getSectionName()
1✔
108
        mount_factories[name] = database
1✔
109
        for point in points:
1✔
110
            if point in mount_points:
1!
111
                raise ConfigurationError(dup_err % (mount_points[point],
×
112
                                                    name, point))
113
            mount_points[point] = name
1✔
114
    section.dbtab = DBTab(mount_factories, mount_points)
1✔
115
    pconfigs = {}
1✔
116
    for pconfig in section.product_config:
1!
117
        section_name = pconfig.getSectionName()
×
118
        if isinstance(pconfig, SectionValue):
×
119
            section_type = pconfig.getSectionType()
×
120
            if section_type == 'product-config':
×
121
                pconfigs[section_name] = pconfig.mapping
×
122
            else:
123
                pconfigs[section_name] = pconfig
×
124
        else:
125
            pconfigs[section_name] = pconfig
×
126

127
    section.product_config = pconfigs
1✔
128

129
    return section
1✔
130

131

132
class ZopeDatabase(ZODBDatabase):
1✔
133
    """A ZODB database datatype that can handle an extended set of attributes
134
    for use by DBTab."""
135

136
    def createDB(self, database_name, databases):
1✔
137
        self.config.database_name = database_name
1✔
138
        return ZODBDatabase.open(self, databases)
1✔
139

140
    def open(self, database_name, databases):
1✔
141
        DB = self.createDB(database_name, databases)
1✔
142
        if self.config.connection_class:
1!
143
            # set the connection class
144
            DB.klass = self.config.connection_class
×
145
        if self.config.class_factory is not None:
1!
146
            DB.classFactory = self.config.class_factory
1✔
147
        return DB
1✔
148

149
    def getName(self):
1✔
150
        return self.name
×
151

152
    def computeMountPaths(self):
1✔
153
        mps = []
1✔
154
        for part in self.config.mount_points:
1✔
155
            real_root = None
1✔
156
            if ':' in part:
1!
157
                # 'virtual_path:real_path'
158
                virtual_path, real_path = part.split(':', 1)
×
159
                if real_path.startswith('~'):
×
160
                    # Use a special root.
161
                    # 'virtual_path:~real_root/real_path'
162
                    real_root, real_path = real_path[1:].split('/', 1)
×
163
            else:
164
                # Virtual path is the same as the real path.
165
                virtual_path = real_path = part
1✔
166
            mps.append((virtual_path, real_root, real_path))
1✔
167
        return mps
1✔
168

169
    def getVirtualMountPaths(self):
1✔
170
        return [item[0] for item in self.computeMountPaths()]
1✔
171

172
    def getMountParams(self, mount_path):
1✔
173
        """Returns (real_root, real_path, container_class) for a virtual mount
174
        path."""
175
        for (virtual_path, real_root, real_path) in self.computeMountPaths():
×
176
            if virtual_path == mount_path:
×
177
                container_class = self.config.container_class
×
178
                if not container_class and virtual_path != '/':
×
179
                    # default to OFS.Folder.Folder for nonroot mounts
180
                    # if one isn't specified in the config
181
                    container_class = 'OFS.Folder.Folder'
×
182
                return (real_root, real_path, container_class)
×
183
        raise LookupError('Nothing known about mount path %s' % mount_path)
×
184

185

186
def default_zpublisher_encoding(value):
1✔
187
    # This is a bit clunky but necessary :-(
188
    # These modules are imported during the configuration process
189
    # so a module-level call to getConfiguration in any of them
190
    # results in getting config data structure without the necessary
191
    # value in it.
192
    from ZPublisher import Converters
1✔
193
    from ZPublisher import HTTPRequest
1✔
194
    from ZPublisher import HTTPResponse
1✔
195
    Converters.default_encoding = value
1✔
196
    HTTPRequest.default_encoding = value
1✔
197
    HTTPRequest.HTTPRequest.charset = value
1✔
198
    HTTPResponse.default_encoding = value
1✔
199
    HTTPResponse.HTTPBaseResponse.charset = value
1✔
200
    return value
1✔
201

202

203
class DBTab:
1✔
204
    """A Zope database configuration, similar in purpose to /etc/fstab."""
205

206
    def __init__(self, db_factories, mount_paths):
1✔
207
        self.db_factories = db_factories  # { name -> DatabaseFactory }
1✔
208
        self.mount_paths = mount_paths    # { virtual path -> name }
1✔
209
        self.databases = {}               # { name -> DB instance }
1✔
210

211
    def listMountPaths(self):
1✔
212
        """Returns a sequence of (virtual_mount_path, database_name)."""
213
        return list(self.mount_paths.items())
×
214

215
    def listDatabaseNames(self):
1✔
216
        """Returns a sequence of names."""
217
        return list(self.db_factories.keys())
×
218

219
    def hasDatabase(self, name):
1✔
220
        """Returns true if name is the name of a configured database."""
221
        return name in self.db_factories
×
222

223
    def _mountPathError(self, mount_path):
1✔
224
        from ZConfig import ConfigurationError
×
225
        if mount_path == '/':
×
226
            raise ConfigurationError(
×
227
                "No root database configured")
228
        else:
229
            raise ConfigurationError(
×
230
                "No database configured for mount point at %s"
231
                % mount_path)
232

233
    def getDatabase(self, mount_path=None, name=None, is_root=0):
1✔
234
        """Returns an opened database.
235

236
        Requires either mount_path or name.
237
        """
238
        if name is None:
1!
239
            name = self.getName(mount_path)
1✔
240
        db = self.databases.get(name, None)
1✔
241
        if db is None:
1!
242
            factory = self.getDatabaseFactory(name=name)
1✔
243
            db = factory.open(name, self.databases)
1✔
244
        return db
1✔
245

246
    def getDatabaseFactory(self, mount_path=None, name=None):
1✔
247
        if name is None:
1!
248
            name = self.getName(mount_path)
×
249
        if name not in self.db_factories:
1!
250
            raise KeyError('%s is not a configured database' % repr(name))
×
251
        return self.db_factories[name]
1✔
252

253
    def getName(self, mount_path):
1✔
254
        name = self.mount_paths.get(mount_path)
1✔
255
        if name is None:
1!
256
            self._mountPathError(mount_path)
×
257
        return name
1✔
258

259

260
def simpleClassFactory(jar, module, name, _silly=('__doc__',), _globals={}):
1✔
261
    """Class factory."""
262
    m = __import__(module, _globals, _globals, _silly)
×
263
    return getattr(m, name)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc