Skip to content
Snippets Groups Projects
Commit 90d7e51f authored by Yaroslav Dynnikov's avatar Yaroslav Dynnikov Committed by Yaroslav Dynnikov
Browse files

refactor: move main_loop into the InnerNode

parent 23462877
No related branches found
No related tags found
1 merge request!250Move the code here and there
...@@ -109,9 +109,8 @@ impl Node { ...@@ -109,9 +109,8 @@ impl Node {
let main_loop_fn = { let main_loop_fn = {
let status = status.clone(); let status = status.clone();
let inner_node = inner_node.clone(); let inner_node = inner_node.clone();
let storage = storage.clone();
let raft_loop_cond = raft_loop_cond.clone(); let raft_loop_cond = raft_loop_cond.clone();
move || raft_main_loop(status, inner_node, storage, raft_loop_cond) move || raft_main_loop(status, inner_node, raft_loop_cond)
}; };
let conf_change_loop_fn = { let conf_change_loop_fn = {
...@@ -660,6 +659,90 @@ impl InnerNode { ...@@ -660,6 +659,90 @@ impl InnerNode {
} }
} }
fn advance(
&mut self,
status: &RefCell<Status>,
topology_changed: &mut bool,
expelled: &mut bool,
) {
// Get the `Ready` with `RawNode::ready` interface.
if !self.raw_node.has_ready() {
return;
}
let mut ready: raft::Ready = self.raw_node.ready();
start_transaction(|| -> Result<(), TransactionError> {
if !ready.messages().is_empty() {
// Send out the messages come from the node.
let messages = ready.take_messages();
self.handle_messages(messages);
}
if !ready.snapshot().is_empty() {
// This is a snapshot, we need to apply the snapshot at first.
unimplemented!();
}
let committed_entries = ready.take_committed_entries();
self.handle_committed_entries(committed_entries, topology_changed, expelled);
if !ready.entries().is_empty() {
let e = ready.entries();
// Append entries to the Raft log.
self.storage.persist_entries(e).unwrap();
}
if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
// let hs = hs.clone();
self.storage.persist_hard_state(hs).unwrap();
}
if let Some(ss) = ready.ss() {
let mut status = status.borrow_mut();
status.leader_id = (ss.leader_id != INVALID_ID).then(|| ss.leader_id);
status.raft_state = format!("{:?}", ss.raft_state);
event::broadcast(Event::StatusChanged);
}
if !ready.persisted_messages().is_empty() {
// Send out the persisted messages come from the node.
let messages = ready.take_persisted_messages();
self.handle_messages(messages);
}
let read_states = ready.take_read_states();
self.handle_read_states(read_states);
Ok(())
})
.unwrap();
// Advance the Raft.
let mut light_rd = self.raw_node.advance(ready);
// Update commit index.
start_transaction(|| -> Result<(), TransactionError> {
if let Some(commit) = light_rd.commit_index() {
self.storage.persist_commit(commit).unwrap();
}
// Send out the messages.
let messages = light_rd.take_messages();
self.handle_messages(messages);
// Apply all committed entries.
let committed_entries = light_rd.take_committed_entries();
self.handle_committed_entries(committed_entries, topology_changed, expelled);
// Advance the apply index.
self.raw_node.advance_apply();
Ok(())
})
.unwrap();
}
#[inline] #[inline]
fn cleanup_notifications(&mut self) { fn cleanup_notifications(&mut self) {
self.notifications self.notifications
...@@ -707,7 +790,6 @@ where ...@@ -707,7 +790,6 @@ where
fn raft_main_loop( fn raft_main_loop(
status: Rc<RefCell<Status>>, status: Rc<RefCell<Status>>,
inner_node: Rc<Mutex<InnerNode>>, inner_node: Rc<Mutex<InnerNode>>,
mut storage: RaftSpaceAccess,
raft_loop_cond: Rc<Cond>, raft_loop_cond: Rc<Cond>,
) { ) {
let mut next_tick = Instant::now() + Node::TICK; let mut next_tick = Instant::now() + Node::TICK;
...@@ -724,93 +806,9 @@ fn raft_main_loop( ...@@ -724,93 +806,9 @@ fn raft_main_loop(
inner_node.raw_node.tick(); inner_node.raw_node.tick();
} }
// Get the `Ready` with `RawNode::ready` interface.
if !inner_node.raw_node.has_ready() {
continue;
}
let mut ready: raft::Ready = inner_node.raw_node.ready();
let mut topology_changed = false; let mut topology_changed = false;
let mut expelled = false; let mut expelled = false;
inner_node.advance(&status, &mut topology_changed, &mut expelled);
start_transaction(|| -> Result<(), TransactionError> {
if !ready.messages().is_empty() {
// Send out the messages come from the node.
let messages = ready.take_messages();
inner_node.handle_messages(messages);
}
if !ready.snapshot().is_empty() {
// This is a snapshot, we need to apply the snapshot at first.
unimplemented!();
// Storage::apply_snapshot(ready.snapshot()).unwrap();
}
let committed_entries = ready.take_committed_entries();
inner_node.handle_committed_entries(
committed_entries,
&mut topology_changed,
&mut expelled,
);
if !ready.entries().is_empty() {
let e = ready.entries();
// Append entries to the Raft log.
storage.persist_entries(e).unwrap();
}
if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
// let hs = hs.clone();
storage.persist_hard_state(hs).unwrap();
}
if let Some(ss) = ready.ss() {
let mut status = status.borrow_mut();
status.leader_id = (ss.leader_id != INVALID_ID).then(|| ss.leader_id);
status.raft_state = format!("{:?}", ss.raft_state);
event::broadcast(Event::StatusChanged);
}
if !ready.persisted_messages().is_empty() {
// Send out the persisted messages come from the node.
let messages = ready.take_persisted_messages();
inner_node.handle_messages(messages);
}
let read_states = ready.take_read_states();
inner_node.handle_read_states(read_states);
Ok(())
})
.unwrap();
// Advance the Raft.
let mut light_rd = inner_node.raw_node.advance(ready);
// Update commit index.
start_transaction(|| -> Result<(), TransactionError> {
if let Some(commit) = light_rd.commit_index() {
storage.persist_commit(commit).unwrap();
}
// Send out the messages.
let messages = light_rd.take_messages();
inner_node.handle_messages(messages);
// Apply all committed entries.
let committed_entries = light_rd.take_committed_entries();
inner_node.handle_committed_entries(
committed_entries,
&mut topology_changed,
&mut expelled,
);
// Advance the apply index.
inner_node.raw_node.advance_apply();
Ok(())
})
.unwrap();
if expelled { if expelled {
crate::tarantool::exit(0); crate::tarantool::exit(0);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment