/usr/share/pyshared/ZSI/twisted/interfaces.py is in python-zsi 2.1~a1-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
###########################################################################
# Joshua R. Boverhof, LBNL
# See Copyright for copyright notice!
# $Id: $
###########################################################################
import sys, warnings
# twisted & related imports
from zope.interface import classProvides, implements, Interface
# ZSI imports
from ZSI import EvaluateException, ParseException, ParsedSoap, SoapWriter
#
# Stability: Unstable
#
def CheckInputArgs(*interfaces):
    """Build a method decorator that type-checks positional arguments.

    Must provide at least one interface, the last one may be repeated:
    the positional argument at index ``i`` is accepted when it provides
    ``interfaces[i]`` (if that index exists) or when it provides the last
    interface given.  ``self`` and keyword arguments are not checked.

    Parameters:
        interfaces -- objects exposing a ``providedBy(obj)`` method

    Returns:
        a decorator for methods.  The decorated method raises TypeError
        for the first positional argument that fails the check, otherwise
        it returns whatever the wrapped method returns.
    """
    # Local import so the block does not depend on a file-level import.
    import functools

    count = len(interfaces)

    def wrapper(func):
        @functools.wraps(func)  # keep the wrapped method's name/docstring
        def check_args(self, *args, **kw):
            for i, arg in enumerate(args):
                # Accept on the positional interface first, then fall back
                # to the trailing (repeatable) interface.
                if count > i and interfaces[i].providedBy(arg):
                    continue
                if interfaces[-1].providedBy(arg):
                    continue
                # Report against the interface expected at this position.
                if count > i:
                    expected = interfaces[i]
                else:
                    expected = interfaces[-1]
                raise TypeError(
                    'arg %s does not implement %s' % (arg, expected))
            # Propagate the result instead of silently discarding it.
            return func(self, *args, **kw)
        return check_args
    return wrapper
class HandlerChainInterface(Interface):
    """Interface for one link of a handler chain.

    A link handles the request on the way in (``processRequest``) and the
    response on the way out (``processResponse``).
    """

    def processRequest(self, input, **kw):
        """Return a representation of the request.

        The last link in the chain must return a response pyobj that
        carries a ``typecode`` attribute.

        Parameters:
            input -- the value to process

        Keyword Parameters:
            request -- HTTPRequest instance
            resource -- Resource instance
        """

    def processResponse(self, output, **kw):
        """Return a string representing the soap response.

        Parameters:
            output -- the value to process

        Keyword Parameters:
            request -- HTTPRequest instance
            resource -- Resource instance
        """
class CallbackChainInterface(Interface):
    """Interface for the callback that ends a handler chain."""

    def processRequest(self, input, **kw):
        """Return a response pyobj that carries a ``typecode`` attribute.

        Parameters:
            input -- the value to process

        Keyword Parameters:
            request -- HTTPRequest instance
            resource -- Resource instance
        """
class DataHandler:
    """Chain link converting between raw input and ZSI soap objects.

    The class itself (not its instances) is declared to provide
    HandlerChainInterface, and both methods are classmethods.

    class variables:
        readerClass -- factory class to create reader for ParsedSoap
            instances.
        writerClass -- ElementProxy implementation to use for SoapWriter
            instances.
    """
    classProvides(HandlerChainInterface)

    readerClass = None
    writerClass = None

    @classmethod
    def processRequest(cls, input, **kw):
        """Wrap ``input`` in a ParsedSoap built with ``cls.readerClass``.

        Keyword arguments are accepted and ignored.
        """
        reader_factory = cls.readerClass
        return ParsedSoap(input, readerclass=reader_factory)

    @classmethod
    def processResponse(cls, output, **kw):
        """Serialize ``output`` into a new SoapWriter and return the writer.

        The writer is created with ``cls.writerClass`` as its output class.
        Keyword arguments are accepted and ignored.
        """
        writer = SoapWriter(outputclass=cls.writerClass)
        writer.serialize(output)
        return writer
class DefaultHandlerChain:
    """Run a value through a sequence of handlers and a final callback.

    Instance attributes:
        handlercb -- callback invoked after all handlers on a request
        handlers -- tuple of handlers, applied in the order given
    """

    @CheckInputArgs(CallbackChainInterface, HandlerChainInterface)
    def __init__(self, cb, *handlers):
        """Store the callback and handlers.

        The positional arguments are validated by CheckInputArgs against
        CallbackChainInterface / HandlerChainInterface before this runs.
        """
        self.handlers = handlers
        self.handlercb = cb

    def processRequest(self, arg, **kw):
        """Feed ``arg`` through every handler, then through the callback.

        Each handler's ``processRequest`` receives the previous result and
        the same keyword arguments; the callback's result is returned.
        """
        current = arg
        for handler in self.handlers:
            current = handler.processRequest(current, **kw)
        return self.handlercb.processRequest(current, **kw)

    def processResponse(self, arg, **kw):
        """Feed ``arg`` through every handler and return it as a string.

        Returns None without calling any handler when ``arg`` is None;
        otherwise returns ``str()`` of the last handler's result.
        """
        if arg is None:
            return
        current = arg
        for handler in self.handlers:
            current = handler.processResponse(current, **kw)
        return str(current)
|