This file is indexed.

/usr/share/pyshared/pesto/testing.py is in python-pesto 25-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
# Copyright (c) 2007-2010 Oliver Cope. All rights reserved.
# See LICENSE.txt for terms of redistribution and use.

"""
pesto.testing
-------------

Test utilities for WSGI applications.
"""


from itertools import chain
try:
    from wsgiref.validate import validator as wsgi_validator
except ImportError:
    # Redefine wsgi_validator as a no-op for Python < 2.5
    wsgi_validator = lambda app: app

from StringIO import StringIO
from shutil import copyfileobj
from urlparse import urlparse

from pesto.response import Response
from pesto.request import Request
from pesto.wsgiutils import make_query

# Line terminator written after each part header and around each part body
# when ``TestApp.post_multipart`` assembles a multipart/form-data request.
CRLF = '\r\n'

class MockResponse(Response):
    """
    Response class with some extra methods to facilitate testing output of
    applications.

    The content iterable passed to the constructor is read into a list up
    front, so that the inspection helpers (``str()``, :meth:`text`,
    :attr:`body`) can be called repeatedly without exhausting it.
    """

    def __init__(self, content=None, status="200 OK", headers=None, onclose=None, add_default_content_type=True, **kwargs):
        """
        Initialise the response and buffer ``content``.

        All arguments are passed through unchanged to ``Response.__init__``.

        :param content: iterable of strings making up the body, or ``None``.
            When ``None``, whatever content the base class set up is left
            untouched.
        """
        super(MockResponse, self).__init__(
            content,
            status,
            headers,
            onclose,
            add_default_content_type,
            **kwargs
        )
        # ``content`` defaults to None, which cannot be passed to list();
        # only buffer (and close) a content iterable that was actually given.
        if content is not None:
            # Buffer the content iterator to make sure that it is not
            # exhausted when inspecting it through the various debug methods
            self._content = list(content)
            if getattr(content, 'close', None):
                content.close()

    def _serialize(self, eol):
        """
        Return status line, headers, a blank line and the content joined into
        one string, terminating each header line with ``eol``.
        """
        return ''.join(
            chain(
                ['%s%s' % (self.status, eol)],
                ('%s: %s%s' % (k, v, eol) for k, v in self.headers),
                [eol],
                self.content
            )
        )

    def __str__(self):
        """
        Return a string representation of the entire response
        """
        return self._serialize('\r\n')

    def text(self):
        """
        Return a string representation of the entire response, using newlines
        to separate headers, rather than the CRLF required by the HTTP spec.
        """
        return self._serialize('\n')

    @property
    def body(self):
        """
        Content part as a single string
        """
        return ''.join(self.content)

class TestApp(object):
    """
    Wrap a WSGI application so that tests can issue GET, HEAD and POST
    requests against it and inspect the result as a ``response_class``
    object.
    """

    # Class used to capture the application's output (built via ``from_wsgi``)
    response_class = MockResponse

    # Baseline WSGI environ. ``__init__`` takes a per-instance copy, so
    # updating an instance's defaults never alters this class-level dict.
    environ_defaults = {
        'SCRIPT_NAME': "",
        'PATH_INFO': "",
        'QUERY_STRING': "",
        'SERVER_NAME': "localhost",
        'SERVER_PORT': "80",
        'SERVER_PROTOCOL': "HTTP/1.0",
        'REMOTE_ADDR': '127.0.0.1',
        'wsgi.version': (1, 0),
        'wsgi.url_scheme': 'http',
        'wsgi.multithread': False,
        'wsgi.multiprocess': False,
        'wsgi.run_once': False,
    }

    def __init__(self, app, charset='utf-8', **environ_defaults):
        """
        :param app: the WSGI application under test
        :param charset: stored on the instance as ``self.charset``
        :param environ_defaults: environ values applied to every request made
            through this instance, unless overridden on the individual call
        """
        self.app = app
        self.charset = charset
        self.environ_defaults = self.environ_defaults.copy()
        self.environ_defaults.update(environ_defaults)

    @classmethod
    def make_environ(cls, REQUEST_METHOD='GET', PATH_INFO='', wsgi_input='', **kwargs):
        """
        Generate a WSGI environ suitable for testing applications

        Keyword arguments are copied into the environ. Because ``wsgi.*``
        keys are not valid Python identifiers, a keyword starting with
        ``wsgi_`` is stored under the corresponding ``wsgi.`` key (eg
        ``wsgi_url_scheme='https'`` sets ``environ['wsgi.url_scheme']``).

        :param REQUEST_METHOD: HTTP method name
        :param PATH_INFO: request path below ``SCRIPT_NAME``
        :param wsgi_input: request body, either a string or a file-like
            object
        :returns: a new environ dictionary

        Example usage::

            >>> from pprint import pprint
            >>> pprint(make_environ(PATH_INFO='/xyz')) # doctest: +ELLIPSIS
            {'PATH_INFO': '/xyz',
             'QUERY_STRING': '',
             'REMOTE_ADDR': '127.0.0.1',
             'REQUEST_METHOD': 'GET',
             'SCRIPT_NAME': '',
             'SERVER_NAME': 'localhost',
             'SERVER_PORT': '80',
             'SERVER_PROTOCOL': 'HTTP/1.0',
             'wsgi.errors': <StringIO.StringIO instance at 0x...>,
             'wsgi.input': <StringIO.StringIO instance at 0x...>,
             'wsgi.multiprocess': False,
             'wsgi.multithread': False,
             'wsgi.run_once': False,
             'wsgi.url_scheme': 'http',
             'wsgi.version': (1, 0)}

        """

        SCRIPT_NAME = kwargs.pop('SCRIPT_NAME', cls.environ_defaults["SCRIPT_NAME"])

        # Move a trailing slash from SCRIPT_NAME to the front of PATH_INFO
        if SCRIPT_NAME and SCRIPT_NAME[-1] == "/":
            SCRIPT_NAME = SCRIPT_NAME[:-1]
            PATH_INFO = "/" + PATH_INFO

        environ = cls.environ_defaults.copy()
        for key, value in kwargs.items():
            # Store only the translated key: leaving the ``wsgi_*`` alias in
            # the environ as well would add a non-standard key, which
            # wsgiref's validator rejects when its value is not a string.
            if key.startswith('wsgi_'):
                key = 'wsgi.' + key[len('wsgi_'):]
            environ[key] = value

        if isinstance(wsgi_input, basestring):
            wsgi_input = StringIO(wsgi_input)

        environ.update({
            'REQUEST_METHOD': REQUEST_METHOD,
            'SCRIPT_NAME': SCRIPT_NAME,
            'PATH_INFO': PATH_INFO,
            'wsgi.input': wsgi_input,
            'wsgi.errors': StringIO(),
        })

        if environ['SCRIPT_NAME'] == '/':
            environ['SCRIPT_NAME'] = ''
            environ['PATH_INFO'] = '/' + environ['PATH_INFO']

        # Collapse the doubled leading slash that the adjustments above can
        # produce. This must operate on the environ entry itself; changing
        # only the local variable would leave the returned dict untouched.
        path_info = environ['PATH_INFO']
        while path_info.startswith('//'):
            path_info = path_info[1:]
        environ['PATH_INFO'] = path_info

        return environ

    def _request(self, REQUEST_METHOD, PATH_INFO="", **kwargs):
        """
        Generate a WSGI request of HTTP method ``REQUEST_METHOD`` and pass it
        to the application being tested.

        The environ is built from this instance's ``environ_defaults``
        overlaid with ``kwargs``. A ``?query`` suffix on ``PATH_INFO`` is
        split off and merged into ``QUERY_STRING``.
        """
        # make_environ is a classmethod and only knows the class-level
        # defaults, so the per-instance defaults are passed in explicitly.
        params = self.environ_defaults.copy()
        params.update(kwargs)
        # These two are supplied positionally below
        params.pop('REQUEST_METHOD', None)
        params.pop('PATH_INFO', None)
        environ = self.make_environ(REQUEST_METHOD, PATH_INFO, **params)

        if '?' in environ['PATH_INFO']:
            environ['PATH_INFO'], querystring = environ['PATH_INFO'].split('?', 1)
            # Join with '&' so that the last existing parameter and the first
            # new one do not run together
            environ['QUERY_STRING'] = '&'.join(
                [part for part in (environ.get('QUERY_STRING'), querystring) if part]
            )

        app = wsgi_validator(self.app)
        return self.response_class.from_wsgi(app, environ, self.start_response)

    def get(self, PATH_INFO='/', data=None, charset='UTF-8', **kwargs):
        """
        Make a GET request to the application and return the response.

        :param data: if given, encoded with ``make_query`` and used as the
            query string, unless ``QUERY_STRING`` is passed explicitly
        :param kwargs: extra environ values for this request
        """
        if data is not None:
            kwargs.setdefault('QUERY_STRING', make_query(data, charset=charset))

        return self._request(
            'GET',
            PATH_INFO=PATH_INFO,
            **kwargs
        )

    def head(self, PATH_INFO='/', data=None, charset='UTF-8', **kwargs):
        """
        Make a HEAD request to the application and return the response.

        :param data: if given, encoded with ``make_query`` and used as the
            query string, unless ``QUERY_STRING`` is passed explicitly
        :param kwargs: extra environ values for this request
        """
        if data is not None:
            kwargs.setdefault('QUERY_STRING', make_query(data, charset=charset))

        return self._request(
            'HEAD',
            PATH_INFO=PATH_INFO,
            **kwargs
        )

    def start_response(self, status, headers, exc_info=None):
        """
        WSGI start_response method. Accepts the standard arguments and does
        nothing with them.
        """

    def post(self, PATH_INFO='/', data=None, charset='UTF-8', **kwargs):
        """
        Make a POST request to the application and return the response.

        :param data: form data, encoded with ``make_query`` and sent as an
            ``application/x-www-form-urlencoded`` request body
        :param kwargs: extra environ values for this request. ``CONTENT_TYPE``
            and ``CONTENT_LENGTH`` are only filled in when not given here.
        """
        if data is None:
            data = {}

        data = make_query(data, charset=charset)
        wsgi_input = StringIO(data)
        wsgi_input.seek(0)

        kwargs.setdefault('CONTENT_TYPE', "application/x-www-form-urlencoded")
        kwargs.setdefault('CONTENT_LENGTH', str(len(data)))
        # The request body is always the encoded form data
        kwargs['wsgi_input'] = wsgi_input

        return self._request(
            'POST',
            PATH_INFO=PATH_INFO,
            **kwargs
        )

    def post_multipart(self, PATH_INFO='/', data=None, files=None, charset='UTF-8', **kwargs):
        """
        Make a POST request with a multipart/form-data body to the
        application and return the response.

        This is usually used for mocking file uploads

        :param data: dictionary of post data; values are encoded using
            ``charset``
        :param files:
            list of ``(name, filename, content_type, data)`` tuples. ``data``
            may be either a byte string, iterator or file-like object.
        :param kwargs: extra environ values for this request.
            ``CONTENT_LENGTH`` is only filled in when not given here.
        """

        boundary = '----------------------------------------BoUnDaRyVaLuE'
        if data is None:
            data = {}
        if files is None:
            files = []

        # Each part is a (headers, payload) pair: plain fields first, then
        # file uploads.
        field_parts = (
            (
                [
                    ('Content-Disposition',
                     'form-data; name="%s"' % (name,))
                ],
                value.encode(charset)
            ) for name, value in data.items()
        )
        file_parts = (
            (
                [
                    ('Content-Disposition',
                     'form-data; name="%s"; filename="%s"' % (name, fname)),
                    ('Content-Type', content_type)
                ],
                payload
            ) for name, fname, content_type, payload in files
        )

        post_data = StringIO()
        post_data.write('--' + boundary)
        for headers, payload in chain(field_parts, file_parts):
            post_data.write(CRLF)
            for name, value in headers:
                post_data.write('%s: %s%s' % (name, value, CRLF))
            post_data.write(CRLF)
            if hasattr(payload, 'read'):
                # file-like object
                copyfileobj(payload, post_data)
            elif isinstance(payload, str):
                post_data.write(payload)
            else:
                # any other iterable of string chunks
                for chunk in payload:
                    post_data.write(chunk)
            post_data.write(CRLF)
            post_data.write('--' + boundary)
        # The closing delimiter is the boundary followed by '--'
        post_data.write('--' + CRLF)
        length = post_data.tell()
        post_data.seek(0)
        kwargs.setdefault('CONTENT_LENGTH', str(length))
        return self._request(
            'POST',
            PATH_INFO,
            CONTENT_TYPE='multipart/form-data; boundary=%s' % boundary,
            wsgi_input=post_data,
            **kwargs
        )

# Module-level alias so that ``make_environ`` can be called without going
# through the ``TestApp`` class; it is bound to ``TestApp`` and therefore uses
# the class-level ``environ_defaults``.
make_environ = TestApp.make_environ

class MockRequest(object):
    """
    A mock object for testing WSGI applications

    Builds a WSGI environ (optionally derived from a URL) and wraps it in a
    ``pesto.request.Request``, available as the ``request`` attribute.

    Synopsis::

        >>> mock = MockRequest('http://www.example.com/nelly')
        >>> mock.environ['SERVER_NAME']
        'www.example.com'
        >>> mock.environ['SERVER_PORT']
        '80'
        >>> mock.environ['PATH_INFO']
        '/nelly'

    Attributes set by the constructor:

    ``environ``
        the WSGI environ dictionary
    ``request``
        ``Request`` object constructed from ``environ``
    ``wsgi_input``, ``wsgi_errors``
        the file-like objects stored under ``wsgi.input`` and ``wsgi.errors``
    ``status``, ``headers``, ``output``, ``exc_info``
        all initialised to ``None``
    ``mockapp``
        a trivial WSGI application responding with ``ok``
    ``buf``
        an empty ``StringIO`` buffer
    """

    def __init__(self, url=None, wsgi_input=None, SCRIPT_NAME='/', charset=None, **environ):
        """
        :param url: URL from which the scheme, server name, port, path and
            query string of the environ are taken. If ``None`` the defaults
            (``http://localhost:80``, empty path and query) are used.
        :param wsgi_input: file-like object to use as ``wsgi.input``; an empty
            ``StringIO`` is used if not given
        :param SCRIPT_NAME: the part of the URL path treated as
            ``SCRIPT_NAME``; the path of ``url`` must start with it
        :param charset: if not ``None``, assigned to ``request.charset``
        :param environ: extra environ values, applied last so that they
            override anything derived from ``url``
        :raises AssertionError: if the path of ``url`` does not start with
            ``SCRIPT_NAME``
        """

        from pesto.core import to_wsgi

        self.status = None
        self.headers = None
        self.output = None
        self.exc_info = None
        if wsgi_input is None:
            wsgi_input = StringIO()
        self.wsgi_input = wsgi_input
        self.wsgi_errors = StringIO()

        self.environ = {
            'REQUEST_METHOD'    : "GET",
            # Use the caller's SCRIPT_NAME even when no url is given, rather
            # than silently ignoring the argument
            'SCRIPT_NAME'       : SCRIPT_NAME,
            'PATH_INFO'         : "",
            'QUERY_STRING'      : "",
            'CONTENT_TYPE'      : "",
            'CONTENT_LENGTH'    : "",
            'SERVER_NAME'       : "localhost",
            'SERVER_PORT'       : "80",
            'SERVER_PROTOCOL'   : "HTTP/1.0",
            'REMOTE_ADDR'       : "127.0.0.1",
            'wsgi.version'      : (1, 0),
            'wsgi.url_scheme'   : "http",
            'wsgi.input'        : self.wsgi_input,
            'wsgi.errors'       : self.wsgi_errors,
            'wsgi.multithread'  : False,
            'wsgi.multiprocess' : False,
            'wsgi.run_once'     : False,
        }
        self.mockapp = to_wsgi(lambda request: Response(['ok']))

        if url is not None:
            scheme, netloc, path, params, query, fragment = urlparse(url)
            if scheme == '':
                scheme = 'http'
            if netloc == '':
                netloc = 'example.org'
            server, port = self._split_netloc(netloc, scheme)

            # A URL without a path ('http://example.org') addresses the same
            # resource as one with path '/', so let it match the default
            # SCRIPT_NAME instead of failing the check below.
            if path == '' and SCRIPT_NAME == '/':
                path = '/'

            # Raised explicitly (rather than via ``assert``) so that the check
            # still runs under ``python -O``
            if not path.startswith(SCRIPT_NAME):
                raise AssertionError(
                    "URL path %r does not start with SCRIPT_NAME %r"
                    % (path, SCRIPT_NAME)
                )
            PATH_INFO = path[len(SCRIPT_NAME):]
            # Move a trailing slash from SCRIPT_NAME to the front of PATH_INFO
            if SCRIPT_NAME and SCRIPT_NAME[-1] == "/":
                SCRIPT_NAME = SCRIPT_NAME[:-1]
                PATH_INFO = "/" + PATH_INFO

            self.environ.update({
                'wsgi.url_scheme' : scheme,
                'SERVER_NAME'     : server,
                'SERVER_PORT'     : port,
                'SCRIPT_NAME'     : SCRIPT_NAME,
                'QUERY_STRING'    : query,
                'PATH_INFO'       : PATH_INFO,
            })

        self.environ.update(environ)

        if self.environ['SCRIPT_NAME'] == '/':
            self.environ['SCRIPT_NAME'] = ''
            self.environ['PATH_INFO'] = '/' + self.environ['PATH_INFO']

        self.request  = Request(self.environ)
        if charset is not None:
            self.request.charset = charset

        self.buf = StringIO()

    @staticmethod
    def _split_netloc(netloc, scheme):
        """
        Split the network location part of a URL into ``(server, port)``.

        Any ``user:password@`` prefix is discarded. When no port is present
        the default for ``scheme`` is returned: ``'443'`` for https, ``'80'``
        otherwise. The port is returned as a string.
        """
        if scheme == 'https':
            default_port = '443'
        else:
            default_port = '80'

        # Drop userinfo, whose ':' would otherwise be mistaken for the port
        # separator
        hostport = netloc.rsplit('@', 1)[-1]

        # A bracketed IPv6 literal without a port ends in ']'; its colons are
        # part of the address, not a port separator
        if ':' in hostport and not hostport.endswith(']'):
            server, port = hostport.rsplit(':', 1)
            if port == '':
                port = default_port
        else:
            server = hostport
            port = default_port
        return server, port