From 3dfe5d3107bd4e2bd8da13a7caca8345744abbc1 Mon Sep 17 00:00:00 2001 From: Yaroslav Dynnikov <yaroslav.dynnikov@gmail.com> Date: Fri, 11 Feb 2022 19:33:52 +0300 Subject: [PATCH] refactor: reorganize traft::Node code 1. Merge `on_ready` handler with the `loop_fn` body into the larger `raft_main` function. 2. Get rid of debug logs. --- picolib/traft/node.rs | 176 +++++++++++++++++++----------------------- 1 file changed, 79 insertions(+), 97 deletions(-) diff --git a/picolib/traft/node.rs b/picolib/traft/node.rs index 1f6a78bae9..9c6210904a 100644 --- a/picolib/traft/node.rs +++ b/picolib/traft/node.rs @@ -25,43 +25,14 @@ enum Request { impl Node { pub const TICK: Duration = Duration::from_millis(100); - pub fn new(cfg: &raft::Config, handle_committed_data: fn(&[u8])) -> Result<Self, RaftError> { - let logger = tlog::root(); - let mut raw_node = RawNode::new(cfg, Storage, &logger)?; - - let (tx, rx) = fiber::Channel::new(0).into_clones(); - - let loop_fn = move || { - let mut next_tick = Instant::now() + Self::TICK; - - loop { - use Request::*; - match rx.recv_timeout(Self::TICK) { - Ok(Propose(data)) => { - raw_node.propose(vec![], data).unwrap(); - } - Ok(Step(msg)) => { - raw_node.step(msg).unwrap(); - } - Err(fiber::RecvError::Timeout) => (), - Err(fiber::RecvError::Disconnected) => unreachable!(), - } - - let now = Instant::now(); - if now > next_tick { - next_tick = now + Self::TICK; - raw_node.tick(); - } - - if raw_node.has_ready() { - on_ready(&mut raw_node, handle_committed_data); - } - } - }; + pub fn new(cfg: &raft::Config, on_commit: fn(&[u8])) -> Result<Self, RaftError> { + let raw_node = RawNode::new(cfg, Storage, &tlog::root())?; + let (inbox, inbox_clone) = fiber::Channel::new(0).into_clones(); + let loop_fn = move || raft_main(inbox_clone, raw_node, on_commit); Ok(Node { + inbox, _main_loop: fiber::defer_proc(loop_fn), - inbox: tx, }) } @@ -76,84 +47,95 @@ impl Node { } } -fn on_ready(raft_group: &mut RawNode, handle_committed_data: fn(&[u8])) { - tlog!(Debug, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv"); +fn raft_main(inbox: fiber::Channel<Request>, mut raw_node: RawNode, on_commit: fn(&[u8])) { + let mut next_tick = Instant::now() + Node::TICK; - // Get the `Ready` with `RawNode::ready` interface. - let mut ready: raft::Ready = raft_group.ready(); - tlog!(Debug, "--- {:?}", ready); + loop { + match inbox.recv_timeout(Node::TICK) { + Ok(Request::Propose(data)) => { + raw_node.propose(vec![], data).unwrap(); + } + Ok(Request::Step(msg)) => { + raw_node.step(msg).unwrap(); + } + Err(fiber::RecvError::Timeout) => (), + Err(fiber::RecvError::Disconnected) => unreachable!(), + } - let handle_messages = |msgs: Vec<raft::Message>| { - for _msg in msgs { - tlog!(Debug, "--- handle message: {:?}", _msg); - // Send messages to other peers. + let now = Instant::now(); + if now > next_tick { + next_tick = now + Node::TICK; + raw_node.tick(); } - }; - if !ready.messages().is_empty() { - // Send out the messages come from the node. - handle_messages(ready.take_messages()); - } + // Get the `Ready` with `RawNode::ready` interface. + if !raw_node.has_ready() { + continue; + } - if !ready.snapshot().is_empty() { - // This is a snapshot, we need to apply the snapshot at first. - let snap = ready.snapshot().clone(); - tlog!(Debug, "--- apply_snapshot: {:?}", snap); - unimplemented!(); - // store.wl().apply_snapshot(snap).unwrap(); - } + let mut ready: raft::Ready = raw_node.ready(); - let handle_committed_entries = |committed_entries: Vec<raft::Entry>| { - for entry in committed_entries { - tlog!(Debug, "--- committed_entry: {:?}", entry); - Storage::persist_applied(entry.index); + let handle_messages = |msgs: Vec<raft::Message>| { + for _msg in msgs { + // Send messages to other peers. + } + }; - if entry.get_entry_type() == raft::EntryType::EntryNormal { - handle_committed_data(entry.get_data()) + if !ready.messages().is_empty() { + // Send out the messages come from the node. + handle_messages(ready.take_messages()); + } + + if !ready.snapshot().is_empty() { + // This is a snapshot, we need to apply the snapshot at first. + unimplemented!(); + } + + let handle_committed_entries = |committed_entries: Vec<raft::Entry>| { + for entry in committed_entries { + Storage::persist_applied(entry.index); + + if entry.get_entry_type() == raft::EntryType::EntryNormal { + on_commit(entry.get_data()) + } + + // TODO: handle EntryConfChange } + }; + + handle_committed_entries(ready.take_committed_entries()); - // TODO: handle EntryConfChange + if !ready.entries().is_empty() { + // Append entries to the Raft log. + Storage::persist_entries(ready.entries()); } - }; - handle_committed_entries(ready.take_committed_entries()); - - if !ready.entries().is_empty() { - // Append entries to the Raft log. - let entries = ready.entries(); - for entry in entries { - tlog!(Debug, "--- uncommitted_entry: {:?}", entry); + + if let Some(hs) = ready.hs() { + // Raft HardState changed, and we need to persist it. + // let hs = hs.clone(); + Storage::persist_hard_state(hs); } - Storage::persist_entries(entries); - } + if !ready.persisted_messages().is_empty() { + // Send out the persisted messages come from the node. + handle_messages(ready.take_persisted_messages()); + } - if let Some(hs) = ready.hs() { - // Raft HardState changed, and we need to persist it. - // let hs = hs.clone(); - tlog!(Debug, "--- hard_state: {:?}", hs); - Storage::persist_hard_state(hs); - // store.wl().set_hardstate(hs); - } + // Advance the Raft. + let mut light_rd = raw_node.advance(ready); - if !ready.persisted_messages().is_empty() { - // Send out the persisted messages come from the node. - handle_messages(ready.take_persisted_messages()); - } + // Update commit index. + if let Some(commit) = light_rd.commit_index() { + Storage::persist_commit(commit); + } + + // Send out the messages. + handle_messages(light_rd.take_messages()); - tlog!(Debug, "ADVANCE -----------------------------------------"); + // Apply all committed entries. + handle_committed_entries(light_rd.take_committed_entries()); - // Advance the Raft. - let mut light_rd = raft_group.advance(ready); - tlog!(Debug, "--- {:?}", light_rd); - // Update commit index. - if let Some(commit) = light_rd.commit_index() { - Storage::persist_commit(commit); + // Advance the apply index. + raw_node.advance_apply(); } - // Send out the messages. - handle_messages(light_rd.take_messages()); - // Apply all committed entries. - handle_committed_entries(light_rd.take_committed_entries()); - // Advance the apply index. - raft_group.advance_apply(); - tlog!(Debug, "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"); } -- GitLab