diff --git a/connector/perl/lib/MR/Pending.pm b/connector/perl/lib/MR/Pending.pm index e270472d89a526d9894c37696343f93f0764b37c..c8ed252642dc0a685a25060008dde5783aff009e 100644 --- a/connector/perl/lib/MR/Pending.pm +++ b/connector/perl/lib/MR/Pending.pm @@ -1,6 +1,38 @@ +=head1 NAME + +MR::Pending - watcher for some requests results + + +=head1 SYNOPSIS + + my $pnd = MR::Pending->new( + maxtime => 10, + itertime => 0.1, + secondary_itertime => 0.01, + name => 'My waiter', + + onidle => sub { ... }, + + pending => [ ... ] + ); + + + $pnd->work; + +=cut + package MR::Pending; use Mouse; use Time::HiRes qw/time/; +use Data::Dumper; + +=head1 ATTRIBUTES + +=head2 maxtime + +Timeout for all requests. + +=cut has maxtime => ( is => 'rw', @@ -9,6 +41,13 @@ has maxtime => ( default => 6.0, ); +=head2 itertime + +One iteration time. If all requests have no data, L<onidle> will be called +with the time. + +=cut + has itertime => ( is => 'rw', isa => 'Num', @@ -16,12 +55,41 @@ has itertime => ( default => 0.1, ); + +=head2 secondary_itertime + +Module will do secondary requests (L<onsecondary_retry>) if the first request +have no data after te timeout. + +=cut + +has secondary_itertime => ( + is => 'rw', + isa => 'Num', + predicate => '_has_secondary_itertime', + default => 0.01, +); + + +=head2 name + +Name of pending instance (for debug messages) + +=cut + has name => ( is => 'rw', isa => 'Str', required => 1, ); + +=head2 onidle + +callback. will be called for each iteration if there are no data from servers. + +=cut + has onidle => ( is => 'rw', isa => 'CodeRef', @@ -34,6 +102,13 @@ has _pending => ( default => sub { {} }, ); + +=head2 exceptions + +count of exceptions in callbacks + +=cut + has exceptions => ( is => 'rw', isa => 'Int', @@ -51,6 +126,17 @@ has _waitresult => ( isa => 'ArrayRef', ); + +=head1 METHODS + + +=head2 new + +Constructor. receives one additionall argiments: B<pending> that can contain +array of pending requests (L<MR::Pending::Item>). + +=cut + around BUILDARGS => sub { my $orig = shift; my $class = shift; @@ -71,6 +157,13 @@ sub runcatch { return $ret; } + +=head2 add(@pends) + +add pending requests (L<MR::Pending::Item>) + +=cut + sub add { my ($self, @p) = @_; my $p = $self->_pending; @@ -81,6 +174,13 @@ sub add { return $self; } + +=head2 remove(@pends) + +remove pending requests (L<MR::Pending::Item>) + +=cut + sub remove { my ($self, @p) = @_; my $p = $self->_pending; @@ -98,7 +198,8 @@ sub send { my $pend = $pending->{$shard}; if ($pend->try < $pend->retry) { next unless $pend->is_timeout; - $pend->set_pending_mode(scalar $self->runcatch($pend->onretry, ($pend->id, $pend, $self))); + my $cont = $self->runcatch($pend->onretry, ($pend->id, $pend, $self)); + $pend->set_pending_mode($cont); } else { delete $pending->{$shard}; $self->runcatch($pend->onerror, ($pend->id, "no success after @{[$pend->try]} retries", $pend, $self)); @@ -195,6 +296,13 @@ sub iter { return 1; } + +=head2 work + +do all pending requests, wait their results or timeout (L<maxtime>) + +=cut + sub work { my ($self) = @_; @@ -212,7 +320,9 @@ sub check_exceptions { my ($self, $raise) = @_; my $e = $self->_exceptions; return unless $e && @$e; - my $str = "$$: PENDING EXCEPTIONS BEGIN\n".join("\n$$:###################\n", @$e)."$$: PENDING EXCEPTIONS END"; + my $str = "$$: PENDING EXCEPTIONS BEGIN\n" + . join("\n$$:###################\n", @$e) + . "$$: PENDING EXCEPTIONS END"; die $str if $raise; warn $str if defined $raise; return $str; @@ -222,6 +332,14 @@ no Mouse; __PACKAGE__->meta->make_immutable(); +=head1 MR::Pending::Item + +one pending task + + +=head1 ATTRIBUTES + +=cut package MR::Pending::Item; @@ -229,6 +347,13 @@ use Mouse; use Time::HiRes qw/time/; use Carp; + +=head2 id + +unique id for the task + +=cut + has id => ( is => 'ro', isa => 'Str', @@ -242,6 +367,13 @@ has $_ => ( required => 1, ) for qw/onok onerror onretry/; +has $_ => ( + is => 'ro', + isa => 'CodeRef', + predicate => "_has_$_", + default => sub { sub {} }, +) for qw{onsecondary_retry}; + has $_ => ( is => 'rw', isa => 'Num', diff --git a/connector/perl/t/04-pending.t b/connector/perl/t/04-pending.t index 24802a139ab75076994dc28f980d14138716b3e8..1fd365b4856774d54e7c3898461c3c95e41f8a9e 100644 --- a/connector/perl/t/04-pending.t +++ b/connector/perl/t/04-pending.t @@ -117,6 +117,7 @@ foreach my $tuple (@$tt) { } my @box = ($box, $box2); + my @select = ([1,2,3],[2,3,4]); my $onok = sub { @@ -137,6 +138,12 @@ my $ontry = sub { return $box[$i]->Select($select[$i], {want => "arrayref", return_fh => 1}); }; +MR::Pending->new( + name => "PENDINGTEST", + maxtime => 1.1, + itertime => 0.01, +)->work; + MR::Pending->new( name => "PENDINGTEST", maxtime => 1.1, @@ -169,5 +176,3 @@ MR::Pending->new( - -