/usr/lib/python3/dist-packages/livereload/server.py is in python3-livereload 2.4.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
"""
livereload.server
~~~~~~~~~~~~~~~~~
WSGI app server for livereload.
:copyright: (c) 2013 by Hsiaoming Yang
"""
import os
import time
import shlex
import logging
import threading
import webbrowser
from subprocess import Popen, PIPE
from tornado.wsgi import WSGIContainer
from tornado.ioloop import IOLoop
from tornado import web
from tornado import escape
from tornado import httputil
from tornado.log import LogFormatter
from .handlers import LiveReloadHandler, LiveReloadJSHandler
from .handlers import ForceReloadHandler, StaticFileHandler
from .watcher import get_watcher_class
from six import string_types, PY3
# Package-wide logger; Server._setup_logging() attaches a stream handler to it.
logger = logging.getLogger('livereload')
# Closing head tag (as bytes); the livereload <script> tag is inserted
# directly in front of it in outgoing HTML.
HEAD_END = b'</head>'
def shell(cmd, output=None, mode='w', cwd=None, shell=False):
    """Build a callable that executes a shell command.

    You can add a shell command::

        server.watch(
            'style.less', shell('lessc style.less', output='style.css')
        )

    :param cmd: a shell command, string or list
    :param output: output stdout to the given file; missing parent
                   directories are created. Defaults to ``os.devnull``.
    :param mode: only works with output, mode ``w`` means write,
                 mode ``a`` means append
    :param cwd: set working directory before command is executed.
    :param shell: passed through to ``Popen``. When true, a string ``cmd``
                  is handed over as-is instead of being split with
                  ``shlex``.
    :return: a zero-argument function. Calling it runs the command and
             returns ``None`` on success, the ``OSError`` if the process
             could not be started, or the captured stderr if the command
             wrote anything to it.
    """
    # ``os.errno`` was only ever an accidental re-export and no longer exists
    # on Python 3.7+; import the real module locally so this function does
    # not depend on the file's import block.
    import errno

    if not output:
        output = os.devnull
    else:
        folder = os.path.dirname(output)
        if folder and not os.path.isdir(folder):
            os.makedirs(folder)

    if not isinstance(cmd, (list, tuple)) and not shell:
        cmd = shlex.split(cmd)

    def run_shell():
        try:
            p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd,
                      shell=shell)
        except OSError as e:
            logger.error(e)
            if e.errno == errno.ENOENT:  # file (command) not found
                # With shell=True ``cmd`` may still be a plain string, in
                # which case ``cmd[0]`` would only be its first character.
                program = cmd[0] if isinstance(cmd, (list, tuple)) else cmd
                logger.error("maybe you haven't installed %s", program)
            return e

        stdout, stderr = p.communicate()
        if stderr:
            # Any stderr output is treated as failure; stdout is discarded.
            logger.error(stderr)
            return stderr

        #: stdout is bytes, decode for python3
        if PY3:
            stdout = stdout.decode()
        with open(output, mode) as f:
            f.write(stdout)

    return run_shell
class LiveScriptInjector(web.OutputTransform):
    """Output transform that inserts the livereload script before ``</head>``.

    The bytes to insert are read from the ``script`` attribute, which
    subclasses are expected to override.  The constructor is inherited
    unchanged from ``web.OutputTransform``.
    """

    #: Bytes placed in front of every ``</head>``.  Empty by default so the
    #: bare class is a harmless no-op instead of raising ``AttributeError``.
    script = b''

    def transform_first_chunk(self, status_code, headers, chunk, finishing):
        """Inject ``self.script`` into the first chunk of the response.

        :param status_code: response status, returned untouched
        :param headers: response headers; ``Content-Length`` is adjusted
                        in place when present
        :param chunk: first chunk of the body, as bytes
        :param finishing: unused here
        :return: ``(status_code, headers, chunk)``
        """
        if HEAD_END in chunk:
            injected = chunk.replace(HEAD_END, self.script + HEAD_END)
            if 'Content-Length' in headers:
                # ``replace`` rewrites every occurrence of the tag, so grow
                # the declared length by the real size difference rather
                # than by a single ``len(self.script)``.
                grown = len(injected) - len(chunk)
                length = int(headers['Content-Length']) + grown
                headers['Content-Length'] = str(length)
            chunk = injected
        return status_code, headers, chunk
class LiveScriptContainer(WSGIContainer):
    """WSGI container that injects the livereload script into responses.

    The wrapped WSGI application is run to completion, ``script`` is
    inserted in front of every ``</head>`` in the body, and the response
    is written to the request's connection.

    :param wsgi_app: the WSGI application to wrap
    :param script: markup to inject, text or bytes
    """
    def __init__(self, wsgi_app, script=''):
        self.wsgi_app = wsgi_app
        # The body is handled as bytes below; coerce the script up front so
        # the text default (or a text argument) cannot raise ``TypeError``
        # when it is concatenated with ``HEAD_END``.
        self.script = escape.utf8(script)

    def __call__(self, request):
        """Serve ``request`` through the wrapped WSGI application.

        :raises Exception: if the application never calls ``start_response``
        """
        data = {}
        response = []

        def start_response(status, response_headers, exc_info=None):
            data["status"] = status
            data["headers"] = response_headers
            return response.append

        app_response = self.wsgi_app(
            WSGIContainer.environ(request), start_response)
        try:
            response.extend(app_response)
            body = b"".join(response)
        finally:
            if hasattr(app_response, "close"):
                app_response.close()
        if not data:
            raise Exception("WSGI app did not call start_response")

        status_code, reason = data["status"].split(' ', 1)
        status_code = int(status_code)
        headers = data["headers"]
        header_set = set(k.lower() for (k, v) in headers)
        body = escape.utf8(body)

        if HEAD_END in body:
            body = body.replace(HEAD_END, self.script + HEAD_END)

        if status_code != 304:
            if "content-type" not in header_set:
                headers.append(("Content-Type", "text/html; charset=UTF-8"))
            if "content-length" not in header_set:
                headers.append(("Content-Length", str(len(body))))

        if "server" not in header_set:
            headers.append(("Server", "LiveServer"))

        start_line = httputil.ResponseStartLine(
            "HTTP/1.1", status_code, reason
        )
        header_obj = httputil.HTTPHeaders()
        for key, value in headers:
            # The app computed its length before the script was injected.
            # Match the name case-insensitively, like ``header_set`` does,
            # so a ``content-length`` spelling is corrected as well.
            if key.lower() == 'content-length':
                value = str(len(body))
            header_obj.add(key, value)
        request.connection.write_headers(start_line, header_obj, chunk=body)
        request.connection.finish()
        self._log(status_code, request)
class Server(object):
    """Livereload server interface.

    Initialize a server and watch file changes::

        server = Server(wsgi_app)
        server.serve()

    :param app: a wsgi application instance
    :param watcher: A Watcher instance, you don't have to initialize
                    it by yourself. Under Linux, you will want to install
                    pyinotify and use INotifyWatcher() to avoid wasted
                    CPU usage.
    """
    def __init__(self, app=None, watcher=None):
        self.root = None
        self.app = app
        if not watcher:
            # No watcher supplied: use the class picked by the watcher module.
            watcher_cls = get_watcher_class()
            watcher = watcher_cls()
        self.watcher = watcher

    def watch(self, filepath, func=None, delay=None):
        """Add the given filepath for watcher list.

        Once you have initialized a server, watch file changes before
        serve the server::

            server.watch('static/*.stylus', 'make static')
            def alert():
                print('foo')
            server.watch('foo.txt', alert)
            server.serve()

        :param filepath: files to be watched, it can be a filepath,
                         a directory, or a glob pattern
        :param func: the function to be called, it can be a string of
                     shell command, or any callable object without
                     parameters
        :param delay: Delay sending the reload message. Use 'forever' to
                      not send it. This is useful to compile sass files to
                      css, but reload on changed css files then only.
        """
        if isinstance(func, string_types):
            # A string is treated as a command line and wrapped by shell().
            func = shell(func)

        self.watcher.watch(filepath, func, delay)

    def application(self, port, host, liveport=None, debug=None):
        """Create the tornado application(s) and start listening.

        :param port: port the web handlers listen on
        :param host: address to bind; also used in the injected script URL
        :param liveport: port for the livereload handlers; defaults to
                         ``port``, in which case one application serves
                         both sets of handlers
        :param debug: tornado debug flag; set to True when left as ``None``
                      and a wsgi app is configured
        """
        LiveReloadHandler.watcher = self.watcher
        if liveport is None:
            liveport = port
        if debug is None and self.app:
            debug = True

        live_handlers = [
            (r'/livereload', LiveReloadHandler),
            (r'/forcereload', ForceReloadHandler),
            (r'/livereload.js', LiveReloadJSHandler)
        ]

        live_script = escape.utf8((
            '<script src="http://{host}:{port}/livereload.js"></script>'
        ).format(host=host, port=liveport))

        web_handlers = self.get_web_handlers(live_script)

        # The transform reads its script from a class attribute, so bind the
        # script for this server through a throwaway subclass.
        class ConfiguredTransform(LiveScriptInjector):
            script = live_script

        if liveport == port:
            handlers = live_handlers + web_handlers
            app = web.Application(
                handlers=handlers,
                debug=debug,
                transforms=[ConfiguredTransform]
            )
            app.listen(port, address=host)
        else:
            app = web.Application(
                handlers=web_handlers,
                debug=debug,
                transforms=[ConfiguredTransform]
            )
            app.listen(port, address=host)
            live = web.Application(handlers=live_handlers, debug=False)
            live.listen(liveport, address=host)

    def get_web_handlers(self, script):
        """Return the tornado handler list for the content being served.

        With a wsgi app configured, every path falls back to that app,
        wrapped so that ``script`` is injected.  Otherwise files are served
        statically from ``self.root`` (or the current directory).
        """
        if self.app:
            fallback = LiveScriptContainer(self.app, script)
            return [(r'.*', web.FallbackHandler, {'fallback': fallback})]
        return [
            (r'/(.*)', StaticFileHandler, {
                'path': self.root or '.',
                'default_filename': 'index.html',
            }),
        ]

    def serve(self, port=5500, liveport=None, host=None, root=None, debug=None,
              open_url=False, restart_delay=2, open_url_delay=None):
        """Start serve the server with the given port.

        :param port: serve on this port, default is 5500
        :param liveport: live reload on this port
        :param host: serve on this hostname, default is 127.0.0.1
        :param root: serve static on this root directory
        :param debug: set debug mode, which autoreloads the app on code changes
                      via Tornado (and causes polling). Defaults to True when
                      ``self.app`` is set, otherwise False.
        :param open_url: deprecated, use ``open_url_delay``; when true the
                         browser is opened after ``open_url_delay`` seconds,
                         or 5 if that is not given
        :param restart_delay: value queued on the watcher together with the
                              ``'__livereload__'`` marker before the loop
                              starts
        :param open_url_delay: open webbrowser after the delay seconds
        """
        host = host or '127.0.0.1'
        if root is not None:
            self.root = root

        self._setup_logging()
        logger.info('Serving on http://%s:%s', host, port)

        self.application(port, host, liveport=liveport, debug=debug)

        # Open the web browser from a background thread after the delay
        # (5 seconds unless open_url_delay says otherwise).
        if open_url or open_url_delay:
            if open_url:
                # Logger.warn is a deprecated alias that newer Python
                # releases no longer provide; use warning().
                logger.warning('Use `open_url_delay` instead of `open_url`')
            sleep = open_url_delay or 5

            def opener():
                time.sleep(sleep)
                webbrowser.open('http://%s:%s' % (host, port))
            threading.Thread(target=opener).start()

        try:
            self.watcher._changes.append(('__livereload__', restart_delay))
            LiveReloadHandler.start_tasks()
            IOLoop.instance().start()
        except KeyboardInterrupt:
            logger.info('Shutting down...')

    def _setup_logging(self):
        """Send livereload and tornado log records to a stream handler."""
        logger.setLevel(logging.INFO)

        channel = logging.StreamHandler()
        channel.setFormatter(LogFormatter())
        logger.addHandler(channel)

        # need a tornado logging handler to prevent IOLoop._setup_logging
        logging.getLogger('tornado').addHandler(channel)
|