This file is indexed.

/usr/lib/python3/dist-packages/webtest/http.py is in python3-webtest 2.0.18-1ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# -*- coding: utf-8 -*-
"""
This module contains some helpers to deal with the real HTTP
world.
"""

import threading
import logging
import select
import socket
import time
import os

import six
import webob
from six.moves import http_client
from waitress.server import TcpWSGIServer


def get_free_port():
    """Return a host and a TCP port that was free when this was called.

    A throwaway socket is bound to port 0 so that the operating system
    assigns an unused port. The socket is closed before returning, so
    nothing reserves the port afterwards.

    Returns:
        An ``(ip, port)`` tuple. ``ip`` is the value of the
        ``WEBTEST_SERVER_BIND`` environment variable, or ``'127.0.0.1'``
        when it is unset. ``port`` is the port number reported by the
        probe socket.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind(('', 0))
        # Only the port is of interest: the host part of the result comes
        # from the environment, not from the probe socket.
        port = s.getsockname()[1]
    finally:
        # Release the probe socket even when bind()/getsockname() fails.
        s.close()
    ip = os.environ.get('WEBTEST_SERVER_BIND', '127.0.0.1')
    return ip, port


def check_server(host, port, path_info='/', timeout=3, retries=30):
    """Perform a request until the server replies.

    After an initial 0.3 second pause, a ``GET`` request for ``path_info``
    is attempted up to ``retries`` times, sleeping 0.3 seconds after each
    failed attempt.

    Args:
        host: Host name or address to connect to.
        port: Port to connect to; converted with ``int()``.
        path_info: Request path sent to the server.
        timeout: Timeout passed to each ``HTTPConnection``.
        retries: Maximum number of attempts. A negative value returns
            immediately without pausing or connecting.

    Returns:
        The HTTP status of the first response received, or ``0`` when no
        attempt got a response.
    """
    if retries < 0:
        return 0
    time.sleep(.3)
    for _ in range(retries):
        conn = None
        try:
            conn = http_client.HTTPConnection(host, int(port), timeout=timeout)
            conn.request('GET', path_info)
            res = conn.getresponse()
            return res.status
        except (socket.error, http_client.HTTPException):
            time.sleep(.3)
        finally:
            # Close the connection on every attempt, successful or not, so
            # that retries do not accumulate open sockets.
            if conn is not None:
                conn.close()
    return 0


class StopableWSGIServer(TcpWSGIServer):
    """StopableWSGIServer is a TcpWSGIServer which runs in a separate thread.
    This allows using tools like casperjs or selenium.

    Server instances have an ``application_url`` attribute formatted with the
    server host and port.
    """

    # Set to True by shutdown() so that run() can tell a select error caused
    # by the shutdown apart from an unexpected one.
    was_shutdown = False

    def __init__(self, application, *args, **kwargs):
        """Build the server around ``application``.

        The parent class receives ``self.wrapper`` as its WSGI callable, so
        every request goes through :meth:`wrapper` first. Remaining
        positional and keyword arguments are passed to the parent
        constructor unchanged.
        """
        super(StopableWSGIServer, self).__init__(self.wrapper, *args, **kwargs)
        # Thread running the server; assigned by create().
        self.runner = None
        self.test_app = application
        self.application_url = 'http://%s:%s/' % (self.adj.host, self.adj.port)

    def wrapper(self, environ, start_response):
        """Wrap the wsgi application to override some paths:

        ``/__application__``: allows pinging the server.

        ``/__file__?__file__={path}``: serves the file found at ``path``.
        A missing ``__file__`` parameter, or a path that is not a regular
        file, yields a ``404 Not Found`` response.

        Any other request is passed to the wrapped application.
        """
        path_info = environ['PATH_INFO']
        if '__file__' in path_info:
            req = webob.Request(environ)
            resp = webob.Response()
            resp.content_type = 'text/html; charset=UTF-8'
            filename = req.params.get('__file__')
            # NOTE(review): this serves any readable file named by the
            # request; acceptable only because this server is a test helper.
            # The truthiness check keeps a missing parameter (None) from
            # reaching os.path.isfile(), which would raise TypeError.
            if filename and os.path.isfile(filename):
                with open(filename, 'rb') as fd:
                    body = fd.read()
                # Point absolute links at this server instead of localhost.
                body = body.replace(six.b('http://localhost/'),
                                    six.b('http://%s/' % req.host))
                resp.body = body
            else:
                resp.status = '404 Not Found'
            return resp(environ, start_response)
        elif '__application__' in path_info:
            return webob.Response('server started')(environ, start_response)
        return self.test_app(environ, start_response)

    def run(self):
        """Run the server.

        Runs the asyncore loop over ``self._map``. A ``select.error`` is
        re-raised unless :meth:`shutdown` has been called.
        """
        try:
            self.asyncore.loop(.5, map=self._map)
        except select.error:  # pragma: no cover
            if not self.was_shutdown:
                raise

    def shutdown(self):
        """Shutdown the server.

        Calls ``handle_close()`` on the objects in ``self._map`` until the
        map is empty, then runs ``self.maintenance(0)`` and shuts down the
        task dispatcher. Always returns ``True``.
        """
        # avoid showing traceback related to asyncore
        self.was_shutdown = True
        self.logger.setLevel(logging.FATAL)
        # NOTE(review): presumably each handle_close() removes its entry from
        # the map, which is what lets this loop terminate -- confirm.
        while self._map:
            # Iterate over a snapshot: the map may change while closing.
            triggers = list(self._map.values())
            for trigger in triggers:
                trigger.handle_close()
        self.maintenance(0)
        self.task_dispatcher.shutdown()
        return True

    @classmethod
    def create(cls, application, **kwargs):
        """Start a server to serve ``application``. Return a server
        instance.

        ``port`` and ``host`` default to the values from
        ``get_free_port()`` and ``expose_tracebacks`` defaults to ``True``;
        values supplied by the caller are kept. The server runs in a daemon
        thread stored in the ``runner`` attribute.
        """
        host, port = get_free_port()
        kwargs.setdefault('port', port)
        kwargs.setdefault('host', host)
        kwargs.setdefault('expose_tracebacks', True)
        server = cls(application, **kwargs)
        server.runner = threading.Thread(target=server.run)
        server.runner.daemon = True
        server.runner.start()
        return server

    def wait(self, retries=30):
        """Wait until the server is started.

        Polls ``/__application__`` through ``check_server``. Returns
        ``True`` as soon as a response is received. Otherwise the server is
        shut down and ``False`` is returned.
        """
        running = check_server(self.adj.host, self.adj.port,
                               '/__application__', retries=retries)
        if running:
            return True
        try:
            self.shutdown()
        except Exception:
            # Best-effort cleanup: a failing shutdown must not hide the fact
            # that the server never came up. Unlike a ``return`` inside
            # ``finally``, this lets KeyboardInterrupt/SystemExit propagate.
            pass
        return False