/usr/lib/python2.7/dist-packages/flower/utils/search.py is in python-flower 0.8.3+dfsg-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | import re
def parse_search_terms(raw_search_value):
    """Parse a raw search string into a dict of search terms.

    The string is tokenized on whitespace and commas; a double-quoted
    section stays inside its token even when it contains whitespace or
    commas. Each token is then classified by its prefix:

    * ``result:<value>``       -> ``parsed['result']`` (last one wins)
    * ``args:<value>``         -> appended to the list ``parsed['args']``
    * ``kwargs:<key>=<value>`` -> ``parsed['kwargs'][key]``
    * anything else            -> ``parsed['any']`` (last one wins)

    Values are passed through ``preprocess_search_value`` before being
    stored. A ``kwargs:`` token without an ``=`` separator is ignored
    instead of raising ``ValueError``; when the token contains several
    ``=`` signs, only the first one separates key from value.

    Returns an empty dict for an empty or ``None`` input.
    """
    # A token is a run of characters that are not whitespace, commas or
    # double quotes, freely mixed with double-quoted sections (which may
    # contain anything, including backslash-escaped characters).
    search_regexp = r'(?:[^\s,"]|"(?:\\.|[^"])*")+'
    if not raw_search_value:
        return {}

    parsed_search = {}
    for query_part in re.findall(search_regexp, raw_search_value):
        if query_part.startswith('result:'):
            parsed_search['result'] = preprocess_search_value(query_part[len('result:'):])
        elif query_part.startswith('args:'):
            parsed_search.setdefault('args', []).append(
                preprocess_search_value(query_part[len('args:'):])
            )
        elif query_part.startswith('kwargs:'):
            # partition() never raises: a missing '=' yields an empty
            # separator, and any further '=' stays part of the value.
            key, separator, value = query_part[len('kwargs:'):].partition('=')
            if not separator:
                continue  # malformed "kwargs:name" without a value
            parsed_search.setdefault('kwargs', {})[key.strip()] = preprocess_search_value(value.strip())
        else:
            parsed_search['any'] = preprocess_search_value(query_part)
    return parsed_search
def satisfies_search_terms(task, any_value_search_term, result_search_term, args_search_terms, kwargs_search_terms):
    """Return True when ``task`` matches at least one of the given search terms.

    ``task`` must expose ``args``, ``kwargs`` and ``result`` attributes.
    ``args`` and ``kwargs`` are used as strings here (they are joined and
    substring-searched); presumably they hold stringified reprs -- confirm
    against the caller. Any of the three may be ``None``.

    :param any_value_search_term: substring looked up in the concatenation
        of args, kwargs and ``str(result)``.
    :param result_search_term: substring looked up in ``task.result``.
    :param args_search_terms: values that must all occur in ``task.args``.
    :param kwargs_search_terms: mapping of key -> value that must all be
        present in the stringified ``task.kwargs``.

    The individual checks are OR-ed together. When no search term is
    supplied at all, every task matches.
    """
    if not any([any_value_search_term, result_search_term, args_search_terms, kwargs_search_terms]):
        return True

    # A field that is still None is treated as empty so that it simply
    # fails to match instead of raising TypeError/AttributeError.
    task_args = task.args if task.args is not None else ''
    task_kwargs = task.kwargs if task.kwargs is not None else ''
    task_result = task.result

    terms = [
        any_value_search_term and any_value_search_term in '|'.join([task_args, task_kwargs, str(task_result)]),
        # A task without a result can never satisfy a result search.
        result_search_term and task_result is not None and result_search_term in task_result,
        kwargs_search_terms and all(
            stringified_dict_contains_value(k, v, task_kwargs) for k, v in kwargs_search_terms.items()
        ),
        args_search_terms and task_args_contains_search_args(task_args, args_search_terms),
    ]
    return any(terms)
def stringified_dict_contains_value(key, value, str_dict):
    """Check whether a stringified dict contains the given key/value pair.

    ``str_dict`` is a dict in the form of a string such as ``"{'test': 5}"``.
    Scanning the string is faster than rebuilding an actual dict from it,
    which matters because this is called for every task during a kwargs
    search.

    The key is only recognised when it appears as a complete quoted key
    followed by ``": "`` (single or double quotes), so searching for ``a``
    is not confused by ``'banana'`` or by an ``a`` inside some value. The
    value is the text up to the next ``,`` or ``}``, or up to the end of the
    string when neither follows, with surrounding quotes stripped; it is
    compared against ``str(value)``.

    Limitation: values that themselves contain a comma (lists, tuples,
    strings with commas) are cut at that comma and will not match.

    Returns False when ``str_dict`` is empty or ``None``, or when the key is
    not present.
    """
    if not str_dict:
        return False
    expected = str(value)

    # Locate the earliest position right after "<quote>key<quote>: ".
    value_starts = []
    for quote in ("'", '"'):
        needle = quote + key + quote + ': '
        position = str_dict.find(needle)
        if position != -1:
            value_starts.append(position + len(needle))
    if not value_starts:  # key not found
        return False
    value_start = min(value_starts)

    value_end = str_dict.find(',', value_start)
    if value_end == -1:  # last value in dict
        value_end = str_dict.find('}', value_start)
    if value_end == -1:  # no closing brace either: take the remainder
        value_end = len(str_dict)
    return expected == str_dict[value_start:value_end].strip('"\'')
def preprocess_search_value(raw_value):
    """Normalize a raw search value.

    Leading and trailing double quotes and spaces are removed. Any falsy
    input (``None``, empty string) becomes the empty string.
    """
    if not raw_value:
        return ''
    return raw_value.strip('" ')
def task_args_contains_search_args(task_args, search_args):
    """Return True when every entry of ``search_args`` occurs in ``task_args``.

    Membership is tested with ``in``, so for a string ``task_args`` this is
    a substring check. An empty ``search_args`` is trivially satisfied.
    """
    for wanted in search_args:
        if wanted not in task_args:
            return False
    return True
|