#!/usr/bin/perl # # MogileFS client library # # Copyright 2004 Danga Interactive # # Authors: # Brad Whitaker # Brad Fitzpatrick # Mark Smith # # License: # GPL or Artistic License # # FIXME: add url to website (once we have one) # FIXME: add POD docs (Michael?) # ################################################################################ # MogileFS class # package MogileFS; use strict; use Carp; use IO::WrapTie; use LWP::UserAgent; use fields qw(root domain backend); sub new { my MogileFS $self = shift; $self = fields::new($self) unless ref $self; return $self->_init(@_); } sub reload { my MogileFS $self = shift; return undef unless $self; return $self->_init(@_); } sub errstr { my MogileFS $self = shift; return undef unless $self; return $self->{backend}->errstr; } # expects as argument a hashref of "standard-ip" => "preferred-ip" sub set_pref_ip { my MogileFS $self = shift; $self->{backend}->set_pref_ip(shift) if $self->{backend}; } # returns MogileFS::NewFile object, or undef if no device # available for writing sub new_file { my MogileFS $self = shift; my ($key, $class, $bytes) = @_; $bytes += 0; my $res = $self->{backend}->do_request ("create_open", { domain => $self->{domain}, class => $class, key => $key, multi_dest => 1, }) or return undef; my $dests = []; # [ [devid,path], [devid,path], ... ] # determine old vs. new format to populate destinations unless (exists $res->{dev_count}) { push @$dests, [ $res->{devid}, $res->{path} ]; } else { for my $i (1..$res->{dev_count}) { push @$dests, [ $res->{"devid_$i"}, $res->{"path_$i"} ]; } } my $main_dest = shift @$dests; my ($main_devid, $main_path) = ($main_dest->[0], $main_dest->[1]); # create a MogileFS::NewFile object, based off of IO::File if ($main_path =~ m!^http://!) { return IO::WrapTie::wraptie('MogileFS::NewHTTPFile', mg => $self, fid => $res->{fid}, path => $main_path, devid => $main_devid, backup_dests => $dests, class => $class, key => $key, content_length => $bytes+0, ); } else { return MogileFS::NewFile->new( mg => $self, fid => $res->{fid}, path => $main_path, devid => $main_devid, class => $class, key => $key ); } } # old style calling: # get_paths(key, noverify) # new style calling: # get_paths(key, { noverify => 0/1, zone => "zone" }); # but with both, second parameter is optional sub get_paths { my MogileFS $self = shift; my ($key, $opts) = @_; # handle parameters, if any my ($noverify, $zone); if (ref $opts) { $noverify = 1 if $opts->{noverify}; $zone = $opts->{zone} || undef; } else { $noverify = 1 if $opts; } my $res = $self->{backend}->do_request ("get_paths", { domain => $self->{domain}, key => $key, noverify => $noverify ? 1 : 0, zone => $zone, }) or return undef; my @paths = map { $res->{"path$_"} } (1..$res->{paths}); return @paths if scalar(@paths) > 0 && $paths[0] =~ m!^http://!; return map { "$self->{root}/$_"} @paths; } # given a key, returns a scalar reference pointing at a string containing # the contents of the file. takes one parameter; a scalar key to get the # data for the file. sub get_file_data { # given a key, load some paths and get data my MogileFS $self = $_[0]; my ($key, $timeout) = ($_[1], $_[2]); my @paths = $self->get_paths($key, 1); return undef unless @paths; # iterate over each foreach my $path (@paths) { next unless defined $path; if ($path =~ m!^http://!) { # try via HTTP my $ua = new LWP::UserAgent; $ua->timeout($timeout || 10); my $res = $ua->get($path); if ($res->is_success) { my $contents = $res->content; return \$contents; } } else { # open the file from disk and just grab it all open FILE, "<$path" or next; my $contents; { local $/ = undef; $contents = ; } close FILE; return \$contents if $contents; } } return undef; } # TODO: delete method on MogileFS::NewFile object # this method returns undef only on a fatal error such as inability to actually # delete a resource and inability to contact the server. attempting to delete # something that doesn't exist counts as success, as it doesn't exist. sub delete { my MogileFS $self = shift; my $key = shift; my $rv = $self->{backend}->do_request ("delete", { domain => $self->{domain}, key => $key, }); # if it's unknown_key, not an error return undef unless defined $rv || $self->{backend}->{lasterr} eq 'unknown_key'; return 1; } # just makes some sleeping happen. first and only argument is number of # seconds to instruct backend thread to sleep for. sub sleep { my MogileFS $self = shift; my $duration = shift; $self->{backend}->do_request("sleep", { duration => $duration + 0 }) or return undef; return 1; } # this method renames a file. it returns an undef on error (only a fatal error # is considered as undef; "file didn't exist" isn't an error). sub rename { my MogileFS $self = shift; my ($fkey, $tkey) = @_; my $rv = $self->{backend}->do_request ("rename", { domain => $self->{domain}, from_key => $fkey, to_key => $tkey, }); # if it's unknown_key, not an error return undef unless defined $rv || $self->{backend}->{lasterr} eq 'unknown_key'; return 1; } # used to get a list of keys matching a certain prefix. expected arguments: # ( $prefix, $after, $limit ) # prefix specifies what you want to get a list of. after is the item specified # as a return value from this function last time you called it. limit is optional # and defaults to 1000 keys returned. # # if you expect an array of return values, returns: # ($after, $keys) # but if you expect only a single value, you just get the arrayref of keys. the # value $after is to be used as $after when you call this function again. # # when there are no more keys in the list, you will get back undef(s). sub list_keys { my MogileFS $self = shift; my ($prefix, $after, $limit) = @_; my $res = $self->{backend}->do_request ("list_keys", { domain => $self->{domain}, prefix => $prefix, after => $after, limit => $limit, }) or return undef; # construct our list of keys and the new after value my $resafter = $res->{next_after}; my $reslist = []; for (my $i = 1; $i <= $res->{key_count}+0; $i++) { push @$reslist, $res->{"key_$i"}; } return wantarray ? ($resafter, $reslist) : $reslist; } ################################################################################ # MogileFS class methods # sub _fail { croak "MogileFS: $_[0]"; } sub _debug { return 1 unless $MogileFS::DEBUG; my $msg = shift; my $ref = shift; chomp $msg; eval "use Data::Dumper;"; print STDERR "$msg\n" . Dumper($ref) . "\n"; return 1; } sub _init { my MogileFS $self = shift; my %args = @_; # FIXME: add actual validation { # root is only needed for NFS based installations $self->{root} = $args{root}; # get domain (required) $self->{domain} = $args{domain} or _fail("constructor requires parameter 'domain'"); # create a new backend object if there's not one already, # otherwise call a reload on the existing one if ($self->{backend}) { $self->{backend}->reload( hosts => $args{hosts} ); } else { $self->{backend} = MogileFS::Backend->new( hosts => $args{hosts} ); } _fail("cannot instantiate MogileFS::Backend") unless $self->{backend}; } _debug("MogileFS object: [$self]", $self); return $self; } ################################################################################ # MogileFS::Admin class # package MogileFS::Admin; use strict; use Carp; use fields qw(backend); sub new { my MogileFS::Admin $self = shift; $self = fields::new($self) unless ref $self; my %args = @_; $self->{backend} = new MogileFS::Backend ( hosts => $args{hosts} ) or _fail("couldn't instantiate MogileFS::Backend"); return $self; } sub get_hosts { my MogileFS::Admin $self = shift; my $hostid = shift; my $args = $hostid ? { hostid => $hostid } : {}; my $res = $self->{backend}->do_request("get_hosts", $args) or return undef; my @ret = (); foreach my $ct (1..$res->{hosts}) { push @ret, { map { $_ => $res->{"host${ct}_$_"} } qw(hostid status hostname hostip http_port remoteroot) }; } return \@ret; } sub get_devices { my MogileFS::Admin $self = shift; my $devid = shift; my $args = $devid ? { devid => $devid } : {}; my $res = $self->{backend}->do_request("get_devices", $args) or return undef; my @ret = (); foreach my $ct (1..$res->{devices}) { push @ret, { (map { $_ => $res->{"dev${ct}_$_"} } qw(devid hostid status)), (map { $_ => $res->{"dev${ct}_$_"}+0 } qw(mb_total mb_used)) }; } return \@ret; } # get a hashref of statistics on how the MogileFS server is doing. there are several # sections of statistics, in this form: # { # replication => { "domain-name" => { "class-name" => { devcount => filecount }, ... }, ... }, # } sub get_stats { my MogileFS::Admin $self = shift; my $ret = {}; # do the request, default to request all stats if they didn't specify any push @_, 'all' unless @_; my $res = $self->{backend}->do_request("stats", { map { $_ => 1 } @_ }) or return undef; # get replication statistics foreach my $i (1..$res->{"replicationcount"}) { $ret->{replication}->{$res->{"replication${i}domain"}}->{$res->{"replication${i}class"}}->{$res->{"replication${i}devcount"}} = $res->{"replication${i}files"}; } # get file statistics foreach my $i (1..$res->{"filescount"}) { $ret->{files}->{$res->{"files${i}domain"}}->{$res->{"files${i}class"}} = $res->{"files${i}files"}; } # get device statistics foreach my $i (1..$res->{"devicescount"}) { $ret->{devices}->{$res->{"devices${i}id"}} = { host => $res->{"devices${i}host"}, status => $res->{"devices${i}status"}, files => $res->{"devices${i}files"}, }; } # return the created response return $ret; } # get a hashref of the domains we know about in the format of # { domain_name => { class_name => mindevcount, class_name => mindevcount, ... }, ... } sub get_domains { my MogileFS::Admin $self = shift; my $res = $self->{backend}->do_request("get_domains", {}) or return undef; my $ret; foreach my $i (1..$res->{domains}) { $ret->{$res->{"domain$i"}} = { map { $res->{"domain${i}class${_}name"} => $res->{"domain${i}class${_}mindevcount"} } (1..$res->{"domain${i}classes"}) }; } return $ret; } # create a new domain sub create_domain { my MogileFS::Admin $self = shift; my $domain = shift; my $res = $self->{backend}->do_request("create_domain", { domain => $domain }); return undef unless $res->{domain} eq $domain; return 1; } # create a class within a domain sub create_class { my MogileFS::Admin $self = shift; # wrapper around _mod_class(create) return $self->_mod_class(@_, 'create'); } # update a class's mindevcount within a domain sub update_class { my MogileFS::Admin $self = shift; # wrapper around _mod_class(update) return $self->_mod_class(@_, 'update'); } # change the state of a device; pass in the hostname of the host the # device is located on, the device id number, and the state you want # the host to be set to. returns 1 on success, undef on error. sub change_device_state { my MogileFS::Admin $self = shift; my ($host, $device, $state) = @_; my $res = $self->{backend}->do_request("set_state", { host => $host, device => $device, state => $state, }) or return undef; return 1; } ################################################################################ # MogileFS::Admin class methods # sub _fail { croak "MogileFS::Admin: $_[0]"; } *_debug = *MogileFS::_debug; # modify a class within a domain sub _mod_class { my MogileFS::Admin $self = shift; my ($domain, $class, $mindevcount, $verb) = @_; $verb ||= 'create'; my $res = $self->{backend}->do_request("${verb}_class", { domain => $domain, class => $class, mindevcount => $mindevcount, }); return undef unless $res->{class} eq $class; return 1; } ###################################################################### # MogileFS::Backend class # package MogileFS::Backend; use strict; no strict 'refs'; use Carp; use IO::Socket::INET; use Socket qw( MSG_NOSIGNAL PF_INET IPPROTO_TCP SOCK_STREAM ); use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN ); use POSIX (); use fields ('hosts', # arrayref of "$host:$port" of mogilefsd servers 'host_dead', # "$host:$port" -> $time (of last connect failure) 'lasterr', # string: \w+ identifer of last error 'lasterrstr', # string: english of last error 'sock_cache', # cached socket to mogilefsd tracker 'pref_ip', # hashref; { ip => preferred ip } 'timeout', # time in seconds to allow sockets to become readable ); use vars qw($FLAG_NOSIGNAL $PROTO_TCP); eval { $FLAG_NOSIGNAL = MSG_NOSIGNAL; }; sub new { my MogileFS::Backend $self = shift; $self = fields::new($self) unless ref $self; return $self->_init(@_); } sub reload { my MogileFS::Backend $self = shift; return undef unless $self; return $self->_init(@_); } sub set_pref_ip { my MogileFS::Backend $self = shift; $self->{pref_ip} = shift; $self->{pref_ip} = undef unless $self->{pref_ip} && ref $self->{pref_ip} eq 'HASH'; } sub _wait_for_readability { my ($fileno, $timeout) = @_; return 0 unless $fileno && $timeout; my $rin; vec($rin, $fileno, 1) = 1; my $nfound = select($rin, undef, undef, $timeout); # undef/0 are failure, 1 is success return $nfound ? 1 : 0; } sub do_request { my MogileFS::Backend $self = shift; my ($cmd, $args) = @_; _fail("invalid arguments to do_request") unless $cmd && $args; local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL; my $sock = $self->{sock_cache}; my $argstr = _encode_url_string(%$args); my $req = "$cmd $argstr\r\n"; my $reqlen = length($req); my $rv = 0; if ($sock) { # try our cached one, but assume it might be bogus _debug("SOCK: cached = $sock, REQ: $req"); $rv = send($sock, $req, $FLAG_NOSIGNAL); if ($! || ! defined $rv) { # undef is error, but $! may not be populated, we've found undef $self->{sock_cache}; } elsif ($rv != $reqlen) { return _fail("send() didn't return expected length ($rv, not $reqlen)"); } } unless ($rv) { $sock = $self->_get_sock or return _fail("couldn't connect to mogilefsd backend"); _debug("SOCK: $sock, REQ: $req"); $rv = send($sock, $req, $FLAG_NOSIGNAL); if ($!) { return _fail("error talking to mogilefsd tracker: $!"); } elsif ($rv != $reqlen) { return _fail("send() didn't return expected length ($rv, not $reqlen)"); } $self->{sock_cache} = $sock; } # wait up to 3 seconds for the socket to come to life unless (_wait_for_readability(fileno($sock), $self->{timeout})) { close($sock); return _fail("socket never became readable"); } my $line = <$sock>; _debug("RESPONSE: $line"); return _fail("socket closed on read") unless defined $line; # ERR if ($line =~ /^ERR\s+(\w+)\s*(\S*)/) { $self->{'lasterr'} = $1; $self->{'lasterrstr'} = $2 ? _unescape_url_string($2) : undef; _debug("LASTERR: $1 $2"); return undef; } # OK if ($line =~ /^OK\s+\d*\s*(\S*)/) { my $args = _decode_url_string($1); _debug("RETURN_VARS: ", $args); return $args; } _fail("invalid response from server: [$line]"); return undef; } sub errstr { my MogileFS::Backend $self = shift; return join(" ", $self->{'lasterr'}, $self->{'lasterrstr'}); } ################################################################################ # MogileFS::Backend class methods # sub _fail { croak "MogileFS::Backend: $_[0]"; } *_debug = *MogileFS::_debug; sub _connect_sock { # sock, sin, timeout my ($sock, $sin, $timeout) = @_; $timeout ||= 0.25; # make the socket non-blocking for the connection if wanted, but # unconditionally set it back to blocking mode at the end if ($timeout) { IO::Handle::blocking($sock, 0); } else { IO::Handle::blocking($sock, 1); } my $ret = connect($sock, $sin); if (!$ret && $timeout && $!==EINPROGRESS) { my $win=''; vec($win, fileno($sock), 1) = 1; if (select(undef, $win, undef, $timeout) > 0) { $ret = connect($sock, $sin); # EISCONN means connected & won't re-connect, so success $ret = 1 if !$ret && $!==EISCONN; } } # turn blocking back on, as we expect to do blocking IO on our sockets IO::Handle::blocking($sock, 1) if $timeout; return $ret; } sub _sock_to_host { # (host) my MogileFS::Backend $self = shift; my $host = shift; # create a socket and try to do a non-blocking connect my ($ip, $port) = $host =~ /^(.*):(\d+)$/; my $sock = "Sock_$host"; my $connected = 0; my $proto = $PROTO_TCP ||= getprotobyname('tcp'); my $sin; # try preferred ips if ($self->{pref_ip} && (my $prefip = $self->{pref_ip}->{$ip})) { _debug("using preferred ip $prefip over $ip"); socket($sock, PF_INET, SOCK_STREAM, $proto); $sin = Socket::sockaddr_in($port, Socket::inet_aton($prefip)); if (_connect_sock($sock, $sin, 0.1)) { $connected = 1; } else { _debug("failed connect to preferred ip $prefip"); close $sock; } } # now try the original ip unless ($connected) { socket($sock, PF_INET, SOCK_STREAM, $proto); $sin = Socket::sockaddr_in($port, Socket::inet_aton($ip)); return undef unless _connect_sock($sock, $sin); } # just throw back the socket we have so far return $sock; } sub _init { my MogileFS::Backend $self = shift; my %args = @_; # FIXME: add actual validation { $self->{hosts} = $args{hosts} or _fail("constructor requires parameter 'hosts'"); _fail("'hosts' argument must be an arrayref") unless ref $self->{hosts} eq 'ARRAY'; _fail("'hosts' argument must be of form: 'host:port'") if grep(! /:\d+$/, @{$self->{hosts}}); _fail("'timeout' argument must be a number") if $args{timeout} && $args{timeout} !~ /^\d+$/; $self->{timeout} = $args{timeout} || 3; } $self->{host_dead} = {}; return $self; } # return a new mogilefsd socket, trying different hosts until one is found, # or undef if they're all dead sub _get_sock { my MogileFS::Backend $self = shift; return undef unless $self; my $size = scalar(@{$self->{hosts}}); my $tries = $size > 15 ? 15 : $size; my $idx = int(rand() * $size); my $now = time(); my $sock; foreach (1..$tries) { my $host = $self->{hosts}->[$idx++ % $size]; # try dead hosts every 5 seconds next if $self->{host_dead}->{$host} && $self->{host_dead}->{$host} > $now - 5; last if $sock = $self->_sock_to_host($host); # mark sock as dead _debug("marking host dead: $host @ $now"); $self->{host_dead}->{$host} = $now; } return $sock; } sub _escape_url_string { my $str = shift; $str =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/uc sprintf("%%%02x",ord($1))/eg; $str =~ tr/ /+/; return $str; } sub _unescape_url_string { my $str = shift; $str =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg; $str =~ tr/+/ /; return $str; } sub _encode_url_string { my %args = @_; return undef unless %args; return join("&", map { _escape_url_string($_) . '=' . _escape_url_string($args{$_}) } grep { defined $args{$_} } keys %args ); } sub _decode_url_string { my $arg = shift; my $buffer = ref $arg ? $arg : \$arg; my $hashref = {}; # output hash my $pair; my @pairs = split(/&/, $$buffer); my ($name, $value); foreach $pair (@pairs) { ($name, $value) = split(/=/, $pair); $value =~ tr/+/ /; $value =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg; $name =~ tr/+/ /; $name =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg; $hashref->{$name} .= $hashref->{$name} ? "\0$value" : $value; } return $hashref; } ###################################################################### # MogileFS::NewFile object # package MogileFS::NewFile; use strict; use IO::File; use base 'IO::File'; use Carp; sub new { my MogileFS::NewFile $self = shift; my %args = @_; my $mg = $args{mg}; # FIXME: validate %args my $file = "$mg->{root}/$args{path}"; # Note: we open the file for read/write (+) with clobber (>) because # although we mostly want to just clobber it and start afresh, some # modules later on (in our case, Image::Size) may want to seek around # in the file and do some reads. my $fh = new IO::File "+>$file" or _fail("couldn't open: $file: $!"); my $attr = _get_attrs($fh); # prefix our keys with "mogilefs_newfile_" as per IO::Handle recommendations # and copy safe %args values into $attr $attr->{"mogilefs_newfile_$_"} = $args{$_} foreach qw(mg fid devid key path); return bless $fh; } # some getter/setter functions sub path { return _getset(shift, "path"); } sub key { return _getset(shift, "key", @_); } sub class { return _getset(shift, "class", @_); } sub close { my MogileFS::NewFile $self = shift; # actually close the file $self->SUPER::close; # get a reference to the hash part of the $fh typeglob ref my $attr = $self->_get_attrs; my MogileFS $mg = $attr->{mogilefs_newfile_mg}; my $domain = $mg->{domain}; my $fid = $attr->{mogilefs_newfile_fid}; my $devid = $attr->{mogilefs_newfile_devid}; my $path = $attr->{mogilefs_newfile_path}; my $key = shift || $attr->{mogilefs_newfile_key}; $mg->{backend}->do_request ("create_close", { fid => $fid, devid => $devid, domain => $domain, key => $key, path => $path, }) or return undef; return 1; } ################################################################################ # MogileFS::NewFile class methods # sub _fail { croak "MogileFS::NewFile: $_[0]"; } *_debug = *MogileFS::debug; # get a reference to the hash part of the $fh typeglob ref sub _get_attrs { return \%{*{$_[0]}}; } sub _getset { my MogileFS::NewFile $self = shift; my $item = shift; my $attrs = $self->_get_attrs; # we're a setter if (@_) { return $attrs->{"mogilefs_newfile_$item"} = shift; } # we're a getter return $attrs->{"mogilefs_newfile_$item"}; } ################################################################################ # MogileFS::HTTPFile object # NOTE: This is meant to be used within IO::WrapTie... # package MogileFS::NewHTTPFile; use strict; no strict 'refs'; use Carp; use POSIX qw( EAGAIN ); use Socket qw( PF_INET SOCK_STREAM ); use Errno qw( EINPROGRESS EISCONN ); use vars qw($PROTO_TCP); use fields ('host', 'sock', # IO::Socket; created only when we need it 'uri', 'data', # buffered data we have 'pos', # simulated file position 'length', # length of data field 'content_length', # declared length of data we will be receiving (not required) 'mg', 'fid', 'devid', 'class', 'key', 'path', # full URL to save data to 'backup_dests', 'bytes_out', # count of how many bytes we've written to the socket 'data_in', # storage for data we've read from the socket ); sub path { _getset(shift, 'path'); } sub class { _getset(shift, 'class', @_); } sub key { _getset(shift, 'key', @_); } sub _parse_url { my MogileFS::NewHTTPFile $self = shift; my $url = shift; return 0 unless $url =~ m!http://(.+?)(/.+)$!; $self->{host} = $1; $self->{uri} = $2; $self->{path} = $url; return 1; } sub TIEHANDLE { my MogileFS::NewHTTPFile $self = shift; $self = fields::new($self) unless ref $self; my %args = @_; return undef unless $self->_parse_url($args{path}); $self->{data} = ''; $self->{length} = 0; $self->{backup_dests} = $args{backup_dests} || []; $self->{content_length} = $args{content_length} + 0; $self->{pos} = 0; $self->{$_} = $args{$_} foreach qw(mg fid devid class key); $self->{bytes_out} = 0; $self->{data_in} = ''; return $self; } *new = *TIEHANDLE; sub _sock_to_host { # (host) my MogileFS::NewHTTPFile $self = shift; my $host = shift; # setup my ($ip, $port) = $host =~ /^(.*):(\d+)$/; my $sock = "Sock_$host"; my $proto = $PROTO_TCP ||= getprotobyname('tcp'); my $sin; # create the socket socket($sock, PF_INET, SOCK_STREAM, $proto); $sin = Socket::sockaddr_in($port, Socket::inet_aton($ip)); # unblock the socket IO::Handle::blocking($sock, 0); # attempt a connection my $ret = connect($sock, $sin); if (!$ret && $! == EINPROGRESS) { my $win = ''; vec($win, fileno($sock), 1) = 1; # watch for writeability if (select(undef, $win, undef, 3) > 0) { $ret = connect($sock, $sin); # EISCONN means connected & won't re-connect, so success $ret = 1 if !$ret && $! == EISCONN; } } # just throw back the socket we have return $sock if $ret; return undef; } sub _connect_sock { my MogileFS::NewHTTPFile $self = shift; return 1 if $self->{sock}; my @down_hosts; while (!$self->{sock} && $self->{host}) { # attempt to connect return 1 if $self->{sock} = $self->_sock_to_host($self->{host}); push @down_hosts, $self->{host}; if (my $dest = shift @{$self->{backup_dests}}) { # dest is [$devid,$path] $self->_parse_url($dest->[1]) or _fail("bogus URL"); $self->{devid} = $dest->[0]; } else { $self->{host} = undef; } } _fail("unable to open socket to storage node (tried: @down_hosts): $!"); } # abstracted read; implements what ends up being a blocking read but # does it in terms of non-blocking operations. sub _getline { my MogileFS::NewHTTPFile $self = shift; return undef unless $self->{sock}; # short cut if we already have data read if ($self->{data_in} =~ s/^(.*?\r?\n)//) { return $1; } my $rin = ''; vec($rin, fileno($self->{sock}), 1) = 1; # nope, we have to read a line my $nfound; while ($nfound = select($rin, undef, undef, 3)) { my $data; my $bytesin = sysread($self->{sock}, $data, 1024); if (defined $bytesin) { $self->{data_in} .= $data; } else { next if $! == EAGAIN; _fail("error reading from node for device $self->{devid}: $!"); } # return a line if we got one if ($self->{data_in} =~ s/^(.*?\r?\n)//) { return $1; } } # if we got here, nothing was readable in our time limit return undef; } # abstracted write function that uses non-blocking I/O and checking for # writability to ensure that we don't get stuck doing a write if the # node we're talking to goes down. also handles logic to fall back to # a backup node if we're on our first write and the first node is down. # this entire function is a blocking function, it just uses intelligent # non-blocking write functionality. # # this function returns success (1) or it croaks on failure. sub _write { my MogileFS::NewHTTPFile $self = shift; return undef unless $self->{sock}; my $win = ''; vec($win, fileno($self->{sock}), 1) = 1; # setup data and counters my $data = shift(); my $bytesleft = length($data); my $bytessent = 0; # main sending loop for data, will keep looping until all of the data # we've been asked to send is sent my $nfound; while ($bytesleft && ($nfound = select(undef, $win, undef, 3))) { my $bytesout = syswrite($self->{sock}, $data, $bytesleft, $bytessent); if (defined $bytesout) { # update our myriad counters $bytessent += $bytesout; $self->{bytes_out} += $bytesout; $bytesleft -= $bytesout; } else { # if we get EAGAIN, restart the select loop, else fail next if $! == EAGAIN; _fail("error writing to node for device $self->{devid}: $!"); } } return 1 unless $bytesleft; # at this point, we had a socket error, since we have bytes left, and # the loop above didn't finish sending them. if this was our first # write, let's try to fall back to a different host. unless ($self->{bytes_out}) { if (my $dest = shift @{$self->{backup_dests}}) { # dest is [$devid,$path] $self->_parse_url($dest->[1]) or _fail("bogus URL"); $self->{devid} = $dest->[0]; $self->_connect_sock; # now repass this write to try again return $self->_write($data); } } # total failure (croak) $self->{sock} = undef; _fail("unable to write to any allocated storage node"); } sub PRINT { my MogileFS::NewHTTPFile $self = shift; # get data to send to server my $data = shift; my $newlen = length $data; $self->{pos} += $newlen; # now make socket if we don't have one if (!$self->{sock} && $self->{content_length}) { $self->_connect_sock; $self->_write("PUT $self->{uri} HTTP/1.0\r\nContent-length: $self->{content_length}\r\n\r\n"); } # write some data to our socket if ($self->{sock}) { # save the first 1024 bytes of data so that we can seek back to it # and do some work later if ($self->{length} < 1024) { if ($self->{length} + $newlen > 1024) { $self->{length} = 1024; $self->{data} .= substr($data, 0, 1024 - $self->{length}); } else { $self->{length} += $newlen; $self->{data} .= $data; } } # actually write $self->_write($data); } else { # or not, just stick it on our queued data $self->{data} .= $data; $self->{length} += $newlen; } } *print = *PRINT; sub CLOSE { my MogileFS::NewHTTPFile $self = shift; # if we're closed and we have no sock... unless ($self->{sock}) { $self->_connect_sock; $self->_write("PUT $self->{uri} HTTP/1.0\r\nContent-length: $self->{length}\r\n\r\n"); $self->_write($self->{data}); } # set a message in $! and $@ my $err = sub { $@ = "$_[0]\n"; return undef; }; # get response from put if ($self->{sock}) { my $line = $self->_getline; return $err->("Unable to read response line from server") unless defined $line; if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) { # all 2xx responses are success unless ($1 >= 200 && $1 <= 299) { # read through to the body my ($found_header, $body); while (defined (my $l = $self->_getline)) { # remove trailing stuff $l =~ s/[\r\n\s]+$//g; $found_header = 1 unless $l; next unless $found_header; # add line to the body, with a space for readability $body .= " $l"; } $body = substr($body, 0, 512) if length $body > 512; return $err->("HTTP response $1 from upload: $body"); } } else { return $err->("Response line not understood: $line"); } $self->{sock}->close; } my MogileFS $mg = $self->{mg}; my $domain = $mg->{domain}; my $fid = $self->{fid}; my $devid = $self->{devid}; my $path = $self->{path}; my $key = shift || $self->{key}; my $rv = $mg->{backend}->do_request ("create_close", { fid => $fid, devid => $devid, domain => $domain, size => $self->{content_length} ? $self->{content_length} : $self->{length}, key => $key, path => $path, }); unless ($rv) { # set $@, as our callers expect $@ to contain the error message that # failed during a close. since we failed in the backend, we have to # do this manually. return $err->("$mg->{backend}->{lasterr}: $mg->{backend}->{lasterrstr}"); } return 1; } *close = *CLOSE; sub TELL { # return our current pos return $_[0]->{pos}; } *tell = *TELL; sub SEEK { # simply set pos... _fail("seek past end of file") if $_[1] > $_[0]->{length}; $_[0]->{pos} = $_[1]; } *seek = *SEEK; sub EOF { return ($_[0]->{pos} >= $_[0]->{length}) ? 1 : 0; } *eof = *EOF; sub BINMODE { # no-op, we're always in binary mode } *binmode = *BINMODE; sub READ { my MogileFS::NewHTTPFile $self = shift; my $count = $_[1] + 0; my $max = $self->{length} - $self->{pos}; $max = $count if $count < $max; $_[0] = substr($self->{data}, $self->{pos}, $max); $self->{pos} += $max; return $max; } *read = *READ; ################################################################################ # MogileFS::NewHTTPFile class methods # sub _fail { croak "MogileFS::NewHTTPFile: $_[0]"; } *_debug = *MogileFS::debug; sub _getset { my MogileFS::NewHTTPFile $self = shift; my $what = shift; if (@_) { # note: we're a TIEHANDLE interface, so we're not QUITE like a # normal class... our parameters tend to come in via an arrayref my $val = shift; $val = shift(@$val) if ref $val eq 'ARRAY'; return $self->{$what} = $val; } else { return $self->{$what}; } } 1;