This file is indexed.

/usr/share/pyshared/mdp/parallel/parallelnodes.py is in python-mdp 3.3-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
"""
Module for MDP Nodes that support parallel training.

This module contains both the parallel base class and some parallel
implementations of MDP nodes. Note that such ParallelNodes are only needed for
training, parallel execution works with any Node that can be pickled.
"""

# WARNING: There is a problem with unpickled arrays in NumPy < 1.1.x, see
# http://projects.scipy.org/scipy/numpy/ticket/551
# To circumvent this, you can use a copy() of all unpickled arrays.

import inspect

import mdp
from mdp import numx


class NotForkableParallelException(mdp.NodeException):
    """Exception to signal that a fork is not possible.

    This exception is can be safely used and should be caught inside the
    ParallelFlow or the Scheduler.
    """
    pass


class JoinParallelException(mdp.NodeException):
    """Exception for errors when joining parallel nodes."""
    pass


class ParallelExtensionNode(mdp.ExtensionNode, mdp.Node):
    """Base class for parallel trainable MDP nodes.

    With the fork method new node instances are created which can then be
    trained. With the join method the trained instances are then merged back
    into a single node instance.

    This class defines default methods which raise a
    TrainingPhaseNotParallelException exception.
    """

    extension_name = "parallel"

    # TODO: allow that forked nodes are not forkable themselves,
    #    and are not joinable either
    #    this implies that caching does not work for these
    
    def fork(self):
        """Return a new instance of this node class for remote training.

        This is a template method, the actual forking should be implemented in
        _fork.

        The forked node should be a ParallelNode of the same class as well,
        thus allowing recursive forking and joining.
        """
        return self._fork()

    def join(self, forked_node):
        """Absorb the trained node from a fork into this parent node.

        This is a template method, the actual joining should be implemented in
        _join.
        """
        # Warning: Use the properties / setters here. Otherwise we get problems
        #    in certain situations (e.g., for FlowNode).
        if self.dtype is None:
            self.dtype = forked_node.dtype
        if self.input_dim is None:
            self.input_dim = forked_node.input_dim
        if self.output_dim is None:
            self.output_dim = forked_node.output_dim
        if forked_node._train_phase_started and not self._train_phase_started:
            self._train_phase_started = True
        self._join(forked_node)

    ## overwrite these methods ##

    def _fork(self):
        """Hook method for forking with default implementation.

        Overwrite this method for nodes that can be parallelized.
        You can use _default_fork, if that is compatible with your node class,
        typically the hard part is the joining.
        """
        raise NotForkableParallelException("fork is not implemented " +
                                           "by this node (%s)" %
                                           str(self.__class__))

    def _join(self, forked_node):
        """Hook method for joining, to be overridden."""
        raise JoinParallelException("join is not implemented " +
                                    "by this node (%s)" %
                                    str(self.__class__))
    
    @staticmethod
    def use_execute_fork():
        """Return True if node requires a fork / join even during execution.
        
        The default output is False, overwrite this method if required.
        
        Note that the same fork and join methods are used as during training,
        so the distinction must be implemented in the custom _fork and _join
        methods.
        """
        return False
    
    ## helper methods ##

    def _default_fork(self):
        """Default implementation of _fork.

        It uses introspection to determine the init kwargs and tries to fill
        them with attributes. These kwargs are then used to instanciate
        self.__class__ to create the fork instance.

        So you can use this method if all the required keys are also public
        attributes or have a single underscore in front.
        
        There are two reasons why this method does not simply replace _fork
        of ParallelExtensionNode (plus removing Node from the
        inheritance list):
        - If a node is not parallelized _fork raises an exception, as do nodes
            which can not fork due to some other reasons. Without this bahavior
            of _fork we would have to check with hasattr first if fork is
            present, adding more complexity at other places (mostly in
            container nodes).
        - This is a safeguard forcing users to think a little instead of
            relying on the inherited (but possibly incompatible)
            default implementation.
        """
        args, varargs, varkw, defaults = inspect.getargspec(self.__init__)
        args.remove("self")
        if defaults:
            non_default_keys = args[:-len(defaults)]
        else:
            non_default_keys = []
        kwargs = dict((key, getattr(self, key))
                      for key in args if hasattr(self, key))
        # look for the key with an underscore in front
        for key in kwargs:
            args.remove(key)
        under_kwargs = dict((key, getattr(self, '_' + key))
                            for key in args if hasattr(self, '_' + key))
        for key in under_kwargs:
            args.remove(key)
        kwargs.update(under_kwargs)
        # check that all the keys without default arguments are covered
        if non_default_keys:
            missing_defaults = set(non_default_keys) & set(args)
            if missing_defaults:
                err = ("could not find attributes for init arguments %s" %
                       str(missing_defaults))
                raise NotForkableParallelException(err)
        # create new instance
        return self.__class__(**kwargs)
    
    @staticmethod
    def _join_covariance(cov, forked_cov):
        """Helper method to join two CovarianceMatrix instances.
        
        cov -- Instance of CovarianceMatrix, to which the forked_cov instance
            is aded in-place.
        """
        cov._cov_mtx += forked_cov._cov_mtx
        cov._avg += forked_cov._avg
        cov._tlen += forked_cov._tlen
        

## MDP parallel node implementations ##

class ParallelPCANode(ParallelExtensionNode, mdp.nodes.PCANode):
    """Parallel version of MDP PCA node."""

    def _fork(self):
        return self._default_fork()

    def _join(self, forked_node):
        """Combine the covariance matrices."""
        if self._cov_mtx._cov_mtx is None:
            self.set_dtype(self._cov_mtx._dtype)
            self._cov_mtx = forked_node._cov_mtx
        else:
            self._join_covariance(self._cov_mtx, forked_node._cov_mtx)


class ParallelSFANode(ParallelExtensionNode, mdp.nodes.SFANode):
    """Parallel version of MDP SFA node."""

    def _fork(self):
        return self._default_fork()

    def _join(self, forked_node):
        """Combine the covariance matrices."""
        if self._cov_mtx._cov_mtx is None:
            self.set_dtype(forked_node._cov_mtx._dtype)
            self._cov_mtx = forked_node._cov_mtx
            self._dcov_mtx = forked_node._dcov_mtx
        else:
            self._join_covariance(self._cov_mtx, forked_node._cov_mtx)
            self._join_covariance(self._dcov_mtx, forked_node._dcov_mtx)


class ParallelFDANode(ParallelExtensionNode, mdp.nodes.FDANode):

    def _fork(self):
        if self.get_current_train_phase() == 1:
            forked_node = self.copy()
            # reset the variables that might contain data from this train phase
            forked_node._S_W = None
            forked_node._allcov = mdp.utils.CovarianceMatrix(dtype=self.dtype)
        else:
            forked_node = self._default_fork()
        return forked_node

    def _join(self, forked_node):
        if self.get_current_train_phase() == 1:
            if forked_node.get_current_train_phase() != 1:
                msg = ("This node is in training phase 1, but the forked node "
                       "is not.")
                raise NotForkableParallelException(msg)
            if self._S_W is None:
                self.set_dtype(forked_node._allcov._dtype)
                self._allcov = forked_node._allcov
                self._S_W = forked_node._S_W
            else:
                self._join_covariance(self._allcov, forked_node._allcov)
                self._S_W += forked_node._S_W
        else:
            for lbl in forked_node.means:
                if lbl in self.means:
                    self.means[lbl] += forked_node.means[lbl]
                    self.tlens[lbl] += forked_node.tlens[lbl]
                else:
                    self.means[lbl] = forked_node.means[lbl]
                    self.tlens[lbl] = forked_node.tlens[lbl]


class ParallelHistogramNode(ParallelExtensionNode, mdp.nodes.HistogramNode):
    """Parallel version of the HistogramNode."""

    def _fork(self):
        return self._default_fork()

    def _join(self, forked_node):
        if (self.data_hist is not None) and (forked_node.data_hist is not None):
            self.data_hist = numx.concatenate([self.data_hist,
                                            forked_node.data_hist])
        elif forked_node.data_hist != None:
            self.data_hist = forked_node.data_hist