ljr/wcmtools/gearman/server/gearmand

695 lines
16 KiB
Perl
Executable File

#!/usr/bin/perl
#
# Gearman
#
# Status: 2005-04-13
#
# Copyright 2005, Danga Interactive
#
# Authors:
# Brad Fitzpatrick <brad@danga.com>
# Brad Whitaker <whitaker@danga.com>
#
# License:
# terms of Perl itself.
#
use strict;
use Getopt::Long;
use Carp;
use Danga::Socket;
use IO::Socket::INET;
use POSIX ();
use lib '../lib';
use Gearman::Util;
use vars qw($DEBUG);
$DEBUG = 0;
my (
$daemonize,
$nokeepalive,
);
my $conf_port = 7003;
Getopt::Long::GetOptions(
'd|daemon' => \$daemonize,
'p|port=i' => \$conf_port,
'debug=i' => \$DEBUG,
);
daemonize() if $daemonize;
use Socket qw(IPPROTO_TCP TCP_NODELAY SOL_SOCKET);
$SIG{'PIPE'} = "IGNORE"; # handled manually
# establish SERVER socket, bind and listen.
my $server = IO::Socket::INET->new(LocalPort => $conf_port,
Type => SOCK_STREAM,
Proto => IPPROTO_TCP,
Blocking => 0,
Reuse => 1,
Listen => 10 )
or die "Error creating socket: $@\n";
# Not sure if I'm crazy or not, but I can't see in strace where/how
# Perl 5.6 sets blocking to 0 without this. In Perl 5.8, IO::Socket::INET
# obviously sets it from watching strace.
IO::Handle::blocking($server, 0);
my $accept_handler = sub {
my $csock = $server->accept();
return unless $csock;
printf("Listen child making a Client for %d.\n", fileno($csock))
if $DEBUG;
IO::Handle::blocking($csock, 0);
setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
my $client = Client->new($csock);
$client->watch_read(1);
};
Client->OtherFds(fileno($server) => $accept_handler);
sub daemonize {
my($pid, $sess_id, $i);
## Fork and exit parent
if ($pid = fork) { exit 0; }
## Detach ourselves from the terminal
croak "Cannot detach from controlling terminal"
unless $sess_id = POSIX::setsid();
## Prevent possibility of acquiring a controling terminal
$SIG{'HUP'} = 'IGNORE';
if ($pid = fork) { exit 0; }
## Change working directory
chdir "/";
## Clear file creation mask
umask 0;
## Close open file descriptors
close(STDIN);
close(STDOUT);
close(STDERR);
## Reopen stderr, stdout, stdin to /dev/null
open(STDIN, "+>/dev/null");
open(STDOUT, "+>&STDIN");
open(STDERR, "+>&STDIN");
}
#####################################################################
### Job definition
package Job;
use Sys::Hostname;
use fields (
'func',
'uniq',
'argref',
'listeners', # arrayref of interested Clients
'worker',
'handle',
'status', # [1, 100]
'require_listener',
);
our $handle_ct = 0;
our $handle_base = "H:" . hostname() . ":";
our %job_queue; # job_name -> [Job, Job*] (key only exists if non-empty)
our %jobOfHandle; # handle -> Job
our %jobOfUniq; # func -> uniq -> Job
#####################################################################
### Client definition
package Client;
use Danga::Socket;
use base 'Danga::Socket';
use fields (
'can_do', # { $job_name => 1 }
'can_do_list',
'can_do_iter',
'read_buf',
'sleeping', # 0/1: they've said they're sleeping and we haven't woken them up
'doing', # { $job_handle => Job }
'client_id', # opaque string, no whitespace. workers give this so checker scripts
# can tell apart the same worker connected to multiple jobservers.
);
#####################################################################
### J O B C L A S S
#####################################################################
package Job;
sub new {
my Job $self = shift;
my ($func, $uniq, $argref, $highpri) = @_;
$self = fields::new($self) unless ref $self;
# if they specified a uniq, see if we have a dup job running already
# to merge with
if (length($uniq)) {
# a unique value of "-" means "use my args as my unique key"
$uniq = $$argref if $uniq eq "-";
if ($jobOfUniq{$func} && $jobOfUniq{$func}{$uniq}) {
# found a match
return $jobOfUniq{$func}{$uniq};
} else {
# create a new key
$jobOfUniq{$func} ||= {};
$jobOfUniq{$func}{$uniq} = $self;
}
}
$self->{'func'} = $func;
$self->{'uniq'} = $uniq;
$self->{'require_listener'} = 1;
$self->{'argref'} = $argref;
$self->{'listeners'} = [];
$handle_ct++;
$self->{'handle'} = $handle_base . $handle_ct;
my $jq = ($job_queue{$func} ||= []);
if ($highpri) {
unshift @$jq, $self;
} else {
push @$jq, $self;
}
$jobOfHandle{$self->{'handle'}} = $self;
return $self;
}
sub Grab {
my ($class, $func) = @_;
return undef unless $job_queue{$func};
my $empty = sub {
delete $job_queue{$func};
return undef;
};
my Job $job;
while (1) {
$job = shift @{$job_queue{$func}};
return $empty->() unless $job;
return $job unless $job->{require_listener};
foreach my Client $c (@{$job->{listeners}}) {
return $job unless $c->{closed};
}
$job->note_finished(0);
}
}
sub GetByHandle {
my ($class, $handle) = @_;
return $jobOfHandle{$handle};
}
sub add_listener {
my Job $self = shift;
my Client $li = shift;
push @{$self->{listeners}}, $li;
}
sub relay_to_listeners {
my Job $self = shift;
foreach my Client $c (@{$self->{listeners}}) {
next if $c->{closed};
$c->write($_[0]);
}
}
sub note_finished {
my Job $self = shift;
my $success = shift;
if (length($self->{uniq})) {
delete $jobOfUniq{$self->{func}}{$self->{uniq}};
}
delete $jobOfHandle{$self->{handle}};
}
# accessors:
sub worker {
my Job $self = shift;
return $self->{'worker'} unless @_;
return $self->{'worker'} = shift;
}
sub require_listener {
my Job $self = shift;
return $self->{'require_listener'} unless @_;
return $self->{'require_listener'} = shift;
}
# takes arrayref of [numerator,denominator]
sub status {
my Job $self = shift;
return $self->{'status'} unless @_;
return $self->{'status'} = shift;
}
sub handle {
my Job $self = shift;
return $self->{'handle'};
}
sub func {
my Job $self = shift;
return $self->{'func'};
}
sub argref {
my Job $self = shift;
return $self->{'argref'};
}
#####################################################################
### C L I E N T C L A S S
#####################################################################
package Client;
our %sleepers; # func -> [ sleepers ] (wiped on wakeup)
our %client_map; # fd -> Client object
# Class Method:
sub new {
my Client $self = shift;
$self = fields::new($self) unless ref $self;
$self->SUPER::new( @_ );
$self->{read_buf} = '';
$self->{sleeping} = 0;
$self->{can_do} = {};
$self->{doing} = {}; # handle -> Job
$self->{can_do_list} = [];
$self->{can_do_iter} = 0; # numeric iterator for where we start looking for jobs
$self->{client_id} = "-";
$client_map{$self->{fd}} = $self;
return $self;
}
# Class Method:
sub WakeUpSleepers {
my ($class, $func) = @_;
return unless $sleepers{$func};
my Client $c;
foreach $c (@{$sleepers{$func}}) {
next if $c->{closed} || ! $c->{sleeping};
$c->res_packet("noop");
$c->{sleeping} = 0;
}
delete $sleepers{$func};
return;
}
sub close {
my Client $self = shift;
while (my ($handle, $job) = each %{$self->{doing}}) {
my $msg = Gearman::Util::pack_res_command("work_fail", $handle);
$job->relay_to_listeners($msg);
$job->note_finished(0);
}
delete $client_map{$self->{fd}};
$self->CMD_reset_abilities;
$self->SUPER::close;
}
# Client
sub event_read {
my Client $self = shift;
my $bref = $self->read(1024);
return $self->close unless defined $bref;
$self->{read_buf} .= $$bref;
my $found_cmd;
do {
$found_cmd = 1;
my $blen = length($self->{read_buf});
if ($self->{read_buf} =~ /^\0REQ(.{8,8})/s) {
my ($cmd, $len) = unpack("NN", $1);
if ($blen < $len + 12) {
# not here yet.
$found_cmd = 0;
return;
}
$self->process_cmd($cmd, substr($self->{read_buf}, 12, $len));
# and slide down buf:
$self->{read_buf} = substr($self->{read_buf}, 12+$len);
} elsif ($self->{read_buf} =~ s/^(\w.+?)?\r?\n//) {
# ASCII command case (useful for telnetting in)
my $line = $1;
$self->process_line($line);
} else {
$found_cmd = 0;
}
} while ($found_cmd);
}
# line-based commands
sub process_line {
my Client $self = shift;
my $line = shift;
if ($line =~ /^(\w+)\s*(.*)/) {
my ($cmd, $args) = ($1, $2);
$cmd = lc($cmd);
no strict 'refs';
my $cmd_handler = *{"TXTCMD_$cmd"}{CODE};
if ($cmd_handler) {
my $args = decode_url_args(\$args);
$cmd_handler->($self, $args);
next;
}
}
return $self->err_line('unknown_command');
}
sub TXTCMD_workers {
my Client $self = shift;
my $args = shift;
foreach my $fd (sort { $a <=> $b } keys %client_map) {
my Client $cl = $client_map{$fd};
$self->write("$fd " . $cl->peer_ip_string . " $cl->{client_id} : @{$cl->{can_do_list}}\n");
}
$self->write(".\n");
}
sub CMD_echo_req {
my Client $self = shift;
my $blobref = shift;
return $self->res_packet("echo_res", $$blobref);
}
sub CMD_work_status {
my Client $self = shift;
my $ar = shift;
my ($handle, $nu, $de) = split(/\0/, $$ar);
my $job = $self->{doing}{$handle};
return $self->error_packet("not_worker") unless $job && $job->worker == $self;
my $msg = Gearman::Util::pack_res_command("work_status", $$ar);
$job->relay_to_listeners($msg);
$job->status([$nu, $de]);
return 1;
}
sub CMD_work_complete {
my Client $self = shift;
my $ar = shift;
$$ar =~ s/^(.+?)\0//;
my $handle = $1;
my $job = delete $self->{doing}{$handle};
return $self->error_packet("not_worker") unless $job && $job->worker == $self;
my $msg = Gearman::Util::pack_res_command("work_complete", join("\0", $handle, $$ar));
$job->relay_to_listeners($msg);
$job->note_finished(1);
return 1;
}
sub CMD_work_fail {
my Client $self = shift;
my $ar = shift;
my $handle = $$ar;
my $job = delete $self->{doing}{$handle};
return $self->error_packet("not_worker") unless $job && $job->worker == $self;
my $msg = Gearman::Util::pack_res_command("work_fail", $handle);
$job->relay_to_listeners($msg);
$job->note_finished(1);
return 1;
}
sub CMD_pre_sleep {
my Client $self = shift;
$self->{'sleeping'} = 1;
foreach my $cd (@{$self->{can_do_list}}) {
# immediately wake the sleeper up if there are things to be done
if ($job_queue{$cd}) {
$self->res_packet("noop");
$self->{sleeping} = 0;
return;
}
push @{$sleepers{$cd} ||= []}, $self;
}
return 1;
}
sub CMD_grab_job {
my Client $self = shift;
my $job;
my $can_do_size = scalar @{$self->{can_do_list}};
unless ($can_do_size) {
$self->res_packet("no_job");
return;
}
# the offset where we start asking for jobs, to prevent starvation
# of some job types.
$self->{can_do_iter} = ($self->{can_do_iter} + 1) % $can_do_size;
my $tried = 0;
while ($tried < $can_do_size) {
my $idx = ($tried + $self->{can_do_iter}) % $can_do_size;
$tried++;
my $job_to_grab = $self->{can_do_list}->[$idx];
$job = Job->Grab($job_to_grab);
if ($job) {
$job->worker($self);
$self->{doing}{$job->handle} = $job;
return $self->res_packet("job_assign",
join("\0",
$job->handle,
$job->func,
${$job->argref},
));
}
}
$self->res_packet("no_job");
}
sub CMD_can_do {
my Client $self = shift;
my $ar = shift;
$self->{can_do}->{$$ar} = 1;
$self->_setup_can_do_list;
}
sub CMD_set_client_id {
my Client $self = shift;
my $ar = shift;
$self->{client_id} = $$ar;
$self->{client_id} =~ s/\s+//g;
$self->{client_id} = "-" unless length $self->{client_id};
}
sub CMD_cant_do {
my Client $self = shift;
my $ar = shift;
delete $self->{can_do}->{$$ar};
$self->_setup_can_do_list;
}
sub CMD_get_status {
my Client $self = shift;
my $ar = shift;
my $job = Job->GetByHandle($$ar);
# handles can't contain nulls
return if $$ar =~ /\0/;
my ($known, $running, $num, $den);
$known = 0;
$running = 0;
if ($job) {
$known = 1;
$running = $job->worker ? 1 : 0;
if (my $stat = $job->status) {
($num, $den) = @$stat;
}
}
$self->res_packet("status_res", join("\0",
$$ar,
$known,
$running,
$num,
$den));
}
sub CMD_reset_abilities {
my Client $self = shift;
$self->{can_do} = {};
$self->_setup_can_do_list;
}
sub _setup_can_do_list {
my Client $self = shift;
$self->{can_do_list} = [ keys %{$self->{can_do}} ];
$self->{can_do_iter} = 0;
}
sub CMD_submit_job { push @_, 1; &_cmd_submit_job; }
sub CMD_submit_job_bg { push @_, 0; &_cmd_submit_job; }
sub CMD_submit_job_high { push @_, 1, 1; &_cmd_submit_job; }
sub _cmd_submit_job {
my Client $self = shift;
my $ar = shift;
my $subscribe = shift;
my $high_pri = shift;
return $self->error_packet("invalid_args", "No func/uniq header [$$ar].")
unless $$ar =~ s/^(.+?)\0(.*?)\0//;
my ($func, $uniq) = ($1, $2);
my $job = Job->new($func, $uniq, $ar, $high_pri);
if ($subscribe) {
$job->add_listener($self);
} else {
# background mode
$job->require_listener(0);
}
$self->res_packet("job_created", $job->handle);
Client->WakeUpSleepers($func);
}
sub res_packet {
my Client $self = shift;
my ($code, $arg) = @_;
$self->write(Gearman::Util::pack_res_command($code, $arg));
return 1;
}
sub error_packet {
my Client $self = shift;
my ($code, $msg) = @_;
$self->write(Gearman::Util::pack_res_command("error", "$code\0$msg"));
return 0;
}
sub process_cmd {
my Client $self = shift;
my $cmd = shift;
my $blob = shift;
my $cmd_name = "CMD_" . Gearman::Util::cmd_name($cmd);
my $ret = eval {
$self->$cmd_name(\$blob);
};
return $ret unless $@;
print "Error: $@\n";
return $self->error_packet("server_error", $@);
}
# Client
sub event_err { my $self = shift; $self->close; }
sub event_hup { my $self = shift; $self->close; }
sub err_line {
my Client $self = shift;
my $err_code = shift;
my $err_text = {
'unknown_command' => "Unknown server command",
}->{$err_code};
$self->write("ERR $err_code " . eurl($err_text) . "\r\n");
return 0;
}
sub eurl
{
my $a = $_[0];
$a =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/uc sprintf("%%%02x",ord($1))/eg;
$a =~ tr/ /+/;
return $a;
}
sub durl
{
my ($a) = @_;
$a =~ tr/+/ /;
$a =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
return $a;
}
sub decode_url_args
{
my $a = shift;
my $buffer = ref $a ? $a : \$a;
my $ret = {};
my $pair;
my @pairs = split(/&/, $$buffer);
my ($name, $value);
foreach $pair (@pairs)
{
($name, $value) = split(/=/, $pair);
$value =~ tr/+/ /;
$value =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
$name =~ tr/+/ /;
$name =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
$ret->{$name} .= $ret->{$name} ? "\0$value" : $value;
}
return $ret;
}
package main;
Client->EventLoop();
# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End: