• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Products.PluginRegistry / 12348256203

12 Jun 2024 06:30AM UTC coverage: 91.854% (-0.4%) from 92.243%
12348256203

push

github

web-flow
- Add support for Python 3.12.  - Drop support for Python 3.7. (#26)

61 of 82 branches covered (74.39%)

Branch coverage included in aggregate %.

15 of 17 new or added lines in 2 files covered. (88.24%)

3 existing lines in 2 files now uncovered.

796 of 851 relevant lines covered (93.54%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.92
/src/Products/PluginRegistry/PluginRegistry.py
1
##############################################################################
2
#
3
# Copyright (c) 2001 Zope Foundation and Contributors
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
7
# distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE
12
#
13
##############################################################################
14
""" Classes: PluginRegistry
15
"""
16
import logging
1✔
17
import os
1✔
18

19
from AccessControl import ClassSecurityInfo
1✔
20
from AccessControl.class_init import default__class_init__ as InitializeClass
1✔
21
from AccessControl.Permissions import manage_users as ManageUsers
1✔
22
from Acquisition import aq_inner
1✔
23
from Acquisition import aq_parent
1✔
24
from App.Common import package_home
1✔
25
from App.ImageFile import ImageFile
1✔
26
from OFS.interfaces import IWriteLock
1✔
27
from OFS.SimpleItem import SimpleItem
1✔
28
from Persistence import PersistentMapping
1✔
29
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
1✔
30
from zope.interface import implementer
1✔
31

32
from Products.PluginRegistry.interfaces import IPluginRegistry
1✔
33

34

35
try:
1✔
36
    from Products.PluginRegistry.exportimport import PluginRegistryExporter
1✔
37
    from Products.PluginRegistry.exportimport import _updatePluginRegistry
1✔
NEW
38
except ModuleNotFoundError:
×
UNCOV
39
    _HAS_GENERIC_SETUP = False
×
40
else:
41
    _HAS_GENERIC_SETUP = True
1✔
42

43
# Directory of this package as returned by App.Common.package_home
# (presumably the on-disk location of the module -- confirm against Zope docs).
product_dir = package_home(globals())
# Everything in ``product_dir`` except its last path component.
product_prefix = os.path.split(product_dir)[0]

# Directory the PageTemplateFile-based ZMI views are loaded from.
_wwwdir = os.path.join(product_dir, 'www')

logger = logging.getLogger('PluginRegistry')
49

50

51
@implementer(IPluginRegistry, IWriteLock)
class PluginRegistry(SimpleItem):
    """ Implement IPluginRegistry as an independent, ZMI-manageable object.

    o For each plugin type the registry stores an ordered tuple of the
      ids of the active plugins.

    o The plugin objects themselves are not stored here; they are looked
      up by id in the registry's acquisition parent.
    """

    security = ClassSecurityInfo()
    meta_type = 'Plugin Registry'
    zmi_icon = 'fas fa-plug'
    # Mapping of plugin type -> tuple of active plugin ids; created lazily
    # by ``_getPlugins`` so the class-level default stays ``None``.
    _plugins = None

    def __init__(self, plugin_type_info=()):
        """ Set up the known plugin types.

        o 'plugin_type_info' is a sequence whose items are indexed as
          (interface, id, title, description).

        o Raise ValueError if a string is passed instead of a sequence.
        """
        if isinstance(plugin_type_info, str):
            # some tool is passing us our ID.
            raise ValueError('Must pass a sequence of plugin info dicts!')

        # The list keeps the declaration order of the plugin types; the
        # mapping holds the descriptive info keyed by interface.
        self._plugin_types = [x[0] for x in plugin_type_info]
        self._plugin_type_info = PersistentMapping()
        for interface in plugin_type_info:
            self._plugin_type_info[interface[0]] = {
                'id': interface[1],
                'title': interface[2],
                'description': interface[3]
            }

    #
    #   IPluginRegistry implementation
    #
    @security.protected(ManageUsers)
    def listPluginTypeInfo(self):
        """ See IPluginRegistry.

        o Return one dict per known plugin type, in declaration order.

        o Each dict is a copy of the stored info with 'interface' and
          'methods' (the names reported by the interface) added.
        """
        result = []

        for ptype in self._plugin_types:
            # Copy so that the added keys never leak into the stored info.
            info = self._plugin_type_info[ptype].copy()
            info['interface'] = ptype
            info['methods'] = list(ptype.names())
            result.append(info)

        return result

    @security.protected(ManageUsers)
    def listPlugins(self, plugin_type):
        """ See IPluginRegistry.

        o Return a list of (plugin_id, plugin) tuples for the active
          plugins of 'plugin_type', in their configured order.

        o Active plugins which no longer satisfy 'plugin_type' are
          skipped (and reported at debug level).

        o Raise KeyError if 'plugin_type' is not a known plugin type.
        """
        result = []

        parent = aq_parent(aq_inner(self))

        for plugin_id in self._getPlugins(plugin_type):

            plugin = parent._getOb(plugin_id)
            if _satisfies(plugin, plugin_type):
                result.append((plugin_id, plugin))
            else:
                # Pass the values as arguments so the message is only
                # formatted when debug logging is actually enabled.
                logger.debug('Active plugin %s no longer implements %s',
                             plugin_id, plugin_type)

        return result

    @security.protected(ManageUsers)
    def getPluginInfo(self, plugin_type):
        """ See IPluginRegistry.

        o 'plugin_type' is the id of the plugin type, not the interface.

        o Return the stored info dict itself (not a copy).

        o Raise KeyError if the id does not match exactly one type.
        """
        plugin_type = self._getInterfaceFromName(plugin_type)
        return self._plugin_type_info[plugin_type]

    @security.protected(ManageUsers)
    def listPluginIds(self, plugin_type):
        """ See IPluginRegistry.

        o Return the ids of the active plugins of 'plugin_type'.

        o Raise KeyError if 'plugin_type' is not a known plugin type.
        """
        return self._getPlugins(plugin_type)

    @security.protected(ManageUsers)
    def activatePlugin(self, plugin_type, plugin_id):
        """ See IPluginRegistry.

        o Append 'plugin_id' to the active plugins of 'plugin_type'.

        o Raise KeyError if the type is unknown or the id is already
          active for it.

        o Raise ValueError if the object found under 'plugin_id' in the
          registry's parent does not satisfy 'plugin_type'.
        """
        plugins = list(self._getPlugins(plugin_type))

        if plugin_id in plugins:
            raise KeyError(f'Duplicate plugin id: {plugin_id}')

        parent = aq_parent(aq_inner(self))
        plugin = parent._getOb(plugin_id)

        if not _satisfies(plugin, plugin_type):
            raise ValueError(f'Plugin does not implement {plugin_type}')

        plugins.append(plugin_id)
        # Assign a fresh tuple through the mapping rather than mutating
        # the stored value in place.
        self._plugins[plugin_type] = tuple(plugins)

    @security.protected(ManageUsers)
    def deactivatePlugin(self, plugin_type, plugin_id):
        """ See IPluginRegistry.

        o Remove 'plugin_id' from the active plugins of 'plugin_type'.

        o Raise KeyError if the type is unknown or the id is not active
          for it.
        """
        plugins = list(self._getPlugins(plugin_type))

        if plugin_id not in plugins:
            raise KeyError(f'Invalid plugin id: {plugin_id}')

        plugins = [x for x in plugins if x != plugin_id]
        self._plugins[plugin_type] = tuple(plugins)

    @security.protected(ManageUsers)
    def movePluginsTop(self, plugin_type, ids_to_move):
        """ See IPluginRegistry.

        o Move every id in 'ids_to_move' to the front of the active
          plugins of 'plugin_type'.

        o NOTE: ids are taken in ascending order of their current
          position and each one is inserted at index 0, so the moved ids
          end up in the reverse of their previous relative order.

        o Raise ValueError (from list.index) if an id is not active.
        """
        ids = list(self._getPlugins(plugin_type))
        # Resolve all positions before anything is moved.  Moving an
        # entry to the front leaves the positions of all later entries
        # unchanged, so the remaining indexes stay valid.
        indexes = sorted(map(ids.index, ids_to_move))
        for i1 in indexes:
            ids.insert(0, ids.pop(i1))
        self._plugins[plugin_type] = tuple(ids)

    @security.protected(ManageUsers)
    def movePluginsUp(self, plugin_type, ids_to_move):
        """ See IPluginRegistry.

        o Swap every id in 'ids_to_move' with the entry directly before
          it; an id which is already first stays where it is.

        o Raise ValueError (from list.index) if an id is not active.
        """
        ids = list(self._getPlugins(plugin_type))

        # Every index comes from list.index, so it is always in range and
        # needs no further validation.  Walk from the top downwards.
        for i1 in sorted(map(ids.index, ids_to_move)):

            if i1 == 0:
                # i1 is already on top
                continue

            i2 = i1 - 1
            ids[i2], ids[i1] = ids[i1], ids[i2]

        self._plugins[plugin_type] = tuple(ids)

    @security.protected(ManageUsers)
    def movePluginsDown(self, plugin_type, ids_to_move):
        """ See IPluginRegistry.

        o Swap every id in 'ids_to_move' with the entry directly after
          it; an id which is already last stays where it is.

        o Raise ValueError (from list.index) if an id is not active.
        """
        ids = list(self._getPlugins(plugin_type))
        last = len(ids) - 1

        # Every index comes from list.index, so it is always in range and
        # needs no further validation.  Walk from the bottom upwards.
        for i1 in sorted(map(ids.index, ids_to_move), reverse=True):

            if i1 == last:
                # i1 is already on the bottom
                continue

            i2 = i1 + 1
            ids[i2], ids[i1] = ids[i1], ids[i2]

        self._plugins[plugin_type] = tuple(ids)

    #
    #   ZMI
    #
    arrow_right_gif = ImageFile('www/arrow-right.gif', globals())
    arrow_left_gif = ImageFile('www/arrow-left.gif', globals())
    arrow_up_gif = ImageFile('www/arrow-up.gif', globals())
    arrow_down_gif = ImageFile('www/arrow-down.gif', globals())

    @security.protected(ManageUsers)
    def manage_activatePlugins(self, plugin_type, plugin_ids, RESPONSE):
        """ Shim into ZMI.

        o 'plugin_type' is the id of the plugin type; activate each of
          'plugin_ids' for it, then redirect to 'manage_plugins'.
        """
        interface = self._getInterfaceFromName(plugin_type)
        for plugin_id in plugin_ids:
            self.activatePlugin(interface, plugin_id)
        RESPONSE.redirect('%s/manage_plugins?plugin_type=%s' %
                          (self.absolute_url(), plugin_type))

    @security.protected(ManageUsers)
    def manage_deactivatePlugins(self, plugin_type, plugin_ids, RESPONSE):
        """ Shim into ZMI.

        o 'plugin_type' is the id of the plugin type; deactivate each of
          'plugin_ids' for it, then redirect to 'manage_plugins'.
        """
        interface = self._getInterfaceFromName(plugin_type)
        for plugin_id in plugin_ids:
            self.deactivatePlugin(interface, plugin_id)

        RESPONSE.redirect('%s/manage_plugins?plugin_type=%s' %
                          (self.absolute_url(), plugin_type))

    @security.protected(ManageUsers)
    def manage_movePluginsUp(self, plugin_type, plugin_ids, RESPONSE):
        """ Shim into ZMI.

        o 'plugin_type' is the id of the plugin type; move 'plugin_ids'
          up one position, then redirect to 'manage_plugins'.
        """
        interface = self._getInterfaceFromName(plugin_type)
        self.movePluginsUp(interface, plugin_ids)

        RESPONSE.redirect('%s/manage_plugins?plugin_type=%s' %
                          (self.absolute_url(), plugin_type))

    @security.protected(ManageUsers)
    def manage_movePluginsDown(self, plugin_type, plugin_ids, RESPONSE):
        """ Shim into ZMI.

        o 'plugin_type' is the id of the plugin type; move 'plugin_ids'
          down one position, then redirect to 'manage_plugins'.
        """
        interface = self._getInterfaceFromName(plugin_type)
        self.movePluginsDown(interface, plugin_ids)

        RESPONSE.redirect('%s/manage_plugins?plugin_type=%s' %
                          (self.absolute_url(), plugin_type))

    @security.protected(ManageUsers)
    def getAllPlugins(self, plugin_type):
        """ Return a mapping segregating active / available plugins.

        'plugin_type' is the id under which the plugin type was
        registered (it is resolved via '_getInterfaceFromName').

        o 'active' holds the ids currently active for the type.

        o 'available' holds the ids of the objects in the registry's
          parent which satisfy the interface but are not active.
        """
        interface = self._getInterfaceFromName(plugin_type)

        active = self._getPlugins(interface)
        container = aq_parent(aq_inner(self))
        available = [
            plugin_id
            for plugin_id, candidate in container.objectItems()
            if _satisfies(candidate, interface) and plugin_id not in active
        ]

        return {'active': active, 'available': available}

    @security.protected(ManageUsers)
    def removePluginById(self, plugin_id):
        """ Remove a plugin from any plugin types which have it configured.
        """
        for plugin_type in self._plugin_types:

            if plugin_id in self._getPlugins(plugin_type):
                self.deactivatePlugin(plugin_type, plugin_id)

    security.declareProtected(
        ManageUsers,  # noqa: D001
        'manage_plugins')
    manage_plugins = PageTemplateFile('plugins', _wwwdir)
    security.declareProtected(
        ManageUsers,  # noqa: D001
        'manage_active')
    manage_active = PageTemplateFile('active_plugins', _wwwdir)
    manage_twoLists = PageTemplateFile('two_lists', _wwwdir)

    manage_options = (({
        'label': 'Plugins',
        'action': 'manage_plugins'
    }, {
        'label': 'Active',
        'action': 'manage_active'
    }) + SimpleItem.manage_options)

    # The export / import views rely on the optional exportimport module,
    # so they are only defined when that import succeeded.
    if _HAS_GENERIC_SETUP:
        security.declareProtected(
            ManageUsers,  # noqa: D001
            'manage_exportImportForm')
        manage_exportImportForm = PageTemplateFile('export_import', _wwwdir)

        @security.protected(ManageUsers)
        def getConfigAsXML(self):
            """ Return XML representing the registry's configuration.
            """
            pre = PluginRegistryExporter(self).__of__(self)
            return pre.generateXML()

        @security.protected(ManageUsers)
        def manage_exportImport(self, updated_xml, should_purge, RESPONSE):
            """ Parse XML and update the registry.

            o Redirect back to the export / import form afterwards.
            """
            # encoding?
            _updatePluginRegistry(self, updated_xml, should_purge)
            RESPONSE.redirect('%s/manage_exportImportForm'
                              '?manage_tabs_message=Registry+updated.' %
                              self.absolute_url())

        @security.protected(ManageUsers)
        def manage_FTPget(self, REQUEST, RESPONSE):
            """ Return the registry's configuration as XML.
            """
            return self.getConfigAsXML()

        @security.protected(ManageUsers)
        def PUT(self, REQUEST, RESPONSE):
            """ Update the registry from the XML in the request body.

            o The update is always done with purging enabled.
            """
            xml = REQUEST['BODYFILE'].read()
            _updatePluginRegistry(self, xml, True)

        # Insert the extra tab right after 'Plugins' and 'Active'.
        manage_options = (manage_options[:2] +
                          ({
                              'label': 'Export / Import',
                              'action': 'manage_exportImportForm'
                          }, ) + manage_options[2:])

    #
    #   Helper methods
    #
    @security.private
    def _getPlugins(self, plugin_type):
        """ Return the tuple of active plugin ids for 'plugin_type'.

        o Raise KeyError if 'plugin_type' is not a known plugin type.

        o Create the underlying mapping, and an empty entry for the
          type, on first access.
        """
        if plugin_type not in self._plugin_types:
            raise KeyError(plugin_type)

        if self._plugins is None:
            self._plugins = PersistentMapping()

        return self._plugins.setdefault(plugin_type, ())

    @security.private
    def _getInterfaceFromName(self, plugin_type_name):
        """ Convert the string name to an interface.

        o The name is compared with the 'id' of each known plugin type.

        o Raise KeyError if no such interface is known, or if more than
          one plugin type uses the name.
        """
        found = [
            x[0] for x in self._plugin_type_info.items()
            if x[1]['id'] == plugin_type_name
        ]
        if not found:
            raise KeyError(plugin_type_name)

        if len(found) > 1:
            raise KeyError(f'Waaa!:  {plugin_type_name}')

        return found[0]
387

388

389
# Run AccessControl's ``default__class_init__`` (imported above as
# ``InitializeClass``) on the finished class.  Presumably this is what puts
# the ``security`` declarations from the class body into effect -- confirm
# against the AccessControl documentation.
InitializeClass(PluginRegistry)
1✔
390

391

392
def _satisfies(plugin, iface):
1✔
393
    checker = getattr(iface, 'providedBy', None)
1✔
394
    return checker(plugin)
1✔
395

396

397
def emptyPluginRegistry(ignored):
    """ Return empty registry, for filling from setup profile.

    o The argument is not used.
    """
    return PluginRegistry(())
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc