From 2dac77c5393fab902fb8f0e6ba309d0b1d4ad479 Mon Sep 17 00:00:00 2001
From: Yaroslav Dynnikov <yaroslav.dynnikov@gmail.com>
Date: Thu, 26 May 2022 17:44:22 +0300
Subject: [PATCH] feature: set up replication when instance joins

Picodata already assigns `replicaset_id` to an instance when it joins,
but it wasn't used in Tarantool `box.cfg` yet. Now it is.

It's also important to set up listen port in `start_join` immediately.
Without it Tarantool will stuck waiting for connection to self.

Part of https://git.picodata.io/picodata/picodata/picodata/-/issues/52
---
 src/main.rs          |  2 +-
 src/traft/node.rs    | 19 +++++++------
 src/traft/storage.rs | 66 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 77 insertions(+), 10 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index bfc8b2d0d5..367e35d4ed 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -493,7 +493,7 @@ fn start_join(args: &args::Run, leader_address: String) {
     assert!(tarantool::cfg().is_none());
 
     let cfg = tarantool::Cfg {
-        listen: None,
+        listen: Some(args.listen.clone()),
         read_only: false,
         instance_uuid: Some(resp.peer.instance_uuid.clone()),
         replicaset_uuid: Some(resp.peer.replicaset_uuid.clone()),
diff --git a/src/traft/node.rs b/src/traft/node.rs
index efed75fc91..55a8d2ba1a 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -704,13 +704,14 @@ fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn StdError>> {
     let instance_id = req.instance_id.clone();
     node.join_one(req)?;
 
-    let resp = JoinResponse {
-        peer: {
-            // TODO: get rid of unwrap
-            Storage::peer_by_instance_id(&instance_id)?.unwrap()
-        },
-        raft_group: Storage::peers()?,
-        box_replication: vec![],
-    };
-    Ok(resp)
+    let peer = Storage::peer_by_instance_id(&instance_id)?
+        .ok_or("the peer has misteriously disappeared")?;
+    let raft_group = Storage::peers()?;
+    let box_replication = Storage::box_replication(&peer)?;
+
+    Ok(JoinResponse {
+        peer,
+        raft_group,
+        box_replication,
+    })
 }
diff --git a/src/traft/storage.rs b/src/traft/storage.rs
index f621bea8cb..d854310d70 100644
--- a/src/traft/storage.rs
+++ b/src/traft/storage.rs
@@ -91,6 +91,11 @@ impl Storage {
                 parts = {{'instance_id'}},
                 unique = true,
             })
+            box.space.raft_group:create_index('replicaset_id', {
+                if_not_exists = true,
+                parts = {{'replicaset_id'}, {'commit_index'}},
+                unique = false,
+            })
             box.space.raft_group:create_index('peer_address', {
                 if_not_exists = true,
                 parts = {{'peer_address'}},
@@ -168,6 +173,33 @@ impl Storage {
         Ok(ret)
     }
 
+    pub fn box_replication(peer: &traft::Peer) -> Result<Vec<String>, StorageError> {
+        let mut ret = Vec::new();
+
+        const IDX: &str = "replicaset_id";
+        let iter = Storage::space(RAFT_GROUP)?
+            .index(IDX)
+            .ok_or_else(|| Error::NoSuchIndex(RAFT_GROUP.into(), IDX.into()))
+            .map_err(box_err!())?
+            .select(IteratorType::GE, &(&peer.replicaset_id,))
+            .map_err(box_err!())?;
+
+        for tuple in iter {
+            let replica: traft::Peer = tuple.into_struct().map_err(box_err!())?;
+
+            if replica.replicaset_id != peer.replicaset_id
+                || replica.commit_index > peer.commit_index
+            {
+                // In Tarantool the iteration must be interrupted explicitly.
+                break;
+            }
+
+            ret.push(replica.peer_address);
+        }
+
+        Ok(ret)
+    }
+
     pub fn id() -> Result<Option<u64>, StorageError> {
         Storage::raft_state("id")
     }
@@ -564,6 +596,29 @@ inventory::submit!(crate::InnerTest {
             assert_eq!(Storage::peer_by_instance_id("i6"), Ok(None));
         }
 
+        let box_replication = |rsid: &str, idx: u64| {
+            let peer = traft::Peer {
+                replicaset_id: rsid.into(),
+                commit_index: idx,
+                ..Default::default()
+            };
+            Storage::box_replication(&peer).unwrap()
+        };
+
+        {
+            assert_eq!(box_replication("r1", 0), Vec::<&str>::new());
+            assert_eq!(box_replication("XX", 99), Vec::<&str>::new());
+
+            assert_eq!(box_replication("r1", 1), vec!["addr:1"]);
+            assert_eq!(box_replication("r1", 2), vec!["addr:1", "addr:2"]);
+            assert_eq!(box_replication("r1", 99), vec!["addr:1", "addr:2"]);
+
+            assert_eq!(box_replication("r2", 10), vec!["addr:3", "addr:4"]);
+            assert_eq!(box_replication("r2", 10), vec!["addr:3", "addr:4"]);
+
+            assert_eq!(box_replication("r3", 10), vec!["addr:5"]);
+        }
+
         raft_group.index("instance_id").unwrap().drop().unwrap();
 
         assert_err!(
@@ -575,6 +630,17 @@ inventory::submit!(crate::InnerTest {
             )
         );
 
+        raft_group.index("replicaset_id").unwrap().drop().unwrap();
+
+        assert_err!(
+            Storage::box_replication(&traft::Peer::default()),
+            concat!(
+                "unknown error",
+                " no such index \"replicaset_id\"",
+                " in space \"raft_group\""
+            )
+        );
+
         raft_group.index("peer_address").unwrap().drop().unwrap();
         raft_group.primary_key().drop().unwrap();
 
-- 
GitLab