From 022509140b211ccaf349971e9b8352bab04a906b Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Tue, 29 Nov 2022 14:49:35 +0300 Subject: [PATCH] refactor(storage): storage::ClusterSpace -> storage::ClusterwideSpace --- src/main.rs | 27 ++++++++++++++++++--------- src/traft/mod.rs | 22 +++++++++++----------- src/traft/node.rs | 19 ++++++++++++------- 3 files changed, 41 insertions(+), 27 deletions(-) diff --git a/src/main.rs b/src/main.rs index 894e0fe77e..221ad6bfbc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use ::tarantool::transaction::start_transaction; use std::convert::TryFrom; use std::time::{Duration, Instant}; use storage::Clusterwide; -use storage::{ClusterSpace, StateKey}; +use storage::{ClusterwideSpace, StateKey}; use traft::rpc; use traft::RaftSpaceAccess; @@ -396,7 +396,7 @@ fn picolib_setup(args: &args::Run) { "add_migration", tlua::function2(|id: u64, body: String| -> traft::Result<()> { let migration = Migration { id, body }; - let op = OpDML::insert(ClusterSpace::Migrations, &migration)?; + let op = OpDML::insert(ClusterwideSpace::Migrations, &migration)?; node::global()?.propose_and_wait(op, Duration::MAX)??; Ok(()) }), @@ -405,7 +405,10 @@ fn picolib_setup(args: &args::Run) { luamod.set( "push_schema_version", tlua::function1(|id: u64| -> traft::Result<()> { - let op = OpDML::replace(ClusterSpace::State, &(StateKey::DesiredSchemaVersion, id))?; + let op = OpDML::replace( + ClusterwideSpace::State, + &(StateKey::DesiredSchemaVersion, id), + )?; node::global()?.propose_and_wait(op, Duration::MAX)??; Ok(()) }), @@ -419,7 +422,10 @@ fn picolib_setup(args: &args::Run) { Some(m) => m.id, None => return Ok(()), }; - let op = OpDML::replace(ClusterSpace::State, &(StateKey::DesiredSchemaVersion, id))?; + let op = OpDML::replace( + ClusterwideSpace::State, + &(StateKey::DesiredSchemaVersion, id), + )?; node.propose_and_wait(op, Duration::MAX)??; event::wait(Event::MigrateDone) }), @@ -816,7 +822,7 @@ fn start_boot(args: &args::Run) { init_entries_push_op( traft::OpDML::insert( - ClusterSpace::Addresses, + ClusterwideSpace::Addresses, &traft::PeerAddress { raft_id, address }, ) .expect("cannot fail") @@ -825,16 +831,19 @@ fn start_boot(args: &args::Run) { init_entries_push_op(traft::Op::persist_peer(peer)); init_entries_push_op( OpDML::insert( - ClusterSpace::State, + ClusterwideSpace::State, &(StateKey::ReplicationFactor, args.init_replication_factor), ) .expect("cannot fail") .into(), ); init_entries_push_op( - OpDML::insert(ClusterSpace::State, &(StateKey::DesiredSchemaVersion, 0)) - .expect("cannot fail") - .into(), + OpDML::insert( + ClusterwideSpace::State, + &(StateKey::DesiredSchemaVersion, 0), + ) + .expect("cannot fail") + .into(), ); init_entries.push({ diff --git a/src/traft/mod.rs b/src/traft/mod.rs index ef9627ab39..8a355feaa6 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -10,7 +10,7 @@ pub mod rpc; pub mod topology; use crate::storage; -use crate::storage::ClusterSpace; +use crate::storage::ClusterwideSpace; use crate::stringify_debug; use crate::util::{AnyWithTypeName, Uppercase}; use ::raft::prelude as raft; @@ -190,7 +190,7 @@ impl Op { } Self::Dml(op) => { let res = Box::new(op.result()); - if op.space() == &ClusterSpace::State { + if op.space() == &ClusterwideSpace::State { event::broadcast(Event::ClusterStateChanged); } res @@ -256,24 +256,24 @@ pub trait OpResult { #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum OpDML { Insert { - space: ClusterSpace, + space: ClusterwideSpace, #[serde(with = "serde_bytes")] tuple: TupleBuffer, }, Replace { - space: ClusterSpace, + space: ClusterwideSpace, #[serde(with = "serde_bytes")] tuple: TupleBuffer, }, Update { - space: ClusterSpace, + space: ClusterwideSpace, #[serde(with = "serde_bytes")] key: TupleBuffer, #[serde(with = "vec_of_raw_byte_buf")] ops: Vec<TupleBuffer>, }, Delete { - space: ClusterSpace, + space: ClusterwideSpace, #[serde(with = "serde_bytes")] key: TupleBuffer, }, @@ -299,7 +299,7 @@ impl From<OpDML> for Op { impl OpDML { /// Serializes `tuple` and returns an [`OpDML::Insert`] in case of success. - pub fn insert(space: ClusterSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> { + pub fn insert(space: ClusterwideSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> { let res = Self::Insert { space, tuple: tuple.to_tuple_buffer()?, @@ -308,7 +308,7 @@ impl OpDML { } /// Serializes `tuple` and returns an [`OpDML::Replace`] in case of success. - pub fn replace(space: ClusterSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> { + pub fn replace(space: ClusterwideSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> { let res = Self::Replace { space, tuple: tuple.to_tuple_buffer()?, @@ -318,7 +318,7 @@ impl OpDML { /// Serializes `key` and returns an [`OpDML::Update`] in case of success. pub fn update( - space: ClusterSpace, + space: ClusterwideSpace, key: &impl ToTupleBuffer, ops: impl Into<Vec<TupleBuffer>>, ) -> tarantool::Result<Self> { @@ -331,7 +331,7 @@ impl OpDML { } /// Serializes `key` and returns an [`OpDML::Delete`] in case of success. - pub fn delete(space: ClusterSpace, key: &impl ToTupleBuffer) -> tarantool::Result<Self> { + pub fn delete(space: ClusterwideSpace, key: &impl ToTupleBuffer) -> tarantool::Result<Self> { let res = Self::Delete { space, key: key.to_tuple_buffer()?, @@ -340,7 +340,7 @@ impl OpDML { } #[rustfmt::skip] - pub fn space(&self) -> &ClusterSpace { + pub fn space(&self) -> &ClusterwideSpace { match &self { Self::Insert { space, .. } => space, Self::Replace { space, .. } => space, diff --git a/src/traft/node.rs b/src/traft/node.rs index 2c181b44fd..a1eb3b8633 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -31,7 +31,7 @@ use crate::governor::raft_conf_change; use crate::governor::waiting_migrations; use crate::kvcell::KVCell; use crate::r#loop::{FlowControl, Loop}; -use crate::storage::{ClusterSpace, Clusterwide, StateKey}; +use crate::storage::{Clusterwide, ClusterwideSpace, StateKey}; use crate::stringify_cfunc; use crate::traft::rpc; use crate::traft::ContextCoercion as _; @@ -489,7 +489,7 @@ impl NodeImpl { address, }; let op = - OpDML::replace(ClusterSpace::Addresses, &peer_address).expect("can't fail"); + OpDML::replace(ClusterwideSpace::Addresses, &peer_address).expect("can't fail"); let (lc, notify) = self.schedule_notification(); notify_for_address = Some(notify); let ctx = traft::EntryContextNormal::new(lc, op); @@ -1048,7 +1048,8 @@ fn raft_conf_change_loop( let mut ops = UpdateOps::new(); ops.assign("master_id", &peer.instance_id)?; - let op = OpDML::update(ClusterSpace::Replicasets, &[replicaset_id], ops)?; + let op = + OpDML::update(ClusterwideSpace::Replicasets, &[replicaset_id], ops)?; tlog!(Info, "proposing replicaset master change"; "op" => ?op); // TODO: don't hard code the timeout node.propose_and_wait(op, Duration::from_secs(3))??; @@ -1282,7 +1283,7 @@ fn raft_conf_change_loop( } else { let vshard_bootstrapped = storage.state.vshard_bootstrapped()?; let req = traft::OpDML::insert( - ClusterSpace::Replicasets, + ClusterwideSpace::Replicasets, &traft::Replicaset { replicaset_id: peer.replicaset_id.clone(), replicaset_uuid: peer.replicaset_uuid.clone(), @@ -1367,7 +1368,7 @@ fn raft_conf_change_loop( // gets reconfigured node.propose_and_wait( traft::OpDML::replace( - ClusterSpace::State, + ClusterwideSpace::State, &(StateKey::VshardBootstrapped, true), )?, // TODO: don't hard code the timeout @@ -1432,7 +1433,11 @@ fn raft_conf_change_loop( let mut ops = UpdateOps::new(); ops.assign("weight", weight)?; node.propose_and_wait( - traft::OpDML::update(ClusterSpace::Replicasets, &[replicaset_id], ops)?, + traft::OpDML::update( + ClusterwideSpace::Replicasets, + &[replicaset_id], + ops, + )?, // TODO: don't hard code the timeout Duration::from_secs(3), )??; @@ -1525,7 +1530,7 @@ fn raft_conf_change_loop( let mut ops = UpdateOps::new(); ops.assign("current_schema_version", migration.id).unwrap(); let op = OpDML::update( - ClusterSpace::Replicasets, + ClusterwideSpace::Replicasets, &[replicaset.replicaset_id.clone()], ops, ) -- GitLab