/usr/share/pyshared/deap/dtm/commManagerMpi4py.py is in python-deap 0.7.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 | try:
import Queue
except ImportError:
import queue as Queue
import time
import threading
try:
import cPickle
except ImportError:
import pickle as cPickle
import array
import copy
import logging
try:
from lxml import etree
except ImportError:
try:
import xml.etree.cElementTree as etree
except ImportError:
# Python 2.5
import xml.etree.ElementTree as etree
from deap.dtm.dtmTypes import *
from deap.dtm.abstractCommManager import AbstractCommThread
_logger = logging.getLogger("dtm.communication")
DTM_MPI_MIN_LATENCY = 0.005
DTM_MPI_MAX_LATENCY = 0.01
DTM_CONCURRENT_RECV_LIMIT = 1000
DTM_CONCURRENT_SEND_LIMIT = 1000
class CommThread(AbstractCommThread):
def __init__(self, recvQ, sendQ, mainThreadEvent, exitEvent, commReadyEvent, randomGenerator, cmdlineArgs):
AbstractCommThread.__init__(self, recvQ, sendQ, mainThreadEvent, exitEvent, commReadyEvent, randomGenerator, cmdlineArgs)
@property
def poolSize(self):
return self.pSize
@property
def workerId(self):
return self.currentId
@property
def isRootWorker(self):
return self.currentId == 0
@property
def isLaunchProcess(self):
return False
def setTraceModeOn(self, xmlLogger):
self.traceMode = True
self.traceTo = xmlLogger
def iterOverIDs(self):
return range(self.pSize)
def run(self):
from mpi4py import MPI
def mpiSend(msg, dest):
# Pickle and send over MPI
arrayBuf = array.array('b')
arrayBuf.fromstring(cPickle.dumps(msg, cPickle.HIGHEST_PROTOCOL))
b = MPI.COMM_WORLD.Isend([arrayBuf, MPI.CHAR], dest=dest, tag=self.msgSendTag)
if self.traceMode:
etree.SubElement(self.traceTo, "msg", {"direc" : "out", "type" : str(msg.msgType), "otherWorker" : str(dest), "msgtag" : str(self.msgSendTag), "time" : repr(time.time())})
self.msgSendTag += 1
return b, arrayBuf
assert MPI.Is_initialized(), "Error in MPI Init!"
self.pSize = MPI.COMM_WORLD.Get_size()
self.currentId = MPI.COMM_WORLD.Get_rank()
self.commReadyEvent.set() # Notify the main thread that we are ready
if self.currentId == 0 and MPI.Query_thread() > 0:
# Warn only once
_logger.warning("MPI was initialized with a thread level of %i, which is higher than MPI_THREAD_SINGLE."
" The current MPI implementations do not always handle well the MPI_THREAD_MULTIPLE or MPI_THREAD_SERIALIZED modes."
" As DTM was designed to work with the base, safe mode (MPI_THREAD_SINGLE), it is strongly suggested to change"
" the 'thread_level' variable or your mpi4py settings in 'site-packages/mpi4py/rc.py', unless you have strong"
" motivations to keep that setting. This may bring both stability and performance improvements.", MPI.Query_thread())
lRecvWaiting = []
lSendWaiting = []
countSend = 0
countRecv = 0
lMessageStatus = MPI.Status()
working = True
countRecvNotTransmit = 0
countRecvTimeInit = time.time()
while working:
recvSomething = False
sendSomething = False
if self.exitStatus.is_set(): # Exiting
# Warning : the communication thread MUST clear the sendQ
# BEFORE leaving (the exiting orders must be send)
working = False
while len(lRecvWaiting) < DTM_CONCURRENT_RECV_LIMIT and MPI.COMM_WORLD.Iprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=lMessageStatus):
# We received something
lBuf = array.array('b', (0,))
lBuf = lBuf * lMessageStatus.Get_elements(MPI.CHAR)
lRecvWaiting.append((lBuf, MPI.COMM_WORLD.Irecv([lBuf, MPI.CHAR], source=lMessageStatus.Get_source(), tag=lMessageStatus.Get_tag()), lMessageStatus.Get_tag()))
lMessageStatus = MPI.Status()
recvSomething = True
for i, reqTuple in enumerate(lRecvWaiting):
if reqTuple[1].Test():
countRecv += 1
dataS = cPickle.loads(reqTuple[0].tostring())
if self.traceMode:
etree.SubElement(self.traceTo, "msg", {"direc" : "in", "type" : str(dataS.msgType), "otherWorker" : str(dataS.senderWid), "msgtag" : str(reqTuple[2]), "time" : repr(time.time())})
self.recvQ.put(dataS)
lRecvWaiting[i] = None
recvSomething = True
# Wake up the main thread if there's a sufficient number
# of pending receives
countRecvNotTransmit += 1
if countRecvNotTransmit > 50 or (time.time() - countRecvTimeInit > 0.1 and countRecvNotTransmit > 0):
countRecvNotTransmit = 0
countRecvTimeInit = time.time()
self.wakeUpMainThread.set()
lRecvWaiting = filter(lambda d: not d is None, lRecvWaiting)
if not isinstance(lRecvWaiting, list):
lRecvWaiting = list(lRecvWaiting)
while len(lSendWaiting) < DTM_CONCURRENT_SEND_LIMIT:
# Send all pending sends, under the limit of
# DTM_CONCURRENT_SEND_LIMIT
try:
sendMsg = self.sendQ.get_nowait()
countSend += 1
sendMsg.sendTime = time.time()
commA, buf1 = mpiSend(sendMsg, sendMsg.receiverWid)
lSendWaiting.append((commA, buf1))
sendSomething = True
except Queue.Empty:
break
lSendWaiting = filter(lambda d: not d[0].Test(), lSendWaiting)
if not isinstance(lSendWaiting, list): # Python 3
lSendWaiting = list(lSendWaiting)
if not recvSomething:
time.sleep(self.random.uniform(DTM_MPI_MIN_LATENCY, DTM_MPI_MAX_LATENCY))
while len(lSendWaiting) > 0:
# Send the lasts messages before shutdown
lSendWaiting = filter(lambda d: not d[0].Test(), lSendWaiting)
if not isinstance(lSendWaiting, list): # Python 3
lSendWaiting = list(lSendWaiting)
time.sleep(self.random.uniform(DTM_MPI_MIN_LATENCY, DTM_MPI_MAX_LATENCY))
del lSendWaiting
del lRecvWaiting
|