diff --git a/src/traft/node.rs b/src/traft/node.rs index debe784338cfb8e45698fbfb8065574030feea4a..8d3d66e7b21571b7b8d94df5deedd606a95e9c22 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -788,113 +788,126 @@ fn raft_main_loop( } } -fn raft_conf_change_loop(status: Rc<RefCell<Status>>, storage: RaftSpaceAccess) { - loop { - if status.borrow().raft_state != "Leader" { - event::wait(Event::StatusChanged).expect("Events system must be initialized"); - continue; - } - - let term = storage.term().unwrap().unwrap_or(0); - let voter_ids: HashSet<RaftId> = - HashSet::from_iter(storage.voters().unwrap().unwrap_or_default()); - let learner_ids: HashSet<RaftId> = - HashSet::from_iter(storage.learners().unwrap().unwrap_or_default()); - let peer_is_active: HashMap<RaftId, bool> = Storage::peers() - .unwrap() +fn raft_conf_change(storage: &RaftSpaceAccess, peers: &[Peer]) -> Option<raft::ConfChangeV2> { + let voter_ids: HashSet<RaftId> = + HashSet::from_iter(storage.voters().unwrap().unwrap_or_default()); + let learner_ids: HashSet<RaftId> = + HashSet::from_iter(storage.learners().unwrap().unwrap_or_default()); + let peer_is_active: HashMap<RaftId, bool> = peers + .iter() + .map(|peer| (peer.raft_id, peer.is_active())) + .collect(); + + let (active_voters, to_demote): (Vec<RaftId>, Vec<RaftId>) = voter_ids + .iter() + .partition(|id| peer_is_active.get(id).copied().unwrap_or(false)); + + let active_learners: Vec<RaftId> = learner_ids + .iter() + .copied() + .filter(|id| peer_is_active.get(id).copied().unwrap_or(false)) + .collect(); + + let new_peers: Vec<RaftId> = peer_is_active + .iter() + .map(|(&id, _)| id) + .filter(|id| !voter_ids.contains(id) && !learner_ids.contains(id)) + .collect(); + + let mut changes: Vec<raft::ConfChangeSingle> = Vec::new(); + + const VOTER: bool = true; + const LEARNER: bool = false; + + changes.extend( + to_demote .into_iter() - .map(|peer| (peer.raft_id, peer.is_active())) - .collect(); - - let (active_voters, to_demote): (Vec<RaftId>, Vec<RaftId>) = voter_ids - .iter() - .partition(|id| peer_is_active.get(id).copied().unwrap_or(false)); - - let active_learners: Vec<RaftId> = learner_ids - .iter() - .copied() - .filter(|id| peer_is_active.get(id).copied().unwrap_or(false)) - .collect(); - - let new_peers: Vec<RaftId> = peer_is_active - .iter() - .map(|(&id, _)| id) - .filter(|id| !voter_ids.contains(id) && !learner_ids.contains(id)) - .collect(); - - let mut changes: Vec<raft::ConfChangeSingle> = Vec::new(); - - const VOTER: bool = true; - const LEARNER: bool = false; - - changes.extend( - to_demote - .into_iter() - .map(|id| conf_change_single(id, LEARNER)), - ); + .map(|id| conf_change_single(id, LEARNER)), + ); - let total_active = active_voters.len() + active_learners.len() + new_peers.len(); + let total_active = active_voters.len() + active_learners.len() + new_peers.len(); + + if total_active == 0 { + return None; + } - let new_peers_to_promote; - match failover::voters_needed(active_voters.len(), total_active) { - 0 => { + let new_peers_to_promote; + match failover::voters_needed(active_voters.len(), total_active) { + 0 => { + new_peers_to_promote = 0; + } + pos @ 1..=i64::MAX => { + let pos = pos as usize; + if pos < active_learners.len() { + for &raft_id in &active_learners[0..pos] { + changes.push(conf_change_single(raft_id, VOTER)) + } new_peers_to_promote = 0; - } - pos @ 1..=i64::MAX => { - let pos = pos as usize; - if pos < active_learners.len() { - for &raft_id in &active_learners[0..pos] { - changes.push(conf_change_single(raft_id, VOTER)) - } - new_peers_to_promote = 0; - } else { - for &raft_id in &active_learners { - changes.push(conf_change_single(raft_id, VOTER)) - } - new_peers_to_promote = pos - active_learners.len(); - assert!(new_peers_to_promote <= new_peers.len()); - for &raft_id in &new_peers[0..new_peers_to_promote] { - changes.push(conf_change_single(raft_id, VOTER)) - } + } else { + for &raft_id in &active_learners { + changes.push(conf_change_single(raft_id, VOTER)) } - } - neg @ i64::MIN..=-1 => { - let neg = -neg as usize; - assert!(neg < active_voters.len()); - for &raft_id in &active_voters[0..neg] { - changes.push(conf_change_single(raft_id, LEARNER)) + new_peers_to_promote = pos - active_learners.len(); + assert!(new_peers_to_promote <= new_peers.len()); + for &raft_id in &new_peers[0..new_peers_to_promote] { + changes.push(conf_change_single(raft_id, VOTER)) } - new_peers_to_promote = 0; } } - - for &raft_id in &new_peers[new_peers_to_promote..] { - changes.push(conf_change_single(raft_id, LEARNER)) + neg @ i64::MIN..=-1 => { + let neg = -neg as usize; + assert!(neg < active_voters.len()); + for &raft_id in &active_voters[0..neg] { + changes.push(conf_change_single(raft_id, LEARNER)) + } + new_peers_to_promote = 0; } + } + + for &raft_id in &new_peers[new_peers_to_promote..] { + changes.push(conf_change_single(raft_id, LEARNER)) + } - if changes.is_empty() { - event::wait(Event::TopologyChanged).expect("Events system must be initialized"); + if changes.is_empty() { + return None; + } + + let conf_change = raft::ConfChangeV2 { + transition: raft::ConfChangeTransition::Auto, + changes: changes.into(), + ..Default::default() + }; + + Some(conf_change) +} + +fn raft_conf_change_loop(status: Rc<RefCell<Status>>, storage: RaftSpaceAccess) { + loop { + if status.borrow().raft_state != "Leader" { + event::wait(Event::StatusChanged).expect("Events system must be initialized"); continue; } - let conf_change = raft::ConfChangeV2 { - transition: raft::ConfChangeTransition::Auto, - changes: changes.into(), - ..Default::default() - }; - + let peers = Storage::peers().unwrap(); + let term = storage.term().unwrap().unwrap_or(0); let node = global().expect("must be initialized"); - // main_loop gives the warranty that every ProposeConfChange - // will sometimes be handled and there's no need in timeout. - // It also guarantees that the notification will arrive only - // after the node leaves the joint state. - match node.propose_conf_change_and_wait(term, conf_change) { - Ok(()) => tlog!(Info, "conf_change processed"), - Err(e) => { - tlog!(Warning, "conf_change failed: {e}"); - fiber::sleep(Duration::from_secs(1)); + + if let Some(conf_change) = raft_conf_change(&storage, &peers) { + // main_loop gives the warranty that every ProposeConfChange + // will sometimes be handled and there's no need in timeout. + // It also guarantees that the notification will arrive only + // after the node leaves the joint state. + match node.propose_conf_change_and_wait(term, conf_change) { + Ok(()) => tlog!(Info, "conf_change processed"), + Err(e) => { + tlog!(Warning, "conf_change failed: {e}"); + fiber::sleep(Duration::from_secs(1)); + } } + continue; } + + event::wait(Event::TopologyChanged).expect("Events system must be initialized"); } }