#!/usr/bin/perl
##############################################################################

=head1 NAME

moveucluster.pl - Moves a LiveJournal user between database clusters

=head1 SYNOPSIS

  $ moveucluster.pl OPTIONS <user> <dest_clusterid>

  $ moveucluster.pl --jobserver=host:port

=head1 OPTIONS

=over 4

=item -h, --help

Output a help message and exit.

=item --verbose=<n>

Verbosity level, 0, 1, or 2.

=item --verify

Verify count of copied rows to ensure accuracy (slower)

=item --ignorebit

Ignore the move in progress bit (force user move)

=item --prelocked

Do not set user readonly and sleep (somebody else did it)

=item --movemaster, --mm

Read the user's data from a separate dedicated source (the inactive side of
an a/b cluster pair, or the cluster's "movemaster" role) instead of from the
source cluster master.

=item --delete

Delete data from source cluster when done moving

=item --destdelete

Delete data from destination cluster before moving

=item --expungedel

The expungedel option is used to indicate that when a user is encountered
with a statusvis of D (deleted journal) and they've been deleted for at
least 31 days, instead of moving their data, mark the user as expunged.

Further, if you specify the delete and expungedel options at the same time,
if the user is expunged, all of their data will be deleted from the source
cluster.  THIS IS IRREVERSIBLE AND YOU WILL NOT BE ASKED FOR CONFIRMATION.

=item --jobserver=host:port

Specify a job server to get tasks from.  In this mode, no other
arguments are necessary, and moveucluster.pl just runs in a loop
getting directions from the job server.

=back

=head1 AUTHOR

Brad Fitzpatrick E<lt>brad@danga.comE<gt>
Copyright (c) 2002-2004 Danga Interactive. All rights reserved.

=cut

##############################################################################

use strict;
use Getopt::Long;
use Pod::Usage qw{pod2usage};
use IO::Socket::INET;
use Digest::MD5 ();   # fetchTableInfo() calls Digest::MD5::md5_hex directly

# NOTE: these options are used both by Getopt::Long for command-line parsing
# in single user move mode, and also set by hand when in --jobserver mode,
# and the jobserver gives us directions, including whether or not users
# are prelocked, need to be source-deleted, verified, etc, etc, etc.
my $opt_del        = 0;
my $opt_destdel    = 0;
my $opt_verbose    = 1;
my $opt_movemaster = 0;
my $opt_prelocked  = 0;
my $opt_expungedel = 0;
my $opt_ignorebit  = 0;
my $opt_verify     = 0;
my $opt_help       = 0;
my $opt_jobserver  = "";

abortWithUsage() unless
    GetOptions('delete'        => \$opt_del,        # from source
               'destdelete'    => \$opt_destdel,    # from dest (if exists, before moving)
               'verbose=i'     => \$opt_verbose,
               'movemaster|mm' => \$opt_movemaster, # use separate dedicated source
               'prelocked'     => \$opt_prelocked,  # don't do own locking; master does (harness, ljumover)
               'expungedel'    => \$opt_expungedel, # mark as expunged if possible (+del to delete)
               'ignorebit'     => \$opt_ignorebit,  # ignore move in progress bit cap (force)
               'verify'        => \$opt_verify,     # slow verification pass (just for debug)
               'jobserver=s'   => \$opt_jobserver,
               'help'          => \$opt_help,
               );

my $optv = $opt_verbose;

my $dbo;   # original cluster db handle.  (may be a movemaster (a slave))
my $dboa;  # the actual master handle, which we delete from if deleting from source

abortWithUsage() if $opt_help;

if ($opt_jobserver) {
    multiMove();
} else {
    singleMove();
}

### FUNCTION: multiMove()
### Job-server mode: connect to $opt_jobserver, then loop forever asking it
### for jobs ("get_job") and running moveUser() for each one.  Reconnects
### when the socket goes away; dies on a response it does not understand.
sub multiMove {
    # the job server can keep giving us new jobs to move (or a stop command)
    # over and over, so we avoid perl exec times
    require "$ENV{'LJHOME'}/cgi-bin/ljlib.pl";

    my $sock;
  ITER:
    while (1) {
        if ($sock && $sock->connected) {
            my $pipe = 0;
            local $SIG{PIPE} = sub { $pipe = 1; };

            LJ::start_request();
            my $dbh = LJ::get_dbh({raw=>1}, "master");
            unless ($dbh) {
                print " master db unavailable\n";
                sleep 2;
                next ITER;
            }
            $dbh->do("SET wait_timeout=28800");

            my $rv = $sock->write("get_job\r\n");
            if ($pipe || ! $rv) {
                $sock = undef;
                sleep 1;
                next ITER;
            }

            my $line = <$sock>;
            unless ($line) {
                $sock = undef;
                sleep 1;
                next ITER;
            }

            if ($line =~ /^OK IDLE/) {
                print "Idling.\n";
                sleep 5;
                next ITER;
            } elsif ($line =~ /^OK JOB (\d+):(\d+):(\d+)\s+([\d.]+)(?:\s+([\w= ]+))?\r?\n/) {
                my ($uid, $srcid, $dstid, $locktime) = ($1, $2, $3, $4);
                my $opts = parseOpts($5);

                print "Got a job: $uid:$srcid:$dstid, locked for=$locktime, opts: [",
                      join(", ", map { "$_=$opts->{$_}" } grep { $opts->{$_} } keys %$opts),
                      "]\n";

                my $u = $dbh->selectrow_hashref("SELECT * FROM user WHERE userid=?",
                                                undef, $uid);
                next ITER unless $u;
                next ITER unless $u->{clusterid} == $srcid;

                # callback moveUser() runs just before flipping the user's
                # clusterid: asks the job server to confirm the move.
                my $verify = sub {
                    my $pipe = 0;
                    local $SIG{PIPE} = sub { $pipe = 1; };
                    my $rv = $sock->write("finish $uid:$srcid:$dstid\r\n");
                    # a SIGPIPE during the write means the server is gone
                    return 0 if $pipe || ! $rv;
                    my $res = <$sock>;
                    return (defined $res && $res =~ /^OK/) ? 1 : 0;
                };

                # If the user is supposed to be prelocked, but the lock didn't
                # happen more than 3 seconds ago, wait until it has time to
                # "settle" and then move the user
                if ( $opts->{prelocked} && $locktime < 3 ) {
                    # $locktime may be fractional and sleep() truncates to
                    # whole seconds, so use select() for the remainder
                    select(undef, undef, undef, 3 - $locktime);
                }

                my $rv = eval { moveUser($dbh, $u, $dstid, $verify, $opts); };
                if ($rv) {
                    print "moveUser($u->{user}/$u->{userid}) = 1\n";
                } else {
                    print "moveUser($u->{user}/$u->{userid}) = fail: $@\n";
                }

                LJ::end_request();
                LJ::disconnect_dbs();  # end_request could do this, but we want to force it
            } else {
                die "Unknown response from server: $line\n";
            }
        } else {
            print "Need job server sock...\n";
            $sock = IO::Socket::INET->new(PeerAddr => $opt_jobserver,
                                          Proto    => 'tcp', );
            unless ($sock) {
                print " failed.\n";
                sleep 1;
                next ITER;
            }

            my $ready = <$sock>;
            if (defined $ready && $ready =~ /Ready/) {
                print "Connected.\n";
            } else {
                print "Bogus greeting.\n";
                $sock = undef;
                sleep 1;
                next ITER;
            }
        }
    }
}

### FUNCTION: parseOpts( $raw )
### Parse options from job specs ("key=value key=value ...") into a hashref.
### Any of the known move options not present in $raw falls back to the
### value of the corresponding command-line option.
sub parseOpts {
    my $raw  = shift || "";
    my $opts = {};

    while ( $raw =~ m{\s*(\w+)=(\w+)}g ) {
        $opts->{ $1 } = $2;
    }

    # command-line defaults, looked up by reference instead of string eval
    my %cmdline = (
        del        => \$opt_del,
        destdel    => \$opt_destdel,
        movemaster => \$opt_movemaster,
        prelocked  => \$opt_prelocked,
        expungedel => \$opt_expungedel,
        ignorebit  => \$opt_ignorebit,
        verify     => \$opt_verify,
    );

    foreach my $opt (keys %cmdline) {
        next if defined $opts->{$opt};
        $opts->{$opt} = ${ $cmdline{$opt} };
    }

    return $opts;
}

### FUNCTION: singleMove()
### Command-line mode: takes <user> and <dest_clusterid> from @ARGV, moves
### that one user and exits (0 on success, 1 on a false return from
### moveUser; dies if moveUser threw).
sub singleMove {
    my $user   = shift @ARGV;
    my $dclust = shift @ARGV;
    $dclust = 0 if !defined $dclust && $opt_expungedel;

    # check arguments
    abortWithUsage() unless defined $user && defined $dclust;

    require "$ENV{'LJHOME'}/cgi-bin/ljlib.pl";

    $user = LJ::canonical_username($user);
    abortWithUsage("Invalid username") unless length($user);

    my $dbh = LJ::get_dbh({raw=>1}, "master");
    die "No master db available.\n" unless $dbh;
    $dbh->do("SET wait_timeout=28800");

    my $u = $dbh->selectrow_hashref("SELECT * FROM user WHERE user=?", undef, $user);

    my $opts = parseOpts("");  # gets command-line opts
    my $rv = eval { moveUser($dbh, $u, $dclust, undef, $opts); };

    if ($rv) {
        print "Moved '$user' to cluster $dclust.\n";
        exit 0;
    }
    if ($@) {
        die "Failed to move '$user' to cluster $dclust: $@\n";
    }

    print "ERROR: move failed.\n";
    exit 1;
}

### FUNCTION: moveUser( $dbh, $u, $dclust, $verify_code, $opts )
### Copy all of user $u's rows from their current cluster to cluster $dclust
### (or expunge them, see --expungedel), then point the user at the new
### cluster.
###   $dbh         - global master handle
###   $u           - row hashref from the "user" table
###   $dclust      - destination cluster id (0 is only valid with expungedel)
###   $verify_code - optional coderef called right before the clusterid flip;
###                  a false return unlocks the user and aborts
###   $opts        - hashref as returned by parseOpts()
### Returns 1 on success; dies with a message on any failure.
### Sets the file-level handles $dbo and $dboa as a side effect.
sub moveUser {
    my ($dbh, $u, $dclust, $verify_code, $opts) = @_;
    die "Non-existent db.\n" unless $dbh;
    die "Non-existent user.\n" unless $u && $u->{userid};

    my $user = $u->{user};

    # get lock
    die "Failed to get move lock.\n"
        unless $dbh->selectrow_array("SELECT GET_LOCK('moveucluster-$u->{userid}', 5)");

    # if we want to delete the user, we don't need a destination cluster, so only get
    # one if we have a real valid destination cluster
    my $dbch;
    if ($dclust) {
        # get the destination DB handle, with a long timeout
        $dbch = LJ::get_cluster_master({raw=>1}, $dclust);
        die "Undefined or down cluster \#$dclust\n" unless $dbch;
        $dbch->do("SET wait_timeout=28800");

        # make sure any error is a fatal error.  no silent mistakes.
        $dbh->{'RaiseError'} = 1;
        $dbch->{'RaiseError'} = 1;
    }

    # we can't move to the same cluster
    my $sclust = $u->{'clusterid'};
    if ($sclust == $dclust) {
        die "User '$user' is already on cluster $dclust\n";
    }

    # we don't support "cluster 0" (the really old format)
    die "This mover tool doesn't support moving from cluster 0.\n" unless $sclust;
    die "Can't move back to legacy cluster 0\n" unless $dclust || $opts->{expungedel};

    my $is_movemaster;

    # for every DB handle we touch, make a signature of a sorted
    # comma-delimited signature onto this list.  likewise with the
    # list of tables this mover script knows about.  if ANY signature
    # in this list isn't identical, we just abort.  perhaps this
    # script wasn't updated, or a long-running mover job wasn't
    # restarted and new tables were added to the schema.
    my @alltables = (@LJ::USER_TABLES, @LJ::USER_TABLES_LOCAL);
    my $mover_sig = join(",", sort @alltables);

    my $get_sig = sub {
        my $hnd = shift;
        return join(",", sort @{ $hnd->selectcol_arrayref("SHOW TABLES") });
    };

    my $global_sig = $get_sig->($dbh);

    my $check_sig = sub {
        my $hnd  = shift;
        my $name = shift;

        # no signature checks on expunges
        return if ! $hnd && $opts->{expungedel};

        my $sig = $get_sig->($hnd);

        # special case:  signature can be that of the global
        return if $sig eq $global_sig;

        if ($sig ne $mover_sig) {
            my %sigt = map { $_ => 1 } split(/,/, $sig);
            my @err;
            foreach my $tbl (@alltables) {
                unless ($sigt{$tbl}) {
                    # missing a table the mover knows about
                    push @err, "-$tbl";
                    next;
                }
                delete $sigt{$tbl};
            }
            # whatever is left exists on the handle but is unknown to the mover
            foreach my $tbl (sort keys %sigt) {
                push @err, "?$tbl";
            }
            if (@err) {
                die "Table signature for $name doesn't match! Stopping. [@err]\n";
            }
        }
    };

    $check_sig->($dbch, "dbch(database dst)");

    # the actual master handle, which we delete from if deleting from source
    $dboa = LJ::get_cluster_master({raw=>1}, $u);
    die "Can't get source cluster handle.\n" unless $dboa;
    $check_sig->($dboa, "dboa(database src)");

    if ($opts->{movemaster}) {
        # if an a/b cluster, the movemaster (the source for moving) is
        # the opposite side.  if not a/b, then look for a special "movemaster"
        # role for that clusterid
        my $mm_role = "cluster$u->{clusterid}";
        my $ab = lc($LJ::CLUSTER_PAIR_ACTIVE{$u->{clusterid}});
        if ($ab eq "a") {
            $mm_role .= "b";
        } elsif ($ab eq "b") {
            $mm_role .= "a";
        } else {
            $mm_role .= "movemaster";
        }

        $dbo = LJ::get_dbh({raw=>1}, $mm_role);
        die "Couldn't get movemaster handle" unless $dbo;
        $check_sig->($dbo, "dbo(movemaster)");

        $dbo->{'RaiseError'} = 1;
        $dbo->do("SET wait_timeout=28800");
        my $ss = $dbo->selectrow_hashref("show slave status");
        die "Move master not a slave?" unless $ss;
        $is_movemaster = 1;
    } else {
        $dbo = $dboa;
    }

    $dboa->{'RaiseError'} = 1;
    $dboa->do("SET wait_timeout=28800");

    my $userid = $u->{'userid'};

    # load the info on how we'll move each table.  this might die (if new tables
    # with bizarre layouts are added which this thing can't auto-detect) so want
    # to do it early.
    my $tinfo;  # hashref of $table -> {
                #   'idx'       => $index_name     # which we'll be using to iterate over
                #   'idxcol'    => $col_name       # first part of index
                #   'cols'      => [ $col1, $col2, ]
                #   'pripos'    => $idxcol_pos,    # what field in 'cols' is $col_name
                #   'verifykey' => $col            # key used in the debug --verify pass
                # }
    $tinfo = fetchTableInfo();

    # see hack below
    my $prop_icon = LJ::get_prop("talk", "subjecticon");
    my %rows_skipped;  # $tablename -> $skipped_rows_count

    # find readonly cap class, complain if not found
    my $readonly_bit = undef;
    foreach (keys %LJ::CAP) {
        if ($LJ::CAP{$_}->{'_name'} eq "_moveinprogress" &&
            $LJ::CAP{$_}->{'readonly'} == 1) {
            $readonly_bit = $_;
            last;
        }
    }
    unless (defined $readonly_bit) {
        die "Won't move user without %LJ::CAP capability class named '_moveinprogress' with readonly => 1\n";
    }

    # make sure a move isn't already in progress
    if ($opts->{prelocked}) {
        unless (($u->{'caps'}+0) & (1 << $readonly_bit)) {
            die "User '$user' should have been prelocked.\n";
        }
    } else {
        if (($u->{'caps'}+0) & (1 << $readonly_bit)) {
            die "User '$user' is already in the process of being moved? (cap bit $readonly_bit set)\n"
                unless $opts->{ignorebit};
        }
    }

    if ($opts->{expungedel} && $u->{'statusvis'} eq "D" &&
        LJ::mysqldate_to_time($u->{'statusvisdate'}) < time() - 86400*31) {

        print "Expunging user '$u->{'user'}'\n";
        $dbh->do("INSERT INTO clustermove (userid, sclust, dclust, timestart, timedone) ".
                 "VALUES (?,?,?,UNIX_TIMESTAMP(),UNIX_TIMESTAMP())", undef,
                 $userid, $sclust, 0);

        LJ::update_user($userid, { clusterid => 0,
                                   statusvis => 'X',
                                   raw       => "caps=caps&~(1<<$readonly_bit), statusvisdate=NOW()" })
            or die "Couldn't update user to expunged";

        # now delete all content from user cluster for this user
        if ($opts->{del}) {
            print "Deleting expungeable user data...\n" if $optv;

            # figure out if they have any S1 styles
            my $styleids = $dboa->selectcol_arrayref("SELECT styleid FROM s1style WHERE userid = $userid");

            # now delete from the main tables
            foreach my $table (keys %$tinfo) {
                my $pri = $tinfo->{$table}->{idxcol};
                while ($dboa->do("DELETE FROM $table WHERE $pri=$userid LIMIT 1000") > 0) {
                    print " deleted from $table\n" if $optv;
                }
            }

            # and from the s1stylecache table
            if (@$styleids) {
                my $styleids_in = join(",", map { $dboa->quote($_) } @$styleids);
                if ($dboa->do("DELETE FROM s1stylecache WHERE styleid IN ($styleids_in)") > 0) {
                    print " deleted from s1stylecache\n" if $optv;
                }
            }

            $dboa->do("DELETE FROM clustertrack2 WHERE userid=?", undef, $userid);
        }

        # don't leave the named move lock held on the master connection
        $dbh->do("SELECT RELEASE_LOCK('moveucluster-$u->{userid}')");

        return 1;
    }

    # if we get to this point we have to enforce that there's a destination cluster, because
    # apparently the user failed the expunge test
    if (!defined $dclust || !defined $dbch) {
        die "User is not eligible for expunging.\n" if $opts->{expungedel};
    }

    # returns state string, with a/b, readonly, and flush states.
    # string looks like:
    #    "src(34)=a,dst(42)=b,readonly(34)=0,readonly(42)=0,src_flushes=32
    # because if:
    #    src a/b changes:  lose readonly lock?
    #    dst a/b changes:  suspect.  did one side crash?  was other side caught up?
    #    read-only changes:  signals maintenance
    #    flush counts change:  causes HANDLER on src to lose state and reset
    my $stateString = sub {
        my $post = shift;  # false for before, true for "after", which forces a config reload

        if ($post) {
            do "$ENV{'LJHOME'}/cgi-bin/ljconfig.pl";
            do "$ENV{'LJHOME'}/cgi-bin/ljdefaults.pl";
        }

        my @s;
        push @s, "src($sclust)=" . $LJ::CLUSTER_PAIR_ACTIVE{$sclust};
        push @s, "dst($dclust)=" . $LJ::CLUSTER_PAIR_ACTIVE{$dclust};
        push @s, "readonly($sclust)=" . ($LJ::READONLY_CLUSTER{$sclust} ? 1 : 0);
        push @s, "readonly($dclust)=" . ($LJ::READONLY_CLUSTER{$dclust} ? 1 : 0);

        my $flushes = 0;
        my $sth = $dbo->prepare("SHOW STATUS LIKE '%flush%'");
        $sth->execute;
        while (my $r = $sth->fetchrow_hashref) {
            # NOTE(review): the alternation is not grouped, so this matches
            # /^Com_flush/ OR /Flush_commands$/ -- confirm that is intended.
            $flushes += $r->{Value} if $r->{Variable_name} =~ /^Com_flush|Flush_commands$/;
        }
        push @s, "src_flushes=" . $flushes;

        return join(",", @s);
    };

    print "Moving '$u->{'user'}' from cluster $sclust to $dclust\n" if $optv >= 1;
    my $pre_state = $stateString->();

    # mark that we're starting the move
    $dbh->do("INSERT INTO clustermove (userid, sclust, dclust, timestart) ".
             "VALUES (?,?,?,UNIX_TIMESTAMP())", undef, $userid, $sclust, $dclust);
    my $cmid = $dbh->{'mysql_insertid'};

    # set readonly cap bit on user
    unless ($opts->{prelocked} ||
            LJ::update_user($userid, { raw => "caps=caps|(1<<$readonly_bit)" }))
    {
        die "Failed to set readonly bit on user: $user\n";
    }
    $dbh->do("SELECT RELEASE_LOCK('moveucluster-$u->{userid}')");

    unless ($opts->{prelocked}) {
        # wait a bit for writes to stop if journal is somewhat active (last week update)
        my $secidle = $dbh->selectrow_array("SELECT UNIX_TIMESTAMP()-UNIX_TIMESTAMP(timeupdate) ".
                                            "FROM userusage WHERE userid=$userid");
        if ($secidle) {
            sleep(2) unless $secidle > 86400*7;
            sleep(1) unless $secidle > 86400;
        }
    }

    # when reading from a movemaster (a slave), wait until it has replayed
    # to within $tolerance bytes of what it has read from its master
    if ($is_movemaster) {
        my $diff = 999_999;
        my $tolerance = 50_000;
        while ($diff > $tolerance) {
            my $ss = $dbo->selectrow_hashref("show slave status");
            if ($ss->{'Slave_IO_Running'} eq "Yes" && $ss->{'Slave_SQL_Running'} eq "Yes") {
                if ($ss->{'Master_Log_File'} eq $ss->{'Relay_Master_Log_File'}) {
                    $diff = $ss->{'Read_Master_Log_Pos'} - $ss->{'Exec_master_log_pos'};
                    print " diff: $diff\n" if $optv >= 1;
                    sleep 1 if $diff > $tolerance;
                } else {
                    print " (Wrong log file): $ss->{'Relay_Master_Log_File'}($ss->{'Exec_master_log_pos'}) not $ss->{'Master_Log_File'}($ss->{'Read_Master_Log_Pos'})\n" if $optv >= 1;
                    # back off before polling again instead of spinning
                    # on "show slave status"
                    sleep 1;
                }
            } else {
                die "Movemaster slave not running";
            }
        }
    }

    print "Moving away from cluster $sclust\n" if $optv;

    my %cmd_done;  # cmd_name -> 1
    while (my $cmd = $dboa->selectrow_array("SELECT cmd FROM cmdbuffer WHERE journalid=$userid")) {
        die "Already flushed cmdbuffer job '$cmd' -- it didn't take?\n" if $cmd_done{$cmd}++;
        print "Flushing cmdbuffer for cmd: $cmd\n" if $optv > 1;
        require "$ENV{'LJHOME'}/cgi-bin/ljcmdbuffer.pl";
        LJ::Cmdbuffer::flush($dbh, $dboa, $cmd, $userid);
    }

    # setup dependencies (we can skip work by not checking a table if we know
    # its dependent table was empty).  then we have to order things so deps get
    # processed first.
    my %was_empty;  # $table -> bool, table was found empty
    my %dep = (
               "logtext2"       => "log2",
               "logprop2"       => "log2",
               "logsec2"        => "log2",
               "talkprop2"      => "talk2",
               "talktext2"      => "talk2",
               "phoneposttrans" => "phonepostentry", # FIXME: ljcom
               "modblob"        => "modlog",
               "sessions_data"  => "sessions",
               "memkeyword2"    => "memorable2",
               "userpicmap2"    => "userpic2",
               "logtagsrecent"  => "usertags",
               "logtags"        => "usertags",
               "logkwsum"       => "usertags",
               );

    # all tables we could be moving.  we need to sort them in
    # order so that we check dependent tables first
    my @tables;
    push @tables, grep { ! $dep{$_} } @alltables;
    push @tables, grep { $dep{$_} } @alltables;

    # these are ephemeral or handled elsewhere
    my %skip_table = (
                      "cmdbuffer"       => 1,  # pre-flushed
                      "events"          => 1,  # handled by qbufferd (not yet used)
                      "s1stylecache"    => 1,  # will be recreated
                      "captcha_session" => 1,  # temporary
                      "tempanonips"     => 1,  # temporary ip storage for spam reports
                      "recentactions"   => 1,  # pre-flushed by clean_caches
                      "pendcomments"    => 1,  # don't need to copy these
                      );

    $skip_table{'inviterecv'} = 1 if $u->{journaltype} ne 'P';  # non-person, skip invites received
    $skip_table{'invitesent'} = 1 if $u->{journaltype} ne 'C';  # not community, skip invites sent

    # we had a concern at the time of writing this dependency optimization
    # that we might use "log3" and "talk3" tables in the future with the
    # old talktext2/etc tables.  if that happens and we forget about this,
    # this code will trip it up and make us remember:
    if (grep { $_ eq "log3" || $_ eq "talk3" } @tables) {
        die "This script needs updating.\n";
    }

    # check if dest has existing data for this user.  (but only check a few key tables)
    # if anything else happens to have data, we'll just fail later.  but unlikely.
    print "Checking for existing data on target cluster...\n" if $optv > 1;
    foreach my $table (qw(userbio talkleft log2 talk2 sessions userproplite2)) {
        my $ti = $tinfo->{$table} or die "No table info for $table. Aborting.";

        eval { $dbch->do("HANDLER $table OPEN"); };
        if ($@) {
            die "This mover currently only works on MySQL 4.x and above.\n" . $@;
        }

        my $idx = $ti->{idx};
        my $is_there = $dbch->selectrow_array("HANDLER $table READ `$idx` = ($userid) LIMIT 1");
        $dbch->do("HANDLER $table CLOSE");
        next unless $is_there;

        if ($opts->{destdel}) {
            foreach my $table (@tables) {
                # these are ephemeral or handled elsewhere
                next if $skip_table{$table};
                my $ti = $tinfo->{$table} or die "No table info for $table. Aborting.";
                my $pri = $ti->{idxcol};
                while ($dbch->do("DELETE FROM $table WHERE $pri=$userid LIMIT 500") > 0) {
                    print " deleted from $table\n" if $optv;
                }
            }
            last;
        } else {
            die " Existing data on destination cluster\n";
        }
    }

    # start copying from source to dest.
    my $rows = 0;
    my @to_delete;  # array of [ $table, $prikey ]
    my @styleids;   # to delete, potentially

    foreach my $table (@tables) {
        next if $skip_table{$table};

        # people accounts don't have moderated posts
        next if $u->{'journaltype'} eq "P" && ($table eq "modlog" || $table eq "modblob");

        # don't waste time looking at dependent tables with empty parents
        next if $dep{$table} && $was_empty{$dep{$table}};

        my $ti = $tinfo->{$table} or die "No table info for $table. Aborting.";
        my $idx    = $ti->{idx};
        my $idxcol = $ti->{idxcol};
        my $cols   = $ti->{cols};
        my $pripos = $ti->{pripos};

        # if we're going to be doing a verify operation later anyway, let's do it
        # now, so we can use the knowledge of rows per table to hint our $batch_size
        my $expected_rows   = undef;
        my $expected_remain = undef;  # expected rows remaining (unread)
        my $verifykey = $ti->{verifykey};
        my %pre;

        if ($opts->{verify} && $verifykey) {
            $expected_rows = 0;
            if ($table eq "dudata" || $table eq "ratelog") {
                $expected_rows = $dbo->selectrow_array("SELECT COUNT(*) FROM $table WHERE $idxcol=$userid");
            } else {
                my $sth;
                $sth = $dbo->prepare("SELECT $verifykey FROM $table WHERE $idxcol=$userid");
                $sth->execute;
                while (my @ar = $sth->fetchrow_array) {
                    my $key = join(",", @ar);
                    $pre{$key} = 1;
                    $expected_rows++;
                }
            }

            # no need to continue with tables that don't have any data
            unless ($expected_rows) {
                $was_empty{$table} = 1;
                next;
            }

            $expected_remain = $expected_rows;
        }

        eval { $dbo->do("HANDLER $table OPEN"); };
        if ($@) {
            die "This mover currently only works on MySQL 4.x and above.\n".
                $@;
        }

        my $tct = 0;            # total rows read for this table so far.
        my $hit_otheruser = 0;  # bool, set to true when we encounter data from a different userid
        my $batch_size;         # how big of a LIMIT we'll be doing
        my $ct = 0;             # rows read in latest batch
        my $did_start = 0;      # bool, if process has started yet (used to enter loop, and control initial HANDLER commands)
        my $pushed_delete = 0;  # bool, if we've pushed this table on the delete list (once we find it has something)

        my $sqlins  = "";
        my $sqlvals = 0;
        my $flush = sub {
            return unless $sqlins;
            print "# Flushing $table ($sqlvals recs, ", length($sqlins), " bytes)\n" if $optv;
            $dbch->do($sqlins);
            $sqlins  = "";
            $sqlvals = 0;
        };

        my $insert = sub {
            my $r = shift;

            # there was an old bug where we'd populate in the database
            # the choice of "none" for comment subject icon, instead of
            # just storing nothing.  this hack prevents migrating those.
            if ($table eq "talkprop2" &&
                $r->[2] == $prop_icon->{id} &&
                $r->[3] eq "none") {
                $rows_skipped{"talkprop2"}++;
                return;
            }

            # now that we know it has something to delete (many tables are empty for users)
            unless ($pushed_delete++) {
                push @to_delete, [ $table, $idxcol ];
            }

            if ($sqlins) {
                $sqlins .= ", ";
            } else {
                $sqlins = "INSERT INTO $table (" . join(', ', @{$cols}) . ") VALUES ";
            }
            $sqlins .= "(" . join(", ", map { $dbo->quote($_) } @$r) . ")";

            $sqlvals++;
            $flush->() if $sqlvals > 5000 || length($sqlins) > 800_000;
        };

        # let tables perform extra processing on the $r before it's
        # sent off for inserting.
        my $magic;

        # we know how to compress these two tables (currently the only two)
        if ($table eq "logtext2" || $table eq "talktext2") {
            $magic = sub {
                my $r = shift;
                return unless length($r->[3]) > 200;
                LJ::text_compress(\$r->[3]);
            };
        }
        if ($table eq "s1style") {
            $magic = sub {
                my $r = shift;
                push @styleids, $r->[0];
            };
        }

        # calculate the biggest batch size that can reasonably fit in memory
        my $max_batch = 10000;
        $max_batch = 1000 if $table eq "logtext2" || $table eq "talktext2";

        while (! $hit_otheruser && ($ct == $batch_size || ! $did_start)) {
            my $qry;
            if ($did_start) {
                # once we've done the initial big read, we want to walk slowly, because
                # a LIMIT of 1000 will read 1000 rows, regardless, which may be 995
                # seeks into somebody else's journal that we don't care about.
                # on the other hand, if we did a --verify check above, we have a good
                # idea what to expect still, so we'll use that instead of just 25 rows.
                $batch_size = $expected_remain > 0 ? $expected_remain + 1 : 25;
                if ($batch_size > $max_batch) { $batch_size = $max_batch; }
                $expected_remain -= $batch_size;

                $qry = "HANDLER $table READ `$idx` NEXT LIMIT $batch_size";
            } else {
                # when we're first starting out, though, let's LIMIT as high as possible,
                # since MySQL (with InnoDB only?) will only return rows matching the primary key,
                # so we'll try as big as possible.  but not with myisam -- need to start
                # small there too, unless we have a guess at the number of rows remaining.
                my $src_is_innodb = 0;  # FIXME: detect this.  but first verify HANDLER differences.
                if ($src_is_innodb) {
                    $batch_size = $max_batch;
                } else {
                    # MyISAM's HANDLER behavior seems to be different.
                    # it always returns batch_size, so we keep it
                    # small to avoid seeks, even on the first query
                    # (where InnoDB differs and stops when primary key
                    # doesn't match)
                    $batch_size = 25;
                    if ($table eq "clustertrack2" || $table eq "userbio" ||
                        $table eq "s1usercache"   || $table eq "s1overrides") {
                        # we know these only have 1 row, so 2 will be enough to show
                        # in one pass that we're done.
                        $batch_size = 2;
                    } elsif (defined $expected_rows) {
                        # if we know how many rows remain, let's try to use that (+1 to stop it)
                        $batch_size = $expected_rows + 1;
                        if ($batch_size > $max_batch) { $batch_size = $max_batch; }
                        $expected_remain -= $batch_size;
                    }
                }

                $qry = "HANDLER $table READ `$idx` = ($userid) LIMIT $batch_size";
                $did_start = 1;
            }

            my $sth = $dbo->prepare($qry);
            $sth->execute;

            $ct = 0;
            while (my $r = $sth->fetchrow_arrayref) {
                if ($r->[$pripos] != $userid) {
                    $hit_otheruser = 1;
                    last;
                }
                $magic->($r) if $magic;
                $insert->($r);
                $tct++;
                $ct++;
            }
        }
        $flush->();

        $dbo->do("HANDLER $table CLOSE");

        # verify the important tables, even if --verify is off.
        if (! $opts->{verify} && $table =~ /^(talk|log)(2|text2)$/) {
            my $dblcheck = $dbo->selectrow_array("SELECT COUNT(*) FROM $table WHERE $idxcol=$userid");
            die "# Expecting: $dblcheck, but got $tct\n" unless $dblcheck == $tct;
        }

        if ($opts->{verify} && $verifykey) {
            if ($table eq "dudata" || $table eq "ratelog") {
                print "# Verifying $table on size\n";
                my $post = $dbch->selectrow_array("SELECT COUNT(*) FROM $table WHERE $idxcol=$userid");
                die "Moved sized is smaller" if $post < $expected_rows;
            } else {
                print "# Verifying $table on key $verifykey\n";
                my $sth;
                $sth = $dbch->prepare("SELECT $verifykey FROM $table WHERE $idxcol=$userid");
                $sth->execute;
                while (my @ar = $sth->fetchrow_array) {
                    my $key = join(",", @ar);
                    unless (delete $pre{$key}) {
                        die "Mystery row showed up in $table: uid=$userid, $verifykey=$key";
                    }
                }
                # anything left in %pre was on the source but not the dest;
                # only acceptable if we deliberately skipped that many rows
                my $count = scalar keys %pre;
                die "Rows not moved for uid=$userid, table=$table. unmoved count = $count"
                    if $count && $count != $rows_skipped{$table};
            }
        }

        $was_empty{$table} = 1 unless $tct;
        $rows += $tct;
    }

    print "# Rows done for '$user': $rows\n" if $optv;

    my $post_state = $stateString->("post");
    if ($post_state ne $pre_state) {
        die "Move aborted due to state change during move: Before: [$pre_state], After: [$post_state]\n";
    }
    $check_sig->($dbo, "dbo(aftermove)");

    my $unlocked;
    if (! $verify_code || $verify_code->()) {
        # unset readonly and move to new cluster in one update
        $unlocked = LJ::update_user($userid, { clusterid => $dclust,
                                               raw       => "caps=caps&~(1<<$readonly_bit)" });
        print "Moved.\n" if $optv;
    } else {
        # job server went away or we don't have permission to flip the clusterid attribute
        # so just unlock them
        $unlocked = LJ::update_user($userid, { raw => "caps=caps&~(1<<$readonly_bit)" });
        die "Job server said no.\n";
    }

    # delete from the index of who's read-only.  if this fails we don't really care
    # (not all sites might have this table anyway) because it's not used by anything
    # except the readonly-cleaner which can deal with all cases.
    if ($unlocked) {
        eval {
            $dbh->do("DELETE FROM readonly_user WHERE userid=?", undef, $userid);
        };
    }

    # delete from source cluster
    if ($opts->{del}) {
        print "Deleting from source cluster...\n" if $optv;
        foreach my $td (@to_delete) {
            my ($table, $pri) = @$td;
            while ($dboa->do("DELETE FROM $table WHERE $pri=$userid LIMIT 1000") > 0) {
                print " deleted from $table\n" if $optv;
            }
        }

        # s1stylecache table
        if (@styleids) {
            my $styleids_in = join(",", map { $dboa->quote($_) } @styleids);
            if ($dboa->do("DELETE FROM s1stylecache WHERE styleid IN ($styleids_in)") > 0) {
                print " deleted from s1stylecache\n" if $optv;
            }
        }
    } else {
        # at minimum, we delete the clustertrack2 row so it doesn't get
        # included in a future ljumover.pl query from that cluster.
        $dboa->do("DELETE FROM clustertrack2 WHERE userid=$userid");
    }

    $dbh->do("UPDATE clustermove SET sdeleted=?, timedone=UNIX_TIMESTAMP() ".
             "WHERE cmid=?", undef, $opts->{del} ? 1 : 0, $cmid);

    return 1;
}

### FUNCTION: fetchTableInfo()
### Build (or fetch from memcache) the per-table layout hashref used by
### moveUser(): for each user table, the index to walk, its leading column,
### the column list, the position of that column, and the verify key.
### Reads the file-level handles $dbo and $dboa, so they must be set first.
### Dies if a table has no index led by journalid, userid or commid.
sub fetchTableInfo {
    my @tables = (@LJ::USER_TABLES, @LJ::USER_TABLES_LOCAL);
    my $memkey = "moveucluster:" . Digest::MD5::md5_hex(join(",",@tables));
    my $tinfo = LJ::MemCache::get($memkey) || {};

    foreach my $table (@tables) {
        next if grep { $_ eq $table } qw(events s1stylecache cmdbuffer captcha_session recentactions pendcomments);
        next if $tinfo->{$table};  # no need to load this one

        # find the index we'll use
        my $idx;     # the index name we'll be using
        my $idxcol;  # "userid" or "journalid"

        my $sth = $dbo->prepare("SHOW INDEX FROM $table");
        $sth->execute;
        my @pris;

        while (my $r = $sth->fetchrow_hashref) {
            push @pris, $r->{'Column_name'} if $r->{'Key_name'} eq "PRIMARY";
            next unless $r->{'Seq_in_index'} == 1;
            next if $idx;
            if ($r->{'Column_name'} eq "journalid" ||
                $r->{'Column_name'} eq "userid" ||
                $r->{'Column_name'} eq "commid") {
                $idx    = $r->{'Key_name'};
                $idxcol = $r->{'Column_name'};
            }
        }

        # the verify key is the primary key minus its leading user column
        shift @pris if @pris && ($pris[0] eq "journalid" || $pris[0] eq "userid");
        my $verifykey = join(",", @pris);

        die "can't find index for table $table\n" unless $idx;

        $tinfo->{$table}{idx}       = $idx;
        $tinfo->{$table}{idxcol}    = $idxcol;
        $tinfo->{$table}{verifykey} = $verifykey;

        my $cols = $tinfo->{$table}{cols} = [];
        my $colnum = 0;
        $sth = $dboa->prepare("DESCRIBE $table");
        $sth->execute;
        while (my $r = $sth->fetchrow_hashref) {
            push @$cols, $r->{'Field'};
            if ($r->{'Field'} eq $idxcol) {
                $tinfo->{$table}{pripos} = $colnum;
            }
            $colnum++;
        }
    }

    LJ::MemCache::set($memkey, $tinfo, 90);  # not for long, but quick enough to speed a series of moves
    return $tinfo;
}

### FUNCTION: abortWithUsage( $message )
### Abort the program showing usage message.
sub abortWithUsage {
    my $msg = join '', @_;

    if ( $msg ) {
        pod2usage( -verbose => 1, -exitval => 1, -message => $msg );
    } else {
        pod2usage( -verbose => 1, -exitval => 1 );
    }
}