diff --git a/connector/perl/lib/MR/IProto.pm b/connector/perl/lib/MR/IProto.pm index 152b226620637bd80f53aadeb40cb7fd311f84e0..671ca2955304eedce1f00d62982d84ac8096677a 100644 --- a/connector/perl/lib/MR/IProto.pm +++ b/connector/perl/lib/MR/IProto.pm @@ -226,22 +226,32 @@ sub send { die "Method must be called in void context if you want to use async" if defined wantarray; $self->_send($message, $callback); return; - } - else { + } else { die "Method must be called in scalar context if you want to use sync" unless defined wantarray; my $olddie = ref $SIG{__DIE__} eq 'CODE' ? $SIG{__DIE__} : ref $SIG{__DIE__} eq 'GLOB' ? *{$SIG{__DIE__}}{CODE} : undef; local $SIG{__DIE__} = sub { local $! = 0; $olddie->(@_); } if $olddie; my %servers; my ($data, $error, $errno); - $self->_send_now($message, sub { + my $fh = $self->_send_now($message, sub { ($data, $error) = @_; $errno = $!; return; }, \%servers); - $self->_recv_now(\%servers); - $! = $errno; - die $error if $error; - return $data; + + my $cont = sub { + $self->_recv_now(\%servers, max => $message->{continue}?1:0); + $! = $errno; + return $message->{continue}->($data, $error, $errno) if $message->{continue}; + die $error if $error; + return $data; + }; + + return { + fh => $fh, + continue => $cont, + } if $message->{continue}; + + return &$cont(); } } @@ -528,8 +538,7 @@ sub _send_now { ); return; }; - $self->_send_try($sync, $args, $handler, $try); - return; + return $self->_send_try($sync, $args, $handler, $try); } sub _send_try { @@ -540,7 +549,7 @@ sub _send_try { my $connection = $server->$xsync(); $connection->send($args->{msg}, $args->{body}, $handler, $args->{no_reply}, $args->{sync}); $sync->{$connection} ||= $connection if $sync; - return; + return $connection->fh; } sub _send_retry { @@ -623,10 +632,10 @@ sub _server_callback { } sub _recv_now { - my ($self, $servers) = @_; + my ($self, $servers, %opts) = @_; while(my @servers = values %$servers) { %$servers = (); - $_->recv_all() foreach @servers; + $_->recv_all(%opts) foreach @servers; } return; } diff --git a/connector/perl/lib/MR/IProto/Connection/Async.pm b/connector/perl/lib/MR/IProto/Connection/Async.pm index d78b9180ff7eafcf10b7fac169b513c90bef57d4..2fb32bc023636bc21d59a8c920f0005c74077d4f 100644 --- a/connector/perl/lib/MR/IProto/Connection/Async.pm +++ b/connector/perl/lib/MR/IProto/Connection/Async.pm @@ -19,6 +19,7 @@ use Scalar::Util qw(weaken); has _handle => ( is => 'ro', isa => 'AnyEvent::Handle', + predicate => '_has_handle', lazy_build => 1, ); @@ -69,6 +70,8 @@ For list of arguments see L</_send>. =cut +sub fh { return $_[0]->_has_handle && $_[0]->_handle } + sub send { my $self = shift; if( $self->_in_progress < $self->max_parallel ) { diff --git a/connector/perl/lib/MR/IProto/Connection/Sync.pm b/connector/perl/lib/MR/IProto/Connection/Sync.pm index 7c3da90c46e690d3eaca9e31e2dc6d6b2e1b3d89..9e89ba7cc159f868b45f11a9573904cba2d81064 100644 --- a/connector/perl/lib/MR/IProto/Connection/Sync.pm +++ b/connector/perl/lib/MR/IProto/Connection/Sync.pm @@ -20,6 +20,7 @@ use Socket qw( TCP_NODELAY SO_KEEPALIVE SO_SNDTIMEO SO_RCVTIMEO ); has _socket => ( is => 'ro', isa => 'IO::Socket::INET', + predicate => '_has_socket', lazy_build => 1, ); @@ -32,12 +33,18 @@ has _sent => ( =over +=item fh + +Returns socket. + =item send See L<MR::IProto::Connection/send> for more information. =cut +sub fh { return $_[0]->_has_socket && $_[0]->_socket } + sub send { my ($self, $msg, $payload, $callback, $no_reply, $sync) = @_; my $server = $self->server; @@ -95,11 +102,12 @@ sub send { } sub recv_all { - my ($self) = @_; + my ($self, %opts) = @_; my $server = $self->server; my $sent = $self->_sent; my $dump_resp = $server->debug >= 6; - while (my $args = shift @$sent) { + my $n = $opts{max} || @$sent; + while ($n-- and my $args = shift @$sent) { my ($sync, $callback) = @$args; my ($resp_msg, $resp_payload); my $ok = eval { diff --git a/connector/perl/lib/MR/Tarantool/Box.pm b/connector/perl/lib/MR/Tarantool/Box.pm index 0f5dc3f6f556e3cf9d6fefc12cf0959193902b06..8d87c96a37c788c550a3d64e6fd38a015bbdb5af 100644 --- a/connector/perl/lib/MR/Tarantool/Box.pm +++ b/connector/perl/lib/MR/Tarantool/Box.pm @@ -194,9 +194,14 @@ Properly ordered arrayref of fields' numbers which are indexed. =item B<default_index> => $default_index_name_string_or_id_uint32 -Index C<id> or C<name> to be used by default for the current C<space>. +Index C<id> or C<name> to be used by default for the current C<space> in B<select> operations. Must be set if there are more than one C<\%index>es. +=item B<primary_key_index> => $primary_key_name_string_or_id_uint32 + +Index C<id> or C<name> to be used by default for the current C<space> in B<update> operations. +It is set to C<default_index> by default. + =back =item B<default_space> => $default_space_name_string_or_id_uint32 @@ -277,6 +282,7 @@ sub new { $self->{select_timeout} = $arg->{select_timeout} || $self->{timeout}; $self->{iprotoclass} = $arg->{iprotoclass} || $class->IPROTOCLASS; $self->{_last_error} = 0; + $self->{_last_error_msg} = ''; $self->{hashify} = $arg->{'hashify'} if exists $arg->{'hashify'}; $self->{default_raw} = $arg->{default_raw}; @@ -319,8 +325,11 @@ sub new { if( @{$ns->{indexes}} > 1 ) { confess "space[$namespace] default_index not given" unless defined $ns->{default_index}; confess "space[$namespace] default_index $ns->{default_index} does not exist" unless $inames->{$ns->{default_index}}; + $ns->{primary_key_index} = $ns->{default_index} unless defined $ns->{primary_key_index}; + confess "space[$namespace] primary_key_index $ns->{primary_key_index} does not exist" unless $inames->{$ns->{primary_key_index}}; } else { $ns->{default_index} ||= 0; + $ns->{primary_key_index} ||= 0; } if($ns->{fields}) { confess "space[$namespace] fields must be ARRAYREF" unless ref $ns->{fields} eq 'ARRAY'; @@ -405,48 +414,81 @@ sub _chat { my $soft_retry = $self->{softretry}; my $retry_count = 0; - while ($retry > 0) { + my $callback = delete $param{callback}; + my $return_fh = delete $param{return_fh}; + my $_cb = $callback || $return_fh; + + my $is_retry = sub { + my ($data) = @_; $retry_count++; + if($data) { + my ($ret_code, $data, $full_code) = @$data; + return 0 if $ret_code->[0] == 0; + # retry if error is soft even in case of update e.g. ROW_LOCK + if ($ret_code->[0] == 1 and --$soft_retry > 0) { + --$retry if $retry > 1; + return 1; + } + } + return 1 if --$retry; + return 0; + }; - $self->{_last_error} = 0x77777777; - $self->{server}->SetTimeout($timeout); - my $ret = $self->{server}->Chat1(%param); - my $message; + my $message; + my $process = sub { + my ($data, $error) = @_; + my $errno = $!; + if (!$error && $data) { + my ($ret_code, $data, $full_code) = @$data; - if (exists $ret->{ok}) { - my ($ret_code, $data, $full_code) = @{$ret->{ok}}; $self->{_last_error} = $full_code; + $self->{_last_error_msg} = $message = $ret_code->[0] == 0 ? "ok" : sprintf "Error %08X: %s", $full_code, $$data || $ERRORS{$full_code & 0xFFFFFF00} || 'Unknown error'; + $self->_debug("$self->{name}: $message") if $ret_code->[0] != 0 && $self->{debug} >= 1; + if ($ret_code->[0] == 0) { my $ret = $orig_unpack->($$data,$ret_code->[2]); confess __LINE__."$self->{name}: [common]: Bad response (more data left)" if length $$data > 0; - return $ret; + return $ret unless $_cb; + return &$_cb($ret); } - $self->{_last_error_msg} = $message = $ret_code->[0] == 0 ? "ok" : sprintf "Error %08X: %s", $full_code, $$data || $ERRORS{$full_code & 0xFFFFFF00} || 'Unknown error'; - $self->_debug("$self->{name}: $message") if $self->{debug} >= 1; if ($ret_code->[0] == 2) { #fatal error $self->_raise($message) if $self->{raise}; - return 0; - } - - # retry if error is soft even in case of update e.g. ROW_LOCK - if ($ret_code->[0] == 1 and --$soft_retry > 0) { - --$retry if $retry > 1; - sleep $self->{retry_delay}; - next; + return 0 unless $_cb; + return &$_cb(0, $error); } } else { # timeout has caused the failure if $ret->{timeout} $self->{_last_error} = 'fail'; - $message ||= $self->{_last_error_msg} = $ret->{fail}; + $message ||= $self->{_last_error_msg} = $error; $self->_debug("$self->{name}: $message") if $self->{debug} >= 1; + $self->_raise("$self->{name}: no success after $retry_count tries: $message\n") if $self->{raise}; + return 0 unless $_cb; + return &$_cb(0, $error); } + }; + + if ($callback) { + $self->{_last_error} = 0x77777777; + $self->{server}->SetTimeout($timeout); + return 1 if eval { $self->{server}->send({%param, is_retry => $is_retry}, $process); 1 }; + return 0; + } - last unless --$retry; + $param{continue} = $process if $return_fh; + + my $ret; + while ($retry > 0) { + $self->{_last_error} = 0x77777777; + $self->{server}->SetTimeout($timeout); + $ret = $self->{server}->Chat1(%param); + return $ret->{ok} if $param{continue} && $ret->{ok}; + last unless &$is_retry($ret->{ok}); sleep $self->{retry_delay}; }; - $self->_raise("no success after $retry_count tries\n") if $self->{raise}; + $self->_raise("no success after $retry_count tries\n") if $self->{raise} && !$ret->{ok}; + return &$process($ret->{ok}, $ret->{fail}); } sub _raise { @@ -461,6 +503,7 @@ sub _validate_param { my %pnames = map { $_ => 1 } @pnames; $pnames{space} = 1; $pnames{namespace} = 1; + $pnames{callback} = 1; foreach my $pname (keys %$param) { confess "$self->{name}: unknown param $pname\n" unless exists $pnames{$pname}; } @@ -470,7 +513,7 @@ sub _validate_param { confess "$self->{name}: bad space `$param->{namespace}'" unless exists $self->{namespaces}->{$param->{namespace}}; my $ns = $self->{namespaces}->{$param->{namespace}}; - $param->{use_index} = $ns->{default_index} unless defined $param->{use_index}; + $param->{use_index} = $pnames{use_index} ? $ns->{default_index} : $ns->{primary_key_index} unless defined $param->{use_index}; confess "$self->{name}: bad index `$param->{use_index}'" unless exists $ns->{index_names}->{$param->{use_index}}; $param->{index} = $ns->{index_names}->{$param->{use_index}}; @@ -572,11 +615,11 @@ The difference between them is the behaviour concerning tuple with the same prim =item * -B<Add> will succeed if and only if duplicate-key tuple B<does not exist> +B<Add> will succeed if and only if duplicate-key tuple B<does not exist> =item * -B<Replace> will succeed if and only if a duplicate-key tuple B<exists> +B<Replace> will succeed if and only if a duplicate-key tuple B<exists> =item * @@ -642,17 +685,27 @@ sub Insert { $self->_debug("$self->{name}: INSERT[${\join ' ', map {join' ',unpack'(H2)*',$_} @tuple}]") if $self->{debug} >= 4; + my $cb = sub { + my ($r) = @_; + + if($param->{want_result}) { + $self->_PostSelect($r, $param, $namespace); + $r = $r && $r->[0]; + } + + return $param->{callback}->($r) if $param->{callback}; + return $r; + }; + my $r = $self->_chat ( msg => 13, payload => pack("LLL (w/a*)*", $namespace->{namespace}, $flags, scalar(@tuple), @tuple), unpack => sub { $self->_unpack_affected($flags, $namespace, @_) }, - callback => $param->{callback}, + callback => $param->{callback} && $cb, ) or return; - return $r unless $param->{want_result}; - - $self->_PostSelect($r, $param, $namespace); - return $r->[0]; + return 1 if $param->{callback}; + return $cb->($r); } sub _unpack_select { @@ -749,7 +802,7 @@ sub _PackSelect { @$_{qw/field offset length/} } @{$param->{format}}; } - return pack("LLLL a* La*", $namespace->{namespace}, $param->{index}->{id}, $param->{offset} || 0, $param->{limit} || scalar(@keys), $format, scalar(@keys), join('',@keys)); + return pack("LLLL a* La*", $namespace->{namespace}, $param->{index}->{id}, $param->{offset} || 0, $param->{limit} || ($param->{default_limit_by_keys} ? scalar(@keys) : 0x7FFFFFFF), $format, scalar(@keys), join('',@keys)); } sub _PostSelect { @@ -773,13 +826,13 @@ Select tuple(s) from storage my $key = $id; my $key = [ $firstname, $lastname ]; my @keys = ($key, ...); - + my $tuple = $box->Select($key) or $box->Error && die $box->ErrorStr; my $tuple = $box->Select($key, \%options) or $box->Error && die $box->ErrorStr; - + my @tuples = $box->Select(@keys) or $box->Error && die $box->ErrorStr; my @tuples = $box->Select(@keys, \%options) or $box->Error && die $box->ErrorStr; - + my $tuples = $box->Select(\@keys) or die $box->ErrorStr; my $tuples = $box->Select(\@keys, \%options) or die $box->ErrorStr; @@ -851,13 +904,17 @@ then C<$by> must be a field name of the hash you return, otherwise it must be a number of field of the tuple. C<False> will be returned in case of error. +=item B<callback> => $code + +Async request using AnyEvent. + =back =back =cut -my @select_param_ok = qw/use_index raw want next_rows limit offset raise hashify timeout format hash_by/; +my @select_param_ok = qw/use_index raw want next_rows limit offset raise hashify timeout format hash_by callback return_fh default_limit_by_keys/; sub Select { confess q/Select isnt callable in void context/ unless defined wantarray; my ($param, $namespace) = $_[0]->_validate_param(\@_, @select_param_ok); @@ -881,6 +938,47 @@ sub Select { local $namespace->{unpack_format} = $param->{unpack_format} if $param->{unpack_format}; my $r = []; + + $param->{want} ||= !1; + my $wantarray = wantarray; + + my $cb = sub { + my ($r) = (@_); + + $self->_PostSelect($r, $param, $namespace); + + if ($r && defined(my $p = $param->{hash_by})) { + my %h; + if (@$r) { + if (ref $r->[0] eq 'HASH') { + confess "Bad hash_by `$p' for HASH" unless exists $r->[0]->{$p}; + $h{$_->{$p}} = $_ for @$r; + } elsif (ref $r->[0] eq 'ARRAY') { + confess "Bad hash_by `$p' for ARRAY" unless $p =~ m/^\d+$/ && $p >= 0 && $p < @{$r->[0]}; + $h{$_->[$p]} = $_ for @$r; + } else { + confess "i dont know how to hash_by ".ref($r->[0]); + } + } + $r = \%h; + } + + if ($param->{callback}) { + return $param->{callback}->($r); + } + + return $r if $param->{hash_by}; + return $r if $param->{want} eq 'arrayref'; + $wantarray = wantarray if $param->{return_fh}; + + if ($wantarray) { + return @{$r}; + } else { + confess "$self->{name}: too many keys in scalar context" if @keys > 1; + return $r->[0]; + } + }; + if (@keys && $payload) { $r = $self->_chat( msg => $msg, @@ -888,38 +986,16 @@ sub Select { unpack => sub { $self->_unpack_select($namespace, "SELECT", @_) }, retry => $self->{select_retry}, timeout => $param->{timeout} || $self->{select_timeout}, - callback => $param->{callback}, + callback => $param->{callback} ? $cb : 0, + return_fh=> $param->{return_fh} ? $cb : 0, ) or return; - } - - $param->{want} ||= !1; - - $self->_PostSelect($r, $param, $namespace); - - if(defined(my $p = $param->{hash_by})) { - my %h; - if(@$r) { - if (ref $r->[0] eq 'HASH') { - confess "Bad hash_by `$p' for HASH" unless exists $r->[0]->{$p}; - $h{$_->{$p}} = $_ for @$r; - } elsif(ref $r->[0] eq 'ARRAY') { - confess "Bad hash_by `$p' for ARRAY" unless $p =~ m/^\d+$/ && $p >= 0 && $p < @{$r->[0]}; - $h{$_->[$p]} = $_ for @$r; - } else { - confess "i dont know how to hash_by ".ref($r->[0]); - } - } - return \%h; - } - - return $r if $param->{want} eq 'arrayref'; - - if (wantarray) { - return @{$r}; + return $r if $param->{return_fh}; + return 1 if $param->{callback}; } else { - confess "$self->{name}: too many keys in scalar context" if @keys > 1; - return $r->[0]; + $r = []; } + + return $cb->($r); } sub SelectUnion { @@ -963,7 +1039,7 @@ Delete tuple from storage. Return false upon error. my $n_deleted = $box->Delete($key) or die $box->ErrorStr; my $n_deleted = $box->Delete($key, \%options) or die $box->ErrorStr; warn "Nothing was deleted" unless int $n_deleted; - + my $deleted_tuple_set = $box->Delete($key, { want_deleted_tuples => 1 }) or die $box->ErrorStr; warn "Nothing was deleted" unless @$deleted_tuple_set; @@ -1001,17 +1077,27 @@ sub Delete { confess "$self->{name}\->Delete: for now key cardinality of 1 is only allowed" unless 1 == @{$param->{index}->{keys}}; $self->_pack_keys($namespace, $param->{index}, $key); + my $cb = sub { + my ($r) = @_; + + if($param->{want_result}) { + $self->_PostSelect($r, $param, $namespace); + $r = $r && $r->[0]; + } + + return $param->{callback}->($r) if $param->{callback}; + return $r; + }; + my $r = $self->_chat( msg => $flags ? 21 : 20, payload => $flags ? pack("L L a*", $namespace->{namespace}, $flags, $key) : pack("L a*", $namespace->{namespace}, $key), unpack => sub { $self->_unpack_affected($flags, $namespace, @_) }, - callback => $param->{callback}, + callback => $param->{callback} && $cb, ) or return; - return $r unless $param->{want_result}; - - $self->_PostSelect($r, $param, $namespace); - return $r->[0]; + return 1 if $param->{callback}; + return $cb->($r); } sub OP_SET () { 0 } @@ -1075,11 +1161,11 @@ BEGIN { Apply several update operations to a tuple. my @op = ([ f1 => add => 10 ], [ f1 => and => 0xFF], [ f2 => set => time() ], [ misc_string => cutend => 3 ]); - + my $n_updated = $box->UpdateMulti($key, @op) or die $box->ErrorStr; my $n_updated = $box->UpdateMulti($key, @op, \%options) or die $box->ErrorStr; warn "Nothing was updated" unless int $n_updated; - + my $updated_tuple_set = $box->UpdateMulti($key, @op, { want_result => 1 }) or die $box->ErrorStr; warn "Nothing was updated" unless @$updated_tuple_set; @@ -1124,7 +1210,7 @@ Append or prepend C<< $field >> with C<$value> string. Cut C<< $value >> bytes from beginning or end of C<< $field >>. -=back +=back =back @@ -1148,7 +1234,7 @@ sub UpdateMulti { my ($param, $namespace) = $_[0]->_validate_param(\@_, qw/want_updated_tuple want_result _flags raw/); my ($self, $key, @op) = @_; - $self->_debug("$self->{name}: UPDATEMULTI(NS:$namespace->{namespace},KEY:$key)[@{[map{qq{[@$_]}}@op]}]") if $self->{debug} >= 3; + $self->_debug("$self->{name}: UPDATEMULTI(NS:$namespace->{namespace},KEY:$key)[@{[map{$_?qq{[@$_]}:q{-}}@op]}]") if $self->{debug} >= 3; confess "$self->{name}\->UpdateMulti: for now key cardinality of 1 is only allowed" unless 1 == @{$param->{index}->{keys}}; confess "$self->{name}: too many op" if scalar @op > 128; @@ -1203,17 +1289,27 @@ sub UpdateMulti { $self->_pack_keys($namespace, $param->{index}, $key); + my $cb = sub { + my ($r) = @_; + + if($param->{want_result}) { + $self->_PostSelect($r, $param, $namespace); + $r = $r && $r->[0]; + } + + return $param->{callback}->($r) if $param->{callback}; + return $r; + }; + my $r = $self->_chat( msg => 19, payload => pack("LL a* L (a*)*" , $namespace->{namespace}, $flags, $key, scalar(@op), @op), unpack => sub { $self->_unpack_affected($flags, $namespace, @_) }, - callback => $param->{callback}, + callback => $param->{callback} && $cb, ) or return; - - return $r unless $param->{want_result}; - - $self->_PostSelect($r, $param, $namespace); - return $r->[0]; + + return 1 if $param->{callback}; + return $cb->($r); } sub Update { diff --git a/connector/perl/t/box.pl b/connector/perl/t/box.pl index b9e4f854def6c1200f94e21f27cd2b734d874193..46e4d25f9b2c84fe80006cd6fdd9cf2a3b281e9a 100644 --- a/connector/perl/t/box.pl +++ b/connector/perl/t/box.pl @@ -13,7 +13,7 @@ use FindBin qw($Bin); use lib "$Bin"; use Carp qw/confess/; -use Test::More tests => 233; +use Test::More tests => 296; use Test::Exception; use List::MoreUtils qw/zip/; @@ -32,6 +32,10 @@ use constant TOO_BIG_FIELD => qr/too big field/; my $box; my $server = (shift || $ENV{BOX}) or die; +my %opts = ( + debug => $ENV{DEBUG}||0, + ipdebug => $ENV{IPDEBUG}||0, +); sub cleanup ($) { my ($id) = @_; @@ -58,6 +62,7 @@ sub def_param { name => 'main', } ], default_space => "main", + %opts, } } @@ -156,6 +161,109 @@ is_deeply scalar $box->Select(13), [13, 'some_email@test.mail.ru', 1, 2, 3, 4, ' +my $continuation = $box->Select(13,{ return_fh => 1 }); +ok $continuation, "select/continuation"; + +my $rin = ''; +vec($rin,$continuation->{fh}->fileno,1) = 1; +my $ein = $rin; +ok 0 <= select($rin,undef,$ein,2), "select/continuation/select"; + +my $res = $continuation->{continue}->(); +use Data::Dumper; +is_deeply $res, [13, 'some_email@test.mail.ru', 1, 2, 3, 4, '123456789'], "select/continuation/result"; + + +SKIP:{ + skip "AnyEvent not found", 60 unless eval { require AnyEvent; 1 }; + + my $tt = [ [1, 'rtokarev@corp.mail.ru', 11, 111, 1111, 11111, "1111111111111"], + [2, 'vostrikov@corp.mail.ru', 22, 222, 2222, 22222, "2222222222222"], + [3, 'aleinikov@corp.mail.ru', 33, 333, 3333, 33333, "3333333333333"], + [4, 'roman.s.tokarev@gmail.com', 44, 444, 4444, 44444, "4444444444444"], + [5, 'vostrIIIkov@corp.mail.ru', 55, 555, 5555, 55555, "5555555555555"] ]; + + foreach my $tuple (@$tt) { + cleanup $tuple->[0]; + } + + AnyEvent->now_update; + my $cv = AnyEvent->condvar; + foreach my $tuple (@$tt) { + $cv->begin; + ok $box->Insert(@$tuple, {callback => sub { ok $_[0], "async/insert$tuple->[0]/result"; $cv->end; }}), "async/insert$tuple->[0]"; + } + $cv->recv; + + + AnyEvent->now_update; + $cv = AnyEvent->condvar; + $cv->begin; + ok $box->Select(1,2,3,{callback => sub { + my ($res) = @_; + $cv->end; + is_deeply $res, [@$tt[0,1,2]], "async/select1/result"; + }}), "async/select1"; + + $cv->begin; + ok $box->Select(4,5,{ callback => sub { + my ($res) = @_; + $cv->end; + is_deeply $res, [@$tt[3,4]], "async/select2/result"; + }}), "async/select2"; + + $cv->recv; + + + AnyEvent->now_update; + $cv = AnyEvent->condvar; + foreach my $tuple (@$tt) { + $tuple->[4] += 10000; + $cv->begin; + ok $box->UpdateMulti($tuple->[0], [ 4 => add => 10000 ], {callback => sub { ok $_[0], "async/update1-$tuple->[0]/result"; $cv->end; }}), "async/update1-$tuple->[0]"; + } + $cv->begin; + ok $box->Select((map{$_->[0]}@$tt),{ callback => sub { + my ($res) = @_; + $cv->end; + is_deeply $res, $tt, "async/update1-select/result"; + }}), "async/update1-select"; + $cv->recv; + + AnyEvent->now_update; + $cv = AnyEvent->condvar; + foreach my $tuple (@$tt) { + $tuple->[4] += 10000; + $cv->begin; + ok $box->UpdateMulti($tuple->[0], [ 4 => add => 10000 ], {want_result => 1, callback => sub { is_deeply $_[0], $tuple, "async/update2-$tuple->[0]/result"; $cv->end; }}), "async/update2-$tuple->[0]"; + } + $cv->begin; + ok $box->Select((map{$_->[0]}@$tt),{ callback => sub { + my ($res) = @_; + $cv->end; + is_deeply $res, $tt, "async/update2-select/result"; + }}), "async/update2-select"; + $cv->recv; + + AnyEvent->now_update; + $cv = AnyEvent->condvar; + foreach my $tuple (@$tt) { + $cv->begin; + ok $box->Delete($tuple->[0], {want_result => 1, callback => sub { is_deeply $_[0], $tuple, "async/delete-$tuple->[0]/result"; $cv->end; }}), "async/delete-$tuple->[0]"; + } + $cv->begin; + ok $box->Select((map{$_->[0]}@$tt),{ callback => sub { + my ($res) = @_; + $cv->end; + is_deeply $res, [], "async/delete-select/result"; + }}), "async/delete-select"; + $cv->recv; +} + + + + + $box = $CLASS->new(def_param); ok $box->isa($CLASS), 'connect'; cleanup 13; @@ -415,7 +523,9 @@ sub def_param1 { namespace => 26, format => $format, default_index => 'primary_num1', - } ]} + } ], + %opts, + } } $box = $CLASS->new(def_param1); @@ -458,7 +568,9 @@ sub def_param_bad { namespace => 26, format => $format, default_index => 'primary_num1', - } ]} + } ], + %opts, + } } $box = $CLASS->new(def_param_bad); @@ -493,7 +605,9 @@ sub def_param_unique { space => 27, format => $format, default_index => 'id', - } ]} + } ], + %opts, + } } $box = $CLASS->new(def_param_unique); @@ -587,7 +701,7 @@ sub def_param_u64 { format => $format, default_index => 'id', } ], - debug => 0, + %opts, } }