This file is indexed.

/usr/lib/python2.7/dist-packages/scoop/shared.py is in python-scoop 0.7.1.1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#!/usr/bin/env python
#
#    This file is part of Scalable COncurrent Operations in Python (SCOOP). 
#
#    SCOOP is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Lesser General Public License as
#    published by the Free Software Foundation, either version 3 of
#    the License, or (at your option) any later version.
#
#    SCOOP is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#    GNU Lesser General Public License for more details.
#
#    You should have received a copy of the GNU Lesser General Public
#    License along with SCOOP. If not, see <http://www.gnu.org/licenses/>.
#

import itertools
from inspect import ismethod
from functools import reduce
import time

from . import encapsulation, utils
import scoop
from .fallbacks import ensureScoopStartedProperly, NotStartedProperly


elements = None


def _ensureAtomicity(fn):
    """Ensure atomicity of passed elements on the whole worker pool.

    Decorator that wraps ``fn`` (the function actually sending the
    constants) with three steps:

    1. refuse any keyword name that is already present in ``elements``;
    2. call ``fn`` repeatedly until every keyword name shows up in the
       entry of ``elements`` belonging to the current worker;
    3. verify that no name ends up declared more than once.

    :param fn: The function performing the propagation. It is called with
        the very same positional and keyword arguments as the wrapper.

    :returns: The wrapping function.
    """
    @ensureScoopStartedProperly
    def wrapper(*args, **kwargs):
        r"""setConst(**kwargs)
        Set a constant that will be shared to every workers.
        This call blocks until the constant has propagated to at least one
        worker.

        :param \*\*kwargs: One or more combination(s) key=value. Key being the
            variable name and value the object to share.

        :returns: None.

        Usage: setConst(name=value)
        """
        # Note that the docstring is the one of setConst.
        # This is because of the documentation framework (sphinx) limitations.
        # It is a raw string so that ``\*`` is not an invalid escape sequence.

        from . import _control

        # Enforce retrieval of currently awaiting constants
        _control.execQueue.socket.pumpInfoSocket()

        # Object name existence check. The set of known names is built once
        # instead of once per keyword.
        knownNames = set(itertools.chain.from_iterable(
            elem.keys() for elem in elements.values()
        ))
        for key in kwargs:
            if key in knownNames:
                raise TypeError(
                    "This constant already exists: {0}.".format(key)
                )

        # Retry element propagation until it is returned
        while not all(key in elements.get(scoop.worker, [])
                      for key in kwargs):
            scoop.logger.debug("Sending global variables {0}...".format(
                list(kwargs.keys())
            ))
            # Call the function
            fn(*args, **kwargs)

            # Enforce retrieval of currently awaiting constants
            _control.execQueue.socket.pumpInfoSocket()

            # TODO: Make previous blocking instead of sleep
            time.sleep(0.1)

        # Atomicity check: a name must not be declared more than once over
        # all the entries of ``elements``.
        elementNames = list(itertools.chain.from_iterable(
            elem.keys() for elem in elements.values()
        ))
        if len(elementNames) != len(set(elementNames)):
            # Report the names that are really duplicated rather than
            # whatever keyword happened to be iterated last (which is also
            # undefined when no keyword was given).
            seen = set()
            duplicates = []
            for elementName in elementNames:
                if elementName in seen and elementName not in duplicates:
                    duplicates.append(elementName)
                seen.add(elementName)
            raise TypeError("This constant already exists: {0}.".format(
                ", ".join(str(duplicate) for duplicate in duplicates)
            ))

    return wrapper


@_ensureAtomicity
def setConst(**kwargs):
    """setConst(**kwargs)
    Send one or more named constants through the execution queue socket.

    :param **kwargs: One or more key=value pairs. The key is the name of
        the shared variable and the value is the object to share.

    :returns: None.

    Usage: setConst(name=value)
    """
    from . import _control

    broadcast = _control.execQueue.socket.sendVariable

    for constName in kwargs:
        payload = kwargs[constName]
        # Callables are wrapped in a FunctionEncapsulation before being
        # sent. For file-like objects, see encapsulation.py where copyreg
        # was used to overload standard pickling.
        if callable(payload):
            payload = encapsulation.FunctionEncapsulation(payload, constName)
        broadcast(constName, payload)


def getConst(name, timeout=0.1):
    """Get a shared constant.

    The pending constants are pumped from the info socket and ``name`` is
    looked up among all the known constants. The lookup is retried every
    10 ms until the constant is found or ``timeout`` has elapsed.

    :param name: The name of the shared variable to retrieve.
    :param timeout: The maximum time to wait in seconds for the propagation of
        the constant. ``float("inf")`` waits until the constant is available.

    :returns: The shared object, or None if it was not available before the
        timeout expired.

    Usage: value = getConst('name')
    """
    from . import _control

    timeStamp = time.time()
    while True:
        # Enforce retrieval of currently awaiting constants
        _control.execQueue.socket.pumpInfoSocket()

        # Constants concatenation. Groups are merged in iteration order, so
        # a later group overrides an earlier one on a name clash.
        constants = {}
        for workerConstants in elements.values():
            constants.update(workerConstants.items())

        timeoutHappened = time.time() - timeStamp > timeout
        # Test for presence of the name, not for a non-None value: a constant
        # whose value is None is available and must not wait for the timeout.
        if name in constants or timeoutHappened:
            return constants.get(name)
        time.sleep(0.01)


class SharedElementEncapsulation(object):
    """Encapsulates a reference to an element available in the shared module.

    This is used by Futures (map on lambda, for instance).

    Instances only carry the name (``uniqueID``) under which the element is
    shared; the element itself is fetched with getConst when the instance is
    called."""
    def __init__(self, element):
        """Reference an already shared element or share a new one.

        :param element: Either the name (string) of an element that is
            already shared, or the object to share. For a bound method, the
            object it is bound to is shared and the method name is kept.
        """
        self.isMethod = False
        if utils.isStr(element):
            # Already shared element
            # Identity test: the shared object may overload comparison
            # operators (array-like objects, for instance).
            assert getConst(element, timeout=0) is not None, (
                "Element must already be shared."
            )
            self.uniqueID = element
        else:
            # Element to share
            # Determine if function is a method. Methods derived from external
            # languages such as C++ aren't detected by ismethod.
            if ismethod(element):
                # Must share whole object before ability to use its method
                self.isMethod = True
                self.methodName = element.__name__
                element = element.__self__

            # Lambda-like or unshared code to share
            uniqueID = str(scoop.worker) + str(id(element)) + str(hash(element))
            self.uniqueID = uniqueID
            # Only share the element if this name is not known yet.
            if getConst(uniqueID, timeout=0) is None:
                funcRef = {uniqueID: element}
                setConst(**funcRef)

    def __repr__(self):
        """Return the name under which the element is shared."""
        return self.uniqueID

    def __call__(self, *args, **kwargs):
        """Fetch the shared element and call it with the given arguments.

        The retrieval uses an infinite timeout. When the element was a
        method, the method is looked up by name on the shared object.
        """
        target = getConst(self.__repr__(), timeout=float("inf"))
        if self.isMethod:
            target = getattr(target, self.methodName)
        return target(*args, **kwargs)

    # NOTE(review): this is a regular method, so ``instance.__name__`` gives
    # a bound method and not a string. Kept as is for compatibility; confirm
    # against the callers before turning it into a property.
    def __name__(self):
        """Return the same name as __repr__."""
        return self.__repr__()