diff --git a/src/governor/mod.rs b/src/governor/mod.rs index ab8311e6b90ef1cc111652b5f8c7976f342a5e2a..7a93abd2ded7a0134109df97b61891e4e0d5fcb1 100644 --- a/src/governor/mod.rs +++ b/src/governor/mod.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::ops::ControlFlow; use std::rc::Rc; use std::time::Duration; @@ -8,7 +9,6 @@ use ::tarantool::fiber::r#async::timeout::IntoTimeout as _; use ::tarantool::fiber::r#async::watch; use crate::op::Op; -use crate::r#loop::FlowControl::{self, Continue}; use crate::rpc; use crate::rpc::update_instance::handle_update_instance_request_and_wait; use crate::storage::Clusterwide; @@ -45,11 +45,11 @@ impl Loop { waker, pool, }: &mut State, - ) -> FlowControl { + ) -> ControlFlow<()> { if !raft_status.get().raft_state.is_leader() { set_status(governor_status, "not a leader"); raft_status.changed().await.unwrap(); - return Continue; + return ControlFlow::Continue(()); } let instances = storage @@ -126,7 +126,7 @@ impl Loop { tlog!(Warning, "failed constructing an action plan: {e}"); waker.mark_seen(); _ = waker.changed().timeout(Loop::RETRY_TIMEOUT).await; - return Continue; + return ControlFlow::Continue(()); } ); @@ -143,7 +143,7 @@ impl Loop { tlog!(Warning, ::std::concat!("failed ", $desc, ": {}"), e, $(; $($kv)*)?); waker.mark_seen(); _ = waker.changed().timeout(Loop::RETRY_TIMEOUT).await; - return Continue; + return ControlFlow::Continue(()); } } } @@ -546,7 +546,7 @@ impl Loop { } } - Continue + ControlFlow::Continue(()) } pub fn start( diff --git a/src/loop.rs b/src/loop.rs index 2ba4256c3454abdf0453d1070ed3f2e98b66ffaf..985300d803c9d95355d44f52afd7ba88f25b8feb 100644 --- a/src/loop.rs +++ b/src/loop.rs @@ -1,8 +1,3 @@ -pub enum FlowControl { - Continue, - Break, -} - /// Creates the fiber and schedules it for execution. Doesn't yield. #[macro_export] macro_rules! loop_start { @@ -15,8 +10,8 @@ macro_rules! loop_start { let iter_fn = $fn; loop { match iter_fn(&mut state).await { - $crate::r#loop::FlowControl::Continue => continue, - $crate::r#loop::FlowControl::Break => break, + std::ops::ControlFlow::Continue(()) => continue, + std::ops::ControlFlow::Break(()) => break, }; } }) diff --git a/src/sentinel.rs b/src/sentinel.rs index a589f5b240e0f8292dec4c7370acfca3b36fbcae..0ba3322e86414cb9420e3e9d86f69fa7b9a62786 100644 --- a/src/sentinel.rs +++ b/src/sentinel.rs @@ -1,6 +1,5 @@ use crate::has_grades; use crate::instance::GradeVariant::*; -use crate::r#loop::FlowControl::{self, Break, Continue}; use crate::reachability::InstanceReachabilityManagerRef; use crate::rpc; use crate::storage::Clusterwide; @@ -11,6 +10,7 @@ use crate::traft::{node, RaftSpaceAccess}; use ::tarantool::fiber; use ::tarantool::fiber::r#async::timeout::IntoTimeout as _; use ::tarantool::fiber::r#async::watch; +use std::ops::ControlFlow; use std::rc::Rc; use std::time::Duration; @@ -33,11 +33,11 @@ impl Loop { status, instance_reachability, }: &mut State, - ) -> FlowControl { + ) -> ControlFlow<()> { if status.get() == SentinelStatus::Initial || node::global().is_err() { tlog!(Info, "waiting until initialized..."); _ = status.changed().timeout(Self::SENTINEL_LONG_SLEEP).await; - return Continue; + return ControlFlow::Continue(()); } let node = node::global().expect("just checked it's ok"); @@ -53,7 +53,7 @@ impl Loop { // and we truncate _pico_instance (read uncommitted btw). // In this case we also just wait some more. _ = status.changed().timeout(Self::SENTINEL_SHORT_RETRY).await; - return Continue; + return ControlFlow::Continue(()); }; let req = rpc::update_instance::Request::new(instance.instance_id, cluster_id) @@ -72,7 +72,7 @@ impl Loop { } .await; match res { - Ok(_) => return Break, + Ok(_) => return ControlFlow::Break(()), Err(e) => { tlog!(Warning, "failed setting own target grade Offline: {e}, retrying ..."; @@ -101,7 +101,7 @@ impl Loop { } let Some(instance) = instance_to_downgrade else { _ = status.changed().timeout(Self::SENTINEL_LONG_SLEEP).await; - return Continue; + return ControlFlow::Continue(()); }; tlog!(Info, "setting target grade Offline"; "instance_id" => %instance.instance_id); @@ -123,7 +123,7 @@ impl Loop { } _ = status.changed().timeout(Self::SENTINEL_SHORT_RETRY).await; - return Continue; + return ControlFlow::Continue(()); } //////////////////////////////////////////////////////////////////////// @@ -135,7 +135,7 @@ impl Loop { // and we truncate _pico_instance (read uncommitted btw). // In this case we also just wait some more. _ = status.changed().timeout(Self::SENTINEL_SHORT_RETRY).await; - return Continue; + return ControlFlow::Continue(()); }; if has_grades!(instance, * -> Offline) { @@ -160,11 +160,11 @@ impl Loop { } _ = status.changed().timeout(Self::SENTINEL_SHORT_RETRY).await; - return Continue; + return ControlFlow::Continue(()); } _ = status.changed().timeout(Self::SENTINEL_LONG_SLEEP).await; - return Continue; + return ControlFlow::Continue(()); } pub fn start( diff --git a/src/traft/node.rs b/src/traft/node.rs index 6a53bd9216f748756ef89d88d499d80db8d3be4d..d9fc15016316834260a791e4fb37f29701330ec2 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -11,7 +11,6 @@ use crate::has_grades; use crate::instance::Instance; use crate::kvcell::KVCell; use crate::loop_start; -use crate::r#loop::FlowControl; use crate::reachability::instance_reachability_manager; use crate::reachability::InstanceReachabilityManagerRef; use crate::rpc; @@ -68,6 +67,7 @@ use protobuf::Message as _; use std::cell::Cell; use std::collections::HashMap; use std::convert::TryFrom; +use std::ops::ControlFlow; use std::rc::Rc; use std::time::Duration; use ApplyEntryResult::*; @@ -1924,16 +1924,16 @@ impl MainLoop { let _ = self.loop_waker.send(()); } - async fn iter_fn(state: &mut MainLoopState) -> FlowControl { + async fn iter_fn(state: &mut MainLoopState) -> ControlFlow<()> { let _ = state.loop_waker.changed().timeout(Self::TICK).await; if state.stop_flag.take() { - return FlowControl::Break; + return ControlFlow::Break(()); } // FIXME: potential deadlock - can't use sync mutex in async fn let mut node_impl = state.node_impl.lock(); // yields if state.stop_flag.take() { - return FlowControl::Break; + return ControlFlow::Break(()); } node_impl.cleanup_notifications(); @@ -1947,7 +1947,7 @@ impl MainLoop { let res = node_impl.advance(); // yields drop(node_impl); if state.stop_flag.take() { - return FlowControl::Break; + return ControlFlow::Break(()); } match res { @@ -1961,7 +1961,7 @@ impl MainLoop { Ok(()) => {} } - FlowControl::Continue + ControlFlow::Continue(()) } }