/usr/share/pyshared/pychess/System/ThreadPool.py is in pychess 0.10.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
""" This is a pool for reusing threads """
from threading import Thread, Condition, Lock
import GtkWorker
import Queue
import inspect
import os
import sys
import threading
import traceback
import cStringIO
import atexit
# Compatibility shim for interpreters whose Thread has a private
# ``__bootstrap`` hook but no ``__bootstrap_inner``: wrap the bootstrap so that
# errors raised inside daemon threads while the interpreter is being torn down
# are swallowed instead of being printed.  Interpreters that expose neither
# hook are left untouched.
if (not hasattr(Thread, "_Thread__bootstrap_inner")
        and hasattr(Thread, "_Thread__bootstrap")):
    class SafeThread (Thread):
        """Thread whose bootstrap ignores errors raised during shutdown."""

        def encaps (self):
            """Run the original bootstrap, hiding teardown-time failures."""
            try:
                self._Thread__bootstrap_inner()
            except:
                # The attribute has to be spelled with Thread's mangled name:
                # writing ``self.__daemonic`` inside this class would be
                # mangled to ``_SafeThread__daemonic``, which does not exist.
                # ``sys`` being None (or stripped of its docstring) is taken
                # as the sign that module globals are already being cleared.
                if self._Thread__daemonic and (sys is None or sys.__doc__ is None):
                    return
                raise

    # Keep the real bootstrap reachable under the "inner" name, then route the
    # public bootstrap through the guarded wrapper.
    SafeThread._Thread__bootstrap_inner = SafeThread._Thread__bootstrap
    SafeThread._Thread__bootstrap = SafeThread.encaps
    threading.Thread = SafeThread
# Upper bound on the number of workers ThreadPool.start will create before it
# starts waiting for an idle one; sys.maxint leaves the pool effectively
# unbounded.
maxThreads = sys.maxint
class ThreadPool:
    """Pool of reusable daemon worker threads.

    Idle workers park themselves in ``self.queue``.  ``start`` hands a job to
    an idle worker, or creates a new worker while fewer than ``maxThreads``
    workers have been created.
    """

    def __init__ (self):
        self.queue = Queue.Queue()  # idle workers waiting for a job
        self.lock = Lock()          # serialises concurrent start() calls
        self.threads = 0            # number of workers created so far

    def start (self, func, *args, **kw):
        """Run ``func(*args, **kw)`` on a pooled worker thread.

        Returns immediately; the job's return value is discarded and any
        exception it raises is printed by the worker.  When ``maxThreads``
        workers already exist and none is idle, this call blocks until one
        becomes available (logging a warning after 5 seconds).
        """
        self.lock.acquire()
        try:
            fresh = False
            try:
                worker = self.queue.get_nowait()
            except Queue.Empty:
                if self.threads < maxThreads:
                    # The worker is built right here (not in a helper) because
                    # Worker.run trims exactly two frames -- this one and
                    # Worker.__init__ -- from the recorded creation stack.
                    worker = self.Worker(self.queue)
                    worker.setDaemon(True)
                    self.threads += 1
                    fresh = True
                else:
                    try:
                        worker = self.queue.get(timeout=5)
                    except Queue.Empty:
                        # Only warn when the timed wait really failed, then
                        # keep waiting without a deadline.
                        from pychess.System.Log import log
                        log.warn("Couldn't get a thread after 5s")
                        worker = self.queue.get()

            # Resolve the name before touching the worker so a failure here
            # cannot leave it holding a job it was never woken up for.
            name = self._getThreadName(worker, func)

            # Publish the job under the worker's condition: the worker checks
            # ``func`` under the same lock, so the wake-up cannot be lost.
            worker.wcond.acquire()
            try:
                worker.func = lambda: func(*args, **kw)
                worker.name = name
                worker.wcond.notify()
            finally:
                worker.wcond.release()

            # A new worker is started only once its first job is in place, so
            # it never advertises itself as idle while also holding a job.
            if fresh:
                worker.start()
        finally:
            # Always release, otherwise one failing call would block every
            # later start() forever.
            self.lock.release()

    def _getThreadName (self, thread, func):
        """Build a diagnostic "caller -- callee" name for a job.

        ``caller`` is the file:line:function of the code that called
        ``start``; ``callee`` is module:line:name of ``func``.  Returns ""
        when the calling frame cannot be inspected.  ``thread`` is unused.
        """
        try:
            # context=0: only filename, line number and function name are
            # read from the frame records, so skip loading source lines.
            stack = inspect.stack(0)
            framerecord = stack[2]
        except (TypeError, IndexError):
            return ""

        def describe (record):
            # "basename:lineno:function" for one frame record
            filename = os.path.basename(record[1])
            return ":".join([str(v) for v in (filename,) + tuple(record[2:4])])

        caller = describe(framerecord)

        module = inspect.getmodule(func)
        try:
            lineno = inspect.getsourcelines(func)[1]
            callee = ":".join((module.__name__, str(lineno), func.__name__))
        except (IOError, OSError, TypeError, AttributeError):
            # Builtins, partials and other callables without retrievable
            # source or a __name__: the name is diagnostic only, so fall back
            # to something printable rather than failing the job.
            callee = getattr(func, "__name__", repr(func))

        # Jobs dispatched through GtkWorker or a "repeat" module are
        # indirections, so the frame one level further up is appended as well.
        if (module is GtkWorker or "repeat" in str(module)) and len(stack) > 3:
            callee += " -- " + describe(stack[3])

        s = caller + " -- " + callee
        for repl in ("pychess.", "System.", "Players."):
            s = s.replace(repl, "")
        return s

    class Worker (threading.Thread):
        """Thread that runs one job at a time and then re-queues itself."""

        def __init__ (self, queue):
            Thread.__init__(self)
            self.func = None          # pending job, or None while idle
            self.wcond = Condition()  # guards/signals assignment of ``func``
            self.queue = queue        # idle queue this worker returns to
            self.running = True       # cleared by __del__ (also run at exit)
            atexit.register(self.__del__)

            # Remember the stack of the thread that created the worker so
            # job errors can be reported with that context.
            self.tracestack = traceback.extract_stack()

        def run (self):
            """Execute assigned jobs forever, parking in the queue between them."""
            try:
                while True:
                    if self.func:
                        try:
                            self.func()
                        except Exception:
                            #try:
                            #    if glock._rlock._RLock__owner == self:
                            #        # As a service we take care of releasing the gdk
                            #        # lock when a thread breaks to avoid freezes
                            #        for i in xrange(glock._rlock._RLock__count):
                            #            glock.release()
                            #except AssertionError, e:
                            #    print e
                            #    pass
                            exc_type, exc_value, exc_traceback = sys.exc_info()
                            # Creation stack minus its last two frames, plus
                            # the job's traceback minus its first two frames.
                            frames = self.tracestack[:-2] + \
                                     traceback.extract_tb(exc_traceback)[2:]
                            error = "".join(traceback.format_list(frames))
                            print(error.rstrip())
                            print("%s %s" % (exc_type, exc_value))
                            # Do not keep the failed job's frames alive while
                            # this long-lived loop keeps running.
                            del exc_traceback
                        self.func = None

                    # Advertise as idle, then sleep until a job is assigned.
                    self.queue.put(self)
                    self.wcond.acquire()
                    try:
                        # Re-check the predicate: the job may have been
                        # assigned before we got here, in which case the
                        # notify has already happened and must not be awaited.
                        while not self.func:
                            self.wcond.wait()
                    finally:
                        self.wcond.release()
            except:
                #self.threads -= 1
                # Once __del__ has cleared ``running`` (it is also registered
                # with atexit), errors escaping the loop are dropped silently.
                if self.running:
                    raise

        def __del__ (self):
            self.running = False
# Module-level ThreadPool instance; PooledThread.start dispatches through it.
pool = ThreadPool()
class PooledThread (object):
    """Thread-like base class whose ``run`` is executed on the shared pool.

    Subclasses override ``run``; ``start`` submits it to the module-level
    ``pool``.  The remaining Thread-style methods are stubs: apart from
    ``isDaemon`` they raise NotImplementedError.
    """

    def start (self):
        """Schedule ``self.run`` on a pooled worker thread."""
        pool.start(self.run)

    def run (self):
        """Job body; does nothing unless overridden."""
        pass

    def join (self, timeout=None):
        raise NotImplementedError

    def setName (self, name):
        raise NotImplementedError

    def getName (self):
        raise NotImplementedError

    def isAlive (self):
        raise NotImplementedError

    def isDaemon (self):
        """Always True."""
        return True

    def setDaemon (self, daemonic=True):
        # Accept the flag that Thread.setDaemon-style callers pass, so they
        # get NotImplementedError like the other stubs rather than a
        # TypeError for an unexpected argument.
        raise NotImplementedError
|