From d291744cdb335f4456faea07ba89be38920d450d Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Tue, 13 Dec 2022 17:12:29 +0300
Subject: [PATCH] fix(governor): don't bootstrap vshard until replication
 factor is satisfied

NOTE: there's still a bug: before the bucket distribution is
bootstrapped we set each new replicaset's weight to 1, even for
replicasets which don't satisfy the replication factor. This is because
at least one replicaset must have a non-zero weight in order for
vshard.*.cfg to work.

A potential solution would be to only configure vshard once a replicaset
is filled up.
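
To illustrate the idea (this is only a sketch, not part of this patch;
Instance below is a simplified stand-in for the real struct), the
governor loop could skip the vshard configuration step entirely until
at least one replicaset has accumulated enough instances:

    use std::collections::HashMap;

    /// Simplified stand-in for the real Instance struct.
    struct Instance {
        replicaset_id: String,
    }

    /// Returns true once at least one replicaset has reached the
    /// replication factor, i.e. vshard could be configured without
    /// giving a weight to a half-filled replicaset.
    fn any_replicaset_is_full(
        instances: &[Instance],
        replication_factor: usize,
    ) -> bool {
        // Count instances per replicaset.
        let mut sizes: HashMap<&str, usize> = HashMap::new();
        for instance in instances {
            *sizes.entry(instance.replicaset_id.as_str()).or_insert(0) += 1;
        }
        sizes.values().any(|&size| size >= replication_factor)
    }

The same per-replicaset size counting already appears in
get_first_full_replicaset in this patch, so an actual fix would likely
reuse that logic (including the maybe_responding filter) when deciding
whether to send sharding::Request at all.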
---
 src/governor/mod.rs          | 95 +++++++++++++++++++++++++++++-------
 src/traft/rpc/sharding.rs    | 37 +++++++++++---
 test/int/test_replication.py | 27 +++++++---
 3 files changed, 128 insertions(+), 31 deletions(-)

diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index 02f6fe8267..2ee10b2a0a 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -161,7 +161,6 @@ impl Loop {
                                 term,
                                 commit,
                                 timeout: Self::SYNC_TIMEOUT,
-                                bootstrap: false,
                             },
                         )
                     });
@@ -449,7 +448,6 @@ impl Loop {
         });
         if let Some(instance) = to_shard {
             let res: Result<_> = async {
-                let vshard_bootstrapped = storage.properties.vshard_bootstrapped()?;
                 let commit = raft_storage.commit()?.unwrap();
                 let reqs = maybe_responding(&instances).map(|instance| {
                     (
@@ -458,7 +456,6 @@ impl Loop {
                             term,
                             commit,
                             timeout: Self::SYNC_TIMEOUT,
-                            bootstrap: !vshard_bootstrapped && instance.raft_id == node.raft_id,
                         },
                     )
                 });
@@ -480,19 +477,6 @@ impl Loop {
                     ));
                 node.handle_update_instance_request_and_wait(req)?;
 
-                if !vshard_bootstrapped {
-                    // TODO: if this fails, it will only rerun next time vshard
-                    // gets reconfigured
-                    node.propose_and_wait(
-                        OpDML::replace(
-                            ClusterwideSpace::Property,
-                            &(PropertyName::VshardBootstrapped, true),
-                        )?,
-                        // TODO: don't hard code the timeout
-                        Duration::from_secs(3),
-                    )??;
-                }
-
                 Ok(())
             }
             .await;
@@ -508,6 +492,59 @@ impl Loop {
             return Continue;
         }
 
+        ////////////////////////////////////////////////////////////////////////
+        // bootstrap sharding
+        let to_bootstrap = get_first_full_replicaset(&instances, storage);
+        if let Err(e) = to_bootstrap {
+            tlog!(
+                Warning,
+                "failed checking if bucket bootstrapping is needed: {e}"
+            );
+            // TODO: don't hard code timeout
+            event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
+            return Continue;
+        }
+        if let Ok(Some(Replicaset { master_id, .. })) = to_bootstrap {
+            // TODO: change `Info` to `Debug`
+            tlog!(Info, "bootstrapping bucket distribution";
+                "instance_id" => %master_id,
+            );
+            let res: Result<_> = async {
+                let req = sharding::bootstrap::Request {
+                    term,
+                    commit: raft_storage.commit()?.unwrap(),
+                    timeout: Self::SYNC_TIMEOUT,
+                };
+                pool.call(&master_id, &req)?
+                    // TODO: don't hard code timeout
+                    .timeout(Duration::from_secs(3))
+                    .await??;
+
+                let op = OpDML::replace(
+                    ClusterwideSpace::Property,
+                    &(PropertyName::VshardBootstrapped, true),
+                )?;
+                // TODO: don't hard code timeout
+                node.propose_and_wait(op, Duration::from_secs(3))??;
+
+                Ok(())
+            }
+            .await;
+            if let Err(e) = res {
+                tlog!(Warning, "failed bootstrapping bucket distribution: {e}");
+                // TODO: don't hard code timeout
+                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
+                return Continue;
+            }
+
+            // TODO: change `Info` to `Debug`
+            tlog!(Info, "bootstrapped bucket distribution";
+                "instance_id" => %master_id,
+            );
+
+            return Continue;
+        };
+
         ////////////////////////////////////////////////////////////////////////
         // sharding weights
         let to_update_weights = instances.iter().find(|instance| {
@@ -538,7 +575,6 @@ impl Loop {
                         term,
                         commit,
                         timeout: Self::SYNC_TIMEOUT,
-                        bootstrap: false,
                     }));
                     // TODO: don't hard code timeout
                     let res = call_all(pool, reqs, Duration::from_secs(3)).await?;
@@ -748,6 +784,31 @@ fn get_weight_changes<'p>(
     (!weight_changes.is_empty()).then_some(weight_changes)
 }
 
+#[inline(always)]
+fn get_first_full_replicaset(
+    instances: &[Instance],
+    storage: &Clusterwide,
+) -> Result<Option<Replicaset>> {
+    if storage.properties.vshard_bootstrapped()? {
+        return Ok(None);
+    }
+
+    let replication_factor = storage.properties.replication_factor()?;
+    let mut replicaset_sizes = HashMap::new();
+    let mut full_replicaset_id = None;
+    for Instance { replicaset_id, .. } in maybe_responding(instances) {
+        let replicaset_size = replicaset_sizes.entry(replicaset_id).or_insert(0);
+        *replicaset_size += 1;
+        if *replicaset_size >= replication_factor {
+            full_replicaset_id = Some(replicaset_id);
+        }
+    }
+
+    let Some(replicaset_id) = full_replicaset_id else { return Ok(None); };
+    let res = storage.replicasets.get(replicaset_id)?;
+    Ok(res)
+}
+
 #[inline(always)]
 fn maybe_responding(instances: &[Instance]) -> impl Iterator<Item = &Instance> {
     instances.iter().filter(|instance| instance.may_respond())
diff --git a/src/traft/rpc/sharding.rs b/src/traft/rpc/sharding.rs
index a68e9488f5..853ec95da3 100644
--- a/src/traft/rpc/sharding.rs
+++ b/src/traft/rpc/sharding.rs
@@ -1,5 +1,6 @@
 use ::tarantool::tlua;
 
+use crate::traft::rpc::sync::wait_for_index_timeout;
 use crate::traft::Result;
 use crate::traft::{node, RaftIndex, RaftTerm};
 
@@ -9,7 +10,7 @@ crate::define_rpc_request! {
     fn proc_sharding(req: Request) -> Result<Response> {
         let node = node::global()?;
         node.status().check_term(req.term)?;
-        super::sync::wait_for_index_timeout(req.commit, &node.raft_storage, req.timeout)?;
+        wait_for_index_timeout(req.commit, &node.raft_storage, req.timeout)?;
 
         let storage = &node.storage;
         let cfg = cfg::Cfg::from_storage(storage)?;
@@ -32,10 +33,6 @@ crate::define_rpc_request! {
         )
         .map_err(tlua::LuaError::from)?;
 
-        if req.bootstrap {
-            lua.exec("vshard.router.bootstrap()")?;
-        }
-
         // After reconfiguring vshard leaves behind net.box.connection objects,
         // which try reconnecting every 0.5 seconds. Garbage collecting them helps
         lua.exec("collectgarbage()")?;
@@ -49,7 +46,6 @@ crate::define_rpc_request! {
         pub term: RaftTerm,
         pub commit: RaftIndex,
         pub timeout: Duration,
-        pub bootstrap: bool,
     }
 
     /// Response to [`sharding::Request`].
@@ -58,6 +54,35 @@ crate::define_rpc_request! {
     pub struct Response {}
 }
 
+pub mod bootstrap {
+    use super::*;
+
+    crate::define_rpc_request! {
+        fn proc_sharding_bootstrap(req: Request) -> Result<Response> {
+            let node = node::global()?;
+            node.status().check_term(req.term)?;
+            wait_for_index_timeout(req.commit, &node.raft_storage, req.timeout)?;
+
+            ::tarantool::lua_state().exec("vshard.router.bootstrap()")?;
+
+            Ok(Response {})
+        }
+
+        /// Request to bootstrap bucket distribution.
+        #[derive(Default)]
+        pub struct Request {
+            pub term: RaftTerm,
+            pub commit: RaftIndex,
+            pub timeout: Duration,
+        }
+
+        /// Response to [`sharding::bootstrap::Request`].
+        ///
+        /// [`sharding::bootstrap::Request`]: Request
+        pub struct Response {}
+    }
+}
+
 #[rustfmt::skip]
 pub mod cfg {
     use crate::storage::Clusterwide;
diff --git a/test/int/test_replication.py b/test/int/test_replication.py
index 0a6e622649..68dc71ae5a 100644
--- a/test/int/test_replication.py
+++ b/test/int/test_replication.py
@@ -137,24 +137,35 @@ def test_bucket_discovery_single(instance: Instance):
     wait_buckets_awailable(instance, 3000)
 
 
+@funcy.retry(tries=30, timeout=0.2)
+def wait_has_buckets(i: Instance, expected_active: int):
+    i.call("vshard.storage.rebalancer_wakeup")
+    storage_info = i.call("vshard.storage.info")
+    assert expected_active == storage_info["bucket"]["active"]
+
+
 @pytest.mark.xfail(
     run=True,
     reason=(
-        "currently we bootstrap vshard even before the first replicaset is filled, "
-        "but we shouldn't"
+        "currently we set non zero weights for all replicasets before bootstrap, "
+        "even those which don't satisfy the replication factor"
     ),
 )
 def test_bucket_discovery_respects_replication_factor(cluster: Cluster):
     i1, *_ = cluster.deploy(instance_count=1, init_replication_factor=2)
-    time.sleep(1)
+    time.sleep(0.5)
     assert 0 == i1.call("vshard.router.info")["bucket"]["available_rw"]
+    assert None is i1.call("pico.space.property:get", "vshard_bootstrapped")
 
+    i2 = cluster.add_instance(replicaset_id="r2")
+    time.sleep(0.5)
+    assert 0 == i2.call("vshard.router.info")["bucket"]["available_rw"]
+    assert None is i2.call("pico.space.property:get", "vshard_bootstrapped")
 
-@funcy.retry(tries=30, timeout=0.2)
-def wait_has_buckets(i: Instance, expected_active: int):
-    i.call("vshard.storage.rebalancer_wakeup")
-    storage_info = i.call("vshard.storage.info")
-    assert expected_active == storage_info["bucket"]["active"]
+    i3 = cluster.add_instance(replicaset_id="r1")
+    time.sleep(0.5)
+    assert i3.call("pico.space.property:get", "vshard_bootstrapped")[1]
+    wait_has_buckets(i3, 3000)
 
 
 def test_bucket_rebalancing(cluster: Cluster):
-- 
GitLab