This file is indexed.

/usr/share/pyshared/landscape/broker/registration.py is in landscape-common 12.04.3-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
import time
import logging
import socket

from twisted.internet.defer import Deferred

from landscape.lib.bpickle import loads
from landscape.lib.log import log_failure
from landscape.lib.fetch import fetch, FetchError
from landscape.lib.tag import is_valid_tag_list
from landscape.lib.network import get_fqdn
from landscape.lib.vm_info import get_vm_info


# Link-local address at which the EC2 metadata server is expected to answer
# (this is the host that L{_wait_for_network} tries to connect to).
EC2_HOST = "169.254.169.254"
# Base URL of the EC2 API; "/user-data" and "/meta-data/..." paths are
# appended to it by the fetching code in this module.
EC2_API = "http://%s/latest" % (EC2_HOST,)


class InvalidCredentialsError(Exception):
    """Registration with the Landscape server did not succeed.

    The L{Deferred} returned by L{RegistrationHandler.register} fails with
    this error when the C{"registration-failed"} event is fired, for
    instance because the server does not know the configured account.
    """


def persist_property(name):
    """Build a read/write property stored in the owner's C{_persist} object.

    @param name: The key under which the value is kept in C{self._persist}.
    @return: A C{property} whose getter returns C{self._persist.get(name)}
        and whose setter calls C{self._persist.set(name, value)}.
    """

    def read(instance):
        return instance._persist.get(name)

    def write(instance, new_value):
        instance._persist.set(name, new_value)

    return property(fget=read, fset=write)


def config_property(name):
    """Build a read-only property mirroring an attribute of C{self._config}.

    @param name: The name of the attribute to read from C{self._config}.
    @return: A C{property} with a getter only; assigning to it raises
        C{AttributeError}.
    """
    return property(lambda instance: getattr(instance._config, name))


class Identity(object):
    """Holds the details that identify this Landscape client.

    The two server-provided IDs are kept in the persistent store, under the
    C{"registration"} root; the remaining attributes are read-only views of
    the configuration object.

    @ivar secure_id: A server-provided ID for secure message exchange.
    @ivar insecure_id: Non-secure server-provided ID, mainly used with
        the ping server.
    @ivar computer_title: See L{BrokerConfiguration}.
    @ivar account_name: See L{BrokerConfiguration}.
    @ivar registration_password: See L{BrokerConfiguration}.
    @ivar tags: See L{BrokerConfiguration}.

    @param config: A L{BrokerConfiguration} object, providing the
        C{computer_title}, C{account_name}, C{registration_password} and
        C{tags} values.
    @param persist: The persistent store; only its C{"registration"}
        sub-tree (as returned by C{persist.root_at}) is used.
    """

    # Values handed out by the server, kept in the persistent store.
    secure_id = persist_property("secure-id")
    insecure_id = persist_property("insecure-id")

    # Values read straight from the configuration object.
    computer_title = config_property("computer_title")
    account_name = config_property("account_name")
    registration_password = config_property("registration_password")
    tags = config_property("tags")

    def __init__(self, config, persist):
        registration_store = persist.root_at("registration")
        self._persist = registration_store
        self._config = config


class RegistrationHandler(object):
    """
    An object from which registration can be requested of the server,
    and which will handle forced ID changes from the server.

    L{register} should be used to perform initial registration.
    """

    def __init__(self, config, identity, reactor, exchange, pinger,
                 message_store, fetch_async=None):
        self._config = config
        self._identity = identity
        self._reactor = reactor
        self._exchange = exchange
        self._pinger = pinger
        self._message_store = message_store
        self._reactor.call_on("run", self._fetch_ec2_data)
        self._reactor.call_on("pre-exchange", self._handle_pre_exchange)
        self._reactor.call_on("exchange-done", self._handle_exchange_done)
        self._exchange.register_message("set-id", self._handle_set_id)
        self._exchange.register_message("unknown-id", self._handle_unknown_id)
        self._exchange.register_message("registration",
                                        self._handle_registration)
        self._should_register = None
        self._fetch_async = fetch_async
        self._otp = None
        self._ec2_data = None

    def should_register(self):
        id = self._identity
        # boolean logic is hard, I'm gonna use an if
        if self._config.cloud:
            return bool(not id.secure_id
                        and self._message_store.accepts("register-cloud-vm"))
        elif self._config.provisioning_otp:
            return (not id.secure_id) and\
                self._message_store.accepts("register-provisioned-machine")
        return bool(not id.secure_id and id.computer_title and id.account_name
                    and self._message_store.accepts("register"))

    def register(self):
        """
        Drop any known IDs and ask the Landscape server for a registration.

        @return: A L{Deferred} which will either be fired with None if
            registration was successful or will fail with an
            L{InvalidCredentialsError} if not.
        """
        identity = self._identity
        identity.secure_id = None
        identity.insecure_id = None
        # The response helper subscribes to the registration events, so it
        # has to exist before the exchange is requested.
        response = RegistrationResponse(self._reactor)
        outcome = response.deferred
        self._exchange.exchange()
        return outcome

    def _get_data(self, path, accumulate):
        """
        Fetch C{path} from the EC2 API endpoint and append what comes back to
        the C{accumulate} list.

        @param path: The path to fetch, relative to C{EC2_API}.
        @param accumulate: The list that the fetched value is appended to.
        @return: Whatever C{addCallback} returns on the object produced by
            C{self._fetch_async}.
        """
        url = EC2_API + path
        fetching = self._fetch_async(url)
        return fetching.addCallback(accumulate.append)

    def _fetch_ec2_data(self):
        """Retrieve available EC2 information, if in a EC2 compatible cloud.

        Nothing is done unless the configuration is in cloud mode and no
        secure ID is known yet.  Otherwise the user-data, eight meta-data
        values, the ramdisk ID and the kernel ID are requested one after the
        other from the EC2 API.  Once all of them are in, C{self._ec2_data}
        is filled in and the OTP to register with is taken either from the
        configuration or from the user-data; in the latter case the exchange
        and ping URLs (and optionally the SSL CA certificate) found in the
        user-data are applied and written to the configuration.

        The chain of callbacks is not returned to the caller; failures that
        reach its end are logged.
        """
        id = self._identity
        if self._config.cloud and not id.secure_id:
            # Fetch data from the EC2 API, to be used later in the registration
            # process
            # We ignore errors from user-data because it's common for the
            # URL to return a 404 when the data is unavailable.
            # ec2_data collects one entry per request, in request order;
            # record_data below unpacks it positionally.
            ec2_data = []
            deferred = self._fetch_async(EC2_API + "/user-data").addErrback(
                log_failure).addCallback(ec2_data.append)
            paths = [
                "/meta-data/instance-id",
                "/meta-data/reservation-id",
                "/meta-data/local-hostname",
                "/meta-data/public-hostname",
                "/meta-data/ami-launch-index",
                "/meta-data/ami-id",
                "/meta-data/local-ipv4",
                "/meta-data/public-ipv4"]
            # We're not using a DeferredList here because we want to keep the
            # number of connections to the backend minimal. See lp:567515.
            # "path=path" binds the current value as a default argument, so
            # that each lambda fetches its own path rather than the last one.
            for path in paths:
                deferred.addCallback(
                    lambda ignore, path=path: self._get_data(path, ec2_data))
            # Special case the ramdisk retrieval, because it may not be present
            # (the fetch error is handed to log_failure and whatever that
            # returns - presumably None, TODO confirm - is appended instead).
            deferred.addCallback(
                lambda ignore: self._fetch_async(
                    EC2_API + "/meta-data/ramdisk-id").addErrback(log_failure))
            deferred.addCallback(ec2_data.append)
            # And same for kernel
            deferred.addCallback(
                lambda ignore: self._fetch_async(
                    EC2_API + "/meta-data/kernel-id").addErrback(log_failure))
            deferred.addCallback(ec2_data.append)

            def record_data(ignore):
                """Record the instance data returned by the EC2 API."""
                # Eleven entries: user-data, the eight meta-data values in
                # the order of "paths", then ramdisk and kernel.
                (raw_user_data, instance_key, reservation_key,
                 local_hostname, public_hostname, launch_index,
                 ami_key, local_ip, public_ip, ramdisk_key,
                 kernel_key) = ec2_data
                self._ec2_data = {
                    "instance_key": instance_key,
                    "reservation_key": reservation_key,
                    "local_hostname": local_hostname,
                    "public_hostname": public_hostname,
                    "launch_index": launch_index,
                    "kernel_key": kernel_key,
                    "ramdisk_key": ramdisk_key,
                    "image_key": ami_key,
                    "public_ipv4": public_ip,
                    "local_ipv4": local_ip}
                # Decode every value as UTF-8; only the optional ramdisk and
                # kernel keys are allowed to stay None.
                for k, v in self._ec2_data.items():
                    if v is None and k in ("ramdisk_key", "kernel_key"):
                        continue
                    self._ec2_data[k] = v.decode("utf-8")
                self._ec2_data["launch_index"] = int(
                    self._ec2_data["launch_index"])

                # An OTP set in the configuration takes precedence: the
                # user-data is not even parsed in that case.
                if self._config.otp:
                    self._otp = self._config.otp
                    return
                instance_data = _extract_ec2_instance_data(
                    raw_user_data, int(launch_index))
                if instance_data is not None:
                    # The user-data carries the OTP and the server URLs to
                    # use; apply them to the running transport and pinger as
                    # well as to the configuration.
                    self._otp = instance_data["otp"]
                    exchange_url = instance_data["exchange-url"]
                    ping_url = instance_data["ping-url"]
                    self._exchange._transport.set_url(exchange_url)
                    self._pinger.set_url(ping_url)
                    self._config.url = exchange_url
                    self._config.ping_url = ping_url
                    if "ssl-ca-certificate" in instance_data:
                        # NOTE(review): imported locally, presumably to avoid
                        # a circular import - confirm.
                        from landscape.configuration import \
                            store_public_key_data
                        public_key_file = store_public_key_data(
                            self._config, instance_data["ssl-ca-certificate"])
                        self._config.ssl_public_key = public_key_file
                        self._exchange._transport._pubkey = public_key_file
                    self._config.write()

            def log_error(error):
                """Log a failure that reached the end of the fetching chain."""
                log_failure(error, msg="Got error while fetching meta-data: %r"
                            % (error.value,))

            deferred.addCallback(record_data)
            deferred.addErrback(log_error)

    def _handle_exchange_done(self):
        """Registered handler for the C{"exchange-done"} event.

        If we are not registered yet, schedule another message exchange.

        The first exchange made us accept the message type "register", so
        the next "pre-exchange" event will make L{_handle_pre_exchange}
        queue a registration message for delivery.
        """
        if self.should_register() and not self._should_register:
            self._exchange.exchange()

    def _handle_pre_exchange(self):
        """
        An exchange is about to happen.  If we don't have a secure id already
        set, and we have the needed information available, queue a registration
        message with the server.
        """
        # The point of storing this flag is that if we should *not* register
        # now, and then after the exchange we *should*, we schedule an urgent
        # exchange again.  Without this flag we would just spin trying to
        # connect to the server when something is clearly preventing the
        # registration.
        self._should_register = self.should_register()

        if self._should_register:
            id = self._identity
            self._message_store.delete_all_messages()
            tags = id.tags
            if not is_valid_tag_list(tags):
                tags = None
                logging.error("Invalid tags provided for cloud "
                              "registration.")
            if self._config.cloud and self._ec2_data is not None:
                if self._otp:
                    logging.info("Queueing message to register with OTP")
                    message = {"type": "register-cloud-vm",
                               "otp": self._otp,
                               "hostname": get_fqdn(),
                               "account_name": None,
                               "registration_password": None,
                               "tags": tags,
                               "vm-info": get_vm_info()}
                    message.update(self._ec2_data)
                    self._exchange.send(message)
                elif id.account_name:
                    with_tags = ["", u"and tags %s " % tags][bool(tags)]
                    logging.info(
                        u"Queueing message to register with account %r %s"
                        u"as an EC2 instance." % (id.account_name, with_tags))
                    message = {"type": "register-cloud-vm",
                               "otp": None,
                               "hostname": get_fqdn(),
                               "account_name": id.account_name,
                               "registration_password": \
                                   id.registration_password,
                               "tags": tags,
                               "vm-info": get_vm_info()}
                    message.update(self._ec2_data)
                    self._exchange.send(message)
                else:
                    self._reactor.fire("registration-failed")
            elif id.account_name:
                with_word = ["without", "with"][bool(id.registration_password)]
                with_tags = ["", u"and tags %s " % tags][bool(tags)]
                logging.info(u"Queueing message to register with account %r %s"
                             "%s a password." % (id.account_name, with_tags,
                                                 with_word))
                message = {"type": "register",
                           "computer_title": id.computer_title,
                           "account_name": id.account_name,
                           "registration_password": id.registration_password,
                           "hostname": get_fqdn(),
                           "tags": tags,
                           "vm-info": get_vm_info()}
                self._exchange.send(message)
            elif self._config.provisioning_otp:
                logging.info(u"Queueing message to register with OTP as a"
                             u" newly provisioned machine.")
                message = {"type": "register-provisioned-machine",
                           "otp": self._config.provisioning_otp}
                self._exchange.send(message)
            else:
                self._reactor.fire("registration-failed")

    def _handle_set_id(self, message):
        """Registered handler for the C{"set-id"} event.

        Record and start using the secure and insecure IDs from the given
        message.

        Fire C{"registration-done"} and C{"resynchronize-clients"}.
        """
        id = self._identity
        id.secure_id = message.get("id")
        id.insecure_id = message.get("insecure-id")
        logging.info("Using new secure-id ending with %s for account %s.",
                     id.secure_id[-10:], id.account_name)
        logging.debug("Using new secure-id: %s", id.secure_id)
        self._reactor.fire("registration-done")
        self._reactor.fire("resynchronize-clients")

    def _handle_registration(self, message):
        if message["info"] == "unknown-account":
            self._reactor.fire("registration-failed")

    def _handle_unknown_id(self, message):
        id = self._identity
        logging.info("Client has unknown secure-id for account %s."
                     % id.account_name)
        id.secure_id = None
        id.insecure_id = None


class RegistrationResponse(object):
    """Turns the outcome of one registration request into a L{Deferred}.

    The helper listens for the C{"registration-done"} and
    C{"registration-failed"} reactor events and stops listening as soon as
    one of them has been handled.

    @ivar deferred: The L{Deferred} that will be fired as per
        L{RegistrationHandler.register}.
    """

    def __init__(self, reactor):
        subscribe = reactor.call_on
        self._reactor = reactor
        self._done_id = subscribe("registration-done", self._done)
        self._failed_id = subscribe("registration-failed", self._failed)
        self.deferred = Deferred()

    def _cancel_calls(self):
        """Unsubscribe both event handlers from the reactor."""
        for call_id in (self._done_id, self._failed_id):
            self._reactor.cancel_call(call_id)

    def _done(self):
        """Registration succeeded: fire the deferred with C{None}."""
        self.deferred.callback(None)
        self._cancel_calls()

    def _failed(self):
        """Registration failed: fail the deferred."""
        failure_reason = InvalidCredentialsError()
        self.deferred.errback(failure_reason)
        self._cancel_calls()


def _extract_ec2_instance_data(raw_user_data, launch_index):
    """
    Parse the raw EC2 User Data string and pick out the entries that apply
    to the instance with the given launch index.

    @param raw_user_data: The user-data, as fetched from the EC2 API.
    @param launch_index: The position of this instance's OTP in the
        C{"otps"} list of the user-data.
    @return: A dict with the C{"otp"}, C{"exchange-url"} and C{"ping-url"}
        keys, plus C{"ssl-ca-certificate"} when the user-data has it.  If
        the data can't be parsed, isn't a dict, lacks a required key or has
        no OTP for C{launch_index}, a debug message is logged and C{None}
        is returned.
    """
    try:
        user_data = loads(raw_user_data)
    except ValueError:
        logging.debug("Got invalid user-data %r" % (raw_user_data,))
        return None

    if not isinstance(user_data, dict):
        logging.debug("user-data %r is not a dict" % (user_data,))
        return None

    # Only the first missing key is reported.
    required_keys = ("otps", "exchange-url", "ping-url")
    missing_key = next(
        (key for key in required_keys if key not in user_data), None)
    if missing_key is not None:
        logging.debug("user-data %r doesn't have key %r."
                      % (user_data, missing_key))
        return None

    otps = user_data["otps"]
    if len(otps) <= launch_index:
        logging.debug("user-data %r doesn't have OTP for launch index %d"
                      % (user_data, launch_index))
        return None

    instance_data = {"otp": otps[launch_index]}
    # The two URLs are known to be present at this point; the certificate
    # is optional.
    for key in ("exchange-url", "ping-url", "ssl-ca-certificate"):
        if key in user_data:
            instance_data[key] = user_data[key]
    return instance_data


def _wait_for_network():
    """
    Keep trying to connect to the EC2 metadata server until it becomes
    accessible or until five minutes pass.

    This is necessary because the networking init script on Ubuntu is
    asynchronous; the network may not actually be up by the time the
    landscape-client init script is invoked.

    @return: C{None} in both cases; the caller can't tell from the return
        value whether the server was reached or the time ran out.
    """
    timeout = 5 * 60
    port = 80

    start = time.time()
    while True:
        s = socket.socket()
        try:
            s.connect((EC2_HOST, port))
            return
        except socket.error:
            # Not reachable yet: fall through, wait and retry.
            pass
        finally:
            # Always release the descriptor: a new socket is created for
            # every attempt, and there can be hundreds of attempts.
            s.close()
        time.sleep(1)
        # NOTE(review): the deadline is only checked between attempts and
        # connect() has no explicit timeout here, so a single slow attempt
        # can overshoot the five minutes.
        if time.time() - start > timeout:
            break


def is_cloud_managed(fetch=fetch):
    """
    Return C{True} if the machine has been started by Landscape, i.e. if we can
    find the expected data inside the EC2 user-data field.

    @param fetch: The callable used to retrieve the user-data and the launch
        index from the EC2 API; it is called with the URL and a
        C{connect_timeout} keyword argument.
    @return: C{False} if either value can't be fetched, if the launch index
        is not an integer, or if the user-data doesn't hold the expected
        instance data; C{True} otherwise.
    """
    _wait_for_network()
    try:
        raw_user_data = fetch(EC2_API + "/user-data",
                              connect_timeout=5)
        launch_index = fetch(EC2_API + "/meta-data/ami-launch-index",
                             connect_timeout=5)
    except FetchError:
        return False
    # Whatever answers at the metadata address is not guaranteed to be a
    # real EC2 API: a launch index that isn't a number means the expected
    # data isn't there, which is a "no" rather than a crash.
    try:
        launch_index = int(launch_index)
    except (TypeError, ValueError):
        logging.debug("Got invalid ami-launch-index %r" % (launch_index,))
        return False
    instance_data = _extract_ec2_instance_data(raw_user_data, launch_index)
    return instance_data is not None