#!/usr/bin/perl
#
# MogileFS daemon - HEAVILY UNDER CONSTRUCTION
#
# Copyright 2004, Danga Interactive
#
# Authors:
#   Brad Fitzpatrick <brad@danga.com>
#   Brad Whitaker    <whitaker@danga.com>
#   Mark Smith       <junior@danga.com>
#
# License:
#   undecided.
#

package Mgd;

# Refuse to start as the superuser ($< is the real uid; 0 is root).
if ($< == 0) {
    die "mogilefsd cannot be run as root\n";
}

use strict;
use Getopt::Long;
use IO::Socket;
use Symbol;
use POSIX ":sys_wait_h"; # argument for waitpid
use POSIX;
use DBI;
use DBD::mysql;
use File::Copy ();
use Carp;
use File::Basename ();
use File::Path ();
use Sys::Syslog;
use Socket qw(MSG_NOSIGNAL);
use Time::HiRes qw(gettimeofday tv_interval);
use Net::Netmask;
use LWP::UserAgent;


#####################################################################
### C O N F I G
#####################################################################

# Package globals (some are referenced fully qualified, e.g. $Mgd::DEBUG).
# `use vars` is kept rather than `our`: `our` would create a lexical alias
# that stays in effect past any later `package` statement in this file.
use vars qw(
    $dbh
    $DEFAULT_CONFIG
    $DEFAULT_MOG_ROOT
    $MOG_ROOT
    $MOGSTORED_STREAM_PORT
    $DEBUG
    $USE_HTTP
    $FLAG_NOSIGNAL
);

# Built-in defaults.
$DEFAULT_CONFIG        = "/etc/mogilefs/mogilefsd.conf";
$DEFAULT_MOG_ROOT      = "/mnt/mogilefs";
$MOGSTORED_STREAM_PORT = 7501;
$DEBUG                 = 0;

# Flag for send() calls asking not to be SIGPIPEd.  Wrapped in eval so that a
# platform without MSG_NOSIGNAL simply leaves $FLAG_NOSIGNAL undef.
eval { $FLAG_NOSIGNAL = MSG_NOSIGNAL; };

# File-scoped settings, resolved below from the command line, then the
# config file, then built-in defaults.
my (
    %cmdline,               # option name -> value given on the command line
    %cfgfile,               # option name -> value read from the config file
    $config,                # path of the config file to read (-c)
    $skipconfig,            # if true, don't read any config file (-s)
    $daemonize,
    $db_dsn,
    $db_user,
    $db_pass,
    $conf_port,
    $query_jobs,
    $delete_jobs,
    $replicate_jobs,
    $reaper_jobs,
    $monitor_jobs,
    $mog_root,              # NOTE(review): not assigned in the code shown; $MOG_ROOT holds the root
    $default_mindevcount,
    $worker_port,
    $upgrade,               # if true, apply schema upgrades at startup (--upgrade)
    $min_free_space,
    $max_disk_age,
    $node_timeout,          # time in seconds to wait for storage node responses
   );

# Command-line options will override config-file values.  GetOptions returns
# false (after warning) on unknown or malformed options; abort rather than
# silently starting with defaults the operator did not intend.
Getopt::Long::Configure( "bundling" );
Getopt::Long::GetOptions(
    'c|config=s'    => \$config,
    's|skipconfig'  => \$skipconfig,
    'd|debug+'      => \$cmdline{debug},
    'D|daemon'      => \$cmdline{daemonize},
    'dsn=s'         => \$cmdline{db_dsn},
    'dbuser=s'      => \$cmdline{db_user},
    'dbpass=s'      => \$cmdline{db_pass},
    'r|mogroot=s'   => \$cmdline{mog_root},
    'p|confport=i'  => \$cmdline{conf_port},
    'w|workers=i'   => \$cmdline{query_jobs},
    'no_http'       => \$cmdline{no_http},
    'workerport=i'  => \$cmdline{worker_port},
    'upgrade'       => \$upgrade,
    'maxdiskage=i'  => \$cmdline{max_disk_age},
    'minfreespace=i' => \$cmdline{min_free_space},
    'default_mindevcount=i' => \$cmdline{default_mindevcount},
    'node_timeout=i' => \$cmdline{node_timeout},
) or die "Error: invalid command-line options (see messages above)\n";

# Fall back to the default config file when none was named and it is readable.
$config = $DEFAULT_CONFIG if !$config && -r $DEFAULT_CONFIG;

# Read the config file if one was specified.  Every line that is not blank
# or a '#' comment must look like "key value", "key = value" or "key=value";
# a repeated key overwrites the earlier value.  Anything else is fatal.
if ( $config && !$skipconfig ) {
    # three-arg open: the path is never parsed for a mode prefix ('<', '>', '|')
    open my $cf, '<', $config or die "open: $config: $!";

    my $configLine = qr{
        ^\s*                    # Leading space
        (\w+)                   # Key
        (?: \s* = \s* | \s+ )   # '=' with optional space around it, or just space
        (.+?)                   # Value
        \s*$                    # Trailing space
    }x;

    my $linecount = 0;
    while (defined( my $line = <$cf> )) {
        $linecount++;
        next if $line =~ m{^\s*(#.*)?$};
        die "Malformed config file (line $linecount)" unless $line =~ $configLine;

        my ( $key, $value ) = ( $1, $2 );
        print STDERR "Setting '$key' to '$value'\n" if $cmdline{debug};
        $cfgfile{ $key } = $value;
    }

    close $cf;
}

### FUNCTION: choose_value( $name, $default[, $boolean] )
# Resolve setting $name: a defined command-line value wins, then a defined
# config-file value, then $default.  Because the test is definedness, an
# explicit 0 or "" on the command line still overrides the config file.
# $boolean is accepted but currently unused.
sub choose_value ($$;$) {
    my ( $name, $default, $boolean ) = @_;

    foreach my $source ( \%cmdline, \%cfgfile ) {
        return $source->{$name} if defined $source->{$name};
    }
    return $default;
}


# Fill in defaults for those values which were either loaded from config or
# specified on the command line. Command line takes precedence, then values in
# the config file, then the defaults.
$daemonize      = choose_value( 'daemonize', 0, 1 );
$db_dsn         = choose_value( 'db_dsn', "DBI:mysql:mogilefs" );
$db_user        = choose_value( 'db_user', "mogile" );
$db_pass        = choose_value( 'db_pass', "", 1 );
$conf_port      = choose_value( 'conf_port', 7001 );
$MOG_ROOT       = choose_value( 'mog_root', $DEFAULT_MOG_ROOT );

# Query worker count.  -w/--workers is stored as 'query_jobs' and, being on
# the command line, must win; previously a config-file 'listener_jobs' beat
# it.  Otherwise keep the old config behaviour: the old name 'listener_jobs'
# is tried first, falling back (when absent or false) to the new name
# 'query_jobs' and then to 20.
$query_jobs     = defined $cmdline{query_jobs}
                  ? $cmdline{query_jobs}
                  : ( choose_value( 'listener_jobs', undef ) ||
                      choose_value( 'query_jobs', 20 ) );
$delete_jobs    = choose_value( 'delete_jobs', 1 );
$replicate_jobs = choose_value( 'replicate_jobs', 1 );
$reaper_jobs    = choose_value( 'reaper_jobs', 1 );
$monitor_jobs   = choose_value( 'monitor_jobs', 1 );
$worker_port    = choose_value( 'worker_port', 7200 );
$min_free_space = choose_value( 'min_free_space', 100 );
$max_disk_age   = choose_value( 'max_disk_age', 5 );
$DEBUG          = choose_value( 'debug', 0, 1 );
$USE_HTTP       = ! choose_value( 'no_http', 0, 1);
$default_mindevcount = choose_value( 'default_mindevcount', 2 );
$node_timeout   = choose_value( 'node_timeout', 3 );

### initial setup
# Connect up front so a bad DSN or bad credentials fail immediately.  This
# file-scoped lexical $dbh is the variable that validate_dbh() and get_dbh()
# (defined further down in this file) read and cache, so the validate call
# here is a no-op: nothing has been assigned to it yet.
Mgd::validate_dbh();
my $dbh = Mgd::get_dbh()
    or die <<NODB;
Error: unable to establish connection with your MogileFS database.

Please verify that you have correctly setup a configuration file or are
providing the correct information in order to reach the database and try
running the MogileFS server again.
NODB

### make sure they have a database setup?
# Probe the host table; an error here usually means the schema is missing.
my $host = $dbh->selectrow_hashref('SELECT * FROM host LIMIT 1');
if ($dbh->err) {
    my $err = $dbh->errstr;

    # Work out the database name for the suggested mysql command below.
    # DBD::mysql DSNs come as "DBI:mysql:dbname[:host[:port]]" or as
    # "DBI:mysql:database=dbname;host=...", so accept both forms.
    my $db = '<db_name>';
    if ($db_dsn =~ /^DBI:mysql:(.+)$/i) {
        my $spec = $1;
        if ($spec =~ /(?:^|;)\s*(?:database|db|dbname)=([^;]+)/i) {
            $db = $1;
        } elsif ($spec =~ /^([^;:=]+)(?:[;:]|$)/) {
            $db = $1;
        }
    }
    die <<ERR;
Database error: $err

This is commonly caused by not having the tables created in the database.
The easiest way to setup your database is to do something like this:

    cat devnotes/sql.txt | mysql -u$db_user -p $db

NOTE: This will OVERWRITE any MogileFS related tables in the database.
Please make SURE that this is what you want to do before you do this!
ERR
}

### but if it was undef, no hosts?
# With no host rows there is nothing to serve, so stop here.
die <<NOHOSTS unless $host;
Error: no hosts found.

It seems like you don't have any hosts defined in your MogileFS setup.
This means that we really can't do anything, so we're going to shut down.
NOHOSTS

# Bring an older host table up to date.  Each missing column group is fatal
# unless --upgrade was given; with --upgrade it is added via ALTER TABLE and
# any SQL error is fatal.
{
    my $host_table_outdated = <<NEEDUPDATE;
Error: host table out of date.

MogileFS needs to update your database schema.  Please rerun the MogileFS
server with the --upgrade option if you wish for us to do this.

This will NOT destroy any existing data.
NEEDUPDATE

    # see if they have the get port, else update it
    if (! exists $host->{http_get_port}) {
        die $host_table_outdated unless $upgrade;
        print STDERR "Updating host table...\n";
        $dbh->do("ALTER TABLE host ADD COLUMN http_get_port MEDIUMINT UNSIGNED AFTER http_port");
        die "Error: " . $dbh->errstr . "\n" if $dbh->err;
    }

    # now update to add new columns? (alternate IP / netmask)
    if (! exists $host->{altip}) {
        die $host_table_outdated unless $upgrade;
        print STDERR "Updating host table...\n";
        my @steps = (
            [ "Error (1): ", "ALTER TABLE host ADD COLUMN altip VARCHAR(15) AFTER hostip" ],
            [ "Error (2): ", "ALTER TABLE host ADD COLUMN altmask VARCHAR(18) AFTER altip" ],
            [ "Error (3): ", "ALTER TABLE host ADD UNIQUE altip (altip)" ],
        );
        foreach my $step (@steps) {
            my ($prefix, $sql) = @$step;
            $dbh->do($sql);
            die $prefix . $dbh->errstr . "\n" if $dbh->err;
        }
    }
}

### and now check for devices
my $device = $dbh->selectrow_hashref('SELECT * FROM device LIMIT 1');

# With no device rows there is nothing to store on, so stop here.
die <<NODEVICES unless $device;
Error: no devices found.

It seems like you don't have any devices defined in your MogileFS setup.
This means that we really can't do anything, so we're going to shut down.
NODEVICES

# Older device tables lack mb_asof; add it only when --upgrade was given.
if (! exists $device->{mb_asof}) {
    die <<NEEDUPDATE unless $upgrade;
Error: device table out of date.

MogileFS needs to update your database schema.  Please rerun the MogileFS
server with the --upgrade option if you wish for us to do this.

This will NOT destroy any existing data.
NEEDUPDATE

    print STDERR "Updating device table...\n";
    $dbh->do("ALTER TABLE device ADD COLUMN mb_asof INT(10) UNSIGNED AFTER mb_used");
    die "Error: " . $dbh->errstr . "\n" if $dbh->err;
}

# see if they have the new fid table; any error from this probe query is
# taken to mean the unreachable_fids table still has to be created.
my $fid = $dbh->selectrow_array('SELECT fid FROM unreachable_fids LIMIT 1');
if ($dbh->err) {
    die <<NEEDUPDATE unless $upgrade;
Error: database schema out of date.

MogileFS needs to update your database schema.  Please rerun the MogileFS
server with the --upgrade option if you wish for us to do this.

This will NOT destroy any existing data.
NEEDUPDATE

    print STDERR "Creating unreachable_fids table...\n";
    my $create_sql = join('',
        "CREATE TABLE unreachable_fids (",
        "    fid        INT UNSIGNED NOT NULL,",
        "    lastupdate INT UNSIGNED NOT NULL,",
        "    PRIMARY KEY (fid),",
        "    INDEX (lastupdate)",
        ")",
    );
    $dbh->do($create_sql);
    die "Error: " . $dbh->errstr . "\n" if $dbh->err;
}

# now check for an updated tempfile table; any error selecting the devids
# column is taken to mean the column has not been added yet.
my $devids = $dbh->selectrow_array('SELECT devids FROM tempfile LIMIT 1');
if ($dbh->err) {
    die <<NEEDUPDATE unless $upgrade;
Error: database schema out of date.

MogileFS needs to update your database schema.  Please rerun the MogileFS
server with the --upgrade option if you wish for us to do this.

This will NOT destroy any existing data.
NEEDUPDATE

    print STDERR "Updating tempfile table...\n";
    $dbh->do("ALTER TABLE tempfile ADD COLUMN devids VARCHAR(60)");
    die "Error: " . $dbh->errstr . "\n" if $dbh->err;
}

# Release the startup database handle before forking: otherwise every child
# would inherit the same connection and they could all end up writing to
# that one socket at once.  get_dbh() reconnects on demand afterwards.
$dbh = undef;

#####################################################################
### D A E M O N   F U N C T I O N S
#####################################################################

if ($daemonize) {
    daemonize();
}

# Bookkeeping for child processes: which pid runs which job, and how many
# workers each job wants versus has running.
my %child;                  # pid -> job
my %todie;                  # pid -> 1 (pids that we've asked to die)
my %jobs;                   # jobname -> [ wanted, running ]
my $psock;                  # IO::Socket::INET connection to parent (undef if parent)
my %streamcache;            # host -> IO::Socket::INET to mogstored
my $lastspawntime = 0;      # time() of the last spawn_children run
our $allkidsup = 0;         # true when all kids are running; cleared when a kid dies
our $starttime = time;      # time we got going
our %domaincache;           # { domainname => { domainrow } }
our $domaincachetime = 0;
our $client_ip = undef;     # client ip address
our $force_alt_zone = 0;    # if on, force to use alternate zone (if it's defined)

# [ what we want to be at, what we are at ]
%jobs = (
    queryworker => [ $query_jobs,     0 ],
    delete      => [ $delete_jobs,    0 ],
    replicate   => [ $replicate_jobs, 0 ],
    reaper      => [ $reaper_jobs,    0 ],
    monitor     => [ $monitor_jobs,   0 ],
);

# open up our log
openlog('mogilefsd', 'pid', 'daemon');
Mgd::log('info', 'beginning run');

# Discard the cached database handle if the server no longer answers on it,
# so that the next get_dbh() call opens a fresh connection.
sub validate_dbh {
    return unless $dbh;

    # CONNECTION_ID() is a MySQL-ism; a false result means the handle is dead.
    undef $dbh unless $dbh->selectrow_array("SELECT CONNECTION_ID()");
}

# Return the cached database handle, connecting with the configured DSN and
# credentials when there is none (first use, or after validate_dbh dropped it).
sub get_dbh {
    unless ($dbh) {
        $dbh = DBI->connect($db_dsn, $db_user, $db_pass);
    }
    return $dbh;
}

# Install signal handlers.  On TERM or INT the parent forwards that same
# signal to every child pid it is tracking and then exits.  (Children reset
# both handlers to DEFAULT in make_new_child.)
foreach my $signame ('TERM', 'INT') {
    $SIG{$signame} = sub {
        print STDERR scalar keys %child, " children to kill.\n" if $DEBUG;
        my $count = kill( $signame => keys %child );
        print STDERR "Sent SIG$signame to $count children.\n" if $DEBUG;
        exit 0;
    };
}

#############################################################################
## beginning of main execution path
#############################################################################

# setup server socket to listen for client connections: non-blocking TCP on
# $conf_port (no LocalAddr, so all local interfaces).
my $server = IO::Socket::INET->new(
    LocalPort => $conf_port,
    Type      => SOCK_STREAM,
    Proto     => 'tcp',
    Blocking  => 0,
    Reuse     => 1,
    Listen    => 10,
) or die "Error creating socket: $@\n";

# accept handler for new clients: wrap the connection in a Client object and
# start watching it for readability.
my $accept_handler = sub {
    my $csock = $server->accept()
        or return;

    if ($DEBUG >= 2) {
        printf( "Listen child making a Client for %d.\n", fileno($csock) );
    }
    my $client = Client->new($csock);
    if ($DEBUG >= 2) {
        printf( "Client is %s\n", $client );
    }
    $client->watch_read(1);
};

# now setup socket for workers to connect to: non-blocking TCP on
# 127.0.0.1:$worker_port only, so just local child processes can reach it.
my $wserver = IO::Socket::INET->new(
    LocalPort => $worker_port,
    LocalAddr => '127.0.0.1',
    Type      => SOCK_STREAM,
    Proto     => 'tcp',
    Blocking  => 0,
    Reuse     => 1,
    Listen    => 10,
) or die "Error creating socket: $@\n";

# accept handler for new workers: wrap the connection in a WorkerConn and
# register it with the Frontend.
my $waccept_handler = sub {
    my $csock = $wserver->accept()
        or return;

    if ($DEBUG >= 2) {
        printf( "Listen child making a Client for %d.\n", fileno($csock) );
    }
    my $client = WorkerConn->new($csock);
    if ($DEBUG >= 2) {
        printf( "Child is %s\n", $client );
    }
    Frontend->RegisterWorkerConn($client);
};

# thing to keep jobs alive: the post-event-loop callback.  At most once per
# second it reaps exited children, fixes up the per-job worker counts, and
# forks replacements until every job has as many workers as it wants.
# Always returns true, which keeps the event loop running.
my $spawn_children = sub {
    # run only once per second
    return 1 unless time > $lastspawntime;
    $lastspawntime = time();

    # Reap every child that has exited since the last run, not just one, so
    # simultaneous deaths are all handled in this pass instead of one per
    # second (leaving zombies and under-staffed jobs in the meantime).
    my $reaped = 0;
    while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
        my $status = $?;    # capture before anything else can clobber $?
        $reaped++;

        # when a child dies, figure out what it was doing
        # and note that job has one less worker
        my $job = delete $child{$pid}
            or next;
        my $extra = $todie{$pid} ? "expected" : "UNEXPECTED";
        error("Child $pid ($job) died: $status ($extra)");
        Frontend->NoteDeadChild($pid);

        # if the pid is in %todie, then we have asked it to shut down
        # and have already decremented the jobstat counter (see
        # note_pending_death) and don't want to do it again
        my $was_asked_to_die = delete $todie{$pid};
        if (my $jobstat = $jobs{$job}) {
            $jobstat->[1]-- unless $was_asked_to_die;
        }
    }

    # nothing died and everybody was already up: nothing to do
    return 1 if !$reaped && $allkidsup;
    $allkidsup = 0; # know something died (or we are still catching up)

    # foreach job, fork enough children.  Walk a fresh key list rather than
    # each(): returning early from an each() loop leaves the hash iterator
    # part-way through, and the next pass would then skip the jobs before
    # that point and wrongly conclude that all kids are up.
    foreach my $job (keys %jobs) {
        my $jobstat = $jobs{$job};
        my $need = $jobstat->[0] - $jobstat->[1];
        next unless $need > 0;

        error("Job $job has only $jobstat->[1], wants $jobstat->[0], making $need.");
        for (1..$need) {
            my $cpid = make_new_child($job);
            # fork trouble: leave $allkidsup false so we retry next second
            return 1 unless $cpid;
            $child{$cpid} = $job;

            # now increase the count of processes currently doing this job
            $jobstat->[1]++;
        }
    }

    # if we got this far, all jobs have been re-created.  note that
    # so we avoid more CPU usage in this post-event-loop callback later
    $allkidsup = 1;

    # true value keeps us running:
    return 1;
};

# setup Danga::Socket to start handling connections: the two listening
# sockets are handed to the loop together with their accept handlers.
Client->DebugLevel(3);
Client->OtherFds(
    fileno($server)  => $accept_handler,
    fileno($wserver) => $waccept_handler,
);

# wake up every 250 milliseconds and run the job-spawning callback after
# each loop iteration
Client->SetLoopTimeout(250);
Client->SetPostLoopCallback($spawn_children);

# and now, actually start listening for events; a die escaping the loop is
# caught so it can be logged before shutdown.
eval {
    print("Starting event loop for frontend job on pid $$.\n") if $DEBUG;
    Client->EventLoop();
};
Mgd::log('err', "crash log: $@") if $@;
Mgd::log('info', 'ending run');
closelog();

#############################################################################
## end of main
#############################################################################

# Detach into the background: fork, setsid(), fork again, chdir to /, clear
# the umask, and reopen STDIN/STDOUT/STDERR on /dev/null (or on
# /tmp/mogilefsd.log when $DEBUG is set).  Croaks if a fork or setsid fails.
sub daemonize {
    my ($pid, $sess_id);

    ## Fork and exit parent
    $pid = fork;
    croak "Cannot fork to daemonize: $!" unless defined $pid;
    exit 0 if $pid;

    ## Detach ourselves from the terminal
    croak "Cannot detach from controlling terminal"
        unless $sess_id = POSIX::setsid();

    ## Prevent possibility of acquiring a controlling terminal
    $SIG{'HUP'} = 'IGNORE';
    $pid = fork;
    croak "Cannot fork to daemonize: $!" unless defined $pid;
    exit 0 if $pid;

    ## Change working directory
    chdir "/";

    ## Clear file creation mask
    # NOTE(review): umask 0 lets files created later keep exactly the mode
    # their creator asks for (possibly world-writable); confirm intended.
    umask 0;

    print STDERR "Daemon running as pid $$.\n" if $DEBUG;

    ## Close open file descriptors
    close(STDIN);
    close(STDOUT);
    close(STDERR);

    ## Reopen stdin on /dev/null (or the debug log), then dup it onto
    ## stdout and stderr.  Three-arg open keeps the mode apart from the path.
    my $target = $DEBUG ? "/tmp/mogilefsd.log" : "/dev/null";
    open(STDIN, '+>', $target);
    open(STDOUT, "+>&STDIN");
    open(STDERR, "+>&STDIN");
}

# Fork one worker process for $job.
#
# Parent: returns the child's pid, or 0 (from error()) if the signals could
# not be blocked or the fork failed.
# Child: never returns.  It closes the listening sockets, restores default
# INT/TERM handling, connects back to the parent on $worker_port, announces
# itself as "<pid> <job>", then runs job_<job>() and exits.
sub make_new_child {
    my $job = shift;

    # Block INT and TERM across the fork: until the child resets $SIG{INT}
    # and $SIG{TERM} below it still has the parent's handlers, which would
    # signal every pid in its inherited copy of %child and then exit.
    my $sigset = POSIX::SigSet->new(SIGINT, SIGTERM);
    sigprocmask(SIG_BLOCK, $sigset)
        or return error("Can't block SIGINT/SIGTERM for fork: $!");

    my $pid = fork;
    unless (defined $pid) {
        my $fork_err = "$!";
        # don't leave the parent running with INT/TERM blocked for good
        sigprocmask(SIG_UNBLOCK, $sigset);
        return error("fork failed creating $job: $fork_err");
    }

    if ($pid) {
        # The child exists no matter what happens here, so always hand its
        # pid back for bookkeeping; an unblock failure is only reported.
        sigprocmask(SIG_UNBLOCK, $sigset)
            or error("Can't unblock SIGINT/SIGTERM for fork: $!");
        return $pid;
    }

    # as a child, we want to close these and ignore them
    close($server);
    close($wserver);

    $SIG{INT} = 'DEFAULT';
    $SIG{TERM} = 'DEFAULT';
    $0 .= " [$job]";

    # unblock signals.  A child must never return into the parent's spawning
    # code, so give up (die) instead of returning an error value.
    sigprocmask(SIG_UNBLOCK, $sigset)
        or die "Can't unblock SIGINT/SIGTERM in child for $job: $!\n";

    # set our frontend into child mode
    Frontend->SetAsChild;

    # try to create a connection to the parent.  we die here because
    # we're the child and if we can't talk to the master we really need
    # to die so that a child isn't just sitting around without communication
    # to the parent.
    $psock = IO::Socket::INET->new(PeerAddr => "127.0.0.1",
                                   PeerPort => $worker_port,
                                   Type     => SOCK_STREAM,
                                   Proto    => 'tcp',)
        or die "Error creating socket to master: $@\n";
    $psock->write("$$ $job\n");

    # now call our job function, job_<name> in this package
    my $job_handler = do {
        no strict 'refs';
        *{"job_$job"}{CODE};
    } or die "No job_$job function defined for job '$job'\n";
    $job_handler->();
    exit;
}

# Record that worker $pid (running $job) has been asked to exit: mark it in
# %todie and decrement the job's running count right away, so the reaper in
# spawn_children knows not to decrement it again when the pid dies.
# Dies if $job is not a known job.
sub note_pending_death {
    my ($job, $pid) = @_;

    my $jobstat = $jobs{$job};
    die "$job not defined in call to note_pending_death.\n"
        unless defined $jobstat;

    $todie{$pid} = 1;
    $jobstat->[1]--;
}

# log stuff to syslog or the screen
#   Mgd::log($priority, $message)        -- $message is logged verbatim
#   Mgd::log($priority, $format, @args)  -- printf-style, as syslog() takes
# When not daemonized the priority is ignored and the line is printed to the
# currently selected output handle; otherwise it goes to syslog().
sub log {
    my ($priority, $format, @args) = @_;

    # A lone message often carries arbitrary text (URLs, paths, $@) that may
    # contain '%'; never let printf/syslog interpret it as a format string.
    unless (@args) {
        @args   = ($format);
        $format = '%s';
    }

    if (! $daemonize) {
        printf($format . "\n", @args);
    } else {
        syslog($priority, $format, @args);
    }
}

# Send one line ($_[0] plus CRLF) to the parent over $psock.  Does nothing in
# the parent process itself, where $psock is undef.
sub send_to_parent {
    my ($message) = @_;
    $psock or return;
    $psock->write($message . "\r\n");
}

# Report an error message.  A child forwards it to the parent as
# "error <message>"; the parent passes it (by reference) to
# Frontend->NoteError and logs it at 'debug'.  Always returns 0 so callers
# can write `return error(...)`.
sub error {
    unless ($psock) {
        # we're a parent, so just handle output of error
        Frontend->NoteError(\$_[0]);
        Mgd::log('debug', $_[0]);
        return 0;
    }

    # we're a child, pass error to parent
    send_to_parent("error $_[0]");
    return 0;
}

# Worker loop for the 'delete' job; never returns (exits on 'shutdown').
# Each pass checks in with the parent, then drains file_to_delete in batches
# of up to $LIMIT rows.  For each (fid, devid) row the copy is removed from
# its device -- HTTP DELETE when make_path() yields a URL, unlink() under
# $Mgd::MOG_ROOT otherwise -- and its file_on row is dropped; fids with no
# file_on rows left are removed from file_to_delete.
sub job_delete {

  PASS:
    while (1) {
        sleep 9;
        validate_dbh();
        my $dbh = get_dbh();

        # see if we have anything from the parent
        send_to_parent('del_i_looped');
        while (defined (my $line = <$psock>)) {
            $line =~ s/\r?\n$//;
            last if $line eq '.';
            if ($line eq 'shutdown') {
                exit 0;
            }
        }

        my $LIMIT = 500;
        while (1) {
            sleep 1;
            my $delmap = $dbh->selectall_arrayref("SELECT fd.fid, fo.devid ".
                                                  "FROM file_to_delete fd ".
                                                  "LEFT JOIN file_on fo ON fd.fid=fo.fid ".
                                                  "LIMIT $LIMIT");
            my $count = $delmap ? scalar @$delmap : 0;
            next PASS unless $count;

            my %done;      # fid -> 1 (when fid is deleted from all devices)
            my %dev_down;  # devid -> 1 (when device times out due to EIO)
            my $progress = 0;  # rows cleared from file_on / file_to_delete this batch

            foreach my $dm (@$delmap) {
                my ($fid, $devid) = @$dm;

                # if no device is returned from the query above, that
                # means there are no file_on rows for it, and we can consider
                # it now deleted.
                unless ($devid) {
                    $done{$fid} = 1;
                    next;
                }

                # don't try to delete from this device if we earlier
                # found it to be timing out with EIO
                next if $dev_down{$devid};

                my $path = make_path($devid, $fid);
                my $rv = 0;
                if (my $urlref = Mgd::is_url($path)) {
                    # hit up the server and delete it
                    my $sock = IO::Socket::INET->new(PeerAddr => $urlref->[0],
                                                     PeerPort => $urlref->[1],
                                                     Timeout => 2);
                    unless ($sock) {
                        # timeout or something, mark this device as down for now and move on
                        $dev_down{$devid} = 1;
                        next;
                    }

                    # send delete request
                    error("Sending delete for $path") if $Mgd::DEBUG >= 2;
                    $sock->write("DELETE $urlref->[2] HTTP/1.0\r\n\r\n");
                    my $response = <$sock>;
                    if (! defined $response) {
                        # connection closed without any status line
                        error("Error: no response to delete of $path");
                    } elsif ($response =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
                        my $status = $1;
                        if (($status >= 200 && $status <= 299) || $status == 404) {
                            # effectively means all went well
                            $rv = 1;
                        } else {
                            # remote file system error?  mark node as down
                            error("Error: unlink failure: $path: $status");
                            $dev_down{$devid} = 1;
                            next;
                        }
                    } else {
                        error("Error: unknown response line: $response");
                    }
                } else {
                    # do normal unlink
                    $rv = unlink "$Mgd::MOG_ROOT/$path";

                    # device is timing out.  take note of it and
                    # continue dealing with other deletes
                    if (! $rv) {
                        if ($! == EIO) {
                            $dev_down{$devid} = 1;
                            next;
                        } elsif ($! == ENOENT) {
                            $rv = 1;  # count non-existent file as deleted
                        }
                    }
                }

                # if we deleted it, or it didn't exist, consider it
                # deleted.
                if ($rv) {
                    $dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
                             undef, $fid, $devid);
                    $progress++;
                }
            }

            if (%done) {
                my $in = join(',', keys %done);
                $dbh->do("DELETE FROM file_to_delete WHERE fid IN ($in)");
                $progress++;
            }

            # A short batch means the queue is drained.  A full batch in which
            # nothing could be removed (e.g. every device down) would otherwise
            # be re-read every second forever without checking in with the
            # parent, so go back to the outer loop in that case as well.
            next PASS if $count < $LIMIT || !$progress;
        }

    }
}

# copies a file from one Perlbal to another utilizing HTTP
# GETs fid $fid from device $sdevid and streams it to device $ddevid with a
# PUT.  Returns 1 on success and 0 (via error()) on any failure.  Failures on
# the source side (non-2xx GET, missing or zero Content-length) also send
# "repl_unreachable <fid>" to the parent and record the fid in
# unreachable_fids.
sub http_copy {
    my ($sdevid, $ddevid, $fid) = @_;

    # handles setting unreachable magic; $error->(reachability, "message")
    my $error = sub {
        if ($_[0]) {
            send_to_parent("repl_unreachable $fid");

            # update database table
            Mgd::validate_dbh();
            my $dbh = Mgd::get_dbh();
            $dbh->do("REPLACE INTO unreachable_fids VALUES ($fid, UNIX_TIMESTAMP())");
        }
        return error($_[1]);
    };

    # get some information we'll need
    my $devs = Mgd::get_device_summary();
    my ($sdev, $ddev) = ($devs->{$sdevid}, $devs->{$ddevid});
    return error("Error: unable to get device information: source=$sdevid, destination=$ddevid, fid=$fid")
        unless ref $sdev && ref $ddev;
    my ($spath, $dpath) = (Mgd::make_http_path($sdevid, $fid),
                           Mgd::make_http_path($ddevid, $fid));
    my ($shost, $sport) = (Mgd::hostid_ip($sdev->{hostid}), Mgd::hostid_http_port($sdev->{hostid}));
    my ($dhost, $dport) = (Mgd::hostid_ip($ddev->{hostid}), Mgd::hostid_http_port($ddev->{hostid}));
    unless (defined $spath && defined $dpath && defined $shost && defined $dhost && $sport && $dport) {
        # show detailed information to find out what's not configured right
        error("Error: unable to replicate file fid=$fid from device id $sdevid to device id $ddevid");
        error("       http://$shost:$sport$spath -> http://$dhost:$dport$dpath");
        return 0;
    }

    # setup our pipe error handler, in case we get closed on
    my $pipe_closed = 0;
    local $SIG{PIPE} = sub { $pipe_closed = 1; };

    # okay, now get the file
    my $sock = IO::Socket::INET->new(PeerAddr => $shost, PeerPort => $sport, Timeout => 2)
        or return error("Unable to create socket to $shost:$sport for $spath");
    $sock->write("GET $spath HTTP/1.0\r\n\r\n");
    return error("Pipe closed retrieving $spath from $shost:$sport")
        if $pipe_closed;

    # we just want a content length
    my $clen;
    while (defined (my $line = <$sock>)) {
        $line =~ s/[\s\r\n]+$//;
        last unless length $line;
        if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
            # make sure we get a good response
            return $error->(1, "Error: Resource http://$shost:$sport$spath failed: HTTP $1")
                unless $1 >= 200 && $1 <= 299;
        }
        next unless $line =~ /^Content-length:\s*(\d+)\s*$/i;
        $clen = $1;
    }
    return $error->(1, "File $spath has a content-length of 0; unable to replicate")
        unless $clen;

    # open target for put
    my $dsock = IO::Socket::INET->new(PeerAddr => $dhost, PeerPort => $dport, Timeout => 2)
        or return error("Unable to create socket to $dhost:$dport for $dpath");
    $dsock->write("PUT $dpath HTTP/1.0\r\nContent-length: $clen\r\n\r\n")
        or return error("Unable to write data to $dpath on $dhost:$dport");
    return error("Pipe closed during write to $dpath on $dhost:$dport")
        if $pipe_closed;

    # now read data and print while we're reading.
    my ($data, $read, $written) = ('', 0, 0);
    while (!$pipe_closed && (my $bytes = $sock->read($data, $clen - $read))) {
        # now we've read in $bytes bytes
        $read += $bytes;
        # send() returns undef on failure (e.g. the peer went away)
        my $wbytes = $dsock->send($data) || 0;
        $written += $wbytes;
        return error("Error: wrote $wbytes; expected to write $bytes; failed putting to $dpath")
            unless $wbytes == $bytes;
    }
    return error("Error: wrote $written bytes, expected to write $clen")
        unless $written == $clen;

    # now read in the response line (should be first line).  Report every
    # failure through error() like the rest of this sub, so a child's
    # messages reach the parent instead of going to STDERR.
    my $line = <$dsock>;
    return error("Error: no response to PUT of http://$dhost:$dport$dpath")
        unless defined $line;
    $line =~ s/[\r\n]+$//;
    if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
        my $status = $1;
        return 1 if $status >= 200 && $status <= 299;
        return error("Error: got a 404 in put: device not on host?: http://$dhost:$dport$dpath")
            if $status == 404;
        return error("Error: HTTP $status in put to http://$dhost:$dport$dpath");
    }
    return error("Error: HTTP response line not recognized: $line");
}

# replicates $fid if its devcount is less than $min.
# Holds the MySQL named lock "mgfs:fid:<fid>:replicate" while working, and
# releases it on every return path.  Returns 2 if the fid already had $min
# live copies, 1 after replicating, and 0 (via error()) on failure.
sub replicate {
    my ($dbh, $fid, $min) = @_;

    my $lockname = "mgfs:fid:$fid:replicate";
    my $lock = $dbh->selectrow_array("SELECT GET_LOCK(?, 1)", undef,
                                     $lockname);
    return error("Unable to obtain lock $lockname")
        unless $lock;

    # From here on every exit must release the lock.  $unlock only releases
    # it; $retunlock->($rv, $msg) releases it and returns $rv, or error($msg)
    # (i.e. 0) when $rv is false.
    my $unlock = sub {
        $dbh->selectrow_array("SELECT RELEASE_LOCK(?)", undef, $lockname);
    };
    my $retunlock = sub {
        my $rv = shift;
        $unlock->();
        return $rv ? $rv : error($_[0]);
    };

    # hashref of devid -> $device_row_href  (where devid is alive)
    my $devs = Mgd::get_device_summary();
    return $retunlock->(0, "Device information from get_device_summary is empty")
        unless $devs && %$devs;

    # learn what devices this file is already on
    my $on_count = 0;
    my %on_host;      # hostid -> 1
    my @dead_devid;   # list of dead devids
    my @exist_devid;  # list of existing devids

    my $sth = $dbh->prepare("SELECT devid FROM file_on WHERE fid=?");
    $sth->execute($fid) if $sth;
    if (!$sth || $dbh->err) {
        my $errstr = $dbh->errstr;   # capture before RELEASE_LOCK resets it
        $unlock->();
        die $errstr;
    }
    while (my ($devid) = $sth->fetchrow_array) {
        my $d = $devs->{$devid};
        unless ($d) {
            push @dead_devid, $devid;
            next;
        }
        $on_host{$d->{hostid}} = 1;
        $on_count++;
        push @exist_devid, $devid;
    }

    return $retunlock->(2) if $on_count >= $min;
    return $retunlock->(0, "Source is no longer available replicating $fid") if $on_count == 0;

    my $sdevid;

    while ($on_count < $min) {
        my @good_devids = Mgd::find_deviceid(
            random => 1,
            not_on_hosts => [ keys %on_host ],
            weight_by_free => 1,
        );

        # wasn't able to replicate enough?
        # NOTE(review): this still returns 1 below although fewer than $min
        # copies exist -- confirm callers expect that.
        last unless @good_devids;

        my $ddevid = shift @good_devids;
        $sdevid ||= $exist_devid[int(rand(scalar @exist_devid))];

        my $rv = undef;
        if ($USE_HTTP) {
            $rv = http_copy($sdevid, $ddevid, $fid);
        } else {
            my $dst_path = $MOG_ROOT . "/" . make_path($ddevid, $fid);
            my $src_path = $MOG_ROOT . "/" . make_path($sdevid, $fid);
            $rv = File::Copy::copy($src_path, $dst_path);
        }

        return $retunlock->(0, "Copier failed replicating $fid") unless $rv;
        add_file_on($fid, $ddevid, 1);
        $on_count++;

        # The destination host now holds a copy too; exclude it from the
        # next pick so further copies land on different hosts.
        if (my $ddev = $devs->{$ddevid}) {
            $on_host{$ddev->{hostid}} = 1;
        }
    }

    return $retunlock->(1);
}

# Build { dmid => { classid => mindevcount } } for every domain.  Each
# class's mindevcount is capped at the number of hosts in the host cache,
# and classid 0 (the implicit default class) gets $default_mindevcount
# (also capped) unless the domain defines class 0 itself.
sub get_mindevcounts {
    # make sure we have good info
    Mgd::check_host_cache();
    my $host_ct = scalar keys %Mgd::cache_host;

    # never ask for more copies than there are hosts
    my $capped = sub {
        my $wanted = shift;
        return $host_ct < $wanted ? $host_ct : $wanted;
    };

    # find the classes for each domainid (including domains without explicit classes)
    my %min; # dmid -> classid -> mindevcount
    validate_dbh();
    my $dbh = get_dbh();
    my $sth = $dbh->prepare("SELECT d.dmid, c.classid, c.mindevcount ".
                            "FROM domain d LEFT JOIN class c ON d.dmid=c.dmid");
    $sth->execute;
    while (my ($dmid, $classid, $mct) = $sth->fetchrow_array) {
        $min{$dmid} ||= {};  # note the existence of this dmid

        # classid is NULL (undef) for a domain with no classes defined
        next unless defined $classid;
        $min{$dmid}{$classid} = int($capped->($mct));
    }

    # fill in the implied class 0 for each domain that lacks one
    foreach my $classes (values %min) {
        $classes->{0} = $capped->($default_mindevcount)
            unless exists $classes->{0};
    }

    # return ref to hash
    return \%min;
}

sub job_monitor {
    # Monitor worker: every 15 seconds, check in with the parent, then fetch
    # each device's "usage" file over HTTP from its storage host and record
    # mb_total / mb_used / mb_asof in the device table.  Returns 0 if no
    # database handle can be obtained; otherwise loops until told to shut down.

    # Drain the parent's queued messages up to the "." terminator; the only
    # command the monitor acts on is "shutdown".
    my $parse_parent_response = sub {
        while (defined (my $line = <$psock>)) {
            $line =~ s/\r?\n$//;
            last if $line eq '.';
            exit 0 if $line eq 'shutdown';
        }
    };

    # one user agent serves every usage fetch (built once instead of per
    # device); the short timeout keeps an unresponsive node from stalling a pass
    my $ua = LWP::UserAgent->new( timeout => 2 );

    while (1) {
        sleep 15;

        # get db and note we're starting a run
        error("Monitor running; scanning usage files")
            if $Mgd::DEBUG >= 1;
        validate_dbh();
        my $dbh = get_dbh() or return 0;

        # general report in to parent
        send_to_parent('monitor_ping');
        $parse_parent_response->();

        # get a current list of devices
        my $devs = Mgd::get_device_summary();
        next unless $devs && %$devs;

        foreach my $dev (values %$devs) {
            my $host = $Mgd::cache_host{$dev->{hostid}};
            unless ($host) {
                # without a host row we can't even build the URL
                error("dev$dev->{devid}: unknown host $dev->{hostid}, skipping usage check");
                next;
            }
            my $port = $host->{http_get_port} || $host->{http_port};
            my $url = "http://$host->{hostip}:$port/dev$dev->{devid}/usage";

            my $response = $ua->get($url);
            unless ($response->is_success) {
                error("Failed getting dev$dev->{devid}: " . $response->status_line);
                next;
            }

            # the usage file is a list of "key: value" lines
            my %stats;
            foreach my $line (split(/\r?\n/, $response->content)) {
                next unless $line =~ /^(\w+)\s*:\s*(.+)$/;
                $stats{$1} = $2;
            }

            my ($used, $total) = ($stats{used}, $stats{total});
            # "used" may legitimately be 0 (an empty device); only a missing
            # used value, or a missing/zero total, is an error
            unless (defined $used && $total) {
                error("dev$dev->{devid} reports used = $used, total = $total, error?");
                next;
            }

            # scale by 1024 before storing in the mb_* columns.
            # NOTE(review): this yields megabytes only if the usage file reports
            # kilobytes (the original comment said "bytes => megabytes", which a
            # single /1024 can't do) -- confirm against the storage node.
            $used /= 1024;
            $total /= 1024;

            $dbh->do("UPDATE device SET mb_total = ?, mb_used = ?, mb_asof = UNIX_TIMESTAMP() " .
                     "WHERE devid = ?", undef, int($total), int($used), $dev->{devid});
            if ($dbh->err) {
                error("Database error in update query: " . $dbh->errstr);
                next;
            }

            error("dev$dev->{devid}: used = $used, total = $total")
                if $Mgd::DEBUG >= 1;
        }
    }
}

sub job_reaper {
    # Reaper worker: every 10 seconds, check in with the parent, then detach
    # (up to 1000 per device per pass) the files recorded on devices whose
    # status in the device summary is 'dead', recomputing each file's devcount.
    # Returns 0 if no database handle can be obtained.

    # Consume whatever the parent has queued for us, up to the "." line that
    # ends each batch; "shutdown" is the only command we honor.
    my $parse_parent_response = sub {
        while (defined (my $msg = <$psock>)) {
            $msg =~ s/\r?\n$//;
            last if $msg eq '.';
            exit 0 if $msg eq 'shutdown';
        }
    };

    # Remove one fid's file_on row for a dead device and refresh the fid's
    # devcount, logging (and moving on) if either step fails.
    my $reap_fid = sub {
        my ($dbh, $fid, $devid) = @_;

        $dbh->do('DELETE FROM file_on WHERE fid = ? AND devid = ?',
                 undef, $fid, $devid);
        if ($dbh->err) {
            error("Error deleting from file_on (file $fid, device $devid): " . $dbh->errstr);
            return;
        }

        unless (Mgd::update_fid_devcount($fid)) {
            error("Error updating fid $fid devcount");
            return;
        }

        error("Reaper noted fid $fid no longer on device $devid")
            if $Mgd::DEBUG >= 2;
    };

    while (1) {
        sleep 10;

        error("Reaper running; looking for dead devices")
            if $Mgd::DEBUG >= 1;
        validate_dbh();
        my $dbh = get_dbh() or return 0;

        send_to_parent('reaper_ping');
        $parse_parent_response->();

        # ask for dead devices too -- those are the only ones we care about
        my $devs = Mgd::get_device_summary(1);
        my @dead = grep { $_->{status} eq 'dead' } values %$devs;

        foreach my $devid (map { $_->{devid} } @dead) {
            my $fids = $dbh->selectcol_arrayref('SELECT fid FROM file_on WHERE devid = ? LIMIT 1000',
                                                undef, $devid);
            if ($dbh->err) {
                error("Error selecting jobs to reap: " . $dbh->errstr);
                next;
            }
            next unless $fids && @$fids;

            error("Found " . scalar(@$fids) . " files on dead device $devid");
            foreach my $fid (@$fids) {
                $reap_fid->($dbh, $fid, $devid);
            }
        }
    }
}

sub job_replicate {
    # Replication worker: repeatedly walks every domain/class and copies files
    # whose devcount is below the class's mindevcount (see get_mindevcounts),
    # skipping fids that are in a back-off window after a failure or that are
    # listed in unreachable_fids.  Returns 0 if no database handle can be
    # obtained; dies on database errors.

    # Drain the parent's queued messages up to the "." terminator.
    #   "repl_was_done <fid>" -- another replicator already handled <fid>; drop
    #       it from the to-do hashref passed as the first argument (if defined).
    #   "shutdown" -- exit this worker.
    my $parse_parent_response = sub {
        my $todo = $_[0];
        while (defined (my $line = <$psock>)) {
            $line =~ s/\r?\n$//;
            last if $line eq '.';

            if ($line =~ /^repl_was_done (\d+)/ && $todo) {
                delete $todo->{$1};
            } elsif ($line eq 'shutdown') {
                exit 0;
            }
        }
    };

    # { fid => unix time }; we won't try to replicate a fid until this time has
    # passed.  expired entries are cleared out as they are encountered.
    my %fidfailure;

    # { fid => 1 }; fids we have seen in the unreachable_fids table
    my %unreachable;

    my $sleep = 2;
    while (1) {
        sleep $sleep;
        validate_dbh();
        my $dbh = get_dbh() or return 0;

        # general report in to parent
        send_to_parent('repl_ping');
        $parse_parent_response->(undef);

        # assume we'll get everything replicated and take a short break after;
        # a full batch below drops the pause to zero
        $sleep = 2;

        # unreachable fids are left alone until 15 minutes past their lastupdate
        my $urfids = $dbh->selectall_arrayref('SELECT fid, lastupdate FROM unreachable_fids');
        die $dbh->errstr if $dbh->err;
        foreach my $r (@{$urfids || []}) {
            my ($fid, $lastupdate) = @$r;
            my $retry_at = $lastupdate + 900;
            # a local failure below may already have set a retry time past this
            # window, so only ever move the retry time later, never earlier
            $fidfailure{$fid} = $retry_at
                if !$fidfailure{$fid} || $fidfailure{$fid} < $retry_at;
            $unreachable{$fid} = 1;
        }

        # get the min dev counts
        my %min = %{ Mgd::get_mindevcounts() };

        foreach my $dmid (keys %min) {
            # iterate through each class, including the implicit class 0
            while (my ($classid, $mindevcount) = each %{$min{$dmid}}) {
                error("Checking replication for dmid=$dmid, classid=$classid, min=$mindevcount")
                    if $Mgd::DEBUG >= 1;

                my $LIMIT = 1000;

                # try going from devcount of 1 up to devcount of $mindevcount-1
                my %fidtodo; # fid => 1
                my $fixed = 0;
                my $attempted = 0;
                my $devcount = 1;
                while ($fixed < $LIMIT && $devcount < $mindevcount) {
                    my $now = time();
                    my $fids = $dbh->selectcol_arrayref("SELECT fid FROM file WHERE dmid=? AND classid=? ".
                                                        "AND devcount = ? AND length IS NOT NULL ".
                                                        "LIMIT $LIMIT", undef, $dmid, $classid, $devcount);
                    die $dbh->errstr if $dbh->err;
                    $fidtodo{$_} = 1 foreach @$fids;

                    # next time around, look at files with one more copy
                    $devcount++;

                    my $count = $fids ? scalar @$fids : 0;
                    error("  found $count for dmid=$dmid/classid=$classid/min=$mindevcount")
                        if $Mgd::DEBUG >= 1;
                    next unless $count;

                    # randomize the list so multiple daemons/threads replicating
                    # at the same time don't all fight over the same fids
                    my @randfids = randlist(@$fids);

                    error("Need to replicate: $dmid/$classid: @$fids") if $Mgd::DEBUG >= 2;
                    foreach my $fid (@randfids) {
                        $attempted++;
                        # cleared by a "repl_was_done" from the parent?
                        next unless $fidtodo{$fid};

                        # still inside a back-off window?
                        if ($fidfailure{$fid}) {
                            if ($fidfailure{$fid} < $now) {
                                delete $fidfailure{$fid};
                            } else {
                                next;
                            }
                        }

                        # 0 = failure, 1 = we replicated it, 2 = someone else did
                        my $status = replicate($dbh, $fid, $mindevcount);
                        unless ($status) {
                            # failed in replicate, don't retry for a minute
                            $fidfailure{$fid} = $now + 60;
                            next;
                        }
                        next if $status == 2;

                        # it was reachable after all; take it off the list
                        if (delete $unreachable{$fid}) {
                            $dbh->do("DELETE FROM unreachable_fids WHERE fid = ?", undef, $fid);
                            die $dbh->errstr if $dbh->err;
                        }

                        # housekeeping
                        $fixed++;
                        send_to_parent("repl_i_did $fid");
                        $parse_parent_response->(\%fidtodo);

                        # periodic status update
                        if ($fixed % 20 == 0) {
                            my $ratio = $fixed/$attempted*100;
                            error(sprintf("replicated=%d, attempted=%d, ratio=%.2f%%",
                                          $fixed, $attempted, $ratio));
                        }
                    }
                }

                # if we did a full batch, go straight into the next pass without pausing
                $sleep = 0 if $fixed >= $LIMIT;
            }
        }
    }
}

sub job_queryworker {
    # Query worker: read request lines from the parent socket $psock (blocking),
    # strip trailing CR/LF, call validate_dbh() (presumably re-checking the DB
    # handle), and hand each line by reference to a QueryWorker.  Stops once
    # reading $psock returns undef.
    my $worker = QueryWorker->new($psock);
    while (1) {
        my $line = <$psock>;
        last unless defined $line;

        $line =~ s/[\r\n]+$//;
        validate_dbh();
        $worker->process_line(\$line);
    }
}


#####################################################################
### S E R V E R   A P I   F U N C T I O N S
#####################################################################

# Package-level caches shared by the device/host lookup functions below:
#   $cache_device_summary / $cache_device_summary_time -- the hashref of
#       devid -> device row returned by get_device_summary(), and when it was
#       built; reused for 15 seconds.
#   %cache_host / $cache_host_time -- hostid -> host row, filled by
#       check_host_cache() and reused for 5 seconds.
use vars qw($cache_device_summary $cache_device_summary_time %cache_host $cache_host_time);

# general purpose device locator.  example:
#
# my $devid = Mgd::find_deviceid(
#     random => 1,              # start scanning at a random device (else the first)
#     min_free_space => 100,    # with at least 100MB free
#     weight_by_free => 1,      # choose among suitable devices weighted by free space
#     max_disk_age => 5,        # minutes of age the last usage report can be before we ignore the disk
#     not_on_hosts => [ 1, 2 ], # no devices on hosts 1 and 2
# );
#
# Only devices whose status is 'alive' are considered.  min_free_space and
# max_disk_age fall back to the global configuration when not given; if
# neither supplies a value, that check is skipped.
#
# Returns a single suitable device id.  When none is found it returns an
# empty list in list context (undef in scalar context), so both
# "my $devid = find_deviceid(...)" and
# "my @ids = find_deviceid(...); last unless @ids;" behave as expected.
sub find_deviceid {
    my %opts = ( @_ );

    # copy down global limits if not specified
    $opts{min_free_space} ||= $min_free_space;
    $opts{max_disk_age} ||= $max_disk_age;

    # usage reports older than this unix time are stale.  only compute a
    # cutoff when an age limit is actually configured: time() - 0 would be
    # "now" and would reject every device that has ever reported usage.
    my $oldest_report = $opts{max_disk_age} ? time() - ($opts{max_disk_age} * 60) : 0;

    # setup for iterating over devices
    my $devs = Mgd::get_device_summary();
    my @devids = keys %{$devs || {}};
    my $devcount = scalar(@devids);
    my $start = $opts{random} ? int(rand($devcount)) : 0;
    my %not_on_host = ( map { $_ => 1 } @{$opts{not_on_hosts} || []} );
    my $total_free = 0;

    # collect every device that passes the suitability checks
    my @list;
    for my $i (0 .. $devcount - 1) {
        my $dev = $devs->{ $devids[($i + $start) % $devcount] };

        next unless $dev->{status} eq 'alive';
        next if $not_on_host{$dev->{hostid}};
        next if $oldest_report && $dev->{mb_asof} &&
                $dev->{mb_asof} < $oldest_report;
        next if $opts{min_free_space} && $dev->{mb_total} &&
                $dev->{mb_free} < $opts{min_free_space};

        push @list, $dev->{devid};
        $total_free += $dev->{mb_free};
    }

    # weighted pick: walk the list accumulating free space until we pass a
    # random point in [0, total_free)
    if ($opts{weight_by_free}) {
        my $rand = int(rand($total_free));
        my $cur = 0;

        foreach my $devid (@list) {
            $cur += $devs->{$devid}->{mb_free};
            return $devid if $cur >= $rand;
        }
    }

    # nothing suitable: a bare return is undef in scalar context and an
    # empty list in list context
    return unless @list;

    # return first listed suitable device
    return $list[0];
}

# Returns a hashref of devid -> device row (devid, hostid, mb_total, mb_used,
# mb_asof, status, plus a computed mb_free).  Devices with status 'alive' or
# 'down' are included; 'dead' ones are included too when the first argument
# is true.  A device's status is replaced by its host's status when the host
# is worse off (an alive device on a non-alive host, a down device on a dead
# host).
#
# The result is cached for 15 seconds.  The cache remembers whether it was
# built with dead devices, so a call asking for the other set goes back to
# the database instead of getting the wrong devices.  Callers share the
# cached hashref.
sub get_device_summary {
    my $want_dead = shift() ? 1 : 0;
    my $include_dead = $want_dead ? ", 'dead'" : '';

    # which variant (with or without dead devices) the cache currently holds
    our $cache_device_summary_dead;

    my $now = time;
    return $cache_device_summary
        if $cache_device_summary_time > $now - 15
        && ($cache_device_summary_dead || 0) == $want_dead;

    my $dbh = get_dbh();

    # learn devices
    my %dev;  # devid -> device row
    my $sth = $dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
                            "mb_used, mb_asof, status FROM device ".
                            "WHERE status IN ('alive', 'down' $include_dead)");
    $sth->execute;
    # lexical row variable: don't clobber the caller's $_
    while (my $row = $sth->fetchrow_hashref) {
        $dev{$row->{devid}} = $row;
    }

    # now override device status with host status if the host status is less than the device status
    Mgd::check_host_cache();
    foreach my $d (values %dev) {
        # makes others have an easier time of finding devices by free space
        $d->{mb_free} = $d->{mb_total} - $d->{mb_used};

        my $host_status = $cache_host{$d->{hostid}}->{status};
        if ($d->{status} eq 'alive' && $host_status ne 'alive') {
            $d->{status} = $host_status;
        } elsif ($d->{status} eq 'down' && $host_status eq 'dead') {
            $d->{status} = $host_status;
        }
    }

    $cache_device_summary_time = $now;
    $cache_device_summary_dead = $want_dead;
    return $cache_device_summary = \%dev;
}

# Reloads %cache_host (hostid -> host row) from the host table unless it was
# loaded within the last 5 seconds.  A host that has both an altip and an
# altmask also gets a 'mask' entry, a Net::Netmask object built from altmask
# (used by hostid_ip() to decide when to hand out the alternate address).
sub check_host_cache {
    my $now = time;
    return if $cache_host_time > $now - 5;   # still fresh

    %cache_host = ();
    my $dbh = get_dbh();
    my $sth = $dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
                            "hostip, http_port, http_get_port, remoteroot, altip, altmask FROM host");
    $sth->execute;
    while (my $row = $sth->fetchrow_hashref) {
        $row->{mask} = Net::Netmask->new2($row->{altmask})
            if $row->{altip} && $row->{altmask};
        $cache_host{ $row->{hostid} } = $row;
    }
    $cache_host_time = $now;
}

# Fetches the file row for key $key in domain $dmid via $dbh.  Returns a
# hashref (fid, dmid, dkey, length, classid, devcount), or whatever
# selectrow_hashref yields (undef) when there's no such key.
sub key_filerow {
    my ($dbh, $dmid, $key) = @_;
    my $sql = "SELECT fid, dmid, dkey, length, classid, devcount ".
              "FROM file WHERE dmid=? AND dkey=?";
    # scalar() keeps the original single-value return in list context
    return scalar $dbh->selectrow_hashref($sql, undef, $dmid, $key);
}

# see if we should reduce the number of active children for $job: true when
# $jobs{$job}[0] is less than $jobs{$job}[1] (presumably the wanted count vs.
# the number currently running -- NOTE(review): confirm against %jobs).
sub job_needs_reduction {
    my ($job) = @_;
    my $should_reduce = $jobs{$job}[0] < $jobs{$job}[1];
    return $should_reduce;
}

# given a file descriptor number and a timeout in seconds, wait up to that
# long for the descriptor to become readable.  returns 1 if it did, and 0 on
# timeout, on select() error, or when the fileno is undef or the timeout is
# false/zero.
sub wait_for_readability {
    my ($fileno, $timeout) = @_;
    # fd 0 is a real descriptor (e.g. a socket opened after STDIN was closed),
    # so test definedness rather than truth
    return 0 unless defined $fileno && $timeout;

    my $rin = '';
    vec($rin, $fileno, 1) = 1;
    my $nfound = select($rin, undef, undef, $timeout);

    # select returns the number of ready descriptors, 0 on timeout and -1 on
    # error (e.g. EINTR); only a positive count means readable
    return (defined $nfound && $nfound > 0) ? 1 : 0;
}

# Returns the size in bytes of the file at $path.
#  - a plain (relative) path is stat()ed under $Mgd::MOG_ROOT;
#  - an "http://host[:port]/uri" path is sized by sending "size <uri>" to
#    mogstored's stream port, reusing a cached per-host connection in
#    %streamcache when possible, and otherwise falling back to an HTTP HEAD
#    request against the URL's port (80 if none is given).
# On failure returns 0 or the value of error() (the file's logging helper).
sub get_file_size {
    my $path = shift;

    # quick case -- just a file on disk
    unless ($path =~ m!^http://([^:/]+)(?::(\d+))?(/.+)$!) {
        return -s "$Mgd::MOG_ROOT/$path";
    }
    my ($host, $port, $uri) = ($1, $2, $3);

    # don't sigpipe us
    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;

    my $req = "size $uri\r\n";
    my $reqlen = length $req;
    my $flags = $FLAG_NOSIGNAL || 0;

    # Read the one-line "uri size" reply from $sock.  After a timeout, EOF, or
    # a malformed or mismatched reply the stream's state is unknown, so the
    # connection is closed and dropped from the cache.
    my $parse_response = sub {
        my $sock = shift;
        my $discard = sub {
            delete $streamcache{$host};
            close($sock);
        };

        # give the socket $node_timeout seconds to become readable
        unless (Mgd::wait_for_readability(fileno($sock), $node_timeout)) {
            $discard->();
            return 0;
        }

        my $line = <$sock>;
        unless (defined $line && $line =~ /^(\S+)\s+(-?\d+)/) { # expected format: "uri size"
            $discard->();
            return 0;
        }
        my ($got_uri, $size) = ($1, $2);
        if ($got_uri ne $uri) {
            $discard->();
            return error("get_file_size() requested size of $path, got back size of $got_uri ($size bytes)");
        }
        return $size + 0;
    };

    # 1) reuse the cached stream connection to this host.  send() returns
    #    undef on failure; $! alone is unreliable because it can be left over
    #    from an earlier, successful call (e.g. EINPROGRESS from connect).
    if (my $sock = $streamcache{$host}) {
        my $rv = send($sock, $req, $flags);
        if (!defined $rv) {
            # dead connection: forget it and reconnect below
            delete $streamcache{$host};
        } elsif ($rv != $reqlen) {
            delete $streamcache{$host};
            return error("send() didn't return expected length ($rv, not $reqlen) for $path");
        } else {
            return $parse_response->($sock);
        }
    }

    # 2) open (and cache) a fresh stream connection
    if (my $sock = IO::Socket::INET->new(PeerAddr => $host, PeerPort => $MOGSTORED_STREAM_PORT, Timeout => 5)) {
        $streamcache{$host} = $sock;
        my $rv = send($sock, $req, $flags);
        if (!defined $rv) {
            delete $streamcache{$host};
            return error("error talking to mogstored stream ($path): $!");
        } elsif ($rv != $reqlen) {
            delete $streamcache{$host};
            return error("send() didn't return expected length ($rv, not $reqlen) for $path");
        }
        return $parse_response->($sock);
    }

    # 3) failure case: use a HEAD request to get the size of the file
    my $http = IO::Socket::INET->new(PeerAddr => $host, PeerPort => $port || 80, Timeout => 3)
        or return error("get_file_size() unable to contact mogstored for size of $path");
    $http->write("HEAD $uri HTTP/1.0\r\n\r\n");
    while (defined (my $line = <$http>)) {
        return $1+0 if $line =~ /^Content-length: (\d+)/i;
        last if $line =~ /^\r?\n$/;   # a blank line ends the headers
    }

    # no content length found?
    return error("get_file_size() found no content-length header in response for $path");
}

# Returns the classid of class name $class in domain $dmid, or undef when the
# arguments are unusable (dmid not positive, empty class name) or no such
# class exists.
sub class_id {
    my ($dmid, $class) = @_;
    return undef unless $dmid > 0 && length $class;

    my $classid = Mgd::get_dbh()->selectrow_array(
        "SELECT classid FROM class WHERE dmid=? AND classname=?",
        undef, $dmid, $class);
    return $classid ? $classid : undef;
}

# Maps a domain namespace (the first argument) to its dmid, or undef if no
# such domain is known.  The whole namespace -> dmid table lives in
# %domaincache and is reloaded once it is more than 5 seconds old.
sub domain_id {
    my $now = time();
    if ($now > $domaincachetime + 5) {
        %domaincache = ();
        my $rows = Mgd::get_dbh()->selectall_arrayref('SELECT dmid, namespace FROM domain');
        %domaincache = map { ($_->[1] => $_->[0]) } @{ $rows || [] };
        $domaincachetime = $now;
    }

    return $domaincache{$_[0]};
}

# Returns the name of class $classid in domain $dmid, or undef when the
# arguments are unusable or no such class exists.
sub class_name {
    my ($dmid, $classid) = @_;
    return undef unless $dmid > 0 && length $classid;
    # FIXME: cache this

    my $classname = Mgd::get_dbh()->selectrow_array(
        "SELECT classname FROM class WHERE dmid=? AND classid=?",
        undef, $dmid, $classid);
    return $classname || undef;
}

# Returns the namespace of domain $dmid (undef if there is no such domain).
sub domain_name {
    my ($dmid) = @_;
    # FIXME: cache this

    my $namespace = Mgd::get_dbh()->selectrow_array(
        "SELECT namespace FROM domain WHERE dmid=?", undef, $dmid);
    return $namespace;
}

# Returns the hostname recorded for $hostid, or undef if the host is unknown.
sub hostid_name {
    my ($hostid) = @_;
    check_host_cache();
    my $host = $cache_host{$hostid};
    return undef unless $host;
    return $host->{hostname};
}

# Returns the IP to hand out for $hostid (undef if the host is unknown).
# The host's altip is used when it has both an altip and a parsed alt mask
# and either $force_alt_zone is set or $client_ip falls inside that mask;
# otherwise its regular hostip is returned.
sub hostid_ip {
    my ($hostid) = @_;
    check_host_cache();
    my $h = $cache_host{$hostid};
    return undef unless $h;

    my $use_alt = $h->{mask} && $h->{altip}
        && ($force_alt_zone || ($client_ip && $h->{mask}->match($client_ip)));
    return $use_alt ? $h->{altip} : $h->{hostip};
}

# Returns the http_port of $hostid, or undef if the host is unknown.
sub hostid_http_port {
    my ($hostid) = @_;
    check_host_cache();
    my $host = $cache_host{$hostid} or return undef;
    return $host->{http_port};
}

# Returns the http_get_port of $hostid (possibly unset), or undef if the host
# is unknown.
sub hostid_http_get_port {
    my ($hostid) = @_;
    check_host_cache();
    my $host = $cache_host{$hostid} or return undef;
    return $host->{http_get_port};
}

# Returns the URI path of fid $fid on device $devid:
#   "/dev<devid>/<b>/<mmm>/<ttt>/<fid zero-padded to 10 digits>.fid"
# where b, mmm and ttt are the first 1, 3 and 3 digits of the padded fid.
# Returns undef if $devid is not in the device summary.
sub make_http_path {
    my ($devid, $fid) = @_;

    my $dsum = get_device_summary();
    return undef unless $dsum->{$devid};

    my $nfid = sprintf '%010d', $fid;
    # $top rather than $b, which would shadow sort()'s special variable; the
    # last three digits only have to be present, they aren't a directory
    my ($top, $mmm, $ttt) = ($nfid =~ m{(\d)(\d{3})(\d{3})\d{3}});

    return "/dev$devid/$top/$mmm/$ttt/$nfid.fid";
}

# Builds "http://host:port/path" for fid $fid on device $devid.  With
# $use_get_port true the host's http_get_port is preferred, falling back to
# http_port.  Returns undef if the device, path, host IP or port can't be
# determined.
sub make_full_url {
    my ($devid, $fid, $use_get_port) = @_;

    my $dev = Mgd::get_device_summary()->{$devid};
    return undef unless $dev;
    my $hostid = $dev->{hostid};

    my $path = Mgd::make_http_path($devid, $fid);
    return undef unless $path;

    my $host = Mgd::hostid_ip($hostid);
    return undef unless $host;

    my $port = $use_get_port ? Mgd::hostid_http_get_port($hostid) : undef;
    $port ||= Mgd::hostid_http_port($hostid);
    return undef unless $port;

    return "http://$host:$port$path";
}

# Splits an "http://host[:port]/uri" string into [ host, port, uri ], with
# port defaulting to 80.  Anything else yields undef.
sub is_url {
    my ($path) = @_;
    return undef unless $path =~ m!^http://(.+?)(?::(\d+))?(/.+)$!;

    my ($host, $port, $uri) = ($1, $2, $3);
    return [ $host, $port || 80, $uri ];
}

# Returns the storage path of fid $fid on device $devid.  In HTTP mode this
# is the full URL from make_full_url().  Otherwise it is the path relative
# to $MOG_ROOT, "<hostname>/dev<devid>/<b>/<mmm>/<ttt>/<10-digit fid>.fid",
# and the directories leading up to it are created.  Returns undef if the
# device or its host is unknown, or the directories can't be made.
sub make_path {
    # jump out if we should be using HTTP stuff
    return Mgd::make_full_url(@_) if $USE_HTTP;

    my ($devid, $fid) = @_;

    my $dsum = get_device_summary();
    my $dinfo = $dsum->{$devid};
    return undef unless $dinfo;

    # without a hostname the path would land directly under $MOG_ROOT
    my $hostname = hostid_name($dinfo->{hostid});
    return undef unless defined $hostname;

    my $nfid = sprintf '%010d', $fid;
    # $top rather than $b, which would shadow sort()'s special variable
    my ($top, $mmm, $ttt) = ($nfid =~ m{(\d)(\d{3})(\d{3})\d{3}});

    my $path = "$hostname/dev$devid/$top/$mmm/$ttt/$nfid.fid";
    make_dirs( "$MOG_ROOT/$path" ) or return undef;

    return $path;
}

sub make_get_path {
    # Path to use for reading a fid.  Only HTTP mode differs from make_path:
    # there the URL may use the host's dedicated GET port.
    return $USE_HTTP ? Mgd::make_full_url(@_, 1) : Mgd::make_path(@_);
}

# Creates (mode 0775) every missing directory above $filename; the file
# itself is not created.  Returns 1 on success, 0 if mkpath died.
sub make_dirs
{
    my ($filename) = @_;
    my $parent = File::Basename::dirname($filename);
    my $ok = eval { File::Path::mkpath($parent, 0, 0775); 1 };
    return $ok ? 1 : 0;
}

# Records that fid $fid is stored on device $devid and, when a new row was
# actually inserted, refreshes the fid's devcount via update_fid_devcount()
# ($no_lock is passed through).  Returns 1 if the row already existed,
# update_fid_devcount()'s result after a fresh insert, and 0 on any database
# failure.
sub add_file_on {
    my ($fid, $devid, $no_lock) = @_;

    my $dbh = get_dbh() or return 0;

    my $rv = $dbh->do("INSERT IGNORE INTO file_on SET fid=?, devid=?",
                      undef, $fid, $devid);
    # do() returns undef on error -- that must not look like "already there"
    unless (defined $rv) {
        error("add_file_on: error inserting file_on (file $fid, device $devid): " . $dbh->errstr);
        return 0;
    }

    # "0E0" (zero rows): INSERT IGNORE found the row already on that device
    return 1 unless $rv > 0;

    return update_fid_devcount($fid, $no_lock);
}

# Recounts the file_on rows for $fid and stores the result in file.devcount.
# Unless $no_lock is true, this runs under the MySQL named lock
# "mgfs:fid:<fid>" (waiting up to 10 seconds for it), which is released
# afterward.  Returns 1 on success; returns 0, leaving devcount untouched,
# when there's no database handle, the lock can't be obtained, or either
# query fails.
sub update_fid_devcount {
    my ($fid, $no_lock) = @_;

    my $dbh = get_dbh() or return 0;

    my $lockname = "mgfs:fid:$fid";
    unless ($no_lock) {
        my $lock = $dbh->selectrow_array("SELECT GET_LOCK(?, 10)", undef,
                                         $lockname);
        return 0 unless $lock;
    }

    my $ok = 0;
    my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
                                   undef, $fid);
    # a failed COUNT(*) yields undef; writing that back would set devcount to
    # NULL, which no "devcount = ?" replication query would ever match again
    if (defined $ct) {
        my $rv = $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef,
                          $ct, $fid);
        $ok = defined $rv ? 1 : 0;
    }

    # release the lock we took, whether or not the update worked
    unless ($no_lock) {
        $dbh->selectrow_array("SELECT RELEASE_LOCK(?)", undef, $lockname);
    }

    return $ok;
}

# Returns its arguments in random order without modifying the caller's list.
# In scalar context, returns the number of elements (as before).
sub randlist
{
    # List::Util ships with Perl; its shuffle does this in a single pass,
    # replacing the former unshift/splice loop, which was O(n^2)
    require List::Util;
    my @shuffled = List::Util::shuffle(@_);
    return @shuffled;
}


#####################################################################
### C L I E N T   C L A S S
### A client is a user connection for sending requests to us.  Requests
### can either be normal user requests to be sent to a QueryWorker
### or management requests that start with a !.
#####################################################################
package Client;

use Danga::Socket ();
use base qw{Danga::Socket};

use fields qw{read_buf};

# Wraps an accepted user connection; all arguments go straight to
# Danga::Socket's constructor.
sub new {
    my Client $self = shift;
    $self = fields::new($self) if !ref $self;
    $self->SUPER::new(@_);
    return $self;
}

# Client
# Event-loop callback for readable data: append up to 1024 bytes to the line
# buffer, then pass each complete, non-empty line (line ending stripped) to
# Frontend->HandleClientRequest.  An undef read closes the client.
sub event_read {
    my Client $self = shift;

    my $chunk = $self->read(1024);
    if (!defined $chunk) {
        return $self->close();
    }
    $self->{read_buf} .= $$chunk;

    while ($self->{read_buf} =~ s/^(.*?)\r?\n//) {
        my $request = $1;
        next if $request eq '';
        Frontend->HandleClientRequest($self, $request);
    }
}

# Client
# Socket errors and hangups are both handled by closing the connection.
sub event_err { return $_[0]->close; }
sub event_hup { return $_[0]->close; }

# Closes the connection, first telling the Frontend so it can drop this
# client from its error listeners.
sub close {
    my Client $self = shift;
    Frontend->NoteDeadClient($self);
    return $self->SUPER::close(@_);
}


#####################################################################
### W O R K E R C O N N   C L A S S
### This class maintains a connection to one of our various classes
### of workers.
#####################################################################
package WorkerConn;

use Danga::Socket ();
use base qw{Danga::Socket};

use fields qw{read_buf job pid cmd_buf reqid};

# Wraps the parent's end of a connection to a worker child; arguments go to
# Danga::Socket's constructor.  pid 0 / job undef mean "not identified as a
# worker yet" -- see the pid() and job() accessors.
sub new {
    my WorkerConn $self = shift;
    $self = fields::new($self) unless ref $self;
    $self->SUPER::new( @_ );

    # mark as not a worker by default
    $self->{pid} = 0;
    $self->{reqid} = 0;
    $self->{job} = undef;
    $self->{cmd_buf} = [];

    return $self;
}

# Event-loop callback for data from the worker.  Each complete, non-empty
# line is routed: output from a queryworker goes to HandleQueryWorkerResponse,
# while lines from other jobs, and queryworker lines starting with "error", go
# to HandleChildRequest.  An undef read closes the connection.
sub event_read {
    my WorkerConn $self = shift;

    my $bref = $self->read(1024);
    return $self->close() unless defined $bref;
    $self->{read_buf} .= $$bref;

    # (.*?) rather than (.+?): a bare "\n" at the front of the buffer must be
    # consumed too, otherwise this anchored match fails forever and every
    # later line from the worker stays stuck behind it
    while ($self->{read_buf} =~ s/^(.*?)\r?\n//) {
        my $line = $1;
        next unless length $line;   # ignore blank lines, as Client does
        my $job = $self->job;
        if (defined $job && $job eq 'queryworker' && substr($line, 0, 5) ne 'error') {
            Frontend->HandleQueryWorkerResponse($self, $line);
        } else {
            Frontend->HandleChildRequest($self, $line);
        }
    }
}

# get/set the job name this worker is running
sub job {
    my WorkerConn $self = shift;
    return $self->{job} unless @_;
    return $self->{job} = shift;
}

# get/set the worker's process id
sub pid {
    my WorkerConn $self = shift;
    return $self->{pid} unless @_;
    return $self->{pid} = shift;
}

# Socket errors and hangups both close the connection.  Like Client, we
# override event_err so Danga::Socket's base-class handler is never reached.
sub event_err { my $self = shift; $self->close; }
sub event_hup { my $self = shift; $self->close; }

# Closes the connection, first telling the Frontend this worker is gone.
sub close {
    # mark us as being dead
    my WorkerConn $self = shift;
    Frontend->NoteDeadWorkerConn($self);
    $self->SUPER::close(@_);
}

# Queue one command line (CRLF appended) for the next drain_queue().
# Queryworker connections don't use this queue, so it's a no-op for them.
sub enqueue_line {
    my WorkerConn $self = $_[0];
    return if $self->job eq 'queryworker'; # they don't use this queueing
    my $msg = "$_[1]\r\n";
    push @{$self->{cmd_buf}}, $msg;
}

# Write every queued command, then the "." line that ends a batch, and empty
# the queue.
sub drain_queue {
    my WorkerConn $worker = $_[0];
    foreach my $cmd (@{$worker->{cmd_buf}}) {
        $worker->write($cmd);
    }
    $worker->write(".\r\n");
    $worker->{cmd_buf} = [];
}


#####################################################################
### F R O N T E N D  C L A S S
### This class handles keeping lists of workers and clients and
### assigning them to each other when things happen.  This is a purely
### event driven class.
#####################################################################
package Frontend;

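# Mappings: worker fd => [ clientref, jobstring, start_sec, start_usec ]
#   (ProcessQueues appends gettimeofday's (seconds, microseconds) pair)
# QueryWorkerQueue holds idle WorkerConn objects; ClientQueue holds
#   [ clientref, jobstring ] records waiting for a worker
# ChildrenByJob: job => { pid => WorkerConn }
# ErrorsTo: client fd => Client (clients that asked to !watch errors)
# RecentQueries: [ string, string, string, ... ]
# Stats: element => number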
our ($IsChild, @QueryWorkerQueue, @ClientQueue, @RecentQueries,
     %Mappings, %ChildrenByJob, %ErrorsTo, %Stats);
$IsChild = 0;

# when a child is spawned, they'll have copies of all the data from the
# parent, but they don't need it.  this method is called when you want
# to indicate that this Frontend is running on a child and should clean.
sub SetAsChild {
    # A forked child inherits the parent's bookkeeping but never uses it;
    # drop it and remember that we are now a child.
    @QueryWorkerQueue = ();
    @ClientQueue      = ();
    %Mappings         = ();
    %ErrorsTo         = ();
    $IsChild          = 1;

    # A post-loop callback returning false stops the inherited event loop.
    Client->SetPostLoopCallback(sub { 0 });
}

# called when a child has died.  a child is someone doing a job for us,
# but it might be a queryworker or any other type of job.  we just want
# to remove them from our list of children.  they're actually respawned
# by the make_new_child function elsewhere in Mgd.
sub NoteDeadChild {
    my (undef, $pid) = @_;

    # Forget this pid under whichever job it was registered; a pid lives
    # under at most one job, so stop at the first successful removal.
    foreach my $jobname (keys %ChildrenByJob) {
        my $removed = delete $ChildrenByJob{$jobname}{$pid};
        return if $removed;
    }
}

# called when a client dies.  clients are users, management or non.
# we just want to remove them from the error reporting interface, if
# they happen to be part of it.
sub NoteDeadClient {
    my (undef, $gone) = @_;

    # Stop forwarding error messages to a client that has disconnected.
    delete $ErrorsTo{ $gone->{fd} };
}

# called when the error function in Mgd is called and we're in the parent,
# so it's pretty simple that basically we just spit it out to folks listening
# to errors
sub NoteError {
    # $_[1] is a reference to the error text.
    return unless %ErrorsTo;    # nobody is watching

    # Every watcher gets the same "::"-prefixed, CRLF-terminated line.
    my $line = ":: ${$_[1]}\r\n";
    foreach my $watcher (values %ErrorsTo) {
        $watcher->write(\$line);
    }
}

# take a new connection that we know is from one of our children, but
# we're not sure what type of child, so just set it in read mode until
# they tell us what they are
sub RegisterWorkerConn {
    my (undef, $conn) = @_;

    # We don't know what kind of child this is yet; start reading so its
    # identification line can arrive.
    $conn->watch_read(1);
}

# take a new worker and note that it's a worker and ready to be used
# for commands.  this is called when workers connect to the frontend.
sub RegisterQueryWorker {
    my WorkerConn $fresh = $_[1];

    # A newly identified query worker is idle, so it simply joins the idle
    # queue (which may immediately dispatch pending client queries).
    return Frontend->EnqueueQueryWorker($fresh);
}

# puts a worker back in the queue, deleting any outstanding jobs in
# the mapping list for this fd.
sub EnqueueQueryWorker {
    my WorkerConn $worker = $_[1];

    # Whatever request this worker was mapped to is finished or abandoned.
    delete $Mappings{ $worker->{fd} };

    # Normal case: back into the idle pool, then try to match queries.
    unless (Mgd::job_needs_reduction('queryworker')) {
        push @QueryWorkerQueue, $worker;
        return Frontend->ProcessQueues;
    }

    # Above the desired headcount: retire this worker instead of reusing it.
    Mgd::error("Reducing queryworker headcount by 1.");
    Frontend->AskWorkerToDie($worker);
    return;
}

# if we need to kill off a worker, this function takes in the WorkerConn
# object, tells it to die, marks us as having requested its death, and decrements
# the count of running jobs.
sub AskWorkerToDie {
    my (undef, $victim) = @_;

    # Ask the child to exit on its own...
    $victim->write("shutdown\r\n");

    # ...and tell Mgd this job/pid's death was requested, so it is
    # accounted for in the headcount bookkeeping.
    Mgd::note_pending_death($victim->job, $victim->pid);
}

# kill bored query workers so we can get down to the level requested.  this
# continues killing until we run out of folks to kill.
sub CullQueryWorkers {
    # Only idle workers are culled here; busy ones are checked again when
    # they come back through EnqueueQueryWorker.
    while (@QueryWorkerQueue) {
        last unless Mgd::job_needs_reduction('queryworker');
        my WorkerConn $idle = shift @QueryWorkerQueue;
        Frontend->AskWorkerToDie($idle);
    }
}

# called when we get a response from a worker.  this reenqueues the
# worker so it can handle another response as well as passes the answer
# back on to the client.
sub HandleQueryWorkerResponse {
    # arguments: (class, WorkerConn $worker, string $line)
    return Mgd::error("Frontend (Child) got worker response: $_[2]") if $IsChild;

    my WorkerConn $worker = $_[1];
    my $line = $_[2];
    return unless $worker && $Mappings{$worker->{fd}};

    # Mapping record built by ProcessQueues:
    #   [ client, query string, start seconds, start microseconds ]
    my $mapping = $Mappings{$worker->{fd}};
    my Client $client = $mapping->[0];

    # If we have no client, this is a standard message from the
    # queryworker; pass it up the line.
    return Frontend->HandleChildRequest($worker, $line) if !$client;

    # The client went away while the query ran; just recycle the worker.
    return Frontend->EnqueueQueryWorker($worker) if $client->{closed};

    # Accepted reply formats:
    #   <pid>-<reqid> <worker-side seconds> <response>
    #   <pid>-<reqid> <response>
    my ($time, $id, $res);
    if ($line =~ /^(\d+-\d+)\s+(\d+\.\d+)\s+(.+)$/) {
        ($id, $time, $res) = ($1, $2, $3);
    } elsif ($line =~ /^(\d+-\d+)\s(.+)$/) {
        ($id, $time, $res) = ($1, 'undef', $2);
    }

    # A reply tagged for some other request means we have lost sync with
    # this worker: drop the client and retire the worker.
    unless ($id eq "$worker->{pid}-$worker->{reqid}") {
        Mgd::error("Worker responded with id $id, expected $worker->{pid}-$worker->{reqid}, killing");
        $client->close('worker_mismatch');
        return Frontend->AskWorkerToDie($worker);
    }

    # tv_interval expects [seconds, microseconds].  ProcessQueues stored
    # both (gettimeofday in list context); passing only [2] dropped the
    # microseconds and skewed every recorded time by up to one second.
    my $tinterval = Time::HiRes::tv_interval([ @{$mapping}[2, 3] ]);
    push @RecentQueries, sprintf("%s %.4f %s", $mapping->[1], $tinterval, $time);
    shift @RecentQueries if scalar(@RecentQueries) > 50;

    # Send the answer to the client and put the worker back in the pool.
    $client->write("$res\r\n");
    Frontend->EnqueueQueryWorker($worker);
}

# called from various spots to empty the queues of available pairs.
sub ProcessQueues {
    return if $IsChild;

    # Pair idle workers with pending client queries until one side is empty.
    while (@QueryWorkerQueue && @ClientQueue) {
        # Take the first queued query whose client is still connected.
        my $clref;
        while (@ClientQueue && !$clref) {
            my $candidate = shift @ClientQueue;
            $clref = $candidate
                if defined $candidate && !$candidate->[0]->{closed};
        }
        next unless $clref;

        # A dead worker is dropped; the client keeps its place at the front.
        my WorkerConn $worker = shift @QueryWorkerQueue;
        if (!defined $worker || $worker->{closed}) {
            unshift @ClientQueue, $clref;
            next;
        }

        # Note dispatch time: gettimeofday in list context appends
        # (seconds, microseconds) to the record.
        push @$clref, Time::HiRes::gettimeofday();
        $Mappings{$worker->{fd}} = $clref;

        # Each dispatch gets a fresh "<pid>-<reqid>" tag that the worker
        # echoes back with its answer.
        my $tag = join '-', $worker->{pid}, ++$worker->{reqid};
        $worker->write("$tag $clref->[1]\r\n");
        $worker->watch_read(1);
    }
}

# send short descriptions of commands we support to the user
sub SendHelp {
    my Client $client = $_[1];

    # $_[2] (a help topic) is accepted but not used yet; everyone gets the
    # same overview, which carries its own terminating "." line.
    my $help = <<'HELP';
Welcome to mogilefsd's built-in help system.  Available commands:

    !recent     Recently executed queries and how long they took.
    !queue      Queries that are pending execution.
    !stats      General stats on what we're up to.
    !watch      Observe errors/messages from children.
    !jobs       Outstanding job counts, desired level, and pids.
    !shutdown   IMMEDIATELY kill all of mogilefsd.  IMMEDIATELY.

    !replication
                See the replication status.  Output format:
                <domain> <class> <devcount> <files>

    !to <job class> <message>
                Send <message> to all workers of <job class>.
                Mostly used for debugging.

    !want <count> <job class>
                Alter the level of workers of this class desired.
                Example: !want 20 queryworker, !want 3 replicate.
                See !jobs for what jobs are available.

More to come...
.
HELP

    $client->write($help);
}

# called when a client sends us text.  we just create a job for
# it and then call ProcessQueues.
sub HandleClientRequest {
    # arguments: (class, Client $client, string $line)
    return Mgd::error("Frontend (Child) got request from client: $_[2]") if $IsChild;

    # '?'-prefixed, 'help', or an empty line: send the help text and stop.
    if ((substr($_[2], 0, 1) eq '?') || ($_[2] eq 'help') || ($_[2] eq '')) {
        Frontend->SendHelp($_[1]);
        return;
    }

    # '!'-prefixed lines are management commands answered by the parent
    # itself.  Each reply is zero or more lines followed by a lone ".".
    if (substr($_[2], 0, 1) eq '!') {
        my Client $client = $_[1];
        my ($cmd, $args) = ($_[2] =~ m/^!(.+?)(?:\s+(.+))?$/);

        my @out;
        if ($cmd =~ /^stats$/) {
            # print out some stats on the queues
            my $uptime = time - $Mgd::starttime;
            my $ccount = scalar(@ClientQueue);
            my $wcount = scalar(@QueryWorkerQueue);
            my $ipcount = scalar(keys %Mappings);
            push @out, "uptime $uptime",
                       "pending_queries $ccount",
                       "processing_queries $ipcount",
                       "bored_queryworkers $wcount",
                       map { "$_ $Stats{$_}" } sort keys %Stats;

        } elsif ($cmd =~ /^repl/) {
            # for each class whose minimum devcount is above 1, count the
            # files at each devcount below that minimum
            Mgd::validate_dbh();
            my $dbh = Mgd::get_dbh();
            my $mdcs = Mgd::get_mindevcounts();
            foreach my $dmid (sort keys %$mdcs) {
                my $dmname = Mgd::domain_name($dmid);
                foreach my $classid (sort keys %{$mdcs->{$dmid}}) {
                    my $min = $mdcs->{$dmid}->{$classid};
                    next unless $min > 1;

                    my $classname = Mgd::class_name($dmid, $classid) || '_default';
                    foreach my $ct (1..$min-1) {
                        my $count = $dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
                                                          undef, $dmid, $classid, $ct);
                        push @out, "$dmname $classname $ct $count";
                    }
                }
            }

        } elsif ($cmd =~ /^shutdown/) {
            print "User requested shutdown: $args\n";
            kill 15, $$; # kill us, that kills our kids

        } elsif ($cmd =~ /^jobs/) {
            # dump out a list of running jobs and pids
            foreach my $job (sort keys %ChildrenByJob) {
                my $ct = scalar(keys %{$ChildrenByJob{$job}});
                push @out, "$job count $ct";
                push @out, "$job desired $jobs{$job}->[0]";
                push @out, "$job pids " . join(' ', sort { $a <=> $b } keys %{$ChildrenByJob{$job}});
            }

        } elsif ($cmd =~ /^want/) {
            # !want <count> <jobclass>
            # set the new desired staffing level for a class
            if ($args =~ /^(\d+)\s+(\S+)/) {
                my ($count, $job) = ($1, $2);

                # validate count
                $count = 0 if $count < 0;
                # FIXME ...add an upper limit?

                # now make sure it's a real job
                if (defined $jobs{$job}) {
                    $jobs{$job}->[0] = $count;
                    $Mgd::allkidsup = 0;
                    push @out, "Now desiring $count children doing '$job'.";

                    # try to clean out the queryworkers (if that's what we're doing?)
                    Frontend->CullQueryWorkers
                        if $job eq 'queryworker';
                } else {
                    my $classes = join(", ", sort keys %jobs);
                    push @out, "ERROR: Invalid class '$job'.  Valid classes: $classes";
                }
            } else {
                push @out, "ERROR: usage: !want <count> <jobclass>";
            }

        } elsif ($cmd =~ /^to/) {
            # !to <jobclass> <message>
            # sends <message> to all children of <jobclass>
            if ($args =~ /^(\S+)\s+(.+)/) {
                my $ct = Frontend->SendToChildrenByJob($1, $2);
                push @out, "Message sent to $ct children.";

            } else {
                push @out, "ERROR: usage: !to <jobclass> <message>";
            }

        } elsif ($cmd =~ /^queue/ || $cmd =~ /^pend/) {
            foreach my $clq (@ClientQueue) {
                push @out, $clq->[1];
            }

        } elsif ($cmd =~ /^watch/) {
            # toggle membership in the error-watcher list
            if (delete $ErrorsTo{$client->{fd}}) {
                push @out, "Removed you from watcher list.";
            } else {
                $ErrorsTo{$client->{fd}} = $client;
                push @out, "Added you to watcher list.";
            }

        } elsif ($cmd =~ /^recent/) {
            # show the most recent N queries
            push @out, @RecentQueries;

        } else {
            # Unknown command: SendHelp's text already ends with its own "."
            # terminator, so return here.  Falling through used to send a
            # second "." that the client would read as the (empty) reply to
            # its next command, desynchronizing the conversation.
            Frontend->SendHelp($client, $args);
            return;
        }
        $client->write(join("\r\n", @out) . "\r\n") if @out;
        $client->write(".\r\n");
        return;
    }

    # Anything else is a query for a query worker: queue it, tagged with
    # the client's IP, and try to dispatch it.
    $Stats{queries}++;
    push @ClientQueue, [ $_[1], "cmd " . ($_[1]->peer_ip_string || '0.0.0.0') . " $_[2]" ];
    Frontend->ProcessQueues;
}

# a child has contacted us with some command/status/something.
sub HandleChildRequest {
    # arguments: (class, WorkerConn $child, string $line)
    return Mgd::error("Frontend (Child) got request from child: $_[2]") if $IsChild;

    my WorkerConn $child = $_[1];
    my $line = $_[2];

    # A child without a job is new: its first line is "<pid> <job>", not a
    # command.
    unless (defined $child->job) {
        my ($pid, $job) = ($line =~ /^(\d+)\s+(.+)/);

        # Without this check a garbled first line registered the child under
        # an empty job name and pid, while its job stayed undef (so every
        # later line was treated as another registration attempt).
        return Mgd::error("Unparseable registration line [$line] from child")
            unless defined $job;

        $child->job($job);
        $child->pid($pid);

        # now do any special case startup
        Frontend->RegisterQueryWorker($child) if $job eq 'queryworker';

        # add to normal list
        $ChildrenByJob{$job}->{$child->pid} = $child;
        return;
    }

    # After a periodic ping: retire the child if its job is overstaffed,
    # otherwise hand it whatever has been queued for it.
    my $check_job = sub {
        if (Mgd::job_needs_reduction($child->job)) {
            Mgd::error("Reducing headcount of " . $child->job . " job by 1.");
            Frontend->AskWorkerToDie($child);
        } else {
            $child->drain_queue;
        }
    };

    my $cmd = $line;
    if ($cmd =~ /^error (.+)$/i) {
        # pass it on to our error handler, prefaced with the child's job
        Mgd::error("[" . $child->job . "(" . $child->pid . ")] $1");

    } elsif ($cmd =~ /^queue/) {
        # send out what we have queued up for it
        $child->drain_queue;

    } elsif ($cmd =~ /^(?:del_i_looped|monitor_ping|reaper_ping|repl_ping)/) {
        $check_job->();

    } elsif ($cmd =~ /^repl_unreachable (\d+)/) {
        # Announce to the other replicators that this fid can't be reached.
        # The queue is deliberately not drained to the requestor here, as
        # the replicator isn't in a place where it can accept one right now.
        Frontend->SendToChildrenByJob('replicate', "repl_unreachable $1", $child);

    } elsif ($cmd =~ /^repl_i_did (\d+)/) {
        my $fid = $1;

        # announce to the other replicators that this fid was done, then
        # drain the queue to this child
        Frontend->SendToChildrenByJob('replicate', "repl_was_done $fid", $child);
        $check_job->();

    } else {
        # unknown command
        Mgd::error("Unknown command [$line] from child; job=" . $child->job);
    }
}

# given a job class, and a message, send it to all children of that job.  returns
# the number of children the message was sent to.
# arguments: ( jobclass, message, [ child ] )
# if child is specified, the message will be sent to members of the job class that
# aren't that child.  so you can exclude the one that originated the message.
sub SendToChildrenByJob {
    # arguments: (class, jobclass, message, [ child to exclude ])
    my (undef, $jobclass, $msg, $exclude) = @_;

    my $childref = $ChildrenByJob{$jobclass};
    return 0 unless defined $childref && %$childref;

    # Count only children that actually received the line.  The old code
    # returned the size of the whole job class, which also counted the
    # excluded originator and query workers (whose enqueue_line is a no-op
    # that returns false).
    my $sent = 0;
    foreach my $child (values %$childref) {
        # skip the originating child, if one was given
        next if defined $exclude && $exclude == $child;

        $sent++ if $child->enqueue_line($msg);
    }
    return $sent;
}

# called when we notice that a worker has bit it.  we might have to restart a
# job that they had been working on.
sub NoteDeadWorkerConn {
    return if $IsChild;

    my WorkerConn $worker = $_[1];
    return unless $worker;

    # No in-flight query on this worker means there is nothing to rescue.
    my $orphan = $Mappings{ $worker->{fd} };
    return unless $orphan;

    # The query was already dispatched once, so it goes back to the FRONT
    # of the client queue; then try to hand it to another worker.
    delete $Mappings{ $worker->{fd} };
    unshift @ClientQueue, $orphan;
    Frontend->ProcessQueues;
}


#####################################################################
### W O R K E R   C L A S S
### Class that handles all of the actions that a worker can take.
#####################################################################
package QueryWorker;

use fields qw{sock querystarttime reqid};

sub new {
    my QueryWorker $self = shift;
    my $sock = shift;

    # Build the restricted-hash object unless one was passed in.
    $self = fields::new($self) unless ref $self;

    # Socket to the parent; no query is running and no request id is
    # known yet.
    @{$self}{qw(sock querystarttime reqid)} = ($sock, undef, undef);

    return $self;
}

sub process_line {
    my QueryWorker $self = shift;
    my $lineref = shift;

    # Accepted forms (the leading "<pid>-<reqid>" tag is optional):
    #   [<pid>-<reqid>] cmd <client_ip> <command> <url-encoded args>
    #   [<pid>-<reqid>] <control word> [<word> [<rest>]]
    # The second token is optional so that a bare control word is
    # recognized.  The old pattern required two tokens, so the lone
    # "shutdown" line sent by Frontend::AskWorkerToDie fell through to
    # unknown_command and the worker never exited.
    return $self->err_line('unknown_command')
        unless $$lineref =~ /^(\d+-\d+)?\s*(\S+)(?:\s+(\S+))?\s*(.*)/;
    $self->{reqid} = $1 || undef;
    my ($cmd, $line) = ($2, $4);

    # set global variables for zone determination
    $client_ip = $3;
    $force_alt_zone = 0;

    # some basic commands we support
    if ($cmd eq 'echo') {
        Mgd::send_to_parent($line);
        return;
    } elsif ($cmd eq 'shutdown') {
        exit 0;
    }

    # fallback to normal command handling: dispatch to cmd_<name>
    if ($line =~ /^(\w+)\s*(.*)/) {
        my ($cmd, $args) = ($1, $2);
        $cmd = lc($cmd);

        no strict 'refs';
        $self->{querystarttime} = Time::HiRes::gettimeofday();
        # only names matching cmd_\w+ in this package can be reached
        my $cmd_handler = *{"cmd_$cmd"}{CODE};
        if ($cmd_handler) {
            my $args = decode_url_args(\$args);
            $force_alt_zone = 1 if $args->{zone} eq 'alt';
            $cmd_handler->($self, $args);
            return;
        }
    }

    return $self->err_line('unknown_command');
}

# returns 0 on error, or dmid of domain
sub check_domain {
    my QueryWorker $self = shift;
    my $args = shift;

    my $domain = $args->{domain};
    return $self->err_line("no_domain") unless length($domain);

    # Known domain: hand back its numeric id; otherwise report it.
    my $dmid = Mgd::domain_id($domain);
    return $dmid if $dmid;
    return $self->err_line("unreg_domain");
}

sub cmd_sleep {
    my QueryWorker $self = shift;
    my $args = shift;

    # Block this worker for 'duration' seconds (10 when absent or zero).
    my $seconds = $args->{duration} || 10;
    sleep $seconds;

    return $self->ok_line;
}

sub cmd_create_open {
    my QueryWorker $self = shift;
    my $args = shift;

    # validate parameters
    my $dmid = $self->check_domain($args) or return 0;
    my $key = $args->{key} || "";
    my $multi = $args->{multi_dest} ? 1 : 0;

    # get DB handle
    my $dbh = Mgd::get_dbh or
        return $self->err_line("nodb");

    # figure out what classid this file is for (0 when no class is given)
    my $class = $args->{class};
    my $classid = 0;
    if (length($class)) {
        # TODO: cache this
        $classid = $dbh->selectrow_array("SELECT classid FROM class ".
                                         "WHERE dmid=? AND classname=?",
                                         undef, $dmid, $class)
            or return $self->err_line("unreg_class");
    }

    # Pick destination devices: one normally, up to three for multi_dest,
    # never two on the same host.  (An older note said each needs 100Mb
    # free; any such policy lives in Mgd::find_deviceid.)
    my (@dests, @hosts);
    my $devs = Mgd::get_device_summary();
    my $wanted = $multi ? 3 : 1;
    while (scalar(@dests) < $wanted) {
        my $devid = Mgd::find_deviceid(
            random => 1,
            weight_by_free => 1,
            not_on_hosts => \@hosts,
        );

        last unless defined $devid;

        push @dests, $devid;
        push @hosts, $devs->{$devid}->{hostid};
    }
    return $self->err_line("no_devices") unless @dests;

    # Set up the new mapping.  We store the devices picked for this file,
    # knowing they might not all be used; create_close is responsible for
    # actually mapping in file_on.
    $dbh->do("INSERT INTO tempfile SET ".
             " fid=NULL, dmid=?, dkey=?, classid=?, createtime=UNIX_TIMESTAMP(), devids=?",
             undef, $dmid, $key, $classid, join(',', @dests));

    # Every failure must still produce a reply.  The old bare "return undef"
    # sent nothing back (assuming process_line's caller adds no reply of
    # its own).  The parent only returns a query worker to the idle pool
    # when one of its replies arrives (Frontend::HandleQueryWorkerResponse),
    # so the client hung and this worker was lost.
    return $self->err_line("db_error") if $dbh->err;
    my $fid = $dbh->{mysql_insertid};  # FIXME: mysql-ism
    return $self->err_line("db_error") unless $fid && $fid > 0;

    # original single path support
    return $self->ok_line({
        fid => $fid,
        devid => $dests[0],
        path => Mgd::make_path($dests[0], $fid),
    }) unless $multi;

    # multiple path support: devid_N / path_N for N = 1..dev_count
    my $ct = 0;
    my $res = {};
    foreach my $devid (@dests) {
        $ct++;
        $res->{"devid_$ct"} = $devid;
        $res->{"path_$ct"} = Mgd::make_path($devid, $fid);
    }
    $res->{fid} = $fid;
    $res->{dev_count} = $ct;
    return $self->ok_line($res);
}

sub cmd_create_close {
    my QueryWorker $self = shift;
    my $args = shift;

    # validate parameters
    my $dmid = $self->check_domain($args) or return 0;
    my $key = $args->{key};

    my $fid = $args->{fid} or return $self->err_line("no_fid");
    my $devid = $args->{devid} or return $self->err_line("no_devid");
    my $path = $args->{path} or return $self->err_line("no_path");

    # is the provided path what we'd expect for this fid/devid?
    return $self->err_line("bogus_args")
        unless $path eq Mgd::make_path($devid, $fid);

    # get DB handle
    my $dbh = Mgd::get_dbh or
        return $self->err_line("nodb");

    # find the temp file we're closing and making real
    my $trow = $dbh->selectrow_hashref("SELECT classid, dmid, dkey ".
                                       "FROM tempfile WHERE fid=?",
                                       undef, $fid);
    return $self->err_line("no_temp_file") unless $trow;

    # Closing a temp file without a key means "discard this upload".
    unless (length($key)) {
        $dbh->do("REPLACE INTO file_to_delete SET fid=?", undef, $fid);
        $dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fid);
        return $self->ok_line;
    }

    # Verify the uploaded file BEFORE touching any existing file stored
    # under this key.  The old code removed the previous file first, so a
    # size mismatch or empty upload left the key with no file at all.
    my $size = Mgd::get_file_size($path);
    return $self->err_line("size_mismatch", "Expected: $args->{size}; actual: $size; path: $path")
        if $args->{size} && ($args->{size} != $size);

    # TODO: check for EIO?
    return $self->err_line("empty_file") unless $size;

    # The key is being overwritten: unlink the old file row and queue its
    # fid for deletion.
    my $old_file = Mgd::key_filerow($dbh, $dmid, $key);
    if ($old_file) {
        $dbh->do("REPLACE INTO file_to_delete SET fid=?", undef, $old_file->{fid});
        $dbh->do("DELETE FROM file WHERE fid=?", undef, $old_file->{fid});
    }

    # insert file_on row
    $dbh->do("INSERT IGNORE INTO file_on SET fid = ?, devid = ?", undef, $fid, $devid);
    return $self->err_line("db_error") if $dbh->err;

    my $rv = $dbh->do("REPLACE INTO file ".
                      "SET ".
                      "  fid=?, dmid=?, dkey=?, length=?, ".
                      "  classid=?, devcount=0", undef,
                      $fid, $dmid, $key, $size, $trow->{classid});
    return $self->err_line("db_error") unless $rv;

    $dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fid);

    if (Mgd::update_fid_devcount($fid)) {
        return $self->ok_line();
    } else {
        # FIXME: handle this better
        return $self->err_line("db_error");
    }
}

sub cmd_delete {
    my QueryWorker $self = shift;
    my $args = shift;

    my $dmid = $self->check_domain($args) or return 0;

    my $key = $args->{key};
    return $self->err_line("no_key") unless length($key);

    my $dbh = Mgd::get_dbh or
        return $self->err_line("nodb");

    # Resolve the key within this domain; an unknown key is an error.
    my $fid = $dbh->selectrow_array("SELECT fid FROM file WHERE dmid=? AND dkey=?",
                                    undef, $dmid, $key)
        or return $self->err_line("unknown_key");

    # Unlink the key, then put the fid on the to-delete list.
    $dbh->do("DELETE FROM file WHERE fid=?", undef, $fid);
    $dbh->do("REPLACE INTO file_to_delete SET fid=?", undef, $fid);

    return $self->ok_line();
}

sub cmd_list_keys {
    my QueryWorker $self = shift;
    my $args = shift;

    # validate parameters
    my $dmid = $self->check_domain($args) or return 0;
    my ($prefix, $after, $limit) = ($args->{prefix}, $args->{after}, $args->{limit});
    return $self->err_line("no_key") unless $prefix;

    # 'after' must start with the prefix.  Compare literally: the prefix is
    # client-supplied, and interpolating it into a regex (the old
    # /^$prefix/) let metacharacters change the match or die outright on
    # input such as an unbalanced "(".
    return $self->err_line('after_mismatch')
        if $after && index($after, $prefix) != 0;

    # verify there are no % or \ characters
    return $self->err_line('invalid_chars')
        if $prefix =~ /[%\\]/;

    # escape underscores, then add the trailing % so LIKE does a prefix match
    $prefix =~ s/_/\\_/g;
    $prefix .= '%';
    $after ||= '';

    # LIMIT is interpolated into the SQL text, so force it to a positive
    # integer; missing, zero, negative, or non-numeric values mean 1000.
    $limit = int($limit || 0);
    $limit = 1000 unless $limit > 0;

    # get DB handle
    my $dbh = Mgd::get_dbh or
        return $self->err_line("nodb");

    # now select out our keys
    my $keys = $dbh->selectcol_arrayref
        ('SELECT dkey FROM file WHERE dmid = ? AND dkey LIKE ? AND dkey > ? ' .
         "ORDER BY dkey LIMIT $limit", undef, $dmid, $prefix, $after);

    # if we got nothing, say so
    return $self->err_line('none_match') unless $keys && @$keys;

    # Keys arrive in the database's ORDER BY dkey order, and the next page
    # is fetched with "dkey > ?", so the cursor must be the LAST key in
    # that order.  Picking the max with Perl's byte-wise gt could disagree
    # with the column collation (e.g. a case-insensitive one), making pages
    # repeat or skip keys.
    my $ret = { key_count => scalar(@$keys), next_after => $keys->[-1] };
    my $n = 0;
    foreach my $key (@$keys) {
        $n++;
        $ret->{"key_$n"} = $key;
    }
    return $self->ok_line($ret);
}

sub cmd_rename {
    my QueryWorker $self = shift;
    my $args = shift;

    my $dmid = $self->check_domain($args) or return 0;

    my $from = $args->{from_key};
    my $to   = $args->{to_key};
    return $self->err_line("no_key") unless $from && $to;

    my $dbh = Mgd::get_dbh or
        return $self->err_line("nodb");

    # Any DB error is reported as key_exists (presumably a unique-key
    # collision on the destination name -- NOTE(review): confirm).
    my $rows = $dbh->do('UPDATE file SET dkey = ? WHERE dmid = ? AND dkey = ?',
                        undef, $to, $dmid, $from);
    return $self->err_line("key_exists") if $dbh->err;

    # No row updated: the source key does not exist in this domain.
    return $self->err_line("unknown_key") if $rows <= 0;

    return $self->ok_line();
}

sub cmd_get_hosts {
    my QueryWorker $self = shift;
    my $args = shift;

    my $dbh = Mgd::get_dbh()
        or return $self->err_line("nodb");

    Mgd::check_host_cache();

    # Walk the package-global %Mgd::cache_host with keys(), not each().
    # A hash has a single shared each() iterator; if any earlier code
    # stopped an each() loop part-way, each() would resume there and this
    # reply would silently omit hosts or fields.  keys() always starts from
    # the beginning.
    my $ret = { hosts => 0 };
    foreach my $hostid (keys %Mgd::cache_host) {
        next if defined $args->{hostid} && $hostid != $args->{hostid};

        # flatten each host row into host<N>_<field> entries
        my $n = ++$ret->{hosts};
        my $row = $Mgd::cache_host{$hostid};
        foreach my $field (keys %$row) {
            $ret->{"host${n}_$field"} = $row->{$field};
        }
    }

    return $self->ok_line($ret);
}

sub cmd_get_devices {
    my QueryWorker $self = shift;
    my $args = shift;

    my $dbh = Mgd::get_dbh()
        or return $self->err_line("nodb");

    my $devs = Mgd::get_device_summary();

    # Iterate with keys() rather than each(), matching cmd_get_hosts.  If
    # the summary is a cached, shared hash (NOTE(review): confirm), each()
    # would resume from any iterator position earlier code left behind.
    my $ret = { devices => 0 };
    foreach my $devid (keys %$devs) {
        next if defined $args->{devid} && $devid != $args->{devid};

        # flatten each device row into dev<N>_<field> entries
        my $n = ++$ret->{devices};
        my $row = $devs->{$devid};
        foreach my $field (keys %$row) {
            $ret->{"dev${n}_$field"} = $row->{$field};
        }
    }

    return $self->ok_line($ret);
}

sub cmd_create_domain {
    my QueryWorker $self = shift;
    my $args = shift;

    my $dbh = Mgd::get_dbh()
        or return $self->err_line("nodb");

    my $domain = $args->{domain};
    return $self->err_line('no_domain') unless length $domain;

    # FIXME: add some sort of authentication/limitation on this?
    return $self->err_line('domain_exists') if Mgd::domain_id($domain);

    # New id is the current maximum plus one.  NOTE(review): two concurrent
    # creates can pick the same id; the second INSERT presumably fails on a
    # unique dmid and reports 'failure'.
    my $next_dmid = 1 + $dbh->selectrow_array('SELECT MAX(dmid) FROM domain');
    $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
             undef, $next_dmid, $domain);
    return $self->err_line('failure') if $dbh->err;

    return $self->ok_line({ domain => $domain });
}

sub cmd_create_class {
    my QueryWorker $self = shift;
    my $args = shift;

    my $dbh = Mgd::get_dbh()
        or return $self->err_line("nodb");

    my $domain = $args->{domain};
    return $self->err_line('no_domain') unless length $domain;

    my $class = $args->{class};
    return $self->err_line('no_class') unless length $class;

    my $mindevcount = $args->{mindevcount}+0;
    return $self->err_line('invalid_mindevcount') unless $mindevcount > 0;

    # FIXME: add some sort of authentication/limitation on this?

    my $dmid = Mgd::domain_id($domain);
    return $self->err_line('no_domain') unless $dmid;

    my $cid = Mgd::class_id($dmid, $class);
    return $self->err_line('class_exists') if $cid && !$args->{update};

    # An update needs an existing class.  Without this check the REPLACE
    # below ran with an undefined classid, so the caller got a generic
    # 'failure' (or, if the column accepted it, a bogus row) instead of a
    # clear answer.
    return $self->err_line('class_not_found') if $args->{update} && !$cid;

    if ($args->{update}) {
        # now replace the old class
        $dbh->do("REPLACE INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
                 undef, $dmid, $cid, $class, $mindevcount);
    } else {
        # new classid = current max within this domain + 1
        my $maxid = $dbh->selectrow_array
            ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid);

        # now insert the new class
        $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
                 undef, $dmid, $maxid + 1, $class, $mindevcount);
    }
    return $self->err_line('failure') if $dbh->err;

    # return success
    return $self->ok_line({ class => $class, mindevcount => $mindevcount, domain => $domain });
}

sub cmd_update_class {
    my QueryWorker $self = shift;
    my $args = shift;

    # Same validation and reply as create_class, with update mode forced on.
    my %update_args = (%$args, update => 1);
    return $self->cmd_create_class(\%update_args);
}

sub cmd_get_domains {
    my QueryWorker $self = shift;
    my $args = shift;

    my $dbh = Mgd::get_dbh()
        or return $self->err_line("nodb");

    my $domains = $dbh->selectall_arrayref('SELECT dmid, namespace FROM domain');

    # Reply layout:
    #   domains, domain<D>, domain<D>classes,
    #   domain<D>class<C>name, domain<D>class<C>mindevcount
    my $ret = {};
    my $dnum = 0;
    foreach my $domain_row (@$domains) {
        my ($dmid, $namespace) = @$domain_row;
        $dnum++;
        $ret->{"domain$dnum"} = $namespace;

        my $classes = $dbh->selectall_arrayref
            ('SELECT classname, mindevcount FROM class WHERE dmid = ?', undef, $dmid);

        my $cnum = 0;
        foreach my $class_row (@$classes) {
            $cnum++;
            $ret->{"domain${dnum}class${cnum}name"}        = $class_row->[0];
            $ret->{"domain${dnum}class${cnum}mindevcount"} = $class_row->[1];
        }

        # every domain also lists the implicit default class, last
        $cnum++;
        $ret->{"domain${dnum}class${cnum}name"}        = 'default';
        $ret->{"domain${dnum}class${cnum}mindevcount"} = $default_mindevcount;

        $ret->{"domain${dnum}classes"} = $cnum;
    }
    $ret->{"domains"} = $dnum;

    return $self->ok_line($ret);
}

sub cmd_get_paths {
    my QueryWorker $self = shift;
    my $args = shift;

    my $key = $args->{key};
    return $self->err_line("no_key") unless length($key);

    my $dmid = $self->check_domain($args) or return 0;

    my $dbh = Mgd::get_dbh or
        return $self->err_line("nodb");

    my $filerow = Mgd::key_filerow($dbh, $dmid, $key)
        or return $self->err_line("unknown_key");

    my $fid  = $filerow->{fid};
    my $dsum = Mgd::get_device_summary();
    my $ret  = { paths => 0 };

    # devices currently holding a copy of this fid
    my $devids = $dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
                                          undef, $fid) || [];
    my $devcount = scalar(@$devids);

    # Walk the devices round-robin from a random offset.  The first path
    # returned is size-checked against the file row (unless noverify);
    # a second one is added unchecked, and then we stop.
    my $offset = int(rand() * $devcount);
    foreach my $step (1 .. $devcount) {
        my $devid = $devids->[ ($step + $offset) % $devcount ];
        my $dev = $dsum->{$devid};
        next unless $dev && $dev->{status} eq "alive";

        my $path = Mgd::make_get_path($devid, $fid);
        my $usable = $ret->{paths}
            || $args->{noverify}
            || Mgd::get_file_size($path) == $filerow->{length};
        next unless $usable;

        my $n = ++$ret->{paths};
        $ret->{"path$n"} = $path;
        last if $n == 2;
    }

    return $self->ok_line($ret);
}

# Handle the "set_state" command.
#
# Changes a device's status to one of alive/down/dead. Both 'host' and
# 'device' must be given, and 'device' must belong to 'host' according to
# the database. A device may not go directly from 'dead' to 'alive'.
#
# Replies 'nodb', 'bad_params', 'host_mismatch', 'state_too_high' or
# 'failure' on error, and an empty OK line on success.
sub cmd_set_state {
    my QueryWorker $self = shift;
    my $args = shift;

    my $dbh = Mgd::get_dbh
        or return $self->err_line('nodb');
    my $ret = {};

    my $host  = $args->{host};
    my $dev   = $args->{device} + 0;
    my $state = $args->{state};

    # reject missing host/device, or a state outside the allowed set
    if (!$host || !$dev || $state !~ /^(?:alive|down|dead)$/) {
        return $self->err_line('bad_params');
    }

    # look up which host the device really lives on, and its current status
    my ($realhost, $curstate) = $dbh->selectrow_array(
        'SELECT hostname, device.status FROM host, device ' .
        'WHERE host.hostid = device.hostid AND device.devid = ?',
        undef, $dev);

    return $self->err_line('host_mismatch') if $realhost ne $host;

    # dead devices must be brought back through 'down' first
    if ($curstate eq 'dead' && $state eq 'alive') {
        return $self->err_line('state_too_high');
    }

    $dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $dev);
    return $dbh->err ? $self->err_line('failure') : $self->ok_line($ret);
}

# Handle the "stats" command.
#
# Each section is included only if its flag, or 'all', is set in $args:
#   replication - file counts grouped by (domain, class, devcount):
#                 replication<N>{domain,class,devcount,files}, replicationcount
#   files       - file counts grouped by (domain, class):
#                 files<N>{domain,class,files}, filescount
#   devices     - file counts per device, with host and status:
#                 devices<N>{id,host,status,files}, devicescount
# Replies 'nodb' if no database handle is available.
sub cmd_stats {
    my QueryWorker $self = shift;
    my $args = shift;

    # get database handle
    my $ret = {};
    my $dbh = Mgd::get_dbh
        or return $self->err_line('nodb');

    # dmid => { name => namespace, classes => { classid => classname } }.
    # Domains are loaded from the domain table on their own (not only via a
    # join against class) so that a domain with no custom classes still has
    # a name and its implicit 'default' class.
    my %classes;
    my $domainrows = $dbh->selectall_arrayref('SELECT dmid, namespace FROM domain') || [];
    foreach my $row (@$domainrows) {
        $classes{$row->[0]} = { name => $row->[1], classes => {} };
    }
    my $classrows = $dbh->selectall_arrayref('SELECT dmid, classid, classname FROM class') || [];
    foreach my $row (@$classrows) {
        # skip classes whose domain row is missing (the join excluded them)
        next unless $classes{$row->[0]};
        $classes{$row->[0]}->{classes}->{$row->[1]} = $row->[2];
    }
    # classid 0 is always reported as the 'default' class
    $classes{$_}->{classes}->{0} = 'default'
        foreach keys %classes;

    # get host and device information with device status
    my %devices;
    my $devrows = $dbh->selectall_arrayref('SELECT device.devid, hostname, device.status ' .
                                           'FROM device, host WHERE device.hostid = host.hostid') || [];
    foreach my $row (@$devrows) {
        $devices{$row->[0]}->{host} = $row->[1];
        $devices{$row->[0]}->{status} = $row->[2];
    }

    # replication counts, if asked for explicitly or via 'all'
    if ($args->{replication} || $args->{all}) {
        my $stats = $dbh->selectall_arrayref('SELECT dmid, classid, devcount, COUNT(devcount) FROM file GROUP BY 1, 2, 3');
        my $count = 0;
        foreach my $stat (@$stats) {
            $count++;
            $ret->{"replication${count}domain"} = $classes{$stat->[0]}->{name};
            $ret->{"replication${count}class"} = $classes{$stat->[0]}->{classes}->{$stat->[1]};
            $ret->{"replication${count}devcount"} = $stat->[2];
            $ret->{"replication${count}files"} = $stat->[3];
        }
        $ret->{"replicationcount"} = $count;
    }

    # file statistics (how many files there are and in what domains/classes)
    if ($args->{files} || $args->{all}) {
        my $stats = $dbh->selectall_arrayref('SELECT dmid, classid, COUNT(classid) FROM file GROUP BY 1, 2');
        my $count = 0;
        foreach my $stat (@$stats) {
            $count++;
            $ret->{"files${count}domain"} = $classes{$stat->[0]}->{name};
            $ret->{"files${count}class"} = $classes{$stat->[0]}->{classes}->{$stat->[1]};
            $ret->{"files${count}files"} = $stat->[2];
        }
        $ret->{"filescount"} = $count;
    }

    # device statistics (how many files are on each device)
    if ($args->{devices} || $args->{all}) {
        my $stats = $dbh->selectall_arrayref('SELECT devid, COUNT(devid) FROM file_on GROUP BY 1');
        my $count = 0;
        foreach my $stat (@$stats) {
            $count++;
            $ret->{"devices${count}id"} = $stat->[0];
            $ret->{"devices${count}host"} = $devices{$stat->[0]}->{host};
            $ret->{"devices${count}status"} = $devices{$stat->[0]}->{status};
            $ret->{"devices${count}files"} = $stat->[1];
        }
        $ret->{"devicescount"} = $count;
    }

    # FIXME: DO! add other stats

    return $self->ok_line($ret);
}

# Write a success reply to the client socket and return 1.
#
# Wire format: "[<reqid> ][<elapsed> ]OK <k1>=<v1>&<k2>=<v2>...\r\n", where
# keys and values are escaped with eurl(). The request-id prefix is written
# only when $self->{reqid} is defined. The elapsed-time prefix (4 decimal
# places) is written only when $self->{querystarttime} is set, and that
# start time is cleared once it has been reported.
sub ok_line {
    my QueryWorker $self = shift;
    my $args = shift;

    my $timing = '';
    if (my $started = $self->{querystarttime}) {
        $timing = sprintf("%.4f ", Time::HiRes::tv_interval([ $started ]));
        $self->{querystarttime} = undef;
    }

    my $prefix = defined $self->{reqid} ? "$self->{reqid} " : '';

    my @pairs;
    foreach my $k (keys %$args) {
        push @pairs, eurl($k) . "=" . eurl($args->{$k});
    }

    $self->{sock}->write($prefix . $timing . "OK " . join('&', @pairs) . "\r\n");
    return 1;
}

# Write an error reply to the client socket and return 0.
#
# first argument: error code.
# second argument: optional error text.  If no (true) text is given, the
# text is looked up from the code in the table below; a code with no table
# entry is sent with an empty text rather than undef.
#
# Wire format: "[<reqid> ][<elapsed> ]ERR <code> <eurl(text)>\r\n". The
# request-id and elapsed-time prefixes are written under the same rules as
# in ok_line, and the recorded query start time is cleared once reported.
sub err_line {
    my QueryWorker $self = shift;
    my $err_code = shift;
    my $err_text = shift || {
        'unknown_command' => "Unknown server command",
        'no_domain' => "No domain provided",
        'no_class' => "No class provided",
        'no_key' => "No key provided",
        'unknown_key' => "Unknown key",
        'nodb' => "Database connection unavailable",
        'unreg_domain' => "Domain name invalid/not found",
        'class_exists' => "That class already exists in that domain",
        'domain_exists' => "That domain already exists",
        'invalid_mindevcount' => "The mindevcount must be at least 1",
        'bad_params' => "Invalid parameters to command; please see documentation",
        'host_mismatch' => "The device specified doesn't belong to the host specified",
        'state_too_high' => "Status cannot go from dead to alive; must use down",
        'failure' => "Operation failed",
        'key_exists' => "Target key name already exists; can't overwrite.",
        'none_match' => "No keys match that pattern and after-value (if any).",
        'after_mismatch' => "Pattern does not match the after-value?",
        'invalid_chars' => "Patterns must not contain backslashes (\\) or percent signs (%).",
    }->{$err_code};
    $err_text = '' unless defined $err_text;

    my $delay = '';
    if ($self->{querystarttime}) {
        $delay = sprintf("%.4f ", Time::HiRes::tv_interval([ $self->{querystarttime} ]));
        $self->{querystarttime} = undef;
    }

    my $id = defined $self->{reqid} ? "$self->{reqid} " : '';

    $self->{sock}->write("${id}${delay}ERR $err_code " . eurl($err_text) . "\r\n");
    return 0;
}

# URL-encode a string for the tracker's line protocol.
#
# Letters, digits and the characters  _ , - . / \ :  pass through unchanged.
# A space becomes '+', and every other character becomes %XX (uppercase
# hex of its ordinal). The caller's argument is not modified.
sub eurl {
    (my $encoded = $_[0]) =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/uc sprintf("%%%02x",ord($1))/eg;
    # spaces were left alone by the escape pass above; turn them into '+'
    $encoded =~ tr/ /+/;
    return $encoded;
}

# Decode an application/x-www-form-urlencoded style string into a hashref.
#
# Accepts either the string itself or a reference to it. Pairs are split
# on '&' and each pair is split on its FIRST '=' only, so a value may
# contain a literal '='. In both names and values, '+' becomes a space and
# %XX escapes are decoded. A name with no '=' maps to ''. Empty pairs
# (e.g. from "a=1&&b=2") are ignored.
#
# When a name occurs more than once, its values are joined with "\0" in
# the order they appear. This also holds when an earlier value is "0" or "".
sub decode_url_args
{
    my $input = shift;
    my $buffer = ref $input ? $input : \$input;
    my $ret = {};

    foreach my $pair (split(/&/, $$buffer)) {
        next unless length $pair;

        my ($name, $value) = split(/=/, $pair, 2);
        $value = '' unless defined $value;

        # $part aliases $name / $value, so both are decoded in place
        foreach my $part ($name, $value) {
            $part =~ tr/+/ /;
            $part =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
        }

        # test for presence, not truth: a first value of "0" or "" must
        # still be followed by the separator
        $ret->{$name} = exists $ret->{$name} ? "$ret->{$name}\0$value" : $value;
    }
    return $ret;
}


# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End:
