/usr/share/pyshared/jsb/lib/runner.py is in jsonbot 0.84.4-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 | # jsb/runner.py
#
#
""" threads management to run jobs. """
## jsb imports
from jsb.lib.threads import getname, start_new_thread, start_bot_command
from jsb.utils.exception import handle_exception
from jsb.utils.locking import locked, lockdec
from jsb.utils.lockmanager import rlockmanager, lockmanager
from jsb.utils.generic import waitevents
from jsb.utils.trace import callstack, whichmodule
from jsb.lib.threadloop import RunnerLoop
from jsb.lib.callbacks import callbacks
from jsb.lib.errors import URLNotEnabled
## basic imports
import Queue
import time
import thread
import random
import logging
import sys
## Runner class
class Runner(RunnerLoop):
"""
a runner is a thread with a queue on which jobs can be pushed.
jobs scheduled should not take too long since only one job can
be executed in a Runner at the same time.
"""
def __init__(self, name="runner", doready=True):
RunnerLoop.__init__(self, name)
self.working = False
self.starttime = time.time()
self.elapsed = self.starttime
self.finished = time.time()
self.doready = doready
self.nowrunning = ""
self.longrunning = []
self.shortrunning = []
def handle(self, descr, func, *args, **kwargs):
""" schedule a job. """
self.working = True
try:
#rlockmanager.acquire(getname(str(func)))
logging.debug('running %s: %s' % (descr, self.nowrunning))
self.starttime = time.time()
func(*args, **kwargs)
self.finished = time.time()
self.elapsed = self.finished - self.starttime
if self.elapsed > 5:
logging.debug('ALERT %s %s job taking too long: %s seconds' % (descr, str(func), self.elapsed))
except Exception, ex: handle_exception()
#finally: rlockmanager.release()
self.working = False
def done(self, event):
try: int(event.cbtype)
except ValueError:
if event.cbtype not in ['TICK', 'PING', 'NOTICE', 'TICK60']: logging.warn(str(event.cbtype))
## BotEventRunner class
class BotEventRunner(Runner):
def handle(self, speed, args):
""" schedule a bot command. """
try:
descr, func, bot, ievent = args
self.starttime = time.time()
#lockmanager.acquire(getname(str(func)))
if not ievent.nolog: logging.debug("event handler is %s" % str(func))
if self.nowrunning in self.longrunning:
logging.warn("putting %s on longrunner" % self.nowrunning)
longrunner.put(ievent.speed or speed, descr, func, bot, ievent)
return
self.working = True
try: func(bot, ievent)
except URLNotEnabled: logging.warn("urls fetching is disabled (%s)" % ievent.usercmnd) ; return
self.finished = time.time()
self.elapsed = self.finished - self.starttime
if self.elapsed > 5:
if self.nowrunning not in self.longrunning: self.longrunning.append(self.nowrunning)
if not ievent.nolog: logging.debug('ALERT %s %s job taking too long: %s seconds' % (descr, str(func), self.elapsed))
except Exception, ex:
handle_exception()
#finally: lockmanager.release(getname(str(func)))
self.working = False
class LongRunner(Runner):
def handle(self, speed, args):
""" schedule a bot command. """
try:
descr, func, bot, ievent = args
self.starttime = time.time()
#lockmanager.acquire(getname(str(func)))
#self.nowrunning = getname(func)
if not ievent.nolog: logging.debug("long event handler is %s" % str(func))
self.working = True
func(bot, ievent)
self.elapsed = time.time() - self.starttime
if self.elapsed < 1 and self.nowrunning not in self.shortrunning: self.shortrunning.append(self.nowrunning)
except Exception, ex:
handle_exception()
#finally: lockmanager.release(getname(str(func)))
self.working = False
logging.debug("long finished - %s" % self.nowrunning)
## Runners class
class Runners(object):
""" runners is a collection of runner objects. """
def __init__(self, name, max=100, runnertype=Runner, doready=True):
self.name = name
self.max = max
self.runners = []
self.runnertype = runnertype
self.doready = doready
def names(self):
return [getname(runner.name) for runner in self.runners]
def size(self):
qsize = [runner.queue.qsize() for runner in self.runners]
return "%s/%s" % (qsize, len(self.runners))
def runnersizes(self):
""" return sizes of runner objects. """
result = []
for runner in self.runners: result.append("%s - %s" % (runner.queue.qsize(), runner.name))
return result
def stop(self):
""" stop runners. """
for runner in self.runners: runner.stop()
def start(self):
""" overload this if needed. """
pass
def put(self, speed, *data):
""" put a job on a free runner. """
for runner in self.runners:
if runner.queue.empty():
runner.put(speed, *data)
return
if self.runners: self.cleanup()
runner = self.makenew()
runner.put(speed, *data)
def running(self):
""" return list of running jobs. """
result = []
for runner in self.runners:
if runner.working: result.append(runner.nowrunning)
return result
def makenew(self):
""" create a new runner. """
runner = None
if len(self.runners) < self.max:
runner = self.runnertype(self.name + "-" + str(len(self.runners)))
runner.start()
self.runners.append(runner)
else: runner = random.choice(self.runners)
return runner
def cleanup(self):
""" clean up idle runners. """
r = []
for runner in self.runners:
if runner.queue.empty(): r.append(runner)
if not r: return
for runner in r: runner.stop()
for runner in r:
try: self.runners.remove(runner)
except ValueError: pass
logging.debug("%s - cleaned %s" % (self.name, [item.name for item in r]))
logging.debug("%s - now running: %s" % (self.name, self.size()))
## show runner status
def runner_status():
print cmndrunner.runnersizes()
print callbackrunner.runnersizes()
## global runners
cmndrunner = defaultrunner = Runners("default", 100, BotEventRunner)
longrunner = Runners("long", 80, LongRunner)
callbackrunner = Runners("callback", 30, BotEventRunner)
waitrunner = Runners("wait", 20, BotEventRunner)
apirunner = Runners("api", 10, BotEventRunner)
## cleanup
def runnercleanup(bot, event):
cmndrunner.cleanup()
longrunner.cleanup()
callbackrunner.cleanup()
waitrunner.cleanup()
apirunner.cleanup()
callbacks.add("TICK60", runnercleanup)
def size():
return "cmnd: %s - callbacks: %s - wait: %s - long: %s - api: %s" % (cmndrunner.size(), callbackrunner.size(), waitrunner.size(), longrunner.size(), apirunner.size())
|