From 90d7e51fb0e9f815185cda42e31ea471f4e29a70 Mon Sep 17 00:00:00 2001 From: Yaroslav Dynnikov <yaroslav.dynnikov@gmail.com> Date: Mon, 19 Sep 2022 09:51:40 +0300 Subject: [PATCH] refactor: move main_loop into the InnerNode --- src/traft/node.rs | 174 +++++++++++++++++++++++----------------------- 1 file changed, 86 insertions(+), 88 deletions(-) diff --git a/src/traft/node.rs b/src/traft/node.rs index 294444c4c8..1dbff00d3d 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -109,9 +109,8 @@ impl Node { let main_loop_fn = { let status = status.clone(); let inner_node = inner_node.clone(); - let storage = storage.clone(); let raft_loop_cond = raft_loop_cond.clone(); - move || raft_main_loop(status, inner_node, storage, raft_loop_cond) + move || raft_main_loop(status, inner_node, raft_loop_cond) }; let conf_change_loop_fn = { @@ -660,6 +659,90 @@ impl InnerNode { } } + fn advance( + &mut self, + status: &RefCell<Status>, + topology_changed: &mut bool, + expelled: &mut bool, + ) { + // Get the `Ready` with `RawNode::ready` interface. + if !self.raw_node.has_ready() { + return; + } + + let mut ready: raft::Ready = self.raw_node.ready(); + + start_transaction(|| -> Result<(), TransactionError> { + if !ready.messages().is_empty() { + // Send out the messages come from the node. + let messages = ready.take_messages(); + self.handle_messages(messages); + } + + if !ready.snapshot().is_empty() { + // This is a snapshot, we need to apply the snapshot at first. + unimplemented!(); + } + + let committed_entries = ready.take_committed_entries(); + self.handle_committed_entries(committed_entries, topology_changed, expelled); + + if !ready.entries().is_empty() { + let e = ready.entries(); + // Append entries to the Raft log. + self.storage.persist_entries(e).unwrap(); + } + + if let Some(hs) = ready.hs() { + // Raft HardState changed, and we need to persist it. + // let hs = hs.clone(); + self.storage.persist_hard_state(hs).unwrap(); + } + + if let Some(ss) = ready.ss() { + let mut status = status.borrow_mut(); + status.leader_id = (ss.leader_id != INVALID_ID).then(|| ss.leader_id); + status.raft_state = format!("{:?}", ss.raft_state); + event::broadcast(Event::StatusChanged); + } + + if !ready.persisted_messages().is_empty() { + // Send out the persisted messages come from the node. + let messages = ready.take_persisted_messages(); + self.handle_messages(messages); + } + + let read_states = ready.take_read_states(); + self.handle_read_states(read_states); + + Ok(()) + }) + .unwrap(); + + // Advance the Raft. + let mut light_rd = self.raw_node.advance(ready); + + // Update commit index. + start_transaction(|| -> Result<(), TransactionError> { + if let Some(commit) = light_rd.commit_index() { + self.storage.persist_commit(commit).unwrap(); + } + + // Send out the messages. + let messages = light_rd.take_messages(); + self.handle_messages(messages); + + // Apply all committed entries. + let committed_entries = light_rd.take_committed_entries(); + self.handle_committed_entries(committed_entries, topology_changed, expelled); + + // Advance the apply index. + self.raw_node.advance_apply(); + Ok(()) + }) + .unwrap(); + } + #[inline] fn cleanup_notifications(&mut self) { self.notifications @@ -707,7 +790,6 @@ where fn raft_main_loop( status: Rc<RefCell<Status>>, inner_node: Rc<Mutex<InnerNode>>, - mut storage: RaftSpaceAccess, raft_loop_cond: Rc<Cond>, ) { let mut next_tick = Instant::now() + Node::TICK; @@ -724,93 +806,9 @@ fn raft_main_loop( inner_node.raw_node.tick(); } - // Get the `Ready` with `RawNode::ready` interface. - if !inner_node.raw_node.has_ready() { - continue; - } - - let mut ready: raft::Ready = inner_node.raw_node.ready(); let mut topology_changed = false; let mut expelled = false; - - start_transaction(|| -> Result<(), TransactionError> { - if !ready.messages().is_empty() { - // Send out the messages come from the node. - let messages = ready.take_messages(); - inner_node.handle_messages(messages); - } - - if !ready.snapshot().is_empty() { - // This is a snapshot, we need to apply the snapshot at first. - unimplemented!(); - // Storage::apply_snapshot(ready.snapshot()).unwrap(); - } - - let committed_entries = ready.take_committed_entries(); - inner_node.handle_committed_entries( - committed_entries, - &mut topology_changed, - &mut expelled, - ); - - if !ready.entries().is_empty() { - let e = ready.entries(); - // Append entries to the Raft log. - storage.persist_entries(e).unwrap(); - } - - if let Some(hs) = ready.hs() { - // Raft HardState changed, and we need to persist it. - // let hs = hs.clone(); - storage.persist_hard_state(hs).unwrap(); - } - - if let Some(ss) = ready.ss() { - let mut status = status.borrow_mut(); - status.leader_id = (ss.leader_id != INVALID_ID).then(|| ss.leader_id); - status.raft_state = format!("{:?}", ss.raft_state); - event::broadcast(Event::StatusChanged); - } - - if !ready.persisted_messages().is_empty() { - // Send out the persisted messages come from the node. - let messages = ready.take_persisted_messages(); - inner_node.handle_messages(messages); - } - - let read_states = ready.take_read_states(); - inner_node.handle_read_states(read_states); - - Ok(()) - }) - .unwrap(); - - // Advance the Raft. - let mut light_rd = inner_node.raw_node.advance(ready); - - // Update commit index. - start_transaction(|| -> Result<(), TransactionError> { - if let Some(commit) = light_rd.commit_index() { - storage.persist_commit(commit).unwrap(); - } - - // Send out the messages. - let messages = light_rd.take_messages(); - inner_node.handle_messages(messages); - - // Apply all committed entries. - let committed_entries = light_rd.take_committed_entries(); - inner_node.handle_committed_entries( - committed_entries, - &mut topology_changed, - &mut expelled, - ); - - // Advance the apply index. - inner_node.raw_node.advance_apply(); - Ok(()) - }) - .unwrap(); + inner_node.advance(&status, &mut topology_changed, &mut expelled); if expelled { crate::tarantool::exit(0); -- GitLab