/usr/share/pyshared/twisted/web2/channel/cgi.py is in python-twisted-web2 8.1.0-3build1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 | import warnings
import os
import urllib
from zope.interface import implements
from twisted.internet import protocol, address
from twisted.internet import reactor, interfaces
from twisted.web2 import http, http_headers, server, responsecode
class BaseCGIChannelRequest(protocol.Protocol):
implements(interfaces.IHalfCloseableProtocol)
finished = False
requestFactory = http.Request
request = None
def makeRequest(self, vars):
headers = http_headers.Headers()
http_vers = http.parseVersion(vars['SERVER_PROTOCOL'])
if http_vers[0] != 'http' or http_vers[1] > 1:
_abortWithError(responsecode.INTERNAL_SERVER_ERROR, "Twisted.web CGITransport: Unknown HTTP version: " % vars['SERVER_PROTOCOL'])
secure = vars.get("HTTPS") in ("1", "on") # apache extension?
port = vars.get('SERVER_PORT') or 80
server_host = vars.get('SERVER_NAME') or vars.get('SERVER_ADDR') or 'localhost'
self.hostinfo = address.IPv4Address('TCP', server_host, port), bool(secure)
self.remoteinfo = address.IPv4Address(
'TCP', vars.get('REMOTE_ADDR', ''), vars.get('REMOTE_PORT', 0))
uri = vars.get('REQUEST_URI') # apache extension?
if not uri:
qstr = vars.get('QUERY_STRING', '')
if qstr:
qstr = "?"+urllib.quote(qstr, safe="")
uri = urllib.quote(vars['SCRIPT_NAME'])+urllib.quote(vars.get('PATH_INFO', ''))+qstr
for name,val in vars.iteritems():
if name.startswith('HTTP_'):
name = name[5:].replace('_', '-')
elif name == 'CONTENT_TYPE':
name = 'content-type'
else:
continue
headers.setRawHeaders(name, (val,))
self._dataRemaining = int(vars.get('CONTENT_LENGTH', '0'))
self.request = self.requestFactory(self, vars['REQUEST_METHOD'], uri, http_vers[1:3], self._dataRemaining, headers, prepathuri=vars['SCRIPT_NAME'])
def writeIntermediateResponse(self, code, headers=None):
"""Ignore, CGI doesn't support."""
pass
def write(self, data):
self.transport.write(data)
def finish(self):
if self.finished:
warnings.warn("Warning! request.finish called twice.", stacklevel=2)
return
self.finished = True
self.transport.loseConnection()
def getHostInfo(self):
return self.hostinfo
def getRemoteHost(self):
return self.remoteinfo
def abortConnection(self, closeWrite=True):
self.transport.loseConnection()
def registerProducer(self, producer, streaming):
self.transport.registerProducer(producer, streaming)
def unregisterProducer(self):
self.transport.unregisterProducer()
def writeConnectionLost(self):
self.loseConnection()
def readConnectionLost(self):
if self._dataRemaining > 0:
# content-length was wrong, abort
self.loseConnection()
class CGIChannelRequest(BaseCGIChannelRequest):
cgi_vers = (1, 0)
def __init__(self, requestFactory, vars):
self.requestFactory=requestFactory
cgi_vers = http.parseVersion(vars['GATEWAY_INTERFACE'])
if cgi_vers[0] != 'cgi' or cgi_vers[1] != 1:
_abortWithError(responsecode.INTERNAL_SERVER_ERROR, "Twisted.web CGITransport: Unknown CGI version %s" % vars['GATEWAY_INTERFACE'])
self.makeRequest(vars)
def writeHeaders(self, code, headers):
l = []
code_message = responsecode.RESPONSES.get(code, "Unknown Status")
l.append("Status: %s %s\n" % (code, code_message))
if headers is not None:
for name, valuelist in headers.getAllRawHeaders():
for value in valuelist:
l.append("%s: %s\n" % (name, value))
l.append('\n')
self.transport.writeSequence(l)
def dataReceived(self, data):
if self._dataRemaining <= 0:
return
if self._dataRemaining < len(data):
data = data[:self._dataRemaining]
self._dataRemaining -= len(data)
self.request.handleContentChunk(data)
if self._dataRemaining == 0:
self.request.handleContentComplete()
def connectionMade(self):
self.request.process()
if self._dataRemaining == 0:
self.request.handleContentComplete()
def connectionLost(self, reason):
if reactor.running:
reactor.stop()
def startCGI(site):
"""Call this as the last thing in your CGI python script in order to
hook up your site object with the incoming request.
E.g.:
>>> from twisted.web2 import channel, server
>>> if __name__ == '__main__':
... channel.startCGI(server.Site(myToplevelResource))
"""
from twisted.internet.stdio import StandardIO
StandardIO(CGIChannelRequest(site, os.environ))
reactor.run()
__all__ = ['startCGI']
|