/usr/share/pyshared/kombu/utils/eventio.py is in python-kombu 2.1.8-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
kombu.utils.eventio
===================
Evented IO support for multiple platforms.
:copyright: (c) 2009 - 2012 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
from __future__ import absolute_import
import errno
import select
import socket
from kombu.syn import detect_environment
# Only the ``poll`` factory is exported from this module.
__all__ = ["poll"]
# Event bitmask reported for descriptors that are ready for reading.
# The kqueue and select pollers translate their native results into this
# flag; the epoll poller passes event masks straight through to epoll.
# NOTE(review): value appears to mirror Linux EPOLLIN -- confirm.
POLL_READ = 0x001
# Event bitmask reported for descriptors in an error condition.
# NOTE(review): the three bits appear to mirror Linux
# EPOLLERR | EPOLLHUP | EPOLLRDHUP -- confirm against the platform headers.
POLL_ERR = 0x008 | 0x010 | 0x2000
def get_errno(exc):
    """Extract an errno value from the exception *exc*.

    Lookup order:

    1. ``exc.errno`` if the attribute exists (returned as-is, even if
       it is ``None``).
    2. ``exc.args[0]`` if ``exc.args`` is a 2-tuple, i.e. the
       ``(errno, reason)`` layout.
    3. ``0`` when neither is available.
    """
    missing = object()

    code = getattr(exc, "errno", missing)
    if code is not missing:
        return code

    # e.args = (errno, reason)
    args = getattr(exc, "args", missing)
    if isinstance(args, tuple) and len(args) == 2:
        return args[0]
    return 0
class Poller(object):
    """Base class for the platform specific pollers.

    Subclasses implement ``_poll(timeout)``; callers use :meth:`poll`,
    which adds handling of interrupted system calls on top of it.
    """

    def poll(self, timeout):
        """Poll for events, ignoring interrupted system calls.

        :param timeout: passed unchanged to the subclass ``_poll``.
        :returns: whatever ``_poll`` returns, or ``None`` if the call
            was interrupted (errno is ``EINTR``).
        :raises: any exception from ``_poll`` whose errno is not
            ``EINTR`` is re-raised unchanged.
        """
        try:
            return self._poll(timeout)
        # ``except X as name`` is valid on Python 2.6+ and Python 3; the
        # older ``except X, name`` form is a syntax error on Python 3.
        except Exception as exc:
            if get_errno(exc) != errno.EINTR:
                raise
class _epoll(Poller):
    """Poller implemented on top of :func:`select.epoll`."""

    def __init__(self):
        self._epoll = select.epoll()

    def register(self, fd, events):
        """Register *fd* with the epoll object using the mask *events*."""
        self._epoll.register(fd, events)

    def unregister(self, fd):
        """Remove *fd* from the epoll object, ignoring socket errors."""
        try:
            self._epoll.unregister(fd)
        except socket.error:
            # Best effort: the descriptor may already be gone.
            pass

    def _poll(self, timeout):
        """Return the ``(fd, events)`` pairs reported by epoll.

        :param timeout: seconds to wait; ``None`` means wait until an
            event arrives, ``0`` means return immediately.
        """
        # Only ``None`` maps to epoll's "block" value of -1.  The previous
        # ``timeout or -1`` also turned a non-blocking timeout of 0 into an
        # indefinite wait.
        if timeout is None:
            timeout = -1
        return self._epoll.poll(timeout)
class _kqueue(Poller):
    """Poller implemented on top of :func:`select.kqueue`."""

    def __init__(self):
        self._kqueue = select.kqueue()
        # Maps each registered fd to the event mask it was registered with.
        self._active = {}

    def register(self, fd, events):
        """Start watching *fd* and remember the mask *events* for it."""
        self._control(fd, events, select.KQ_EV_ADD)
        self._active[fd] = events

    def unregister(self, fd):
        """Stop watching *fd*.

        Unregistering a descriptor that was never registered is a no-op,
        matching the tolerant behaviour of the other pollers in this
        module; socket errors from the kernel call are ignored.
        """
        try:
            events = self._active.pop(fd)
        except KeyError:
            return
        try:
            self._control(fd, events, select.KQ_EV_DELETE)
        except socket.error:
            # Best effort: the descriptor may already be closed.
            pass

    def _control(self, fd, events, flags):
        """Apply *flags* (add/delete) to a read filter for *fd*.

        *events* is accepted for symmetry with the callers but is not
        consulted: only a ``KQ_FILTER_READ`` filter is ever installed.
        """
        self._kqueue.control([select.kevent(fd, filter=select.KQ_FILTER_READ,
                                            flags=flags)], 0)

    def _poll(self, timeout):
        """Return ``(fd, events)`` pairs for up to 1000 pending kevents."""
        kevents = self._kqueue.control(None, 1000, timeout)
        events = {}
        for kevent in kevents:
            fd = kevent.ident
            if kevent.filter == select.KQ_FILTER_READ:
                events[fd] = events.get(fd, 0) | POLL_READ
            # KQ_EV_ERROR is a bit in ``flags``, not a filter id, so it
            # has to be tested against ``kevent.flags``; comparing it with
            # ``kevent.filter`` can never match.
            if kevent.flags & select.KQ_EV_ERROR:
                events[fd] = events.get(fd, 0) | POLL_ERR
        return events.items()
class _select(Poller):
    """Poller implemented on top of :func:`select.select`."""

    def __init__(self):
        # ``_rfd`` holds descriptors watched for reading, ``_efd`` those
        # watched for exceptional conditions; ``_all`` is the pair of both.
        self._all = self._rfd, self._efd = set(), set()

    def register(self, fd, events):
        """Watch *fd* according to the mask *events*.

        A mask containing ``POLL_ERR`` bits watches the descriptor for
        both errors and reads; otherwise a mask containing ``POLL_READ``
        watches it for reads only.
        """
        if events & POLL_ERR:
            self._efd.add(fd)
            self._rfd.add(fd)
        elif events & POLL_READ:
            self._rfd.add(fd)

    def unregister(self, fd):
        """Stop watching *fd*; unknown descriptors are ignored."""
        self._rfd.discard(fd)
        self._efd.discard(fd)

    def _poll(self, timeout):
        """Return ``(fileno, events)`` pairs for the ready descriptors.

        Registered items may be objects exposing ``fileno()`` or plain
        integer descriptors; both are reported by their integer fileno.
        """
        read, _write, error = select.select(self._rfd, [], self._efd, timeout)
        events = {}
        for ready, flag in ((read, POLL_READ), (error, POLL_ERR)):
            for fd in ready:
                # select() also accepts raw integer descriptors, which
                # have no ``fileno`` method; use those as they are.
                fileno = getattr(fd, "fileno", None)
                if fileno is not None:
                    fd = fileno()
                events[fd] = events.get(fd, 0) | flag
        return events.items()
def _get_poller():
    """Pick the poller class suited to the running environment.

    Under eventlet or gevent the select based poller is always used.
    Otherwise the first facility offered by the :mod:`select` module
    wins, in the order epoll, kqueue, with select as the fallback.
    """
    # greenlet
    running_green = detect_environment() in ("eventlet", "gevent")
    if not running_green:
        candidates = (
            ("epoll", _epoll),      # Py2.6+ Linux
            ("kqueue", _kqueue),    # Py2.6+ on BSD / Darwin
        )
        for facility, poller_cls in candidates:
            if hasattr(select, facility):
                return poller_cls
    return _select
def poll(*args, **kwargs):
    """Create a poller instance.

    The class is selected by ``_get_poller()``; all positional and
    keyword arguments are forwarded to its constructor.
    """
    poller_cls = _get_poller()
    return poller_cls(*args, **kwargs)
|