This file is indexed.

/usr/share/pyshared/socketpool/util.py is in python-socketpool 0.5.3-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# -*- coding: utf-8 -
#
# This file is part of socketpool.
# See the NOTICE for more information.

import errno
import os
import platform
import select
import socket
import sys

try:
    from importlib import import_module
except ImportError:
    # Interpreters without importlib: provide an equivalent import_module.
    import sys

    def _resolve_name(name, package, level):
        """Turn a relative module name into its absolute dotted name.

        ``package`` is the anchor package and ``level`` is the number of
        leading dots the relative name carried; every level above the
        first strips one trailing component from ``package``.
        """
        if not hasattr(package, 'rindex'):
            raise ValueError("'package' not set to a string")
        cut = len(package)
        # One leading dot means "this package"; each extra dot climbs one
        # level further up.
        for _ in range(level - 1):
            try:
                cut = package.rindex('.', 0, cut)
            except ValueError:
                raise ValueError(
                    "attempted relative import beyond top-level package")
        return "%s.%s" % (package[:cut], name)


    def import_module(name, package=None):
        """Import the module called ``name`` and return it.

        A ``name`` beginning with one or more dots is a relative import;
        in that case ``package`` must be given and is used as the anchor
        for resolving ``name`` to an absolute module name.
        """
        if name.startswith('.'):
            if not package:
                raise TypeError("relative imports require the 'package' argument")
            # Number of leading dots == relative import level.
            level = len(name) - len(name.lstrip('.'))
            name = _resolve_name(name[level:], package, level)
        __import__(name)
        return sys.modules[name]

def load_backend(backend_name):
    """Import and return the pool backend module named ``backend_name``.

    A dotted name (for example ``"somelib.backend_mod"``) is imported
    as given, which is how external backends are selected.  A name
    without dots is looked up as ``socketpool.backend_<name>``.

    The docstring of the original implementation lists these built-in
    backends:
        - thread: connections are kept in a thread-safe queue.
        - gevent: gevent support
        - eventlet: eventlet support

    Raises ImportError when the resulting module cannot be imported.
    """
    pieces = backend_name.split(".")
    if len(pieces) == 1:
        # Bare name: one of socketpool's own backend modules.
        target = "socketpool.backend_%s" % backend_name
    else:
        target = backend_name

    try:
        return import_module(target)
    except ImportError:
        raise ImportError("%s isn't a socketpool backend" % backend_name)


def can_use_kqueue():
    """Return True when ``select.kqueue`` exists and is considered usable.

    See Issue #15: kqueue doesn't work on OS X 10.6 and below, so on
    Darwin the reported OS release must be at least 10.7.
    """
    if not hasattr(select, "kqueue"):
        return False

    if platform.system() == 'Darwin':
        # Compare the release numerically.  A plain string comparison
        # sorts '10.10' before '10.7' and would wrongly reject
        # 10.10 - 10.15.
        release = []
        for piece in platform.mac_ver()[0].split('.'):
            if not piece.isdigit():
                break
            release.append(int(piece))
        # An empty/unparseable release yields () which sorts below
        # (10, 7), i.e. it is treated as "too old", as before.
        if tuple(release) < (10, 7):
            return False

    return True

def is_connected(skt):
    """Best-effort, non-blocking check of whether ``skt`` is still usable.

    The first available mechanism of epoll, poll, kqueue and select is
    used with a zero timeout:

    - epoll / poll: True when the descriptor is reported readable or
      writable.
    - kqueue: False when the event for the descriptor carries
      ``KQ_EV_ERROR``, True otherwise (including when no event is
      reported).
    - select: True when the descriptor is *not* reported readable.

    Returns False when ``skt.fileno()`` fails with EBADF or when the
    polling call raises IOError, ValueError or select.error.  Any other
    socket.error from ``fileno()`` is re-raised.
    """
    try:
        fno = skt.fileno()
    except socket.error as e:
        # Exceptions are not subscriptable on Python 3; read the error
        # code from ``args`` so this works on both major versions.
        if e.args and e.args[0] == errno.EBADF:
            return False
        raise

    try:
        if hasattr(select, "epoll"):
            ep = select.epoll()
            try:
                ep.register(fno, select.EPOLLOUT | select.EPOLLIN)
                for fd, ev in ep.poll(0):
                    if fd == fno and ev & (select.EPOLLOUT | select.EPOLLIN):
                        return True
            finally:
                # Closing the epoll object releases its descriptor and
                # drops the registration on every exit path.
                ep.close()
        elif hasattr(select, "poll"):
            p = select.poll()
            p.register(fno, select.POLLOUT | select.POLLIN)
            try:
                for fd, ev in p.poll(0):
                    if fd == fno and ev & (select.POLLOUT | select.POLLIN):
                        return True
            finally:
                p.unregister(fno)
        elif can_use_kqueue():
            kq = select.kqueue()
            try:
                kq.control([
                    select.kevent(fno, select.KQ_FILTER_READ,
                                  select.KQ_EV_ADD),
                    select.kevent(fno, select.KQ_FILTER_WRITE,
                                  select.KQ_EV_ADD),
                ], 0)
                for ev in kq.control(None, 4, 0):
                    if ev.ident == fno:
                        return not (ev.flags & select.KQ_EV_ERROR)
                return True
            finally:
                # Previously the queue was only closed when no event
                # matched; close it on the early returns as well.
                kq.close()
        else:
            # NOTE(review): unlike the branches above, this fallback
            # reports "connected" when the socket is NOT readable.
            # Behaviour kept as-is; confirm this is intended.
            readable, _, _ = select.select([fno], [], [], 0)
            if not readable:
                return True

    except IOError:
        pass
    except (ValueError, select.error):
        pass

    return False