/usr/lib/python3/dist-packages/nibabel/openers.py is in python3-nibabel 2.0.2-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# emacs: -*- mode: python-mode; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See COPYING file distributed along with the NiBabel package for the
# copyright and license terms.
#
### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
""" Context manager openers for various fileobject types
"""
import bz2
import gzip
import sys
from os.path import splitext
# Upper bound, in bytes, on the memory chunk gzip may use for a single read
GZIP_MAX_READ_CHUNK = 100 * 1024 ** 2  # 100 MiB
class BufferedGzipFile(gzip.GzipFile):
    """GzipFile whose ``readinto`` copes with buffers of >= 2**32 bytes.

    The overrides below are only defined when running on Python 3.5.0; on
    every other interpreter version this class adds nothing to
    ``gzip.GzipFile``.

    This works around a known issue in Python 3.5.
    See https://bugs.python.org/issue25626
    """

    # Guarding the definitions by version avoids defining ``readinto`` on
    # Python 2.6, where gzip.GzipFile has no such method, and limits the
    # exposure to this workaround code.
    if sys.version_info[:3] == (3, 5, 0):

        def __init__(self, fileish, mode='rb', compresslevel=9,
                     buffer_size=2**32-1):
            """Open `fileish` as a gzip file and remember `buffer_size`.

            `mode` and `compresslevel` are forwarded to ``gzip.GzipFile``;
            `buffer_size` is only stored on the instance.
            """
            super(BufferedGzipFile, self).__init__(
                fileish, mode=mode, compresslevel=compresslevel)
            self.buffer_size = buffer_size

        def readinto(self, buf):
            """Fill `buf` from the stream, in pieces when it is very large.

            Buffers shorter than 2**32 bytes go straight to the parent
            ``readinto``.  Larger ones are filled through successive slices
            of at most 2**32 - 1 bytes each (``self.buffer_size`` is not
            consulted here).

            Returns the number of bytes read; this is less than ``len(buf)``
            when one of the underlying reads came back short.
            """
            parent = super(BufferedGzipFile, self)
            total = len(buf)
            if total < 2 ** 32:
                return parent.readinto(buf)

            # Chunked path: works around https://bugs.python.org/issue25626
            chunk_cap = 2 ** 32 - 1  # largest unsigned 32-bit integer
            view = memoryview(buf)
            filled = 0
            while filled < total:
                wanted = min(total - filled, chunk_cap)
                got = parent.readinto(view[filled:filled + wanted])
                filled += got
                if got != wanted:
                    # Short read: stop rather than keep asking for more
                    break
            return filled
def _gzip_open(fileish, *args, **kwargs):
    """Open `fileish` as a ``BufferedGzipFile`` with a larger read chunk.

    Parameters
    ----------
    fileish : str
        passed as first argument to ``BufferedGzipFile``
    \\*args, \\*\\*kwargs
        forwarded unchanged to ``BufferedGzipFile``

    Returns
    -------
    gzip_file : BufferedGzipFile
        the opened file; its ``max_read_chunk`` is raised to
        ``GZIP_MAX_READ_CHUNK`` when the object has that attribute.
    """
    gzip_file = BufferedGzipFile(fileish, *args, **kwargs)

    # Speedup for #209: open gzip files with faster reads on large files by
    # using a larger read chunk.  The attribute is not present in Python 3.5,
    # so only override it where it exists.
    # See https://github.com/nipy/nibabel/pull/210 for discussion
    if hasattr(gzip_file, 'max_read_chunk'):
        gzip_file.max_read_chunk = GZIP_MAX_READ_CHUNK

    return gzip_file
class Opener(object):
    r""" Class to accept, maybe open, and context-manage file-likes / filenames

    Provides context manager to close files that the constructor opened for
    you.

    Parameters
    ----------
    fileish : str or file-like
        if str, then open with suitable opening method. If file-like, accept as
        is
    \*args : positional arguments
        passed to opening method when `fileish` is str.  ``mode``, if not
        specified, is `rb`.  ``compresslevel``, if relevant, and not specified,
        is set from class variable ``default_compresslevel``
    \*\*kwargs : keyword arguments
        passed to opening method when `fileish` is str.  Change of defaults as
        for \*args
    """
    # Each definition is (opening callable, names of the positional arguments
    # that callable accepts after the filename, in order)
    gz_def = (_gzip_open, ('mode', 'compresslevel'))
    bz2_def = (bz2.BZ2File, ('mode', 'buffering', 'compresslevel'))
    compress_ext_map = {
        '.gz': gz_def,
        '.bz2': bz2_def,
        None: (open, ('mode', 'buffering'))  # default
    }
    #: default compression level when writing gz and bz2 files
    default_compresslevel = 1
    #: whether to ignore case looking for compression extensions
    compress_ext_icase = True

    def __init__(self, fileish, *args, **kwargs):
        if self._is_fileobj(fileish):
            # Caller owns the file object; never close it on their behalf
            self.fobj = fileish
            self.me_opened = False
            self._name = None
            return
        opener, arg_names = self._get_opener_argnames(fileish)
        # Merge positional and keyword arguments by name, so the default
        # checks below see an argument however it was supplied
        full_kwargs = kwargs.copy()
        n_args = len(args)
        full_kwargs.update(dict(zip(arg_names[:n_args], args)))
        # Set default mode
        if 'mode' not in full_kwargs:
            kwargs['mode'] = 'rb'
        # Default compression level.  This must consult ``full_kwargs``:
        # checking only ``kwargs`` would add the keyword on top of a
        # positionally supplied compresslevel and make the opener raise
        # TypeError for a duplicated argument.
        if ('compresslevel' in arg_names and
                'compresslevel' not in full_kwargs):
            kwargs['compresslevel'] = self.default_compresslevel
        self.fobj = opener(fileish, *args, **kwargs)
        self._name = fileish
        self.me_opened = True

    def _get_opener_argnames(self, fileish):
        """ Return (opener, argument names) matching extension of `fileish`

        Falls back to the ``None`` entry of ``compress_ext_map`` when the
        extension is not a known compression extension.
        """
        _, ext = splitext(fileish)
        if self.compress_ext_icase:
            ext = ext.lower()
            for key in self.compress_ext_map:
                if key is None:
                    continue
                if key.lower() == ext:
                    return self.compress_ext_map[key]
        elif ext in self.compress_ext_map:
            return self.compress_ext_map[ext]
        return self.compress_ext_map[None]

    def _is_fileobj(self, obj):
        """ Is `obj` a file-like object?
        """
        return hasattr(obj, 'read') and hasattr(obj, 'write')

    @property
    def closed(self):
        """ Return ``self.fobj.closed``
        """
        return self.fobj.closed

    @property
    def name(self):
        """ Return ``self.fobj.name`` or self._name if not present

        self._name will be None if object was created with a fileobj, otherwise
        it will be the filename.
        """
        try:
            return self.fobj.name
        except AttributeError:
            return self._name

    @property
    def mode(self):
        """ Return ``self.fobj.mode``
        """
        return self.fobj.mode

    # The methods below forward directly to the wrapped file object

    def fileno(self):
        return self.fobj.fileno()

    def read(self, *args, **kwargs):
        return self.fobj.read(*args, **kwargs)

    def write(self, *args, **kwargs):
        return self.fobj.write(*args, **kwargs)

    def seek(self, *args, **kwargs):
        return self.fobj.seek(*args, **kwargs)

    def tell(self, *args, **kwargs):
        return self.fobj.tell(*args, **kwargs)

    def close(self, *args, **kwargs):
        return self.fobj.close(*args, **kwargs)

    def __iter__(self):
        return iter(self.fobj)

    def close_if_mine(self):
        """ Close ``self.fobj`` iff we opened it in the constructor
        """
        if self.me_opened:
            self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Only files opened by the constructor are closed on context exit
        self.close_if_mine()
|