270 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Perl
		
	
	
		
			Executable File
		
	
	
			
		
		
	
	
			270 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Perl
		
	
	
		
			Executable File
		
	
	
package Gearman::Taskset;
 | 
						|
 | 
						|
use strict;
 | 
						|
use Carp ();
 | 
						|
use Gearman::Util;
 | 
						|
 | 
						|
# Build a taskset bound to a client.
#
#   my $ts = Gearman::Taskset->new($client);
#
# Called with a class name, the object is allocated through fields::new.
# Called with an existing reference, that reference is (re)initialised in
# place instead.  Either way the bookkeeping slots start out empty:
#
#   waiting      server handle => arrayref of tasks awaiting a result
#   need_handle  tasks submitted but not yet acknowledged with job_created
#   loaned_sock  hostport => socket borrowed from the client
#
# Returns the initialised object.
sub new {
    my $invocant = shift;
    my Gearman::Client $client = shift;

    my $self = ref $invocant ? $invocant : fields::new($invocant);

    $self->{client}      = $client;
    $self->{waiting}     = {};
    $self->{need_handle} = [];
    $self->{loaned_sock} = {};

    return $self;
}
 | 
						|
 | 
						|
# Destructor: hand every socket this taskset borrowed back to the client.
#
# The default socket (if any) is returned under its recorded address, then
# every entry of the loaned_sock table is returned under its hostport key.
# _get_default_sock also records the default socket in loaned_sock, so the
# same socket may be handed back twice.
sub DESTROY {
    my Gearman::Taskset $ts = shift;

    # During global destruction objects are torn down in no guaranteed order,
    # so the client may already be gone.  With nobody to return sockets to,
    # bail out quietly instead of dying inside a destructor.
    my $client = $ts->{client};
    return unless $client;

    if ($ts->{default_sock}) {
        $client->_put_js_sock($ts->{default_sockaddr}, $ts->{default_sock});
    }

    while (my ($hp, $sock) = each %{ $ts->{loaned_sock} }) {
        $client->_put_js_sock($hp, $sock);
    }
}
 | 
						|
 | 
						|
# Return the socket on loan for $hostport, borrowing one from the client
# when none is held yet or the held one is no longer connected.
#
# Whatever the client hands out (possibly a false value) is recorded in the
# loaned_sock table under $hostport and returned.
sub _get_loaned_sock {
    my Gearman::Taskset $ts = shift;
    my ($hostport) = @_;

    my $held = $ts->{loaned_sock}{$hostport};

    # A held socket is reused only while it is still connected.
    return $held if $held && $held->connected;

    # Stale entry: forget it before asking the client for a replacement.
    delete $ts->{loaned_sock}{$hostport} if $held;

    my $fresh = $ts->{client}->_get_js_sock($hostport);
    $ts->{loaned_sock}{$hostport} = $fresh;
    return $fresh;
}
 | 
						|
 | 
						|
# Block until no task is left in the waiting table, pumping packets through
# _wait_for_packet for as long as any server handle is still outstanding.
#
# TODO: timeout jobs that have been running too long.  the _wait_for_packet
# loop only waits 0.5 seconds.
sub wait {
    my Gearman::Taskset $ts = shift;

    $ts->_wait_for_packet() while keys %{ $ts->{waiting} };
}
 | 
						|
 | 
						|
# ->add_task($func, <$scalar | $scalarref>, <$uniq | $opts_hashref>)
 | 
						|
#      opts:
 | 
						|
#        -- uniq
 | 
						|
#        -- on_complete
 | 
						|
#        -- on_fail
 | 
						|
#        -- on_status
 | 
						|
#        -- retry_count
 | 
						|
#        -- fail_after_idle
 | 
						|
#        -- high_priority
 | 
						|
# ->add_task(Gearman::Task)
 | 
						|
#
 | 
						|
 | 
						|
# Submit a task to a job server and wait for the server to acknowledge it.
#
# Accepts either a ready-made task object (any reference as first argument)
# or ($func, $arg, $opts), from which a Gearman::Task is built.  $arg may be
# a scalar or a scalar reference; $opts may be an options hashref or a plain
# value, which is then used as the 'uniq' option.
#
# Returns the task's handle once the submission has been acknowledged.  When
# the task has no job server socket, or no acknowledgement can be read, the
# result of $task->fail is returned instead.
#
# Dies if the submit packet cannot be written in full.
sub add_task {
    my Gearman::Taskset $ts = shift;
    my $task;

    if (ref $_[0]) {
        $task = shift;
    } else {
        my $func = shift;
        my $arg_p = shift;   # scalar or scalarref
        my $opts = shift;    # $uniq or hashref of opts

        my $argref = ref $arg_p ? $arg_p : \$arg_p;
        unless (ref $opts eq "HASH") {
            $opts = { uniq => $opts };
        }

        $task = Gearman::Task->new($func, $argref, $opts);
    }
    $task->taskset($ts);

    # NOTE(review): jssock is presumably selected while the task is attached
    # to the taskset above -- confirm against Gearman::Task.  Without a socket
    # there is nothing to submit to, so report failure rather than crashing
    # on a method call against an undefined value.
    my $jssock = $task->{jssock};
    return $task->fail unless $jssock;

    my $req = $task->pack_submit_packet;
    my $len = length($req);
    my $rv = $jssock->syswrite($req, $len);

    # syswrite returns undef on error; report the OS error in that case
    # instead of comparing undef against the expected length.
    die "Failed to write submit packet: $!" unless defined $rv;
    die "Wrote $rv but expected to write $len" unless $rv == $len;

    push @{ $ts->{need_handle} }, $task;
    while (@{ $ts->{need_handle} }) {
        my $rv = $ts->_wait_for_packet($task->{jssock});
        if (! $rv) {
            shift @{ $ts->{need_handle} };  # ditch it, it failed.
            # Diagnostics belong on STDERR; library code must not write to
            # the caller's STDOUT.
            warn " INITIAL SUBMIT FAILED\n";
            # this will resubmit it if it failed.
            return $task->fail;
        }
    }

    return $task->handle;
}
 | 
						|
 | 
						|
# Return the taskset's default job server socket, choosing one on first use.
#
# The client picks a random job server through _get_random_js_sock; the
# callback passed to it prefers a socket this taskset already has on loan for
# that server and otherwise borrows one from the client.  The chosen socket
# and its address are remembered as default_sock / default_sockaddr, and the
# socket is also recorded in the loaned_sock table.
#
# Returns the socket, or undef when the client could not provide one.
sub _get_default_sock {
    my Gearman::Taskset $ts = shift;
    return $ts->{default_sock} if $ts->{default_sock};

    my $getter = sub {
        my $hostport = shift;
        return
            $ts->{loaned_sock}{$hostport} ||
            $ts->{client}->_get_js_sock($hostport);
    };

    my ($jst, $jss) = $ts->{client}->_get_random_js_sock($getter);

    # No job server reachable: do not record an undef-keyed entry in
    # loaned_sock or an undef default address.
    return undef unless $jss;

    $ts->{loaned_sock}{$jst} ||= $jss;

    $ts->{default_sock} = $jss;
    $ts->{default_sockaddr} = $jst;
    return $jss;
}
 | 
						|
 | 
						|
# Return a socket for the job server selected by hash value $hv.
#
# Starting at index ($hv mod js_count) in the client's job_servers list, each
# server is tried in turn (wrapping around) via _get_loaned_sock; the first
# socket obtained wins.  Returns undef when no server yields a socket.
sub _get_hashed_sock {
    my Gearman::Taskset $ts = shift;
    my $hv = shift;

    my Gearman::Client $cl = $ts->{client};

    for my $off (0 .. $cl->{js_count} - 1) {
        my $idx = ($hv + $off) % ($cl->{js_count});
        my $sock = $ts->_get_loaned_sock($cl->{job_servers}[$idx]);
        return $sock if $sock;
    }

    return undef;
}
 | 
						|
 | 
						|
# Read response packets from the job servers and dispatch them.
#
# With a socket argument: reads a packet from that socket only.  Returns 0
# when nothing could be read, otherwise the result of _process_packet.
#
# Without a socket: polls the default socket and every loaned socket for up
# to 0.5 seconds and calls read_res_packet once on each socket reported
# readable.  Callers should not rely on the return value in this mode.
sub _wait_for_packet {
    my Gearman::Taskset $ts = shift;
    my $sock = shift;  # optional socket to singularly read from

    my ($res, $err);
    if ($sock) {
        $res = Gearman::Util::read_res_packet($sock, \$err);
        return 0 unless $res;
        return $ts->_process_packet($res, $sock);
    }

    # TODO: cache this vector?
    my ($rin, $rout, $eout);
    my %watching;  # fd => socket; keying by fd also de-duplicates sockets

    for my $candidate ($ts->{default_sock}, values %{ $ts->{loaned_sock} }) {
        next unless $candidate;
        my $fd = $candidate->fileno;

        # A closed handle has no descriptor; an undef fd would otherwise be
        # treated as bit 0 by vec() and make us watch STDIN.
        next unless defined $fd;

        vec($rin, $fd, 1) = 1;
        $watching{$fd} = $candidate;
    }

    my $nfound = select($rout=$rin, undef, $eout=$rin, 0.5);

    # select returns 0 on timeout and -1 on error (e.g. interrupted by a
    # signal).  -1 is a true value, and after an error the bit vectors cannot
    # be trusted, so reading from the sockets then could block.
    return 0 unless defined $nfound && $nfound > 0;

    foreach my $fd (keys %watching) {
        next unless vec($rout, $fd, 1);
        # TODO: deal with error vector
        my $ready = $watching{$fd};
        $res = Gearman::Util::read_res_packet($ready, \$err);
        $ts->_process_packet($res, $ready) if $res;
    }
    return 1;
}
 | 
						|
 | 
						|
# Format the peer address of a connected socket as "a.b.c.d:port".
#
# Returns undef when no socket is given or when the socket has no peer
# (getpeername fails), e.g. because it is not connected.
sub _ip_port {
    my ($sock) = @_;

    my $peer = $sock ? getpeername($sock) : undef;
    return undef unless $peer;

    my ($port, $packed_ip) = Socket::sockaddr_in($peer);
    return join ':', Socket::inet_ntoa($packed_ip), $port;
}
 | 
						|
 | 
						|
# Record the failure of a task identified by its jobserver-specific handle.
#
# The oldest task queued under $shandle in the waiting table is removed and
# its ->fail method is called; the table entry is dropped once its queue is
# empty.  Dies when the handle is unknown or its queue holds no task.
sub _fail_jshandle {
    my Gearman::Taskset $ts = shift;
    my ($shandle) = @_;

    my $queue = $ts->{waiting}{$shandle};
    die "Uhhhh:  got work_fail for unknown handle: $shandle\n"
        unless $queue;

    my Gearman::Task $task = shift @$queue;
    die "Uhhhh:  task_list is empty on work_fail for handle $shandle\n"
        unless $task;

    $task->fail;
    delete $ts->{waiting}{$shandle} unless @$queue;
}
 | 
						|
 | 
						|
# Dispatch one response packet received from a job server.
#
#   $res   hashref with at least 'type' and 'blobref' (reference to payload)
#   $sock  the socket the packet was read from
#
# Handled packet types:
#   job_created    the oldest task in need_handle receives its handle
#                  ("ip:port//server-handle") and moves to the waiting table
#   work_fail      the oldest task waiting on the handle is failed
#   work_complete  the oldest task waiting on the handle is completed with
#                  the payload that follows the handle
#   work_status    every task waiting on the handle gets the status update
#
# Returns 1 for every handled packet.  Dies on an unknown packet type, on a
# malformed work_complete payload, and on packets that refer to a handle
# nobody is waiting on.
sub _process_packet {
    my Gearman::Taskset $ts = shift;
    my ($res, $sock) = @_;

    my $type = $res->{type};

    if ($type eq "job_created") {
        my Gearman::Task $task = shift @{ $ts->{need_handle} } or
            die "Um, got an unexpected job_created notification";

        my $shandle = ${ $res->{'blobref'} };
        my $ipport = _ip_port($sock);

        # did sock become disconnected in the meantime?
        if (! $ipport) {
            # This task has not been entered into the waiting table yet, so
            # it must be failed directly.  Looking it up by server handle
            # would either die on an unknown handle or fail a different task
            # that happens to share the handle.
            $task->fail;
            return 1;
        }

        $task->handle("$ipport//$shandle");
        push @{ $ts->{waiting}{$shandle} ||= [] }, $task;
        return 1;
    }

    if ($type eq "work_fail") {
        my $shandle = ${ $res->{'blobref'} };
        $ts->_fail_jshandle($shandle);
        return 1;
    }

    if ($type eq "work_complete") {
        # Payload layout: server handle, NUL byte, result.  Strip the handle
        # prefix in place so blobref is left pointing at the bare result.
        ${ $res->{'blobref'} } =~ s/^(.+?)\0//
            or die "Bogus work_complete from server";
        my $shandle = $1;

        my $task_list = $ts->{waiting}{$shandle} or
            die "Uhhhh:  got work_complete for unknown handle: $shandle\n";

        my Gearman::Task $task = shift @$task_list or
            die "Uhhhh:  task_list is empty on work_complete for handle $shandle\n";

        $task->complete($res->{'blobref'});
        delete $ts->{waiting}{$shandle} unless @$task_list;

        return 1;
    }

    if ($type eq "work_status") {
        # Payload layout: server handle, numerator, denominator, NUL-separated.
        my ($shandle, $nu, $de) = split(/\0/, ${ $res->{'blobref'} });

        my $task_list = $ts->{waiting}{$shandle} or
            die "Uhhhh:  got work_status for unknown handle: $shandle\n";

        # FIXME: the server is (probably) sending a work_status packet for each
        # interested client, even if the clients are the same, so probably need
        # to fix the server not to do that.  just put this FIXME here for now,
        # though really it's a server issue.
        foreach my Gearman::Task $task (@$task_list) {
            $task->status($nu, $de);
        }

        return 1;
    }

    die "Unknown/unimplemented packet type: $type";
}
 | 
						|
 | 
						|
1;
 |