3038 lines
		
	
	
		
			99 KiB
		
	
	
	
		
			Perl
		
	
	
		
			Executable File
		
	
	
			
		
		
	
	
			3038 lines
		
	
	
		
			99 KiB
		
	
	
	
		
			Perl
		
	
	
		
			Executable File
		
	
	
#!/usr/bin/perl
 | 
						|
#
 | 
						|
# MogileFS daemon - HEAVILY UNDER CONSTRUCTION
 | 
						|
#
 | 
						|
# Copyright 2004, Danga Interactive
 | 
						|
#
 | 
						|
# Authors:
 | 
						|
#   Brad Fitzpatrick <brad@danga.com>
 | 
						|
#   Brad Whitaker    <whitaker@danga.com>
 | 
						|
#   Mark Smith       <junior@danga.com>
 | 
						|
#
 | 
						|
# License:
 | 
						|
#   undecided.
 | 
						|
#
 | 
						|
 | 
						|
package Mgd;
 | 
						|
 | 
						|
# Refuse to start when the real user id is root (uid 0).
if ($< == 0) {
    die "mogilefsd cannot be run as root\n";
}
 | 
						|
 | 
						|
use strict;
 | 
						|
use Getopt::Long;
 | 
						|
use IO::Socket;
 | 
						|
use Symbol;
 | 
						|
use POSIX ":sys_wait_h"; # argument for waitpid
 | 
						|
use POSIX;
 | 
						|
use DBI;
 | 
						|
use DBD::mysql;
 | 
						|
use File::Copy ();
 | 
						|
use Carp;
 | 
						|
use File::Basename ();
 | 
						|
use File::Path ();
 | 
						|
use Sys::Syslog;
 | 
						|
use Socket qw(MSG_NOSIGNAL);
 | 
						|
use Time::HiRes qw(gettimeofday tv_interval);
 | 
						|
use Net::Netmask;
 | 
						|
use LWP::UserAgent;
 | 
						|
 | 
						|
 | 
						|
#####################################################################
 | 
						|
### C O N F I G
 | 
						|
#####################################################################
 | 
						|
 | 
						|
# Package globals (visible to other code as $Mgd::NAME).
use vars qw(
    $dbh
    $DEFAULT_CONFIG
    $DEFAULT_MOG_ROOT
    $MOG_ROOT
    $MOGSTORED_STREAM_PORT
    $DEBUG
    $USE_HTTP
    $FLAG_NOSIGNAL
);

# Built-in defaults, used when neither the command line nor the config
# file says otherwise.
$DEFAULT_CONFIG        = "/etc/mogilefs/mogilefsd.conf";
$DEFAULT_MOG_ROOT      = "/mnt/mogilefs";
$MOGSTORED_STREAM_PORT = 7501;
$DEBUG                 = 0;

# Flag for send() calls asking not to be SIGPIPEd.  Wrapped in eval so a
# failure to resolve MSG_NOSIGNAL at run time leaves the flag undefined
# instead of aborting startup.
eval { $FLAG_NOSIGNAL = MSG_NOSIGNAL };
 | 
						|
 | 
						|
# Raw option sources: values given on the command line and values read
# from the config file, both keyed by setting name.
my %cmdline;
my %cfgfile;

# Startup controls that are only ever set from the command line.
my ( $config, $skipconfig, $upgrade );

# Effective settings, filled in later from %cmdline / %cfgfile / defaults.
my ( $daemonize, $db_dsn, $db_user, $db_pass );
my ( $conf_port, $worker_port );
my ( $query_jobs, $delete_jobs, $replicate_jobs, $reaper_jobs, $monitor_jobs );
my ( $mog_root, $default_mindevcount, $min_free_space, $max_disk_age );
my $node_timeout;    # time in seconds to wait for storage node responses

# Command-line options will override
Getopt::Long::Configure("bundling");
Getopt::Long::GetOptions(
    # switches stored in their own variables
    'c|config=s'            => \$config,
    's|skipconfig'          => \$skipconfig,
    'upgrade'               => \$upgrade,

    # settings that can also be supplied by the config file
    'd|debug+'              => \$cmdline{debug},
    'D|daemon'              => \$cmdline{daemonize},
    'dsn=s'                 => \$cmdline{db_dsn},
    'dbuser=s'              => \$cmdline{db_user},
    'dbpass=s'              => \$cmdline{db_pass},
    'r|mogroot=s'           => \$cmdline{mog_root},
    'p|confport=i'          => \$cmdline{conf_port},
    'w|workers=i'           => \$cmdline{query_jobs},
    'no_http'               => \$cmdline{no_http},
    'workerport=i'          => \$cmdline{worker_port},
    'maxdiskage=i'          => \$cmdline{max_disk_age},
    'minfreespace=i'        => \$cmdline{min_free_space},
    'default_mindevcount=i' => \$cmdline{default_mindevcount},
    'node_timeout=i'        => \$cmdline{node_timeout},
);
 | 
						|
 | 
						|
# Fall back to the default config file when none was named on the command
# line and the default one is readable.
$config = $DEFAULT_CONFIG if !$config && -r $DEFAULT_CONFIG;

# Read the config file if one was specified
if ( $config && !$skipconfig ) {
    # Three-argument open: the file name is taken literally, so characters
    # such as '>', '|' or surrounding whitespace in the path can never be
    # interpreted as an open mode or a command to run.
    open my $cf, '<', $config or die "open: $config: $!";

    my $configLine = qr{
        ^\s*                    # Leading space
        (\w+)                   # Key
        \s+ =? \s*              # space + optional equal + optional space
        (.+?)                   # Value
        \s*$                    # Trailing space
    }x;

    my $linecount = 0;
    while (defined( my $line = <$cf> )) {
        $linecount++;

        # skip blank lines and lines holding only a comment
        next if $line =~ m{^\s*(#.*)?$};
        die "Malformed config file (line $linecount)" unless $line =~ $configLine;

        my ( $key, $value ) = ( $1, $2 );
        print STDERR "Setting '$key' to '$value'\n" if $cmdline{debug};
        $cfgfile{ $key } = $value;
    }

    close $cf;
}
 | 
						|
 | 
						|
### FUNCTION: choose_value( $name, $default[, $boolean] )
### Returns the first defined value for $name, looking at the command line
### options, then the config file, and finally falling back to $default.
### The optional $boolean argument is accepted but not consulted.
sub choose_value ($$;$) {
    my ( $name, $default, $boolean ) = @_;

    foreach my $source ( \%cmdline, \%cfgfile ) {
        my $candidate = $source->{$name};
        return $candidate if defined $candidate;
    }

    return $default;
}
 | 
						|
 | 
						|
 | 
						|
# Resolve every setting.  Command line takes precedence, then values in the
# config file, then the defaults listed here.  Each row is
# [ target variable, setting name, default, optional boolean marker ].
for my $setting (
    [ \$daemonize,           'daemonize',           0,                    1 ],
    [ \$db_dsn,              'db_dsn',              "DBI:mysql:mogilefs"    ],
    [ \$db_user,             'db_user',             "mogile"                ],
    [ \$db_pass,             'db_pass',             "",                   1 ],
    [ \$conf_port,           'conf_port',           7001                    ],
    [ \$MOG_ROOT,            'mog_root',            $DEFAULT_MOG_ROOT       ],
    [ \$delete_jobs,         'delete_jobs',         1                       ],
    [ \$replicate_jobs,      'replicate_jobs',      1                       ],
    [ \$reaper_jobs,         'reaper_jobs',         1                       ],
    [ \$monitor_jobs,        'monitor_jobs',        1                       ],
    [ \$worker_port,         'worker_port',         7200                    ],
    [ \$min_free_space,      'min_free_space',      100                     ],
    [ \$max_disk_age,        'max_disk_age',        5                       ],
    [ \$DEBUG,               'debug',               0,                    1 ],
    [ \$default_mindevcount, 'default_mindevcount', 2                       ],
    [ \$node_timeout,        'node_timeout',        3                       ],
) {
    my ( $target, $name, $default, $boolean ) = @$setting;
    $$target = choose_value( $name, $default, $boolean );
}

# 'listener_jobs' is tried first; when it yields nothing true we fall back
# to 'query_jobs' (the new name), which defaults to 20.
$query_jobs = choose_value( 'listener_jobs', undef );
$query_jobs = choose_value( 'query_jobs', 20 ) unless $query_jobs;

# HTTP is used unless the no_http switch is set.
$USE_HTTP = ! choose_value( 'no_http', 0, 1 );
 | 
						|
 | 
						|
### initial setup: we must be able to reach the database before going on
Mgd::validate_dbh();
my $dbh = Mgd::get_dbh() or die <<NODB;
Error: unable to establish connection with your MogileFS database.

Please verify that you have correctly setup a configuration file or are
providing the correct information in order to reach the database and try
running the MogileFS server again.
NODB

### make sure they have a database setup?
### Fetch one host row; its columns are inspected by the schema checks below.
my $host = $dbh->selectrow_hashref('SELECT * FROM host LIMIT 1');
if ($dbh->err) {
    my $failure = $dbh->errstr;

    # database name for the hint below, taken from the DSN when it has the
    # plain DBI:mysql:<name> form
    my ($db_name) = $db_dsn =~ /^DBI:mysql:(.+)$/;
    $db_name = '<db_name>' unless defined $db_name;

    die <<ERR;
Database error: $failure

This is commonly caused by not having the tables created in the database.
The easiest way to setup your database is to do something like this:

    cat devnotes/sql.txt | mysql -u$db_user -p $db_name

NOTE: This will OVERWRITE any MogileFS related tablase in the database.
Please make SURE that this is what you want to do before you do this!
ERR
}
 | 
						|
 | 
						|
### Schema checks.  Every check follows the same rule: when the schema is
### behind, either apply the change (if --upgrade was given) or stop and
### tell the operator to rerun with --upgrade.

# $apply_upgrade->( $what, $notice, [ $label, $sql ], ... )
#   $what   - name used in the "out of date" message when --upgrade is absent
#   $notice - progress line printed to STDERR before the statements run
#   each [ $label, $sql ] pair is executed in order; a failing statement
#   stops the program with "$label: <database error>".
my $apply_upgrade = sub {
    my ( $what, $notice, @steps ) = @_;

    die <<NEEDUPDATE unless $upgrade;
Error: $what out of date.

MogileFS needs to update your database schema.  Please rerun the MogileFS
server with the --upgrade option if you wish for us to do this.

This will NOT destroy any existing data.
NEEDUPDATE

    print STDERR $notice;
    foreach my $step (@steps) {
        my ( $label, $sql ) = @$step;
        $dbh->do($sql);
        die "$label: " . $dbh->errstr . "\n" if $dbh->err;
    }
};

### no host row at all means there is nothing we can serve
die <<NOHOSTS unless $host;
Error: no hosts found.

It seems like you don't have any hosts defined in your MogileFS setup.
This means that we really can't do anything, so we're going to shut down.
NOHOSTS

# see if they have the get port, else update it
$apply_upgrade->(
    'host table',
    "Updating host table...\n",
    [ 'Error', "ALTER TABLE host ADD COLUMN http_get_port MEDIUMINT UNSIGNED AFTER http_port" ],
) unless exists $host->{http_get_port};

# now update to add new columns?
$apply_upgrade->(
    'host table',
    "Updating host table...\n",
    [ 'Error (1)', "ALTER TABLE host ADD COLUMN altip VARCHAR(15) AFTER hostip" ],
    [ 'Error (2)', "ALTER TABLE host ADD COLUMN altmask VARCHAR(18) AFTER altip" ],
    [ 'Error (3)', "ALTER TABLE host ADD UNIQUE altip (altip)" ],
) unless exists $host->{altip};

### and now check for devices
my $device = $dbh->selectrow_hashref('SELECT * FROM device LIMIT 1');
die <<NODEVICES unless $device;
Error: no devices found.

It seems like you don't have any devices defined in your MogileFS setup.
This means that we really can't do anything, so we're going to shut down.
NODEVICES

$apply_upgrade->(
    'device table',
    "Updating device table...\n",
    [ 'Error', "ALTER TABLE device ADD COLUMN mb_asof INT(10) UNSIGNED AFTER mb_used" ],
) unless exists $device->{mb_asof};

# see if they have the new fid table; an error from this probe query is
# taken to mean the table is missing
my $fid = $dbh->selectrow_array('SELECT fid FROM unreachable_fids LIMIT 1');
if ($dbh->err) {
    $apply_upgrade->(
        'database schema',
        "Creating unreachable_fids table...\n",
        [ 'Error', "CREATE TABLE unreachable_fids (" .
                   "    fid        INT UNSIGNED NOT NULL," .
                   "    lastupdate INT UNSIGNED NOT NULL," .
                   "    PRIMARY KEY (fid)," .
                   "    INDEX (lastupdate)" .
                   ")" ],
    );
}

# now check for an updated tempfile table; an error from this probe query
# is taken to mean the devids column is missing
my $devids = $dbh->selectrow_array('SELECT devids FROM tempfile LIMIT 1');
if ($dbh->err) {
    $apply_upgrade->(
        'database schema',
        "Updating tempfile table...\n",
        [ 'Error', "ALTER TABLE tempfile ADD COLUMN devids VARCHAR(60)" ],
    );
}
 | 
						|
 | 
						|
# Drop the startup database handle before any fork happens; otherwise
# children could end up sharing the same socket and all writing to it at
# the same time.
$dbh = undef;

#####################################################################
### D A E M O N   F U N C T I O N S
#####################################################################

if ($daemonize) {
    daemonize();
}

# Bookkeeping for child processes and the jobs they satisfy.
my ( %child, %todie );  # %child: pid -> job
                        # %todie: pid -> 1 (pids that we've asked to die)
my $psock;              # IO::Socket::INET connection to parent (undef if parent)
my %streamcache;        # host -> IO::Socket::INET to mogstored
my $lastspawntime = 0;  # time we last ran spawn_children sub

our $allkidsup       = 0;      # if true, all our kids are running. set to 0 when a kid dies.
our $starttime       = time;   # time we got going
our %domaincache;              # { domainname => { domainrow } }
our $domaincachetime = 0;
our $client_ip       = undef;  # client ip address
our $force_alt_zone  = 0;      # if on, force to use alternate zone (if it's defined)

# jobname -> [ what we want to be at, what we are at ]
my %jobs = (
    'queryworker' => [ $query_jobs,     0 ],
    'delete'      => [ $delete_jobs,    0 ],
    'replicate'   => [ $replicate_jobs, 0 ],
    'reaper'      => [ $reaper_jobs,    0 ],
    'monitor'     => [ $monitor_jobs,   0 ],
);

# open up our log
openlog('mogilefsd', 'pid', 'daemon');
Mgd::log('info', 'beginning run');
 | 
						|
 | 
						|
# Check that the cached database handle is still usable, and forget it when
# it is not so that get_dbh() will connect again.  Does nothing when no
# handle is cached.
sub validate_dbh {
    return unless $dbh;

    # MySQL-ism: a working connection reports its connection id
    my ($conn_id) = $dbh->selectrow_array("SELECT CONNECTION_ID()");
    if (! $conn_id) { undef $dbh; }    # handle's dead.  don't use it.
}
 | 
						|
 | 
						|
# Return the cached database handle, connecting first when none is cached.
# The result is whatever DBI->connect returned if a connection had to be
# made.
sub get_dbh {
    $dbh = DBI->connect($db_dsn, $db_user, $db_pass) unless $dbh;
    return $dbh;
}
 | 
						|
 | 
						|
# Install signal handlers.  On TERM or INT, pass the same signal on to every
# tracked child and then exit.
foreach my $signame (qw(TERM INT)) {
    $SIG{$signame} = sub {
        print STDERR scalar keys %child, " children to kill.\n" if $DEBUG;
        my $count = kill( $signame => keys %child );
        print STDERR "Sent SIG$signame to $count children.\n" if $DEBUG;
        exit 0;
    };
}
 | 
						|
 | 
						|
#############################################################################
 | 
						|
## beginning of main execution path
 | 
						|
#############################################################################
 | 
						|
 | 
						|
# Non-blocking TCP listener for client connections on the configured port.
my $server = IO::Socket::INET->new(
    LocalPort => $conf_port,
    Type      => SOCK_STREAM,
    Proto     => 'tcp',
    Blocking  => 0,
    Reuse     => 1,
    Listen    => 10,
) or die "Error creating socket: $@\n";

# Accept handler for new clients: wrap the accepted socket in a Client
# object and start watching it for readability.
my $accept_handler = sub {
    my $csock = $server->accept() or return;

    if ($DEBUG >= 2) {
        printf( "Listen child making a Client for %d.\n", fileno($csock) );
    }
    my $client = Client->new($csock);
    if ($DEBUG >= 2) {
        printf( "Client is %s\n", $client );
    }
    $client->watch_read(1);
};
 | 
						|
 | 
						|
# Non-blocking TCP listener, bound to 127.0.0.1, that workers connect to.
my $wserver = IO::Socket::INET->new(
    LocalPort => $worker_port,
    LocalAddr => '127.0.0.1',
    Type      => SOCK_STREAM,
    Proto     => 'tcp',
    Blocking  => 0,
    Reuse     => 1,
    Listen    => 10,
) or die "Error creating socket: $@\n";

# Accept handler for new workers: wrap the accepted socket in a WorkerConn
# and register it with the Frontend.
my $waccept_handler = sub {
    my $csock = $wserver->accept() or return;

    if ($DEBUG >= 2) {
        printf( "Listen child making a Client for %d.\n", fileno($csock) );
    }
    my $client = WorkerConn->new($csock);
    if ($DEBUG >= 2) {
        printf( "Child is %s\n", $client );
    }
    Frontend->RegisterWorkerConn($client);
};
 | 
						|
 | 
						|
# thing to keep jobs alive
#
# Installed as the post-event-loop callback.  At most once per second it
# reaps children that have exited, adjusts the per-job running counts, and
# forks replacements until every job has as many workers as it wants.
# Always returns 1 (a true value keeps the event loop running).
my $spawn_children = sub {
    # run only once per second
    return 1 unless time > $lastspawntime;
    $lastspawntime = time();

    # Reap every child that has exited, without ever blocking.  Looping
    # here means several simultaneous deaths are handled in one tick
    # instead of one per second.
    my $reaped = 0;
    while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
        $reaped++;

        # figure out what the child was doing; pids we never tracked
        # are simply ignored
        my $job = delete $child{$pid} or next;

        my $extra = $todie{$pid} ? "expected" : "UNEXPECTED";
        error("Child $pid ($job) died: $? ($extra)");
        Frontend->NoteDeadChild($pid);

        if (my $jobstat = $jobs{$job}) {
            # if the pid is in %todie, then we have asked it to shut down
            # and have already decremented the jobstat counter and don't
            # want to do it again
            unless (delete $todie{$pid}) {
                # decrement the count of currently running jobs
                $jobstat->[1]--;
            }
        }
    }

    # nothing died and everything was already running: nothing to do
    return 1 if !$reaped && $allkidsup;
    $allkidsup = 0; # know something died (or we never finished spawning)

    # foreach job, fork enough children.  Iterate over keys() rather than
    # each(): this loop can return early when a fork fails, and an
    # abandoned each() iterator would make the next call resume after the
    # failed job, skip it, and wrongly mark all children as up.
    foreach my $job (keys %jobs) {
        my $jobstat = $jobs{$job};
        my $need    = $jobstat->[0] - $jobstat->[1];
        next unless $need > 0;

        error("Job $job has only $jobstat->[1], wants $jobstat->[0], making $need.");
        for (1..$need) {
            # on failure leave $allkidsup false so we retry next second
            my $cpid = make_new_child($job) or return 1;
            $child{$cpid} = $job;

            # now increase the count of processes currently doing this job
            $jobstat->[1]++;
        }
    }

    # if we got this far, all jobs have been re-created.  note that
    # so we avoid more CPU usage in this post-event-loop callback later
    $allkidsup = 1;

    # true value keeps us running:
    return 1;
};
 | 
						|
 | 
						|
# setup Danga::Socket to start handling connections: hand it the two
# listening descriptors together with their accept handlers
Client->DebugLevel( 3 );
Client->OtherFds( fileno($server)  => $accept_handler,
                  fileno($wserver) => $waccept_handler, );

# setup the post event loop callback to spawn jobs, and the timeout
Client->SetLoopTimeout( 250 ); # 250 milliseconds
Client->SetPostLoopCallback($spawn_children);

# and now, actually start listening for events
eval {
    print( "Starting event loop for frontend job on pid $$.\n" ) if $DEBUG;
    Client->EventLoop();
};

# Mgd::log treats its second argument as a printf/syslog format, so the
# crash text is passed as data; a '%' inside it is then logged verbatim
# instead of being interpreted as a conversion.
if ( $@ ) { Mgd::log('err', 'crash log: %s', $@); }
Mgd::log('info', 'ending run');
closelog();
 | 
						|
 | 
						|
#############################################################################
 | 
						|
## end of main
 | 
						|
#############################################################################
 | 
						|
 | 
						|
# Detach from the controlling terminal and continue in the background.
# Forks twice (the parent exits each time), starts a new session, changes
# directory to "/", clears the umask and redirects the standard handles.
# Croaks when a fork, the setsid call or the chdir fails.
sub daemonize {
    my ($pid, $sess_id);

    ## Fork and exit parent.  fork returns undef on failure, which must not
    ## be mistaken for "we are the child".
    $pid = fork;
    croak "Cannot fork: $!" unless defined $pid;
    exit 0 if $pid;

    ## Detach ourselves from the terminal
    croak "Cannot detach from controlling terminal"
        unless $sess_id = POSIX::setsid();

    ## Prevent possibility of acquiring a controlling terminal
    $SIG{'HUP'} = 'IGNORE';
    $pid = fork;
    croak "Cannot fork: $!" unless defined $pid;
    exit 0 if $pid;

    ## Change working directory
    chdir "/" or croak "Cannot chdir to /: $!";

    ## Clear file creation mask
    umask 0;

    print STDERR "Daemon running as pid $$.\n" if $DEBUG;

    ## Close open file descriptors
    close(STDIN);
    close(STDOUT);
    close(STDERR);

    ## Reopen stdin on /dev/null (or on /tmp/mogilefsd.log when debugging),
    ## then point stdout and stderr at the same place
    if ( $DEBUG ) {
        open(STDIN,  "+>/tmp/mogilefsd.log");
    } else {
        open(STDIN,  "+>/dev/null");
    }
    open(STDOUT, "+>&STDIN");
    open(STDERR, "+>&STDIN");
}
 | 
						|
 | 
						|
# Fork a worker process for the named job.
#
# In the parent: returns the child's pid, or 0 (after reporting through
# error()) when SIGINT could not be blocked or the fork failed.
# In the child: never returns.  The child closes the listening sockets,
# restores default INT/TERM handling, connects back to the parent on the
# worker port, announces "<pid> <job>" and runs the job_<job> sub.
sub make_new_child {
    my $job = shift;

    # block SIGINT around the fork
    my $sigset = POSIX::SigSet->new(SIGINT);
    sigprocmask(SIG_BLOCK, $sigset)
        or return error("Can't block SIGINT for fork: $!");

    my $pid = fork;
    unless (defined $pid) {
        # keep the fork error text, then undo the block so the parent
        # does not stay deaf to SIGINT after a failed fork
        my $fork_error = "$!";
        sigprocmask(SIG_UNBLOCK, $sigset);
        return error("fork failed creating $job: $fork_error");
    }

    if ($pid) {
        # parent.  The child exists at this point, so its pid is returned
        # even when unblocking fails; otherwise the caller would lose
        # track of a live worker and spawn a duplicate.
        sigprocmask(SIG_UNBLOCK, $sigset)
            or error("Can't unblock SIGINT for fork: $!");
        return $pid;
    }

    # as a child, we want to close these and ignore them
    close($server);
    close($wserver);

    $SIG{INT}  = 'DEFAULT';
    $SIG{TERM} = 'DEFAULT';
    $0 .= " [$job]";

    # unblock signals.  A child must never return into the parent's code,
    # so a failure here ends the child.
    unless (sigprocmask(SIG_UNBLOCK, $sigset)) {
        error("Can't unblock SIGINT for fork: $!");
        exit 1;
    }

    # set our frontend into child mode
    Frontend->SetAsChild;

    # try to create a connection to the parent.  we die here because
    # we're the child and if we can't talk to the master we really need
    # to die so that a child isn't just sitting around without communication
    # to the parent.
    $psock = IO::Socket::INET->new(PeerAddr => "127.0.0.1",
                                   PeerPort => $worker_port,
                                   Type     => SOCK_STREAM,
                                   Proto    => 'tcp',)
        or die "Error creating socket to master: $@\n";
    $psock->write("$$ $job\n");

    # now call our job function, looked up by name as job_<job>
    no strict 'refs';
    my $job_handler = *{"job_$job"}{CODE}
        or die "No handler defined for job '$job'\n";
    $job_handler->();
    exit;
}
 | 
						|
 | 
						|
# Record that worker $pid of job $job has been asked to exit: remember the
# pid in %todie and lower the job's running count right away.  Dies when
# the job name is unknown.
sub note_pending_death {
    my ($job, $pid) = @_;

    my $jobstat = $jobs{$job};
    unless (defined $jobstat) {
        die "$job not defined in call to note_pending_death.\n";
    }

    $todie{$pid} = 1;
    $jobstat->[1]--;
}
 | 
						|
 | 
						|
# Log a message: log( $priority, $format, @args ).
# When running as a daemon the arguments go to syslog unchanged; otherwise
# the priority (info, warn, critical, etc) is dropped and the message is
# printf'ed with a trailing newline, since syslog acts like printf.
sub log {
    if ($daemonize) {
        # just pass the parameters to syslog
        syslog(@_);
    } else {
        my (undef, $format, @args) = @_;
        printf($format . "\n", @args);
    }
}
 | 
						|
 | 
						|
# Send one line to the parent process.
# argument: the string to send; "\r\n" is appended.
# Does nothing when there is no parent socket.
sub send_to_parent {
    my $parent = $psock or return;
    my ($message) = @_;
    $parent->write("$message\r\n");
}
 | 
						|
 | 
						|
# Report an error.
# argument: a string describing the error that just happened.
# A child (one with a parent socket) forwards it to the parent as
# "error <message>"; the parent notes it with the Frontend and logs it at
# debug priority.  Always returns 0, so callers can write
# "return error(...)".
sub error {
    if ($psock) {
        # we're a child, pass error to parent
        send_to_parent("error $_[0]");
    } else {
        # we're a parent, so just handle output of error
        Frontend->NoteError(\$_[0]);

        # Mgd::log uses its second argument as a printf/syslog format.
        # Pass the message as data so a '%' in it (paths, response lines)
        # is logged verbatim rather than treated as a conversion.
        Mgd::log('debug', '%s', $_[0]);
    }
    return 0;
}
 | 
						|
 | 
						|
# Worker loop for the 'delete' job.  Never returns; exits when the parent
# sends 'shutdown'.
#
# Each pass: sleep, check in with the parent, then work through
# file_to_delete in batches.  For every (fid, devid) pair the file is
# removed from its device (HTTP DELETE when make_path yields a URL, plain
# unlink under $Mgd::MOG_ROOT otherwise) and the file_on row is dropped.
# A fid with no file_on rows left is removed from file_to_delete.
sub job_delete {

  PASS:
    while (1) {
        sleep 9;
        validate_dbh();
        my $dbh = get_dbh();

        # see if we have anything from the parent; it answers our check-in
        # with lines terminated by a lone '.'
        send_to_parent('del_i_looped');
        while (defined (my $line = <$psock>)) {
            $line =~ s/\r?\n$//;
            last if $line eq '.';
            if ($line eq 'shutdown') {
                exit 0;
            }
        }

        # get_dbh() yields nothing when the connection attempt fails.
        # Report it and try again on the next pass instead of crashing on
        # the first method call below.
        unless ($dbh) {
            error("Error: delete job could not get a database handle");
            next PASS;
        }

        my $LIMIT = 500;
        while (1) {
            sleep 1;

            # LEFT JOIN: a fid with no file_on rows comes back once with an
            # undefined devid
            my $delmap = $dbh->selectall_arrayref("SELECT fd.fid, fo.devid ".
                                                  "FROM file_to_delete fd ".
                                                  "LEFT JOIN file_on fo ON fd.fid=fo.fid ".
                                                  "LIMIT $LIMIT");
            my $count = $delmap ? scalar @$delmap : 0;
            next PASS unless $count;

            my %done;  # fid -> 1 (when fid is deleted from all devices)
            my %dev_down;  # devid -> 1 (when device times out due to EIO)

            foreach my $dm (@$delmap) {
                my ($fid, $devid) = @$dm;

                # if no device is returned from the query above, that
                # means there are no file_on rows for it, and we can consider
                # it now deleted.
                unless ($devid) {
                    $done{$fid} = 1;
                    next;
                }

                # don't try to delete from this device if we earlier
                # found it to be timing out with EIO
                next if $dev_down{$devid};

                my $path = make_path($devid, $fid);
                my $rv = 0;
                if (my $urlref = Mgd::is_url($path)) {
                    # hit up the server and delete it; $urlref elements are
                    # used as host, port and request path respectively
                    my $sock = IO::Socket::INET->new(PeerAddr => $urlref->[0],
                                                     PeerPort => $urlref->[1],
                                                     Timeout => 2);
                    unless ($sock) {
                        # timeout or something, mark this device as down for now and move on
                        $dev_down{$devid} = 1;
                        next;
                    }

                    # send delete request
                    error("Sending delete for $path") if $Mgd::DEBUG >= 2;
                    $sock->write("DELETE $urlref->[2] HTTP/1.0\r\n\r\n");
                    my $response = <$sock>;
                    if ($response =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
                        if (($1 >= 200 && $1 <= 299) || $1 == 404) {
                            # effectively means all went well (404: the
                            # file is already gone)
                            $rv = 1;
                        } else {
                            # remote file system error?  mark node as down
                            error("Error: unlink failure: $path: $1");
                            $dev_down{$devid} = 1;
                            next;
                        }
                    } else {
                        # $rv stays 0, so the file_on row is kept and the
                        # delete is attempted again later
                        error("Error: unknown response line: $response");
                    }
                } else {
                    # do normal unlink
                    $rv = unlink "$Mgd::MOG_ROOT/$path";

                    # device is timing out.  take note of it and
                    # continue dealing with other deletes
                    if (! $rv) {
                        if ($! == EIO) {
                            $dev_down{$devid} = 1;
                            next;
                        } elsif ($! == ENOENT) {
                            $rv = 1;  # count non-existent file as deleted
                        }
                    }
                }

                # if we deleted it, or it didn't exist, consider it
                # deleted.
                $dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
                         undef, $fid, $devid) if $rv;
            }

            if (%done) {
                my $in = join(',', keys %done);
                $dbh->do("DELETE FROM file_to_delete WHERE fid IN ($in)");
            }

            # a short batch means the queue is drained for now
            next PASS if $count < $LIMIT;
        }

    }
}
 | 
						|
 | 
						|
# Copies one file between two storage nodes over HTTP: GETs fid $fid from
# device $sdevid and PUTs it onto device $ddevid.
#
# Arguments: ($sdevid, $ddevid, $fid) -- source device id, destination
# device id, and the id of the file to copy.
#
# Returns 1 when the destination answers the PUT with a 2xx status.  Every
# failure path returns a false value (0, or whatever error() returns after
# logging).  When the source answers with a non-2xx status, or gives no usable
# Content-length, the fid is also reported to the parent as unreachable and
# recorded in the unreachable_fids table.
sub http_copy {
    my ($sdevid, $ddevid, $fid) = @_;

    # upper bound on how much file data is held in memory at any one time
    my $CHUNK_SIZE = 1024 * 1024;

    # handles setting unreachable magic; $error->(reachability, "message")
    my $error = sub {
        if ($_[0]) {
            send_to_parent("repl_unreachable $fid");

            # update database table; bind $fid rather than interpolating it
            Mgd::validate_dbh();
            my $dbh = Mgd::get_dbh();
            $dbh->do("REPLACE INTO unreachable_fids VALUES (?, UNIX_TIMESTAMP())",
                     undef, $fid);
        }
        return error($_[1]);
    };

    # get some information we'll need
    my $devs = Mgd::get_device_summary();
    my ($sdev, $ddev) = ($devs->{$sdevid}, $devs->{$ddevid});
    return error("Error: unable to get device information: source=$sdevid, destination=$ddevid, fid=$fid")
        unless ref $sdev && ref $ddev;
    my ($spath, $dpath) = (Mgd::make_http_path($sdevid, $fid),
                           Mgd::make_http_path($ddevid, $fid));
    my ($shost, $sport) = (Mgd::hostid_ip($sdev->{hostid}), Mgd::hostid_http_port($sdev->{hostid}));
    my ($dhost, $dport) = (Mgd::hostid_ip($ddev->{hostid}), Mgd::hostid_http_port($ddev->{hostid}));
    unless (defined $spath && defined $dpath && defined $shost && defined $dhost && $sport && $dport) {
        # show detailed information to find out what's not configured right
        error("Error: unable to replicate file fid=$fid from device id $sdevid to device id $ddevid");
        error("       http://$shost:$sport$spath -> http://$dhost:$dport$dpath");
        return 0;
    }

    # setup our pipe error handler, in case we get closed on
    my $pipe_closed = 0;
    local $SIG{PIPE} = sub { $pipe_closed = 1; };

    # okay, now get the file
    my $sock = IO::Socket::INET->new(PeerAddr => $shost, PeerPort => $sport, Timeout => 2)
        or return error("Unable to create socket to $shost:$sport for $spath");
    $sock->write("GET $spath HTTP/1.0\r\n\r\n");
    return error("Pipe closed retrieving $spath from $shost:$sport")
        if $pipe_closed;

    # read the response headers; all we want from them is a good status and
    # a content length
    my $clen;
    while (defined (my $line = <$sock>)) {
        $line =~ s/[\s\r\n]+$//;
        last unless length $line;    # blank line ends the headers
        if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
            # make sure we get a good response
            my $status = $1;
            return $error->(1, "Error: Resource http://$shost:$sport$spath failed: HTTP $status")
                unless $status >= 200 && $status <= 299;
        }
        next unless $line =~ /^Content-length:\s*(\d+)\s*$/i;
        $clen = $1;
    }
    # a missing header and a zero length are treated alike
    return $error->(1, "File $spath has a content-length of 0; unable to replicate")
        unless $clen;

    # open target for put
    my $dsock = IO::Socket::INET->new(PeerAddr => $dhost, PeerPort => $dport, Timeout => 2)
        or return error("Unable to create socket to $dhost:$dport for $dpath");
    $dsock->write("PUT $dpath HTTP/1.0\r\nContent-length: $clen\r\n\r\n")
        or return error("Unable to write data to $dpath on $dhost:$dport");
    return error("Pipe closed during write to $dpath on $dhost:$dport")
        if $pipe_closed;

    # stream the body across in bounded chunks instead of slurping the whole
    # file into memory
    my ($data, $read, $written) = ('', 0, 0);
    while (!$pipe_closed && $read < $clen) {
        my $want = $clen - $read;
        $want = $CHUNK_SIZE if $want > $CHUNK_SIZE;

        my $bytes = $sock->read($data, $want);
        last unless $bytes;    # 0 is EOF, undef is a read error
        $read += $bytes;

        # send() may accept only part of the buffer, so keep pushing until
        # the whole chunk is out or the socket fails
        my $sent = 0;
        while (!$pipe_closed && $sent < $bytes) {
            my $wbytes = $dsock->send($sent ? substr($data, $sent) : $data);
            last unless $wbytes;    # undef (error) or 0: give up on this chunk
            $sent += $wbytes;
        }
        $written += $sent;
        return error("Error: wrote $sent; expected to write $bytes; failed putting to $dpath")
            unless $sent == $bytes;
    }
    return error("Error: wrote $written bytes, expected to write $clen")
        unless $written == $clen;

    # now read in the response line (should be first line)
    my $line = <$dsock>;
    if (defined $line && $line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
        my $status = $1;
        return 1 if $status >= 200 && $status <= 299;
        if ($status == 404) {
            warn "Error: got a 404 in put: device not on host?: http://$dhost:$dport$dpath";
        } else {
            # don't fail silently on other statuses
            warn "Error: PUT to http://$dhost:$dport$dpath failed: HTTP $status";
        }
    } else {
        warn "Error: HTTP response line not recognized: " . (defined $line ? $line : '');
    }
    return 0;
}
 | 
						|
 | 
						|
# Replicates $fid if it is on fewer than $min live devices.
#
# Arguments: ($dbh, $fid, $min) -- database handle, file id, and the minimum
# number of devices the file should be on.
#
# The work is done while holding the MySQL named lock
# "mgfs:fid:$fid:replicate" (acquired with a 1 second timeout); the lock is
# released on every return path after it has been obtained.
#
# Returns:
#   2      the file was already on >= $min live devices; nothing was done
#   1      the copy loop finished without a copier failure (the file may
#          still be short of $min if no further destination was available)
#   false  the lock could not be obtained, no device information was
#          available, no live source copy exists, or a copy failed
#          (the reason is logged through error())
sub replicate {
    my ($dbh, $fid, $min) = @_;

    my $lockname = "mgfs:fid:$fid:replicate";
    my $lock = $dbh->selectrow_array("SELECT GET_LOCK(?, 1)", undef,
                                     $lockname);
    return error("Unable to obtain lock $lockname")
        unless $lock;

    # from here on every return goes through $retunlock so the lock is not
    # left held.  $retunlock->($rv, $message) releases the lock, then returns
    # $rv if it is true, else logs $message via error() and returns that.
    my $retunlock = sub {
        my $rv = shift;
        $dbh->selectrow_array("SELECT RELEASE_LOCK(?)", undef, $lockname);
        return $rv ? $rv : error($_[0]);
    };

    # hashref of devid -> $device_row_href  (where devid is alive)
    my $devs = Mgd::get_device_summary();
    return $retunlock->(0, "Device information from get_device_summary is empty")
        unless $devs && %$devs;

    # learn what devices this file is already on
    my $on_count = 0;
    my %on_host;      # hostid -> 1
    my @exist_devid;  # live devids that currently hold the file

    my $sth = $dbh->prepare("SELECT devid FROM file_on WHERE fid=?");
    $sth->execute($fid);
    die $dbh->errstr if $dbh->err;
    while (my ($devid) = $sth->fetchrow_array) {
        # rows for devices missing from the summary don't count as copies
        my $d = $devs->{$devid} or next;
        $on_host{$d->{hostid}} = 1;
        $on_count++;
        push @exist_devid, $devid;
    }

    return $retunlock->(2) if $on_count >= $min;
    return $retunlock->(0, "Source is no longer available replicating $fid") if $on_count == 0;

    my $sdevid;

    while ($on_count < $min) {
        my @good_devids = Mgd::find_deviceid(
            random => 1,
            not_on_hosts => [ keys %on_host ],
            weight_by_free => 1,
        );

        # wasn't able to replicate enough?
        last unless @good_devids;

        my $ddevid = shift @good_devids;
        last unless defined $ddevid;    # "no suitable device" may come back as undef

        # pick the source once, at random among the live copies
        $sdevid ||= $exist_devid[int(rand(scalar @exist_devid))];

        my $rv = undef;
        if ($USE_HTTP) {
            $rv = http_copy($sdevid, $ddevid, $fid);
        } else {
            my $dst_path = $MOG_ROOT . "/" . make_path($ddevid, $fid);
            my $src_path = $MOG_ROOT . "/" . make_path($sdevid, $fid);
            $rv = File::Copy::copy($src_path, $dst_path);
        }

        return $retunlock->(0, "Copier failed replicating $fid") unless $rv;
        add_file_on($fid, $ddevid, 1);
        $on_count++;

        # the destination's host now holds a copy as well; exclude it from
        # the next pick so further copies land on other hosts
        my $ddev = $devs->{$ddevid};
        $on_host{$ddev->{hostid}} = 1 if $ddev;
    }

    return $retunlock->(1);
}
 | 
						|
 | 
						|
# Builds the table of minimum device counts per domain and class.
#
# Returns a hashref of dmid -> { classid -> mindevcount }.  No mindevcount is
# ever larger than the number of hosts in %Mgd::cache_host.  Every domain gets
# an entry for class 0; if the class table doesn't define one, it is derived
# from $default_mindevcount.
sub get_mindevcounts {
    # refresh the host cache before counting hosts
    Mgd::check_host_cache();
    my $nhosts = keys %Mgd::cache_host;

    # clamps a wanted count to the number of known hosts
    my $clamp = sub {
        return $nhosts < $_[0] ? $nhosts : $_[0];
    };

    validate_dbh();
    my $dbh = get_dbh();

    # LEFT JOIN so that domains without any explicit class still show up,
    # with a NULL classid
    my $sth = $dbh->prepare("SELECT d.dmid, c.classid, c.mindevcount ".
                            "FROM domain d LEFT JOIN class c ON d.dmid=c.dmid");
    $sth->execute;

    my %min; # dmid -> classid -> mindevcount
    while (my @row = $sth->fetchrow_array) {
        my ($dmid, $classid, $wanted) = @row;

        # remember the domain even when it has no class rows
        $min{$dmid} = {} unless $min{$dmid};

        # NULL classid: nothing to record for this row
        next unless defined $classid;
        $min{$dmid}{$classid} = int($clamp->($wanted));
    }

    # second pass: give every domain its implicit class 0 where it is missing
    for my $dmid (keys %min) {
        next if exists $min{$dmid}{0};
        $min{$dmid}{0} = $clamp->($default_mindevcount);
    }

    return \%min;
}
 | 
						|
 | 
						|
# Monitor job: every 15 seconds, fetches each device's "usage" file over HTTP
# and stores the reported totals in the device table (mb_total, mb_used,
# mb_asof).  Pings the parent with "monitor_ping" on every pass and exits the
# process when the parent sends "shutdown".
#
# Returns 0 if no database handle is available; otherwise loops forever.
sub job_monitor {
    my $parse_parent_response = sub {
        # now see what was in our message queue
        while (defined (my $line = <$psock>)) {
            $line =~ s/\r?\n$//;
            last if $line eq '.';

            # now find out what command this is?
            if ($line eq 'shutdown') {
                exit 0;
            }
        }
    };

    # one user agent serves every request; the short timeout keeps a dead
    # host from stalling the whole scan
    my $ua = LWP::UserAgent->new( timeout => 2 );

    while (1) {
        sleep 15;

        # get db and note we're starting a run
        error("Monitor running; scanning usage files")
            if $Mgd::DEBUG >= 1;
        validate_dbh();
        my $dbh = get_dbh() or return 0;

        # general report in to parent
        send_to_parent('monitor_ping');
        $parse_parent_response->();

        # get a current list of devices
        my $devs = Mgd::get_device_summary();
        next unless $devs && %$devs;

        # now iterate over devices
        foreach my $dev (values %$devs) {
            my $host = $Mgd::cache_host{$dev->{hostid}};
            unless ($host) {
                # without host details there is no URL to ask
                error("No host information for dev$dev->{devid} (hostid $dev->{hostid})");
                next;
            }
            my $port = $host->{http_get_port} || $host->{http_port};
            my $url = "http://$host->{hostip}:$port/dev$dev->{devid}/usage";

            # now try to get the data with a short timeout
            my $response = $ua->get($url);

            unless ($response->is_success) {
                error("Failed getting dev$dev->{devid}: " . $response->status_line);
                next;
            }

            # the usage file is a list of "key: value" lines
            my %stats;
            my $data = $response->content;
            foreach (split(/\r?\n/, $data)) {
                next unless /^(\w+)\s*:\s*(.+)$/;
                $stats{$1} = $2;
            }

            # "used" may legitimately be 0 on an empty device, so only its
            # presence is required; "total" must be present and non-zero
            my ($used, $total) = ($stats{used}, $stats{total});
            unless (defined $used && $total) {
                $used = '' unless defined $used;
                error("dev$dev->{devid} reports used = $used, total = $total, error?");
                next;
            }

            # scale down to megabytes.
            # NOTE(review): a single division by 1024 implies the usage file
            # reports kilobytes rather than bytes -- confirm against the
            # storage node's usage file format.
            $used /= 1024;
            $total /= 1024;

            $dbh->do("UPDATE device SET mb_total = ?, mb_used = ?, mb_asof = UNIX_TIMESTAMP() " .
                     "WHERE devid = ?", undef, int($total), int($used), $dev->{devid});
            if ($dbh->err) {
                error("Database error in update query: " . $dbh->errstr);
                next;
            }

            error("dev$dev->{devid}: used = $used, total = $total")
                if $Mgd::DEBUG >= 1;
        }
    }
}
 | 
						|
 | 
						|
# Reaper job: every 10 seconds, looks for devices whose status is 'dead' and
# removes their file_on rows (at most 1000 per device per pass), then
# refreshes each affected fid's devcount through Mgd::update_fid_devcount.
# Pings the parent with "reaper_ping" on every pass and exits the process
# when the parent sends "shutdown".
#
# Returns 0 if no database handle is available; otherwise loops forever.
sub job_reaper {
    # reads the parent's queued messages up to the terminating "." line
    my $read_parent_msgs = sub {
        while (defined (my $msg = <$psock>)) {
            $msg =~ s/\r?\n$//;
            return if $msg eq '.';
            exit 0 if $msg eq 'shutdown';
        }
    };

    for (;;) {
        sleep 10;

        # announce the run when debugging, then make sure we have a database
        if ($Mgd::DEBUG >= 1) {
            error("Reaper running; looking for dead devices");
        }
        validate_dbh();
        my $dbh = get_dbh();
        return 0 unless $dbh;

        # check in with the parent and handle whatever it has queued for us
        send_to_parent('reaper_ping');
        $read_parent_msgs->();

        # pick the dead devices out of the device list; with none found the
        # loop below simply does nothing
        my $all_devs = Mgd::get_device_summary(1);
        my @dead = grep { $_->{status} eq 'dead' } values %$all_devs;

        DEVICE: foreach my $dead_dev (@dead) {
            my $devid = $dead_dev->{devid};

            # one batch of files still recorded on this device
            my $fids = $dbh->selectcol_arrayref('SELECT fid FROM file_on WHERE devid = ? LIMIT 1000',
                                                undef, $devid);
            if ($dbh->err) {
                error("Error selecting jobs to reap: " . $dbh->errstr);
                next DEVICE;
            }
            next DEVICE unless $fids && @$fids;

            error("Found " . scalar(@$fids) . " files on dead device $devid");

            # drop each mapping, then fix up that fid's devcount; a failure
            # at either step is logged and we move on to the next fid
            foreach my $fid (@$fids) {
                $dbh->do('DELETE FROM file_on WHERE fid = ? AND devid = ?',
                         undef, $fid, $devid);
                if ($dbh->err) {
                    error("Error deleting from file_on (file $fid, device $devid): " . $dbh->errstr);
                }
                elsif (!Mgd::update_fid_devcount($fid)) {
                    error("Error updating fid $fid devcount");
                }
                elsif ($Mgd::DEBUG >= 2) {
                    error("Reaper noted fid $fid no longer on device $devid");
                }
            }
        }
    }
}
 | 
						|
 | 
						|
# Replication job: repeatedly scans every domain/class for files whose
# devcount is below the class's mindevcount and calls replicate() on them.
#
# Each pass pings the parent with "repl_ping", reloads the unreachable_fids
# table, and then walks devcount = 1 .. mindevcount-1 for every class,
# handling up to 1000 fids per query.  Fids that recently failed are skipped
# until their entry in %fidfailure expires.  The job sleeps 2 seconds between
# passes unless a class hit the 1000-fid limit, in which case the next pass
# starts immediately.  The process exits when the parent sends "shutdown".
#
# Returns 0 if no database handle is available; dies on database errors;
# otherwise loops forever.
sub job_replicate {
    # $parse_parent_response->($fidtodo_href): drains the parent's message
    # queue up to the "." terminator.  "repl_was_done <fid>" removes that fid
    # from the given hashref (if one was passed); "shutdown" exits.
    my $parse_parent_response = sub {
        # now see what was in our message queue
        while (defined (my $line = <$psock>)) {
            $line =~ s/\r?\n$//;
            last if $line eq '.';

            # now find out what command this is?
            if ($line =~ /^repl_was_done (\d+)/ && $_[0]) {
                delete $_[0]->{$1};
            } elsif ($line eq 'shutdown') {
                exit 0;
            }
        }
    };

    # { fid => expiry time }; instructs us not to replicate this fid until
    # the time has passed... expired fids are cleared out as we meet them
    my %fidfailure;

    # { fid => 1 }; used to keep track of fids we find in the unreachable_fids table
    my %unreachable;

    my $sleep = 2;
    while (1) {
        sleep $sleep;
        validate_dbh();
        my $dbh = get_dbh() or return 0;

        # general report in to parent
        send_to_parent('repl_ping');
        $parse_parent_response->(undef);

        # start off assuming that we're going to get everything replicated and then take a break
        $sleep = 2;

        # update our unreachable fid list... we consider them good for 15 minutes
        my $urfids = $dbh->selectall_arrayref('SELECT fid, lastupdate FROM unreachable_fids');
        die $dbh->errstr if $dbh->err;
        foreach my $r (@{$urfids || []}) {
            my ($ufid, $lastupdate) = @$r;
            my $nv = $lastupdate + 900;

            # a failed replicate() below may already have set an expiry that
            # lies past this 15 minute window; keep that one.  In every other
            # case (no entry yet, or an earlier expiry) move it out to $nv,
            # so the expiry is only ever extended here, never shortened.
            unless ($fidfailure{$ufid} && $fidfailure{$ufid} > $nv) {
                $fidfailure{$ufid} = $nv;
            }
            $unreachable{$ufid} = 1;
        }

        # get the min dev counts
        my %min = %{ Mgd::get_mindevcounts() };

        # iterate through each domain, replicating its contents
        foreach my $dmid (keys %min) {
            # iterate through each class, including the implicit class 0
            while (my ($classid, $min) = each %{$min{$dmid}}) {
                error("Checking replication for dmid=$dmid, classid=$classid, min=$min")
                    if $Mgd::DEBUG >= 1;

                my $LIMIT = 1000;

                # try going from devcount of 1 up to devcount of $min-1
                my %fidtodo; # fid => 1
                my $fixed = 0;
                my $attempted = 0;
                my $devcount = 1;
                while ($fixed < $LIMIT && $devcount < $min) {
                    my $now = time();
                    my $fids = $dbh->selectcol_arrayref("SELECT fid FROM file WHERE dmid=? AND classid=? ".
                                                        "AND devcount = ? AND length IS NOT NULL ".
                                                        "LIMIT $LIMIT", undef, $dmid, $classid, $devcount);
                    die $dbh->errstr if $dbh->err;
                    $fidtodo{$_} = 1 foreach @$fids;

                    # increase devcount so we try to replicate the files at the next devcount
                    $devcount++;

                    # see if we have any files to replicate
                    my $count = $fids ? scalar @$fids : 0;
                    error("  found $count for dmid=$dmid/classid=$classid/min=$min")
                        if $Mgd::DEBUG >= 1;
                    next unless $count;

                    # randomize the list so multiple daemons/threads working on
                    # replicate at the same time don't all fight over the
                    # same fids to move
                    my @randfids = randlist(@$fids);

                    error("Need to replicate: $dmid/$classid: @$fids") if $Mgd::DEBUG >= 2;
                    foreach my $fid (@randfids) {
                        # now replicate this fid
                        $attempted++;

                        # the parent may have told us someone else did it
                        next unless $fidtodo{$fid};

                        # skip fids that are still inside their back-off window
                        if ($fidfailure{$fid}) {
                            if ($fidfailure{$fid} < $now) {
                                delete $fidfailure{$fid};
                            } else {
                                next;
                            }
                        }

                        if (my $status = replicate($dbh, $fid, $min)) {
                            # $status is either 0 (failure, handled below), 1 (success, we actually
                            # replicated this file), or 2 (success, but someone else replicated it).
                            # so if it's 2, we just want to go to the next fid.  this file is done.
                            next if $status == 2;

                            # if it was no longer reachable, mark it reachable
                            if (delete $unreachable{$fid}) {
                                $dbh->do("DELETE FROM unreachable_fids WHERE fid = ?", undef, $fid);
                                die $dbh->errstr if $dbh->err;
                            }

                            # housekeeping
                            $fixed++;
                            send_to_parent("repl_i_did $fid");
                            $parse_parent_response->(\%fidtodo);

                            # status update after every 20 replicated files
                            if ($fixed % 20 == 0) {
                                my $ratio = $fixed/$attempted*100;
                                error(sprintf("replicated=$fixed, attempted=$attempted, ratio=%.2f%%", $ratio));
                            }
                        } else {
                            # failed in replicate, don't retry for a minute
                            $fidfailure{$fid} = $now + 60;
                        }
                    }
                }

                # if we did 1000, we just want to jump to the next pass through all domains and classes without pausing
                $sleep = 0 if $fixed >= $LIMIT;
            }
        }
    }
}
 | 
						|
 | 
						|
# Query worker job: hands every line that arrives on the parent socket to a
# QueryWorker object, one at a time.  Blocks waiting for input and finishes
# when the socket stops yielding lines.
sub job_queryworker {
    my $qworker = QueryWorker->new($psock);

    while (defined (my $request = <$psock>)) {
        # drop the line terminator
        $request =~ s/[\r\n]+$//;

        # check the database handle before each request is processed
        validate_dbh();

        # the worker receives a reference to the line
        $qworker->process_line(\$request);
    }
}
 | 
						|
 | 
						|
 | 
						|
#####################################################################
 | 
						|
### S E R V E R   A P I   F U N C T I O N S
 | 
						|
#####################################################################
 | 
						|
 | 
						|
# returns hashref of devid -> $device_row_href  (where devid is alive/down, but not dead)
 | 
						|
# cached for 15 seconds.
 | 
						|
use vars qw($cache_device_summary $cache_device_summary_time %cache_host $cache_host_time);
 | 
						|
 | 
						|
# General purpose device locator.  Example:
#
# my $devid = Mgd::find_deviceid(
#     random => 1,              # start the scan at a random device (else at the first)
#     min_free_space => 100,    # with at least 100MB free
#     weight_by_free => 1,      # pick the result weighted by free space
#     max_disk_age => 5,        # minutes of age the last usage report can be before we ignore the disk
#     not_on_hosts => [ 1, 2 ], # no devices on hosts 1 and 2
# );
#
# min_free_space and max_disk_age fall back to the globals $min_free_space
# and $max_disk_age when not given (or given as a false value).
#
# Returns the devid of one suitable device, or undef if no suitable device
# was found.
sub find_deviceid {
    my %opts = ( @_ );

    # copy down global limits if not specified
    $opts{min_free_space} ||= $min_free_space;
    $opts{max_disk_age} ||= $max_disk_age;

    # Turn the maximum age (minutes) into the oldest acceptable report time.
    # A false age means "no age limit"; leaving the cutoff undefined keeps
    # that case from turning into a cutoff of "now", which would reject
    # practically every device.
    my $oldest_report = $opts{max_disk_age}
        ? time() - ($opts{max_disk_age} * 60)
        : undef;

    # setup for iterating over devices
    my $devs = Mgd::get_device_summary();
    my @devids = keys %{$devs || {}};
    my $devcount = scalar(@devids);
    my $start = $opts{random} ? int(rand($devcount)) : 0;
    my %not_on_host = ( map { $_ => 1 } @{$opts{not_on_hosts} || []} );
    my $total_free = 0;

    # now find the devices that match what they want, scanning the device
    # list as a ring beginning at $start
    my @list;
    for (my $i = 0; $i < $devcount; $i++) {
        my $idx = ($i + $start) % $devcount;
        my $dev = $devs->{$devids[$idx]};

        # series of suitability checks
        next unless $dev->{status} eq 'alive';
        next if $not_on_host{$dev->{hostid}};
        next if defined $oldest_report && $dev->{mb_asof} &&
                $dev->{mb_asof} < $oldest_report;
        next if $opts{min_free_space} && $dev->{mb_total} &&
                $dev->{mb_free} < $opts{min_free_space};

        # we get here, this is a suitable device
        push @list, $dev->{devid};
        $total_free += $dev->{mb_free};
    }

    # do free space weighting: pick a point in [0, $total_free) and return
    # the device whose slice of the cumulative free space contains it
    if ($opts{weight_by_free} && $total_free > 0) {
        my $rand = int(rand($total_free));
        my $cur = 0;

        foreach my $devid (@list) {
            $cur += $devs->{$devid}->{mb_free};

            # strictly greater: device k owns the points from the previous
            # running total up to (but not including) the new one.  using >=
            # here would shift every slice by one and starve the last device.
            return $devid if $cur > $rand;
        }
    }

    # return first listed suitable device
    return @list ? $list[0] : undef;
}
 | 
						|
 | 
						|
# Returns a hashref of devid -> device row hashref for every device whose
# status is 'alive' or 'down'; pass a true argument to include 'dead'
# devices as well.  Each row gets an extra mb_free key (mb_total - mb_used),
# and a device's status is replaced by its host's status when the host is in
# a worse state than the device.  The result is cached for 15 seconds.
sub get_device_summary {
    my $want_dead = shift() ? 1 : 0;
    my $now = time;

    # The cache holds a single summary, so remember whether it was built with
    # dead devices included and only reuse it for a caller asking for the
    # same flavour.
    our $cache_device_summary_dead;
    return $cache_device_summary
        if $cache_device_summary_time > $now - 15
        && ($cache_device_summary_dead ? 1 : 0) == $want_dead;

    my $dbh = get_dbh();

    # learn devices
    my $include_dead = $want_dead ? ", 'dead'" : '';
    my %dev;  # devid -> row hashref
    my $sth = $dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
                            "mb_used, mb_asof, status FROM device ".
                            "WHERE status IN ('alive', 'down' $include_dead)");
    $sth->execute;
    while (my $row = $sth->fetchrow_hashref) {
        $dev{$row->{devid}} = $row;
    }

    # now override device status with host status if the host status is less than the device status
    Mgd::check_host_cache();
    foreach my $devid (keys %dev) {
        my $row = $dev{$devid};

        # makes others have an easier time of finding devices by free space
        $row->{mb_free} = $row->{mb_total} - $row->{mb_used};

        my $host_status = $cache_host{$row->{hostid}}->{status};
        if ($row->{status} eq 'alive' && $host_status ne 'alive') {
            $row->{status} = $host_status;
        } elsif ($row->{status} eq 'down' && $host_status eq 'dead') {
            $row->{status} = $host_status;
        }
    }

    $cache_device_summary_time = $now;
    $cache_device_summary_dead = $want_dead;
    return $cache_device_summary = \%dev;
}
 | 
						|
 | 
						|
# Refreshes %cache_host (hostid -> host row hashref) from the host table
# unless it was refreshed within the last 5 seconds.  Rows that have both
# altip and altmask also get a 'mask' key holding a Net::Netmask object
# built from altmask.
sub check_host_cache {
    my $now = time;
    if ($cache_host_time > $now - 5) {
        return;
    }

    %cache_host = ();

    my $sth = get_dbh()->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
                                 "hostip, http_port, http_get_port, remoteroot, altip, altmask FROM host");
    $sth->execute;

    while (my $host = $sth->fetchrow_hashref) {
        $cache_host{$host->{hostid}} = $host;

        # only hosts with a complete alternate-network definition get a mask
        if ($host->{altip} && $host->{altmask}) {
            $host->{mask} = Net::Netmask->new2($host->{altmask});
        }
    }

    $cache_host_time = $now;
}
 | 
						|
 | 
						|
# Looks up the file row for a (domain id, key) pair on the given database
# handle.  Returns whatever selectrow_hashref yields: a hashref with fid,
# dmid, dkey, length, classid and devcount, or a false value when there is
# no row.
sub key_filerow {
    my ($dbh, $dmid, $key) = @_;

    my $sql = "SELECT fid, dmid, dkey, length, classid, devcount " .
              "FROM file WHERE dmid=? AND dkey=?";

    return scalar $dbh->selectrow_hashref($sql, undef, $dmid, $key);
}
 | 
						|
 | 
						|
# Tells whether the named job should have fewer active children: true when
# the first element of $jobs{$job} is numerically less than the second.
# NOTE(review): presumably the pair is [ wanted, running ] -- confirm where
# %jobs is populated.
sub job_needs_reduction {
    my $job = $_[0];

    my $first  = $jobs{$job}->[0];
    my $second = $jobs{$job}->[1];

    return $first < $second;
}
 | 
						|
 | 
						|
# Given a file descriptor number and a timeout in seconds, wait for that
# descriptor to become readable.  Returns 1 if it became readable within the
# timeout, 0 otherwise (including on an undefined/negative descriptor, a
# false timeout, or a select() error).
sub wait_for_readability {
    my ($fileno, $timeout) = @_;

    # descriptor 0 is a perfectly valid descriptor, so test for definedness
    # rather than truth; a negative number would make vec() die
    return 0 unless defined $fileno && $fileno >= 0 && $timeout;

    my $rin = '';
    vec($rin, $fileno, 1) = 1;
    my $nfound = select($rin, undef, undef, $timeout);

    # select returns -1 on error (e.g. interrupted by a signal) and 0 on
    # timeout; only a positive count means the descriptor is readable
    return ($nfound && $nfound > 0) ? 1 : 0;
}
 | 
						|
 | 
						|
# Get the size of a file; returns a false value on error.
#
# $path is either a path relative to $Mgd::MOG_ROOT (sized with -s) or an
# http:// URL.  For a URL the size is requested with a "size <uri>" line on a
# connection to the host's $MOGSTORED_STREAM_PORT, reusing the socket cached
# in %streamcache when there is one.  If no stream connection can be made,
# an HTTP HEAD request is sent to the URL's own port (80 when the URL names
# none) and the Content-length header is used.
sub get_file_size {
    my $path = shift;

    # quick case -- just a file on disk
    unless ($path =~ m!^http://([^:/]+)(?::(\d+))?(/.+)$!) {
        return -s "$Mgd::MOG_ROOT/$path"
    }
    my ($host, $port, $uri) = ($1, $2, $3);
    $port ||= 80;

    # don't sigpipe us
    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
    my $flags = $FLAG_NOSIGNAL || 0;

    # setup for sending size request to cached host
    my $req = "size $uri\r\n";
    my $reqlen = length $req;
    my $sock = $streamcache{$host};

    # forget a stream connection that can no longer be trusted, so the next
    # call reconnects instead of reusing a dead or half-read socket
    my $drop_stream = sub {
        close($sock) if $sock;
        delete $streamcache{$host};
    };

    # sub to parse the response from $sock.  common code, so we have it here.
    my $parse_response = sub {
        # give the socket $node_timeout seconds to become readable
        unless (Mgd::wait_for_readability(fileno($sock), $node_timeout)) {
            $drop_stream->();
            return 0;
        }

        # now we know there's readable data
        my $line = <$sock>;
        unless (defined $line) {
            # EOF or read error: the connection is of no further use
            $drop_stream->();
            return 0;
        }
        return 0 unless $line =~ /^(\S+)\s+(-?\d+)/; # expected format: "uri size"
        return error("get_file_size() requested size of $path, got back size of $1 ($2 bytes)")
            if $1 ne $uri;
        return $2+0;
    };

    # try using the cached socket.  send() returns undef on failure; $! is
    # only meaningful in that case, so it must not be used as the test.
    if ($sock) {
        my $rv = send($sock, $req, $flags);
        if (!defined $rv) {
            # stale connection; fall through and reconnect
            $drop_stream->();
        } elsif ($rv != $reqlen) {
            return error("send() didn't return expected length ($rv, not $reqlen) for $path");
        } else {
            # success
            return $parse_response->();
        }
    }

    # try creating a connection to the stream
    $sock = IO::Socket::INET->new(PeerAddr => $host, PeerPort => $MOGSTORED_STREAM_PORT, Timeout => 5);
    if ($sock) {
        $streamcache{$host} = $sock;
        my $rv = send($sock, $req, $flags);
        if (!defined $rv) {
            my $err = "$!";   # capture before close() can change it
            $drop_stream->();
            return error("error talking to mogstored stream ($path): $err");
        } elsif ($rv != $reqlen) {
            return error("send() didn't return expected length ($rv, not $reqlen) for $path");
        } else {
            # success
            return $parse_response->();
        }
    }

    # failure case: use a HEAD request to get the size of the file
    my $http = IO::Socket::INET->new(PeerAddr => $host, PeerPort => $port, Timeout => 3)
        or return error("get_file_size() unable to contact mogstored for size of $path");
    $http->write("HEAD $uri HTTP/1.0\r\n\r\n");
    while (defined (my $line = <$http>)) {
        if ($line =~ /^Content-length: (\d+)/i) {
            # success
            return $1+0;
        }
    }

    # no content length found?
    return error("get_file_size() found no content-length header in response for $path");
}
 | 
						|
 | 
						|
# Maps a class name within a domain to its classid.  Returns undef when the
# domain id is not positive, the class name is empty, or no (true) classid
# is found.
sub class_id {
    my ($dmid, $class) = @_;
    unless ($dmid > 0 && length $class) {
        return undef;
    }

    my $dbh = Mgd::get_dbh();
    my $classid = $dbh->selectrow_array(
        "SELECT classid FROM class WHERE dmid=? AND classname=?",
        undef, $dmid, $class,
    );

    return $classid ? $classid : undef;
}
 | 
						|
 | 
						|
# Maps a domain namespace (first argument) to its dmid using %domaincache,
# which is rebuilt from the domain table when it is more than 5 seconds old.
# Returns undef for an unknown namespace.
sub domain_id {
    my $now = time();
    my $stale = $domaincachetime + 5 < $now;

    if ($stale) {
        %domaincache = ();

        # reload the full namespace -> dmid map
        my $rows = Mgd::get_dbh()->selectall_arrayref('SELECT dmid, namespace FROM domain');
        for my $row (@{$rows || []}) {
            my ($dmid, $namespace) = ($row->[0], $row->[1]);
            $domaincache{$namespace} = $dmid;
        }

        $domaincachetime = $now;
    }

    # answer from the (possibly just refreshed) cache
    return $domaincache{$_[0]};
}
 | 
						|
 | 
						|
# Maps a classid within a domain to its class name.  Returns undef when the
# domain id is not positive, the classid is empty, or no (true) name is
# found.
# FIXME: cache this
sub class_name {
    my ($dmid, $classid) = @_;
    unless ($dmid > 0 && length $classid) {
        return undef;
    }

    # lookup class
    my $dbh = Mgd::get_dbh();
    my $classname = $dbh->selectrow_array(
        "SELECT classname FROM class WHERE dmid=? AND classid=?",
        undef, $dmid, $classid,
    );

    return $classname ? $classname : undef;
}
 | 
						|
 | 
						|
# Maps a dmid to its namespace by querying the domain table directly.
# Returns whatever the query yields for the first column (undef when there
# is no such domain).
# FIXME: cache this
sub domain_name {
    my ($dmid) = @_;

    # lookup domain
    my $namespace = Mgd::get_dbh()->selectrow_array(
        "SELECT namespace FROM domain WHERE dmid=?", undef, $dmid,
    );

    return $namespace;
}
 | 
						|
 | 
						|
# Returns the hostname recorded for a hostid in the host cache (refreshing
# the cache first if needed), or undef for an unknown host.
sub hostid_name {
    my ($hostid) = @_;
    check_host_cache();

    my $host = $cache_host{$hostid};
    return undef unless $host;
    return $host->{hostname};
}
 | 
						|
 | 
						|
# Returns the IP to use for a hostid, or undef for an unknown host.  The
# host's altip is returned when the host has both a mask object and an altip
# and either $force_alt_zone is set or $client_ip matches the mask;
# otherwise the regular hostip is returned.
sub hostid_ip {
    my ($hostid) = @_;
    check_host_cache();

    my $host = $cache_host{$hostid};
    return undef unless $host;

    # does this host have a usable alternate address at all?
    my $has_alt = $host->{mask} && $host->{altip};

    # and should this request be pointed at it?
    my $use_alt = $has_alt
        && ($force_alt_zone || ($client_ip && $host->{mask}->match($client_ip)));

    return $use_alt ? $host->{altip} : $host->{hostip};
}
 | 
						|
 | 
						|
# Returns the http_port recorded for a hostid in the host cache (refreshing
# the cache first if needed), or undef for an unknown host.
sub hostid_http_port {
    my ($hostid) = @_;
    check_host_cache();

    my $host = $cache_host{$hostid};
    return undef unless $host;
    return $host->{http_port};
}
 | 
						|
 | 
						|
# Returns the http_get_port recorded for a hostid in the host cache
# (refreshing the cache first if needed), or undef for an unknown host.
sub hostid_http_get_port {
    my ($hostid) = @_;
    check_host_cache();

    my $host = $cache_host{$hostid};
    return undef unless $host;
    return $host->{http_get_port};
}
 | 
						|
 | 
						|
# Builds the URI path of a fid on a device, of the form
#   /dev<devid>/<d>/<ddd>/<ddd>/<zero-padded fid>.fid
# where the directory components are the 1st, 2nd-4th and 5th-7th digits of
# the fid zero-padded to 10 digits.  Returns undef if the device is not in
# the device summary.
sub make_http_path {
    my ($devid, $fid) = @_;

    my $dsum = get_device_summary();
    return undef unless $dsum->{$devid};

    my $nfid = sprintf '%010d', $fid;

    # the last three digits are matched but not needed for the directories
    my ( $top, $mid, $low ) = ( $nfid =~ m{(\d)(\d{3})(\d{3})(\d{3})} );

    return "/dev$devid/$top/$mid/$low/$nfid.fid";
}
 | 
						|
 | 
						|
# Builds the full http:// URL of a fid on a device.  When $use_get_port is
# true the host's http_get_port is used if it has one; otherwise (or as a
# fallback) the host's http_port is used.  Returns undef if the device, its
# path, its host IP or a port cannot be determined.
sub make_full_url {
    my ($devid, $fid, $use_get_port) = @_;

    # get some information we'll need
    my $devs = Mgd::get_device_summary();
    my $dev = $devs->{$devid};
    return undef unless $dev;

    my $path = Mgd::make_http_path($devid, $fid);
    return undef unless $path;

    my $hostid = $dev->{hostid};
    my $host = Mgd::hostid_ip($hostid);
    return undef unless $host;

    # prefer the get port when asked for, else the regular port
    my $port;
    $port = Mgd::hostid_http_get_port($hostid) if $use_get_port;
    $port ||= Mgd::hostid_http_port($hostid);
    return undef unless $port;

    return "http://$host:$port$path";
}
 | 
						|
 | 
						|
# Splits an HTTP URL into its parts.  Returns an arrayref [ host, port, URI ]
# (port is 80 when the URL has none) for a string that looks like
# http://host[:port]/path, and undef for anything else.
sub is_url {
    my ($path) = @_;

    return undef unless $path =~ m!^http://(.+?)(?::(\d+))?(/.+)$!;

    my ($host, $port, $uri) = ($1, $2, $3);
    $port = 80 unless $port;

    return [ $host, $port, $uri ];
}
 | 
						|
 | 
						|
# Builds the storage path of a fid on a device.  In HTTP mode ($USE_HTTP)
# this simply delegates to make_full_url with the same arguments.  Otherwise
# it returns a path relative to $MOG_ROOT of the form
#   <hostname>/dev<devid>/<d>/<ddd>/<ddd>/<zero-padded fid>.fid
# after creating the containing directories; undef is returned when the
# device is unknown or the directories cannot be created.
sub make_path {
    # jump out if we should be using HTTP stuff
    if ($USE_HTTP) {
        return Mgd::make_full_url(@_);
    }

    my ($devid, $fid) = @_;

    my $dsum = get_device_summary();
    my $dinfo = $dsum->{$devid};
    return undef unless $dinfo;

    my $hostname = hostid_name($dinfo->{hostid});

    # directory components come from the leading digits of the padded fid
    my $nfid = sprintf '%010d', $fid;
    my ( $top, $mid, $low ) = ( $nfid =~ m{(\d)(\d{3})(\d{3})(\d{3})} );

    my $path = "$hostname/dev$devid/$top/$mid/$low/$nfid.fid";
    return undef unless make_dirs( "$MOG_ROOT/$path" );

    return $path;
}
 | 
						|
 | 
						|
# Builds the path a file should be fetched from.  Only HTTP mode differs
# from make_path: there the URL is built with the "use get port" flag set.
sub make_get_path {
    if ($USE_HTTP) {
        return Mgd::make_full_url(@_, 1);
    }
    return Mgd::make_path(@_);
}
 | 
						|
 | 
						|
# Creates the directory that would contain $filename (and any missing
# parents) with mode 0775.  The file itself is not touched.  Returns 1 on
# success, 0 if mkpath raised an error.
sub make_dirs
{
    my ($filename) = @_;
    my $dir = File::Basename::dirname($filename);

    # mkpath dies on failure; turn that into a boolean
    my $ok = eval {
        File::Path::mkpath($dir, 0, 0775);
        1;
    };

    return $ok ? 1 : 0;
}
 | 
						|
 | 
						|
# Records that fid $fid is stored on device $devid.  If a new file_on row
# was inserted, the file's devcount is brought up to date via
# update_fid_devcount (passing $no_lock through) and its result returned.
# Returns 1 if the row already existed, and 0 if there is no database handle
# or the INSERT itself failed.
sub add_file_on {
    my ($fid, $devid, $no_lock) = @_;

    my $dbh = get_dbh() or return 0;

    my $rv = $dbh->do("INSERT IGNORE INTO file_on SET fid=?, devid=?",
                      undef, $fid, $devid);

    # do() returns undef when the statement failed; that is not the same as
    # "0 rows inserted" and must not be reported as success
    return 0 unless defined $rv;

    if ($rv > 0) {
        return update_fid_devcount($fid, $no_lock);
    }

    # was already on that device
    return 1;
}
 | 
						|
 | 
						|
# Recomputes file.devcount for a fid as the number of file_on rows for it.
# Unless $no_lock is true the work is wrapped in the MySQL advisory lock
# "mgfs:fid:<fid>" (waiting up to 10 seconds for it).  Returns 0 when there
# is no database handle or the lock could not be taken, 1 otherwise.
sub update_fid_devcount {
    my ($fid, $no_lock) = @_;

    my $dbh = get_dbh() or return 0;

    my $use_lock = !$no_lock;
    my $lockname = "mgfs:fid:$fid";

    if ($use_lock) {
        my $got_lock = $dbh->selectrow_array("SELECT GET_LOCK(?, 10)", undef,
                                             $lockname);
        return 0 unless $got_lock;
    }

    # count the devices holding this fid and store that on the file row
    my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
                                   undef, $fid);
    $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef,
             $ct, $fid);

    if ($use_lock) {
        $dbh->selectrow_array("SELECT RELEASE_LOCK(?)", undef, $lockname);
    }

    return 1;
}
 | 
						|
 | 
						|
# Returns the arguments in random order.  Works on a copy: on each pass one
# of the not-yet-chosen elements (those from position $done onwards) is
# picked at random and moved to the front of the list.
sub randlist
{
    my @items = @_;
    my $count = scalar(@items);

    foreach my $done (0 .. $count - 1) {
        my $pick = $done + int(rand() * ($count - $done));
        my ($chosen) = splice(@items, $pick, 1);
        unshift @items, $chosen;
    }

    return @items;
}
 | 
						|
 | 
						|
 | 
						|
#####################################################################
 | 
						|
### C L I E N T   C L A S S
 | 
						|
### A client is a user connection for sending requests to us.  Requests
 | 
						|
### can either be normal user requests to be sent to a QueryWorker
 | 
						|
### or management requests that start with a !.
 | 
						|
#####################################################################
 | 
						|
package Client;
 | 
						|
 | 
						|
use Danga::Socket ();
 | 
						|
use base qw{Danga::Socket};
 | 
						|
 | 
						|
use fields qw{read_buf};
 | 
						|
 | 
						|
# Constructor.  Called on a class name it allocates the fields-based object
# first; called on an existing object it reuses it.  All remaining arguments
# go to the parent class constructor.
sub new {
    my Client $self = shift;

    unless (ref $self) {
        $self = fields::new($self);
    }
    $self->SUPER::new(@_);

    return $self;
}
 | 
						|
 | 
						|
# Client
 | 
						|
# Read handler.  Reads up to 1024 bytes, closing the connection if the read
# returns undef, appends them to read_buf, and then passes every complete,
# non-empty line in the buffer (terminator stripped) to
# Frontend->HandleClientRequest.
sub event_read {
    my Client $self = shift;

    my $bref = $self->read(1024);
    return $self->close() unless defined $bref;
    $self->{read_buf} .= $$bref;

    while ($self->{read_buf} =~ s/^(.*?)\r?\n//) {
        # copy the capture: arguments are passed by alias, and $1 would
        # change under the callee as soon as it ran a regex of its own
        my $line = $1;
        next unless length $line;
        Frontend->HandleClientRequest($self, $line);
    }
}
 | 
						|
 | 
						|
# Client
 | 
						|
# An error event on the socket simply closes the client.
sub event_err {
    my ($self) = @_;
    $self->close;
}
 | 
						|
# A hangup event on the socket simply closes the client.
sub event_hup {
    my ($self) = @_;
    $self->close;
}
 | 
						|
 | 
						|
# Closes the client: first tells the Frontend that this client is dead, then
# lets the parent class close the socket with whatever arguments were given.
sub close {
    my Client $self = shift;
    my @reason = @_;

    Frontend->NoteDeadClient($self);
    return $self->SUPER::close(@reason);
}
 | 
						|
 | 
						|
 | 
						|
#####################################################################
 | 
						|
### W O R K E R C O N N   C L A S S
 | 
						|
### This class maintains a connection to one of our various classes
 | 
						|
### of workers.
 | 
						|
#####################################################################
 | 
						|
package WorkerConn;
 | 
						|
 | 
						|
use Danga::Socket ();
 | 
						|
use base qw{Danga::Socket};
 | 
						|
 | 
						|
use fields qw{read_buf job pid cmd_buf reqid};
 | 
						|
 | 
						|
# Constructor.  Allocates the fields-based object when called on a class
# name, runs the parent class constructor with the remaining arguments, and
# starts the connection out as "not a worker": pid 0, request id 0, no job
# and an empty command buffer.
sub new {
    my WorkerConn $self = shift;

    unless (ref $self) {
        $self = fields::new($self);
    }
    $self->SUPER::new(@_);

    # mark as not a worker by default
    $self->{job}     = undef;
    $self->{pid}     = 0;
    $self->{reqid}   = 0;
    $self->{cmd_buf} = [];

    return $self;
}
 | 
						|
 | 
						|
# Read handler.  Reads up to 1024 bytes, closing the connection if the read
# returns undef, appends them to read_buf, and dispatches every complete,
# non-empty line in the buffer (terminator stripped).  Lines from a
# queryworker that do not start with "error" go to
# Frontend->HandleQueryWorkerResponse; everything else goes to
# Frontend->HandleChildRequest.
sub event_read {
    my WorkerConn $self = shift;

    my $bref = $self->read(1024);
    return $self->close() unless defined $bref;
    $self->{read_buf} .= $$bref;

    # match with .*? (not .+?) so that an empty line is consumed and skipped;
    # otherwise a blank line at the head of the buffer could never match and
    # everything queued behind it would be stuck
    while ($self->{read_buf} =~ s/^(.*?)\r?\n//) {
        my $line = $1;
        next unless length $line;

        if ($self->job eq 'queryworker' && (substr($line, 0, 5) ne 'error')) {
            Frontend->HandleQueryWorkerResponse($self, $line);
        } else {
            Frontend->HandleChildRequest($self, $line);
        }
    }
}
 | 
						|
 | 
						|
# Accessor for the job name.  With an argument, stores it; either way the
# current job is returned.
sub job {
    my WorkerConn $self = shift;

    $self->{job} = shift if @_;
    return $self->{job};
}
 | 
						|
 | 
						|
# Accessor for the worker's pid.  With an argument, stores it; either way
# the current pid is returned.
sub pid {
    my WorkerConn $self = shift;

    $self->{pid} = shift if @_;
    return $self->{pid};
}
 | 
						|
 | 
						|
# A hangup event on the socket simply closes the worker connection.
sub event_hup {
    my ($self) = @_;
    $self->close;
}
 | 
						|
 | 
						|
# Closes the worker connection: first tells the Frontend that this
# connection is dead, then lets the parent class close the socket with
# whatever arguments were given.
sub close {
    my WorkerConn $self = shift;
    my @reason = @_;

    Frontend->NoteDeadWorkerConn($self);
    return $self->SUPER::close(@reason);
}
 | 
						|
 | 
						|
# Queues a line (second argument) for later delivery to this worker, with
# "\r\n" appended.  Does nothing for queryworkers, which don't use this
# queueing.
sub enqueue_line {
    my WorkerConn $self = $_[0];

    if ($self->job eq 'queryworker') {
        return;
    }

    push @{$self->{cmd_buf}}, "$_[1]\r\n";
}
 | 
						|
 | 
						|
# Writes every queued command to the worker in order, followed by a line
# holding a single "." and then empties the queue.
sub drain_queue {
    my WorkerConn $worker = $_[0];

    my $pending = $worker->{cmd_buf};
    foreach my $msg (@$pending) {
        $worker->write($msg);
    }

    # the lone dot is sent even when nothing was queued
    $worker->write(".\r\n");
    $worker->{cmd_buf} = [];
}
 | 
						|
 | 
						|
 | 
						|
#####################################################################
 | 
						|
### F R O N T E N D  C L A S S
 | 
						|
### This class handles keeping lists of workers and clients and
 | 
						|
### assigning them to each other when things happen.  This is a purely
 | 
						|
### event driven class.
 | 
						|
#####################################################################
 | 
						|
package Frontend;
 | 
						|
 | 
						|
# Mappings: fd => [ clientref, jobstring, starttime ]
 | 
						|
# queues are just lists of Client class objects
 | 
						|
# ChildrenByJob: job => { pid => $client }
 | 
						|
# ErrorsTo: fd => Client
 | 
						|
# RecentQueries: [ string, string, string, ... ]
 | 
						|
# Stats: element => number
 | 
						|
our ($IsChild, @QueryWorkerQueue, @ClientQueue, @RecentQueries,
 | 
						|
     %Mappings, %ChildrenByJob, %ErrorsTo, %Stats);
 | 
						|
$IsChild = 0;
 | 
						|
 | 
						|
# A freshly spawned child carries a copy of all the parent's Frontend data
# but has no use for it.  Call this in the child to flag the Frontend as
# running in a child, drop the worker/client queues, the mappings and the
# error listeners, and install a post-loop callback that returns 0.
# NOTE(review): returning 0 from the callback presumably makes the event
# loop exit -- confirm against Danga::Socket.
sub SetAsChild {
    $IsChild = 1;

    @QueryWorkerQueue = ();
    @ClientQueue = ();
    %Mappings = ();
    %ErrorsTo = ();

    # and now kill off our event loop so that we don't waste time
    Client->SetPostLoopCallback(sub { return 0; });
}
 | 
						|
 | 
						|
# Called when a child (queryworker or any other job) has died.  Removes the
# pid from %ChildrenByJob, stopping at the first job that actually had an
# entry for it.  Respawning is not done here; per the original note it is
# handled by make_new_child elsewhere in Mgd.
sub NoteDeadChild {
    my $pid = $_[1];

    foreach my $job (keys %ChildrenByJob) {
        my $removed = delete $ChildrenByJob{$job}->{$pid};

        # bail out if we actually deleted one
        return if $removed;
    }
}
 | 
						|
 | 
						|
# Called when a client (a user connection, management or not) dies.  Takes
# it off the list of error listeners, which is keyed by file descriptor, if
# it happens to be on it.
sub NoteDeadClient {
    my Client $client = $_[1];

    my $fd = $client->{fd};
    delete $ErrorsTo{$fd};
}
 | 
						|
 | 
						|
# Called when the error function in Mgd is invoked in the parent.  The
# message (passed as a scalar reference) is written, prefixed with ":: " and
# terminated with "\r\n", to every client currently listening for errors.
sub NoteError {
    return unless %ErrorsTo;

    my $msg = ":: ${$_[1]}\r\n";

    foreach my $listener (values %ErrorsTo) {
        $listener->write(\$msg);
    }
}
 | 
						|
 | 
						|
# Takes a new connection known to come from one of our children.  Its job
# type is not known yet, so just start watching it for reads until it tells
# us what it is.
sub RegisterWorkerConn {
    my WorkerConn $conn = $_[1];

    $conn->watch_read(1);
}
 | 
						|
 | 
						|
# Called when a query worker connects to the frontend and is ready for
# commands.  The worker is handed to EnqueueQueryWorker, which queues it and
# tries to process the outstanding queues.
sub RegisterQueryWorker {
    my WorkerConn $conn = $_[1];

    Frontend->EnqueueQueryWorker($conn);
}
 | 
						|
 | 
						|
# Puts a query worker back in the queue of idle workers, first deleting any
# mapping recorded for its fd.  If the queryworker job currently needs fewer
# children, the worker is asked to die instead of being queued.  Otherwise
# the queues are processed right away.
sub EnqueueQueryWorker {
    # first arg is class, second is worker
    my WorkerConn $worker = $_[1];
    delete $Mappings{$worker->{fd}};

    unless (Mgd::job_needs_reduction('queryworker')) {
        # must be okay, so put it in the queue
        push @QueryWorkerQueue, $worker;
        return Frontend->ProcessQueues;
    }

    # we have too many workers, so this one goes away instead
    Mgd::error("Reducing queryworker headcount by 1.");
    Frontend->AskWorkerToDie($worker);
    return;
}
 | 
						|
 | 
						|
# Asks a worker to exit: sends it a "shutdown" line and reports the pending
# death (job name and pid) to Mgd::note_pending_death.
# NOTE(review): the bookkeeping of requested deaths and running-job counts
# presumably happens inside note_pending_death -- it is not visible here.
sub AskWorkerToDie {
    my WorkerConn $conn = $_[1];

    $conn->write("shutdown\r\n");

    my ($job, $pid) = ($conn->job, $conn->pid);
    Mgd::note_pending_death($job, $pid);
}
 | 
						|
 | 
						|
# Asks idle query workers to die, one at a time from the front of the idle
# queue, for as long as the queryworker job still needs reduction and there
# are idle workers left.
sub CullQueryWorkers {
    while (@QueryWorkerQueue) {
        last unless Mgd::job_needs_reduction('queryworker');

        my WorkerConn $idle = shift @QueryWorkerQueue;
        Frontend->AskWorkerToDie($idle);
    }
}
 | 
						|
 | 
						|
# Called when we get a response line from a query worker.  Arguments are
# the class, the WorkerConn and the line.
#
# The response is passed on to the client mapped to that worker and the
# worker is put back in the idle queue.  Special cases:
#   - in a child process, the line is only logged as an error;
#   - with no mapping for the worker, nothing is done;
#   - with a mapping but no client, the line is treated as a child request;
#   - if the client has closed, the worker is simply requeued;
#   - if the response id is not "<pid>-<reqid>" of the outstanding request,
#     the client is closed and the worker is asked to die.
sub HandleQueryWorkerResponse {
    return Mgd::error("Frontend (Child) got worker response: $_[2]") if $IsChild;

    # got a response from a worker
    my WorkerConn $worker = $_[1];
    return unless $worker && $Mappings{$worker->{fd}};

    # mapping is [ client, request string, start seconds, start microseconds ]
    # (ProcessQueues appends the list returned by gettimeofday)
    my $mapping = $Mappings{$worker->{fd}};

    # get the client we're working with (if any)
    my Client $client = $mapping->[0];

    # if we have no client, then we just got a standard message from
    # the queryworker and need to pass it up the line
    return Frontend->HandleChildRequest($worker, $_[2]) if !$client;

    # at this point it was a command response, but if the client has gone
    # away, just reenqueue this query worker
    return Frontend->EnqueueQueryWorker($worker) if $client->{closed};

    # <numeric id> [client-side time to complete] <response>
    my ($time, $id, $res);
    if ($_[2] =~ /^(\d+-\d+)\s+(\d+\.\d+)\s+(.+)$/) {
        # save time and response for use later
        ($id, $time, $res) = ($1, $2, $3);
    } elsif ($_[2] =~ /^(\d+-\d+)\s(.+)$/) {
        # no timing field present
        ($id, $time, $res) = ($1, 'undef', $2);
    }

    # now, if it doesn't match (an unparseable line has no id at all and is
    # handled as a mismatch too)
    my $expected = "$worker->{pid}-$worker->{reqid}";
    unless (defined $id && $id eq $expected) {
        my $got = defined $id ? $id : '';
        Mgd::error("Worker responded with id $got, expected $expected, killing");
        $client->close('worker_mismatch');
        return Frontend->AskWorkerToDie($worker);
    }

    # now time this interval and add to @RecentQueries.  tv_interval needs
    # both the seconds and the microseconds of the start time.
    my $tinterval = Time::HiRes::tv_interval([ @{$mapping}[2, 3] ]);
    push @RecentQueries, sprintf("%s %.4f %s", $mapping->[1], $tinterval, $time);
    shift @RecentQueries if scalar(@RecentQueries) > 50;

    # send text to client, put worker back in queue
    $client->write("$res\r\n");
    Frontend->EnqueueQueryWorker($worker);
}
 | 
						|
 | 
						|
# Called from various spots to pair up waiting clients with idle query
# workers for as long as both queues are non-empty.  Does nothing in a
# child.  For each pair the client entry gets the current time appended
# (the list returned by gettimeofday), is recorded in %Mappings under the
# worker's fd, and the request is written to the worker prefixed with
# "<pid>-<reqid>".
sub ProcessQueues {
    return if $IsChild;

    PAIR:
    while (@QueryWorkerQueue && @ClientQueue) {
        # find the next queued client entry that is still open
        my $clref;
        while (@ClientQueue) {
            my $candidate = shift @ClientQueue;
            next if !defined $candidate || $candidate->[0]->{closed};

            $clref = $candidate;
            last;
        }
        next PAIR unless $clref;

        # get worker and make sure it's not closed already; if it is, the
        # client goes back to the head of its queue
        my WorkerConn $worker = shift @QueryWorkerQueue;
        if (!defined $worker || $worker->{closed}) {
            unshift @ClientQueue, $clref;
            next PAIR;
        }

        # stamp the entry with the start time and remember who serves it
        push @$clref, Time::HiRes::gettimeofday();
        $Mappings{$worker->{fd}} = $clref;

        # bump the request counter so the response can be matched to it
        $worker->{reqid}++;

        $worker->write("$worker->{pid}-$worker->{reqid} $clref->[1]\r\n");
        $worker->watch_read(1);
    }
}
 | 
						|
 | 
						|
# Writes the built-in help text, listing the supported '!' commands, to
# the client passed as the first argument after the invocant.  The text
# ends with a line containing only ".".  A second argument (a help topic)
# is accepted by callers but is not used yet.
sub SendHelp {
    my Client $client = $_[1];

    # general purpose help; there is nothing to interpolate in here
    my $help_text = <<'END_OF_HELP';
Welcome to mogilefsd's built-in help system.  Available commands:

    !recent     Recently executed queries and how long they took.
    !queue      Queries that are pending execution.
    !stats      General stats on what we're up to.
    !watch      Observe errors/messages from children.
    !jobs       Outstanding job counts, desired level, and pids.
    !shutdown   IMMEDIATELY kill all of mogilefsd.  IMMEDIATELY.

    !replication
                See the replication status.  Output format:
                <domain> <class> <devcount> <files>

    !to <job class> <message>
                Send <message> to all workers of <job class>.
                Mostly used for debugging.

    !want <count> <job class>
                Alter the level of workers of this class desired.
                Example: !want 20 queryworker, !want 3 replicate.
                See !jobs for what jobs are available.

More to come...
.
END_OF_HELP

    return $client->write($help_text);
}
 | 
						|
 | 
						|
# Called when a client sends us a line of text.
#
# Arguments (called as a class method): $_[1] is the Client object and
# $_[2] is the request line.
#
#   * A line starting with '?', the word 'help', or an empty line gets the
#     help text.
#   * A line starting with '!' is an administrative command that is
#     answered right here.  The reply is zero or more lines followed by a
#     line holding only ".".
#   * Anything else is counted in $Stats{queries}, pushed onto
#     @ClientQueue as [ client, "cmd <peer ip> <line>" ], and then
#     ProcessQueues is called.
#
# Logs an error and does nothing else when running in a child process.
sub HandleClientRequest {
    return Mgd::error("Frontend (Child) got request from client: $_[2]") if $IsChild;

    # if it's just 'help', '?', or an empty line, send the help text
    if ((substr($_[2], 0, 1) eq '?') || ($_[2] eq 'help') || ($_[2] eq '')) {
        Frontend->SendHelp($_[1]);
        return;
    }

    # quick check to see if we, the parent, should handle this
    if (substr($_[2], 0, 1) eq '!') {
        my Client $client = $_[1];
        my ($cmd, $args) = ($_[2] =~ m/^!(.+?)(?:\s+(.+))?$/);

        # a bare "!" does not match at all, and a command without arguments
        # leaves $args unset; normalise both so nothing below works on undef
        $cmd  = '' unless defined $cmd;
        $args = '' unless defined $args;

        my @out;
        if ($cmd =~ /^stats$/) {
            # print out some stats on the queues
            my $uptime = time - $Mgd::starttime;
            my $ccount = scalar(@ClientQueue);
            my $wcount = scalar(@QueryWorkerQueue);
            my $ipcount = scalar(keys %Mappings);
            push @out, "uptime $uptime",
                       "pending_queries $ccount",
                       "processing_queries $ipcount",
                       "bored_queryworkers $wcount",
                       map { "$_ $Stats{$_}" } sort keys %Stats;

        } elsif ($cmd =~ /^repl/) {
            # for every class wanting more than one copy, report how many
            # files currently sit at each device count below that minimum
            Mgd::validate_dbh();
            my $dbh = Mgd::get_dbh();
            if ($dbh) {
                my $mdcs = Mgd::get_mindevcounts();
                foreach my $dmid (sort keys %$mdcs) {
                    my $dmname = Mgd::domain_name($dmid);
                    foreach my $classid (sort keys %{$mdcs->{$dmid}}) {
                        my $min = $mdcs->{$dmid}->{$classid};
                        next unless $min > 1;

                        my $classname = Mgd::class_name($dmid, $classid) || '_default';
                        foreach my $ct (1..$min-1) {
                            my $count = $dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
                                                              undef, $dmid, $classid, $ct);
                            push @out, "$dmname $classname $ct $count";
                        }
                    }
                }
            } else {
                # other callers in this file treat get_dbh as fallible, so
                # don't call methods on a missing handle here either
                push @out, "ERROR: no database handle available";
            }

        } elsif ($cmd =~ /^shutdown/) {
            print "User requested shutdown: $args\n";
            kill 15, $$; # send signal 15 to ourselves

        } elsif ($cmd =~ /^jobs/) {
            # dump out a list of running jobs and pids
            foreach my $job (sort keys %ChildrenByJob) {
                my $ct = scalar(keys %{$ChildrenByJob{$job}});
                push @out, "$job count $ct";
                push @out, "$job desired $jobs{$job}->[0]";
                push @out, "$job pids " . join(' ', sort { $a <=> $b } keys %{$ChildrenByJob{$job}});
            }

        } elsif ($cmd =~ /^want/) {
            # !want <count> <jobclass>
            # set the new desired staffing level for a class
            if ($args =~ /^(\d+)\s+(\S+)/) {
                my ($count, $job) = ($1, $2);

                # validate count
                $count = 0 if $count < 0;
                # FIXME ...add an upper limit?

                # now make sure it's a real job
                if (defined $jobs{$job}) {
                    $jobs{$job}->[0] = $count;
                    $Mgd::allkidsup = 0;
                    push @out, "Now desiring $count children doing '$job'.";

                    # try to clean out the queryworkers (if that's what we're doing?)
                    Frontend->CullQueryWorkers
                        if $job eq 'queryworker';
                } else {
                    my $classes = join(", ", sort keys %jobs);
                    push @out, "ERROR: Invalid class '$job'.  Valid classes: $classes";
                }
            } else {
                push @out, "ERROR: usage: !want <count> <jobclass>";
            }

        } elsif ($cmd =~ /^to/) {
            # !to <jobclass> <message>
            # sends <message> to all children of <jobclass>
            if ($args =~ /^(\S+)\s+(.+)/) {
                my $ct = Frontend->SendToChildrenByJob($1, $2);
                push @out, "Message sent to $ct children.";

            } else {
                push @out, "ERROR: usage: !to <jobclass> <message>";
            }

        } elsif ($cmd =~ /^queue/ || $cmd =~ /^pend/) {
            # list the command text of every request still waiting
            foreach my $clq (@ClientQueue) {
                push @out, $clq->[1];
            }

        } elsif ($cmd =~ /^watch/) {
            # toggle this client's membership in the watcher list
            if (delete $ErrorsTo{$client->{fd}}) {
                push @out, "Removed you from watcher list.";
            } else {
                $ErrorsTo{$client->{fd}} = $client;
                push @out, "Added you to watcher list.";
            }

        } elsif ($cmd =~ /^recent/) {
            # show the most recent N queries
            push @out, @RecentQueries;

        } else {
            # unknown command.  SendHelp ends its text with its own "."
            # line, so return here instead of falling through and sending
            # a second terminator that would look like an extra, empty
            # response to the client.
            Frontend->SendHelp($client, $args);
            return;
        }
        $client->write(join("\r\n", @out) . "\r\n") if @out;
        $client->write(".\r\n");
        return;
    }

    # just push the input onto the client queue
    $Stats{queries}++;
    push @ClientQueue, [ $_[1], "cmd " . ($_[1]->peer_ip_string || '0.0.0.0') . " $_[2]" ];
    Frontend->ProcessQueues;
}
 | 
						|
 | 
						|
# A child has contacted us with some command/status/something.
#
# Arguments (called as a class method): $_[1] is the child's WorkerConn,
# $_[2] is the line it sent.
#
# The first line from a child is its registration, "<pid> <job>".  It is
# stored on the connection and the connection is filed in %ChildrenByJob;
# query workers are additionally passed to RegisterQueryWorker.  A first
# line that does not have that shape is logged and ignored, so the next
# line is again treated as the registration.
#
# Later lines are commands:
#   error <text>            forwarded to Mgd::error, tagged with job and pid
#   queue                   queued lines are drained to the child
#   del_i_looped, monitor_ping, reaper_ping, repl_ping
#                           the child is asked to die if its job class needs
#                           reduction, otherwise its queue is drained
#   repl_unreachable <fid>  relayed to the other 'replicate' children
#   repl_i_did <fid>        relayed as "repl_was_done <fid>" to the other
#                           'replicate' children, then handled like a ping
# Anything else is logged as an unknown command.
#
# Logs an error and does nothing else when running in a child process.
sub HandleChildRequest {
    return Mgd::error("Frontend (Child) got request from child: $_[2]") if $IsChild;

    # if they have no job set, then their first line is what job they are
    # and not a command.  they also specify their pid, just so we know what
    # connection goes with what pid, in case it's ever useful information.
    my WorkerConn $child = $_[1];
    unless (defined $child->job) {
        my ($pid, $job) = ($_[2] =~ /^(\d+)\s+(.+)/);

        # both captures are mandatory, so checking one is enough.  without
        # this guard an undefined job and pid would be stored on the
        # connection and used as keys in %ChildrenByJob.
        unless (defined $pid) {
            Mgd::error("Malformed registration line [$_[2]] from child");
            return;
        }

        $child->job($job);
        $child->pid($pid);

        # now do any special case startup
        if ($job eq 'queryworker') {
            Frontend->RegisterQueryWorker($child);
        }

        # add to normal list
        $ChildrenByJob{$job}->{$child->pid} = $child;
        return;
    }

    # see if we should downsize this child; if not, hand it its queue
    my $check_job = sub {
        if (Mgd::job_needs_reduction($child->job)) {
            Mgd::error("Reducing headcount of " . $child->job . " job by 1.");
            Frontend->AskWorkerToDie($child);
        } else {
            $child->drain_queue;
        }
    };

    # at this point we've got a command of some sort
    my $cmd = $_[2];
    if ($cmd =~ /^error (.+)$/i) {
        # pass it on to our error handler, prefaced with the child's job
        Mgd::error("[" . $child->job . "(" . $child->pid . ")] $1");

    } elsif ($cmd =~ /^queue/) {
        # send out what we have queued up for it
        $child->drain_queue;

    } elsif ($cmd =~ /^(?:del_i_looped|monitor_ping|reaper_ping|repl_ping)/) {
        # periodic check-ins from the various job classes
        $check_job->();

    } elsif ($cmd =~ /^repl_unreachable (\d+)/) {
        # announce to the other replicators that this fid can't be reached, but note
        # that we don't actually drain the queue to the requestor, as the replicator
        # isn't in a place where it can accept a queue drain right now.
        Frontend->SendToChildrenByJob('replicate', "repl_unreachable $1", $child);

    } elsif ($cmd =~ /^repl_i_did (\d+)/) {
        my $fid = $1;

        # announce to the other replicators that this fid was done and then drain the
        # queue to this person.
        Frontend->SendToChildrenByJob('replicate', "repl_was_done $fid", $child);
        $check_job->();

    } else {
        # unknown command
        Mgd::error("Unknown command [$_[2]] from child; job=" . $child->job);
    }
}
 | 
						|
 | 
						|
# Given a job class and a message, queue the message for the children of
# that job class.
#
# Arguments (called as a class method): ( jobclass, message, [ child ] )
#   jobclass - key into %ChildrenByJob
#   message  - line handed to each child's enqueue_line
#   child    - optional; if given, that child is skipped, so the one that
#              originated a message can be excluded from the broadcast
#
# Returns the number of children the message was actually queued for (the
# excluded child is not counted), or 0 if the job class has no children.
sub SendToChildrenByJob {
    my $childref = $ChildrenByJob{$_[1]};
    return 0 unless defined $childref && %$childref;
    my $msg = $_[2];
    my $skip = $_[3];

    my $sent = 0;
    foreach my $child (values %$childref) {
        # ignore the child specified as the third arg if one is sent
        next if defined $skip && $skip == $child;

        # send the message to this child
        $child->enqueue_line($msg);
        $sent++;
    }
    return $sent;
}
 | 
						|
 | 
						|
# Called when a worker connection has gone away.  If %Mappings still holds
# a request under that worker's fd, the request never got an answer: it is
# removed from %Mappings, put back at the FRONT of @ClientQueue (it has
# already waited its turn once) and ProcessQueues is called to hand it to
# another worker.  Does nothing in a child process or without a worker.
sub NoteDeadWorkerConn {
    my WorkerConn $worker = $_[1];
    return if $IsChild;
    return unless $worker;

    my $fd = $worker->{fd};
    if (my $unfinished = $Mappings{$fd}) {
        delete $Mappings{$fd};
        unshift @ClientQueue, $unfinished;

        # now try to get it processing again
        Frontend->ProcessQueues;
    }
}
 | 
						|
 | 
						|
 | 
						|
#####################################################################
 | 
						|
### W O R K E R   C L A S S
 | 
						|
### Class that handles all of the actions that a worker can take.
 | 
						|
#####################################################################
 | 
						|
package QueryWorker;
 | 
						|
 | 
						|
use fields qw{sock querystarttime reqid};
 | 
						|
 | 
						|
# Constructor.  Takes the socket this worker uses and returns the object.
# When invoked on a class name a new fields-based object is allocated; when
# invoked on an existing object that object is (re)initialised instead.
# The query start time and request id start out undefined.
sub new {
    my ($proto, $sock) = @_;

    my QueryWorker $self = ref $proto ? $proto : fields::new($proto);

    $self->{sock}           = $sock;
    $self->{querystarttime} = undef;
    $self->{reqid}          = undef;

    return $self;
}
 | 
						|
 | 
						|
# Handles one line received by this worker.
#
# Argument: a reference to the line, expected to look like
#     [<pid>-<reqid>] <word> <client ip> <rest>
# The request id (if present) is stored in $self->{reqid}; the client ip
# is stored in the global $client_ip and $force_alt_zone is reset to 0.
#
# If <word> is 'echo', <rest> is sent to the parent; if it is 'shutdown',
# the process exits.  Otherwise the first word of <rest>, lowercased, names
# a command: when a sub called cmd_<name> exists in this package, the
# remaining text is decoded with decode_url_args and the sub is called with
# ($self, $args).  Everything else is answered with err_line
# 'unknown_command'.
sub process_line {
    my QueryWorker $self = shift;
    my $lineref = shift;

    # see what kind of command this is
    return $self->err_line('unknown_command')
        unless $$lineref =~ /^(\d+-\d+)?\s*(\S+)\s+(\S+)\s*(.*)/;
    my ($reqid, $cmd, $ip, $line) = ($1, $2, $3, $4);
    $self->{reqid} = $reqid || undef;

    # set global variables for zone determination
    $client_ip = $ip;
    $force_alt_zone = 0;

    # some basic commands we support
    if ($cmd eq 'echo') {
        Mgd::send_to_parent($line);
        return;
    } elsif ($cmd eq 'shutdown') {
        exit 0;
    }

    # fallback to normal command handling
    if ($line =~ /^(\w+)\s*(.*)/) {
        my ($name, $rawargs) = (lc($1), $2);

        $self->{querystarttime} = Time::HiRes::gettimeofday();

        # look the handler up with can() rather than through a symbolic
        # glob: a glob lookup creates a symbol table entry for every
        # command name a client makes up, can() does not, and it needs no
        # relaxing of 'strict refs'.
        my $cmd_handler = __PACKAGE__->can("cmd_$name");
        if ($cmd_handler) {
            my $args = decode_url_args(\$rawargs);

            # zone is optional; only compare it when it was supplied
            $force_alt_zone = 1
                if defined $args->{zone} && $args->{zone} eq 'alt';
            $cmd_handler->($self, $args);
            return;
        }
    }

    return $self->err_line('unknown_command');
}
 | 
						|
 | 
						|
# Resolves $args->{domain} to a domain id.
#
# Returns the dmid on success.  If no domain was supplied, or
# Mgd::domain_id does not know it, the matching error line ("no_domain" /
# "unreg_domain") is sent and err_line's result is returned instead;
# callers in this file treat that result as false.
sub check_domain {
    my QueryWorker $self = shift;
    my $args = shift;

    my $domain = $args->{domain};
    return $self->err_line("no_domain") unless length($domain);

    # validate domain
    my $dmid = Mgd::domain_id($domain);
    return $self->err_line("unreg_domain") unless $dmid;

    return $dmid;
}
 | 
						|
 | 
						|
# Command: sleep.  Sleeps for $args->{duration} seconds, or 10 seconds when
# no (or a zero) duration is given, and then answers with an ok line.
sub cmd_sleep {
    my QueryWorker $self = shift;
    my $args = shift;

    my $seconds = $args->{duration} || 10;
    sleep($seconds);

    return $self->ok_line;
}
 | 
						|
 | 
						|
# Command: create_open.  Reserves a new fid and picks the device(s) a
# client should upload a new file to.
#
# Arguments used from $args:
#   domain     - required; resolved through check_domain
#   key        - optional; stored with the temp file ("" when absent)
#   class      - optional class name; class id 0 is used when absent
#   multi_dest - if true, up to three devices are picked and all of them
#                are returned; otherwise exactly one
#
# A row is inserted into tempfile recording the chosen devices; nothing is
# written to file_on here (see the comment at the insert below).
#
# Response on success:
#   single destination: fid, devid, path
#   multi destination:  fid, dev_count, devid_N / path_N for N = 1..dev_count
# Error lines: no_domain/unreg_domain (via check_domain), nodb,
# unreg_class, no_devices, db_error.
sub cmd_create_open {
    my QueryWorker $self = shift;
    my $args = shift;

    # validate parameters
    my $dmid = $self->check_domain($args) or return 0;
    my $key = $args->{key} || "";
    my $multi = $args->{multi_dest} ? 1 : 0;

    # get DB handle
    my $dbh = Mgd::get_dbh()
        or return $self->err_line("nodb");

    # figure out what classid this file is for
    my $class = $args->{class};
    my $classid = 0;
    if (length($class)) {
        # TODO: cache this
        $classid = $dbh->selectrow_array("SELECT classid FROM class ".
                                         "WHERE dmid=? AND classname=?",
                                         undef, $dmid, $class)
            or return $self->err_line("unreg_class");
    }

    # pick destination devices, never two on the same host.  we stop early
    # if find_deviceid has nothing more to offer.
    my (@dests, @hosts);
    my $devs = Mgd::get_device_summary();
    while (scalar(@dests) < ($multi ? 3 : 1)) {
        my $devid = Mgd::find_deviceid(
            random => 1,
            weight_by_free => 1,
            not_on_hosts => \@hosts,
        );

        last unless defined $devid;

        push @dests, $devid;
        push @hosts, $devs->{$devid}->{hostid};
    }
    return $self->err_line("no_devices") unless @dests;

    # setup the new mapping.  we store the devices that we picked for
    # this file in here, knowing that they might not be used.  create_close
    # is responsible for actually mapping in file_on.
    $dbh->do("INSERT INTO tempfile SET ".
             " fid=NULL, dmid=?, dkey=?, classid=?, createtime=UNIX_TIMESTAMP(), devids=?",
             undef, $dmid, $key, $classid, join(',', @dests));

    # report database trouble to the requester.  every other failure path
    # in this sub answers with an error line; returning without one here
    # would leave the request without any response.
    return $self->err_line("db_error") if $dbh->err;
    my $fid = $dbh->{mysql_insertid};  # FIXME: mysql-ism
    return $self->err_line("db_error") unless defined $fid && $fid > 0;

    # original single path support
    return $self->ok_line({
        fid => $fid,
        devid => $dests[0],
        path => Mgd::make_path($dests[0], $fid),
    }) unless $multi;

    # multiple path support
    my $ct = 0;
    my $res = {};
    foreach my $devid (@dests) {
        $ct++;
        $res->{"devid_$ct"} = $devid;
        $res->{"path_$ct"} = Mgd::make_path($devid, $fid);
    }
    $res->{fid} = $fid;
    $res->{dev_count} = $ct;
    return $self->ok_line($res);
}
 | 
						|
 | 
						|
# Command: create_close.  Turns a temp file opened by create_open into a
# real file, or discards it.
#
# Arguments used from $args:
#   domain - required; resolved through check_domain
#   fid, devid, path - required; path must equal
#            Mgd::make_path($devid, $fid)
#   key    - the key to store the file under.  Closing WITHOUT a key means
#            the upload is abandoned: the fid is queued in file_to_delete
#            and its tempfile row is removed.
#   size   - optional; if given it must match the size found at path
#
# With a key, the uploaded file is size-checked first.  Only then is any
# existing file under the same key queued for deletion and removed, so a
# rejected upload leaves the previous file for that key untouched.  After
# that the file_on and file rows are written, the tempfile row is removed
# and the device count is refreshed via Mgd::update_fid_devcount.
#
# Error lines: no_domain/unreg_domain (via check_domain), no_fid,
# no_devid, no_path, bogus_args, nodb, no_temp_file, size_mismatch,
# empty_file, db_error.
sub cmd_create_close {
    my QueryWorker $self = shift;
    my $args = shift;

    # validate parameters
    my $dmid = $self->check_domain($args) or return 0;
    my $key = $args->{key};

    my $fid = $args->{fid} or return $self->err_line("no_fid");
    my $devid = $args->{devid} or return $self->err_line("no_devid");
    my $path = $args->{path} or return $self->err_line("no_path");

    # is the provided path what we'd expect for this fid/devid?
    return $self->err_line("bogus_args")
        unless $path eq Mgd::make_path($devid, $fid);

    # get DB handle
    my $dbh = Mgd::get_dbh()
        or return $self->err_line("nodb");

    # find the temp file we're closing and making real
    my $trow = $dbh->selectrow_hashref("SELECT classid, dmid, dkey ".
                                       "FROM tempfile WHERE fid=?",
                                       undef, $fid);
    return $self->err_line("no_temp_file") unless $trow;

    # if a temp file is closed without a provided-key, that means to
    # delete it.
    unless (length($key)) {
        # add to to-delete list
        $dbh->do("REPLACE INTO file_to_delete SET fid=?", undef, $fid);
        $dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fid);
        return $self->ok_line;
    }

    # get size of file and verify that it matches what we were given, if
    # anything.  this happens BEFORE the old file for this key is touched:
    # a bad upload must not cost the client the file it already had.
    my $size = Mgd::get_file_size($path);
    return $self->err_line("size_mismatch", "Expected: $args->{size}; actual: $size; path: $path")
        if $args->{size} && ($args->{size} != $size);

    # TODO: check for EIO?
    return $self->err_line("empty_file") unless $size;

    # see if we have a fid for this key already
    my $old_file = Mgd::key_filerow($dbh, $dmid, $key);
    if ($old_file) {
        # add to to-delete list
        $dbh->do("REPLACE INTO file_to_delete SET fid=?", undef, $old_file->{fid});
        $dbh->do("DELETE FROM file WHERE fid=?", undef, $old_file->{fid});
    }

    # insert file_on row
    $dbh->do("INSERT IGNORE INTO file_on SET fid = ?, devid = ?", undef, $fid, $devid);
    return $self->err_line("db_error") if $dbh->err;

    my $rv = $dbh->do("REPLACE INTO file ".
                      "SET ".
                      "  fid=?, dmid=?, dkey=?, length=?, ".
                      "  classid=?, devcount=0", undef,
                      $fid, $dmid, $key, $size, $trow->{classid});
    return $self->err_line("db_error") unless $rv;

    $dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fid);

    if (Mgd::update_fid_devcount($fid)) {
        return $self->ok_line();
    } else {
        # FIXME: handle this better
        return $self->err_line("db_error");
    }
}
 | 
						|
 | 
						|
# Command: delete.  Removes the file stored under $args->{key} in
# $args->{domain}: its row is deleted from the file table and its fid is
# put into file_to_delete.
#
# Error lines: no_domain/unreg_domain (via check_domain), no_key, nodb,
# unknown_key (no file row for that domain and key).
sub cmd_delete {
    my QueryWorker $self = shift;
    my $args = shift;

    # validate parameters
    my $dmid = $self->check_domain($args) or return 0;
    my $key = $args->{key};
    return $self->err_line("no_key") unless length($key);

    # get DB handle
    my $dbh = Mgd::get_dbh();
    return $self->err_line("nodb") unless $dbh;

    # which fid does this key currently point at?
    my $fid = $dbh->selectrow_array("SELECT fid FROM file WHERE dmid=? AND dkey=?",
                                    undef, $dmid, $key);
    return $self->err_line("unknown_key") unless $fid;

    # drop the file row, then queue the fid for deletion
    foreach my $sql ("DELETE FROM file WHERE fid=?",
                     "REPLACE INTO file_to_delete SET fid=?") {
        $dbh->do($sql, undef, $fid);
    }

    return $self->ok_line();
}
 | 
						|
 | 
						|
# Command: list_keys.  Lists keys in a domain that start with a prefix.
#
# Arguments used from $args:
#   domain - required; resolved through check_domain
#   prefix - required; may not contain '%' or '\'
#   after  - optional; only keys sorting after this one are returned.  If
#            given, it must itself start with prefix.
#   limit  - optional maximum number of keys.  Truncated to an integer;
#            1000 is used when it is missing or not positive.
#
# Response on success: key_count, key_1 .. key_N, and next_after (the
# greatest key returned, usable as 'after' for the next call).
# Error lines: no_domain/unreg_domain (via check_domain), no_key,
# after_mismatch, invalid_chars, nodb, none_match.
sub cmd_list_keys {
    my QueryWorker $self = shift;
    my $args = shift;

    # validate parameters
    my $dmid = $self->check_domain($args) or return 0;
    my ($prefix, $after, $limit) = ($args->{prefix}, $args->{after}, $args->{limit});
    return $self->err_line("no_key") unless $prefix;

    # now validate that after starts with prefix.  this is a literal string
    # comparison: prefix comes from the client, and treating it as a regex
    # would let characters like '.', '(' or '[' change the meaning of the
    # test or make the pattern fail to compile.
    return $self->err_line('after_mismatch')
        if $after && index($after, $prefix) != 0;

    # verify there are no % or \ characters
    return $self->err_line('invalid_chars')
        if $prefix =~ /[%\\]/;

    # escape underscores
    $prefix =~ s/_/\\_/g;

    # now fix the input... prefix always ends with a % so that it works
    # in a LIKE call, and after is either blank or something
    $prefix .= '%';
    $after ||= '';

    # limit goes into the SQL text itself, so reduce it to a positive
    # integer; anything else falls back to the default
    $limit = int($limit || 0);
    $limit = 1000 if $limit <= 0;

    # get DB handle
    my $dbh = Mgd::get_dbh()
        or return $self->err_line("nodb");

    # now select out our keys
    my $keys = $dbh->selectcol_arrayref
        ('SELECT dkey FROM file WHERE dmid = ? AND dkey LIKE ? AND dkey > ? ' .
         "ORDER BY dkey LIMIT $limit", undef, $dmid, $prefix, $after);

    # if we got nothing, say so
    return $self->err_line('none_match') unless $keys && @$keys;

    # construct the output and send
    my $ret = { key_count => 0, next_after => '' };
    foreach my $key (@$keys) {
        $ret->{key_count}++;
        $ret->{next_after} = $key
            if $key gt $ret->{next_after};
        $ret->{"key_$ret->{key_count}"} = $key;
    }
    return $self->ok_line($ret);
}
 | 
						|
 | 
						|
# Command: rename.  Changes the key of a file within a domain from
# $args->{from_key} to $args->{to_key} with a single UPDATE.
#
# Error lines: no_domain/unreg_domain (via check_domain), no_key (either
# key missing), nodb, key_exists (the UPDATE raised a database error),
# unknown_key (the UPDATE changed no row).
sub cmd_rename {
    my QueryWorker $self = shift;
    my $args = shift;

    # validate parameters
    my $dmid = $self->check_domain($args) or return 0;
    my $from = $args->{from_key};
    my $to   = $args->{to_key};
    return $self->err_line("no_key") if !$from || !$to;

    # get DB handle
    my $dbh = Mgd::get_dbh();
    return $self->err_line("nodb") unless $dbh;

    # rename the file
    my $updated = $dbh->do('UPDATE file SET dkey = ? WHERE dmid = ? AND dkey = ?',
                           undef, $to, $dmid, $from);
    if ($dbh->err) {
        return $self->err_line("key_exists");
    }
    if ($updated > 0) {
        return $self->ok_line();
    }

    return $self->err_line("unknown_key");
}
 | 
						|
 | 
						|
# Command: get_hosts.  Reports the hosts in %Mgd::cache_host (after
# Mgd::check_host_cache has been called), or only the one whose id equals
# $args->{hostid} when that argument is given.
#
# Response: hosts (the number reported) plus, for the N-th reported host,
# one hostN_<field> entry per field of its cache row.
# Error lines: nodb.
sub cmd_get_hosts {
    my QueryWorker $self = shift;
    my $args = shift;

    my $dbh = Mgd::get_dbh();
    return $self->err_line("nodb") unless $dbh;

    Mgd::check_host_cache();

    my $wanted = $args->{hostid};
    my $ret = { hosts => 0 };
    foreach my $hostid (keys %Mgd::cache_host) {
        next if defined $wanted && $hostid != $wanted;

        my $row = $Mgd::cache_host{$hostid};
        my $n = ++$ret->{hosts};
        foreach my $field (keys %$row) {
            $ret->{"host${n}_$field"} = $row->{$field};
        }
    }

    return $self->ok_line($ret);
}
 | 
						|
 | 
						|
# Command: get_devices.  Reports the devices returned by
# Mgd::get_device_summary, or only the one whose id equals $args->{devid}
# when that argument is given.
#
# Response: devices (the number reported) plus, for the N-th reported
# device, one devN_<field> entry per field of its summary row.
# Error lines: nodb.
sub cmd_get_devices {
    my QueryWorker $self = shift;
    my $args = shift;

    my $dbh = Mgd::get_dbh();
    return $self->err_line("nodb") unless $dbh;

    my $devs = Mgd::get_device_summary();

    my $wanted = $args->{devid};
    my $ret = { devices => 0 };
    foreach my $devid (keys %$devs) {
        next if defined $wanted && $devid != $wanted;

        my $row = $devs->{$devid};
        my $n = ++$ret->{devices};
        foreach my $field (keys %$row) {
            $ret->{"dev${n}_$field"} = $row->{$field};
        }
    }

    return $self->ok_line($ret);
}
 | 
						|
 | 
						|
# Command: create_domain.  Registers $args->{domain} as a new domain,
# giving it the id one above the current maximum dmid.
#
# Response on success: domain (the name that was created).
# Error lines: nodb, no_domain, domain_exists, failure (the INSERT raised
# a database error).
sub cmd_create_domain {
    my QueryWorker $self = shift;
    my $args = shift;

    my $dbh = Mgd::get_dbh();
    return $self->err_line("nodb") unless $dbh;

    my $domain = $args->{domain};
    return $self->err_line('no_domain') unless length $domain;

    # FIXME: add some sort of authentication/limitation on this?

    # refuse names that already resolve to a domain id
    return $self->err_line('domain_exists') if Mgd::domain_id($domain);

    # the new id is one past the highest existing one
    my $highest = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain');
    my $new_dmid = $highest + 1;
    $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
             undef, $new_dmid, $domain);
    return $self->err_line('failure') if $dbh->err;

    # echo back the name of the domain we created
    return $self->ok_line({ domain => $domain });
}
 | 
						|
 | 
						|
# Command: create_class (and, with $args->{update} set, update_class).
#
# Arguments used from $args:
#   domain      - required; must resolve through Mgd::domain_id
#   class       - required class name
#   mindevcount - required; must be greater than 0 after numification
#   update      - if true, an existing class is replaced instead of a new
#                 one being inserted
#
# Without update, the class must not exist yet and is inserted with the
# class id one above the current maximum for the domain.  With update, the
# class must already exist and its row is replaced under the same id.
#
# Response on success: class, mindevcount, domain.
# Error lines: nodb, no_domain, no_class, invalid_mindevcount,
# class_exists, class_not_found, failure (the statement raised a database
# error).
sub cmd_create_class {
    my QueryWorker $self = shift;
    my $args = shift;

    my $dbh = Mgd::get_dbh()
        or return $self->err_line("nodb");

    my $domain = $args->{domain};
    return $self->err_line('no_domain') unless length $domain;

    my $class = $args->{class};
    return $self->err_line('no_class') unless length $class;

    my $mindevcount = $args->{mindevcount}+0;
    return $self->err_line('invalid_mindevcount') unless $mindevcount > 0;

    # FIXME: add some sort of authentication/limitation on this?

    my $dmid = Mgd::domain_id($domain);
    return $self->err_line('no_domain') unless $dmid;

    my $cid = Mgd::class_id($dmid, $class);
    return $self->err_line('class_exists') if $cid && !$args->{update};

    # an update needs something to update; without this check the REPLACE
    # below would be run with an undefined class id
    return $self->err_line('class_not_found') if $args->{update} && !$cid;

    # update or insert at this point
    if ($args->{update}) {
        # now replace the old class
        $dbh->do("REPLACE INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
                 undef, $dmid, $cid, $class, $mindevcount);
    } else {
        # get the max class id in this domain
        my $maxid = $dbh->selectrow_array
            ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid);

        # now insert the new class
        $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
                 undef, $dmid, $maxid + 1, $class, $mindevcount);
    }
    return $self->err_line('failure') if $dbh->err;

    # return success
    return $self->ok_line({ class => $class, mindevcount => $mindevcount, domain => $domain });
}
 | 
						|
 | 
						|
# Handler for the "update_class" command.  Delegates to cmd_create_class
# with a copy of the request arguments in which 'update' is forced to 1,
# and returns whatever that call returns.
sub cmd_update_class {
    my QueryWorker $self = shift;
    my ($args) = @_;

    # copy rather than modify the caller's hash
    my %update_args = (%$args, update => 1);
    return $self->cmd_create_class(\%update_args);
}
 | 
						|
 | 
						|
# Handler for the "get_domains" command: lists every domain together with
# its classes.
#
# The OK line carries these keys (N and M count from 1):
#   domains                   - number of domains
#   domainN                   - namespace of the Nth domain
#   domainNclasses            - number of classes reported for that domain
#   domainNclassMname         - name of the Mth class
#   domainNclassMmindevcount  - mindevcount of the Mth class
# After the classes read from the database, each domain gets one extra
# class named 'default' whose mindevcount is $default_mindevcount.
#
# Writes ERR 'nodb' and returns 0 when no database handle is available.
sub cmd_get_domains {
    my QueryWorker $self = shift;
    my ($args) = @_;

    my $dbh = Mgd::get_dbh();
    return $self->err_line("nodb") unless $dbh;

    my %reply;
    my $domain_no = 0;

    my $domain_rows = $dbh->selectall_arrayref('SELECT dmid, namespace FROM domain');
    foreach my $domain_row (@$domain_rows) {
        my ($dmid, $namespace) = @$domain_row;

        $domain_no++;
        $reply{"domain$domain_no"} = $namespace;

        # every class key of this domain shares this prefix
        my $prefix = "domain${domain_no}class";
        my $class_no = 0;

        my $class_rows = $dbh->selectall_arrayref
            ('SELECT classname, mindevcount FROM class WHERE dmid = ?', undef, $dmid);
        foreach my $class_row (@$class_rows) {
            my ($classname, $mindevcount) = @$class_row;

            $class_no++;
            $reply{"${prefix}${class_no}name"}        = $classname;
            $reply{"${prefix}${class_no}mindevcount"} = $mindevcount;
        }

        # the default class is not stored in the table; append it by hand
        $class_no++;
        $reply{"${prefix}${class_no}name"}        = 'default';
        $reply{"${prefix}${class_no}mindevcount"} = $default_mindevcount;

        $reply{"domain${domain_no}classes"} = $class_no;
    }
    $reply{"domains"} = $domain_no;

    return $self->ok_line(\%reply);
}
 | 
						|
 | 
						|
# Handler for the "get_paths" command: returns up to two paths from which
# the file stored under a key can be fetched.
#
# Request arguments (hashref):
#   key      - key to look up; required.
#   noverify - when true, skip the size check on the first path.
#   (the same hashref is handed to check_domain, which resolves the domain)
#
# The OK line carries 'paths' (how many were found: 0, 1 or 2) and
# 'path1' / 'path2'.  Writes ERR 'no_key', 'nodb' or 'unknown_key' and
# returns 0 on those failures; also returns 0 when check_domain returns a
# false value.
sub cmd_get_paths {
    my QueryWorker $self = shift;
    my ($args) = @_;

    my $key = $args->{key};
    return $self->err_line("no_key") unless length($key);

    # validate domain
    my $dmid = $self->check_domain($args);
    return 0 unless $dmid;

    # get DB handle
    my $dbh = Mgd::get_dbh();
    return $self->err_line("nodb") unless $dbh;

    my $filerow = Mgd::key_filerow($dbh, $dmid, $key)
        or return $self->err_line("unknown_key");

    my $fid = $filerow->{fid};
    my $dsum = Mgd::get_device_summary();

    # which devices hold a copy of this fid?
    my $devids = $dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
                                          undef, $fid) || [];
    my $devcount = scalar(@$devids);

    # start at a random device and walk the whole list once, wrapping around
    my $start = int(rand() * $devcount);

    my @paths;
    foreach my $step (1..$devcount) {
        my $devid = $devids->[($step + $start) % $devcount];

        my $dev = $dsum->{$devid};
        next unless $dev && $dev->{status} eq "alive";

        my $path = Mgd::make_get_path($devid, $fid);

        # only the first path handed out is size-checked, and only when the
        # caller did not ask us to skip verification
        unless (@paths || $args->{noverify}) {
            next unless Mgd::get_file_size($path) == $filerow->{length};
        }

        push @paths, $path;
        last if @paths == 2;   # one verified, one likely seems enough for now.  time will tell.
    }

    my $ret = { paths => scalar(@paths) };
    foreach my $n (1..@paths) {
        $ret->{"path$n"} = $paths[$n - 1];
    }

    return $self->ok_line($ret);
}
 | 
						|
 | 
						|
# Handler for the "set_state" command: changes the status of one device.
#
# Request arguments (hashref):
#   host   - hostname the device is expected to belong to; required.
#   device - device id; coerced to a number and must be non-zero.
#   state  - new status; must be 'alive', 'down' or 'dead'.
#
# Writes an ERR line and returns 0 for: 'nodb', 'bad_params',
# 'host_mismatch' (the hostname joined to the device differs from 'host',
# which includes the case where the lookup returned no row),
# 'state_too_high' (a dead device may not go straight to alive) and
# 'failure' (DBI error on the update).  On success writes an empty OK line
# and returns 1.
sub cmd_set_state {
    my QueryWorker $self = shift;
    my ($args) = @_;

    my $ret = {};

    # get database handle
    my $dbh = Mgd::get_dbh();
    return $self->err_line('nodb') unless $dbh;

    # figure out what they want to do
    my $host  = $args->{host};
    my $dev   = $args->{device} + 0;
    my $state = $args->{state};
    unless ($host && $dev && ($state =~ /^(?:alive|down|dead)$/)) {
        return $self->err_line('bad_params');
    }

    # now get this device's current state and host
    my $lookup = 'SELECT hostname, device.status FROM host, device ' .
                 'WHERE host.hostid = device.hostid AND device.devid = ?';
    my ($realhost, $curstate) = $dbh->selectrow_array($lookup, undef, $dev);

    # verify host is the same
    if ($realhost ne $host) {
        return $self->err_line('host_mismatch');
    }

    # make sure the destination state isn't too high
    if ($curstate eq 'dead' && $state eq 'alive') {
        return $self->err_line('state_too_high');
    }

    # update the state in the database now
    $dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $dev);
    if ($dbh->err) {
        return $self->err_line('failure');
    }

    # success, state changed
    return $self->ok_line($ret);
}
 | 
						|
 | 
						|
# Handler for the "stats" command: reports file and device statistics.
#
# Request arguments (hashref); each flag enables one section, 'all' enables
# every section:
#   replication - per (domain, class, devcount) file counts, returned as
#                 replicationN{domain,class,devcount,files} plus
#                 replicationcount
#   files       - per (domain, class) file counts, returned as
#                 filesN{domain,class,files} plus filescount
#   devices     - per-device file counts, returned as
#                 devicesN{id,host,status,files} plus devicescount
#   all         - same as setting all of the above
#
# Writes ERR 'nodb' and returns 0 when no database handle is available;
# otherwise writes an OK line with the requested sections and returns 1.
sub cmd_stats {
    my QueryWorker $self = shift;
    my $args = shift;

    # get database handle
    my $ret = {};
    my $dbh = Mgd::get_dbh()
        or return $self->err_line('nodb');

    # get names of all domains and classes for use later.  This is a LEFT
    # JOIN so that a domain with no rows in the class table is still listed;
    # files in such a domain live in the default class and would otherwise
    # be reported without a domain or class name.
    my %classes;
    my $class_rows = $dbh->selectall_arrayref(
        'SELECT domain.dmid, namespace, classid, classname ' .
        'FROM domain LEFT JOIN class ON class.dmid = domain.dmid');
    foreach my $row (@$class_rows) {
        my ($dmid, $namespace, $classid, $classname) = @$row;
        $classes{$dmid}->{name} = $namespace;

        # classid is NULL for a domain that has no class rows
        $classes{$dmid}->{classes}->{$classid} = $classname
            if defined $classid;
    }

    # class id 0 is always reported as 'default'
    $classes{$_}->{classes}->{0} = 'default'
        foreach keys %classes;

    # get host and device information with device status
    my %devices;
    my $device_rows = $dbh->selectall_arrayref('SELECT device.devid, hostname, device.status ' .
                                               'FROM device, host WHERE device.hostid = host.hostid');
    foreach my $row (@$device_rows) {
        $devices{$row->[0]}->{host} = $row->[1];
        $devices{$row->[0]}->{status} = $row->[2];
    }

    # replication stats, if they asked for them (or asked for everything)
    if ($args->{replication} || $args->{all}) {
        my $stats = $dbh->selectall_arrayref('SELECT dmid, classid, devcount, COUNT(devcount) FROM file GROUP BY 1, 2, 3');
        my $count = 0;
        foreach my $stat (@$stats) {
            $count++;
            $ret->{"replication${count}domain"} = $classes{$stat->[0]}->{name};
            $ret->{"replication${count}class"} = $classes{$stat->[0]}->{classes}->{$stat->[1]};
            $ret->{"replication${count}devcount"} = $stat->[2];
            $ret->{"replication${count}files"} = $stat->[3];
        }
        $ret->{"replicationcount"} = $count;
    }

    # file statistics (how many files there are and in what domains/classes)
    if ($args->{files} || $args->{all}) {
        my $stats = $dbh->selectall_arrayref('SELECT dmid, classid, COUNT(classid) FROM file GROUP BY 1, 2');
        my $count = 0;
        foreach my $stat (@$stats) {
            $count++;
            $ret->{"files${count}domain"} = $classes{$stat->[0]}->{name};
            $ret->{"files${count}class"} = $classes{$stat->[0]}->{classes}->{$stat->[1]};
            $ret->{"files${count}files"} = $stat->[2];
        }
        $ret->{"filescount"} = $count;
    }

    # device statistics (how many files are on each device)
    if ($args->{devices} || $args->{all}) {
        my $stats = $dbh->selectall_arrayref('SELECT devid, COUNT(devid) FROM file_on GROUP BY 1');
        my $count = 0;
        foreach my $stat (@$stats) {
            $count++;
            $ret->{"devices${count}id"} = $stat->[0];
            $ret->{"devices${count}host"} = $devices{$stat->[0]}->{host};
            $ret->{"devices${count}status"} = $devices{$stat->[0]}->{status};
            $ret->{"devices${count}files"} = $stat->[1];
        }
        $ret->{"devicescount"} = $count;
    }

    # FIXME: DO! add other stats

    return $self->ok_line($ret);
}
 | 
						|
 | 
						|
# Writes a success response to the client socket and returns 1.
#
# Argument: hashref of response fields.  Keys and values are each passed
# through eurl() and joined as key=value pairs separated by '&'.
#
# The line has the form "<reqid> <elapsed> OK <pairs>\r\n".  The reqid
# part is present only when $self->{reqid} is defined; the elapsed part
# (four decimal places) only when $self->{querystarttime} is set, and that
# field is cleared once it has been reported.
sub ok_line {
    my QueryWorker $self = shift;
    my ($args) = @_;

    my $elapsed = '';
    if (my $started = $self->{querystarttime}) {
        $elapsed = sprintf("%.4f ", Time::HiRes::tv_interval([ $started ]));
        $self->{querystarttime} = undef;
    }

    my $reqid  = $self->{reqid};
    my $prefix = defined $reqid ? "$reqid " : '';

    my @pairs;
    foreach my $name (keys %$args) {
        push @pairs, eurl($name) . "=" . eurl($args->{$name});
    }

    $self->{sock}->write($prefix . $elapsed . "OK " . join('&', @pairs) . "\r\n");
    return 1;
}
 | 
						|
 | 
						|
# Writes an error response to the client socket and returns 0.
#
# first argument: error code.
# second argument: optional error text.  When it is missing (or false) the
# text is looked up by code in the table below; codes that are not in the
# table produce an empty text.
#
# The line has the form "<reqid> <elapsed> ERR <code> <text>\r\n", with the
# text passed through eurl().  The reqid part is present only when
# $self->{reqid} is defined; the elapsed part only when
# $self->{querystarttime} is set, and that field is cleared afterwards.
sub err_line {
    my QueryWorker $self = shift;
    my ($err_code, $err_text) = @_;

    unless ($err_text) {
        my %text_for = (
            'unknown_command' => "Unknown server command",
            'no_domain' => "No domain provided",
            'no_class' => "No class provided",
            'unreg_domain' => "Domain name invalid/not found",
            'class_exists' => "That class already exists in that domain",
            'domain_exists' => "That domain already exists",
            'invalid_mindevcount' => "The mindevcount must be at least 1",
            'bad_params' => "Invalid parameters to command; please see documentation",
            'host_mismatch' => "The device specified doesn't belong to the host specified",
            'state_too_high' => "Status cannot go from dead to alive; must use down",
            'failure' => "Operation failed",
            'key_exists' => "Target key name already exists; can't overwrite.",
            'none_match' => "No keys match that pattern and after-value (if any).",
            'after_mismatch' => "Pattern does not match the after-value?",
            'invalid_chars' => "Patterns must not contain backslashes (\\) or percent signs (%).",
        );
        $err_text = $text_for{$err_code};
    }

    my $elapsed = '';
    if (my $started = $self->{querystarttime}) {
        $elapsed = sprintf("%.4f ", Time::HiRes::tv_interval([ $started ]));
        $self->{querystarttime} = undef;
    }

    my $reqid  = $self->{reqid};
    my $prefix = defined $reqid ? "$reqid " : '';

    $self->{sock}->write($prefix . $elapsed . "ERR $err_code " . eurl($err_text) . "\r\n");
    return 0;
}
 | 
						|
 | 
						|
# URL-encodes a string for use in a response line and returns the result.
#
# Letters, digits and the characters _ , - . / \ : are left alone, spaces
# become '+', and every other character is replaced by '%' followed by its
# code point as at least two upper-case hex digits.  The argument itself
# is not modified.
sub eurl
{
    my ($text) = @_;
    $text =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/sprintf("%%%02X", ord($1))/eg;
    $text =~ tr/ /+/;
    return $text;
}
 | 
						|
 | 
						|
# Decodes a URL-encoded argument string ("a=1&b=two+words&c=%41") into a
# hashref of name => value.
#
# Argument: the encoded string, or a reference to it.
#
# Names and values both have '+' turned into a space and %XX sequences
# turned into the corresponding byte.  A pair is split at its first '='
# only, so a value may itself contain '='; a pair without '=' yields an
# empty value.  Empty pairs (as in "a=1&&b=2") are ignored.  When a name
# occurs more than once its values are joined with a NUL ("\0") byte in
# order of appearance, including when an earlier value was "0" or empty.
sub decode_url_args
{
    my $input = shift;
    my $buffer = ref $input ? $input : \$input;
    my $ret = {};

    foreach my $pair (split(/&/, $$buffer))
    {
        # nothing to record for an empty pair
        next unless length $pair;

        # limit of 2: only the first '=' separates name from value
        my ($name, $value) = split(/=/, $pair, 2);
        $name  = '' unless defined $name;
        $value = '' unless defined $value;

        foreach my $part ($name, $value) {
            # $part aliases $name/$value, so this decodes them in place
            $part =~ tr/+/ /;
            $part =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
        }

        # test for presence, not truth: a first value of "0" or "" must
        # still be separated from the values that follow it
        if (exists $ret->{$name}) {
            $ret->{$name} .= "\0" . $value;
        } else {
            $ret->{$name} = $value;
        }
    }
    return $ret;
}
 | 
						|
 | 
						|
 | 
						|
# Local Variables:
 | 
						|
# mode: perl
 | 
						|
# c-basic-indent: 4
 | 
						|
# indent-tabs-mode: nil
 | 
						|
# End:
 |