ljr/wcmtools/mogilefs/utils/mogtool

1317 lines
40 KiB
Plaintext
Raw Permalink Normal View History

2019-02-05 21:49:12 +00:00
#!/usr/bin/perl
############################################################################
=head1 NAME
mogtool -- Inject/extract data to/from a MogileFS installation
=head1 SYNOPSIS
$ mogtool [general-opts] <command> [command-opts] <command-args>
$ mogtool --trackers=127.0.0.1:6001 --domain=foo --class=bar ...
$ mogtool --conf=foo.conf ...
$ mogtool inject thefile.tgz thefilekey
$ mogtool inject --bigfile thebigfile.tgz thefilekey
$ mogtool inject --bigfile --gzip thebigfile.tar thefilekey
$ mogtool inject --bigfile --gzip mydirectory thedirkey
$ mogtool inject --bigfile --gzip /dev/hda4 thedevkey
$ mogtool inject --bigfile --gzip --verify \
--description="Description" \
--receipt="foo@bar.com, baz@bar.com" \
--concurrent=5 --chunksize=32M \
somehugefile thefilekey
$ mogtool extract thefilekey thenewfile.tgz
$ mogtool extract thefilekey -
$ mogtool extract --bigfile thedirkey .
$ mogtool extract --bigfile --asfile thedirkey thefile.tgz
$ mogtool extract --bigfile thedevkey /dev/hda4
$ mogtool delete thekey
$ mogtool list
=head1 GENERAL OPTIONS
=over 4
=item --debug
Turn on MogileFS debug output.
=item --trackers=<[preferred_ip/]ip:port>[,<[preferred_ip/]ip:port>]*
Specify one or more trackers for your MogileFS installation. Note that
you can specify preferred IPs to override the default IPs with. So it
would look something like B<10.10.0.1/10.0.0.1:8081>.
=item --domain=<domain>
Set the MogileFS domain to use.
=item --class=<class>
Set the class within the domain to use. Defaults to _default.
=item --conf=<file>
Specify a configuration file to load from.
=item --lib=<directory>
Specify a directory to use as a library path. Right now, this should
be the directory where you expect to find the MogileFS.pm file, if it's
not actually installed.
=back
=head1 COMMANDS
=over 4
=item inject|i
Insert a resource into MogileFS. See L</"INJECT OPTIONS"> and L</"INJECT ARGUMENTS">
for the rest of how to use the inject mode.
=item extract|x
Extract a resource from MogileFS. See L</"EXTRACT OPTIONS"> and L</"EXTRACT ARGUMENTS">
for how to use extract.
=item delete|rm
Delete a resource. See L</"DELETE OPTIONS"> and L</"DELETE ARGUMENTS">.
=item list|ls
List all big files contained in MogileFS. No options, no arguments.
=back
=head1 INJECT OPTIONS
The following options are used to control the behavior of the injector.
=over 4
=item --bigfile|-b
If specified, use chunking to break the resource into manageable pieces.
=item --chunksize=<size>[B|K|M|G]
When instructed to break files into chunks, the injector will use the specified
chunk size as the maximum chunk size. Defaults to 64M. You can specify the
chunk size manually and specify the units--defaults to bytes.
=item --gzip|-z
If specified, mogtool will gzip the data as it's going into MogileFS. The resource
will be marked as compressed.
Note that you do not need to specify this if the resource is already gzipped, but
it doesn't hurt. (We automatically detect that and mark it as compressed.)
=item --overwrite
If you previously were working on injecting a big file as chunks and the process
died, normally mogtool refuses to do it again. Specify this option to force the
overwrite of that file.
B<NOTE:> Other than in the above case (partial failure), mogtool will not prompt
before overwriting an existing file.
=item --verify
If on, we do a full MD5 verification of every chunk after it is replicated. This
can take a while on large files!
=item --description=<text>
Specifies a description for this file. Optional, but assists in reporting and
listing the large files in MogileFS. (This is also displayed in any receipts
that are created.)
=item --receipt=<email address>[, <email address>]*
If specified, emails a copy of the receipt file to the specified comma-separated
email addresses. Also creates a local filesystem copy of the receipt file.
=item --concurrent=<number>
Specifies the number of concurrent processes to run for MogileFS insertion. If
you are noticing mogtool spend most of it's time waiting for children and not
actually buffering data, you may wish to raise this number. The default is 1
but we've found 3 or 4 work well.
=item
=back
=head1 INJECT ARGUMENTS
=over 4
=item resource
What you actually want to inject. This can be a file, directory, or a raw
partition in the format I</dev/X>.
Please see L</"USAGE EXAMPLES"> for more information on how to inject these
different types of resources and the differences thereof.
=item key
Specifies the key to save this file to. For big files, the key is actually
"_big_N:key" and "key,#" where N is one of a bunch of things we use and # is
the chunk number.
Generally, you want this to be descriptive so you remember what it is later
and can identify the file just by looking at the key.
=back
=head1 EXTRACT OPTIONS
=over 4
=item --bigfile|-b
If specified, indicates that this resource was chunked on injection and should be
reassembled for extraction.
=item --gzip|-z
Specifies to mogtool that it should ungzip the output if and only if it was
compressed when inserted into the MogileFS system. So, if you're extracting a
file that wasn't gzipped to begin with, this doesn't do anything.
=item --asfile
Useful when extracting something previously inserted as a directory--this option
instructs mogtool to treat the resource as a file and not actually run it
through tar for decompression.
=back
=head1 EXTRACT ARGUMENTS
=over 4
=item key
Specifies the key to get the file from.
=item destination
What destination means varies depending on what type of resource you're extracting.
However, no matter what, you can specify a single dash (B<->) to mean STDOUT.
Please see the usage examples for more information on how extract works.
=back
=head1 DELETE OPTIONS
=over 4
=item --bigfile|-b
The resource is a "big file" and all chunks should be deleted.
=back
=head1 DELETE ARGUMENTS
=over 4
=item key
Specifies the key of the file to delete.
=back
=head1 USAGE EXAMPLES
I<Please note that all examples assume you have a default config file that
contains the tracker and domain to use. Saves us from having to clutter up
the command line.>
=head2 Small Files (<64MB)
When it comes to using small files, mogtool is very, very easy.
=head3 Injection
$ mogtool inject foo.dbm foo.dbm.2004.12
Injects the file I<foo.dbm> into MogileFS under the key of I<foo.dbm.2004.12>.
$ mogtool inject --gzip foo.dbm foo.dbm.2004.12
Injects the same file to the same key, but compresses it on the fly for you.
=head3 Extraction
$ mogtool extract foo.dbm.2004.12 newfoo.dbm
Retrieves the key I<foo.dbm.2004.12> and saves it as I<newfoo.dbm>.
$ mogtool extract --gzip foo.dbm.2004.12 newfoo.dbm
Gets the file and automatically decompresses it, if and only if it was compressed.
So basically, you can turn on gzip in your config file and mogtool will do the
smart thing each time.
$ mogtool extract foo.dbm.2004.12 -
Print the resource to standard out. If you want, you can pipe it somewhere or
redirect to a file (but why not just specify the filename?).
=head2 Large Files (>64MB)
Given mogtool's ability to break files into chunks and later reassemble them,
inserting large files (even files over the 4GB barrier) is relatively easy.
=head3 Injection
$ mogtool inject --bigfile largefile.dat largefile.dat
As expected, inserts the file I<largefile.dat> into the MogileFS system under
the name I<largefile.dat>. Not very creative. Uses the default 64MB chunks.
$ mogtool inject --bigfile --chunksize=16M largefile.dat largefile.dat
Specify to use 16MB chunks instead of the default. Otherwise, the same.
$ mogtool inject --bigfile --chunksize=1000K --gzip largefile.dat somekey
Do it again, but specify 1000KB chunks, gzip automatically, and upload it under
a different key I<somekey>.
$ mogtool inject --bigfile --concurrent=5 --gzip largefile.dat somekey
Same as above, but use 5 children processes for uploading chunks to MogileFS.
This can take up to 300MB of memory in this example! (It tends to use about
(concurrency + 1) * chunksize bytes.)
$ mogtool inject --bigfile --chunksize=32M --concurrent=3 --gzip \
--receipt="foo@bar.com" --verify --description="A large file" \
largefile.dat somekey
Break this file into 128MB chunks, set a description, use 3 children to
upload them, gzip the file as you go, do a full MD5 verification of every
chunk, then email a receipt with all of the MogileFS paths to me.
Lots of flexibility with mogtool.
=head3 Extraction
$ mogtool extract --bigfile somekey newfile.dat
In its basic form, extracts the previously inserted large file and saves it as
I<newfile.dat>.
$ mogtool extract --bigfile --gzip somekey newfile.dat
If the file was gzipped on entry, ungzip it and save the result. If it wasn't
gzipped, then we just save it.
=head2 Directories
Directories are easily injected and extracted with mogtool. To create the data
stream that is inserted into MogileFS, we use tar.
=head3 Injection
$ mogtool inject --bigfile mydir mykey
Run I<mydir> through tar and then save it as I<mykey>.
$ mogtool inject --bigfile --gzip --concurrent=5 mydir mykey
Inject, but also gzip and use multiple injectors.
I<Note how this is just like injecting a large file. See injection examples for
large files for more examples.>
=head3 Extraction
$ mogtool extract --bigfile mykey .
Extract the previously injected directory I<mykey> to your local directory.
$ mogtool extract --bigfile --asfile mykey foo.tar
Take the previously generated tarball and save it as I<foo.tar>. Simply creates
the file instead of extracting everything inside.
=head2 Partitions/Devices
mogtool has the ability to inject raw partitions into MogileFS and to retrieve
them later and write them back to a partition. They're treated just like directories
for the most part, we just don't pipe things through tar.
=head3 Injection
$ mogtool inject --bigfile /dev/hda3 hda3.backup
Save a raw copy of your partition I</dev/hda3> to the key I<hda3.backup>.
$ mogtool inject --bigfile --gzip /dev/hda3 hda3.backup
Same, but compress on the fly during injection.
=head3 Extraction
$ mogtool extract --bigfile hda3.backup /dev/hda4
Extract the partition at I<hda3.backup> to the partition I</dev/hda4>. B<WARNING:>
mogtool won't ask for confirmation, make sure you don't mistype partition numbers!
=head2 Deleting a Resource
B<WARNING:> Please make sure you're specifying the right parameter, as delete does
not prompt for confirmation of the request!
$ mogtool delete thekey
Delete a normal file.
$ mogtool delete --bigfile thekey
Delete a chunked file--this deletes all chunks and the receipt, so the file is gone.
=head2 Listing Big Files
$ mogtool list backup
Lists all large files stored in MogileFS. It is not possible to list all normal files
at this time.
=head1 CONFIGURATION FILE
Instead of adding a ton of options to the command line every time, mogtool enables
you to create a default configuration file that it will read all of the options from.
It searches two locations for a default configuration file: B<~/.mogtool> and
B</etc/mogilefs/mogtool.conf>. (Alternately, you can specify B<--conf=whatever> as
an option on the command line.)
The file can consist of any number of the following items:
trackers = 10.0.0.3:7001, 10.10.0.5/10.0.0.5:7001
domain = mogiledomain
class = fileclass
lib = /home/foo/lib
gzip = 1
big = 1
overwrite = 1
chunksize = 32M
receipt = foo@bar.com, baz@bar.com
verify = 1
concurrent = 3
=head1 KNOWN BUGS
None? Send me any you find! :)
=head1 PLANNED FEATURES
=over 4
=item --concurrent for extract
It would be nice to have concurrent extraction going on.
=item recover mode
If the receipt file is ever corrupt in MogileFS it would be useful to recover a
file given just a receipt. It would have the same arguments as the extract mode,
except use a receipt file as the data source.
=item partition size verification
We can easily get the partition size when we save one to MogileFS, so we should
use that information to determine during extraction if a target partition is going
to be big enough.
=item on the fly gzip extraction
Right now we can gzip on an injection, but we should support doing decompression
on the fly coming out of MogileFS.
=item make list take a prefix
If you can specify a prefix, that makes things easier for finding small files that
are stored in MogileFS.
=item more information on list
Have list load up the info file and parse it for information about each of the
big files being stored. Maybe have this as an option (-l). (This means the
reading and parsing of info files should be abstracted into a function.)
=back
=head1 AUTHOR
Mark Smith E<lt>junior@danga.comE<gt> - most of the implementation and maintenance.
Brad Fitzpatrick E<lt>brad@danga.comE<gt> - concepts and rough draft.
Copyright (c) 2002-2004 Danga Interactive. All rights reserved.
=cut
##############################################################################
use strict;
use Getopt::Long;
use Pod::Usage qw{ pod2usage };
use Digest::MD5 qw{ md5_hex };
use Time::HiRes qw{ gettimeofday tv_interval };
use LWP::Simple;
use POSIX qw(:sys_wait_h);
use Compress::Zlib;
$| = 1;
use constant ERR_FATAL => 1;
my %opts;
$opts{help} = 0;
abortWithUsage() unless
GetOptions(
# general purpose options
'trackers=s' => \$opts{trackers},
'domain=s' => \$opts{domain},
'class=s' => \$opts{class},
'config=s' => \$opts{conf},
'help' => \$opts{help},
'debug' => \$MogileFS::DEBUG,
'lib' => \$opts{lib},
# extract+inject options
'gzip|z' => \$opts{gzip},
'bigfile|b' => \$opts{big},
# inject options
'overwrite' => \$opts{overwrite},
'chunksize=s' => \$opts{chunksize},
'receipt=s' => \$opts{receipt},
'reciept=s' => \$opts{receipt}, # requested :)
'verify' => \$opts{verify},
'description=s' => \$opts{des},
'concurrent=i' => \$opts{concurrent},
# extract options
'asfile' => \$opts{asfile},
);
# now load the config file?
my @confs = ( $opts{conf}, "$ENV{HOME}/.mogtool", "/etc/mogilefs/mogtool.conf" );
foreach my $conf (@confs) {
next unless $conf && -e $conf;
open FILE, "<$conf";
foreach (<FILE>) {
s!#.*!!;
next unless m!(\w+)\s*=\s*(.+)!;
$opts{$1} = $2;
}
close FILE;
}
# now bring in MogileFS, because hopefully we have a lib by now
if ($opts{lib}) {
eval "use lib '$opts{lib}';";
}
eval "use MogileFS;";
# no trackers and domain..?
unless ($opts{trackers} && $opts{domain}) {
abortWithUsage();
}
# init connection to mogile
my $mogfs = get_mogfs();
# get our command and pass off to our functions
my $cmd = shift;
inject() if $cmd eq 'i' || $cmd eq "inject";
extract() if $cmd eq 'x' || $cmd eq "extract";
list() if $cmd eq 'ls' || $cmd eq "list";
mdelete() if $cmd eq 'rm' || $cmd eq "delete";
# fail if we get this far
abortWithUsage();
######################################################################
sub get_mogfs {
my @trackerinput = split(/\s*,\s*/, $opts{trackers});
my @trackers;
my %pref_ip;
foreach my $tracker (@trackerinput) {
if ($tracker =~ m!(.+)/(.+):(\d+)!) {
$pref_ip{$2} = $1;
push @trackers, "$2:$3";
} else {
push @trackers, $tracker;
}
}
my $mogfs = MogileFS->new(
domain => $opts{domain},
hosts => \@trackers,
)
or error("Could not initialize MogileFS", ERR_FATAL);
$mogfs->set_pref_ip(\%pref_ip);
return $mogfs;
}
sub error {
my $err = shift() || "ERROR: no error message provided!";
print STDERR "$err\n";
if (my $errstr = $mogfs->errstr) {
$errstr =~ s/^\s+//;
$errstr =~ s/\s+$//;
if ($errstr) {
print STDERR "MogileFS backend error message: $errstr\n";
}
}
if ($@) {
my $err = $@;
$err =~ s/[\r\n]+$//;
print STDERR "System error message: $@\n";
}
# if a second argument, exit
if (defined (my $exitcode = shift())) {
exit $exitcode+0;
}
}
sub inject {
my $src = shift @ARGV;
my $key = shift @ARGV;
abortWithUsage() unless $src && $key;
# make sure the source exists and the key is valid
die "Error: source $src doesn't exist.\n"
unless -e $src;
die "Error: key $key isn't valid; must not contain spaces or commas.\n"
unless $key =~ /^[^\s\,]+$/;
# before we get too far, find sendmail?
my $sendmail;
if ($opts{receipt}) {
$sendmail = `which sendmail` || '/usr/sbin/sendmail';
$sendmail =~ s/[\r\n]+$//;
unless (-e $sendmail) {
die "Error: attempted to find sendmail binary in /usr/sbin but couldn't.\n";
}
}
# open up O as the handle to use for reading data
my $type = 'unknown';
if (-d $src) {
my $taropts = ($opts{gzip} ? 'z' : '') . "cf";
$type = 'tarball';
open (O, '-|', 'tar', $taropts, '-', $src)
or die "Couldn't open tar for reading: $!\n";
} elsif (-f $src) {
$type = 'file';
open (O, "<$src")
or die "Couldn't open file for reading: $!\n";
} elsif (-b $src) {
$type = 'partition';
open (O, "<$src")
or die "Couldn't open block device for reading: $!\n";
} else {
die "Error: not file, directory, or partition.\n";
}
# now do some pre-file checking...
my $size = -s $src;
if ($type ne 'file') {
die "Error: you specified to store a file of type $type but didn't specify --bigfile. Please see documentation.\n"
unless $opts{big};
} elsif ($size > 64 * 1024 * 1024) {
die "Error: the file is more than 64MB and you didn't specify --bigfile. Please see documentation.\n"
unless $opts{big};
}
# see if there's already a pre file?
if ($opts{big}) {
my $data = $mogfs->get_file_data("_big_pre:$key");
if (defined $data) {
unless ($opts{overwrite}) {
error(<<MSG, ERR_FATAL);
ERROR: The pre-insert file for $key exists. This indicates that a previous
attempt to inject a file failed--or is still running elsewhere! Please
verify that a previous injection of this file is finished, or run mogtool
again with the --overwrite inject option.
$$data
MSG
}
# delete the pre notice since we didn't die (overwrite must be on)
$mogfs->delete("_big_pre:$key")
or error("ERROR: Unable to delete _big_pre:$key.", ERR_FATAL);
}
# now create our pre notice
my $prefh = $mogfs->new_file("_big_pre:$key", $opts{class})
or error("ERROR: Unable to create _big_pre:$key.", ERR_FATAL);
$prefh->print("starttime:" . time());
$prefh->close()
or error("ERROR: Unable to save to _big_pre:$key.", ERR_FATAL);
}
# setup config and temporary variables we're going to be using
my $chunk_size = 64 * 1024 * 1024; # 64 MB
if ($opts{big}) {
if ($opts{chunksize} && ($opts{chunksize} =~ m!^(\d+)(G|M|K|B)?!i)) {
$chunk_size = $1;
unless (lc $2 eq 'b') {
$chunk_size *= (1024 ** ( { g => 3, m => 2, k => 1 }->{lc $2} || 2 ));
}
print "NOTE: Using chunksize of $chunk_size bytes.\n";
}
}
my $read_size = ($chunk_size > 1024*1024 ? 1024*1024 : $chunk_size);
# temporary variables
my $buf;
my $bufsize = 0;
my $chunknum = 0;
my %chunkinfo; # { id => [ md5, length ] }
my %chunkbuf; # { id => data }
my %children; # { pid => chunknum }
my %chunksout; # { chunknum => pid }
# this function writes out a chunk
my $emit = sub {
my $cn = shift() + 0;
return unless $cn;
# get the length of the chunk we're going to send
my $bufsize = length $chunkbuf{$cn};
return unless $bufsize;
# now spawn off a child to do the real work
if (my $pid = fork()) {
print "Spawned child $pid to deal with chunk number $cn.\n";
$chunksout{$cn} = $pid;
$children{$pid} = $cn;
return;
}
# drop other memory references we're not using anymore
foreach my $chunknum (keys %chunkbuf) {
next if $chunknum == $cn;
delete $chunkbuf{$chunknum};
}
# as a child, get a new mogile connection
my $mogfs = get_mogfs();
my $dkey = $opts{big} ? "$key,$chunknum" : "$key";
# TODO: be resilient to transient errors, retry, etc.
my $start_time = [ gettimeofday() ];
my $try = 0;
while (1) {
$try++;
my $fh = $mogfs->new_file($dkey, $opts{class}, $bufsize);
unless (defined $fh) {
error("WARNING: Unable to create new file '$dkey'.");
printf "This was try #$try and it's been %.2f seconds since we first tried. Retrying...\n", tv_interval($start_time);
sleep 1;
next;
}
$fh->print($chunkbuf{$cn});
unless ($fh->close) {
error("WARNING: Unable to save file '$dkey'.");
printf "This was try #$try and it's been %.2f seconds since we first tried. Retrying...\n", tv_interval($start_time);
sleep 1;
next;
}
last;
}
my $diff = tv_interval($start_time);
printf " chunk $cn saved in %.2f seconds.\n", $diff;
# make sure we never return, always exit
exit 0;
};
# just used to reap our children in a loop until they're done. also
# handles respawning a child that failed.
my $reap_children = sub {
# find out if we have any kids dead
while ((my $pid = waitpid -1, WNOHANG) > 0) {
my $cnum = delete $children{$pid};
unless ($cnum) {
print "Error: reaped child $pid, but no idea what they were doing...\n";
next;
}
if (my $status = $?) {
print "Error: reaped child $pid for chunk $cnum returned non-zero status... Retrying...\n";
$emit->($cnum);
next;
}
my @paths = grep { defined $_ } $mogfs->get_paths($opts{big} ? "$key,$cnum" : "$key", 1);
unless (@paths) {
print "Error: reaped child $pid for chunk $cnum but no paths exist... Retrying...\n";
$emit->($cnum);
next;
}
delete $chunkbuf{$cnum};
delete $chunksout{$cnum};
print "Child $pid successfully finished with chunk $cnum.\n";
}
};
# this function handles parallel threads
$opts{concurrent} ||= 1;
$opts{concurrent} = 1 if $opts{concurrent} < 1;
my $handle_children = sub {
# here we pause while our children are working
my $first = 1;
while ($first || scalar(keys %children) >= $opts{concurrent}) {
$first = 0;
$reap_children->();
select undef, undef, undef, 0.1;
}
# now spawn until we hit the limit
foreach my $cnum (keys %chunkbuf) {
next if $chunksout{$cnum};
$emit->($cnum);
last if scalar(keys %children) >= $opts{concurrent};
}
};
# setup compression stuff
my $dogzip = 0;
my $zlib;
if ($opts{gzip}) {
# if they turned gzip on we may or may not need this stream, so make it
$zlib = deflateInit()
or error("Error: unable to create gzip deflation stream", ERR_FATAL);
}
# read one meg chunks while we have data
my $sum = 0;
my $readbuf = '';
while (my $rv = read(O, $readbuf, $read_size)) {
# if this is a file, and this is our first read, see if it's gzipped
if (!$sum && $rv >= 2) {
if (substr($readbuf, 0, 2) eq "\x1f\x8b") {
# this is already gzipped, so just mark it as such and insert it
$opts{gzip} = 1;
} else {
# now turn on our gzipping if the user wants the output gzipped
$dogzip = 1 if $opts{gzip};
}
}
# now run it through the deflation stream before we process it here
if ($dogzip) {
my ($out, $status) = $zlib->deflate($readbuf);
error("Error: Deflation failure processing stream", ERR_FATAL)
unless $status == Z_OK;
$readbuf = $out;
$rv = length $readbuf;
# we don't always get a chunk from deflate
next unless $rv;
}
# now stick our data into our real buffer
$buf .= $readbuf;
$bufsize += $rv;
$sum += $rv;
$readbuf = '';
# generate output
if ($type ne 'tarball' && $size && $size > $read_size) {
printf "Buffer so far: $bufsize bytes [%.2f%% complete]\r", ($sum / $size * 100);
} else {
print "Buffer so far: $bufsize bytes\r";
}
# if we have one chunk, handle it
if ($bufsize >= $chunk_size) {
$chunkbuf{++$chunknum} = substr($buf, 0, $chunk_size);
# calculate the md5, print out status, and save this chunk
my $md5 = md5_hex($buf);
if ($opts{big}) {
print "chunk $key,$chunknum: $md5, len = $chunk_size\n";
} else {
print "file $key: $md5, len = $chunk_size\n";
}
$chunkinfo{$chunknum} = [ $md5, $chunk_size ];
# reset for the next read loop
$buf = substr($buf, $chunk_size);
$bufsize = length $buf;
# now spawn children to save chunks
$handle_children->();
}
}
close O;
# now we need to flush the gzip engine
if ($dogzip) {
my ($out, $status) = $zlib->flush;
error("Error: Deflation failure processing stream", ERR_FATAL)
unless $status == Z_OK;
$buf .= $out;
$bufsize += length $out;
$sum += length $out;
}
# final piece
if ($buf) {
$chunkbuf{++$chunknum} = $buf;
my $md5 = md5_hex($buf);
if ($opts{big}) {
print "chunk $key,$chunknum: $md5, len = $bufsize\n";
} else {
print "file $key: $md5, len = $bufsize\n";
}
$chunkinfo{$chunknum} = [ $md5, $bufsize ];
}
# now, while we still have chunks to process...
while (%chunkbuf) {
$handle_children->();
sleep 1;
}
# verify replication and chunks
my %paths; # { chunknum => [ path, path, path ... ] }
my %still_need = ( %chunkinfo );
while (%still_need) {
print "Beginning replication wait: " . join(' ', sort { $a <=> $b } keys %still_need) . "\n";
sleep 1; # give things time to replicate some
# now iterate over each and get the paths
foreach my $num (keys %still_need) {
my $dkey = $opts{big} ? "$key,$num" : $key;
my @npaths = grep { defined $_ } $mogfs->get_paths($dkey, 1);
unless (@npaths) {
error("FAILURE: chunk $num has no paths at all.", ERR_FATAL);
}
if (scalar(@npaths) >= 2) {
# okay, this one's replicated, actually verify the paths
foreach my $path (@npaths) {
if ($opts{verify}) {
print " Verifying chunk $num, path $path...";
my $data = get($path);
my $len = length($data);
my $md5 = md5_hex($data);
if ($md5 ne $chunkinfo{$num}->[0]) {
print "md5 mismatch\n";
next;
} elsif ($len != $chunkinfo{$num}->[1]) {
print "length mismatch ($len, $chunkinfo{$num}->[1])\n";
next;
}
print "ok\n";
} elsif ($opts{receipt}) {
# just do a quick size check
print " Size verifying chunk $num, path $path...";
my $clen = (head($path))[1] || 0;
unless ($clen == $chunkinfo{$num}->[1]) {
print "length mismatch ($clen, $chunkinfo{$num}->[1])\n";
next;
}
print "ok\n";
}
push @{$paths{$num} ||= []}, $path;
}
# now make sure %paths contains at least 2 verified
next unless scalar(@{$paths{$num} || []}) >= 2;
delete $still_need{$num};
}
}
}
# prepare the info file
my $des = $opts{des} || 'no description';
my $compressed = $opts{gzip} ? '1' : '0';
#FIXME: add 'partblocks' to info file
# create the info file
my $info = <<INFO;
des $des
type $type
compressed $compressed
filename $src
chunks $chunknum
size $sum
INFO
foreach (sort { $a <=> $b } keys %chunkinfo) {
$info .= "part $_ bytes=$chunkinfo{$_}->[1] md5=$chunkinfo{$_}->[0] paths: ";
$info .= join(', ', @{$paths{$_} || []});
$info .= "\n";
}
# now write out the info file
if ($opts{big}) {
my $fhinfo = $mogfs->new_file("_big_info:$key", $opts{class})
or error("ERROR: Unable to create _big_info:$key.", ERR_FATAL);
$fhinfo->print($info);
$fhinfo->close()
or error("ERROR: Unable to save _big_info:$key.", ERR_FATAL);
# verify info file
print "Waiting for info file replication...\n";
while (1) {
my @paths = $mogfs->get_paths("_big_info:$key", 1);
next unless scalar(@paths) >= 2;
foreach my $path (@paths) {
my $data = get($path);
error(" FATAL: content mismatch on $path", ERR_FATAL)
unless $data eq $info;
}
last;
}
# now delete our pre file
print "Deleting pre-insert file...\n";
$mogfs->delete("_big_pre:$key")
or error("ERROR: Unable to delete _big_pre:$key", ERR_FATAL);
}
# now email and save a receipt
if ($opts{receipt}) {
open MAIL, "| $sendmail -t"
or error("ERROR: Unable to open sendmail binary: $sendmail", ERR_FATAL);
print MAIL <<MAIL;
To: $opts{receipt}
From: mogtool\@dev.null
Subject: mogtool.$key.receipt
$info
.
MAIL
close MAIL;
print "Receipt emailed.\n";
# now dump to a file
open FILE, ">mogtool.$key.receipt"
or error("ERROR: Unable to create file mogtool.$key.receipt in current directory.", ERR_FATAL);
print FILE $info;
close FILE;
print "Receipt stored in mogtool.$key.receipt.\n";
}
exit 0;
}
sub _parse_info {
my $info = shift;
my $res = {};
# parse out the header data
$res->{des} = ($info =~ /^des\s+(.+)$/m) ? $1 : undef;
$res->{type} = ($info =~ /^type\s+(.+)$/m) ? $1 : undef;
$res->{compressed} = ($info =~ /^compressed\s+(.+)$/m) ? $1 : undef;
$res->{filename} = ($info =~ /^filename\s+(.+)$/m) ? $1 : undef;
$res->{chunks} = ($info =~ /^chunks\s+(\d+)$/m) ? $1 : undef;
$res->{size} = ($info =~ /^size\s+(\d+)$/m) ? $1 : undef;
# now get the pieces
$res->{maxnum} = undef;
while ($info =~ /^part\s+(\d+)\s+bytes=(\d+)\s+md5=(.+)\s+paths:\s+(.+)$/mg) {
$res->{maxnum} = $1 if !defined $res->{maxnum} || $1 > $res->{maxnum};
$res->{parts}->{$1} = {
bytes => $2,
md5 => $3,
paths => [ split(/\s*,\s*/, $4) ],
};
}
return $res;
}
sub extract {
my $key = shift @ARGV;
my $dest = shift @ARGV;
abortWithUsage() unless $key && $dest;
error("Error: key $key isn't valid; must not contain spaces or commas.", ERR_FATAL)
unless $key =~ /^[^\s\,]+$/;
unless ($dest eq '-' || $dest eq '.') {
error("Error: destination exists: $dest (specify --overwrite if you want to kill it)", ERR_FATAL)
if -e $dest && !$opts{overwrite} && !-b $dest;
}
# see if this is really a big file
my $file;
if ($opts{big}) {
my $info = $mogfs->get_file_data("_big_info:$key");
die "$key doesn't seem to be a valid big file.\n"
unless $info && $$info;
# verify validity
$file = _parse_info($$info);
# make sure we have enough info
error("Error: info file doesn't contain the number of chunks", ERR_FATAL)
unless $file->{chunks};
error("Error: info file doesn't contain the total size", ERR_FATAL)
unless $file->{size};
} else {
# not a big file, so it has to be of a certain type
$file->{type} = 'file';
$file->{maxnum} = 1;
$file->{parts}->{1} = {
paths => [ grep { defined $_ } $mogfs->get_paths($key) ],
};
# now, if it doesn't exist..
unless (scalar(@{$file->{parts}->{1}->{paths}})) {
error("Error: file doesn't exist (or did you forget --bigfile?)", ERR_FATAL);
}
}
# several cases.. going to stdout?
if ($dest eq '-') {
*O = *STDOUT;
} else {
# open up O as the handle to use for reading data
if ($file->{type} eq 'file' || $file->{type} eq 'partition' ||
($file->{type} eq 'tarball' && $opts{asfile})) {
# just write it to the file with this name, but don't overwrite?
if ($dest eq '.') {
$dest = $file->{filename};
$dest =~ s!^(.+)/!!;
}
if (-b $dest) {
# if we're targetting a block device...
warn "FIXME: add in block checking\n";
open O, ">$dest"
or die "Couldn't open $dest: $!\n";
} elsif (-e $dest) {
if ($opts{overwrite}) {
open O, ">$dest"
or die "Couldn't open $dest: $!\n";
} else {
die "File already exists: $dest ... won't overwrite without --overwrite.\n";
}
} else {
open O, ">$dest"
or die "Couldn't open $dest: $!\n";
}
} elsif ($file->{type} eq 'tarball') {
my $taropts = ($file->{compressed} ? 'z' : '') . "xf";
open O, '|-', 'tar', $taropts, '-'
or die "Couldn't open tar for writing: $!\n";
} else {
die "Error: unable to handle type '$file->{type}'\n";
}
}
# start fetching pieces
foreach my $i (1..$file->{maxnum}) {
print "Fetching piece $i...\n";
foreach my $path (@{$file->{parts}->{$i}->{paths} || []}) {
print " Trying $path...\n";
my $data = get($path);
next unless $data;
# now verify MD5, etc
if ($opts{big}) {
my $len = length $data;
my $md5 = md5_hex($data);
print " ($len bytes, $md5)\n";
next unless $len == $file->{parts}->{$i}->{bytes} &&
$md5 eq $file->{parts}->{$i}->{md5};
}
# this chunk verified, write it out
print O $data;
last;
}
}
# at this point the file should be complete!
close O;
print "Done.\n";
# now make sure we have enough data
#$ mogtool [opts] extract <key> {<file>,<dir>,<device>}
#=> - (for stdout) (if compressed, add "z" flag)
#=> . (to untar) (if compressed, do nothing???, make .tar.gz file -- unless they use -z again?)
#=> /dev/sda4 (but check /proc/partitions that it's big enough) (if compress, Compress::Zlib to ungzip
# => foo.jpg (write it to a file)
# now check
exit 0;
}
sub list {
# list all big files in mogile
my ($ct, $after, $list);
while (($after, $list) = $mogfs->list_keys("_big_info:", $after)) {
last unless $list && @$list;
# now extract the key and dump it
foreach my $key (@$list) {
next unless $key =~ /^_big_info:(.+)$/;
$key = $1;
$ct++;
print "$key\n";
}
}
print "#$ct files found\n";
exit 0;
}
sub mdelete {
die "not implemented yet\n";
}
abortWithUsage() if $opts{help};
sub abortWithUsage {
my $msg = join '', @_;
if ( $msg ) {
pod2usage( -verbose => 1, -exitval => 1, -message => "$msg" );
} else {
pod2usage( -verbose => 1, -exitval => 1 );
}
}
__END__
Usage: mogtool [opts] <command> [command-opts] [command-args]
General options:
* --trackers=<ip:port>[,<ip:port>]*
* --domain=<domain>
* --class=<class>
* --conf=<file> Location of config file listing trackers, default
domain, and default class
Default: ~/.mogilefs, /etc/mogilefs/mogilefs.conf
* --bigfile | -b Tell mogtool to split file into 64MB chunks and
checksum the chunks,
* --gzip | -z Use gzip compression/decompression
Commands:
inject | i Inject a file into MogileFS, by key
extract | x Extract a file from MogileFS, by key
list | ls List large files in MogileFS
'inject' syntax:
$ mogtool [opts] inject [i-opts] <file,dir,device> <key>
Valid i-opts:
--overwrite Ignore existing _big_pre: and start anew.
--chunksize=n Set the size of individual chunk files. n is in the format of
number[scale] so 10 is 10 megabytes, 10M is also 10 megs, 10G, 10B, 10K...
case insensitive
--receipt=email Send a receipt to the specified email address
--verify Make sure things replicate and then check the MD5s?
--des=string Set the file description
$ mogtool [opts] extract <key> {<file>,<dir>,<device>}
=> - (for stdout) (if compressed, add "z" flag)
=> . (to untar) (if compressed, do nothing???, make .tar.gz file -- unless they use -z again?)
=> /dev/sda4 (but check /proc/partitions that it's big enough) (if compress, Compress::Zlib to ungzip)
=> foo.jpg (write it to a file)
--key
# mogtool add --key='roast.sdb1.2004-11-07' -z /dev/sda1
<key> = "cow.2004.11.17"
# this is a temporary file that we delete when we're doing recording all chunks
_big_pre:<key>
starttime=UNIXTIMESTAMP
# when done, we write the _info file and delete the _pre.
_big_info:<key>
des Cow's ljdb backup as of 2004-11-17
type { partition, file, tarball }
compressed {0, 1}
filename ljbinlog.305.gz
partblocks 234324324324
part 1 <bytes> <md5hex>
part 2 <bytes> <md5hex>
part 3 <bytes> <md5hex>
part 4 <bytes> <md5hex>
part 5 <bytes> <md5hex>
_big:<key>,<n>
_big:<key>,<n>
_big:<key>,<n>
Receipt format:
BEGIN MOGTOOL RECIEEPT
type partition
des Foo
compressed foo
part 1 bytes=23423432 md5=2349823948239423984 paths: http://dev5/2/23/23/.fid, http://dev6/23/423/4/324.fid
part 1 bytes=23423432 md5=2349823948239423984 paths: http://dev5/2/23/23/.fid, http://dev6/23/423/4/324.fid
part 1 bytes=23423432 md5=2349823948239423984 paths: http://dev5/2/23/23/.fid, http://dev6/23/423/4/324.fid
part 1 bytes=23423432 md5=2349823948239423984 paths: http://dev5/2/23/23/.fid, http://dev6/23/423/4/324.fid
END RECIEPT
###
perl -w bin/mogtool --gzip inject --overwrite --chunksize=24M --des="This is a description" --receipt="marksmith@danga.com" ../music/jesse/Unsorted jesse.music.unsorted