This file is indexed.

/usr/share/pyshared/gearman/client_handler.py is in python-gearman 2.0.2-2build1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import collections
import time
import logging

from gearman.command_handler import GearmanCommandHandler
from gearman.constants import JOB_UNKNOWN, JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE
from gearman.errors import InvalidClientState
from gearman.protocol import GEARMAN_COMMAND_GET_STATUS, submit_cmd_for_background_priority

gearman_logger = logging.getLogger(__name__)

class GearmanClientCommandHandler(GearmanCommandHandler):
    """Maintains the state of this connection on behalf of a GearmanClient"""
    def __init__(self, connection_manager=None):
        """Create the handler and its empty request-tracking structures.

        connection_manager is handed unchanged to the parent
        GearmanCommandHandler constructor.
        """
        super(GearmanClientCommandHandler, self).__init__(connection_manager=connection_manager)

        # Requests that were submitted but have no handle yet. Handles are
        # returned in submission order, so a FIFO queue pairs them up.
        self.requests_awaiting_handles = collections.deque()

        # Requests that do have a handle, keyed by that handle.
        self.handle_to_request_map = {}

    ##################################################################
    ##### Public interface methods to be called by GearmanClient #####
    ##################################################################
    def send_job_request(self, current_request):
        """Submit a newly created job request and queue it until a handle arrives.

        The request must be in state JOB_UNKNOWN, otherwise InvalidClientState
        is raised. After the submit command has been sent the request is moved
        to JOB_PENDING and appended to the queue of requests awaiting handles.
        """
        self._assert_request_state(current_request, JOB_UNKNOWN)

        job = current_request.job

        # The background flag and the priority together pick the submit COMMAND
        submit_cmd = submit_cmd_for_background_priority(current_request.background, current_request.priority)

        encoded_payload = self.encode_data(job.data)
        self.send_command(submit_cmd, task=job.task, unique=job.unique, data=encoded_payload)

        # The command is on its way; the request now waits in line for its handle
        current_request.state = JOB_PENDING
        self.requests_awaiting_handles.append(current_request)

    def send_get_status_of_job(self, current_request):
        """Ask the server for the status of the job behind current_request.

        The request is registered under its job handle first, so that it can
        be looked up by handle when a reply is processed, and then the
        GET_STATUS command is sent for that handle.
        """
        self._register_request(current_request)

        status_handle = current_request.job.handle
        self.send_command(GEARMAN_COMMAND_GET_STATUS, job_handle=status_handle)

    def on_io_error(self):
        """Reset the state of every tracked request to JOB_UNKNOWN.

        Covers both the requests still waiting for a handle and the requests
        already registered under a handle. Neither collection is emptied here;
        only the state attribute of each request is changed.
        """
        for pending_request in self.requests_awaiting_handles:
            pending_request.state = JOB_UNKNOWN

        # values() instead of itervalues(): itervalues() only exists on
        # Python 2 dicts, whereas values() works on both Python 2 and 3.
        for inflight_request in self.handle_to_request_map.values():
            inflight_request.state = JOB_UNKNOWN

    def _register_request(self, current_request):
        self.handle_to_request_map[current_request.job.handle] = current_request

    def _unregister_request(self, current_request):
        # De-allocate this request for all jobs
        return self.handle_to_request_map.pop(current_request.job.handle, None)

    ##################################################################
    ## Gearman command callbacks with kwargs defined by protocol.py ##
    ##################################################################
    def _assert_request_state(self, current_request, expected_state):
        """Raise InvalidClientState unless current_request is in expected_state."""
        observed_state = current_request.state
        if observed_state != expected_state:
            details = (current_request.job.handle, expected_state, observed_state)
            raise InvalidClientState('Expected handle (%s) to be in state %r, got %r' % details)

    def recv_job_created(self, job_handle):
        """Handle JOB_CREATED: give the oldest waiting request its server handle.

        Raises InvalidClientState when no request is waiting for a handle, or
        when the request taken from the front of the queue is not in state
        JOB_PENDING (it has already been removed from the queue at that point).
        Returns True on success.
        """
        waiting = self.requests_awaiting_handles
        if not waiting:
            raise InvalidClientState('Received a job_handle with no pending requests')

        # Handles are matched to requests in submission order
        oldest_request = waiting.popleft()
        self._assert_request_state(oldest_request, JOB_PENDING)

        # The request is now known to the server: record the handle and track it by handle
        oldest_request.job.handle = job_handle
        oldest_request.state = JOB_CREATED
        self._register_request(oldest_request)

        return True

    def recv_work_data(self, job_handle, data):
        """Handle WORK_DATA: queue the decoded payload on the request's data updates.

        Raises KeyError when job_handle is not tracked and InvalidClientState
        when the request is not in state JOB_CREATED. Returns True.
        """
        tracked_request = self.handle_to_request_map[job_handle]
        self._assert_request_state(tracked_request, JOB_CREATED)

        decoded_update = self.decode_data(data)
        tracked_request.data_updates.append(decoded_update)

        return True

    def recv_work_warning(self, job_handle, data):
        """Handle WORK_WARNING: queue the decoded payload on the request's warning updates.

        Raises KeyError when job_handle is not tracked and InvalidClientState
        when the request is not in state JOB_CREATED. Returns True.
        """
        tracked_request = self.handle_to_request_map[job_handle]
        self._assert_request_state(tracked_request, JOB_CREATED)

        decoded_warning = self.decode_data(data)
        tracked_request.warning_updates.append(decoded_warning)

        return True

    def recv_work_status(self, job_handle, numerator, denominator):
        """Handle WORK_STATUS: replace the request's status with a fresh progress report.

        The stored status marks the job as known and running and is stamped
        with the local time of receipt. Raises KeyError when job_handle is not
        tracked and InvalidClientState when the request is not in state
        JOB_CREATED. Returns True.
        """
        tracked_request = self.handle_to_request_map[job_handle]
        self._assert_request_state(tracked_request, JOB_CREATED)

        # The protocol spec does not pin down the type of numerator/denominator,
        # but according to Eskil, gearmand interprets them as integers
        progress_report = dict(
            handle=job_handle,
            known=True,
            running=True,
            numerator=int(numerator),
            denominator=int(denominator),
            time_received=time.time()
        )
        tracked_request.status = progress_report

        return True

    def recv_work_complete(self, job_handle, data):
        """Handle WORK_COMPLETE: store the decoded result and finish the request.

        The request moves to JOB_COMPLETE and is no longer tracked by handle.
        Raises KeyError when job_handle is not tracked and InvalidClientState
        when the request is not in state JOB_CREATED. Returns True.
        """
        finished_request = self.handle_to_request_map[job_handle]
        self._assert_request_state(finished_request, JOB_CREATED)

        decoded_result = self.decode_data(data)
        finished_request.result = decoded_result
        finished_request.state = JOB_COMPLETE

        # Nothing more is expected for this handle
        self._unregister_request(finished_request)

        return True

    def recv_work_fail(self, job_handle):
        """Handle WORK_FAIL: mark the request as failed and stop tracking it.

        Raises KeyError when job_handle is not tracked and InvalidClientState
        when the request is not in state JOB_CREATED. Returns True.
        """
        failed_request = self.handle_to_request_map[job_handle]
        self._assert_request_state(failed_request, JOB_CREATED)

        failed_request.state = JOB_FAILED

        # Nothing more is expected for this handle
        self._unregister_request(failed_request)

        return True

    def recv_work_exception(self, job_handle, data):
        """Handle WORK_EXCEPTION: store the decoded exception payload on the request.

        The request's state is left untouched and it stays registered under
        its handle. Raises KeyError when job_handle is not tracked and
        InvalidClientState when the request is not in state JOB_CREATED.
        Returns True.
        """
        # Using GEARMAND_COMMAND_WORK_EXCEPTION is not recommended at time of this writing [2010-02-24]
        # http://groups.google.com/group/gearman/browse_thread/thread/5c91acc31bd10688/529e586405ed37fe
        tracked_request = self.handle_to_request_map[job_handle]
        self._assert_request_state(tracked_request, JOB_CREATED)

        decoded_exception = self.decode_data(data)
        tracked_request.exception = decoded_exception

        return True

    def recv_status_res(self, job_handle, known, running, numerator, denominator):
        # If we received a STATUS_RES update about this request, update our known status
        current_request = self.handle_to_request_map[job_handle]

        job_known = bool(known == '1')
        # Make our status response Python friendly
        current_request.status = {
            'handle': job_handle,
            'known': job_known,
            'running': bool(running == '1'),
            'numerator': int(numerator),
            'denominator': int(denominator),
            'time_received': time.time()
        }

        # If the server doesn't know about this request, we no longer need to track it
        if not job_known:
            self._unregister_request(current_request)

        return True