Skip to content
Snippets Groups Projects
Commit f0a44c89 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon: Committed by Georgy Moshkin
Browse files

refactor(storage): introduce cluster_state space

replication_factor now goes into cluster_state
parent 31e11162
No related branches found
No related tags found
1 merge request!241refactor(storage): introduce cluster_state space
Pipeline #12099 passed
......@@ -11,7 +11,7 @@ use ::tarantool::tlua;
use ::tarantool::transaction::start_transaction;
use std::convert::TryFrom;
use std::time::{Duration, Instant};
use traft::storage::{RaftSpace, RaftStateKey};
use traft::storage::{ClusterSpace, StateKey};
use traft::ExpelRequest;
use traft::RaftSpaceAccess;
......@@ -619,11 +619,8 @@ fn start_boot(args: &args::Run) {
init_entries.push({
let ctx = traft::EntryContextNormal {
op: traft::OpDML::insert(
RaftSpace::State,
&(
RaftStateKey::ReplicationFactor,
args.init_replication_factor,
),
ClusterSpace::State,
&(StateKey::ReplicationFactor, args.init_replication_factor),
)
.expect("cannot fail")
.into(),
......
......@@ -29,7 +29,7 @@ use protobuf::Message as _;
pub use network::ConnectionPool;
pub use raft_storage::RaftSpaceAccess;
use storage::RaftSpace;
use storage::ClusterSpace;
pub use storage::Storage;
pub use topology::Topology;
......@@ -220,24 +220,24 @@ pub trait OpResult {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum OpDML {
Insert {
space: RaftSpace,
space: ClusterSpace,
#[serde(with = "serde_bytes")]
tuple: TupleBuffer,
},
Replace {
space: RaftSpace,
space: ClusterSpace,
#[serde(with = "serde_bytes")]
tuple: TupleBuffer,
},
Update {
space: RaftSpace,
space: ClusterSpace,
#[serde(with = "serde_bytes")]
key: TupleBuffer,
#[serde(with = "vec_of_raw_byte_buf")]
ops: Vec<TupleBuffer>,
},
Delete {
space: RaftSpace,
space: ClusterSpace,
#[serde(with = "serde_bytes")]
key: TupleBuffer,
},
......@@ -263,7 +263,7 @@ impl From<OpDML> for Op {
impl OpDML {
/// Serializes `tuple` and returns an [`OpDML::Insert`] in case of success.
pub fn insert(space: RaftSpace, tuple: &impl ToTupleBuffer) -> Result<Self, TntError> {
pub fn insert(space: ClusterSpace, tuple: &impl ToTupleBuffer) -> Result<Self, TntError> {
let res = Self::Insert {
space,
tuple: tuple.to_tuple_buffer()?,
......@@ -272,7 +272,7 @@ impl OpDML {
}
/// Serializes `tuple` and returns an [`OpDML::Replace`] in case of success.
pub fn replace(space: RaftSpace, tuple: &impl ToTupleBuffer) -> Result<Self, TntError> {
pub fn replace(space: ClusterSpace, tuple: &impl ToTupleBuffer) -> Result<Self, TntError> {
let res = Self::Replace {
space,
tuple: tuple.to_tuple_buffer()?,
......@@ -282,7 +282,7 @@ impl OpDML {
/// Serializes `key` and returns an [`OpDML::Update`] in case of success.
pub fn update(
space: RaftSpace,
space: ClusterSpace,
key: &impl ToTupleBuffer,
ops: Vec<TupleBuffer>,
) -> Result<Self, TntError> {
......@@ -295,7 +295,7 @@ impl OpDML {
}
/// Serializes `key` and returns an [`OpDML::Delete`] in case of success.
pub fn delete(space: RaftSpace, key: &impl ToTupleBuffer) -> Result<Self, TntError> {
pub fn delete(space: ClusterSpace, key: &impl ToTupleBuffer) -> Result<Self, TntError> {
let res = Self::Delete {
space,
key: key.to_tuple_buffer()?,
......
......@@ -2,8 +2,7 @@ use ::raft::StorageError;
use ::raft::INVALID_ID;
use ::tarantool::index::IteratorType;
use ::tarantool::space::Space;
use ::tarantool::tuple::{ToTupleBuffer, Tuple};
use serde::de::DeserializeOwned;
use ::tarantool::tuple::{DecodeOwned, ToTupleBuffer, Tuple};
use thiserror::Error;
use crate::define_str_enum;
......@@ -14,54 +13,42 @@ use crate::traft::RaftIndex;
pub struct Storage;
////////////////////////////////////////////////////////////////////////////////
// RaftSpace
// ClusterSpace
////////////////////////////////////////////////////////////////////////////////
define_str_enum! {
/// An enumeration of builtin raft spaces
pub enum RaftSpace {
/// An enumeration of builtin cluster-wide spaces
pub enum ClusterSpace {
Group = "raft_group",
State = "raft_state",
Log = "raft_log",
State = "cluster_state",
}
FromStr::Err = UnknownRaftSpace;
FromStr::Err = UnknownClusterSpace;
}
#[derive(Error, Debug)]
#[error("unknown raft space {0}")]
pub struct UnknownRaftSpace(pub String);
#[error("unknown cluster space {0}")]
pub struct UnknownClusterSpace(pub String);
// TODO(gmoshkin): remove this
const RAFT_GROUP: &str = RaftSpace::Group.as_str();
const RAFT_STATE: &str = RaftSpace::State.as_str();
const RAFT_GROUP: &str = ClusterSpace::Group.as_str();
////////////////////////////////////////////////////////////////////////////////
// RaftStateKey
// StateKey
////////////////////////////////////////////////////////////////////////////////
define_str_enum! {
/// An enumeration of builtin raft spaces
pub enum RaftStateKey {
pub enum StateKey {
ReplicationFactor = "replication_factor",
Commit = "commit",
Applied = "applied",
Term = "term",
Vote = "vote",
Gen = "gen",
Voters = "voters",
Learners = "learners",
VotersOutgoing = "voters_outgoing",
LearnersNext = "learners_next",
AutoLeave = "auto_leave",
}
FromStr::Err = UnknownRaftStateKey;
FromStr::Err = UnknownStateKey;
}
#[derive(Error, Debug)]
#[error("unknown raft state key {0}")]
pub struct UnknownRaftStateKey(pub String);
#[error("unknown state key {0}")]
pub struct UnknownStateKey(pub String);
////////////////////////////////////////////////////////////////////////////////
// Error
......@@ -82,9 +69,26 @@ fn box_err(e: impl std::error::Error + Sync + Send + 'static) -> StorageError {
impl Storage {
pub fn init_schema() {
crate::tarantool::eval(
r#"
box.schema.space.create('raft_group', {
::tarantool::lua_state()
.exec_with(
r#"
local STATE, GROUP = ...
box.schema.space.create(STATE, {
if_not_exists = true,
is_local = true,
format = {
{name = 'key', type = 'string', is_nullable = false},
{name = 'value', type = 'any', is_nullable = false},
}
})
box.space[STATE]:create_index('pk', {
if_not_exists = true,
parts = {{'key'}},
unique = true,
})
box.schema.space.create(GROUP, {
if_not_exists = true,
is_local = true,
format = {
......@@ -100,34 +104,40 @@ impl Storage {
{name = 'failure_domain', type = 'map', is_nullable = false},
}
})
box.space.raft_group:create_index('instance_id', {
box.space[GROUP]:create_index('instance_id', {
if_not_exists = true,
parts = {{'instance_id'}},
unique = true,
})
box.space.raft_group:create_index('raft_id', {
box.space[GROUP]:create_index('raft_id', {
if_not_exists = true,
parts = {{'raft_id'}},
unique = true,
})
box.space.raft_group:create_index('replicaset_id', {
box.space[GROUP]:create_index('replicaset_id', {
if_not_exists = true,
parts = {{'replicaset_id'}, {'commit_index'}},
unique = false,
})
"#,
)
.unwrap();
(ClusterSpace::State, RAFT_GROUP),
)
.unwrap();
}
fn space(name: &str) -> Result<Space, StorageError> {
Space::find(name)
fn space(name: impl AsRef<str> + Into<String>) -> Result<Space, StorageError> {
Space::find(name.as_ref())
.ok_or_else(|| Error::NoSuchSpace(name.into()))
.map_err(box_err)
}
fn raft_state<T: DeserializeOwned>(key: &str) -> Result<Option<T>, StorageError> {
let tuple: Option<Tuple> = Storage::space(RAFT_STATE)?.get(&(key,)).map_err(box_err)?;
fn cluster_state<T>(key: StateKey) -> Result<Option<T>, StorageError>
where
T: DecodeOwned,
{
let tuple: Option<Tuple> = Storage::space(ClusterSpace::State)?
.get(&(key,))
.map_err(box_err)?;
match tuple {
Some(t) => t.field(1).map_err(box_err),
......@@ -216,8 +226,9 @@ impl Storage {
Ok(ret)
}
#[inline]
pub fn replication_factor() -> Result<Option<u8>, StorageError> {
Storage::raft_state(RaftStateKey::ReplicationFactor.as_str())
Storage::cluster_state(StateKey::ReplicationFactor)
}
pub fn persist_peer(peer: &traft::Peer) -> Result<(), StorageError> {
......@@ -235,20 +246,20 @@ impl Storage {
Ok(())
}
pub fn insert(space: RaftSpace, tuple: &impl ToTupleBuffer) -> Result<Tuple, StorageError> {
pub fn insert(space: ClusterSpace, tuple: &impl ToTupleBuffer) -> Result<Tuple, StorageError> {
Storage::space(space.as_str())?
.insert(tuple)
.map_err(box_err)
}
pub fn replace(space: RaftSpace, tuple: &impl ToTupleBuffer) -> Result<Tuple, StorageError> {
pub fn replace(space: ClusterSpace, tuple: &impl ToTupleBuffer) -> Result<Tuple, StorageError> {
Storage::space(space.as_str())?
.replace(tuple)
.map_err(box_err)
}
pub fn update(
space: RaftSpace,
space: ClusterSpace,
key: &impl ToTupleBuffer,
ops: &[impl ToTupleBuffer],
) -> Result<Option<Tuple>, StorageError> {
......@@ -259,7 +270,7 @@ impl Storage {
#[rustfmt::skip]
pub fn delete(
space: RaftSpace,
space: ClusterSpace,
key: &impl ToTupleBuffer,
) -> Result<Option<Tuple>, StorageError> {
Storage::space(space.as_str())?
......
......@@ -54,6 +54,18 @@ macro_rules! define_str_enum {
}
}
impl AsRef<str> for $enum {
fn as_ref(&self) -> &str {
self.as_str()
}
}
impl From<$enum> for String {
fn from(e: $enum) -> Self {
e.as_str().into()
}
}
impl std::str::FromStr for $enum {
type Err = $err;
......@@ -88,10 +100,34 @@ macro_rules! define_str_enum {
{
use serde::de::Error;
let tmp = <&str>::deserialize(deserializer)?;
let res = tmp.parse().map_err(|e| D::Error::custom(e))?;
let res = tmp.parse().map_err(|_| Error::unknown_variant(tmp, &[$($str),+]))?;
Ok(res)
}
}
impl<L: ::tarantool::tlua::AsLua> ::tarantool::tlua::Push<L> for $enum {
type Err = ::tarantool::tlua::Void;
fn push_to_lua(&self, lua: L) -> ::tarantool::tlua::PushResult<L, Self> {
::tarantool::tlua::PushInto::push_into_lua(self.as_str(), lua)
}
}
impl<L: ::tarantool::tlua::AsLua> ::tarantool::tlua::PushOne<L> for $enum {}
impl<L: ::tarantool::tlua::AsLua> ::tarantool::tlua::PushInto<L> for $enum {
type Err = ::tarantool::tlua::Void;
fn push_into_lua(self, lua: L) -> ::tarantool::tlua::PushIntoResult<L, Self> {
::tarantool::tlua::PushInto::push_into_lua(self.as_str(), lua)
}
}
impl<L: ::tarantool::tlua::AsLua> ::tarantool::tlua::PushOneInto<L> for $enum {}
impl<L: ::tarantool::tlua::AsLua> ::tarantool::tlua::LuaRead<L> for $enum {
fn lua_read_at_position(lua: L, index: std::num::NonZeroI32) -> Result<Self, L> {
::tarantool::tlua::StringInLua::lua_read_at_position(&lua, index).ok()
.and_then(|s| s.parse().ok())
.ok_or(lua)
}
}
}
}
......
......@@ -182,7 +182,7 @@ def test_init_replication_factor(cluster: Cluster):
# Scenario: first instance shares --init-replication-factor to the whole cluster
# Given an Leader instance with --init_replication_factor=2
# When a new instances with different --init-replication-factor joins to the cluster
# Then all of them have raft_state[replication_factor] equals to the Leader
# Then all of them have cluster_state[replication_factor] equals to the Leader
# And there are two replicasets in the cluster
i1 = cluster.add_instance(init_replication_factor=2)
......@@ -190,7 +190,9 @@ def test_init_replication_factor(cluster: Cluster):
i3 = cluster.add_instance(init_replication_factor=4)
def read_replication_factor(instance):
return instance.eval('return box.space.raft_state:get("replication_factor")')[1]
return instance.eval(
'return box.space.cluster_state:get("replication_factor")'
)[1]
assert read_replication_factor(i1) == 2
assert read_replication_factor(i2) == 2
......
......@@ -27,7 +27,7 @@ def test_log_rollback(cluster3: Cluster):
i3.assert_raft_status("Follower")
def propose_state_change(srv: Instance, value):
code = 'box.space.raft_state:put({"test-timeline", "%s"})' % value
code = 'box.space.cluster_state:put({"test-timeline", "%s"})' % value
return srv.raft_propose_eval(code, 0.1)
propose_state_change(i1, "i1 is a leader")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment