diff --git a/src/args.rs b/src/args.rs index 3600e217c8be3ffc7bc519442c67e38ddf1aa179..dac60d42767523069ca757e3bcb9c1487659ae60 100644 --- a/src/args.rs +++ b/src/args.rs @@ -179,7 +179,6 @@ impl Run { } } - #[allow(unused)] pub fn failure_domains(&self) -> HashMap<&str, &str> { let mut ret = HashMap::new(); for (k, v) in &self.failure_domains { diff --git a/src/main.rs b/src/main.rs index a5172dbb1b821b95087bcf232accd9f00b19b562..acea68b8be11253c28ae998d37fbb9887041c42a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -420,6 +420,10 @@ fn start_boot(args: &args::Run) { args.instance_id(), args.replicaset_id.clone(), args.advertise_address(), + args.failure_domains() + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect(), ); let raft_id = peer.raft_id; let instance_id = peer.instance_id.clone(); @@ -501,6 +505,11 @@ fn start_join(args: &args::Run, leader_address: String) { instance_id: args.instance_id(), replicaset_id: args.replicaset_id.clone(), advertise_address: args.advertise_address(), + failure_domains: args + .failure_domains() + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect(), }; let fn_name = stringify_cfunc!(traft::node::raft_join); diff --git a/src/traft/mod.rs b/src/traft/mod.rs index a85b0b70a7547e49d948f31d484c522eac43f2b7..4e98940b981c1c48da4b73e906248d4e84968d0f 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -13,6 +13,7 @@ use ::tarantool::tuple::AsTuple; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::any::Any; +use std::collections::HashMap; use std::convert::TryFrom; use std::fmt::Display; use uuid::Uuid; @@ -27,6 +28,8 @@ pub type RaftId = u64; pub type InstanceId = String; pub type ReplicasetId = String; +pub type FailureDomains = HashMap<String, String>; + ////////////////////////////////////////////////////////////////////////////////////////// /// Timestamps for raft entries. /// @@ -407,6 +410,7 @@ pub struct JoinRequest { pub instance_id: Option<String>, pub replicaset_id: Option<String>, pub advertise_address: String, + pub failure_domains: FailureDomains, } impl AsTuple for JoinRequest {} diff --git a/src/traft/node.rs b/src/traft/node.rs index e441cfb4eac2e89472bbe4d0ed54cb464731a234..02d2e4427b42ca0cf0456a0ad7ea8b486d2f7d59 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -590,8 +590,14 @@ fn raft_main_loop( instance_id, replicaset_id, advertise_address, + failure_domains, .. - }) => topology.join(instance_id, replicaset_id, advertise_address), + }) => topology.join( + instance_id, + replicaset_id, + advertise_address, + failure_domains, + ), TopologyRequest::SetActive(SetActiveRequest { instance_id, kind, .. diff --git a/src/traft/topology.rs b/src/traft/topology.rs index a1f7766576391937c690e49efa4bc18d59172420..596f449a85750827c0ad1cd08951b981637b3572 100644 --- a/src/traft/topology.rs +++ b/src/traft/topology.rs @@ -3,6 +3,7 @@ use std::collections::BTreeSet; use crate::traft::instance_uuid; use crate::traft::replicaset_uuid; +use crate::traft::FailureDomains; use crate::traft::Health; use crate::traft::Peer; use crate::traft::{InstanceId, RaftId, ReplicasetId}; @@ -63,7 +64,9 @@ impl Topology { format!("i{raft_id}") } - fn choose_replicaset_id(&self) -> String { + fn choose_replicaset_id(&self, failure_domains: &FailureDomains) -> String { + // TODO: implement logic + let _ = failure_domains; for (replicaset_id, peers) in self.replicaset_map.iter() { if peers.len() < self.replication_factor as usize { return replicaset_id.clone(); @@ -85,6 +88,7 @@ impl Topology { instance_id: Option<String>, replicaset_id: Option<String>, advertise: String, + failure_domains: FailureDomains, ) -> Result<Peer, String> { if let Some(id) = instance_id.as_ref() { let existing_peer: Option<&Peer> = self.instance_map.get(id); @@ -99,9 +103,13 @@ impl Topology { let raft_id = self.max_raft_id + 1; let instance_id: String = instance_id.unwrap_or_else(|| self.choose_instance_id(raft_id)); let instance_uuid = instance_uuid(&instance_id); - let replicaset_id: String = replicaset_id.unwrap_or_else(|| self.choose_replicaset_id()); + let replicaset_id: String = + replicaset_id.unwrap_or_else(|| self.choose_replicaset_id(&failure_domains)); let replicaset_uuid = replicaset_uuid(&replicaset_id); + // TODO: store it in peer + let _ = failure_domains; + let peer = Peer { instance_id, instance_uuid, @@ -151,10 +159,11 @@ pub fn initial_peer( instance_id: Option<String>, replicaset_id: Option<String>, advertise: String, + failure_domains: FailureDomains, ) -> Peer { let mut topology = Topology::from_peers(vec![]); let mut peer = topology - .join(instance_id, replicaset_id, advertise) + .join(instance_id, replicaset_id, advertise, failure_domains) .unwrap(); peer.commit_index = 1; peer @@ -166,6 +175,7 @@ mod tests { use crate::traft::instance_uuid; use crate::traft::replicaset_uuid; + use crate::traft::FailureDomains; use crate::traft::Health::{Offline, Online}; use crate::traft::Peer; use pretty_assertions::assert_eq; @@ -216,6 +226,7 @@ mod tests { $instance_id.map(str::to_string), $replicaset_id.map(str::to_string), $advertise_address.into(), + FailureDomains::default(), ) }; }