/usr/lib/python2.7/dist-packages/pyqtgraph/multiprocess/remoteproxy.py is in python-pyqtgraph 0.9.10-5.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 | import os, time, sys, traceback, weakref
import numpy as np
import threading
try:
import __builtin__ as builtins
import cPickle as pickle
except ImportError:
import builtins
import pickle
# color printing for debugging
from ..util import cprint
class ClosedError(Exception):
"""Raised when an event handler receives a request to close the connection
or discovers that the connection has been closed."""
pass
class NoResultError(Exception):
"""Raised when a request for the return value of a remote call fails
because the call has not yet returned."""
pass
class RemoteEventHandler(object):
"""
This class handles communication between two processes. One instance is present on
each process and listens for communication from the other process. This enables
(amongst other things) ObjectProxy instances to look up their attributes and call
their methods.
This class is responsible for carrying out actions on behalf of the remote process.
Each instance holds one end of a Connection which allows python
objects to be passed between processes.
For the most common operations, see _import(), close(), and transfer()
To handle and respond to incoming requests, RemoteEventHandler requires that its
processRequests method is called repeatedly (this is usually handled by the Process
classes defined in multiprocess.processes).
"""
handlers = {} ## maps {process ID : handler}. This allows unpickler to determine which process
## an object proxy belongs to
def __init__(self, connection, name, pid, debug=False):
self.debug = debug
self.conn = connection
self.name = name
self.results = {} ## reqId: (status, result); cache of request results received from the remote process
## status is either 'result' or 'error'
## if 'error', then result will be (exception, formatted exceprion)
## where exception may be None if it could not be passed through the Connection.
self.resultLock = threading.RLock()
self.proxies = {} ## maps {weakref(proxy): proxyId}; used to inform the remote process when a proxy has been deleted.
self.proxyLock = threading.RLock()
## attributes that affect the behavior of the proxy.
## See ObjectProxy._setProxyOptions for description
self.proxyOptions = {
'callSync': 'sync', ## 'sync', 'async', 'off'
'timeout': 10, ## float
'returnType': 'auto', ## 'proxy', 'value', 'auto'
'autoProxy': False, ## bool
'deferGetattr': False, ## True, False
'noProxyTypes': [ type(None), str, int, float, tuple, list, dict, LocalObjectProxy, ObjectProxy ],
}
self.optsLock = threading.RLock()
self.nextRequestId = 0
self.exited = False
# Mutexes to help prevent issues when multiple threads access the same RemoteEventHandler
self.processLock = threading.RLock()
self.sendLock = threading.RLock()
RemoteEventHandler.handlers[pid] = self ## register this handler as the one communicating with pid
@classmethod
def getHandler(cls, pid):
try:
return cls.handlers[pid]
except:
print(pid, cls.handlers)
raise
def debugMsg(self, msg):
if not self.debug:
return
cprint.cout(self.debug, "[%d] %s\n" % (os.getpid(), str(msg)), -1)
def getProxyOption(self, opt):
with self.optsLock:
return self.proxyOptions[opt]
def setProxyOptions(self, **kwds):
"""
Set the default behavior options for object proxies.
See ObjectProxy._setProxyOptions for more info.
"""
with self.optsLock:
self.proxyOptions.update(kwds)
def processRequests(self):
"""Process all pending requests from the pipe, return
after no more events are immediately available. (non-blocking)
Returns the number of events processed.
"""
with self.processLock:
if self.exited:
self.debugMsg(' processRequests: exited already; raise ClosedError.')
raise ClosedError()
numProcessed = 0
while self.conn.poll():
#try:
#poll = self.conn.poll()
#if not poll:
#break
#except IOError: # this can happen if the remote process dies.
## might it also happen in other circumstances?
#raise ClosedError()
try:
self.handleRequest()
numProcessed += 1
except ClosedError:
self.debugMsg('processRequests: got ClosedError from handleRequest; setting exited=True.')
self.exited = True
raise
#except IOError as err: ## let handleRequest take care of this.
#self.debugMsg(' got IOError from handleRequest; try again.')
#if err.errno == 4: ## interrupted system call; try again
#continue
#else:
#raise
except:
print("Error in process %s" % self.name)
sys.excepthook(*sys.exc_info())
if numProcessed > 0:
self.debugMsg('processRequests: finished %d requests' % numProcessed)
return numProcessed
def handleRequest(self):
"""Handle a single request from the remote process.
Blocks until a request is available."""
result = None
while True:
try:
## args, kwds are double-pickled to ensure this recv() call never fails
cmd, reqId, nByteMsgs, optStr = self.conn.recv()
break
except EOFError:
self.debugMsg(' handleRequest: got EOFError from recv; raise ClosedError.')
## remote process has shut down; end event loop
raise ClosedError()
except IOError as err:
if err.errno == 4: ## interrupted system call; try again
self.debugMsg(' handleRequest: got IOError 4 from recv; try again.')
continue
else:
self.debugMsg(' handleRequest: got IOError %d from recv (%s); raise ClosedError.' % (err.errno, err.strerror))
raise ClosedError()
self.debugMsg(" handleRequest: received %s %s" % (str(cmd), str(reqId)))
## read byte messages following the main request
byteData = []
if nByteMsgs > 0:
self.debugMsg(" handleRequest: reading %d byte messages" % nByteMsgs)
for i in range(nByteMsgs):
while True:
try:
byteData.append(self.conn.recv_bytes())
break
except EOFError:
self.debugMsg(" handleRequest: got EOF while reading byte messages; raise ClosedError.")
raise ClosedError()
except IOError as err:
if err.errno == 4:
self.debugMsg(" handleRequest: got IOError 4 while reading byte messages; try again.")
continue
else:
self.debugMsg(" handleRequest: got IOError while reading byte messages; raise ClosedError.")
raise ClosedError()
try:
if cmd == 'result' or cmd == 'error':
resultId = reqId
reqId = None ## prevents attempt to return information from this request
## (this is already a return from a previous request)
opts = pickle.loads(optStr)
self.debugMsg(" handleRequest: id=%s opts=%s" % (str(reqId), str(opts)))
#print os.getpid(), "received request:", cmd, reqId, opts
returnType = opts.get('returnType', 'auto')
if cmd == 'result':
with self.resultLock:
self.results[resultId] = ('result', opts['result'])
elif cmd == 'error':
with self.resultLock:
self.results[resultId] = ('error', (opts['exception'], opts['excString']))
elif cmd == 'getObjAttr':
result = getattr(opts['obj'], opts['attr'])
elif cmd == 'callObj':
obj = opts['obj']
fnargs = opts['args']
fnkwds = opts['kwds']
## If arrays were sent as byte messages, they must be re-inserted into the
## arguments
if len(byteData) > 0:
for i,arg in enumerate(fnargs):
if isinstance(arg, tuple) and len(arg) > 0 and arg[0] == '__byte_message__':
ind = arg[1]
dtype, shape = arg[2]
fnargs[i] = np.fromstring(byteData[ind], dtype=dtype).reshape(shape)
for k,arg in fnkwds.items():
if isinstance(arg, tuple) and len(arg) > 0 and arg[0] == '__byte_message__':
ind = arg[1]
dtype, shape = arg[2]
fnkwds[k] = np.fromstring(byteData[ind], dtype=dtype).reshape(shape)
if len(fnkwds) == 0: ## need to do this because some functions do not allow keyword arguments.
try:
result = obj(*fnargs)
except:
print("Failed to call object %s: %d, %s" % (obj, len(fnargs), fnargs[1:]))
raise
else:
result = obj(*fnargs, **fnkwds)
elif cmd == 'getObjValue':
result = opts['obj'] ## has already been unpickled into its local value
returnType = 'value'
elif cmd == 'transfer':
result = opts['obj']
returnType = 'proxy'
elif cmd == 'transferArray':
## read array data from next message:
result = np.fromstring(byteData[0], dtype=opts['dtype']).reshape(opts['shape'])
returnType = 'proxy'
elif cmd == 'import':
name = opts['module']
fromlist = opts.get('fromlist', [])
mod = builtins.__import__(name, fromlist=fromlist)
if len(fromlist) == 0:
parts = name.lstrip('.').split('.')
result = mod
for part in parts[1:]:
result = getattr(result, part)
else:
result = map(mod.__getattr__, fromlist)
elif cmd == 'del':
LocalObjectProxy.releaseProxyId(opts['proxyId'])
#del self.proxiedObjects[opts['objId']]
elif cmd == 'close':
if reqId is not None:
result = True
returnType = 'value'
exc = None
except:
exc = sys.exc_info()
if reqId is not None:
if exc is None:
self.debugMsg(" handleRequest: sending return value for %d: %s" % (reqId, str(result)))
#print "returnValue:", returnValue, result
if returnType == 'auto':
with self.optsLock:
noProxyTypes = self.proxyOptions['noProxyTypes']
result = self.autoProxy(result, noProxyTypes)
elif returnType == 'proxy':
result = LocalObjectProxy(result)
try:
self.replyResult(reqId, result)
except:
sys.excepthook(*sys.exc_info())
self.replyError(reqId, *sys.exc_info())
else:
self.debugMsg(" handleRequest: returning exception for %d" % reqId)
self.replyError(reqId, *exc)
elif exc is not None:
sys.excepthook(*exc)
if cmd == 'close':
if opts.get('noCleanup', False) is True:
os._exit(0) ## exit immediately, do not pass GO, do not collect $200.
## (more importantly, do not call any code that would
## normally be invoked at exit)
else:
raise ClosedError()
def replyResult(self, reqId, result):
self.send(request='result', reqId=reqId, callSync='off', opts=dict(result=result))
def replyError(self, reqId, *exc):
print("error: %s %s %s" % (self.name, str(reqId), str(exc[1])))
excStr = traceback.format_exception(*exc)
try:
self.send(request='error', reqId=reqId, callSync='off', opts=dict(exception=exc[1], excString=excStr))
except:
self.send(request='error', reqId=reqId, callSync='off', opts=dict(exception=None, excString=excStr))
def send(self, request, opts=None, reqId=None, callSync='sync', timeout=10, returnType=None, byteData=None, **kwds):
"""Send a request or return packet to the remote process.
Generally it is not necessary to call this method directly; it is for internal use.
(The docstring has information that is nevertheless useful to the programmer
as it describes the internal protocol used to communicate between processes)
============== ====================================================================
**Arguments:**
request String describing the type of request being sent (see below)
reqId Integer uniquely linking a result back to the request that generated
it. (most requests leave this blank)
callSync 'sync': return the actual result of the request
'async': return a Request object which can be used to look up the
result later
'off': return no result
timeout Time in seconds to wait for a response when callSync=='sync'
opts Extra arguments sent to the remote process that determine the way
the request will be handled (see below)
returnType 'proxy', 'value', or 'auto'
byteData If specified, this is a list of objects to be sent as byte messages
to the remote process.
This is used to send large arrays without the cost of pickling.
============== ====================================================================
Description of request strings and options allowed for each:
============= ============= ========================================================
request option description
------------- ------------- --------------------------------------------------------
getObjAttr Request the remote process return (proxy to) an
attribute of an object.
obj reference to object whose attribute should be
returned
attr string name of attribute to return
returnValue bool or 'auto' indicating whether to return a proxy or
the actual value.
callObj Request the remote process call a function or
method. If a request ID is given, then the call's
return value will be sent back (or information
about the error that occurred while running the
function)
obj the (reference to) object to call
args tuple of arguments to pass to callable
kwds dict of keyword arguments to pass to callable
returnValue bool or 'auto' indicating whether to return a proxy or
the actual value.
getObjValue Request the remote process return the value of
a proxied object (must be picklable)
obj reference to object whose value should be returned
transfer Copy an object to the remote process and request
it return a proxy for the new object.
obj The object to transfer.
import Request the remote process import new symbols
and return proxy(ies) to the imported objects
module the string name of the module to import
fromlist optional list of string names to import from module
del Inform the remote process that a proxy has been
released (thus the remote process may be able to
release the original object)
proxyId id of proxy which is no longer referenced by
remote host
close Instruct the remote process to stop its event loop
and exit. Optionally, this request may return a
confirmation.
result Inform the remote process that its request has
been processed
result return value of a request
error Inform the remote process that its request failed
exception the Exception that was raised (or None if the
exception could not be pickled)
excString string-formatted version of the exception and
traceback
============= =====================================================================
"""
if self.exited:
self.debugMsg(' send: exited already; raise ClosedError.')
raise ClosedError()
with self.sendLock:
#if len(kwds) > 0:
#print "Warning: send() ignored args:", kwds
if opts is None:
opts = {}
assert callSync in ['off', 'sync', 'async'], 'callSync must be one of "off", "sync", or "async"'
if reqId is None:
if callSync != 'off': ## requested return value; use the next available request ID
reqId = self.nextRequestId
self.nextRequestId += 1
else:
## If requestId is provided, this _must_ be a response to a previously received request.
assert request in ['result', 'error']
if returnType is not None:
opts['returnType'] = returnType
#print os.getpid(), "send request:", request, reqId, opts
## double-pickle args to ensure that at least status and request ID get through
try:
optStr = pickle.dumps(opts)
except:
print("==== Error pickling this object: ====")
print(opts)
print("=======================================")
raise
nByteMsgs = 0
if byteData is not None:
nByteMsgs = len(byteData)
## Send primary request
request = (request, reqId, nByteMsgs, optStr)
self.debugMsg('send request: cmd=%s nByteMsgs=%d id=%s opts=%s' % (str(request[0]), nByteMsgs, str(reqId), str(opts)))
self.conn.send(request)
## follow up by sending byte messages
if byteData is not None:
for obj in byteData: ## Remote process _must_ be prepared to read the same number of byte messages!
self.conn.send_bytes(obj)
self.debugMsg(' sent %d byte messages' % len(byteData))
self.debugMsg(' call sync: %s' % callSync)
if callSync == 'off':
return
req = Request(self, reqId, description=str(request), timeout=timeout)
if callSync == 'async':
return req
if callSync == 'sync':
try:
return req.result()
except NoResultError:
return req
def close(self, callSync='off', noCleanup=False, **kwds):
try:
self.send(request='close', opts=dict(noCleanup=noCleanup), callSync=callSync, **kwds)
self.exited = True
except ClosedError:
pass
def getResult(self, reqId):
## raises NoResultError if the result is not available yet
#print self.results.keys(), os.getpid()
with self.resultLock:
haveResult = reqId in self.results
if not haveResult:
try:
self.processRequests()
except ClosedError: ## even if remote connection has closed, we may have
## received new data during this call to processRequests()
pass
with self.resultLock:
if reqId not in self.results:
raise NoResultError()
status, result = self.results.pop(reqId)
if status == 'result':
return result
elif status == 'error':
#print ''.join(result)
exc, excStr = result
if exc is not None:
print("===== Remote process raised exception on request: =====")
print(''.join(excStr))
print("===== Local Traceback to request follows: =====")
raise exc
else:
print(''.join(excStr))
raise Exception("Error getting result. See above for exception from remote process.")
else:
raise Exception("Internal error.")
def _import(self, mod, **kwds):
"""
Request the remote process import a module (or symbols from a module)
and return the proxied results. Uses built-in __import__() function, but
adds a bit more processing:
_import('module') => returns module
_import('module.submodule') => returns submodule
(note this differs from behavior of __import__)
_import('module', fromlist=[name1, name2, ...]) => returns [module.name1, module.name2, ...]
(this also differs from behavior of __import__)
"""
return self.send(request='import', callSync='sync', opts=dict(module=mod), **kwds)
def getObjAttr(self, obj, attr, **kwds):
return self.send(request='getObjAttr', opts=dict(obj=obj, attr=attr), **kwds)
def getObjValue(self, obj, **kwds):
return self.send(request='getObjValue', opts=dict(obj=obj), **kwds)
def callObj(self, obj, args, kwds, **opts):
opts = opts.copy()
args = list(args)
## Decide whether to send arguments by value or by proxy
with self.optsLock:
noProxyTypes = opts.pop('noProxyTypes', None)
if noProxyTypes is None:
noProxyTypes = self.proxyOptions['noProxyTypes']
autoProxy = opts.pop('autoProxy', self.proxyOptions['autoProxy'])
if autoProxy is True:
args = [self.autoProxy(v, noProxyTypes) for v in args]
for k, v in kwds.iteritems():
opts[k] = self.autoProxy(v, noProxyTypes)
byteMsgs = []
## If there are arrays in the arguments, send those as byte messages.
## We do this because pickling arrays is too expensive.
for i,arg in enumerate(args):
if arg.__class__ == np.ndarray:
args[i] = ("__byte_message__", len(byteMsgs), (arg.dtype, arg.shape))
byteMsgs.append(arg)
for k,v in kwds.items():
if v.__class__ == np.ndarray:
kwds[k] = ("__byte_message__", len(byteMsgs), (v.dtype, v.shape))
byteMsgs.append(v)
return self.send(request='callObj', opts=dict(obj=obj, args=args, kwds=kwds), byteData=byteMsgs, **opts)
def registerProxy(self, proxy):
with self.proxyLock:
ref = weakref.ref(proxy, self.deleteProxy)
self.proxies[ref] = proxy._proxyId
def deleteProxy(self, ref):
with self.proxyLock:
proxyId = self.proxies.pop(ref)
try:
self.send(request='del', opts=dict(proxyId=proxyId), callSync='off')
except IOError: ## if remote process has closed down, there is no need to send delete requests anymore
pass
def transfer(self, obj, **kwds):
"""
Transfer an object by value to the remote host (the object must be picklable)
and return a proxy for the new remote object.
"""
if obj.__class__ is np.ndarray:
opts = {'dtype': obj.dtype, 'shape': obj.shape}
return self.send(request='transferArray', opts=opts, byteData=[obj], **kwds)
else:
return self.send(request='transfer', opts=dict(obj=obj), **kwds)
def autoProxy(self, obj, noProxyTypes):
## Return object wrapped in LocalObjectProxy _unless_ its type is in noProxyTypes.
for typ in noProxyTypes:
if isinstance(obj, typ):
return obj
return LocalObjectProxy(obj)
class Request(object):
"""
Request objects are returned when calling an ObjectProxy in asynchronous mode
or if a synchronous call has timed out. Use hasResult() to ask whether
the result of the call has been returned yet. Use result() to get
the returned value.
"""
def __init__(self, process, reqId, description=None, timeout=10):
self.proc = process
self.description = description
self.reqId = reqId
self.gotResult = False
self._result = None
self.timeout = timeout
def result(self, block=True, timeout=None):
"""
Return the result for this request.
If block is True, wait until the result has arrived or *timeout* seconds passes.
If the timeout is reached, raise NoResultError. (use timeout=None to disable)
If block is False, raise NoResultError immediately if the result has not arrived yet.
If the process's connection has closed before the result arrives, raise ClosedError.
"""
if self.gotResult:
return self._result
if timeout is None:
timeout = self.timeout
if block:
start = time.time()
while not self.hasResult():
if self.proc.exited:
raise ClosedError()
time.sleep(0.005)
if timeout >= 0 and time.time() - start > timeout:
print("Request timed out: %s" % self.description)
import traceback
traceback.print_stack()
raise NoResultError()
return self._result
else:
self._result = self.proc.getResult(self.reqId) ## raises NoResultError if result is not available yet
self.gotResult = True
return self._result
def hasResult(self):
"""Returns True if the result for this request has arrived."""
try:
self.result(block=False)
except NoResultError:
pass
return self.gotResult
class LocalObjectProxy(object):
"""
Used for wrapping local objects to ensure that they are send by proxy to a remote host.
Note that 'proxy' is just a shorter alias for LocalObjectProxy.
For example::
data = [1,2,3,4,5]
remotePlot.plot(data) ## by default, lists are pickled and sent by value
remotePlot.plot(proxy(data)) ## force the object to be sent by proxy
"""
nextProxyId = 0
proxiedObjects = {} ## maps {proxyId: object}
@classmethod
def registerObject(cls, obj):
## assign it a unique ID so we can keep a reference to the local object
pid = cls.nextProxyId
cls.nextProxyId += 1
cls.proxiedObjects[pid] = obj
#print "register:", cls.proxiedObjects
return pid
@classmethod
def lookupProxyId(cls, pid):
return cls.proxiedObjects[pid]
@classmethod
def releaseProxyId(cls, pid):
del cls.proxiedObjects[pid]
#print "release:", cls.proxiedObjects
def __init__(self, obj, **opts):
"""
Create a 'local' proxy object that, when sent to a remote host,
will appear as a normal ObjectProxy to *obj*.
Any extra keyword arguments are passed to proxy._setProxyOptions()
on the remote side.
"""
self.processId = os.getpid()
#self.objectId = id(obj)
self.typeStr = repr(obj)
#self.handler = handler
self.obj = obj
self.opts = opts
def __reduce__(self):
## a proxy is being pickled and sent to a remote process.
## every time this happens, a new proxy will be generated in the remote process,
## so we keep a new ID so we can track when each is released.
pid = LocalObjectProxy.registerObject(self.obj)
return (unpickleObjectProxy, (self.processId, pid, self.typeStr, None, self.opts))
## alias
proxy = LocalObjectProxy
def unpickleObjectProxy(processId, proxyId, typeStr, attributes=None, opts=None):
if processId == os.getpid():
obj = LocalObjectProxy.lookupProxyId(proxyId)
if attributes is not None:
for attr in attributes:
obj = getattr(obj, attr)
return obj
else:
proxy = ObjectProxy(processId, proxyId=proxyId, typeStr=typeStr)
if opts is not None:
proxy._setProxyOptions(**opts)
return proxy
class ObjectProxy(object):
"""
Proxy to an object stored by the remote process. Proxies are created
by calling Process._import(), Process.transfer(), or by requesting/calling
attributes on existing proxy objects.
For the most part, this object can be used exactly as if it
were a local object::
rsys = proc._import('sys') # returns proxy to sys module on remote process
rsys.stdout # proxy to remote sys.stdout
rsys.stdout.write # proxy to remote sys.stdout.write
rsys.stdout.write('hello') # calls sys.stdout.write('hello') on remote machine
# and returns the result (None)
When calling a proxy to a remote function, the call can be made synchronous
(result of call is returned immediately), asynchronous (result is returned later),
or return can be disabled entirely::
ros = proc._import('os')
## synchronous call; result is returned immediately
pid = ros.getpid()
## asynchronous call
request = ros.getpid(_callSync='async')
while not request.hasResult():
time.sleep(0.01)
pid = request.result()
## disable return when we know it isn't needed
rsys.stdout.write('hello', _callSync='off')
Additionally, values returned from a remote function call are automatically
returned either by value (must be picklable) or by proxy.
This behavior can be forced::
rnp = proc._import('numpy')
arrProxy = rnp.array([1,2,3,4], _returnType='proxy')
arrValue = rnp.array([1,2,3,4], _returnType='value')
The default callSync and returnType behaviors (as well as others) can be set
for each proxy individually using ObjectProxy._setProxyOptions() or globally using
proc.setProxyOptions().
"""
def __init__(self, processId, proxyId, typeStr='', parent=None):
object.__init__(self)
## can't set attributes directly because setattr is overridden.
self.__dict__['_processId'] = processId
self.__dict__['_typeStr'] = typeStr
self.__dict__['_proxyId'] = proxyId
self.__dict__['_attributes'] = ()
## attributes that affect the behavior of the proxy.
## in all cases, a value of None causes the proxy to ask
## its parent event handler to make the decision
self.__dict__['_proxyOptions'] = {
'callSync': None, ## 'sync', 'async', None
'timeout': None, ## float, None
'returnType': None, ## 'proxy', 'value', 'auto', None
'deferGetattr': None, ## True, False, None
'noProxyTypes': None, ## list of types to send by value instead of by proxy
}
self.__dict__['_handler'] = RemoteEventHandler.getHandler(processId)
self.__dict__['_handler'].registerProxy(self) ## handler will watch proxy; inform remote process when the proxy is deleted.
def _setProxyOptions(self, **kwds):
"""
Change the behavior of this proxy. For all options, a value of None
will cause the proxy to instead use the default behavior defined
by its parent Process.
Options are:
============= =============================================================
callSync 'sync', 'async', 'off', or None.
If 'async', then calling methods will return a Request object
which can be used to inquire later about the result of the
method call.
If 'sync', then calling a method
will block until the remote process has returned its result
or the timeout has elapsed (in this case, a Request object
is returned instead).
If 'off', then the remote process is instructed _not_ to
reply and the method call will return None immediately.
returnType 'auto', 'proxy', 'value', or None.
If 'proxy', then the value returned when calling a method
will be a proxy to the object on the remote process.
If 'value', then attempt to pickle the returned object and
send it back.
If 'auto', then the decision is made by consulting the
'noProxyTypes' option.
autoProxy bool or None. If True, arguments to __call__ are
automatically converted to proxy unless their type is
listed in noProxyTypes (see below). If False, arguments
are left untouched. Use proxy(obj) to manually convert
arguments before sending.
timeout float or None. Length of time to wait during synchronous
requests before returning a Request object instead.
deferGetattr True, False, or None.
If False, all attribute requests will be sent to the remote
process immediately and will block until a response is
received (or timeout has elapsed).
If True, requesting an attribute from the proxy returns a
new proxy immediately. The remote process is _not_ contacted
to make this request. This is faster, but it is possible to
request an attribute that does not exist on the proxied
object. In this case, AttributeError will not be raised
until an attempt is made to look up the attribute on the
remote process.
noProxyTypes List of object types that should _not_ be proxied when
sent to the remote process.
============= =============================================================
"""
self._proxyOptions.update(kwds)
def _getValue(self):
"""
Return the value of the proxied object
(the remote object must be picklable)
"""
return self._handler.getObjValue(self)
def _getProxyOption(self, opt):
val = self._proxyOptions[opt]
if val is None:
return self._handler.getProxyOption(opt)
return val
def _getProxyOptions(self):
return dict([(k, self._getProxyOption(k)) for k in self._proxyOptions])
def __reduce__(self):
return (unpickleObjectProxy, (self._processId, self._proxyId, self._typeStr, self._attributes))
def __repr__(self):
#objRepr = self.__getattr__('__repr__')(callSync='value')
return "<ObjectProxy for process %d, object 0x%x: %s >" % (self._processId, self._proxyId, self._typeStr)
def __getattr__(self, attr, **kwds):
"""
Calls __getattr__ on the remote object and returns the attribute
by value or by proxy depending on the options set (see
ObjectProxy._setProxyOptions and RemoteEventHandler.setProxyOptions)
If the option 'deferGetattr' is True for this proxy, then a new proxy object
is returned _without_ asking the remote object whether the named attribute exists.
This can save time when making multiple chained attribute requests,
but may also defer a possible AttributeError until later, making
them more difficult to debug.
"""
opts = self._getProxyOptions()
for k in opts:
if '_'+k in kwds:
opts[k] = kwds.pop('_'+k)
if opts['deferGetattr'] is True:
return self._deferredAttr(attr)
else:
#opts = self._getProxyOptions()
return self._handler.getObjAttr(self, attr, **opts)
def _deferredAttr(self, attr):
return DeferredObjectProxy(self, attr)
def __call__(self, *args, **kwds):
"""
Attempts to call the proxied object from the remote process.
Accepts extra keyword arguments:
_callSync 'off', 'sync', or 'async'
_returnType 'value', 'proxy', or 'auto'
If the remote call raises an exception on the remote process,
it will be re-raised on the local process.
"""
opts = self._getProxyOptions()
for k in opts:
if '_'+k in kwds:
opts[k] = kwds.pop('_'+k)
return self._handler.callObj(obj=self, args=args, kwds=kwds, **opts)
## Explicitly proxy special methods. Is there a better way to do this??
def _getSpecialAttr(self, attr):
## this just gives us an easy way to change the behavior of the special methods
return self._deferredAttr(attr)
def __getitem__(self, *args):
return self._getSpecialAttr('__getitem__')(*args)
def __setitem__(self, *args):
return self._getSpecialAttr('__setitem__')(*args, _callSync='off')
def __setattr__(self, *args):
return self._getSpecialAttr('__setattr__')(*args, _callSync='off')
def __str__(self, *args):
return self._getSpecialAttr('__str__')(*args, _returnType='value')
def __len__(self, *args):
return self._getSpecialAttr('__len__')(*args)
def __add__(self, *args):
return self._getSpecialAttr('__add__')(*args)
def __sub__(self, *args):
return self._getSpecialAttr('__sub__')(*args)
def __div__(self, *args):
return self._getSpecialAttr('__div__')(*args)
def __truediv__(self, *args):
return self._getSpecialAttr('__truediv__')(*args)
def __floordiv__(self, *args):
return self._getSpecialAttr('__floordiv__')(*args)
def __mul__(self, *args):
return self._getSpecialAttr('__mul__')(*args)
def __pow__(self, *args):
return self._getSpecialAttr('__pow__')(*args)
def __iadd__(self, *args):
return self._getSpecialAttr('__iadd__')(*args, _callSync='off')
def __isub__(self, *args):
return self._getSpecialAttr('__isub__')(*args, _callSync='off')
def __idiv__(self, *args):
return self._getSpecialAttr('__idiv__')(*args, _callSync='off')
def __itruediv__(self, *args):
return self._getSpecialAttr('__itruediv__')(*args, _callSync='off')
def __ifloordiv__(self, *args):
return self._getSpecialAttr('__ifloordiv__')(*args, _callSync='off')
def __imul__(self, *args):
return self._getSpecialAttr('__imul__')(*args, _callSync='off')
def __ipow__(self, *args):
return self._getSpecialAttr('__ipow__')(*args, _callSync='off')
def __rshift__(self, *args):
return self._getSpecialAttr('__rshift__')(*args)
def __lshift__(self, *args):
return self._getSpecialAttr('__lshift__')(*args)
def __irshift__(self, *args):
return self._getSpecialAttr('__irshift__')(*args, _callSync='off')
def __ilshift__(self, *args):
return self._getSpecialAttr('__ilshift__')(*args, _callSync='off')
def __eq__(self, *args):
return self._getSpecialAttr('__eq__')(*args)
def __ne__(self, *args):
return self._getSpecialAttr('__ne__')(*args)
def __lt__(self, *args):
return self._getSpecialAttr('__lt__')(*args)
def __gt__(self, *args):
return self._getSpecialAttr('__gt__')(*args)
def __le__(self, *args):
return self._getSpecialAttr('__le__')(*args)
def __ge__(self, *args):
return self._getSpecialAttr('__ge__')(*args)
def __and__(self, *args):
return self._getSpecialAttr('__and__')(*args)
def __or__(self, *args):
return self._getSpecialAttr('__or__')(*args)
def __xor__(self, *args):
return self._getSpecialAttr('__xor__')(*args)
def __iand__(self, *args):
return self._getSpecialAttr('__iand__')(*args, _callSync='off')
def __ior__(self, *args):
return self._getSpecialAttr('__ior__')(*args, _callSync='off')
def __ixor__(self, *args):
return self._getSpecialAttr('__ixor__')(*args, _callSync='off')
def __mod__(self, *args):
return self._getSpecialAttr('__mod__')(*args)
def __radd__(self, *args):
return self._getSpecialAttr('__radd__')(*args)
def __rsub__(self, *args):
return self._getSpecialAttr('__rsub__')(*args)
def __rdiv__(self, *args):
return self._getSpecialAttr('__rdiv__')(*args)
def __rfloordiv__(self, *args):
return self._getSpecialAttr('__rfloordiv__')(*args)
def __rtruediv__(self, *args):
return self._getSpecialAttr('__rtruediv__')(*args)
def __rmul__(self, *args):
return self._getSpecialAttr('__rmul__')(*args)
def __rpow__(self, *args):
return self._getSpecialAttr('__rpow__')(*args)
def __rrshift__(self, *args):
return self._getSpecialAttr('__rrshift__')(*args)
def __rlshift__(self, *args):
return self._getSpecialAttr('__rlshift__')(*args)
def __rand__(self, *args):
return self._getSpecialAttr('__rand__')(*args)
def __ror__(self, *args):
return self._getSpecialAttr('__ror__')(*args)
def __rxor__(self, *args):
return self._getSpecialAttr('__ror__')(*args)
def __rmod__(self, *args):
return self._getSpecialAttr('__rmod__')(*args)
def __hash__(self):
## Required for python3 since __eq__ is defined.
return id(self)
class DeferredObjectProxy(ObjectProxy):
"""
This class represents an attribute (or sub-attribute) of a proxied object.
It is used to speed up attribute requests. Take the following scenario::
rsys = proc._import('sys')
rsys.stdout.write('hello')
For this simple example, a total of 4 synchronous requests are made to
the remote process:
1) import sys
2) getattr(sys, 'stdout')
3) getattr(stdout, 'write')
4) write('hello')
This takes a lot longer than running the equivalent code locally. To
speed things up, we can 'defer' the two attribute lookups so they are
only carried out when neccessary::
rsys = proc._import('sys')
rsys._setProxyOptions(deferGetattr=True)
rsys.stdout.write('hello')
This example only makes two requests to the remote process; the two
attribute lookups immediately return DeferredObjectProxy instances
immediately without contacting the remote process. When the call
to write() is made, all attribute requests are processed at the same time.
Note that if the attributes requested do not exist on the remote object,
making the call to write() will raise an AttributeError.
"""
def __init__(self, parentProxy, attribute):
## can't set attributes directly because setattr is overridden.
for k in ['_processId', '_typeStr', '_proxyId', '_handler']:
self.__dict__[k] = getattr(parentProxy, k)
self.__dict__['_parent'] = parentProxy ## make sure parent stays alive
self.__dict__['_attributes'] = parentProxy._attributes + (attribute,)
self.__dict__['_proxyOptions'] = parentProxy._proxyOptions.copy()
def __repr__(self):
return ObjectProxy.__repr__(self) + '.' + '.'.join(self._attributes)
def _undefer(self):
"""
Return a non-deferred ObjectProxy referencing the same object
"""
return self._parent.__getattr__(self._attributes[-1], _deferGetattr=False)
|