Skip to content
Snippets Groups Projects
Verified Commit a65696c8 authored by Yaroslav Dynnikov's avatar Yaroslav Dynnikov
Browse files

Address review comments

Replace macros with function in `traft::node`.
parent ef459775
No related branches found
No related tags found
1 merge request!71Fair raft join
......@@ -320,95 +320,97 @@ fn raft_main_loop(
let mut ready: raft::Ready = raw_node.ready();
macro_rules! handle_read_states {
($read_states:ident) => {{
for rs in $read_states {
if let Some(ctx) = traft::EntryContextNormal::read_from_bytes(&rs.request_ctx)
.expect("Abnormal entry in message context")
{
if let Some(notify) = notifications.remove(&ctx.lc) {
notify.try_send(Ok(rs.index)).ok();
}
fn handle_read_states(
read_states: Vec<raft::ReadState>,
notifications: &mut HashMap<LogicalClock, Notify>,
) {
for rs in read_states {
if let Some(ctx) = traft::EntryContextNormal::read_from_bytes(&rs.request_ctx)
.expect("Abnormal entry in message context")
{
if let Some(notify) = notifications.remove(&ctx.lc) {
notify.try_send(Ok(rs.index)).ok();
}
}
}};
}
}
macro_rules! handle_messages {
($messages:ident) => {{
for msg in $messages {
if let Err(e) = pool.send(&msg) {
tlog!(Error, "{e}");
}
fn handle_messages(messages: Vec<raft::Message>, pool: &ConnectionPool) {
for msg in messages {
if let Err(e) = pool.send(&msg) {
tlog!(Error, "{e}");
}
}};
}
}
macro_rules! handle_committed_entries {
($entries:ident) => {{
for entry in $entries
.iter()
.map(|e| traft::Entry::try_from(e).expect("wtf"))
{
match raft::EntryType::from_i32(entry.entry_type) {
Some(raft::EntryType::EntryNormal) => {
use traft::Op::*;
match entry.op() {
None => (),
Some(Nop) => (),
Some(Info { msg }) => tlog!(Info, "{msg}"),
Some(EvalLua { code }) => crate::tarantool::eval(&code),
}
if let Some(lc) = entry.lc() {
if let Some(notify) = notifications.remove(lc) {
notify.try_send(Ok(entry.index)).ok();
}
}
fn handle_committed_entries(
entries: Vec<raft::Entry>,
notifications: &mut HashMap<LogicalClock, Notify>,
raw_node: &mut RawNode,
pool: &mut ConnectionPool,
) {
for entry in entries
.iter()
.map(|e| traft::Entry::try_from(e).expect("wtf"))
{
match raft::EntryType::from_i32(entry.entry_type) {
Some(raft::EntryType::EntryNormal) => {
use traft::Op::*;
match entry.op() {
None => (),
Some(Nop) => (),
Some(Info { msg }) => tlog!(Info, "{msg}"),
Some(EvalLua { code }) => crate::tarantool::eval(&code),
}
Some(entry_type @ raft::EntryType::EntryConfChange)
| Some(entry_type @ raft::EntryType::EntryConfChangeV2) => {
if let Some(lc) = entry.lc() {
if let Some(notify) = notifications.remove(lc) {
notify.try_send(Ok(entry.index)).ok();
}
if let Some(lc) = entry.lc() {
if let Some(notify) = notifications.remove(lc) {
notify.try_send(Ok(entry.index)).ok();
}
for peer in entry.iter_peers() {
let peer = traft::Peer {
commit_index: entry.index,
..peer.clone()
};
Storage::persist_peer(&peer).unwrap();
pool.connect(peer.raft_id, peer.peer_address);
}
}
Some(entry_type @ raft::EntryType::EntryConfChange)
| Some(entry_type @ raft::EntryType::EntryConfChangeV2) => {
if let Some(lc) = entry.lc() {
if let Some(notify) = notifications.remove(lc) {
notify.try_send(Ok(entry.index)).ok();
}
let cs = match entry_type {
raft::EntryType::EntryConfChange => {
let mut cc = raft::ConfChange::default();
cc.merge_from_bytes(&entry.data).unwrap();
raw_node.apply_conf_change(&cc).unwrap()
}
raft::EntryType::EntryConfChangeV2 => {
let mut cc = raft::ConfChangeV2::default();
cc.merge_from_bytes(&entry.data).unwrap();
raw_node.apply_conf_change(&cc).unwrap()
}
_ => unreachable!(),
}
for peer in entry.iter_peers() {
let peer = traft::Peer {
commit_index: entry.index,
..peer.clone()
};
Storage::persist_conf_state(&cs).unwrap();
Storage::persist_peer(&peer).unwrap();
pool.connect(peer.raft_id, peer.peer_address);
}
None => unreachable!(),
}
Storage::persist_applied(entry.index).unwrap();
let cs = match entry_type {
raft::EntryType::EntryConfChange => {
let mut cc = raft::ConfChange::default();
cc.merge_from_bytes(&entry.data).unwrap();
raw_node.apply_conf_change(&cc).unwrap()
}
raft::EntryType::EntryConfChangeV2 => {
let mut cc = raft::ConfChangeV2::default();
cc.merge_from_bytes(&entry.data).unwrap();
raw_node.apply_conf_change(&cc).unwrap()
}
_ => unreachable!(),
};
Storage::persist_conf_state(&cs).unwrap();
}
None => unreachable!(),
}
}};
Storage::persist_applied(entry.index).unwrap();
}
}
start_transaction(|| -> Result<(), TransactionError> {
if !ready.messages().is_empty() {
// Send out the messages come from the node.
let messages = ready.take_messages();
handle_messages!(messages);
handle_messages(messages, &pool);
}
if !ready.snapshot().is_empty() {
......@@ -418,7 +420,12 @@ fn raft_main_loop(
}
let committed_entries = ready.take_committed_entries();
handle_committed_entries!(committed_entries);
handle_committed_entries(
committed_entries,
&mut notifications,
&mut raw_node,
&mut pool,
);
if !ready.entries().is_empty() {
let e = ready.entries();
......@@ -442,11 +449,11 @@ fn raft_main_loop(
if !ready.persisted_messages().is_empty() {
// Send out the persisted messages come from the node.
let messages = ready.take_persisted_messages();
handle_messages!(messages);
handle_messages(messages, &pool);
}
let read_states = ready.take_read_states();
handle_read_states!(read_states);
handle_read_states(read_states, &mut notifications);
Ok(())
})
......@@ -463,11 +470,16 @@ fn raft_main_loop(
// Send out the messages.
let messages = light_rd.take_messages();
handle_messages!(messages);
handle_messages(messages, &pool);
// Apply all committed entries.
let committed_entries = light_rd.take_committed_entries();
handle_committed_entries!(committed_entries);
handle_committed_entries(
committed_entries,
&mut notifications,
&mut raw_node,
&mut pool,
);
// Advance the apply index.
raw_node.advance_apply();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment