/usr/lib/python3/dist-packages/pydap/wsgi/ssf.py is in python3-pydap 3.2.2+ds1-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 | """An implementation of server-side functions.
Pydap implements DAP server-side functions throught a custom WSGI middleware.
This simplifies writing custom handlers, since they don't need to parse and
apply the function calls themselves.
"""
import re
import operator
import ast
from webob import Request
from pkg_resources import iter_entry_points
import numpy as np
from six.moves import reduce, map
from six import string_types
from ..model import DatasetType, SequenceType
from ..parsers import parse_ce
from ..lib import walk, fix_shorthand, load_from_entry_point_relative
from ..handlers.lib import BaseHandler, apply_projection
from ..exceptions import ServerError
FUNCTION = re.compile(r'([^(]*)\((.*)\)')
RELOP = re.compile(r'(<=|<|>=|>|=~|=|!=)')
def load_functions():
"""Load all available functions from the system, returning a dictionary."""
# Relative import of functions:
package = 'pydap'
entry_points = 'pydap.function'
base_dict = dict(load_from_entry_point_relative(r, package)
for r in iter_entry_points(entry_points)
if r.module_name.startswith(package))
opts_dict = dict((r.name, r.load())
for r in iter_entry_points(entry_points)
if not r.module_name.startswith(package))
base_dict.update(opts_dict)
return base_dict
class ServerSideFunctions(object):
"""A WebOb based middleware for handling server-side function calls.
The middleware works by removing function calls from the request,
forwarding the request to Pydap, and then applying the functions calls to
the returned dataset.
"""
def __init__(self, app, **kwargs):
self.app = app
self.functions = load_functions()
self.functions.update(kwargs)
def __call__(self, environ, start_response):
# specify that we want the parsed dataset
environ['x-wsgiorg.want_parsed_response'] = True
req = Request(environ)
projection, selection = parse_ce(req.query_string)
# check if there are any functions calls in the request
called = (
any(s for s in selection if FUNCTION.match(s)) or
any(p for p in projection if isinstance(p, string_types)))
# ignore DAS requests and requests without functions
path, response = req.path.rsplit('.', 1)
if response == 'das' or not called:
return self.app(environ, start_response)
# apply selection without any function calls
req.query_string = '&'.join(
s for s in selection if not FUNCTION.match(s))
res = req.get_response(self.app)
# get the dataset
method = getattr(res.app_iter, 'x_wsgiorg_parsed_response', False)
if not method:
raise ServerError("Unable to call server-side function!")
dataset = method(DatasetType)
# apply selection containing server-side functions
selection = (s for s in selection if FUNCTION.match(s))
for expr in selection:
if RELOP.search(expr):
call, op, other = RELOP.split(expr)
op = {
'<': operator.lt,
'>': operator.gt,
'!=': operator.ne,
'=': operator.eq,
'>=': operator.ge,
'<=': operator.le,
'=~': lambda a, b: re.match(b, a),
}[op]
other = ast.literal_eval(other)
else:
call, op, other = expr, operator.eq, 1
# evaluate the function call
sequence = eval_function(dataset, call, self.functions)
# is this an inplace call?
for var in walk(dataset, SequenceType):
if sequence is var:
break
else:
# get the data from the resulting variable, and use it to
# constrain the original dataset
child = list(sequence.children())[0]
data = np.fromiter(child.data, child.dtype)
if data.dtype.char == "S":
valid = np.array(
list(map(lambda v: op(str(v), str(other)), data)),
bool)
else:
valid = op(data, other)
for sequence in walk(dataset, SequenceType):
sequence.data = np.rec.fromrecords(
[tuple(row) for row in sequence.iterdata()],
names=list(sequence.keys()))[valid]
# now apply projection
if projection:
projection = fix_shorthand(projection, dataset)
base = [p for p in projection if not isinstance(p, string_types)]
func = [p for p in projection if isinstance(p, string_types)]
# apply non-function projection
out = apply_projection(base, dataset)
# apply function projection
for call in func:
var = eval_function(dataset, call, self.functions)
for child in walk(var):
parent = reduce(
operator.getitem, [out] + child.id.split('.')[:-1])
if child.name not in parent.keys():
parent[child.name] = child
break
dataset = out
# Return the original response (DDS, DAS, etc.)
path, response = req.path.rsplit('.', 1)
res = BaseHandler.responses[response](dataset)
return res(environ, start_response)
def eval_function(dataset, function, functions):
"""Evaluate a given function on a dataset.
This function parses and evaluates a (possibly nested) function call,
returning its result.
"""
name, args = FUNCTION.match(function).groups()
def tokenize(input):
start = pos = count = 0
for char in input:
if char == '(':
count += 1
elif char == ')':
count -= 1
elif char == ',' and count == 0:
yield input[start:pos]
start = pos+1
pos += 1
yield input[start:]
def parse(token):
if FUNCTION.match(token):
return eval_function(dataset, token, functions)
else:
try:
names = re.sub(r'\[.*?\]', '', str(token)).split('.')
return reduce(operator.getitem, [dataset] + names)
except:
try:
return ast.literal_eval(token)
except:
return token
args = map(parse, tokenize(args))
func = functions[name]
return func(dataset, *args)
|