From ac4fa22c47d0dbaf5141671894d278a23cb2bca3 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Fri, 2 Jun 2023 19:22:45 +0300 Subject: [PATCH 1/6] refactor: extract ddl_create_space_on_master function --- src/rpc/ddl_apply.rs | 57 +++++------------------ src/storage.rs | 106 ++++++++++++++++++++++++++++++++----------- 2 files changed, 91 insertions(+), 72 deletions(-) diff --git a/src/rpc/ddl_apply.rs b/src/rpc/ddl_apply.rs index f874d8e4be..80ee2c8111 100644 --- a/src/rpc/ddl_apply.rs +++ b/src/rpc/ddl_apply.rs @@ -1,4 +1,5 @@ use crate::op::Ddl; +use crate::storage::ddl_create_space_on_master; use crate::storage::Clusterwide; use crate::storage::{local_schema_version, set_local_schema_version}; use crate::tlog; @@ -9,7 +10,6 @@ use crate::traft::{RaftIndex, RaftTerm}; use std::time::Duration; use tarantool::error::{TarantoolError, TarantoolErrorCode}; use tarantool::ffi::tarantool as ffi; -use tarantool::space::{Space, SystemSpace}; crate::define_rpc_request! { fn proc_apply_schema_change(req: Request) -> Result<Response> { @@ -82,54 +82,11 @@ crate::define_rpc_request! { // TODO: move this to crate::schema maybe? pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Result<Response> { debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); - let sys_space = Space::from(SystemSpace::Space); - let sys_index = Space::from(SystemSpace::Index); match *ddl { Ddl::CreateSpace { id, .. } => { - let pico_space_def = storage - .spaces - .get(id)? - .ok_or_else(|| Error::other(format!("space with id #{id} not found")))?; - // TODO: set defaults - let tt_space_def = pico_space_def.to_space_metadata()?; - - let pico_pk_def = storage.indexes.get(id, 0)?.ok_or_else(|| { - Error::other(format!( - "primary index for space {} not found", - pico_space_def.name - )) - })?; - let tt_pk_def = pico_pk_def.to_index_metadata(); - - // For now we just assume that during space creation index with id 1 - // exists if and only if it is a bucket_id index. - let mut tt_bucket_id_def = None; - let pico_bucket_id_def = storage.indexes.get(id, 1)?; - if let Some(def) = &pico_bucket_id_def { - tt_bucket_id_def = Some(def.to_index_metadata()); - } - - let res = (|| -> tarantool::Result<()> { - if tt_pk_def.parts.is_empty() { - return Err(tarantool::set_and_get_error!( - tarantool::error::TarantoolErrorCode::ModifyIndex, - "can't create index '{}' in space '{}': parts list cannot be empty", - tt_pk_def.name, - tt_space_def.name, - ) - .into()); - } - sys_space.insert(&tt_space_def)?; - sys_index.insert(&tt_pk_def)?; - if let Some(def) = tt_bucket_id_def { - sys_index.insert(&def)?; - } - set_local_schema_version(version)?; - - Ok(()) - })(); - if let Err(e) = res { + let abort_reason = ddl_create_space_on_master(storage, id)?; + if let Some(e) = abort_reason { // We return Ok(error) because currently this is the only // way to report an application level error. return Ok(Response::Abort { @@ -142,5 +99,13 @@ pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Re } } + if let Err(e) = set_local_schema_version(version) { + // We return Ok(error) because currently this is the only + // way to report an application level error. 
+ return Ok(Response::Abort { + reason: e.to_string(), + }); + } + Ok(Response::Ok) } diff --git a/src/storage.rs b/src/storage.rs index 2db24dfcb9..bde42a04e5 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -591,9 +591,6 @@ impl Clusterwide { } pub fn apply_ddl_changes_on_replicaset_master(&self) -> traft::Result<()> { - let sys_space = Space::from(SystemSpace::Space); - let sys_index = Space::from(SystemSpace::Index); - for space_def in self.spaces.iter()? { if !space_def.operable { // If it so happens, that we receive an unfinished schema change via snapshot, @@ -602,29 +599,12 @@ impl Clusterwide { continue; } - let Some(pk_def) = self.indexes.get(space_def.id, 0)? else { - crate::warn_or_panic!("a space definition without a primary index arrived via snapshot: {space_def:?}"); - continue; - }; - - // For now we just assume that during space creation index with id 1 - // exists if and only if it is a bucket_id index. - let bucket_id_def = self.indexes.get(space_def.id, 1)?; - - // XXX: this logic is duplicated in proc_apply_schema_change, but - // the code is so small, it doesn't seem forth it extracting it for - // now - let tt_space_def = space_def.to_space_metadata()?; - let tt_pk_def = pk_def.to_index_metadata(); - let mut tt_bucket_id_def = None; - if let Some(def) = &bucket_id_def { - tt_bucket_id_def = Some(def.to_index_metadata()); - } - - sys_space.replace(&tt_space_def)?; - sys_index.replace(&tt_pk_def)?; - if let Some(def) = tt_bucket_id_def { - sys_index.replace(&def)?; + let abort_reason = ddl_create_space_on_master(self, space_def.id)?; + if let Some(e) = abort_reason { + return Err(Error::other(format!( + "failed to create space {}: {e}", + space_def.id + ))); } } @@ -1621,6 +1601,80 @@ impl Indexes { } } +//////////////////////////////////////////////////////////////////////////////// +// ddl +//////////////////////////////////////////////////////////////////////////////// + +/// Create tarantool space and any required indexes. Currently it creates a +/// primary index and a `bucket_id` index if it's a sharded space. +/// +/// Return values: +/// * `Ok(None)` in case of success. +/// * `Ok(Some(abort_reason))` in case of error which should result in a ddl abort. +/// * `Err(e)` in case of retryable errors. +/// +// FIXME: this function returns 2 kinds of errors: retryable and non-retryable. +// Currently this is impelemnted by returning one kind of errors as Err(e) and +// the other as Ok(Some(e)). This was the simplest solution at the time this +// function was implemented, as it requires the least amount of boilerplate and +// error forwarding code. But this signature is not intuitive, so maybe there's +// room for improvement. +pub fn ddl_create_space_on_master( + storage: &Clusterwide, + space_id: SpaceId, +) -> traft::Result<Option<TntError>> { + debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); + let sys_space = Space::from(SystemSpace::Space); + let sys_index = Space::from(SystemSpace::Index); + + let pico_space_def = storage + .spaces + .get(space_id)? 
+ .ok_or_else(|| Error::other(format!("space with id {space_id} not found")))?; + // TODO: set defaults + let tt_space_def = pico_space_def.to_space_metadata()?; + + let pico_pk_def = storage.indexes.get(space_id, 0)?.ok_or_else(|| { + Error::other(format!( + "primary index for space {} not found", + pico_space_def.name + )) + })?; + let tt_pk_def = pico_pk_def.to_index_metadata(); + + // For now we just assume that during space creation index with id 1 + // exists if and only if it is a bucket_id index. + let mut tt_bucket_id_def = None; + let pico_bucket_id_def = storage.indexes.get(space_id, 1)?; + if let Some(def) = &pico_bucket_id_def { + tt_bucket_id_def = Some(def.to_index_metadata()); + } + + let res = (|| -> tarantool::Result<()> { + if tt_pk_def.parts.is_empty() { + return Err(tarantool::set_and_get_error!( + tarantool::error::TarantoolErrorCode::ModifyIndex, + "can't create index '{}' in space '{}': parts list cannot be empty", + tt_pk_def.name, + tt_space_def.name, + ) + .into()); + } + sys_space.insert(&tt_space_def)?; + sys_index.insert(&tt_pk_def)?; + if let Some(def) = tt_bucket_id_def { + sys_index.insert(&def)?; + } + + Ok(()) + })(); + Ok(res.err()) +} + +//////////////////////////////////////////////////////////////////////////////// +// local schema version +//////////////////////////////////////////////////////////////////////////////// + pub fn local_schema_version() -> tarantool::Result<u64> { let space_schema = Space::from(SystemSpace::Schema); let tuple = space_schema.get(&["local_schema_version"])?; -- GitLab From a0ec420193011dc83c98b47305133b0c32663a05 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Tue, 30 May 2023 18:10:16 +0300 Subject: [PATCH 2/6] feat: implement ddl drop space (snapshot not implemented yet) --- src/rpc/ddl_apply.rs | 44 ++++++++++++++++++++-- src/schema.rs | 23 +----------- src/storage.rs | 89 ++++++++++++++++++++++++++++++++++++++++++++ src/traft/node.rs | 61 ++++++++++++++++++++++++++++-- 4 files changed, 188 insertions(+), 29 deletions(-) diff --git a/src/rpc/ddl_apply.rs b/src/rpc/ddl_apply.rs index 80ee2c8111..1bf886b4c5 100644 --- a/src/rpc/ddl_apply.rs +++ b/src/rpc/ddl_apply.rs @@ -1,6 +1,6 @@ use crate::op::Ddl; -use crate::storage::ddl_create_space_on_master; use crate::storage::Clusterwide; +use crate::storage::{ddl_create_space_on_master, ddl_drop_space_on_master}; use crate::storage::{local_schema_version, set_local_schema_version}; use crate::tlog; use crate::traft::error::Error; @@ -43,7 +43,7 @@ crate::define_rpc_request! { // TODO: transaction may have already started, if we're in a process of // creating a big index. If governor sends a repeat rpc request to us we // should handle this correctly - let res = apply_schema_change(storage, &ddl, pending_schema_version); + let res = apply_schema_change(storage, &ddl, pending_schema_version, false); match res { Ok(Response::Abort { .. }) | Err(_) => { let rc = unsafe { ffi::box_txn_rollback() }; @@ -79,8 +79,27 @@ crate::define_rpc_request! { } } -// TODO: move this to crate::schema maybe? -pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Result<Response> { +/// Applies the schema change described by `ddl` to the tarantool storage. This +/// function is only called on replicaset masters, other replicas get the +/// changes via tarantool replication. +/// +/// In case of successful schema change the local schema version will be set to +/// `version`. 
In case of [`Ddl::DropSpace`] and [`Ddl::DropIndex`] schema is +/// only changed if `is_commit` is `true`. +/// +/// The space and index definitions are extracted from picodata storage via +/// `storage`. +/// +/// `is_commit` is `true` if schema change is being applied in response to a +/// [`DdlCommit`] raft entry, else it's `false`. +/// +/// [`DdlCommit`]: crate::traft::op::Op::DdlCommit +pub fn apply_schema_change( + storage: &Clusterwide, + ddl: &Ddl, + version: u64, + is_commit: bool, +) -> Result<Response> { debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); match *ddl { @@ -94,6 +113,23 @@ pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Re }); } } + + Ddl::DropSpace { id } => { + if !is_commit { + // Space is only dropped on commit. + return Ok(Response::Ok); + } + + let abort_reason = ddl_drop_space_on_master(id)?; + if let Some(e) = abort_reason { + // We return Ok(error) because currently this is the only + // way to report an application level error. + return Ok(Response::Abort { + reason: e.to_string(), + }); + } + } + _ => { todo!(); } diff --git a/src/schema.rs b/src/schema.rs index b69bd451c8..7f79c51037 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -8,7 +8,6 @@ use tarantool::{ index::{IndexId, Part}, schema::space::SpaceMetadata, space::SpaceId, - space::{Space, SystemSpace}, tlua::{self, LuaRead}, tuple::Encode, util::Value, @@ -21,7 +20,7 @@ use crate::compare_and_swap; use crate::rpc; use crate::storage::ToEntryIter; use crate::storage::SPACE_ID_INTERNAL_MAX; -use crate::storage::{set_local_schema_version, Clusterwide, ClusterwideSpaceId, PropertyName}; +use crate::storage::{Clusterwide, ClusterwideSpaceId, PropertyName}; use crate::traft::op::{Ddl, DdlBuilder, Op}; use crate::traft::{self, event, node, RaftIndex}; use crate::util::instant_saturating_add; @@ -162,26 +161,6 @@ impl IndexDef { } } -pub fn ddl_abort_on_master(ddl: &Ddl, version: u64) -> traft::Result<()> { - debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); - let sys_space = Space::from(SystemSpace::Space); - let sys_index = Space::from(SystemSpace::Index); - - match *ddl { - Ddl::CreateSpace { id, .. } => { - sys_index.delete(&[id, 1])?; - sys_index.delete(&[id, 0])?; - sys_space.delete(&[id])?; - set_local_schema_version(version)?; - } - _ => { - todo!(); - } - } - - Ok(()) -} - // TODO: this should be a TryFrom in tarantool-module pub fn try_space_field_type_to_index_field_type( ft: tarantool::space::FieldType, diff --git a/src/storage.rs b/src/storage.rs index bde42a04e5..9326516a7e 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1599,12 +1599,52 @@ impl Indexes { None => Ok(None), } } + + #[inline] + pub fn by_space_id(&self, space_id: SpaceId) -> tarantool::Result<EntryIter<IndexDef>> { + let iter = self.space.select(IteratorType::Eq, &[space_id])?; + Ok(EntryIter::new(iter)) + } +} + +impl ToEntryIter for Indexes { + type Entry = IndexDef; + + #[inline(always)] + fn index_iter(&self) -> Result<IndexIterator> { + Ok(self.space.select(IteratorType::All, &())?) + } } //////////////////////////////////////////////////////////////////////////////// // ddl //////////////////////////////////////////////////////////////////////////////// +pub fn ddl_abort_on_master(ddl: &Ddl, version: u64) -> traft::Result<()> { + debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); + let sys_space = Space::from(SystemSpace::Space); + let sys_index = Space::from(SystemSpace::Index); + + match *ddl { + Ddl::CreateSpace { id, .. 
} => { + sys_index.delete(&[id, 1])?; + sys_index.delete(&[id, 0])?; + sys_space.delete(&[id])?; + set_local_schema_version(version)?; + } + + Ddl::DropSpace { .. } => { + // Actual drop happens only on commit, so there's nothing to abort. + } + + _ => { + todo!(); + } + } + + Ok(()) +} + /// Create tarantool space and any required indexes. Currently it creates a /// primary index and a `bucket_id` index if it's a sharded space. /// @@ -1671,6 +1711,51 @@ pub fn ddl_create_space_on_master( Ok(res.err()) } +/// Drop tarantool space and any entities which depend on it (currently just indexes). +/// +/// Return values: +/// * `Ok(None)` in case of success. +/// * `Ok(Some(abort_reason))` in case of error which should result in a ddl abort. +/// * `Err(e)` in case of retryable errors. +/// +// FIXME: this function returns 2 kinds of errors: retryable and non-retryable. +// Currently this is impelemnted by returning one kind of errors as Err(e) and +// the other as Ok(Some(e)). This was the simplest solution at the time this +// function was implemented, as it requires the least amount of boilerplate and +// error forwarding code. But this signature is not intuitive, so maybe there's +// room for improvement. +pub fn ddl_drop_space_on_master(space_id: SpaceId) -> traft::Result<Option<TntError>> { + debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); + let sys_space = Space::from(SystemSpace::Space); + let sys_index = Space::from(SystemSpace::Index); + let sys_truncate = Space::from(SystemSpace::Truncate); + + let iter = sys_index.select(IteratorType::Eq, &[space_id])?; + let mut index_ids = Vec::with_capacity(4); + for tuple in iter { + let index_id: IndexId = tuple + .field(1)? + .expect("decoding metadata should never fail"); + // Primary key is handled explicitly. + if index_id != 0 { + index_ids.push(index_id); + } + } + let res = (|| -> tarantool::Result<()> { + // TODO: delete it from _truncate, delete grants + for iid in index_ids.iter().rev() { + sys_index.delete(&(space_id, iid))?; + } + // Primary key must be dropped last. + sys_index.delete(&(space_id, 0))?; + sys_truncate.delete(&[space_id])?; + sys_space.delete(&[space_id])?; + + Ok(()) + })(); + Ok(res.err()) +} + //////////////////////////////////////////////////////////////////////////////// // local schema version //////////////////////////////////////////////////////////////////////////////// @@ -1693,6 +1778,10 @@ pub fn set_local_schema_version(v: u64) -> tarantool::Result<()> { Ok(()) } +//////////////////////////////////////////////////////////////////////////////// +// max space id +//////////////////////////////////////////////////////////////////////////////// + pub const SPACE_ID_INTERNAL_MAX: u32 = 1024; /// Updates box.space._schema max_id to start outside the reserved internal diff --git a/src/traft/node.rs b/src/traft/node.rs index 1bcc9e19ce..54b666511f 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -12,8 +12,8 @@ use crate::kvcell::KVCell; use crate::loop_start; use crate::r#loop::FlowControl; use crate::rpc; -use crate::schema::ddl_abort_on_master; use crate::schema::{Distribution, IndexDef, SpaceDef}; +use crate::storage::ddl_abort_on_master; use crate::storage::local_schema_version; use crate::storage::SnapshotData; use crate::storage::ToEntryIter as _; @@ -858,10 +858,12 @@ impl NodeImpl { if self.is_readonly() { return SleepAndRetry; } else { + // Master applies schema change at this point. 
let resp = rpc::ddl_apply::apply_schema_change( &self.storage, &ddl, pending_version, + true, ) .expect("storage error"); match resp { @@ -895,6 +897,25 @@ impl NodeImpl { // be done, but for now we just ignore the error "no such index" let _ = res; } + + Ddl::DropSpace { id } => { + self.storage + .spaces + .delete(id) + .expect("storage should never fail"); + let iter = self + .storage + .indexes + .by_space_id(id) + .expect("storage should never fail"); + for index in iter { + self.storage + .indexes + .delete(index.space_id, index.id) + .expect("storage should never fail"); + } + } + _ => { todo!() } @@ -939,6 +960,25 @@ impl NodeImpl { self.storage.indexes.delete(id, 0).expect("storage error"); self.storage.spaces.delete(id).expect("storage error"); } + + Ddl::DropSpace { id } => { + self.storage + .spaces + .update_operable(id, true) + .expect("storage should never fail"); + let iter = self + .storage + .indexes + .by_space_id(id) + .expect("storage should never fail"); + for index in iter { + self.storage + .indexes + .update_operable(index.space_id, index.id, true) + .expect("storage should never fail"); + } + } + _ => { todo!() } @@ -1117,10 +1157,25 @@ impl NodeImpl { let _ = (space_id, index_id, by_fields); todo!(); } + Ddl::DropSpace { id } => { - let _ = id; - todo!(); + self.storage + .spaces + .update_operable(id, false) + .expect("storage should never fail"); + let iter = self + .storage + .indexes + .by_space_id(id) + .expect("storage should never fail"); + for index in iter { + self.storage + .indexes + .update_operable(index.space_id, index.id, false) + .expect("storage should never fail"); + } } + Ddl::DropIndex { index_id, space_id } => { let _ = (index_id, space_id); todo!(); -- GitLab From cb7f653d4483d31da54294988ff35aac729444c2 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Wed, 31 May 2023 17:45:05 +0300 Subject: [PATCH 3/6] test: add a couple of ddl drop space tests --- test/conftest.py | 34 ++-- test/int/test_ddl.py | 458 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 477 insertions(+), 15 deletions(-) diff --git a/test/conftest.py b/test/conftest.py index 98b73b72f9..d813c8a661 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -624,9 +624,9 @@ class Instance: def cas( self, - dml_kind: Literal["insert", "replace", "delete"], + op_kind: Literal["insert", "replace", "delete", "drop_space"], space: str | int, - tuple: Tuple | List, + tuple: Tuple | List | None = None, index: int | None = None, term: int | None = None, ranges: List[CasRange] | None = None, @@ -666,25 +666,35 @@ class Instance: ranges=predicate_ranges, ) - if dml_kind in ["insert", "replace"]: - dml = dict( + if op_kind in ["insert", "replace"]: + op = dict( kind="dml", - op_kind=dml_kind, + op_kind=op_kind, space=space_id, tuple=msgpack.packb(tuple), ) - elif dml_kind == "delete": - dml = dict( + elif op_kind == "delete": + op = dict( kind="dml", - op_kind=dml_kind, + op_kind=op_kind, space=space_id, key=msgpack.packb(tuple), ) + elif op_kind == "drop_space": + op = dict( + kind="ddl_prepare", + op_kind=op_kind, + schema_version=self.next_schema_version(), + ddl=dict( + kind="drop_space", + id=space_id, + ), + ) else: - raise Exception(f"unsupported {dml_kind=}") + raise Exception(f"unsupported {op_kind=}") - eprint(f"CaS:\n {predicate=}\n {dml=}") - return self.call(".proc_cas", self.cluster_id, predicate, dml)[0]["index"] + eprint(f"CaS:\n {predicate=}\n {op=}") + return self.call(".proc_cas", self.cluster_id, predicate, op)[0]["index"] def 
next_schema_version(self) -> int: t = self.call("box.space._pico_property:get", "next_schema_version") @@ -1062,7 +1072,7 @@ class Cluster: def create_space(self, params: dict, timeout: float = 3.0): """ - Creates a space. Waits for all peers to be aware of it. + Creates a space. Waits for all online peers to be aware of it. """ index = self.instances[0].create_space(params, timeout) self.raft_wait_index(index, timeout) diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py index 6b9153ec02..87ae1e3e5e 100644 --- a/test/int/test_ddl.py +++ b/test/int/test_ddl.py @@ -1,4 +1,5 @@ import pytest +import time from conftest import Cluster, ReturnError @@ -12,6 +13,7 @@ def test_ddl_abort(cluster: Cluster): # TODO: test manual abort when we have long-running ddls +################################################################################ def test_ddl_create_space_lua(cluster: Cluster): i1, i2 = cluster.deploy(instance_count=2) @@ -95,6 +97,7 @@ def test_ddl_create_space_lua(cluster: Cluster): assert i2.call("box.space._pico_space:get", space_id) == pico_space_def +################################################################################ def test_ddl_create_space_bulky(cluster: Cluster): i1, i2, i3, i4 = cluster.deploy(instance_count=4, init_replication_factor=2) @@ -236,7 +239,7 @@ def test_ddl_create_space_bulky(cluster: Cluster): assert i4.call("box.space._index:get", [space_id, 0]) == tt_pk_def ############################################################################ - # A new replicaset catches up after the fact successfully + # A new replicaset boots up after the fact successfully i5 = cluster.add_instance(wait_online=True, replicaset_id="r3") @@ -257,7 +260,10 @@ def test_ddl_create_space_bulky(cluster: Cluster): assert i6.call("box.space._space:get", space_id) == tt_space_def assert i6.call("box.space._index:get", [space_id, 0]) == tt_pk_def + # TODO: test replica becoming master in the process of catching up + +################################################################################ def test_ddl_create_sharded_space(cluster: Cluster): i1, i2 = cluster.deploy(instance_count=2, init_replication_factor=2) @@ -369,6 +375,7 @@ def test_ddl_create_sharded_space(cluster: Cluster): assert i2.call("box.space._index:get", [space_id, 1]) == tt_bucket_id_def +################################################################################ def test_ddl_create_space_unfinished_from_snapshot(cluster: Cluster): i1, i2, i3 = cluster.deploy(instance_count=3) @@ -422,6 +429,7 @@ def test_ddl_create_space_unfinished_from_snapshot(cluster: Cluster): assert i.eval("return box.space._pico_space:get(...).operable", space_id) +################################################################################ def test_ddl_create_space_partial_failure(cluster: Cluster): # i2 & i3 are for quorum i1, i2, i3, i4, i5 = cluster.deploy(instance_count=5) @@ -490,6 +498,7 @@ def test_ddl_create_space_partial_failure(cluster: Cluster): assert i4.call("box.space._space:get", space_id) is not None +################################################################################ def test_successful_wakeup_after_ddl(cluster: Cluster): # Manual replicaset distribution. 
i1 = cluster.add_instance(replicaset_id="r1", wait_online=True) @@ -526,7 +535,8 @@ def test_successful_wakeup_after_ddl(cluster: Cluster): assert i3.call("box.space._space:get", space_id) is not None -def test_ddl_from_snapshot_at_boot(cluster: Cluster): +################################################################################ +def test_ddl_create_space_from_snapshot_at_boot(cluster: Cluster): # Second instance is only for quorum i1, i2 = cluster.deploy(instance_count=2, init_replication_factor=2) @@ -603,7 +613,8 @@ def test_ddl_from_snapshot_at_boot(cluster: Cluster): assert i4.call("box.space._schema:get", "local_schema_version")[1] == 1 -def test_ddl_from_snapshot_at_catchup(cluster: Cluster): +################################################################################ +def test_ddl_create_space_from_snapshot_at_catchup(cluster: Cluster): # Second instance is only for quorum i1 = cluster.add_instance(wait_online=True, replicaset_id="r1") i2 = cluster.add_instance(wait_online=True, replicaset_id="R2") @@ -665,6 +676,7 @@ def test_ddl_from_snapshot_at_catchup(cluster: Cluster): assert i3.call("box.space._schema:get", "local_schema_version")[1] == 1 +################################################################################ def test_ddl_create_space_at_catchup_with_master_switchover(cluster: Cluster): # For quorum. i1, i2 = cluster.deploy(instance_count=2, init_replication_factor=1) @@ -704,3 +716,443 @@ def test_ddl_create_space_at_catchup_with_master_switchover(cluster: Cluster): # A master catches up by snapshot assert i4.call("box.space._space.index.name:get", space_name) is not None + + +################################################################################ +def test_ddl_drop_space_normal(cluster: Cluster): + # 2 replicasets with 2 replicas each + i1, *_ = cluster.deploy(instance_count=4, init_replication_factor=2) + + # Set up. + space_name = "things" + cluster.create_space( + dict( + name=space_name, + format=[dict(name="id", type="unsigned", is_nullable=False)], + primary_key=["id"], + distribution="global", + ), + ) + + for i in cluster.instances: + assert i.call("box.space._space.index.name:get", space_name) is not None + + # Actual behaviour we're testing + index = i1.cas("drop_space", space=space_name) + index_commit = index + 1 + for i in cluster.instances: + i.raft_wait_index(index_commit) + + for i in cluster.instances: + assert i.call("box.space._space.index.name:get", space_name) is None + + # Now we can create another space with the same name. + cluster.create_space( + dict( + name=space_name, + format=[ + dict(name="id", type="unsigned", is_nullable=False), + dict(name="value", type="any", is_nullable=False), + ], + primary_key=["id"], + distribution="global", + ), + ) + + for i in cluster.instances: + assert i.call("box.space._space.index.name:get", space_name) is not None + + +################################################################################ +def test_ddl_drop_space_partial_failure(cluster: Cluster): + # First 3 are fore quorum. + i1, i2, i3 = cluster.deploy(instance_count=3, init_replication_factor=1) + # Test subjects. + i4 = cluster.add_instance(wait_online=True, replicaset_id="R99") + i5 = cluster.add_instance(wait_online=True, replicaset_id="R99") + + # Set up. 
+ space_name = "trinkets" + cluster.create_space( + dict( + name=space_name, + format=[dict(name="id", type="unsigned", is_nullable=False)], + primary_key=["id"], + distribution="global", + ), + ) + index = i1.cas("insert", space_name, [9]) + for i in cluster.instances: + i.raft_wait_index(index) + + # Put a replicaset to sleep. + i4.terminate() + i5.terminate() + + # Ddl fails because all masters must be present. + index = i1.cas("drop_space", space=space_name) + index_commit = index + 1 + with pytest.raises(ReturnError, match="timeout"): + i1.raft_wait_index(index_commit, timeout=3) + + entry, *_ = i1.call( + "box.space._raft_log:select", None, dict(iterator="lt", limit=1) + ) + # Has not yet been finalized + assert entry[4][1][0] == "ddl_prepare" + + # Space is not yet dropped. + assert i1.call("box.space._space.index.name:get", space_name) is not None + assert i2.call("box.space._space.index.name:get", space_name) is not None + assert i3.call("box.space._space.index.name:get", space_name) is not None + + # And no data is lost yet. + assert i1.call("box.space.trinkets:get", 9) == [9] + assert i2.call("box.space.trinkets:get", 9) == [9] + assert i3.call("box.space.trinkets:get", 9) == [9] + + # But the space is marked not operable. + assert not i1.eval( + "return box.space._pico_space.index.name:get(...).operable", space_name + ) + assert not i2.eval( + "return box.space._pico_space.index.name:get(...).operable", space_name + ) + assert not i3.eval( + "return box.space._pico_space.index.name:get(...).operable", space_name + ) + + # TODO: test manual ddl abort + + # Wakeup the sleeping master. + i4.start() + i4.wait_online() + + # TODO: how do we sync raft log at this point? + time.sleep(2) + + # Now space is dropped. + assert i1.call("box.space._space.index.name:get", space_name) is None + assert i2.call("box.space._space.index.name:get", space_name) is None + assert i3.call("box.space._space.index.name:get", space_name) is None + assert i4.call("box.space._space.index.name:get", space_name) is None + + # And a replica catches up by raft log successfully. + i5.start() + i5.wait_online() + assert i5.call("box.space._space.index.name:get", space_name) is None + + +################################################################################ +def test_ddl_drop_space_by_raft_log_at_catchup(cluster: Cluster): + # i1 is for quorum + i1, *_ = cluster.deploy(instance_count=1, init_replication_factor=1) + i2 = cluster.add_instance(wait_online=True, replicaset_id="r99") + # This one will be catching up. + i3 = cluster.add_instance(wait_online=True, replicaset_id="r99") + + # Set up. + cluster.create_space( + dict( + name="replace_me", + format=[dict(name="id", type="unsigned", is_nullable=False)], + primary_key=["id"], + distribution="sharded", + sharding_key=["id"], + sharding_fn="murmur3", + ), + ) + for i in cluster.instances: + assert i.call("box.space._space.index.name:get", "replace_me") is not None + + cluster.create_space( + dict( + name="drop_me", + format=[dict(name="id", type="unsigned", is_nullable=False)], + primary_key=["id"], + distribution="global", + ), + ) + for i in cluster.instances: + assert i.call("box.space._space.index.name:get", "drop_me") is not None + + # i3 will be catching up. 
+ i3.terminate() + + # Drop the spaces + for space_name in ["replace_me", "drop_me"]: + index = i1.cas("drop_space", space=space_name) + index_commit = index + 1 + i1.raft_wait_index(index_commit) + i2.raft_wait_index(index_commit) + assert i1.call("box.space._space.index.name:get", space_name) is None + assert i2.call("box.space._space.index.name:get", space_name) is None + + # + # We replace a sharded space with a global one to check indexes were dropped + # correctly. + cluster.create_space( + dict( + name="replace_me", + format=[ + dict(name="#", type="unsigned", is_nullable=False), + ], + primary_key=["#"], + distribution="global", + ), + ) + + assert i1.call("box.space._space.index.name:get", "replace_me") is not None + assert i2.call("box.space._space.index.name:get", "replace_me") is not None + + # Wake up the catching up instance. + i3.start() + i3.wait_online() + + # The space was dropped. + assert i3.call("box.space._space.index.name:get", "drop_me") is None + + # The space was dropped and a new one was created without conflict. + format = i3.eval("return box.space[...]:format()", "replace_me") + assert [f["name"] for f in format] == ["#"] + + +################################################################################ +def test_ddl_drop_space_by_raft_log_at_boot(cluster: Cluster): + # These guys are for quorum. + i1, i2 = cluster.deploy(instance_count=2, init_replication_factor=1) + + # + # Set up. + # + cluster.create_space( + dict( + name="replace_me", + format=[dict(name="id", type="unsigned", is_nullable=False)], + primary_key=["id"], + distribution="sharded", + sharding_key=["id"], + sharding_fn="murmur3", + ), + ) + for i in cluster.instances: + assert i.call("box.space._space.index.name:get", "replace_me") is not None + + cluster.create_space( + dict( + name="drop_me", + format=[dict(name="id", type="unsigned", is_nullable=False)], + primary_key=["id"], + distribution="global", + ), + ) + for i in cluster.instances: + assert i.call("box.space._space.index.name:get", "drop_me") is not None + + # + # Drop spaces. + # + for space_name in ["replace_me", "drop_me"]: + index = i1.cas("drop_space", space=space_name) + index_commit = index + 1 + i1.raft_wait_index(index_commit) + i2.raft_wait_index(index_commit) + assert i1.call("box.space._space.index.name:get", space_name) is None + assert i2.call("box.space._space.index.name:get", space_name) is None + + # + # We replace a sharded space with a global one to check indexes were dropped + # correctly. + cluster.create_space( + dict( + name="replace_me", + format=[ + dict(name="#", type="unsigned", is_nullable=False), + ], + primary_key=["#"], + distribution="global", + ), + ) + + assert i1.call("box.space._space.index.name:get", "replace_me") is not None + assert i2.call("box.space._space.index.name:get", "replace_me") is not None + + # + # Add a new replicaset. + # + i3 = cluster.add_instance(wait_online=False, replicaset_id="r99") + i4 = cluster.add_instance(wait_online=False, replicaset_id="r99") + i3.start() + i4.start() + i3.wait_online() + i4.wait_online() + + # + # Both caught up successfully. 
+ # + assert i3.call("box.space._space.index.name:get", "drop_me") is None + assert i4.call("box.space._space.index.name:get", "drop_me") is None + + format = i3.eval("return box.space[...]:format()", "replace_me") + assert [f["name"] for f in format] == ["#"] + format = i4.eval("return box.space[...]:format()", "replace_me") + assert [f["name"] for f in format] == ["#"] + + +################################################################################ +def test_ddl_drop_space_by_snapshot_on_replica(cluster: Cluster): + # i1 is for quorum + i1, *_ = cluster.deploy(instance_count=1, init_replication_factor=1) + i2 = cluster.add_instance(wait_online=True, replicaset_id="r99") + # This one will be catching up. + i3 = cluster.add_instance(wait_online=True, replicaset_id="r99") + + # Set up. + cluster.create_space( + dict( + name="replace_me", + format=[dict(name="id", type="unsigned", is_nullable=False)], + primary_key=["id"], + distribution="sharded", + sharding_key=["id"], + sharding_fn="murmur3", + ), + ) + for i in cluster.instances: + assert i.call("box.space._space.index.name:get", "replace_me") is not None + + cluster.create_space( + dict( + name="drop_me", + format=[dict(name="id", type="unsigned", is_nullable=False)], + primary_key=["id"], + distribution="sharded", + sharding_key=["id"], + sharding_fn="murmur3", + ), + ) + for i in cluster.instances: + assert i.call("box.space._space.index.name:get", "drop_me") is not None + + # i3 will be catching up. + i3.terminate() + + for space_name in ["replace_me", "drop_me"]: + index = i1.cas("drop_space", space=space_name) + index_commit = index + 1 + i1.raft_wait_index(index_commit) + i2.raft_wait_index(index_commit) + assert i1.call("box.space._space.index.name:get", space_name) is None + assert i2.call("box.space._space.index.name:get", space_name) is None + + # We replace a sharded space with a global one to check indexes were dropped + # correctly. + cluster.create_space( + dict( + name="replace_me", + format=[ + dict(name="#", type="unsigned", is_nullable=False), + ], + primary_key=["#"], + distribution="global", + ), + ) + + assert i1.call("box.space._space.index.name:get", "replace_me") is not None + assert i2.call("box.space._space.index.name:get", "replace_me") is not None + + # Compact raft log to trigger snapshot generation. + i1.raft_compact_log() + i2.raft_compact_log() + + # Wake up the catching up instance. + i3.start() + i3.wait_online() + + # The space was dropped. + assert i3.call("box.space._space.index.name:get", "drop_me") is None + + # The space was dropped and a new one was created without conflict. + format = i3.eval("return box.space[...]:format()", "replace_me") + assert [f["name"] for f in format] == ["#"] + + +################################################################################ +def test_ddl_drop_space_by_snapshot_on_master(cluster: Cluster): + # These ones are for quorum. + i1, i2 = cluster.deploy(instance_count=2, init_replication_factor=1) + # This is a replicaset master, who will be following along with the ddl. + i3 = cluster.add_instance(wait_online=True, replicaset_id="r99") + # This is a replica, who will become master and be catching up. + i4 = cluster.add_instance(wait_online=True, replicaset_id="r99") + + # Set up. 
+ cluster.create_space( + dict( + name="space_to_drop", + format=[dict(name="id", type="unsigned", is_nullable=False)], + primary_key=["id"], + distribution="global", + ), + ) + cluster.create_space( + dict( + name="space_to_replace", + format=[dict(name="id", type="unsigned", is_nullable=False)], + primary_key=["id"], + distribution="sharded", + sharding_key=["id"], + sharding_fn="murmur3", + ), + ) + + for space_name in ["space_to_drop", "space_to_replace"]: + for i in cluster.instances: + assert i.call("box.space._space.index.name:get", space_name) is not None + + # i4 will be catching up. + i4.terminate() + + # + # Drop spaces. + # + for space_name in ["space_to_drop", "space_to_replace"]: + index = i1.cas("drop_space", space=space_name) + index_commit = index + 1 + i1.raft_wait_index(index_commit) + i2.raft_wait_index(index_commit) + i3.raft_wait_index(index_commit) + assert i1.call("box.space._space.index.name:get", space_name) is None + assert i2.call("box.space._space.index.name:get", space_name) is None + assert i3.call("box.space._space.index.name:get", space_name) is None + + # We replace a sharded space with a global one to check indexes were dropped + # correctly. + cluster.create_space( + dict( + name="space_to_replace", + format=[dict(name="id", type="unsigned", is_nullable=False)], + primary_key=["id"], + distribution="global", + ), + ) + + assert i1.call("box.space._space.index.name:get", "space_to_replace") is not None + assert i2.call("box.space._space.index.name:get", "space_to_replace") is not None + assert i3.call("box.space._space.index.name:get", "space_to_replace") is not None + + # Compact raft log to trigger snapshot generation. + i1.raft_compact_log() + i2.raft_compact_log() + + # Put i3 to sleep to trigger master switchover. + i3.terminate() + + # Wake up the catching up instance. i4 has become master and. + i4.start() + i4.wait_online() + + # The space was dropped. + # assert i4.call("box.space._space.index.name:get", "space_to_drop") is None + # The space was replaced. + assert i4.call("box.space._space.index.name:get", "space_to_replace") is not None -- GitLab From e9b1b821cd18477b2b3a2108bf79a2af89255e60 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Fri, 2 Jun 2023 19:24:27 +0300 Subject: [PATCH 4/6] fix: ddl drop space from snapshot --- src/storage.rs | 66 +++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 60 insertions(+), 6 deletions(-) diff --git a/src/storage.rs b/src/storage.rs index 9326516a7e..7b65f2aa05 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -19,7 +19,7 @@ use crate::traft::RaftId; use crate::traft::Result; use std::collections::hash_map::Entry; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; use std::rc::Rc; @@ -550,6 +550,12 @@ impl Clusterwide { pub fn apply_snapshot_data(&self, data: &SnapshotData, is_master: bool) -> Result<()> { debug_assert!(unsafe { ::tarantool::ffi::tarantool::box_txn() }); + // We need to save these before truncating _pico_space. + let mut old_space_versions: HashMap<SpaceId, u64> = HashMap::new(); + for space_def in self.spaces.iter()? { + old_space_versions.insert(space_def.id, space_def.schema_version); + } + let mut dont_exist_yet = Vec::new(); for space_dump in &data.space_dumps { let space_name = &space_dump.space_name; @@ -567,7 +573,7 @@ impl Clusterwide { // If we're not the replication master, the rest of the data will come // via tarantool replication. 
if is_master { - self.apply_ddl_changes_on_replicaset_master()?; + self.apply_ddl_changes_on_replicaset_master(&old_space_versions)?; set_local_schema_version(data.schema_version)?; } @@ -590,8 +596,59 @@ impl Clusterwide { Ok(()) } - pub fn apply_ddl_changes_on_replicaset_master(&self) -> traft::Result<()> { + pub fn apply_ddl_changes_on_replicaset_master( + &self, + old_space_versions: &HashMap<SpaceId, u64>, + ) -> traft::Result<()> { + debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); + + let mut space_defs = Vec::new(); + let mut new_space_ids = HashSet::new(); for space_def in self.spaces.iter()? { + new_space_ids.insert(space_def.id); + space_defs.push(space_def); + } + + // First we drop all the spaces which have been dropped. + for &old_space_id in old_space_versions.keys() { + if new_space_ids.contains(&old_space_id) { + // Will be handled later. + continue; + } + + // Space was dropped. + let abort_reason = ddl_drop_space_on_master(old_space_id)?; + if let Some(e) = abort_reason { + return Err(Error::other(format!( + "failed to drop space {old_space_id}: {e}" + ))); + } + } + + // Now create any new spaces, or replace ones that changed. + for space_def in &space_defs { + let space_id = space_def.id; + + if let Some(&v_old) = old_space_versions.get(&space_id) { + let v_new = space_def.schema_version; + assert!(v_old <= v_new); + + if v_old == v_new { + // Space is up to date. + continue; + } + + // Space was updated, need to drop it and recreate. + let abort_reason = ddl_drop_space_on_master(space_def.id)?; + if let Some(e) = abort_reason { + return Err(Error::other(format!( + "failed to drop space {space_id}: {e}" + ))); + } + } else { + // New space. + } + if !space_def.operable { // If it so happens, that we receive an unfinished schema change via snapshot, // which will somehow get finished without the governor sending us @@ -610,9 +667,6 @@ impl Clusterwide { // TODO: secondary indexes - // TODO: check if a space exists here in box.space._space, but doesn't - // exist in pico._space, then delete it - Ok(()) } -- GitLab From 79267b4e5cec3eed14d903d0236384441cda384d Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Mon, 5 Jun 2023 10:17:25 +0300 Subject: [PATCH 5/6] refactor: extract function ddl_meta_space_update_operable --- src/storage.rs | 23 ++++++++++++++++++++ src/traft/node.rs | 53 +++++++---------------------------------------- 2 files changed, 30 insertions(+), 46 deletions(-) diff --git a/src/storage.rs b/src/storage.rs index 7b65f2aa05..f9bd521c9e 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1670,6 +1670,29 @@ impl ToEntryIter for Indexes { } } +//////////////////////////////////////////////////////////////////////////////// +// ddl meta +//////////////////////////////////////////////////////////////////////////////// + +/// Updates the field `"operable"` for a space with id `space_id` and any +/// necessary entities (currently all existing indexes). +/// +/// This function is called when applying the different ddl operations. 
+pub fn ddl_meta_space_update_operable( + storage: &Clusterwide, + space_id: SpaceId, + operable: bool, +) -> traft::Result<()> { + storage.spaces.update_operable(space_id, operable)?; + let iter = storage.indexes.by_space_id(space_id)?; + for index in iter { + storage + .indexes + .update_operable(index.space_id, index.id, operable)?; + } + Ok(()) +} + //////////////////////////////////////////////////////////////////////////////// // ddl //////////////////////////////////////////////////////////////////////////////// diff --git a/src/traft/node.rs b/src/traft/node.rs index 54b666511f..dba0110e3c 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -13,10 +13,10 @@ use crate::loop_start; use crate::r#loop::FlowControl; use crate::rpc; use crate::schema::{Distribution, IndexDef, SpaceDef}; -use crate::storage::ddl_abort_on_master; use crate::storage::local_schema_version; use crate::storage::SnapshotData; use crate::storage::ToEntryIter as _; +use crate::storage::{ddl_abort_on_master, ddl_meta_space_update_operable}; use crate::storage::{Clusterwide, ClusterwideSpaceId, PropertyName}; use crate::stringify_cfunc; use crate::sync; @@ -881,21 +881,8 @@ impl NodeImpl { // Update pico metadata. match ddl { Ddl::CreateSpace { id, .. } => { - self.storage - .spaces - .update_operable(id, true) - .expect("storage error"); - self.storage - .indexes - .update_operable(id, 0, true) - .expect("storage error"); - // For now we just assume that during space creation index with id 1 - // exists if and only if it is a bucket_id index. - let res = self.storage.indexes.update_operable(id, 1, true); - // TODO: maybe we should first check if this index - // exists or check the space definition if this should - // be done, but for now we just ignore the error "no such index" - let _ = res; + ddl_meta_space_update_operable(&self.storage, id, true) + .expect("storage shouldn't fail"); } Ddl::DropSpace { id } => { @@ -962,21 +949,8 @@ impl NodeImpl { } Ddl::DropSpace { id } => { - self.storage - .spaces - .update_operable(id, true) - .expect("storage should never fail"); - let iter = self - .storage - .indexes - .by_space_id(id) - .expect("storage should never fail"); - for index in iter { - self.storage - .indexes - .update_operable(index.space_id, index.id, true) - .expect("storage should never fail"); - } + ddl_meta_space_update_operable(&self.storage, id, true) + .expect("storage shouldn't fail"); } _ => { @@ -1159,21 +1133,8 @@ impl NodeImpl { } Ddl::DropSpace { id } => { - self.storage - .spaces - .update_operable(id, false) - .expect("storage should never fail"); - let iter = self - .storage - .indexes - .by_space_id(id) - .expect("storage should never fail"); - for index in iter { - self.storage - .indexes - .update_operable(index.space_id, index.id, false) - .expect("storage should never fail"); - } + ddl_meta_space_update_operable(&self.storage, id, false) + .expect("storage shouldn't fail"); } Ddl::DropIndex { index_id, space_id } => { -- GitLab From 58b16cb5958c31df3289893e7e419e4514fc341a Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Mon, 5 Jun 2023 10:22:53 +0300 Subject: [PATCH 6/6] fix: ddl abort for create sharded space --- src/storage.rs | 12 ++++++++ src/traft/node.rs | 20 ++------------ test/int/test_ddl.py | 66 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+), 17 deletions(-) diff --git a/src/storage.rs b/src/storage.rs index f9bd521c9e..21bc0c4163 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1693,6 +1693,18 @@ pub fn 
ddl_meta_space_update_operable( Ok(()) } +/// Deletes the picodata internal metadata for a space with id `space_id`. +/// +/// This function is called when applying the different ddl operations. +pub fn ddl_meta_drop_space(storage: &Clusterwide, space_id: SpaceId) -> traft::Result<()> { + storage.spaces.delete(space_id)?; + let iter = storage.indexes.by_space_id(space_id)?; + for index in iter { + storage.indexes.delete(index.space_id, index.id)?; + } + Ok(()) +} + //////////////////////////////////////////////////////////////////////////////// // ddl //////////////////////////////////////////////////////////////////////////////// diff --git a/src/traft/node.rs b/src/traft/node.rs index dba0110e3c..738fbb2a78 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -13,6 +13,7 @@ use crate::loop_start; use crate::r#loop::FlowControl; use crate::rpc; use crate::schema::{Distribution, IndexDef, SpaceDef}; +use crate::storage::ddl_meta_drop_space; use crate::storage::local_schema_version; use crate::storage::SnapshotData; use crate::storage::ToEntryIter as _; @@ -886,21 +887,7 @@ impl NodeImpl { } Ddl::DropSpace { id } => { - self.storage - .spaces - .delete(id) - .expect("storage should never fail"); - let iter = self - .storage - .indexes - .by_space_id(id) - .expect("storage should never fail"); - for index in iter { - self.storage - .indexes - .delete(index.space_id, index.id) - .expect("storage should never fail"); - } + ddl_meta_drop_space(&self.storage, id).expect("storage shouldn't fail"); } _ => { @@ -944,8 +931,7 @@ impl NodeImpl { // Update pico metadata. match ddl { Ddl::CreateSpace { id, .. } => { - self.storage.indexes.delete(id, 0).expect("storage error"); - self.storage.spaces.delete(id).expect("storage error"); + ddl_meta_drop_space(&self.storage, id).expect("storage shouldn't fail"); } Ddl::DropSpace { id } => { diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py index 87ae1e3e5e..79c7486e2d 100644 --- a/test/int/test_ddl.py +++ b/test/int/test_ddl.py @@ -429,6 +429,72 @@ def test_ddl_create_space_unfinished_from_snapshot(cluster: Cluster): assert i.eval("return box.space._pico_space:get(...).operable", space_id) +################################################################################ +def test_ddl_create_space_abort(cluster: Cluster): + i1, i2, i3 = cluster.deploy(instance_count=3, init_replication_factor=1) + + # Create a conflict to force ddl abort. + space_name = "space_name_conflict" + i3.eval("box.schema.space.create(...)", space_name) + + # Terminate i3 so that other instances actually partially apply the ddl. + i3.terminate() + + # Initiate ddl create space. + space_id = 887 + index_fin = i1.propose_create_space( + dict( + id=space_id, + name=space_name, + format=[ + dict(name="id", type="unsigned", is_nullable=False), + ], + primary_key=[dict(field="id")], + distribution=dict( + kind="sharded_implicitly", + sharding_key=["id"], + sharding_fn="murmur3", + ), + ), + wait_index=False, + ) + + index_prepare = index_fin - 1 + i1.raft_wait_index(index_prepare) + i2.raft_wait_index(index_prepare) + + def get_index_names(i, space_id): + return i.eval( + """ + local space_id = ... 
+ local res = box.space._pico_index:select({space_id}) + for i, t in ipairs(res) do + res[i] = t.name + end + return res + """ + ) + + assert i1.call("box.space._space:get", space_id) is not None + assert get_index_names(i1, space_id) == ["primary_key", "bucket_id"] + assert i2.call("box.space._space:get", space_id) is not None + assert get_index_names(i2, space_id) == ["primary_key", "bucket_id"] + + # Wake the instance so that governor finds out there's a conflict + # and aborts the ddl op. + i3.start() + i3.wait_online() + + # Everything was cleaned up. + assert i1.call("box.space._space:get", space_id) is None + assert i2.call("box.space._space:get", space_id) is None + assert i3.call("box.space._space:get", space_id) is None + + assert get_index_names(i1, space_id) == [] + assert get_index_names(i2, space_id) == [] + assert get_index_names(i3, space_id) == [] + + ################################################################################ def test_ddl_create_space_partial_failure(cluster: Cluster): # i2 & i3 are for quorum -- GitLab
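
The storage helpers introduced in this series (`ddl_create_space_on_master`, `ddl_drop_space_on_master`) follow the convention spelled out in their FIXME comments: `Ok(None)` on success, `Ok(Some(abort_reason))` for errors that must abort the DDL operation, and `Err(e)` for retryable errors. Below is a minimal, self-contained sketch of that convention and of how a caller such as `proc_apply_schema_change` might branch on it; the types used here (`DdlOutcome`, `StorageError`) are illustrative stand-ins, not picodata's real ones.

// Standalone sketch of the three-way result convention described in the
// FIXME comments above. All names below are invented for illustration.

#[derive(Debug)]
enum DdlOutcome {
    Ok,
    Abort { reason: String },
}

#[derive(Debug)]
struct StorageError(String); // stands in for a retryable storage error

/// Mirrors the signature shape of `ddl_create_space_on_master`:
/// Ok(None) = success, Ok(Some(reason)) = abort the DDL, Err(_) = retry later.
fn apply_ddl_step(fail_mode: u8) -> Result<Option<String>, StorageError> {
    match fail_mode {
        0 => Ok(None),
        1 => Ok(Some("space already exists".into())),
        _ => Err(StorageError("storage temporarily unavailable".into())),
    }
}

fn handle(fail_mode: u8) -> Result<DdlOutcome, StorageError> {
    // The caller maps the three cases onto its own response type: abort
    // reasons become an application-level Abort response, while retryable
    // errors are forwarded as Err so the request can be repeated.
    let abort_reason = apply_ddl_step(fail_mode)?;
    if let Some(reason) = abort_reason {
        return Ok(DdlOutcome::Abort { reason });
    }
    Ok(DdlOutcome::Ok)
}

fn main() {
    for mode in 0..3u8 {
        println!("{:?}", handle(mode));
    }
}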
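
The drop-space flow itself is deliberately two-phase: applying the DdlPrepare entry only marks the picodata metadata non-operable (via `ddl_meta_space_update_operable`), the actual deletion from the tarantool system spaces happens on commit (`apply_schema_change` with `is_commit == true` calls `ddl_drop_space_on_master`, and `ddl_meta_drop_space` removes the picodata metadata), and an abort simply restores operability. The following is a hedged, standalone sketch of that state machine; `SpaceState` and the handler functions are invented for illustration and do not exist in the codebase.

#[derive(Debug, PartialEq)]
enum SpaceState {
    Operable,
    NotOperable, // after the DdlPrepare { DropSpace } entry is applied
    Dropped,     // after DdlCommit is applied
}

// Prepare: only picodata metadata changes, nothing is removed yet
// (the patches call ddl_meta_space_update_operable(storage, id, false)).
fn on_ddl_prepare(state: &mut SpaceState) {
    *state = SpaceState::NotOperable;
}

// Commit: the replicaset master drops the space from _space/_index
// (ddl_drop_space_on_master); replicas pick the change up via tarantool
// replication, and picodata metadata is removed via ddl_meta_drop_space.
fn on_ddl_commit(state: &mut SpaceState) {
    *state = SpaceState::Dropped;
}

// Abort: since nothing was dropped during prepare, abort only restores
// operability (ddl_meta_space_update_operable(storage, id, true)).
fn on_ddl_abort(state: &mut SpaceState) {
    *state = SpaceState::Operable;
}

fn main() {
    let mut state = SpaceState::Operable;

    on_ddl_prepare(&mut state);
    assert_eq!(state, SpaceState::NotOperable);
    on_ddl_abort(&mut state);
    assert_eq!(state, SpaceState::Operable);

    on_ddl_prepare(&mut state);
    on_ddl_commit(&mut state);
    assert_eq!(state, SpaceState::Dropped);
}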