From 1002372ce459de1ef5f4e1ce94ecd10b405bda74 Mon Sep 17 00:00:00 2001
From: Valentin Syrovatskiy <v.syrovatskiy@picodata.io>
Date: Mon, 27 Jun 2022 07:31:26 +0000
Subject: [PATCH] feat: optional instance_id

---
 src/args.rs              |  41 +++++++++++-
 src/main.rs              |   6 +-
 src/traft/mod.rs         |  11 +---
 src/traft/node.rs        |  45 ++++++++-----
 src/traft/topology.rs    | 134 ++++++++++++++++++++++-----------------
 test/int/conftest.py     |  21 ++++--
 test/int/test_joining.py |  16 +++++
 7 files changed, 180 insertions(+), 94 deletions(-)

diff --git a/src/args.rs b/src/args.rs
index df66457988..bef897b3f2 100644
--- a/src/args.rs
+++ b/src/args.rs
@@ -50,9 +50,15 @@ pub struct Run {
     /// Execute tarantool (Lua) script
     pub tarantool_exec: Option<CString>,
 
-    #[clap(long, value_name = "name", env = "PICODATA_INSTANCE_ID")]
+    #[clap(
+        long,
+        value_name = "name",
+        default_value = "",
+        env = "PICODATA_INSTANCE_ID"
+    )]
     /// Name of the instance
-    pub instance_id: String,
+    /// Empty value means that instance_id of a Follower will be generated on the Leader
+    instance_id: String,
 
     #[clap(
         long = "advertise",
@@ -147,6 +153,13 @@ impl Run {
     pub fn log_level(&self) -> SayLevel {
         self.log_level.into()
     }
+
+    /// Returns the explicitly configured instance id.
+    /// `None` means the value is empty, i.e. no id was given by the user.
+    pub fn instance_id(&self) -> Option<String> {
+        let id = self.instance_id.as_str();
+        (!id.is_empty()).then(|| id.to_string())
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -279,13 +292,23 @@ mod tests {
         {
             let parsed = parse![Run,];
             assert_eq!(parsed.instance_id, "instance-id-from-env");
+            assert_eq!(
+                parsed.instance_id(),
+                Some("instance-id-from-env".to_string())
+            );
             assert_eq!(parsed.peers.as_ref(), vec!["localhost:3301"]);
             assert_eq!(parsed.listen, "localhost:3301"); // default
             assert_eq!(parsed.advertise_address(), "localhost:3301"); // default
             assert_eq!(parsed.log_level(), SayLevel::Info); // default
 
             let parsed = parse![Run, "--instance-id", "instance-id-from-args"];
-            assert_eq!(parsed.instance_id, "instance-id-from-args");
+            assert_eq!(
+                parsed.instance_id(),
+                Some("instance-id-from-args".to_string())
+            );
+
+            let parsed = parse![Run, "--instance-id", ""];
+            assert_eq!(parsed.instance_id(), None);
         }
 
         std::env::set_var("PICODATA_PEER", "peer-from-env");
@@ -300,6 +323,18 @@ mod tests {
             assert_eq!(parsed.peers.as_ref(), vec!["localhost:3302"]);
         }
 
+        std::env::set_var("PICODATA_INSTANCE_ID", "");
+        {
+            let parsed = parse![Run,];
+            assert_eq!(parsed.instance_id(), None);
+        }
+
+        std::env::remove_var("PICODATA_INSTANCE_ID");
+        {
+            let parsed = parse![Run,];
+            assert_eq!(parsed.instance_id(), None);
+        }
+
         std::env::set_var("PICODATA_LISTEN", "listen-from-env");
         {
             let parsed = parse![Run,];
diff --git a/src/main.rs b/src/main.rs
index 06a651a52a..671e246e10 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -407,7 +407,7 @@ fn start_boot(args: &args::Run) {
 
     let peer = traft::topology::initial_peer(
         args.cluster_id.clone(),
-        args.instance_id.clone(),
+        args.instance_id(),
         args.replicaset_id.clone(),
         args.advertise_address(),
     );
@@ -475,7 +475,7 @@ fn start_join(args: &args::Run, leader_address: String) {
 
     let req = traft::JoinRequest {
         cluster_id: args.cluster_id.clone(),
-        instance_id: args.instance_id.clone(),
+        instance_id: args.instance_id(),
         replicaset_id: args.replicaset_id.clone(),
         voter: false,
         advertise_address: args.advertise_address(),
@@ -598,7 +598,7 @@ fn postjoin(args: &args::Run) {
         tlog!(Warning, "initiating self-promotion of {me:?}");
         let req = traft::JoinRequest {
             cluster_id: args.cluster_id.clone(),
-            instance_id: me.instance_id.clone(),
+            instance_id: Some(me.instance_id.clone()),
             replicaset_id: None, // TODO
             voter: true,
             advertise_address: args.advertise_address(),
diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index a5dc5e3569..351ddeecf8 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -307,15 +307,6 @@ pub enum TopologyRequest {
     Deactivate(DeactivateRequest),
 }
 
-impl TopologyRequest {
-    pub fn instance_id(&self) -> &str {
-        match self {
-            Self::Join(JoinRequest { instance_id, .. })
-            | Self::Deactivate(DeactivateRequest { instance_id, .. }) => instance_id,
-        }
-    }
-}
-
 impl From<JoinRequest> for TopologyRequest {
     fn from(j: JoinRequest) -> Self {
         Self::Join(j)
@@ -333,7 +324,7 @@ impl From<DeactivateRequest> for TopologyRequest {
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct JoinRequest {
     pub cluster_id: String,
-    pub instance_id: String,
+    pub instance_id: Option<String>,
     pub replicaset_id: Option<String>,
     pub advertise_address: String,
     pub voter: bool,
diff --git a/src/traft/node.rs b/src/traft/node.rs
index bb419182fe..4676d36a5b 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -665,8 +665,6 @@ fn raft_main_loop(
 fn raft_join_loop(inbox: TopologyMailbox, main_inbox: Mailbox<NormalRequest>) {
     loop {
         let batch = inbox.receive_all(Duration::MAX);
-        let ids: Vec<_> = batch.iter().map(|(req, _)| req.instance_id()).collect();
-        tlog!(Info, "processing batch: {ids:?}");
 
         let term = Storage::term().unwrap().unwrap_or(0);
         let mut topology = match Storage::peers() {
@@ -680,18 +678,37 @@ fn raft_join_loop(inbox: TopologyMailbox, main_inbox: Mailbox<NormalRequest>) {
             }
         };
 
+        let mut topology_results = vec![];
+
         for (req, notify) in &batch {
-            if let Err(e) = topology.process(req) {
-                let e = RaftError::ConfChangeError(e);
-                notify.try_send(Err(e)).expect("that's a bug");
+            match topology.process(req) {
+                Ok(peer) => {
+                    topology_results.push((notify, peer));
+                }
+                Err(e) => {
+                    let e = RaftError::ConfChangeError(e);
+                    notify.try_send(Err(e)).expect("that's a bug");
+                }
             }
         }
 
+        let topology_diff = topology.diff();
+        let topology_to_replace = topology.to_replace();
+
+        let mut ids: Vec<String> = vec![];
+        for peer in &topology_diff {
+            ids.push(peer.instance_id.clone());
+        }
+        for (_, peer) in &topology_to_replace {
+            ids.push(peer.instance_id.clone());
+        }
+        tlog!(Info, "processing batch: {ids:?}");
+
         let (rx, tx) = fiber::Channel::new(1).into_clones();
         main_inbox.send(NormalRequest::ProposeConfChange {
             term,
-            peers: topology.diff(),
-            to_replace: topology.to_replace(),
+            peers: topology_diff,
+            to_replace: topology_to_replace,
             notify: tx,
         });
 
@@ -701,12 +718,12 @@ fn raft_join_loop(inbox: TopologyMailbox, main_inbox: Mailbox<NormalRequest>) {
         // after the node leaves the joint state.
         let res = rx.recv().expect("that's a bug");
         tlog!(Info, "batch processed: {ids:?}, {res:?}");
-        for (_, notify) in batch {
-            // RaftError doesn't implement the Clone trait,
-            // so we have to be creative.
+        for (notify, peer) in topology_results {
             match &res {
-                Ok(v) => notify.try_send(Ok(*v)).ok(),
+                Ok(_) => notify.try_send(Ok(peer.raft_id)).ok(),
                 Err(e) => {
+                    // RaftError doesn't implement the Clone trait,
+                    // so we have to be creative.
                     let e = RaftError::ConfChangeError(format!("{e}"));
                     notify.try_send(Err(e)).ok()
                 }
@@ -757,11 +774,9 @@ fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn StdError>> {
         }));
     }
 
-    let instance_id = req.instance_id.clone();
-    node.change_topology(req)?;
+    let raft_id = node.change_topology(req)?;
 
-    let peer = Storage::peer_by_instance_id(&instance_id)?
-        .ok_or("the peer has misteriously disappeared")?;
+    let peer = Storage::peer_by_raft_id(raft_id)?.ok_or("the peer has misteriously disappeared")?;
     let raft_group = Storage::peers()?;
     let box_replication = Storage::box_replication(&peer.replicaset_id, Some(peer.commit_index))?;
 
diff --git a/src/traft/topology.rs b/src/traft/topology.rs
index 784741276a..086e488355 100644
--- a/src/traft/topology.rs
+++ b/src/traft/topology.rs
@@ -58,9 +58,9 @@ impl Topology {
             .insert(peer.raft_id);
     }
 
-    fn peer_by_instance_id(&self, instance_id: &str) -> Option<&Peer> {
+    fn peer_by_instance_id(&self, instance_id: &str) -> Option<Peer> {
         let raft_id = self.instance_id_map.get(instance_id)?;
-        self.peers.get(raft_id)
+        self.peers.get(raft_id).cloned()
     }
 
     fn choose_replicaset_id(&self) -> String {
@@ -80,67 +80,83 @@ impl Topology {
         }
     }
 
-    pub fn join(&mut self, req: &JoinRequest) -> Result<(), String> {
-        if let Some(peer) = self.peer_by_instance_id(&req.instance_id) {
-            match &req.replicaset_id {
-                Some(replicaset_id) if replicaset_id != &peer.replicaset_id => {
-                    let e = format!(
-                        std::concat!(
-                            "{} already joined with a different replicaset_id,",
-                            " requested: {},",
-                            " existing: {}.",
-                        ),
-                        req.instance_id, replicaset_id, peer.replicaset_id
-                    );
-                    return Err(e);
-                }
-                _ => (),
-            }
-
-            let mut peer = peer.clone();
-            peer.peer_address = req.advertise_address.clone();
-            peer.voter = req.voter;
-            peer.is_active = true;
+    /// Returns the requested `instance_id`, or generates `i{raft_id}` if absent.
+    // NOTE(review): a generated id may collide with an explicit one (e.g. a
+    // peer named "i2" by hand) — confirm callers guard against it.
+    fn choose_instance_id(instance_id: Option<String>, raft_id: u64) -> String {
+        instance_id.unwrap_or_else(|| format!("i{raft_id}"))
+    }
 
-            if req.voter {
-                self.diff.insert(peer.raft_id);
-            } else {
-                let old_raft_id = peer.raft_id;
-                peer.raft_id = self.max_raft_id + 1;
+    /// Registers the requesting instance: updates the peer that already owns
+    /// the requested `instance_id`, or adds a brand new peer otherwise.
+    pub fn join(&mut self, req: &JoinRequest) -> Result<Peer, String> {
+        let known = req.instance_id.as_deref();
+        match known.and_then(|id| self.peer_by_instance_id(id)) {
+            Some(peer) => self.modify_existing_instance(peer, req),
+            None => self.join_new_instance(req),
+        }
+    }
 
-                self.to_replace.insert(peer.raft_id, old_raft_id);
+    fn modify_existing_instance(&mut self, peer: Peer, req: &JoinRequest) -> Result<Peer, String> {
+        match &req.replicaset_id {
+            Some(replicaset_id) if replicaset_id != &peer.replicaset_id => {
+                let e = format!(
+                    std::concat!(
+                        "{} already joined with a different replicaset_id,",
+                        " requested: {},",
+                        " existing: {}.",
+                    ),
+                    peer.instance_id, replicaset_id, peer.replicaset_id
+                );
+                return Err(e);
             }
-            self.put_peer(peer);
+            _ => (),
+        }
 
-            return Ok(());
-        } else {
-            let raft_id = self.max_raft_id + 1;
-            let replicaset_id = match &req.replicaset_id {
-                Some(v) => v.clone(),
-                None => self.choose_replicaset_id(),
-            };
-            let replicaset_uuid = replicaset_uuid(&replicaset_id);
+        let mut peer = peer;
+        peer.peer_address = req.advertise_address.clone();
+        peer.voter = req.voter;
+        peer.is_active = true;
 
-            let peer = Peer {
-                raft_id,
-                instance_id: req.instance_id.clone(),
-                replicaset_id,
-                commit_index: INVALID_INDEX,
-                instance_uuid: instance_uuid(&req.instance_id),
-                replicaset_uuid,
-                peer_address: req.advertise_address.clone(),
-                voter: req.voter,
-                is_active: true,
-            };
+        if req.voter {
+            self.diff.insert(peer.raft_id);
+        } else {
+            let old_raft_id = peer.raft_id;
+            peer.raft_id = self.max_raft_id + 1;
 
-            self.diff.insert(raft_id);
-            self.put_peer(peer);
+            self.to_replace.insert(peer.raft_id, old_raft_id);
         }
+        self.put_peer(peer.clone());
+        Ok(peer)
+    }
+
+    fn join_new_instance(&mut self, req: &JoinRequest) -> Result<Peer, String> {
+        let raft_id = self.max_raft_id + 1;
+        let replicaset_id = match &req.replicaset_id {
+            Some(v) => v.clone(),
+            None => self.choose_replicaset_id(),
+        };
+        let replicaset_uuid = replicaset_uuid(&replicaset_id);
+        let instance_id = Self::choose_instance_id(req.instance_id.clone(), raft_id);
 
-        Ok(())
+        let peer = Peer {
+            raft_id,
+            instance_id: instance_id.clone(),
+            replicaset_id,
+            commit_index: INVALID_INDEX,
+            instance_uuid: instance_uuid(&instance_id),
+            replicaset_uuid,
+            peer_address: req.advertise_address.clone(),
+            voter: req.voter,
+            is_active: true,
+        };
+
+        self.diff.insert(raft_id);
+        self.put_peer(peer.clone());
+        Ok(peer)
     }
 
-    pub fn deactivate(&mut self, req: &DeactivateRequest) -> Result<(), String> {
+    pub fn deactivate(&mut self, req: &DeactivateRequest) -> Result<Peer, String> {
         let peer = match self.peer_by_instance_id(&req.instance_id) {
             Some(peer) => peer,
             None => {
@@ -154,17 +170,17 @@ impl Topology {
         let peer = Peer {
             voter: false,
             is_active: false,
-            ..peer.clone()
+            ..peer
         };
 
         self.diff.insert(peer.raft_id);
         // no need to call put_peer, as the peer was already in the cluster
-        self.peers.insert(peer.raft_id, peer);
+        self.peers.insert(peer.raft_id, peer.clone());
 
-        Ok(())
+        Ok(peer)
     }
 
-    pub fn process(&mut self, req: &TopologyRequest) -> Result<(), String> {
+    pub fn process(&mut self, req: &TopologyRequest) -> Result<Peer, String> {
         match req {
             TopologyRequest::Join(join) => self.join(join),
             TopologyRequest::Deactivate(deactivate) => self.deactivate(deactivate),
@@ -197,7 +213,7 @@ impl Topology {
 // Create first peer in the cluster
 pub fn initial_peer(
     cluster_id: String,
-    instance_id: String,
+    instance_id: Option<String>,
     replicaset_id: Option<String>,
     advertise_address: String,
 ) -> Peer {
@@ -271,7 +287,7 @@ mod tests {
         ) => {
             &crate::traft::TopologyRequest::Join(crate::traft::JoinRequest {
                 cluster_id: "cluster1".into(),
-                instance_id: $instance_id.into(),
+                instance_id: Some($instance_id.into()),
                 replicaset_id: $replicaset_id.map(|v: &str| v.into()),
                 advertise_address: $advertise_address.into(),
                 voter: $voter,
diff --git a/test/int/conftest.py b/test/int/conftest.py
index 453e03ef87..c70c54445d 100644
--- a/test/int/conftest.py
+++ b/test/int/conftest.py
@@ -149,6 +149,8 @@ assert color.intense_red("text") == "\x1b[31;1mtext\x1b[0m"
 
 OUT_LOCK = threading.Lock()
 
+POSITION_IN_SPACE_INSTANCE_ID = 3
+
 
 @dataclass
 class Instance:
@@ -332,6 +334,11 @@ class Instance:
         status = self._raft_status()
         assert status.is_ready
         self.raft_id = status.id
+        with self.connect(timeout=1) as conn:
+            self.instance_id = conn.space("raft_group").select((self.raft_id,))[0][
+                POSITION_IN_SPACE_INSTANCE_ID
+            ]
+            eprint(f"{self.instance_id=}")
         eprint(f"{self} is ready")
 
     @funcy.retry(tries=4, timeout=0.1, errors=AssertionError)
@@ -380,11 +387,15 @@ class Cluster:
     def __getitem__(self, item: int) -> Instance:
         return self.instances[item]
 
-    def deploy(self, *, instance_count: int) -> list[Instance]:
+    def deploy(
+        self, *, instance_count: int, generate_instance_id=True
+    ) -> list[Instance]:
         assert not self.instances, "Already deployed"
 
         for i in range(instance_count):
-            self.add_instance(wait_ready=False)
+            self.add_instance(
+                wait_ready=False, generate_instance_id=generate_instance_id
+            )
 
         for instance in self.instances:
             instance.start()
@@ -395,13 +406,15 @@ class Cluster:
         eprint(f" {self} deployed ".center(80, "="))
         return self.instances
 
-    def add_instance(self, wait_ready=True, peers=None) -> Instance:
+    def add_instance(
+        self, wait_ready=True, peers=None, generate_instance_id=True
+    ) -> Instance:
         i = 1 + len(self.instances)
 
         instance = Instance(
             binary_path=self.binary_path,
             cluster_id=self.id,
-            instance_id=f"i{i}",
+            instance_id=f"i{i}" if generate_instance_id else "",
             data_dir=f"{self.data_dir}/i{i}",
             host=self.base_host,
             port=self.base_port + i,
diff --git a/test/int/test_joining.py b/test/int/test_joining.py
index 4ba64a68d1..fcbdfa3f88 100644
--- a/test/int/test_joining.py
+++ b/test/int/test_joining.py
@@ -277,3 +277,19 @@ def test_rebootstrap_follower(cluster3: Cluster):
     i3.restart(remove_data=True)
     i3.wait_ready()
     i3.assert_raft_status("Follower")
+
+
+def test_join_without_explicit_instance_id(cluster: Cluster):
+    # Scenario: join instances without explicitly given instance ids
+    #   Given no instances started
+    #   When two instances start without instance_id given
+    #   Then one of the instances becomes Leader with instance_id "i1"
+    #   And the other one becomes Follower with instance_id "i2"
+
+    cluster.deploy(instance_count=2, generate_instance_id=False)
+
+    i1, i2 = cluster.instances
+    i1.assert_raft_status("Leader")
+    assert i1.instance_id == "i1"
+    i2.assert_raft_status("Follower")
+    assert i2.instance_id == "i2"
-- 
GitLab