diff --git a/connector/perl/lib/MR/Pending.pm b/connector/perl/lib/MR/Pending.pm index c95aeeeeadd095e9b4b72100a64b35f573e6ac76..a874b78eb641aa2fb211c4e1d0b6a9d93376ed16 100644 --- a/connector/perl/lib/MR/Pending.pm +++ b/connector/perl/lib/MR/Pending.pm @@ -196,7 +196,7 @@ sub send { foreach my $shard ( grep { $pending->{$_}->is_sleeping } keys %$pending ) { my $pend = $pending->{$shard}; - if (($pend->_has_retry or $pend->try < $pend->retry) or !$pend->try) { + if ($pend->try < $pend->retry or !$pend->try) { next unless $pend->is_timeout; # don't repead request that have secondary retry @@ -222,6 +222,59 @@ sub send { return $self; } + +sub _check_if_second_restart { + my ($self, $pend) = @_; + + return unless $pend->is_pending; + return unless $pend->_has_onsecondary_retry; + return unless $pend->is_secondarytimeout; + return if $pend->is_second_pend; + + + my $id = $pend->id; + my $new_id = "$id:second_retry"; + return if exists $self->_pending->{$new_id}; + + my $orig_onerror = $pend->onerror; + my $orig_onok = $pend->onok; + + my $new_pend = $self->_pending->{ $new_id } = ref($pend)->new( + id => $new_id, + try => 0, + timeout => $pend->timeout, + retry_delay => $pend->retry_delay, + retry => $pend->retry, + is_second_pend => 1, + + onretry => $pend->onsecondary_retry, + _time => $pend->_time, + onok => sub { + +# warn "second pending is done ------- $_[0]"; + splice @_, 0, 1, $id; + + $pend->_set_onok(sub { 1 }), + $pend->_set_onerror(sub { 1 }), + $self->_ignoring->{ $id } = delete $self->_pending->{ $id }; + &$orig_onok + }, + onerror => $pend->onerror + ); + +# warn ">>>>>>>>> started new pending: " . $new_pend->id; + + $pend->_set_onok(sub { +# warn "first pending is done ($new_id) ------- $_[0]"; + $new_pend->_set_onok(sub { 1 }); + $new_pend->_set_onerror(sub { 1 }); + $self->_ignoring->{ $new_id } = delete $self->_pending->{ $new_id }; + &$orig_onok; + }); + + return 1; +} + sub wait { my ($self) = @_; my $pending = $self->_pending; @@ -243,10 +296,13 @@ sub wait { if ($n == 0) { $self->runcatch($self->onidle, ($self)) if $self->_has_onidle; + + for my $pend (grep { $_->is_pending } values %$pending) { + $self->_check_if_second_restart( $pend ); + } return 0; } - return $n; } @@ -256,6 +312,7 @@ sub recv { my ($rin, $ein) = @{$self->_waitresult}; for my $shard (grep { $pending->{$_}->is_pending } keys %$pending) { + next unless exists $pending->{$shard}; my $pend = $pending->{$shard}; my $fileno = $pend->fileno; if (vec($rin, $fileno, 1)) { @@ -277,55 +334,8 @@ sub recv { } elsif (vec($ein, $fileno, 1)) { $pend->close("connection reset (".$pend->last_error.")"); } elsif ($pend->_has_onsecondary_retry) { - - my $orig_onerror = $pend->onerror; - my $orig_onok = $pend->onok; - - my $id = $pend->id; - - - my $new_id = "$id:second_retry"; - - unless (exists $self->_pending->{$new_id}) { - - - my $new_pend = $self->_pending->{ $new_id } = ref($pend)->new( - id => $new_id, - try => 0, - timeout => $pend->timeout, - retry_delay => $pend->retry_delay, - retry => $pend->retry, - - onretry => $pend->onsecondary_retry, - _time => $pend->_time - $pend->retry_delay, - onok => sub { - - splice @_, 0, 1, $id; - - $pend->_set_onok(sub { 1 }), - $pend->_set_onerror(sub { 1 }), - $self->_ignoring->{ $id } = - delete $self->_pending->{ $id }; - &$orig_onok - }, - onerror => $pend->onerror - ); - - - $pend->_set_second_pend( $pend ); - $pend->_clear__onsecondary_retry; - $pend->_set_onok(sub { - warn "first pending is done ------- $_[0]"; - delete $self->_pending->{ $new_id }; - $self->_ignoring->{ $new_id } = $new_pend; - $new_pend->_set_onok(sub { 1 }); - $new_pend->_set_onerror(sub { 1 }); - &$orig_onok; - }); - } - - } - elsif ($pend->is_timeout) { + $self->_check_if_second_restart( $pend ); + } elsif ($pend->is_timeout) { $pend->close("timeout (".$pend->last_error.")"); } } @@ -348,6 +358,7 @@ sub finish { sub iter { my ($self) = @_; + $self->send or return; return if $self->exceptions; @@ -397,8 +408,11 @@ sub check_exceptions { sub DEMOLISH { my ($self) = @_; - - $self->_ignoring->{ $_ }->continue for keys %{ $self->_ignoring }; + for my $pend (values %{ $self->_ignoring }) { + next unless $pend->is_pending; +# warn "waiting for pending(id=@{[$pend->id]}) is done"; + $pend->continue; + } } no Mouse; @@ -434,11 +448,10 @@ has id => ( ); -has _second_pend => ( +has is_second_pend => ( is => 'ro', - isa => 'MR::Pending::Item', - writer => '_set_second_pend', - predicate => '_has_second_pend', + isa => 'Bool', + default => 0, ); =head2 onok onerror onretry onsecondary_retry @@ -571,7 +584,18 @@ sub set_sleeping_mode { sub is_timeout { my ($self, $timeout) = @_; - $timeout ||= $self->is_pending ? $self->timeout : $self->retry_delay; + + if ($self->is_pending) { + # second pends is never timeout + return 0 if $self->is_second_pend; + + # if pend has second_retry it is never timeout + return 0 if $self->_has_onsecondary_retry; + + $timeout ||= $self->timeout; + } else { + $timeout ||= $self->retry_delay; + } return time() - $self->_time > $timeout; } diff --git a/connector/perl/t/05-pending.t b/connector/perl/t/05-pending.t index 3e603c8afac6880f77129f60ac47ad9de576c7ad..5b9edbcfd895ab1d2ae920dc86e788a1fdbd91aa 100644 --- a/connector/perl/t/05-pending.t +++ b/connector/perl/t/05-pending.t @@ -15,7 +15,7 @@ use lib "$Bin/../lib"; use Carp qw/confess/; -use Test::More tests => 50; +use Test::More tests => 59; use Test::Exception; use List::MoreUtils qw/zip/; @@ -308,9 +308,98 @@ for ( 1 .. 2) { } - is scalar @res, 2, 'count of results'; is scalar @{ $res[0] }, 1, 'first callback was touched once'; is scalar @{ $res[1] }, 1, 'second callback was touched once'; is $res[0][0]{box}, $res[1][0]{box}, 'both requests were done by one box'; is $res[0][0]{box}, 'box1', 'it was a box1'; } + + +$ontry = sub { + my ($i) = @_; + return $box[$i]->Call( + tst_pending_server_pause => [ 1 ], + { + return_fh => 1, + unpack_format => '$' + } + ); +}; + + +my $onsecondary_retry_touched = 0; +$onsecondary_retry = sub { + $onsecondary_retry_touched++; + return $box->Call( + tst_pending_server_pause => [ 3 ], + { + return_fh => 1, + unpack_format => '$' + } + ); +}; + + +$started = time; +$box->Call(tst_pending_server_pause => [ .3 ], { unpack_format => '$' }); +cmp_ok time - $started, '>=', .3, 'tst_pending_server_pause(.3)'; +cmp_ok time - $started, '<', .4, 'tst_pending_server_pause(.3)'; + +note "** another test"; +$started = time; +my @res = ([], []); +{ + my $pd = MR::Pending->new( + name => "PENDINGTEST", + maxtime => 1.5, + itertime => 0.01, + pending => [ + MR::Pending::Item->new( + id => 0, + onok => sub { + like $_[0], qr{^\d+$}, + "first request is done id: $_[0]"; + push @{ $res[ $_[0] ] } => { + box => $_[1][0][0][0], + time => time - $started + }; + return 1; + }, + onerror => $onerror, + onretry => $ontry, + timeout => 3, + retry_delay => 0.4, + retry => 3, + ), + MR::Pending::Item->new( + id => 1, + onok => sub { + like $_[0], qr{^\d+$}, + "second request is done id: $_[0]"; + push @{ $res[ $_[0] ] } => { + box => $_[1][0][0][0], + time => time - $started + }; + return 1; + }, + onerror => $onerror, + onretry => $ontry, + onsecondary_retry => $onsecondary_retry, + timeout => 3, + second_retry_delay => 0.1, + retry_delay => 0.4, + retry => 3, + ), + ] + ); + $pd->work; + is scalar @{ $res[0] }, 1, 'first callback was touched once'; + is scalar @{ $res[1] }, 1, 'second callback was touched once'; + is $res[0][0]{box}, 'box1', 'it was a box1'; + is $res[1][0]{box}, 'box2', 'it was a box2'; + cmp_ok time - $started, '<', 2, 'Both requests got less than 2 second'; + is $onsecondary_retry_touched, 1, 'onsecondary_retry touched once'; +} + +cmp_ok time - $started, '>', 3, 'Destructor waited for longer request'; + diff --git a/connector/perl/t/data/init.lua b/connector/perl/t/data/init.lua index a15cf43fe12c70ec82bb3998810c090eac5e071f..f0e40ab6a60c3249325428640f96c8fb46e1b67f 100644 --- a/connector/perl/t/data/init.lua +++ b/connector/perl/t/data/init.lua @@ -12,17 +12,22 @@ function tst_pending_server_name() end end -function tst_pending_server_pause() +function tst_pending_server_pause(delay) local sname = tst_pending_server_name() + + if delay == nil then - if sname[1] == 'box1' then - box.fiber.sleep(.1) - return sname - end + if sname[1] == 'box1' then + box.fiber.sleep(.1) + return sname + end - if sname[1] == 'box2' then - box.fiber.sleep(1) - return sname + if sname[1] == 'box2' then + box.fiber.sleep(1) + return sname + end + else + box.fiber.sleep(delay) end return sname