diff --git a/connector/perl/lib/MR/Pending.pm b/connector/perl/lib/MR/Pending.pm index e270472d89a526d9894c37696343f93f0764b37c..eda0f764bd08503941c3642dbe5a331bbc9344b8 100644 --- a/connector/perl/lib/MR/Pending.pm +++ b/connector/perl/lib/MR/Pending.pm @@ -1,6 +1,38 @@ +=head1 NAME + +MR::Pending - watcher for some requests results + + +=head1 SYNOPSIS + + my $pnd = MR::Pending->new( + maxtime => 10, + itertime => 0.1, + secondary_itertime => 0.01, + name => 'My waiter', + + onidle => sub { ... }, + + pending => [ ... ] + ); + + + $pnd->work; + +=cut + package MR::Pending; use Mouse; use Time::HiRes qw/time/; +use Data::Dumper; + +=head1 ATTRIBUTES + +=head2 maxtime + +Timeout for all requests. + +=cut has maxtime => ( is => 'rw', @@ -9,6 +41,13 @@ has maxtime => ( default => 6.0, ); +=head2 itertime + +One iteration time. If all requests have no data, L<onidle> will be called +with the time. + +=cut + has itertime => ( is => 'rw', isa => 'Num', @@ -16,12 +55,41 @@ has itertime => ( default => 0.1, ); + +=head2 secondary_itertime + +Module will do secondary requests (L<onsecondary_retry>) if the first request +have no data after te timeout. + +=cut + +has secondary_itertime => ( + is => 'rw', + isa => 'Num', + predicate => '_has_secondary_itertime', + default => 0.01, +); + + +=head2 name + +Name of pending instance (for debug messages) + +=cut + has name => ( is => 'rw', isa => 'Str', required => 1, ); + +=head2 onidle + +callback. will be called for each iteration if there are no data from servers. + +=cut + has onidle => ( is => 'rw', isa => 'CodeRef', @@ -34,6 +102,13 @@ has _pending => ( default => sub { {} }, ); + +=head2 exceptions + +count of exceptions in callbacks + +=cut + has exceptions => ( is => 'rw', isa => 'Int', @@ -51,6 +126,17 @@ has _waitresult => ( isa => 'ArrayRef', ); + +=head1 METHODS + + +=head2 new + +Constructor. receives one additionall argiments: B<pending> that can contain +array of pending requests (L<MR::Pending::Item>). + +=cut + around BUILDARGS => sub { my $orig = shift; my $class = shift; @@ -71,6 +157,13 @@ sub runcatch { return $ret; } + +=head2 add(@pends) + +add pending requests (L<MR::Pending::Item>) + +=cut + sub add { my ($self, @p) = @_; my $p = $self->_pending; @@ -81,6 +174,13 @@ sub add { return $self; } + +=head2 remove(@pends) + +remove pending requests (L<MR::Pending::Item>) + +=cut + sub remove { my ($self, @p) = @_; my $p = $self->_pending; @@ -98,7 +198,8 @@ sub send { my $pend = $pending->{$shard}; if ($pend->try < $pend->retry) { next unless $pend->is_timeout; - $pend->set_pending_mode(scalar $self->runcatch($pend->onretry, ($pend->id, $pend, $self))); + my $cont = $self->runcatch($pend->onretry, ($pend->id, $pend, $self)); + $pend->set_pending_mode($cont); } else { delete $pending->{$shard}; $self->runcatch($pend->onerror, ($pend->id, "no success after @{[$pend->try]} retries", $pend, $self)); @@ -195,6 +296,13 @@ sub iter { return 1; } + +=head2 work + +do all pending requests, wait their results or timeout (L<maxtime>) + +=cut + sub work { my ($self) = @_; @@ -212,7 +320,9 @@ sub check_exceptions { my ($self, $raise) = @_; my $e = $self->_exceptions; return unless $e && @$e; - my $str = "$$: PENDING EXCEPTIONS BEGIN\n".join("\n$$:###################\n", @$e)."$$: PENDING EXCEPTIONS END"; + my $str = "$$: PENDING EXCEPTIONS BEGIN\n" + . join("\n$$:###################\n", @$e) + . "$$: PENDING EXCEPTIONS END"; die $str if $raise; warn $str if defined $raise; return $str; @@ -222,6 +332,14 @@ no Mouse; __PACKAGE__->meta->make_immutable(); +=head1 MR::Pending::Item + +one pending task + + +=head1 ATTRIBUTES + +=cut package MR::Pending::Item; @@ -229,12 +347,26 @@ use Mouse; use Time::HiRes qw/time/; use Carp; + +=head2 id + +unique id for the task + +=cut + has id => ( is => 'ro', isa => 'Str', required => 1, ); + +=head2 onok onerror onretry onsecondary_retry + +functions that are called on different stages + +=cut + has $_ => ( is => 'ro', isa => 'CodeRef', @@ -242,6 +374,13 @@ has $_ => ( required => 1, ) for qw/onok onerror onretry/; +has $_ => ( + is => 'ro', + isa => 'CodeRef', + predicate => "_has_$_", + default => sub { sub {} }, +) for qw{onsecondary_retry}; + has $_ => ( is => 'rw', isa => 'Num', diff --git a/connector/perl/lib/MR/Tarantool/Box.pm b/connector/perl/lib/MR/Tarantool/Box.pm index 1648a45b79e3158045dcffba136171ce5fb0bb92..e078e5a6d955ad235da82a100db75ff2b40ff007 100644 --- a/connector/perl/lib/MR/Tarantool/Box.pm +++ b/connector/perl/lib/MR/Tarantool/Box.pm @@ -608,9 +608,11 @@ Format to unpack the result tuple, the same as C<format> option for C<new()> =cut sub Call { - my ($param, $namespace) = $_[0]->_validate_param(\@_, qw/flags raise unpack unpack_format unpack_format_from_space/); + my ($param, $namespace) = $_[0]->_validate_param(\@_, qw/flags raise unpack unpack_format unpack_format_from_space return_fh/); my ($self, $sp_name, $tuple) = @_; + my $return_fh = delete $param->{return_fh}; + my $flags = $param->{flags} || 0; local $self->{raise} = $param->{raise} if defined $param->{raise}; @@ -644,8 +646,15 @@ sub Call { $self->_chat ( msg => 22, payload => pack("L w/a* L(w/a*)*", $flags, $sp_name, scalar(@$tuple), @$tuple), - unpack => $param->{unpack} || sub { $self->_unpack_select($space||$namespace, "CALL", @_) }, + unpack => $param->{unpack} || sub { + local $namespace->{unpack_format} = $unpack_format + if !$space && $unpack_format; + local $namespace->{append_for_unpack} = '' + if !$space && $unpack_format; + $self->_unpack_select($space||$namespace, "CALL", @_) + }, callback => $param->{callback}, + return_fh => $return_fh ? sub { return $_[0] } : 0 ); } @@ -784,6 +793,7 @@ sub Insert { sub _unpack_select { my ($self, $ns, $debug_prefix) = @_; + $debug_prefix ||= "SELECT"; confess __LINE__."$self->{name}: [$debug_prefix]: Bad response" if length $_[3] < 4; my $result_count = unpack('L', substr($_[3], 0, 4, '')); diff --git a/connector/perl/t/03-box.t b/connector/perl/t/03-box.t index 1cf66e64e5458c63991308d3fd29749251360dd5..cb4805b161949210a7573ee9343d96110d5bc493 100644 --- a/connector/perl/t/03-box.t +++ b/connector/perl/t/03-box.t @@ -53,16 +53,15 @@ ok -r $tarantool_config, "-r $tarantool_config"; our $server = shift || $ENV{BOX}; +my $tnt_srv; SKIP: { skip 'The test uses external tarantool instance', 2 if $server; - my $tnt_srv = Test::Tarantool->run(cfg => $tarantool_config); + $tnt_srv = Test::Tarantool->run(cfg => $tarantool_config); ok $tnt_srv, 'server instance created'; diag $tnt_srv->log unless ok $tnt_srv->started, 'server is started'; - $server = sprintf '127.0.0.1:%d', $tnt_srv->primary_port; - } ok $server, 'server address was defined ' . $server || 'undef'; diff --git a/connector/perl/t/pending.pl b/connector/perl/t/04-pending.t similarity index 64% rename from connector/perl/t/pending.pl rename to connector/perl/t/04-pending.t index f0db3b360f95fc152c73a9511768b1af61a3de2a..1fd365b4856774d54e7c3898461c3c95e41f8a9e 100644 --- a/connector/perl/t/pending.pl +++ b/connector/perl/t/04-pending.t @@ -11,14 +11,19 @@ BEGIN { } use FindBin qw($Bin); use lib "$Bin"; +use lib "$Bin/../lib"; + use Carp qw/confess/; -use Test::More tests => 24; +use Test::More tests => 29; use Test::Exception; use List::MoreUtils qw/zip/; -use MR::Pending; +BEGIN { + use_ok 'Test::Tarantool'; + use_ok 'MR::Pending'; +} local $SIG{__DIE__} = \&confess; @@ -35,7 +40,25 @@ use constant NO_SUCCESS => qr/no success after/; use constant TOO_BIG_FIELD => qr/too big field/; our $box; -our $server = (shift || $ENV{BOX}) or die; + + +our $server = shift || $ENV{BOX}; + +my $tarantool_config = "$Bin/data/pending.t.cfg"; +ok -r $tarantool_config, "-r $tarantool_config"; +my $tnt_srv; + +SKIP: { + skip 'The test uses external tarantool', 2 if $server; + + $tnt_srv = Test::Tarantool->run(cfg => $tarantool_config); + ok $tnt_srv, 'server instance created'; + diag $tnt_srv->log unless ok $tnt_srv->started, 'server is started'; + + $server = sprintf '127.0.0.1:%d', $tnt_srv->primary_port; +} + + our %opts = ( debug => $ENV{DEBUG}||0, ipdebug => $ENV{IPDEBUG}||0, @@ -49,6 +72,8 @@ sub cleanup ($) { ok $box->Delete($id) == 0, 'delete of non existing record'; } + + sub def_param { my $format = shift || 'l& SSLL'; return { servers => $server, @@ -92,6 +117,7 @@ foreach my $tuple (@$tt) { } my @box = ($box, $box2); + my @select = ([1,2,3],[2,3,4]); my $onok = sub { @@ -112,6 +138,12 @@ my $ontry = sub { return $box[$i]->Select($select[$i], {want => "arrayref", return_fh => 1}); }; +MR::Pending->new( + name => "PENDINGTEST", + maxtime => 1.1, + itertime => 0.01, +)->work; + MR::Pending->new( name => "PENDINGTEST", maxtime => 1.1, @@ -144,67 +176,3 @@ MR::Pending->new( - -__END__ - -space[0].enabled = 1 -space[0].index[0].type = "HASH" -space[0].index[0].unique = 1 -space[0].index[0].key_field[0].fieldno = 0 -space[0].index[0].key_field[0].type = "NUM" -space[0].index[1].type = "HASH" -space[0].index[1].unique = 1 -space[0].index[1].key_field[0].fieldno = 1 -space[0].index[1].key_field[0].type = "STR" - -space[20].enabled = 1 -space[20].index[0].type = "HASH" -space[20].index[0].unique = 1 -space[20].index[0].key_field[0].fieldno = 0 -space[20].index[0].key_field[0].type = "NUM64" - - -space[26].enabled = 1 -space[26].index[0].type = "HASH" -space[26].index[0].unique = 1 -space[26].index[0].key_field[0].fieldno = 0 -space[26].index[0].key_field[0].type = "NUM" -space[26].index[1].type = "TREE" -space[26].index[1].unique = 0 -space[26].index[1].key_field[0].fieldno = 1 -space[26].index[1].key_field[0].type = "STR" -space[26].index[2].type = "TREE" -space[26].index[2].unique = 0 -space[26].index[2].key_field[0].fieldno = 1 -space[26].index[2].key_field[0].type = "STR" -space[26].index[2].key_field[1].fieldno = 2 -space[26].index[2].key_field[1].type = "NUM" - - - -space[27].enabled = 1 -space[27].index[0].type = "HASH" -space[27].index[0].unique = 1 -space[27].index[0].key_field[0].fieldno = 0 -space[27].index[0].key_field[0].type = "NUM" -space[27].index[1].type = "HASH" -space[27].index[1].unique = 1 -space[27].index[1].key_field[0].fieldno = 1 -space[27].index[1].key_field[0].type = "STR" - -space[27].index[2].type = "TREE" -space[27].index[2].unique = 1 -space[27].index[2].key_field[0].fieldno = 2 -space[27].index[2].key_field[0].type = "STR" - -space[27].index[2].type = "TREE" -space[27].index[2].unique = 1 -space[27].index[2].key_field[0].fieldno = 3 -space[27].index[2].key_field[0].type = "STR" - -space[27].index[3].type = "TREE" -space[27].index[3].unique = 1 -space[27].index[3].key_field[0].fieldno = 2 -space[27].index[3].key_field[0].type = "STR" -space[27].index[3].key_field[1].fieldno = 3 -space[27].index[3].key_field[1].type = "STR" diff --git a/connector/perl/t/05-pending.t b/connector/perl/t/05-pending.t new file mode 100644 index 0000000000000000000000000000000000000000..af3da0b645824085d906171a118ea4dceb51a076 --- /dev/null +++ b/connector/perl/t/05-pending.t @@ -0,0 +1,145 @@ +#!/usr/bin/perl + +# Tarantool/Box config below + +use strict; +use warnings; +BEGIN { + sub mPOP::Config::GetValue ($) { + die; + } +} +use FindBin qw($Bin); +use lib "$Bin"; +use lib "$Bin/../lib"; + +use Carp qw/confess/; + +use Test::More tests => 21; +use Test::Exception; + +use List::MoreUtils qw/zip/; + +BEGIN { + use_ok 'Test::Tarantool'; + use_ok 'MR::Pending'; +} + +local $SIG{__DIE__} = \&confess; + +our $CLASS; +BEGIN { $CLASS = $ENV{BOXCLASS} || 'MR::Tarantool::Box'; eval "require $CLASS" or die $@; } + +use constant ILL_PARAM => qr/Error 00000202/; +use constant TUPLE_NOT_EXISTS => qr/Error 00003102/; +use constant TUPLE_EXISTS => qr/Error 00003702/; +use constant INDEX_VIOLATION => qr/Error 00003802/; + +use constant NO_SUCCESS => qr/no success after/; + +use constant TOO_BIG_FIELD => qr/too big field/; + +our $box; + + +our $server; # = shift || $ENV{BOX}; + +my $tarantool_config = "$Bin/data/pending.t.cfg"; +ok -r $tarantool_config, "-r $tarantool_config"; +my $tnt_dir = "$Bin/data"; +ok -d $tnt_dir, "-d $tnt_dir"; +my $tnt_srv; + +SKIP: { + skip 'The test uses external tarantool', 2 if $server; + + $tnt_srv = Test::Tarantool->run( + cfg => $tarantool_config, + script_dir => $tnt_dir + ); + ok $tnt_srv, 'server instance created'; + diag $tnt_srv->log unless ok $tnt_srv->started, 'server is started'; + + $server = sprintf '127.0.0.1:%d', $tnt_srv->primary_port; +} + + +our %opts = ( + debug => $ENV{DEBUG}||0, + ipdebug => $ENV{IPDEBUG}||0, + raise => 1, +); + +sub cleanup ($) { + my ($id) = @_; + die unless defined $id; + ok defined $box->Delete($id), 'delete of possible existing record'; + ok $box->Delete($id) == 0, 'delete of non existing record'; +} + + + +sub def_param { + my $format = shift || 'l& SSLL'; + return { servers => $server, + name => $CLASS, + namespaces => [ { + indexes => [ { + index_name => 'primary_id', + keys => [0], + }, { + index_name => 'primary_email', + keys => [1], + }, ], + namespace => 0, + format => $format, + default_index => 'primary_id', + name => 'main', + } ], + default_space => "main", + %opts, + } +} + +$box = $CLASS->new(def_param('l&SSLL&')); +ok $box->isa($CLASS), 'connect'; + +my $box2 = $CLASS->new(def_param('l&SSLL&')); +ok $box2->isa($CLASS), 'connect'; + + + + +my $tt = [ [1, 'rtokarev@corp.mail.ru', 11, 111, 1111, 11111, "1111111111111"], + [2, 'vostrikov@corp.mail.ru', 22, 222, 2222, 22222, "2222222222222"], + [3, 'aleinikov@corp.mail.ru', 33, 333, 3333, 33333, "3333333333333"], + [4, 'roman.s.tokarev@gmail.com', 44, 444, 4444, 44444, "4444444444444"], + [5, 'vostrIIIkov@corp.mail.ru', 55, 555, 5555, 55555, "5555555555555"] ]; + +foreach my $tuple (@$tt) { + ok eval{ $box->Delete($tuple->[0]); 1 }, "delete$tuple->[0]"; +} + +foreach my $tuple (@$tt) { + ok $box->Insert(@$tuple), "insert$tuple->[0]"; +} + + +is_deeply eval { + $box->Call('tst_lua_test' => [], { unpack_format => '$' }) +}, [['test']], 'lua functions are working'; + + +# note explain +# $box->Select(1, { return_fh => 1 }); + +my $cdelayed = $box->Call( + 'tst_lua_test' => [], { unpack_format => '$', return_fh => 1 } +); + +isa_ok $cdelayed, 'HASH'; + +is_deeply $cdelayed->{continue}->(), [['test']], + 'lua functions are working with return_fh'; + + diff --git a/connector/perl/t/data/init.lua b/connector/perl/t/data/init.lua index 91b47ad977e96a43cb2926a790b009ff3e83fb14..76a0226851a21bd8423a0bf16e9ee2d24ec2a0ea 100644 --- a/connector/perl/t/data/init.lua +++ b/connector/perl/t/data/init.lua @@ -1,3 +1,7 @@ +function tst_lua_test() + return 'test' +end + function tst_server_name() local tuple = box.select( 0, 0, box.pack('i', 1) ) if tuple == nil then diff --git a/connector/perl/t/data/pending.t.cfg b/connector/perl/t/data/pending.t.cfg new file mode 100644 index 0000000000000000000000000000000000000000..1dbbbcbc50edb590d29fef2b46551a7c693d5f50 --- /dev/null +++ b/connector/perl/t/data/pending.t.cfg @@ -0,0 +1,62 @@ +space[0].enabled = 1 +space[0].index[0].type = "HASH" +space[0].index[0].unique = 1 +space[0].index[0].key_field[0].fieldno = 0 +space[0].index[0].key_field[0].type = "NUM" +space[0].index[1].type = "HASH" +space[0].index[1].unique = 1 +space[0].index[1].key_field[0].fieldno = 1 +space[0].index[1].key_field[0].type = "STR" + +space[20].enabled = 1 +space[20].index[0].type = "HASH" +space[20].index[0].unique = 1 +space[20].index[0].key_field[0].fieldno = 0 +space[20].index[0].key_field[0].type = "NUM64" + + +space[26].enabled = 1 +space[26].index[0].type = "HASH" +space[26].index[0].unique = 1 +space[26].index[0].key_field[0].fieldno = 0 +space[26].index[0].key_field[0].type = "NUM" +space[26].index[1].type = "TREE" +space[26].index[1].unique = 0 +space[26].index[1].key_field[0].fieldno = 1 +space[26].index[1].key_field[0].type = "STR" +space[26].index[2].type = "TREE" +space[26].index[2].unique = 0 +space[26].index[2].key_field[0].fieldno = 1 +space[26].index[2].key_field[0].type = "STR" +space[26].index[2].key_field[1].fieldno = 2 +space[26].index[2].key_field[1].type = "NUM" + + + +space[27].enabled = 1 +space[27].index[0].type = "HASH" +space[27].index[0].unique = 1 +space[27].index[0].key_field[0].fieldno = 0 +space[27].index[0].key_field[0].type = "NUM" +space[27].index[1].type = "HASH" +space[27].index[1].unique = 1 +space[27].index[1].key_field[0].fieldno = 1 +space[27].index[1].key_field[0].type = "STR" + +space[27].index[2].type = "TREE" +space[27].index[2].unique = 1 +space[27].index[2].key_field[0].fieldno = 2 +space[27].index[2].key_field[0].type = "STR" + +space[27].index[2].type = "TREE" +space[27].index[2].unique = 1 +space[27].index[2].key_field[0].fieldno = 3 +space[27].index[2].key_field[0].type = "STR" + +space[27].index[3].type = "TREE" +space[27].index[3].unique = 1 +space[27].index[3].key_field[0].fieldno = 2 +space[27].index[3].key_field[0].type = "STR" +space[27].index[3].key_field[1].fieldno = 3 +space[27].index[3].key_field[1].type = "STR" +