Skip to content
Snippets Groups Projects
Commit ac4fa22c authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

refactor: extract ddl_create_space_on_master function

parent 44b28d1c
No related branches found
No related tags found
1 merge request!555Feat/ddl drop space
use crate::op::Ddl;
use crate::storage::ddl_create_space_on_master;
use crate::storage::Clusterwide;
use crate::storage::{local_schema_version, set_local_schema_version};
use crate::tlog;
......@@ -9,7 +10,6 @@ use crate::traft::{RaftIndex, RaftTerm};
use std::time::Duration;
use tarantool::error::{TarantoolError, TarantoolErrorCode};
use tarantool::ffi::tarantool as ffi;
use tarantool::space::{Space, SystemSpace};
crate::define_rpc_request! {
fn proc_apply_schema_change(req: Request) -> Result<Response> {
......@@ -82,54 +82,11 @@ crate::define_rpc_request! {
// TODO: move this to crate::schema maybe?
pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Result<Response> {
debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
let sys_space = Space::from(SystemSpace::Space);
let sys_index = Space::from(SystemSpace::Index);
match *ddl {
Ddl::CreateSpace { id, .. } => {
let pico_space_def = storage
.spaces
.get(id)?
.ok_or_else(|| Error::other(format!("space with id #{id} not found")))?;
// TODO: set defaults
let tt_space_def = pico_space_def.to_space_metadata()?;
let pico_pk_def = storage.indexes.get(id, 0)?.ok_or_else(|| {
Error::other(format!(
"primary index for space {} not found",
pico_space_def.name
))
})?;
let tt_pk_def = pico_pk_def.to_index_metadata();
// For now we just assume that during space creation index with id 1
// exists if and only if it is a bucket_id index.
let mut tt_bucket_id_def = None;
let pico_bucket_id_def = storage.indexes.get(id, 1)?;
if let Some(def) = &pico_bucket_id_def {
tt_bucket_id_def = Some(def.to_index_metadata());
}
let res = (|| -> tarantool::Result<()> {
if tt_pk_def.parts.is_empty() {
return Err(tarantool::set_and_get_error!(
tarantool::error::TarantoolErrorCode::ModifyIndex,
"can't create index '{}' in space '{}': parts list cannot be empty",
tt_pk_def.name,
tt_space_def.name,
)
.into());
}
sys_space.insert(&tt_space_def)?;
sys_index.insert(&tt_pk_def)?;
if let Some(def) = tt_bucket_id_def {
sys_index.insert(&def)?;
}
set_local_schema_version(version)?;
Ok(())
})();
if let Err(e) = res {
let abort_reason = ddl_create_space_on_master(storage, id)?;
if let Some(e) = abort_reason {
// We return Ok(error) because currently this is the only
// way to report an application level error.
return Ok(Response::Abort {
......@@ -142,5 +99,13 @@ pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Re
}
}
if let Err(e) = set_local_schema_version(version) {
// We return Ok(error) because currently this is the only
// way to report an application level error.
return Ok(Response::Abort {
reason: e.to_string(),
});
}
Ok(Response::Ok)
}
......@@ -591,9 +591,6 @@ impl Clusterwide {
}
pub fn apply_ddl_changes_on_replicaset_master(&self) -> traft::Result<()> {
let sys_space = Space::from(SystemSpace::Space);
let sys_index = Space::from(SystemSpace::Index);
for space_def in self.spaces.iter()? {
if !space_def.operable {
// If it so happens, that we receive an unfinished schema change via snapshot,
......@@ -602,29 +599,12 @@ impl Clusterwide {
continue;
}
let Some(pk_def) = self.indexes.get(space_def.id, 0)? else {
crate::warn_or_panic!("a space definition without a primary index arrived via snapshot: {space_def:?}");
continue;
};
// For now we just assume that during space creation index with id 1
// exists if and only if it is a bucket_id index.
let bucket_id_def = self.indexes.get(space_def.id, 1)?;
// XXX: this logic is duplicated in proc_apply_schema_change, but
// the code is so small, it doesn't seem forth it extracting it for
// now
let tt_space_def = space_def.to_space_metadata()?;
let tt_pk_def = pk_def.to_index_metadata();
let mut tt_bucket_id_def = None;
if let Some(def) = &bucket_id_def {
tt_bucket_id_def = Some(def.to_index_metadata());
}
sys_space.replace(&tt_space_def)?;
sys_index.replace(&tt_pk_def)?;
if let Some(def) = tt_bucket_id_def {
sys_index.replace(&def)?;
let abort_reason = ddl_create_space_on_master(self, space_def.id)?;
if let Some(e) = abort_reason {
return Err(Error::other(format!(
"failed to create space {}: {e}",
space_def.id
)));
}
}
......@@ -1621,6 +1601,80 @@ impl Indexes {
}
}
////////////////////////////////////////////////////////////////////////////////
// ddl
////////////////////////////////////////////////////////////////////////////////
/// Create tarantool space and any required indexes. Currently it creates a
/// primary index and a `bucket_id` index if it's a sharded space.
///
/// Return values:
/// * `Ok(None)` in case of success.
/// * `Ok(Some(abort_reason))` in case of error which should result in a ddl abort.
/// * `Err(e)` in case of retryable errors.
///
// FIXME: this function returns 2 kinds of errors: retryable and non-retryable.
// Currently this is impelemnted by returning one kind of errors as Err(e) and
// the other as Ok(Some(e)). This was the simplest solution at the time this
// function was implemented, as it requires the least amount of boilerplate and
// error forwarding code. But this signature is not intuitive, so maybe there's
// room for improvement.
pub fn ddl_create_space_on_master(
storage: &Clusterwide,
space_id: SpaceId,
) -> traft::Result<Option<TntError>> {
debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
let sys_space = Space::from(SystemSpace::Space);
let sys_index = Space::from(SystemSpace::Index);
let pico_space_def = storage
.spaces
.get(space_id)?
.ok_or_else(|| Error::other(format!("space with id {space_id} not found")))?;
// TODO: set defaults
let tt_space_def = pico_space_def.to_space_metadata()?;
let pico_pk_def = storage.indexes.get(space_id, 0)?.ok_or_else(|| {
Error::other(format!(
"primary index for space {} not found",
pico_space_def.name
))
})?;
let tt_pk_def = pico_pk_def.to_index_metadata();
// For now we just assume that during space creation index with id 1
// exists if and only if it is a bucket_id index.
let mut tt_bucket_id_def = None;
let pico_bucket_id_def = storage.indexes.get(space_id, 1)?;
if let Some(def) = &pico_bucket_id_def {
tt_bucket_id_def = Some(def.to_index_metadata());
}
let res = (|| -> tarantool::Result<()> {
if tt_pk_def.parts.is_empty() {
return Err(tarantool::set_and_get_error!(
tarantool::error::TarantoolErrorCode::ModifyIndex,
"can't create index '{}' in space '{}': parts list cannot be empty",
tt_pk_def.name,
tt_space_def.name,
)
.into());
}
sys_space.insert(&tt_space_def)?;
sys_index.insert(&tt_pk_def)?;
if let Some(def) = tt_bucket_id_def {
sys_index.insert(&def)?;
}
Ok(())
})();
Ok(res.err())
}
////////////////////////////////////////////////////////////////////////////////
// local schema version
////////////////////////////////////////////////////////////////////////////////
pub fn local_schema_version() -> tarantool::Result<u64> {
let space_schema = Space::from(SystemSpace::Schema);
let tuple = space_schema.get(&["local_schema_version"])?;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment