diff --git a/src/traft/node.rs b/src/traft/node.rs index 26894c392ea23c1d7f45dacd924ab64bee2f02d2..25e43b297f610d077fe6fd02741443de400151bb 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -268,7 +268,6 @@ struct InnerNode { pub raw_node: RawNode, pub notifications: HashMap<LogicalClock, Notify>, topology_cache: CachedCell<RaftTerm, Topology>, - #[allow(dead_code)] storage: RaftSpaceAccess, lc: LogicalClock, } @@ -479,6 +478,177 @@ impl InnerNode { Ok(rx) } + /// Is called during a transaction + fn handle_committed_entries( + &mut self, + entries: Vec<raft::Entry>, + pool: &mut ConnectionPool, + topology_changed: &mut bool, + expelled: &mut bool, + ) { + for entry in &entries { + let entry = match traft::Entry::try_from(entry) { + Ok(v) => v, + Err(e) => { + tlog!(Error, "abnormal entry: {e}, entry = {entry:?}"); + continue; + } + }; + + match entry.entry_type { + raft::EntryType::EntryNormal => { + self.handle_committed_normal_entry(entry, pool, topology_changed, expelled) + } + raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => { + self.handle_committed_conf_change(entry) + } + } + } + + if let Some(last_entry) = entries.last() { + if let Err(e) = self.storage.persist_applied(last_entry.index) { + tlog!( + Error, + "error persisting applied index: {e}"; + "index" => last_entry.index + ); + } else { + event::broadcast(Event::RaftEntryApplied); + } + } + } + + /// Is called during a transaction + fn handle_committed_normal_entry( + &mut self, + entry: traft::Entry, + pool: &mut ConnectionPool, + topology_changed: &mut bool, + expelled: &mut bool, + ) { + assert_eq!(entry.entry_type, raft::EntryType::EntryNormal); + let result = entry.op().unwrap_or(&traft::Op::Nop).on_commit(); + + if let Some(lc) = entry.lc() { + if let Some(notify) = self.notifications.remove(lc) { + notify.notify_ok_any(result); + } + } + + if let Some(traft::Op::PersistPeer { peer }) = entry.op() { + pool.connect(peer.raft_id, peer.peer_address.clone()); + *topology_changed = true; + if peer.grade == Grade::Expelled && peer.raft_id == self.raft_id() { + // cannot exit during a transaction + *expelled = true; + } + } + + with_joint_state_latch(|joint_state_latch| { + if let Some(latch) = joint_state_latch.take() { + if entry.index != latch.index { + joint_state_latch.set(Some(latch)); + return; + } + + // It was expected to be a ConfChange entry, but it's + // normal. Raft must have overriden it, or there was + // a re-election. + let e = RaftError::ConfChangeError("rolled back".into()); + + latch.notify.notify_err(e); + event::broadcast(Event::JointStateDrop); + } + }); + } + + /// Is called during a transaction + fn handle_committed_conf_change(&mut self, entry: traft::Entry) { + let latch_unlock = || { + with_joint_state_latch(|joint_state_latch| { + if let Some(latch) = joint_state_latch.take() { + latch.notify.notify_ok(()); + event::broadcast(Event::JointStateLeave); + } + }); + }; + + // Beware: a tiny difference in type names (`V2` or not `V2`) + // makes a significant difference in `entry.data` binary layout and + // in joint state transitions. + + // `ConfChangeTransition::Auto` implies that `ConfChangeV2` may be + // applied in an instant without entering the joint state. + + let (is_joint, conf_state) = match entry.entry_type { + raft::EntryType::EntryConfChange => { + let mut cc = raft::ConfChange::default(); + cc.merge_from_bytes(&entry.data).unwrap(); + + latch_unlock(); + + (false, self.raw_node.apply_conf_change(&cc).unwrap()) + } + raft::EntryType::EntryConfChangeV2 => { + let mut cc = raft::ConfChangeV2::default(); + cc.merge_from_bytes(&entry.data).unwrap(); + + // Unlock the latch when either of conditions is met: + // - conf_change will leave the joint state; + // - or it will be applied without even entering one. + let leave_joint = cc.leave_joint() || cc.enter_joint().is_none(); + if leave_joint { + latch_unlock(); + } + + // ConfChangeTransition::Auto implies that at this + // moment raft-rs will implicitly propose another empty + // conf change that represents leaving the joint state. + (!leave_joint, self.raw_node.apply_conf_change(&cc).unwrap()) + } + _ => unreachable!(), + }; + + let raft_id = &self.raft_id(); + let voters_old = self.storage.voters().unwrap().unwrap_or_default(); + if voters_old.contains(raft_id) && !conf_state.voters.contains(raft_id) { + if is_joint { + event::broadcast_when(Event::Demoted, Event::JointStateLeave).ok(); + } else { + event::broadcast(Event::Demoted); + } + } + + self.storage.persist_conf_state(&conf_state).unwrap(); + } + + /// Is called during a transaction + fn handle_read_states(&mut self, read_states: Vec<raft::ReadState>) { + for rs in read_states { + let ctx = match traft::EntryContextNormal::read_from_bytes(&rs.request_ctx) { + Ok(Some(v)) => v, + Ok(None) => continue, + Err(e) => { + tlog!(Error, "abnormal read_state: {e}, read_state = {rs:?}"); + continue; + } + }; + + if let Some(notify) = self.notifications.remove(&ctx.lc) { + notify.notify_ok(rs.index); + } + } + } + + /// Is called during a transaction + fn handle_messages(&self, messages: Vec<raft::Message>, pool: &ConnectionPool) { + for msg in messages { + if let Err(e) = pool.send(&msg) { + tlog!(Error, "{e}"); + } + } + } + #[inline] fn cleanup_notifications(&mut self) { self.notifications @@ -523,188 +693,6 @@ where JOINT_STATE_LATCH.with(f) } -/// Is called during a transaction -fn handle_committed_entries( - entries: Vec<raft::Entry>, - inner_node: &mut InnerNode, - storage: &mut RaftSpaceAccess, - pool: &mut ConnectionPool, - topology_changed: &mut bool, - expelled: &mut bool, -) { - for entry in &entries { - let entry = match traft::Entry::try_from(entry) { - Ok(v) => v, - Err(e) => { - tlog!(Error, "abnormal entry: {e}, entry = {entry:?}"); - continue; - } - }; - - match entry.entry_type { - raft::EntryType::EntryNormal => { - handle_committed_normal_entry(entry, pool, topology_changed, expelled, inner_node) - } - raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => { - handle_committed_conf_change(entry, inner_node, storage) - } - } - } - - if let Some(last_entry) = entries.last() { - if let Err(e) = storage.persist_applied(last_entry.index) { - tlog!( - Error, - "error persisting applied index: {e}"; - "index" => last_entry.index - ); - } else { - event::broadcast(Event::RaftEntryApplied); - } - } -} - -/// Is called during a transaction -fn handle_committed_normal_entry( - entry: traft::Entry, - pool: &mut ConnectionPool, - topology_changed: &mut bool, - expelled: &mut bool, - inner_node: &mut InnerNode, -) { - assert_eq!(entry.entry_type, raft::EntryType::EntryNormal); - let result = entry.op().unwrap_or(&traft::Op::Nop).on_commit(); - - if let Some(lc) = entry.lc() { - if let Some(notify) = inner_node.notifications.remove(lc) { - notify.notify_ok_any(result); - } - } - - if let Some(traft::Op::PersistPeer { peer }) = entry.op() { - pool.connect(peer.raft_id, peer.peer_address.clone()); - *topology_changed = true; - if peer.grade == Grade::Expelled && peer.raft_id == inner_node.raw_node.raft.id { - // cannot exit during a transaction - *expelled = true; - } - } - - with_joint_state_latch(|joint_state_latch| { - if let Some(latch) = joint_state_latch.take() { - if entry.index != latch.index { - joint_state_latch.set(Some(latch)); - return; - } - - // It was expected to be a ConfChange entry, but it's - // normal. Raft must have overriden it, or there was - // a re-election. - let e = RaftError::ConfChangeError("rolled back".into()); - - latch.notify.notify_err(e); - event::broadcast(Event::JointStateDrop); - } - }); -} - -/// Is called during a transaction -fn handle_committed_conf_change( - entry: traft::Entry, - inner_node: &mut InnerNode, - storage: &mut RaftSpaceAccess, -) { - let latch_unlock = || { - with_joint_state_latch(|joint_state_latch| { - if let Some(latch) = joint_state_latch.take() { - latch.notify.notify_ok(()); - event::broadcast(Event::JointStateLeave); - } - }); - }; - - // Beware: a tiny difference in type names (`V2` or not `V2`) - // makes a significant difference in `entry.data` binary layout and - // in joint state transitions. - - // `ConfChangeTransition::Auto` implies that `ConfChangeV2` may be - // applied in an instant without entering the joint state. - - let (is_joint, conf_state) = match entry.entry_type { - raft::EntryType::EntryConfChange => { - let mut cc = raft::ConfChange::default(); - cc.merge_from_bytes(&entry.data).unwrap(); - - latch_unlock(); - - (false, inner_node.raw_node.apply_conf_change(&cc).unwrap()) - } - raft::EntryType::EntryConfChangeV2 => { - let mut cc = raft::ConfChangeV2::default(); - cc.merge_from_bytes(&entry.data).unwrap(); - - // Unlock the latch when either of conditions is met: - // - conf_change will leave the joint state; - // - or it will be applied without even entering one. - let leave_joint = cc.leave_joint() || cc.enter_joint().is_none(); - if leave_joint { - latch_unlock(); - } - - // ConfChangeTransition::Auto implies that at this - // moment raft-rs will implicitly propose another empty - // conf change that represents leaving the joint state. - ( - !leave_joint, - inner_node.raw_node.apply_conf_change(&cc).unwrap(), - ) - } - _ => unreachable!(), - }; - - let raft_id = &inner_node.raw_node.raft.id; - let voters_old = storage.voters().unwrap().unwrap_or_default(); - if voters_old.contains(raft_id) && !conf_state.voters.contains(raft_id) { - if is_joint { - event::broadcast_when(Event::Demoted, Event::JointStateLeave).ok(); - } else { - event::broadcast(Event::Demoted); - } - } - - storage.persist_conf_state(&conf_state).unwrap(); -} - -/// Is called during a transaction -fn handle_read_states( - read_states: Vec<raft::ReadState>, - notifications: &mut HashMap<LogicalClock, Notify>, -) { - for rs in read_states { - let ctx = match traft::EntryContextNormal::read_from_bytes(&rs.request_ctx) { - Ok(Some(v)) => v, - Ok(None) => continue, - Err(e) => { - tlog!(Error, "abnormal read_state: {e}, read_state = {rs:?}"); - continue; - } - }; - - if let Some(notify) = notifications.remove(&ctx.lc) { - notify.notify_ok(rs.index); - } - } -} - -/// Is called during a transaction -fn handle_messages(messages: Vec<raft::Message>, pool: &ConnectionPool) { - for msg in messages { - if let Err(e) = pool.send(&msg) { - tlog!(Error, "{e}"); - } - } -} - fn raft_main_loop( status: Rc<RefCell<Status>>, inner_node: Rc<Mutex<InnerNode>>, @@ -748,7 +736,7 @@ fn raft_main_loop( if !ready.messages().is_empty() { // Send out the messages come from the node. let messages = ready.take_messages(); - handle_messages(messages, &pool); + inner_node.handle_messages(messages, &pool); } if !ready.snapshot().is_empty() { @@ -758,10 +746,8 @@ fn raft_main_loop( } let committed_entries = ready.take_committed_entries(); - handle_committed_entries( + inner_node.handle_committed_entries( committed_entries, - &mut *inner_node, - &mut storage, &mut pool, &mut topology_changed, &mut expelled, @@ -789,11 +775,11 @@ fn raft_main_loop( if !ready.persisted_messages().is_empty() { // Send out the persisted messages come from the node. let messages = ready.take_persisted_messages(); - handle_messages(messages, &pool); + inner_node.handle_messages(messages, &pool); } let read_states = ready.take_read_states(); - handle_read_states(read_states, &mut inner_node.notifications); + inner_node.handle_read_states(read_states); Ok(()) }) @@ -814,14 +800,12 @@ fn raft_main_loop( // Send out the messages. let messages = light_rd.take_messages(); - handle_messages(messages, &pool); + inner_node.handle_messages(messages, &pool); // Apply all committed entries. let committed_entries = light_rd.take_committed_entries(); - handle_committed_entries( + inner_node.handle_committed_entries( committed_entries, - &mut *inner_node, - &mut storage, &mut pool, &mut topology_changed, &mut expelled,