From 374584c5fc21ae3ff7cf1c0bc5a1fe59d5d45f86 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Thu, 8 Sep 2022 18:09:06 +0300 Subject: [PATCH 1/4] refactor: extract define_str_enum macro --- src/traft/event.rs | 63 +++++++++++++--------------------------------- src/util.rs | 62 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 46 deletions(-) diff --git a/src/traft/event.rs b/src/traft/event.rs index fed643694a..c56f68a379 100644 --- a/src/traft/event.rs +++ b/src/traft/event.rs @@ -9,6 +9,7 @@ use ::tarantool::fiber::{mutex::MutexGuard, Cond, Mutex}; use ::tarantool::proc; use ::tarantool::unwrap_or; +use crate::define_str_enum; use crate::tlog; use crate::traft::error::Error; use crate::unwrap_ok_or; @@ -17,54 +18,24 @@ use thiserror::Error; pub type BoxResult<T> = std::result::Result<T, Box<dyn std::error::Error>>; #[derive(Error, Debug)] -#[error("unknown event")] -pub struct EventFromStrError; - -macro_rules! define_events { - ($($event:tt, $str:literal;)+) => { - //////////////////////////////////////////////////////////////////////// - /// An enumeration of builtin events - #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash, PartialOrd, Ord)] - pub enum Event { - $( $event, )+ - } - - impl Event { - pub const fn as_str(&self) -> &str { - match self { - $( Self::$event => $str, )+ - } - } - } - - impl std::fmt::Display for Event { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.write_str(self.as_str()) - } - } - - impl FromStr for Event { - type Err = EventFromStrError; - - fn from_str(s: &str) -> Result<Self, Self::Err> { - match s { - $( $str => Ok(Self::$event), )+ - _ => Err(EventFromStrError), - } - } - } +#[error("unknown event {0}")] +pub struct EventFromStrError(pub String); + +define_str_enum! { + //////////////////////////////////////////////////////////////////////////// + /// An enumeration of builtin events + pub enum Event { + Demoted = "raft.demoted", + JointStateEnter = "raft.joint-state-enter", + JointStateLeave = "raft.joint-state-leave", + JointStateDrop = "raft.joint-state-drop", + StatusChanged = "raft.status-changed", + TopologyChanged = "raft.topology-changed", + RaftLoopNeeded = "raft.loop-needed", + RaftEntryApplied = "raft.entry-applied", } -} -define_events! { - Demoted, "raft.demoted"; - JointStateEnter, "raft.joint-state-enter"; - JointStateLeave, "raft.joint-state-leave"; - JointStateDrop, "raft.joint-state-drop"; - StatusChanged, "raft.status-changed"; - TopologyChanged, "raft.topology-changed"; - RaftLoopNeeded, "raft.loop-needed"; - RaftEntryApplied, "raft.entry-applied"; + FromStr::Err = EventFromStrError; } //////////////////////////////////////////////////////////////////////////////// diff --git a/src/util.rs b/src/util.rs index abb407246c..4bda629fcc 100644 --- a/src/util.rs +++ b/src/util.rs @@ -33,6 +33,68 @@ macro_rules! stringify_debug { }}; } +#[macro_export] +macro_rules! define_str_enum { + ( + $(#[$meta:meta])* + pub enum $enum:ident { $($space:tt = $str:literal,)+ } + FromStr::Err = $err:ident; + ) => { + $(#[$meta])* + #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash, PartialOrd, Ord)] + pub enum $enum { + $( #[doc = $str] $space, )+ + } + + impl $enum { + pub const fn as_str(&self) -> &str { + match self { + $( Self::$space => $str, )+ + } + } + } + + impl std::str::FromStr for $enum { + type Err = $err; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + $( $str => Ok(Self::$space), )+ + _ => Err($err(s.into())), + } + } + } + + impl std::fmt::Display for $enum { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.write_str(self.as_str()) + } + } + + impl serde::Serialize for $enum { + #[inline] + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + serializer.serialize_str(self.as_str()) + } + } + + impl<'de> serde::Deserialize<'de> for $enum { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + use serde::de::Error; + let tmp = <&str>::deserialize(deserializer)?; + let res = tmp.parse().map_err(|e| D::Error::custom(e))?; + Ok(res) + } + } + } +} + //////////////////////////////////////////////////////////////////////////////// /// A wrapper around `String` that garantees the string is uppercase by /// converting it to uppercase (if needed) on construction. -- GitLab From f7e472d42e8ef08a818bc2685c8c4e1fa9ca24eb Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Thu, 8 Sep 2022 18:15:59 +0300 Subject: [PATCH 2/4] chore: bump tarantool-module --- Cargo.lock | 3 +-- tarantool | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f5c1cad3c9..cee1d11993 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1127,7 +1127,7 @@ checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" [[package]] name = "tarantool" -version = "0.6.3" +version = "0.6.4" dependencies = [ "base64", "bitflags", @@ -1135,7 +1135,6 @@ dependencies = [ "dec", "derivative", "dlopen", - "lazy_static", "libc", "log", "nix 0.24.1", diff --git a/tarantool b/tarantool index 8a1006460d..33b156e7c5 160000 --- a/tarantool +++ b/tarantool @@ -1 +1 @@ -Subproject commit 8a1006460de486b908ddd8dfeb00c0ba149a860f +Subproject commit 33b156e7c5ac6b26aacd942a4ccf1674097fe7b4 -- GitLab From 3b3e8179980ccc433d486625f89e97fc73103796 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Thu, 8 Sep 2022 18:16:20 +0300 Subject: [PATCH 3/4] feat: cluster-wide DML request op --- src/traft/mod.rs | 183 ++++++++++++++++++++++++++++++++++++++++++- src/traft/storage.rs | 65 ++++++++++++++- 2 files changed, 241 insertions(+), 7 deletions(-) diff --git a/src/traft/mod.rs b/src/traft/mod.rs index 0f9af4c74f..f5b6f74b86 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -6,14 +6,15 @@ pub mod failover; mod network; pub mod node; pub mod notify; -mod storage; +pub mod storage; pub mod topology; use crate::stringify_debug; use crate::util::Uppercase; use ::raft::prelude as raft; +use ::tarantool::error::Error as TntError; use ::tarantool::tlua::LuaError; -use ::tarantool::tuple::Encode; +use ::tarantool::tuple::{Encode, ToTupleBuffer, Tuple, TupleBuffer}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::any::Any; @@ -26,6 +27,7 @@ use uuid::Uuid; use protobuf::Message as _; pub use network::ConnectionPool; +use storage::RaftSpace; pub use storage::Storage; pub use topology::Topology; @@ -90,11 +92,14 @@ pub enum Op { PersistReplicationFactor { replication_factor: u8, }, + /// Cluster-wide data modification operation. + /// Should be used to manipulate the cluster-wide configuration. + Dml(OpDML), } impl std::fmt::Display for Op { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { + return match self { Self::Nop => f.write_str("Nop"), Self::Info { msg } => write!(f, "Info({msg:?})"), Self::EvalLua(OpEvalLua { code }) => write!(f, "EvalLua({code:?})"), @@ -105,6 +110,48 @@ impl std::fmt::Display for Op { Self::PersistReplicationFactor { replication_factor } => { write!(f, "PersistReplicationFactor({replication_factor})") } + Self::Dml(OpDML::Insert { space, tuple }) => { + write!(f, "Insert({space}, {})", DisplayAsJson(tuple)) + } + Self::Dml(OpDML::Replace { space, tuple }) => { + write!(f, "Replace({space}, {})", DisplayAsJson(tuple)) + } + Self::Dml(OpDML::Update { space, key, ops }) => { + let key = DisplayAsJson(key); + let ops = DisplayAsJson(&**ops); + write!(f, "Update({space}, {key}, {ops})") + } + Self::Dml(OpDML::Delete { space, key }) => { + write!(f, "Delete({space}, {})", DisplayAsJson(key)) + } + }; + + struct DisplayAsJson<T>(pub T); + + impl std::fmt::Display for DisplayAsJson<&TupleBuffer> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + if let Some(data) = rmp_serde::from_slice::<serde_json::Value>(self.0.as_ref()) + .ok() + .and_then(|v| serde_json::to_string(&v).ok()) + { + return write!(f, "{data}"); + } + + write!(f, "{:?}", self.0) + } + } + + impl std::fmt::Display for DisplayAsJson<&[TupleBuffer]> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "[")?; + if let Some(elem) = self.0.first() { + write!(f, "{}", DisplayAsJson(elem))?; + } + for elem in self.0.iter().skip(1) { + write!(f, ", {}", DisplayAsJson(elem))?; + } + write!(f, "]") + } } } } @@ -127,6 +174,7 @@ impl Op { Storage::persist_replication_factor(*replication_factor).unwrap(); Box::new(()) } + Self::Dml(op) => Box::new(op.result()), } } } @@ -175,6 +223,135 @@ pub trait OpResult { fn result(&self) -> Self::Result; } +////////////////////////////////////////////////////////////////////////////////////////// +// OpDML + +/// Cluster-wide data modification operation. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub enum OpDML { + Insert { + space: RaftSpace, + #[serde(with = "serde_bytes")] + tuple: TupleBuffer, + }, + Replace { + space: RaftSpace, + #[serde(with = "serde_bytes")] + tuple: TupleBuffer, + }, + Update { + space: RaftSpace, + #[serde(with = "serde_bytes")] + key: TupleBuffer, + #[serde(with = "vec_of_raw_byte_buf")] + ops: Vec<TupleBuffer>, + }, + Delete { + space: RaftSpace, + #[serde(with = "serde_bytes")] + key: TupleBuffer, + }, +} + +impl OpResult for OpDML { + type Result = Result<Option<Tuple>, ::raft::StorageError>; + fn result(&self) -> Self::Result { + match self { + Self::Insert { space, tuple } => Storage::insert(*space, tuple).map(Some), + Self::Replace { space, tuple } => Storage::replace(*space, tuple).map(Some), + Self::Update { space, key, ops } => Storage::update(*space, key, ops), + Self::Delete { space, key } => Storage::delete(*space, key), + } + } +} + +impl From<OpDML> for Op { + fn from(op: OpDML) -> Op { + Op::Dml(op) + } +} + +impl OpDML { + /// Serializes `tuple` and returns an [`OpDML::Insert`] in case of success. + pub fn insert(space: RaftSpace, tuple: &impl ToTupleBuffer) -> Result<Self, TntError> { + let res = Self::Insert { + space, + tuple: tuple.to_tuple_buffer()?, + }; + Ok(res) + } + + /// Serializes `tuple` and returns an [`OpDML::Replace`] in case of success. + pub fn replace(space: RaftSpace, tuple: &impl ToTupleBuffer) -> Result<Self, TntError> { + let res = Self::Replace { + space, + tuple: tuple.to_tuple_buffer()?, + }; + Ok(res) + } + + /// Serializes `key` and returns an [`OpDML::Update`] in case of success. + pub fn update( + space: RaftSpace, + key: &impl ToTupleBuffer, + ops: Vec<TupleBuffer>, + ) -> Result<Self, TntError> { + let res = Self::Update { + space, + key: key.to_tuple_buffer()?, + ops, + }; + Ok(res) + } + + /// Serializes `key` and returns an [`OpDML::Delete`] in case of success. + pub fn delete(space: RaftSpace, key: &impl ToTupleBuffer) -> Result<Self, TntError> { + let res = Self::Delete { + space, + key: key.to_tuple_buffer()?, + }; + Ok(res) + } +} + +mod vec_of_raw_byte_buf { + use super::TupleBuffer; + use ::tarantool::error::Error as TntError; + use serde::de::Error as _; + use serde::ser::SerializeSeq; + use serde::{self, Deserialize, Deserializer, Serializer}; + use serde_bytes::{ByteBuf, Bytes}; + use std::convert::TryFrom; + + pub fn serialize<S>(v: &[TupleBuffer], ser: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let mut seq = ser.serialize_seq(Some(v.len()))?; + for buf in v { + seq.serialize_element(Bytes::new(buf.as_ref()))?; + } + seq.end() + } + + pub fn deserialize<'de, D>(de: D) -> Result<Vec<TupleBuffer>, D::Error> + where + D: Deserializer<'de>, + { + let tmp = Vec::<ByteBuf>::deserialize(de)?; + // FIXME(gmoshkin): redundant copy happens here, + // because ByteBuf and TupleBuffer are essentially the same struct, + // but there's no easy foolproof way + // to convert a Vec<ByteBuf> to Vec<TupleBuffer> + // because of borrow and drop checkers + let res: Result<_, TntError> = tmp + .into_iter() + .map(|bb| TupleBuffer::try_from(bb.into_vec())) + .collect(); + res.map_err(D::Error::custom) + } +} + ////////////////////////////////////////////////////////////////////////////////////////// /// Serializable struct representing a member of the raft group. #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)] diff --git a/src/traft/storage.rs b/src/traft/storage.rs index 52da340d80..d6c7a86c8b 100644 --- a/src/traft/storage.rs +++ b/src/traft/storage.rs @@ -6,12 +6,13 @@ use ::raft::StorageError; use ::raft::INVALID_ID; use ::tarantool::index::IteratorType; use ::tarantool::space::Space; -use ::tarantool::tuple::Tuple; +use ::tarantool::tuple::{ToTupleBuffer, Tuple}; use ::tarantool::unwrap_or; use serde::de::DeserializeOwned; use serde::Serialize; use thiserror::Error; +use crate::define_str_enum; use crate::tlog; use crate::traft; use crate::traft::RaftId; @@ -20,9 +21,33 @@ use crate::traft::RaftTerm; pub struct Storage; -const RAFT_GROUP: &str = "raft_group"; -const RAFT_STATE: &str = "raft_state"; -const RAFT_LOG: &str = "raft_log"; +//////////////////////////////////////////////////////////////////////////////// +// RaftSpace +//////////////////////////////////////////////////////////////////////////////// + +define_str_enum! { + /// An enumeration of builtin raft spaces + pub enum RaftSpace { + Group = "raft_group", + State = "raft_state", + Log = "raft_log", + } + + FromStr::Err = UnknownRaftSpace; +} + +#[derive(Error, Debug)] +#[error("unknown raft space {0}")] +pub struct UnknownRaftSpace(pub String); + +// TODO(gmoshkin): remove this +const RAFT_GROUP: &str = RaftSpace::Group.as_str(); +const RAFT_STATE: &str = RaftSpace::State.as_str(); +const RAFT_LOG: &str = RaftSpace::Log.as_str(); + +//////////////////////////////////////////////////////////////////////////////// +// Error +//////////////////////////////////////////////////////////////////////////////// #[allow(clippy::enum_variant_names)] #[derive(Debug, Error)] @@ -414,6 +439,38 @@ impl Storage { Storage::persist_commit(hs.commit)?; Ok(()) } + + pub fn insert(space: RaftSpace, tuple: &impl ToTupleBuffer) -> Result<Tuple, StorageError> { + Storage::space(space.as_str())? + .insert(tuple) + .map_err(box_err) + } + + pub fn replace(space: RaftSpace, tuple: &impl ToTupleBuffer) -> Result<Tuple, StorageError> { + Storage::space(space.as_str())? + .replace(tuple) + .map_err(box_err) + } + + pub fn update( + space: RaftSpace, + key: &impl ToTupleBuffer, + ops: &[impl ToTupleBuffer], + ) -> Result<Option<Tuple>, StorageError> { + Storage::space(space.as_str())? + .update(key, ops) + .map_err(box_err) + } + + #[rustfmt::skip] + pub fn delete( + space: RaftSpace, + key: &impl ToTupleBuffer, + ) -> Result<Option<Tuple>, StorageError> { + Storage::space(space.as_str())? + .delete(key) + .map_err(box_err) + } } impl raft::Storage for Storage { -- GitLab From 8056973bb37710029166cd63f123fd925f923caa Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Thu, 8 Sep 2022 18:14:32 +0300 Subject: [PATCH 4/4] refactor: use DMLOp for replication_factor --- src/main.rs | 13 ++++++++++--- src/traft/mod.rs | 12 ------------ src/traft/storage.rs | 33 ++++++++++++++++++++++++++++----- 3 files changed, 38 insertions(+), 20 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9165c279d6..e2e497ee44 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,6 +11,7 @@ use ::tarantool::tlua; use ::tarantool::transaction::start_transaction; use std::convert::TryFrom; use std::time::{Duration, Instant}; +use traft::storage::{RaftSpace, RaftStateKey}; use traft::ExpelRequest; use clap::StructOpt as _; @@ -610,9 +611,15 @@ fn start_boot(args: &args::Run) { lc.inc(); init_entries.push({ let ctx = traft::EntryContextNormal { - op: traft::Op::PersistReplicationFactor { - replication_factor: args.init_replication_factor, - }, + op: traft::OpDML::insert( + RaftSpace::State, + &( + RaftStateKey::ReplicationFactor, + args.init_replication_factor, + ), + ) + .expect("cannot fail") + .into(), lc, }; let e = traft::Entry { diff --git a/src/traft/mod.rs b/src/traft/mod.rs index f5b6f74b86..6e8d89a4c8 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -87,11 +87,6 @@ pub enum Op { PersistPeer { peer: Peer, }, - - #[serde(alias = "persist_replication_factor")] - PersistReplicationFactor { - replication_factor: u8, - }, /// Cluster-wide data modification operation. /// Should be used to manipulate the cluster-wide configuration. Dml(OpDML), @@ -107,9 +102,6 @@ impl std::fmt::Display for Op { Self::PersistPeer { peer } => { write!(f, "PersistPeer{}", peer) } - Self::PersistReplicationFactor { replication_factor } => { - write!(f, "PersistReplicationFactor({replication_factor})") - } Self::Dml(OpDML::Insert { space, tuple }) => { write!(f, "Insert({space}, {})", DisplayAsJson(tuple)) } @@ -170,10 +162,6 @@ impl Op { Storage::persist_peer(peer).unwrap(); Box::new(peer.clone()) } - Self::PersistReplicationFactor { replication_factor } => { - Storage::persist_replication_factor(*replication_factor).unwrap(); - Box::new(()) - } Self::Dml(op) => Box::new(op.result()), } } diff --git a/src/traft/storage.rs b/src/traft/storage.rs index d6c7a86c8b..fe30988225 100644 --- a/src/traft/storage.rs +++ b/src/traft/storage.rs @@ -45,6 +45,33 @@ const RAFT_GROUP: &str = RaftSpace::Group.as_str(); const RAFT_STATE: &str = RaftSpace::State.as_str(); const RAFT_LOG: &str = RaftSpace::Log.as_str(); +//////////////////////////////////////////////////////////////////////////////// +// RaftStateKey +//////////////////////////////////////////////////////////////////////////////// + +define_str_enum! { + /// An enumeration of builtin raft spaces + pub enum RaftStateKey { + ReplicationFactor = "replication_factor", + Commit = "commit", + Applied = "applied", + Term = "term", + Vote = "vote", + Gen = "gen", + Voters = "voters", + Learners = "learners", + VotersOutgoing = "voters_outgoing", + LearnersNext = "learners_next", + AutoLeave = "auto_leave", + } + + FromStr::Err = UnknownRaftStateKey; +} + +#[derive(Error, Debug)] +#[error("unknown raft state key {0}")] +pub struct UnknownRaftStateKey(pub String); + //////////////////////////////////////////////////////////////////////////////// // Error //////////////////////////////////////////////////////////////////////////////// @@ -271,11 +298,7 @@ impl Storage { } pub fn replication_factor() -> Result<Option<u8>, StorageError> { - Storage::raft_state("replication_factor") - } - - pub fn persist_replication_factor(replication_factor: u8) -> Result<(), StorageError> { - Storage::persist_raft_state("replication_factor", replication_factor) + Storage::raft_state(RaftStateKey::ReplicationFactor.as_str()) } pub fn persist_commit(commit: RaftIndex) -> Result<(), StorageError> { -- GitLab