/usr/share/pyshared/brian/hears/bufferable.py is in python-brian 1.4.1-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 | '''
The Bufferable class serves as a base for all the other Brian.hears classes
'''
from numpy import zeros, empty, hstack, vstack, arange, diff
class Bufferable(object):
'''
Base class for Brian.hears classes
Defines a buffering interface of two methods:
``buffer_init()``
Initialise the buffer, should set the time pointer to zero and do
any other initialisation that the object needs.
``buffer_fetch(start, end)``
Fetch the next samples ``start:end`` from the buffer. Value returned
should be an array of shape ``(end-start, nchannels)``. Can throw an
``IndexError`` exception if it is outside the possible range.
In addition, bufferable objects should define attributes:
``nchannels``
The number of channels in the buffer.
``samplerate``
The sample rate in Hz.
By default, the class will define a default buffering mechanism which can
easily be extended. To extend the default buffering mechanism, simply
implement the method:
``buffer_fetch_next(samples)``
Returns the next ``samples`` from the buffer.
The default methods for ``buffer_init()`` and ``buffer_fetch()`` will
define a buffer cache which will get larger if it needs to to accommodate
a ``buffer_fetch(start, end)`` where ``end-start`` is larger than the
current cache. If the filterbank has a ``minimum_buffer_size`` attribute,
the internal cache will always have at least this size, and the
``buffer_fetch_next(samples)`` method will always get called with
``samples>=minimum_buffer_size``. This can be useful to ensure that the
buffering is done efficiently internally, even if the user request
buffered chunks that are too small. If the filterbank has a
``maximum_buffer_size`` attribute then ``buffer_fetch_next(samples)`` will
always be called with ``samples<=maximum_buffer_size`` - this can be useful
for either memory consumption reasons or for implementing time varying
filters that need to update on a shorter time window than the overall
buffer size.
The following attributes will automatically be maintained:
``self.cached_buffer_start``, ``self.cached_buffer_end``
The start and end of the cached segment of the buffer
``self.cached_buffer_output``
An array of shape ``((cached_buffer_end-cached_buffer_start, nchannels)``
with the current cached segment of the buffer. Note that this array can
change size.
'''
def buffer_fetch(self, start, end):
if not hasattr(self, 'cached_buffer_start'):
self.buffer_init()
# optimisations for the most typical cases, which are when start:end is
# the current cached segment, or when start:end is the next cached
# segment of the same size as the current one
if start==self.cached_buffer_start and end==self.cached_buffer_end:
return self.cached_buffer_output
if start==self.cached_buffer_end and end-start==self.cached_buffer_output.shape[0]:
self.cached_buffer_output = self._buffer_fetch_next(end-start)
self.cached_buffer_start = start
self.cached_buffer_end = end
return self.cached_buffer_output
# handle bad inputs
if end<start:
raise IndexError('Buffer end should be larger than start.')
if start<self.cached_buffer_start:
raise IndexError('Attempted to fetch output that has disappeared from the buffer.')
# If the requested segment of the buffer is entirely within the cache,
# just return it.
if end<=self.cached_buffer_end:
bstart = start-self.cached_buffer_start
bend = end-self.cached_buffer_start
return self.cached_buffer_output[bstart:bend, :]
# Otherwise we need to fetch some new samples.
# in case of minimum_buffer_size, we need to remember what the
# requested end point was, because with a minimum buffer size we need
# to extend the end point
req_end = end
samples = end-self.cached_buffer_end
if hasattr(self, 'minimum_buffer_size'):
samples = max(samples, self.minimum_buffer_size)
end = self.cached_buffer_end+samples
newsegment = self._buffer_fetch_next(samples)
# otherwise we have an overlap situation - I guess this won't really
# happen very often but it has to be handled correctly in case.
new_end = end
# if end-start is longer than the size of the current cached buffer, we
# will have to increase the size of the cache
new_start = min(new_end-self.cached_buffer_output.shape[0], start)
new_size = new_end-new_start
new_output = empty((new_size, self.nchannels))
if samples!=new_size:
new_output[:new_size-samples, :] = self.cached_buffer_output[samples-new_size:, :]
new_output[new_size-samples:, :] = newsegment
self.cached_buffer_output = new_output
self.cached_buffer_start = new_start
self.cached_buffer_end = new_end
# return only those values up to the requested end point
return new_output[start-self.cached_buffer_start:req_end-self.cached_buffer_start, :]
def _buffer_fetch_next(self, samples):
# This method checks if there is a maximum buffer size, and if so
# splits the buffer_fetch_next into multiple pieces of at most this size
if not hasattr(self, 'maximum_buffer_size'):
return self.buffer_fetch_next(samples)
bufsize = self.maximum_buffer_size
endpoints = hstack((arange(0, samples, bufsize), samples))
sizes = diff(endpoints)
return vstack(tuple(self.buffer_fetch_next(size) for size in sizes))
def buffer_init(self):
self.cached_buffer_output = zeros((0, self.nchannels))
self.cached_buffer_start = 0
self.cached_buffer_end = 0
def buffer_fetch_next(self, samples):
raise NotImplementedError
nchannels = NotImplemented
samplerate = NotImplemented
|