This file is indexed.

/usr/share/pyshared/deap/dtm/commManagerMpi4py.py is in python-deap 0.7.1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

try:
    import Queue
except ImportError:
    import queue as Queue
import time
import threading
try:
    import cPickle
except ImportError:
    import pickle as cPickle
import array
import copy
import logging

try:
    from lxml import etree
except ImportError:
    try:
        import xml.etree.cElementTree as etree
    except ImportError:
        # Python 2.5
        import xml.etree.ElementTree as etree

from deap.dtm.dtmTypes import *
from deap.dtm.abstractCommManager import AbstractCommThread

_logger = logging.getLogger("dtm.communication")

DTM_MPI_MIN_LATENCY = 0.005
DTM_MPI_MAX_LATENCY = 0.01
DTM_CONCURRENT_RECV_LIMIT = 1000
DTM_CONCURRENT_SEND_LIMIT = 1000

class CommThread(AbstractCommThread):
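    # Communication thread for the mpi4py backend: run() drains the task
    # manager's sendQ into non-blocking MPI sends and feeds completed
    # non-blocking receives back into recvQ.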

    def __init__(self, recvQ, sendQ, mainThreadEvent, exitEvent, commReadyEvent, randomGenerator, cmdlineArgs):        
        AbstractCommThread.__init__(self, recvQ, sendQ, mainThreadEvent, exitEvent, commReadyEvent, randomGenerator, cmdlineArgs)
    
    @property
    def poolSize(self):
        return self.pSize

    @property
    def workerId(self):
        return self.currentId

    @property
    def isRootWorker(self):
        return self.currentId == 0
    
    @property
    def isLaunchProcess(self):
        return False
    
    def setTraceModeOn(self, xmlLogger):
        self.traceMode = True
        self.traceTo = xmlLogger

    def iterOverIDs(self):
        return range(self.pSize)

    def run(self):
        from mpi4py import MPI
        
        def mpiSend(msg, dest):
            # Pickle and send over MPI
            arrayBuf = array.array('b')
            arrayBuf.fromstring(cPickle.dumps(msg, cPickle.HIGHEST_PROTOCOL))
            
            b = MPI.COMM_WORLD.Isend([arrayBuf, MPI.CHAR], dest=dest, tag=self.msgSendTag)
            if self.traceMode:
                etree.SubElement(self.traceTo, "msg", {"direc" : "out", "type" : str(msg.msgType), "otherWorker" : str(dest), "msgtag" : str(self.msgSendTag), "time" : repr(time.time())})
            
            self.msgSendTag += 1
            return b, arrayBuf
        
        assert MPI.Is_initialized(), "Error in MPI Init!"
        
        self.pSize = MPI.COMM_WORLD.Get_size()
        self.currentId = MPI.COMM_WORLD.Get_rank()
        
        self.commReadyEvent.set()   # Notify the main thread that we are ready
        
        if self.currentId == 0 and MPI.Query_thread() > 0:
            # Warn only once
            _logger.warning("MPI was initialized with a thread level of %i, which is higher than MPI_THREAD_SINGLE."
            " The current MPI implementations do not always handle well the MPI_THREAD_MULTIPLE or MPI_THREAD_SERIALIZED modes."
            " As DTM was designed to work with the base, safe mode (MPI_THREAD_SINGLE), it is strongly suggested to change"
            " the 'thread_level' variable or your mpi4py settings in 'site-packages/mpi4py/rc.py', unless you have strong"
            " motivations to keep that setting. This may bring both stability and performance improvements.", MPI.Query_thread())
        
        lRecvWaiting = []
        lSendWaiting = []
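        # Pending non-blocking operations: lRecvWaiting holds (buffer, request, tag)
        # tuples for receives; lSendWaiting holds (request, buffer) tuples for sends.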
        countSend = 0
        countRecv = 0
        lMessageStatus = MPI.Status()
        working = True
        
        countRecvNotTransmit = 0
        countRecvTimeInit = time.time()

        while working:
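            # Each pass: post an Irecv for every probed incoming message,
            # push completed receives to recvQ, start an Isend for every
            # queued outgoing message, then sleep briefly if nothing arrived.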
            recvSomething = False
            sendSomething = False

            if self.exitStatus.is_set():    # Exiting
                # Warning: the communication thread MUST clear the sendQ
                # BEFORE leaving (the exit orders must be sent)
                working = False

            while len(lRecvWaiting) < DTM_CONCURRENT_RECV_LIMIT and MPI.COMM_WORLD.Iprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=lMessageStatus):
                # We received something
                lBuf = array.array('b', (0,))
                lBuf = lBuf * lMessageStatus.Get_elements(MPI.CHAR)
                
                lRecvWaiting.append((lBuf, MPI.COMM_WORLD.Irecv([lBuf, MPI.CHAR], source=lMessageStatus.Get_source(), tag=lMessageStatus.Get_tag()), lMessageStatus.Get_tag()))

                lMessageStatus = MPI.Status()
                recvSomething = True

            
            for i, reqTuple in enumerate(lRecvWaiting):
                if reqTuple[1].Test():
                    countRecv += 1
                    dataS = cPickle.loads(reqTuple[0].tostring())
                    if self.traceMode:
                        etree.SubElement(self.traceTo, "msg", {"direc" : "in", "type" : str(dataS.msgType), "otherWorker" : str(dataS.senderWid), "msgtag" : str(reqTuple[2]), "time" : repr(time.time())})
                    self.recvQ.put(dataS)
                    lRecvWaiting[i] = None
                    recvSomething = True
                    # Wake up the main thread if there's a sufficient number
                    # of pending receives
                    countRecvNotTransmit += 1
                    
                    
            if countRecvNotTransmit > 50 or (time.time() - countRecvTimeInit > 0.1 and countRecvNotTransmit > 0):
                countRecvNotTransmit = 0
                countRecvTimeInit = time.time()
                self.wakeUpMainThread.set()        
                    
            lRecvWaiting = filter(lambda d: not d is None, lRecvWaiting)
            if not isinstance(lRecvWaiting, list):
                lRecvWaiting = list(lRecvWaiting)

            while len(lSendWaiting) < DTM_CONCURRENT_SEND_LIMIT:
                # Send all pending sends, under the limit of
                # DTM_CONCURRENT_SEND_LIMIT
                try:
                    sendMsg = self.sendQ.get_nowait()
                    countSend += 1
                    sendMsg.sendTime = time.time()
                    commA, buf1 = mpiSend(sendMsg, sendMsg.receiverWid)
                    lSendWaiting.append((commA, buf1))
                    sendSomething = True
                except Queue.Empty:
                    break
            
            lSendWaiting = filter(lambda d: not d[0].Test(), lSendWaiting)
            if not isinstance(lSendWaiting, list):  # Python 3
                lSendWaiting = list(lSendWaiting)
            
            if not recvSomething:
                time.sleep(self.random.uniform(DTM_MPI_MIN_LATENCY, DTM_MPI_MAX_LATENCY))
                
        while len(lSendWaiting) > 0:
            # Send the last messages before shutdown
            lSendWaiting = filter(lambda d: not d[0].Test(), lSendWaiting)
            if not isinstance(lSendWaiting, list):  # Python 3
                lSendWaiting = list(lSendWaiting)
            time.sleep(self.random.uniform(DTM_MPI_MIN_LATENCY, DTM_MPI_MAX_LATENCY))
            
        del lSendWaiting
        del lRecvWaiting
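
For context, this module is the mpi4py-based communication backend that the DTM (Distributed Task Manager) component of python-deap uses to exchange pickled task messages between MPI workers. The snippet below is a minimal usage sketch and is not part of the packaged file; it assumes the DTM entry points documented for DEAP 0.7 (dtm.map and dtm.start), an installed mpi4py, and an MPI launcher such as mpirun. The function names square and main are placeholders.

# Minimal DTM usage sketch (assumption: DEAP 0.7.x with mpi4py available).
# Launch under MPI, e.g.:  mpirun -n 4 python my_dtm_script.py
from deap import dtm

def square(x):
    # Work item executed on whichever MPI worker DTM schedules it on
    return x * x

def main():
    # dtm.map distributes the calls across the MPI pool and gathers the
    # results in order
    results = dtm.map(square, range(100))
    print(sum(results))

dtm.start(main)   # blocks until the DTM task manager shuts down

When launched this way, every MPI process runs the same script, and the CommThread defined above handles the Isend/Irecv traffic on behalf of the task manager in each process.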