From 8f9f356c1b219d873d2324de768acee58a4fe907 Mon Sep 17 00:00:00 2001 From: Yuriy Nevinitsin <nevinitsin@corp.mail.ru> Date: Sun, 11 Mar 2012 19:11:10 +0400 Subject: [PATCH] [connector][perl] misc fixes --- connector/perl/lib/MR/IProto.pm | 16 +- .../perl/lib/MR/IProto/Cluster/Server.pm | 9 +- .../perl/lib/MR/IProto/Connection/Sync.pm | 2 +- connector/perl/lib/MR/Tarantool/Box.pm | 37 ++- connector/perl/t/box.pl | 267 +++++++++++++++++- 5 files changed, 303 insertions(+), 28 deletions(-) diff --git a/connector/perl/lib/MR/IProto.pm b/connector/perl/lib/MR/IProto.pm index 671ca29553..6a6d4c6818 100644 --- a/connector/perl/lib/MR/IProto.pm +++ b/connector/perl/lib/MR/IProto.pm @@ -238,6 +238,8 @@ sub send { return; }, \%servers); + return if $message->{continue} && !$fh; + my $cont = sub { $self->_recv_now(\%servers, max => $message->{continue}?1:0); $! = $errno; @@ -378,6 +380,7 @@ around BUILDARGS => sub { $srvargs{tcp_nodelay} = delete $args{tcp_nodelay} if exists $args{tcp_nodelay}; $srvargs{tcp_keepalive} = delete $args{tcp_keepalive} if exists $args{tcp_keepalive}; $srvargs{dump_no_ints} = delete $args{dump_no_ints} if exists $args{dump_no_ints}; + $srvargs{prefix} = $args{prefix}; my %clusterargs; $clusterargs{balance} = delete $args{balance} if exists $args{balance}; $clusterargs{servers} = [ @@ -544,10 +547,11 @@ sub _send_now { sub _send_try { my ($self, $sync, $args, $handler, $try) = @_; my $xsync = $sync ? 'sync' : 'async'; - $self->_debug(sprintf "send msg=%d try %d of %d total", $args->{msg}, $try, $self->max_request_retries ) if $self->debug >= 2; + $args->{max_request_retries} ||= $self->max_request_retries; + $self->_debug(sprintf "send msg=%d try %d of %d total", $args->{msg}, $try, $args->{max_request_retries} ) if $self->debug >= 2; my $server = $self->cluster->server( $args->{key} ); my $connection = $server->$xsync(); - $connection->send($args->{msg}, $args->{body}, $handler, $args->{no_reply}, $args->{sync}); + return unless $connection->send($args->{msg}, $args->{body}, $handler, $args->{no_reply}, $args->{sync}); $sync->{$connection} ||= $connection if $sync; return $connection->fh; } @@ -584,8 +588,8 @@ sub _server_callback { my $retry = defined $args->{request} ? $args->{request}->retry() : ref $args->{retry} eq 'CODE' ? $args->{retry}->() : $args->{retry}; - $self->_debug("send: failed[@{[$retry, $$try+1, $self->max_request_retries]}]") if $self->debug >= 2; - if( $retry && $$try++ < $self->max_request_retries ) { + $self->_debug("send: failed[@{[$retry, $$try+1, $args->{max_request_retries}]}]") if $self->debug >= 2; + if( $retry && $$try++ < $args->{max_request_retries} ) { $self->_send_retry($sync, $args, $$handler, $$try); } else { @@ -606,10 +610,10 @@ sub _server_callback { 1; }; if($ok) { - if( defined $args->{request} && $data->retry && $$try++ < $self->max_request_retries ) { + if( defined $args->{request} && $data->retry && $$try++ < $args->{max_request_retries} ) { $self->_send_retry($sync, $args, $$handler, $$try); } - elsif( defined $args->{is_retry} && $args->{is_retry}->($data) && $$try++ < $self->max_request_retries ) { + elsif( defined $args->{is_retry} && $args->{is_retry}->($data) && $$try++ < $args->{max_request_retries} ) { $self->_send_retry($sync, $args, $$handler, $$try); } else { diff --git a/connector/perl/lib/MR/IProto/Cluster/Server.pm b/connector/perl/lib/MR/IProto/Cluster/Server.pm index e22de5e00e..ec059ad34c 100644 --- a/connector/perl/lib/MR/IProto/Cluster/Server.pm +++ b/connector/perl/lib/MR/IProto/Cluster/Server.pm @@ -29,6 +29,12 @@ coerce 'MR::IProto::Cluster::Server' ); }; +has prefix => ( + is => 'ro', + isa => 'Str', + default => 'MR::IProto', +); + =head1 ATTRIBUTES =over @@ -184,10 +190,11 @@ sub _build_sync { sub _build_debug_cb { my ($self) = @_; + my $prefix = $self->prefix; return sub { my ($msg) = @_; chomp $msg; - warn "MR::IProto: $msg\n"; + warn "$prefix: $msg\n"; return; }; } diff --git a/connector/perl/lib/MR/IProto/Connection/Sync.pm b/connector/perl/lib/MR/IProto/Connection/Sync.pm index 9e89ba7cc1..0331937094 100644 --- a/connector/perl/lib/MR/IProto/Connection/Sync.pm +++ b/connector/perl/lib/MR/IProto/Connection/Sync.pm @@ -98,7 +98,7 @@ sub send { else { $self->_handle_error($sync, $callback, $@); } - return; + return $ok; } sub recv_all { diff --git a/connector/perl/lib/MR/Tarantool/Box.pm b/connector/perl/lib/MR/Tarantool/Box.pm index b8425af61d..7bb67ac23d 100644 --- a/connector/perl/lib/MR/Tarantool/Box.pm +++ b/connector/perl/lib/MR/Tarantool/Box.pm @@ -79,7 +79,7 @@ use constant { sub IPROTOCLASS () { 'MR::IProto' } use vars qw/$VERSION %ERRORS/; -$VERSION = 0.0.13; +$VERSION = 0.0.14; BEGIN { *confess = \&MR::IProto::confess } @@ -368,6 +368,8 @@ sub _connect { name => $self->{name}, debug => $self->{'ipdebug'}, dump_no_ints => 1, + max_request_retries => 1, + retry_delay => $self->{retry_delay}, }); } @@ -418,6 +420,8 @@ sub _chat { my $return_fh = delete $param{return_fh}; my $_cb = $callback || $return_fh; + die "Can't use raise and callback together" if $callback && $self->{raise}; + my $is_retry = sub { my ($data) = @_; $retry_count++; @@ -442,7 +446,7 @@ sub _chat { my ($ret_code, $data, $full_code) = @$data; $self->{_last_error} = $full_code; - $self->{_last_error_msg} = $message = $ret_code->[0] == 0 ? "ok" : sprintf "Error %08X: %s", $full_code, $$data || $ERRORS{$full_code & 0xFFFFFF00} || 'Unknown error'; + $self->{_last_error_msg} = $message = $ret_code->[0] == 0 ? "" : sprintf "Error %08X: %s", $full_code, $$data || $ERRORS{$full_code & 0xFFFFFF00} || 'Unknown error'; $self->_debug("$self->{name}: $message") if $ret_code->[0] != 0 && $self->{debug} >= 1; if ($ret_code->[0] == 0) { @@ -470,7 +474,7 @@ sub _chat { if ($callback) { $self->{_last_error} = 0x77777777; $self->{server}->SetTimeout($timeout); - return 1 if eval { $self->{server}->send({%param, is_retry => $is_retry}, $process); 1 }; + return 1 if eval { $self->{server}->send({%param, is_retry => $is_retry, max_request_retries => $retry}, $process); 1 }; return 0; } @@ -939,7 +943,7 @@ sub Select { my $cb = sub { my ($r) = (@_); - $self->_PostSelect($r, $param, $namespace); + $self->_PostSelect($r, $param, $namespace) if $r; if ($r && defined(my $p = $param->{hash_by})) { my %h; @@ -961,10 +965,12 @@ sub Select { return $param->{callback}->($r); } + return unless $r; + return $r if $param->{hash_by}; return $r if $param->{want} eq 'arrayref'; $wantarray = wantarray if $param->{return_fh}; - + if ($wantarray) { return @{$r}; } else { @@ -978,7 +984,7 @@ sub Select { msg => $msg, payload => $payload, unpack => sub { $self->_unpack_select($namespace, "SELECT", @_) }, - retry => $self->{select_retry}, + retry => $param->{return_fh} ? 1 : $self->{select_retry}, timeout => $param->{timeout} || $self->{select_timeout}, callback => $param->{callback} ? $cb : 0, return_fh=> $param->{return_fh} ? $cb : 0, @@ -1301,7 +1307,7 @@ sub UpdateMulti { unpack => sub { $self->_unpack_affected($flags, $namespace, @_) }, callback => $param->{callback} && $cb, ) or return; - + return 1 if $param->{callback}; return $cb->($r); } @@ -1360,9 +1366,10 @@ C<< Insert, UpdateMulti, Select, Delete, Call >> methods can be given the follow =item B<callback> => sub { my ($data, $error) = @_; } -Async request using AnyEvent. -C<< $data >> is unpacked and processed according to request options data. +Do an async request using AnyEvent. +C<< $data >> contains unpacked and processed according to request options data. C<< $error >> contains a message string in case of error. +Set up C<< raise => 0 >> to use this option. =back @@ -1372,23 +1379,23 @@ C<< Select >> methods can be given the following options: =over -=item <return_fh> => 1 +=item B<return_fh> => 1 The request does only send operation on network, and returns -C<< { fh => $IO::Handle, continue => $code } >>. C<< $code >> reads data from network, -unpacks, processes according to options and returns it. +C<< { fh => $IO_Handle, continue => $code } >> or false if send operation failed. +C<< $code >> reads data from network, unpacks, processes according to options and returns it. -You should handle timeouts manually (using select() call for example). +You should handle timeouts and retries manually (using select() call for example). Usage example: my $continuation = $box->Select(13,{ return_fh => 1 }); ok $continuation, "select/continuation"; - + my $rin = ''; vec($rin,$continuation->{fh}->fileno,1) = 1; my $ein = $rin; ok 0 <= select($rin,undef,$ein,2), "select/continuation/select"; - + my $res = $continuation->{continue}->(); use Data::Dumper; is_deeply $res, [13, 'some_email@test.mail.ru', 1, 2, 3, 4, '123456789'], "select/continuation/result"; diff --git a/connector/perl/t/box.pl b/connector/perl/t/box.pl index a1de32dd90..4025819169 100644 --- a/connector/perl/t/box.pl +++ b/connector/perl/t/box.pl @@ -13,7 +13,7 @@ use FindBin qw($Bin); use lib "$Bin"; use Carp qw/confess/; -use Test::More tests => 296; +use Test::More tests => 337; use Test::Exception; use List::MoreUtils qw/zip/; @@ -28,13 +28,16 @@ use constant TUPLE_NOT_EXISTS => qr/Error 00003102/; use constant TUPLE_EXISTS => qr/Error 00003702/; use constant INDEX_VIOLATION => qr/Error 00003802/; +use constant NO_SUCCESS => qr/no success after/; + use constant TOO_BIG_FIELD => qr/too big field/; -my $box; -my $server = (shift || $ENV{BOX}) or die; -my %opts = ( +our $box; +our $server = (shift || $ENV{BOX}) or die; +our %opts = ( debug => $ENV{DEBUG}||0, ipdebug => $ENV{IPDEBUG}||0, + raise => 1, ); sub cleanup ($) { @@ -174,8 +177,12 @@ do { is_deeply $res, [13, 'some_email@test.mail.ru', 1, 2, 3, 4, '123456789'], "select/continuation/result"; }; +our $ANYEVENT = 1 && eval { require AnyEvent; 1 }; SKIP:{ - skip "AnyEvent not found", 60 unless eval { require AnyEvent; 1 }; + skip "AnyEvent not found", 60 unless $ANYEVENT; + + local $opts{raise} = 0; + $box = $CLASS->new(def_param('l&SSLL&')); my $tt = [ [1, 'rtokarev@corp.mail.ru', 11, 111, 1111, 11111, "1111111111111"], [2, 'vostrikov@corp.mail.ru', 22, 222, 2222, 22222, "2222222222222"], @@ -260,9 +267,259 @@ SKIP:{ $cv->recv; } +sub countwarn { + my ($qr, $counter) = @_; + return sub { + ++$$counter if $_[0] =~ $qr; + warn @_; + }; +}; + +do { + local $server = "127.0.0.1:1111"; + local $opts{raise} = 0; + my $try = 3; + + my $counter = 0; + local $SIG{__WARN__} = countwarn(qr/refused/i, \$counter); + + my $box = $CLASS->new(def_param('l&SSLL&')); + + throws_ok sub{my$x=$box->Select(13,{ want => "arrayref", raise => 1 })}, NO_SUCCESS, "reject/select/raise/sync"; + ok $counter == $try, "reject/select/raise/sync/counter"; + $counter = 0; + + ok !$box->Select(13,{ want => "arrayref", raise => 0 }), "reject/select/noraise/sync"; + ok $counter == $try, "reject/select/noraise/sync/counter"; + $counter = 0; + + my $continuation = $box->Select(13,{ return_fh => 1, raise => 0 }); + ok !$continuation, "reject/select/continuation"; + ok $counter == 1, "reject/select/continuation/counter"; + $counter = 0; + + + SKIP:{ + skip "AnyEvent not found", 5 unless $ANYEVENT; + + AnyEvent->now_update; + my $cv = AnyEvent->condvar; + $cv->begin; + ok $box->Select(4,5,{ callback => sub { + my ($res) = @_; + $cv->end; + ok !$res, "reject/select/async/noraise/cb"; + ok $box->Error, "reject/select/async/noraise/cb/error"; + ok $box->ErrorStr, "reject/select/async/noraise/cb/errorstr"; + }}), "reject/select/async/noraise"; + + $cv->recv; + ok $counter == $try, "reject/select/async/noraise/counter"; + $counter = 0; + } +}; + +do { + my $pid; + local $SIG{INT} = $SIG{TERM} = sub { kill 'TERM', $pid }; + + $pid = fork(); + die unless defined $pid; + unless($pid) { + $0 = "$0 <SERVER>"; + my $stop = 0; + my $h; + my $l = IO::Socket::INET->new( + LocalAddr => '127.0.0.1', + LocalPort => 1111, + Proto => 'tcp', + Listen => 10, + Blocking => 1, + ReuseAddr => 1, + ) or die $!; + $SIG{INT} = $SIG{TERM} = sub { ++$stop; close $l; close $h; exit; }; + while(!$stop) { + $h = $l->accept; + my $data; + while($h->read($data,1024) > 0) { 0; } + close $h; + } + exit; + } + + + local $server = "127.0.0.1:1111"; + local $opts{raise} = 0; + local $opts{timeout} = 0.1; + local $opts{select_timeout} = 0.1; + + my $try = 3; + + my $counter = 0; + local $SIG{__WARN__} = countwarn(qr/timed? ?out/i, \$counter); + + my $box = $CLASS->new(def_param('l&SSLL&')); + + sleep 1; + + throws_ok sub{my$x=$box->Select(13,{ want => "arrayref", raise => 1 })}, NO_SUCCESS, "timeout1/select/raise/sync"; + ok $counter == $try, "timeout1/select/raise/sync/counter"; + $counter = 0; + + ok !$box->Select(13,{ want => "arrayref", raise => 0 }), "timeout1/select/noraise/sync"; + ok $counter == $try, "/counter"; + $counter = 0; + my $continuation = $box->Select(13,{ return_fh => 1, raise => 0 }); + ok $continuation, "timeout1/select/continuation"; + ok !$continuation->{continue}->(), "timeout1/select/continuation/result"; + ok $counter == 1, "timeout1/select/continuation/counter"; + $counter = 0; + SKIP:{ + skip "AnyEvent not found", 5 unless $ANYEVENT; + + AnyEvent->now_update; + my $cv = AnyEvent->condvar; + $cv->begin; + ok $box->Select(4,5,{ callback => sub { + my ($res) = @_; + $cv->end; + ok !$res, "timeout1/select/async/noraise/cb"; + ok $box->Error, "timeout1/select/async/noraise/cb/error"; + ok $box->ErrorStr, "timeout1/select/async/noraise/cb/errorstr"; + }}), "timeout1/select/async/noraise"; + + $cv->recv; + ok $counter == $try, "timeout1/select/async/noraise/counter"; + $counter = 0; + } + + kill 'TERM', $pid; +}; + +do { + my $pid; + local $SIG{INT} = $SIG{TERM} = sub { kill 'TERM', $pid }; + + $pid = fork(); + die unless defined $pid; + unless($pid) { + $0 = "$0 <SERVER>"; + my $stop = 0; + my $h; + my @ok = (0,0,1,0,0,1,1,0,0,1); + my $l = IO::Socket::INET->new( + LocalAddr => '127.0.0.1', + LocalPort => 1111, + Proto => 'tcp', + Listen => 10, + Blocking => 1, + ReuseAddr => 1, + ) or die $!; + my ($host, $port) = split /:/, $server; + my $box = IO::Socket::INET->new( + PeerAddr => $host, + PeerPort => $port, + Proto => 'tcp', + Blocking => 1, + ) or die; + $SIG{INT} = $SIG{TERM} = sub { ++$stop; close $l; close $h; close $box; exit; }; + + while(!$stop) { + $h = $l->accept; + $h->blocking(1); + my $data = ''; + if (shift @ok) { + while(!$stop) { + $h->blocking(0); + $h->read($data,1024,length$data); + if(length$data) { + $h->blocking(1); + $h->read($data,12-length$data,length$data) while length $data < 12; + my ($len) = unpack 'x4L', $data; + $h->read($data,12+$len-length$data,length$data) while length $data < 12+$len; + $box->write($data); + + $data = ''; + $box->read($data,12-length$data, length$data) while length $data < 12; + ($len) = unpack 'x4L', $data; + $box->read($data,12+$len-length$data,length$data) while length $data < 12+$len; + $h->write($data); + close $h; + last; + } + sleep 0.1; + } + } else { + while($h->read($data,1024) > 0) { 0; } + } + close $h; + } + close $l; + close $box; + exit; + } + + + local $server = "127.0.0.1:1111"; + local $opts{raise} = 0; + local $opts{timeout} = 0.1; + local $opts{select_timeout} = 0.1; + + my $try = 2; + + my $counter = 0; + local $SIG{__WARN__} = countwarn(qr/timed? ?out/i, \$counter); + + my $box = $CLASS->new(def_param('l&SSLL&')); + + sleep 1; + + is_deeply $box->Select(13,{ want => "arrayref", raise => 1 }), [[13, 'some_email@test.mail.ru', 1, 2, 3, 4, '123456789']], "timeout2/select/raise/sync"; + ok !$box->Error, "timeout2/select/raise/sync/error"; + ok !$box->ErrorStr, "timeout2/select/raise/sync/errorstr"; + ok $counter == $try, "timeout2/select/raise/sync/counter"; + $counter = 0; + + is_deeply $box->Select(13,{ want => "arrayref", raise => 0 }), [[13, 'some_email@test.mail.ru', 1, 2, 3, 4, '123456789']], "timeout2/select/noraise/sync"; + ok !$box->Error, "timeout2/select/noraise/sync/error"; + ok !$box->ErrorStr, "timeout2/select/noraise/sync/errorstr"; + ok $counter == $try, "timeout2/select/noraise/sync/counter"; + $counter = 0; + + my $continuation = $box->Select(13,{ return_fh => 1, raise => 0, want => 'arrayref' }); + ok $continuation, "timeout2/select/continuation"; + is_deeply $continuation->{continue}->(), [[13, 'some_email@test.mail.ru', 1, 2, 3, 4, '123456789']], "timeout2/select/continuation/result"; + ok !$box->Error, "timeout2/select/continuation/error"; + ok !$box->ErrorStr, "timeout2/select/continuation/errorstr"; + ok $counter == 0, "timeout2/select/continuation/counter"; + $counter = 0; + + + SKIP:{ + skip "AnyEvent not found", 5 unless $ANYEVENT; + + AnyEvent->now_update; + my $cv = AnyEvent->condvar; + $cv->begin; + ok $box->Select(13,{ callback => sub { + my ($res) = @_; + $cv->end; + is_deeply $res, [[13, 'some_email@test.mail.ru', 1, 2, 3, 4, '123456789']], "timeout2/select/async/noraise/cb"; + ok !$box->Error, "timeout2/select/async/noraise/cb/error"; + ok !$box->ErrorStr, "timeout2/select/async/noraise/cb/errorstr"; + }}), "timeout2/select/async/noraise"; + + $cv->recv; + ok $counter == $try, "timeout2/select/async/noraise/counter"; + $counter = 0; + } + + kill 'TERM', $pid; +}; + $box = $CLASS->new(def_param); ok $box->isa($CLASS), 'connect'; -- GitLab