/usr/lib/python3/dist-packages/pyutilib/workflow/tasks.py is in python3-pyutilib 5.3.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#  _________________________________________________________________________
#
# PyUtilib: A Python utility library.
# Copyright (c) 2008 Sandia Corporation.
# This software is distributed under the BSD License.
# Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
# the U.S. Government retains certain rights in this software.
# _________________________________________________________________________
# Names exported by a star-import of this module. The task classes
# Selection_Task, Switch_Task and IfThen_Task are not listed here; each of
# them declares a name with alias() instead.
__all__ = ['TaskPlugin', 'TaskFactory', 'WorkflowPlugin']
from pyutilib.workflow import port
from pyutilib.workflow import task
from pyutilib.workflow import workflow
from pyutilib.component.core import *
class IWorkflowTask(Interface):
    """Plugin interface declared by the workflow task plugins in this module.

    The interface defines no members of its own; TaskPlugin and
    WorkflowPlugin declare it via implements(), and TaskFactory is created
    from it.
    """
# Plugin factory built from the IWorkflowTask interface.
# NOTE(review): presumably called with an alias() name (for example
# 'workflow.switch') to construct the matching task -- confirm against
# pyutilib.component.core.CreatePluginFactory.
TaskFactory = CreatePluginFactory(IWorkflowTask)
class TaskPlugin(Plugin, task.Task):
    """A workflow task that is also a plugin implementing IWorkflowTask.

    The class inherits from both Plugin and task.Task. Each base
    initializer is invoked explicitly with the same positional and keyword
    arguments, and the string representation is taken from task.Task.
    """

    implements(IWorkflowTask)

    def __init__(self, *args, **kwds):  #pragma:nocover
        """Initialize the Plugin base first, then the task.Task base."""
        # The tuple fixes the call order: Plugin, then task.Task.
        for base in (Plugin, task.Task):
            base.__init__(self, *args, **kwds)

    def __repr__(self):
        """Return the representation produced by task.Task.__repr__."""
        # Delegates explicitly instead of relying on method resolution order.
        return task.Task.__repr__(self)  #pragma:nocover
class WorkflowPlugin(Plugin, workflow.Workflow):
    """A workflow that is also a plugin implementing IWorkflowTask.

    The class inherits from both Plugin and workflow.Workflow. Each base
    initializer is invoked explicitly with the same positional and keyword
    arguments, and the string representation is taken from
    workflow.Workflow.
    """

    implements(IWorkflowTask)

    def __init__(self, *args, **kwds):  #pragma:nocover
        """Initialize the Plugin base first, then the workflow.Workflow base."""
        # The tuple fixes the call order: Plugin, then workflow.Workflow.
        for base in (Plugin, workflow.Workflow):
            base.__init__(self, *args, **kwds)

    def __repr__(self):  #pragma:nocover
        """Return the representation produced by workflow.Workflow.__repr__."""
        # Delegates explicitly instead of relying on method resolution order.
        return workflow.Workflow.__repr__(self)
class Selection_Task(TaskPlugin):
    """Task that picks one element out of its ``data`` input.

    Declares the alias 'workflow.selection', the inputs ``index`` and
    ``data``, and the output ``selection``.
    """

    alias('workflow.selection')

    def __init__(self, *args, **kwds):
        """Initialize the task and declare its input and output names."""
        TaskPlugin.__init__(self, *args, **kwds)
        # Inputs are declared in this order: 'index', then 'data'.
        for input_name in ('index', 'data'):
            self.inputs.declare(input_name)
        # The only output carries the selected element.
        self.outputs.declare('selection')

    def execute(self):
        """Assign ``data[index]`` to ``selection``."""
        container = self.data
        key = self.index
        self.selection = container[key]
class Switch_Task(TaskPlugin):
    """Task that readies the output control of the branch matching ``value``.

    Declares the alias 'workflow.switch' and a single input named
    ``value``. Branches are registered with add_branch(); execute() then
    compares ``value`` against every registered branch value.
    """

    alias('workflow.switch')

    def __init__(self, *args, **kwds):
        """Initialize the task with no branches and declare the ``value`` input."""
        TaskPlugin.__init__(self, *args, **kwds)
        # Maps each branch value to the name of the output control that was
        # declared for that branch.
        self._branches = {}
        self.inputs.declare('value')

    def add_branch(self, value, task):
        """Register ``task`` as the branch selected when the input equals ``value``.

        Parameters:
            value: Branch value; used as a dictionary key, so it must be
                hashable.
            task: Object providing ``id`` and ``input_controls``. Within this
                method the parameter shadows the module-level ``task`` import.

        An output control named 'task<task.id>' is declared on this task, an
        input control named 'task<self.id>' is declared on ``task``, and the
        latter is set to the former.
        """
        branch_name = 'task' + str(task.id)
        self._branches[value] = branch_name
        self.output_controls.declare(branch_name)
        source_name = 'task' + str(self.id)
        task.input_controls.declare(source_name)
        setattr(task.input_controls, source_name,
                self.output_controls[branch_name])

    def execute(self):
        """Ready the control of every matching branch and reset the others.

        Raises:
            ValueError: If ``value`` equals none of the registered branch
                values.
        """
        matched = False
        for key, control_name in self._branches.items():
            if self.value == key:
                self.output_controls[control_name].set_ready()
                matched = True
            else:
                self.output_controls[control_name].reset()
        if matched:
            return
        # Branch values need not be mutually orderable (for example None
        # next to a string). Under Python 3 a plain sorted() would then raise
        # TypeError and hide the ValueError below, so fall back to ordering
        # by repr() in that case only.
        try:
            valid = sorted(self._branches)
        except TypeError:
            valid = sorted(self._branches, key=repr)
        raise ValueError("Branch condition has value '%s' but no branch is indexed with that value.\n Valid branch indices: %s" % (str(self.value), valid))

    def __repr__(self):
        """Return the representation produced by TaskPlugin.__repr__."""
        return TaskPlugin.__repr__(self)  #pragma:nocover
class IfThen_Task(Switch_Task):
    """Switch_Task subclass that declares the alias 'workflow.branch'.

    It adds no behavior of its own: construction and representation are
    both forwarded to Switch_Task.
    """

    alias('workflow.branch')

    def __init__(self, *args, **kwds):
        """Forward all arguments to Switch_Task.__init__."""
        Switch_Task.__init__(self, *args, **kwds)

    def __repr__(self):
        """Return the representation produced by Switch_Task.__repr__."""
        text = Switch_Task.__repr__(self)  #pragma:nocover
        return text  #pragma:nocover
|