diff --git a/src/main.rs b/src/main.rs index 69d0691102977857e4a34e4916da646f826fabc2..ef345b8bef59952933587fb77783a25861ebc638 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,7 +16,7 @@ use traft::ExpelRequest; use clap::StructOpt as _; use protobuf::Message as _; -use crate::traft::{LogicalClock, RaftIndex}; +use crate::traft::{Grade, LogicalClock, RaftIndex, UpdatePeerRequest}; use traft::error::Error; mod app; @@ -805,17 +805,14 @@ fn postjoin(args: &args::Run) { let peer = traft::Storage::peer_by_raft_id(raft_id) .unwrap() .expect("peer must be persisted at the time of postjoin"); - let instance_id = peer.instance_id; let cluster_id = traft::Storage::cluster_id() .unwrap() .expect("cluster_id must be persisted at the time of postjoin"); - tlog!(Info, "initiating self-activation of {instance_id:?}"); - let mut req = traft::UpdatePeerRequest::set_online(instance_id, cluster_id); - let new_failure_domain = args.failure_domain(); - if new_failure_domain != peer.failure_domain { - req.set_failure_domain(new_failure_domain); - } + tlog!(Info, "initiating self-activation of {}", peer.instance_id); + let req = UpdatePeerRequest::new(peer.instance_id, cluster_id) + .with_grade(Grade::Online) + .with_failure_domain(args.failure_domain()); let leader_id = node.status().leader_id.expect("leader_id deinitialized"); let leader = traft::Storage::peer_by_raft_id(leader_id).unwrap().unwrap(); diff --git a/src/traft/failover.rs b/src/traft/failover.rs index a695cca5f60f4e632f1c6f9d4d43ba3a14eddbc1..1eb5aa65a54184bec059d91eec050e9493b7a088 100644 --- a/src/traft/failover.rs +++ b/src/traft/failover.rs @@ -9,6 +9,7 @@ use crate::{stringify_cfunc, tarantool, tlog}; use crate::traft::error::Error; use crate::traft::event; use crate::traft::node; +use crate::traft::Grade; use crate::traft::Storage; use crate::traft::{UpdatePeerRequest, UpdatePeerResponse}; @@ -23,12 +24,10 @@ pub fn on_shutdown() { } let peer = Storage::peer_by_raft_id(raft_id).unwrap().unwrap(); - let req = UpdatePeerRequest::set_offline( - peer.instance_id, - Storage::cluster_id() - .unwrap() - .expect("cluster_id must be present"), - ); + let cluster_id = Storage::cluster_id() + .unwrap() + .expect("cluster_id must be present"); + let req = UpdatePeerRequest::new(peer.instance_id, cluster_id).with_grade(Grade::Offline); let fn_name = stringify_cfunc!(raft_update_peer); // will run until we get successfully deactivate or tarantool shuts down diff --git a/src/traft/mod.rs b/src/traft/mod.rs index 6c3d53933ee29117f792502d68ffc7e29c50442c..462198308b5292c066e1cb4cffd319b444dc89be 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -607,50 +607,39 @@ impl Default for Grade { } /////////////////////////////////////////////////////////////////////////////// -/// Request to deactivate the instance. +/// Request to update the instance in the storage. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct UpdatePeerRequest { - pub grade: Grade, pub instance_id: String, pub cluster_id: String, - pub failure_domain: Option<FailureDomain>, + pub changes: Vec<PeerChange>, } -impl Encode for UpdatePeerRequest {} -impl UpdatePeerRequest { - #[inline] - pub fn set_online(instance_id: impl Into<String>, cluster_id: impl Into<String>) -> Self { - Self { - grade: Grade::Online, - instance_id: instance_id.into(), - cluster_id: cluster_id.into(), - failure_domain: None, - } - } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum PeerChange { + Grade(Grade), + FailureDomain(FailureDomain), +} +impl Encode for UpdatePeerRequest {} +impl UpdatePeerRequest { #[inline] - pub fn set_offline(instance_id: impl Into<String>, cluster_id: impl Into<String>) -> Self { + pub fn new(instance_id: String, cluster_id: String) -> Self { Self { - grade: Grade::Offline, - instance_id: instance_id.into(), - cluster_id: cluster_id.into(), - failure_domain: None, + instance_id, + cluster_id, + changes: vec![], } } - #[inline] - pub fn set_expelled(instance_id: impl Into<String>, cluster_id: impl Into<String>) -> Self { - Self { - grade: Grade::Expelled, - instance_id: instance_id.into(), - cluster_id: cluster_id.into(), - failure_domain: None, - } + pub fn with_grade(mut self, grade: Grade) -> Self { + self.changes.push(PeerChange::Grade(grade)); + self } - #[inline] - pub fn set_failure_domain(&mut self, failure_domain: FailureDomain) { - self.failure_domain = Some(failure_domain); + pub fn with_failure_domain(mut self, failure_domain: FailureDomain) -> Self { + self.changes.push(PeerChange::FailureDomain(failure_domain)); + self } } diff --git a/src/traft/node.rs b/src/traft/node.rs index 83bbe7f80c0010a655e323ba8618648997826459..d8b704f2a0fb79cc892ba9ed7d2b471a65c5ea21 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -299,12 +299,7 @@ impl Node { advertise_address, failure_domain, ), - TopologyRequest::UpdatePeer(UpdatePeerRequest { - instance_id, - grade, - failure_domain, - .. - }) => topology.update_peer(&instance_id, grade, failure_domain), + TopologyRequest::UpdatePeer(req) => topology.update_peer(req), }; let mut peer = crate::unwrap_ok_or!(peer_result, Err(e) => { @@ -997,8 +992,8 @@ fn expel_on_leader(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::erro return Err(Box::from("not a leader")); } - let req = UpdatePeerRequest::set_expelled(req.instance_id, req.cluster_id); - node.handle_topology_request_and_wait(req.into())?; + let req2 = UpdatePeerRequest::new(req.instance_id, req.cluster_id).with_grade(Grade::Expelled); + node.handle_topology_request_and_wait(req2.into())?; Ok(ExpelResponse {}) } diff --git a/src/traft/topology.rs b/src/traft/topology.rs index fb00c7bad5424a2e2eca0372bd37ee608ddda3e0..d2884928522d8129ed634fc1a7893b02c263b0d9 100644 --- a/src/traft/topology.rs +++ b/src/traft/topology.rs @@ -6,6 +6,7 @@ use crate::traft::FailureDomain; use crate::traft::Grade; use crate::traft::Peer; use crate::traft::{InstanceId, RaftId, ReplicasetId}; +use crate::traft::{PeerChange, UpdatePeerRequest}; use crate::util::Uppercase; use raft::INVALID_INDEX; @@ -41,6 +42,7 @@ impl Topology { self } + #[allow(dead_code)] pub fn get_peer(&mut self, instance_id: &str) -> Result<Peer, String> { let peer = self .instance_map @@ -186,34 +188,31 @@ impl Topology { Ok(peer.clone()) } - pub fn update_peer( - &mut self, - instance_id: &str, - grade: Grade, - failure_domain: Option<FailureDomain>, - ) -> Result<Peer, String> { - let current_peer = self.get_peer(instance_id).unwrap(); - let grade = match current_peer.grade { - Grade::Expelled => Grade::Expelled, - _ => grade, - }; - + pub fn update_peer(&mut self, req: UpdatePeerRequest) -> Result<Peer, String> { let this = self as *const Self; let mut peer = self .instance_map - .get_mut(instance_id) - .ok_or_else(|| format!("unknown instance {}", instance_id))?; - - if let Some(fd) = failure_domain { - // SAFETY: this is safe, because rust doesn't complain if you inline - // the function - unsafe { &*this }.check_required_failure_domain(&fd)?; - self.failure_domain_names.extend(fd.names().cloned()); - peer.failure_domain = fd; + .get_mut(&req.instance_id) + .ok_or_else(|| format!("unknown instance {}", req.instance_id))?; + + for change in req.changes { + match change { + PeerChange::Grade(grade) => { + if peer.grade != Grade::Expelled { + peer.grade = grade; + } + } + PeerChange::FailureDomain(fd) => { + // SAFETY: this is safe, because rust doesn't complain if you inline + // the function + unsafe { &*this }.check_required_failure_domain(&fd)?; + self.failure_domain_names.extend(fd.names().cloned()); + peer.failure_domain = fd; + } + } } - peer.grade = grade; Ok(peer.clone()) } } @@ -242,6 +241,7 @@ mod tests { use crate::traft::FailureDomain; use crate::traft::Grade::{Offline, Online}; use crate::traft::Peer; + use crate::traft::UpdatePeerRequest; use pretty_assertions::assert_eq; macro_rules! peers { @@ -310,13 +310,15 @@ mod tests { }; } - macro_rules! set_active { + macro_rules! set_grade { ( $topology:expr, $instance_id:expr, $grade:expr $(,)? ) => { - $topology.update_peer($instance_id, $grade, None) + $topology.update_peer( + UpdatePeerRequest::new($instance_id.into(), "".into()).with_grade($grade), + ) }; } @@ -326,7 +328,11 @@ mod tests { $instance_id:expr, $failure_domain:expr $(,)? ) => { - $topology.update_peer($instance_id, Online, Some($failure_domain)) + $topology.update_peer( + UpdatePeerRequest::new($instance_id.into(), "".into()) + .with_grade(Online) + .with_failure_domain($failure_domain), + ) }; } @@ -470,24 +476,24 @@ mod tests { .with_replication_factor(1); assert_eq!( - set_active!(topology, "i1", Offline).unwrap(), + set_grade!(topology, "i1", Offline).unwrap(), peer!(1, "i1", "r1", "nowhere", Offline), ); // idempotency assert_eq!( - set_active!(topology, "i1", Offline).unwrap(), + set_grade!(topology, "i1", Offline).unwrap(), peer!(1, "i1", "r1", "nowhere", Offline), ); assert_eq!( - set_active!(topology, "i2", Online).unwrap(), + set_grade!(topology, "i2", Online).unwrap(), peer!(2, "i2", "r2", "nowhere", Online), ); // idempotency assert_eq!( - set_active!(topology, "i2", Online).unwrap(), + set_grade!(topology, "i2", Online).unwrap(), peer!(2, "i2", "r2", "nowhere", Online), ); } @@ -623,7 +629,7 @@ mod tests { assert_eq!(peer.replicaset_id, "r1"); assert_eq!( - set_active!(t, "i3", Offline).unwrap().failure_domain, + set_grade!(t, "i3", Offline).unwrap().failure_domain, faildoms! {planet: B, owner: V, dimension: C137}, ); diff --git a/test/int/test_couple.py b/test/int/test_couple.py index c509d7e98751ea97c494fca10a0182582317f03f..c6bbabfa3fd8b99e37c2dcbed692e5c53da2385b 100644 --- a/test/int/test_couple.py +++ b/test/int/test_couple.py @@ -169,9 +169,12 @@ def test_deactivation(cluster2: Cluster): def raft_update_peer( host: Instance, target: Instance, is_active: bool ) -> list[bool]: - kind = "Online" if is_active else "Offline" + grade = "Online" if is_active else "Offline" return host.call( - ".raft_update_peer", kind, target.instance_id, target.cluster_id, None + ".raft_update_peer", + target.instance_id, + target.cluster_id, + [{"Grade": grade}], ) # check idempotency