From 0755c236681d4d8bd61a15152af5fd5365fcc8fb Mon Sep 17 00:00:00 2001
From: Yaroslav Dynnikov <yaroslav.dynnikov@gmail.com>
Date: Tue, 21 Jun 2022 23:43:08 +0300
Subject: [PATCH] feature: extend JoinRequest with failure domains

Part of https://git.picodata.io/picodata/picodata/picodata/-/issues/98
---
 src/args.rs           |  1 -
 src/main.rs           |  9 +++++++++
 src/traft/mod.rs      |  4 ++++
 src/traft/node.rs     |  8 +++++++-
 src/traft/topology.rs | 17 ++++++++++++++---
 5 files changed, 34 insertions(+), 5 deletions(-)

diff --git a/src/args.rs b/src/args.rs
index 3600e217c8..dac60d4276 100644
--- a/src/args.rs
+++ b/src/args.rs
@@ -179,7 +179,6 @@ impl Run {
         }
     }
 
-    #[allow(unused)]
     pub fn failure_domains(&self) -> HashMap<&str, &str> {
         let mut ret = HashMap::new();
         for (k, v) in &self.failure_domains {
diff --git a/src/main.rs b/src/main.rs
index a5172dbb1b..acea68b8be 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -420,6 +420,10 @@ fn start_boot(args: &args::Run) {
         args.instance_id(),
         args.replicaset_id.clone(),
         args.advertise_address(),
+        args.failure_domains()
+            .into_iter()
+            .map(|(k, v)| (k.into(), v.into()))
+            .collect(),
     );
     let raft_id = peer.raft_id;
     let instance_id = peer.instance_id.clone();
@@ -501,6 +505,11 @@ fn start_join(args: &args::Run, leader_address: String) {
         instance_id: args.instance_id(),
         replicaset_id: args.replicaset_id.clone(),
         advertise_address: args.advertise_address(),
+        failure_domains: args
+            .failure_domains()
+            .into_iter()
+            .map(|(k, v)| (k.into(), v.into()))
+            .collect(),
     };
 
     let fn_name = stringify_cfunc!(traft::node::raft_join);
diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index a85b0b70a7..4e98940b98 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -13,6 +13,7 @@ use ::tarantool::tuple::AsTuple;
 use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize};
 use std::any::Any;
+use std::collections::HashMap;
 use std::convert::TryFrom;
 use std::fmt::Display;
 use uuid::Uuid;
@@ -27,6 +28,8 @@ pub type RaftId = u64;
 pub type InstanceId = String;
 pub type ReplicasetId = String;
 
+pub type FailureDomains = HashMap<String, String>;
+
 //////////////////////////////////////////////////////////////////////////////////////////
 /// Timestamps for raft entries.
 ///
@@ -407,6 +410,7 @@ pub struct JoinRequest {
     pub instance_id: Option<String>,
     pub replicaset_id: Option<String>,
     pub advertise_address: String,
+    pub failure_domains: FailureDomains,
 }
 impl AsTuple for JoinRequest {}
 
diff --git a/src/traft/node.rs b/src/traft/node.rs
index e441cfb4ea..02d2e4427b 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -590,8 +590,14 @@ fn raft_main_loop(
                             instance_id,
                             replicaset_id,
                             advertise_address,
+                            failure_domains,
                             ..
-                        }) => topology.join(instance_id, replicaset_id, advertise_address),
+                        }) => topology.join(
+                            instance_id,
+                            replicaset_id,
+                            advertise_address,
+                            failure_domains,
+                        ),
 
                         TopologyRequest::SetActive(SetActiveRequest {
                             instance_id, kind, ..
diff --git a/src/traft/topology.rs b/src/traft/topology.rs
index a1f7766576..596f449a85 100644
--- a/src/traft/topology.rs
+++ b/src/traft/topology.rs
@@ -3,6 +3,7 @@ use std::collections::BTreeSet;
 
 use crate::traft::instance_uuid;
 use crate::traft::replicaset_uuid;
+use crate::traft::FailureDomains;
 use crate::traft::Health;
 use crate::traft::Peer;
 use crate::traft::{InstanceId, RaftId, ReplicasetId};
@@ -63,7 +64,9 @@ impl Topology {
         format!("i{raft_id}")
     }
 
-    fn choose_replicaset_id(&self) -> String {
+    fn choose_replicaset_id(&self, failure_domains: &FailureDomains) -> String {
+        // TODO: implement logic
+        let _ = failure_domains;
         for (replicaset_id, peers) in self.replicaset_map.iter() {
             if peers.len() < self.replication_factor as usize {
                 return replicaset_id.clone();
@@ -85,6 +88,7 @@ impl Topology {
         instance_id: Option<String>,
         replicaset_id: Option<String>,
         advertise: String,
+        failure_domains: FailureDomains,
     ) -> Result<Peer, String> {
         if let Some(id) = instance_id.as_ref() {
             let existing_peer: Option<&Peer> = self.instance_map.get(id);
@@ -99,9 +103,13 @@ impl Topology {
         let raft_id = self.max_raft_id + 1;
         let instance_id: String = instance_id.unwrap_or_else(|| self.choose_instance_id(raft_id));
         let instance_uuid = instance_uuid(&instance_id);
-        let replicaset_id: String = replicaset_id.unwrap_or_else(|| self.choose_replicaset_id());
+        let replicaset_id: String =
+            replicaset_id.unwrap_or_else(|| self.choose_replicaset_id(&failure_domains));
         let replicaset_uuid = replicaset_uuid(&replicaset_id);
 
+        // TODO: store it in peer
+        let _ = failure_domains;
+
         let peer = Peer {
             instance_id,
             instance_uuid,
@@ -151,10 +159,11 @@ pub fn initial_peer(
     instance_id: Option<String>,
     replicaset_id: Option<String>,
     advertise: String,
+    failure_domains: FailureDomains,
 ) -> Peer {
     let mut topology = Topology::from_peers(vec![]);
     let mut peer = topology
-        .join(instance_id, replicaset_id, advertise)
+        .join(instance_id, replicaset_id, advertise, failure_domains)
         .unwrap();
     peer.commit_index = 1;
     peer
@@ -166,6 +175,7 @@ mod tests {
 
     use crate::traft::instance_uuid;
     use crate::traft::replicaset_uuid;
+    use crate::traft::FailureDomains;
     use crate::traft::Health::{Offline, Online};
     use crate::traft::Peer;
     use pretty_assertions::assert_eq;
@@ -216,6 +226,7 @@ mod tests {
                 $instance_id.map(str::to_string),
                 $replicaset_id.map(str::to_string),
                 $advertise_address.into(),
+                FailureDomains::default(),
             )
         };
     }
-- 
GitLab