This file is indexed.

/usr/lib/python2.7/dist-packages/provisioningserver/tags.py is in python-maas-provisioningserver 1.5.4+bzr2294-0ubuntu1.2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
# Copyright 2012 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Celery jobs for managing tags.

"""

from __future__ import (
    absolute_import,
    print_function,
    unicode_literals,
    )

str = None

__metaclass__ = type
__all__ = [
    'merge_details',
    'merge_details_cleanly',
    'MissingCredentials',
    'process_node_tags',
    ]


from collections import OrderedDict
from functools import partial
import httplib
from logging import getLogger
import urllib2

from apiclient.maas_client import (
    MAASClient,
    MAASDispatcher,
    MAASOAuth,
    )
import bson
from lxml import etree
from provisioningserver.auth import (
    get_recorded_api_credentials,
    get_recorded_nodegroup_uuid,
    )
from provisioningserver.cluster_config import get_maas_url
from provisioningserver.utils import (
    classify,
    try_match_xpath,
    )
import simplejson as json


logger = getLogger(__name__)


class MissingCredentials(Exception):
    """The MAAS URL or credentials are not yet set."""


# An example laptop's lshw XML dump was 135kB. An example lab's LLDP
# XML dump was 1.6kB. A batch size of 100 would mean downloading ~14MB
# from the region controller, which seems workable. The previous batch
# size of 1000 would have resulted in a ~140MB download, which, on the
# face of it, appears excessive.
DEFAULT_BATCH_SIZE = 100


def get_cached_knowledge():
    """Get all the information that we need to know, or raise an error.

    :return: (client, nodegroup_uuid)
    """
    api_credentials = get_recorded_api_credentials()
    if api_credentials is None:
        logger.error("Not updating tags: don't have API key yet.")
        return None, None
    nodegroup_uuid = get_recorded_nodegroup_uuid()
    if nodegroup_uuid is None:
        logger.error("Not updating tags: don't have UUID yet.")
        return None, None
    client = MAASClient(
        MAASOAuth(*api_credentials), MAASDispatcher(),
        get_maas_url())
    return client, nodegroup_uuid


# A content-type: function mapping that can decode data of that type.
decoders = {
    "application/json": lambda data: json.loads(data),
    "application/bson": lambda data: bson.BSON(data).decode(),
}


def process_response(response):
    """All responses should be httplib.OK.

    Additionally, `decoders` will be consulted in an attempt to decode
    the content. If it can't be decoded it will be returned as bytes.

    :param response: The result of MAASClient.get/post/etc.
    :type response: urllib2.addinfourl (a file-like object that has a .code
        attribute.)
    """
    if response.code != httplib.OK:
        text_status = httplib.responses.get(response.code, '<unknown>')
        message = '%s, expected 200 OK' % text_status
        raise urllib2.HTTPError(
            response.url, response.code, message,
            response.headers, response.fp)
    content = response.read()
    content_type = response.headers.gettype()
    if content_type in decoders:
        decode = decoders[content_type]
        return decode(content)
    else:
        return content


def get_nodes_for_node_group(client, nodegroup_uuid):
    """Retrieve the UUIDs of nodes in a particular group.

    :param client: MAAS client instance
    :param nodegroup_uuid: Node group for which to retrieve nodes
    :return: List of UUIDs for nodes in nodegroup
    """
    path = '/api/1.0/nodegroups/%s/' % (nodegroup_uuid)
    return process_response(client.get(path, op='list_nodes'))


def get_details_for_nodes(client, nodegroup_uuid, system_ids):
    """Retrieve details for a set of nodes.

    :param client: MAAS client
    :param system_ids: List of UUIDs of systems for which to fetch LLDP data
    :return: Dictionary mapping node UUIDs to details, e.g. LLDP output
    """
    path = '/api/1.0/nodegroups/%s/' % (nodegroup_uuid,)
    return process_response(client.post(
        path, op='details', system_ids=system_ids))


def post_updated_nodes(client, tag_name, tag_definition, uuid, added, removed):
    """Update the nodes relevant for a particular tag.

    :param client: MAAS client
    :param tag_name: Name of tag
    :param tag_definition: Definition of the tag, used to assure that the work
        being done matches the current value.
    :param uuid: NodeGroup uuid of this worker. Needed for security
        permissions. (The nodegroup worker is only allowed to touch nodes in
        its nodegroup, otherwise you need to be a superuser.)
    :param added: Set of nodes to add
    :param removed: Set of nodes to remove
    """
    path = '/api/1.0/tags/%s/' % (tag_name,)
    logger.debug(
        "Updating nodes for %s %s, adding %s removing %s"
        % (tag_name, uuid, len(added), len(removed)))
    try:
        return process_response(client.post(
            path, op='update_nodes', as_json=True, nodegroup=uuid,
            definition=tag_definition, add=added, remove=removed))
    except urllib2.HTTPError as e:
        if e.code == httplib.CONFLICT:
            if e.fp is not None:
                msg = e.fp.read()
            else:
                msg = e.msg
            logger.info("Got a CONFLICT while updating tag: %s", msg)
            return {}
        raise


def _details_prepare_merge(details):
    # We may mutate the details later, so copy now to prevent
    # affecting the caller's data.
    details = details.copy()

    # Prepare an nsmap in an OrderedDict. This ensures that lxml
    # serializes namespace declarations in a stable order.
    nsmap = OrderedDict((ns, ns) for ns in sorted(details))

    # Root everything in a namespace-less element. Setting the nsmap
    # here ensures that prefixes are preserved when dumping later.
    # This element will be replaced by the root of the lshw detail.
    # However, if there is no lshw detail, this root element shares
    # its tag with the tag of an lshw XML tree, so that XPath
    # expressions written with the lshw tree in mind will still work
    # without it, e.g. "/list//{lldp}something".
    root = etree.Element("list", nsmap=nsmap)

    # We have copied details, and root is new.
    return details, root


def _details_make_backwards_compatible(details, root):
    # For backward-compatibilty, if lshw details are available, these
    # should form the root of the composite document.
    xmldata = details.get("lshw")
    if xmldata is not None:
        try:
            lshw = etree.fromstring(xmldata)
        except etree.XMLSyntaxError as e:
            logger.warn("Invalid lshw details: %s", e)
            del details["lshw"]  # Don't process again later.
        else:
            # We're throwing away the existing root, but we can adopt
            # its nsmap by becoming its child.
            root.append(lshw)
            root = lshw

    # We may have mutated details and root.
    return details, root


def _details_do_merge(details, root):
    # Merge the remaining details into the composite document.
    for namespace in sorted(details):
        xmldata = details[namespace]
        if xmldata is not None:
            try:
                detail = etree.fromstring(xmldata)
            except etree.XMLSyntaxError as e:
                logger.warn("Invalid %s details: %s", namespace, e)
            else:
                # Add the namespace to all unqualified elements.
                for elem in detail.iter("{}*"):
                    elem.tag = etree.QName(namespace, elem.tag)
                root.append(detail)

    # Re-home `root` in a new tree. This ensures that XPath
    # expressions like "/some-tag" work correctly. Without this, when
    # there's well-formed lshw data -- see the backward-compatibilty
    # hack futher up -- expressions would be evaluated from the first
    # root created in this function, even though that root is now the
    # parent of the current `root`.
    return etree.ElementTree(root)


def merge_details(details):
    """Merge node details into a single XML document.

    `details` should be of the form::

      {"name": xml-as-bytes, "name2": xml-as-bytes, ...}

    where `name` is the namespace (and prefix) where each detail's XML
    should be placed in the composite document; elements in each
    detail document without a namespace are moved into that namespace.

    The ``lshw`` detail is treated specially, purely for backwards
    compatibility. If present, it forms the root of the composite
    document, without any namespace changes, plus it will be included
    in the composite document in the ``lshw`` namespace.

    The returned document is always rooted with a ``list`` element.
    """
    details, root = _details_prepare_merge(details)
    details, root = _details_make_backwards_compatible(details, root)
    return _details_do_merge(details, root)


def merge_details_cleanly(details):
    """Merge node details into a single XML document.

    `details` should be of the form::

      {"name": xml-as-bytes, "name2": xml-as-bytes, ...}

    where `name` is the namespace (and prefix) where each detail's XML
    should be placed in the composite document; elements in each
    detail document without a namespace are moved into that namespace.

    This is similar to `merge_details`, but the ``lshw`` detail is not
    treated specially. The result of this function is not compatible
    with XPath expressions created for old releases of MAAS.

    The returned document is always rooted with a ``list`` element.
    """
    details, root = _details_prepare_merge(details)
    return _details_do_merge(details, root)


def gen_batch_slices(count, size):
    """Generate `slice`s to split `count` objects into batches.

    The batches will be evenly distributed; no batch will differ in
    length from any other by more than 1.

    Note that the slices returned include a step. This means that
    slicing a list with the aid of this function then concatenating
    the results will not give you the same list. All the elements will
    be present, but not in the same order.

    :return: An iterator of `slice`s.
    """
    batch_count, remaining = divmod(count, size)
    batch_count += 1 if remaining > 0 else 0
    for batch in xrange(batch_count):
        yield slice(batch, None, batch_count)


def gen_batches(things, batch_size):
    """Split `things` into even batches of <= `batch_size`.

    Note that batches are calculated by `get_batch_slices` which does
    not guarantee ordering.

    :type things: `list`, or anything else that can be sliced.

    :return: An iterator of `slice`s of `things`.
    """
    slices = gen_batch_slices(len(things), batch_size)
    return (things[s] for s in slices)


def gen_node_details(client, nodegroup_uuid, batches):
    """Fetch node details.

    This lazily fetches data in batches, but this detail is hidden
    from callers.

    :return: An iterator of ``(system-id, details-document)`` tuples.
    """
    get_details = partial(get_details_for_nodes, client, nodegroup_uuid)
    for batch in batches:
        for system_id, details in get_details(batch).iteritems():
            yield system_id, merge_details(details)


def process_all(client, tag_name, tag_definition, nodegroup_uuid, system_ids,
                xpath, batch_size=None):
    logger.debug(
        "processing %d system_ids for tag %s nodegroup %s",
        len(system_ids), tag_name, nodegroup_uuid)

    if batch_size is None:
        batch_size = DEFAULT_BATCH_SIZE

    batches = gen_batches(system_ids, batch_size)
    node_details = gen_node_details(client, nodegroup_uuid, batches)
    nodes_matched, nodes_unmatched = classify(
        partial(try_match_xpath, xpath, logger=logger), node_details)

    # Upload all updates for one nodegroup at one time. This should be no more
    # than ~41*10,000 = 410kB. That should take <1s even on a 10Mbit network.
    # This also allows us to track if a nodegroup has been processed in the DB,
    # without having to add another API call.
    post_updated_nodes(
        client, tag_name, tag_definition, nodegroup_uuid,
        nodes_matched, nodes_unmatched)


def process_node_tags(tag_name, tag_definition, tag_nsmap, batch_size=None):
    """Update the nodes for a new/changed tag definition.

    :param tag_name: Name of the tag to update nodes for
    :param tag_definition: Tag definition
    :param batch_size: Size of batch
    """
    client, nodegroup_uuid = get_cached_knowledge()
    if not all([client, nodegroup_uuid]):
        logger.error(
            "Unable to update tag: %s for definition %r. "
            "Please refresh secrets, then rebuild this tag."
            % (tag_name, tag_definition))
        raise MissingCredentials()
    # We evaluate this early, so we can fail before sending a bunch of data to
    # the server
    xpath = etree.XPath(tag_definition, namespaces=tag_nsmap)
    # Get nodes to process
    system_ids = get_nodes_for_node_group(client, nodegroup_uuid)
    process_all(
        client, tag_name, tag_definition, nodegroup_uuid, system_ids, xpath,
        batch_size=batch_size)