/usr/lib/python3/dist-packages/nibabel/openers.py is in python3-nibabel 2.2.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# emacs: -*- mode: python-mode; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
#   See COPYING file distributed along with the NiBabel package for the
#   copyright and license terms.
#
### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
""" Context manager openers for various fileobject types
"""
import bz2
import gzip
import sys
import warnings
from os.path import splitext
from distutils.version import StrictVersion
# Is indexed_gzip present and modern?  HAVE_INDEXED_GZIP ends up True only
# when the package imports cleanly and reports a version >= 0.6.0.
try:
    from indexed_gzip import SafeIndexedGzipFile, __version__ as version
except ImportError:
    HAVE_INDEXED_GZIP = False
else:
    HAVE_INDEXED_GZIP = True
    if StrictVersion(version) < StrictVersion('0.6.0'):
        # Installed but too old: tell the user and fall back to plain gzip.
        warnings.warn('indexed_gzip is present, but too old '
                      '(>= 0.6.0 required): {}'.format(version))
        HAVE_INDEXED_GZIP = False
    # Do not leave the helper name lying around in the module namespace.
    del version
# The largest memory chunk that gzip can use for reads
GZIP_MAX_READ_CHUNK = 100 * 1024 * 1024  # 100 MiB
class BufferedGzipFile(gzip.GzipFile):
    """GzipFile whose ``readinto`` copes with buffers of 2**32 bytes or more.

    On every interpreter except Python 3.5.0 this class adds nothing to
    ``gzip.GzipFile``.  On 3.5.0 it overrides ``__init__`` and ``readinto``
    to work around https://bugs.python.org/issue25626
    """
    # Guarding the overrides by version keeps ``readinto`` from being defined
    # on Python 2.6 (where gzip.GzipFile does not have it) and keeps the
    # work-around confined to the one release that needs it.
    if sys.version_info[:3] == (3, 5, 0):
        def __init__(self, fileish, mode='rb', compresslevel=9,
                     buffer_size=2**32 - 1):
            parent = super(BufferedGzipFile, self)
            parent.__init__(fileish, mode=mode, compresslevel=compresslevel)
            # Stored on the instance; ``readinto`` below uses its own fixed
            # chunk limit rather than this value.
            self.buffer_size = buffer_size

        def readinto(self, buf):
            """Fill `buf`, splitting reads into chunks below 2**32 bytes.

            Returns the number of items read, which is smaller than
            ``len(buf)`` when the underlying read comes up short.
            """
            parent_readinto = super(BufferedGzipFile, self).readinto
            total_len = len(buf)
            # Small buffers are unaffected by the bug: delegate directly.
            if total_len < 2 ** 32:
                return parent_readinto(buf)
            # Work around https://bugs.python.org/issue25626 by never asking
            # for more than an unsigned 32-bit count in one call.
            chunk_limit = 2 ** 32 - 1
            view = memoryview(buf)
            filled = 0
            while filled < total_len:
                want = min(total_len - filled, chunk_limit)
                got = parent_readinto(view[filled:filled + want])
                filled += got
                # A short read means there is nothing more to fetch for now.
                if got != want:
                    break
            return filled
def _gzip_open(filename, mode='rb', compresslevel=9, keep_open=False):
    """ Open `filename` as a gzip file and return the file object

    ``SafeIndexedGzipFile`` is used when all of the following hold: the caller
    passed a true `keep_open`, `mode` is exactly ``'rb'``, and a usable
    indexed_gzip was found at import time.  In every other case the file is
    opened with ``BufferedGzipFile(filename, mode, compresslevel)``.
    """
    # indexed_gzip gives faster read access, so prefer it when allowed
    indexed_ok = keep_open and mode == 'rb' and HAVE_INDEXED_GZIP
    opened = (SafeIndexedGzipFile(filename) if indexed_ok
              else BufferedGzipFile(filename, mode, compresslevel))
    # Speedup for #209, for versions of python < 3.5: file objects exposing
    # ``max_read_chunk`` get a larger read buffer for faster reads on large
    # files.  See https://github.com/nipy/nibabel/pull/210 for discussion
    if not hasattr(opened, 'max_read_chunk'):
        return opened
    opened.max_read_chunk = GZIP_MAX_READ_CHUNK
    return opened
class Opener(object):
    r""" Class to accept, maybe open, and context-manage file-likes / filenames

    Provides context manager to close files that the constructor opened for
    you.

    Parameters
    ----------
    fileish : str or file-like
        if str, then open with suitable opening method. If file-like, accept as
        is
    \*args : positional arguments
        passed to opening method when `fileish` is str.  ``mode``, if not
        specified, is `rb`.  ``compresslevel``, if relevant, and not specified,
        is set from class variable ``default_compresslevel``. ``keep_open``, if
        relevant, and not specified, is ``False``.
    \*\*kwargs : keyword arguments
        passed to opening method when `fileish` is str.  Change of defaults as
        for \*args
    """
    # Each entry is ``(opening function, names of its arguments after the
    # filename, in positional order)``.
    gz_def = (_gzip_open, ('mode', 'compresslevel', 'keep_open'))
    bz2_def = (bz2.BZ2File, ('mode', 'buffering', 'compresslevel'))
    compress_ext_map = {
        '.gz': gz_def,
        '.bz2': bz2_def,
        None: (open, ('mode', 'buffering'))  # default
    }
    #: default compression level when writing gz and bz2 files
    default_compresslevel = 1
    #: whether to ignore case looking for compression extensions
    compress_ext_icase = True

    def __init__(self, fileish, *args, **kwargs):
        # A file-like object is wrapped as is; it is not ours to close.
        if self._is_fileobj(fileish):
            self.fobj = fileish
            self.me_opened = False
            self._name = None
            return
        opener, arg_names = self._get_opener_argnames(fileish)
        # Names of every argument the caller supplied, positionally or by
        # keyword.  Defaults must only be injected for names missing from this
        # set, otherwise a positional value would collide with the injected
        # keyword and the opener would raise TypeError.
        supplied = set(kwargs)
        supplied.update(arg_names[:len(args)])
        # Set default mode
        if 'mode' not in supplied:
            kwargs['mode'] = 'rb'
        # Default compression level
        if 'compresslevel' in arg_names and 'compresslevel' not in supplied:
            kwargs['compresslevel'] = self.default_compresslevel
        if 'keep_open' not in arg_names:
            # Clear keep_open hint if it is not relevant for the file type
            kwargs.pop('keep_open', None)
        elif 'keep_open' not in supplied:
            # Default keep_open hint
            kwargs['keep_open'] = False
        self.fobj = opener(fileish, *args, **kwargs)
        self._name = fileish
        self.me_opened = True

    def _get_opener_argnames(self, fileish):
        """ Return the ``compress_ext_map`` entry matching `fileish`'s extension

        Falls back to the entry stored under ``None`` when no extension
        matches.  Matching ignores case when ``compress_ext_icase`` is true.
        """
        _, ext = splitext(fileish)
        if self.compress_ext_icase:
            ext = ext.lower()
            for key in self.compress_ext_map:
                # ``None`` is the default entry, not an extension
                if key is None:
                    continue
                if key.lower() == ext:
                    return self.compress_ext_map[key]
        elif ext in self.compress_ext_map:
            return self.compress_ext_map[ext]
        return self.compress_ext_map[None]

    def _is_fileobj(self, obj):
        """ Is `obj` a file-like object?

        True when `obj` has both a ``read`` and a ``write`` attribute.
        """
        return hasattr(obj, 'read') and hasattr(obj, 'write')

    @property
    def closed(self):
        """ ``closed`` attribute of the wrapped file object """
        return self.fobj.closed

    @property
    def name(self):
        """ Return ``self.fobj.name`` or self._name if not present

        self._name will be None if object was created with a fileobj, otherwise
        it will be the filename.
        """
        try:
            return self.fobj.name
        except AttributeError:
            return self._name

    @property
    def mode(self):
        """ ``mode`` attribute of the wrapped file object """
        return self.fobj.mode

    # The methods below forward directly to the wrapped file object.
    def fileno(self):
        return self.fobj.fileno()

    def read(self, *args, **kwargs):
        return self.fobj.read(*args, **kwargs)

    def write(self, *args, **kwargs):
        return self.fobj.write(*args, **kwargs)

    def seek(self, *args, **kwargs):
        return self.fobj.seek(*args, **kwargs)

    def tell(self, *args, **kwargs):
        return self.fobj.tell(*args, **kwargs)

    def close(self, *args, **kwargs):
        return self.fobj.close(*args, **kwargs)

    def __iter__(self):
        return iter(self.fobj)

    def close_if_mine(self):
        """ Close ``self.fobj`` iff we opened it in the constructor
        """
        if self.me_opened:
            self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Only files opened by the constructor are closed on exit
        self.close_if_mine()
class ImageOpener(Opener):
    """ Opener variant that gathers extra compressed-file extensions

    This subclass changes nothing except that it carries its own
    ``compress_ext_map``, so image classes can register further extensions
    with custom openers (for example compressed openers).

    Register an extension with a line in the class definition (not in
    ``__init__``)::

        ImageOpener.compress_ext_map[ext] = func_def

    ``ext`` is a file extension beginning with '.' and should be included in
    the image class's ``valid_exts`` tuple.

    ``func_def`` is a ``(function, (args,))`` tuple, where ``function``
    accepts a filename as the first parameter, and ``args`` defines the other
    arguments that ``function`` accepts. These arguments must be any
    (unordered) subset of `mode`, `compresslevel`, and `buffering`.
    """
    # Starts as a shallow copy of Opener's table; add new extensions here
    compress_ext_map = dict(Opener.compress_ext_map)
 |