From 95124e3825f7df734bc6a14a7ddce4c77cb299db Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Thu, 11 Aug 2022 15:06:41 +0300 Subject: [PATCH] refactor: yielding functions now have "wait" or "yield" in their name --- src/main.rs | 18 ++++++++++-------- src/traft/failover.rs | 2 +- src/traft/node.rs | 27 +++++++++++++++------------ 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1cfdc2886b..50ec47e4d2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -58,20 +58,21 @@ fn picolib_setup(args: &args::Run) { luamod.set( "raft_tick", tlua::function1(|n_times: u32| -> Result<(), Error> { - traft::node::global()?.tick(n_times); + traft::node::global()?.tick_and_yield(n_times); Ok(()) }), ); luamod.set( "raft_read_index", tlua::function1(|timeout: f64| -> Result<RaftIndex, Error> { - traft::node::global()?.read_index(Duration::from_secs_f64(timeout)) + traft::node::global()?.wait_for_read_state(Duration::from_secs_f64(timeout)) }), ); luamod.set( "raft_propose_info", tlua::function1(|x: String| -> Result<(), Error> { - traft::node::global()?.propose(traft::Op::Info { msg: x }, Duration::from_secs(1)) + traft::node::global()? + .propose_and_wait(traft::Op::Info { msg: x }, Duration::from_secs(1)) }), ); luamod.set( @@ -98,7 +99,7 @@ fn picolib_setup(args: &args::Run) { |x: String, opts: Option<ProposeEvalOpts>| -> Result<(), Error> { let timeout = opts.and_then(|opts| opts.timeout).unwrap_or(10.0); traft::node::global()? - .propose( + .propose_and_wait( traft::OpEvalLua { code: x }, Duration::from_secs_f64(timeout), ) @@ -109,7 +110,8 @@ fn picolib_setup(args: &args::Run) { luamod.set( "raft_return_one", tlua::function1(|timeout: f64| -> Result<u8, Error> { - traft::node::global()?.propose(traft::OpReturnOne, Duration::from_secs_f64(timeout)) + traft::node::global()? + .propose_and_wait(traft::OpReturnOne, Duration::from_secs_f64(timeout)) }), ); { @@ -721,8 +723,8 @@ fn postjoin(args: &args::Run) { ) ); - node.tick(1); // apply configuration, if any - node.campaign().ok(); // trigger election immediately + node.tick_and_yield(1); // apply configuration, if any + node.campaign_and_yield().ok(); // trigger election immediately assert_eq!(node.status().raft_state, "Leader"); } @@ -741,7 +743,7 @@ fn postjoin(args: &args::Run) { } let timeout = Duration::from_secs(10); - if let Err(e) = node.read_index(timeout) { + if let Err(e) = node.wait_for_read_state(timeout) { tlog!(Debug, "unable to get a read barrier: {e}"); fiber::sleep(Duration::from_millis(100)); continue; diff --git a/src/traft/failover.rs b/src/traft/failover.rs index a1b9fb5c23..04386cb37a 100644 --- a/src/traft/failover.rs +++ b/src/traft/failover.rs @@ -83,7 +83,7 @@ fn raft_update_peer( })); } - node.handle_topology_request(req.into())?; + node.handle_topology_request_and_wait(req.into())?; Ok(UpdatePeerResponse {}) } diff --git a/src/traft/node.rs b/src/traft/node.rs index ce99135ec8..9110a96d2d 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -140,7 +140,7 @@ impl Node { }; // Wait for the node to enter the main loop - node.tick(0); + node.tick_and_yield(0); Ok(node) } @@ -160,7 +160,7 @@ impl Node { } /// **This function yields** - pub fn read_index(&self, timeout: Duration) -> Result<RaftIndex, Error> { + pub fn wait_for_read_state(&self, timeout: Duration) -> Result<RaftIndex, Error> { self.raw_operation(|raw_node| { // In some states `raft-rs` ignores the ReadIndex request. // Check it preliminary, don't wait for the timeout. @@ -188,7 +188,7 @@ impl Node { /// Propose an operation and wait for it's result. /// **This function yields** - pub fn propose<T: OpResult + Into<traft::Op>>( + pub fn propose_and_wait<T: OpResult + Into<traft::Op>>( &self, op: T, timeout: Duration, @@ -205,7 +205,7 @@ impl Node { /// Become a candidate and wait for a main loop round so that there's a /// chance we become the leader. /// **This function yields** - pub fn campaign(&self) -> Result<(), Error> { + pub fn campaign_and_yield(&self) -> Result<(), Error> { self.raw_operation(|raw_node| raw_node.campaign().map_err(Into::into))?; // Even though we don't expect a response, we still should let the // main_loop do an iteration. Without rescheduling, the Ready state @@ -216,7 +216,7 @@ impl Node { } /// **This function yields** - pub fn step(&self, msg: raft::Message) { + pub fn step_and_yield(&self, msg: raft::Message) { self.raw_operation(|raw_node| { if msg.to != raw_node.raft.id { return Ok(()); @@ -235,7 +235,7 @@ impl Node { } /// **This function yields** - pub fn tick(&self, n_times: u32) { + pub fn tick_and_yield(&self, n_times: u32) { self.raw_operation(|raw_node| { for _ in 0..n_times { raw_node.tick(); @@ -250,7 +250,7 @@ impl Node { /// **This function yields** pub fn timeout_now(&self) { - self.step(raft::Message { + self.step_and_yield(raft::Message { to: self.raft_id, from: self.raft_id, msg_type: raft::MessageType::MsgTimeoutNow, @@ -264,7 +264,10 @@ impl Node { /// Returns an error if the callee node isn't a Raft leader. /// /// **This function yields** - pub fn handle_topology_request(&self, req: TopologyRequest) -> Result<traft::Peer, Error> { + pub fn handle_topology_request_and_wait( + &self, + req: TopologyRequest, + ) -> Result<traft::Peer, Error> { self.raw_operation(|raw_node| { if raw_node.raft.state != RaftStateRole::Leader { return Err(RaftError::ConfChangeError("not a leader".into()).into()); @@ -320,7 +323,7 @@ impl Node { /// Only the conf_change_loop on a leader is eligible to call this function. /// /// **This function yields** - fn propose_conf_change( + fn propose_conf_change_and_wait( &self, term: RaftTerm, conf_change: raft::ConfChangeV2, @@ -828,7 +831,7 @@ fn raft_conf_change_loop(status: Rc<RefCell<Status>>) { // will sometimes be handled and there's no need in timeout. // It also guarantees that the notification will arrive only // after the node leaves the joint state. - match node.propose_conf_change(term, conf_change) { + match node.propose_conf_change_and_wait(term, conf_change) { Ok(()) => tlog!(Info, "conf_change processed"), Err(e) => { tlog!(Warning, "conf_change failed: {e}"); @@ -875,7 +878,7 @@ pub fn global() -> Result<&'static Node, Error> { fn raft_interact(pbs: Vec<traft::MessagePb>) -> Result<(), Box<dyn std::error::Error>> { let node = global()?; for pb in pbs { - node.step(raft::Message::try_from(pb)?); + node.step_and_yield(raft::Message::try_from(pb)?); } Ok(()) } @@ -893,7 +896,7 @@ fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn std::error::Error })); } - let peer = node.handle_topology_request(req.into())?; + let peer = node.handle_topology_request_and_wait(req.into())?; let box_replication = Storage::box_replication(&peer.replicaset_id, Some(peer.commit_index))?; // A joined peer needs to communicate with other nodes. -- GitLab