• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Products.PluginRegistry / 4062106322

pending completion
4062106322

push

github

GitHub
Merge pull request #25 from zopefoundation/config-with-zope-product-template-10622aef

88 of 112 branches covered (78.57%)

Branch coverage included in aggregate %.

6 of 6 new or added lines in 3 files covered. (100.0%)

802 of 852 relevant lines covered (94.13%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.21
/src/Products/PluginRegistry/PluginRegistry.py
1
##############################################################################
2
#
3
# Copyright (c) 2001 Zope Foundation and Contributors
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
7
# distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE
12
#
13
##############################################################################
14
""" Classes: PluginRegistry
1✔
15
"""
16
import logging
1✔
17
import os
1✔
18

19
from AccessControl import ClassSecurityInfo
1✔
20
from AccessControl.class_init import default__class_init__ as InitializeClass
1✔
21
from AccessControl.Permissions import manage_users as ManageUsers
1✔
22
from Acquisition import aq_inner
1✔
23
from Acquisition import aq_parent
1✔
24
from App.Common import package_home
1✔
25
from App.ImageFile import ImageFile
1✔
26
from OFS.interfaces import IWriteLock
1✔
27
from OFS.SimpleItem import SimpleItem
1✔
28
from Persistence import PersistentMapping
1✔
29
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
1✔
30
from zope.interface import implementer
1✔
31

32
from Products.PluginRegistry.interfaces import IPluginRegistry
1✔
33

34

35
try:
1✔
36
    from Products.PluginRegistry.exportimport import PluginRegistryExporter
1✔
37
    from Products.PluginRegistry.exportimport import _updatePluginRegistry
1✔
38
except ImportError:
39
    _HAS_GENERIC_SETUP = False
40
else:
41
    _HAS_GENERIC_SETUP = True
1✔
42

43
product_dir = package_home(globals())
1✔
44
product_prefix = os.path.split(product_dir)[0]
1✔
45

46
_wwwdir = os.path.join(product_dir, 'www')
1✔
47

48
logger = logging.getLogger('PluginRegistry')
1✔
49

50

51
@implementer(IPluginRegistry, IWriteLock)
1✔
52
class PluginRegistry(SimpleItem):
1✔
53

54
    """ Implement IPluginRegistry as an independent, ZMI-manageable object.
55

56
    o Each plugin type holds an ordered list of (id, wrapper) tuples.
57
    """
58

59
    security = ClassSecurityInfo()
1✔
60
    meta_type = 'Plugin Registry'
1✔
61
    zmi_icon = 'fas fa-plug'
1✔
62
    _plugins = None
1✔
63

64
    def __init__(self, plugin_type_info=()):
1✔
65

66
        if isinstance(plugin_type_info, str):
1!
67
            # some tool is passing us our ID.
68
            raise ValueError('Must pass a sequence of plugin info dicts!')
×
69

70
        self._plugin_types = [x[0] for x in plugin_type_info]
1✔
71
        self._plugin_type_info = PersistentMapping()
1✔
72
        for interface in plugin_type_info:
1✔
73
            self._plugin_type_info[interface[0]] = {
1✔
74
                  'id': interface[1],
75
                  'title': interface[2],
76
                  'description': interface[3]}
77

78
    #
79
    #   IPluginRegistry implementation
80
    #
81
    @security.protected(ManageUsers)
1✔
82
    def listPluginTypeInfo(self):
1✔
83

84
        """ See IPluginRegistry.
85
        """
86
        result = []
1✔
87

88
        for ptype in self._plugin_types:
1✔
89

90
            info = self._plugin_type_info[ptype].copy()
1✔
91
            info['interface'] = ptype
1✔
92
            info['methods'] = list(ptype.names())
1✔
93

94
            result.append(info)
1✔
95

96
        return result
1✔
97

98
    @security.protected(ManageUsers)
1✔
99
    def listPlugins(self, plugin_type):
1✔
100

101
        """ See IPluginRegistry.
102
        """
103
        result = []
1✔
104

105
        parent = aq_parent(aq_inner(self))
1✔
106

107
        for plugin_id in self._getPlugins(plugin_type):
1✔
108

109
            plugin = parent._getOb(plugin_id)
1✔
110
            if not _satisfies(plugin, plugin_type):
1✔
111
                logger.debug('Active plugin %s no longer implements %s' %
1✔
112
                             (plugin_id, plugin_type))
113
            else:
114
                result.append((plugin_id, plugin))
1✔
115

116
        return result
1✔
117

118
    @security.protected(ManageUsers)
1✔
119
    def getPluginInfo(self, plugin_type):
1✔
120

121
        """ See IPluginRegistry.
122
        """
123
        plugin_type = self._getInterfaceFromName(plugin_type)
×
124
        return self._plugin_type_info[plugin_type]
×
125

126
    @security.protected(ManageUsers)
1✔
127
    def listPluginIds(self, plugin_type):
1✔
128

129
        """ See IPluginRegistry.
130
        """
131

132
        return self._getPlugins(plugin_type)
1✔
133

134
    @security.protected(ManageUsers)
1✔
135
    def activatePlugin(self, plugin_type, plugin_id):
1✔
136

137
        """ See IPluginRegistry.
138
        """
139
        plugins = list(self._getPlugins(plugin_type))
1✔
140

141
        if plugin_id in plugins:
1!
142
            raise KeyError(f'Duplicate plugin id: {plugin_id}')
×
143

144
        parent = aq_parent(aq_inner(self))
1✔
145
        plugin = parent._getOb(plugin_id)
1✔
146

147
        if not _satisfies(plugin, plugin_type):
1✔
148
            raise ValueError(
1✔
149
                f'Plugin does not implement {plugin_type}')
150

151
        plugins.append(plugin_id)
1✔
152
        self._plugins[plugin_type] = tuple(plugins)
1✔
153

154
    @security.protected(ManageUsers)
1✔
155
    def deactivatePlugin(self, plugin_type, plugin_id):
1✔
156

157
        """ See IPluginRegistry.
158
        """
159
        plugins = list(self._getPlugins(plugin_type))
1✔
160

161
        if plugin_id not in plugins:
1!
162
            raise KeyError(f'Invalid plugin id: {plugin_id}')
×
163

164
        plugins = [x for x in plugins if x != plugin_id]
1✔
165
        self._plugins[plugin_type] = tuple(plugins)
1✔
166

167
    @security.protected(ManageUsers)
1✔
168
    def movePluginsTop(self, plugin_type, ids_to_move):
1✔
169

170
        """ See IPluginRegistry.
171
        """
172
        ids = list(self._getPlugins(plugin_type))
1✔
173
        indexes = list(map(ids.index, ids_to_move))
1✔
174
        indexes.sort()
1✔
175
        for i1 in indexes:
1✔
176
            ids.insert(0, ids.pop(i1))
1✔
177
        self._plugins[plugin_type] = tuple(ids)
1✔
178

179
    @security.protected(ManageUsers)
1✔
180
    def movePluginsUp(self, plugin_type, ids_to_move):
1✔
181

182
        """ See IPluginRegistry.
183
        """
184
        ids = list(self._getPlugins(plugin_type))
1✔
185
        count = len(ids)
1✔
186

187
        indexes = list(map(ids.index, ids_to_move))
1✔
188
        indexes.sort()
1✔
189

190
        for i1 in indexes:
1✔
191

192
            if i1 < 0 or i1 >= count:
1!
193
                raise IndexError(i1)
×
194

195
            i2 = i1 - 1
1✔
196
            if i2 < 0:
1✔
197
                # i1 is already on top
198
                continue
1✔
199

200
            ids[i2], ids[i1] = ids[i1], ids[i2]
1✔
201

202
        self._plugins[plugin_type] = tuple(ids)
1✔
203

204
    @security.protected(ManageUsers)
1✔
205
    def movePluginsDown(self, plugin_type, ids_to_move):
1✔
206

207
        """ See IPluginRegistry.
208
        """
209
        ids = list(self._getPlugins(plugin_type))
1✔
210
        count = len(ids)
1✔
211

212
        indexes = list(map(ids.index, ids_to_move))
1✔
213
        indexes.sort()
1✔
214
        indexes.reverse()
1✔
215

216
        for i1 in indexes:
1✔
217

218
            if i1 < 0 or i1 >= count:
1!
219
                raise IndexError(i1)
×
220

221
            i2 = i1 + 1
1✔
222
            if i2 == len(ids):
1✔
223
                # i1 is already on the bottom
224
                continue
1✔
225

226
            ids[i2], ids[i1] = ids[i1], ids[i2]
1✔
227

228
        self._plugins[plugin_type] = tuple(ids)
1✔
229

230
    #
231
    #   ZMI
232
    #
233
    arrow_right_gif = ImageFile('www/arrow-right.gif', globals())
1✔
234
    arrow_left_gif = ImageFile('www/arrow-left.gif', globals())
1✔
235
    arrow_up_gif = ImageFile('www/arrow-up.gif', globals())
1✔
236
    arrow_down_gif = ImageFile('www/arrow-down.gif', globals())
1✔
237

238
    @security.protected(ManageUsers)
1✔
239
    def manage_activatePlugins(self, plugin_type, plugin_ids, RESPONSE):
1✔
240
        """ Shim into ZMI.
241
        """
242
        interface = self._getInterfaceFromName(plugin_type)
×
243
        for id in plugin_ids:
×
244
            self.activatePlugin(interface, id)
×
245
        RESPONSE.redirect('%s/manage_plugins?plugin_type=%s' %
×
246
                          (self.absolute_url(), plugin_type))
247

248
    @security.protected(ManageUsers)
1✔
249
    def manage_deactivatePlugins(self, plugin_type, plugin_ids, RESPONSE):
1✔
250
        """ Shim into ZMI.
251
        """
252
        interface = self._getInterfaceFromName(plugin_type)
×
253
        for id in plugin_ids:
×
254
            self.deactivatePlugin(interface, id)
×
255

256
        RESPONSE.redirect('%s/manage_plugins?plugin_type=%s' %
×
257
                          (self.absolute_url(), plugin_type))
258

259
    @security.protected(ManageUsers)
1✔
260
    def manage_movePluginsUp(self, plugin_type, plugin_ids, RESPONSE):
1✔
261
        """ Shim into ZMI.
262
        """
263
        interface = self._getInterfaceFromName(plugin_type)
×
264
        self.movePluginsUp(interface, plugin_ids)
×
265

266
        RESPONSE.redirect('%s/manage_plugins?plugin_type=%s' %
×
267
                          (self.absolute_url(), plugin_type))
268

269
    @security.protected(ManageUsers)
1✔
270
    def manage_movePluginsDown(self, plugin_type, plugin_ids, RESPONSE):
1✔
271
        """ Shim into ZMI.
272
        """
273
        interface = self._getInterfaceFromName(plugin_type)
×
274
        self.movePluginsDown(interface, plugin_ids)
×
275

276
        RESPONSE.redirect('%s/manage_plugins?plugin_type=%s' %
×
277
                          (self.absolute_url(), plugin_type))
278

279
    @security.protected(ManageUsers)
1✔
280
    def getAllPlugins(self, plugin_type):
1✔
281

282
        """ Return a mapping segregating active / available plugins.
283

284
        'plugin_type' is the __name__ of the interface.
285
        """
286
        interface = self._getInterfaceFromName(plugin_type)
1✔
287

288
        active = self._getPlugins(interface)
1✔
289
        available = []
1✔
290

291
        for id, value in aq_parent(aq_inner(self)).objectItems():
1✔
292
            if _satisfies(value, interface):
1!
293
                if id not in active:
1✔
294
                    available.append(id)
1✔
295

296
        return {'active': active, 'available': available}
1✔
297

298
    @security.protected(ManageUsers)
1✔
299
    def removePluginById(self, plugin_id):
1✔
300

301
        """ Remove a plugin from any plugin types which have it configured.
302
        """
303
        for plugin_type in self._plugin_types:
1✔
304

305
            if plugin_id in self._getPlugins(plugin_type):
1!
306
                self.deactivatePlugin(plugin_type, plugin_id)
1✔
307

308
    security.declareProtected(ManageUsers,  # noqa: D001
1✔
309
                              'manage_plugins')
310
    manage_plugins = PageTemplateFile('plugins', _wwwdir)
1✔
311
    security.declareProtected(ManageUsers,  # noqa: D001
1✔
312
                              'manage_active')
313
    manage_active = PageTemplateFile('active_plugins', _wwwdir)
1✔
314
    manage_twoLists = PageTemplateFile('two_lists', _wwwdir)
1✔
315

316
    manage_options = (({'label': 'Plugins', 'action': 'manage_plugins'},
1✔
317
                       {'label': 'Active', 'action': 'manage_active'})
318
                      + SimpleItem.manage_options)
319

320
    if _HAS_GENERIC_SETUP:
1!
321
        security.declareProtected(ManageUsers,  # noqa: D001
1✔
322
                                  'manage_exportImportForm')
323
        manage_exportImportForm = PageTemplateFile('export_import', _wwwdir)
1✔
324

325
        @security.protected(ManageUsers)
1✔
326
        def getConfigAsXML(self):
1✔
327
            """ Return XML representing the registry's configuration.
328
            """
329
            pre = PluginRegistryExporter(self).__of__(self)
×
330
            return pre.generateXML()
×
331

332
        @security.protected(ManageUsers)
1✔
333
        def manage_exportImport(self, updated_xml, should_purge, RESPONSE):
1✔
334
            """ Parse XML and update the registry.
335
            """
336
            # encoding?
337
            _updatePluginRegistry(self, updated_xml, should_purge)
×
338
            RESPONSE.redirect('%s/manage_exportImportForm'
×
339
                              '?manage_tabs_message=Registry+updated.' %
340
                              self.absolute_url())
341

342
        @security.protected(ManageUsers)
1✔
343
        def manage_FTPget(self, REQUEST, RESPONSE):
1✔
344
            """
345
            """
346
            return self.getConfigAsXML()
×
347

348
        @security.protected(ManageUsers)
1✔
349
        def PUT(self, REQUEST, RESPONSE):
1✔
350
            """
351
            """
352
            xml = REQUEST['BODYFILE'].read()
×
353
            _updatePluginRegistry(self, xml, True)
×
354

355
        manage_options = (manage_options[:2]
1✔
356
                          + ({'label': 'Export / Import',
357
                              'action': 'manage_exportImportForm'},)
358
                          + manage_options[2:])
359

360
    #
361
    #   Helper methods
362
    #
363
    @security.private
1✔
364
    def _getPlugins(self, plugin_type):
1✔
365

366
        if plugin_type not in self._plugin_types:
1✔
367
            raise KeyError(plugin_type)
1✔
368

369
        if self._plugins is None:
1✔
370
            self._plugins = PersistentMapping()
1✔
371

372
        return self._plugins.setdefault(plugin_type, ())
1✔
373

374
    @security.private
1✔
375
    def _getInterfaceFromName(self, plugin_type_name):
1✔
376

377
        """ Convert the string name to an interface.
378

379
        o Raise KeyError if no such interface is known.
380
        """
381
        found = [x[0] for x in self._plugin_type_info.items()
1✔
382
                 if x[1]['id'] == plugin_type_name]
383
        if not found:
1!
384
            raise KeyError(plugin_type_name)
×
385

386
        if len(found) > 1:
1!
387
            raise KeyError(f'Waaa!:  {plugin_type_name}')
×
388

389
        return found[0]
1✔
390

391

392
InitializeClass(PluginRegistry)
1✔
393

394

395
def _satisfies(plugin, iface):
1✔
396
    checker = getattr(iface, 'providedBy', None)
1✔
397
    return checker(plugin)
1✔
398

399

400
def emptyPluginRegistry(ignored):
1✔
401
    """ Return empty registry, for filling from setup profile.
402
    """
403
    return PluginRegistry(())
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc