270 lines
7.1 KiB
Perl
Executable File
270 lines
7.1 KiB
Perl
Executable File
package Gearman::Taskset;
|
|
|
|
use strict;
|
|
use Carp ();
|
|
use Gearman::Util;
|
|
|
|
sub new {
|
|
my $class = shift;
|
|
my Gearman::Client $client = shift;
|
|
|
|
my $self = $class;
|
|
$self = fields::new($class) unless ref $self;
|
|
|
|
$self->{waiting} = {};
|
|
$self->{need_handle} = [];
|
|
$self->{client} = $client;
|
|
$self->{loaned_sock} = {};
|
|
|
|
return $self;
|
|
}
|
|
|
|
sub DESTROY {
|
|
my Gearman::Taskset $ts = shift;
|
|
|
|
if ($ts->{default_sock}) {
|
|
$ts->{client}->_put_js_sock($ts->{default_sockaddr}, $ts->{default_sock});
|
|
}
|
|
|
|
while (my ($hp, $sock) = each %{ $ts->{loaned_sock} }) {
|
|
$ts->{client}->_put_js_sock($hp, $sock);
|
|
}
|
|
}
|
|
|
|
sub _get_loaned_sock {
|
|
my Gearman::Taskset $ts = shift;
|
|
my $hostport = shift;
|
|
if (my $sock = $ts->{loaned_sock}{$hostport}) {
|
|
return $sock if $sock->connected;
|
|
delete $ts->{loaned_sock}{$hostport};
|
|
}
|
|
|
|
my $sock = $ts->{client}->_get_js_sock($hostport);
|
|
return $ts->{loaned_sock}{$hostport} = $sock;
|
|
}
|
|
|
|
sub wait {
|
|
my Gearman::Taskset $ts = shift;
|
|
|
|
while (keys %{$ts->{waiting}}) {
|
|
$ts->_wait_for_packet();
|
|
# TODO: timeout jobs that have been running too long. the _wait_for_packet
|
|
# loop only waits 0.5 seconds.
|
|
}
|
|
}
|
|
|
|
# ->add_task($func, <$scalar | $scalarref>, <$uniq | $opts_hashref>
|
|
# opts:
|
|
# -- uniq
|
|
# -- on_complete
|
|
# -- on_fail
|
|
# -- on_status
|
|
# -- retry_count
|
|
# -- fail_after_idle
|
|
# -- high_priority
|
|
# ->add_task(Gearman::Task)
|
|
#
|
|
|
|
sub add_task {
|
|
my Gearman::Taskset $ts = shift;
|
|
my $task;
|
|
|
|
if (ref $_[0]) {
|
|
$task = shift;
|
|
} else {
|
|
my $func = shift;
|
|
my $arg_p = shift; # scalar or scalarref
|
|
my $opts = shift; # $uniq or hashref of opts
|
|
|
|
my $argref = ref $arg_p ? $arg_p : \$arg_p;
|
|
unless (ref $opts eq "HASH") {
|
|
$opts = { uniq => $opts };
|
|
}
|
|
|
|
$task = Gearman::Task->new($func, $argref, $opts);
|
|
}
|
|
$task->taskset($ts);
|
|
|
|
my $req = $task->pack_submit_packet;
|
|
my $len = length($req);
|
|
my $rv = $task->{jssock}->syswrite($req, $len);
|
|
die "Wrote $rv but expected to write $len" unless $rv == $len;
|
|
|
|
push @{ $ts->{need_handle} }, $task;
|
|
while (@{ $ts->{need_handle} }) {
|
|
my $rv = $ts->_wait_for_packet($task->{jssock});
|
|
if (! $rv) {
|
|
shift @{ $ts->{need_handle} }; # ditch it, it failed.
|
|
# this will resubmit it if it failed.
|
|
print " INITIAL SUBMIT FAILED\n";
|
|
return $task->fail;
|
|
}
|
|
}
|
|
|
|
return $task->handle;
|
|
}
|
|
|
|
sub _get_default_sock {
|
|
my Gearman::Taskset $ts = shift;
|
|
return $ts->{default_sock} if $ts->{default_sock};
|
|
|
|
my $getter = sub {
|
|
my $hostport = shift;
|
|
return
|
|
$ts->{loaned_sock}{$hostport} ||
|
|
$ts->{client}->_get_js_sock($hostport);
|
|
};
|
|
|
|
my ($jst, $jss) = $ts->{client}->_get_random_js_sock($getter);
|
|
$ts->{loaned_sock}{$jst} ||= $jss;
|
|
|
|
$ts->{default_sock} = $jss;
|
|
$ts->{default_sockaddr} = $jst;
|
|
return $jss;
|
|
}
|
|
|
|
sub _get_hashed_sock {
|
|
my Gearman::Taskset $ts = shift;
|
|
my $hv = shift;
|
|
|
|
my Gearman::Client $cl = $ts->{client};
|
|
|
|
for (my $off = 0; $off < $cl->{js_count}; $off++) {
|
|
my $idx = ($hv + $off) % ($cl->{js_count});
|
|
my $sock = $ts->_get_loaned_sock($cl->{job_servers}[$idx]);
|
|
return $sock if $sock;
|
|
}
|
|
|
|
return undef;
|
|
}
|
|
|
|
# returns boolean when given a sock to wait on.
|
|
# otherwise, return value is undefined.
|
|
sub _wait_for_packet {
|
|
my Gearman::Taskset $ts = shift;
|
|
my $sock = shift; # optional socket to singularly read from
|
|
|
|
my ($res, $err);
|
|
if ($sock) {
|
|
$res = Gearman::Util::read_res_packet($sock, \$err);
|
|
return 0 unless $res;
|
|
return $ts->_process_packet($res, $sock);
|
|
} else {
|
|
# TODO: cache this vector?
|
|
my ($rin, $rout, $eout);
|
|
my %watching;
|
|
|
|
for my $sock ($ts->{default_sock}, values %{ $ts->{loaned_sock} }) {
|
|
next unless $sock;
|
|
my $fd = $sock->fileno;
|
|
|
|
vec($rin, $fd, 1) = 1;
|
|
$watching{$fd} = $sock;
|
|
}
|
|
|
|
my $nfound = select($rout=$rin, undef, $eout=$rin, 0.5);
|
|
return 0 if ! $nfound;
|
|
|
|
foreach my $fd (keys %watching) {
|
|
next unless vec($rout, $fd, 1);
|
|
# TODO: deal with error vector
|
|
my $sock = $watching{$fd};
|
|
$res = Gearman::Util::read_res_packet($sock, \$err);
|
|
$ts->_process_packet($res, $sock) if $res;
|
|
}
|
|
return 1;
|
|
|
|
}
|
|
}
|
|
|
|
sub _ip_port {
|
|
my $sock = shift;
|
|
return undef unless $sock;
|
|
my $pn = getpeername($sock) or return undef;
|
|
my ($port, $iaddr) = Socket::sockaddr_in($pn);
|
|
return Socket::inet_ntoa($iaddr) . ":$port";
|
|
}
|
|
|
|
# note the failure of a task given by its jobserver-specific handle
|
|
sub _fail_jshandle {
|
|
my Gearman::Taskset $ts = shift;
|
|
my $shandle = shift;
|
|
|
|
my $task_list = $ts->{waiting}{$shandle} or
|
|
die "Uhhhh: got work_fail for unknown handle: $shandle\n";
|
|
|
|
my Gearman::Task $task = shift @$task_list or
|
|
die "Uhhhh: task_list is empty on work_fail for handle $shandle\n";
|
|
|
|
$task->fail;
|
|
delete $ts->{waiting}{$shandle} unless @$task_list;
|
|
}
|
|
|
|
sub _process_packet {
|
|
my Gearman::Taskset $ts = shift;
|
|
my ($res, $sock) = @_;
|
|
|
|
if ($res->{type} eq "job_created") {
|
|
my Gearman::Task $task = shift @{ $ts->{need_handle} } or
|
|
die "Um, got an unexpected job_created notification";
|
|
|
|
my $shandle = ${ $res->{'blobref'} };
|
|
my $ipport = _ip_port($sock);
|
|
|
|
# did sock become disconnected in the meantime?
|
|
if (! $ipport) {
|
|
$ts->_fail_jshandle($shandle);
|
|
return 1;
|
|
}
|
|
|
|
$task->handle("$ipport//$shandle");
|
|
push @{ $ts->{waiting}{$shandle} ||= [] }, $task;
|
|
return 1;
|
|
}
|
|
|
|
if ($res->{type} eq "work_fail") {
|
|
my $shandle = ${ $res->{'blobref'} };
|
|
$ts->_fail_jshandle($shandle);
|
|
return 1;
|
|
}
|
|
|
|
if ($res->{type} eq "work_complete") {
|
|
${ $res->{'blobref'} } =~ s/^(.+?)\0//
|
|
or die "Bogus work_complete from server";
|
|
my $shandle = $1;
|
|
|
|
my $task_list = $ts->{waiting}{$shandle} or
|
|
die "Uhhhh: got work_complete for unknown handle: $shandle\n";
|
|
|
|
my Gearman::Task $task = shift @$task_list or
|
|
die "Uhhhh: task_list is empty on work_complete for handle $shandle\n";
|
|
|
|
$task->complete($res->{'blobref'});
|
|
delete $ts->{waiting}{$shandle} unless @$task_list;
|
|
|
|
return 1;
|
|
}
|
|
|
|
if ($res->{type} eq "work_status") {
|
|
my ($shandle, $nu, $de) = split(/\0/, ${ $res->{'blobref'} });
|
|
|
|
my $task_list = $ts->{waiting}{$shandle} or
|
|
die "Uhhhh: got work_status for unknown handle: $shandle\n";
|
|
|
|
# FIXME: the server is (probably) sending a work_status packet for each
|
|
# interested client, even if the clients are the same, so probably need
|
|
# to fix the server not to do that. just put this FIXME here for now,
|
|
# though really it's a server issue.
|
|
foreach my Gearman::Task $task (@$task_list) {
|
|
$task->status($nu, $de);
|
|
}
|
|
|
|
return 1;
|
|
}
|
|
|
|
die "Unknown/unimplemented packet type: $res->{type}";
|
|
|
|
}
|
|
|
|
1;
|