diff --git a/src/lib.rs b/src/lib.rs index 1012f6d31fd902e520c1ee6fb256a71a8d3150bd..e69260ba50a813636c26f9666b4bd4bd48b202ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -127,7 +127,7 @@ fn picolib_setup(args: &args::Run) { ); luamod.set( "raft_propose", - tlua::function1(|lua: tlua::LuaState| -> traft::Result<()> { + tlua::function1(|lua: tlua::LuaState| -> traft::Result<RaftIndex> { use tlua::{AnyLuaString, AsLua, LuaError, LuaTable}; let lua = unsafe { tlua::Lua::from_static(lua) }; let t: LuaTable<_> = AsLua::read(&lua).map_err(|(_, e)| LuaError::from(e))?; @@ -135,7 +135,14 @@ fn picolib_setup(args: &args::Run) { .eval_with("return require 'msgpack'.encode(...)", &t) .map_err(LuaError::from)?; let op: Op = Decode::decode(mp.as_bytes())?; - traft::node::global()?.propose_and_wait(op, Duration::from_secs(1)) + + let node = traft::node::global()?; + let mut node_impl = node.node_impl(); + let index = node_impl.propose(op)?; + node.main_loop.wakeup(); + // Release the lock + drop(node_impl); + Ok(index) }), ); luamod.set( diff --git a/src/traft/node.rs b/src/traft/node.rs index 23153a1ee16946832bea40d603d4574c0f3aedc0..d6a2c6ae4927d1c0ef5212f91a229f699a3d7e61 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -128,7 +128,7 @@ pub struct Node { node_impl: Rc<Mutex<NodeImpl>>, pub(crate) storage: Clusterwide, pub(crate) raft_storage: RaftSpaceAccess, - main_loop: MainLoop, + pub(crate) main_loop: MainLoop, pub(crate) governor_loop: governor::Loop, status: watch::Receiver<Status>, watchers: Rc<Mutex<StorageWatchers>>, @@ -486,6 +486,18 @@ impl NodeImpl { Ok(notify) } + /// Proposes a raft entry to be appended to the log and returns raft index + /// at which it is expected to be committed unless it gets rejected. + /// + /// **Doesn't yield** + pub fn propose(&mut self, op: Op) -> Result<RaftIndex, RaftError> { + self.lc.inc(); + let ctx = traft::EntryContextNormal::new(self.lc, op); + self.raw_node.propose(ctx.to_bytes(), vec![])?; + let index = self.raw_node.raft.raft_log.last_index(); + Ok(index) + } + pub fn campaign(&mut self) -> Result<(), RaftError> { self.raw_node.campaign() } @@ -1275,7 +1287,7 @@ impl NodeImpl { } } -struct MainLoop { +pub(crate) struct MainLoop { _loop: Option<fiber::UnitJoinHandle<'static>>, loop_waker: watch::Sender<()>, stop_flag: Rc<Cell<bool>>,