This file is indexed.

/usr/lib/python2.7/dist-packages/livereload/server.py is in python-livereload 2.2.2-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# -*- coding: utf-8 -*-
"""
    livereload.server
    ~~~~~~~~~~~~~~~~~

    WSGI app server for livereload.

    :copyright: (c) 2013 by Hsiaoming Yang
"""

import os
import logging
from subprocess import Popen, PIPE
import time
import threading
import webbrowser

from tornado import escape
from tornado.wsgi import WSGIContainer
from tornado.ioloop import IOLoop
from tornado.web import Application, FallbackHandler
from .handlers import LiveReloadHandler, LiveReloadJSHandler
from .handlers import ForceReloadHandler, StaticHandler
from .watcher import Watcher
from ._compat import text_types
from tornado.log import enable_pretty_logging
enable_pretty_logging()


def shell(command, output=None, mode='w'):
    """Command shell command.

    You can add a shell command::

        server.watch(
            'style.less', shell('lessc style.less', output='style.css')
        )

    :param command: a shell command, string or list
    :param output: output stdout to the given file
    :param mode: only works with output, mode ``w`` means write,
                 mode ``a`` means append
    """
    if not output:
        output = os.devnull
    else:
        folder = os.path.dirname(output)
        if folder and not os.path.isdir(folder):
            os.makedirs(folder)

    if isinstance(command, (list, tuple)):
        cmd = command
    else:
        cmd = command.split()

    def run_shell():
        try:
            p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
        except OSError as e:
            logging.error(e)
            if e.errno == os.errno.ENOENT:  # file (command) not found
                logging.error("maybe you haven't installed %s", cmd[0])
            return e
        stdout, stderr = p.communicate()
        if stderr:
            logging.error(stderr)
            return stderr
        #: stdout is bytes, decode for python3
        code = stdout.decode()
        with open(output, mode) as f:
            f.write(code)

    return run_shell


class WSGIWrapper(WSGIContainer):
    """Insert livereload scripts into response body."""

    def __call__(self, request):
        data = {}
        response = []

        def start_response(status, response_headers, exc_info=None):
            data["status"] = status
            data["headers"] = response_headers
            return response.append
        app_response = self.wsgi_application(
            WSGIContainer.environ(request), start_response)
        try:
            response.extend(app_response)
            body = b"".join(response)
        finally:
            if hasattr(app_response, "close"):
                app_response.close()
        if not data:
            raise Exception("WSGI app did not call start_response")

        status_code = int(data["status"].split()[0])
        headers = data["headers"]
        header_set = set(k.lower() for (k, v) in headers)
        body = escape.utf8(body)
        body = body.replace(
            b'</head>',
            b'<script src="/livereload.js"></script></head>'
        )

        if status_code != 304:
            if "content-length" not in header_set:
                headers.append(("Content-Length", str(len(body))))
            if "content-type" not in header_set:
                headers.append(("Content-Type", "text/html; charset=UTF-8"))
        if "server" not in header_set:
            headers.append(("Server", "livereload-tornado"))

        parts = [escape.utf8("HTTP/1.1 " + data["status"] + "\r\n")]
        for key, value in headers:
            if key.lower() == 'content-length':
                value = str(len(body))
            parts.append(
                escape.utf8(key) + b": " + escape.utf8(value) + b"\r\n"
            )
        parts.append(b"\r\n")
        parts.append(body)
        request.write(b"".join(parts))
        request.finish()
        self._log(status_code, request)


class Server(object):
    """Livereload server interface.

    Initialize a server and watch file changes::

        server = Server(wsgi_app)
        server.serve()

    :param app: a wsgi application instance
    :param watcher: A Watcher instance, you don't have to initialize
                    it by yourself. Under Linux, you will want to install
                    pyinotify and use INotifyWatcher() to avoid wasted
                    CPU usage.
    """
    def __init__(self, app=None, watcher=None):
        self.app = app
        self.port = 5500
        self.root = None
        if not watcher:
            watcher = Watcher()
        self.watcher = watcher

    def watch(self, filepath, func=None):
        """Add the given filepath for watcher list.

        Once you have intialized a server, watch file changes before
        serve the server::

            server.watch('static/*.stylus', 'make static')
            def alert():
                print('foo')
            server.watch('foo.txt', alert)
            server.serve()

        :param filepath: files to be watched, it can be a filepath,
                         a directory, or a glob pattern
        :param func: the function to be called, it can be a string of
                     shell command, or any callable object without
                     parameters
        """
        if isinstance(func, text_types):
            func = shell(func)

        self.watcher.watch(filepath, func)

    def application(self, debug=True):
        LiveReloadHandler.watcher = self.watcher
        handlers = [
            (r'/livereload', LiveReloadHandler),
            (r'/forcereload', ForceReloadHandler),
            (r'/livereload.js', LiveReloadJSHandler, dict(port=self.port)),
        ]

        if self.app:
            self.app = WSGIWrapper(self.app)
            handlers.append(
                (r'.*', FallbackHandler, dict(fallback=self.app))
            )
        else:
            handlers.append(
                (r'(.*)', StaticHandler, dict(root=self.root or '.')),
            )
        return Application(handlers=handlers, debug=debug)

    def serve(self, port=None, host=None, root=None, debug=True, open_url=False):
        """Start serve the server with the given port.

        :param port: serve on this port, default is 5500
        :param host: serve on this hostname, default is 0.0.0.0
        :param root: serve static on this root directory
        :param open_url: open system browser
        """
        if root:
            self.root = root
        if port:
            self.port = port
        if host is None:
            host = ''

        self.application(debug=debug).listen(self.port, address=host)
        logging.getLogger().setLevel(logging.INFO)

        host = host or '127.0.0.1'
        print('Serving on http://%s:%s' % (host, self.port))

        # Async open web browser after 5 sec timeout
        if open_url:
            def opener():
                time.sleep(5)
                webbrowser.open('http://%s:%s' % (host, self.port))
            threading.Thread(target=opener).start()

        try:
            IOLoop.instance().start()
        except KeyboardInterrupt:
            print('Shutting down...')