diff --git a/src/main.rs b/src/main.rs index b7a1689e06c8161149995836a3329ab328cff80f..851940cbe33cd4d2415782d884eb95b101763428 100644 --- a/src/main.rs +++ b/src/main.rs @@ -357,6 +357,7 @@ fn init_common(args: &args::Run, cfg: &tarantool::Cfg) { traft::Storage::init_schema(); init_handlers(); + traft::event::init(); } fn start_discover(args: &args::Run, to_supervisor: ipc::Sender<IpcMessage>) { diff --git a/src/traft/event.rs b/src/traft/event.rs new file mode 100644 index 0000000000000000000000000000000000000000..a74e12218a6ec7b7ce718b955f47f0c21e8659f5 --- /dev/null +++ b/src/traft/event.rs @@ -0,0 +1,266 @@ +use std::borrow::Borrow; +use std::collections::{HashMap, LinkedList}; +use std::fmt::Write; +use std::rc::Rc; +use std::str::FromStr; +use std::time::Duration; + +use ::tarantool::fiber::{mutex::MutexGuard, Cond, Mutex}; +use ::tarantool::proc; +use ::tarantool::unwrap_or; + +use crate::traft::node::Error; +use crate::unwrap_ok_or; +use thiserror::Error; + +pub type BoxResult<T> = std::result::Result<T, Box<dyn std::error::Error>>; + +#[derive(Error, Debug)] +#[error("unknown event")] +pub struct EventFromStrError; + +macro_rules! define_events { + ($($event:tt, $str:literal;)+) => { + //////////////////////////////////////////////////////////////////////// + /// An enumeration of builtin events + #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash, PartialOrd, Ord)] + pub enum Event { + $( $event, )+ + } + + impl Event { + pub const fn as_str(&self) -> &str { + match self { + $( Self::$event => $str, )+ + } + } + } + + impl std::fmt::Display for Event { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.write_str(self.as_str()) + } + } + + impl FromStr for Event { + type Err = EventFromStrError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + $( $str => Ok(Self::$event), )+ + _ => Err(EventFromStrError), + } + } + } + } +} + +define_events! { + StatusChanged, "raft.status-changed"; + TopologyChanged, "raft.topology-changed"; +} + +//////////////////////////////////////////////////////////////////////////////// +/// Struct that stores information about event handlers +#[derive(Default)] +pub struct Events { + handlers: HashMap<Event, Handler>, + conds: HashMap<Event, Rc<Cond>>, +} + +impl Events { + // TODO: query/modify registered handlers + fn once_handler(&mut self, event: &Event) -> Option<Handler> { + self.handlers.remove(event) + } + + #[allow(dead_code)] + fn add_once_handler(&mut self, event: Event, handler: Handler) { + self.handlers.insert(event, handler); + } + + /// Signals to everybody who's waiting for this repeated `event`. + /// + /// **does not yield** + fn broadcast_repeated(&self, event: &Event) { + if let Some(cond) = self.conds.get(event) { + cond.broadcast() + } + } + + /// Returns a [`Cond`] which will be signalled every time the given `event` + /// occurs. Can be used to wait for a repeated event. + /// + /// [`Events`] should always be accessed via [`MutexGuard`], therefore this + /// function returns a `Rc<Cond>` rather then waiting on it, so that the + /// mutex guard can be released before the fiber yields. + /// + /// **does not yield** + fn regular_cond(&mut self, event: Event) -> Rc<Cond> { + self.conds + .entry(event) + .or_insert_with(|| Rc::new(Cond::new())) + .clone() + } +} + +//////////////////////////////////////////////////////////////////////////////// +// functions + +#[allow(dead_code)] +/// Waits for the event to happen or timeout to end. +/// +/// Returns an error if the `EVENTS` is uninitialized. +pub fn wait_timeout(event: Event, timeout: Duration) -> Result<(), Error> { + let mut events = events()?; + let cond = events.regular_cond(event); + // events must be released before yielding + drop(events); + cond.wait_timeout(timeout); + Ok(()) +} + +/// Waits for the event to happen. +/// +/// Returns an error if the `EVENTS` is uninitialized. +pub fn wait(event: Event) -> Result<(), Error> { + wait_timeout(event, Duration::MAX) +} + +#[allow(dead_code)] +/// Waits for any of the specified events to happen. +/// +/// Returns an error if the `EVENTS` is uninitialized. +pub fn wait_any_timeout(evs: &[Event], timeout: Duration) -> Result<(), Error> { + let mut events = events()?; + let cond = Rc::new(Cond::new()); + for &event in evs { + let cond = cond.clone(); + events.add_once_handler( + event, + handler(move || { + cond.broadcast(); + Ok(()) + }), + ); + } + // events must be released before yielding + drop(events); + cond.wait_timeout(timeout); + Ok(()) +} + +#[allow(dead_code)] +/// Waits for any of the specified events to happen. +/// +/// Returns an error if the `EVENTS` is uninitialized. +pub fn wait_any(evs: &[Event]) -> Result<(), Error> { + wait_any_timeout(evs, Duration::MAX) +} + +/// Signals to everybody who's waiting for this `event` either repeated or one +/// time. +/// +/// If `EVENTS` is uninitialized, nothing happens +pub fn broadcast(event: impl Borrow<Event>) { + let mut events = unwrap_ok_or!(events(), Err(_) => return); + events.broadcast_repeated(event.borrow()); + let handler = unwrap_or!(events.once_handler(event.borrow()), return); + if let Err(e) = handler.handle() { + crate::tlog!(Warning, "error happened during handling of event: {e}"; + "event" => event.borrow().as_str(), + ) + } +} + +//////////////////////////////////////////////////////////////////////////////// +/// Struct that handles an event +pub struct Handler { + // TODO: add ability to pass context to event handler + pub cbs: LinkedList<Box<dyn FnOnce() -> BoxResult<()>>>, +} + +impl Handler { + fn new<T>(cb: T) -> Self + where + T: 'static, + T: FnOnce() -> BoxResult<()>, + { + let mut cbs: LinkedList<Box<dyn FnOnce() -> BoxResult<()>>> = LinkedList::new(); + cbs.push_back(Box::new(cb)); + Self { cbs } + } + + #[allow(dead_code)] + /// Add a callback to this event handler + pub fn push<T>(&mut self, cb: T) + where + T: 'static, + T: FnOnce() -> BoxResult<()>, + { + self.cbs.push_back(Box::new(cb)); + } + + /// Handle the event. + pub fn handle(self) -> BoxResult<()> { + let (_, errs): (Vec<_>, Vec<_>) = self + .cbs + .into_iter() + .map(|cb| (cb)()) + .partition(|res| res.is_ok()); + match &errs[..] { + [] => Ok(()), + [_only_one_error] => errs.into_iter().next().unwrap(), + [..] => { + let mut msg = String::with_capacity(128); + writeln!(msg, "{} errors happened:", errs.len()).unwrap(); + for err in errs { + writeln!(msg, "{}", err.unwrap_err()).unwrap(); + } + Err(msg.into()) + } + } + } +} + +#[allow(dead_code)] +pub fn handler<T>(cb: T) -> Handler +where + T: 'static, + T: FnOnce() -> BoxResult<()>, +{ + Handler::new(cb) +} + +//////////////////////////////////////////////////////////////////////////////// +/// Global [`Events`] instance that handles all events received by the instance +static mut EVENTS: Option<Box<Mutex<Events>>> = None; + +/// Initialize the global [`Events`] singleton. **Should only be called once** +pub fn init() { + unsafe { + assert!(EVENTS.is_none(), "event::init() must be called only once"); + EVENTS = Some(Box::new(Mutex::new(Events::default()))); + } +} + +/// Acquire the global [`Events`] singleton. +pub fn events() -> Result<MutexGuard<'static, Events>, Error> { + if let Some(events) = unsafe { EVENTS.as_ref() } { + Ok(events.lock()) + } else { + Err(Error::EventsUninitialized) + } +} + +//////////////////////////////////////////////////////////////////////////////// +// proc +#[proc] +fn raft_event(event: String) -> BoxResult<()> { + let event = Event::from_str(&event).map_err(|e| format!("{e}: {event}"))?; + + let handler = events()? + .once_handler(&event) + .ok_or_else(|| format!("no handler registered for '{event}'"))?; + handler.handle() +} diff --git a/src/traft/mod.rs b/src/traft/mod.rs index a28645cbf1f9541e33299f4d5142b0e8c3e0b7ee..d875edeafc07fc38eb41b82e869d89d0e19efe78 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -1,6 +1,7 @@ //! Compatibility layer between Tarantool and `raft-rs`. mod error; +pub mod event; pub mod failover; mod network; pub mod node; diff --git a/src/traft/node.rs b/src/traft/node.rs index 44db300e677f1b5ad6855b8d2e8aad3e45c75bb2..ed16891202456bf3a77cad68d2e21f47e84fc36e 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -35,6 +35,8 @@ use std::iter::FromIterator as _; use crate::mailbox::Mailbox; use crate::tlog; use crate::traft; +use crate::traft::event; +use crate::traft::event::Event; use crate::traft::ConnectionPool; use crate::traft::LogicalClock; use crate::traft::Storage; @@ -119,6 +121,8 @@ impl std::fmt::Debug for Notify { pub enum Error { #[error("uninitialized yet")] Uninitialized, + #[error("events system is uninitialized yet")] + EventsUninitialized, #[error("timeout")] Timeout, #[error("{0}")] @@ -154,8 +158,6 @@ pub struct Node { main_inbox: Mailbox<NormalRequest>, // join_inbox: TopologyMailbox, status: Rc<RefCell<Status>>, - wake_up_status_observers: Rc<fiber::Cond>, - _wake_up_conf_change_loop: Rc<fiber::Cond>, } /// A request to the raft main loop. @@ -226,8 +228,6 @@ impl Node { pub const TICK: Duration = Duration::from_millis(100); pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> { - let wake_up_status_observers = Rc::new(fiber::Cond::new()); - let wake_up_conf_change_loop = Rc::new(fiber::Cond::new()); let main_inbox = Mailbox::<NormalRequest>::new(); let raw_node = RawNode::new(cfg, Storage, &tlog::root())?; let status = Rc::new(RefCell::new(Status { @@ -239,41 +239,19 @@ impl Node { let main_loop_fn = { let status = status.clone(); - let wake_up_status_observers = wake_up_status_observers.clone(); - let wake_up_conf_change_loop = wake_up_conf_change_loop.clone(); let main_inbox = main_inbox.clone(); - move || { - raft_main_loop( - main_inbox, - status, - wake_up_status_observers, - wake_up_conf_change_loop, - raw_node, - ) - } + move || raft_main_loop(main_inbox, status, raw_node) }; let conf_change_loop_fn = { let status = status.clone(); - let wake_up_status_observers = wake_up_status_observers.clone(); - let wake_up_conf_change_loop = wake_up_conf_change_loop.clone(); let main_inbox = main_inbox.clone(); - move || { - raft_conf_change_loop( - status, - wake_up_status_observers, - wake_up_conf_change_loop, - main_inbox, - ) - } + move || raft_conf_change_loop(status, main_inbox) }; let node = Node { main_inbox, - // join_inbox, status, - wake_up_status_observers, - _wake_up_conf_change_loop: wake_up_conf_change_loop, _main_loop: fiber::Builder::new() .name("raft_main_loop") .proc(main_loop_fn) @@ -297,11 +275,11 @@ impl Node { pub fn mark_as_ready(&self) { self.status.borrow_mut().is_ready = true; - self.wake_up_status_observers.broadcast(); + event::broadcast(Event::StatusChanged); } pub fn wait_status(&self) { - self.wake_up_status_observers.wait(); + event::wait(Event::StatusChanged).expect("Events system wasn't initialized"); } pub fn read_index(&self, timeout: Duration) -> Result<u64, Error> { @@ -537,8 +515,6 @@ fn handle_messages(messages: Vec<raft::Message>, pool: &ConnectionPool) { fn raft_main_loop( main_inbox: Mailbox<NormalRequest>, status: Rc<RefCell<Status>>, - wake_up_status_observers: Rc<fiber::Cond>, - wake_up_conf_change_loop: Rc<fiber::Cond>, mut raw_node: RawNode, ) { let mut next_tick = Instant::now() + Node::TICK; @@ -821,7 +797,7 @@ fn raft_main_loop( id => Some(id), }; status.raft_state = format!("{:?}", ss.raft_state); - wake_up_status_observers.broadcast(); + event::broadcast(Event::StatusChanged); } if !ready.persisted_messages().is_empty() { @@ -868,7 +844,7 @@ fn raft_main_loop( .unwrap(); if topology_changed { - wake_up_conf_change_loop.broadcast(); + event::broadcast(Event::TopologyChanged); if let Some(peer) = traft::Storage::peer_by_raft_id(raw_node.raft.id).unwrap() { let mut box_cfg = crate::tarantool::cfg().unwrap(); assert_eq!(box_cfg.replication_connect_quorum, 0); @@ -880,15 +856,10 @@ fn raft_main_loop( } } -fn raft_conf_change_loop( - status: Rc<RefCell<Status>>, - wake_up_status_observers: Rc<fiber::Cond>, - wake_up_conf_change_loop: Rc<fiber::Cond>, - main_inbox: Mailbox<NormalRequest>, -) { +fn raft_conf_change_loop(status: Rc<RefCell<Status>>, main_inbox: Mailbox<NormalRequest>) { loop { if status.borrow().raft_state != "Leader" { - wake_up_status_observers.wait(); + event::wait(Event::StatusChanged).expect("Events system must be initialized"); continue; } @@ -917,7 +888,7 @@ fn raft_conf_change_loop( } if changes.is_empty() { - wake_up_conf_change_loop.wait(); + event::wait(Event::TopologyChanged).expect("Events system must be initialized"); continue; } diff --git a/src/util.rs b/src/util.rs index fcffbf25e75583926b56d5808253a2c91e12f1f4..125de8e64a3e6c0b54c7c15c7848f4e178fcac15 100644 --- a/src/util.rs +++ b/src/util.rs @@ -3,3 +3,13 @@ pub enum Either<L, R> { Left(L), Right(R), } + +#[macro_export] +macro_rules! unwrap_ok_or { + ($o:expr, $err:pat => $($else:tt)+) => { + match $o { + Ok(v) => v, + $err => $($else)+, + } + } +}