From fe83fe5db87971b01868201892d1c88bb83d000c Mon Sep 17 00:00:00 2001
From: "Dmitry E. Oboukhov" <unera@debian.org>
Date: Wed, 4 Jul 2012 10:55:59 +0400
Subject: [PATCH] cleanup code

---
 connector/perl/lib/MR/Pending.pm | 140 ++++++++++++++++++-------------
 connector/perl/t/05-pending.t    |  93 +++++++++++++++++++-
 connector/perl/t/data/init.lua   |  21 +++--
 3 files changed, 186 insertions(+), 68 deletions(-)

diff --git a/connector/perl/lib/MR/Pending.pm b/connector/perl/lib/MR/Pending.pm
index c95aeeeead..a874b78eb6 100644
--- a/connector/perl/lib/MR/Pending.pm
+++ b/connector/perl/lib/MR/Pending.pm
@@ -196,7 +196,7 @@ sub send {
     foreach my $shard ( grep { $pending->{$_}->is_sleeping } keys %$pending ) {
         my $pend = $pending->{$shard};
 
-        if (($pend->_has_retry or $pend->try < $pend->retry) or !$pend->try) {
+        if ($pend->try < $pend->retry or !$pend->try) {
             next unless $pend->is_timeout;
 
             # don't repead request that have secondary retry
@@ -222,6 +222,59 @@ sub send {
     return $self;
 }
 
+
+sub _check_if_second_restart {
+    my ($self, $pend) = @_;
+
+    return unless $pend->is_pending;
+    return unless $pend->_has_onsecondary_retry;
+    return unless $pend->is_secondarytimeout;
+    return if $pend->is_second_pend;
+
+
+    my $id = $pend->id;
+    my $new_id = "$id:second_retry";
+    return if exists  $self->_pending->{$new_id};
+
+    my $orig_onerror    = $pend->onerror;
+    my $orig_onok       = $pend->onok;
+
+    my $new_pend = $self->_pending->{ $new_id } = ref($pend)->new(
+        id              => $new_id,
+        try             => 0,
+        timeout         => $pend->timeout,
+        retry_delay     => $pend->retry_delay,
+        retry           => $pend->retry,
+        is_second_pend  => 1,
+
+        onretry         => $pend->onsecondary_retry,
+        _time           => $pend->_time,
+        onok            => sub {
+
+#             warn "second pending is done ------- $_[0]";
+            splice @_, 0, 1, $id;
+
+            $pend->_set_onok(sub { 1 }),
+            $pend->_set_onerror(sub { 1 }),
+            $self->_ignoring->{ $id } = delete $self->_pending->{ $id };
+            &$orig_onok
+        },
+        onerror => $pend->onerror
+    );
+
+#     warn ">>>>>>>>> started new pending: " . $new_pend->id;
+
+    $pend->_set_onok(sub {
+#         warn "first pending is done ($new_id) ------- $_[0]";
+        $new_pend->_set_onok(sub { 1 });
+        $new_pend->_set_onerror(sub { 1 });
+        $self->_ignoring->{ $new_id } = delete $self->_pending->{ $new_id };
+        &$orig_onok;
+    });
+
+    return 1;
+}
+
 sub wait {
     my ($self) = @_;
     my $pending = $self->_pending;
@@ -243,10 +296,13 @@ sub wait {
 
     if ($n == 0) {
         $self->runcatch($self->onidle, ($self)) if $self->_has_onidle;
+
+        for my $pend (grep { $_->is_pending } values %$pending) {
+            $self->_check_if_second_restart( $pend );
+        }
         return 0;
     }
 
-
     return $n;
 }
 
@@ -256,6 +312,7 @@ sub recv {
     my ($rin, $ein) = @{$self->_waitresult};
 
     for my $shard (grep { $pending->{$_}->is_pending } keys %$pending) {
+        next unless exists $pending->{$shard};
         my $pend = $pending->{$shard};
         my $fileno = $pend->fileno;
         if (vec($rin, $fileno, 1)) {
@@ -277,55 +334,8 @@ sub recv {
         } elsif (vec($ein, $fileno, 1)) {
             $pend->close("connection reset (".$pend->last_error.")");
         } elsif ($pend->_has_onsecondary_retry) {
-
-            my $orig_onerror = $pend->onerror;
-            my $orig_onok = $pend->onok;
-
-            my $id = $pend->id;
-
-
-            my $new_id = "$id:second_retry";
-
-            unless (exists $self->_pending->{$new_id}) {
-
-
-                my $new_pend = $self->_pending->{ $new_id } = ref($pend)->new(
-                    id              => $new_id,
-                    try             => 0,
-                    timeout         => $pend->timeout,
-                    retry_delay     => $pend->retry_delay,
-                    retry           => $pend->retry,
-
-                    onretry         => $pend->onsecondary_retry,
-                    _time           => $pend->_time - $pend->retry_delay,
-                    onok            => sub {
-
-                        splice @_, 0, 1, $id;
-
-                        $pend->_set_onok(sub { 1 }),
-                        $pend->_set_onerror(sub { 1 }),
-                        $self->_ignoring->{ $id } =
-                            delete $self->_pending->{ $id };
-                        &$orig_onok
-                    },
-                    onerror => $pend->onerror
-                );
-
-
-                $pend->_set_second_pend( $pend );
-                $pend->_clear__onsecondary_retry;
-                $pend->_set_onok(sub {
-                    warn "first pending is done ------- $_[0]";
-                    delete $self->_pending->{ $new_id };
-                    $self->_ignoring->{ $new_id } = $new_pend;
-                    $new_pend->_set_onok(sub { 1 });
-                    $new_pend->_set_onerror(sub { 1 });
-                    &$orig_onok;
-                });
-            }
-
-        }
-        elsif ($pend->is_timeout) {
+            $self->_check_if_second_restart( $pend );
+        } elsif ($pend->is_timeout) {
             $pend->close("timeout (".$pend->last_error.")");
         }
     }
@@ -348,6 +358,7 @@ sub finish {
 sub iter {
     my ($self) = @_;
 
+
     $self->send or return;
     return if $self->exceptions;
 
@@ -397,8 +408,11 @@ sub check_exceptions {
 
 sub DEMOLISH {
     my ($self) = @_;
-
-    $self->_ignoring->{ $_ }->continue for keys %{ $self->_ignoring };
+    for my $pend (values %{ $self->_ignoring }) {
+        next unless $pend->is_pending;
+#         warn "waiting for pending(id=@{[$pend->id]}) is done";
+        $pend->continue;
+    }
 }
 
 no Mouse;
@@ -434,11 +448,10 @@ has id => (
 );
 
 
-has _second_pend => (
+has is_second_pend => (
     is          => 'ro',
-    isa         => 'MR::Pending::Item',
-    writer      => '_set_second_pend',
-    predicate   => '_has_second_pend',
+    isa         => 'Bool',
+    default     => 0,
 );
 
 =head2 onok onerror onretry onsecondary_retry
@@ -571,7 +584,18 @@ sub set_sleeping_mode {
 
 sub is_timeout {
     my ($self, $timeout) = @_;
-    $timeout ||= $self->is_pending ? $self->timeout : $self->retry_delay;
+
+    if ($self->is_pending) {
+        # second pends is never timeout
+        return 0 if $self->is_second_pend;
+
+        # if pend has second_retry it is never timeout
+        return 0 if $self->_has_onsecondary_retry;
+
+        $timeout ||= $self->timeout;
+    } else {
+        $timeout ||= $self->retry_delay;
+    }
     return time() - $self->_time > $timeout;
 }
 
diff --git a/connector/perl/t/05-pending.t b/connector/perl/t/05-pending.t
index 3e603c8afa..5b9edbcfd8 100644
--- a/connector/perl/t/05-pending.t
+++ b/connector/perl/t/05-pending.t
@@ -15,7 +15,7 @@ use lib "$Bin/../lib";
 
 use Carp qw/confess/;
 
-use Test::More tests => 50;
+use Test::More tests => 59;
 use Test::Exception;
 
 use List::MoreUtils qw/zip/;
@@ -308,9 +308,98 @@ for ( 1 .. 2) {
 
     }
 
-    is scalar @res, 2, 'count of results';
     is scalar @{ $res[0] }, 1, 'first callback was touched once';
     is scalar @{ $res[1] }, 1, 'second callback was touched once';
     is $res[0][0]{box}, $res[1][0]{box}, 'both requests were done by one box';
     is $res[0][0]{box}, 'box1', 'it was a box1';
 }
+
+
+$ontry = sub {
+    my ($i) = @_;
+    return $box[$i]->Call(
+        tst_pending_server_pause => [ 1 ],
+        {
+            return_fh       => 1,
+            unpack_format   => '$'
+        }
+    );
+};
+
+
+my $onsecondary_retry_touched = 0;
+$onsecondary_retry = sub {
+    $onsecondary_retry_touched++;
+    return $box->Call(
+        tst_pending_server_pause => [ 3 ],
+        {
+            return_fh       => 1,
+            unpack_format   => '$'
+        }
+    );
+};
+
+
+$started = time;
+$box->Call(tst_pending_server_pause => [ .3 ], { unpack_format => '$' });
+cmp_ok time - $started, '>=', .3, 'tst_pending_server_pause(.3)';
+cmp_ok time - $started, '<', .4, 'tst_pending_server_pause(.3)';
+
+note "** another test";
+$started = time;
+my @res = ([], []);
+{
+    my $pd = MR::Pending->new(
+        name                => "PENDINGTEST",
+        maxtime             => 1.5,
+        itertime            => 0.01,
+        pending  => [
+            MR::Pending::Item->new(
+                id           => 0,
+                onok         => sub {
+                    like $_[0], qr{^\d+$},
+                        "first request is done id: $_[0]";
+                    push @{ $res[ $_[0] ] } => {
+                        box     => $_[1][0][0][0],
+                        time    => time - $started
+                    };
+                    return 1;
+                },
+                onerror      => $onerror,
+                onretry      => $ontry,
+                timeout      => 3,
+                retry_delay  => 0.4,
+                retry        => 3,
+            ),
+            MR::Pending::Item->new(
+                id                  => 1,
+                onok                => sub {
+                    like $_[0], qr{^\d+$},
+                        "second request is done id: $_[0]";
+                    push @{ $res[ $_[0] ] } => {
+                        box     => $_[1][0][0][0],
+                        time    => time - $started
+                    };
+                    return 1;
+                },
+                onerror             => $onerror,
+                onretry             => $ontry,
+                onsecondary_retry   => $onsecondary_retry,
+                timeout             => 3,
+                second_retry_delay  => 0.1,
+                retry_delay  => 0.4,
+                retry        => 3,
+            ),
+        ]
+    );
+    $pd->work;
+    is scalar @{ $res[0] }, 1, 'first callback was touched once';
+    is scalar @{ $res[1] }, 1, 'second callback was touched once';
+    is $res[0][0]{box}, 'box1', 'it was a box1';
+    is $res[1][0]{box}, 'box2', 'it was a box2';
+    cmp_ok time - $started, '<', 2, 'Both requests got less than 2 second';
+    is $onsecondary_retry_touched, 1, 'onsecondary_retry touched once';
+}
+
+cmp_ok time - $started, '>', 3, 'Destructor waited for longer request';
+
diff --git a/connector/perl/t/data/init.lua b/connector/perl/t/data/init.lua
index a15cf43fe1..f0e40ab6a6 100644
--- a/connector/perl/t/data/init.lua
+++ b/connector/perl/t/data/init.lua
@@ -12,17 +12,22 @@ function tst_pending_server_name()
     end
 end
 
-function tst_pending_server_pause()
+function tst_pending_server_pause(delay)
     local sname = tst_pending_server_name()
+
+    if delay == nil then
     
-    if sname[1] == 'box1' then
-        box.fiber.sleep(.1)
-        return sname
-    end
+        if sname[1] == 'box1' then
+            box.fiber.sleep(.1)
+            return sname
+        end
 
-    if sname[1] == 'box2' then
-        box.fiber.sleep(1)
-        return sname
+        if sname[1] == 'box2' then
+            box.fiber.sleep(1)
+            return sname
+        end
+    else
+        box.fiber.sleep(delay)
     end
 
     return sname
-- 
GitLab