From 5a6eaa97ac4312010d8e961076cf3f198a727c51 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Tue, 2 May 2023 14:44:28 +0300 Subject: [PATCH] feat: NodeImpl::propose which returns entry index instead of a Notify --- src/lib.rs | 11 +++++++++-- src/traft/node.rs | 16 ++++++++++++++-- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 1012f6d31f..e69260ba50 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -127,7 +127,7 @@ fn picolib_setup(args: &args::Run) { ); luamod.set( "raft_propose", - tlua::function1(|lua: tlua::LuaState| -> traft::Result<()> { + tlua::function1(|lua: tlua::LuaState| -> traft::Result<RaftIndex> { use tlua::{AnyLuaString, AsLua, LuaError, LuaTable}; let lua = unsafe { tlua::Lua::from_static(lua) }; let t: LuaTable<_> = AsLua::read(&lua).map_err(|(_, e)| LuaError::from(e))?; @@ -135,7 +135,14 @@ fn picolib_setup(args: &args::Run) { .eval_with("return require 'msgpack'.encode(...)", &t) .map_err(LuaError::from)?; let op: Op = Decode::decode(mp.as_bytes())?; - traft::node::global()?.propose_and_wait(op, Duration::from_secs(1)) + + let node = traft::node::global()?; + let mut node_impl = node.node_impl(); + let index = node_impl.propose(op)?; + node.main_loop.wakeup(); + // Release the lock + drop(node_impl); + Ok(index) }), ); luamod.set( diff --git a/src/traft/node.rs b/src/traft/node.rs index 23153a1ee1..d6a2c6ae49 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -128,7 +128,7 @@ pub struct Node { node_impl: Rc<Mutex<NodeImpl>>, pub(crate) storage: Clusterwide, pub(crate) raft_storage: RaftSpaceAccess, - main_loop: MainLoop, + pub(crate) main_loop: MainLoop, pub(crate) governor_loop: governor::Loop, status: watch::Receiver<Status>, watchers: Rc<Mutex<StorageWatchers>>, @@ -486,6 +486,18 @@ impl NodeImpl { Ok(notify) } + /// Proposes a raft entry to be appended to the log and returns raft index + /// at which it is expected to be committed unless it gets rejected. + /// + /// **Doesn't yield** + pub fn propose(&mut self, op: Op) -> Result<RaftIndex, RaftError> { + self.lc.inc(); + let ctx = traft::EntryContextNormal::new(self.lc, op); + self.raw_node.propose(ctx.to_bytes(), vec![])?; + let index = self.raw_node.raft.raft_log.last_index(); + Ok(index) + } + pub fn campaign(&mut self) -> Result<(), RaftError> { self.raw_node.campaign() } @@ -1275,7 +1287,7 @@ impl NodeImpl { } } -struct MainLoop { +pub(crate) struct MainLoop { _loop: Option<fiber::UnitJoinHandle<'static>>, loop_waker: watch::Sender<()>, stop_flag: Rc<Cell<bool>>, -- GitLab