ljr/livejournal/bin/dbselectd.pl

393 lines
9.0 KiB
Perl
Raw Permalink Normal View History

2019-02-05 21:49:12 +00:00
#!/usr/bin/perl -w
#
# DB selector daemon. Returns connect information on a preferred DB
# to a requestor.
#
# <LJDEP>
# lib: Getopt::Long, POSIX::, IO::Socket, IO::Select, Socket::, Fcntl::, DBI::
# lib: cgi-bin/ljconfig.pl
# </LJDEP>
use Getopt::Long;
use POSIX;
use IO::Socket;
use IO::Select;
use strict;
use Socket;
use Fcntl;
use DBI;
my $PORT = 5151;
my $PIDFILE = "$ENV{'LJHOME'}/var/dbselectd.pid";
my $SELECT_DELAY = 0.3;
# temporary:
my $DBINFO_FILE = "$ENV{'LJHOME'}/cgi-bin/ljconfig.pl";
my $opt_foreground = 0;
GetOptions("foreground" => \$opt_foreground);
my $pid;
# statistics on known databases
my %db_lastcheck;
my %db_conncount;
my $conf_modtime = 0;
my $conf_stattime = 0;
# Buffers.
my %inbuffer = ();
my %outbuffer = ();
my %cmd = ();
my %clientinfo = ();
sub connect_to
{
my $svr = shift;
my $dbh;
if (ref $LJ::DBCACHE{$svr})
{
$dbh = $LJ::DBCACHE{$svr};
# make sure connection is still good.
my $sth = $dbh->prepare("SELECT CONNECTION_ID()"); # mysql specific
$sth->execute;
my ($id) = $sth->fetchrow_array;
if ($id) { return $dbh; }
undef $dbh;
undef $LJ::DBCACHE{$svr};
}
my $dbname = $LJ::DBINFO{$svr}->{'dbname'} || "livejournal";
$dbh = DBI->connect("DBI:mysql:$dbname:$LJ::DBINFO{$svr}->{'host'}",
$LJ::DBINFO{$svr}->{'user'},
$LJ::DBINFO{$svr}->{'pass'},
{
PrintError => 0,
});
if ($dbh)
{
$LJ::DBCACHE{$svr} = $dbh;
return $dbh;
}
return undef;
}
sub db_can
{
my $svr = shift;
my $cap = shift;
return $LJ::DBINFO{$svr}->{'role'}->{$cap};
}
sub check_server
{
my $svr = shift;
delete $db_conncount{$svr};
delete $db_lastcheck{$svr};
my $dbh = connect_to($svr);
return unless (defined $dbh);
my $sth = $dbh->prepare("SHOW PROCESSLIST");
$sth->execute;
my $ct = 0;
while (my $r = $sth->fetchrow_hashref)
{
# weight busy connections more than idle ones.
if ($r->{'State'}) { $ct += 2; }
else { $ct += 1; }
}
$db_conncount{$svr} = $ct;
$db_lastcheck{$svr} = time();
}
sub connection_load
{
my $svr = shift;
my $time = time();
if (! defined $db_lastcheck{$svr} ||
$time - $db_lastcheck{$svr} > 10)
{
check_server($svr);
}
return $db_conncount{$svr};
}
sub server_power
{
my $svr = shift;
my $cap = shift;
my $weight = $LJ::DBINFO{$svr}->{'role'}->{$cap} || 1;
my $connections = connection_load($svr);
if (defined $connections) {
$connections ||= 1;
} else {
return 0;
}
return ($weight / $connections);
}
sub use_what
{
my $c = shift;
my $cap = shift;
## reload the DB info file if it's been more than 5 seconds since
## its last stat time and if it's changed since what we remember.
my $time = time();
if ($conf_stattime + 5 < $time)
{
my $modtime = (stat($DBINFO_FILE))[9];
if ($modtime > $conf_modtime) {
delete $INC{$DBINFO_FILE};
require $DBINFO_FILE;
$conf_modtime = $modtime;
$conf_stattime = $time;
}
}
my %cand = (); # candidates
# best candidate is one the client is already connected to
foreach my $svr (keys %{$c->{'has'}}) {
if (db_can($svr, $cap)) {
$cand{$svr} = 1;
}
}
# if not connected to anything suitable, then:
unless (%cand)
{
# every db with that capability is a good candidate
foreach my $svr (keys %LJ::DBINFO) {
if (db_can($svr, $cap)) {
$cand{$svr} = 1;
}
}
}
my @cands = keys %cand;
# sort valid candidates by server's connections
my %power;
foreach (@cands) {
$power{$_} = server_power($_, $cap);
}
@cands = sort { $power{$b} <=> $power{$a} } @cands;
# use the one with the highest score:
my $use = $cands[0];
if ($use) {
unless (defined $LJ::DBINFO{$use}->{'dbname'}) {
$LJ::DBINFO{$use}->{'dbname'} = "livejournal";
}
return join(" ", $use, map { $LJ::DBINFO{$use}->{$_} } qw(host user pass dbname));
} else {
return "--";
}
}
sub handle
{
my $select = shift;
my $client = shift;
my $line = shift;
my $c = ($clientinfo{$client} ||= {});
my $out = \$outbuffer{$client};
$line =~ s/^(\S*)\s*//;
my $cmd = $1;
if ($cmd eq "HAVE") {
foreach (split(/,/, $line)) {
next if ($_ eq "master");
$c->{'has'}->{$_} = 1;
}
$$out = "OK\n";
return;
}
if ($cmd eq "NEED") {
my $cap = $line;
my $use = use_what($c, $cap);
$$out = "USE $use\n";
return;
}
if ($cmd eq "STATS") {
my $svr = $line;
$$out = "Stats follow:\n";
foreach my $s (keys %LJ::DBINFO) {
$$out .= "STATS $s = " . connection_load($s) . "\n";
}
$$out .= "End.\n";
return;
}
$$out = "unknown command.\n";
}
# Server crap is below.
$SIG{'TERM'} = sub {
unlink($PIDFILE);
exit 1;
};
if (-e $PIDFILE) {
print "$PIDFILE exists, quitting.\n";
exit 1;
}
sub write_pid
{
my $p = shift;
open(PID, ">$PIDFILE") or die "Couldn't open $PIDFILE for writing: $!\n";
print PID $p;
close(PID);
}
if ($opt_foreground) {
print "Running in foreground...\n";
$pid = $$;
write_pid($pid);
$SIG{'INT'} = sub {
unlink($PIDFILE);
exit 1;
};
} else {
print "Forking off and initializing...\n";
if ($pid = fork) {
# Parent, log pid and exit.
write_pid($pid);
print "Closing ($pid) wrote to $PIDFILE\n";
exit;
}
}
sub killpid_die
{
my $msg = shift;
unlink $PIDFILE;
die $msg;
}
# Connection stuff.
my $server = IO::Socket::INET->new(
"LocalPort" => $PORT,
"Listen" => 10,
"ReuseAddr" => 1,
"Reuse" => 1,
) or killpid_die "Can't make server socket: $@\n";
nonblock($server);
$server->sockopt(SO_REUSEADDR, 1);
my $select = IO::Select->new($server);
print "Looping.\n";
while(1)
{
my $client;
my $rv;
my $data;
# Got connection? Got data?
foreach $client ($select->can_read($SELECT_DELAY)) {
if ($client == $server) {
# New connection, since there's stuff to read from the server sock.
$client = $server->accept();
$select->add($client);
# If the nonblocking mess fails, uh, give up.
unless (nonblock($client)) {
$select->remove($client);
}
} else {
# Read what data we have.
$data = '';
$rv = $client->recv($data, POSIX::BUFSIZ, 0);
unless (defined($rv) && length($data)) {
# If a socket says you can read, but there's nothing there, it's
# actually dead. Clean it up.
cleanup($client);
$select->remove($client);
close($client);
next;
}
$inbuffer{$client} .= $data;
# Check to see if there's a newline at the end. If it is, the
# command is finished. There's only one command line, so I won't
# bother making %cmd a hash with array references to request
# lines. Although this might be needed in the future.
if ($inbuffer{$client} =~ s/^.*\n//) {
$cmd{$client} = $&;
delete $inbuffer{$client};
}
}
}
# Deal with cmd stuff.
foreach $client (keys %cmd) {
my $cmd = $cmd{$client};
$cmd =~ s/[\n\r]+$//;
handle($select, $client, $cmd);
}
%cmd = ();
# Flush buffers
foreach $client ($select->can_write($SELECT_DELAY)) {
# Don't try if there's nothing there.
next unless exists $outbuffer{$client};
$rv = $client->send($outbuffer{$client}, 0);
unless (defined $rv) {
# Something weird happened if we get here, I'll bitch if we ever
# need logging on this thing.
next;
}
if ($rv == length $outbuffer{$client} || $! == POSIX::EWOULDBLOCK) {
substr($outbuffer{$client}, 0, $rv) = '';
delete $outbuffer{$client} unless length $outbuffer{$client};
} else {
# Ahh, something broke. If it was going to block, the above would
# catch it. Close up...
cleanup($client);
$select->remove($client);
close($client);
next;
}
}
}
# Does the messy Socket based nonblock routine...
sub nonblock {
my $socket = shift;
my $flags;
$flags = fcntl($socket, F_GETFL, 0) or return 0;
fcntl($socket, F_SETFL, $flags | O_NONBLOCK) or return 0;
return 1;
}
sub cleanup
{
my $client = shift;
delete $inbuffer{$client};
delete $outbuffer{$client};
delete $cmd{$client};
delete $clientinfo{$client};
}