This file is indexed.

/usr/share/pyshared/execnet/gateway.py is in python-execnet 1.0.9-0.1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
"""
gateway code for initiating popen, socket and ssh connections.
(c) 2004-2009, Holger Krekel and others
"""

import sys, os, inspect, types, linecache
import textwrap
import execnet
from execnet.gateway_base import Message, Popen2IO
from execnet import gateway_base
importdir = os.path.dirname(os.path.dirname(execnet.__file__))

class Gateway(gateway_base.BaseGateway):
    """ Gateway to a local or remote Python Intepreter. """

    def __init__(self, io, id):
        super(Gateway, self).__init__(io=io, id=id, _startcount=1)
        self._remote_bootstrap_gateway(io)
        self._initreceive()

    def __repr__(self):
        """ return string representing gateway type and status. """
        try:
            r = (self.hasreceiver() and 'receive-live' or 'not-receiving')
            i = len(self._channelfactory.channels())
        except AttributeError:
            r = "uninitialized"
            i = "no"
        return "<%s id=%r %s, %s active channels>" %(
                self.__class__.__name__, self.id, r, i)

    def exit(self):
        """ trigger gateway exit.  Defer waiting for finishing
        of receiver-thread and subprocess activity to when
        group.terminate() is called.
        """
        self._trace("gateway.exit() called")
        if self not in self._group:
            self._trace("gateway already unregistered with group")
            return
        self._group._unregister(self)
        self._trace("--> sending GATEWAY_TERMINATE")
        try:
            self._send(Message.GATEWAY_TERMINATE)
            self._io.close_write()
        except IOError:
            v = sys.exc_info()[1]
            self._trace("io-error: could not send termination sequence")
            self._trace(" exception: %r" % v)

    def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False):
        """
        set the string coercion for this gateway
        the default is to try to convert py2 str as py3 str,
        but not to try and convert py3 str to py2 str
        """
        self._unserializer.py2str_as_py3str = py2str_as_py3str
        self._unserializer.py3str_as_py2str = py3str_as_py2str
        self._send(Message.RECONFIGURE, data=(py2str_as_py3str, py3str_as_py2str))

    def _remote_bootstrap_gateway(self, io):
        """ send gateway bootstrap code to a remote Python interpreter
            endpoint, which reads from io for a string to execute.
        """
        sendexec(io,
            inspect.getsource(gateway_base),
            self._remotesetup,
            "io.write('1'.encode('ascii'))",
            "serve(io, id='%s-slave')" % self.id,
        )
        s = io.read(1)
        assert s == "1".encode('ascii')

    def _rinfo(self, update=False):
        """ return some sys/env information from remote. """
        if update or not hasattr(self, '_cache_rinfo'):
            ch = self.remote_exec(rinfo_source)
            self._cache_rinfo = RInfo(ch.receive())
        return self._cache_rinfo

    def hasreceiver(self):
        """ return True if gateway is able to receive data. """
        return self._receiverthread.isAlive() # approxmimation

    def remote_status(self):
        """ return information object about remote execution status. """
        channel = self.newchannel()
        self._send(Message.STATUS, channel.id)
        statusdict = channel.receive()
        # the other side didn't actually instantiate a channel
        # so we just delete the internal id/channel mapping
        self._channelfactory._local_close(channel.id)
        return RemoteStatus(statusdict)

    def remote_exec(self, source, **kwargs):
        """ return channel object and connect it to a remote
            execution thread where the given ``source`` executes.

            * ``source`` is a string: execute source string remotely
              with a ``channel`` put into the global namespace.
            * ``source`` is a pure function: serialize source and
              call function with ``**kwargs``, adding a
              ``channel`` object to the keyword arguments.
            * ``source`` is a pure module: execute source of module
              with a ``channel`` in its global namespace

            In all cases the binding ``__name__='__channelexec__'``
            will be available in the global namespace of the remotely
            executing code.
        """
        call_name = None
        if isinstance(source, types.ModuleType):
            linecache.updatecache(inspect.getsourcefile(source))
            source = inspect.getsource(source)
        elif isinstance(source, types.FunctionType):
            call_name = source.__name__
            source = _source_of_function(source)
        else:
            source = textwrap.dedent(str(source))

        if call_name is None and kwargs:
            raise TypeError("can't pass kwargs to non-function remote_exec")

        channel = self.newchannel()
        self._send(Message.CHANNEL_EXEC,
                   channel.id,
                   (source, call_name, kwargs))
        return channel

    def remote_init_threads(self, num=None):
        """ start up to 'num' threads for subsequent
            remote_exec() invocations to allow concurrent
            execution.
        """
        if hasattr(self, '_remotechannelthread'):
            raise IOError("remote threads already running")
        from execnet import threadpool
        source = inspect.getsource(threadpool)
        self._remotechannelthread = self.remote_exec(source)
        self._remotechannelthread.send(num)
        status = self._remotechannelthread.receive()
        assert status == "ok", status

class RInfo:
    def __init__(self, kwargs):
        self.__dict__.update(kwargs)

    def __repr__(self):
        info = ", ".join(["%s=%s" % item
                for item in self.__dict__.items()])
        return "<RInfo %r>" % info

RemoteStatus = RInfo

rinfo_source = """
import sys, os
channel.send(dict(
    executable = sys.executable,
    version_info = sys.version_info[:5],
    platform = sys.platform,
    cwd = os.getcwd(),
    pid = os.getpid(),
))
"""


def _find_non_builtin_globals(source, codeobj):
    try:
        import ast
    except ImportError:
        return None
    try:
        import __builtin__
    except ImportError:
        import builtins as __builtin__

    vars = dict.fromkeys(codeobj.co_varnames)
    all = []
    for node in ast.walk(ast.parse(source)):
        if (isinstance(node, ast.Name) and node.id not in vars and
            node.id not in __builtin__.__dict__):
            all.append(node.id)
    return all


def _source_of_function(function):
    if function.__name__ == '<lambda>':
        raise ValueError("can't evaluate lambda functions'")
    #XXX: we dont check before remote instanciation
    #     if arguments are used propperly
    args, varargs, keywords, defaults = inspect.getargspec(function)
    if args[0] != 'channel':
        raise ValueError('expected first function argument to be `channel`')

    if sys.version_info < (3,0):
        closure = function.func_closure
        codeobj = function.func_code
    else:
        closure = function.__closure__
        codeobj = function.__code__

    if closure is not None:
        raise ValueError("functions with closures can't be passed")

    try:
        source = inspect.getsource(function)
    except IOError:
        raise ValueError("can't find source file for %s" % function)

    source = textwrap.dedent(source) # just for inner functions

    used_globals = _find_non_builtin_globals(source, codeobj)
    if used_globals:
        raise ValueError(
            "the use of non-builtin globals isn't supported",
            used_globals,
        )

    return source

class PopenCmdGateway(Gateway):
    _remotesetup = "io = init_popen_io()"
    def __init__(self, args, id):
        from subprocess import Popen, PIPE
        self._popen = p = Popen(args, stdin=PIPE, stdout=PIPE)
        io = Popen2IO(p.stdin, p.stdout)
        super(PopenCmdGateway, self).__init__(io=io, id=id)
        # fix for jython 2.5.1
        if p.pid is None:
            p.pid = self.remote_exec(
                "import os; channel.send(os.getpid())").receive()

popen_bootstrapline = "import sys;exec(eval(sys.stdin.readline()))"
class PopenGateway(PopenCmdGateway):
    """ This Gateway provides interaction with a newly started
        python subprocess.
    """
    def __init__(self, id, python=None):
        """ instantiate a gateway to a subprocess
            started with the given 'python' executable.
        """
        if not python:
            python = sys.executable
        args = [str(python), '-u', '-c', popen_bootstrapline]
        super(PopenGateway, self).__init__(args, id=id)

    def _remote_bootstrap_gateway(self, io):
        sendexec(io,
                 "import sys",
                 "sys.stdout.write('1')",
                 "sys.stdout.flush()",
                 popen_bootstrapline)
        sendexec(io,
            "import sys ; sys.path.insert(0, %r)" % importdir,
            "from execnet.gateway_base import serve, init_popen_io",
            "serve(init_popen_io(), id='%s-slave')" % self.id,
        )
        s = io.read(1)
        assert s == "1".encode('ascii')

def sendexec(io, *sources):
    source = "\n".join(sources)
    io.write((repr(source)+ "\n").encode('ascii'))

class HostNotFound(Exception):
    pass

class SshGateway(PopenCmdGateway):
    """ This Gateway provides interaction with a remote Python process,
        established via the 'ssh' command line binary.
        The remote side needs to have a Python interpreter executable.
    """
    def __init__(self, sshaddress, id, remotepython=None, ssh_config=None):
        """ instantiate a remote ssh process with the
            given 'sshaddress' and remotepython version.
            you may specify an ssh_config file.
        """
        self.remoteaddress = sshaddress
        if remotepython is None:
            remotepython = "python"
        args = ['ssh', '-C' ]
        if ssh_config is not None:
            args.extend(['-F', str(ssh_config)])
        remotecmd = '%s -c "%s"' %(remotepython, popen_bootstrapline)
        args.extend([sshaddress, remotecmd])
        super(SshGateway, self).__init__(args, id=id)

    def _remote_bootstrap_gateway(self, io):
        try:
            super(SshGateway, self)._remote_bootstrap_gateway(io)
        except EOFError:
            ret = self._popen.wait()
            if ret == 255:
                raise HostNotFound(self.remoteaddress)