/usr/share/pyshared/pycocumalib/broadcaster.py is in pycocuma 0.4.5-6-7.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | # Copyright (C) 2004 Henning Jacobs <henning@srcco.de>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# $Id: broadcaster.py 82 2004-07-11 13:01:44Z henning $
__all__ = ['Register', 'UnRegister', 'Broadcast', 'CurrentSource', 'CurrentTitle', 'CurrentData']
import debug
listeners = {}
currentSources = []
currentTitles = []
currentData = []
def Register(listener, arguments=(), source=None, title=None):
if not listeners.has_key((source, title)):
listeners[(source, title)] = []
listeners[(source, title)].append((listener, arguments))
def UnRegister(listener, arguments=(), source=None, title=None):
try:
lst = listeners[(source, title)]
del lst[lst.index((listener, arguments))]
except:
debug.echo("broadcaster.UnRegister(): failed to unregister listener")
lastbroadcast = None
def Broadcast(source, title, data={}, onceonly=0):
"Broadcast an Event / Notification to all registered Listeners"
global lastbroadcast
if onceonly and lastbroadcast == (source, title, data):
# We don't want to let this broadcast be repeated
return
debug.echo("broadcaster.Broadcast(): %s %s %s" % (source, title, str(data)))
currentSources.append(source)
currentTitles.append(title)
currentData.append(data)
listenerList = listeners.get((source, title), [])[:]
if source != None:
listenerList += listeners.get((None, title), [])
if title != None:
listenerList += listeners.get((source, None), [])
for listener, arguments in listenerList:
listener(*arguments)
currentSources.pop()
currentTitles.pop()
currentData.pop()
lastbroadcast = (source, title, data)
def CurrentSource():
return currentSource[-1]
def CurrentTitle():
return currentTitles[-1]
def CurrentData():
return currentData[-1]
|