/usr/share/pyshared/gluon/contrib/simplejsonrpc.py is in python-gluon 2.12.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation; either version 3, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
"Pythonic simple JSON RPC Client implementation"
__author__ = "Mariano Reingart (reingart@gmail.com)"
__copyright__ = "Copyright (C) 2011 Mariano Reingart"
__license__ = "LGPL 3.0"
__version__ = "0.05"
import urllib
from xmlrpclib import Transport, SafeTransport
from cStringIO import StringIO
import random
import sys
# Bind the name `json` to the first JSON implementation that can be imported.
# Only a missing module (ImportError) moves on to the next candidate; any
# other exception is a real problem and is allowed to propagate.
try:
    import gluon.contrib.simplejson as json     # try web2py json serializer
except ImportError:
    try:
        import json                             # try stdlib (py2.6)
    except ImportError:
        import simplejson as json               # try external module
class JSONRPCError(RuntimeError):
    """Error object for a failed remote procedure call.

    The exception text is ``"<code>: <message>\\n<detail>"`` where the
    detail is derived from ``data``.

    Attributes:
        code: error code passed by the caller.
        message: error message passed by the caller.
        data: extra error detail, stored exactly as received.
    """

    def __init__(self, code, message, data=None):
        """Build the exception text and keep the raw fields.

        Args:
            code: error code.
            message: short error description.
            data: optional detail. A list/tuple is rendered one item per
                line; any other non-empty value is rendered as a single
                block of text; an empty value or None renders as nothing.
        """
        if not data:
            detail = ''
        elif isinstance(data, (list, tuple)):
            # Items are formatted individually so that non-string entries
            # do not make str.join raise TypeError.
            detail = '\n'.join(["%s" % (item,) for item in data])
        else:
            # A plain string (or any scalar/mapping) must be shown as-is;
            # joining it would put a newline between every character.
            detail = "%s" % (data,)
        value = "%s: %s\n%s" % (code, message, detail)
        RuntimeError.__init__(self, value)
        self.code = code
        self.message = message
        self.data = data
class JSONDummyParser:
    "Pass-through object used as both parser and unmarshaller for xmlrpclib"

    def __init__(self):
        # response text is accumulated here, chunk by chunk
        self.buf = StringIO()

    def feed(self, data):
        "Append one chunk of response data to the buffer"
        self.buf.write(data)

    def close(self):
        "Return everything fed so far as a single string (no decoding is done)"
        collected = self.buf.getvalue()
        return collected
class JSONTransportMixin:
    "Overrides that make an xmlrpclib transport exchange JSON bodies"

    def send_content(self, connection, request_body):
        "Write the JSON headers, finish the header section, then send the body"
        put = connection.putheader
        put("Content-Type", "application/json")
        put("Content-Length", str(len(request_body)))
        connection.endheaders()
        # todo: add gzip compression
        if not request_body:
            # nothing to transmit after the headers
            return
        connection.send(request_body)

    def getparser(self):
        "Return one JSONDummyParser in both the parser and unmarshaller slots"
        target = JSONDummyParser()
        return (target, target)
class JSONTransport(JSONTransportMixin, Transport):
    "xmlrpclib Transport combined with the JSON mixin (ServerProxy default for http)"
class JSONSafeTransport(JSONTransportMixin, SafeTransport):
    "xmlrpclib SafeTransport combined with the JSON mixin (ServerProxy default for https)"
class ServerProxy(object):
    """JSON RPC Simple Client Service Proxy.

    Any attribute that is not found on the instance is turned into a
    callable that invokes the remote method of the same name, so
    ``proxy.add(1, 2)`` is equivalent to ``proxy.call('add', 1, 2)``.

    Public attributes:
        location: the URI given to the constructor.
        trace: the ``verbose`` value given to the constructor.
        exceptions: when true (default), an error member in the response
            is raised as JSONRPCError.
        timeout: initialised to None; not read anywhere in this class.
        json_request / json_response: raw JSON text of the last exchange.
        version: value sent as the ``jsonrpc`` member, if set.
        error: error member of the last response (None before any call).
    """

    def __init__(self, uri, transport=None, encoding=None, verbose=0,
                 version=None):
        """Create a proxy for the service at ``uri``.

        Args:
            uri: service URL; only http and https are accepted.
            transport: object providing
                ``request(host, handler, request_body, verbose=...)``.
                Defaults to JSONSafeTransport for https and JSONTransport
                for http.
            encoding: stored on the instance; not used by this class.
            verbose: passed to the transport on every request.
            version: e.g. '2.0' for jsonrpc2; omitted from requests when
                empty.

        Raises:
            IOError: if the URI scheme is neither http nor https.
        """
        self.location = uri             # server location (url)
        self.trace = verbose            # show debug messages
        self.exceptions = True          # raise errors? (JSONRPCError)
        self.timeout = None
        self.json_request = self.json_response = ''
        self.version = version          # '2.0' for jsonrpc2
        # Defined up front so that reading ``proxy.error`` before the first
        # call does not fall through to __getattr__ and yield a (truthy)
        # remote-method lambda.
        self.error = None

        scheme, rest = urllib.splittype(uri)
        if scheme not in ("http", "https"):
            raise IOError("unsupported JSON-RPC protocol")
        self.__host, self.__handler = urllib.splithost(rest)
        if not self.__handler:
            # A URI without a path would otherwise hand the transport an
            # empty request path.
            self.__handler = "/"

        if transport is None:
            if scheme == "https":
                transport = JSONSafeTransport()
            else:
                transport = JSONTransport()
        self.__transport = transport
        self.__encoding = encoding
        self.__verbose = verbose

    def __getattr__(self, attr):
        "pseudo method that can be called"
        # Only reached for names that normal attribute lookup did not find.
        return lambda *args: self.call(attr, *args)

    def call(self, method, *args):
        """JSON RPC communication (method invocation).

        Args:
            method: name of the remote method.
            *args: positional parameters, sent as the ``params`` member.

        Returns:
            The ``result`` member of the response (None if absent).

        Raises:
            JSONRPCError: if the response carries an error and
                ``self.exceptions`` is true, or if the response id is
                missing or differs from the request id.
        """
        # build data sent to the service
        request_id = random.randint(0, sys.maxint)
        data = {'id': request_id, 'method': method, 'params': args}
        if self.version:
            # mandatory key/value for jsonrpc2 validation else err -32600
            data['jsonrpc'] = self.version
        request = json.dumps(data)

        # make HTTP request through the configured transport
        response = self.__transport.request(
            self.__host,
            self.__handler,
            request,
            verbose=self.__verbose
        )

        # store plain request and response for further debugging
        self.json_request = request
        self.json_response = response

        # parse json data coming from service
        # {'version': '1.1', 'id': id, 'result': result, 'error': None}
        response = json.loads(response)

        self.error = response.get('error', {})
        if self.error and self.exceptions:
            raise JSONRPCError(self.error.get('code', 0),
                               self.error.get('message', ''),
                               self.error.get('data', None))
        # .get(): a reply without an id is reported as an id mismatch
        # instead of surfacing as a bare KeyError.
        if response.get('id') != request_id:
            raise JSONRPCError(0, "JSON Request ID != Response ID")
        return response.get('result')
# Alias: the client class is also available under the name ServiceProxy.
ServiceProxy = ServerProxy
if __name__ == "__main__":
# basic tests:
location = "http://www.web2py.com.ar/webservices/sample/call/jsonrpc"
client = ServerProxy(location, verbose='--verbose' in sys.argv,)
print client.add(1, 2)
|