# /usr/share/pyshared/Pyro/EventService/Server.py is in pyro 1:3.14-1.1.
# This file is owned by root:root, with mode 0o644.
# The actual contents of the file can be viewed below.
#############################################################################
#
# Event Service daemon and server classes
#
# This is part of "Pyro" - Python Remote Objects
# which is (c) Irmen de Jong - irmen@razorvine.net
#
#############################################################################
import time, types, re, sys, traceback, os
import Pyro.core, Pyro.naming, Pyro.util, Pyro.constants
from Pyro.errors import *
from Pyro.EventService.Event import Event
import Queue
from threading import Thread
# Module-level shortcut to Pyro's logger object; used below as
# Log.msg(...), Log.warn(...) and Log.error(...).
Log=Pyro.util.Log
# SUBSCRIBER - each subscriber has one of these worker threads
class Subscriber(Thread):
    """Worker thread that forwards queued events to one remote subscriber.

    Events handed to send() are buffered in a queue bounded by
    Pyro.config.PYRO_ES_QUEUESIZE and delivered one at a time by calling
    ``remote.event(event)``.  The thread stops when it reads an item that
    is not an Event (the service uses the string "QUIT" for that) or when
    a delivery raises ProtocolError.  Once stopped, ``self.queue`` is
    None, which is what running() and send() test for.
    """

    def __init__(self, remote):
        """Create the worker for the subscriber proxy *remote*."""
        Thread.__init__(self)
        self.remote = remote
        # set the callback method to ONEWAY mode:
        self.remote._setOneway("event")
        self.queue = Queue.Queue(Pyro.config.PYRO_ES_QUEUESIZE)

    def run(self):
        """Deliver queued events until told to quit or delivery fails."""
        try:
            while 1:
                event = self.queue.get()
                if not isinstance(event, Event):
                    break  # it was no Event, so exit
                try:
                    self.remote.event(event)
                except ProtocolError:
                    break
        finally:
            # Always tear down, also when remote.event() raised something
            # other than ProtocolError.  Otherwise self.queue would stay
            # set, running() would keep reporting a live worker and
            # publishers would keep filling (or block on) a queue that
            # nobody reads any more.
            self._closedown()

    def _closedown(self):
        """Mark the worker as stopped, drain its queue, release the proxy."""
        (queue, self.queue) = (self.queue, None)
        # this reads all pending items from the queue so that any
        # tasks that are blocked on the queue can continue.
        try:
            while 1:
                queue.get(block=0)
        except Queue.Empty:
            pass
        # release the remote connection
        try:
            self.remote._release()
        finally:
            del self.remote

    def send(self, event):
        """Queue *event* for delivery; silently ignored once stopped.

        Queue.Full is raised when the queue is full and
        Pyro.config.PYRO_ES_BLOCKQUEUE is false.
        """
        # Bind the queue once: the worker thread may set self.queue to
        # None between the check and the put.
        queue = self.queue
        if queue:
            queue.put(event, block=Pyro.config.PYRO_ES_BLOCKQUEUE)

    def running(self):
        """Return the queue (true) while active, None once stopped."""
        return self.queue
# The EVENTSERVICE is the actual Pyro server.
#
# BTW: Subscribers are remembered through their proxy object.
# The proxy class is capable of being a correct key in a dictionary.
class EventService(Pyro.core.ObjBase):
    """The Pyro server object that implements publish/subscribe.

    Subscribers are identified by their proxy object, which is used as a
    dictionary key.  Each subscriber gets one Subscriber worker thread,
    shared by all of its subscriptions and registered under the thread's
    name.
    """

    def __init__(self):
        Pyro.core.ObjBase.__init__(self)
        self.subscribers = {}          # subject -> { threadname-> subscriberthread }
        self.subscribersMatch = {}     # subjectPattern -> { threadname->subscriberthread }
        self.subscriptionWorkers = {}  # subscriber -> subscription thread object

    def _mksequence(self, seq):
        """Return *seq* unchanged if it is a tuple or list, else wrap it in a 1-tuple."""
        if isinstance(seq, (tuple, list)):
            return seq
        return (seq,)

    def _findMatcher(self, pattern):
        """Return the registered compiled regex whose source text is *pattern*, or None.

        Compiled patterns are used as dictionary keys and, in Python 2,
        are hashed by identity.  Calling re.compile() again only yields
        the same key while the pattern is still in the re module's
        internal cache, so lookups compare the pattern text instead.
        All registered patterns use the same flags (re.IGNORECASE).
        """
        for matcher in list(self.subscribersMatch.keys()):
            if matcher.pattern == pattern:
                return matcher
        return None

    def _deliver(self, subs, event):
        """Send *event* to every live worker in *subs*; drop stopped workers.

        A worker whose queue is full simply misses the event.
        """
        for (name, worker) in list(subs.items()):
            try:
                if worker.running():
                    worker.send(event)
                else:
                    subs.pop(name, None)
            except Queue.Full:
                pass

    def getSubscriptionWorker(self, subscriber):
        """Return the worker thread for *subscriber*, creating and starting it if needed."""
        # If this subscriber doesn't have its own subscription thread, create one.
        if subscriber in self.subscriptionWorkers:
            return self.subscriptionWorkers[subscriber]
        worker = Subscriber(subscriber)
        worker.start()
        self.subscriptionWorkers[subscriber] = worker
        return worker

    def subscribe(self, subjects, subscriber):
        """Subscribe *subscriber* to one subject or a tuple/list of subjects.

        Subjects are exact strings, compared case-insensitively.
        """
        if not subjects: return
        # Subscribe into a dictionary; this way; somebody can subscribe
        # only once to this subject.  Subjects are exact strings.
        worker = self.getSubscriptionWorker(subscriber)
        for subject in self._mksequence(subjects):
            self.subscribers.setdefault(subject.lower(), {})[worker.getName()] = worker

    def subscribeMatch(self, subjects, subscriber):
        """Subscribe *subscriber* to one or more case-insensitive regex patterns.

        Raises re.error for an invalid pattern; in that case nothing is
        subscribed and no worker thread is created.
        """
        if not subjects: return
        # Compile everything before touching any state, so that a bad
        # pattern cannot leave behind a started worker thread that has
        # no subscriptions.
        compiled = [(pattern, re.compile(pattern, re.IGNORECASE))
                    for pattern in self._mksequence(subjects)]
        worker = self.getSubscriptionWorker(subscriber)
        # Subscribe into a dictionary; this way; somebody can subscribe
        # only once to this subject.  Subjects are regex patterns.
        for (pattern, matcher) in compiled:
            existing = self._findMatcher(pattern)
            if existing is not None:
                # reuse the registered key so a pattern is never listed twice
                matcher = existing
            self.subscribersMatch.setdefault(matcher, {})[worker.getName()] = worker

    def unsubscribe(self, subjects, subscriber):
        """Remove *subscriber*'s subscription(s) to *subjects*.

        Each subject is first tried as an exact subject and then as a
        regex pattern.  Unknown subjects and unknown subscribers are
        ignored.  The subscriber's worker thread is stopped once its last
        subscription is gone.
        """
        if not subjects: return
        for subject in self._mksequence(subjects):
            worker = self.subscriptionWorkers.get(subscriber)
            if worker is None:
                return  # no worker (any more): nothing left to unsubscribe
            name = worker.getName()
            subs = self.subscribers.get(subject.lower())
            if subs is None or name not in subs:
                # Not an exact subscription; try the patterns.  The subject
                # is looked up by its text and is not compiled here, so a
                # plain subject containing regex metacharacters cannot
                # raise re.error.
                subs = self.subscribersMatch.get(self._findMatcher(subject))
                if subs is None or name not in subs:
                    continue
            subs.pop(name, None)
            self.killWorkerIfLastSubject(subscriber, worker)

    def publish(self, subjects, message):
        """Publish *message* on one subject or a tuple/list of subjects.

        An Event is sent to every subscriber of the exact subject and to
        every subscriber whose pattern matches it.
        """
        if not subjects: return
        # keep the creation time, this must be the same for all events.
        creationTime = time.time()
        # publish a message. Subjects must be exact strings
        for subject in self._mksequence(subjects):
            event = Event(subject, message, creationTime)
            exact = self.subscribers.get(subject.lower())
            if exact:
                self._deliver(exact, event)
            # process the subject patterns
            for (matcher, subs) in list(self.subscribersMatch.items()):
                if matcher.match(subject):
                    # send event to all subscribers
                    self._deliver(subs, event)

    def killWorkerIfLastSubject(self, subscriber, worker):
        """Stop *worker* and forget *subscriber* if no subscription refers to the worker."""
        name = worker.getName()
        for registry in (self.subscribers, self.subscribersMatch):
            for subs in list(registry.values()):
                if subs.get(name) is worker:
                    return
        # a non-Event item in the queue makes the worker thread exit
        worker.send("QUIT")
        del self.subscriptionWorkers[subscriber]
class EventServiceStarter(object):
    """Creates the Pyro daemon for the Event Service and runs or exposes it.

    start() runs the daemon's request loop; initialize() only sets things
    up, so that the caller can drive the daemon from a custom event loop
    with handleRequests() and stop it with shutdown().
    """

    def __init__(self, identification=None):
        """*identification* is the required authentication ID, if any."""
        Pyro.core.initServer()
        self.running = 1
        self.identification = identification
        self.started = Pyro.util.getEventObject()

    def start(self, *args, **kwargs):  # see _start for allowed arguments
        """Start the Event Server and run its request loop."""
        kwargs["startloop"] = 1
        self._start(*args, **kwargs)

    def initialize(self, *args, **kwargs):  # see _start for allowed arguments
        """Start the Event Server without entering the request loop."""
        kwargs["startloop"] = 0
        self._start(*args, **kwargs)

    def getServerSockets(self):
        """Return the daemon's server sockets (only after initialize())."""
        return self.daemon.getServerSockets()

    def waitUntilStarted(self, timeout=None):
        """Wait up to *timeout* for the server to start; return whether it has."""
        self.started.wait(timeout)
        return self.started.isSet()

    def _start(self, hostname='', port=None, startloop=1, useNameServer=1, norange=0):
        """Create the daemon, register the Event Service and optionally loop.

        Returns without starting anything (and without signalling
        ``started``) when an Event Server is already registered in the
        name server and the user does not answer 'y' to the prompt.
        """
        # Note: output uses the single-argument print(...) form, which
        # prints the same text as the print statement.
        daemon = Pyro.core.Daemon(host=hostname, port=port, norange=norange)
        if self.identification:
            daemon.setAllowedIdentifications([self.identification])
            print('Requiring connection authentication.')
        if useNameServer:
            locator = Pyro.naming.NameServerLocator(identification=self.identification)
            ns = locator.getNS()
            # check if ES already running
            try:
                ns.resolve(Pyro.constants.EVENTSERVER_NAME)
                print('The Event Server appears to be already running.')
                print('You cannot start multiple Event Servers.')
                ans = raw_input('Start new Event Server anyway (y/n)? ')
                if ans != 'y':
                    return
                ns.unregister(Pyro.constants.EVENTSERVER_NAME)
            except NamingError:
                pass  # not registered yet
            daemon.useNameServer(ns)
        es = EventService()
        esURI = daemon.connect(es, Pyro.constants.EVENTSERVER_NAME)
        print(' '.join(('URI=', str(esURI))))
        message = daemon.validateHostnameAndIP()
        if message:
            print(' '.join(("\nWARNING:", str(message), "\n")))
        print('Event Server started.')
        self.started.set()  # signal that we've started.
        if startloop:
            Log.msg('ES daemon', 'This is the Pyro Event Server.')
            try:
                if os.name != "java":
                    # I use a timeout here otherwise you can't break gracefully on Windows
                    daemon.setTimeout(20)
                daemon.requestLoop(lambda s=self: s.running)
            except KeyboardInterrupt:
                Log.warn('ES daemon', 'shutdown on user break signal')
                print('Shutting down on user break signal.')
                self.shutdown(es)
            except:
                # NOTE: despite the last message printed below, the request
                # loop is not re-entered; control falls through to the
                # shutdown messages after this handler.
                try:
                    (exc_type, exc_value, exc_trb) = sys.exc_info()
                    out = ''.join(traceback.format_exception(exc_type, exc_value, exc_trb)[-5:])
                    Log.error('ES daemon', 'Unexpected exception, type', exc_type,
                              '\n--- partial traceback of this exception follows:\n',
                              out, '\n--- end of traceback')
                    print('*** Exception occured!!! Partial traceback:')
                    print(out)
                    print('*** Resuming operations...')
                finally:
                    del exc_type, exc_value, exc_trb  # delete refs to allow proper GC
            Log.msg('ES daemon', 'Shut down gracefully.')
            print('Event Server gracefully stopped.')
        else:
            # no loop, store the required objects for getServerSockets()
            self.daemon = daemon
            self.es = es
            if os.name != "java":
                daemon.setTimeout(20)  # XXX fixed timeout

    def mustContinueRunning(self):
        """Return true until shutdown() has been called."""
        return self.running

    def handleRequests(self, timeout=None):
        """Handle pending daemon requests once (only after initialize())."""
        # this method must be called from a custom event loop
        self.daemon.handleRequests(timeout=timeout)

    def shutdown(self, es=None):
        """Disconnect the Event Service object and shut the daemon down.

        With *es* given, that object's own daemon is used.  Without it
        (the default), the daemon and ES object stored by initialize()
        are used and then forgotten.
        """
        if es:
            # internal shutdown call with specified ES object
            daemon = es.getDaemon()
        else:
            # custom shutdown call w/o specified ES object, use stored instance
            daemon = self.daemon
            es = self.es
            del self.es, self.daemon
        try:
            daemon.disconnect(es)  # clean up nicely
        except NamingError:
            Log.warn('ES daemon', 'disconnect error during shutdown:', sys.exc_info()[1])
        except ConnectionClosedError:
            Log.warn('ES daemon', 'lost connection with Name Server, cannot unregister')
        self.running = 0
        daemon.shutdown()
def start(argv):
    """Command-line entry point: parse *argv* and run the Event Server.

    Recognised options are -h (help), -n hostname, -p port, -N (do not
    use the name server) and -i identification.  With -h the usage text
    is printed and SystemExit is raised.
    """
    parser = Pyro.util.ArgParser()
    parser.parse(argv, 'hNn:p:i:')
    if parser.hasOpt('h'):
        usage = (
            'Usage: pyro-es [-h] [-n hostname] [-p port] [-N] [-i identification]',
            ' where -p = ES server port (0 for auto)',
            ' -n = non-default hostname to bind on',
            ' -N = do not use the name server',
            ' -i = the required authentication ID for ES clients,',
            ' also used to connect to other Pyro services',
            ' -h = print this help',
        )
        for line in usage:
            print(line)
        raise SystemExit

    hostname = parser.getOpt('n', None)
    port = parser.getOpt('p', None)
    useNameServer = not parser.hasOpt('N')
    ident = parser.getOpt('i', None)
    if port:
        port = int(port)
    # an explicit port 0 is passed on as norange
    norange = (port == 0)

    parser.printIgnored()
    leftover = parser.args
    if leftover:
        print(' '.join(('Ignored arguments:', ' '.join(leftover))))

    print('*** Pyro Event Server ***')
    server = EventServiceStarter(identification=ident)
    server.start(hostname, port, useNameServer=useNameServer, norange=norange)
# Running this module as a script (for instance with "python -m")
# starts the Event Server with the command-line arguments.
if __name__ == "__main__":
    start(sys.argv[1:])