This file is indexed.

/usr/share/perl5/Gearman/Client/Async.pm is in libgearman-client-async-perl 0.94-3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package Gearman::Client::Async;

=head1 NAME

Gearman::Client::Async - Asynchronous client module for Gearman for Danga::Socket applications

=head1 SYNOPSIS

    use Gearman::Client::Async;

    # Instantiate a new Gearman::Client::Async object.
    $client = Gearman::Client::Async->new(
        job_servers => [ '127.0.0.1', '192.168.0.1:123' ],
    );

    # Overwrite job server list with a new one.
    $client->set_job_servers( '10.0.0.1' );

    # Read list of job servers out of the client.
    $arrayref = $client->job_servers;
    @array = $client->job_servers;

    # Start a task
    $task = Gearman::Task->new(...); # with callbacks, etc
    $client->add_task( $task );

=head1 COPYRIGHT

Copyright 2006 Six Apart, Ltd.

License granted to use/distribute under the same terms as Perl itself.

=head1 WARRANTY

This is free software.  This comes with no warranty whatsoever.

=head1 AUTHORS

 Brad Fitzpatrick (brad@danga.com)
 Jonathan Steinert (hachi@cpan.org)

=cut

use strict;
use warnings;
use Carp qw(croak);

# Declare the instance fields of this class through the 'fields' pragma.
# Objects are created with fields::new() in new(), and the typed lexicals
# used in the methods ("my Gearman::Client::Async $self") refer to this list.
use fields (
            'job_servers',   # arrayref of Gearman::Client::Async::Connection objects
            't_no_random',   # don't randomize job server to use:  use first alive one.
            't_offline_host', # hashref: hostname -> $bool, if host should act as offline, for testing
            );

use Danga::Socket 1.52;
use Gearman::Objects;
use Gearman::Task;
use Gearman::JobStatus;
use Gearman::Client::Async::Connection;

use List::Util qw(first);
use vars qw($VERSION);

$VERSION = "0.94";

# Compile-time debug switch; set the body to 1 to enable the diagnostic
# warn() calls guarded by "if DEBUGGING".  Keep the empty prototype and the
# bare constant body: that exact form is what makes the sub a candidate for
# constant inlining (an explicit "return" would prevent it).
sub DEBUGGING () { 0 }

# Constructor.
#
# Accepts a single named option:
#   job_servers => arrayref of hostspecs, forwarded to set_job_servers().
#
# May be invoked on a class name (a new fields-based object is allocated) or
# on an existing reference, which is then re-initialised and returned.
# Croaks if any option other than job_servers is supplied.
sub new {
    my $invocant = shift;
    my %args     = @_;

    # Only allocate a fresh object when we were not handed one already.
    my $self = ref $invocant ? $invocant : fields::new($invocant);

    $self->{t_offline_host} = {};
    $self->{job_servers}    = [];

    if (my $servers = delete $args{job_servers}) {
        $self->set_job_servers(@$servers);
    }

    # Anything still left in %args is an option we do not understand.
    if (%args) {
        croak "Unknown parameters: " . join(", ", keys %args);
    }

    return $self;
}

# Test hook: store the given flag in t_no_random.  When the flag is true,
# add_task uses the first alive job server for unhashed tasks instead of a
# random one.
sub t_set_disable_random {
    my ($self, $flag) = @_;
    $self->{t_no_random} = $flag;
}

# Test hook: mark the job server whose hostspec equals $host as offline
# (or online again, when a defined false $val is passed).
#
#   $host - hostspec to look up among the current connections
#   $val  - flag handed to the connection; defaults to 1 when undefined
#
# The flag is recorded in t_offline_host before the connection lookup, so it
# is stored even when the lookup fails.  Dies if no connection matches.
sub t_set_offline_host {
    my $self    = shift;
    my $host    = shift;
    my $offline = shift;

    $offline = 1 if !defined $offline;
    $self->{t_offline_host}{$host} = $offline;

    my $conn = first { $_->hostspec eq $host } @{ $self->{job_servers} };
    die "No host found with that spec to mark offline" unless $conn;

    $conn->t_set_offline($offline);
}

# Replace the job server list with the given hostspecs.
#
# Connections whose hostspec appears in the new list are reused rather than
# reopened; existing connections that are no longer wanted are asked to
# close once they have finished.  Hostspecs without an existing connection
# get a new Gearman::Client::Async::Connection.  The resulting list follows
# the order of the arguments.
sub set_job_servers {
    my Gearman::Client::Async $self = shift;
    my @hostspecs = @_;

    # hostspec -> 1 for every server requested by the caller
    my %wanted = map { ($_ => 1) } @hostspecs;

    # hostspec -> connection we already hold and want to keep
    my %keep;
    for my $conn (@{ $self->{job_servers} }) {
        my $spec = $conn->hostspec;
        unless ($wanted{$spec}) {
            $conn->close_when_finished;
            next;
        }
        $keep{$spec} = $conn;
    }

    my @conns;
    for my $spec (@hostspecs) {
        push @conns,
            $keep{$spec} || Gearman::Client::Async::Connection->new( hostspec => $spec );
    }

    $self->{job_servers} = \@conns;
}

# Read-only accessor for the configured job servers.
#
# Returns the hostspec of every connection: as a list in list context,
# otherwise as a reference to a freshly built array.  Croaks when called
# with arguments; use set_job_servers to change the list.
sub job_servers {
    my Gearman::Client::Async $self = shift;
    croak "Not a setter" if @_;

    my @hostspecs;
    push @hostspecs, $_->hostspec for @{ $self->{job_servers} };

    return \@hostspecs unless wantarray;
    return @hostspecs;
}

# Dispatch a Gearman::Task to one of the currently alive job servers.
#
#   $task - the Gearman::Task to run
#
# Server selection, performed on every attempt:
#   * if no job server is alive, the task is failed with final_fail;
#   * if $task->hash is defined, the alive server at index
#     (hash modulo number of alive servers) is used;
#   * otherwise a random alive server is used, or the first alive one when
#     t_no_random is set.
#
# The chosen connection is asked to get into its ready state.  Once ready,
# the task is handed to the connection; if the connection reports an error
# instead, the whole selection is attempted again.
sub add_task {
    my Gearman::Client::Async $self = shift;
    my Gearman::Task $task = shift;

    # The closure refers to itself through $try_again so that it can be used
    # as its own error callback.  Both terminal paths below reset the
    # variable to undef, presumably to break that self-reference.
    my $try_again;
    $try_again = sub {

        my @job_servers = grep { $_->alive } @{$self->{job_servers}};
        warn "Alive servers: " . @job_servers . " out of " . @{$self->{job_servers}} . "\n" if DEBUGGING;
        unless (@job_servers) {
            $task->final_fail;
            $try_again = undef;
            return;
        }

        # Single-element access: use the scalar form $job_servers[...] rather
        # than a one-element slice, which draws a warning under
        # "use warnings".
        my $js;
        if (defined( my $hash = $task->hash )) {
            # Task is hashed, use key to fetch job server
            $js = $job_servers[$hash % @job_servers];
        }
        else {
            # Task is not hashed, random job server
            $js = $job_servers[$self->{t_no_random} ? 0 :
                               int( rand( @job_servers ))];
        }

        # TODO Fix this violation of object privacy.
        $task->{taskset} = $self;

        $js->get_in_ready_state(
            # on_ready:
            sub {
                # Arm a timer only when the task carries a true timeout.
                my $timer;
                if (my $timeout = $task->{timeout}) {
                    $timer = Danga::Socket->AddTimer($timeout, sub {
                        $task->final_fail('timeout');
                    });
                }
                $task->set_on_post_hooks(sub {
                    $timer->cancel if $timer;

                    # ALSO clean up our $js (connection's) waiting stuff:
                    $js->give_up_on($task);
                });
                $js->add_task( $task );
                $try_again = undef;
            },
            # on_error:
            $try_again,
        );
    };
    $try_again->();
}

# Gearman::Client::Async sometimes poses, duck-typing style, as a
# Gearman::Taskset.  A task "set" makes no sense in an async world: there is
# no need to wait on a group of tasks, since everything happens at its own
# pace.  To satisfy that interface we still have to provide the taskset's
# "client" method, and in our case the client is simply this object.
sub client {
    my ($self) = @_;
    return $self;
}

# The base Gearman libraries ask a Gearman::Client-like object for its
# prefix.  This module does not currently support prefixes, so the answer is
# always the empty string.
sub prefix {
    return "";
}


1;