/usr/share/pyshared/gearman/worker_handler.py is in python-gearman 2.0.2-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 | import logging
from gearman.command_handler import GearmanCommandHandler
from gearman.errors import InvalidWorkerState
from gearman.protocol import GEARMAN_COMMAND_PRE_SLEEP, GEARMAN_COMMAND_RESET_ABILITIES, GEARMAN_COMMAND_CAN_DO, GEARMAN_COMMAND_SET_CLIENT_ID, GEARMAN_COMMAND_GRAB_JOB_UNIQ, \
GEARMAN_COMMAND_WORK_STATUS, GEARMAN_COMMAND_WORK_COMPLETE, GEARMAN_COMMAND_WORK_FAIL, GEARMAN_COMMAND_WORK_EXCEPTION, GEARMAN_COMMAND_WORK_WARNING, GEARMAN_COMMAND_WORK_DATA
gearman_logger = logging.getLogger(__name__)
class GearmanWorkerCommandHandler(GearmanCommandHandler):
"""GearmanWorker state machine on a per connection basis
A worker can be in the following distinct states:
SLEEP -> Doing nothing, can be awoken
AWAKE -> Transitional state (for NOOP)
AWAITING_JOB -> Holding worker level job lock and awaiting a server response
EXECUTING_JOB -> Transitional state (for ASSIGN_JOB)
"""
def __init__(self, connection_manager=None):
super(GearmanWorkerCommandHandler, self).__init__(connection_manager=connection_manager)
self._handler_abilities = []
self._client_id = None
def initial_state(self, abilities=None, client_id=None):
self.set_client_id(client_id)
self.set_abilities(abilities)
self._sleep()
##################################################################
##### Public interface methods to be called by GearmanWorker #####
##################################################################
def set_abilities(self, connection_abilities_list):
assert type(connection_abilities_list) in (list, tuple)
self._handler_abilities = connection_abilities_list
self.send_command(GEARMAN_COMMAND_RESET_ABILITIES)
for task in self._handler_abilities:
self.send_command(GEARMAN_COMMAND_CAN_DO, task=task)
def set_client_id(self, client_id):
self._client_id = client_id
if self._client_id is not None:
self.send_command(GEARMAN_COMMAND_SET_CLIENT_ID, client_id=self._client_id)
###############################################################
#### Convenience methods for typical gearman jobs to call #####
###############################################################
def send_job_status(self, current_job, numerator, denominator):
assert type(numerator) in (int, float), 'Numerator must be a numeric value'
assert type(denominator) in (int, float), 'Denominator must be a numeric value'
self.send_command(GEARMAN_COMMAND_WORK_STATUS, job_handle=current_job.handle, numerator=str(numerator), denominator=str(denominator))
def send_job_complete(self, current_job, data):
"""Removes a job from the queue if its backgrounded"""
self.send_command(GEARMAN_COMMAND_WORK_COMPLETE, job_handle=current_job.handle, data=self.encode_data(data))
def send_job_failure(self, current_job):
"""Removes a job from the queue if its backgrounded"""
self.send_command(GEARMAN_COMMAND_WORK_FAIL, job_handle=current_job.handle)
def send_job_exception(self, current_job, data):
# Using GEARMAND_COMMAND_WORK_EXCEPTION is not recommended at time of this writing [2010-02-24]
# http://groups.google.com/group/gearman/browse_thread/thread/5c91acc31bd10688/529e586405ed37fe
#
self.send_command(GEARMAN_COMMAND_WORK_EXCEPTION, job_handle=current_job.handle, data=self.encode_data(data))
def send_job_data(self, current_job, data):
self.send_command(GEARMAN_COMMAND_WORK_DATA, job_handle=current_job.handle, data=self.encode_data(data))
def send_job_warning(self, current_job, data):
self.send_command(GEARMAN_COMMAND_WORK_WARNING, job_handle=current_job.handle, data=self.encode_data(data))
###########################################################
### Callbacks when we receive a command from the server ###
###########################################################
def _grab_job(self):
self.send_command(GEARMAN_COMMAND_GRAB_JOB_UNIQ)
def _sleep(self):
self.send_command(GEARMAN_COMMAND_PRE_SLEEP)
def _check_job_lock(self):
return self.connection_manager.check_job_lock(self)
def _acquire_job_lock(self):
return self.connection_manager.set_job_lock(self, lock=True)
def _release_job_lock(self):
if not self.connection_manager.set_job_lock(self, lock=False):
raise InvalidWorkerState("Unable to release job lock for %r" % self)
return True
def recv_noop(self):
"""Transition from being SLEEP --> AWAITING_JOB / SLEEP
AWAITING_JOB -> AWAITING_JOB :: Noop transition, we're already awaiting a job
SLEEP -> AWAKE -> AWAITING_JOB :: Transition if we can acquire the worker job lock
SLEEP -> AWAKE -> SLEEP :: Transition if we can NOT acquire a worker job lock
"""
if self._check_job_lock():
pass
elif self._acquire_job_lock():
self._grab_job()
else:
self._sleep()
return True
def recv_no_job(self):
"""Transition from being AWAITING_JOB --> SLEEP
AWAITING_JOB -> SLEEP :: Always transition to sleep if we have nothing to do
"""
self._release_job_lock()
self._sleep()
return True
def recv_job_assign_uniq(self, job_handle, task, unique, data):
"""Transition from being AWAITING_JOB --> EXECUTE_JOB --> SLEEP
AWAITING_JOB -> EXECUTE_JOB -> SLEEP :: Always transition once we're given a job
"""
assert task in self._handler_abilities, '%s not found in %r' % (task, self._handler_abilities)
# After this point, we know this connection handler is holding onto the job lock so we don't need to acquire it again
if not self.connection_manager.check_job_lock(self):
raise InvalidWorkerState("Received a job when we weren't expecting one")
gearman_job = self.connection_manager.create_job(self, job_handle, task, unique, self.decode_data(data))
# Create a new job
self.connection_manager.on_job_execute(gearman_job)
# Release the job lock once we're doing and go back to sleep
self._release_job_lock()
self._sleep()
return True
def recv_job_assign(self, job_handle, task, data):
"""JOB_ASSIGN and JOB_ASSIGN_UNIQ are essentially the same"""
return self.recv_job_assign(job_handle=job_handle, task=task, unique=None, data=data)
|