This file is indexed.

/usr/lib/python2.7/dist-packages/async/channel.py is in python-async 0.6.2-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of async and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Contains a queue based channel implementation"""
try:
    from queue import Empty
except ImportError:
    from Queue import Empty

from .util import (
        AsyncQueue,
        SyncQueue,
        ReadOnly
        )

import threading
import sys

__all__ = ( 'Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter',
            'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly',
            'IteratorReader', 'CallbackReaderMixin', 'CallbackWriterMixin')

#{ Classes
class Channel(object):
    """A channel is similar to a file like object. It has a write end as well as one or
    more read ends. If Data is in the channel, it can be read, if not the read operation
    will block until data becomes available.
    If the channel is closed, any read operation will result in an exception

    This base class is not instantiated directly, but instead serves as constructor
    for Rwriter pairs.

    Create a new channel """
    __slots__ = 'queue'

    # The queue to use to store the actual data
    QueueCls = AsyncQueue

    def __init__(self):
        """initialize this instance with a queue holding the channel contents"""
        self.queue = self.QueueCls()


class SerialChannel(Channel):
    """A slightly faster version of a Channel, which sacrificed thead-safety for performance"""
    QueueCls = SyncQueue


class Writer(object):
    """A writer is an object providing write access to a possibly blocking reading device"""
    __slots__ = tuple()

    #{ Interface

    def __init__(self, device):
        """Initialize the instance with the device to write to"""

    def write(self, item, block=True, timeout=None):
        """Write the given item into the device
        :param block: True if the device may block until space for the item is available
        :param timeout: The time in seconds to wait for the device to become ready
        in blocking mode"""
        raise NotImplementedError()

    def size(self):
        """:return: number of items already in the device, they could be read with a reader"""
        raise NotImplementedError()

    def close(self):
        """Close the channel. Multiple close calls on a closed channel are no
        an error"""
        raise NotImplementedError()

    def closed(self):
        """:return: True if the channel was closed"""
        raise NotImplementedError()

    #} END interface


class ChannelWriter(Writer):
    """The write end of a channel, a file-like interface for a channel"""
    __slots__ = ('channel', '_put')

    def __init__(self, channel):
        """Initialize the writer to use the given channel"""
        self.channel = channel
        self._put = self.channel.queue.put

    #{ Interface
    def write(self, item, block=False, timeout=None):
        return self._put(item, block, timeout)

    def size(self):
        return self.channel.queue.qsize()

    def close(self):
        """Close the channel. Multiple close calls on a closed channel are no
        an error"""
        self.channel.queue.set_writable(False)

    def closed(self):
        """:return: True if the channel was closed"""
        return not self.channel.queue.writable()
    #} END interface


class CallbackWriterMixin(object):
    """The write end of a channel which allows you to setup a callback to be
    called after an item was written to the channel"""
    # slots don't work with mixin's :(
    # __slots__ = ('_pre_cb')

    def __init__(self, *args):
        super(CallbackWriterMixin, self).__init__(*args)
        self._pre_cb = None

    def set_pre_cb(self, fun = lambda item: item):
        """
        Install a callback to be called before the given item is written.
        It returns a possibly altered item which will be written to the channel
        instead, making it useful for pre-write item conversions.
        Providing None uninstalls the current method.

        :return: the previously installed function or None
        :note: Must be thread-safe if the channel is used in multiple threads"""
        prev = self._pre_cb
        self._pre_cb = fun
        return prev

    def write(self, item, block=True, timeout=None):
        if self._pre_cb:
            item = self._pre_cb(item)
        super(CallbackWriterMixin, self).write(item, block, timeout)


class CallbackChannelWriter(CallbackWriterMixin, ChannelWriter):
    """Implements a channel writer with callback functionality"""
    pass


class Reader(object):
    """Allows reading from a device"""
    __slots__ = tuple()

    #{ Interface
    def __init__(self, device):
        """Initialize the instance with the device to read from"""

    #{ Iterator protocol

    def __iter__(self):
        return self

    def __next__(self):
        """Implements the iterator protocol, iterating individual items"""
        items = self.read(1)
        if items:
            return items[0]
        raise StopIteration

    def next(self):
        """Support the Python 2 iterator syntax"""
        return self.__next__()

    #} END iterator protocol


    #{ Interface

    def read(self, count=0, block=True, timeout=None):
        """
        read a list of items read from the device. The list, as a sequence
        of items, is similar to the string of characters returned when reading from
        file like objects.

        :param count: given amount of items to read. If < 1, all items will be read
        :param block: if True, the call will block until an item is available
        :param timeout: if positive and block is True, it will block only for the
            given amount of seconds, returning the items it received so far.
            The timeout is applied to each read item, not for the whole operation.
        :return: single item in a list if count is 1, or a list of count items.
            If the device was empty and count was 1, an empty list will be returned.
            If count was greater 1, a list with less than count items will be
            returned.
            If count was < 1, a list with all items that could be read will be
            returned."""
        raise NotImplementedError()

    #} END interface


class ChannelReader(Reader):
    """Allows reading from a channel. The reader is thread-safe if the channel is as well"""
    __slots__ = 'channel'

    def __init__(self, channel):
        """Initialize this instance from its parent write channel"""
        self.channel = channel

    #{ Interface

    def read(self, count=0, block=True, timeout=None):
        # if the channel is closed for writing, we never block
        # NOTE: is handled by the queue
        # We don't check for a closed state here has it costs time - most of
        # the time, it will not be closed, and will bail out automatically once
        # it gets closed


        # in non-blocking mode, its all not a problem
        out = list()
        queue = self.channel.queue
        if not block:
            # be as fast as possible in non-blocking mode, hence
            # its a bit 'unrolled'
            try:
                if count == 1:
                    out.append(queue.get(False))
                elif count < 1:
                    while True:
                        out.append(queue.get(False))
                    # END for each item
                else:
                    for i in range(count):
                        out.append(queue.get(False))
                    # END for each item
                # END handle count
            except Empty:
                pass
            # END handle exceptions
        else:
            # to get everything into one loop, we set the count accordingly
            if count == 0:
                count = sys.maxsize
            # END handle count

            i = 0
            while i < count:
                try:
                    out.append(queue.get(block, timeout))
                    i += 1
                except Empty:
                    # here we are only if
                    # someone woke us up to inform us about the queue that changed
                    # its writable state
                    # The following branch checks for closed channels, and pulls
                    # as many items as we need and as possible, before
                    # leaving the loop.
                    if not queue.writable():
                        try:
                            while i < count:
                                out.append(queue.get(False, None))
                                i += 1
                            # END count loop
                        except Empty:
                            break   # out of count loop
                        # END handle absolutely empty queue
                    # END handle closed channel

                    # if we are here, we woke up and the channel is not closed
                    # Either the queue became writable again, which currently shouldn't
                    # be able to happen in the channel, or someone read with a timeout
                    # that actually timed out.
                    # As it timed out, which is the only reason we are here,
                    # we have to abort
                    break
                # END ignore empty

            # END for each item
        # END handle blocking
        return out

    #} END interface


class CallbackReaderMixin(object):
    """A channel which sends a callback before items are read from the channel"""
    # unfortunately, slots can only use direct inheritance, have to turn it off :(
    # __slots__ = "_pre_cb"

    def __init__(self, *args):
        super(CallbackReaderMixin, self).__init__(*args)
        self._pre_cb = None
        self._post_cb = None

    def set_pre_cb(self, fun = lambda count: None):
        """
        Install a callback to call with the item count to be read before any
        item is actually read from the channel.
        Exceptions will be propagated.
        If a function is not provided, the call is effectively uninstalled.

        :return: the previously installed callback or None
        :note: The callback must be threadsafe if the channel is used by multiple threads."""
        prev = self._pre_cb
        self._pre_cb = fun
        return prev

    def set_post_cb(self, fun = lambda items: items):
        """
        Install a callback to call after items have been read, but before
        they are returned to the caller. The callback may adjust the items and/or the list.
        If no function is provided, the callback is uninstalled

        :return: the previously installed function"""
        prev = self._post_cb
        self._post_cb = fun
        return prev

    def read(self, count=0, block=True, timeout=None):
        if self._pre_cb:
            self._pre_cb(count)
        items = super(CallbackReaderMixin, self).read(count, block, timeout)

        if self._post_cb:
            items = self._post_cb(items)
        return items



class CallbackChannelReader(CallbackReaderMixin, ChannelReader):
    """Implements a channel reader with callback functionality"""
    pass


class IteratorReader(Reader):
    """A Reader allowing to read items from an iterator, instead of a channel.
    Reads will never block. Its thread-safe"""
    __slots__ = ("_empty", '_iter', '_lock')

    # the type of the lock to use when reading from the iterator
    lock_type = threading.Lock

    def __init__(self, iterator):
        self._empty = False
        if not hasattr(iterator, 'next') and not (hasattr(iterator, "__next__")):
            raise ValueError("Iterator %r needs a next() function" % iterator)
        self._iter = iterator
        self._lock = self.lock_type()

    def read(self, count=0, block=True, timeout=None):
        """Non-Blocking implementation of read"""
        # not threadsafe, but worst thing that could happen is that
        # we try to get items one more time
        if self._empty:
            return list()
        # END early abort

        self._lock.acquire()
        try:
            if count == 0:
                self._empty = True
                return list(self._iter)
            else:
                out = list()
                it = self._iter
                for i in range(count):
                    try:
                        out.append(next(it))
                    except StopIteration:
                        self._empty = True
                        break
                    # END handle empty iterator
                # END for each item to take
                return out
            # END handle count
        finally:
            self._lock.release()
        # END handle locking


#} END classes

#{ Constructors
def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader):
    """
    Create a channel, with a reader and a writer
    :return: tuple(reader, writer)
    :param ctype: Channel to instantiate
    :param wctype: The type of the write channel to instantiate
    :param rctype: The type of the read channel to instantiate"""
    c = ctype()
    wc = wtype(c)
    rc = rtype(c)
    return wc, rc
#} END constructors