diff --git a/src/traft/node.rs b/src/traft/node.rs index e28f1992866ce3dbe6cade541cb659194d4eb993..3c6e56f0bea2ab476172f4d35f79e7d7f198514e 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -14,6 +14,7 @@ use ::tarantool::fiber; use ::tarantool::proc; use ::tarantool::tlua; use ::tarantool::transaction::start_transaction; +use std::any::type_name; use std::any::Any; use std::cell::RefCell; use std::collections::HashMap; @@ -43,9 +44,76 @@ use crate::traft::{JoinRequest, JoinResponse}; use super::OpResult; type RawNode = raft::RawNode<Storage>; -type Notify = fiber::Channel<Result<Box<dyn Any>, RaftError>>; type TopologyMailbox = Mailbox<(TopologyRequest, Notify)>; +#[derive(Clone)] +struct Notify { + ch: fiber::Channel<Result<Box<dyn Any>, Error>>, +} + +impl Notify { + fn new() -> Self { + Self { + ch: fiber::Channel::new(1), + } + } + + fn notify_ok_any(&self, res: Box<dyn Any>) { + self.ch.try_send(Ok(res)).ok(); + } + + fn notify_ok<T: Any>(&self, res: T) { + println!("notify_ok {}", type_name::<T>()); + self.notify_ok_any(Box::new(res)); + } + + fn notify_err<E: Into<Error>>(&self, err: E) { + self.ch.try_send(Err(err.into())).ok(); + } + + fn recv_any(self) -> Result<Box<dyn Any>, Error> { + match self.ch.recv() { + Some(v) => v, + None => { + self.ch.close(); + Err(Error::Timeout) + } + } + } + + fn recv_timeout_any(self, timeout: Duration) -> Result<Box<dyn Any>, Error> { + match self.ch.recv_timeout(timeout) { + Ok(v) => v, + Err(_) => { + self.ch.close(); + Err(Error::Timeout) + } + } + } + + fn recv_timeout<T: 'static>(self, timeout: Duration) -> Result<T, Error> { + let any: Box<dyn Any> = self.recv_timeout_any(timeout)?; + let boxed: Box<T> = any.downcast().map_err(|_| Error::DowncastError)?; + Ok(*boxed) + } + + fn recv<T: 'static>(self) -> Result<T, Error> { + let any: Box<dyn Any> = self.recv_any()?; + let boxed: Box<T> = any.downcast().map_err(|_| Error::DowncastError)?; + Ok(*boxed) + } + + fn is_closed(&self) -> bool { + self.ch.is_closed() + } +} + +impl std::fmt::Debug for Notify { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("Notify").finish_non_exhaustive() + } +} + #[derive(Debug, Error)] pub enum Error { #[error("uninitialized yet")] @@ -182,19 +250,10 @@ impl Node { } pub fn read_index(&self, timeout: Duration) -> Result<u64, Error> { - let (rx, tx) = Notify::new(1).into_clones(); - + let (rx, tx) = Notify::new().into_clones(); self.main_inbox .send(NormalRequest::ReadIndex { notify: tx }); - - match rx.recv_timeout(timeout) { - Ok(Ok(v)) => v.downcast().map(|v| *v).map_err(|_| Error::DowncastError), - Ok(Err(e)) => Err(e.into()), - Err(_) => { - rx.close(); - Err(Error::Timeout) - } - } + rx.recv_timeout(timeout) } pub fn propose<T: OpResult + Into<traft::Op>>( @@ -202,28 +261,19 @@ impl Node { op: T, timeout: Duration, ) -> Result<T::Result, Error> { - let (rx, tx) = fiber::Channel::new(1).into_clones(); - + let (rx, tx) = Notify::new().into_clones(); self.main_inbox.send(NormalRequest::ProposeNormal { op: op.into(), notify: tx, }); - - match rx.recv_timeout(timeout) { - Ok(Ok(v)) => v.downcast().map(|v| *v).map_err(|_| Error::DowncastError), - Ok(Err(e)) => Err(e.into()), - Err(_) => { - rx.close(); - Err(Error::Timeout) - } - } + rx.recv_timeout(timeout) } pub fn campaign(&self) { - let (rx, tx) = fiber::Channel::new(1).into_clones(); + let (rx, tx) = Notify::new().into_clones(); let req = NormalRequest::Campaign { notify: tx }; self.main_inbox.send(req); - if let Some(Err(e)) = rx.recv() { + if let Err(e) = rx.recv_any() { tlog!(Error, "{e}"); } } @@ -234,13 +284,13 @@ impl Node { } pub fn tick(&self, n_times: u32) { - let (rx, tx) = fiber::Channel::new(1).into_clones(); + let (rx, tx) = Notify::new().into_clones(); let req = NormalRequest::Tick { n_times, notify: tx, }; self.main_inbox.send(req); - rx.recv(); + rx.recv_any().ok(); } pub fn timeout_now(&self) { @@ -251,14 +301,10 @@ impl Node { } pub fn change_topology(&self, req: impl Into<TopologyRequest>) -> Result<traft::RaftId, Error> { - let (rx, tx) = fiber::Channel::new(1).into_clones(); + let (rx, tx) = Notify::new().into_clones(); self.join_inbox.send((req.into(), tx)); - match rx.recv() { - Some(Ok(v)) => v.downcast().map(|v| *v).map_err(|_| Error::DowncastError), - Some(Err(e)) => Err(e.into()), - None => unreachable!(), - } + rx.recv() } } @@ -306,7 +352,7 @@ fn raft_main_loop( let ctx = traft::EntryContextNormal { lc: lc.clone(), op }.to_bytes(); if let Err(e) = raw_node.propose(ctx, vec![]) { - notify.try_send(Err(e)).expect("that's a bug"); + notify.notify_err(e); } else { notifications.insert(lc.clone(), notify); } @@ -345,7 +391,7 @@ fn raft_main_loop( if let Some(e) = reason { let e = RaftError::ConfChangeError(e.into()); - notify.try_send(Err(e)).expect("that's a bug"); + notify.notify_err(e); continue; } @@ -392,7 +438,7 @@ fn raft_main_loop( let prev_index = raw_node.raft.raft_log.last_index(); if let Err(e) = raw_node.propose_conf_change(ctx, cc) { - notify.try_send(Err(e)).expect("that's a bug"); + notify.notify_err(e); } else { // oops, current instance isn't actually a leader // (which is impossible in theory, but we're not @@ -422,12 +468,10 @@ fn raft_main_loop( raw_node.read_index(ctx); notifications.insert(lc.clone(), notify); } - NormalRequest::Campaign { notify } => { - let res = raw_node.campaign().map(|_| 0); - notify - .try_send(res.map(|_| Box::new(()) as Box<dyn Any>)) - .expect("that's a bug"); - } + NormalRequest::Campaign { notify } => match raw_node.campaign() { + Ok(()) => notify.notify_ok(()), + Err(e) => notify.notify_err(e), + }, NormalRequest::Step(msg) => { if let Err(e) = raw_node.step(msg) { tlog!(Error, "{e}"); @@ -437,9 +481,7 @@ fn raft_main_loop( for _ in 0..n_times { raw_node.tick(); } - notify - .try_send(Ok(Box::new(()) as Box<dyn Any>)) - .expect("that's a bug"); + notify.notify_ok(()); } } } @@ -466,7 +508,7 @@ fn raft_main_loop( .expect("Abnormal entry in message context") { if let Some(notify) = notifications.remove(&ctx.lc) { - notify.try_send(Ok(Box::new(rs.index) as Box<dyn Any>)).ok(); + notify.notify_ok(rs.index); } } } @@ -498,9 +540,7 @@ fn raft_main_loop( if let Some(lc) = entry.lc() { if let Some(notify) = notifications.remove(lc) { - // The notification may already have timed out. - // Don't panic. Just ignore `try_send` error. - notify.try_send(Ok(result)).ok(); + notify.notify_ok_any(result); } } @@ -511,9 +551,7 @@ fn raft_main_loop( // a re-election. let e = RaftError::ConfChangeError("ignored".into()); - // The `raft_join_loop` waits forever and never closes - // the notification channel. Panic if `try_send` fails. - latch.notify.try_send(Err(e)).expect("that's a bug"); + latch.notify.notify_err(e); } *joint_state_latch = None; } @@ -548,10 +586,7 @@ fn raft_main_loop( // Unlock the latch only when leaving the joint state if cc.changes.is_empty() { if let Some(latch) = joint_state_latch { - latch - .notify - .try_send(Ok(Box::new(entry.index) as Box<dyn Any>)) - .expect("that's a bug"); + latch.notify.notify_ok(entry.index); *joint_state_latch = None; *config_changed = true; } @@ -686,7 +721,7 @@ fn raft_join_loop(inbox: TopologyMailbox, main_inbox: Mailbox<NormalRequest>) { Err(e) => { for (_, notify) in batch { let e = RaftError::ConfChangeError(format!("{e}")); - notify.try_send(Err(e)).ok(); + notify.notify_err(e); } continue; } @@ -701,7 +736,7 @@ fn raft_join_loop(inbox: TopologyMailbox, main_inbox: Mailbox<NormalRequest>) { } Err(e) => { let e = RaftError::ConfChangeError(e); - notify.try_send(Err(e)).expect("that's a bug"); + notify.notify_err(e); } } } @@ -718,7 +753,7 @@ fn raft_join_loop(inbox: TopologyMailbox, main_inbox: Mailbox<NormalRequest>) { } tlog!(Info, "processing batch: {ids:?}"); - let (rx, tx) = fiber::Channel::new(1).into_clones(); + let (rx, tx) = Notify::new().into_clones(); main_inbox.send(NormalRequest::ProposeConfChange { term, peers: topology_diff, @@ -730,16 +765,16 @@ fn raft_join_loop(inbox: TopologyMailbox, main_inbox: Mailbox<NormalRequest>) { // will sometimes be handled and there's no need in timeout. // It also guarantees that the notification will arrive only // after the node leaves the joint state. - let res = rx.recv().expect("that's a bug"); + let res = rx.recv::<u64>(); tlog!(Info, "batch processed: {ids:?}, {res:?}"); for (notify, peer) in topology_results { match &res { - Ok(_) => notify.try_send(Ok(Box::new(peer.raft_id))).ok(), + Ok(_) => notify.notify_ok(peer.raft_id), Err(e) => { // RaftError doesn't implement the Clone trait, // so we have to be creative. let e = RaftError::ConfChangeError(format!("{e}")); - notify.try_send(Err(e)).ok() + notify.notify_err(e); } }; }