From 02e7ad7eee258787739f9b31125656f8a1f0b5ad Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Wed, 19 Oct 2022 18:20:33 +0300
Subject: [PATCH] feat(sharding): initial weights

---
 src/main.rs               | 25 ++++++++++++++++++++++++-
 src/traft/mod.rs          |  4 ++--
 src/traft/node.rs         | 33 +++++++++++++++++++++++++++++++++
 src/traft/rpc/sharding.rs | 35 ++++++++++++++++++++++++++---------
 src/traft/storage.rs      | 19 +++++++++++++++++++
 5 files changed, 104 insertions(+), 12 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 1fbd27aee9..5304d3e871 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -18,6 +18,7 @@ use traft::Storage;
 use clap::StructOpt as _;
 use protobuf::Message as _;
 
+use crate::traft::rpc::sharding::cfg::ReplicasetWeights;
 use crate::traft::InstanceId;
 use crate::traft::{LogicalClock, RaftIndex, TargetGrade, UpdatePeerRequest};
 use traft::error::Error;
@@ -178,7 +179,7 @@ fn picolib_setup(args: &args::Run) {
             "vshard_cfg",
             tlua::function0(|| -> Result<traft::rpc::sharding::cfg::Cfg, Error> {
                 let node = traft::node::global()?;
-                traft::rpc::sharding::cfg::Cfg::from_storage(&node.storage.peers)
+                traft::rpc::sharding::cfg::Cfg::from_storage(&node.storage)
             }),
         );
         l.exec(
@@ -744,6 +745,28 @@ fn start_boot(args: &args::Run) {
             raft::Entry::try_from(e).unwrap()
         });
 
+        lc.inc();
+        init_entries.push({
+            let ctx = traft::EntryContextNormal {
+                op: traft::OpDML::insert(
+                    ClusterSpace::State,
+                    &(StateKey::ReplicasetWeights, ReplicasetWeights::new()),
+                )
+                .expect("cannot fail")
+                .into(),
+                lc,
+            };
+            let e = traft::Entry {
+                entry_type: raft::EntryType::EntryNormal,
+                index: (init_entries.len() + 1) as _,
+                term: 1,
+                data: vec![],
+                context: Some(traft::EntryContext::Normal(ctx)),
+            };
+
+            raft::Entry::try_from(e).unwrap()
+        });
+
         init_entries.push({
             let conf_change = raft::ConfChange {
                 change_type: raft::ConfChangeType::AddNode,
diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index 8773dea69d..b4ba3654ad 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -358,12 +358,12 @@ impl OpDML {
     pub fn update(
         space: ClusterSpace,
         key: &impl ToTupleBuffer,
-        ops: Vec<TupleBuffer>,
+        ops: impl Into<Vec<TupleBuffer>>,
     ) -> Result<Self, TntError> {
         let res = Self::Update {
             space,
             key: key.to_tuple_buffer()?,
-            ops,
+            ops: ops.into(),
         };
         Ok(res)
     }
diff --git a/src/traft/node.rs b/src/traft/node.rs
index ef333676f8..ad378318a5 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -14,6 +14,7 @@ use ::tarantool::error::TransactionError;
 use ::tarantool::fiber;
 use ::tarantool::fiber::{Cond, Mutex};
 use ::tarantool::proc;
+use tarantool::space::UpdateOps;
 use ::tarantool::tlua;
 use ::tarantool::transaction::start_transaction;
 use std::cell::Cell;
@@ -33,6 +34,7 @@ use crate::traft::Peer;
 use crate::traft::RaftId;
 use crate::traft::RaftIndex;
 use crate::traft::RaftTerm;
+use crate::traft::storage::ClusterSpace;
 use crate::warn_or_panic;
 use crate::{unwrap_ok_or, unwrap_some_or};
 use ::tarantool::util::IntoClones as _;
@@ -1173,6 +1175,37 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                     }
                 }
             }
+            let replicaset_weight = storage
+                .state
+                .replicaset_weight(replicaset_id)
+                .expect("storage error");
+            if replicaset_weight.is_none() {
+                if let Err(e) = (|| -> Result<(), Error> {
+                    let vshard_bootstrapped = storage.state.vshard_bootstrapped()?;
+                    let weight = if vshard_bootstrapped { 0. } else { 1. };
+                    let mut ops = UpdateOps::new();
+                    ops.assign(format!("['value']['{replicaset_id}']"), weight)?;
+                    let req = traft::OpDML::update(
+                        ClusterSpace::State,
+                        &[StateKey::ReplicasetWeights],
+                        ops,
+                    )?;
+                    // TODO: don't hard code the timeout
+                    node.propose_and_wait(req, Duration::from_secs(3))??;
+                    Ok(())
+                })() {
+                    // TODO: what if all replicas have changed their grade
+                    // successfully, but the replicaset_weight failed to set?
+                    tlog!(Warning, "failed to set replicaset weight: {e}";
+                        "replicaset_id" => replicaset_id,
+                    );
+
+                    // TODO: don't hard code timeout
+                    event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1))
+                        .unwrap();
+                    continue 'governor;
+                }
+            }
 
             tlog!(Info, "configured replication"; "replicaset_id" => replicaset_id);
 
diff --git a/src/traft/rpc/sharding.rs b/src/traft/rpc/sharding.rs
index 921a8eec07..507467a4d7 100644
--- a/src/traft/rpc/sharding.rs
+++ b/src/traft/rpc/sharding.rs
@@ -17,7 +17,7 @@ fn proc_sharding(req: Request) -> Result<Response, Error> {
     let _ = req.term;
 
     let storage = &node.storage;
-    let cfg = cfg::Cfg::from_storage(&storage.peers)?;
+    let cfg = cfg::Cfg::from_storage(&storage)?;
 
     let lua = ::tarantool::lua_state();
     // TODO: fix user's permissions
@@ -67,23 +67,34 @@ impl super::Request for Request {
 pub mod cfg {
     use crate::traft::error::Error;
     use crate::traft::storage::peer_field;
-    use crate::traft::storage::Peers;
+    use crate::traft::storage::Storage;
 
     use ::tarantool::tlua;
 
     use std::collections::HashMap;
 
-    #[derive(Default, Clone, Debug, PartialEq, Eq)]
+    #[derive(Default, Clone, Debug, PartialEq)]
     #[derive(tlua::PushInto, tlua::Push, tlua::LuaRead)]
     pub struct Cfg {
         sharding: HashMap<String, Replicaset>,
         discovery_mode: DiscoveryMode,
     }
 
-    #[derive(Default, Clone, Debug, PartialEq, Eq)]
+    #[derive(Default, Clone, Debug, PartialEq)]
     #[derive(tlua::PushInto, tlua::Push, tlua::LuaRead)]
     struct Replicaset {
         replicas: HashMap<String, Replica>,
+        weight: Option<Weight>,
+    }
+
+    impl Replicaset {
+        #[inline]
+        pub fn with_weight(weight: impl Into<Option<Weight>>) -> Self {
+            Self {
+                weight: weight.into(),
+                ..Default::default()
+            }
+        }
     }
 
     #[derive(Default, Clone, Debug, PartialEq, Eq)]
@@ -103,13 +114,19 @@ pub mod cfg {
         Once,
     }
 
+    pub type ReplicasetWeights = HashMap<String, Weight>;
+    pub type Weight = f64;
+
     impl Cfg {
-        pub fn from_storage(peers: &Peers) -> Result<Self, Error> {
-            use peer_field::{InstanceId, InstanceUuid, PeerAddress, ReplicasetUuid, IsMaster};
-            type Fields = (InstanceId, InstanceUuid, PeerAddress, ReplicasetUuid, IsMaster);
+        pub fn from_storage(storage: &Storage) -> Result<Self, Error> {
+            use peer_field::{InstanceId, InstanceUuid, PeerAddress, ReplicasetUuid, ReplicasetId, IsMaster};
+            type Fields = (InstanceId, InstanceUuid, PeerAddress, ReplicasetUuid, ReplicasetId, IsMaster);
+            let replicaset_weights = storage.state.replicaset_weights()?;
             let mut sharding: HashMap<String, Replicaset> = HashMap::new();
-            for (id, uuid, addr, rset, is_master) in peers.peers_fields::<Fields>()? {
-                let replicaset = sharding.entry(rset).or_default();
+            for (id, uuid, addr, rset, rset_id, is_master) in storage.peers.peers_fields::<Fields>()? {
+                let replicaset = sharding.entry(rset).or_insert_with(||
+                    Replicaset::with_weight(replicaset_weights.get(&rset_id).copied())
+                );
                 replicaset.replicas.insert(
                     uuid,
                     Replica {
diff --git a/src/traft/storage.rs b/src/traft/storage.rs
index 3d36eacd7d..7854cf321c 100644
--- a/src/traft/storage.rs
+++ b/src/traft/storage.rs
@@ -6,6 +6,7 @@ use thiserror::Error;
 use crate::define_str_enum;
 use crate::traft;
 use crate::traft::error::Error as TraftError;
+use crate::traft::rpc::sharding::cfg::{ReplicasetWeights, Weight};
 use crate::traft::RaftId;
 use crate::traft::RaftIndex;
 
@@ -78,6 +79,7 @@ define_str_enum! {
     pub enum StateKey {
         ReplicationFactor = "replication_factor",
         VshardBootstrapped = "vshard_bootstrapped",
+        ReplicasetWeights = "replicaset_weights",
     }
 
     FromStr::Err = UnknownStateKey;
@@ -174,6 +176,23 @@ impl State {
         self.space_mut().put(&(key, value))?;
         Ok(())
     }
+
+    #[inline]
+    pub fn replicaset_weight(&self, replicaset_id: &str) -> tarantool::Result<Option<Weight>> {
+        // I tried doing tuple.try_get(format!("[1]['{replicaset_id}']").as_str())
+        // but it doesn't work :(
+        Ok(self.replicaset_weights()?.get(replicaset_id).copied())
+    }
+
+    #[inline]
+    pub fn replicaset_weights(&self) -> tarantool::Result<ReplicasetWeights> {
+        Ok(self.get(StateKey::ReplicasetWeights)?.unwrap_or_default())
+    }
+
+    #[inline]
+    pub fn vshard_bootstrapped(&self) -> tarantool::Result<bool> {
+        Ok(self.get(StateKey::VshardBootstrapped)?.unwrap_or_default())
+    }
 }
 
 impl Clone for State {
-- 
GitLab