/usr/share/pyshared/socketpool/util.py is in python-socketpool 0.5.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
#
# This file is part of socketpool.
# See the NOTICE for more information.
import errno
import os
import platform
import select
import socket
import sys
try:
    from importlib import import_module
except ImportError:
    # Interpreters without ``importlib`` get a minimal local replacement
    # exposing the same ``import_module(name, package=None)`` call.
    import sys

    def _resolve_name(name, package, level):
        """Turn a relative module name into an absolute dotted name.

        ``package`` is the anchor package and ``level`` the number of
        leading dots the relative name had; every dot past the first
        removes one trailing component from ``package``.
        """
        if not hasattr(package, 'rindex'):
            raise ValueError("'package' not set to a string")
        cut = len(package)
        remaining = level
        while remaining > 1:
            try:
                cut = package.rindex('.', 0, cut)
            except ValueError:
                raise ValueError("attempted relative import beyond top-level "
                                 "package")
            remaining -= 1
        return "%s.%s" % (package[:cut], name)

    def import_module(name, package=None):
        """Import the module called ``name`` and return it.

        A ``name`` starting with one or more dots is a relative import and
        needs ``package`` as the anchor from which the absolute name is
        derived; a missing anchor raises ``TypeError``.
        """
        if name.startswith('.'):
            if not package:
                raise TypeError("relative imports require the 'package' argument")
            # Number of leading dots == how far up the package tree to go.
            level = len(name) - len(name.lstrip('.'))
            name = _resolve_name(name[level:], package, level)
        __import__(name)
        return sys.modules[name]
def load_backend(backend_name):
    """Import and return the module implementing a pool backend.

    A dotted ``backend_name`` (for example ``"somelib.backend_mod"``) is
    imported as given, which is how external backends are selected.  A
    bare name is looked up as ``socketpool.backend_<name>``.

    Backends named by the original documentation:

    - thread: connections are kept in a thread-safe queue.
    - gevent: gevent support.
    - eventlet: eventlet support.

    Raises ``ImportError`` with a socketpool-specific message when the
    module cannot be imported.
    """
    is_bare_name = len(backend_name.split(".")) == 1
    if is_bare_name:
        module_path = "socketpool.backend_%s" % backend_name
    else:
        module_path = backend_name

    try:
        return import_module(module_path)
    except ImportError:
        raise ImportError("%s isn't a socketpool backend" % backend_name)
def can_use_kqueue():
    """Return True when ``select.kqueue`` exists and is safe to use.

    See Issue #15: kqueue doesn't work on OS X 10.6 and below, so on
    Darwin the reported OS release must be at least 10.7.
    """
    if not hasattr(select, "kqueue"):
        return False
    if platform.system() != 'Darwin':
        return True

    # Compare numerically: as plain strings '10.10' sorts before '10.7',
    # which would wrongly reject OS X 10.10 - 10.15.
    release = []
    for part in platform.mac_ver()[0].split('.'):
        if not part.isdigit():
            break
        release.append(int(part))
    # An unknown/empty release yields () and is treated as too old, which
    # matches how the empty string compared before.
    return tuple(release) >= (10, 7)
def is_connected(skt):
    """Best-effort, non-blocking check that ``skt`` still looks usable.

    The socket's descriptor is polled with a zero timeout using the first
    available mechanism: epoll, then poll, then kqueue (when
    ``can_use_kqueue()`` allows it), then plain ``select``.

    Returns ``False`` when ``fileno()`` fails with ``EBADF``, when polling
    raises ``IOError``, ``ValueError`` or ``select.error``, or when the
    chosen mechanism does not report the descriptor as usable.  Any other
    ``socket.error`` from ``fileno()`` is re-raised.
    """
    try:
        fno = skt.fileno()
    except socket.error as e:
        # Exceptions are not subscriptable on Python 3; ``args`` works on
        # both major versions.
        if e.args and e.args[0] == errno.EBADF:
            return False
        raise

    try:
        if hasattr(select, "epoll"):
            ep = select.epoll()
            try:
                ep.register(fno, select.EPOLLOUT | select.EPOLLIN)
                for fd, ev in ep.poll(0):
                    if fd == fno and ev & (select.EPOLLOUT | select.EPOLLIN):
                        return True
            finally:
                # Closing drops the registration and releases the epoll
                # descriptor on every exit path.
                ep.close()
        elif hasattr(select, "poll"):
            p = select.poll()
            p.register(fno, select.POLLOUT | select.POLLIN)
            for fd, ev in p.poll(0):
                if fd == fno and ev & (select.POLLOUT | select.POLLIN):
                    return True
        elif can_use_kqueue():
            kq = select.kqueue()
            try:
                kq.control([
                    select.kevent(fno, select.KQ_FILTER_READ,
                                  select.KQ_EV_ADD),
                    select.kevent(fno, select.KQ_FILTER_WRITE,
                                  select.KQ_EV_ADD),
                ], 0)
                for ev in kq.control(None, 4, 0):
                    if ev.ident == fno:
                        # An error flag on our descriptor means it is dead.
                        return not (ev.flags & select.KQ_EV_ERROR)
                # No event for our descriptor: treated as still connected.
                return True
            finally:
                # Closing the queue discards the registered filters, so no
                # explicit KQ_EV_DELETE pass is needed, and the early
                # returns above no longer leak the kqueue descriptor.
                kq.close()
        else:
            r, _, _ = select.select([fno], [], [], 0)
            if not r:
                # Nothing pending to read on an idle socket.
                return True
    except IOError:
        pass
    except (ValueError, select.error):
        pass
    return False
|