From f0a44c894cdc2cb94b964b3a8a564964c0c473b2 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Wed, 14 Sep 2022 13:22:31 +0300
Subject: [PATCH] refactor(storage): introduce cluster_state space

replication_factor now goes into cluster_state
---
 src/main.rs                      |  9 +--
 src/traft/mod.rs                 | 18 +++---
 src/traft/storage.rs             | 99 ++++++++++++++++++--------------
 src/util.rs                      | 38 +++++++++++-
 test/int/test_joining.py         |  6 +-
 test/int/test_network_effects.py |  2 +-
 6 files changed, 109 insertions(+), 63 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 48881283eb..9b47e48c29 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -11,7 +11,7 @@ use ::tarantool::tlua;
 use ::tarantool::transaction::start_transaction;
 use std::convert::TryFrom;
 use std::time::{Duration, Instant};
-use traft::storage::{RaftSpace, RaftStateKey};
+use traft::storage::{ClusterSpace, StateKey};
 use traft::ExpelRequest;
 use traft::RaftSpaceAccess;
 
@@ -619,11 +619,8 @@ fn start_boot(args: &args::Run) {
         init_entries.push({
             let ctx = traft::EntryContextNormal {
                 op: traft::OpDML::insert(
-                    RaftSpace::State,
-                    &(
-                        RaftStateKey::ReplicationFactor,
-                        args.init_replication_factor,
-                    ),
+                    ClusterSpace::State,
+                    &(StateKey::ReplicationFactor, args.init_replication_factor),
                 )
                 .expect("cannot fail")
                 .into(),
diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index 5bb385bea2..229aab5ce9 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -29,7 +29,7 @@ use protobuf::Message as _;
 
 pub use network::ConnectionPool;
 pub use raft_storage::RaftSpaceAccess;
-use storage::RaftSpace;
+use storage::ClusterSpace;
 pub use storage::Storage;
 pub use topology::Topology;
 
@@ -220,24 +220,24 @@ pub trait OpResult {
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
 pub enum OpDML {
     Insert {
-        space: RaftSpace,
+        space: ClusterSpace,
         #[serde(with = "serde_bytes")]
         tuple: TupleBuffer,
     },
     Replace {
-        space: RaftSpace,
+        space: ClusterSpace,
         #[serde(with = "serde_bytes")]
         tuple: TupleBuffer,
     },
     Update {
-        space: RaftSpace,
+        space: ClusterSpace,
         #[serde(with = "serde_bytes")]
         key: TupleBuffer,
         #[serde(with = "vec_of_raw_byte_buf")]
         ops: Vec<TupleBuffer>,
     },
     Delete {
-        space: RaftSpace,
+        space: ClusterSpace,
         #[serde(with = "serde_bytes")]
         key: TupleBuffer,
     },
@@ -263,7 +263,7 @@ impl From<OpDML> for Op {
 
 impl OpDML {
     /// Serializes `tuple` and returns an [`OpDML::Insert`] in case of success.
-    pub fn insert(space: RaftSpace, tuple: &impl ToTupleBuffer) -> Result<Self, TntError> {
+    pub fn insert(space: ClusterSpace, tuple: &impl ToTupleBuffer) -> Result<Self, TntError> {
         let res = Self::Insert {
             space,
             tuple: tuple.to_tuple_buffer()?,
@@ -272,7 +272,7 @@ impl OpDML {
     }
 
     /// Serializes `tuple` and returns an [`OpDML::Replace`] in case of success.
-    pub fn replace(space: RaftSpace, tuple: &impl ToTupleBuffer) -> Result<Self, TntError> {
+    pub fn replace(space: ClusterSpace, tuple: &impl ToTupleBuffer) -> Result<Self, TntError> {
         let res = Self::Replace {
             space,
             tuple: tuple.to_tuple_buffer()?,
@@ -282,7 +282,7 @@ impl OpDML {
 
     /// Serializes `key` and returns an [`OpDML::Update`] in case of success.
     pub fn update(
-        space: RaftSpace,
+        space: ClusterSpace,
         key: &impl ToTupleBuffer,
         ops: Vec<TupleBuffer>,
     ) -> Result<Self, TntError> {
@@ -295,7 +295,7 @@ impl OpDML {
     }
 
     /// Serializes `key` and returns an [`OpDML::Delete`] in case of success.
-    pub fn delete(space: RaftSpace, key: &impl ToTupleBuffer) -> Result<Self, TntError> {
+    pub fn delete(space: ClusterSpace, key: &impl ToTupleBuffer) -> Result<Self, TntError> {
         let res = Self::Delete {
             space,
             key: key.to_tuple_buffer()?,
diff --git a/src/traft/storage.rs b/src/traft/storage.rs
index 462854f604..ce2496f949 100644
--- a/src/traft/storage.rs
+++ b/src/traft/storage.rs
@@ -2,8 +2,7 @@ use ::raft::StorageError;
 use ::raft::INVALID_ID;
 use ::tarantool::index::IteratorType;
 use ::tarantool::space::Space;
-use ::tarantool::tuple::{ToTupleBuffer, Tuple};
-use serde::de::DeserializeOwned;
+use ::tarantool::tuple::{DecodeOwned, ToTupleBuffer, Tuple};
 use thiserror::Error;
 
 use crate::define_str_enum;
@@ -14,54 +13,42 @@ use crate::traft::RaftIndex;
 pub struct Storage;
 
 ////////////////////////////////////////////////////////////////////////////////
-// RaftSpace
+// ClusterSpace
 ////////////////////////////////////////////////////////////////////////////////
 
 define_str_enum! {
-    /// An enumeration of builtin raft spaces
-    pub enum RaftSpace {
+    /// An enumeration of builtin cluster-wide spaces
+    pub enum ClusterSpace {
         Group = "raft_group",
-        State = "raft_state",
-        Log = "raft_log",
+        State = "cluster_state",
     }
 
-    FromStr::Err = UnknownRaftSpace;
+    FromStr::Err = UnknownClusterSpace;
 }
 
 #[derive(Error, Debug)]
-#[error("unknown raft space {0}")]
-pub struct UnknownRaftSpace(pub String);
+#[error("unknown cluster space {0}")]
+pub struct UnknownClusterSpace(pub String);
 
 // TODO(gmoshkin): remove this
-const RAFT_GROUP: &str = RaftSpace::Group.as_str();
-const RAFT_STATE: &str = RaftSpace::State.as_str();
+const RAFT_GROUP: &str = ClusterSpace::Group.as_str();
 
 ////////////////////////////////////////////////////////////////////////////////
-// RaftStateKey
+// StateKey
 ////////////////////////////////////////////////////////////////////////////////
 
 define_str_enum! {
     /// An enumeration of builtin raft spaces
-    pub enum RaftStateKey {
+    pub enum StateKey {
         ReplicationFactor = "replication_factor",
-        Commit = "commit",
-        Applied = "applied",
-        Term = "term",
-        Vote = "vote",
-        Gen = "gen",
-        Voters = "voters",
-        Learners = "learners",
-        VotersOutgoing = "voters_outgoing",
-        LearnersNext = "learners_next",
-        AutoLeave = "auto_leave",
     }
 
-    FromStr::Err = UnknownRaftStateKey;
+    FromStr::Err = UnknownStateKey;
 }
 
 #[derive(Error, Debug)]
-#[error("unknown raft state key {0}")]
-pub struct UnknownRaftStateKey(pub String);
+#[error("unknown state key {0}")]
+pub struct UnknownStateKey(pub String);
 
 ////////////////////////////////////////////////////////////////////////////////
 // Error
@@ -82,9 +69,26 @@ fn box_err(e: impl std::error::Error + Sync + Send + 'static) -> StorageError {
 
 impl Storage {
     pub fn init_schema() {
-        crate::tarantool::eval(
-            r#"
-            box.schema.space.create('raft_group', {
+        ::tarantool::lua_state()
+            .exec_with(
+                r#"
+            local STATE, GROUP = ...
+
+            box.schema.space.create(STATE, {
+                if_not_exists = true,
+                is_local = true,
+                format = {
+                    {name = 'key', type = 'string', is_nullable = false},
+                    {name = 'value', type = 'any', is_nullable = false},
+                }
+            })
+            box.space[STATE]:create_index('pk', {
+                if_not_exists = true,
+                parts = {{'key'}},
+                unique = true,
+            })
+
+            box.schema.space.create(GROUP, {
                 if_not_exists = true,
                 is_local = true,
                 format = {
@@ -100,34 +104,40 @@ impl Storage {
                     {name = 'failure_domain', type = 'map', is_nullable = false},
                 }
             })
-            box.space.raft_group:create_index('instance_id', {
+            box.space[GROUP]:create_index('instance_id', {
                 if_not_exists = true,
                 parts = {{'instance_id'}},
                 unique = true,
             })
-            box.space.raft_group:create_index('raft_id', {
+            box.space[GROUP]:create_index('raft_id', {
                 if_not_exists = true,
                 parts = {{'raft_id'}},
                 unique = true,
             })
-            box.space.raft_group:create_index('replicaset_id', {
+            box.space[GROUP]:create_index('replicaset_id', {
                 if_not_exists = true,
                 parts = {{'replicaset_id'}, {'commit_index'}},
                 unique = false,
             })
         "#,
-        )
-        .unwrap();
+                (ClusterSpace::State, RAFT_GROUP),
+            )
+            .unwrap();
     }
 
-    fn space(name: &str) -> Result<Space, StorageError> {
-        Space::find(name)
+    fn space(name: impl AsRef<str> + Into<String>) -> Result<Space, StorageError> {
+        Space::find(name.as_ref())
             .ok_or_else(|| Error::NoSuchSpace(name.into()))
             .map_err(box_err)
     }
 
-    fn raft_state<T: DeserializeOwned>(key: &str) -> Result<Option<T>, StorageError> {
-        let tuple: Option<Tuple> = Storage::space(RAFT_STATE)?.get(&(key,)).map_err(box_err)?;
+    fn cluster_state<T>(key: StateKey) -> Result<Option<T>, StorageError>
+    where
+        T: DecodeOwned,
+    {
+        let tuple: Option<Tuple> = Storage::space(ClusterSpace::State)?
+            .get(&(key,))
+            .map_err(box_err)?;
 
         match tuple {
             Some(t) => t.field(1).map_err(box_err),
@@ -216,8 +226,9 @@ impl Storage {
         Ok(ret)
     }
 
+    #[inline]
     pub fn replication_factor() -> Result<Option<u8>, StorageError> {
-        Storage::raft_state(RaftStateKey::ReplicationFactor.as_str())
+        Storage::cluster_state(StateKey::ReplicationFactor)
     }
 
     pub fn persist_peer(peer: &traft::Peer) -> Result<(), StorageError> {
@@ -235,20 +246,20 @@ impl Storage {
         Ok(())
     }
 
-    pub fn insert(space: RaftSpace, tuple: &impl ToTupleBuffer) -> Result<Tuple, StorageError> {
+    pub fn insert(space: ClusterSpace, tuple: &impl ToTupleBuffer) -> Result<Tuple, StorageError> {
         Storage::space(space.as_str())?
             .insert(tuple)
             .map_err(box_err)
     }
 
-    pub fn replace(space: RaftSpace, tuple: &impl ToTupleBuffer) -> Result<Tuple, StorageError> {
+    pub fn replace(space: ClusterSpace, tuple: &impl ToTupleBuffer) -> Result<Tuple, StorageError> {
         Storage::space(space.as_str())?
             .replace(tuple)
             .map_err(box_err)
     }
 
     pub fn update(
-        space: RaftSpace,
+        space: ClusterSpace,
         key: &impl ToTupleBuffer,
         ops: &[impl ToTupleBuffer],
     ) -> Result<Option<Tuple>, StorageError> {
@@ -259,7 +270,7 @@ impl Storage {
 
     #[rustfmt::skip]
     pub fn delete(
-        space: RaftSpace,
+        space: ClusterSpace,
         key: &impl ToTupleBuffer,
     ) -> Result<Option<Tuple>, StorageError> {
         Storage::space(space.as_str())?
diff --git a/src/util.rs b/src/util.rs
index 4bda629fcc..c3f9ec2b1f 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -54,6 +54,18 @@ macro_rules! define_str_enum {
             }
         }
 
+        impl AsRef<str> for $enum {
+            fn as_ref(&self) -> &str {
+                self.as_str()
+            }
+        }
+
+        impl From<$enum> for String {
+            fn from(e: $enum) -> Self {
+                e.as_str().into()
+            }
+        }
+
         impl std::str::FromStr for $enum {
             type Err = $err;
 
@@ -88,10 +100,34 @@ macro_rules! define_str_enum {
             {
                 use serde::de::Error;
                 let tmp = <&str>::deserialize(deserializer)?;
-                let res = tmp.parse().map_err(|e| D::Error::custom(e))?;
+                let res = tmp.parse().map_err(|_| Error::unknown_variant(tmp, &[$($str),+]))?;
                 Ok(res)
             }
         }
+
+        impl<L: ::tarantool::tlua::AsLua> ::tarantool::tlua::Push<L> for $enum {
+            type Err = ::tarantool::tlua::Void;
+            fn push_to_lua(&self, lua: L) -> ::tarantool::tlua::PushResult<L, Self> {
+                ::tarantool::tlua::PushInto::push_into_lua(self.as_str(), lua)
+            }
+        }
+        impl<L: ::tarantool::tlua::AsLua> ::tarantool::tlua::PushOne<L> for $enum {}
+
+        impl<L: ::tarantool::tlua::AsLua> ::tarantool::tlua::PushInto<L> for $enum {
+            type Err = ::tarantool::tlua::Void;
+            fn push_into_lua(self, lua: L) -> ::tarantool::tlua::PushIntoResult<L, Self> {
+                ::tarantool::tlua::PushInto::push_into_lua(self.as_str(), lua)
+            }
+        }
+        impl<L: ::tarantool::tlua::AsLua> ::tarantool::tlua::PushOneInto<L> for $enum {}
+
+        impl<L: ::tarantool::tlua::AsLua> ::tarantool::tlua::LuaRead<L> for $enum {
+            fn lua_read_at_position(lua: L, index: std::num::NonZeroI32) -> Result<Self, L> {
+                ::tarantool::tlua::StringInLua::lua_read_at_position(&lua, index).ok()
+                    .and_then(|s| s.parse().ok())
+                    .ok_or(lua)
+            }
+        }
     }
 }
 
diff --git a/test/int/test_joining.py b/test/int/test_joining.py
index 475c0265be..9201e5fa44 100644
--- a/test/int/test_joining.py
+++ b/test/int/test_joining.py
@@ -182,7 +182,7 @@ def test_init_replication_factor(cluster: Cluster):
     # Scenario: first instance shares --init-replication-factor to the whole cluster
     #   Given an Leader instance with --init_replication_factor=2
     #   When a new instances with different --init-replication-factor joins to the cluster
-    #   Then all of them have raft_state[replication_factor] equals to the Leader
+    #   Then all of them have cluster_state[replication_factor] equals to the Leader
     #   And there are two replicasets in the cluster
 
     i1 = cluster.add_instance(init_replication_factor=2)
@@ -190,7 +190,9 @@ def test_init_replication_factor(cluster: Cluster):
     i3 = cluster.add_instance(init_replication_factor=4)
 
     def read_replication_factor(instance):
-        return instance.eval('return box.space.raft_state:get("replication_factor")')[1]
+        return instance.eval(
+            'return box.space.cluster_state:get("replication_factor")'
+        )[1]
 
     assert read_replication_factor(i1) == 2
     assert read_replication_factor(i2) == 2
diff --git a/test/int/test_network_effects.py b/test/int/test_network_effects.py
index 6130db0cb5..6717635c0b 100644
--- a/test/int/test_network_effects.py
+++ b/test/int/test_network_effects.py
@@ -27,7 +27,7 @@ def test_log_rollback(cluster3: Cluster):
     i3.assert_raft_status("Follower")
 
     def propose_state_change(srv: Instance, value):
-        code = 'box.space.raft_state:put({"test-timeline", "%s"})' % value
+        code = 'box.space.cluster_state:put({"test-timeline", "%s"})' % value
         return srv.raft_propose_eval(code, 0.1)
 
     propose_state_change(i1, "i1 is a leader")
-- 
GitLab