/usr/share/pyshared/mdp/parallel/parallelnodes.py is in python-mdp 3.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
Module for MDP Nodes that support parallel training.
This module contains both the parallel base class and some parallel
implementations of MDP nodes. Note that such ParallelNodes are only needed for
training, parallel execution works with any Node that can be pickled.
"""
# WARNING: There is a problem with unpickled arrays in NumPy < 1.1.x, see
# http://projects.scipy.org/scipy/numpy/ticket/551
# To circumvent this, you can use a copy() of all unpickled arrays.
import inspect
import mdp
from mdp import numx
class NotForkableParallelException(mdp.NodeException):
    """Signal that a node cannot be forked.

    Raising this exception is safe: it is meant to be caught inside the
    ParallelFlow or the Scheduler.
    """
class JoinParallelException(mdp.NodeException):
    """Signal an error that occurred while joining parallel nodes."""
class ParallelExtensionNode(mdp.ExtensionNode, mdp.Node):
    """Base class for parallel trainable MDP nodes.

    With the fork method new node instances are created which can then be
    trained. With the join method the trained instances are then merged back
    into a single node instance.

    The default hook methods of this class only raise exceptions: _fork
    raises NotForkableParallelException and _join raises
    JoinParallelException. Subclasses overwrite both hooks.
    """

    extension_name = "parallel"

    # TODO: allow that forked nodes are not forkable themselves,
    #    and are not joinable either
    #    this implies that caching does not work for these

    def fork(self):
        """Return a new instance of this node class for remote training.

        This is a template method, the actual forking should be implemented in
        _fork.

        The forked node should be a ParallelNode of the same class as well,
        thus allowing recursive forking and joining.
        """
        return self._fork()

    def join(self, forked_node):
        """Absorb the trained node from a fork into this parent node.

        This is a template method, the actual joining should be implemented in
        _join.

        forked_node -- The trained fork. Its dtype, input_dim and output_dim
            are adopted for every one of these that is still None on this
            node, and its started training phase is propagated.
        """
        # Warning: Use the properties / setters here. Otherwise we get problems
        #    in certain situations (e.g., for FlowNode).
        if self.dtype is None:
            self.dtype = forked_node.dtype
        if self.input_dim is None:
            self.input_dim = forked_node.input_dim
        if self.output_dim is None:
            self.output_dim = forked_node.output_dim
        if forked_node._train_phase_started and not self._train_phase_started:
            self._train_phase_started = True
        self._join(forked_node)

    ## overwrite these methods ##

    def _fork(self):
        """Hook method for forking with default implementation.

        Overwrite this method for nodes that can be parallelized.
        You can use _default_fork, if that is compatible with your node class,
        typically the hard part is the joining.

        This default implementation always raises
        NotForkableParallelException.
        """
        raise NotForkableParallelException("fork is not implemented " +
                                           "by this node (%s)" %
                                           str(self.__class__))

    def _join(self, forked_node):
        """Hook method for joining, to be overridden.

        This default implementation always raises JoinParallelException.
        """
        raise JoinParallelException("join is not implemented " +
                                    "by this node (%s)" %
                                    str(self.__class__))

    @staticmethod
    def use_execute_fork():
        """Return True if node requires a fork / join even during execution.

        The default output is False, overwrite this method if required.

        Note that the same fork and join methods are used as during training,
        so the distinction must be implemented in the custom _fork and _join
        methods.
        """
        return False

    ## helper methods ##

    def _default_fork(self):
        """Default implementation of _fork.

        It uses introspection to determine the init kwargs and tries to fill
        them with attributes. These kwargs are then used to instantiate
        self.__class__ to create the fork instance.

        So you can use this method if all the required keys are also public
        attributes or have a single underscore in front.

        Raises NotForkableParallelException if an init argument without a
        default value has no matching attribute.

        There are two reasons why this method does not simply replace _fork
        of ParallelExtensionNode (plus removing Node from the
        inheritance list):
        - If a node is not parallelized _fork raises an exception, as do nodes
            which can not fork due to some other reasons. Without this
            behavior of _fork we would have to check with hasattr first if
            fork is present, adding more complexity at other places (mostly in
            container nodes).
        - This is a safeguard forcing users to think a little instead of
            relying on the inherited (but possibly incompatible)
            default implementation.
        """
        # inspect.getargspec no longer exists in recent Python versions, so
        # prefer getfullargspec where available; the first four fields of
        # both results are (args, varargs, varkw, defaults).
        get_spec = (getattr(inspect, "getfullargspec", None)
                    or inspect.getargspec)
        spec = get_spec(self.__init__)
        args = list(spec[0])
        defaults = spec[3]
        args.remove("self")
        if defaults:
            non_default_keys = args[:-len(defaults)]
        else:
            # without any default values every init argument is required
            non_default_keys = list(args)
        kwargs = {}
        unresolved = []
        for key in args:
            if hasattr(self, key):
                kwargs[key] = getattr(self, key)
            elif hasattr(self, '_' + key):
                # look for the key with an underscore in front
                kwargs[key] = getattr(self, '_' + key)
            else:
                unresolved.append(key)
        # check that all the keys without default arguments are covered
        missing_defaults = set(non_default_keys) & set(unresolved)
        if missing_defaults:
            err = ("could not find attributes for init arguments %s" %
                   str(missing_defaults))
            raise NotForkableParallelException(err)
        # create new instance
        return self.__class__(**kwargs)

    @staticmethod
    def _join_covariance(cov, forked_cov):
        """Helper method to join two CovarianceMatrix instances.

        cov -- Instance of CovarianceMatrix, to which the forked_cov instance
            is added in-place.
        forked_cov -- Instance of CovarianceMatrix whose _cov_mtx, _avg and
            _tlen are added to those of cov.
        """
        cov._cov_mtx += forked_cov._cov_mtx
        cov._avg += forked_cov._avg
        cov._tlen += forked_cov._tlen
## MDP parallel node implementations ##
class ParallelPCANode(ParallelExtensionNode, mdp.nodes.PCANode):
    """Parallel version of MDP PCA node."""

    def _fork(self):
        """Create the fork with the generic _default_fork helper."""
        return self._default_fork()

    def _join(self, forked_node):
        """Combine the covariance matrices.

        forked_node -- The trained fork whose covariance matrix is absorbed.
        """
        if self._cov_mtx._cov_mtx is None:
            # This node has not accumulated any data yet, so the dtype has
            # to come from the fork's covariance matrix, which then simply
            # replaces the empty one.
            self.set_dtype(forked_node._cov_mtx._dtype)
            self._cov_mtx = forked_node._cov_mtx
        else:
            self._join_covariance(self._cov_mtx, forked_node._cov_mtx)
class ParallelSFANode(ParallelExtensionNode, mdp.nodes.SFANode):
    """Parallel version of MDP SFA node."""

    def _fork(self):
        """Create the fork with the generic _default_fork helper."""
        return self._default_fork()

    def _join(self, forked_node):
        """Merge both covariance matrices of forked_node into this node."""
        own_cov = self._cov_mtx
        if own_cov._cov_mtx is not None:
            # data is already present here: accumulate in-place
            self._join_covariance(own_cov, forked_node._cov_mtx)
            self._join_covariance(self._dcov_mtx, forked_node._dcov_mtx)
            return
        # nothing accumulated yet: take over the matrices of the fork
        self.set_dtype(forked_node._cov_mtx._dtype)
        self._cov_mtx = forked_node._cov_mtx
        self._dcov_mtx = forked_node._dcov_mtx
class ParallelFDANode(ParallelExtensionNode, mdp.nodes.FDANode):
    """Parallel version of the MDP FDA node.

    Forking and joining depend on the current training phase: outside of
    phase 1 the per-label means and tlens are merged, in phase 1 the
    _allcov covariance matrix and _S_W are merged.
    """

    def _fork(self):
        """Return a fork that matches the current training phase."""
        if self.get_current_train_phase() != 1:
            return self._default_fork()
        fork = self.copy()
        # reset the variables that might contain data from this train phase
        fork._S_W = None
        fork._allcov = mdp.utils.CovarianceMatrix(dtype=self.dtype)
        return fork

    def _join(self, forked_node):
        """Merge the training results of forked_node into this node.

        Raises NotForkableParallelException if this node is in training
        phase 1 while forked_node is not.
        """
        if self.get_current_train_phase() != 1:
            # merge the per-label entries of means and tlens
            for label in forked_node.means:
                if label not in self.means:
                    self.means[label] = forked_node.means[label]
                    self.tlens[label] = forked_node.tlens[label]
                    continue
                self.means[label] += forked_node.means[label]
                self.tlens[label] += forked_node.tlens[label]
            return
        if forked_node.get_current_train_phase() != 1:
            msg = ("This node is in training phase 1, but the forked node "
                   "is not.")
            raise NotForkableParallelException(msg)
        if self._S_W is not None:
            self._join_covariance(self._allcov, forked_node._allcov)
            self._S_W += forked_node._S_W
            return
        # no phase 1 data here yet: take over the results of the fork
        self.set_dtype(forked_node._allcov._dtype)
        self._allcov = forked_node._allcov
        self._S_W = forked_node._S_W
class ParallelHistogramNode(ParallelExtensionNode, mdp.nodes.HistogramNode):
    """Parallel version of the HistogramNode."""

    def _fork(self):
        """Create the fork with the generic _default_fork helper."""
        return self._default_fork()

    def _join(self, forked_node):
        """Append the data history of forked_node to the one of this node.

        forked_node -- The trained fork. If its data_hist is None this node
            is left unchanged.
        """
        forked_hist = forked_node.data_hist
        # Identity checks are required here: comparing an array with
        # "!= None" is evaluated elementwise and has no single truth value.
        if forked_hist is None:
            return
        if self.data_hist is None:
            self.data_hist = forked_hist
        else:
            self.data_hist = numx.concatenate([self.data_hist, forked_hist])
|