This file is indexed.

/usr/lib/python3/dist-packages/testtools/content.py is in python3-testtools 1.8.1-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# Copyright (c) 2009-2012 testtools developers. See LICENSE for details.

"""Content - a MIME-like Content object."""

__all__ = [
    'attach_file',
    'Content',
    'content_from_file',
    'content_from_stream',
    'json_content',
    'text_content',
    'TracebackContent',
    ]

import codecs
import inspect
import json
import os
import sys

from extras import try_import
# To let setup.py work, make this a conditional import.
traceback = try_import('traceback2')

from testtools.compat import (
    _b,
    _u,
    istext,
    str_is_unicode,
)
from testtools.content_type import ContentType, JSON, UTF8_TEXT


functools = try_import('functools')

_join_b = _b("").join


DEFAULT_CHUNK_SIZE = 4096

STDOUT_LINE = '\nStdout:\n%s'
STDERR_LINE = '\nStderr:\n%s'


def _iter_chunks(stream, chunk_size, seek_offset=None, seek_whence=0):
    """Read 'stream' in chunks of 'chunk_size'.

    :param stream: A file-like object to read from.
    :param chunk_size: The size of each read from 'stream'.
    :param seek_offset: If non-None, seek before iterating.
    :param seek_whence: Pass through to the seek call, if seeking.
    """
    if seek_offset is not None:
        stream.seek(seek_offset, seek_whence)
    chunk = stream.read(chunk_size)
    while chunk:
        yield chunk
        chunk = stream.read(chunk_size)


class Content(object):
    """A MIME-like Content object.

    'Content' objects can be serialised to bytes using the iter_bytes method.
    If the 'Content-Type' is recognised by other code, they are welcome to
    look for richer contents that mere byte serialisation - for example in
    memory object graphs etc. However, such code MUST be prepared to receive
    a generic 'Content' object that has been reconstructed from a byte stream.

    :ivar content_type: The content type of this Content.
    """

    def __init__(self, content_type, get_bytes):
        """Create a ContentType."""
        if None in (content_type, get_bytes):
            raise ValueError("None not permitted in %r, %r" % (
                content_type, get_bytes))
        self.content_type = content_type
        self._get_bytes = get_bytes

    def __eq__(self, other):
        return (self.content_type == other.content_type and
            _join_b(self.iter_bytes()) == _join_b(other.iter_bytes()))

    def as_text(self):
        """Return all of the content as text.

        This is only valid where ``iter_text`` is.  It will load all of the
        content into memory.  Where this is a concern, use ``iter_text``
        instead.
        """
        return _u('').join(self.iter_text())

    def iter_bytes(self):
        """Iterate over bytestrings of the serialised content."""
        return self._get_bytes()

    def iter_text(self):
        """Iterate over the text of the serialised content.

        This is only valid for text MIME types, and will use ISO-8859-1 if
        no charset parameter is present in the MIME type. (This is somewhat
        arbitrary, but consistent with RFC2617 3.7.1).

        :raises ValueError: If the content type is not text/\*.
        """
        if self.content_type.type != "text":
            raise ValueError("Not a text type %r" % self.content_type)
        return self._iter_text()

    def _iter_text(self):
        """Worker for iter_text - does the decoding."""
        encoding = self.content_type.parameters.get('charset', 'ISO-8859-1')
        decoder = codecs.getincrementaldecoder(encoding)()
        for bytes in self.iter_bytes():
            yield decoder.decode(bytes)
        final = decoder.decode(_b(''), True)
        if final:
            yield final

    def __repr__(self):
        return "<Content type=%r, value=%r>" % (
            self.content_type, _join_b(self.iter_bytes()))


class StackLinesContent(Content):
    """Content object for stack lines.

    This adapts a list of "preprocessed" stack lines into a 'Content' object.
    The stack lines are most likely produced from ``traceback.extract_stack``
    or ``traceback.extract_tb``.

    text/x-traceback;language=python is used for the mime type, in order to
    provide room for other languages to format their tracebacks differently.
    """

    # Whether or not to hide layers of the stack trace that are
    # unittest/testtools internal code.  Defaults to True since the
    # system-under-test is rarely unittest or testtools.
    HIDE_INTERNAL_STACK = True

    def __init__(self, stack_lines, prefix_content="", postfix_content=""):
        """Create a StackLinesContent for ``stack_lines``.

        :param stack_lines: A list of preprocessed stack lines, probably
            obtained by calling ``traceback.extract_stack`` or
            ``traceback.extract_tb``.
        :param prefix_content: If specified, a unicode string to prepend to the
            text content.
        :param postfix_content: If specified, a unicode string to append to the
            text content.
        """
        content_type = ContentType('text', 'x-traceback',
            {"language": "python", "charset": "utf8"})
        value = prefix_content + \
            self._stack_lines_to_unicode(stack_lines) + \
            postfix_content
        super(StackLinesContent, self).__init__(
            content_type, lambda: [value.encode("utf8")])

    def _stack_lines_to_unicode(self, stack_lines):
        """Converts a list of pre-processed stack lines into a unicode string.
        """
        msg_lines = traceback.format_list(stack_lines)
        return _u('').join(msg_lines)


class TracebackContent(Content):
    """Content object for tracebacks.

    This adapts an exc_info tuple to the 'Content' interface.
    'text/x-traceback;language=python' is used for the mime type, in order to
    provide room for other languages to format their tracebacks differently.
    """

    def __init__(self, err, test, capture_locals=False):
        """Create a TracebackContent for ``err``.

        :param err: An exc_info error tuple.
        :param test: A test object used to obtain failureException.
        :param capture_locals: If true, show locals in the traceback.
        """
        if err is None:
            raise ValueError("err may not be None")

        exctype, value, tb = err
        # Skip test runner traceback levels
        if StackLinesContent.HIDE_INTERNAL_STACK:
            while tb and '__unittest' in tb.tb_frame.f_globals:
                tb = tb.tb_next

        limit = None
        # Disabled due to https://bugs.launchpad.net/testtools/+bug/1188420
        if (False
            and StackLinesContent.HIDE_INTERNAL_STACK
            and test.failureException
            and isinstance(value, test.failureException)):
            # Skip assert*() traceback levels
            limit = 0
            while tb and not self._is_relevant_tb_level(tb):
                limit += 1
                tb = tb.tb_next

        stack_lines = list(traceback.TracebackException(exctype, value, tb,
            limit=limit, capture_locals=capture_locals).format())
        content_type = ContentType('text', 'x-traceback',
            {"language": "python", "charset": "utf8"})
        super(TracebackContent, self).__init__(
            content_type, lambda: [x.encode('utf8') for x in stack_lines])


def StacktraceContent(prefix_content="", postfix_content=""):
    """Content object for stack traces.

    This function will create and return a 'Content' object that contains a
    stack trace.

    The mime type is set to 'text/x-traceback;language=python', so other
    languages can format their stack traces differently.

    :param prefix_content: A unicode string to add before the stack lines.
    :param postfix_content: A unicode string to add after the stack lines.
    """
    stack = traceback.walk_stack(None)
    def filter_stack(stack):
        # Discard the filter_stack frame.
        next(stack)
        # Discard the StacktraceContent frame.
        next(stack)
        for f, f_lineno in stack:
            if StackLinesContent.HIDE_INTERNAL_STACK:
                if '__unittest' in f.f_globals:
                    return
                yield f, f_lineno
    extract = traceback.StackSummary.extract(filter_stack(stack))
    extract.reverse()
    return StackLinesContent(extract, prefix_content, postfix_content)


def json_content(json_data):
    """Create a JSON Content object from JSON-encodeable data."""
    data = json.dumps(json_data)
    if str_is_unicode:
        # The json module perversely returns native str not bytes
        data = data.encode('utf8')
    return Content(JSON, lambda: [data])


def text_content(text):
    """Create a Content object from some text.

    This is useful for adding details which are short strings.
    """
    if not istext(text):
        raise TypeError(
            "text_content must be given text, not '%s'." % type(text).__name__
        )
    return Content(UTF8_TEXT, lambda: [text.encode('utf8')])


def maybe_wrap(wrapper, func):
    """Merge metadata for func into wrapper if functools is present."""
    if functools is not None:
        wrapper = functools.update_wrapper(wrapper, func)
    return wrapper


def content_from_file(path, content_type=None, chunk_size=DEFAULT_CHUNK_SIZE,
                      buffer_now=False, seek_offset=None, seek_whence=0):
    """Create a Content object from a file on disk.

    Note that unless ``buffer_now`` is explicitly passed in as True, the file
    will only be read from when ``iter_bytes`` is called.

    :param path: The path to the file to be used as content.
    :param content_type: The type of content.  If not specified, defaults
        to UTF8-encoded text/plain.
    :param chunk_size: The size of chunks to read from the file.
        Defaults to ``DEFAULT_CHUNK_SIZE``.
    :param buffer_now: If True, read the file from disk now and keep it in
        memory. Otherwise, only read when the content is serialized.
    :param seek_offset: If non-None, seek within the stream before reading it.
    :param seek_whence: If supplied, pass to ``stream.seek()`` when seeking.
    """
    if content_type is None:
        content_type = UTF8_TEXT
    def reader():
        with open(path, 'rb') as stream:
            for chunk in _iter_chunks(stream,
                                      chunk_size,
                                      seek_offset,
                                      seek_whence):
                yield chunk
    return content_from_reader(reader, content_type, buffer_now)


def content_from_stream(stream, content_type=None,
                        chunk_size=DEFAULT_CHUNK_SIZE, buffer_now=False,
                        seek_offset=None, seek_whence=0):
    """Create a Content object from a file-like stream.

    Note that unless ``buffer_now`` is explicitly passed in as True, the stream
    will only be read from when ``iter_bytes`` is called.

    :param stream: A file-like object to read the content from. The stream
        is not closed by this function or the 'Content' object it returns.
    :param content_type: The type of content. If not specified, defaults
        to UTF8-encoded text/plain.
    :param chunk_size: The size of chunks to read from the file.
        Defaults to ``DEFAULT_CHUNK_SIZE``.
    :param buffer_now: If True, reads from the stream right now. Otherwise,
        only reads when the content is serialized. Defaults to False.
    :param seek_offset: If non-None, seek within the stream before reading it.
    :param seek_whence: If supplied, pass to ``stream.seek()`` when seeking.
    """
    if content_type is None:
        content_type = UTF8_TEXT
    reader = lambda: _iter_chunks(stream, chunk_size, seek_offset, seek_whence)
    return content_from_reader(reader, content_type, buffer_now)


def content_from_reader(reader, content_type, buffer_now):
    """Create a Content object that will obtain the content from reader.

    :param reader: A callback to read the content. Should return an iterable of
        bytestrings.
    :param content_type: The content type to create.
    :param buffer_now: If True the reader is evaluated immediately and
        buffered.
    """
    if content_type is None:
        content_type = UTF8_TEXT
    if buffer_now:
        contents = list(reader())
        reader = lambda: contents
    return Content(content_type, reader)


def attach_file(detailed, path, name=None, content_type=None,
                chunk_size=DEFAULT_CHUNK_SIZE, buffer_now=True):
    """Attach a file to this test as a detail.

    This is a convenience method wrapping around ``addDetail``.

    Note that by default the contents of the file will be read immediately. If
    ``buffer_now`` is False, then the file *must* exist when the test result is
    called with the results of this test, after the test has been torn down.

    :param detailed: An object with details
    :param path: The path to the file to attach.
    :param name: The name to give to the detail for the attached file.
    :param content_type: The content type of the file.  If not provided,
        defaults to UTF8-encoded text/plain.
    :param chunk_size: The size of chunks to read from the file.  Defaults
        to something sensible.
    :param buffer_now: If False the file content is read when the content
        object is evaluated rather than when attach_file is called.
        Note that this may be after any cleanups that obj_with_details has, so
        if the file is a temporary file disabling buffer_now may cause the file
        to be read after it is deleted. To handle those cases, using
        attach_file as a cleanup is recommended because it guarantees a
        sequence for when the attach_file call is made::

            detailed.addCleanup(attach_file, 'foo.txt', detailed)
    """
    if name is None:
        name = os.path.basename(path)
    content_object = content_from_file(
        path, content_type, chunk_size, buffer_now)
    detailed.addDetail(name, content_object)