/usr/lib/python3/dist-packages/pyutilib/workflow/workflow.py is in python3-pyutilib 5.3.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# _________________________________________________________________________
#
# PyUtilib: A Python utility library.
# Copyright (c) 2008 Sandia Corporation.
# This software is distributed under the BSD License.
# Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
# the U.S. Government retains certain rights in this software.
# _________________________________________________________________________
# Q: When passing options, these values do not initialize the startup task.
# Is this a bug?
# TODO: only set option values for variables that show up in a workflow's inputs
# TODO: add graceful management of exceptions
# show the task tree, etc...
__all__ = ['Workflow']

import argparse
from collections import deque

from pyutilib.workflow.task import Task, EmptyTask, NoTask
from pyutilib.misc import Options

# OrderedDict lives in collections on Python >= 2.7; fall back to the
# third-party 'ordereddict' backport on older interpreters.  Catch only
# ImportError: a bare except would also swallow KeyboardInterrupt and
# unrelated failures.
try:
    from collections import OrderedDict
except ImportError:
    from ordereddict import OrderedDict
def _collect_parser_groups(t):
for key in t._parser_group:
#
# NOTE: we are changing the properties of the group
# instances here. This is OK _only_ because we are
# printing the help info and then terminating.
#
# FIXME: The 'parser' object is not defined in the lines below
raise NotImplementedError("")
#t._parser_group[key].parser = parser
#parser.add_argument_group(t._parser_group[key])
def _set_arguments(t):
for arg in t._parser_arg:
args = arg[0]
kwargs = arg[1]
try:
t._parser.add_argument(*args, **kwargs)
except argparse.ArgumentError:
pass
class Workflow(Task):
    """A task that manages and executes a network of interconnected tasks.

    Tasks added to the workflow are linked through their input/output
    ports.  Task inputs not fed by another task become workflow inputs,
    driven by an internal start task; task outputs with no outgoing
    connections become workflow outputs, gathered by an internal final
    task.
    """

    def __init__(self, id=None, name=None, parser=None):
        """Create an empty workflow containing only the start/final
        sentinel tasks.

        NOTE(review): ``parser`` is accepted but ``Task.__init__`` is
        called with ``parser=None``, so the argument is effectively
        ignored -- confirm whether this is intentional.
        """
        Task.__init__(self, id=id, name=name, parser=None)
        # Map of task id -> task for every task in this workflow.
        self._tasks = {}
        # Sentinels that feed workflow inputs / collect workflow outputs.
        self._start_task = EmptyTask()
        self._final_task = EmptyTask()
        self.add(self._start_task)
        self.add(self._final_task)

    def add(self, task, loadall=True):
        """Add *task* to the workflow.

        When *loadall* is True (the default), recursively add every task
        reachable from *task* through its input/output connections, and
        wire unconnected ports to the start/final sentinel tasks.
        """
        if self.debug:
            print("ADDING", task.id)  #pragma:nocover
        if task.id == NoTask.id:
            return
        if task.id in self._tasks:
            # Already registered -- avoid infinite recursion on cycles.
            return
        self._tasks[task.id] = task
        if not loadall:
            return
        #
        # Pull in upstream tasks; expose unconnected, non-constant inputs
        # as workflow inputs fed by the start task.
        #
        for name in task.inputs:
            for t in task.inputs[name].from_tasks():
                self.add(t)
            if len(task.inputs[name].from_tasks()) > 0:
                continue
            if not task.inputs[name].constant:
                #
                # Constant input values are not added to the start task
                #
                if not name in self._start_task.outputs:
                    self._start_task.outputs.declare(name)
                    self.inputs.declare(name, optional=True)
                # TODO: this is a bit of a hack...
                # Preserve the input's current value while rewiring it to
                # the start task's matching output.
                val = getattr(task.inputs, name).get_value()
                try:
                    setattr(task.inputs, name,
                            getattr(self._start_task.outputs, name))
                except ValueError:
                    # TBD: when do we get this exception?
                    pass
                getattr(self.inputs, name).set_value(val)
        #
        # Pull in tasks downstream of this task's output controls.
        #
        for name in task.output_controls:
            for c in task.output_controls[name].output_connections:
                self.add(c.to_port.task())
        #
        # Pull in downstream tasks; expose unconnected outputs as
        # workflow outputs gathered by the final task.
        #
        for name in task.outputs:
            if len(task.outputs[name].output_connections) > 0:
                for c in task.outputs[name].output_connections:
                    self.add(c.to_port.task())
            else:
                if name in self._final_task.inputs:
                    raise ValueError("Cannot declare a workplan with multiple output values that share the same name: %s" % name)
                self.outputs.declare(name)
                self._final_task.inputs.declare(name)
                setattr(self._final_task.inputs, name, task.outputs[name])

    def _call_init(self, *options, **kwds):
        """Prepare for execution: push each non-None workflow input value
        onto the start task's outputs and mark it ready."""
        Task._call_init(self, *options, **kwds)
        for i in self.inputs:
            val = self.inputs[i].get_value()
            if val is not None:
                self._start_task.outputs[i].set_value(val)
                self._start_task.outputs[i].set_ready()
        #
        # TBD: this appears to be redundant
        #
        #for key in kwds:
        #    if key not in self._start_task.outputs:
        #        raise ValueError, "Cannot specify value for option %s. Valid option names are %s" % (key, self._start_task.outputs.keys())
        #    self._start_task.outputs[key].set_value( kwds[key] )

    def _call_fini(self, *options, **kwds):
        """Finish execution: gather the final task's inputs into an
        Options object, mirror them onto the workflow outputs, and mark
        all outputs ready.  Returns the Options object."""
        ans = Options()
        for key in self._final_task.inputs:
            self._final_task.inputs[key].compute_value()
            ans[key] = self._final_task.inputs[key].get_value()
            getattr(self.outputs, key).set_value(ans[key])
        for key in self.outputs:
            self.outputs[key].set_ready()
        return ans

    def set_options(self, args):
        """Propagate *args* to every task reachable from the start task."""
        self._dfs_([self._start_task.id], lambda t: t.set_options(args))

    def options(self):
        """Return the names of the workflow's settable options (the start
        task's output names)."""
        return self._start_task.outputs.keys()

    def print_help(self):
        """Print argparse help text for the workflow.

        NOTE(review): _collect_parser_groups currently raises
        NotImplementedError for any task that declares parser groups, so
        only an empty parser's help can actually be printed.
        """
        parser = argparse.ArgumentParser()
        self._dfs_([self._start_task.id], _collect_parser_groups)
        parser.print_help()

    def set_arguments(self, parser=None):
        """Register each task's stored argparse arguments.

        NOTE(review): the resolved *parser* value is never passed to
        _set_arguments, which registers arguments on each task's own
        parser -- confirm whether this is intentional.
        """
        if parser is None:
            parser = self._parser
        self._dfs_([self._start_task.id], _set_arguments)

    def reset(self):
        """Reset every task reachable from the start task."""
        return self._dfs_([self._start_task.id], lambda t: t.reset())

    def execute(self):
        """Execute the workflow's tasks.

        Tasks are processed in a work-queue loop seeded with the start
        task.  After each execution, successor tasks whose inputs are
        ready are queued; the rest are parked in ``waiting`` and
        re-checked each iteration.  The loop ends when the queue is empty
        (any tasks still waiting are permanently blocked).
        """
        #return self._dfs_([self._start_task.id], lambda t: t.__call__())
        if self.debug:  #pragma:nocover
            print(self.name, '---------------')
            print(self.name, '---------------')
            print(self.name, ' STARTING')
            print(self.name, '---------------')
            print(self.name, '---------------')
        #
        queued = set([self._start_task.id])
        queue = deque([self._start_task])
        waiting = OrderedDict()
        while len(queue) + len(waiting) > 0:
            # Promote waiting tasks that have become ready.  Iterate over
            # a snapshot of the keys: entries are deleted inside the loop,
            # and mutating an OrderedDict while iterating its key view
            # raises RuntimeError in Python 3.
            for tid in list(waiting.keys()):
                t = waiting[tid]
                if not t.id in queued and t.ready():
                    #
                    if self.debug:  #pragma:nocover
                        print(self.name, "WAITING: ", t.id, " not queued and task ready")
                        print(self.name, "Waiting task", t.name, t.id, t.ready())
                    #
                    queue.append(t)
                    queued.add(t.id)
                    del waiting[t.id]
            if len(queue) == 0:
                break
            # TBD: should we sleep and add a timelimit before raising this exception?
            #if len(waiting) == 0:
            #    break
            #print self.name, "ERROR", waiting.keys()
            #raise RuntimeError, "Workflow failed to terminate normally. All available tasks are blocked."
            task = queue.popleft()
            #
            if self.debug:  #pragma:nocover
                print(self.name, "TASK ", str(task))
                print(self.name, "QUEUE ", queued)
                print(self.name, "WAITING", waiting.keys())
                print(self.name, "Executing Task " + task.name, task.next_task_ids())
            #
            queued.remove(task.id)
            task()
            # Schedule or park this task's successors.
            for t in task.next_tasks():
                if t.id in queued:
                    continue
                if t.ready():
                    #
                    if self.debug:  #pragma:nocover
                        print(self.name, "NEXT: ", t.id, " not queued and task ready")
                        print(self.name, "Scheduling task", t.name, t.id, t.ready())
                    #
                    queue.append(t)
                    queued.add(t.id)
                    if t.id in waiting:
                        del waiting[t.id]
                else:
                    if not t.id in waiting:
                        waiting[t.id] = t
                    #
                    if self.debug:  #pragma:nocover
                        print(self.name, "NEXT: ", t.id, " not queued and task NOT ready")
                        print(self.name, "Ignoring task", t.name, t.id, t.ready())
            if self.debug:  #pragma:nocover
                print(self.name, "FINAL QUEUE ", queued)
                print(self.name, "FINAL WAITING", waiting.keys())
                print(self.name, '---------------')
                print(self.name, " LOOP")
                print(self.name, '---------------')
        if self.debug:  #pragma:nocover
            print(self.name, '---------------')

    def __str__(self):
        """Return a one-line-per-task summary of the workflow."""
        return "\n".join(["Workflow %s:" % self.name] + self._dfs_(
            [self._start_task.id], lambda t: t._name()))

    def __repr__(self):
        """Return a detailed multi-line description of the workflow."""
        return "Workflow %s:\n" % self.name + Task.__repr__(self) + '\n' + \
            "\n".join(self._dfs_([self._start_task.id], lambda t: str(t)))

    def _dfs_(self, indices, fn, touched=None):
        """Apply *fn* to each task reachable from the ids in *indices*,
        visiting a task only after all of its predecessors have been
        visited.  Returns the list of non-None values returned by *fn*.
        """
        if touched is None:
            touched = set()
        ans = []
        for i in indices:
            if i in touched:
                # With this design, this condition should never be triggered
                # TODO: verify that this is an O(n) search algorithm; I think it's
                # O(n^2)
                continue  #pragma:nocover
            # Defer tasks with unvisited predecessors; they are reached
            # again via the last predecessor's recursive call.
            ok = True
            task = self._tasks[i]
            for j in task.prev_task_ids():
                if (j == NoTask.id) or (j in touched):
                    continue
                ok = False
                break
            if not ok:
                continue
            tmp = fn(task)
            if tmp is not None:
                ans.append(tmp)
            touched.add(i)
            ans = ans + self._dfs_(task.next_task_ids(), fn, touched)
        return ans