/usr/share/pyshared/mvpa2/mappers/detrend.py is in python-mvpa2 2.2.0-4ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
#   See COPYING file distributed along with the PyMVPA package for the
#   copyright and license terms.
#
### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""Mapper for data detrending."""
__docformat__ = 'restructuredtext'
import numpy as np
from mvpa2.base.types import is_sequence_type
from mvpa2.base import externals
if externals.exists('scipy', raise_=True):
    # if we construct the polynomials ourselves, we wouldn't need scipy here
    from scipy.special import legendre
    def legendre_(n, x):
        """Helper to avoid problems with scipy 0.8.0 returning inf for -1
        Scipy 0.8.0 (and possibly later) has regression of reporting
        'inf's for negative boundary. Lets guard against it for now
        """
        leg = legendre(n)
        r = leg(x)
        infs = np.isinf(r)
        if np.any(infs):
            r[infs] = leg(x[infs] + 1e-10) # offset to try to overcome problems
        return r
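    # Illustrative sketch: the 2nd-order Legendre polynomial is
    # P2(x) = (3*x**2 - 1) / 2, so the guarded helper should yield finite
    # boundary values, e.g.
    #   legendre_(2, np.array([-1.0, 0.0, 1.0]))  ->  approx. [1.0, -0.5, 1.0]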

from mvpa2.base.dochelpers import _str, borrowkwargs
from mvpa2.mappers.base import Mapper


class PolyDetrendMapper(Mapper):
    """Mapper for regression-based removal of polynomial trends.
    Noteworthy features are the possibility for chunk-wise detrending, optional
    regressors, and the ability to use positional information about the samples
    from the dataset.
    Any sample attribute from the to be mapped dataset can be used to define
    `chunks` that shall be detrended separately. The number of chunks is
    determined from the number of unique values of the attribute and all samples
    with the same value are considered to be in the same chunk.
    It is possible to provide a list of additional sample attribute names that
    will be used as confound regressors during detrending. This, for example,
    allows to use fMRI motion correction parameters to be considered.
    Finally, it is possible to use positional information about the dataset
    samples for the detrending. This is useful, for example, if the samples in
    the dataset are not equally spaced out over the acquisition time-window.
    In that case an actually linear trend in the data would be distorted and
    not properly removed. By setting the `inspace` argument to the name of a
    samples attribute that carries this information, the mapper will take this
    into account and shift the polynomials accordingly. If `inspace` is given,
    but the dataset doesn't contain such an attribute evenly spaced coordinates
    are generated and this information is stored in the mapped dataset.
    Notes
    -----
    The mapper only support mapping of datasets, not plain data. Moreover,
    reverse mapping, or subsequent forward-mapping of partial datasets are
    currently not implemented.
    Examples
    --------
    >>> from mvpa2.datasets import dataset_wizard
    >>> from mvpa2.mappers.detrend import PolyDetrendMapper
    >>> samples = np.array([[1.0, 2, 3, 3, 2, 1],
    ...                    [-2.0, -4, -6, -6, -4, -2]]).T
    >>> chunks = [0, 0, 0, 1, 1, 1]
    >>> ds = dataset_wizard(samples, chunks=chunks)
    >>> dm = PolyDetrendMapper(chunks_attr='chunks', polyord=1)
    >>> # the mapper will be auto-trained upon first use
    >>> mds = dm.forward(ds)
    >>> # total removal of all (chunk-wise) linear trends
    >>> np.sum(np.abs(mds)) < 0.00001
    True
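
    A further sketch (not a doctest; ``'time'`` is a hypothetical samples
    attribute name): positional information can be respected via the standard
    ``space`` argument of the constructor::

      dm = PolyDetrendMapper(chunks_attr='chunks', polyord=1, space='time')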
    """
    def __init__(self, polyord=1, chunks_attr=None, opt_regs=None, **kwargs):
        """
        Parameters
        ----------
        polyord : int or list, optional
          Order of the Legendre polynomial to remove from the data.  This
          will remove every polynomial up to and including the provided
          value.  For example, 3 will remove 0th, 1st, 2nd, and 3rd order
          polynomials from the data.  N.B.: The 0th polynomial is the
          baseline shift, the 1st is the linear trend.
          If you specify a single int and `chunks_attr` is not None, then this value
          is used for each chunk.  You can also specify a different polyord
          value for each chunk by providing a list or ndarray of polyord
          values the length of the number of chunks.
        chunks_attr : str or None
          If None, the whole dataset is detrended at once. Otherwise, the given
          samples attribute (given by its name) is used to define chunks of the
          dataset that are processed individually. In that case, all the samples
          within a chunk should be in contiguous order and the chunks should be
          sorted in order from low to high -- unless the dataset provides
          information about the coordinate of each sample in the space that
          should be spanned by the polynomials (see `inspace` argument).
        opt_regs : list or None
          Optional list of sample attribute names that should be used as
          additional regressors.  One example would be to regress out motion
          parameters.
        space : str or None
          If not None, a samples attribute of the same name is added to the
          mapped dataset that stores the coordinates of each sample in the
          space that is spanned by the polynomials. If an attribute of that
          name is already present in the input dataset, its values are interpreted
          as sample coordinates in the space that should be spanned by the
          polynomials.
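
        Examples
        --------
        A hedged sketch (``'mc_x'`` and ``'mc_y'`` are hypothetical names of
        motion-parameter sample attributes)::

          PolyDetrendMapper(polyord=2, chunks_attr='chunks',
                            opt_regs=['mc_x', 'mc_y'])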
        """
        self.__chunks_attr = chunks_attr
        self.__polyord = polyord
        self.__opt_reg = opt_regs
        # things that come from train()
        self._polycoords = None
        self._regs = None
        # secret switch to perform in-place detrending
        self._secret_inplace_detrend = False
        # need to init last to prevent base class puking
        Mapper.__init__(self, **kwargs)
    def __repr__(self):
        s = super(PolyDetrendMapper, self).__repr__()
        return s.replace("(",
                         "(polyord=%i, chunks_attr=%s, opt_regs=%s, "
                          % (self.__polyord,
                             repr(self.__chunks_attr),
                             repr(self.__opt_reg)),
                         1)
    def __str__(self):
        return _str(self, ord=self.__polyord)
    def _scale_array(self, a):
        # scale an array into the interval [-1,1] using its min and max values
        # as input range (shift by the minimum first, so that inputs not
        # starting at zero are scaled correctly)
        return 2 * (a - a.min()) / (a.max() - a.min()) - 1
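    # Worked example for _scale_array (illustrative): for
    # a = np.array([2., 4., 6.]), 2 * (a - a.min()) / (a.max() - a.min()) - 1
    # yields [-1., 0., 1.].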
    def _get_polycoords(self, ds, chunk_slicer):
        """Compute human-readable and scaled polycoords.
        Parameters
        ----------
        ds : dataset
        chunk_slicer : boolean array
          True elements indicate samples selected for detrending.
        Returns
        -------
        tuple
          (real-world polycoords, scaled polycoords into [-1,1])
        """
        inspace = self.get_space()
        if chunk_slicer is None:
            nsamples = len(ds)
        else:
            nsamples = chunk_slicer.sum()
        # if we don't have to take care of an inspace, things are easy
        if inspace is None:
            polycoords_scaled = np.linspace(-1, 1, nsamples)
            return None, polycoords_scaled
        else:
            # there is interest in the inspace, but do we have information, or
            # just want to store it later on
            if inspace in ds.sa:
                # we have info
                if chunk_slicer is None:
                    chunk_slicer = slice(None)
                polycoords = ds.sa[inspace].value[chunk_slicer]
            else:
                # no info in the dataset, just be nice and generate some
                # meaningful polycoords
                polycoords = np.arange(nsamples)
            return polycoords, self._scale_array(polycoords.astype('float'))
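    # Illustrative behavior (comment sketch): without an `inspace`, a
    # 5-sample chunk yields polycoords_scaled = np.linspace(-1, 1, 5),
    # i.e. [-1., -0.5, 0., 0.5, 1.].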
    def _train(self, ds):
        # local binding
        chunks_attr = self.__chunks_attr
        polyord = self.__polyord
        opt_reg = self.__opt_reg
        inspace = self.get_space()
        self._polycoords = None
        # global detrending is desired
        if chunks_attr is None:
            # consider the entire dataset
            reg = []
            # create the timespan
            self._polycoords, polycoords_scaled = self._get_polycoords(ds, None)
            for n in range(polyord + 1):
                reg.append(legendre_(n, polycoords_scaled)[:, np.newaxis])
        # chunk-wise detrending is desired
        else:
            # get the unique chunks
            uchunks = ds.sa[chunks_attr].unique
            # Process the polyord to be a list with length of the number of
            # chunks
            if not is_sequence_type(polyord):
                # repeat to be proper length
                polyord = [polyord] * len(uchunks)
            elif len(polyord) != len(uchunks):
                raise ValueError("If you specify a sequence of polyord values, "
                                 "the sequence length must match the "
                                 "number of unique chunks in the dataset.")
            # loop over each chunk
            reg = []
            update_polycoords = True
            # if the dataset knows about the inspace we can store the
            # polycoords right away
            if inspace is not None and inspace in ds.sa:
                self._polycoords = ds.sa[inspace].value
                update_polycoords = False
            else:
                # otherwise we prepare an empty array that is going to be
                # filled below -- we know that those polycoords are going to
                # be ints
                self._polycoords = np.empty(len(ds), dtype='int')
            for i, chunk in enumerate(uchunks):
                # get the indices for that chunk
                cinds = ds.sa[chunks_attr].value == chunk
                # create the timespan
                polycoords, polycoords_scaled = self._get_polycoords(ds, cinds)
                if update_polycoords and polycoords is not None:
                    self._polycoords[cinds] = polycoords
                # create regressors for each polynomial order up to the value
                # for that chunk
                for n in range(polyord[i] + 1):
                    newreg = np.zeros((len(ds), 1))
                    newreg[cinds, 0] = legendre_(n, polycoords_scaled)
                    reg.append(newreg)
        # if we don't handle an inspace, there is no need to store polycoords
        if inspace is None:
            self._polycoords = None
        # see if we need to add any optional regressors
        if opt_reg is not None:
            # add in the optional regressors, too
            for oreg in opt_reg:
                reg.append(ds.sa[oreg].value[np.newaxis].T)
        # combine the regs (time x reg)
        self._regs = np.hstack(reg)
    def _forward_dataset(self, ds):
        # auto-train the mapper if not yet done
        if self._regs is None:
            self.train(ds)
        if self._secret_inplace_detrend:
            mds = ds
        else:
            # shallow copy to put the new stuff in
            mds = ds.copy(deep=False)
        # local binding
        regs = self._regs
        inspace = self.get_space()
        polycoords = self._polycoords
        # is it possible to map that dataset?
        if inspace is None and len(regs) != len(ds):
            raise ValueError("Cannot detrend the dataset, since it neither "
                             "provides location information of its samples "
                             "in the space spanned by the polynomials, "
                             "nor does it match the number of samples this "
                             "this mapper has been trained on. (got: %i "
                             " and was trained on %i)."
                             % (len(ds), len(regs)))
        # do we have to handle the polynomial space somehow?
        if inspace is not None:
            if inspace in ds.sa:
                space_coords = ds.sa[inspace].value
                # this dataset has some notion about our polyspace
                # we need to put the right regressor values for its samples
                # let's first see whether the coords are identical to the
                # trained ones (that should be the common case and nothing
                # needs to be done); otherwise look whether we can find the
                # right regressors
                if not np.all(space_coords == polycoords):
                    # to make the stuff below work, we'd need to store chunk
                    # info too, otherwise we cannot determine the correct
                    # regressor rows
                    raise NotImplementedError
                    # determine the regressor rows that match the samples
                    reg_idx = [np.argwhere(polycoords == c).flatten()[0]
                                    for c in space_coords]
                    # slice the regressors accordingly
                    regs = regs[reg_idx]
            else:
                # the input dataset knows nothing about the polyspace
                # let's put that information into the output dataset
                mds.sa[inspace] = self._polycoords
        # regression for each feature
        fit = np.linalg.lstsq(regs, ds.samples)
        # actually we are only interested in the solution
        # fit[0] is (nregr x nfeatures)
        y = fit[0]
        # remove all and keep only the residuals
        if self._secret_inplace_detrend:
            # if we are in evil mode do evil
            # cast the data to float, since in-place operations below do not
            # upcast!
            if np.issubdtype(mds.samples.dtype, np.integer):
                mds.samples = mds.samples.astype('float')
            mds.samples -= np.dot(regs, y)
        else:
            # important to assign to ensure COW behavior
            mds.samples = ds.samples - np.dot(regs, y)
        return mds
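    # In matrix terms (illustrative note): with regressor matrix X
    # (nsamples x nregs) and data Y (nsamples x nfeatures), lstsq finds B
    # minimizing ||X B - Y||, and the mapper returns the residuals Y - X B.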
    def _forward_data(self, data):
        raise RuntimeError("%s cannot map plain data."
                           % self.__class__.__name__)


@borrowkwargs(PolyDetrendMapper, '__init__')
def poly_detrend(ds, **kwargs):
    """In-place polynomial detrending.
    This function behaves identical to the `PolyDetrendMapper`. The only
    difference is that the actual detrending is done in-place -- potentially
    causing a significant reduction of the memory demands.
    Parameters
    ----------
    ds : Dataset
      The dataset that will be detrended in-place.
    **kwargs
      For all other arguments, please see the documentation of
      PolyDetrendMapper.
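
    Examples
    --------
    A minimal sketch (assuming ``ds`` is a dataset with a ``chunks`` samples
    attribute)::

      poly_detrend(ds, polyord=1, chunks_attr='chunks')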
    """
    dm = PolyDetrendMapper(**kwargs)
    dm._secret_inplace_detrend = True
    # map
    mapped = dm.forward(ds)
    # and append the mapper to the dataset
    mapped._append_mapper(dm)
    return mapped