/usr/share/pyshared/pylons/controllers/xmlrpc.py is in python-pylons 1.0.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 | """The base WSGI XMLRPCController"""
import inspect
import logging
import types
import xmlrpclib
from paste.response import replace_header
from pylons.controllers import WSGIController
from pylons.controllers.util import abort, Response
__all__ = ['XMLRPCController']
log = logging.getLogger(__name__)
XMLRPC_MAPPING = ((basestring, 'string'), (list, 'array'), (bool, 'boolean'),
(int, 'int'), (float, 'double'), (dict, 'struct'),
(xmlrpclib.DateTime, 'dateTime.iso8601'),
(xmlrpclib.Binary, 'base64'))
def xmlrpc_sig(args):
"""Returns a list of the function signature in string format based on a
tuple provided by xmlrpclib."""
signature = []
for param in args:
for type, xml_name in XMLRPC_MAPPING:
if isinstance(param, type):
signature.append(xml_name)
break
return signature
def xmlrpc_fault(code, message):
"""Convienence method to return a Pylons response XMLRPC Fault"""
fault = xmlrpclib.Fault(code, message)
return Response(body=xmlrpclib.dumps(fault, methodresponse=True))
class XMLRPCController(WSGIController):
"""XML-RPC Controller that speaks WSGI
This controller handles XML-RPC responses and complies with the
`XML-RPC Specification <http://www.xmlrpc.com/spec>`_ as well as
the `XML-RPC Introspection
<http://scripts.incutio.com/xmlrpc/introspection.html>`_
specification.
By default, methods with names containing a dot are translated to
use an underscore. For example, the `system.methodHelp` is handled
by the method :meth:`system_methodHelp`.
Methods in the XML-RPC controller will be called with the method
given in the XMLRPC body. Methods may be annotated with a signature
attribute to declare the valid arguments and return types.
For example::
class MyXML(XMLRPCController):
def userstatus(self):
return 'basic string'
userstatus.signature = [ ['string'] ]
def userinfo(self, username, age=None):
user = LookUpUser(username)
response = {'username':user.name}
if age and age > 10:
response['age'] = age
return response
userinfo.signature = [['struct', 'string'],
['struct', 'string', 'int']]
Since XML-RPC methods can take different sets of data, each set of
valid arguments is its own list. The first value in the list is the
type of the return argument. The rest of the arguments are the
types of the data that must be passed in.
In the last method in the example above, since the method can
optionally take an integer value both sets of valid parameter lists
should be provided.
Valid types that can be checked in the signature and their
corresponding Python types::
'string' - str
'array' - list
'boolean' - bool
'int' - int
'double' - float
'struct' - dict
'dateTime.iso8601' - xmlrpclib.DateTime
'base64' - xmlrpclib.Binary
The class variable ``allow_none`` is passed to xmlrpclib.dumps;
enabling it allows translating ``None`` to XML (an extension to the
XML-RPC specification)
.. note::
Requiring a signature is optional.
"""
allow_none = False
max_body_length = 4194304
def _get_method_args(self):
return self.rpc_kargs
def __call__(self, environ, start_response):
"""Parse an XMLRPC body for the method, and call it with the
appropriate arguments"""
# Pull out the length, return an error if there is no valid
# length or if the length is larger than the max_body_length.
log_debug = self._pylons_log_debug
length = environ.get('CONTENT_LENGTH')
if length:
length = int(length)
else:
# No valid Content-Length header found
if log_debug:
log.debug("No Content-Length found, returning 411 error")
abort(411)
if length > self.max_body_length or length == 0:
if log_debug:
log.debug("Content-Length larger than max body length. Max: "
"%s, Sent: %s. Returning 413 error",
self.max_body_length, length)
abort(413, "XML body too large")
body = environ['wsgi.input'].read(int(environ['CONTENT_LENGTH']))
rpc_args, orig_method = xmlrpclib.loads(body)
method = self._find_method_name(orig_method)
func = self._find_method(method)
if not func:
if log_debug:
log.debug("Method: %r not found, returning xmlrpc fault",
method)
return xmlrpc_fault(0, "No such method name %r" %
method)(environ, start_response)
# Signature checking for params
if hasattr(func, 'signature'):
if log_debug:
log.debug("Checking XMLRPC argument signature")
valid_args = False
params = xmlrpc_sig(rpc_args)
for sig in func.signature:
# Next sig if we don't have the same amount of args
if len(sig) - 1 != len(rpc_args):
continue
# If the params match, we're valid
if params == sig[1:]:
valid_args = True
break
if not valid_args:
if log_debug:
log.debug("Bad argument signature recieved, returning "
"xmlrpc fault")
msg = ("Incorrect argument signature. %r recieved does not "
"match %r signature for method %r" % \
(params, func.signature, orig_method))
return xmlrpc_fault(0, msg)(environ, start_response)
# Change the arg list into a keyword dict based off the arg
# names in the functions definition
arglist = inspect.getargspec(func)[0][1:]
kargs = dict(zip(arglist, rpc_args))
kargs['action'], kargs['environ'] = method, environ
kargs['start_response'] = start_response
self.rpc_kargs = kargs
self._func = func
# Now that we know the method is valid, and the args are valid,
# we can dispatch control to the default WSGIController
status = []
headers = []
exc_info = []
def change_content(new_status, new_headers, new_exc_info=None):
status.append(new_status)
headers.extend(new_headers)
exc_info.append(new_exc_info)
output = WSGIController.__call__(self, environ, change_content)
output = list(output)
headers.append(('Content-Length', str(len(output[0]))))
replace_header(headers, 'Content-Type', 'text/xml')
start_response(status[0], headers, exc_info[0])
return output
def _dispatch_call(self):
"""Dispatch the call to the function chosen by __call__"""
raw_response = self._inspect_call(self._func)
if not isinstance(raw_response, xmlrpclib.Fault):
raw_response = (raw_response,)
response = xmlrpclib.dumps(raw_response, methodresponse=True,
allow_none=self.allow_none)
return response
def _find_method(self, name):
"""Locate a method in the controller by the specified name and
return it"""
# Keep private methods private
if name.startswith('_'):
if self._pylons_log_debug:
log.debug("Action starts with _, private action not allowed")
return
if self._pylons_log_debug:
log.debug("Looking for XMLRPC method: %r", name)
try:
func = getattr(self, name, None)
except UnicodeEncodeError:
return
if isinstance(func, types.MethodType):
return func
def _find_method_name(self, name):
"""Locate a method in the controller by the appropriate name
By default, this translates method names like
'system.methodHelp' into 'system_methodHelp'.
"""
return name.replace('.', '_')
def _publish_method_name(self, name):
"""Translate an internal method name to a publicly viewable one
By default, this translates internal method names like
'blog_view' into 'blog.view'.
"""
return name.replace('_', '.')
def system_listMethods(self):
"""Returns a list of XML-RPC methods for this XML-RPC resource"""
methods = []
for method in dir(self):
meth = getattr(self, method)
if not method.startswith('_') and isinstance(meth,
types.MethodType):
methods.append(self._publish_method_name(method))
return methods
system_listMethods.signature = [['array']]
def system_methodSignature(self, name):
"""Returns an array of array's for the valid signatures for a
method.
The first value of each array is the return value of the
method. The result is an array to indicate multiple signatures
a method may be capable of.
"""
method = self._find_method(self._find_method_name(name))
if method:
return getattr(method, 'signature', '')
else:
return xmlrpclib.Fault(0, 'No such method name')
system_methodSignature.signature = [['array', 'string'],
['string', 'string']]
def system_methodHelp(self, name):
"""Returns the documentation for a method"""
method = self._find_method(self._find_method_name(name))
if method:
help = MethodHelp.getdoc(method)
sig = getattr(method, 'signature', None)
if sig:
help += "\n\nMethod signature: %s" % sig
return help
return xmlrpclib.Fault(0, "No such method name")
system_methodHelp.signature = [['string', 'string']]
class MethodHelp(object):
"""Wrapper for formatting doc strings from XMLRPCController
methods"""
def __init__(self, doc):
self.__doc__ = doc
def getdoc(method):
"""Return a formatted doc string, via inspect.getdoc, from the
specified XMLRPCController method
The method's help attribute is used if it exists, otherwise the
method's doc string is used.
"""
help = getattr(method, 'help', None)
if help is None:
help = method.__doc__
doc = inspect.getdoc(MethodHelp(help))
if doc is None:
return ''
return doc
getdoc = staticmethod(getdoc)
|