From 8e0af4be6e8dd69414f56eb64f1f76b41959ee32 Mon Sep 17 00:00:00 2001
From: Yuriy Nevinitsin <nevinitsin@corp.mail.ru>
Date: Wed, 7 Mar 2012 19:47:23 +0400
Subject: [PATCH] [connector][perl] AnyEvent support. Continuation support.
 Limit = max_sint32 by default. primary_key_index option.

---
 connector/perl/lib/MR/IProto.pm               |  33 ++-
 .../perl/lib/MR/IProto/Connection/Async.pm    |   3 +
 .../perl/lib/MR/IProto/Connection/Sync.pm     |  12 +-
 connector/perl/lib/MR/Tarantool/Box.pm        | 256 ++++++++++++------
 connector/perl/t/box.pl                       | 124 ++++++++-
 5 files changed, 329 insertions(+), 99 deletions(-)

diff --git a/connector/perl/lib/MR/IProto.pm b/connector/perl/lib/MR/IProto.pm
index 152b226620..671ca29553 100644
--- a/connector/perl/lib/MR/IProto.pm
+++ b/connector/perl/lib/MR/IProto.pm
@@ -226,22 +226,32 @@ sub send {
         die "Method must be called in void context if you want to use async" if defined wantarray;
         $self->_send($message, $callback);
         return;
-    }
-    else {
+    } else {
         die "Method must be called in scalar context if you want to use sync" unless defined wantarray;
         my $olddie = ref $SIG{__DIE__} eq 'CODE' ? $SIG{__DIE__} : ref $SIG{__DIE__} eq 'GLOB' ? *{$SIG{__DIE__}}{CODE} : undef;
         local $SIG{__DIE__} = sub { local $! = 0; $olddie->(@_); } if $olddie;
         my %servers;
         my ($data, $error, $errno);
-        $self->_send_now($message, sub {
+        my $fh = $self->_send_now($message, sub {
             ($data, $error) = @_;
             $errno = $!;
             return;
         }, \%servers);
-        $self->_recv_now(\%servers);
-        $! = $errno;
-        die $error if $error;
-        return $data;
+
+        my $cont = sub {
+            $self->_recv_now(\%servers, max => $message->{continue}?1:0);
+            $! = $errno;
+            return $message->{continue}->($data, $error, $errno) if $message->{continue};
+            die $error if $error;
+            return $data;
+        };
+
+        return {
+            fh       => $fh,
+            continue => $cont,
+        } if $message->{continue};
+
+        return &$cont();
     }
 }
 
@@ -528,8 +538,7 @@ sub _send_now {
         );
         return;
     };
-    $self->_send_try($sync, $args, $handler, $try);
-    return;
+    return $self->_send_try($sync, $args, $handler, $try);
 }
 
 sub _send_try {
@@ -540,7 +549,7 @@ sub _send_try {
     my $connection = $server->$xsync();
     $connection->send($args->{msg}, $args->{body}, $handler, $args->{no_reply}, $args->{sync});
     $sync->{$connection} ||= $connection if $sync;
-    return;
+    return $connection->fh;
 }
 
 sub _send_retry {
@@ -623,10 +632,10 @@ sub _server_callback {
 }
 
 sub _recv_now {
-    my ($self, $servers) = @_;
+    my ($self, $servers, %opts) = @_;
     while(my @servers = values %$servers) {
         %$servers = ();
-        $_->recv_all() foreach @servers;
+        $_->recv_all(%opts) foreach @servers;
     }
     return;
 }
diff --git a/connector/perl/lib/MR/IProto/Connection/Async.pm b/connector/perl/lib/MR/IProto/Connection/Async.pm
index d78b9180ff..2fb32bc023 100644
--- a/connector/perl/lib/MR/IProto/Connection/Async.pm
+++ b/connector/perl/lib/MR/IProto/Connection/Async.pm
@@ -19,6 +19,7 @@ use Scalar::Util qw(weaken);
 has _handle => (
     is  => 'ro',
     isa => 'AnyEvent::Handle',
+    predicate => '_has_handle',
     lazy_build => 1,
 );
 
@@ -69,6 +70,8 @@ For list of arguments see L</_send>.
 
 =cut
 
+sub fh { return $_[0]->_has_handle && $_[0]->_handle }
+
 sub send {
     my $self = shift;
     if( $self->_in_progress < $self->max_parallel ) {
diff --git a/connector/perl/lib/MR/IProto/Connection/Sync.pm b/connector/perl/lib/MR/IProto/Connection/Sync.pm
index 7c3da90c46..9e89ba7cc1 100644
--- a/connector/perl/lib/MR/IProto/Connection/Sync.pm
+++ b/connector/perl/lib/MR/IProto/Connection/Sync.pm
@@ -20,6 +20,7 @@ use Socket qw( TCP_NODELAY SO_KEEPALIVE SO_SNDTIMEO SO_RCVTIMEO );
 has _socket => (
     is  => 'ro',
     isa => 'IO::Socket::INET',
+    predicate => '_has_socket',
     lazy_build => 1,
 );
 
@@ -32,12 +33,18 @@ has _sent => (
 
 =over
 
+=item fh
+
+Returns socket.
+
 =item send
 
 See L<MR::IProto::Connection/send> for more information.
 
 =cut
 
+sub fh { return $_[0]->_has_socket && $_[0]->_socket }
+
 sub send {
     my ($self, $msg, $payload, $callback, $no_reply, $sync) = @_;
     my $server = $self->server;
@@ -95,11 +102,12 @@ sub send {
 }
 
 sub recv_all {
-    my ($self) = @_;
+    my ($self, %opts) = @_;
     my $server = $self->server;
     my $sent = $self->_sent;
     my $dump_resp = $server->debug >= 6;
-    while (my $args = shift @$sent) {
+    my $n = $opts{max} || @$sent;
+    while ($n-- and my $args = shift @$sent) {
         my ($sync, $callback) = @$args;
         my ($resp_msg, $resp_payload);
         my $ok = eval {
diff --git a/connector/perl/lib/MR/Tarantool/Box.pm b/connector/perl/lib/MR/Tarantool/Box.pm
index 0f5dc3f6f5..8d87c96a37 100644
--- a/connector/perl/lib/MR/Tarantool/Box.pm
+++ b/connector/perl/lib/MR/Tarantool/Box.pm
@@ -194,9 +194,14 @@ Properly ordered arrayref of fields' numbers which are indexed.
 
 =item B<default_index> => $default_index_name_string_or_id_uint32
 
-Index C<id> or C<name> to be used by default for the current C<space>.
+Index C<id> or C<name> to be used by default for the current C<space> in B<select> operations.
 Must be set if there are more than one C<\%index>es.
 
+=item B<primary_key_index> => $primary_key_name_string_or_id_uint32
+
+Index C<id> or C<name> to be used by default for the current C<space> in B<update> operations.
+It is set to C<default_index> by default.
+
 =back
 
 =item B<default_space> => $default_space_name_string_or_id_uint32
@@ -277,6 +282,7 @@ sub new {
     $self->{select_timeout}  = $arg->{select_timeout} || $self->{timeout};
     $self->{iprotoclass}     = $arg->{iprotoclass} || $class->IPROTOCLASS;
     $self->{_last_error}     = 0;
+    $self->{_last_error_msg} = '';
 
     $self->{hashify}         = $arg->{'hashify'} if exists $arg->{'hashify'};
     $self->{default_raw}     = $arg->{default_raw};
@@ -319,8 +325,11 @@ sub new {
         if( @{$ns->{indexes}} > 1 ) {
             confess "space[$namespace] default_index not given" unless defined $ns->{default_index};
             confess "space[$namespace] default_index $ns->{default_index} does not exist" unless $inames->{$ns->{default_index}};
+            $ns->{primary_key_index} = $ns->{default_index} unless defined $ns->{primary_key_index};
+            confess "space[$namespace] primary_key_index $ns->{primary_key_index} does not exist" unless $inames->{$ns->{primary_key_index}};
         } else {
             $ns->{default_index} ||= 0;
+            $ns->{primary_key_index} ||= 0;
         }
         if($ns->{fields}) {
             confess "space[$namespace] fields must be ARRAYREF" unless ref $ns->{fields} eq 'ARRAY';
@@ -405,48 +414,81 @@ sub _chat {
     my $soft_retry = $self->{softretry};
     my $retry_count = 0;
 
-    while ($retry > 0) {
+    my $callback  = delete $param{callback};
+    my $return_fh = delete $param{return_fh};
+    my $_cb = $callback || $return_fh;
+
+    my $is_retry = sub {
+        my ($data) = @_;
         $retry_count++;
+        if($data) {
+            my ($ret_code, $data, $full_code) = @$data;
+            return 0 if $ret_code->[0] == 0;
+            # retry if error is soft even in case of update e.g. ROW_LOCK
+            if ($ret_code->[0] == 1 and --$soft_retry > 0) {
+                --$retry if $retry > 1;
+                return 1;
+            }
+        }
+        return 1 if --$retry;
+        return 0;
+    };
 
-        $self->{_last_error} = 0x77777777;
-        $self->{server}->SetTimeout($timeout);
-        my $ret = $self->{server}->Chat1(%param);
-        my $message;
+    my $message;
+    my $process = sub {
+        my ($data, $error) = @_;
+        my $errno = $!;
+        if (!$error && $data) {
+            my ($ret_code, $data, $full_code) = @$data;
 
-        if (exists $ret->{ok}) {
-            my ($ret_code, $data, $full_code) = @{$ret->{ok}};
             $self->{_last_error} = $full_code;
+            $self->{_last_error_msg} = $message = $ret_code->[0] == 0 ? "ok" : sprintf "Error %08X: %s", $full_code, $$data || $ERRORS{$full_code & 0xFFFFFF00} || 'Unknown error';
+            $self->_debug("$self->{name}: $message") if $ret_code->[0] != 0 && $self->{debug} >= 1;
+
             if ($ret_code->[0] == 0) {
                 my $ret = $orig_unpack->($$data,$ret_code->[2]);
                 confess __LINE__."$self->{name}: [common]: Bad response (more data left)" if length $$data > 0;
-                return $ret;
+                return $ret unless $_cb;
+                return &$_cb($ret);
             }
 
-            $self->{_last_error_msg} = $message = $ret_code->[0] == 0 ? "ok" : sprintf "Error %08X: %s", $full_code, $$data || $ERRORS{$full_code & 0xFFFFFF00} || 'Unknown error';
-            $self->_debug("$self->{name}: $message") if $self->{debug} >= 1;
             if ($ret_code->[0] == 2) { #fatal error
                 $self->_raise($message) if $self->{raise};
-                return 0;
-            }
-
-            # retry if error is soft even in case of update e.g. ROW_LOCK
-            if ($ret_code->[0] == 1 and --$soft_retry > 0) {
-                --$retry if $retry > 1;
-                sleep $self->{retry_delay};
-                next;
+                return 0 unless $_cb;
+                return &$_cb(0, $error);
             }
         } else { # timeout has caused the failure if $ret->{timeout}
             $self->{_last_error} = 'fail';
-            $message ||= $self->{_last_error_msg} = $ret->{fail};
+            $message ||= $self->{_last_error_msg} = $error;
             $self->_debug("$self->{name}: $message") if $self->{debug} >= 1;
+            $self->_raise("$self->{name}: no success after $retry_count tries: $message\n") if $self->{raise};
+            return 0 unless $_cb;
+            return &$_cb(0, $error);
         }
+    };
+
+    if ($callback) {
+        $self->{_last_error} = 0x77777777;
+        $self->{server}->SetTimeout($timeout);
+        return 1 if eval { $self->{server}->send({%param, is_retry => $is_retry}, $process); 1 };
+        return 0;
+    }
 
-        last unless --$retry;
+    $param{continue} = $process if $return_fh;
+
+    my $ret;
+    while ($retry > 0) {
+        $self->{_last_error} = 0x77777777;
+        $self->{server}->SetTimeout($timeout);
 
+        $ret = $self->{server}->Chat1(%param);
+        return $ret->{ok} if $param{continue} && $ret->{ok};
+        last unless &$is_retry($ret->{ok});
         sleep $self->{retry_delay};
     };
 
-    $self->_raise("no success after $retry_count tries\n") if $self->{raise};
+    $self->_raise("no success after $retry_count tries\n") if $self->{raise} && !$ret->{ok};
+    return &$process($ret->{ok}, $ret->{fail});
 }
 
 sub _raise {
@@ -461,6 +503,7 @@ sub _validate_param {
     my %pnames = map { $_ => 1 } @pnames;
     $pnames{space} = 1;
     $pnames{namespace} = 1;
+    $pnames{callback} = 1;
     foreach my $pname (keys %$param) {
         confess "$self->{name}: unknown param $pname\n" unless exists $pnames{$pname};
     }
@@ -470,7 +513,7 @@ sub _validate_param {
     confess "$self->{name}: bad space `$param->{namespace}'" unless exists $self->{namespaces}->{$param->{namespace}};
 
     my $ns = $self->{namespaces}->{$param->{namespace}};
-    $param->{use_index} = $ns->{default_index} unless defined $param->{use_index};
+    $param->{use_index} = $pnames{use_index} ? $ns->{default_index} : $ns->{primary_key_index} unless defined $param->{use_index};
     confess "$self->{name}: bad index `$param->{use_index}'" unless exists $ns->{index_names}->{$param->{use_index}};
     $param->{index} = $ns->{index_names}->{$param->{use_index}};
 
@@ -572,11 +615,11 @@ The difference between them is the behaviour concerning tuple with the same prim
 
 =item *
 
-B<Add> will succeed if and only if duplicate-key tuple B<does not exist> 
+B<Add> will succeed if and only if duplicate-key tuple B<does not exist>
 
 =item *
 
-B<Replace> will succeed if and only if a duplicate-key tuple B<exists> 
+B<Replace> will succeed if and only if a duplicate-key tuple B<exists>
 
 =item *
 
@@ -642,17 +685,27 @@ sub Insert {
 
     $self->_debug("$self->{name}: INSERT[${\join '   ', map {join' ',unpack'(H2)*',$_} @tuple}]") if $self->{debug} >= 4;
 
+    my $cb = sub {
+        my ($r) = @_;
+
+        if($param->{want_result}) {
+            $self->_PostSelect($r, $param, $namespace);
+            $r = $r && $r->[0];
+        }
+
+        return $param->{callback}->($r) if $param->{callback};
+        return $r;
+    };
+
     my $r = $self->_chat (
         msg      => 13,
         payload  => pack("LLL (w/a*)*", $namespace->{namespace}, $flags, scalar(@tuple), @tuple),
         unpack   => sub { $self->_unpack_affected($flags, $namespace, @_) },
-        callback => $param->{callback},
+        callback => $param->{callback} && $cb,
     ) or return;
 
-    return $r unless $param->{want_result};
-
-    $self->_PostSelect($r, $param, $namespace);
-    return $r->[0];
+    return 1 if $param->{callback};
+    return $cb->($r);
 }
 
 sub _unpack_select {
@@ -749,7 +802,7 @@ sub _PackSelect {
             @$_{qw/field offset length/}
         } @{$param->{format}};
     }
-    return pack("LLLL a* La*", $namespace->{namespace}, $param->{index}->{id}, $param->{offset} || 0, $param->{limit} || scalar(@keys), $format, scalar(@keys), join('',@keys));
+    return pack("LLLL a* La*", $namespace->{namespace}, $param->{index}->{id}, $param->{offset} || 0, $param->{limit} || ($param->{default_limit_by_keys} ? scalar(@keys) : 0x7FFFFFFF), $format, scalar(@keys), join('',@keys));
 }
 
 sub _PostSelect {
@@ -773,13 +826,13 @@ Select tuple(s) from storage
     my $key = $id;
     my $key = [ $firstname, $lastname ];
     my @keys = ($key, ...);
-    
+
     my $tuple  = $box->Select($key)              or $box->Error && die $box->ErrorStr;
     my $tuple  = $box->Select($key, \%options)   or $box->Error && die $box->ErrorStr;
-    
+
     my @tuples = $box->Select(@keys)             or $box->Error && die $box->ErrorStr;
     my @tuples = $box->Select(@keys, \%options)  or $box->Error && die $box->ErrorStr;
-    
+
     my $tuples = $box->Select(\@keys)            or die $box->ErrorStr;
     my $tuples = $box->Select(\@keys, \%options) or die $box->ErrorStr;
 
@@ -851,13 +904,17 @@ then C<$by> must be a field name of the hash you return,
 otherwise it must be a number of field of the tuple.
 C<False> will be returned in case of error.
 
+=item B<callback> => $code
+
+Async request using AnyEvent.
+
 =back
 
 =back
 
 =cut
 
-my @select_param_ok = qw/use_index raw want next_rows limit offset raise hashify timeout format hash_by/;
+my @select_param_ok = qw/use_index raw want next_rows limit offset raise hashify timeout format hash_by callback return_fh default_limit_by_keys/;
 sub Select {
     confess q/Select isnt callable in void context/ unless defined wantarray;
     my ($param, $namespace) = $_[0]->_validate_param(\@_, @select_param_ok);
@@ -881,6 +938,47 @@ sub Select {
     local $namespace->{unpack_format} = $param->{unpack_format} if $param->{unpack_format};
 
     my $r = [];
+
+    $param->{want} ||= !1;
+    my $wantarray = wantarray;
+
+    my $cb = sub {
+        my ($r) = (@_);
+
+        $self->_PostSelect($r, $param, $namespace);
+
+        if ($r && defined(my $p = $param->{hash_by})) {
+            my %h;
+            if (@$r) {
+                if (ref $r->[0] eq 'HASH') {
+                    confess "Bad hash_by `$p' for HASH" unless exists $r->[0]->{$p};
+                    $h{$_->{$p}} = $_ for @$r;
+                } elsif (ref $r->[0] eq 'ARRAY') {
+                    confess "Bad hash_by `$p' for ARRAY" unless $p =~ m/^\d+$/ && $p >= 0 && $p < @{$r->[0]};
+                    $h{$_->[$p]} = $_ for @$r;
+                } else {
+                    confess "i dont know how to hash_by ".ref($r->[0]);
+                }
+            }
+            $r = \%h;
+        }
+
+        if ($param->{callback}) {
+            return $param->{callback}->($r);
+        }
+
+        return $r if $param->{hash_by};
+        return $r if $param->{want} eq 'arrayref';
+        $wantarray = wantarray if $param->{return_fh};
+        
+        if ($wantarray) {
+            return @{$r};
+        } else {
+            confess "$self->{name}: too many keys in scalar context" if @keys > 1;
+            return $r->[0];
+        }
+    };
+
     if (@keys && $payload) {
         $r = $self->_chat(
             msg      => $msg,
@@ -888,38 +986,16 @@ sub Select {
             unpack   => sub { $self->_unpack_select($namespace, "SELECT", @_) },
             retry    => $self->{select_retry},
             timeout  => $param->{timeout} || $self->{select_timeout},
-            callback => $param->{callback},
+            callback => $param->{callback} ? $cb : 0,
+            return_fh=> $param->{return_fh} ? $cb : 0,
         ) or return;
-    }
-
-    $param->{want} ||= !1;
-
-    $self->_PostSelect($r, $param, $namespace);
-
-    if(defined(my $p = $param->{hash_by})) {
-        my %h;
-        if(@$r) {
-            if (ref $r->[0] eq 'HASH') {
-                confess "Bad hash_by `$p' for HASH" unless exists $r->[0]->{$p};
-                $h{$_->{$p}} = $_ for @$r;
-            } elsif(ref $r->[0] eq 'ARRAY') {
-                confess "Bad hash_by `$p' for ARRAY" unless $p =~ m/^\d+$/ && $p >= 0 && $p < @{$r->[0]};
-                $h{$_->[$p]} = $_ for @$r;
-            } else {
-                confess "i dont know how to hash_by ".ref($r->[0]);
-            }
-        }
-        return \%h;
-    }
-
-    return $r if $param->{want} eq 'arrayref';
-
-    if (wantarray) {
-        return @{$r};
+        return $r if $param->{return_fh};
+        return 1 if $param->{callback};
     } else {
-        confess "$self->{name}: too many keys in scalar context" if @keys > 1;
-        return $r->[0];
+        $r = [];
     }
+
+    return $cb->($r);
 }
 
 sub SelectUnion {
@@ -963,7 +1039,7 @@ Delete tuple from storage. Return false upon error.
     my $n_deleted = $box->Delete($key) or die $box->ErrorStr;
     my $n_deleted = $box->Delete($key, \%options) or die $box->ErrorStr;
     warn "Nothing was deleted" unless int $n_deleted;
-    
+
     my $deleted_tuple_set = $box->Delete($key, { want_deleted_tuples => 1 }) or die $box->ErrorStr;
     warn "Nothing was deleted" unless @$deleted_tuple_set;
 
@@ -1001,17 +1077,27 @@ sub Delete {
     confess "$self->{name}\->Delete: for now key cardinality of 1 is only allowed" unless 1 == @{$param->{index}->{keys}};
     $self->_pack_keys($namespace, $param->{index}, $key);
 
+    my $cb = sub {
+        my ($r) = @_;
+
+        if($param->{want_result}) {
+            $self->_PostSelect($r, $param, $namespace);
+            $r = $r && $r->[0];
+        }
+
+        return $param->{callback}->($r) if $param->{callback};
+        return $r;
+    };
+
     my $r = $self->_chat(
         msg      => $flags ? 21 : 20,
         payload  => $flags ? pack("L L a*", $namespace->{namespace}, $flags, $key) : pack("L a*", $namespace->{namespace}, $key),
         unpack   => sub { $self->_unpack_affected($flags, $namespace, @_) },
-        callback => $param->{callback},
+        callback => $param->{callback} && $cb,
     ) or return;
 
-    return $r unless $param->{want_result};
-
-    $self->_PostSelect($r, $param, $namespace);
-    return $r->[0];
+    return 1 if $param->{callback};
+    return $cb->($r);
 }
 
 sub OP_SET          () { 0 }
@@ -1075,11 +1161,11 @@ BEGIN {
 Apply several update operations to a tuple.
 
     my @op = ([ f1 => add => 10 ], [ f1 => and => 0xFF], [ f2 => set => time() ], [ misc_string => cutend => 3 ]);
-    
+
     my $n_updated = $box->UpdateMulti($key, @op) or die $box->ErrorStr;
     my $n_updated = $box->UpdateMulti($key, @op, \%options) or die $box->ErrorStr;
     warn "Nothing was updated" unless int $n_updated;
-    
+
     my $updated_tuple_set = $box->UpdateMulti($key, @op, { want_result => 1 }) or die $box->ErrorStr;
     warn "Nothing was updated" unless @$updated_tuple_set;
 
@@ -1124,7 +1210,7 @@ Append or prepend C<< $field >> with C<$value> string.
 
 Cut C<< $value >> bytes from beginning or end of C<< $field >>.
 
-=back 
+=back
 
 =back
 
@@ -1148,7 +1234,7 @@ sub UpdateMulti {
     my ($param, $namespace) = $_[0]->_validate_param(\@_, qw/want_updated_tuple want_result _flags raw/);
     my ($self, $key, @op) = @_;
 
-    $self->_debug("$self->{name}: UPDATEMULTI(NS:$namespace->{namespace},KEY:$key)[@{[map{qq{[@$_]}}@op]}]") if $self->{debug} >= 3;
+    $self->_debug("$self->{name}: UPDATEMULTI(NS:$namespace->{namespace},KEY:$key)[@{[map{$_?qq{[@$_]}:q{-}}@op]}]") if $self->{debug} >= 3;
 
     confess "$self->{name}\->UpdateMulti: for now key cardinality of 1 is only allowed" unless 1 == @{$param->{index}->{keys}};
     confess "$self->{name}: too many op" if scalar @op > 128;
@@ -1203,17 +1289,27 @@ sub UpdateMulti {
 
     $self->_pack_keys($namespace, $param->{index}, $key);
 
+    my $cb = sub {
+        my ($r) = @_;
+
+        if($param->{want_result}) {
+            $self->_PostSelect($r, $param, $namespace);
+            $r = $r && $r->[0];
+        }
+
+        return $param->{callback}->($r) if $param->{callback};
+        return $r;
+    };
+
     my $r = $self->_chat(
         msg      => 19,
         payload  => pack("LL a* L (a*)*" , $namespace->{namespace}, $flags, $key, scalar(@op), @op),
         unpack   => sub { $self->_unpack_affected($flags, $namespace, @_) },
-        callback => $param->{callback},
+        callback => $param->{callback} && $cb,
     ) or return;
-
-    return $r unless $param->{want_result};
-
-    $self->_PostSelect($r, $param, $namespace);
-    return $r->[0];
+    
+    return 1 if $param->{callback};
+    return $cb->($r);
 }
 
 sub Update {
diff --git a/connector/perl/t/box.pl b/connector/perl/t/box.pl
index b9e4f854de..46e4d25f9b 100644
--- a/connector/perl/t/box.pl
+++ b/connector/perl/t/box.pl
@@ -13,7 +13,7 @@ use FindBin qw($Bin);
 use lib "$Bin";
 use Carp qw/confess/;
 
-use Test::More tests => 233;
+use Test::More tests => 296;
 use Test::Exception;
 
 use List::MoreUtils qw/zip/;
@@ -32,6 +32,10 @@ use constant TOO_BIG_FIELD => qr/too big field/;
 
 my $box;
 my $server = (shift || $ENV{BOX}) or die;
+my %opts = (
+    debug => $ENV{DEBUG}||0,
+    ipdebug => $ENV{IPDEBUG}||0,
+);
 
 sub cleanup ($) {
     my ($id) = @_;
@@ -58,6 +62,7 @@ sub def_param  {
                  name          => 'main',
              } ],
              default_space => "main",
+             %opts,
          }
 }
 
@@ -156,6 +161,109 @@ is_deeply scalar $box->Select(13), [13, 'some_email@test.mail.ru', 1, 2, 3, 4, '
 
 
 
+my $continuation = $box->Select(13,{ return_fh => 1 });
+ok $continuation, "select/continuation";
+
+my $rin = '';
+vec($rin,$continuation->{fh}->fileno,1) = 1;
+my $ein = $rin;
+ok 0 <= select($rin,undef,$ein,2), "select/continuation/select";
+
+my $res = $continuation->{continue}->();
+use Data::Dumper;
+is_deeply $res, [13, 'some_email@test.mail.ru', 1, 2, 3, 4, '123456789'], "select/continuation/result";
+
+
+SKIP:{
+    skip "AnyEvent not found", 60 unless eval { require AnyEvent; 1 };
+
+    my $tt =     [ [1, 'rtokarev@corp.mail.ru',     11, 111, 1111, 11111, "1111111111111"],
+                   [2, 'vostrikov@corp.mail.ru',    22, 222, 2222, 22222, "2222222222222"],
+                   [3, 'aleinikov@corp.mail.ru',    33, 333, 3333, 33333, "3333333333333"],
+                   [4, 'roman.s.tokarev@gmail.com', 44, 444, 4444, 44444, "4444444444444"],
+                   [5, 'vostrIIIkov@corp.mail.ru',  55, 555, 5555, 55555, "5555555555555"] ];
+
+    foreach my $tuple (@$tt) {
+        cleanup $tuple->[0];
+    }
+
+    AnyEvent->now_update;
+    my $cv = AnyEvent->condvar;
+    foreach my $tuple (@$tt) {
+        $cv->begin;
+        ok $box->Insert(@$tuple, {callback => sub { ok $_[0], "async/insert$tuple->[0]/result"; $cv->end; }}), "async/insert$tuple->[0]";
+    }
+    $cv->recv;
+
+
+    AnyEvent->now_update;
+    $cv = AnyEvent->condvar;
+    $cv->begin;
+    ok $box->Select(1,2,3,{callback => sub {
+                                  my ($res) = @_;
+                                  $cv->end;
+                                  is_deeply $res, [@$tt[0,1,2]], "async/select1/result";
+                           }}), "async/select1";
+
+    $cv->begin;
+    ok $box->Select(4,5,{ callback => sub {
+                                  my ($res) = @_;
+                                  $cv->end;
+                                  is_deeply $res, [@$tt[3,4]], "async/select2/result";
+                           }}), "async/select2";
+
+    $cv->recv;
+
+
+    AnyEvent->now_update;
+    $cv = AnyEvent->condvar;
+    foreach my $tuple (@$tt) {
+        $tuple->[4] += 10000;
+        $cv->begin;
+        ok $box->UpdateMulti($tuple->[0], [ 4 => add => 10000 ], {callback => sub { ok $_[0], "async/update1-$tuple->[0]/result"; $cv->end; }}), "async/update1-$tuple->[0]";
+    }
+    $cv->begin;
+    ok $box->Select((map{$_->[0]}@$tt),{ callback => sub {
+                                  my ($res) = @_;
+                                  $cv->end;
+                                  is_deeply $res, $tt, "async/update1-select/result";
+                           }}), "async/update1-select";
+    $cv->recv;
+
+    AnyEvent->now_update;
+    $cv = AnyEvent->condvar;
+    foreach my $tuple (@$tt) {
+        $tuple->[4] += 10000;
+        $cv->begin;
+        ok $box->UpdateMulti($tuple->[0], [ 4 => add => 10000 ], {want_result => 1, callback => sub { is_deeply $_[0], $tuple, "async/update2-$tuple->[0]/result"; $cv->end; }}), "async/update2-$tuple->[0]";
+    }
+    $cv->begin;
+    ok $box->Select((map{$_->[0]}@$tt),{ callback => sub {
+                                  my ($res) = @_;
+                                  $cv->end;
+                                  is_deeply $res, $tt, "async/update2-select/result";
+                           }}), "async/update2-select";
+    $cv->recv;
+
+    AnyEvent->now_update;
+    $cv = AnyEvent->condvar;
+    foreach my $tuple (@$tt) {
+        $cv->begin;
+        ok $box->Delete($tuple->[0], {want_result => 1, callback => sub { is_deeply $_[0], $tuple, "async/delete-$tuple->[0]/result"; $cv->end; }}), "async/delete-$tuple->[0]";
+    }
+    $cv->begin;
+    ok $box->Select((map{$_->[0]}@$tt),{ callback => sub {
+                                  my ($res) = @_;
+                                  $cv->end;
+                                  is_deeply $res, [], "async/delete-select/result";
+                           }}), "async/delete-select";
+    $cv->recv;
+}
+
+
+
+
+
 $box = $CLASS->new(def_param);
 ok $box->isa($CLASS), 'connect';
 cleanup 13;
@@ -415,7 +523,9 @@ sub def_param1 {
                  namespace     => 26,
                  format        => $format,
                  default_index => 'primary_num1',
-             } ]}
+             } ],
+             %opts,
+         }
 }
 
 $box = $CLASS->new(def_param1);
@@ -458,7 +568,9 @@ sub def_param_bad {
                  namespace     => 26,
                  format        => $format,
                  default_index => 'primary_num1',
-             } ]}
+             } ],
+             %opts,
+         }
 }
 
 $box = $CLASS->new(def_param_bad);
@@ -493,7 +605,9 @@ sub def_param_unique {
                  space     => 27,
                  format        => $format,
                  default_index => 'id',
-             } ]}
+             } ],
+             %opts,
+         }
 }
 
 $box = $CLASS->new(def_param_unique);
@@ -587,7 +701,7 @@ sub def_param_u64 {
                  format        => $format,
                  default_index => 'id',
              } ],
-             debug => 0,
+             %opts,
          }
 }
 
-- 
GitLab