/usr/share/pyshared/bimdp/nodes/gradient.py is in python-mdp 3.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
Extension to get the total derivative / gradient / Jacobian matrix.
"""
import mdp
import bimdp
np = mdp.numx

class NotDifferentiableException(mdp.NodeException):
    """Exception if the total derivative does not exist."""
    pass

# Default implementation is needed to satisfy the "method" request.
class GradientExtensionNode(mdp.ExtensionNode, mdp.Node):
    """Base node of the extension to calculate the gradient at a certain point.

    To get the gradient simply put 'method': 'gradient' into the msg dict
    (see the usage sketch below this class).

    The grad array is three dimensional, with shape
    (len(x), self.output_dim, self.input_dim).
    The matrix formed by the last two indices is also called the Jacobian
    matrix.

    Nodes which have no well-defined total derivative should raise the
    NotDifferentiableException.
    """

    extension_name = "gradient"

    def _gradient(self, x, grad=None):
        """Calculate the contribution to the grad for this node at point x.

        The contribution is then combined with the given gradient, to get
        the gradient for the original x.

        This is a template function, derived classes should override
        _get_grad.
        """
        if self.is_training():
            raise mdp.TrainingException("The training is not completed yet.")
        if grad is None:
            # start from the identity Jacobian
            grad = np.zeros((len(x), self.input_dim, self.input_dim))
            diag_indices = np.arange(self.input_dim)
            grad[:, diag_indices, diag_indices] = 1.0
        new_grad = self._get_grad(x)
        # combine the gradients via the chain rule:
        # J_total(x) = J_node(x) . J_previous(x)
        grad = np.asarray([np.dot(new_grad[i], grad[i])
                           for i in range(len(new_grad))])
        # update the x value for the next node
        result = self._execute(x)
        if isinstance(result, tuple):
            x = result[0]
            msg = result[1]
        else:
            x = result
            msg = {}
        msg.update({"grad": grad})
        return x, msg

    def _get_grad(self, x):
        """Return the grad for the given points.

        Override this method.
        """
        err = "Gradient not implemented for class %s." % str(self.__class__)
        raise NotImplementedError(err)

    def _stop_gradient(self, x, grad=None):
        """Helper method to make gradient available for stop_message."""
        result = self._gradient(x, grad)
        # FIXME: Is this really correct? x should be updated!
        # Could remove this once we have the new stop signature.
        return result[1], 1
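
# A minimal usage sketch (assuming bimdp's BiNode execute(x, msg) interface
# and the SFABiNode class from bimdp.nodes; names like x_train are
# placeholders):
#
#     mdp.activate_extension("gradient")
#     node = bimdp.nodes.SFABiNode(output_dim=4)
#     node.train(x_train)
#     node.stop_training()
#     y, msg = node.execute(x, {"method": "gradient"})
#     jacobians = msg["grad"]  # shape (len(x), output_dim, input_dim)
#     mdp.deactivate_extension("gradient")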

## Implementations for specific nodes. ##

# TODO: cache the gradient for linear nodes?
#       If there was a linear base class one could integrate this?
# TODO: add at least a PCA gradient implementation

@mdp.extension_method("gradient", mdp.nodes.IdentityNode, "_get_grad")
def _identity_grad(self, x):
    grad = np.zeros((len(x), self.output_dim, self.input_dim))
    diag_indices = np.arange(self.input_dim)
    grad[:, diag_indices, diag_indices] = 1.0
    return grad
@mdp.extension_method("gradient", mdp.nodes.SFANode, "_get_grad")
def _sfa_grad(self, x):
# the gradient is constant, but have to give it for each x point
return np.repeat(self.sf.T[np.newaxis,:,:], len(x), axis=0)
@mdp.extension_method("gradient", mdp.nodes.QuadraticExpansionNode,
"_get_grad")
def _quadex_grad(self, x):
# the exapansion is:
# [x1, x2, x3, x1x1, x1x2, x1x3, x2x2, x2x3, x3,x3]
dim = self.input_dim
grad = np.zeros((len(x), self.output_dim, dim))
# constant part
diag_indices = np.arange(dim)
grad[:,diag_indices,diag_indices] = 1.0
# quadratic part
i_start = dim
for i in range(dim):
grad[:, i_start:i_start+dim-i, i] = x[:,i:]
diag_indices = np.arange(dim - i)
grad[:, diag_indices+i_start, diag_indices+i] += x[:,i,np.newaxis]
i_start += (dim - i)
return grad
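
# Hand-worked check of the expansion Jacobian (not from the original file):
# for dim = 2 the expansion of x = [x1, x2] is [x1, x2, x1*x1, x1*x2, x2*x2],
# so the Jacobian of each sample is
#
#     [[1,    0   ],
#      [0,    1   ],
#      [2*x1, 0   ],
#      [x2,   x1  ],
#      [0,    2*x2]]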
@mdp.extension_method("gradient", mdp.nodes.SFA2Node, "_get_grad")
def _sfa2_grad(self, x):
quadex_grad = self._expnode._get_grad(x)
sfa_grad = _sfa_grad(self, x)
return np.asarray([np.dot(sfa_grad[i], quadex_grad[i])
for i in range(len(sfa_grad))])

## mdp.hinet nodes ##

@mdp.extension_method("gradient", mdp.hinet.Layer, "_get_grad")
def _layer_grad(self, x):
    in_start = 0
    in_stop = 0
    out_start = 0
    out_stop = 0
    grad = None
    for node in self.nodes:
        out_start = out_stop
        out_stop += node.output_dim
        in_start = in_stop
        in_stop += node.input_dim
        if grad is None:
            node_grad = node._get_grad(x[:, in_start:in_stop])
            grad = np.zeros([node_grad.shape[0], self.output_dim,
                             self.input_dim],
                            dtype=node_grad.dtype)
            # note that the gradient is block-diagonal
            grad[:, out_start:out_stop, in_start:in_stop] = node_grad
        else:
            grad[:, out_start:out_stop, in_start:in_stop] = \
                node._get_grad(x[:, in_start:in_stop])
    return grad

# this is an optimized implementation, the original implementation is
# used for reference in the unittest
@mdp.extension_method("gradient", mdp.hinet.Switchboard, "_gradient")
def _switchboard_gradient(self, x, grad=None):
    if grad is None:
        grad = np.zeros((len(x), self.input_dim, self.input_dim))
        diag_indices = np.arange(self.input_dim)
        grad[:, diag_indices, diag_indices] = 1.0
    ## custom implementation for greater speed: a switchboard only routes
    ## channels, so its Jacobian is a 0/1 selection matrix and the matrix
    ## product reduces to picking the rows given by self.connections
    grad = grad[:, self.connections]
    # update the x value for the next node
    result = self._execute(x)
    if isinstance(result, tuple):
        x = result[0]
        msg = result[1]
    else:
        x = result
        msg = {}
    msg.update({"grad": grad})
    return x, msg
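
# A quick way to validate any of these Jacobians is a finite-difference
# check (a minimal sketch; assumes node.execute(x) returns the plain output
# when no msg is passed, and eps/num_grad are illustrative names):
#
#     eps = 1e-6
#     y0 = node.execute(x)
#     num_grad = np.zeros((len(x), node.output_dim, node.input_dim))
#     for j in range(node.input_dim):
#         x_eps = x.copy()
#         x_eps[:, j] += eps
#         num_grad[:, :, j] = (node.execute(x_eps) - y0) / eps
#     # num_grad should now approximate msg["grad"] from the extension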