/usr/lib/python3/dist-packages/waitress/task.py is in python3-waitress 0.8.10-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 | ##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import socket
import sys
import threading
import time
from waitress.buffers import ReadOnlyFileBasedBuffer
from waitress.compat import (
tobytes,
Queue,
Empty,
reraise,
)
from waitress.utilities import (
build_http_date,
logger,
)
rename_headers = { # or keep them without the HTTP_ prefix added
'CONTENT_LENGTH': 'CONTENT_LENGTH',
'CONTENT_TYPE': 'CONTENT_TYPE',
}
hop_by_hop = frozenset((
'connection',
'keep-alive',
'proxy-authenticate',
'proxy-authorization',
'te',
'trailers',
'transfer-encoding',
'upgrade'
))
class JustTesting(Exception):
pass
class ThreadedTaskDispatcher(object):
"""A Task Dispatcher that creates a thread for each task.
"""
stop_count = 0 # Number of threads that will stop soon.
logger = logger
def __init__(self):
self.threads = {} # { thread number -> 1 }
self.queue = Queue()
self.thread_mgmt_lock = threading.Lock()
def start_new_thread(self, target, args):
t = threading.Thread(target=target, name='waitress', args=args)
t.daemon = True
t.start()
def handler_thread(self, thread_no):
threads = self.threads
try:
while threads.get(thread_no):
task = self.queue.get()
if task is None:
# Special value: kill this thread.
break
try:
task.service()
except Exception as e:
self.logger.exception(
'Exception when servicing %r' % task)
if isinstance(e, JustTesting):
break
finally:
with self.thread_mgmt_lock:
self.stop_count -= 1
threads.pop(thread_no, None)
def set_thread_count(self, count):
with self.thread_mgmt_lock:
threads = self.threads
thread_no = 0
running = len(threads) - self.stop_count
while running < count:
# Start threads.
while thread_no in threads:
thread_no = thread_no + 1
threads[thread_no] = 1
running += 1
self.start_new_thread(self.handler_thread, (thread_no,))
thread_no = thread_no + 1
if running > count:
# Stop threads.
to_stop = running - count
self.stop_count += to_stop
for n in range(to_stop):
self.queue.put(None)
running -= 1
def add_task(self, task):
try:
task.defer()
self.queue.put(task)
except:
task.cancel()
raise
def shutdown(self, cancel_pending=True, timeout=5):
self.set_thread_count(0)
# Ensure the threads shut down.
threads = self.threads
expiration = time.time() + timeout
while threads:
if time.time() >= expiration:
self.logger.warning(
"%d thread(s) still running" %
len(threads))
break
time.sleep(0.1)
if cancel_pending:
# Cancel remaining tasks.
try:
queue = self.queue
while not queue.empty():
task = queue.get()
if task is not None:
task.cancel()
except Empty: # pragma: no cover
pass
return True
return False
class Task(object):
close_on_finish = False
status = '200 OK'
wrote_header = False
start_time = 0
content_length = None
content_bytes_written = 0
logged_write_excess = False
complete = False
chunked_response = False
logger = logger
def __init__(self, channel, request):
self.channel = channel
self.request = request
self.response_headers = []
version = request.version
if version not in ('1.0', '1.1'):
# fall back to a version we support.
version = '1.0'
self.version = version
def service(self):
try:
try:
self.start()
self.execute()
self.finish()
except socket.error:
self.close_on_finish = True
if self.channel.adj.log_socket_errors:
raise
finally:
pass
def cancel(self):
self.close_on_finish = True
def defer(self):
pass
def build_response_header(self):
version = self.version
# Figure out whether the connection should be closed.
connection = self.request.headers.get('CONNECTION', '').lower()
response_headers = self.response_headers
content_length_header = None
date_header = None
server_header = None
connection_close_header = None
for i, (headername, headerval) in enumerate(response_headers):
headername = '-'.join(
[x.capitalize() for x in headername.split('-')]
)
if headername == 'Content-Length':
content_length_header = headerval
if headername == 'Date':
date_header = headerval
if headername == 'Server':
server_header = headerval
if headername == 'Connection':
connection_close_header = headerval.lower()
# replace with properly capitalized version
response_headers[i] = (headername, headerval)
if content_length_header is None and self.content_length is not None:
content_length_header = str(self.content_length)
self.response_headers.append(
('Content-Length', content_length_header)
)
def close_on_finish():
if connection_close_header is None:
response_headers.append(('Connection', 'close'))
self.close_on_finish = True
if version == '1.0':
if connection == 'keep-alive':
if not content_length_header:
close_on_finish()
else:
response_headers.append(('Connection', 'Keep-Alive'))
else:
close_on_finish()
elif version == '1.1':
if connection == 'close':
close_on_finish()
if not content_length_header:
response_headers.append(('Transfer-Encoding', 'chunked'))
self.chunked_response = True
if not self.close_on_finish:
close_on_finish()
# under HTTP 1.1 keep-alive is default, no need to set the header
else:
raise AssertionError('neither HTTP/1.0 or HTTP/1.1')
# Set the Server and Date field, if not yet specified. This is needed
# if the server is used as a proxy.
ident = self.channel.server.adj.ident
if not server_header:
response_headers.append(('Server', ident))
else:
response_headers.append(('Via', ident))
if not date_header:
response_headers.append(('Date', build_http_date(self.start_time)))
first_line = 'HTTP/%s %s' % (self.version, self.status)
# NB: sorting headers needs to preserve same-named-header order
# as per RFC 2616 section 4.2; thus the key=lambda x: x[0] here;
# rely on stable sort to keep relative position of same-named headers
next_lines = ['%s: %s' % hv for hv in sorted(
self.response_headers, key=lambda x: x[0])]
lines = [first_line] + next_lines
res = '%s\r\n\r\n' % '\r\n'.join(lines)
return tobytes(res)
def remove_content_length_header(self):
for i, (header_name, header_value) in enumerate(self.response_headers):
if header_name.lower() == 'content-length':
del self.response_headers[i]
def start(self):
self.start_time = time.time()
def finish(self):
if not self.wrote_header:
self.write(b'')
if self.chunked_response:
# not self.write, it will chunk it!
self.channel.write_soon(b'0\r\n\r\n')
def write(self, data):
if not self.complete:
raise RuntimeError('start_response was not called before body '
'written')
channel = self.channel
if not self.wrote_header:
rh = self.build_response_header()
channel.write_soon(rh)
self.wrote_header = True
if data:
towrite = data
cl = self.content_length
if self.chunked_response:
# use chunked encoding response
towrite = tobytes(hex(len(data))[2:].upper()) + b'\r\n'
towrite += data + b'\r\n'
elif cl is not None:
towrite = data[:cl - self.content_bytes_written]
self.content_bytes_written += len(towrite)
if towrite != data and not self.logged_write_excess:
self.logger.warning(
'application-written content exceeded the number of '
'bytes specified by Content-Length header (%s)' % cl)
self.logged_write_excess = True
if towrite:
channel.write_soon(towrite)
class ErrorTask(Task):
""" An error task produces an error response
"""
complete = True
def execute(self):
e = self.request.error
body = '%s\r\n\r\n%s' % (e.reason, e.body)
tag = '\r\n\r\n(generated by waitress)'
body = body + tag
self.status = '%s %s' % (e.code, e.reason)
cl = len(body)
self.content_length = cl
self.response_headers.append(('Content-Length', str(cl)))
self.response_headers.append(('Content-Type', 'text/plain'))
if self.version == '1.1':
connection = self.request.headers.get('CONNECTION', '').lower()
if connection == 'close':
self.response_headers.append(('Connection', 'close'))
# under HTTP 1.1 keep-alive is default, no need to set the header
else:
# HTTP 1.0
self.response_headers.append(('Connection', 'close'))
self.close_on_finish = True
self.write(tobytes(body))
class WSGITask(Task):
"""A WSGI task produces a response from a WSGI application.
"""
environ = None
def execute(self):
env = self.get_environment()
def start_response(status, headers, exc_info=None):
if self.complete and not exc_info:
raise AssertionError("start_response called a second time "
"without providing exc_info.")
if exc_info:
try:
if self.wrote_header:
# higher levels will catch and handle raised exception:
# 1. "service" method in task.py
# 2. "service" method in channel.py
# 3. "handler_thread" method in task.py
reraise(exc_info[0], exc_info[1], exc_info[2])
else:
# As per WSGI spec existing headers must be cleared
self.response_headers = []
finally:
exc_info = None
self.complete = True
if not status.__class__ is str:
raise AssertionError('status %s is not a string' % status)
self.status = status
# Prepare the headers for output
for k, v in headers:
if not k.__class__ is str:
raise AssertionError(
'Header name %r is not a string in %r' % (k, (k, v))
)
if not v.__class__ is str:
raise AssertionError(
'Header value %r is not a string in %r' % (v, (k, v))
)
kl = k.lower()
if kl == 'content-length':
self.content_length = int(v)
elif kl in hop_by_hop:
raise AssertionError(
'%s is a "hop-by-hop" header; it cannot be used by '
'a WSGI application (see PEP 3333)' % k)
self.response_headers.extend(headers)
# Return a method used to write the response data.
return self.write
# Call the application to handle the request and write a response
app_iter = self.channel.server.application(env, start_response)
if app_iter.__class__ is ReadOnlyFileBasedBuffer:
# NB: do not put this inside the below try: finally: which closes
# the app_iter; we need to defer closing the underlying file. It's
# intention that we don't want to call ``close`` here if the
# app_iter is a ROFBB; the buffer (and therefore the file) will
# eventually be closed within channel.py's _flush_some or
# handle_close instead.
cl = self.content_length
size = app_iter.prepare(cl)
if size:
if cl != size:
if cl is not None:
self.remove_content_length_header()
self.content_length = size
self.write(b'') # generate headers
self.channel.write_soon(app_iter)
return
try:
first_chunk_len = None
for chunk in app_iter:
if first_chunk_len is None:
first_chunk_len = len(chunk)
# Set a Content-Length header if one is not supplied.
# start_response may not have been called until first
# iteration as per PEP, so we must reinterrogate
# self.content_length here
if self.content_length is None:
app_iter_len = None
if hasattr(app_iter, '__len__'):
app_iter_len = len(app_iter)
if app_iter_len == 1:
self.content_length = first_chunk_len
# transmit headers only after first iteration of the iterable
# that returns a non-empty bytestring (PEP 3333)
if chunk:
self.write(chunk)
cl = self.content_length
if cl is not None:
if self.content_bytes_written != cl:
# close the connection so the client isn't sitting around
# waiting for more data when there are too few bytes
# to service content-length
self.close_on_finish = True
if self.request.command != 'HEAD':
self.logger.warning(
'application returned too few bytes (%s) '
'for specified Content-Length (%s) via app_iter' % (
self.content_bytes_written, cl),
)
finally:
if hasattr(app_iter, 'close'):
app_iter.close()
def get_environment(self):
"""Returns a WSGI environment."""
environ = self.environ
if environ is not None:
# Return the cached copy.
return environ
request = self.request
path = request.path
channel = self.channel
server = channel.server
url_prefix = server.adj.url_prefix
if path.startswith('/'):
# strip extra slashes at the beginning of a path that starts
# with any number of slashes
path = '/' + path.lstrip('/')
if url_prefix:
# NB: url_prefix is guaranteed by the configuration machinery to
# be either the empty string or a string that starts with a single
# slash and ends without any slashes
if path == url_prefix:
# if the path is the same as the url prefix, the SCRIPT_NAME
# should be the url_prefix and PATH_INFO should be empty
path = ''
else:
# if the path starts with the url prefix plus a slash,
# the SCRIPT_NAME should be the url_prefix and PATH_INFO should
# the value of path from the slash until its end
url_prefix_with_trailing_slash = url_prefix + '/'
if path.startswith(url_prefix_with_trailing_slash):
path = path[len(url_prefix):]
environ = {}
environ['REQUEST_METHOD'] = request.command.upper()
environ['SERVER_PORT'] = str(server.effective_port)
environ['SERVER_NAME'] = server.server_name
environ['SERVER_SOFTWARE'] = server.adj.ident
environ['SERVER_PROTOCOL'] = 'HTTP/%s' % self.version
environ['SCRIPT_NAME'] = url_prefix
environ['PATH_INFO'] = path
environ['QUERY_STRING'] = request.query
host = environ['REMOTE_ADDR'] = channel.addr[0]
headers = dict(request.headers)
if host == server.adj.trusted_proxy:
wsgi_url_scheme = headers.pop('X_FORWARDED_PROTO',
request.url_scheme)
else:
wsgi_url_scheme = request.url_scheme
if wsgi_url_scheme not in ('http', 'https'):
raise ValueError('Invalid X_FORWARDED_PROTO value')
for key, value in headers.items():
value = value.strip()
mykey = rename_headers.get(key, None)
if mykey is None:
mykey = 'HTTP_%s' % key
if mykey not in environ:
environ[mykey] = value
# the following environment variables are required by the WSGI spec
environ['wsgi.version'] = (1, 0)
environ['wsgi.url_scheme'] = wsgi_url_scheme
environ['wsgi.errors'] = sys.stderr # apps should use the logging module
environ['wsgi.multithread'] = True
environ['wsgi.multiprocess'] = False
environ['wsgi.run_once'] = False
environ['wsgi.input'] = request.get_body_stream()
environ['wsgi.file_wrapper'] = ReadOnlyFileBasedBuffer
self.environ = environ
return environ
|