diff --git a/src/governor/mod.rs b/src/governor/mod.rs index 82d4c572022a4e050782f860d2c2ca44bcaac5ed..eb3454bcf94d863a8b1d8fab750638b97d2357e3 100644 --- a/src/governor/mod.rs +++ b/src/governor/mod.rs @@ -11,6 +11,7 @@ use crate::r#loop::FlowControl::{self, Continue}; use crate::storage::ToEntryIter as _; use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName}; use crate::tlog; +use crate::traft::error::Error; use crate::traft::network::ConnectionPool; use crate::traft::node::global; use crate::traft::node::Status; @@ -39,23 +40,6 @@ pub(crate) mod migration; pub(crate) use cc::raft_conf_change; pub(crate) use migration::get_pending_migration; -macro_rules! governor_step { - ($desc:literal $([ $($kv:tt)* ])? async { $($body:tt)+ }) => { - tlog!(Info, $desc $(; $($kv)*)?); - #[allow(redundant_semicolons)] - let res: Result<_> = async { - $($body)+; - Ok(()) - } - .await; - if let Err(e) = res { - tlog!(Warning, ::std::concat!("failed ", $desc, ": {}"), e, $(; $($kv)*)?); - event::wait_timeout(Event::TopologyChanged, Loop::RETRY_TIMEOUT).unwrap(); - return Continue; - } - } -} - impl Loop { const SYNC_TIMEOUT: Duration = Duration::from_secs(10); const RETRY_TIMEOUT: Duration = Duration::from_millis(250); @@ -65,7 +49,11 @@ impl Loop { storage, raft_storage, }: &Args, - State { status, pool }: &mut State, + State { + status, + waker, + pool, + }: &mut State, ) -> FlowControl { if !status.get().raft_state.is_leader() { status.changed().await.unwrap(); @@ -108,12 +96,28 @@ impl Loop { let plan = unwrap_ok_or!(plan, Err(e) => { tlog!(Warning, "failed constructing an action plan: {e}"); - // TODO don't hard code timeout - event::wait_timeout(Event::TopologyChanged, Loop::RETRY_TIMEOUT).unwrap(); + _ = waker.changed().timeout(Loop::RETRY_TIMEOUT).await; return Continue; } ); + macro_rules! governor_step { + ($desc:literal $([ $($kv:tt)* ])? async { $($body:tt)+ }) => { + tlog!(Info, $desc $(; $($kv)*)?); + #[allow(redundant_semicolons)] + let res: Result<_> = async { + $($body)+; + Ok(()) + } + .await; + if let Err(e) = res { + tlog!(Warning, ::std::concat!("failed ", $desc, ": {}"), e, $(; $($kv)*)?); + _ = waker.changed().timeout(Loop::RETRY_TIMEOUT).await; + return Continue; + } + } + } + match plan { Plan::ConfChange(ConfChange { conf_change }) => { // main_loop gives the warranty that every ProposeConfChange @@ -130,7 +134,7 @@ impl Loop { Plan::TransferLeadership(TransferLeadership { to }) => { tlog!(Info, "transferring leadership to {}", to.instance_id); node.transfer_leadership_and_yield(to.raft_id); - event::wait_timeout(Event::TopologyChanged, Loop::RETRY_TIMEOUT).unwrap(); + _ = waker.changed().timeout(Loop::RETRY_TIMEOUT).await; } Plan::TransferMastership(TransferMastership { to, rpc, op }) => { @@ -448,8 +452,7 @@ impl Loop { Plan::None => { tlog!(Info, "nothing to do, waiting for events to handle"); - event::wait_any(&[Event::TopologyChanged, Event::ClusterStateChanged]) - .expect("Events system must be initialized"); + _ = waker.changed().await; } } @@ -782,8 +785,11 @@ impl Loop { raft_storage, }; + let (waker_tx, waker_rx) = watch::channel(()); + let state = State { status, + waker: waker_rx, pool: ConnectionPool::builder(args.storage.clone()) .call_timeout(Duration::from_secs(1)) .connect_timeout(Duration::from_millis(500)) @@ -793,12 +799,26 @@ impl Loop { Self { _loop: crate::loop_start!("governor_loop", Self::iter_fn, args, state), + waker: waker_tx, } } + + pub fn wakeup(&self) -> Result<()> { + self.waker.send(()).map_err(|_| Error::GovernorStopped) + } + + pub async fn awoken(&self) -> Result<()> { + self.waker + .subscribe() + .changed() + .await + .map_err(|_| Error::GovernorStopped) + } } pub struct Loop { _loop: Option<fiber::UnitJoinHandle<'static>>, + waker: watch::Sender<()>, } struct Args { @@ -808,6 +828,7 @@ struct Args { struct State { status: watch::Receiver<Status>, + waker: watch::Receiver<()>, pool: ConnectionPool, } diff --git a/src/main.rs b/src/main.rs index 06676982715a66bdd03fff6ca998d7381d23b71a..d160160ce3501a490842646f3390471d7471c884 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1020,7 +1020,7 @@ fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAcces box_cfg.listen = Some(args.listen.clone()); tarantool::set_cfg(&box_cfg); - if let Err(e) = tarantool::on_shutdown(on_shutdown::callback) { + if let Err(e) = tarantool::on_shutdown(|| fiber::block_on(on_shutdown::callback())) { tlog!(Error, "failed setting on_shutdown trigger: {e}"); } diff --git a/src/on_shutdown.rs b/src/on_shutdown.rs index ad52e693e80c7d801b80081dc8a5902861c9fe9a..433e9d512b9fd5a073e1527283b674577a099a5e 100644 --- a/src/on_shutdown.rs +++ b/src/on_shutdown.rs @@ -5,7 +5,6 @@ use ::tarantool::fiber; use crate::tlog; use crate::traft; use crate::traft::error::Error; -use crate::traft::event; use crate::traft::node; use crate::traft::rpc; use crate::traft::rpc::update_instance; @@ -13,7 +12,7 @@ use crate::traft::CurrentGradeVariant; use crate::traft::TargetGradeVariant; use crate::unwrap_ok_or; -pub fn callback() { +pub async fn callback() { // 1. Try setting target grade Offline in a separate fiber tlog!(Info, "trying to shutdown gracefully ..."); let go_offline = fiber::Builder::new() @@ -65,7 +64,9 @@ pub fn callback() { break; } - let Ok(()) = event::wait(event::Event::TopologyChanged) else { break }; + if let Err(e) = node.governor_loop.awoken().await { + tlog!(Warning, "failed to shutdown gracefully: {e}"); + } } } diff --git a/src/traft/error.rs b/src/traft/error.rs index 64009cc365718d990f605b13f4249e2b88935832..d9a00be7620714cb5c3a1fee6e141b6ba5da891d 100644 --- a/src/traft/error.rs +++ b/src/traft/error.rs @@ -54,6 +54,10 @@ pub enum Error { AddressUnknownForInstanceId(InstanceId), #[error("leader is uknown yet")] LeaderUnknown, + + #[error("governor has stopped")] + GovernorStopped, + #[error("other error: {0}")] Other(Box<dyn std::error::Error>), } diff --git a/src/traft/event.rs b/src/traft/event.rs index ffad2d4b7e68db8317edfc519391a7893be9cba1..3ac54274438b532ffc79e3b914135eabd928fcd2 100644 --- a/src/traft/event.rs +++ b/src/traft/event.rs @@ -24,10 +24,8 @@ pub type BoxResult<T> = std::result::Result<T, Box<dyn std::error::Error>>; JointStateEnter = "raft.joint-state-enter", JointStateLeave = "raft.joint-state-leave", JointStateDrop = "raft.joint-state-drop", - TopologyChanged = "raft.topology-changed", RaftLoopNeeded = "raft.loop-needed", RaftEntryApplied = "raft.entry-applied", - ClusterStateChanged = "picodata.cluster-state-updated", MigrateDone = "picodata.migrate-done", } } diff --git a/src/traft/mod.rs b/src/traft/mod.rs index 29afcf060dd6a58f3fb132ef35304d7ead2faed9..e0a438b856c8b91072dbbad77634b14904bd085b 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -33,8 +33,6 @@ pub use rpc::sharding::cfg::Weight; pub use rpc::{join, update_instance}; pub use topology::Topology; -use self::event::Event; - pub type RaftId = u64; pub type RaftTerm = u64; pub type RaftIndex = u64; @@ -183,12 +181,7 @@ impl Op { instances.put(&instance).unwrap(); instance } - Self::Dml(op) => { - if op.space() == &ClusterwideSpace::Property { - event::broadcast(Event::ClusterStateChanged); - } - Box::new(op.result()) - } + Self::Dml(op) => Box::new(op.result()), } } } diff --git a/src/traft/node.rs b/src/traft/node.rs index dbe775dd9c5fe1510878e0f638e7d14c3662d02b..140462b2b225d363060f57d26b840473b41da7b4 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -124,7 +124,7 @@ pub struct Node { pub(crate) storage: Clusterwide, pub(crate) raft_storage: RaftSpaceAccess, main_loop: MainLoop, - _governor_loop: governor::Loop, + pub(crate) governor_loop: governor::Loop, status: watch::Receiver<Status>, } @@ -150,7 +150,7 @@ impl Node { let node = Node { raft_id, main_loop: MainLoop::start(node_impl.clone()), // yields - _governor_loop: governor::Loop::start( + governor_loop: governor::Loop::start( status.clone(), storage.clone(), raft_storage.clone(), @@ -606,7 +606,7 @@ impl NodeImpl { fn handle_committed_entries( &mut self, entries: &[raft::Entry], - topology_changed: &mut bool, + wake_governor: &mut bool, expelled: &mut bool, ) { for entry in entries { @@ -620,7 +620,7 @@ impl NodeImpl { match entry.entry_type { raft::EntryType::EntryNormal => { - self.handle_committed_normal_entry(entry, topology_changed, expelled) + self.handle_committed_normal_entry(entry, wake_governor, expelled) } raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => { self.handle_committed_conf_change(entry) @@ -645,7 +645,7 @@ impl NodeImpl { fn handle_committed_normal_entry( &mut self, entry: traft::Entry, - topology_changed: &mut bool, + wake_governor: &mut bool, expelled: &mut bool, ) { assert_eq!(entry.entry_type, raft::EntryType::EntryNormal); @@ -653,14 +653,25 @@ impl NodeImpl { let index = entry.index; let op = entry.into_op().unwrap_or(traft::Op::Nop); - if let traft::Op::PersistInstance(OpPersistInstance(instance)) = &op { - *topology_changed = true; - if instance.current_grade == CurrentGradeVariant::Expelled - && instance.raft_id == self.raft_id() + match &op { + traft::Op::PersistInstance(OpPersistInstance(instance)) => { + *wake_governor = true; + if instance.current_grade == CurrentGradeVariant::Expelled + && instance.raft_id == self.raft_id() + { + // cannot exit during a transaction + *expelled = true; + } + } + traft::Op::Dml(op) + if matches!( + op.space(), + ClusterwideSpace::Property | ClusterwideSpace::Replicaset + ) => { - // cannot exit during a transaction - *expelled = true; + *wake_governor = true; } + _ => {} } // apply the operation @@ -783,7 +794,7 @@ impl NodeImpl { /// - or better <https://github.com/etcd-io/etcd/blob/v3.5.5/raft/node.go#L49> /// /// This function yields. - fn advance(&mut self, topology_changed: &mut bool, expelled: &mut bool) { + fn advance(&mut self, wake_governor: &mut bool, expelled: &mut bool) { // Get the `Ready` with `RawNode::ready` interface. if !self.raw_node.has_ready() { return; @@ -815,7 +826,7 @@ impl NodeImpl { if let Err(e) = start_transaction(|| -> Result<(), TransactionError> { // Apply committed entries. - self.handle_committed_entries(ready.committed_entries(), topology_changed, expelled); + self.handle_committed_entries(ready.committed_entries(), wake_governor, expelled); // Persist uncommitted entries in the raft log. self.raft_storage.persist_entries(ready.entries()).unwrap(); @@ -852,7 +863,7 @@ impl NodeImpl { } // Apply committed entries. - self.handle_committed_entries(light_rd.committed_entries(), topology_changed, expelled); + self.handle_committed_entries(light_rd.committed_entries(), wake_governor, expelled); Ok(()) }) { @@ -946,9 +957,9 @@ impl MainLoop { node_impl.raw_node.tick(); } - let mut topology_changed = false; + let mut wake_governor = false; let mut expelled = false; - node_impl.advance(&mut topology_changed, &mut expelled); // yields + node_impl.advance(&mut wake_governor, &mut expelled); // yields if state.stop_flag.take() { return FlowControl::Break; } @@ -957,8 +968,10 @@ impl MainLoop { crate::tarantool::exit(0); } - if topology_changed { - event::broadcast(Event::TopologyChanged); + if wake_governor { + if let Err(e) = async { global()?.governor_loop.wakeup() }.await { + tlog!(Warning, "failed waking up governor: {e}"); + } } FlowControl::Continue