This file is indexed.

/usr/lib/python3/dist-packages/maascli/utils.py is in python3-maas-client 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# Copyright 2012-2016 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Utilities for the command-line interface."""

__all__ = [
    "dump_response_summary",
    "ensure_trailing_slash",
    "get_response_content_type",
    "handler_command_name",
    "is_response_textual",
    "parse_docstring",
    "print_response_content",
    "print_response_headers",
    "safe_name",
    "sudo_gid",
    "sudo_uid",
]

from contextlib import contextmanager
from email.message import Message
from functools import partial
from inspect import (
    cleandoc,
    getdoc,
)
import io
import os
from os import (
    setegid,
    seteuid,
)
import re
import sys
from urllib.parse import urlparse


# Shared string constants used when normalising docstrings.
empty = ""
newline = "\n"

# A paragraph boundary is a run of two or more line breaks of the same
# flavour: CRLF pairs, bare carriage returns, or bare newlines.
re_paragraph_splitter = re.compile(
    r"(?:\r\n){2,}|\r{2,}|\n{2,}", re.MULTILINE)

# Split a string into all of its paragraphs.
paragraph_split = re_paragraph_splitter.split

# Split a string at its first paragraph boundary only, yielding at most two
# pieces.
docstring_split = partial(paragraph_split, maxsplit=1)


def remove_line_breaks(string):
    """Join the stripped lines of `string` with single spaces."""
    stripped_lines = (line.strip() for line in string.splitlines())
    return " ".join(stripped_lines)


def parse_docstring(thing):
    """Split the docstring of `thing` into a title and a body.

    `thing` may be a docstring given directly as a string, or any object
    whose docstring can be retrieved with `inspect.getdoc`.

    Returns a tuple: (title, body). The title is the first paragraph of the
    docstring with its line breaks replaced by single spaces; the body is
    everything after the first paragraph break, with line endings
    normalised to newlines. Either may be the empty string.
    """
    assert not isinstance(thing, bytes)
    if isinstance(thing, str):
        doc = cleandoc(thing)
    else:
        doc = getdoc(thing)
    if doc is None:
        doc = empty
    assert not isinstance(doc, bytes)
    # The splitter stops after the first paragraph break, so there is always
    # a title and at most one further piece.
    title, *remainder = docstring_split(doc)
    body = remainder[0] if remainder else empty
    # The title is reported as a single line.
    title = remove_line_breaks(title)
    # Convert CRLF first so that it does not turn into two newlines.
    body = body.replace("\r\n", newline)
    body = body.replace("\r", newline)
    return title, body


# Matches one "word" of a camel-case string. The captured group is either
# optional upper-case letters followed by one or more lower-case letters or
# digits, or a run of upper-case letters on its own. A match must be followed
# by a character that is not a lower-case letter or digit, or by the end of
# the string. Only ASCII letters and digits are captured.
re_camelcase = re.compile(
    r"([A-Z]*[a-z0-9]+|[A-Z]+)(?:(?=[^a-z0-9])|\Z)")


def safe_name(string):
    """Return a munged version of string, suitable as an ASCII filename.

    The camel-case words found in `string` are kept and joined with
    hyphens; everything else is dropped.
    """
    words = re_camelcase.findall(string)
    return "-".join(words)


def handler_command_name(string):
    """Derive a handler command name from an arbitrary string.

    The camel-case words in `string` are extracted and lower-cased, any word
    equal to "handler" is dropped, and the remaining words are joined with
    hyphens. Characters that are not part of a camel-case word are
    discarded.
    """
    words = [word.lower() for word in re_camelcase.findall(string)]
    return "-".join(word for word in words if word != "handler")


def ensure_trailing_slash(string):
    """Return `string` with a forward-slash at the end.

    Works for both `str` and `bytes`; the input is returned unchanged when
    it already ends with a slash.
    """
    if isinstance(string, bytes):
        slash = b"/"
    else:
        slash = "/"
    if string.endswith(slash):
        return string
    return string + slash


def api_url(string):
    """Normalise `string` into a URL that points at the API.

    The path is given a trailing forward-slash. If it does not then end with
    /api/{version}/, "api/2.0/" is appended so that the API version is
    always explicit.

    This is suitable for use as an argument type with argparse.
    """
    parsed = urlparse(string)
    path = ensure_trailing_slash(parsed.path)
    has_version = re.search("/api/[0-9.]+/?$", path) is not None
    if not has_version:
        path = path + "api/2.0/"
    return parsed._replace(path=path).geturl()


def import_module(import_str):
    """Import the module named by `import_str` and return it.

    For a dotted name the module returned is the one named in full, not the
    top-level package. Raises `ImportError` if the import fails.
    """
    # __import__ returns the top-level package for dotted names, so the
    # requested module is fetched from sys.modules instead.
    __import__(import_str)
    module = sys.modules[import_str]
    return module


def try_import_module(import_str, default=None):
    """Import the module named by `import_str`, or return `default`.

    `default` is returned only when the import raises `ImportError`; any
    other exception propagates.
    """
    try:
        module = import_module(import_str)
    except ImportError:
        module = default
    return module


def get_response_content_type(response):
    """Return the response's content-type with any parameters removed.

    Returns `None` when the response has no "content-type" entry.

    :type response: :class:`httplib2.Response`
    """
    try:
        raw_content_type = response["content-type"]
    except KeyError:
        return None
    # email.message.Message is used purely as a parser: it knows how to
    # separate the media type from a charset or any other parameters.
    parser = Message()
    parser.set_type(raw_content_type)
    return parser.get_content_type()


def is_response_textual(response):
    """Is the response body text?

    A response counts as textual when its content-type ends with "/json" or
    starts with "text/". A response with no content-type is reported as not
    textual.

    :type response: :class:`httplib2.Response`
    :return: `True` if the body should be treated as text, else `False`.
    """
    content_type = get_response_content_type(response)
    if content_type is None:
        # No Content-Type header was sent; without this guard the string
        # methods below would be called on None and raise AttributeError.
        return False
    return (
        content_type.endswith("/json") or
        content_type.startswith("text/"))


def print_response_headers(headers, file=None):
    """Write the response's headers to stdout in a human-friendly way.

    Header names are capitalised part by part (so "transfer-encoding"
    becomes "Transfer-Encoding"), right-aligned in a common column, and
    printed in sorted order, one per line. Nothing is printed when
    `headers` is empty.

    :type headers: :class:`httplib2.Response`, or :class:`dict`
    :param file: a text stream to print to; defaults to `sys.stdout`.
    """
    file = sys.stdout if file is None else file

    def cap(header):
        # Change headers like "transfer-encoding" into "Transfer-Encoding".
        return "-".join(part.capitalize() for part in header.split("-"))

    # Width of the header-name column: the longest name plus two. The
    # default keeps max() from raising ValueError when there are no headers.
    width = max((len(header) for header in headers), default=0) + 2
    # Format string to prettify reporting of response headers.
    form = "%%%ds: %%s" % width
    # Print the response.
    for header in sorted(headers):
        print(form % (cap(header), headers[header]), file=file)


def print_response_content(response, content, file=None):
    """Write the response's content to stdout.

    When the output stream is a TTY and the response is textual, a trailing
    newline is appended after the content. If the response was also
    successful (status 2xx), a short human-readable preamble is written
    before the content.

    :param response: HTTP response metadata
    :param content: bytes
    :param file: a binary stream opened for writing (optional). An
        `io.TextIOWrapper` is also accepted; its underlying binary buffer
        is written to. Defaults to `sys.stdout`.
    """
    file = sys.stdout if file is None else file
    is_tty = file.isatty()
    success = response.status // 100 == 2
    is_textual = is_response_textual(response)
    # Get the underlying buffer if we're writing to stdout. This allows us to
    # write bytes directly, without attempting to convert the bytes to unicode.
    # Unicode output may not be desired; the HTTP response could be raw bytes.
    if isinstance(file, io.TextIOWrapper):
        # Flush text still pending in the wrapper first, so that it cannot
        # be emitted after the bytes written straight to the buffer below.
        file.flush()
        file = file.buffer
    if is_tty and success and is_textual:
        file.write(b"Success.\n")
        file.write(b"Machine-readable output follows:\n")
    file.write(content)
    if is_tty and is_textual:
        file.write(b"\n")


def dump_response_summary(response, file=None):
    """Dump the response's status line and headers to stderr.

    The status and reason are printed first, then a blank line, the
    formatted headers, and a final blank line.

    Intended for debugging.
    """
    out = sys.stderr if file is None else file
    emit = partial(print, file=out)
    emit(response.status, response.reason)
    emit()
    print_response_headers(response, file=out)
    emit()


@contextmanager
def sudo_uid():
    """Context to revert effective UID to that of the user invoking `sudo`.

    If the SUDO_UID environment variable is not set, the context does
    nothing. Otherwise the effective UID is switched to SUDO_UID for the
    duration of the context and restored afterwards, even if the body
    raises.
    """
    invoking_uid = os.environ.get("SUDO_UID")
    if invoking_uid is None:
        # Not running under sudo. The yield is deliberately outside any
        # exception handler so that an error raised in the with-body does
        # not get a spurious KeyError chained on as its context.
        yield
        return
    orig_euid = os.geteuid()
    seteuid(int(invoking_uid))
    try:
        yield
    finally:
        seteuid(orig_euid)


@contextmanager
def sudo_gid():
    """Context to revert effective GID to that of the user invoking `sudo`.

    If the SUDO_GID environment variable is not set, the context does
    nothing. Otherwise the effective GID is switched to SUDO_GID for the
    duration of the context and restored afterwards, even if the body
    raises.
    """
    invoking_gid = os.environ.get("SUDO_GID")
    if invoking_gid is None:
        # Not running under sudo. The yield is deliberately outside any
        # exception handler so that an error raised in the with-body does
        # not get a spurious KeyError chained on as its context.
        yield
        return
    orig_egid = os.getegid()
    setegid(int(invoking_gid))
    try:
        yield
    finally:
        setegid(orig_egid)