• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Products.CMFCore / 6246931310

20 Sep 2023 09:54AM UTC coverage: 86.008% (-0.3%) from 86.266%
6246931310

Pull #131

github

mauritsvanrees
gha: don't need setup-python on 27 as we use the 27 container.
Pull Request #131: Make decodeFolderFilter and encodeFolderFilter non-public.

2466 of 3689 branches covered (0.0%)

Branch coverage included in aggregate %.

6 of 6 new or added lines in 1 file covered. (100.0%)

17297 of 19289 relevant lines covered (89.67%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.8
/src/Products/CMFCore/WorkflowTool.py
1
##############################################################################
2
#
3
# Copyright (c) 2001 Zope Foundation and Contributors.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE.
11
#
12
##############################################################################
13
""" Basic workflow tool.
1✔
14
"""
15

16
import sys
1✔
17

18
import six
1✔
19

20
from AccessControl.class_init import InitializeClass
1✔
21
from AccessControl.requestmethod import postonly
1✔
22
from AccessControl.SecurityInfo import ClassSecurityInfo
1✔
23
from Acquisition import aq_base
1✔
24
from Acquisition import aq_inner
1✔
25
from Acquisition import aq_parent
1✔
26
from App.special_dtml import DTMLFile
1✔
27
from OFS.Folder import Folder
1✔
28
from OFS.ObjectManager import IFAwareObjectManager
1✔
29
from Persistence import PersistentMapping
1✔
30
from zope.component import adapter
1✔
31
from zope.component import adapts
1✔
32
from zope.component import getMultiAdapter
1✔
33
from zope.component import queryMultiAdapter
1✔
34
from zope.component import queryUtility
1✔
35
from zope.event import notify
1✔
36
from zope.interface import implementer
1✔
37

38
from .ActionProviderBase import ActionProviderBase
1✔
39
from .interfaces import IConfigurableWorkflowTool
1✔
40
from .interfaces import ITypesTool
1✔
41
from .interfaces import IWorkflowAware
1✔
42
from .interfaces import IWorkflowDefinition
1✔
43
from .interfaces import IWorkflowHistory
1✔
44
from .interfaces import IWorkflowStatus
1✔
45
from .interfaces import IWorkflowTool
1✔
46
from .permissions import ManagePortal
1✔
47
from .utils import Message as _
1✔
48
from .utils import UniqueObject
1✔
49
from .utils import _dtmldir
1✔
50
from .utils import registerToolInterface
1✔
51
from .WorkflowCore import ActionRaisedExceptionEvent
1✔
52
from .WorkflowCore import ActionSucceededEvent
1✔
53
from .WorkflowCore import ActionWillBeInvokedEvent
1✔
54
from .WorkflowCore import ObjectDeleted
1✔
55
from .WorkflowCore import ObjectMoved
1✔
56
from .WorkflowCore import WorkflowException
1✔
57

58

59
_marker = []  # Create a new marker object.
1✔
60

61

62
@implementer(IConfigurableWorkflowTool, IWorkflowTool)
1✔
63
class WorkflowTool(UniqueObject, IFAwareObjectManager, Folder,
1✔
64
                   ActionProviderBase):
65

66
    """ Mediator tool, mapping workflow objects
67
    """
68

69
    id = 'portal_workflow'
1✔
70
    meta_type = 'CMF Workflow Tool'
1✔
71
    _product_interfaces = (IWorkflowDefinition,)
1✔
72

73
    _chains_by_type = None  # PersistentMapping
1✔
74
    _default_chain = ('default_workflow',)
1✔
75
    _default_cataloging = 1
1✔
76

77
    security = ClassSecurityInfo()
1✔
78

79
    manage_options = (
1✔
80
        ({'label': 'Workflows', 'action': 'manage_selectWorkflows'},
81
         {'label': 'Overview', 'action': 'manage_overview'}) +
82
        Folder.manage_options)
83

84
    #
85
    #   ZMI methods
86
    #
87
    security.declareProtected(ManagePortal, 'manage_overview')
1✔
88
    manage_overview = DTMLFile('explainWorkflowTool', _dtmldir)
1✔
89

90
    _manage_selectWorkflows = DTMLFile('selectWorkflows', _dtmldir)
1✔
91

92
    @security.protected(ManagePortal)
    def manage_selectWorkflows(self, REQUEST, manage_tabs_message=None):
        """ Render the ZMI screen that maps portal types to workflow chains.

        o One row is built per type info returned by '_listTypeInfo'.

        o A type without an entry in '_chains_by_type' is shown with the
          chain '(Default)'.
        """
        overrides = self._chains_by_type
        rows = []
        for type_info in self._listTypeInfo():
            type_id = type_info.getId()
            type_title = type_info.Title()
            # A title that merely repeats the id is passed on as None.
            if type_title == type_id:
                type_title = None
            if overrides is not None and type_id in overrides:
                chain_label = ', '.join(overrides[type_id])
            else:
                chain_label = '(Default)'
            rows.append(
                {'id': type_id, 'title': type_title, 'chain': chain_label})
        default_label = ', '.join(self._default_chain)
        return self._manage_selectWorkflows(
            REQUEST,
            default_chain=default_label,
            types_info=rows,
            management_view='Workflows',
            manage_tabs_message=manage_tabs_message)
118

119
    @security.protected(ManagePortal)
    @postonly
    def manage_changeWorkflows(self, default_chain, props=None, REQUEST=None):
        """ Store the per-type workflow chains and the default chain.

        o 'props' (falling back to 'REQUEST') is read for one
          'chain_<type id>' entry per known portal type; the value
          '(Default)' removes any override for that type.

        o Chains are workflow ids separated by commas and/or spaces.  An id
          for which 'getWorkflowById' returns a false value raises
          ValueError.

        o When 'REQUEST' is given, the workflow selection screen is
          returned.
        """
        if props is None:
            props = REQUEST
        overrides = self._chains_by_type
        if overrides is None:
            # No overrides stored yet: create the mapping on first use.
            overrides = PersistentMapping()
            self._chains_by_type = overrides
        type_infos = self._listTypeInfo()

        # Per-type chains.
        if props is not None:
            for type_info in type_infos:
                type_id = type_info.getId()
                raw = props.get('chain_%s' % type_id, '(Default)').strip()
                if raw == '(Default)':
                    # Fall back to the default chain for this type.
                    if type_id in overrides:
                        del overrides[type_id]
                    continue
                checked = []
                for wf_id in raw.replace(',', ' ').split(' '):
                    if not wf_id:
                        continue
                    if not self.getWorkflowById(wf_id):
                        raise ValueError('"%s" is not a workflow ID.'
                                         % wf_id)
                    checked.append(wf_id)
                overrides[type_id] = tuple(checked)

        # Default chain.
        checked = []
        for wf_id in default_chain.replace(',', ' ').split(' '):
            if not wf_id:
                continue
            if not self.getWorkflowById(wf_id):
                raise ValueError('"%s" is not a workflow ID.' % wf_id)
            checked.append(wf_id)
        self._default_chain = tuple(checked)

        if REQUEST is not None:
            return self.manage_selectWorkflows(REQUEST,
                                               manage_tabs_message='Changed.')
162

163
    #
164
    #   'IActionProvider' interface methods
165
    #
166
    @security.private
    def listActions(self, info=None, object=None):
        """ Collect the actions offered by the workflows.

        o Workflows in the chain of 'info.object' contribute their object
          actions and their global actions.

        o All remaining workflows of this tool contribute their global
          actions only.

        o 'info' is rebuilt through '_getOAI' whenever 'object' is given or
          'info' is missing.
        """
        if not (object is None and info is not None):
            info = self._getOAI(object)

        collected = []
        handled = set()

        for wf_id in self.getChainFor(info.object):
            handled.add(wf_id)
            workflow = self.getWorkflowById(wf_id)
            if workflow is None:
                continue
            object_actions = workflow.listObjectActions(info)
            if object_actions is not None:
                collected.extend(object_actions)
            global_actions = workflow.listGlobalActions(info)
            if global_actions is not None:
                collected.extend(global_actions)

        for wf_id in self.getWorkflowIds():
            if wf_id in handled:
                continue
            workflow = self.getWorkflowById(wf_id)
            if workflow is None:
                continue
            global_actions = workflow.listGlobalActions(info)
            if global_actions is not None:
                collected.extend(global_actions)

        return collected
×
206

207
    #
208
    #   'IWorkflowTool' interface methods
209
    #
210
    @security.private
    def getCatalogVariablesFor(self, ob):
        """ Merge the catalog variables of all workflows applying to 'ob'.

        Returns None when 'getWorkflowsFor' returns None, otherwise a dict.
        """
        workflows = self.getWorkflowsFor(ob)
        if workflows is None:
            return None
        # Walk the chain back to front: values written later win, so the
        # workflows listed first in the chain override the ones after them.
        workflows.reverse()
        merged = {}
        for workflow in workflows:
            variables = workflow.getCatalogVariablesFor(ob)
            if variables is None:
                continue
            merged.update(variables)
        return merged
1✔
226

227
    @security.public
    def doActionFor(self, ob, action, wf_id=None, *args, **kw):
        """ Run the workflow action 'action' on 'ob'.

        o With 'wf_id', that workflow is used; WorkflowException is raised
          if it cannot be found.

        o Without 'wf_id', the first workflow of the object's chain that
          supports the action is used; WorkflowException is raised if the
          chain is empty or no workflow supports the action.
        """
        candidates = self.getWorkflowsFor(ob)
        if candidates is None:
            candidates = ()

        if wf_id is not None:
            wf = self.getWorkflowById(wf_id)
            if wf is None:
                raise WorkflowException(
                    _(u'Requested workflow definition not found.'))
        else:
            if not candidates:
                raise WorkflowException(_(u'No workflows found.'))
            for wf in candidates:
                if wf.isActionSupported(ob, action, **kw):
                    break
            else:
                # The loop ran to completion: nobody supports the action.
                msg = _(u"No workflow provides the '${action_id}' action.",
                        mapping={'action_id': action})
                raise WorkflowException(msg)

        call_args = (ob, action) + args
        return self._invokeWithNotification(
            candidates, ob, action, wf.doActionFor, call_args, kw)
253

254
    @security.public
    def getInfoFor(self, ob, name, default=_marker, wf_id=None, *args, **kw):
        """ Look up the workflow information 'name' for 'ob'.

        o With 'wf_id', that workflow is asked; without it, the first
          workflow of the object's chain that supports 'name' is asked.

        o If no workflow can be determined, 'default' is returned when it
          was passed, otherwise WorkflowException is raised.
        """
        if wf_id is not None:
            wf = self.getWorkflowById(wf_id)
            if wf is None:
                if default is not _marker:
                    return default
                raise WorkflowException(
                    _(u'Requested workflow definition not found.'))
        else:
            candidates = self.getWorkflowsFor(ob)
            if candidates is None:
                if default is not _marker:
                    return default
                raise WorkflowException(_(u'No workflows found.'))
            for wf in candidates:
                if wf.isInfoSupported(ob, name):
                    break
            else:
                # No workflow in the chain knows about 'name'.
                if default is not _marker:
                    return default
                msg = _(u"No workflow provides '${name}' information.",
                        mapping={'name': name})
                raise WorkflowException(msg)

        value = wf.getInfoFor(ob, name, default, *args, **kw)
        if value is _marker:
            msg = _(u'Could not get info: ${name}', mapping={'name': name})
            raise WorkflowException(msg)
        return value
1✔
290

291
    @security.private
    def notifyCreated(self, ob):
        """ Tell the workflows of 'ob' that the object was created.

        Workflows for which 'getHistoryOf' already returns a non-empty
        history are skipped.  The workflow variables are reindexed
        afterwards.
        """
        for workflow in self.getWorkflowsFor(ob):
            if not self.getHistoryOf(workflow.getId(), ob):
                workflow.notifyCreated(ob)
        self._reindexWorkflowVariables(ob)
1✔
301

302
    @security.private
    def notifyBefore(self, ob, action):
        """ Announce an upcoming action to every workflow of 'ob'.

        Each workflow is notified directly and an
        'ActionWillBeInvokedEvent' is sent for it.
        """
        for workflow in self.getWorkflowsFor(ob):
            workflow.notifyBefore(ob, action)
            event = ActionWillBeInvokedEvent(ob, workflow, action)
            notify(event)
1✔
310

311
    @security.private
    def notifySuccess(self, ob, action, result=None):
        """ Announce a completed action to every workflow of 'ob'.

        Each workflow is notified directly and an 'ActionSucceededEvent'
        is sent for it.
        """
        for workflow in self.getWorkflowsFor(ob):
            workflow.notifySuccess(ob, action, result)
            event = ActionSucceededEvent(ob, workflow, action, result)
            notify(event)
1✔
319

320
    @security.private
    def notifyException(self, ob, action, exc):
        """ Announce a failed action to every workflow of 'ob'.

        Each workflow is notified directly and an
        'ActionRaisedExceptionEvent' is sent for it.
        """
        for workflow in self.getWorkflowsFor(ob):
            workflow.notifyException(ob, action, exc)
            event = ActionRaisedExceptionEvent(ob, workflow, action, exc)
            notify(event)
1✔
328

329
    @security.private
    def getHistoryOf(self, wf_id, ob):
        """ Return the history of 'ob' for the workflow 'wf_id'.

        The history is looked up as an 'IWorkflowHistory' multi-adapter of
        the object and the workflow; '()' is returned if there is none.
        """
        workflow = self.getWorkflowById(wf_id)
        adapted = (ob, workflow)
        return queryMultiAdapter(adapted, IWorkflowHistory, default=())
1✔
335

336
    @security.private
    def getStatusOf(self, wf_id, ob):
        """ Return the current status of 'ob' for the workflow 'wf_id'.

        The value comes from the 'IWorkflowStatus' multi-adapter of the
        object and the workflow; None is returned if no adapter is found.
        """
        workflow = self.getWorkflowById(wf_id)
        status = queryMultiAdapter((ob, workflow), IWorkflowStatus,
                                   default=None)
        if status is None:
            return None
        return status.get()
×
345

346
    @security.private
    def setStatusOf(self, wf_id, ob, status):
        """ Record 'status' for 'ob' in the workflow 'wf_id'.

        The status is handed to the 'IWorkflowStatus' multi-adapter of the
        object and the workflow.
        """
        workflow = self.getWorkflowById(wf_id)
        getMultiAdapter((ob, workflow), IWorkflowStatus).set(status)
1✔
353

354
    #
355
    #   'IConfigurableWorkflowTool' interface methods
356
    #
357
    @security.protected(ManagePortal)
    @postonly
    def setDefaultChain(self, default_chain, REQUEST=None):
        """ Replace the default chain of this tool.

        'default_chain' is a string of workflow ids separated by commas
        and/or spaces.  An id for which 'getWorkflowById' returns a false
        value raises ValueError.
        """
        pieces = default_chain.replace(',', ' ').split(' ')
        # Consecutive separators produce empty strings; drop them.
        wf_ids = [piece for piece in pieces if piece]
        for wf_id in wf_ids:
            if not self.getWorkflowById(wf_id):
                raise ValueError('"%s" is not a workflow ID.' % wf_id)

        self._default_chain = tuple(wf_ids)
1✔
371

372
    @security.protected(ManagePortal)
    @postonly
    def setChainForPortalTypes(self, pt_names, chain, verify=True,
                               REQUEST=None):
        """ Assign 'chain' to every portal type named in 'pt_names'.

        o 'chain' may be a sequence of workflow ids, a comma-separated
          string of ids, or None / '(Default)' to remove the override.

        o With 'verify' true, names that are not ids of a known type info
          are silently skipped.
        """
        overrides = self._chains_by_type
        if overrides is None:
            overrides = PersistentMapping()
            self._chains_by_type = overrides

        if isinstance(chain, six.string_types):
            if chain != '(Default)':
                chain = [part.strip() for part in chain.split(',')
                         if part.strip()]
            else:
                chain = None

        if chain is None:
            # Remove the overrides so the default chain applies again.
            for type_id in pt_names:
                if type_id in overrides:
                    del overrides[type_id]
            return

        known_ids = [type_info.getId() for type_info in self._listTypeInfo()]

        for type_id in pt_names:
            if not verify or type_id in known_ids:
                overrides[type_id] = tuple(chain)
1✔
400

401
    @security.private
    def getDefaultChain(self):
        """ Return the chain stored as this tool's default.
        """
        chain = self._default_chain
        return chain
1✔
406

407
    @security.private
    def listChainOverrides(self):
        """ Return the per-type chain overrides as sorted (type, chain) pairs.

        An empty tuple is returned when no overrides are stored.
        """
        overrides = self._chains_by_type
        if not overrides:
            return ()
        return sorted(overrides.items())
1✔
413

414
    @security.private
    def getChainFor(self, ob):
        """ Return the workflow chain for 'ob'.

        o 'ob' may be a portal type name or an object providing
          'getPortalTypeName'; anything else gets an empty chain.

        o A type without an override (or with an override of None) gets
          the default chain.
        """
        overrides = self._chains_by_type
        if isinstance(ob, six.string_types):
            portal_type = ob
        elif hasattr(aq_base(ob), 'getPortalTypeName'):
            portal_type = ob.getPortalTypeName()
        else:
            return ()

        if portal_type is None:
            return ()

        if overrides is None:
            chain = None
        else:
            chain = overrides.get(portal_type, None)

        if chain is None:
            return self.getDefaultChain()
        return chain
1✔
437

438
    #
439
    #   Other methods
440
    #
441
    @security.protected(ManagePortal)
    @postonly
    def updateRoleMappings(self, REQUEST=None):
        """ Let the workflows update the role-permission mappings.

        The update starts at the parent of this tool and recurses.  Without
        'REQUEST' the number of updated objects is returned; with it, the
        workflow selection screen is rendered with that number as message.
        """
        updaters = {}
        for wf_id in self.objectIds():
            workflow = self.getWorkflowById(wf_id)
            # Only workflows that can update role mappings take part.
            if hasattr(aq_base(workflow), 'updateRoleMappingsFor'):
                updaters[wf_id] = workflow

        portal = aq_parent(aq_inner(self))
        count = self._recursiveUpdateRoleMappings(portal, updaters)

        if REQUEST is None:
            return count
        msg = '%d object(s) updated.' % count
        return self.manage_selectWorkflows(REQUEST,
                                           manage_tabs_message=msg)
×
459

460
    @security.private
    def getWorkflowById(self, wf_id):
        """ Return the workflow named 'wf_id'.

        None is returned unless the attribute of that name provides
        'IWorkflowDefinition'.
        """
        candidate = getattr(self, wf_id, None)
        if not IWorkflowDefinition.providedBy(candidate):
            return None
        return candidate
1✔
469

470
    @security.private
    def getDefaultChainFor(self, ob):
        """ Return the default chain if the types tool knows 'ob'.

        An empty tuple is returned when no types tool is registered or it
        has no type info for 'ob'.
        """
        types_tool = queryUtility(ITypesTool)
        if types_tool is None or types_tool.getTypeInfo(ob) is None:
            return ()
        return self._default_chain
1✔
478

479
    @security.private
    def getWorkflowIds(self):
        """ Return the ids of all contained workflows as a tuple.

        Only items providing 'IWorkflowDefinition' are included.
        """
        return tuple(
            name for name, item in self.objectItems()
            if IWorkflowDefinition.providedBy(item))
1✔
488

489
    @security.protected(ManagePortal)
    def getWorkflowsFor(self, ob):
        """ Return the workflow objects in the chain of 'ob' as a list.

        Chain entries for which 'getWorkflowById' returns None are left
        out.
        """
        resolved = (self.getWorkflowById(wf_id)
                    for wf_id in self.getChainFor(ob))
        return [workflow for workflow in resolved if workflow is not None]
1✔
500

501
    #
502
    #   Helper methods
503
    #
504
    @security.private
    def _listTypeInfo(self):
        """ Return the type infos of the types tool.

        An empty tuple is returned when no types tool is registered.
        """
        types_tool = queryUtility(ITypesTool)
        if types_tool is None:
            return ()
        return types_tool.listTypeInfo()
1✔
512

513
    @security.private
    def _invokeWithNotification(self, wfs, ob, action, func, args, kw):
        """ Call 'func' surrounded by workflow notifications.

        o Before the call, every workflow in 'wfs' gets 'notifyBefore' and
          an 'ActionWillBeInvokedEvent' is sent for it.

        o 'ObjectDeleted' and 'ObjectMoved' raised by 'func' count as
          success: the result is taken from the exception.  A deleted
          object is not reindexed; a moved object is replaced by the new
          object for the success notifications and the reindexing.

        o Any other exception is reported to every workflow through
          'notifyException' and an 'ActionRaisedExceptionEvent', and is
          then re-raised unchanged.

        o On success, every workflow gets 'notifySuccess' and an
          'ActionSucceededEvent' is sent for it; the result is returned.
        """
        reindex = True
        for w in wfs:
            w.notifyBefore(ob, action)
            notify(ActionWillBeInvokedEvent(ob, w, action))
        try:
            res = func(*args, **kw)
        except ObjectDeleted as ex:
            res = ex.getResult()
            reindex = False
        except ObjectMoved as ex:
            res = ex.getResult()
            ob = ex.getNewObject()
        except Exception:
            exc = sys.exc_info()
            try:
                for w in wfs:
                    w.notifyException(ob, action, exc)
                    notify(ActionRaisedExceptionEvent(ob, w, action, exc))
                # Re-raise the exception being handled as-is.  Building a
                # new instance from the class and the old instance would
                # change its arguments and fails for exception classes
                # whose constructor does not take one positional argument.
                raise
            finally:
                # Do not keep the traceback alive through this frame.
                exc = None
        for w in wfs:
            w.notifySuccess(ob, action, res)
            notify(ActionSucceededEvent(ob, w, action, res))
        if reindex:
            self._reindexWorkflowVariables(ob)
        return res
1✔
546

547
    @security.private
    def _recursiveUpdateRoleMappings(self, ob, wfs):
        """ Update the role-permission mappings of 'ob' and its subobjects.

        'wfs' maps workflow ids to workflow objects.  Returns the number
        of objects for which at least one workflow reported a change.
        """
        updated = 0
        chain = self.getChainFor(ob)
        if chain:
            touched = False
            for wf_id in chain:
                workflow = wfs.get(wf_id, None)
                if workflow is None:
                    continue
                # Every workflow in the chain must run, so no early exit.
                if workflow.updateRoleMappingsFor(ob):
                    touched = True
            if touched:
                updated += 1
                if hasattr(aq_base(ob), 'reindexObject'):
                    # Reindex security-related indexes
                    try:
                        ob.reindexObject(idxs=['allowedRolesAndUsers'])
                    except TypeError:
                        # Catch attempts to reindex portal_catalog.
                        pass

        if hasattr(aq_base(ob), 'objectItems'):
            children = ob.objectItems()
            if children:
                for _name, child in children:
                    # Remember the persistence state before descending.
                    state = getattr(child, '_p_changed', 0)
                    updated += self._recursiveUpdateRoleMappings(child, wfs)
                    if state is None:
                        # Re-ghostify.
                        child._p_deactivate()
        return updated
×
582

583
    @security.private
    def _setDefaultCataloging(self, value):
        """ Switch catalog updates by '_reindexWorkflowVariables' on or off.

        'value' is stored as a boolean in '_default_cataloging'; while it
        is false, '_reindexWorkflowVariables' returns without reindexing.
        """
        enabled = bool(value)
        self._default_cataloging = enabled
×
591

592
    @security.private
    def _reindexWorkflowVariables(self, ob):
        """ Reindex the workflow variables and the security of 'ob'.

        Nothing happens while '_default_cataloging' is false.
        """
        if not self._default_cataloging:
            return

        unwrapped = aq_base(ob)

        if hasattr(unwrapped, 'reindexObject'):
            # XXX We only need the keys here, no need to compute values.
            variables = self.getCatalogVariablesFor(ob) or {}
            ob.reindexObject(idxs=list(variables))

        # Reindex security of subobjects.
        if hasattr(unwrapped, 'reindexObjectSecurity'):
            ob.reindexObjectSecurity()
×
611

612

613
InitializeClass(WorkflowTool)
1✔
614
registerToolInterface('portal_workflow', IWorkflowTool)
1✔
615

616

617
@implementer(IWorkflowStatus)
@adapter(IWorkflowAware, IWorkflowDefinition)
class DefaultWorkflowStatus(object):
    """ Workflow status adapter backed by 'workflow_history'.

    The status records are kept on the context in a mapping named
    'workflow_history', keyed by workflow id.
    """

    def __init__(self, context, workflow):
        # Work on the unwrapped object.
        self.context = aq_base(context)
        self.wf_id = workflow.getId()

    def get(self):
        """ Return the last record stored for the workflow, or None.
        """
        history = getattr(self.context, 'workflow_history', {})
        records = history.get(self.wf_id)
        if records:
            return records[-1]
        return None

    def set(self, status):
        """ Append 'status' to the records stored for the workflow.

        The 'workflow_history' mapping is created on the context if it
        does not exist yet.
        """
        history = getattr(self.context, 'workflow_history', None)
        if history is None:
            history = self.context.workflow_history = PersistentMapping()
        # The records are stored as a tuple, so assign a new tuple.
        records = list(history.get(self.wf_id, ()))
        records.append(status)
        history[self.wf_id] = tuple(records)
1✔
639

640

641
@implementer(IWorkflowHistory)
@adapter(IWorkflowAware, IWorkflowDefinition)
def default_workflow_history(context, workflow):
    """ Return the records stored on 'context' for 'workflow'.

    The records are read from the 'workflow_history' mapping of the
    unwrapped context; '()' is returned if there are none.
    """
    unwrapped = aq_base(context)
    histories = getattr(unwrapped, 'workflow_history', {})
    wf_id = workflow.getId()
    return histories.get(wf_id, ())
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc