From 0a1c150fe4f4eed2c4406c5a3beb87c22c6a1ebe Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Thu, 30 Jun 2022 18:24:54 +0300
Subject: [PATCH] refactor: use events instead of conds

---
 src/main.rs        |   1 +
 src/traft/event.rs | 266 +++++++++++++++++++++++++++++++++++++++++++++
 src/traft/mod.rs   |   1 +
 src/traft/node.rs  |  55 +++-------
 src/util.rs        |  10 ++
 5 files changed, 291 insertions(+), 42 deletions(-)
 create mode 100644 src/traft/event.rs

diff --git a/src/main.rs b/src/main.rs
index b7a1689e06..851940cbe3 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -357,6 +357,7 @@ fn init_common(args: &args::Run, cfg: &tarantool::Cfg) {
 
     traft::Storage::init_schema();
     init_handlers();
+    traft::event::init();
 }
 
 fn start_discover(args: &args::Run, to_supervisor: ipc::Sender<IpcMessage>) {
diff --git a/src/traft/event.rs b/src/traft/event.rs
new file mode 100644
index 0000000000..a74e12218a
--- /dev/null
+++ b/src/traft/event.rs
@@ -0,0 +1,266 @@
+use std::borrow::Borrow;
+use std::collections::{HashMap, LinkedList};
+use std::fmt::Write;
+use std::rc::Rc;
+use std::str::FromStr;
+use std::time::Duration;
+
+use ::tarantool::fiber::{mutex::MutexGuard, Cond, Mutex};
+use ::tarantool::proc;
+use ::tarantool::unwrap_or;
+
+use crate::traft::node::Error;
+use crate::unwrap_ok_or;
+use thiserror::Error;
+
+pub type BoxResult<T> = std::result::Result<T, Box<dyn std::error::Error>>;
+
+#[derive(Error, Debug)]
+#[error("unknown event")]
+pub struct EventFromStrError;
+
+macro_rules! define_events {
+    ($($event:tt, $str:literal;)+) => {
+        ////////////////////////////////////////////////////////////////////////
+        /// An enumeration of builtin events
+        #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash, PartialOrd, Ord)]
+        pub enum Event {
+            $( $event, )+
+        }
+
+        impl Event {
+            pub const fn as_str(&self) -> &str {
+                match self {
+                    $( Self::$event => $str, )+
+                }
+            }
+        }
+
+        impl std::fmt::Display for Event {
+            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+                f.write_str(self.as_str())
+            }
+        }
+
+        impl FromStr for Event {
+            type Err = EventFromStrError;
+
+            fn from_str(s: &str) -> Result<Self, Self::Err> {
+                match s {
+                    $( $str => Ok(Self::$event), )+
+                    _ => Err(EventFromStrError),
+                }
+            }
+        }
+    }
+}
+
+define_events! {
+    StatusChanged, "raft.status-changed";
+    TopologyChanged, "raft.topology-changed";
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/// Struct that stores information about event handlers
+#[derive(Default)]
+pub struct Events {
+    handlers: HashMap<Event, Handler>,
+    conds: HashMap<Event, Rc<Cond>>,
+}
+
+impl Events {
+    // TODO: query/modify registered handlers
+    fn once_handler(&mut self, event: &Event) -> Option<Handler> {
+        self.handlers.remove(event)
+    }
+
+    #[allow(dead_code)]
+    fn add_once_handler(&mut self, event: Event, handler: Handler) {
+        self.handlers.insert(event, handler);
+    }
+
+    /// Signals to everybody who's waiting for this repeated `event`.
+    ///
+    /// **does not yield**
+    fn broadcast_repeated(&self, event: &Event) {
+        if let Some(cond) = self.conds.get(event) {
+            cond.broadcast()
+        }
+    }
+
+    /// Returns a [`Cond`] which will be signalled every time the given `event`
+    /// occurs. Can be used to wait for a repeated event.
+    ///
+    /// [`Events`] should always be accessed via [`MutexGuard`], therefore this
+    /// function returns a `Rc<Cond>` rather then waiting on it, so that the
+    /// mutex guard can be released before the fiber yields.
+    ///
+    /// **does not yield**
+    fn regular_cond(&mut self, event: Event) -> Rc<Cond> {
+        self.conds
+            .entry(event)
+            .or_insert_with(|| Rc::new(Cond::new()))
+            .clone()
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// functions
+
+#[allow(dead_code)]
+/// Waits for the event to happen or timeout to end.
+///
+/// Returns an error if the `EVENTS` is uninitialized.
+pub fn wait_timeout(event: Event, timeout: Duration) -> Result<(), Error> {
+    let mut events = events()?;
+    let cond = events.regular_cond(event);
+    // events must be released before yielding
+    drop(events);
+    cond.wait_timeout(timeout);
+    Ok(())
+}
+
+/// Waits for the event to happen.
+///
+/// Returns an error if the `EVENTS` is uninitialized.
+pub fn wait(event: Event) -> Result<(), Error> {
+    wait_timeout(event, Duration::MAX)
+}
+
+#[allow(dead_code)]
+/// Waits for any of the specified events to happen.
+///
+/// Returns an error if the `EVENTS` is uninitialized.
+pub fn wait_any_timeout(evs: &[Event], timeout: Duration) -> Result<(), Error> {
+    let mut events = events()?;
+    let cond = Rc::new(Cond::new());
+    for &event in evs {
+        let cond = cond.clone();
+        events.add_once_handler(
+            event,
+            handler(move || {
+                cond.broadcast();
+                Ok(())
+            }),
+        );
+    }
+    // events must be released before yielding
+    drop(events);
+    cond.wait_timeout(timeout);
+    Ok(())
+}
+
+#[allow(dead_code)]
+/// Waits for any of the specified events to happen.
+///
+/// Returns an error if the `EVENTS` is uninitialized.
+pub fn wait_any(evs: &[Event]) -> Result<(), Error> {
+    wait_any_timeout(evs, Duration::MAX)
+}
+
+/// Signals to everybody who's waiting for this `event` either repeated or one
+/// time.
+///
+/// If `EVENTS` is uninitialized, nothing happens
+pub fn broadcast(event: impl Borrow<Event>) {
+    let mut events = unwrap_ok_or!(events(), Err(_) => return);
+    events.broadcast_repeated(event.borrow());
+    let handler = unwrap_or!(events.once_handler(event.borrow()), return);
+    if let Err(e) = handler.handle() {
+        crate::tlog!(Warning, "error happened during handling of event: {e}";
+            "event" => event.borrow().as_str(),
+        )
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/// Struct that handles an event
+pub struct Handler {
+    // TODO: add ability to pass context to event handler
+    pub cbs: LinkedList<Box<dyn FnOnce() -> BoxResult<()>>>,
+}
+
+impl Handler {
+    fn new<T>(cb: T) -> Self
+    where
+        T: 'static,
+        T: FnOnce() -> BoxResult<()>,
+    {
+        let mut cbs: LinkedList<Box<dyn FnOnce() -> BoxResult<()>>> = LinkedList::new();
+        cbs.push_back(Box::new(cb));
+        Self { cbs }
+    }
+
+    #[allow(dead_code)]
+    /// Add a callback to this event handler
+    pub fn push<T>(&mut self, cb: T)
+    where
+        T: 'static,
+        T: FnOnce() -> BoxResult<()>,
+    {
+        self.cbs.push_back(Box::new(cb));
+    }
+
+    /// Handle the event.
+    pub fn handle(self) -> BoxResult<()> {
+        let (_, errs): (Vec<_>, Vec<_>) = self
+            .cbs
+            .into_iter()
+            .map(|cb| (cb)())
+            .partition(|res| res.is_ok());
+        match &errs[..] {
+            [] => Ok(()),
+            [_only_one_error] => errs.into_iter().next().unwrap(),
+            [..] => {
+                let mut msg = String::with_capacity(128);
+                writeln!(msg, "{} errors happened:", errs.len()).unwrap();
+                for err in errs {
+                    writeln!(msg, "{}", err.unwrap_err()).unwrap();
+                }
+                Err(msg.into())
+            }
+        }
+    }
+}
+
+#[allow(dead_code)]
+pub fn handler<T>(cb: T) -> Handler
+where
+    T: 'static,
+    T: FnOnce() -> BoxResult<()>,
+{
+    Handler::new(cb)
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/// Global [`Events`] instance that handles all events received by the instance
+static mut EVENTS: Option<Box<Mutex<Events>>> = None;
+
+/// Initialize the global [`Events`] singleton. **Should only be called once**
+pub fn init() {
+    unsafe {
+        assert!(EVENTS.is_none(), "event::init() must be called only once");
+        EVENTS = Some(Box::new(Mutex::new(Events::default())));
+    }
+}
+
+/// Acquire the global [`Events`] singleton.
+pub fn events() -> Result<MutexGuard<'static, Events>, Error> {
+    if let Some(events) = unsafe { EVENTS.as_ref() } {
+        Ok(events.lock())
+    } else {
+        Err(Error::EventsUninitialized)
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// proc
+#[proc]
+fn raft_event(event: String) -> BoxResult<()> {
+    let event = Event::from_str(&event).map_err(|e| format!("{e}: {event}"))?;
+
+    let handler = events()?
+        .once_handler(&event)
+        .ok_or_else(|| format!("no handler registered for '{event}'"))?;
+    handler.handle()
+}
diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index a28645cbf1..d875edeafc 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -1,6 +1,7 @@
 //! Compatibility layer between Tarantool and `raft-rs`.
 
 mod error;
+pub mod event;
 pub mod failover;
 mod network;
 pub mod node;
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 44db300e67..ed16891202 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -35,6 +35,8 @@ use std::iter::FromIterator as _;
 use crate::mailbox::Mailbox;
 use crate::tlog;
 use crate::traft;
+use crate::traft::event;
+use crate::traft::event::Event;
 use crate::traft::ConnectionPool;
 use crate::traft::LogicalClock;
 use crate::traft::Storage;
@@ -119,6 +121,8 @@ impl std::fmt::Debug for Notify {
 pub enum Error {
     #[error("uninitialized yet")]
     Uninitialized,
+    #[error("events system is uninitialized yet")]
+    EventsUninitialized,
     #[error("timeout")]
     Timeout,
     #[error("{0}")]
@@ -154,8 +158,6 @@ pub struct Node {
     main_inbox: Mailbox<NormalRequest>,
     // join_inbox: TopologyMailbox,
     status: Rc<RefCell<Status>>,
-    wake_up_status_observers: Rc<fiber::Cond>,
-    _wake_up_conf_change_loop: Rc<fiber::Cond>,
 }
 
 /// A request to the raft main loop.
@@ -226,8 +228,6 @@ impl Node {
     pub const TICK: Duration = Duration::from_millis(100);
 
     pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> {
-        let wake_up_status_observers = Rc::new(fiber::Cond::new());
-        let wake_up_conf_change_loop = Rc::new(fiber::Cond::new());
         let main_inbox = Mailbox::<NormalRequest>::new();
         let raw_node = RawNode::new(cfg, Storage, &tlog::root())?;
         let status = Rc::new(RefCell::new(Status {
@@ -239,41 +239,19 @@ impl Node {
 
         let main_loop_fn = {
             let status = status.clone();
-            let wake_up_status_observers = wake_up_status_observers.clone();
-            let wake_up_conf_change_loop = wake_up_conf_change_loop.clone();
             let main_inbox = main_inbox.clone();
-            move || {
-                raft_main_loop(
-                    main_inbox,
-                    status,
-                    wake_up_status_observers,
-                    wake_up_conf_change_loop,
-                    raw_node,
-                )
-            }
+            move || raft_main_loop(main_inbox, status, raw_node)
         };
 
         let conf_change_loop_fn = {
             let status = status.clone();
-            let wake_up_status_observers = wake_up_status_observers.clone();
-            let wake_up_conf_change_loop = wake_up_conf_change_loop.clone();
             let main_inbox = main_inbox.clone();
-            move || {
-                raft_conf_change_loop(
-                    status,
-                    wake_up_status_observers,
-                    wake_up_conf_change_loop,
-                    main_inbox,
-                )
-            }
+            move || raft_conf_change_loop(status, main_inbox)
         };
 
         let node = Node {
             main_inbox,
-            // join_inbox,
             status,
-            wake_up_status_observers,
-            _wake_up_conf_change_loop: wake_up_conf_change_loop,
             _main_loop: fiber::Builder::new()
                 .name("raft_main_loop")
                 .proc(main_loop_fn)
@@ -297,11 +275,11 @@ impl Node {
 
     pub fn mark_as_ready(&self) {
         self.status.borrow_mut().is_ready = true;
-        self.wake_up_status_observers.broadcast();
+        event::broadcast(Event::StatusChanged);
     }
 
     pub fn wait_status(&self) {
-        self.wake_up_status_observers.wait();
+        event::wait(Event::StatusChanged).expect("Events system wasn't initialized");
     }
 
     pub fn read_index(&self, timeout: Duration) -> Result<u64, Error> {
@@ -537,8 +515,6 @@ fn handle_messages(messages: Vec<raft::Message>, pool: &ConnectionPool) {
 fn raft_main_loop(
     main_inbox: Mailbox<NormalRequest>,
     status: Rc<RefCell<Status>>,
-    wake_up_status_observers: Rc<fiber::Cond>,
-    wake_up_conf_change_loop: Rc<fiber::Cond>,
     mut raw_node: RawNode,
 ) {
     let mut next_tick = Instant::now() + Node::TICK;
@@ -821,7 +797,7 @@ fn raft_main_loop(
                     id => Some(id),
                 };
                 status.raft_state = format!("{:?}", ss.raft_state);
-                wake_up_status_observers.broadcast();
+                event::broadcast(Event::StatusChanged);
             }
 
             if !ready.persisted_messages().is_empty() {
@@ -868,7 +844,7 @@ fn raft_main_loop(
         .unwrap();
 
         if topology_changed {
-            wake_up_conf_change_loop.broadcast();
+            event::broadcast(Event::TopologyChanged);
             if let Some(peer) = traft::Storage::peer_by_raft_id(raw_node.raft.id).unwrap() {
                 let mut box_cfg = crate::tarantool::cfg().unwrap();
                 assert_eq!(box_cfg.replication_connect_quorum, 0);
@@ -880,15 +856,10 @@ fn raft_main_loop(
     }
 }
 
-fn raft_conf_change_loop(
-    status: Rc<RefCell<Status>>,
-    wake_up_status_observers: Rc<fiber::Cond>,
-    wake_up_conf_change_loop: Rc<fiber::Cond>,
-    main_inbox: Mailbox<NormalRequest>,
-) {
+fn raft_conf_change_loop(status: Rc<RefCell<Status>>, main_inbox: Mailbox<NormalRequest>) {
     loop {
         if status.borrow().raft_state != "Leader" {
-            wake_up_status_observers.wait();
+            event::wait(Event::StatusChanged).expect("Events system must be initialized");
             continue;
         }
 
@@ -917,7 +888,7 @@ fn raft_conf_change_loop(
         }
 
         if changes.is_empty() {
-            wake_up_conf_change_loop.wait();
+            event::wait(Event::TopologyChanged).expect("Events system must be initialized");
             continue;
         }
 
diff --git a/src/util.rs b/src/util.rs
index fcffbf25e7..125de8e64a 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -3,3 +3,13 @@ pub enum Either<L, R> {
     Left(L),
     Right(R),
 }
+
+#[macro_export]
+macro_rules! unwrap_ok_or {
+    ($o:expr, $err:pat => $($else:tt)+) => {
+        match $o {
+            Ok(v) => v,
+            $err => $($else)+,
+        }
+    }
+}
-- 
GitLab