#!/usr/bin/perl
#
# Danga's Distributed Lock Daemon
#
# Status: 2004-05-18:  quick hack.  not for production yet.
#
# Copyright 2004, Danga Interactive
#
# Authors:
#   Brad Fitzpatrick <brad@danga.com>
#
# License:
#   undecided.
#

use strict;
use Getopt::Long;
use Carp;
use Danga::Socket;
use IO::Socket::INET;
use POSIX ();

use vars qw($DEBUG);
$DEBUG = 0;

my (
    $daemonize,
    $nokeepalive,
   );
my $conf_port = 7002;

Getopt::Long::GetOptions(
    'd|daemon'       => \$daemonize,
    'p|port=i'       => \$conf_port,
    'debug=i'        => \$DEBUG,
    'n|no-keepalive' => \$nokeepalive,
   );

daemonize() if $daemonize;

use Socket qw(IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET);

# Linux-specific:
use constant TCP_KEEPIDLE  => 4; # Start keeplives after this period
use constant TCP_KEEPINTVL => 5; # Interval between keepalives
use constant TCP_KEEPCNT   => 6; # Number of keepalives before death

$SIG{'PIPE'} = "IGNORE";  # handled manually

# establish SERVER socket, bind and listen.
my $server = IO::Socket::INET->new(LocalPort => $conf_port,
                                   Type      => SOCK_STREAM,
                                   Proto     => IPPROTO_TCP,
                                   Blocking  => 0,
                                   Reuse     => 1,
                                   Listen    => 10 )
    or die "Error creating socket: $@\n";

# Not sure if I'm crazy or not, but I can't see in strace where/how
# Perl 5.6 sets blocking to 0 without this.  In Perl 5.8, IO::Socket::INET
# obviously sets it from watching strace.
IO::Handle::blocking($server, 0);

my $accept_handler = sub {
    my $csock = $server->accept();
    return unless $csock;

    printf("Listen child making a Client for %d.\n", fileno($csock))
	if $DEBUG;

    IO::Handle::blocking($csock, 0);
    setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;

    # Enable keep alive
    unless ( $nokeepalive ) {
        (setsockopt($csock, SOL_SOCKET, SO_KEEPALIVE,  pack("l", 1)) &&
         setsockopt($csock, IPPROTO_TCP, TCP_KEEPIDLE,  pack("l", 30)) &&
         setsockopt($csock, IPPROTO_TCP, TCP_KEEPCNT,   pack("l", 10)) &&
         setsockopt($csock, IPPROTO_TCP, TCP_KEEPINTVL, pack("l", 30)) &&
         1
        ) || die "Couldn't set keep-alive settings on socket (Not on Linux?)";
    }

    my $client = Client->new($csock);
    $client->watch_read(1);
};

Client->OtherFds(fileno($server) => $accept_handler);
Client->EventLoop();

sub daemonize {
    my($pid, $sess_id, $i);

    ## Fork and exit parent
    if ($pid = fork) { exit 0; }

    ## Detach ourselves from the terminal
    croak "Cannot detach from controlling terminal"
        unless $sess_id = POSIX::setsid();

    ## Prevent possibility of acquiring a controling terminal
    $SIG{'HUP'} = 'IGNORE';
    if ($pid = fork) { exit 0; }

    ## Change working directory
    chdir "/";

    ## Clear file creation mask
    umask 0;

    ## Close open file descriptors
    close(STDIN);
    close(STDOUT);
    close(STDERR);

    ## Reopen stderr, stdout, stdin to /dev/null
    open(STDIN,  "+>/dev/null");
    open(STDOUT, "+>&STDIN");
    open(STDERR, "+>&STDIN");
}

#####################################################################
### C L I E N T   C L A S S
#####################################################################
package Client;

use Danga::Socket;
use base 'Danga::Socket';
use fields (
            'locks',  # hashref of locks held by this connection. values are 1
            'read_buf',
	    );

our (%holder);  # hash of lock -> Client object holding it
# TODO: out %waiters, lock -> arrayref of client waiters (waker should check not closed)

sub new {
    my Client $self = shift;
    $self = fields::new($self) unless ref $self;
    $self->SUPER::new( @_ );

    $self->{locks} = {};
    $self->{read_buf} = '';
    return $self;
}

# Client
sub event_read {
    my Client $self = shift;

    my $bref = $self->read(1024);
    return $self->close() unless defined $bref;
    $self->{read_buf} .= $$bref;

    if ($self->{read_buf} =~ s/^(.+?)\r?\n//) {
        my $line = $1;
        $self->process_line( $line );
    }
}

sub process_line {
    my Client $self = shift;
    my $line = shift;

    if ($line =~ /^(\w+)\s*(.*)/) {
        my ($cmd, $args) = ($1, $2);
        $cmd = lc($cmd);

        no strict 'refs';
        my $cmd_handler = *{"cmd_$cmd"}{CODE};
        if ($cmd_handler) {
            my $args = decode_url_args(\$args);
            $cmd_handler->($self, $args);
            next;
        }
    }

    return $self->err_line('unknown_command');
}

sub close {
    my Client $self = shift;

    foreach my $lock (keys %{$self->{locks}}) {
	_release_lock($self, $lock);
    }

    $self->SUPER::close;
}

sub _release_lock {
    my Client $self = shift;
    my $lock = shift;

    # TODO: notify waiters
    delete $self->{locks}{$lock};
    delete $holder{$lock};
    return 1;
}


# Client
sub event_err { my $self = shift; $self->close; }
sub event_hup { my $self = shift; $self->close; }


# gets a lock or fails with 'taken'
sub cmd_trylock {
    my Client $self = shift;
    my $args = shift;

    my $lock = $args->{lock};
    return $self->err_line("empty_lock") unless length($lock);
    return $self->err_line("taken") if defined $holder{$lock};

    $holder{$lock} = $self;
    $self->{locks}{$lock} = 1;

    return $self->ok_line();
}

# releases a lock or fails with 'didnthave'
sub cmd_releaselock {
    my Client $self = shift;
    my $args = shift;

    my $lock = $args->{lock};
    return $self->err_line("empty_lock") unless length($lock);
    return $self->err_line("didnthave") unless $self->{locks}{$lock};

    _release_lock($self, $lock);
    return $self->ok_line;
}

# shows current locks
sub cmd_locks {
    my Client $self = shift;
    my $args = shift;

    $self->write("LOCKS:\n");
    foreach my $k (sort keys %holder) {
	$self->write("  $k = " . $holder{$k}->as_string . "\n");
    }

    return 1;
}

sub cmd_noop {
    my Client $self = shift;
    # TODO: set self's last activity time so it isn't cleaned in a purge
    #       of stale connections?
    return $self->ok_line;
}

sub ok_line {
    my Client $self = shift;
    my $args = shift || {};
    my $argline = join('&', map { eurl($_) . "=" . eurl($args->{$_}) } keys %$args);
    $self->write("OK $argline\r\n");
    return 1;
}

sub err_line {
    my Client $self = shift;
    my $err_code = shift;
    my $err_text = {
        'unknown_command' => "Unknown server command",
    }->{$err_code};

    $self->write("ERR $err_code " . eurl($err_text) . "\r\n");
    return 0;
}

sub eurl
{
    my $a = $_[0];
    $a =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/uc sprintf("%%%02x",ord($1))/eg;
    $a =~ tr/ /+/;
    return $a;
}

sub durl
{
    my ($a) = @_;
    $a =~ tr/+/ /;
    $a =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
    return $a;
}

sub decode_url_args
{
    my $a = shift;
    my $buffer = ref $a ? $a : \$a;
    my $ret = {};

    my $pair;
    my @pairs = split(/&/, $$buffer);
    my ($name, $value);
    foreach $pair (@pairs)
    {
        ($name, $value) = split(/=/, $pair);
        $value =~ tr/+/ /;
        $value =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
        $name =~ tr/+/ /;
        $name =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
        $ret->{$name} .= $ret->{$name} ? "\0$value" : $value;
    }
    return $ret;
}


# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End:
