3038 lines
99 KiB
Perl
Executable File
3038 lines
99 KiB
Perl
Executable File
#!/usr/bin/perl
|
|
#
|
|
# MogileFS daemon - HEAVILY UNDER CONSTRUCTION
|
|
#
|
|
# Copyright 2004, Danga Interactive
|
|
#
|
|
# Authors:
|
|
# Brad Fitzpatrick <brad@danga.com>
|
|
# Brad Whitaker <whitaker@danga.com>
|
|
# Mark Smith <junior@danga.com>
|
|
#
|
|
# License:
|
|
# undecided.
|
|
#
|
|
|
|
package Mgd;
|
|
|
|
# don't run as root
|
|
die "mogilefsd cannot be run as root\n"
|
|
if $< == 0;
|
|
|
|
use strict;
|
|
use Getopt::Long;
|
|
use IO::Socket;
|
|
use Symbol;
|
|
use POSIX ":sys_wait_h"; # argument for waitpid
|
|
use POSIX;
|
|
use DBI;
|
|
use DBD::mysql;
|
|
use File::Copy ();
|
|
use Carp;
|
|
use File::Basename ();
|
|
use File::Path ();
|
|
use Sys::Syslog;
|
|
use Socket qw(MSG_NOSIGNAL);
|
|
use Time::HiRes qw(gettimeofday tv_interval);
|
|
use Net::Netmask;
|
|
use LWP::UserAgent;
|
|
|
|
|
|
#####################################################################
|
|
### C O N F I G
|
|
#####################################################################
|
|
|
|
use vars qw($dbh $DEFAULT_CONFIG $DEFAULT_MOG_ROOT $MOG_ROOT $MOGSTORED_STREAM_PORT $DEBUG $USE_HTTP $FLAG_NOSIGNAL);
|
|
|
|
$DEFAULT_CONFIG = "/etc/mogilefs/mogilefsd.conf";
|
|
$DEFAULT_MOG_ROOT = "/mnt/mogilefs";
|
|
$MOGSTORED_STREAM_PORT = 7501;
|
|
$DEBUG = 0;
|
|
|
|
# used in send() calls to request not to get SIGPIPEd
|
|
eval { $FLAG_NOSIGNAL = MSG_NOSIGNAL };
|
|
|
|
my (
|
|
%cmdline,
|
|
%cfgfile,
|
|
$config,
|
|
$skipconfig,
|
|
$daemonize,
|
|
$db_dsn,
|
|
$db_user,
|
|
$db_pass,
|
|
$conf_port,
|
|
$query_jobs,
|
|
$delete_jobs,
|
|
$replicate_jobs,
|
|
$reaper_jobs,
|
|
$monitor_jobs,
|
|
$mog_root,
|
|
$default_mindevcount,
|
|
$worker_port,
|
|
$upgrade,
|
|
$min_free_space,
|
|
$max_disk_age,
|
|
$node_timeout, # time in seconds to wait for storage node responses
|
|
);
|
|
|
|
# Command-line options will override
|
|
Getopt::Long::Configure( "bundling" );
|
|
Getopt::Long::GetOptions(
|
|
'c|config=s' => \$config,
|
|
's|skipconfig' => \$skipconfig,
|
|
'd|debug+' => \$cmdline{debug},
|
|
'D|daemon' => \$cmdline{daemonize},
|
|
'dsn=s' => \$cmdline{db_dsn},
|
|
'dbuser=s' => \$cmdline{db_user},
|
|
'dbpass=s' => \$cmdline{db_pass},
|
|
'r|mogroot=s' => \$cmdline{mog_root},
|
|
'p|confport=i' => \$cmdline{conf_port},
|
|
'w|workers=i' => \$cmdline{query_jobs},
|
|
'no_http' => \$cmdline{no_http},
|
|
'workerport=i' => \$cmdline{worker_port},
|
|
'upgrade' => \$upgrade,
|
|
'maxdiskage=i' => \$cmdline{max_disk_age},
|
|
'minfreespace=i' => \$cmdline{min_free_space},
|
|
'default_mindevcount=i' => \$cmdline{default_mindevcount},
|
|
'node_timeout=i' => \$cmdline{node_timeout},
|
|
);
|
|
|
|
$config = $DEFAULT_CONFIG if !$config && -r $DEFAULT_CONFIG;
|
|
|
|
# Read the config file if one was specified
|
|
if ( $config && !$skipconfig ) {
|
|
open my $cf, "<$config" or die "open: $config: $!";
|
|
|
|
my $configLine = qr{
|
|
^\s* # Leading space
|
|
(\w+) # Key
|
|
\s+ =? \s* # space + optional equal + optional space
|
|
(.+?) # Value
|
|
\s*$ # Trailing space
|
|
}x;
|
|
|
|
my $linecount = 0;
|
|
while (defined( my $line = <$cf> )) {
|
|
$linecount++;
|
|
next if $line =~ m{^\s*(#.*)?$};
|
|
die "Malformed config file (line $linecount)" unless $line =~ $configLine;
|
|
|
|
my ( $key, $value ) = ( $1, $2 );
|
|
print STDERR "Setting '$key' to '$value'\n" if $cmdline{debug};
|
|
$cfgfile{ $key } = $value;
|
|
}
|
|
|
|
close $cf;
|
|
}
|
|
|
|
### FUNCTION: choose_value( $name, $default[, $boolean] )
|
|
sub choose_value ($$;$) {
|
|
my ( $name, $default, $boolean ) = @_;
|
|
|
|
return $cmdline{$name} if defined $cmdline{$name};
|
|
return $cfgfile{$name} if defined $cfgfile{$name};
|
|
return $default;
|
|
}
|
|
|
|
|
|
# Fill in defaults for those values which were either loaded from config or
|
|
# specified on the command line. Command line takes precendence, then values in
|
|
# the config file, then the defaults.
|
|
$daemonize = choose_value( 'daemonize', 0, 1 );
|
|
$db_dsn = choose_value( 'db_dsn', "DBI:mysql:mogilefs" );
|
|
$db_user = choose_value( 'db_user', "mogile" );
|
|
$db_pass = choose_value( 'db_pass', "", 1 );
|
|
$conf_port = choose_value( 'conf_port', 7001 );
|
|
$MOG_ROOT = choose_value( 'mog_root', $DEFAULT_MOG_ROOT );
|
|
$query_jobs = choose_value( 'listener_jobs', undef) || # undef if not present, then we
|
|
choose_value( 'query_jobs', 20 ); # fall back to query_jobs, new name
|
|
$delete_jobs = choose_value( 'delete_jobs', 1 );
|
|
$replicate_jobs = choose_value( 'replicate_jobs', 1 );
|
|
$reaper_jobs = choose_value( 'reaper_jobs', 1 );
|
|
$monitor_jobs = choose_value( 'monitor_jobs', 1 );
|
|
$worker_port = choose_value( 'worker_port', 7200 );
|
|
$min_free_space = choose_value( 'min_free_space', 100 );
|
|
$max_disk_age = choose_value( 'max_disk_age', 5 );
|
|
$DEBUG = choose_value( 'debug', 0, 1 );
|
|
$USE_HTTP = ! choose_value( 'no_http', 0, 1);
|
|
$default_mindevcount = choose_value( 'default_mindevcount', 2 );
|
|
$node_timeout = choose_value( 'node_timeout', 3 );
|
|
|
|
### initial setup
|
|
Mgd::validate_dbh();
|
|
my $dbh = Mgd::get_dbh();
|
|
unless ($dbh) {
|
|
die <<NODB;
|
|
Error: unable to establish connection with your MogileFS database.
|
|
|
|
Please verify that you have correctly setup a configuration file or are
|
|
providing the correct information in order to reach the database and try
|
|
running the MogileFS server again.
|
|
NODB
|
|
}
|
|
|
|
### make sure they have a database setup?
|
|
my $host = $dbh->selectrow_hashref('SELECT * FROM host LIMIT 1');
|
|
if ($dbh->err) {
|
|
my $err = $dbh->errstr;
|
|
my $db = ($db_dsn =~ /^DBI:mysql:(.+)$/) ? $1 : '<db_name>';
|
|
die <<ERR;
|
|
Database error: $err
|
|
|
|
This is commonly caused by not having the tables created in the database.
|
|
The easiest way to setup your database is to do something like this:
|
|
|
|
cat devnotes/sql.txt | mysql -u$db_user -p $db
|
|
|
|
NOTE: This will OVERWRITE any MogileFS related tablase in the database.
|
|
Please make SURE that this is what you want to do before you do this!
|
|
ERR
|
|
}
|
|
|
|
### but if it was undef, no hosts?
|
|
if ($host) {
|
|
# see if they have the get port, else update it
|
|
unless (exists $host->{http_get_port}) {
|
|
if ($upgrade) {
|
|
print STDERR "Updating host table...\n";
|
|
$dbh->do("ALTER TABLE host ADD COLUMN http_get_port MEDIUMINT UNSIGNED AFTER http_port");
|
|
die "Error: " . $dbh->errstr . "\n" if $dbh->err;
|
|
} else {
|
|
die <<NEEDUPDATE;
|
|
Error: host table out of date.
|
|
|
|
MogileFS needs to update your database schema. Please rerun the MogileFS
|
|
server with the --upgrade option if you wish for us to do this.
|
|
|
|
This will NOT destroy any existing data.
|
|
NEEDUPDATE
|
|
}
|
|
}
|
|
|
|
# now update to add new columns?
|
|
unless (exists $host->{altip}) {
|
|
if ($upgrade) {
|
|
print STDERR "Updating host table...\n";
|
|
$dbh->do("ALTER TABLE host ADD COLUMN altip VARCHAR(15) AFTER hostip");
|
|
die "Error (1): " . $dbh->errstr . "\n" if $dbh->err;
|
|
$dbh->do("ALTER TABLE host ADD COLUMN altmask VARCHAR(18) AFTER altip");
|
|
die "Error (2): " . $dbh->errstr . "\n" if $dbh->err;
|
|
$dbh->do("ALTER TABLE host ADD UNIQUE altip (altip)");
|
|
die "Error (3): " . $dbh->errstr . "\n" if $dbh->err;
|
|
} else {
|
|
die <<NEEDUPDATE;
|
|
Error: host table out of date.
|
|
|
|
MogileFS needs to update your database schema. Please rerun the MogileFS
|
|
server with the --upgrade option if you wish for us to do this.
|
|
|
|
This will NOT destroy any existing data.
|
|
NEEDUPDATE
|
|
}
|
|
}
|
|
} else {
|
|
die <<NOHOSTS;
|
|
Error: no hosts found.
|
|
|
|
It seems like you don't have any hosts defined in your MogileFS setup.
|
|
This means that we really can't do anything, so we're going to shut down.
|
|
NOHOSTS
|
|
}
|
|
|
|
### and now check for devices
|
|
my $device = $dbh->selectrow_hashref('SELECT * FROM device LIMIT 1');
|
|
if ($device) {
|
|
unless (exists $device->{mb_asof}) {
|
|
if ($upgrade) {
|
|
print STDERR "Updating device table...\n";
|
|
$dbh->do("ALTER TABLE device ADD COLUMN mb_asof INT(10) UNSIGNED AFTER mb_used");
|
|
die "Error: " . $dbh->errstr . "\n" if $dbh->err;
|
|
} else {
|
|
die <<NEEDUPDATE;
|
|
Error: device table out of date.
|
|
|
|
MogileFS needs to update your database schema. Please rerun the MogileFS
|
|
server with the --upgrade option if you wish for us to do this.
|
|
|
|
This will NOT destroy any existing data.
|
|
NEEDUPDATE
|
|
}
|
|
}
|
|
} else {
|
|
die <<NODEVICES;
|
|
Error: no devices found.
|
|
|
|
It seems like you don't have any devices defined in your MogileFS setup.
|
|
This means that we really can't do anything, so we're going to shut down.
|
|
NODEVICES
|
|
}
|
|
|
|
# see if they have the new fid table
|
|
my $fid = $dbh->selectrow_array('SELECT fid FROM unreachable_fids LIMIT 1');
|
|
if ($dbh->err) {
|
|
if ($upgrade) {
|
|
print STDERR "Creating unreachable_fids table...\n";
|
|
$dbh->do("CREATE TABLE unreachable_fids (" .
|
|
" fid INT UNSIGNED NOT NULL," .
|
|
" lastupdate INT UNSIGNED NOT NULL," .
|
|
" PRIMARY KEY (fid)," .
|
|
" INDEX (lastupdate)" .
|
|
")");
|
|
die "Error: " . $dbh->errstr . "\n" if $dbh->err;
|
|
} else {
|
|
die <<NEEDUPDATE;
|
|
Error: database schema out of date.
|
|
|
|
MogileFS needs to update your database schema. Please rerun the MogileFS
|
|
server with the --upgrade option if you wish for us to do this.
|
|
|
|
This will NOT destroy any existing data.
|
|
NEEDUPDATE
|
|
|
|
}
|
|
}
|
|
|
|
# now check for an updated tempfile table
|
|
my $devids = $dbh->selectrow_array('SELECT devids FROM tempfile LIMIT 1');
|
|
if ($dbh->err) {
|
|
if ($upgrade) {
|
|
print STDERR "Updating tempfile table...\n";
|
|
$dbh->do("ALTER TABLE tempfile ADD COLUMN devids VARCHAR(60)");
|
|
die "Error: " . $dbh->errstr . "\n" if $dbh->err;
|
|
} else {
|
|
die <<NEEDUPDATE;
|
|
Error: database schema out of date.
|
|
|
|
MogileFS needs to update your database schema. Please rerun the MogileFS
|
|
server with the --upgrade option if you wish for us to do this.
|
|
|
|
This will NOT destroy any existing data.
|
|
NEEDUPDATE
|
|
}
|
|
}
|
|
|
|
# we're done with this, so undef it before we start forking, as then
|
|
# maybe we'll end up with children having the same socket and everybody
|
|
# writing to it at the same time...
|
|
undef $dbh;
|
|
|
|
#####################################################################
|
|
### D A E M O N F U N C T I O N S
|
|
#####################################################################
|
|
|
|
daemonize() if $daemonize;
|
|
|
|
# keep track of what all child pids are doing, and what jobs are being
|
|
# satisifed.
|
|
my %child = (); # pid -> job
|
|
my %todie = (); # pid -> 1 (lists pids that we've asked to die)
|
|
my %jobs = (); # jobname -> [ min, current ]
|
|
my $psock; # IO::Socket::INET connection to parent (undef if parent)
|
|
my %streamcache; # host -> IO::Socket::INET to mogstored
|
|
my $lastspawntime = 0; # time we last ran spawn_children sub
|
|
our $allkidsup = 0; # if true, all our kids are running. set to 0 when a kid dies.
|
|
our $starttime = time; # time we got going
|
|
our %domaincache; # { domainname => { domainrow } }
|
|
our $domaincachetime = 0;
|
|
our $client_ip = undef; # client ip address
|
|
our $force_alt_zone = 0; # if on, force to use alternate zone (if it's defined)
|
|
|
|
# [ what we want to be at, what we are at ]
|
|
$jobs{'queryworker'} = [ $query_jobs, 0 ];
|
|
$jobs{'delete'} = [ $delete_jobs, 0 ];
|
|
$jobs{'replicate'} = [ $replicate_jobs, 0 ];
|
|
$jobs{'reaper'} = [ $reaper_jobs, 0 ];
|
|
$jobs{'monitor'} = [ $monitor_jobs, 0 ];
|
|
|
|
# open up our log
|
|
openlog('mogilefsd', 'pid', 'daemon');
|
|
Mgd::log('info', 'beginning run');
|
|
|
|
sub validate_dbh {
|
|
return unless $dbh;
|
|
my $id = $dbh->selectrow_array("SELECT CONNECTION_ID()");
|
|
if (! $id) {
|
|
# handle's dead. don't use it. (MySQL-ism above)
|
|
undef $dbh;
|
|
}
|
|
}
|
|
|
|
sub get_dbh {
|
|
return $dbh ||= DBI->connect($db_dsn, $db_user, $db_pass);
|
|
}
|
|
|
|
# Install signal handlers.
|
|
$SIG{TERM} = sub {
|
|
print STDERR scalar keys %child, " children to kill.\n" if $DEBUG;
|
|
my $count = kill( 'TERM' => keys %child );
|
|
print STDERR "Sent SIGTERM to $count children.\n" if $DEBUG;
|
|
exit 0;
|
|
};
|
|
$SIG{INT} = sub {
|
|
print STDERR scalar keys %child, " children to kill.\n" if $DEBUG;
|
|
my $count = kill( 'INT' => keys %child );
|
|
print STDERR "Sent SIGINT to $count children.\n" if $DEBUG;
|
|
exit 0;
|
|
};
|
|
|
|
#############################################################################
|
|
## beginning of main execution path
|
|
#############################################################################
|
|
|
|
# setup server socket to listen for client connections
|
|
my $server = IO::Socket::INET->new(LocalPort => $conf_port,
|
|
Type => SOCK_STREAM,
|
|
Proto => 'tcp',
|
|
Blocking => 0,
|
|
Reuse => 1,
|
|
Listen => 10 )
|
|
or die "Error creating socket: $@\n";
|
|
|
|
# accept handler for new clients
|
|
my $accept_handler = sub {
|
|
my $csock = $server->accept();
|
|
return unless $csock;
|
|
|
|
printf( "Listen child making a Client for %d.\n", fileno($csock) )
|
|
if $DEBUG >= 2;
|
|
my $client = Client->new($csock);
|
|
printf( "Client is %s\n", $client ) if $DEBUG >= 2;
|
|
$client->watch_read(1);
|
|
};
|
|
|
|
# now setup socket for workers to connect to
|
|
my $wserver = IO::Socket::INET->new(LocalPort => $worker_port,
|
|
LocalAddr => '127.0.0.1',
|
|
Type => SOCK_STREAM,
|
|
Proto => 'tcp',
|
|
Blocking => 0,
|
|
Reuse => 1,
|
|
Listen => 10 )
|
|
or die "Error creating socket: $@\n";
|
|
|
|
# accept handler for new workers
|
|
my $waccept_handler = sub {
|
|
my $csock = $wserver->accept();
|
|
return unless $csock;
|
|
|
|
printf( "Listen child making a Client for %d.\n", fileno($csock) )
|
|
if $DEBUG >= 2;
|
|
my $client = WorkerConn->new($csock);
|
|
printf( "Child is %s\n", $client ) if $DEBUG >= 2;
|
|
Frontend->RegisterWorkerConn($client);
|
|
};
|
|
|
|
# thing to keep jobs alive
|
|
my $spawn_children = sub {
|
|
# run only once per second
|
|
return 1 unless time > $lastspawntime;
|
|
$lastspawntime = time();
|
|
|
|
# see if anybody has died, but don't hang up on doing so
|
|
my $pid = waitpid -1, WNOHANG;
|
|
return 1 if $pid <= 0 && $allkidsup;
|
|
$allkidsup = 0; # know something died
|
|
|
|
# when a child dies, figure out what it was doing
|
|
# and note that job has one less worker
|
|
my $job;
|
|
if ($pid > -1 && ($job = delete $child{$pid})) {
|
|
my $extra = $todie{$pid} ? "expected" : "UNEXPECTED";
|
|
error("Child $pid ($job) died: $? ($extra)");
|
|
Frontend->NoteDeadChild($pid);
|
|
|
|
if (my $jobstat = $jobs{$job}) {
|
|
# if the pid is in %todie, then we have asked it to shut down
|
|
# and have already decremented the jobstat counter and don't
|
|
# want to do it again
|
|
unless (my $true = delete $todie{$pid}) {
|
|
# decrement the count of currently running jobs
|
|
$jobstat->[1]--;
|
|
}
|
|
}
|
|
}
|
|
|
|
# foreach job, fork enough children
|
|
while (my ($job, $jobstat) = each %jobs) {
|
|
my $need = $jobstat->[0] - $jobstat->[1];
|
|
if ($need > 0) {
|
|
error("Job $job has only $jobstat->[1], wants $jobstat->[0], making $need.");
|
|
for (1..$need) {
|
|
my $cpid = make_new_child($job);
|
|
return 1 unless $cpid;
|
|
$child{$cpid} = $job;
|
|
|
|
# now increase the count of processes currently doing this job
|
|
$jobstat->[1]++;
|
|
}
|
|
}
|
|
}
|
|
|
|
# if we got this far, all jobs have been re-created. note that
|
|
# so we avoid more CPU usage in this post-event-loop callback later
|
|
$allkidsup = 1;
|
|
|
|
# true value keeps us running:
|
|
return 1;
|
|
};
|
|
|
|
# setup Danga::Socket to start handling connections
|
|
Client->DebugLevel( 3 );
|
|
Client->OtherFds( fileno($server) => $accept_handler,
|
|
fileno($wserver) => $waccept_handler, );
|
|
|
|
# setup the post event loop callback to spawn jobs, and the timeout
|
|
Client->SetLoopTimeout( 250 ); # 250 milliseconds
|
|
Client->SetPostLoopCallback($spawn_children);
|
|
|
|
# and now, actually start listening for events
|
|
eval {
|
|
print( "Starting event loop for frontend job on pid $$.\n" ) if $DEBUG;
|
|
Client->EventLoop();
|
|
};
|
|
|
|
if ( $@ ) { Mgd::log('err', "crash log: $@"); }
|
|
Mgd::log('info', 'ending run');
|
|
closelog();
|
|
|
|
#############################################################################
|
|
## end of main
|
|
#############################################################################
|
|
|
|
sub daemonize {
|
|
my($pid, $sess_id, $i);
|
|
|
|
## Fork and exit parent
|
|
if ($pid = fork) { exit 0; }
|
|
|
|
## Detach ourselves from the terminal
|
|
croak "Cannot detach from controlling terminal"
|
|
unless $sess_id = POSIX::setsid();
|
|
|
|
## Prevent possibility of acquiring a controling terminal
|
|
$SIG{'HUP'} = 'IGNORE';
|
|
if ($pid = fork) { exit 0; }
|
|
|
|
## Change working directory
|
|
chdir "/";
|
|
|
|
## Clear file creation mask
|
|
umask 0;
|
|
|
|
print STDERR "Daemon running as pid $$.\n" if $DEBUG;
|
|
|
|
## Close open file descriptors
|
|
close(STDIN);
|
|
close(STDOUT);
|
|
close(STDERR);
|
|
|
|
## Reopen stderr, stdout, stdin to /dev/null
|
|
if ( $DEBUG ) {
|
|
open(STDIN, "+>/tmp/mogilefsd.log");
|
|
} else {
|
|
open(STDIN, "+>/dev/null");
|
|
}
|
|
open(STDOUT, "+>&STDIN");
|
|
open(STDERR, "+>&STDIN");
|
|
}
|
|
|
|
sub make_new_child {
|
|
my $job = shift;
|
|
|
|
my $pid;
|
|
my $sigset;
|
|
|
|
# block signal for fork
|
|
$sigset = POSIX::SigSet->new(SIGINT);
|
|
sigprocmask(SIG_BLOCK, $sigset)
|
|
or return error("Can't block SIGINT for fork: $!");
|
|
|
|
return error("fork failed creating $job: $!")
|
|
unless defined ($pid = fork);
|
|
|
|
if ($pid) {
|
|
sigprocmask(SIG_UNBLOCK, $sigset)
|
|
or return error("Can't unblock SIGINT for fork: $!");
|
|
return $pid;
|
|
}
|
|
|
|
# as a child, we want to close these and ignore them
|
|
close($server);
|
|
close($wserver);
|
|
|
|
$SIG{INT} = 'DEFAULT';
|
|
$SIG{TERM} = 'DEFAULT';
|
|
$0 .= " [$job]";
|
|
|
|
# unblock signals
|
|
sigprocmask(SIG_UNBLOCK, $sigset)
|
|
or return error("Can't unblock SIGINT for fork: $!");
|
|
|
|
# set our frontend into child mode
|
|
Frontend->SetAsChild;
|
|
|
|
# try to create a connection to the parent. we die here because
|
|
# we're the child and if we can't talk to the master we really need
|
|
# to die so that a child isn't just sitting around without communication
|
|
# to the parent.
|
|
$psock = IO::Socket::INET->new(PeerAddr => "127.0.0.1",
|
|
PeerPort => $worker_port,
|
|
Type => SOCK_STREAM,
|
|
Proto => 'tcp',)
|
|
or die "Error creating socket to master: $@\n";
|
|
$psock->write("$$ $job\n");
|
|
|
|
# now call our job function
|
|
no strict 'refs';
|
|
my $job_handler = *{"job_$job"}{CODE};
|
|
$job_handler->();
|
|
exit;
|
|
}
|
|
|
|
# given (job, pid), record that this worker is about to die
|
|
sub note_pending_death {
|
|
my ($job, $pid) = @_;
|
|
|
|
die "$job not defined in call to note_pending_death.\n"
|
|
unless defined $jobs{$job};
|
|
|
|
$todie{$pid} = 1;
|
|
$jobs{$job}->[1]--;
|
|
}
|
|
|
|
# log stuff to syslog or the screen
|
|
sub log {
|
|
# simple logging functionality
|
|
if (! $daemonize) {
|
|
# syslog acts like printf so we have to use printf and append a \n
|
|
shift; # ignore the first parameter (info, warn, critical, etc)
|
|
printf(shift(@_) . "\n", @_);
|
|
} else {
|
|
# just pass the parameters to syslog
|
|
syslog(@_);
|
|
}
|
|
}
|
|
|
|
# argument: a string to send to the parent process.
|
|
sub send_to_parent {
|
|
# send a string to our parent
|
|
return unless $psock;
|
|
$psock->write("$_[0]\r\n");
|
|
}
|
|
|
|
# argument: a string to take as indicating the error that just happened.
|
|
sub error {
|
|
if ($psock) {
|
|
# we're a child, pass error to parent
|
|
send_to_parent("error $_[0]");
|
|
} else {
|
|
# we're a parent, so just handle output of error
|
|
Frontend->NoteError(\$_[0]);
|
|
Mgd::log('debug', $_[0]);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
sub job_delete {
|
|
|
|
PASS:
|
|
while (1) {
|
|
sleep 9;
|
|
validate_dbh();
|
|
my $dbh = get_dbh();
|
|
|
|
# see if we have anything from the parent
|
|
send_to_parent('del_i_looped');
|
|
while (defined (my $line = <$psock>)) {
|
|
$line =~ s/\r?\n$//;
|
|
last if $line eq '.';
|
|
if ($line eq 'shutdown') {
|
|
exit 0;
|
|
}
|
|
}
|
|
|
|
my $LIMIT = 500;
|
|
while (1) {
|
|
sleep 1;
|
|
my $delmap = $dbh->selectall_arrayref("SELECT fd.fid, fo.devid ".
|
|
"FROM file_to_delete fd ".
|
|
"LEFT JOIN file_on fo ON fd.fid=fo.fid ".
|
|
"LIMIT $LIMIT");
|
|
my $count = $delmap ? scalar @$delmap : 0;
|
|
next PASS unless $count;
|
|
|
|
my %done; # fid -> 1 (when fid is deleted from all devices)
|
|
my %dev_down; # devid -> 1 (when device times out due to EIO)
|
|
|
|
foreach my $dm (@$delmap) {
|
|
my ($fid, $devid) = @$dm;
|
|
|
|
# if no device is returned from the query above, that
|
|
# means there are no file_on rows for it, and we can consider
|
|
# it now deleted.
|
|
unless ($devid) {
|
|
$done{$fid} = 1;
|
|
next;
|
|
}
|
|
|
|
# don't try to delete from this device if we earlier
|
|
# found it to be timing out with EIO
|
|
next if $dev_down{$devid};
|
|
|
|
my $path = make_path($devid, $fid);
|
|
my $rv = 0;
|
|
if (my $urlref = Mgd::is_url($path)) {
|
|
# hit up the server and delete it
|
|
my $sock = IO::Socket::INET->new(PeerAddr => $urlref->[0],
|
|
PeerPort => $urlref->[1],
|
|
Timeout => 2);
|
|
unless ($sock) {
|
|
# timeout or something, mark this device as down for now and move on
|
|
$dev_down{$devid} = 1;
|
|
next;
|
|
}
|
|
|
|
# send delete request
|
|
error("Sending delete for $path") if $Mgd::DEBUG >= 2;
|
|
$sock->write("DELETE $urlref->[2] HTTP/1.0\r\n\r\n");
|
|
my $response = <$sock>;
|
|
if ($response =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
|
|
if (($1 >= 200 && $1 <= 299) || $1 == 404) {
|
|
# effectively means all went well
|
|
$rv = 1;
|
|
} else {
|
|
# remote file system error? mark node as down
|
|
error("Error: unlink failure: $path: $1");
|
|
$dev_down{$devid} = 1;
|
|
next;
|
|
}
|
|
} else {
|
|
error("Error: unknown response line: $response");
|
|
}
|
|
} else {
|
|
# do normal unlink
|
|
$rv = unlink "$Mgd::MOG_ROOT/$path";
|
|
|
|
# device is timing out. take note of it and
|
|
# continue dealing with other deletes
|
|
if (! $rv) {
|
|
if ($! == EIO) {
|
|
$dev_down{$devid} = 1;
|
|
next;
|
|
} elsif ($! == ENOENT) {
|
|
$rv = 1; # count non-existent file as deleted
|
|
}
|
|
}
|
|
}
|
|
|
|
# if we deleted it, or it didn't exist, consider it
|
|
# deleted.
|
|
$dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
|
|
undef, $fid, $devid) if $rv;
|
|
}
|
|
|
|
if (%done) {
|
|
my $in = join(',', keys %done);
|
|
$dbh->do("DELETE FROM file_to_delete WHERE fid IN ($in)");
|
|
}
|
|
|
|
next PASS if $count < $LIMIT;
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
# copies a file from one Perlbal to another utilizing HTTP
|
|
sub http_copy {
|
|
my ($sdevid, $ddevid, $fid) = @_;
|
|
|
|
# handles setting unreachable magic; $error->(reachability, "message")
|
|
my $error = sub {
|
|
if ($_[0]) {
|
|
send_to_parent("repl_unreachable $fid");
|
|
|
|
# update database table
|
|
Mgd::validate_dbh();
|
|
my $dbh = Mgd::get_dbh();
|
|
$dbh->do("REPLACE INTO unreachable_fids VALUES ($fid, UNIX_TIMESTAMP())");
|
|
}
|
|
return error($_[1]);
|
|
};
|
|
|
|
# get some information we'll need
|
|
my $devs = Mgd::get_device_summary();
|
|
my ($sdev, $ddev) = ($devs->{$sdevid}, $devs->{$ddevid});
|
|
return error("Error: unable to get device information: source=$sdevid, destination=$ddevid, fid=$fid")
|
|
unless ref $sdev && ref $ddev;
|
|
my ($spath, $dpath) = (Mgd::make_http_path($sdevid, $fid),
|
|
Mgd::make_http_path($ddevid, $fid));
|
|
my ($shost, $sport) = (Mgd::hostid_ip($sdev->{hostid}), Mgd::hostid_http_port($sdev->{hostid}));
|
|
my ($dhost, $dport) = (Mgd::hostid_ip($ddev->{hostid}), Mgd::hostid_http_port($ddev->{hostid}));
|
|
unless (defined $spath && defined $dpath && defined $shost && defined $dhost && $sport && $dport) {
|
|
# show detailed information to find out what's not configured right
|
|
error("Error: unable to replicate file fid=$fid from device id $sdevid to device id $ddevid");
|
|
error(" http://$shost:$sport$spath -> http://$dhost:$dport$dpath");
|
|
return 0;
|
|
}
|
|
|
|
# setup our pipe error handler, in case we get closed on
|
|
my $pipe_closed = 0;
|
|
local $SIG{PIPE} = sub { $pipe_closed = 1; };
|
|
|
|
# okay, now get the file
|
|
my $sock = IO::Socket::INET->new(PeerAddr => $shost, PeerPort => $sport, Timeout => 2)
|
|
or return error("Unable to create socket to $shost:$sport for $spath");
|
|
$sock->write("GET $spath HTTP/1.0\r\n\r\n");
|
|
return error("Pipe closed retrieving $spath from $shost:$sport")
|
|
if $pipe_closed;
|
|
|
|
# we just want a content length
|
|
my $clen;
|
|
while (defined (my $line = <$sock>)) {
|
|
$line =~ s/[\s\r\n]+$//;
|
|
last unless length $line;
|
|
if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
|
|
# make sure we get a good response
|
|
return $error->(1, "Error: Resource http://$shost:$sport$spath failed: HTTP $1")
|
|
unless $1 >= 200 && $1 <= 299;
|
|
}
|
|
next unless $line =~ /^Content-length:\s*(\d+)\s*$/i;
|
|
$clen = $1;
|
|
}
|
|
return $error->(1, "File $spath has a content-length of 0; unable to replicate")
|
|
unless $clen;
|
|
|
|
# open target for put
|
|
my $dsock = IO::Socket::INET->new(PeerAddr => $dhost, PeerPort => $dport, Timeout => 2)
|
|
or return error("Unable to create socket to $dhost:$dport for $dpath");
|
|
$dsock->write("PUT $dpath HTTP/1.0\r\nContent-length: $clen\r\n\r\n")
|
|
or return error("Unable to write data to $dpath on $dhost:$dport");
|
|
return error("Pipe closed during write to $dpath on $dhost:$dport")
|
|
if $pipe_closed;
|
|
|
|
# now read data and print while we're reading.
|
|
my ($data, $read, $written) = ('', 0, 0);
|
|
while (!$pipe_closed && (my $bytes = $sock->read($data, $clen - $read))) {
|
|
# now we've read in $bytes bytes
|
|
$read += $bytes;
|
|
my $wbytes = $dsock->send($data);
|
|
$written += $wbytes;
|
|
return error("Error: wrote $wbytes; expected to write $bytes; failed putting to $dpath")
|
|
unless $wbytes == $bytes;
|
|
}
|
|
return error("Error: wrote $written bytes, expected to write $clen")
|
|
unless $written == $clen;
|
|
|
|
# now read in the response line (should be first line)
|
|
my $line = <$dsock>;
|
|
if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
|
|
return 1 if $1 >= 200 && $1 <= 299;
|
|
warn "Error: got a 404 in put: device not on host?: http://$dhost:$dport$dpath"
|
|
if $1 == 404;
|
|
} else {
|
|
warn "Error: HTTP response line not recognized: $line";
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
# replicates $fid if its devcount is less than $min.
|
|
sub replicate {
|
|
my ($dbh, $fid, $min) = @_;
|
|
|
|
my $lockname = "mgfs:fid:$fid:replicate";
|
|
my $lock = $dbh->selectrow_array("SELECT GET_LOCK(?, 1)", undef,
|
|
$lockname);
|
|
return error("Unable to obtain lock $lockname")
|
|
unless $lock;
|
|
|
|
# hashref of devid -> $device_row_href (where devid is alive)
|
|
my $devs = Mgd::get_device_summary();
|
|
return error("Device information from get_device_summary is empty")
|
|
unless $devs && %$devs;
|
|
|
|
# learn what devices this file is already on
|
|
my $on_count = 0;
|
|
my %on_host; # hostid -> 1
|
|
my @dead_devid; # list of dead devids
|
|
my @exist_devid; # list of existing devids
|
|
|
|
my $sth = $dbh->prepare("SELECT devid FROM file_on WHERE fid=?");
|
|
$sth->execute($fid);
|
|
die $dbh->errstr if $dbh->err;
|
|
while (my ($devid) = $sth->fetchrow_array) {
|
|
my $d = $devs->{$devid};
|
|
unless ($d) {
|
|
push @dead_devid, $devid;
|
|
next;
|
|
}
|
|
$on_host{$d->{hostid}} = 1;
|
|
$on_count++;
|
|
push @exist_devid, $devid;
|
|
}
|
|
|
|
my $retunlock = sub {
|
|
my $rv = shift;
|
|
$dbh->selectrow_array("SELECT RELEASE_LOCK(?)", undef, $lockname);
|
|
return $rv ? $rv : error($_[0]);
|
|
};
|
|
|
|
return $retunlock->(2) if $on_count >= $min;
|
|
return $retunlock->(0, "Source is no longer available replicating $fid") if $on_count == 0;
|
|
return $retunlock->(0, "No eligible devices available replicating $fid") if @exist_devid == 0;
|
|
|
|
my $sdevid;
|
|
|
|
while ($on_count < $min) {
|
|
my $need = $min - $on_count;
|
|
|
|
my @good_devids = Mgd::find_deviceid(
|
|
random => 1,
|
|
not_on_hosts => [ keys %on_host ],
|
|
weight_by_free => 1,
|
|
);
|
|
|
|
# wasn't able to replicate enough?
|
|
last unless @good_devids;
|
|
|
|
my $ddevid = shift @good_devids;
|
|
$sdevid ||= @exist_devid[int(rand(scalar @exist_devid))];
|
|
|
|
my $rv = undef;
|
|
if ($USE_HTTP) {
|
|
$rv = http_copy($sdevid, $ddevid, $fid);
|
|
} else {
|
|
my $dst_path = $MOG_ROOT . "/" . make_path($ddevid, $fid);
|
|
my $src_path = $MOG_ROOT . "/" . make_path($sdevid, $fid);
|
|
$rv = File::Copy::copy($src_path, $dst_path);
|
|
}
|
|
|
|
return $retunlock->(0, "Copier failed replicating $fid") unless $rv;
|
|
add_file_on($fid, $ddevid, 1);
|
|
$on_count++;
|
|
}
|
|
|
|
return $retunlock->(1);
|
|
}
|
|
|
|
sub get_mindevcounts {
|
|
# make sure we have good info
|
|
Mgd::check_host_cache();
|
|
my $host_ct = keys %Mgd::cache_host;
|
|
|
|
# find the classes for each domainid (including domains without explict classes)
|
|
my %min; # dmid -> classid -> mindevcount
|
|
validate_dbh();
|
|
my $dbh = get_dbh();
|
|
my $sth = $dbh->prepare("SELECT d.dmid, c.classid, c.mindevcount ".
|
|
"FROM domain d LEFT JOIN class c ON d.dmid=c.dmid");
|
|
$sth->execute;
|
|
while (my ($dmid, $classid, $mct) = $sth->fetchrow_array) {
|
|
$min{$dmid} ||= {}; # note the existence of this dmid
|
|
|
|
# classid may be NULL (undef), in which case there are no classes defined
|
|
# and we don't note the mindevcount (yet)
|
|
$min{$dmid}{$classid} = int($host_ct < $mct ? $host_ct : $mct) if defined $classid;
|
|
}
|
|
|
|
|
|
# now iterate over %min again to set the implicit class
|
|
foreach my $dmid (keys %min) {
|
|
# each domain's classid=0, if not defined, has an implied mindevcount of $default_mindevcount
|
|
# which most people will probably use.
|
|
$min{$dmid}{0} = $host_ct < $default_mindevcount ? $host_ct : $default_mindevcount
|
|
unless exists $min{$dmid}{0};
|
|
}
|
|
|
|
# return ref to hash
|
|
return \%min;
|
|
}
|
|
|
|
sub job_monitor {
|
|
my $parse_parent_response = sub {
|
|
# now see what was in our message queue
|
|
while (defined (my $line = <$psock>)) {
|
|
$line =~ s/\r?\n$//;
|
|
last if $line eq '.';
|
|
|
|
# now find out what command this is?
|
|
if ($line eq 'shutdown') {
|
|
exit 0;
|
|
}
|
|
}
|
|
};
|
|
|
|
while (1) {
|
|
sleep 15;
|
|
|
|
# get db and note we're starting a run
|
|
error("Monitor running; scanning usage files")
|
|
if $Mgd::DEBUG >= 1;
|
|
validate_dbh();
|
|
my $dbh = get_dbh() or return 0;
|
|
|
|
# general report in to parent
|
|
send_to_parent('monitor_ping');
|
|
$parse_parent_response->();
|
|
|
|
# get a current list of devices
|
|
my $devs = Mgd::get_device_summary();
|
|
next unless $devs && %$devs;
|
|
|
|
# now iterate over devices
|
|
foreach my $dev (values %$devs) {
|
|
my $host = $Mgd::cache_host{$dev->{hostid}};
|
|
my $port = $host->{http_get_port} || $host->{http_port};
|
|
my $url = "http://$host->{hostip}:$port/dev$dev->{devid}/usage";
|
|
|
|
# now try to get the data with a short timeout
|
|
my $ua = LWP::UserAgent->new( timeout => 2 );
|
|
my $response = $ua->get($url);
|
|
|
|
unless ($response->is_success) {
|
|
error("Failed getting dev$dev->{devid}: " . $response->status_line);
|
|
next;
|
|
}
|
|
|
|
my %stats;
|
|
my $data = $response->content;
|
|
foreach (split(/\r?\n/, $data)) {
|
|
next unless /^(\w+)\s*:\s*(.+)$/;
|
|
$stats{$1} = $2;
|
|
}
|
|
|
|
my ($used, $total) = ($stats{used}, $stats{total});
|
|
unless ($used && $total) {
|
|
error("dev$dev->{devid} reports used = $used, total = $total, error?");
|
|
next;
|
|
}
|
|
|
|
# bytes => megabytes
|
|
$used /= 1024;
|
|
$total /= 1024;
|
|
|
|
$dbh->do("UPDATE device SET mb_total = ?, mb_used = ?, mb_asof = UNIX_TIMESTAMP() " .
|
|
"WHERE devid = ?", undef, int($total), int($used), $dev->{devid});
|
|
if ($dbh->err) {
|
|
error("Database error in update query: " . $dbh->errstr);
|
|
next;
|
|
}
|
|
|
|
error("dev$dev->{devid}: used = $used, total = $total")
|
|
if $Mgd::DEBUG >= 1;
|
|
}
|
|
}
|
|
}
|
|
|
|
sub job_reaper {
|
|
my $parse_parent_response = sub {
|
|
# now see what was in our message queue
|
|
while (defined (my $line = <$psock>)) {
|
|
$line =~ s/\r?\n$//;
|
|
last if $line eq '.';
|
|
|
|
# now find out what command this is?
|
|
if ($line eq 'shutdown') {
|
|
exit 0;
|
|
}
|
|
}
|
|
};
|
|
|
|
while (1) {
|
|
sleep 10;
|
|
|
|
# get db and note we're starting a run
|
|
error("Reaper running; looking for dead devices")
|
|
if $Mgd::DEBUG >= 1;
|
|
validate_dbh();
|
|
my $dbh = get_dbh() or return 0;
|
|
|
|
# general report in to parent
|
|
send_to_parent('reaper_ping');
|
|
$parse_parent_response->();
|
|
|
|
# get a current list of devices
|
|
my $devs = Mgd::get_device_summary(1);
|
|
my @deaddevs = grep { $_->{status} eq 'dead' } values %$devs;
|
|
next unless @deaddevs;
|
|
|
|
# now iterate over dead devices
|
|
foreach my $dev (@deaddevs) {
|
|
my $devid = $dev->{devid};
|
|
|
|
# look for files on this device
|
|
my $fids = $dbh->selectcol_arrayref('SELECT fid FROM file_on WHERE devid = ? LIMIT 1000',
|
|
undef, $devid);
|
|
if ($dbh->err) {
|
|
error("Error selecting jobs to reap: " . $dbh->errstr);
|
|
next;
|
|
}
|
|
next unless $fids && @$fids;
|
|
|
|
# note we got some
|
|
error("Found " . scalar(@$fids) . " files on dead device $devid");
|
|
|
|
# now iterate
|
|
foreach my $fid (@$fids) {
|
|
$dbh->do('DELETE FROM file_on WHERE fid = ? AND devid = ?',
|
|
undef, $fid, $devid);
|
|
if ($dbh->err) {
|
|
error("Error deleting from file_on (file $fid, device $devid): " . $dbh->errstr);
|
|
next;
|
|
}
|
|
|
|
# now update the fid count
|
|
unless (Mgd::update_fid_devcount($fid)) {
|
|
error("Error updating fid $fid devcount");
|
|
next;
|
|
}
|
|
|
|
# if debugging on, note this is done
|
|
error("Reaper noted fid $fid no longer on device $devid")
|
|
if $Mgd::DEBUG >= 2;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
sub job_replicate {
|
|
my $parse_parent_response = sub {
|
|
# now see what was in our message queue
|
|
while (defined (my $line = <$psock>)) {
|
|
$line =~ s/\r?\n$//;
|
|
last if $line eq '.';
|
|
|
|
# now find out what command this is?
|
|
if ($line =~ /^repl_was_done (\d+)/ && $_[0]) {
|
|
delete $_[0]->{$1};
|
|
} elsif ($line eq 'shutdown') {
|
|
exit 0;
|
|
}
|
|
}
|
|
};
|
|
|
|
# { fid => lastcheck }; instructs us not to replicate this fid... we will clear
|
|
# out fids from this list that are expired
|
|
my %fidfailure;
|
|
|
|
# { fid => 1 }; used to keep track of fids we find in the unreachable_fids table
|
|
my %unreachable;
|
|
|
|
my $sleep = 2;
|
|
while (1) {
|
|
sleep $sleep;
|
|
validate_dbh();
|
|
my $dbh = get_dbh() or return 0;
|
|
|
|
# general report in to parent
|
|
send_to_parent('repl_ping');
|
|
$parse_parent_response->(undef);
|
|
|
|
# start off assuming that we're going to get everything replicated and then take a break
|
|
$sleep = 2;
|
|
|
|
# update our unreachable fid list... we consider them good for 15 minutes
|
|
my $urfids = $dbh->selectall_arrayref('SELECT fid, lastupdate FROM unreachable_fids');
|
|
die $dbh->errstr if $dbh->err;
|
|
foreach my $r (@{$urfids || []}) {
|
|
my $nv = $r->[1] + 900;
|
|
unless ($fidfailure{$r->[0]} && $fidfailure{$r->[0]} < $nv) {
|
|
# given that we might have set it below to a time past the unreachable
|
|
# 15 minute timeout, we want to only overwrite %fidfailure's idea of
|
|
# the expiration time if we are extending it
|
|
$fidfailure{$r->[0]} = $nv;
|
|
}
|
|
$unreachable{$r->[0]} = 1;
|
|
}
|
|
|
|
# get the min dev counts
|
|
my %min = %{ Mgd::get_mindevcounts() };
|
|
|
|
# iterate through each domain, replicating its contents
|
|
foreach my $dmid (keys %min) {
|
|
# iterate through each class, including the implicit class 0
|
|
while (my ($classid, $min) = each %{$min{$dmid}}) {
|
|
error("Checking replication for dmid=$dmid, classid=$classid, min=$min")
|
|
if $Mgd::DEBUG >= 1;
|
|
|
|
my $LIMIT = 1000;
|
|
|
|
# try going from devcount of 1 up to devcount of $min-1
|
|
my %fidtodo; # fid => 1
|
|
my $fixed = 0;
|
|
my $attempted = 0;
|
|
my $devcount = 1;
|
|
while ($fixed < $LIMIT && $devcount < $min) {
|
|
my $now = time();
|
|
my $fids = $dbh->selectcol_arrayref("SELECT fid FROM file WHERE dmid=? AND classid=? ".
|
|
"AND devcount = ? AND length IS NOT NULL ".
|
|
"LIMIT $LIMIT", undef, $dmid, $classid, $devcount);
|
|
die $dbh->errstr if $dbh->err;
|
|
$fidtodo{$_} = 1 foreach @$fids;
|
|
|
|
# increase devcount so we try to replicate the files at the next devcount
|
|
$devcount++;
|
|
|
|
# see if we have any files to replicate
|
|
my $count = $fids ? scalar @$fids : 0;
|
|
error(" found $count for dmid=$dmid/classid=$classid/min=$min")
|
|
if $Mgd::DEBUG >= 1;
|
|
next unless $count;
|
|
|
|
# randomize the list so multiple daemons/threads working on
|
|
# replicate at the same time don't all fight over the
|
|
# same fids to move
|
|
my @randfids = randlist(@$fids);
|
|
|
|
error("Need to replicate: $dmid/$classid: @$fids") if $Mgd::DEBUG >= 2;
|
|
foreach my $fid (@randfids) {
|
|
# now replicate this fid
|
|
$attempted++;
|
|
next unless $fidtodo{$fid};
|
|
|
|
if ($fidfailure{$fid}) {
|
|
if ($fidfailure{$fid} < $now) {
|
|
delete $fidfailure{$fid};
|
|
} else {
|
|
next;
|
|
}
|
|
}
|
|
|
|
if (my $status = replicate($dbh, $fid, $min)) {
|
|
# $status is either 0 (failure, handled below), 1 (success, we actually
|
|
# replicated this file), or 2 (success, but someone else replicated it).
|
|
# so if it's 2, we just want to go to the next fid. this file is done.
|
|
next if $status == 2;
|
|
|
|
# if it was no longer reachable, mark it reachable
|
|
if (delete $unreachable{$fid}) {
|
|
$dbh->do("DELETE FROM unreachable_fids WHERE fid = ?", undef, $fid);
|
|
die $dbh->errstr if $dbh->err;
|
|
}
|
|
|
|
# housekeeping
|
|
$fixed++;
|
|
send_to_parent("repl_i_did $fid");
|
|
$parse_parent_response->(\%fidtodo);
|
|
|
|
# status update
|
|
if ($fixed % 20 == 0) {
|
|
my $ratio = $fixed/$attempted*100;
|
|
error(sprintf("replicated=$fixed, attempted=$attempted, ratio=%.2f%%", $ratio))
|
|
if $fixed % 20 == 0;
|
|
}
|
|
} else {
|
|
# failed in replicate, don't retry for a minute
|
|
$fidfailure{$fid} = $now + 60;
|
|
}
|
|
}
|
|
}
|
|
|
|
# if we did 1000, we just want to jump to the next pass through all domains and classes without pausing
|
|
$sleep = 0 if $fixed >= $LIMIT;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
sub job_queryworker {
|
|
# process lines of input, blocking.
|
|
my $worker = QueryWorker->new($psock);
|
|
while (defined (my $line = <$psock>)) {
|
|
$line =~ s/[\r\n]+$//;
|
|
validate_dbh();
|
|
$worker->process_line(\$line);
|
|
}
|
|
}
|
|
|
|
|
|
#####################################################################
|
|
### S E R V E R A P I F U N C T I O N S
|
|
#####################################################################
|
|
|
|
# returns hashref of devid -> $device_row_href (where devid is alive/down, but not dead)
|
|
# cached for 15 seconds.
|
|
use vars qw($cache_device_summary $cache_device_summary_time %cache_host $cache_host_time);
|
|
|
|
# general purpose device locator. example:
|
|
#
|
|
# my $devid = Mgd::find_deviceid(
|
|
# random => 1, # get random device (else find first suitable)
|
|
# min_free_space => 100, # with at least 100MB free
|
|
# weight_by_free => 1, # find result weighted by free space
|
|
# max_disk_age => 5, # minutes of age the last usage report can be before we ignore the disk
|
|
# not_on_hosts => [ 1, 2 ], # no devices on hosts 1 and 2
|
|
# );
|
|
#
|
|
# returns undef if no suitable device was found. else, if you wanted an
|
|
# array will return an array of the suitable devices--if you want just a
|
|
# single item, you get just the first one found.
|
|
sub find_deviceid {
|
|
my %opts = ( @_ );
|
|
|
|
# copy down global minimum free space if not specified
|
|
$opts{min_free_space} ||= $min_free_space;
|
|
$opts{max_disk_age} ||= $max_disk_age;
|
|
$opts{max_disk_age} = time() - ($opts{max_disk_age} * 60);
|
|
|
|
# setup for iterating over devices
|
|
my $devs = Mgd::get_device_summary();
|
|
my @devids = keys %{$devs || {}};
|
|
my $devcount = scalar(@devids);
|
|
my $start = $opts{random} ? int(rand($devcount)) : 0;
|
|
my %not_on_host = ( map { $_ => 1 } @{$opts{not_on_hosts} || []} );
|
|
my $total_free = 0;
|
|
|
|
# now find a device that matches what they want
|
|
my @list;
|
|
for (my $i = 0; $i < $devcount; $i++) {
|
|
my $idx = ($i + $start) % $devcount;
|
|
my $dev = $devs->{$devids[$idx]};
|
|
|
|
# series of suitability checks
|
|
next unless $dev->{status} eq 'alive';
|
|
next if $not_on_host{$dev->{hostid}};
|
|
next if $opts{max_disk_age} && $dev->{mb_asof} &&
|
|
$dev->{mb_asof} < $opts{max_disk_age};
|
|
next if $opts{min_free_space} && $dev->{mb_total} &&
|
|
$dev->{mb_free} < $opts{min_free_space};
|
|
|
|
# we get here, this is a suitable device
|
|
push @list, $dev->{devid};
|
|
$total_free += $dev->{mb_free};
|
|
}
|
|
|
|
# now we have a list ordered randomly, do free space weighting
|
|
if ($opts{weight_by_free}) {
|
|
my $rand = int(rand($total_free));
|
|
my $cur = 0;
|
|
|
|
foreach my $devid (@list) {
|
|
$cur += $devs->{$devid}->{mb_free};
|
|
return $devid if $cur >= $rand;
|
|
}
|
|
}
|
|
|
|
# return first listed suitable device
|
|
return @list ? $list[0] : undef;
|
|
}
|
|
|
|
sub get_device_summary {
|
|
my $include_dead = shift() ? ", 'dead'" : '';
|
|
my $now = time;
|
|
return $cache_device_summary if $cache_device_summary_time > $now - 15;
|
|
|
|
my $dbh = get_dbh();
|
|
|
|
# learn devices
|
|
my %dev; #
|
|
my %hostdevs; # hostid -> [ devid ] (where devid is alive/down, but not dead)
|
|
my $sth = $dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
|
|
"mb_used, mb_asof, status FROM device ".
|
|
"WHERE status IN ('alive', 'down' $include_dead)");
|
|
$sth->execute;
|
|
$dev{$_->{devid}} = $_ while $_ = $sth->fetchrow_hashref;
|
|
|
|
# now override device status with host status if the host status is less than the device status
|
|
Mgd::check_host_cache();
|
|
foreach my $devid (keys %dev) {
|
|
# makes others have an easier time of finding devices by free space
|
|
$dev{$devid}->{mb_free} = $dev{$devid}->{mb_total} - $dev{$devid}->{mb_used};
|
|
|
|
my $host_status = $cache_host{$dev{$devid}->{hostid}}->{status};
|
|
if ($dev{$devid}->{status} eq 'alive' && $host_status ne 'alive') {
|
|
$dev{$devid}->{status} = $host_status;
|
|
} elsif ($dev{$devid}->{status} eq 'down' && $host_status eq 'dead') {
|
|
$dev{$devid}->{status} = $host_status;
|
|
}
|
|
}
|
|
|
|
$cache_device_summary_time = $now;
|
|
return $cache_device_summary = \%dev;
|
|
}
|
|
|
|
sub check_host_cache {
|
|
my $now = time;
|
|
return if $cache_host_time > $now - 5;
|
|
|
|
%cache_host = ();
|
|
my $dbh = get_dbh();
|
|
my $sth = $dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
|
|
"hostip, http_port, http_get_port, remoteroot, altip, altmask FROM host");
|
|
$sth->execute;
|
|
while (my $host = $sth->fetchrow_hashref) {
|
|
$cache_host{$host->{hostid}} = $host;
|
|
$cache_host{$host->{hostid}}->{mask} = Net::Netmask->new2($host->{altmask})
|
|
if $host->{altip} && $host->{altmask};
|
|
}
|
|
$cache_host_time = $now;
|
|
}
|
|
|
|
sub key_filerow {
|
|
my ($dbh, $dmid, $key) = @_;
|
|
my $row = $dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
|
|
"FROM file WHERE dmid=? AND dkey=?",
|
|
undef, $dmid, $key);
|
|
return $row;
|
|
}
|
|
|
|
# see if we should reduce the number of active children
|
|
sub job_needs_reduction {
|
|
my $job = shift;
|
|
return $jobs{$job}->[0] < $jobs{$job}->[1];
|
|
}
|
|
|
|
# given a file descriptor number and a timeout, wait for that descriptor to
|
|
# become readable; returns 0 or 1 on if it did or not
|
|
sub wait_for_readability {
|
|
my ($fileno, $timeout) = @_;
|
|
return 0 unless $fileno && $timeout;
|
|
|
|
my $rin;
|
|
vec($rin, $fileno, 1) = 1;
|
|
my $nfound = select($rin, undef, undef, $timeout);
|
|
|
|
# nfound can be undef or 0, both failures, or 1, a success
|
|
return $nfound ? 1 : 0;
|
|
}
|
|
|
|
# get size of file, return 0 on error
|
|
sub get_file_size {
|
|
my $path = shift;
|
|
|
|
# quick case -- just a file on disk
|
|
unless ($path =~ m!^http://([^:/]+)(?::(\d+))?(/.+)$!) {
|
|
return -s "$Mgd::MOG_ROOT/$path"
|
|
}
|
|
my ($host, $port, $uri) = ($1, $2, $3);
|
|
|
|
# don't sigpipe us
|
|
local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
|
|
|
|
# setup for sending size request to cached host
|
|
my $req = "size $uri\r\n";
|
|
my $reqlen = length $req;
|
|
my $rv = 0;
|
|
my $sock = $streamcache{$host};
|
|
|
|
# sub to parse the response from $sock. common code, so we have it here.
|
|
my $parse_response = sub {
|
|
# give the socket 3 seconds to become readable
|
|
unless (Mgd::wait_for_readability(fileno($sock), $node_timeout)) {
|
|
close($sock);
|
|
return 0;
|
|
}
|
|
|
|
# now we know there's readable data
|
|
my $line = <$sock>;
|
|
return 0 unless defined $line;
|
|
return 0 unless $line =~ /^(\S+)\s+(-?\d+)/; # expected format: "uri size"
|
|
return error("get_file_size() requested size of $path, got back size of $1 ($2 bytes)")
|
|
if $1 ne $uri;
|
|
return $2+0;
|
|
};
|
|
|
|
# try using the cached socket
|
|
if ($sock) {
|
|
$rv = send($sock, $req, $FLAG_NOSIGNAL);
|
|
if ($!) {
|
|
undef $streamcache{$host};
|
|
} elsif ($rv != $reqlen) {
|
|
return error("send() didn't return expected length ($rv, not $reqlen) for $path");
|
|
} else {
|
|
# success
|
|
return $parse_response->();
|
|
}
|
|
}
|
|
|
|
# try creating a connection to the stream
|
|
unless ($rv) {
|
|
$sock = IO::Socket::INET->new(PeerAddr => $host, PeerPort => $MOGSTORED_STREAM_PORT, Timeout => 5);
|
|
$streamcache{$host} = $sock;
|
|
if ($sock) {
|
|
$rv = send($sock, $req, $FLAG_NOSIGNAL);
|
|
if ($!) {
|
|
return error("error talking to mogstored stream ($path): $!");
|
|
} elsif ($rv != $reqlen) {
|
|
return error("send() didn't return expected length ($rv, not $reqlen) for $path");
|
|
} else {
|
|
# success
|
|
return $parse_response->();
|
|
}
|
|
}
|
|
}
|
|
|
|
# failure case: use a HEAD request to get the size of the file
|
|
my $sock = IO::Socket::INET->new(PeerAddr => $host, PeerPort => $port, Timeout => 3)
|
|
or return error("get_file_size() unable to contact mogstored for size of $path");
|
|
$sock->write("HEAD $uri HTTP/1.0\r\n\r\n");
|
|
while (defined (my $line = <$sock>)) {
|
|
if ($line =~ /^Content-length: (\d+)/i) {
|
|
# success
|
|
return $1+0;
|
|
}
|
|
}
|
|
|
|
# no content length found?
|
|
return error("get_file_size() found no content-length header in response for $path");
|
|
}
|
|
|
|
sub class_id {
|
|
my ($dmid, $class) = @_;
|
|
return undef unless $dmid > 0 && length $class;
|
|
|
|
my $dbh = Mgd::get_dbh;
|
|
my $classid = $dbh->selectrow_array
|
|
("SELECT classid FROM class WHERE dmid=? AND classname=?", undef, $dmid, $class)
|
|
or return undef;
|
|
return undef unless $classid;
|
|
return $classid;
|
|
}
|
|
|
|
sub domain_id {
|
|
# check the cache for this item
|
|
my $now = time();
|
|
if ($domaincachetime + 5 < $now) {
|
|
%domaincache = ();
|
|
|
|
# now get updated list
|
|
my $dbh = Mgd::get_dbh;
|
|
my $domains = $dbh->selectall_arrayref('SELECT dmid, namespace FROM domain');
|
|
foreach my $row (@{$domains || []}) {
|
|
# namespace -> dmid
|
|
$domaincache{$row->[1]} = $row->[0];
|
|
}
|
|
|
|
$domaincachetime = $now;
|
|
}
|
|
|
|
# just use cached version
|
|
return $domaincache{$_[0]};
|
|
}
|
|
|
|
sub class_name {
|
|
my ($dmid, $classid) = @_;
|
|
return undef unless $dmid > 0 && length $classid;
|
|
# FIXME: cache this
|
|
|
|
# lookup class
|
|
my $dbh = Mgd::get_dbh;
|
|
my $classname = $dbh->selectrow_array
|
|
("SELECT classname FROM class WHERE dmid=? AND classid=?", undef, $dmid, $classid)
|
|
or return undef;
|
|
return undef unless $classname;
|
|
return $classname;
|
|
}
|
|
|
|
sub domain_name {
|
|
my $dmid = shift;
|
|
# FIXME: cache this
|
|
|
|
# lookup domain
|
|
my $dbh = Mgd::get_dbh;
|
|
my $namespace = $dbh->selectrow_array
|
|
("SELECT namespace FROM domain WHERE dmid=?", undef, $dmid);
|
|
return $namespace;
|
|
|
|
}
|
|
|
|
sub hostid_name {
|
|
my $hostid = shift;
|
|
check_host_cache();
|
|
my $h = $cache_host{$hostid};
|
|
return $h ? $h->{hostname} : undef;
|
|
}
|
|
|
|
sub hostid_ip {
|
|
my $hostid = shift;
|
|
check_host_cache();
|
|
my $h = $cache_host{$hostid};
|
|
return undef unless $h;
|
|
|
|
# if we have a client ip and an object for alt matching...
|
|
if ($h->{mask} && $h->{altip} &&
|
|
($force_alt_zone || ($client_ip && $h->{altip} && $h->{mask}->match($client_ip)))) {
|
|
return $h->{altip};
|
|
} else {
|
|
return $h->{hostip};
|
|
}
|
|
}
|
|
|
|
sub hostid_http_port {
|
|
my $hostid = shift;
|
|
check_host_cache();
|
|
my $h = $cache_host{$hostid};
|
|
return $h ? $h->{http_port} : undef;
|
|
}
|
|
|
|
sub hostid_http_get_port {
|
|
my $hostid = shift;
|
|
check_host_cache();
|
|
my $h = $cache_host{$hostid};
|
|
return $h ? $h->{http_get_port} : undef;
|
|
}
|
|
|
|
sub make_http_path {
|
|
my ($devid, $fid) = @_;
|
|
|
|
my $dsum = get_device_summary();
|
|
my $dinfo = $dsum->{$devid};
|
|
return undef unless $dinfo;
|
|
my $hostname = hostid_name($dinfo->{hostid});
|
|
|
|
my $nfid = sprintf '%010d', $fid;
|
|
my ( $b, $mmm, $ttt, $hto ) = ( $nfid =~ m{(\d)(\d{3})(\d{3})(\d{3})} );
|
|
|
|
return "/dev$devid/$b/$mmm/$ttt/$nfid.fid";
|
|
}
|
|
|
|
sub make_full_url {
|
|
# set use_get_port to be true to specify to use the get port
|
|
my ($devid, $fid, $use_get_port) = @_;
|
|
|
|
# get some information we'll need
|
|
my $devs = Mgd::get_device_summary();
|
|
my $dev = $devs->{$devid} or return undef;
|
|
my $path = Mgd::make_http_path($devid, $fid) or return undef;
|
|
my $host = Mgd::hostid_ip($dev->{hostid}) or return undef;
|
|
my $port = $use_get_port ? Mgd::hostid_http_get_port($dev->{hostid}) : undef;
|
|
$port ||= Mgd::hostid_http_port($dev->{hostid}) or return undef;
|
|
return "http://$host:$port$path";
|
|
}
|
|
|
|
# if given an HTTP URL, break it down into [ host, port, URI ], else
|
|
# returns undef
|
|
sub is_url {
|
|
my $path = shift;
|
|
if ($path =~ m!^http://(.+?)(?::(\d+))?(/.+)$!) {
|
|
return [ $1, $2 || 80, $3 ];
|
|
}
|
|
return undef;
|
|
}
|
|
|
|
sub make_path {
|
|
# jump out if we should be using HTTP stuff
|
|
return Mgd::make_full_url(@_) if $USE_HTTP;
|
|
|
|
my ($devid, $fid) = @_;
|
|
|
|
my $dsum = get_device_summary();
|
|
my $dinfo = $dsum->{$devid};
|
|
return undef unless $dinfo;
|
|
my $hostname = hostid_name($dinfo->{hostid});
|
|
|
|
my $nfid = sprintf '%010d', $fid;
|
|
my ( $b, $mmm, $ttt, $hto ) = ( $nfid =~ m{(\d)(\d{3})(\d{3})(\d{3})} );
|
|
|
|
my $path = "$hostname/dev$devid/$b/$mmm/$ttt/$nfid.fid";
|
|
make_dirs( "$MOG_ROOT/$path" ) or return undef;
|
|
|
|
return $path;
|
|
}
|
|
|
|
sub make_get_path {
|
|
# the get path only changes for HTTP mode
|
|
return Mgd::make_path(@_) unless $USE_HTTP;
|
|
return Mgd::make_full_url(@_, 1);
|
|
}
|
|
|
|
sub make_dirs
|
|
{
|
|
my $filename = shift;
|
|
my $dir = File::Basename::dirname($filename);
|
|
eval { File::Path::mkpath($dir, 0, 0775); };
|
|
return $@ ? 0 : 1;
|
|
}
|
|
|
|
sub add_file_on {
|
|
my ($fid, $devid, $no_lock) = @_;
|
|
|
|
my $dbh = get_dbh() or return 0;
|
|
|
|
my $rv = $dbh->do("INSERT IGNORE INTO file_on SET fid=?, devid=?",
|
|
undef, $fid, $devid);
|
|
if ($rv > 0) {
|
|
return update_fid_devcount($fid, $no_lock);
|
|
} else {
|
|
# was already on that device
|
|
return 1;
|
|
}
|
|
}
|
|
|
|
sub update_fid_devcount {
|
|
my ($fid, $no_lock) = @_;
|
|
|
|
my $dbh = get_dbh() or return 0;
|
|
|
|
my $lockname = "mgfs:fid:$fid";
|
|
unless ($no_lock) {
|
|
my $lock = $dbh->selectrow_array("SELECT GET_LOCK(?, 10)", undef,
|
|
$lockname);
|
|
return 0 unless $lock;
|
|
}
|
|
my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
|
|
undef, $fid);
|
|
|
|
$dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef,
|
|
$ct, $fid);
|
|
|
|
unless ($no_lock) {
|
|
$dbh->selectrow_array("SELECT RELEASE_LOCK(?)", undef, $lockname);
|
|
}
|
|
|
|
return 1;
|
|
}
|
|
|
|
sub randlist
|
|
{
|
|
my @rlist = @_;
|
|
my $size = scalar(@rlist);
|
|
|
|
my $i;
|
|
for ($i=0; $i<$size; $i++)
|
|
{
|
|
unshift @rlist, splice(@rlist, $i+int(rand()*($size-$i)), 1);
|
|
}
|
|
return @rlist;
|
|
}
|
|
|
|
|
|
#####################################################################
|
|
### C L I E N T C L A S S
|
|
### A client is a user connection for sending requests to us. Requests
|
|
### can either be normal user requests to be sent to a QueryWorker
|
|
### or management requests that start with a !.
|
|
#####################################################################
|
|
package Client;
|
|
|
|
use Danga::Socket ();
|
|
use base qw{Danga::Socket};
|
|
|
|
use fields qw{read_buf};
|
|
|
|
sub new {
|
|
my Client $self = shift;
|
|
$self = fields::new($self) unless ref $self;
|
|
$self->SUPER::new( @_ );
|
|
return $self;
|
|
}
|
|
|
|
# Client
|
|
sub event_read {
|
|
my Client $self = shift;
|
|
|
|
my $bref = $self->read(1024);
|
|
return $self->close() unless defined $bref;
|
|
$self->{read_buf} .= $$bref;
|
|
|
|
while ($self->{read_buf} =~ s/^(.*?)\r?\n//) {
|
|
next unless length $1;
|
|
Frontend->HandleClientRequest($self, $1);
|
|
}
|
|
}
|
|
|
|
# Client
|
|
sub event_err { my $self = shift; $self->close; }
|
|
sub event_hup { my $self = shift; $self->close; }
|
|
|
|
# just note that we've died
|
|
sub close {
|
|
# mark us as being dead
|
|
my Client $self = shift;
|
|
Frontend->NoteDeadClient($self);
|
|
$self->SUPER::close(@_);
|
|
}
|
|
|
|
|
|
#####################################################################
|
|
### W O R K E R C O N N C L A S S
|
|
### This class maintains a connection to one of our various classes
|
|
### of workers.
|
|
#####################################################################
|
|
package WorkerConn;
|
|
|
|
use Danga::Socket ();
|
|
use base qw{Danga::Socket};
|
|
|
|
use fields qw{read_buf job pid cmd_buf reqid};
|
|
|
|
sub new {
|
|
my WorkerConn $self = shift;
|
|
$self = fields::new($self) unless ref $self;
|
|
$self->SUPER::new( @_ );
|
|
|
|
# mark as not a worker by default
|
|
$self->{pid} = 0;
|
|
$self->{reqid} = 0;
|
|
$self->{job} = undef;
|
|
$self->{cmd_buf} = [];
|
|
|
|
return $self;
|
|
}
|
|
|
|
sub event_read {
|
|
my WorkerConn $self = shift;
|
|
|
|
my $bref = $self->read(1024);
|
|
return $self->close() unless defined $bref;
|
|
$self->{read_buf} .= $$bref;
|
|
|
|
while ($self->{read_buf} =~ s/^(.+?)\r?\n//) {
|
|
my $line = $1;
|
|
if ($self->job eq 'queryworker' && (substr($line, 0, 5) ne 'error')) {
|
|
Frontend->HandleQueryWorkerResponse($self, $line);
|
|
} else {
|
|
Frontend->HandleChildRequest($self, $line);
|
|
}
|
|
}
|
|
}
|
|
|
|
sub job {
|
|
my WorkerConn $self = shift;
|
|
return $self->{job} unless @_;
|
|
return $self->{job} = shift;
|
|
}
|
|
|
|
sub pid {
|
|
my WorkerConn $self = shift;
|
|
return $self->{pid} unless @_;
|
|
return $self->{pid} = shift;
|
|
}
|
|
|
|
sub event_hup { my $self = shift; $self->close; }
|
|
|
|
sub close {
|
|
# mark us as being dead
|
|
my WorkerConn $self = shift;
|
|
Frontend->NoteDeadWorkerConn($self);
|
|
$self->SUPER::close(@_);
|
|
}
|
|
|
|
sub enqueue_line {
|
|
my WorkerConn $self = $_[0];
|
|
return if $self->job eq 'queryworker'; # they don't use this queueing
|
|
my $msg = "$_[1]\r\n";
|
|
push @{$self->{cmd_buf}}, $msg;
|
|
}
|
|
|
|
sub drain_queue {
|
|
my WorkerConn $worker = $_[0];
|
|
foreach my $cmd (@{$worker->{cmd_buf}}) {
|
|
$worker->write($cmd);
|
|
}
|
|
$worker->write(".\r\n");
|
|
$worker->{cmd_buf} = [];
|
|
}
|
|
|
|
|
|
#####################################################################
|
|
### F R O N T E N D C L A S S
|
|
### This class handles keeping lists of workers and clients and
|
|
### assigning them to eachother when things happen. This is a purely
|
|
### event driven class.
|
|
#####################################################################
|
|
package Frontend;
|
|
|
|
# Mappings: fd => [ clientref, jobstring, starttime ]
|
|
# queues are just lists of Client class objects
|
|
# ChildrenByJob: job => { pid => $client }
|
|
# ErrorsTo: fid => Client
|
|
# RecentQueries: [ string, string, string, ... ]
|
|
# Stats: element => number
|
|
our ($IsChild, @QueryWorkerQueue, @ClientQueue, @RecentQueries,
|
|
%Mappings, %ChildrenByJob, %ErrorsTo, %Stats);
|
|
$IsChild = 0;
|
|
|
|
# when a child is spawned, they'll have copies of all the data from the
|
|
# parent, but they don't need it. this method is called when you want
|
|
# to indicate that this Frontend is running in a child process and should clean up.
|
|
sub SetAsChild {
|
|
@QueryWorkerQueue = ();
|
|
@ClientQueue = ();
|
|
%Mappings = ();
|
|
$IsChild = 1;
|
|
%ErrorsTo = ();
|
|
|
|
# and now kill off our event loop so that we don't waste time
|
|
Client->SetPostLoopCallback(sub { return 0; });
|
|
}
|
|
|
|
# called when a child has died. a child is a process doing a job for us,
|
|
# whether a queryworker or any other type of job. we just want
|
|
# to remove them from our list of children. they're actually respawned
|
|
# by the make_new_child function elsewhere in Mgd.
|
|
sub NoteDeadChild {
|
|
my $pid = $_[1];
|
|
foreach my $job (keys %ChildrenByJob) {
|
|
return if # bail out if we actually delete one
|
|
delete $ChildrenByJob{$job}->{$pid};
|
|
}
|
|
}
|
|
|
|
# called when a client dies. clients are user connections, whether management or not.
|
|
# we just want to remove them from the error reporting interface, if
|
|
# they happen to be part of it.
|
|
sub NoteDeadClient {
|
|
my Client $client = $_[1];
|
|
delete $ErrorsTo{$client->{fd}};
|
|
}
|
|
|
|
# called when Mgd's error function fires while we're in the parent;
|
|
# it's pretty simple: we just broadcast the message to any clients listening
|
|
# for errors.
|
|
sub NoteError {
|
|
my Client $client;
|
|
return unless %ErrorsTo;
|
|
|
|
my $msg = ":: ${$_[1]}\r\n";
|
|
foreach $client (values %ErrorsTo) {
|
|
$client->write(\$msg);
|
|
}
|
|
}
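# Clients added to %ErrorsTo via !watch see each error line prefixed
# with "::"; an illustrative line (job/pid/fid are made up) would be:
#     :: [replicate(3044)] unable to replicate fid 1234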
|
|
|
|
# take a new connection that we know is from one of our children, but
|
|
# we're not sure what type of child, so just set it in read mode until
|
|
# they tell us what they are
|
|
sub RegisterWorkerConn {
|
|
my WorkerConn $worker = $_[1];
|
|
$worker->watch_read(1);
|
|
}
|
|
|
|
# take a new worker and note that it's a worker and ready to be used
|
|
# for commands. this is called when workers connect to the frontend.
|
|
sub RegisterQueryWorker {
|
|
# basically take the worker, mark it as a worker, enqueue it,
|
|
# and then try to process the outstanding queues
|
|
my WorkerConn $worker = $_[1];
|
|
Frontend->EnqueueQueryWorker($worker);
|
|
}
|
|
|
|
# puts a worker back in the queue, deleting any outstanding jobs in
|
|
# the mapping list for this fd.
|
|
sub EnqueueQueryWorker {
|
|
# first arg is class, second is worker
|
|
my WorkerConn $worker = $_[1];
|
|
delete $Mappings{$worker->{fd}};
|
|
|
|
# see if we need to kill off some workers
|
|
if (Mgd::job_needs_reduction('queryworker')) {
|
|
Mgd::error("Reducing queryworker headcount by 1.");
|
|
Frontend->AskWorkerToDie($worker);
|
|
return;
|
|
}
|
|
|
|
# must be okay, so put it in the queue
|
|
push @QueryWorkerQueue, $worker;
|
|
Frontend->ProcessQueues;
|
|
}
|
|
|
|
# if we need to kill off a worker, this function takes in the WorkerConn
|
|
# object, tells it to die, marks us as having requested its death, and decrements
|
|
# the count of running jobs.
|
|
sub AskWorkerToDie {
|
|
my WorkerConn $worker = $_[1];
|
|
$worker->write("shutdown\r\n");
|
|
Mgd::note_pending_death($worker->job, $worker->pid);
|
|
}
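# The other half of this handshake lives in the workers: on reading
# "shutdown" a queryworker exits immediately (see QueryWorker's
# process_line below), and note_pending_death presumably tells the
# respawn logic elsewhere in Mgd not to treat this exit as a crash.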
|
|
|
|
# kill bored query workers so we can get down to the level requested. this
|
|
# continues until no further reduction is needed or no idle workers remain.
|
|
sub CullQueryWorkers {
|
|
while (@QueryWorkerQueue && Mgd::job_needs_reduction('queryworker')) {
|
|
my WorkerConn $worker = shift @QueryWorkerQueue;
|
|
Frontend->AskWorkerToDie($worker);
|
|
}
|
|
}
|
|
|
|
# called when we get a response from a worker. this reenqueues the
|
|
# worker so it can handle another request, and passes the answer
|
|
# back on to the client.
|
|
sub HandleQueryWorkerResponse {
|
|
return Mgd::error("Frontend (Child) got worker response: $_[2]") if $IsChild;
|
|
|
|
# got a response from a worker
|
|
my WorkerConn $worker = $_[1];
|
|
return unless $worker && $Mappings{$worker->{fd}};
|
|
|
|
# get the client we're working with (if any)
|
|
my Client $client = $Mappings{$worker->{fd}}->[0];
|
|
|
|
# if we have no client, then we just got a standard message from
|
|
# the queryworker and need to pass it up the line
|
|
return Frontend->HandleChildRequest($worker, $_[2]) if !$client;
|
|
|
|
# at this point it was a command response, but if the client has gone
|
|
# away, just reenqueue this query worker
|
|
return Frontend->EnqueueQueryWorker($worker) if $client->{closed};
|
|
|
|
# <numeric id> [client-side time to complete] <response>
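# e.g. an OK response with a timing field might look like (values are
# illustrative only):
#     3041-18 0.0031 OK paths=1&path1=http://stor1/dev2/0/000/000/0000001234.fid
# while the second branch below handles responses without the float.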
|
|
my ($time, $id, $res);
|
|
if ($_[2] =~ /^(\d+-\d+)\s+(\d+\.\d+)\s+(.+)$/) {
|
|
# save time and response for use later
|
|
($id, $time, $res) = ($1, $2, $3);
|
|
} elsif ($_[2] =~ /^(\d+-\d+)\s(.+)$/) {
|
|
# didn't match, must be in a different format?
|
|
($id, $time, $res) = ($1, 'undef', $2);
|
|
}
|
|
|
|
# if the response id doesn't match the request we assigned this worker, bail
|
|
unless ($id eq "$worker->{pid}-$worker->{reqid}") {
|
|
Mgd::error("Worker responded with id $id, expected $worker->{pid}-$worker->{reqid}, killing");
|
|
$client->close('worker_mismatch');
|
|
return Frontend->AskWorkerToDie($worker);
|
|
}
|
|
|
|
# now time this interval and add to @RecentQueries
|
|
my $tinterval = Time::HiRes::tv_interval([$Mappings{$worker->{fd}}->[2]]);
|
|
push @RecentQueries, sprintf("%s %.4f %s", $Mappings{$worker->{fd}}->[1], $tinterval, $time);
|
|
shift @RecentQueries if scalar(@RecentQueries) > 50;
|
|
|
|
# send text to client, put worker back in queue
|
|
$client->write("$res\r\n");
|
|
Frontend->EnqueueQueryWorker($worker);
|
|
}
|
|
|
|
# called from various spots to empty the queues of available pairs.
|
|
sub ProcessQueues {
|
|
return if $IsChild;
|
|
|
|
# try to match up a client with a worker
|
|
while (@QueryWorkerQueue && @ClientQueue) {
|
|
# get client that isn't closed
|
|
my $clref;
|
|
while (@ClientQueue) {
|
|
$clref = shift @ClientQueue;
|
|
if (!defined $clref || $clref->[0]->{closed}) {
|
|
$clref = undef;
|
|
next;
|
|
}
|
|
|
|
# if we get here the client is valid
|
|
last;
|
|
}
|
|
next unless $clref;
|
|
|
|
# get worker and make sure it's not closed already
|
|
my WorkerConn $worker = shift @QueryWorkerQueue;
|
|
if (!defined $worker || $worker->{closed}) {
|
|
unshift @ClientQueue, $clref;
|
|
next;
|
|
}
|
|
|
|
# put in mapping and send data to worker
|
|
push @$clref, Time::HiRes::gettimeofday();
|
|
$Mappings{$worker->{fd}} = $clref;
|
|
|
|
# increment our counter so we know which request id this command goes out under
|
|
$worker->{reqid}++;
|
|
|
|
$worker->write("$worker->{pid}-$worker->{reqid} $clref->[1]\r\n");
|
|
$worker->watch_read(1);
|
|
}
|
|
}
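# Framing sketch for the line written to the worker above (values are
# illustrative): "<pid>-<reqid> cmd <client ip> <query>", e.g.
#     3041-19 cmd 10.0.0.5 get_paths domain=testdom&key=photo.jpg
# The worker echoes the same "<pid>-<reqid>" prefix back on its
# response, which HandleQueryWorkerResponse checks above.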
|
|
|
|
# send short descriptions of commands we support to the user
|
|
sub SendHelp {
|
|
my Client $client = $_[1];
|
|
|
|
# not supported yet
|
|
#my $whaton = $_[2];
|
|
|
|
# send general purpose help
|
|
$client->write(<<HELP);
|
|
Welcome to mogilefsd's built-in help system. Available commands:
|
|
|
|
!recent Recently executed queries and how long they took.
|
|
!queue Queries that are pending execution.
|
|
!stats General stats on what we're up to.
|
|
!watch Observe errors/messages from children.
|
|
!jobs Outstanding job counts, desired level, and pids.
|
|
!shutdown IMMEDIATELY kill all of mogilefsd. IMMEDIATELY.
|
|
|
|
!replication
|
|
See the replication status. Output format:
|
|
<domain> <class> <devcount> <files>
|
|
|
|
!to <job class> <message>
|
|
Send <message> to all workers of <job class>.
|
|
Mostly used for debugging.
|
|
|
|
!want <count> <job class>
|
|
Alter the level of workers of this class desired.
|
|
Example: !want 20 queryworker, !want 3 replicate.
|
|
See !jobs for what jobs are available.
|
|
|
|
More to come...
|
|
.
|
|
HELP
|
|
|
|
}
|
|
|
|
# called when a client sends us text. we just create a job for
|
|
# it and then call ProcessQueues.
|
|
sub HandleClientRequest {
|
|
return Mgd::error("Frontend (Child) got request from client: $_[2]") if $IsChild;
|
|
|
|
# if it's just 'help', 'h', '?', or something, do that
|
|
if ((substr($_[2], 0, 1) eq '?') || ($_[2] eq 'help') || ($_[2] eq '')) {
|
|
Frontend->SendHelp($_[1]);
|
|
return;
|
|
}
|
|
|
|
# quick check to see if we, the parent, should handle this
|
|
if (substr($_[2], 0, 1) eq '!') {
|
|
my Client $client = $_[1];
|
|
my ($cmd, $args) = ($_[2] =~ m/^!(.+?)(?:\s+(.+))?$/);
|
|
|
|
my @out;
|
|
if ($cmd =~ /^stats$/) {
|
|
# print out some stats on the queues
|
|
my $uptime = time - $Mgd::starttime;
|
|
my $ccount = scalar(@ClientQueue);
|
|
my $wcount = scalar(@QueryWorkerQueue);
|
|
my $ipcount = scalar(keys %Mappings);
|
|
push @out, "uptime $uptime",
|
|
"pending_queries $ccount",
|
|
"processing_queries $ipcount",
|
|
"bored_queryworkers $wcount",
|
|
map { "$_ $Stats{$_}" } sort keys %Stats;
|
|
|
|
} elsif ($cmd =~ /^repl/) {
|
|
Mgd::validate_dbh();
|
|
my $dbh = Mgd::get_dbh();
|
|
my $mdcs = Mgd::get_mindevcounts();
|
|
foreach my $dmid (sort keys %$mdcs) {
|
|
my $dmname = Mgd::domain_name($dmid);
|
|
foreach my $classid (sort keys %{$mdcs->{$dmid}}) {
|
|
my $min = $mdcs->{$dmid}->{$classid};
|
|
next unless $min > 1;
|
|
|
|
my $classname = Mgd::class_name($dmid, $classid) || '_default';
|
|
foreach my $ct (1..$min-1) {
|
|
my $count = $dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
|
|
undef, $dmid, $classid, $ct);
|
|
push @out, "$dmname $classname $ct $count";
|
|
}
|
|
}
|
|
}
|
|
|
|
} elsif ($cmd =~ /^shutdown/) {
|
|
print "User requested shutdown: $args\n";
|
|
kill 15, $$; # kill us, that kills our kids
|
|
|
|
} elsif ($cmd =~ /^jobs/) {
|
|
# dump out a list of running jobs and pids
|
|
foreach my $job (sort keys %ChildrenByJob) {
|
|
my $ct = scalar(keys %{$ChildrenByJob{$job}});
|
|
push @out, "$job count $ct";
|
|
push @out, "$job desired $jobs{$job}->[0]";
|
|
push @out, "$job pids " . join(' ', sort { $a <=> $b } keys %{$ChildrenByJob{$job}});
|
|
}
|
|
|
|
} elsif ($cmd =~ /^want/) {
|
|
# !want <count> <jobclass>
|
|
# set the new desired staffing level for a class
|
|
if ($args =~ /^(\d+)\s+(\S+)/) {
|
|
my ($count, $job) = ($1, $2);
|
|
|
|
# validate count
|
|
$count = 0 if $count < 0;
|
|
# FIXME ...add an upper limit?
|
|
|
|
# now make sure it's a real job
|
|
if (defined $jobs{$job}) {
|
|
$jobs{$job}->[0] = $count;
|
|
$Mgd::allkidsup = 0;
|
|
push @out, "Now desiring $count children doing '$job'.";
|
|
|
|
# if it's the queryworker level we just lowered, cull idle ones now
|
|
Frontend->CullQueryWorkers
|
|
if $job eq 'queryworker';
|
|
} else {
|
|
my $classes = join(", ", sort keys %jobs);
|
|
push @out, "ERROR: Invalid class '$job'. Valid classes: $classes";
|
|
}
|
|
} else {
|
|
push @out, "ERROR: usage: !want <count> <jobclass>";
|
|
}
|
|
|
|
} elsif ($cmd =~ /^to/) {
|
|
# !to <jobclass> <message>
|
|
# sends <message> to all children of <jobclass>
|
|
if ($args =~ /^(\S+)\s+(.+)/) {
|
|
my $ct = Frontend->SendToChildrenByJob($1, $2);
|
|
push @out, "Message sent to $ct children.";
|
|
|
|
} else {
|
|
push @out, "ERROR: usage: !to <jobclass> <message>";
|
|
}
|
|
|
|
} elsif ($cmd =~ /^queue/ || $cmd =~ /^pend/) {
|
|
foreach my $clq (@ClientQueue) {
|
|
push @out, $clq->[1];
|
|
}
|
|
|
|
} elsif ($cmd =~ /^watch/) {
|
|
if (delete $ErrorsTo{$client->{fd}}) {
|
|
push @out, "Removed you from watcher list.";
|
|
} else {
|
|
$ErrorsTo{$client->{fd}} = $client;
|
|
push @out, "Added you to watcher list.";
|
|
}
|
|
|
|
} elsif ($cmd =~ /^recent/) {
|
|
# show the most recent N queries
|
|
push @out, @RecentQueries;
|
|
|
|
} else {
|
|
Frontend->SendHelp($client, $args);
|
|
}
|
|
$client->write(join("\r\n", @out) . "\r\n") if @out;
|
|
$client->write(".\r\n");
|
|
return;
|
|
}
|
|
|
|
# just push the input onto the client queue
|
|
$Stats{queries}++;
|
|
push @ClientQueue, [ $_[1], "cmd " . ($_[1]->peer_ip_string || '0.0.0.0') . " $_[2]" ];
|
|
Frontend->ProcessQueues;
|
|
}
|
|
|
|
# a child has contacted us with some command/status/something.
|
|
sub HandleChildRequest {
|
|
return Mgd::error("Frontend (Child) got request from child: $_[2]") if $IsChild;
|
|
|
|
# if they have no job set, then their first line tells us what job they are,
|
|
# not a command. they also specify their pid, just so we know what
|
|
# connection goes with what pid, in case it's ever useful information.
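# e.g. a replicator's registration line might look like (pid is illustrative):
#     3044 replicate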
|
|
my WorkerConn $child = $_[1];
|
|
unless (defined $child->job) {
|
|
my ($pid, $job) = ($_[2] =~ /^(\d+)\s+(.+)/);
|
|
$child->job($job);
|
|
$child->pid($pid);
|
|
|
|
# now do any special case startup
|
|
if ($job eq 'queryworker') {
|
|
Frontend->RegisterQueryWorker($child);
|
|
}
|
|
|
|
# add to normal list
|
|
$ChildrenByJob{$job}->{$child->pid} = $child;
|
|
return;
|
|
}
|
|
|
|
# see if we should downsize this child
|
|
my $check_job = sub {
|
|
if (Mgd::job_needs_reduction($child->job)) {
|
|
Mgd::error("Reducing headcount of " . $child->job . " job by 1.");
|
|
Frontend->AskWorkerToDie($child);
|
|
} else {
|
|
$child->drain_queue;
|
|
}
|
|
};
|
|
|
|
# at this point we've got a command of some sort
|
|
my $cmd = $_[2];
|
|
if ($cmd =~ /^error (.+)$/i) {
|
|
# pass it on to our error handler, prefaced with the child's job
|
|
Mgd::error("[" . $child->job . "(" . $child->pid . ")] $1");
|
|
|
|
} elsif ($cmd =~ /^queue/) {
|
|
# send out what we have queued up for it
|
|
$child->drain_queue;
|
|
|
|
} elsif ($cmd =~ /^del_i_looped/) {
|
|
$check_job->();
|
|
|
|
} elsif ($cmd =~ /^monitor_ping/) {
|
|
$check_job->();
|
|
|
|
} elsif ($cmd =~ /^reaper_ping/) {
|
|
$check_job->();
|
|
|
|
} elsif ($cmd =~ /^repl_ping/) {
|
|
$check_job->();
|
|
|
|
} elsif ($cmd =~ /^repl_unreachable (\d+)/) {
|
|
# announce to the other replicators that this fid can't be reached, but note
|
|
# that we don't actually drain the queue to the requestor, as the replicator
|
|
# isn't in a place where it can accept a queue drain right now.
|
|
Frontend->SendToChildrenByJob('replicate', "repl_unreachable $1", $child);
|
|
|
|
} elsif ($cmd =~ /^repl_i_did (\d+)/) {
|
|
my $fid = $1;
|
|
|
|
# announce to the other replicators that this fid was done and then drain the
|
|
# queue to this person.
|
|
Frontend->SendToChildrenByJob('replicate', "repl_was_done $fid", $child);
|
|
$check_job->();
|
|
|
|
} else {
|
|
# unknown command
|
|
Mgd::error("Unknown command [$_[2]] from child; job=" . $child->job);
|
|
}
|
|
}
|
|
|
|
# given a job class, and a message, send it to all children of that job. returns
|
|
# the number of children the message was sent to.
|
|
# arguments: ( jobclass, message, [ child ] )
|
|
# if child is specified, the message will be sent to members of the job class that
|
|
# aren't that child. so you can exclude the one that originated the message.
|
|
sub SendToChildrenByJob {
|
|
my $childref = $ChildrenByJob{$_[1]};
|
|
return 0 unless defined $childref && %$childref;
|
|
my $msg = $_[2];
|
|
|
|
foreach my $child (values %$childref) {
|
|
# ignore the child specified as the third arg if one is sent
|
|
next if defined $_[3] && $_[3] == $child;
|
|
|
|
# send the message to this child
|
|
$child->enqueue_line($msg);
|
|
}
|
|
return scalar(keys %$childref);
|
|
}
|
|
|
|
# called when we notice that a worker has bit it. we might have to restart a
|
|
# job that they had been working on.
|
|
sub NoteDeadWorkerConn {
|
|
return if $IsChild;
|
|
|
|
# get parms and error check
|
|
my WorkerConn $worker = $_[1];
|
|
return unless $worker;
|
|
|
|
# if there's a mapping for this worker's fd, they had a job that didn't get done
|
|
if ($Mappings{$worker->{fd}}) {
|
|
# unshift, since this one already went through the queue once
|
|
unshift @ClientQueue, $Mappings{$worker->{fd}};
|
|
delete $Mappings{$worker->{fd}};
|
|
|
|
# now try to get it processing again
|
|
Frontend->ProcessQueues;
|
|
}
|
|
}
|
|
|
|
|
|
#####################################################################
|
|
### W O R K E R C L A S S
|
|
### Class that handles all of the actions that a worker can take.
|
|
#####################################################################
|
|
package QueryWorker;
|
|
|
|
use fields qw{sock querystarttime reqid};
|
|
|
|
sub new {
|
|
my QueryWorker $self = shift;
|
|
|
|
$self = fields::new($self) unless ref $self;
|
|
$self->{sock} = shift;
|
|
$self->{querystarttime} = undef;
|
|
$self->{reqid} = undef;
|
|
|
|
return $self;
|
|
}
|
|
|
|
sub process_line {
|
|
my QueryWorker $self = shift;
|
|
my $lineref = shift;
|
|
|
|
# see what kind of command this is
|
|
return $self->err_line('unknown_command')
|
|
unless $$lineref =~ /^(\d+-\d+)?\s*(\S+)\s+(\S+)\s*(.*)/;
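# Illustrative breakdown of an incoming line (values are made up):
#     "3041-19 cmd 10.0.0.5 get_paths domain=testdom&key=photo.jpg"
# gives $1 = request id (absent on internal worker-to-worker lines),
# $2 = "cmd", $3 = the client ip, and $4 = the query itself.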
|
|
$self->{reqid} = $1 || undef;
|
|
my ($cmd, $line) = ($2, $4);
|
|
|
|
# set global variables for zone determination
|
|
$client_ip = $3;
|
|
$force_alt_zone = 0;
|
|
|
|
# some basic commands we support
|
|
if ($cmd eq 'echo') {
|
|
Mgd::send_to_parent($line);
|
|
return;
|
|
} elsif ($cmd eq 'shutdown') {
|
|
exit 0;
|
|
}
|
|
|
|
# fallback to normal command handling
|
|
if ($line =~ /^(\w+)\s*(.*)/) {
|
|
my ($cmd, $args) = ($1, $2);
|
|
$cmd = lc($cmd);
|
|
|
|
no strict 'refs';
|
|
$self->{querystarttime} = Time::HiRes::gettimeofday();
|
|
my $cmd_handler = *{"cmd_$cmd"}{CODE};
|
|
if ($cmd_handler) {
|
|
my $args = decode_url_args(\$args);
|
|
$force_alt_zone = 1 if ($args->{zone} || '') eq 'alt';  # avoid uninitialized warning when no zone is given
|
|
$cmd_handler->($self, $args);
|
|
return;
|
|
}
|
|
}
|
|
|
|
return $self->err_line('unknown_command');
|
|
}
|
|
|
|
# returns 0 on error, or dmid of domain
|
|
sub check_domain {
|
|
my QueryWorker $self = shift;
|
|
my $args = shift;
|
|
|
|
return $self->err_line("no_domain") unless length($args->{domain});
|
|
|
|
# validate domain
|
|
my $dmid = Mgd::domain_id($args->{domain}) or
|
|
return $self->err_line("unreg_domain");
|
|
|
|
return $dmid;
|
|
}
|
|
|
|
sub cmd_sleep {
|
|
my QueryWorker $self = shift;
|
|
my $args = shift;
|
|
sleep($args->{duration} || 10);
|
|
return $self->ok_line;
|
|
}
|
|
|
|
sub cmd_create_open {
|
|
my QueryWorker $self = shift;
|
|
my $args = shift;
|
|
|
|
# validate parameters
|
|
my $dmid = $self->check_domain($args) or return 0;
|
|
my $key = $args->{key} || "";
|
|
my $multi = $args->{multi_dest} ? 1 : 0;
|
|
|
|
# get DB handle
|
|
my $dbh = Mgd::get_dbh or
|
|
return $self->err_line("nodb");
|
|
|
|
# figure out what classid this file is for
|
|
my $class = $args->{class};
|
|
my $classid = 0;
|
|
if (length($class)) {
|
|
# TODO: cache this
|
|
$classid = $dbh->selectrow_array("SELECT classid FROM class ".
|
|
"WHERE dmid=? AND classname=?",
|
|
undef, $dmid, $class)
|
|
or return $self->err_line("unreg_class");
|
|
}
|
|
|
|
# find a device (or up to three, for multi_dest) to put this file on that has 100MB free.
|
|
my (@dests, @hosts);
|
|
my $devs = Mgd::get_device_summary();
|
|
while (scalar(@dests) < ($multi ? 3 : 1)) {
|
|
my $devid = Mgd::find_deviceid(
|
|
random => 1,
|
|
weight_by_free => 1,
|
|
not_on_hosts => \@hosts,
|
|
);
|
|
|
|
last unless defined $devid;
|
|
|
|
push @dests, $devid;
|
|
push @hosts, $devs->{$devid}->{hostid};
|
|
}
|
|
return $self->err_line("no_devices") unless @dests;
|
|
|
|
# setup the new mapping. we store the devices that we picked for
|
|
# this file in here, knowing that they might not be used. create_close
|
|
# is responsible for actually mapping in file_on.
|
|
$dbh->do("INSERT INTO tempfile SET ".
|
|
" fid=NULL, dmid=?, dkey=?, classid=?, createtime=UNIX_TIMESTAMP(), devids=?",
|
|
undef, $dmid, $key, $classid, join(',', @dests));
|
|
return $self->err_line("db_error") if $dbh->err;
|
|
my $fid = $dbh->{mysql_insertid}; # FIXME: mysql-ism
|
|
return $self->err_line("db_error") unless $fid > 0;
|
|
|
|
# original single path support
|
|
return $self->ok_line({
|
|
fid => $fid,
|
|
devid => $dests[0],
|
|
path => Mgd::make_path($dests[0], $fid),
|
|
}) unless $multi;
|
|
|
|
# multiple path support
|
|
my $ct = 0;
|
|
my $res = {};
|
|
foreach my $devid (@dests) {
|
|
$ct++;
|
|
$res->{"devid_$ct"} = $devid;
|
|
$res->{"path_$ct"} = Mgd::make_path($devid, $fid);
|
|
}
|
|
$res->{fid} = $fid;
|
|
$res->{dev_count} = $ct;
|
|
return $self->ok_line($res);
|
|
}
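# Rough client-side flow these two commands support (paths and ids
# below are illustrative, not real output):
#   1. create_open domain=testdom&key=photo.jpg
#        -> OK devid=2&fid=123&path=<storage path or URL>
#   2. the client writes the file bytes to the returned path
#   3. create_close domain=testdom&key=photo.jpg&fid=123&devid=2&path=...
#        -> OK   (file row created, tempfile row removed; see below)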
|
|
|
|
sub cmd_create_close {
|
|
my QueryWorker $self = shift;
|
|
my $args = shift;
|
|
|
|
# validate parameters
|
|
my $dmid = $self->check_domain($args) or return 0;
|
|
my $key = $args->{key};
|
|
|
|
my $fid = $args->{fid} or return $self->err_line("no_fid");
|
|
my $devid = $args->{devid} or return $self->err_line("no_devid");
|
|
my $path = $args->{path} or return $self->err_line("no_path");
|
|
|
|
# is the provided path what we'd expect for this fid/devid?
|
|
return $self->err_line("bogus_args")
|
|
unless $path eq Mgd::make_path($devid, $fid);
|
|
|
|
# get DB handle
|
|
my $dbh = Mgd::get_dbh or
|
|
return $self->err_line("nodb");
|
|
|
|
# find the temp file we're closing and making real
|
|
my $trow = $dbh->selectrow_hashref("SELECT classid, dmid, dkey ".
|
|
"FROM tempfile WHERE fid=?",
|
|
undef, $fid);
|
|
return $self->err_line("no_temp_file") unless $trow;
|
|
|
|
# if a temp file is closed without a provided-key, that means to
|
|
# delete it.
|
|
unless (length($key)) {
|
|
# add to to-delete list
|
|
$dbh->do("REPLACE INTO file_to_delete SET fid=?", undef, $fid);
|
|
$dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fid);
|
|
return $self->ok_line;
|
|
}
|
|
|
|
# see if we have a fid for this key already
|
|
my $old_file = Mgd::key_filerow($dbh, $dmid, $key);
|
|
if ($old_file) {
|
|
# add to to-delete list
|
|
$dbh->do("REPLACE INTO file_to_delete SET fid=?", undef, $old_file->{fid});
|
|
$dbh->do("DELETE FROM file WHERE fid=?", undef, $old_file->{fid});
|
|
}
|
|
|
|
# get size of file and verify that it matches what we were given, if anything
|
|
my $size = Mgd::get_file_size($path);
|
|
return $self->err_line("size_mismatch", "Expected: $args->{size}; actual: $size; path: $path")
|
|
if $args->{size} && ($args->{size} != $size);
|
|
|
|
# TODO: check for EIO?
|
|
return $self->err_line("empty_file") unless $size;
|
|
|
|
# insert file_on row
|
|
$dbh->do("INSERT IGNORE INTO file_on SET fid = ?, devid = ?", undef, $fid, $devid);
|
|
return $self->err_line("db_error") if $dbh->err;
|
|
|
|
my $rv = $dbh->do("REPLACE INTO file ".
|
|
"SET ".
|
|
" fid=?, dmid=?, dkey=?, length=?, ".
|
|
" classid=?, devcount=0", undef,
|
|
$fid, $dmid, $key, $size, $trow->{classid});
|
|
return $self->err_line("db_error") unless $rv;
|
|
|
|
$dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fid);
|
|
|
|
if (Mgd::update_fid_devcount($fid)) {
|
|
return $self->ok_line();
|
|
} else {
|
|
# FIXME: handle this better
|
|
return $self->err_line("db_error");
|
|
}
|
|
}
|
|
|
|
sub cmd_delete {
|
|
my QueryWorker $self = shift;
|
|
my $args = shift;
|
|
|
|
# validate parameters
|
|
my $dmid = $self->check_domain($args) or return 0;
|
|
my $key = $args->{key};
|
|
return $self->err_line("no_key") unless length($key);
|
|
|
|
# get DB handle
|
|
my $dbh = Mgd::get_dbh or
|
|
return $self->err_line("nodb");
|
|
|
|
# is this fid still owned by this key?
|
|
my $fid = $dbh->selectrow_array("SELECT fid FROM file WHERE dmid=? AND dkey=?",
|
|
undef, $dmid, $key);
|
|
return $self->err_line("unknown_key") unless $fid;
|
|
|
|
$dbh->do("DELETE FROM file WHERE fid=?", undef, $fid);
|
|
$dbh->do("REPLACE INTO file_to_delete SET fid=?", undef, $fid);
|
|
|
|
return $self->ok_line();
|
|
|
|
}
|
|
|
|
sub cmd_list_keys {
|
|
my QueryWorker $self = shift;
|
|
my $args = shift;
|
|
|
|
# validate parameters
|
|
my $dmid = $self->check_domain($args) or return 0;
|
|
my ($prefix, $after, $limit) = ($args->{prefix}, $args->{after}, $args->{limit});
|
|
return $self->err_line("no_key") unless $prefix;
|
|
|
|
# now validate that after matches prefix
|
|
return $self->err_line('after_mismatch')
|
|
if $after && $after !~ /^$prefix/;
|
|
|
|
# verify there are no % or \ characters
|
|
return $self->err_line('invalid_chars')
|
|
if $prefix =~ /[%\\]/;
|
|
|
|
# escape underscores
|
|
$prefix =~ s/_/\\_/g;
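# '_' is a single-character wildcard in SQL LIKE, so an (illustrative)
# prefix of "img_2005" would otherwise also match "imgX2005"; escaping
# makes the underscore literal.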
|
|
|
|
# now fix the input... prefix always ends with a % so that it works
|
|
# in a LIKE call, and after is either blank or something
|
|
$prefix .= '%';
|
|
$after ||= '';
|
|
$limit ||= 1000;
|
|
$limit += 0;
|
|
|
|
# get DB handle
|
|
my $dbh = Mgd::get_dbh or
|
|
return $self->err_line("nodb");
|
|
|
|
# now select out our keys
|
|
my $keys = $dbh->selectcol_arrayref
|
|
('SELECT dkey FROM file WHERE dmid = ? AND dkey LIKE ? AND dkey > ? ' .
|
|
"ORDER BY dkey LIMIT $limit", undef, $dmid, $prefix, $after);
|
|
|
|
# if we got nothing, say so
|
|
return $self->err_line('none_match') unless $keys && @$keys;
|
|
|
|
# construct the output and send
|
|
my $ret = { key_count => 0, next_after => '' };
|
|
foreach my $key (@$keys) {
|
|
$ret->{key_count}++;
|
|
$ret->{next_after} = $key
|
|
if $key gt $ret->{next_after};
|
|
$ret->{"key_$ret->{key_count}"} = $key;
|
|
}
|
|
return $self->ok_line($ret);
|
|
}
|
|
|
|
sub cmd_rename {
|
|
my QueryWorker $self = shift;
|
|
my $args = shift;
|
|
|
|
# validate parameters
|
|
my $dmid = $self->check_domain($args) or return 0;
|
|
my ($fkey, $tkey) = ($args->{from_key}, $args->{to_key});
|
|
return $self->err_line("no_key") unless $fkey && $tkey;
|
|
|
|
# get DB handle
|
|
my $dbh = Mgd::get_dbh or
|
|
return $self->err_line("nodb");
|
|
|
|
# rename the file
|
|
my $ct = $dbh->do('UPDATE file SET dkey = ? WHERE dmid = ? AND dkey = ?',
|
|
undef, $tkey, $dmid, $fkey);
|
|
return $self->err_line("key_exists") if $dbh->err;
|
|
return $self->err_line("unknown_key") unless $ct > 0;
|
|
|
|
return $self->ok_line();
|
|
}
|
|
|
|
sub cmd_get_hosts {
|
|
my QueryWorker $self = shift;
|
|
my $args = shift;
|
|
|
|
my $dbh = Mgd::get_dbh()
|
|
or return $self->err_line("nodb");
|
|
|
|
Mgd::check_host_cache();
|
|
|
|
my $ret = { hosts => 0 };
|
|
while (my ($hostid, $row) = each %Mgd::cache_host) {
|
|
next if defined $args->{hostid} && $hostid != $args->{hostid};
|
|
|
|
$ret->{hosts}++;
|
|
while (my ($key, $val) = each %$row) {
|
|
$ret->{"host$ret->{hosts}_$key"} = $val;
|
|
}
|
|
}
|
|
|
|
return $self->ok_line($ret);
|
|
}
|
|
|
|
sub cmd_get_devices {
|
|
my QueryWorker $self = shift;
|
|
my $args = shift;
|
|
|
|
my $dbh = Mgd::get_dbh()
|
|
or return $self->err_line("nodb");
|
|
|
|
my $devs = Mgd::get_device_summary();
|
|
|
|
my $ret = { devices => 0 };
|
|
while (my ($devid, $row) = each %$devs) {
|
|
next if defined $args->{devid} && $devid != $args->{devid};
|
|
|
|
$ret->{devices}++;
|
|
while (my ($key, $val) = each %$row) {
|
|
$ret->{"dev$ret->{devices}_$key"} = $val;
|
|
}
|
|
}
|
|
|
|
return $self->ok_line($ret);
|
|
}
|
|
|
|
sub cmd_create_domain {
|
|
my QueryWorker $self = shift;
|
|
my $args = shift;
|
|
|
|
my $dbh = Mgd::get_dbh()
|
|
or return $self->err_line("nodb");
|
|
|
|
my $domain = $args->{domain};
|
|
return $self->err_line('no_domain') unless length $domain;
|
|
|
|
# FIXME: add some sort of authentication/limitation on this?
|
|
|
|
my $dmid = Mgd::domain_id($domain);
|
|
return $self->err_line('domain_exists') if $dmid;
|
|
|
|
# get the max domain id
|
|
my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain');
|
|
$dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
|
|
undef, $maxid + 1, $domain);
|
|
return $self->err_line('failure') if $dbh->err;
|
|
|
|
# return the domain id we created
|
|
return $self->ok_line({ domain => $domain });
|
|
}
|
|
|
|
sub cmd_create_class {
|
|
my QueryWorker $self = shift;
|
|
my $args = shift;
|
|
|
|
my $dbh = Mgd::get_dbh()
|
|
or return $self->err_line("nodb");
|
|
|
|
my $domain = $args->{domain};
|
|
return $self->err_line('no_domain') unless length $domain;
|
|
|
|
my $class = $args->{class};
|
|
return $self->err_line('no_class') unless length $class;
|
|
|
|
my $mindevcount = $args->{mindevcount}+0;
|
|
return $self->err_line('invalid_mindevcount') unless $mindevcount > 0;
|
|
|
|
# FIXME: add some sort of authentication/limitation on this?
|
|
|
|
my $dmid = Mgd::domain_id($domain);
|
|
return $self->err_line('no_domain') unless $dmid;
|
|
|
|
my $cid = Mgd::class_id($dmid, $class);
|
|
return $self->err_line('class_exists') if $cid && !$args->{update};
|
|
|
|
# update or insert at this point
|
|
if ($args->{update}) {
|
|
# now replace the old class
|
|
$dbh->do("REPLACE INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
|
|
undef, $dmid, $cid, $class, $mindevcount);
|
|
} else {
|
|
# get the max class id in this domain
|
|
my $maxid = $dbh->selectrow_array
|
|
('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid);
|
|
|
|
# now insert the new class
|
|
$dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
|
|
undef, $dmid, $maxid + 1, $class, $mindevcount);
|
|
}
|
|
return $self->err_line('failure') if $dbh->err;
|
|
|
|
# return success
|
|
return $self->ok_line({ class => $class, mindevcount => $mindevcount, domain => $domain });
|
|
}
|
|
|
|
sub cmd_update_class {
|
|
my QueryWorker $self = shift;
|
|
my $args = shift;
|
|
|
|
# simply passes through to create_class with update set
|
|
$self->cmd_create_class({ %$args, update => 1 });
|
|
}
|
|
|
|
sub cmd_get_domains {
|
|
my QueryWorker $self = shift;
|
|
my $args = shift;
|
|
|
|
my $dbh = Mgd::get_dbh()
|
|
or return $self->err_line("nodb");
|
|
|
|
my $domains = $dbh->selectall_arrayref('SELECT dmid, namespace FROM domain');
|
|
|
|
my $ret = {};
|
|
my $outercount = 0;
|
|
foreach my $row (@$domains) {
|
|
$ret->{"domain" . ++$outercount} = $row->[1];
|
|
|
|
# setup the return row for this set of classes
|
|
my $classes = $dbh->selectall_arrayref
|
|
('SELECT classname, mindevcount FROM class WHERE dmid = ?', undef, $row->[0]);
|
|
my $innercount = 0;
|
|
foreach my $irow (@$classes) {
|
|
$ret->{"domain${outercount}class" . ++$innercount . "name"} = $irow->[0];
|
|
$ret->{"domain${outercount}class" . $innercount . "mindevcount"} = $irow->[1];
|
|
}
|
|
|
|
# record the default class and mindevcount
|
|
$ret->{"domain${outercount}class" . ++$innercount . "name"} = 'default';
|
|
$ret->{"domain${outercount}class" . $innercount . "mindevcount"} = $default_mindevcount;
|
|
|
|
$ret->{"domain${outercount}classes"} = $innercount;
|
|
}
|
|
$ret->{"domains"} = $outercount;
|
|
|
|
return $self->ok_line($ret);
|
|
}
|
|
|
|
sub cmd_get_paths {
|
|
my QueryWorker $self = shift;
|
|
my $args = shift;
|
|
|
|
my $key = $args->{key};
|
|
|
|
return $self->err_line("no_key") unless length($key);
|
|
|
|
# validate domain
|
|
my $dmid = $self->check_domain($args) or return 0;
|
|
|
|
# get DB handle
|
|
my $dbh = Mgd::get_dbh or
|
|
return $self->err_line("nodb");
|
|
|
|
my $filerow = Mgd::key_filerow($dbh, $dmid, $key);
|
|
return $self->err_line("unknown_key") unless $filerow;
|
|
|
|
my $fid = $filerow->{fid};
|
|
my $dsum = Mgd::get_device_summary();
|
|
|
|
my $ret = {
|
|
paths => 0,
|
|
};
|
|
|
|
# which devices currently hold a copy of this fid?
|
|
my $devids = $dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
|
|
undef, $fid) || [];
|
|
my $devcount = scalar(@$devids);
|
|
my $idx = int(rand() * $devcount);
|
|
|
|
for (1..$devcount) {
|
|
my $devid = $devids->[($_+$idx) % $devcount];
|
|
my $dev = $dsum->{$devid};
|
|
next unless $dev && $dev->{status} eq "alive";
|
|
my $path = Mgd::make_get_path($devid, $fid);
|
|
next unless $ret->{paths} || $args->{noverify} ||
|
|
(Mgd::get_file_size($path) == $filerow->{length});
|
|
my $n = ++$ret->{paths};
|
|
$ret->{"path$n"} = $path;
|
|
last if $n == 2; # one verified, one likely seems enough for now. time will tell.
|
|
}
|
|
|
|
return $self->ok_line($ret);
|
|
}
|
|
|
|
sub cmd_set_state {
|
|
my QueryWorker $self = shift;
|
|
my $args = shift;
|
|
|
|
# get database handle
|
|
my $ret = {};
|
|
my $dbh = Mgd::get_dbh
|
|
or return $self->err_line('nodb');
|
|
|
|
# figure out what they want to do
|
|
my ($host, $dev, $state) = ($args->{host}, $args->{device}+0, $args->{state});
|
|
return $self->err_line('bad_params')
|
|
unless $host && $dev && ($state =~ /^(?:alive|down|dead)$/);
|
|
|
|
# now get this device's current state and host
|
|
my ($realhost, $curstate) =
|
|
$dbh->selectrow_array('SELECT hostname, device.status FROM host, device ' .
|
|
'WHERE host.hostid = device.hostid AND device.devid = ?',
|
|
undef, $dev);
|
|
|
|
# verify host is the same
|
|
return $self->err_line('host_mismatch')
|
|
unless $realhost eq $host;
|
|
|
|
# make sure the destination state isn't too high
|
|
return $self->err_line('state_too_high')
|
|
if $curstate eq 'dead' && $state eq 'alive';
|
|
|
|
# update the state in the database now
|
|
$dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $dev);
|
|
return $self->err_line('failure') if $dbh->err;
|
|
|
|
# success, state changed
|
|
return $self->ok_line($ret);
|
|
}
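# Transition sketch implied by the checks above: alive <-> down freely,
# anything -> dead, but dead -> alive is refused with state_too_high;
# a dead device has to come back as "down" first.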
|
|
|
|
sub cmd_stats {
|
|
my QueryWorker $self = shift;
|
|
my $args = shift;
|
|
|
|
# get database handle
|
|
my $ret = {};
|
|
my $dbh = Mgd::get_dbh
|
|
or return $self->err_line('nodb');
|
|
|
|
# get names of all domains and classes for use later
|
|
my %classes;
|
|
my $rows = $dbh->selectall_arrayref('SELECT class.dmid, namespace, classid, classname ' .
|
|
'FROM domain, class WHERE class.dmid = domain.dmid');
|
|
foreach my $row (@$rows) {
|
|
$classes{$row->[0]}->{name} = $row->[1];
|
|
$classes{$row->[0]}->{classes}->{$row->[2]} = $row->[3];
|
|
}
|
|
$classes{$_}->{classes}->{0} = 'default'
|
|
foreach keys %classes;
|
|
|
|
# get host and device information with device status
|
|
my %devices;
|
|
$rows = $dbh->selectall_arrayref('SELECT device.devid, hostname, device.status ' .
|
|
'FROM device, host WHERE device.hostid = host.hostid');
|
|
foreach my $row (@$rows) {
|
|
$devices{$row->[0]}->{host} = $row->[1];
|
|
$devices{$row->[0]}->{status} = $row->[2];
|
|
}
|
|
|
|
# if they want replication counts, or asked for everything
|
|
if ($args->{replication} || $args->{all}) {
|
|
# replication stats
|
|
my $stats = $dbh->selectall_arrayref('SELECT dmid, classid, devcount, COUNT(devcount) FROM file GROUP BY 1, 2, 3');
|
|
my $count = 0;
|
|
foreach my $stat (@$stats) {
|
|
$count++;
|
|
$ret->{"replication${count}domain"} = $classes{$stat->[0]}->{name};
|
|
$ret->{"replication${count}class"} = $classes{$stat->[0]}->{classes}->{$stat->[1]};
|
|
$ret->{"replication${count}devcount"} = $stat->[2];
|
|
$ret->{"replication${count}files"} = $stat->[3];
|
|
}
|
|
$ret->{"replicationcount"} = $count;
|
|
}
|
|
|
|
# file statistics (how many files there are and in what domains/classes)
|
|
if ($args->{files} || $args->{all}) {
|
|
my $stats = $dbh->selectall_arrayref('SELECT dmid, classid, COUNT(classid) FROM file GROUP BY 1, 2');
|
|
my $count = 0;
|
|
foreach my $stat (@$stats) {
|
|
$count++;
|
|
$ret->{"files${count}domain"} = $classes{$stat->[0]}->{name};
|
|
$ret->{"files${count}class"} = $classes{$stat->[0]}->{classes}->{$stat->[1]};
|
|
$ret->{"files${count}files"} = $stat->[2];
|
|
}
|
|
$ret->{"filescount"} = $count;
|
|
}
|
|
|
|
# device statistics (how many files are on each device)
|
|
if ($args->{devices} || $args->{all}) {
|
|
my $stats = $dbh->selectall_arrayref('SELECT devid, COUNT(devid) FROM file_on GROUP BY 1');
|
|
my $count = 0;
|
|
foreach my $stat (@$stats) {
|
|
$count++;
|
|
$ret->{"devices${count}id"} = $stat->[0];
|
|
$ret->{"devices${count}host"} = $devices{$stat->[0]}->{host};
|
|
$ret->{"devices${count}status"} = $devices{$stat->[0]}->{status};
|
|
$ret->{"devices${count}files"} = $stat->[1];
|
|
}
|
|
$ret->{"devicescount"} = $count;
|
|
}
|
|
|
|
# FIXME: DO! add other stats
|
|
|
|
return $self->ok_line($ret);
|
|
}
|
|
|
|
sub ok_line {
|
|
my QueryWorker $self = shift;
|
|
|
|
my $delay = '';
|
|
if ($self->{querystarttime}) {
|
|
$delay = sprintf("%.4f ", Time::HiRes::tv_interval([ $self->{querystarttime} ]));
|
|
$self->{querystarttime} = undef;
|
|
}
|
|
|
|
my $id = defined $self->{reqid} ? "$self->{reqid} " : '';
|
|
|
|
my $args = shift;
|
|
my $argline = join('&', map { eurl($_) . "=" . eurl($args->{$_}) } keys %$args);
|
|
$self->{sock}->write("${id}${delay}OK $argline\r\n");
|
|
return 1;
|
|
}
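# Response framing sketch (values illustrative): for a request tagged
# "3041-19" that took about 2ms and returned two paths, the line written
# back to the parent would be:
#     3041-19 0.0021 OK paths=2&path1=...&path2=...
# err_line below mirrors this with "ERR <code> <urlencoded text>".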
|
|
|
|
# first argument: error code.
|
|
# second argument: optional error text. text will be taken from code if no text provided.
|
|
sub err_line {
|
|
my QueryWorker $self = shift;
|
|
my $err_code = shift;
|
|
my $err_text = shift || {
|
|
'unknown_command' => "Unknown server command",
|
|
'no_domain' => "No domain provided",
|
|
'no_class' => "No class provided",
|
|
'unreg_domain' => "Domain name invalid/not found",
|
|
'class_exists' => "That class already exists in that domain",
|
|
'domain_exists' => "That domain already exists",
|
|
'invalid_mindevcount' => "The mindevcount must be at least 1",
|
|
'bad_params' => "Invalid parameters to command; please see documentation",
|
|
'host_mismatch' => "The device specified doesn't belong to the host specified",
|
|
'state_too_high' => "Status cannot go from dead to alive; must use down",
|
|
'failure' => "Operation failed",
|
|
'key_exists' => "Target key name already exists; can't overwrite.",
|
|
'none_match' => "No keys match that pattern and after-value (if any).",
|
|
'after_mismatch' => "Pattern does not match the after-value?",
|
|
'invalid_chars' => "Patterns must not contain backslashes (\\) or percent signs (%).",
|
|
}->{$err_code};
|
|
|
|
my $delay = '';
|
|
if ($self->{querystarttime}) {
|
|
$delay = sprintf("%.4f ", Time::HiRes::tv_interval([ $self->{querystarttime} ]));
|
|
$self->{querystarttime} = undef;
|
|
}
|
|
|
|
my $id = defined $self->{reqid} ? "$self->{reqid} " : '';
|
|
|
|
$self->{sock}->write("${id}${delay}ERR $err_code " . eurl($err_text) . "\r\n");
|
|
return 0;
|
|
}
|
|
|
|
sub eurl
|
|
{
|
|
my $a = $_[0];
|
|
$a =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/uc sprintf("%%%02x",ord($1))/eg;
|
|
$a =~ tr/ /+/;
|
|
return $a;
|
|
}
|
|
|
|
sub decode_url_args
|
|
{
|
|
my $a = shift;
|
|
my $buffer = ref $a ? $a : \$a;
|
|
my $ret = {};
|
|
|
|
my $pair;
|
|
my @pairs = split(/&/, $$buffer);
|
|
my ($name, $value);
|
|
foreach $pair (@pairs)
|
|
{
|
|
($name, $value) = split(/=/, $pair);
|
|
$value =~ tr/+/ /;
|
|
$value =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
|
|
$name =~ tr/+/ /;
|
|
$name =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
|
|
$ret->{$name} .= $ret->{$name} ? "\0$value" : $value;
|
|
}
|
|
return $ret;
|
|
}
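# Example (illustrative): decode_url_args(\"domain=testdom&key=a+b%2Fc")
# returns { domain => 'testdom', key => 'a b/c' }; repeated names have
# their values joined with "\0" separators.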
|
|
|
|
|
|
# Local Variables:
|
|
# mode: perl
|
|
# c-basic-indent: 4
|
|
# indent-tabs-mode: nil
|
|
# End:
|