diff --git a/src/traft/node.rs b/src/traft/node.rs index 6f31fd0081eb589ee068c15d8234c15bc0d8f8fb..fe2197cf178f7de3db6897f854bc841c7bf0f894 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -284,6 +284,14 @@ impl Node { notify.recv() } + /// Attempt to transfer leadership to a given node and yield. + /// + /// **This function yields** + pub fn transfer_leadership_and_yield(&self, new_leader_id: RaftId) { + self.raw_operation(|node_impl| node_impl.raw_node.transfer_leader(new_leader_id)); + fiber::reschedule(); + } + /// This function **may yield** if `self.node_impl` mutex is acquired. #[inline] fn raw_operation<R>(&self, f: impl FnOnce(&mut NodeImpl) -> R) -> R { @@ -955,12 +963,17 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) { } //////////////////////////////////////////////////////////////////////// - // offline + // offline/expel let to_offline = peers .iter() .filter(|peer| peer.current_grade != CurrentGradeVariant::Offline) // TODO: process them all, not just the first one - .find(|peer| peer.target_grade == TargetGradeVariant::Offline); + .find(|peer| { + let (target, current) = (peer.target_grade.variant, peer.current_grade.variant); + matches!(target, TargetGradeVariant::Offline) + || !matches!(current, CurrentGradeVariant::Expelled) + && matches!(target, TargetGradeVariant::Expelled) + }); if let Some(peer) = to_offline { tlog!( Info, @@ -970,6 +983,23 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) { peer.target_grade ); + // transfer leadership, if we're the one who goes offline + if peer.raft_id == node.raft_id { + if let Some(new_leader) = maybe_responding(&peers).find(|peer| { + // FIXME: linear search + voters.contains(&peer.raft_id) + }) { + tlog!( + Info, + "transferring leadership to {}", + new_leader.instance_id + ); + node.transfer_leadership_and_yield(new_leader.raft_id); + event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); + continue 'governor; + } + } + let replicaset_id = &peer.replicaset_id; // choose a new replicaset master if needed let res = (|| -> traft::Result<_> { @@ -1040,7 +1070,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) { // update peer's CurrentGrade let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id.clone()) - .with_current_grade(CurrentGrade::offline(peer.target_grade.incarnation)); + .with_current_grade(peer.target_grade.into()); tlog!(Info, "handling UpdatePeerRequest"; "current_grade" => %req.current_grade.expect("just set"), diff --git a/src/traft/rpc/expel.rs b/src/traft/rpc/expel.rs index 2698e3dd9572f63fc5c6605353cc5491f2061a8c..d4018a70d5a7b255ba28f008add464970c2bf281 100644 --- a/src/traft/rpc/expel.rs +++ b/src/traft/rpc/expel.rs @@ -23,9 +23,7 @@ crate::define_rpc_request! { } let req2 = UpdatePeerRequest::new(req.instance_id, req.cluster_id) - .with_target_grade(traft::TargetGradeVariant::Expelled) - // TODO: only change target grade - .with_current_grade(traft::CurrentGrade::expelled(0)); + .with_target_grade(traft::TargetGradeVariant::Expelled); node.handle_topology_request_and_wait(req2.into())?; Ok(Response {}) diff --git a/src/traft/topology.rs b/src/traft/topology.rs index f66978a0daff4b08962393bd1285aac5e62b6e70..0a03c48651b1013b5a305a605856685692711605 100644 --- a/src/traft/topology.rs +++ b/src/traft/topology.rs @@ -195,7 +195,17 @@ impl Topology { .get_mut(&req.instance_id) .ok_or_else(|| format!("unknown instance {}", req.instance_id))?; - if peer.current_grade == CurrentGradeVariant::Expelled { + if peer.current_grade == CurrentGradeVariant::Expelled + && !matches!( + req, + UpdatePeerRequest { + target_grade: None, + current_grade: Some(current_grade), + failure_domain: None, + .. + } if current_grade == CurrentGradeVariant::Expelled + ) + { return Err(format!( "cannot update expelled peer \"{}\"", peer.instance_id diff --git a/test/int/test_expelling.py b/test/int/test_expelling.py index e04d7547969aa06df66db4cc15ee9678805e8f77..2e28f50bae7a1eeb2a363a34dcc6096091dda9bc 100644 --- a/test/int/test_expelling.py +++ b/test/int/test_expelling.py @@ -9,11 +9,9 @@ def cluster3(cluster: Cluster): def assert_peer_expelled(expelled_peer: Instance, instance: Instance): - current_grade = instance.eval( - "return picolib.peer_info(...).current_grade.variant", - expelled_peer.instance_id, - ) - assert current_grade == "Expelled" + peer_info = instance.call("picolib.peer_info", expelled_peer.instance_id) + grades = peer_info["current_grade"]["variant"], peer_info["target_grade"]["variant"] + assert ("Expelled", "Expelled") == grades def assert_voters(voters: list[Instance], instance: Instance):