/usr/share/pyshared/gearman/admin_client_handler.py is in python-gearman 2.0.2-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 | import collections
import logging
from gearman.command_handler import GearmanCommandHandler
from gearman.errors import ProtocolError, InvalidAdminClientState
from gearman.protocol import GEARMAN_COMMAND_ECHO_REQ, GEARMAN_COMMAND_TEXT_COMMAND, \
GEARMAN_SERVER_COMMAND_STATUS, GEARMAN_SERVER_COMMAND_VERSION, \
GEARMAN_SERVER_COMMAND_WORKERS, GEARMAN_SERVER_COMMAND_MAXQUEUE, GEARMAN_SERVER_COMMAND_SHUTDOWN
gearman_logger = logging.getLogger(__name__)
EXPECTED_GEARMAN_SERVER_COMMANDS = set([GEARMAN_SERVER_COMMAND_STATUS, GEARMAN_SERVER_COMMAND_VERSION, \
GEARMAN_SERVER_COMMAND_WORKERS, GEARMAN_SERVER_COMMAND_MAXQUEUE, GEARMAN_SERVER_COMMAND_SHUTDOWN])
class GearmanAdminClientCommandHandler(GearmanCommandHandler):
"""Special GEARMAN_COMMAND_TEXT_COMMAND command handler that'll parse text responses from the server"""
STATUS_FIELDS = 4
WORKERS_FIELDS = 4
def __init__(self, connection_manager=None):
super(GearmanAdminClientCommandHandler, self).__init__(connection_manager=connection_manager)
self._sent_commands = collections.deque()
self._recv_responses = collections.deque()
self._status_response = []
self._workers_response = []
#######################################################################
##### Public interface methods to be called by GearmanAdminClient #####
#######################################################################
@property
def response_ready(self):
return bool(self._recv_responses)
def pop_response(self):
if not self._sent_commands or not self._recv_responses:
raise InvalidAdminClientState('Attempted to pop a response for a command that is not ready')
sent_command = self._sent_commands.popleft()
recv_response = self._recv_responses.popleft()
return sent_command, recv_response
def send_text_command(self, command_line):
"""Send our administrative text command"""
expected_server_command = None
for server_command in EXPECTED_GEARMAN_SERVER_COMMANDS:
if command_line.startswith(server_command):
expected_server_command = server_command
break
if not expected_server_command:
raise ProtocolError('Attempted to send an unknown server command: %r' % command_line)
self._sent_commands.append(expected_server_command)
output_text = '%s\n' % command_line
self.send_command(GEARMAN_COMMAND_TEXT_COMMAND, raw_text=output_text)
def send_echo_request(self, echo_string):
"""Send our administrative text command"""
self._sent_commands.append(GEARMAN_COMMAND_ECHO_REQ)
self.send_command(GEARMAN_COMMAND_ECHO_REQ, data=echo_string)
###########################################################
### Callbacks when we receive a command from the server ###
###########################################################
def recv_echo_res(self, data):
self._recv_responses.append(data)
return False
def recv_text_command(self, raw_text):
"""Catch GEARMAN_COMMAND_TEXT_COMMAND's and forward them onto their respective recv_server_* callbacks"""
if not self._sent_commands:
raise InvalidAdminClientState('Received an unexpected server response')
# Peek at the first command
cmd_type = self._sent_commands[0]
recv_server_command_function_name = 'recv_server_%s' % cmd_type
cmd_callback = getattr(self, recv_server_command_function_name, None)
if not cmd_callback:
gearman_logger.error('Could not handle command: %r - %r' % (cmd_type, raw_text))
raise ValueError('Could not handle command: %r - %r' % (cmd_type, raw_text))
# This must match the parameter names as defined in the command handler
completed_work = cmd_callback(raw_text)
return completed_work
def recv_server_status(self, raw_text):
"""Slowly assemble a server status message line by line"""
# If we received a '.', we've finished parsing this status message
# Pack up our output and reset our response queue
if raw_text == '.':
output_response = tuple(self._status_response)
self._recv_responses.append(output_response)
self._status_response = []
return False
# If we didn't get a final response, split our line and interpret all the data
split_tokens = raw_text.split('\t')
if len(split_tokens) != self.STATUS_FIELDS:
raise ProtocolError('Received %d tokens, expected %d tokens: %r' % (len(split_tokens), self.STATUS_FIELDS, split_tokens))
# Label our fields and make the results Python friendly
task, queued_count, running_count, worker_count = split_tokens
status_dict = {}
status_dict['task'] = task
status_dict['queued'] = int(queued_count)
status_dict['running'] = int(running_count)
status_dict['workers'] = int(worker_count)
self._status_response.append(status_dict)
return True
def recv_server_version(self, raw_text):
"""Version response is a simple passthrough"""
self._recv_responses.append(raw_text)
return False
def recv_server_workers(self, raw_text):
"""Slowly assemble a server workers message line by line"""
# If we received a '.', we've finished parsing this workers message
# Pack up our output and reset our response queue
if raw_text == '.':
output_response = tuple(self._workers_response)
self._recv_responses.append(output_response)
self._workers_response = []
return False
split_tokens = raw_text.split(' ')
if len(split_tokens) < self.WORKERS_FIELDS:
raise ProtocolError('Received %d tokens, expected >= 4 tokens: %r' % (len(split_tokens), split_tokens))
if split_tokens[3] != ':':
raise ProtocolError('Malformed worker response: %r' % (split_tokens, ))
# Label our fields and make the results Python friendly
worker_dict = {}
worker_dict['file_descriptor'] = split_tokens[0]
worker_dict['ip'] = split_tokens[1]
worker_dict['client_id'] = split_tokens[2]
worker_dict['tasks'] = tuple(split_tokens[4:])
self._workers_response.append(worker_dict)
return True
def recv_server_maxqueue(self, raw_text):
"""Maxqueue response is a simple passthrough"""
if raw_text != 'OK':
raise ProtocolError("Expected 'OK', received: %s" % raw_text)
self._recv_responses.append(raw_text)
return False
def recv_server_shutdown(self, raw_text):
"""Shutdown response is a simple passthrough"""
self._recv_responses.append(None)
return False
|