/usr/share/pyshared/landscape/broker/client.py is in landscape-common 12.04.3-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from logging import info, exception
from twisted.internet.defer import maybeDeferred
from landscape.log import format_object
from landscape.lib.twisted_util import gather_results
class HandlerNotFoundError(Exception):
    """A handler for the given message type was not found.

    Raised by L{BrokerClient.dispatch_message} with the unhandled message
    type as its only argument; L{BrokerClient.message} catches it and
    reports the miss as a C{False} return value.
    """
class BrokerClientPlugin(object):
    """A convenience for writing L{BrokerClient} plugins.

    This provides a register method which will set up a bunch of
    reactor handlers in the idiomatic way.

    If C{run} is defined on subclasses, it will be called every C{run_interval}
    seconds after being registered.

    @cvar run_interval: The interval, in seconds, to execute the C{run} method.
        If set to C{None}, then C{run} will not be scheduled.
    @cvar run_immediately: If C{True} the plugin will be run immediately after
        it is registered.
    """

    run_interval = 5
    run_immediately = False

    def register(self, client):
        """Attach this plugin to C{client} and schedule C{run}, if defined.

        @param client: The object the plugin is registered with; it is
            stored as C{self.client} and its C{reactor} attribute is used
            for scheduling.
        """
        self.client = client
        # Both the immediate call and the periodic scheduling only make
        # sense when the subclass actually provides a C{run} method.
        if getattr(self, "run", None) is not None:
            if self.run_immediately:
                self.run()
            if self.run_interval is not None:
                self.client.reactor.call_every(self.run_interval, self.run)

    @property
    def registry(self):
        """An alias for the C{client} attribute."""
        return self.client

    def call_on_accepted(self, type, callable, *args, **kwargs):
        """
        Register a callback fired upon a C{message-type-acceptance-changed}.

        @param type: The message type whose acceptance change is watched.
        @param callable: The function to invoke when the type becomes
            accepted; it is called with C{args} and C{kwargs}.
        """

        def acceptance_changed(acceptance):
            # Only react when the type becomes accepted; when acceptance is
            # false nothing is called and C{None} is returned implicitly.
            if acceptance:
                return callable(*args, **kwargs)

        # The event is keyed by an (event name, message type) tuple.
        self.client.reactor.call_on(("message-type-acceptance-changed", type),
                                    acceptance_changed)
class BrokerClient(object):
"""Basic plugin registry for clients that have to deal with the broker.
This knows about the needs of a client when dealing with the Landscape
broker, including interest in messages of a particular type delivered
by the broker to the client.
@cvar name: The name used when registering to the broker, it must be
defined by sub-classes.
@ivar broker: A reference to a connected L{RemoteBroker}, it must be set
by the connecting machinery at service startup.
@param reactor: A L{TwistedReactor}.
"""
name = "client"
def __init__(self, reactor):
super(BrokerClient, self).__init__()
self.reactor = reactor
self.broker = None
self._registered_messages = {}
self._plugins = []
self._plugin_names = {}
# Register event handlers
self.reactor.call_on("impending-exchange", self.notify_exchange)
self.reactor.call_on("broker-reconnect", self.handle_reconnect)
def ping(self):
"""Return C{True}"""
return True
def add(self, plugin):
"""Add a plugin.
The plugin's C{register} method will be called with this broker client
as its argument.
If the plugin has a C{plugin_name} attribute, it will be possible to
look up the plugin later with L{get_plugin}.
"""
info("Registering plugin %s.", format_object(plugin))
self._plugins.append(plugin)
if hasattr(plugin, 'plugin_name'):
self._plugin_names[plugin.plugin_name] = plugin
plugin.register(self)
def get_plugins(self):
"""Get the list of plugins."""
return self._plugins[:]
def get_plugin(self, name):
"""Get a particular plugin by name."""
return self._plugin_names[name]
def register_message(self, type, handler):
"""
Register interest in a particular type of Landscape server->client
message.
@param type: The type of message to register C{handler} for.
@param handler: A callable taking a message as a parameter, called
when messages of C{type} are received.
@return: A C{Deferred} that will fire when registration completes.
"""
self._registered_messages[type] = handler
return self.broker.register_client_accepted_message_type(type)
def dispatch_message(self, message):
"""Run the handler registered for the type of the given message.
@return: The return value of the handler, if found.
@raises: HandlerNotFoundError if the handler was not found
"""
type = message["type"]
handler = self._registered_messages.get(type)
if handler is None:
raise HandlerNotFoundError(type)
try:
return handler(message)
except:
exception("Error running message handler for type %r: %r"
% (type, handler))
def message(self, message):
"""Call C{dispatch_message} for the given C{message}.
@return: A boolean indicating if a handler for the message was found.
"""
try:
self.dispatch_message(message)
return True
except HandlerNotFoundError:
return False
def exchange(self):
"""Call C{exchange} on all plugins."""
for plugin in self.get_plugins():
if hasattr(plugin, "exchange"):
try:
plugin.exchange()
except:
exception("Error during plugin exchange")
def notify_exchange(self):
"""Notify all plugins about an impending exchange."""
info("Got notification of impending exchange. Notifying all plugins.")
self.exchange()
def fire_event(self, event_type, *args, **kwargs):
"""Fire an event of a given type.
@return: A L{Deferred} resulting in a list of returns values of
the fired event handlers, in the order they were fired.
"""
if event_type == "message-type-acceptance-changed":
message_type = args[0]
acceptance = args[1]
results = self.reactor.fire((event_type, message_type), acceptance)
else:
results = self.reactor.fire(event_type, *args, **kwargs)
return gather_results([
maybeDeferred(lambda x: x, result) for result in results])
def handle_reconnect(self):
"""Called when the connection with the broker is established again.
The following needs to be done:
- Re-register any previously registered message types, so the broker
knows we have interest on them.
- Re-register ourselves as client, so the broker knows we exist and
will talk to us firing events and dispatching messages.
"""
for type in self._registered_messages:
self.broker.register_client_accepted_message_type(type)
self.broker.register_client(self.name)
def exit(self):
"""Stop the reactor and exit the process."""
# Stop with a short delay to give a chance to reply to the
# caller over AMP.
self.reactor.call_later(0.1, self.reactor.stop)
|