This file is indexed.

/usr/share/pyshared/landscape/broker/client.py is in landscape-common 12.04.3-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
from logging import info, exception

from twisted.internet.defer import maybeDeferred

from landscape.log import format_object
from landscape.lib.twisted_util import gather_results


class HandlerNotFoundError(Exception):
    """Raised when no handler is registered for a message's type."""


class BrokerClientPlugin(object):
    """Helper base class for plugins that get added to a L{BrokerClient}.

    Its C{register} method wires the plugin to the client and, when the
    subclass provides a C{run} method, arranges for it to be invoked
    through the client's reactor.

    @cvar run_interval: Number of seconds between two scheduled calls of
        C{run}. With C{None} no periodic call is scheduled.
    @cvar run_immediately: When C{True}, C{run} is additionally called once
        right at registration time.
    """

    run_interval = 5
    run_immediately = False

    def register(self, client):
        """Attach this plugin to C{client} and schedule C{run}, if any.

        @param client: The object owning this plugin; its C{reactor}
            attribute is used for scheduling.
        """
        self.client = client
        # Plugins without a C{run} method have nothing to schedule.
        if getattr(self, "run", None) is None:
            return
        if self.run_immediately:
            self.run()
        if self.run_interval is None:
            return
        self.client.reactor.call_every(self.run_interval, self.run)

    @property
    def registry(self):
        """Alternative name for the C{client} attribute."""
        return self.client

    def call_on_accepted(self, type, callable, *args, **kwargs):
        """
        Arrange for C{callable} to run when C{type} becomes accepted.

        A handler is registered for the C{message-type-acceptance-changed}
        event of the given message C{type}; it calls
        C{callable(*args, **kwargs)} only when the reported acceptance is
        true, and returns its result.
        """

        def fire_if_accepted(accepted):
            if not accepted:
                return None
            return callable(*args, **kwargs)

        event = ("message-type-acceptance-changed", type)
        self.client.reactor.call_on(event, fire_if_accepted)


class BrokerClient(object):
    """Basic plugin registry for clients that have to deal with the broker.

    This knows about the needs of a client when dealing with the Landscape
    broker, including interest in messages of a particular type delivered
    by the broker to the client.

    @cvar name: The name used when registering to the broker, it must be
        defined by sub-classes.
    @ivar broker: A reference to a connected L{RemoteBroker}, it must be set
        by the connecting machinery at service startup.

    @param reactor: A L{TwistedReactor}.
    """
    name = "client"

    def __init__(self, reactor):
        super(BrokerClient, self).__init__()
        self.reactor = reactor
        # Stays None until the connecting machinery sets it.
        self.broker = None
        # Maps a message type to the handler registered for it.
        self._registered_messages = {}
        # Plugins in the order they were added.
        self._plugins = []
        # Maps a C{plugin_name} to its plugin, for L{get_plugin}.
        self._plugin_names = {}

        # Register event handlers
        self.reactor.call_on("impending-exchange", self.notify_exchange)
        self.reactor.call_on("broker-reconnect", self.handle_reconnect)

    def ping(self):
        """Return C{True}"""
        return True

    def add(self, plugin):
        """Add a plugin.

        The plugin's C{register} method will be called with this broker client
        as its argument.

        If the plugin has a C{plugin_name} attribute, it will be possible to
        look up the plugin later with L{get_plugin}.
        """
        info("Registering plugin %s.", format_object(plugin))
        self._plugins.append(plugin)
        if hasattr(plugin, 'plugin_name'):
            self._plugin_names[plugin.plugin_name] = plugin
        plugin.register(self)

    def get_plugins(self):
        """Get the list of plugins.

        @return: A copy of the internal list, so callers can modify it
            without affecting the registry.
        """
        return self._plugins[:]

    def get_plugin(self, name):
        """Get a particular plugin by name.

        @raises: C{KeyError} if no plugin was added with that
            C{plugin_name}.
        """
        return self._plugin_names[name]

    def register_message(self, type, handler):
        """
        Register interest in a particular type of Landscape server->client
        message.

        @param type: The type of message to register C{handler} for.
        @param handler: A callable taking a message as a parameter, called
            when messages of C{type} are received.
        @return: A C{Deferred} that will fire when registration completes.
        """
        self._registered_messages[type] = handler
        return self.broker.register_client_accepted_message_type(type)

    def dispatch_message(self, message):
        """Run the handler registered for the type of the given message.

        Errors raised by the handler are logged and not propagated; only
        C{Exception} subclasses are intercepted, so that C{SystemExit} and
        C{KeyboardInterrupt} are never silently swallowed.

        @return: The return value of the handler, if found, or C{None} if
            the handler failed.
        @raises: HandlerNotFoundError if the handler was not found
        """
        type = message["type"]
        handler = self._registered_messages.get(type)
        if handler is None:
            raise HandlerNotFoundError(type)
        try:
            return handler(message)
        except Exception:
            exception("Error running message handler for type %r: %r",
                      type, handler)

    def message(self, message):
        """Call C{dispatch_message} for the given C{message}.

        @return: A boolean indicating if a handler for the message was found.
        """
        try:
            self.dispatch_message(message)
            return True
        except HandlerNotFoundError:
            return False

    def exchange(self):
        """Call C{exchange} on all plugins.

        Plugins without an C{exchange} method are skipped. A failing plugin
        is logged and does not prevent the remaining plugins from being
        called; only C{Exception} subclasses are intercepted.
        """
        for plugin in self.get_plugins():
            if hasattr(plugin, "exchange"):
                try:
                    plugin.exchange()
                except Exception:
                    exception("Error during plugin exchange")

    def notify_exchange(self):
        """Notify all plugins about an impending exchange."""
        info("Got notification of impending exchange. Notifying all plugins.")
        self.exchange()

    def fire_event(self, event_type, *args, **kwargs):
        """Fire an event of a given type.

        For C{message-type-acceptance-changed} the first two positional
        arguments are the message type and the acceptance flag; the event
        is fired under the C{(event_type, message_type)} key with the
        acceptance flag as its only argument.

        @return: A L{Deferred} resulting in a list of returns values of
            the fired event handlers, in the order they were fired.
        """
        if event_type == "message-type-acceptance-changed":
            message_type = args[0]
            acceptance = args[1]
            results = self.reactor.fire((event_type, message_type), acceptance)
        else:
            results = self.reactor.fire(event_type, *args, **kwargs)
        # Wrap every result with maybeDeferred so plain values and
        # Deferreds can be gathered uniformly.
        return gather_results([
            maybeDeferred(lambda x: x, result) for result in results])

    def handle_reconnect(self):
        """Called when the connection with the broker is established again.

        The following needs to be done:

          - Re-register any previously registered message types, so the broker
            knows we have interest on them.

          - Re-register ourselves as client, so the broker knows we exist and
            will talk to us firing events and dispatching messages.
        """
        for type in self._registered_messages:
            self.broker.register_client_accepted_message_type(type)
        self.broker.register_client(self.name)

    def exit(self):
        """Stop the reactor and exit the process."""
        # Stop with a short delay to give a chance to reply to the
        # caller over AMP.
        self.reactor.call_later(0.1, self.reactor.stop)