diff --git a/connector/perl/lib/MR/Pending.pm b/connector/perl/lib/MR/Pending.pm index eda0f764bd08503941c3642dbe5a331bbc9344b8..7300447963771e5c6242fedb95564948684bd9dd 100644 --- a/connector/perl/lib/MR/Pending.pm +++ b/connector/perl/lib/MR/Pending.pm @@ -56,20 +56,6 @@ has itertime => ( ); -=head2 secondary_itertime - -Module will do secondary requests (L<onsecondary_retry>) if the first request -have no data after te timeout. - -=cut - -has secondary_itertime => ( - is => 'rw', - isa => 'Num', - predicate => '_has_secondary_itertime', - default => 0.01, -); - =head2 name @@ -102,6 +88,11 @@ has _pending => ( default => sub { {} }, ); +has _ignoring => ( + is => 'ro', + isa => 'HashRef[MR::Pending::Item]', + default => sub { {} } +); =head2 exceptions @@ -127,6 +118,14 @@ has _waitresult => ( ); +has _started_time => ( + is => 'rw', + isa => 'Num', + builder => sub { time }, + lazy => 1, + clearer => '_clear__started_time' +); + =head1 METHODS @@ -196,18 +195,86 @@ sub send { my $pending = $self->_pending; foreach my $shard ( grep { $pending->{$_}->is_sleeping } keys %$pending ) { my $pend = $pending->{$shard}; - if ($pend->try < $pend->retry) { + + if ($pend->try < $pend->retry or !$pend->try) { next unless $pend->is_timeout; - my $cont = $self->runcatch($pend->onretry, ($pend->id, $pend, $self)); + + # don't repead request that have secondary retry + next if $pend->_has_second_retry_delay and $pend->try; + + my $cont = $self->runcatch($pend->onretry, + ($pend->id, $pend, $self)); $pend->set_pending_mode($cont); + } else { + delete $pending->{$shard}; - $self->runcatch($pend->onerror, ($pend->id, "no success after @{[$pend->try]} retries", $pend, $self)); + $self->runcatch($pend->onerror, + ( + $pend->id, + "no success after @{[$pend->try]} retries", + $pend, + $self + ) + ); } } return $self; } + +sub _check_if_second_restart { + my ($self, $pend) = @_; + + return unless $pend->is_pending; + return unless $pend->_has_second_retry_delay; + return unless $pend->is_secondarytimeout; + return if $pend->is_second; + + + my $id = $pend->id; + my $new_id = "$id:second_retry"; + return if exists $self->_pending->{$new_id}; + + my $orig_onerror = $pend->onerror; + my $orig_onok = $pend->onok; + + my $new_pend = $self->_pending->{ $new_id } = ref($pend)->new( + id => $new_id, + try => 0, + timeout => $pend->timeout, + retry_delay => $pend->retry_delay, + retry => $pend->retry, + is_second => 1, + + onretry => $pend->onretry, + _time => $pend->_time, + onok => sub { + +# warn "second pending is done ------- $_[0]"; + splice @_, 0, 1, $id; + + $pend->_set_onok(sub { 1 }), + $pend->_set_onerror(sub { 1 }), + $self->_ignoring->{ $id } = delete $self->_pending->{ $id }; + &$orig_onok + }, + onerror => $pend->onerror + ); + +# warn ">>>>>>>>> started new pending: " . $new_pend->id; + + $pend->_set_onok(sub { +# warn "first pending is done ($new_id) ------- $_[0]"; + $new_pend->_set_onok(sub { 1 }); + $new_pend->_set_onerror(sub { 1 }); + $self->_ignoring->{ $new_id } = delete $self->_pending->{ $new_id }; + &$orig_onok; + }); + + return 1; +} + sub wait { my ($self) = @_; my $pending = $self->_pending; @@ -229,6 +296,10 @@ sub wait { if ($n == 0) { $self->runcatch($self->onidle, ($self)) if $self->_has_onidle; + + for my $pend (grep { $_->is_pending } values %$pending) { + $self->_check_if_second_restart( $pend ); + } return 0; } @@ -241,12 +312,15 @@ sub recv { my ($rin, $ein) = @{$self->_waitresult}; for my $shard (grep { $pending->{$_}->is_pending } keys %$pending) { + next unless exists $pending->{$shard}; my $pend = $pending->{$shard}; my $fileno = $pend->fileno; if (vec($rin, $fileno, 1)) { if (my $list = $pend->continue) { if (ref $list) { - if(defined(my $okay = $self->runcatch($pend->onok, ($pend->id, $list, $pend, $self)))) { + if(defined(my $okay = + $self->runcatch($pend->onok, + ($pend->id, $list, $pend, $self)))) { if($okay) { delete $pending->{$shard}; } else { @@ -259,6 +333,8 @@ sub recv { } } elsif (vec($ein, $fileno, 1)) { $pend->close("connection reset (".$pend->last_error.")"); + } elsif ($pend->_has_second_retry_delay) { + $self->_check_if_second_restart( $pend ); } elsif ($pend->is_timeout) { $pend->close("timeout (".$pend->last_error.")"); } @@ -282,6 +358,7 @@ sub finish { sub iter { my ($self) = @_; + $self->send or return; return if $self->exceptions; @@ -307,9 +384,10 @@ sub work { my ($self) = @_; my $pending = $self->_pending; - my $time0 = time; - while(%$pending and time() - $time0 <= $self->maxtime) { + $self->_clear__started_time; + + while(%$pending and time() - $self->_started_time <= $self->maxtime) { last unless $self->iter; } $self->finish; @@ -328,6 +406,15 @@ sub check_exceptions { return $str; } +sub DEMOLISH { + my ($self) = @_; + for my $pend (values %{ $self->_ignoring }) { + next unless $pend->is_pending; +# warn "waiting for pending(id=@{[$pend->id]}) is done"; + $pend->continue; + } +} + no Mouse; __PACKAGE__->meta->make_immutable(); @@ -361,26 +448,28 @@ has id => ( ); -=head2 onok onerror onretry onsecondary_retry +has is_second => ( + is => 'ro', + isa => 'Bool', + default => 0, +); + +=head2 onok onerror onretry functions that are called on different stages =cut + + has $_ => ( is => 'ro', isa => 'CodeRef', predicate => "_has_$_", + writer => "_set_$_", required => 1, ) for qw/onok onerror onretry/; -has $_ => ( - is => 'ro', - isa => 'CodeRef', - predicate => "_has_$_", - default => sub { sub {} }, -) for qw{onsecondary_retry}; - has $_ => ( is => 'rw', isa => 'Num', @@ -446,6 +535,12 @@ has try => ( writer => '_set_try', ); +has second_retry_delay => ( + is => 'ro', + isa => 'Num', + predicate => '_has_second_retry_delay', +); + # has bornat => ( # is => 'ro', # isa => 'Str', @@ -482,10 +577,27 @@ sub set_sleeping_mode { sub is_timeout { my ($self, $timeout) = @_; - $timeout ||= $self->is_pending ? $self->timeout : $self->retry_delay; + + if ($self->is_pending) { + # second pends is never timeout + return 0 if $self->is_second; + + # if pend has second_retry it is never timeout + return 0 if $self->_has_second_retry_delay; + + $timeout ||= $self->timeout; + } else { + $timeout ||= $self->retry_delay; + } return time() - $self->_time > $timeout; } +sub is_secondarytimeout { + my ($self, $timeout) = @_; + $timeout ||= $self->second_retry_delay; + return time - $self->_time > $timeout; +} + sub continue { my ($self) = @_; my $is_cont = 0; diff --git a/connector/perl/t/05-pending.t b/connector/perl/t/05-pending.t index af3da0b645824085d906171a118ea4dceb51a076..1d6a4fd1ae94c382deb39d24dce635f1ff61c818 100644 --- a/connector/perl/t/05-pending.t +++ b/connector/perl/t/05-pending.t @@ -15,7 +15,7 @@ use lib "$Bin/../lib"; use Carp qw/confess/; -use Test::More tests => 21; +use Test::More tests => 61; use Test::Exception; use List::MoreUtils qw/zip/; @@ -23,6 +23,7 @@ use List::MoreUtils qw/zip/; BEGIN { use_ok 'Test::Tarantool'; use_ok 'MR::Pending'; + use_ok 'Time::HiRes', 'time'; } local $SIG{__DIE__} = \&confess; @@ -42,28 +43,36 @@ use constant TOO_BIG_FIELD => qr/too big field/; our $box; -our $server; # = shift || $ENV{BOX}; +our ($server1, $server2); # = shift || $ENV{BOX}; my $tarantool_config = "$Bin/data/pending.t.cfg"; ok -r $tarantool_config, "-r $tarantool_config"; my $tnt_dir = "$Bin/data"; ok -d $tnt_dir, "-d $tnt_dir"; -my $tnt_srv; +my @tnt_srv; -SKIP: { - skip 'The test uses external tarantool', 2 if $server; +for ($server1, $server2) { + SKIP: { + skip 'The test uses external tarantool', 2 if $_; - $tnt_srv = Test::Tarantool->run( - cfg => $tarantool_config, - script_dir => $tnt_dir - ); - ok $tnt_srv, 'server instance created'; - diag $tnt_srv->log unless ok $tnt_srv->started, 'server is started'; + push @tnt_srv => Test::Tarantool->run( + cfg => $tarantool_config, + script_dir => $tnt_dir + ); + ok $tnt_srv[-1], 'server instance created'; + diag $tnt_srv[-1]->log unless ok $tnt_srv[-1]->started, + 'server is started'; - $server = sprintf '127.0.0.1:%d', $tnt_srv->primary_port; + $_ = sprintf '127.0.0.1:%d', $tnt_srv[-1]->primary_port; + } } +sub server_id($) { + my $box = shift; + $box->Call(tst_pending_server_name => [], { unpack_format => '$' })->[0][0]; +} + our %opts = ( debug => $ENV{DEBUG}||0, ipdebug => $ENV{IPDEBUG}||0, @@ -81,6 +90,7 @@ sub cleanup ($) { sub def_param { my $format = shift || 'l& SSLL'; + my $server = shift || $server1; return { servers => $server, name => $CLASS, namespaces => [ { @@ -101,13 +111,16 @@ sub def_param { } } -$box = $CLASS->new(def_param('l&SSLL&')); +$box = $CLASS->new(def_param('l&SSLL&', $server1)); ok $box->isa($CLASS), 'connect'; -my $box2 = $CLASS->new(def_param('l&SSLL&')); +my $box2 = $CLASS->new(def_param('l&SSLL&', $server2)); ok $box2->isa($CLASS), 'connect'; +ok $box->Insert(151274, 'box1', 1, 2, 3, 4, 5), 'insert the first id'; +ok $box2->Insert(151274, 'box2', 1, 2, 3, 4, 5), 'insert the second id'; + my $tt = [ [1, 'rtokarev@corp.mail.ru', 11, 111, 1111, 11111, "1111111111111"], @@ -143,3 +156,232 @@ is_deeply $cdelayed->{continue}->(), [['test']], 'lua functions are working with return_fh'; +is server_id($box), 'box1', 'server1 id'; +is server_id($box2), 'box2', 'server2 id'; + + +my @box = ($box, $box2); + +my $onok = sub { + my ($i, $data) = @_; + is_deeply $data, [[[ 'box' . ($i + 1) ]]], "selected server@{[ $i + 1]} id"; + 1; +}; + +my $onerror = sub { + my ($i, $err) = @_; + fail "select $i failed"; +}; + +my $ontry = sub { + my ($i) = @_; + return $box[$i]->Call( + tst_pending_server_name => [], + { + return_fh => 1, + unpack_format => '$' + } + ); +}; + +MR::Pending->new( + name => "PENDINGTEST", + maxtime => 1.1, + itertime => 0.01, + pending => [map { MR::Pending::Item->new( + id => $_, + onok => $onok, + onerror => $onerror, + onretry => $ontry, + timeout => 0.5, + retry_delay => 0.001, + retry => 3, + ) } (0, 1)], +)->work; + + + +my $started = time; +my @done; + +$ontry = sub { + my ($i, $pend) = @_; + $i = 0 if $pend->is_second; + return $box[$i]->Call( + tst_pending_server_pause => [], + { + return_fh => 1, + unpack_format => '$' + } + ); +}; + +$onok = sub { + my ($i, $data) = @_; + $done[$i] = time - $started; + is_deeply $data, [[[ 'box' . ($i + 1) ]]], "selected server@{[ $i + 1]} id"; +}; + +MR::Pending->new( + name => "PENDINGTEST", + maxtime => 1.1, + itertime => 0.01, + pending => [map { MR::Pending::Item->new( + id => $_, + onok => $onok, + onerror => $onerror, + onretry => $ontry, + timeout => 0.8, + retry_delay => 0.001, + retry => 3, + ) } (0, 1)], +)->work; + +cmp_ok $done[0], '<', .15, 'first server response time less than .15 seconds'; +cmp_ok $done[0], '>', .1, 'first server response time more than .1 seconds'; + +cmp_ok $done[1], '<', 1.1, 'first server response time less than 1.1 seconds'; +cmp_ok $done[1], '>', 1, 'first server response time more than 1 seconds'; + + + + +note '** check onsecondary_retry'; + +for ( 1 .. 2) { + my @res = ([], []); + $started = time; + + { + MR::Pending->new( + name => "PENDINGTEST", + maxtime => 1.1, + itertime => 0.01, + pending => [ + MR::Pending::Item->new( + id => 0, + onok => sub { + like $_[0], qr{^\d+$}, + "first request is done id: $_[0]"; + push @{ $res[ $_[0] ] } => { + box => $_[1][0][0][0], + time => time - $started + }; + return 1; + }, + onerror => $onerror, + onretry => $ontry, + timeout => .9, + retry_delay => 0.4, + retry => 3, + ), + MR::Pending::Item->new( + id => 1, + onok => sub { + like $_[0], qr{^\d+$}, + "second request is done id: $_[0]"; + push @{ $res[ $_[0] ] } => { + box => $_[1][0][0][0], + time => time - $started + }; + return 1; + }, + onerror => $onerror, + onretry => $ontry, + timeout => 0.9, + second_retry_delay => 0.4, + retry_delay => 0.4, + retry => 3, + ), + ] + )->work; + + } + + is scalar @{ $res[0] }, 1, 'first callback was touched once'; + is scalar @{ $res[1] }, 1, 'second callback was touched once'; + is $res[0][0]{box}, $res[1][0]{box}, 'both requests were done by one box'; + is $res[0][0]{box}, 'box1', 'it was a box1'; +} + + +my $onsecondary_retry_touched = 0; +$ontry = sub { + my ($i, $pend) = @_; + $i = 0 if $pend->is_second; + $onsecondary_retry_touched++ if $pend->is_second; + return $box[$i]->Call( + tst_pending_server_pause => [ $pend->is_second ? 3 : 1 ], + { + return_fh => 1, + unpack_format => '$' + } + ); +}; + + +$started = time; +$box->Call(tst_pending_server_pause => [ .3 ], { unpack_format => '$' }); +cmp_ok time - $started, '>=', .3, 'tst_pending_server_pause(.3)'; +cmp_ok time - $started, '<', .4, 'tst_pending_server_pause(.3)'; + +note "** another test"; +$started = time; +my @res = ([], []); +{ + my $pd = MR::Pending->new( + name => "PENDINGTEST", + maxtime => 1.5, + itertime => 0.01, + pending => [ + MR::Pending::Item->new( + id => 0, + onok => sub { + like $_[0], qr{^\d+$}, + "first request is done id: $_[0]"; + push @{ $res[ $_[0] ] } => { + box => $_[1][0][0][0], + time => time - $started + }; + return 1; + }, + onerror => $onerror, + onretry => $ontry, + timeout => 3, + retry_delay => 0.4, + retry => 3, + ), + MR::Pending::Item->new( + id => 1, + onok => sub { + like $_[0], qr{^\d+$}, + "second request is done id: $_[0]"; + push @{ $res[ $_[0] ] } => { + box => $_[1][0][0][0], + time => time - $started + }; + return 1; + }, + onerror => $onerror, + onretry => $ontry, + timeout => 3, + second_retry_delay => 0.1, + retry_delay => 0.4, + retry => 3, + ), + ] + ); + $pd->work; + is scalar @{ $res[0] }, 1, 'first callback was touched once'; + is scalar @{ $res[1] }, 1, 'second callback was touched once'; + is $res[0][0]{box}, 'box1', 'first replay was from box1'; + cmp_ok $res[0][0]{time}, '>=', 1, 'first replay took more than 1 second'; + cmp_ok $res[1][0]{time}, '>=', 1, 'second replay took more than 1 second'; + is $res[1][0]{box}, 'box2', 'second replay was from box2'; + cmp_ok time - $started, '<', 1.1, 'Both requests got less than 1.1 second'; + is $onsecondary_retry_touched, 1, 'onsecondary_retry touched once'; + +} + +cmp_ok time - $started, '>', 3, 'Destructor waited for longer request'; + diff --git a/connector/perl/t/data/init.lua b/connector/perl/t/data/init.lua index 76a0226851a21bd8423a0bf16e9ee2d24ec2a0ea..f0e40ab6a60c3249325428640f96c8fb46e1b67f 100644 --- a/connector/perl/t/data/init.lua +++ b/connector/perl/t/data/init.lua @@ -2,6 +2,39 @@ function tst_lua_test() return 'test' end + +function tst_pending_server_name() + local tuple = box.select( 0, 0, box.pack('i', 151274) ) + if tuple == nil then + return { 'unknown' } + else + return { tuple[1] } + end +end + +function tst_pending_server_pause(delay) + local sname = tst_pending_server_name() + + if delay == nil then + + if sname[1] == 'box1' then + box.fiber.sleep(.1) + return sname + end + + if sname[1] == 'box2' then + box.fiber.sleep(1) + return sname + end + else + box.fiber.sleep(delay) + end + + return sname +end + + + function tst_server_name() local tuple = box.select( 0, 0, box.pack('i', 1) ) if tuple == nil then