#!/usr/bin/perl
#
# Gearman
#
# Status: 2005-04-13
#
# Copyright 2005, Danga Interactive
#
# Authors:
#   Brad Fitzpatrick <brad@danga.com>
#   Brad Whitaker <whitaker@danga.com>
#
# License:
#   terms of Perl itself.
#

use strict;
use Getopt::Long;
use Carp;
use Danga::Socket;
use IO::Socket::INET;
use POSIX ();
use lib '../lib';
use Gearman::Util;

use vars qw($DEBUG);
$DEBUG = 0;

my (
    $daemonize,
    $nokeepalive,
   );
my $conf_port = 7003;

Getopt::Long::GetOptions(
    'd|daemon'       => \$daemonize,
    'p|port=i'       => \$conf_port,
    'debug=i'        => \$DEBUG,
   );

daemonize() if $daemonize;

use Socket qw(IPPROTO_TCP TCP_NODELAY SOL_SOCKET);

$SIG{'PIPE'} = "IGNORE";  # handled manually

# establish SERVER socket, bind and listen.
my $server = IO::Socket::INET->new(LocalPort => $conf_port,
                                   Type      => SOCK_STREAM,
                                   Proto     => IPPROTO_TCP,
                                   Blocking  => 0,
                                   Reuse     => 1,
                                   Listen    => 10 )
    or die "Error creating socket: $@\n";

# Not sure if I'm crazy or not, but I can't see in strace where/how
# Perl 5.6 sets blocking to 0 without this.  In Perl 5.8, IO::Socket::INET
# obviously sets it from watching strace.
IO::Handle::blocking($server, 0);

my $accept_handler = sub {
    my $csock = $server->accept();
    return unless $csock;

    printf("Listen child making a Client for %d.\n", fileno($csock))
	if $DEBUG;

    IO::Handle::blocking($csock, 0);
    setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;

    my $client = Client->new($csock);
    $client->watch_read(1);
};

Client->OtherFds(fileno($server) => $accept_handler);

sub daemonize {
    my($pid, $sess_id, $i);

    ## Fork and exit parent
    if ($pid = fork) { exit 0; }

    ## Detach ourselves from the terminal
    croak "Cannot detach from controlling terminal"
        unless $sess_id = POSIX::setsid();

    ## Prevent possibility of acquiring a controling terminal
    $SIG{'HUP'} = 'IGNORE';
    if ($pid = fork) { exit 0; }

    ## Change working directory
    chdir "/";

    ## Clear file creation mask
    umask 0;

    ## Close open file descriptors
    close(STDIN);
    close(STDOUT);
    close(STDERR);

    ## Reopen stderr, stdout, stdin to /dev/null
    open(STDIN,  "+>/dev/null");
    open(STDOUT, "+>&STDIN");
    open(STDERR, "+>&STDIN");
}

#####################################################################
### Job definition
package Job;
use Sys::Hostname;

use fields (
            'func',
            'uniq',
            'argref',
            'listeners',  # arrayref of interested Clients
            'worker',
            'handle',
            'status',  # [1, 100]
            'require_listener',
            );

our $handle_ct = 0;
our $handle_base = "H:" . hostname() . ":";

our %job_queue;  # job_name -> [Job, Job*]   (key only exists if non-empty)
our %jobOfHandle;  # handle -> Job
our %jobOfUniq;   # func -> uniq -> Job

#####################################################################
### Client definition
package Client;

use Danga::Socket;
use base 'Danga::Socket';
use fields (
            'can_do',  # { $job_name => 1 }
            'can_do_list',
            'can_do_iter',
            'read_buf',
            'sleeping',   # 0/1:  they've said they're sleeping and we haven't woken them up
            'doing',  # { $job_handle => Job }
            'client_id',  # opaque string, no whitespace.  workers give this so checker scripts 
                          # can tell apart the same worker connected to multiple jobservers.
	    );


#####################################################################
### J O B   C L A S S
#####################################################################
package Job;

sub new {
    my Job $self = shift;
    my ($func, $uniq, $argref, $highpri) = @_;

    $self = fields::new($self) unless ref $self;

    # if they specified a uniq, see if we have a dup job running already
    # to merge with
    if (length($uniq)) {
        # a unique value of "-" means "use my args as my unique key"
        $uniq = $$argref if $uniq eq "-";
        if ($jobOfUniq{$func} && $jobOfUniq{$func}{$uniq}) {
            # found a match
            return $jobOfUniq{$func}{$uniq};
        } else {
            # create a new key
            $jobOfUniq{$func} ||= {};
            $jobOfUniq{$func}{$uniq} = $self;
        }
    }

    $self->{'func'} = $func;
    $self->{'uniq'} = $uniq;
    $self->{'require_listener'} = 1;
    $self->{'argref'} = $argref;
    $self->{'listeners'} = [];

    $handle_ct++;
    $self->{'handle'} = $handle_base . $handle_ct;

    my $jq = ($job_queue{$func} ||= []);
    if ($highpri) {
        unshift @$jq, $self;
    } else {
        push @$jq, $self;
    }

    $jobOfHandle{$self->{'handle'}} = $self;

    return $self;
}

sub Grab {
    my ($class, $func) = @_;
    return undef unless $job_queue{$func};

    my $empty = sub {
        delete $job_queue{$func};
        return undef;
    };

    my Job $job;
    while (1) {
        $job = shift @{$job_queue{$func}};
        return $empty->() unless $job;
        return $job unless $job->{require_listener};

        foreach my Client $c (@{$job->{listeners}}) {
            return $job unless $c->{closed};
        }
        $job->note_finished(0);
    }
}

sub GetByHandle {
    my ($class, $handle) = @_;
    return $jobOfHandle{$handle};
}

sub add_listener {
    my Job $self = shift;
    my Client $li = shift;
    push @{$self->{listeners}}, $li;
}

sub relay_to_listeners {
    my Job $self = shift;
    foreach my Client $c (@{$self->{listeners}}) {
        next if $c->{closed};
        $c->write($_[0]);
    }
}

sub note_finished {
    my Job $self = shift;
    my $success = shift;

    if (length($self->{uniq})) {
        delete $jobOfUniq{$self->{func}}{$self->{uniq}};
    }
    delete $jobOfHandle{$self->{handle}};
}

# accessors:
sub worker {
    my Job $self = shift;
    return $self->{'worker'} unless @_;
    return $self->{'worker'} = shift;
}
sub require_listener {
    my Job $self = shift;
    return $self->{'require_listener'} unless @_;
    return $self->{'require_listener'} = shift;
}

# takes arrayref of [numerator,denominator]
sub status {
    my Job $self = shift;
    return $self->{'status'} unless @_;
    return $self->{'status'} = shift;
}

sub handle {
    my Job $self = shift;
    return $self->{'handle'};
}

sub func {
    my Job $self = shift;
    return $self->{'func'};
}

sub argref {
    my Job $self = shift;
    return $self->{'argref'};
}


#####################################################################
### C L I E N T   C L A S S
#####################################################################
package Client;

our %sleepers;  # func -> [ sleepers ]  (wiped on wakeup)

our %client_map;  # fd -> Client object

# Class Method:
sub new {
    my Client $self = shift;
    $self = fields::new($self) unless ref $self;
    $self->SUPER::new( @_ );

    $self->{read_buf} = '';
    $self->{sleeping} = 0;
    $self->{can_do} = {};
    $self->{doing} = {};       # handle -> Job
    $self->{can_do_list} = [];
    $self->{can_do_iter} = 0;  # numeric iterator for where we start looking for jobs
    $self->{client_id} = "-";

    $client_map{$self->{fd}} = $self;

    return $self;
}

# Class Method:
sub WakeUpSleepers {
    my ($class, $func) = @_;
    return unless $sleepers{$func};
    my Client $c;
    foreach $c (@{$sleepers{$func}}) {
        next if $c->{closed} || ! $c->{sleeping};
        $c->res_packet("noop");
        $c->{sleeping} = 0;
    }
    delete $sleepers{$func};
    return;
}


sub close {
    my Client $self = shift;

    while (my ($handle, $job) = each %{$self->{doing}}) {
        my $msg = Gearman::Util::pack_res_command("work_fail", $handle);
        $job->relay_to_listeners($msg);
        $job->note_finished(0);
    }

    delete $client_map{$self->{fd}};
    $self->CMD_reset_abilities;

    $self->SUPER::close;
}

# Client
sub event_read {
    my Client $self = shift;

    my $bref = $self->read(1024);
    return $self->close unless defined $bref;
    $self->{read_buf} .= $$bref;

    my $found_cmd;
    do {
        $found_cmd = 1;
        my $blen = length($self->{read_buf});

        if ($self->{read_buf} =~ /^\0REQ(.{8,8})/s) {
            my ($cmd, $len) = unpack("NN", $1);
            if ($blen < $len + 12) {
                # not here yet.
                $found_cmd = 0;
                return;
            }

            $self->process_cmd($cmd, substr($self->{read_buf}, 12, $len));

            # and slide down buf:
            $self->{read_buf} = substr($self->{read_buf}, 12+$len);

        } elsif ($self->{read_buf} =~ s/^(\w.+?)?\r?\n//) {
            # ASCII command case (useful for telnetting in)
            my $line = $1;
            $self->process_line($line);
        } else {
            $found_cmd = 0;
        }
    } while ($found_cmd);
}

# line-based commands
sub process_line {
    my Client $self = shift;
    my $line = shift;

    if ($line =~ /^(\w+)\s*(.*)/) {
        my ($cmd, $args) = ($1, $2);
        $cmd = lc($cmd);

        no strict 'refs';
        my $cmd_handler = *{"TXTCMD_$cmd"}{CODE};
        if ($cmd_handler) {
            my $args = decode_url_args(\$args);
            $cmd_handler->($self, $args);
            next;
        }
    }

    return $self->err_line('unknown_command');
}

sub TXTCMD_workers {
    my Client $self = shift;
    my $args = shift;

    foreach my $fd (sort { $a <=> $b } keys %client_map) {
        my Client $cl = $client_map{$fd};
        $self->write("$fd " . $cl->peer_ip_string . " $cl->{client_id} : @{$cl->{can_do_list}}\n");

    }
    $self->write(".\n");
}

sub CMD_echo_req {
    my Client $self = shift;
    my $blobref = shift;

    return $self->res_packet("echo_res", $$blobref);
}

sub CMD_work_status {
    my Client $self = shift;
    my $ar = shift;
    my ($handle, $nu, $de) = split(/\0/, $$ar);

    my $job = $self->{doing}{$handle};
    return $self->error_packet("not_worker") unless $job && $job->worker == $self;

    my $msg = Gearman::Util::pack_res_command("work_status", $$ar);
    $job->relay_to_listeners($msg);
    $job->status([$nu, $de]);
    return 1;
}

sub CMD_work_complete {
    my Client $self = shift;
    my $ar = shift;

    $$ar =~ s/^(.+?)\0//;
    my $handle = $1;

    my $job = delete $self->{doing}{$handle};
    return $self->error_packet("not_worker") unless $job && $job->worker == $self;

    my $msg = Gearman::Util::pack_res_command("work_complete", join("\0", $handle, $$ar));
    $job->relay_to_listeners($msg);
    $job->note_finished(1);

    return 1;
}

sub CMD_work_fail {
    my Client $self = shift;
    my $ar = shift;
    my $handle = $$ar;
    my $job = delete $self->{doing}{$handle};
    return $self->error_packet("not_worker") unless $job && $job->worker == $self;

    my $msg = Gearman::Util::pack_res_command("work_fail", $handle);
    $job->relay_to_listeners($msg);
    $job->note_finished(1);
    return 1;
}

sub CMD_pre_sleep {
    my Client $self = shift;
    $self->{'sleeping'} = 1;

    foreach my $cd (@{$self->{can_do_list}}) {

        # immediately wake the sleeper up if there are things to be done
        if ($job_queue{$cd}) {
            $self->res_packet("noop");
            $self->{sleeping} = 0;
            return;
        }

        push @{$sleepers{$cd} ||= []}, $self;
    }
    return 1;
}

sub CMD_grab_job {
    my Client $self = shift;

    my $job;
    my $can_do_size = scalar @{$self->{can_do_list}};

    unless ($can_do_size) {
        $self->res_packet("no_job");
        return;
    }

    # the offset where we start asking for jobs, to prevent starvation
    # of some job types.
    $self->{can_do_iter} = ($self->{can_do_iter} + 1) % $can_do_size;

    my $tried = 0;
    while ($tried < $can_do_size) {
        my $idx = ($tried + $self->{can_do_iter}) % $can_do_size;
        $tried++;
        my $job_to_grab = $self->{can_do_list}->[$idx];
        $job = Job->Grab($job_to_grab);
        if ($job) {
            $job->worker($self);
            $self->{doing}{$job->handle} = $job;
            return $self->res_packet("job_assign",
                                     join("\0",
                                          $job->handle,
                                          $job->func,
                                          ${$job->argref},
                                          ));
        }
    }

    $self->res_packet("no_job");
}

sub CMD_can_do {
    my Client $self = shift;
    my $ar = shift;

    $self->{can_do}->{$$ar} = 1;
    $self->_setup_can_do_list;
}

sub CMD_set_client_id {
    my Client $self = shift;
    my $ar = shift;

    $self->{client_id} = $$ar;
    $self->{client_id} =~ s/\s+//g;
    $self->{client_id} = "-" unless length $self->{client_id};
}

sub CMD_cant_do {
    my Client $self = shift;
    my $ar = shift;

    delete $self->{can_do}->{$$ar};
    $self->_setup_can_do_list;
}

sub CMD_get_status {
    my Client $self = shift;
    my $ar = shift;
    my $job = Job->GetByHandle($$ar);

    # handles can't contain nulls
    return if $$ar =~ /\0/;

    my ($known, $running, $num, $den);
    $known = 0;
    $running = 0;
    if ($job) {
        $known = 1;
        $running = $job->worker ? 1 : 0;
        if (my $stat = $job->status) {
            ($num, $den) = @$stat;
        }
    }

    $self->res_packet("status_res", join("\0",
                                         $$ar,
                                         $known,
                                         $running,
                                         $num,
                                         $den));
}

sub CMD_reset_abilities {
    my Client $self = shift;

    $self->{can_do} = {};
    $self->_setup_can_do_list;
}

sub _setup_can_do_list {
    my Client $self = shift;
    $self->{can_do_list} = [ keys %{$self->{can_do}} ];
    $self->{can_do_iter} = 0;
}

sub CMD_submit_job    {  push @_, 1; &_cmd_submit_job; }
sub CMD_submit_job_bg {  push @_, 0; &_cmd_submit_job; }
sub CMD_submit_job_high {  push @_, 1, 1; &_cmd_submit_job; }

sub _cmd_submit_job {
    my Client $self = shift;
    my $ar = shift;
    my $subscribe = shift;
    my $high_pri = shift;

    return $self->error_packet("invalid_args", "No func/uniq header [$$ar].")
        unless $$ar =~ s/^(.+?)\0(.*?)\0//;

    my ($func, $uniq) = ($1, $2);

    my $job = Job->new($func, $uniq, $ar, $high_pri);

    if ($subscribe) {
        $job->add_listener($self);
    } else {
        # background mode
        $job->require_listener(0);
    }

    $self->res_packet("job_created", $job->handle);
    Client->WakeUpSleepers($func);
}

sub res_packet {
    my Client $self = shift;
    my ($code, $arg) = @_;
    $self->write(Gearman::Util::pack_res_command($code, $arg));
    return 1;
}

sub error_packet {
    my Client $self = shift;
    my ($code, $msg) = @_;
    $self->write(Gearman::Util::pack_res_command("error", "$code\0$msg"));
    return 0;
}

sub process_cmd {
    my Client $self = shift;
    my $cmd = shift;
    my $blob = shift;

    my $cmd_name = "CMD_" . Gearman::Util::cmd_name($cmd);
    my $ret = eval {
        $self->$cmd_name(\$blob);
    };
    return $ret unless $@;
    print "Error: $@\n";
    return $self->error_packet("server_error", $@);
}

# Client
sub event_err { my $self = shift; $self->close; }
sub event_hup { my $self = shift; $self->close; }

sub err_line {
    my Client $self = shift;
    my $err_code = shift;
    my $err_text = {
        'unknown_command' => "Unknown server command",
    }->{$err_code};

    $self->write("ERR $err_code " . eurl($err_text) . "\r\n");
    return 0;
}

sub eurl
{
    my $a = $_[0];
    $a =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/uc sprintf("%%%02x",ord($1))/eg;
    $a =~ tr/ /+/;
    return $a;
}

sub durl
{
    my ($a) = @_;
    $a =~ tr/+/ /;
    $a =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
    return $a;
}

sub decode_url_args
{
    my $a = shift;
    my $buffer = ref $a ? $a : \$a;
    my $ret = {};

    my $pair;
    my @pairs = split(/&/, $$buffer);
    my ($name, $value);
    foreach $pair (@pairs)
    {
        ($name, $value) = split(/=/, $pair);
        $value =~ tr/+/ /;
        $value =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
        $name =~ tr/+/ /;
        $name =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
        $ret->{$name} .= $ret->{$name} ? "\0$value" : $value;
    }
    return $ret;
}

package main;
Client->EventLoop();

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End:
