#!/usr/bin/perl
#
# Danga's Delayed Insert Daemon
#
#    ... because MySQL forces connections to be threads, limiting
#        total connections
#
#    ... and because TCP makes it so easy to run out of local ports
#
# Status: 2004-12-22:  experimental hack.
#
# Copyright 2004, Danga Interactive
#
# Authors:
#   Brad Fitzpatrick <brad@danga.com>
#
# License:
#   undecided.
#

use strict;
use Getopt::Long;
use Carp;
use Danga::Socket;
use IO::Socket::INET;
use POSIX ();

use vars qw($DEBUG);
$DEBUG = 0;

my (
    $daemonize,
    $nokeepalive,
   );
my $conf_port = 7400;

Getopt::Long::GetOptions(
    'd|daemon'       => \$daemonize,
    'p|port=i'       => \$conf_port,
    'debug=i'        => \$DEBUG,
    'n|no-keepalive' => \$nokeepalive,
   );

daemonize() if $daemonize;

use Socket qw(IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY SOL_SOCKET);

# Linux-specific:
use constant TCP_KEEPIDLE  => 4; # Start keeplives after this period
use constant TCP_KEEPINTVL => 5; # Interval between keepalives
use constant TCP_KEEPCNT   => 6; # Number of keepalives before death

$SIG{'PIPE'} = "IGNORE";  # handled manually

# establish SERVER socket, bind and listen.
my $server = IO::Socket::INET->new(LocalPort => $conf_port,
                                   Type      => SOCK_STREAM,
                                   Proto     => IPPROTO_TCP,
                                   Blocking  => 0,
                                   Reuse     => 1,
                                   Listen    => 10 )
    or die "Error creating socket: $@\n";

# Not sure if I'm crazy or not, but I can't see in strace where/how
# Perl 5.6 sets blocking to 0 without this.  In Perl 5.8, IO::Socket::INET
# obviously sets it from watching strace.
IO::Handle::blocking($server, 0);

my $accept_handler = sub {
    my $csock = $server->accept();
    return unless $csock;

    printf("Listen child making a Client for %d.\n", fileno($csock))
	if $DEBUG;

    IO::Handle::blocking($csock, 0);
    setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;

    # Enable keep alive
    unless ( $nokeepalive ) {
        (setsockopt($csock, SOL_SOCKET, SO_KEEPALIVE,  pack("l", 1)) &&
         setsockopt($csock, IPPROTO_TCP, TCP_KEEPIDLE,  pack("l", 30)) &&
         setsockopt($csock, IPPROTO_TCP, TCP_KEEPCNT,   pack("l", 10)) &&
         setsockopt($csock, IPPROTO_TCP, TCP_KEEPINTVL, pack("l", 30)) &&
         1
        ) || die "Couldn't set keep-alive settings on socket (Not on Linux?)";
    }

    my $client = Client->new($csock);
    $client->watch_read(1);
};

Client->Init;
Danga::Socket->OtherFds(fileno($server) => $accept_handler);
Danga::Socket->EventLoop();

sub daemonize {
    my($pid, $sess_id, $i);

    ## Fork and exit parent
    if ($pid = fork) { exit 0; }

    ## Detach ourselves from the terminal
    croak "Cannot detach from controlling terminal"
        unless $sess_id = POSIX::setsid();

    ## Prevent possibility of acquiring a controling terminal
    $SIG{'HUP'} = 'IGNORE';
    if ($pid = fork) { exit 0; }

    ## Change working directory
    chdir "/";

    ## Clear file creation mask
    umask 0;

    ## Close open file descriptors
    close(STDIN);
    close(STDOUT);
    close(STDERR);

    ## Reopen stderr, stdout, stdin to /dev/null
    open(STDIN,  "+>/dev/null");
    open(STDOUT, "+>&STDIN");
    open(STDERR, "+>&STDIN");
}

#####################################################################
### C L I E N T   C L A S S
#####################################################################
package Client;

use strict;
use Danga::Socket;
use base 'Danga::Socket';
use fields (
            'read_buf',
            'listen_queue_num',   # undef, its own fd, or -1 if listening to system queue 
	    );

our %queue;   # fd -> [ [ $table, $values ] ... ]
our %note;    # arbitrary client-generated key/value data

our @listeners;  # client objects that are listening

our $MAX_QUEUE_DEPTH;

our $is_system_attached;  # bool: if somebody is watching the system queue

sub Init {
    $MAX_QUEUE_DEPTH = 5000;

    #fd=-1 is magic and is the system default queue, which always exists
    $queue{-1} = [];
    $is_system_attached = 0;
}

sub new {
    my Client $self = shift;
    $self = fields::new($self) unless ref $self;
    $self->SUPER::new( @_ );

    $self->{read_buf} = '';
    $self->{listen_queue_num} = undef;
    return $self;
}

# Client
sub event_read {
    my Client $self = shift;

    my $bref = $self->read(1024);
    return $self->close() unless defined $bref;
    $self->{read_buf} .= $$bref;

    if ($self->{read_buf} =~ s/^(.+?)\r?\n//) {
        my $line = $1;
        $self->process_line( $line );
    }
}

sub event_write {
    my Client $self = shift;

    # stop watching for writability if we're not subscribed to anything
    unless (defined $self->{listen_queue_num}) {
        $self->watch_write(0);
        return;
    }

    my $q = $queue{$self->{listen_queue_num}};

    while (@$q) {
        my $rec = shift @$q;
        next if $self->write("ROW $rec->[0] $rec->[1]\r\n");
        print "  Buffer was full!\n";
        return;
    }
    $self->watch_write(0);
}

sub process_line {
    my Client $self = shift;
    my $line = shift;

    if ($line =~ /^(\w+)\s*(.*)/) {
        my ($cmd, $args) = ($1, $2);
        $cmd = lc($cmd);

        no strict 'refs';
        my $cmd_handler = *{"cmd_$cmd"}{CODE};
        if ($cmd_handler) {
            $cmd_handler->($self, $args);
            next;
        }
    }

    return $self->err_line('unknown_command');
}

# Client
sub event_err { my $self = shift; $self->close; }
sub event_hup { my $self = shift; $self->close; }


# gets a lock or fails with 'taken'
sub cmd_set_note {
    my Client $self = shift;
    my $args = shift;
    return $self->err_line("bogus_format")
        unless $args =~ /(\S+)\s+(.+)/;
    $note{$1} = $2;
    return $self->ok_line;
}

# gets a lock or fails with 'taken'
sub cmd_get_note {
    my Client $self = shift;
    my $args = shift;
    return $self->err_line("bogus_format")
        unless $args =~ /(\S+)/;

    $self->write("NOTE $note{$1}\r\n");
    return 1;
}

# gets a lock or fails with 'taken'
sub cmd_insert {
    my Client $self = shift;
    my $args = shift;

    return $self->err_line("bogus_format")
        unless $args =~ /(\w+)\s+(.+)/;

    my $rec = [ $1, $2 ];
    foreach my $fd (keys %queue) {
        my $q = $queue{$fd};
        shift @$q if scalar @$q >= $MAX_QUEUE_DEPTH;
        push @$q, $rec;
    }

    foreach (@listeners) {
        $_->watch_write(1);
    }

    return $self->ok_line;
}

sub close {
    my Client $self = shift;

    # remove ourselves from the listeners array
    @listeners = grep { $_ != $self } @listeners;

    # delete our queue, unless it's the system queue
    if ($self->{listen_queue_num} != -1) {
        delete $queue{$self->{listen_queue_num}};
    } else {
        $is_system_attached = 0;
    }

    $self->SUPER::close;
}

sub cmd_subscribe {
    my Client $self = shift;
    my $args = shift;

    my $which_fd = undef;
    if ($args =~ /system/) {
        return $self->err_line("dup_sys") if $is_system_attached++;
        $which_fd = -1;
    } else {
        $which_fd = $self->{fd};
    }

    $self->{listen_queue_num} = $which_fd;
    push @listeners, $self;

    $queue{$which_fd} ||= [];
    $self->watch_write(1);
    return 1;
}


# shows current locks
sub cmd_locks {
    my Client $self = shift;
    my $args = shift;

    $self->write("LOCKS:\n");

    return 1;
}

sub cmd_noop {
    my Client $self = shift;
    # TODO: set self's last activity time so it isn't cleaned in a purge
    #       of stale connections?
    return $self->ok_line;
}

sub ok_line {
    my Client $self = shift;
    my $args = shift || {};
    my $argline = join('&', map { eurl($_) . "=" . eurl($args->{$_}) } keys %$args);
    $self->write("OK $argline\r\n");
    return 1;
}

sub err_line {
    my Client $self = shift;
    my $err_code = shift;
    my $err_text = {
        'unknown_command' => "Unknown server command",
        'dup_sys' => "Can't have two listeners on the system log",
    }->{$err_code};

    $self->write("ERR $err_code " . eurl($err_text) . "\r\n");
    return 0;
}

sub eurl
{
    my $a = $_[0];
    $a =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/uc sprintf("%%%02x",ord($1))/eg;
    $a =~ tr/ /+/;
    return $a;
}

sub durl
{
    my ($a) = @_;
    $a =~ tr/+/ /;
    $a =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
    return $a;
}

sub decode_url_args
{
    my $a = shift;
    my $buffer = ref $a ? $a : \$a;
    my $ret = {};

    my $pair;
    my @pairs = split(/&/, $$buffer);
    my ($name, $value);
    foreach $pair (@pairs)
    {
        ($name, $value) = split(/=/, $pair);
        $value =~ tr/+/ /;
        $value =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
        $name =~ tr/+/ /;
        $name =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
        $ret->{$name} .= $ret->{$name} ? "\0$value" : $value;
    }
    return $ret;
}


# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End:
