Skip to content
Snippets Groups Projects
Commit 02250914 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon: Committed by Yaroslav Dynnikov
Browse files

refactor(storage): storage::ClusterSpace -> storage::ClusterwideSpace

parent 9e2edd9c
No related branches found
No related tags found
1 merge request!393179/clusterwide storage
Pipeline #13921 passed
...@@ -12,7 +12,7 @@ use ::tarantool::transaction::start_transaction; ...@@ -12,7 +12,7 @@ use ::tarantool::transaction::start_transaction;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use storage::Clusterwide; use storage::Clusterwide;
use storage::{ClusterSpace, StateKey}; use storage::{ClusterwideSpace, StateKey};
use traft::rpc; use traft::rpc;
use traft::RaftSpaceAccess; use traft::RaftSpaceAccess;
...@@ -396,7 +396,7 @@ fn picolib_setup(args: &args::Run) { ...@@ -396,7 +396,7 @@ fn picolib_setup(args: &args::Run) {
"add_migration", "add_migration",
tlua::function2(|id: u64, body: String| -> traft::Result<()> { tlua::function2(|id: u64, body: String| -> traft::Result<()> {
let migration = Migration { id, body }; let migration = Migration { id, body };
let op = OpDML::insert(ClusterSpace::Migrations, &migration)?; let op = OpDML::insert(ClusterwideSpace::Migrations, &migration)?;
node::global()?.propose_and_wait(op, Duration::MAX)??; node::global()?.propose_and_wait(op, Duration::MAX)??;
Ok(()) Ok(())
}), }),
...@@ -405,7 +405,10 @@ fn picolib_setup(args: &args::Run) { ...@@ -405,7 +405,10 @@ fn picolib_setup(args: &args::Run) {
luamod.set( luamod.set(
"push_schema_version", "push_schema_version",
tlua::function1(|id: u64| -> traft::Result<()> { tlua::function1(|id: u64| -> traft::Result<()> {
let op = OpDML::replace(ClusterSpace::State, &(StateKey::DesiredSchemaVersion, id))?; let op = OpDML::replace(
ClusterwideSpace::State,
&(StateKey::DesiredSchemaVersion, id),
)?;
node::global()?.propose_and_wait(op, Duration::MAX)??; node::global()?.propose_and_wait(op, Duration::MAX)??;
Ok(()) Ok(())
}), }),
...@@ -419,7 +422,10 @@ fn picolib_setup(args: &args::Run) { ...@@ -419,7 +422,10 @@ fn picolib_setup(args: &args::Run) {
Some(m) => m.id, Some(m) => m.id,
None => return Ok(()), None => return Ok(()),
}; };
let op = OpDML::replace(ClusterSpace::State, &(StateKey::DesiredSchemaVersion, id))?; let op = OpDML::replace(
ClusterwideSpace::State,
&(StateKey::DesiredSchemaVersion, id),
)?;
node.propose_and_wait(op, Duration::MAX)??; node.propose_and_wait(op, Duration::MAX)??;
event::wait(Event::MigrateDone) event::wait(Event::MigrateDone)
}), }),
...@@ -816,7 +822,7 @@ fn start_boot(args: &args::Run) { ...@@ -816,7 +822,7 @@ fn start_boot(args: &args::Run) {
init_entries_push_op( init_entries_push_op(
traft::OpDML::insert( traft::OpDML::insert(
ClusterSpace::Addresses, ClusterwideSpace::Addresses,
&traft::PeerAddress { raft_id, address }, &traft::PeerAddress { raft_id, address },
) )
.expect("cannot fail") .expect("cannot fail")
...@@ -825,16 +831,19 @@ fn start_boot(args: &args::Run) { ...@@ -825,16 +831,19 @@ fn start_boot(args: &args::Run) {
init_entries_push_op(traft::Op::persist_peer(peer)); init_entries_push_op(traft::Op::persist_peer(peer));
init_entries_push_op( init_entries_push_op(
OpDML::insert( OpDML::insert(
ClusterSpace::State, ClusterwideSpace::State,
&(StateKey::ReplicationFactor, args.init_replication_factor), &(StateKey::ReplicationFactor, args.init_replication_factor),
) )
.expect("cannot fail") .expect("cannot fail")
.into(), .into(),
); );
init_entries_push_op( init_entries_push_op(
OpDML::insert(ClusterSpace::State, &(StateKey::DesiredSchemaVersion, 0)) OpDML::insert(
.expect("cannot fail") ClusterwideSpace::State,
.into(), &(StateKey::DesiredSchemaVersion, 0),
)
.expect("cannot fail")
.into(),
); );
init_entries.push({ init_entries.push({
......
...@@ -10,7 +10,7 @@ pub mod rpc; ...@@ -10,7 +10,7 @@ pub mod rpc;
pub mod topology; pub mod topology;
use crate::storage; use crate::storage;
use crate::storage::ClusterSpace; use crate::storage::ClusterwideSpace;
use crate::stringify_debug; use crate::stringify_debug;
use crate::util::{AnyWithTypeName, Uppercase}; use crate::util::{AnyWithTypeName, Uppercase};
use ::raft::prelude as raft; use ::raft::prelude as raft;
...@@ -190,7 +190,7 @@ impl Op { ...@@ -190,7 +190,7 @@ impl Op {
} }
Self::Dml(op) => { Self::Dml(op) => {
let res = Box::new(op.result()); let res = Box::new(op.result());
if op.space() == &ClusterSpace::State { if op.space() == &ClusterwideSpace::State {
event::broadcast(Event::ClusterStateChanged); event::broadcast(Event::ClusterStateChanged);
} }
res res
...@@ -256,24 +256,24 @@ pub trait OpResult { ...@@ -256,24 +256,24 @@ pub trait OpResult {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum OpDML { pub enum OpDML {
Insert { Insert {
space: ClusterSpace, space: ClusterwideSpace,
#[serde(with = "serde_bytes")] #[serde(with = "serde_bytes")]
tuple: TupleBuffer, tuple: TupleBuffer,
}, },
Replace { Replace {
space: ClusterSpace, space: ClusterwideSpace,
#[serde(with = "serde_bytes")] #[serde(with = "serde_bytes")]
tuple: TupleBuffer, tuple: TupleBuffer,
}, },
Update { Update {
space: ClusterSpace, space: ClusterwideSpace,
#[serde(with = "serde_bytes")] #[serde(with = "serde_bytes")]
key: TupleBuffer, key: TupleBuffer,
#[serde(with = "vec_of_raw_byte_buf")] #[serde(with = "vec_of_raw_byte_buf")]
ops: Vec<TupleBuffer>, ops: Vec<TupleBuffer>,
}, },
Delete { Delete {
space: ClusterSpace, space: ClusterwideSpace,
#[serde(with = "serde_bytes")] #[serde(with = "serde_bytes")]
key: TupleBuffer, key: TupleBuffer,
}, },
...@@ -299,7 +299,7 @@ impl From<OpDML> for Op { ...@@ -299,7 +299,7 @@ impl From<OpDML> for Op {
impl OpDML { impl OpDML {
/// Serializes `tuple` and returns an [`OpDML::Insert`] in case of success. /// Serializes `tuple` and returns an [`OpDML::Insert`] in case of success.
pub fn insert(space: ClusterSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> { pub fn insert(space: ClusterwideSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> {
let res = Self::Insert { let res = Self::Insert {
space, space,
tuple: tuple.to_tuple_buffer()?, tuple: tuple.to_tuple_buffer()?,
...@@ -308,7 +308,7 @@ impl OpDML { ...@@ -308,7 +308,7 @@ impl OpDML {
} }
/// Serializes `tuple` and returns an [`OpDML::Replace`] in case of success. /// Serializes `tuple` and returns an [`OpDML::Replace`] in case of success.
pub fn replace(space: ClusterSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> { pub fn replace(space: ClusterwideSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> {
let res = Self::Replace { let res = Self::Replace {
space, space,
tuple: tuple.to_tuple_buffer()?, tuple: tuple.to_tuple_buffer()?,
...@@ -318,7 +318,7 @@ impl OpDML { ...@@ -318,7 +318,7 @@ impl OpDML {
/// Serializes `key` and returns an [`OpDML::Update`] in case of success. /// Serializes `key` and returns an [`OpDML::Update`] in case of success.
pub fn update( pub fn update(
space: ClusterSpace, space: ClusterwideSpace,
key: &impl ToTupleBuffer, key: &impl ToTupleBuffer,
ops: impl Into<Vec<TupleBuffer>>, ops: impl Into<Vec<TupleBuffer>>,
) -> tarantool::Result<Self> { ) -> tarantool::Result<Self> {
...@@ -331,7 +331,7 @@ impl OpDML { ...@@ -331,7 +331,7 @@ impl OpDML {
} }
/// Serializes `key` and returns an [`OpDML::Delete`] in case of success. /// Serializes `key` and returns an [`OpDML::Delete`] in case of success.
pub fn delete(space: ClusterSpace, key: &impl ToTupleBuffer) -> tarantool::Result<Self> { pub fn delete(space: ClusterwideSpace, key: &impl ToTupleBuffer) -> tarantool::Result<Self> {
let res = Self::Delete { let res = Self::Delete {
space, space,
key: key.to_tuple_buffer()?, key: key.to_tuple_buffer()?,
...@@ -340,7 +340,7 @@ impl OpDML { ...@@ -340,7 +340,7 @@ impl OpDML {
} }
#[rustfmt::skip] #[rustfmt::skip]
pub fn space(&self) -> &ClusterSpace { pub fn space(&self) -> &ClusterwideSpace {
match &self { match &self {
Self::Insert { space, .. } => space, Self::Insert { space, .. } => space,
Self::Replace { space, .. } => space, Self::Replace { space, .. } => space,
......
...@@ -31,7 +31,7 @@ use crate::governor::raft_conf_change; ...@@ -31,7 +31,7 @@ use crate::governor::raft_conf_change;
use crate::governor::waiting_migrations; use crate::governor::waiting_migrations;
use crate::kvcell::KVCell; use crate::kvcell::KVCell;
use crate::r#loop::{FlowControl, Loop}; use crate::r#loop::{FlowControl, Loop};
use crate::storage::{ClusterSpace, Clusterwide, StateKey}; use crate::storage::{Clusterwide, ClusterwideSpace, StateKey};
use crate::stringify_cfunc; use crate::stringify_cfunc;
use crate::traft::rpc; use crate::traft::rpc;
use crate::traft::ContextCoercion as _; use crate::traft::ContextCoercion as _;
...@@ -489,7 +489,7 @@ impl NodeImpl { ...@@ -489,7 +489,7 @@ impl NodeImpl {
address, address,
}; };
let op = let op =
OpDML::replace(ClusterSpace::Addresses, &peer_address).expect("can't fail"); OpDML::replace(ClusterwideSpace::Addresses, &peer_address).expect("can't fail");
let (lc, notify) = self.schedule_notification(); let (lc, notify) = self.schedule_notification();
notify_for_address = Some(notify); notify_for_address = Some(notify);
let ctx = traft::EntryContextNormal::new(lc, op); let ctx = traft::EntryContextNormal::new(lc, op);
...@@ -1048,7 +1048,8 @@ fn raft_conf_change_loop( ...@@ -1048,7 +1048,8 @@ fn raft_conf_change_loop(
let mut ops = UpdateOps::new(); let mut ops = UpdateOps::new();
ops.assign("master_id", &peer.instance_id)?; ops.assign("master_id", &peer.instance_id)?;
let op = OpDML::update(ClusterSpace::Replicasets, &[replicaset_id], ops)?; let op =
OpDML::update(ClusterwideSpace::Replicasets, &[replicaset_id], ops)?;
tlog!(Info, "proposing replicaset master change"; "op" => ?op); tlog!(Info, "proposing replicaset master change"; "op" => ?op);
// TODO: don't hard code the timeout // TODO: don't hard code the timeout
node.propose_and_wait(op, Duration::from_secs(3))??; node.propose_and_wait(op, Duration::from_secs(3))??;
...@@ -1282,7 +1283,7 @@ fn raft_conf_change_loop( ...@@ -1282,7 +1283,7 @@ fn raft_conf_change_loop(
} else { } else {
let vshard_bootstrapped = storage.state.vshard_bootstrapped()?; let vshard_bootstrapped = storage.state.vshard_bootstrapped()?;
let req = traft::OpDML::insert( let req = traft::OpDML::insert(
ClusterSpace::Replicasets, ClusterwideSpace::Replicasets,
&traft::Replicaset { &traft::Replicaset {
replicaset_id: peer.replicaset_id.clone(), replicaset_id: peer.replicaset_id.clone(),
replicaset_uuid: peer.replicaset_uuid.clone(), replicaset_uuid: peer.replicaset_uuid.clone(),
...@@ -1367,7 +1368,7 @@ fn raft_conf_change_loop( ...@@ -1367,7 +1368,7 @@ fn raft_conf_change_loop(
// gets reconfigured // gets reconfigured
node.propose_and_wait( node.propose_and_wait(
traft::OpDML::replace( traft::OpDML::replace(
ClusterSpace::State, ClusterwideSpace::State,
&(StateKey::VshardBootstrapped, true), &(StateKey::VshardBootstrapped, true),
)?, )?,
// TODO: don't hard code the timeout // TODO: don't hard code the timeout
...@@ -1432,7 +1433,11 @@ fn raft_conf_change_loop( ...@@ -1432,7 +1433,11 @@ fn raft_conf_change_loop(
let mut ops = UpdateOps::new(); let mut ops = UpdateOps::new();
ops.assign("weight", weight)?; ops.assign("weight", weight)?;
node.propose_and_wait( node.propose_and_wait(
traft::OpDML::update(ClusterSpace::Replicasets, &[replicaset_id], ops)?, traft::OpDML::update(
ClusterwideSpace::Replicasets,
&[replicaset_id],
ops,
)?,
// TODO: don't hard code the timeout // TODO: don't hard code the timeout
Duration::from_secs(3), Duration::from_secs(3),
)??; )??;
...@@ -1525,7 +1530,7 @@ fn raft_conf_change_loop( ...@@ -1525,7 +1530,7 @@ fn raft_conf_change_loop(
let mut ops = UpdateOps::new(); let mut ops = UpdateOps::new();
ops.assign("current_schema_version", migration.id).unwrap(); ops.assign("current_schema_version", migration.id).unwrap();
let op = OpDML::update( let op = OpDML::update(
ClusterSpace::Replicasets, ClusterwideSpace::Replicasets,
&[replicaset.replicaset_id.clone()], &[replicaset.replicaset_id.clone()],
ops, ops,
) )
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment