From a0ec420193011dc83c98b47305133b0c32663a05 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Tue, 30 May 2023 18:10:16 +0300 Subject: [PATCH] feat: implement ddl drop space (snapshot not implemented yet) --- src/rpc/ddl_apply.rs | 44 ++++++++++++++++++++-- src/schema.rs | 23 +----------- src/storage.rs | 89 ++++++++++++++++++++++++++++++++++++++++++++ src/traft/node.rs | 61 ++++++++++++++++++++++++++++-- 4 files changed, 188 insertions(+), 29 deletions(-) diff --git a/src/rpc/ddl_apply.rs b/src/rpc/ddl_apply.rs index 80ee2c8111..1bf886b4c5 100644 --- a/src/rpc/ddl_apply.rs +++ b/src/rpc/ddl_apply.rs @@ -1,6 +1,6 @@ use crate::op::Ddl; -use crate::storage::ddl_create_space_on_master; use crate::storage::Clusterwide; +use crate::storage::{ddl_create_space_on_master, ddl_drop_space_on_master}; use crate::storage::{local_schema_version, set_local_schema_version}; use crate::tlog; use crate::traft::error::Error; @@ -43,7 +43,7 @@ crate::define_rpc_request! { // TODO: transaction may have already started, if we're in a process of // creating a big index. If governor sends a repeat rpc request to us we // should handle this correctly - let res = apply_schema_change(storage, &ddl, pending_schema_version); + let res = apply_schema_change(storage, &ddl, pending_schema_version, false); match res { Ok(Response::Abort { .. }) | Err(_) => { let rc = unsafe { ffi::box_txn_rollback() }; @@ -79,8 +79,27 @@ crate::define_rpc_request! { } } -// TODO: move this to crate::schema maybe? -pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Result<Response> { +/// Applies the schema change described by `ddl` to the tarantool storage. This +/// function is only called on replicaset masters, other replicas get the +/// changes via tarantool replication. +/// +/// In case of successful schema change the local schema version will be set to +/// `version`. 
In case of [`Ddl::DropSpace`] and [`Ddl::DropIndex`] schema is +/// only changed if `is_commit` is `true`. +/// +/// The space and index definitions are extracted from picodata storage via +/// `storage`. +/// +/// `is_commit` is `true` if schema change is being applied in response to a +/// [`DdlCommit`] raft entry, else it's `false`. +/// +/// [`DdlCommit`]: crate::traft::op::Op::DdlCommit +pub fn apply_schema_change( + storage: &Clusterwide, + ddl: &Ddl, + version: u64, + is_commit: bool, +) -> Result<Response> { debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); match *ddl { @@ -94,6 +113,23 @@ pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Re }); } } + + Ddl::DropSpace { id } => { + if !is_commit { + // Space is only dropped on commit. + return Ok(Response::Ok); + } + + let abort_reason = ddl_drop_space_on_master(id)?; + if let Some(e) = abort_reason { + // We return Ok(error) because currently this is the only + // way to report an application level error. 
+ return Ok(Response::Abort { + reason: e.to_string(), + }); + } + } + _ => { todo!(); } diff --git a/src/schema.rs b/src/schema.rs index b69bd451c8..7f79c51037 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -8,7 +8,6 @@ use tarantool::{ index::{IndexId, Part}, schema::space::SpaceMetadata, space::SpaceId, - space::{Space, SystemSpace}, tlua::{self, LuaRead}, tuple::Encode, util::Value, @@ -21,7 +20,7 @@ use crate::compare_and_swap; use crate::rpc; use crate::storage::ToEntryIter; use crate::storage::SPACE_ID_INTERNAL_MAX; -use crate::storage::{set_local_schema_version, Clusterwide, ClusterwideSpaceId, PropertyName}; +use crate::storage::{Clusterwide, ClusterwideSpaceId, PropertyName}; use crate::traft::op::{Ddl, DdlBuilder, Op}; use crate::traft::{self, event, node, RaftIndex}; use crate::util::instant_saturating_add; @@ -162,26 +161,6 @@ impl IndexDef { } } -pub fn ddl_abort_on_master(ddl: &Ddl, version: u64) -> traft::Result<()> { - debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); - let sys_space = Space::from(SystemSpace::Space); - let sys_index = Space::from(SystemSpace::Index); - - match *ddl { - Ddl::CreateSpace { id, .. 
} => { - sys_index.delete(&[id, 1])?; - sys_index.delete(&[id, 0])?; - sys_space.delete(&[id])?; - set_local_schema_version(version)?; - } - _ => { - todo!(); - } - } - - Ok(()) -} - // TODO: this should be a TryFrom in tarantool-module pub fn try_space_field_type_to_index_field_type( ft: tarantool::space::FieldType, diff --git a/src/storage.rs b/src/storage.rs index bde42a04e5..9326516a7e 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1599,12 +1599,52 @@ impl Indexes { None => Ok(None), } } + + #[inline] + pub fn by_space_id(&self, space_id: SpaceId) -> tarantool::Result<EntryIter<IndexDef>> { + let iter = self.space.select(IteratorType::Eq, &[space_id])?; + Ok(EntryIter::new(iter)) + } +} + +impl ToEntryIter for Indexes { + type Entry = IndexDef; + + #[inline(always)] + fn index_iter(&self) -> Result<IndexIterator> { + Ok(self.space.select(IteratorType::All, &())?) + } } //////////////////////////////////////////////////////////////////////////////// // ddl //////////////////////////////////////////////////////////////////////////////// +pub fn ddl_abort_on_master(ddl: &Ddl, version: u64) -> traft::Result<()> { + debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); + let sys_space = Space::from(SystemSpace::Space); + let sys_index = Space::from(SystemSpace::Index); + + match *ddl { + Ddl::CreateSpace { id, .. } => { + sys_index.delete(&[id, 1])?; + sys_index.delete(&[id, 0])?; + sys_space.delete(&[id])?; + set_local_schema_version(version)?; + } + + Ddl::DropSpace { .. } => { + // Actual drop happens only on commit, so there's nothing to abort. + } + + _ => { + todo!(); + } + } + + Ok(()) +} + /// Create tarantool space and any required indexes. Currently it creates a /// primary index and a `bucket_id` index if it's a sharded space. /// @@ -1671,6 +1711,51 @@ pub fn ddl_create_space_on_master( Ok(res.err()) } +/// Drop tarantool space and any entities which depend on it (currently just indexes). 
+/// +/// Return values: +/// * `Ok(None)` in case of success. +/// * `Ok(Some(abort_reason))` in case of error which should result in a ddl abort. +/// * `Err(e)` in case of retryable errors. +/// +// FIXME: this function returns 2 kinds of errors: retryable and non-retryable. +// Currently this is implemented by returning one kind of errors as Err(e) and +// the other as Ok(Some(e)). This was the simplest solution at the time this +// function was implemented, as it requires the least amount of boilerplate and +// error forwarding code. But this signature is not intuitive, so maybe there's +// room for improvement. +pub fn ddl_drop_space_on_master(space_id: SpaceId) -> traft::Result<Option<TntError>> { + debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); + let sys_space = Space::from(SystemSpace::Space); + let sys_index = Space::from(SystemSpace::Index); + let sys_truncate = Space::from(SystemSpace::Truncate); + + let iter = sys_index.select(IteratorType::Eq, &[space_id])?; + let mut index_ids = Vec::with_capacity(4); + for tuple in iter { + let index_id: IndexId = tuple + .field(1)? + .expect("decoding metadata should never fail"); + // Primary key is handled explicitly. + if index_id != 0 { + index_ids.push(index_id); + } + } + let res = (|| -> tarantool::Result<()> { + // TODO: delete it from _truncate, delete grants + for iid in index_ids.iter().rev() { + sys_index.delete(&(space_id, iid))?; + } + // Primary key must be dropped last. 
+ sys_index.delete(&(space_id, 0))?; + sys_truncate.delete(&[space_id])?; + sys_space.delete(&[space_id])?; + + Ok(()) + })(); + Ok(res.err()) +} + //////////////////////////////////////////////////////////////////////////////// // local schema version //////////////////////////////////////////////////////////////////////////////// @@ -1693,6 +1778,10 @@ pub fn set_local_schema_version(v: u64) -> tarantool::Result<()> { Ok(()) } +//////////////////////////////////////////////////////////////////////////////// +// max space id +//////////////////////////////////////////////////////////////////////////////// + pub const SPACE_ID_INTERNAL_MAX: u32 = 1024; /// Updates box.space._schema max_id to start outside the reserved internal diff --git a/src/traft/node.rs b/src/traft/node.rs index 1bcc9e19ce..54b666511f 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -12,8 +12,8 @@ use crate::kvcell::KVCell; use crate::loop_start; use crate::r#loop::FlowControl; use crate::rpc; -use crate::schema::ddl_abort_on_master; use crate::schema::{Distribution, IndexDef, SpaceDef}; +use crate::storage::ddl_abort_on_master; use crate::storage::local_schema_version; use crate::storage::SnapshotData; use crate::storage::ToEntryIter as _; @@ -858,10 +858,12 @@ impl NodeImpl { if self.is_readonly() { return SleepAndRetry; } else { + // Master applies schema change at this point. 
let resp = rpc::ddl_apply::apply_schema_change( &self.storage, &ddl, pending_version, + true, ) .expect("storage error"); match resp { @@ -895,6 +897,25 @@ impl NodeImpl { // be done, but for now we just ignore the error "no such index" let _ = res; } + + Ddl::DropSpace { id } => { + self.storage + .spaces + .delete(id) + .expect("storage should never fail"); + let iter = self + .storage + .indexes + .by_space_id(id) + .expect("storage should never fail"); + for index in iter { + self.storage + .indexes + .delete(index.space_id, index.id) + .expect("storage should never fail"); + } + } + _ => { todo!() } @@ -939,6 +960,25 @@ impl NodeImpl { self.storage.indexes.delete(id, 0).expect("storage error"); self.storage.spaces.delete(id).expect("storage error"); } + + Ddl::DropSpace { id } => { + self.storage + .spaces + .update_operable(id, true) + .expect("storage should never fail"); + let iter = self + .storage + .indexes + .by_space_id(id) + .expect("storage should never fail"); + for index in iter { + self.storage + .indexes + .update_operable(index.space_id, index.id, true) + .expect("storage should never fail"); + } + } + _ => { todo!() } @@ -1117,10 +1157,25 @@ impl NodeImpl { let _ = (space_id, index_id, by_fields); todo!(); } + Ddl::DropSpace { id } => { - let _ = id; - todo!(); + self.storage + .spaces + .update_operable(id, false) + .expect("storage should never fail"); + let iter = self + .storage + .indexes + .by_space_id(id) + .expect("storage should never fail"); + for index in iter { + self.storage + .indexes + .update_operable(index.space_id, index.id, false) + .expect("storage should never fail"); + } } + Ddl::DropIndex { index_id, space_id } => { let _ = (index_id, space_id); todo!(); -- GitLab