From c9120139442dce877efb628dfb835fcfcade8398 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Tue, 18 Oct 2022 16:25:07 +0300
Subject: [PATCH] wip: vshard in governor

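Add a new instance grade, ShardingInitialized, between Replicated and
Online. Once a peer reaches CurrentGrade::Replicated with
TargetGrade::Online, the governor broadcasts an rpc::sharding request
to every instance in the cluster, waits for all responses, and raises
each responding peer's current grade to ShardingInitialized. Peers are
only promoted to Online after they have reached ShardingInitialized.

The replication test now expects the "guest:@" user prefix in the
box.cfg.replication URIs.

Still wip: reconfiguring vshard when an instance goes offline and the
hard-coded timeouts are left as TODOs.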
---
 src/traft/mod.rs         |  2 +
 src/traft/node.rs        | 93 +++++++++++++++++++++++++++++++++++++++-
 test/int/test_joining.py |  4 +-
 3 files changed, 96 insertions(+), 3 deletions(-)

diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index 1fc3c1bb72..8773dea69d 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -847,6 +847,8 @@ crate::define_str_enum! {
         RaftSynced = "RaftSynced",
         // Instance has configured replication.
         Replicated = "Replicated",
+        // Instance has configured sharding.
+        ShardingInitialized = "ShardingInitialized",
         // Instance is active and is handling requests.
         Online = "Online",
         // Instance has permanently removed from cluster.
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 089cc48adb..19927e6d3f 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -46,7 +46,7 @@ use crate::traft::event;
 use crate::traft::event::Event;
 use crate::traft::failover;
 use crate::traft::notify::Notify;
-use crate::traft::rpc::replication;
+use crate::traft::rpc::{replication, sharding};
 use crate::traft::storage::peer_field;
 use crate::traft::storage::StateKey;
 use crate::traft::ConnectionPool;
@@ -1001,6 +1001,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
             continue 'governor;
         }
 
+        let raft_id = status.get().id;
         let peers = storage.peers.all_peers().unwrap();
         let term = storage.raft.term().unwrap().unwrap_or(0);
         let node = global().expect("must be initialized");
@@ -1029,6 +1030,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
             .filter(|peer| peer.current_grade != CurrentGrade::Offline)
             .find(|peer| peer.target_grade == TargetGrade::Offline);
         if let Some(peer) = to_offline {
+            // TODO: everybody needs to rerun vshard.*.cfg
             let instance_id = peer.instance_id.clone();
             let cluster_id = storage.raft.cluster_id().unwrap().unwrap();
             let req = UpdatePeerRequest::new(instance_id, cluster_id)
@@ -1200,11 +1202,98 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
 
         ////////////////////////////////////////////////////////////////////////
         // sharding
+        let need_sharding = peers
+            .iter()
+            .any(|peer| peer.has_grades(CurrentGrade::Replicated, TargetGrade::Online));
+        if need_sharding {
+            let peer_ids = unwrap_ok_or!(
+                storage.peers.peers_fields::<peer_field::InstanceId>(),
+                Err(e) => {
+                    tlog!(Warning, "failed reading peer instance ids: {e}");
+
+                    // TODO: don't hard code timeout
+                    event::wait_timeout(Event::TopologyChanged, Duration::from_millis(300))
+                        .unwrap();
+                    continue 'governor;
+                }
+            );
+            let peer_ids = peer_ids.collect::<Vec<_>>();
+
+            // TODO: good api needed
+            // Count responses in a cell owned by this governor iteration,
+            // so stale callbacks from a previous iteration can't disturb it
+            let response_count = Rc::new(Cell::new(0));
+            let (cond_rx, cond_tx) = Rc::new(fiber::Cond::new()).into_clones();
+            let peer_count = peer_ids.len();
+            let (rx, tx) = fiber::Channel::new(peer_count as _).into_clones();
+            // Send rpc request to all peers in the cluster
+            for peer_instance_id in &peer_ids {
+                let tx = tx.clone();
+                let cond_tx = cond_tx.clone();
+                let response_count = response_count.clone();
+                let peer_iid = peer_instance_id.clone();
+                pool.call(
+                    peer_instance_id,
+                    sharding::Request {
+                        leader_id: raft_id,
+                        term,
+                    },
+                    move |res| {
+                        tx.send((peer_iid, res)).expect("mustn't fail");
+                        // Wake the governor once the last response arrives
+                        response_count.set(response_count.get() + 1);
+                        if response_count.get() == peer_count {
+                            cond_tx.signal()
+                        }
+                    },
+                )
+                .expect("shouldn't fail");
+            }
+            // TODO: don't hard code timeout
+            if !cond_rx.wait_timeout(Duration::from_secs(3)) {
+                tlog!(Warning, "failed to configure sharding: timed out");
+                continue 'governor;
+            }
+
+            let cluster_id = storage.raft.cluster_id().unwrap().unwrap();
+            // Process all rpc responses
+            for (peer_iid, resp) in rx.into_iter().take(peer_count) {
+                // Update the peer's grade once it confirms that
+                // sharding has been configured on it
+                let res = resp.and_then(|sharding::Response {}| {
+                    let req = UpdatePeerRequest::new(peer_iid.clone(), cluster_id.clone())
+                        .with_current_grade(CurrentGrade::ShardingInitialized);
+                    node.handle_topology_request_and_wait(req.into())
+                });
+                match res {
+                    Ok(_) => {
+                        // TODO: change `Info` to `Debug`
+                        tlog!(Info, "configured sharding with peer";
+                            "instance_id" => &*peer_iid,
+                        );
+                    }
+                    Err(e) => {
+                        tlog!(Warning, "failed to configure sharding: {e}";
+                            "instance_id" => &*peer_iid,
+                        );
+
+                        // TODO: don't hard code timeout
+                        event::wait_timeout(Event::TopologyChanged, Duration::from_secs(10))
+                            .unwrap();
+                        continue 'governor;
+                    }
+                }
+            }
+
+            tlog!(Info, "configured sharding");
+
+            continue 'governor;
+        }
+
+        ////////////////////////////////////////////////////////////////////////
+        // to online
         // TODO
 
         let to_online = peers
             .iter()
-            .find(|peer| peer.has_grades(CurrentGrade::Replicated, TargetGrade::Online));
+            .find(|peer| peer.has_grades(CurrentGrade::ShardingInitialized, TargetGrade::Online));
         if let Some(peer) = to_online {
             let instance_id = peer.instance_id.clone();
             let cluster_id = storage.raft.cluster_id().unwrap().unwrap();
diff --git a/test/int/test_joining.py b/test/int/test_joining.py
index 5f0606abfb..4fe8a43c3b 100644
--- a/test/int/test_joining.py
+++ b/test/int/test_joining.py
@@ -132,7 +132,9 @@ def test_replication(cluster: Cluster):
     @funcy.retry(tries=20, timeout=0.1)
     def wait_replicated(instance):
         box_replication = instance.eval("return box.cfg.replication")
-        assert set(box_replication) == set([i1.listen, i2.listen]), instance
+        assert set(box_replication) == set(
+            (f"guest:@{addr}" for addr in [i1.listen, i2.listen])
+        ), instance
 
     for instance in cluster.instances:
         with instance.connect(1) as conn:
-- 
GitLab