Skip to content
Snippets Groups Projects
Commit 7f3812da authored by Yaroslav Dynnikov's avatar Yaroslav Dynnikov
Browse files

refactor: split handle_committed_entries function

parent 96b368d9
No related branches found
No related tags found
1 merge request!157refactor: cut raft_main loop into pieces
......@@ -29,7 +29,6 @@ use crate::traft::ContextCoercion as _;
use crate::traft::Peer;
use ::tarantool::util::IntoClones as _;
use protobuf::Message as _;
use protobuf::ProtobufEnum as _;
use crate::mailbox::Mailbox;
use crate::tlog;
......@@ -308,6 +307,125 @@ impl Node {
}
}
struct JointStateLatch {
index: u64,
notify: Notify,
}
fn handle_committed_entries(
entries: Vec<raft::Entry>,
notifications: &mut HashMap<LogicalClock, Notify>,
raw_node: &mut RawNode,
pool: &mut ConnectionPool,
joint_state_latch: &mut Option<JointStateLatch>,
config_changed: &mut bool,
) {
for entry in &entries {
let entry = match traft::Entry::try_from(entry) {
Ok(v) => v,
Err(e) => {
tlog!(
Error,
"error parsing (and applying) an entry: {e}, entry = {entry:?}"
);
continue;
}
};
if entry.entry_type == raft::EntryType::EntryNormal as i32 {
handle_committed_normal_entry(entry, notifications, joint_state_latch)
} else {
handle_committed_conf_change(entry, raw_node, pool, joint_state_latch, config_changed)
}
}
if let Some(last_entry) = entries.last() {
if let Err(e) = Storage::persist_applied(last_entry.index) {
tlog!(
Error,
"error persisting applied index: {e}";
"index" => last_entry.index
);
};
}
}
fn handle_committed_normal_entry(
entry: traft::Entry,
notifications: &mut HashMap<LogicalClock, Notify>,
joint_state_latch: &mut Option<JointStateLatch>,
) {
let result = entry.op().unwrap_or(&traft::Op::Nop).on_commit();
if let Some(lc) = entry.lc() {
if let Some(notify) = notifications.remove(lc) {
notify.notify_ok_any(result);
}
}
if let Some(latch) = joint_state_latch {
if entry.index == latch.index {
// It was expected to be a ConfChange entry, but it's
// normal. Raft must have overriden it, or there was
// a re-election.
let e = RaftError::ConfChangeError("rolled back".into());
latch.notify.notify_err(e);
}
*joint_state_latch = None;
}
}
fn handle_committed_conf_change(
entry: traft::Entry,
raw_node: &mut RawNode,
pool: &mut ConnectionPool,
joint_state_latch: &mut Option<JointStateLatch>,
config_changed: &mut bool,
) {
for peer in entry.iter_peers() {
let peer = traft::Peer {
commit_index: entry.index,
..peer.clone()
};
Storage::persist_peer_by_instance_id(&peer).unwrap();
pool.connect(peer.raft_id, peer.peer_address);
}
// Beware: this tiny difference in type names
// (`V2` or not `V2`) makes a significant
// difference in `entry.data` binary layout
// and in joint state transitions.
let conf_state;
if entry.entry_type == raft::EntryType::EntryConfChange as i32 {
let mut cc = raft::ConfChange::default();
cc.merge_from_bytes(&entry.data).unwrap();
*config_changed = true;
conf_state = raw_node.apply_conf_change(&cc).unwrap();
} else {
let mut cc = raft::ConfChangeV2::default();
cc.merge_from_bytes(&entry.data).unwrap();
// Unlock the latch only when leaving the joint state
if cc.changes.is_empty() {
if let Some(latch) = joint_state_latch {
latch.notify.notify_ok(entry.index);
*joint_state_latch = None;
*config_changed = true;
}
}
// ConfChangeTransition::Implicit implies that at this
// moment raft-rs will implicitly propose another empty
// conf change that represents leaving the joint state.
conf_state = raw_node.apply_conf_change(&cc).unwrap()
};
Storage::persist_conf_state(&conf_state).unwrap();
}
fn raft_main_loop(
main_inbox: Mailbox<NormalRequest>,
status: Rc<RefCell<Status>>,
......@@ -336,11 +454,6 @@ fn raft_main_loop(
let mut joint_state_latch: Option<JointStateLatch> = None;
struct JointStateLatch {
index: u64,
notify: Notify,
}
loop {
// Clean up obsolete notifications
notifications.retain(|_, notify: &mut Notify| !notify.is_closed());
......@@ -522,93 +635,6 @@ fn raft_main_loop(
}
}
fn handle_committed_entries(
entries: Vec<raft::Entry>,
notifications: &mut HashMap<LogicalClock, Notify>,
raw_node: &mut RawNode,
pool: &mut ConnectionPool,
joint_state_latch: &mut Option<JointStateLatch>,
config_changed: &mut bool,
) {
for entry in entries
.iter()
.map(|e| traft::Entry::try_from(e).expect("wtf"))
{
match raft::EntryType::from_i32(entry.entry_type) {
Some(raft::EntryType::EntryNormal) => {
let result = entry.op().unwrap_or(&traft::Op::Nop).on_commit();
if let Some(lc) = entry.lc() {
if let Some(notify) = notifications.remove(lc) {
notify.notify_ok_any(result);
}
}
if let Some(latch) = joint_state_latch {
if entry.index == latch.index {
// It was expected to be a ConfChange entry, but it's
// normal. Raft must have overriden it, or there was
// a re-election.
let e = RaftError::ConfChangeError("ignored".into());
latch.notify.notify_err(e);
}
*joint_state_latch = None;
}
}
Some(entry_type @ raft::EntryType::EntryConfChange)
| Some(entry_type @ raft::EntryType::EntryConfChangeV2) => {
for peer in entry.iter_peers() {
let peer = traft::Peer {
commit_index: entry.index,
..peer.clone()
};
Storage::persist_peer_by_instance_id(&peer).unwrap();
pool.connect(peer.raft_id, peer.peer_address);
}
let cs = match entry_type {
// Beware: this tiny difference in type names
// (`V2` or not `V2`) makes a significant
// difference in `entry.data` binary layout
// and in joint state transitions.
raft::EntryType::EntryConfChange => {
let mut cc = raft::ConfChange::default();
cc.merge_from_bytes(&entry.data).unwrap();
*config_changed = true;
raw_node.apply_conf_change(&cc).unwrap()
}
raft::EntryType::EntryConfChangeV2 => {
let mut cc = raft::ConfChangeV2::default();
cc.merge_from_bytes(&entry.data).unwrap();
// Unlock the latch only when leaving the joint state
if cc.changes.is_empty() {
if let Some(latch) = joint_state_latch {
latch.notify.notify_ok(entry.index);
*joint_state_latch = None;
*config_changed = true;
}
}
// ConfChangeTransition::Implicit implies that at this
// moment raft-rs will implicitly propose another empty
// conf change that represents leaving the joint state.
raw_node.apply_conf_change(&cc).unwrap()
}
_ => unreachable!(),
};
Storage::persist_conf_state(&cs).unwrap();
}
None => unreachable!(),
}
Storage::persist_applied(entry.index).unwrap();
}
}
let mut config_changed = false;
start_transaction(|| -> Result<(), TransactionError> {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment