/usr/lib/python2.7/dist-packages/mne/parallel.py is in python-mne 0.7.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 | """Parallel util function
"""
# Author: Alexandre Gramfort <gramfort@nmr.mgh.harvard.edu>
#
# License: Simplified BSD
import inspect
import logging
import os
from . import get_config
from .utils import logger, verbose
# Module-level switch read by check_n_jobs: True when the MNE_FORCE_SERIAL
# environment variable is present (whatever its value), None otherwise.
_force_serial = True if 'MNE_FORCE_SERIAL' in os.environ else None
@verbose
def parallel_func(func, n_jobs, verbose=None, max_nbytes='auto'):
    """Return a parallel runner together with a (possibly delayed) function

    joblib is used when it can be imported (either directly or from
    sklearn.externals); otherwise, or when a single job is requested,
    the builtin ``list`` is returned so that the work runs serially.

    Parameters
    ----------
    func : callable
        The function to run.
    n_jobs : int
        Number of jobs to run in parallel.
    verbose : bool, str, int, or None
        If not None, override default verbose level (see mne.verbose).
        When the logger level is INFO or lower, joblib is asked to
        print its status (verbose=5); otherwise it stays quiet.
    max_nbytes : int, str, or None
        Threshold on the minimum size of arrays passed to the workers that
        triggers automated memory mapping. Can be an int in Bytes,
        or a human-readable string, e.g., '1M' for 1 megabyte.
        Use None to disable memmapping of large arrays. Use 'auto' to
        read the threshold from the 'MNE_MEMMAP_MIN_SIZE' config value.
        Memmapping is also disabled when 'MNE_CACHE_DIR' is not set.

    Returns
    -------
    parallel : instance of joblib.Parallel or list
        The parallel object (the builtin ``list`` in the serial case).
    my_func : callable
        ``func`` itself in the serial case, ``delayed(func)`` otherwise.
    n_jobs : int
        1 in the serial case, otherwise the value returned by
        ``check_n_jobs``.
    """
    # One job never needs joblib: ``list`` simply consumes the calls serially
    if n_jobs == 1:
        return list, func, 1

    try:
        from joblib import Parallel, delayed
    except ImportError:
        try:
            from sklearn.externals.joblib import Parallel, delayed
        except ImportError:
            logger.warn('joblib not installed. Cannot run in parallel.')
            return list, func, 1

    # Memmapping needs a joblib whose Parallel accepts both keywords
    init_args = inspect.getargspec(Parallel.__init__).args
    can_memmap = 'temp_folder' in init_args and 'max_nbytes' in init_args

    cache_dir = get_config('MNE_CACHE_DIR', None)
    if isinstance(max_nbytes, basestring) and max_nbytes == 'auto':
        max_nbytes = get_config('MNE_MEMMAP_MIN_SIZE', None)

    # Only comment on the memmap setup when a threshold was requested
    if max_nbytes is not None:
        if cache_dir is not None and not can_memmap:
            logger.warn('"MNE_CACHE_DIR" is set but a newer version of joblib '
                        'is needed to use the memmapping pool.')
        elif cache_dir is None and can_memmap:
            logger.info('joblib supports memapping pool but "MNE_CACHE_DIR" '
                        'is not set in MNE-Python config. To enable it, use, '
                        'e.g., mne.set_cache_dir(\'/tmp/shm\'). This will '
                        'store temporary files under /dev/shm and can result '
                        'in large memory savings.')

    # Keyword arguments handed to joblib.Parallel
    joblib_verbosity = 5 if logger.level <= logging.INFO else 0
    kwargs = dict(verbose=joblib_verbosity)
    if can_memmap:
        kwargs['temp_folder'] = cache_dir
        # without a cache directory memmapping is switched off entirely
        kwargs['max_nbytes'] = None if cache_dir is None else max_nbytes

    n_jobs = check_n_jobs(n_jobs)
    return Parallel(n_jobs, **kwargs), delayed(func), n_jobs
def check_n_jobs(n_jobs, allow_cuda=False):
    """Check n_jobs in particular for negative values

    Non-positive values are resolved against the number of CPUs following
    the joblib convention: -1 means all CPUs, -2 all CPUs but one, and so
    on. A value of 0 resolves to all CPUs.

    Parameters
    ----------
    n_jobs : int
        The number of jobs.
    allow_cuda : bool
        Allow n_jobs to be 'cuda'. Default: False.

    Returns
    -------
    n_jobs : int
        The checked number of jobs. Always positive (or 'cuda' if
        applicable.) It is 1 when serial processing is forced through
        MNE_FORCE_SERIAL or when multiprocessing cannot be imported.

    Raises
    ------
    ValueError
        If n_jobs is neither an integer nor (when allowed) 'cuda', or if a
        negative n_jobs asks for fewer than one job on this machine.
    """
    # The environment switch takes precedence over whatever was requested
    if _force_serial:
        logger.info('... MNE_FORCE_SERIAL set. '
                    'Processing in forced serial mode.')
        return 1

    if not isinstance(n_jobs, int):
        if not allow_cuda:
            raise ValueError('n_jobs must be an integer')
        try:
            string_types = basestring  # Python 2: covers str and unicode
        except NameError:
            string_types = str  # Python 3 has no basestring
        if not isinstance(n_jobs, string_types) or n_jobs != 'cuda':
            raise ValueError('n_jobs must be an integer, or "cuda"')
        # n_jobs == 'cuda' and this is okay, so hand it back untouched
        return n_jobs

    if n_jobs > 0:
        return n_jobs

    # Non-positive request: resolve it relative to the number of CPUs
    try:
        import multiprocessing
        n_cores = multiprocessing.cpu_count()
    except ImportError:
        logger.warn('multiprocessing not installed. Cannot run in '
                    'parallel.')
        return 1

    # -1 -> n_cores, -2 -> n_cores - 1, ...; the min() keeps 0 at n_cores
    n_jobs = min(n_cores + n_jobs + 1, n_cores)
    if n_jobs <= 0:
        raise ValueError('If n_jobs has a negative value it must not '
                         'be less than the number of CPUs present. '
                         'You\'ve got %s CPUs' % n_cores)
    return n_jobs
|