From 374584c5fc21ae3ff7cf1c0bc5a1fe59d5d45f86 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Thu, 8 Sep 2022 18:09:06 +0300
Subject: [PATCH 1/4] refactor: extract define_str_enum macro

---
 src/traft/event.rs | 63 +++++++++++++---------------------------------
 src/util.rs        | 62 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 79 insertions(+), 46 deletions(-)

diff --git a/src/traft/event.rs b/src/traft/event.rs
index fed643694a..c56f68a379 100644
--- a/src/traft/event.rs
+++ b/src/traft/event.rs
@@ -9,6 +9,7 @@ use ::tarantool::fiber::{mutex::MutexGuard, Cond, Mutex};
 use ::tarantool::proc;
 use ::tarantool::unwrap_or;
 
+use crate::define_str_enum;
 use crate::tlog;
 use crate::traft::error::Error;
 use crate::unwrap_ok_or;
@@ -17,54 +18,24 @@ use thiserror::Error;
 pub type BoxResult<T> = std::result::Result<T, Box<dyn std::error::Error>>;
 
 #[derive(Error, Debug)]
-#[error("unknown event")]
-pub struct EventFromStrError;
-
-macro_rules! define_events {
-    ($($event:tt, $str:literal;)+) => {
-        ////////////////////////////////////////////////////////////////////////
-        /// An enumeration of builtin events
-        #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash, PartialOrd, Ord)]
-        pub enum Event {
-            $( $event, )+
-        }
-
-        impl Event {
-            pub const fn as_str(&self) -> &str {
-                match self {
-                    $( Self::$event => $str, )+
-                }
-            }
-        }
-
-        impl std::fmt::Display for Event {
-            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-                f.write_str(self.as_str())
-            }
-        }
-
-        impl FromStr for Event {
-            type Err = EventFromStrError;
-
-            fn from_str(s: &str) -> Result<Self, Self::Err> {
-                match s {
-                    $( $str => Ok(Self::$event), )+
-                    _ => Err(EventFromStrError),
-                }
-            }
-        }
+#[error("unknown event {0}")]
+pub struct EventFromStrError(pub String);
+
+define_str_enum! {
+    ////////////////////////////////////////////////////////////////////////////
+    /// An enumeration of builtin events
+    pub enum Event {
+        Demoted = "raft.demoted",
+        JointStateEnter = "raft.joint-state-enter",
+        JointStateLeave = "raft.joint-state-leave",
+        JointStateDrop = "raft.joint-state-drop",
+        StatusChanged = "raft.status-changed",
+        TopologyChanged = "raft.topology-changed",
+        RaftLoopNeeded = "raft.loop-needed",
+        RaftEntryApplied = "raft.entry-applied",
     }
-}
 
-define_events! {
-    Demoted, "raft.demoted";
-    JointStateEnter, "raft.joint-state-enter";
-    JointStateLeave, "raft.joint-state-leave";
-    JointStateDrop, "raft.joint-state-drop";
-    StatusChanged, "raft.status-changed";
-    TopologyChanged, "raft.topology-changed";
-    RaftLoopNeeded, "raft.loop-needed";
-    RaftEntryApplied, "raft.entry-applied";
+    FromStr::Err = EventFromStrError;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/util.rs b/src/util.rs
index abb407246c..4bda629fcc 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -33,6 +33,68 @@ macro_rules! stringify_debug {
     }};
 }
 
+#[macro_export]
+macro_rules! define_str_enum {
+    (
+        $(#[$meta:meta])*
+        pub enum $enum:ident { $($space:tt = $str:literal,)+ }
+        FromStr::Err = $err:ident;
+    ) => {
+        $(#[$meta])*
+        #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash, PartialOrd, Ord)]
+        pub enum $enum {
+            $( #[doc = $str] $space, )+
+        }
+
+        impl $enum {
+            pub const fn as_str(&self) -> &str {
+                match self {
+                    $( Self::$space => $str, )+
+                }
+            }
+        }
+
+        impl std::str::FromStr for $enum {
+            type Err = $err;
+
+            fn from_str(s: &str) -> Result<Self, Self::Err> {
+                match s {
+                    $( $str => Ok(Self::$space), )+
+                    _ => Err($err(s.into())),
+                }
+            }
+        }
+
+        impl std::fmt::Display for $enum {
+            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+                f.write_str(self.as_str())
+            }
+        }
+
+        impl serde::Serialize for $enum {
+            #[inline]
+            fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+            where
+                S: serde::Serializer,
+            {
+                serializer.serialize_str(self.as_str())
+            }
+        }
+
+        impl<'de> serde::Deserialize<'de> for $enum {
+            fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                use serde::de::Error;
+                let tmp = <&str>::deserialize(deserializer)?;
+                let res = tmp.parse().map_err(|e| D::Error::custom(e))?;
+                Ok(res)
+            }
+        }
+    }
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 /// A wrapper around `String` that garantees the string is uppercase by
 /// converting it to uppercase (if needed) on construction.
-- 
GitLab


From f7e472d42e8ef08a818bc2685c8c4e1fa9ca24eb Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Thu, 8 Sep 2022 18:15:59 +0300
Subject: [PATCH 2/4] chore: bump tarantool-module

---
 Cargo.lock | 3 +--
 tarantool  | 2 +-
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f5c1cad3c9..cee1d11993 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1127,7 +1127,7 @@ checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"
 
 [[package]]
 name = "tarantool"
-version = "0.6.3"
+version = "0.6.4"
 dependencies = [
  "base64",
  "bitflags",
@@ -1135,7 +1135,6 @@ dependencies = [
  "dec",
  "derivative",
  "dlopen",
- "lazy_static",
  "libc",
  "log",
  "nix 0.24.1",
diff --git a/tarantool b/tarantool
index 8a1006460d..33b156e7c5 160000
--- a/tarantool
+++ b/tarantool
@@ -1 +1 @@
-Subproject commit 8a1006460de486b908ddd8dfeb00c0ba149a860f
+Subproject commit 33b156e7c5ac6b26aacd942a4ccf1674097fe7b4
-- 
GitLab


From 3b3e8179980ccc433d486625f89e97fc73103796 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Thu, 8 Sep 2022 18:16:20 +0300
Subject: [PATCH 3/4] feat: cluster-wide DML request op

---
 src/traft/mod.rs     | 183 ++++++++++++++++++++++++++++++++++++++++++-
 src/traft/storage.rs |  65 ++++++++++++++-
 2 files changed, 241 insertions(+), 7 deletions(-)

diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index 0f9af4c74f..f5b6f74b86 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -6,14 +6,15 @@ pub mod failover;
 mod network;
 pub mod node;
 pub mod notify;
-mod storage;
+pub mod storage;
 pub mod topology;
 
 use crate::stringify_debug;
 use crate::util::Uppercase;
 use ::raft::prelude as raft;
+use ::tarantool::error::Error as TntError;
 use ::tarantool::tlua::LuaError;
-use ::tarantool::tuple::Encode;
+use ::tarantool::tuple::{Encode, ToTupleBuffer, Tuple, TupleBuffer};
 use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize};
 use std::any::Any;
@@ -26,6 +27,7 @@ use uuid::Uuid;
 use protobuf::Message as _;
 
 pub use network::ConnectionPool;
+use storage::RaftSpace;
 pub use storage::Storage;
 pub use topology::Topology;
 
@@ -90,11 +92,14 @@ pub enum Op {
     PersistReplicationFactor {
         replication_factor: u8,
     },
+    /// Cluster-wide data modification operation.
+    /// Should be used to manipulate the cluster-wide configuration.
+    Dml(OpDML),
 }
 
 impl std::fmt::Display for Op {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        match self {
+        return match self {
             Self::Nop => f.write_str("Nop"),
             Self::Info { msg } => write!(f, "Info({msg:?})"),
             Self::EvalLua(OpEvalLua { code }) => write!(f, "EvalLua({code:?})"),
@@ -105,6 +110,48 @@ impl std::fmt::Display for Op {
             Self::PersistReplicationFactor { replication_factor } => {
                 write!(f, "PersistReplicationFactor({replication_factor})")
             }
+            Self::Dml(OpDML::Insert { space, tuple }) => {
+                write!(f, "Insert({space}, {})", DisplayAsJson(tuple))
+            }
+            Self::Dml(OpDML::Replace { space, tuple }) => {
+                write!(f, "Replace({space}, {})", DisplayAsJson(tuple))
+            }
+            Self::Dml(OpDML::Update { space, key, ops }) => {
+                let key = DisplayAsJson(key);
+                let ops = DisplayAsJson(&**ops);
+                write!(f, "Update({space}, {key}, {ops})")
+            }
+            Self::Dml(OpDML::Delete { space, key }) => {
+                write!(f, "Delete({space}, {})", DisplayAsJson(key))
+            }
+        };
+
+        struct DisplayAsJson<T>(pub T);
+
+        impl std::fmt::Display for DisplayAsJson<&TupleBuffer> {
+            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+                if let Some(data) = rmp_serde::from_slice::<serde_json::Value>(self.0.as_ref())
+                    .ok()
+                    .and_then(|v| serde_json::to_string(&v).ok())
+                {
+                    return write!(f, "{data}");
+                }
+
+                write!(f, "{:?}", self.0)
+            }
+        }
+
+        impl std::fmt::Display for DisplayAsJson<&[TupleBuffer]> {
+            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+                write!(f, "[")?;
+                if let Some(elem) = self.0.first() {
+                    write!(f, "{}", DisplayAsJson(elem))?;
+                }
+                for elem in self.0.iter().skip(1) {
+                    write!(f, ", {}", DisplayAsJson(elem))?;
+                }
+                write!(f, "]")
+            }
         }
     }
 }
@@ -127,6 +174,7 @@ impl Op {
                 Storage::persist_replication_factor(*replication_factor).unwrap();
                 Box::new(())
             }
+            Self::Dml(op) => Box::new(op.result()),
         }
     }
 }
@@ -175,6 +223,135 @@ pub trait OpResult {
     fn result(&self) -> Self::Result;
 }
 
+//////////////////////////////////////////////////////////////////////////////////////////
+// OpDML
+
+/// Cluster-wide data modification operation.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
+pub enum OpDML {
+    Insert {
+        space: RaftSpace,
+        #[serde(with = "serde_bytes")]
+        tuple: TupleBuffer,
+    },
+    Replace {
+        space: RaftSpace,
+        #[serde(with = "serde_bytes")]
+        tuple: TupleBuffer,
+    },
+    Update {
+        space: RaftSpace,
+        #[serde(with = "serde_bytes")]
+        key: TupleBuffer,
+        #[serde(with = "vec_of_raw_byte_buf")]
+        ops: Vec<TupleBuffer>,
+    },
+    Delete {
+        space: RaftSpace,
+        #[serde(with = "serde_bytes")]
+        key: TupleBuffer,
+    },
+}
+
+impl OpResult for OpDML {
+    type Result = Result<Option<Tuple>, ::raft::StorageError>;
+    fn result(&self) -> Self::Result {
+        match self {
+            Self::Insert { space, tuple } => Storage::insert(*space, tuple).map(Some),
+            Self::Replace { space, tuple } => Storage::replace(*space, tuple).map(Some),
+            Self::Update { space, key, ops } => Storage::update(*space, key, ops),
+            Self::Delete { space, key } => Storage::delete(*space, key),
+        }
+    }
+}
+
+impl From<OpDML> for Op {
+    fn from(op: OpDML) -> Op {
+        Op::Dml(op)
+    }
+}
+
+impl OpDML {
+    /// Serializes `tuple` and returns an [`OpDML::Insert`] in case of success.
+    pub fn insert(space: RaftSpace, tuple: &impl ToTupleBuffer) -> Result<Self, TntError> {
+        let res = Self::Insert {
+            space,
+            tuple: tuple.to_tuple_buffer()?,
+        };
+        Ok(res)
+    }
+
+    /// Serializes `tuple` and returns an [`OpDML::Replace`] in case of success.
+    pub fn replace(space: RaftSpace, tuple: &impl ToTupleBuffer) -> Result<Self, TntError> {
+        let res = Self::Replace {
+            space,
+            tuple: tuple.to_tuple_buffer()?,
+        };
+        Ok(res)
+    }
+
+    /// Serializes `key` and returns an [`OpDML::Update`] in case of success.
+    pub fn update(
+        space: RaftSpace,
+        key: &impl ToTupleBuffer,
+        ops: Vec<TupleBuffer>,
+    ) -> Result<Self, TntError> {
+        let res = Self::Update {
+            space,
+            key: key.to_tuple_buffer()?,
+            ops,
+        };
+        Ok(res)
+    }
+
+    /// Serializes `key` and returns an [`OpDML::Delete`] in case of success.
+    pub fn delete(space: RaftSpace, key: &impl ToTupleBuffer) -> Result<Self, TntError> {
+        let res = Self::Delete {
+            space,
+            key: key.to_tuple_buffer()?,
+        };
+        Ok(res)
+    }
+}
+
+mod vec_of_raw_byte_buf {
+    use super::TupleBuffer;
+    use ::tarantool::error::Error as TntError;
+    use serde::de::Error as _;
+    use serde::ser::SerializeSeq;
+    use serde::{self, Deserialize, Deserializer, Serializer};
+    use serde_bytes::{ByteBuf, Bytes};
+    use std::convert::TryFrom;
+
+    pub fn serialize<S>(v: &[TupleBuffer], ser: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let mut seq = ser.serialize_seq(Some(v.len()))?;
+        for buf in v {
+            seq.serialize_element(Bytes::new(buf.as_ref()))?;
+        }
+        seq.end()
+    }
+
+    pub fn deserialize<'de, D>(de: D) -> Result<Vec<TupleBuffer>, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let tmp = Vec::<ByteBuf>::deserialize(de)?;
+        // FIXME(gmoshkin): redundant copy happens here,
+        // because ByteBuf and TupleBuffer are essentially the same struct,
+        // but there's no easy foolproof way
+        // to convert a Vec<ByteBuf> to Vec<TupleBuffer>
+        // because of borrow and drop checkers
+        let res: Result<_, TntError> = tmp
+            .into_iter()
+            .map(|bb| TupleBuffer::try_from(bb.into_vec()))
+            .collect();
+        res.map_err(D::Error::custom)
+    }
+}
+
 //////////////////////////////////////////////////////////////////////////////////////////
 /// Serializable struct representing a member of the raft group.
 #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
diff --git a/src/traft/storage.rs b/src/traft/storage.rs
index 52da340d80..d6c7a86c8b 100644
--- a/src/traft/storage.rs
+++ b/src/traft/storage.rs
@@ -6,12 +6,13 @@ use ::raft::StorageError;
 use ::raft::INVALID_ID;
 use ::tarantool::index::IteratorType;
 use ::tarantool::space::Space;
-use ::tarantool::tuple::Tuple;
+use ::tarantool::tuple::{ToTupleBuffer, Tuple};
 use ::tarantool::unwrap_or;
 use serde::de::DeserializeOwned;
 use serde::Serialize;
 use thiserror::Error;
 
+use crate::define_str_enum;
 use crate::tlog;
 use crate::traft;
 use crate::traft::RaftId;
@@ -20,9 +21,33 @@ use crate::traft::RaftTerm;
 
 pub struct Storage;
 
-const RAFT_GROUP: &str = "raft_group";
-const RAFT_STATE: &str = "raft_state";
-const RAFT_LOG: &str = "raft_log";
+////////////////////////////////////////////////////////////////////////////////
+// RaftSpace
+////////////////////////////////////////////////////////////////////////////////
+
+define_str_enum! {
+    /// An enumeration of builtin raft spaces
+    pub enum RaftSpace {
+        Group = "raft_group",
+        State = "raft_state",
+        Log = "raft_log",
+    }
+
+    FromStr::Err = UnknownRaftSpace;
+}
+
+#[derive(Error, Debug)]
+#[error("unknown raft space {0}")]
+pub struct UnknownRaftSpace(pub String);
+
+// TODO(gmoshkin): remove this
+const RAFT_GROUP: &str = RaftSpace::Group.as_str();
+const RAFT_STATE: &str = RaftSpace::State.as_str();
+const RAFT_LOG: &str = RaftSpace::Log.as_str();
+
+////////////////////////////////////////////////////////////////////////////////
+// Error
+////////////////////////////////////////////////////////////////////////////////
 
 #[allow(clippy::enum_variant_names)]
 #[derive(Debug, Error)]
@@ -414,6 +439,38 @@ impl Storage {
         Storage::persist_commit(hs.commit)?;
         Ok(())
     }
+
+    pub fn insert(space: RaftSpace, tuple: &impl ToTupleBuffer) -> Result<Tuple, StorageError> {
+        Storage::space(space.as_str())?
+            .insert(tuple)
+            .map_err(box_err)
+    }
+
+    pub fn replace(space: RaftSpace, tuple: &impl ToTupleBuffer) -> Result<Tuple, StorageError> {
+        Storage::space(space.as_str())?
+            .replace(tuple)
+            .map_err(box_err)
+    }
+
+    pub fn update(
+        space: RaftSpace,
+        key: &impl ToTupleBuffer,
+        ops: &[impl ToTupleBuffer],
+    ) -> Result<Option<Tuple>, StorageError> {
+        Storage::space(space.as_str())?
+            .update(key, ops)
+            .map_err(box_err)
+    }
+
+    #[rustfmt::skip]
+    pub fn delete(
+        space: RaftSpace,
+        key: &impl ToTupleBuffer,
+    ) -> Result<Option<Tuple>, StorageError> {
+        Storage::space(space.as_str())?
+            .delete(key)
+            .map_err(box_err)
+    }
 }
 
 impl raft::Storage for Storage {
-- 
GitLab


From 8056973bb37710029166cd63f123fd925f923caa Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Thu, 8 Sep 2022 18:14:32 +0300
Subject: [PATCH 4/4] refactor: use DMLOp for replication_factor

---
 src/main.rs          | 13 ++++++++++---
 src/traft/mod.rs     | 12 ------------
 src/traft/storage.rs | 33 ++++++++++++++++++++++++++++-----
 3 files changed, 38 insertions(+), 20 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 9165c279d6..e2e497ee44 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -11,6 +11,7 @@ use ::tarantool::tlua;
 use ::tarantool::transaction::start_transaction;
 use std::convert::TryFrom;
 use std::time::{Duration, Instant};
+use traft::storage::{RaftSpace, RaftStateKey};
 use traft::ExpelRequest;
 
 use clap::StructOpt as _;
@@ -610,9 +611,15 @@ fn start_boot(args: &args::Run) {
         lc.inc();
         init_entries.push({
             let ctx = traft::EntryContextNormal {
-                op: traft::Op::PersistReplicationFactor {
-                    replication_factor: args.init_replication_factor,
-                },
+                op: traft::OpDML::insert(
+                    RaftSpace::State,
+                    &(
+                        RaftStateKey::ReplicationFactor,
+                        args.init_replication_factor,
+                    ),
+                )
+                .expect("cannot fail")
+                .into(),
                 lc,
             };
             let e = traft::Entry {
diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index f5b6f74b86..6e8d89a4c8 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -87,11 +87,6 @@ pub enum Op {
     PersistPeer {
         peer: Peer,
     },
-
-    #[serde(alias = "persist_replication_factor")]
-    PersistReplicationFactor {
-        replication_factor: u8,
-    },
     /// Cluster-wide data modification operation.
     /// Should be used to manipulate the cluster-wide configuration.
     Dml(OpDML),
@@ -107,9 +102,6 @@ impl std::fmt::Display for Op {
             Self::PersistPeer { peer } => {
                 write!(f, "PersistPeer{}", peer)
             }
-            Self::PersistReplicationFactor { replication_factor } => {
-                write!(f, "PersistReplicationFactor({replication_factor})")
-            }
             Self::Dml(OpDML::Insert { space, tuple }) => {
                 write!(f, "Insert({space}, {})", DisplayAsJson(tuple))
             }
@@ -170,10 +162,6 @@ impl Op {
                 Storage::persist_peer(peer).unwrap();
                 Box::new(peer.clone())
             }
-            Self::PersistReplicationFactor { replication_factor } => {
-                Storage::persist_replication_factor(*replication_factor).unwrap();
-                Box::new(())
-            }
             Self::Dml(op) => Box::new(op.result()),
         }
     }
diff --git a/src/traft/storage.rs b/src/traft/storage.rs
index d6c7a86c8b..fe30988225 100644
--- a/src/traft/storage.rs
+++ b/src/traft/storage.rs
@@ -45,6 +45,33 @@ const RAFT_GROUP: &str = RaftSpace::Group.as_str();
 const RAFT_STATE: &str = RaftSpace::State.as_str();
 const RAFT_LOG: &str = RaftSpace::Log.as_str();
 
+////////////////////////////////////////////////////////////////////////////////
+// RaftStateKey
+////////////////////////////////////////////////////////////////////////////////
+
+define_str_enum! {
+    /// An enumeration of builtin raft spaces
+    pub enum RaftStateKey {
+        ReplicationFactor = "replication_factor",
+        Commit = "commit",
+        Applied = "applied",
+        Term = "term",
+        Vote = "vote",
+        Gen = "gen",
+        Voters = "voters",
+        Learners = "learners",
+        VotersOutgoing = "voters_outgoing",
+        LearnersNext = "learners_next",
+        AutoLeave = "auto_leave",
+    }
+
+    FromStr::Err = UnknownRaftStateKey;
+}
+
+#[derive(Error, Debug)]
+#[error("unknown raft state key {0}")]
+pub struct UnknownRaftStateKey(pub String);
+
 ////////////////////////////////////////////////////////////////////////////////
 // Error
 ////////////////////////////////////////////////////////////////////////////////
@@ -271,11 +298,7 @@ impl Storage {
     }
 
     pub fn replication_factor() -> Result<Option<u8>, StorageError> {
-        Storage::raft_state("replication_factor")
-    }
-
-    pub fn persist_replication_factor(replication_factor: u8) -> Result<(), StorageError> {
-        Storage::persist_raft_state("replication_factor", replication_factor)
+        Storage::raft_state(RaftStateKey::ReplicationFactor.as_str())
     }
 
     pub fn persist_commit(commit: RaftIndex) -> Result<(), StorageError> {
-- 
GitLab